diff --git a/zfctl/src/main.rs b/zfctl/src/main.rs index 936fa145..6706abc1 100644 --- a/zfctl/src/main.rs +++ b/zfctl/src/main.rs @@ -18,7 +18,8 @@ use instance_command::InstanceCommand; mod daemon_command; use daemon_command::DaemonCommand; -mod run_local; +mod run_local_command; +use run_local_command::RunLocalCommand; mod utils; use std::path::PathBuf; @@ -26,7 +27,7 @@ use std::path::PathBuf; use anyhow::anyhow; use clap::{ArgGroup, Parser, Subcommand}; use utils::{get_random_runtime, get_runtime_by_name}; -use zenoh_flow_commons::{parse_vars, Result, RuntimeId}; +use zenoh_flow_commons::{Result, RuntimeId}; const ZENOH_FLOW_INTERNAL_ERROR: &str = r#" `zfctl` encountered a fatal internal error. @@ -92,27 +93,7 @@ enum Command { /// Run a dataflow locally. #[command(verbatim_doc_comment)] - RunLocal { - /// The data flow to execute. - flow: PathBuf, - /// The path to a Zenoh configuration to manage the connection to the Zenoh - /// network. - /// - /// If no configuration is provided, `zfctl` will default to connecting as - /// a peer with multicast scouting enabled. - #[arg(short = 'z', long, verbatim_doc_comment)] - zenoh_configuration: Option, - /// The, optional, location of the configuration to load nodes implemented not in Rust. - #[arg(short, long, value_name = "path")] - extensions: Option, - /// Variables to add / overwrite in the `vars` section of your data - /// flow, with the form `KEY=VALUE`. Can be repeated multiple times. - /// - /// Example: - /// --vars HOME_DIR=/home/zenoh-flow --vars BUILD=debug - #[arg(long, value_parser = parse_vars::, verbatim_doc_comment)] - vars: Option>, - }, + RunLocal(RunLocalCommand), } #[async_std::main] @@ -154,11 +135,6 @@ async fn main() -> Result<()> { command.run(session, orchestrator_id).await } Command::Daemon(command) => command.run(session).await, - Command::RunLocal { - flow, - zenoh_configuration, - extensions, - vars, - } => run_local::run_locally(flow, zenoh_configuration, extensions, vars).await, + Command::RunLocal(command) => command.run(session).await, } } diff --git a/zfctl/src/run_local.rs b/zfctl/src/run_local.rs deleted file mode 100644 index e655d005..00000000 --- a/zfctl/src/run_local.rs +++ /dev/null @@ -1,117 +0,0 @@ -use std::path::PathBuf; - -use anyhow::Context; -use async_std::io::ReadExt; -use zenoh_flow_commons::{Result, Vars}; -use zenoh_flow_descriptors::{DataFlowDescriptor, FlattenedDataFlowDescriptor}; -use zenoh_flow_records::DataFlowRecord; -use zenoh_flow_runtime::{Extensions, Runtime}; - -pub async fn run_locally( - flow: PathBuf, - zenoh_configuration: Option, - extensions: Option, - vars: Option>, -) -> Result<()> { - let extensions = match extensions { - Some(extensions_path) => { - let (extensions, _) = zenoh_flow_commons::try_parse_from_file::( - extensions_path.as_os_str(), - Vars::default(), - ) - .context(format!( - "Failed to load Loader configuration from < {} >", - &extensions_path.display() - )) - .unwrap(); - - extensions - } - None => Extensions::default(), - }; - - let vars = match vars { - Some(v) => Vars::from(v), - None => Vars::default(), - }; - - let (data_flow, vars) = - zenoh_flow_commons::try_parse_from_file::(flow.as_os_str(), vars) - .context(format!( - "Failed to load data flow descriptor from < {} >", - &flow.display() - )) - .unwrap(); - - let flattened_flow = FlattenedDataFlowDescriptor::try_flatten(data_flow, vars) - .context(format!( - "Failed to flattened data flow extracted from < {} >", - &flow.display() - )) - .unwrap(); - - let mut runtime_builder = Runtime::builder("zenoh-flow-standalone-runtime") - .add_extensions(extensions) - .expect("Failed to add extensions"); - - if let Some(path) = zenoh_configuration { - let zenoh_config = zenoh_flow_runtime::zenoh::Config::from_file(path.clone()) - .unwrap_or_else(|e| { - panic!( - "Failed to parse the Zenoh configuration from < {} >:\n{e:?}", - path.display() - ) - }); - let zenoh_session = zenoh_flow_runtime::zenoh::open(zenoh_config) - .await - .unwrap_or_else(|e| panic!("Failed to open a Zenoh session: {e:?}")); - - runtime_builder = runtime_builder.session(zenoh_session); - } - - let runtime = runtime_builder - .build() - .await - .expect("Failed to build the Zenoh-Flow runtime"); - - let record = DataFlowRecord::try_new(&flattened_flow, runtime.id()) - .context("Failed to create a Record from the flattened data flow descriptor") - .unwrap(); - - let instance_id = record.instance_id().clone(); - let record_name = record.name().clone(); - runtime - .try_load_data_flow(record) - .await - .context("Failed to load Record") - .unwrap(); - - runtime - .try_start_instance(&instance_id) - .await - .unwrap_or_else(|e| panic!("Failed to start data flow < {} >: {:?}", &instance_id, e)); - - let mut stdin = async_std::io::stdin(); - let mut input = [0_u8]; - println!( - r#" - The flow ({}) < {} > was successfully started. - To abort its execution, simply enter 'q'. - "#, - record_name, instance_id - ); - - loop { - let _ = stdin.read_exact(&mut input).await; - if input[0] == b'q' { - break; - } - } - - runtime - .try_delete_instance(&instance_id) - .await - .unwrap_or_else(|e| panic!("Failed to delete data flow < {} >: {:?}", &instance_id, e)); - - Ok(()) -} diff --git a/zfctl/src/run_local_command.rs b/zfctl/src/run_local_command.rs new file mode 100644 index 00000000..78e2d26b --- /dev/null +++ b/zfctl/src/run_local_command.rs @@ -0,0 +1,120 @@ +use std::path::PathBuf; + +use anyhow::Context; +use async_std::io::ReadExt; +use clap::Parser; +use zenoh::Session; +use zenoh_flow_commons::{parse_vars, Result, Vars}; +use zenoh_flow_descriptors::{DataFlowDescriptor, FlattenedDataFlowDescriptor}; +use zenoh_flow_records::DataFlowRecord; +use zenoh_flow_runtime::{Extensions, Runtime}; + +#[derive(Parser)] +pub struct RunLocalCommand { + /// The data flow to execute. + flow: PathBuf, + /// The, optional, location of the configuration to load nodes implemented not in Rust. + #[arg(short, long, value_name = "path")] + extensions: Option, + /// Variables to add / overwrite in the `vars` section of your data + /// flow, with the form `KEY=VALUE`. Can be repeated multiple times. + /// + /// Example: + /// --vars HOME_DIR=/home/zenoh-flow --vars BUILD=debug + #[arg(long, value_parser = parse_vars::, verbatim_doc_comment)] + vars: Option>, +} + +impl RunLocalCommand { + pub async fn run(self, session: Session) -> Result<()> { + let extensions = match self.extensions { + Some(extensions_path) => { + let (extensions, _) = zenoh_flow_commons::try_parse_from_file::( + extensions_path.as_os_str(), + Vars::default(), + ) + .context(format!( + "Failed to load Loader configuration from < {} >", + &extensions_path.display() + )) + .unwrap(); + + extensions + } + None => Extensions::default(), + }; + + let vars = match self.vars { + Some(v) => Vars::from(v), + None => Vars::default(), + }; + + let (data_flow, vars) = zenoh_flow_commons::try_parse_from_file::( + self.flow.as_os_str(), + vars, + ) + .context(format!( + "Failed to load data flow descriptor from < {} >", + &self.flow.display() + )) + .unwrap(); + + let flattened_flow = FlattenedDataFlowDescriptor::try_flatten(data_flow, vars) + .context(format!( + "Failed to flattened data flow extracted from < {} >", + &self.flow.display() + )) + .unwrap(); + + let runtime_builder = Runtime::builder("zenoh-flow-standalone-runtime") + .add_extensions(extensions) + .expect("Failed to add extensions") + .session(session); + + let runtime = runtime_builder + .build() + .await + .expect("Failed to build the Zenoh-Flow runtime"); + + let record = DataFlowRecord::try_new(&flattened_flow, runtime.id()) + .context("Failed to create a Record from the flattened data flow descriptor") + .unwrap(); + + let instance_id = record.instance_id().clone(); + let record_name = record.name().clone(); + runtime + .try_load_data_flow(record) + .await + .context("Failed to load Record") + .unwrap(); + + runtime + .try_start_instance(&instance_id) + .await + .unwrap_or_else(|e| panic!("Failed to start data flow < {} >: {:?}", &instance_id, e)); + + let mut stdin = async_std::io::stdin(); + let mut input = [0_u8]; + println!( + r#" + The flow ({}) < {} > was successfully started. + To abort its execution, simply enter 'q'. + "#, + record_name, instance_id + ); + + loop { + let _ = stdin.read_exact(&mut input).await; + if input[0] == b'q' { + break; + } + } + + runtime + .try_delete_instance(&instance_id) + .await + .unwrap_or_else(|e| panic!("Failed to delete data flow < {} >: {:?}", &instance_id, e)); + + Ok(()) + } +}