diff --git a/akka-persistence-rs-commitlog/src/lib.rs b/akka-persistence-rs-commitlog/src/lib.rs index 164aeba..cfe5ba9 100644 --- a/akka-persistence-rs-commitlog/src/lib.rs +++ b/akka-persistence-rs-commitlog/src/lib.rs @@ -598,7 +598,7 @@ pub mod cbor { #[cfg(test)] mod tests { - use std::{env, fs, num::NonZeroUsize, time::Duration}; + use std::{env, fs, time::Duration}; use super::*; use akka_persistence_rs::{entity::EventSourcedBehavior, entity_manager}; @@ -606,7 +606,7 @@ mod tests { use streambed::commit_log::{Header, HeaderKey}; use streambed_logged::FileLog; use test_log::test; - use tokio::{sync::mpsc, time}; + use tokio::time; // Scaffolding @@ -831,17 +831,7 @@ mod tests { Topic::from("some-topic"), ); - let my_behavior = MyBehavior; - - let (_, my_command_receiver) = mpsc::channel(10); - - assert!(entity_manager::run( - my_behavior, - file_log_topic_adapter, - my_command_receiver, - NonZeroUsize::new(1).unwrap(), - ) - .await - .is_ok()); + let (entity_manager, _) = entity_manager::task(MyBehavior, file_log_topic_adapter, 10, 1); + assert!(entity_manager.await.is_ok()); } } diff --git a/akka-persistence-rs/benches/benches.rs b/akka-persistence-rs/benches/benches.rs index ce4d972..4728f0a 100644 --- a/akka-persistence-rs/benches/benches.rs +++ b/akka-persistence-rs/benches/benches.rs @@ -1,4 +1,4 @@ -use std::{io, num::NonZeroUsize, pin::Pin, sync::Arc}; +use std::{io, pin::Pin, sync::Arc}; use akka_persistence_rs::{ effect::{persist_event, Effect, EffectExt}, @@ -8,7 +8,7 @@ use akka_persistence_rs::{ }; use async_trait::async_trait; use criterion::{criterion_group, criterion_main, Criterion}; -use tokio::sync::{mpsc, Notify}; +use tokio::sync::Notify; use tokio_stream::Stream; const NUM_EVENTS: usize = 10_000; @@ -81,16 +81,16 @@ fn criterion_benchmark(c: &mut Criterion) { .unwrap(); let events_processed = Arc::new(Notify::new()); - let (sender, receiver) = mpsc::channel(10); - let _ = rt.spawn(entity_manager::run( + let (task, sender) = entity_manager::task( Behavior, Adapter { event_count: 0, events_processed: events_processed.clone(), }, - receiver, - NonZeroUsize::new(1).unwrap(), - )); + 10, + 1, + ); + let _ = rt.spawn(task); b.to_async(&rt).iter(|| { let task_events_processed = events_processed.clone(); diff --git a/akka-persistence-rs/src/entity_manager.rs b/akka-persistence-rs/src/entity_manager.rs index d10ddee..76e1794 100644 --- a/akka-persistence-rs/src/entity_manager.rs +++ b/akka-persistence-rs/src/entity_manager.rs @@ -11,10 +11,12 @@ use chrono::Utc; use log::debug; use log::warn; use lru::LruCache; +use std::future::Future; use std::hash::BuildHasher; use std::io; use std::num::NonZeroUsize; use std::pin::Pin; +use tokio::sync::mpsc; use tokio::sync::mpsc::Receiver; use tokio_stream::{Stream, StreamExt}; @@ -118,8 +120,10 @@ where } } -/// Manages the lifecycle of entities given a specific behavior. -/// Entity managers are established given an adapter of persistent events associated +/// Provides an asynchronous task and a command channel that can run and drive an entity manager. +/// +/// Entity managers manage the lifecycle of entities given a specific behavior. +/// They are established given an adapter of persistent events associated /// with an entity type. That source is consumed by subsequently telling /// the entity manager to run, generally on its own task. Events are persisted by /// calling on the adapter's handler. @@ -127,32 +131,45 @@ where /// Commands are sent to a channel established for the entity manager. 
/// Effects may be produced as a result of performing a command, which may, /// in turn, perform side effects and yield events. -pub async fn run( +/// +/// * `command_capacity` declares the size of the command channel and will panic at runtime if zero. +/// * `entity_capacity` declares the number of entities to cache in memory at one time, +/// and will panic at runtime if zero. +pub fn task( behavior: B, adapter: A, - receiver: Receiver<Message<B::Command>>, - capacity: NonZeroUsize, -) -> io::Result<()> + command_capacity: usize, + entity_capacity: usize, +) -> ( + impl Future<Output = io::Result<()>>, + mpsc::Sender<Message<B::Command>>, +) where B: EventSourcedBehavior + Send + Sync + 'static, B::Command: Send, B::State: Send + Sync, A: SourceProvider + Handler + Send + 'static, { - run_with_hasher( - behavior, - adapter, - receiver, - capacity, - lru::DefaultHasher::default(), + let (sender, receiver) = mpsc::channel(command_capacity); + ( + task_with_hasher( + behavior, + adapter, + receiver, + entity_capacity, + lru::DefaultHasher::default(), + ), + sender, ) - .await } -/// Manages the lifecycle of entities given a specific behavior. -/// Entity managers are established given a source of events associated +/// Provides an asynchronous task and a command channel that can run and drive an entity manager. +/// +/// Entity managers manage the lifecycle of entities given a specific behavior. +/// They are established given an adapter of persistent events associated /// with an entity type. That source is consumed by subsequently telling -/// the entity manager to run, generally on its own task. +/// the entity manager to run, generally on its own task. Events are persisted by +/// calling on the adapter's handler. /// /// Commands are sent to a channel established for the entity manager. /// Effects may be produced as a result of performing a command, which may, @@ -160,11 +177,14 @@ where /// /// A hasher for entity ids can also be supplied which will be used to control the /// internal caching of entities. -pub async fn run_with_hasher( +/// +/// * `entity_capacity` declares the number of entities to cache in memory at one time, +/// and will panic at runtime if zero. +pub async fn task_with_hasher( behavior: B, mut adapter: A, mut receiver: Receiver<Message<B::Command>>, - capacity: NonZeroUsize, + entity_capacity: usize, hash_builder: S, ) -> io::Result<()> where @@ -177,7 +197,7 @@ where // Source our initial events and populate our internal entities map. let mut entities = EntityLruCache { - cache: LruCache::with_hasher(capacity, hash_builder), + cache: LruCache::with_hasher(NonZeroUsize::new(entity_capacity).unwrap(), hash_builder), }; let envelopes = adapter.source_initial().await?; @@ -490,14 +510,9 @@ mod tests { captured_events: temp_sensor_events, }; - let (temp_sensor, temp_sensor_receiver) = mpsc::channel(10); - - let entity_manager_task = tokio::spawn(run( - temp_sensor_behavior, - temp_sensor_event_adapter, - temp_sensor_receiver, - NonZeroUsize::new(1).unwrap(), - )); + let (entity_manager_task, temp_sensor) = + task(temp_sensor_behavior, temp_sensor_event_adapter, 10, 1); + let entity_manager_task = tokio::spawn(entity_manager_task); // Send a command to update the temperature and wait until it is done. We then wait // on a notification from within our entity that the update has occurred.
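For orientation, a minimal sketch of how a caller is now expected to wire up the reworked entity manager API (illustrative only, not part of the patch; `MyBehavior` and `MyCommand` mirror the test fixtures above, `MyAdapter`/`my_adapter` stand in for any `SourceProvider` + `Handler` adapter, and the `akka_persistence_rs` import paths are assumed):

    use akka_persistence_rs::{entity_manager, EntityId, Message};

    async fn drive_entities(my_adapter: MyAdapter) {
        // `task` now builds the command channel itself and hands back the manager's
        // future together with the command sender.
        let (entity_manager_task, commands) =
            entity_manager::task(MyBehavior, my_adapter, 10, 1);
        tokio::spawn(entity_manager_task);

        // Commands are addressed to an entity id; the manager sources or creates the entity.
        let _ = commands
            .send(Message::new(EntityId::from("id-1"), MyCommand::SomeCommand))
            .await;
    }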
Waiting on diff --git a/akka-projection-rs-commitlog/src/offset_store.rs b/akka-projection-rs-commitlog/src/offset_store.rs index 080b6b2..9f5046a 100644 --- a/akka-projection-rs-commitlog/src/offset_store.rs +++ b/akka-projection-rs-commitlog/src/offset_store.rs @@ -12,7 +12,7 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use smol_str::SmolStr; use std::{cmp::Ordering, sync::Arc}; -use std::{io, num::NonZeroUsize}; +use std::{future::Future, io}; use streambed::commit_log::{ConsumerRecord, Header, HeaderKey, Key, ProducerRecord, Topic}; use streambed_logged::{compaction::KeyBasedRetention, FileLog}; use tokio::sync::{mpsc, oneshot, watch, Notify}; @@ -271,89 +271,98 @@ pub type OffsetStoreId = SmolStr; use internal::*; -/// Runs an offset store. -pub async fn run( +/// Provides an asynchronous task and a command channel that can run and drive an offset store. +pub fn task( mut commit_log: FileLog, keys_expected: usize, offset_store_id: OffsetStoreId, - mut offset_store_receiver: mpsc::Receiver, -) -> io::Result<()> { - let events_entity_type = EntityType::from(offset_store_id.clone()); - let events_topic = Topic::from(offset_store_id.clone()); - - let (offset_store_entities, offset_store_entities_receiver) = mpsc::channel(keys_expected); - let (last_offset, last_offset_receiver) = watch::channel(None); - let ready = Arc::new(Notify::new()); - let task_ready = ready.clone(); - let offset_command_handler = async { - task_ready.notified().await; - task_ready.notify_one(); - while let Some(command) = offset_store_receiver.recv().await { - match command { - offset_store::Command::GetLastOffset { reply_to } => { - let _ = reply_to.send(last_offset_receiver.borrow().clone()); - } - offset_store::Command::GetOffset { - persistence_id, - reply_to, - } => { - let _ = offset_store_entities - .send(Message::new( - EntityId::from(persistence_id.jdk_string_hash().to_string()), - Command::Get { - persistence_id, - reply_to, - }, - )) - .await; - } - offset_store::Command::SaveOffset { - persistence_id, - offset, - } => { - let _ = offset_store_entities - .send(Message::new( - EntityId::from(persistence_id.jdk_string_hash().to_string()), - Command::Save { - persistence_id, - offset, - }, - )) - .await; +) -> ( + impl Future>, + mpsc::Sender, +) { + let (offset_store, mut offset_store_receiver) = mpsc::channel(1); + + let task = async move { + let events_entity_type = EntityType::from(offset_store_id.clone()); + let events_topic = Topic::from(offset_store_id.clone()); + + commit_log + .register_compaction(events_topic.clone(), KeyBasedRetention::new(keys_expected)) + .await + .unwrap(); + + let to_compaction_key = |_: &EntityId, event: &Event| -> Key { + let Event::Saved { persistence_id, .. 
} = event; + persistence_id.jdk_string_hash() as u64 + }; + + let file_log_topic_adapter = CommitLogTopicAdapter::new( + commit_log, + OffsetStoreEventMarshaller { + entity_type: events_entity_type, + to_compaction_key, + }, + &offset_store_id, + events_topic, + ); + + let (last_offset, last_offset_receiver) = watch::channel(None); + let ready = Arc::new(Notify::new()); + + let (entity_manager_runner, offset_store_entities) = entity_manager::task( + Behavior { + last_offset, + ready: ready.clone(), + }, + file_log_topic_adapter, + keys_expected, + keys_expected, + ); + + let offset_command_handler = async { + ready.notified().await; + ready.notify_one(); + while let Some(command) = offset_store_receiver.recv().await { + match command { + offset_store::Command::GetLastOffset { reply_to } => { + let _ = reply_to.send(last_offset_receiver.borrow().clone()); + } + offset_store::Command::GetOffset { + persistence_id, + reply_to, + } => { + let _ = offset_store_entities + .send(Message::new( + EntityId::from(persistence_id.jdk_string_hash().to_string()), + Command::Get { + persistence_id, + reply_to, + }, + )) + .await; + } + offset_store::Command::SaveOffset { + persistence_id, + offset, + } => { + let _ = offset_store_entities + .send(Message::new( + EntityId::from(persistence_id.jdk_string_hash().to_string()), + Command::Save { + persistence_id, + offset, + }, + )) + .await; + } } } - } - }; - - commit_log - .register_compaction(events_topic.clone(), KeyBasedRetention::new(keys_expected)) - .await - .unwrap(); + }; - let to_compaction_key = |_: &EntityId, event: &Event| -> Key { - let Event::Saved { persistence_id, .. } = event; - persistence_id.jdk_string_hash() as u64 + let (_, r2) = tokio::join!(offset_command_handler, entity_manager_runner); + r2 }; - - let file_log_topic_adapter = CommitLogTopicAdapter::new( - commit_log, - OffsetStoreEventMarshaller { - entity_type: events_entity_type, - to_compaction_key, - }, - &offset_store_id, - events_topic, - ); - - let entity_manager_runner = entity_manager::run( - Behavior { last_offset, ready }, - file_log_topic_adapter, - offset_store_entities_receiver, - NonZeroUsize::new(keys_expected).unwrap(), - ); - - let (_, r2) = tokio::join!(offset_command_handler, entity_manager_runner); - r2 + (task, offset_store) } #[cfg(test)] @@ -373,16 +382,12 @@ mod tests { println!("Writing to {}", logged_dir.to_string_lossy()); let commit_log = FileLog::new(logged_dir); - let (offset_store, offset_store_receiver) = mpsc::channel(1); // Shouldn't be any offsets right now - tokio::spawn(run( - commit_log, - 10, - OffsetStoreId::from("some-offset-id"), - offset_store_receiver, - )); + let (offset_store_task, offset_store) = + task(commit_log, 10, OffsetStoreId::from("some-offset-id")); + tokio::spawn(offset_store_task); let (reply_to, reply_to_receiver) = oneshot::channel(); offset_store diff --git a/akka-projection-rs-grpc/src/consumer.rs b/akka-projection-rs-grpc/src/consumer.rs index 3014d1c..484865e 100644 --- a/akka-projection-rs-grpc/src/consumer.rs +++ b/akka-projection-rs-grpc/src/consumer.rs @@ -23,8 +23,10 @@ use crate::proto; use crate::Envelope; use crate::StreamId; +/// Provides a source of gRPC events consumed from a remote producer. 
pub struct GrpcSourceProvider { - consumer_filters: Option<watch::Receiver<Vec<FilterCriteria>>>, + consumer_filters: Option<watch::Sender<Vec<FilterCriteria>>>, + consumer_filters_receiver: Option<watch::Receiver<Vec<FilterCriteria>>>, event_producer_channel: EP, slice_range: Range, stream_id: StreamId, } @@ -36,6 +38,9 @@ where EP: Fn() -> EPR, EPR: Future>, { + /// Construct a new source with a closure that is able to establish + /// connections in any way that is required e.g. a secure connection using + /// TLS, or using UDP etc. pub fn new(event_producer_channel: EP, stream_id: StreamId) -> Self { let slice_range = akka_persistence_rs::slice_ranges(1); @@ -46,6 +51,8 @@ where ) } + /// If more than one slice range is required then it can + /// be conveyed here. pub fn with_slice_range( event_producer_channel: EP, stream_id: StreamId, @@ -53,6 +60,7 @@ where ) -> Self { Self { consumer_filters: None, + consumer_filters_receiver: None, event_producer_channel, slice_range, stream_id, } } - pub fn with_consumer_filters( - mut self, - consumer_filters: watch::Receiver<Vec<FilterCriteria>>, - ) -> Self { + /// Provide an initial filter, or an empty [Vec] if none + /// are required. If more filters need to be sent later + /// then [Self::consumer_filters] can be used to obtain the + /// means to do so. + pub fn with_initial_consumer_filters(mut self, consumer_filters: Vec<FilterCriteria>) -> Self { + let (consumer_filters, consumer_filters_receiver) = watch::channel(consumer_filters); self.consumer_filters = Some(consumer_filters); + self.consumer_filters_receiver = Some(consumer_filters_receiver); self } + + /// Obtain the means to send filters dynamically. There can be + /// only one sender, so calls subsequent to the first one will + /// result in [None] being returned. Additionally, [None] is returned + /// if no initial filters are declared. + pub fn consumer_filters(&mut self) -> Option<watch::Sender<Vec<FilterCriteria>>> { + self.consumer_filters.take() + } } #[async_trait] @@ -131,7 +150,7 @@ where } }); - let stream_consumer_filters = self.consumer_filters.as_ref().cloned(); + let stream_consumer_filters = self.consumer_filters_receiver.as_ref().cloned(); let consumer_filters = stream!
{ if let Some(mut consumer_filters) = stream_consumer_filters { @@ -160,7 +179,7 @@ where slice_min: self.slice_range.start as i32, slice_max: self.slice_range.end as i32 - 1, offset, - filter: self.consumer_filters.as_ref().map_or( + filter: self.consumer_filters_receiver.as_ref().map_or( vec![], |consumer_filters| { consumer_filters @@ -446,21 +465,21 @@ mod tests { .unwrap(); }); - let (consumer_filters, consumer_filters_receiver) = - watch::channel(vec![FilterCriteria::IncludeEntityIds { - entity_id_offsets: vec![EntityIdOffset { - entity_id: persistence_id.entity_id.clone(), - seq_nr: 0, - }], - }]); + let initial_consumer_filters = vec![FilterCriteria::IncludeEntityIds { + entity_id_offsets: vec![EntityIdOffset { + entity_id: persistence_id.entity_id.clone(), + seq_nr: 0, + }], + }]; let channel = Channel::from_static("http://127.0.0.1:50051"); - let source_provider = GrpcSourceProvider::::new( + let mut source_provider = GrpcSourceProvider::::new( || channel.connect(), StreamId::from("some-string-id"), ) - .with_consumer_filters(consumer_filters_receiver); + .with_initial_consumer_filters(initial_consumer_filters); + let consumer_filters = source_provider.consumer_filters().unwrap(); assert!(consumer_filters .send(vec![consumer_filter::exclude_all()]) .is_ok()); diff --git a/akka-projection-rs-grpc/src/producer.rs b/akka-projection-rs-grpc/src/producer.rs index 2241b9f..3da381e 100644 --- a/akka-projection-rs-grpc/src/producer.rs +++ b/akka-projection-rs-grpc/src/producer.rs @@ -181,229 +181,251 @@ impl GrpcEventFlow { type Context = (Envelope, oneshot::Sender<()>); -/// Reliably stream event envelopes to a consumer. Event envelope transmission +/// Provides an asynchronous task and a kill switch that can run and stop a +/// reliable stream of event envelopes to a consumer. Event envelope transmission /// requests are sent over a channel and have a reply that is completed on the /// remote consumer's acknowledgement of receipt. -pub async fn run( +/// +/// The `max_in_flight` parameter determines the maximum number of events that +/// can remain unacknowledged at any time. Meeting this threshold will back-pressure the +/// production of events.
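By way of illustration, a sketch of the calling side for the producer task described above (mirroring the reworked test later in this file; the endpoint, the ids and `my_envelope` are placeholders):

    let (producer_task, envelopes, consumer_filters, kill_switch) = task(
        || {
            // Any connection strategy may be supplied here e.g. one using TLS.
            let channel = Channel::from_static("http://127.0.0.1:50051");
            async move { channel.connect().await }
        },
        OriginId::from("some-origin-id"),
        StreamId::from("some-stream-id"),
        10, // max_in_flight: unacknowledged envelopes tolerated before back-pressure
    );
    tokio::spawn(producer_task);

    // Each envelope is paired with a oneshot sender that is completed once the remote
    // consumer acknowledges receipt.
    let (acked, acked_receiver) = oneshot::channel();
    let _ = envelopes.send((my_envelope, acked)).await;
    let _ = acked_receiver.await;

    // `consumer_filters` is a watch receiver of filters pushed back by the consumer;
    // completing (or dropping) the kill switch stops the task.
    let _ = kill_switch.send(());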
+#[allow(clippy::type_complexity)] +pub fn task( event_consumer_channel: EC, origin_id: StreamId, stream_id: StreamId, - consumer_filters: watch::Sender>, - mut envelopes: mpsc::Receiver<(Envelope, oneshot::Sender<()>)>, - mut kill_switch: oneshot::Receiver<()>, -) where + max_in_flight: usize, +) -> ( + impl Future, + mpsc::Sender<(Envelope, oneshot::Sender<()>)>, + watch::Receiver>, + oneshot::Sender<()>, +) +where E: Clone + Name + 'static, EC: Fn() -> ECR + Send + Sync, ECR: Future> + Send, { - let mut delayer: Option = None; + let (envelopes, mut envelopes_receiver) = + mpsc::channel::<(Envelope, oneshot::Sender<()>)>(max_in_flight); - let mut in_flight: HashMap>> = HashMap::new(); + let (consumer_filters, consumer_filters_receiver) = watch::channel(vec![]); - 'outer: loop { - if let Err(oneshot::error::TryRecvError::Closed) = kill_switch.try_recv() { - debug!("gRPC producer killed."); - break; - } + let (kill_switch, mut kill_switch_receiver) = oneshot::channel(); - if let Some(d) = &mut delayer { - d.delay().await; - } else { - let mut d = Delayer::default(); - d.delay().await; - delayer = Some(d); - } + let task = async move { + let mut delayer: Option = None; - let mut connection = if let Ok(connection) = (event_consumer_channel)() - .await - .map(proto::event_consumer_service_client::EventConsumerServiceClient::new) - { - delayer = Some(Delayer::default()); - Some(connection) - } else { - None - }; + let mut in_flight: HashMap>> = + HashMap::with_capacity(max_in_flight); - if let Some(connection) = &mut connection { - let origin_id = origin_id.to_string(); - let stream_id = stream_id.to_string(); + 'outer: loop { + if let Err(oneshot::error::TryRecvError::Closed) = kill_switch_receiver.try_recv() { + debug!("gRPC producer killed."); + break; + } - let (event_in, mut event_in_receiver) = mpsc::unbounded_channel::>(); + if let Some(d) = &mut delayer { + d.delay().await; + } else { + let mut d = Delayer::default(); + d.delay().await; + delayer = Some(d); + } + + let mut connection = if let Ok(connection) = (event_consumer_channel)() + .await + .map(proto::event_consumer_service_client::EventConsumerServiceClient::new) + { + delayer = Some(Delayer::default()); + Some(connection) + } else { + None + }; - let request = Request::new(stream! { - yield proto::ConsumeEventIn { - message: Some(proto::consume_event_in::Message::Init( - proto::ConsumerEventInit { - origin_id, - stream_id, - }, - )), - }; + if let Some(connection) = &mut connection { + let origin_id = origin_id.to_string(); + let stream_id = stream_id.to_string(); - let ordinary_events_source = smol_str::SmolStr::from(""); + let (event_in, mut event_in_receiver) = mpsc::unbounded_channel::>(); - while let Some(envelope) = event_in_receiver.recv().await { - match envelope { - Envelope(Envelopes::Event(envelope)) => { - if let Ok(any) = Any::from_msg(&envelope.event) { + let request = Request::new(stream! 
{ + yield proto::ConsumeEventIn { + message: Some(proto::consume_event_in::Message::Init( + proto::ConsumerEventInit { + origin_id, + stream_id, + }, + )), + }; + + let ordinary_events_source = smol_str::SmolStr::from(""); + + while let Some(envelope) = event_in_receiver.recv().await { + match envelope { + Envelope(Envelopes::Event(envelope)) => { + if let Ok(any) = Any::from_msg(&envelope.event) { + let timestamp = prost_types::Timestamp { + seconds: envelope.timestamp.timestamp(), + nanos: envelope.timestamp.timestamp_nanos_opt().unwrap_or_default() as i32 + }; + + yield proto::ConsumeEventIn { + message: Some(proto::consume_event_in::Message::Event( + proto::Event { + persistence_id: envelope.persistence_id.to_string(), + seq_nr: envelope.seq_nr as i64, + slice: envelope.persistence_id.slice() as i32, + offset: Some(proto::Offset { timestamp: Some(timestamp), seen: vec![] }), + payload: Some(any), + source: ordinary_events_source.to_string(), + metadata: None, + tags: vec![] + }, + )), + }; + } + } + Envelope(Envelopes::Filtered(envelope)) => { let timestamp = prost_types::Timestamp { seconds: envelope.timestamp.timestamp(), nanos: envelope.timestamp.timestamp_nanos_opt().unwrap_or_default() as i32 }; yield proto::ConsumeEventIn { - message: Some(proto::consume_event_in::Message::Event( - proto::Event { + message: Some(proto::consume_event_in::Message::FilteredEvent( + proto::FilteredEvent { persistence_id: envelope.persistence_id.to_string(), seq_nr: envelope.seq_nr as i64, slice: envelope.persistence_id.slice() as i32, offset: Some(proto::Offset { timestamp: Some(timestamp), seen: vec![] }), - payload: Some(any), source: ordinary_events_source.to_string(), - metadata: None, - tags: vec![] }, )), }; } - } - Envelope(Envelopes::Filtered(envelope)) => { - let timestamp = prost_types::Timestamp { - seconds: envelope.timestamp.timestamp(), - nanos: envelope.timestamp.timestamp_nanos_opt().unwrap_or_default() as i32 - }; - - yield proto::ConsumeEventIn { - message: Some(proto::consume_event_in::Message::FilteredEvent( - proto::FilteredEvent { - persistence_id: envelope.persistence_id.to_string(), - seq_nr: envelope.seq_nr as i64, - slice: envelope.persistence_id.slice() as i32, - offset: Some(proto::Offset { timestamp: Some(timestamp), seen: vec![] }), - source: ordinary_events_source.to_string(), - }, - )), - }; - } - Envelope(Envelopes::SourceOnly(_)) => { - warn!("Producing a source-only event envelope is not supported. Dropped."); + Envelope(Envelopes::SourceOnly(_)) => { + warn!("Producing a source-only event envelope is not supported. Dropped."); + } } } - } - }); - let result = connection.consume_event(request).await; - - if let Ok(response) = result { - let mut stream_outs = response.into_inner(); - - loop { - tokio::select! { - stream_out = stream_outs.next() => match stream_out { - Some(Ok(proto::ConsumeEventOut { message })) => - match message { - Some(proto::consume_event_out::Message::Start(proto::ConsumerEventStart { filter })) => { - debug!("Starting the protocol"); - let _ = consumer_filters.send( - filter - .into_iter() - .flat_map(|f| f.try_into()) - .collect(), - ); - break; - } - _ => { - warn!("Received a message before starting the protocol - ignoring event"); - } + }); + let result = connection.consume_event(request).await; + + if let Ok(response) = result { + let mut stream_outs = response.into_inner(); + + loop { + tokio::select! 
{ + stream_out = stream_outs.next() => match stream_out { + Some(Ok(proto::ConsumeEventOut { message })) => + match message { + Some(proto::consume_event_out::Message::Start(proto::ConsumerEventStart { filter })) => { + debug!("Starting the protocol"); + let _ = consumer_filters.send( + filter + .into_iter() + .flat_map(|f| f.try_into()) + .collect(), + ); + break; + } + _ => { + warn!("Received a message before starting the protocol - ignoring event"); + } + }, + Some(Err(e)) => { + warn!("Encountered an error while waiting to start the protocol: {e:?}"); + continue 'outer; + } + None => { + continue 'outer; + } }, - Some(Err(e)) => { - warn!("Encountered an error while waiting to start the protocol: {e:?}"); - continue 'outer; - } - None => { - continue 'outer; - } - }, - _ = &mut kill_switch => { - debug!("gRPC producer killed."); - break 'outer + _ = &mut kill_switch_receiver => { + debug!("gRPC producer killed."); + break 'outer + } } } - } - for (_, contexts) in in_flight.iter() { - for (envelope, _) in contexts { - if event_in.send(envelope.clone()).is_err() { - continue 'outer; + for (_, contexts) in in_flight.iter() { + for (envelope, _) in contexts { + if event_in.send(envelope.clone()).is_err() { + continue 'outer; + } } } - } - loop { - tokio::select! { - request = envelopes.recv() => { - if let Some((envelope, reply_to)) = request { - let contexts = in_flight - .entry(envelope.persistence_id().clone()) - .or_default(); - contexts.push_back((envelope.clone(), reply_to)); + loop { + tokio::select! { + request = envelopes_receiver.recv() => { + if let Some((envelope, reply_to)) = request { + let contexts = in_flight + .entry(envelope.persistence_id().clone()) + .or_default(); + contexts.push_back((envelope.clone(), reply_to)); + + if event_in.send(envelope).is_err() { + continue 'outer; + } - if event_in.send(envelope).is_err() { + } else { continue 'outer; } - - } else { - continue 'outer; } - } - stream_out = stream_outs.next() => match stream_out { - Some(Ok(proto::ConsumeEventOut { message })) => match message { - Some(proto::consume_event_out::Message::Start(proto::ConsumerEventStart { .. })) => { - warn!("Received a protocol start when already started - ignoring"); - } - Some(proto::consume_event_out::Message::Ack(proto::ConsumerEventAck { persistence_id, seq_nr })) => { - if let Ok(persistence_id) = persistence_id.parse() { - if let Some(contexts) = in_flight.get_mut(&persistence_id) { - let seq_nr = seq_nr as u64; - while let Some((envelope, reply_to)) = contexts.pop_front() { - if seq_nr == envelope.seq_nr() && reply_to.send(()).is_ok() { - break; + stream_out = stream_outs.next() => match stream_out { + Some(Ok(proto::ConsumeEventOut { message })) => match message { + Some(proto::consume_event_out::Message::Start(proto::ConsumerEventStart { .. 
})) => { + warn!("Received a protocol start when already started - ignoring"); + } + Some(proto::consume_event_out::Message::Ack(proto::ConsumerEventAck { persistence_id, seq_nr })) => { + if let Ok(persistence_id) = persistence_id.parse() { + if let Some(contexts) = in_flight.get_mut(&persistence_id) { + let seq_nr = seq_nr as u64; + while let Some((envelope, reply_to)) = contexts.pop_front() { + if seq_nr == envelope.seq_nr() && reply_to.send(()).is_ok() { + break; + } + } + if contexts.is_empty() { + in_flight.remove(&persistence_id); } } - if contexts.is_empty() { - in_flight.remove(&persistence_id); - } + } else { + warn!("Received an event but could not parse the persistence id - ignoring event"); } - } else { - warn!("Received an event but could not parse the persistence id - ignoring event"); + } + None => { + warn!("Received an empty message while consuming replies - ignoring event"); } } - None => { - warn!("Received an empty message while consuming replies - ignoring event"); + + Some(Err(e)) => { + // Debug level because connection errors are normal. + debug!("Encountered an error while consuming replies: {e:?}"); + continue 'outer; } - } - Some(Err(e)) => { - // Debug level because connection errors are normal. - debug!("Encountered an error while consuming replies: {e:?}"); - continue 'outer; - } + None => { + continue 'outer; + } + }, - None => { - continue 'outer; + _ = &mut kill_switch_receiver => { + debug!("gRPC producer killed."); + break 'outer } - }, - - _ = &mut kill_switch => { - debug!("gRPC producer killed."); - break 'outer } } } } } - } + }; + + (task, envelopes, consumer_filters_receiver, kill_switch) } #[cfg(test)] @@ -501,24 +523,19 @@ mod tests { .unwrap(); }); - let (consumer_filters, mut consumer_filters_receiver) = watch::channel(vec![]); - let (sender, receiver) = mpsc::channel(10); - let (_task_kill_switch, task_kill_switch_receiver) = oneshot::channel(); - tokio::spawn(async move { - let channel = Channel::from_static("http://127.0.0.1:50052"); - let _ = run( - || channel.connect(), - OriginId::from("some-origin-id"), - StreamId::from("some-stream-id"), - consumer_filters, - receiver, - task_kill_switch_receiver, - ) - .await; - }); + let (task, envelopes, mut consumer_filters_receiver, _task_kill_switch) = task( + || { + let channel = Channel::from_static("http://127.0.0.1:50052"); + async move { channel.connect().await } + }, + OriginId::from("some-origin-id"), + StreamId::from("some-stream-id"), + 10, + ); + tokio::spawn(task); let (reply, reply_receiver) = oneshot::channel(); - assert!(sender + assert!(envelopes .send(( Envelope(Envelopes::Event(EventEnvelope { persistence_id: "entity-type|entity-id".parse().unwrap(), diff --git a/akka-projection-rs/benches/benches.rs b/akka-projection-rs/benches/benches.rs index 41e9eaa..b434add 100644 --- a/akka-projection-rs/benches/benches.rs +++ b/akka-projection-rs/benches/benches.rs @@ -10,7 +10,7 @@ use akka_projection_rs::{ use async_stream::stream; use async_trait::async_trait; use criterion::{criterion_group, criterion_main, Criterion}; -use tokio::sync::{mpsc, oneshot, Notify}; +use tokio::sync::{mpsc, Notify}; use tokio_stream::Stream; const NUM_EVENTS: usize = 10_000; @@ -106,24 +106,18 @@ fn criterion_benchmark(c: &mut Criterion) { .unwrap(); let events_processed = Arc::new(Notify::new()); - let (_registration_projection_command, registration_projection_command_receiver) = - oneshot::channel(); - - let task_events_processed = events_processed.clone(); - let _ = rt.spawn(async move { - let 
(offset_store, mut offset_store_receiver) = mpsc::channel(1); - let offset_store_task = - async { while let Some(_) = offset_store_receiver.recv().await {} }; - let projection_task = consumer::run( - offset_store, - registration_projection_command_receiver, - TestSourceProvider, - TestHandler { - events_processed: task_events_processed, - }, - ); - tokio::join!(offset_store_task, projection_task) - }); + let (offset_store, mut offset_store_receiver) = mpsc::channel(1); + let offset_store_task = + async move { while let Some(_) = offset_store_receiver.recv().await {} }; + let (projection_task, _kill_switch) = consumer::task( + offset_store, + TestSourceProvider, + TestHandler { + events_processed: events_processed.clone(), + }, + ); + + let _ = rt.spawn(async move { tokio::join!(offset_store_task, projection_task) }); b.to_async(&rt).iter(|| { let task_events_processed = events_processed.clone(); diff --git a/akka-projection-rs/src/consumer.rs b/akka-projection-rs/src/consumer.rs index 0bec2e1..445e7ce 100644 --- a/akka-projection-rs/src/consumer.rs +++ b/akka-projection-rs/src/consumer.rs @@ -20,15 +20,17 @@ struct StorableState { last_offset: Option, } -/// Provides at-least-once projections with storage for projection offsets, +/// Provides an asynchronous task and a kill switch that can run and stop a projection. +/// +/// An at-least-once projection is returned with storage for projection offsets, /// meaning, for multiple runs of a projection, it is possible for events to repeat /// from previous runs. -pub async fn run( +pub fn task( offset_store: mpsc::Sender, - mut kill_switch: oneshot::Receiver<()>, source_provider: SP, handler: IH, -) where +) -> (impl Future, oneshot::Sender<()>) +where A: Handler + Send, B: PendingHandler + Send, EE: TryFrom, @@ -36,165 +38,172 @@ pub async fn run( IH: Into>, SP: SourceProvider, { - let mut handler = handler.into(); - - let mut always_pending_handler: Pin> + Send>> = - Box::pin(future::pending()); - - 'outer: loop { - let mut source = source_provider - .source(|| async { - let (reply_to, reply_to_receiver) = oneshot::channel(); - offset_store - .send(offset_store::Command::GetLastOffset { reply_to }) - .await - .ok()?; - reply_to_receiver.await.ok()? - }) - .await; - - let mut always_pending_source: Pin + Send>> = - Box::pin(stream::pending()); + let (kill_switch, mut kill_switch_receiver) = oneshot::channel(); + + let task = async move { + let mut handler = handler.into(); + + let mut always_pending_handler: Pin< + Box> + Send>, + > = Box::pin(future::pending()); + + 'outer: loop { + let mut source = source_provider + .source(|| async { + let (reply_to, reply_to_receiver) = oneshot::channel(); + offset_store + .send(offset_store::Command::GetLastOffset { reply_to }) + .await + .ok()?; + reply_to_receiver.await.ok()? + }) + .await; - let mut active_source = &mut source; + let mut always_pending_source: Pin + Send>> = + Box::pin(stream::pending()); - let mut handler_futures = VecDeque::with_capacity(B::MAX_PENDING); + let mut active_source = &mut source; - loop { - tokio::select! { - envelope = active_source.next() => { - if let Some(envelope) = envelope { - let persistence_id = envelope.persistence_id().clone(); - let offset = envelope.offset(); + let mut handler_futures = VecDeque::with_capacity(B::MAX_PENDING); - // Validate timestamp offsets if we have one. - let envelope = if matches!(offset, Offset::Timestamp(_)) { - // Process the sequence number. If it isn't what we expect then we go round again. + loop { + tokio::select! 
{ + envelope = active_source.next() => { + if let Some(envelope) = envelope { + let persistence_id = envelope.persistence_id().clone(); + let offset = envelope.offset(); - let seq_nr = envelope.seq_nr(); + // Validate timestamp offsets if we have one. + let envelope = if matches!(offset, Offset::Timestamp(_)) { + // Process the sequence number. If it isn't what we expect then we go round again. - let (reply_to, reply_to_receiver) = oneshot::channel(); - if offset_store - .send(offset_store::Command::GetOffset { persistence_id: persistence_id.clone(), reply_to }) - .await - .is_err() - { - warn!("Cannot send to the offset store: {}. Aborting stream.", persistence_id); - break; - } + let seq_nr = envelope.seq_nr(); - let next_seq_nr = if let Ok(offset) = reply_to_receiver.await { - if let Some(Offset::Timestamp(TimestampOffset { seq_nr, .. })) = offset { - seq_nr.wrapping_add(1) - } else { - 1 + let (reply_to, reply_to_receiver) = oneshot::channel(); + if offset_store + .send(offset_store::Command::GetOffset { persistence_id: persistence_id.clone(), reply_to }) + .await + .is_err() + { + warn!("Cannot send to the offset store: {}. Aborting stream.", persistence_id); + break; } - } else { - warn!("Cannot receive from the offset store: {}. Aborting stream.", persistence_id); - break - }; - let source = envelope.source(); + let next_seq_nr = if let Ok(offset) = reply_to_receiver.await { + if let Some(Offset::Timestamp(TimestampOffset { seq_nr, .. })) = offset { + seq_nr.wrapping_add(1) + } else { + 1 + } + } else { + warn!("Cannot receive from the offset store: {}. Aborting stream.", persistence_id); + break + }; - if seq_nr > next_seq_nr && envelope.source() == Source::Backtrack { - // This shouldn't happen, if so then abort. - warn!("Back track received for a future event: {}. Aborting stream.", persistence_id); - break; - } else if seq_nr != next_seq_nr { - // Duplicate or gap - continue; - } + let source = envelope.source(); - // If the sequence number is what we expect and the producer is backtracking, then - // request its payload. If we can't get its payload then we abort as it is an error. + if seq_nr > next_seq_nr && envelope.source() == Source::Backtrack { + // This shouldn't happen, if so then abort. + warn!("Back track received for a future event: {}. Aborting stream.", persistence_id); + break; + } else if seq_nr != next_seq_nr { + // Duplicate or gap + continue; + } - let resolved_envelope = if source == Source::Backtrack { - if let Some(event) = source_provider.load_envelope(persistence_id.clone(), seq_nr) - .await - { - Some(event) + // If the sequence number is what we expect and the producer is backtracking, then + // request its payload. If we can't get its payload then we abort as it is an error. + + let resolved_envelope = if source == Source::Backtrack { + if let Some(event) = source_provider.load_envelope(persistence_id.clone(), seq_nr) + .await + { + Some(event) + } else { + warn!("Cannot obtain an backtrack envelope: {}. Aborting stream.", persistence_id); + None + } } else { - warn!("Cannot obtain an backtrack envelope: {}. Aborting stream.", persistence_id); - None - } + Some(envelope) + }; + + let Some(envelope) = resolved_envelope else { break; }; + envelope } else { - Some(envelope) + envelope }; - let Some(envelope) = resolved_envelope else { break; }; - envelope - } else { - envelope - }; + // We now have an event correctly sequenced. Process it. - // We now have an event correctly sequenced. Process it. 
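// As a concrete illustration of the sequencing rules above (hypothetical numbers): with a
// stored offset of seq_nr 41, next_seq_nr is 42. An incoming envelope with seq_nr 42 is
// accepted; 41 (a duplicate) or 44 (a gap) from the regular stream is skipped; a backtrack
// envelope claiming a future seq_nr such as 45 aborts the stream; and an accepted envelope
// whose source is Backtrack has its payload re-fetched via load_envelope before being
// processed below.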
- - match &mut handler { - Handlers::Ready(handler, _) => { - if let Ok(event_envelope) = envelope.try_into() { - if handler.process(event_envelope).await.is_err() { + match &mut handler { + Handlers::Ready(handler, _) => { + if let Ok(event_envelope) = envelope.try_into() { + if handler.process(event_envelope).await.is_err() { + break; + } + } + if offset_store + .send(offset_store::Command::SaveOffset { persistence_id, offset }) + .await + .is_err() + { break; } } - if offset_store - .send(offset_store::Command::SaveOffset { persistence_id, offset }) - .await - .is_err() - { - break; - } - } - Handlers::Pending(handler, _) => { - let pending = if let Ok(event_envelope) = envelope.try_into() { - let Ok(pending) = handler.process_pending(event_envelope).await else { - break; + Handlers::Pending(handler, _) => { + let pending = if let Ok(event_envelope) = envelope.try_into() { + let Ok(pending) = handler.process_pending(event_envelope).await else { + break; + }; + pending + } else { + Box::pin(future::ready(Ok(()))) }; - pending - } else { - Box::pin(future::ready(Ok(()))) - }; - handler_futures.push_back((pending, persistence_id, offset)); - // If we've reached the limit on the pending futures in-flight - // then back off sourcing more. - if handler_futures.len() == B::MAX_PENDING { - active_source = &mut always_pending_source; - } - } + handler_futures.push_back((pending, persistence_id, offset)); + // If we've reached the limit on the pending futures in-flight + // then back off sourcing more. + if handler_futures.len() == B::MAX_PENDING { + active_source = &mut always_pending_source; + } + } + } + } else { + break; } - } else { - break; } - } - pending = handler_futures.get_mut(0).map_or_else(|| &mut always_pending_handler, |(f, _, _)| f) => { - // A pending future will never complete so this MUST mean that we have a element in our queue. - let (_, persistence_id, offset) = handler_futures.pop_front().unwrap(); - - // We've freed up a slot on the pending futures in-flight, so allow more events to be received. - active_source = &mut source; - - // If all is well with our pending future so we can finally cause the offset to be persisted. - if pending.is_err() - || offset_store - .send(offset_store::Command::SaveOffset { persistence_id, offset }) - .await - .is_err() - { - break; + pending = handler_futures.get_mut(0).map_or_else(|| &mut always_pending_handler, |(f, _, _)| f) => { + // A pending future will never complete so this MUST mean that we have a element in our queue. + let (_, persistence_id, offset) = handler_futures.pop_front().unwrap(); + + // We've freed up a slot on the pending futures in-flight, so allow more events to be received. + active_source = &mut source; + + // If all is well with our pending future so we can finally cause the offset to be persisted. + if pending.is_err() + || offset_store + .send(offset_store::Command::SaveOffset { persistence_id, offset }) + .await + .is_err() + { + break; + } } - } - _ = &mut kill_switch => { - debug!("storage killed."); - break 'outer; - } + _ = &mut kill_switch_receiver => { + debug!("storage killed."); + break 'outer; + } - else => { - break 'outer; + else => { + break 'outer; + } } } } - } + }; + + (task, kill_switch) } #[cfg(test)] @@ -383,29 +392,23 @@ mod tests { event_value: String, handler: IH, ) where - A: Handler + Send, - B: PendingHandler + Send, + A: Handler + Send + 'static, + B: PendingHandler + Send + 'static, IH: Into> + Send + 'static, { // Process an event. 
- let (_registration_projection_command, registration_projection_command_receiver) = - oneshot::channel(); - let (offset_store, mut offset_store_receiver) = mpsc::channel(1); let task_persistence_id = persistence_id.clone(); - tokio::spawn(async move { - run( - offset_store, - registration_projection_command_receiver, - MySourceProvider { - persistence_id: task_persistence_id.clone(), - event_value: event_value.clone(), - }, - handler, - ) - .await - }); + let (projection_task, _kill_switch) = task( + offset_store, + MySourceProvider { + persistence_id: task_persistence_id.clone(), + event_value: event_value.clone(), + }, + handler, + ); + tokio::spawn(projection_task); if let Some(offset_store::Command::GetLastOffset { reply_to }) = offset_store_receiver.recv().await
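Taken together, a sketch of how the reworked pieces compose at a call site (mirroring the updated bench above; `MySourceProvider` and `MyHandler` stand in for application-provided types, and the offset store below simply discards offsets; in practice the reworked `offset_store::task` from akka-projection-rs-commitlog would be wired in instead):

    // A trivial stand-in offset store that drains and ignores offset commands.
    let (offset_store, mut offset_store_receiver) = mpsc::channel(1);
    let offset_store_task =
        async move { while offset_store_receiver.recv().await.is_some() {} };

    // The projection task and its kill switch are now returned rather than run directly.
    let (projection_task, kill_switch) =
        consumer::task(offset_store, MySourceProvider, MyHandler);

    tokio::spawn(async move {
        tokio::join!(offset_store_task, projection_task);
    });

    // Later, stop the projection.
    let _ = kill_switch.send(());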