收起左侧

迁移相册应用到其他飞牛

1
回复
27
查看
[ 复制链接 ]

2

主题

1

回帖

0

牛值

江湖小虾

一、迁移元数据

  1. 目标机器安装相册应用
  2. 拷贝照片目录, 注意:照片目录最好和源机器一模一样, 包括存储空间, 不然要改路径
  3. ssh登陆源机器
  4. 进入 /var/apps, 用 sudo tar zcvfh trim.photos.app.tar trim.photos 打包 trim.photos 目录(注意:一定要加 h 选项, 打包软链接目录)
  5. 将打包后的tar拷贝到目标机器
  6. 解压, 解压之后目录应该是
    图片.png
  7. 目标机器进入 /var/apps/trim.photos, 现在的目录结构是
    图片.png
  8. 将解压后的目录, 依次拷贝到软链接的目标目录, 例如 etc, 拷贝到 /usr/local/apps/@appconf/trim.photos 用命令 sudo cp -rfv etc/. /usr/local/apps/@appconf/trim.photos/
  9. etc home meta target tmp var 都处理完之后相册应用应该能看到元数据了

二、修复路径

进入 /var/apps/trim.photos 执行

sudo python3 process_db.py . "/vol2/" "/vol1/"

其中, /vol2/ 是源路径中需要替换的部分, /vol1/是新的路径, 如果路径其他部分也变了, 应该先从最长路径开始替换, 多次执行py脚本即可.

process_db.py内容如下:

#!/usr/bin/env python3
"""
SQLite 数据库字符串查找替换工具
递归处理指定目录下的所有 .db 文件
"""

import argparse
import os
import sqlite3
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import List, Tuple
import threading


class DatabaseProcessor:
    def __init__(self, search_str: str, replace_str: str, verbose: bool = True):
        self.search_str = search_str
        self.replace_str = replace_str
        self.verbose = verbose
        self.lock = threading.Lock()

    def log(self, message: str):
        """线程安全的日志输出"""
        if self.verbose:
            with self.lock:
                print(message)

    def find_db_files(self, target_dir: str) -> List[Path]:
        """
        递归查找目标目录下所有的 .db 文件
        包括软链接指向的目录
        """
        db_files = []
        target_path = Path(target_dir).resolve()

        if not target_path.exists():
            self.log(f"[错误] 目标目录不存在: {target_dir}")
            return db_files

        self.log(f"[信息] 开始扫描目录: {target_path}")

        # 使用 os.walk 支持 followlinks 参数来遍历软链接
        for root, dirs, files in os.walk(target_path, followlinks=True):
            root_path = Path(root)
            for filename in files:
                if filename.endswith('.db'):
                    db_file = root_path / filename
                    # 检查是否是真正的SQLite数据库文件
                    if self._is_sqlite_db(db_file):
                        db_files.append(db_file)
                        self.log(f"[发现] SQLite数据库: {db_file}")

        self.log(f"[信息] 共发现 {len(db_files)} 个 SQLite 数据库文件")
        return db_files

    @staticmethod
    def _is_sqlite_db(file_path: Path) -> bool:
        """检查文件是否为SQLite数据库"""
        try:
            if not file_path.is_file():
                return False
            # 读取文件头检查SQLite魔数
            with open(file_path, 'rb') as f:
                header = f.read(16)
                return header.startswith(b'SQLite format 3')
        except (Permissi*, OSError):
            return False

    @staticmethod
    def _check_writable(db_path: Path) -> bool:
        """检查数据库文件是否可写"""
        # 检查文件本身是否可写
        if not os.access(db_path, os.W_OK):
            return False
        # 检查所在目录是否可写(SQLite需要创建journal文件)
        if not os.access(db_path.parent, os.W_OK):
            return False
        return True

    def process_database(self, db_path: Path) -> Tuple[str, int, int]:
        """
        处理单个数据库文件
        返回: (数据库路径, 处理的表数, 替换的记录数)
        """
        db_path_str = str(db_path)
        self.log(f"[处理] 开始处理数据库: {db_path}")

        # 检查写权限
        if not self._check_writable(db_path):
            self.log(f"[错误] 数据库文件或目录没有写权限: {db_path}")
            self.log(f"[提示] 请检查文件权限: chmod 644 {db_path}")
            self.log(f"[提示] 或检查目录权限: chmod 755 {db_path.parent}")
            return db_path_str, 0, 0

        try:
            conn = sqlite3.connect(db_path_str)
            cursor = conn.cursor()

            # 获取所有表名
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
            tables = cursor.fetchall()

            total_tables = len(tables)
            total_replacements = 0

            for table_tuple in tables:
                table_name = table_tuple[0]
                replacements = self._process_table(cursor, table_name)
                total_replacements += replacements

            conn.close()

            self.log(f"[完成] 数据库 {db_path.name}: 处理 {total_tables} 个表, 替换 {total_replacements} 条记录")
            return (db_path_str, total_tables, total_replacements)

        except sqlite3.Error as e:
            self.log(f"[错误] 处理数据库 {db_path} 时出错: {e}")
            return (db_path_str, 0, 0)
        except Exception as e:
            self.log(f"[错误] 处理数据库 {db_path} 时发生未知错误: {e}")
            return (db_path_str, 0, 0)

    def _process_table(self, cursor, table_name: str) -> int:
        """
        处理单个表
        返回替换的记录数
        """
        try:
            # 获取表的列信息
            cursor.execute(f"PRAGMA table_info(`{table_name}`)")
            columns = cursor.fetchall()

            if not columns:
                return 0

            # 获取主键列名(用于更新)
            primary_keys = [col[1] for col in columns if col[5] == 1]  # col[5] 是 pk 标志
            rowid_col = 'rowid' if not primary_keys else primary_keys[0]

            # 所有文本类型的列
            text_columns = []
            for col in columns:
                col_name = col[1]
                col_type = col[2].upper() if col[2] else ''
                # 检查是否为文本类型
                if any(t in col_type for t in ['TEXT', 'CHAR', 'CLOB', 'VARCHAR', 'NVARCHAR']):
                    text_columns.append(col_name)

            if not text_columns:
                return 0

            # 查询所有数据
            cursor.execute(f"SELECT {rowid_col}, * FROM `{table_name}`")
            rows = cursor.fetchall()

            replacement_count = 0

            for row in rows:
                row_id = row[0]
                row_data = row[1:]  # 跳过 rowid

                # 检查每一列是否需要替换
                for i, col_value in enumerate(row_data):
                    if col_value is None:
                        continue

                    col_str = str(col_value)
                    if self.search_str in col_str:
                        # 执行替换
                        new_value = col_str.replace(self.search_str, self.replace_str)
                        col_name = columns[i][1]

                        try:
                            cursor.execute(
                                f"UPDATE `{table_name}` SET `{col_name}` = ? WHERE {rowid_col} = ?",
                                (new_value, row_id)
                            )
                            replacement_count += 1
                            self.log(f"  [替换] 表 `{table_name}`.列 `{col_name}` ID={row_id}: '{col_str[:50]}...' -> '{new_value[:50]}...'")
                        except sqlite3.Error as e:
                            self.log(f"  [错误] 更新失败: {e}")

            # 提交当前表的更改
            cursor.connection.commit()
            return replacement_count

        except sqlite3.Error as e:
            self.log(f"[错误] 处理表 {table_name} 时出错: {e}")
            return 0


def process_databases_concurrently(
    target_dir: str,
    search_str: str,
    replace_str: str,
    max_workers: int = 4,
    verbose: bool = True
) -> None:
    """并发处理多个数据库文件"""

    processor = DatabaseProcessor(search_str, replace_str, verbose)

    # 查找所有数据库文件
    db_files = processor.find_db_files(target_dir)

    if not db_files:
        print("[信息] 未找到任何 SQLite 数据库文件")
        return

    print(f"\n[信息] 开始处理 {len(db_files)} 个数据库文件 (并发数: {max_workers})\n")

    # 使用线程池并发处理
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(processor.process_database, db_path): db_path for db_path in db_files}

        results = []
        for future in as_completed(futures):
            db_path = futures[future]
            try:
                result = future.result()
                results.append(result)
            except Exception as e:
                print(f"[错误] 处理 {db_path} 时发生异常: {e}")

    # 输出汇总
    print("\n" + "=" * 60)
    print("处理汇总:")
    print("=" * 60)
    total_tables = 0
    total_replacements = 0

    for db_path, table_count, replacement_count in results:
        print(f"  {Path(db_path).name}: {table_count} 个表, {replacement_count} 次替换")
        total_tables += table_count
        total_replacements += replacement_count

    print("-" * 60)
    print(f"总计: {len(results)} 个数据库, {total_tables} 个表, {total_replacements} 次替换")
    print("=" * 60)


def main():
    parser = argparse.ArgumentParser(
        description='SQLite 数据库字符串查找替换工具',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
  python process_db.py /path/to/dbs "old_string" "new_string"
  python process_db.py ./data "localhost" "127.0.0.1" --workers 8
  python process_db.py /data/db "test" "production" -q
        """
    )

    parser.add_argument('target_dir', help='目标目录路径')
    parser.add_argument('search_str', help='要查找的字符串')
    parser.add_argument('replace_str', help='用于替换的字符串')
    parser.add_argument(
        '--workers', '-w',
        type=int,
        default=4,
        help='并发处理的数据库数量 (默认: 4)'
    )
    parser.add_argument(
        '--quiet', '-q',
        action='store_true',
        help='静默模式,只输出汇总信息'
    )

    args = parser.parse_args()

    # 验证参数
    if not args.search_str:
        print("[错误] 查找字符串不能为空")
        sys.exit(1)

    if args.search_str == args.replace_str:
        print("[警告] 查找字符串和替换字符串相同,不会做任何更改")

    # 执行处理
    process_databases_concurrently(
        target_dir=args.target_dir,
        search_str=args.search_str,
        replace_str=args.replace_str,
        max_workers=args.workers,
        verbose=not args.quiet
    )


if __name__ == '__main__':
    main()

收藏
送赞 1
分享

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

2

主题

1

回帖

0

牛值

江湖小虾

2 小时前 楼主 显示全部楼层
路径修复好像可以在相册应用里操作, 数据库拷贝之后, 在相册的"文件夹管理"里点"扫描" 会提示"发生异常错误,找不到该文件夹" 出现"重连文件夹"按钮, 点击可以选择新的目录.
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则