-
Notifications
You must be signed in to change notification settings - Fork 0
/
tools.py
710 lines (658 loc) · 26.4 KB
/
tools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
# -*- coding: utf-8 -*-
from base64 import b64decode, b64encode
try:
from chardet import detect
except ImportError:
detect = None
from hashlib import md5 as _md5
from importlib import import_module, reload as _reload
from json import loads as json_loads
import os
import re
from smtplib import SMTP_SSL
from time import time
try:
from subprocess import call
except ImportError:
call = None
import sys
from email.mime.text import MIMEText
from email.utils import parseaddr, formataddr
from email.header import Header
try:
from win32clipboard import OpenClipboard, GetClipboardData, CloseClipboard, EmptyClipboard, SetClipboardText
except ImportError:
CloseClipboard = None
try:
from lxml.etree import HTML as etree_HTML
except ImportError:
etree_HTML = None
try:
from PIL.Image import open as Image_open
except ImportError:
Image_open = None
try:
import requests as session
from requests.exceptions import ConnectTimeout
except ImportError:
session = None
def _restore(link) -> str:
bytes_ = b64decode(link)
assert detect, "Please install the `chardet` module."
try:
str_ = bytes_.decode(detect(bytes_)["encoding"])
except (TypeError, AssertionError):
try:
str_ = bytes_.decode("utf8")
except UnicodeDecodeError:
str_ = bytes_.decode("gbk")
return str_
############################################################
# allow
# allow([_type, ]_url)
def allow(*_str) -> bool:
    """allow([_type, ]_url) -> bool

    Check the site's robots.txt and report whether crawler `_type`
    (default "*") may fetch `_url`. Arguments may be given in any order:
    the one that looks like an http(s) URL is the URL, the other is the
    crawler name. Defaults to True when no rules apply.

    Fix: the original `while _url == "":` spun forever when no argument
    contained a URL; now that case fails fast with ValueError.
    """
    _url = ""
    _type = "*"
    regcom = re.compile(r"https?(://[^/]*)", re.I)
    for arg in _str:
        if regcom.search(arg):
            _url = arg
        else:
            _type = arg.upper()
    if _url == "":
        raise ValueError("allow(): no http(s) URL found in arguments")
    _list = _robots(_url)
    if _list:
        for entry in _list:
            # entry[0] is the crawler name or class (upper-cased)
            if entry[0] == _type or entry[0] == "*":
                for rule in entry[1]:  # rule like ['Disallow', '.*?/s\\?.*']
                    if re.match(rule[1], _url, re.I):
                        # robots.txt semantics: the first matching rule decides.
                        return rule[0].upper() != "DISALLOW"
    return True  # default: crawling allowed
# Build the robots.txt link, trying http and then https
def _robots(_url):
    """Return the parsed robots rules for `_url`'s host, or False.

    Tries `http://<host>/robots.txt` first, then the https variant.
    """
    host_part = re.search(r"https?(://[^/]*)", _url)[1]
    for scheme in ("http", "https"):
        rules = robots_(f"{scheme}{host_part}/robots.txt")
        if rules:
            return rules
    return False
# _
def robots_(url):
    """Fetch (with a one-day file cache under ./cache/) and parse a robots.txt URL.

    Returns a list of records, one per User-agent group:
        [AGENT_NAME_UPPER, [[rule, regex], ...], [crawl_delay, ...]]
    where `rule` is "Allow"/"Disallow" and `regex` is the rule's path
    translated to a Python regular expression ("*" becomes ".*", a trailing
    "$" stays an anchor, other regex metacharacters are escaped).
    Returns False when the file could not be fetched.

    robots.txt semantics for callers: every URL is Allow by default; rule
    order matters — the first matching Allow/Disallow line decides; an
    empty robots.txt means the whole site is open.

    Fixes: a rule line appearing before any User-agent line no longer
    raises IndexError; the bare `except:` is narrowed; assert message typo.
    """
    assert session, "Please install the `requests` module."
    cache = f"./cache/{md5(url)}"
    if not os.path.exists('./cache/'):
        os.makedirs('./cache/')
    # Expire the cached copy after one day.
    if os.path.exists(cache) and int(time() - os.stat(cache).st_mtime) / 86400 >= 1:
        os.remove(cache)
    status_code = 200
    if not os.path.exists(cache):
        status_code = "error"
        text = ""
        try:
            r = session.get(url, timeout=5, verify=False)
            status_code = r.status_code
            text = r.text
        except Exception:
            # Network failure: cache an empty file (best effort, as before).
            pass
        with open(cache, "w", encoding="utf8") as f:
            f.write(text)
    else:
        with open(cache, "r", encoding="utf8") as f:
            text = f.read()
    if status_code != 200:
        return False
    result = []
    reg_agent = re.compile(r"^User-agent:\s*(.*)", re.I)
    reg_rule = re.compile(r"^((Dis)?allow):\s*(.*)", re.I)
    reg_delay = re.compile(r"^Crawl-delay:\s*(.*)", re.I)
    reg_escape = re.compile(r"([\{\}\[\]\\\^\.\?\+])")  # metachars to escape
    reg_star = re.compile(r"(\*)")
    reg_anchor = re.compile(r"(\$$)")
    for line in re.split(r"\n", text):
        if line == "":  # blank line
            continue
        m = reg_agent.search(line)
        if m:
            # New group: [agent name (upper), [rules], [crawl delays]]
            result.append([m[1].upper(), [], []])
            continue
        m = reg_rule.search(line)
        if m:
            if not result:
                continue  # rule before any User-agent line: ignore it
            pattern = ".*?" + reg_star.sub(r".*", reg_escape.sub(r"\\\1", m[3]))
            if not reg_anchor.search(line):
                pattern += ".*"
            result[-1][1].append([m[1], pattern])
            continue
        m = reg_delay.search(line)
        if m and result:
            result[-1][2].append(m[1])
    return result
# allow
############################################################
def chapterNum(aForHtml: str) -> str:
    """Normalize a Chinese chapter heading, e.g. "第三十二章" -> "第32章".

    Finds the first "第<numeral>章/节/回/部分" heading in `aForHtml`,
    converts its Chinese/Roman numerals to Arabic digits, and substitutes
    the digits back into the string. Returns the input unchanged when no
    heading is found.
    """
    num = 0
    temp = str(aForHtml)
    # Group 1: the numeral part; group 2: the unit character (章/节/回/部/分).
    tempNum = re.search(r"第([^0-9章节回(部分)]+)([章节回(部分)])", temp)
    if tempNum:
        b = tempNum[2]
        tempNum = tempNum[1]
        # Map single Chinese / Roman digit characters onto Arabic digits.
        tempNum = re.sub(r"[一壹Ⅰ]",r"1", tempNum)
        tempNum = re.sub(r"[二贰Ⅱ]",r"2", tempNum)
        tempNum = re.sub(r"[三叁Ⅲ]",r"3", tempNum)
        tempNum = re.sub(r"[四肆Ⅳ]",r"4", tempNum)
        tempNum = re.sub(r"[五伍Ⅴ]",r"5", tempNum)
        tempNum = re.sub(r"[六陆Ⅵ]",r"6", tempNum)
        tempNum = re.sub(r"[七柒Ⅶ]",r"7", tempNum)
        tempNum = re.sub(r"[八捌Ⅷ]",r"8", tempNum)
        tempNum = re.sub(r"[九玖Ⅸ]",r"9", tempNum)
        tempNum = re.sub(r"[零〇]",r"0", tempNum)
        if re.search(r"^\d+$", tempNum):
            # Purely positional digits (e.g. "三零二" -> "302"): substitute directly.
            aForHtml = re.sub(r"第[^0-9章节回(部分)]+([章节回(部分)])", "第"+tempNum+b, aForHtml)
        else:
            # Mixed value+unit numerals (e.g. "三十二" -> "3十2"): accumulate
            # the value token by token. Each token is a digit optionally
            # followed by a unit character.
            numList = re.findall(r"[1-9][^\d]|[^\d]|\d", tempNum)
            for i in numList:
                if re.search(r"亿", i):  # 亿 = 10^8 multiplier
                    if re.search(r"\d", i):
                        num = (num + int(re.search(r"\d", i)[0])) * 100000000
                    else:
                        num *= 100000000
                elif re.search(r"[万萬]", i):  # 万 = 10^4 multiplier
                    if re.search(r"\d", i):
                        num = (num + int(re.search(r"\d", i)[0])) * 10000
                    else:
                        num *= 10000
                elif re.search(r"[千仟]", i):  # thousands
                    num += int(re.search(r"\d", i)[0]) * 1000
                elif re.search(r"[百佰]", i):  # hundreds
                    num += int(re.search(r"\d", i)[0]) * 100
                elif re.search(r"[十什拾Ⅹ]", i):  # tens; bare 十 means 10
                    if re.search(r"\d", i):
                        num += int(re.search(r"\d", i)[0]) * 10
                    else:
                        num += 10
                elif re.search(r"^\d$", i):  # bare digit with no unit
                    if re.search(r"0", tempNum):
                        num += int(re.search(r"\d", i)[0])
                    else:
                        # NOTE(review): `temp` is reused here as an int — the
                        # count of trailing zeros of `num`; the bare digit is
                        # apparently placed at that magnitude. Verify intent.
                        temp = len(str(re.sub(r"[\s\S]*?(0+$)", r"\1", str(num)))) - 1
                        num += int(re.search(r"\d", i)[0]) * (10**temp)
            aForHtml = re.sub(r"第[^0-9章节回(部分)]+([章节回(部分)])", "第"+str(num)+b, aForHtml)
    return aForHtml
class Clipboard():
    """Minimal wrapper around the Windows clipboard (win32clipboard).

    Requires the `win32clipboard` module; every accessor first calls
    init() to recover from a stale handle left by a crashed process.
    """
    @property
    def data(self):
        # Read access: Clipboard().data
        return self.getData()
    @data.setter
    def data(self, value):
        # Write access: Clipboard().data = "xxx"
        self.setData(value)
    @staticmethod
    def init():
        """Ensure no stale clipboard handle is left open."""
        assert CloseClipboard, "Please install the `win32clipboard` module."
        try:
            # Close a handle possibly left open when a process died mid-use.
            CloseClipboard()
        except:
            pass
    @staticmethod
    def getData():
        """Return the current clipboard text. Usage: Clipboard.getData()"""
        Clipboard.init()
        OpenClipboard()
        data = GetClipboardData()
        CloseClipboard()
        return data
    @staticmethod
    def setData(data: str="") -> None:
        """Replace the clipboard content with `data`. Usage: Clipboard.setData()"""
        Clipboard.init()
        OpenClipboard()
        EmptyClipboard()
        SetClipboardText(data)
        CloseClipboard()
# download("http://www.baidu.com"[, path])
def download(*_str):
    """download(url[, dir[, new]]) -> absolute path of the saved file.

    Saves the body of `url` to a file named by the URL's MD5 plus its
    original extension. `dir` (optional) is a directory prefix; `new`
    (optional) forces a re-download even when the file exists.
    Returns False when called without a URL; raises Exception when the
    HTTP response status is not 200.

    Fix: the bare `except:` around the extension match is narrowed to the
    exact "no match" case.
    """
    if not _str:
        return False
    url = _str[0]
    # Derive the file extension from the last path segment of the URL.
    tail = re.sub(r".*//[^/]*", r"", url)
    tail = re.sub(r"[@\?].*", r"", tail)
    tail = re.sub(r".*/", r"", tail)
    ext_match = re.match(r".*?(\.[^\.]*)$", tail)
    _type = ext_match[1] if ext_match else ""
    prefix = _str[1] if len(_str) > 1 else ""
    fpath = prefix + md5(url) + _type
    new = _str[2] if len(_str) > 2 else False
    if new or not os.path.exists(fpath):
        assert session, "Please install the `requests` module."
        r = session.get(url, timeout=5)
        if r.status_code != 200:
            raise Exception(f"The page request failed, the response code is: {r.status_code}\n\r")
        with open(fpath, "wb+") as f:
            f.write(r.content)
        del r  # release the response body promptly
    return os.path.abspath(fpath)
def filter(url, path="blacklist.txt") -> str:
    """filter(url, path="blacklist.txt")

    Return "" when `url` matches any entry in the blacklist file (each
    line is tried both as a regular expression and as a plain substring),
    otherwise return `url` unchanged. A missing blacklist file is created
    empty.

    Fixes: blank lines are now skipped — previously `url.find("") > -1`
    was always true, so one empty line blacklisted every URL; the
    `return` inside `finally` (which silently swallowed exceptions) is
    replaced with straight-line checks.
    """
    try:
        with open(path, "r", encoding="utf8") as f:
            blacklist = f.readlines()
    except FileNotFoundError:
        # Create an empty blacklist so the next call finds the file.
        with open(path, "w", encoding="utf8"):
            pass
        blacklist = []
    for entry in blacklist:
        entry = entry.strip()
        if not entry:
            continue  # blank line: never a match
        try:
            if re.search(entry, url):
                return ""
        except re.error:
            pass  # not a valid regex; fall back to substring test
        if entry in url:
            return ""
    return url
def flashgetLinkGenerator(link_: str) -> str:
    """Wrap a plain URL as a FlashGet link: base64 of "[FLASHGET]url[FLASHGET]"."""
    payload = f"[FLASHGET]{link_}[FLASHGET]".encode("utf-8")
    return "flashget://" + b64encode(payload).decode("ascii")
def flashgetLinkRestore(link_: str):
    """Decode a FlashGet link back to the plain URL; None when malformed."""
    if not link_.startswith("flashget://") or len(link_) <= 11:
        print("`{}`不是快车链接!".format(link_))
        return None
    # Strip the scheme, base64-decode, then drop the [FLASHGET] wrappers.
    return _restore(link_[11:])[10:-10]
def getIP(url):
    """Resolve IPv4 addresses for `url`'s domain. Currently a stub.

    Always returns False: everything after the early `return` is
    unreachable example code left by the author, showing one possible
    approach (scraping ipaddress.com for the domain's A records).
    """
    # Implement yourself.
    return False
    # Example (dead code from here on):
    domain = len(re.findall(r"\.", url))
    if domain<1:
        return False
    assert session, "Please install the or `requests` module."
    # Reduce the hostname to its registrable domain.
    domain = re.search(r"[^\./:]+\.[^\.]+$", url)[0]
    try:
        # r = session.get("https://{}.ipaddress.com".format(url), timeout=5)
        r = session.get("https://ipaddress.com/website/{}".format(domain), timeout=5)
    except ConnectTimeout:
        return
    # Scrape every IPv4 linked from the page, de-duplicated in order.
    _ip = re.findall("""https://www.ipaddress.com/ipv4/([\d\.]+)""", r.text)
    ip = []
    for i in _ip:
        if i not in ip:
            ip.append(i)
    return ip if ip else False
class hosts:
    """hosts in Windows.

    Manages GitHub entries in the Windows hosts file: backs up the system
    file, rewrites a working copy, then copies it back and flushes DNS.
    Windows-only (uses `copy` and `ipconfig` via the shell).
    """
    def __init__(self, path: str=r"C:\WINDOWS\System32\drivers\etc\hosts"):
        # path: the system hosts file; a backup copy is made immediately.
        self.path = path
        self.backup = os.path.join(os.getcwd(), "hosts_backup")
        os.system(f"copy {self.path} {self.backup}")
        # new: the working copy that edits are written to before install.
        self.new = os.path.join(os.getcwd(), "hosts_new")
        # Collected from the web; not necessarily complete.
        self._github_ = ['avatars5.githubusercontent.com',
                         'avatars2.githubusercontent.com',
                         'avatars0.githubusercontent.com', 'github.blog',
                         'github-production-repository-file-5c1aeb.s3.amazonaws.com',
                         'gist.github.com', 'alive.github.com', 'github.community',
                         'github.global.ssl.fastly.net', 'api.github.com',
                         'pipelines.actions.githubusercontent.com',
                         'github.githubassets.com', 'favicons.githubusercontent.com',
                         'github.map.fastly.net', 'central.github.com',
                         'avatars3.githubusercontent.com',
                         'github-releases.githubusercontent.com',
                         'avatars1.githubusercontent.com', 'live.github.com',
                         'gist.githubusercontent.com', 'cloud.githubusercontent.com',
                         'github-cloud.s3.amazonaws.com', 'github.io',
                         'raw.githubusercontent.com', 'collector.github.com',
                         'objects.githubusercontent.com',
                         'user-images.githubusercontent.com',
                         'github-production-user-asset-6210df.s3.amazonaws.com',
                         'github.dev', 'github-com.s3.amazonaws.com', 'github.com',
                         'media.githubusercontent.com', 'codeload.github.com',
                         'camo.githubusercontent.com', 'collector.githubapp.com',
                         'github-production-release-asset-2e65be.s3.amazonaws.com',
                         'githubstatus.com', 'desktop.githubusercontent.com',
                         'avatars4.githubusercontent.com', 'assets-cdn.github.com',
                         'avatars.githubusercontent.com']
    def addGithub(self):
        """Append freshly-resolved GitHub IP lines to the hosts file."""
        self.removeGithub()
        os.system(f"copy {self.path} {self.backup}")
        os.system(f"copy {self.backup} {self.new}")
        with open(self.backup, "r", encoding="utf-8") as f:
            data = f.readlines()  # NOTE(review): read but never used here
        with open(self.new, "a+", encoding="utf-8") as f:
            f.write("# github\n")
            for value in self._github_:
                try:
                    ip_value = getIP(value)
                except AssertionError:  # page request failed
                    continue
                print(f"{ip_value} {value}")
                if ip_value:
                    for i in ip_value:
                        f.write(f"{i} {value}\n")
                    continue
        self.flushdns()
    def flushdns(self):
        """Install the working copy over the system hosts file and flush DNS."""
        assert call, "Please install the `subprocess` module."
        # Copy to the system hosts path.
        # NOTE(review): `copy` with a list arg plus shell=True is unusual on
        # Windows — confirm this actually succeeds (msg == 0).
        msg = call(["copy", self.new, self.path, "/y"],shell=True)
        if msg==0:
            os.remove(self.new)
            call(["ipconfig", "/flushdns"],shell=True)  # flush the DNS cache
        else:
            # Fall back to manual instructions when the copy failed.
            print(f"请手动将 {self.new} 复制到 {self.path}")
            print("ipconfig /flushdns")
    def removeGithub(self):
        """Rewrite the hosts file with all GitHub IPv4 entries dropped."""
        with open(self.backup, "r", encoding="utf-8") as f:
            data = f.readlines()
        with open(self.new, "w+", encoding="utf-8") as f:  # truncate the file
            pass
        _all_ = []
        with open(self.new, "a+", encoding="utf-8") as f:
            for i in data:
                if "github" in i:
                    if re.search(r"^#", i): continue  # comment line
                    # NOTE(review): raises TypeError when a github line has no
                    # "<ipv4> <host>" shape (search() returns None) — confirm.
                    value = re.search(r"\d+\.\d+\.\d+\.\d+\s+([^\s]+)", i)[1]  # ipv4
                    if value: continue
                f.write(i)  # everything else is kept
        self.flushdns()
    def updateGithub(self):
        """Re-resolve every GitHub entry in the hosts file and rewrite it."""
        with open(self.backup, "r", encoding="utf-8") as f:
            data = f.readlines()
        with open(self.new, "w+", encoding="utf-8") as f:  # truncate the file
            pass
        _all_ = []  # hosts already re-resolved (dedup)
        with open(self.new, "a+", encoding="utf-8") as f:
            for i in data:
                if "github" in i:
                    if re.search(r"^#", i): continue  # comment line
                    value = re.search(r"\d+\.\d+\.\d+\.\d+\s+([^\s]+)", i)[1]  # ipv4
                    if value in _all_: continue
                    _all_.append(value)
                    try:
                        ip_value = getIP(value)
                    except AssertionError:  # page request failed
                        f.write(i)  # keep the old line unchanged
                        print(str(i))
                        continue
                    print(f"{ip_value} {value}")
                    if ip_value:
                        for j in ip_value:
                            f.write(f"{j} {value}\n")
                        continue
                f.write(i)  # everything else is kept
        self.flushdns()
class image:
    @staticmethod
    def compress(img: str, out: str="", out_size: int=150, step: int=10, quality: int=80):
        """Compress `img` to at most `out_size` KB, keeping its aspect ratio.

        img: source file path. out: target path (default: "<name>_compress<ext>").
        out_size: target size in KB. step / quality: initial JPEG quality and
        the amount it is lowered per attempt.

        NOTE(review): the return shape is inconsistent — returns the input
        path `img` (a str) when the file is already small enough, but a
        tuple `(out, img_size)` otherwise. Callers must handle both.
        """
        assert Image_open, "Please install the `PIL` module."
        img_size = os.path.getsize(img) / 1024  # size in KB
        if img_size <= out_size:
            return img
        name, suffix = os.path.splitext(img)
        if not out:
            out = f"{name}_compress{suffix}"
        im = Image_open(img)
        if re.search("jpe?g", os.path.splitext(out)[1], re.I):
            im = im.convert("RGB")  # fix `OSError: cannot write mode RGBA as JPEG`
        # Re-save at decreasing quality until the file is small enough.
        while img_size > out_size:
            im.save(out, quality=quality)
            img_size = os.path.getsize(out) / 1024
            if quality - step < 0:
                # Cannot step down whole-size: halve the step, or give up
                # once both step and quality are exhausted.
                if step < 1 or quality <= 0: break
                step /= 2  # adjust the step dynamically
            quality -= step
        return out, img_size
def linkConverter(link_) -> dict:
    """Normalize a download link and produce every supported variant.

    Accepts a plain URL or a flashget://, qqdl://, thunder:// or
    thunderx:// link and returns {"real", "flashget", "qqdl", "thunder"}.
    """
    if link_.startswith("flashget://"):
        real = flashgetLinkRestore(link_)
    elif link_.startswith("qqdl://"):
        real = qqdlLinkRestore(link_)
    elif link_.startswith(("thunder://", "thunderx://")):
        real = thunderLinkRestore(link_)
    else:
        real = link_
    return {
        "real": real,
        "flashget": flashgetLinkGenerator(real),
        "qqdl": qqdlLinkGenerator(real),
        "thunder": thunderLinkGenerator(real),
    }
# md5(str[, encoding]) or md5(bytes) or md5(int)
def md5(*_str):
    """md5(str[, encoding]) or md5(bytes) or md5(int) -> hex digest.

    Non-str arguments are stringified via str() before hashing. An unknown
    encoding name falls back to utf-8. When called without arguments,
    prints a message and returns False.
    """
    if not _str:
        print("缺少参数!")
        return False
    text = _str[0]
    if not isinstance(text, str):
        text = str(text)
    encoding = _str[1] if len(_str) > 1 else "utf-8"
    hasher = _md5()
    try:
        payload = text.encode(encoding)
    except LookupError:
        payload = text.encode("utf-8")  # unknown codec: fall back
    hasher.update(payload)
    return hasher.hexdigest()
# tools.meiriyiwen().print()
def meiriyiwen(fdir="./cache/", new=False):
    """Fetch today's essay from meiriyiwen.com.

    fdir: cache directory. new: force a fresh request instead of reusing
    the cached page. Returns an object with .title, .author, .content and
    a .print() helper. Usage: tools.meiriyiwen().print()
    """
    class SimpleArticle:
        def __init__(self):
            self.title = ""
            self.author = ""
            self.content = ""
        def print(self, _print=True):
            result = "标题:{}\n\r作者:{}\n\r\n\r{}".format(self.title, self.author, self.content)
            if _print:
                print(result)
            return result
    article = SimpleArticle()
    if not os.path.exists(fdir):
        os.makedirs(fdir)
    page_path = download("https://meiriyiwen.com/", fdir, new)
    with open(page_path, "r", encoding="utf8") as fh:
        raw_html = fh.read()
    assert etree_HTML, "Please install the `lxml` module."
    doc = etree_HTML(raw_html)
    article.title = doc.xpath("//h1")[0].text
    article.author = doc.xpath("//p[@class='article_author']/span")[0].text
    for node in doc.xpath("//div[@class='article_text']//p"):
        try:
            if node.text.strip():
                article.content += " " + node.text.strip() + "\n\r"
        except Exception as err:  # todo: wait for the next failure to capture the page source
            return err + "\n\r\n\r" + str(raw_html)
    return article
def qqdlLinkGenerator(link_: str) -> str:
    """Wrap a plain URL as a QQ Xuanfeng link: base64 of the raw URL."""
    return "qqdl://" + b64encode(link_.encode("utf-8")).decode("ascii")
def qqdlLinkRestore(link_: str):
    """Decode a QQ Xuanfeng link back to the plain URL; None when malformed."""
    if not link_.startswith("qqdl://") or len(link_) <= 7:
        print("`{}`不是QQ旋风链接!".format(link_))
        return None
    return _restore(link_[7:])
# Dynamically load a module - equivalent to running `import {_module}` under {path}
def reload(_module, path=None, raise_=False):
    """Dynamically (re)load module `_module`, as if `import {_module}` ran under `path`.

    Experimental!
    - Inserts `path` at the *front* of the sys.path list (when given).
    - Imports via `importlib.import_module(".", _module)`.
    - Observed in testing: judging from `dir(module)`, classes/functions
      removed from the source are NOT dropped on reload; they remain.
    - Name clashes across sys.path entries resolve to the first matching
      path, as with a normal import (hence the front insertion). Tested
      only on a small sample, so stability is not guaranteed.
    - `importlib.reload(module_)` hot-reloads the module (changes take
      effect immediately while the program is running).
    - _module: name of the module to import
    - path: directory containing the module (optional)
    - raise_: re-raise on import failure (optional)

    NOTE(review): on a *successful* import, `path` stays inserted in
    sys.path — it is only restored on ImportError. Confirm whether that
    is intended.
    """
    if path:
        sys_path_temp = list(sys.path)
        sys.path.insert(0, path)
    try:
        module_ = import_module(".", _module)
        _module_ = _reload(module_)
        return _module_
    except ImportError as err:
        if path:
            sys.path = sys_path_temp  # undo the sys.path change on failure
        if raise_:
            raise err
        return None
############################################################
# sendmail
def _sendmail(account, from_name, to, subject, content) -> bool:
    """Send a plain-text mail over SMTP_SSL. Returns True on success.

    account: dict with "name", "password", "smtp_host", "smtp_port".
    Best-effort: failures are printed and reported as False, not raised.

    Fix: removed the no-op `addr if isinstance(addr, str) else addr`
    ternary (a leftover from a Python 2 unicode check).
    """
    def _format_addr(s):
        # RFC 2047-encode the display name; the address part stays as-is.
        name, addr = parseaddr(s)
        return formataddr((Header(name, "utf-8").encode(), addr))
    msg = MIMEText(content, "plain", "utf-8")
    msg["From"] = _format_addr("{} <{}>".format(from_name, account["name"]))
    msg["To"] = to
    msg["Subject"] = subject
    try:
        em = SMTP_SSL(account["smtp_host"], account["smtp_port"])
        em.login(account["name"], account["password"])
        em.sendmail(msg["From"], msg["To"], msg.as_string())
        em.quit()
        print("Success!")
        return True
    except Exception as err:
        print("Failed!\n{}".format(err))
        return False
def sendmail(username: str, password: str, smtp_host: str, smtp_port: int,
             from_name: str, send_to: str, subject: str="主题", content="内容") -> bool:
    """Validate and normalize the credentials, then delegate to _sendmail().

    Returns False (after printing "err") when `smtp_port` contains any
    non-digit character.
    """
    port_is_numeric = re.sub(r"[^\d]", r"", str(smtp_port)) == str(smtp_port)
    account = {
        "name": username.strip(),
        "password": password.strip(),
        "smtp_host": smtp_host.strip(),
        "smtp_port": smtp_port if port_is_numeric else 0,
    }
    if account["smtp_port"] == 0:
        print("err")
        return False
    return _sendmail(account, from_name, send_to, subject, content)
# sendmail
############################################################
# Text-to-speech; verified working on Windows 10
def text2Speech(text) -> None:
    """Speak `text` aloud.

    Prefers the Microsoft Speech API (via pywin32); falls back to the
    legacy `speech` package, auto-patching its Python-2 syntax in
    site-packages if needed (a .bak backup is written first).
    """
    try:
        import win32com.client # pip install pypiwin32
        # Microsoft Speech API
        speak = win32com.client.Dispatch("SAPI.SpVoice")
        speak.Speak(text)
    except ImportError:
        # (original author's note) the speech-recognition interface did not
        # work on their machine for unknown reasons
        try:
            import speech
            speech.say(text)
        except SyntaxError:
            # The `speech` package ships Python-2 syntax; patch it in place.
            print("autofix: start")
            import os, sys
            for i in sys.path:
                if i.endswith("lib\\site-packages"):
                    path = os.path.join(i, "speech.py")
                    path_bak = os.path.join(i, "speech.py.bak")
                    if os.path.exists(path):
                        with open(path, "r", encoding="utf8") as f:
                            data = f.read()
                        # Back up the original before rewriting it.
                        with open(path_bak, "w", encoding="utf8") as f:
                            f.write(data)
                        print(f"backup: {path} => {path_bak}")
                        # Python ships a `2to3` tool, but the changes needed
                        # here are tiny, so direct replacement works too.
                        data = data.replace("import thread", "import _thread")\
                            .replace("print prompt", "print(prompt)")\
                            .replace("thread.start_new_thread(loop, ())",
                                     "_thread.start_new_thread(loop, ())")
                        with open(path, "w", encoding="utf8") as f:
                            f.write(data)
            print("autofix: end")
            import speech
            speech.say(text)
        except ImportError as err:
            raise err
def thunderLinkGenerator(link_: str) -> str:
    """Wrap a plain URL as a Thunder link: base64 of "AA" + url + "ZZ"."""
    encoded = b64encode(f"AA{link_}ZZ".encode("utf-8")).decode("ascii")
    return "thunder://" + encoded
# Thunder link restore
def thunderLinkRestore(link_: str):
    """Decode a thunder:// or thunderx:// link.

    thunder://  -> the plain URL (base64 of "AA<url>ZZ").
    thunderx:// -> either a plain URL, or a JSON task-group payload; for
    the latter, returns the parsed dict with each task reduced to
    {"path", "url"} ("path" is the url with excludePath stripped).
    Returns None (after printing) for non-thunder links.
    """
    payload = link_[link_.find(':')+3:]
    if not link_.startswith("thunder") or len(payload) == 0:
        print("`{}`不是迅雷链接!".format(link_))
        return None
    if not link_.startswith("thunderx"):
        return _restore(payload)[2:-2]
    # thunderx payload layout:
    # {
    #   "thunderInstallPack": "", "threadCount": "", "downloadDir": "",
    #   "installFile": "", "runParams": "", "taskGroupName": "",
    #   "excludePath": "",
    #   "tasks": [{"url": "", "originUrl": ""}, ...],
    #   "createShortcut": null
    # }
    if payload.find('{') < 0:
        print('注意:这个不是下载链接!!')
        return _restore(payload)[2:-2]
    null = None  # kept for parity with the original (JSON "null")
    link_json = json_loads(payload)
    converted = []
    for task in link_json['tasks']:
        real_url = thunderLinkRestore(task['originUrl'])
        # Preserve the folder structure relative to excludePath.
        converted.append({
            "path": real_url.replace(link_json['excludePath'], ''),
            "url": real_url
        })
    link_json['tasks'] = converted
    return link_json
if __name__ == "__main__":
    # Smoke test of selected features ("False" in the output marks a failure).
    print("部分功能测试(false表示存在异常)\n\r")
    # Save the clipboard, run the round-trip test, then restore it.
    temp = Clipboard.getData()
    Clipboard.setData("test")
    print("getClipboardData/setClipboardData: ", Clipboard.getData()=="test")
    Clipboard.setData(temp)  # try to restore the pre-test clipboard content
    # Expected encodings of the same payload "test" for each link flavor.
    verify = {
        "flashget": "flashget://W0ZMQVNIR0VUXXRlc3RbRkxBU0hHRVRd",
        "qqdl": "qqdl://dGVzdA==",
        "real": "test",
        "thunder": "thunder://QUF0ZXN0Wlo="
    }
    linkList = linkConverter(verify["real"])
    print("flashgetLinkGenerator: ", linkList["flashget"]==verify["flashget"])
    print("flashgetLinkRestore: ", flashgetLinkRestore(verify["flashget"])==verify["real"])
    print("qqdlLinkGenerator: ", linkList["qqdl"]==verify["qqdl"])
    print("qqdlLinkRestore: ", qqdlLinkRestore(verify["qqdl"])==verify["real"])
    print("thunderLinkGenerator: ", linkList["thunder"]==verify["thunder"])
    print("thunderLinkRestore: ", thunderLinkRestore(verify["thunder"])==verify["real"])
    os.system("pause")  # Windows: keep the console window open