forked from dylanpdx/BetterTwitFix
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcache.py
147 lines (138 loc) · 4.94 KB
/
cache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
from configHandler import config
import pymongo
from datetime import date,datetime
import json
import os
import boto3
import vxlogging as log
from utils import getTweetIdFromUrl
link_cache_system = config['config']['link_cache']
link_cache = {}
DYNAMO_CACHE_TBL=None
if link_cache_system=="dynamodb": # pragma: no cover
DYNAMO_CACHE_TBL=config['config']['table']
if link_cache_system == "json":
link_cache = {}
if not os.path.exists("links.json"):
with open("links.json", "w") as outfile:
default_link_cache = {}
json.dump(default_link_cache, outfile)
try:
f = open('links.json',)
link_cache = json.load(f)
except json.decoder.JSONDecodeError:
log.warn("Failed to load cache JSON file. Creating new file.")
link_cache = {}
except FileNotFoundError:
log.warn("Failed to load cache JSON file. Creating new file.")
link_cache = {}
finally:
f.close()
elif link_cache_system == "ram":
link_cache = {}
log.warn("Your link_cache_system is set to 'ram' which is not recommended; this is only intended to be used for tests")
elif link_cache_system == "db":
client = pymongo.MongoClient(config['config']['database'], connect=False)
table = config['config']['table']
db = client[table]
elif link_cache_system == "dynamodb": # pragma: no cover
client = boto3.resource('dynamodb')
def serializeUnknown(obj):
if isinstance(obj, (datetime, date)):
return obj.isoformat()
raise TypeError ("Type %s not serializable" % type(obj))
def addVnfToTweetIdCache(tweet_id, vnf):
global link_cache
try:
if link_cache_system == "db":
out = db.linkCache.update_one(vnf)
log.debug("Link added to DB cache ")
return True
elif link_cache_system == "json":
link_cache[tweet_id] = vnf
with open("links.json", "w") as outfile:
json.dump(link_cache, outfile, indent=4, sort_keys=True, default=serializeUnknown)
log.debug("Link added to JSON cache ")
return True
elif link_cache_system == "ram": # FOR TESTS ONLY
link_cache[tweet_id] = vnf
log.debug("Link added to RAM cache ")
elif link_cache_system == "dynamodb": # pragma: no cover
vnf["ttl"] = int(vnf["ttl"].strftime('%s'))
table = client.Table(DYNAMO_CACHE_TBL)
table.put_item(
Item={
'tweet': tweet_id,
'vnf': vnf,
'ttl':vnf["ttl"]
}
)
log.debug("Link added to dynamodb cache ")
return True
except Exception as e:
log.error("Failed to add link to DB cache: "+str(e)+" "+tweet_id)
return False
def addVnfToLinkCache(twitter_url, vnf):
return addVnfToTweetIdCache(getTweetIdFromUrl(twitter_url), vnf)
def getVnfFromTweetIdCache(tweet_id):
global link_cache
if link_cache_system == "db":
collection = db.linkCache
vnf = collection.find_one({'tweet': tweet_id})
if vnf != None:
hits = ( vnf['hits'] + 1 )
log.debug("Link located in DB cache.")
query = { 'tweet': tweet_id }
change = { "$set" : { "hits" : hits } }
out = db.linkCache.update_one(query, change)
return vnf
else:
log.debug("Link not in DB cache")
return None
elif link_cache_system == "json":
if tweet_id in link_cache:
log.debug("Link located in json cache")
vnf = link_cache[tweet_id]
return vnf
else:
log.debug("Link not in json cache")
return None
elif link_cache_system == "dynamodb": # pragma: no cover
table = client.Table(DYNAMO_CACHE_TBL)
response = table.get_item(
Key={
'tweet': tweet_id
}
)
if 'Item' in response:
log.debug("Link located in dynamodb cache")
vnf = response['Item']['vnf']
return vnf
else:
log.debug("Link not in dynamodb cache")
return None
elif link_cache_system == "ram": # FOR TESTS ONLY
if tweet_id in link_cache:
log.debug("Link located in json cache")
vnf = link_cache[tweet_id]
return vnf
else:
log.debug("Link not in cache")
return None
elif link_cache_system == "none":
return None
def getVnfFromLinkCache(twitter_url):
return getVnfFromTweetIdCache(getTweetIdFromUrl(twitter_url))
def clearCache():
global link_cache
# only intended for use in tests
if link_cache_system == "ram":
link_cache={}
def setCache(value):
newCache = {}
for key in value:
newCache[key.lower()] = value[key]
global link_cache
# only intended for use in tests
if link_cache_system == "ram":
link_cache=newCache