Skip to content

Commit

Permalink
Activate only by channel ID (#526)
Browse files Browse the repository at this point in the history
* Activate only by channel ID

Signed-off-by: Moritz Hoffmann <[email protected]>

* Remove Event

Signed-off-by: Moritz Hoffmann <[email protected]>

* Remove comment

Signed-off-by: Moritz Hoffmann <[email protected]>

---------

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru authored Feb 5, 2024
1 parent 08b2087 commit b869dcb
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 50 deletions.
22 changes: 10 additions & 12 deletions communication/src/allocator/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,21 @@
use std::rc::Rc;
use std::cell::RefCell;
use std::collections::VecDeque;

use crate::{Push, Pull};
use crate::allocator::Event;

/// The push half of an intra-thread channel.
pub struct Pusher<T, P: Push<T>> {
index: usize,
// count: usize,
events: Rc<RefCell<VecDeque<(usize, Event)>>>,
events: Rc<RefCell<Vec<usize>>>,
pusher: P,
phantom: ::std::marker::PhantomData<T>,
}

impl<T, P: Push<T>> Pusher<T, P> {
/// Wraps a pusher with a message counter.
pub fn new(pusher: P, index: usize, events: Rc<RefCell<VecDeque<(usize, Event)>>>) -> Self {
pub fn new(pusher: P, index: usize, events: Rc<RefCell<Vec<usize>>>) -> Self {
Pusher {
index,
// count: 0,
Expand All @@ -36,7 +34,7 @@ impl<T, P: Push<T>> Push<T> for Pusher<T, P> {
// if self.count != 0 {
// self.events
// .borrow_mut()
// .push_back((self.index, Event::Pushed(self.count)));
// .push_back(self.index);
// self.count = 0;
// }
// }
Expand All @@ -47,7 +45,7 @@ impl<T, P: Push<T>> Push<T> for Pusher<T, P> {
// moving information along. Better, but needs cooperation.
self.events
.borrow_mut()
.push_back((self.index, Event::Pushed(1)));
.push(self.index);

self.pusher.push(element)
}
Expand All @@ -59,15 +57,15 @@ use crossbeam_channel::Sender;
pub struct ArcPusher<T, P: Push<T>> {
index: usize,
// count: usize,
events: Sender<(usize, Event)>,
events: Sender<usize>,
pusher: P,
phantom: ::std::marker::PhantomData<T>,
buzzer: crate::buzzer::Buzzer,
}

impl<T, P: Push<T>> ArcPusher<T, P> {
/// Wraps a pusher with a message counter.
pub fn new(pusher: P, index: usize, events: Sender<(usize, Event)>, buzzer: crate::buzzer::Buzzer) -> Self {
pub fn new(pusher: P, index: usize, events: Sender<usize>, buzzer: crate::buzzer::Buzzer) -> Self {
ArcPusher {
index,
// count: 0,
Expand Down Expand Up @@ -99,7 +97,7 @@ impl<T, P: Push<T>> Push<T> for ArcPusher<T, P> {
// and finally awaken the thread. Other orders are defective when
// multiple threads are involved.
self.pusher.push(element);
let _ = self.events.send((self.index, Event::Pushed(1)));
let _ = self.events.send(self.index);
// TODO : Perhaps this shouldn't be a fatal error (e.g. in shutdown).
// .expect("Failed to send message count");
self.buzzer.buzz();
Expand All @@ -110,14 +108,14 @@ impl<T, P: Push<T>> Push<T> for ArcPusher<T, P> {
pub struct Puller<T, P: Pull<T>> {
index: usize,
count: usize,
events: Rc<RefCell<VecDeque<(usize, Event)>>>,
events: Rc<RefCell<Vec<usize>>>,
puller: P,
phantom: ::std::marker::PhantomData<T>,
}

impl<T, P: Pull<T>> Puller<T, P> {
/// Wraps a puller with a message counter.
pub fn new(puller: P, index: usize, events: Rc<RefCell<VecDeque<(usize, Event)>>>) -> Self {
pub fn new(puller: P, index: usize, events: Rc<RefCell<Vec<usize>>>) -> Self {
Puller {
index,
count: 0,
Expand All @@ -135,7 +133,7 @@ impl<T, P: Pull<T>> Pull<T> for Puller<T, P> {
if self.count != 0 {
self.events
.borrow_mut()
.push_back((self.index, Event::Pulled(self.count)));
.push(self.index);
self.count = 0;
}
}
Expand Down
7 changes: 3 additions & 4 deletions communication/src/allocator/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
use std::rc::Rc;
use std::cell::RefCell;
use std::collections::VecDeque;

use crate::allocator::thread::ThreadBuilder;
use crate::allocator::process::ProcessBuilder as TypedProcessBuilder;
use crate::allocator::{Allocate, AllocateBuilder, Event, Thread, Process};
use crate::allocator::{Allocate, AllocateBuilder, Thread, Process};
use crate::allocator::zero_copy::allocator_process::{ProcessBuilder, ProcessAllocator};
use crate::allocator::zero_copy::allocator::{TcpBuilder, TcpAllocator};

Expand Down Expand Up @@ -74,7 +73,7 @@ impl Generic {
Generic::ZeroCopy(z) => z.release(),
}
}
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
match self {
Generic::Thread(ref t) => t.events(),
Generic::Process(ref p) => p.events(),
Expand All @@ -93,7 +92,7 @@ impl Allocate for Generic {

fn receive(&mut self) { self.receive(); }
fn release(&mut self) { self.release(); }
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> { self.events() }
fn events(&self) -> &Rc<RefCell<Vec<usize>>> { self.events() }
fn await_events(&self, _duration: Option<std::time::Duration>) {
match self {
Generic::Thread(t) => t.await_events(_duration),
Expand Down
11 changes: 1 addition & 10 deletions communication/src/allocator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use std::rc::Rc;
use std::cell::RefCell;
use std::time::Duration;
use std::collections::VecDeque;

pub use self::thread::Thread;
pub use self::process::Process;
Expand Down Expand Up @@ -50,7 +49,7 @@ pub trait Allocate {
/// drain these events in order to drive their computation. If they
/// fail to do so the event queue may become quite large, and turn
/// into a performance problem.
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>>;
fn events(&self) -> &Rc<RefCell<Vec<usize>>>;

/// Awaits communication events.
///
Expand Down Expand Up @@ -92,11 +91,3 @@ pub trait Allocate {
thread::Thread::new_from(identifier, self.events().clone())
}
}

/// A communication channel event.
pub enum Event {
/// A number of messages pushed into the channel.
Pushed(usize),
/// A number of messages pulled from the channel.
Pulled(usize),
}
18 changes: 9 additions & 9 deletions communication/src/allocator/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use std::cell::RefCell;
use std::sync::{Arc, Mutex};
use std::any::Any;
use std::time::Duration;
use std::collections::{HashMap, VecDeque};
use std::collections::{HashMap};
use crossbeam_channel::{Sender, Receiver};

use crate::allocator::thread::{ThreadBuilder};
use crate::allocator::{Allocate, AllocateBuilder, Event, Thread};
use crate::allocator::{Allocate, AllocateBuilder, Thread};
use crate::{Push, Pull, Message};
use crate::buzzer::Buzzer;

Expand All @@ -25,8 +25,8 @@ pub struct ProcessBuilder {
buzzers_send: Vec<Sender<Buzzer>>,
buzzers_recv: Vec<Receiver<Buzzer>>,

counters_send: Vec<Sender<(usize, Event)>>,
counters_recv: Receiver<(usize, Event)>,
counters_send: Vec<Sender<usize>>,
counters_recv: Receiver<usize>,
}

impl AllocateBuilder for ProcessBuilder {
Expand Down Expand Up @@ -63,8 +63,8 @@ pub struct Process {
// below: `Box<Any+Send>` is a `Box<Vec<Option<(Vec<Sender<T>>, Receiver<T>)>>>`
channels: Arc<Mutex<HashMap</* channel id */ usize, Box<dyn Any+Send>>>>,
buzzers: Vec<Buzzer>,
counters_send: Vec<Sender<(usize, Event)>>,
counters_recv: Receiver<(usize, Event)>,
counters_send: Vec<Sender<usize>>,
counters_recv: Receiver<usize>,
}

impl Process {
Expand Down Expand Up @@ -174,7 +174,7 @@ impl Allocate for Process {
(sends, recv)
}

fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
self.inner.events()
}

Expand All @@ -184,8 +184,8 @@ impl Allocate for Process {

fn receive(&mut self) {
let mut events = self.inner.events().borrow_mut();
while let Ok((index, event)) = self.counters_recv.try_recv() {
events.push_back((index, event));
while let Ok(index) = self.counters_recv.try_recv() {
events.push(index);
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions communication/src/allocator/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::cell::RefCell;
use std::time::Duration;
use std::collections::VecDeque;

use crate::allocator::{Allocate, AllocateBuilder, Event};
use crate::allocator::{Allocate, AllocateBuilder};
use crate::allocator::counters::Pusher as CountPusher;
use crate::allocator::counters::Puller as CountPuller;
use crate::{Push, Pull, Message};
Expand All @@ -22,7 +22,7 @@ impl AllocateBuilder for ThreadBuilder {
/// An allocator for intra-thread communication.
pub struct Thread {
/// Shared counts of messages in channels.
events: Rc<RefCell<VecDeque<(usize, Event)>>>,
events: Rc<RefCell<Vec<usize>>>,
}

impl Allocate for Thread {
Expand All @@ -32,7 +32,7 @@ impl Allocate for Thread {
let (pusher, puller) = Thread::new_from(identifier, self.events.clone());
(vec![Box::new(pusher)], Box::new(puller))
}
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
&self.events
}
fn await_events(&self, duration: Option<Duration>) {
Expand All @@ -56,12 +56,12 @@ impl Thread {
/// Allocates a new thread-local channel allocator.
pub fn new() -> Self {
Thread {
events: Rc::new(RefCell::new(VecDeque::new())),
events: Rc::new(RefCell::new(Default::default())),
}
}

/// Creates a new thread-local channel from an identifier and shared counts.
pub fn new_from<T: 'static>(identifier: usize, events: Rc<RefCell<VecDeque<(usize, Event)>>>)
pub fn new_from<T: 'static>(identifier: usize, events: Rc<RefCell<Vec<usize>>>)
-> (ThreadPusher<Message<T>>, ThreadPuller<Message<T>>)
{
let shared = Rc::new(RefCell::new((VecDeque::<Message<T>>::new(), VecDeque::<Message<T>>::new())));
Expand Down
5 changes: 2 additions & 3 deletions communication/src/allocator/zero_copy/allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use crate::networking::MessageHeader;

use crate::{Allocate, Message, Data, Push, Pull};
use crate::allocator::AllocateBuilder;
use crate::allocator::Event;
use crate::allocator::canary::Canary;

use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};
Expand Down Expand Up @@ -229,7 +228,7 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {

// Increment message count for channel.
// Safe to do this even if the channel has been dropped.
events.push_back((header.channel, Event::Pushed(1)));
events.push(header.channel);

// Ensure that a queue exists.
match self.to_local.entry(header.channel) {
Expand Down Expand Up @@ -269,7 +268,7 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
// }
// }
}
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
self.inner.events()
}
fn await_events(&self, duration: Option<std::time::Duration>) {
Expand Down
10 changes: 5 additions & 5 deletions communication/src/allocator/zero_copy/allocator_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use bytes::arc::Bytes;
use crate::networking::MessageHeader;

use crate::{Allocate, Message, Data, Push, Pull};
use crate::allocator::{AllocateBuilder, Event};
use crate::allocator::{AllocateBuilder};
use crate::allocator::canary::Canary;

use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};
Expand Down Expand Up @@ -77,7 +77,7 @@ impl ProcessBuilder {
ProcessAllocator {
index: self.index,
peers: self.peers,
events: Rc::new(RefCell::new(VecDeque::new())),
events: Rc::new(RefCell::new(Default::default())),
canaries: Rc::new(RefCell::new(Vec::new())),
channel_id_bound: None,
staged: Vec::new(),
Expand All @@ -103,7 +103,7 @@ pub struct ProcessAllocator {
index: usize, // number out of peers
peers: usize, // number of peer allocators (for typed channel allocation).

events: Rc<RefCell<VecDeque<(usize, Event)>>>,
events: Rc<RefCell<Vec<usize>>>,

canaries: Rc<RefCell<Vec<usize>>>,

Expand Down Expand Up @@ -196,7 +196,7 @@ impl Allocate for ProcessAllocator {

// Increment message count for channel.
// Safe to do this even if the channel has been dropped.
events.push_back((header.channel, Event::Pushed(1)));
events.push(header.channel);

// Ensure that a queue exists.
match self.to_local.entry(header.channel) {
Expand Down Expand Up @@ -237,7 +237,7 @@ impl Allocate for ProcessAllocator {
// }
}

fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
&self.events
}
fn await_events(&self, duration: Option<std::time::Duration>) {
Expand Down
5 changes: 3 additions & 2 deletions timely/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,9 @@ impl<A: Allocate> Worker<A> {
let events = allocator.events().clone();
let mut borrow = events.borrow_mut();
let paths = self.paths.borrow();
for (channel, _event) in borrow.drain(..) {
// TODO: Pay more attent to `_event`.
borrow.sort_unstable();
borrow.dedup();
for channel in borrow.drain(..) {
// Consider tracking whether a channel
// in non-empty, and only activating
// on the basis of non-empty channels.
Expand Down

0 comments on commit b869dcb

Please sign in to comment.