Skip to content

Commit

Permalink
feat(conductor): respect shutdown signals during init (#1080)
Browse files Browse the repository at this point in the history
## Summary
Conductor now respects shutdown signals it receives during init.

## Background
Conductor's task ignored shutdowns while still initializing. This meant
that Conductor would hang for up to 30 seconds.

## Changes
- refactor conductor's constituent long-running tasks to separate
initialization and running
- listen for the shutdown signal in all of conductor's tasks

## Testing
Run conductor with endpoints that hang indefinitely and sending it
SIGTERM. Observe that conductor shuts down quickly.

The main operation of conductor is unaffected on the happy path: all
blackbox tests run to completion.

A proper test for the shutdown logic will be implemented in a follow-up
refactor similar to #889
  • Loading branch information
SuperFluffy authored May 21, 2024
1 parent b83f00f commit 234829f
Show file tree
Hide file tree
Showing 5 changed files with 290 additions and 153 deletions.
14 changes: 10 additions & 4 deletions crates/astria-conductor/src/celestia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,16 @@ pub(crate) struct Reader {

impl Reader {
pub(crate) async fn run_until_stopped(mut self) -> eyre::Result<()> {
let (executor, sequencer_chain_id) = self
.initialize()
.await
.wrap_err("initialization of runtime information failed")?;
let (executor, sequencer_chain_id) = select!(
() = self.shutdown.clone().cancelled_owned() => {
info!("received shutdown signal while waiting for Celestia reader task to initialize");
return Ok(());
}

res = self.initialize() => {
res.wrap_err("initialization of runtime information failed")?
}
);

RunningReader::from_parts(self, executor, sequencer_chain_id)
.wrap_err("failed entering run loop")?
Expand Down
15 changes: 10 additions & 5 deletions crates/astria-conductor/src/executor/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@ impl Builder {
shutdown,
} = self;

let rollup_address = rollup_address
.parse()
.wrap_err("failed to parse rollup address as URI")?;
let client = super::client::Client::connect_lazy(&rollup_address).wrap_err_with(|| {
format!(
"failed to construct execution client for provided rollup address \
`{rollup_address}`"
)
})?;

let mut firm_block_tx = None;
let mut firm_block_rx = None;
Expand All @@ -52,16 +55,18 @@ impl Builder {
let (state_tx, state_rx) = state::channel();

let executor = Executor {
client,

mode,

firm_blocks: firm_block_rx,
soft_blocks: soft_block_rx,

rollup_address,

shutdown,
state: state_tx,
blocks_pending_finalization: HashMap::new(),

max_spread: None,
};
let handle = Handle {
firm_blocks: firm_block_tx,
Expand Down
19 changes: 12 additions & 7 deletions crates/astria-conductor/src/executor/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,28 @@ use astria_eyre::eyre::{
};
use bytes::Bytes;
use pbjson_types::Timestamp;
use tonic::transport::Channel;
use tonic::transport::{
Channel,
Endpoint,
Uri,
};
use tracing::instrument;

/// A newtype wrapper around [`ExecutionServiceClient`] to work with
/// idiomatic types.
#[derive(Clone)]
pub(crate) struct Client {
uri: tonic::transport::Uri,
uri: Uri,
inner: ExecutionServiceClient<Channel>,
}

impl Client {
#[instrument(skip_all, fields(rollup_uri = %uri))]
pub(crate) async fn connect(uri: tonic::transport::Uri) -> eyre::Result<Self> {
let inner = ExecutionServiceClient::connect(uri.clone())
.await
.wrap_err("failed constructing execution service client")?;
pub(crate) fn connect_lazy(uri: &str) -> eyre::Result<Self> {
let uri: Uri = uri
.parse()
.wrap_err("failed to parse provided string as uri")?;
let endpoint = Endpoint::from(uri.clone()).connect_lazy();
let inner = ExecutionServiceClient::new(endpoint);
Ok(Self {
uri,
inner,
Expand Down
127 changes: 72 additions & 55 deletions crates/astria-conductor/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ impl Handle<StateIsInit> {
}

pub(crate) struct Executor {
/// The execution client driving the rollup.
client: Client,

/// The mode under which this executor (and hence conductor) runs.
mode: CommitLevel,

Expand All @@ -231,8 +234,6 @@ pub(crate) struct Executor {
/// Token to listen for Conductor being shut down.
shutdown: CancellationToken,

rollup_address: tonic::transport::Uri,

/// Tracks the status of the execution chain.
state: StateSender,

Expand All @@ -241,32 +242,29 @@ pub(crate) struct Executor {
/// Required to mark firm blocks received from celestia as executed
/// without re-executing on top of the rollup node.
blocks_pending_finalization: HashMap<u32, Block>,

/// The maximum permitted spread between firm and soft blocks.
max_spread: Option<usize>,
}

impl Executor {
#[instrument(skip_all)]
#[instrument(skip_all, err)]
pub(crate) async fn run_until_stopped(mut self) -> eyre::Result<()> {
let client = Client::connect(self.rollup_address.clone())
.await
.wrap_err("failed connecting to rollup node")?;

self.set_initial_node_state(client.clone())
.await
.wrap_err("failed setting initial rollup node state")?;

let max_spread: usize = self.calculate_max_spread();
if let Some(channel) = self.soft_blocks.as_mut() {
channel.set_capacity(max_spread);
}

info!(
max_spread,
"setting capacity of soft blocks channel to maximum permitted firm<>soft commitment \
spread (this has no effect if conductor is set to perform soft-sync only)"
select!(
() = self.shutdown.clone().cancelled_owned() => {
info!(
"received shutdown signal while initializing task; \
aborting intialization and exiting"
);
return Ok(());
}
res = self.init() => {
res.wrap_err("initialization failed")?;
}
);

let reason = loop {
let spread_not_too_large = !self.is_spread_too_large(max_spread);
let spread_not_too_large = !self.is_spread_too_large();
if spread_not_too_large {
if let Some(channel) = self.soft_blocks.as_mut() {
channel.fill_permits();
Expand All @@ -288,7 +286,7 @@ impl Executor {
block.hash = %telemetry::display::base64(&block.block_hash),
"received block from celestia reader",
);
if let Err(error) = self.execute_firm(client.clone(), block).await {
if let Err(error) = self.execute_firm(block).await {
break Err(error).wrap_err("failed executing firm block");
}
}
Expand All @@ -301,7 +299,7 @@ impl Executor {
block.hash = %telemetry::display::base64(&block.block_hash()),
"received block from sequencer reader",
);
if let Err(error) = self.execute_soft(client.clone(), block).await {
if let Err(error) = self.execute_soft(block).await {
break Err(error).wrap_err("failed executing soft block");
}
}
Expand All @@ -322,6 +320,27 @@ impl Executor {
}
}

/// Runs the init logic that needs to happen before [`Executor`] can enter its main loop.
async fn init(&mut self) -> eyre::Result<()> {
self.set_initial_node_state()
.await
.wrap_err("failed setting initial rollup node state")?;

let max_spread: usize = self.calculate_max_spread();
self.max_spread.replace(max_spread);
if let Some(channel) = self.soft_blocks.as_mut() {
channel.set_capacity(max_spread);
info!(
max_spread,
"setting capacity of soft blocks channel to maximum permitted firm<>soft \
commitment spread (this has no effect if conductor is set to perform soft-sync \
only)"
);
}

Ok(())
}

/// Calculates the maximum allowed spread between firm and soft commitments heights.
///
/// The maximum allowed spread is taken as `max_spread = variance * 6`, where `variance`
Expand All @@ -345,7 +364,11 @@ impl Executor {
/// large.
///
/// Always returns `false` if this executor was configured to run without firm commitments.
fn is_spread_too_large(&self, max_spread: usize) -> bool {
///
/// # Panics
///
/// Panics if called before [`Executor::init`] because `max_spread` must be set.
fn is_spread_too_large(&self) -> bool {
if self.firm_blocks.is_none() {
return false;
}
Expand All @@ -356,7 +379,12 @@ impl Executor {
};

let is_too_far_ahead = usize::try_from(next_soft.saturating_sub(next_firm))
.map(|spread| spread >= max_spread)
.map(|spread| {
spread
>= self
.max_spread
.expect("executor must be initalized and this field set")
})
.unwrap_or(false);

if is_too_far_ahead {
Expand All @@ -369,11 +397,7 @@ impl Executor {
block.hash = %telemetry::display::base64(&block.block_hash()),
block.height = block.height().value(),
))]
async fn execute_soft(
&mut self,
client: Client,
block: FilteredSequencerBlock,
) -> eyre::Result<()> {
async fn execute_soft(&mut self, block: FilteredSequencerBlock) -> eyre::Result<()> {
// TODO(https://github.com/astriaorg/astria/issues/624): add retry logic before failing hard.
let executable_block = ExecutableBlock::from_sequencer(block, self.state.rollup_id());

Expand Down Expand Up @@ -410,14 +434,14 @@ impl Executor {
// The parent hash of the next block is the hash of the block at the current head.
let parent_hash = self.state.soft_hash();
let executed_block = self
.execute_block(client.clone(), parent_hash, executable_block)
.execute_block(parent_hash, executable_block)
.await
.wrap_err("failed to execute block")?;

self.does_block_response_fulfill_contract(ExecutionKind::Soft, &executed_block)
.wrap_err("execution API server violated contract")?;
self.update_commitment_state(client.clone(), Update::OnlySoft(executed_block.clone()))
self.update_commitment_state(Update::OnlySoft(executed_block.clone()))
.await
.wrap_err("failed to update soft commitment state")?;

Expand All @@ -431,11 +455,7 @@ impl Executor {
block.hash = %telemetry::display::base64(&block.block_hash),
block.height = block.sequencer_height().value(),
))]
async fn execute_firm(
&mut self,
client: Client,
block: ReconstructedBlock,
) -> eyre::Result<()> {
async fn execute_firm(&mut self, block: ReconstructedBlock) -> eyre::Result<()> {
let executable_block = ExecutableBlock::from_reconstructed(block);
let expected_height = self.state.next_expected_firm_sequencer_height();
let block_height = executable_block.height;
Expand All @@ -459,7 +479,7 @@ impl Executor {
let update = if self.should_execute_firm_block() {
let parent_hash = self.state.firm_hash();
let executed_block = self
.execute_block(client.clone(), parent_hash, executable_block)
.execute_block(parent_hash, executable_block)
.await
.wrap_err("failed to execute block")?;
self.does_block_response_fulfill_contract(ExecutionKind::Firm, &executed_block)
Expand All @@ -477,7 +497,7 @@ impl Executor {
"pending block not found for block number in cache. THIS SHOULD NOT HAPPEN. \
Trying to fetch the already-executed block from the rollup before giving up."
);
match client.clone().get_block(block_number).await {
match self.client.get_block(block_number).await {
Ok(block) => Update::OnlyFirm(block),
Err(error) => {
error!(
Expand All @@ -493,7 +513,7 @@ impl Executor {
}
};

self.update_commitment_state(client.clone(), update)
self.update_commitment_state(update)
.await
.wrap_err("failed to setting both commitment states to executed block")?;
Ok(())
Expand All @@ -511,7 +531,6 @@ impl Executor {
))]
async fn execute_block(
&mut self,
mut client: Client,
parent_hash: Bytes,
block: ExecutableBlock,
) -> eyre::Result<Block> {
Expand All @@ -521,7 +540,8 @@ impl Executor {
..
} = block;

let executed_block = client
let executed_block = self
.client
.execute_block(parent_hash, transactions, timestamp)
.await
.wrap_err("failed to run execute_block RPC")?;
Expand All @@ -536,20 +556,20 @@ impl Executor {
}

#[instrument(skip_all)]
async fn set_initial_node_state(&mut self, client: Client) -> eyre::Result<()> {
async fn set_initial_node_state(&mut self) -> eyre::Result<()> {
let genesis_info = {
let mut client = client.clone();
async move {
client
async {
self.client
.clone()
.get_genesis_info()
.await
.wrap_err("failed getting genesis info")
}
};
let commitment_state = {
let mut client = client.clone();
async move {
client
async {
self.client
.clone()
.get_commitment_state()
.await
.wrap_err("failed getting commitment state")
Expand All @@ -568,11 +588,7 @@ impl Executor {
}

#[instrument(skip_all)]
async fn update_commitment_state(
&mut self,
mut client: Client,
update: Update,
) -> eyre::Result<()> {
async fn update_commitment_state(&mut self, update: Update) -> eyre::Result<()> {
use Update::{
OnlyFirm,
OnlySoft,
Expand All @@ -588,7 +604,8 @@ impl Executor {
.soft(soft)
.build()
.wrap_err("failed constructing commitment state")?;
let new_state = client
let new_state = self
.client
.update_commitment_state(commitment_state)
.await
.wrap_err("failed updating remote commitment state")?;
Expand Down
Loading

0 comments on commit 234829f

Please sign in to comment.