Skip to content

Commit

Permalink
Merge pull request #165 from azriel91/feature/164/add-item-stream-out…
Browse files Browse the repository at this point in the history
…come
  • Loading branch information
azriel91 authored Dec 28, 2023
2 parents bebd99c + 2094d93 commit 1a1653f
Show file tree
Hide file tree
Showing 18 changed files with 528 additions and 86 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
* Change `CmdOutcome` to be an `enum` indicating whether it is completed, interrupted, or erroneous. ([#141], [#163])
* Add `CmdBlock` trait to encompass one function for all items. ([#141], [#163])
* Add interruptibility support using [`interruptible`] through `CmdCtxBuilder::with_interruptibility`. ([#141], [#163])
* Add `ItemStreamOutcome` to track which `Item`s are processed or not processed. ([#164], [#165])


[`interruptible`]: https://github.com/azriel91/interruptible
[#141]: https://github.com/azriel91/peace/issues/141
[#163]: https://github.com/azriel91/peace/pull/163
[#164]: https://github.com/azriel91/peace/issues/164
[#165]: https://github.com/azriel91/peace/pull/165


## 0.0.11 (2023-06-27)
Expand Down
61 changes: 30 additions & 31 deletions crate/cmd_model/src/cmd_outcome.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use fn_graph::StreamOutcome;
use futures::Future;
use indexmap::IndexMap;
use peace_cfg::ItemId;

use crate::CmdBlockDesc;
use crate::{CmdBlockDesc, ItemStreamOutcome};

/// Outcome of a [`CmdExecution`].
///
Expand All @@ -26,7 +25,7 @@ pub enum CmdOutcome<T, E> {
/// Execution ended due to an interruption during command block execution.
BlockInterrupted {
/// The stream outcome of the interrupted command block.
stream_outcome: StreamOutcome<T>,
item_stream_outcome: ItemStreamOutcome<T>,
/// Descriptors of the `CmdBlock`s that were processed.
///
/// This does not include the `CmdBlock` that was interrupted.
Expand All @@ -48,10 +47,10 @@ pub enum CmdOutcome<T, E> {
/// Execution ended due to one or more item errors.
///
/// It is also possible for the stream to be interrupted when an error
/// occurs, so the value is wrapped in a `StreamOutcome`.
/// occurs, so the value is wrapped in a `ItemStreamOutcome`.
ItemError {
/// The outcome value.
stream_outcome: StreamOutcome<T>,
item_stream_outcome: ItemStreamOutcome<T>,
/// Descriptors of the `CmdBlock`s that were processed.
///
/// This does not include the `CmdBlock` that erred.
Expand All @@ -73,21 +72,21 @@ impl<T, E> CmdOutcome<T, E> {
cmd_blocks_processed: _,
} => Some(value),
CmdOutcome::BlockInterrupted {
stream_outcome,
item_stream_outcome,
cmd_blocks_processed: _,
cmd_blocks_not_processed: _,
} => Some(stream_outcome.value()),
} => Some(item_stream_outcome.value()),
CmdOutcome::ExecutionInterrupted {
value,
cmd_blocks_processed: _,
cmd_blocks_not_processed: _,
} => value.as_ref(),
CmdOutcome::ItemError {
stream_outcome,
item_stream_outcome,
cmd_blocks_processed: _,
cmd_blocks_not_processed: _,
errors: _,
} => Some(stream_outcome.value()),
} => Some(item_stream_outcome.value()),
}
}

Expand Down Expand Up @@ -126,13 +125,13 @@ impl<T, E> CmdOutcome<T, E> {
}
}
Self::BlockInterrupted {
stream_outcome,
item_stream_outcome,
cmd_blocks_processed,
cmd_blocks_not_processed,
} => {
let stream_outcome = stream_outcome.map(f);
let item_stream_outcome = item_stream_outcome.map(f);
CmdOutcome::BlockInterrupted {
stream_outcome,
item_stream_outcome,
cmd_blocks_processed,
cmd_blocks_not_processed,
}
Expand All @@ -150,14 +149,14 @@ impl<T, E> CmdOutcome<T, E> {
}
}
Self::ItemError {
stream_outcome,
item_stream_outcome,
cmd_blocks_processed,
cmd_blocks_not_processed,
errors,
} => {
let stream_outcome = stream_outcome.map(f);
let item_stream_outcome = item_stream_outcome.map(f);
CmdOutcome::ItemError {
stream_outcome,
item_stream_outcome,
cmd_blocks_processed,
cmd_blocks_not_processed,
errors,
Expand Down Expand Up @@ -185,15 +184,15 @@ impl<T, E> CmdOutcome<T, E> {
}
}
Self::BlockInterrupted {
stream_outcome,
item_stream_outcome,
cmd_blocks_processed,
cmd_blocks_not_processed,
} => {
let (stream_outcome, value) = stream_outcome.replace(());
let (item_stream_outcome, value) = item_stream_outcome.replace(());
let value = f(value).await;
let (stream_outcome, ()) = stream_outcome.replace(value);
let (item_stream_outcome, ()) = item_stream_outcome.replace(value);
CmdOutcome::BlockInterrupted {
stream_outcome,
item_stream_outcome,
cmd_blocks_processed,
cmd_blocks_not_processed,
}
Expand All @@ -214,16 +213,16 @@ impl<T, E> CmdOutcome<T, E> {
}
}
Self::ItemError {
stream_outcome,
item_stream_outcome,
cmd_blocks_processed,
cmd_blocks_not_processed,
errors,
} => {
let (stream_outcome, value) = stream_outcome.replace(());
let (item_stream_outcome, value) = item_stream_outcome.replace(());
let value = f(value).await;
let (stream_outcome, ()) = stream_outcome.replace(value);
let (item_stream_outcome, ()) = item_stream_outcome.replace(value);
CmdOutcome::ItemError {
stream_outcome,
item_stream_outcome,
cmd_blocks_processed,
cmd_blocks_not_processed,
errors,
Expand All @@ -249,16 +248,16 @@ impl<T, E> CmdOutcome<Result<T, E>, E> {
Err(e) => Err(e),
},
Self::BlockInterrupted {
stream_outcome,
item_stream_outcome,
cmd_blocks_processed,
cmd_blocks_not_processed,
} => {
let (stream_outcome, value) = stream_outcome.replace(());
let (item_stream_outcome, value) = item_stream_outcome.replace(());
match value {
Ok(value) => {
let (stream_outcome, ()) = stream_outcome.replace(value);
let (item_stream_outcome, ()) = item_stream_outcome.replace(value);
Ok(CmdOutcome::BlockInterrupted {
stream_outcome,
item_stream_outcome,
cmd_blocks_processed,
cmd_blocks_not_processed,
})
Expand All @@ -279,17 +278,17 @@ impl<T, E> CmdOutcome<Result<T, E>, E> {
Err(e) => Err(e),
},
Self::ItemError {
stream_outcome,
item_stream_outcome,
cmd_blocks_processed,
cmd_blocks_not_processed,
errors,
} => {
let (stream_outcome, value) = stream_outcome.replace(());
let (item_stream_outcome, value) = item_stream_outcome.replace(());
match value {
Ok(value) => {
let (stream_outcome, ()) = stream_outcome.replace(value);
let (item_stream_outcome, ()) = item_stream_outcome.replace(value);
Ok(CmdOutcome::ItemError {
stream_outcome,
item_stream_outcome,
cmd_blocks_processed,
cmd_blocks_not_processed,
errors,
Expand Down
132 changes: 132 additions & 0 deletions crate/cmd_model/src/item_stream_outcome.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
use fn_graph::StreamOutcomeState;
use peace_cfg::ItemId;

/// How a `Flow` stream operation ended and IDs that were processed.
///
/// Currently this is constructed by `ItemStreamOutcomeMapper`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ItemStreamOutcome<T> {
/// The value of the outcome.
pub value: T,
/// How a `Flow` stream operation ended.
pub state: StreamOutcomeState,
/// IDs of the items that were processed.
pub item_ids_processed: Vec<ItemId>,
/// IDs of the items that were not processed.
pub item_ids_not_processed: Vec<ItemId>,
}

impl<T> ItemStreamOutcome<T> {
/// Returns an `ItemStreamOutcome` that is `Finished<T>`.
pub fn finished_with(value: T, item_ids_processed: Vec<ItemId>) -> Self {
Self {
value,
state: StreamOutcomeState::Finished,
item_ids_processed,
item_ids_not_processed: Vec::new(),
}
}

/// Maps this outcome's value to another.
pub fn map<TNew>(self, f: impl FnOnce(T) -> TNew) -> ItemStreamOutcome<TNew> {
let ItemStreamOutcome {
value,
state,
item_ids_processed,
item_ids_not_processed,
} = self;

let value = f(value);

ItemStreamOutcome {
value,
state,
item_ids_processed,
item_ids_not_processed,
}
}

/// Replaces the value from this outcome with another.
pub fn replace<TNew>(self, value_new: TNew) -> (ItemStreamOutcome<TNew>, T) {
let ItemStreamOutcome {
value: value_existing,
state,
item_ids_processed,
item_ids_not_processed,
} = self;

(
ItemStreamOutcome {
value: value_new,
state,
item_ids_processed,
item_ids_not_processed,
},
value_existing,
)
}

/// Replaces the value from this outcome with another, taking the current
/// value as a parameter.
pub fn replace_with<TNew, U>(
self,
f: impl FnOnce(T) -> (TNew, U),
) -> (ItemStreamOutcome<TNew>, U) {
let ItemStreamOutcome {
value,
state,
item_ids_processed,
item_ids_not_processed,
} = self;

let (value, extracted) = f(value);

(
ItemStreamOutcome {
value,
state,
item_ids_processed,
item_ids_not_processed,
},
extracted,
)
}

pub fn into_value(self) -> T {
self.value
}

pub fn value(&self) -> &T {
&self.value
}

pub fn value_mut(&mut self) -> &mut T {
&mut self.value
}

pub fn state(&self) -> StreamOutcomeState {
self.state
}

pub fn item_ids_processed(&self) -> &[ItemId] {
self.item_ids_processed.as_ref()
}

pub fn item_ids_not_processed(&self) -> &[ItemId] {
self.item_ids_not_processed.as_ref()
}
}

impl<T> Default for ItemStreamOutcome<T>
where
T: Default,
{
fn default() -> Self {
Self {
value: T::default(),
state: StreamOutcomeState::NotStarted,
item_ids_processed: Vec::new(),
item_ids_not_processed: Vec::new(),
}
}
}
3 changes: 2 additions & 1 deletion crate/cmd_model/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ pub use indexmap;
pub use crate::{
cmd_block_desc::CmdBlockDesc, cmd_block_outcome::CmdBlockOutcome,
cmd_execution_error::CmdExecutionError, cmd_outcome::CmdOutcome,
stream_outcome_and_errors::StreamOutcomeAndErrors,
item_stream_outcome::ItemStreamOutcome, stream_outcome_and_errors::StreamOutcomeAndErrors,
value_and_stream_outcome::ValueAndStreamOutcome,
};

mod cmd_block_desc;
mod cmd_block_outcome;
mod cmd_execution_error;
mod cmd_outcome;
mod item_stream_outcome;
mod stream_outcome_and_errors;
mod value_and_stream_outcome;
Loading

0 comments on commit 1a1653f

Please sign in to comment.