Skip to content

Commit

Permalink
Make std::time::Instant optional
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Aug 26, 2024
1 parent e79b3ee commit e0d98d3
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 43 deletions.
10 changes: 5 additions & 5 deletions timely/examples/logging-send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ fn main() {
let mut probe = ProbeHandle::new();

// Register timely worker logging.
worker.log_register().insert::<TimelyEvent,_>("timely", |_time, data|
worker.log_register().unwrap().insert::<TimelyEvent,_>("timely", |_time, data|
data.iter().for_each(|x| println!("LOG1: {:?}", x))
);

// Register timely progress logging.
// Less generally useful: intended for debugging advanced custom operators or timely
// internals.
worker.log_register().insert::<TimelyProgressEvent,_>("timely/progress", |_time, data|
worker.log_register().unwrap().insert::<TimelyProgressEvent,_>("timely/progress", |_time, data|
data.iter().for_each(|x| {
println!("PROGRESS: {:?}", x);
let (_, _, ev) = x;
Expand All @@ -50,7 +50,7 @@ fn main() {
});

// Register timely worker logging.
worker.log_register().insert::<TimelyEvent,_>("timely", |_time, data|
worker.log_register().unwrap().insert::<TimelyEvent,_>("timely", |_time, data|
data.iter().for_each(|x| println!("LOG2: {:?}", x))
);

Expand All @@ -63,13 +63,13 @@ fn main() {
});

// Register user-level logging.
worker.log_register().insert::<(),_>("input", |_time, data|
worker.log_register().unwrap().insert::<(),_>("input", |_time, data|
for element in data.iter() {
println!("Round tick at: {:?}", element.0);
}
);

let input_logger = worker.log_register().get::<()>("input").expect("Input logger absent");
let input_logger = worker.log_register().unwrap().get::<()>("input").expect("Input logger absent");

let timer = std::time::Instant::now();

Expand Down
2 changes: 1 addition & 1 deletion timely/examples/threadless.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ fn main() {

// create a naked single-threaded worker.
let allocator = timely::communication::allocator::Thread::new();
let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator);
let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None);

// create input and probe handles.
let mut input = InputHandle::new();
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/scopes/child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ where
fn new_identifier(&mut self) -> usize {
self.parent.new_identifier()
}
fn log_register(&self) -> ::std::cell::RefMut<crate::logging_core::Registry<crate::logging::WorkerIdentifier>> {
fn log_register(&self) -> Option<::std::cell::RefMut<crate::logging_core::Registry<crate::logging::WorkerIdentifier>>> {
self.parent.log_register()
}
}
Expand Down
4 changes: 2 additions & 2 deletions timely/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ where
F: FnOnce(&mut Worker<crate::communication::allocator::thread::Thread>)->T+Send+Sync+'static
{
let alloc = crate::communication::allocator::thread::Thread::new();
let mut worker = crate::worker::Worker::new(WorkerConfig::default(), alloc);
let mut worker = crate::worker::Worker::new(WorkerConfig::default(), alloc, Some(std::time::Instant::now()));
let result = func(&mut worker);
while worker.has_dataflows() {
worker.step_or_park(None);
Expand Down Expand Up @@ -320,7 +320,7 @@ where
T: Send+'static,
F: Fn(&mut Worker<<A as AllocateBuilder>::Allocator>)->T+Send+Sync+'static {
initialize_from(builders, others, move |allocator| {
let mut worker = Worker::new(worker_config.clone(), allocator);
let mut worker = Worker::new(worker_config.clone(), allocator, Some(std::time::Instant::now()));
let result = func(&mut worker);
while worker.has_dataflows() {
worker.step_or_park(None);
Expand Down
7 changes: 5 additions & 2 deletions timely/src/progress/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,11 @@ where
let path = self.path.clone();
let reachability_logging =
worker.log_register()
.get::<reachability::logging::TrackerEvent>("timely/reachability")
.map(|logger| reachability::logging::TrackerLogger::new(path, logger));
.as_ref()
.and_then(|l|
l.get::<reachability::logging::TrackerEvent>("timely/reachability")
.map(|logger| reachability::logging::TrackerLogger::new(path, logger))
);
let (tracker, scope_summary) = builder.build(reachability_logging);

let progcaster = Progcaster::new(worker, &self.path, self.logging.clone(), self.progress_logging.clone());
Expand Down
35 changes: 21 additions & 14 deletions timely/src/scheduling/activate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ pub struct Activations {
rx: Receiver<Vec<usize>>,

// Delayed activations.
timer: Instant,
timer: Option<Instant>,
queue: BinaryHeap<Reverse<(Duration, Vec<usize>)>>,
}

impl Activations {

/// Creates a new activation tracker.
pub fn new(timer: Instant) -> Self {
pub fn new(timer: Option<Instant>) -> Self {
let (tx, rx) = crossbeam_channel::unbounded();
Self {
clean: 0,
Expand All @@ -77,13 +77,18 @@ impl Activations {

/// Schedules a future activation for the task addressed by `path`.
pub fn activate_after(&mut self, path: &[usize], delay: Duration) {
// TODO: We could have a minimum delay and immediately schedule anything less than that delay.
if delay == Duration::new(0, 0) {
self.activate(path);
}
if let Some(timer) = self.timer {
// TODO: We could have a minimum delay and immediately schedule anything less than that delay.
if delay == Duration::new(0, 0) {
self.activate(path);
}
else {
let moment = timer.elapsed() + delay;
self.queue.push(Reverse((moment, path.to_vec())));
}
}
else {
let moment = self.timer.elapsed() + delay;
self.queue.push(Reverse((moment, path.to_vec())));
self.activate(path);
}
}

Expand All @@ -96,10 +101,12 @@ impl Activations {
}

// Drain timer-based activations.
let now = self.timer.elapsed();
while self.queue.peek().map(|Reverse((t,_))| t <= &now) == Some(true) {
let Reverse((_time, path)) = self.queue.pop().unwrap();
self.activate(&path[..]);
if let Some(timer) = self.timer {
let now = timer.elapsed();
while self.queue.peek().map(|Reverse((t,_))| t <= &now) == Some(true) {
let Reverse((_time, path)) = self.queue.pop().unwrap();
self.activate(&path[..]);
}
}

self.bounds.drain(.. self.clean);
Expand Down Expand Up @@ -171,12 +178,12 @@ impl Activations {
/// indicates the amount of time before the thread should be unparked for the
/// next scheduled activation.
pub fn empty_for(&self) -> Option<Duration> {
if !self.bounds.is_empty() {
if !self.bounds.is_empty() || self.timer.is_none() {
Some(Duration::new(0,0))
}
else {
self.queue.peek().map(|Reverse((t,_a))| {
let elapsed = self.timer.elapsed();
let elapsed = self.timer.unwrap().elapsed();
if t < &elapsed { Duration::new(0,0) }
else { *t - elapsed }
})
Expand Down
40 changes: 22 additions & 18 deletions timely/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,23 +201,27 @@ pub trait AsWorker : Scheduler {
/// Allocates a new worker-unique identifier.
fn new_identifier(&mut self) -> usize;
/// Provides access to named logging streams.
fn log_register(&self) -> ::std::cell::RefMut<crate::logging_core::Registry<crate::logging::WorkerIdentifier>>;
fn log_register(&self) -> Option<::std::cell::RefMut<crate::logging_core::Registry<crate::logging::WorkerIdentifier>>>;
/// Provides access to the timely logging stream.
fn logging(&self) -> Option<crate::logging::TimelyLogger> { self.log_register().get("timely") }
fn logging(&self) -> Option<crate::logging::TimelyLogger> { self.log_register().and_then(|l| l.get("timely")) }
}

/// A `Worker` is the entry point to a timely dataflow computation. It wraps a `Allocate`,
/// and has a list of dataflows that it manages.
pub struct Worker<A: Allocate> {
config: Config,
timer: Instant,
/// An optional instant from which the start of the computation should be reckoned.
///
/// If this is set to none, system time-based functionality will be unavailable or work badly.
/// For example, logging will be unavailable, and activation after a delay will be unavailable.
timer: Option<Instant>,
paths: Rc<RefCell<HashMap<usize, Vec<usize>>>>,
allocator: Rc<RefCell<A>>,
identifiers: Rc<RefCell<usize>>,
// dataflows: Rc<RefCell<Vec<Wrapper>>>,
dataflows: Rc<RefCell<HashMap<usize, Wrapper>>>,
dataflow_counter: Rc<RefCell<usize>>,
logging: Rc<RefCell<crate::logging_core::Registry<crate::logging::WorkerIdentifier>>>,
logging: Option<Rc<RefCell<crate::logging_core::Registry<crate::logging::WorkerIdentifier>>>>,

activations: Rc<RefCell<Activations>>,
active_dataflows: Vec<usize>,
Expand Down Expand Up @@ -247,7 +251,7 @@ impl<A: Allocate> AsWorker for Worker<A> {
}

fn new_identifier(&mut self) -> usize { self.new_identifier() }
fn log_register(&self) -> RefMut<crate::logging_core::Registry<crate::logging::WorkerIdentifier>> {
fn log_register(&self) -> Option<RefMut<crate::logging_core::Registry<crate::logging::WorkerIdentifier>>> {
self.log_register()
}
}
Expand All @@ -260,8 +264,7 @@ impl<A: Allocate> Scheduler for Worker<A> {

impl<A: Allocate> Worker<A> {
/// Allocates a new `Worker` bound to a channel allocator.
pub fn new(config: Config, c: A) -> Worker<A> {
let now = Instant::now();
pub fn new(config: Config, c: A, now: Option<std::time::Instant>) -> Worker<A> {
let index = c.index();
Worker {
config,
Expand All @@ -271,7 +274,7 @@ impl<A: Allocate> Worker<A> {
identifiers: Default::default(),
dataflows: Default::default(),
dataflow_counter: Default::default(),
logging: Rc::new(RefCell::new(crate::logging_core::Registry::new(now, index))),
logging: now.map(|now| Rc::new(RefCell::new(crate::logging_core::Registry::new(now, index)))),
activations: Rc::new(RefCell::new(Activations::new(now))),
active_dataflows: Default::default(),
temp_channel_ids: Default::default(),
Expand Down Expand Up @@ -407,7 +410,7 @@ impl<A: Allocate> Worker<A> {
}

// Clean up, indicate if dataflows remain.
self.logging.borrow_mut().flush();
self.logging.as_ref().map(|l| l.borrow_mut().flush());
self.allocator.borrow_mut().release();
!self.dataflows.borrow().is_empty()
}
Expand Down Expand Up @@ -478,7 +481,7 @@ impl<A: Allocate> Worker<A> {
///
/// let index = worker.index();
/// let peers = worker.peers();
/// let timer = worker.timer();
/// let timer = worker.timer().unwrap();
///
/// println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
///
Expand All @@ -493,7 +496,7 @@ impl<A: Allocate> Worker<A> {
///
/// let index = worker.index();
/// let peers = worker.peers();
/// let timer = worker.timer();
/// let timer = worker.timer().unwrap();
///
/// println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
///
Expand All @@ -509,13 +512,13 @@ impl<A: Allocate> Worker<A> {
///
/// let index = worker.index();
/// let peers = worker.peers();
/// let timer = worker.timer();
/// let timer = worker.timer().unwrap();
///
/// println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
///
/// });
/// ```
pub fn timer(&self) -> Instant { self.timer }
pub fn timer(&self) -> Option<Instant> { self.timer }

/// Allocate a new worker-unique identifier.
///
Expand All @@ -534,13 +537,14 @@ impl<A: Allocate> Worker<A> {
/// timely::execute_from_args(::std::env::args(), |worker| {
///
/// worker.log_register()
/// .unwrap()
/// .insert::<timely::logging::TimelyEvent,_>("timely", |time, data|
/// println!("{:?}\t{:?}", time, data)
/// );
/// });
/// ```
pub fn log_register(&self) -> ::std::cell::RefMut<crate::logging_core::Registry<crate::logging::WorkerIdentifier>> {
self.logging.borrow_mut()
pub fn log_register(&self) -> Option<::std::cell::RefMut<crate::logging_core::Registry<crate::logging::WorkerIdentifier>>> {
self.logging.as_ref().map(|l| l.borrow_mut())
}

/// Construct a new dataflow.
Expand All @@ -563,7 +567,7 @@ impl<A: Allocate> Worker<A> {
T: Refines<()>,
F: FnOnce(&mut Child<Self, T>)->R,
{
let logging = self.logging.borrow_mut().get("timely");
let logging = self.logging.as_ref().map(|l| l.borrow_mut()).and_then(|l| l.get("timely"));
self.dataflow_core("Dataflow", logging, Box::new(()), |_, child| func(child))
}

Expand All @@ -587,7 +591,7 @@ impl<A: Allocate> Worker<A> {
T: Refines<()>,
F: FnOnce(&mut Child<Self, T>)->R,
{
let logging = self.logging.borrow_mut().get("timely");
let logging = self.logging.as_ref().map(|l| l.borrow_mut()).and_then(|l| l.get("timely"));
self.dataflow_core(name, logging, Box::new(()), |_, child| func(child))
}

Expand Down Expand Up @@ -626,7 +630,7 @@ impl<A: Allocate> Worker<A> {
let dataflow_index = self.allocate_dataflow_index();
let identifier = self.new_identifier();

let progress_logging = self.logging.borrow_mut().get("timely/progress");
let progress_logging = self.logging.as_ref().map(|l| l.borrow_mut()).and_then(|l| l.get("timely/progress"));
let subscope = SubgraphBuilder::new_from(dataflow_index, addr, logging.clone(), progress_logging.clone(), name);
let subscope = RefCell::new(subscope);

Expand Down

0 comments on commit e0d98d3

Please sign in to comment.