diff --git a/validator/fabric.h b/validator/fabric.h index 80d962e09..474180472 100644 --- a/validator/fabric.h +++ b/validator/fabric.h @@ -26,6 +26,8 @@ namespace ton { namespace validator { +enum CollateMode { skip_store_candidate = 1 }; + td::actor::ActorOwn create_db_actor(td::actor::ActorId manager, std::string db_root_, td::Ref opts); td::actor::ActorOwn create_liteserver_cache_actor(td::actor::ActorId manager, @@ -81,7 +83,8 @@ void run_validate_query(ShardIdFull shard, BlockIdExt min_masterchain_block_id, void run_collate_query(ShardIdFull shard, const BlockIdExt& min_masterchain_block_id, std::vector prev, Ed25519_PublicKey creator, td::Ref validator_set, td::Ref collator_opts, td::actor::ActorId manager, - td::Timestamp timeout, td::Promise promise, int attempt_idx = 0); + td::Timestamp timeout, td::Promise promise, + td::CancellationToken cancellation_token, unsigned mode, int attempt_idx = 0); void run_collate_hardfork(ShardIdFull shard, const BlockIdExt& min_masterchain_block_id, std::vector prev, td::actor::ActorId manager, td::Timestamp timeout, td::Promise promise); diff --git a/validator/impl/collator-impl.h b/validator/impl/collator-impl.h index 3bfd53fed..ea1695f35 100644 --- a/validator/impl/collator-impl.h +++ b/validator/impl/collator-impl.h @@ -76,6 +76,7 @@ class Collator final : public td::actor::Actor { td::Timestamp timeout; td::Timestamp queue_cleanup_timeout_, soft_timeout_, medium_timeout_; td::Promise main_promise; + unsigned mode_ = 0; int attempt_idx_; bool allow_repeat_collation_ = false; ton::BlockSeqno last_block_seqno{0}; @@ -93,7 +94,7 @@ class Collator final : public td::actor::Actor { Collator(ShardIdFull shard, bool is_hardfork, BlockIdExt min_masterchain_block_id, std::vector prev, Ref validator_set, Ed25519_PublicKey collator_id, Ref collator_opts, td::actor::ActorId manager, td::Timestamp timeout, td::Promise promise, - int attempt_idx); + td::CancellationToken cancellation_token, unsigned mode, int attempt_idx); ~Collator() override = default; bool is_busy() const { return busy_; @@ -352,10 +353,14 @@ class Collator final : public td::actor::Actor { bool create_block(); Ref collate_shard_block_descr_set(); bool create_collated_data(); + bool create_block_candidate(); void return_block_candidate(td::Result saved); bool update_last_proc_int_msg(const std::pair& new_lt_hash); + td::CancellationToken cancellation_token_; + bool check_cancelled(); + public: static td::uint32 get_skip_externals_queue_size(); diff --git a/validator/impl/collator.cpp b/validator/impl/collator.cpp index f3b7ce67e..be9211708 100644 --- a/validator/impl/collator.cpp +++ b/validator/impl/collator.cpp @@ -76,12 +76,15 @@ static inline bool dbg(int c) { * @param manager The ActorId of the ValidatorManager. * @param timeout The timeout for the collator. * @param promise The promise to return the result. + * @param cancellation_token Token to cancel collation. + * @param mode +1 - skip storing candidate to disk. * @param attempt_idx The index of the attempt, starting from 0. On later attempts collator decreases block limits and skips some steps. */ Collator::Collator(ShardIdFull shard, bool is_hardfork, BlockIdExt min_masterchain_block_id, std::vector prev, td::Ref validator_set, Ed25519_PublicKey collator_id, Ref collator_opts, td::actor::ActorId manager, - td::Timestamp timeout, td::Promise promise, int attempt_idx) + td::Timestamp timeout, td::Promise promise, td::CancellationToken cancellation_token, + unsigned mode, int attempt_idx) : shard_(shard) , is_hardfork_(is_hardfork) , min_mc_block_id{min_masterchain_block_id} @@ -96,10 +99,13 @@ Collator::Collator(ShardIdFull shard, bool is_hardfork, BlockIdExt min_mastercha , soft_timeout_(td::Timestamp::at(timeout.at() - 3.0)) , medium_timeout_(td::Timestamp::at(timeout.at() - 1.5)) , main_promise(std::move(promise)) + , mode_(mode) , attempt_idx_(attempt_idx) - , perf_timer_("collate", 0.1, [manager](double duration) { - send_closure(manager, &ValidatorManager::add_perf_timer_stat, "collate", duration); - }) { + , perf_timer_("collate", 0.1, + [manager](double duration) { + send_closure(manager, &ValidatorManager::add_perf_timer_stat, "collate", duration); + }) + , cancellation_token_(std::move(cancellation_token)) { } /** @@ -113,6 +119,9 @@ Collator::Collator(ShardIdFull shard, bool is_hardfork, BlockIdExt min_mastercha void Collator::start_up() { LOG(WARNING) << "Collator for shard " << shard_.to_str() << " started" << (attempt_idx_ ? PSTRING() << " (attempt #" << attempt_idx_ << ")" : ""); + if (!check_cancelled()) { + return; + } LOG(DEBUG) << "Previous block #1 is " << prev_blocks.at(0).to_str(); if (prev_blocks.size() > 1) { LOG(DEBUG) << "Previous block #2 is " << prev_blocks.at(1).to_str(); @@ -345,10 +354,12 @@ bool Collator::fatal_error(td::Status error) { error.ensure_error(); LOG(ERROR) << "cannot generate block candidate for " << show_shard(shard_) << " : " << error.to_string(); if (busy_) { - if (allow_repeat_collation_ && attempt_idx_ + 1 < MAX_ATTEMPTS && !is_hardfork_ && !timeout.is_in_past()) { + if (allow_repeat_collation_ && error.code() != ErrorCode::cancelled && attempt_idx_ + 1 < MAX_ATTEMPTS && + !is_hardfork_ && !timeout.is_in_past()) { LOG(WARNING) << "Repeating collation (attempt #" << attempt_idx_ + 1 << ")"; run_collate_query(shard_, min_mc_block_id, prev_blocks, created_by_, validator_set_, collator_opts_, manager, - td::Timestamp::in(10.0), std::move(main_promise), attempt_idx_ + 1); + td::Timestamp::in(10.0), std::move(main_promise), std::move(cancellation_token_), mode_, + attempt_idx_ + 1); } else { main_promise(std::move(error)); } @@ -393,6 +404,9 @@ bool Collator::fatal_error(std::string err_msg, int err_code) { */ void Collator::check_pending() { // LOG(DEBUG) << "pending = " << pending; + if (!check_cancelled()) { + return; + } if (!pending) { step = 2; try { @@ -2354,6 +2368,9 @@ bool Collator::out_msg_queue_cleanup() { LOG(WARNING) << "cleaning up outbound queue takes too long, ending"; break; } + if (!check_cancelled()) { + return false; + } if (i == queue_parts.size()) { i = 0; } @@ -3553,6 +3570,9 @@ bool Collator::process_inbound_internal_messages() { stats_.limits_log += PSTRING() << "INBOUND_INT_MESSAGES: timeout\n"; break; } + if (!check_cancelled()) { + return false; + } auto kv = nb_out_msgs_->extract_cur(); CHECK(kv && kv->msg.not_null()); LOG(DEBUG) << "processing inbound message with (lt,hash)=(" << kv->lt << "," << kv->key.to_hex() @@ -3611,6 +3631,9 @@ bool Collator::process_inbound_external_messages() { stats_.limits_log += PSTRING() << "INBOUND_EXT_MESSAGES: timeout\n"; break; } + if (!check_cancelled()) { + return false; + } auto ext_msg = ext_msg_struct.cell; ton::Bits256 hash{ext_msg->get_hash().bits()}; int r = process_external_message(std::move(ext_msg)); @@ -4156,6 +4179,9 @@ bool Collator::process_new_messages(bool enqueue_only) { stats_.limits_log += PSTRING() << "NEW_MESSAGES: " << block_full_comment(*block_limit_status_, block::ParamLimits::cl_normal) << "\n"; } + if (!check_cancelled()) { + return false; + } LOG(DEBUG) << "have message with lt=" << msg.lt; int res = process_one_new_message(std::move(msg), enqueue_only); if (res < 0) { @@ -5532,14 +5558,18 @@ bool Collator::create_block_candidate() { << consensus_config.max_collated_data_size << ")"); } // 4. save block candidate - LOG(INFO) << "saving new BlockCandidate"; - td::actor::send_closure_later( - manager, &ValidatorManager::set_block_candidate, block_candidate->id, block_candidate->clone(), - validator_set_->get_catchain_seqno(), validator_set_->get_validator_set_hash(), - [self = get_self()](td::Result saved) -> void { - LOG(DEBUG) << "got answer to set_block_candidate"; - td::actor::send_closure_later(std::move(self), &Collator::return_block_candidate, std::move(saved)); - }); + if (mode_ & CollateMode::skip_store_candidate) { + td::actor::send_closure_later(actor_id(this), &Collator::return_block_candidate, td::Unit()); + } else { + LOG(INFO) << "saving new BlockCandidate"; + td::actor::send_closure_later( + manager, &ValidatorManager::set_block_candidate, block_candidate->id, block_candidate->clone(), + validator_set_->get_catchain_seqno(), validator_set_->get_validator_set_hash(), + [self = get_self()](td::Result saved) -> void { + LOG(DEBUG) << "got answer to set_block_candidate"; + td::actor::send_closure_later(std::move(self), &Collator::return_block_candidate, std::move(saved)); + }); + } // 5. communicate about bad and delayed external messages if (!bad_ext_msgs_.empty() || !delay_ext_msgs_.empty()) { LOG(INFO) << "sending complete_external_messages() to Manager"; @@ -5683,6 +5713,18 @@ void Collator::after_get_external_messages(td::Result prev, Ed25519_PublicKey creator, td::Ref validator_set, td::Ref collator_opts, td::actor::ActorId manager, - td::Timestamp timeout, td::Promise promise, int attempt_idx) { + td::Timestamp timeout, td::Promise promise, + td::CancellationToken cancellation_token, unsigned mode, int attempt_idx) { BlockSeqno seqno = 0; for (auto& p : prev) { if (p.seqno() > seqno) { @@ -225,7 +226,7 @@ void run_collate_query(ShardIdFull shard, const BlockIdExt& min_masterchain_bloc << (attempt_idx ? "_" + td::to_string(attempt_idx) : ""), shard, false, min_masterchain_block_id, std::move(prev), std::move(validator_set), creator, std::move(collator_opts), std::move(manager), timeout, std::move(promise), - attempt_idx) + std::move(cancellation_token), mode, attempt_idx) .release(); } @@ -241,7 +242,7 @@ void run_collate_hardfork(ShardIdFull shard, const BlockIdExt& min_masterchain_b td::actor::create_actor(PSTRING() << "collate" << shard.to_str() << ":" << (seqno + 1), shard, true, min_masterchain_block_id, std::move(prev), td::Ref{}, Ed25519_PublicKey{Bits256::zero()}, td::Ref{true}, - std::move(manager), timeout, std::move(promise), 0) + std::move(manager), timeout, std::move(promise), td::CancellationToken{}, 0, 0) .release(); } diff --git a/validator/manager-disk.cpp b/validator/manager-disk.cpp index 4c74ec60b..3418608d1 100644 --- a/validator/manager-disk.cpp +++ b/validator/manager-disk.cpp @@ -129,7 +129,7 @@ void ValidatorManagerImpl::sync_complete(td::Promise promise) { Ed25519_PublicKey created_by{td::Bits256::zero()}; td::as(created_by.as_bits256().data() + 32 - 4) = ((unsigned)std::time(nullptr) >> 8); run_collate_query(shard_id, last_masterchain_block_id_, prev, created_by, val_set, td::Ref{true}, - actor_id(this), td::Timestamp::in(10.0), std::move(P)); + actor_id(this), td::Timestamp::in(10.0), std::move(P), td::CancellationToken{}, 0); } void ValidatorManagerImpl::validate_fake(BlockCandidate candidate, std::vector prev, BlockIdExt last, diff --git a/validator/validator-group.cpp b/validator/validator-group.cpp index d589758a5..2f43b3b95 100644 --- a/validator/validator-group.cpp +++ b/validator/validator-group.cpp @@ -55,7 +55,7 @@ void ValidatorGroup::generate_block_candidate( validator_set_, opts_->get_collator_options(), manager_, td::Timestamp::in(10.0), [SelfId = actor_id(this), cache = cached_collated_block_](td::Result R) { td::actor::send_closure(SelfId, &ValidatorGroup::generated_block_candidate, std::move(cache), std::move(R)); - }); + }, cancellation_token_source_.get_cancellation_token(), /* mode = */ 0); } void ValidatorGroup::generated_block_candidate(std::shared_ptr cache, td::Result R) { @@ -188,6 +188,7 @@ void ValidatorGroup::accept_block_candidate(td::uint32 round_id, PublicKeyHash s prev_block_ids_ = std::vector{next_block_id}; cached_collated_block_ = nullptr; approved_candidates_cache_.clear(); + cancellation_token_source_.cancel(); } void ValidatorGroup::retry_accept_block_query(BlockIdExt block_id, td::Ref block, @@ -433,6 +434,7 @@ void ValidatorGroup::destroy() { delay_action([ses]() mutable { td::actor::send_closure(ses, &validatorsession::ValidatorSession::destroy); }, td::Timestamp::in(10.0)); } + cancellation_token_source_.cancel(); stop(); } diff --git a/validator/validator-group.hpp b/validator/validator-group.hpp index 16e13b4b6..35d574ac3 100644 --- a/validator/validator-group.hpp +++ b/validator/validator-group.hpp @@ -138,6 +138,7 @@ class ValidatorGroup : public td::actor::Actor { std::vector> promises; }; std::shared_ptr cached_collated_block_; + td::CancellationTokenSource cancellation_token_source_; void generated_block_candidate(std::shared_ptr cache, td::Result R);