Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: common PubSub functions + macro #244

Merged
merged 5 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 16 additions & 100 deletions lib/mobile_app_backend/alerts/pub_sub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,17 @@ defmodule MobileAppBackend.Alerts.PubSub do
1. Regularly scheduled interval - configured by `:alerts_broadcast_interval_ms`
2. When there is a reset event of the underlying alert stream.
"""
use GenServer
use MobileAppBackend.PubSub,
broadcast_interval_ms:
Application.compile_env(:mobile_app_backend, :alerts_broadcast_interval_ms, 500)

alias MBTAV3API.{JsonApi, Store, Stream}
alias MobileAppBackend.Alerts.PubSub

@behaviour PubSub.Behaviour

require Logger

@fetch_registry_key :fetch_registry_key

@typedoc """
tuple {fetch_keys, format_fn} where format_fn transforms the data returned
into the format expected by subscribers.
"""
@type registry_value :: {Store.fetch_keys(), function()}

@type state :: %{last_dispatched_table_name: atom()}

@spec start_link(Keyword.t()) :: GenServer.on_start()
Expand Down Expand Up @@ -66,7 +61,6 @@ defmodule MobileAppBackend.Alerts.PubSub do
@impl GenServer
def init(opts \\ []) do
Stream.StaticInstance.subscribe("alerts:to_store")

broadcast_timer(50)

create_table_fn =
Expand All @@ -78,102 +72,24 @@ defmodule MobileAppBackend.Alerts.PubSub do
create_table_fn.()
end

@impl true
# Any time there is a reset_event, broadcast so that subscribers are immediately
# notified of the changes. This way, when the stream first starts,
# consumers don't have to wait `:alerts_broadcast_interval_ms` to receive their first message.
def handle_info(:reset_event, state) do
send(self(), :broadcast)
{:noreply, state, :hibernate}
end

def handle_info(:timed_broadcast, state) do
send(self(), :broadcast)
broadcast_timer()
{:noreply, state, :hibernate}
end

@impl GenServer
def handle_info(:broadcast, %{last_dispatched_table_name: last_dispatched} = state) do
Registry.dispatch(MobileAppBackend.Alerts.Registry, @fetch_registry_key, fn entries ->
Enum.group_by(
entries,
fn {_, {fetch_keys, format_fn}} -> {fetch_keys, format_fn} end,
fn {pid, _} -> pid end
)
|> Enum.each(fn {registry_value, pids} ->
broadcast_new_alerts(registry_value, pids, last_dispatched)
entries
|> MobileAppBackend.PubSub.group_pids_by_target_data()
|> Enum.each(fn {{fetch_keys, format_fn} = registry_value, pids} ->
fetch_keys
|> Store.Alerts.fetch()
|> format_fn.()
|> MobileAppBackend.PubSub.broadcast_latest_data(
:new_alerts,
registry_value,
pids,
last_dispatched
)
end)
end)

{:noreply, state, :hibernate}
end

defp broadcast_new_alerts(
{fetch_keys, format_fn} = registry_value,
pids,
last_dispatched_table_name
) do
latest_data =
fetch_keys
|> Store.Alerts.fetch()
|> format_fn.()

last_dispatched_entry = :ets.lookup(last_dispatched_table_name, registry_value)

if !already_broadcast(last_dispatched_entry, latest_data) do
broadcast(pids, latest_data, registry_value, last_dispatched_table_name)
end
end

defp broadcast(
pids,
data,
{fetch_keys, _format_fn} = registry_value,
last_dispatched_table_name
) do
Logger.info("#{__MODULE__} broadcasting to pids len=#{length(pids)}")

{time_micros, _result} =
:timer.tc(__MODULE__, :broadcast_to_pids, [
pids,
data
])

Logger.info(
"#{__MODULE__} broadcast_to_pids fetch_keys=#{inspect(fetch_keys)} duration=#{time_micros / 1000}"
)

:ets.insert(last_dispatched_table_name, {registry_value, data})
end

defp already_broadcast([], _latest_data) do
# Nothing has been broadcast yet
false
end

defp already_broadcast([{_registry_key, old_data}], latest_data) do
old_data == latest_data
end

def broadcast_to_pids(pids, data) do
Enum.each(
pids,
&send(
&1,
{:new_alerts, data}
)
)
end

defp broadcast_timer do
interval =
Application.get_env(:mobile_app_backend, :alerts_broadcast_interval_ms, 500)

broadcast_timer(interval)
end

defp broadcast_timer(interval) do
Process.send_after(self(), :timed_broadcast, interval)
end
end
111 changes: 18 additions & 93 deletions lib/mobile_app_backend/predictions/pub_sub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,20 @@ defmodule MobileAppBackend.Predictions.PubSub do

Based on https://github.com/mbta/dotcom/blob/main/lib/predictions/pub_sub.ex
"""
use GenServer
use MobileAppBackend.PubSub,
broadcast_interval_ms:
Application.compile_env(:mobile_app_backend, :predictions_broadcast_interval_ms, 10_000)

alias MBTAV3API.{Prediction, Stop, Store, Stream, Trip, Vehicle}
alias MobileAppBackend.GlobalDataCache
alias MobileAppBackend.Predictions.PubSub

@behaviour PubSub.Behaviour

require Logger

@behaviour PubSub.Behaviour

@fetch_registry_key :fetch_registry_key

@typedoc """
tuple {fetch_keys, format_fn} where format_fn transforms the data returned
from fetching predictions from the store into the format expected by subscribers.
"""
@type registry_value :: {Store.fetch_keys(), function()}
@type broadcast_message ::
{:new_predictions,
%{
Expand Down Expand Up @@ -260,97 +258,24 @@ defmodule MobileAppBackend.Predictions.PubSub do
create_table_fn.()
end

@impl true
# Any time there is a reset_event, broadcast so that subscribers are immediately
# notified of the changes. This way, when a prediction stream first starts,
# consumers don't have to wait `:predictions_broadcast_interval_ms` to receive their first message.
def handle_info(:reset_event, state) do
send(self(), :broadcast)
{:noreply, state, :hibernate}
end

def handle_info(:timed_broadcast, state) do
send(self(), :broadcast)
broadcast_timer()
{:noreply, state, :hibernate}
end

@impl GenServer
def handle_info(:broadcast, %{last_dispatched_table_name: last_dispatched} = state) do
Registry.dispatch(MobileAppBackend.Predictions.Registry, @fetch_registry_key, fn entries ->
Enum.group_by(
entries,
fn {_, {fetch_keys, format_fn}} -> {fetch_keys, format_fn} end,
fn {pid, {_, _}} -> pid end
)
|> Enum.each(fn {registry_value, pids} ->
broadcast_new_predictions(registry_value, pids, last_dispatched)
entries
|> MobileAppBackend.PubSub.group_pids_by_target_data()
|> Enum.each(fn {{fetch_keys, format_fn} = registry_value, pids} ->
fetch_keys
|> Store.Predictions.fetch_with_associations()
|> format_fn.()
|> MobileAppBackend.PubSub.broadcast_latest_data(
:new_predictions,
registry_value,
pids,
last_dispatched
)
end)
end)

{:noreply, state, :hibernate}
end

defp broadcast_new_predictions(
{fetch_keys, format_fn} = registry_value,
pids,
last_dispatched_table_name
) do
new_predictions =
fetch_keys
|> Store.Predictions.fetch_with_associations()
|> format_fn.()

last_dispatched_entry = :ets.lookup(last_dispatched_table_name, registry_value)

if !predictions_already_broadcast(last_dispatched_entry, new_predictions) do
broadcast_predictions(pids, new_predictions, registry_value, last_dispatched_table_name)
end
end

defp broadcast_predictions(pids, predictions, registry_value, last_dispatched_table_name) do
Logger.info("#{__MODULE__} broadcasting to pids len=#{length(pids)}")

{time_micros, _result} =
:timer.tc(__MODULE__, :broadcast_to_pids, [
pids,
predictions
])

Logger.info(
"#{__MODULE__} broadcast_to_pids fetch_keys=#{inspect(elem(registry_value, 0))}duration=#{time_micros / 1000}"
)

:ets.insert(last_dispatched_table_name, {registry_value, predictions})
end

defp predictions_already_broadcast([], _new_preidctions) do
# Nothing has been broadcast yet
false
end

defp predictions_already_broadcast([{_registry_key, last_predictions}], new_predictions) do
last_predictions == new_predictions
end

def broadcast_to_pids(pids, predictions) do
Enum.each(
pids,
&send(
&1,
{:new_predictions, predictions}
)
)
end

defp broadcast_timer do
interval =
Application.get_env(:mobile_app_backend, :predictions_broadcast_interval_ms, 10_000)

broadcast_timer(interval)
end

defp broadcast_timer(interval) do
Process.send_after(self(), :timed_broadcast, interval)
end
end
Loading
Loading