Skip to content

Commit

Permalink
Use extreme with backpressure
Browse files Browse the repository at this point in the history
  • Loading branch information
burmajam committed Jun 26, 2024
1 parent db0fd8e commit 17b6b7a
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 68 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ The format is based on [Keep a
Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to
[Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## 1.1.0 - 2024-06-18

- Use new `Extreme.ListenerWithBackPressure`

## 1.0.1 - 2024-01-31

- Bump all dependencies including bumping `:extreme` to v1.0.5 which fixes a
Expand Down
7 changes: 7 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
import Config

config :logger, :console,
format: "$time $metadata[$level] $message\n",
level: :debug,
metadata: [:pid, :module, :function]

config :ex_unit, capture_log: true

config :kelvin, ExtremeClient,
db_type: :node,
host: System.get_env("EVENTSTORE_HOST") || "localhost",
Expand Down
61 changes: 28 additions & 33 deletions lib/kelvin/in_order_subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ defmodule Kelvin.InOrderSubscription do

defstruct [
:config,
:subscription,
:extreme_listener,
:self,
:max_buffer_size,
demand: 0,
Expand All @@ -53,13 +53,36 @@ defmodule Kelvin.InOrderSubscription do
Keyword.get(
opts,
:catch_up_chunk_size,
Application.get_env(:kelvin, :catch_up_chunk_size, 256)
Application.get_env(:kelvin, :catch_up_chunk_size, 128)
)

connection = Keyword.fetch!(opts, :connection)
stream_name = Keyword.fetch!(opts, :stream_name)

listener_name =
opts
|> Keyword.get(:name, __MODULE__)
|> Module.concat(ExtremeListener)

{:ok, extreme_listener} =
Kelvin.Listener.start_link(connection, stream_name,
read_per_page: max_buffer_size,
auto_subscribe: false,
ack_timeout: :infinity,
name: listener_name,
producer: self(),
get_stream_position_fun: fn ->
opts
|> Keyword.fetch!(:restore_stream_position!)
|> _do_function()
end
)

state = %__MODULE__{
extreme_listener: extreme_listener,
config: Map.new(opts),
self: Keyword.get(opts, :name, self()),
max_buffer_size: max_buffer_size
max_buffer_size: max_buffer_size * 2
}

Process.send_after(
Expand Down Expand Up @@ -92,27 +115,11 @@ defmodule Kelvin.InOrderSubscription do
end

def handle_info(:subscribe, state) do
if state.subscription do
# coveralls-ignore-start
Logger.warn("#{inspect(__MODULE__)} is already subscribed.")
# coveralls-ignore-stop
else
case _subscribe(state) do
{:ok, sub} ->
Process.link(sub)
{:noreply, [], put_in(state.subscription, sub)}

# coveralls-ignore-start
{:error, reason} ->
{:stop, reason, state}
Kelvin.Listener.subscribe(state.extreme_listener)

# coveralls-ignore-stop
end
end
{:noreply, [], state}
end

def handle_info(_info, state), do: {:noreply, [], state}

@impl GenStage
def handle_call({:on_event, event}, from, state) do
# when the current demand is 0, we should
Expand Down Expand Up @@ -162,18 +169,6 @@ defmodule Kelvin.InOrderSubscription do
end
end

defp _subscribe(state) do
state.config.connection
|> Extreme.RequestManager._name()
|> GenServer.call(
{:read_and_stay_subscribed, self(),
{state.config.stream_name,
_do_function(state.config.restore_stream_position!) + 1,
state.max_buffer_size, true, false, :infinity}},
:infinity
)
end

defp _do_function(func) when is_function(func, 0), do: func.()

defp _do_function({m, f, a}) when is_atom(m) and is_atom(f) and is_list(a) do
Expand Down
22 changes: 22 additions & 0 deletions lib/kelvin/listener.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
defmodule Kelvin.Listener do
@moduledoc false
use Extreme.ListenerWithBackPressure

@impl Extreme.ListenerWithBackPressure
def on_init(opts) do
state = %{
producer: Keyword.fetch!(opts, :producer),
get_stream_position_fun: Keyword.fetch!(opts, :get_stream_position_fun)
}

{:ok, state}
end

@impl Extreme.ListenerWithBackPressure
def get_last_event(_stream_name, %{} = state),
do: state.get_stream_position_fun.()

@impl Extreme.ListenerWithBackPressure
def process_push(push, _stream_name, %{} = state),
do: GenServer.call(state.producer, {:on_event, push})
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ defmodule Kelvin.MixProject do
defp deps do
[
{:gen_stage, "~> 1.0"},
{:extreme, "~> 1.0 and >= 1.0.5"},
{:extreme, "~> 1.1.0-rc8"},
# docs
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false},
# test
Expand Down
4 changes: 2 additions & 2 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
"ex_doc": {:hex, :ex_doc, "0.31.1", "8a2355ac42b1cc7b2379da9e40243f2670143721dd50748bf6c3b1184dae2089", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "3178c3a407c557d8343479e1ff117a96fd31bafe52a039079593fb0524ef61b0"},
"excoveralls": {:hex, :excoveralls, "0.18.0", "b92497e69465dc51bc37a6422226ee690ab437e4c06877e836f1c18daeb35da9", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "1109bb911f3cb583401760be49c02cbbd16aed66ea9509fc5479335d284da60b"},
"exprotobuf": {:hex, :exprotobuf, "1.2.17", "3003937da617f588a8fb63ebdd7b127a18d78d6502623c272076fd54c07c4de1", [:mix], [{:gpb, "~> 4.0", [hex: :gpb, repo: "hexpm", optional: false]}], "hexpm", "e07ec1e5ae6f8c1c8521450d5f6b658c8c700b1f34c70356e91ece0766f4361a"},
"extreme": {:hex, :extreme, "1.0.5", "fafb04fb514ed63667cdd9385b313d7c67aa10887a9f8f1f290cb721d1ee0e48", [:mix], [{:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:exprotobuf, "~> 1.2.9", [hex: :exprotobuf, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "84588993fcb8f410a3b90a2defaf966d50763a1f9db665a83e0546a34335fa45"},
"extreme": {:hex, :extreme, "1.1.0-rc8", "6e37b68a8109c7d38d5cc0d1b968693af2ee9d9a006ef22087e9b1d7dbe3d72f", [:mix], [{:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:exprotobuf, "~> 1.2.9", [hex: :exprotobuf, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "664ab127234359d7739e4c5450984cb2a0cc6bba793ae2e543889ef4872ee589"},
"file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"},
"gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"},
"gpb": {:hex, :gpb, "4.21.0", "7a2eb8dd0f3032b7b46b04dcdb490ffabe43eab3a9a1f905bd03c9ec35babb0f", [:make, :rebar3], [], "hexpm", "da45984d26048d8d508d3bbffa6f4a5a5163841cefbf40809622bf92b4640de4"},
"gpb": {:hex, :gpb, "4.21.1", "72e229c242d252d690addcfd04a6416c26c4d4d2c3521e05570a7a78b48d3bd1", [:make, :rebar3], [], "hexpm", "c05c9aea9e25bd341367a43b3d3eb68e951563911072259c5ec4cb6642f4ef22"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"},
Expand Down
71 changes: 39 additions & 32 deletions test/kelvin/in_order_subscription_test.exs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
defmodule Kelvin.InOrderSubscriptionTest do
use ExUnit.Case, async: true

@moduletag :capture_log

alias Extreme.Messages

setup do
Expand All @@ -14,7 +12,7 @@ defmodule Kelvin.InOrderSubscriptionTest do

describe "given events have been written to a stream" do
setup c do
write_events(0..100, c.stream_name)
_write_events(0..100, c.stream_name)
:ok
end

Expand All @@ -33,7 +31,7 @@ defmodule Kelvin.InOrderSubscriptionTest do
assert event.event.data == to_string(n)
end

write_events(101..200, c.stream_name)
_write_events(101..200, c.stream_name)

for n <- 101..200 do
assert_receive {:events, [event]}, 1_000
Expand Down Expand Up @@ -65,15 +63,7 @@ defmodule Kelvin.InOrderSubscriptionTest do

assert_receive {:DOWN, ^monitor_ref, _, _, _}

# we're hardcoding the restore_stream_position! function so this will
# restart from 0 instead of the current stream position as would be the
# case in a real-life system
for n <- 0..100 do
assert_receive {:events, [event]}, 10_000
assert event.event.data == to_string(n)
end

write_events(101..200, c.stream_name)
_write_events(101..200, c.stream_name)

for n <- 101..200 do
assert_receive {:events, [event]}, 1_000
Expand All @@ -83,28 +73,41 @@ defmodule Kelvin.InOrderSubscriptionTest do
end

describe "given only a few events have been written to a stream" do
setup c do
write_events(0..10, c.stream_name)
:ok
end

test "a slow subscription catches up", c do
total_events = 100

opts = [
producer_name: c.producer_name,
stream_name: c.stream_name,
restore_stream_position!: &restore_stream_position!/0,
test_proc: self(),
# note how we add an artificial bottleneck to the consumer here
sleep_time: 100,
sleep_time: 10,
# and tune down the catch-up (and therefore max buffer queue size)
catch_up_chunk_size: 1
catch_up_chunk_size: 1,
subscribe_after: 1
# in order to simulate a consumer which is slow and get coverage
# on the supply-buffering we do with the queue
]

start_supervised!({MyInOrderSupervisor, opts})

for n <- 0..10 do
spawn(fn ->
Process.sleep(200)
_write_events(0..total_events, c.stream_name)
end)

for n <- 0..total_events do
assert_receive {:events, [event]}, 6_000
assert event.event.data == to_string(n)
end

spawn(fn ->
Process.sleep(200)
_write_events(0..total_events, c.stream_name)
end)

for n <- 0..total_events do
assert_receive {:events, [event]}, 6_000
assert event.event.data == to_string(n)
end
Expand All @@ -113,19 +116,23 @@ defmodule Kelvin.InOrderSubscriptionTest do

defp restore_stream_position!, do: -1

defp write_events(range, stream) do
defp _write_events(range, stream) do
range
|> Enum.map(fn n ->
Messages.NewEvent.new(
event_id: Extreme.Tools.generate_uuid(),
event_type: "kelvin_test_event",
data_content_type: 1,
metadata_content_type: 1,
# valid JSON
data: to_string(n),
metadata: "{}"
)
Process.sleep(5)

[
Messages.NewEvent.new(
event_id: Extreme.Tools.generate_uuid(),
event_type: "kelvin_test_event",
data_content_type: 1,
metadata_content_type: 1,
# valid JSON
data: to_string(n),
metadata: "{}"
)
]
|> ExtremeClient.append_events(stream)
end)
|> ExtremeClient.append_events(stream)
end
end

0 comments on commit 17b6b7a

Please sign in to comment.