-
Notifications
You must be signed in to change notification settings - Fork 0
/
tiktok.py
204 lines (192 loc) · 7.26 KB
/
tiktok.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import argparse
import requests
import pandas as pd
from TikTokApi import TikTokApi
import random
import pathlib
import numpy as np
from datetime import datetime, date
# Command-line interface. Every option is mandatory: the script needs a hashtag
# list, a per-hashtag quota, a metadata CSV path and a video directory, so a
# missing one is rejected up front with a usage message instead of surfacing
# later as an obscure TypeError/AttributeError.
parser = argparse.ArgumentParser(
    description="Download TikTok videos by hashtag and record their metadata in a CSV file.")
parser.add_argument("--hashtags", type=str, required=True,
                    help="Search hashtags, can be one word or multiple words separated by comma")
parser.add_argument("--count", type=int, required=True,
                    help="number of videos to download per hashtag")
parser.add_argument("--data_file", type=str, required=True,
                    help="Path to the metadata csv file")
parser.add_argument("--data_dir", type=str, required=True,
                    help="Path to the videos directory")
# Columns of the metadata CSV, in output order.
headers = [
    # how the video was found
    'search_keyword', 'language',
    # the video itself
    'video_id', 'video_timestamp', 'video_duration',
    # engagement counters
    'video_diggcount', 'video_sharecount', 'video_commentcount', 'video_playcount',
    # textual content and flags
    'video_description', 'video_hashtags', 'video_is_ad', 'video_stickers',
    # author identity
    'author_username', 'author_name', 'author_verified',
    # author counters
    'author_followercount', 'author_followingcount', 'author_heartcount',
    'author_videocount', 'author_diggcount',
    # bookkeeping
    'download_date',
]
def tag_search(keyword, offset=0, timeout=30):
    """
    Query the TikTok full-item search endpoint for one keyword.

    :param keyword: the search keyword/hashtag
    :param offset: number of videos to skip from the top videos list
    :param timeout: seconds to wait for the server before giving up
    :return: the decoded JSON body of the response
    :raises requests.RequestException: on connection failure, timeout or an
        HTTP error status
    :raises ValueError: if the response body is not valid JSON
    """
    params = {
        'keyword': keyword,
        'offset': offset,
    }
    # Placeholder credentials: both values must be replaced with real ones
    # before the endpoint will answer.
    cookies = {
        'ttwid': 'ttwid',  # replace with actual ttwid
        'sessionid': 'sessionid',  # replace with actual TikTok sessionid
    }
    # HTTPS so the session cookie is never sent in clear text; the explicit
    # timeout keeps a stalled connection from hanging the whole run.
    response = requests.get(
        "https://us.tiktok.com/api/search/item/full/",
        params=params,
        cookies=cookies,
        timeout=timeout,
    )
    # Surface 4xx/5xx as an exception instead of trying to parse an error page.
    response.raise_for_status()
    return response.json()
def _dig(obj, *keys, default=''):
    """Follow *keys* through nested containers; return *default* if any step is missing."""
    for key in keys:
        try:
            obj = obj[key]
        except (KeyError, IndexError, TypeError):
            return default
    return obj


def _listing(obj, key):
    """Return ``obj[key]`` when it is a list/tuple, otherwise an empty list."""
    value = _dig(obj, key, default=None)
    return value if isinstance(value, (list, tuple)) else []


def _format_timestamp(raw):
    """Render an epoch-seconds value as 'YYYY-MM-DD HH:MM:SS' (local time), or '' if unusable."""
    try:
        return datetime.fromtimestamp(int(raw)).strftime('%Y-%m-%d %H:%M:%S')
    except (TypeError, ValueError, OverflowError, OSError):
        return ''


def get_data_row(video_id, hashtag):
    """
    Flatten the metadata of one video into a single-row dataframe.

    Missing fields never abort the extraction: text-like columns fall back to
    an empty string and numeric columns to NaN. Only the video ``id`` is
    mandatory.

    :param video_id: metadata object of one video (despite the name, this is
        the whole metadata mapping, not just the id)
    :param hashtag: hashtag used to get this video
    :return: a dataframe row contains the video metadata
    :raises KeyError: if the metadata object has no ``id`` entry
    """
    author = _dig(video_id, 'author', default=None)

    # 'author' is either a mapping holding 'uniqueId' or the username itself.
    username = _dig(author, 'uniqueId', default=None)
    if username is None:
        username = author if isinstance(author, str) else ''

    # The nickname may sit inside the author mapping or at the top level.
    nickname = _dig(author, 'nickname', default=None)
    if nickname is None:
        nickname = _dig(video_id, 'nickname')

    # Entries without a hashtag name are skipped so they neither produce a
    # bare '#' nor wipe out the other hashtags of the video.
    tag_names = [_dig(entry, 'hashtagName') for entry in _listing(video_id, 'textExtra')]
    hashtags = ','.join(f"#{name}" for name in tag_names if name)

    sticker_texts = [
        str(text)
        for sticker in _listing(video_id, 'stickersOnItem')
        for text in _listing(sticker, 'stickerText')
    ]

    # Each value is bound to its column by name, so a missing field can never
    # shift the remaining values into the wrong columns. Keys and their order
    # mirror the columns of the metadata CSV.
    row = {
        'search_keyword': hashtag,
        'language': '',  # always written empty by this function
        'video_id': str(video_id['id']),
        'video_timestamp': _format_timestamp(_dig(video_id, 'createTime', default=None)),
        'video_duration': _dig(video_id, 'video', 'duration', default=np.nan),
        'video_diggcount': _dig(video_id, 'stats', 'diggCount', default=np.nan),
        'video_sharecount': _dig(video_id, 'stats', 'shareCount', default=np.nan),
        'video_commentcount': _dig(video_id, 'stats', 'commentCount', default=np.nan),
        'video_playcount': _dig(video_id, 'stats', 'playCount', default=np.nan),
        'video_description': _dig(video_id, 'desc'),
        'video_hashtags': hashtags,
        'video_is_ad': _dig(video_id, 'isAd'),
        'video_stickers': ';'.join(sticker_texts),
        'author_username': username,
        'author_name': nickname,
        'author_verified': _dig(author, 'verified'),
        'author_followercount': _dig(video_id, 'authorStats', 'followerCount', default=np.nan),
        'author_followingcount': _dig(video_id, 'authorStats', 'followingCount', default=np.nan),
        'author_heartcount': _dig(video_id, 'authorStats', 'heartCount', default=np.nan),
        'author_videocount': _dig(video_id, 'authorStats', 'videoCount', default=np.nan),
        'author_diggcount': _dig(video_id, 'authorStats', 'diggCount', default=np.nan),
        'download_date': date.today().strftime('%Y-%m-%d'),
    }
    # A list with one record keeps every value in a single cell of row 0.
    return pd.DataFrame([row], index=[0])
def _iter_new_videos(api, hashtag, count, data_dir, seen_ids):
    """
    Download videos found for one hashtag and yield their metadata rows.

    Rows are yielded one by one so that everything downloaded before a failure
    is kept by the caller.

    :param api: connected TikTokApi instance used to fetch the video bytes
    :param hashtag: hashtag to search for (without the leading '#')
    :param count: maximum number of new videos to download for this hashtag
    :param data_dir: directory (pathlib.Path) the .mp4 files are written to
    :param seen_ids: set of video ids (as strings) already recorded; updated
        in place with every video downloaded here
    :return: generator of single-row dataframes, one per downloaded video
    """
    downloaded = 0
    offset = 0
    listed = set()  # ids already returned by the search during this call
    while downloaded < count:
        res_json = tag_search(hashtag, offset)  # use the search function to retrieve videos metadata
        items = res_json['item_list']
        # Stop when the search returns nothing it has not returned before;
        # otherwise an exhausted or repeating result list would loop forever.
        fresh = [item for item in items if str(item['id']) not in listed]
        if not fresh:
            break
        listed.update(str(item['id']) for item in fresh)
        cnt = 0
        for item in fresh:
            if downloaded >= count:
                break
            video_id = str(item['id'])
            if video_id in seen_ids:  # skip videos that are already recorded
                continue
            try:
                video_data = api.video(id=item['id']).bytes()  # download video
                # Build the row before touching the disk so a metadata failure
                # cannot leave a video file without a matching CSV entry.
                video_row = get_data_row(item, f'#{hashtag}')
                (data_dir / f"{video_id}.mp4").write_bytes(video_data)
            except Exception as err:
                # Best effort: one broken video must not stop the whole run.
                print(f'Skipping video {video_id}: {err}')
                continue
            seen_ids.add(video_id)
            downloaded += 1
            cnt += 1
            yield video_row
        print(f'Downloaded {cnt} videos')
        # The offset counts search results to skip, so advance by the page
        # size rather than by the number of videos actually downloaded.
        offset += len(items)


def main():
    """
    Download new videos for every requested hashtag and update the metadata CSV.

    Reads the command-line options from the module-level ``parser``, skips
    videos whose id is already present in the metadata file, stores each new
    video as ``<data_dir>/<id>.mp4`` and writes the CSV back even when the run
    is interrupted.
    """
    args = parser.parse_args()  # parsing commandline arguments
    did = str(random.randint(10000, 999999999))  # generating a random number to use as a device id
    api = TikTokApi(custom_device_id=did)  # connect to the API

    data_file = pathlib.Path(args.data_file)  # path to the metadata file
    if data_file.exists():
        data_df = pd.read_csv(data_file)
        # Ids are compared as strings so a CSV round trip cannot break duplicate detection.
        data_df['video_id'] = data_df['video_id'].astype("string")
    else:
        data_df = pd.DataFrame(columns=headers)

    data_dir = pathlib.Path(args.data_dir)
    data_dir.mkdir(parents=True, exist_ok=True)  # make sure the videos directory exists

    # Build the lookup once; membership tests against a set are O(1).
    seen_ids = set(data_df['video_id'].dropna().astype(str))
    new_rows = []
    hashtags = [tag.strip() for tag in args.hashtags.split(',') if tag.strip()]

    try:
        for hashtag in hashtags:
            print(hashtag)
            try:
                # Explicit loop: rows received before an error stay in new_rows.
                for video_row in _iter_new_videos(api, hashtag, args.count, data_dir, seen_ids):
                    new_rows.append(video_row)
            except Exception as err:
                # Best effort per hashtag, but report the reason instead of hiding it.
                print(f'Stopped processing {hashtag}: {err}')
    finally:
        # Persist the metadata even on Ctrl-C or an unexpected crash, so the
        # videos already on disk stay described in the CSV.
        if new_rows:
            data_df = pd.concat([data_df, *new_rows], ignore_index=True)
        data_df.to_csv(data_file, index=False)  # write the metadata dataframe to a csv file


if __name__ == '__main__':
    main()