Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Waker optimization + O(woken) polling for every combinator except chain #115

Closed
wants to merge 13 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
O(woken) polling for join,merge,zip
wishawa committed Dec 29, 2022
commit 4e4f8ec9e9ab1ea0304ec83a2ad2e2f938079bb4
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@ repository = "https://github.com/yoshuawuyts/futures-concurrency"
documentation = "https://docs.rs/futures-concurrency"
description = "Structured concurrency operations for async Rust"
readme = "README.md"
edition = "2018"
edition = "2021"
keywords = []
categories = []
authors = [
100 changes: 43 additions & 57 deletions src/future/join/array.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::Join as JoinTrait;
use crate::utils::{self, PollArray, WakerArray};
use crate::utils::{self, WakerArray};

use core::array;
use core::fmt;
@@ -23,11 +23,11 @@ pub struct Join<Fut, const N: usize>
where
Fut: Future,
{
consumed: bool,
pending: usize,
items: [MaybeUninit<<Fut as Future>::Output>; N],
wakers: WakerArray<N>,
state: PollArray<N>,
filled: [bool; N],
awake_list_buffer: [usize; N],
#[pin]
futures: [Fut; N],
}
@@ -39,11 +39,11 @@ where
#[inline]
pub(crate) fn new(futures: [Fut; N]) -> Self {
Join {
consumed: false,
pending: N,
items: array::from_fn(|_| MaybeUninit::uninit()),
wakers: WakerArray::new(),
state: PollArray::new(),
filled: [false; N],
awake_list_buffer: [0; N],
futures,
}
}
@@ -68,7 +68,7 @@ where
Fut::Output: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_list().entries(self.state.iter()).finish()
f.debug_list().entries(self.futures.iter()).finish()
}
}

@@ -83,47 +83,40 @@ where
let mut this = self.project();

assert!(
!*this.consumed,
N == 0 || *this.pending > 0,
"Futures must not be polled after completing"
);

let mut readiness = this.wakers.readiness().lock().unwrap();
readiness.set_waker(cx.waker());
if !readiness.any_ready() {
// Nothing is ready yet
return Poll::Pending;
}

// Poll all ready futures
for (i, fut) in utils::iter_pin_mut(this.futures.as_mut()).enumerate() {
if this.state[i].is_pending() && readiness.clear_ready(i) {
// unlock readiness so we don't deadlock when polling
drop(readiness);

// Obtain the intermediate waker.
let mut cx = Context::from_waker(this.wakers.get(i).unwrap());

if let Poll::Ready(value) = fut.poll(&mut cx) {
this.items[i] = MaybeUninit::new(value);
this.state[i].set_ready();
*this.pending -= 1;
}

// Lock readiness so we can use it again
readiness = this.wakers.readiness().lock().unwrap();
let num_awake = {
let mut awakeness = this.wakers.awakeness();
awakeness.set_parent_waker(cx.waker());
let awake_list = awakeness.awake_list();
let num_awake = awake_list.len();
this.awake_list_buffer[..num_awake].copy_from_slice(awake_list);
awakeness.clear();
num_awake
};

for &idx in this.awake_list_buffer.iter().take(num_awake) {
let filled = &mut this.filled[idx];
if *filled {
// Woken future is already complete, don't poll it again.
continue;
}
let fut = utils::get_pin_mut(this.futures.as_mut(), idx).unwrap();
let mut cx = Context::from_waker(this.wakers.get(idx).unwrap());
if let Poll::Ready(value) = fut.poll(&mut cx) {
this.items[idx].write(value);
*filled = true;
*this.pending -= 1;
}
}

// Check whether we're all done now or need to keep going.
if *this.pending == 0 {
// Mark all data as "consumed" before we take it
*this.consumed = true;
for state in this.state.iter_mut() {
debug_assert!(
state.is_ready(),
"Future should have reached a `Ready` state"
);
state.set_consumed();
for filled in this.filled.iter_mut() {
debug_assert!(*filled, "Future should have filled items array");
*filled = false;
}

let mut items = array::from_fn(|_| MaybeUninit::uninit());
@@ -148,19 +141,12 @@ where
fn drop(self: Pin<&mut Self>) {
let this = self.project();

// Get the indexes of the initialized values.
let indexes = this
.state
.iter_mut()
.enumerate()
.filter(|(_, state)| state.is_ready())
.map(|(i, _)| i);

// Drop each value at the index.
for i in indexes {
// SAFETY: we've just filtered down to *only* the initialized values.
// We can assume they're initialized, and this is where we drop them.
unsafe { this.items[i].assume_init_drop() };
for (&filled, output) in this.filled.iter().zip(this.items.iter_mut()) {
if filled {
// SAFETY: we've just filtered down to *only* the initialized values.
// We can assume they're initialized, and this is where we drop them.
unsafe { output.assume_init_drop() };
}
}
}
}
@@ -169,13 +155,13 @@ where
mod test {
use futures_lite::future::yield_now;

use crate::utils::dummy_waker;

use super::*;
use crate::utils::DummyWaker;

use std::cell::RefCell;
use std::future;
use std::future::Future;
use std::sync::Arc;
use std::task::Context;

#[test]
@@ -189,13 +175,13 @@ mod test {
#[test]
fn debug() {
let mut fut = [future::ready("hello"), future::ready("world")].join();
assert_eq!(format!("{:?}", fut), "[Pending, Pending]");
assert_eq!(fut.filled, [false, false]);
let mut fut = Pin::new(&mut fut);

let waker = Arc::new(DummyWaker()).into();
let waker = dummy_waker();
let mut cx = Context::from_waker(&waker);
let _ = fut.as_mut().poll(&mut cx);
assert_eq!(format!("{:?}", fut), "[Consumed, Consumed]");
assert_eq!(fut.filled, [false, false]);
}

#[test]
Loading