Revent is a lightweight event-sourcing library for Scala based on cats, designed to stay out of your way
The name "revent" is a portmanteau for reduce + event
Event sourcing is an architectural pattern in which you model your application dat, by capturing a sequence of immutable events. The topic was discussed in length by people like Greg Young, Udi Dahan, Martin Fowler and others. It is recommended to study the benefits and disadvantages of using event sourcing beforehand.
Recommended materials:
A protocol defines a particular view over an event stream. Let's take a closer look at the
Protocol
interface:
trait Protocol {
type EventStream <: org.revent.EventStream
type Aggregate
}
trait EventStream {
type Id
type Payload
}
When defining a protocol, you must supply the types for the event stream ID, the event payload, and the aggregate itself.
In the bank account example, as used in the tests, the protocol definition is as follows:
trait BankAccountStream extends EventStream {
override type Id = UUID
override type Payload = BankAccountEvent
}
trait BankAccountProtocol extends Protocol {
override type EventStream = BankAccountStream
override type Aggregate = BankAccount
}
We see that the aggregate ID (every event belongs to some unique aggregate) is a UUID
, each event
in the stream is a subclass of BankAccountEvent
, and the aggregate itself is of type
BankAccount
. We can use the same event stream with other protocols as well, as shown with the
BankAccountBalanceProtocol
here.
Reducers are used to reduce an event stream into a single value. In most cases, you'll need to
define aggregate reducers, which reduce the event stream into your aggregate type (as defined in
your Protocol
)
Continuing with the bank account example, a reducer might look like this:
object BankAccountReducer extends AggregateReducer[BankAccountProtocol] {
override val empty: BankAccount = BankAccount.Empty
override def handle(account: BankAccount, event: BankAccountEvent): BankAccount = event match {
case OwnerChanged(owner) => account.withOwner(owner)
case DepositPerformed(amount) => account.deposit(amount)
case WithdrawalPerformed(amount) => account.withdraw(amount)
case _ => account
}
}
As seen here, 2 methods need to be implemented:
empty
- which returns an initial value for the aggregatehandle(aggregate, event)
- which takes an aggregate and an event, and returns a new version of the aggregate with the event applied
Note: you must always return an aggregate. If the new event has no effect on the aggregate, return the aggregate as is
The event store abstraction is used to persist and read event streams. It's comprised of two traits:
EventStreamReader[F[_], ES <: EventStream]
and EventStreamWriter[F[_], ES <: EventStream]
. They
are both parameterized by 2 type arguments
F[_]
- is the type constructor of the target monad. This allows the event store to be either synchronous (by usingTry
orEither
, for example) or asynchronous (by usingFuture
)ES
- is the type of the event stream the store is meant to read and write. This may have implications on the way used to serialize and deserialize the stream ID and event payloads
Several implementations are already provided:
- Async store based on Apache's Cassandra - here
- Async store based on Greg Young's Event Store - here
- Sync in memory implementation (useful for testing) - here
The aggregate repository can be used to retrieve the current state of the aggregate. The
ReplayingAggregateRepository
uses the reducer to replay all past events for the aggregate, reduced
to the current state.
For optimization, a SnapshottingAggregateRepository
(WIP) can be used. It will periodically take
snapshots of the aggregate's state to prevent replaying the entire event stream for each request.
The library includes simple CQRS-style command handling.
The EventSourcedCommandHandler
will execute an EventSourcedCommand
by running these steps:
- Fetch current state of the aggregate from the aggregate repository
- Apply the aggregate to the command to produce new events
- Persist events to the event store
During this process, optimistic locking is used to verify consistency.
Example commands and their usage can be found in the tests.
Copyright © 2017 Amitay Horwitz
Distributed under the MIT License