associate_twitter.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
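# Associate the Twitter accounts loaded from one or more JSON list exports with the
# parliamentarians published by NosDeputes.fr / NosSenateurs.fr, then write the result
# to data/<deputes|senateurs>.csv and data/<deputes|senateurs>.json.
# Expected invocation (inferred from the argv handling below; file names are examples):
#   ./associate_twitter.py deputes list_deputes.json [more_lists.json ...]
#   ./associate_twitter.py senateurs list_senateurs.json
#   ./associate_twitter.py some_parls.csv list.json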
import os, sys, json, re, csv
import requests
from datetime import datetime
from itertools import chain
from twitter import Twitter, OAuth
from twitterconfig import KEY, SECRET, OAUTH_TOKEN, OAUTH_SECRET
twitterConn = Twitter(auth=OAuth(OAUTH_TOKEN, OAUTH_SECRET, KEY, SECRET))
if len(sys.argv) < 3:
    sys.stderr.write("Please input deputes/senateurs and the path for the Twitter list data\n")
    exit(1)
# Logging
def log(msg, typ):
    print("[%s/%s] %s" % (typ.upper(), typeparls, msg.encode("utf-8")))

def log_status():
    log("%s todo, %s parls left, %s good" % (len(twitter), len(parls), len(goodparls)), "info")
# Read Parls data
typeparls = sys.argv[1]
typeparl = typeparls.rstrip("s")
goodparls = []
if typeparls.endswith(".csv"):
    with open(typeparls) as f:
        try:
            parls = {}
            for parl in csv.DictReader(f):
                parl["sites_web"] = [{"site": u.decode("utf-8")} for u in parl["sites_web"].split("|")]
                for key in ["nom_de_famille", "prenom"]:
                    parl[key] = parl[key].decode("utf-8")
                parl["nom"] = "%s %s" % (parl["prenom"], parl["nom_de_famille"])
                parl["sexe"] = "H" if parl["Civ."] == "M." else "F"
                parl["slug"] = parl["slug"] or parl["Slug"]
                parls[parl["slug"]] = parl
            typeparls = "deputes" if "deputes" in typeparls else "senateurs"
        except ValueError:
            sys.stderr.write("Could not open Nos%s.fr parlementaires list" % typeparls)
            exit(1)
else:
    with open(os.path.join(".cache", "%s.json" % typeparls)) as f:
        try:
            parls = dict((parl["slug"], parl) for parl in [p[typeparl] for p in json.load(f)[typeparls]])
        except ValueError:
            sys.stderr.write("Could not open Nos%s.fr parlementaires list" % typeparls)
            exit(1)
# Collect twitter account from AN webpage in case they updated it:
if typeparls == "deputes":
    for parl in parls.values():
        try:
            page = requests.get(parl["url_institution"]).text
        except Exception:
            continue
        twitter_line = re.search(r'<a[^>]*href="http[^"]*twitter\.com/@?([^"/#]*)([/#][^"]*)?">Consulter le compte Twitter', page)
        if twitter_line:
            twid = twitter_line.group(1)
            if twid.lower() != parl["twitter"].lower():
                parl["sites_web"].append({"site": "https://twitter.com/%s" % twid})
# Run checks on preexisting data
try:
    with open(os.path.join("data", "%s.json" % typeparls)) as f:
        existing = json.load(f)
except Exception:
    existing = []
# - Collect known accounts to check screenname changes and disappeared accounts
accounts_by_id = {str(p["twitter_id"]): p for p in existing}
ids = accounts_by_id.keys()
screennames_by_id = {}
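# Twitter's users/lookup endpoint accepts at most 100 ids per call, hence the batching below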
for i in range(len(ids)/100 + 1):
    batch_ids = ids[100 * i:100 * (i+1)]
    if not batch_ids:
        # skip the empty trailing batch (e.g. when there is no preexisting data)
        continue
    for user in twitterConn.users.lookup(user_id=",".join(batch_ids)):
        screennames_by_id[user["id_str"]] = user["screen_name"]
# - Check ids missing in screennames_by_id = accounts disappeared
if len(ids) != len(screennames_by_id.keys()):
    for i in ids:
        if i not in screennames_by_id:
            parl = accounts_by_id[i]
            log("Twitter account %s (@%s) seems to have disappeared for parl %s: https://twitter.com/%s | %s | %s" % (i, parl["twitter"], parl["nom"], parl["twitter"], parl["url_nos%s" % typeparls], parl["url_institution"]), "warning")
auto_handle_changes = {}
for parl in existing:
    i = str(parl["twitter_id"])
    old_handle = parl["twitter"].strip(" @")
    new_handle = screennames_by_id.get(str(parl["twitter_id"]))
    if not new_handle:
        continue
    # Examine accounts with modified screen_name
    if new_handle.lower() != old_handle.lower():
        log("Twitter account %s (https://twitter.com/%s) has changed its Twitter handle to %s for parl %s: https://twitter.com/%s | %s | %s" % (i, old_handle, new_handle, parl["nom"], new_handle, parl["url_nos%s" % typeparls], parl["url_institution"]), "info")
        auto_handle_changes[old_handle.lower()] = new_handle
    else:
        new_handle = old_handle
    # Examine the description to check whether a parody account is announced in it
    if "parodi" in parl["twitter_description"].lower():
        log("Twitter account %s for parl %s seems to be a parody account according to its description \"%s\": https://twitter.com/%s" % (new_handle, parl["nom"], parl["twitter_description"], new_handle), "warning")
    # Examine the last tweet date to check whether the account is still active
    if parl["twitter_last_tweeted_at"] and typeparls != "senateurs":
        delay = (datetime.now() - datetime.strptime(parl["twitter_last_tweeted_at"], "%Y-%m-%dT%H:%M:%S")).total_seconds()
        one_year = 86400 * 365
        if delay > one_year:
            log("Twitter account %s for parl %s has been inactive for more than a year (since %s): https://twitter.com/%s" % (new_handle, parl["nom"], parl["twitter_last_tweeted_at"][:10], new_handle), "warning")
# Read Twitter list data
twitter = {}
for path in sys.argv[2:]:
    with open(path) as f:
        twitter.update(json.load(f))
# Exclude bad accounts
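# notparls: accounts present in the Twitter lists that do not belong to a parliamentarian
# groupes: accounts of parliamentary groups rather than individuals
# doublons: duplicate or secondary accounts dropped in favour of the main one
# dead: accounts that apparently no longer exist
# badlinks: malformed or duplicate website urls kept out of the sites_web output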
notparls = ["bayrou", "ABachelay", "search", "clyimiyepiz", "OffLineSouth", "joaquimpueyo", "serrenatbrefra", "SergeMuller19", "vbesse", "JorisHebrard", "FlorenceLASSERR"]
groupes = ["crcsenat", "udiuc", "ecolosenat", "senateursps", "senateursump", "lesrep_senat", "indep_senat", "senateurslarem", "rdse_senat", "senateurscrce", "uc_senat"]
doublons = ["teambouillon", "fdebeauce", "vignal2012", "deputecvautrin", "clergeau2012", "isabellebruneau", "roussetalain", "elubondy", "FLefebvre_UMP", "Gabouty2012", "moniquerabin", "PascalAllizard", "pascalegruny", "sergiocoronado", "audeluquet", # 2012-2017
"Darrieussecq", "MireilleRobert", "tamarelle_marie", "Fdumas2017", "stelladupont2", "karamanli72", "micheldelpon", "8306lrem", "offline8306", "ckamowski", "valbeauvais", "cjerretie", "hvchristophe", "SCazenove",
"simonnet2", "BertrandSorre", "unionpop93_10"] # 2022-2027
dead = ["Guy_Delcourt", "ConchitaLacuey", "MichelVERGNIER", "bernardroman59", "AndrSaint", "LucetteLousteau", "CathLEMORTON", "EPhilippe_LH", # 2012-2017
"celiadeputee2017", "davidlorion", "PascalBois2017", "DipompeoChris", "Vincent.Ledoux59", "valeriebeauvais2017", "Josso2017", "ColasRoy2017", "Marc_Delatte", "EricDiardDepute", "bernarddeflesselles", "sttrompille", "pgoulet58", "GCHICHE2017", "obono2017", "Sempastous2017", "JhRatenon", # 2017-2022
"brunosido1", "OHerviaux_Senat", "draoulsenateur", "HuguesPortelliS", "patrick_masclet", "LouisNegre2014", "gonthier_maurin", "danymichelsenat"]
badlinks = ["http://www.facebook.fr/pascalbois2017", "https://fr-fr.facebook.com/GuillaumePeltier", "https://www.facebook.com/valerie.boyer.56", "https://www.facebook.com/Marguerite-Deprez-Audebert-2017-420349688340872", "https://fr-fr.facebook.com/colas.roy.2017", "https://m.facebook.com/ThomasRudigoz2017", "https://www.facebook.com/BSmedoc", "https://fr-fr.facebook.com/sandramarsaudlarepubliquenmarche", "https://fr-fr.facebook.com/profile.php"]
excludes = [t.lower() for t in notparls + groupes + doublons + dead]
for e in excludes:
    if e in twitter:
        twitter.pop(e)
# Cleaning regexps
accents = [(re.compile(r), s) for r, s in [
    (u'[àÀâÂ]', 'a'),
    (u'[éÉèÈêÊëË]', 'e'),
    (u'[îÎïÏ]', 'i'),
    (u'[ôÔöÖ]', 'o'),
    (u'[ùÙûÛüÜ]', 'u'),
    (u'[çÇ]', 'c'),
]]

def clean_accents(t):
    if not isinstance(t, unicode):
        t = t.decode('utf-8')
    for r, s in accents:
        t = r.sub(s, t)
    return t
re_clean = re.compile(r'[^a-z]+', re.I)
nospaces = lambda x: re_clean.sub('', x)
clean = lambda x: re_clean.sub(' ', clean_accents(x.lower())).strip()
re_clean_desc = re.compile(r"[\s\n]+")
clean_desc = lambda x: re_clean_desc.sub(" ", x)
re_clean_twiturl = re.compile(r"^.*twitter.com/(?:#!/)*([^/]+).*$", re.I)
clean_twiturl = lambda x: re_clean_twiturl.sub(r"\1", x).strip()
re_clean_url = re.compile(r"^((?:https?://)?(?:(?:www2?|m|fr|fr-fr|deputation)\.)?)(.*?)/?$", re.I)
check_url = lambda x: re_clean_url.sub(r"\2", x.strip().lower())
clean_url = lambda x: re_clean_url.sub(r"\1\2", x.strip())
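# check_url normalizes a url for comparison (drops protocol, common subdomain prefixes,
# trailing slash and case), while clean_url only strips the trailing slash and keeps the prefix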
re_clean_facebook = re.compile(ur"(facebook.com/.*?/?(\?id=.*?)?)([?&].*|#.*|/photos/.*)*$", re.I)
re_clean_facebook2 = re.compile(ur"(facebook.com/)www.facebook.com/", re.I)
clean_facebook = lambda x: re_clean_facebook.sub(ur"\1", re_clean_facebook2.sub(ur"\1", x.replace(u"%C3%A9", u"é")))
re_clean_initiales = re.compile(r"^([A-Z]{1,2}[\. ]+)+(d('|[eus]+ (la )?))?")
clean_initiales = lambda x: nospaces(clean(re_clean_initiales.sub("", x.strip())))
re_reorder = re.compile(r"^(.+)\s+(\S+)$")
reorder = lambda x: re_reorder.sub(r"\2 \1", x.strip())
re_reorder_part = re.compile(r"^(.+)\s+\((d.*)\)$")
reorder_part = lambda x: re_reorder_part.sub(r"\2 \1", x.strip())
re_split = re.compile(r"([A-Z])")
split_twid = lambda x: [clean(w) for w in re_split.sub(r" \1", x).strip().split(" ")]
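# Illustrative examples of the cleaning helpers (hypothetical values):
#   clean(u"Jean-Luc Exemple")   -> "jean luc exemple"
#   nospaces("jean luc exemple") -> "jeanlucexemple"
#   clean_twiturl(u"https://twitter.com/#!/SomeHandle/") -> "SomeHandle"
#   split_twid("JeanExemple")    -> ["jean", "exemple"]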
# Start matching
log_status()
def store_one(twid, parl, slug):
    if twid.lower() in auto_handle_changes:
        oldsite = {"site": "https://twitter.com/%s" % twid}
        if oldsite in parl["sites_web"]:
            parl["sites_web"].remove(oldsite)
        twid = auto_handle_changes[twid.lower()]
        parl["sites_web"].append({"site": "https://twitter.com/%s" % twid})
    try:
        tw = twitter.pop(twid.lower())
    except KeyError:
        try:
            tw = twitterConn.users.show(screen_name=twid)
        except Exception:
            return log("Could not get info on Twitter account https://twitter.com/%s" % twid, "warning")
        #log("Twitter account %s for %s found in urls but missing from list" % (twid.encode("utf-8"), parl['nom'].encode("utf-8")), "info")
    parl['twitter'] = twid
    parl['twitter_data'] = tw
    goodparls.append(parl)
    parls.pop(slug)
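# Matching happens in two passes: first from the twitter urls already listed in each
# parl's sites_web, then by fuzzy-matching names and urls from the Twitter list (match_parl below)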
# First try to find twitter urls in each parl websites list
for slug in parls.keys():
    parl = parls[slug]
    found = False
    for url in list(parl["sites_web"]):
        if "senat.fr" in url["site"] or "assemblee-nationale.fr" in url["site"]:
            parl["sites_web"].remove(url)
        elif "twitter" in url["site"] and not found:
            twid = clean_twiturl(url['site'].decode("utf-8"))
            if twid.lower() in excludes:
                parl["sites_web"].remove(url)
                continue
            found = True
            store_one(twid, parl, slug)
if len(goodparls):
    log_status()
# Then try to identify the parl by matching their metadata against the name, urls and description from Twitter
urlentities = lambda tw: [u["expanded_url"] for u in tw["entities"].get("url", {"urls": []})["urls"]]
def match_parl(tw):
    twid = tw["screen_name"]
    urls = [check_url(u) for u in urlentities(tw) if u]
    possible = []
    name = clean(tw["name"])
    namenospaces = nospaces(name)
    subnames = set(chain(*[name.split(" ", i) for i in [1, 2, 3]]))
    for slug in parls.keys():
        parl = parls[slug]
        # Try to match the full name
        check = nospaces(clean(parl["nom"]))
        if namenospaces == check:
            return store_one(twid, parl, slug)
        # Try to replace first name in the right place
        reordname = reorder(name)
        while reordname != name:
            if nospaces(clean(reordname)) == check:
                return store_one(twid, parl, slug)
            reordname = reorder(reordname)
        # Try to find family name matches only
        checkfam = nospaces(clean(reorder_part(parl["nom_de_famille"])))
        if namenospaces == checkfam:
            possible.append(parl)
        else:
            for word in subnames:
                if len(word) > 3 and nospaces(clean(word)) == checkfam:
                    possible.append(parl)
        # Try to remove first name as initiales
        if parl not in possible and clean_initiales(tw["name"]) == checkfam:
            possible.append(parl)
        # Try to search name in twitter id
        if parl not in possible:
            for word in split_twid(twid.replace('depute', '')):
                if len(word) > 3 and word == checkfam:
                    possible.append(parl)
        # Try to match a url
        for url in urls:
            if url in [check_url(u["site"]) for u in parl["sites_web"] if u] + [check_url(parl.get('url_an', parl.get('url_institution', '')) or '')]:
                return store_one(twid, parl, slug)
    # Check matches by family name found
    if possible:
        if len(possible) == 1:
            return store_one(twid, possible[0], possible[0]["slug"])
        log("Multiple parls found for %s: %s" % (twid, " ".join([p["slug"] for p in possible])), "warning")
for tw in twitter.values():
    match_parl(tw)
log_status()
if len(twitter):
    log("%s Twitter accounts could not be matched to any parl: %s" % (len(twitter), ", ".join(twitter.keys())), "warning")
# Write output data
if not os.path.isdir("data"):
    os.makedirs("data")
formatcsv = lambda x: '"%s"' % x.encode("utf-8").replace('"', '""') if type(x) == unicode else str(x)
headers = ["twitter", "nom", "nom_de_famille", "prenom", "sexe", "twitter_tweets", "twitter_followers", "twitter_following", "twitter_listed", "twitter_favourites", "twitter_verified", "twitter_protected", "twitter_id", "twitter_name", "twitter_description", "twitter_created_at", "twitter_last_tweeted_at", "sites_web", "url_institution", "slug", "url_nos%s_api" % typeparls]
orderparls = sorted(goodparls, key=lambda x: "%s - %s" % (x["nom_de_famille"], x["prenom"]))
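# Write the CSV first (sites_web joined with "|", api url pointing to /csv), then switch
# those fields back to their list / "/json" form so the JSON dump below stays consistent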
with open(os.path.join("data", "%s.csv" % typeparls), "w") as f:
print >> f, ",".join(headers)
for parl in orderparls:
tw = parl["twitter_data"]
parl["twitter_id"] = tw["id"]
if "status" in tw:
parl["twitter_last_tweeted_at"] = datetime.strptime(tw["status"]["created_at"], '%a %b %d %H:%M:%S +0000 %Y').isoformat()
tw.pop("status")
else:
parl["twitter_last_tweeted_at"] = ""
parl["twitter_name"] = tw["name"]
parl["twitter_created_at"] = datetime.strptime(tw["created_at"], '%a %b %d %H:%M:%S +0000 %Y').isoformat()
parl["twitter_description"] = clean_desc(tw["description"])
parl["twitter_tweets"] = tw["statuses_count"]
parl["twitter_favourites"] = tw["favourites_count"]
parl["twitter_followers"] = tw["followers_count"]
parl["twitter_following"] = tw["friends_count"]
parl["twitter_listed"] = tw["listed_count"]
parl["twitter_verified"] = tw["verified"]
parl["twitter_protected"] = tw["protected"]
sites_web = set([clean_url(u) for u in [s["site"] for s in parl["sites_web"] if s] + [u for u in urlentities(tw) if u and "senat.fr" not in u and "assemblee-nationale.fr" not in u]])
clean_sites = []
done_sites = [check_url(u) for u in badlinks]
for site in sorted(sites_web, key=lambda x: len(x)):
if not site.strip():
continue
if not site.startswith("http"):
site = "http://" + site.lstrip("/")
site = clean_facebook(site)
cleaned = check_url(site)
if cleaned not in done_sites:
clean_sites.append(site)
done_sites.append(cleaned)
parl["sites_web"] = "|".join(clean_sites)
if "url_institution" not in parl:
parl["url_institution"] = parl["url_an"]
parl["url_nos%s_api" % typeparls] = parl["url_nos%s" % typeparls] + "/csv"
print >> f, ",".join([formatcsv(parl[k]) for k in headers])
parl["url_nos%s_api" % typeparls] = parl["url_nos%s" % typeparls] + "/json"
parl["sites_web"] = parl["sites_web"].split("|")
with open(os.path.join("data", "%s.json" % typeparls), "w") as f:
json.dump(orderparls, f, indent=2, sort_keys=True)