-
Notifications
You must be signed in to change notification settings - Fork 39
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support submit checkpoint in parallel #840
Changes from 9 commits
c8bfcd8
9b4a643
2abd2c0
84d83d7
4cd54de
8137c77
6c9a93e
a46a84b
4752714
143b539
a406d91
207d6e0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -5,13 +5,16 @@ | |||||||||
use crate::config::Subnet; | ||||||||||
use crate::manager::{BottomUpCheckpointRelayer, EthSubnetManager}; | ||||||||||
use anyhow::{anyhow, Result}; | ||||||||||
use futures_util::future::try_join_all; | ||||||||||
use fvm_shared::address::Address; | ||||||||||
use fvm_shared::clock::ChainEpoch; | ||||||||||
use ipc_api::checkpoint::{BottomUpCheckpointBundle, QuorumReachedEvent}; | ||||||||||
use ipc_wallet::{EthKeyAddress, PersistentKeyStore}; | ||||||||||
use std::cmp::max; | ||||||||||
use std::fmt::{Display, Formatter}; | ||||||||||
use std::sync::{Arc, RwLock}; | ||||||||||
use std::time::Duration; | ||||||||||
use tokio::sync::Semaphore; | ||||||||||
|
||||||||||
/// Tracks the config required for bottom up checkpoint submissions | ||||||||||
/// parent/child subnet and checkpoint period. | ||||||||||
|
@@ -26,10 +29,11 @@ pub struct CheckpointConfig { | |||||||||
/// Then it will submit at the next submission height for the new checkpoint. | ||||||||||
pub struct BottomUpCheckpointManager<T> { | ||||||||||
metadata: CheckpointConfig, | ||||||||||
parent_handler: T, | ||||||||||
parent_handler: Arc<T>, | ||||||||||
child_handler: T, | ||||||||||
/// The number of blocks away from the chain head that is considered final | ||||||||||
finalization_blocks: ChainEpoch, | ||||||||||
submission_semaphore: Arc<Semaphore>, | ||||||||||
} | ||||||||||
|
||||||||||
impl<T: BottomUpCheckpointRelayer> BottomUpCheckpointManager<T> { | ||||||||||
|
@@ -38,6 +42,7 @@ impl<T: BottomUpCheckpointRelayer> BottomUpCheckpointManager<T> { | |||||||||
child: Subnet, | ||||||||||
parent_handler: T, | ||||||||||
child_handler: T, | ||||||||||
max_parallel_submission: usize, | ||||||||||
) -> Result<Self> { | ||||||||||
let period = parent_handler | ||||||||||
.checkpoint_period(&child.id) | ||||||||||
|
@@ -49,9 +54,10 @@ impl<T: BottomUpCheckpointRelayer> BottomUpCheckpointManager<T> { | |||||||||
child, | ||||||||||
period, | ||||||||||
}, | ||||||||||
parent_handler, | ||||||||||
parent_handler: Arc::new(parent_handler), | ||||||||||
child_handler, | ||||||||||
finalization_blocks: 0, | ||||||||||
submission_semaphore: Arc::new(Semaphore::new(max_parallel_submission)), | ||||||||||
}) | ||||||||||
} | ||||||||||
|
||||||||||
|
@@ -66,12 +72,20 @@ impl BottomUpCheckpointManager<EthSubnetManager> { | |||||||||
parent: Subnet, | ||||||||||
child: Subnet, | ||||||||||
keystore: Arc<RwLock<PersistentKeyStore<EthKeyAddress>>>, | ||||||||||
max_parallel_submission: usize, | ||||||||||
) -> Result<Self> { | ||||||||||
let parent_handler = | ||||||||||
EthSubnetManager::from_subnet_with_wallet_store(&parent, Some(keystore.clone()))?; | ||||||||||
let child_handler = | ||||||||||
EthSubnetManager::from_subnet_with_wallet_store(&child, Some(keystore))?; | ||||||||||
Self::new(parent, child, parent_handler, child_handler).await | ||||||||||
Self::new( | ||||||||||
parent, | ||||||||||
child, | ||||||||||
parent_handler, | ||||||||||
child_handler, | ||||||||||
max_parallel_submission, | ||||||||||
) | ||||||||||
.await | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
|
@@ -106,16 +120,15 @@ impl<T: BottomUpCheckpointRelayer + Send + Sync + 'static> BottomUpCheckpointMan | |||||||||
log::info!("launching {self} for {submitter}"); | ||||||||||
|
||||||||||
loop { | ||||||||||
if let Err(e) = self.submit_next_epoch(&submitter).await { | ||||||||||
if let Err(e) = self.submit_next_epoch(submitter).await { | ||||||||||
log::error!("cannot submit checkpoint for submitter: {submitter} due to {e}"); | ||||||||||
} | ||||||||||
|
||||||||||
tokio::time::sleep(submission_interval).await; | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
/// Checks if the relayer has already submitted at the next submission epoch, if not it submits it. | ||||||||||
async fn submit_next_epoch(&self, submitter: &Address) -> Result<()> { | ||||||||||
async fn submit_next_epoch(&self, submitter: Address) -> Result<()> { | ||||||||||
let last_checkpoint_epoch = self | ||||||||||
.parent_handler | ||||||||||
.last_bottom_up_checkpoint_height(&self.metadata.child.id) | ||||||||||
|
@@ -137,6 +150,9 @@ impl<T: BottomUpCheckpointRelayer + Send + Sync + 'static> BottomUpCheckpointMan | |||||||||
let start = last_checkpoint_epoch + 1; | ||||||||||
log::debug!("start querying quorum reached events from : {start} to {finalized_height}"); | ||||||||||
|
||||||||||
let mut count = 0; | ||||||||||
let mut all_submit_tasks = vec![]; | ||||||||||
|
||||||||||
for h in start..=finalized_height { | ||||||||||
let events = self.child_handler.quorum_reached_events(h).await?; | ||||||||||
if events.is_empty() { | ||||||||||
|
@@ -162,30 +178,70 @@ impl<T: BottomUpCheckpointRelayer + Send + Sync + 'static> BottomUpCheckpointMan | |||||||||
.await?; | ||||||||||
log::debug!("bottom up bundle: {bundle:?}"); | ||||||||||
|
||||||||||
let epoch = self | ||||||||||
.parent_handler | ||||||||||
.submit_checkpoint( | ||||||||||
submitter, | ||||||||||
bundle.checkpoint, | ||||||||||
bundle.signatures, | ||||||||||
bundle.signatories, | ||||||||||
) | ||||||||||
// We support parallel checkpoint submission using FIFO order with a limited parallelism (controlled by | ||||||||||
// the size of submission_semaphore). | ||||||||||
// We need to acquire a permit (from a limited permit pool) before submitting a checkpoint. | ||||||||||
// We may wait here until a permit is available. | ||||||||||
let parent_handler_clone = Arc::clone(&self.parent_handler); | ||||||||||
let submission_permit = self | ||||||||||
.submission_semaphore | ||||||||||
.clone() | ||||||||||
.acquire_owned() | ||||||||||
.await | ||||||||||
.map_err(|e| { | ||||||||||
anyhow!( | ||||||||||
"cannot submit bottom up checkpoint at height {} due to: {e:}", | ||||||||||
event.height | ||||||||||
) | ||||||||||
})?; | ||||||||||
|
||||||||||
log::info!( | ||||||||||
"submitted bottom up checkpoint({}) in parent at height {}", | ||||||||||
event.height, | ||||||||||
epoch | ||||||||||
); | ||||||||||
.unwrap(); | ||||||||||
all_submit_tasks.push(tokio::task::spawn(async move { | ||||||||||
let height = event.height; | ||||||||||
if let Err(e) = | ||||||||||
Self::submit_checkpoint(parent_handler_clone, submitter, bundle, event) | ||||||||||
.await | ||||||||||
{ | ||||||||||
log::error!("Fail to submit checkpoint at height {height}: {e}"); | ||||||||||
drop(submission_permit); | ||||||||||
Err(e) | ||||||||||
} else { | ||||||||||
drop(submission_permit); | ||||||||||
Ok(()) | ||||||||||
} | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If all we're doing is log, we can use |
||||||||||
})); | ||||||||||
|
||||||||||
count += 1; | ||||||||||
log::debug!("This round has asynchronously submitted {count} checkpoints",); | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
log::debug!("Waiting for all submissions to finish"); | ||||||||||
// Return error if any of the submit task failed. | ||||||||||
try_join_all(all_submit_tasks).await?; | ||||||||||
|
||||||||||
Ok(()) | ||||||||||
raulk marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
} | ||||||||||
|
||||||||||
async fn submit_checkpoint( | ||||||||||
parent_handler: Arc<T>, | ||||||||||
submitter: Address, | ||||||||||
bundle: BottomUpCheckpointBundle, | ||||||||||
event: QuorumReachedEvent, | ||||||||||
) -> Result<(), anyhow::Error> { | ||||||||||
let epoch = parent_handler | ||||||||||
.submit_checkpoint( | ||||||||||
&submitter, | ||||||||||
bundle.checkpoint, | ||||||||||
bundle.signatures, | ||||||||||
bundle.signatories, | ||||||||||
) | ||||||||||
.await | ||||||||||
.map_err(|e| { | ||||||||||
anyhow!( | ||||||||||
"cannot submit bottom up checkpoint at height {} due to: {e}", | ||||||||||
event.height | ||||||||||
) | ||||||||||
})?; | ||||||||||
|
||||||||||
log::info!( | ||||||||||
"submitted bottom up checkpoint({}) in parent at height {}", | ||||||||||
event.height, | ||||||||||
epoch | ||||||||||
Comment on lines
+238
to
+240
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is also from original code. |
||||||||||
); | ||||||||||
Ok(()) | ||||||||||
} | ||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just curious about the behaviour when the number of checkpoints to submit (say 10) is actually more than number of permits (say 2). So
acquire_owned
will actually block if 2 checkpoints are being submitted until one of 2 actually finishes? Even ifsubmit_checkpoint
fails and the thread crashes,drop(submission_permit)
will be executed and no dead lock here right?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it will literally block (the
for
loop just pauses).My last version had an issue where the async task can panic before dropping the permit (because the use of
unwrap()
). It's been fixed here so the permit will be dropped no matter it succeeds or fails when submitting the checkpoint.