Skip to content

Commit

Permalink
refactor: common PubSub functions + macro (#244)
Browse files Browse the repository at this point in the history
* refactor: common PubSub functions + macro

* cleanup: remove unused broadcast_message_name opt

* refactor: broadcast_interval_ms as module attribute
  • Loading branch information
KaylaBrady authored Nov 21, 2024
1 parent 68ee20c commit aa4cebb
Show file tree
Hide file tree
Showing 5 changed files with 230 additions and 292 deletions.
116 changes: 16 additions & 100 deletions lib/mobile_app_backend/alerts/pub_sub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,17 @@ defmodule MobileAppBackend.Alerts.PubSub do
1. Regularly scheduled interval - configured by `:alerts_broadcast_interval_ms`
2. When there is a reset event of the underlying alert stream.
"""
use GenServer
use MobileAppBackend.PubSub,
broadcast_interval_ms:
Application.compile_env(:mobile_app_backend, :alerts_broadcast_interval_ms, 500)

alias MBTAV3API.{JsonApi, Store, Stream}
alias MobileAppBackend.Alerts.PubSub

@behaviour PubSub.Behaviour

require Logger

@fetch_registry_key :fetch_registry_key

@typedoc """
tuple {fetch_keys, format_fn} where format_fn transforms the data returned
into the format expected by subscribers.
"""
@type registry_value :: {Store.fetch_keys(), function()}

@type state :: %{last_dispatched_table_name: atom()}

@spec start_link(Keyword.t()) :: GenServer.on_start()
Expand Down Expand Up @@ -66,7 +61,6 @@ defmodule MobileAppBackend.Alerts.PubSub do
@impl GenServer
def init(opts \\ []) do
Stream.StaticInstance.subscribe("alerts:to_store")

broadcast_timer(50)

create_table_fn =
Expand All @@ -78,102 +72,24 @@ defmodule MobileAppBackend.Alerts.PubSub do
create_table_fn.()
end

@impl true
# Any time there is a reset_event, broadcast so that subscribers are immediately
# notified of the changes. This way, when the stream first starts,
# consumers don't have to wait `:alerts_broadcast_interval_ms` to receive their first message.
def handle_info(:reset_event, state) do
send(self(), :broadcast)
{:noreply, state, :hibernate}
end

def handle_info(:timed_broadcast, state) do
send(self(), :broadcast)
broadcast_timer()
{:noreply, state, :hibernate}
end

@impl GenServer
def handle_info(:broadcast, %{last_dispatched_table_name: last_dispatched} = state) do
Registry.dispatch(MobileAppBackend.Alerts.Registry, @fetch_registry_key, fn entries ->
Enum.group_by(
entries,
fn {_, {fetch_keys, format_fn}} -> {fetch_keys, format_fn} end,
fn {pid, _} -> pid end
)
|> Enum.each(fn {registry_value, pids} ->
broadcast_new_alerts(registry_value, pids, last_dispatched)
entries
|> MobileAppBackend.PubSub.group_pids_by_target_data()
|> Enum.each(fn {{fetch_keys, format_fn} = registry_value, pids} ->
fetch_keys
|> Store.Alerts.fetch()
|> format_fn.()
|> MobileAppBackend.PubSub.broadcast_latest_data(
:new_alerts,
registry_value,
pids,
last_dispatched
)
end)
end)

{:noreply, state, :hibernate}
end

defp broadcast_new_alerts(
{fetch_keys, format_fn} = registry_value,
pids,
last_dispatched_table_name
) do
latest_data =
fetch_keys
|> Store.Alerts.fetch()
|> format_fn.()

last_dispatched_entry = :ets.lookup(last_dispatched_table_name, registry_value)

if !already_broadcast(last_dispatched_entry, latest_data) do
broadcast(pids, latest_data, registry_value, last_dispatched_table_name)
end
end

defp broadcast(
pids,
data,
{fetch_keys, _format_fn} = registry_value,
last_dispatched_table_name
) do
Logger.info("#{__MODULE__} broadcasting to pids len=#{length(pids)}")

{time_micros, _result} =
:timer.tc(__MODULE__, :broadcast_to_pids, [
pids,
data
])

Logger.info(
"#{__MODULE__} broadcast_to_pids fetch_keys=#{inspect(fetch_keys)} duration=#{time_micros / 1000}"
)

:ets.insert(last_dispatched_table_name, {registry_value, data})
end

defp already_broadcast([], _latest_data) do
# Nothing has been broadcast yet
false
end

defp already_broadcast([{_registry_key, old_data}], latest_data) do
old_data == latest_data
end

def broadcast_to_pids(pids, data) do
Enum.each(
pids,
&send(
&1,
{:new_alerts, data}
)
)
end

defp broadcast_timer do
interval =
Application.get_env(:mobile_app_backend, :alerts_broadcast_interval_ms, 500)

broadcast_timer(interval)
end

defp broadcast_timer(interval) do
Process.send_after(self(), :timed_broadcast, interval)
end
end
111 changes: 18 additions & 93 deletions lib/mobile_app_backend/predictions/pub_sub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,20 @@ defmodule MobileAppBackend.Predictions.PubSub do
Based on https://github.com/mbta/dotcom/blob/main/lib/predictions/pub_sub.ex
"""
use GenServer
use MobileAppBackend.PubSub,
broadcast_interval_ms:
Application.compile_env(:mobile_app_backend, :predictions_broadcast_interval_ms, 10_000)

alias MBTAV3API.{Prediction, Stop, Store, Stream, Trip, Vehicle}
alias MobileAppBackend.GlobalDataCache
alias MobileAppBackend.Predictions.PubSub

@behaviour PubSub.Behaviour

require Logger

@behaviour PubSub.Behaviour

@fetch_registry_key :fetch_registry_key

@typedoc """
tuple {fetch_keys, format_fn} where format_fn transforms the data returned
from fetching predictions from the store into the format expected by subscribers.
"""
@type registry_value :: {Store.fetch_keys(), function()}
@type broadcast_message ::
{:new_predictions,
%{
Expand Down Expand Up @@ -260,97 +258,24 @@ defmodule MobileAppBackend.Predictions.PubSub do
create_table_fn.()
end

@impl true
# Any time there is a reset_event, broadcast so that subscribers are immediately
# notified of the changes. This way, when a prediction stream first starts,
# consumers don't have to wait `:predictions_broadcast_interval_ms` to receive their first message.
def handle_info(:reset_event, state) do
send(self(), :broadcast)
{:noreply, state, :hibernate}
end

def handle_info(:timed_broadcast, state) do
send(self(), :broadcast)
broadcast_timer()
{:noreply, state, :hibernate}
end

@impl GenServer
def handle_info(:broadcast, %{last_dispatched_table_name: last_dispatched} = state) do
Registry.dispatch(MobileAppBackend.Predictions.Registry, @fetch_registry_key, fn entries ->
Enum.group_by(
entries,
fn {_, {fetch_keys, format_fn}} -> {fetch_keys, format_fn} end,
fn {pid, {_, _}} -> pid end
)
|> Enum.each(fn {registry_value, pids} ->
broadcast_new_predictions(registry_value, pids, last_dispatched)
entries
|> MobileAppBackend.PubSub.group_pids_by_target_data()
|> Enum.each(fn {{fetch_keys, format_fn} = registry_value, pids} ->
fetch_keys
|> Store.Predictions.fetch_with_associations()
|> format_fn.()
|> MobileAppBackend.PubSub.broadcast_latest_data(
:new_predictions,
registry_value,
pids,
last_dispatched
)
end)
end)

{:noreply, state, :hibernate}
end

defp broadcast_new_predictions(
{fetch_keys, format_fn} = registry_value,
pids,
last_dispatched_table_name
) do
new_predictions =
fetch_keys
|> Store.Predictions.fetch_with_associations()
|> format_fn.()

last_dispatched_entry = :ets.lookup(last_dispatched_table_name, registry_value)

if !predictions_already_broadcast(last_dispatched_entry, new_predictions) do
broadcast_predictions(pids, new_predictions, registry_value, last_dispatched_table_name)
end
end

defp broadcast_predictions(pids, predictions, registry_value, last_dispatched_table_name) do
Logger.info("#{__MODULE__} broadcasting to pids len=#{length(pids)}")

{time_micros, _result} =
:timer.tc(__MODULE__, :broadcast_to_pids, [
pids,
predictions
])

Logger.info(
"#{__MODULE__} broadcast_to_pids fetch_keys=#{inspect(elem(registry_value, 0))}duration=#{time_micros / 1000}"
)

:ets.insert(last_dispatched_table_name, {registry_value, predictions})
end

defp predictions_already_broadcast([], _new_preidctions) do
# Nothing has been broadcast yet
false
end

defp predictions_already_broadcast([{_registry_key, last_predictions}], new_predictions) do
last_predictions == new_predictions
end

def broadcast_to_pids(pids, predictions) do
Enum.each(
pids,
&send(
&1,
{:new_predictions, predictions}
)
)
end

defp broadcast_timer do
interval =
Application.get_env(:mobile_app_backend, :predictions_broadcast_interval_ms, 10_000)

broadcast_timer(interval)
end

defp broadcast_timer(interval) do
Process.send_after(self(), :timed_broadcast, interval)
end
end
Loading

0 comments on commit aa4cebb

Please sign in to comment.