From 669f0e4cef7253a00d4adaa4e4467c5e095991d0 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Thu, 28 Dec 2023 21:02:41 -0800 Subject: [PATCH 1/4] Port to event-listener v5.0 cc smol-rs/event-listener#105 Signed-off-by: John Nunley --- Cargo.toml | 10 ++++++++-- src/lib.rs | 53 +++++++++++++++++++++++------------------------------ 2 files changed, 31 insertions(+), 32 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 579de57..aa8f4e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,8 +22,8 @@ name = "broadcast_bench" [features] [dependencies] -event-listener = "3" -event-listener-strategy = "0.1.0" +event-listener = "4.0.1" +event-listener-strategy = "0.4.0" futures-core = "0.3.21" [dev-dependencies] @@ -32,3 +32,9 @@ doc-comment = "0.3.3" easy-parallel = "3.2.0" futures-lite = "1.11.3" futures-util = "0.3.21" + +[patch.crates-io] +event-listener = { git = "https://github.com/smol-rs/event-listener.git", branch = "notgull/break" } +event-listener-strategy = { git = "https://github.com/smol-rs/event-listener-strategy.git", branch = "notgull/evl5" } + + diff --git a/src/lib.rs b/src/lib.rs index 9ad33f3..3513b15 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -785,7 +785,7 @@ pub struct Receiver { pos: u64, /// Listens for a send or close event to unblock this stream. - listener: Option>>, + listener: Option, } impl Receiver { @@ -1599,8 +1599,7 @@ easy_wrapper! { #[derive(Debug)] struct SendInner<'a, T> { sender: &'a Sender, - // TODO: Remove the Pin> at the next breaking release and make this type !Unpin - listener: Option>>, + listener: Option, msg: Option, } @@ -1610,7 +1609,7 @@ impl<'a, T: Clone> EventListenerFuture for SendInner<'a, T> { type Output = Result, SendError>; fn poll_with_strategy<'x, S: event_listener_strategy::Strategy<'x>>( - self: Pin<&'x mut Self>, + self: Pin<&mut Self>, strategy: &mut S, context: &mut S::Context, ) -> Poll { @@ -1641,17 +1640,14 @@ impl<'a, T: Clone> EventListenerFuture for SendInner<'a, T> { } // Sending failed - now start listening for notifications or wait for one. - match &mut this.listener { - None => { - // Start listening and then try sending again. - let inner = inner.write().unwrap(); - this.listener = Some(inner.send_ops.listen()); - } - Some(l) => { - // Wait for a notification. - ready!(strategy.poll(l.as_mut(), context)); - this.listener = None; - } + if this.listener.is_none() { + // Start listening and then try sending again. + let inner = inner.write().unwrap(); + this.listener = Some(inner.send_ops.listen()); + } else { + // Wait for a notification. + ready!(strategy.poll(&mut this.listener, context)); + this.listener = None; } } } @@ -1668,7 +1664,7 @@ easy_wrapper! { #[derive(Debug)] struct RecvInner<'a, T> { receiver: &'a mut Receiver, - listener: Option>>, + listener: Option, } impl<'a, T> Unpin for RecvInner<'a, T> {} @@ -1677,7 +1673,7 @@ impl<'a, T: Clone> EventListenerFuture for RecvInner<'a, T> { type Output = Result; fn poll_with_strategy<'x, S: event_listener_strategy::Strategy<'x>>( - self: Pin<&'x mut Self>, + self: Pin<&mut Self>, strategy: &mut S, context: &mut S::Context, ) -> Poll { @@ -1695,19 +1691,16 @@ impl<'a, T: Clone> EventListenerFuture for RecvInner<'a, T> { } // Receiving failed - now start listening for notifications or wait for one. - match &mut this.listener { - None => { - // Start listening and then try receiving again. - this.listener = { - let inner = this.receiver.inner.write().unwrap(); - Some(inner.recv_ops.listen()) - }; - } - Some(l) => { - // Wait for a notification. - ready!(strategy.poll(l.as_mut(), context)); - this.listener = None; - } + if this.listener.is_none() { + // Start listening and then try receiving again. + this.listener = { + let inner = this.receiver.inner.write().unwrap(); + Some(inner.recv_ops.listen()) + }; + } else { + // Wait for a notification. + ready!(strategy.poll(&mut this.listener, context)); + this.listener = None; } } } From 7ed60db910342df45a96a26f4a40dd7e50519715 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Tue, 6 Feb 2024 19:20:56 -0800 Subject: [PATCH 2/4] chore: Use released event-listener Signed-off-by: John Nunley --- Cargo.toml | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index aa8f4e4..261c828 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,8 +22,8 @@ name = "broadcast_bench" [features] [dependencies] -event-listener = "4.0.1" -event-listener-strategy = "0.4.0" +event-listener = "5.0.0" +event-listener-strategy = "0.5.0" futures-core = "0.3.21" [dev-dependencies] @@ -32,9 +32,3 @@ doc-comment = "0.3.3" easy-parallel = "3.2.0" futures-lite = "1.11.3" futures-util = "0.3.21" - -[patch.crates-io] -event-listener = { git = "https://github.com/smol-rs/event-listener.git", branch = "notgull/break" } -event-listener-strategy = { git = "https://github.com/smol-rs/event-listener-strategy.git", branch = "notgull/evl5" } - - From 1308d614afbd1211dc9e4e858f119283b1adea00 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Tue, 6 Feb 2024 19:23:30 -0800 Subject: [PATCH 3/4] chore: Review comments Signed-off-by: John Nunley --- src/lib.rs | 42 ++++++++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 3513b15..6b1b6f8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1640,14 +1640,17 @@ impl<'a, T: Clone> EventListenerFuture for SendInner<'a, T> { } // Sending failed - now start listening for notifications or wait for one. - if this.listener.is_none() { - // Start listening and then try sending again. - let inner = inner.write().unwrap(); - this.listener = Some(inner.send_ops.listen()); - } else { - // Wait for a notification. - ready!(strategy.poll(&mut this.listener, context)); - this.listener = None; + match &this.listener { + None => { + // Start listening and then try sending again. + let inner = inner.write().unwrap(); + this.listener = Some(inner.send_ops.listen()); + } + Some(_) => { + // Wait for a notification. + ready!(strategy.poll(&mut this.listener, context)); + this.listener = None; + } } } } @@ -1691,16 +1694,19 @@ impl<'a, T: Clone> EventListenerFuture for RecvInner<'a, T> { } // Receiving failed - now start listening for notifications or wait for one. - if this.listener.is_none() { - // Start listening and then try receiving again. - this.listener = { - let inner = this.receiver.inner.write().unwrap(); - Some(inner.recv_ops.listen()) - }; - } else { - // Wait for a notification. - ready!(strategy.poll(&mut this.listener, context)); - this.listener = None; + match &this.listener { + None => { + // Start listening and then try receiving again. + this.listener = { + let inner = this.receiver.inner.write().unwrap(); + Some(inner.recv_ops.listen()) + }; + } + Some(_) => { + // Wait for a notification. + ready!(strategy.poll(&mut this.listener, context)); + this.listener = None; + } } } } From 3c45a8788f815e9cb3d258def804af46ce07a741 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Wed, 7 Feb 2024 19:49:51 -0800 Subject: [PATCH 4/4] breaking: Make futures !Unpin Signed-off-by: John Nunley --- Cargo.toml | 1 + src/lib.rs | 58 ++++++++++++++++++++++++++++++++---------------------- 2 files changed, 36 insertions(+), 23 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 261c828..be64235 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ name = "broadcast_bench" event-listener = "5.0.0" event-listener-strategy = "0.5.0" futures-core = "0.3.21" +pin-project-lite = "0.2.13" [dev-dependencies] criterion = "0.3.5" diff --git a/src/lib.rs b/src/lib.rs index 6b1b6f8..7450b76 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -111,6 +111,7 @@ use std::convert::TryInto; use std::error; use std::fmt; use std::future::Future; +use std::marker::PhantomPinned; use std::pin::Pin; use std::sync::{Arc, RwLock}; use std::task::{Context, Poll}; @@ -118,6 +119,7 @@ use std::task::{Context, Poll}; use event_listener::{Event, EventListener}; use event_listener_strategy::{easy_wrapper, EventListenerFuture}; use futures_core::{ready, stream::Stream}; +use pin_project_lite::pin_project; /// Create a new broadcast channel. /// @@ -695,6 +697,7 @@ impl Sender { sender: self, listener: None, msg: Some(msg), + _pin: PhantomPinned }) } @@ -1185,6 +1188,7 @@ impl Receiver { Recv::_new(RecvInner { receiver: self, listener: None, + _pin: PhantomPinned }) } @@ -1596,14 +1600,18 @@ easy_wrapper! { pub(crate) wait(); } -#[derive(Debug)] -struct SendInner<'a, T> { - sender: &'a Sender, - listener: Option, - msg: Option, -} +pin_project! { + #[derive(Debug)] + struct SendInner<'a, T> { + sender: &'a Sender, + listener: Option, + msg: Option, -impl<'a, T> Unpin for SendInner<'a, T> {} + // Keeping this type `!Unpin` enables future optimizations. + #[pin] + _pin: PhantomPinned + } +} impl<'a, T: Clone> EventListenerFuture for SendInner<'a, T> { type Output = Result, SendError>; @@ -1613,7 +1621,7 @@ impl<'a, T: Clone> EventListenerFuture for SendInner<'a, T> { strategy: &mut S, context: &mut S::Context, ) -> Poll { - let mut this = Pin::new(self); + let this = self.project(); loop { let msg = this.msg.take().unwrap(); @@ -1632,9 +1640,9 @@ impl<'a, T: Clone> EventListenerFuture for SendInner<'a, T> { return Poll::Ready(Ok(msg)); } Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))), - Err(TrySendError::Full(m)) => this.msg = Some(m), + Err(TrySendError::Full(m)) => *this.msg = Some(m), Err(TrySendError::Inactive(m)) if inner.read().unwrap().await_active => { - this.msg = Some(m) + *this.msg = Some(m) } Err(TrySendError::Inactive(m)) => return Poll::Ready(Err(SendError(m))), } @@ -1644,12 +1652,12 @@ impl<'a, T: Clone> EventListenerFuture for SendInner<'a, T> { None => { // Start listening and then try sending again. let inner = inner.write().unwrap(); - this.listener = Some(inner.send_ops.listen()); + *this.listener = Some(inner.send_ops.listen()); } Some(_) => { // Wait for a notification. - ready!(strategy.poll(&mut this.listener, context)); - this.listener = None; + ready!(strategy.poll(this.listener, context)); + *this.listener = None; } } } @@ -1664,13 +1672,17 @@ easy_wrapper! { pub(crate) wait(); } -#[derive(Debug)] -struct RecvInner<'a, T> { - receiver: &'a mut Receiver, - listener: Option, -} +pin_project! { + #[derive(Debug)] + struct RecvInner<'a, T> { + receiver: &'a mut Receiver, + listener: Option, -impl<'a, T> Unpin for RecvInner<'a, T> {} + // Keeping this type `!Unpin` enables future optimizations. + #[pin] + _pin: PhantomPinned + } +} impl<'a, T: Clone> EventListenerFuture for RecvInner<'a, T> { type Output = Result; @@ -1680,7 +1692,7 @@ impl<'a, T: Clone> EventListenerFuture for RecvInner<'a, T> { strategy: &mut S, context: &mut S::Context, ) -> Poll { - let mut this = Pin::new(self); + let this = self.project(); loop { // Attempt to receive a message. @@ -1697,15 +1709,15 @@ impl<'a, T: Clone> EventListenerFuture for RecvInner<'a, T> { match &this.listener { None => { // Start listening and then try receiving again. - this.listener = { + *this.listener = { let inner = this.receiver.inner.write().unwrap(); Some(inner.recv_ops.listen()) }; } Some(_) => { // Wait for a notification. - ready!(strategy.poll(&mut this.listener, context)); - this.listener = None; + ready!(strategy.poll(this.listener, context)); + *this.listener = None; } } }