dispenser is an Elixir library for buffering events and sending them to multiple subscribers.
The terminology used in dispenser is similar to the terminology used in gen_stage.
events are pieces of data you want to send to subscribers.
A buffer is something that you can put events into.
Multiple subscribers can ask a buffer for any number of events.
The number of events that a subscriber has asked for and is waiting to receive is its demand.
The basic function of the library is to accept events and assign them to subscribers.
There are two main "modes" for the buffer:
- Normal Mode: There is more subscription demand than there are events in the buffer. The buffer is not filling up and events can be sent.
- Overloaded Mode: There are more events than the subscribers can handle. If the buffer fills completely, it will drop events according to its LimitedQueue.drop_strategy/0.
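The effect of a drop strategy on a full buffer can be sketched with a plain list. This is only an illustration, not the LimitedQueue implementation: the DropSketch module and its append/4 function are hypothetical, and the meaning of :drop_oldest (discard from the front) is an assumption based on its name. The :drop_newest behavior matches the Buffer example in this README, where "e" is the event that is lost.

```elixir
defmodule DropSketch do
  # Hypothetical helper, not part of dispenser or LimitedQueue.
  # `queue` is a plain list with the oldest event first.
  # Returns {kept_events, number_of_dropped_events}.
  def append(queue, events, capacity, strategy)
      when strategy in [:drop_newest, :drop_oldest] do
    combined = queue ++ events
    overflow = max(length(combined) - capacity, 0)

    kept =
      case strategy do
        # Keep what arrived first; the overflow is cut from the tail.
        :drop_newest -> Enum.take(combined, capacity)
        # Keep what arrived last; the overflow is cut from the head (assumed meaning).
        :drop_oldest -> Enum.drop(combined, overflow)
      end

    {kept, overflow}
  end
end

{kept, dropped} = DropSketch.append([], ["a", "b", "c", "d", "e"], 4, :drop_newest)
# kept == ["a", "b", "c", "d"], dropped == 1
```

With :drop_oldest and the same input, this sketch would keep ["b", "c", "d", "e"] instead.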
Different uses of the library can decide how to handle these cases.
Two example GenServers are implemented along with the library (see below).
Append events and add subscription demand in any order, and then use Buffer.assign_events/1 to assign events to subscribers.
alias Dispenser.{AssignmentStrategy, Buffer}
capacity = 4
buffer = Buffer.new(AssignmentStrategy.Even, capacity, :drop_newest)
# Buffer.size(buffer) == 0
# Buffer.stats(buffer) == %{buffered: 0, demand: 0}
events = ["a", "b", "c", "d", "e"]
{buffer, dropped} = Buffer.append(buffer, events)
# dropped == 1
# Buffer.stats(buffer) == %{buffered: 4, demand: 0}
subscription_1 = make_ref()
buffer = Buffer.ask(buffer, subscription_1, 2)
# Buffer.stats(buffer) == %{buffered: 4, demand: 2}
subscription_2 = make_ref()
buffer = Buffer.ask(buffer, subscription_2, 2)
# Buffer.stats(buffer) == %{buffered: 4, demand: 4}
{buffer, assignments} = Buffer.assign_events(buffer)
# Buffer.stats(buffer) == %{buffered: 0, demand: 0}
# assignments == [{subscription_1, ["a", "b"]}, {subscription_2, ["c", "d"]}]
The library is broken into several pieces that can be implemented and tested simply.
- Dispenser.Demands is an opaque module that keeps track of demands from subscribers.
- Dispenser.AssignmentStrategy.Even is the assignment method used to decide which subscribers to send a limited number of events to. It is the only assignment method implemented, but this can be extended to other methods.
- Dispenser.Buffer is the main buffer that ties everything together and keeps track of demand and events.
- Dispenser.SubscriptionManager can monitor subscribers and is a helper for building GenServers that buffer events.
- Dispenser.MonitoredBuffer combines the Dispenser.Buffer and Dispenser.SubscriptionManager into one module.
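To make the idea of even assignment concrete, here is a minimal sketch of one way to split a limited number of events across subscribers. It is not the code of Dispenser.AssignmentStrategy.Even: the EvenSketch module, its function names, and the exact tie-breaking order are assumptions made for illustration. Each pass grants one more slot to every subscriber that still has unmet demand, and events are then handed out in contiguous chunks.

```elixir
defmodule EvenSketch do
  # Hypothetical illustration, not the library's implementation.
  # `demands` is a list of {subscriber, demand}; `available` is an event count.
  # Returns a list of {subscriber, quota}, with quotas as even as demand allows.
  def quotas(demands, available) do
    demands
    |> Enum.map(fn {sub, demand} -> {sub, demand, 0} end)
    |> fill(available)
    |> Enum.map(fn {sub, _demand, quota} -> {sub, quota} end)
  end

  # Splits `events` into one contiguous chunk per subscriber.
  # Returns {[{subscriber, events}], leftover_events}.
  def assign(demands, events) do
    demands
    |> quotas(length(events))
    |> Enum.map_reduce(events, fn {sub, quota}, remaining ->
      {chunk, rest} = Enum.split(remaining, quota)
      {{sub, chunk}, rest}
    end)
  end

  # One pass grants a single extra slot to each subscriber with unmet demand.
  # Stops when events run out or a pass makes no progress.
  defp fill(rows, available) do
    {rows, left} =
      Enum.map_reduce(rows, available, fn {sub, demand, quota}, left ->
        if left > 0 and quota < demand do
          {{sub, demand, quota + 1}, left - 1}
        else
          {{sub, demand, quota}, left}
        end
      end)

    if left == available or left == 0, do: rows, else: fill(rows, left)
  end
end

{assignments, leftover} = EvenSketch.assign([{:s1, 2}, {:s2, 2}], ["a", "b", "c", "d"])
# assignments == [{:s1, ["a", "b"]}, {:s2, ["c", "d"]}], leftover == []
```

When demand is uneven, for example [{:s1, 3}, {:s2, 1}] with three events, this sketch gives :s1 two events and :s2 one.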
Users of this library will likely implement their own GenServer, but these examples are a good place to start.
Most normal uses and error cases of the BufferServer and BatchingBufferServer are covered in the tests.
The simplest example GenServer is Dispenser.Server.BufferServer, which will accept events and send them to subscribers.
- Normal State: BufferServer will accept events and immediately send them to subscribers as evenly as it can (see Dispenser.AssignmentStrategy.Even and the associated tests for the assignment logic by itself in Dispenser.AssignmentStrategy.EvenTest).
- Overloaded State: The internal buffer is filling up and the BufferServer will immediately send events to any subscriber who asks.
Because of these two modes, the BufferServer's state will either have some pending demand, or some buffered events, but never both at the same time.
When a subscribed process unsubscribes or crashes, it is removed from the BufferServer's subscribers and any remaining demand for its subscriptions is canceled.
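The cleanup described above can be sketched with Process.monitor/1 in a small GenServer. This is a simplified illustration under stated assumptions, not the code of BufferServer or Dispenser.SubscriptionManager: the MonitorSketch module and its ask/2 and demand/1 functions are hypothetical, and it tracks demand per process only.

```elixir
defmodule MonitorSketch do
  use GenServer

  # Hypothetical server: tracks demand per subscriber pid and cancels it
  # when the subscriber goes down.
  def start_link, do: GenServer.start_link(__MODULE__, :ok)
  def ask(server, amount), do: GenServer.call(server, {:ask, self(), amount})
  def demand(server), do: GenServer.call(server, :demand)

  @impl true
  def init(:ok), do: {:ok, %{demands: %{}, monitors: %{}}}

  @impl true
  def handle_call({:ask, pid, amount}, _from, state) do
    # Monitor each subscriber only once, however many times it asks.
    monitors = Map.put_new_lazy(state.monitors, pid, fn -> Process.monitor(pid) end)
    demands = Map.update(state.demands, pid, amount, &(&1 + amount))
    {:reply, :ok, %{state | demands: demands, monitors: monitors}}
  end

  def handle_call(:demand, _from, state) do
    {:reply, state.demands |> Map.values() |> Enum.sum(), state}
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
    # The subscriber is gone: forget it and cancel its remaining demand.
    demands = Map.delete(state.demands, pid)
    monitors = Map.delete(state.monitors, pid)
    {:noreply, %{state | demands: demands, monitors: monitors}}
  end
end
```

A process that calls MonitorSketch.ask(server, 3) and later exits no longer counts toward MonitorSketch.demand(server) once the server has handled the :DOWN message.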
Here is a simple example:
alias Dispenser.{AssignmentStrategy, Buffer}
alias Dispenser.Server.BufferServer
capacity = 10
buffer = Buffer.new(AssignmentStrategy.Even, capacity, :drop_oldest)
{:ok, buffer_server} = BufferServer.start_link(%{buffer: buffer})
:ok = BufferServer.ask(buffer_server, 1)
assert BufferServer.stats(buffer_server) == %{buffered: 0, subscribed: 1, demand: 1}
events = ["a", "b", "c", "d", "e"]
{:ok, 0} = BufferServer.append(buffer_server, events)
assert_receive {:handle_assigned_events, ^buffer_server, ["a"]}
assert BufferServer.stats(buffer_server) == %{buffered: 4, subscribed: 1, demand: 0}
Please see the docs for BufferServer.ask/3 for more details on the format of the :handle_assigned_events message.
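On the subscriber side, handling that message might look like the sketch below. The three-element tuple shape is taken from the assert_receive line in the example above. The SubscriberSketch module, its collect/2 function, and the timeout default are hypothetical and only serve the illustration.

```elixir
defmodule SubscriberSketch do
  # Hypothetical helper: waits for one batch of events from `server`.
  # The message shape follows the README example; check the BufferServer.ask/3
  # docs for the authoritative format.
  def collect(server, timeout \\ 100) do
    receive do
      {:handle_assigned_events, ^server, events} -> {:ok, events}
    after
      timeout -> :timeout
    end
  end
end
```

Pinning the server with ^server means that messages from other buffers are left in the mailbox.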
Please see the documentation and associated test (Dispenser.Server.BufferServerTest) for more details.
Dispenser.Server.BatchingBufferServer is a slightly optimized variant of BufferServer that will only send events once a minimum batch size of events has been reached.
The BatchingBufferServer has the two states from BufferServer, but it has a third state where it is waiting for the buffer to reach a specified size before sending out events.
This helps reduce the number of messages sent to subscribers that have demand > 1.
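The waiting state can be sketched as a single decision function. This is an assumption-laden illustration, not the BatchingBufferServer logic: the BatchSketch module, maybe_flush/3, and the min_batch parameter are hypothetical names, and the sketch considers a single subscriber.

```elixir
defmodule BatchSketch do
  # Hypothetical helper: decide whether to send buffered events now.
  # Returns {:send, batch, rest} once at least `min_batch` events are buffered
  # and there is demand; otherwise :wait.
  def maybe_flush(buffered, demand, min_batch) do
    if demand > 0 and length(buffered) >= min_batch do
      # Never send more than the subscriber asked for.
      {batch, rest} = Enum.split(buffered, demand)
      {:send, batch, rest}
    else
      :wait
    end
  end
end

BatchSketch.maybe_flush(["a"], 3, 2)
# :wait, because only one event is buffered and the minimum batch is 2
```

With a minimum batch of 2 and demand of 3, the subscriber in this sketch receives one message carrying two or three events instead of a separate message per event.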
Please see the documentation for BatchingBufferServer and associated test (Dispenser.Server.BatchingBufferServerTest) for details.
Run the tests with mix test in the root of the library.
This library contains a lot of internal documentation.
Documentation is available on HexDocs, or you can generate the documentation from source:
$ mix deps.get
$ mix docs