Skip to content

Commit

Permalink
Gather signatures and actually submit scores to chain
Browse files Browse the repository at this point in the history
  • Loading branch information
timorleph committed Jan 8, 2025
1 parent 7873f2a commit 478d4f5
Show file tree
Hide file tree
Showing 6 changed files with 239 additions and 54 deletions.
88 changes: 80 additions & 8 deletions finality-aleph/src/abft/current/performance/service.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,31 @@
use std::collections::HashMap;

use current_aleph_bft::NodeCount;
use futures::{
channel::{mpsc, oneshot},
StreamExt,
};
use log::{debug, error, warn};
use parity_scale_codec::Encode;
use sp_runtime::traits::Hash as _;

use crate::{
abft::{
current::performance::{scorer::Scorer, Batch},
LOG_TARGET,
},
aleph_primitives::{
crypto::SignatureSet, AuthoritySignature, Hash, Hashing, RawScore, Score, ScoreNonce,
},
data_io::AlephData,
metrics::ScoreMetrics,
party::manager::Runnable,
Hasher, UnverifiedHeader,
runtime_api::RuntimeApi,
Hasher, SessionId, UnverifiedHeader,
};

const SCORE_SUBMISSION_PERIOD: usize = 300;

struct FinalizationWrapper<UH, FH>
where
UH: UnverifiedHeader,
Expand Down Expand Up @@ -59,26 +69,41 @@ where
}

/// A service computing the performance score of ABFT nodes based on batches of ordered units.
pub struct Service<UH>
pub struct Service<UH, RA>
where
UH: UnverifiedHeader,
RA: RuntimeApi,
{
my_index: usize,
session_id: SessionId,
batches_from_abft: mpsc::UnboundedReceiver<Batch<UH>>,
hashes_for_aggregator: mpsc::UnboundedSender<Hash>,
signatures_from_aggregator: mpsc::UnboundedReceiver<(Hash, SignatureSet<AuthoritySignature>)>,
runtime_api: RA,
pending_scores: HashMap<Hash, Score>,
nonce: ScoreNonce,
scorer: Scorer,
metrics: ScoreMetrics,
}

impl<UH> Service<UH>
impl<UH, RA> Service<UH, RA>
where
UH: UnverifiedHeader,
RA: RuntimeApi,
{
/// Create a new service, together with a unit finalization handler that should be passed to
/// ABFT. It will wrap the provided finalization handler and call it in the background.
pub fn new<FH>(
my_index: usize,
n_members: usize,
session_id: SessionId,
finalization_handler: FH,
hashes_for_aggregator: mpsc::UnboundedSender<Hash>,
signatures_from_aggregator: mpsc::UnboundedReceiver<(
Hash,
SignatureSet<AuthoritySignature>,
)>,
runtime_api: RA,
metrics: ScoreMetrics,
) -> (
Self,
Expand All @@ -91,34 +116,81 @@ where
(
Service {
my_index,
session_id,
batches_from_abft,
hashes_for_aggregator,
signatures_from_aggregator,
runtime_api,
pending_scores: HashMap::new(),
nonce: 0,
scorer: Scorer::new(NodeCount(n_members)),
metrics,
},
FinalizationWrapper::new(finalization_handler, batches_for_us),
)
}

fn make_score(&mut self, points: RawScore) -> Score {
let result = Score {
session_id: self.session_id.0,
nonce: self.nonce,
points,
};
self.nonce += 1;
result
}
}

#[async_trait::async_trait]
impl<UH> Runnable for Service<UH>
impl<UH, RA> Runnable for Service<UH, RA>
where
UH: UnverifiedHeader,
RA: RuntimeApi,
{
async fn run(mut self, mut exit: oneshot::Receiver<()>) {
let mut batch_counter = 1;
loop {
tokio::select! {
maybe_batch = self.batches_from_abft.next() => {
let score = match maybe_batch {
let points = match maybe_batch {
Some(batch) => self.scorer.process_batch(batch),
None => {
error!(target: LOG_TARGET, "Batches' channel closed, ABFT performance scoring terminating.");
break;
},
};
debug!(target: LOG_TARGET, "Received ABFT score: {:?}.", score);
self.metrics.report_score(score[self.my_index]);
// TODO(A0-4339): sometimes submit these scores to the chain.
self.metrics.report_score(points[self.my_index]);
if batch_counter % SCORE_SUBMISSION_PERIOD == 0 {
let score = self.make_score(points);
let score_hash = Hashing::hash_of(&score.encode());
debug!(target: LOG_TARGET, "Gathering signature under ABFT score: {:?}.", score);
self.pending_scores.insert(score_hash, score);
if let Err(e) = self.hashes_for_aggregator.unbounded_send(score_hash) {
error!(target: LOG_TARGET, "Failed to send score hash to signature aggregation: {}.", e);
break;
}
}
batch_counter += 1;
}
maybe_signed = self.signatures_from_aggregator.next() => {
match maybe_signed {
Some((hash, signature)) => {
match self.pending_scores.remove(&hash) {
Some(score) => {
if let Err(e) = self.runtime_api.submit_abft_score(score, signature) {
warn!(target: LOG_TARGET, "Failed to submit performance score to chain: {}.", e);
}
},
None => {
warn!(target: LOG_TARGET, "Received multisigned hash for unknown performance score, this shouldn't ever happen.");
},
}
},
None => {
error!(target: LOG_TARGET, "Signatures' channel closed, ABFT performance scoring terminating.");
break;
},
}
}
_ = &mut exit => {
debug!(target: LOG_TARGET, "ABFT performance scoring task received exit signal. Terminating.");
Expand Down
15 changes: 10 additions & 5 deletions finality-aleph/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use primitives::TransactionHash;
use rate_limiter::SharedRateLimiter;
use sc_client_api::Backend;
use sc_keystore::{Keystore, LocalKeystore};
use sc_transaction_pool_api::TransactionPool;
use sc_transaction_pool_api::{LocalTransactionPool, TransactionPool};
use sp_consensus_aura::AuraApi;

use crate::{
Expand Down Expand Up @@ -60,7 +60,9 @@ where
C: crate::ClientForAleph<Block, BE> + Send + Sync + 'static,
C::Api: AlephSessionApi<Block> + AuraApi<Block, AuraId>,
BE: Backend<Block> + 'static,
TP: TransactionPool<Block = Block, Hash = TransactionHash> + 'static,
TP: LocalTransactionPool<Block = Block>
+ TransactionPool<Block = Block, Hash = TransactionHash>
+ 'static,
{
let AlephConfig {
authentication_network,
Expand Down Expand Up @@ -132,8 +134,10 @@ where
}
});

let runtime_api = RuntimeApiImpl::new(client.clone(), transaction_pool.clone());

let map_updater = SessionMapUpdater::new(
AuthorityProviderImpl::new(client.clone(), RuntimeApiImpl::new(client.clone())),
AuthorityProviderImpl::new(client.clone(), runtime_api.clone()),
FinalityNotifierImpl::new(client.clone()),
session_period,
);
Expand Down Expand Up @@ -178,7 +182,7 @@ where
);

let session_authority_provider =
AuthorityProviderImpl::new(client.clone(), RuntimeApiImpl::new(client.clone()));
AuthorityProviderImpl::new(client.clone(), runtime_api.clone());
let verifier = VerifierCache::new(
session_info.clone(),
SubstrateFinalizationInfo::new(client.clone()),
Expand Down Expand Up @@ -225,7 +229,7 @@ where
ValidatorIndexToAccountIdConverterImpl::new(
client.clone(),
session_info.clone(),
RuntimeApiImpl::new(client.clone()),
runtime_api.clone(),
),
);

Expand Down Expand Up @@ -271,6 +275,7 @@ where
spawn_handle,
connection_manager,
keystore,
runtime_api,
score_metrics,
),
session_info,
Expand Down
Loading

0 comments on commit 478d4f5

Please sign in to comment.