Skip to content

Commit

Permalink
feat: Make trip channel pull from ETS predictions (#235)
Browse files Browse the repository at this point in the history
* feat: Create new trip v2 channel

* test: Add tests for trip v2 predictions channel

* refactor: Replace existing channel

* fix: Address PR feedback

* fix: Add missing field to typespec

* fix: Fix type for dialyzer

* fix: Broken test
  • Loading branch information
EmmaSimon authored Nov 12, 2024
1 parent fdaf777 commit 4620ff2
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 246 deletions.
12 changes: 9 additions & 3 deletions .vscode/tasks.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
"version": "2.0.0",
"tasks": [
{
"label": "test",
"label": "credo --strict",
"type": "shell",
"command": "mix test",
"group": "test"
"command": "mix credo --strict",
"group": "build"
},
{
"label": "deps.compile",
Expand All @@ -18,6 +18,12 @@
"type": "shell",
"command": "mix deps.get",
"group": "build"
},
{
"label": "test",
"type": "shell",
"command": "mix test",
"group": "test"
}
]
}
61 changes: 58 additions & 3 deletions lib/mobile_app_backend/predictions/pub_sub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,33 @@ defmodule MobileAppBackend.Predictions.PubSub.Behaviour do

@type predictions_for_stop :: %{Stop.id() => JsonApi.Object.full_map()}

@type subscribe_response :: %{
@type subscribe_stop_response :: %{
predictions_by_stop: %{Stop.id() => %{Prediction.id() => Prediction.t()}},
trips: %{Trip.id() => Trip.t()},
vehicles: %{Vehicle.id() => Vehicle.t()}
}

@type subscribe_trip_response ::
%{
trip_id: Trip.id(),
predictions: %{Prediction.id() => Prediction.t()},
trips: %{Trip.id() => Trip.t()},
vehicles: %{Vehicle.id() => Vehicle.t()}
}
| :error

@doc """
Subscribe to prediction updates for the given stop. For a parent station, this subscribes to updates for all child stops.
"""
@callback subscribe_for_stop(Stop.id()) :: subscribe_response()
@callback subscribe_for_stop(Stop.id()) :: subscribe_stop_response()
@doc """
Subscribe to prediction updates for multiple stops. For parent stations, this subscribes to updates for all their child stops.
"""
@callback subscribe_for_stops([Stop.id()]) :: subscribe_response()
@callback subscribe_for_stops([Stop.id()]) :: subscribe_stop_response()
@doc """
Subscribe to prediction updates for the given trip.
"""
@callback subscribe_for_trip(Trip.id()) :: subscribe_trip_response()
end

defmodule MobileAppBackend.Predictions.PubSub do
Expand Down Expand Up @@ -114,6 +127,34 @@ defmodule MobileAppBackend.Predictions.PubSub do
subscribe_for_stops([stop_id])
end

@impl true
def subscribe_for_trip(trip_id) do
case :timer.tc(MobileAppBackend.Predictions.StreamSubscriber, :subscribe_for_trip, [trip_id]) do
{time_micros, :ok} ->
Logger.info(
"#{__MODULE__} subscribe_for_trip trip_id=#{trip_id} duration=#{time_micros / 1000} "
)

predictions_data =
register_trip(trip_id)
|> Store.Predictions.fetch_with_associations()

%{
trip_id: trip_id,
predictions: predictions_data.predictions,
trips: predictions_data.trips,
vehicles: predictions_data.vehicles
}

{time_micros, :error} ->
Logger.warning(
"#{__MODULE__} failed to subscribe_for_trip trip_id=#{trip_id} duration=#{time_micros / 1000} "
)

:error
end
end

@spec group_predictions_for_stop(%{Prediction.id() => Prediction.t()}, [Stop.id()], %{
Stop.id() => [Stop.id()]
}) :: %{Stop.id() => %{Prediction.id() => Prediction.t()}}
Expand Down Expand Up @@ -187,6 +228,20 @@ defmodule MobileAppBackend.Predictions.PubSub do
fetch_keys
end

@spec register_trip(Trip.id()) :: Store.fetch_keys()
defp register_trip(trip_id) do
fetch_keys = [trip_id: trip_id]

{:ok, _owner} =
Registry.register(
MobileAppBackend.Predictions.Registry,
@fetch_registry_key,
{fetch_keys, fn data -> Map.put(data, :trip_id, trip_id) end}
)

fetch_keys
end

@impl GenServer
def init(opts \\ []) do
# Predictions are streamed from the V3 API by route,
Expand Down
29 changes: 28 additions & 1 deletion lib/mobile_app_backend/predictions/stream_subscriber.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,30 @@ defmodule MobileAppBackend.Predictions.StreamSubscriber do
"""
alias MBTAV3API.Stop
alias MBTAV3API.Trip
alias MobileAppBackend.Predictions.StreamSubscriber

@doc """
Ensure prediction streams have been started for every route served by the given stops
and the stream of all vehicles has been started.
"""
@callback subscribe_for_stops([Stop.id()]) :: :ok | :error

def subscribe_for_stops(stop_ids) do
Application.get_env(
:mobile_app_backend,
MobileAppBackend.Predictions.StreamSubscriber,
StreamSubscriber.Impl
).subscribe_for_stops(stop_ids)
end

@callback subscribe_for_trip(Trip.id()) :: :ok | :error
def subscribe_for_trip(trip_id) do
Application.get_env(
:mobile_app_backend,
MobileAppBackend.Predictions.StreamSubscriber,
StreamSubscriber.Impl
).subscribe_for_trip(trip_id)
end
end

defmodule MobileAppBackend.Predictions.StreamSubscriber.Impl do
Expand Down Expand Up @@ -53,4 +62,22 @@ defmodule MobileAppBackend.Predictions.StreamSubscriber.Impl do
:ok
end
end

@impl true
def subscribe_for_trip(trip_id) do
with {:ok, %{data: [trip]}} <- MBTAV3API.Repository.trips(filter: [id: trip_id]),
route_id <- trip.route_id,
{:ok, _data} <-
StaticInstance.ensure_stream_started("predictions:route:to_store:#{route_id}",
include_current_data: false
),
{:ok, _data} <-
StaticInstance.ensure_stream_started("vehicles:to_store", include_current_data: false) do
:ok
else
_ ->
Logger.warning("#{__MODULE__} failed to fetch trip from repository for #{trip_id}")
:error
end
end
end
87 changes: 30 additions & 57 deletions lib/mobile_app_backend_web/channels/predictions_for_trip_channel.ex
Original file line number Diff line number Diff line change
@@ -1,74 +1,47 @@
defmodule MobileAppBackendWeb.PredictionsForTripChannel do
use MobileAppBackendWeb, :channel

alias MBTAV3API.JsonApi
alias MBTAV3API.Prediction

@throttle_ms 500
require Logger

@impl true
def join("predictions:trip:" <> trip_id, _payload, socket) do
{:ok, throttler} =
MobileAppBackend.Throttler.start_link(
target: self(),
cast: :send_data,
ms: @throttle_ms
)

{:ok, %{data: [trip]}} = MBTAV3API.Repository.trips(filter: [id: trip_id])

route_id = trip.route_id

{:ok, data} = MBTAV3API.Stream.StaticInstance.subscribe("predictions:route:#{route_id}")
if trip_id == "" do
{:error, %{code: :no_trip_id}}
else
subscribe(trip_id, socket)
end
end

data = filter_data(data, trip_id)
defp subscribe(trip_id, socket) do
pubsub_module =
Application.get_env(
:mobile_app_backend,
MobileAppBackend.Predictions.PubSub,
MobileAppBackend.Predictions.PubSub
)

{:ok, data, assign(socket, data: data, trip_id: trip_id, throttler: throttler)}
end
case :timer.tc(fn -> pubsub_module.subscribe_for_trip(trip_id) end) do
{time_micros, :error} ->
Logger.warning("#{__MODULE__} failed join duration=#{time_micros / 1000}")
{:error, %{code: :subscribe_failed}}

@impl true
def handle_info({:stream_data, "predictions:route:" <> _route_id, data}, socket) do
old_data = socket.assigns.data
new_data = filter_data(data, socket.assigns.trip_id)
{time_micros, initial_data} ->
Logger.info("#{__MODULE__} join duration=#{time_micros / 1000}")

if old_data != new_data do
MobileAppBackend.Throttler.request(socket.assigns.throttler)
{:ok, initial_data, socket}
end

socket = assign(socket, data: new_data)
{:noreply, socket}
end

@impl true
def handle_cast(:send_data, socket) do
:ok = push(socket, "stream_data", socket.assigns.data)
{:noreply, socket}
end
@spec handle_info({:new_predictions, any()}, Phoenix.Socket.t()) ::
{:noreply, Phoenix.Socket.t()}
def handle_info({:new_predictions, new_predictions_for_trip}, socket) do
{time_micros, _result} =
:timer.tc(fn ->
:ok = push(socket, "stream_data", new_predictions_for_trip)
end)

@doc """
Filters the given data to predictions that are at one of the listed stops and the associated trips and vehicles.
"""
@spec filter_data(JsonApi.Object.full_map(), String.t()) :: JsonApi.Object.full_map()
def filter_data(route_data, trip_id) do
%{predictions: predictions, vehicle_ids: vehicle_ids} =
for {_, %Prediction{} = prediction} <- route_data.predictions,
reduce: %{predictions: %{}, vehicle_ids: []} do
%{predictions: predictions, vehicle_ids: vehicle_ids} ->
if prediction.trip_id == trip_id do
%{
predictions: Map.put(predictions, prediction.id, prediction),
vehicle_ids: [prediction.vehicle_id | vehicle_ids]
}
else
%{predictions: predictions, vehicle_ids: vehicle_ids}
end
end
Logger.info("#{__MODULE__} push duration=#{time_micros / 1000}")

%{
JsonApi.Object.to_full_map([])
| predictions: predictions,
trips: Map.take(route_data.trips, [trip_id]),
vehicles: Map.take(route_data.vehicles, vehicle_ids)
}
{:noreply, socket}
end
end
41 changes: 41 additions & 0 deletions test/mobile_app_backend/predictions/pub_sub_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,47 @@ defmodule MobileAppBackend.Predictions.PubSubTests do
end
end

describe "subscribe_for_trip/1" do
test "returns initial data for the given trip" do
prediction_1 =
build(:prediction, id: "p_1", stop_id: "12345", trip_id: "trip_1", vehicle_id: "v_1")

prediction_2 =
build(:prediction, id: "p_2", stop_id: "67890", trip_id: "trip_1", vehicle_id: "v_1")

trip_1 = build(:trip, id: "trip_1")
vehicle_1 = build(:vehicle, id: "v_1")

full_map =
JsonApi.Object.to_full_map([
prediction_1,
prediction_2,
trip_1,
vehicle_1
])

expect(PredictionsStoreMock, :fetch_with_associations, fn [trip_id: "trip_1"] ->
full_map
end)

expect(StreamSubscriberMock, :subscribe_for_trip, fn _ -> :ok end)

assert %{
trip_id: trip_1.id,
predictions: %{"p_1" => prediction_1, "p_2" => prediction_2},
trips: %{"trip_1" => trip_1},
vehicles: %{"v_1" => vehicle_1}
} == PubSub.subscribe_for_trip("trip_1")
end

@tag :capture_log
test "returns an error when subscriber fails" do
expect(StreamSubscriberMock, :subscribe_for_trip, fn _ -> :error end)

assert :error == PubSub.subscribe_for_trip("trip_1")
end
end

describe "handle_info" do
setup do
_dispatched_table = :ets.new(:test_last_dispatched, [:set, :named_table])
Expand Down
34 changes: 34 additions & 0 deletions test/mobile_app_backend/predictions/stream_subscriber_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule MobileAppBackend.Predictions.StreamSubscriberTest do
alias MobileAppBackend.Predictions.StreamSubscriber
import Mox
import Test.Support.Helpers
import MobileAppBackend.Factory

describe "subscribe_for_stops/1" do
setup do
Expand Down Expand Up @@ -40,4 +41,37 @@ defmodule MobileAppBackend.Predictions.StreamSubscriberTest do
StreamSubscriber.subscribe_for_stops([1, 2])
end
end

describe "subscribe_for_trip/1" do
setup do
verify_on_exit!()

reassign_env(
:mobile_app_backend,
MobileAppBackend.GlobalDataCache.Module,
GlobalDataCacheMock
)

reassign_env(:mobile_app_backend, MBTAV3API.Stream.StaticInstance, StaticInstanceMock)
reassign_env(:mobile_app_backend, MBTAV3API.Repository, RepositoryMock)
end

test "starts streams for to the routes served at the given stops and vehicles" do
trip = build(:trip, id: "trip", route_id: "66")

RepositoryMock
|> expect(:trips, fn _, _ -> {:ok, %{data: [trip]}} end)

StaticInstanceMock
|> expect(:ensure_stream_started, fn "predictions:route:to_store:66",
include_current_data: false ->
{:ok, :no_data}
end)
|> expect(:ensure_stream_started, fn "vehicles:to_store", include_current_data: false ->
{:ok, :no_data}
end)

StreamSubscriber.subscribe_for_trip("trip")
end
end
end
Loading

0 comments on commit 4620ff2

Please sign in to comment.