diff --git a/bytes/src/lib.rs b/bytes/src/lib.rs index 17c0f3237..7cb803274 100644 --- a/bytes/src/lib.rs +++ b/bytes/src/lib.rs @@ -96,7 +96,7 @@ pub mod arc { sequestered: self.sequestered.clone(), }; - unsafe { self.ptr = self.ptr.offset(index as isize); } + unsafe { self.ptr = self.ptr.add(index); } self.len -= index; result @@ -161,7 +161,7 @@ pub mod arc { /// shared4.try_merge(shared2).ok().expect("Failed to merge 4 and 231"); /// ``` pub fn try_merge(&mut self, other: Bytes) -> Result<(), Bytes> { - if Arc::ptr_eq(&self.sequestered, &other.sequestered) && ::std::ptr::eq(unsafe { self.ptr.offset(self.len as isize) }, other.ptr) { + if Arc::ptr_eq(&self.sequestered, &other.sequestered) && ::std::ptr::eq(unsafe { self.ptr.add(self.len) }, other.ptr) { self.len += other.len; Ok(()) } diff --git a/communication/src/allocator/process.rs b/communication/src/allocator/process.rs index f8ac3322a..c255283c2 100644 --- a/communication/src/allocator/process.rs +++ b/communication/src/allocator/process.rs @@ -35,7 +35,7 @@ impl AllocateBuilder for ProcessBuilder { // Initialize buzzers; send first, then recv. for worker in self.buzzers_send.iter() { - let buzzer = Buzzer::new(); + let buzzer = Buzzer::default(); worker.send(buzzer).expect("Failed to send buzzer"); } let mut buzzers = Vec::with_capacity(self.buzzers_recv.len()); @@ -88,8 +88,8 @@ impl Process { counters_recv .into_iter() - .zip(buzzers_send.into_iter()) - .zip(buzzers_recv.into_iter()) + .zip(buzzers_send) + .zip(buzzers_recv) .enumerate() .map(|(index, ((recv, bsend), brecv))| { ProcessBuilder { diff --git a/communication/src/allocator/thread.rs b/communication/src/allocator/thread.rs index 2f1fca9b6..c957755c5 100644 --- a/communication/src/allocator/thread.rs +++ b/communication/src/allocator/thread.rs @@ -15,11 +15,12 @@ pub struct ThreadBuilder; impl AllocateBuilder for ThreadBuilder { type Allocator = Thread; - fn build(self) -> Self::Allocator { Thread::new() } + fn build(self) -> Self::Allocator { Thread::default() } } /// An allocator for intra-thread communication. +#[derive(Default)] pub struct Thread { /// Shared counts of messages in channels. events: Rc>>, @@ -53,13 +54,6 @@ pub type ThreadPusher = CountPusher>; pub type ThreadPuller = CountPuller>; impl Thread { - /// Allocates a new thread-local channel allocator. - pub fn new() -> Self { - Thread { - events: Rc::new(RefCell::new(Default::default())), - } - } - /// Creates a new thread-local channel from an identifier and shared counts. pub fn new_from(identifier: usize, events: Rc>>) -> (ThreadPusher, ThreadPuller) diff --git a/communication/src/allocator/zero_copy/allocator.rs b/communication/src/allocator/zero_copy/allocator.rs index dffc40380..f1dda0fb5 100644 --- a/communication/src/allocator/zero_copy/allocator.rs +++ b/communication/src/allocator/zero_copy/allocator.rs @@ -82,7 +82,7 @@ impl TcpBuilder { // Fulfill puller obligations. 
let mut recvs = Vec::with_capacity(self.peers); for promise in self.promises.into_iter() { - let buzzer = crate::buzzer::Buzzer::new(); + let buzzer = crate::buzzer::Buzzer::default(); let queue = MergeQueue::new(buzzer); promise.send(queue.clone()).expect("Failed to send MergeQueue"); recvs.push(queue.clone()); diff --git a/communication/src/allocator/zero_copy/allocator_process.rs b/communication/src/allocator/zero_copy/allocator_process.rs index eb81c20ec..d4eed9b43 100644 --- a/communication/src/allocator/zero_copy/allocator_process.rs +++ b/communication/src/allocator/zero_copy/allocator_process.rs @@ -60,7 +60,7 @@ impl ProcessBuilder { // Fulfill puller obligations. let mut recvs = Vec::with_capacity(self.peers); for puller in self.pullers.into_iter() { - let buzzer = crate::buzzer::Buzzer::new(); + let buzzer = crate::buzzer::Buzzer::default(); let queue = MergeQueue::new(buzzer); puller.send(queue.clone()).expect("Failed to send MergeQueue"); recvs.push(queue.clone()); diff --git a/communication/src/allocator/zero_copy/initialize.rs b/communication/src/allocator/zero_copy/initialize.rs index 622fe15e9..0b162dafe 100644 --- a/communication/src/allocator/zero_copy/initialize.rs +++ b/communication/src/allocator/zero_copy/initialize.rs @@ -63,10 +63,8 @@ pub fn initialize_networking_from_sockets( -> ::std::io::Result<(Vec>, CommsGuard)> { // Sockets are expected to be blocking, - for socket in sockets.iter_mut() { - if let Some(socket) = socket { - socket.set_nonblocking(false).expect("failed to set socket to blocking"); - } + for socket in sockets.iter_mut().flatten() { + socket.set_nonblocking(false).expect("failed to set socket to blocking"); } let processes = sockets.len(); diff --git a/communication/src/allocator/zero_copy/tcp.rs b/communication/src/allocator/zero_copy/tcp.rs index 153037db8..8400a76a4 100644 --- a/communication/src/allocator/zero_copy/tcp.rs +++ b/communication/src/allocator/zero_copy/tcp.rs @@ -1,4 +1,4 @@ -//! +//! Methods related to reading from and writing to TCP connections use std::io::{self, Write}; use crossbeam_channel::{Sender, Receiver}; @@ -67,9 +67,9 @@ where assert!(!buffer.empty().is_empty()); // Attempt to read some more bytes into self.buffer. 
- let read = match reader.read(&mut buffer.empty()) { + let read = match reader.read(buffer.empty()) { Err(x) => tcp_panic("reading data", x), - Ok(n) if n == 0 => { + Ok(0) => { tcp_panic( "reading data", std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "socket closed"), @@ -102,7 +102,7 @@ where panic!("Clean shutdown followed by data."); } buffer.ensure_capacity(1); - if reader.read(&mut buffer.empty()).unwrap_or_else(|e| tcp_panic("reading EOF", e)) > 0 { + if reader.read(buffer.empty()).unwrap_or_else(|e| tcp_panic("reading EOF", e)) > 0 { panic!("Clean shutdown followed by data."); } } @@ -141,7 +141,7 @@ pub fn send_loop( logger.as_mut().map(|l| l.log(StateEvent { send: true, process, remote, start: true, })); let mut sources: Vec = sources.into_iter().map(|x| { - let buzzer = crate::buzzer::Buzzer::new(); + let buzzer = crate::buzzer::Buzzer::default(); let queue = MergeQueue::new(buzzer); x.send(queue.clone()).expect("failed to send MergeQueue"); queue diff --git a/communication/src/buzzer.rs b/communication/src/buzzer.rs index 09048dc19..7ebdd7795 100644 --- a/communication/src/buzzer.rs +++ b/communication/src/buzzer.rs @@ -8,13 +8,11 @@ pub struct Buzzer { thread: Thread, } +impl Default for Buzzer { + fn default() -> Self { Self { thread: std::thread::current() } } +} + impl Buzzer { - /// Creates a new buzzer for the current thread. - pub fn new() -> Self { - Self { - thread: std::thread::current() - } - } /// Unparks the target thread. pub fn buzz(&self) { self.thread.unpark() diff --git a/communication/src/initialize.rs b/communication/src/initialize.rs index 040b5f06d..7b32d14f2 100644 --- a/communication/src/initialize.rs +++ b/communication/src/initialize.rs @@ -155,15 +155,15 @@ impl Config { Ok((vec![GenericBuilder::Thread(ThreadBuilder)], Box::new(()))) }, Config::Process(threads) => { - Ok((Process::new_vector(threads).into_iter().map(|x| GenericBuilder::Process(x)).collect(), Box::new(()))) + Ok((Process::new_vector(threads).into_iter().map(GenericBuilder::Process).collect(), Box::new(()))) }, Config::ProcessBinary(threads) => { - Ok((ProcessBuilder::new_vector(threads).into_iter().map(|x| GenericBuilder::ProcessBinary(x)).collect(), Box::new(()))) + Ok((ProcessBuilder::new_vector(threads).into_iter().map(GenericBuilder::ProcessBinary).collect(), Box::new(()))) }, Config::Cluster { threads, process, addresses, report, log_fn } => { match initialize_networking(addresses, process, threads, report, log_fn) { Ok((stuff, guard)) => { - Ok((stuff.into_iter().map(|x| GenericBuilder::ZeroCopy(x)).collect(), Box::new(guard))) + Ok((stuff.into_iter().map(GenericBuilder::ZeroCopy).collect(), Box::new(guard))) }, Err(err) => Err(format!("failed to initialize networking: {}", err)) } diff --git a/communication/src/lib.rs b/communication/src/lib.rs index 16fe73eeb..3fe7e0542 100644 --- a/communication/src/lib.rs +++ b/communication/src/lib.rs @@ -181,11 +181,11 @@ fn promise_futures(sends: usize, recvs: usize) -> (Vec>>, Vec = (0 .. sends).map(|_| Vec::with_capacity(recvs)).collect(); let mut recvers: Vec<_> = (0 .. recvs).map(|_| Vec::with_capacity(sends)).collect(); - for sender in 0 .. sends { - for recver in 0 .. 
recvs { + for sender in senders.iter_mut() { + for recver in recvers.iter_mut() { let (send, recv) = crossbeam_channel::unbounded(); - senders[sender].push(send); - recvers[recver].push(recv); + sender.push(send); + recver.push(recv); } } diff --git a/communication/src/networking.rs b/communication/src/networking.rs index 036e3eef7..dcb167de9 100644 --- a/communication/src/networking.rs +++ b/communication/src/networking.rs @@ -100,7 +100,7 @@ pub fn create_sockets(addresses: Vec, my_index: usize, noisy: bool) -> R let mut results = start_task.join().unwrap()?; results.push(None); let to_extend = await_task.join().unwrap()?; - results.extend(to_extend.into_iter()); + results.extend(to_extend); if noisy { println!("worker {}:\tinitialization complete", my_index) } diff --git a/container/src/columnation.rs b/container/src/columnation.rs index f2a7b4bdc..460640ad3 100644 --- a/container/src/columnation.rs +++ b/container/src/columnation.rs @@ -200,9 +200,9 @@ impl Default for TimelyStack { impl<'a, A: 'a + Columnation> FromIterator<&'a A> for TimelyStack { fn from_iter>(iter: T) -> Self { - let mut iter = iter.into_iter(); + let iter = iter.into_iter(); let mut c = TimelyStack::::with_capacity(iter.size_hint().0); - while let Some(element) = iter.next() { + for element in iter { c.copy(element); } diff --git a/container/src/flatcontainer.rs b/container/src/flatcontainer.rs index 62600a77e..90855a1f0 100644 --- a/container/src/flatcontainer.rs +++ b/container/src/flatcontainer.rs @@ -17,13 +17,13 @@ impl Container for FlatStack { type Iter<'a> = <&'a Self as IntoIterator>::IntoIter; - fn iter<'a>(&'a self) -> Self::Iter<'a> { + fn iter(&self) -> Self::Iter<'_> { IntoIterator::into_iter(self) } type DrainIter<'a> = Self::Iter<'a>; - fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { + fn drain(&mut self) -> Self::DrainIter<'_> { IntoIterator::into_iter(&*self) } } diff --git a/mdbook/src/chapter_2/chapter_2_4.md b/mdbook/src/chapter_2/chapter_2_4.md index e3f2910dd..dbbf7fbc1 100644 --- a/mdbook/src/chapter_2/chapter_2_4.md +++ b/mdbook/src/chapter_2/chapter_2_4.md @@ -184,7 +184,7 @@ fn main() { in1.binary_frontier(&in2, Pipeline, Pipeline, "concat_buffer", |capability, info| { - let mut notificator = FrontierNotificator::new(); + let mut notificator = FrontierNotificator::default(); let mut stash = HashMap::new(); move |input1, input2, output| { diff --git a/timely/examples/threadless.rs b/timely/examples/threadless.rs index 625af8662..24e32d871 100644 --- a/timely/examples/threadless.rs +++ b/timely/examples/threadless.rs @@ -5,7 +5,7 @@ use timely::WorkerConfig; fn main() { // create a naked single-threaded worker. - let allocator = timely::communication::allocator::Thread::new(); + let allocator = timely::communication::allocator::Thread::default(); let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator); // create input and probe handles. 
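// [Illustrative sketch, not part of the patch] The hunks above replace hand-written
// `new()` constructors with `Default` implementations (Buzzer, Thread, FrontierNotificator)
// and switch call sites to `::default()`. A minimal standalone example of the same
// refactor, using a hypothetical `Waker` type that captures the current thread much
// like `Buzzer` does; names here are assumptions for illustration only.

use std::thread::Thread;

pub struct Waker {
    thread: Thread,
}

// The constructor logic moves into `Default`, so callers write `Waker::default()`
// and generic code bounded on `Default` can construct one as well.
impl Default for Waker {
    fn default() -> Self {
        Self { thread: std::thread::current() }
    }
}

impl Waker {
    /// Unparks the captured thread.
    pub fn wake(&self) {
        self.thread.unpark()
    }
}

fn main() {
    let waker = Waker::default();
    waker.wake(); // unparking the current thread just sets its park token
}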
diff --git a/timely/src/dataflow/channels/pullers/counter.rs b/timely/src/dataflow/channels/pullers/counter.rs index 1fdcf228d..6f71848a4 100644 --- a/timely/src/dataflow/channels/pullers/counter.rs +++ b/timely/src/dataflow/channels/pullers/counter.rs @@ -24,7 +24,7 @@ pub struct ConsumedGuard { impl ConsumedGuard { pub(crate) fn time(&self) -> &T { - &self.time.as_ref().unwrap() + self.time.as_ref().unwrap() } } diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index c5e6020f1..07a0f7f17 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -71,7 +71,7 @@ where let hash_func = &mut self.hash_func; // if the number of pushers is a power of two, use a mask - if (self.pushers.len() & (self.pushers.len() - 1)) == 0 { + if self.pushers.len().is_power_of_two() { let mask = (self.pushers.len() - 1) as u64; let pushers = &mut self.pushers; data.push_partitioned( diff --git a/timely/src/dataflow/operators/aggregation/aggregate.rs b/timely/src/dataflow/operators/aggregation/aggregate.rs index 9b8e24282..196f1c5d0 100644 --- a/timely/src/dataflow/operators/aggregation/aggregate.rs +++ b/timely/src/dataflow/operators/aggregation/aggregate.rs @@ -76,7 +76,7 @@ impl Aggregate for hash: H) -> Stream where S::Timestamp: Eq { let mut aggregates = HashMap::new(); - self.unary_notify(Exchange::new(move |&(ref k, _)| hash(k)), "Aggregate", vec![], move |input, output, notificator| { + self.unary_notify(Exchange::new(move |(k, _)| hash(k)), "Aggregate", vec![], move |input, output, notificator| { // read each input, fold into aggregates input.for_each(|time, data| { diff --git a/timely/src/dataflow/operators/aggregation/state_machine.rs b/timely/src/dataflow/operators/aggregation/state_machine.rs index f82c3172e..7347f8c12 100644 --- a/timely/src/dataflow/operators/aggregation/state_machine.rs +++ b/timely/src/dataflow/operators/aggregation/state_machine.rs @@ -7,6 +7,8 @@ use crate::dataflow::{Stream, Scope}; use crate::dataflow::operators::generic::operator::Operator; use crate::dataflow::channels::pact::Exchange; +/// Provides the `state_machine` method. +/// /// Generic state-transition machinery: each key has a state, and receives a sequence of events. /// Events are applied in time-order, but no other promises are made. Each state transition can /// produce output, which is sent. @@ -15,8 +17,6 @@ use crate::dataflow::channels::pact::Exchange; /// updates for the current time reflected in the notificator, though. In the case of partially /// ordered times, the only guarantee is that updates are not applied out of order, not that there /// is some total order on times respecting the total order (updates may be interleaved). - -/// Provides the `state_machine` method. pub trait StateMachine { /// Tracks a state for each presented key, using user-supplied state transition logic. /// @@ -66,7 +66,7 @@ impl StateMachine f let mut pending: HashMap<_, Vec<(K, V)>> = HashMap::new(); // times -> (keys -> state) let mut states = HashMap::new(); // keys -> state - self.unary_notify(Exchange::new(move |&(ref k, _)| hash(k)), "StateMachine", vec![], move |input, output, notificator| { + self.unary_notify(Exchange::new(move |(k, _)| hash(k)), "StateMachine", vec![], move |input, output, notificator| { // go through each time with data, process each (key, val) pair. 
notificator.for_each(|time,_,_| { @@ -88,7 +88,7 @@ impl StateMachine f // stash if not time yet if notificator.frontier(0).less_than(time.time()) { - pending.entry(time.time().clone()).or_insert_with(Vec::new).extend(data.drain(..)); + pending.entry(time.time().clone()).or_insert_with(Vec::new).append(data); notificator.notify_at(time.retain()); } else { diff --git a/timely/src/dataflow/operators/branch.rs b/timely/src/dataflow/operators/branch.rs index c835986e8..8b4ee8674 100644 --- a/timely/src/dataflow/operators/branch.rs +++ b/timely/src/dataflow/operators/branch.rs @@ -54,7 +54,7 @@ impl Branch for Stream { let mut out1 = output1_handle.session(&time); let mut out2 = output2_handle.session(&time); for datum in data.drain(..) { - if condition(&time.time(), &datum) { + if condition(time.time(), &datum) { out2.give(datum); } else { out1.give(datum); @@ -107,7 +107,7 @@ impl BranchWhen for StreamCore { let mut output2_handle = output2.activate(); input.for_each(|time, data| { - let mut out = if condition(&time.time()) { + let mut out = if condition(time.time()) { output2_handle.session(&time) } else { output1_handle.session(&time) diff --git a/timely/src/dataflow/operators/capability.rs b/timely/src/dataflow/operators/capability.rs index 8aa0adad6..bc5bedc9a 100644 --- a/timely/src/dataflow/operators/capability.rs +++ b/timely/src/dataflow/operators/capability.rs @@ -40,13 +40,13 @@ pub trait CapabilityTrait { fn valid_for_output(&self, query_buffer: &Rc>>) -> bool; } -impl<'a, T: Timestamp, C: CapabilityTrait> CapabilityTrait for &'a C { +impl> CapabilityTrait for &C { fn time(&self) -> &T { (**self).time() } fn valid_for_output(&self, query_buffer: &Rc>>) -> bool { (**self).valid_for_output(query_buffer) } } -impl<'a, T: Timestamp, C: CapabilityTrait> CapabilityTrait for &'a mut C { +impl> CapabilityTrait for &mut C { fn time(&self) -> &T { (**self).time() } fn valid_for_output(&self, query_buffer: &Rc>>) -> bool { (**self).valid_for_output(query_buffer) @@ -227,9 +227,10 @@ impl Error for DowngradeError {} /// A shared list of shared output capability buffers. type CapabilityUpdates = Rc>>>>>; -/// An capability of an input port. Holding onto this capability will implicitly holds onto a -/// capability for all the outputs ports this input is connected to, after the connection summaries -/// have been applied. +/// An capability of an input port. +/// +/// Holding onto this capability will implicitly holds onto a capability for all the outputs +/// ports this input is connected to, after the connection summaries have been applied. /// /// This input capability supplies a `retain_for_output(self)` method which consumes the input /// capability and turns it into a [Capability] for a specific output port. 
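// [Illustrative sketch, not part of the patch] The exchange.rs hunk above swaps the
// manual `(n & (n - 1)) == 0` test for `usize::is_power_of_two()`; the two agree for
// every non-zero pusher count, and the standard-library name states the intent.
// A hypothetical `route` helper showing how the mask/modulo split is typically used:

fn route(hash: u64, num_pushers: usize) -> usize {
    assert!(num_pushers > 0);
    if num_pushers.is_power_of_two() {
        // For powers of two, `hash % n` equals `hash & (n - 1)`, and the mask is cheaper.
        (hash & (num_pushers as u64 - 1)) as usize
    } else {
        (hash % num_pushers as u64) as usize
    }
}

fn main() {
    for &n in &[1usize, 3, 4, 7, 8, 16] {
        for hash in 0u64..64 {
            // The masked and modulo routes must agree whenever the mask is applicable.
            assert_eq!(route(hash, n), (hash % n as u64) as usize);
        }
    }
    println!("mask and modulo routing agree");
}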
diff --git a/timely/src/dataflow/operators/core/enterleave.rs b/timely/src/dataflow/operators/core/enterleave.rs index 0e1ac86bb..9ecf90c5e 100644 --- a/timely/src/dataflow/operators/core/enterleave.rs +++ b/timely/src/dataflow/operators/core/enterleave.rs @@ -103,7 +103,7 @@ pub trait Leave { fn leave(&self) -> StreamCore; } -impl<'a, G: Scope, C: Clone+Container, T: Timestamp+Refines> Leave for StreamCore, C> { +impl> Leave for StreamCore, C> { fn leave(&self) -> StreamCore { let scope = self.scope(); diff --git a/timely/src/dataflow/operators/core/exchange.rs b/timely/src/dataflow/operators/core/exchange.rs index 8215f00ee..19a34338e 100644 --- a/timely/src/dataflow/operators/core/exchange.rs +++ b/timely/src/dataflow/operators/core/exchange.rs @@ -23,18 +23,18 @@ pub trait Exchange { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn exchange(&self, route: F) -> Self + fn exchange(&self, route: F) -> Self where - for<'a> F: FnMut(&C::Item<'a>) -> u64; + for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static; } impl Exchange for StreamCore where C: PushPartitioned + ExchangeData, { - fn exchange(&self, route: F) -> StreamCore + fn exchange(&self, route: F) -> StreamCore where - for<'a> F: FnMut(&C::Item<'a>) -> u64, + for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static, { self.unary(ExchangeCore::new(route), "Exchange", |_, _| { move |input, output| { diff --git a/timely/src/dataflow/operators/core/inspect.rs b/timely/src/dataflow/operators/core/inspect.rs index e9a231b4c..4b6fa830a 100644 --- a/timely/src/dataflow/operators/core/inspect.rs +++ b/timely/src/dataflow/operators/core/inspect.rs @@ -18,9 +18,9 @@ pub trait Inspect: InspectCore + Sized { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn inspect(&self, mut func: F) -> Self + fn inspect(&self, mut func: F) -> Self where - for<'a> F: FnMut(C::ItemRef<'a>) + F: for<'a> FnMut(C::ItemRef<'a>) + 'static, { self.inspect_batch(move |_, data| { for datum in data.iter() { func(datum); } @@ -38,13 +38,13 @@ pub trait Inspect: InspectCore + Sized { /// .inspect_time(|t, x| println!("seen at: {:?}\t{:?}", t, x)); /// }); /// ``` - fn inspect_time(&self, mut func: F) -> Self + fn inspect_time(&self, mut func: F) -> Self where - for <'a> F: FnMut(&G::Timestamp, C::ItemRef<'a>), + F: for<'a> FnMut(&G::Timestamp, C::ItemRef<'a>) + 'static, { self.inspect_batch(move |time, data| { for datum in data.iter() { - func(&time, datum); + func(time, datum); } }) } @@ -91,8 +91,8 @@ pub trait Inspect: InspectCore + Sized { } impl Inspect for StreamCore { - fn inspect_core(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static { - self.inspect_container(move |r| func(r)) + fn inspect_core(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static { + self.inspect_container(func) } } diff --git a/timely/src/dataflow/operators/core/probe.rs b/timely/src/dataflow/operators/core/probe.rs index b69485e82..1c8d9750c 100644 --- a/timely/src/dataflow/operators/core/probe.rs +++ b/timely/src/dataflow/operators/core/probe.rs @@ -83,8 +83,8 @@ impl Probe for StreamCore { fn probe(&self) -> Handle { // the frontier is shared state; scope updates, handle reads. 
- let mut handle = Handle::::new(); - self.probe_with(&mut handle); + let handle = Handle::::new(); + self.probe_with(&handle); handle } fn probe_with(&self, handle: &Handle) -> StreamCore { diff --git a/timely/src/dataflow/operators/count.rs b/timely/src/dataflow/operators/count.rs index c0e01d94e..89e650b2b 100644 --- a/timely/src/dataflow/operators/count.rs +++ b/timely/src/dataflow/operators/count.rs @@ -54,7 +54,7 @@ impl Accumulate for Stream { let mut accums = HashMap::new(); self.unary_notify(Pipeline, "Accumulate", vec![], move |input, output, notificator| { input.for_each(|time, data| { - logic(&mut accums.entry(time.time().clone()).or_insert_with(|| default.clone()), data); + logic(accums.entry(time.time().clone()).or_insert_with(|| default.clone()), data); notificator.notify_at(time.retain()); }); diff --git a/timely/src/dataflow/operators/flow_controlled.rs b/timely/src/dataflow/operators/flow_controlled.rs index 0e97037fc..0d91ef7f9 100644 --- a/timely/src/dataflow/operators/flow_controlled.rs +++ b/timely/src/dataflow/operators/flow_controlled.rs @@ -20,18 +20,19 @@ pub struct IteratorSourceInput, I: I } /// Construct a source that repeatedly calls the provided function to ingest input. -/// - The function can return `None` to signal the end of the input; -/// - otherwise, it should return a [`IteratorSourceInput`], where: -/// * `lower_bound` is a lower bound on timestamps that can be emitted by this input in the future, +/// +/// The function can return `None` to signal the end of the input. +/// Otherwise, it should return a [`IteratorSourceInput`], where: +/// * `lower_bound` is a lower bound on timestamps that can be emitted by this input in the future, /// `Default::default()` can be used if this isn't needed (the source will assume that /// the timestamps in `data` are monotonically increasing and will release capabilities /// accordingly); -/// * `data` is any `T: IntoIterator` of new input data in the form (time, data): time must be +/// * `data` is any `T: IntoIterator` of new input data in the form (time, data): time must be /// monotonically increasing; -/// * `target` is a timestamp that represents the frontier that the probe should have +/// * `target` is a timestamp that represents the frontier that the probe should have /// reached before the function is invoked again to ingest additional input. -/// The function will receive the current lower bound of timestamps that can be inserted, -/// `lower_bound`. +/// The function will receive the current lower bound of timestamps that can be inserted, +/// `lower_bound`. /// /// # Example /// ```rust diff --git a/timely/src/dataflow/operators/generic/handles.rs b/timely/src/dataflow/operators/generic/handles.rs index 586cec88a..1eb0cec07 100644 --- a/timely/src/dataflow/operators/generic/handles.rs +++ b/timely/src/dataflow/operators/generic/handles.rs @@ -48,7 +48,7 @@ pub struct FrontieredInputHandleCore<'a, T: Timestamp, C: Container+'a, P: Pull< /// Handle to an operator's input stream and frontier, specialized to vectors. pub type FrontieredInputHandle<'a, T, D, P> = FrontieredInputHandleCore<'a, T, Vec, P>; -impl<'a, T: Timestamp, C: Container, P: Pull>> InputHandleCore { +impl>> InputHandleCore { /// Reads the next input buffer (at some timestamp `t`) and a corresponding capability for `t`. /// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability. 
@@ -225,7 +225,7 @@ impl<'a, T: Timestamp, CB: ContainerBuilder, P: Push>> /// }); /// ``` pub fn session_with_builder<'b, CT: CapabilityTrait>(&'b mut self, cap: &'b CT) -> Session<'b, T, CB, PushCounter> where 'a: 'b { - assert!(cap.valid_for_output(&self.internal_buffer), "Attempted to open output session with invalid capability"); + assert!(cap.valid_for_output(self.internal_buffer), "Attempted to open output session with invalid capability"); self.push_buffer.session_with_builder(cap.time()) } @@ -264,7 +264,7 @@ impl<'a, T: Timestamp, C: Container, P: Push>> OutputHandleCore<'a } } -impl<'a, T: Timestamp, CB: ContainerBuilder, P: Push>> Drop for OutputHandleCore<'a, T, CB, P> { +impl>> Drop for OutputHandleCore<'_, T, CB, P> { fn drop(&mut self) { self.push_buffer.cease(); } diff --git a/timely/src/dataflow/operators/generic/notificator.rs b/timely/src/dataflow/operators/generic/notificator.rs index 0a6d0e75e..905fdef4f 100644 --- a/timely/src/dataflow/operators/generic/notificator.rs +++ b/timely/src/dataflow/operators/generic/notificator.rs @@ -89,7 +89,7 @@ impl<'a, T: Timestamp> Notificator<'a, T> { } } -impl<'a, T: Timestamp> Iterator for Notificator<'a, T> { +impl Iterator for Notificator<'_, T> { type Item = (Capability, u64); /// Retrieve the next available notification. @@ -196,7 +196,7 @@ fn notificator_delivers_notifications_in_topo_order() { /// let (in1_handle, in1) = scope.new_input(); /// let (in2_handle, in2) = scope.new_input(); /// in1.binary_frontier(&in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| { -/// let mut notificator = FrontierNotificator::new(); +/// let mut notificator = FrontierNotificator::default(); /// let mut stash = HashMap::new(); /// move |input1, input2, output| { /// while let Some((time, data)) = input1.next() { @@ -236,15 +236,16 @@ pub struct FrontierNotificator { available: ::std::collections::BinaryHeap>, } -impl FrontierNotificator { - /// Allocates a new `FrontierNotificator`. - pub fn new() -> Self { +impl Default for FrontierNotificator { + fn default() -> Self { FrontierNotificator { pending: Vec::new(), available: ::std::collections::BinaryHeap::new(), } } +} +impl FrontierNotificator { /// Allocates a new `FrontierNotificator` with initial capabilities. 
pub fn from>>(iter: I) -> Self { FrontierNotificator { @@ -268,7 +269,7 @@ impl FrontierNotificator { /// timely::example(|scope| { /// (0..10).to_stream(scope) /// .unary_frontier(Pipeline, "example", |_, _| { - /// let mut notificator = FrontierNotificator::new(); + /// let mut notificator = FrontierNotificator::default(); /// move |input, output| { /// input.for_each(|cap, data| { /// output.session(&cap).give_container(data); @@ -398,7 +399,7 @@ impl FrontierNotificator { /// timely::example(|scope| { /// (0..10).to_stream(scope) /// .unary_frontier(Pipeline, "example", |_, _| { - /// let mut notificator = FrontierNotificator::new(); + /// let mut notificator = FrontierNotificator::default(); /// move |input, output| { /// input.for_each(|cap, data| { /// output.session(&cap).give_container(data); @@ -430,7 +431,7 @@ impl OrderReversed { impl PartialOrd for OrderReversed { fn partial_cmp(&self, other: &Self) -> Option<::std::cmp::Ordering> { - other.element.time().partial_cmp(self.element.time()) + Some(self.cmp(other)) } } impl Ord for OrderReversed { diff --git a/timely/src/dataflow/operators/generic/operator.rs b/timely/src/dataflow/operators/generic/operator.rs index fb34fcacc..e28381dfb 100644 --- a/timely/src/dataflow/operators/generic/operator.rs +++ b/timely/src/dataflow/operators/generic/operator.rs @@ -33,7 +33,7 @@ pub trait Operator { /// (0u64..10).to_stream(scope) /// .unary_frontier(Pipeline, "example", |default_cap, _info| { /// let mut cap = Some(default_cap.delayed(&12)); - /// let mut notificator = FrontierNotificator::new(); + /// let mut notificator = FrontierNotificator::default(); /// let mut stash = HashMap::new(); /// move |input, output| { /// if let Some(ref c) = cap.take() { @@ -147,7 +147,7 @@ pub trait Operator { /// let (in1_handle, in1) = scope.new_input(); /// let (in2_handle, in2) = scope.new_input(); /// in1.binary_frontier(&in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| { - /// let mut notificator = FrontierNotificator::new(); + /// let mut notificator = FrontierNotificator::default(); /// let mut stash = HashMap::new(); /// move |input1, input2, output| { /// while let Some((time, data)) = input1.next() { @@ -349,7 +349,7 @@ impl Operator for StreamCore { (&self, pact: P, name: &str, init: impl IntoIterator, mut logic: L) -> StreamCore { self.unary_frontier(pact, name, move |capability, _info| { - let mut notificator = FrontierNotificator::new(); + let mut notificator = FrontierNotificator::default(); for time in init { notificator.notify_at(capability.delayed(&time)); } @@ -358,7 +358,7 @@ impl Operator for StreamCore { move |input, output| { let frontier = &[input.frontier()]; let notificator = &mut Notificator::new(frontier, &mut notificator, &logging); - logic(&mut input.handle, output, notificator); + logic(input.handle, output, notificator); } }) } @@ -435,7 +435,7 @@ impl Operator for StreamCore { (&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator, mut logic: L) -> StreamCore { self.binary_frontier(other, pact1, pact2, name, |capability, _info| { - let mut notificator = FrontierNotificator::new(); + let mut notificator = FrontierNotificator::default(); for time in init { notificator.notify_at(capability.delayed(&time)); } @@ -444,7 +444,7 @@ impl Operator for StreamCore { move |input1, input2, output| { let frontiers = &[input1.frontier(), input2.frontier()]; let notificator = &mut Notificator::new(frontiers, &mut notificator, &logging); - logic(&mut input1.handle, &mut input2.handle, output, 
notificator); + logic(input1.handle, input2.handle, output, notificator); } }) diff --git a/timely/src/dataflow/operators/result.rs b/timely/src/dataflow/operators/result.rs index 3607f9f8e..6dddf8a71 100644 --- a/timely/src/dataflow/operators/result.rs +++ b/timely/src/dataflow/operators/result.rs @@ -105,19 +105,19 @@ impl ResultStream for Stream T2 + 'static>(&self, mut logic: L) -> Stream> { - self.map(move |r| r.map(|x| logic(x))) + self.map(move |r| r.map(&mut logic)) } fn map_err E2 + 'static>(&self, mut logic: L) -> Stream> { - self.map(move |r| r.map_err(|x| logic(x))) + self.map(move |r| r.map_err(&mut logic)) } fn and_then Result + 'static>(&self, mut logic: L) -> Stream> { - self.map(move |r| r.and_then(|x| logic(x))) + self.map(move |r| r.and_then(&mut logic)) } fn unwrap_or_else T + 'static>(&self, mut logic: L) -> Stream { - self.map(move |r| r.unwrap_or_else(|err| logic(err))) + self.map(move |r| r.unwrap_or_else(&mut logic)) } } diff --git a/timely/src/dataflow/scopes/child.rs b/timely/src/dataflow/scopes/child.rs index 277466100..e8872e849 100644 --- a/timely/src/dataflow/scopes/child.rs +++ b/timely/src/dataflow/scopes/child.rs @@ -37,7 +37,7 @@ where pub progress_logging: Option, } -impl<'a, G, T> Child<'a, G, T> +impl Child<'_, G, T> where G: ScopeParent, T: Timestamp+Refines @@ -50,7 +50,7 @@ where pub fn peers(&self) -> usize { self.parent.peers() } } -impl<'a, G, T> AsWorker for Child<'a, G, T> +impl AsWorker for Child<'_, G, T> where G: ScopeParent, T: Timestamp+Refines @@ -75,7 +75,7 @@ where } } -impl<'a, G, T> Scheduler for Child<'a, G, T> +impl Scheduler for Child<'_, G, T> where G: ScopeParent, T: Timestamp+Refines @@ -85,7 +85,7 @@ where } } -impl<'a, G, T> ScopeParent for Child<'a, G, T> +impl ScopeParent for Child<'_, G, T> where G: ScopeParent, T: Timestamp+Refines @@ -93,7 +93,7 @@ where type Timestamp = T; } -impl<'a, G, T> Scope for Child<'a, G, T> +impl Scope for Child<'_, G, T> where G: ScopeParent, T: Timestamp+Refines, @@ -148,7 +148,7 @@ where } } -impl<'a, G, T> Clone for Child<'a, G, T> +impl Clone for Child<'_, G, T> where G: ScopeParent, T: Timestamp+Refines diff --git a/timely/src/execute.rs b/timely/src/execute.rs index 8e173b518..ed0ad99ed 100644 --- a/timely/src/execute.rs +++ b/timely/src/execute.rs @@ -153,7 +153,7 @@ where T: Send+'static, F: FnOnce(&mut Worker)->T+Send+Sync+'static { - let alloc = crate::communication::allocator::thread::Thread::new(); + let alloc = crate::communication::allocator::thread::Thread::default(); let mut worker = crate::worker::Worker::new(WorkerConfig::default(), alloc); let result = func(&mut worker); while worker.has_dataflows() { diff --git a/timely/src/logging.rs b/timely/src/logging.rs index df2e03822..6043f2d16 100644 --- a/timely/src/logging.rs +++ b/timely/src/logging.rs @@ -33,7 +33,7 @@ impl BatchLogger where P: EventPusher) { if !data.is_empty() { - self.event_pusher.push(Event::Messages(self.time, data.drain(..).collect())); + self.event_pusher.push(Event::Messages(self.time, std::mem::take(data))); } if self.time < time { let new_frontier = time; diff --git a/timely/src/progress/broadcast.rs b/timely/src/progress/broadcast.rs index 19f09ef72..1092929c0 100644 --- a/timely/src/progress/broadcast.rs +++ b/timely/src/progress/broadcast.rs @@ -151,7 +151,7 @@ impl Progcaster { l.log(crate::logging::TimelyProgressEvent { is_send: false, - source: source, + source, seq_no: counter, channel, addr: addr.to_vec(), diff --git a/timely/src/progress/frontier.rs b/timely/src/progress/frontier.rs index 
e10a7c7f4..8b484112b 100644 --- a/timely/src/progress/frontier.rs +++ b/timely/src/progress/frontier.rs @@ -337,9 +337,9 @@ impl From> for Antichain { } } -impl Into> for Antichain { - fn into(self) -> SmallVec<[T; 1]> { - self.elements +impl From> for SmallVec<[T; 1]> { + fn from(val: Antichain) -> Self { + val.elements } } @@ -634,7 +634,7 @@ pub trait MutableAntichainFilter { impl> MutableAntichainFilter for I { fn filter_through(self, antichain: &mut MutableAntichain) -> smallvec::Drain<[(T,i64); 2]> { - antichain.update_iter(self.into_iter()) + antichain.update_iter(self) } } @@ -675,11 +675,7 @@ pub struct AntichainRef<'a, T: 'a> { } impl<'a, T: 'a> Clone for AntichainRef<'a, T> { - fn clone(&self) -> Self { - Self { - frontier: self.frontier, - } - } + fn clone(&self) -> Self { *self } } impl<'a, T: 'a> Copy for AntichainRef<'a, T> { } @@ -749,7 +745,7 @@ impl AntichainRef<'_, T> { } } -impl<'a, T: PartialEq> PartialEq for AntichainRef<'a, T> { +impl PartialEq for AntichainRef<'_, T> { fn eq(&self, other: &Self) -> bool { // Lengths should be the same, with the option for fast acceptance if identical. self.len() == other.len() && @@ -760,17 +756,17 @@ impl<'a, T: PartialEq> PartialEq for AntichainRef<'a, T> { } } -impl<'a, T: Eq> Eq for AntichainRef<'a, T> { } +impl Eq for AntichainRef<'_, T> { } -impl<'a, T: PartialOrder> PartialOrder for AntichainRef<'a, T> { +impl PartialOrder for AntichainRef<'_, T> { fn less_equal(&self, other: &Self) -> bool { other.iter().all(|t2| self.iter().any(|t1| t1.less_equal(t2))) } } -impl<'a, T: TotalOrder> TotalOrder for AntichainRef<'a, T> { } +impl TotalOrder for AntichainRef<'_, T> { } -impl<'a, T: TotalOrder> AntichainRef<'a, T> { +impl AntichainRef<'_, T> { /// Return a reference to the at most one element the antichain contains. pub fn as_option(&self) -> Option<&T> { debug_assert!(self.len() <= 1); @@ -778,7 +774,7 @@ impl<'a, T: TotalOrder> AntichainRef<'a, T> { } } -impl<'a, T> ::std::ops::Deref for AntichainRef<'a, T> { +impl ::std::ops::Deref for AntichainRef<'_, T> { type Target = [T]; fn deref(&self) -> &Self::Target { self.frontier diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 9521b31cd..1f0c45adf 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -738,8 +738,8 @@ impl Tracker { /// Graph locations may be missing from the output, in which case they have no /// paths to scope outputs. fn summarize_outputs( - nodes: &Vec>>>, - edges: &Vec>>, + nodes: &[Vec>>], + edges: &[Vec>], ) -> HashMap>> { // A reverse edge map, to allow us to walk back up the dataflow graph. 
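// [Illustrative sketch, not part of the patch] The frontier.rs hunk above replaces a
// hand-written `Into<SmallVec<[T; 1]>>` impl with `From<Antichain<T>> for SmallVec<[T; 1]>`.
// Implementing `From` is the idiomatic direction: the standard library's blanket
// `impl<T, U: From<T>> Into<U> for T` then provides `.into()` automatically, while a
// bare `Into` impl grants no `From`. A hypothetical `Chain` type, using `Vec` to keep
// the example dependency-free:

struct Chain<T> {
    elements: Vec<T>,
}

impl<T> From<Chain<T>> for Vec<T> {
    fn from(chain: Chain<T>) -> Self {
        chain.elements
    }
}

fn main() {
    let chain = Chain { elements: vec![1, 2, 3] };
    // Both spellings work once `From` is implemented.
    let elems: Vec<i32> = chain.into();
    assert_eq!(elems, vec![1, 2, 3]);
    let elems2 = Vec::from(Chain { elements: vec![4] });
    assert_eq!(elems2, vec![4]);
}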
diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index dbe735655..1fdacac1e 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -621,7 +621,7 @@ impl PerOperatorState { name: "External".to_owned(), operator: None, index: 0, - id: usize::max_value(), + id: usize::MAX, local: false, notify: true, inputs, diff --git a/timely/src/progress/timestamp.rs b/timely/src/progress/timestamp.rs index f6441b75c..8b88fb731 100644 --- a/timely/src/progress/timestamp.rs +++ b/timely/src/progress/timestamp.rs @@ -61,7 +61,7 @@ pub trait PathSummary : Clone+'static+Eq+PartialOrder+Debug+Default { fn followed_by(&self, other: &Self) -> Option; } -impl Timestamp for () { type Summary = (); fn minimum() -> Self { () }} +impl Timestamp for () { type Summary = (); fn minimum() -> Self { }} impl PathSummary<()> for () { #[inline] fn results_in(&self, _src: &()) -> Option<()> { Some(()) } #[inline] fn followed_by(&self, _other: &()) -> Option<()> { Some(()) } @@ -73,7 +73,7 @@ macro_rules! implement_timestamp_add { $( impl Timestamp for $index_type { type Summary = $index_type; - fn minimum() -> Self { Self::min_value() } + fn minimum() -> Self { Self::MIN } } impl PathSummary<$index_type> for $index_type { #[inline] @@ -140,8 +140,8 @@ mod refines { $( impl Refines<()> for $index_type { fn to_inner(_: ()) -> $index_type { Default::default() } - fn to_outer(self) -> () { () } - fn summarize(_: <$index_type as Timestamp>::Summary) -> () { () } + fn to_outer(self) -> () { } + fn summarize(_: <$index_type as Timestamp>::Summary) -> () { } } )* ) diff --git a/timely/src/synchronization/sequence.rs b/timely/src/synchronization/sequence.rs index df5fea366..37c61c3f9 100644 --- a/timely/src/synchronization/sequence.rs +++ b/timely/src/synchronization/sequence.rs @@ -179,7 +179,7 @@ impl Sequencer { input.for_each(|time, data| { recvd.reserve(data.len()); for (worker, counter, element) in data.drain(..) { - recvd.push(((time.time().clone(), worker, counter), element)); + recvd.push(((*time.time(), worker, counter), element)); } }); diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 949efe605..9459a107b 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -34,7 +34,7 @@ use crate::logging::TimelyLogger; /// If you are not certain which option to use, prefer `Demand`, and /// perhaps monitor the progress messages through timely's logging /// infrastructure to see if their volume is surprisingly high. -#[derive(Debug, Clone, Copy, Eq, PartialEq)] +#[derive(Debug, Default, Clone, Copy, Eq, PartialEq)] pub enum ProgressMode { /// Eagerly transmit all progress updates produced by a worker. /// @@ -61,15 +61,10 @@ pub enum ProgressMode { /// the messages can affect. Once the capability is released, the /// progress messages are unblocked and transmitted, in accumulated /// form. + #[default] Demand, } -impl Default for ProgressMode { - fn default() -> ProgressMode { - ProgressMode::Demand - } -} - impl FromStr for ProgressMode { type Err = String;
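// [Illustrative sketch, not part of the patch] The worker.rs hunk above relies on the
// `#[default]` variant attribute (stable since Rust 1.62) so that `ProgressMode` can
// derive `Default` instead of carrying a manual impl. The same pattern on a
// hypothetical enum:

#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
enum Mode {
    Eager,
    // The marked variant is what `Mode::default()` returns.
    #[default]
    Demand,
}

fn main() {
    let _ = Mode::Eager; // keep the other variant referenced in this toy example
    assert_eq!(Mode::default(), Mode::Demand);
    println!("{:?}", Mode::default());
}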