Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A0-4346: Gather signatures and actually submit scores to chain #1907

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion finality-aleph/src/abft/current/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ mod performance;
mod traits;

pub use network::NetworkData;
pub use performance::Service as PerformanceService;
pub use performance::{Service as PerformanceService, ServiceIO as PerformanceServiceIO};

pub use crate::aleph_primitives::CURRENT_FINALITY_VERSION as VERSION;
use crate::{
Expand Down
2 changes: 1 addition & 1 deletion finality-aleph/src/abft/current/performance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ use crate::{data_io::AlephData, Hasher};
mod scorer;
mod service;

pub use service::Service;
pub use service::{Service, ServiceIO};

type Batch<UH> = Vec<current_aleph_bft::OrderedUnit<AlephData<UH>, Hasher>>;
94 changes: 86 additions & 8 deletions finality-aleph/src/abft/current/performance/service.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,31 @@
use std::collections::HashMap;

use current_aleph_bft::NodeCount;
use futures::{
channel::{mpsc, oneshot},
StreamExt,
};
use log::{debug, error, warn};
use parity_scale_codec::Encode;
use sp_runtime::traits::Hash as _;

use crate::{
abft::{
current::performance::{scorer::Scorer, Batch},
LOG_TARGET,
},
aleph_primitives::{
crypto::SignatureSet, AuthoritySignature, Hash, Hashing, RawScore, Score, ScoreNonce,
},
data_io::AlephData,
metrics::ScoreMetrics,
party::manager::Runnable,
Hasher, UnverifiedHeader,
runtime_api::RuntimeApi,
Hasher, SessionId, UnverifiedHeader,
};

const SCORE_SUBMISSION_PERIOD: usize = 300;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this aleph-node version publish some scores actually? I run it locally with below diff included, waited 300s, and nothing happened in the logs, nor in metrics

[14:55] marol-Latitude-5521:aleph-node ((e694d21b) *%) | for port in 9615 9616 9617 9618;   curl localhost:9616/metrics 2>/dev/null | grep
 "my_abft_score{"; end
my_abft_score{chain="a0dnet1"} 0
my_abft_score{chain="a0dnet1"} 0
my_abft_score{chain="a0dnet1"} 0
my_abft_score{chain="a0dnet1"} 0

tmp.diff.txt

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No because the first nonce in every session is 0 and should be 1.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, remember that you have to specify finality version as current.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked properly and it seems to be working as intended now. Still requires bumping finality version to 5, but this is by design.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean bumping in primitives? There is a flag in run_nodes for that now.


struct FinalizationWrapper<UH, FH>
where
UH: UnverifiedHeader,
Expand Down Expand Up @@ -59,26 +69,43 @@ where
}

/// A service computing the performance score of ABFT nodes based on batches of ordered units.
pub struct Service<UH>
pub struct Service<UH, RA>
where
UH: UnverifiedHeader,
RA: RuntimeApi,
{
my_index: usize,
session_id: SessionId,
batches_from_abft: mpsc::UnboundedReceiver<Batch<UH>>,
hashes_for_aggregator: mpsc::UnboundedSender<Hash>,
signatures_from_aggregator: mpsc::UnboundedReceiver<(Hash, SignatureSet<AuthoritySignature>)>,
runtime_api: RA,
pending_scores: HashMap<Hash, Score>,
nonce: ScoreNonce,
scorer: Scorer,
metrics: ScoreMetrics,
}

impl<UH> Service<UH>
pub struct ServiceIO {
pub hashes_for_aggregator: mpsc::UnboundedSender<Hash>,
pub signatures_from_aggregator:
mpsc::UnboundedReceiver<(Hash, SignatureSet<AuthoritySignature>)>,
}

impl<UH, RA> Service<UH, RA>
where
UH: UnverifiedHeader,
RA: RuntimeApi,
{
/// Create a new service, together with a unit finalization handler that should be passed to
/// ABFT. It will wrap the provided finalization handler and call it in the background.
pub fn new<FH>(
my_index: usize,
n_members: usize,
session_id: SessionId,
finalization_handler: FH,
io: ServiceIO,
runtime_api: RA,
metrics: ScoreMetrics,
) -> (
Self,
Expand All @@ -87,38 +114,89 @@ where
where
FH: current_aleph_bft::FinalizationHandler<AlephData<UH>>,
{
let ServiceIO {
hashes_for_aggregator,
signatures_from_aggregator,
} = io;
let (batches_for_us, batches_from_abft) = mpsc::unbounded();
(
Service {
my_index,
session_id,
batches_from_abft,
hashes_for_aggregator,
signatures_from_aggregator,
runtime_api,
pending_scores: HashMap::new(),
nonce: 1,
scorer: Scorer::new(NodeCount(n_members)),
metrics,
},
FinalizationWrapper::new(finalization_handler, batches_for_us),
)
}

fn make_score(&mut self, points: RawScore) -> Score {
let result = Score {
session_id: self.session_id.0,
nonce: self.nonce,
points,
};
self.nonce += 1;
result
}
}

#[async_trait::async_trait]
impl<UH> Runnable for Service<UH>
impl<UH, RA> Runnable for Service<UH, RA>
where
UH: UnverifiedHeader,
RA: RuntimeApi,
{
async fn run(mut self, mut exit: oneshot::Receiver<()>) {
let mut batch_counter = 1;
loop {
tokio::select! {
maybe_batch = self.batches_from_abft.next() => {
let score = match maybe_batch {
let points = match maybe_batch {
Some(batch) => self.scorer.process_batch(batch),
None => {
error!(target: LOG_TARGET, "Batches' channel closed, ABFT performance scoring terminating.");
break;
},
};
debug!(target: LOG_TARGET, "Received ABFT score: {:?}.", score);
self.metrics.report_score(score[self.my_index]);
// TODO(A0-4339): sometimes submit these scores to the chain.
self.metrics.report_score(points[self.my_index]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess self_my_index cannot ever be out of points bounds right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it should always return points of the same length, equal to the total number of nodes. We could try handling failures there here, but I don't think it's worth it.

if batch_counter % SCORE_SUBMISSION_PERIOD == 0 {
let score = self.make_score(points);
let score_hash = Hashing::hash_of(&score.encode());
debug!(target: LOG_TARGET, "Gathering signature under ABFT score: {:?}.", score);
self.pending_scores.insert(score_hash, score);
if let Err(e) = self.hashes_for_aggregator.unbounded_send(score_hash) {
error!(target: LOG_TARGET, "Failed to send score hash to signature aggregation: {}.", e);
break;
}
}
batch_counter += 1;
}
maybe_signed = self.signatures_from_aggregator.next() => {
match maybe_signed {
Some((hash, signature)) => {
match self.pending_scores.remove(&hash) {
Some(score) => {
if let Err(e) = self.runtime_api.submit_abft_score(score, signature) {
warn!(target: LOG_TARGET, "Failed to submit performance score to chain: {}.", e);
}
},
None => {
warn!(target: LOG_TARGET, "Received multisigned hash for unknown performance score, this shouldn't ever happen.");
},
}
},
None => {
error!(target: LOG_TARGET, "Signatures' channel closed, ABFT performance scoring terminating.");
break;
},
}
}
_ = &mut exit => {
debug!(target: LOG_TARGET, "ABFT performance scoring task received exit signal. Terminating.");
Expand Down
2 changes: 1 addition & 1 deletion finality-aleph/src/abft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub use crypto::Keychain;
pub use current::{
create_aleph_config as current_create_aleph_config, run_member as run_current_member,
NetworkData as CurrentNetworkData, PerformanceService as CurrentPerformanceService,
VERSION as CURRENT_VERSION,
PerformanceServiceIO as CurrentPerformanceServiceIO, VERSION as CURRENT_VERSION,
};
pub use legacy::{
create_aleph_config as legacy_create_aleph_config, run_member as run_legacy_member,
Expand Down
15 changes: 10 additions & 5 deletions finality-aleph/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use primitives::TransactionHash;
use rate_limiter::SharedRateLimiter;
use sc_client_api::Backend;
use sc_keystore::{Keystore, LocalKeystore};
use sc_transaction_pool_api::TransactionPool;
use sc_transaction_pool_api::{LocalTransactionPool, TransactionPool};
use sp_consensus_aura::AuraApi;

use crate::{
Expand Down Expand Up @@ -60,7 +60,9 @@ where
C: crate::ClientForAleph<Block, BE> + Send + Sync + 'static,
C::Api: AlephSessionApi<Block> + AuraApi<Block, AuraId>,
BE: Backend<Block> + 'static,
TP: TransactionPool<Block = Block, Hash = TransactionHash> + 'static,
TP: LocalTransactionPool<Block = Block>
+ TransactionPool<Block = Block, Hash = TransactionHash>
+ 'static,
{
let AlephConfig {
authentication_network,
Expand Down Expand Up @@ -132,8 +134,10 @@ where
}
});

let runtime_api = RuntimeApiImpl::new(client.clone(), transaction_pool.clone());

let map_updater = SessionMapUpdater::new(
AuthorityProviderImpl::new(client.clone(), RuntimeApiImpl::new(client.clone())),
AuthorityProviderImpl::new(client.clone(), runtime_api.clone()),
FinalityNotifierImpl::new(client.clone()),
session_period,
);
Expand Down Expand Up @@ -178,7 +182,7 @@ where
);

let session_authority_provider =
AuthorityProviderImpl::new(client.clone(), RuntimeApiImpl::new(client.clone()));
AuthorityProviderImpl::new(client.clone(), runtime_api.clone());
let verifier = VerifierCache::new(
session_info.clone(),
SubstrateFinalizationInfo::new(client.clone()),
Expand Down Expand Up @@ -225,7 +229,7 @@ where
ValidatorIndexToAccountIdConverterImpl::new(
client.clone(),
session_info.clone(),
RuntimeApiImpl::new(client.clone()),
runtime_api.clone(),
),
);

Expand Down Expand Up @@ -271,6 +275,7 @@ where
spawn_handle,
connection_manager,
keystore,
runtime_api,
score_metrics,
),
session_info,
Expand Down
Loading