This library provides the ability to read a named or $all stream from
EventStore and checkpoint back to a store of your choice.
The stream reader is created via EventStoreSource.Run, which returns an
IStreamReader backed by either an AllStreamReader or a NamedStreamReader, depending
on the streamName parameter.
This library has been plucked from a project I am working on, and it...
- Is still a work-in-progress
- Is missing tests
- Has primarily been tested with reading from the $all stream (AllStreamReader)
- May be missing auxiliary components (e.g. paket.dependencies)
I'm hoping to remedy these points soon, but wanted to share the initial work.
The checkpointing implementation uses a generic ICheckpointer which could use any
backend, but the library also provides an EventStoreCheckpointer which implements
this interface.
Obviously, if you use the same EventStore instance to both read from and write to,
you must be careful not to cause an endless loop. This is where the eventFilter
parameter comes in.
This library currently uses the following pinned dependencies (via the Paket package manager):
nuget EventStore.Client ~> 5
nuget protobuf-net == 2.4
Once the Equinox packages use the new gRPC EventStore client, I will look
at using it in this package. Until then, this is not possible, as the two client
versions cannot coexist.
The following is a simplified example from the composition root in my projector:
/// Composition root for the projector: builds the Propulsion sink, the
/// checkpointer and the EventStoreSource read pipeline from `cfg`.
///
/// Returns the tuple `sink, runSourcePipeline` so that the caller can run the
/// pipeline and await the sink.
let buildProjector cfg =
    (*
        `cfg` is just a generic configuration record for the app
        I make a bunch of separately configured `Serilog.ILoggers`, but you
        can use the global logger if you prefer.
        val makeLogger : name:string -> Serilog.ILogger
    *)

    // Opens a connection under the given name. Blocks the calling thread until
    // the `Connect` workflow has completed.
    let makeConn connectionName =
        let connector =
            Equinox.EventStore.Connector (
                username = cfg.Username,
                password = cfg.Password,
                reqTimeout = cfg.Timeout,
                reqRetries = cfg.Retries,
                heartbeatTimeout = cfg.HeartbeatTimeout,
                log = Logger.SerilogVerbose ((makeLogger "Store").ForContext("source", connectionName)),
                tags = [ "M", Environment.MachineName; "I", Guid.NewGuid () |> string ]
            )
        connector.Connect (connectionName, cfg.Discovery, cfg.NodePreference)
        |> Async.RunSynchronously

    // The sink that the source pipeline feeds; `myHandler` and `stats` are
    // supplied by the application.
    let sink =
        Propulsion.Streams.StreamsProjector.Start (
            log = makeLogger "Projector",
            maxReadAhead = cfg.MaxReadAhead,
            maxConcurrentStreams = cfg.MaxConcurrentStreams,
            statsInterval = cfg.StatsInterval,
            handle = myHandler, // Your stream event handler
            stats = stats
        )

    // The checkpointer gets its own connection ("checkpointer"), separate from
    // the "source" connection used for reading.
    let checkpointer =
        EventStoreCheckpointer (
            (makeConn "checkpointer").WriteConnection,
            logger = makeLogger "Checkpointer"
        ) :> ICheckpointer

    // The read pipeline itself; `main` runs it via `Async.Catch` and restarts it on failure.
    let runSourcePipeline =
        EventStoreSource.Run (
            readerLogger = makeLogger "Reader",
            statsLogger = makeLogger "Stats",
            conn = makeConn "source",
            startPosition = cfg.StartPosition,
            checkpointer = checkpointer,
            consumerGroup = cfg.ConsumerGroup,
            maxBatchSize = cfg.MaxReadAhead,
            sink = sink,
            eventFilter = filterEvent, // ResolvedEvent -> bool
            tailSleepInterval = TimeSpan.FromSeconds 5.,
            statsInterval = cfg.StatsInterval,
            streamType = cfg.StreamType
        )

    // Hand both pieces back to the caller, which destructures them as
    // `let sink, runSourcePipeline = buildProjector cfg`.
    sink, runSourcePipeline
/// Program entry point. Builds the projector, keeps the source pipeline
/// running in the background (restarting it after a failure) and blocks
/// until the sink completes.
///
/// Exit code is 0 when the sink ran to completion, 1 otherwise.
[<EntryPoint>]
let main argv =
    // Acquire any configuration
    let cfg = getConfig ()
    let sink, runSourcePipeline = buildProjector cfg

    // Supervises the pipeline: a normal finish ends the loop, an exception is
    // logged and the pipeline is restarted after a 3 second pause.
    let rec supervise () =
        async {
            let! outcome = Async.Catch runSourcePipeline

            match outcome with
            | Choice2Of2 ex ->
                logger.Error (ex, "Exception in run loop: {ex}", ex.Message)
                do! Async.Sleep 3_000
                logger.Information ("Restarting run loop")
                return! supervise ()
            | Choice1Of2 _ ->
                return ()
        }

    // Fire-and-forget the supervisor; the sink decides when the process ends.
    Async.Start (supervise ())

    sink.AwaitCompletion () |> Async.RunSynchronously

    if sink.RanToCompletion then 0 else 1