收起左侧

#萌新折腾笔记#青龙面板---QB下载器状态监控

2
回复
846
查看
[ 复制链接 ]

11

主题

96

回帖

195

牛值

共建版主

社区上线纪念勋章社区共建团荣誉勋章

2025-1-7 12:15:31 显示全部楼层 阅读模式

[i=s] 本帖最后由 EWEDL 于 2025-2-5 12:03 编辑 [/i]<br /> <br />

更新日志

2025/2/5:去除了初版报错种子的详情显示,新增下载器自定义名称、自动删除状态为“丢失文件”的任务、自动开始已下载完成的任务(辅种)、tracker异常检测以、定时执行(需要配合青龙面板定时策略)以及一些通知逻辑,避免每次执行都进行通知,本帖仅更新脚本内容,具体效果自测

2025/1/7:初版,登录下载器进行数据统计,并将报错种子列出。

前言

作为一个PT新手玩家,手头没什么大站,许是由于资源较差的问题,近期在玩转时发现下载器时不时出现种子报错的现象。故而写了一个python脚本,用于检测下载器的状态。分享给有需要的玩家。

脚本内容

import os
import requests
from urllib.parse import urlparse
import time
import datetime

# qBittorrent Web UI的配置信息列表,一个一行{'url': '网址', 'username': '用户名', 'password': '密码', 'name': '自定义名称'}
qb_config = [
    {'url': 'http://192.168.2.2:18090', 'username': 'cming', 'password': 'cmingcming', 'name': '刷流'},
    {'url': 'http://192.168.2.2:18091', 'username': 'cming', 'password': 'cmingcming', 'name': '保种'},
]

# 功能开关
SHOW_ERROR_DETAILS = False  # tracker状态详情显示
DELETE_MISSINGFILES = True  # 是否删除 "暂停-文件丢失" 类型的任务,如未删除的辅种
START_PAUSEDUP = True  # 是否开始 "暂停-下载完成" 类型的任务,如新添加的辅种

# 通知控制开关
ALWAYS_SEND_NOTIFICATION = False  # 是否每次执行都发送通知
SEND_DELETE_NOTIFICATION = True  # 是否开启删除任务通知
SEND_START_NOTIFICATION = True  # 是否开启开始任务通知
ENABLE_SCHEDULED_NOTIFICATION = True  # 是否开启定时通知
SCHEDULED_NOTIFICATION_TIMES = ["00:00", "12:00"]  # 定时通知时间点,格式为 "HH:MM",该功能需与青龙面板定时配合

# 企业微信机器人Webhook URL,需要在环境变量中添加“WEIXIN_WEBHOOK_URL”
wechat_webhook_url = os.getenv('WEIXIN_WEBHOOK_URL')
# qB状态字典(注意需要小写)
QB_STATUS_DICT = {
    'downloading': '正在下载(传输数据)',
    'stalleddl': '正在下载(未建立连接)',
    'uploading': '正在上传(传输数据)',
    'error': '暂停(发生错误)',
    'pauseddl': '暂停(下载未完成)',
    'pausedup': '暂停(下载完成)',
    'missingfiles': '暂停(文件丢失)',
    'checkingdl': '检查中(下载未完成)',
    'checkingup': '检查中(下载完成)',
    'checkingResumedata': '检查中(启动时恢复数据)',
    'forceddl': '强制下载(忽略队列)',
    'queueddl': '等待下载(排队)',
    'forcedup': '强制上传(忽略队列)',
    'queuedup': '等待上传(排队)',
    'allocating': '分配磁盘空间',
    'metadl': '获取元数据',
    'moving': '移动文件',
    'unknown': '未知状态'
}

def login_to_qb(session, config):
    """登录到qBittorrent"""
    try:
        response = session.post(f"{config['url']}/api/v2/auth/login", data=config, timeout=10)
        if response.status_code == 200 and response.text.strip() == 'Ok.':
            print(f"成功登录 {config['url']}")
            return True
        else:
            print(f"登录失败 {config['url']},状态码: {response.status_code}")
            return False
    except Exception as e:
        print(f"登录 {config['url']} 时出现异常: {e}")
        return False

def get_torrents_status(session, url):
    """获取所有任务的状态和属性(包含tracker信息)"""
    try:
        response = session.get(f'{url}/api/v2/torrents/info', timeout=10)
        if response.status_code == 200:
            torrents_info = response.json()
            torrents_with_trackers = []
            for torrent in torrents_info:
                hash_value = torrent.get('hash')
                trackers_response = session.get(f'{url}/api/v2/torrents/trackers', params={'hash': hash_value}, timeout=10)
                if trackers_response.status_code == 200:
                    torrent['trackers'] = trackers_response.json()
                    torrents_with_trackers.append(torrent)
                else:
                    print(f"获取种子 {torrent.get('name')} 的 tracker 信息失败,状态码:{trackers_response.status_code}")
            return torrents_with_trackers
        else:
            print(f"获取下载任务列表失败,状态码: {response.status_code}")
            return []
    except Exception as e:
        print(f"获取下载任务列表时出现异常: {e}")
        return []

def check_errors(torrents_info, session, config):
    """统计正常和错误的任务数量,并记录错误和tracker异常的详情"""
    error_count = 0
    error_details = []
    no_working_tracker_count = 0
    no_working_tracker_details = {}
    tracker_error_urls = set()
    status_counts = {}
    for status in QB_STATUS_DICT:
        status_counts[status] = 0
    hashes_to_delete = []
    hashes_to_start = []
    for t in torrents_info:
        name = t.get('name')
        hash_value = t.get('hash')
        state = t.get('state').lower()
        if state in QB_STATUS_DICT:
            status_counts[state] += 1
        if state in ['error', 'missingfiles']:
            error_count += 1
            if SHOW_ERROR_DETAILS:
                error_details.append({
                    'name': name,
                    'hash': hash_value,
                    'state': state,
                    'progress': t.get('progress'),
                    'error': t.get('error'),
                    'tracker': t.get('tracker')
                })
            if DELETE_MISSINGFILES and state == 'missingfiles':
                hashes_to_delete.append(hash_value) 
        if state == 'pausedup' and START_PAUSEDUP:
            hashes_to_start.append(hash_value)  
        trackers = t.get('trackers')
        if trackers is None:
            continue
        if isinstance(trackers, list):
            has_working_tracker = False
            first_error_url = None
            for tracker in trackers:
                if tracker.get('status') == 2:
                    has_working_tracker = True
                    break
                elif first_error_url is None:
                    url = tracker.get('url')
                    if url:
                        parsed_url = urlparse(url)
                        if parsed_url.scheme and parsed_url.netloc:
                            first_error_url = parsed_url.scheme + "://" + parsed_url.netloc
            if not has_working_tracker:
                no_working_tracker_count += 1
                if first_error_url:
                    if first_error_url not in no_working_tracker_details:
                        no_working_tracker_details[first_error_url] = []
                    no_working_tracker_details[first_error_url].append(t.get('name'))
                if first_error_url:
                    tracker_error_urls.add(first_error_url)
    deleted_torrent_count = len(hashes_to_delete)
    if hashes_to_delete:
        delete_torrents(session, config['url'], hashes_to_delete, torrents_info)
    started_torrent_count = len(hashes_to_start)
    if hashes_to_start:
        resume_torrents(session, config['url'], hashes_to_start, torrents_info)
    return len(torrents_info), error_count, error_details, no_working_tracker_count, no_working_tracker_details, tracker_error_urls, status_counts, deleted_torrent_count, started_torrent_count

def send_wechat_notification(message):
    """通过企业微信机器人发送通知"""
    if not wechat_webhook_url:
        print("未配置企业微信Webhook URL,无法发送通知。")
        return False
    try:
        headers = {'Content-Type': 'application/json'}
        data = {"msgtype": "text", "text": {"content": message}}
        response = requests.post(wechat_webhook_url, headers=headers, json=data, timeout=10)
        if response.status_code == 200:
            print("企业微信通知发送成功")
            return True
        else:
            print(f"发送企业微信通知失败,状态码: {response.status_code}")
            return False
    except Exception as e:
        print(f"发送企业微信通知时出现异常: {e}")
        return False

def delete_torrents(session, url, hashes_to_delete, torrents_info):
    """删除指定的种子"""
    if not hashes_to_delete:
        print("没有要删除的种子。")
        return
    delete_url = f"{url}/api/v2/torrents/delete"
    delete_params = {
        'hashes': '|'.join([hash.lower() for hash in hashes_to_delete]),
        'deleteFiles': 'true'
    }
    try:
        response = session.post(delete_url, data=delete_params, timeout=10)
        if response.status_code == 200:
            print(f"成功删除种子: {delete_params['hashes']}")
        else:
            print(f"删除种子失败: {response.status_code}, {response.text}")
    except Exception as e:
        print(f"删除种子时发生异常: {e}")

def resume_torrents(session, url, hashes_to_start, torrents_info):
    """开始指定的种子"""
    if not hashes_to_start:
        print("没有要开始的种子。")
        return
    resume_url = f"{url}/api/v2/torrents/resume"
    resume_params = {
        'hashes': '|'.join([hash.lower() for hash in hashes_to_start])
    }
    try:
        response = session.post(resume_url, data=resume_params, timeout=10)
        if response.status_code == 200:
            print(f"成功开始种子: {resume_params['hashes']}")
        else:
            print(f"启动种子失败: {response.status_code}, {response.text}")
    except Exception as e:
        print(f"启动种子时发生异常: {e}")

def should_send_scheduled_notification():
    """检查当前时间是否在设定的通知时间点"""
    now = datetime.datetime.now()
    current_time = now.strftime("%H:%M") 
    return current_time in SCHEDULED_NOTIFICATION_TIMES

def main():
    notification_message = "下载器状态报告:\n"
    results_log = []
    total_errors = 0 
    total_deleted = 0 
    total_started = 0 

    for index, config in enumerate(qb_config):
        with requests.Session() as session:
            if login_to_qb(session, config):
                torrents_info = get_torrents_status(session, config['url'])
                total, errors, error_details, no_working_tracker_count, no_working_tracker_details, tracker_error_urls, status_counts, deleted_torrent_count, started_torrent_count = check_errors(torrents_info, session, config)
                total_errors += errors
                status_msg = (
                    f"下载器: {config['name']}\n"
                    f"总任务数: {total}\n"
                    f"正常任务数: {total - errors}\n"
                    f"错误任务数: {errors}\n"
                )
                notification_message += status_msg
                results_log.append(status_msg)

                # 添加各个状态的统计信息,如果数量大于 0 才进行消息推送
                for status, count in status_counts.items():
                    if count > 0:  # 只显示数量大于 0 的状态
                        status_name = QB_STATUS_DICT.get(status, '未知状态')
                        status_msg = f"{status_name}: {count}"
                        if status == 'pausedup':
                            if START_PAUSEDUP and started_torrent_count > 0:
                                status_msg += " 已开始"
                                total_started += started_torrent_count
                        elif status == 'missingfiles':
                            if DELETE_MISSINGFILES and deleted_torrent_count > 0:
                                status_msg += " 已删除"
                                total_deleted += deleted_torrent_count
                        status_msg += "\n" 
                        notification_message += status_msg
                        results_log.append(status_msg)


                if errors > 0:
                    if SHOW_ERROR_DETAILS:
                        details_msg = "错误种子详情:\n"
                        for detail in error_details:
                            details_msg += (
                                f"名称: {detail['name']}\n"
                                f"状态: {detail['state']}\n"
                                f"进度: {detail['progress']}\n"
                                f"错误信息: {detail['error']}\n"
                            )
                        notification_message += details_msg + "\n"
                        results_log.append(details_msg)

                if no_working_tracker_count > 0:
                    no_tracker_msg = f"异常Tracker详情:\n"
                    for url, torrents in no_working_tracker_details.items():
                        no_tracker_msg += f"{url}  共{len(torrents)}个\n"
                        if SHOW_ERROR_DETAILS:
                            for torrent in torrents:
                                no_tracker_msg += f"{torrent}\n"
                        no_tracker_msg += ""
                    notification_message += no_tracker_msg
                    results_log.append(no_tracker_msg)
                else:
                    notification_message += "\n"
                    results_log.append("\n")
            else:
                status_msg = f"下载器: {config['name']} 登录失败。\n\n"
                notification_message += status_msg
                results_log.append(status_msg)

        if index < len(qb_config) - 1:
            notification_message += "\n"
            results_log.append("\n")

    send_scheduled = ENABLE_SCHEDULED_NOTIFICATION and should_send_scheduled_notification()

    send_notification = ALWAYS_SEND_NOTIFICATION or send_scheduled or \
                        (SEND_DELETE_NOTIFICATION and total_deleted > 0) or \
                        (SEND_START_NOTIFICATION and total_started > 0)

    final_notification_message = ""
    if send_notification:
        final_notification_message = notification_message
        if final_notification_message.endswith("\n\n"):
            final_notification_message = final_notification_message[:-2]
        if wechat_webhook_url:
            send_wechat_notification(final_notification_message)
        print("".join(results_log))
    else:
        print("根据配置,本次不发送企业微信通知。")

if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        print(f"脚本运行出错: {e}")

使用方法

该脚本的运行逻辑十分简单,输入下载器地址和登陆信息--进入下载器获取任务列表中的种子及所其状态--统计排版--调用企业微信机器人进行推送 企业微信机器人key的获取也十分简单,创建群组添加机器人即可获取,随后将获取到的url之间填入环境变量中保存即可正常调用。网上有大量的教程教学,这里不做过多阐述

image.png 脚本本身调用的都是基础库,理论上不需要手动添加依赖即可运行。 运行后效果如下: image.png 手机端接收效果如下:

6c99d4fd02741f7e22f80a034d7ce426.jpg 可以看到报错的是个丢失文件的种子任务[很正常对吧]

最后,下载器的增加和信息修改直接在配置列表里处理即可,如果需要更换其他的通知方式,理论上可以把代码直接丢给ai,脚本写的还算规整应该不会有难度。:lol:

收藏
送赞
分享

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

7

主题

77

回帖

0

牛值

fnOS系统内测组

2025-1-8 15:26:07 显示全部楼层
好东西,收藏了

7

主题

14

回帖

0

牛值

江湖小虾

2025-1-12 14:52:52 显示全部楼层
楼主在学习日语呀
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则