diff --git a/bin/client/src/l1/pipeline.rs b/bin/client/src/l1/pipeline.rs index 36cbf02d..f2305c05 100644 --- a/bin/client/src/l1/pipeline.rs +++ b/bin/client/src/l1/pipeline.rs @@ -1,30 +1,23 @@ //! Contains an oracle-backed pipeline for -use alloc::{boxed::Box, sync::Arc}; +use alloc::sync::Arc; use alloy_consensus::Sealed; use async_trait::async_trait; use core::fmt::Debug; use kona_derive::{ attributes::StatefulAttributesBuilder, - errors::{PipelineError, PipelineErrorKind, ResetError}, pipeline::{DerivationPipeline, PipelineBuilder}, sources::EthereumDataSource, stages::{ AttributesQueue, BatchProvider, BatchStream, ChannelProvider, ChannelReader, FrameQueue, L1Retrieval, L1Traversal, }, - traits::{ - BlobProvider, ChainProvider, L2ChainProvider, OriginProvider, Pipeline, SignalReceiver, - }, - types::{ActivationSignal, ResetSignal, Signal, StepResult}, + traits::{BlobProvider, ChainProvider}, }; use kona_driver::SyncCursor; use kona_mpt::TrieProvider; use kona_preimage::{CommsClient, PreimageKey, PreimageKeyType}; -use op_alloy_genesis::RollupConfig; use op_alloy_protocol::{BatchValidationProvider, BlockInfo, L2BlockInfo}; -use op_alloy_rpc_types_engine::OpAttributesWithParent; -use tracing::{info, warn}; use crate::{ errors::OracleProviderError, l1::OracleL1ChainProvider, l2::OracleL2ChainProvider, BootInfo, @@ -185,101 +178,18 @@ where } #[async_trait] -impl kona_driver::Pipeline for OraclePipeline +impl kona_driver::DriverPipeline> for OraclePipeline where O: CommsClient + FlushableCache + Send + Sync + Debug, B: BlobProvider + Send + Sync + Debug + Clone, { - /// Produces the disputed [OpAttributesWithParent] payload, directly after the starting L2 - /// output root passed through the [crate::BootInfo]. - async fn produce_payload( - &mut self, - l2_safe_head: L2BlockInfo, - ) -> Result { - // As we start the safe head at the disputed block's parent, we step the pipeline until the - // first attributes are produced. All batches at and before the safe head will be - // dropped, so the first payload will always be the disputed one. - loop { - match self.pipeline.step(l2_safe_head).await { - StepResult::PreparedAttributes => { - info!(target: "client_derivation_driver", "Stepped derivation pipeline") - } - StepResult::AdvancedOrigin => { - info!(target: "client_derivation_driver", "Advanced origin") - } - StepResult::OriginAdvanceErr(e) | StepResult::StepFailed(e) => { - warn!(target: "client_derivation_driver", "Failed to step derivation pipeline: {:?}", e); - - // Break the loop unless the error signifies that there is not enough data to - // complete the current step. In this case, we retry the step to see if other - // stages can make progress. - match e { - PipelineErrorKind::Temporary(_) => continue, - PipelineErrorKind::Reset(e) => { - let system_config = self - .pipeline - .l2_chain_provider - .system_config_by_number( - l2_safe_head.block_info.number, - self.pipeline.rollup_config.clone(), - ) - .await?; - - if matches!(e, ResetError::HoloceneActivation) { - self.pipeline - .signal( - ActivationSignal { - l2_safe_head, - l1_origin: self - .pipeline - .origin() - .ok_or(PipelineError::MissingOrigin.crit())?, - system_config: Some(system_config), - } - .signal(), - ) - .await?; - } else { - // Flush the caching oracle if a reorg is detected. - if matches!(e, ResetError::ReorgDetected(_, _)) { - self.caching_oracle.as_ref().flush(); - } - - // Reset the pipeline to the initial L2 safe head and L1 origin, - // and try again. - self.pipeline - .signal( - ResetSignal { - l2_safe_head, - l1_origin: self - .pipeline - .origin() - .ok_or(PipelineError::MissingOrigin.crit())?, - system_config: Some(system_config), - } - .signal(), - ) - .await?; - } - } - PipelineErrorKind::Critical(_) => return Err(e), - } - } - } - - if let Some(attrs) = self.pipeline.next() { - return Ok(attrs); - } - } - } - - /// Signals the derivation pipeline. - async fn signal(&mut self, signal: Signal) -> Result<(), PipelineErrorKind> { - self.pipeline.signal(signal).await + /// Returns the inner pipeline. + fn inner(&mut self) -> &mut OracleDerivationPipeline { + &mut self.pipeline } - /// Returns the rollup configuration. - fn rollup_config(&self) -> Arc { - self.pipeline.rollup_config.clone() + /// Flushes the cache on re-org. + fn flush(&self) { + self.caching_oracle.flush(); } } diff --git a/crates/derive/src/pipeline/core.rs b/crates/derive/src/pipeline/core.rs index d96da92b..28094ef6 100644 --- a/crates/derive/src/pipeline/core.rs +++ b/crates/derive/src/pipeline/core.rs @@ -10,7 +10,7 @@ use crate::{ use alloc::{boxed::Box, collections::VecDeque, sync::Arc}; use async_trait::async_trait; use core::fmt::Debug; -use op_alloy_genesis::RollupConfig; +use op_alloy_genesis::{RollupConfig, SystemConfig}; use op_alloy_protocol::{BlockInfo, L2BlockInfo}; use op_alloy_rpc_types_engine::OpAttributesWithParent; use tracing::{error, trace, warn}; @@ -134,6 +134,22 @@ where self.prepared.front() } + /// Returns the rollup config. + fn rollup_config(&self) -> &RollupConfig { + &self.rollup_config + } + + /// Returns the [SystemConfig] by L2 number. + async fn system_config_by_number( + &mut self, + number: u64, + ) -> Result { + self.l2_chain_provider + .system_config_by_number(number, self.rollup_config.clone()) + .await + .map_err(Into::into) + } + /// Attempts to progress the pipeline. /// /// ## Returns diff --git a/crates/derive/src/traits/pipeline.rs b/crates/derive/src/traits/pipeline.rs index c5be6f52..a07f601a 100644 --- a/crates/derive/src/traits/pipeline.rs +++ b/crates/derive/src/traits/pipeline.rs @@ -1,13 +1,14 @@ //! Defines the interface for the core derivation pipeline. -use super::OriginProvider; -use crate::types::StepResult; use alloc::boxed::Box; use async_trait::async_trait; use core::iter::Iterator; +use op_alloy_genesis::{RollupConfig, SystemConfig}; use op_alloy_protocol::L2BlockInfo; use op_alloy_rpc_types_engine::OpAttributesWithParent; +use crate::{errors::PipelineErrorKind, traits::OriginProvider, types::StepResult}; + /// This trait defines the interface for interacting with the derivation pipeline. #[async_trait] pub trait Pipeline: OriginProvider + Iterator { @@ -16,4 +17,13 @@ pub trait Pipeline: OriginProvider + Iterator { /// Attempts to progress the pipeline. async fn step(&mut self, cursor: L2BlockInfo) -> StepResult; + + /// Returns the rollup config. + fn rollup_config(&self) -> &RollupConfig; + + /// Returns the [SystemConfig] by L2 number. + async fn system_config_by_number( + &mut self, + number: u64, + ) -> Result; } diff --git a/crates/driver/src/core.rs b/crates/driver/src/core.rs index b05104d8..6da34934 100644 --- a/crates/driver/src/core.rs +++ b/crates/driver/src/core.rs @@ -1,12 +1,13 @@ //! The driver of the Derivation Pipeline. use alloc::vec::Vec; -use alloy_consensus::{BlockBody, Header, Sealable, Sealed}; +use alloy_consensus::{BlockBody, Sealable}; use alloy_primitives::B256; use alloy_rlp::Decodable; use core::fmt::Debug; use kona_derive::{ errors::{PipelineError, PipelineErrorKind}, + traits::{Pipeline, SignalReceiver}, types::Signal, }; use op_alloy_consensus::{OpBlock, OpTxEnvelope, OpTxType}; @@ -15,50 +16,45 @@ use op_alloy_protocol::L2BlockInfo; use op_alloy_rpc_types_engine::OpAttributesWithParent; use tracing::{error, info, warn}; -use crate::{DriverError, DriverResult, Executor, ExecutorConstructor, Pipeline, SyncCursor}; +use crate::{DriverError, DriverPipeline, DriverResult, Executor, ExecutorConstructor, SyncCursor}; /// The Rollup Driver entrypoint. #[derive(Debug)] -pub struct Driver +pub struct Driver where E: Executor + Send + Sync + Debug, EC: ExecutorConstructor + Send + Sync + Debug, - P: Pipeline + Send + Sync + Debug, + DP: DriverPipeline

+ Send + Sync + Debug, + P: Pipeline + SignalReceiver + Send + Sync + Debug, { /// Marker for the executor. _marker: core::marker::PhantomData, + /// Marker for the pipeline. + _marker2: core::marker::PhantomData

, /// A pipeline abstraction. - pipeline: P, + pipeline: DP, /// Cursor to keep track of the L2 tip cursor: SyncCursor, /// Executor constructor. executor: EC, } -impl Driver +impl Driver where E: Executor + Send + Sync + Debug, EC: ExecutorConstructor + Send + Sync + Debug, - P: Pipeline + Send + Sync + Debug, + DP: DriverPipeline

+ Send + Sync + Debug, + P: Pipeline + SignalReceiver + Send + Sync + Debug, { /// Creates a new [Driver]. - pub const fn new(cursor: SyncCursor, executor: EC, pipeline: P) -> Self { - Self { _marker: core::marker::PhantomData, cursor, executor, pipeline } - } - - /// Returns the current L2 safe head. - pub const fn l2_safe_head(&self) -> &L2BlockInfo { - self.cursor.l2_safe_head() - } - - /// Returns the header of the L2 safe head. - pub const fn l2_safe_head_header(&self) -> &Sealed

{ - self.cursor.l2_safe_head_header() - } - - /// Returns the output root of the L2 safe head. - pub const fn l2_safe_head_output_root(&self) -> &B256 { - self.cursor.l2_safe_head_output_root() + pub const fn new(cursor: SyncCursor, executor: EC, pipeline: DP) -> Self { + Self { + _marker: core::marker::PhantomData, + _marker2: core::marker::PhantomData, + pipeline, + cursor, + executor, + } } /// Advances the derivation pipeline to the target block number. @@ -78,17 +74,17 @@ where ) -> DriverResult<(u64, B256), E::Error> { loop { // Check if we have reached the target block number. - if self.l2_safe_head().block_info.number >= target { + if self.cursor.l2_safe_head().block_info.number >= target { info!(target: "client", "Derivation complete, reached L2 safe head."); return Ok(( - self.l2_safe_head().block_info.number, - *self.l2_safe_head_output_root(), + self.cursor.l2_safe_head().block_info.number, + *self.cursor.l2_safe_head_output_root(), )); } let OpAttributesWithParent { mut attributes, .. } = match self .pipeline - .produce_payload(*self.l2_safe_head()) + .produce_payload(*self.cursor.l2_safe_head()) .await { Ok(attrs) => attrs, @@ -97,7 +93,7 @@ where // Adjust the target block number to the current safe head, as no more blocks // can be produced. - target = self.l2_safe_head().block_info.number; + target = self.cursor.l2_safe_head().block_info.number; continue; } Err(e) => { @@ -106,7 +102,8 @@ where } }; - let mut executor = self.executor.new_executor(self.l2_safe_head_header().clone()); + let mut executor = + self.executor.new_executor(self.cursor.l2_safe_head_header().clone()); let header = match executor.execute_payload(attributes.clone()) { Ok(header) => header, Err(e) => { @@ -120,7 +117,7 @@ where // deposit-only block due to execution failure, the // batch and channel it is contained in is forwards // invalidated. - self.pipeline.signal(Signal::FlushChannel).await?; + self.pipeline.inner().signal(Signal::FlushChannel).await?; // Strip out all transactions that are not deposits. attributes.transactions = attributes.transactions.map(|txs| { @@ -130,7 +127,8 @@ where }); // Retry the execution. - executor = self.executor.new_executor(self.l2_safe_head_header().clone()); + executor = + self.executor.new_executor(self.cursor.l2_safe_head_header().clone()); match executor.execute_payload(attributes.clone()) { Ok(header) => header, Err(e) => { @@ -166,7 +164,7 @@ where // Update the safe head. self.cursor.l2_safe_head = L2BlockInfo::from_block_and_genesis( &block, - &self.pipeline.rollup_config().genesis, + &self.pipeline.inner().rollup_config().genesis, )?; self.cursor.l2_safe_head_header = header.clone().seal_slow(); self.cursor.l2_safe_head_output_root = diff --git a/crates/driver/src/lib.rs b/crates/driver/src/lib.rs index 68124bc0..0731f63a 100644 --- a/crates/driver/src/lib.rs +++ b/crates/driver/src/lib.rs @@ -13,7 +13,7 @@ mod errors; pub use errors::{DriverError, DriverResult}; mod pipeline; -pub use pipeline::Pipeline; +pub use pipeline::DriverPipeline; mod executor; pub use executor::{Executor, ExecutorConstructor}; diff --git a/crates/driver/src/pipeline.rs b/crates/driver/src/pipeline.rs index dba137db..0e1f919e 100644 --- a/crates/driver/src/pipeline.rs +++ b/crates/driver/src/pipeline.rs @@ -1,26 +1,109 @@ //! Abstracts the derivation pipeline from the driver. -use alloc::{boxed::Box, sync::Arc}; +use alloc::boxed::Box; use async_trait::async_trait; -use kona_derive::{errors::PipelineErrorKind, types::Signal}; -use op_alloy_genesis::RollupConfig; use op_alloy_protocol::L2BlockInfo; use op_alloy_rpc_types_engine::OpAttributesWithParent; -/// Pipeline +use kona_derive::{ + errors::{PipelineError, PipelineErrorKind, ResetError}, + traits::{Pipeline, SignalReceiver}, + types::{ActivationSignal, ResetSignal, StepResult}, +}; +use tracing::{info, warn}; + +/// The Driver's Pipeline /// /// A high-level abstraction for the driver's derivation pipeline. #[async_trait] -pub trait Pipeline { - /// Advance the pipeline to the target block. +pub trait DriverPipeline

+where + P: Pipeline + SignalReceiver, +{ + /// Returns the inner Pipeline. + fn inner(&mut self) -> &mut P; + + /// Flushes any cache on re-org. + fn flush(&self); + + /// Produces the disputed [OpAttributesWithParent] payload, directly after the given + /// starting l2 safe head. async fn produce_payload( &mut self, l2_safe_head: L2BlockInfo, - ) -> Result; + ) -> Result { + // As we start the safe head at the disputed block's parent, we step the pipeline until the + // first attributes are produced. All batches at and before the safe head will be + // dropped, so the first payload will always be the disputed one. + loop { + match self.inner().step(l2_safe_head).await { + StepResult::PreparedAttributes => { + info!(target: "client_derivation_driver", "Stepped derivation pipeline") + } + StepResult::AdvancedOrigin => { + info!(target: "client_derivation_driver", "Advanced origin") + } + StepResult::OriginAdvanceErr(e) | StepResult::StepFailed(e) => { + warn!(target: "client_derivation_driver", "Failed to step derivation pipeline: {:?}", e); + + // Break the loop unless the error signifies that there is not enough data to + // complete the current step. In this case, we retry the step to see if other + // stages can make progress. + match e { + PipelineErrorKind::Temporary(_) => continue, + PipelineErrorKind::Reset(e) => { + let system_config = self + .inner() + .system_config_by_number(l2_safe_head.block_info.number) + .await?; + + if matches!(e, ResetError::HoloceneActivation) { + let l1_origin = self + .inner() + .origin() + .ok_or(PipelineError::MissingOrigin.crit())?; + self.inner() + .signal( + ActivationSignal { + l2_safe_head, + l1_origin, + system_config: Some(system_config), + } + .signal(), + ) + .await?; + } else { + // Flushes cache if a reorg is detected. + if matches!(e, ResetError::ReorgDetected(_, _)) { + self.flush(); + } - /// Signal the pipeline. - async fn signal(&mut self, signal: Signal) -> Result<(), PipelineErrorKind>; + // Reset the pipeline to the initial L2 safe head and L1 origin, + // and try again. + let l1_origin = self + .inner() + .origin() + .ok_or(PipelineError::MissingOrigin.crit())?; + self.inner() + .signal( + ResetSignal { + l2_safe_head, + l1_origin, + system_config: Some(system_config), + } + .signal(), + ) + .await?; + } + } + PipelineErrorKind::Critical(_) => return Err(e), + } + } + } - /// Returns the Pipeline's rollup config. - fn rollup_config(&self) -> Arc; + if let Some(attrs) = self.inner().next() { + return Ok(attrs); + } + } + } }