From e694d21b4cf9605dc587cea244dbfe8de1268123 Mon Sep 17 00:00:00 2001 From: timorleph Date: Wed, 8 Jan 2025 10:15:51 +0100 Subject: [PATCH 1/3] Gather signatures and actually submit scores to chain --- finality-aleph/src/abft/current/mod.rs | 2 +- .../src/abft/current/performance/mod.rs | 2 +- .../src/abft/current/performance/service.rs | 94 +++++++++++++++++-- finality-aleph/src/abft/mod.rs | 2 +- finality-aleph/src/nodes.rs | 15 ++- .../src/party/manager/aggregator.rs | 74 ++++++++++----- finality-aleph/src/party/manager/mod.rs | 39 ++++++-- finality-aleph/src/runtime_api.rs | 77 ++++++++++++--- primitives/src/lib.rs | 5 +- 9 files changed, 252 insertions(+), 58 deletions(-) diff --git a/finality-aleph/src/abft/current/mod.rs b/finality-aleph/src/abft/current/mod.rs index 3544bbd450..d7db69ecf7 100644 --- a/finality-aleph/src/abft/current/mod.rs +++ b/finality-aleph/src/abft/current/mod.rs @@ -9,7 +9,7 @@ mod performance; mod traits; pub use network::NetworkData; -pub use performance::Service as PerformanceService; +pub use performance::{Service as PerformanceService, ServiceIO as PerformanceServiceIO}; pub use crate::aleph_primitives::CURRENT_FINALITY_VERSION as VERSION; use crate::{ diff --git a/finality-aleph/src/abft/current/performance/mod.rs b/finality-aleph/src/abft/current/performance/mod.rs index 5cc4e4175e..23cde654cb 100644 --- a/finality-aleph/src/abft/current/performance/mod.rs +++ b/finality-aleph/src/abft/current/performance/mod.rs @@ -3,6 +3,6 @@ use crate::{data_io::AlephData, Hasher}; mod scorer; mod service; -pub use service::Service; +pub use service::{Service, ServiceIO}; type Batch = Vec, Hasher>>; diff --git a/finality-aleph/src/abft/current/performance/service.rs b/finality-aleph/src/abft/current/performance/service.rs index 06b2db2abb..8fb08ef49d 100644 --- a/finality-aleph/src/abft/current/performance/service.rs +++ b/finality-aleph/src/abft/current/performance/service.rs @@ -1,21 +1,31 @@ +use std::collections::HashMap; + use 
current_aleph_bft::NodeCount; use futures::{ channel::{mpsc, oneshot}, StreamExt, }; use log::{debug, error, warn}; +use parity_scale_codec::Encode; +use sp_runtime::traits::Hash as _; use crate::{ abft::{ current::performance::{scorer::Scorer, Batch}, LOG_TARGET, }, + aleph_primitives::{ + crypto::SignatureSet, AuthoritySignature, Hash, Hashing, RawScore, Score, ScoreNonce, + }, data_io::AlephData, metrics::ScoreMetrics, party::manager::Runnable, - Hasher, UnverifiedHeader, + runtime_api::RuntimeApi, + Hasher, SessionId, UnverifiedHeader, }; +const SCORE_SUBMISSION_PERIOD: usize = 300; + struct FinalizationWrapper where UH: UnverifiedHeader, @@ -59,26 +69,43 @@ where } /// A service computing the performance score of ABFT nodes based on batches of ordered units. -pub struct Service +pub struct Service where UH: UnverifiedHeader, + RA: RuntimeApi, { my_index: usize, + session_id: SessionId, batches_from_abft: mpsc::UnboundedReceiver>, + hashes_for_aggregator: mpsc::UnboundedSender, + signatures_from_aggregator: mpsc::UnboundedReceiver<(Hash, SignatureSet)>, + runtime_api: RA, + pending_scores: HashMap, + nonce: ScoreNonce, scorer: Scorer, metrics: ScoreMetrics, } -impl Service +pub struct ServiceIO { + pub hashes_for_aggregator: mpsc::UnboundedSender, + pub signatures_from_aggregator: + mpsc::UnboundedReceiver<(Hash, SignatureSet)>, +} + +impl Service where UH: UnverifiedHeader, + RA: RuntimeApi, { /// Create a new service, together with a unit finalization handler that should be passed to /// ABFT. It will wrap the provided finalization handler and call it in the background. 
pub fn new( my_index: usize, n_members: usize, + session_id: SessionId, finalization_handler: FH, + io: ServiceIO, + runtime_api: RA, metrics: ScoreMetrics, ) -> ( Self, @@ -87,38 +114,89 @@ where where FH: current_aleph_bft::FinalizationHandler>, { + let ServiceIO { + hashes_for_aggregator, + signatures_from_aggregator, + } = io; let (batches_for_us, batches_from_abft) = mpsc::unbounded(); ( Service { my_index, + session_id, batches_from_abft, + hashes_for_aggregator, + signatures_from_aggregator, + runtime_api, + pending_scores: HashMap::new(), + nonce: 0, scorer: Scorer::new(NodeCount(n_members)), metrics, }, FinalizationWrapper::new(finalization_handler, batches_for_us), ) } + + fn make_score(&mut self, points: RawScore) -> Score { + let result = Score { + session_id: self.session_id.0, + nonce: self.nonce, + points, + }; + self.nonce += 1; + result + } } #[async_trait::async_trait] -impl Runnable for Service +impl Runnable for Service where UH: UnverifiedHeader, + RA: RuntimeApi, { async fn run(mut self, mut exit: oneshot::Receiver<()>) { + let mut batch_counter = 1; loop { tokio::select! { maybe_batch = self.batches_from_abft.next() => { - let score = match maybe_batch { + let points = match maybe_batch { Some(batch) => self.scorer.process_batch(batch), None => { error!(target: LOG_TARGET, "Batches' channel closed, ABFT performance scoring terminating."); break; }, }; - debug!(target: LOG_TARGET, "Received ABFT score: {:?}.", score); - self.metrics.report_score(score[self.my_index]); - // TODO(A0-4339): sometimes submit these scores to the chain. 
+ self.metrics.report_score(points[self.my_index]); + if batch_counter % SCORE_SUBMISSION_PERIOD == 0 { + let score = self.make_score(points); + let score_hash = Hashing::hash_of(&score.encode()); + debug!(target: LOG_TARGET, "Gathering signature under ABFT score: {:?}.", score); + self.pending_scores.insert(score_hash, score); + if let Err(e) = self.hashes_for_aggregator.unbounded_send(score_hash) { + error!(target: LOG_TARGET, "Failed to send score hash to signature aggregation: {}.", e); + break; + } + } + batch_counter += 1; + } + maybe_signed = self.signatures_from_aggregator.next() => { + match maybe_signed { + Some((hash, signature)) => { + match self.pending_scores.remove(&hash) { + Some(score) => { + if let Err(e) = self.runtime_api.submit_abft_score(score, signature) { + warn!(target: LOG_TARGET, "Failed to submit performance score to chain: {}.", e); + } + }, + None => { + warn!(target: LOG_TARGET, "Received multisigned hash for unknown performance score, this shouldn't ever happen."); + }, + } + }, + None => { + error!(target: LOG_TARGET, "Signatures' channel closed, ABFT performance scoring terminating."); + break; + }, + } } _ = &mut exit => { debug!(target: LOG_TARGET, "ABFT performance scoring task received exit signal. 
Terminating."); diff --git a/finality-aleph/src/abft/mod.rs b/finality-aleph/src/abft/mod.rs index 581a96a901..8786ea1f51 100644 --- a/finality-aleph/src/abft/mod.rs +++ b/finality-aleph/src/abft/mod.rs @@ -21,7 +21,7 @@ pub use crypto::Keychain; pub use current::{ create_aleph_config as current_create_aleph_config, run_member as run_current_member, NetworkData as CurrentNetworkData, PerformanceService as CurrentPerformanceService, - VERSION as CURRENT_VERSION, + PerformanceServiceIO as CurrentPerformanceServiceIO, VERSION as CURRENT_VERSION, }; pub use legacy::{ create_aleph_config as legacy_create_aleph_config, run_member as run_legacy_member, diff --git a/finality-aleph/src/nodes.rs b/finality-aleph/src/nodes.rs index 92b06e70b1..eef8361be2 100644 --- a/finality-aleph/src/nodes.rs +++ b/finality-aleph/src/nodes.rs @@ -9,7 +9,7 @@ use primitives::TransactionHash; use rate_limiter::SharedRateLimiter; use sc_client_api::Backend; use sc_keystore::{Keystore, LocalKeystore}; -use sc_transaction_pool_api::TransactionPool; +use sc_transaction_pool_api::{LocalTransactionPool, TransactionPool}; use sp_consensus_aura::AuraApi; use crate::{ @@ -60,7 +60,9 @@ where C: crate::ClientForAleph + Send + Sync + 'static, C::Api: AlephSessionApi + AuraApi, BE: Backend + 'static, - TP: TransactionPool + 'static, + TP: LocalTransactionPool + + TransactionPool + + 'static, { let AlephConfig { authentication_network, @@ -132,8 +134,10 @@ where } }); + let runtime_api = RuntimeApiImpl::new(client.clone(), transaction_pool.clone()); + let map_updater = SessionMapUpdater::new( - AuthorityProviderImpl::new(client.clone(), RuntimeApiImpl::new(client.clone())), + AuthorityProviderImpl::new(client.clone(), runtime_api.clone()), FinalityNotifierImpl::new(client.clone()), session_period, ); @@ -178,7 +182,7 @@ where ); let session_authority_provider = - AuthorityProviderImpl::new(client.clone(), RuntimeApiImpl::new(client.clone())); + AuthorityProviderImpl::new(client.clone(), 
runtime_api.clone()); let verifier = VerifierCache::new( session_info.clone(), SubstrateFinalizationInfo::new(client.clone()), @@ -225,7 +229,7 @@ where ValidatorIndexToAccountIdConverterImpl::new( client.clone(), session_info.clone(), - RuntimeApiImpl::new(client.clone()), + runtime_api.clone(), ), ); @@ -271,6 +275,7 @@ where spawn_handle, connection_manager, keystore, + runtime_api, score_metrics, ), session_info, diff --git a/finality-aleph/src/party/manager/aggregator.rs b/finality-aleph/src/party/manager/aggregator.rs index c277191b7f..f1bacd066f 100644 --- a/finality-aleph/src/party/manager/aggregator.rs +++ b/finality-aleph/src/party/manager/aggregator.rs @@ -12,7 +12,9 @@ use tokio::time; use crate::{ abft::SignatureSet, aggregation::{Aggregator, SignableTypedHash}, - aleph_primitives::BlockHash, + aleph_primitives::{ + crypto::SignatureSet as PrimitivesSignatureSet, AuthoritySignature, BlockHash, Hash, + }, block::{ substrate::{Justification, JustificationTranslator}, Header, HeaderBackend, @@ -23,7 +25,7 @@ use crate::{ network::data::Network, party::{ manager::aggregator::AggregatorVersion::{Current, Legacy}, - AuthoritySubtaskCommon, Task, + AuthoritySubtaskCommon, Task, LOG_TARGET, }, sync::JustificationSubmissions, BlockId, CurrentRmcNetworkData, Keychain, LegacyRmcNetworkData, SessionBoundaries, @@ -34,15 +36,20 @@ use crate::{ pub enum Error { MultisignaturesStreamTerminated, UnableToProcessHash, + UnableToSendSignedPerformance, } impl Display for Error { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + use Error::*; match self { - Error::MultisignaturesStreamTerminated => { - write!(f, "The stream of multisigned hashes has ended.") + MultisignaturesStreamTerminated => { + write!(f, "the stream of multisigned hashes has ended") + } + UnableToProcessHash => write!(f, "error while processing a block hash"), + UnableToSendSignedPerformance => { + write!(f, "failed to send a signed performance hash to the scorer") } - 
Error::UnableToProcessHash => write!(f, "Error while processing a hash."), } } } @@ -55,6 +62,9 @@ where pub blocks_from_interpreter: mpsc::UnboundedReceiver, pub justifications_for_chain: JS, pub justification_translator: JustificationTranslator, + pub performance_from_scorer: mpsc::UnboundedReceiver, + pub signed_performance_for_scorer: + mpsc::UnboundedSender<(Hash, PrimitivesSignatureSet)>, } async fn process_new_block_data( @@ -65,7 +75,7 @@ async fn process_new_block_data( CN: Network, LN: Network, { - trace!(target: "aleph-party", "Received unit {:?} in aggregator.", block); + trace!(target: LOG_TARGET, "Received unit {:?} in aggregator.", block); let hash = block.hash(); metrics.report_block(hash, Checkpoint::Ordered); aggregator @@ -93,12 +103,12 @@ where ) { Ok(justification) => justification, Err(e) => { - error!(target: "aleph-party", "Issue with translating justification from Aggregator to Sync Justification: {}.", e); + error!(target: LOG_TARGET, "Issue with translating justification from Aggregator to Sync Justification: {}.", e); return Err(()); } }; if let Err(e) = justifications_for_chain.submit(justification) { - error!(target: "aleph-party", "Issue with sending justification from Aggregator to JustificationHandler {}.", e); + error!(target: LOG_TARGET, "Issue with sending justification from Aggregator to JustificationHandler {}.", e); return Err(()); } Ok(()) @@ -124,27 +134,31 @@ where blocks_from_interpreter, mut justifications_for_chain, justification_translator, + performance_from_scorer, + signed_performance_for_scorer, } = io; let blocks_from_interpreter = blocks_from_interpreter.take_while(|block| { let block_num = block.number(); async move { if block_num == session_boundaries.last_block() { - debug!(target: "aleph-party", "Aggregator is processing last block in session."); + debug!(target: LOG_TARGET, "Aggregator is processing last block in session."); } block_num <= session_boundaries.last_block() } }); 
pin_mut!(blocks_from_interpreter); + pin_mut!(performance_from_scorer); let mut hash_of_last_block = None; - let mut no_more_blocks = blocks_from_interpreter.is_terminated(); + let mut session_over = blocks_from_interpreter.is_terminated(); + let mut no_more_performance = performance_from_scorer.is_terminated(); let mut status_ticker = time::interval(STATUS_REPORT_INTERVAL); loop { - trace!(target: "aleph-party", "Aggregator Loop started a next iteration"); + trace!(target: LOG_TARGET, "Aggregator Loop started a next iteration"); tokio::select! { - maybe_block = blocks_from_interpreter.next(), if !no_more_blocks => match maybe_block { + maybe_block = blocks_from_interpreter.next(), if !session_over => match maybe_block { Some(block) => { hash_of_last_block = Some(block.hash()); process_new_block_data::( @@ -154,8 +168,19 @@ where ).await; }, None => { - debug!(target: "aleph-party", "Blocks ended in aggregator."); - no_more_blocks = true; + debug!(target: LOG_TARGET, "Blocks ended in aggregator."); + session_over = true; + }, + }, + maybe_performance_hash = performance_from_scorer.next(), if !no_more_performance && !session_over => match maybe_performance_hash { + Some(hash) => { + aggregator + .start_aggregation(SignableTypedHash::Performance(hash)) + .await; + }, + None => { + debug!(target: LOG_TARGET, "Performance hashes ended in aggregator."); + no_more_performance = true; }, }, multisigned_hash = aggregator.next_multisigned_hash() => { @@ -167,23 +192,28 @@ where hash_of_last_block = None; } }, - Performance(_) => unimplemented!("we don't gather multisignatures under performance reports yet"), + Performance(hash) => { + if let Err(e) = signed_performance_for_scorer.unbounded_send((hash, multisignature.into())) { + error!(target: LOG_TARGET, "Issue with sending signed performance hash from Aggregator to Scorer {}.", e); + return Err(Error::UnableToSendSignedPerformance); + } + } } }, _ = status_ticker.tick() => { aggregator.status_report(); }, _ = &mut exit_rx 
=> { - debug!(target: "aleph-party", "Aggregator received exit signal. Terminating."); + debug!(target: LOG_TARGET, "Aggregator received exit signal. Terminating."); break; } } - if hash_of_last_block.is_none() && no_more_blocks { - debug!(target: "aleph-party", "Aggregator processed all provided blocks. Terminating."); + if hash_of_last_block.is_none() && session_over { + debug!(target: LOG_TARGET, "Aggregator processed all provided blocks. Terminating."); break; } } - debug!(target: "aleph-party", "Aggregator finished its work."); + debug!(target: LOG_TARGET, "Aggregator finished its work."); Ok(()) } @@ -220,7 +250,7 @@ where Current(rmc_network) => Aggregator::new_current(&multikeychain, rmc_network), Legacy(rmc_network) => Aggregator::new_legacy(&multikeychain, rmc_network), }; - debug!(target: "aleph-party", "Running the aggregator task for {:?}", session_id); + debug!(target: LOG_TARGET, "Running the aggregator task for {:?}", session_id); let result = run_aggregator( aggregator_io, io, @@ -233,11 +263,11 @@ where let result = match result { Ok(_) => Ok(()), Err(err) => { - error!(target: "aleph-party", "Aggregator exited with error: {err}"); + error!(target: LOG_TARGET, "Aggregator exited with error: {err}"); Err(()) } }; - debug!(target: "aleph-party", "Aggregator task stopped for {:?}", session_id); + debug!(target: LOG_TARGET, "Aggregator task stopped for {:?}", session_id); result } }; diff --git a/finality-aleph/src/party/manager/mod.rs b/finality-aleph/src/party/manager/mod.rs index 2cc552348f..7f48b10501 100644 --- a/finality-aleph/src/party/manager/mod.rs +++ b/finality-aleph/src/party/manager/mod.rs @@ -12,9 +12,11 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; use crate::{ abft::{ current_create_aleph_config, legacy_create_aleph_config, run_current_member, - run_legacy_member, CurrentPerformanceService, SpawnHandle, + run_legacy_member, CurrentPerformanceService, CurrentPerformanceServiceIO, SpawnHandle, + }, + 
aleph_primitives::{ + crypto::SignatureSet, AuthoritySignature, BlockHash, BlockNumber, Hash, KEY_TYPE, }, - aleph_primitives::{BlockHash, BlockNumber, KEY_TYPE}, block::{ substrate::{Justification, JustificationTranslator}, BestBlockSelector, Block, Header, HeaderVerifier, UnverifiedHeader, @@ -34,6 +36,7 @@ use crate::{ backup::ABFTBackup, manager::aggregator::AggregatorVersion, traits::NodeSessionManager, LOG_TARGET, }, + runtime_api::RuntimeApi, sync::JustificationSubmissions, AuthorityId, BlockId, CurrentRmcNetworkData, Keychain, LegacyRmcNetworkData, NodeIndex, ProvideRuntimeApi, SessionBoundaries, SessionBoundaryInfo, SessionId, SessionPeriod, @@ -81,6 +84,9 @@ where session_boundaries: SessionBoundaries, subtask_common: TaskCommon, blocks_for_aggregator: mpsc::UnboundedSender, + performance_for_aggregator: mpsc::UnboundedSender, + signed_performance_from_aggregator: + mpsc::UnboundedReceiver<(Hash, SignatureSet)>, chain_info: SubstrateChainInfoProvider, aggregator_io: aggregator::IO, multikeychain: Keychain, @@ -88,7 +94,7 @@ where backup: ABFTBackup, } -pub struct NodeSessionManagerImpl +pub struct NodeSessionManagerImpl where H: Header, B: Block + BlockT, @@ -101,6 +107,7 @@ where SM: SessionManager> + 'static, JS: JustificationSubmissions + Send + Sync + Clone, V: HeaderVerifier, + RA: RuntimeApi, { client: Arc, header_backend: HB, @@ -115,11 +122,13 @@ where spawn_handle: SpawnHandle, session_manager: SM, keystore: Arc, + runtime_api: RA, score_metrics: ScoreMetrics, _phantom: PhantomData<(B, H)>, } -impl NodeSessionManagerImpl +impl + NodeSessionManagerImpl where H: Header, B: Block + BlockT, @@ -132,6 +141,7 @@ where SM: SessionManager> + 'static, JS: JustificationSubmissions + Send + Sync + Clone, V: HeaderVerifier, + RA: RuntimeApi, { #[allow(clippy::too_many_arguments)] pub fn new( @@ -148,6 +158,7 @@ where spawn_handle: SpawnHandle, session_manager: SM, keystore: Arc, + runtime_api: RA, score_metrics: ScoreMetrics, ) -> Self { Self { @@ -164,6 
+175,7 @@ where spawn_handle, session_manager, keystore, + runtime_api, score_metrics, _phantom: PhantomData, } @@ -258,6 +270,8 @@ where session_boundaries, subtask_common, blocks_for_aggregator, + performance_for_aggregator, + signed_performance_from_aggregator, chain_info, aggregator_io, multikeychain, @@ -281,7 +295,13 @@ where let (abft_performance, abft_batch_handler) = CurrentPerformanceService::new( node_id.into(), n_members, + session_id, ordered_data_interpreter, + CurrentPerformanceServiceIO { + hashes_for_aggregator: performance_for_aggregator, + signatures_from_aggregator: signed_performance_from_aggregator, + }, + self.runtime_api.clone(), self.score_metrics.clone(), ); let consensus_config = @@ -351,10 +371,14 @@ where spawn_handle: self.spawn_handle.clone(), session_id: session_id.0, }; + let (performance_for_aggregator, performance_from_scorer) = mpsc::unbounded(); + let (signed_performance_for_scorer, signed_performance_from_aggregator) = mpsc::unbounded(); let aggregator_io = aggregator::IO { blocks_from_interpreter, justifications_for_chain: self.justifications_for_sync.clone(), justification_translator: self.justification_translator.clone(), + performance_from_scorer, + signed_performance_for_scorer, }; let data_network = match self @@ -383,6 +407,8 @@ where session_boundaries, subtask_common, blocks_for_aggregator, + performance_for_aggregator, + signed_performance_from_aggregator, chain_info, aggregator_io, multikeychain, @@ -436,8 +462,8 @@ where } #[async_trait] -impl NodeSessionManager - for NodeSessionManagerImpl +impl NodeSessionManager + for NodeSessionManagerImpl where H: Header, B: Block + BlockT, @@ -450,6 +476,7 @@ where SM: SessionManager> + 'static, JS: JustificationSubmissions + Send + Sync + Clone, V: HeaderVerifier, + RA: RuntimeApi, { type Error = SM::Error; diff --git a/finality-aleph/src/runtime_api.rs b/finality-aleph/src/runtime_api.rs index b9b0c23639..594088453e 100644 --- a/finality-aleph/src/runtime_api.rs +++ 
b/finality-aleph/src/runtime_api.rs @@ -8,21 +8,31 @@ use frame_support::StorageHasher; use pallet_aleph_runtime_api::AlephSessionApi; use parity_scale_codec::{Decode, DecodeAll, Encode, Error as DecodeError}; use sc_client_api::Backend; +use sc_transaction_pool_api::{LocalTransactionPool, OffchainTransactionPoolFactory}; +use sp_api::ApiExt; use sp_application_crypto::key_types::AURA; use sp_core::twox_128; use sp_runtime::traits::{Block, OpaqueKeys}; use crate::{ - aleph_primitives::{AccountId, AuraId}, + aleph_primitives::{crypto::SignatureSet, AccountId, AuraId, AuthoritySignature, Score}, BlockHash, ClientForAleph, }; /// Trait handling connection between host code and runtime storage pub trait RuntimeApi: Clone + Send + Sync + 'static { type Error: Display; + /// Returns aura authorities for the next session using state from block `at` fn next_aura_authorities(&self, at: BlockHash) -> Result, Self::Error>; + + /// Submits a signed ABFT performance score. + fn submit_abft_score( + &self, + score: Score, + signature: SignatureSet, + ) -> Result<(), Self::Error>; } pub struct RuntimeApiImpl @@ -33,7 +43,8 @@ where BE: Backend + 'static, { client: Arc, - _phantom: PhantomData<(B, BE)>, + transaction_pool_factory: OffchainTransactionPoolFactory, + _phantom: PhantomData, } impl Clone for RuntimeApiImpl @@ -44,7 +55,16 @@ where BE: Backend + 'static, { fn clone(&self) -> Self { - RuntimeApiImpl::new(self.client.clone()) + let RuntimeApiImpl { + client, + transaction_pool_factory, + _phantom, + } = self; + RuntimeApiImpl { + client: client.clone(), + transaction_pool_factory: transaction_pool_factory.clone(), + _phantom: *_phantom, + } } } @@ -55,9 +75,14 @@ where B: Block, BE: Backend + 'static, { - pub fn new(client: Arc) -> Self { + pub fn new + 'static>( + client: Arc, + transaction_pool: TP, + ) -> Self { + let transaction_pool_factory = OffchainTransactionPoolFactory::new(transaction_pool); Self { client, + transaction_pool_factory, _phantom: PhantomData, } } 
@@ -115,22 +140,27 @@ pub enum ApiError { NoStorageMapEntry(String, String), NoStorageValue(String, String), DecodeError(DecodeError), + ScoreSubmissionFailure, + CallFailed, } impl Display for ApiError { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> { + use ApiError::*; match self { - ApiError::StorageAccessFailure => { + StorageAccessFailure => { write!(f, "blockchain error during a storage read attempt") } - ApiError::NoStorage => write!(f, "no storage found"), - ApiError::NoStorageMapEntry(pallet, item) => { + NoStorage => write!(f, "no storage found"), + NoStorageMapEntry(pallet, item) => { write!(f, "storage map element not found under {}{}", pallet, item) } - ApiError::NoStorageValue(pallet, item) => { + NoStorageValue(pallet, item) => { write!(f, "storage value not found under {}{}", pallet, item) } - ApiError::DecodeError(error) => write!(f, "decode error: {:?}", error), + DecodeError(error) => write!(f, "decode error: {:?}", error), + ScoreSubmissionFailure => write!(f, "failed to submit ABFT score"), + CallFailed => write!(f, "a call to the runtime failed"), } } } @@ -160,6 +190,26 @@ where .filter_map(|(account_id, keys)| keys.get(AURA).map(|key| (account_id, key))) .collect()) } + + fn submit_abft_score( + &self, + score: Score, + signature: SignatureSet, + ) -> Result<(), Self::Error> { + // Use top finalized as base for this submission. 
+ let block_hash = self.client.info().finalized_hash; + let mut runtime_api = self.client.runtime_api(); + runtime_api.register_extension( + self.transaction_pool_factory + .offchain_transaction_pool(block_hash), + ); + + match runtime_api.submit_abft_score(block_hash, score, signature) { + Ok(Some(())) => Ok(()), + Ok(None) => Err(ApiError::ScoreSubmissionFailure), + Err(_) => Err(ApiError::CallFailed), + } + } } #[cfg(test)] @@ -172,6 +222,7 @@ mod test { use frame_support::Twox64Concat; use parity_scale_codec::Encode; use primitives::Hash; + use sc_transaction_pool_api::RejectAllTxPool; use sp_runtime::Storage; use substrate_test_client::ClientExt; @@ -206,7 +257,7 @@ mod test { *client_builder.genesis_init_mut().extra_storage() = storage; let client = Arc::new(client_builder.build()); let genesis_hash = client.genesis_hash(); - let runtime_api = RuntimeApiImpl::new(client); + let runtime_api = RuntimeApiImpl::new(client, RejectAllTxPool::default()); let map_value1 = runtime_api.read_storage_map::( "Pallet", @@ -245,7 +296,7 @@ mod test { *client_builder.genesis_init_mut().extra_storage() = storage; let client = Arc::new(client_builder.build()); let genesis_hash = client.genesis_hash(); - let runtime_api = RuntimeApiImpl::new(client); + let runtime_api = RuntimeApiImpl::new(client, RejectAllTxPool::default()); let result1 = runtime_api.read_storage_map::( "Pallet", @@ -290,7 +341,7 @@ mod test { *client_builder.genesis_init_mut().extra_storage() = storage; let client = Arc::new(client_builder.build()); let genesis_hash = client.genesis_hash(); - let runtime_api = RuntimeApiImpl::new(client); + let runtime_api = RuntimeApiImpl::new(client, RejectAllTxPool::default()); // parameterize function with String instead of u32 let result1 = runtime_api.read_storage_map::( @@ -327,7 +378,7 @@ mod test { let mut client_builder = TestClientBuilder::new(); *client_builder.genesis_init_mut().extra_storage() = storage; let client = Arc::new(client_builder.build()); - let 
runtime_api = RuntimeApiImpl::new(client); + let runtime_api = RuntimeApiImpl::new(client, RejectAllTxPool::default()); let result1 = runtime_api.read_storage_map::( "Pallet", diff --git a/primitives/src/lib.rs b/primitives/src/lib.rs index 59a0627d75..eb42aedcd9 100644 --- a/primitives/src/lib.rs +++ b/primitives/src/lib.rs @@ -65,6 +65,9 @@ pub type AccountId = ::AccountId; /// never know... pub type AccountIndex = u32; +/// The hashing algorithm we use for everything. +pub type Hashing = BlakeTwo256; + /// A hash of some data used by the chain. pub type Hash = sp_core::H256; @@ -75,7 +78,7 @@ pub type Nonce = u32; pub type Balance = u128; /// Header type. -pub type Header = generic::Header; +pub type Header = generic::Header; /// Block type. pub type Block = generic::Block; From 0edd9113093c17b45867e1d5b0330dc83d198012 Mon Sep 17 00:00:00 2001 From: timorleph Date: Fri, 10 Jan 2025 10:07:16 +0100 Subject: [PATCH 2/3] Fix initial score nonce --- finality-aleph/src/abft/current/performance/service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/finality-aleph/src/abft/current/performance/service.rs b/finality-aleph/src/abft/current/performance/service.rs index 8fb08ef49d..1dedcb4713 100644 --- a/finality-aleph/src/abft/current/performance/service.rs +++ b/finality-aleph/src/abft/current/performance/service.rs @@ -128,7 +128,7 @@ where signatures_from_aggregator, runtime_api, pending_scores: HashMap::new(), - nonce: 0, + nonce: 1, scorer: Scorer::new(NodeCount(n_members)), metrics, }, From 99102da1edf77281215987d9b8db4ded7462f7ec Mon Sep 17 00:00:00 2001 From: timorleph Date: Fri, 10 Jan 2025 10:21:19 +0100 Subject: [PATCH 3/3] Make run_nodes export metrics properly --- scripts/run_nodes.sh | 3 +++ 1 file changed, 3 insertions(+) diff --git a/scripts/run_nodes.sh b/scripts/run_nodes.sh index 30cd64ec1e..2d5972440a 100755 --- a/scripts/run_nodes.sh +++ b/scripts/run_nodes.sh @@ -44,6 +44,7 @@ 
CHAINSPEC_GENERATOR="target/release/chain-bootstrapper" NODE_P2P_PORT_RANGE_START=30333 NODE_VALIDATOR_PORT_RANGE_START=30343 NODE_RPC_PORT_RANGE_START=9944 +PROMETHEUS_PORT_RANGE_START=9615 # ------------------------ argument parsing and usage ----------------------- @@ -174,6 +175,7 @@ function run_node() { --name "${node_name}" --rpc-port $((NODE_RPC_PORT_RANGE_START + index)) --port $((NODE_P2P_PORT_RANGE_START + index)) + --prometheus-port $((PROMETHEUS_PORT_RANGE_START + index)) --validator-port "${validator_port}" --node-key-file "${BASE_PATH}/${account_id}/p2p_secret" --backup-path "${BASE_PATH}/${account_id}/backup-stash" @@ -193,6 +195,7 @@ function run_node() { -laleph-data-store=debug -laleph-updater=debug -laleph-metrics=debug + -laleph-abft=debug ) info "Running node ${index}..."