Skip to content

Commit

Permalink
made attestation controller optional
Browse files Browse the repository at this point in the history
  • Loading branch information
pompon0 committed Oct 28, 2024
1 parent 89eadd3 commit 8ab9f01
Show file tree
Hide file tree
Showing 4 changed files with 304 additions and 105 deletions.
25 changes: 18 additions & 7 deletions core/node/consensus/src/en.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl EN {
)
.await
.wrap("Store::new()")?;
s.spawn_bg(async { Ok(runner.run(ctx).await?) });
s.spawn_bg(async { Ok(runner.run(ctx).await.context("Store::runner()")?) });

// Run the temporary fetcher until the certificates are backfilled.
// Temporary fetcher should be removed once json RPC syncing is fully deprecated.
Expand All @@ -146,14 +146,25 @@ impl EN {
let (block_store, runner) = BlockStore::new(ctx, Box::new(store.clone()))
.await
.wrap("BlockStore::new()")?;
s.spawn_bg(async { Ok(runner.run(ctx).await?) });
s.spawn_bg(async { Ok(runner.run(ctx).await.context("BlockStore::run()")?) });

let attestation = Arc::new(attestation::Controller::new(attester));
s.spawn_bg(self.run_attestation_controller(
ctx,
global_config.clone(),
attestation.clone(),
));
s.spawn_bg({
let global_config = global_config.clone();
let attestation = attestation.clone();
async {
let res = self
.run_attestation_controller(ctx, global_config, attestation)
.await
.wrap("run_attestation_controller()");
// Attestation currently is not critical for the node to function.
// If it fails, we just log the error and continue.
if let Err(err) = res {
tracing::error!("attestation controller failed: {err:#}");
}
Ok(())
}
});

let executor = executor::Executor {
config: config::executor(&cfg, &secrets, &global_config, build_version)?,
Expand Down
201 changes: 105 additions & 96 deletions core/node/consensus/src/mn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub async fn run_main_node(

tracing::debug!(is_attester = attester.is_some(), "main node attester mode");

scope::run!(&ctx, |ctx, s| async {
let res: ctx::Result<()> = scope::run!(&ctx, |ctx, s| async {
if let Some(spec) = &cfg.genesis_spec {
let spec = config::GenesisSpec::parse(spec).context("GenesisSpec::parse()")?;

Expand All @@ -46,7 +46,7 @@ pub async fn run_main_node(
let (store, runner) = Store::new(ctx, pool.clone(), None, None)
.await
.wrap("Store::new()")?;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(async { Ok(runner.run(ctx).await.context("Store::runner()")?) });

let global_config = pool
.connection(ctx)
Expand All @@ -56,25 +56,36 @@ pub async fn run_main_node(
.await
.wrap("global_config()")?
.context("global_config() disappeared")?;
anyhow::ensure!(
global_config.genesis.leader_selection
== validator::LeaderSelectionMode::Sticky(validator_key.public()),
"unsupported leader selection mode - main node has to be the leader"
);
if global_config.genesis.leader_selection
!= validator::LeaderSelectionMode::Sticky(validator_key.public())
{
return Err(anyhow::format_err!(
"unsupported leader selection mode - main node has to be the leader"
)
.into());
}

let (block_store, runner) = BlockStore::new(ctx, Box::new(store.clone()))
.await
.wrap("BlockStore::new()")?;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(async { Ok(runner.run(ctx).await.context("BlockStore::run()")?) });

let attestation = Arc::new(attestation::Controller::new(attester));
s.spawn_bg(run_attestation_controller(
ctx,
&pool,
global_config.clone(),
attestation.clone(),
));

s.spawn_bg({
let global_config = global_config.clone();
let attestation = attestation.clone();
async {
let res = run_attestation_controller(ctx, &pool, global_config, attestation)
.await
.wrap("run_attestation_controller()");
// Attestation currently is not critical for the node to function.
// If it fails, we just log the error and continue.
if let Err(err) = res {
tracing::error!("attestation controller failed: {err:#}");
}
Ok(())
}
});
let executor = executor::Executor {
config: config::executor(&cfg, &secrets, &global_config, None)?,
block_store,
Expand All @@ -87,9 +98,14 @@ pub async fn run_main_node(
};

tracing::info!("running the main node executor");
executor.run(ctx).await
executor.run(ctx).await.context("executor")?;
Ok(())
})
.await
.await;
match res {
Ok(()) | Err(ctx::Error::Canceled(_)) => Ok(()),
Err(ctx::Error::Internal(err)) => Err(err),
}
}

/// Manages attestation state by configuring the
Expand All @@ -100,91 +116,84 @@ async fn run_attestation_controller(
pool: &ConnectionPool,
cfg: consensus_dal::GlobalConfig,
attestation: Arc<attestation::Controller>,
) -> anyhow::Result<()> {
) -> ctx::Result<()> {
const POLL_INTERVAL: time::Duration = time::Duration::seconds(5);
let registry = registry::Registry::new(cfg.genesis, pool.clone()).await;
let registry_addr = cfg.registry_address.map(registry::Address::new);
let mut next = attester::BatchNumber(0);
let res = async {
loop {
// After regenesis it might happen that the batch number for the first block
// is not immediately known (the first block was not produced yet),
// therefore we need to wait for it.
let status = loop {
match pool
.connection(ctx)
.await
.wrap("connection()")?
.attestation_status(ctx)
.await
.wrap("attestation_status()")?
{
Some(status) if status.next_batch_to_attest >= next => break status,
_ => {}
}
ctx.sleep(POLL_INTERVAL).await?;
};
next = status.next_batch_to_attest.next();
tracing::info!(
"waiting for hash of batch {:?}",
status.next_batch_to_attest
);
let info = pool
.wait_for_batch_info(ctx, status.next_batch_to_attest, POLL_INTERVAL)
.await?;
let hash = consensus_dal::batch_hash(&info);
let Some(committee) = registry
.attester_committee_for(ctx, registry_addr, status.next_batch_to_attest)
.await
.wrap("attester_committee_for()")?
else {
tracing::info!("attestation not required");
continue;
};
let committee = Arc::new(committee);
// Persist the derived committee.
pool.connection(ctx)
.await
.wrap("connection")?
.upsert_attester_committee(ctx, status.next_batch_to_attest, &committee)
.await
.wrap("upsert_attester_committee()")?;
tracing::info!(
"attesting batch {:?} with hash {hash:?}",
status.next_batch_to_attest
);
attestation
.start_attestation(Arc::new(attestation::Info {
batch_to_attest: attester::Batch {
hash,
number: status.next_batch_to_attest,
genesis: status.genesis,
},
committee,
}))
.await
.context("start_attestation()")?;
// Main node is the only node which can update the global AttestationStatus,
// therefore we can synchronously wait for the certificate.
let qc = attestation
.wait_for_cert(ctx, status.next_batch_to_attest)
.await?
.context("attestation config has changed unexpectedly")?;
tracing::info!(
"collected certificate for batch {:?}",
status.next_batch_to_attest
);
pool.connection(ctx)
loop {
// After regenesis it might happen that the batch number for the first block
// is not immediately known (the first block was not produced yet),
// therefore we need to wait for it.
let status = loop {
match pool
.connection(ctx)
.await
.wrap("connection()")?
.insert_batch_certificate(ctx, &qc)
.attestation_status(ctx)
.await
.wrap("insert_batch_certificate()")?;
}
}
.await;
match res {
Ok(()) | Err(ctx::Error::Canceled(_)) => Ok(()),
Err(ctx::Error::Internal(err)) => Err(err),
.wrap("attestation_status()")?
{
Some(status) if status.next_batch_to_attest >= next => break status,
_ => {}
}
ctx.sleep(POLL_INTERVAL).await?;
};
next = status.next_batch_to_attest.next();
tracing::info!(
"waiting for hash of batch {:?}",
status.next_batch_to_attest
);
let info = pool
.wait_for_batch_info(ctx, status.next_batch_to_attest, POLL_INTERVAL)
.await?;
let hash = consensus_dal::batch_hash(&info);
let Some(committee) = registry
.attester_committee_for(ctx, registry_addr, status.next_batch_to_attest)
.await
.wrap("attester_committee_for()")?
else {
tracing::info!("attestation not required");
continue;
};
let committee = Arc::new(committee);
// Persist the derived committee.
pool.connection(ctx)
.await
.wrap("connection")?
.upsert_attester_committee(ctx, status.next_batch_to_attest, &committee)
.await
.wrap("upsert_attester_committee()")?;
tracing::info!(
"attesting batch {:?} with hash {hash:?}",
status.next_batch_to_attest
);
attestation
.start_attestation(Arc::new(attestation::Info {
batch_to_attest: attester::Batch {
hash,
number: status.next_batch_to_attest,
genesis: status.genesis,
},
committee,
}))
.await
.context("start_attestation()")?;
// Main node is the only node which can update the global AttestationStatus,
// therefore we can synchronously wait for the certificate.
let qc = attestation
.wait_for_cert(ctx, status.next_batch_to_attest)
.await?
.context("attestation config has changed unexpectedly")?;
tracing::info!(
"collected certificate for batch {:?}",
status.next_batch_to_attest
);
pool.connection(ctx)
.await
.wrap("connection()")?
.insert_batch_certificate(ctx, &qc)
.await
.wrap("insert_batch_certificate()")?;
}
}
Loading

0 comments on commit 8ab9f01

Please sign in to comment.