Skip to content

Commit

Permalink
Entity manager and other conveniences (#121)
Browse files Browse the repository at this point in the history
* Improve the API ergonomics

Improves the API ergonomics by providing a convenience function for running an entity manager and projections etc. The function returns both a task to spawn and a channel for sending commands.

Along the way, I also removed the `NonZeroUsize` type as a parameter as it is ugly, and not intended for APIs.

* Returns a filter of consumers

Replaces having to create a watch, and a convenience for providing an initial set of filters.

* Return envelopes and filters

For convenience, returns an envelope sender and a filter receiver from the task.
  • Loading branch information
huntc authored Nov 13, 2023
1 parent 4aab7a9 commit 4891e6f
Show file tree
Hide file tree
Showing 8 changed files with 537 additions and 494 deletions.
18 changes: 4 additions & 14 deletions akka-persistence-rs-commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,15 +598,15 @@ pub mod cbor {

#[cfg(test)]
mod tests {
use std::{env, fs, num::NonZeroUsize, time::Duration};
use std::{env, fs, time::Duration};

use super::*;
use akka_persistence_rs::{entity::EventSourcedBehavior, entity_manager};
use serde::Deserialize;
use streambed::commit_log::{Header, HeaderKey};
use streambed_logged::FileLog;
use test_log::test;
use tokio::{sync::mpsc, time};
use tokio::time;

// Scaffolding

Expand Down Expand Up @@ -831,17 +831,7 @@ mod tests {
Topic::from("some-topic"),
);

let my_behavior = MyBehavior;

let (_, my_command_receiver) = mpsc::channel(10);

assert!(entity_manager::run(
my_behavior,
file_log_topic_adapter,
my_command_receiver,
NonZeroUsize::new(1).unwrap(),
)
.await
.is_ok());
let (entity_manager, _) = entity_manager::task(MyBehavior, file_log_topic_adapter, 10, 1);
assert!(entity_manager.await.is_ok());
}
}
14 changes: 7 additions & 7 deletions akka-persistence-rs/benches/benches.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{io, num::NonZeroUsize, pin::Pin, sync::Arc};
use std::{io, pin::Pin, sync::Arc};

use akka_persistence_rs::{
effect::{persist_event, Effect, EffectExt},
Expand All @@ -8,7 +8,7 @@ use akka_persistence_rs::{
};
use async_trait::async_trait;
use criterion::{criterion_group, criterion_main, Criterion};
use tokio::sync::{mpsc, Notify};
use tokio::sync::Notify;
use tokio_stream::Stream;

const NUM_EVENTS: usize = 10_000;
Expand Down Expand Up @@ -81,16 +81,16 @@ fn criterion_benchmark(c: &mut Criterion) {
.unwrap();

let events_processed = Arc::new(Notify::new());
let (sender, receiver) = mpsc::channel(10);
let _ = rt.spawn(entity_manager::run(
let (task, sender) = entity_manager::task(
Behavior,
Adapter {
event_count: 0,
events_processed: events_processed.clone(),
},
receiver,
NonZeroUsize::new(1).unwrap(),
));
10,
1,
);
let _ = rt.spawn(task);

b.to_async(&rt).iter(|| {
let task_events_processed = events_processed.clone();
Expand Down
69 changes: 42 additions & 27 deletions akka-persistence-rs/src/entity_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ use chrono::Utc;
use log::debug;
use log::warn;
use lru::LruCache;
use std::future::Future;
use std::hash::BuildHasher;
use std::io;
use std::num::NonZeroUsize;
use std::pin::Pin;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Receiver;
use tokio_stream::{Stream, StreamExt};

Expand Down Expand Up @@ -118,53 +120,71 @@ where
}
}

/// Manages the lifecycle of entities given a specific behavior.
/// Entity managers are established given an adapter of persistent events associated
/// Provides an asynchronous task and a command channel that can run and drive an entity manager.
///
/// Entity managers manage the lifecycle of entities given a specific behavior.
/// They are established given an adapter of persistent events associated
/// with an entity type. That source is consumed by subsequently telling
/// the entity manager to run, generally on its own task. Events are persisted by
/// calling on the adapter's handler.
///
/// Commands are sent to a channel established for the entity manager.
/// Effects may be produced as a result of performing a command, which may,
/// in turn, perform side effects and yield events.
pub async fn run<A, B>(
///
/// * `command_capacity` declares size of the command channel and will panic at runtime if zero.
/// * `entity_capacity` declares size of the number of entities to cache in memory at one time,
/// and will panic at runtime if zero.
pub fn task<A, B>(
behavior: B,
adapter: A,
receiver: Receiver<Message<B::Command>>,
capacity: NonZeroUsize,
) -> io::Result<()>
command_capacity: usize,
entity_capacity: usize,
) -> (
impl Future<Output = io::Result<()>>,
mpsc::Sender<Message<B::Command>>,
)
where
B: EventSourcedBehavior + Send + Sync + 'static,
B::Command: Send,
B::State: Send + Sync,
A: SourceProvider<B::Event> + Handler<B::Event> + Send + 'static,
{
run_with_hasher(
behavior,
adapter,
receiver,
capacity,
lru::DefaultHasher::default(),
let (sender, receiver) = mpsc::channel(command_capacity);
(
task_with_hasher(
behavior,
adapter,
receiver,
entity_capacity,
lru::DefaultHasher::default(),
),
sender,
)
.await
}

/// Manages the lifecycle of entities given a specific behavior.
/// Entity managers are established given a source of events associated
/// Provides an asynchronous task and a command channel that can run and drive an entity manager.
///
/// Entity managers manage the lifecycle of entities given a specific behavior.
/// They are established given an adapter of persistent events associated
/// with an entity type. That source is consumed by subsequently telling
/// the entity manager to run, generally on its own task.
/// the entity manager to run, generally on its own task. Events are persisted by
/// calling on the adapter's handler.
///
/// Commands are sent to a channel established for the entity manager.
/// Effects may be produced as a result of performing a command, which may,
/// in turn, perform side effects and yield events.
///
/// A hasher for entity ids can also be supplied which will be used to control the
/// internal caching of entities.
pub async fn run_with_hasher<A, B, S>(
///
/// * `entity_capacity` declares size of the number of entities to cache in memory at one time,
/// and will panic at runtime if zero.
pub async fn task_with_hasher<A, B, S>(
behavior: B,
mut adapter: A,
mut receiver: Receiver<Message<B::Command>>,
capacity: NonZeroUsize,
entity_capacity: usize,
hash_builder: S,
) -> io::Result<()>
where
Expand All @@ -177,7 +197,7 @@ where
// Source our initial events and populate our internal entities map.

let mut entities = EntityLruCache {
cache: LruCache::with_hasher(capacity, hash_builder),
cache: LruCache::with_hasher(NonZeroUsize::new(entity_capacity).unwrap(), hash_builder),
};

let envelopes = adapter.source_initial().await?;
Expand Down Expand Up @@ -490,14 +510,9 @@ mod tests {
captured_events: temp_sensor_events,
};

let (temp_sensor, temp_sensor_receiver) = mpsc::channel(10);

let entity_manager_task = tokio::spawn(run(
temp_sensor_behavior,
temp_sensor_event_adapter,
temp_sensor_receiver,
NonZeroUsize::new(1).unwrap(),
));
let (entity_manager_task, temp_sensor) =
task(temp_sensor_behavior, temp_sensor_event_adapter, 10, 1);
let entity_manager_task = tokio::spawn(entity_manager_task);

// Send a command to update the temperature and wait until it is done. We then wait
// on a noification from within our entity that the update has occurred. Waiting on
Expand Down
Loading

0 comments on commit 4891e6f

Please sign in to comment.