Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
bpaquet committed Jul 21, 2024
1 parent ec9ca36 commit 7cb0992
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 41 deletions.
1 change: 1 addition & 0 deletions neurow/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ config :neurow,
String.to_integer(System.get_env("INTERNAL_API_JWT_MAX_LIFETIME") || "120")

config :neurow, sse_timeout: String.to_integer(System.get_env("SSE_TIMEOUT") || "900000")
config :neurow, sse_keepalive: String.to_integer(System.get_env("SSE_KEEPALIVE") || "600000")

config :neurow, ssl_keyfile: System.get_env("SSL_KEYFILE")
config :neurow, ssl_certfile: System.get_env("SSL_CERTFILE")
Expand Down
4 changes: 3 additions & 1 deletion neurow/lib/neurow/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ defmodule Neurow.Application do
name: Neurow.PubSub, options: [adapter: Phoenix.PubSub.PG2, pool_size: 10]},
{Plug.Cowboy, scheme: :http, plug: Neurow.InternalApi, options: [port: internal_api_port]},
{Plug.Cowboy,
scheme: sse_http_scheme, plug: Neurow.PublicApi, options: public_api_http_config}
scheme: sse_http_scheme, plug: Neurow.PublicApi, options: public_api_http_config},
{Plug.Cowboy.Drainer, refs: [Neurow.PublicApi.HTTP], shutdown: 20_000},
{StopListener, []}
]

MetricsPlugExporter.setup()
Expand Down
43 changes: 33 additions & 10 deletions neurow/lib/neurow/configuration.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,39 @@ defmodule Neurow.Configuration do
end

def public_api_audience do
Application.fetch_env!(:neurow, :public_api_authentication)[:audience]
GenServer.call(__MODULE__, {:static_param, :public_api_audience})
end

def public_api_verbose_authentication_errors do
Application.fetch_env!(:neurow, :public_api_authentication)[:verbose_authentication_errors]
GenServer.call(__MODULE__, {:static_param, :public_api_verbose_authentication_errors})
end

def internal_api_issuer_jwks(issuer_name) do
GenServer.call(__MODULE__, {:internal_api_issuer_jwks, issuer_name})
end

def internal_api_audience do
Application.fetch_env!(:neurow, :internal_api_authentication)[:audience]
GenServer.call(__MODULE__, {:static_param, :internal_api_audience})
end

def internal_api_verbose_authentication_errors do
Application.fetch_env!(:neurow, :internal_api_authentication)[
:verbose_authentication_errors
]
GenServer.call(__MODULE__, {:static_param, :internal_api_verbose_authentication_errors})
end

def internal_api_jwt_max_lifetime do
Application.fetch_env!(:neurow, :internal_api_jwt_max_lifetime)
GenServer.call(__MODULE__, {:static_param, :internal_api_jwt_max_lifetime})
end

def public_api_jwt_max_lifetime do
Application.fetch_env!(:neurow, :public_api_jwt_max_lifetime)
GenServer.call(__MODULE__, {:static_param, :public_api_jwt_max_lifetime})
end

def sse_timeout do
Application.fetch_env!(:neurow, :sse_timeout)
GenServer.call(__MODULE__, {:static_param, :sse_timeout})
end

def sse_keepalive do
GenServer.call(__MODULE__, {:static_param, :sse_keepalive})
end

@impl true
Expand All @@ -52,7 +54,23 @@ defmodule Neurow.Configuration do
},
internal_api: %{
issuer_jwks: build_issuer_jwks(:internal_api_authentication)
}
},
sse_keepalive: Application.fetch_env!(:neurow, :sse_keepalive),
sse_timeout: Application.fetch_env!(:neurow, :sse_timeout),
internal_api_jwt_max_lifetime:
Application.fetch_env!(:neurow, :internal_api_jwt_max_lifetime),
public_api_jwt_max_lifetime: Application.fetch_env!(:neurow, :public_api_jwt_max_lifetime),
internal_api_verbose_authentication_errors:
Application.fetch_env!(:neurow, :internal_api_authentication)[
:verbose_authentication_errors
],
public_api_verbose_authentication_errors:
Application.fetch_env!(:neurow, :public_api_authentication)[
:verbose_authentication_errors
],
internal_api_audience:
Application.fetch_env!(:neurow, :internal_api_authentication)[:audience],
public_api_audience: Application.fetch_env!(:neurow, :public_api_authentication)[:audience]
}}
end

Expand All @@ -66,6 +84,11 @@ defmodule Neurow.Configuration do
{:reply, state[:internal_api][:issuer_jwks][issuer_name], state}
end

@impl true
def handle_call({:static_param, key}, _from, state) do
{:reply, state[key], state}
end

defp build_issuer_jwks(api_authentication_scope) do
Application.fetch_env!(:neurow, api_authentication_scope)[:issuers]
|> Enum.map(fn {issuer_name, shared_secrets} ->
Expand Down
43 changes: 36 additions & 7 deletions neurow/lib/neurow/public_api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ defmodule Neurow.PublicApi do
count_error: &Stats.inc_jwt_errors_public/0
)

plug(Neurow.SseOptionsPlug, sse_timeout: &Neurow.Configuration.sse_timeout/0)

plug(:match)
plug(:dispatch)

Expand All @@ -26,10 +24,16 @@ defmodule Neurow.PublicApi do

timeout =
case conn.req_headers |> List.keyfind("x-sse-timeout", 0) do
nil -> conn.assigns[:sse_timeout]
nil -> Neurow.Configuration.sse_timeout()
{"x-sse-timeout", timeout} -> String.to_integer(timeout)
end

keep_alive =
case conn.req_headers |> List.keyfind("x-sse-keepalive", 0) do
nil -> Neurow.Configuration.sse_keepalive()
{"x-sse-keepalive", keepalive} -> String.to_integer(keepalive)
end

conn =
conn
|> put_resp_header("content-type", "text/event-stream")
Expand All @@ -38,14 +42,16 @@ defmodule Neurow.PublicApi do
|> put_resp_header("access-control-allow-origin", "*")
|> put_resp_header("x-sse-server", to_string(node()))
|> put_resp_header("x-sse-timeout", to_string(timeout))
|> put_resp_header("x-sse-keepalive", to_string(keep_alive))

:ok = Phoenix.PubSub.subscribe(Neurow.PubSub, topic)

conn = send_chunked(conn, 200)

Logger.debug("Client subscribed to #{topic}")

conn |> loop(timeout)
last_message = :os.system_time(:millisecond)
conn |> loop(timeout, keep_alive, last_message, last_message)
Logger.debug("Client disconnected from #{topic}")
conn

Expand All @@ -54,14 +60,37 @@ defmodule Neurow.PublicApi do
end
end

defp loop(conn, sse_timeout) do
defp loop(conn, sse_timeout, keep_alive, last_message, last_ping) do
receive do
{:pubsub_message, msg_id, msg} ->
{:ok, conn} = chunk(conn, "id: #{msg_id}\ndata: #{msg}\n\n")
Stats.inc_msg_published()
loop(conn, sse_timeout)
new_last_message = :os.system_time(:millisecond)
loop(conn, sse_timeout, keep_alive, new_last_message, new_last_message)
after
sse_timeout -> :timeout
1000 ->
now = :os.system_time(:millisecond)

cond do
# SSE Timeout
now - last_message > sse_timeout ->
Logger.debug("Client disconnected due to inactivity")
:timeout

# SSE Keep alive, send a ping
now - last_ping > keep_alive ->
chunk(conn, "event: ping\n\n")
loop(conn, sse_timeout, keep_alive, last_message, now)

# We need to stop
StopListener.close_connections?() ->
chunk(conn, "event: reconnect\n\n")
:close

# Nothing
true ->
loop(conn, sse_timeout, keep_alive, last_message, last_ping)
end
end
end

Expand Down
23 changes: 0 additions & 23 deletions neurow/lib/neurow/sse_options_plug.ex

This file was deleted.

30 changes: 30 additions & 0 deletions neurow/lib/stop_listener.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
defmodule StopListener do
use GenServer
require Logger

def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end

@impl true
def init(_) do
:ets.new(__MODULE__, [:set, :named_table, read_concurrency: true])
Process.flag(:trap_exit, true)
{:ok, %{shutdown_in_progress: false}}
end

def close_connections?() do
try do
:ets.lookup(__MODULE__, :close_connections?)
false
rescue
ArgumentError -> true
end
end

@impl GenServer
def terminate(_reason, _state) do
Logger.info("Graceful Shutdown occurring")
:ok
end
end
56 changes: 56 additions & 0 deletions neurow/test/neurow/public_api_integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ defmodule Neurow.PublicApiIntegrationTest do
{:http, {_, :stream_start, headers}} ->
{:start, headers}

{:http, {_, :stream_end, _}} ->
{:end}

msg ->
raise("Unexpected message: #{inspect(msg)}")
after
Expand Down Expand Up @@ -103,4 +106,57 @@ defmodule Neurow.PublicApiIntegrationTest do
assert msg == "id: 43\ndata: hello2\n\n"
:ok = :httpc.cancel_request(request_id)
end

test "GET /v1/subscribe 200 sse keepalive" do
url = "http://localhost:4000/v1/subscribe"

headers = [
{["Authorization"], "Bearer #{compute_jwt_token_in_req_header_public_api("foo57")}"},
{["x-sse-keepalive"], "100"}
]

{:ok, request_id} =
:httpc.request(:get, {url, headers}, [], [{:sync, false}, {:stream, :self}])

{:start, headers} = next_message()
assert_headers(headers, {"content-type", "text/event-stream"})
assert_headers(headers, {"cache-control", "no-cache"})
assert_headers(headers, {"connection", "close"})

publish("test_issuer1-foo57", "42", "hello")
Process.sleep(1100)

{:stream, msg} = next_message()
assert msg == "id: 42\ndata: hello\n\n"
{:stream, msg} = next_message()
assert msg == "event: ping\n\n"
Process.sleep(1100)
{:stream, msg} = next_message()
assert msg == "event: ping\n\n"
:ok = :httpc.cancel_request(request_id)
end

test "GET /v1/subscribe 200 sse timeout" do
url = "http://localhost:4000/v1/subscribe"

headers = [
{["Authorization"], "Bearer #{compute_jwt_token_in_req_header_public_api("foo57")}"},
{["x-sse-timeout"], "100"}
]

{:ok, request_id} =
:httpc.request(:get, {url, headers}, [], [{:sync, false}, {:stream, :self}])

{:start, headers} = next_message()
assert_headers(headers, {"content-type", "text/event-stream"})
assert_headers(headers, {"cache-control", "no-cache"})
assert_headers(headers, {"connection", "close"})

Process.sleep(1100)

{:stream, msg} = next_message()
assert msg == ""
{:end} = next_message()
:ok = :httpc.cancel_request(request_id)
end
end

0 comments on commit 7cb0992

Please sign in to comment.