-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrcvr.py
105 lines (86 loc) · 3.98 KB
/
rcvr.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
from __future__ import print_function
from proton.reactor import Container
from proton.handlers import MessagingHandler
import time
from loadenv import Options
from wrtr import toFile, toS3, makeFileName
import sys
import json
from io import StringIO # Python 3.x
import gzip
from math import ceil
class Recvr(MessagingHandler):
    """Collect AMQP messages for a fixed period, then write them out.

    The handler connects to the broker described by ``opts``, buffers every
    received message as one JSON-encoded line, and, when its scheduled timer
    fires, stops the container and hands the buffered lines to ``toFile``
    and/or ``toS3`` depending on ``opts.output_method``.
    """

    def __init__(self, opts):
        """Store the run options and start with an empty message buffer.

        Args:
            opts: Settings object. This class reads ``runtime``,
                ``amqp_connection_url``, ``connection_username``,
                ``connection_password``, ``queue_name`` and the ``output_*``
                attributes from it.
        """
        super(Recvr, self).__init__()
        # TIMING SETTINGS
        self.startTime = time.time()
        self.opts = opts
        self.messages = []

    def on_reactor_init(self, event):
        """Schedule the shutdown timer and open the connection and receiver."""
        self.container = event.reactor
        # After ``opts.runtime`` (presumably seconds -- confirm against the
        # proton ``schedule`` documentation) ``on_timer_task`` is invoked.
        self.container.schedule(self.opts.runtime, self)
        conn = event.container.connect(
            url=self.opts.amqp_connection_url,
            user=self.opts.connection_username,
            password=self.opts.connection_password,
            allow_insecure_mechs=True,
        )
        if conn:
            event.container.create_receiver(conn, source=self.opts.queue_name)
            print('Connection Establish')

    def on_timer_task(self, event):
        """Stop receiving and write the buffered messages to the outputs.

        One output is produced per chunk returned by ``_chunk_messages``.
        Each chunk is written to a local file when ``'file'`` is in
        ``opts.output_method`` and sent to S3 when ``'s3'`` is in it.
        """
        event.container.stop()
        print('Connection Terminated')
        for suffix, payload in self._chunk_messages():
            filename = makeFileName(self.opts.output_prefix,
                                    self.startTime,
                                    suffix + '.njson')
            filepath = None
            if 'file' in self.opts.output_method:
                filepath = toFile(messages=payload,
                                  fdOut=self.opts.output_file_folder,
                                  filename=filename,
                                  compress=self.opts.output_compress)
                # Once a local file exists, toS3 is given the file path
                # instead of the in-memory text.
                payload = None
            if 's3' in self.opts.output_method:
                s3conn_options, bucket_options = self._s3_options()
                toS3(s3conn_options=s3conn_options,
                     bucket_options=bucket_options,
                     filename=filename,
                     messages=payload,
                     filepath=filepath,
                     compress=self.opts.output_compress)

    def _chunk_messages(self):
        """Split the buffered messages into newline-joined payloads.

        Returns:
            A list of ``(suffix, payload)`` tuples, one per output file.
            ``suffix`` is ``''`` when no size limit applies and
            ``'_partNN'`` otherwise; ``payload`` is the newline-joined
            JSON lines for that file.
        """
        max_size_mb = self.opts.output_maxsize
        # A limit of 0 means "single file". A negative or unset limit is
        # treated the same way: a negative value would otherwise produce a
        # negative slice step and silently drop every message.
        if not max_size_mb or max_size_mb < 0:
            return [('', '\n'.join(self.messages))]
        # With no messages the file count below would be zero and the
        # per-file division would raise ZeroDivisionError. Emit one empty
        # part instead, as the unlimited mode does.
        if not self.messages:
            return [('_part00', '')]
        # sys.getsizeof is the in-memory size of each str object, used here
        # as an estimate of the bytes that will be written.
        total_mb = sum(sys.getsizeof(msg) for msg in self.messages) / 1024 / 1024
        file_count = ceil(total_mb / max_size_mb)
        per_file = ceil(len(self.messages) / file_count)
        payloads = ['\n'.join(self.messages[start:start + per_file])
                    for start in range(0, len(self.messages), per_file)]
        return [('_part%02d' % index, payload)
                for index, payload in enumerate(payloads)]

    def _s3_options(self):
        """Build the connection and bucket option dicts passed to ``toS3``."""
        s3conn_options = {
            'endpoint': '%s:%d' % (self.opts.output_s3_host,
                                   self.opts.output_s3_port),
            'access_key': self.opts.output_s3_access_key,
            'secret_key': self.opts.output_s3_secret_key,
            'secure': self.opts.output_s3_secure,
        }
        # The region is optional; leave the key out when it is not configured.
        if self.opts.output_s3_region not in [None, '']:
            s3conn_options['region'] = self.opts.output_s3_region
        bucket_options = {
            'bucket_name': self.opts.output_s3_bucket_name,
            'make_bucket': self.opts.output_s3_make_bucket,
        }
        return s3conn_options, bucket_options

    def on_message(self, event):
        """Buffer the message's properties and body as one JSON string."""
        record = {'properties': event.message.properties,
                  'body': event.message.body}
        self.messages.append(json.dumps(record))

    # the on_transport_error event catches socket and authentication failures
    def on_transport_error(self, event):
        """Print the transport condition, then run the default handling."""
        print("Transport error:", event.transport.condition)
        super(Recvr, self).on_transport_error(event)

    def on_disconnected(self, event):
        """Report that the connection was lost."""
        print("Disconnected")
def main(SWIMconfigPath='my_configVars.cfg'):
    """Load the configuration and run a ``Recvr`` until it finishes.

    Args:
        SWIMconfigPath: Path passed to ``Options`` to load the run settings.
    """
    opts = Options(SWIMconfigPath)
    # Build the container before entering the ``try`` so the name is always
    # bound when the cleanup below refers to it.
    container = Container(Recvr(opts))
    try:
        container.run()
    except KeyboardInterrupt:
        # Emit a newline (presumably to move past the terminal's ^C echo).
        print()
    finally:
        container.stop()
if __name__ == "__main__":
    # Forward the first command-line argument, if any, as the config path;
    # with no argument main() falls back to its default path.
    main(*sys.argv[1:2])