Skip to content

Commit

Permalink
Fix Value error and minor refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
sameh-farouk committed Dec 12, 2024
1 parent 0ef17e7 commit 158443e
Showing 1 changed file with 29 additions and 18 deletions.
47 changes: 29 additions & 18 deletions tools/rmb_tester/rmb_tester.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,24 +74,33 @@ def send_all(messages):
responses_expected = 0
return_queues = []
with alive_bar(len(messages), title='Sending ..', title_length=12) as bar:
for msg in messages:
r.lpush("msgbus.system.local", msg.to_json())
responses_expected += len(msg.twin_dst)
return_queues += [msg.reply_to]
bar()
with r.pipeline() as pipe:
for msg in messages:
pipe.lpush("msgbus.system.local", msg.to_json())
responses_expected += len(msg.twin_dst)
return_queues += [msg.reply_to]
bar()
pipe.execute() # Execute all commands in the pipeline at once
return responses_expected, return_queues

def wait_all(responses_expected, return_queues, timeout):
responses = []
err_count = 0
success_count = 0
with alive_bar(responses_expected, title='Waiting ..', title_length=12) as bar:
for _ in range(responses_expected):
start = timer()
result = r.blpop(return_queues, timeout=timeout)
if not result:
break
timeout = timeout - round(timer() - start, 3)
responses = []
err_count = 0
success_count = 0
start_time = timer()

with alive_bar(responses_expected, title='Waiting ..', title_length=12) as bar:
while responses_expected > 0:
elapsed_time = timer() - start_time
remaining_time = timeout - elapsed_time

if remaining_time <= 0:
print("Timeout reached, stopping waiting for responses.")
break

# Use the remaining time for the blpop timeout
result = r.blpop(return_queues, timeout=remaining_time)
if result:
response = Message.from_json(result[1])
responses.append(response)
if response.err is not None:
Expand All @@ -101,7 +110,9 @@ def wait_all(responses_expected, return_queues, timeout):
success_count += 1
bar.text(f'received a response from twin {response.twin_src} ✅')
bar()
return responses, err_count, success_count
responses_expected -= 1

return responses, err_count, success_count

def main():
global r
Expand Down Expand Up @@ -135,7 +146,7 @@ def main():
print(f"received_success: {success_count}")
print(f"received_errors: {err_count}")
print(f"no response errors (client give up): {no_responses}")
responding = {int(response.twin_src) for response in responses}
responding = {int(response.twin_src) for response in responses if response.twin_src != "" }
not_responding = set(args.dest) - responding
print(f"twins not responding (twin IDs): {' '.join(map(str, not_responding))}")
print(f"elapsed time: {elapsed_time}")
Expand All @@ -144,7 +155,7 @@ def main():
print("Responses:")
print("=======================")
for response in responses:
print(response)
print({k: v for k, v in response.__dict__.items() if v})
print("=======================")
print("Errors:")
print("=======================")
Expand Down

0 comments on commit 158443e

Please sign in to comment.