Skip to content

Commit

Permalink
Server benchmarks: More accurate results by synchronizing client work…
Browse files Browse the repository at this point in the history
…ers (#283)
  • Loading branch information
francis-clairicia authored Apr 27, 2024
1 parent 5c7e26c commit 894896e
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 2 deletions.
12 changes: 11 additions & 1 deletion benchmark_server/datagram_echoclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import gc
import json
import multiprocessing
import multiprocessing.synchronize
import socket
import sys
from typing import Literal, assert_never
Expand All @@ -19,6 +20,7 @@


def run_test(
barrier: multiprocessing.synchronize.Barrier,
socket_family: int,
address: str | tuple[str, int],
message_size: int,
Expand All @@ -44,6 +46,8 @@ def run_test(

from time import perf_counter

barrier.wait(timeout=1)

current_test_duration = 0.0
while current_test_duration < duration:
request_start_time = perf_counter()
Expand Down Expand Up @@ -80,10 +84,16 @@ def main() -> None:

nb_workers: int = args.workers
message_size: int = args.msize
with concurrent.futures.ProcessPoolExecutor(max_workers=nb_workers, mp_context=multiprocessing.get_context("spawn")) as e:
mp_context = multiprocessing.get_context("spawn")
with (
mp_context.Manager() as manager,
concurrent.futures.ProcessPoolExecutor(max_workers=nb_workers, mp_context=mp_context) as e,
):
barrier: multiprocessing.synchronize.Barrier = manager.Barrier(nb_workers) # type: ignore[attr-defined]
workers_list = [
e.submit(
run_test,
barrier=barrier,
socket_family=SOCKFAMILY,
address=SOCKADDR,
message_size=message_size,
Expand Down
12 changes: 11 additions & 1 deletion benchmark_server/stream_echoclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import gc
import json
import multiprocessing
import multiprocessing.synchronize
import socket
import ssl
import sys
Expand All @@ -20,6 +21,7 @@


def run_test(
barrier: multiprocessing.synchronize.Barrier,
socket_family: int,
address: str | tuple[str, int],
over_ssl: bool,
Expand Down Expand Up @@ -61,6 +63,8 @@ def run_test(

from time import perf_counter

barrier.wait(timeout=1)

current_test_duration = 0.0
while current_test_duration < duration:
request_start_time = perf_counter()
Expand Down Expand Up @@ -104,10 +108,16 @@ def main() -> None:

nb_workers: int = args.workers
message_size: int = args.msize
with concurrent.futures.ProcessPoolExecutor(max_workers=nb_workers, mp_context=multiprocessing.get_context("spawn")) as e:
mp_context = multiprocessing.get_context("spawn")
with (
mp_context.Manager() as manager,
concurrent.futures.ProcessPoolExecutor(max_workers=nb_workers, mp_context=mp_context) as e,
):
barrier: multiprocessing.synchronize.Barrier = manager.Barrier(nb_workers) # type: ignore[attr-defined]
workers_list = [
e.submit(
run_test,
barrier=barrier,
socket_family=SOCKFAMILY,
address=SOCKADDR,
over_ssl=args.ssl,
Expand Down

0 comments on commit 894896e

Please sign in to comment.