Skip to content

Implement FuturesUnordered::iter_mut #618

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 43 additions & 24 deletions src/stream/futures_unordered.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! An unbounded set of futures.

use std::cell::UnsafeCell;
use std::fmt::{self, Debug};
use std::iter::FromIterator;
Expand All @@ -9,7 +11,7 @@ use std::sync::atomic::{AtomicPtr, AtomicBool};
use std::sync::{Arc, Weak};
use std::usize;

use {task, Stream, Future, Poll, Async, IntoFuture};
use {task, Stream, Future, Poll, Async};
use executor::{Notify, UnsafeNotify, NotifyHandle};
use task_impl::{self, AtomicTask};

Expand Down Expand Up @@ -51,29 +53,6 @@ pub struct FuturesUnordered<F> {
unsafe impl<T: Send> Send for FuturesUnordered<T> {}
unsafe impl<T: Sync> Sync for FuturesUnordered<T> {}

/// Converts a list of futures into a `Stream` of results from the futures.
///
/// This function will take an list of futures (e.g. a vector, an iterator,
/// etc), and return a stream. The stream will yield items as they become
/// available on the futures internally, in the order that they become
/// available. This function is similar to `buffer_unordered` in that it may
/// return items in a different order than in the list specified.
///
/// Note that the returned set can also be used to dynamically push more
/// futures into the set as they become available.
pub fn futures_unordered<I>(futures: I) -> FuturesUnordered<<I::Item as IntoFuture>::Future>
where I: IntoIterator,
I::Item: IntoFuture
{
let mut set = FuturesUnordered::new();

for future in futures {
set.push(future.into_future());
}

return set
}

// FuturesUnordered is implemented using two linked lists. One which links all
// futures managed by a `FuturesUnordered` and one that tracks futures that have
// been scheduled for polling. The first linked list is not thread safe and is
Expand Down Expand Up @@ -207,6 +186,15 @@ impl<T> FuturesUnordered<T> {
self.inner.enqueue(ptr);
}

/// Returns an iterator that allows modifying each future in the set.
pub fn iter_mut(&mut self) -> IterMut<T> {
IterMut {
node: self.head_all,
len: self.len,
_marker: PhantomData
}
}

fn release_node(&mut self, node: Arc<Node<T>>) {
// The future is done, try to reset the queued flag. This will prevent
// `notify` from doing any work in the future
Expand Down Expand Up @@ -440,6 +428,37 @@ impl<F: Future> FromIterator<F> for FuturesUnordered<F> {
}
}

#[derive(Debug)]
/// Mutable iterator over all futures in the unordered set.
pub struct IterMut<'a, F: 'a> {
node: *const Node<F>,
len: usize,
_marker: PhantomData<&'a mut FuturesUnordered<F>>
}

impl<'a, F> Iterator for IterMut<'a, F> {
type Item = &'a mut F;

fn next(&mut self) -> Option<&'a mut F> {
if self.node.is_null() {
return None;
}
unsafe {
let future = (*(*self.node).future.get()).as_mut().unwrap();
let next = *(*self.node).next_all.get();
self.node = next;
self.len -= 1;
return Some(future);
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
(self.len, Some(self.len))
}
}

impl<'a, F> ExactSizeIterator for IterMut<'a, F> {}

impl<T> Inner<T> {
/// The enqueue function from the 1024cores intrusive MPSC queue algorithm.
fn enqueue(&self, node: *const Node<T>) {
Expand Down
28 changes: 26 additions & 2 deletions src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ if_std! {
mod wait;
mod channel;
mod split;
mod futures_unordered;
pub mod futures_unordered;
mod futures_ordered;
pub use self::buffered::Buffered;
pub use self::buffer_unordered::BufferUnordered;
Expand All @@ -112,7 +112,7 @@ if_std! {
pub use self::collect::Collect;
pub use self::wait::Wait;
pub use self::split::{SplitStream, SplitSink};
pub use self::futures_unordered::{futures_unordered, FuturesUnordered};
pub use self::futures_unordered::FuturesUnordered;
pub use self::futures_ordered::{futures_ordered, FuturesOrdered};

#[doc(hidden)]
Expand Down Expand Up @@ -1102,3 +1102,27 @@ impl<'a, S: ?Sized + Stream> Stream for &'a mut S {
(**self).poll()
}
}

/// Converts a list of futures into a `Stream` of results from the futures.
///
/// This function will take an list of futures (e.g. a vector, an iterator,
/// etc), and return a stream. The stream will yield items as they become
/// available on the futures internally, in the order that they become
/// available. This function is similar to `buffer_unordered` in that it may
/// return items in a different order than in the list specified.
///
/// Note that the returned set can also be used to dynamically push more
/// futures into the set as they become available.
#[cfg(feature = "use_std")]
pub fn futures_unordered<I>(futures: I) -> FuturesUnordered<<I::Item as IntoFuture>::Future>
where I: IntoIterator,
I::Item: IntoFuture
{
let mut set = FuturesUnordered::new();

for future in futures {
set.push(future.into_future());
}

return set
}
42 changes: 42 additions & 0 deletions tests/futures_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,45 @@ fn finished_future_ok() {
assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
}

#[test]
fn iter_mut_cancel() {
let (a_tx, a_rx) = oneshot::channel::<u32>();
let (b_tx, b_rx) = oneshot::channel::<u32>();
let (c_tx, c_rx) = oneshot::channel::<u32>();

let mut stream = futures_unordered(vec![a_rx, b_rx, c_rx]);

for rx in stream.iter_mut() {
rx.close();
}

assert!(a_tx.is_canceled());
assert!(b_tx.is_canceled());
assert!(c_tx.is_canceled());

let mut spawn = futures::executor::spawn(stream);
assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream());
assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream());
assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream());
assert_eq!(None, spawn.wait_stream());
}

#[test]
fn iter_mut_len() {
let mut stream = futures_unordered(vec![
futures::future::empty::<(),()>(),
futures::future::empty::<(),()>(),
futures::future::empty::<(),()>()
]);

let mut iter_mut = stream.iter_mut();
assert_eq!(iter_mut.len(), 3);
assert!(iter_mut.next().is_some());
assert_eq!(iter_mut.len(), 2);
assert!(iter_mut.next().is_some());
assert_eq!(iter_mut.len(), 1);
assert!(iter_mut.next().is_some());
assert_eq!(iter_mut.len(), 0);
assert!(iter_mut.next().is_none());
}