From 894896ee1445f138826b79fd8dee9f7bca4de6d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francis=20Clairicia-Rose-Claire-Jos=C3=A9phine?= Date: Sat, 27 Apr 2024 13:05:23 +0200 Subject: [PATCH] Server benchmarks: More accurate results by synchronizing client workers (#283) --- benchmark_server/datagram_echoclient.py | 12 +++++++++++- benchmark_server/stream_echoclient.py | 12 +++++++++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/benchmark_server/datagram_echoclient.py b/benchmark_server/datagram_echoclient.py index ed9584b9..0ecb743c 100755 --- a/benchmark_server/datagram_echoclient.py +++ b/benchmark_server/datagram_echoclient.py @@ -11,6 +11,7 @@ import gc import json import multiprocessing +import multiprocessing.synchronize import socket import sys from typing import Literal, assert_never @@ -19,6 +20,7 @@ def run_test( + barrier: multiprocessing.synchronize.Barrier, socket_family: int, address: str | tuple[str, int], message_size: int, @@ -44,6 +46,8 @@ def run_test( from time import perf_counter + barrier.wait(timeout=1) + current_test_duration = 0.0 while current_test_duration < duration: request_start_time = perf_counter() @@ -80,10 +84,16 @@ def main() -> None: nb_workers: int = args.workers message_size: int = args.msize - with concurrent.futures.ProcessPoolExecutor(max_workers=nb_workers, mp_context=multiprocessing.get_context("spawn")) as e: + mp_context = multiprocessing.get_context("spawn") + with ( + mp_context.Manager() as manager, + concurrent.futures.ProcessPoolExecutor(max_workers=nb_workers, mp_context=mp_context) as e, + ): + barrier: multiprocessing.synchronize.Barrier = manager.Barrier(nb_workers) # type: ignore[attr-defined] workers_list = [ e.submit( run_test, + barrier=barrier, socket_family=SOCKFAMILY, address=SOCKADDR, message_size=message_size, diff --git a/benchmark_server/stream_echoclient.py b/benchmark_server/stream_echoclient.py index bd1a455e..afce2d11 100755 --- a/benchmark_server/stream_echoclient.py +++ b/benchmark_server/stream_echoclient.py @@ -11,6 +11,7 @@ import gc import json import multiprocessing +import multiprocessing.synchronize import socket import ssl import sys @@ -20,6 +21,7 @@ def run_test( + barrier: multiprocessing.synchronize.Barrier, socket_family: int, address: str | tuple[str, int], over_ssl: bool, @@ -61,6 +63,8 @@ def run_test( from time import perf_counter + barrier.wait(timeout=1) + current_test_duration = 0.0 while current_test_duration < duration: request_start_time = perf_counter() @@ -104,10 +108,16 @@ def main() -> None: nb_workers: int = args.workers message_size: int = args.msize - with concurrent.futures.ProcessPoolExecutor(max_workers=nb_workers, mp_context=multiprocessing.get_context("spawn")) as e: + mp_context = multiprocessing.get_context("spawn") + with ( + mp_context.Manager() as manager, + concurrent.futures.ProcessPoolExecutor(max_workers=nb_workers, mp_context=mp_context) as e, + ): + barrier: multiprocessing.synchronize.Barrier = manager.Barrier(nb_workers) # type: ignore[attr-defined] workers_list = [ e.submit( run_test, + barrier=barrier, socket_family=SOCKFAMILY, address=SOCKADDR, over_ssl=args.ssl,