diff --git a/zfctl/src/daemon_command.rs b/zfctl/src/daemon_command.rs index 6789cec7..1d3174d7 100644 --- a/zfctl/src/daemon_command.rs +++ b/zfctl/src/daemon_command.rs @@ -36,6 +36,8 @@ use crate::{ #[derive(Subcommand)] pub(crate) enum DaemonCommand { + /// List all the Zenoh-Flow daemons reachable on the Zenoh network. + List, /// Launch a Zenoh-Flow Daemon. #[command(verbatim_doc_comment)] #[command(group( @@ -52,48 +54,38 @@ pub(crate) enum DaemonCommand { name: Option, /// The path of the configuration of the Zenoh-Flow Daemon. /// - /// This configuration allows setting extensions supported by the Runtime + /// This configuration allows setting extensions supported by the Daemon /// and its name. #[arg(short, long, verbatim_doc_comment)] configuration: Option, - /// The path to a Zenoh configuration to manage the connection to the Zenoh - /// network. - /// - /// If no configuration is provided, `zfctl` will default to connecting as - /// a peer with multicast scouting enabled. - #[arg(short = 'z', long, verbatim_doc_comment)] - zenoh_configuration: Option, }, - /// List all the Zenoh-Flow runtimes reachable on the Zenoh network. - List, - /// Returns the status of the provided Zenoh-Flow runtime. + /// Returns the status of the provided Zenoh-Flow daemon. /// - /// The status consists of general information regarding the runtime and the + /// The status consists of general information regarding the daemon and the /// machine it runs on: - /// - the name associated with the Zenoh-Flow runtime, - /// - the number of CPUs the machine running the Zenoh-Flow runtime has, - /// - the total amount of RAM the machine running the Zenoh-Flow runtime has, - /// - for each data flow the Zenoh-Flow runtime manages (partially or not): + /// - the name associated with the Zenoh-Flow daemon, + /// - the number of CPUs the machine running the Zenoh-Flow daemon has, + /// - the total amount of RAM the machine running the Zenoh-Flow daemon has, + /// - for each data flow the Zenoh-Flow daemon manages (partially or not): /// - its unique identifier, /// - its name, /// - its status. #[command(verbatim_doc_comment)] #[command(group( ArgGroup::new("exclusive") - .args(&["runtime_id", "runtime_name"]) + .args(&["daemon_id", "daemon_name"]) .required(true) .multiple(false) ))] Status { - /// The unique identifier of the Zenoh-Flow runtime to contact. - #[arg(short = 'i', long = "id")] - runtime_id: Option, - /// The name of the Zenoh-Flow runtime to contact. + /// The name of the Zenoh-Flow daemon to contact. /// - /// Note that if several runtimes share the same name, the first to + /// Note that if several daemons share the same name, the first to /// answer will be selected. - #[arg(short = 'n', long = "name")] - runtime_name: Option, + daemon_name: Option, + /// The unique identifier of the Zenoh-Flow daemon to contact. + #[arg(short = 'i', long = "id")] + daemon_id: Option, }, } @@ -103,22 +95,7 @@ impl DaemonCommand { DaemonCommand::Start { name, configuration, - zenoh_configuration, } => { - let zenoh_config = match zenoh_configuration { - Some(path) => zenoh::Config::from_file(path.clone()).unwrap_or_else(|e| { - panic!( - "Failed to parse the Zenoh configuration from < {} >:\n{e:?}", - path.display() - ) - }), - None => zenoh::Config::default(), - }; - - let zenoh_session = zenoh::open(zenoh_config) - .await - .unwrap_or_else(|e| panic!("Failed to open Zenoh session:\n{e:?}")); - let daemon = match configuration { Some(path) => { let (zenoh_flow_configuration, _) = @@ -131,13 +108,13 @@ impl DaemonCommand { ) }); - Daemon::spawn_from_config(zenoh_session, zenoh_flow_configuration) + Daemon::spawn_from_config(session, zenoh_flow_configuration) .await .expect("Failed to spawn the Zenoh-Flow Daemon") } None => Daemon::spawn( Runtime::builder(name.unwrap()) - .session(zenoh_session) + .session(session) .build() .await .expect("Failed to build the Zenoh-Flow Runtime"), @@ -179,10 +156,10 @@ impl DaemonCommand { println!("{table}"); } DaemonCommand::Status { - runtime_id, - runtime_name, + daemon_id, + daemon_name, } => { - let runtime_id = match (runtime_id, runtime_name) { + let runtime_id = match (daemon_id, daemon_name) { (Some(id), _) => id, (None, Some(name)) => get_runtime_by_name(&session, &name).await, (None, None) => { @@ -191,9 +168,9 @@ impl DaemonCommand { // any group. // (2) The `group` macro has `multiple = false` which indicates that only a single entry for // any group is accepted. - // (3) The `runtime_id` and `runtime_name` fields belong to the same group "runtime". + // (3) The `daemon_id` and `daemon_name` fields belong to the same group "exclusive". // - // => A single entry for the group "runtime" is required (and mandatory). + // => A single entry for the group "exclusive" is required (and mandatory). unreachable!() } }; @@ -215,7 +192,7 @@ impl DaemonCommand { .await .map_err(|e| { anyhow!( - "Failed to query Zenoh-Flow runtime < {} >: {:?}", + "Failed to query Zenoh-Flow daemon < {} >: {:?}", runtime_id, e ) @@ -272,7 +249,7 @@ impl DaemonCommand { } } - Err(e) => tracing::error!("Reply to runtime status failed with: {:?}", e), + Err(e) => tracing::error!("Reply to daemon status failed with: {:?}", e), } } } diff --git a/zfctl/src/instance_command.rs b/zfctl/src/instance_command.rs index 814de969..329afff9 100644 --- a/zfctl/src/instance_command.rs +++ b/zfctl/src/instance_command.rs @@ -42,12 +42,12 @@ pub(crate) enum InstanceCommand { /// zfctl instance status /// /// - This call will **not** start the data flow instance, only load on all - /// the involved runtimes the nodes composing the data flow. + /// the involved daemons the nodes composing the data flow. /// - /// - If Zenoh-Flow runtimes are specified in the data flow descriptor, + /// - If Zenoh-Flow daemons are specified in the data flow descriptor, /// there is no need to contact them separately or even to make the - /// `create` query on any of them. The Zenoh-Flow runtime orchestrating - /// the creation will query the appropriate Zenoh-Flow runtimes. + /// `create` query on any of them. The Zenoh-Flow daemon orchestrating + /// the creation will query the appropriate Zenoh-Flow daemons. #[command(verbatim_doc_comment)] Create { /// The path, on your machine, of the data flow descriptor. @@ -64,11 +64,11 @@ pub(crate) enum InstanceCommand { Delete { instance_id: Uuid }, /// Obtain the status of the data flow instance. Status { instance_id: Uuid }, - /// List all the data flow instances on the contacted Zenoh-Flow runtime + /// List all the data flow instances on the contacted Zenoh-Flow daemon List, - /// Start the data flow instance, on all the involved Zenoh-Flow runtimes. + /// Start the data flow instance, on all the involved Zenoh-Flow daemons. Start { instance_id: Uuid }, - /// Abort the data flow instance, on all the involved Zenoh-Flow runtimes. + /// Abort the data flow instance, on all the involved Zenoh-Flow daemons. Abort { instance_id: Uuid }, } diff --git a/zfctl/src/main.rs b/zfctl/src/main.rs index 63a52b2b..a15628f6 100644 --- a/zfctl/src/main.rs +++ b/zfctl/src/main.rs @@ -18,15 +18,16 @@ use instance_command::InstanceCommand; mod daemon_command; use daemon_command::DaemonCommand; -mod run_local; +mod run_local_command; +use run_local_command::RunLocalCommand; mod utils; use std::path::PathBuf; use anyhow::anyhow; -use clap::{Parser, Subcommand}; +use clap::{ArgGroup, Parser, Subcommand}; use utils::{get_random_runtime, get_runtime_by_name}; -use zenoh_flow_commons::{parse_vars, Result, RuntimeId}; +use zenoh_flow_commons::{Result, RuntimeId}; const ZENOH_FLOW_INTERNAL_ERROR: &str = r#" `zfctl` encountered a fatal internal error. @@ -55,6 +56,7 @@ struct Zfctl { /// a peer with multicast scouting enabled. #[arg(short = 'z', long, verbatim_doc_comment)] zenoh_configuration: Option, + #[command(subcommand)] command: Command, } @@ -63,21 +65,26 @@ struct Zfctl { enum Command { /// To manage a data flow instance. /// - /// This command accepts an optional `name` or `id` of a Zenoh-Flow Runtime + /// This command accepts an optional `name` or `id` of a Zenoh-Flow Daemon /// to contact. If no name or id is provided, one is randomly selected. - #[group(required = false, multiple = false)] + #[command(group( + ArgGroup::new("exclusive") + .args(&["daemon_id", "daemon_name"]) + .required(false) + .multiple(false) + ))] Instance { #[command(subcommand)] command: InstanceCommand, - /// The unique identifier of the Zenoh-Flow runtime to contact. - #[arg(short = 'i', long = "id", verbatim_doc_comment, group = "runtime")] - runtime_id: Option, - /// The name of the Zenoh-Flow runtime to contact. + /// The unique identifier of the Zenoh-Flow daemon to contact. + #[arg(short = 'i', long = "id", verbatim_doc_comment)] + daemon_id: Option, + /// The name of the Zenoh-Flow daemon to contact. /// - /// If several runtimes share the same name, `zfctl` will abort + /// If several daemons share the same name, `zfctl` will abort /// its execution asking you to instead use their `id`. - #[arg(short = 'n', long = "name", verbatim_doc_comment, group = "runtime")] - runtime_name: Option, + #[arg(short = 'n', long = "name", verbatim_doc_comment)] + daemon_name: Option, }, /// To interact with a Zenoh-Flow daemon. @@ -86,27 +93,7 @@ enum Command { /// Run a dataflow locally. #[command(verbatim_doc_comment)] - RunLocal { - /// The data flow to execute. - flow: PathBuf, - /// The path to a Zenoh configuration to manage the connection to the Zenoh - /// network. - /// - /// If no configuration is provided, `zfctl` will default to connecting as - /// a peer with multicast scouting enabled. - #[arg(short = 'z', long, verbatim_doc_comment)] - zenoh_configuration: Option, - /// The, optional, location of the configuration to load nodes implemented not in Rust. - #[arg(short, long, value_name = "path")] - extensions: Option, - /// Variables to add / overwrite in the `vars` section of your data - /// flow, with the form `KEY=VALUE`. Can be repeated multiple times. - /// - /// Example: - /// --vars HOME_DIR=/home/zenoh-flow --vars BUILD=debug - #[arg(long, value_parser = parse_vars::, verbatim_doc_comment)] - vars: Option>, - }, + RunLocal(RunLocalCommand), } #[async_std::main] @@ -136,10 +123,10 @@ async fn main() -> Result<()> { match zfctl.command { Command::Instance { command, - runtime_id, - runtime_name, + daemon_id, + daemon_name, } => { - let orchestrator_id = match (runtime_id, runtime_name) { + let orchestrator_id = match (daemon_id, daemon_name) { (Some(id), _) => id, (None, Some(name)) => get_runtime_by_name(&session, &name).await, (None, None) => get_random_runtime(&session).await, @@ -148,11 +135,6 @@ async fn main() -> Result<()> { command.run(session, orchestrator_id).await } Command::Daemon(command) => command.run(session).await, - Command::RunLocal { - flow, - zenoh_configuration, - extensions, - vars, - } => run_local::run_locally(flow, zenoh_configuration, extensions, vars).await, + Command::RunLocal(command) => command.run(session).await, } } diff --git a/zfctl/src/run_local.rs b/zfctl/src/run_local.rs deleted file mode 100644 index e655d005..00000000 --- a/zfctl/src/run_local.rs +++ /dev/null @@ -1,117 +0,0 @@ -use std::path::PathBuf; - -use anyhow::Context; -use async_std::io::ReadExt; -use zenoh_flow_commons::{Result, Vars}; -use zenoh_flow_descriptors::{DataFlowDescriptor, FlattenedDataFlowDescriptor}; -use zenoh_flow_records::DataFlowRecord; -use zenoh_flow_runtime::{Extensions, Runtime}; - -pub async fn run_locally( - flow: PathBuf, - zenoh_configuration: Option, - extensions: Option, - vars: Option>, -) -> Result<()> { - let extensions = match extensions { - Some(extensions_path) => { - let (extensions, _) = zenoh_flow_commons::try_parse_from_file::( - extensions_path.as_os_str(), - Vars::default(), - ) - .context(format!( - "Failed to load Loader configuration from < {} >", - &extensions_path.display() - )) - .unwrap(); - - extensions - } - None => Extensions::default(), - }; - - let vars = match vars { - Some(v) => Vars::from(v), - None => Vars::default(), - }; - - let (data_flow, vars) = - zenoh_flow_commons::try_parse_from_file::(flow.as_os_str(), vars) - .context(format!( - "Failed to load data flow descriptor from < {} >", - &flow.display() - )) - .unwrap(); - - let flattened_flow = FlattenedDataFlowDescriptor::try_flatten(data_flow, vars) - .context(format!( - "Failed to flattened data flow extracted from < {} >", - &flow.display() - )) - .unwrap(); - - let mut runtime_builder = Runtime::builder("zenoh-flow-standalone-runtime") - .add_extensions(extensions) - .expect("Failed to add extensions"); - - if let Some(path) = zenoh_configuration { - let zenoh_config = zenoh_flow_runtime::zenoh::Config::from_file(path.clone()) - .unwrap_or_else(|e| { - panic!( - "Failed to parse the Zenoh configuration from < {} >:\n{e:?}", - path.display() - ) - }); - let zenoh_session = zenoh_flow_runtime::zenoh::open(zenoh_config) - .await - .unwrap_or_else(|e| panic!("Failed to open a Zenoh session: {e:?}")); - - runtime_builder = runtime_builder.session(zenoh_session); - } - - let runtime = runtime_builder - .build() - .await - .expect("Failed to build the Zenoh-Flow runtime"); - - let record = DataFlowRecord::try_new(&flattened_flow, runtime.id()) - .context("Failed to create a Record from the flattened data flow descriptor") - .unwrap(); - - let instance_id = record.instance_id().clone(); - let record_name = record.name().clone(); - runtime - .try_load_data_flow(record) - .await - .context("Failed to load Record") - .unwrap(); - - runtime - .try_start_instance(&instance_id) - .await - .unwrap_or_else(|e| panic!("Failed to start data flow < {} >: {:?}", &instance_id, e)); - - let mut stdin = async_std::io::stdin(); - let mut input = [0_u8]; - println!( - r#" - The flow ({}) < {} > was successfully started. - To abort its execution, simply enter 'q'. - "#, - record_name, instance_id - ); - - loop { - let _ = stdin.read_exact(&mut input).await; - if input[0] == b'q' { - break; - } - } - - runtime - .try_delete_instance(&instance_id) - .await - .unwrap_or_else(|e| panic!("Failed to delete data flow < {} >: {:?}", &instance_id, e)); - - Ok(()) -} diff --git a/zfctl/src/run_local_command.rs b/zfctl/src/run_local_command.rs new file mode 100644 index 00000000..78e2d26b --- /dev/null +++ b/zfctl/src/run_local_command.rs @@ -0,0 +1,120 @@ +use std::path::PathBuf; + +use anyhow::Context; +use async_std::io::ReadExt; +use clap::Parser; +use zenoh::Session; +use zenoh_flow_commons::{parse_vars, Result, Vars}; +use zenoh_flow_descriptors::{DataFlowDescriptor, FlattenedDataFlowDescriptor}; +use zenoh_flow_records::DataFlowRecord; +use zenoh_flow_runtime::{Extensions, Runtime}; + +#[derive(Parser)] +pub struct RunLocalCommand { + /// The data flow to execute. + flow: PathBuf, + /// The, optional, location of the configuration to load nodes implemented not in Rust. + #[arg(short, long, value_name = "path")] + extensions: Option, + /// Variables to add / overwrite in the `vars` section of your data + /// flow, with the form `KEY=VALUE`. Can be repeated multiple times. + /// + /// Example: + /// --vars HOME_DIR=/home/zenoh-flow --vars BUILD=debug + #[arg(long, value_parser = parse_vars::, verbatim_doc_comment)] + vars: Option>, +} + +impl RunLocalCommand { + pub async fn run(self, session: Session) -> Result<()> { + let extensions = match self.extensions { + Some(extensions_path) => { + let (extensions, _) = zenoh_flow_commons::try_parse_from_file::( + extensions_path.as_os_str(), + Vars::default(), + ) + .context(format!( + "Failed to load Loader configuration from < {} >", + &extensions_path.display() + )) + .unwrap(); + + extensions + } + None => Extensions::default(), + }; + + let vars = match self.vars { + Some(v) => Vars::from(v), + None => Vars::default(), + }; + + let (data_flow, vars) = zenoh_flow_commons::try_parse_from_file::( + self.flow.as_os_str(), + vars, + ) + .context(format!( + "Failed to load data flow descriptor from < {} >", + &self.flow.display() + )) + .unwrap(); + + let flattened_flow = FlattenedDataFlowDescriptor::try_flatten(data_flow, vars) + .context(format!( + "Failed to flattened data flow extracted from < {} >", + &self.flow.display() + )) + .unwrap(); + + let runtime_builder = Runtime::builder("zenoh-flow-standalone-runtime") + .add_extensions(extensions) + .expect("Failed to add extensions") + .session(session); + + let runtime = runtime_builder + .build() + .await + .expect("Failed to build the Zenoh-Flow runtime"); + + let record = DataFlowRecord::try_new(&flattened_flow, runtime.id()) + .context("Failed to create a Record from the flattened data flow descriptor") + .unwrap(); + + let instance_id = record.instance_id().clone(); + let record_name = record.name().clone(); + runtime + .try_load_data_flow(record) + .await + .context("Failed to load Record") + .unwrap(); + + runtime + .try_start_instance(&instance_id) + .await + .unwrap_or_else(|e| panic!("Failed to start data flow < {} >: {:?}", &instance_id, e)); + + let mut stdin = async_std::io::stdin(); + let mut input = [0_u8]; + println!( + r#" + The flow ({}) < {} > was successfully started. + To abort its execution, simply enter 'q'. + "#, + record_name, instance_id + ); + + loop { + let _ = stdin.read_exact(&mut input).await; + if input[0] == b'q' { + break; + } + } + + runtime + .try_delete_instance(&instance_id) + .await + .unwrap_or_else(|e| panic!("Failed to delete data flow < {} >: {:?}", &instance_id, e)); + + Ok(()) + } +} diff --git a/zfctl/src/utils.rs b/zfctl/src/utils.rs index 6db3dea2..982d536e 100644 --- a/zfctl/src/utils.rs +++ b/zfctl/src/utils.rs @@ -18,14 +18,14 @@ use zenoh::{query::ConsolidationMode, Session}; use zenoh_flow_commons::RuntimeId; use zenoh_flow_daemon::queries::{selector_all_runtimes, RuntimeInfo, RuntimesQuery}; -/// Returns the list of [RuntimeInfo] of the reachable Zenoh-Flow Runtime(s). +/// Returns the list of [RuntimeInfo] of the reachable Zenoh-Flow Daemon(s). /// /// # Panic /// /// This function will panic if: -/// - (internal error) the query to list the Zenoh-Flow Runtimes could not be serialised by `serde_json`, +/// - (internal error) the query to list the Zenoh-Flow Daemons could not be serialised by `serde_json`, /// - the query on the Zenoh network failed, -/// - no Zenoh-Flow Runtime is reachable. +/// - no Zenoh-Flow Daemon is reachable. pub(crate) async fn get_all_runtimes(session: &Session) -> Vec { let value = serde_json::to_vec(&RuntimesQuery::List) .unwrap_or_else(|e| panic!("`serde_json` failed to serialize `RuntimeQuery::List`: {e:?}")); @@ -33,10 +33,10 @@ pub(crate) async fn get_all_runtimes(session: &Session) -> Vec { let runtime_replies = session .get(selector_all_runtimes()) .payload(value) - // We want to address all the Zenoh-Flow runtimes that are reachable on the Zenoh network. + // We want to address all the Zenoh-Flow daemons that are reachable on the Zenoh network. .consolidation(ConsolidationMode::None) .await - .unwrap_or_else(|e| panic!("Failed to query available runtimes:\n{:?}", e)); + .unwrap_or_else(|e| panic!("Failed to query available daemons:\n{:?}", e)); let mut runtimes = Vec::new(); while let Ok(reply) = runtime_replies.recv_async().await { @@ -56,7 +56,7 @@ pub(crate) async fn get_all_runtimes(session: &Session) -> Vec { if runtimes.is_empty() { panic!( - "No Zenoh-Flow runtime were detected. Have you checked if (i) they are up and (ii) \ + "No Zenoh-Flow daemon were detected. Have you checked if (i) they are up and (ii) \ reachable through Zenoh?" ); } @@ -64,13 +64,13 @@ pub(crate) async fn get_all_runtimes(session: &Session) -> Vec { runtimes } -/// Returns the unique identifier of the Zenoh-Flow Runtime that has the provided `name`. +/// Returns the unique identifier of the Zenoh-Flow Daemon that has the provided `name`. /// /// # Panic /// /// This function will panic if: -/// - there is no Zenoh-Flow Runtime that has the provided name, -/// - there are more than 1 Zenoh-Flow Runtime with the provided name. +/// - there is no Zenoh-Flow Daemon that has the provided name, +/// - there are more than 1 Zenoh-Flow Daemon with the provided name. pub(crate) async fn get_runtime_by_name(session: &Session, name: &str) -> RuntimeId { let runtimes = get_all_runtimes(session).await; let mut matching_runtimes = runtimes @@ -79,14 +79,14 @@ pub(crate) async fn get_runtime_by_name(session: &Session, name: &str) -> Runtim .collect_vec(); if matching_runtimes.is_empty() { - panic!("Found no Zenoh-Flow Runtime with name < {name} >"); + panic!("Found no Zenoh-Flow Daemon with name < {name} >"); } else if matching_runtimes.len() > 1 { - tracing::error!("Found multiple Zenoh-Flow Runtimes named < {name} >:"); + tracing::error!("Found multiple Zenoh-Flow Daemons named < {name} >:"); matching_runtimes.iter().for_each(|&r_info| { tracing::error!("- {} - (id) {}", r_info.name, r_info.id); }); panic!( - "There are multiple Zenoh-Flow Runtimes named < {name} >, please use their 'id' \ + "There are multiple Zenoh-Flow Daemons named < {name} >, please use their 'id' \ instead" ); } else { @@ -94,14 +94,14 @@ pub(crate) async fn get_runtime_by_name(session: &Session, name: &str) -> Runtim } } -/// Returns the unique identifier of a reachable Zenoh-Flow Runtime. +/// Returns the unique identifier of a reachable Zenoh-Flow Daemon. /// /// # Panic /// /// This function will panic if: -/// - (internal error) the query to list the Zenoh-Flow Runtimes could not be serialised by `serde_json`, +/// - (internal error) the query to list the Zenoh-Flow Daemons could not be serialised by `serde_json`, /// - the query on the Zenoh network failed, -/// - no Zenoh-Flow Runtime is reachable. +/// - no Zenoh-Flow Daemon is reachable. pub(crate) async fn get_random_runtime(session: &Session) -> RuntimeId { let mut runtimes = get_all_runtimes(session).await; let orchestrator = runtimes.remove(rand::thread_rng().gen_range(0..runtimes.len()));