[i=s] 本帖最后由 EWEDL 于 2025-2-5 12:03 编辑 [/i]<br />
<br />
更新日志
2025/2/5:去除了初版报错种子的详情显示,新增下载器自定义名称、自动删除状态为“丢失文件”的任务、自动开始已下载完成的任务(辅种)、tracker异常检测以、定时执行(需要配合青龙面板定时策略)以及一些通知逻辑,避免每次执行都进行通知,本帖仅更新脚本内容,具体效果自测
2025/1/7:初版,登录下载器进行数据统计,并将报错种子列出。
前言
作为一个PT新手玩家,手头没什么大站,许是由于资源较差的问题,近期在玩转时发现下载器时不时出现种子报错的现象。故而写了一个python脚本,用于检测下载器的状态。分享给有需要的玩家。
脚本内容
import os
import requests
from urllib.parse import urlparse
import time
import datetime
# qBittorrent Web UI的配置信息列表,一个一行{'url': '网址', 'username': '用户名', 'password': '密码', 'name': '自定义名称'}
qb_config = [
{'url': 'http://192.168.2.2:18090', 'username': 'cming', 'password': 'cmingcming', 'name': '刷流'},
{'url': 'http://192.168.2.2:18091', 'username': 'cming', 'password': 'cmingcming', 'name': '保种'},
]
# 功能开关
SHOW_ERROR_DETAILS = False # tracker状态详情显示
DELETE_MISSINGFILES = True # 是否删除 "暂停-文件丢失" 类型的任务,如未删除的辅种
START_PAUSEDUP = True # 是否开始 "暂停-下载完成" 类型的任务,如新添加的辅种
# 通知控制开关
ALWAYS_SEND_NOTIFICATION = False # 是否每次执行都发送通知
SEND_DELETE_NOTIFICATION = True # 是否开启删除任务通知
SEND_START_NOTIFICATION = True # 是否开启开始任务通知
ENABLE_SCHEDULED_NOTIFICATION = True # 是否开启定时通知
SCHEDULED_NOTIFICATION_TIMES = ["00:00", "12:00"] # 定时通知时间点,格式为 "HH:MM",该功能需与青龙面板定时配合
# 企业微信机器人Webhook URL,需要在环境变量中添加“WEIXIN_WEBHOOK_URL”
wechat_webhook_url = os.getenv('WEIXIN_WEBHOOK_URL')
# qB状态字典(注意需要小写)
QB_STATUS_DICT = {
'downloading': '正在下载(传输数据)',
'stalleddl': '正在下载(未建立连接)',
'uploading': '正在上传(传输数据)',
'error': '暂停(发生错误)',
'pauseddl': '暂停(下载未完成)',
'pausedup': '暂停(下载完成)',
'missingfiles': '暂停(文件丢失)',
'checkingdl': '检查中(下载未完成)',
'checkingup': '检查中(下载完成)',
'checkingResumedata': '检查中(启动时恢复数据)',
'forceddl': '强制下载(忽略队列)',
'queueddl': '等待下载(排队)',
'forcedup': '强制上传(忽略队列)',
'queuedup': '等待上传(排队)',
'allocating': '分配磁盘空间',
'metadl': '获取元数据',
'moving': '移动文件',
'unknown': '未知状态'
}
def login_to_qb(session, config):
"""登录到qBittorrent"""
try:
response = session.post(f"{config['url']}/api/v2/auth/login", data=config, timeout=10)
if response.status_code == 200 and response.text.strip() == 'Ok.':
print(f"成功登录 {config['url']}")
return True
else:
print(f"登录失败 {config['url']},状态码: {response.status_code}")
return False
except Exception as e:
print(f"登录 {config['url']} 时出现异常: {e}")
return False
def get_torrents_status(session, url):
"""获取所有任务的状态和属性(包含tracker信息)"""
try:
response = session.get(f'{url}/api/v2/torrents/info', timeout=10)
if response.status_code == 200:
torrents_info = response.json()
torrents_with_trackers = []
for torrent in torrents_info:
hash_value = torrent.get('hash')
trackers_response = session.get(f'{url}/api/v2/torrents/trackers', params={'hash': hash_value}, timeout=10)
if trackers_response.status_code == 200:
torrent['trackers'] = trackers_response.json()
torrents_with_trackers.append(torrent)
else:
print(f"获取种子 {torrent.get('name')} 的 tracker 信息失败,状态码:{trackers_response.status_code}")
return torrents_with_trackers
else:
print(f"获取下载任务列表失败,状态码: {response.status_code}")
return []
except Exception as e:
print(f"获取下载任务列表时出现异常: {e}")
return []
def check_errors(torrents_info, session, config):
"""统计正常和错误的任务数量,并记录错误和tracker异常的详情"""
error_count = 0
error_details = []
no_working_tracker_count = 0
no_working_tracker_details = {}
tracker_error_urls = set()
status_counts = {}
for status in QB_STATUS_DICT:
status_counts[status] = 0
hashes_to_delete = []
hashes_to_start = []
for t in torrents_info:
name = t.get('name')
hash_value = t.get('hash')
state = t.get('state').lower()
if state in QB_STATUS_DICT:
status_counts[state] += 1
if state in ['error', 'missingfiles']:
error_count += 1
if SHOW_ERROR_DETAILS:
error_details.append({
'name': name,
'hash': hash_value,
'state': state,
'progress': t.get('progress'),
'error': t.get('error'),
'tracker': t.get('tracker')
})
if DELETE_MISSINGFILES and state == 'missingfiles':
hashes_to_delete.append(hash_value)
if state == 'pausedup' and START_PAUSEDUP:
hashes_to_start.append(hash_value)
trackers = t.get('trackers')
if trackers is None:
continue
if isinstance(trackers, list):
has_working_tracker = False
first_error_url = None
for tracker in trackers:
if tracker.get('status') == 2:
has_working_tracker = True
break
elif first_error_url is None:
url = tracker.get('url')
if url:
parsed_url = urlparse(url)
if parsed_url.scheme and parsed_url.netloc:
first_error_url = parsed_url.scheme + "://" + parsed_url.netloc
if not has_working_tracker:
no_working_tracker_count += 1
if first_error_url:
if first_error_url not in no_working_tracker_details:
no_working_tracker_details[first_error_url] = []
no_working_tracker_details[first_error_url].append(t.get('name'))
if first_error_url:
tracker_error_urls.add(first_error_url)
deleted_torrent_count = len(hashes_to_delete)
if hashes_to_delete:
delete_torrents(session, config['url'], hashes_to_delete, torrents_info)
started_torrent_count = len(hashes_to_start)
if hashes_to_start:
resume_torrents(session, config['url'], hashes_to_start, torrents_info)
return len(torrents_info), error_count, error_details, no_working_tracker_count, no_working_tracker_details, tracker_error_urls, status_counts, deleted_torrent_count, started_torrent_count
def send_wechat_notification(message):
"""通过企业微信机器人发送通知"""
if not wechat_webhook_url:
print("未配置企业微信Webhook URL,无法发送通知。")
return False
try:
headers = {'Content-Type': 'application/json'}
data = {"msgtype": "text", "text": {"content": message}}
response = requests.post(wechat_webhook_url, headers=headers, json=data, timeout=10)
if response.status_code == 200:
print("企业微信通知发送成功")
return True
else:
print(f"发送企业微信通知失败,状态码: {response.status_code}")
return False
except Exception as e:
print(f"发送企业微信通知时出现异常: {e}")
return False
def delete_torrents(session, url, hashes_to_delete, torrents_info):
"""删除指定的种子"""
if not hashes_to_delete:
print("没有要删除的种子。")
return
delete_url = f"{url}/api/v2/torrents/delete"
delete_params = {
'hashes': '|'.join([hash.lower() for hash in hashes_to_delete]),
'deleteFiles': 'true'
}
try:
response = session.post(delete_url, data=delete_params, timeout=10)
if response.status_code == 200:
print(f"成功删除种子: {delete_params['hashes']}")
else:
print(f"删除种子失败: {response.status_code}, {response.text}")
except Exception as e:
print(f"删除种子时发生异常: {e}")
def resume_torrents(session, url, hashes_to_start, torrents_info):
"""开始指定的种子"""
if not hashes_to_start:
print("没有要开始的种子。")
return
resume_url = f"{url}/api/v2/torrents/resume"
resume_params = {
'hashes': '|'.join([hash.lower() for hash in hashes_to_start])
}
try:
response = session.post(resume_url, data=resume_params, timeout=10)
if response.status_code == 200:
print(f"成功开始种子: {resume_params['hashes']}")
else:
print(f"启动种子失败: {response.status_code}, {response.text}")
except Exception as e:
print(f"启动种子时发生异常: {e}")
def should_send_scheduled_notification():
"""检查当前时间是否在设定的通知时间点"""
now = datetime.datetime.now()
current_time = now.strftime("%H:%M")
return current_time in SCHEDULED_NOTIFICATION_TIMES
def main():
notification_message = "下载器状态报告:\n"
results_log = []
total_errors = 0
total_deleted = 0
total_started = 0
for index, config in enumerate(qb_config):
with requests.Session() as session:
if login_to_qb(session, config):
torrents_info = get_torrents_status(session, config['url'])
total, errors, error_details, no_working_tracker_count, no_working_tracker_details, tracker_error_urls, status_counts, deleted_torrent_count, started_torrent_count = check_errors(torrents_info, session, config)
total_errors += errors
status_msg = (
f"下载器: {config['name']}\n"
f"总任务数: {total}\n"
f"正常任务数: {total - errors}\n"
f"错误任务数: {errors}\n"
)
notification_message += status_msg
results_log.append(status_msg)
# 添加各个状态的统计信息,如果数量大于 0 才进行消息推送
for status, count in status_counts.items():
if count > 0: # 只显示数量大于 0 的状态
status_name = QB_STATUS_DICT.get(status, '未知状态')
status_msg = f"{status_name}: {count}"
if status == 'pausedup':
if START_PAUSEDUP and started_torrent_count > 0:
status_msg += " 已开始"
total_started += started_torrent_count
elif status == 'missingfiles':
if DELETE_MISSINGFILES and deleted_torrent_count > 0:
status_msg += " 已删除"
total_deleted += deleted_torrent_count
status_msg += "\n"
notification_message += status_msg
results_log.append(status_msg)
if errors > 0:
if SHOW_ERROR_DETAILS:
details_msg = "错误种子详情:\n"
for detail in error_details:
details_msg += (
f"名称: {detail['name']}\n"
f"状态: {detail['state']}\n"
f"进度: {detail['progress']}\n"
f"错误信息: {detail['error']}\n"
)
notification_message += details_msg + "\n"
results_log.append(details_msg)
if no_working_tracker_count > 0:
no_tracker_msg = f"异常Tracker详情:\n"
for url, torrents in no_working_tracker_details.items():
no_tracker_msg += f"{url} 共{len(torrents)}个\n"
if SHOW_ERROR_DETAILS:
for torrent in torrents:
no_tracker_msg += f"{torrent}\n"
no_tracker_msg += ""
notification_message += no_tracker_msg
results_log.append(no_tracker_msg)
else:
notification_message += "\n"
results_log.append("\n")
else:
status_msg = f"下载器: {config['name']} 登录失败。\n\n"
notification_message += status_msg
results_log.append(status_msg)
if index < len(qb_config) - 1:
notification_message += "\n"
results_log.append("\n")
send_scheduled = ENABLE_SCHEDULED_NOTIFICATION and should_send_scheduled_notification()
send_notification = ALWAYS_SEND_NOTIFICATION or send_scheduled or \
(SEND_DELETE_NOTIFICATION and total_deleted > 0) or \
(SEND_START_NOTIFICATION and total_started > 0)
final_notification_message = ""
if send_notification:
final_notification_message = notification_message
if final_notification_message.endswith("\n\n"):
final_notification_message = final_notification_message[:-2]
if wechat_webhook_url:
send_wechat_notification(final_notification_message)
print("".join(results_log))
else:
print("根据配置,本次不发送企业微信通知。")
if __name__ == "__main__":
try:
main()
except Exception as e:
print(f"脚本运行出错: {e}")
使用方法
该脚本的运行逻辑十分简单,输入下载器地址和登陆信息--进入下载器获取任务列表中的种子及所其状态--统计排版--调用企业微信机器人进行推送
企业微信机器人key的获取也十分简单,创建群组添加机器人即可获取,随后将获取到的url之间填入环境变量中保存即可正常调用。网上有大量的教程教学,这里不做过多阐述
脚本本身调用的都是基础库,理论上不需要手动添加依赖即可运行。
运行后效果如下:
手机端接收效果如下:
可以看到报错的是个丢失文件的种子任务[很正常对吧]
最后,下载器的增加和信息修改直接在配置列表里处理即可,如果需要更换其他的通知方式,理论上可以把代码直接丢给ai,脚本写的还算规整应该不会有难度。:lol: