Skip to content

Commit

Permalink
Tests: Fix broadcaster disconnection test race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
roekatz committed Apr 24, 2024
1 parent 3d6bc24 commit 4c83f56
Showing 1 changed file with 33 additions and 12 deletions.
45 changes: 33 additions & 12 deletions tests/broadcaster_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from multiprocessing import Process

from fastapi_websocket_rpc.logger import get_logger, logging_config, LoggingModes

logging_config.set_mode(LoggingModes.LOGURU)

# Add parent path to use local src as package for tests
Expand All @@ -24,7 +25,11 @@

logger = get_logger("Test")
logger.remove()
logger.add(sys.stderr, format="<green>{time}</green> | {process} | <blue>{name: <50}</blue>|<level>{level:^6} | {message}</level>", level="INFO")
logger.add(
sys.stderr,
format="<green>{time}</green> | {process} | <blue>{name: <50}</blue>|<level>{level:^6} | {message}</level>",
level="INFO",
)

# Configurable
PORT = int(os.environ.get("PORT") or "7990")
Expand All @@ -37,26 +42,28 @@
PG_HOST_PORT = 25432
PG_SLEEP_TIME = 10


@pytest.fixture()
def postgres(request):
CONTAINER_NAME = "broadcastdb" + "".join(
[random.choice(string.ascii_letters) for _ in range(8)]
)

def rm_container():
os.system(f'docker rm -f {CONTAINER_NAME} > /dev/null 2>&1')
os.system(f"docker rm -f {CONTAINER_NAME} > /dev/null 2>&1")

rm_container() # Make sure no previous container exists
rm_container() # Make sure no previous container exists

postgres_args = ''
postgres_args = ""
timeout_marker = request.node.get_closest_marker("postgres_idle_timeout")
if timeout_marker is not None:
timeout = timeout_marker.args[0]
postgres_args = f'-c idle_session_timeout={timeout} -c idle_in_transaction_session_timeout={timeout}'

postgres_args = f"-c idle_session_timeout={timeout} -c idle_in_transaction_session_timeout={timeout}"

logger.info(f"running postgres on host port {PG_HOST_PORT}...")
os.system(f'docker run -d -p {PG_HOST_PORT}:5432 --name {CONTAINER_NAME} -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres postgres:alpine {postgres_args} > /dev/null 2>&1')
os.system(
f"docker run -d -p {PG_HOST_PORT}:5432 --name {CONTAINER_NAME} -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres postgres:alpine {postgres_args} > /dev/null 2>&1"
)
logger.info(f"Sleeping for {PG_SLEEP_TIME} seconds so postgres could stabilize")
time.sleep(PG_SLEEP_TIME)

Expand All @@ -65,18 +72,28 @@ def rm_container():
finally:
rm_container()


def setup_pubsub_endpoint(app: FastAPI, broadcast_url: str, path: str):
"""
sets up endpoints on the fastapi app:
- a pub/sub websocket endpoint for clients to connect to
- a trigger endpoint that causes the pub/sub server to publish a message on a predefined topic
"""
logger.info(f"[{path} endpoint] connecting to broadcast backbone service on '{broadcast_url}'")
endpoint = PubSubEndpoint(broadcaster=broadcast_url, ignore_broadcaster_disconnected=False)
logger.info(
f"[{path} endpoint] connecting to broadcast backbone service on '{broadcast_url}'"
)
endpoint = PubSubEndpoint(
broadcaster=broadcast_url, ignore_broadcaster_disconnected=False
)

@app.websocket(path)
async def websocket_rpc_endpoint(websocket: WebSocket):
await endpoint.main_loop(websocket)
try:
# Close connection if not already closed
await websocket.close()
except:
pass

@app.get(f"{path}/trigger")
async def trigger_events():
Expand All @@ -100,10 +117,11 @@ def setup_server(broadcast_url):
logger.info("Running server app")
uvicorn.run(app, port=PORT)


@pytest.fixture()
def server(postgres):
# Run the server as a separate process
proc = Process(target=setup_server, args=(postgres, ), daemon=True)
proc = Process(target=setup_server, args=(postgres,), daemon=True)
proc.start()
logger.info("Server started on a daemon process")
yield proc
Expand Down Expand Up @@ -132,7 +150,10 @@ async def on_event(data, topic):

async with PubSubClient() as client1:
async with PubSubClient() as client2:
for c, uri in [(client1,first_endpoint_uri), (client2,second_endpoint_uri)]:
for c, uri in [
(client1, first_endpoint_uri),
(client2, second_endpoint_uri),
]:
c.subscribe(EVENT_TOPIC, on_event)
c.start_client(uri)
await c.wait_until_ready()
Expand All @@ -155,6 +176,7 @@ async def wait_for_sem():
if repeat + 1 < repeats:
await asyncio.sleep(interval)


@pytest.mark.postgres_idle_timeout(3000)
@pytest.mark.asyncio
async def test_idle_pg_broadcaster_disconnect(server):
Expand All @@ -170,4 +192,3 @@ async def test_idle_pg_broadcaster_disconnect(server):
- all servers (and clients) will get both of the messages
"""
await test_all_clients_get_a_topic_via_broadcast(server, repeats=3, interval=4)

0 comments on commit 4c83f56

Please sign in to comment.