# tester.py
# Forked from project-receptor/receptor-stresstest.
from queue import Queue, Empty
import json
import threading
import time
from receptor_stresstest import worker
class FakeInnerEnvelope:
    """Bare stand-in message object that only carries a ``raw_payload``.

    The attribute defaults to ``None`` at class level; the script below
    assigns a JSON string to it on an instance before passing that instance
    to the worker.
    """

    # Class-level default; instances overwrite it with their own payload.
    raw_payload = None
def monitor_queue(queue, event):
    """Print a one-line report for every item that arrives on *queue*.

    The queue is polled without blocking; whenever it is empty the function
    sleeps for 0.1 s before polling again. The loop ends only once *event*
    has been set **and** a subsequent poll finds the queue empty, so items
    that were enqueued shortly before shutdown are still reported instead of
    being silently left behind.

    Args:
        queue: A ``queue.Queue``-like object. Every item must support
            ``item['ts']`` and ``len(item['blob'])``.
        event: A ``threading.Event``-like object; setting it requests
            shutdown.
    """
    while True:
        # Read the stop flag *before* polling: if the flag was already set
        # and the poll that follows still finds nothing, the queue is
        # drained and it is safe to stop.
        stop_requested = event.is_set()
        # Sampled before the get, so the reported size still counts the item
        # that is about to be removed.
        qsize = queue.qsize()
        try:
            item = queue.get(False)
        except Empty:
            if stop_requested:
                return
            time.sleep(0.1)
        else:
            print(
                f"Got item from {item['ts']} Blob length: {len(item['blob'])}. "
                f"Queue size: {qsize}"
            )
if __name__ == '__main__':
    # Drive worker.blunderbuss by hand with a fake message, while a
    # background thread prints whatever the worker puts on the queue.
    queue = Queue()
    message = FakeInnerEnvelope()
    # JSON-encoded parameters handed to the worker through ``raw_payload``.
    # NOTE(review): the meaning of rate/length/size is defined by
    # worker.blunderbuss, which is not visible here — confirm there.
    message.raw_payload = json.dumps(dict(rate=50, length=10, size=512))
    all_done = threading.Event()
    monitor_thread = threading.Thread(target=monitor_queue, args=(queue, all_done))
    monitor_thread.start()
    try:
        # NOTE(review): the second argument is passed as an empty dict; its
        # role is defined by the worker — confirm against its signature.
        worker.blunderbuss(message, {}, queue)
    finally:
        # Always signal the monitor, even when the worker raises; otherwise
        # the non-daemon monitor thread would keep the process alive forever.
        all_done.set()
        # Wait for the monitor to finish before the script exits.
        monitor_thread.join()