import ssl
import os
import json
import sys
import certifi
import asyncio
import base64
import secrets
try:
import websockets
except:
print("正在尝试自动安装依赖websockets")
os.system("pip3 install websockets &> /dev/null")
import websockets
try:
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5
from Crypto.Cipher import AES
from Crypto.Hash import HMAC, SHA256
except:
print("正在尝试自动安装依赖pycryptodome")
os.system("pip3 install pycryptodome &> /dev/null")
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5
from Crypto.Cipher import AES
from Crypto.Hash import HMAC, SHA256
"""
配合 飞牛系统,转存后自动下载
"""
async def create_websocket(url):
if 'wss' in url:
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
ssl_context.maximum_version = ssl.TLSVersion.TLSv1_3
ssl_context.load_verify_locations(certifi.where())
return await websockets.connect(url, ssl=ssl_context, ping_interval=None)
else:
return await websockets.connect(url, ping_interval=None)
async def wss_connect(websocket):
response = await websocket.recv()
return response
async def close_websocket(websocket):
await websocket.close()
async def send_ping(websocket):
while True:
await asyncio.sleep(5) # 每10秒发送一次Ping消息
await websocket.send('{"req":"ping"}')
def rsa_encrypt(message, public_key):
public_key = RSA.import_key(public_key)
cipher = Cipher_pkcs1_v1_5.new(public_key)
cipher_text = base64.b64encode(cipher.encrypt(message.encode('utf-8')))
return cipher_text.decode('utf-8')
def encrypt(text, key, iv):
cipher = AES.new(key, AES.MODE_CBC, iv)
pad = lambda s: s + (16 - len(s) % 16) * chr(16 - len(s) % 16)
encrypted = base64.b64encode(cipher.encrypt(pad(text).encode()))
return encrypted.decode()
def unpad(data):
pad = data[-1]
if type(pad) is int:
pad = chr(pad)
return data[:-ord(pad)]
def decrypt(text, key, iv):
# 将加密数据转换位bytes类型数据
encodebytes = base64.decodebytes(text.encode())
# 解密
cipher = AES.new(key, AES.MODE_CBC, iv)
text_decrypted = cipher.decrypt(encodebytes)
text_decrypted = unpad(text_decrypted)
return base64.b64encode(text_decrypted).decode()
oneMark = True
def print_progress_bar(iteration, total, prefix='', suffix='', length=35):
global oneMark
percent = (iteration / total) * 100
filled_length = int(length * iteration // total)
bar = '#' * filled_length + ' ' * (length - filled_length)
percent_str = str(int(percent)).zfill(2)
if percent < 100:
percent_str = " " + percent_str
if oneMark:
print(f'{prefix} {bar} {percent_str}% {suffix}', end='')
oneMark = False
else:
print(f'\r{prefix} {bar} {percent_str}% {suffix}', end='')
sys.stdout.flush()
def seconds_to_hms(seconds):
hours = seconds // 3600
remainder = seconds % 3600
minutes = remainder // 60
seconds = remainder % 60
hours_str = str(int(hours)).zfill(2)
minutes_str = str(int(minutes)).zfill(2)
seconds_str = str(int(seconds)).zfill(2)
return f'{hours_str}:{minutes_str}:{seconds_str}'
def format_byte_repr(byte_num):
KB = 1024
MB = KB * KB
GB = MB * KB
TB = GB * KB
try:
if isinstance(byte_num, str):
byte_num = int(byte_num)
if byte_num > TB:
result = '%sTB' % round(byte_num / TB, 2)
elif byte_num > GB:
result = '%sGB' % round(byte_num / GB, 2)
elif byte_num > MB:
result = '%sMB' % round(byte_num / MB, 2)
elif byte_num > KB:
result = '%sKB' % round(byte_num / KB, 2)
else:
result = '%sB' % byte_num
return result
except Exception as e:
print(e.args)
return byte_num
class Fnos_v:
default_config = {
"websocket": "", # 飞牛的websocket地址
"user": "", # 飞牛的用户账号
"password": "", # 飞牛的用户密码
"mount_path": "", # Alist挂载的地址
"download_wait": "", # 是否等待下载完成
}
default_task_config = {
"download_path": "", # 下载路径
}
is_active = True
def __init__(self, **kwargs):
self.plugin_name = self.__class__.__name__.lower()
if kwargs:
for key, _ in self.default_config.items():
if key in kwargs:
setattr(self, key, kwargs[key])
else:
print(f"{self.__class__.__name__} 模块缺少必要参数: {key}")
if self.websocket and self.user and self.password and self.mount_path and self.download_wait:
self.is_active = True
def run(self, task, **kwargs):
dramaList = []
if kwargs['tree'] is not None:
for node in kwargs['tree'].all_nodes():
if node.data['is_dir'] is False:
dramaList.append(f'"{self.mount_path}{node.data['path']}"')
if len(dramaList) < 0:
print(f"飞牛:😄 此次转存无需下载文件!")
else:
task_config = task.get("addition", {}).get(self.plugin_name, self.default_task_config)
print(f"飞牛:🎞️ 转存有需下载文件️")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
websocket = loop.run_until_complete(create_websocket(self.websocket))
loop.run_until_complete(
websocket.send('{"reqid":"676cf70d00000000000000000001","req":"util.crypto.getRSAPub"}'))
try:
aesKeyByte = None
aesIvByte = None
num = 0
asyncio.ensure_future(send_ping(websocket))
while True:
response = loop.run_until_complete(wss_connect(websocket))
if "-----BEGIN PUBLIC KEY-----" in response:
pub = json.loads(response).get("pub")
si = json.loads(response).get("si")
userData = '{"reqid":"676cf70d00000000000000000002","user":"' + self.user + '","password":"' + self.password + '","deviceType":"Browser","deviceName":"Mac OS-Google Chrome","stay":true,"req":"user.login","si":"' + si + '"}'
aesKeyStr = "lUfJn1XJ9akUvmmwQplpVIy1XNC2jJ3q"
aesIv = secrets.token_bytes(16)
aesIvBase64 = base64.b64encode(aesIv).decode('utf-8')
iv = aesIvBase64
rsa = rsa_encrypt(aesKeyStr, pub)
aes = encrypt(userData, aesKeyStr.encode(), aesIv)
aesKeyByte = aesKeyStr.encode()
aesIvByte = aesIv
sendMsg = '{"rsa":"' + rsa + '","iv":"' + iv + '","aes":"' + aes + '","req":"encrypted"}'
loop.run_until_complete(websocket.send(sendMsg))
elif "676cf70d00000000000000000002" in response:
print(f"飞牛:👨 用户认证成功🏅")
secret = json.loads(response).get('secret')
keys = decrypt(secret, aesKeyByte, aesIvByte)
Secret = base64.b64decode(keys)
s = '{"reqid":"676cf70d00000000000000000003","path":"' + task_config.get(
"download_path") + '","req":"file.mkdir"}'
mark = base64.b64encode(HMAC.new(Secret, s.encode(), digestmod=SHA256).digest()).decode()
loop.run_until_complete(websocket.send(mark + s))
elif "pong" in response:
pass
elif "676cf70d00000000000000000003" in response:
if '"result":"succ"' in response or '"errno":4102' in response:
print(f"飞牛:📄 目标文件夹处理完成 {task_config.get("download_path")}")
a = '{"reqid":"676cf70d00000000000000000004","files":[' + ','.join(
dramaList) + '],"pathTo":"' + task_config.get(
"download_path") + '","overwrite":1,"description":"ABetsy剧集下载","req":"file.cp"}'
mark = base64.b64encode(HMAC.new(Secret, a.encode(), digestmod=SHA256).digest()).decode()
loop.run_until_complete(websocket.send(mark + a))
elif '"result":"fail"' in response:
print(f"飞牛:❌ 文件夹创建失败,请检查文件夹路径 {task_config.get("download_path")}")
break
elif "676cf70d00000000000000000004" in response and '"sysNotify":"taskId"' in response:
print(f"飞牛:💼 收到资源下载任务")
pass
elif "676cf70d00000000000000000004" in response and 'percent' in response:
data = json.loads(response)
if 'true' in self.download_wait.lower():
if num != 0 or num < int(data.get('percent')):
time = seconds_to_hms(data.get('time'))
du = format_byte_repr(data.get('size')) + '/' + format_byte_repr(data.get('sizeTotal'))
speed = format_byte_repr(data.get('speed')) + '/S'
suffix = f'{time} {du} {speed}'
print_progress_bar(data.get('percent'), 100, prefix='飞牛:⌛ ️', suffix=suffix)
num = data.get('percent')
else:
print(f"飞牛:🎞️ 下载任务后台执行")
break
elif '"taskInfo":{"reqid":"676cf70d00000000000000000004"' in response:
pass
elif "676cf70d00000000000000000004" in response and '"result":"succ"' in response:
print()
print(f"飞牛:✅ 下载任务完成")
break
elif "676cf70d00000000000000000004" in response and '"result":"fail"' in response:
print()
print(f"飞牛:❌ 下载任务异常,检查您配置")
break
elif "676cf70d00000000000000000004" in response and '"result":"cancel"' in response:
print()
print(f"飞牛:❌ 下载任务被取消")
break
else:
print(f"{response}")
except Exception as e:
print(f"飞牛:❌ 下载任务异常 {e}")
loop.run_until_complete(close_websocket(websocket))