Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

shuffle vec/array/tuple futures/streams for benchmark #117

Merged
merged 2 commits into from
Feb 9, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
remove unneccessary pin_project in countdown future/stream
wishawa committed Feb 2, 2023
commit c777f28d767f09c88235874e00f89c62158d2def
30 changes: 13 additions & 17 deletions benches/utils/countdown_futures.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use futures_core::Future;
use pin_project::pin_project;

use std::cell::{Cell, RefCell};
use std::collections::BinaryHeap;
@@ -45,7 +44,6 @@ pub fn futures_tuple() -> (
}

/// A future which will _eventually_ be ready, but needs to be polled N times before it is.
#[pin_project]
pub struct CountdownFuture {
state: State,
wakers: Rc<RefCell<BinaryHeap<PrioritizedWaker>>>,
@@ -73,40 +71,38 @@ impl CountdownFuture {
impl Future for CountdownFuture {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// If we are the last stream to be polled, skip strait to the Polled state.
if this.wakers.borrow().len() + 1 == *this.max_count {
*this.state = State::Polled;
if self.wakers.borrow().len() + 1 == self.max_count {
self.state = State::Polled;
}

match this.state {
match self.state {
State::Init => {
// Push our waker onto the stack so we get woken again someday.
this.wakers
self.wakers
.borrow_mut()
.push(PrioritizedWaker(*this.index, cx.waker().clone()));
*this.state = State::Polled;
.push(PrioritizedWaker(self.index, cx.waker().clone()));
self.state = State::Polled;
Poll::Pending
}
State::Polled => {
// Wake up the next one
let _ = this
let _ = self
.wakers
.borrow_mut()
.pop()
.map(|PrioritizedWaker(_, waker)| waker.wake());

if this.completed_count.get() == *this.index {
*this.state = State::Done;
this.completed_count.set(this.completed_count.get() + 1);
if self.completed_count.get() == self.index {
self.state = State::Done;
self.completed_count.set(self.completed_count.get() + 1);
Poll::Ready(())
} else {
// We're not done yet, so schedule another wakeup
this.wakers
self.wakers
.borrow_mut()
.push(PrioritizedWaker(*this.index, cx.waker().clone()));
.push(PrioritizedWaker(self.index, cx.waker().clone()));
Poll::Pending
}
}
30 changes: 13 additions & 17 deletions benches/utils/countdown_streams.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use futures_core::Stream;
use pin_project::pin_project;

use std::cell::{Cell, RefCell};
use std::collections::BinaryHeap;
@@ -45,7 +44,6 @@ pub fn streams_tuple() -> (
}

/// A stream which will _eventually_ be ready, but needs to be polled N times before it is.
#[pin_project]
pub struct CountdownStream {
state: State,
wakers: Rc<RefCell<BinaryHeap<PrioritizedWaker>>>,
@@ -73,40 +71,38 @@ impl CountdownStream {
impl Stream for CountdownStream {
type Item = ();

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// If we are the last stream to be polled, skip strait to the Polled state.
if this.wakers.borrow().len() + 1 == *this.max_count {
*this.state = State::Polled;
if self.wakers.borrow().len() + 1 == self.max_count {
self.state = State::Polled;
}

match this.state {
match &mut self.state {
State::Init => {
// Push our waker onto the stack so we get woken again someday.
this.wakers
self.wakers
.borrow_mut()
.push(PrioritizedWaker(*this.index, cx.waker().clone()));
*this.state = State::Polled;
.push(PrioritizedWaker(self.index, cx.waker().clone()));
self.state = State::Polled;
Poll::Pending
}
State::Polled => {
// Wake up the next one
let _ = this
let _ = self
.wakers
.borrow_mut()
.pop()
.map(|PrioritizedWaker(_, waker)| waker.wake());

if this.completed_count.get() == *this.index {
*this.state = State::Done;
this.completed_count.set(this.completed_count.get() + 1);
if self.completed_count.get() == self.index {
self.state = State::Done;
self.completed_count.set(self.completed_count.get() + 1);
Poll::Ready(Some(()))
} else {
// We're not done yet, so schedule another wakeup
this.wakers
self.wakers
.borrow_mut()
.push(PrioritizedWaker(*this.index, cx.waker().clone()));
.push(PrioritizedWaker(self.index, cx.waker().clone()));
Poll::Pending
}
}