tweep.py
#!/usr/bin/python3
from bs4 import BeautifulSoup
from time import gmtime, strftime
import argparse
import aiohttp
import asyncio
import async_timeout
import csv
import datetime
import json
import re
import sys
import dataset
async def getUrl(init):
    '''
    URL Decision:
    Tweep uses the positions of Tweets from Twitter's search feature to
    iterate through a user's Twitter feed. This section decides whether
    this is the first URL request or not and builds the URL from the
    args given.
    Returns the complete URL.
    '''
if init == -1:
url = "https://twitter.com/search?f=tweets&vertical=default&lang=en&q="
else:
url = "https://twitter.com/i/search/timeline?f=tweets&vertical=default"
url+= "&lang=en&include_available_features=1&include_entities=1&reset_"
url+= "error_state=false&src=typd&max_position={}&q=".format(init)
    if arg.u is not None:
        url+= "from%3A{0.u}".format(arg)
    if arg.g is not None:
        arg.g = arg.g.replace(" ", "")
        url+= "geocode%3A{0.g}".format(arg)
    if arg.s is not None:
        arg.s = arg.s.replace(" ", "%20").replace("#", "%23")
        url+= "%20{0.s}".format(arg)
    if arg.year is not None:
        url+= "%20until%3A{0.year}-1-1".format(arg)
    if arg.since is not None:
        url+= "%20since%3A{0.since}".format(arg)
if arg.fruit:
url+= "%20myspace.com%20OR%20last.fm%20OR"
url+= "%20mail%20OR%20email%20OR%20gmail%20OR%20e-mail"
url+= "%20OR%20phone%20OR%20call%20me%20OR%20text%20me"
url+= "%20OR%20keybase"
if arg.verified:
url+= "%20filter%3Averified"
return url
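# Illustrative only: with -u jack and -s hello (both made-up values), getUrl(-1)
# above would build roughly:
#   https://twitter.com/search?f=tweets&vertical=default&lang=en&q=from%3Ajack%20hello
# Later calls go through the /i/search/timeline endpoint instead, passing the last
# position ID as max_position so Twitter returns the next batch of results.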
async def fetch(session, url):
'''
Basic aiohttp request with a 30 second timeout.
'''
    async with async_timeout.timeout(30):
async with session.get(url) as response:
return await response.text()
async def initial(response):
'''
Initial response parsing and collecting the position ID
'''
soup = BeautifulSoup(response, "html.parser")
feed = soup.find_all("li", "js-stream-item")
init = "TWEET-{}-{}".format(feed[-1]["data-item-id"], feed[0]["data-item-id"])
return feed, init
async def cont(response):
    '''
    Regular JSON response parsing and collecting the position ID.
    '''
json_response = json.loads(response)
html = json_response["items_html"]
soup = BeautifulSoup(html, "html.parser")
feed = soup.find_all("li", "js-stream-item")
split = json_response["min_position"].split("-")
split[1] = feed[-1]["data-item-id"]
init = "-".join(split)
return feed, init
async def getFeed(init):
    '''
    Parsing Decision:
    Responses to requests made with a position ID are JSON,
    so this section decides whether this is an initial request
    or not and uses the appropriate response handling for parsing
    with BeautifulSoup4.
    Returns the HTML for the Tweets and the position ID.
    '''
async with aiohttp.ClientSession() as session:
response = await fetch(session, await getUrl(init))
feed = []
try:
if init == -1:
feed, init = await initial(response)
else:
feed, init = await cont(response)
    except Exception:
        # No more results: Tweep realizes that it's done scraping.
        pass
return feed, init
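# Illustrative only: the position ID assembled by initial()/cont() looks like
# "TWEET-<oldest data-item-id>-<newest data-item-id>" (the numeric IDs here are
# made up), e.g. "TWEET-946540000000000001-946549999999999999". It is fed back
# into getUrl() as max_position to page further down the search timeline.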
async def outTweet(tweet):
'''
Parsing Section:
This function will create the desired output string and
write it to a file or csv if specified.
Returns output.
'''
tweetid = tweet["data-item-id"]
# Formatting the date & time stamps just how I like it.
datestamp = tweet.find("a", "tweet-timestamp")["title"].rpartition(" - ")[-1]
d = datetime.datetime.strptime(datestamp, "%d %b %Y")
date = d.strftime("%Y-%m-%d")
timestamp = str(datetime.timedelta(seconds=int(tweet.find("span", "_timestamp")["data-time"]))).rpartition(", ")[-1]
t = datetime.datetime.strptime(timestamp, "%H:%M:%S")
time = t.strftime("%H:%M:%S")
# The @ in the username annoys me.
username = tweet.find("span", "username").text.replace("@", "")
timezone = strftime("%Z", gmtime())
# The context of the Tweet compressed into a single line.
text = tweet.find("p", "tweet-text").text.replace("\n", "").replace("http", " http").replace("pic.twitter", " pic.twitter")
# Regex for gathering hashtags
hashtags = ",".join(re.findall(r'(?i)\#\w+', text, flags=re.UNICODE))
replies = tweet.find("span", "ProfileTweet-action--reply u-hiddenVisually").find("span")["data-tweet-stat-count"]
retweets = tweet.find("span", "ProfileTweet-action--retweet u-hiddenVisually").find("span")["data-tweet-stat-count"]
likes = tweet.find("span", "ProfileTweet-action--favorite u-hiddenVisually").find("span")["data-tweet-stat-count"]
    '''
    This part tries to get a list of mentions.
    It sometimes gets slow with Tweets that contain
    40+ mentioned people. Rather than just appending
    the whole list to the Tweet, it goes through each
    one to make sure there aren't any duplicates.
    '''
try:
mentions = tweet.find("div", "js-original-tweet")["data-mentions"].split(" ")
for i in range(len(mentions)):
mention = "@{}".format(mentions[i])
if mention not in text:
text = "{} {}".format(mention, text)
    except Exception:
        pass
# Preparing to output
'''
There were certain cases where I used Tweep
to gather a list of users and then fed that
generated list into Tweep. That's why these
modes exist.
'''
if arg.users:
output = username
elif arg.tweets:
        output = text
else:
        '''
        The standard output is how I like it, although
        it can be modified to your liking. Uncomment
        the line below and add the variables in the
        order you want them or however you want it to look.
        '''
# output = ""
#output = "{} {} {} {} <{}> {}".format(tweetid, date, time, timezone, username, text)
output = "{}".format(text)
if arg.hashtags:
output+= " {}".format(hashtags)
if arg.stats:
output+= " | {} replies {} retweets {} likes".format(replies, retweets, likes)
# Output section
    if arg.o is not None:
if arg.csv:
# Write all variables scraped to CSV
dat = [tweetid, date, time, timezone, username, text, replies, retweets, likes, hashtags]
with open(arg.o, "a", newline='') as csv_file:
writer = csv.writer(csv_file, delimiter="|")
writer.writerow(dat)
elif arg.db:
# Write all variables scraped to a SQLite database
db = dataset.connect('sqlite:///' + arg.o)
if arg.users:
dat = {'query': arg.s, 'username': username}
db['users'].insert(dat)
else:
dat = {'tweetid': tweetid, 'date': date, 'time': time, 'timezone': timezone,
'username': username, 'text': text, 'replies': replies, 'retweets': retweets,
'likes': likes, 'hashtags': hashtags}
db['tweets'].insert(dat)
else:
            # Writes or appends the output line to a plain text file.
            with open(arg.o, "a") as output_file:
                print(output, file=output_file)
return output
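# Illustrative only: rows written with --db can be read back with the same
# `dataset` library, e.g. (assuming the output file was named tweets.db):
#   db = dataset.connect("sqlite:///tweets.db")
#   for row in db["tweets"].all():
#       print(row["date"], row["username"], row["text"])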
async def getTweets(init):
    '''
    This function takes the HTML responses from getFeed(),
    sends each Tweet to the parser outTweet(), and prints
    the result.
    Returns the response feed, the position ID, and the Tweet count.
    '''
tweets, init = await getFeed(init)
count = 0
for tweet in tweets:
'''
Certain Tweets get taken down for copyright but are still
visible in the search. We want to avoid those.
'''
        withheld = tweet.find("div", "StreamItemContent--withheld")
        if withheld is None:
count +=1
print(await outTweet(tweet))
return tweets, init, count
async def getUsername():
    '''
    This function uses a Twitter ID search to resolve a Twitter user
    ID and return its corresponding username.
    '''
async with aiohttp.ClientSession() as session:
r = await fetch(session, "https://twitter.com/intent/user?user_id={0.userid}".format(arg))
soup = BeautifulSoup(r, "html.parser")
return soup.find("a", "fn url alternate-context")["href"].replace("/", "")
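# Illustrative only: for getUsername() to work, the intent page's profile anchor
# must carry a relative href such as "/someuser" (a made-up handle here), which
# the .replace("/", "") call turns into the bare username "someuser".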
async def main():
'''
Putting it all together.
'''
if arg.userid is not None:
arg.u = await getUsername()
feed = [-1]
init = -1
num = 0
while True:
        '''
        If getFeed() hit an exception, the feed comes back empty,
        which means there is no position ID to continue from and
        Tweep is finished scraping.
        '''
if len(feed) > 0:
feed, init, count = await getTweets(init)
num += count
else:
break
# Control when we want to stop scraping.
        if arg.limit is not None and num >= int(arg.limit):
break
if arg.count:
print("Finished: Successfully collected {} Tweets.".format(num))
def Error(error, message):
# Error formatting
print("[-] {}: {}".format(error, message))
    sys.exit(1)
def check():
# Performs main argument checks so nothing unintended happens.
if arg.u is not None:
if arg.users:
Error("Contradicting Args", "Please use --users in combination with -s.")
if arg.verified:
Error("Contradicting Args", "Please use --verified in combination with -s.")
if arg.userid:
Error("Contradicting Args", "--userid and -u cannot be used together.")
if arg.tweets and arg.users:
Error("Contradicting Args", "--users and --tweets cannot be used together.")
if arg.csv and arg.o is None:
Error("Error", "Please specify an output file (Example: -o file.csv")
if __name__ == "__main__":
ap = argparse.ArgumentParser(prog="tweep.py", usage="python3 %(prog)s [options]", description="tweep.py - An Advanced Twitter Scraping Tool")
ap.add_argument("-u", help="User's Tweets you want to scrape.")
ap.add_argument("-s", help="Search for Tweets containing this word or phrase.")
ap.add_argument("-o", help="Save output to a file.")
ap.add_argument("-g", help="Search for geocoded tweets.")
ap.add_argument("--year", help="Filter Tweets before specified year.")
ap.add_argument("--since", help="Filter Tweets sent since date (Example: 2017-12-27).")
ap.add_argument("--fruit", help="Display 'low-hanging-fruit' Tweets.", action="store_true")
ap.add_argument("--tweets", help="Display Tweets only.", action="store_true")
ap.add_argument("--verified", help="Display Tweets only from verified users (Use with -s).", action="store_true")
ap.add_argument("--users", help="Display users only (Use with -s).", action="store_true")
ap.add_argument("--csv", help="Write as .csv file.", action="store_true")
ap.add_argument("--db", help="Write as a database.", action="store_true")
ap.add_argument("--hashtags", help="Output hashtags in seperate column.", action="store_true")
ap.add_argument("--userid", help="Twitter user id")
ap.add_argument("--limit", help="Number of Tweets to pull (Increments of 20).")
ap.add_argument("--count", help="Display number Tweets scraped at the end of session.", action="store_true")
ap.add_argument("--stats", help="Show number of replies, retweets, and likes", action="store_true")
arg = ap.parse_args()
check()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
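# Illustrative usage examples (usernames, phrases, and filenames are made up):
#   python3 tweep.py -u someuser                         # scrape a user's timeline
#   python3 tweep.py -s "pineapple" --users              # list users Tweeting a phrase
#   python3 tweep.py -u someuser --csv -o tweets.csv     # save every scraped field to CSV
#   python3 tweep.py -s "pineapple" --limit 100 --count  # stop after ~100 Tweets and report the count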