收起左侧

分享Python代码mp4快速无损转mkv

2
回复
266
查看
[ 复制链接 ]

1

主题

1

回帖

0

牛值

江湖小虾

2025-3-26 01:57:16 显示全部楼层 阅读模式

[i=s] 本帖最后由 堂吉先生 于 2025-3-26 20:14 编辑 [/i]<br /> <br />

import tkinter as tk
from tkinter import filedialog, ttk, scrolledtext
import os
import concurrent.futures
import threading
import time
import subprocess
import logging
from datetime import datetime
import multiprocessing

class VideoConverterApp:
    def __init__(self, master):
        self.master = master
        master.title("MP4转MKV工具-堂吉先生")
        master.geometry("1024x520+400+200")

        # 初始化日志系统
        self.setup_logging()

        # 界面变量
        self.dir_path = tk.StringVar()
        self.status = tk.StringVar(value="准备就绪")
        self.thread_num = tk.IntVar(value=self.get_optimal_thread_count())
        self.delete_original = tk.BooleanVar(value=True)
        self.stop_requested = False

        # 转换控制参数
        self.total_files = 0
        self.completed_files = 0
        self.total_size_bytes = 0
        self.processed_size_bytes = 0
        self.executor = None
        self.futures = []
        self.start_time = 0
        self.monitor_thread = None
        self.active_count = 0

        # 初始化UI组件
        self.create_widgets()
        self.setup_log_text()

    def get_optimal_thread_count(self):
        """获取最佳线程数"""
        cpu_count = multiprocessing.cpu_count()
        return min(cpu_count + 1, 16)

    def setup_logging(self):
        """配置日志系统"""
        log_dir = os.path.join(os.path.expanduser("~"), "VideoConverterLogs")
        os.makedirs(log_dir, exist_ok=True)
        log_file = os.path.join(log_dir, f"conversion_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")

        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s %(message)s",
            datefmt="%H:%M:%S",
            handlers=[
                logging.FileHandler(log_file),
                logging.StreamHandler()
            ]
        )
        logging.info("应用程序启动")

    def create_widgets(self):
        """创建界面组件"""
        main_frame = ttk.Frame(self.master, padding=20)
        main_frame.pack(fill=tk.BOTH, expand=True)

        # 目录选择部分
        dir_frame = ttk.Frame(main_frame)
        dir_frame.pack(fill=tk.X, pady=5)
        ttk.Label(dir_frame, text="目标目录:").pack(side=tk.LEFT, padx=5)
        ttk.Entry(dir_frame, textvariable=self.dir_path, width=50).pack(side=tk.LEFT, expand=True, fill=tk.X, padx=5)
        ttk.Button(dir_frame, text="浏览", command=self.choose_dir).pack(side=tk.LEFT, padx=5)

        # 设置部分
        settings_frame = ttk.Frame(main_frame)
        settings_frame.pack(fill=tk.X, pady=5)

        optimal_threads = self.get_optimal_thread_count()
        ttk.Label(settings_frame, text=f"线程数 (1-{optimal_threads}):").pack(side=tk.LEFT, padx=5)
        thread_combobox = ttk.Combobox(settings_frame, textvariable=self.thread_num, width=5)
        thread_combobox['values'] = list(range(1, optimal_threads + 1))
        thread_combobox.pack(side=tk.LEFT)
        thread_combobox.set(8)
        ttk.Checkbutton(settings_frame, text="转换后删除MP4文件", variable=self.delete_original).pack(side=tk.LEFT, padx=15)

        # 控制按钮
        control_frame = ttk.Frame(main_frame)
        control_frame.pack(pady=10)
        self.start_btn = ttk.Button(control_frame, text="开始转换", command=self.start_conversion)
        self.start_btn.pack(side=tk.LEFT, padx=5)
        self.stop_btn = ttk.Button(control_frame, text="停止转换", command=self.stop_conversion, state=tk.DISABLED)
        self.stop_btn.pack(side=tk.LEFT, padx=5)

        # 进度条
        self.progress = ttk.Progressbar(main_frame, length=300, mode='determinate')
        self.progress.pack(pady=10)

        # 状态显示
        ttk.Label(main_frame, textvariable=self.status).pack()

    def setup_log_text(self):
        """创建日志文本框"""
        log_frame = ttk.Frame(self.master)
        log_frame.pack(fill=tk.BOTH, expand=True, padx=20, pady=10)

        self.log_text = scrolledtext.ScrolledText(log_frame, wrap=tk.WORD, height=100)
        self.log_text.pack(fill=tk.BOTH, expand=True)

        class TextHandler(logging.Handler):
            def __init__(self, text_widget):
                super().__init__()
                self.text_widget = text_widget

            def emit(self, record):
                msg = self.format(record)
                self.text_widget.configure(state='normal')
                self.text_widget.insert(tk.END, msg + '\n')
                self.text_widget.configure(state='disabled')
                self.text_widget.yview(tk.END)

        text_handler = TextHandler(self.log_text)
        text_handler.setFormatter(logging.Formatter("%(asctime)s %(message)s", datefmt="%H:%M:%S"))
        logging.getLogger().addHandler(text_handler)

    def choose_dir(self):
        """选择目标目录"""
        directory = filedialog.askdirectory()
        if directory:
            self.dir_path.set(directory)
            logging.info(f"选择目录: {directory}")

    def start_conversion(self):
        """启动转换流程"""
        self.start_btn.config(state=tk.DISABLED)
        self.stop_btn.config(state=tk.NORMAL)
        self.stop_requested = False

        target_dir = self.dir_path.get()

        if not target_dir or not os.path.exists(target_dir):
            self.status.set("错误: 请选择有效目录")
            self.start_btn.config(state=tk.NORMAL)
            return

        mp4_files = self.find_mp4_files(target_dir)
        if not mp4_files:
            self.status.set("未找到MP4文件")
            self.start_btn.config(state=tk.NORMAL)
            return

        self.total_size_bytes = sum(os.path.getsize(f) for f in mp4_files)
        total_size_gb = self.total_size_bytes / (1024**3)
        self.status.set(f"开始转换 {len(mp4_files)} 个文件, 总大小: {total_size_gb:.2f} GB")

        logging.info(f"开始转换 {len(mp4_files)} 个文件, 总大小: {total_size_gb:.2f} GB")
        self.total_files = len(mp4_files)
        self.completed_files = 0
        self.progress['value'] = 0
        self.progress['maximum'] = self.total_files

        threading.Thread(target=self.process_files, args=(mp4_files,), daemon=True).start()
        # 启动监控线程
        self.monitor_thread = threading.Thread(target=self.monitor_thread_pool, daemon=True)
        self.monitor_thread.start()

    def stop_conversion(self):
        """停止转换流程"""
        self.stop_requested = True
        if self.executor:
            # 取消所有任务并关闭线程池
            for future in self.futures:
                future.cancel()
            self.executor.shutdown(wait=False)

        self.status.set("正在停止转换...")
        logging.info("用户请求停止转换")

    def monitor_thread_pool(self):
        """监控线程池活动线程数量"""
        while True:
            self.active_count = 0
            if self.executor:
                # 通过线程名称识别线程池线程
                self.active_count = sum(
                    1 for t in threading.enumerate()
                    if t.name.startswith('ConverterThread')
                )
            if self.stop_requested:
                self.status.set(f"正在停止转换...活动线程: {self.active_count}")

            if self.active_count == 0 and self.stop_requested:
                self.status.set("转换完全中止")
                logging.info(f"转换已中止, 已处理大小: {self.processed_size_bytes/(1024**3):.2f}GB")
                break

            time.sleep(0.3)  # 降低CPU占用

    def find_mp4_files(self, directory):
        """递归查找MP4文件"""
        mp4_files = []
        for root, dirs, files in os.walk(directory):
            if '云盘缓存' in dirs:
                dirs.remove('云盘缓存')
            if '云盘缓存文件' in dirs:
                dirs.remove('云盘缓存文件')

            for f in files:
                if f.lower().endswith('.mp4') or f.lower().endswith('.flv'):
                    mp4_files.append(os.path.join(root, f))
        return mp4_files

    def process_files(self, file_list):
        """多线程处理文件转换"""
        self.start_time = time.time()
        thread_num = max(1, min(self.get_optimal_thread_count(), self.thread_num.get()))

        try:
            # 创建带名称前缀的线程池
            self.executor = concurrent.futures.ThreadPoolExecutor(
                max_workers=thread_num,
                thread_name_prefix='ConverterThread'
            )
            self.futures = [self.executor.submit(self.convert_file, file) for file in file_list]

            for future in concurrent.futures.as_completed(self.futures):
                if self.stop_requested:
                    break
                self.update_progress()

        except Exception as e:
            logging.error(f"处理过程中发生错误: {str(e)}")
        finally:
            if self.executor:
                self.executor.shutdown(wait=False)
            total_time = time.time() - self.start_time

            processed_size_mb = self.processed_size_bytes / (1024**2)
            speed = processed_size_mb / total_time if total_time > 0 else 0

            self.start_btn.config(state=tk.NORMAL)
            self.stop_btn.config(state=tk.DISABLED)

            if not self.stop_requested:
                status_msg = (f"转换完成! 耗时: {total_time:.0f}秒 | "
                              f"大小: {processed_size_mb/1024:.2f}GB | "
                              f"速度: {speed:.2f}MB/s | "
                              f"线程: {self.active_count}")
                self.status.set(status_msg)
                logging.info(f"转换完成! 总耗时: {total_time:.0f}秒, 处理大小: {processed_size_mb/1024:.2f}GB, 平均速度: {speed:.2f}MB/s")

    def formatted_time(self,elapsed_time):
        hours, remaining = divmod(elapsed_time, 3600)
        minutes, seconds = divmod(remaining, 60)
        formatted_time = f"{int(hours):02d}时:{int(minutes):02d}分:{seconds:.0f}秒"
        return formatted_time
    def update_progress(self):
        """更新进度显示"""
        if self.stop_requested:
            return

        self.completed_files += 1
        progress_value = (self.completed_files / self.total_files) * 100

        processed_size_mb = self.processed_size_bytes / (1024**2)
        total_size_mb = self.total_size_bytes / (1024**2)

        elapsed_time = time.time() - self.start_time

        speed = processed_size_mb / elapsed_time if elapsed_time > 0 else 0

        status_msg = (f"完成 {self.completed_files}/{self.total_files} ({progress_value:.1f}%) | "
                     f"大小: {processed_size_mb/1024:.1f}/{total_size_mb/1024:.1f}GB | "
                     f"耗时: {self.formatted_time(elapsed_time)} | "
                     f"预计: {self.formatted_time((self.total_size_bytes-self.processed_size_bytes)*elapsed_time/self.processed_size_bytes)} | "
                     f"速度: {speed:.2f}MB/s | "
                     f"线程: {self.active_count}")

        self.progress['value'] = self.completed_files
        self.status.set(status_msg)

    def convert_file(self, input_path):
        """执行FFmpeg转换"""
        if self.stop_requested:
            return

        try:
            base, _ = os.path.splitext(input_path)
            output_path = f"{base}.mkv"

            cmd = [
                'ffmpeg', '-i', input_path,
                '-c:v', 'copy', '-c:a', 'copy',
                '-y', output_path
            ]

            subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            self.processed_size_bytes += os.path.getsize(input_path)
            logging.info(f"转换成功: {input_path} (大小: {os.path.getsize(input_path)/(1024**2):.2f}MB)")

            if self.delete_original.get():
                os.remove(input_path)

        except Exception as e:
            if not self.stop_requested:
                logging.error(f"处理文件失败: {input_path} - {str(e)}")

if __name__ == "__main__":
    root = tk.Tk()
    app = VideoConverterApp(root)
    root.mainloop()
收藏
送赞
分享

1

主题

1

回帖

0

牛值

江湖小虾

2025-3-26 11:44:13 楼主 显示全部楼层

[i=s] 本帖最后由 堂吉先生 于 2025-3-26 12:29 编辑 [/i]<br /> <br />

python 文件转exe教程 pip install pyinstaller pyinstaller --onefile --windowed 无损转换mkv.py 这里 --onefile 表示将所有内容打包到一个单独的可执行文件中,--windowed 选项用于隐藏控制台窗口(如果你的脚本不依赖于命令行输入)。 dist 文件夹下生成exe文件,build文件夹可以删除

微信图片编辑_20250326113409.jpg

0

主题

10

回帖

0

牛值

江湖小虾

大佬,FN中怎么运行PY程序呀,我想搭建泰拉瑞亚服务器,只有IPV6,在网上找到了“将IPv6的指定端口流量转发到内网IPv4的指定端口”的PY脚本,但是不知道怎么在NAS中运行。cry

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则