# DouyinUtils.py
# Forked from kekewind/Kaleidoscope.
import json
import os
import time
from selenium.webdriver.common.by import By
import MyUtils
# Pass the 'douyin' entry of the MyUtils settings to MyUtils.setrootpath.
MyUtils.setrootpath(MyUtils.getsettings('douyin'))
# NOTE(review): not referenced anywhere in this section; presumably an upper
# bound on the size of the download queue -- confirm against callers.
maxready=99
# File definitions: record stores opened once, at import time.
# Default target of addauthor(): records of the form {useruid: [author names]}.
allusers = MyUtils.RefreshJson('D:/Kaleidoscope/抖音/AllUsers.txt')
# Not referenced in this section.
specialusers = MyUtils.RefreshJson('D:/Kaleidoscope/抖音/SpecialUsers.txt')
# skiprecorded() looks pieces up in this store's `.d` mapping.
allpieces = MyUtils.RefreshJson('D:/Kaleidoscope/抖音/AllPieces.txt')
# Default download queue that load() appends to.
readytodownload = MyUtils.cache('D:/Kaleidoscope/抖音/ReadytoDownload.txt',silent=MyUtils.debug)
# TurnHostTab() adds the trailing URL segment of a host page here when that page raises.
exceptuser = MyUtils.rtxt('D:/Kaleidoscope/抖音/FailedUsers.txt')
# The four stores below are not referenced in this section.
failed = MyUtils.Json('D:/Kaleidoscope/抖音/FailedPieces.txt')
missing = MyUtils.rjson('D:/Kaleidoscope/抖音/Missing.txt')
expirepiecex=MyUtils.rjson(MyUtils.projectpath('./抖音/ExpiredPieces.txt'))
history=MyUtils.txt('D:/Kaleidoscope/抖音/History.txt')
def TurnHostTab(l, tab='作品'):
    """
    Switch the host (profile) page to the given tab and read the count shown
    next to the tab label.
    @param l: list whose first item is the page object
    @param tab: visible text of the tab to click
    @return: the count beside the tab as an int; False when it could not be
             obtained (no count element, or the page raised while clicking /
             reading, in which case the user is also recorded in exceptuser)
    """
    Page = l[0]
    try:
        Page.click(f'//span[text()="{tab}"]')
        psn = Page.element(f"//span[text()='{tab}']/following-sibling::span/text()", strict=False)
    except Exception:
        MyUtils.warn(f'发现用户异常。{Page.url()}')
        exceptuser.add(MyUtils.gettail(Page.url(), '/'))
        # Return the documented failure value here. Falling through would
        # reach int(psn) with psn never assigned (UnboundLocalError).
        return False
    if psn is None:
        return False
    # Convert before testing for zero so the "no works" warning also fires
    # when the count arrives as the text '0' rather than the int 0.
    count = int(psn)
    if count == 0:
        MyUtils.warn(f'用户无作品。{Page.url()}')
    return count
def HostPieces(l, tab='作品'):
    """
    Collect the piece links listed on a host (profile) page.
    @param l: list whose first item is the page object
    @param tab: tab to switch to before collecting (passed to TurnHostTab)
    @return: list of the piece urls that were collected
    """
    Page = l[0]
    psn = TurnHostTab(l, tab)
    ret = []

    def func(ret, l):
        # Accumulator callback handed to Page.Down.
        # NOTE(review): the original return line was syntactically broken (a
        # de-duplicating merge call with `set=True` had lost its callee). It
        # is rebuilt here as an order-preserving merge without duplicates --
        # confirm this matches what Page.Down expects from `func`.
        if ret is None:
            ret = []
        hrefs = l[0].elements('//div[contains(@data-e2e,"user-post-list") or contains(@data-e2e,"user-like-list")]//li//a/@href')
        for href in hrefs or ():
            if href not in ret:
                ret.append(href)
        return ret

    # If the count could not be read, only require a single piece.
    # `is False` rather than `== False`: a genuine count of 0 also compares
    # equal to False, which used to force psn to 1 and spin forever on a
    # profile with no pieces.
    if psn is False:
        psn = 1
    while psn and len(ret) < psn:
        MyUtils.warn(f'作品数量不匹配 {len(ret)}/{psn}')
        ret += Page.Down(start=Page.getscrollheight(), scale=400, pause=2, func=func)
        Page.click('//span[text()="刷新"]', strict=False)
    return ret
@MyUtils.consume
def piecetourlnum(l):
    """
    Read the link of a piece element and extract the piece number from it.
    @param l: list whose first item is the piece (anchor) element
    @return: (piece url, piece number), the number being the last path
             segment of the url with any query string removed
    """
    elementurl = l[0].get_attribute('href')
    # Cut the query string off before searching for the last '/'. Searching
    # the whole url picks up a '/' inside the query and gives an empty number.
    path = elementurl.partition('?')[0]
    VideoNum = path[path.rfind('/') + 1:]
    return (elementurl, VideoNum)
def IsPic(l):
    """
    Tell whether a piece element is an image post or a video.
    The caller has to dismiss the QR-code page first.
    @param l: list whose first item is the piece element
    @return: True for an image post ('图文'), False for a video
    """
    elements = MyUtils.Edge.elements(None, './div/div[3]/div', root=l[0], strict=False)
    # A single image-post marker is enough: either the literal label text or
    # an <svg> inside the tag (image posts appear to carry their marker in an svg).
    for el in elements:
        if el.text == '图文' or MyUtils.Element([el, By.XPATH, './/svg'], depth=9, silent=True) is not None:
            MyUtils.delog('图文')
            return True
    # NOTE: tags whose span text is '置顶' or '共创' are not treated specially.
    MyUtils.delog('视频')
    return False
def HostPiecesNum(l):
    """
    Read the number of pieces shown on a host (profile) page.
    @param l: list whose first item is the page object
    @return: the count as an int; 0 (after a warning) when neither known
             location yields an element
    """
    page = l[0]
    MyUtils.setscrolltop([page, 0])
    time.sleep(0.2)
    # Two known locations of the counter; the first one found is used.
    l1 = MyUtils.Element([page, By.XPATH, '//h2/span[2]'], depth=9, silent=True)
    l2 = MyUtils.Element([page, By.XPATH, '/html/body/div[1]/div/div[2]/div/div/div/div[2]/div[1]/div[2]/div/div/div[1]/span[2]'], depth=9, silent=True)
    ret = 0
    if l1 is not None:
        ret = int(l1.text)
    elif l2 is not None:
        ret = int(l2.text)
    else:
        MyUtils.warn(f'作品数量获取失败.。{l1, l2}')
    MyUtils.delog(f'作品数量:{ret}')
    return ret
def HostLikeNum(l):
    """
    Read the text of the likes counter on a host page and click that counter.
    @param l: list whose first item is the page object
    @return: the counter's text, read before the click
    """
    host = l[0]
    first_xpath = '/html/body/div[1]/div/div[2]/div/div/div[4]/div[1]/div[1]/div[2]/span'
    second_xpath = '/html/body/div[1]/div/div[2]/div/div/div/div[2]/div[1]/div[2]/div/div/div[2]/span[2]'
    # Both known locations are queried; matches from the first come first.
    candidates = MyUtils.Elements([host, By.XPATH, first_xpath], depth=9, silent=True)
    candidates = candidates + MyUtils.Elements([host, By.XPATH, second_xpath], depth=9, silent=True)
    counter = candidates[0]
    shown = counter.text
    counter.click()
    return shown
# Add a record
def addauthor(useruid, author, users=allusers):
    """
    Record an author name for a user id.
    Adds {useruid: [author]} when the user id is unknown; otherwise adds a
    record with the author appended to the known names, unless the name is
    already recorded.
    @param useruid: user id; compared with the first key of every record
    @param author: author (display) name to record
    @param users: record store providing `.l`, `.add()` and `.path`
    """
    authors = None
    # No early exit: when several records match, the last one is used.
    for i in users.l:
        record = MyUtils.jsontodict(i)
        keys = list(record.keys())
        # An empty record is skipped instead of raising IndexError.
        if keys and keys[0] == useruid:
            authors = record[useruid]
    if authors is None:
        users.add({useruid: [author]})
        MyUtils.delog(f'添加了新用户在{users.path}中')
        return
    if author not in authors:
        users.add({useruid: authors + [author]})
        MyUtils.delog(f'添加了用户名称在{users.path}中')
def simplinfo(num, author, title):
    """
    Build the JSON text of one piece record.
    @param num: piece number, used (as a string) as the record key
    @param author: author name
    @param title: piece title
    @return: JSON string {num: {'disk': MyUtils.diskpath, 'author': ..., 'title': ...}},
             with non-ASCII characters kept as they are
    """
    details = {'disk': MyUtils.diskpath, 'author': author, 'title': title}
    record = {str(num): details}
    return json.dumps(record, ensure_ascii=False)
def load(l, videourl, author=None, readytoDownload=readytodownload, ispic=None, useruid=None):
    """
    Open a piece page and queue the piece for download.
    @param l: list whose first item is the page object
    @param videourl: url of the piece
    @param author: author name if already known; when None, author and
                   useruid are read from the piece page
    @param readytoDownload: queue that receives {"list": [num, author, title, urls, ispic]}
    @param ispic: True for an image post, False for a video; when None it is
                  derived from whether the page url contains 'note'
    @param useruid: user id if already known
    @return: (useruid, author) when a user id is known after queueing;
             (None, None) when the author link points to a live stream;
             None when the piece is already recorded or no user id is known
    """
    page = l[0]
    if skiprecorded(videourl):
        return None
    page.get(videourl)
    if author is None:
        userlink = page.element('//div[@data-e2e="user-info"]//a/@href')
        if 'live' in userlink:
            MyUtils.delog('直播')
            return None, None
        useruid = MyUtils.gettail(userlink, '/')
        author = page.element('//div[@data-e2e="user-info"]//a//span[not(text()="")]/text()')
    if ispic is None:
        ispic = 'note' in page.url()
    # Image posts expose <img> sources, videos the first <source> of the player.
    if ispic:
        VideoUrl = page.elements('//*[@id="root"]/div[1]/div[2]/div/main/div[1]/div[1]/div/div[2]/div/img/@src', depth=8)
    else:
        VideoUrl = page.elements('//xg-video-container/video/source[1]/@src', depth=8)
    num = MyUtils.gettail(page.url(), '/')
    title = MyUtils.rmtail(page.element("//title/text()"), ' - 抖音')
    readytoDownload.add({"list": [num, author, title, VideoUrl, ispic]})
    MyUtils.delog(f'下载队列 ({readytoDownload.length()})')
    if useruid is not None:
        return useruid, author
    return None
def dislike(l):
    """
    Click the like button of the current piece page to cancel the like,
    then wait three seconds.
    @param l: list whose first item is the page object
    """
    page = l[0]
    first_layout = '//*[@id="root"]/div[1]/div[2]/div/main/div[1]/div[2]/div/div[1]/div[1]'
    second_layout = '//*[@id="root"]/div[1]/div[2]/div/div/div[1]/div[3]/div/div[2]/div[1]/div[1]/div'
    # The button is looked up under two page layouts; the first match is clicked.
    buttons = page.elements(first_layout, strict=False)
    buttons += page.elements(second_layout, strict=False)
    page.click(buttons[0])
    MyUtils.delog('已取消喜欢')
    time.sleep(3)
def skiprecorded(videourl):
    """
    Check whether a piece is already present in the allpieces records.
    @param videourl: url of the piece
    @return: True (after logging a skip message) when recorded, else False
    """
    head, piece_id = MyUtils.splittail(videourl, '/')
    recorded = (head + piece_id) in allpieces.d.keys()
    if recorded:
        MyUtils.log(f'作品 {piece_id} 在记录中,跳过')
    return recorded
def skipdownloaded(flag, record, VideoNum, title, author,num=None):
    """
    Check whether a piece is already on disk and, if so, complete its record.
    @param flag: truthy for an image post (a folder of images), falsy for a video (.mp4)
    @param record: record store; receives simplinfo(...) when the piece is found
    @param VideoNum: piece number
    @param title: piece title
    @param author: author name, used as the sub-folder of './抖音/'
    @param num: number of images the post should contain
    @return: True when the piece was found on disk and recorded, else False
    """
    path = './抖音/' + author
    target = f'{path}/{VideoNum}_{title}'

    # Video: a single .mp4 file next to the image folders.
    if os.path.exists(f'{target}.mp4') and not flag:
        record.add(simplinfo(VideoNum, author, title))
        MyUtils.log(f' {MyUtils.standarlizedPath(path)}/{VideoNum}_{title}.mp4 已存在磁盘中,补全记录')
        return True
    if not flag:
        return False

    # Image post: complete only when the folder holds exactly `num` files.
    if len(MyUtils.listfile(target)) == num:
        record.add(simplinfo(VideoNum, author, title))
        MyUtils.log(f' {target} 共{num}张图片已存在磁盘中,补全记录')
        return True
    if not MyUtils.isemptydir(target):
        MyUtils.warn(f"未下载满:\n\t{target} {len(MyUtils.listfile(target))}/{num}")
    return False
def 滑块验证(l):
    """
    Slider verification: hand the captcha container to ``page.skip``.
    @param l: list whose first item is the page object
    """
    captcha_xpath = '//*[@id="captcha_container"]'
    l[0].skip(captcha_xpath, strict=False)
def 跳转验证(l):
    """
    Redirect verification: block until the page title no longer contains
    '验证码中间', polling every three seconds.
    @param l: list whose first item is the page object
    """
    browser = l[0]
    while True:
        if '验证码中间' not in browser.title():
            break
        MyUtils.sleep(3)
        MyUtils.log('等待页面跳转中...')
def 登录验证(l):
    """
    Login verification: click the close button of the account dialog
    (non-strict click).
    @param l: list whose first item is the page object
    """
    close_button = '//div[@class="dy-account-close"]'
    l[0].click(close_button, strict=False)
def skipverify(l):
    """
    Run every verification step in sequence: captcha image, account dialog
    close buttons, then wait for the redirect page to go away.
    @param l: list whose first item is the page object
    @return: None
    """
    browser = l[0]
    MyUtils.sleep(1)
    browser.skip('//*[@id="captcha-verify-image"]', strict=False)
    # Two variants of the close button; both are clicked non-strictly.
    close_buttons = (
        '//div[@class="dy-account-close"]',
        '//*[@id="login-pannel"]/div[@class="dy-account-close"]',
    )
    for xpath in close_buttons:
        browser.click(xpath, strict=False)
    # Poll every three seconds while the title still contains '验证码中间'.
    while True:
        if '验证码中间' not in browser.title():
            break
        MyUtils.sleep(3)
        MyUtils.log('等待页面跳转中...')
# Top-level statement: emits a tip at import time, before the definitions
# that follow it in the file have been executed.
MyUtils.tip('DouyinUtils loaded.')
def hostdata(l,tab='作品'):
    """
    Gather the author name and the piece urls from a host (profile) page.
    @param l: list whose first item is the host page object
    @param tab: tab whose pieces are collected (passed on to HostPieces)
    @return: (author, piece urls); author is the page title without the
             trailing '的主页'
    """
    host_page = l[0]
    # Close the login dialog before reading anything from the page.
    登录验证([host_page])
    page_title = host_page.title()
    author = MyUtils.rmtail(page_title, '的主页')
    pieces = HostPieces([host_page], tab=tab)
    return author, pieces
def piecepagedata(l):
    """
    Read the title and kind of a piece from its page.
    @param l: list whose first item is the piece page object
    @return: (title, ispic); title is the page title without the trailing
             ' - 抖音', ispic is True when the page url contains 'note'
    """
    piece_page = l[0]
    raw_title = piece_page.title()
    return MyUtils.rmtail(raw_title, ' - 抖音'), 'note' in piece_page.url()