From 0bb0236741937e75c507af2ba013621cc0522c2e Mon Sep 17 00:00:00 2001 From: Valentyn Valiaiev Date: Thu, 31 Oct 2024 18:35:54 +0200 Subject: [PATCH] Re-format --- agentwire/src/agent/process.rs | 54 ++++++-- agentwire/src/agent/task.rs | 10 +- agentwire/src/agent/thread.rs | 5 +- agentwire/src/lib.rs | 3 +- agentwire/src/port.rs | 239 +++++++++++++++++++++++++-------- agentwire/src/testing_rt.rs | 18 ++- agentwire/tests/process.rs | 8 +- agentwire/tests/task.rs | 8 +- agentwire/tests/thread.rs | 12 +- 9 files changed, 277 insertions(+), 80 deletions(-) diff --git a/agentwire/src/agent/process.rs b/agentwire/src/agent/process.rs index 83373408..439cee5a 100644 --- a/agentwire/src/agent/process.rs +++ b/agentwire/src/agent/process.rs @@ -13,7 +13,10 @@ use nix::{ sys::signal::{self, Signal}, unistd::Pid, }; -use rkyv::{de::deserializers::SharedDeserializeMap, Archive, Deserialize, Infallible, Serialize}; +use rkyv::{ + de::deserializers::SharedDeserializeMap, Archive, Deserialize, Infallible, + Serialize, +}; use std::{ env, error::Error, @@ -104,7 +107,8 @@ where ::Archived: Deserialize, Self::Input: Archive + for<'a> Serialize>, Self::Output: Archive + for<'a> Serialize>, - ::Archived: Deserialize, + ::Archived: + Deserialize, { /// Error type returned by the agent. type Error: Debug; @@ -136,9 +140,13 @@ where wait_kill_rx.await.unwrap(); tracing::info!("Process agent {} killed", Self::NAME); }; - let spawn_process = spawn_process_impl(self, inner, send_kill_rx, wait_kill_tx, logger); + let spawn_process = + spawn_process_impl(self, inner, send_kill_rx, wait_kill_tx, logger); spawn_named_thread(format!("proc-ipc-{}", Self::NAME), || { - let rt = runtime::Builder::new_current_thread().enable_all().build().unwrap(); + let rt = runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); rt.block_on(task::LocalSet::new().run_until(spawn_process)); }); (outer, kill.boxed()) @@ -171,7 +179,9 @@ where /// This function must be called as early in the program lifetime as possible. /// Everything before this function call gets duplicated for each process-based /// agent. -pub fn init(call_process_agent: impl FnOnce(&str, OwnedFd) -> Result<(), Box>) { +pub fn init( + call_process_agent: impl FnOnce(&str, OwnedFd) -> Result<(), Box>, +) { match (env::var(SHMEM_ENV), env::var(PARENT_PID_ENV)) { (Ok(shmem), Ok(parent_pid)) => { let result = unsafe { libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL) }; @@ -188,7 +198,9 @@ pub fn init(call_process_agent: impl FnOnce(&str, OwnedFd) -> Result<(), Box().expect("shared memory file descriptor to be an integer"), + shmem + .parse::() + .expect("shared memory file descriptor to be an integer"), ) }; // Agent's name is the first argument. @@ -198,7 +210,9 @@ pub fn init(call_process_agent: impl FnOnce(&str, OwnedFd) -> Result<(), Box tracing::warn!("Agent {name} exited"), - Err(err) => tracing::error!("Agent {name} exited with an error: {err:#?}"), + Err(err) => { + tracing::error!("Agent {name} exited with an error: {err:#?}") + } } process::exit(1); } @@ -215,7 +229,11 @@ pub fn init(call_process_agent: impl FnOnce(&str, OwnedFd) -> Result<(), Box( let (shmem_fd, close) = inner .into_shared_memory(T::NAME, &init_state, recovered_inputs) .expect("couldn't initialize shared memory"); - let exe = env::current_exe().expect("couldn't determine current executable file"); + let exe = + env::current_exe().expect("couldn't determine current executable file"); let initializer = T::initializer(); let mut child_fds = initializer.keep_file_descriptors(); @@ -275,7 +294,10 @@ async fn spawn_process_impl( .arg0(format!("proc-{}", T::NAME)) .args( env::var(ARGS_ENV) - .map(|args| shell_words::split(&args).expect("invalid process arguments")) + .map(|args| { + shell_words::split(&args) + .expect("invalid process arguments") + }) .unwrap_or_default(), ) .envs(initializer.envs()) @@ -295,8 +317,16 @@ async fn spawn_process_impl( drop(shmem_fd); drop(initializer); let pid = Pid::from_raw(child.id().unwrap().try_into().unwrap()); - task::spawn(logger(T::NAME, child.stdout.take().unwrap(), child.stderr.take().unwrap())); - tracing::info!("Process agent {} spawned with PID: {}", T::NAME, pid.as_raw()); + task::spawn(logger( + T::NAME, + child.stdout.take().unwrap(), + child.stderr.take().unwrap(), + )); + tracing::info!( + "Process agent {} spawned with PID: {}", + T::NAME, + pid.as_raw() + ); match future::select(Box::pin(child.wait()), &mut send_kill_rx).await { Either::Left((status, _)) => { let status = status.expect("failed to run a sub-process"); diff --git a/agentwire/src/agent/task.rs b/agentwire/src/agent/task.rs index 37954bb7..64d017cb 100644 --- a/agentwire/src/agent/task.rs +++ b/agentwire/src/agent/task.rs @@ -10,7 +10,10 @@ pub trait Task: Agent + Send { type Error: Debug; /// Runs the agent event-loop inside a dedicated asynchronous task. - fn run(self, port: port::Inner) -> impl Future> + Send; + fn run( + self, + port: port::Inner, + ) -> impl Future> + Send; /// Spawns a new task running the agent event-loop and returns a handle for /// bi-directional communication with the agent. @@ -23,7 +26,10 @@ pub trait Task: Agent + Send { tracing::warn!("Task agent {} exited", Self::NAME); } Err(err) => { - tracing::error!("Task agent {} exited with error: {err:#?}", Self::NAME); + tracing::error!( + "Task agent {} exited with error: {err:#?}", + Self::NAME + ); } } }); diff --git a/agentwire/src/agent/thread.rs b/agentwire/src/agent/thread.rs index 8fd4367f..015658f7 100644 --- a/agentwire/src/agent/thread.rs +++ b/agentwire/src/agent/thread.rs @@ -22,7 +22,10 @@ pub trait Thread: Agent + Send { tracing::warn!("Thread agent {} exited", Self::NAME); } Err(err) => { - tracing::error!("Thread agent {} exited with error: {err:#?}", Self::NAME); + tracing::error!( + "Thread agent {} exited with error: {err:#?}", + Self::NAME + ); } } }); diff --git a/agentwire/src/lib.rs b/agentwire/src/lib.rs index 911f9126..8858f5fb 100644 --- a/agentwire/src/lib.rs +++ b/agentwire/src/lib.rs @@ -336,7 +336,8 @@ where .name(name.clone()) .spawn(move || { if let Ok(title) = CString::new(name.as_bytes()) { - let result = unsafe { libc::prctl(libc::PR_SET_NAME, title.as_ptr(), 0, 0, 0) }; + let result = + unsafe { libc::prctl(libc::PR_SET_NAME, title.as_ptr(), 0, 0, 0) }; if result == -1 { eprintln!( "failed to set thread name to '{name}': {:#?}", diff --git a/agentwire/src/port.rs b/agentwire/src/port.rs index 6908ba26..a225956a 100644 --- a/agentwire/src/port.rs +++ b/agentwire/src/port.rs @@ -111,8 +111,8 @@ use rkyv::{ de::deserializers::SharedDeserializeMap, ser::{ serializers::{ - AllocScratch, BufferSerializer, CompositeSerializer, FallbackScratch, HeapScratch, - SharedSerializeMap, + AllocScratch, BufferSerializer, CompositeSerializer, FallbackScratch, + HeapScratch, SharedSerializeMap, }, Serializer, }, @@ -215,7 +215,8 @@ pub trait SharedPort: Port where Self::Input: Archive + for<'a> Serialize>, Self::Output: Archive + for<'a> Serialize>, - ::Archived: Deserialize, + ::Archived: + Deserialize, { /// Buffer size for input messages. Must be at least `size_of::()` /// for a zero-sized input. @@ -312,26 +313,41 @@ type InitialInputs = Vec<(Box<[u8]>, Instant)>; pub fn new() -> (Inner, Outer) { let (input_tx, input_rx) = mpsc::channel(T::INPUT_CAPACITY); let (output_tx, output_rx) = mpsc::channel(T::OUTPUT_CAPACITY); - let inner = Inner { tx: output_tx, rx: input_rx }; - let outer = Outer { tx: input_tx, rx: output_rx }; + let inner = Inner { + tx: output_tx, + rx: input_rx, + }; + let outer = Outer { + tx: input_tx, + rx: output_rx, + }; (inner, outer) } impl Input { /// Creates a new input value with the source timestamp of now. pub fn new(value: T::Input) -> Self { - Self { value, source_ts: Instant::now() } + Self { + value, + source_ts: Instant::now(), + } } /// Creates a new input value with the source timestamp of the original /// input. pub fn derive(&self, value: O::Input) -> Input { - Input { value, source_ts: self.source_ts } + Input { + value, + source_ts: self.source_ts, + } } /// Creates a new output value with the source timestamp of the input. pub fn chain(&self, value: T::Output) -> Output { - Output { value, source_ts: self.source_ts } + Output { + value, + source_ts: self.source_ts, + } } /// Returns a closure, which creates a new output value with the source @@ -348,7 +364,10 @@ where { /// Creates a new output value with the source timestamp of the input. pub fn chain(&self, value: T::Output) -> Output { - Output { value, source_ts: self.source_ts } + Output { + value, + source_ts: self.source_ts, + } } /// Returns a closure, which creates a new output value with the source @@ -362,13 +381,19 @@ where impl Output { /// Creates a new output value with the source timestamp of now. pub fn new(value: T::Output) -> Self { - Self { value, source_ts: Instant::now() } + Self { + value, + source_ts: Instant::now(), + } } /// Creates a new output value with the source timestamp of the original /// output. pub fn derive(&self, value: O::Output) -> Output { - Output { value, source_ts: self.source_ts } + Output { + value, + source_ts: self.source_ts, + } } /// Returns a closure, which creates a new output value with the source @@ -380,7 +405,10 @@ impl Output { /// Creates a new input value with the source timestamp of the output. pub fn chain(&self, value: O::Input) -> Input { - Input { value, source_ts: self.source_ts } + Input { + value, + source_ts: self.source_ts, + } } /// Returns a closure, which creates a new input value with the source @@ -400,7 +428,10 @@ impl Outer { /// from the agent. Instead the broker sends a message to the agent and /// blocks until it's received by the agent. #[allow(clippy::mut_mut)] // triggered by `select!` internals - pub async fn send_unjam(&mut self, message: Input) -> Result<(), SendUnjamError> { + pub async fn send_unjam( + &mut self, + message: Input, + ) -> Result<(), SendUnjamError> { let mut send = self.tx.send(message).fuse(); let mut recv = self.rx.next(); loop { @@ -418,7 +449,10 @@ impl Outer { impl Stream for Outer { type Item = Output; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { Pin::new(&mut self.rx).poll_next(cx) } } @@ -432,7 +466,10 @@ impl FusedStream for Outer { impl Sink> for Outer { type Error = SendError; - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_ready( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { Pin::new(&mut self.tx).poll_ready(cx) } @@ -440,11 +477,17 @@ impl Sink> for Outer { Pin::new(&mut self.tx).start_send(item) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { Pin::new(&mut self.tx).poll_flush(cx) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_close( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { Pin::new(&mut self.tx).poll_close(cx) } } @@ -452,7 +495,10 @@ impl Sink> for Outer { impl Stream for Inner { type Item = Input; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { Pin::new(&mut self.rx).poll_next(cx) } } @@ -466,19 +512,31 @@ impl FusedStream for Inner { impl Sink> for Inner { type Error = SendError; - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_ready( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { Pin::new(&mut self.tx).poll_ready(cx) } - fn start_send(mut self: Pin<&mut Self>, item: Output) -> Result<(), Self::Error> { + fn start_send( + mut self: Pin<&mut Self>, + item: Output, + ) -> Result<(), Self::Error> { Pin::new(&mut self.tx).start_send(item) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { Pin::new(&mut self.tx).poll_flush(cx) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_close( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { Pin::new(&mut self.tx).poll_close(cx) } } @@ -526,13 +584,19 @@ where NonZeroUsize::new(size).expect("to always be positive") } - unsafe fn create(name: &str) -> Result<(*mut Self, OwnedFd), CreateSharedMemoryError> { + unsafe fn create( + name: &str, + ) -> Result<(*mut Self, OwnedFd), CreateSharedMemoryError> { let size = Self::size_of(); let name = CString::new(name).map_err(CreateSharedMemoryError::InvalidName)?; let raw_fd = memfd_create(&name, MemFdCreateFlag::empty()) - .map_err(CreateSharedMemoryError::MemfdCreate)? as RawFd; + .map_err(CreateSharedMemoryError::MemfdCreate)? + as RawFd; let fd = unsafe { OwnedFd::from_raw_fd(raw_fd) }; - let len = size.get().try_into().expect("shared memory size is extremely large"); + let len = size + .get() + .try_into() + .expect("shared memory size is extremely large"); ftruncate(fd.as_raw_fd(), len).map_err(CreateSharedMemoryError::Ftruncate)?; let ptr = unsafe { mmap( @@ -547,10 +611,14 @@ where .cast::() }; unsafe { - sem_init(&mut (*ptr).input_tx, 1, 0).map_err(CreateSharedMemoryError::SemInit)?; - sem_init(&mut (*ptr).input_rx, 1, 0).map_err(CreateSharedMemoryError::SemInit)?; - sem_init(&mut (*ptr).output_tx, 1, 1).map_err(CreateSharedMemoryError::SemInit)?; - sem_init(&mut (*ptr).output_rx, 1, 0).map_err(CreateSharedMemoryError::SemInit)?; + sem_init(&mut (*ptr).input_tx, 1, 0) + .map_err(CreateSharedMemoryError::SemInit)?; + sem_init(&mut (*ptr).input_rx, 1, 0) + .map_err(CreateSharedMemoryError::SemInit)?; + sem_init(&mut (*ptr).output_tx, 1, 1) + .map_err(CreateSharedMemoryError::SemInit)?; + sem_init(&mut (*ptr).output_rx, 1, 0) + .map_err(CreateSharedMemoryError::SemInit)?; (*ptr).input_count = 0; (*ptr).input_index = 0; } @@ -575,11 +643,16 @@ where unsafe fn destroy(ptr: *mut Self) -> Result<(), DestroySharedMemoryError> { unsafe { - sem_destroy(&mut (*ptr).input_tx).map_err(DestroySharedMemoryError::SemDestroy)?; - sem_destroy(&mut (*ptr).input_rx).map_err(DestroySharedMemoryError::SemDestroy)?; - sem_destroy(&mut (*ptr).output_tx).map_err(DestroySharedMemoryError::SemDestroy)?; - sem_destroy(&mut (*ptr).output_rx).map_err(DestroySharedMemoryError::SemDestroy)?; - munmap(ptr.cast(), Self::size_of().get()).map_err(DestroySharedMemoryError::Munmap)?; + sem_destroy(&mut (*ptr).input_tx) + .map_err(DestroySharedMemoryError::SemDestroy)?; + sem_destroy(&mut (*ptr).input_rx) + .map_err(DestroySharedMemoryError::SemDestroy)?; + sem_destroy(&mut (*ptr).output_tx) + .map_err(DestroySharedMemoryError::SemDestroy)?; + sem_destroy(&mut (*ptr).output_rx) + .map_err(DestroySharedMemoryError::SemDestroy)?; + munmap(ptr.cast(), Self::size_of().get()) + .map_err(DestroySharedMemoryError::Munmap)?; } Ok(()) } @@ -596,7 +669,10 @@ where unsafe fn input(&mut self, n: usize) -> &mut [u8] { unsafe { slice::from_raw_parts_mut( - ptr::addr_of_mut!(*self).add(1).cast::().add(T::SERIALIZED_INPUT_SIZE * n), + ptr::addr_of_mut!(*self) + .add(1) + .cast::() + .add(T::SERIALIZED_INPUT_SIZE * n), T::SERIALIZED_INPUT_SIZE, ) } @@ -605,7 +681,10 @@ where unsafe fn output(&mut self) -> &mut [u8] { unsafe { slice::from_raw_parts_mut( - ptr::addr_of_mut!(*self).add(1).cast::().add(T::SERIALIZED_INPUT_SIZE * 2), + ptr::addr_of_mut!(*self) + .add(1) + .cast::() + .add(T::SERIALIZED_INPUT_SIZE * 2), T::SERIALIZED_OUTPUT_SIZE, ) } @@ -627,7 +706,10 @@ where init_state: &T, initial_inputs: InitialInputs, ) -> Result< - (OwnedFd, impl Future>), + ( + OwnedFd, + impl Future>, + ), CreateSharedMemoryError, > { let Self { tx, rx } = self; @@ -647,7 +729,9 @@ where let shared_memory = addr as *mut SharedMemory; assert!((*shared_memory).input_count <= 2); for mut i in 0..(*shared_memory).input_count { - if (*shared_memory).input_count == 2 && (*shared_memory).input_index == 0 { + if (*shared_memory).input_count == 2 + && (*shared_memory).input_index == 0 + { i = (i + 1) % 2; } let input = Box::from(&*(*shared_memory).input(i)); @@ -682,7 +766,8 @@ where #[allow(clippy::missing_panics_doc)] pub fn init_state(&mut self) -> &::Archived { unsafe { - let init_state = deserialize_message::((*self.shared_memory).init_state()); + let init_state = + deserialize_message::((*self.shared_memory).init_state()); sem_post(&mut (*self.shared_memory).input_tx).expect("semaphore failure"); init_state } @@ -694,7 +779,9 @@ where unsafe { sem_wait(&mut (*self.shared_memory).input_rx).expect("semaphore failure"); let input_index = 1 - (*self.shared_memory).input_index; - let value = deserialize_message::((*self.shared_memory).input(input_index)); + let value = deserialize_message::( + (*self.shared_memory).input(input_index), + ); let source_ts = (*self.shared_memory).input_ts[input_index]; sem_post(&mut (*self.shared_memory).input_tx).expect("semaphore failure"); ArchivedInput { value, source_ts } @@ -706,7 +793,10 @@ where #[allow(clippy::missing_panics_doc)] pub fn try_recv(&mut self) -> Option> { unsafe { - if sem_getvalue(&mut (*self.shared_memory).input_rx).expect("semaphore failure") > 0 { + if sem_getvalue(&mut (*self.shared_memory).input_rx) + .expect("semaphore failure") + > 0 + { Some(self.recv()) } else { None @@ -719,7 +809,11 @@ where pub fn send(&mut self, output: &Output) { unsafe { sem_wait(&mut (*self.shared_memory).output_tx).expect("semaphore failure"); - serialize_message((*self.shared_memory).output(), &mut self.scratch, &output.value); + serialize_message( + (*self.shared_memory).output(), + &mut self.scratch, + &output.value, + ); (*self.shared_memory).output_ts = output.source_ts; sem_post(&mut (*self.shared_memory).output_rx).expect("semaphore failure"); } @@ -730,7 +824,10 @@ where #[allow(clippy::missing_panics_doc)] pub fn try_send(&mut self, output: &Output) -> bool { unsafe { - if sem_getvalue(&mut (*self.shared_memory).output_tx).expect("semaphore failure") > 0 { + if sem_getvalue(&mut (*self.shared_memory).output_tx) + .expect("semaphore failure") + > 0 + { self.send(output); true } else { @@ -752,7 +849,9 @@ fn serialize_message( scratch.take().unwrap(), SharedSerializeMap::new(), // reuse of this map doesn't work ); - serializer.serialize_value(value).expect("failed to serialize an IPC message"); + serializer + .serialize_value(value) + .expect("failed to serialize an IPC message"); let size = serializer.pos(); let (_, c, _) = serializer.into_components(); buf[..mem::size_of::()].copy_from_slice(&size.to_ne_bytes()); @@ -804,19 +903,24 @@ where }; let mut sem_wait = spawn_sem_wait(); loop { - if let Either::Left((_, sem_wait)) = select(&mut stop_tx_rx, sem_wait).await { + if let Either::Left((_, sem_wait)) = select(&mut stop_tx_rx, sem_wait).await + { unsafe { let shared_memory = addr as *mut SharedMemory; - sem_post(&mut (*shared_memory).output_rx).expect("semaphore failure"); + sem_post(&mut (*shared_memory).output_rx) + .expect("semaphore failure"); } sem_wait.await.unwrap(); break; } let (value, source_ts) = unsafe { let shared_memory = addr as *mut SharedMemory; - let archived = deserialize_message::((*shared_memory).output()); + let archived = + deserialize_message::((*shared_memory).output()); // Reuse of `SharedDeserializeMap` doesn't work - let value = archived.deserialize(&mut SharedDeserializeMap::new()).unwrap(); + let value = archived + .deserialize(&mut SharedDeserializeMap::new()) + .unwrap(); let source_ts = (*shared_memory).output_ts; sem_post(&mut (*shared_memory).output_tx).expect("semaphore failure"); (value, source_ts) @@ -855,10 +959,12 @@ where let mut sem_wait = spawn_sem_wait(); let mut scratch = Some(FallbackScratch::default()); loop { - if let Either::Left((_, sem_wait)) = select(&mut stop_rx_rx, sem_wait).await { + if let Either::Left((_, sem_wait)) = select(&mut stop_rx_rx, sem_wait).await + { unsafe { let shared_memory = addr as *mut SharedMemory; - sem_post(&mut (*shared_memory).input_tx).expect("semaphore failure"); + sem_post(&mut (*shared_memory).input_tx) + .expect("semaphore failure"); } sem_wait.await.unwrap(); break; @@ -874,7 +980,8 @@ where unsafe { let shared_memory = addr as *mut SharedMemory; let input_index = (*shared_memory).input_index; - (*shared_memory).input_count = ((*shared_memory).input_count + 1).min(2); + (*shared_memory).input_count = + ((*shared_memory).input_count + 1).min(2); (*shared_memory).input_index = ((*shared_memory).input_index + 1) % 2; match input { Either::Left((input, input_ts)) => { @@ -904,26 +1011,46 @@ where unsafe fn sem_init(sem: *mut sem_t, pshared: c_int, value: c_uint) -> io::Result<()> { let result = unsafe { libc::sem_init(sem, pshared, value) }; - if result == -1 { Err(io::Error::last_os_error()) } else { Ok(()) } + if result == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } } unsafe fn sem_destroy(sem: *mut sem_t) -> io::Result<()> { let result = unsafe { libc::sem_destroy(sem) }; - if result == -1 { Err(io::Error::last_os_error()) } else { Ok(()) } + if result == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } } unsafe fn sem_post(sem: *mut sem_t) -> io::Result<()> { let result = unsafe { libc::sem_post(sem) }; - if result == -1 { Err(io::Error::last_os_error()) } else { Ok(()) } + if result == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } } unsafe fn sem_getvalue(sem: *mut sem_t) -> io::Result { let mut value = 0; let result = unsafe { libc::sem_getvalue(sem, &mut value) }; - if result == -1 { Err(io::Error::last_os_error()) } else { Ok(value) } + if result == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(value) + } } unsafe fn sem_wait(sem: *mut sem_t) -> io::Result<()> { let result = unsafe { libc::sem_wait(sem) }; - if result == -1 { Err(io::Error::last_os_error()) } else { Ok(()) } + if result == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } } diff --git a/agentwire/src/testing_rt.rs b/agentwire/src/testing_rt.rs index 14713d8a..9a69b9d1 100644 --- a/agentwire/src/testing_rt.rs +++ b/agentwire/src/testing_rt.rs @@ -31,7 +31,11 @@ pub fn run_broker_test( if env::var(BROKER_TEST_ID_ENV).map_or(false, |var| var == test_id) { let result = catch_unwind(AssertUnwindSafe(|| { init(); - tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap().block_on(f); + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(f); })); process::exit(result.is_err().into()); } @@ -64,15 +68,21 @@ pub fn run_broker_test( child_args.push("--exact".into()); child_args.push("--".into()); child_args.push(test_name.into()); - let result = - runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async { + let result = runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { let mut child = Command::new(env::current_exe().unwrap()) .args(&child_args) .env(BROKER_TEST_ID_ENV, test_id) .env(agent::process::ARGS_ENV, shell_words::join(&child_args)) .spawn() .unwrap(); - time::timeout(timeout, child.wait()).await.expect("timeouted").unwrap() + time::timeout(timeout, child.wait()) + .await + .expect("timeouted") + .unwrap() }); assert!(result.success(), "test failed"); } diff --git a/agentwire/tests/process.rs b/agentwire/tests/process.rs index 43a24a3d..b34177b5 100644 --- a/agentwire/tests/process.rs +++ b/agentwire/tests/process.rs @@ -103,7 +103,13 @@ async fn test_process() { broker.enable_doubler().unwrap(); let fence = Instant::now(); - broker.doubler.enabled().unwrap().send(port::Input::new(3)).await.unwrap(); + broker + .doubler + .enabled() + .unwrap() + .send(port::Input::new(3)) + .await + .unwrap(); broker.run_with_fence(&mut plan, fence).await.unwrap(); broker.disable_doubler(); diff --git a/agentwire/tests/task.rs b/agentwire/tests/task.rs index c55e62c9..d228b52c 100644 --- a/agentwire/tests/task.rs +++ b/agentwire/tests/task.rs @@ -82,7 +82,13 @@ async fn test_task() { broker.enable_doubler().unwrap(); let fence = Instant::now(); - broker.doubler.enabled().unwrap().send(port::Input::new(3)).await.unwrap(); + broker + .doubler + .enabled() + .unwrap() + .send(port::Input::new(3)) + .await + .unwrap(); broker.run_with_fence(&mut plan, fence).await.unwrap(); broker.disable_doubler(); diff --git a/agentwire/tests/thread.rs b/agentwire/tests/thread.rs index ce9c989b..87de6f4e 100644 --- a/agentwire/tests/thread.rs +++ b/agentwire/tests/thread.rs @@ -35,7 +35,9 @@ impl agent::Thread for Doubler { type Error = DoublerError; fn run(self, mut port: port::Inner) -> Result<(), Self::Error> { - let rt = runtime::Builder::new_current_thread().enable_all().build()?; + let rt = runtime::Builder::new_current_thread() + .enable_all() + .build()?; while let Some(x) = rt.block_on(port.next()) { rt.block_on(port.send(x.chain(x.value * 2)))?; } @@ -92,7 +94,13 @@ async fn test_thread() { broker.enable_doubler().unwrap(); let fence = Instant::now(); - broker.doubler.enabled().unwrap().send(port::Input::new(3)).await.unwrap(); + broker + .doubler + .enabled() + .unwrap() + .send(port::Input::new(3)) + .await + .unwrap(); broker.run_with_fence(&mut plan, fence).await.unwrap(); broker.disable_doubler();