Skip to content

Commit f3674d8

Browse files
authored
feat: Set synced status to idle after chain consolidation (#391)
* issue fork choice update on startup to sync the EN * set synced status to idle after chain is consolidated * clean up
1 parent 07c1668 commit f3674d8

File tree

2 files changed

+89
-29
lines changed

2 files changed

+89
-29
lines changed

crates/chain-orchestrator/src/lib.rs

Lines changed: 26 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use alloy_rpc_types_engine::ExecutionPayloadV1;
88
use futures::StreamExt;
99
use reth_chainspec::EthChainSpec;
1010
use reth_network_api::{BlockDownloaderProvider, FullNetwork};
11-
use reth_network_p2p::FullBlockClient;
11+
use reth_network_p2p::{sync::SyncState as RethSyncState, FullBlockClient};
1212
use reth_scroll_node::ScrollNetworkPrimitives;
1313
use reth_scroll_primitives::ScrollBlock;
1414
use reth_tasks::shutdown::Shutdown;
@@ -1081,36 +1081,34 @@ impl<
10811081

10821082
if head_block_number == safe_block_number {
10831083
tracing::trace!(target: "scroll::chain_orchestrator", "No unsafe blocks to consolidate");
1084+
} else {
1085+
let start_block_number = safe_block_number + 1;
1086+
// TODO: Make fetching parallel but ensure concurrency limits are respected.
1087+
let mut blocks_to_validate = vec![];
1088+
for block_number in start_block_number..=head_block_number {
1089+
let block = self
1090+
.l2_client
1091+
.get_block_by_number(block_number.into())
1092+
.full()
1093+
.await?
1094+
.ok_or(ChainOrchestratorError::L2BlockNotFoundInL2Client(block_number))?
1095+
.into_consensus()
1096+
.map_transactions(|tx| tx.inner.into_inner());
1097+
blocks_to_validate.push(block);
1098+
}
10841099

1085-
self.notify(ChainOrchestratorEvent::ChainConsolidated {
1086-
from: safe_block_number,
1087-
to: head_block_number,
1088-
});
1089-
return Ok(());
1090-
}
1100+
self.validate_l1_messages(&blocks_to_validate).await?;
10911101

1092-
let start_block_number = safe_block_number + 1;
1093-
// TODO: Make fetching parallel but ensure concurrency limits are respected.
1094-
let mut blocks_to_validate = vec![];
1095-
for block_number in start_block_number..=head_block_number {
1096-
let block = self
1097-
.l2_client
1098-
.get_block_by_number(block_number.into())
1099-
.full()
1100-
.await?
1101-
.ok_or(ChainOrchestratorError::L2BlockNotFoundInL2Client(block_number))?
1102-
.into_consensus()
1103-
.map_transactions(|tx| tx.inner.into_inner());
1104-
blocks_to_validate.push(block);
1105-
}
1106-
1107-
self.validate_l1_messages(&blocks_to_validate).await?;
1102+
self.database
1103+
.update_l1_messages_from_l2_blocks(
1104+
blocks_to_validate.into_iter().map(|b| (&b).into()).collect(),
1105+
)
1106+
.await?;
1107+
};
11081108

1109-
self.database
1110-
.update_l1_messages_from_l2_blocks(
1111-
blocks_to_validate.into_iter().map(|b| (&b).into()).collect(),
1112-
)
1113-
.await?;
1109+
// send a notification to the network that the chain is synced such that it accepts
1110+
// transactions into the transaction pool.
1111+
self.network.handle().inner().update_sync_state(RethSyncState::Idle);
11141112

11151113
self.notify(ChainOrchestratorEvent::ChainConsolidated {
11161114
from: safe_block_number,

crates/node/tests/sync.rs

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use reth_tokio_util::EventStream;
1010
use rollup_node::{
1111
test_utils::{
1212
default_sequencer_test_scroll_rollup_node_config, default_test_scroll_rollup_node_config,
13-
setup_engine,
13+
generate_tx, setup_engine,
1414
},
1515
BlobProviderArgs, ChainOrchestratorArgs, ConsensusArgs, EngineDriverArgs, L1ProviderArgs,
1616
RollupNodeDatabaseArgs, RollupNodeGasPriceOracleArgs, RollupNodeNetworkArgs, RpcArgs,
@@ -98,6 +98,68 @@ async fn test_should_consolidate_to_block_15k() -> eyre::Result<()> {
9898
Ok(())
9999
}
100100

101+
#[allow(clippy::large_stack_frames)]
102+
#[tokio::test]
103+
async fn test_node_produces_block_on_startup() -> eyre::Result<()> {
104+
reth_tracing::init_test_tracing();
105+
106+
let mut sequencer_node_config = default_sequencer_test_scroll_rollup_node_config();
107+
sequencer_node_config.sequencer_args.auto_start = true;
108+
sequencer_node_config.sequencer_args.allow_empty_blocks = false;
109+
110+
let (mut nodes, _tasks, wallet) =
111+
setup_engine(sequencer_node_config, 2, (*SCROLL_DEV).clone(), false, false).await?;
112+
113+
let follower = nodes.pop().unwrap();
114+
let mut follower_events =
115+
follower.inner.add_ons_handle.rollup_manager_handle.get_event_listener().await?;
116+
let follower_l1_watcher_tx = follower.inner.add_ons_handle.l1_watcher_tx.clone().unwrap();
117+
118+
let sequencer = nodes.pop().unwrap();
119+
let mut sequencer_events =
120+
sequencer.inner.add_ons_handle.rollup_manager_handle.get_event_listener().await?;
121+
let sequencer_l1_watcher_tx = sequencer.inner.add_ons_handle.l1_watcher_tx.clone().unwrap();
122+
123+
// Send a notification to the sequencer and follower nodes that the L1 watcher is synced.
124+
sequencer_l1_watcher_tx.send(Arc::new(L1Notification::Synced)).await.unwrap();
125+
follower_l1_watcher_tx.send(Arc::new(L1Notification::Synced)).await.unwrap();
126+
127+
// wait for both nodes to be synced.
128+
wait_n_events(
129+
&mut sequencer_events,
130+
|e| matches!(e, ChainOrchestratorEvent::ChainConsolidated { from: _, to: _ }),
131+
1,
132+
)
133+
.await;
134+
wait_n_events(
135+
&mut follower_events,
136+
|e| matches!(e, ChainOrchestratorEvent::ChainConsolidated { from: _, to: _ }),
137+
1,
138+
)
139+
.await;
140+
141+
// construct a transaction and send it to the follower node.
142+
let wallet = Arc::new(tokio::sync::Mutex::new(wallet));
143+
let handle = tokio::spawn(async move {
144+
loop {
145+
let tx = generate_tx(wallet.clone()).await;
146+
follower.rpc.inject_tx(tx).await.unwrap();
147+
}
148+
});
149+
150+
// Assert that the follower node receives the new block.
151+
wait_n_events(
152+
&mut follower_events,
153+
|e| matches!(e, ChainOrchestratorEvent::ChainExtended(_)),
154+
1,
155+
)
156+
.await;
157+
158+
drop(handle);
159+
160+
Ok(())
161+
}
162+
101163
/// We test if the syncing of the RN is correctly triggered and released when the EN reaches sync.
102164
#[allow(clippy::large_stack_frames)]
103165
#[tokio::test]

0 commit comments

Comments
 (0)