Skip to content

Commit

Permalink
Improve effects (#114)
Browse files Browse the repository at this point in the history
* Provide a test for emitting and reply

A common combo of effects

* "and" should always call its RHS

The RHS will determine what to do with the result.

* Simplify with `then_reply`

`.and_then_reply` has been simplified with a `then_reply` which will only be called when the previous result was successful, and only the state is available to make a decision with. The closure is deliberately not async as it commonly isn't require for this use case.

More complex reply scenarios can still be achieved by using the all-flexible `.and_then` combinator.

* Simplify the and_then_emit_event combinator

Done now in a similar fashion to the `then_reply` combinator.

* Doco
  • Loading branch information
huntc authored Nov 9, 2023
1 parent 1810a90 commit 8ac4a5d
Showing 1 changed file with 215 additions and 39 deletions.
254 changes: 215 additions & 39 deletions akka-persistence-rs/src/effect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@
//! to be applied (run) before the next command for an entity id is processed.
//!
//! Convience methods are providing for commonly chained operations, and take
//! the form of `and_then` as the prefix. By Rust convention, `and_then`
//! the form of `and_then` and `then` as the prefix. By Rust convention, `and_then`
//! provides the result of the previous operation and expects a result provided
//! given some closure.
//! given some closure. Also by convention, `then` is applied only when the previous operation
//! completed successfully.
//!
//! In the case where there is no convenience method, a generalized `and`
//! operation can be used to chain any effect found here, or a customized
//! effect.

use async_trait::async_trait;
use chrono::Utc;
use std::future::{self, Ready};
use std::result::Result as StdResult;
use std::{future::Future, io, marker::PhantomData};
use tokio::sync::oneshot;
Expand Down Expand Up @@ -87,13 +89,9 @@ where
prev_result,
)
.await;
if r.is_ok() {
self._r
.process(behavior, handler, entities, entity_id, last_seq_nr, r)
.await
} else {
r
}
self._r
.process(behavior, handler, entities, entity_id, last_seq_nr, r)
.await
}
}

Expand Down Expand Up @@ -150,13 +148,37 @@ where
/// An effect to emit an event. The latest state given any previous effect
/// having emitted an event, or else the state at the outset of the effects
/// being applied, is also available.
fn and_then_emit_event<F, R>(self, f: F) -> And<B, Self, ThenEmitEvent<B, F, R>>
///
/// Only applied when the previous result succeeded.
#[allow(clippy::type_complexity)]
fn then_emit_event<F>(
self,
f: F,
) -> And<
B,
Self,
ThenEmitEvent<
B,
Box<
dyn FnOnce(
&B,
Option<&B::State>,
Result,
) -> Ready<StdResult<Option<B::Event>, Error>>
+ Send,
>,
Ready<StdResult<Option<B::Event>, Error>>,
>,
>
where
Self: Sized,
B::State: Send + Sync,
F: FnOnce(&B, Option<&B::State>, Result) -> R + Send,
R: Future<Output = StdResult<Option<B::Event>, Error>> + Send,
F: FnOnce(Option<&B::State>) -> Option<B::Event> + Send + 'static,
{
let f = Box::new(|_b: &B, s: Option<&B::State>, r: Result| {
let r = if let Err(e) = r { Err(e) } else { Ok(f(s)) };
future::ready(r)
});
And {
_l: self,
_r: ThenEmitEvent {
Expand All @@ -168,38 +190,35 @@ where
}
}

/// An effect to emit a deletion event. The latest state given any previous effect
/// having emitted an event, or else the state at the outset of the effects
/// being applied, is also available.
fn and_then_emit_deletion_event<F, R>(self, f: F) -> And<B, Self, ThenEmitEvent<B, F, R>>
where
Self: Sized,
B::State: Send + Sync,
F: FnOnce(&B, Option<&B::State>, Result) -> R + Send,
R: Future<Output = StdResult<Option<B::Event>, Error>> + Send,
{
And {
_l: self,
_r: ThenEmitEvent {
deletion_event: true,
f: Some(f),
phantom: PhantomData,
},
phantom: PhantomData,
}
}

/// An effect to reply an envelope. The latest state given any previous effect
/// having emitted an event, or else the state at the outset of the effects
/// being applied, is also available.
fn and_then_reply<F, R, T>(self, f: F) -> And<B, Self, ThenReply<B, F, R, T>>
///
/// Only applied when the previous result succeeded.
#[allow(clippy::type_complexity)]
fn then_reply<F, T>(
self,
f: F,
) -> And<
B,
Self,
ThenReply<
B,
Box<dyn FnOnce(&B, Option<&B::State>, Result) -> Ready<ReplyResult<T>> + Send>,
Ready<ReplyResult<T>>,
T,
>,
>
where
Self: Sized,
B::State: Send + Sync,
F: FnOnce(&B, Option<&B::State>, Result) -> R + Send,
R: Future<Output = StdResult<Option<(oneshot::Sender<T>, T)>, Error>> + Send,
F: FnOnce(Option<&B::State>) -> Option<ReplyTo<T>> + Send + 'static,
T: Send,
{
let f = Box::new(|_b: &B, s: Option<&B::State>, r: Result| {
let r = if let Err(e) = r { Err(e) } else { Ok(f(s)) };
future::ready(r)
});
And {
_l: self,
_r: ThenReply {
Expand Down Expand Up @@ -303,9 +322,16 @@ where
}
}

/// The reply-to [oneshot::Sender] and the value to send as a result of some
/// tuple.
pub type ReplyTo<T> = (oneshot::Sender<T>, T);

/// A result type for [ReplyTo] which is used to reply if wrapped with [Some].
pub type ReplyResult<T> = StdResult<Option<ReplyTo<T>>, Error>;

/// The return type of [reply].
pub struct Reply<B, T> {
replier: Option<(oneshot::Sender<T>, T)>,
replier: Option<ReplyTo<T>>,
phantom: PhantomData<B>,
}

Expand Down Expand Up @@ -481,7 +507,7 @@ where
B: EventSourcedBehavior + Send + Sync + 'static,
B::State: Send + Sync,
F: FnOnce(&B, Option<&B::State>, Result) -> R + Send,
R: Future<Output = StdResult<Option<(oneshot::Sender<T>, T)>, Error>> + Send,
R: Future<Output = ReplyResult<T>> + Send,
T: Send,
{
async fn process(
Expand Down Expand Up @@ -518,7 +544,7 @@ where
B: EventSourcedBehavior + Send + Sync + 'static,
B::State: Send + Sync,
F: FnOnce(&B, Option<&B::State>, Result) -> R + Send,
R: Future<Output = StdResult<Option<(oneshot::Sender<T>, T)>, Error>> + Send,
R: Future<Output = ReplyResult<T>> + Send,
T: Send,
{
}
Expand Down Expand Up @@ -552,3 +578,153 @@ pub fn unhandled<B>() -> Box<Unhandled<B>> {
phantom: PhantomData,
})
}

#[cfg(test)]
mod tests {

use super::*;

use crate::entity::Context;
use test_log::test;

#[derive(Default)]
struct TestState;

struct TestCommand;

#[derive(Copy, Clone, Debug, PartialEq)]
struct TestEvent;

struct TestBehavior;

impl EventSourcedBehavior for TestBehavior {
type State = TestState;
type Command = TestCommand;
type Event = TestEvent;

fn for_command(
_context: &Context,
_state: &Self::State,
_command: Self::Command,
) -> Box<dyn Effect<Self>> {
todo!()
}

fn on_event(_context: &Context, _state: &mut Self::State, _event: Self::Event) {
todo!()
}
}

struct TestHandler {
expected: EventEnvelope<TestEvent>,
}

#[async_trait]
impl Handler<TestEvent> for TestHandler {
async fn process(
&mut self,
envelope: EventEnvelope<TestEvent>,
) -> io::Result<EventEnvelope<TestEvent>> {
assert_eq!(envelope.deletion_event, self.expected.deletion_event);
assert_eq!(envelope.entity_id, self.expected.entity_id);
assert_eq!(envelope.seq_nr, self.expected.seq_nr);
assert_eq!(envelope.event, self.expected.event);
Ok(envelope)
}
}

struct TestEntityOps {
expected_get_entity_id: EntityId,
get_result: TestState,
expected_update: EventEnvelope<TestEvent>,
}

impl EntityOps<TestBehavior> for TestEntityOps {
fn get(&mut self, entity_id: &EntityId) -> Option<&TestState> {
assert_eq!(entity_id, &self.expected_get_entity_id);
Some(&self.get_result)
}

fn update(&mut self, envelope: EventEnvelope<TestEvent>) -> u64 {
assert_eq!(envelope.deletion_event, self.expected_update.deletion_event);
assert_eq!(envelope.entity_id, self.expected_update.entity_id);
assert_eq!(envelope.seq_nr, self.expected_update.seq_nr);
assert_eq!(envelope.event, self.expected_update.event);
envelope.seq_nr
}
}

#[test(tokio::test)]
async fn test_emit_then_reply() {
let entity_id = EntityId::from("entity-id");
let expected = EventEnvelope {
deletion_event: false,
entity_id: entity_id.clone(),
seq_nr: 1,
event: TestEvent,
timestamp: Utc::now(),
};
let mut handler = TestHandler {
expected: expected.clone(),
};
let mut entity_ops = TestEntityOps {
expected_get_entity_id: entity_id.clone(),
get_result: TestState,
expected_update: expected,
};
let (reply_to, reply_to_receiver) = oneshot::channel();
let reply_value = 1;

assert!(emit_event(TestEvent)
.then_reply(move |_s| Some((reply_to, reply_value)))
.process(
&TestBehavior,
&mut handler,
&mut entity_ops,
&entity_id,
&mut 0,
Ok(()),
)
.await
.is_ok());

assert_eq!(reply_to_receiver.await, Ok(reply_value));
}

#[test(tokio::test)]
async fn test_reply_then_emit() {
let entity_id = EntityId::from("entity-id");
let expected = EventEnvelope {
deletion_event: false,
entity_id: entity_id.clone(),
seq_nr: 1,
event: TestEvent,
timestamp: Utc::now(),
};
let mut handler = TestHandler {
expected: expected.clone(),
};
let mut entity_ops = TestEntityOps {
expected_get_entity_id: entity_id.clone(),
get_result: TestState,
expected_update: expected,
};
let (reply_to, reply_to_receiver) = oneshot::channel();
let reply_value = 1;

assert!(reply(reply_to, reply_value)
.then_emit_event(|_s| Some(TestEvent))
.process(
&TestBehavior,
&mut handler,
&mut entity_ops,
&entity_id,
&mut 0,
Ok(()),
)
.await
.is_ok());

assert_eq!(reply_to_receiver.await, Ok(reply_value));
}
}

0 comments on commit 8ac4a5d

Please sign in to comment.