Skip to content

Commit

Permalink
add: sink handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
NotAPenguin0 committed May 22, 2023
1 parent e4b06d6 commit 4932b69
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 1 deletion.
1 change: 1 addition & 0 deletions crates/scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ version = "0.1.0"
edition = "2021"

[dependencies]
log = "0.4.17"
anyhow = "1.0.70"
inject = { path = "../inject" }
util = { path = "../util" }
38 changes: 38 additions & 0 deletions crates/scheduler/src/bus.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
use std::any::type_name;
use std::ops::Deref;
use std::sync::Arc;

use anyhow::Result;
use inject::ErasedStorage;
use log::warn;
use util::RwLock;

use crate::caller::Caller;
use crate::event::{Event, EventContext};
use crate::handler::Handler;
use crate::system::{StoredSystem, System};
use crate::{SinkCaller, SinkHandler};

struct TypedEventBus<E: Event, T> {
systems: Vec<Box<dyn Caller<E, T>>>,
sink: Option<Box<dyn SinkCaller<E, T>>>,
}

impl<E: Event + 'static, T: 'static> TypedEventBus<E, T> {
pub fn new() -> Self {
Self {
systems: vec![],
sink: None,
}
}

Expand All @@ -30,11 +35,32 @@ impl<E: Event + 'static, T: 'static> TypedEventBus<E, T> {
self.systems.push(Box::new(system));
}

/// Register a sink handler for this event. Overwrites any old sink handler (with a warning)
pub fn register_sink<S: 'static>(
&mut self,
system: StoredSystem<S>,
handler: impl SinkHandler<S, E, T> + 'static,
) {
if self.sink.is_some() {
warn!(
"Overwriting sink handler for event {} with new system {}",
type_name::<E>(),
type_name::<S>()
);
}
system.subscribe_sink(handler);
self.sink = Some(Box::new(system));
}

fn publish(&self, event: E, context: &mut EventContext<T>) -> Result<Vec<E::Result>> {
let mut results = Vec::with_capacity(self.systems.len());
for system in &self.systems {
results.push(system.call(&event, context)?);
}

if let Some(sink) = &self.sink {
results.push(sink.call(event, context)?);
}
Ok(results)
}
}
Expand Down Expand Up @@ -136,6 +162,18 @@ impl<T: Clone + Send + Sync + 'static> EventBus<T> {
});
}

/// Subscribe to be the sink of an event. Each event can only have one sink. This handler is called after all
/// other handlers of the event, and receives ownership of the event when called.
pub fn subscribe_sink<S: 'static, E: Event + 'static>(
&self,
system: &StoredSystem<S>,
handler: impl SinkHandler<S, E, T> + 'static,
) {
self.with_event_bus(|bus| {
bus.write().unwrap().register_sink(system.clone(), handler);
});
}

/// Publish an event to the bus
pub fn publish<E: Event + 'static>(&self, event: E) -> Result<Vec<E::Result>> {
// Note: We only lock the entire bus for a short time to get access to the registry.
Expand Down
4 changes: 4 additions & 0 deletions crates/scheduler/src/caller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@ use crate::event::{Event, EventContext};
pub trait Caller<E: Event + 'static, T> {
fn call(&self, event: &E, context: &mut EventContext<T>) -> Result<E::Result>;
}

pub trait SinkCaller<E: Event + 'static, T> {
fn call(&self, event: E, context: &mut EventContext<T>) -> Result<E::Result>;
}
8 changes: 7 additions & 1 deletion crates/scheduler/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::bus::EventBus;
use crate::caller::Caller;
use crate::event::{Event, EventContext};
use crate::handler::Handler;
use crate::SinkHandler;
use crate::{SinkCaller, SinkHandler};

/// A system must implement this to subscribe to events on the bus
pub trait System<T> {
Expand Down Expand Up @@ -92,3 +92,9 @@ impl<S: 'static, E: Event + 'static, T: 'static> Caller<E, T> for StoredSystem<S
self.0.lock().unwrap().handle(event, context)
}
}

impl<S: 'static, E: Event + 'static, T: 'static> SinkCaller<E, T> for StoredSystem<S> {
fn call(&self, event: E, context: &mut EventContext<T>) -> Result<E::Result> {
self.0.lock().unwrap().handle_sink(event, context)
}
}

0 comments on commit 4932b69

Please sign in to comment.