收起左侧

小蚁摄像机转存到EasyNVR

1
回复
145
查看
[ 复制链接 ]

1

主题

9

回帖

0

牛值

江湖小虾

一、小蚁摄像机使用过程中的困惑

小蚁摄像机作为小米生态链起步之作,确实稳定,我2015年买的小蚁摄像机,到今天仍然能用。然后配合我多年前买的小米路由硬盘版R2D,能完美保存很长时间的监控录像。小蚁摄像机的回放只能回放SD卡内的视频,无法查看转存到NAS中的视频,目前老家装了3个小米摄像机,一旦想回看超出SD卡中日期的视频,就很痛苦,小蚁的存储目录结构为XXXX年XX月XX日/XX时/XX分XX秒.mp4,每1分钟一个视频。想象一下,无论是用电脑直接看,还是加载到飞牛影视看,都是极其痛苦的,很难找。

二、解决方案

我看到飞牛的应用商店里上架了一款叫EasyNVR的应用,经过了解发现它竟然有App,并且是一款视频录像机的软件,播放回放有时间轴,就非常Nice了。

所以我就锁定了EasyNVR,想在这上面做一些文章,我探索了2条不同的线路:

1、破解小蚁摄像机使用rtsp流直接存到EasyNVR中,github的项目名为,alienatedsec/yi-hack-v5,有兴趣的,可以自行研究。

2、修改小蚁摄像机视频名称,让其符合EasyNVR的文件存储路径的命名规则。

先说结论,方案2是权衡方案1之后的结果。如果rtsp流直接录像,就必须24小时录像,因为EasyNVR是没有检测画面的功能的,非常浪费存储空间,我只有1块1T的硬盘,我还想在硬盘上存一些我远程备份的文件的,可不想白白浪费这些存储空间。所以直接硬链接转存一下,然后让EasyNVR直接播放回放,因为我并不想看实时画面,米家里面统一管理实时画面挺好的,我只是需要看回放方便一些。

三、转存

首先,我用OBS做了一个虚拟源,用EasyNVR录像,然后获得了EasyNVR的文件存储路径的命名规则:设备ID/通道ID/日期/完整的时间-视频毫秒级时长.mp4(例如:PMAhYiCJ3fpxu/01/20251204/20251204073300-49664.mp4)。

问题来了,那么多视频,总不能手动一个个去改文件名吧,得用程序,但是我不会写程序啊,咋整?我们身在一个AI应用的时代,其实上面的思路并不是我独立想出来的,也是询问豆包之后,给出的答案。同理,我只需要描述好我的需求,豆包可以帮我们写出合适的程序脚本。

基本上我就是描述一下两处文件目录的不同之出,然后提出一些要求,让豆包不断地改善,提高运行效率,最后加入并行处理的功能,基本就收工了。整个过程,程序基本不出bug,至于如何执行脚本,如何每天自动化执行脚本,我就不再赘述,各位自行问豆包,按着操作就行了。EasyNVR的虚拟设备,录像回放的设置等等问题,也不再赘述,自行问豆包即可,如果上面的程序脚本你都搞定了,这么点儿设置问题,自然难不倒你了。

四、附件

我不在老家,EasyNVR手机端的登不上去,以后再补图吧,反正也是有时间轴的,看着很舒服

image.png

image.png

image.png

image.png

image.png

五、python3****脚本

Python的脚本具备多进程、硬链接、日志、记录已扫描过的文件等功能,调试了2天完成的,个人感觉还不错。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import re
import json
import shutil
import subprocess
from datetime import datetime, timedelta
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor, as_completed
from tqdm import tqdm

# ======= 配置 =======
INPUT_ROOT = "/vol02/1000-0-ef06fa92"
OUTPUT_ROOT = "/vol02/1000-0-ef06fa92/easynvr_rec"
TRACKER_FILE = os.path.join(OUTPUT_ROOT, "processed_files.json")  # 记录已处理文件

CAMERA_MAP = {
    "小蚁智能摄像机_B0D59D26BB07": "PMAhYiCJ3fpxu/南门",
    "小蚁智能摄像机_E8ABFA459D14": "PMAhYiCJ3fpxu/北门",
    "小蚁智能摄像机_B0D59D21EEA9": "PMAhYiCJ3fpxu/鱼池"
}

CLEANUP_DAYS = 90
MAX_WORKERS = 4       # CPU核心数
BATCH_SIZE = 200      # 每批处理的文件数量
# ===================

# 预编译正则
DATE_FOLDER_PATTERN = re.compile(r'(\d{4})年(\d{1,2})月(\d{1,2})日')
HOUR_FOLDER_PATTERN = re.compile(r'(\d{1,2})时')
MINSEC_FILE_PATTERN = re.compile(r'(\d{1,2})分(\d{1,2})秒')


class Logger:
    """日志处理类"""
    def __init__(self, log_dir, log_file_name):
        self.log_dir = log_dir
        self.log_file_path = os.path.join(log_dir, log_file_name)
        self.log_buffer = []
        os.makedirs(log_dir, exist_ok=True)

    def _format_message(self, message):
        """生成带时间戳的日志行"""
        ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        return f"[{ts}] {message}"

    def _write_to_file(self, line):
        """写入日志文件"""
        self.log_buffer.append(line)
        if len(self.log_buffer) >= 100:
            self.flush()

    def flush(self):
        """将缓冲区写入文件并清空"""
        if self.log_buffer:
            with open(self.log_file_path, "a", encoding="utf-8") as f:
                f.write("\n".join(self.log_buffer) + "\n")
            self.log_buffer.clear()

    def log(self, message):
        """终端和日志文件都输出"""
        line = self._format_message(message)
        print(line)
        self._write_to_file(line)

    def log_only(self, message):
        """只写入日志文件,不输出到终端"""
        line = self._format_message(message)
        self._write_to_file(line)


# 初始化日志类
logger = Logger(
    log_dir=os.path.join(OUTPUT_ROOT, "log"),
    log_file_name=f"sync_{datetime.now().strftime('%Y%m%d')}.log"
)


class ProcessedFileTracker:
    """使用 JSON 文件记录已处理的文件"""
    def __init__(self, tracker_file):
        self.tracker_file = tracker_file
        self.data = self._load()

    def _load(self):
        """从 JSON 文件加载记录"""
        if os.path.exists(self.tracker_file):
            try:
                with open(self.tracker_file, "r", encoding="utf-8") as f:
                    return json.load(f)
            except Exception as e:
                logger.log(f"加载记录文件失败: {e}")
        return {"processed_files": {}}

    def is_processed(self, input_path):
        """检查文件是否已处理"""
        return input_path in self.data["processed_files"]

    def mark_processed(self, input_path, timestamp, output_path):
        """标记文件为已处理"""
        self.data["processed_files"][input_path] = {
            "timestamp": timestamp,
            "output_path": output_path,
            "processed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }

    def save(self):
        """保存记录到 JSON 文件"""
        try:
            with open(self.tracker_file, "w", encoding="utf-8") as f:
                json.dump(self.data, f, ensure_ascii=False, indent=2)
        except Exception as e:
            logger.log(f"保存记录文件失败: {e}")


def get_duration_ms(file_path):
    """调用 ffprobe 获取视频时长(毫秒)"""
    try:
        cmd = [
            "ffprobe", "-v", "error",
            "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1",
            file_path
        ]
        result = subprocess.run(cmd, stdout=subprocess.PIPE, text=True, timeout=15)
        duration_sec = float(result.stdout.strip())
        return int(duration_sec * 1000)
    except Exception:
        return None


def parse_time_from_path(file_path):
    """从路径解析出时间戳和日期文件夹"""
    date_match = DATE_FOLDER_PATTERN.search(file_path)
    hour_match = HOUR_FOLDER_PATTERN.search(file_path)
    minsec_match = MINSEC_FILE_PATTERN.search(file_path)

    if not (date_match and hour_match and minsec_match):
        return None, None, f"跳过非标准路径: {file_path}"

    year = date_match.group(1)
    month = date_match.group(2).zfill(2)
    day = date_match.group(3).zfill(2)
    hour = hour_match.group(1).zfill(2)
    minute = minsec_match.group(1).zfill(2)
    second = minsec_match.group(2).zfill(2)

    timestamp = f"{year}{month}{day}{hour}{minute}{second}"
    date_folder = f"{year}{month}{day}"

    return timestamp, date_folder, None


def process_single_file(args):
    """多进程单个文件处理 - 硬链接"""
    file_info, root_output = args
    input_path = file_info["path"]
    device_id = file_info["device_id"]
    timestamp = file_info["timestamp"]
    date_folder = file_info["date_folder"]

    # 获取时长
    duration_ms = get_duration_ms(input_path)
    if duration_ms is None:
        return None, f"获取时长失败,跳过: {input_path}"

    # 输出路径
    output_dir = os.path.join(root_output, device_id, date_folder)
    os.makedirs(output_dir, exist_ok=True)
    output_fullpath = os.path.join(output_dir, f"{timestamp}-{duration_ms}.mp4")

    if os.path.exists(output_fullpath):
        logger.log_only(f"硬链接已存在,已记录: {output_fullpath}")
        return (input_path, timestamp, output_fullpath), None

    try:
        os.link(input_path, output_fullpath)
        logger.log_only(f"硬链接完成: {output_fullpath}")
        return (input_path, timestamp, output_fullpath), None
    except Exception as e:
        err_msg = f"硬链接失败 {input_path} -> {output_fullpath}: {e}"
        logger.log(err_msg)  # 错误信息终端和日志都显示
        return None, err_msg


def collect_mp4_files(input_root, tracker):
    """收集待处理的 MP4 文件(跳过已处理的)"""
    mp4_file_map = defaultdict(list)
    today_str = datetime.now().strftime("%Y年%m月%d日")

    for camera_name, device_id in CAMERA_MAP.items():
        input_dir = os.path.join(input_root, camera_name)
        if not os.path.isdir(input_dir):
            logger.log(f"摄像头目录不存在: {input_dir}")
            continue

        for folder in os.listdir(input_dir):
            folder_path = os.path.join(input_dir, folder)
            if not os.path.isdir(folder_path) or not DATE_FOLDER_PATTERN.match(folder) or folder >= today_str:
                continue

            for root, _, files in os.walk(folder_path):
                for file in files:
                    if file.lower().endswith(".mp4"):
                        full_path = os.path.join(root, file)
                        if tracker.is_processed(full_path):
                            continue  # 已处理,跳过

                        timestamp, date_folder, err_msg = parse_time_from_path(full_path)
                        if err_msg:
                            continue

                        mp4_file_map[camera_name].append({
                            "path": full_path,
                            "device_id": device_id,
                            "timestamp": timestamp,
                            "date_folder": date_folder
                        })
    return mp4_file_map


def batch_process_files_parallel(mp4_file_map, root_output, tracker):
    """分批次多进程处理"""
    tasks = []
    for camera_name, file_list in mp4_file_map.items():
        for file_info in file_list:
            tasks.append((file_info, root_output))

    total = len(tasks)
    logger.log(f"开始分批次处理 {total} 个文件,每批 {BATCH_SIZE} 个,进程数 {MAX_WORKERS}")

    pbar_total = tqdm(total=total, desc="总进度", unit="文件", unit_scale=True, dynamic_ncols=True, ascii=True)

    for i in range(0, total, BATCH_SIZE):
        batch = tasks[i:i+BATCH_SIZE]
        logger.log(f"处理批次 {i//BATCH_SIZE + 1}/{(total+BATCH_SIZE-1)//BATCH_SIZE},文件数 {len(batch)}")

        with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
            future_to_task = {executor.submit(process_single_file, t): t for t in batch}

            pbar_batch = tqdm(total=len(batch), desc=f"批次 {i//BATCH_SIZE + 1}", unit="文件",
                              unit_scale=True, dynamic_ncols=True, ascii=True, leave=False)

            for future in as_completed(future_to_task):
                result, msg = future.result()
                if msg:
                    logger.log(msg)  # 只有错误信息才会终端显示
                if result:
                    input_path, timestamp, output_fullpath = result
                    tracker.mark_processed(input_path, timestamp, output_fullpath)
                pbar_batch.update(1)
                pbar_total.update(1)

            pbar_batch.close()

    pbar_total.close()
    tracker.save()  # 保存已处理记录


def _cleanup_old_folders(path, cutoff_date, folder_type):
    """清理过期文件夹"""
    deleted_count = 0
    if not os.path.isdir(path):
        logger.log(f"路径不存在,跳过: {path}")
        return deleted_count

    for folder_name in os.listdir(path):
        folder_path = os.path.join(path, folder_name)
        if not os.path.isdir(folder_path):
            continue

        try:
            if folder_type == "output":
                folder_date = datetime.strptime(folder_name, "%Y%m%d").date()
            else:
                folder_date = datetime.strptime(folder_name, "%Y年%m月%d日").date()
        except ValueError:
            logger.log(f"跳过非日期文件夹: {folder_path}")
            continue

        if folder_date < cutoff_date.date():
            try:
                shutil.rmtree(folder_path)
                deleted_count += 1
                logger.log(f"已删除过期文件夹: {folder_path}")
            except Exception as e:
                logger.log(f"删除文件夹失败 {folder_path}: {e}")

    return deleted_count


def cleanup_old_videos(input_root, output_root, cutoff_date):
    """清理每个设备的 INPUT 和 OUTPUT 过期文件夹"""
    logger.log(f"\n开始清理 {CLEANUP_DAYS} 天前的视频文件夹,截止日期: {cutoff_date.strftime('%Y-%m-%d')}")

    for camera_name, device_id in CAMERA_MAP.items():
        # INPUT 设备路径
        input_device_path = os.path.join(input_root, camera_name)
        logger.log(f"\n[INPUT] 开始清理: {input_device_path}")
        deleted_input = _cleanup_old_folders(input_device_path, cutoff_date, folder_type="input")

        # OUTPUT 设备路径
        output_device_path = os.path.join(output_root, device_id)
        logger.log(f"\n[OUTPUT] 开始清理: {output_device_path}")
        deleted_output = _cleanup_old_folders(output_device_path, cutoff_date, folder_type="output")

        logger.log(f"[{camera_name}] INPUT 删除 {deleted_input} 个,OUTPUT 删除 {deleted_output} 个")

    logger.log(f"\n全部设备清理完成")


if __name__ == "__main__":
    try:
        logger.log("===== 开始同步任务 =====")
        tracker = ProcessedFileTracker(TRACKER_FILE)
        mp4_files = collect_mp4_files(INPUT_ROOT, tracker)
        total = sum(len(v) for v in mp4_files.values())
        logger.log(f"共发现 {total} 个待处理MP4文件")

        batch_process_files_parallel(mp4_files, OUTPUT_ROOT, tracker)

        cutoff_date = datetime.now() - timedelta(days=CLEANUP_DAYS)
        cleanup_old_videos(INPUT_ROOT, OUTPUT_ROOT, cutoff_date)

        logger.log("===== 同步任务结束 =====")
    except Exception as e:
        logger.log(f"任务异常终止: {e}")
    finally:
        logger.flush()
收藏
送赞 1
分享

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

7

主题

73

回帖

0

牛值

初出茅庐

网络摄像机onvif基本的,用这个onvif或者rtsp都可以。

看看设置是不是报警录像,设置定时录像,一般有设置5分钟一段,或者十分钟,或者30分钟。

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则