# CHERWIN_TOOLS.py
# Forked from CHERWING/CHERWIN_SCRIPTS.
import hashlib
import json
import os
import importlib.util
import random
import string
import subprocess
import sys
import time
import requests
from http import HTTPStatus
from datetime import datetime
# Version of this tools module; compared against the remote TOOLS_VERSION.
NOW_TOOLS_VERSION = '2024.06.30'

# A DEV_ENV.py file in the working directory switches on development mode;
# the module is imported for whatever side effects it defines.
IS_DEV = os.path.isfile('DEV_ENV.py')
if IS_DEV:
    import DEV_ENV
# Import a package, installing it with pip first when it is missing.
def import_or_install(package_name, import_name=None):
    """Return the imported module, installing it with pip when necessary.

    Args:
        package_name: Distribution name handed to ``pip install``.
        import_name: Module name to import; defaults to ``package_name``.

    Returns:
        The imported module, or ``None`` when installation or import fails.
        Failures are printed, never raised.
    """
    import_name = import_name or package_name
    try:
        if importlib.util.find_spec(import_name) is None:
            print(f"{package_name} 模块未安装. 开始安装...")
            subprocess.check_call([sys.executable, '-m', 'pip', 'install', package_name])
            print(f"{package_name} 模块安装完成。")
            # The import system caches directory listings; without this a
            # package installed after interpreter start may not be found.
            importlib.invalidate_caches()
        else:
            print(f"{package_name} 模块已安装。")
        module = importlib.import_module(import_name)
        print(f"{import_name} 模块导入成功.")
        return module
    except ImportError as e:
        print(f"无法导入 {import_name} 模块. 错误信息: {e}")
    except subprocess.CalledProcessError as e:
        print(f"安装 {package_name} 模块时出错. 错误信息: {e}")
    except Exception as e:
        print(f"处理 {package_name} 模块时发生错误. 错误信息: {e}")
    return None
def SAVE_INVITE_CODE(file_name, new_data):
    """Merge ``new_data`` into the JSON object stored at ``file_name``.

    Args:
        file_name: Path of the JSON file; it and its parent directories are
            created when they do not exist.
        new_data: Mapping of key -> dict. For keys already in the file the
            stored dict is updated with the new dict; other keys are added.

    Raises:
        json.JSONDecodeError: If the existing file is not valid JSON.
    """
    try:
        with open(file_name, 'r', encoding='utf-8') as file:
            data = json.load(file)
    except FileNotFoundError:
        # A bare file name has an empty dirname, and os.makedirs('') raises,
        # so only create a directory when there is one to create.
        directory = os.path.dirname(file_name)
        if directory:
            os.makedirs(directory, exist_ok=True)
        data = {}
    for key, value in new_data.items():
        if key in data:
            # Existing key: merge the new fields into the stored dict.
            data[key].update(value)
        else:
            data[key] = value
    with open(file_name, 'w', encoding='utf-8') as file:
        json.dump(data, file, indent=4)
# Convert a "k1=v1,k2=v2" parameter string into a dict.
def create_dict_from_string(self, data_string):
    """Parse comma-separated ``key=value`` pairs into a dict of strings.

    Args:
        self: Unused; kept so existing call sites keep working.
        data_string: Text such as ``"a=1,b=2"``.

    Returns:
        dict mapping each key to its value. Only the first ``=`` of a pair
        separates key from value, so values may themselves contain ``=``.
        Empty input and empty segments (e.g. a trailing comma) are ignored.

    Raises:
        ValueError: If a non-empty segment contains no ``=``.
    """
    params = {}
    if not data_string:
        return params
    for pair in data_string.split(','):
        if not pair:
            continue
        key, separator, value = pair.partition('=')
        if not separator:
            raise ValueError(f"malformed parameter (missing '='): {pair!r}")
        params[key] = value
    return params
def compare_versions(local_version, server_version):
    """Return True when ``local_version`` is older than ``server_version``.

    Both arguments are dot-separated numeric strings. Fields are compared
    pairwise as integers from the left; the first differing pair decides.
    If every shared field is equal, the version with fewer fields counts as
    older (``x.y`` is lower than ``x.y.z``). Equal versions return False.
    """
    local_fields = local_version.split('.')
    server_fields = server_version.split('.')
    for local_field, server_field in zip(local_fields, server_fields):
        local_num, server_num = int(local_field), int(server_field)
        if local_num != server_num:
            return local_num < server_num
    # All shared fields match: only a shorter local version is "older".
    return len(local_fields) < len(server_fields)
def CHECK_UPDATE_NEW(local_version, server_version, server_script_url, script_filename, server_version_url=None,
                     APP_NAME=None):
    """Compare versions and download the newer script when one is available.

    Args:
        local_version (str): Version of the script on disk.
        server_version (str): Version advertised by the server; replaced by
            the content of ``server_version_url`` when that URL is given.
        server_script_url (str): URL the new script is downloaded from.
        script_filename (str): File name the downloaded script is saved as.
        server_version_url (str | None): Optional URL of a plain-text file
            holding the server version.
        APP_NAME (str | None): Accepted for call compatibility; not used.

    Returns:
        bool: True only when a newer script was downloaded; False when no
        update was needed, updates are disabled via ``SCRIPT_UPDATE=false``,
        or any error occurred (errors are printed, not raised).
    """
    print(f'当前检测:【{script_filename}】')
    try:
        if server_version_url:
            # Bounded wait so an unresponsive server cannot hang the script.
            response = requests.get(server_version_url, verify=False, timeout=10)
            response.raise_for_status()
            server_version = response.text.strip()
            if "code" in server_version:
                # An error payload was returned instead of a version string.
                print('【获取远程版本号失败,设为本地同版本】')
                server_version = local_version
        if not server_version:
            # Unknown server version: treat it as "same as local" so a missing
            # value can never trigger an update (the tools version is
            # unrelated to an individual script's version).
            server_version = local_version
        print(f'本地版本:【{local_version}】')
        print(f'服务器版本:【{server_version}】')
        if compare_versions(local_version, server_version):
            # Auto-update is on unless SCRIPT_UPDATE is set to "false".
            AUTO_UPDATE = os.getenv("SCRIPT_UPDATE", "True").lower() != "false"
            if AUTO_UPDATE:
                print(">>>>>>>发现新版本的脚本,默认自动更新,准备更新...")
                print(">>>>>>>禁用更新请定义变量export SCRIPT_UPDATE = 'False'")
                if down_file(script_filename, server_script_url):
                    print(f'请重新运行新脚本\n')
                    return True
            else:
                print(">>>>>>>发现新版本的脚本,您禁用了自动更新,如需启用请删除变量SCRIPT_UPDATE\n")
        else:
            print(f'无需更新\n')
        return False
    except requests.exceptions.RequestException as e:
        print(f'发生网络错误:{e}')
    except Exception as e:
        print(f'发生未知错误:{e}')
    return False  # No update was performed.
def down_file(filename, file_url):
    """Download ``file_url`` and save it as ``filename``.

    The content is first written to ``filename + '.tmp'`` and then moved over
    the target with ``os.replace``, so an existing file is never deleted
    before its replacement is in place.

    Returns:
        bool: True on success, False on any failure (printed, not raised).
    """
    print(f'开始下载:{filename},下载地址:{file_url}')
    temp_filename = filename + '.tmp'
    try:
        response = requests.get(file_url, verify=False, timeout=10)
        response.raise_for_status()
        with open(temp_filename, 'wb') as f:
            f.write(response.content)
        print(f'【{filename}】下载完成!')
        # os.replace overwrites an existing target in a single step.
        os.replace(temp_filename, filename)
        print(f'【{filename}】重命名成功!')
        return True
    except Exception as e:
        print(f'【{filename}】下载失败:{str(e)}')
        # Do not leave a partial download behind.
        if os.path.exists(temp_filename):
            try:
                os.remove(temp_filename)
            except OSError:
                pass
        return False
def get_AuthorInviteCode(url):
    """Fetch the JSON object at ``url`` and return its values as a list.

    On success the list is also stored in the module-level ``AuthorCode``.
    A non-200 response or any exception yields an empty dict instead.
    """
    global AuthorCode
    try:
        reply = requests.get(url, verify=False, timeout=10)
        if reply.status_code != 200:
            return {}
        AuthorCode = list(json.loads(reply.text).values())
        return AuthorCode
    except Exception as err:
        print(f"An error occurred: {err}")
        return {}
def CHECK_PARAMENTERS(index, input_string, required_parameters):
    """Report whether every required parameter name occurs in ``input_string``.

    Matching is a case-insensitive substring test. ``index`` is the zero-based
    account number and is printed one-based. Returns True when nothing is
    missing, otherwise prints the missing names (lower-cased) and returns False.
    """
    haystack = input_string.lower()
    missing = [name.lower() for name in required_parameters if name.lower() not in haystack]
    if not missing:
        print(f"\n第【{index + 1}】个账号,URL包含所有必需的参数,开始执行脚本")
        return True
    print(f"\n第【{index + 1}】个账号,缺少以下参数:【{missing}】")
    return False
def QIANWEN(tongyiSysPromt, content, api_key):
    """Ask the ``qwen-max`` model via dashscope and return the reply text.

    ``tongyiSysPromt`` is sent as the system message and ``content`` as the
    user message. Returns the first choice's message content when the call
    reports HTTP 200; returns None when dashscope cannot be imported or the
    call does not succeed.
    """
    print('开始调用通义千问')
    dashscope = import_or_install('dashscope')
    if not dashscope:
        print('dashscope 模块无法导入,函数无法执行。')
        return None

    dashscope.api_key = api_key
    conversation = [
        {"role": "system", "content": tongyiSysPromt},
        {"role": "user", "content": content},
    ]
    response = dashscope.Generation.call(
        model='qwen-max',
        messages=conversation,
        seed=1234,
        top_p=0.8,
        result_format='message',
        enable_search=False,
        max_tokens=1500,
        temperature=1.0,
        repetition_penalty=1.0,
    )
    if response.status_code != HTTPStatus.OK:
        print(f"无法解析通义返回的信息:{response}")
        return None
    video_info = response.output['choices'][0]['message']['content']
    print('通义生成【成功】!')
    return video_info
# Split an environment-variable value into its individual entries.
def ENV_SPLIT(input_str):
    """Split ``input_str`` on ``&`` and ``#`` and return the pieces as a list.

    A value containing neither separator is returned as a one-element list
    holding its string form.
    """
    if '&' in input_str:
        return [piece for chunk in input_str.split('&') for piece in chunk.split('#')]
    if '#' in input_str:
        return input_str.split('#')
    return [str(input_str)]
# Solve a slider captcha with the ddddocr module.
def CAPCODE(captcha_slider, captcha_bg):
    """Match the slider image against the background image.

    Both arguments are paths to image files. Returns the first element of the
    ``'target'`` entry in ddddocr's ``slide_match`` result, or False when
    ddddocr cannot be imported.
    """
    ddddocr = import_or_install('ddddocr')
    if not ddddocr:
        print('ddddocr 模块无法导入,函数无法执行。')
        return False

    matcher = ddddocr.DdddOcr(det=False, ocr=False)
    with open(captcha_slider, 'rb') as slider_file:
        target_bytes = slider_file.read()
    with open(captcha_bg, 'rb') as bg_file:
        background_bytes = bg_file.read()
    match = matcher.slide_match(target_bytes, background_bytes, simple_target=True)
    return match['target'][0]
def send_wxpusher(UID, one_msg, APP_NAME, help=False):
    """Push ``one_msg`` to WxPusher user ``UID`` and print the outcome.

    Does nothing when the ``WXPUSHER`` environment variable is unset.
    ``APP_NAME`` is used as the message title; with ``help=True`` the
    ``wxpusher`` helper appends the mutual-help suffix to that title.
    """
    if not os.environ.get('WXPUSHER', False):
        return
    # wxpusher() takes (UID, msg, title, help) and reads the app token from
    # the environment itself; the previous five-argument call raised TypeError.
    push_res = wxpusher(UID, one_msg, APP_NAME, help)
    print(push_res)
def wxpusher(UID, msg, title, help=False):
    """Send one message to a WxPusher user through the WxPusher web API.

    Args:
        UID: WxPusher user id that receives the message.
        msg: Message text; newlines are converted to ``<br>``.
        title: Title, also used as the summary.
        help: When true, ``'互助'`` is appended to the title.

    Returns:
        A status string describing success or failure, or None when the
        ``WXPUSHER`` environment variable (the app token) is not set.
    """
    WXPUSHER = os.environ.get('WXPUSHER', False)
    if not WXPUSHER:
        return None
    if help:
        title = title + '互助'
    print('\n------开始wxpusher推送------')
    print(f'标题:【{title}】\n内容:{msg}')
    # NOTE(review): plain http sends the app token unencrypted — consider https.
    webapi = 'http://wxpusher.zjiecode.com/api/send/message'
    msg = msg.replace("\n", "<br>")
    # TIPS_HTML is only created by main(); fall back to no footer so pushing
    # before main() has run does not raise NameError.
    tips_html = globals().get('TIPS_HTML', '')
    data = {
        "appToken": WXPUSHER,
        "content": f'{title}<br>{msg}<br>{tips_html}',
        "summary": title,
        "contentType": 2,
        "uids": [UID],
        "url": "https://gj.cherwin.cn"
    }
    try:
        # Bounded wait so a stalled push cannot hang the calling script.
        result = requests.post(url=webapi, json=data, timeout=10)
        result.raise_for_status()  # Non-2xx responses raise here.
        response_json = result.json()
        if response_json["success"]:
            return "------消息发送成功------\n"
        else:
            return f"消息发送失败。错误信息:{response_json['msg']}"
    except requests.exceptions.RequestException as e:
        return f"发送消息时发生错误:{str(e)}"
    except Exception as e:
        return f"发生意外错误:{str(e)}"
def RESTART_SCRIPT(RESTART_SCRIPT_NAME):
    """Replace the current process with a fresh run of ``RESTART_SCRIPT_NAME``.

    The current command-line arguments (``sys.argv[1:]``) are passed on.
    This call does not return on success.
    """
    python = sys.executable
    # The first argument after the path becomes the new program's argv[0];
    # the script must follow it, otherwise the interpreter starts without a
    # script to run.
    os.execl(python, python, RESTART_SCRIPT_NAME, *sys.argv[1:])
def CHECK():
    """Load the remote script configuration and self-update this tools file.

    Stores the parsed ``CHERWIN_SCRIPT_CONFIG.json`` in the module-level
    ``CHERWIN_SCRIPT_CONFIG`` (set to None when the server returns an error
    payload).

    Returns:
        bool: True when the configuration was loaded and no tools update was
        downloaded; False when the tools file was just updated or the
        configuration could not be fetched.
    """
    global CHERWIN_SCRIPT_CONFIG
    print('>>>>>>>开始获取版本信息...')
    baseurl = 'https://github.com/chenpinetree/CHERWIN_SCRIPTS/raw/main/'
    TOOLS_NAME = 'CHERWIN_TOOLS.py'
    server_script_url = f'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/{TOOLS_NAME}'
    try:
        # Bounded wait so an unreachable host cannot hang start-up.
        response = requests.get(f'{baseurl}CHERWIN_SCRIPT_CONFIG.json', verify=False, timeout=10)
        response.encoding = 'utf-8'
        config = response.json()
        if not isinstance(config, dict) or 'code' in config:
            # Error payload instead of the configuration: fail explicitly
            # rather than through an AttributeError on None.
            CHERWIN_SCRIPT_CONFIG = None
            print('获取CHERWIN_SCRIPT_CONFIG.json失败')
            return False
        CHERWIN_SCRIPT_CONFIG = config
        TOOLS_VERSION = config.get('TOOLS_VERSION', NOW_TOOLS_VERSION)
        if CHECK_UPDATE_NEW(NOW_TOOLS_VERSION, TOOLS_VERSION, server_script_url, TOOLS_NAME):
            print('更新脚本完成')
            return False
        return True
    except Exception:
        # Best effort: any network or parsing problem means "not available".
        print('获取CHERWIN_SCRIPT_CONFIG.json失败')
        return False
def GJJJ_SIGN():
    """Build the timestamp/sign pair for a GJJJ request.

    The sign is the MD5 hex digest of app id + app secret + the current
    millisecond timestamp. Returns ``{'timestamp': str, 'sign': str}``.
    """
    app_id, app_crypto = "667516", "FH3yRrHG2RfexND8"
    timestamp = str(int(time.time() * 1000))
    digest = hashlib.md5((app_id + app_crypto + timestamp).encode())
    return {'timestamp': timestamp, "sign": digest.hexdigest()}
def KWW_SIGN(memberId):
    """Build the signing headers for a KWW request.

    A random index in 0..31 selects one character from a fixed 32-character
    table; the sign is the MD5 hex digest of millisecond timestamp +
    ``memberId`` + that character. The index and timestamp are returned
    alongside the sign.
    """
    # Same 32 entries, in the same order, as the original lookup list.
    salt_table = "AZBYCXDTESFRGQHPIOJNkMLacdfhkpyn"
    timestamp = int(time.time() * 1000)
    random_num = random.randint(0, 31)
    raw = f"{timestamp}{memberId}{salt_table[random_num]}"
    return {
        "user-sign": hashlib.md5(raw.encode()).hexdigest(),
        "user-paramname": "memberId",
        "user-timestamp": str(timestamp),
        "user-random": str(random_num),
    }
def TYQH_SIGN(parameters=None, body=None):
    """Build the signing headers for a TYQH request.

    Args:
        parameters: Query parameters (mapping); None means no parameters.
            Dict values are JSON-encoded, others are formatted as-is.
        body: Request body; JSON-encoded when truthy, otherwise treated as
            an empty string.

    Returns:
        dict with ``client_id``, ``timestamp`` (ms, as str), ``nonstr`` and
        ``sign`` (upper-case MD5 hex digest).
    """
    # None instead of a shared mutable default; also tolerates explicit None.
    parameters = {} if parameters is None else parameters
    parameter_strings = []
    for key in sorted(parameters):
        value = parameters[key]
        rendered = json.dumps(value) if isinstance(value, dict) else value
        parameter_strings.append(f"{key}={rendered}")
    current_time = int(datetime.now().timestamp() * 1000)
    # The secret is derived by inserting each of the timestamp's last three
    # digits into the base string at the position given by that digit.
    secret_chars = list('BxzTx45uIGT25TTHIIBU2')
    for digit in str(current_time)[-3:]:
        secret_chars.insert(int(digit), digit)
    secret = hashlib.md5(''.join(secret_chars).encode()).hexdigest()
    nonce_str = ''.join(random.choices(string.ascii_letters + string.digits, k=16))
    # Field order matters: the values are joined with '|' in this order.
    sign_fields = [
        'game',
        nonce_str,
        current_time,
        json.dumps(body) if body else '',
        '&'.join(parameter_strings),
        secret,
    ]
    sign_string = '|'.join(str(field) for field in sign_fields)
    sign = hashlib.md5(sign_string.encode()).hexdigest().upper()
    return {
        'client_id': 'game',
        'timestamp': str(current_time),
        'nonstr': nonce_str,
        'sign': sign,
    }
def YDXQ_SIGN():
    """Return ``(sign, timestamp)`` for a YDXQ request.

    ``timestamp`` is the current Unix time in whole seconds as a string; the
    sign is the MD5 hex digest of ``sign_<timestamp>_sign<nonce>``.
    """
    sign_nonce = "tnFWIEFpVPJkOuNX4zdsKeBEMIakLS1RsnS7cH0Id6MjEEBGO"
    n = str(int(time.time()))
    payload = "sign_" + n + "_sign" + sign_nonce
    # hashlib works on bytes, so the text is encoded before hashing.
    return hashlib.md5(payload.encode()).hexdigest(), n
def HXEK_SIGN(memberId, appid):
    """Return ``(sign, random_int, timestamp)`` for an HXEK request.

    ``timestamp`` is the current time in the machine's local time zone,
    formatted ``YYYY-MM-DD HH:MM:SS``; ``random_int`` is a random 7-digit
    integer. The sign is the MD5 hex digest of the concatenated
    timestamp / transId / secret / random / memberId fields.
    """
    secret = 'damogic8888'
    # Without an explicit time tuple, strftime formats the current local time.
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    random_int = random.randint(1000000, 9999999)
    raw_string = (
        f"timestamp={timestamp}"
        f"transId={appid}{timestamp}"
        f"secret={secret}"
        f"random={random_int}"
        f"memberId={memberId}"
    )
    sign = hashlib.md5(raw_string.encode()).hexdigest()
    return sign, random_int, timestamp
def KPL_SIGN(url, params):
    """Return the ``X-TGATV-*`` signature headers for a KPL request.

    The signature is the SHA-256 hex digest of: the URL with the
    ``https://app.tv.kohesport.qq.com`` prefix removed, then the parameters
    (dicts are serialised as compact JSON, anything else is used as given),
    then the secret key.
    """
    secret_key = "d19b9f22f5aac41ac0b56a1947f82bce"
    url_path = url.replace("https://app.tv.kohesport.qq.com", "")
    params_str = json.dumps(params, separators=(',', ':')) if isinstance(params, dict) else params
    digest = hashlib.sha256(f"{url_path}{params_str}{secret_key}".encode('utf-8'))
    return {
        "X-TGATV-SignatureMethod": "sha256",
        "X-TGATV-SignatureVersion": "3",
        "X-TGATV-Signature": digest.hexdigest(),
    }
def get_ip():
    """Pick a random proxy whose country is ``CN`` from the public proxy list.

    The list is fetched as text with one JSON object per line.

    Returns:
        dict: ``{type: "type://host:port"}`` for use as a requests
        ``proxies`` mapping, or None when no usable CN entry exists.

    Raises:
        requests.exceptions.RequestException: If the list cannot be fetched.
    """
    # Bounded wait so a stalled CDN cannot hang the caller.
    response = requests.get('https://cdn.jsdelivr.net/gh/parserpp/ip_ports/proxyinfo.json', verify=False,
                            timeout=10)
    candidates = []
    for line in response.text.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            entry = json.loads(line)  # Parsed once per line.
        except json.JSONDecodeError:
            continue  # Skip malformed lines instead of failing the lookup.
        if not isinstance(entry, dict) or entry.get("country") != "CN":
            continue
        if all(field in entry for field in ('type', 'host', 'port')):
            candidates.append(entry)
    if not candidates:
        print("没匹配到CN的ip")
        return None
    selected = random.choice(candidates)
    result = f"{selected['type']}://{selected['host']}:{selected['port']}"
    print(f"当前代理:{result}")
    return {selected['type']: result}
def main(APP_NAME, local_script_name, ENV_NAME, local_version, need_invite=False):
    """Run the shared start-up sequence for one script.

    Loads the remote configuration, checks this script for updates, refuses
    to continue when the script is not enabled (unless in development mode),
    and collects the notice texts and the script's environment variable.

    Args:
        APP_NAME: Display name of the calling script.
        local_script_name: File name of the calling script, used for updates.
        ENV_NAME: Name of the script's environment variable; also its key in
            the remote ``APP_CONFIG``.
        local_version: Version string of the calling script.
        need_invite: When true, the author's invite codes are fetched.

    Returns:
        tuple: ``(ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode)``; ``ENV`` is
        None when the environment variable is unset.

    Raises:
        SystemExit: If the configuration cannot be loaded or the script is
        not enabled.
    """
    global APP_INFO, TIPS, TIPS_HTML
    git_url = f'https://github.com/CHERWING/CHERWIN_SCRIPTS/raw/main/{local_script_name}'
    if not CHECK():
        # sys.exit rather than the site-provided exit(), which is meant for
        # interactive use and is not guaranteed to exist.
        sys.exit()
    APP_INFO = CHERWIN_SCRIPT_CONFIG.get("APP_CONFIG", {}).get(ENV_NAME, {})
    server_version = APP_INFO.get('NEW_VERSION', '')
    if CHECK_UPDATE_NEW(local_version, server_version, git_url, local_script_name, APP_NAME=APP_NAME):
        print('更新成功,请重新运行脚本!')
    if not APP_INFO.get('ENABLE', False) and not IS_DEV:
        print('当前脚本未开放')
        sys.exit()
    # A per-app notice takes precedence over the global notices.
    TIPS = APP_INFO.get('NTC', '') if APP_INFO.get('NTC', '') else CHERWIN_SCRIPT_CONFIG.get('GLOBAL_NTC', '')
    TIPS_HTML = APP_INFO.get('NTC', '') if APP_INFO.get('NTC', '') else CHERWIN_SCRIPT_CONFIG.get('GLOBAL_NTC_HTML', '')
    ENV = os.environ.get(ENV_NAME)
    if need_invite:
        AuthorCode = get_AuthorInviteCode(f'https://yhsh.ziyuand.cn/{ENV_NAME}_INVITE_CODE.json')
    else:
        AuthorCode = ''
    return ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode
# When run directly rather than imported, just print the tools version.
if __name__ == '__main__':
    print(NOW_TOOLS_VERSION)