diff --git a/crates/scheduler/Cargo.toml b/crates/scheduler/Cargo.toml index 9964e46..0eaeead 100644 --- a/crates/scheduler/Cargo.toml +++ b/crates/scheduler/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +log = "0.4.17" anyhow = "1.0.70" inject = { path = "../inject" } util = { path = "../util" } \ No newline at end of file diff --git a/crates/scheduler/src/bus.rs b/crates/scheduler/src/bus.rs index 9804417..70cabaf 100644 --- a/crates/scheduler/src/bus.rs +++ b/crates/scheduler/src/bus.rs @@ -1,23 +1,28 @@ +use std::any::type_name; use std::ops::Deref; use std::sync::Arc; use anyhow::Result; use inject::ErasedStorage; +use log::warn; use util::RwLock; use crate::caller::Caller; use crate::event::{Event, EventContext}; use crate::handler::Handler; use crate::system::{StoredSystem, System}; +use crate::{SinkCaller, SinkHandler}; struct TypedEventBus { systems: Vec>>, + sink: Option>>, } impl TypedEventBus { pub fn new() -> Self { Self { systems: vec![], + sink: None, } } @@ -30,11 +35,32 @@ impl TypedEventBus { self.systems.push(Box::new(system)); } + /// Register a sink handler for this event. Overwrites any old sink handler (with a warning) + pub fn register_sink( + &mut self, + system: StoredSystem, + handler: impl SinkHandler + 'static, + ) { + if self.sink.is_some() { + warn!( + "Overwriting sink handler for event {} with new system {}", + type_name::(), + type_name::() + ); + } + system.subscribe_sink(handler); + self.sink = Some(Box::new(system)); + } + fn publish(&self, event: E, context: &mut EventContext) -> Result> { let mut results = Vec::with_capacity(self.systems.len()); for system in &self.systems { results.push(system.call(&event, context)?); } + + if let Some(sink) = &self.sink { + results.push(sink.call(event, context)?); + } Ok(results) } } @@ -136,6 +162,18 @@ impl EventBus { }); } + /// Subscribe to be the sink of an event. Each event can only have one sink. This handler is called after all + /// other handlers of the event, and receives ownership of the event when called. + pub fn subscribe_sink( + &self, + system: &StoredSystem, + handler: impl SinkHandler + 'static, + ) { + self.with_event_bus(|bus| { + bus.write().unwrap().register_sink(system.clone(), handler); + }); + } + /// Publish an event to the bus pub fn publish(&self, event: E) -> Result> { // Note: We only lock the entire bus for a short time to get access to the registry. diff --git a/crates/scheduler/src/caller.rs b/crates/scheduler/src/caller.rs index c01af92..220d118 100644 --- a/crates/scheduler/src/caller.rs +++ b/crates/scheduler/src/caller.rs @@ -5,3 +5,7 @@ use crate::event::{Event, EventContext}; pub trait Caller { fn call(&self, event: &E, context: &mut EventContext) -> Result; } + +pub trait SinkCaller { + fn call(&self, event: E, context: &mut EventContext) -> Result; +} diff --git a/crates/scheduler/src/system.rs b/crates/scheduler/src/system.rs index d470559..8cf2e8f 100644 --- a/crates/scheduler/src/system.rs +++ b/crates/scheduler/src/system.rs @@ -7,7 +7,7 @@ use crate::bus::EventBus; use crate::caller::Caller; use crate::event::{Event, EventContext}; use crate::handler::Handler; -use crate::SinkHandler; +use crate::{SinkCaller, SinkHandler}; /// A system must implement this to subscribe to events on the bus pub trait System { @@ -92,3 +92,9 @@ impl Caller for StoredSystem SinkCaller for StoredSystem { + fn call(&self, event: E, context: &mut EventContext) -> Result { + self.0.lock().unwrap().handle_sink(event, context) + } +}