Skip to content

Commit

Permalink
ES Implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Kamparia committed Jan 29, 2021
1 parent da7fcab commit ae86885
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 35 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# TweetStreamer
Ingesting & Analyzing Live Tweets In Real-Time Using Kinesis Firehose
Analysing COVID-19 Tweets in real-time using Twitter Streaming API, ElasticSearch & Kibana.
88 changes: 55 additions & 33 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,20 @@
# import python libraries
import os, json, tweepy, boto3, geocoder
import os, tweepy, geocoder
from textblob import TextBlob
from dotenv import load_dotenv
from elasticsearch import Elasticsearch
from datetime import datetime, timedelta

# load .ENV file
load_dotenv()

# aws SDK client
client = boto3.client('firehose',
aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
region_name=os.getenv("AWS_DEFAULT_REGION")
)

# authenticate twitter api
auth = tweepy.OAuthHandler(os.getenv("TWITTER_API_KEY"), os.getenv("TWITTER_API_SECRET"))
auth.set_access_token(os.getenv("TWITTER_TOKEN"), os.getenv("TWITTER_TOKEN_SECRET"))
api = tweepy.API(auth)

# kinesis firehose configurations
KINESIS_STREAM_NAME = os.getenv("KINESIS_STREAM_NAME")
# connect to ElasticSearch
es = Elasticsearch([{'host': os.getenv("ES_HOST"), 'port': int(os.getenv("ES_PORT"))}])

# create a stream listener
class MyStreamListener(tweepy.StreamListener):
Expand All @@ -43,10 +38,11 @@ def get_location_data(self, location):
try:
result = geocoder.osm(location)
if result.ok:
lat = result.json['raw']['lat']
lon = result.json['raw']['lon']
lat = float(result.json['raw']['lat'])
lon = float(result.json['raw']['lon'])
country = result.json['raw']['address']['country']
data = {'latitude': lat, 'longitude': lon, 'country': country}
country_iso2 = result.json['raw']['address']['country_code'].upper()
data = {'latitude': lat, 'longitude': lon, 'country': country, 'country_iso2': country_iso2}
return data
else:
return None
Expand All @@ -57,28 +53,25 @@ def get_location_data(self, location):


def process_event(self, status):
# process event
"""This is called when raw data is received from the stream.
This method handles sending the data to other methods, depending on the
message type.
"""
event = {}
sentiment = self.sentiment_analysis(status.text)
location = self.get_location_data(status.user.location)
if status.lang == 'en' and location != None and sentiment != None:
event['_id'] = status.id
event['uid'] = status.id
event['text'] = status.text
event['user'] = status.user.screen_name
event['sentiment'] = self.sentiment_analysis(status.text)
event['latitude'] = location['latitude']
event['longitude'] = location['longitude']
event['country'] = location['country']
event['created_at'] = (status.created_at).strftime('%Y-%m-%d %H:%M:%S')

# push event to kinesis firehose
client.put_record(
DeliveryStreamName=os.getenv("KINESIS_STREAM_NAME"),
Record={
'Data': json.dumps(event)
}
)
event['country_iso2'] = location['country_iso2']
event['geo'] = { 'location': str(location['latitude']) + "," + str(location['longitude']) }
event['@timestamp'] = datetime.now() - timedelta(minutes=60)

# insert data to elasticsearch index
es.index(index='tweet_streamer', id=event['uid'], body=event)
print(event)


Expand All @@ -94,12 +87,41 @@ def on_error(self, status_code):
return False


# create a stream
myStreamListener = MyStreamListener()
myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener)
def main():
# create a stream
myStreamListener = MyStreamListener()
myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener)

# create elasticsearch index
try:
body = {
"mappings": {
"properties": {
"geo": {
"properties": {
"location": {
"type": "geo_point"
}
}
}
}
}
}
es.indices.create(index='tweet_streamer', body=body)
except Exception:
pass


# start the stream
while True:
try:
# keywords to look for in tweets
keywords = ['corona', 'covid', 'coronavirus', 'covid19', 'covid vaccine']
myStream.filter(track=keywords, stall_warnings=True)
except Exception:
print("Unable to start streaming...")
continue

# keywords to look for in tweets
keywords = ['corona', 'covid', 'coronavirus', 'covid19', 'covid vaccine']

# start the stream
myStream.filter(track=keywords, is_async=True)
if __name__ == '__main__':
main()
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
tweepy
dotenv
geocoder
textblob
textblob
elasticsearch

0 comments on commit ae86885

Please sign in to comment.