Skip to content

Commit

Permalink
Turn EthService into a Stream and log the events (paradigmxyz#9622)
Browse files Browse the repository at this point in the history
Co-authored-by: Miguel T <[email protected]>
  • Loading branch information
junderw and mvares authored Jul 18, 2024
1 parent 820d3da commit 4a19161
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 17 deletions.
19 changes: 6 additions & 13 deletions crates/ethereum/engine/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use futures::{ready, StreamExt};
use futures::{Stream, StreamExt};
use pin_project::pin_project;
use reth_beacon_consensus::{BeaconEngineMessage, EthBeaconConsensus};
use reth_chainspec::ChainSpec;
use reth_db_api::database::Database;
use reth_engine_tree::{
backfill::PipelineSync,
chain::ChainOrchestrator,
chain::{ChainEvent, ChainOrchestrator},
download::BasicBlockDownloader,
engine::{EngineApiEvent, EngineApiRequestHandler, EngineHandler, FromEngine},
};
Expand All @@ -14,7 +14,6 @@ use reth_network_p2p::{bodies::client::BodiesClient, headers::client::HeadersCli
use reth_stages_api::Pipeline;
use reth_tasks::TaskSpawner;
use std::{
future::Future,
pin::Pin,
sync::{mpsc::Sender, Arc},
task::{Context, Poll},
Expand Down Expand Up @@ -70,22 +69,16 @@ where
}
}

impl<DB, Client> Future for EthService<DB, Client>
impl<DB, Client> Stream for EthService<DB, Client>
where
DB: Database + 'static,
Client: HeadersClient + BodiesClient + Clone + Unpin + 'static,
{
type Output = Result<(), EthServiceError>;
type Item = ChainEvent<EngineApiEvent>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Call poll on the inner orchestrator.
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut orchestrator = self.project().orchestrator;
loop {
match ready!(StreamExt::poll_next_unpin(&mut orchestrator, cx)) {
Some(_event) => continue,
None => return Poll::Ready(Ok(())),
}
}
StreamExt::poll_next_unpin(&mut orchestrator, cx)
}
}

Expand Down
10 changes: 6 additions & 4 deletions crates/ethereum/node/src/launch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ where
let (_from_tree_tx, from_tree_rx) = unbounded_channel();

// Configure the consensus engine
let eth_service = EthService::new(
let mut eth_service = EthService::new(
ctx.chain_spec(),
network_client.clone(),
// to tree
Expand Down Expand Up @@ -243,8 +243,10 @@ where
let (tx, rx) = oneshot::channel();
info!(target: "reth::cli", "Starting consensus engine");
ctx.task_executor().spawn_critical_blocking("consensus engine", async move {
let res = eth_service.await;
let _ = tx.send(res);
while let Some(event) = eth_service.next().await {
info!(target: "reth::cli", "Event: {event:?}");
}
let _ = tx.send(());
});

let full_node = FullNode {
Expand All @@ -265,7 +267,7 @@ where

let handle = NodeHandle {
node_exit_future: NodeExitFuture::new(
async { Ok(rx.await??) },
async { Ok(rx.await?) },
full_node.config.debug.terminate,
),
node: full_node,
Expand Down

0 comments on commit 4a19161

Please sign in to comment.