From b869dcb0717c227f5b24db213ba4a01ad601e566 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Mon, 5 Feb 2024 16:44:17 -0500 Subject: [PATCH] Activate only by channel ID (#526) * Activate only by channel ID Signed-off-by: Moritz Hoffmann * Remove Event Signed-off-by: Moritz Hoffmann * Remove comment Signed-off-by: Moritz Hoffmann --------- Signed-off-by: Moritz Hoffmann --- communication/src/allocator/counters.rs | 22 +++++++++---------- communication/src/allocator/generic.rs | 7 +++--- communication/src/allocator/mod.rs | 11 +--------- communication/src/allocator/process.rs | 18 +++++++-------- communication/src/allocator/thread.rs | 10 ++++----- .../src/allocator/zero_copy/allocator.rs | 5 ++--- .../allocator/zero_copy/allocator_process.rs | 10 ++++----- timely/src/worker.rs | 5 +++-- 8 files changed, 38 insertions(+), 50 deletions(-) diff --git a/communication/src/allocator/counters.rs b/communication/src/allocator/counters.rs index 68d1fa4ab2..4bee06117b 100644 --- a/communication/src/allocator/counters.rs +++ b/communication/src/allocator/counters.rs @@ -2,23 +2,21 @@ use std::rc::Rc; use std::cell::RefCell; -use std::collections::VecDeque; use crate::{Push, Pull}; -use crate::allocator::Event; /// The push half of an intra-thread channel. pub struct Pusher> { index: usize, // count: usize, - events: Rc>>, + events: Rc>>, pusher: P, phantom: ::std::marker::PhantomData, } impl> Pusher { /// Wraps a pusher with a message counter. - pub fn new(pusher: P, index: usize, events: Rc>>) -> Self { + pub fn new(pusher: P, index: usize, events: Rc>>) -> Self { Pusher { index, // count: 0, @@ -36,7 +34,7 @@ impl> Push for Pusher { // if self.count != 0 { // self.events // .borrow_mut() - // .push_back((self.index, Event::Pushed(self.count))); + // .push_back(self.index); // self.count = 0; // } // } @@ -47,7 +45,7 @@ impl> Push for Pusher { // moving information along. Better, but needs cooperation. self.events .borrow_mut() - .push_back((self.index, Event::Pushed(1))); + .push(self.index); self.pusher.push(element) } @@ -59,7 +57,7 @@ use crossbeam_channel::Sender; pub struct ArcPusher> { index: usize, // count: usize, - events: Sender<(usize, Event)>, + events: Sender, pusher: P, phantom: ::std::marker::PhantomData, buzzer: crate::buzzer::Buzzer, @@ -67,7 +65,7 @@ pub struct ArcPusher> { impl> ArcPusher { /// Wraps a pusher with a message counter. - pub fn new(pusher: P, index: usize, events: Sender<(usize, Event)>, buzzer: crate::buzzer::Buzzer) -> Self { + pub fn new(pusher: P, index: usize, events: Sender, buzzer: crate::buzzer::Buzzer) -> Self { ArcPusher { index, // count: 0, @@ -99,7 +97,7 @@ impl> Push for ArcPusher { // and finally awaken the thread. Other orders are defective when // multiple threads are involved. self.pusher.push(element); - let _ = self.events.send((self.index, Event::Pushed(1))); + let _ = self.events.send(self.index); // TODO : Perhaps this shouldn't be a fatal error (e.g. in shutdown). // .expect("Failed to send message count"); self.buzzer.buzz(); @@ -110,14 +108,14 @@ impl> Push for ArcPusher { pub struct Puller> { index: usize, count: usize, - events: Rc>>, + events: Rc>>, puller: P, phantom: ::std::marker::PhantomData, } impl> Puller { /// Wraps a puller with a message counter. - pub fn new(puller: P, index: usize, events: Rc>>) -> Self { + pub fn new(puller: P, index: usize, events: Rc>>) -> Self { Puller { index, count: 0, @@ -135,7 +133,7 @@ impl> Pull for Puller { if self.count != 0 { self.events .borrow_mut() - .push_back((self.index, Event::Pulled(self.count))); + .push(self.index); self.count = 0; } } diff --git a/communication/src/allocator/generic.rs b/communication/src/allocator/generic.rs index 625de6cd68..49f0140727 100644 --- a/communication/src/allocator/generic.rs +++ b/communication/src/allocator/generic.rs @@ -5,11 +5,10 @@ use std::rc::Rc; use std::cell::RefCell; -use std::collections::VecDeque; use crate::allocator::thread::ThreadBuilder; use crate::allocator::process::ProcessBuilder as TypedProcessBuilder; -use crate::allocator::{Allocate, AllocateBuilder, Event, Thread, Process}; +use crate::allocator::{Allocate, AllocateBuilder, Thread, Process}; use crate::allocator::zero_copy::allocator_process::{ProcessBuilder, ProcessAllocator}; use crate::allocator::zero_copy::allocator::{TcpBuilder, TcpAllocator}; @@ -74,7 +73,7 @@ impl Generic { Generic::ZeroCopy(z) => z.release(), } } - fn events(&self) -> &Rc>> { + fn events(&self) -> &Rc>> { match self { Generic::Thread(ref t) => t.events(), Generic::Process(ref p) => p.events(), @@ -93,7 +92,7 @@ impl Allocate for Generic { fn receive(&mut self) { self.receive(); } fn release(&mut self) { self.release(); } - fn events(&self) -> &Rc>> { self.events() } + fn events(&self) -> &Rc>> { self.events() } fn await_events(&self, _duration: Option) { match self { Generic::Thread(t) => t.await_events(_duration), diff --git a/communication/src/allocator/mod.rs b/communication/src/allocator/mod.rs index 4c29b85e15..e5b858f69a 100644 --- a/communication/src/allocator/mod.rs +++ b/communication/src/allocator/mod.rs @@ -3,7 +3,6 @@ use std::rc::Rc; use std::cell::RefCell; use std::time::Duration; -use std::collections::VecDeque; pub use self::thread::Thread; pub use self::process::Process; @@ -50,7 +49,7 @@ pub trait Allocate { /// drain these events in order to drive their computation. If they /// fail to do so the event queue may become quite large, and turn /// into a performance problem. - fn events(&self) -> &Rc>>; + fn events(&self) -> &Rc>>; /// Awaits communication events. /// @@ -92,11 +91,3 @@ pub trait Allocate { thread::Thread::new_from(identifier, self.events().clone()) } } - -/// A communication channel event. -pub enum Event { - /// A number of messages pushed into the channel. - Pushed(usize), - /// A number of messages pulled from the channel. - Pulled(usize), -} diff --git a/communication/src/allocator/process.rs b/communication/src/allocator/process.rs index 132bc6de5d..07d793684e 100644 --- a/communication/src/allocator/process.rs +++ b/communication/src/allocator/process.rs @@ -5,11 +5,11 @@ use std::cell::RefCell; use std::sync::{Arc, Mutex}; use std::any::Any; use std::time::Duration; -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap}; use crossbeam_channel::{Sender, Receiver}; use crate::allocator::thread::{ThreadBuilder}; -use crate::allocator::{Allocate, AllocateBuilder, Event, Thread}; +use crate::allocator::{Allocate, AllocateBuilder, Thread}; use crate::{Push, Pull, Message}; use crate::buzzer::Buzzer; @@ -25,8 +25,8 @@ pub struct ProcessBuilder { buzzers_send: Vec>, buzzers_recv: Vec>, - counters_send: Vec>, - counters_recv: Receiver<(usize, Event)>, + counters_send: Vec>, + counters_recv: Receiver, } impl AllocateBuilder for ProcessBuilder { @@ -63,8 +63,8 @@ pub struct Process { // below: `Box` is a `Box>, Receiver)>>>` channels: Arc>>>, buzzers: Vec, - counters_send: Vec>, - counters_recv: Receiver<(usize, Event)>, + counters_send: Vec>, + counters_recv: Receiver, } impl Process { @@ -174,7 +174,7 @@ impl Allocate for Process { (sends, recv) } - fn events(&self) -> &Rc>> { + fn events(&self) -> &Rc>> { self.inner.events() } @@ -184,8 +184,8 @@ impl Allocate for Process { fn receive(&mut self) { let mut events = self.inner.events().borrow_mut(); - while let Ok((index, event)) = self.counters_recv.try_recv() { - events.push_back((index, event)); + while let Ok(index) = self.counters_recv.try_recv() { + events.push(index); } } } diff --git a/communication/src/allocator/thread.rs b/communication/src/allocator/thread.rs index ba5407e4d7..f46e3532b3 100644 --- a/communication/src/allocator/thread.rs +++ b/communication/src/allocator/thread.rs @@ -5,7 +5,7 @@ use std::cell::RefCell; use std::time::Duration; use std::collections::VecDeque; -use crate::allocator::{Allocate, AllocateBuilder, Event}; +use crate::allocator::{Allocate, AllocateBuilder}; use crate::allocator::counters::Pusher as CountPusher; use crate::allocator::counters::Puller as CountPuller; use crate::{Push, Pull, Message}; @@ -22,7 +22,7 @@ impl AllocateBuilder for ThreadBuilder { /// An allocator for intra-thread communication. pub struct Thread { /// Shared counts of messages in channels. - events: Rc>>, + events: Rc>>, } impl Allocate for Thread { @@ -32,7 +32,7 @@ impl Allocate for Thread { let (pusher, puller) = Thread::new_from(identifier, self.events.clone()); (vec![Box::new(pusher)], Box::new(puller)) } - fn events(&self) -> &Rc>> { + fn events(&self) -> &Rc>> { &self.events } fn await_events(&self, duration: Option) { @@ -56,12 +56,12 @@ impl Thread { /// Allocates a new thread-local channel allocator. pub fn new() -> Self { Thread { - events: Rc::new(RefCell::new(VecDeque::new())), + events: Rc::new(RefCell::new(Default::default())), } } /// Creates a new thread-local channel from an identifier and shared counts. - pub fn new_from(identifier: usize, events: Rc>>) + pub fn new_from(identifier: usize, events: Rc>>) -> (ThreadPusher>, ThreadPuller>) { let shared = Rc::new(RefCell::new((VecDeque::>::new(), VecDeque::>::new()))); diff --git a/communication/src/allocator/zero_copy/allocator.rs b/communication/src/allocator/zero_copy/allocator.rs index 64225ff299..6ef9ef6474 100644 --- a/communication/src/allocator/zero_copy/allocator.rs +++ b/communication/src/allocator/zero_copy/allocator.rs @@ -10,7 +10,6 @@ use crate::networking::MessageHeader; use crate::{Allocate, Message, Data, Push, Pull}; use crate::allocator::AllocateBuilder; -use crate::allocator::Event; use crate::allocator::canary::Canary; use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue}; @@ -229,7 +228,7 @@ impl Allocate for TcpAllocator { // Increment message count for channel. // Safe to do this even if the channel has been dropped. - events.push_back((header.channel, Event::Pushed(1))); + events.push(header.channel); // Ensure that a queue exists. match self.to_local.entry(header.channel) { @@ -269,7 +268,7 @@ impl Allocate for TcpAllocator { // } // } } - fn events(&self) -> &Rc>> { + fn events(&self) -> &Rc>> { self.inner.events() } fn await_events(&self, duration: Option) { diff --git a/communication/src/allocator/zero_copy/allocator_process.rs b/communication/src/allocator/zero_copy/allocator_process.rs index dd2815a506..74056ac295 100644 --- a/communication/src/allocator/zero_copy/allocator_process.rs +++ b/communication/src/allocator/zero_copy/allocator_process.rs @@ -10,7 +10,7 @@ use bytes::arc::Bytes; use crate::networking::MessageHeader; use crate::{Allocate, Message, Data, Push, Pull}; -use crate::allocator::{AllocateBuilder, Event}; +use crate::allocator::{AllocateBuilder}; use crate::allocator::canary::Canary; use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue}; @@ -77,7 +77,7 @@ impl ProcessBuilder { ProcessAllocator { index: self.index, peers: self.peers, - events: Rc::new(RefCell::new(VecDeque::new())), + events: Rc::new(RefCell::new(Default::default())), canaries: Rc::new(RefCell::new(Vec::new())), channel_id_bound: None, staged: Vec::new(), @@ -103,7 +103,7 @@ pub struct ProcessAllocator { index: usize, // number out of peers peers: usize, // number of peer allocators (for typed channel allocation). - events: Rc>>, + events: Rc>>, canaries: Rc>>, @@ -196,7 +196,7 @@ impl Allocate for ProcessAllocator { // Increment message count for channel. // Safe to do this even if the channel has been dropped. - events.push_back((header.channel, Event::Pushed(1))); + events.push(header.channel); // Ensure that a queue exists. match self.to_local.entry(header.channel) { @@ -237,7 +237,7 @@ impl Allocate for ProcessAllocator { // } } - fn events(&self) -> &Rc>> { + fn events(&self) -> &Rc>> { &self.events } fn await_events(&self, duration: Option) { diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 153f5c775f..3db8d97331 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -338,8 +338,9 @@ impl Worker { let events = allocator.events().clone(); let mut borrow = events.borrow_mut(); let paths = self.paths.borrow(); - for (channel, _event) in borrow.drain(..) { - // TODO: Pay more attent to `_event`. + borrow.sort_unstable(); + borrow.dedup(); + for channel in borrow.drain(..) { // Consider tracking whether a channel // in non-empty, and only activating // on the basis of non-empty channels.