Skip to content

Commit

Permalink
feat(load_testing): Enable some automatic reconnection handling (#159)
Browse files Browse the repository at this point in the history
* feat(load_testing): Enable some automatic reconnection handling

* refactor(load_testing): switch ws library

The previous library was experiencing flaky SSL errors when sending messages and had high CPU usage while running locally, even in distributed mode. Switching this library seems to have resolved those issues - ran tests for 200 users joining & leaving the same sets of stops and encountered client errors only caused by backend failures

* doc(load_testing): Remove note about keyboard interrupt

* cleanup: remove unused dependencies

* cleanup(load_testing): import order, unused var
  • Loading branch information
KaylaBrady authored Jun 26, 2024
1 parent 557b36e commit 6429ae9
Show file tree
Hide file tree
Showing 5 changed files with 323 additions and 229 deletions.
5 changes: 4 additions & 1 deletion load_testing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,8 @@ With `mix phx.server` up in another terminal:
$ cd load_testing/
$ asdf install
$ poetry install
$ poetry run locust --host http://localhost:4000
$ poetry run locust --host http://localhost:4000 --processes -1
# `processes -1` Launches a worker for each logical core.
# Can also run a specified number of workers.
# See https://docs.locust.io/en/stable/running-distributed.html#single-machine for more options
```
22 changes: 13 additions & 9 deletions load_testing/locustfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@
{"fields[stop]": "latitude,longitude", "filter[location_type]": "0"},
).json()["data"]


class MobileAppUser(HttpUser, PhoenixChannelUser):
wait_time = between(1, 5)
socket_path = "/socket"

prob_reset_map_data = 0.3
prob_reset_map_data = 0.02
prob_reset_location = 0.3
prob_reset_nearby_stops = 0.3

Expand All @@ -37,13 +36,18 @@ def nearby_transit(self):
self.location = random.choice(all_stops)["attributes"]
assert self.location is not None
with self.client.rename_request("/api/nearby"):
self.nearby_stop_ids = self.client.get(
"/api/nearby",
params={
"latitude": self.location["latitude"],
"longitude": self.location["longitude"],
},
).json()["stop_ids"]
nearby_result = self.client.get(
"/api/nearby",
params={
"latitude": self.location["latitude"],
"longitude": self.location["longitude"],
},
)
try:
self.nearby_stop_ids = nearby_result.json()["stop_ids"]
except Exception:
print(f"nearby_result: {nearby_result}")
raise
if (
self.stops_channel is not None
and random.random() < self.prob_reset_nearby_stops
Expand Down
48 changes: 31 additions & 17 deletions load_testing/phoenix_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from typing import Any

import gevent
import websocket
import websockets.sync.client as websockets
from gevent.event import AsyncResult
from locust import User
from locust.env import Environment
Expand All @@ -27,12 +27,26 @@ def __init__(
self, environment: Environment, url: str, headers: dict | list
) -> None:
self.environment = environment
self.ws = websocket.create_connection(
f"{url}/websocket?vsn=2.0.0", header=headers
)
self.url = f"{url}/websocket?vsn=2.0.0"
self.ws = websockets.connect(f"{url}/websocket?vsn=2.0.0")
self.ws_greenlet = gevent.spawn(self.receive_loop)
self._next_ref = 0
self.open_pushes: dict[str, PhoenixPush] = dict()
gevent.spawn(self.sleep_with_heartbeat)

def receive_loop(self):
for message in self.ws:
logging.debug(ellipsize_string(f"WSR: {message}", 256))
if message != "":
self.on_message(message)

def sleep_with_heartbeat(self):
while True:
gevent.sleep(15)
# [null,"2","phoenix","heartbeat",{}]
heartbeat_push = PhoenixPush(self, None, self.next_ref(), "phoenix", "heartbeat", {})
heartbeat_push.send()


def channel(self, topic: str, payload: dict[str, Any] | None = None):
if payload is None:
Expand All @@ -43,22 +57,13 @@ def disconnect(self):
self.closing = True
self.ws.close()

def receive_loop(self):
while self.ws.connected:
try:
message = self.ws.recv()
logging.debug(ellipsize_string(f"WSR: {message}", 256))
if message != "":
self.on_message(message)
except Exception:
if not self.closing:
raise

def on_message(self, message):
[join_ref, ref, topic, event, payload] = json.loads(message)
self.on_phoenix_message(join_ref, ref, topic, event, payload, len(message))

def on_phoenix_message(self, join_ref, ref, topic, event, payload, response_length):
# print(f"Ref: {ref} event: {event} all_open_pushes: {self.open_pushes} ")
if (
event == "phx_reply"
and (push := self.open_pushes.pop(ref, None)) is not None
Expand All @@ -76,6 +81,13 @@ def on_phoenix_message(self, join_ref, ref, topic, event, payload, response_leng
)
if exception is None:
push.reply.set(payload["response"])

self.environment.events.request.fire(
request_type=f"WS:RECV {event}",
name=topic,
response_time=None,
response_length=response_length,
)
else:
push.reply.set_exception(exception)
else:
Expand All @@ -100,23 +112,25 @@ def __init__(
if payload is None:
payload = dict()
self.join_ref = socket.next_ref()

self.topic = topic
self.join_push = PhoenixPush(
socket, self.join_ref, self.join_ref, topic, "phx_join", payload
)


def join(self):
self.join_push.send()

return self.join_push.get_reply()

def leave(self):
leave_push = PhoenixPush(
self.socket, self.join_ref, self.socket.next_ref(), self.topic, "phx_leave"
self.socket, self.join_ref, self.socket.next_ref(), self.topic, "phx_leave", {}
)
leave_push.send()
return leave_push.get_reply()


class PhoenixPush:
def __init__(
self,
Expand Down Expand Up @@ -149,7 +163,7 @@ def send(self):
self.socket.ws.send(body)

def get_reply(self):
return self.reply.get()
return self.reply.get(5)


class PhoenixChannelUser(User):
Expand Down
Loading

0 comments on commit 6429ae9

Please sign in to comment.