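"""populate_db.py

Populate the OrientDB database with tweets by POSTing them to the
service's /api/v1/tweets endpoint.

Input is either a single file of newline-delimited JSON tweets, or a
folder laid out as <folder>/<brand>/<timestamp-file>; each line holds
one tweet object with its payload under the 'raw' key.

Example invocations (paths are illustrative):

    python populate_db.py tweets/
    python populate_db.py tweets/ --host localhost:5000 --limit 100
    python populate_db.py tweets/ --parallel --n-jobs 4
"""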
import argparse
import glob
import json
import multiprocessing
from itertools import islice
from os import path

import requests

parser = argparse.ArgumentParser(description="Populate the OrientDB database")
parser.add_argument('files', type=str,
                    help='folder or file to read tweets from')
parser.add_argument('--host', default='localhost:5000',
                    help='host where the service is running')
parser.add_argument('--limit', default=None, type=int,
                    help='fallback limit for brands, timestamps and tweets')
parser.add_argument('--brand-limit', default=None, type=int,
                    help='max number of brands to load')
parser.add_argument('--timestamp-limit', default=None, type=int,
                    help='max number of timestamp files to read')
parser.add_argument('--tweet-limit', default=None, type=int,
                    help='max number of tweets to send')
parser.add_argument('--quiet', dest='verbose', action='store_false',
                    help='suppress progress output')
parser.add_argument('--parallel', action='store_true',
                    help='use multiple processes to parallelize the loading')
parser.add_argument('--n-jobs', default=multiprocessing.cpu_count(), type=int,
                    help='number of worker processes; only used with --parallel')
parser.set_defaults(verbose=True, parallel=False)


def set_limit(key, limit):
    """Return the per-resource limit for `key`, falling back to --limit."""
    if args[key] is None:
        return limit
    return args[key]


def print_count(count, resp=None):
    """Report progress; guard against resp being None before reading it."""
    if verbose and resp is not None and resp.status_code == 200:
        print("Tweet added ({}). Resp. code: {}".format(count, resp.status_code))
    elif verbose and resp is not None and resp.status_code == 500:
        print("Error at {}".format(count))
    elif verbose:
        print("Tweet added ({})".format(count))


def post_tweet(line, return_response=False):
    """POST one tweet (a single JSON line) to the service."""
    tweet_full = json.loads(line)
    # The tweet payload is nested under the 'raw' key; drop falsy fields
    # before sending.
    tweet = {k: v for k, v in tweet_full['raw'].items() if v}
    r = requests.post(url, headers=headers, data=json.dumps(tweet))
    if return_response:
        return 1, r
    return 1
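

# Example input line (illustrative; the exact fields depend on the crawler
# output, but the payload must live under the 'raw' key):
#   {"raw": {"id_str": "...", "text": "...", "user": {...}}}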

args = vars(parser.parse_args())
url = 'http://{host}/api/v1/tweets'.format(host=args['host'])
headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
limit = args['limit']
verbose = args['verbose']
parallel = args['parallel']
n_jobs = args['n_jobs']
brand_limit = set_limit('brand_limit', limit)
timestamp_limit = set_limit('timestamp_limit', limit)
tweet_limit = set_limit('tweet_limit', limit)
if verbose:
    print('=Limits=')
    print('Brand: {},'.format(brand_limit),
          'Timestamp: {},'.format(timestamp_limit),
          'Tweet: {}'.format(tweet_limit))
    print('=')
counter = 0
in_path = path.abspath(args['files'])
if verbose:
    print('Reading from: {}'.format(in_path))
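
# Note: the setup above runs at import time. Workers created with the
# default 'fork' start method inherit these globals; under the 'spawn'
# start method (e.g. on Windows) this module-level code re-runs in each
# worker process.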
if __name__ == '__main__':
    if path.isdir(in_path):
        # Expected layout: <in_path>/<brand>/<timestamp-file>
        brand_list = [d for d in glob.glob(path.join(in_path, '*'))
                      if path.isdir(d)]
        if verbose:
            print('Brands to load from:',
                  ', '.join(path.basename(b)
                            for b in islice(brand_list, brand_limit)))
        for brand in islice(brand_list, brand_limit):
            timestamps_list = glob.glob(path.join(brand, '*'))
            for timestamp in islice(timestamps_list, timestamp_limit):
                with open(timestamp) as f:
                    if not parallel:
                        for line in islice(f, tweet_limit):
                            counter_, resp = post_tweet(line,
                                                        return_response=True)
                            counter += counter_
                            if verbose:
                                print_count(counter, resp)
                    else:
                        with multiprocessing.Pool(n_jobs) as pool:
                            # islice keeps --tweet-limit effective in the
                            # parallel path too.
                            results = pool.map(post_tweet,
                                               islice(f, tweet_limit))
                        counter += sum(results)
                        print('Tweets added ({})'.format(counter))
    elif path.isfile(in_path):
        with open(in_path) as f:
            if not parallel:
                for line in islice(f, tweet_limit):
                    counter_, resp = post_tweet(line, return_response=True)
                    counter += counter_
                    if verbose:
                        print_count(counter, resp)
            else:
                with multiprocessing.Pool(n_jobs) as pool:
                    # Honor --tweet-limit in the parallel path as well.
                    results = pool.map(post_tweet, islice(f, tweet_limit))
                counter += sum(results)
                print('Tweets added ({})'.format(counter))