Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: in-memory Alert storage #2121

Merged
merged 1 commit into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions lib/screens/alerts/cache.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
defmodule Screens.Alerts.Cache do
@moduledoc """
GenStage Consumer of Alert server sent event data
"""
use GenStage

require Logger

alias Screens.Alerts
alias ServerSentEventStage.Event

@table __MODULE__

defstruct [:table]

def start_link(opts) do
{name, init_arg} = Keyword.pop(opts, :name, __MODULE__)
GenStage.start_link(__MODULE__, init_arg, name: name)
end

@impl true
def init(init_arg) do
subscribe_to = Keyword.get(init_arg, :subscribe_to, [Screens.Streams.Alerts.Producer])

table = @table

^table =
:ets.new(table, [:named_table, :set, read_concurrency: true, write_concurrency: false])

state = %__MODULE__{table: table}

{:consumer, state, subscribe_to: subscribe_to}
end

@impl true
def handle_events(events, _from, state) do
events
|> Enum.map(&decode_data/1)
|> Enum.each(&handle_event(&1, state))

{:noreply, [], state}
end

def all(table \\ @table) do
table
|> :ets.tab2list()
|> Enum.map(&elem(&1, 1))
end

defp handle_event(%Event{event: "reset", data: data}, state) do
alerts =
Enum.map(data, fn data ->
alert = Alerts.Parser.parse_alert(data)
{alert.id, alert}
end)

true = :ets.delete_all_objects(state.table)
true = :ets.insert(state.table, alerts)

:ok
end

defp handle_event(%Event{event: event, data: alert}, state) when event in ~w[add update] do
alert = Alerts.Parser.parse_alert(alert)

true = :ets.insert(state.table, {alert.id, alert})

:ok
end

defp handle_event(%Event{event: "remove", data: %{"id" => id}}, state) do
true = :ets.delete(state.table, id)

:ok
end

defp decode_data(%Event{data: encoded} = event) do
decoded = Jason.decode!(encoded)

%{event | data: decoded}
end
end
3 changes: 2 additions & 1 deletion lib/screens/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ defmodule Screens.Application do
{Screens.ScreensByAlert.SelfRefreshRunner, name: Screens.ScreensByAlert.SelfRefreshRunner},
Screens.OlCrowding.DynamicSupervisor,
{Screens.OlCrowding.Agent, %{}},
{Screens.ScreenApiResponseCache, []}
{Screens.ScreenApiResponseCache, []},
Screens.Streams.Alerts
]

# See https://hexdocs.pm/elixir/Supervisor.html
Expand Down
46 changes: 46 additions & 0 deletions lib/screens/streams/alerts.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
defmodule Screens.Streams.Alerts do
@moduledoc """
Supervisor for streamed producer and consumer(s) of Alerts data from the
V3 API
"""
use Supervisor

@dialyzer {:nowarn_function, children: 1}
@env Mix.env()

def start_link(opts) do
{name, init_arg} = Keyword.pop(opts, :name, __MODULE__)
Supervisor.start_link(__MODULE__, init_arg, name: name)
end

@impl true
def init(_init_arg) do
children()
|> Supervisor.init(strategy: :one_for_all)
end

defp children(env \\ @env)
defp children(:test), do: []

defp children(_env) do
api_url = Application.get_env(:screens, :default_api_v3_url)
api_key = Application.get_env(:screens, :api_v3_key)

url =
api_url
|> URI.merge("/alerts")
|> URI.to_string()

producer = {
ServerSentEventStage,
name: Screens.Streams.Alerts.Producer, url: url, headers: [{"x-api-key", api_key}]
}

consumer = {
Screens.Alerts.Cache,
name: Screens.Alerts.Cache, subscribe_to: [Screens.Streams.Alerts.Producer]
}

[producer, consumer]
end
end
Loading