Skip to content

Commit

Permalink
feat: abstract pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
refcell committed Nov 10, 2024
1 parent b79054a commit 270cf15
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 147 deletions.
108 changes: 9 additions & 99 deletions bin/client/src/l1/pipeline.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,23 @@
//! Contains an oracle-backed pipeline for

use alloc::{boxed::Box, sync::Arc};
use alloc::sync::Arc;
use alloy_consensus::Sealed;
use async_trait::async_trait;
use core::fmt::Debug;
use kona_derive::{
attributes::StatefulAttributesBuilder,
errors::{PipelineError, PipelineErrorKind, ResetError},
pipeline::{DerivationPipeline, PipelineBuilder},
sources::EthereumDataSource,
stages::{
AttributesQueue, BatchProvider, BatchStream, ChannelProvider, ChannelReader, FrameQueue,
L1Retrieval, L1Traversal,
},
traits::{
BlobProvider, ChainProvider, L2ChainProvider, OriginProvider, Pipeline, SignalReceiver,
},
types::{ActivationSignal, ResetSignal, Signal, StepResult},
traits::{BlobProvider, ChainProvider},
};
use kona_driver::SyncCursor;
use kona_mpt::TrieProvider;
use kona_preimage::{CommsClient, PreimageKey, PreimageKeyType};
use op_alloy_genesis::RollupConfig;
use op_alloy_protocol::{BatchValidationProvider, BlockInfo, L2BlockInfo};
use op_alloy_rpc_types_engine::OpAttributesWithParent;
use tracing::{info, warn};

use crate::{
errors::OracleProviderError, l1::OracleL1ChainProvider, l2::OracleL2ChainProvider, BootInfo,
Expand Down Expand Up @@ -185,101 +178,18 @@ where
}

#[async_trait]
impl<O, B> kona_driver::Pipeline for OraclePipeline<O, B>
impl<O, B> kona_driver::DriverPipeline<OracleDerivationPipeline<O, B>> for OraclePipeline<O, B>
where
O: CommsClient + FlushableCache + Send + Sync + Debug,
B: BlobProvider + Send + Sync + Debug + Clone,
{
/// Produces the disputed [OpAttributesWithParent] payload, directly after the starting L2
/// output root passed through the [crate::BootInfo].
async fn produce_payload(
&mut self,
l2_safe_head: L2BlockInfo,
) -> Result<OpAttributesWithParent, PipelineErrorKind> {
// As we start the safe head at the disputed block's parent, we step the pipeline until the
// first attributes are produced. All batches at and before the safe head will be
// dropped, so the first payload will always be the disputed one.
loop {
match self.pipeline.step(l2_safe_head).await {
StepResult::PreparedAttributes => {
info!(target: "client_derivation_driver", "Stepped derivation pipeline")
}
StepResult::AdvancedOrigin => {
info!(target: "client_derivation_driver", "Advanced origin")
}
StepResult::OriginAdvanceErr(e) | StepResult::StepFailed(e) => {
warn!(target: "client_derivation_driver", "Failed to step derivation pipeline: {:?}", e);

// Break the loop unless the error signifies that there is not enough data to
// complete the current step. In this case, we retry the step to see if other
// stages can make progress.
match e {
PipelineErrorKind::Temporary(_) => continue,
PipelineErrorKind::Reset(e) => {
let system_config = self
.pipeline
.l2_chain_provider
.system_config_by_number(
l2_safe_head.block_info.number,
self.pipeline.rollup_config.clone(),
)
.await?;

if matches!(e, ResetError::HoloceneActivation) {
self.pipeline
.signal(
ActivationSignal {
l2_safe_head,
l1_origin: self
.pipeline
.origin()
.ok_or(PipelineError::MissingOrigin.crit())?,
system_config: Some(system_config),
}
.signal(),
)
.await?;
} else {
// Flush the caching oracle if a reorg is detected.
if matches!(e, ResetError::ReorgDetected(_, _)) {
self.caching_oracle.as_ref().flush();
}

// Reset the pipeline to the initial L2 safe head and L1 origin,
// and try again.
self.pipeline
.signal(
ResetSignal {
l2_safe_head,
l1_origin: self
.pipeline
.origin()
.ok_or(PipelineError::MissingOrigin.crit())?,
system_config: Some(system_config),
}
.signal(),
)
.await?;
}
}
PipelineErrorKind::Critical(_) => return Err(e),
}
}
}

if let Some(attrs) = self.pipeline.next() {
return Ok(attrs);
}
}
}

/// Signals the derivation pipeline.
async fn signal(&mut self, signal: Signal) -> Result<(), PipelineErrorKind> {
self.pipeline.signal(signal).await
/// Returns the inner pipeline.
fn inner(&mut self) -> &mut OracleDerivationPipeline<O, B> {
&mut self.pipeline
}

/// Returns the rollup configuration.
fn rollup_config(&self) -> Arc<RollupConfig> {
self.pipeline.rollup_config.clone()
/// Flushes the cache on re-org.
fn flush(&self) {
self.caching_oracle.flush();
}
}
18 changes: 17 additions & 1 deletion crates/derive/src/pipeline/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
use alloc::{boxed::Box, collections::VecDeque, sync::Arc};
use async_trait::async_trait;
use core::fmt::Debug;
use op_alloy_genesis::RollupConfig;
use op_alloy_genesis::{RollupConfig, SystemConfig};
use op_alloy_protocol::{BlockInfo, L2BlockInfo};
use op_alloy_rpc_types_engine::OpAttributesWithParent;
use tracing::{error, trace, warn};
Expand Down Expand Up @@ -134,6 +134,22 @@ where
self.prepared.front()
}

/// Returns the rollup config.
fn rollup_config(&self) -> &RollupConfig {
&self.rollup_config
}

/// Returns the [SystemConfig] by L2 number.
async fn system_config_by_number(
&mut self,
number: u64,
) -> Result<SystemConfig, PipelineErrorKind> {
self.l2_chain_provider
.system_config_by_number(number, self.rollup_config.clone())
.await
.map_err(Into::into)
}

/// Attempts to progress the pipeline.
///
/// ## Returns
Expand Down
14 changes: 12 additions & 2 deletions crates/derive/src/traits/pipeline.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
//! Defines the interface for the core derivation pipeline.

use super::OriginProvider;
use crate::types::StepResult;
use alloc::boxed::Box;
use async_trait::async_trait;
use core::iter::Iterator;
use op_alloy_genesis::{RollupConfig, SystemConfig};
use op_alloy_protocol::L2BlockInfo;
use op_alloy_rpc_types_engine::OpAttributesWithParent;

use crate::{errors::PipelineErrorKind, traits::OriginProvider, types::StepResult};

/// This trait defines the interface for interacting with the derivation pipeline.
#[async_trait]
pub trait Pipeline: OriginProvider + Iterator<Item = OpAttributesWithParent> {
Expand All @@ -16,4 +17,13 @@ pub trait Pipeline: OriginProvider + Iterator<Item = OpAttributesWithParent> {

/// Attempts to progress the pipeline.
async fn step(&mut self, cursor: L2BlockInfo) -> StepResult;

/// Returns the rollup config.
fn rollup_config(&self) -> &RollupConfig;

/// Returns the [SystemConfig] by L2 number.
async fn system_config_by_number(
&mut self,
number: u64,
) -> Result<SystemConfig, PipelineErrorKind>;
}
64 changes: 31 additions & 33 deletions crates/driver/src/core.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
//! The driver of the Derivation Pipeline.

use alloc::vec::Vec;
use alloy_consensus::{BlockBody, Header, Sealable, Sealed};
use alloy_consensus::{BlockBody, Sealable};
use alloy_primitives::B256;
use alloy_rlp::Decodable;
use core::fmt::Debug;
use kona_derive::{
errors::{PipelineError, PipelineErrorKind},
traits::{Pipeline, SignalReceiver},
types::Signal,
};
use op_alloy_consensus::{OpBlock, OpTxEnvelope, OpTxType};
Expand All @@ -15,50 +16,45 @@ use op_alloy_protocol::L2BlockInfo;
use op_alloy_rpc_types_engine::OpAttributesWithParent;
use tracing::{error, info, warn};

use crate::{DriverError, DriverResult, Executor, ExecutorConstructor, Pipeline, SyncCursor};
use crate::{DriverError, DriverPipeline, DriverResult, Executor, ExecutorConstructor, SyncCursor};

/// The Rollup Driver entrypoint.
#[derive(Debug)]
pub struct Driver<E, EC, P>
pub struct Driver<E, EC, DP, P>
where
E: Executor + Send + Sync + Debug,
EC: ExecutorConstructor<E> + Send + Sync + Debug,
P: Pipeline + Send + Sync + Debug,
DP: DriverPipeline<P> + Send + Sync + Debug,
P: Pipeline + SignalReceiver + Send + Sync + Debug,
{
/// Marker for the executor.
_marker: core::marker::PhantomData<E>,
/// Marker for the pipeline.
_marker2: core::marker::PhantomData<P>,
/// A pipeline abstraction.
pipeline: P,
pipeline: DP,
/// Cursor to keep track of the L2 tip
cursor: SyncCursor,
/// Executor constructor.
executor: EC,
}

impl<E, EC, P> Driver<E, EC, P>
impl<E, EC, DP, P> Driver<E, EC, DP, P>
where
E: Executor + Send + Sync + Debug,
EC: ExecutorConstructor<E> + Send + Sync + Debug,
P: Pipeline + Send + Sync + Debug,
DP: DriverPipeline<P> + Send + Sync + Debug,
P: Pipeline + SignalReceiver + Send + Sync + Debug,
{
/// Creates a new [Driver].
pub const fn new(cursor: SyncCursor, executor: EC, pipeline: P) -> Self {
Self { _marker: core::marker::PhantomData, cursor, executor, pipeline }
}

/// Returns the current L2 safe head.
pub const fn l2_safe_head(&self) -> &L2BlockInfo {
self.cursor.l2_safe_head()
}

/// Returns the header of the L2 safe head.
pub const fn l2_safe_head_header(&self) -> &Sealed<Header> {
self.cursor.l2_safe_head_header()
}

/// Returns the output root of the L2 safe head.
pub const fn l2_safe_head_output_root(&self) -> &B256 {
self.cursor.l2_safe_head_output_root()
pub const fn new(cursor: SyncCursor, executor: EC, pipeline: DP) -> Self {
Self {
_marker: core::marker::PhantomData,
_marker2: core::marker::PhantomData,
pipeline,
cursor,
executor,
}
}

/// Advances the derivation pipeline to the target block number.
Expand All @@ -78,17 +74,17 @@ where
) -> DriverResult<(u64, B256), E::Error> {
loop {
// Check if we have reached the target block number.
if self.l2_safe_head().block_info.number >= target {
if self.cursor.l2_safe_head().block_info.number >= target {
info!(target: "client", "Derivation complete, reached L2 safe head.");
return Ok((
self.l2_safe_head().block_info.number,
*self.l2_safe_head_output_root(),
self.cursor.l2_safe_head().block_info.number,
*self.cursor.l2_safe_head_output_root(),
));
}

let OpAttributesWithParent { mut attributes, .. } = match self
.pipeline
.produce_payload(*self.l2_safe_head())
.produce_payload(*self.cursor.l2_safe_head())
.await
{
Ok(attrs) => attrs,
Expand All @@ -97,7 +93,7 @@ where

// Adjust the target block number to the current safe head, as no more blocks
// can be produced.
target = self.l2_safe_head().block_info.number;
target = self.cursor.l2_safe_head().block_info.number;
continue;
}
Err(e) => {
Expand All @@ -106,7 +102,8 @@ where
}
};

let mut executor = self.executor.new_executor(self.l2_safe_head_header().clone());
let mut executor =
self.executor.new_executor(self.cursor.l2_safe_head_header().clone());
let header = match executor.execute_payload(attributes.clone()) {
Ok(header) => header,
Err(e) => {
Expand All @@ -120,7 +117,7 @@ where
// deposit-only block due to execution failure, the
// batch and channel it is contained in is forwards
// invalidated.
self.pipeline.signal(Signal::FlushChannel).await?;
self.pipeline.inner().signal(Signal::FlushChannel).await?;

// Strip out all transactions that are not deposits.
attributes.transactions = attributes.transactions.map(|txs| {
Expand All @@ -130,7 +127,8 @@ where
});

// Retry the execution.
executor = self.executor.new_executor(self.l2_safe_head_header().clone());
executor =
self.executor.new_executor(self.cursor.l2_safe_head_header().clone());
match executor.execute_payload(attributes.clone()) {
Ok(header) => header,
Err(e) => {
Expand Down Expand Up @@ -166,7 +164,7 @@ where
// Update the safe head.
self.cursor.l2_safe_head = L2BlockInfo::from_block_and_genesis(
&block,
&self.pipeline.rollup_config().genesis,
&self.pipeline.inner().rollup_config().genesis,
)?;
self.cursor.l2_safe_head_header = header.clone().seal_slow();
self.cursor.l2_safe_head_output_root =
Expand Down
2 changes: 1 addition & 1 deletion crates/driver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ mod errors;
pub use errors::{DriverError, DriverResult};

mod pipeline;
pub use pipeline::Pipeline;
pub use pipeline::DriverPipeline;

mod executor;
pub use executor::{Executor, ExecutorConstructor};
Expand Down
Loading

0 comments on commit 270cf15

Please sign in to comment.