-
Notifications
You must be signed in to change notification settings - Fork 52
/
Copy pathserver.py
157 lines (114 loc) · 4.19 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import json
import time
import re
import oauth2 as oauth
import os
from secret import keys
from flask import (Flask, jsonify, render_template)
from model import (connect_to_db, db, Tweet)
from sqlalchemy import desc
key = keys.key()
app = Flask(__name__)
# key generated by OS
app.secret_key = key['Flask_Key']
link = re.compile(r'(http(s)?://\w+(\.\w+)+(/\w+)*|@(\w+)|#(\w+))')
DEFAULT_DB_URI = "postgresql:///newb"
DB_URI = os.environ.get(
'NEWBIE_TWEETS_DB_URI',
DEFAULT_DB_URI,
)
DEFAULT_LISTEN_HOST = '127.0.0.1:5000'
LISTEN_HOST = os.environ.get(
'NEWBIE_TWEETS_LISTEN_HOST',
DEFAULT_LISTEN_HOST,
)
DEFAULT_LISTEN_PORT = '5000'
LISTEN_PORT = int(os.environ.get(
'NEWBIE_TWEETS_LISTEN_PORT',
DEFAULT_LISTEN_PORT,
))
def authorize():
"""authorize w/ twitter api and fetch recent codenewbie tweets, return a json"""
consumer = oauth.Consumer(key=key["consumer_key"], secret=key["consumer_secret"])
access_token = oauth.Token(key=key["access_token"], secret=key["access_secret"])
client = oauth.Client(consumer, access_token)
test_url = "https://api.twitter.com/1.1/search/tweets.json?q=%23codenewbie&result_type=mixed&count=100&include_entities=false"
response, data = client.request(test_url)
return json.loads(data)
def format_tweets():
"""return a list of tuples of recent codenewbie tweets"""
tweet = None
time_created = None
retweets = None
results = authorize()["statuses"]
output = []
for result in results:
if result['text'][0:2] != 'RT':
tweet = result['text']
time_created = time.strftime('%Y-%m-%d %H:%M:%S', time.strptime(result['created_at'],
'%a %b %d %H:%M:%S +0000 %Y'))
handle = result['user']['screen_name']
if 'retweeted_status' in result:
retweets = result['retweeted_status']['retweet_count']
output.append((handle, time_created, tweet, retweets))
return output
def tweet_to_db():
"""Add tweets into db"""
output = format_tweets()
text_list = [a.text for a in Tweet.query.all()]
for tweet in output:
if tweet[2] not in text_list: # need to edit this
tweet = Tweet(handle=tweet[0],
time_created=tweet[1],
text=tweet[2],
retweets=tweet[3])
db.session.add(tweet)
db.session.commit()
def linkyfy(text, is_name=False):
""" Embeds links in anchor tag"""
if is_name:
text = u"<a href='https://twitter.com/{0}'>{0}</a>".format(text)
return text
links = link.findall(text)
for l in links:
if l[0].startswith('@'):
text = text.replace(l[0], r"<a href='https://twitter.com/%s'>%s</a>" % (l[4], l[0]))
elif l[0].startswith('#'):
text = text.replace(l[0], r"<a href='https://twitter.com/search?q=%%23%s'>%s</a>"%(l[5], l[0]))
else:
text = text.replace(l[0], r"<a href='%s'>%s</a>" % (l[0], l[0]))
return text
@app.route("/")
def homepage():
"""Display tweets"""
tweet_to_db()
output = [a for a in Tweet.query.order_by(desc('time_created')).all()]
# to display as hyper links
for tweet in output:
tweet.handle = linkyfy(tweet.handle, is_name=True)
tweet.text = linkyfy(tweet.text)
return render_template("home.html", output=output)
@app.route("/about")
def display_about():
"""Diplay about page"""
return render_template("about.html")
@app.route("/friends")
def display_friends():
"""Display list of contributors to open source code practice project"""
return render_template("friends.html")
@app.route("/api/tweets")
def create_api_endpoint():
"""Using ingested dsta from twitter create an API endpoint"""
tweedict = {}
tweets = Tweet.query.all()
for tweet in tweets:
tweedict[tweet.handle] = tweet.text
return jsonify(tweedict)
@app.route("/archives")
def archives():
""" Displays previous tweets """
return render_template("archives.html")
if __name__ == "__main__":
app.debug = True
connect_to_db(app, DB_URI)
app.run(host=LISTEN_HOST, port=LISTEN_PORT)