From 1c638aebd1f55da91107e9970f79504809d841a8 Mon Sep 17 00:00:00 2001 From: Will Brown Date: Tue, 29 Aug 2023 15:04:05 +0000 Subject: [PATCH] better readiness --- src/sndlink.rs | 58 +++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 45 insertions(+), 13 deletions(-) diff --git a/src/sndlink.rs b/src/sndlink.rs index d237d1e..352d69c 100644 --- a/src/sndlink.rs +++ b/src/sndlink.rs @@ -1,6 +1,7 @@ use std::collections::VecDeque; use std::{convert::TryFrom, future::Future, mem, pin::Pin, task::Context, task::Poll}; +use ntex::channel::condition::Waiter; use ntex::channel::{condition, oneshot, pool}; use ntex::util::{ready, BufMut, ByteString, Bytes, Either, PoolRef, Ready}; use ntex_amqp_codec::protocol::{ @@ -17,6 +18,11 @@ pub struct SenderLink { pub(crate) inner: Cell, } +pub struct SenderLinkWaiter { + inner: Cell, + waiter: Waiter, +} + pub(crate) struct SenderLinkInner { pub(crate) id: usize, name: ByteString, @@ -96,19 +102,17 @@ impl SenderLink { /// Get notification when packet could be send to the peer. /// /// Result indicates if connection is alive - pub async fn ready(&self) -> bool { - loop { - let waiter = { - let inner = self.inner.get_ref(); - if inner.closed { - return false; - } - if inner.link_credit > 0 { - return true; - } - inner.on_credit.wait() - }; - waiter.await + pub fn ready(&self) -> impl Future { + match self.inner.get_ref().ready() { + Poll::Ready(value) => Either::Left(std::future::ready(value)), + Poll::Pending => Either::Right(self.waiter()), + } + } + + pub fn waiter(&self) -> SenderLinkWaiter { + SenderLinkWaiter { + inner: self.inner.clone(), + waiter: self.inner.get_ref().on_credit.wait(), } } @@ -571,6 +575,34 @@ impl SenderLinkInner { buf.freeze() }) } + + fn ready(&self) -> Poll { + if self.closed { + Poll::Ready(false) + } else if self.link_credit > 0 { + Poll::Ready(true) + } else { + Poll::Pending + } + } +} + +impl SenderLinkWaiter { + pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll { + let result = self.inner.get_ref().ready(); + if result.is_pending() { + let _ = self.waiter.poll_ready(cx); + } + result + } +} + +impl Future for SenderLinkWaiter { + type Output = bool; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.poll_ready(cx) + } } pub struct SenderLinkBuilder {