
perf(ScheduleController): Fetch schedules by individual stop for more cache hits #231

Merged — 19 commits, merged Nov 8, 2024

Changes from 13 commits
2cd5a37
perf(ScheduleController): Fetch schedules by individual stop for more…
KaylaBrady Nov 1, 2024
f1cb85e
perf(MBTAV3API.Repository): Cache schedules responses for 1 hour
KaylaBrady Nov 4, 2024
fa81ced
feat(locustfile): Represent global data caching
KaylaBrady Nov 4, 2024
35a8a64
feat(load_testing): more realistic stop distribution
KaylaBrady Nov 6, 2024
711d8cc
fix(RepositoryCache): Actually start the cache
KaylaBrady Nov 6, 2024
3d7349a
style(locustfile): run linting
KaylaBrady Nov 6, 2024
3cb153f
perf(Repository): Try TTL cache for all static GTFS requests
KaylaBrady Nov 7, 2024
8493926
fix(RepositoryTest): clear cache
KaylaBrady Nov 7, 2024
9ec572e
fix(Locustfile): More realistic nearby stops numbers
KaylaBrady Nov 7, 2024
7015907
perf(ScheduleController): Don't use task when only one stop
KaylaBrady Nov 7, 2024
43f4bb9
perf(ScheduleController): Only make requests async when more than 1
KaylaBrady Nov 7, 2024
b487e91
test(Repository): Test schedules actually cached
KaylaBrady Nov 7, 2024
7245675
cleanup(locustfile): stray prints
KaylaBrady Nov 7, 2024
62b59fe
revert locustfile changes for separate PR
KaylaBrady Nov 7, 2024
83e238b
refactor(Repository): Cache all/3, remove unused alerts fn
KaylaBrady Nov 8, 2024
25330fd
feat(ScheduleController): unordered tasks & unsorted schedule list
KaylaBrady Nov 8, 2024
aef24c8
style: fix formatting
KaylaBrady Nov 8, 2024
f6c9331
fix(ScheduleControllerTest): Remove sorting expectation
KaylaBrady Nov 8, 2024
a8be11f
Merge branch 'main' into kb-cacheable-scheds
KaylaBrady Nov 8, 2024
4 changes: 4 additions & 0 deletions config/config.exs
@@ -20,6 +20,10 @@ config :mobile_app_backend, MBTAV3API.ResponseCache,
gc_interval: :timer.hours(1),
allocated_memory: 250_000_000

config :mobile_app_backend, MBTAV3API.RepositoryCache,
gc_interval: :timer.hours(2),
allocated_memory: 250_000_000
Comment on lines +24 to +25 (Collaborator, author):
Setting this to 2 hours, based on the recommendation that `ttl` should be less than `gc_interval`.
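For illustration, a minimal Python sketch of the constraint being described, assuming a cache whose periodic sweep can drop entries independently of their ttl (names and behavior are hypothetical, not the Nebulex implementation):

```python
import time

class TTLCache:
    """Toy TTL cache. Entries expire after `ttl` seconds; a separate
    periodic sweep (run every gc_interval seconds) drops old entries
    regardless of ttl, which is why ttl should stay below gc_interval."""

    def __init__(self, ttl_seconds):
        self.ttl = ttl_seconds
        self.store = {}

    def put(self, key, value, now=None):
        now = time.monotonic() if now is None else now
        self.store[key] = (value, now + self.ttl)

    def get(self, key, now=None):
        now = time.monotonic() if now is None else now
        entry = self.store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if now >= expires_at:  # ttl elapsed: treat as a miss
            del self.store[key]
            return None
        return value

cache = TTLCache(ttl_seconds=3600)          # ttl: 1 hour (gc sweep: 2 hours)
cache.put("schedules:Red", ["..."], now=0)  # hypothetical key
assert cache.get("schedules:Red", now=1800) == ["..."]  # still fresh
assert cache.get("schedules:Red", now=4000) is None     # past the 1h ttl
```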


# Configures the endpoint
config :mobile_app_backend, MobileAppBackendWeb.Endpoint,
url: [host: "localhost"],
10 changes: 10 additions & 0 deletions lib/mbta_v3_api/repository.ex
@@ -67,24 +67,34 @@ end

defmodule MBTAV3API.Repository.Impl do
@behaviour MBTAV3API.Repository

use Nebulex.Caching.Decorators

alias MBTAV3API.JsonApi

@ttl :timer.hours(1)

@impl true
def alerts(params, opts \\ []), do: all(MBTAV3API.Alert, params, opts)

@impl true
@decorate cacheable(cache: MBTAV3API.RepositoryCache, on_error: :nothing, opts: [ttl: @ttl])
Collaborator (author) commented:

Pulled from dotcom.

def route_patterns(params, opts \\ []), do: all(MBTAV3API.RoutePattern, params, opts)

@impl true
@decorate cacheable(cache: MBTAV3API.RepositoryCache, on_error: :nothing, opts: [ttl: @ttl])
def routes(params, opts \\ []), do: all(MBTAV3API.Route, params, opts)

@impl true
@decorate cacheable(cache: MBTAV3API.RepositoryCache, on_error: :nothing, opts: [ttl: @ttl])
def schedules(params, opts \\ []), do: all(MBTAV3API.Schedule, params, opts)

@impl true
@decorate cacheable(cache: MBTAV3API.RepositoryCache, on_error: :nothing, opts: [ttl: @ttl])
def stops(params, opts \\ []), do: all(MBTAV3API.Stop, params, opts)

@impl true
@decorate cacheable(cache: MBTAV3API.RepositoryCache, on_error: :nothing, opts: [ttl: @ttl])
def trips(params, opts \\ []), do: all(MBTAV3API.Trip, params, opts)

@spec all(module(), JsonApi.Params.t(), Keyword.t()) ::
boringcactus marked this conversation as resolved.
6 changes: 6 additions & 0 deletions lib/mbta_v3_api/repository_cache.ex
@@ -0,0 +1,6 @@
defmodule MBTAV3API.RepositoryCache do
@moduledoc """
Cache used to reduce the number of calls to the V3 API.
"""
use Nebulex.Cache, otp_app: :mobile_app_backend, adapter: Nebulex.Adapters.Local
end
1 change: 1 addition & 0 deletions lib/mobile_app_backend/application.ex
@@ -26,6 +26,7 @@ defmodule MobileAppBackend.Application do
:default => [size: 200, count: 10, start_pool_metrics?: true]
}},
{MBTAV3API.ResponseCache, []},
{MBTAV3API.RepositoryCache, []},
MBTAV3API.Supervisor,
{MobileAppBackend.FinchPoolHealth, pool_name: Finch.CustomPool},
MobileAppBackend.MapboxTokenRotator,
79 changes: 63 additions & 16 deletions lib/mobile_app_backend_web/controllers/schedule_controller.ex
@@ -1,17 +1,34 @@
defmodule MobileAppBackendWeb.ScheduleController do
use MobileAppBackendWeb, :controller
require Logger
alias MBTAV3API.JsonApi
alias MBTAV3API.Repository

-  def schedules(conn, %{"stop_ids" => stop_ids, "date_time" => date_time}) do
-    if stop_ids == "" do
+  def schedules(conn, %{"stop_ids" => stop_ids_concat, "date_time" => date_time}) do
+    if stop_ids_concat == "" do
       json(conn, %{schedules: [], trips: %{}})
     else
-      {:ok, data} =
-        get_filter(stop_ids, date_time)
-        |> fetch_schedules()
-
-      json(conn, data)
+      stop_ids = String.split(stop_ids_concat, ",")
+
+      service_date = parse_service_date(date_time)
+
+      filters = Enum.map(stop_ids, &get_filter(&1, service_date))
+
+      data =
+        case filters do
+          [filter] -> fetch_schedules(filter)
+          filters -> fetch_schedules_parallel(filters)
+        end
+
+      case data do
+        :error ->
+          conn
+          |> put_status(:internal_server_error)
+          |> json(%{error: "fetch_failed"})
+
+        data ->
+          json(conn, data)
+      end
     end
   end

@@ -39,20 +56,50 @@ defmodule MobileAppBackendWeb.ScheduleController do
json(conn, response)
end

-  @spec get_filter(String.t(), String.t()) :: [JsonApi.Params.filter_param()]
-  defp get_filter(stop_ids, date_time) do
-    date_time = Util.parse_datetime!(date_time)
-    service_date = Util.datetime_to_gtfs(date_time)
-    [stop: stop_ids, date: service_date]
+  @spec parse_service_date(String.t()) :: Date.t()
+  defp parse_service_date(date_string) do
+    date_string
+    |> Util.parse_datetime!()
+    |> Util.datetime_to_gtfs()
   end
+
+  @spec get_filter(String.t(), Date.t()) :: [JsonApi.Params.filter_param()]
+  defp get_filter(stop_id, service_date) do
+    [stop: stop_id, date: service_date]
+  end

@spec fetch_schedules_parallel([[JsonApi.Params.filter_param()]]) ::
%{schedules: [MBTAV3API.Schedule.t()], trips: JsonApi.Object.trip_map()} | :error
defp fetch_schedules_parallel(filters) do
filters
|> Task.async_stream(fn filter_params ->
{filter_params, fetch_schedules(filter_params)}
end)
Member commented:
Would it make sense to call this with `ordered: false`?
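The trade-off behind `ordered: false` can be sketched in Python with `concurrent.futures` (an analogy for `Task.async_stream`; the `fetch` helper is hypothetical): when the per-stop results are merged into one payload anyway, they can be collected in completion order rather than input order.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch(stop_id):
    # hypothetical stand-in for one schedules request
    return {"stop": stop_id, "schedules": []}

stop_ids = ["place-a", "place-b", "place-c"]

with ThreadPoolExecutor() as pool:
    # executor.map preserves input order, like ordered: true
    ordered = list(pool.map(fetch, stop_ids))

    # as_completed yields in completion order, like ordered: false —
    # no head-of-line blocking on a slow stop
    futures = [pool.submit(fetch, s) for s in stop_ids]
    unordered = [f.result() for f in as_completed(futures)]

assert [r["stop"] for r in ordered] == stop_ids
assert sorted(r["stop"] for r in unordered) == sorted(stop_ids)
```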

|> Enum.reduce_while(%{schedules: [], trips: %{}}, fn result, acc ->
case result do
{:ok, {_params, %{schedules: schedules, trips: trips}}} ->
{:cont, %{schedules: acc.schedules ++ schedules, trips: Map.merge(acc.trips, trips)}}
boringcactus marked this conversation as resolved.

{_result_type, {params, _response}} ->
Logger.warning(
"#{__MODULE__} skipped returning schedules due to error. params=#{inspect(params)}"
)

{:halt, :error}
end
end)
end

   @spec fetch_schedules([JsonApi.Params.filter_param()]) ::
-          {:ok, %{schedules: [MBTAV3API.Schedule.t()], trips: JsonApi.Object.trip_map()}}
-          | {:error, term()}
+          %{schedules: [MBTAV3API.Schedule.t()], trips: JsonApi.Object.trip_map()}
+          | :error
   defp fetch_schedules(filter) do
-    with {:ok, %{data: schedules, included: %{trips: trips}}} <-
-           Repository.schedules(filter: filter, include: :trip, sort: {:departure_time, :asc}) do
-      {:ok, %{schedules: schedules, trips: trips}}
+    case Repository.schedules(filter: filter, include: :trip, sort: {:departure_time, :asc}) do
+      {:ok, %{data: schedules, included: %{trips: trips}}} ->
+        %{schedules: schedules, trips: trips}
+
+      _ ->
+        :error
     end
   end
end
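The controller's merge-or-fail reduction can be sketched in Python (hypothetical data; `merge_results` mirrors the `Enum.reduce_while` accumulator above, which halts with `:error` on the first failed fetch):

```python
def merge_results(results):
    """Combine per-stop results into one payload; one failed stop
    fails the whole request, like {:halt, :error}."""
    acc = {"schedules": [], "trips": {}}
    for result in results:
        if result == "error":
            return "error"
        acc["schedules"] += result["schedules"]
        acc["trips"].update(result["trips"])
    return acc

ok = merge_results([
    {"schedules": ["s1"], "trips": {"t1": {}}},
    {"schedules": ["s2"], "trips": {"t2": {}}},
])
assert ok == {"schedules": ["s1", "s2"], "trips": {"t1": {}, "t2": {}}}
assert merge_results([{"schedules": [], "trips": {}}, "error"]) == "error"
```

Note the merged schedule list is no longer globally sorted across stops, which is why a later commit drops the sorting expectation from the tests.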
94 changes: 67 additions & 27 deletions load_testing/locustfile.py
@@ -1,85 +1,118 @@
import datetime
import random
from hashlib import sha256
from zoneinfo import ZoneInfo

import requests
from locust import HttpUser, between, events, task

from phoenix_channel import PhoenixChannel, PhoenixChannelUser

-all_stop_ids: list[str] = list(map(lambda stop: stop["id"],requests.get(
+all_station_ids: list[str] = list(map(lambda stop: stop["id"], requests.get(
     "https://api-v3.mbta.com/stops",
-    {"fields[stop]": "latitude,longitude", "filter[location_type]": "0,1"},
+    {"fields[stop]": "id", "filter[location_type]": "1"},
 ).json()["data"]))
 
-rail_stop_ids: list[str] = list(map(lambda stop: stop["id"], requests.get(
+standalone_bus_stop_ids: list[str] = list(map(lambda stop: stop["id"],
+    filter(lambda stop: stop["relationships"]["parent_station"]["data"] is None, requests.get(
     "https://api-v3.mbta.com/stops",
-    {"fields[stop]": "id", "filter[location_type]": "0", "filter[route_type]": "0,1"},
-).json()["data"]))
+    {"fields[stop]": "id", "filter[location_type]": "0", "filter[route_type]": "3"},
+).json()["data"])))
 
-cr_stop_ids: list[str] = list(map(lambda stop: stop["id"], requests.get(
-    "https://api-v3.mbta.com/stops",
-    {"fields[stop]": "id", "filter[location_type]": "0", "filter[route_type]": "2"},
-).json()["data"]))
+all_stations_and_bus = all_station_ids + standalone_bus_stop_ids
 
-bus_stop_ids: list[str] = list(map(lambda stop: stop["id"], requests.get(
-    "https://api-v3.mbta.com/stops",
-    {"fields[stop]": "id", "filter[location_type]": "0", "filter[route_type]": "3"},
-).json()["data"]))

all_routes: list[dict] = requests.get(
"https://api-v3.mbta.com/routes",
{},
).json()["data"]

initial_global_headers = {}
initial_rail_headers = {}


@events.test_start.add_listener
def on_init(environment, **_kwargs):
# Assume some % of users have already loaded global data before.
# Fetch global + rail data once from target host to use as baseline etag headers for newly spawned users
host = environment.host
global initial_global_headers
global initial_rail_headers

initial_global_response = requests.get(f"{host}/api/global")
initial_global_headers = {}
if initial_global_response.status_code == 200:
initial_global_headers = {"if-none-match": sha256(initial_global_response.text.encode()).hexdigest()}

initial_rail_response = requests.get(f"{host}/api/shapes/map-friendly/rail")
initial_rail_headers = {}
if initial_rail_response.status_code == 200:
initial_rail_headers = {"if-none-match": sha256(initial_rail_response.text.encode()).hexdigest()}


@events.init_command_line_parser.add_listener
def _(parser):
parser.add_argument("--api-key", type=str, env_var="V3_API_KEY", default="", help="API Key for the V3 API. Set to avoid rate limiting.")

class MobileAppUser(HttpUser, PhoenixChannelUser):
-    wait_time = between(5, 20)
+    wait_time = between(5, 60)
socket_path = "/socket"

prob_reset_initial_load = 0.02
prob_reset_nearby_stops = 0.3
prob_filtered_stop_details = 0.76
prob_already_loaded_global = 0.8
prob_station = 0.6

location: dict | None = None
stop_id: str | None = None
nearby_stop_ids: list[str] | None = None
alerts_channel: PhoenixChannel | None = None
predictions_channel: PhoenixChannel | None = None
vehicles_channel: PhoenixChannel | None = None
did_initial_load = False
global_headers: dict = {}
rail_headers: dict = {}
    v3_api_headers: dict = {}

def on_start(self):
self.v3_api_headers = {"x-api-key" : self.environment.parsed_options.api_key}

if random.random() < self.prob_already_loaded_global:
self.global_headers = initial_global_headers
self.rail_headers = initial_rail_headers

self.app_reload()

@task(1)
def app_reload(self):
-        self.client.get("/api/global")
-        self.client.get("/api/shapes/map-friendly/rail")
+        global_response = self.client.get("/api/global", headers=self.global_headers)
+        if global_response.status_code == 200:
+            self.global_headers = {"if-none-match": sha256(global_response.text.encode()).hexdigest()}
+
+        rail_response = self.client.get("/api/shapes/map-friendly/rail", headers=self.rail_headers)
+        if rail_response.status_code == 200:
+            self.rail_headers = {"if-none-match": sha256(rail_response.text.encode()).hexdigest()}

if self.alerts_channel is not None:
self.alerts_channel.leave()
self.alerts_channel = None

self.alerts_channel = self.socket.channel("alerts")
self.alerts_channel.join()

self.did_initial_load = True

def fetch_schedules_for_stops(self, stop_ids):
self.client.get(f'/api/schedules?stop_ids={stop_ids}&date_time={datetime.datetime.now().astimezone(ZoneInfo("America/New_York")).replace(microsecond=0).isoformat()}' , name="/api/schedules",)


@task(10)
def nearby_transit(self):
-        nearby_rail_ids = random.sample(rail_stop_ids, random.randint(2,8))
-        nearby_cr_ids = random.sample(cr_stop_ids, random.randint(0,14))
-        nearby_bus_ids = random.sample(bus_stop_ids, random.randint(0,14))
+        nearby_station_ids = random.sample(all_station_ids, random.randint(2,5))
+        nearby_bus_ids = random.sample(standalone_bus_stop_ids, random.randint(0,10))
 
-        self.nearby_stop_ids = nearby_rail_ids + nearby_cr_ids + nearby_bus_ids
+        self.nearby_stop_ids = nearby_station_ids + nearby_bus_ids
if (
self.predictions_channel is not None
and random.random() < self.prob_reset_nearby_stops
@@ -92,12 +125,19 @@ def nearby_transit(self):
f'predictions:stops:v2:{nearby_stops_concat}'
)
self.predictions_channel.join()

        self.fetch_schedules_for_stops(self.nearby_stop_ids)

@task(5)
def stop_details(self):
-        self.stop_id = random.choice(all_stop_ids)
-        self.client.get(f'/api/schedules?stop_ids={self.stop_id}&date_time={datetime.datetime.now().astimezone(ZoneInfo("America/New_York")).replace(microsecond=0).isoformat()}' , name="/api/schedules",)
+        if random.random() < self.prob_station:
+            self.stop_id = random.choice(all_station_ids)
+        else:
+            self.stop_id = random.choice(standalone_bus_stop_ids)
+
+        self.fetch_schedules_for_stops([self.stop_id])
self.client.get(f'/api/stop/map?stop_id={self.stop_id}', name = "/api/stop/map")

if (
Expand Down Expand Up @@ -126,7 +166,7 @@ def stop_details(self):
@task(5)
def trip_details(self):
if self.stop_id is None:
-            self.stop_id = random.choice(all_stop_ids)
+            self.stop_id = random.choice(all_stations_and_bus)
predictions_for_stop = requests.get(
"https://api-v3.mbta.com/predictions",
params={"stop": self.stop_id}, headers=self.v3_api_headers).json()["data"]
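The `if-none-match` logic in the locustfile assumes the server's ETag for these endpoints is the sha256 of the response body. A toy Python sketch of both sides of that conditional-request exchange (the `serve` function is hypothetical):

```python
from hashlib import sha256

def etag_for(body: str) -> str:
    return sha256(body.encode()).hexdigest()

def serve(body: str, request_headers: dict):
    """Toy server: reply 304 Not Modified when the client's
    If-None-Match matches the current body's hash."""
    if request_headers.get("if-none-match") == etag_for(body):
        return 304, ""
    return 200, body

body = '{"routes": []}'
status, payload = serve(body, {})          # first load: full body
assert status == 200
client_headers = {"if-none-match": etag_for(payload)}
status, payload = serve(body, client_headers)  # revisit: nothing changed
assert status == 304
```

This is why the load test seeds newly spawned users with baseline headers: most real users revisit with a warm cache and should exercise the 304 path rather than re-downloading global data.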
1 change: 1 addition & 0 deletions mix.exs
@@ -55,6 +55,7 @@ defmodule MobileAppBackend.MixProject do
{:esbuild, "~> 0.7", runtime: Mix.env() == :dev},
{:tailwind, "~> 0.2.0", runtime: Mix.env() == :dev},
{:logster, "~> 1.1"},
{:decorator, "~> 1.4"},
{:diskusage_logger, "~> 0.2", only: :prod},
{:ehmon, github: "mbta/ehmon", only: :prod},
{:sobelow, "~> 0.13", only: [:dev, :test], runtime: false},
1 change: 1 addition & 0 deletions mix.lock
@@ -8,6 +8,7 @@
"cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"},
"cowlib": {:hex, :cowlib, "2.13.0", "db8f7505d8332d98ef50a3ef34b34c1afddec7506e4ee4dd4a3a266285d282ca", [:make, :rebar3], [], "hexpm", "e1e1284dc3fc030a64b1ad0d8382ae7e99da46c3246b815318a4b848873800a4"},
"credo": {:hex, :credo, "1.7.9", "07bb31907746ae2b5e569197c9e16c0d75c8578a22f01bee63f212047efb2647", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "f87c11c34ba579f7c5044f02b2a807e1ed2fa5fdbb24dc7eb4ad59c1904887f3"},
"decorator": {:hex, :decorator, "1.4.0", "a57ac32c823ea7e4e67f5af56412d12b33274661bb7640ec7fc882f8d23ac419", [:mix], [], "hexpm", "0a07cedd9083da875c7418dea95b78361197cf2bf3211d743f6f7ce39656597f"},
"dialyxir": {:hex, :dialyxir, "1.4.4", "fb3ce8741edeaea59c9ae84d5cec75da00fa89fe401c72d6e047d11a61f65f70", [:mix], [{:erlex, ">= 0.2.7", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "cd6111e8017ccd563e65621a4d9a4a1c5cd333df30cebc7face8029cacb4eff6"},
"diskusage_logger": {:hex, :diskusage_logger, "0.2.0", "04fc48b538fe4de43153542a71ea94f623d54707d85844123baacfceedf625c3", [:mix], [], "hexpm", "e3f2aed1b0fc4590931c089a6453a4c4eb4c945912aa97bcabcc0cff7851f34d"},
"dns_cluster": {:hex, :dns_cluster, "0.1.3", "0bc20a2c88ed6cc494f2964075c359f8c2d00e1bf25518a6a6c7fd277c9b0c66", [:mix], [], "hexpm", "46cb7c4a1b3e52c7ad4cbe33ca5079fbde4840dedeafca2baf77996c2da1bc33"},