Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(load_testing): Enable some automatic reconnection handling #159

Merged
merged 5 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion load_testing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,8 @@ With `mix phx.server` up in another terminal:
$ cd load_testing/
$ asdf install
$ poetry install
$ poetry run locust --host http://localhost:4000
$ poetry run locust --host http://localhost:4000 --processes -1
# `processes -1` Launches a worker for each logical core.
# Can also run a specified number of workers.
# See https://docs.locust.io/en/stable/running-distributed.html#single-machine for more options
```
22 changes: 13 additions & 9 deletions load_testing/locustfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@
{"fields[stop]": "latitude,longitude", "filter[location_type]": "0"},
).json()["data"]


class MobileAppUser(HttpUser, PhoenixChannelUser):
wait_time = between(1, 5)
socket_path = "/socket"

prob_reset_map_data = 0.3
prob_reset_map_data = 0.02
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decreasing this since reseting map data will be rare. This was causing memory usage to spike.

prob_reset_location = 0.3
prob_reset_nearby_stops = 0.3

Expand All @@ -37,13 +36,18 @@ def nearby_transit(self):
self.location = random.choice(all_stops)["attributes"]
assert self.location is not None
with self.client.rename_request("/api/nearby"):
self.nearby_stop_ids = self.client.get(
"/api/nearby",
params={
"latitude": self.location["latitude"],
"longitude": self.location["longitude"],
},
).json()["stop_ids"]
nearby_result = self.client.get(
"/api/nearby",
params={
"latitude": self.location["latitude"],
"longitude": self.location["longitude"],
},
)
try:
self.nearby_stop_ids = nearby_result.json()["stop_ids"]
except Exception:
print(f"nearby_result: {nearby_result}")
raise
if (
self.stops_channel is not None
and random.random() < self.prob_reset_nearby_stops
Expand Down
48 changes: 31 additions & 17 deletions load_testing/phoenix_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from typing import Any

import gevent
import websocket
import websockets.sync.client as websockets
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The regular async version uses asyncio, which is not supported in locust.

from gevent.event import AsyncResult
from locust import User
from locust.env import Environment
Expand All @@ -27,12 +27,26 @@ def __init__(
self, environment: Environment, url: str, headers: dict | list
) -> None:
self.environment = environment
self.ws = websocket.create_connection(
f"{url}/websocket?vsn=2.0.0", header=headers
)
self.url = f"{url}/websocket?vsn=2.0.0"
self.ws = websockets.connect(f"{url}/websocket?vsn=2.0.0")
self.ws_greenlet = gevent.spawn(self.receive_loop)
self._next_ref = 0
self.open_pushes: dict[str, PhoenixPush] = dict()
gevent.spawn(self.sleep_with_heartbeat)

def receive_loop(self):
for message in self.ws:
logging.debug(ellipsize_string(f"WSR: {message}", 256))
if message != "":
self.on_message(message)

def sleep_with_heartbeat(self):
while True:
gevent.sleep(15)
# [null,"2","phoenix","heartbeat",{}]
heartbeat_push = PhoenixPush(self, None, self.next_ref(), "phoenix", "heartbeat", {})
heartbeat_push.send()


def channel(self, topic: str, payload: dict[str, Any] | None = None):
if payload is None:
Expand All @@ -43,22 +57,13 @@ def disconnect(self):
self.closing = True
self.ws.close()

def receive_loop(self):
while self.ws.connected:
try:
message = self.ws.recv()
logging.debug(ellipsize_string(f"WSR: {message}", 256))
if message != "":
self.on_message(message)
except Exception:
if not self.closing:
raise

def on_message(self, message):
[join_ref, ref, topic, event, payload] = json.loads(message)
self.on_phoenix_message(join_ref, ref, topic, event, payload, len(message))

def on_phoenix_message(self, join_ref, ref, topic, event, payload, response_length):
# print(f"Ref: {ref} event: {event} all_open_pushes: {self.open_pushes} ")
if (
event == "phx_reply"
and (push := self.open_pushes.pop(ref, None)) is not None
Expand All @@ -76,6 +81,13 @@ def on_phoenix_message(self, join_ref, ref, topic, event, payload, response_leng
)
if exception is None:
push.reply.set(payload["response"])

self.environment.events.request.fire(
request_type=f"WS:RECV {event}",
name=topic,
response_time=None,
response_length=response_length,
)
else:
push.reply.set_exception(exception)
else:
Expand All @@ -100,23 +112,25 @@ def __init__(
if payload is None:
payload = dict()
self.join_ref = socket.next_ref()

self.topic = topic
self.join_push = PhoenixPush(
socket, self.join_ref, self.join_ref, topic, "phx_join", payload
)


def join(self):
self.join_push.send()

return self.join_push.get_reply()

def leave(self):
leave_push = PhoenixPush(
self.socket, self.join_ref, self.socket.next_ref(), self.topic, "phx_leave"
self.socket, self.join_ref, self.socket.next_ref(), self.topic, "phx_leave", {}
)
leave_push.send()
return leave_push.get_reply()


class PhoenixPush:
def __init__(
self,
Expand Down Expand Up @@ -149,7 +163,7 @@ def send(self):
self.socket.ws.send(body)

def get_reply(self):
return self.reply.get()
return self.reply.get(5)


class PhoenixChannelUser(User):
Expand Down
Loading
Loading