diff --git a/book/src/blocking.md b/book/src/blocking.md index a04e9741d..391bfe8a4 100644 --- a/book/src/blocking.md +++ b/book/src/blocking.md @@ -185,13 +185,13 @@ fn main() -> Result<(), Box> { name: "GreeterName".to_string(), done: event_listener::Event::new(), }; - let done_listener = greeter.done.listen(); + let mut done_listener = greeter.done.listen(); let _handle = connection::Builder::session()? .name("org.zbus.MyGreeter")? .serve_at("/org/zbus/MyGreeter", greeter)? .build()?; - done_listener.wait(); + done_listener.as_mut().wait(); Ok(()) } diff --git a/book/src/server.md b/book/src/server.md index 9319ebd6e..89812722b 100644 --- a/book/src/server.md +++ b/book/src/server.md @@ -262,14 +262,14 @@ async fn main() -> Result<()> { name: "GreeterName".to_string(), done: event_listener::Event::new(), }; - let done_listener = greeter.done.listen(); + let mut done_listener = greeter.done.listen(); let _connection = Builder::session()? .name("org.zbus.MyGreeter")? .serve_at("/org/zbus/MyGreeter", greeter)? .build() .await?; - done_listener.wait(); + done_listener.as_mut().wait(); Ok(()) } diff --git a/zbus/Cargo.toml b/zbus/Cargo.toml index 275ee60a2..b8f20c214 100644 --- a/zbus/Cargo.toml +++ b/zbus/Cargo.toml @@ -45,15 +45,15 @@ zbus_macros = { path = "../zbus_macros", version = "=4.0.0" } enumflags2 = { version = "0.7.7", features = ["serde"] } derivative = "2.2" once_cell = "1.4.0" -async-io = { version = "1.12.0", optional = true } +async-io = { version = "2.2.2", optional = true } futures-core = "0.3.25" futures-sink = "0.3.25" futures-util = { version = "0.3.25", default-features = false, features = [ "sink", "std", ] } -async-lock = { version = "2.6.0", optional = true } -async-broadcast = "0.5.0" +async-lock = { version = "3.0.0", optional = true } +async-broadcast = "0.6.0" async-executor = { version = "1.5.0", optional = true } blocking = { version = "1.0.2", optional = true } async-task = { version = "4.3.0", optional = true } @@ -61,10 +61,10 @@ hex = "0.4.3" ordered-stream = "0.2" rand = "0.8.5" sha1 = { version = "0.10.5", features = ["std"] } -event-listener = "2.5.3" +event-listener = "4.0.1" static_assertions = "1.1.0" async-trait = "0.1.58" -async-fs = { version = "1.6.0", optional = true } +async-fs = { version = "2.0.0", optional = true } # FIXME: We should only enable process feature for Mac OS. See comment on async-process below for why we can't. tokio = { version = "1.21.2", optional = true, features = [ "rt", @@ -95,7 +95,7 @@ winapi = { version = "0.3", features = [ "winerror", "winsock2", ] } -uds_windows = "1.0.2" +uds_windows = "1.1.0" [target.'cfg(unix)'.dependencies] nix = { version = "0.27", default-features = false, features = [ @@ -107,7 +107,7 @@ nix = { version = "0.27", default-features = false, features = [ [target.'cfg(target_os = "macos")'.dependencies] # FIXME: This should only be enabled if async-io feature is enabled but currently # Cargo doesn't provide a way to do that for only specific target OS: https://github.com/rust-lang/cargo/issues/1197. -async-process = "1.7.0" +async-process = "2.0.0" [target.'cfg(any(target_os = "macos", windows))'.dependencies] async-recursion = "1.0.0" diff --git a/zbus/src/address.rs b/zbus/src/address.rs index 27efe6c37..c4558c083 100644 --- a/zbus/src/address.rs +++ b/zbus/src/address.rs @@ -333,6 +333,7 @@ impl Address { Address::Tcp(addr) => connect_tcp(addr).await.map(Stream::Tcp), Address::NonceTcp { addr, nonce_file } => { + #[allow(unused_mut)] let mut stream = connect_tcp(addr).await?; #[cfg(unix)] @@ -352,7 +353,7 @@ impl Address { while !nonce.is_empty() { let len = stream - .write_with_mut(|s| std::io::Write::write(s, nonce)) + .write_with(|mut s| std::io::Write::write(&mut s, nonce)) .await?; nonce = &nonce[len..]; } diff --git a/zbus/src/blocking/connection/mod.rs b/zbus/src/blocking/connection/mod.rs index ff968b055..203070a79 100644 --- a/zbus/src/blocking/connection/mod.rs +++ b/zbus/src/blocking/connection/mod.rs @@ -3,7 +3,7 @@ use enumflags2::BitFlags; use event_listener::EventListener; use static_assertions::assert_impl_all; -use std::{io, ops::Deref}; +use std::{io, ops::Deref, pin::Pin}; use zbus_names::{BusName, ErrorName, InterfaceName, MemberName, OwnedUniqueName, WellKnownName}; use zvariant::ObjectPath; @@ -241,7 +241,7 @@ impl Connection { /// Returns a listener, notified on various connection activity. /// /// This function is meant for the caller to implement idle or timeout on inactivity. - pub fn monitor_activity(&self) -> EventListener { + pub fn monitor_activity(&self) -> Pin> { self.inner.monitor_activity() } @@ -313,7 +313,9 @@ mod tests { }); let c = Builder::unix_stream(p1).p2p().build().unwrap(); - let listener = c.monitor_activity(); + + let mut listener = c.monitor_activity(); + let mut s = MessageIterator::from(&c); tx.send(()).unwrap(); let m = s.next().unwrap().unwrap(); @@ -326,11 +328,15 @@ mod tests { assert_eq!(val, "yay"); // there was some activity - listener.wait(); + listener.as_mut().wait(); // eventually, nothing happens and it will timeout loop { - let listener = c.monitor_activity(); - if !listener.wait_timeout(std::time::Duration::from_millis(10)) { + let mut listener = c.monitor_activity(); + if listener + .as_mut() + .wait_timeout(std::time::Duration::from_millis(10)) + .is_none() + { break; } } diff --git a/zbus/src/blocking/object_server.rs b/zbus/src/blocking/object_server.rs index bdc004774..30ef9c55e 100644 --- a/zbus/src/blocking/object_server.rs +++ b/zbus/src/blocking/object_server.rs @@ -113,13 +113,13 @@ where /// let connection = Connection::session()?; /// /// let quit_event = Event::new(); -/// let quit_listener = quit_event.listen(); +/// let mut quit_listener = quit_event.listen(); /// let interface = Example::new(quit_event); /// connection /// .object_server() /// .at("/org/zbus/path", interface)?; /// -/// quit_listener.wait(); +/// quit_listener.as_mut().wait(); /// # Ok::<_, Box>(()) /// ``` #[derive(Debug)] diff --git a/zbus/src/connection/mod.rs b/zbus/src/connection/mod.rs index 413ff3585..04177000c 100644 --- a/zbus/src/connection/mod.rs +++ b/zbus/src/connection/mod.rs @@ -1212,7 +1212,7 @@ impl Connection { /// Returns a listener, notified on various connection activity. /// /// This function is meant for the caller to implement idle or timeout on inactivity. - pub fn monitor_activity(&self) -> EventListener { + pub fn monitor_activity(&self) -> Pin> { self.inner.activity_event.listen() } diff --git a/zbus/src/connection/socket_reader.rs b/zbus/src/connection/socket_reader.rs index 74659df05..1d56e4338 100644 --- a/zbus/src/connection/socket_reader.rs +++ b/zbus/src/connection/socket_reader.rs @@ -72,7 +72,7 @@ impl SocketReader { } } - if let Err(e) = sender.broadcast(msg.clone()).await { + if let Err(e) = sender.broadcast_direct(msg.clone()).await { // An error would be due to either of these: // // 1. the channel is closed. diff --git a/zbus/src/object_server/mod.rs b/zbus/src/object_server/mod.rs index e76979060..2860b2fb3 100644 --- a/zbus/src/object_server/mod.rs +++ b/zbus/src/object_server/mod.rs @@ -7,6 +7,7 @@ use std::{ fmt::Write, marker::PhantomData, ops::{Deref, DerefMut}, + pin::Pin, sync::Arc, }; use tracing::{debug, instrument, trace}; @@ -809,7 +810,7 @@ pub struct ResponseDispatchNotifier { impl ResponseDispatchNotifier { /// Create a new `NotifyResponse`. - pub fn new(response: R) -> (Self, EventListener) { + pub fn new(response: R) -> (Self, Pin>) { let event = Event::new(); let listener = event.listen(); ( diff --git a/zbus/src/proxy/mod.rs b/zbus/src/proxy/mod.rs index 8afe1c514..3023e9763 100644 --- a/zbus/src/proxy/mod.rs +++ b/zbus/src/proxy/mod.rs @@ -209,7 +209,7 @@ where pub struct PropertyStream<'a, T> { name: &'a str, proxy: Proxy<'a>, - changed_listener: EventListener, + changed_listener: Pin>, phantom: std::marker::PhantomData, }