一、迁移元数据
- 目标机器安装相册应用
- 拷贝照片目录, 注意:照片目录最好和源机器一模一样, 包括存储空间, 不然要改路径
- ssh登陆源机器
- 进入 /var/apps, 用 sudo tar zcvfh trim.photos.app.tar trim.photos 打包 trim.photos 目录(注意:一定要加 h 选项, 打包软链接目录)
- 将打包后的tar拷贝到目标机器
- 解压, 解压之后目录应该是

- 目标机器进入 /var/apps/trim.photos, 现在的目录结构是

- 将解压后的目录, 依次拷贝到软链接的目标目录, 例如 etc, 拷贝到 /usr/local/apps/@appconf/trim.photos 用命令 sudo cp -rfv etc/. /usr/local/apps/@appconf/trim.photos/
- etc home meta target tmp var 都处理完之后相册应用应该能看到元数据了
二、修复路径
进入 /var/apps/trim.photos 执行
sudo python3 process_db.py . "/vol2/" "/vol1/"
其中, /vol2/ 是源路径中需要替换的部分, /vol1/是新的路径, 如果路径其他部分也变了, 应该先从最长路径开始替换, 多次执行py脚本即可.
process_db.py内容如下:
#!/usr/bin/env python3
"""
SQLite 数据库字符串查找替换工具
递归处理指定目录下的所有 .db 文件
"""
import argparse
import os
import sqlite3
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import List, Tuple
import threading
class DatabaseProcessor:
def __init__(self, search_str: str, replace_str: str, verbose: bool = True):
self.search_str = search_str
self.replace_str = replace_str
self.verbose = verbose
self.lock = threading.Lock()
def log(self, message: str):
"""线程安全的日志输出"""
if self.verbose:
with self.lock:
print(message)
def find_db_files(self, target_dir: str) -> List[Path]:
"""
递归查找目标目录下所有的 .db 文件
包括软链接指向的目录
"""
db_files = []
target_path = Path(target_dir).resolve()
if not target_path.exists():
self.log(f"[错误] 目标目录不存在: {target_dir}")
return db_files
self.log(f"[信息] 开始扫描目录: {target_path}")
# 使用 os.walk 支持 followlinks 参数来遍历软链接
for root, dirs, files in os.walk(target_path, followlinks=True):
root_path = Path(root)
for filename in files:
if filename.endswith('.db'):
db_file = root_path / filename
# 检查是否是真正的SQLite数据库文件
if self._is_sqlite_db(db_file):
db_files.append(db_file)
self.log(f"[发现] SQLite数据库: {db_file}")
self.log(f"[信息] 共发现 {len(db_files)} 个 SQLite 数据库文件")
return db_files
@staticmethod
def _is_sqlite_db(file_path: Path) -> bool:
"""检查文件是否为SQLite数据库"""
try:
if not file_path.is_file():
return False
# 读取文件头检查SQLite魔数
with open(file_path, 'rb') as f:
header = f.read(16)
return header.startswith(b'SQLite format 3')
except (Permissi*, OSError):
return False
@staticmethod
def _check_writable(db_path: Path) -> bool:
"""检查数据库文件是否可写"""
# 检查文件本身是否可写
if not os.access(db_path, os.W_OK):
return False
# 检查所在目录是否可写(SQLite需要创建journal文件)
if not os.access(db_path.parent, os.W_OK):
return False
return True
def process_database(self, db_path: Path) -> Tuple[str, int, int]:
"""
处理单个数据库文件
返回: (数据库路径, 处理的表数, 替换的记录数)
"""
db_path_str = str(db_path)
self.log(f"[处理] 开始处理数据库: {db_path}")
# 检查写权限
if not self._check_writable(db_path):
self.log(f"[错误] 数据库文件或目录没有写权限: {db_path}")
self.log(f"[提示] 请检查文件权限: chmod 644 {db_path}")
self.log(f"[提示] 或检查目录权限: chmod 755 {db_path.parent}")
return db_path_str, 0, 0
try:
conn = sqlite3.connect(db_path_str)
cursor = conn.cursor()
# 获取所有表名
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
tables = cursor.fetchall()
total_tables = len(tables)
total_replacements = 0
for table_tuple in tables:
table_name = table_tuple[0]
replacements = self._process_table(cursor, table_name)
total_replacements += replacements
conn.close()
self.log(f"[完成] 数据库 {db_path.name}: 处理 {total_tables} 个表, 替换 {total_replacements} 条记录")
return (db_path_str, total_tables, total_replacements)
except sqlite3.Error as e:
self.log(f"[错误] 处理数据库 {db_path} 时出错: {e}")
return (db_path_str, 0, 0)
except Exception as e:
self.log(f"[错误] 处理数据库 {db_path} 时发生未知错误: {e}")
return (db_path_str, 0, 0)
def _process_table(self, cursor, table_name: str) -> int:
"""
处理单个表
返回替换的记录数
"""
try:
# 获取表的列信息
cursor.execute(f"PRAGMA table_info(`{table_name}`)")
columns = cursor.fetchall()
if not columns:
return 0
# 获取主键列名(用于更新)
primary_keys = [col[1] for col in columns if col[5] == 1] # col[5] 是 pk 标志
rowid_col = 'rowid' if not primary_keys else primary_keys[0]
# 所有文本类型的列
text_columns = []
for col in columns:
col_name = col[1]
col_type = col[2].upper() if col[2] else ''
# 检查是否为文本类型
if any(t in col_type for t in ['TEXT', 'CHAR', 'CLOB', 'VARCHAR', 'NVARCHAR']):
text_columns.append(col_name)
if not text_columns:
return 0
# 查询所有数据
cursor.execute(f"SELECT {rowid_col}, * FROM `{table_name}`")
rows = cursor.fetchall()
replacement_count = 0
for row in rows:
row_id = row[0]
row_data = row[1:] # 跳过 rowid
# 检查每一列是否需要替换
for i, col_value in enumerate(row_data):
if col_value is None:
continue
col_str = str(col_value)
if self.search_str in col_str:
# 执行替换
new_value = col_str.replace(self.search_str, self.replace_str)
col_name = columns[i][1]
try:
cursor.execute(
f"UPDATE `{table_name}` SET `{col_name}` = ? WHERE {rowid_col} = ?",
(new_value, row_id)
)
replacement_count += 1
self.log(f" [替换] 表 `{table_name}`.列 `{col_name}` ID={row_id}: '{col_str[:50]}...' -> '{new_value[:50]}...'")
except sqlite3.Error as e:
self.log(f" [错误] 更新失败: {e}")
# 提交当前表的更改
cursor.connection.commit()
return replacement_count
except sqlite3.Error as e:
self.log(f"[错误] 处理表 {table_name} 时出错: {e}")
return 0
def process_databases_concurrently(
target_dir: str,
search_str: str,
replace_str: str,
max_workers: int = 4,
verbose: bool = True
) -> None:
"""并发处理多个数据库文件"""
processor = DatabaseProcessor(search_str, replace_str, verbose)
# 查找所有数据库文件
db_files = processor.find_db_files(target_dir)
if not db_files:
print("[信息] 未找到任何 SQLite 数据库文件")
return
print(f"\n[信息] 开始处理 {len(db_files)} 个数据库文件 (并发数: {max_workers})\n")
# 使用线程池并发处理
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(processor.process_database, db_path): db_path for db_path in db_files}
results = []
for future in as_completed(futures):
db_path = futures[future]
try:
result = future.result()
results.append(result)
except Exception as e:
print(f"[错误] 处理 {db_path} 时发生异常: {e}")
# 输出汇总
print("\n" + "=" * 60)
print("处理汇总:")
print("=" * 60)
total_tables = 0
total_replacements = 0
for db_path, table_count, replacement_count in results:
print(f" {Path(db_path).name}: {table_count} 个表, {replacement_count} 次替换")
total_tables += table_count
total_replacements += replacement_count
print("-" * 60)
print(f"总计: {len(results)} 个数据库, {total_tables} 个表, {total_replacements} 次替换")
print("=" * 60)
def main():
parser = argparse.ArgumentParser(
description='SQLite 数据库字符串查找替换工具',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例:
python process_db.py /path/to/dbs "old_string" "new_string"
python process_db.py ./data "localhost" "127.0.0.1" --workers 8
python process_db.py /data/db "test" "production" -q
"""
)
parser.add_argument('target_dir', help='目标目录路径')
parser.add_argument('search_str', help='要查找的字符串')
parser.add_argument('replace_str', help='用于替换的字符串')
parser.add_argument(
'--workers', '-w',
type=int,
default=4,
help='并发处理的数据库数量 (默认: 4)'
)
parser.add_argument(
'--quiet', '-q',
action='store_true',
help='静默模式,只输出汇总信息'
)
args = parser.parse_args()
# 验证参数
if not args.search_str:
print("[错误] 查找字符串不能为空")
sys.exit(1)
if args.search_str == args.replace_str:
print("[警告] 查找字符串和替换字符串相同,不会做任何更改")
# 执行处理
process_databases_concurrently(
target_dir=args.target_dir,
search_str=args.search_str,
replace_str=args.replace_str,
max_workers=args.workers,
verbose=not args.quiet
)
if __name__ == '__main__':
main()