Skip to content

Commit

Permalink
feat: track last train of the day (#2191)
Browse files Browse the repository at this point in the history
Tracks trips marked as `last_trip: true` and recent departures from all
route-direction-stop tuples.

Fetches TripUpdates and VehiclePositions data from concentrate S3 bucket
to populate two caches (one for last trips and one for a list of recent
departures from a stop on a given route going a direction).

Both caches have TTLs of one hour so old trips and departures will fall
off over time. Recent departures are a special case because they are
tracked as a list of recent departures. Departures (tuples of trip_id
and departure time in unix epoch) are upserted into the cache key for a
given RDS. During update departures more than one hour in the past are
removed from the list for that RDS.

Caches are reset every day after 3:30am EST.

> [!NOTE]
> The realtime_signs code uses `If-Modified-Since` and `Last-Modified`
> headers presumably in an attempt to reduce unnecessary API requests. I
> added support for that behavior into this flow as well but because the
> requested data changes so frequently it almost _always_ had to fetch
> all of the data. I've removed that functionality to keep the code
> simpler given that there is almost no benefit to tracking those time
> stamps / previous responses.

**Asana Task:** [Start tracking LTOTD state for use by screens][task]

[task]: https://app.asana.com/0/1185117109217413/1207574972639078/f
  • Loading branch information
sloanelybutsurely authored Sep 19, 2024
1 parent f8ff68a commit 6f99840
Show file tree
Hide file tree
Showing 21 changed files with 1,316 additions and 1 deletion.
6 changes: 6 additions & 0 deletions .envrc.template
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,9 @@ export API_V3_URL="https://api-v3.mbta.com"
# export AWS_SECRET_ACCESS_KEY=

export SECRET_KEY_BASE="local_secret_key_base_at_least_64_bytes_________________________________"

# # Should point to the Enhanced JSON feeds. Default values are provided, only
# # set these if you need something different.
# # https://github.com/mbta/gtfs-documentation/blob/master/reference/gtfs-realtime.md#enhanced-json-feeds
# export TRIP_UPDATES_URL=
# export VEHICLE_POSITIONS_URL=
4 changes: 4 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,10 @@ config :screens, Screens.ScreenApiResponseCache,
gc_interval: :timer.hours(1),
allocated_memory: 250_000_000

config :screens, Screens.LastTrip,
trip_updates_adapter: Screens.LastTrip.TripUpdates.GTFS,
vehicle_positions_adapter: Screens.LastTrip.VehiclePositions.GTFS

# Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above.
import_config "#{Mix.env()}.exs"
12 changes: 11 additions & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,17 @@ import Config
unless config_env() == :test do
config :screens,
api_v3_url: System.get_env("API_V3_URL", "https://api-v3.mbta.com/"),
api_v3_key: System.get_env("API_V3_KEY")
api_v3_key: System.get_env("API_V3_KEY"),
trip_updates_url:
System.get_env(
"TRIP_UPDATES_URL",
"https://cdn.mbta.com/realtime/TripUpdates_enhanced.json"
),
vehicle_positions_url:
System.get_env(
"VEHICLE_POSITIONS_URL",
"https://cdn.mbta.com/realtime/VehiclePositions_enhanced.json"
)
end

if config_env() == :prod do
Expand Down
4 changes: 4 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,7 @@ config :screens, Screens.V2.ScreenData,
parameters_module: Screens.V2.ScreenData.MockParameters

config :screens, Screens.V2.CandidateGenerator.DupNew, stop_module: Screens.Stops.MockStop

config :screens, Screens.LastTrip,
trip_updates_adapter: Screens.LastTrip.TripUpdates.Noop,
vehicle_positions_adapter: Screens.LastTrip.VehiclePositions.Noop
1 change: 1 addition & 0 deletions lib/screens/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ defmodule Screens.Application do
# Task supervisor for parallel running of candidate generator variants
{Task.Supervisor, name: Screens.V2.ScreenData.ParallelRunSupervisor},
{Screens.ScreenApiResponseCache, []},
Screens.LastTrip,
{Phoenix.PubSub, name: ScreensWeb.PubSub},
ScreensWeb.Endpoint
]
Expand Down
41 changes: 41 additions & 0 deletions lib/screens/last_trip.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
defmodule Screens.LastTrip do
@moduledoc """
Supervisor and public interface for fetching information about the last trips
of the day (AKA Last Train of the Day, LTOTD).
"""
alias Screens.LastTrip.Cache
alias Screens.LastTrip.Poller
use Supervisor

@spec start_link(any()) :: Supervisor.on_start()
def start_link(_) do
Supervisor.start_link(__MODULE__, [], name: __MODULE__)
end

@impl true
def init(_) do
children = [
Cache.LastTrips,
Cache.RecentDepartures,
Poller
]

Supervisor.init(children, strategy: :one_for_one)
end

@spec last_trip?(trip_id :: String.t()) :: boolean()
defdelegate last_trip?(trip_id), to: Cache

@spec service_ended_for_rds?(Cache.rds()) :: boolean()
def service_ended_for_rds?({_r, _d, _s} = rds, now_fn \\ &DateTime.utc_now/0) do
now_unix = now_fn.() |> DateTime.to_unix()

rds
|> Cache.get_recent_departures()
|> Enum.any?(fn {trip_id, departure_time_unix} ->
seconds_since_departure = now_unix - departure_time_unix

seconds_since_departure > 3 and last_trip?(trip_id)
end)
end
end
84 changes: 84 additions & 0 deletions lib/screens/last_trip/cache.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
defmodule Screens.LastTrip.Cache do
@moduledoc """
Public interface into the caches that back last trip (LTOTD) tracking
"""
alias Screens.LastTrip.Cache.LastTrips
alias Screens.LastTrip.Cache.RecentDepartures

@type rds :: {route_id :: String.t(), direction_id :: 0 | 1, stop_id :: String.t()}
@type departing_trip :: {trip_id :: String.t(), departure_time_unix :: integer()}

@last_trips_ttl :timer.hours(1)
@recent_departures_ttl :timer.hours(1)

@spec update_last_trips(
last_trip_entries :: [{trip_id :: LastTrips.key(), last_trip? :: LastTrips.value()}]
) ::
:ok
def update_last_trips(last_trips) do
LastTrips.put_all(last_trips, ttl: @last_trips_ttl)

:ok
end

@spec update_recent_departures(recent_departures :: %{rds() => [departing_trip()]}) :: :ok
def update_recent_departures(recent_departures, now_fn \\ &DateTime.utc_now/0) do
expiration = now_fn.() |> DateTime.add(-1, :hour) |> DateTime.to_unix()

for {rds, departures} <- recent_departures do
RecentDepartures.update(
rds,
departures,
&merge_and_expire_departures(&1, departures, expiration),
ttl: @recent_departures_ttl
)
end

:ok
end

def merge_and_expire_departures(existing_departures, departures, expiration) do
existing_departures =
existing_departures
|> only_latest_departures()
|> Map.new()

departures =
departures
|> only_latest_departures()
|> Map.new()

existing_departures
|> Map.merge(departures)
|> Enum.reject(fn {_, departure_time} -> departure_time <= expiration end)
end

@spec last_trip?(trip_id :: String.t()) :: boolean()
def last_trip?(trip_id) do
LastTrips.get(trip_id) == true
end

@spec get_recent_departures(rds()) :: [departing_trip()]
def get_recent_departures({_r, _d, _s} = rds) do
rds
|> RecentDepartures.get()
|> List.wrap()
end

@spec reset() :: :ok
def reset do
LastTrips.delete_all()
RecentDepartures.delete_all()

:ok
end

defp only_latest_departures(departures) do
# Only take the latest departure time for each trip
departures
|> Enum.group_by(&elem(&1, 0), &elem(&1, 1))
|> Enum.map(fn {trip_id, departure_times} ->
{trip_id, Enum.max(departure_times)}
end)
end
end
11 changes: 11 additions & 0 deletions lib/screens/last_trip/cache/last_trips.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
defmodule Screens.LastTrip.Cache.LastTrips do
@moduledoc """
Cache of Trip IDs (`t:key/0`) where `last_trip` was `true` (`t:value/0`).
"""
use Nebulex.Cache,
otp_app: :screens,
adapter: Nebulex.Adapters.Local

@type key :: trip_id :: String.t()
@type value :: last_trip? :: true
end
13 changes: 13 additions & 0 deletions lib/screens/last_trip/cache/recent_departures.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
defmodule Screens.LastTrip.Cache.RecentDepartures do
@moduledoc """
Cache of recent departures keyed by route-direction-stop tuple (`t:key/0`).
Values are trip id departure time tuples (`t:value/0`).
"""
use Nebulex.Cache,
otp_app: :screens,
adapter: Nebulex.Adapters.Local

@type key :: Screens.LastTrip.Cache.rds()
@type value :: [Screens.LastTrip.Cache.departing_trip()]
end
53 changes: 53 additions & 0 deletions lib/screens/last_trip/parser.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
defmodule Screens.LastTrip.Parser do
@moduledoc """
Functions to parse relevant data from TripUpdate and VehiclePositions maps.
Used by `Screens.LastTrip.Poller`.
"""
alias Screens.LastTrip.Cache.RecentDepartures

@spec get_running_trips(trip_updates_enhanced_json :: map()) :: [trip_update_json :: map()]
def get_running_trips(trip_updates_enhanced_json) do
trip_updates_enhanced_json["entity"]
|> Stream.map(& &1["trip_update"])
|> Enum.reject(&(&1["trip"]["schedule_relationship"] == "CANCELED"))
end

@spec get_last_trips(trip_updates_enhanced_json :: map()) :: [trip_id :: String.t()]
def get_last_trips(trip_updates_enhanced_json) do
trip_updates_enhanced_json
|> get_running_trips()
|> Enum.filter(&(&1["trip"]["last_trip"] == true))
|> Enum.map(& &1["trip"]["trip_id"])
end

@spec get_recent_departures(
trip_updates_enhanced_json :: map(),
vehicle_positions_enhanced_json :: map()
) :: %{RecentDepartures.key() => RecentDepartures.value()}
def get_recent_departures(trip_updates_enhanced_json, vehicle_positions_enhanced_json) do
vehicle_positions_by_id =
Map.new(vehicle_positions_enhanced_json["entity"], &{&1["id"], &1["vehicle"]})

running_trips = get_running_trips(trip_updates_enhanced_json)

for %{"vehicle" => %{"id" => vehicle_id}} = trip <- running_trips,
stop_time_update <- trip["stop_time_update"] do
vehicle_position = vehicle_positions_by_id[vehicle_id]

departure_time = stop_time_update["departure"]["time"]

if vehicle_position["stop_id"] == stop_time_update["stop_id"] and
vehicle_position["current_status"] == "STOPPED_AT" and not is_nil(departure_time) do
rds =
{trip["trip"]["route_id"], trip["trip"]["direction_id"], stop_time_update["stop_id"]}

trip_id = trip["trip"]["trip_id"]

{rds, {trip_id, departure_time}}
end
end
|> Enum.reject(&is_nil/1)
|> Enum.group_by(&elem(&1, 0), &elem(&1, 1))
end
end
86 changes: 86 additions & 0 deletions lib/screens/last_trip/poller.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
defmodule Screens.LastTrip.Poller do
@moduledoc """
GenServer that polls predictions to calculate the last trip of the day
"""
alias Screens.LastTrip.Cache
alias Screens.LastTrip.Parser
alias Screens.LastTrip.TripUpdates
alias Screens.LastTrip.VehiclePositions
use GenServer

defstruct [:next_reset]

@polling_interval :timer.seconds(1)

def start_link(_) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end

@impl true
def init(_) do
state = %__MODULE__{next_reset: next_reset()}

send(self(), :poll)

{:ok, state}
end

@impl true
def handle_info(:poll, %__MODULE__{} = state) do
state =
if DateTime.after?(now(), state.next_reset) do
:ok = Cache.reset()
%{state | next_reset: next_reset()}
else
{:ok,
%{
trip_updates: trip_updates,
vehicle_positions: vehicle_positions
}} = fetch_trip_updates_and_vehicle_positions()

update_last_trips(trip_updates)
update_recent_departures(trip_updates, vehicle_positions)

state
end

Process.send_after(self(), :poll, @polling_interval)

{:noreply, state}
end

defp fetch_trip_updates_and_vehicle_positions do
with {:ok, %{status_code: 200, body: trip_updates}} <- TripUpdates.get(),
{:ok, %{status_code: 200, body: vehicle_positions}} <- VehiclePositions.get() do
{:ok,
%{
trip_updates: trip_updates,
vehicle_positions: vehicle_positions
}}
end
end

defp update_last_trips(trip_updates) do
trip_updates
|> Parser.get_last_trips()
|> Enum.map(&{&1, true})
|> Cache.update_last_trips()
end

defp update_recent_departures(trip_updates, vehicle_positions) do
trip_updates
|> Parser.get_recent_departures(vehicle_positions)
|> Cache.update_recent_departures()
end

defp now(now_fn \\ &DateTime.utc_now/0) do
now_fn.() |> DateTime.shift_zone!("America/New_York")
end

defp next_reset do
now()
|> DateTime.add(1, :day)
|> DateTime.to_date()
|> DateTime.new!(~T[03:30:00], "America/New_York")
end
end
10 changes: 10 additions & 0 deletions lib/screens/last_trip/trip_updates.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
defmodule Screens.LastTrip.TripUpdates do
@moduledoc """
Behaviour and proxying module for fetching trip updates
"""
@adapter Application.compile_env!(:screens, [Screens.LastTrip, :trip_updates_adapter])

@callback get() :: {:ok, map()} | {:error, term()}

defdelegate get, to: @adapter
end
17 changes: 17 additions & 0 deletions lib/screens/last_trip/trip_updates/gtfs.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule Screens.LastTrip.TripUpdates.GTFS do
@moduledoc """
Screens.LastTrip.TripUpdates adapter that fetches trip updates from
GTFS
"""
@behaviour Screens.LastTrip.TripUpdates

@impl true
def get do
trip_updates_url = Application.fetch_env!(:screens, :trip_updates_url)

with {:ok, %{body: body} = response} <- HTTPoison.get(trip_updates_url),
{:ok, decoded} <- Jason.decode(body) do
{:ok, %{response | body: decoded}}
end
end
end
9 changes: 9 additions & 0 deletions lib/screens/last_trip/trip_updates/noop.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
defmodule Screens.LastTrip.TripUpdates.Noop do
@moduledoc "Noop TripUpdates adapter for testing"
@behaviour Screens.LastTrip.TripUpdates

@impl true
def get do
{:ok, %{status_code: 200, body: %{"entity" => []}}}
end
end
10 changes: 10 additions & 0 deletions lib/screens/last_trip/vehicle_positions.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
defmodule Screens.LastTrip.VehiclePositions do
@moduledoc """
Behaviour and proxying module for fetching vehicle positions
"""
@adapter Application.compile_env!(:screens, [Screens.LastTrip, :vehicle_positions_adapter])

@callback get() :: {:ok, map()} | {:error, term()}

defdelegate get, to: @adapter
end
Loading

0 comments on commit 6f99840

Please sign in to comment.