From ae86885ff4c1280893311b6b9c3ef18be8fa8629 Mon Sep 17 00:00:00 2001 From: Olaoye Somide Date: Fri, 29 Jan 2021 01:18:00 +0100 Subject: [PATCH] ES Implementation --- README.md | 2 +- main.py | 88 ++++++++++++++++++++++++++++++------------------ requirements.txt | 3 +- 3 files changed, 58 insertions(+), 35 deletions(-) diff --git a/README.md b/README.md index 7e45837..77ddc57 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,2 @@ # TweetStreamer -Ingesting & Analyzing Live Tweets In Real-Time Using Kinesis Firehose +Analysing COVID-19 Tweets in real-time using Twitter Streaming API, ElasticSearch & Kibana. diff --git a/main.py b/main.py index 5206ed1..cfb6740 100644 --- a/main.py +++ b/main.py @@ -1,25 +1,20 @@ # import python libraries -import os, json, tweepy, boto3, geocoder +import os, tweepy, geocoder from textblob import TextBlob from dotenv import load_dotenv +from elasticsearch import Elasticsearch +from datetime import datetime, timedelta # load .ENV file load_dotenv() -# aws SDK client -client = boto3.client('firehose', - aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), - aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"), - region_name=os.getenv("AWS_DEFAULT_REGION") -) - # authenticate twitter api auth = tweepy.OAuthHandler(os.getenv("TWITTER_API_KEY"), os.getenv("TWITTER_API_SECRET")) auth.set_access_token(os.getenv("TWITTER_TOKEN"), os.getenv("TWITTER_TOKEN_SECRET")) api = tweepy.API(auth) -# kinesis firehose configurations -KINESIS_STREAM_NAME = os.getenv("KINESIS_STREAM_NAME") +# connect to ElasticSearch +es = Elasticsearch([{'host': os.getenv("ES_HOST"), 'port': int(os.getenv("ES_PORT"))}]) # create a stream listener class MyStreamListener(tweepy.StreamListener): @@ -43,10 +38,11 @@ def get_location_data(self, location): try: result = geocoder.osm(location) if result.ok: - lat = result.json['raw']['lat'] - lon = result.json['raw']['lon'] + lat = float(result.json['raw']['lat']) + lon = float(result.json['raw']['lon']) country = result.json['raw']['address']['country'] - data = {'latitude': lat, 'longitude': lon, 'country': country} + country_iso2 = result.json['raw']['address']['country_code'].upper() + data = {'latitude': lat, 'longitude': lon, 'country': country, 'country_iso2': country_iso2} return data else: return None @@ -57,28 +53,25 @@ def get_location_data(self, location): def process_event(self, status): - # process event + """This is called when raw data is received from the stream. + This method handles sending the data to other methods, depending on the + message type. + """ event = {} sentiment = self.sentiment_analysis(status.text) location = self.get_location_data(status.user.location) if status.lang == 'en' and location != None and sentiment != None: - event['_id'] = status.id + event['uid'] = status.id event['text'] = status.text event['user'] = status.user.screen_name event['sentiment'] = self.sentiment_analysis(status.text) - event['latitude'] = location['latitude'] - event['longitude'] = location['longitude'] event['country'] = location['country'] - event['created_at'] = (status.created_at).strftime('%Y-%m-%d %H:%M:%S') - - # push event to kinesis firehose - client.put_record( - DeliveryStreamName=os.getenv("KINESIS_STREAM_NAME"), - Record={ - 'Data': json.dumps(event) - } - ) + event['country_iso2'] = location['country_iso2'] + event['geo'] = { 'location': str(location['latitude']) + "," + str(location['longitude']) } + event['@timestamp'] = datetime.now() - timedelta(minutes=60) + # insert data to elasticsearch index + es.index(index='tweet_streamer', id=event['uid'], body=event) print(event) @@ -94,12 +87,41 @@ def on_error(self, status_code): return False -# create a stream -myStreamListener = MyStreamListener() -myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener) +def main(): + # create a stream + myStreamListener = MyStreamListener() + myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener) + + # create elasticsearch index + try: + body = { + "mappings": { + "properties": { + "geo": { + "properties": { + "location": { + "type": "geo_point" + } + } + } + } + } + } + es.indices.create(index='tweet_streamer', body=body) + except Exception: + pass + + + # start the stream + while True: + try: + # keywords to look for in tweets + keywords = ['corona', 'covid', 'coronavirus', 'covid19', 'covid vaccine'] + myStream.filter(track=keywords, stall_warnings=True) + except Exception: + print("Unable to start streaming...") + continue -# keywords to look for in tweets -keywords = ['corona', 'covid', 'coronavirus', 'covid19', 'covid vaccine'] -# start the stream -myStream.filter(track=keywords, is_async=True) \ No newline at end of file +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 99ddc5c..e07fdd9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ tweepy dotenv geocoder -textblob \ No newline at end of file +textblob +elasticsearch \ No newline at end of file