Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(load_testing): Enable some automatic reconnection handling #159

Merged
merged 5 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion load_testing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,9 @@ With `mix phx.server` up in another terminal:
$ cd load_testing/
$ asdf install
$ poetry install
$ poetry run locust --host http://localhost:4000
$ poetry run locust --host http://localhost:4000 --processes -1
# `processes -1` Launches a worker for each logical core.
# Can also run a specified number of workers.
# See https://docs.locust.io/en/stable/running-distributed.html#single-machine for more options
# Keyboard interrupt is only working if running multiple workers
```
3 changes: 2 additions & 1 deletion load_testing/locustfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class MobileAppUser(HttpUser, PhoenixChannelUser):
wait_time = between(1, 5)
socket_path = "/socket"

prob_reset_map_data = 0.3
prob_reset_map_data = 0.02
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decreasing this since reseting map data will be rare. This was causing memory usage to spike.

prob_reset_location = 0.3
prob_reset_nearby_stops = 0.3

Expand Down Expand Up @@ -48,6 +48,7 @@ def nearby_transit(self):
self.stops_channel is not None
and random.random() < self.prob_reset_nearby_stops
):
print("Leaving")
self.stops_channel.leave()
self.stops_channel = None
if self.stops_channel is None:
Expand Down
70 changes: 55 additions & 15 deletions load_testing/phoenix_channel.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
# derived from https://github.com/SvenskaSpel/locust-plugins/blob/4.4.3/locust_plugins/users/socketio.py
import json
import logging
import threading
import time
import urllib
import urllib.parse
from typing import Any

import gevent
import rel
import websocket
from gevent.event import AsyncResult
from locust import User
Expand All @@ -27,13 +29,27 @@ def __init__(
self, environment: Environment, url: str, headers: dict | list
) -> None:
self.environment = environment
self.ws = websocket.create_connection(
f"{url}/websocket?vsn=2.0.0", header=headers
)
self.ws_greenlet = gevent.spawn(self.receive_loop)
self.ws = websocket.WebSocketApp( f"{url}/websocket?vsn=2.0.0",
on_open=self.on_open,
on_message=self.on_socket_message,
on_error=self.on_error,
on_close=self.on_close)


# run_forever is blocking
# https://github.com/websocket-client/websocket-client/issues/980#issuecomment-2065628852
daemon = threading.Thread(target=self.run_forever)
daemon.daemon = True
daemon.start()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw this comment suggesting that threading can be avoided by using rel b/c it is async, but I found run_forever was blocking even when using rel as the dispatcher.

I don't love this threading, and I'm pretty sure it is the reason why keyboard interrupt doesn't work when running locust with a single worker.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I'm going to try getting rid of rel entirely. Seems like it isn't strictly necessary websocket-client/websocket-client#969


self._next_ref = 0
self.open_pushes: dict[str, PhoenixPush] = dict()

def run_forever(self):
self.ws.run_forever(dispatcher=rel, reconnect=2, ping_interval=60)
rel.signal(2, rel.abort) # Keyboard Interrupt
rel.dispatch()

def channel(self, topic: str, payload: dict[str, Any] | None = None):
if payload is None:
payload = dict()
Expand All @@ -43,16 +59,25 @@ def disconnect(self):
self.closing = True
self.ws.close()

def receive_loop(self):
while self.ws.connected:
try:
message = self.ws.recv()
logging.debug(ellipsize_string(f"WSR: {message}", 256))
if message != "":
self.on_message(message)
except Exception:
if not self.closing:
raise
def on_socket_message(self, ws, message):
try:

logging.debug(ellipsize_string(f"WSR: {message}", 256))
if message != "":
self.on_message(message)
except Exception:
if not self.closing:
raise

def on_error(self, ws, error):
print(f"Socket error: {error}")

def on_close(self, ws, close_status_code, close_msg):
print(f"Socket closed: {close_status_code} {close_msg}")

def on_open(self, ws):
print("Socket opened")


def on_message(self, message):
[join_ref, ref, topic, event, payload] = json.loads(message)
Expand Down Expand Up @@ -100,22 +125,37 @@ def __init__(
if payload is None:
payload = dict()
self.join_ref = socket.next_ref()
self.sleep_ref = 1


self.topic = topic
self.join_push = PhoenixPush(
socket, self.join_ref, self.join_ref, topic, "phx_join", payload
)


def join(self):
print("Joining topic")
self.join_push.send()
self.sleep_with_heartbeat(60)
return self.join_push.get_reply()

def leave(self):
leave_push = PhoenixPush(
self.socket, self.join_ref, self.socket.next_ref(), self.topic, "phx_leave"
self.socket, self.join_ref, self.socket.next_ref(), self.topic, "phx_leave", {}
)
leave_push.send()
return leave_push.get_reply()

def sleep_with_heartbeat(self, seconds):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pulled from dotcom

while seconds >= 0:
gevent.sleep(min(15, seconds))
seconds -= 15
self.sleep_ref += 1
# [null,"2","phoenix","heartbeat",{}]
heartbeat_push = PhoenixPush(self.socket, None, self.sleep_ref, "phoenix", "heartbeat", {})
heartbeat_push.send()


class PhoenixPush:
def __init__(
Expand Down
Loading
Loading