diff --git a/Cargo.lock b/Cargo.lock index a96bb680b750..d2b7a47f84c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3730,6 +3730,7 @@ dependencies = [ "sp-timestamp", "sp-tracing 16.0.0", "sp-trie", + "sp-version", "substrate-prometheus-endpoint", "tracing", ] @@ -3784,12 +3785,15 @@ dependencies = [ "parity-scale-codec", "parking_lot 0.12.1", "polkadot-node-primitives", + "polkadot-node-subsystem", "polkadot-parachain-primitives", "polkadot-primitives", "polkadot-test-client", "portpicker", + "rstest", "sc-cli", "sc-client-api", + "sp-api", "sp-blockchain", "sp-consensus", "sp-core", @@ -3797,6 +3801,7 @@ dependencies = [ "sp-keystore", "sp-runtime", "sp-state-machine", + "sp-version", "substrate-test-utils", "tokio", "tracing", @@ -3830,9 +3835,11 @@ dependencies = [ name = "cumulus-client-pov-recovery" version = "0.7.0" dependencies = [ + "assert_matches", "async-trait", "cumulus-primitives-core", "cumulus-relay-chain-interface", + "cumulus-test-client", "cumulus-test-service", "futures", "futures-timer", @@ -3843,12 +3850,18 @@ dependencies = [ "polkadot-primitives", "portpicker", "rand 0.8.5", + "rstest", "sc-cli", "sc-client-api", "sc-consensus", + "sc-utils", + "sp-api", + "sp-blockchain", "sp-consensus", "sp-maybe-compressed-blob", "sp-runtime", + "sp-tracing 16.0.0", + "sp-version", "substrate-test-utils", "tokio", "tracing", @@ -4219,6 +4232,7 @@ dependencies = [ "sp-api", "sp-blockchain", "sp-state-machine", + "sp-version", "thiserror", ] diff --git a/cumulus/client/consensus/common/Cargo.toml b/cumulus/client/consensus/common/Cargo.toml index d369304e2e33..09c2f58d45e4 100644 --- a/cumulus/client/consensus/common/Cargo.toml +++ b/cumulus/client/consensus/common/Cargo.toml @@ -28,6 +28,7 @@ sp-core = { path = "../../../../substrate/primitives/core" } sp-runtime = { path = "../../../../substrate/primitives/runtime" } sp-timestamp = { path = "../../../../substrate/primitives/timestamp" } sp-trie = { path = "../../../../substrate/primitives/trie" } +sp-version = { path = "../../../../substrate/primitives/version" } prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../../substrate/utils/prometheus" } # Polkadot diff --git a/cumulus/client/consensus/common/src/tests.rs b/cumulus/client/consensus/common/src/tests.rs index aca922657072..2a944bc7f9fa 100644 --- a/cumulus/client/consensus/common/src/tests.rs +++ b/cumulus/client/consensus/common/src/tests.rs @@ -38,6 +38,7 @@ use polkadot_primitives::HeadData; use sc_client_api::{Backend as _, UsageProvider}; use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy}; use sp_consensus::{BlockOrigin, BlockStatus}; +use sp_version::RuntimeVersion; use std::{ collections::{BTreeMap, HashMap}, pin::Pin, @@ -153,6 +154,14 @@ impl RelayChainInterface for Relaychain { unimplemented!("Not needed for test") } + async fn candidates_pending_availability( + &self, + _: PHash, + _: ParaId, + ) -> RelayChainResult> { + unimplemented!("Not needed for test") + } + async fn session_index_for_child(&self, _: PHash) -> RelayChainResult { Ok(0) } @@ -247,6 +256,10 @@ impl RelayChainInterface for Relaychain { extrinsics_root: PHash::zero(), })) } + + async fn version(&self, _: PHash) -> RelayChainResult { + unimplemented!("Not needed for test") + } } fn sproof_with_best_parent(client: &Client) -> RelayStateSproofBuilder { diff --git a/cumulus/client/network/Cargo.toml b/cumulus/client/network/Cargo.toml index d4fc75287258..0dd7c4fdb0f6 100644 --- a/cumulus/client/network/Cargo.toml +++ b/cumulus/client/network/Cargo.toml @@ -24,11 +24,14 @@ sp-consensus = { path = "../../../substrate/primitives/consensus/common" } sp-core = { path = "../../../substrate/primitives/core" } sp-runtime = { path = "../../../substrate/primitives/runtime" } sp-state-machine = { path = "../../../substrate/primitives/state-machine" } +sp-api = { path = "../../../substrate/primitives/api" } +sp-version = { path = "../../../substrate/primitives/version" } # Polkadot polkadot-node-primitives = { path = "../../../polkadot/node/primitives" } polkadot-parachain-primitives = { path = "../../../polkadot/parachain" } polkadot-primitives = { path = "../../../polkadot/primitives" } +polkadot-node-subsystem = { path = "../../../polkadot/node/subsystem" } # Cumulus cumulus-relay-chain-interface = { path = "../relay-chain-interface" } @@ -37,6 +40,7 @@ cumulus-relay-chain-interface = { path = "../relay-chain-interface" } portpicker = "0.1.1" tokio = { version = "1.32.0", features = ["macros"] } url = "2.4.0" +rstest = "0.18.2" # Substrate sc-cli = { path = "../../../substrate/client/cli" } diff --git a/cumulus/client/network/src/lib.rs b/cumulus/client/network/src/lib.rs index f442ed5840bd..dab15bba590a 100644 --- a/cumulus/client/network/src/lib.rs +++ b/cumulus/client/network/src/lib.rs @@ -20,6 +20,7 @@ //! that use the relay chain provided consensus. See [`RequireSecondedInBlockAnnounce`] //! and [`WaitToAnnounce`] for more information about this implementation. +use sp_api::RuntimeApiInfo; use sp_consensus::block_validation::{ BlockAnnounceValidator as BlockAnnounceValidatorT, Validation, }; @@ -28,6 +29,7 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; use cumulus_relay_chain_interface::RelayChainInterface; use polkadot_node_primitives::{CollationSecondedSignal, Statement}; +use polkadot_node_subsystem::messages::RuntimeApiRequest; use polkadot_parachain_primitives::primitives::HeadData; use polkadot_primitives::{ CandidateReceipt, CompactStatement, Hash as PHash, Id as ParaId, OccupiedCoreAssumption, @@ -266,18 +268,41 @@ where Ok(para_head) } - /// Get the backed block hash of the given parachain in the relay chain. - async fn backed_block_hash( + /// Get the backed block hashes of the given parachain in the relay chain. + async fn backed_block_hashes( relay_chain_interface: &RCInterface, hash: PHash, para_id: ParaId, - ) -> Result, BoxedError> { - let candidate_receipt = relay_chain_interface - .candidate_pending_availability(hash, para_id) + ) -> Result, BoxedError> { + let runtime_api_version = relay_chain_interface + .version(hash) .await .map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?; + let parachain_host_runtime_api_version = + runtime_api_version + .api_version( + &>::ID, + ) + .unwrap_or_default(); + + // If the relay chain runtime does not support the new runtime API, fallback to the + // deprecated one. + let candidate_receipts = if parachain_host_runtime_api_version < + RuntimeApiRequest::CANDIDATES_PENDING_AVAILABILITY_RUNTIME_REQUIREMENT + { + #[allow(deprecated)] + relay_chain_interface + .candidate_pending_availability(hash, para_id) + .await + .map(|c| c.into_iter().collect::>()) + } else { + relay_chain_interface.candidates_pending_availability(hash, para_id).await + } + .map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?; - Ok(candidate_receipt.map(|cr| cr.descriptor.para_head)) + Ok(candidate_receipts.into_iter().map(|cr| cr.descriptor.para_head)) } /// Handle a block announcement with empty data (no statement) attached to it. @@ -298,15 +323,20 @@ where let best_head = Self::included_block(&relay_chain_interface, relay_chain_best_hash, para_id).await?; let known_best_number = best_head.number(); - let backed_block = || async { - Self::backed_block_hash(&relay_chain_interface, relay_chain_best_hash, para_id).await - }; if best_head == header { tracing::debug!(target: LOG_TARGET, "Announced block matches best block.",); - Ok(Validation::Success { is_new_best: true }) - } else if Some(HeadData(header.encode()).hash()) == backed_block().await? { + return Ok(Validation::Success { is_new_best: true }) + } + + let mut backed_blocks = + Self::backed_block_hashes(&relay_chain_interface, relay_chain_best_hash, para_id) + .await?; + + let head_hash = HeadData(header.encode()).hash(); + + if backed_blocks.any(|block_hash| block_hash == head_hash) { tracing::debug!(target: LOG_TARGET, "Announced block matches latest backed block.",); Ok(Validation::Success { is_new_best: true }) diff --git a/cumulus/client/network/src/tests.rs b/cumulus/client/network/src/tests.rs index 3f5757d5eac1..eb0d7f0e01b3 100644 --- a/cumulus/client/network/src/tests.rs +++ b/cumulus/client/network/src/tests.rs @@ -34,6 +34,7 @@ use polkadot_test_client::{ Client as PClient, ClientBlockImportExt, DefaultTestClientBuilderExt, FullBackend as PBackend, InitPolkadotBlockBuilder, TestClientBuilder, TestClientBuilderExt, }; +use rstest::rstest; use sc_client_api::{Backend, BlockchainEvents}; use sp_blockchain::HeaderBackend; use sp_consensus::BlockOrigin; @@ -42,7 +43,8 @@ use sp_keyring::Sr25519Keyring; use sp_keystore::{testing::MemoryKeystore, Keystore, KeystorePtr}; use sp_runtime::RuntimeAppPublic; use sp_state_machine::StorageValue; -use std::{collections::BTreeMap, time::Duration}; +use sp_version::RuntimeVersion; +use std::{borrow::Cow, collections::BTreeMap, time::Duration}; fn check_error(error: crate::BoxedError, check_error: impl Fn(&BlockAnnounceError) -> bool) { let error = *error @@ -53,6 +55,33 @@ fn check_error(error: crate::BoxedError, check_error: impl Fn(&BlockAnnounceErro } } +fn dummy_candidate() -> CommittedCandidateReceipt { + CommittedCandidateReceipt { + descriptor: CandidateDescriptor { + para_head: polkadot_parachain_primitives::primitives::HeadData( + default_header().encode(), + ) + .hash(), + para_id: 0u32.into(), + relay_parent: PHash::random(), + collator: CollatorPair::generate().0.public(), + persisted_validation_data_hash: PHash::random(), + pov_hash: PHash::random(), + erasure_root: PHash::random(), + signature: sp_core::sr25519::Signature::default().into(), + validation_code_hash: ValidationCodeHash::from(PHash::random()), + }, + commitments: CandidateCommitments { + upward_messages: Default::default(), + horizontal_messages: Default::default(), + new_validation_code: None, + head_data: HeadData(Vec::new()), + processed_downward_messages: 0, + hrmp_watermark: 0, + }, + } +} + #[derive(Clone)] struct DummyRelayChainInterface { data: Arc>, @@ -69,6 +98,8 @@ impl DummyRelayChainInterface { data: Arc::new(Mutex::new(ApiData { validators: vec![Sr25519Keyring::Alice.public().into()], has_pending_availability: false, + runtime_version: + RuntimeApiRequest::CANDIDATES_PENDING_AVAILABILITY_RUNTIME_REQUIREMENT, })), relay_client: Arc::new(builder.build()), relay_backend, @@ -131,36 +162,37 @@ impl RelayChainInterface for DummyRelayChainInterface { _: PHash, _: ParaId, ) -> RelayChainResult> { + if self.data.lock().runtime_version >= + RuntimeApiRequest::CANDIDATES_PENDING_AVAILABILITY_RUNTIME_REQUIREMENT + { + panic!("Should have used candidates_pending_availability instead"); + } + if self.data.lock().has_pending_availability { - Ok(Some(CommittedCandidateReceipt { - descriptor: CandidateDescriptor { - para_head: polkadot_parachain_primitives::primitives::HeadData( - default_header().encode(), - ) - .hash(), - para_id: 0u32.into(), - relay_parent: PHash::random(), - collator: CollatorPair::generate().0.public(), - persisted_validation_data_hash: PHash::random(), - pov_hash: PHash::random(), - erasure_root: PHash::random(), - signature: sp_core::sr25519::Signature::default().into(), - validation_code_hash: ValidationCodeHash::from(PHash::random()), - }, - commitments: CandidateCommitments { - upward_messages: Default::default(), - horizontal_messages: Default::default(), - new_validation_code: None, - head_data: HeadData(Vec::new()), - processed_downward_messages: 0, - hrmp_watermark: 0, - }, - })) + Ok(Some(dummy_candidate())) } else { Ok(None) } } + async fn candidates_pending_availability( + &self, + _: PHash, + _: ParaId, + ) -> RelayChainResult> { + if self.data.lock().runtime_version < + RuntimeApiRequest::CANDIDATES_PENDING_AVAILABILITY_RUNTIME_REQUIREMENT + { + panic!("Should have used candidate_pending_availability instead"); + } + + if self.data.lock().has_pending_availability { + Ok(vec![dummy_candidate()]) + } else { + Ok(vec![]) + } + } + async fn session_index_for_child(&self, _: PHash) -> RelayChainResult { Ok(0) } @@ -264,6 +296,28 @@ impl RelayChainInterface for DummyRelayChainInterface { Ok(header) } + + async fn version(&self, _: PHash) -> RelayChainResult { + let version = self.data.lock().runtime_version; + + let apis = sp_version::create_apis_vec!([( + >::ID, + version + )]) + .into_owned() + .to_vec(); + + Ok(RuntimeVersion { + spec_name: sp_version::create_runtime_str!("test"), + impl_name: sp_version::create_runtime_str!("test"), + authoring_version: 1, + spec_version: 1, + impl_version: 0, + apis: Cow::Owned(apis), + transaction_version: 5, + state_version: 1, + }) + } } fn make_validator_and_api() -> ( @@ -574,11 +628,14 @@ fn relay_parent_not_imported_when_block_announce_is_processed() { /// Ensures that when we receive a block announcement without a statement included, while the block /// is not yet included by the node checking the announcement, but the node is already backed. -#[test] -fn block_announced_without_statement_and_block_only_backed() { +#[rstest] +#[case(RuntimeApiRequest::CANDIDATES_PENDING_AVAILABILITY_RUNTIME_REQUIREMENT)] +#[case(10)] +fn block_announced_without_statement_and_block_only_backed(#[case] runtime_version: u32) { block_on(async move { let (mut validator, api) = make_validator_and_api(); api.data.lock().has_pending_availability = true; + api.data.lock().runtime_version = runtime_version; let header = default_header(); @@ -592,4 +649,5 @@ fn block_announced_without_statement_and_block_only_backed() { struct ApiData { validators: Vec, has_pending_availability: bool, + runtime_version: u32, } diff --git a/cumulus/client/pov-recovery/Cargo.toml b/cumulus/client/pov-recovery/Cargo.toml index 7afe7fae34bd..539802d69386 100644 --- a/cumulus/client/pov-recovery/Cargo.toml +++ b/cumulus/client/pov-recovery/Cargo.toml @@ -22,6 +22,8 @@ sc-consensus = { path = "../../../substrate/client/consensus/common" } sp-consensus = { path = "../../../substrate/primitives/consensus/common" } sp-maybe-compressed-blob = { path = "../../../substrate/primitives/maybe-compressed-blob" } sp-runtime = { path = "../../../substrate/primitives/runtime" } +sp-api = { path = "../../../substrate/primitives/api" } +sp-version = { path = "../../../substrate/primitives/version" } # Polkadot polkadot-node-primitives = { path = "../../../polkadot/node/primitives" } @@ -35,8 +37,14 @@ cumulus-relay-chain-interface = { path = "../relay-chain-interface" } async-trait = "0.1.79" [dev-dependencies] +rstest = "0.18.2" tokio = { version = "1.32.0", features = ["macros"] } portpicker = "0.1.1" +sp-blockchain = { path = "../../../substrate/primitives/blockchain" } +cumulus-test-client = { path = "../../test/client" } +sc-utils = { path = "../../../substrate/client/utils" } +sp-tracing = { path = "../../../substrate/primitives/tracing" } +assert_matches = "1.5" # Cumulus cumulus-test-service = { path = "../../test/service" } diff --git a/cumulus/client/pov-recovery/src/active_candidate_recovery.rs b/cumulus/client/pov-recovery/src/active_candidate_recovery.rs index c41c543f04d1..50de98909ea4 100644 --- a/cumulus/client/pov-recovery/src/active_candidate_recovery.rs +++ b/cumulus/client/pov-recovery/src/active_candidate_recovery.rs @@ -21,7 +21,7 @@ use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage; use futures::{channel::oneshot, stream::FuturesUnordered, Future, FutureExt, StreamExt}; -use std::{collections::HashSet, pin::Pin, sync::Arc}; +use std::{pin::Pin, sync::Arc}; use crate::RecoveryHandle; @@ -32,14 +32,12 @@ pub(crate) struct ActiveCandidateRecovery { /// The recoveries that are currently being executed. recoveries: FuturesUnordered>)> + Send>>>, - /// The block hashes of the candidates currently being recovered. - candidates: HashSet, recovery_handle: Box, } impl ActiveCandidateRecovery { pub fn new(recovery_handle: Box) -> Self { - Self { recoveries: Default::default(), candidates: Default::default(), recovery_handle } + Self { recoveries: Default::default(), recovery_handle } } /// Recover the given `candidate`. @@ -63,8 +61,6 @@ impl ActiveCandidateRecovery { ) .await; - self.candidates.insert(block_hash); - self.recoveries.push( async move { match rx.await { @@ -97,7 +93,6 @@ impl ActiveCandidateRecovery { pub async fn wait_for_recovery(&mut self) -> (Block::Hash, Option>) { loop { if let Some(res) = self.recoveries.next().await { - self.candidates.remove(&res.0); return res } else { futures::pending!() diff --git a/cumulus/client/pov-recovery/src/lib.rs b/cumulus/client/pov-recovery/src/lib.rs index 0ca21749c3eb..6ace18155e87 100644 --- a/cumulus/client/pov-recovery/src/lib.rs +++ b/cumulus/client/pov-recovery/src/lib.rs @@ -48,11 +48,12 @@ use sc_client_api::{BlockBackend, BlockchainEvents, UsageProvider}; use sc_consensus::import_queue::{ImportQueueService, IncomingBlock}; +use sp_api::RuntimeApiInfo; use sp_consensus::{BlockOrigin, BlockStatus, SyncOracle}; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; use polkadot_node_primitives::{PoV, POV_BOMB_LIMIT}; -use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage; +use polkadot_node_subsystem::messages::{AvailabilityRecoveryMessage, RuntimeApiRequest}; use polkadot_overseer::Handle as OverseerHandle; use polkadot_primitives::{ CandidateReceipt, CommittedCandidateReceipt, Id as ParaId, SessionIndex, @@ -75,6 +76,9 @@ use std::{ time::Duration, }; +#[cfg(test)] +mod tests; + mod active_candidate_recovery; use active_candidate_recovery::ActiveCandidateRecovery; @@ -544,7 +548,7 @@ where ) .await { - Ok(pending_candidate_stream) => pending_candidate_stream.fuse(), + Ok(pending_candidates_stream) => pending_candidates_stream.fuse(), Err(err) => { tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve pending candidate stream."); return @@ -554,9 +558,11 @@ where futures::pin_mut!(pending_candidates); loop { select! { - pending_candidate = pending_candidates.next() => { - if let Some((receipt, session_index)) = pending_candidate { - self.handle_pending_candidate(receipt, session_index); + next_pending_candidates = pending_candidates.next() => { + if let Some((candidates, session_index)) = next_pending_candidates { + for candidate in candidates { + self.handle_pending_candidate(candidate, session_index); + } } else { tracing::debug!(target: LOG_TARGET, "Pending candidates stream ended"); return; @@ -615,7 +621,7 @@ async fn pending_candidates( relay_chain_client: impl RelayChainInterface + Clone, para_id: ParaId, sync_service: Arc, -) -> RelayChainResult> { +) -> RelayChainResult, SessionIndex)>> { let import_notification_stream = relay_chain_client.import_notification_stream().await?; let filtered_stream = import_notification_stream.filter_map(move |n| { @@ -632,16 +638,54 @@ async fn pending_candidates( return None } - let pending_availability_result = client_for_closure - .candidate_pending_availability(hash, para_id) + let runtime_api_version = client_for_closure + .version(hash) .await .map_err(|e| { tracing::error!( target: LOG_TARGET, error = ?e, - "Failed to fetch pending candidates.", + "Failed to fetch relay chain runtime version.", ) - }); + }) + .ok()?; + let parachain_host_runtime_api_version = runtime_api_version + .api_version( + &>::ID, + ) + .unwrap_or_default(); + + // If the relay chain runtime does not support the new runtime API, fallback to the + // deprecated one. + let pending_availability_result = if parachain_host_runtime_api_version < + RuntimeApiRequest::CANDIDATES_PENDING_AVAILABILITY_RUNTIME_REQUIREMENT + { + #[allow(deprecated)] + client_for_closure + .candidate_pending_availability(hash, para_id) + .await + .map_err(|e| { + tracing::error!( + target: LOG_TARGET, + error = ?e, + "Failed to fetch pending candidates.", + ) + }) + .map(|candidate| candidate.into_iter().collect::>()) + } else { + client_for_closure.candidates_pending_availability(hash, para_id).await.map_err( + |e| { + tracing::error!( + target: LOG_TARGET, + error = ?e, + "Failed to fetch pending candidates.", + ) + }, + ) + }; + let session_index_result = client_for_closure.session_index_for_child(hash).await.map_err(|e| { tracing::error!( @@ -651,8 +695,8 @@ async fn pending_candidates( ) }); - if let Ok(Some(candidate)) = pending_availability_result { - session_index_result.map(|session_index| (candidate, session_index)).ok() + if let Ok(candidates) = pending_availability_result { + session_index_result.map(|session_index| (candidates, session_index)).ok() } else { None } diff --git a/cumulus/client/pov-recovery/src/tests.rs b/cumulus/client/pov-recovery/src/tests.rs new file mode 100644 index 000000000000..75bf308ef27a --- /dev/null +++ b/cumulus/client/pov-recovery/src/tests.rs @@ -0,0 +1,1404 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use super::*; +use assert_matches::assert_matches; +use codec::{Decode, Encode}; +use cumulus_primitives_core::relay_chain::{BlockId, CandidateCommitments, CandidateDescriptor}; +use cumulus_relay_chain_interface::{ + InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption, PHash, PHeader, + PersistedValidationData, StorageValue, ValidationCodeHash, ValidatorId, +}; +use cumulus_test_client::{ + runtime::{Block, Header}, + Sr25519Keyring, +}; +use futures::{channel::mpsc, SinkExt}; +use polkadot_node_primitives::AvailableData; +use polkadot_node_subsystem::{messages::AvailabilityRecoveryMessage, RecoveryError, TimeoutExt}; +use rstest::rstest; +use sc_client_api::{ + BlockImportNotification, ClientInfo, CompactProof, FinalityNotification, FinalityNotifications, + FinalizeSummary, ImportNotifications, StorageEventStream, StorageKey, +}; +use sc_consensus::import_queue::RuntimeOrigin; +use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender}; +use sp_blockchain::Info; +use sp_runtime::{generic::SignedBlock, Justifications}; +use sp_version::RuntimeVersion; +use std::{ + borrow::Cow, + collections::BTreeMap, + ops::Range, + sync::{Arc, Mutex}, +}; +use tokio::task; + +const GENESIS_HASH: PHash = PHash::zero(); +const TEST_SESSION_INDEX: SessionIndex = 0; + +struct AvailabilityRecoverySubsystemHandle { + tx: mpsc::Sender, +} + +impl AvailabilityRecoverySubsystemHandle { + fn new() -> (Self, mpsc::Receiver) { + let (tx, rx) = mpsc::channel(10); + + (Self { tx }, rx) + } +} + +#[async_trait::async_trait] +impl RecoveryHandle for AvailabilityRecoverySubsystemHandle { + async fn send_recovery_msg( + &mut self, + message: AvailabilityRecoveryMessage, + _origin: &'static str, + ) { + self.tx.send(message).await.expect("Receiver dropped"); + } +} + +struct ParachainClientInner { + import_notifications_rx: Option>>, + finality_notifications_rx: Option>>, + usage_infos: Vec>, + block_statuses: Arc>>, +} + +impl ParachainClientInner { + fn new( + usage_infos: Vec>, + block_statuses: Arc>>, + ) -> ( + Self, + TracingUnboundedSender>, + TracingUnboundedSender>, + ) { + let (import_notifications_tx, import_notifications_rx) = + sc_utils::mpsc::tracing_unbounded("import_notif", 10); + let (finality_notifications_tx, finality_notifications_rx) = + sc_utils::mpsc::tracing_unbounded("finality_notif", 10); + ( + Self { + import_notifications_rx: Some(import_notifications_rx), + finality_notifications_rx: Some(finality_notifications_rx), + usage_infos, + block_statuses, + }, + import_notifications_tx, + finality_notifications_tx, + ) + } +} +struct ParachainClient { + inner: Arc>>, +} + +impl ParachainClient { + fn new( + usage_infos: Vec>, + block_statuses: Arc>>, + ) -> ( + Self, + TracingUnboundedSender>, + TracingUnboundedSender>, + ) { + let (inner, import_notifications_tx, finality_notifications_tx) = + ParachainClientInner::new(usage_infos, block_statuses); + ( + Self { inner: Arc::new(Mutex::new(inner)) }, + import_notifications_tx, + finality_notifications_tx, + ) + } +} + +impl BlockchainEvents for ParachainClient { + fn import_notification_stream(&self) -> ImportNotifications { + self.inner + .lock() + .expect("poisoned lock") + .import_notifications_rx + .take() + .expect("Should only be taken once") + } + + fn every_import_notification_stream(&self) -> ImportNotifications { + unimplemented!() + } + + fn finality_notification_stream(&self) -> FinalityNotifications { + self.inner + .lock() + .expect("poisoned lock") + .finality_notifications_rx + .take() + .expect("Should only be taken once") + } + + fn storage_changes_notification_stream( + &self, + _filter_keys: Option<&[StorageKey]>, + _child_filter_keys: Option<&[(StorageKey, Option>)]>, + ) -> sp_blockchain::Result> { + unimplemented!() + } +} + +impl BlockBackend for ParachainClient { + fn block_body( + &self, + _: Block::Hash, + ) -> sp_blockchain::Result::Extrinsic>>> { + unimplemented!() + } + + fn block(&self, _: Block::Hash) -> sp_blockchain::Result>> { + unimplemented!() + } + + fn block_status(&self, hash: Block::Hash) -> sp_blockchain::Result { + Ok(self + .inner + .lock() + .expect("Poisoned lock") + .block_statuses + .lock() + .expect("Poisoned lock") + .get(&hash) + .cloned() + .unwrap_or(BlockStatus::Unknown)) + } + + fn justifications(&self, _: Block::Hash) -> sp_blockchain::Result> { + unimplemented!() + } + + fn block_hash(&self, _: NumberFor) -> sp_blockchain::Result> { + unimplemented!() + } + + fn indexed_transaction(&self, _: Block::Hash) -> sp_blockchain::Result>> { + unimplemented!() + } + + fn has_indexed_transaction(&self, _: Block::Hash) -> sp_blockchain::Result { + unimplemented!() + } + + fn block_indexed_body(&self, _: Block::Hash) -> sp_blockchain::Result>>> { + unimplemented!() + } + + fn requires_full_sync(&self) -> bool { + unimplemented!() + } +} + +impl UsageProvider for ParachainClient { + fn usage_info(&self) -> ClientInfo { + let infos = &mut self.inner.lock().expect("Poisoned lock").usage_infos; + assert!(!infos.is_empty()); + + if infos.len() == 1 { + infos.last().unwrap().clone() + } else { + infos.remove(0) + } + } +} + +struct ParachainImportQueue { + import_requests_tx: TracingUnboundedSender>>, +} + +impl ParachainImportQueue { + fn new() -> (Self, TracingUnboundedReceiver>>) { + let (import_requests_tx, import_requests_rx) = + sc_utils::mpsc::tracing_unbounded("test_import_req_forwarding", 10); + (Self { import_requests_tx }, import_requests_rx) + } +} + +impl ImportQueueService for ParachainImportQueue { + fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec>) { + assert_matches!(origin, BlockOrigin::ConsensusBroadcast); + self.import_requests_tx.unbounded_send(blocks).unwrap(); + } + + fn import_justifications( + &mut self, + _: RuntimeOrigin, + _: Block::Hash, + _: NumberFor, + _: Justifications, + ) { + unimplemented!() + } +} + +#[derive(Default)] +struct DummySyncOracle { + is_major_syncing: bool, +} + +impl DummySyncOracle { + fn new(is_major_syncing: bool) -> Self { + Self { is_major_syncing } + } +} + +impl SyncOracle for DummySyncOracle { + fn is_major_syncing(&self) -> bool { + self.is_major_syncing + } + + fn is_offline(&self) -> bool { + false + } +} + +#[derive(Clone)] +struct RelaychainInner { + runtime_version: u32, + import_notifications: Vec, + candidates_pending_availability: HashMap>, +} + +#[derive(Clone)] +struct Relaychain { + inner: Arc>, +} + +impl Relaychain { + fn new(relay_chain_blocks: Vec<(PHeader, Vec)>) -> Self { + let (candidates_pending_availability, import_notifications) = relay_chain_blocks + .into_iter() + .map(|(header, receipt)| ((header.hash(), receipt), header)) + .unzip(); + Self { + inner: Arc::new(Mutex::new(RelaychainInner { + import_notifications, + candidates_pending_availability, + // The version that introduced candidates_pending_availability + runtime_version: + RuntimeApiRequest::CANDIDATES_PENDING_AVAILABILITY_RUNTIME_REQUIREMENT, + })), + } + } + + fn set_runtime_version(&self, version: u32) { + self.inner.lock().expect("Poisoned lock").runtime_version = version; + } +} + +#[async_trait::async_trait] +impl RelayChainInterface for Relaychain { + async fn version(&self, _: PHash) -> RelayChainResult { + let version = self.inner.lock().expect("Poisoned lock").runtime_version; + + let apis = sp_version::create_apis_vec!([( + >::ID, + version + )]) + .into_owned() + .to_vec(); + + Ok(RuntimeVersion { + spec_name: sp_version::create_runtime_str!("test"), + impl_name: sp_version::create_runtime_str!("test"), + authoring_version: 1, + spec_version: 1, + impl_version: 0, + apis: Cow::Owned(apis), + transaction_version: 5, + state_version: 1, + }) + } + + async fn validators(&self, _: PHash) -> RelayChainResult> { + unimplemented!("Not needed for test") + } + + async fn best_block_hash(&self) -> RelayChainResult { + unimplemented!("Not needed for test") + } + + async fn finalized_block_hash(&self) -> RelayChainResult { + unimplemented!("Not needed for test") + } + + async fn retrieve_dmq_contents( + &self, + _: ParaId, + _: PHash, + ) -> RelayChainResult> { + unimplemented!("Not needed for test") + } + + async fn retrieve_all_inbound_hrmp_channel_contents( + &self, + _: ParaId, + _: PHash, + ) -> RelayChainResult>> { + unimplemented!("Not needed for test") + } + + async fn persisted_validation_data( + &self, + _: PHash, + _: ParaId, + _: OccupiedCoreAssumption, + ) -> RelayChainResult> { + unimplemented!("Not needed for test") + } + + async fn validation_code_hash( + &self, + _: PHash, + _: ParaId, + _: OccupiedCoreAssumption, + ) -> RelayChainResult> { + unimplemented!("Not needed for test") + } + + async fn candidate_pending_availability( + &self, + hash: PHash, + _: ParaId, + ) -> RelayChainResult> { + if self.inner.lock().expect("Poisoned lock").runtime_version >= + RuntimeApiRequest::CANDIDATES_PENDING_AVAILABILITY_RUNTIME_REQUIREMENT + { + panic!("Should have used candidates_pending_availability instead"); + } + + Ok(self + .inner + .lock() + .expect("Poisoned lock") + .candidates_pending_availability + .remove(&hash) + .map(|mut c| { + assert_eq!(c.len(), 1); + c.pop().unwrap() + })) + } + + async fn candidates_pending_availability( + &self, + hash: PHash, + _: ParaId, + ) -> RelayChainResult> { + if self.inner.lock().expect("Poisoned lock").runtime_version < + RuntimeApiRequest::CANDIDATES_PENDING_AVAILABILITY_RUNTIME_REQUIREMENT + { + panic!("Should have used candidate_pending_availability instead"); + } + + Ok(self + .inner + .lock() + .expect("Poisoned lock") + .candidates_pending_availability + .remove(&hash) + .expect("Not found")) + } + + async fn session_index_for_child(&self, _: PHash) -> RelayChainResult { + Ok(TEST_SESSION_INDEX) + } + + async fn import_notification_stream( + &self, + ) -> RelayChainResult + Send>>> { + Ok(Box::pin( + futures::stream::iter(std::mem::take( + &mut self.inner.lock().expect("Poisoned lock").import_notifications, + )) + .chain(futures::stream::pending()), + )) + } + + async fn finality_notification_stream( + &self, + ) -> RelayChainResult + Send>>> { + unimplemented!("Not needed for test") + } + + async fn is_major_syncing(&self) -> RelayChainResult { + unimplemented!("Not needed for test"); + } + + fn overseer_handle(&self) -> RelayChainResult { + unimplemented!("Not needed for test") + } + + async fn get_storage_by_key( + &self, + _: PHash, + _: &[u8], + ) -> RelayChainResult> { + unimplemented!("Not needed for test") + } + + async fn prove_read( + &self, + _: PHash, + _: &Vec>, + ) -> RelayChainResult { + unimplemented!("Not needed for test") + } + + async fn wait_for_block(&self, _: PHash) -> RelayChainResult<()> { + unimplemented!("Not needed for test"); + } + + async fn new_best_notification_stream( + &self, + ) -> RelayChainResult + Send>>> { + unimplemented!("Not needed for test"); + } + + async fn header(&self, _: BlockId) -> RelayChainResult> { + unimplemented!("Not needed for test"); + } +} + +fn make_candidate_chain(candidate_number_range: Range) -> Vec { + let collator = Sr25519Keyring::Ferdie; + let mut latest_parent_hash = GENESIS_HASH; + let mut candidates = vec![]; + + for number in candidate_number_range { + let head_data = Header { + number, + digest: Default::default(), + extrinsics_root: Default::default(), + parent_hash: latest_parent_hash, + state_root: Default::default(), + }; + + latest_parent_hash = head_data.hash(); + + candidates.push(CommittedCandidateReceipt { + descriptor: CandidateDescriptor { + para_id: ParaId::from(1000), + relay_parent: PHash::zero(), + collator: collator.public().into(), + persisted_validation_data_hash: PHash::zero(), + pov_hash: PHash::zero(), + erasure_root: PHash::zero(), + signature: collator.sign(&[0u8; 132]).into(), + para_head: PHash::zero(), + validation_code_hash: PHash::zero().into(), + }, + commitments: CandidateCommitments { + head_data: head_data.encode().into(), + upward_messages: vec![].try_into().expect("empty vec fits within bounds"), + new_validation_code: None, + horizontal_messages: vec![].try_into().expect("empty vec fits within bounds"), + processed_downward_messages: 0, + hrmp_watermark: 0_u32, + }, + }); + } + + candidates +} + +fn dummy_usage_info(finalized_number: u32) -> ClientInfo { + ClientInfo { + chain: Info { + best_hash: PHash::zero(), + best_number: 0, + genesis_hash: PHash::zero(), + finalized_hash: PHash::zero(), + // Only this field is being used. + finalized_number, + finalized_state: None, + number_leaves: 0, + block_gap: None, + }, + usage: None, + } +} + +fn dummy_pvd() -> PersistedValidationData { + PersistedValidationData { + parent_head: vec![].into(), + relay_parent_number: 1, + relay_parent_storage_root: PHash::zero(), + max_pov_size: 100, + } +} + +#[tokio::test] +async fn pending_candidate_height_lower_than_latest_finalized() { + sp_tracing::init_for_tests(); + + for finalized_number in [3, 4, 5] { + let (recovery_subsystem_tx, mut recovery_subsystem_rx) = + AvailabilityRecoverySubsystemHandle::new(); + let recovery_delay_range = + RecoveryDelayRange { min: Duration::from_millis(0), max: Duration::from_millis(10) }; + let (_explicit_recovery_chan_tx, explicit_recovery_chan_rx) = mpsc::channel(10); + let candidates = make_candidate_chain(1..4); + let relay_chain_client = Relaychain::new(vec![( + PHeader { + parent_hash: PHash::from_low_u64_be(0), + number: 1, + state_root: PHash::random(), + extrinsics_root: PHash::random(), + digest: Default::default(), + }, + candidates, + )]); + let (parachain_client, _import_notifications_tx, _finality_notifications_tx) = + ParachainClient::new(vec![dummy_usage_info(finalized_number)], Default::default()); + let (parachain_import_queue, mut import_requests_rx) = ParachainImportQueue::new(); + + // If the latest finalized block has a larger height compared to the pending candidate, the + // new candidate won't be recovered. Candidates have heights is 1, 2 and 3. Latest finalized + // block is 3, 4 or 5. + let pov_recovery = PoVRecovery::::new( + Box::new(recovery_subsystem_tx), + recovery_delay_range, + Arc::new(parachain_client), + Box::new(parachain_import_queue), + relay_chain_client, + ParaId::new(1000), + explicit_recovery_chan_rx, + Arc::new(DummySyncOracle::default()), + ); + + task::spawn(pov_recovery.run()); + + // No recovery message received + assert_matches!( + recovery_subsystem_rx.next().timeout(Duration::from_millis(100)).await, + None + ); + + // No import request received + assert_matches!(import_requests_rx.next().timeout(Duration::from_millis(100)).await, None); + } +} + +#[rstest] +#[case(RuntimeApiRequest::CANDIDATES_PENDING_AVAILABILITY_RUNTIME_REQUIREMENT)] +#[case(10)] +#[tokio::test] +async fn single_pending_candidate_recovery_success(#[case] runtime_version: u32) { + sp_tracing::init_for_tests(); + + let (recovery_subsystem_tx, mut recovery_subsystem_rx) = + AvailabilityRecoverySubsystemHandle::new(); + let recovery_delay_range = + RecoveryDelayRange { min: Duration::from_millis(0), max: Duration::from_millis(10) }; + let (_explicit_recovery_chan_tx, explicit_recovery_chan_rx) = mpsc::channel(10); + let candidates = make_candidate_chain(1..2); + let header = Header::decode(&mut &candidates[0].commitments.head_data.0[..]).unwrap(); + let candidate_hash = candidates[0].hash(); + + let relay_chain_client = Relaychain::new(vec![( + PHeader { + parent_hash: PHash::from_low_u64_be(0), + number: 1, + state_root: PHash::random(), + extrinsics_root: PHash::random(), + digest: Default::default(), + }, + candidates, + )]); + relay_chain_client.set_runtime_version(runtime_version); + + let mut known_blocks = HashMap::new(); + known_blocks.insert(GENESIS_HASH, BlockStatus::InChainWithState); + let (parachain_client, _import_notifications_tx, _finality_notifications_tx) = + ParachainClient::new(vec![dummy_usage_info(0)], Arc::new(Mutex::new(known_blocks))); + let (parachain_import_queue, mut import_requests_rx) = ParachainImportQueue::new(); + + let pov_recovery = PoVRecovery::::new( + Box::new(recovery_subsystem_tx), + recovery_delay_range, + Arc::new(parachain_client), + Box::new(parachain_import_queue), + relay_chain_client, + ParaId::new(1000), + explicit_recovery_chan_rx, + Arc::new(DummySyncOracle::default()), + ); + + task::spawn(pov_recovery.run()); + + assert_matches!( + recovery_subsystem_rx.next().await, + Some(AvailabilityRecoveryMessage::RecoverAvailableData( + receipt, + session_index, + None, + None, + response_tx + )) => { + assert_eq!(receipt.hash(), candidate_hash); + assert_eq!(session_index, TEST_SESSION_INDEX); + response_tx.send( + Ok( + AvailableData { + pov: Arc::new(PoV { + block_data: ParachainBlockData::::new( + header.clone(), + vec![], + CompactProof {encoded_nodes: vec![]} + ).encode().into() + }), + validation_data: dummy_pvd(), + } + ) + ).unwrap() + } + ); + + // No more recovery messages received. + assert_matches!(recovery_subsystem_rx.next().timeout(Duration::from_millis(100)).await, None); + + // Received import request for the recovered candidate + assert_matches!(import_requests_rx.next().await, Some(incoming_blocks) => { + assert_eq!(incoming_blocks.len(), 1); + assert_eq!(incoming_blocks[0].header, Some(header)); + }); + + // No import request received + assert_matches!(import_requests_rx.next().timeout(Duration::from_millis(100)).await, None); +} + +#[tokio::test] +async fn single_pending_candidate_recovery_retry_succeeds() { + sp_tracing::init_for_tests(); + + let (recovery_subsystem_tx, mut recovery_subsystem_rx) = + AvailabilityRecoverySubsystemHandle::new(); + let recovery_delay_range = + RecoveryDelayRange { min: Duration::from_millis(0), max: Duration::from_millis(10) }; + let (_explicit_recovery_chan_tx, explicit_recovery_chan_rx) = mpsc::channel(10); + let candidates = make_candidate_chain(1..2); + let header = Header::decode(&mut &candidates[0].commitments.head_data.0[..]).unwrap(); + let candidate_hash = candidates[0].hash(); + + let relay_chain_client = Relaychain::new(vec![( + PHeader { + parent_hash: PHash::from_low_u64_be(0), + number: 1, + state_root: PHash::random(), + extrinsics_root: PHash::random(), + digest: Default::default(), + }, + candidates, + )]); + let mut known_blocks = HashMap::new(); + known_blocks.insert(GENESIS_HASH, BlockStatus::InChainWithState); + let (parachain_client, _import_notifications_tx, _finality_notifications_tx) = + ParachainClient::new(vec![dummy_usage_info(0)], Arc::new(Mutex::new(known_blocks))); + let (parachain_import_queue, mut import_requests_rx) = ParachainImportQueue::new(); + + let pov_recovery = PoVRecovery::::new( + Box::new(recovery_subsystem_tx), + recovery_delay_range, + Arc::new(parachain_client), + Box::new(parachain_import_queue), + relay_chain_client, + ParaId::new(1000), + explicit_recovery_chan_rx, + Arc::new(DummySyncOracle::default()), + ); + + task::spawn(pov_recovery.run()); + + // First recovery fails. + assert_matches!( + recovery_subsystem_rx.next().await, + Some(AvailabilityRecoveryMessage::RecoverAvailableData( + receipt, + session_index, + None, + None, + response_tx + )) => { + assert_eq!(receipt.hash(), candidate_hash); + assert_eq!(session_index, TEST_SESSION_INDEX); + response_tx.send( + Err(RecoveryError::Unavailable) + ).unwrap() + } + ); + // Candidate is not imported. + assert_matches!(import_requests_rx.next().timeout(Duration::from_millis(100)).await, None); + + // Recovery is retried and it succeeds now. + assert_matches!( + recovery_subsystem_rx.next().await, + Some(AvailabilityRecoveryMessage::RecoverAvailableData( + receipt, + session_index, + None, + None, + response_tx + )) => { + assert_eq!(receipt.hash(), candidate_hash); + assert_eq!(session_index, TEST_SESSION_INDEX); + response_tx.send( + Ok( + AvailableData { + pov: Arc::new(PoV { + block_data: ParachainBlockData::::new( + header.clone(), + vec![], + CompactProof {encoded_nodes: vec![]} + ).encode().into() + }), + validation_data: dummy_pvd(), + } + ) + ).unwrap() + } + ); + + // No more recovery messages received. + assert_matches!(recovery_subsystem_rx.next().timeout(Duration::from_millis(100)).await, None); + + // Received import request for the recovered candidate + assert_matches!(import_requests_rx.next().await, Some(incoming_blocks) => { + assert_eq!(incoming_blocks.len(), 1); + assert_eq!(incoming_blocks[0].header, Some(header)); + }); + + // No import request received + assert_matches!(import_requests_rx.next().timeout(Duration::from_millis(100)).await, None); +} + +#[tokio::test] +async fn single_pending_candidate_recovery_retry_fails() { + sp_tracing::init_for_tests(); + + let (recovery_subsystem_tx, mut recovery_subsystem_rx) = + AvailabilityRecoverySubsystemHandle::new(); + let recovery_delay_range = + RecoveryDelayRange { min: Duration::from_millis(0), max: Duration::from_millis(10) }; + let (_explicit_recovery_chan_tx, explicit_recovery_chan_rx) = mpsc::channel(10); + let candidates = make_candidate_chain(1..2); + let candidate_hash = candidates[0].hash(); + + let relay_chain_client = Relaychain::new(vec![( + PHeader { + parent_hash: PHash::from_low_u64_be(0), + number: 1, + state_root: PHash::random(), + extrinsics_root: PHash::random(), + digest: Default::default(), + }, + candidates, + )]); + let mut known_blocks = HashMap::new(); + known_blocks.insert(GENESIS_HASH, BlockStatus::InChainWithState); + let (parachain_client, _import_notifications_tx, _finality_notifications_tx) = + ParachainClient::new(vec![dummy_usage_info(0)], Arc::new(Mutex::new(known_blocks))); + let (parachain_import_queue, mut import_requests_rx) = ParachainImportQueue::new(); + + let pov_recovery = PoVRecovery::::new( + Box::new(recovery_subsystem_tx), + recovery_delay_range, + Arc::new(parachain_client), + Box::new(parachain_import_queue), + relay_chain_client, + ParaId::new(1000), + explicit_recovery_chan_rx, + Arc::new(DummySyncOracle::default()), + ); + + task::spawn(pov_recovery.run()); + + // First recovery fails. + assert_matches!( + recovery_subsystem_rx.next().await, + Some(AvailabilityRecoveryMessage::RecoverAvailableData( + receipt, + session_index, + None, + None, + response_tx + )) => { + assert_eq!(receipt.hash(), candidate_hash); + assert_eq!(session_index, TEST_SESSION_INDEX); + response_tx.send( + Err(RecoveryError::Unavailable) + ).unwrap() + } + ); + // Candidate is not imported. + assert_matches!(import_requests_rx.next().timeout(Duration::from_millis(100)).await, None); + + // Second retry fails. + assert_matches!( + recovery_subsystem_rx.next().await, + Some(AvailabilityRecoveryMessage::RecoverAvailableData( + receipt, + session_index, + None, + None, + response_tx + )) => { + assert_eq!(receipt.hash(), candidate_hash); + assert_eq!(session_index, TEST_SESSION_INDEX); + response_tx.send( + Err(RecoveryError::Unavailable) + ).unwrap() + } + ); + // Candidate is not imported. + assert_matches!(import_requests_rx.next().timeout(Duration::from_millis(100)).await, None); + + // After the second attempt, give up. + // No more recovery messages received. + assert_matches!(recovery_subsystem_rx.next().timeout(Duration::from_millis(100)).await, None); +} + +#[tokio::test] +async fn single_pending_candidate_recovery_irrecoverable_error() { + sp_tracing::init_for_tests(); + + let (recovery_subsystem_tx, mut recovery_subsystem_rx) = + AvailabilityRecoverySubsystemHandle::new(); + let recovery_delay_range = + RecoveryDelayRange { min: Duration::from_millis(0), max: Duration::from_millis(10) }; + let (_explicit_recovery_chan_tx, explicit_recovery_chan_rx) = mpsc::channel(10); + let candidates = make_candidate_chain(1..2); + let candidate_hash = candidates[0].hash(); + + let relay_chain_client = Relaychain::new(vec![( + PHeader { + parent_hash: PHash::from_low_u64_be(0), + number: 1, + state_root: PHash::random(), + extrinsics_root: PHash::random(), + digest: Default::default(), + }, + candidates, + )]); + let mut known_blocks = HashMap::new(); + known_blocks.insert(GENESIS_HASH, BlockStatus::InChainWithState); + let (parachain_client, _import_notifications_tx, _finality_notifications_tx) = + ParachainClient::new(vec![dummy_usage_info(0)], Arc::new(Mutex::new(known_blocks))); + let (parachain_import_queue, mut import_requests_rx) = ParachainImportQueue::new(); + + let pov_recovery = PoVRecovery::::new( + Box::new(recovery_subsystem_tx), + recovery_delay_range, + Arc::new(parachain_client), + Box::new(parachain_import_queue), + relay_chain_client, + ParaId::new(1000), + explicit_recovery_chan_rx, + Arc::new(DummySyncOracle::default()), + ); + + task::spawn(pov_recovery.run()); + + // Recovery succeeds but the block data is wrong. Will not be retried. + assert_matches!( + recovery_subsystem_rx.next().await, + Some(AvailabilityRecoveryMessage::RecoverAvailableData( + receipt, + session_index, + None, + None, + response_tx + )) => { + assert_eq!(receipt.hash(), candidate_hash); + assert_eq!(session_index, TEST_SESSION_INDEX); + response_tx.send( + Ok( + AvailableData { + pov: Arc::new(PoV { + // Empty block data. It will fail to decode. + block_data: vec![].into() + }), + validation_data: dummy_pvd(), + } + ) + ).unwrap() + } + ); + // Candidate is not imported. + assert_matches!(import_requests_rx.next().timeout(Duration::from_millis(100)).await, None); + + // No more recovery messages received. + assert_matches!(recovery_subsystem_rx.next().timeout(Duration::from_millis(100)).await, None); +} + +#[tokio::test] +async fn pending_candidates_recovery_skipped_while_syncing() { + sp_tracing::init_for_tests(); + + let (recovery_subsystem_tx, mut recovery_subsystem_rx) = + AvailabilityRecoverySubsystemHandle::new(); + let recovery_delay_range = + RecoveryDelayRange { min: Duration::from_millis(0), max: Duration::from_millis(10) }; + let (_explicit_recovery_chan_tx, explicit_recovery_chan_rx) = mpsc::channel(10); + let candidates = make_candidate_chain(1..4); + + let relay_chain_client = Relaychain::new(vec![( + PHeader { + parent_hash: PHash::from_low_u64_be(0), + number: 1, + state_root: PHash::random(), + extrinsics_root: PHash::random(), + digest: Default::default(), + }, + candidates, + )]); + let mut known_blocks = HashMap::new(); + known_blocks.insert(GENESIS_HASH, BlockStatus::InChainWithState); + let (parachain_client, _import_notifications_tx, _finality_notifications_tx) = + ParachainClient::new(vec![dummy_usage_info(0)], Arc::new(Mutex::new(known_blocks))); + let (parachain_import_queue, mut import_requests_rx) = ParachainImportQueue::new(); + + let pov_recovery = PoVRecovery::::new( + Box::new(recovery_subsystem_tx), + recovery_delay_range, + Arc::new(parachain_client), + Box::new(parachain_import_queue), + relay_chain_client, + ParaId::new(1000), + explicit_recovery_chan_rx, + Arc::new(DummySyncOracle::new(true)), + ); + + task::spawn(pov_recovery.run()); + + // No recovery messages received. + assert_matches!(recovery_subsystem_rx.next().timeout(Duration::from_millis(100)).await, None); + + // No candidate is imported. + assert_matches!(import_requests_rx.next().timeout(Duration::from_millis(100)).await, None); +} + +#[tokio::test] +async fn candidate_is_imported_while_awaiting_recovery() { + sp_tracing::init_for_tests(); + + let (recovery_subsystem_tx, mut recovery_subsystem_rx) = + AvailabilityRecoverySubsystemHandle::new(); + let recovery_delay_range = + RecoveryDelayRange { min: Duration::from_millis(0), max: Duration::from_millis(10) }; + let (_explicit_recovery_chan_tx, explicit_recovery_chan_rx) = mpsc::channel(10); + let candidates = make_candidate_chain(1..2); + let header = Header::decode(&mut &candidates[0].commitments.head_data.0[..]).unwrap(); + let candidate_hash = candidates[0].hash(); + + let relay_chain_client = Relaychain::new(vec![( + PHeader { + parent_hash: PHash::from_low_u64_be(0), + number: 1, + state_root: PHash::random(), + extrinsics_root: PHash::random(), + digest: Default::default(), + }, + candidates, + )]); + let mut known_blocks = HashMap::new(); + known_blocks.insert(GENESIS_HASH, BlockStatus::InChainWithState); + let (parachain_client, import_notifications_tx, _finality_notifications_tx) = + ParachainClient::new(vec![dummy_usage_info(0)], Arc::new(Mutex::new(known_blocks))); + let (parachain_import_queue, mut import_requests_rx) = ParachainImportQueue::new(); + + let pov_recovery = PoVRecovery::::new( + Box::new(recovery_subsystem_tx), + recovery_delay_range, + Arc::new(parachain_client), + Box::new(parachain_import_queue), + relay_chain_client, + ParaId::new(1000), + explicit_recovery_chan_rx, + Arc::new(DummySyncOracle::default()), + ); + + task::spawn(pov_recovery.run()); + + let recovery_response_tx; + + assert_matches!( + recovery_subsystem_rx.next().await, + Some(AvailabilityRecoveryMessage::RecoverAvailableData( + receipt, + session_index, + None, + None, + response_tx + )) => { + assert_eq!(receipt.hash(), candidate_hash); + assert_eq!(session_index, TEST_SESSION_INDEX); + recovery_response_tx = response_tx; + } + ); + + // While candidate is pending recovery, import the candidate from external source. + let (unpin_sender, _unpin_receiver) = sc_utils::mpsc::tracing_unbounded("test_unpin", 10); + import_notifications_tx + .unbounded_send(BlockImportNotification::new( + header.hash(), + BlockOrigin::ConsensusBroadcast, + header.clone(), + false, + None, + unpin_sender, + )) + .unwrap(); + + recovery_response_tx + .send(Ok(AvailableData { + pov: Arc::new(PoV { + block_data: ParachainBlockData::::new( + header.clone(), + vec![], + CompactProof { encoded_nodes: vec![] }, + ) + .encode() + .into(), + }), + validation_data: dummy_pvd(), + })) + .unwrap(); + + // Received import request for the recovered candidate. This could be optimised to not trigger a + // reimport. + assert_matches!(import_requests_rx.next().await, Some(incoming_blocks) => { + assert_eq!(incoming_blocks.len(), 1); + assert_eq!(incoming_blocks[0].header, Some(header)); + }); + + // No more recovery messages received. + assert_matches!(recovery_subsystem_rx.next().timeout(Duration::from_millis(100)).await, None); + + // No more import requests received + assert_matches!(import_requests_rx.next().timeout(Duration::from_millis(100)).await, None); +} + +#[tokio::test] +async fn candidate_is_finalized_while_awaiting_recovery() { + sp_tracing::init_for_tests(); + + let (recovery_subsystem_tx, mut recovery_subsystem_rx) = + AvailabilityRecoverySubsystemHandle::new(); + let recovery_delay_range = + RecoveryDelayRange { min: Duration::from_millis(0), max: Duration::from_millis(10) }; + let (_explicit_recovery_chan_tx, explicit_recovery_chan_rx) = mpsc::channel(10); + let candidates = make_candidate_chain(1..2); + let header = Header::decode(&mut &candidates[0].commitments.head_data.0[..]).unwrap(); + let candidate_hash = candidates[0].hash(); + + let relay_chain_client = Relaychain::new(vec![( + PHeader { + parent_hash: PHash::from_low_u64_be(0), + number: 1, + state_root: PHash::random(), + extrinsics_root: PHash::random(), + digest: Default::default(), + }, + candidates, + )]); + let mut known_blocks = HashMap::new(); + known_blocks.insert(GENESIS_HASH, BlockStatus::InChainWithState); + let (parachain_client, _import_notifications_tx, finality_notifications_tx) = + ParachainClient::new(vec![dummy_usage_info(0)], Arc::new(Mutex::new(known_blocks))); + let (parachain_import_queue, mut import_requests_rx) = ParachainImportQueue::new(); + + let pov_recovery = PoVRecovery::::new( + Box::new(recovery_subsystem_tx), + recovery_delay_range, + Arc::new(parachain_client), + Box::new(parachain_import_queue), + relay_chain_client, + ParaId::new(1000), + explicit_recovery_chan_rx, + Arc::new(DummySyncOracle::default()), + ); + + task::spawn(pov_recovery.run()); + + let recovery_response_tx; + + assert_matches!( + recovery_subsystem_rx.next().await, + Some(AvailabilityRecoveryMessage::RecoverAvailableData( + receipt, + session_index, + None, + None, + response_tx + )) => { + assert_eq!(receipt.hash(), candidate_hash); + assert_eq!(session_index, TEST_SESSION_INDEX); + // save it for later. + recovery_response_tx = response_tx; + } + ); + + // While candidate is pending recovery, it gets finalized. + let (unpin_sender, _unpin_receiver) = sc_utils::mpsc::tracing_unbounded("test_unpin", 10); + finality_notifications_tx + .unbounded_send(FinalityNotification::from_summary( + FinalizeSummary { header: header.clone(), finalized: vec![], stale_heads: vec![] }, + unpin_sender, + )) + .unwrap(); + + recovery_response_tx + .send(Ok(AvailableData { + pov: Arc::new(PoV { + block_data: ParachainBlockData::::new( + header.clone(), + vec![], + CompactProof { encoded_nodes: vec![] }, + ) + .encode() + .into(), + }), + validation_data: dummy_pvd(), + })) + .unwrap(); + + // No more recovery messages received. + assert_matches!(recovery_subsystem_rx.next().timeout(Duration::from_millis(100)).await, None); + + // candidate is imported + assert_matches!(import_requests_rx.next().await, Some(incoming_blocks) => { + assert_eq!(incoming_blocks.len(), 1); + assert_eq!(incoming_blocks[0].header, Some(header)); + }); + + // No more import requests received + assert_matches!(import_requests_rx.next().timeout(Duration::from_millis(100)).await, None); +} + +#[tokio::test] +async fn chained_recovery_success() { + sp_tracing::init_for_tests(); + + let (recovery_subsystem_tx, mut recovery_subsystem_rx) = + AvailabilityRecoverySubsystemHandle::new(); + let recovery_delay_range = + RecoveryDelayRange { min: Duration::from_millis(0), max: Duration::from_millis(0) }; + let (_explicit_recovery_chan_tx, explicit_recovery_chan_rx) = mpsc::channel(10); + let candidates = make_candidate_chain(1..4); + let headers = candidates + .iter() + .map(|candidate| Header::decode(&mut &candidate.commitments.head_data.0[..]).unwrap()) + .collect::>(); + let candidate_hashes = candidates.iter().map(|candidate| candidate.hash()).collect::>(); + + let relay_chain_client = Relaychain::new(vec![( + PHeader { + parent_hash: PHash::from_low_u64_be(0), + number: 1, + state_root: PHash::random(), + extrinsics_root: PHash::random(), + digest: Default::default(), + }, + // 3 pending candidates + candidates, + )]); + let mut known_blocks = HashMap::new(); + known_blocks.insert(GENESIS_HASH, BlockStatus::InChainWithState); + let known_blocks = Arc::new(Mutex::new(known_blocks)); + let (parachain_client, import_notifications_tx, _finality_notifications_tx) = + ParachainClient::new(vec![dummy_usage_info(0)], known_blocks.clone()); + let (parachain_import_queue, mut import_requests_rx) = ParachainImportQueue::new(); + + let pov_recovery = PoVRecovery::::new( + Box::new(recovery_subsystem_tx), + recovery_delay_range, + Arc::new(parachain_client), + Box::new(parachain_import_queue), + relay_chain_client, + ParaId::new(1000), + explicit_recovery_chan_rx, + Arc::new(DummySyncOracle::default()), + ); + + task::spawn(pov_recovery.run()); + + // Candidates are recovered in the right order. + for (candidate_hash, header) in candidate_hashes.into_iter().zip(headers.into_iter()) { + assert_matches!( + recovery_subsystem_rx.next().await, + Some(AvailabilityRecoveryMessage::RecoverAvailableData( + receipt, + session_index, + None, + None, + response_tx + )) => { + assert_eq!(receipt.hash(), candidate_hash); + assert_eq!(session_index, TEST_SESSION_INDEX); + response_tx + .send(Ok(AvailableData { + pov: Arc::new(PoV { + block_data: ParachainBlockData::::new( + header.clone(), + vec![], + CompactProof { encoded_nodes: vec![] }, + ) + .encode() + .into(), + }), + validation_data: dummy_pvd(), + })) + .unwrap(); + } + ); + + assert_matches!(import_requests_rx.next().await, Some(incoming_blocks) => { + assert_eq!(incoming_blocks.len(), 1); + assert_eq!(incoming_blocks[0].header, Some(header.clone())); + }); + + known_blocks + .lock() + .expect("Poisoned lock") + .insert(header.hash(), BlockStatus::InChainWithState); + + let (unpin_sender, _unpin_receiver) = sc_utils::mpsc::tracing_unbounded("test_unpin", 10); + import_notifications_tx + .unbounded_send(BlockImportNotification::new( + header.hash(), + BlockOrigin::ConsensusBroadcast, + header, + false, + None, + unpin_sender, + )) + .unwrap(); + } + + // No more recovery messages received. + assert_matches!(recovery_subsystem_rx.next().timeout(Duration::from_millis(100)).await, None); + + // No more import requests received + assert_matches!(import_requests_rx.next().timeout(Duration::from_millis(100)).await, None); +} + +#[tokio::test] +async fn chained_recovery_child_succeeds_before_parent() { + sp_tracing::init_for_tests(); + + let (recovery_subsystem_tx, mut recovery_subsystem_rx) = + AvailabilityRecoverySubsystemHandle::new(); + let recovery_delay_range = + RecoveryDelayRange { min: Duration::from_millis(0), max: Duration::from_millis(0) }; + let (_explicit_recovery_chan_tx, explicit_recovery_chan_rx) = mpsc::channel(10); + let candidates = make_candidate_chain(1..3); + let headers = candidates + .iter() + .map(|candidate| Header::decode(&mut &candidate.commitments.head_data.0[..]).unwrap()) + .collect::>(); + let candidate_hashes = candidates.iter().map(|candidate| candidate.hash()).collect::>(); + + let relay_chain_client = Relaychain::new(vec![( + PHeader { + parent_hash: PHash::from_low_u64_be(0), + number: 1, + state_root: PHash::random(), + extrinsics_root: PHash::random(), + digest: Default::default(), + }, + // 2 pending candidates + candidates, + )]); + let mut known_blocks = HashMap::new(); + known_blocks.insert(GENESIS_HASH, BlockStatus::InChainWithState); + let known_blocks = Arc::new(Mutex::new(known_blocks)); + let (parachain_client, _import_notifications_tx, _finality_notifications_tx) = + ParachainClient::new(vec![dummy_usage_info(0)], known_blocks.clone()); + let (parachain_import_queue, mut import_requests_rx) = ParachainImportQueue::new(); + + let pov_recovery = PoVRecovery::::new( + Box::new(recovery_subsystem_tx), + recovery_delay_range, + Arc::new(parachain_client), + Box::new(parachain_import_queue), + relay_chain_client, + ParaId::new(1000), + explicit_recovery_chan_rx, + Arc::new(DummySyncOracle::default()), + ); + + task::spawn(pov_recovery.run()); + + let mut recovery_responses_senders = vec![]; + + for candidate_hash in candidate_hashes.iter() { + assert_matches!( + recovery_subsystem_rx.next().await, + Some(AvailabilityRecoveryMessage::RecoverAvailableData( + receipt, + session_index, + None, + None, + response_tx + )) => { + assert_eq!(receipt.hash(), *candidate_hash); + assert_eq!(session_index, TEST_SESSION_INDEX); + recovery_responses_senders.push(response_tx); + } + ); + } + + // Send out the responses in reverse order. + for (recovery_response_sender, header) in + recovery_responses_senders.into_iter().zip(headers.iter()).rev() + { + recovery_response_sender + .send(Ok(AvailableData { + pov: Arc::new(PoV { + block_data: ParachainBlockData::::new( + header.clone(), + vec![], + CompactProof { encoded_nodes: vec![] }, + ) + .encode() + .into(), + }), + validation_data: dummy_pvd(), + })) + .unwrap(); + } + + assert_matches!(import_requests_rx.next().await, Some(incoming_blocks) => { + // The two import requests will be batched. + assert_eq!(incoming_blocks.len(), 2); + assert_eq!(incoming_blocks[0].header, Some(headers[0].clone())); + assert_eq!(incoming_blocks[1].header, Some(headers[1].clone())); + }); + + // No more recovery messages received. + assert_matches!(recovery_subsystem_rx.next().timeout(Duration::from_millis(100)).await, None); + + // No more import requests received + assert_matches!(import_requests_rx.next().timeout(Duration::from_millis(100)).await, None); +} diff --git a/cumulus/client/relay-chain-inprocess-interface/src/lib.rs b/cumulus/client/relay-chain-inprocess-interface/src/lib.rs index 578b942776dc..7871623e8447 100644 --- a/cumulus/client/relay-chain-inprocess-interface/src/lib.rs +++ b/cumulus/client/relay-chain-inprocess-interface/src/lib.rs @@ -30,7 +30,7 @@ use futures::{FutureExt, Stream, StreamExt}; use polkadot_service::{ CollatorPair, Configuration, FullBackend, FullClient, Handle, NewFull, TaskManager, }; -use sc_cli::SubstrateCli; +use sc_cli::{RuntimeVersion, SubstrateCli}; use sc_client_api::{ blockchain::BlockStatus, Backend, BlockchainEvents, HeaderBackend, ImportNotifications, StorageProof, @@ -68,6 +68,10 @@ impl RelayChainInProcessInterface { #[async_trait] impl RelayChainInterface for RelayChainInProcessInterface { + async fn version(&self, relay_parent: PHash) -> RelayChainResult { + Ok(self.full_client.runtime_version_at(relay_parent)?) + } + async fn retrieve_dmq_contents( &self, para_id: ParaId, @@ -251,6 +255,14 @@ impl RelayChainInterface for RelayChainInProcessInterface { }); Ok(Box::pin(notifications_stream)) } + + async fn candidates_pending_availability( + &self, + hash: PHash, + para_id: ParaId, + ) -> RelayChainResult> { + Ok(self.full_client.runtime_api().candidates_pending_availability(hash, para_id)?) + } } pub enum BlockCheckStatus { diff --git a/cumulus/client/relay-chain-interface/Cargo.toml b/cumulus/client/relay-chain-interface/Cargo.toml index 5d612cdc0eef..e8603693ac8d 100644 --- a/cumulus/client/relay-chain-interface/Cargo.toml +++ b/cumulus/client/relay-chain-interface/Cargo.toml @@ -18,6 +18,7 @@ sp-api = { path = "../../../substrate/primitives/api" } sp-blockchain = { path = "../../../substrate/primitives/blockchain" } sp-state-machine = { path = "../../../substrate/primitives/state-machine" } sc-client-api = { path = "../../../substrate/client/api" } +sp-version = { path = "../../../substrate/primitives/version", default-features = false } futures = "0.3.28" async-trait = "0.1.79" diff --git a/cumulus/client/relay-chain-interface/src/lib.rs b/cumulus/client/relay-chain-interface/src/lib.rs index 7c7796b468c0..46e19b40f010 100644 --- a/cumulus/client/relay-chain-interface/src/lib.rs +++ b/cumulus/client/relay-chain-interface/src/lib.rs @@ -16,10 +16,10 @@ use std::{collections::BTreeMap, pin::Pin, sync::Arc}; +use futures::Stream; use polkadot_overseer::prometheus::PrometheusError; use sc_client_api::StorageProof; - -use futures::Stream; +use sp_version::RuntimeVersion; use async_trait::async_trait; use codec::Error as CodecError; @@ -149,8 +149,12 @@ pub trait RelayChainInterface: Send + Sync { _: OccupiedCoreAssumption, ) -> RelayChainResult>; - /// Get the receipt of a candidate pending availability. This returns `Some` for any paras - /// assigned to occupied cores in `availability_cores` and `None` otherwise. + /// Get the receipt of the first candidate pending availability of this para_id. This returns + /// `Some` for any paras assigned to occupied cores in `availability_cores` and `None` + /// otherwise. + #[deprecated( + note = "`candidate_pending_availability` only returns one candidate and is deprecated. Use `candidates_pending_availability` instead." + )] async fn candidate_pending_availability( &self, block_id: PHash, @@ -203,6 +207,16 @@ pub trait RelayChainInterface: Send + Sync { para_id: ParaId, occupied_core_assumption: OccupiedCoreAssumption, ) -> RelayChainResult>; + + /// Get the receipts of all candidates pending availability for this para_id. + async fn candidates_pending_availability( + &self, + block_id: PHash, + para_id: ParaId, + ) -> RelayChainResult>; + + /// Get the runtime version of the relay chain. + async fn version(&self, relay_parent: PHash) -> RelayChainResult; } #[async_trait] @@ -237,6 +251,7 @@ where .await } + #[allow(deprecated)] async fn candidate_pending_availability( &self, block_id: PHash, @@ -321,4 +336,16 @@ where .validation_code_hash(relay_parent, para_id, occupied_core_assumption) .await } + + async fn candidates_pending_availability( + &self, + block_id: PHash, + para_id: ParaId, + ) -> RelayChainResult> { + (**self).candidates_pending_availability(block_id, para_id).await + } + + async fn version(&self, relay_parent: PHash) -> RelayChainResult { + (**self).version(relay_parent).await + } } diff --git a/cumulus/client/relay-chain-rpc-interface/src/lib.rs b/cumulus/client/relay-chain-rpc-interface/src/lib.rs index 3a4c186e301e..bb7bfa5dc322 100644 --- a/cumulus/client/relay-chain-rpc-interface/src/lib.rs +++ b/cumulus/client/relay-chain-rpc-interface/src/lib.rs @@ -33,6 +33,7 @@ use sc_client_api::StorageProof; use sp_core::sp_std::collections::btree_map::BTreeMap; use sp_state_machine::StorageValue; use sp_storage::StorageKey; +use sp_version::RuntimeVersion; use std::pin::Pin; use cumulus_primitives_core::relay_chain::BlockId; @@ -237,4 +238,18 @@ impl RelayChainInterface for RelayChainRpcInterface { let imported_headers_stream = self.rpc_client.get_best_heads_stream()?; Ok(imported_headers_stream.boxed()) } + + async fn candidates_pending_availability( + &self, + hash: RelayHash, + para_id: ParaId, + ) -> RelayChainResult> { + self.rpc_client + .parachain_host_candidates_pending_availability(hash, para_id) + .await + } + + async fn version(&self, relay_parent: RelayHash) -> RelayChainResult { + self.rpc_client.runtime_version(relay_parent).await + } } diff --git a/prdoc/pr_4733.prdoc b/prdoc/pr_4733.prdoc new file mode 100644 index 000000000000..e63324839852 --- /dev/null +++ b/prdoc/pr_4733.prdoc @@ -0,0 +1,27 @@ +title: Add pov-recovery unit tests and support for elastic scaling + +doc: + - audience: Node Dev + description: | + Adds unit tests for cumulus pov-recovery and support for elastic scaling (recovering multiple candidates in a single relay chain block). + +crates: + - name: cumulus-client-network + bump: patch + - name: cumulus-client-pov-recovery + bump: patch + - name: cumulus-relay-chain-interface + bump: major + validate: false + - name: cumulus-relay-chain-inprocess-interface + bump: minor + - name: cumulus-relay-chain-rpc-interface + bump: minor + - name: cumulus-client-consensus-common + bump: none + - name: sc-client-api + bump: minor + - name: sp-blockchain + bump: minor + - name: sp-consensus + bump: minor diff --git a/substrate/client/api/src/client.rs b/substrate/client/api/src/client.rs index 2de09840e4df..45cfafb25846 100644 --- a/substrate/client/api/src/client.rs +++ b/substrate/client/api/src/client.rs @@ -168,7 +168,7 @@ pub trait ProvideUncles { } /// Client info -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ClientInfo { /// Best block hash. pub chain: Info, diff --git a/substrate/primitives/blockchain/src/backend.rs b/substrate/primitives/blockchain/src/backend.rs index 06e5b682964a..933e41e2ab45 100644 --- a/substrate/primitives/blockchain/src/backend.rs +++ b/substrate/primitives/blockchain/src/backend.rs @@ -284,7 +284,7 @@ impl DisplacedLeavesAfterFinalization { } /// Blockchain info -#[derive(Debug, Eq, PartialEq)] +#[derive(Debug, Eq, PartialEq, Clone)] pub struct Info { /// Best block hash. pub best_hash: Block::Hash, diff --git a/substrate/primitives/consensus/common/src/lib.rs b/substrate/primitives/consensus/common/src/lib.rs index 01d3b7a24f9c..37636b34b03d 100644 --- a/substrate/primitives/consensus/common/src/lib.rs +++ b/substrate/primitives/consensus/common/src/lib.rs @@ -40,7 +40,7 @@ pub use sp_inherents::InherentData; pub use sp_state_machine::Backend as StateBackend; /// Block status. -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Clone)] pub enum BlockStatus { /// Added to the import queue. Queued,