diff --git a/core/consensus/babe/impl/babe.hpp b/core/consensus/babe/impl/babe.hpp index 3333cde102..c41c3b9dc4 100644 --- a/core/consensus/babe/impl/babe.hpp +++ b/core/consensus/babe/impl/babe.hpp @@ -71,7 +71,7 @@ namespace kagome::offchain { namespace kagome::parachain { class BitfieldStore; - class ParachainProcessorImpl; + class ParachainProcessor; struct BackedCandidatesSource; } // namespace kagome::parachain diff --git a/core/injector/application_injector.cpp b/core/injector/application_injector.cpp index 07afea544a..05cd4d2a0b 100644 --- a/core/injector/application_injector.cpp +++ b/core/injector/application_injector.cpp @@ -114,6 +114,7 @@ #include "metrics/impl/metrics_watcher.hpp" #include "metrics/impl/prometheus/handler_impl.hpp" #include "metrics/metrics.hpp" +#include "network/i_peer_view.hpp" #include "network/impl/block_announce_transmitter_impl.hpp" #include "network/impl/extrinsic_observer_impl.hpp" #include "network/impl/grandpa_transmitter_impl.hpp" @@ -152,6 +153,7 @@ #include "outcome/outcome.hpp" #include "parachain/approval/approval_distribution.hpp" #include "parachain/approval/approval_thread_pool.hpp" +#include "parachain/availability/bitfield/signer.hpp" #include "parachain/availability/bitfield/store_impl.hpp" #include "parachain/availability/fetch/fetch_impl.hpp" #include "parachain/availability/recovery/recovery_impl.hpp" @@ -164,6 +166,7 @@ #include "parachain/pvf/workers.hpp" #include "parachain/validator/impl/parachain_observer_impl.hpp" #include "parachain/validator/parachain_processor.hpp" +#include "parachain/validator/statement_distribution/i_statement_distribution.hpp" #include "parachain/validator/statement_distribution/statement_distribution.hpp" #include "runtime/binaryen/binaryen_memory_provider.hpp" #include "runtime/binaryen/instance_environment_factory.hpp" @@ -784,12 +787,35 @@ namespace { di::bind.template to(), di::bind.template to(), di::bind.template to(), + di::bind.template to(), + di::bind.template to(), di::bind.template to(), di::bind.template to(), + di::bind.template to(), di::bind.template to(), di::bind.template to(), - di::bind.template to(), - di::bind.template to(), + di::bind.template to(), + bind_by_lambda( + [](const auto &injector) { + return injector.template create>(); + }), + bind_by_lambda( + [](const auto &injector) { + return injector.template create>(); + }), + bind_by_lambda( + [](const auto &injector) { + return injector.template create>(); + }), + di::bind.template to(), + bind_by_lambda( + [](const auto &injector) { + return injector.template create>(); + }), + bind_by_lambda( + [](const auto &injector) { + return injector.template create>(); + }), di::bind.template to(), bind_by_lambda([config](const auto &) { auto support = parachain::SecureModeSupport::none(); @@ -1034,10 +1060,10 @@ namespace kagome::injector { .template create>(); } - std::shared_ptr + std::shared_ptr KagomeNodeInjector::injectParachainProcessor() { return pimpl_->injector_ - .template create>(); + .template create>(); } std::shared_ptr diff --git a/core/injector/application_injector.hpp b/core/injector/application_injector.hpp index 2bf2caad87..4012f57142 100644 --- a/core/injector/application_injector.hpp +++ b/core/injector/application_injector.hpp @@ -59,12 +59,13 @@ namespace kagome { namespace parachain { class ParachainObserver; - class ParachainProcessorImpl; + class ParachainProcessor; class ApprovalDistribution; namespace statement_distribution { class StatementDistribution; - } + class IStatementDistribution; + } // namespace statement_distribution } // namespace parachain namespace runtime { @@ -138,8 +139,7 @@ namespace kagome::injector { std::shared_ptr injectSyncObserver(); std::shared_ptr injectStateObserver(); std::shared_ptr injectParachainObserver(); - std::shared_ptr - injectParachainProcessor(); + std::shared_ptr injectParachainProcessor(); std::shared_ptr injectStatementDistribution(); std::shared_ptr diff --git a/core/network/i_peer_view.hpp b/core/network/i_peer_view.hpp new file mode 100644 index 0000000000..fb1f1417bc --- /dev/null +++ b/core/network/i_peer_view.hpp @@ -0,0 +1,69 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ +#pragma once + +#include +#include + +#include + +#include "crypto/type_hasher.hpp" +#include "network/types/collator_messages.hpp" +#include "outcome/outcome.hpp" +#include "primitives/event_types.hpp" +#include "subscription/subscriber.hpp" +#include "subscription/subscription_engine.hpp" + +namespace kagome::network { + + using HashedBlockHeader = primitives::BlockHeader; + struct ExView { + View view; + HashedBlockHeader new_head; + std::vector lost; + }; + + struct ExViewRef { + std::optional> new_head; + const std::vector &lost; + }; + + /** + * Observable class for current heads and finalized block number tracking. + */ + class IPeerView { + public: + enum struct EventType : uint32_t { kViewUpdated, kPeerRemoved }; + + using PeerId = libp2p::peer::PeerId; + + using MyViewSubscriptionEngine = subscription:: + SubscriptionEngine; + using MyViewSubscriptionEnginePtr = + std::shared_ptr; + using MyViewSubscriber = MyViewSubscriptionEngine::SubscriberType; + using MyViewSubscriberPtr = std::shared_ptr; + + using PeerViewSubscriptionEngine = subscription:: + SubscriptionEngine; + using PeerViewSubscriptionEnginePtr = + std::shared_ptr; + using PeerViewSubscriber = PeerViewSubscriptionEngine::SubscriberType; + using PeerViewSubscriberPtr = std::shared_ptr; + + virtual ~IPeerView() = default; + + virtual size_t peersCount() const = 0; + virtual MyViewSubscriptionEnginePtr getMyViewObservable() = 0; + virtual PeerViewSubscriptionEnginePtr getRemoteViewObservable() = 0; + + virtual void removePeer(const PeerId &peer_id) = 0; + virtual void updateRemoteView(const PeerId &peer_id, + network::View &&view) = 0; + virtual const View &getMyView() const = 0; + }; + +} // namespace kagome::network diff --git a/core/network/impl/protocols/protocol_fetch_chunk.hpp b/core/network/impl/protocols/protocol_fetch_chunk.hpp index c477a9a343..dd4224f607 100644 --- a/core/network/impl/protocols/protocol_fetch_chunk.hpp +++ b/core/network/impl/protocols/protocol_fetch_chunk.hpp @@ -38,13 +38,12 @@ namespace kagome::network { NonCopyable, NonMovable { public: - FetchChunkProtocolImpl( - libp2p::Host &host, - const application::ChainSpec & /*chain_spec*/, - const blockchain::GenesisBlockHash &genesis_hash, - std::shared_ptr pp, - std::shared_ptr pm, - common::MainThreadPool &main_thread_pool) + FetchChunkProtocolImpl(libp2p::Host &host, + const application::ChainSpec & /*chain_spec*/, + const blockchain::GenesisBlockHash &genesis_hash, + std::shared_ptr pp, + std::shared_ptr pm, + common::MainThreadPool &main_thread_pool) : RequestResponseProtocolImpl< FetchChunkRequest, FetchChunkResponse, @@ -132,7 +131,7 @@ namespace kagome::network { } inline static const auto kFetchChunkProtocolName = "FetchChunkProtocol_v2"s; - std::shared_ptr pp_; + std::shared_ptr pp_; std::shared_ptr pm_; }; diff --git a/core/network/impl/protocols/protocol_fetch_chunk_obsolete.hpp b/core/network/impl/protocols/protocol_fetch_chunk_obsolete.hpp index 5ee803e4dd..118c7c82d2 100644 --- a/core/network/impl/protocols/protocol_fetch_chunk_obsolete.hpp +++ b/core/network/impl/protocols/protocol_fetch_chunk_obsolete.hpp @@ -42,7 +42,7 @@ namespace kagome::network { libp2p::Host &host, const application::ChainSpec & /*chain_spec*/, const blockchain::GenesisBlockHash &genesis_hash, - std::shared_ptr pp, + std::shared_ptr pp, common::MainThreadPool &main_thread_pool) : RequestResponseProtocolImpl< FetchChunkRequest, @@ -117,7 +117,7 @@ namespace kagome::network { } inline static const auto kFetchChunkProtocolName = "FetchChunkProtocol_v1"s; - std::shared_ptr pp_; + std::shared_ptr pp_; }; } // namespace kagome::network diff --git a/core/network/peer_view.hpp b/core/network/peer_view.hpp index 60d768321b..1336912c8a 100644 --- a/core/network/peer_view.hpp +++ b/core/network/peer_view.hpp @@ -5,20 +5,10 @@ */ #pragma once -#include -#include - -#include - #include "application/app_state_manager.hpp" #include "blockchain/block_tree.hpp" -#include "crypto/type_hasher.hpp" #include "injector/lazy.hpp" -#include "network/types/collator_messages.hpp" -#include "outcome/outcome.hpp" -#include "primitives/event_types.hpp" -#include "subscription/subscriber.hpp" -#include "subscription/subscription_engine.hpp" +#include "network/i_peer_view.hpp" #include "utils/non_copyable.hpp" #include "utils/safe_object.hpp" @@ -28,43 +18,14 @@ namespace kagome::blockchain { namespace kagome::network { - using HashedBlockHeader = primitives::BlockHeader; - struct ExView { - View view; - HashedBlockHeader new_head; - std::vector lost; - }; - - struct ExViewRef { - std::optional> new_head; - const std::vector &lost; - }; - /** * Observable class for current heads and finalized block number tracking. */ class PeerView final : public NonCopyable, public NonMovable, + public IPeerView, public std::enable_shared_from_this { public: - enum struct EventType : uint32_t { kViewUpdated, kPeerRemoved }; - - using PeerId = libp2p::peer::PeerId; - - using MyViewSubscriptionEngine = subscription:: - SubscriptionEngine; - using MyViewSubscriptionEnginePtr = - std::shared_ptr; - using MyViewSubscriber = MyViewSubscriptionEngine::SubscriberType; - using MyViewSubscriberPtr = std::shared_ptr; - - using PeerViewSubscriptionEngine = subscription:: - SubscriptionEngine; - using PeerViewSubscriptionEnginePtr = - std::shared_ptr; - using PeerViewSubscriber = PeerViewSubscriptionEngine::SubscriberType; - using PeerViewSubscriberPtr = std::shared_ptr; - PeerView(primitives::events::ChainSubscriptionEnginePtr chain_events_engine, std::shared_ptr app_state_manager, LazySPtr block_tree); @@ -76,14 +37,13 @@ namespace kagome::network { bool prepare(); void stop(); - size_t peersCount() const; - - MyViewSubscriptionEnginePtr getMyViewObservable(); - PeerViewSubscriptionEnginePtr getRemoteViewObservable(); + size_t peersCount() const override; + MyViewSubscriptionEnginePtr getMyViewObservable() override; + PeerViewSubscriptionEnginePtr getRemoteViewObservable() override; - void removePeer(const PeerId &peer_id); - void updateRemoteView(const PeerId &peer_id, network::View &&view); - auto &getMyView() const { + void removePeer(const PeerId &peer_id) override; + void updateRemoteView(const PeerId &peer_id, network::View &&view) override; + const View &getMyView() const override { return my_view_; } diff --git a/core/parachain/CMakeLists.txt b/core/parachain/CMakeLists.txt index 0b64f13673..b87d32a87e 100644 --- a/core/parachain/CMakeLists.txt +++ b/core/parachain/CMakeLists.txt @@ -32,6 +32,18 @@ target_link_libraries(backing_implicit_view outcome ) +add_library(parachain_errors + validator/i_parachain_processor.cpp + ) + +target_link_libraries(parachain_errors + fmt::fmt + soralog::soralog + logger + Boost::boost + outcome + ) + add_library(validator_parachain availability/bitfield/signer.cpp availability/bitfield/store_impl.cpp @@ -54,6 +66,7 @@ add_library(validator_parachain backing/store_impl.cpp backing/cluster.cpp validator/statement_distribution/statement_distribution.cpp + validator/parachain_storage.cpp ) target_link_libraries(validator_parachain @@ -68,6 +81,7 @@ target_link_libraries(validator_parachain runtime_common prospective_parachains backing_implicit_view + parachain_errors ) add_library(kagome_pvf_worker diff --git a/core/parachain/approval/approval_distribution.cpp b/core/parachain/approval/approval_distribution.cpp index 13e3622bb1..3c8b68cca1 100644 --- a/core/parachain/approval/approval_distribution.cpp +++ b/core/parachain/approval/approval_distribution.cpp @@ -577,7 +577,7 @@ namespace kagome::parachain { std::shared_ptr keystore, std::shared_ptr hasher, std::shared_ptr peer_view, - std::shared_ptr parachain_processor, + std::shared_ptr parachain_processor, std::shared_ptr crypto_provider, std::shared_ptr pm, std::shared_ptr router, diff --git a/core/parachain/approval/approval_distribution.hpp b/core/parachain/approval/approval_distribution.hpp index 608b9a195c..7c447f14a1 100644 --- a/core/parachain/approval/approval_distribution.hpp +++ b/core/parachain/approval/approval_distribution.hpp @@ -59,7 +59,7 @@ namespace kagome::consensus::babe { namespace kagome::parachain { class ApprovalThreadPool; - class ParachainProcessorImpl; + class ParachainProcessor; class Pvf; } // namespace kagome::parachain @@ -300,7 +300,7 @@ namespace kagome::parachain { std::shared_ptr keystore, std::shared_ptr hasher, std::shared_ptr peer_view, - std::shared_ptr parachain_processor, + std::shared_ptr parachain_processor, std::shared_ptr crypto_provider, std::shared_ptr pm, std::shared_ptr router, @@ -877,7 +877,7 @@ namespace kagome::parachain { StorePair> store_; - std::shared_ptr parachain_processor_; + std::shared_ptr parachain_processor_; std::shared_ptr crypto_provider_; std::shared_ptr pm_; std::shared_ptr router_; diff --git a/core/parachain/availability/bitfield/signer.cpp b/core/parachain/availability/bitfield/signer.cpp index 640556cc5e..92d88a44f1 100644 --- a/core/parachain/availability/bitfield/signer.cpp +++ b/core/parachain/availability/bitfield/signer.cpp @@ -49,7 +49,7 @@ namespace kagome::parachain { broadcast_ = std::move(callback); } - outcome::result BitfieldSigner::sign(const ValidatorSigner &signer, + outcome::result BitfieldSigner::sign(const IValidatorSigner &signer, const Candidates &candidates) { const BlockHash &relay_parent = signer.relayParent(); scale::BitVec bitfield; @@ -70,10 +70,12 @@ namespace kagome::parachain { } outcome::result BitfieldSigner::onBlock(const BlockHash &relay_parent) { - OUTCOME_TRY(signer, signer_factory_->at(relay_parent)); - if (not signer.has_value()) { + OUTCOME_TRY(opt_signer, signer_factory_->at(relay_parent)); + if (!opt_signer) { return outcome::success(); } + auto &signer = *opt_signer; + Candidates candidates; OUTCOME_TRY(cores, parachain_api_->availability_cores(relay_parent)); OUTCOME_TRY( @@ -108,10 +110,10 @@ namespace kagome::parachain { scheduler_->schedule( [weak{weak_from_this()}, - signer{std::move(*signer)}, + signer{std::move(signer)}, candidates{std::move(candidates)}]() mutable { if (auto self = weak.lock()) { - auto r = self->sign(signer, candidates); + auto r = self->sign(*signer, candidates); if (r.has_error()) { SL_WARN(self->logger_, "sign error {}", r.error()); } diff --git a/core/parachain/availability/bitfield/signer.hpp b/core/parachain/availability/bitfield/signer.hpp index 32d871771d..214adca9db 100644 --- a/core/parachain/availability/bitfield/signer.hpp +++ b/core/parachain/availability/bitfield/signer.hpp @@ -18,13 +18,26 @@ #include "runtime/runtime_api/parachain_host.hpp" namespace kagome::parachain { - /// Signs, stores and broadcasts bitfield for every new head. - class BitfieldSigner : public std::enable_shared_from_this { + class IBitfieldSigner { public: using BroadcastCallback = std::function; using Candidates = std::vector>; + virtual ~IBitfieldSigner() = default; + virtual void start() = 0; + + /// Sign bitfield for given block. + virtual outcome::result sign(const IValidatorSigner &signer, + const Candidates &candidates) = 0; + + virtual void setBroadcastCallback(BroadcastCallback &&callback) = 0; + }; + + /// Signs, stores and broadcasts bitfield for every new head. + class BitfieldSigner : public IBitfieldSigner, + public std::enable_shared_from_this { + public: BitfieldSigner( std::shared_ptr hasher, std::shared_ptr signer_factory, @@ -36,13 +49,12 @@ namespace kagome::parachain { primitives::events::ChainSubscriptionEnginePtr chain_sub_engine); /// Subscribes to new heads. - void start(); + void start() override; - /// Sign bitfield for given block. - outcome::result sign(const ValidatorSigner &signer, - const Candidates &candidates); + outcome::result sign(const IValidatorSigner &signer, + const Candidates &candidates) override; - void setBroadcastCallback(BroadcastCallback &&callback); + void setBroadcastCallback(BroadcastCallback &&callback) override; private: using BlockHash = primitives::BlockHash; diff --git a/core/parachain/pvf/precheck.cpp b/core/parachain/pvf/precheck.cpp index 35bf99ac1d..af1d4d586a 100644 --- a/core/parachain/pvf/precheck.cpp +++ b/core/parachain/pvf/precheck.cpp @@ -69,10 +69,12 @@ namespace kagome::parachain { outcome::result PvfPrecheck::onBlock() { auto block = block_tree_->bestBlock(); - OUTCOME_TRY(signer, signer_factory_->at(block.hash)); - if (not signer.has_value()) { + OUTCOME_TRY(opt_signer, signer_factory_->at(block.hash)); + if (!opt_signer) { return outcome::success(); } + auto &signer = *opt_signer; + if (not session_code_accept_.empty() and signer->getSessionIndex() < session_code_accept_.begin()->first) { SL_WARN(logger_, "past session"); @@ -100,7 +102,8 @@ namespace kagome::parachain { auto &code_zstd = *code_zstd_res.value(); auto res = [&]() -> outcome::result { OUTCOME_TRY(config, sessionParams(*parachain_api_, block.hash)); - OUTCOME_TRY(pvf_pool_->precompile(code_hash, code_zstd, config.context_params)); + OUTCOME_TRY(pvf_pool_->precompile( + code_hash, code_zstd, config.context_params)); return outcome::success(); }(); if (res) { diff --git a/core/parachain/pvf/precheck.hpp b/core/parachain/pvf/precheck.hpp index 97abcad342..ff53769c6f 100644 --- a/core/parachain/pvf/precheck.hpp +++ b/core/parachain/pvf/precheck.hpp @@ -39,13 +39,22 @@ namespace kagome::runtime { namespace kagome::parachain { class PvfPool; - /// Signs pvf check statement for every new head. - class PvfPrecheck : public std::enable_shared_from_this { + class IPvfPrecheck { public: using BroadcastCallback = std::function; using Candidates = std::vector>; + virtual ~IPvfPrecheck() = default; + + /// Subscribes to new heads. + virtual void start() = 0; + }; + + /// Signs pvf check statement for every new head. + class PvfPrecheck : public IPvfPrecheck, + public std::enable_shared_from_this { + public: PvfPrecheck( std::shared_ptr hasher, std::shared_ptr block_tree, @@ -60,7 +69,7 @@ namespace kagome::parachain { primitives::events::ChainSubscriptionEnginePtr chain_sub_engine); /// Subscribes to new heads. - void start(); + void start() override; private: using BlockHash = primitives::BlockHash; diff --git a/core/parachain/validator/backing_implicit_view.cpp b/core/parachain/validator/backing_implicit_view.cpp index 7021e6c901..0cffb52a7c 100644 --- a/core/parachain/validator/backing_implicit_view.cpp +++ b/core/parachain/validator/backing_implicit_view.cpp @@ -30,14 +30,16 @@ OUTCOME_CPP_DEFINE_CATEGORY(kagome::parachain, ImplicitView::Error, e) { namespace kagome::parachain { ImplicitView::ImplicitView( - std::weak_ptr prospective_parachains, + std::weak_ptr prospective_parachains, std::shared_ptr parachain_host_, std::shared_ptr block_tree, - std::optional collating_for_) + std::optional collating_for_, + bool trace_insertions) : parachain_host(std::move(parachain_host_)), collating_for{collating_for_}, prospective_parachains_{std::move(prospective_parachains)}, - block_tree_{std::move(block_tree)} { + block_tree_{std::move(block_tree)}, + trace_insertions_(trace_insertions) { BOOST_ASSERT(!prospective_parachains_.expired()); BOOST_ASSERT(parachain_host); BOOST_ASSERT(block_tree_); @@ -87,6 +89,11 @@ namespace kagome::parachain { ancestors.size()); for (const auto &ancestor : ancestors) { + if (trace_insertions_) { + SL_TRACE(logger, + "activate_leaf_from_prospective_parachains(ancestors): {}", + ancestor.hash); + } block_info_storage.insert_or_assign( ancestor.hash, BlockInfo{ @@ -98,6 +105,12 @@ namespace kagome::parachain { ancestor.hash); } + if (trace_insertions_) { + SL_TRACE(logger, + "activate_leaf_from_prospective_parachains: {} {}", + leaf.hash, + retain_minimum); + } block_info_storage.insert_or_assign( leaf.hash, BlockInfo{ @@ -123,6 +136,9 @@ namespace kagome::parachain { std::vector ImplicitView::deactivate_leaf(const Hash &leaf_hash) { std::vector removed; + if (trace_insertions_) { + SL_TRACE(logger, "deactivate_leaf(leaves): {}", leaf_hash); + } if (leaves.erase(leaf_hash) == 0ull) { return removed; } @@ -133,14 +149,28 @@ namespace kagome::parachain { minimum ? std::min(*minimum, l.retain_minimum) : l.retain_minimum; } + if (trace_insertions_) { + if (minimum) { + SL_TRACE(logger, "deactivate_leaf(--): {}", *minimum); + } else { + SL_TRACE(logger, "deactivate_leaf(--): no"); + } + } + for (auto it = block_info_storage.begin(); it != block_info_storage.end();) { const auto &[hash, i] = *it; const bool keep = minimum && i.block_number >= *minimum; if (keep) { + if (trace_insertions_) { + SL_TRACE(logger, "deactivate_leaf(-): {}", i.block_number); + } ++it; } else { removed.emplace_back(hash); + if (trace_insertions_) { + SL_TRACE(logger, "deactivate_leaf: {}", hash); + } it = block_info_storage.erase(it); } } @@ -157,6 +187,13 @@ namespace kagome::parachain { fetched.minimum_ancestor_number, math::sat_sub_unsigned(fetched.leaf_number, MINIMUM_RETAIN_LENGTH)); + if (trace_insertions_) { + SL_TRACE( + logger, + "activate_leaf(-): {} {}", + fetched.minimum_ancestor_number, + math::sat_sub_unsigned(fetched.leaf_number, MINIMUM_RETAIN_LENGTH)); + } leaves.insert_or_assign( leaf_hash, ActiveLeafPruningInfo{.retain_minimum = retain_minimum}); return outcome::success(); @@ -232,6 +269,12 @@ namespace kagome::parachain { min_min = std::min(x, min_min); } } + if (trace_insertions_) { + SL_TRACE(logger, + "fetch_fresh_leaf_and_insert_ancestry(-): {} {}", + min_min, + leaf_header.number); + } const size_t expected_ancestry_len = math::sat_sub_unsigned(leaf_header.number, min_min) + 1ull; @@ -250,6 +293,11 @@ namespace kagome::parachain { parent_hash = it->second.parent_hash; } else { OUTCOME_TRY(header, block_tree->getBlockHeader(next_ancestor_hash)); + if (trace_insertions_) { + SL_TRACE(logger, + "activate_leaf(next_ancestor_number): {}", + next_ancestor_hash); + } block_info_storage.emplace( next_ancestor_hash, BlockInfo{ @@ -272,6 +320,9 @@ namespace kagome::parachain { ancestry.emplace_back(leaf_hash); } + if (trace_insertions_) { + SL_TRACE(logger, "activate_leaf: {}", leaf_hash); + } block_info_storage.emplace( leaf_hash, BlockInfo{ diff --git a/core/parachain/validator/backing_implicit_view.hpp b/core/parachain/validator/backing_implicit_view.hpp index dbaa04f226..8d1e4c7dd5 100644 --- a/core/parachain/validator/backing_implicit_view.hpp +++ b/core/parachain/validator/backing_implicit_view.hpp @@ -22,7 +22,7 @@ namespace kagome::parachain { - class ProspectiveParachains; + class IProspectiveParachains; // Always aim to retain 1 block before the active leaves. constexpr BlockNumber MINIMUM_RETAIN_LENGTH = 2ull; @@ -124,10 +124,11 @@ namespace kagome::parachain { block_info_storage.size()); } - ImplicitView(std::weak_ptr prospective_parachains, + ImplicitView(std::weak_ptr prospective_parachains, std::shared_ptr parachain_host_, std::shared_ptr block_tree, - std::optional collating_for_); + std::optional collating_for_, + bool trace_insertions = false); private: struct ActiveLeafPruningInfo { @@ -161,8 +162,9 @@ namespace kagome::parachain { std::shared_ptr parachain_host; std::optional collating_for; - std::weak_ptr prospective_parachains_; + std::weak_ptr prospective_parachains_; std::shared_ptr block_tree_; + bool trace_insertions_; log::Logger logger = log::createLogger("BackingImplicitView", "parachain"); }; diff --git a/core/parachain/validator/i_parachain_processor.cpp b/core/parachain/validator/i_parachain_processor.cpp new file mode 100644 index 0000000000..1969207634 --- /dev/null +++ b/core/parachain/validator/i_parachain_processor.cpp @@ -0,0 +1,76 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "parachain/validator/i_parachain_processor.hpp" + +OUTCOME_CPP_DEFINE_CATEGORY(kagome::parachain, ParachainProcessor::Error, e) { + using E = kagome::parachain::ParachainProcessor::Error; + switch (e) { + case E::RESPONSE_ALREADY_RECEIVED: + return "Response already present"; + case E::REJECTED_BY_PROSPECTIVE_PARACHAINS: + return "Rejected by prospective parachains"; + case E::COLLATION_NOT_FOUND: + return "Collation not found"; + case E::UNDECLARED_COLLATOR: + return "Undeclared collator"; + case E::KEY_NOT_PRESENT: + return "Private key is not present"; + case E::VALIDATION_FAILED: + return "Validate and make available failed"; + case E::VALIDATION_SKIPPED: + return "Validate and make available skipped"; + case E::OUT_OF_VIEW: + return "Out of view"; + case E::CORE_INDEX_UNAVAILABLE: + return "Core index unavailable"; + case E::DUPLICATE: + return "Duplicate"; + case E::NO_INSTANCE: + return "No self instance"; + case E::NOT_A_VALIDATOR: + return "Node is not a validator"; + case E::NOT_SYNCHRONIZED: + return "Node not synchronized"; + case E::PEER_LIMIT_REACHED: + return "Peer limit reached"; + case E::PROTOCOL_MISMATCH: + return "Protocol mismatch"; + case E::NOT_CONFIRMED: + return "Candidate not confirmed"; + case E::NO_STATE: + return "No parachain state"; + case E::NO_SESSION_INFO: + return "No session info"; + case E::OUT_OF_BOUND: + return "Index out of bound"; + case E::INCORRECT_BITFIELD_SIZE: + return "Incorrect bitfield size"; + case E::INCORRECT_SIGNATURE: + return "Incorrect signature"; + case E::CLUSTER_TRACKER_ERROR: + return "Cluster tracker error"; + case E::PERSISTED_VALIDATION_DATA_NOT_FOUND: + return "Persisted validation data not found"; + case E::PERSISTED_VALIDATION_DATA_MISMATCH: + return "Persisted validation data mismatch"; + case E::CANDIDATE_HASH_MISMATCH: + return "Candidate hash mismatch"; + case E::PARENT_HEAD_DATA_MISMATCH: + return "Parent head data mismatch"; + case E::NO_PEER: + return "No peer"; + case E::ALREADY_REQUESTED: + return "Already requested"; + case E::NOT_ADVERTISED: + return "Not advertised"; + case E::WRONG_PARA: + return "Wrong para id"; + case E::THRESHOLD_LIMIT_REACHED: + return "Threshold reached"; + } + return "Unknown parachain processor error"; +} diff --git a/core/parachain/validator/i_parachain_processor.hpp b/core/parachain/validator/i_parachain_processor.hpp new file mode 100644 index 0000000000..535d8f50aa --- /dev/null +++ b/core/parachain/validator/i_parachain_processor.hpp @@ -0,0 +1,97 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include "network/types/collator_messages_vstaging.hpp" +#include "outcome/outcome.hpp" +#include "parachain/validator/statement_distribution/types.hpp" + +namespace kagome::parachain { + + class ParachainStorage { + public: + virtual ~ParachainStorage() = default; + + virtual outcome::result OnFetchChunkRequest( + const network::FetchChunkRequest &request) = 0; + + virtual outcome::result + OnFetchChunkRequestObsolete(const network::FetchChunkRequest &request) = 0; + + /** + * @brief Fetches the Proof of Validity (PoV) for a given candidate. + * + * @param candidate_hash The hash of the candidate for which the PoV is to + * be fetched. + * @return network::ResponsePov The PoV associated with the given candidate + * hash. + */ + virtual network::ResponsePov getPov(CandidateHash &&candidate_hash) = 0; + }; + + class ParachainProcessor { + public: + enum class Error { + RESPONSE_ALREADY_RECEIVED = 1, + COLLATION_NOT_FOUND, + KEY_NOT_PRESENT, + VALIDATION_FAILED, + VALIDATION_SKIPPED, + OUT_OF_VIEW, + DUPLICATE, + NO_INSTANCE, + NOT_A_VALIDATOR, + NOT_SYNCHRONIZED, + UNDECLARED_COLLATOR, + PEER_LIMIT_REACHED, + PROTOCOL_MISMATCH, + NOT_CONFIRMED, + NO_STATE, + NO_SESSION_INFO, + OUT_OF_BOUND, + REJECTED_BY_PROSPECTIVE_PARACHAINS, + INCORRECT_BITFIELD_SIZE, + CORE_INDEX_UNAVAILABLE, + INCORRECT_SIGNATURE, + CLUSTER_TRACKER_ERROR, + PERSISTED_VALIDATION_DATA_NOT_FOUND, + PERSISTED_VALIDATION_DATA_MISMATCH, + CANDIDATE_HASH_MISMATCH, + PARENT_HEAD_DATA_MISMATCH, + NO_PEER, + ALREADY_REQUESTED, + NOT_ADVERTISED, + WRONG_PARA, + THRESHOLD_LIMIT_REACHED, + }; + + virtual ~ParachainProcessor() = default; + + virtual void onValidationProtocolMsg( + const libp2p::peer::PeerId &peer_id, + const network::VersionedValidatorProtocolMessage &message) = 0; + + virtual void handle_advertisement( + const RelayHash &relay_parent, + const libp2p::peer::PeerId &peer_id, + std::optional> + &&prospective_candidate) = 0; + + virtual void onIncomingCollator(const libp2p::peer::PeerId &peer_id, + network::CollatorPublicKey pubkey, + network::ParachainId para_id) = 0; + + virtual outcome::result canProcessParachains() const = 0; + + virtual void handleStatement( + const primitives::BlockHash &relay_parent, + const SignedFullStatementWithPVD &statement) = 0; + }; + +} // namespace kagome::parachain + +OUTCOME_HPP_DECLARE_ERROR(kagome::parachain, ParachainProcessor::Error); diff --git a/core/parachain/validator/impl/parachain_observer_impl.cpp b/core/parachain/validator/impl/parachain_observer_impl.cpp index e28e566543..641d2ab886 100644 --- a/core/parachain/validator/impl/parachain_observer_impl.cpp +++ b/core/parachain/validator/impl/parachain_observer_impl.cpp @@ -20,12 +20,14 @@ namespace kagome::parachain { ParachainObserverImpl::ParachainObserverImpl( std::shared_ptr pm, std::shared_ptr crypto_provider, - std::shared_ptr processor, + std::shared_ptr processor, + std::shared_ptr parachain_storage, std::shared_ptr peer_view, std::shared_ptr approval_distribution) : pm_{std::move(pm)}, crypto_provider_{std::move(crypto_provider)}, processor_{std::move(processor)}, + parachain_storage_(std::move(parachain_storage)), peer_view_(std::move(peer_view)), approval_distribution_(std::move(approval_distribution)), logger_(log::createLogger("ParachainObserver", "parachain")) { @@ -34,6 +36,7 @@ namespace kagome::parachain { BOOST_ASSERT_MSG(processor_, "Parachain processor must be initialized!"); BOOST_ASSERT(peer_view_); BOOST_ASSERT(approval_distribution_); + BOOST_ASSERT(parachain_storage_); } void ParachainObserverImpl::onIncomingMessage( @@ -106,7 +109,7 @@ namespace kagome::parachain { outcome::result ParachainObserverImpl::OnPovRequest( network::RequestPov request) { // NOLINTNEXTLINE(hicpp-move-const-arg,performance-move-const-arg) - return processor_->getPov(std::move(request)); + return parachain_storage_->getPov(std::move(request)); } outcome::result diff --git a/core/parachain/validator/impl/parachain_observer_impl.hpp b/core/parachain/validator/impl/parachain_observer_impl.hpp index b708921905..44fd1125e7 100644 --- a/core/parachain/validator/impl/parachain_observer_impl.hpp +++ b/core/parachain/validator/impl/parachain_observer_impl.hpp @@ -21,7 +21,8 @@ namespace kagome::crypto { } namespace kagome::parachain { - class ParachainProcessorImpl; + class ParachainProcessor; + class ParachainStorage; class ApprovalDistribution; } // namespace kagome::parachain @@ -32,7 +33,8 @@ namespace kagome::parachain { ParachainObserverImpl( std::shared_ptr pm, std::shared_ptr crypto_provider, - std::shared_ptr processor, + std::shared_ptr processor, + std::shared_ptr parachain_storage, std::shared_ptr peer_view, std::shared_ptr approval_distribution); @@ -71,7 +73,8 @@ namespace kagome::parachain { std::shared_ptr pm_; std::shared_ptr crypto_provider_; - std::shared_ptr processor_; + std::shared_ptr processor_; + std::shared_ptr parachain_storage_; std::shared_ptr peer_view_; std::shared_ptr approval_distribution_; diff --git a/core/parachain/validator/impl/parachain_processor.cpp b/core/parachain/validator/impl/parachain_processor.cpp index 7e6901e953..c0fbae7569 100644 --- a/core/parachain/validator/impl/parachain_processor.cpp +++ b/core/parachain/validator/impl/parachain_processor.cpp @@ -53,116 +53,183 @@ } #endif // CHECK_OR_RET -OUTCOME_CPP_DEFINE_CATEGORY(kagome::parachain, - ParachainProcessorImpl::Error, - e) { - using E = kagome::parachain::ParachainProcessorImpl::Error; - switch (e) { - case E::RESPONSE_ALREADY_RECEIVED: - return "Response already present"; - case E::REJECTED_BY_PROSPECTIVE_PARACHAINS: - return "Rejected by prospective parachains"; - case E::COLLATION_NOT_FOUND: - return "Collation not found"; - case E::UNDECLARED_COLLATOR: - return "Undeclared collator"; - case E::KEY_NOT_PRESENT: - return "Private key is not present"; - case E::VALIDATION_FAILED: - return "Validate and make available failed"; - case E::VALIDATION_SKIPPED: - return "Validate and make available skipped"; - case E::OUT_OF_VIEW: - return "Out of view"; - case E::CORE_INDEX_UNAVAILABLE: - return "Core index unavailable"; - case E::DUPLICATE: - return "Duplicate"; - case E::NO_INSTANCE: - return "No self instance"; - case E::NOT_A_VALIDATOR: - return "Node is not a validator"; - case E::NOT_SYNCHRONIZED: - return "Node not synchronized"; - case E::PEER_LIMIT_REACHED: - return "Peer limit reached"; - case E::PROTOCOL_MISMATCH: - return "Protocol mismatch"; - case E::NOT_CONFIRMED: - return "Candidate not confirmed"; - case E::NO_STATE: - return "No parachain state"; - case E::NO_SESSION_INFO: - return "No session info"; - case E::OUT_OF_BOUND: - return "Index out of bound"; - case E::INCORRECT_BITFIELD_SIZE: - return "Incorrect bitfield size"; - case E::INCORRECT_SIGNATURE: - return "Incorrect signature"; - case E::CLUSTER_TRACKER_ERROR: - return "Cluster tracker error"; - case E::PERSISTED_VALIDATION_DATA_NOT_FOUND: - return "Persisted validation data not found"; - case E::PERSISTED_VALIDATION_DATA_MISMATCH: - return "Persisted validation data mismatch"; - case E::CANDIDATE_HASH_MISMATCH: - return "Candidate hash mismatch"; - case E::PARENT_HEAD_DATA_MISMATCH: - return "Parent head data mismatch"; - case E::NO_PEER: - return "No peer"; - case E::ALREADY_REQUESTED: - return "Already requested"; - case E::NOT_ADVERTISED: - return "Not advertised"; - case E::WRONG_PARA: - return "Wrong para id"; - case E::THRESHOLD_LIMIT_REACHED: - return "Threshold reached"; - } - return "Unknown parachain processor error"; -} - namespace { constexpr const char *kIsParachainValidator = "kagome_node_is_parachain_validator"; } namespace kagome::parachain { + std::vector + ParachainProcessorEmpty::getBackedCandidates(const RelayHash &relay_parent) { + return {}; + } + + void ParachainProcessorEmpty::process_vstaging_statement( + const libp2p::peer::PeerId &peer_id, + const network::vstaging::StatementDistributionMessage &msg) { + SL_TRACE( + logger_, "Incoming `StatementDistributionMessage`. (peer={})", peer_id); + + if (auto inner = + if_type( + msg)) { + status_.exclusiveAccess([](Status &status) { ++status.ack_counter_; }); + statement_distribution_->handle_incoming_acknowledgement(peer_id, + inner->get()); + } else if (auto manifest = + if_type( + msg)) { + status_.exclusiveAccess( + [](Status &status) { ++status.manifest_counter_; }); + statement_distribution_->handle_incoming_manifest(peer_id, + manifest->get()); + } else if (auto stm = + if_type(msg)) { + status_.exclusiveAccess( + [](Status &status) { ++status.statement_counter_; }); + statement_distribution_->handle_incoming_statement(peer_id, stm->get()); + } else { + SL_ERROR(logger_, "Skipped message."); + } + } + + void ParachainProcessorEmpty::process_legacy_statement( + const libp2p::peer::PeerId &peer_id, + const network::StatementDistributionMessage &msg) { + status_.exclusiveAccess( + [](Status &status) { ++status.legacy_statement_counter_; }); + } + + ParachainProcessorEmpty::ParachainProcessorEmpty( + application::AppStateManager &app_state_manager, + std::shared_ptr av_store, + std::shared_ptr + statement_distribution) + : ParachainStorageImpl(std::move(av_store)), + statement_distribution_(statement_distribution) { + app_state_manager.takeControl(*this); + } + + bool ParachainProcessorEmpty::prepare() { + statement_distribution_->store_parachain_processor(weak_from_this()); + std::thread t4([wself{weak_from_this()}]() { + log::Logger logger = + log::createLogger("ParachainProcessorEmpty1", "parachain"); + + SL_TRACE(logger, "enter"); + + auto prev = std::chrono::steady_clock::now(); + while (true) { + std::this_thread::sleep_for(std::chrono::seconds(2)); + if (auto self = wself.lock()) { + const auto now = std::chrono::steady_clock::now(); + const auto diff = + std::chrono::duration_cast(now - prev) + .count(); + if (diff > 0) { + self->status_.exclusiveAccess([&](Status &status) { + SL_TRACE( + logger, + "[STATISTICS]:\nlegacy_statement_counter:{}\nack_counter:{}" + "\nmanifest_counter:{}\nstatement_counter:{}", + status.legacy_statement_counter_, + status.ack_counter_, + status.manifest_counter_, + status.statement_counter_); + + status.legacy_statement_counter_ = 0; + status.ack_counter_ = 0; + status.manifest_counter_ = 0; + status.statement_counter_ = 0; + }); + prev = now; + } + } else { + break; + } + } + + SL_TRACE(logger, "exit"); + }); + t4.detach(); + return true; + } + + void ParachainProcessorEmpty::onValidationProtocolMsg( + const libp2p::peer::PeerId &peer_id, + const network::VersionedValidatorProtocolMessage &message) { + SL_TRACE( + logger_, "Incoming validator protocol message. (peer={})", peer_id); + visit_in_place( + message, + [&](const network::ValidatorProtocolMessage &m) { + SL_TRACE(logger_, "V1"); + visit_in_place( + m, + [&](const network::StatementDistributionMessage &val) { + process_legacy_statement(peer_id, val); + }, + [&](const auto &) {}); + }, + [&](const network::vstaging::ValidatorProtocolMessage &m) { + SL_TRACE(logger_, "V2"); + visit_in_place( + m, + [&](const network::vstaging::StatementDistributionMessage &val) { + process_vstaging_statement(peer_id, val); + }, + [&](const auto &) {}); + }, + [&](const auto &m) { SL_WARN(logger_, "UNSUPPORTED Version"); }); + } + + void ParachainProcessorEmpty::handle_advertisement( + const RelayHash &relay_parent, + const libp2p::peer::PeerId &peer_id, + std::optional> &&prospective_candidate) {} + + void ParachainProcessorEmpty::onIncomingCollator( + const libp2p::peer::PeerId &peer_id, + network::CollatorPublicKey pubkey, + network::ParachainId para_id) {} + + outcome::result ParachainProcessorEmpty::canProcessParachains() const { + return outcome::success(); + } + + void ParachainProcessorEmpty::handleStatement( + const primitives::BlockHash &relay_parent, + const SignedFullStatementWithPVD &statement) {} ParachainProcessorImpl::ParachainProcessorImpl( std::shared_ptr pm, - std::shared_ptr runtime_info, std::shared_ptr crypto_provider, std::shared_ptr router, - common::MainThreadPool &main_thread_pool, std::shared_ptr hasher, - std::shared_ptr peer_view, - common::WorkerThreadPool &worker_thread_pool, - std::shared_ptr bitfield_signer, - std::shared_ptr pvf_precheck, + std::shared_ptr peer_view, + std::shared_ptr bitfield_signer, + std::shared_ptr pvf_precheck, std::shared_ptr bitfield_store, std::shared_ptr backing_store, std::shared_ptr pvf, std::shared_ptr av_store, std::shared_ptr parachain_host, - std::shared_ptr signer_factory, + std::shared_ptr signer_factory, const application::AppConfiguration &app_config, application::AppStateManager &app_state_manager, primitives::events::ChainSubscriptionEnginePtr chain_sub_engine, primitives::events::SyncStateSubscriptionEnginePtr sync_state_observable, std::shared_ptr query_audi, - std::shared_ptr prospective_parachains, + std::shared_ptr prospective_parachains, std::shared_ptr block_tree, LazySPtr slots_util, std::shared_ptr babe_config_repo, - std::shared_ptr sd) - : pm_(std::move(pm)), - runtime_info_(std::move(runtime_info)), + std::shared_ptr sd) + : ParachainStorageImpl(std::move(av_store)), + pm_(std::move(pm)), crypto_provider_(std::move(crypto_provider)), router_(std::move(router)), - main_pool_handler_{main_thread_pool.handler(app_state_manager)}, hasher_(std::move(hasher)), peer_view_(std::move(peer_view)), pvf_(std::move(pvf)), @@ -171,7 +238,6 @@ namespace kagome::parachain { pvf_precheck_(std::move(pvf_precheck)), bitfield_store_(std::move(bitfield_store)), backing_store_(std::move(backing_store)), - av_store_(std::move(av_store)), parachain_host_(std::move(parachain_host)), app_config_(app_config), sync_state_observable_(std::move(sync_state_observable)), @@ -179,7 +245,6 @@ namespace kagome::parachain { slots_util_(slots_util), babe_config_repo_(std::move(babe_config_repo)), chain_sub_{std::move(chain_sub_engine)}, - worker_pool_handler_{worker_thread_pool.handler(app_state_manager)}, prospective_parachains_{std::move(prospective_parachains)}, block_tree_{std::move(block_tree)}, statement_distribution(std::move(sd)), @@ -189,19 +254,16 @@ namespace kagome::parachain { BOOST_ASSERT(crypto_provider_); BOOST_ASSERT(babe_config_repo_); BOOST_ASSERT(router_); - BOOST_ASSERT(main_pool_handler_); BOOST_ASSERT(hasher_); BOOST_ASSERT(bitfield_signer_); BOOST_ASSERT(bitfield_store_); BOOST_ASSERT(backing_store_); BOOST_ASSERT(pvf_); - BOOST_ASSERT(av_store_); BOOST_ASSERT(parachain_host_); BOOST_ASSERT(signer_factory_); BOOST_ASSERT(sync_state_observable_); BOOST_ASSERT(query_audi_); BOOST_ASSERT(prospective_parachains_); - BOOST_ASSERT(worker_pool_handler_); BOOST_ASSERT(block_tree_); BOOST_ASSERT(statement_distribution); app_state_manager.takeControl(*this); @@ -224,7 +286,6 @@ namespace kagome::parachain { void ParachainProcessorImpl::OnBroadcastBitfields( const primitives::BlockHash &relay_parent, const network::SignedBitfield &bitfield) { - REINVOKE(*main_pool_handler_, OnBroadcastBitfields, relay_parent, bitfield); SL_TRACE(logger_, "Distribute bitfield on {}", relay_parent); router_->getValidationProtocol()->write(network::BitfieldDistribution{ .relay_parent = relay_parent, @@ -274,7 +335,7 @@ namespace kagome::parachain { // view. my_view_sub_ = primitives::events::subscribe( peer_view_->getMyViewObservable(), - network::PeerView::EventType::kViewUpdated, + network::IPeerView::EventType::kViewUpdated, [WEAK_SELF](const network::ExView &event) { WEAK_LOCK(self); self->onViewUpdated(event); @@ -284,9 +345,34 @@ namespace kagome::parachain { } void ParachainProcessorImpl::onViewUpdated(const network::ExView &event) { - REINVOKE(*main_pool_handler_, onViewUpdated, event); - CHECK_OR_RET(canProcessParachains().has_value()); const auto &relay_parent = event.new_head.hash(); + existed_leaves_[relay_parent] = event.new_head.number; + for (const auto &l : event.lost) { + existed_leaves_.erase(l); + } + + if ((event.new_head.number % 10) == 0) { + std::map> tmp; + for (const auto &[h, i] : existed_leaves_) { + tmp[i].emplace_back(h); + } + + size_t counter = 0; + for (const auto &[n, hs] : tmp) { + for (const auto &h : hs) { + SL_TRACE(logger_, "[PARACHAIN PROC]: ACTIVE LEAF {} - {}", n, h); + if (++counter >= 10) { + break; + } + } + if (counter >= 10) { + break; + } + } + } + + CHECK_OR_RET(canProcessParachains().has_value()); + SL_TRACE(logger_, "===> ACTIVE LEAF {}", relay_parent); /// init `prospective_parachains` subsystem if (const auto r = @@ -345,6 +431,7 @@ namespace kagome::parachain { mode ? std::move(pruned_h) : std::vector{removed}; for (const auto &removed : pruned) { + std::cout << fmt::format("---> PRUNED {}\n", removed); our_current_state_.state_by_relay_parent.erase(removed); { /// remove cancelations @@ -390,8 +477,6 @@ namespace kagome::parachain { void ParachainProcessorImpl::onDeactivateBlocks( const primitives::events::RemoveAfterFinalizationParams &event) { - REINVOKE(*main_pool_handler_, onDeactivateBlocks, event); - for (const auto &lost : event.removed) { SL_TRACE(logger_, "Remove from storages.(relay parent={}, number={})", @@ -403,7 +488,7 @@ namespace kagome::parachain { } } - outcome::result> + outcome::result>> ParachainProcessorImpl::isParachainValidator( const primitives::BlockHash &relay_parent) const { return signer_factory_->at(relay_parent); @@ -447,6 +532,7 @@ namespace kagome::parachain { * group. Finally, it returns a `RelayParentState` object that contains the * assignment, validator index, required collator, and table context. */ + bool is_parachain_validator = false; ::libp2p::common::FinalAction metric_updater{ [&] { metric_is_parachain_validator_->set(is_parachain_validator); }}; @@ -490,11 +576,11 @@ namespace kagome::parachain { // https://github.com/paritytech/polkadot-sdk/blob/1e3b8e1639c1cf784eabf0a9afcab1f3987e0ca4/polkadot/node/network/collator-protocol/src/validator_side/mod.rs#L487-L495 CoreIndex current_core = 0; if (validator) { - validator_index = validator->validatorIndex(); + validator_index = (*validator)->validatorIndex(); size_t i_group = 0; for (auto &group : validator_groups) { - if (group.contains(validator->validatorIndex())) { + if (group.contains((*validator)->validatorIndex())) { current_core = group_rotation_info.coreForGroup(i_group, cores.size()); break; @@ -609,8 +695,6 @@ namespace kagome::parachain { const primitives::BlockHash &relay_parent, const network::HashedBlockHeader &block_header, const std::vector &lost) { - BOOST_ASSERT(main_pool_handler_->isInCurrentThread()); - using LeafHasProspectiveParachains = std::optional>; LeafHasProspectiveParachains res; @@ -654,6 +738,7 @@ namespace kagome::parachain { ++it; } else { _keeper_.emplace_back(it->second.per_session_state); + std::cout << fmt::format("---> ERASED {}\n", it->first); it = our_current_state_.state_by_relay_parent.erase(it); } } @@ -783,11 +868,6 @@ namespace kagome::parachain { void ParachainProcessorImpl::handle_collation_fetch_response( network::CollationEvent &&collation_event, network::CollationFetchingResponse &&response) { - REINVOKE(*main_pool_handler_, - handle_collation_fetch_response, - std::move(collation_event), - std::move(response)); - const auto &pending_collation = collation_event.pending_collation; SL_TRACE(logger_, "Processing collation from {}, relay parent: {}, para id: {}", @@ -979,7 +1059,6 @@ namespace kagome::parachain { void ParachainProcessorImpl::process_bitfield_distribution( const network::BitfieldDistributionMessage &val) { - BOOST_ASSERT(main_pool_handler_->isInCurrentThread()); auto bd{boost::get(&val)}; BOOST_ASSERT_MSG( bd, "BitfieldDistribution is not present. Check message format."); @@ -1035,7 +1114,6 @@ namespace kagome::parachain { void ParachainProcessorImpl::process_vstaging_statement( const libp2p::peer::PeerId &peer_id, const network::vstaging::StatementDistributionMessage &msg) { - BOOST_ASSERT(main_pool_handler_->isInCurrentThread()); SL_TRACE( logger_, "Incoming `StatementDistributionMessage`. (peer={})", peer_id); @@ -1061,7 +1139,6 @@ namespace kagome::parachain { void ParachainProcessorImpl::process_legacy_statement( const libp2p::peer::PeerId &peer_id, const network::StatementDistributionMessage &msg) { - BOOST_ASSERT(main_pool_handler_->isInCurrentThread()); if (auto statement_msg{boost::get(&msg)}) { CHECK_OR_RET(canProcessParachains().has_value()); if (auto r = isParachainValidator(statement_msg->relay_parent); @@ -1119,8 +1196,6 @@ namespace kagome::parachain { void ParachainProcessorImpl::onValidationProtocolMsg( const libp2p::peer::PeerId &peer_id, const network::VersionedValidatorProtocolMessage &message) { - REINVOKE(*main_pool_handler_, onValidationProtocolMsg, peer_id, message); - SL_TRACE( logger_, "Incoming validator protocol message. (peer={})", peer_id); visit_in_place( @@ -1173,8 +1248,6 @@ namespace kagome::parachain { AttestingData &attesting_data, const runtime::PersistedValidationData &persisted_validation_data, RelayParentState ¶chain_state) { - BOOST_ASSERT(main_pool_handler_->isInCurrentThread()); - const auto candidate_hash{attesting_data.candidate.hash(*hasher_)}; CHECK_OR_RET(!parachain_state.issued_statements.contains(candidate_hash)); @@ -1231,48 +1304,19 @@ namespace kagome::parachain { relay_parent, candidate_hash, peer_id); - self->validateAsync( - candidate, std::move(*p), std::move(pvd), relay_parent); + self->validateAsync(ValidationTaskType::kAttest, + candidate, + std::move(*p), + std::move(pvd), + relay_parent); }); } } - outcome::result - ParachainProcessorImpl::OnFetchChunkRequest( - const network::FetchChunkRequest &request) { - if (auto chunk = - av_store_->getChunk(request.candidate, request.chunk_index)) { - return network::Chunk{ - .data = chunk->chunk, - .chunk_index = request.chunk_index, - .proof = chunk->proof, - }; - } - return network::Empty{}; - } - - outcome::result - ParachainProcessorImpl::OnFetchChunkRequestObsolete( - const network::FetchChunkRequest &request) { - if (auto chunk = - av_store_->getChunk(request.candidate, request.chunk_index)) { - // This check needed because v1 protocol mustn't have chunk mapping - // https://github.com/paritytech/polkadot-sdk/blob/d2fd53645654d3b8e12cbf735b67b93078d70113/polkadot/node/core/av-store/src/lib.rs#L1345 - if (chunk->index == request.chunk_index) { - return network::ChunkObsolete{ - .data = chunk->chunk, - .proof = chunk->proof, - }; - } - } - return network::Empty{}; - } - std::optional< std::reference_wrapper> ParachainProcessorImpl::tryGetStateByRelayParent( const primitives::BlockHash &relay_parent) { - BOOST_ASSERT(main_pool_handler_->isInCurrentThread()); const auto it = our_current_state_.state_by_relay_parent.find(relay_parent); if (it != our_current_state_.state_by_relay_parent.end()) { return it->second; @@ -1293,7 +1337,6 @@ namespace kagome::parachain { ParachainProcessorImpl::RelayParentState & ParachainProcessorImpl::storeStateByRelayParent( const primitives::BlockHash &relay_parent, RelayParentState &&val) { - BOOST_ASSERT(main_pool_handler_->isInCurrentThread()); const auto &[it, inserted] = our_current_state_.state_by_relay_parent.insert( {relay_parent, std::move(val)}); @@ -1304,7 +1347,6 @@ namespace kagome::parachain { void ParachainProcessorImpl::handleStatement( const primitives::BlockHash &relay_parent, const SignedFullStatementWithPVD &statement) { - REINVOKE(*main_pool_handler_, handleStatement, relay_parent, statement); TRY_GET_OR_RET(opt_parachain_state, tryGetStateByRelayParent(relay_parent)); auto ¶chain_state = opt_parachain_state->get(); @@ -1369,9 +1411,10 @@ namespace kagome::parachain { return std::nullopt; } - const auto our_index = utils::map( - table_context.validator, - [](const auto &signer) { return signer.validatorIndex(); }); + const auto our_index = + utils::map(table_context.validator, [](const auto &signer) { + return signer->validatorIndex(); + }); if (our_index && *our_index == statement.payload.ix) { return std::nullopt; } @@ -1421,7 +1464,6 @@ namespace kagome::parachain { outcome::result ParachainProcessorImpl::get_block_number_under_construction( const RelayHash &relay_parent) const { - BOOST_ASSERT(main_pool_handler_->isInCurrentThread()); OUTCOME_TRY(header, block_tree_->tryGetBlockHeader(relay_parent)); if (not header) { return 0; @@ -1459,7 +1501,6 @@ namespace kagome::parachain { std::vector ParachainProcessorImpl::getBackedCandidates(const RelayHash &relay_parent) { - BOOST_ASSERT(main_pool_handler_->isInCurrentThread()); SL_TRACE(logger_, "Get backed candidates. (relay_parent={})", relay_parent); auto relay_parent_state_opt = tryGetStateByRelayParent(relay_parent); @@ -2094,8 +2135,8 @@ namespace kagome::parachain { T &&payload, RelayParentState ¶chain_state) { /// TODO(iceseer): /// https://github.com/paritytech/polkadot/blob/master/primitives/src/v2/mod.rs#L1535-L1545 - auto sign_result = - parachain_state.table_context.validator->sign(std::forward(payload)); + auto sign_result = (*parachain_state.table_context.validator) + ->sign(std::forward(payload)); if (sign_result.has_error()) { logger_->error( "Unable to sign Commited Candidate Receipt. Failed with error: {}", @@ -2106,14 +2147,6 @@ namespace kagome::parachain { return sign_result.value(); } - network::ResponsePov ParachainProcessorImpl::getPov( - CandidateHash &&candidate_hash) { - if (auto res = av_store_->getPov(candidate_hash)) { - return network::ResponsePov{*res}; - } - return network::Empty{}; - } - void ParachainProcessorImpl::onIncomingCollator( const libp2p::peer::PeerId &peer_id, network::CollatorPublicKey pubkey, @@ -2160,13 +2193,9 @@ namespace kagome::parachain { }); } - template void ParachainProcessorImpl::notifyInvalid( const primitives::BlockHash &parent, const network::CandidateReceipt &candidate_receipt) { - REINVOKE_ONCE( - *main_pool_handler_, notifyInvalid, parent, candidate_receipt); - our_current_state_.validator_side.blocked_from_seconding.erase( BlockedCollationId(candidate_receipt.descriptor.para_id, candidate_receipt.descriptor.para_head_hash)); @@ -2199,12 +2228,9 @@ namespace kagome::parachain { dequeue_next_collation_and_fetch(parent, {id, candidate_hash}); } - template void ParachainProcessorImpl::notifySeconded( const primitives::BlockHash &parent, const SignedFullStatementWithPVD &statement) { - REINVOKE_ONCE(*main_pool_handler_, notifySeconded, parent, statement); - auto seconded = if_type(getPayload(statement)); if (!seconded) { @@ -2378,15 +2404,10 @@ namespace kagome::parachain { logger_->trace("Put chunks set.(candidate={})", candidate_hash); } - template void ParachainProcessorImpl::makeAvailable( + ValidationTaskType kMode, const primitives::BlockHash &candidate_hash, ValidateAndSecondResult &&validate_and_second_result) { - REINVOKE(*main_pool_handler_, - makeAvailable, - candidate_hash, - std::move(validate_and_second_result)); - TRY_GET_OR_RET( parachain_state, tryGetStateByRelayParent(validate_and_second_result.relay_parent)); @@ -2397,31 +2418,24 @@ namespace kagome::parachain { parachain_state->get().awaiting_validation.erase(candidate_hash); auto q{std::move(validate_and_second_result)}; - if constexpr (kMode == ValidationTaskType::kSecond) { + if (kMode == ValidationTaskType::kSecond) { onValidationComplete(q); } else { onAttestComplete(q); } } - template void ParachainProcessorImpl::validateAsync( - network::CandidateReceipt candidate, - network::ParachainBlock &&pov, - runtime::PersistedValidationData &&pvd, - const primitives::BlockHash &_relay_parent) { - REINVOKE(*main_pool_handler_, - validateAsync, - candidate, - std::move(pov), - std::move(pvd), - _relay_parent); - const auto relay_parent = candidate.descriptor.relay_parent; - + ValidationTaskType kMode, + const network::CandidateReceipt &candidate, + const network::ParachainBlock &pov, + const runtime::PersistedValidationData &pvd, + const primitives::BlockHash &) { + const auto relay_parent = candidate.descriptor.relay_parent; TRY_GET_OR_RET(parachain_state, tryGetStateByRelayParent(candidate.descriptor.relay_parent)); const auto candidate_hash{candidate.hash(*hasher_)}; - if constexpr (kMode == ValidationTaskType::kAttest) { + if (kMode == ValidationTaskType::kAttest) { CHECK_OR_RET( !parachain_state->get().issued_statements.contains(candidate_hash)); } @@ -2439,74 +2453,85 @@ namespace kagome::parachain { /// TODO(iceseer): do https://github.com/qdrvm/kagome/issues/1888 /// checks if we still need to execute parachain task auto _measure = std::make_shared("Parachain validation", logger_); - auto cb = [weak_self{weak_from_this()}, - candidate, - pov, - pvd, - relay_parent, - n_validators{ - parachain_state->get().table_context.validators.size()}, - _measure, - candidate_hash]( - outcome::result validation_result) mutable { - TRY_GET_OR_RET(self, weak_self.lock()); - if (!validation_result) { - SL_WARN(self->logger_, - "Candidate {} on relay_parent {}, para_id {} validation failed " - "with " - "error: {}", - candidate_hash, - candidate.descriptor.relay_parent, - candidate.descriptor.para_id, - validation_result.error()); - return; - } + pvf_->pvf( + candidate, + pov, + pvd, + [weak_self{weak_from_this()}, + kMode, + candidate, + pov, + pvd, + relay_parent, + n_validators{parachain_state->get().table_context.validators.size()}, + _measure, + candidate_hash](outcome::result r) mutable { + TRY_GET_OR_RET(self, weak_self.lock()); + self->on_pvf_result_received(kMode, + n_validators, + candidate, + pov, + pvd, + relay_parent, + candidate_hash, + r); + }); + } - auto &[comms, data] = validation_result.value(); - runtime::AvailableData available_data{ - .pov = std::move(pov), - .validation_data = std::move(data), - }; + void ParachainProcessorImpl::on_pvf_result_received( + ValidationTaskType kMode, + size_t n_validators, + const network::CandidateReceipt &candidate, + const network::ParachainBlock &pov, + const runtime::PersistedValidationData &pvd, + const primitives::BlockHash &relay_parent, + const Hash &candidate_hash, + const outcome::result &validation_result) { + if (!validation_result) { + SL_WARN(logger_, + "Candidate {} on relay_parent {}, para_id {} validation failed " + "with " + "error: {}", + candidate_hash, + candidate.descriptor.relay_parent, + candidate.descriptor.para_id, + validation_result.error()); + return; + } - auto chunks_res = - self->validateErasureCoding(available_data, n_validators); - if (chunks_res.has_error()) { - SL_WARN(self->logger_, - "Erasure coding validation failed. (error={})", - chunks_res.error()); - return; - } - auto &chunks = chunks_res.value(); + const auto &[comms, data] = validation_result.value(); + runtime::AvailableData available_data{ + .pov = pov, + .validation_data = data, + }; - self->notifyAvailableData(std::move(chunks), - relay_parent, - candidate_hash, - available_data.pov, - available_data.validation_data); + auto chunks_res = validateErasureCoding(available_data, n_validators); + if (chunks_res.has_error()) { + SL_WARN(logger_, + "Erasure coding validation failed. (error={})", + chunks_res.error()); + return; + } + auto &chunks = chunks_res.value(); - self->makeAvailable( - candidate_hash, - ValidateAndSecondResult{ - .result = outcome::success(), - .relay_parent = relay_parent, - .commitments = std::make_shared( - std::move(comms)), - .candidate = candidate, - .pov = std::move(available_data.pov), - .pvd = std::move(pvd), - }); - }; - pvf_->pvf(candidate, - pov, - pvd, - [weak_self{weak_from_this()}, - cb{std::move(cb)}](outcome::result r) mutable { - TRY_GET_OR_RET(self, weak_self.lock()); - post(*self->main_pool_handler_, - [cb{std::move(cb)}, r{std::move(r)}]() mutable { - cb(std::move(r)); - }); - }); + notifyAvailableData(std::move(chunks), + relay_parent, + candidate_hash, + available_data.pov, + available_data.validation_data); + + makeAvailable( + kMode, + candidate_hash, + ValidateAndSecondResult{ + .result = outcome::success(), + .relay_parent = relay_parent, + .commitments = std::make_shared( + std::move(comms)), + .candidate = candidate, + .pov = std::move(available_data.pov), + .pvd = std::move(pvd), + }); } void ParachainProcessorImpl::onAttestComplete( @@ -2678,8 +2703,6 @@ namespace kagome::parachain { outcome::result ParachainProcessorImpl::kick_off_seconding( network::PendingCollationFetch &&pending_collation_fetch) { - BOOST_ASSERT(main_pool_handler_->isInCurrentThread()); - auto &collation_event = pending_collation_fetch.collation_event; auto pending_collation = collation_event.pending_collation; auto relay_parent = pending_collation.relay_parent; @@ -2783,11 +2806,11 @@ namespace kagome::parachain { std::reference_wrapper>>{})); collations.status = CollationStatus::WaitingOnValidation; - validateAsync( - pending_collation_fetch.candidate_receipt, - std::move(pending_collation_fetch.pov), - std::move(pvd->get()), - relay_parent); + validateAsync(ValidationTaskType::kSecond, + pending_collation_fetch.candidate_receipt, + std::move(pending_collation_fetch.pov), + std::move(pvd->get()), + relay_parent); our_current_state_.validator_side.fetched_candidates.emplace( fetched_collation, collation_event); @@ -2921,12 +2944,6 @@ namespace kagome::parachain { const RelayHash &relay_parent, const libp2p::peer::PeerId &peer_id, std::optional> &&prospective_candidate) { - REINVOKE(*main_pool_handler_, - handle_advertisement, - relay_parent, - peer_id, - std::move(prospective_candidate)); - TRY_GET_OR_RET(opt_per_relay_parent, tryGetStateByRelayParent(relay_parent)); auto &per_relay_parent = opt_per_relay_parent->get(); @@ -3017,7 +3034,6 @@ namespace kagome::parachain { const libp2p::peer::PeerId &peer_id, const CollatorId &collator_id, std::optional> &&prospective_candidate) { - BOOST_ASSERT(main_pool_handler_->isInCurrentThread()); SL_TRACE(logger_, "Received advertise collation. (peer id={}, para id={}, relay " "parent={})", diff --git a/core/parachain/validator/parachain_processor.hpp b/core/parachain/validator/parachain_processor.hpp index 3a15e5ba24..6698c8698b 100644 --- a/core/parachain/validator/parachain_processor.hpp +++ b/core/parachain/validator/parachain_processor.hpp @@ -22,14 +22,13 @@ #include "consensus/timeline/slots_util.hpp" #include "crypto/hasher.hpp" #include "metrics/metrics.hpp" +#include "network/i_peer_view.hpp" #include "network/peer_manager.hpp" -#include "network/peer_view.hpp" #include "network/protocols/req_collation_protocol.hpp" #include "network/router.hpp" #include "network/types/collator_messages_vstaging.hpp" #include "outcome/outcome.hpp" #include "parachain/availability/bitfield/signer.hpp" -#include "parachain/availability/store/store.hpp" #include "parachain/backing/cluster.hpp" #include "parachain/backing/store.hpp" #include "parachain/pvf/precheck.hpp" @@ -37,9 +36,11 @@ #include "parachain/validator/backed_candidates_source.hpp" #include "parachain/validator/backing_implicit_view.hpp" #include "parachain/validator/collations.hpp" +#include "parachain/validator/i_parachain_processor.hpp" +#include "parachain/validator/parachain_storage.hpp" #include "parachain/validator/prospective_parachains/prospective_parachains.hpp" #include "parachain/validator/signer.hpp" -#include "parachain/validator/statement_distribution/statement_distribution.hpp" +#include "parachain/validator/statement_distribution/i_statement_distribution.hpp" #include "parachain/validator/statement_distribution/types.hpp" #include "primitives/common.hpp" #include "primitives/event_types.hpp" @@ -117,45 +118,70 @@ struct std::hash { }; namespace kagome::parachain { - class ParachainProcessorImpl - : public std::enable_shared_from_this, - public BackedCandidatesSource { - public: - enum class Error { - RESPONSE_ALREADY_RECEIVED = 1, - COLLATION_NOT_FOUND, - KEY_NOT_PRESENT, - VALIDATION_FAILED, - VALIDATION_SKIPPED, - OUT_OF_VIEW, - DUPLICATE, - NO_INSTANCE, - NOT_A_VALIDATOR, - NOT_SYNCHRONIZED, - UNDECLARED_COLLATOR, - PEER_LIMIT_REACHED, - PROTOCOL_MISMATCH, - NOT_CONFIRMED, - NO_STATE, - NO_SESSION_INFO, - OUT_OF_BOUND, - REJECTED_BY_PROSPECTIVE_PARACHAINS, - INCORRECT_BITFIELD_SIZE, - CORE_INDEX_UNAVAILABLE, - INCORRECT_SIGNATURE, - CLUSTER_TRACKER_ERROR, - PERSISTED_VALIDATION_DATA_NOT_FOUND, - PERSISTED_VALIDATION_DATA_MISMATCH, - CANDIDATE_HASH_MISMATCH, - PARENT_HEAD_DATA_MISMATCH, - NO_PEER, - ALREADY_REQUESTED, - NOT_ADVERTISED, - WRONG_PARA, - THRESHOLD_LIMIT_REACHED, + + class ParachainProcessorEmpty + : public ParachainStorageImpl, + public std::enable_shared_from_this, + public BackedCandidatesSource, + public ParachainProcessor { + struct Status { + size_t legacy_statement_counter_ = 0; + size_t ack_counter_ = 0; + size_t manifest_counter_ = 0; + size_t statement_counter_ = 0; }; - static constexpr uint64_t kBackgroundWorkers = 5; + SafeObject status_; + log::Logger logger_ = + log::createLogger("ParachainProcessorEmpty", "parachain"); + std::shared_ptr + statement_distribution_; + + void process_vstaging_statement( + const libp2p::peer::PeerId &peer_id, + const network::vstaging::StatementDistributionMessage &msg); + + void process_legacy_statement( + const libp2p::peer::PeerId &peer_id, + const network::StatementDistributionMessage &msg); + + public: + ParachainProcessorEmpty( + application::AppStateManager &app_state_manager, + std::shared_ptr av_store, + std::shared_ptr + statement_distribution); + + bool prepare(); + + void onValidationProtocolMsg( + const libp2p::peer::PeerId &peer_id, + const network::VersionedValidatorProtocolMessage &message) override; + + void handle_advertisement(const RelayHash &relay_parent, + const libp2p::peer::PeerId &peer_id, + std::optional> + &&prospective_candidate) override; + + void onIncomingCollator(const libp2p::peer::PeerId &peer_id, + network::CollatorPublicKey pubkey, + network::ParachainId para_id) override; + + outcome::result canProcessParachains() const override; + + void handleStatement(const primitives::BlockHash &relay_parent, + const SignedFullStatementWithPVD &statement) override; + + std::vector getBackedCandidates( + const RelayHash &relay_parent) override; + }; + + class ParachainProcessorImpl + : public ParachainStorageImpl, + public std::enable_shared_from_this, + public BackedCandidatesSource, + public ParachainProcessor { + public: struct ImportStatementSummary { BackingStore::ImportResult imported; /// Attested more than threshold @@ -164,34 +190,41 @@ namespace kagome::parachain { enum struct ValidationTaskType { kSecond, kAttest }; + using Commitments = std::shared_ptr; + struct ValidateAndSecondResult { + outcome::result result; + primitives::BlockHash relay_parent; + Commitments commitments; + network::CandidateReceipt candidate; + network::ParachainBlock pov; + runtime::PersistedValidationData pvd; + }; + ParachainProcessorImpl( std::shared_ptr pm, - std::shared_ptr runtime_info, std::shared_ptr crypto_provider, std::shared_ptr router, - common::MainThreadPool &main_thread_pool, std::shared_ptr hasher, - std::shared_ptr peer_view, - common::WorkerThreadPool &worker_thread_pool, - std::shared_ptr bitfield_signer, - std::shared_ptr pvf_precheck, + std::shared_ptr peer_view, + std::shared_ptr bitfield_signer, + std::shared_ptr pvf_precheck, std::shared_ptr bitfield_store, std::shared_ptr backing_store, std::shared_ptr pvf, std::shared_ptr av_store, std::shared_ptr parachain_host, - std::shared_ptr signer_factory, + std::shared_ptr signer_factory, const application::AppConfiguration &app_config, application::AppStateManager &app_state_manager, primitives::events::ChainSubscriptionEnginePtr chain_sub_engine, primitives::events::SyncStateSubscriptionEnginePtr sync_state_observable, std::shared_ptr query_audi, - std::shared_ptr prospective_parachains, + std::shared_ptr prospective_parachains, std::shared_ptr block_tree, LazySPtr slots_util, std::shared_ptr babe_config_repo, - std::shared_ptr + std::shared_ptr statement_distribution); ~ParachainProcessorImpl() = default; @@ -206,25 +239,12 @@ namespace kagome::parachain { */ bool prepare(); - /** - * @brief Handles an incoming advertisement for a collation. - * - * @param pending_collation The CollationEvent representing the collation - * being advertised. - * @param prospective_candidate An optional pair containing the hash of the - * prospective candidate and the hash of the parent block. - */ - void handle_advertisement( - const RelayHash &relay_parent, - const libp2p::peer::PeerId &peer_id, - std::optional> &&prospective_candidate); - /** * @ brief We should only process parachains if we are validator and we are * @return outcome::result Returns an error if we cannot process the * parachains. */ - outcome::result canProcessParachains() const; + outcome::result canProcessParachains() const override; /** * @brief Handles an incoming collator. @@ -238,17 +258,89 @@ namespace kagome::parachain { */ void onIncomingCollator(const libp2p::peer::PeerId &peer_id, network::CollatorPublicKey pubkey, - network::ParachainId para_id); + network::ParachainId para_id) override; void onValidationProtocolMsg( const libp2p::peer::PeerId &peer_id, - const network::VersionedValidatorProtocolMessage &message); + const network::VersionedValidatorProtocolMessage &message) override; + + virtual void OnBroadcastBitfields(const primitives::BlockHash &relay_parent, + const network::SignedBitfield &bitfield); + + virtual void onViewUpdated(const network::ExView &event); + + virtual void onDeactivateBlocks( + const primitives::events::RemoveAfterFinalizationParams &event); + + virtual void handle_collation_fetch_response( + network::CollationEvent &&collation_event, + network::CollationFetchingResponse &&response); + + virtual void notifySeconded(const primitives::BlockHash &relay_parent, + const SignedFullStatementWithPVD &statement); + + virtual void notifyInvalid( + const primitives::BlockHash &parent, + const network::CandidateReceipt &candidate_receipt); + + /** + * @brief This function is a template function that validates a candidate + * asynchronously. + * + * @tparam kMode The type of validation task to be performed. + * + * @param candidate The candidate receipt to be validated. + * @param pov The parachain block to be validated. + * @param pvd The persisted validation data to be used in the validation + * process. + * @param peer_id The peer ID of the node performing the validation. + * @param relay_parent The block hash of the relay parent. + * @param n_validators The number of validators in the network. + */ + virtual void validateAsync(ValidationTaskType kMode, + const network::CandidateReceipt &candidate, + const network::ParachainBlock &pov, + const runtime::PersistedValidationData &pvd, + const primitives::BlockHash &relay_parent); - outcome::result OnFetchChunkRequest( - const network::FetchChunkRequest &request); + /** + * @brief Handles an incoming advertisement for a collation. + * + * @param pending_collation The CollationEvent representing the collation + * being advertised. + * @param prospective_candidate An optional pair containing the hash of the + * prospective candidate and the hash of the parent block. + */ + void handle_advertisement(const RelayHash &relay_parent, + const libp2p::peer::PeerId &peer_id, + std::optional> + &&prospective_candidate) override; - outcome::result - OnFetchChunkRequestObsolete(const network::FetchChunkRequest &request); + /** + * @brief This function is used to make a candidate available for + * validation. + * + * @tparam kMode The type of validation task to be performed. It can be + * either 'Second' or 'Attest'. + * @param peer_id The ID of the peer that the candidate is being made + * available to. + * @param candidate_hash The hash of the candidate that is being made + * available. + * @param result The result of the validation and seconding process. + */ + virtual void makeAvailable(ValidationTaskType kMode, + const primitives::BlockHash &candidate_hash, + ValidateAndSecondResult &&result); + + virtual void on_pvf_result_received( + ValidationTaskType kMode, + size_t n_validators, + const network::CandidateReceipt &candidate, + const network::ParachainBlock &pov, + const runtime::PersistedValidationData &pvd, + const primitives::BlockHash &relay_parent, + const Hash &candidate_hash, + const outcome::result &validation_result); outcome::result get_block_number_under_construction( const RelayHash &relay_parent) const; @@ -268,16 +360,6 @@ namespace kagome::parachain { std::vector getBackedCandidates( const RelayHash &relay_parent) override; - /** - * @brief Fetches the Proof of Validity (PoV) for a given candidate. - * - * @param candidate_hash The hash of the candidate for which the PoV is to - * be fetched. - * @return network::ResponsePov The PoV associated with the given candidate - * hash. - */ - network::ResponsePov getPov(CandidateHash &&candidate_hash); - auto getAvStore() { return av_store_; } @@ -299,25 +381,15 @@ namespace kagome::parachain { * SignedFullStatementWithPVD object. */ void handleStatement(const primitives::BlockHash &relay_parent, - const SignedFullStatementWithPVD &statement); + const SignedFullStatementWithPVD &statement) override; private: enum struct StatementType { kSeconded = 0, kValid }; - using Commitments = std::shared_ptr; using WorkersContext = boost::asio::io_context; using WorkGuard = boost::asio::executor_work_guard< boost::asio::io_context::executor_type>; using SecondingAllowed = std::optional>; - struct ValidateAndSecondResult { - outcome::result result; - primitives::BlockHash relay_parent; - Commitments commitments; - network::CandidateReceipt candidate; - network::ParachainBlock pov; - runtime::PersistedValidationData pvd; - }; - struct AttestingData { network::CandidateReceipt candidate; primitives::BlockHash pov_hash; @@ -326,7 +398,7 @@ namespace kagome::parachain { }; struct TableContext { - std::optional validator; + std::optional> validator; std::unordered_map> groups; std::vector validators; @@ -410,42 +482,6 @@ namespace kagome::parachain { outcome::result> validateErasureCoding( const runtime::AvailableData &validating_data, size_t n_validators); - /** - * @brief This function is a template function that validates a candidate - * asynchronously. - * - * @tparam kMode The type of validation task to be performed. - * - * @param candidate The candidate receipt to be validated. - * @param pov The parachain block to be validated. - * @param pvd The persisted validation data to be used in the validation - * process. - * @param peer_id The peer ID of the node performing the validation. - * @param relay_parent The block hash of the relay parent. - * @param n_validators The number of validators in the network. - */ - template - void validateAsync(network::CandidateReceipt candidate, - network::ParachainBlock &&pov, - runtime::PersistedValidationData &&pvd, - const primitives::BlockHash &relay_parent); - - /** - * @brief This function is used to make a candidate available for - * validation. - * - * @tparam kMode The type of validation task to be performed. It can be - * either 'Second' or 'Attest'. - * @param peer_id The ID of the peer that the candidate is being made - * available to. - * @param candidate_hash The hash of the candidate that is being made - * available. - * @param result The result of the validation and seconding process. - */ - template - void makeAvailable(const primitives::BlockHash &candidate_hash, - ValidateAndSecondResult &&result); - /** * @brief Processes a bitfield distribution message. * @@ -534,8 +570,8 @@ namespace kagome::parachain { AttestedCandidate &&attested, TableContext &table_context, bool inject_core_index); - outcome::result> isParachainValidator( - const primitives::BlockHash &relay_parent) const; + outcome::result>> + isParachainValidator(const primitives::BlockHash &relay_parent) const; /* * Logic. @@ -588,9 +624,6 @@ namespace kagome::parachain { AttestingData &attesting_data, const runtime::PersistedValidationData &persisted_validation_data, RelayParentState ¶chain_state); - void handle_collation_fetch_response( - network::CollationEvent &&collation_event, - network::CollationFetchingResponse &&response); template std::optional createAndSignStatement( const ValidateAndSecondResult &validation_result); @@ -649,13 +682,6 @@ namespace kagome::parachain { const network::CandidateHash &candidate_hash, const network::ParachainBlock &pov, const runtime::PersistedValidationData &data); - template - void notifySeconded(const primitives::BlockHash &relay_parent, - const SignedFullStatementWithPVD &statement); - - template - void notifyInvalid(const primitives::BlockHash &parent, - const network::CandidateReceipt &candidate_receipt); /// Notify a collator that its collation got seconded. void notify_collation_seconded(const libp2p::peer::PeerId &peer_id, @@ -663,13 +689,8 @@ namespace kagome::parachain { const RelayHash &relay_parent, const SignedFullStatementWithPVD &statement); - void onDeactivateBlocks( - const primitives::events::RemoveAfterFinalizationParams &event); void handle_active_leaves_update_for_validator(const network::ExView &event, std::vector pruned); - void onViewUpdated(const network::ExView &event); - void OnBroadcastBitfields(const primitives::BlockHash &relay_parent, - const network::SignedBitfield &bitfield); outcome::result fetchCollation(const PendingCollation &pc, const CollatorId &id); outcome::result fetchCollation(const PendingCollation &pc, @@ -761,7 +782,6 @@ namespace kagome::parachain { const network::SignedStatement &statement); std::shared_ptr pm_; - std::shared_ptr runtime_info_; std::shared_ptr crypto_provider_; std::shared_ptr router_; log::Logger logger_ = @@ -788,18 +808,16 @@ namespace kagome::parachain { } validator_side; } our_current_state_; - std::shared_ptr main_pool_handler_; std::shared_ptr hasher_; - std::shared_ptr peer_view_; - network::PeerView::MyViewSubscriberPtr my_view_sub_; + std::shared_ptr peer_view_; + network::IPeerView::MyViewSubscriberPtr my_view_sub_; std::shared_ptr pvf_; - std::shared_ptr signer_factory_; - std::shared_ptr bitfield_signer_; - std::shared_ptr pvf_precheck_; + std::shared_ptr signer_factory_; + std::shared_ptr bitfield_signer_; + std::shared_ptr pvf_precheck_; std::shared_ptr bitfield_store_; std::shared_ptr backing_store_; - std::shared_ptr av_store_; std::shared_ptr parachain_host_; const application::AppConfiguration &app_config_; primitives::events::SyncStateSubscriptionEnginePtr sync_state_observable_; @@ -810,18 +828,209 @@ namespace kagome::parachain { bool synchronized_ = false; primitives::events::ChainSub chain_sub_; - std::shared_ptr worker_pool_handler_; std::default_random_engine random_; - std::shared_ptr prospective_parachains_; + std::shared_ptr prospective_parachains_; std::shared_ptr block_tree_; - std::shared_ptr + std::shared_ptr statement_distribution; std::shared_ptr> per_session; metrics::RegistryPtr metrics_registry_ = metrics::createRegistry(); metrics::Gauge *metric_is_parachain_validator_; + std::unordered_map existed_leaves_; + + public: + void handle_second_message(const network::CandidateReceipt &candidate, + const network::ParachainBlock &pov, + const runtime::PersistedValidationData &pvd, + const primitives::BlockHash &relay_parent) { + validateAsync(ValidationTaskType::kSecond, + candidate, + network::ParachainBlock(pov), + runtime::PersistedValidationData(pvd), + relay_parent); + } }; -} // namespace kagome::parachain + class ThreadedParachainProcessorImpl : public ParachainProcessorImpl { + std::shared_ptr main_pool_handler_; -OUTCOME_HPP_DECLARE_ERROR(kagome::parachain, ParachainProcessorImpl::Error); + public: + ThreadedParachainProcessorImpl( + application::AppStateManager &app_state_manager, + common::MainThreadPool &main_thread_pool, + std::shared_ptr pm, + std::shared_ptr crypto_provider, + std::shared_ptr router, + std::shared_ptr hasher, + std::shared_ptr peer_view, + std::shared_ptr bitfield_signer, + std::shared_ptr pvf_precheck, + std::shared_ptr bitfield_store, + std::shared_ptr backing_store, + std::shared_ptr pvf, + std::shared_ptr av_store, + std::shared_ptr parachain_host, + std::shared_ptr signer_factory, + const application::AppConfiguration &app_config, + primitives::events::ChainSubscriptionEnginePtr chain_sub_engine, + primitives::events::SyncStateSubscriptionEnginePtr + sync_state_observable, + std::shared_ptr query_audi, + std::shared_ptr prospective_parachains, + std::shared_ptr block_tree, + LazySPtr slots_util, + std::shared_ptr babe_config_repo, + std::shared_ptr + statement_distribution) + : ParachainProcessorImpl(std::move(pm), + std::move(crypto_provider), + std::move(router), + std::move(hasher), + std::move(peer_view), + std::move(bitfield_signer), + std::move(pvf_precheck), + std::move(bitfield_store), + std::move(backing_store), + std::move(pvf), + std::move(av_store), + std::move(parachain_host), + std::move(signer_factory), + app_config, + app_state_manager, + std::move(chain_sub_engine), + std::move(sync_state_observable), + std::move(query_audi), + std::move(prospective_parachains), + std::move(block_tree), + slots_util, + std::move(babe_config_repo), + std::move(statement_distribution)), + main_pool_handler_{main_thread_pool.handler(app_state_manager)} { + app_state_manager.takeControl(*this); + } + + void onValidationProtocolMsg( + const libp2p::peer::PeerId &peer_id, + const network::VersionedValidatorProtocolMessage &message) override { + REINVOKE(*main_pool_handler_, + ParachainProcessorImpl::onValidationProtocolMsg, + peer_id, + message); + } + + void OnBroadcastBitfields( + const primitives::BlockHash &relay_parent, + const network::SignedBitfield &bitfield) override { + REINVOKE(*main_pool_handler_, + ParachainProcessorImpl::OnBroadcastBitfields, + relay_parent, + bitfield); + } + + void onViewUpdated(const network::ExView &event) override { + REINVOKE( + *main_pool_handler_, ParachainProcessorImpl::onViewUpdated, event); + } + + void onDeactivateBlocks( + const primitives::events::RemoveAfterFinalizationParams &event) + override { + REINVOKE(*main_pool_handler_, + ParachainProcessorImpl::onDeactivateBlocks, + event); + } + + void handle_collation_fetch_response( + network::CollationEvent &&collation_event, + network::CollationFetchingResponse &&response) override { + REINVOKE(*main_pool_handler_, + ParachainProcessorImpl::handle_collation_fetch_response, + std::move(collation_event), + std::move(response)); + } + + void handleStatement(const primitives::BlockHash &relay_parent, + const SignedFullStatementWithPVD &statement) override { + REINVOKE(*main_pool_handler_, + ParachainProcessorImpl::handleStatement, + relay_parent, + statement); + } + + void notifyInvalid( + const primitives::BlockHash &parent, + const network::CandidateReceipt &candidate_receipt) override { + REINVOKE_ONCE(*main_pool_handler_, + ParachainProcessorImpl::notifyInvalid, + parent, + candidate_receipt); + } + + void notifySeconded(const primitives::BlockHash &parent, + const SignedFullStatementWithPVD &statement) override { + REINVOKE_ONCE(*main_pool_handler_, + ParachainProcessorImpl::notifySeconded, + parent, + statement); + } + + void makeAvailable( + ValidationTaskType kMode, + const primitives::BlockHash &candidate_hash, + ValidateAndSecondResult &&validate_and_second_result) override { + REINVOKE(*main_pool_handler_, + ParachainProcessorImpl::makeAvailable, + kMode, + candidate_hash, + std::move(validate_and_second_result)); + } + + void validateAsync(ValidationTaskType kMode, + const network::CandidateReceipt &candidate, + const network::ParachainBlock &pov, + const runtime::PersistedValidationData &pvd, + const primitives::BlockHash &relay_parent) override { + REINVOKE(*main_pool_handler_, + ParachainProcessorImpl::validateAsync, + kMode, + candidate, + pov, + pvd, + relay_parent); + } + + void on_pvf_result_received( + ValidationTaskType kMode, + size_t n_validators, + const network::CandidateReceipt &candidate, + const network::ParachainBlock &pov, + const runtime::PersistedValidationData &pvd, + const primitives::BlockHash &relay_parent, + const Hash &candidate_hash, + const outcome::result &validation_result) override { + REINVOKE_ONCE(*main_pool_handler_, + ParachainProcessorImpl::on_pvf_result_received, + kMode, + n_validators, + candidate, + pov, + pvd, + relay_parent, + candidate_hash, + validation_result); + } + + void handle_advertisement(const RelayHash &relay_parent, + const libp2p::peer::PeerId &peer_id, + std::optional> + &&prospective_candidate) override { + REINVOKE(*main_pool_handler_, + ParachainProcessorImpl::handle_advertisement, + relay_parent, + peer_id, + std::move(prospective_candidate)); + } + }; + +} // namespace kagome::parachain diff --git a/core/parachain/validator/parachain_storage.cpp b/core/parachain/validator/parachain_storage.cpp new file mode 100644 index 0000000000..95e68f68ce --- /dev/null +++ b/core/parachain/validator/parachain_storage.cpp @@ -0,0 +1,53 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "parachain/validator/parachain_storage.hpp" + +namespace kagome::parachain { + ParachainStorageImpl::ParachainStorageImpl( + std::shared_ptr av_store) + : av_store_(std::move(av_store)) {} + + network::ResponsePov ParachainStorageImpl::getPov( + CandidateHash &&candidate_hash) { + if (auto res = av_store_->getPov(candidate_hash)) { + return network::ResponsePov{*res}; + } + return network::Empty{}; + } + + outcome::result + ParachainStorageImpl::OnFetchChunkRequest( + const network::FetchChunkRequest &request) { + if (auto chunk = + av_store_->getChunk(request.candidate, request.chunk_index)) { + return network::Chunk{ + .data = chunk->chunk, + .chunk_index = request.chunk_index, + .proof = chunk->proof, + }; + } + return network::Empty{}; + } + + outcome::result + ParachainStorageImpl::OnFetchChunkRequestObsolete( + const network::FetchChunkRequest &request) { + if (auto chunk = + av_store_->getChunk(request.candidate, request.chunk_index)) { + // This check needed because v1 protocol mustn't have chunk mapping + // https://github.com/paritytech/polkadot-sdk/blob/d2fd53645654d3b8e12cbf735b67b93078d70113/polkadot/node/core/av-store/src/lib.rs#L1345 + if (chunk->index == request.chunk_index) { + return network::ChunkObsolete{ + .data = chunk->chunk, + .proof = chunk->proof, + }; + } + } + return network::Empty{}; + } + +} // namespace kagome::parachain diff --git a/core/parachain/validator/parachain_storage.hpp b/core/parachain/validator/parachain_storage.hpp new file mode 100644 index 0000000000..4eacdc8089 --- /dev/null +++ b/core/parachain/validator/parachain_storage.hpp @@ -0,0 +1,33 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include "parachain/availability/store/store.hpp" +#include "parachain/validator/i_parachain_processor.hpp" + +namespace kagome::parachain { + + class ParachainStorageImpl : public ParachainStorage { + protected: + std::shared_ptr av_store_; + + public: + ParachainStorageImpl( + std::shared_ptr av_store); + + outcome::result OnFetchChunkRequest( + const network::FetchChunkRequest &request) override; + + outcome::result + OnFetchChunkRequestObsolete( + const network::FetchChunkRequest &request) override; + + network::ResponsePov getPov(CandidateHash &&candidate_hash) override; + }; + +} // namespace kagome::parachain diff --git a/core/parachain/validator/prospective_parachains/prospective_parachains.cpp b/core/parachain/validator/prospective_parachains/prospective_parachains.cpp index e181011806..14ac8feb63 100644 --- a/core/parachain/validator/prospective_parachains/prospective_parachains.cpp +++ b/core/parachain/validator/prospective_parachains/prospective_parachains.cpp @@ -322,6 +322,8 @@ namespace kagome::parachain { size_t ancestors) { std::vector block_info; if (ancestors == 0) { + SL_TRACE( + logger, "`ancestors` is 0. Return. (relay parent={})", relay_hash); return block_info; } @@ -462,6 +464,11 @@ namespace kagome::parachain { }; OUTCOME_TRY(ancestry, fetchAncestry(hash, mode->allowed_ancestry_len)); + if (ancestry.empty()) { + SL_TRACE(logger, + "Failed to get inclusion backing state. (relay parent={})", + hash); + } std::optional>> @@ -598,6 +605,7 @@ namespace kagome::parachain { auto r = view().implicit_view.all_allowed_relay_parents(); std::unordered_set remaining{r.begin(), r.end()}; + SL_TRACE(logger, "Remains RP in PP: {}", remaining.size()); for (auto it = view().per_relay_parent.begin(); it != view().per_relay_parent.end();) { @@ -616,13 +624,28 @@ namespace kagome::parachain { view_.emplace(View{ .per_relay_parent = {}, .active_leaves = {}, - .implicit_view = ImplicitView( - weak_from_this(), parachain_host_, block_tree_, std::nullopt), + .implicit_view = ImplicitView(weak_from_this(), + parachain_host_, + block_tree_, + std::nullopt, + true), }); } return *view_; } + // std::vector< + // std::pair> + // ProspectiveParachains::answer_hypothetical_membership_request( + // const IProspectiveParachains::HypotheticalMembershipRequest + // &request) { + // return answer_hypothetical_membership_request(request.candidates, + // utils::map(request.fragment_chain_relay_parent, [](const auto &v) + // { + // return std::cref(v); + // })) + // } + std::vector< std::pair> ProspectiveParachains::answer_hypothetical_membership_request( diff --git a/core/parachain/validator/prospective_parachains/prospective_parachains.hpp b/core/parachain/validator/prospective_parachains/prospective_parachains.hpp index ec250fb0bc..2771706de6 100644 --- a/core/parachain/validator/prospective_parachains/prospective_parachains.hpp +++ b/core/parachain/validator/prospective_parachains/prospective_parachains.hpp @@ -32,8 +32,76 @@ namespace kagome::parachain { using ParentHeadData = boost::variant; + class IProspectiveParachains { + public: + struct HypotheticalMembershipRequest { + std::vector candidates; + std::optional fragment_chain_relay_parent; + + bool operator==(const HypotheticalMembershipRequest &other) const = + default; + // { + // return candidates == other.candidates && + // fragment_chain_relay_parent == other.fragment_chain_relay_parent; + // } + }; + + virtual ~IProspectiveParachains() = default; + + // Debug print of all internal buffers load. + virtual void printStoragesLoad() = 0; + + virtual std::shared_ptr getBlockTree() = 0; + + virtual std::vector> + answerMinimumRelayParentsRequest(const RelayHash &relay_parent) = 0; + + virtual std::vector> + answerGetBackableCandidates(const RelayHash &relay_parent, + ParachainId para, + uint32_t count, + const fragment::Ancestors &ancestors) = 0; + + virtual outcome::result> + answerProspectiveValidationDataRequest( + const RelayHash &candidate_relay_parent, + const ParentHeadData &parent_head_data, + ParachainId para_id) = 0; + + virtual std::optional prospectiveParachainsMode( + const RelayHash &relay_parent) = 0; + + virtual outcome::result onActiveLeavesUpdate( + const network::ExViewRef &update) = 0; + + // virtual std::vector< + // std::pair> + // answer_hypothetical_membership_request( + // const HypotheticalMembershipRequest &request) = 0; + + virtual std::vector< + std::pair> + answer_hypothetical_membership_request( + const std::span &candidates, + const std::optional> + &fragment_tree_relay_parent) = 0; + + virtual void candidate_backed(ParachainId para, + const CandidateHash &candidate_hash) = 0; + + virtual bool introduce_seconded_candidate( + ParachainId para, + const network::CommittedCandidateReceipt &candidate, + const crypto::Hashed> &pvd, + const CandidateHash &candidate_hash) = 0; + }; + class ProspectiveParachains - : public std::enable_shared_from_this { + : public IProspectiveParachains, + public std::enable_shared_from_this { #ifdef CFG_TESTING public: #endif // CFG_TESTING @@ -86,27 +154,27 @@ namespace kagome::parachain { std::shared_ptr block_tree); // Debug print of all internal buffers load. - void printStoragesLoad(); + void printStoragesLoad() override; - std::shared_ptr getBlockTree(); + std::shared_ptr getBlockTree() override; std::vector> - answerMinimumRelayParentsRequest(const RelayHash &relay_parent); + answerMinimumRelayParentsRequest(const RelayHash &relay_parent) override; std::vector> answerGetBackableCandidates( const RelayHash &relay_parent, ParachainId para, uint32_t count, - const fragment::Ancestors &ancestors); + const fragment::Ancestors &ancestors) override; outcome::result> answerProspectiveValidationDataRequest( const RelayHash &candidate_relay_parent, const ParentHeadData &parent_head_data, - ParachainId para_id); + ParachainId para_id) override; std::optional prospectiveParachainsMode( - const RelayHash &relay_parent); + const RelayHash &relay_parent) override; outcome::result onActiveLeavesUpdate( - const network::ExViewRef &update); + const network::ExViewRef &update) override; /// @brief calculates hypothetical candidate and fragment tree membership /// @param candidates Candidates, in arbitrary order, which should be @@ -144,10 +212,15 @@ namespace kagome::parachain { answer_hypothetical_membership_request( const std::span &candidates, const std::optional> - &fragment_tree_relay_parent); + &fragment_tree_relay_parent) override; + + // std::vector< + // std::pair> + // answer_hypothetical_membership_request( + // const HypotheticalMembershipRequest &request) override; void candidate_backed(ParachainId para, - const CandidateHash &candidate_hash); + const CandidateHash &candidate_hash) override; bool introduce_seconded_candidate( ParachainId para, @@ -155,7 +228,7 @@ namespace kagome::parachain { const crypto::Hashed> &pvd, - const CandidateHash &candidate_hash); + const CandidateHash &candidate_hash) override; }; } // namespace kagome::parachain diff --git a/core/parachain/validator/signer.cpp b/core/parachain/validator/signer.cpp index b005fd123d..82400c945b 100644 --- a/core/parachain/validator/signer.cpp +++ b/core/parachain/validator/signer.cpp @@ -50,21 +50,19 @@ namespace kagome::parachain { hasher_{std::move(hasher)}, sr25519_provider_{std::move(sr25519_provider)} {} - outcome::result> ValidatorSignerFactory::at( - const primitives::BlockHash &relay_parent) { + outcome::result>> + ValidatorSignerFactory::at(const primitives::BlockHash &relay_parent) { OUTCOME_TRY(validators, parachain_api_->validators(relay_parent)); auto keypair = session_keys_->getParaKeyPair(validators); if (not keypair) { return std::nullopt; } OUTCOME_TRY(context, SigningContext::make(parachain_api_, relay_parent)); - return ValidatorSigner{ - keypair->second, - context, - std::move(keypair->first), - hasher_, - sr25519_provider_, - }; + return std::make_shared(keypair->second, + context, + std::move(keypair->first), + hasher_, + sr25519_provider_); } outcome::result> diff --git a/core/parachain/validator/signer.hpp b/core/parachain/validator/signer.hpp index d264173e90..dde38e2106 100644 --- a/core/parachain/validator/signer.hpp +++ b/core/parachain/validator/signer.hpp @@ -57,8 +57,24 @@ namespace kagome::parachain { primitives::BlockHash relay_parent; }; + class IValidatorSigner { + public: + virtual ~IValidatorSigner() = default; + + virtual outcome::result> sign( + const network::Statement &payload) const = 0; + virtual outcome::result> sign( + const scale::BitVec &payload) const = 0; + + virtual ValidatorIndex validatorIndex() const = 0; + virtual SessionIndex getSessionIndex() const = 0; + virtual const primitives::BlockHash &relayParent() const = 0; + virtual outcome::result signRaw( + common::BufferView data) const = 0; + }; + /// Signs payload with signing context and validator keypair. - class ValidatorSigner { + class ValidatorSigner : public IValidatorSigner { public: using ValidatorIndex = network::ValidatorIndex; @@ -68,9 +84,18 @@ namespace kagome::parachain { std::shared_ptr hasher, std::shared_ptr sr25519_provider); + outcome::result> sign( + const network::Statement &payload) const override { + return sign_obj(payload); + } + outcome::result> sign( + const scale::BitVec &payload) const override { + return sign_obj(payload); + } + /// Sign payload. template - outcome::result> sign(T payload) const { + outcome::result> sign_obj(T payload) const { auto data = context_.signable(*hasher_, payload); OUTCOME_TRY(signature, sr25519_provider_->sign(*keypair_, data)); return parachain::IndexedAndSigned{ @@ -79,17 +104,17 @@ namespace kagome::parachain { }; } - outcome::result signRaw(common::BufferView data) const { + outcome::result signRaw(common::BufferView data) const override { return sr25519_provider_->sign(*keypair_, data); } - SessionIndex getSessionIndex() const; + SessionIndex getSessionIndex() const override; /// Get validator index. - ValidatorIndex validatorIndex() const; + ValidatorIndex validatorIndex() const override; /// Get relay parent hash. - const primitives::BlockHash &relayParent() const; + const primitives::BlockHash &relayParent() const override; private: ValidatorIndex validator_index_; @@ -100,7 +125,22 @@ namespace kagome::parachain { }; /// Creates validator signer. - class ValidatorSignerFactory { + class IValidatorSignerFactory { + public: + virtual ~IValidatorSignerFactory() = default; + + /// Create validator signer if keypair belongs to validator at given block. + virtual outcome::result>> + at(const primitives::BlockHash &relay_parent) = 0; + + virtual outcome::result> + getAuthorityValidatorIndex(const primitives::BlockHash &relay_parent) = 0; + }; + + /// Creates validator signer. + class ValidatorSignerFactory + : public IValidatorSignerFactory, + std::enable_shared_from_this { public: ValidatorSignerFactory( std::shared_ptr parachain_api, @@ -109,11 +149,11 @@ namespace kagome::parachain { std::shared_ptr sr25519_provider); /// Create validator signer if keypair belongs to validator at given block. - outcome::result> at( - const primitives::BlockHash &relay_parent); + outcome::result>> at( + const primitives::BlockHash &relay_parent) override; outcome::result> getAuthorityValidatorIndex( - const primitives::BlockHash &relay_parent); + const primitives::BlockHash &relay_parent) override; private: std::shared_ptr parachain_api_; diff --git a/core/parachain/validator/statement_distribution/i_statement_distribution.hpp b/core/parachain/validator/statement_distribution/i_statement_distribution.hpp new file mode 100644 index 0000000000..b522233a43 --- /dev/null +++ b/core/parachain/validator/statement_distribution/i_statement_distribution.hpp @@ -0,0 +1,68 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include "authority_discovery/query/query.hpp" +#include "common/ref_cache.hpp" +#include "consensus/babe/babe_config_repository.hpp" +#include "consensus/babe/impl/babe_digests_util.hpp" +#include "consensus/timeline/slots_util.hpp" +#include "network/can_disconnect.hpp" +#include "network/peer_manager.hpp" +#include "network/peer_view.hpp" +#include "network/router.hpp" +#include "parachain/approval/approval_thread_pool.hpp" +#include "parachain/validator/impl/candidates.hpp" +#include "parachain/validator/network_bridge.hpp" +#include "parachain/validator/signer.hpp" +#include "parachain/validator/statement_distribution/peer_state.hpp" +#include "parachain/validator/statement_distribution/per_session_state.hpp" +#include "parachain/validator/statement_distribution/types.hpp" +#include "utils/pool_handler_ready_make.hpp" + +namespace kagome::parachain { + class ParachainProcessor; +} + +namespace kagome::parachain::statement_distribution { + + class IStatementDistribution { + public: + virtual ~IStatementDistribution() = default; + + virtual void OnFetchAttestedCandidateRequest( + const network::vstaging::AttestedCandidateRequest &request, + std::shared_ptr stream) = 0; + + virtual void store_parachain_processor( + std::weak_ptr pp) = 0; + + virtual void handle_incoming_manifest( + const libp2p::peer::PeerId &peer_id, + const network::vstaging::BackedCandidateManifest &msg) = 0; + + virtual void handle_incoming_acknowledgement( + const libp2p::peer::PeerId &peer_id, + const network::vstaging::BackedCandidateAcknowledgement + &acknowledgement) = 0; + + virtual void handle_incoming_statement( + const libp2p::peer::PeerId &peer_id, + const network::vstaging::StatementDistributionMessageStatement + &stm) = 0; + + virtual void handle_backed_candidate_message( + const CandidateHash &candidate_hash) = 0; + + virtual void share_local_statement( + const primitives::BlockHash &relay_parent, + const SignedFullStatementWithPVD &statement) = 0; + }; + +} // namespace kagome::parachain::statement_distribution diff --git a/core/parachain/validator/statement_distribution/peer_state.hpp b/core/parachain/validator/statement_distribution/peer_state.hpp index 2e11d85579..2537bf3bbf 100644 --- a/core/parachain/validator/statement_distribution/peer_state.hpp +++ b/core/parachain/validator/statement_distribution/peer_state.hpp @@ -20,7 +20,7 @@ #include "utils/pool_handler_ready_make.hpp" namespace kagome::parachain { - class ParachainProcessorImpl; + class ParachainProcessor; } namespace kagome::parachain::statement_distribution { diff --git a/core/parachain/validator/statement_distribution/statement_distribution.cpp b/core/parachain/validator/statement_distribution/statement_distribution.cpp index 416bf1fbe7..53b9d49543 100644 --- a/core/parachain/validator/statement_distribution/statement_distribution.cpp +++ b/core/parachain/validator/statement_distribution/statement_distribution.cpp @@ -107,7 +107,7 @@ namespace kagome::parachain::statement_distribution { std::shared_ptr sf, std::shared_ptr app_state_manager, StatementDistributionThreadPool &statements_distribution_thread_pool, - std::shared_ptr _prospective_parachains, + std::shared_ptr _prospective_parachains, std::shared_ptr _parachain_host, std::shared_ptr _block_tree, std::shared_ptr _query_audi, @@ -233,7 +233,7 @@ namespace kagome::parachain::statement_distribution { peers.erase(peer); } - outcome::result> + outcome::result>> StatementDistribution::is_parachain_validator( const primitives::BlockHash &relay_parent) const { BOOST_ASSERT(main_pool_handler->isInCurrentThread()); @@ -270,7 +270,7 @@ namespace kagome::parachain::statement_distribution { if (auto res = is_parachain_validator(new_relay_parent); res.has_value()) { validator_index = utils::map(res.value(), [](const auto &signer) { - return signer.validatorIndex(); + return signer->validatorIndex(); }); } diff --git a/core/parachain/validator/statement_distribution/statement_distribution.hpp b/core/parachain/validator/statement_distribution/statement_distribution.hpp index 29d3e442d3..1a35d210f3 100644 --- a/core/parachain/validator/statement_distribution/statement_distribution.hpp +++ b/core/parachain/validator/statement_distribution/statement_distribution.hpp @@ -21,19 +21,21 @@ #include "parachain/validator/impl/candidates.hpp" #include "parachain/validator/network_bridge.hpp" #include "parachain/validator/signer.hpp" +#include "parachain/validator/statement_distribution/i_statement_distribution.hpp" #include "parachain/validator/statement_distribution/peer_state.hpp" #include "parachain/validator/statement_distribution/per_session_state.hpp" #include "parachain/validator/statement_distribution/types.hpp" #include "utils/pool_handler_ready_make.hpp" namespace kagome::parachain { - class ParachainProcessorImpl; + class ParachainProcessor; } namespace kagome::parachain::statement_distribution { class StatementDistribution : public std::enable_shared_from_this, + public IStatementDistribution, public network::CanDisconnect { public: enum class Error : uint8_t { @@ -74,7 +76,7 @@ namespace kagome::parachain::statement_distribution { std::shared_ptr sf, std::shared_ptr app_state_manager, StatementDistributionThreadPool &statements_distribution_thread_pool, - std::shared_ptr prospective_parachains, + std::shared_ptr prospective_parachains, std::shared_ptr parachain_host, std::shared_ptr block_tree, std::shared_ptr query_audi, @@ -97,11 +99,12 @@ namespace kagome::parachain::statement_distribution { // outcome::result void OnFetchAttestedCandidateRequest( const network::vstaging::AttestedCandidateRequest &request, - std::shared_ptr stream); + std::shared_ptr stream) override; // CanDisconnect bool can_disconnect(const libp2p::PeerId &) const override; - void store_parachain_processor(std::weak_ptr pp) { + void store_parachain_processor( + std::weak_ptr pp) override { BOOST_ASSERT(!pp.expired()); parachain_processor = std::move(pp); } @@ -113,21 +116,24 @@ namespace kagome::parachain::statement_distribution { // validators group or sends a request to fetch the attested candidate. void handle_incoming_manifest( const libp2p::peer::PeerId &peer_id, - const network::vstaging::BackedCandidateManifest &msg); + const network::vstaging::BackedCandidateManifest &msg) override; void handle_incoming_acknowledgement( const libp2p::peer::PeerId &peer_id, const network::vstaging::BackedCandidateAcknowledgement - &acknowledgement); + &acknowledgement) override; void handle_incoming_statement( const libp2p::peer::PeerId &peer_id, - const network::vstaging::StatementDistributionMessageStatement &stm); + const network::vstaging::StatementDistributionMessageStatement &stm) + override; - void handle_backed_candidate_message(const CandidateHash &candidate_hash); + void handle_backed_candidate_message( + const CandidateHash &candidate_hash) override; - void share_local_statement(const primitives::BlockHash &relay_parent, - const SignedFullStatementWithPVD &statement); + void share_local_statement( + const primitives::BlockHash &relay_parent, + const SignedFullStatementWithPVD &statement) override; private: struct ManifestImportSuccess { @@ -325,8 +331,8 @@ namespace kagome::parachain::statement_distribution { std::vector> frontier); - outcome::result> is_parachain_validator( - const primitives::BlockHash &relay_parent) const; + outcome::result>> + is_parachain_validator(const primitives::BlockHash &relay_parent) const; std::unordered_map> determine_groups_per_para( @@ -364,12 +370,12 @@ namespace kagome::parachain::statement_distribution { /// worker thread std::shared_ptr statements_distribution_thread_handler; std::shared_ptr query_audi; - std::weak_ptr parachain_processor; + std::weak_ptr parachain_processor; std::shared_ptr network_bridge; std::shared_ptr router; std::shared_ptr main_pool_handler; std::shared_ptr hasher; - std::shared_ptr prospective_parachains; + std::shared_ptr prospective_parachains; std::shared_ptr parachain_host; std::shared_ptr crypto_provider; std::shared_ptr peer_view; diff --git a/core/utils/pool_handler.hpp b/core/utils/pool_handler.hpp index c23038988e..aa4a3f28c9 100644 --- a/core/utils/pool_handler.hpp +++ b/core/utils/pool_handler.hpp @@ -100,23 +100,20 @@ namespace kagome { "expected to execute on other thread" \ } -/// Reinvokes function once depending on `template ` argument. +/// Reinvokes function once. /// If `true` reinvoke takes place, otherwise direct call. After reinvoke called /// function has `false` in kReinvoke. -#define REINVOKE_ONCE(ctx, func, ...) \ - ({ \ - if constexpr (kReinvoke) { \ - return post( \ - ctx, \ - [weak{weak_from_this()}, \ - args = std::make_tuple(__VA_ARGS__)]() mutable { \ - if (auto self = weak.lock()) { \ - std::apply( \ - [&](auto &&...args) mutable { \ - self->func(std::forward(args)...); \ - }, \ - std::move(args)); \ - } \ - }); \ - } \ +#define REINVOKE_ONCE(ctx, func, ...) \ + ({ \ + return post(ctx, \ + [weak{weak_from_this()}, \ + args = std::make_tuple(__VA_ARGS__)]() mutable { \ + if (auto self = weak.lock()) { \ + std::apply( \ + [&](auto &&...args) mutable { \ + self->func(std::forward(args)...); \ + }, \ + std::move(args)); \ + } \ + }); \ }) diff --git a/test/core/parachain/CMakeLists.txt b/test/core/parachain/CMakeLists.txt index a2f352b457..e13b9cecb7 100644 --- a/test/core/parachain/CMakeLists.txt +++ b/test/core/parachain/CMakeLists.txt @@ -30,6 +30,21 @@ target_link_libraries(scope_test logger ) +addtest(backing_test + backing.cpp + ) + +target_link_libraries(backing_test + Boost::Boost.DI + Boost::boost + validator_parachain + prospective_parachains + log_configurator + base_fs_test + key_store + logger + ) + addtest(candidates_test candidates.cpp ) diff --git a/test/core/parachain/backing.cpp b/test/core/parachain/backing.cpp new file mode 100644 index 0000000000..65052e850b --- /dev/null +++ b/test/core/parachain/backing.cpp @@ -0,0 +1,676 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "common/main_thread_pool.hpp" +#include "common/worker_thread_pool.hpp" +#include "core/parachain/parachain_test_harness.hpp" +#include "crypto/bip39/impl/bip39_provider_impl.hpp" +#include "crypto/hasher/hasher_impl.hpp" +#include "crypto/pbkdf2/impl/pbkdf2_provider_impl.hpp" +#include "injector/bind_by_lambda.hpp" +#include "mock/core/application/app_configuration_mock.hpp" +#include "mock/core/application/app_state_manager_mock.hpp" +#include "mock/core/authority_discovery/query_mock.hpp" +#include "mock/core/blockchain/block_tree_mock.hpp" +#include "mock/core/consensus/babe/babe_config_repository_mock.hpp" +#include "mock/core/consensus/timeline/slots_util_mock.hpp" +#include "mock/core/crypto/sr25519_provider_mock.hpp" +#include "mock/core/network/peer_manager_mock.hpp" +#include "mock/core/network/peer_view_mock.hpp" +#include "mock/core/network/router_mock.hpp" +#include "mock/core/parachain/availability_store_mock.hpp" +#include "mock/core/parachain/backing_store_mock.hpp" +#include "mock/core/parachain/bitfield_signer_mock.hpp" +#include "mock/core/parachain/bitfield_store_mock.hpp" +#include "mock/core/parachain/prospective_parachains_mock.hpp" +#include "mock/core/parachain/pvf_mock.hpp" +#include "mock/core/parachain/pvf_precheck_mock.hpp" +#include "mock/core/parachain/signer_factory_mock.hpp" +#include "mock/core/parachain/statement_distribution_mock.hpp" +#include "mock/core/runtime/parachain_host_mock.hpp" +#include "parachain/availability/chunks.hpp" +#include "parachain/availability/proof.hpp" +#include "parachain/validator/parachain_processor.hpp" +#include "primitives/event_types.hpp" +#include "testutil/lazy.hpp" +#include "utils/map.hpp" + +using namespace kagome::parachain; +namespace runtime = kagome::runtime; + +using kagome::Watchdog; +using kagome::application::AppConfiguration; +using kagome::application::AppConfigurationMock; +using kagome::application::StartApp; +using kagome::authority_discovery::Query; +using kagome::authority_discovery::QueryMock; +using kagome::blockchain::BlockTree; +using kagome::blockchain::BlockTreeMock; +using kagome::common::MainThreadPool; +using kagome::common::WorkerThreadPool; +using kagome::consensus::SlotsUtil; +using kagome::consensus::SlotsUtilMock; +using kagome::consensus::babe::BabeConfigRepository; +using kagome::consensus::babe::BabeConfigRepositoryMock; +using kagome::crypto::Sr25519Provider; +using kagome::crypto::Sr25519ProviderMock; +using kagome::network::IPeerView; +using kagome::network::PeerManager; +using kagome::network::PeerManagerMock; +using kagome::network::PeerViewMock; +using kagome::network::Router; +using kagome::network::RouterMock; +using kagome::primitives::events::ChainSubscriptionEngine; +using kagome::primitives::events::SyncStateSubscriptionEngine; +using kagome::primitives::events::SyncStateSubscriptionEnginePtr; +using kagome::runtime::ParachainHost; +using kagome::runtime::ParachainHostMock; + +class BackingTest : public ProspectiveParachainsTestHarness { + void SetUp() override { + ProspectiveParachainsTestHarness::SetUp(); + + watchdog_ = std::make_shared(std::chrono::milliseconds(1)); + main_thread_pool_ = std::make_shared( + watchdog_, std::make_shared()); + worker_thread_pool_ = std::make_shared(watchdog_, 1); + peer_manager_ = std::make_shared(); + sr25519_provider_ = std::make_shared(); + router_ = std::make_shared(); + peer_view_ = std::make_shared(); + bitfield_signer_ = std::make_shared(); + pvf_precheck_ = std::make_shared(); + bitfield_store_ = std::make_shared(); + backing_store_ = std::make_shared(); + pvf_ = std::make_shared(); + av_store_ = std::make_shared(); + parachain_host_ = std::make_shared(); + signer_factory_ = std::make_shared(); + chain_sub_engine_ = std::make_shared(); + sync_state_observable_ = std::make_shared(); + query_audi_ = std::make_shared(); + prospective_parachains_ = std::make_shared(); + block_tree_ = std::make_shared(); + slots_util_ = std::make_shared(); + babe_config_repo_ = std::make_shared(); + statement_distribution_ = + std::make_shared(); + signer_ = std::make_shared(); + + my_view_observable_ = + std::make_shared(); + + StartApp app_state_manager; + parachain_processor_ = std::make_shared( + peer_manager_, + sr25519_provider_, + router_, + hasher_, + peer_view_, + bitfield_signer_, + pvf_precheck_, + bitfield_store_, + backing_store_, + pvf_, + av_store_, + parachain_host_, + signer_factory_, + app_config_, + app_state_manager, + chain_sub_engine_, + sync_state_observable_, + query_audi_, + prospective_parachains_, + block_tree_, + testutil::sptr_to_lazy(slots_util_), + babe_config_repo_, + statement_distribution_); + + EXPECT_CALL(*peer_view_, getMyViewObservable()) + .WillRepeatedly(Return(my_view_observable_)); + EXPECT_CALL(*statement_distribution_, store_parachain_processor(testing::_)) + .Times(1); + EXPECT_CALL(*bitfield_signer_, setBroadcastCallback(testing::_)).Times(1); + EXPECT_CALL(app_config_, roles()) + .WillRepeatedly(Return(network::Roles(0xff))); + EXPECT_CALL(*prospective_parachains_, getBlockTree()) + .WillRepeatedly(Return(block_tree_)); + + app_state_manager.start(); + } + + void TearDown() override { + watchdog_->stop(); + parachain_host_.reset(); + ProspectiveParachainsTestHarness::TearDown(); + } + + public: + AppConfigurationMock app_config_; + std::shared_ptr watchdog_; + std::shared_ptr main_thread_pool_; + std::shared_ptr worker_thread_pool_; + std::shared_ptr peer_manager_; + std::shared_ptr sr25519_provider_; + std::shared_ptr router_; + std::shared_ptr peer_view_; + std::shared_ptr bitfield_signer_; + std::shared_ptr pvf_precheck_; + std::shared_ptr bitfield_store_; + std::shared_ptr backing_store_; + std::shared_ptr pvf_; + std::shared_ptr av_store_; + std::shared_ptr parachain_host_; + std::shared_ptr signer_factory_; + std::shared_ptr chain_sub_engine_; + SyncStateSubscriptionEnginePtr sync_state_observable_; + std::shared_ptr query_audi_; + std::shared_ptr prospective_parachains_; + std::shared_ptr block_tree_; + std::shared_ptr slots_util_; + std::shared_ptr babe_config_repo_; + std::shared_ptr + statement_distribution_; + std::shared_ptr parachain_processor_; + std::shared_ptr signer_; + + PeerViewMock::MyViewSubscriptionEnginePtr my_view_observable_; + + struct TestState { + std::vector chain_ids; + std::unordered_map head_data; + std::vector validators; + std::vector availability_cores; + SigningContext signing_context; + uint32_t minimum_backing_votes; + + struct { + std::vector groups; + runtime::GroupDescriptor group_rotation; + } validator_groups; + + TestState() { + const ParachainId chain_a(1); + const ParachainId chain_b(2); + + chain_ids = {chain_a, chain_b}; + + head_data[chain_a] = {4, 5, 6}; + head_data[chain_b] = {5, 6, 7}; + + crypto::Bip39ProviderImpl bip_provider{ + std::make_shared(), + std::make_shared(), + }; + + auto sr25519_provider = std::make_shared(); + auto f = [&](std::string_view phrase) { + auto bip = bip_provider.generateSeed(phrase).value(); + auto keys = sr25519_provider + ->generateKeypair(crypto::Sr25519Seed::from(bip.seed), + bip.junctions) + .value(); + return keys.public_key; + }; + validators.emplace_back(f("//Alice")); + validators.emplace_back(f("//Bob")); + validators.emplace_back(f("//Charlie")); + validators.emplace_back(f("//Dave")); + validators.emplace_back(f("//Ferdie")); + validators.emplace_back(f("//One")); + + validator_groups.groups = {runtime::ValidatorGroup{ + .validators = {2, 0, 3, 5}, + }, + runtime::ValidatorGroup{ + .validators = {1}, + }}; + validator_groups.group_rotation = runtime::GroupDescriptor{ + .session_start_block = 0, + .group_rotation_frequency = 100, + .now_block_num = 1, + }; + + availability_cores = {runtime::ScheduledCore{ + .para_id = chain_a, + .collator = std::nullopt, + }, + runtime::ScheduledCore{ + .para_id = chain_b, + .collator = std::nullopt, + }}; + + const auto relay_parent = fromNumber(5); + + signing_context = SigningContext{ + .session_index = 1, + .relay_parent = relay_parent, + }; + + minimum_backing_votes = LEGACY_MIN_BACKING_VOTES; + } + }; + + struct TestLeaf { + BlockNumber number; + Hash hash; + std::vector> min_relay_parents; + }; + + void activate_leaf(const TestLeaf &leaf, const TestState &test_state) { + const auto &[leaf_number, leaf_hash, min_relay_parents] = leaf; + network::ExView update{ + .view = {}, + .new_head = + BlockHeader{ + .number = leaf_number, + .parent_hash = get_parent_hash(leaf_hash), + .state_root = {}, + .extrinsics_root = {}, + .digest = {}, + .hash_opt = {}, + }, + .lost = {}, + }; + update.new_head.hash_opt = leaf_hash; + + EXPECT_CALL(*parachain_host_, staging_async_backing_params(leaf_hash)) + .WillRepeatedly(Return(outcome::success(fragment::AsyncBackingParams{ + .max_candidate_depth = 4, .allowed_ancestry_len = 3}))); + + EXPECT_CALL(*prospective_parachains_, prospectiveParachainsMode(leaf_hash)) + .WillRepeatedly(Return(ProspectiveParachainsMode{ + .max_candidate_depth = 4, + .allowed_ancestry_len = 3, + })); + + EXPECT_CALL(*signer_, validatorIndex()).WillRepeatedly(Return(0)); + + EXPECT_CALL(*bitfield_store_, printStoragesLoad()).WillRepeatedly(Return()); + EXPECT_CALL(*backing_store_, printStoragesLoad()).WillRepeatedly(Return()); + EXPECT_CALL(*av_store_, printStoragesLoad()).WillRepeatedly(Return()); + EXPECT_CALL(*prospective_parachains_, printStoragesLoad()) + .WillRepeatedly(Return()); + + EXPECT_CALL(*backing_store_, onActivateLeaf(testing::_)) + .WillRepeatedly(Return()); + EXPECT_CALL(*prospective_parachains_, onActiveLeavesUpdate(testing::_)) + .WillRepeatedly(Return(outcome::success())); + EXPECT_CALL(*peer_manager_, enumeratePeerState(testing::_)) + .WillRepeatedly(Return()); + + const BlockNumber min_min = [&]() -> BlockNumber { + std::optional min_min; + for (const auto &[_, block_num] : min_relay_parents) { + min_min = (min_min ? std::min(*min_min, block_num) : block_num); + } + if (min_min) { + return *min_min; + } + return leaf_number; + }(); + + const auto ancestry_len = leaf_number + 1 - min_min; + std::vector ancestry_hashes; + std::vector ancestry_numbers; + + Hash d = leaf_hash; + for (BlockNumber x = 0; x < ancestry_len; ++x) { + ancestry_hashes.emplace_back(d); + ancestry_numbers.push_back(leaf_number - x); + d = get_parent_hash(d); + } + ASSERT_EQ(ancestry_hashes.size(), ancestry_numbers.size()); + + size_t requested_len = 0; + for (size_t i = 0; i < ancestry_hashes.size(); ++i) { + const auto &hash = ancestry_hashes[i]; + const auto &number = ancestry_numbers[i]; + const auto parent_hash = + ((i == ancestry_hashes.size() - 1) ? get_parent_hash(hash) + : ancestry_hashes[i + 1]); + + EXPECT_CALL(*parachain_host_, session_index_for_child(hash)) + .WillRepeatedly(Return(test_state.signing_context.session_index)); + + EXPECT_CALL(*block_tree_, getBlockHeader(hash)) + .WillRepeatedly(Return(BlockHeader{ + .number = number, + .parent_hash = parent_hash, + .state_root = {}, + .extrinsics_root = {}, + .digest = {}, + .hash_opt = {}, + })); + + EXPECT_CALL(*parachain_host_, validators(hash)) + .WillRepeatedly(Return(test_state.validators)); + + EXPECT_CALL(*parachain_host_, validator_groups(hash)) + .WillRepeatedly(Return( + std::make_tuple(test_state.validator_groups.groups, + runtime::GroupDescriptor{ + .session_start_block = + test_state.validator_groups.group_rotation + .session_start_block, + .group_rotation_frequency = + test_state.validator_groups.group_rotation + .group_rotation_frequency, + .now_block_num = number, + }))); + + EXPECT_CALL(*parachain_host_, availability_cores(hash)) + .WillRepeatedly(Return(test_state.availability_cores)); + + EXPECT_CALL(*signer_factory_, at(hash)).WillRepeatedly(Return(signer_)); + EXPECT_CALL(*signer_factory_, getAuthorityValidatorIndex(hash)) + .WillRepeatedly(Return(0)); + + runtime::SessionInfo si; + si.validators = test_state.validators; + si.discovery_keys = test_state.validators; + EXPECT_CALL(*parachain_host_, + session_info(hash, test_state.signing_context.session_index)) + .WillRepeatedly(Return(si)); + + EXPECT_CALL(*parachain_host_, node_features(hash)) + .WillRepeatedly(Return(runtime::NodeFeatures())); + + EXPECT_CALL( + *parachain_host_, + minimum_backing_votes(hash, test_state.signing_context.session_index)) + .WillRepeatedly(Return(test_state.minimum_backing_votes)); + + if (requested_len == 0) { + EXPECT_CALL(*prospective_parachains_, + answerMinimumRelayParentsRequest(leaf_hash)) + .WillRepeatedly(Return(min_relay_parents)); + } + + requested_len += 1; + } + + my_view_observable_->notify(PeerViewMock::EventType::kViewUpdated, update); + } + + runtime::PersistedValidationData dummy_pvd() { + return runtime::PersistedValidationData{ + .parent_head = {7, 8, 9}, + .relay_parent_number = 0, + .relay_parent_storage_root = fromNumber(0), + .max_pov_size = 1024, + }; + } + + template + inline Hash hash_of(T &&t) { + return hasher_->blake2b_256(scale::encode(std::forward(t)).value()); + } + + template + static Hash hash_of(const kagome::crypto::Hasher &hasher, T &&t) { + return hasher.blake2b_256(scale::encode(std::forward(t)).value()); + } + + Hash make_erasure_root( + const TestState &test, + const network::PoV &pov, + const runtime::PersistedValidationData &validation_data) { + const runtime::AvailableData available_data{ + .pov = pov, + .validation_data = validation_data, + }; + + auto chunks = toChunks(test.validators.size(), available_data).value(); + return makeTrieProof(chunks); + } + + void assert_validation_requests( + const runtime::ValidationCode &validation_code) {} + + void assert_hypothetical_membership_requests( + std::vector< + std::pair>>> + expected_requests) { + for (const auto &[request, candidates_membership] : expected_requests) { + EXPECT_CALL(*prospective_parachains_, + answer_hypothetical_membership_request( + request)) // request.candidates, ref + .WillOnce(Return(candidates_membership)); + } + } + + std::vector< + std::pair> + make_hypothetical_membership_response( + const HypotheticalCandidate &hypothetical_candidate, + const Hash &relay_parent_hash) { + return {{hypothetical_candidate, {relay_parent_hash}}}; + } + + void assert_validate_seconded_candidate( + const Hash &relay_parent, + const network::CommittedCandidateReceipt &candidate, + const network::PoV &assert_pov, + const runtime::PersistedValidationData &assert_pvd, + const runtime::ValidationCode &assert_validation_code, + const HeadData &expected_head_data, + bool fetch_pov) { + assert_validation_requests(assert_validation_code); + + const std::pair + pvf_result{network::CandidateCommitments{ + .upward_msgs = {}, + .outbound_hor_msgs = {}, + .opt_para_runtime = std::nullopt, + .para_head = expected_head_data, + .downward_msgs_count = 0, + .watermark = 0, + }, + assert_pvd}; + EXPECT_CALL(*pvf_, + call_pvf(candidate.to_plain(*hasher_), assert_pov, assert_pvd)) + .WillRepeatedly(Return(pvf_result)); + + EXPECT_CALL(*av_store_, + storeData(testing::_, + network::candidateHash(*hasher_, candidate), + testing::_, + assert_pov, + assert_pvd)) + .WillRepeatedly(Return()); + } + + struct TestCandidateBuilder { + ParachainId para_id; + HeadData head_data; + Hash pov_hash; + Hash relay_parent; + Hash erasure_root; + Hash persisted_validation_data_hash; + std::vector validation_code; + + network::CommittedCandidateReceipt build( + const kagome::crypto::Hasher &hasher) { + return network::CommittedCandidateReceipt{ + .descriptor = + network::CandidateDescriptor{ + .para_id = para_id, + .relay_parent = relay_parent, + .reserved_1 = {}, + .persisted_data_hash = persisted_validation_data_hash, + .pov_hash = pov_hash, + .erasure_encoding_root = erasure_root, + .reserved_2 = {}, + .para_head_hash = hash_of(hasher, head_data), + .validation_code_hash = hash_of( + hasher, kagome::runtime::ValidationCode(validation_code)), + }, + .commitments = + network::CandidateCommitments{ + .upward_msgs = {}, + .outbound_hor_msgs = {}, + .opt_para_runtime = std::nullopt, + .para_head = head_data, + .downward_msgs_count = 0, + .watermark = 0, + }, + }; + } + }; +}; + +TEST_F(BackingTest, seconding_sanity_check_allowed_on_all) { + TestState test_state; + + const BlockNumber LEAF_A_BLOCK_NUMBER = 100; + const BlockNumber LEAF_A_ANCESTRY_LEN = 3; + const auto para_id = test_state.chain_ids[0]; + + // `a` is grandparent of `b`. + const auto leaf_a_hash = fromNumber(130); + const auto leaf_a_parent = get_parent_hash(leaf_a_hash); + const TestLeaf test_leaf_a{ + .number = LEAF_A_BLOCK_NUMBER, + .hash = leaf_a_hash, + .min_relay_parents = {{para_id, + LEAF_A_BLOCK_NUMBER - LEAF_A_ANCESTRY_LEN}}, + }; + + const BlockNumber LEAF_B_BLOCK_NUMBER = LEAF_A_BLOCK_NUMBER + 2; + const BlockNumber LEAF_B_ANCESTRY_LEN = 4; + + const auto leaf_b_hash = fromNumber(128); + const TestLeaf test_leaf_b{ + .number = LEAF_B_BLOCK_NUMBER, + .hash = leaf_b_hash, + .min_relay_parents = {{para_id, + LEAF_B_BLOCK_NUMBER - LEAF_B_ANCESTRY_LEN}}, + }; + + sync_state_observable_->notify( + kagome::primitives::events::SyncStateEventType::kSyncState, + kagome::primitives::events::SyncStateEventParams::SYNCHRONIZED); + + activate_leaf(test_leaf_a, test_state); + activate_leaf(test_leaf_b, test_state); + + kagome::network::PoV pov{.payload = {42, 43, 44}}; + const auto pvd = dummy_pvd(); + kagome::runtime::ValidationCode validation_code = {1, 2, 3}; + + const auto &expected_head_data = test_state.head_data[para_id]; + const auto pov_hash = hash_of(pov); + + const auto candidate = + TestCandidateBuilder{ + .para_id = para_id, + .head_data = expected_head_data, + .pov_hash = pov_hash, + .relay_parent = leaf_a_parent, + .erasure_root = make_erasure_root(test_state, pov, pvd), + .persisted_validation_data_hash = hash_of(pvd), + .validation_code = validation_code, + } + .build(*hasher_); + + assert_validate_seconded_candidate(leaf_a_parent, + candidate, + pov, + pvd, + validation_code, + expected_head_data, + false); + + // `seconding_sanity_check` + const HypotheticalCandidateComplete hypothetical_candidate{ + .candidate_hash = network::candidateHash(*hasher_, candidate), + .receipt = candidate, + .persisted_validation_data = pvd, + }; + const IProspectiveParachains::HypotheticalMembershipRequest + expected_request_a{ + .candidates = {hypothetical_candidate}, + .fragment_chain_relay_parent = leaf_a_hash, + }; + const auto expected_response_a = make_hypothetical_membership_response( + hypothetical_candidate, leaf_a_hash); + + const IProspectiveParachains::HypotheticalMembershipRequest + expected_request_b{ + .candidates = {hypothetical_candidate}, + .fragment_chain_relay_parent = leaf_b_hash, + }; + const auto expected_response_b = make_hypothetical_membership_response( + hypothetical_candidate, leaf_b_hash); + + assert_hypothetical_membership_requests( + {{expected_request_a, expected_response_a}, + {expected_request_b, expected_response_b}}); + + const network::CommittedCandidateReceipt receipt{ + .descriptor = candidate.descriptor, + .commitments = + network::CandidateCommitments{ + .upward_msgs = {}, + .outbound_hor_msgs = {}, + .opt_para_runtime = std::nullopt, + .para_head = expected_head_data, + .downward_msgs_count = 0, + .watermark = 0, + }, + }; + network::Statement statement{network::CandidateState{receipt}}; + + IndexedAndSigned signed_statement{ + .payload = + { + .payload = statement, + .ix = 0, + }, + .signature = {}, + }; + + EXPECT_CALL(*signer_, sign(statement)).WillOnce(Return(signed_statement)); + + const auto candidate_hash = network::candidateHash(*hasher_, candidate); + EXPECT_CALL( + *prospective_parachains_, + introduce_seconded_candidate(para_id, + candidate, + testing::_, + candidate_hash)) // request.candidates, ref + .WillOnce(Return(true)); + + EXPECT_CALL(*statement_distribution_, + share_local_statement(leaf_a_parent, testing::_)) + .WillOnce(Return()); + + BackingStore::ImportResult import_result{ + .candidate = candidate_hash, + .group_id = 0, + .validity_votes = 1, + }; + + EXPECT_CALL( + *backing_store_, + put(leaf_a_parent, testing::_, testing::_, signed_statement, testing::_)) + .WillOnce(Return(import_result)); + + BackingStore::StatementInfo stmt_info{ + .group_id = 0, + .candidate = receipt, + .validity_votes = {{0, BackingStore::ValidityVoteIssued{}}}, + }; + EXPECT_CALL(*backing_store_, getCadidateInfo(leaf_a_parent, candidate_hash)) + .WillOnce(Return(std::cref(stmt_info))); + + parachain_processor_->handle_second_message( + candidate.to_plain(*hasher_), pov, pvd, leaf_a_hash); +} diff --git a/test/core/parachain/parachain_test_harness.hpp b/test/core/parachain/parachain_test_harness.hpp index fdc71a7713..6b55d48a04 100644 --- a/test/core/parachain/parachain_test_harness.hpp +++ b/test/core/parachain/parachain_test_harness.hpp @@ -64,6 +64,7 @@ class ProspectiveParachainsTestHarness : public testing::Test { static constexpr uint64_t ALLOWED_ANCESTRY_LEN = 3ull; static constexpr uint32_t MAX_POV_SIZE = 1000000; + static constexpr uint32_t LEGACY_MIN_BACKING_VOTES = 2; Hash hashFromStrData(std::span data) { return ghashFromStrData(hasher_, data); @@ -260,4 +261,9 @@ class ProspectiveParachainsTestHarness : public testing::Test { memset(&h[0], n, 32); return h; } + + static Hash get_parent_hash(const Hash &parent) { + const auto val = *(uint8_t *)&parent[0]; + return fromNumber(val + 1); + } }; diff --git a/test/core/parachain/prospective_parachains.cpp b/test/core/parachain/prospective_parachains.cpp index 16bd50a422..dcde085945 100644 --- a/test/core/parachain/prospective_parachains.cpp +++ b/test/core/parachain/prospective_parachains.cpp @@ -108,11 +108,6 @@ class ProspectiveParachainsTest : public ProspectiveParachainsTestHarness { }; } - static Hash get_parent_hash(const Hash &parent) { - const auto val = *(uint8_t *)&parent[0]; - return fromNumber(val + 1); - } - void handle_leaf_activation_2( const network::ExView &update, const TestLeaf &leaf, @@ -191,7 +186,8 @@ class ProspectiveParachainsTest : public ProspectiveParachainsTestHarness { .digest = {}, .hash_opt = {}, }; - EXPECT_CALL(*block_tree_, tryGetBlockHeader(h_)).WillRepeatedly(Return(h)); + EXPECT_CALL(*block_tree_, tryGetBlockHeader(h_)) + .WillRepeatedly(Return(h)); EXPECT_CALL(*parachain_api_, session_index_for_child(h_)) .WillRepeatedly(Return(outcome::success(1))); used_relay_parents.emplace(h_); @@ -416,7 +412,7 @@ TEST_F(ProspectiveParachainsTest, EXPECT_CALL(*parachain_api_, staging_async_backing_params(hash)) .WillRepeatedly( - Return(outcome::failure(ParachainProcessorImpl::Error::NO_STATE))); + Return(outcome::failure(ParachainProcessor::Error::NO_STATE))); std::ignore = prospective_parachain_->onActiveLeavesUpdate(network::ExViewRef{ .new_head = {update.new_head}, diff --git a/test/mock/core/network/peer_view_mock.hpp b/test/mock/core/network/peer_view_mock.hpp new file mode 100644 index 0000000000..692e18fcb9 --- /dev/null +++ b/test/mock/core/network/peer_view_mock.hpp @@ -0,0 +1,37 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include "network/peer_view.hpp" + +#include + +namespace kagome::network { + class PeerViewMock : public IPeerView { + public: + MOCK_METHOD(size_t, peersCount, (), (const, override)); + + MOCK_METHOD(MyViewSubscriptionEnginePtr, + getMyViewObservable, + (), + (override)); + + MOCK_METHOD(PeerViewSubscriptionEnginePtr, + getRemoteViewObservable, + (), + (override)); + + MOCK_METHOD(void, removePeer, (const PeerId &), (override)); + + MOCK_METHOD(void, + updateRemoteView, + (const PeerId &, network::View &&), + (override)); + + MOCK_METHOD(const View &, getMyView, (), (const, override)); + }; +} // namespace kagome::network diff --git a/test/mock/core/parachain/bitfield_signer_mock.hpp b/test/mock/core/parachain/bitfield_signer_mock.hpp new file mode 100644 index 0000000000..ef956a5921 --- /dev/null +++ b/test/mock/core/parachain/bitfield_signer_mock.hpp @@ -0,0 +1,27 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include + +#include "parachain/availability/bitfield/signer.hpp" + +namespace kagome::parachain { + + class BitfieldSignerMock : public IBitfieldSigner { + public: + MOCK_METHOD(void, start, (), (override)); + + MOCK_METHOD(outcome::result, + sign, + (const IValidatorSigner &, const Candidates &), + (override)); + + MOCK_METHOD(void, setBroadcastCallback, (BroadcastCallback &&), (override)); + }; + +} // namespace kagome::parachain diff --git a/test/mock/core/parachain/prospective_parachains_mock.hpp b/test/mock/core/parachain/prospective_parachains_mock.hpp new file mode 100644 index 0000000000..e16b69275a --- /dev/null +++ b/test/mock/core/parachain/prospective_parachains_mock.hpp @@ -0,0 +1,100 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include "parachain/validator/prospective_parachains/prospective_parachains.hpp" + +#include + +namespace kagome::parachain { + + class ProspectiveParachainsMock : public IProspectiveParachains { + public: + MOCK_METHOD(void, printStoragesLoad, (), (override)); + + MOCK_METHOD(std::shared_ptr, + getBlockTree, + (), + (override)); + + MOCK_METHOD((std::vector>), + answerMinimumRelayParentsRequest, + (const RelayHash &), + (override)); + + MOCK_METHOD( + (std::vector>), + answerGetBackableCandidates, + (const RelayHash &, ParachainId, uint32_t, const fragment::Ancestors &), + (override)); + + MOCK_METHOD( + outcome::result>, + answerProspectiveValidationDataRequest, + (const RelayHash &, const ParentHeadData &, ParachainId), + (override)); + + MOCK_METHOD(std::optional, + prospectiveParachainsMode, + (const RelayHash &), + (override)); + + MOCK_METHOD(outcome::result, + onActiveLeavesUpdate, + (const network::ExViewRef &), + (override)); + + std::vector< + std::pair> + answer_hypothetical_membership_request( + const std::span &candidates, + const std::optional> + &fragment_tree_relay_parent) override { + IProspectiveParachains::HypotheticalMembershipRequest request; + for (const auto &c : candidates) { + request.candidates.emplace_back(c); + } + if (fragment_tree_relay_parent) { + request.fragment_chain_relay_parent = fragment_tree_relay_parent->get(); + } + return answer_hypothetical_membership_request(request); + } + + MOCK_METHOD((std::vector>), + answer_hypothetical_membership_request, + (const IProspectiveParachains::HypotheticalMembershipRequest &), + ()); + + MOCK_METHOD(void, + candidate_backed, + (ParachainId, const CandidateHash &), + (override)); + + MOCK_METHOD(bool, + introduce_seconded_candidate, + (ParachainId, + const network::CommittedCandidateReceipt &, + (const crypto::Hashed> &), + const CandidateHash &), + (override)); + }; + +} // namespace kagome::parachain + +namespace boost { + inline auto &operator<<(std::ostream &s, + const kagome::parachain::ParentHeadData &) { + return s; + } + inline auto &operator<<(std::ostream &s, + const kagome::parachain::HypotheticalCandidate &) { + return s; + } +} // namespace boost diff --git a/test/mock/core/parachain/pvf_mock.hpp b/test/mock/core/parachain/pvf_mock.hpp new file mode 100644 index 0000000000..0df6ded28d --- /dev/null +++ b/test/mock/core/parachain/pvf_mock.hpp @@ -0,0 +1,50 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include + +#include "parachain/pvf/pvf.hpp" + +namespace kagome::parachain { + + class PvfMock : public Pvf { + public: + void pvf(const CandidateReceipt &receipt, + const ParachainBlock &pov, + const runtime::PersistedValidationData &pvd, + Cb cb) const override { + cb(call_pvf(receipt, pov, pvd)); + } + + void pvfValidate(const PersistedValidationData &pvd, + const ParachainBlock &pb, + const CandidateReceipt &r, + const ParachainRuntime &pr, + runtime::PvfExecTimeoutKind kind, + Cb cb) const override { + cb(call_pvfValidate(pvd, pb, r, pr, kind)); + } + + MOCK_METHOD(Result, + call_pvf, + (const CandidateReceipt &, + const ParachainBlock &, + const runtime::PersistedValidationData &), + (const)); + + MOCK_METHOD(Result, + call_pvfValidate, + (const PersistedValidationData &, + const ParachainBlock &, + const CandidateReceipt &, + const ParachainRuntime &, + runtime::PvfExecTimeoutKind), + (const)); + }; + +} // namespace kagome::parachain diff --git a/test/mock/core/parachain/pvf_precheck_mock.hpp b/test/mock/core/parachain/pvf_precheck_mock.hpp new file mode 100644 index 0000000000..e6b4236c0a --- /dev/null +++ b/test/mock/core/parachain/pvf_precheck_mock.hpp @@ -0,0 +1,20 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include + +#include "parachain/pvf/precheck.hpp" + +namespace kagome::parachain { + + class PvfPrecheckMock : public IPvfPrecheck { + public: + MOCK_METHOD(void, start, (), (override)); + }; + +} // namespace kagome::parachain diff --git a/test/mock/core/parachain/signer_factory_mock.hpp b/test/mock/core/parachain/signer_factory_mock.hpp new file mode 100644 index 0000000000..676549f672 --- /dev/null +++ b/test/mock/core/parachain/signer_factory_mock.hpp @@ -0,0 +1,56 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include + +#include "parachain/validator/signer.hpp" + +namespace kagome::parachain { + + class ValidatorSignerMock : public IValidatorSigner { + public: + MOCK_METHOD(outcome::result>, + sign, + (const network::Statement &), + (const, override)); + + MOCK_METHOD(outcome::result>, + sign, + (const scale::BitVec &), + (const, override)); + + MOCK_METHOD(ValidatorIndex, validatorIndex, (), (const, override)); + + MOCK_METHOD(SessionIndex, getSessionIndex, (), (const, override)); + + MOCK_METHOD(const primitives::BlockHash &, + relayParent, + (), + (const, override)); + + MOCK_METHOD(outcome::result, + signRaw, + (common::BufferView), + (const, override)); + }; + + class ValidatorSignerFactoryMock : public IValidatorSignerFactory { + public: + MOCK_METHOD( + outcome::result>>, + at, + (const primitives::BlockHash &), + (override)); + + MOCK_METHOD(outcome::result>, + getAuthorityValidatorIndex, + (const primitives::BlockHash &), + (override)); + }; + +} // namespace kagome::parachain diff --git a/test/mock/core/parachain/statement_distribution_mock.hpp b/test/mock/core/parachain/statement_distribution_mock.hpp new file mode 100644 index 0000000000..990fda3ad5 --- /dev/null +++ b/test/mock/core/parachain/statement_distribution_mock.hpp @@ -0,0 +1,59 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include + +#include "parachain/validator/statement_distribution/i_statement_distribution.hpp" + +namespace kagome::parachain::statement_distribution { + + class StatementDistributionMock : public IStatementDistribution { + public: + MOCK_METHOD(void, + OnFetchAttestedCandidateRequest, + (const network::vstaging::AttestedCandidateRequest &, + std::shared_ptr), + (override)); + + MOCK_METHOD(void, + store_parachain_processor, + (std::weak_ptr), + (override)); + + MOCK_METHOD(void, + handle_incoming_manifest, + (const libp2p::peer::PeerId &, + const network::vstaging::BackedCandidateManifest &), + (override)); + + MOCK_METHOD(void, + handle_incoming_acknowledgement, + (const libp2p::peer::PeerId &, + const network::vstaging::BackedCandidateAcknowledgement &), + (override)); + + MOCK_METHOD( + void, + handle_incoming_statement, + (const libp2p::peer::PeerId &, + const network::vstaging::StatementDistributionMessageStatement &), + (override)); + + MOCK_METHOD(void, + handle_backed_candidate_message, + (const CandidateHash &), + (override)); + + MOCK_METHOD(void, + share_local_statement, + (const primitives::BlockHash &, + const SignedFullStatementWithPVD &), + (override)); + }; + +} // namespace kagome::parachain::statement_distribution