Skip to content

Commit

Permalink
fix: restart ServerSentEventStage if Stream.Consumer crashes (#196)
Browse files Browse the repository at this point in the history
  • Loading branch information
boringcactus authored Sep 18, 2024
1 parent c0775bd commit a01c475
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 1 deletion.
2 changes: 1 addition & 1 deletion lib/mbta_v3_api/stream/instance.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ defmodule MBTAV3API.Stream.Instance do
consumer_spec(Keyword.put(opts, :ref, ref))
]

Supervisor.init(children, strategy: :rest_for_one)
Supervisor.init(children, strategy: :one_for_all)
end

@spec consumer_spec(keyword()) :: {module(), keyword()}
Expand Down
24 changes: 24 additions & 0 deletions test/mbta_v3_api/stream/instance_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,30 @@ defmodule MBTAV3API.Stream.InstanceTest do
assert log =~ "consumer_subscribers=0"
end

test "restarts stream if consumer crashes" do
instance =
start_link_supervised!(
{MBTAV3API.Stream.Instance,
url: "https://example.com", headers: [{"a", "b"}], destination: self(), type: Route}
)

old_sse_stage = SSEStub.get_from_instance(instance)

{_id, consumer, _type, [Stream.Consumer]} =
instance
|> Supervisor.which_children()
|> Enum.find(fn {_id, _child, _type, [module]} -> module == Stream.Consumer end)

Process.exit(consumer, :simulating_crash)

# wait for the supervisor to restart things
Process.sleep(5)

new_sse_stage = SSEStub.get_from_instance(instance)

refute old_sse_stage == new_sse_stage
end

describe "consumer_spec/1" do
test "when not specified, returns default consumer" do
assert {Stream.Consumer, _spec} =
Expand Down

0 comments on commit a01c475

Please sign in to comment.