From 14d912d42e325e6f138bfa8fe21a7d206e4a492b Mon Sep 17 00:00:00 2001 From: Valentyn Valiaiev Date: Thu, 31 Oct 2024 18:35:54 +0200 Subject: [PATCH] Re-format --- Cargo.lock | 195 ++++++++++++++++++++++++++- agentwire/macros/src/broker.rs | 57 ++++++-- agentwire/macros/src/test.rs | 35 +++-- agentwire/src/agent/process.rs | 54 ++++++-- agentwire/src/agent/task.rs | 10 +- agentwire/src/agent/thread.rs | 5 +- agentwire/src/lib.rs | 3 +- agentwire/src/port.rs | 239 +++++++++++++++++++++++++-------- agentwire/src/testing_rt.rs | 18 ++- agentwire/tests/process.rs | 8 +- agentwire/tests/task.rs | 8 +- agentwire/tests/thread.rs | 12 +- 12 files changed, 533 insertions(+), 111 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 048b10b3..6cbad737 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,6 +23,43 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" +[[package]] +name = "agentwire" +version = "0.0.1" +dependencies = [ + "agentwire-macros", + "close_fds", + "futures", + "libc", + "nix 0.26.4", + "rkyv", + "shell-words", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "agentwire-macros" +version = "0.0.1" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "syn 2.0.77", +] + +[[package]] +name = "ahash" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9" +dependencies = [ + "getrandom 0.2.12", + "once_cell", + "version_check", +] + [[package]] name = "ahash" version = "0.8.11" @@ -649,7 +686,7 @@ version = "1.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4abf69a87be33b6f125a93d5046b5f7395c26d1f449bf8d3927f5577463b6de0" dependencies = [ - "ahash", + "ahash 0.8.11", "aws-credential-types", "aws-runtime", "aws-sigv4", @@ -1131,6 +1168,18 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06c9989a51171e2e81038ab168b6ae22886fe9ded214430dbb4f41c28cf176da" +[[package]] +name = "bitvec" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c" +dependencies = [ + "funty", + "radium", + "tap", + "wyz", +] + [[package]] name = "blake3" version = "1.5.0" @@ -1190,6 +1239,28 @@ version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3ac9f8b63eca6fd385229b3675f6cc0dc5c8a5c8a54a59d4f52ffd670d87b0c" +[[package]] +name = "bytecheck" +version = "0.6.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23cdc57ce23ac53c931e88a43d06d070a6fd142f2617be5855eb75efc9beb1c2" +dependencies = [ + "bytecheck_derive", + "ptr_meta", + "simdutf8", +] + +[[package]] +name = "bytecheck_derive" +version = "0.6.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3db406d29fbcd95542e92559bed4d8ad92636d1ca8b3b72ede10b4bcc010e659" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "bytemuck" version = "1.14.0" @@ -1417,6 +1488,16 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4bfbf56724aa9eca8afa4fcfadeb479e722935bb2a0900c2d37e0cc477af0688" +[[package]] +name = "close_fds" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bc416f33de9d59e79e57560f450d21ff8393adcf1cdfc3e6d8fb93d5f88a2ed" +dependencies = [ + "cfg-if", + "libc", +] + [[package]] name = "cmd_lib" version = "1.9.3" @@ -2510,6 +2591,12 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "funty" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" + [[package]] name = "futures" version = "0.3.30" @@ -2798,6 +2885,9 @@ name = "hashbrown" version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +dependencies = [ + "ahash 0.7.8", +] [[package]] name = "hashbrown" @@ -2805,7 +2895,7 @@ version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" dependencies = [ - "ahash", + "ahash 0.8.11", ] [[package]] @@ -2814,7 +2904,7 @@ version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" dependencies = [ - "ahash", + "ahash 0.8.11", "allocator-api2", ] @@ -5269,6 +5359,26 @@ dependencies = [ "prost 0.13.3", ] +[[package]] +name = "ptr_meta" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0738ccf7ea06b608c10564b31debd4f5bc5e197fc8bfe088f68ae5ce81e7a4f1" +dependencies = [ + "ptr_meta_derive", +] + +[[package]] +name = "ptr_meta_derive" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16b845dbfca988fa33db069c0e230574d15a3088f147a87b64c7589eb662c9ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "qoi" version = "0.4.1" @@ -5308,6 +5418,12 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "radium" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" + [[package]] name = "rand" version = "0.7.3" @@ -5530,6 +5646,15 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" +[[package]] +name = "rend" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71fe3824f5629716b1589be05dacd749f6aa084c87e00e016714a8cdfccc997c" +dependencies = [ + "bytecheck", +] + [[package]] name = "reqwest" version = "0.11.23" @@ -5644,6 +5769,35 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "rkyv" +version = "0.7.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9008cd6385b9e161d8229e1f6549dd23c3d022f132a2ea37ac3a10ac4935779b" +dependencies = [ + "bitvec", + "bytecheck", + "bytes", + "hashbrown 0.12.3", + "ptr_meta", + "rend", + "rkyv_derive", + "seahash", + "tinyvec", + "uuid 1.7.0", +] + +[[package]] +name = "rkyv_derive" +version = "0.7.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "503d1d27590a2b0a3a4ca4c94755aa2875657196ecbf401a42eff41d7de532c0" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "rle-decode-fast" version = "1.0.3" @@ -5812,6 +5966,12 @@ dependencies = [ "untrusted 0.9.0", ] +[[package]] +name = "seahash" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" + [[package]] name = "sec1" version = "0.3.0" @@ -6108,6 +6268,12 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shell-words" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24188a676b6ae68c3b2cb3a01be17fbf7240ce009799bb56d5b1409051e78fde" + [[package]] name = "shlex" version = "1.3.0" @@ -6157,6 +6323,12 @@ dependencies = [ "quote", ] +[[package]] +name = "simdutf8" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" + [[package]] name = "similar" version = "2.6.0" @@ -6509,18 +6681,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.60" +version = "1.0.65" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "579e9083ca58dd9dcf91a9923bb9054071b9ebbd800b342194c9feb0ee89fc18" +checksum = "5d11abd9594d9b38965ef50805c5e469ca9cc6f197f883f717e0269a3057b3d5" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.60" +version = "1.0.65" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2470041c06ec3ac1ab38d0356a6119054dedaea53e12fbefc0de730a1c08524" +checksum = "ae71770322cbd277e69d762a16c444af02aa0575ac0d174f0b9562d3b37f8602" dependencies = [ "proc-macro2", "quote", @@ -7682,6 +7854,15 @@ dependencies = [ "tokio", ] +[[package]] +name = "wyz" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f360fc0b24296329c78fda852a1e9ae82de9cf7b27dae4b7f62f118f77b9ed" +dependencies = [ + "tap", +] + [[package]] name = "xattr" version = "1.3.1" diff --git a/agentwire/macros/src/broker.rs b/agentwire/macros/src/broker.rs index c509a928..63c83582 100644 --- a/agentwire/macros/src/broker.rs +++ b/agentwire/macros/src/broker.rs @@ -6,7 +6,8 @@ use syn::{ parse::{Parse, ParseStream, Result}, parse_macro_input, punctuated::{Pair, Punctuated}, - Data, DataStruct, DeriveInput, Expr, Field, Fields, FieldsNamed, Ident, Path, Token, + Data, DataStruct, DeriveInput, Expr, Field, Fields, FieldsNamed, Ident, Path, + Token, }; #[derive(PartialEq, Eq, Hash)] @@ -64,8 +65,12 @@ impl Parse for BrokerAttr { #[allow(clippy::too_many_lines)] pub fn proc_macro_derive(input: TokenStream) -> TokenStream { - let DeriveInput { attrs, ident, data, .. } = parse_macro_input!(input); - let Data::Struct(DataStruct { fields, .. }) = data else { panic!("must be a struct") }; + let DeriveInput { + attrs, ident, data, .. + } = parse_macro_input!(input); + let Data::Struct(DataStruct { fields, .. }) = data else { + panic!("must be a struct") + }; let Fields::Named(FieldsNamed { named: fields, .. }) = fields else { panic!("must have named fields") }; @@ -81,26 +86,50 @@ pub fn proc_macro_derive(input: TokenStream) -> TokenStream { .collect::>(); let broker_plan = broker_attrs .iter() - .find_map(|attr| if let BrokerAttr::Plan(expr) = attr { Some(expr) } else { None }) + .find_map(|attr| { + if let BrokerAttr::Plan(expr) = attr { + Some(expr) + } else { + None + } + }) .expect("#[broker] attribute must set a `plan`"); let broker_error = broker_attrs .iter() - .find_map(|attr| if let BrokerAttr::Error(expr) = attr { Some(expr) } else { None }) + .find_map(|attr| { + if let BrokerAttr::Error(expr) = attr { + Some(expr) + } else { + None + } + }) .expect("#[broker] attribute must set an `error`"); let agent_fields = fields.iter().filter_map(|field| { - field.attrs.iter().find(|attr| attr.path().is_ident("agent")).map(|attr| { - let attrs = attr - .parse_args_with(Punctuated::::parse_terminated) - .expect("failed to parse `agent` attribute"); - (field, attrs.into_pairs().map(Pair::into_value).collect::>()) - }) + field + .attrs + .iter() + .find(|attr| attr.path().is_ident("agent")) + .map(|attr| { + let attrs = attr + .parse_args_with( + Punctuated::::parse_terminated, + ) + .expect("failed to parse `agent` attribute"); + ( + field, + attrs + .into_pairs() + .map(Pair::into_value) + .collect::>(), + ) + }) }); let constructor_name = format_ident!("new_{}", ident.to_string().to_snake_case()); - let constructor_fields = agent_fields - .clone() - .map(|(Field { ident, .. }, _)| quote!(#ident: ::agentwire::agent::Cell::Vacant)); + let constructor_fields = agent_fields.clone().map( + |(Field { ident, .. }, _)| quote!(#ident: ::agentwire::agent::Cell::Vacant), + ); let constructor = quote! { macro_rules! #constructor_name { ($($tokens:tt)*) => { diff --git a/agentwire/macros/src/test.rs b/agentwire/macros/src/test.rs index fe2c09ac..33822315 100644 --- a/agentwire/macros/src/test.rs +++ b/agentwire/macros/src/test.rs @@ -31,24 +31,39 @@ impl Parse for TestAttr { } pub fn proc_macro_attribute(attr: TokenStream, item: TokenStream) -> TokenStream { - let test_attrs = - parse_macro_input!(attr with Punctuated::::parse_terminated); + let test_attrs = parse_macro_input!(attr with Punctuated::::parse_terminated); let init = test_attrs .iter() - .find_map(|attr| if let TestAttr::Init(expr) = attr { Some(quote!(#expr)) } else { None }) + .find_map(|attr| { + if let TestAttr::Init(expr) = attr { + Some(quote!(#expr)) + } else { + None + } + }) .unwrap_or_else(|| quote!(|| {})); let timeout = test_attrs .iter() - .find_map( - |attr| { - if let TestAttr::Timeout(expr) = attr { Some(quote!(#expr)) } else { None } - }, - ) + .find_map(|attr| { + if let TestAttr::Timeout(expr) = attr { + Some(quote!(#expr)) + } else { + None + } + }) .unwrap_or_else(|| quote!(::agentwire::testing_rt::DEFAULT_TIMEOUT)); - let ItemFn { attrs, vis, mut sig, block } = parse_macro_input!(item as ItemFn); + let ItemFn { + attrs, + vis, + mut sig, + block, + } = parse_macro_input!(item as ItemFn); let test_name = LitStr::new(&sig.ident.to_string(), sig.ident.span()); - assert!(take(&mut sig.asyncness).is_some(), "Test function must be async"); + assert!( + take(&mut sig.asyncness).is_some(), + "Test function must be async" + ); let expanded = quote! { #(#attrs)* diff --git a/agentwire/src/agent/process.rs b/agentwire/src/agent/process.rs index 83373408..dd99be73 100644 --- a/agentwire/src/agent/process.rs +++ b/agentwire/src/agent/process.rs @@ -13,7 +13,10 @@ use nix::{ sys::signal::{self, Signal}, unistd::Pid, }; -use rkyv::{de::deserializers::SharedDeserializeMap, Archive, Deserialize, Infallible, Serialize}; +use rkyv::{ + de::deserializers::SharedDeserializeMap, Archive, Deserialize, Infallible, + Serialize, +}; use std::{ env, error::Error, @@ -104,7 +107,8 @@ where ::Archived: Deserialize, Self::Input: Archive + for<'a> Serialize>, Self::Output: Archive + for<'a> Serialize>, - ::Archived: Deserialize, + ::Archived: + Deserialize, { /// Error type returned by the agent. type Error: Debug; @@ -136,9 +140,13 @@ where wait_kill_rx.await.unwrap(); tracing::info!("Process agent {} killed", Self::NAME); }; - let spawn_process = spawn_process_impl(self, inner, send_kill_rx, wait_kill_tx, logger); + let spawn_process = + spawn_process_impl(self, inner, send_kill_rx, wait_kill_tx, logger); spawn_named_thread(format!("proc-ipc-{}", Self::NAME), || { - let rt = runtime::Builder::new_current_thread().enable_all().build().unwrap(); + let rt = runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); rt.block_on(task::LocalSet::new().run_until(spawn_process)); }); (outer, kill.boxed()) @@ -171,7 +179,9 @@ where /// This function must be called as early in the program lifetime as possible. /// Everything before this function call gets duplicated for each process-based /// agent. -pub fn init(call_process_agent: impl FnOnce(&str, OwnedFd) -> Result<(), Box>) { +pub fn init( + call_process_agent: impl FnOnce(&str, OwnedFd) -> Result<(), Box>, +) { match (env::var(SHMEM_ENV), env::var(PARENT_PID_ENV)) { (Ok(shmem), Ok(parent_pid)) => { let result = unsafe { libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL) }; @@ -188,7 +198,9 @@ pub fn init(call_process_agent: impl FnOnce(&str, OwnedFd) -> Result<(), Box().expect("shared memory file descriptor to be an integer"), + shmem + .parse::() + .expect("shared memory file descriptor to be an integer"), ) }; // Agent's name is the first argument. @@ -198,7 +210,9 @@ pub fn init(call_process_agent: impl FnOnce(&str, OwnedFd) -> Result<(), Box tracing::warn!("Agent {name} exited"), - Err(err) => tracing::error!("Agent {name} exited with an error: {err:#?}"), + Err(err) => { + tracing::error!("Agent {name} exited with an error: {err:#?}"); + } } process::exit(1); } @@ -215,7 +229,11 @@ pub fn init(call_process_agent: impl FnOnce(&str, OwnedFd) -> Result<(), Box( let (shmem_fd, close) = inner .into_shared_memory(T::NAME, &init_state, recovered_inputs) .expect("couldn't initialize shared memory"); - let exe = env::current_exe().expect("couldn't determine current executable file"); + let exe = + env::current_exe().expect("couldn't determine current executable file"); let initializer = T::initializer(); let mut child_fds = initializer.keep_file_descriptors(); @@ -275,7 +294,10 @@ async fn spawn_process_impl( .arg0(format!("proc-{}", T::NAME)) .args( env::var(ARGS_ENV) - .map(|args| shell_words::split(&args).expect("invalid process arguments")) + .map(|args| { + shell_words::split(&args) + .expect("invalid process arguments") + }) .unwrap_or_default(), ) .envs(initializer.envs()) @@ -295,8 +317,16 @@ async fn spawn_process_impl( drop(shmem_fd); drop(initializer); let pid = Pid::from_raw(child.id().unwrap().try_into().unwrap()); - task::spawn(logger(T::NAME, child.stdout.take().unwrap(), child.stderr.take().unwrap())); - tracing::info!("Process agent {} spawned with PID: {}", T::NAME, pid.as_raw()); + task::spawn(logger( + T::NAME, + child.stdout.take().unwrap(), + child.stderr.take().unwrap(), + )); + tracing::info!( + "Process agent {} spawned with PID: {}", + T::NAME, + pid.as_raw() + ); match future::select(Box::pin(child.wait()), &mut send_kill_rx).await { Either::Left((status, _)) => { let status = status.expect("failed to run a sub-process"); diff --git a/agentwire/src/agent/task.rs b/agentwire/src/agent/task.rs index 37954bb7..64d017cb 100644 --- a/agentwire/src/agent/task.rs +++ b/agentwire/src/agent/task.rs @@ -10,7 +10,10 @@ pub trait Task: Agent + Send { type Error: Debug; /// Runs the agent event-loop inside a dedicated asynchronous task. - fn run(self, port: port::Inner) -> impl Future> + Send; + fn run( + self, + port: port::Inner, + ) -> impl Future> + Send; /// Spawns a new task running the agent event-loop and returns a handle for /// bi-directional communication with the agent. @@ -23,7 +26,10 @@ pub trait Task: Agent + Send { tracing::warn!("Task agent {} exited", Self::NAME); } Err(err) => { - tracing::error!("Task agent {} exited with error: {err:#?}", Self::NAME); + tracing::error!( + "Task agent {} exited with error: {err:#?}", + Self::NAME + ); } } }); diff --git a/agentwire/src/agent/thread.rs b/agentwire/src/agent/thread.rs index 8fd4367f..015658f7 100644 --- a/agentwire/src/agent/thread.rs +++ b/agentwire/src/agent/thread.rs @@ -22,7 +22,10 @@ pub trait Thread: Agent + Send { tracing::warn!("Thread agent {} exited", Self::NAME); } Err(err) => { - tracing::error!("Thread agent {} exited with error: {err:#?}", Self::NAME); + tracing::error!( + "Thread agent {} exited with error: {err:#?}", + Self::NAME + ); } } }); diff --git a/agentwire/src/lib.rs b/agentwire/src/lib.rs index 911f9126..8858f5fb 100644 --- a/agentwire/src/lib.rs +++ b/agentwire/src/lib.rs @@ -336,7 +336,8 @@ where .name(name.clone()) .spawn(move || { if let Ok(title) = CString::new(name.as_bytes()) { - let result = unsafe { libc::prctl(libc::PR_SET_NAME, title.as_ptr(), 0, 0, 0) }; + let result = + unsafe { libc::prctl(libc::PR_SET_NAME, title.as_ptr(), 0, 0, 0) }; if result == -1 { eprintln!( "failed to set thread name to '{name}': {:#?}", diff --git a/agentwire/src/port.rs b/agentwire/src/port.rs index 6908ba26..a225956a 100644 --- a/agentwire/src/port.rs +++ b/agentwire/src/port.rs @@ -111,8 +111,8 @@ use rkyv::{ de::deserializers::SharedDeserializeMap, ser::{ serializers::{ - AllocScratch, BufferSerializer, CompositeSerializer, FallbackScratch, HeapScratch, - SharedSerializeMap, + AllocScratch, BufferSerializer, CompositeSerializer, FallbackScratch, + HeapScratch, SharedSerializeMap, }, Serializer, }, @@ -215,7 +215,8 @@ pub trait SharedPort: Port where Self::Input: Archive + for<'a> Serialize>, Self::Output: Archive + for<'a> Serialize>, - ::Archived: Deserialize, + ::Archived: + Deserialize, { /// Buffer size for input messages. Must be at least `size_of::()` /// for a zero-sized input. @@ -312,26 +313,41 @@ type InitialInputs = Vec<(Box<[u8]>, Instant)>; pub fn new() -> (Inner, Outer) { let (input_tx, input_rx) = mpsc::channel(T::INPUT_CAPACITY); let (output_tx, output_rx) = mpsc::channel(T::OUTPUT_CAPACITY); - let inner = Inner { tx: output_tx, rx: input_rx }; - let outer = Outer { tx: input_tx, rx: output_rx }; + let inner = Inner { + tx: output_tx, + rx: input_rx, + }; + let outer = Outer { + tx: input_tx, + rx: output_rx, + }; (inner, outer) } impl Input { /// Creates a new input value with the source timestamp of now. pub fn new(value: T::Input) -> Self { - Self { value, source_ts: Instant::now() } + Self { + value, + source_ts: Instant::now(), + } } /// Creates a new input value with the source timestamp of the original /// input. pub fn derive(&self, value: O::Input) -> Input { - Input { value, source_ts: self.source_ts } + Input { + value, + source_ts: self.source_ts, + } } /// Creates a new output value with the source timestamp of the input. pub fn chain(&self, value: T::Output) -> Output { - Output { value, source_ts: self.source_ts } + Output { + value, + source_ts: self.source_ts, + } } /// Returns a closure, which creates a new output value with the source @@ -348,7 +364,10 @@ where { /// Creates a new output value with the source timestamp of the input. pub fn chain(&self, value: T::Output) -> Output { - Output { value, source_ts: self.source_ts } + Output { + value, + source_ts: self.source_ts, + } } /// Returns a closure, which creates a new output value with the source @@ -362,13 +381,19 @@ where impl Output { /// Creates a new output value with the source timestamp of now. pub fn new(value: T::Output) -> Self { - Self { value, source_ts: Instant::now() } + Self { + value, + source_ts: Instant::now(), + } } /// Creates a new output value with the source timestamp of the original /// output. pub fn derive(&self, value: O::Output) -> Output { - Output { value, source_ts: self.source_ts } + Output { + value, + source_ts: self.source_ts, + } } /// Returns a closure, which creates a new output value with the source @@ -380,7 +405,10 @@ impl Output { /// Creates a new input value with the source timestamp of the output. pub fn chain(&self, value: O::Input) -> Input { - Input { value, source_ts: self.source_ts } + Input { + value, + source_ts: self.source_ts, + } } /// Returns a closure, which creates a new input value with the source @@ -400,7 +428,10 @@ impl Outer { /// from the agent. Instead the broker sends a message to the agent and /// blocks until it's received by the agent. #[allow(clippy::mut_mut)] // triggered by `select!` internals - pub async fn send_unjam(&mut self, message: Input) -> Result<(), SendUnjamError> { + pub async fn send_unjam( + &mut self, + message: Input, + ) -> Result<(), SendUnjamError> { let mut send = self.tx.send(message).fuse(); let mut recv = self.rx.next(); loop { @@ -418,7 +449,10 @@ impl Outer { impl Stream for Outer { type Item = Output; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { Pin::new(&mut self.rx).poll_next(cx) } } @@ -432,7 +466,10 @@ impl FusedStream for Outer { impl Sink> for Outer { type Error = SendError; - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_ready( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { Pin::new(&mut self.tx).poll_ready(cx) } @@ -440,11 +477,17 @@ impl Sink> for Outer { Pin::new(&mut self.tx).start_send(item) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { Pin::new(&mut self.tx).poll_flush(cx) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_close( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { Pin::new(&mut self.tx).poll_close(cx) } } @@ -452,7 +495,10 @@ impl Sink> for Outer { impl Stream for Inner { type Item = Input; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { Pin::new(&mut self.rx).poll_next(cx) } } @@ -466,19 +512,31 @@ impl FusedStream for Inner { impl Sink> for Inner { type Error = SendError; - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_ready( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { Pin::new(&mut self.tx).poll_ready(cx) } - fn start_send(mut self: Pin<&mut Self>, item: Output) -> Result<(), Self::Error> { + fn start_send( + mut self: Pin<&mut Self>, + item: Output, + ) -> Result<(), Self::Error> { Pin::new(&mut self.tx).start_send(item) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { Pin::new(&mut self.tx).poll_flush(cx) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_close( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { Pin::new(&mut self.tx).poll_close(cx) } } @@ -526,13 +584,19 @@ where NonZeroUsize::new(size).expect("to always be positive") } - unsafe fn create(name: &str) -> Result<(*mut Self, OwnedFd), CreateSharedMemoryError> { + unsafe fn create( + name: &str, + ) -> Result<(*mut Self, OwnedFd), CreateSharedMemoryError> { let size = Self::size_of(); let name = CString::new(name).map_err(CreateSharedMemoryError::InvalidName)?; let raw_fd = memfd_create(&name, MemFdCreateFlag::empty()) - .map_err(CreateSharedMemoryError::MemfdCreate)? as RawFd; + .map_err(CreateSharedMemoryError::MemfdCreate)? + as RawFd; let fd = unsafe { OwnedFd::from_raw_fd(raw_fd) }; - let len = size.get().try_into().expect("shared memory size is extremely large"); + let len = size + .get() + .try_into() + .expect("shared memory size is extremely large"); ftruncate(fd.as_raw_fd(), len).map_err(CreateSharedMemoryError::Ftruncate)?; let ptr = unsafe { mmap( @@ -547,10 +611,14 @@ where .cast::() }; unsafe { - sem_init(&mut (*ptr).input_tx, 1, 0).map_err(CreateSharedMemoryError::SemInit)?; - sem_init(&mut (*ptr).input_rx, 1, 0).map_err(CreateSharedMemoryError::SemInit)?; - sem_init(&mut (*ptr).output_tx, 1, 1).map_err(CreateSharedMemoryError::SemInit)?; - sem_init(&mut (*ptr).output_rx, 1, 0).map_err(CreateSharedMemoryError::SemInit)?; + sem_init(&mut (*ptr).input_tx, 1, 0) + .map_err(CreateSharedMemoryError::SemInit)?; + sem_init(&mut (*ptr).input_rx, 1, 0) + .map_err(CreateSharedMemoryError::SemInit)?; + sem_init(&mut (*ptr).output_tx, 1, 1) + .map_err(CreateSharedMemoryError::SemInit)?; + sem_init(&mut (*ptr).output_rx, 1, 0) + .map_err(CreateSharedMemoryError::SemInit)?; (*ptr).input_count = 0; (*ptr).input_index = 0; } @@ -575,11 +643,16 @@ where unsafe fn destroy(ptr: *mut Self) -> Result<(), DestroySharedMemoryError> { unsafe { - sem_destroy(&mut (*ptr).input_tx).map_err(DestroySharedMemoryError::SemDestroy)?; - sem_destroy(&mut (*ptr).input_rx).map_err(DestroySharedMemoryError::SemDestroy)?; - sem_destroy(&mut (*ptr).output_tx).map_err(DestroySharedMemoryError::SemDestroy)?; - sem_destroy(&mut (*ptr).output_rx).map_err(DestroySharedMemoryError::SemDestroy)?; - munmap(ptr.cast(), Self::size_of().get()).map_err(DestroySharedMemoryError::Munmap)?; + sem_destroy(&mut (*ptr).input_tx) + .map_err(DestroySharedMemoryError::SemDestroy)?; + sem_destroy(&mut (*ptr).input_rx) + .map_err(DestroySharedMemoryError::SemDestroy)?; + sem_destroy(&mut (*ptr).output_tx) + .map_err(DestroySharedMemoryError::SemDestroy)?; + sem_destroy(&mut (*ptr).output_rx) + .map_err(DestroySharedMemoryError::SemDestroy)?; + munmap(ptr.cast(), Self::size_of().get()) + .map_err(DestroySharedMemoryError::Munmap)?; } Ok(()) } @@ -596,7 +669,10 @@ where unsafe fn input(&mut self, n: usize) -> &mut [u8] { unsafe { slice::from_raw_parts_mut( - ptr::addr_of_mut!(*self).add(1).cast::().add(T::SERIALIZED_INPUT_SIZE * n), + ptr::addr_of_mut!(*self) + .add(1) + .cast::() + .add(T::SERIALIZED_INPUT_SIZE * n), T::SERIALIZED_INPUT_SIZE, ) } @@ -605,7 +681,10 @@ where unsafe fn output(&mut self) -> &mut [u8] { unsafe { slice::from_raw_parts_mut( - ptr::addr_of_mut!(*self).add(1).cast::().add(T::SERIALIZED_INPUT_SIZE * 2), + ptr::addr_of_mut!(*self) + .add(1) + .cast::() + .add(T::SERIALIZED_INPUT_SIZE * 2), T::SERIALIZED_OUTPUT_SIZE, ) } @@ -627,7 +706,10 @@ where init_state: &T, initial_inputs: InitialInputs, ) -> Result< - (OwnedFd, impl Future>), + ( + OwnedFd, + impl Future>, + ), CreateSharedMemoryError, > { let Self { tx, rx } = self; @@ -647,7 +729,9 @@ where let shared_memory = addr as *mut SharedMemory; assert!((*shared_memory).input_count <= 2); for mut i in 0..(*shared_memory).input_count { - if (*shared_memory).input_count == 2 && (*shared_memory).input_index == 0 { + if (*shared_memory).input_count == 2 + && (*shared_memory).input_index == 0 + { i = (i + 1) % 2; } let input = Box::from(&*(*shared_memory).input(i)); @@ -682,7 +766,8 @@ where #[allow(clippy::missing_panics_doc)] pub fn init_state(&mut self) -> &::Archived { unsafe { - let init_state = deserialize_message::((*self.shared_memory).init_state()); + let init_state = + deserialize_message::((*self.shared_memory).init_state()); sem_post(&mut (*self.shared_memory).input_tx).expect("semaphore failure"); init_state } @@ -694,7 +779,9 @@ where unsafe { sem_wait(&mut (*self.shared_memory).input_rx).expect("semaphore failure"); let input_index = 1 - (*self.shared_memory).input_index; - let value = deserialize_message::((*self.shared_memory).input(input_index)); + let value = deserialize_message::( + (*self.shared_memory).input(input_index), + ); let source_ts = (*self.shared_memory).input_ts[input_index]; sem_post(&mut (*self.shared_memory).input_tx).expect("semaphore failure"); ArchivedInput { value, source_ts } @@ -706,7 +793,10 @@ where #[allow(clippy::missing_panics_doc)] pub fn try_recv(&mut self) -> Option> { unsafe { - if sem_getvalue(&mut (*self.shared_memory).input_rx).expect("semaphore failure") > 0 { + if sem_getvalue(&mut (*self.shared_memory).input_rx) + .expect("semaphore failure") + > 0 + { Some(self.recv()) } else { None @@ -719,7 +809,11 @@ where pub fn send(&mut self, output: &Output) { unsafe { sem_wait(&mut (*self.shared_memory).output_tx).expect("semaphore failure"); - serialize_message((*self.shared_memory).output(), &mut self.scratch, &output.value); + serialize_message( + (*self.shared_memory).output(), + &mut self.scratch, + &output.value, + ); (*self.shared_memory).output_ts = output.source_ts; sem_post(&mut (*self.shared_memory).output_rx).expect("semaphore failure"); } @@ -730,7 +824,10 @@ where #[allow(clippy::missing_panics_doc)] pub fn try_send(&mut self, output: &Output) -> bool { unsafe { - if sem_getvalue(&mut (*self.shared_memory).output_tx).expect("semaphore failure") > 0 { + if sem_getvalue(&mut (*self.shared_memory).output_tx) + .expect("semaphore failure") + > 0 + { self.send(output); true } else { @@ -752,7 +849,9 @@ fn serialize_message( scratch.take().unwrap(), SharedSerializeMap::new(), // reuse of this map doesn't work ); - serializer.serialize_value(value).expect("failed to serialize an IPC message"); + serializer + .serialize_value(value) + .expect("failed to serialize an IPC message"); let size = serializer.pos(); let (_, c, _) = serializer.into_components(); buf[..mem::size_of::()].copy_from_slice(&size.to_ne_bytes()); @@ -804,19 +903,24 @@ where }; let mut sem_wait = spawn_sem_wait(); loop { - if let Either::Left((_, sem_wait)) = select(&mut stop_tx_rx, sem_wait).await { + if let Either::Left((_, sem_wait)) = select(&mut stop_tx_rx, sem_wait).await + { unsafe { let shared_memory = addr as *mut SharedMemory; - sem_post(&mut (*shared_memory).output_rx).expect("semaphore failure"); + sem_post(&mut (*shared_memory).output_rx) + .expect("semaphore failure"); } sem_wait.await.unwrap(); break; } let (value, source_ts) = unsafe { let shared_memory = addr as *mut SharedMemory; - let archived = deserialize_message::((*shared_memory).output()); + let archived = + deserialize_message::((*shared_memory).output()); // Reuse of `SharedDeserializeMap` doesn't work - let value = archived.deserialize(&mut SharedDeserializeMap::new()).unwrap(); + let value = archived + .deserialize(&mut SharedDeserializeMap::new()) + .unwrap(); let source_ts = (*shared_memory).output_ts; sem_post(&mut (*shared_memory).output_tx).expect("semaphore failure"); (value, source_ts) @@ -855,10 +959,12 @@ where let mut sem_wait = spawn_sem_wait(); let mut scratch = Some(FallbackScratch::default()); loop { - if let Either::Left((_, sem_wait)) = select(&mut stop_rx_rx, sem_wait).await { + if let Either::Left((_, sem_wait)) = select(&mut stop_rx_rx, sem_wait).await + { unsafe { let shared_memory = addr as *mut SharedMemory; - sem_post(&mut (*shared_memory).input_tx).expect("semaphore failure"); + sem_post(&mut (*shared_memory).input_tx) + .expect("semaphore failure"); } sem_wait.await.unwrap(); break; @@ -874,7 +980,8 @@ where unsafe { let shared_memory = addr as *mut SharedMemory; let input_index = (*shared_memory).input_index; - (*shared_memory).input_count = ((*shared_memory).input_count + 1).min(2); + (*shared_memory).input_count = + ((*shared_memory).input_count + 1).min(2); (*shared_memory).input_index = ((*shared_memory).input_index + 1) % 2; match input { Either::Left((input, input_ts)) => { @@ -904,26 +1011,46 @@ where unsafe fn sem_init(sem: *mut sem_t, pshared: c_int, value: c_uint) -> io::Result<()> { let result = unsafe { libc::sem_init(sem, pshared, value) }; - if result == -1 { Err(io::Error::last_os_error()) } else { Ok(()) } + if result == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } } unsafe fn sem_destroy(sem: *mut sem_t) -> io::Result<()> { let result = unsafe { libc::sem_destroy(sem) }; - if result == -1 { Err(io::Error::last_os_error()) } else { Ok(()) } + if result == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } } unsafe fn sem_post(sem: *mut sem_t) -> io::Result<()> { let result = unsafe { libc::sem_post(sem) }; - if result == -1 { Err(io::Error::last_os_error()) } else { Ok(()) } + if result == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } } unsafe fn sem_getvalue(sem: *mut sem_t) -> io::Result { let mut value = 0; let result = unsafe { libc::sem_getvalue(sem, &mut value) }; - if result == -1 { Err(io::Error::last_os_error()) } else { Ok(value) } + if result == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(value) + } } unsafe fn sem_wait(sem: *mut sem_t) -> io::Result<()> { let result = unsafe { libc::sem_wait(sem) }; - if result == -1 { Err(io::Error::last_os_error()) } else { Ok(()) } + if result == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } } diff --git a/agentwire/src/testing_rt.rs b/agentwire/src/testing_rt.rs index 14713d8a..9a69b9d1 100644 --- a/agentwire/src/testing_rt.rs +++ b/agentwire/src/testing_rt.rs @@ -31,7 +31,11 @@ pub fn run_broker_test( if env::var(BROKER_TEST_ID_ENV).map_or(false, |var| var == test_id) { let result = catch_unwind(AssertUnwindSafe(|| { init(); - tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap().block_on(f); + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(f); })); process::exit(result.is_err().into()); } @@ -64,15 +68,21 @@ pub fn run_broker_test( child_args.push("--exact".into()); child_args.push("--".into()); child_args.push(test_name.into()); - let result = - runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async { + let result = runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { let mut child = Command::new(env::current_exe().unwrap()) .args(&child_args) .env(BROKER_TEST_ID_ENV, test_id) .env(agent::process::ARGS_ENV, shell_words::join(&child_args)) .spawn() .unwrap(); - time::timeout(timeout, child.wait()).await.expect("timeouted").unwrap() + time::timeout(timeout, child.wait()) + .await + .expect("timeouted") + .unwrap() }); assert!(result.success(), "test failed"); } diff --git a/agentwire/tests/process.rs b/agentwire/tests/process.rs index 43a24a3d..b34177b5 100644 --- a/agentwire/tests/process.rs +++ b/agentwire/tests/process.rs @@ -103,7 +103,13 @@ async fn test_process() { broker.enable_doubler().unwrap(); let fence = Instant::now(); - broker.doubler.enabled().unwrap().send(port::Input::new(3)).await.unwrap(); + broker + .doubler + .enabled() + .unwrap() + .send(port::Input::new(3)) + .await + .unwrap(); broker.run_with_fence(&mut plan, fence).await.unwrap(); broker.disable_doubler(); diff --git a/agentwire/tests/task.rs b/agentwire/tests/task.rs index c55e62c9..d228b52c 100644 --- a/agentwire/tests/task.rs +++ b/agentwire/tests/task.rs @@ -82,7 +82,13 @@ async fn test_task() { broker.enable_doubler().unwrap(); let fence = Instant::now(); - broker.doubler.enabled().unwrap().send(port::Input::new(3)).await.unwrap(); + broker + .doubler + .enabled() + .unwrap() + .send(port::Input::new(3)) + .await + .unwrap(); broker.run_with_fence(&mut plan, fence).await.unwrap(); broker.disable_doubler(); diff --git a/agentwire/tests/thread.rs b/agentwire/tests/thread.rs index ce9c989b..87de6f4e 100644 --- a/agentwire/tests/thread.rs +++ b/agentwire/tests/thread.rs @@ -35,7 +35,9 @@ impl agent::Thread for Doubler { type Error = DoublerError; fn run(self, mut port: port::Inner) -> Result<(), Self::Error> { - let rt = runtime::Builder::new_current_thread().enable_all().build()?; + let rt = runtime::Builder::new_current_thread() + .enable_all() + .build()?; while let Some(x) = rt.block_on(port.next()) { rt.block_on(port.send(x.chain(x.value * 2)))?; } @@ -92,7 +94,13 @@ async fn test_thread() { broker.enable_doubler().unwrap(); let fence = Instant::now(); - broker.doubler.enabled().unwrap().send(port::Input::new(3)).await.unwrap(); + broker + .doubler + .enabled() + .unwrap() + .send(port::Input::new(3)) + .await + .unwrap(); broker.run_with_fence(&mut plan, fence).await.unwrap(); broker.disable_doubler();