Skip to content

Commit

Permalink
Replace cast with call on StopListener.shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
achouippe committed Oct 3, 2024
1 parent 6093afd commit 0c2febf
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 15 deletions.
4 changes: 2 additions & 2 deletions neurow/integration_test/sse_livecycle_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ defmodule Neurow.IntegrationTest.SseLifecycleTest do
end

def override_timeout(timeout) do
default_timeout = Application.fetch_env(:neurow, :sse_timeout)
{:ok, default_timeout} = Application.fetch_env(:neurow, :sse_timeout)
TestCluster.update_sse_timeout(timeout)

on_exit(fn ->
Expand All @@ -117,7 +117,7 @@ defmodule Neurow.IntegrationTest.SseLifecycleTest do
end

def override_keepalive(keepalive) do
default_keepalive = Application.fetch_env(:neurow, :sse_keepalive)
{:ok, default_keepalive} = Application.fetch_env(:neurow, :sse_keepalive)
TestCluster.update_sse_keepalive(keepalive)

on_exit(fn ->
Expand Down
14 changes: 7 additions & 7 deletions neurow/lib/neurow/public_api/endpoint.ex
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ defmodule Neurow.PublicApi.Endpoint do
{conn, sent}
end

def loop(conn, sse_timeout_ms, keep_alive_ms, last_message_ts, last_ping_ts, jwt_exp) do
def loop(conn, sse_timeout_ms, keep_alive_ms, last_message_ts, last_ping_ts, jwt_exp_s) do
now_ms = :os.system_time(:millisecond)

cond do
Expand All @@ -205,19 +205,19 @@ defmodule Neurow.PublicApi.Endpoint do
keep_alive_ms,
last_message_ts,
now_ms,
jwt_exp
jwt_exp_s
)

# JWT token expired
jwt_exp * 1000 < now_ms ->
jwt_exp_s * 1000 < now_ms ->
conn |> write_chunk("event: credentials_expired")

# Otherwise, let's wait for a message or the next tick
true ->
# Compute the waiting time before the next tick
next_ping_ms = last_ping_ts + keep_alive_ms - now_ms
timeout_ms = last_message_ts + sse_timeout_ms - now_ms
jwt_exp_ms = jwt_exp * 1000 - now_ms
jwt_exp_ms = jwt_exp_s * 1000 - now_ms

# The Erlang process scheduler does not guarantee the `after` block will be executed with a ms precision,
# So a small tolerance is added, also a minimum of 100ms is set to avoid busy waiting
Expand All @@ -235,7 +235,7 @@ defmodule Neurow.PublicApi.Endpoint do
keep_alive_ms,
new_last_message_ts,
new_last_message_ts,
jwt_exp
jwt_exp_s
)

:shutdown ->
Expand All @@ -244,10 +244,10 @@ defmodule Neurow.PublicApi.Endpoint do

# Consume useless messages to avoid memory overflow
_ ->
conn |> loop(sse_timeout_ms, keep_alive_ms, last_message_ts, last_ping_ts, jwt_exp)
conn |> loop(sse_timeout_ms, keep_alive_ms, last_message_ts, last_ping_ts, jwt_exp_s)
after
next_tick_ms ->
conn |> loop(sse_timeout_ms, keep_alive_ms, last_message_ts, last_ping_ts, jwt_exp)
conn |> loop(sse_timeout_ms, keep_alive_ms, last_message_ts, last_ping_ts, jwt_exp_s)
end
end
end
Expand Down
12 changes: 6 additions & 6 deletions neurow/lib/stop_listener.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ defmodule Neurow.StopListener do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end

def subscribe(value \\ nil) do
case Registry.register(Registry.StopListener, :shutdown_subscribers, value) do
def subscribe() do
case Registry.register(Registry.StopListener, :shutdown_subscribers, nil) do
{:ok, _pid} -> :ok
{:error, cause} -> {:error, cause}
end
end

def shutdown() do
GenServer.cast(__MODULE__, :shutdown)
GenServer.call(__MODULE__, :shutdown)
end

@impl true
Expand All @@ -24,14 +24,14 @@ defmodule Neurow.StopListener do
end

@impl true
def handle_cast(:shutdown, state) do
def handle_call(:shutdown, _from, state) do
Logger.info("Graceful shutdown occurring ...")

Registry.dispatch(Registry.StopListener, :shutdown_subscribers, fn entries ->
Logger.info("Shutting down #{length(entries)} connections")

for {pid, value} <- entries do
send(pid, {:shutdown, value})
for {pid, _value} <- entries do
send(pid, :shutdown)
end
end)

Expand Down

0 comments on commit 0c2febf

Please sign in to comment.