diff --git a/neurow/config/runtime.exs b/neurow/config/runtime.exs index 17d1202..7b8c150 100644 --- a/neurow/config/runtime.exs +++ b/neurow/config/runtime.exs @@ -18,6 +18,7 @@ config :neurow, String.to_integer(System.get_env("INTERNAL_API_JWT_MAX_LIFETIME") || "120") config :neurow, sse_timeout: String.to_integer(System.get_env("SSE_TIMEOUT") || "900000") +config :neurow, sse_keepalive: String.to_integer(System.get_env("SSE_KEEPALIVE") || "600000") config :neurow, ssl_keyfile: System.get_env("SSL_KEYFILE") config :neurow, ssl_certfile: System.get_env("SSL_CERTFILE") diff --git a/neurow/lib/neurow/application.ex b/neurow/lib/neurow/application.ex index d2aa305..9ba76ad 100644 --- a/neurow/lib/neurow/application.ex +++ b/neurow/lib/neurow/application.ex @@ -48,7 +48,9 @@ defmodule Neurow.Application do name: Neurow.PubSub, options: [adapter: Phoenix.PubSub.PG2, pool_size: 10]}, {Plug.Cowboy, scheme: :http, plug: Neurow.InternalApi, options: [port: internal_api_port]}, {Plug.Cowboy, - scheme: sse_http_scheme, plug: Neurow.PublicApi, options: public_api_http_config} + scheme: sse_http_scheme, plug: Neurow.PublicApi, options: public_api_http_config}, + {Plug.Cowboy.Drainer, refs: [Neurow.PublicApi.HTTP], shutdown: 20_000}, + {StopListener, []} ] MetricsPlugExporter.setup() diff --git a/neurow/lib/neurow/configuration.ex b/neurow/lib/neurow/configuration.ex index 889d35f..4bc958c 100644 --- a/neurow/lib/neurow/configuration.ex +++ b/neurow/lib/neurow/configuration.ex @@ -10,11 +10,11 @@ defmodule Neurow.Configuration do end def public_api_audience do - Application.fetch_env!(:neurow, :public_api_authentication)[:audience] + GenServer.call(__MODULE__, {:static_param, :public_api_audience}) end def public_api_verbose_authentication_errors do - Application.fetch_env!(:neurow, :public_api_authentication)[:verbose_authentication_errors] + GenServer.call(__MODULE__, {:static_param, :public_api_verbose_authentication_errors}) end def internal_api_issuer_jwks(issuer_name) do @@ -22,25 +22,27 @@ defmodule Neurow.Configuration do end def internal_api_audience do - Application.fetch_env!(:neurow, :internal_api_authentication)[:audience] + GenServer.call(__MODULE__, {:static_param, :internal_api_audience}) end def internal_api_verbose_authentication_errors do - Application.fetch_env!(:neurow, :internal_api_authentication)[ - :verbose_authentication_errors - ] + GenServer.call(__MODULE__, {:static_param, :internal_api_verbose_authentication_errors}) end def internal_api_jwt_max_lifetime do - Application.fetch_env!(:neurow, :internal_api_jwt_max_lifetime) + GenServer.call(__MODULE__, {:static_param, :internal_api_jwt_max_lifetime}) end def public_api_jwt_max_lifetime do - Application.fetch_env!(:neurow, :public_api_jwt_max_lifetime) + GenServer.call(__MODULE__, {:static_param, :public_api_jwt_max_lifetime}) end def sse_timeout do - Application.fetch_env!(:neurow, :sse_timeout) + GenServer.call(__MODULE__, {:static_param, :sse_timeout}) + end + + def sse_keepalive do + GenServer.call(__MODULE__, {:static_param, :sse_keepalive}) end @impl true @@ -52,7 +54,23 @@ defmodule Neurow.Configuration do }, internal_api: %{ issuer_jwks: build_issuer_jwks(:internal_api_authentication) - } + }, + sse_keepalive: Application.fetch_env!(:neurow, :sse_keepalive), + sse_timeout: Application.fetch_env!(:neurow, :sse_timeout), + internal_api_jwt_max_lifetime: + Application.fetch_env!(:neurow, :internal_api_jwt_max_lifetime), + public_api_jwt_max_lifetime: Application.fetch_env!(:neurow, :public_api_jwt_max_lifetime), + internal_api_verbose_authentication_errors: + Application.fetch_env!(:neurow, :internal_api_authentication)[ + :verbose_authentication_errors + ], + public_api_verbose_authentication_errors: + Application.fetch_env!(:neurow, :public_api_authentication)[ + :verbose_authentication_errors + ], + internal_api_audience: + Application.fetch_env!(:neurow, :internal_api_authentication)[:audience], + public_api_audience: Application.fetch_env!(:neurow, :public_api_authentication)[:audience] }} end @@ -66,6 +84,11 @@ defmodule Neurow.Configuration do {:reply, state[:internal_api][:issuer_jwks][issuer_name], state} end + @impl true + def handle_call({:static_param, key}, _from, state) do + {:reply, state[key], state} + end + defp build_issuer_jwks(api_authentication_scope) do Application.fetch_env!(:neurow, api_authentication_scope)[:issuers] |> Enum.map(fn {issuer_name, shared_secrets} -> diff --git a/neurow/lib/neurow/public_api.ex b/neurow/lib/neurow/public_api.ex index 889f7cd..f251cb7 100644 --- a/neurow/lib/neurow/public_api.ex +++ b/neurow/lib/neurow/public_api.ex @@ -14,8 +14,6 @@ defmodule Neurow.PublicApi do count_error: &Stats.inc_jwt_errors_public/0 ) - plug(Neurow.SseOptionsPlug, sse_timeout: &Neurow.Configuration.sse_timeout/0) - plug(:match) plug(:dispatch) @@ -26,10 +24,16 @@ defmodule Neurow.PublicApi do timeout = case conn.req_headers |> List.keyfind("x-sse-timeout", 0) do - nil -> conn.assigns[:sse_timeout] + nil -> Neurow.Configuration.sse_timeout() {"x-sse-timeout", timeout} -> String.to_integer(timeout) end + keep_alive = + case conn.req_headers |> List.keyfind("x-sse-keepalive", 0) do + nil -> Neurow.Configuration.sse_keepalive() + {"x-sse-keepalive", keepalive} -> String.to_integer(keepalive) + end + conn = conn |> put_resp_header("content-type", "text/event-stream") @@ -38,6 +42,7 @@ defmodule Neurow.PublicApi do |> put_resp_header("access-control-allow-origin", "*") |> put_resp_header("x-sse-server", to_string(node())) |> put_resp_header("x-sse-timeout", to_string(timeout)) + |> put_resp_header("x-sse-keepalive", to_string(keep_alive)) :ok = Phoenix.PubSub.subscribe(Neurow.PubSub, topic) @@ -45,7 +50,8 @@ defmodule Neurow.PublicApi do Logger.debug("Client subscribed to #{topic}") - conn |> loop(timeout) + last_message = :os.system_time(:millisecond) + conn |> loop(timeout, keep_alive, last_message, last_message) Logger.debug("Client disconnected from #{topic}") conn @@ -54,14 +60,37 @@ defmodule Neurow.PublicApi do end end - defp loop(conn, sse_timeout) do + defp loop(conn, sse_timeout, keep_alive, last_message, last_ping) do receive do {:pubsub_message, msg_id, msg} -> {:ok, conn} = chunk(conn, "id: #{msg_id}\ndata: #{msg}\n\n") Stats.inc_msg_published() - loop(conn, sse_timeout) + new_last_message = :os.system_time(:millisecond) + loop(conn, sse_timeout, keep_alive, new_last_message, new_last_message) after - sse_timeout -> :timeout + 1000 -> + now = :os.system_time(:millisecond) + + cond do + # SSE Timeout + now - last_message > sse_timeout -> + Logger.debug("Client disconnected due to inactivity") + :timeout + + # SSE Keep alive, send a ping + now - last_ping > keep_alive -> + chunk(conn, "event: ping\n\n") + loop(conn, sse_timeout, keep_alive, last_message, now) + + # We need to stop + StopListener.close_connections?() -> + chunk(conn, "event: reconnect\n\n") + :close + + # Nothing + true -> + loop(conn, sse_timeout, keep_alive, last_message, last_ping) + end end end diff --git a/neurow/lib/neurow/sse_options_plug.ex b/neurow/lib/neurow/sse_options_plug.ex deleted file mode 100644 index 0772df4..0000000 --- a/neurow/lib/neurow/sse_options_plug.ex +++ /dev/null @@ -1,23 +0,0 @@ -defmodule Neurow.SseOptionsPlug do - require Logger - - import Plug.Conn - - defmodule Options do - defstruct [ - :sse_timeout - ] - - def sse_timeout(options) do - if is_function(options.sse_timeout), - do: options.sse_timeout.(), - else: options.sse_timeout - end - end - - def init(options), do: struct(Options, options) - - def call(conn, options) do - conn |> assign(:sse_timeout, options |> Options.sse_timeout()) - end -end diff --git a/neurow/lib/stop_listener.ex b/neurow/lib/stop_listener.ex new file mode 100644 index 0000000..cff73b3 --- /dev/null +++ b/neurow/lib/stop_listener.ex @@ -0,0 +1,30 @@ +defmodule StopListener do + use GenServer + require Logger + + def start_link(opts) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end + + @impl true + def init(_) do + :ets.new(__MODULE__, [:set, :named_table, read_concurrency: true]) + Process.flag(:trap_exit, true) + {:ok, %{shutdown_in_progress: false}} + end + + def close_connections?() do + try do + :ets.lookup(__MODULE__, :close_connections?) + false + rescue + ArgumentError -> true + end + end + + @impl GenServer + def terminate(_reason, _state) do + Logger.info("Graceful Shutdown occurring") + :ok + end +end diff --git a/neurow/test/neurow/public_api_integration_test.exs b/neurow/test/neurow/public_api_integration_test.exs index f4a592c..43268f5 100644 --- a/neurow/test/neurow/public_api_integration_test.exs +++ b/neurow/test/neurow/public_api_integration_test.exs @@ -26,6 +26,9 @@ defmodule Neurow.PublicApiIntegrationTest do {:http, {_, :stream_start, headers}} -> {:start, headers} + {:http, {_, :stream_end, _}} -> + {:end} + msg -> raise("Unexpected message: #{inspect(msg)}") after @@ -103,4 +106,57 @@ defmodule Neurow.PublicApiIntegrationTest do assert msg == "id: 43\ndata: hello2\n\n" :ok = :httpc.cancel_request(request_id) end + + test "GET /v1/subscribe 200 sse keepalive" do + url = "http://localhost:4000/v1/subscribe" + + headers = [ + {["Authorization"], "Bearer #{compute_jwt_token_in_req_header_public_api("foo57")}"}, + {["x-sse-keepalive"], "100"} + ] + + {:ok, request_id} = + :httpc.request(:get, {url, headers}, [], [{:sync, false}, {:stream, :self}]) + + {:start, headers} = next_message() + assert_headers(headers, {"content-type", "text/event-stream"}) + assert_headers(headers, {"cache-control", "no-cache"}) + assert_headers(headers, {"connection", "close"}) + + publish("test_issuer1-foo57", "42", "hello") + Process.sleep(1100) + + {:stream, msg} = next_message() + assert msg == "id: 42\ndata: hello\n\n" + {:stream, msg} = next_message() + assert msg == "event: ping\n\n" + Process.sleep(1100) + {:stream, msg} = next_message() + assert msg == "event: ping\n\n" + :ok = :httpc.cancel_request(request_id) + end + + test "GET /v1/subscribe 200 sse timeout" do + url = "http://localhost:4000/v1/subscribe" + + headers = [ + {["Authorization"], "Bearer #{compute_jwt_token_in_req_header_public_api("foo57")}"}, + {["x-sse-timeout"], "100"} + ] + + {:ok, request_id} = + :httpc.request(:get, {url, headers}, [], [{:sync, false}, {:stream, :self}]) + + {:start, headers} = next_message() + assert_headers(headers, {"content-type", "text/event-stream"}) + assert_headers(headers, {"cache-control", "no-cache"}) + assert_headers(headers, {"connection", "close"}) + + Process.sleep(1100) + + {:stream, msg} = next_message() + assert msg == "" + {:end} = next_message() + :ok = :httpc.cancel_request(request_id) + end end