From 0c2febf14119931496bef0e3183b47087bdb55be Mon Sep 17 00:00:00 2001 From: Alexandre Chouippe Date: Thu, 3 Oct 2024 15:06:15 +0200 Subject: [PATCH] Replace cast with call on StopListener.shutdown --- neurow/integration_test/sse_livecycle_test.exs | 4 ++-- neurow/lib/neurow/public_api/endpoint.ex | 14 +++++++------- neurow/lib/stop_listener.ex | 12 ++++++------ 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/neurow/integration_test/sse_livecycle_test.exs b/neurow/integration_test/sse_livecycle_test.exs index df7ea0e..4a04d60 100644 --- a/neurow/integration_test/sse_livecycle_test.exs +++ b/neurow/integration_test/sse_livecycle_test.exs @@ -108,7 +108,7 @@ defmodule Neurow.IntegrationTest.SseLifecycleTest do end def override_timeout(timeout) do - default_timeout = Application.fetch_env(:neurow, :sse_timeout) + {:ok, default_timeout} = Application.fetch_env(:neurow, :sse_timeout) TestCluster.update_sse_timeout(timeout) on_exit(fn -> @@ -117,7 +117,7 @@ defmodule Neurow.IntegrationTest.SseLifecycleTest do end def override_keepalive(keepalive) do - default_keepalive = Application.fetch_env(:neurow, :sse_keepalive) + {:ok, default_keepalive} = Application.fetch_env(:neurow, :sse_keepalive) TestCluster.update_sse_keepalive(keepalive) on_exit(fn -> diff --git a/neurow/lib/neurow/public_api/endpoint.ex b/neurow/lib/neurow/public_api/endpoint.ex index fe1e035..68e5ffe 100644 --- a/neurow/lib/neurow/public_api/endpoint.ex +++ b/neurow/lib/neurow/public_api/endpoint.ex @@ -187,7 +187,7 @@ defmodule Neurow.PublicApi.Endpoint do {conn, sent} end - def loop(conn, sse_timeout_ms, keep_alive_ms, last_message_ts, last_ping_ts, jwt_exp) do + def loop(conn, sse_timeout_ms, keep_alive_ms, last_message_ts, last_ping_ts, jwt_exp_s) do now_ms = :os.system_time(:millisecond) cond do @@ -205,11 +205,11 @@ defmodule Neurow.PublicApi.Endpoint do keep_alive_ms, last_message_ts, now_ms, - jwt_exp + jwt_exp_s ) # JWT token expired - jwt_exp * 1000 < now_ms -> + jwt_exp_s * 1000 < now_ms -> conn |> write_chunk("event: credentials_expired") # Otherwise, let's wait for a message or the next tick @@ -217,7 +217,7 @@ defmodule Neurow.PublicApi.Endpoint do # Compute the waiting time before the next tick next_ping_ms = last_ping_ts + keep_alive_ms - now_ms timeout_ms = last_message_ts + sse_timeout_ms - now_ms - jwt_exp_ms = jwt_exp * 1000 - now_ms + jwt_exp_ms = jwt_exp_s * 1000 - now_ms # The Erlang process scheduler does not guarantee the `after` block will be executed with a ms precision, # So a small tolerance is added, also a minimum of 100ms is set to avoid busy waiting @@ -235,7 +235,7 @@ defmodule Neurow.PublicApi.Endpoint do keep_alive_ms, new_last_message_ts, new_last_message_ts, - jwt_exp + jwt_exp_s ) :shutdown -> @@ -244,10 +244,10 @@ defmodule Neurow.PublicApi.Endpoint do # Consume useless messages to avoid memory overflow _ -> - conn |> loop(sse_timeout_ms, keep_alive_ms, last_message_ts, last_ping_ts, jwt_exp) + conn |> loop(sse_timeout_ms, keep_alive_ms, last_message_ts, last_ping_ts, jwt_exp_s) after next_tick_ms -> - conn |> loop(sse_timeout_ms, keep_alive_ms, last_message_ts, last_ping_ts, jwt_exp) + conn |> loop(sse_timeout_ms, keep_alive_ms, last_message_ts, last_ping_ts, jwt_exp_s) end end end diff --git a/neurow/lib/stop_listener.ex b/neurow/lib/stop_listener.ex index 5989975..ceb4c5b 100644 --- a/neurow/lib/stop_listener.ex +++ b/neurow/lib/stop_listener.ex @@ -6,15 +6,15 @@ defmodule Neurow.StopListener do GenServer.start_link(__MODULE__, opts, name: __MODULE__) end - def subscribe(value \\ nil) do - case Registry.register(Registry.StopListener, :shutdown_subscribers, value) do + def subscribe() do + case Registry.register(Registry.StopListener, :shutdown_subscribers, nil) do {:ok, _pid} -> :ok {:error, cause} -> {:error, cause} end end def shutdown() do - GenServer.cast(__MODULE__, :shutdown) + GenServer.call(__MODULE__, :shutdown) end @impl true @@ -24,14 +24,14 @@ defmodule Neurow.StopListener do end @impl true - def handle_cast(:shutdown, state) do + def handle_call(:shutdown, _from, state) do Logger.info("Graceful shutdown occurring ...") Registry.dispatch(Registry.StopListener, :shutdown_subscribers, fn entries -> Logger.info("Shutting down #{length(entries)} connections") - for {pid, value} <- entries do - send(pid, {:shutdown, value}) + for {pid, _value} <- entries do + send(pid, :shutdown) end end)