Skip to content

Commit

Permalink
better readiness
Browse files Browse the repository at this point in the history
  • Loading branch information
wpbrown committed Aug 29, 2023
1 parent 75125cf commit 1c638ae
Showing 1 changed file with 45 additions and 13 deletions.
58 changes: 45 additions & 13 deletions src/sndlink.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::VecDeque;
use std::{convert::TryFrom, future::Future, mem, pin::Pin, task::Context, task::Poll};

use ntex::channel::condition::Waiter;
use ntex::channel::{condition, oneshot, pool};
use ntex::util::{ready, BufMut, ByteString, Bytes, Either, PoolRef, Ready};
use ntex_amqp_codec::protocol::{
Expand All @@ -17,6 +18,11 @@ pub struct SenderLink {
pub(crate) inner: Cell<SenderLinkInner>,
}

pub struct SenderLinkWaiter {
inner: Cell<SenderLinkInner>,
waiter: Waiter,
}

pub(crate) struct SenderLinkInner {
pub(crate) id: usize,
name: ByteString,
Expand Down Expand Up @@ -96,19 +102,17 @@ impl SenderLink {
/// Get notification when packet could be send to the peer.
///
/// Result indicates if connection is alive
pub async fn ready(&self) -> bool {
loop {
let waiter = {
let inner = self.inner.get_ref();
if inner.closed {
return false;
}
if inner.link_credit > 0 {
return true;
}
inner.on_credit.wait()
};
waiter.await
pub fn ready(&self) -> impl Future<Output = bool> {
match self.inner.get_ref().ready() {
Poll::Ready(value) => Either::Left(std::future::ready(value)),
Poll::Pending => Either::Right(self.waiter()),
}
}

pub fn waiter(&self) -> SenderLinkWaiter {
SenderLinkWaiter {
inner: self.inner.clone(),
waiter: self.inner.get_ref().on_credit.wait(),
}
}

Expand Down Expand Up @@ -571,6 +575,34 @@ impl SenderLinkInner {
buf.freeze()
})
}

fn ready(&self) -> Poll<bool> {
if self.closed {
Poll::Ready(false)
} else if self.link_credit > 0 {
Poll::Ready(true)
} else {
Poll::Pending
}
}
}

impl SenderLinkWaiter {
pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<bool> {
let result = self.inner.get_ref().ready();
if result.is_pending() {
let _ = self.waiter.poll_ready(cx);
}
result
}
}

impl Future for SenderLinkWaiter {
type Output = bool;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.poll_ready(cx)
}
}

pub struct SenderLinkBuilder {
Expand Down

0 comments on commit 1c638ae

Please sign in to comment.