diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 3590bf5f1f..bd601f9848 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -119,15 +119,8 @@ jobs: cargo clippy --workspace \ --all-targets \ --all-features \ - --exclude astria-conductor \ -- --warn clippy::pedantic \ --deny warnings - - name: run default clippy - run: | - cargo clippy --workspace \ - --all-targets \ - --all-features \ - -- --deny warnings test: if: ${{ always() && !cancelled() }} diff --git a/crates/astria-conductor/src/block_verifier.rs b/crates/astria-conductor/src/block_verifier.rs index 8f1e93d6e4..fbce139996 100644 --- a/crates/astria-conductor/src/block_verifier.rs +++ b/crates/astria-conductor/src/block_verifier.rs @@ -440,7 +440,7 @@ mod test { let chain_ids_commitment_inclusion_proof = tx_tree.prove_inclusion(1).unwrap(); let mut header = astria_sequencer_types::test_utils::default_header(); - let height = header.height.value() as u32; + let height = header.height.value().try_into().unwrap(); header.data_hash = Some(Hash::try_from(data_hash.to_vec()).unwrap()); let (validator_set, proposer_address, commit) = @@ -481,7 +481,7 @@ mod test { let chain_ids_commitment_inclusion_proof = tx_tree.prove_inclusion(1).unwrap(); let mut header = astria_sequencer_types::test_utils::default_header(); - let height = header.height.value() as u32; + let height = header.height.value().try_into().unwrap(); header.data_hash = Some(Hash::try_from(data_hash.to_vec()).unwrap()); let (validator_set, proposer_address, commit) = diff --git a/crates/astria-conductor/src/client_provider.rs b/crates/astria-conductor/src/client_provider.rs index 4e992a24a7..f296b0eeb2 100644 --- a/crates/astria-conductor/src/client_provider.rs +++ b/crates/astria-conductor/src/client_provider.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use async_trait::async_trait; use color_eyre::eyre::{ self, @@ -16,15 +18,21 @@ use tokio::{ }, task::JoinHandle, }; -use tracing::instrument::Instrumented; +use tracing::{ + instrument::Instrumented, + warn, +}; +use tryhard::{ + backoff_strategies::ExponentialBackoff, + OnRetry, + RetryFutureConfig, +}; type ClientRx = mpsc::UnboundedReceiver>>; type ClientTx = mpsc::UnboundedSender>>; -pub(super) async fn start_pool(url: &str) -> eyre::Result> { - let client_provider = ClientProvider::new(url) - .await - .wrap_err("failed initializing sequencer client provider")?; +pub(super) fn start_pool(url: &str) -> eyre::Result> { + let client_provider = ClientProvider::new(url); Pool::builder(client_provider) .max_size(50) .build() @@ -44,12 +52,40 @@ pub(crate) struct ClientProvider { _provider_loop: Instrumented>, } +fn make_retry_config( + attempts: u32, +) -> RetryFutureConfig< + ExponentialBackoff, + impl Copy + OnRetry, +> { + RetryFutureConfig::new(attempts) + .exponential_backoff(Duration::from_secs(5)) + .max_delay(Duration::from_secs(60)) + .on_retry( + |attempt, + next_delay: Option, + error: &sequencer_client::tendermint_rpc::Error| { + let error = error.clone(); + let wait_duration = next_delay + .map(humantime::format_duration) + .map(tracing::field::display); + async move { + let error = &error as &(dyn std::error::Error + 'static); + warn!( + attempt, + wait_duration, + error, + "attempt to connect to sequencer websocket failed; retrying after backoff", + ); + } + }, + ) +} + impl ClientProvider { const RECONNECTION_ATTEMPTS: u32 = 1024; - pub(crate) async fn new(url: &str) -> eyre::Result { - use std::time::Duration; - + pub(crate) fn new(url: &str) -> Self { use futures::{ future::FusedFuture as _, FutureExt as _, @@ -57,7 +93,6 @@ impl ClientProvider { use tracing::{ info, info_span, - warn, Instrument as _, }; let (client_tx, mut client_rx): (ClientTx, ClientRx) = mpsc::unbounded_channel(); @@ -67,32 +102,10 @@ impl ClientProvider { strategy = "exponential backoff", "connecting to sequencer websocket" ); - let retry_config = tryhard::RetryFutureConfig::new(Self::RECONNECTION_ATTEMPTS) - .exponential_backoff(Duration::from_secs(5)) - .max_delay(Duration::from_secs(60)) - .on_retry( - |attempt, - next_delay: Option, - error: &sequencer_client::tendermint_rpc::Error| { - let error = error.clone(); - let wait_duration = next_delay - .map(humantime::format_duration) - .map(tracing::field::display); - async move { - let error = &error as &(dyn std::error::Error + 'static); - warn!( - attempt, - wait_duration, - error, - "attempt to connect to sequencer websocket failed; retrying after \ - backoff", - ); - } - }, - ); + let retry_config = make_retry_config(Self::RECONNECTION_ATTEMPTS); let url_ = url.to_string(); - let _provider_loop = tokio::spawn(async move { + let provider_loop = tokio::spawn(async move { let mut client = None; let mut driver_task = futures::future::Fuse::terminated(); let mut reconnect = tryhard::retry_fn(|| { @@ -114,7 +127,7 @@ impl ClientProvider { Ok(Err(e)) => ("error", Some(eyre::Report::new(e).wrap_err("driver task exited with error"))), Err(e) => ("panic", Some(eyre::Report::new(e).wrap_err("driver task failed"))), }; - let error: Option<&(dyn std::error::Error + 'static)> = err.as_ref().map(|e| e.as_ref()); + let error: Option<&(dyn std::error::Error + 'static)> = err.as_ref().map(AsRef::as_ref); warn!( error, reason, @@ -158,14 +171,14 @@ impl ClientProvider { pending_requests.push(tx); } } - ) + ); } }).instrument(info_span!("client provider loop", url)); - Ok(Self { + Self { client_tx, - _provider_loop, - }) + _provider_loop: provider_loop, + } } async fn get(&self) -> Result { @@ -227,10 +240,10 @@ pub(crate) mod mock { let mut module = RpcModule::new(()); module.register_method("say_hello", |_, _| "lo").unwrap(); let address = server.local_addr().unwrap(); - let _handle = server.start(module); - let pool = start_pool(&format!("ws://{address}")).await.unwrap(); + let handle = server.start(module); + let pool = start_pool(&format!("ws://{address}")).unwrap(); Self { - _handle, + _handle: handle, pool, } } diff --git a/crates/astria-conductor/src/conductor.rs b/crates/astria-conductor/src/conductor.rs index 7e6110bcbc..20ebc2c372 100644 --- a/crates/astria-conductor/src/conductor.rs +++ b/crates/astria-conductor/src/conductor.rs @@ -47,25 +47,25 @@ use crate::{ }; pub struct Conductor { - /// Listens for several unix signals and notifies its subscribers. - signals: SignalReceiver, + /// The data availability reader that is spawned after sync is completed. + /// Constructed if constructed if `disable_finalization = false`. + data_availability_reader: Option, - /// The different long-running tasks that make up the conductor; - tasks: JoinMap<&'static str, eyre::Result<()>>, + /// The object pool of sequencer clients that restarts the websocket connection + /// on failure. + sequencer_client_pool: deadpool::managed::Pool, /// Channels to the long-running tasks to shut them down gracefully shutdown_channels: HashMap<&'static str, oneshot::Sender<()>>, - /// The object pool of sequencer clients that restarts the websocket connection - /// on failure. - sequencer_client_pool: deadpool::managed::Pool, + /// Listens for several unix signals and notifies its subscribers. + signals: SignalReceiver, /// The channel over which the sequencer reader task notifies conductor that sync is completed. sync_done: Fuse>, - /// The data availability reader that is spawned after sync is completed. - /// Constructed if constructed if `disable_finalization = false`. - data_availability_reader: Option, + /// The different long-running tasks that make up the conductor; + tasks: JoinMap<&'static str, eyre::Result<()>>, } impl Conductor { @@ -73,6 +73,12 @@ impl Conductor { const EXECUTOR: &'static str = "executor"; const SEQUENCER: &'static str = "sequencer"; + /// Create a new [`Conductor`] from a [`Config`]. + /// + /// # Errors + /// Returns an error in the following cases if one of its constituent + /// actors could not be spawned (executor, sequencer reader, or data availability reader). + /// This usually happens if the actors failed to connect to their respective endpoints. pub async fn new(cfg: Config) -> eyre::Result { use futures::FutureExt; @@ -105,7 +111,6 @@ impl Conductor { }; let sequencer_client_pool = client_provider::start_pool(&cfg.sequencer_url) - .await .wrap_err("failed to create sequencer client pool")?; // Spawn the sequencer task @@ -165,51 +170,33 @@ impl Conductor { }; Ok(Self { - signals, - tasks, + data_availability_reader, sequencer_client_pool, shutdown_channels, + signals, sync_done, - data_availability_reader, + tasks, }) } - pub async fn run_until_stopped(self) -> eyre::Result<()> { - use futures::future::{ - FusedFuture as _, - FutureExt as _, - }; - - let Self { - signals: - SignalReceiver { - mut reload_rx, - mut stop_rx, - }, - mut tasks, - shutdown_channels, - sequencer_client_pool, - sync_done, - mut data_availability_reader, - } = self; - - let mut sync_done = sync_done.fuse(); + pub async fn run_until_stopped(mut self) { + use futures::future::FusedFuture as _; loop { select! { // FIXME: The bias should only be on the signal channels. The two handlers should have the same bias. biased; - _ = stop_rx.changed() => { + _ = self.signals.stop_rx.changed() => { info!("shutting down conductor"); break; } - _ = reload_rx.changed() => { + _ = self.signals.reload_rx.changed() => { info!("reloading is currently not implemented"); } - res = &mut sync_done, if !sync_done.is_terminated() => { + res = &mut self.sync_done, if !self.sync_done.is_terminated() => { match res { Ok(()) => info!("received sync-complete signal from sequencer reader"), Err(e) => { @@ -217,16 +204,16 @@ impl Conductor { warn!(error, "sync-complete channel failed prematurely"); } } - if let Some(data_availability_reader) = data_availability_reader.take() { + if let Some(data_availability_reader) = self.data_availability_reader.take() { info!("starting data availability reader"); - tasks.spawn( + self.tasks.spawn( Self::DATA_AVAILABILITY, data_availability_reader.run_until_stopped(), ); } } - Some((name, res)) = tasks.join_next() => { + Some((name, res)) = self.tasks.join_next() => { match res { Ok(Ok(())) => error!(task.name = name, "task exited unexpectedly, shutting down"), Ok(Err(e)) => { @@ -242,16 +229,21 @@ impl Conductor { } } + info!("shutting down conductor"); + self.shutdown().await; + } + + async fn shutdown(self) { info!("sending shutdown command to all tasks"); - for (_, channel) in shutdown_channels { + for (_, channel) in self.shutdown_channels { let _ = channel.send(()); } - sequencer_client_pool.close(); + self.sequencer_client_pool.close(); info!("waiting 5 seconds for all tasks to shut down"); // put the tasks into an Rc to make them 'static so they can run on a local set - let mut tasks = Rc::new(tasks); + let mut tasks = Rc::new(self.tasks); let local_set = LocalSet::new(); local_set .run_until(async { @@ -294,7 +286,6 @@ impl Conductor { .shutdown() .await; } - Ok(()) } } diff --git a/crates/astria-conductor/src/config.rs b/crates/astria-conductor/src/config.rs index f06ea6742e..07d868b38c 100644 --- a/crates/astria-conductor/src/config.rs +++ b/crates/astria-conductor/src/config.rs @@ -13,11 +13,11 @@ pub enum CommitLevel { } impl CommitLevel { - pub fn is_soft_only(&self) -> bool { + pub(crate) fn is_soft_only(&self) -> bool { matches!(self, Self::SoftOnly) } - pub fn is_firm_only(&self) -> bool { + pub(crate) fn is_firm_only(&self) -> bool { matches!(self, Self::FirmOnly) } } diff --git a/crates/astria-conductor/src/data_availability.rs b/crates/astria-conductor/src/data_availability.rs index acb9dfd4af..b497adeba5 100644 --- a/crates/astria-conductor/src/data_availability.rs +++ b/crates/astria-conductor/src/data_availability.rs @@ -1,5 +1,10 @@ use std::time::Duration; +use astria_sequencer_types::{ + ChainId, + RawSequencerBlockData, + SequencerBlockData, +}; use celestia_client::{ celestia_types::{ nmt::Namespace, @@ -14,6 +19,10 @@ use color_eyre::eyre::{ self, WrapErr as _, }; +use tendermint::{ + block::Header, + Hash, +}; use tokio::{ select, sync::{ @@ -42,9 +51,40 @@ use crate::{ BlockVerifier, }, executor, - types::SequencerBlockSubset, }; +/// `SequencerBlockSubset` is a subset of a `SequencerBlock` that contains +/// information required for transaction data verification, and the transactions +/// for one specific rollup. +#[derive(Clone, Debug)] +pub(crate) struct SequencerBlockSubset { + pub(crate) block_hash: Hash, + pub(crate) header: Header, + pub(crate) rollup_transactions: Vec>, +} + +impl SequencerBlockSubset { + pub(crate) fn from_sequencer_block_data(data: SequencerBlockData, chain_id: &ChainId) -> Self { + // we don't need to verify the action tree root here, + // as [`SequencerBlockData`] would not be constructable + // if it was invalid + let RawSequencerBlockData { + block_hash, + header, + mut rollup_data, + .. + } = data.into_raw(); + + let rollup_transactions = rollup_data.remove(chain_id).unwrap_or_default(); + + Self { + block_hash, + header, + rollup_transactions, + } + } +} + pub(crate) struct Reader { /// The channel used to send messages to the executor task. executor_tx: executor::Sender, @@ -156,7 +196,7 @@ impl Reader { span.in_scope(|| self.send_sequencer_subsets(res)) .wrap_err("failed sending sequencer subsets to executor")?; } - ) + ); } Ok(()) } @@ -166,7 +206,7 @@ impl Reader { let client = self.celestia_client.clone(); self.get_latest_height = Some(tokio::spawn(async move { Ok(client.header_network_head().await?.header.height) - })) + })); } /// Starts fetching sequencer blobs for each height between `self.current_height` @@ -328,7 +368,7 @@ async fn verify_sequencer_blobs_and_assemble_rollups( namespace: Namespace, ) -> eyre::Result> { // spawn the verification tasks - let mut verification_tasks = verify_all_datas(sequencer_blobs, block_verifier); + let mut verification_tasks = verify_all_datas(sequencer_blobs, &block_verifier); let (assembly_tx, assembly_rx) = mpsc::channel(256); let block_assembler = task::spawn(assemble_blocks(assembly_rx)); @@ -338,11 +378,11 @@ async fn verify_sequencer_blobs_and_assemble_rollups( match verification_result { Err(e) => { let error = &e as &(dyn std::error::Error + 'static); - warn!(%block_hash, error, "task verifying sequencer data retrieved from celestia failed; dropping block") + warn!(%block_hash, error, "task verifying sequencer data retrieved from celestia failed; dropping block"); } Ok(Err(e)) => { let error: &(dyn std::error::Error + 'static) = e.as_ref(); - warn!(%block_hash, error, "task verifying sequencer data retrieved from celestia returned with an error; dropping block") + warn!(%block_hash, error, "task verifying sequencer data retrieved from celestia returned with an error; dropping block"); } Ok(Ok(data)) => { fetch_and_verify_rollups.spawn( @@ -426,7 +466,7 @@ async fn assemble_blocks( ) -> Vec { let mut blocks = Vec::new(); while let Some(subset) = assembly_rx.recv().await { - blocks.push(subset) + blocks.push(subset); } blocks.sort_unstable_by(|a, b| a.header.height.cmp(&b.header.height)); blocks @@ -434,7 +474,7 @@ async fn assemble_blocks( fn verify_all_datas( datas: Vec, - block_verifier: BlockVerifier, + block_verifier: &BlockVerifier, ) -> JoinMap> { let mut verification_tasks = JoinMap::new(); for data in datas { diff --git a/crates/astria-conductor/src/execution_client.rs b/crates/astria-conductor/src/executor/client.rs similarity index 98% rename from crates/astria-conductor/src/execution_client.rs rename to crates/astria-conductor/src/executor/client.rs index 1b1aea0779..ac8ccf9cd1 100644 --- a/crates/astria-conductor/src/execution_client.rs +++ b/crates/astria-conductor/src/executor/client.rs @@ -17,12 +17,12 @@ use proto::generated::execution::v1alpha2::{ }; use tonic::transport::Channel; -use crate::types::ExecutorCommitmentState; +use super::ExecutorCommitmentState; /// Extension trait for the ExecutionServiceClient that makes it easier to interact with the /// execution client. #[async_trait::async_trait] -pub(crate) trait ExecutionClientExt { +pub(super) trait ExecutionClientExt { async fn call_batch_get_blocks( &mut self, identifiers: Vec, diff --git a/crates/astria-conductor/src/executor/mod.rs b/crates/astria-conductor/src/executor/mod.rs index 88f038dc4b..1d088c56fe 100644 --- a/crates/astria-conductor/src/executor/mod.rs +++ b/crates/astria-conductor/src/executor/mod.rs @@ -14,6 +14,7 @@ use prost_types::Timestamp as ProstTimestamp; use proto::generated::execution::v1alpha2::{ execution_service_client::ExecutionServiceClient, Block, + CommitmentState, }; use tendermint::{ Hash, @@ -38,33 +39,59 @@ use tracing::{ warn, }; -use crate::{ - execution_client::ExecutionClientExt, - types::{ - ExecutorCommitmentState, - SequencerBlockSubset, - }, -}; +use crate::data_availability::SequencerBlockSubset; +mod client; #[cfg(test)] mod tests; +use client::ExecutionClientExt as _; + /// The channel for sending commands to the executor task. pub(crate) type Sender = UnboundedSender; /// The channel the executor task uses to listen for commands. pub(crate) type Receiver = UnboundedReceiver; +/// `ExecutorCommitmentState` tracks the firm and soft [`Block`]s from the +/// execution client. This is a utility type to avoid dealing with +/// Options all over the place. +#[derive(Clone, Debug)] +pub(crate) struct ExecutorCommitmentState { + firm: Block, + soft: Block, +} + +impl ExecutorCommitmentState { + /// Creates a new `ExecutorCommitmentState` from a `CommitmentState`. + /// `firm` and `soft` should never be `None` + pub(crate) fn from_execution_client_commitment_state(data: CommitmentState) -> Self { + let firm = data.firm.expect( + "could not convert from CommitmentState to ExecutorCommitmentState. `firm` is None. \ + This should never happen.", + ); + let soft = data.soft.expect( + "could not convert from CommitmentState to ExecutorCommitmentState. `soft` is None. \ + This should never happen.", + ); + + Self { + firm, + soft, + } + } +} + // Given `Time`, convert to protobuf timestamp -fn convert_tendermint_to_prost_timestamp(value: Time) -> Result { +fn convert_tendermint_to_prost_timestamp(value: Time) -> ProstTimestamp { use tendermint_proto::google::protobuf::Timestamp as TendermintTimestamp; let TendermintTimestamp { seconds, nanos, } = value.into(); - Ok(ProstTimestamp { + ProstTimestamp { seconds, nanos, - }) + } } #[derive(Debug)] @@ -163,9 +190,11 @@ impl Executor { biased; shutdown = &mut self.shutdown => { - match shutdown { - Err(e) => warn!(error.message = %e, "shutdown channel return with error; shutting down"), - Ok(()) => info!("received shutdown signal; shutting down"), + if let Err(e) = shutdown { + let error: &(dyn std::error::Error + 'static) = &e; + warn!(error, "shutdown channel return with error; shutting down"); + } else { + info!("received shutdown signal; shutting down"); } break; } @@ -187,17 +216,19 @@ impl Executor { match executed_block_result { Ok(executed_block) => { if let Err(e) = self.update_soft_commitment(executed_block.clone()).await { + let error: &(dyn std::error::Error + 'static) = e.as_ref(); error!( height = height, - error = ?e, + error, "failed to update soft commitment" ); } } Err(e) => { + let error: &(dyn std::error::Error + 'static) = e.as_ref(); error!( height = height, - error = ?e, + error, "failed to execute block" ); } @@ -216,7 +247,7 @@ impl Executor { } } } - ) + ); } Ok(()) } @@ -229,7 +260,7 @@ impl Executor { /// execution block hash. #[instrument(skip(self), fields(sequencer_block_hash = ?block.block_hash, sequencer_block_height = block.header.height.value()))] async fn execute_block(&mut self, block: SequencerBlockSubset) -> Result { - if self.executable_block_height as u64 != block.header.height.value() { + if u64::from(self.executable_block_height) != block.header.height.value() { error!( sequencer_block_height = block.header.height.value(), executable_block_height = self.executable_block_height, @@ -257,8 +288,7 @@ impl Executor { "executing block with given parent block", ); - let timestamp = convert_tendermint_to_prost_timestamp(block.header.time) - .wrap_err("failed parsing str as protobuf timestamp")?; + let timestamp = convert_tendermint_to_prost_timestamp(block.header.time); let tx_count = block.rollup_transactions.len(); let executed_block = self @@ -324,42 +354,39 @@ impl Executor { info!("received a message from data availability without blocks; skipping execution"); return Ok(()); } - for block in blocks.into_iter() { + for block in blocks { let sequencer_block_hash = block.block_hash; let maybe_executed_block = self .sequencer_hash_to_execution_block .get(&sequencer_block_hash) .cloned(); - match maybe_executed_block { - Some(executed_block) => { - // this case means block has already been executed. - self.update_firm_commitment(executed_block) - .await - .wrap_err("executor failed to update firm commitment")?; - // remove the sequencer block hash from the map, as it's been firmly committed - self.sequencer_hash_to_execution_block - .remove(&sequencer_block_hash); - } - None => { - // this means either we didn't receive the block from the sequencer stream - - // try executing the block as it hasn't been executed before - // execute_block will check if our namespace has txs; if so, it'll return the - // resulting execution block hash, otherwise None - let executed_block = self - .execute_block(block) - .await - .wrap_err("failed to execute block")?; - - // when we execute a block received from da, nothing else has been executed on - // top of it, so we set FIRM and SOFT to this executed block - self.update_commitments(executed_block) - .await - .wrap_err("executor failed to update both commitments")?; - // remove the sequencer block hash from the map, as it's been firmly committed - self.sequencer_hash_to_execution_block - .remove(&sequencer_block_hash); - } + if let Some(block) = maybe_executed_block { + // this case means block has already been executed. + self.update_firm_commitment(block) + .await + .wrap_err("executor failed to update firm commitment")?; + // remove the sequencer block hash from the map, as it's been firmly committed + self.sequencer_hash_to_execution_block + .remove(&sequencer_block_hash); + } else { + // this means either we didn't receive the block from the sequencer stream + + // try executing the block as it hasn't been executed before + // execute_block will check if our namespace has txs; if so, it'll return the + // resulting execution block hash, otherwise None + let executed_block = self + .execute_block(block) + .await + .wrap_err("failed to execute block")?; + + // when we execute a block received from da, nothing else has been executed on + // top of it, so we set FIRM and SOFT to this executed block + self.update_commitments(executed_block) + .await + .wrap_err("executor failed to update both commitments")?; + // remove the sequencer block hash from the map, as it's been firmly committed + self.sequencer_hash_to_execution_block + .remove(&sequencer_block_hash); }; } Ok(()) diff --git a/crates/astria-conductor/src/executor/tests.rs b/crates/astria-conductor/src/executor/tests.rs index fc580b5628..6373c868d2 100644 --- a/crates/astria-conductor/src/executor/tests.rs +++ b/crates/astria-conductor/src/executor/tests.rs @@ -141,12 +141,12 @@ struct MockEnvironment { } async fn start_mock() -> MockEnvironment { - let _server = MockExecutionServer::spawn().await; + let server = MockExecutionServer::spawn().await; let chain_id = ChainId::new(b"test".to_vec()).unwrap(); - let server_url = format!("http://{}", _server.local_addr()); + let server_url = format!("http://{}", server.local_addr()); - let (_block_tx, block_rx) = mpsc::unbounded_channel(); - let (_shutdown_tx, shutdown_rx) = oneshot::channel(); + let (block_tx, block_rx) = mpsc::unbounded_channel(); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); let executor = Executor::new( &server_url, chain_id, @@ -158,9 +158,9 @@ async fn start_mock() -> MockEnvironment { .unwrap(); MockEnvironment { - _server, - _block_tx, - _shutdown_tx, + _server: server, + _block_tx: block_tx, + _shutdown_tx: shutdown_tx, executor, } } diff --git a/crates/astria-conductor/src/lib.rs b/crates/astria-conductor/src/lib.rs index cb4629b643..18a3a04ca7 100644 --- a/crates/astria-conductor/src/lib.rs +++ b/crates/astria-conductor/src/lib.rs @@ -13,10 +13,8 @@ pub(crate) mod client_provider; pub mod conductor; pub mod config; pub(crate) mod data_availability; -pub(crate) mod execution_client; pub(crate) mod executor; pub(crate) mod sequencer; -pub(crate) mod types; pub use conductor::Conductor; pub use config::Config; diff --git a/crates/astria-conductor/src/main.rs b/crates/astria-conductor/src/main.rs index 6c97e93e93..0eac7aa761 100644 --- a/crates/astria-conductor/src/main.rs +++ b/crates/astria-conductor/src/main.rs @@ -47,12 +47,7 @@ async fn main() -> ExitCode { Ok(conductor) => conductor, }; - if let Err(e) = conductor.run_until_stopped().await { - let error: &(dyn std::error::Error + 'static) = e.as_ref(); - error!(error, "conductor stopped unexpectedly"); - return ExitCode::FAILURE; - } - + conductor.run_until_stopped().await; info!("conductor stopped"); ExitCode::SUCCESS } diff --git a/crates/astria-conductor/src/sequencer/mod.rs b/crates/astria-conductor/src/sequencer/mod.rs index 969a2a48f8..2f35b77a5b 100644 --- a/crates/astria-conductor/src/sequencer/mod.rs +++ b/crates/astria-conductor/src/sequencer/mod.rs @@ -53,15 +53,15 @@ pub(crate) struct Reader { /// The channel used to send messages to the executor task. executor_tx: executor::Sender, - /// The start height from which to start syncing sequencer blocks. - start_sync_height: u32, - /// The object pool providing clients to the sequencer. pool: Pool, /// The shutdown channel to notify `Reader` to shut down. shutdown: oneshot::Receiver<()>, + /// The start height from which to start syncing sequencer blocks. + start_sync_height: u32, + /// The sync-done channel to notify `Conductor` that `Reader` has finished syncing. sync_done: oneshot::Sender<()>, } @@ -75,10 +75,10 @@ impl Reader { sync_done: oneshot::Sender<()>, ) -> Self { Self { - start_sync_height, executor_tx, pool, shutdown, + start_sync_height, sync_done, } } @@ -139,16 +139,13 @@ impl Reader { 'reader_loop: loop { select! { shutdown = &mut shutdown => { - let ret = match shutdown { - Err(e) => { - let error = &e as &(dyn std::error::Error + 'static); - warn!(error, "shutdown channel closed unexpectedly; shutting down"); - Err(e).wrap_err("shut down channel closed unexpectedly") - } - Ok(()) => { - info!("received shutdown signal; shutting down"); - Ok(()) - } + let ret = if let Err(e) = shutdown { + let error = &e as &(dyn std::error::Error + 'static); + warn!(error, "shutdown channel closed unexpectedly; shutting down"); + Err(e).wrap_err("shut down channel closed unexpectedly") + } else { + info!("received shutdown signal; shutting down"); + Ok(()) }; break 'reader_loop ret; } @@ -213,7 +210,7 @@ impl Reader { } } } - } + }; } } } diff --git a/crates/astria-conductor/src/sequencer/sync.rs b/crates/astria-conductor/src/sequencer/sync.rs index 345d48ac9c..00656ecea9 100644 --- a/crates/astria-conductor/src/sequencer/sync.rs +++ b/crates/astria-conductor/src/sequencer/sync.rs @@ -98,7 +98,7 @@ pub(super) async fn run( info!("sync finished"); break 'sync Ok(()) } - ) + ); } } diff --git a/crates/astria-conductor/src/types.rs b/crates/astria-conductor/src/types.rs deleted file mode 100644 index 92e2a6c302..0000000000 --- a/crates/astria-conductor/src/types.rs +++ /dev/null @@ -1,74 +0,0 @@ -use astria_sequencer_types::{ - ChainId, - RawSequencerBlockData, - SequencerBlockData, -}; -use proto::generated::execution::v1alpha2::{ - Block, - CommitmentState, -}; -use tendermint::{ - block::Header, - Hash, -}; - -/// `SequencerBlockSubset` is a subset of a SequencerBlock that contains -/// information required for transaction data verification, and the transactions -/// for one specific rollup. -#[derive(Clone, Debug)] -pub(crate) struct SequencerBlockSubset { - pub(crate) block_hash: Hash, - pub(crate) header: Header, - pub(crate) rollup_transactions: Vec>, -} - -impl SequencerBlockSubset { - pub(crate) fn from_sequencer_block_data(data: SequencerBlockData, chain_id: &ChainId) -> Self { - // we don't need to verify the action tree root here, - // as [`SequencerBlockData`] would not be constructable - // if it was invalid - let RawSequencerBlockData { - block_hash, - header, - mut rollup_data, - .. - } = data.into_raw(); - - let rollup_transactions = rollup_data.remove(chain_id).unwrap_or_default(); - - Self { - block_hash, - header, - rollup_transactions, - } - } -} - -/// `ExecutorCommitmentState` is a struct that contains the firm and soft [`Block`]s from the -/// execution client. This is a utility type to avoid dealing with Options all over the -/// place. -#[derive(Clone, Debug)] -pub(crate) struct ExecutorCommitmentState { - pub(crate) firm: Block, - pub(crate) soft: Block, -} - -impl ExecutorCommitmentState { - /// Creates a new `ExecutorCommitmentState` from a `CommitmentState`. - /// `firm` and `soft` should never be `None` - pub(crate) fn from_execution_client_commitment_state(data: CommitmentState) -> Self { - let firm = data.firm.expect( - "could not convert from CommitmentState to ExecutorCommitmentState. `firm` is None. \ - This should never happen.", - ); - let soft = data.soft.expect( - "could not convert from CommitmentState to ExecutorCommitmentState. `soft` is None. \ - This should never happen.", - ); - - Self { - firm, - soft, - } - } -}