Skip to content

Commit

Permalink
Volatile offset store (#128)
Browse files Browse the repository at this point in the history
* Optional offset store

This commit permits a projection consumer to optionally receive an offset store. Where it is not supplied, the source is requested to provided all of the events that it has.

* Revert "Optional offset store"

This reverts commit ff93e31.

* Provides a volatile offset store

...and joins the offset store with the consumer for convenience. Also saves creating another task for the projection as these two things will always go hand-in-hand.
  • Loading branch information
huntc authored Nov 16, 2023
1 parent c3f09a7 commit d259516
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 14 deletions.
3 changes: 1 addition & 2 deletions akka-projection-rs-commitlog/src/offset_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,11 +367,10 @@ pub fn task(

#[cfg(test)]
mod tests {
use std::{env, fs};

use super::*;

use akka_persistence_rs::TimestampOffset;
use std::{env, fs};
use test_log::test;

#[test(tokio::test)]
Expand Down
11 changes: 5 additions & 6 deletions akka-projection-rs/benches/benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ use akka_persistence_rs::{
WithSource,
};
use akka_projection_rs::{
consumer, offset_store::LastOffset, Handler, HandlerError, SourceProvider,
consumer, offset_store::LastOffset, volatile_offset_store, Handler, HandlerError,
SourceProvider,
};
use async_stream::stream;
use async_trait::async_trait;
use criterion::{criterion_group, criterion_main, Criterion};
use tokio::sync::{mpsc, Notify};
use tokio::sync::Notify;
use tokio_stream::Stream;

const NUM_EVENTS: usize = 10_000;
Expand Down Expand Up @@ -106,9 +107,7 @@ fn criterion_benchmark(c: &mut Criterion) {
.unwrap();

let events_processed = Arc::new(Notify::new());
let (offset_store, mut offset_store_receiver) = mpsc::channel(1);
let offset_store_task =
async move { while let Some(_) = offset_store_receiver.recv().await {} };
let offset_store = volatile_offset_store::task(1);
let (projection_task, _kill_switch) = consumer::task(
offset_store,
TestSourceProvider,
Expand All @@ -117,7 +116,7 @@ fn criterion_benchmark(c: &mut Criterion) {
},
);

let _ = rt.spawn(async move { tokio::join!(offset_store_task, projection_task) });
let _ = rt.spawn(projection_task);

b.to_async(&rt).iter(|| {
let task_events_processed = events_processed.clone();
Expand Down
22 changes: 16 additions & 6 deletions akka-projection-rs/src/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![doc = include_str!("../README.md")]

use std::{collections::VecDeque, pin::Pin};
use std::{collections::VecDeque, io, pin::Pin};

use crate::{
offset_store::{self},
Expand All @@ -26,10 +26,13 @@ struct StorableState {
/// meaning, for multiple runs of a projection, it is possible for events to repeat
/// from previous runs.
pub fn task<A, B, Envelope, EE, IH, SP>(
offset_store: mpsc::Sender<offset_store::Command>,
offset_store: (
impl Future<Output = io::Result<()>>,
mpsc::Sender<offset_store::Command>,
),
source_provider: SP,
handler: IH,
) -> (impl Future<Output = ()>, oneshot::Sender<()>)
) -> (impl Future<Output = io::Result<()>>, oneshot::Sender<()>)
where
A: Handler<Envelope = EE> + Send,
B: PendingHandler<Envelope = EE> + Send,
Expand All @@ -40,7 +43,9 @@ where
{
let (kill_switch, mut kill_switch_receiver) = oneshot::channel();

let task = async move {
let (offset_store_task, offset_store) = offset_store;

let projection_task = async move {
let mut handler = handler.into();

let mut always_pending_handler: Pin<
Expand Down Expand Up @@ -203,7 +208,12 @@ where
}
};

(task, kill_switch)
let combined_task = async {
let (_, r) = tokio::join!(projection_task, offset_store_task);
r
};

(combined_task, kill_switch)
}

#[cfg(test)]
Expand Down Expand Up @@ -401,7 +411,7 @@ mod tests {
let (offset_store, mut offset_store_receiver) = mpsc::channel(1);
let task_persistence_id = persistence_id.clone();
let (projection_task, _kill_switch) = task(
offset_store,
(future::pending(), offset_store),
MySourceProvider {
persistence_id: task_persistence_id.clone(),
event_value: event_value.clone(),
Expand Down
1 change: 1 addition & 0 deletions akka-projection-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tokio_stream::Stream;
pub mod consumer;
pub mod consumer_filter;
pub mod offset_store;
pub mod volatile_offset_store;

/// Captures the various types of handlers and the way they are performed.
pub enum Handlers<A, B>
Expand Down
115 changes: 115 additions & 0 deletions akka-projection-rs/src/volatile_offset_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
//! A volatile offset store is provided for situations where
//! events are always sourced events from their earliest offset.
//! An example use case is when events are queried over HTTP from
//! a web browser that does not retain the offset where it is up to.
//!
//! All offset data for a given persistence id is retained in
//! memory.

use std::{collections::HashMap, io};

use futures::Future;
use tokio::sync::mpsc;

use crate::offset_store;

/// Provides an asynchronous task and a command channel that can run and drive an in-memory offset store.
pub fn task(
keys_expected: usize,
) -> (
impl Future<Output = io::Result<()>>,
mpsc::Sender<offset_store::Command>,
) {
let (sender, mut receiver) = mpsc::channel(1);
let task = async move {
let mut offsets = HashMap::with_capacity(keys_expected);
while let Some(command) = receiver.recv().await {
match command {
offset_store::Command::GetLastOffset { reply_to } => {
let _ = reply_to.send(None);
offsets.clear();
}
offset_store::Command::GetOffset {
persistence_id,
reply_to,
} => {
let _ = reply_to.send(offsets.get(&persistence_id).cloned());
}
offset_store::Command::SaveOffset {
persistence_id,
offset,
} => {
offsets
.entry(persistence_id)
.and_modify(|v| *v = offset.clone())
.or_insert(offset);
}
}
}
Ok(())
};
(task, sender)
}

#[cfg(test)]
mod tests {
use super::*;

use akka_persistence_rs::{EntityId, EntityType, Offset, PersistenceId};
use test_log::test;
use tokio::sync::oneshot;

#[test(tokio::test)]
async fn test_basic_ops() {
let (task, commands) = task(1);

tokio::spawn(task);

let (reply_to, reply_to_receiver) = oneshot::channel();
assert!(commands
.send(offset_store::Command::GetLastOffset { reply_to })
.await
.is_ok());
assert_eq!(reply_to_receiver.await, Ok(None));

let persistence_id =
PersistenceId::new(EntityType::from("entity-type"), EntityId::from("entity-id"));

let offset = Offset::Sequence(10);

assert!(commands
.send(offset_store::Command::SaveOffset {
persistence_id: persistence_id.clone(),
offset: offset.clone()
})
.await
.is_ok());

let (reply_to, reply_to_receiver) = oneshot::channel();
assert!(commands
.send(offset_store::Command::GetOffset {
persistence_id: persistence_id.clone(),
reply_to
})
.await
.is_ok());
assert_eq!(reply_to_receiver.await, Ok(Some(offset)));

let (reply_to, reply_to_receiver) = oneshot::channel();
assert!(commands
.send(offset_store::Command::GetLastOffset { reply_to })
.await
.is_ok());
assert_eq!(reply_to_receiver.await, Ok(None));

let (reply_to, reply_to_receiver) = oneshot::channel();
assert!(commands
.send(offset_store::Command::GetOffset {
persistence_id,
reply_to
})
.await
.is_ok());
assert_eq!(reply_to_receiver.await, Ok(None));
}
}

0 comments on commit d259516

Please sign in to comment.