From db06f9fc4f256793583a9b4e1eb6cd914ef64e92 Mon Sep 17 00:00:00 2001 From: Azriel Hoh Date: Fri, 29 Dec 2023 10:24:55 +1300 Subject: [PATCH 1/4] Store `ItemStreamOutcome` instead of `StreamOutcome` in `CmdOutcome`. --- crate/cmd_model/src/cmd_outcome.rs | 61 ++++---- crate/cmd_model/src/item_stream_outcome.rs | 132 ++++++++++++++++++ crate/cmd_model/src/lib.rs | 3 +- crate/cmd_rt/src/cmd_execution.rs | 14 +- .../cmd_rt/src/item_stream_outcome_mapper.rs | 60 ++++++++ crate/cmd_rt/src/lib.rs | 2 + examples/envman/src/cmds/env_clean_cmd.rs | 2 +- examples/envman/src/cmds/env_deploy_cmd.rs | 2 +- examples/envman/src/cmds/env_discover_cmd.rs | 2 +- examples/envman/src/cmds/profile_init_cmd.rs | 2 +- examples/envman/src/output.rs | 21 ++- workspace_tests/src/cmd_model.rs | 1 + workspace_tests/src/cmd_model/cmd_outcome.rs | 8 +- .../src/cmd_model/item_stream_outcome.rs | 31 ++++ workspace_tests/src/rt/cmds/clean_cmd.rs | 8 +- workspace_tests/src/rt/cmds/ensure_cmd.rs | 24 ++-- .../src/rt/cmds/states_discover_cmd.rs | 20 +-- 17 files changed, 310 insertions(+), 83 deletions(-) create mode 100644 crate/cmd_model/src/item_stream_outcome.rs create mode 100644 crate/cmd_rt/src/item_stream_outcome_mapper.rs create mode 100644 workspace_tests/src/cmd_model/item_stream_outcome.rs diff --git a/crate/cmd_model/src/cmd_outcome.rs b/crate/cmd_model/src/cmd_outcome.rs index e23038edc..643e7353d 100644 --- a/crate/cmd_model/src/cmd_outcome.rs +++ b/crate/cmd_model/src/cmd_outcome.rs @@ -1,9 +1,8 @@ -use fn_graph::StreamOutcome; use futures::Future; use indexmap::IndexMap; use peace_cfg::ItemId; -use crate::CmdBlockDesc; +use crate::{CmdBlockDesc, ItemStreamOutcome}; /// Outcome of a [`CmdExecution`]. /// @@ -26,7 +25,7 @@ pub enum CmdOutcome { /// Execution ended due to an interruption during command block execution. BlockInterrupted { /// The stream outcome of the interrupted command block. - stream_outcome: StreamOutcome, + item_stream_outcome: ItemStreamOutcome, /// Descriptors of the `CmdBlock`s that were processed. /// /// This does not include the `CmdBlock` that was interrupted. @@ -48,10 +47,10 @@ pub enum CmdOutcome { /// Execution ended due to one or more item errors. /// /// It is also possible for the stream to be interrupted when an error - /// occurs, so the value is wrapped in a `StreamOutcome`. + /// occurs, so the value is wrapped in a `ItemStreamOutcome`. ItemError { /// The outcome value. - stream_outcome: StreamOutcome, + item_stream_outcome: ItemStreamOutcome, /// Descriptors of the `CmdBlock`s that were processed. /// /// This does not include the `CmdBlock` that erred. @@ -73,21 +72,21 @@ impl CmdOutcome { cmd_blocks_processed: _, } => Some(value), CmdOutcome::BlockInterrupted { - stream_outcome, + item_stream_outcome, cmd_blocks_processed: _, cmd_blocks_not_processed: _, - } => Some(stream_outcome.value()), + } => Some(item_stream_outcome.value()), CmdOutcome::ExecutionInterrupted { value, cmd_blocks_processed: _, cmd_blocks_not_processed: _, } => value.as_ref(), CmdOutcome::ItemError { - stream_outcome, + item_stream_outcome, cmd_blocks_processed: _, cmd_blocks_not_processed: _, errors: _, - } => Some(stream_outcome.value()), + } => Some(item_stream_outcome.value()), } } @@ -126,13 +125,13 @@ impl CmdOutcome { } } Self::BlockInterrupted { - stream_outcome, + item_stream_outcome, cmd_blocks_processed, cmd_blocks_not_processed, } => { - let stream_outcome = stream_outcome.map(f); + let item_stream_outcome = item_stream_outcome.map(f); CmdOutcome::BlockInterrupted { - stream_outcome, + item_stream_outcome, cmd_blocks_processed, cmd_blocks_not_processed, } @@ -150,14 +149,14 @@ impl CmdOutcome { } } Self::ItemError { - stream_outcome, + item_stream_outcome, cmd_blocks_processed, cmd_blocks_not_processed, errors, } => { - let stream_outcome = stream_outcome.map(f); + let item_stream_outcome = item_stream_outcome.map(f); CmdOutcome::ItemError { - stream_outcome, + item_stream_outcome, cmd_blocks_processed, cmd_blocks_not_processed, errors, @@ -185,15 +184,15 @@ impl CmdOutcome { } } Self::BlockInterrupted { - stream_outcome, + item_stream_outcome, cmd_blocks_processed, cmd_blocks_not_processed, } => { - let (stream_outcome, value) = stream_outcome.replace(()); + let (item_stream_outcome, value) = item_stream_outcome.replace(()); let value = f(value).await; - let (stream_outcome, ()) = stream_outcome.replace(value); + let (item_stream_outcome, ()) = item_stream_outcome.replace(value); CmdOutcome::BlockInterrupted { - stream_outcome, + item_stream_outcome, cmd_blocks_processed, cmd_blocks_not_processed, } @@ -214,16 +213,16 @@ impl CmdOutcome { } } Self::ItemError { - stream_outcome, + item_stream_outcome, cmd_blocks_processed, cmd_blocks_not_processed, errors, } => { - let (stream_outcome, value) = stream_outcome.replace(()); + let (item_stream_outcome, value) = item_stream_outcome.replace(()); let value = f(value).await; - let (stream_outcome, ()) = stream_outcome.replace(value); + let (item_stream_outcome, ()) = item_stream_outcome.replace(value); CmdOutcome::ItemError { - stream_outcome, + item_stream_outcome, cmd_blocks_processed, cmd_blocks_not_processed, errors, @@ -249,16 +248,16 @@ impl CmdOutcome, E> { Err(e) => Err(e), }, Self::BlockInterrupted { - stream_outcome, + item_stream_outcome, cmd_blocks_processed, cmd_blocks_not_processed, } => { - let (stream_outcome, value) = stream_outcome.replace(()); + let (item_stream_outcome, value) = item_stream_outcome.replace(()); match value { Ok(value) => { - let (stream_outcome, ()) = stream_outcome.replace(value); + let (item_stream_outcome, ()) = item_stream_outcome.replace(value); Ok(CmdOutcome::BlockInterrupted { - stream_outcome, + item_stream_outcome, cmd_blocks_processed, cmd_blocks_not_processed, }) @@ -279,17 +278,17 @@ impl CmdOutcome, E> { Err(e) => Err(e), }, Self::ItemError { - stream_outcome, + item_stream_outcome, cmd_blocks_processed, cmd_blocks_not_processed, errors, } => { - let (stream_outcome, value) = stream_outcome.replace(()); + let (item_stream_outcome, value) = item_stream_outcome.replace(()); match value { Ok(value) => { - let (stream_outcome, ()) = stream_outcome.replace(value); + let (item_stream_outcome, ()) = item_stream_outcome.replace(value); Ok(CmdOutcome::ItemError { - stream_outcome, + item_stream_outcome, cmd_blocks_processed, cmd_blocks_not_processed, errors, diff --git a/crate/cmd_model/src/item_stream_outcome.rs b/crate/cmd_model/src/item_stream_outcome.rs new file mode 100644 index 000000000..86150b96c --- /dev/null +++ b/crate/cmd_model/src/item_stream_outcome.rs @@ -0,0 +1,132 @@ +use fn_graph::StreamOutcomeState; +use peace_cfg::ItemId; + +/// How a `Flow` stream operation ended and IDs that were processed. +/// +/// Currently this is constructed by `ItemStreamOutcomeMapper`. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ItemStreamOutcome { + /// The value of the outcome. + pub value: T, + /// How a `Flow` stream operation ended. + pub state: StreamOutcomeState, + /// IDs of the items that were processed. + pub item_ids_processed: Vec, + /// IDs of the items that were not processed. + pub item_ids_not_processed: Vec, +} + +impl ItemStreamOutcome { + /// Returns an `ItemStreamOutcome` that is `Finished`. + pub fn finished_with(value: T, item_ids_processed: Vec) -> Self { + Self { + value, + state: StreamOutcomeState::Finished, + item_ids_processed, + item_ids_not_processed: Vec::new(), + } + } + + /// Maps this outcome's value to another. + pub fn map(self, f: impl FnOnce(T) -> TNew) -> ItemStreamOutcome { + let ItemStreamOutcome { + value, + state, + item_ids_processed, + item_ids_not_processed, + } = self; + + let value = f(value); + + ItemStreamOutcome { + value, + state, + item_ids_processed, + item_ids_not_processed, + } + } + + /// Replaces the value from this outcome with another. + pub fn replace(self, value_new: TNew) -> (ItemStreamOutcome, T) { + let ItemStreamOutcome { + value: value_existing, + state, + item_ids_processed, + item_ids_not_processed, + } = self; + + ( + ItemStreamOutcome { + value: value_new, + state, + item_ids_processed, + item_ids_not_processed, + }, + value_existing, + ) + } + + /// Replaces the value from this outcome with another, taking the current + /// value as a parameter. + pub fn replace_with( + self, + f: impl FnOnce(T) -> (TNew, U), + ) -> (ItemStreamOutcome, U) { + let ItemStreamOutcome { + value, + state, + item_ids_processed, + item_ids_not_processed, + } = self; + + let (value, extracted) = f(value); + + ( + ItemStreamOutcome { + value, + state, + item_ids_processed, + item_ids_not_processed, + }, + extracted, + ) + } + + pub fn into_value(self) -> T { + self.value + } + + pub fn value(&self) -> &T { + &self.value + } + + pub fn value_mut(&mut self) -> &mut T { + &mut self.value + } + + pub fn state(&self) -> StreamOutcomeState { + self.state + } + + pub fn item_ids_processed(&self) -> &[ItemId] { + self.item_ids_processed.as_ref() + } + + pub fn item_ids_not_processed(&self) -> &[ItemId] { + self.item_ids_not_processed.as_ref() + } +} + +impl Default for ItemStreamOutcome +where + T: Default, +{ + fn default() -> Self { + Self { + value: T::default(), + state: StreamOutcomeState::NotStarted, + item_ids_processed: Vec::new(), + item_ids_not_processed: Vec::new(), + } + } +} diff --git a/crate/cmd_model/src/lib.rs b/crate/cmd_model/src/lib.rs index 1db55152e..699320767 100644 --- a/crate/cmd_model/src/lib.rs +++ b/crate/cmd_model/src/lib.rs @@ -9,7 +9,7 @@ pub use indexmap; pub use crate::{ cmd_block_desc::CmdBlockDesc, cmd_block_outcome::CmdBlockOutcome, cmd_execution_error::CmdExecutionError, cmd_outcome::CmdOutcome, - stream_outcome_and_errors::StreamOutcomeAndErrors, + item_stream_outcome::ItemStreamOutcome, stream_outcome_and_errors::StreamOutcomeAndErrors, value_and_stream_outcome::ValueAndStreamOutcome, }; @@ -17,5 +17,6 @@ mod cmd_block_desc; mod cmd_block_outcome; mod cmd_execution_error; mod cmd_outcome; +mod item_stream_outcome; mod stream_outcome_and_errors; mod value_and_stream_outcome; diff --git a/crate/cmd_rt/src/cmd_execution.rs b/crate/cmd_rt/src/cmd_execution.rs index e6b62cb2a..165e5d8d5 100644 --- a/crate/cmd_rt/src/cmd_execution.rs +++ b/crate/cmd_rt/src/cmd_execution.rs @@ -12,7 +12,7 @@ use peace_cmd_model::{CmdBlockDesc, CmdOutcome}; use peace_resources::{resources::ts::SetUp, Resources}; use peace_rt_model::{output::OutputWrite, params::ParamsKeys}; -use crate::{CmdBlockError, CmdBlockRtBox}; +use crate::{CmdBlockError, CmdBlockRtBox, ItemStreamOutcomeMapper}; cfg_if::cfg_if! { if #[cfg(feature = "output_progress")] { @@ -305,7 +305,9 @@ where }; let CmdViewAndProgress { - cmd_view, + cmd_view: SingleProfileSingleFlowView { + flow, resources, .. + }, #[cfg(feature = "output_progress")] progress_tx, } = cmd_view_and_progress; @@ -324,6 +326,7 @@ where )), CmdBlockError::Exec(error) => Err(error), CmdBlockError::Interrupt { stream_outcome } => { + let item_stream_outcome = ItemStreamOutcomeMapper::map(flow, stream_outcome); let cmd_blocks_processed = cmd_blocks .range(0..cmd_block_index) .map(|cmd_block_rt| cmd_block_rt.cmd_block_desc()) @@ -334,7 +337,7 @@ where .map(|cmd_block_rt| cmd_block_rt.cmd_block_desc()) .collect::>(); let cmd_outcome = CmdOutcome::BlockInterrupted { - stream_outcome, + item_stream_outcome, cmd_blocks_processed, cmd_blocks_not_processed, }; @@ -345,6 +348,7 @@ where stream_outcome, errors, } => { + let item_stream_outcome = ItemStreamOutcomeMapper::map(flow, stream_outcome); let cmd_blocks_processed = cmd_blocks .range(0..cmd_block_index) .map(|cmd_block_rt| cmd_block_rt.cmd_block_desc()) @@ -356,7 +360,7 @@ where .collect::>(); let cmd_outcome = CmdOutcome::ItemError { - stream_outcome, + item_stream_outcome, cmd_blocks_processed, cmd_blocks_not_processed, errors, @@ -366,7 +370,7 @@ where } } } else { - let execution_outcome = execution_outcome_fetch(cmd_view.resources); + let execution_outcome = execution_outcome_fetch(resources); let cmd_outcome = if let Some(cmd_block_index_next) = cmd_block_index_next { let cmd_blocks_processed = cmd_blocks .range(0..cmd_block_index_next) diff --git a/crate/cmd_rt/src/item_stream_outcome_mapper.rs b/crate/cmd_rt/src/item_stream_outcome_mapper.rs new file mode 100644 index 000000000..9c852f3ab --- /dev/null +++ b/crate/cmd_rt/src/item_stream_outcome_mapper.rs @@ -0,0 +1,60 @@ +use fn_graph::StreamOutcome; +use peace_cmd_model::ItemStreamOutcome; +use peace_rt_model::Flow; + +/// Maps a `StreamOutcome` to an `ItemStreamOutcome`. +/// +/// # Design Note +/// +/// This resides in the `cmd_rt` package as the `Flow` type is needed for +/// mapping, and adding `rt_model` as a dependency of `cmd_model` creates a +/// dependency cycle with `rt_model_core`: +/// +/// ```text +/// cmd_model -> rt_model +/// ^ / +/// / v +/// rt_model_core +/// ``` +pub struct ItemStreamOutcomeMapper; + +impl ItemStreamOutcomeMapper { + /// Maps `FnId`s into `ItemId`s for a better information abstraction level. + pub fn map(flow: &Flow, stream_outcome: StreamOutcome) -> ItemStreamOutcome + where + E: 'static, + { + let StreamOutcome { + value, + state, + fn_ids_processed, + fn_ids_not_processed, + } = stream_outcome; + + let item_ids_processed = fn_ids_processed + .into_iter() + .filter_map(|fn_id| { + flow.graph() + .node_weight(fn_id) + .map(|item| item.id()) + .cloned() + }) + .collect::>(); + let item_ids_not_processed = fn_ids_not_processed + .into_iter() + .filter_map(|fn_id| { + flow.graph() + .node_weight(fn_id) + .map(|item| item.id()) + .cloned() + }) + .collect::>(); + + ItemStreamOutcome { + value, + state, + item_ids_processed, + item_ids_not_processed, + } + } +} diff --git a/crate/cmd_rt/src/lib.rs b/crate/cmd_rt/src/lib.rs index 1936a1a7e..e38516b8e 100644 --- a/crate/cmd_rt/src/lib.rs +++ b/crate/cmd_rt/src/lib.rs @@ -7,10 +7,12 @@ pub use tynm; pub use crate::{ cmd_block::{CmdBlock, CmdBlockError, CmdBlockRt, CmdBlockRtBox, CmdBlockWrapper}, cmd_execution::{CmdExecution, CmdExecutionBuilder}, + item_stream_outcome_mapper::ItemStreamOutcomeMapper, }; mod cmd_block; mod cmd_execution; +mod item_stream_outcome_mapper; cfg_if::cfg_if! { if #[cfg(feature = "output_progress")] { diff --git a/examples/envman/src/cmds/env_clean_cmd.rs b/examples/envman/src/cmds/env_clean_cmd.rs index ecff44bbf..118a196b0 100644 --- a/examples/envman/src/cmds/env_clean_cmd.rs +++ b/examples/envman/src/cmds/env_clean_cmd.rs @@ -98,7 +98,7 @@ where .await?; } - crate::output::cmd_outcome_completion_present(output, flow, resources, states_cleaned_outcome) + crate::output::cmd_outcome_completion_present(output, resources, states_cleaned_outcome) .await?; Ok(()) diff --git a/examples/envman/src/cmds/env_deploy_cmd.rs b/examples/envman/src/cmds/env_deploy_cmd.rs index 35583e268..76012ff49 100644 --- a/examples/envman/src/cmds/env_deploy_cmd.rs +++ b/examples/envman/src/cmds/env_deploy_cmd.rs @@ -98,7 +98,7 @@ where .await?; } - crate::output::cmd_outcome_completion_present(output, flow, resources, states_ensured_outcome) + crate::output::cmd_outcome_completion_present(output, resources, states_ensured_outcome) .await?; Ok(()) diff --git a/examples/envman/src/cmds/env_discover_cmd.rs b/examples/envman/src/cmds/env_discover_cmd.rs index 1a2149421..6285c8909 100644 --- a/examples/envman/src/cmds/env_discover_cmd.rs +++ b/examples/envman/src/cmds/env_discover_cmd.rs @@ -120,7 +120,7 @@ where .await?; } - crate::output::cmd_outcome_completion_present(output, flow, resources, states_discover_outcome) + crate::output::cmd_outcome_completion_present(output, resources, states_discover_outcome) .await?; Ok(()) diff --git a/examples/envman/src/cmds/profile_init_cmd.rs b/examples/envman/src/cmds/profile_init_cmd.rs index e51045a46..47a4fcc46 100644 --- a/examples/envman/src/cmds/profile_init_cmd.rs +++ b/examples/envman/src/cmds/profile_init_cmd.rs @@ -180,7 +180,7 @@ impl ProfileInitCmd { ); } CmdOutcome::ItemError { - stream_outcome: _, + item_stream_outcome: _, cmd_blocks_processed: _, cmd_blocks_not_processed: _, errors, diff --git a/examples/envman/src/output.rs b/examples/envman/src/output.rs index a309dab0b..275ffc729 100644 --- a/examples/envman/src/output.rs +++ b/examples/envman/src/output.rs @@ -6,7 +6,7 @@ use peace::{ presentln, }, resources::{resources::ts::SetUp, Resources}, - rt_model::{output::OutputWrite, Flow, IndexMap}, + rt_model::{output::OutputWrite, IndexMap}, }; use crate::model::EnvManError; @@ -127,7 +127,6 @@ where /// Presents interruption or error information of a `CmdOutcome`. pub async fn cmd_outcome_completion_present( output: &mut O, - flow: &Flow, resources: &Resources, cmd_outcome: CmdOutcome, ) -> Result<(), EnvManError> @@ -142,7 +141,7 @@ where // Nothing to do. } CmdOutcome::BlockInterrupted { - stream_outcome, + item_stream_outcome, cmd_blocks_processed, cmd_blocks_not_processed, } => { @@ -154,15 +153,13 @@ where .iter() .map(|cmd_block_desc| cmd_block_desc.cmd_block_name()) .collect::>(); - let item_ids_complete = stream_outcome - .fn_ids_processed() + let item_ids_processed = item_stream_outcome + .item_ids_processed() .iter() - .filter_map(|fn_id| flow.graph().node_weight(*fn_id).map(|item| item.id())) .collect::>(); - let item_ids_incomplete = stream_outcome - .fn_ids_not_processed() + let item_ids_not_processed = item_stream_outcome + .item_ids_not_processed() .iter() - .filter_map(|fn_id| flow.graph().node_weight(*fn_id).map(|item| item.id())) .collect::>(); presentln!(output, ["Execution was interrupted."]); @@ -178,11 +175,11 @@ where presentln!(output, ["Items completed:"]); presentln!(output, [""]); - presentln!(output, [&item_ids_complete]); + presentln!(output, [&item_ids_processed]); presentln!(output, ["Items not completed:"]); presentln!(output, [""]); - presentln!(output, [&item_ids_incomplete]); + presentln!(output, [&item_ids_not_processed]); } CmdOutcome::ExecutionInterrupted { value: _, @@ -210,7 +207,7 @@ where presentln!(output, [&cmd_blocks_incomplete]); } CmdOutcome::ItemError { - stream_outcome: _, + item_stream_outcome: _, cmd_blocks_processed: _, cmd_blocks_not_processed: _, errors, diff --git a/workspace_tests/src/cmd_model.rs b/workspace_tests/src/cmd_model.rs index cb6dfa1f0..8aa4cad72 100644 --- a/workspace_tests/src/cmd_model.rs +++ b/workspace_tests/src/cmd_model.rs @@ -1,2 +1,3 @@ mod cmd_block_outcome; mod cmd_outcome; +mod item_stream_outcome; diff --git a/workspace_tests/src/cmd_model/cmd_outcome.rs b/workspace_tests/src/cmd_model/cmd_outcome.rs index 553f1d576..4eeca19a0 100644 --- a/workspace_tests/src/cmd_model/cmd_outcome.rs +++ b/workspace_tests/src/cmd_model/cmd_outcome.rs @@ -1,6 +1,6 @@ use peace::{ - cmd_model::CmdOutcome, - rt_model::{fn_graph::StreamOutcome, IndexMap}, + cmd_model::{CmdOutcome, ItemStreamOutcome}, + rt_model::IndexMap, }; #[test] @@ -167,7 +167,7 @@ fn cmd_outcome_complete(value: T) -> CmdOutcome { fn cmd_outcome_block_interrupted(value: T) -> CmdOutcome { CmdOutcome::::BlockInterrupted { - stream_outcome: StreamOutcome::finished_with(value, Vec::new()), + item_stream_outcome: ItemStreamOutcome::finished_with(value, Vec::new()), cmd_blocks_processed: vec![], cmd_blocks_not_processed: vec![], } @@ -183,7 +183,7 @@ fn cmd_outcome_execution_interrupted(value: Option) -> CmdOutcome(value: T) -> CmdOutcome { CmdOutcome::::ItemError { - stream_outcome: StreamOutcome::finished_with(value, Vec::new()), + item_stream_outcome: ItemStreamOutcome::finished_with(value, Vec::new()), cmd_blocks_processed: vec![], cmd_blocks_not_processed: vec![], errors: IndexMap::new(), diff --git a/workspace_tests/src/cmd_model/item_stream_outcome.rs b/workspace_tests/src/cmd_model/item_stream_outcome.rs new file mode 100644 index 000000000..7821db643 --- /dev/null +++ b/workspace_tests/src/cmd_model/item_stream_outcome.rs @@ -0,0 +1,31 @@ +use peace::{ + cfg::{item_id, ItemId}, + cmd_model::ItemStreamOutcome, +}; + +#[test] +fn replace() { + let item_stream_outcome = ItemStreamOutcome::finished_with(1u16, vec![item_id!("mock")]); + + let (item_stream_outcome, n) = item_stream_outcome.replace(2u32); + + assert_eq!(1u16, n); + assert_eq!( + ItemStreamOutcome::finished_with(2u32, vec![item_id!("mock")]), + item_stream_outcome + ); +} + +#[test] +fn replace_with() { + let item_stream_outcome = + ItemStreamOutcome::finished_with((1u16, "value_to_extract"), vec![item_id!("mock")]); + + let (item_stream_outcome, value) = item_stream_outcome.replace_with(|(n, value)| (n, value)); + + assert_eq!("value_to_extract", value); + assert_eq!( + ItemStreamOutcome::finished_with(1u16, vec![item_id!("mock")]), + item_stream_outcome + ); +} diff --git a/workspace_tests/src/rt/cmds/clean_cmd.rs b/workspace_tests/src/rt/cmds/clean_cmd.rs index a5c6b8f94..671807bf8 100644 --- a/workspace_tests/src/rt/cmds/clean_cmd.rs +++ b/workspace_tests/src/rt/cmds/clean_cmd.rs @@ -798,7 +798,7 @@ async fn states_current_not_serialized_on_states_clean_insert_cmd_block_fail() }; let CmdOutcome::ItemError { - stream_outcome, + item_stream_outcome, cmd_blocks_processed: _, cmd_blocks_not_processed: _, errors, @@ -806,7 +806,7 @@ async fn states_current_not_serialized_on_states_clean_insert_cmd_block_fail() else { panic!("Expected `CleanCmd::exec` to complete with item error."); }; - let states_cleaned = stream_outcome.value(); + let states_cleaned = item_stream_outcome.value(); assert_eq!( Some(VecCopyState::from(vec![0u8, 1, 2, 3, 4, 5, 6, 7])).as_ref(), @@ -921,7 +921,7 @@ async fn states_current_not_serialized_on_states_discover_cmd_block_fail() .await?; let CmdOutcome::ItemError { - stream_outcome, + item_stream_outcome, cmd_blocks_processed: _, cmd_blocks_not_processed: _, errors, @@ -929,7 +929,7 @@ async fn states_current_not_serialized_on_states_discover_cmd_block_fail() else { panic!("Expected `CleanCmd::exec` to complete with item error."); }; - let states_cleaned = stream_outcome.value(); + let states_cleaned = item_stream_outcome.value(); assert_eq!( Some(VecCopyState::from(vec![0u8, 1, 2, 3, 4, 5, 6, 7])).as_ref(), diff --git a/workspace_tests/src/rt/cmds/ensure_cmd.rs b/workspace_tests/src/rt/cmds/ensure_cmd.rs index bf125bfd6..5b6950889 100644 --- a/workspace_tests/src/rt/cmds/ensure_cmd.rs +++ b/workspace_tests/src/rt/cmds/ensure_cmd.rs @@ -819,7 +819,7 @@ async fn exec_dry_returns_item_error_when_item_discover_current_returns_error() // Dry ensure states. let CmdOutcome::ItemError { - stream_outcome, + item_stream_outcome, cmd_blocks_processed: _, cmd_blocks_not_processed: _, errors, @@ -827,7 +827,7 @@ async fn exec_dry_returns_item_error_when_item_discover_current_returns_error() else { panic!("Expected `EnsureCmd::exec_dry_with` to complete with item error."); }; - let states_ensured_dry = stream_outcome.value(); + let states_ensured_dry = item_stream_outcome.value(); assert_eq!( Some(VecCopyState::from(vec![0, 1, 2, 3])).as_ref(), @@ -927,7 +927,7 @@ async fn exec_dry_returns_item_error_when_item_discover_goal_returns_error() // Dry ensure states. let CmdOutcome::ItemError { - stream_outcome, + item_stream_outcome, cmd_blocks_processed: _, cmd_blocks_not_processed: _, errors, @@ -935,7 +935,7 @@ async fn exec_dry_returns_item_error_when_item_discover_goal_returns_error() else { panic!("Expected `EnsureCmd::exec_dry_with` to complete with item error."); }; - let states_ensured_dry = stream_outcome.value(); + let states_ensured_dry = item_stream_outcome.value(); assert_eq!( Some(VecCopyState::from(vec![0, 1, 2, 3])).as_ref(), @@ -1035,7 +1035,7 @@ async fn exec_dry_returns_item_error_when_item_apply_check_returns_error() // Dry ensure states. let CmdOutcome::ItemError { - stream_outcome, + item_stream_outcome, cmd_blocks_processed: _, cmd_blocks_not_processed: _, errors, @@ -1043,7 +1043,7 @@ async fn exec_dry_returns_item_error_when_item_apply_check_returns_error() else { panic!("Expected `EnsureCmd::exec_dry_with` to complete with item error."); }; - let states_ensured_dry = stream_outcome.value(); + let states_ensured_dry = item_stream_outcome.value(); assert_eq!( Some(VecCopyState::from(vec![0, 1, 2, 3])).as_ref(), @@ -1143,7 +1143,7 @@ async fn exec_dry_returns_item_error_when_item_apply_dry_returns_error() // Dry ensure states. let CmdOutcome::ItemError { - stream_outcome, + item_stream_outcome, cmd_blocks_processed: _, cmd_blocks_not_processed: _, errors, @@ -1151,7 +1151,7 @@ async fn exec_dry_returns_item_error_when_item_apply_dry_returns_error() else { panic!("Expected `EnsureCmd::exec_dry_with` to complete with item error."); }; - let states_ensured_dry = stream_outcome.value(); + let states_ensured_dry = item_stream_outcome.value(); assert_eq!( Some(VecCopyState::from(vec![0, 1, 2, 3])).as_ref(), @@ -1251,7 +1251,7 @@ async fn exec_returns_item_error_when_item_apply_returns_error() // Ensure states again. let CmdOutcome::ItemError { - stream_outcome, + item_stream_outcome, cmd_blocks_processed: _, cmd_blocks_not_processed: _, errors, @@ -1259,7 +1259,7 @@ async fn exec_returns_item_error_when_item_apply_returns_error() else { panic!("Expected `EnsureCmd::exec_with` to complete with item error."); }; - let states_ensured_again = stream_outcome.value(); + let states_ensured_again = item_stream_outcome.value(); assert_eq!( Some(VecCopyState::from(vec![0, 1, 2, 3])).as_ref(), @@ -1531,7 +1531,7 @@ async fn states_current_not_serialized_on_states_discover_cmd_block_fail() .await?; let CmdOutcome::ItemError { - stream_outcome, + item_stream_outcome, cmd_blocks_processed, cmd_blocks_not_processed, errors, @@ -1539,7 +1539,7 @@ async fn states_current_not_serialized_on_states_discover_cmd_block_fail() else { panic!("Expected `EnsureCmd::exec` to complete with item error."); }; - let states_ensured = stream_outcome.value(); + let states_ensured = item_stream_outcome.value(); assert_eq!( None, diff --git a/workspace_tests/src/rt/cmds/states_discover_cmd.rs b/workspace_tests/src/rt/cmds/states_discover_cmd.rs index 1c4c3450e..45a7bdf7a 100644 --- a/workspace_tests/src/rt/cmds/states_discover_cmd.rs +++ b/workspace_tests/src/rt/cmds/states_discover_cmd.rs @@ -270,7 +270,7 @@ async fn current_returns_error_when_try_state_current_returns_error() .await?; let CmdOutcome::ItemError { - stream_outcome, + item_stream_outcome, cmd_blocks_processed: _, cmd_blocks_not_processed: _, errors, @@ -278,7 +278,7 @@ async fn current_returns_error_when_try_state_current_returns_error() else { panic!("Expected `StatesDiscoverCmd::current` to complete with item error."); }; - let states_current = stream_outcome.value(); + let states_current = item_stream_outcome.value(); let vec_copy_state = states_current.get::(VecCopyItem::ID_DEFAULT); let CmdOutcome::Complete { @@ -346,7 +346,7 @@ async fn goal_returns_error_when_try_state_goal_returns_error() .await?; let CmdOutcome::ItemError { - stream_outcome, + item_stream_outcome, cmd_blocks_processed: _, cmd_blocks_not_processed: _, errors, @@ -354,7 +354,7 @@ async fn goal_returns_error_when_try_state_goal_returns_error() else { panic!("Expected `StatesDiscoverCmd::goal` to complete with item error."); }; - let states_goal = stream_outcome.value(); + let states_goal = item_stream_outcome.value(); let vec_copy_state = states_goal.get::(VecCopyItem::ID_DEFAULT); let CmdOutcome::Complete { @@ -425,7 +425,7 @@ async fn current_and_goal_returns_error_when_try_state_current_returns_error() .await?; let CmdOutcome::ItemError { - stream_outcome, + item_stream_outcome, cmd_blocks_processed: _, cmd_blocks_not_processed: _, errors, @@ -433,7 +433,7 @@ async fn current_and_goal_returns_error_when_try_state_current_returns_error() else { panic!("Expected `StatesDiscoverCmd::current_and_goal` to complete with item error."); }; - let (states_current, states_goal) = stream_outcome.value(); + let (states_current, states_goal) = item_stream_outcome.value(); // States current assertions let CmdOutcome::Complete { @@ -529,7 +529,7 @@ async fn current_and_goal_returns_error_when_try_state_goal_returns_error() .await?; let CmdOutcome::ItemError { - stream_outcome, + item_stream_outcome, cmd_blocks_processed: _, cmd_blocks_not_processed: _, errors, @@ -537,7 +537,7 @@ async fn current_and_goal_returns_error_when_try_state_goal_returns_error() else { panic!("Expected `StatesDiscoverCmd::current_and_goal` to complete with item error."); }; - let (states_current, states_goal) = stream_outcome.value(); + let (states_current, states_goal) = item_stream_outcome.value(); // States current assertions let CmdOutcome::Complete { @@ -635,7 +635,7 @@ async fn current_and_goal_returns_current_error_when_both_try_state_current_and_ .await?; let CmdOutcome::ItemError { - stream_outcome, + item_stream_outcome, cmd_blocks_processed: _, cmd_blocks_not_processed: _, errors, @@ -643,7 +643,7 @@ async fn current_and_goal_returns_current_error_when_both_try_state_current_and_ else { panic!("Expected `StatesDiscoverCmd::current_and_goal` to complete with item error."); }; - let (states_current, states_goal) = stream_outcome.value(); + let (states_current, states_goal) = item_stream_outcome.value(); // States current assertions let CmdOutcome::Complete { From 1bbeb268c71e8b442034dec2d21094092a568842 Mon Sep 17 00:00:00 2001 From: Azriel Hoh Date: Fri, 29 Dec 2023 10:31:57 +1300 Subject: [PATCH 2/4] Update `CHANGELOG.md`. --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b5cf7421..95409eca6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,11 +5,14 @@ * Change `CmdOutcome` to be an `enum` indicating whether it is completed, interrupted, or erroneous. ([#141], [#163]) * Add `CmdBlock` trait to encompass one function for all items. ([#141], [#163]) * Add interruptibility support using [`interruptible`] through `CmdCtxBuilder::with_interruptibility`. ([#141], [#163]) +* Add `ItemStreamOutcome` to track which `Item`s are processed or not processed. ([#164], [#165]) [`interruptible`]: https://github.com/azriel91/interruptible [#141]: https://github.com/azriel91/peace/issues/141 [#163]: https://github.com/azriel91/peace/pull/163 +[#164]: https://github.com/azriel91/peace/issues/164 +[#165]: https://github.com/azriel91/peace/pull/165 ## 0.0.11 (2023-06-27) From 98a5277f7a80959ccf7457addac83f9b1f45fab2 Mon Sep 17 00:00:00 2001 From: Azriel Hoh Date: Fri, 29 Dec 2023 10:47:58 +1300 Subject: [PATCH 3/4] Add tests for `ItemStreamOutcome`. --- .../src/cmd_model/item_stream_outcome.rs | 78 +++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/workspace_tests/src/cmd_model/item_stream_outcome.rs b/workspace_tests/src/cmd_model/item_stream_outcome.rs index 7821db643..f3a32075e 100644 --- a/workspace_tests/src/cmd_model/item_stream_outcome.rs +++ b/workspace_tests/src/cmd_model/item_stream_outcome.rs @@ -1,8 +1,21 @@ use peace::{ cfg::{item_id, ItemId}, cmd_model::ItemStreamOutcome, + rt_model::fn_graph::StreamOutcomeState, }; +#[test] +fn map() { + let item_stream_outcome = ItemStreamOutcome::finished_with(1u16, vec![item_id!("mock")]); + + let item_stream_outcome = item_stream_outcome.map(|n| n + 1); + + assert_eq!( + ItemStreamOutcome::finished_with(2u16, vec![item_id!("mock")]), + item_stream_outcome + ); +} + #[test] fn replace() { let item_stream_outcome = ItemStreamOutcome::finished_with(1u16, vec![item_id!("mock")]); @@ -29,3 +42,68 @@ fn replace_with() { item_stream_outcome ); } + +#[test] +fn into_value() { + let item_stream_outcome = ItemStreamOutcome::finished_with(1u16, vec![item_id!("mock")]); + + let n = item_stream_outcome.into_value(); + + assert_eq!(1u16, n); +} + +#[test] +fn value() { + let item_stream_outcome = ItemStreamOutcome::finished_with(1u16, vec![item_id!("mock")]); + + let n = item_stream_outcome.value(); + + assert_eq!(1u16, *n); +} + +#[test] +fn value_mut() { + let mut item_stream_outcome = ItemStreamOutcome::finished_with(1u16, vec![item_id!("mock")]); + + *item_stream_outcome.value_mut() += 1; + + assert_eq!(2u16, *item_stream_outcome.value()); +} + +#[test] +fn state() { + let item_stream_outcome = ItemStreamOutcome::finished_with(1u16, vec![item_id!("mock")]); + + assert_eq!(StreamOutcomeState::Finished, item_stream_outcome.state()); +} + +#[test] +fn item_ids_processed() { + let item_stream_outcome = ItemStreamOutcome::finished_with(1u16, vec![item_id!("mock")]); + + assert_eq!( + &[item_id!("mock")], + item_stream_outcome.item_ids_processed() + ); +} + +#[test] +fn item_ids_not_processed() { + let mut item_stream_outcome = ItemStreamOutcome::finished_with(1u16, vec![item_id!("mock")]); + item_stream_outcome.item_ids_not_processed = vec![item_id!("mock_1")]; + + assert_eq!( + &[item_id!("mock_1")], + item_stream_outcome.item_ids_not_processed() + ); +} + +#[test] +fn default() { + let item_stream_outcome = ItemStreamOutcome::::default(); + + assert_eq!(0u16, *item_stream_outcome.value()); + assert_eq!(StreamOutcomeState::NotStarted, item_stream_outcome.state()); + assert!(item_stream_outcome.item_ids_processed().is_empty()); + assert!(item_stream_outcome.item_ids_not_processed().is_empty()); +} From 2094d9338334f29402a339d868ea2e05c260e0a7 Mon Sep 17 00:00:00 2001 From: Azriel Hoh Date: Fri, 29 Dec 2023 11:12:39 +1300 Subject: [PATCH 4/4] Add additional interruption test in `EnsureCmd` to cover `BlockInterrupted` variant. --- workspace_tests/src/rt/cmds/ensure_cmd.rs | 140 +++++++++++++++++++++- 1 file changed, 137 insertions(+), 3 deletions(-) diff --git a/workspace_tests/src/rt/cmds/ensure_cmd.rs b/workspace_tests/src/rt/cmds/ensure_cmd.rs index 5b6950889..5b08c1e18 100644 --- a/workspace_tests/src/rt/cmds/ensure_cmd.rs +++ b/workspace_tests/src/rt/cmds/ensure_cmd.rs @@ -1294,7 +1294,6 @@ async fn exec_returns_item_error_when_item_apply_returns_error() Ok(()) } -#[cfg_attr(coverage_nightly, coverage(off))] #[tokio::test] async fn states_current_not_serialized_on_states_current_read_cmd_block_interrupt() -> Result<(), Box> { @@ -1381,7 +1380,6 @@ async fn states_current_not_serialized_on_states_current_read_cmd_block_interrup Ok(()) } -#[cfg_attr(coverage_nightly, coverage(off))] #[tokio::test] async fn states_current_not_serialized_on_states_goal_read_cmd_block_interrupt() -> Result<(), Box> { @@ -1593,7 +1591,6 @@ async fn states_current_not_serialized_on_states_discover_cmd_block_fail() Ok(()) } -#[cfg_attr(coverage_nightly, coverage(off))] #[tokio::test] async fn states_current_not_serialized_on_apply_state_sync_check_cmd_block_interrupt() -> Result<(), Box> { @@ -1694,6 +1691,143 @@ async fn states_current_not_serialized_on_apply_state_sync_check_cmd_block_inter Ok(()) } +#[tokio::test] +async fn states_current_is_serialized_on_apply_exec_cmd_block_interrupt() +-> Result<(), Box> { + let tempdir = tempfile::tempdir()?; + let workspace = Workspace::new( + app_name!(), + WorkspaceSpec::Path(tempdir.path().to_path_buf()), + )?; + let graph = { + let mut graph_builder = ItemGraphBuilder::::new(); + let vec_copy_id = graph_builder.add_fn(VecCopyItem::default().into()); + let mock_id = graph_builder.add_fn(MockItem::<()>::default().into()); + graph_builder.add_edge(vec_copy_id, mock_id)?; + graph_builder.build() + }; + let flow = Flow::new(FlowId::new(crate::fn_name_short!())?, graph); + let mut output = NoOpOutput; + + let (interrupt_tx, interrupt_rx) = mpsc::channel::(16); + + let mut cmd_ctx = CmdCtx::builder_single_profile_single_flow(&mut output, &workspace) + .with_interruptibility(Interruptibility::new( + interrupt_rx.into(), + InterruptStrategy::PollNextN(9), + )) + .with_profile(profile!("test_profile")) + .with_flow(&flow) + .with_item_params::( + VecCopyItem::ID_DEFAULT.clone(), + VecA(vec![0, 1, 2, 3, 4, 5, 6, 7]).into(), + ) + .with_item_params::>(MockItem::<()>::ID_DEFAULT.clone(), MockSrc(1).into()) + .await?; + + // Note: Write custom states current and states goal files to disk. + let flow_dir = cmd_ctx.flow_dir(); + let states_current_content = "\ + vec_copy: []\n\ + mock: 0\n\ + "; + let states_goal_content = "\ + vec_copy: [0, 1, 2, 3, 4, 5, 6, 7]\n\ + mock: 1\n\ + "; + let states_current_file = StatesCurrentFile::from(flow_dir); + tokio::fs::write(&states_current_file, states_current_content.as_bytes()).await?; + let states_goal_file = StatesGoalFile::from(flow_dir); + tokio::fs::write(&states_goal_file, states_goal_content.as_bytes()).await?; + + interrupt_tx.send(InterruptSignal).await?; + let cmd_outcome = EnsureCmd::exec_with(&mut cmd_ctx, ApplyStoredStateSync::Both).await?; + let CmdOutcome::BlockInterrupted { + item_stream_outcome, + cmd_blocks_processed, + cmd_blocks_not_processed, + } = cmd_outcome + else { + panic!( + "Expected `EnsureCmd::exec_with` to complete with interruption,\n\ + but was:\n\ + \n\ + ```ron\n\ + {cmd_outcome:#?}\n\ + ```\n\ + " + ); + }; + let states_ensured = item_stream_outcome.value(); + + // Early interruption returns empty `states_ensured`. + assert_eq!(2, states_ensured.len()); + assert_eq!( + Some(VecCopyState::from(vec![0u8, 1, 2, 3, 4, 5, 6, 7])).as_ref(), + states_ensured.get::(VecCopyItem::ID_DEFAULT) + ); + assert_eq!( + Some(MockState(0)).as_ref(), + states_ensured.get::(MockItem::<()>::ID_DEFAULT) + ); + assert_eq!( + &[ + "StatesCurrentReadCmdBlock", + "StatesGoalReadCmdBlock", + "StatesDiscoverCmdBlock", + "ApplyStateSyncCheckCmdBlock", + ], + cmd_blocks_processed + .iter() + .map(CmdBlockDesc::cmd_block_name) + .collect::>() + .as_slice() + ); + assert_eq!( + &["ApplyExecCmdBlock",], + cmd_blocks_not_processed + .iter() + .map(CmdBlockDesc::cmd_block_name) + .collect::>() + .as_slice() + ); + + // Note: Expect interruption to be between `vec_copy` and `mock` items. + let states_current_content_expected = r#"vec_copy: +- 0 +- 1 +- 2 +- 3 +- 4 +- 5 +- 6 +- 7 +mock: 0 +"#; + // Note: states_goal_file is re-serialized, because we may have generated + // information for earlier items that are used for later items' goal state. + let states_goal_content_expected = r#"vec_copy: +- 0 +- 1 +- 2 +- 3 +- 4 +- 5 +- 6 +- 7 +mock: 1 +"#; + let states_current_content_after_exec = tokio::fs::read_to_string(&states_current_file).await?; + assert_eq!( + states_current_content_expected, + states_current_content_after_exec + ); + let states_goal_content_after_exec = tokio::fs::read_to_string(&states_goal_file).await?; + assert_eq!(states_goal_content_expected, states_goal_content_after_exec); + + Ok(()) +} + #[test] fn debug() { let debug_str = format!("{:?}", EnsureCmd::::default());