From 158443e19b8389a93258ea9090e7128ae49a87e3 Mon Sep 17 00:00:00 2001 From: Sameh Abouel-saad Date: Thu, 12 Dec 2024 17:36:18 +0200 Subject: [PATCH] Fix Value error and minor refactor --- tools/rmb_tester/rmb_tester.py | 47 +++++++++++++++++++++------------- 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/tools/rmb_tester/rmb_tester.py b/tools/rmb_tester/rmb_tester.py index 9224af1..bdabf8a 100755 --- a/tools/rmb_tester/rmb_tester.py +++ b/tools/rmb_tester/rmb_tester.py @@ -74,24 +74,33 @@ def send_all(messages): responses_expected = 0 return_queues = [] with alive_bar(len(messages), title='Sending ..', title_length=12) as bar: - for msg in messages: - r.lpush("msgbus.system.local", msg.to_json()) - responses_expected += len(msg.twin_dst) - return_queues += [msg.reply_to] - bar() + with r.pipeline() as pipe: + for msg in messages: + pipe.lpush("msgbus.system.local", msg.to_json()) + responses_expected += len(msg.twin_dst) + return_queues += [msg.reply_to] + bar() + pipe.execute() # Execute all commands in the pipeline at once return responses_expected, return_queues def wait_all(responses_expected, return_queues, timeout): - responses = [] - err_count = 0 - success_count = 0 - with alive_bar(responses_expected, title='Waiting ..', title_length=12) as bar: - for _ in range(responses_expected): - start = timer() - result = r.blpop(return_queues, timeout=timeout) - if not result: - break - timeout = timeout - round(timer() - start, 3) + responses = [] + err_count = 0 + success_count = 0 + start_time = timer() + + with alive_bar(responses_expected, title='Waiting ..', title_length=12) as bar: + while responses_expected > 0: + elapsed_time = timer() - start_time + remaining_time = timeout - elapsed_time + + if remaining_time <= 0: + print("Timeout reached, stopping waiting for responses.") + break + + # Use the remaining time for the blpop timeout + result = r.blpop(return_queues, timeout=remaining_time) + if result: response = Message.from_json(result[1]) responses.append(response) if response.err is not None: @@ -101,7 +110,9 @@ def wait_all(responses_expected, return_queues, timeout): success_count += 1 bar.text(f'received a response from twin {response.twin_src} ✅') bar() - return responses, err_count, success_count + responses_expected -= 1 + + return responses, err_count, success_count def main(): global r @@ -135,7 +146,7 @@ def main(): print(f"received_success: {success_count}") print(f"received_errors: {err_count}") print(f"no response errors (client give up): {no_responses}") - responding = {int(response.twin_src) for response in responses} + responding = {int(response.twin_src) for response in responses if response.twin_src != "" } not_responding = set(args.dest) - responding print(f"twins not responding (twin IDs): {' '.join(map(str, not_responding))}") print(f"elapsed time: {elapsed_time}") @@ -144,7 +155,7 @@ def main(): print("Responses:") print("=======================") for response in responses: - print(response) + print({k: v for k, v in response.__dict__.items() if v}) print("=======================") print("Errors:") print("=======================")