From d1a71357829dfec7c8426f77f104561eb140ae8c Mon Sep 17 00:00:00 2001 From: Huan-Cheng Chang Date: Thu, 26 Sep 2024 09:56:51 +0100 Subject: [PATCH] feat(jstzd): implement async OctezNode --- Cargo.lock | 1 + crates/jstzd/Cargo.toml | 1 + crates/jstzd/src/task/async_octez_node.rs | 102 ++++++++++++++++++++++ crates/jstzd/src/task/mod.rs | 1 + crates/jstzd/src/task/octez_node.rs | 18 ++-- 5 files changed, 114 insertions(+), 9 deletions(-) create mode 100644 crates/jstzd/src/task/async_octez_node.rs diff --git a/Cargo.lock b/Cargo.lock index ae264669..3ab076e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2644,6 +2644,7 @@ dependencies = [ "octez", "rand 0.8.5", "reqwest", + "serde", "tempfile", "tokio", ] diff --git a/crates/jstzd/Cargo.toml b/crates/jstzd/Cargo.toml index 9664bb79..5f9b2804 100644 --- a/crates/jstzd/Cargo.toml +++ b/crates/jstzd/Cargo.toml @@ -12,6 +12,7 @@ async-trait.workspace = true bollard.workspace = true futures-util.workspace = true octez = { path = "../octez" } +serde.workspace = true tempfile.workspace = true tokio.workspace = true diff --git a/crates/jstzd/src/task/async_octez_node.rs b/crates/jstzd/src/task/async_octez_node.rs new file mode 100644 index 00000000..769b273e --- /dev/null +++ b/crates/jstzd/src/task/async_octez_node.rs @@ -0,0 +1,102 @@ +use std::{fs::File, path::PathBuf, process::Stdio}; + +use tokio::process::{Child, Command}; + +use anyhow::{anyhow, Result}; + +fn path_or_default<'a>(path: Option<&'a PathBuf>, default: &'a str) -> &'a str { + path.and_then(|bin| bin.to_str()).unwrap_or(default) +} + +async fn run_command(command: &mut Command) -> Result<()> { + let output = command.output().await?; + + if !output.status.success() { + return Err(anyhow!( + "Command {:?} failed:\n {}", + command, + String::from_utf8_lossy(&output.stderr) + )); + } + + Ok(()) +} + +pub struct AsyncOctezNode { + /// Path to the octez-node binary + /// If None, the binary will inside PATH will be used + pub octez_node_bin: Option, + /// Path to the octez-node directory + pub octez_node_dir: PathBuf, +} + +impl AsyncOctezNode { + fn command(&self) -> Command { + Command::new(path_or_default(self.octez_node_bin.as_ref(), "octez-node")) + } + + pub async fn config_init( + &self, + network: &str, + rpc_endpoint: &str, + num_connections: u32, + ) -> Result<()> { + run_command(self.command().args([ + "config", + "init", + "--network", + network, + "--data-dir", + self.octez_node_dir.to_str().expect("Invalid path"), + "--rpc-addr", + rpc_endpoint, + "--connections", + num_connections.to_string().as_str(), + ])) + .await + } + + pub async fn generate_identity(&self) -> Result<()> { + run_command(self.command().args([ + "identity", + "generate", + "0", + "--data-dir", + self.octez_node_dir.to_str().expect("Invalid path"), + ])) + .await + } + + pub async fn run(&self, log_file: &File, options: &[&str]) -> Result { + let mut command = self.command(); + + command + .args([ + "run", + "--data-dir", + self.octez_node_dir.to_str().expect("Invalid path"), + "--singleprocess", + ]) + .args(options) + .stdout(Stdio::from(log_file.try_clone()?)) + .stderr(Stdio::from(log_file.try_clone()?)); + + Ok(command.spawn()?) + } +} + +#[cfg(test)] +mod tests { + use super::path_or_default; + use std::path::PathBuf; + + #[test] + fn test_path_or_default() { + let path = PathBuf::from("/foo/bar"); + assert_eq!(path_or_default(Some(&path), "not_this_one"), "/foo/bar"); + assert_eq!( + path_or_default(None, "should_return_this"), + "should_return_this" + ); + } +} diff --git a/crates/jstzd/src/task/mod.rs b/crates/jstzd/src/task/mod.rs index fe73028c..72553ba1 100644 --- a/crates/jstzd/src/task/mod.rs +++ b/crates/jstzd/src/task/mod.rs @@ -1,3 +1,4 @@ +mod async_octez_node; pub mod octez_node; use anyhow::Result; diff --git a/crates/jstzd/src/task/octez_node.rs b/crates/jstzd/src/task/octez_node.rs index bc5ec26d..11d44510 100644 --- a/crates/jstzd/src/task/octez_node.rs +++ b/crates/jstzd/src/task/octez_node.rs @@ -5,8 +5,8 @@ use async_trait::async_trait; use std::{fs::File, path::PathBuf, sync::Arc}; use tokio::sync::RwLock; -use octez::OctezNode as InnerOctezNode; -use std::process::Child; +use crate::task::async_octez_node::AsyncOctezNode; +use tokio::process::Child; const DEFAULT_RPC_ENDPOINT: &str = "localhost:8732"; const DEFAULT_NETWORK: &str = "sandbox"; @@ -116,7 +116,7 @@ struct ChildWrapper { impl ChildWrapper { pub async fn kill(&mut self) -> anyhow::Result<()> { if let Some(mut v) = self.inner.take() { - return Ok(v.kill()?); + v.kill().await?; } Ok(()) } @@ -140,15 +140,14 @@ impl Task for OctezNode { /// Spins up the task with the given config. async fn spawn(config: Self::Config) -> Result { - let node = InnerOctezNode { + let node = AsyncOctezNode { octez_node_bin: Some(config.binary_path), octez_node_dir: config.data_dir, }; - // localhost:8731 refers to the peer http endpoint. This will be removed - // when we switch to the async node implementation where this option is removed - node.config_init(&config.network, "localhost:8731", &config.rpc_endpoint, 0)?; - node.generate_identity()?; + node.generate_identity().await?; + node.config_init(&config.network, &config.rpc_endpoint, 0) + .await?; Ok(OctezNode { inner: Arc::new(RwLock::new(AsyncDropper::new(ChildWrapper { inner: Some( @@ -160,7 +159,8 @@ impl Task for OctezNode { .map(|s| s as &str) .collect::>() .as_slice(), - )?, + ) + .await?, ), }))), })