diff --git a/CHANGELOG.md b/CHANGELOG.md index 065119c..47388aa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## 1.1.0 - 2024-06-18 + +- Use new `Extreme.ListenerWithBackPressure` + ## 1.0.1 - 2024-01-31 - Bump all dependencies including bumping `:extreme` to v1.0.5 which fixes a diff --git a/config/test.exs b/config/test.exs index d1bd4be..fd8732b 100644 --- a/config/test.exs +++ b/config/test.exs @@ -1,5 +1,12 @@ import Config +config :logger, :console, + format: "$time $metadata[$level] $message\n", + level: :debug, + metadata: [:pid, :module, :function] + +config :ex_unit, capture_log: true + config :kelvin, ExtremeClient, db_type: :node, host: System.get_env("EVENTSTORE_HOST") || "localhost", diff --git a/lib/kelvin/in_order_subscription.ex b/lib/kelvin/in_order_subscription.ex index d90c6d3..e961971 100644 --- a/lib/kelvin/in_order_subscription.ex +++ b/lib/kelvin/in_order_subscription.ex @@ -35,7 +35,7 @@ defmodule Kelvin.InOrderSubscription do defstruct [ :config, - :subscription, + :extreme_listener, :self, :max_buffer_size, demand: 0, @@ -53,13 +53,36 @@ defmodule Kelvin.InOrderSubscription do Keyword.get( opts, :catch_up_chunk_size, - Application.get_env(:kelvin, :catch_up_chunk_size, 256) + Application.get_env(:kelvin, :catch_up_chunk_size, 128) + ) + + connection = Keyword.fetch!(opts, :connection) + stream_name = Keyword.fetch!(opts, :stream_name) + + listener_name = + opts + |> Keyword.get(:name, __MODULE__) + |> Module.concat(ExtremeListener) + + {:ok, extreme_listener} = + Kelvin.Listener.start_link(connection, stream_name, + read_per_page: max_buffer_size, + auto_subscribe: false, + ack_timeout: :infinity, + name: listener_name, + producer: self(), + get_stream_position_fun: fn -> + opts + |> Keyword.fetch!(:restore_stream_position!) + |> _do_function() + end ) state = %__MODULE__{ + extreme_listener: extreme_listener, config: Map.new(opts), self: Keyword.get(opts, :name, self()), - max_buffer_size: max_buffer_size + max_buffer_size: max_buffer_size * 2 } Process.send_after( @@ -92,27 +115,11 @@ defmodule Kelvin.InOrderSubscription do end def handle_info(:subscribe, state) do - if state.subscription do - # coveralls-ignore-start - Logger.warn("#{inspect(__MODULE__)} is already subscribed.") - # coveralls-ignore-stop - else - case _subscribe(state) do - {:ok, sub} -> - Process.link(sub) - {:noreply, [], put_in(state.subscription, sub)} - - # coveralls-ignore-start - {:error, reason} -> - {:stop, reason, state} + Kelvin.Listener.subscribe(state.extreme_listener) - # coveralls-ignore-stop - end - end + {:noreply, [], state} end - def handle_info(_info, state), do: {:noreply, [], state} - @impl GenStage def handle_call({:on_event, event}, from, state) do # when the current demand is 0, we should @@ -162,18 +169,6 @@ defmodule Kelvin.InOrderSubscription do end end - defp _subscribe(state) do - state.config.connection - |> Extreme.RequestManager._name() - |> GenServer.call( - {:read_and_stay_subscribed, self(), - {state.config.stream_name, - _do_function(state.config.restore_stream_position!) + 1, - state.max_buffer_size, true, false, :infinity}}, - :infinity - ) - end - defp _do_function(func) when is_function(func, 0), do: func.() defp _do_function({m, f, a}) when is_atom(m) and is_atom(f) and is_list(a) do diff --git a/lib/kelvin/listener.ex b/lib/kelvin/listener.ex new file mode 100644 index 0000000..c805ac8 --- /dev/null +++ b/lib/kelvin/listener.ex @@ -0,0 +1,22 @@ +defmodule Kelvin.Listener do + @moduledoc false + use Extreme.ListenerWithBackPressure + + @impl Extreme.ListenerWithBackPressure + def on_init(opts) do + state = %{ + producer: Keyword.fetch!(opts, :producer), + get_stream_position_fun: Keyword.fetch!(opts, :get_stream_position_fun) + } + + {:ok, state} + end + + @impl Extreme.ListenerWithBackPressure + def get_last_event(_stream_name, %{} = state), + do: state.get_stream_position_fun.() + + @impl Extreme.ListenerWithBackPressure + def process_push(push, _stream_name, %{} = state), + do: GenServer.call(state.producer, {:on_event, push}) +end diff --git a/mix.exs b/mix.exs index 76073dd..aa65bb0 100644 --- a/mix.exs +++ b/mix.exs @@ -45,7 +45,7 @@ defmodule Kelvin.MixProject do defp deps do [ {:gen_stage, "~> 1.0"}, - {:extreme, "~> 1.0 and >= 1.0.5"}, + {:extreme, "~> 1.1.0-rc8"}, # docs {:ex_doc, ">= 0.0.0", only: :dev, runtime: false}, # test diff --git a/mix.lock b/mix.lock index b792348..7681062 100644 --- a/mix.lock +++ b/mix.lock @@ -7,10 +7,10 @@ "ex_doc": {:hex, :ex_doc, "0.31.1", "8a2355ac42b1cc7b2379da9e40243f2670143721dd50748bf6c3b1184dae2089", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "3178c3a407c557d8343479e1ff117a96fd31bafe52a039079593fb0524ef61b0"}, "excoveralls": {:hex, :excoveralls, "0.18.0", "b92497e69465dc51bc37a6422226ee690ab437e4c06877e836f1c18daeb35da9", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "1109bb911f3cb583401760be49c02cbbd16aed66ea9509fc5479335d284da60b"}, "exprotobuf": {:hex, :exprotobuf, "1.2.17", "3003937da617f588a8fb63ebdd7b127a18d78d6502623c272076fd54c07c4de1", [:mix], [{:gpb, "~> 4.0", [hex: :gpb, repo: "hexpm", optional: false]}], "hexpm", "e07ec1e5ae6f8c1c8521450d5f6b658c8c700b1f34c70356e91ece0766f4361a"}, - "extreme": {:hex, :extreme, "1.0.5", "fafb04fb514ed63667cdd9385b313d7c67aa10887a9f8f1f290cb721d1ee0e48", [:mix], [{:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:exprotobuf, "~> 1.2.9", [hex: :exprotobuf, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "84588993fcb8f410a3b90a2defaf966d50763a1f9db665a83e0546a34335fa45"}, + "extreme": {:hex, :extreme, "1.1.0-rc8", "6e37b68a8109c7d38d5cc0d1b968693af2ee9d9a006ef22087e9b1d7dbe3d72f", [:mix], [{:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:exprotobuf, "~> 1.2.9", [hex: :exprotobuf, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "664ab127234359d7739e4c5450984cb2a0cc6bba793ae2e543889ef4872ee589"}, "file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"}, "gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"}, - "gpb": {:hex, :gpb, "4.21.0", "7a2eb8dd0f3032b7b46b04dcdb490ffabe43eab3a9a1f905bd03c9ec35babb0f", [:make, :rebar3], [], "hexpm", "da45984d26048d8d508d3bbffa6f4a5a5163841cefbf40809622bf92b4640de4"}, + "gpb": {:hex, :gpb, "4.21.1", "72e229c242d252d690addcfd04a6416c26c4d4d2c3521e05570a7a78b48d3bd1", [:make, :rebar3], [], "hexpm", "c05c9aea9e25bd341367a43b3d3eb68e951563911072259c5ec4cb6642f4ef22"}, "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, "makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, diff --git a/test/kelvin/in_order_subscription_test.exs b/test/kelvin/in_order_subscription_test.exs index ffb67a9..f9d7874 100644 --- a/test/kelvin/in_order_subscription_test.exs +++ b/test/kelvin/in_order_subscription_test.exs @@ -1,8 +1,6 @@ defmodule Kelvin.InOrderSubscriptionTest do use ExUnit.Case, async: true - @moduletag :capture_log - alias Extreme.Messages setup do @@ -14,7 +12,7 @@ defmodule Kelvin.InOrderSubscriptionTest do describe "given events have been written to a stream" do setup c do - write_events(0..100, c.stream_name) + _write_events(0..100, c.stream_name) :ok end @@ -33,7 +31,7 @@ defmodule Kelvin.InOrderSubscriptionTest do assert event.event.data == to_string(n) end - write_events(101..200, c.stream_name) + _write_events(101..200, c.stream_name) for n <- 101..200 do assert_receive {:events, [event]}, 1_000 @@ -65,15 +63,7 @@ defmodule Kelvin.InOrderSubscriptionTest do assert_receive {:DOWN, ^monitor_ref, _, _, _} - # we're hardcoding the restore_stream_position! function so this will - # restart from 0 instead of the current stream position as would be the - # case in a real-life system - for n <- 0..100 do - assert_receive {:events, [event]}, 10_000 - assert event.event.data == to_string(n) - end - - write_events(101..200, c.stream_name) + _write_events(101..200, c.stream_name) for n <- 101..200 do assert_receive {:events, [event]}, 1_000 @@ -83,28 +73,41 @@ defmodule Kelvin.InOrderSubscriptionTest do end describe "given only a few events have been written to a stream" do - setup c do - write_events(0..10, c.stream_name) - :ok - end - test "a slow subscription catches up", c do + total_events = 100 + opts = [ producer_name: c.producer_name, stream_name: c.stream_name, restore_stream_position!: &restore_stream_position!/0, test_proc: self(), # note how we add an artificial bottleneck to the consumer here - sleep_time: 100, + sleep_time: 10, # and tune down the catch-up (and therefore max buffer queue size) - catch_up_chunk_size: 1 + catch_up_chunk_size: 1, + subscribe_after: 1 # in order to simulate a consumer which is slow and get coverage # on the supply-buffering we do with the queue ] start_supervised!({MyInOrderSupervisor, opts}) - for n <- 0..10 do + spawn(fn -> + Process.sleep(200) + _write_events(0..total_events, c.stream_name) + end) + + for n <- 0..total_events do + assert_receive {:events, [event]}, 6_000 + assert event.event.data == to_string(n) + end + + spawn(fn -> + Process.sleep(200) + _write_events(0..total_events, c.stream_name) + end) + + for n <- 0..total_events do assert_receive {:events, [event]}, 6_000 assert event.event.data == to_string(n) end @@ -113,19 +116,23 @@ defmodule Kelvin.InOrderSubscriptionTest do defp restore_stream_position!, do: -1 - defp write_events(range, stream) do + defp _write_events(range, stream) do range |> Enum.map(fn n -> - Messages.NewEvent.new( - event_id: Extreme.Tools.generate_uuid(), - event_type: "kelvin_test_event", - data_content_type: 1, - metadata_content_type: 1, - # valid JSON - data: to_string(n), - metadata: "{}" - ) + Process.sleep(5) + + [ + Messages.NewEvent.new( + event_id: Extreme.Tools.generate_uuid(), + event_type: "kelvin_test_event", + data_content_type: 1, + metadata_content_type: 1, + # valid JSON + data: to_string(n), + metadata: "{}" + ) + ] + |> ExtremeClient.append_events(stream) end) - |> ExtremeClient.append_events(stream) end end