Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support submit checkpoint in parallel #840

Merged
merged 12 commits into from
Apr 1, 2024
7 changes: 7 additions & 0 deletions ipc/cli/src/commands/checkpoint/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ impl CommandLineHandler for BottomUpRelayer {
parent.clone(),
child.clone(),
Arc::new(RwLock::new(keystore)),
arguments.max_parallelism,
)
.await?;

Expand Down Expand Up @@ -88,4 +89,10 @@ pub(crate) struct BottomUpRelayerArgs {
pub finalization_blocks: Option<u64>,
#[arg(long, help = "The hex encoded address of the submitter")]
pub submitter: Option<String>,
#[arg(
long,
default_value = "4",
help = "The max parallelism for submitting checkpoints"
)]
pub max_parallelism: usize,
}
108 changes: 82 additions & 26 deletions ipc/provider/src/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
use crate::config::Subnet;
use crate::manager::{BottomUpCheckpointRelayer, EthSubnetManager};
use anyhow::{anyhow, Result};
use futures_util::future::try_join_all;
use fvm_shared::address::Address;
use fvm_shared::clock::ChainEpoch;
use ipc_api::checkpoint::{BottomUpCheckpointBundle, QuorumReachedEvent};
use ipc_wallet::{EthKeyAddress, PersistentKeyStore};
use std::cmp::max;
use std::fmt::{Display, Formatter};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::sync::Semaphore;

/// Tracks the config required for bottom up checkpoint submissions
/// parent/child subnet and checkpoint period.
Expand All @@ -26,10 +29,11 @@ pub struct CheckpointConfig {
/// Then it will submit at the next submission height for the new checkpoint.
pub struct BottomUpCheckpointManager<T> {
metadata: CheckpointConfig,
parent_handler: T,
parent_handler: Arc<T>,
child_handler: T,
/// The number of blocks away from the chain head that is considered final
finalization_blocks: ChainEpoch,
submission_semaphore: Arc<Semaphore>,
}

impl<T: BottomUpCheckpointRelayer> BottomUpCheckpointManager<T> {
Expand All @@ -38,6 +42,7 @@ impl<T: BottomUpCheckpointRelayer> BottomUpCheckpointManager<T> {
child: Subnet,
parent_handler: T,
child_handler: T,
max_parallelism: usize,
) -> Result<Self> {
let period = parent_handler
.checkpoint_period(&child.id)
Expand All @@ -49,9 +54,10 @@ impl<T: BottomUpCheckpointRelayer> BottomUpCheckpointManager<T> {
child,
period,
},
parent_handler,
parent_handler: Arc::new(parent_handler),
child_handler,
finalization_blocks: 0,
submission_semaphore: Arc::new(Semaphore::new(max_parallelism)),
})
}

Expand All @@ -66,12 +72,20 @@ impl BottomUpCheckpointManager<EthSubnetManager> {
parent: Subnet,
child: Subnet,
keystore: Arc<RwLock<PersistentKeyStore<EthKeyAddress>>>,
max_parallelism: usize,
) -> Result<Self> {
let parent_handler =
EthSubnetManager::from_subnet_with_wallet_store(&parent, Some(keystore.clone()))?;
let child_handler =
EthSubnetManager::from_subnet_with_wallet_store(&child, Some(keystore))?;
Self::new(parent, child, parent_handler, child_handler).await
Self::new(
parent,
child,
parent_handler,
child_handler,
max_parallelism,
)
.await
}
}

Expand Down Expand Up @@ -106,16 +120,15 @@ impl<T: BottomUpCheckpointRelayer + Send + Sync + 'static> BottomUpCheckpointMan
log::info!("launching {self} for {submitter}");

loop {
if let Err(e) = self.submit_next_epoch(&submitter).await {
if let Err(e) = self.submit_next_epoch(submitter).await {
log::error!("cannot submit checkpoint for submitter: {submitter} due to {e}");
}

tokio::time::sleep(submission_interval).await;
}
}

/// Checks if the relayer has already submitted at the next submission epoch, if not it submits it.
async fn submit_next_epoch(&self, submitter: &Address) -> Result<()> {
async fn submit_next_epoch(&self, submitter: Address) -> Result<()> {
let last_checkpoint_epoch = self
.parent_handler
.last_bottom_up_checkpoint_height(&self.metadata.child.id)
Expand All @@ -137,6 +150,9 @@ impl<T: BottomUpCheckpointRelayer + Send + Sync + 'static> BottomUpCheckpointMan
let start = last_checkpoint_epoch + 1;
log::debug!("start querying quorum reached events from : {start} to {finalized_height}");

let mut count = 0;
let mut all_submit_tasks = vec![];

for h in start..=finalized_height {
let events = self.child_handler.quorum_reached_events(h).await?;
if events.is_empty() {
Expand All @@ -162,30 +178,70 @@ impl<T: BottomUpCheckpointRelayer + Send + Sync + 'static> BottomUpCheckpointMan
.await?;
log::debug!("bottom up bundle: {bundle:?}");

let epoch = self
.parent_handler
.submit_checkpoint(
submitter,
bundle.checkpoint,
bundle.signatures,
bundle.signatories,
)
// We support parallel checkpoint submission using FIFO order with a limited parallelism (controlled by
// the size of submission_semaphore).
// We need to acquire a permit (from a limited permit pool) before submitting a checkpoint.
// We may wait here until a permit is available.
let parent_handler_clone = Arc::clone(&self.parent_handler);
let submission_permit = self
.submission_semaphore
.clone()
.acquire_owned()
.await
.map_err(|e| {
anyhow!(
"cannot submit bottom up checkpoint at height {} due to: {e:}",
event.height
)
})?;

log::info!(
"submitted bottom up checkpoint({}) in parent at height {}",
event.height,
epoch
);
.unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious about the behaviour when the number of checkpoints to submit (say 10) is actually more than number of permits (say 2). So acquire_owned will actually block if 2 checkpoints are being submitted until one of 2 actually finishes? Even if submit_checkpoint fails and the thread crashes, drop(submission_permit) will be executed and no dead lock here right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So acquire_owned will actually block if 2 checkpoints are being submitted until one of 2 actually finishes?

Yes, it will literally block (the for loop just pauses).

Even if submit_checkpoint fails and the thread crashes, drop(submission_permit) will be executed and no dead lock here right?

My last version had an issue where the async task can panic before dropping the permit (because the use of unwrap()). It's been fixed here so the permit will be dropped no matter it succeeds or fails when submitting the checkpoint.

all_submit_tasks.push(tokio::task::spawn(async move {
let height = event.height;
if let Err(e) =
Self::submit_checkpoint(parent_handler_clone, submitter, bundle, event)
.await
{
log::error!("Fail to submit checkpoint at height {height}: {e}");
drop(submission_permit);
Err(e)
} else {
drop(submission_permit);
Ok(())
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If all we're doing is log, we can use inspect_err.

}));

count += 1;
log::debug!("This round has asynchronously submitted {count} checkpoints",);
}
}

log::debug!("Waiting for all submissions to finish");
// Return error if any of the submit task failed.
try_join_all(all_submit_tasks).await?;

Ok(())
raulk marked this conversation as resolved.
Show resolved Hide resolved
}

async fn submit_checkpoint(
parent_handler: Arc<T>,
submitter: Address,
bundle: BottomUpCheckpointBundle,
event: QuorumReachedEvent,
) -> Result<(), anyhow::Error> {
let epoch = parent_handler
.submit_checkpoint(
&submitter,
bundle.checkpoint,
bundle.signatures,
bundle.signatories,
)
.await
.map_err(|e| {
anyhow!(
"cannot submit bottom up checkpoint at height {} due to: {e}",
event.height
)
})?;

log::info!(
"submitted bottom up checkpoint({}) in parent at height {}",
event.height,
epoch
Comment on lines +238 to +240
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"submitted bottom up checkpoint({}) in parent at height {}",
event.height,
epoch
height = event.height, epoch, "submitted bottom up checkpoint"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is also from original code.
Your code doesn't compile? height is not recognized in this log::info! macro. I end up putting them in the string.

);
Ok(())
}
}
Loading