Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ethexe): Introduce centralized event broadcasting system for services #4503

Merged
merged 11 commits into from
Feb 17, 2025
1 change: 1 addition & 0 deletions ethexe/prometheus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ impl PrometheusConfig {
}
}

#[derive(Debug, Clone)]
pub enum PrometheusEvent {
CollectMetrics,
}
Expand Down
2 changes: 1 addition & 1 deletion ethexe/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,5 +200,5 @@ impl FusedStream for RpcReceiver {
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum RpcEvent {}
132 changes: 126 additions & 6 deletions ethexe/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ use ethexe_common::{
use ethexe_db::{BlockMetaStorage, CodeInfo, CodesStorage, Database};
use ethexe_ethereum::router::RouterQuery;
use ethexe_network::{db_sync, NetworkEvent, NetworkService};
use ethexe_observer::{MockBlobReader, ObserverEvent, ObserverService, RequestBlockData};
use ethexe_observer::{
MockBlobReader, ObserverEvent, ObserverService, RequestBlockData, SimpleBlockData,
};
use ethexe_processor::{LocalOutcome, ProcessorConfig};
use ethexe_prometheus::{PrometheusEvent, PrometheusService};
use ethexe_sequencer::{
Expand All @@ -39,12 +41,100 @@ use futures::StreamExt;
use gprimitives::H256;
use parity_scale_codec::{Decode, Encode};
use std::sync::Arc;
use tokio::sync::{
broadcast::{self, Receiver, Sender},
Mutex,
};

pub mod config;

#[cfg(test)]
mod tests;

#[derive(Debug, Clone)]
pub enum ServiceEvent {
Observer(ObserverEvent),
Sequencer(SequencerEvent),
Network(NetworkEvent),
Prometheus(PrometheusEvent),
Rpc(ethexe_rpc::RpcEvent),
ServiceStarted,
}

pub struct EventsPublisher {
broadcaster: Arc<Mutex<Sender<ServiceEvent>>>,
}

impl EventsPublisher {
pub async fn subscribe(&self) -> EventsListener {
EventsListener {
receiver: self.broadcaster.lock().await.subscribe(),
}
}

pub fn from_broadcaster(broadcaster: Arc<Mutex<Sender<ServiceEvent>>>) -> Self {
Self { broadcaster }
}
}

pub struct EventsListener {
receiver: Receiver<ServiceEvent>,
}

impl Clone for EventsListener {
fn clone(&self) -> Self {
Self {
receiver: self.receiver.resubscribe(),
}
}
}

impl EventsListener {
pub async fn next_event(&mut self) -> Result<ServiceEvent> {
self.receiver.recv().await.map_err(Into::into)
}

pub async fn apply_until<R: Sized>(
&mut self,
mut f: impl FnMut(ServiceEvent) -> Result<Option<R>>,
) -> Result<R> {
loop {
let event = self.next_event().await?;
if let Some(res) = f(event)? {
return Ok(res);
}
}
}

pub async fn apply_until_block_event<R: Sized>(
&mut self,
mut f: impl FnMut(BlockEvent) -> Result<Option<R>>,
) -> Result<R> {
self.apply_until_block_event_with_header(|e, _h| f(e)).await
}

pub async fn apply_until_block_event_with_header<R: Sized>(
&mut self,
mut f: impl FnMut(BlockEvent, &SimpleBlockData) -> Result<Option<R>>,
) -> Result<R> {
loop {
let event = self.next_event().await?;

let ServiceEvent::Observer(ObserverEvent::Block(block)) = event else {
continue;
};

let block_data = block.to_simple();

for event in block.events {
if let Some(res) = f(event, &block_data)? {
return Ok(res);
}
}
}
}
}

/// ethexe service.
pub struct Service {
db: Database,
Expand All @@ -60,6 +150,9 @@ pub struct Service {
validator: Option<ethexe_validator::Validator>,
prometheus: Option<PrometheusService>,
rpc: Option<ethexe_rpc::RpcService>,

// Event broadcasting
event_sender: Arc<Mutex<Sender<ServiceEvent>>>,
}

// TODO: consider to move this to another module #4176
Expand All @@ -79,7 +172,16 @@ pub enum NetworkMessage {
}

impl Service {
pub fn events(&self) -> EventsPublisher {
EventsPublisher {
broadcaster: self.event_sender.clone(),
}
}

pub async fn new(config: &Config) -> Result<Self> {
let (sender, _) = broadcast::channel(2048); // Buffer size of 2048 events
let event_sender = Arc::new(Mutex::new(sender));

let mock_blob_reader: Option<Arc<MockBlobReader>> = if config.node.dev {
Some(Arc::new(MockBlobReader::new(config.ethereum.block_time)))
} else {
Expand Down Expand Up @@ -238,6 +340,7 @@ impl Service {
validator,
prometheus,
rpc,
event_sender,
})
}

Expand All @@ -263,6 +366,7 @@ impl Service {
validator: Option<ethexe_validator::Validator>,
prometheus: Option<PrometheusService>,
rpc: Option<ethexe_rpc::RpcService>,
event_sender: Arc<Mutex<Sender<ServiceEvent>>>,
) -> Self {
Self {
db,
Expand All @@ -276,6 +380,7 @@ impl Service {
validator,
prometheus,
rpc,
event_sender,
}
}

Expand Down Expand Up @@ -432,7 +537,12 @@ impl Service {
mut validator,
mut prometheus,
rpc,
event_sender,
} = self;

// Broadcast service started event
let _ = event_sender.lock().await.send(ServiceEvent::ServiceStarted);

let (mut rpc_handle, mut rpc_receiver) = if let Some(rpc) = rpc {
log::info!("🌐 Rpc server starting at: {}", rpc.port());

Expand All @@ -455,7 +565,9 @@ impl Service {
loop {
tokio::select! {
event = observer.select_next_some() => {
match event? {
let event = event?;
let _ = event_sender.lock().await.send(ServiceEvent::Observer(event.clone()));
match event {
ObserverEvent::Blob { code_id, timestamp, code } => {
// TODO: spawn blocking here?
let valid = processor.process_upload_code_raw(code_id, code.as_slice())?;
Expand Down Expand Up @@ -540,6 +652,8 @@ impl Service {
}
},
event = sequencer.maybe_next_some() => {
let _ = event_sender.lock().await.send(ServiceEvent::Sequencer(event.clone()));

let Some(s) = sequencer.as_mut() else {
unreachable!("couldn't produce event without sequencer");
};
Expand Down Expand Up @@ -597,6 +711,8 @@ impl Service {
}
},
event = network.maybe_next_some() => {
let _ = event_sender.lock().await.send(ServiceEvent::Network(event.clone()));

match event {
NetworkEvent::Message { source, data } => {
log::trace!("Received a network message from peer {source:?}");
Expand Down Expand Up @@ -660,8 +776,11 @@ impl Service {
.request_validated(res);
}
_ => {}
}},
}
},
event = prometheus.maybe_next_some() => {
let _ = event_sender.lock().await.send(ServiceEvent::Prometheus(event.clone()));

let Some(p) = prometheus.as_mut() else {
unreachable!("couldn't produce event without prometheus");
};
Expand All @@ -685,10 +804,11 @@ impl Service {
};
}
}
}
},
event = rpc_receiver.maybe_next_some() => {
log::info!("Received RPC event {event:#?}");
}
let _ = event_sender.lock().await.send(ServiceEvent::Rpc(event.clone()));

},
_ = rpc_handle.as_mut().maybe() => {
log::info!("`RPCWorker` has terminated, shutting down...");
}
Expand Down
Loading
Loading