Skip to content

Commit

Permalink
[TEST] ParachainProcessorEmpty
Browse files Browse the repository at this point in the history
Signed-off-by: iceseer <[email protected]>
  • Loading branch information
iceseer committed Dec 18, 2024
1 parent d018efa commit 4f259e0
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 12 deletions.
8 changes: 4 additions & 4 deletions core/injector/application_injector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -801,15 +801,15 @@ namespace {
di::bind<parachain::IProspectiveParachains>.template to<parachain::ProspectiveParachains>(),
bind_by_lambda<parachain::BackedCandidatesSource>(
[](const auto &injector) {
return injector.template create<sptr<parachain::ParachainProcessorImpl>>();
return injector.template create<sptr<parachain::ParachainProcessorEmpty>>();
}),
bind_by_lambda<parachain::ParachainStorage>(
[](const auto &injector) {
return injector.template create<sptr<parachain::ParachainProcessorImpl>>();
return injector.template create<sptr<parachain::ParachainProcessorEmpty>>();
}),
bind_by_lambda<parachain::ParachainProcessor>(
[](const auto &injector) {
return injector.template create<sptr<parachain::ParachainProcessorImpl>>();
return injector.template create<sptr<parachain::ParachainProcessorEmpty>>();
}),
di::bind<parachain::IPvfPrecheck>.template to<parachain::PvfPrecheck>(),
bind_by_lambda<network::CanDisconnect>(
Expand Down Expand Up @@ -1039,7 +1039,7 @@ namespace kagome::injector {
std::shared_ptr<parachain::ParachainProcessor>
KagomeNodeInjector::injectParachainProcessor() {
return pimpl_->injector_
.template create<sptr<parachain::ParachainProcessorImpl>>();
.template create<sptr<parachain::ParachainProcessorEmpty>>();
}

std::shared_ptr<parachain::statement_distribution::StatementDistribution>
Expand Down
165 changes: 157 additions & 8 deletions core/parachain/validator/parachain_processor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
#include "parachain/validator/backed_candidates_source.hpp"
#include "parachain/validator/backing_implicit_view.hpp"
#include "parachain/validator/collations.hpp"
#include "parachain/validator/i_parachain_processor.hpp"
#include "parachain/validator/parachain_storage.hpp"
#include "parachain/validator/prospective_parachains/prospective_parachains.hpp"
#include "parachain/validator/signer.hpp"
#include "parachain/validator/statement_distribution/i_statement_distribution.hpp"
Expand All @@ -44,8 +46,6 @@
#include "primitives/event_types.hpp"
#include "utils/non_copyable.hpp"
#include "utils/safe_object.hpp"
#include "parachain/validator/i_parachain_processor.hpp"
#include "parachain/validator/parachain_storage.hpp"

/**
* @file parachain_processor_impl.hpp
Expand Down Expand Up @@ -119,6 +119,156 @@ struct std::hash<kagome::parachain::BlockedCollationId> {

namespace kagome::parachain {

class ParachainProcessorEmpty
: public ParachainStorageImpl,
public std::enable_shared_from_this<ParachainProcessorEmpty>,
public BackedCandidatesSource,
public ParachainProcessor {
struct Status {
size_t legacy_statement_counter_ = 0;
size_t ack_counter_ = 0;
size_t manifest_counter_ = 0;
size_t statement_counter_ = 0;
};

SafeObject<Status> status_;
log::Logger logger_ =
log::createLogger("ParachainProcessorEmpty", "parachain");
std::shared_ptr<statement_distribution::IStatementDistribution>
statement_distribution_;

void process_vstaging_statement(
const libp2p::peer::PeerId &peer_id,
const network::vstaging::StatementDistributionMessage &msg) {
SL_TRACE(logger_,
"Incoming `StatementDistributionMessage`. (peer={})",
peer_id);

if (auto inner =
if_type<const network::vstaging::BackedCandidateAcknowledgement>(
msg)) {
status_.exclusiveAccess([](Status &status) { ++status.ack_counter_; });
statement_distribution_->handle_incoming_acknowledgement(peer_id,
inner->get());
} else if (auto manifest =
if_type<const network::vstaging::BackedCandidateManifest>(
msg)) {
status_.exclusiveAccess(
[](Status &status) { ++status.manifest_counter_; });
statement_distribution_->handle_incoming_manifest(peer_id,
manifest->get());
} else if (auto stm =
if_type<const network::vstaging::
StatementDistributionMessageStatement>(msg)) {
status_.exclusiveAccess(
[](Status &status) { ++status.statement_counter_; });
statement_distribution_->handle_incoming_statement(peer_id, stm->get());
} else {
SL_ERROR(logger_, "Skipped message.");
}
}

void process_legacy_statement(
const libp2p::peer::PeerId &peer_id,
const network::StatementDistributionMessage &msg) {
status_.exclusiveAccess(
[](Status &status) { ++status.legacy_statement_counter_; });
}

public:
ParachainProcessorEmpty(
application::AppStateManager &app_state_manager,
std::shared_ptr<parachain::AvailabilityStore> av_store,
std::shared_ptr<statement_distribution::IStatementDistribution>
statement_distribution)
: ParachainStorageImpl(std::move(av_store)),
statement_distribution_(statement_distribution) {
app_state_manager.takeControl(*this);
}

bool prepare() {
statement_distribution_->store_parachain_processor(weak_from_this());
std::thread t4([wself{weak_from_this()}]() {
auto prev = std::chrono::steady_clock::now();
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(2));
if (auto self = wself.lock()) {
const auto now = std::chrono::steady_clock::now();
const auto diff =
std::chrono::duration_cast<std::chrono::minutes>(now - prev)
.count();
if (diff > 0) {
self->status_.exclusiveAccess([&](Status &status) {
SL_TRACE(
self->logger_,
"[STATISTICS]:\nlegacy_statement_counter:{}\nack_counter:{}"
"\nmanifest_counter:{}\nstatement_counter:{}",
status.legacy_statement_counter_,
status.ack_counter_,
status.manifest_counter_,
status.statement_counter_);

status.legacy_statement_counter_ = 0;
status.ack_counter_ = 0;
status.manifest_counter_ = 0;
status.statement_counter_ = 0;
});
prev = now;
}
} else {
break;
}
}
});
t4.detach();
return true;
}

void onValidationProtocolMsg(
const libp2p::peer::PeerId &peer_id,
const network::VersionedValidatorProtocolMessage &message) override {
SL_TRACE(
logger_, "Incoming validator protocol message. (peer={})", peer_id);
visit_in_place(
message,
[&](const network::ValidatorProtocolMessage &m) {
SL_TRACE(logger_, "V1");
visit_in_place(
m,
[&](const network::StatementDistributionMessage &val) {
process_legacy_statement(peer_id, val);
},
[&](const auto &) {});
},
[&](const network::vstaging::ValidatorProtocolMessage &m) {
SL_TRACE(logger_, "V2");
visit_in_place(
m,
[&](const network::vstaging::StatementDistributionMessage
&val) { process_vstaging_statement(peer_id, val); },
[&](const auto &) {});
},
[&](const auto &m) { SL_WARN(logger_, "UNSUPPORTED Version"); });
}

void handle_advertisement(const RelayHash &relay_parent,
const libp2p::peer::PeerId &peer_id,
std::optional<std::pair<CandidateHash, Hash>>
&&prospective_candidate) override {}

void onIncomingCollator(const libp2p::peer::PeerId &peer_id,
network::CollatorPublicKey pubkey,
network::ParachainId para_id) override {}

outcome::result<void> canProcessParachains() const override {
return outcome::success();
}

void handleStatement(const primitives::BlockHash &relay_parent,
const SignedFullStatementWithPVD &statement) override {
}
};

class ParachainProcessorImpl
: public ParachainStorageImpl,
public std::enable_shared_from_this<ParachainProcessorImpl>,
Expand Down Expand Up @@ -254,10 +404,10 @@ namespace kagome::parachain {
* @param prospective_candidate An optional pair containing the hash of the
* prospective candidate and the hash of the parent block.
*/
void handle_advertisement(
const RelayHash &relay_parent,
const libp2p::peer::PeerId &peer_id,
std::optional<std::pair<CandidateHash, Hash>> &&prospective_candidate) override;
void handle_advertisement(const RelayHash &relay_parent,
const libp2p::peer::PeerId &peer_id,
std::optional<std::pair<CandidateHash, Hash>>
&&prospective_candidate) override;

/**
* @brief This function is used to make a candidate available for
Expand Down Expand Up @@ -303,7 +453,6 @@ namespace kagome::parachain {
std::vector<network::BackedCandidate> getBackedCandidates(
const RelayHash &relay_parent) override;


auto getAvStore() {
return av_store_;
}
Expand All @@ -325,7 +474,7 @@ namespace kagome::parachain {
* SignedFullStatementWithPVD object.
*/
void handleStatement(const primitives::BlockHash &relay_parent,
const SignedFullStatementWithPVD &statement) override;
const SignedFullStatementWithPVD &statement) override;

private:
enum struct StatementType { kSeconded = 0, kValid };
Expand Down

0 comments on commit 4f259e0

Please sign in to comment.