From bf572f9599f92841581de08092db46eb911651e7 Mon Sep 17 00:00:00 2001
From: birydrad <>
Date: Tue, 26 Nov 2024 14:01:20 +0400
Subject: [PATCH 1/5] optimistic out-msg-queue broadcast

---
 tdutils/td/utils/StringBuilder.h           |  15 ++
 tdutils/td/utils/Timer.cpp                 |  39 +++-
 tdutils/td/utils/Timer.h                   |  43 ++++-
 tdutils/td/utils/logging.h                 |  10 +-
 tl/generate/scheme/ton_api.tl              |   6 +-
 tl/generate/scheme/ton_api.tlo             | Bin 108568 -> 108888 bytes
 ton/ton-types.h                            |  49 ++++-
 validator/collation-manager.cpp            |  11 +-
 validator/collation-manager.hpp            |   6 +-
 validator/collator-node.cpp                | 119 ++++++++++--
 validator/collator-node.hpp                |   7 +-
 validator/full-node-fast-sync-overlays.cpp |  43 +++++
 validator/full-node-fast-sync-overlays.hpp |   2 +
 validator/full-node-shard.hpp              |   3 +
 validator/full-node.cpp                    |  11 ++
 validator/full-node.hpp                    |   1 +
 validator/impl/collator-impl.h             |  15 +-
 validator/impl/collator.cpp                | 205 ++++++++++++++-------
 validator/impl/out-msg-queue-proof.cpp     | 113 ++++++++++--
 validator/impl/out-msg-queue-proof.hpp     |  18 +-
 validator/manager.cpp                      |  17 +-
 validator/manager.hpp                      |   1 +
 validator/validator-group.cpp              |   8 +-
 validator/validator.h                      |   7 +
 24 files changed, 623 insertions(+), 126 deletions(-)

diff --git a/tdutils/td/utils/StringBuilder.h b/tdutils/td/utils/StringBuilder.h
index 99e9d5172..685416fe3 100644
--- a/tdutils/td/utils/StringBuilder.h
+++ b/tdutils/td/utils/StringBuilder.h
@@ -149,4 +149,19 @@ std::enable_if_t<std::is_arithmetic<T>::value, string> to_string(const T &x) {
   return sb.as_cslice().str();
 }
 
+template <class SB>
+struct LambdaPrintHelper {
+  SB& sb;
+};
+template <class SB, class F>
+SB& operator<<(const LambdaPrintHelper<SB>& helper, F&& f) {
+  f(helper.sb);
+  return helper.sb;
+}
+struct LambdaPrint {};
+
+inline LambdaPrintHelper<td::StringBuilder> operator<<(td::StringBuilder& sb, const LambdaPrint&) {
+  return LambdaPrintHelper<td::StringBuilder>{sb};
+}
+
 }  // namespace td
diff --git a/tdutils/td/utils/Timer.cpp b/tdutils/td/utils/Timer.cpp
index 24de099aa..c2c678955 100644
--- a/tdutils/td/utils/Timer.cpp
+++ b/tdutils/td/utils/Timer.cpp
@@ -22,6 +22,8 @@
 #include "td/utils/logging.h"
 #include "td/utils/Time.h"
 
+#include <numeric>
+
 namespace td {
 
 Timer::Timer(bool is_paused) : is_paused_(is_paused) {
@@ -60,12 +62,15 @@ StringBuilder &operator<<(StringBuilder &string_builder, const Timer &timer) {
   return string_builder << format::as_time(timer.elapsed());
 }
 
-PerfWarningTimer::PerfWarningTimer(string name, double max_duration, std::function<void(double)>&& callback)
+PerfWarningTimer::PerfWarningTimer(string name, double max_duration, std::function<void(double)> &&callback)
     : name_(std::move(name)), start_at_(Time::now()), max_duration_(max_duration), callback_(std::move(callback)) {
 }
 
 PerfWarningTimer::PerfWarningTimer(PerfWarningTimer &&other)
-    : name_(std::move(other.name_)), start_at_(other.start_at_), max_duration_(other.max_duration_), callback_(std::move(other.callback_)) {
+    : name_(std::move(other.name_))
+    , start_at_(other.start_at_)
+    , max_duration_(other.max_duration_)
+    , callback_(std::move(other.callback_)) {
   other.start_at_ = 0;
 }
 
@@ -134,4 +139,34 @@ double ThreadCpuTimer::elapsed() const {
   return res;
 }
 
+PerfLogAction PerfLog::start_action(std::string name) {
+  auto i = entries_.size();
+  entries_.push_back({.name = std::move(name), .begin = td::Timestamp::now().at()});
+  return PerfLogAction{i, std::unique_ptr<PerfLog, EmptyDeleter>(this)};
+}
+td::StringBuilder &operator<<(StringBuilder &sb, const PerfLog &log) {
+  sb << "{";
+  std::vector<size_t> ids(log.entries_.size());
+  std::iota(ids.begin(), ids.end(), 0);
+  std::sort(ids.begin(), ids.end(), [&](auto a, auto b) {
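+    // order entries by elapsed duration so the slowest actions print first
+    return log.entries_[a].end - log.entries_[a].begin > log.entries_[b].end - log.entries_[b].begin;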
+  });
+  sb << "{";
+  for (size_t i = 0; i < log.entries_.size(); i++) {
+    sb << "\n\t";
+    auto &entry = log.entries_[ids[i]];
+    sb << "{" << entry.name << ":" << entry.begin << "->" << entry.end << "(" << entry.end - entry.begin << ")"
+       << td::format::cond(entry.status.is_error(), entry.status, "") << "}";
+  }
+  sb << "\n}";
+  return sb;
+}
+
+double PerfLog::finish_action(size_t i, td::Status status) {
+  auto &entry = entries_[i];
+  CHECK(entry.end == 0);
+  entry.end = td::Timestamp::now().at();
+  entry.status = std::move(status);
+  return entry.end - entry.begin;
+}
 }  // namespace td
diff --git a/tdutils/td/utils/Timer.h b/tdutils/td/utils/Timer.h
index a27cac8a7..d787f1ca2 100644
--- a/tdutils/td/utils/Timer.h
+++ b/tdutils/td/utils/Timer.h
@@ -19,6 +19,7 @@
 #pragma once
 
 #include "td/utils/StringBuilder.h"
+#include "td/utils/Status.h"
 
 #include <functional>
 
@@ -46,7 +47,7 @@ class Timer {
 
 class PerfWarningTimer {
  public:
-  explicit PerfWarningTimer(string name, double max_duration = 0.1, std::function<void(double)>&& callback = {});
+  explicit PerfWarningTimer(string name, double max_duration = 0.1, std::function<void(double)> &&callback = {});
   PerfWarningTimer(const PerfWarningTimer &) = delete;
   PerfWarningTimer &operator=(const PerfWarningTimer &) = delete;
   PerfWarningTimer(PerfWarningTimer &&other);
@@ -80,4 +81,44 @@ class ThreadCpuTimer {
   bool is_paused_{false};
 };
 
+class PerfLog;
+struct EmptyDeleter {
+  template <class T>
+  void operator()(T *) {
+  }
+};
+class PerfLogAction {
+ public:
+  template <class T>
+  double finish(const T &result);
+  size_t i_{0};
+  std::unique_ptr<PerfLog, EmptyDeleter> perf_log_;
+};
+
+class PerfLog {
+ public:
+  PerfLogAction start_action(std::string name);
+  friend td::StringBuilder &operator<<(td::StringBuilder &sb, const PerfLog &log);
+
+ private:
+  struct Entry {
+    std::string name;
+    double begin{};
+    double end{};
+    td::Status status;
+  };
+  std::vector<Entry> entries_;
+  friend class PerfLogAction;
+
+  double finish_action(size_t i, td::Status status);
+};
+template <class T>
+double PerfLogAction::finish(const T &result) {
+  if (result.is_ok()) {
+    return perf_log_->finish_action(i_, td::Status::OK());
+  } else {
+    return perf_log_->finish_action(i_, result.error().clone());
+  }
+}
+
 }  // namespace td
diff --git a/tdutils/td/utils/logging.h b/tdutils/td/utils/logging.h
index d00fba154..5c9a0621f 100644
--- a/tdutils/td/utils/logging.h
+++ b/tdutils/td/utils/logging.h
@@ -74,6 +74,7 @@
 
 #define LOG(level) LOG_IMPL(level, level, true, ::td::Slice())
 #define LOG_IF(level, condition) LOG_IMPL(level, level, condition, #condition)
+#define FLOG(level) LOG_IMPL(level, level, true, ::td::Slice()) << td::LambdaPrint{} << [&](auto &sb)
 
 #define VLOG(level) LOG_IMPL(DEBUG, level, true, TD_DEFINE_STR(level))
 #define VLOG_IF(level, condition) LOG_IMPL(DEBUG, level, condition, TD_DEFINE_STR(level) " " #condition)
@@ -95,13 +96,13 @@ inline bool no_return_func() {
 #define DUMMY_LOG_CHECK(condition) LOG_IF(NEVER, !(condition))
 
 #ifdef TD_DEBUG
-  #if TD_MSVC
+#if TD_MSVC
 #define LOG_CHECK(condition)        \
   __analysis_assume(!!(condition)); \
   LOG_IMPL(FATAL, FATAL, !(condition), #condition)
-  #else
+#else
 #define LOG_CHECK(condition) LOG_IMPL(FATAL, FATAL, !(condition) && no_return_func(), #condition)
-  #endif
+#endif
 #else
 #define LOG_CHECK DUMMY_LOG_CHECK
 #endif
@@ -263,6 +264,9 @@ class Logger {
     sb_ << other;
     return *this;
   }
+  LambdaPrintHelper<Logger> operator<<(const LambdaPrint &) {
+    return LambdaPrintHelper<Logger>{*this};
+  }
 
   MutableCSlice as_cslice() {
     return sb_.as_cslice();
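
The FLOG macro added above is the entry point for the LambdaPrint helper: the
braced block after FLOG(level) becomes the body of a lambda whose parameter
`sb` is the log statement's StringBuilder, so a message can be assembled with
loops and conditionals inside a single log statement, and the builder body is
evaluated only if the statement is actually emitted. A minimal sketch of its
use, assuming a hypothetical `shards` vector of ton::ShardIdFull:

  FLOG(INFO) {
    sb << "collating for";
    for (const auto &shard : shards) {
      sb << " " << shard.to_str();
    }
  };  // note the trailing semicolon: the braces are a lambda body, not a block

diff --git 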
a/tl/generate/scheme/ton_api.tl b/tl/generate/scheme/ton_api.tl index 5b272f268..720f8d496 100644 --- a/tl/generate/scheme/ton_api.tl +++ b/tl/generate/scheme/ton_api.tl @@ -433,6 +433,10 @@ tonNode.newBlockCandidateBroadcast id:tonNode.blockIdExt catchain_seqno:int vali tonNode.newBlockCandidateBroadcastCompressed id:tonNode.blockIdExt catchain_seqno:int validator_set_hash:int collator_signature:tonNode.blockSignature flags:# compressed:bytes = tonNode.Broadcast; +// optimistic broadcast of response to tonNode.getOutMsgQueueProof with dst_shard, block and limits arguments +tonNode.outMsgQueueProofBroadcast dst_shard:tonNode.shardId block:tonNode.blockIdExt + limits:ImportedMsgQueueLimits proof:tonNode.OutMsgQueueProof = tonNode.Broadcast; + tonNode.shardPublicOverlayId workchain:int shard:long zero_state_file_hash:int256 = tonNode.ShardPublicOverlayId; tonNode.privateBlockOverlayId zero_state_file_hash:int256 nodes:(vector int256) = tonNode.PrivateBlockOverlayId; @@ -924,7 +928,7 @@ validatorSession.endValidatorGroupStats session_id:int256 timestamp:double ---functions--- collatorNode.generateBlock shard:tonNode.shardId cc_seqno:int prev_blocks:(vector tonNode.blockIdExt) - creator:int256 = collatorNode.Candidate; + creator:int256 round:int first_block_round:int priority:int = collatorNode.Candidate; collatorNode.ping flags:# = collatorNode.Pong; ---types--- diff --git a/tl/generate/scheme/ton_api.tlo b/tl/generate/scheme/ton_api.tlo index c984fba0b30b868520b6ea2239c9d376838d2378..19a8d13138d85dd2bea8b7e28793cbc85e441aa9 100644 GIT binary patch delta 392 zcmbPnf$hd6whensSWchfai4s^M1+xJqNDicHzp07jE6Vp+7;h}$V_-DG2K9pF=cv! zEMpB=X!-+LMiWkuwwuRFZQDVRw(SW@j2f0KCPv{;ruRfM zsxY!o-x$p}fAWHS*6DAp8RdBe(=v;SOX8Dq@{_aUr(d*Ul%L)+nNef%nG%8R88M6- FyaE5biE#h` delta 264 zcmca{iEYLOwhensSgKTm3MU^h5n*JX=qSGVjY$J1Ofcx@{BUm4dfX`AY2c5Mu=t8 zff5D~u`O`cf>}VtEDA8iG71=0Oi*NOnC$ab1)@_$k6uUgvx diff --git a/ton/ton-types.h b/ton/ton-types.h index 11741c5ec..aeb0595ad 100644 --- a/ton/ton-types.h +++ b/ton/ton-types.h @@ -425,14 +425,47 @@ struct Ed25519_PublicKey { }; // represents (the contents of) a block + +struct OutMsgQueueProofBroadcast : public td::CntObject { + OutMsgQueueProofBroadcast(ShardIdFull dst_shard, BlockIdExt block_id, td::int32 max_bytes, td::int32 max_msgs, + td::BufferSlice queue_proofs, td::BufferSlice block_state_proofs, + std::vector msg_counts) + : dst_shard(std::move(dst_shard)) + , block_id(block_id) + , max_bytes(max_bytes) + , max_msgs(max_msgs) + , queue_proofs(std::move(queue_proofs)) + , block_state_proofs(std::move(block_state_proofs)) + , msg_counts(std::move(msg_counts)) { + } + ShardIdFull dst_shard; + BlockIdExt block_id; + + // importedMsgQueueLimits + td::uint32 max_bytes; + td::uint32 max_msgs; + + // outMsgQueueProof + td::BufferSlice queue_proofs; + td::BufferSlice block_state_proofs; + std::vector msg_counts; + + virtual OutMsgQueueProofBroadcast* make_copy() const { + return new OutMsgQueueProofBroadcast(dst_shard, block_id, max_bytes, max_msgs, queue_proofs.clone(), + block_state_proofs.clone(), msg_counts); + } +}; + struct BlockCandidate { BlockCandidate(Ed25519_PublicKey pubkey, BlockIdExt id, FileHash collated_file_hash, td::BufferSlice data, - td::BufferSlice collated_data) + td::BufferSlice collated_data, + std::vector> out_msg_queue_broadcasts = {}) : pubkey(pubkey) , id(id) , collated_file_hash(collated_file_hash) , data(std::move(data)) - , collated_data(std::move(collated_data)) { + , collated_data(std::move(collated_data)) + , 
out_msg_queue_proof_broadcasts(std::move(out_msg_queue_broadcasts)) { } Ed25519_PublicKey pubkey; BlockIdExt id; @@ -440,11 +473,21 @@ struct BlockCandidate { td::BufferSlice data; td::BufferSlice collated_data; + // used only locally + std::vector> out_msg_queue_proof_broadcasts; + BlockCandidate clone() const { - return BlockCandidate{pubkey, id, collated_file_hash, data.clone(), collated_data.clone()}; + return BlockCandidate{ + pubkey, id, collated_file_hash, data.clone(), collated_data.clone(), out_msg_queue_proof_broadcasts}; } }; +struct BlockCandidatePriority { + td::uint32 round{}; + td::uint32 first_block_round{}; + td::int32 priority{}; +}; + struct ValidatorDescr { /* ton::validator::ValidatorFullId */ Ed25519_PublicKey key; ValidatorWeight weight; diff --git a/validator/collation-manager.cpp b/validator/collation-manager.cpp index 2ca3a5f10..e84470595 100644 --- a/validator/collation-manager.cpp +++ b/validator/collation-manager.cpp @@ -33,6 +33,7 @@ void CollationManager::start_up() { void CollationManager::collate_block(ShardIdFull shard, BlockIdExt min_masterchain_block_id, std::vector prev, Ed25519_PublicKey creator, + BlockCandidatePriority priority, td::Ref validator_set, td::uint64 max_answer_size, td::CancellationToken cancellation_token, td::Promise promise) { if (shard.is_masterchain()) { @@ -41,12 +42,13 @@ void CollationManager::collate_block(ShardIdFull shard, BlockIdExt min_mastercha std::move(cancellation_token), 0); return; } - collate_shard_block(shard, min_masterchain_block_id, std::move(prev), creator, std::move(validator_set), + collate_shard_block(shard, min_masterchain_block_id, std::move(prev), creator, priority, std::move(validator_set), max_answer_size, std::move(cancellation_token), std::move(promise), td::Timestamp::in(10.0)); } void CollationManager::collate_shard_block(ShardIdFull shard, BlockIdExt min_masterchain_block_id, std::vector prev, Ed25519_PublicKey creator, + BlockCandidatePriority priority, td::Ref validator_set, td::uint64 max_answer_size, td::CancellationToken cancellation_token, td::Promise promise, td::Timestamp timeout) { @@ -133,8 +135,8 @@ void CollationManager::collate_shard_block(ShardIdFull shard, BlockIdExt min_mas delay_action( [=, promise = std::move(promise)]() mutable { td::actor::send_closure(SelfId, &CollationManager::collate_shard_block, shard, min_masterchain_block_id, prev, - creator, validator_set, max_answer_size, cancellation_token, std::move(promise), - timeout); + creator, priority, validator_set, max_answer_size, cancellation_token, + std::move(promise), timeout); }, retry_at); }; @@ -145,7 +147,8 @@ void CollationManager::collate_shard_block(ShardIdFull shard, BlockIdExt min_mas } td::BufferSlice query = create_serialize_tl_object( - create_tl_shard_id(shard), validator_set->get_catchain_seqno(), std::move(prev_blocks), creator.as_bits256()); + create_tl_shard_id(shard), validator_set->get_catchain_seqno(), std::move(prev_blocks), creator.as_bits256(), + priority.round, priority.first_block_round, priority.priority); LOG(INFO) << "sending collate query for " << next_block_id.to_str() << ": send to #" << selected_idx << "(" << selected_collator << ")"; diff --git a/validator/collation-manager.hpp b/validator/collation-manager.hpp index 7ceea1e6b..9ca69814b 100644 --- a/validator/collation-manager.hpp +++ b/validator/collation-manager.hpp @@ -35,7 +35,8 @@ class CollationManager : public td::actor::Actor { void alarm() override; void collate_block(ShardIdFull shard, BlockIdExt min_masterchain_block_id, 
std::vector<BlockIdExt> prev,
-                     Ed25519_PublicKey creator, td::Ref<ValidatorSet> validator_set, td::uint64 max_answer_size,
+                     Ed25519_PublicKey creator, BlockCandidatePriority priority,
+                     td::Ref<ValidatorSet> validator_set, td::uint64 max_answer_size,
                      td::CancellationToken cancellation_token, td::Promise<BlockCandidate> promise);
 
   void update_options(td::Ref<ValidatorManagerOptions> opts);
 
@@ -52,7 +53,8 @@ class CollationManager : public td::actor::Actor {
   td::actor::ActorId<rldp::Rldp> rldp_;
 
   void collate_shard_block(ShardIdFull shard, BlockIdExt min_masterchain_block_id, std::vector<BlockIdExt> prev,
-                           Ed25519_PublicKey creator, td::Ref<ValidatorSet> validator_set, td::uint64 max_answer_size,
+                           Ed25519_PublicKey creator, BlockCandidatePriority priority,
+                           td::Ref<ValidatorSet> validator_set, td::uint64 max_answer_size,
                            td::CancellationToken cancellation_token, td::Promise<BlockCandidate> promise,
                            td::Timestamp timeout);
 
diff --git a/validator/collator-node.cpp b/validator/collator-node.cpp
index cf2770d93..1b8eb79e8 100644
--- a/validator/collator-node.cpp
+++ b/validator/collator-node.cpp
@@ -191,17 +191,41 @@ void CollatorNode::update_validator_group_info(ShardIdFull shard, std::vector<B
       if (cache_entry->block_seqno < info.next_block_seqno) {
         cache_entry->cancel(td::Status::Error(PSTRING() << "next block seqno " << cache_entry->block_seqno
                                                         << " is too small, expected " << info.next_block_seqno));
+        if (!cache_entry->has_external_query_at && cache_entry->has_internal_query_at) {
+          LOG(INFO) << "generate block query"
+                    << ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno
+                    << ", next_block_seqno=" << cache_entry->block_seqno
+                    << ": nobody asked for the block, but we tried to generate it";
+        }
+        if (cache_entry->has_external_query_at && !cache_entry->has_internal_query_at) {
+          LOG(INFO) << "generate block query"
+                    << ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno
+                    << ", next_block_seqno=" << cache_entry->block_seqno
+                    << ": somebody asked for a block we didn't even try to generate";
+        }
         cache_it = info.cache.erase(cache_it);
         continue;
       }
       if (cache_entry->block_seqno == info.next_block_seqno && cached_prev != info.prev) {
         cache_entry->cancel(td::Status::Error("invalid prev blocks"));
+        if (!cache_entry->has_external_query_at && cache_entry->has_internal_query_at) {
+          LOG(INFO) << "generate block query"
+                    << ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno
+                    << ", next_block_seqno=" << cache_entry->block_seqno
+                    << ": nobody asked for the block, but we tried to generate it";
+        }
+        if (cache_entry->has_external_query_at && !cache_entry->has_internal_query_at) {
+          LOG(INFO) << "generate block query"
+                    << ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno
+                    << ", next_block_seqno=" << cache_entry->block_seqno
+                    << ": somebody asked for a block we didn't even try to generate";
+        }
         cache_it = info.cache.erase(cache_it);
         continue;
       }
       ++cache_it;
     }
-    generate_block(shard, cc_seqno, info.prev, td::Timestamp::in(10.0), [](td::Result<BlockCandidate>) {});
+    generate_block(shard, cc_seqno, info.prev, {}, td::Timestamp::in(10.0), [](td::Result<BlockCandidate>) {});
   }
   return;
 }
@@ -285,6 +309,14 @@ static BlockCandidate change_creator(BlockCandidate block, Ed25519_PublicKey cre
   cc_seqno = info.gen_catchain_seqno;
   val_set_hash = info.gen_validator_list_hash_short;
+
+  for (auto& broadcast_ref : block.out_msg_queue_proof_broadcasts) {
+    auto block_state_proof = create_block_state_proof(root).move_as_ok();
+
+    auto &broadcast = broadcast_ref.write();
+    broadcast.block_id = block.id;
+    broadcast.block_state_proofs = vm::std_boc_serialize(std::move(block_state_proof), 31).move_as_ok();
+  }
   return block;
 }
 
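
The collatorNode.generateBlock query now carries the priority fields added to
ton_api.tl above: the validator's current consensus round, the round in which
a block for these prev-blocks was first requested, and the requester's
priority within the round. receive_query() below unpacks them into a
BlockCandidatePriority, and the collator's logging derives

  round_offset = priority.round - priority.first_block_round

so a non-zero offset means the validator group is re-requesting the same block
in a later round (this restates what prefix_inner() prints; it is not
additional code in the patch).

@@ -322,6 +354,11 @@ void 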
CollatorNode::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice data for (const auto& b : f->prev_blocks_) { prev_blocks.push_back(create_block_id(b)); } + auto priority = BlockCandidatePriority { + .round = static_cast(f->round_), + .first_block_round = static_cast(f->first_block_round_), + .priority = f->priority_ + }; Ed25519_PublicKey creator(f->creator_); td::Promise new_promise = [promise = std::move(promise), src, shard](td::Result R) mutable { @@ -353,11 +390,13 @@ void CollatorNode::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice data return; } LOG(INFO) << "got adnl query from " << src << ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno; - generate_block(shard, cc_seqno, std::move(prev_blocks), td::Timestamp::in(10.0), std::move(new_promise)); + generate_block(shard, cc_seqno, std::move(prev_blocks), priority, td::Timestamp::in(10.0), std::move(new_promise)); } void CollatorNode::generate_block(ShardIdFull shard, CatchainSeqno cc_seqno, std::vector prev_blocks, - td::Timestamp timeout, td::Promise promise) { + std::optional o_priority, td::Timestamp timeout, + td::Promise promise) { + bool is_external = !o_priority; if (last_masterchain_state_.is_null()) { promise.set_error(td::Status::Error(ErrorCode::notready, "not ready")); return; @@ -379,8 +418,8 @@ void CollatorNode::generate_block(ShardIdFull shard, CatchainSeqno cc_seqno, std promise.set_error(td::Status::Error(ErrorCode::timeout)); return; } - td::actor::send_closure(SelfId, &CollatorNode::generate_block, shard, cc_seqno, std::move(prev_blocks), timeout, - std::move(promise)); + td::actor::send_closure(SelfId, &CollatorNode::generate_block, shard, cc_seqno, std::move(prev_blocks), + std::move(o_priority), timeout, std::move(promise)); }); return; } @@ -399,36 +438,81 @@ void CollatorNode::generate_block(ShardIdFull shard, CatchainSeqno cc_seqno, std return; } + static auto prefix_inner = [] (auto &sb, auto &shard, auto cc_seqno, auto block_seqno, + const std::optional &o_priority) { + sb << "generate block query" + << ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno << ", next_block_seqno=" << block_seqno; + if (o_priority) { + sb << " external{"; + sb << "round_offset=" << o_priority->round - o_priority->first_block_round << ",priority=" << o_priority->priority; + sb << ",first_block_round=" << o_priority->first_block_round; + sb << "}"; + } else { + sb << " internal" ; + } + }; + auto prefix = [&] (auto &sb) { + prefix_inner(sb, shard, cc_seqno, block_seqno, o_priority); + }; + auto cache_entry = validator_group_info.cache[prev_blocks]; if (cache_entry == nullptr) { cache_entry = validator_group_info.cache[prev_blocks] = std::make_shared(); } + if (is_external && !cache_entry->has_external_query_at) { + cache_entry->has_external_query_at = td::Timestamp::now(); + if (cache_entry->has_internal_query_at && cache_entry->has_external_query_at) { + FLOG(INFO) { + prefix(sb); + sb << ": got external query " << cache_entry->has_external_query_at.at() - cache_entry->has_internal_query_at.at() + << "s after internal query [WON]"; + }; + } + } + if (!is_external && !cache_entry->has_internal_query_at) { + cache_entry->has_internal_query_at = td::Timestamp::now(); + if (cache_entry->has_internal_query_at && cache_entry->has_external_query_at) { + FLOG(INFO) { + prefix(sb); + sb << ": got internal query " << cache_entry->has_internal_query_at.at() - cache_entry->has_external_query_at.at() + << "s after external query [LOST]"; + }; + } + } if (cache_entry->result) { - LOG(INFO) << "generate block 
query" - << ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno << ", next_block_seqno=" << block_seqno - << ": using cached result"; + auto has_result_ago = td::Timestamp::now().at() - cache_entry->has_result_at.at(); + FLOG(INFO) { + prefix(sb); + sb << ": using cached result " << " generated " << has_result_ago << "s ago"; + sb << (is_external ? " for external query [WON]" : " for internal query "); + }; + promise.set_result(cache_entry->result.value().clone()); return; } cache_entry->promises.push_back(std::move(promise)); + if (cache_entry->started) { - LOG(INFO) << "generate block query" - << ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno << ", next_block_seqno=" << block_seqno - << ": collation in progress, waiting"; + FLOG(INFO) { + prefix(sb); + sb << ": collation in progress, waiting"; + }; return; } - LOG(INFO) << "generate block query" - << ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno << ", next_block_seqno=" << block_seqno - << ": starting collation"; + FLOG(INFO) { + prefix(sb); + sb << ": starting collation"; + }; cache_entry->started = true; cache_entry->block_seqno = block_seqno; run_collate_query( shard, last_masterchain_state_->get_block_id(), std::move(prev_blocks), Ed25519_PublicKey{td::Bits256::zero()}, last_masterchain_state_->get_validator_set(shard), opts_->get_collator_options(), manager_, timeout, [=, SelfId = actor_id(this), timer = td::Timer{}](td::Result R) { - LOG(INFO) << "generate block result" - << ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno << ", next_block_seqno=" << block_seqno - << ", time=" << timer.elapsed() << ": " << (R.is_ok() ? "OK" : R.error().to_string()); + FLOG(INFO) { + prefix_inner(sb, shard, cc_seqno, block_seqno,o_priority); + sb << timer.elapsed() << ": " << (R.is_ok() ? 
"OK" : R.error().to_string()); + }; td::actor::send_closure(SelfId, &CollatorNode::process_result, cache_entry, std::move(R)); }, cache_entry->cancellation_token_source.get_cancellation_token(), @@ -443,6 +527,7 @@ void CollatorNode::process_result(std::shared_ptr cache_entry, td::R } } else { cache_entry->result = R.move_as_ok(); + cache_entry->has_result_at = td::Timestamp::now(); for (auto& p : cache_entry->promises) { p.set_result(cache_entry->result.value().clone()); } diff --git a/validator/collator-node.hpp b/validator/collator-node.hpp index a361ceb30..54876c35f 100644 --- a/validator/collator-node.hpp +++ b/validator/collator-node.hpp @@ -19,6 +19,7 @@ #include "interfaces/validator-manager.h" #include "rldp/rldp.h" #include +#include namespace ton::validator { @@ -57,6 +58,9 @@ class CollatorNode : public td::actor::Actor { struct CacheEntry { bool started = false; + td::Timestamp has_internal_query_at; + td::Timestamp has_external_query_at; + td::Timestamp has_result_at; BlockSeqno block_seqno = 0; td::optional result; td::CancellationTokenSource cancellation_token_source; @@ -84,7 +88,8 @@ class CollatorNode : public td::actor::Actor { td::Result get_future_validator_group(ShardIdFull shard, CatchainSeqno cc_seqno); void generate_block(ShardIdFull shard, CatchainSeqno cc_seqno, std::vector prev_blocks, - td::Timestamp timeout, td::Promise promise); + std::optional o_priority, td::Timestamp timeout, + td::Promise promise); void process_result(std::shared_ptr cache_entry, td::Result R); public: diff --git a/validator/full-node-fast-sync-overlays.cpp b/validator/full-node-fast-sync-overlays.cpp index 270c53f07..4e0d11e48 100644 --- a/validator/full-node-fast-sync-overlays.cpp +++ b/validator/full-node-fast-sync-overlays.cpp @@ -46,6 +46,33 @@ void FullNodeFastSyncOverlay::process_block_broadcast(PublicKeyHash src, ton_api td::actor::send_closure(full_node_, &FullNode::process_block_broadcast, B.move_as_ok()); } +void FullNodeFastSyncOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_outMsgQueueProofBroadcast &query) { + BlockIdExt block_id = create_block_id(query.block_); + ShardIdFull shard_id = create_shard_id(query.dst_shard_); + if (query.proof_->get_id() != ton_api::tonNode_outMsgQueueProof::ID) { + LOG(ERROR) << "got tonNode.outMsgQueueProofBroadcast with proof not tonNode.outMsgQueueProof"; + return; + } + auto tl_proof = move_tl_object_as(query.proof_); + auto R = OutMsgQueueProof::fetch(shard_id, {block_id}, + block::ImportedMsgQueueLimits{.max_bytes = td::uint32(query.limits_->max_bytes_), + .max_msgs = td::uint32(query.limits_->max_msgs_)}, + *tl_proof); + if (R.is_error()) { + LOG(ERROR) << "got tonNode.outMsgQueueProofBroadcast with invalid proof: " << R.error(); + return; + } + if (R.ok().size() != 1) { + LOG(ERROR) << "got tonNode.outMsgQueueProofBroadcast with invalid proofs count=" << R.ok().size(); + return; + } + auto proof = std::move(R.move_as_ok()[0]); + + LOG(INFO) << "got tonNode.outMsgQueueProofBroadcast " << shard_id.to_str() << " " << block_id.to_str(); + td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::add_out_msg_queue_proof, shard_id, + std::move(proof)); +} + void FullNodeFastSyncOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query) { BlockIdExt block_id = create_block_id(query.block_->block_); VLOG(FULL_NODE_DEBUG) << "Received newShardBlockBroadcast in fast sync overlay from " << src << ": " @@ -200,6 +227,22 @@ void 
FullNodeFastSyncOverlay::collect_validator_telemetry(std::string filename) } } +void FullNodeFastSyncOverlay::send_out_msg_queue_proof_broadcast(td::Ref broadcast) { + if (!inited_) { + return; + } + auto B = create_serialize_tl_object( + create_tl_shard_id(broadcast->dst_shard), create_tl_block_id(broadcast->block_id), + create_tl_object(broadcast->max_bytes, broadcast->max_msgs), + create_tl_object(broadcast->queue_proofs.clone(), + broadcast->block_state_proofs.clone(), + std::vector(broadcast->msg_counts))); + VLOG(FULL_NODE_DEBUG) << "Sending outMsgQueueProof in fast sync overlay: " << broadcast->dst_shard.to_str() << " " + << broadcast->block_id.to_str(); + td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_, + local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), std::move(B)); +} + void FullNodeFastSyncOverlay::start_up() { auto X = create_hash_tl_object(zero_state_file_hash_, create_tl_shard_id(shard_)); td::BufferSlice b{32}; diff --git a/validator/full-node-fast-sync-overlays.hpp b/validator/full-node-fast-sync-overlays.hpp index 05d83071d..e0db87b7f 100644 --- a/validator/full-node-fast-sync-overlays.hpp +++ b/validator/full-node-fast-sync-overlays.hpp @@ -25,6 +25,7 @@ class FullNodeFastSyncOverlay : public td::actor::Actor { public: void process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast& query); void process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcastCompressed& query); + void process_broadcast(PublicKeyHash src, ton_api::tonNode_outMsgQueueProofBroadcast& query); void process_block_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast& query); void process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast& query); @@ -46,6 +47,7 @@ class FullNodeFastSyncOverlay : public td::actor::Actor { void send_broadcast(BlockBroadcast broadcast); void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash, td::BufferSlice data); + void send_out_msg_queue_proof_broadcast(td::Ref broadcast); void send_validator_telemetry(tl_object_ptr telemetry); void collect_validator_telemetry(std::string filename); diff --git a/validator/full-node-shard.hpp b/validator/full-node-shard.hpp index 8ac5185d9..fd1ef943e 100644 --- a/validator/full-node-shard.hpp +++ b/validator/full-node-shard.hpp @@ -151,6 +151,9 @@ class FullNodeShardImpl : public FullNodeShard { void process_broadcast(PublicKeyHash src, ton_api::tonNode_ihrMessageBroadcast &query); void process_broadcast(PublicKeyHash src, ton_api::tonNode_externalMessageBroadcast &query); void process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query); + void process_broadcast(PublicKeyHash src, ton_api::tonNode_outMsgQueueProofBroadcast &query) { + LOG(ERROR) << "Ignore outMsgQueueProofBroadcast"; + } void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcast &query); void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcastCompressed &query); diff --git a/validator/full-node.cpp b/validator/full-node.cpp index 0278c9ae4..e7527d768 100644 --- a/validator/full-node.cpp +++ b/validator/full-node.cpp @@ -371,6 +371,14 @@ void FullNodeImpl::send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_se } } +void FullNodeImpl::send_out_msg_queue_proof_broadcast(td::Ref broadcast) { + auto fast_sync_overlay = fast_sync_overlays_.choose_overlay(broadcast->dst_shard).first; + if (!fast_sync_overlay.empty()) { + 
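+    // A fast sync overlay covering dst_shard exists on this node: the overlay
+    // actor serializes the proof into a tonNode.outMsgQueueProofBroadcast and
+    // sends it as a FEC broadcast. If choose_overlay() had found nothing, the
+    // broadcast would be dropped; receivers can always fall back to the
+    // regular tonNode.getOutMsgQueueProof query path.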
td::actor::send_closure(fast_sync_overlay, &FullNodeFastSyncOverlay::send_out_msg_queue_proof_broadcast,
+                            std::move(broadcast));
+  }
+}
+
 void FullNodeImpl::send_broadcast(BlockBroadcast broadcast, int mode) {
   if (mode & broadcast_mode_custom) {
     send_block_broadcast_to_custom_overlays(broadcast);
@@ -713,6 +721,9 @@ void FullNodeImpl::start_up() {
       td::actor::send_closure(id_, &FullNodeImpl::send_block_candidate, block_id, cc_seqno, validator_set_hash,
                               std::move(data));
     }
+    void send_out_msg_queue_proof_broadcast(td::Ref<OutMsgQueueProofBroadcast> broadcast) override {
+      td::actor::send_closure(id_, &FullNodeImpl::send_out_msg_queue_proof_broadcast, std::move(broadcast));
+    }
     void send_broadcast(BlockBroadcast broadcast, int mode) override {
       td::actor::send_closure(id_, &FullNodeImpl::send_broadcast, std::move(broadcast), mode);
     }
diff --git a/validator/full-node.hpp b/validator/full-node.hpp
index 9e254d7d4..5533b1f43 100644
--- a/validator/full-node.hpp
+++ b/validator/full-node.hpp
@@ -70,6 +70,7 @@ class FullNodeImpl : public FullNode {
   void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
                             td::BufferSlice data);
   void send_broadcast(BlockBroadcast broadcast, int mode);
+  void send_out_msg_queue_proof_broadcast(td::Ref<OutMsgQueueProofBroadcast> broadcast);
   void download_block(BlockIdExt id, td::uint32 priority, td::Timestamp timeout, td::Promise<ReceivedBlock> promise);
   void download_zero_state(BlockIdExt id, td::uint32 priority, td::Timestamp timeout,
                            td::Promise<td::BufferSlice> promise);
diff --git a/validator/impl/collator-impl.h b/validator/impl/collator-impl.h
index 72154f861..0090e95de 100644
--- a/validator/impl/collator-impl.h
+++ b/validator/impl/collator-impl.h
@@ -237,6 +237,7 @@ class Collator final : public td::actor::Actor {
   bool store_out_msg_queue_size_ = false;
 
   td::PerfWarningTimer perf_timer_;
+  td::PerfLog perf_log_;
   //
   block::Account* lookup_account(td::ConstBitPtr addr) const;
   std::unique_ptr<block::Account> make_account_from(td::ConstBitPtr addr, Ref<vm::CellSlice> account,
@@ -252,18 +253,18 @@ class Collator final : public td::actor::Actor {
   bool fatal_error(int err_code, std::string err_msg);
   bool fatal_error(std::string err_msg, int err_code = -666);
   void check_pending();
-  void after_get_mc_state(td::Result<std::pair<Ref<MasterchainState>, BlockIdExt>> res);
-  void after_get_shard_state(int idx, td::Result<Ref<ShardState>> res);
-  void after_get_block_data(int idx, td::Result<Ref<BlockData>> res);
-  void after_get_shard_blocks(td::Result<std::vector<Ref<ShardTopBlockDescription>>> res);
+  void after_get_mc_state(td::Result<std::pair<Ref<MasterchainState>, BlockIdExt>> res, td::PerfLogAction token);
+  void after_get_shard_state(int idx, td::Result<Ref<ShardState>> res, td::PerfLogAction token);
+  void after_get_block_data(int idx, td::Result<Ref<BlockData>> res, td::PerfLogAction token);
+  void after_get_shard_blocks(td::Result<std::vector<Ref<ShardTopBlockDescription>>> res, td::PerfLogAction token);
   bool preprocess_prev_mc_state();
   bool register_mc_state(Ref<MasterchainStateQ> other_mc_state);
   bool request_aux_mc_state(BlockSeqno seqno, Ref<MasterchainStateQ>& state);
   Ref<MasterchainStateQ> get_aux_mc_state(BlockSeqno seqno) const;
-  void after_get_aux_shard_state(ton::BlockIdExt blkid, td::Result<Ref<ShardState>> res);
+  void after_get_aux_shard_state(ton::BlockIdExt blkid, td::Result<Ref<ShardState>> res, td::PerfLogAction token);
   bool fix_one_processed_upto(block::MsgProcessedUpto& proc, const ton::ShardIdFull& owner);
   bool fix_processed_upto(block::MsgProcessedUptoCollection& upto);
-  void got_neighbor_msg_queues(td::Result<std::map<BlockIdExt, Ref<OutMsgQueueProof>>> R);
+  void got_neighbor_msg_queues(td::Result<std::map<BlockIdExt, Ref<OutMsgQueueProof>>> R, td::PerfLogAction token);
   void got_neighbor_msg_queue(unsigned i, Ref<OutMsgQueueProof> res);
   void got_out_queue_size(size_t i, td::Result<td::uint64> res);
   bool adjust_shard_config();
@@ -309,7 +310,7 @@ class Collator final : public td::actor::Actor {
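+  // (The dependency callbacks above now each take a td::PerfLogAction token;
+  // perf_log_ collects the per-dependency timings and is printed when the
+  // collation finishes or fails.)
   bool 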
is_our_address(Ref addr_ref) const; bool is_our_address(ton::AccountIdPrefixFull addr_prefix) const; bool is_our_address(const ton::StdSmcAddress& addr) const; - void after_get_external_messages(td::Result, int>>> res); + void after_get_external_messages(td::Result, int>>> res, td::PerfLogAction token); td::Result register_external_message_cell(Ref ext_msg, const ExtMessage::Hash& ext_hash, int priority); // td::Result register_external_message(td::Slice ext_msg_boc); diff --git a/validator/impl/collator.cpp b/validator/impl/collator.cpp index 0054ea98a..cd97e3fcc 100644 --- a/validator/impl/collator.cpp +++ b/validator/impl/collator.cpp @@ -217,25 +217,30 @@ void Collator::start_up() { // 2. learn latest masterchain state and block id LOG(DEBUG) << "sending get_top_masterchain_state_block() to Manager"; ++pending; + auto token = perf_log_.start_action("get_top_masterchain_state_block"); if (!is_hardfork_) { td::actor::send_closure_later(manager, &ValidatorManager::get_top_masterchain_state_block, - [self = get_self()](td::Result, BlockIdExt>> res) { + [self = get_self(), token = std::move(token)]( + td::Result, BlockIdExt>> res) mutable { LOG(DEBUG) << "got answer to get_top_masterchain_state_block"; td::actor::send_closure_later(std::move(self), &Collator::after_get_mc_state, - std::move(res)); + std::move(res), std::move(token)); }); } else { - td::actor::send_closure_later( - manager, &ValidatorManager::get_shard_state_from_db_short, min_mc_block_id, - [self = get_self(), block_id = min_mc_block_id](td::Result> res) { - LOG(DEBUG) << "got answer to get_top_masterchain_state_block"; - if (res.is_error()) { - td::actor::send_closure_later(std::move(self), &Collator::after_get_mc_state, res.move_as_error()); - } else { - td::actor::send_closure_later(std::move(self), &Collator::after_get_mc_state, - std::make_pair(Ref(res.move_as_ok()), block_id)); - } - }); + td::actor::send_closure_later(manager, &ValidatorManager::get_shard_state_from_db_short, min_mc_block_id, + [self = get_self(), block_id = min_mc_block_id, + token = std::move(token)](td::Result> res) mutable { + LOG(DEBUG) << "got answer to get_top_masterchain_state_block"; + if (res.is_error()) { + td::actor::send_closure_later(std::move(self), &Collator::after_get_mc_state, + res.move_as_error(), std::move(token)); + } else { + td::actor::send_closure_later( + std::move(self), &Collator::after_get_mc_state, + std::make_pair(Ref(res.move_as_ok()), block_id), + std::move(token)); + } + }); } } // 3. load previous block(s) and corresponding state(s) @@ -245,23 +250,27 @@ void Collator::start_up() { // 3.1. 
load state LOG(DEBUG) << "sending wait_block_state() query #" << i << " for " << prev_blocks[i].to_str() << " to Manager"; ++pending; - td::actor::send_closure_later(manager, &ValidatorManager::wait_block_state_short, prev_blocks[i], priority(), - timeout, [self = get_self(), i](td::Result> res) { - LOG(DEBUG) << "got answer to wait_block_state query #" << i; - td::actor::send_closure_later(std::move(self), &Collator::after_get_shard_state, i, - std::move(res)); - }); + auto token = perf_log_.start_action(PSTRING() << "wait_block_state #" << i); + td::actor::send_closure_later( + manager, &ValidatorManager::wait_block_state_short, prev_blocks[i], priority(), timeout, + [self = get_self(), i, token = std::move(token)](td::Result> res) mutable { + LOG(DEBUG) << "got answer to wait_block_state query #" << i; + td::actor::send_closure_later(std::move(self), &Collator::after_get_shard_state, i, std::move(res), + std::move(token)); + }); if (prev_blocks[i].seqno()) { // 3.2. load block // NB: we need the block itself only for extracting start_lt and end_lt to create correct prev_blk:ExtBlkRef and related Merkle proofs LOG(DEBUG) << "sending wait_block_data() query #" << i << " for " << prev_blocks[i].to_str() << " to Manager"; ++pending; - td::actor::send_closure_later(manager, &ValidatorManager::wait_block_data_short, prev_blocks[i], priority(), - timeout, [self = get_self(), i](td::Result> res) { - LOG(DEBUG) << "got answer to wait_block_data query #" << i; - td::actor::send_closure_later(std::move(self), &Collator::after_get_block_data, i, - std::move(res)); - }); + auto token = perf_log_.start_action(PSTRING() << "wait_block_data #" << i); + td::actor::send_closure_later( + manager, &ValidatorManager::wait_block_data_short, prev_blocks[i], priority(), timeout, + [self = get_self(), i, token = std::move(token)](td::Result> res) mutable { + LOG(DEBUG) << "got answer to wait_block_data query #" << i; + td::actor::send_closure_later(std::move(self), &Collator::after_get_block_data, i, std::move(res), + std::move(token)); + }); } } if (is_hardfork_) { @@ -271,22 +280,28 @@ void Collator::start_up() { if (!is_hardfork_) { LOG(DEBUG) << "sending get_external_messages() query to Manager"; ++pending; - td::actor::send_closure_later(manager, &ValidatorManager::get_external_messages, shard_, - [self = get_self()](td::Result, int>>> res) -> void { + auto token = perf_log_.start_action("get_external_messages"); + td::actor::send_closure_later( + manager, &ValidatorManager::get_external_messages, shard_, + [self = get_self(), + token = std::move(token)](td::Result, int>>> res) mutable -> void { LOG(DEBUG) << "got answer to get_external_messages() query"; - td::actor::send_closure_later(std::move(self), &Collator::after_get_external_messages, std::move(res)); + td::actor::send_closure_later(std::move(self), &Collator::after_get_external_messages, std::move(res), + std::move(token)); }); } if (is_masterchain() && !is_hardfork_) { // 5. 
load shard block info messages LOG(DEBUG) << "sending get_shard_blocks_for_collator() query to Manager"; ++pending; - td::actor::send_closure_later( - manager, &ValidatorManager::get_shard_blocks_for_collator, prev_blocks[0], - [self = get_self()](td::Result>> res) -> void { - LOG(DEBUG) << "got answer to get_shard_blocks_for_collator() query"; - td::actor::send_closure_later(std::move(self), &Collator::after_get_shard_blocks, std::move(res)); - }); + auto token = perf_log_.start_action("get_shard_blocks_for_collator"); + td::actor::send_closure_later(manager, &ValidatorManager::get_shard_blocks_for_collator, prev_blocks[0], + [self = get_self(), token = std::move(token)]( + td::Result>> res) mutable -> void { + LOG(DEBUG) << "got answer to get_shard_blocks_for_collator() query"; + td::actor::send_closure_later(std::move(self), &Collator::after_get_shard_blocks, + std::move(res), std::move(token)); + }); } // 6. set timeout alarm_timestamp() = timeout; @@ -362,6 +377,8 @@ bool Collator::fatal_error(td::Status error) { td::Timestamp::in(10.0), std::move(main_promise), std::move(cancellation_token_), mode_, attempt_idx_ + 1); } else { + LOG(INFO) << "collation failed in " << perf_timer_.elapsed() << " s " << error; + LOG(INFO) << perf_log_; main_promise(std::move(error)); } busy_ = false; @@ -482,12 +499,14 @@ bool Collator::request_aux_mc_state(BlockSeqno seqno, Ref& st CHECK(blkid.is_valid_ext() && blkid.is_masterchain()); LOG(DEBUG) << "sending auxiliary wait_block_state() query for " << blkid.to_str() << " to Manager"; ++pending; - td::actor::send_closure_later(manager, &ValidatorManager::wait_block_state_short, blkid, priority(), timeout, - [self = get_self(), blkid](td::Result> res) { - LOG(DEBUG) << "got answer to wait_block_state query for " << blkid.to_str(); - td::actor::send_closure_later(std::move(self), &Collator::after_get_aux_shard_state, - blkid, std::move(res)); - }); + auto token = perf_log_.start_action(PSTRING() << "auxiliary wait_block_state " << blkid.to_str()); + td::actor::send_closure_later( + manager, &ValidatorManager::wait_block_state_short, blkid, priority(), timeout, + [self = get_self(), blkid, token = std::move(token)](td::Result> res) mutable { + LOG(DEBUG) << "got answer to wait_block_state query for " << blkid.to_str(); + td::actor::send_closure_later(std::move(self), &Collator::after_get_aux_shard_state, blkid, std::move(res), + std::move(token)); + }); state.clear(); return true; } @@ -515,9 +534,11 @@ Ref Collator::get_aux_mc_state(BlockSeqno seqno) const { * @param blkid The BlockIdExt of the shard state. * @param res The result of retrieving the shard state. */ -void Collator::after_get_aux_shard_state(ton::BlockIdExt blkid, td::Result> res) { +void Collator::after_get_aux_shard_state(ton::BlockIdExt blkid, td::Result> res, + td::PerfLogAction token) { LOG(DEBUG) << "in Collator::after_get_aux_shard_state(" << blkid.to_str() << ")"; --pending; + token.finish(res); if (res.is_error()) { fatal_error("cannot load auxiliary masterchain state for "s + blkid.to_str() + " : " + res.move_as_error().to_string()); @@ -579,9 +600,11 @@ bool Collator::preprocess_prev_mc_state() { * * @param res The retrieved masterchain state. 
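 *
 * A sketch of the timing pattern shared by this and the following callbacks
 * (illustrative; the real calls live in start_up() and friends):
 *
 *   auto token = perf_log_.start_action("get_top_masterchain_state_block");
 *   send_closure_later(manager, ..., [self, token = std::move(token)](auto res) mutable {
 *     send_closure_later(self, &Collator::after_get_mc_state, std::move(res), std::move(token));
 *   });
 *
 * Each handler calls token.finish(res) on entry, so every PerfLog entry ends
 * up with both the duration and the final status of its dependency.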
*/
-void Collator::after_get_mc_state(td::Result<std::pair<Ref<MasterchainState>, BlockIdExt>> res) {
+void Collator::after_get_mc_state(td::Result<std::pair<Ref<MasterchainState>, BlockIdExt>> res,
+                                  td::PerfLogAction token) {
   LOG(WARNING) << "in Collator::after_get_mc_state()";
   --pending;
+  token.finish(res);
   if (res.is_error()) {
     fatal_error(res.move_as_error());
     return;
   }
@@ -598,12 +621,14 @@ void Collator::after_get_mc_state(td::Result<std::pair<Ref<MasterchainState>, Bl
     // NB. it is needed only for creating a correct ExtBlkRef reference to it, which requires start_lt and end_lt
     LOG(DEBUG) << "sending wait_block_data() query #-1 for " << mc_block_id_.to_str() << " to Manager";
     ++pending;
-    td::actor::send_closure_later(manager, &ValidatorManager::wait_block_data_short, mc_block_id_, priority(), timeout,
-                                  [self = get_self()](td::Result<Ref<BlockData>> res) {
-                                    LOG(DEBUG) << "got answer to wait_block_data query #-1";
-                                    td::actor::send_closure_later(std::move(self), &Collator::after_get_block_data, -1,
-                                                                  std::move(res));
-                                  });
+    auto token = perf_log_.start_action("wait_block_data #-1");
+    td::actor::send_closure_later(
+        manager, &ValidatorManager::wait_block_data_short, mc_block_id_, priority(), timeout,
+        [self = get_self(), token = std::move(token)](td::Result<Ref<BlockData>> res) mutable {
+          LOG(DEBUG) << "got answer to wait_block_data query #-1";
+          td::actor::send_closure_later(std::move(self), &Collator::after_get_block_data, -1, std::move(res),
+                                        std::move(token));
+        });
   }
   check_pending();
 }
@@ -614,9 +639,10 @@ void Collator::after_get_mc_state(td::Result<std::pair<Ref<MasterchainState>, Bl
  * @param idx The index of the previous shard block (0 or 1).
  * @param res The retrieved shard state.
  */
-void Collator::after_get_shard_state(int idx, td::Result<Ref<ShardState>> res) {
+void Collator::after_get_shard_state(int idx, td::Result<Ref<ShardState>> res, td::PerfLogAction token) {
   LOG(WARNING) << "in Collator::after_get_shard_state(" << idx << ")";
   --pending;
+  token.finish(res);
   if (res.is_error()) {
     fatal_error(res.move_as_error());
     return;
   }
@@ -647,9 +673,10 @@ void Collator::after_get_shard_state(int idx, td::Result<Ref<ShardState>> res) {
  * @param idx The index of the previous block (0 or 1).
  * @param res The retrieved block data.
  */
-void Collator::after_get_block_data(int idx, td::Result<Ref<BlockData>> res) {
+void Collator::after_get_block_data(int idx, td::Result<Ref<BlockData>> res, td::PerfLogAction token) {
   LOG(DEBUG) << "in Collator::after_get_block_data(" << idx << ")";
   --pending;
+  token.finish(res);
   if (res.is_error()) {
     fatal_error(res.move_as_error());
     return;
   }
@@ -691,8 +718,10 @@ void Collator::after_get_block_data(int idx, td::Result<Ref<BlockData>> res) {
  *
 * @param res The retrieved shard block descriptions.
 */
-void Collator::after_get_shard_blocks(td::Result<std::vector<Ref<ShardTopBlockDescription>>> res) {
+void Collator::after_get_shard_blocks(td::Result<std::vector<Ref<ShardTopBlockDescription>>> res,
+                                      td::PerfLogAction token) {
   --pending;
+  token.finish(res);
   if (res.is_error()) {
     fatal_error(res.move_as_error());
     return;
   }
@@ -848,10 +877,13 @@ bool Collator::request_neighbor_msg_queues() {
     ++i;
   }
   ++pending;
+  auto token = perf_log_.start_action("neighbor_msg_queues");
   td::actor::send_closure_later(
       manager, &ValidatorManager::wait_neighbor_msg_queue_proofs, shard_, std::move(top_blocks), timeout,
-      [self = get_self()](td::Result<std::map<BlockIdExt, Ref<OutMsgQueueProof>>> res) {
-        td::actor::send_closure_later(std::move(self), &Collator::got_neighbor_msg_queues, std::move(res));
+      [self = get_self(),
+       token = std::move(token)](td::Result<std::map<BlockIdExt, Ref<OutMsgQueueProof>>> res) mutable {
+        td::actor::send_closure_later(std::move(self), &Collator::got_neighbor_msg_queues, std::move(res),
+                                      std::move(token));
       });
   return true;
 }
@@ -883,13 +915,15 @@ bool Collator::request_out_msg_queue_size() {
 * @param i The index of the neighbor.
* @param res The obtained outbound queue. */ -void Collator::got_neighbor_msg_queues(td::Result>> R) { +void Collator::got_neighbor_msg_queues(td::Result>> R, + td::PerfLogAction token) { --pending; + double duration = token.finish(R); if (R.is_error()) { fatal_error(R.move_as_error_prefix("failed to get neighbor msg queues: ")); return; } - LOG(INFO) << "neighbor output queues fetched"; + LOG(INFO) << "neighbor output queues fetched, took " << duration << "s"; auto res = R.move_as_ok(); unsigned i = 0; for (block::McShardDescr& descr : neighbors_) { @@ -2091,12 +2125,9 @@ bool Collator::init_lt() { * @returns True if the configuration parameters were successfully fetched and initialized, false otherwise. */ bool Collator::fetch_config_params() { - auto res = block::FetchConfigParams::fetch_config_params(*config_, - &old_mparams_, &storage_prices_, &storage_phase_cfg_, - &rand_seed_, &compute_phase_cfg_, &action_phase_cfg_, - &masterchain_create_fee_, &basechain_create_fee_, - workchain(), now_ - ); + auto res = block::FetchConfigParams::fetch_config_params( + *config_, &old_mparams_, &storage_prices_, &storage_phase_cfg_, &rand_seed_, &compute_phase_cfg_, + &action_phase_cfg_, &masterchain_create_fee_, &basechain_create_fee_, workchain(), now_); if (res.is_error()) { return fatal_error(res.move_as_error()); } @@ -2217,6 +2248,11 @@ bool Collator::init_value_create() { bool Collator::do_collate() { // After do_collate started it will not be interrupted by timeout alarm_timestamp() = td::Timestamp::never(); + auto token = perf_log_.start_action("do_collate"); + td::Status status = td::Status::Error("some error"); + SCOPE_EXIT { + token.finish(status); + }; LOG(WARNING) << "do_collate() : start"; if (!fetch_config_params()) { @@ -2342,6 +2378,7 @@ bool Collator::do_collate() { if (!create_block_candidate()) { return fatal_error("cannot serialize a new Block candidate"); } + status = td::Status::OK(); return true; } @@ -5811,12 +5848,43 @@ bool Collator::create_block_candidate() { << block_limit_status_->transactions; LOG(INFO) << "serialized collated data size " << cdata_slice.size() << " bytes (preliminary estimate was " << block_limit_status_->collated_data_stat.estimate_proof_size() << ")"; + auto new_block_id_ext = ton::BlockIdExt{ton::BlockId{shard_, new_block_seqno}, new_block->get_hash().bits(), + block::compute_file_hash(blk_slice.as_slice())}; // 3. 
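
The hunk continuing below is the producer side of the optimistic broadcast:
immediately after serializing the candidate, the collator rebuilds the new
shard state and derives one OutMsgQueueProof per neighbor shard, attaching the
results to the BlockCandidate. The data then travels along the call chain that
this patch assembles (names as they appear in the diffs):

  Collator::create_block_candidate()          // builds one broadcast per neighbor
   -> ValidatorManagerImpl::set_block_candidate()
   -> Callback::send_out_msg_queue_proof_broadcast()
   -> FullNodeFastSyncOverlay::send_out_msg_queue_proof_broadcast()  // FEC broadcast
   -> FullNodeFastSyncOverlay::process_broadcast(outMsgQueueProofBroadcast)
   -> ValidatorManagerImpl::add_out_msg_queue_proof()
   -> OutMsgQueueImporter::add_out_msg_queue_proof()  // per-block small cache

so a collator for a neighbor shard may find the proof already cached when its
own collation starts, instead of querying for it over the network.
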
  // 3. create a BlockCandidate
-  block_candidate = std::make_unique<BlockCandidate>(
-      created_by_,
-      ton::BlockIdExt{ton::BlockId{shard_, new_block_seqno}, new_block->get_hash().bits(),
-                      block::compute_file_hash(blk_slice.as_slice())},
-      block::compute_file_hash(cdata_slice.as_slice()), blk_slice.clone(), cdata_slice.clone());
+  block_candidate =
+      std::make_unique<BlockCandidate>(created_by_, new_block_id_ext, block::compute_file_hash(cdata_slice.as_slice()),
+                                       blk_slice.clone(), cdata_slice.clone());
+  const bool need_out_msg_queue_broadcasts = true;
+  if (need_out_msg_queue_broadcasts) {
+    // we can't generate two proofs at the same time for the same root (it is not currently supported by cells),
+    // so we can't reuse the new state and have to regenerate it with a Merkle update
+    auto new_state = vm::MerkleUpdate::apply(prev_state_root_pure_, state_update);
+    CHECK(new_state.not_null());
+    CHECK(new_state->get_hash() == state_root->get_hash());
+    assert(config_ && shard_conf_);
+    auto neighbor_list = shard_conf_->get_neighbor_shard_hash_ids(shard_);
+    LOG(INFO) << "Build OutMsgQueueProofs for " << neighbor_list.size() << " neighbours";
+    for (ton::BlockId blk_id : neighbor_list) {
+      auto prefix = blk_id.shard_full();
+      auto limits = mc_state_->get_imported_msg_queue_limits(blk_id.workchain);
+
+      // one could use monitor_min_split_depth here to decrease the number of broadcasts,
+      // but the current implementation of OutMsgQueueImporter doesn't support it
+
+      auto r_proof = OutMsgQueueProof::build(
+          prefix, {OutMsgQueueProof::OneBlock{.id = new_block_id_ext, .state_root = new_state, .block_root = new_block}},
+          limits);
+      if (r_proof.is_ok()) {
+        auto proof = r_proof.move_as_ok();
+        block_candidate->out_msg_queue_proof_broadcasts.push_back(td::Ref<OutMsgQueueProofBroadcast>(
+            true, OutMsgQueueProofBroadcast(prefix, new_block_id_ext, limits.max_bytes, limits.max_msgs,
+                                            std::move(proof->queue_proofs_), std::move(proof->block_state_proofs_),
+                                            std::move(proof->msg_counts_))));
+      } else {
+        LOG(ERROR) << "Failed to build OutMsgQueueProof: " << r_proof.error();
+      }
+    }
+  }
+
   // 3.1 check block and collated data size
   auto consensus_config = config_->get_consensus_config();
   if (block_candidate->data.size() > consensus_config.max_block_size) {
@@ -5896,6 +5964,7 @@ void Collator::return_block_candidate(td::Result<td::Unit> saved) {
   CHECK(block_candidate);
   LOG(WARNING) << "sending new BlockCandidate to Promise";
   LOG(WARNING) << "collation took " << perf_timer_.elapsed() << " s";
+  LOG(WARNING) << perf_log_;
   main_promise(block_candidate->clone());
   busy_ = false;
   stop();
@@ -5974,9 +6043,11 @@ td::Result<bool> Collator::register_external_message_cell(Ref<vm::Cell> ext_msg,
 *
 * @param res The result of the external message retrieval operation.
*/ -void Collator::after_get_external_messages(td::Result, int>>> res) { +void Collator::after_get_external_messages(td::Result, int>>> res, + td::PerfLogAction token) { // res: pair {ext msg, priority} --pending; + token.finish(res); if (res.is_error()) { fatal_error(res.move_as_error()); return; diff --git a/validator/impl/out-msg-queue-proof.cpp b/validator/impl/out-msg-queue-proof.cpp index 444d5846f..4e8802c7e 100644 --- a/validator/impl/out-msg-queue-proof.cpp +++ b/validator/impl/out-msg-queue-proof.cpp @@ -320,13 +320,23 @@ void OutMsgQueueImporter::get_neighbor_msg_queue_proofs( if (prefix.pfx_len() > min_split) { prefix = shard_prefix(prefix, min_split); } - new_queries[prefix].push_back(block); + + LOG(INFO) << "search for out msg queue proof " << prefix.to_str() << block.to_str(); + auto& small_entry = small_cache_[std::make_pair(dst_shard, block)]; + if (!small_entry.result.is_null()) { + entry->result[block] = small_entry.result; + entry->from_small_cache++; + alarm_timestamp().relax(small_entry.timeout = td::Timestamp::in(CACHE_TTL)); + } else { + small_entry.pending_entries.push_back(entry); + ++entry->pending; + new_queries[prefix].push_back(block); + } } }; auto limits = last_masterchain_state_->get_imported_msg_queue_limits(dst_shard.workchain); for (auto& p : new_queries) { for (size_t i = 0; i < p.second.size(); i += 16) { - ++entry->pending; size_t j = std::min(i + 16, p.second.size()); get_proof_import(entry, std::vector(p.second.begin() + i, p.second.begin() + j), limits * (td::uint32)(j - i)); @@ -355,7 +365,7 @@ void OutMsgQueueImporter::get_proof_local(std::shared_ptr entry, Blo if (block.seqno() == 0) { std::vector> proof = { td::Ref(true, block, state->root_cell(), td::Ref{})}; - td::actor::send_closure(SelfId, &OutMsgQueueImporter::got_proof, entry, std::move(proof)); + td::actor::send_closure(SelfId, &OutMsgQueueImporter::got_proof, entry, std::move(proof), ProofSource::Local); return; } td::actor::send_closure( @@ -371,7 +381,8 @@ void OutMsgQueueImporter::get_proof_local(std::shared_ptr entry, Blo Ref block_state_proof = create_block_state_proof(R.ok()->root_cell()).move_as_ok(); std::vector> proof = { td::Ref(true, block, state->root_cell(), std::move(block_state_proof))}; - td::actor::send_closure(SelfId, &OutMsgQueueImporter::got_proof, entry, std::move(proof)); + td::actor::send_closure(SelfId, &OutMsgQueueImporter::got_proof, entry, std::move(proof), + ProofSource::Local); }); }); } @@ -395,27 +406,70 @@ void OutMsgQueueImporter::get_proof_import(std::shared_ptr entry, st retry_after); return; } - td::actor::send_closure(SelfId, &OutMsgQueueImporter::got_proof, entry, R.move_as_ok()); + td::actor::send_closure(SelfId, &OutMsgQueueImporter::got_proof, entry, R.move_as_ok(), ProofSource::Query); }); } -void OutMsgQueueImporter::got_proof(std::shared_ptr entry, std::vector> proofs) { +void OutMsgQueueImporter::got_proof(std::shared_ptr entry, std::vector> proofs, + ProofSource proof_source) { if (!check_timeout(entry)) { return; } + // TODO: maybe save proof to small cache? 
It would allow other queries to reuse this result for (auto& p : proofs) { - entry->result[p->block_id_] = std::move(p); - } - CHECK(entry->pending > 0); - if (--entry->pending == 0) { - finish_query(entry); + auto block_id = p->block_id_; + if (entry->result.emplace(block_id, std::move(p)).second) { + CHECK(entry->pending > 0); + switch (proof_source) { + case ProofSource::SmallCache: + entry->from_small_cache++; + break; + case ProofSource::Broadcast: + entry->from_broadcast++; + break; + case ProofSource::Query: + entry->from_query++; + break; + case ProofSource::Local: + entry->from_local++; + break; + } + if (--entry->pending == 0) { + finish_query(entry); + } + } } } void OutMsgQueueImporter::finish_query(std::shared_ptr entry) { - LOG(DEBUG) << "Done importing neighbor msg queues for shard " << entry->dst_shard.to_str() << ", " - << entry->blocks.size() << " blocks in " << entry->timer.elapsed() << "s"; + FLOG(INFO) { + sb << "Done importing neighbor msg queues for shard " << entry->dst_shard.to_str() << ", " << entry->blocks.size() + << " blocks in " << entry->timer.elapsed() << "s"; + sb << " sources{"; + if (entry->from_broadcast) { + sb << " broadcast=" << entry->from_broadcast; + } + if (entry->from_small_cache) { + sb << " small_cache=" << entry->from_small_cache; + } + if (entry->from_local) { + sb << " local=" << entry->from_local; + } + if (entry->from_query) { + sb << " query=" << entry->from_query; + } + sb << "}"; + + if (!small_cache_.empty()) { + sb << " small_cache_size=" << small_cache_.size(); + } + if (!cache_.empty()) { + sb << " cache_size=" << cache_.size(); + } + }; + entry->done = true; + CHECK(entry->blocks.size() == entry->result.size()); alarm_timestamp().relax(entry->timeout = td::Timestamp::in(CACHE_TTL)); for (auto& p : entry->promises) { p.first.set_result(entry->result); @@ -441,8 +495,7 @@ bool OutMsgQueueImporter::check_timeout(std::shared_ptr entry) { } void OutMsgQueueImporter::alarm() { - auto it = cache_.begin(); - while (it != cache_.end()) { + for (auto it = cache_.begin(); it != cache_.end();) { auto& promises = it->second->promises; if (it->second->timeout.is_in_past()) { if (!it->second->done) { @@ -469,6 +522,36 @@ void OutMsgQueueImporter::alarm() { promises.resize(j); ++it; } + + for (auto it = small_cache_.begin(); it != small_cache_.end();) { + td::remove_if(it->second.pending_entries, + [](const std::shared_ptr& entry) { return entry->done || entry->promises.empty(); }); + if (it->second.timeout.is_in_past()) { + if (it->second.pending_entries.empty()) { + it = small_cache_.erase(it); + } else { + ++it; + } + } else { + alarm_timestamp().relax(it->second.timeout); + ++it; + } + } +} + +void OutMsgQueueImporter::add_out_msg_queue_proof(ShardIdFull dst_shard, td::Ref proof) { + LOG(INFO) << "add out msg queue proof " << dst_shard.to_str() << proof->block_id_.to_str(); + auto& small_entry = small_cache_[std::make_pair(dst_shard, proof->block_id_)]; + if (!small_entry.result.is_null()) { + return; + } + alarm_timestamp().relax(small_entry.timeout = td::Timestamp::in(CACHE_TTL)); + small_entry.result = proof; + CHECK(proof.not_null()); + auto pending_entries = std::move(small_entry.pending_entries); + for (auto& entry : pending_entries) { + got_proof(entry, {proof}, ProofSource::Broadcast); + } } void BuildOutMsgQueueProof::abort_query(td::Status reason) { diff --git a/validator/impl/out-msg-queue-proof.hpp b/validator/impl/out-msg-queue-proof.hpp index c115a2763..6f4c08d1d 100644 --- a/validator/impl/out-msg-queue-proof.hpp +++ 
b/validator/impl/out-msg-queue-proof.hpp @@ -41,6 +41,7 @@ class OutMsgQueueImporter : public td::actor::Actor { void new_masterchain_block_notification(td::Ref state, std::set collating_shards); void get_neighbor_msg_queue_proofs(ShardIdFull dst_shard, std::vector blocks, td::Timestamp timeout, td::Promise>> promise); + void add_out_msg_queue_proof(ShardIdFull dst_shard, td::Ref proof); void update_options(td::Ref opts) { opts_ = std::move(opts); @@ -62,13 +63,28 @@ class OutMsgQueueImporter : public td::actor::Actor { td::Timestamp timeout = td::Timestamp::never(); td::Timer timer; size_t pending = 0; + size_t from_small_cache = 0; + size_t from_broadcast = 0; + size_t from_query = 0; + size_t from_local = 0; }; std::map>, std::shared_ptr> cache_; + // This cache has smaller granularity, proof is stored for each block separately + struct SmallCacheEntry { + td::Ref result; + std::vector> pending_entries; + td::Timestamp timeout = td::Timestamp::never(); + }; + std::map, SmallCacheEntry> small_cache_; + void get_proof_local(std::shared_ptr entry, BlockIdExt block); void get_proof_import(std::shared_ptr entry, std::vector blocks, block::ImportedMsgQueueLimits limits); - void got_proof(std::shared_ptr entry, std::vector> proofs); + enum class ProofSource { + SmallCache, Broadcast, Query, Local + }; + void got_proof(std::shared_ptr entry, std::vector> proofs, ProofSource proof_source); void finish_query(std::shared_ptr entry); bool check_timeout(std::shared_ptr entry); diff --git a/validator/manager.cpp b/validator/manager.cpp index d4ea68101..3ff33d234 100644 --- a/validator/manager.cpp +++ b/validator/manager.cpp @@ -1439,6 +1439,11 @@ void ValidatorManagerImpl::set_block_candidate(BlockIdExt id, BlockCandidate can if (!id.is_masterchain()) { add_cached_block_candidate(ReceivedBlock{id, candidate.data.clone()}); } + LOG(INFO) << "Got candidate " << id.to_str() << " with " << candidate.out_msg_queue_proof_broadcasts.size() + << " out msg queue proof broadcasts"; + for (auto broadcast : candidate.out_msg_queue_proof_broadcasts) { + callback_->send_out_msg_queue_proof_broadcast(broadcast); + } td::actor::send_closure(db_, &Db::store_block_candidate, std::move(candidate), std::move(promise)); } @@ -3519,7 +3524,7 @@ void ValidatorManagerImpl::del_collator(adnl::AdnlNodeIdShort id, ShardIdFull sh } else { td::actor::send_closure(it->second.actor, &CollatorNode::del_shard, shard); } -} +}; void ValidatorManagerImpl::get_collation_manager_stats( td::Promise> promise) { @@ -3569,6 +3574,16 @@ void ValidatorManagerImpl::get_collation_manager_stats( td::actor::send_closure(callback, &Cb::dec_pending); } +void ValidatorManagerImpl::add_out_msg_queue_proof(ShardIdFull dst_shard, td::Ref proof) { + if (!collator_nodes_.empty()) { + if (out_msg_queue_importer_.empty()) { + out_msg_queue_importer_ = td::actor::create_actor("outmsgqueueimporter", actor_id(this), + opts_, last_masterchain_state_); + } + td::actor::send_closure(out_msg_queue_importer_, &OutMsgQueueImporter::add_out_msg_queue_proof, dst_shard, + std::move(proof)); + } +} void ValidatorManagerImpl::add_persistent_state_description(td::Ref desc) { auto now = (UnixTime)td::Clocks::system(); if (desc->end_time <= now) { diff --git a/validator/manager.hpp b/validator/manager.hpp index c0d74456f..649d51014 100644 --- a/validator/manager.hpp +++ b/validator/manager.hpp @@ -640,6 +640,7 @@ class ValidatorManagerImpl : public ValidatorManager { void add_collator(adnl::AdnlNodeIdShort id, ShardIdFull shard) override; void 
del_collator(adnl::AdnlNodeIdShort id, ShardIdFull shard) override;
+  void add_out_msg_queue_proof(ShardIdFull dst_shard, td::Ref proof) override;

   void get_collation_manager_stats(
       td::Promise> promise) override;
diff --git a/validator/validator-group.cpp b/validator/validator-group.cpp
index f95266b33..344f5c1c3 100644
--- a/validator/validator-group.cpp
+++ b/validator/validator-group.cpp
@@ -66,8 +66,14 @@ void ValidatorGroup::generate_block_candidate(
         std::move(R));
   };
   td::uint64 max_answer_size = config_.max_block_size + config_.max_collated_data_size + 1024;
+  auto block_candidate_priority = BlockCandidatePriority{
+      .round = source_info.round,
+      .first_block_round = source_info.first_block_round,
+      .priority = source_info.source_priority
+  };
   td::actor::send_closure(collation_manager_, &CollationManager::collate_block, shard_, min_masterchain_block_id_,
-                          prev_block_ids_, Ed25519_PublicKey{local_id_full_.ed25519_value().raw()}, validator_set_,
+                          prev_block_ids_, Ed25519_PublicKey{local_id_full_.ed25519_value().raw()},
+                          block_candidate_priority, validator_set_,
                           max_answer_size, cancellation_token_source_.get_cancellation_token(), std::move(P));
 }
diff --git a/validator/validator.h b/validator/validator.h
index 2cd771021..c4a779cfd 100644
--- a/validator/validator.h
+++ b/validator/validator.h
@@ -198,6 +198,9 @@ class ValidatorManagerInterface : public td::actor::Actor {
   virtual void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
                                     td::BufferSlice data) = 0;
   virtual void send_broadcast(BlockBroadcast broadcast, int mode) = 0;
+  virtual void send_out_msg_queue_proof_broadcast(td::Ref broadcast) {
+    LOG(ERROR) << "Unimplemented send_out_msg_queue_proof_broadcast - ignore broadcast";
+  }
   virtual void download_block(BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout,
                               td::Promise promise) = 0;
   virtual void download_zero_state(BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout,
@@ -325,6 +328,10 @@ class ValidatorManagerInterface : public td::actor::Actor {
   virtual void add_collator(adnl::AdnlNodeIdShort id, ShardIdFull shard) = 0;
   virtual void del_collator(adnl::AdnlNodeIdShort id, ShardIdFull shard) = 0;

+  virtual void add_out_msg_queue_proof(ShardIdFull dst_shard, td::Ref proof) {
+    LOG(ERROR) << "Unimplemented add_out_msg_queue_proof - ignore broadcast";
+  }
+
   virtual void get_collation_manager_stats(
       td::Promise> promise) = 0;
 };

From 5d79855c94711ad161b41b75c18edff5396be842 Mon Sep 17 00:00:00 2001
From: SpyCheese
Date: Wed, 27 Nov 2024 18:12:23 +0300
Subject: [PATCH 2/5] Out msg queues: improve logs, various small changes

---
 ton/ton-types.h                             | 15 ++---
 validator-session/validator-session-types.h |  3 +-
 validator-session/validator-session.cpp     | 18 ++---
 validator/full-node-fast-sync-overlays.cpp  | 13 ++--
 validator/full-node-shard.cpp               |  9 ++-
 validator/impl/collator.cpp                 | 18 +++--
 validator/impl/out-msg-queue-proof.cpp      | 75 +++++++++++----------
 validator/manager.cpp                       | 16 +++--
 validator/validator-group.cpp               | 18 ++---
 9 files changed, 106 insertions(+), 79 deletions(-)

diff --git a/ton/ton-types.h b/ton/ton-types.h
index aeb0595ad..f8eb49df1 100644
--- a/ton/ton-types.h
+++ b/ton/ton-types.h
@@ -428,15 +428,14 @@ struct Ed25519_PublicKey {
 struct OutMsgQueueProofBroadcast : public td::CntObject {
   OutMsgQueueProofBroadcast(ShardIdFull dst_shard, BlockIdExt block_id, td::int32 max_bytes, td::int32 max_msgs,
-                            td::BufferSlice queue_proofs, td::BufferSlice block_state_proofs,
-                            std::vector msg_counts)
+
td::BufferSlice queue_proof, td::BufferSlice block_state_proof, int msg_count) : dst_shard(std::move(dst_shard)) , block_id(block_id) , max_bytes(max_bytes) , max_msgs(max_msgs) - , queue_proofs(std::move(queue_proofs)) - , block_state_proofs(std::move(block_state_proofs)) - , msg_counts(std::move(msg_counts)) { + , queue_proofs(std::move(queue_proof)) + , block_state_proofs(std::move(block_state_proof)) + , msg_count(std::move(msg_count)) { } ShardIdFull dst_shard; BlockIdExt block_id; @@ -448,11 +447,11 @@ struct OutMsgQueueProofBroadcast : public td::CntObject { // outMsgQueueProof td::BufferSlice queue_proofs; td::BufferSlice block_state_proofs; - std::vector msg_counts; + int msg_count; - virtual OutMsgQueueProofBroadcast* make_copy() const { + OutMsgQueueProofBroadcast* make_copy() const override { return new OutMsgQueueProofBroadcast(dst_shard, block_id, max_bytes, max_msgs, queue_proofs.clone(), - block_state_proofs.clone(), msg_counts); + block_state_proofs.clone(), msg_count); } }; diff --git a/validator-session/validator-session-types.h b/validator-session/validator-session-types.h index 7147bf2d3..2edc42bbd 100644 --- a/validator-session/validator-session-types.h +++ b/validator-session/validator-session-types.h @@ -179,9 +179,8 @@ struct EndValidatorGroupStats { }; struct BlockSourceInfo { - td::uint32 round, first_block_round; PublicKey source; - td::int32 source_priority; + BlockCandidatePriority priority; }; } // namespace validatorsession diff --git a/validator-session/validator-session.cpp b/validator-session/validator-session.cpp index 3a913990c..99ee61f23 100644 --- a/validator-session/validator-session.cpp +++ b/validator-session/validator-session.cpp @@ -553,9 +553,9 @@ void ValidatorSessionImpl::check_generate_slot() { LOG(WARNING) << print_id << ": failed to generate block candidate: " << R.move_as_error(); } }); - callback_->on_generate_slot( - BlockSourceInfo{cur_round_, first_block_round_, description().get_source_public_key(local_idx()), priority}, - std::move(P)); + callback_->on_generate_slot(BlockSourceInfo{description().get_source_public_key(local_idx()), + BlockCandidatePriority{cur_round_, first_block_round_, priority}}, + std::move(P)); } else { alarm_timestamp().relax(t); } @@ -634,8 +634,9 @@ void ValidatorSessionImpl::try_approve_block(const SentBlock *block) { pending_approve_.insert(block_id); callback_->on_candidate( - BlockSourceInfo{cur_round_, first_block_round_, description().get_source_public_key(block->get_src_idx()), - description().get_node_priority(block->get_src_idx(), cur_round_)}, + BlockSourceInfo{description().get_source_public_key(block->get_src_idx()), + BlockCandidatePriority{cur_round_, first_block_round_, + description().get_node_priority(block->get_src_idx(), cur_round_)}}, B->root_hash_, B->data_.clone(), B->collated_data_.clone(), std::move(P)); } else if (T.is_in_past()) { if (!active_requests_.count(block_id)) { @@ -909,9 +910,10 @@ void ValidatorSessionImpl::on_new_round(td::uint32 round) { stats.rounds.pop_back(); } - BlockSourceInfo source_info{cur_round_, first_block_round_, - description().get_source_public_key(block->get_src_idx()), - description().get_node_priority(block->get_src_idx(), cur_round_)}; + BlockSourceInfo source_info{ + description().get_source_public_key(block->get_src_idx()), + BlockCandidatePriority{cur_round_, first_block_round_, + description().get_node_priority(block->get_src_idx(), cur_round_)}}; if (it == blocks_.end()) { callback_->on_block_committed(std::move(source_info), block->get_root_hash(), 
block->get_file_hash(), td::BufferSlice(), std::move(export_sigs), std::move(export_approve_sigs), diff --git a/validator/full-node-fast-sync-overlays.cpp b/validator/full-node-fast-sync-overlays.cpp index 4e0d11e48..947f9dee8 100644 --- a/validator/full-node-fast-sync-overlays.cpp +++ b/validator/full-node-fast-sync-overlays.cpp @@ -47,6 +47,9 @@ void FullNodeFastSyncOverlay::process_block_broadcast(PublicKeyHash src, ton_api } void FullNodeFastSyncOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_outMsgQueueProofBroadcast &query) { + if (src == local_id_.pubkey_hash()) { + return; // dropping broadcast from self + } BlockIdExt block_id = create_block_id(query.block_); ShardIdFull shard_id = create_shard_id(query.dst_shard_); if (query.proof_->get_id() != ton_api::tonNode_outMsgQueueProof::ID) { @@ -68,7 +71,8 @@ void FullNodeFastSyncOverlay::process_broadcast(PublicKeyHash src, ton_api::tonN } auto proof = std::move(R.move_as_ok()[0]); - LOG(INFO) << "got tonNode.outMsgQueueProofBroadcast " << shard_id.to_str() << " " << block_id.to_str(); + LOG(INFO) << "got tonNode.outMsgQueueProofBroadcast to " << shard_id.to_str() << " from " << block_id.to_str() + << ", msgs=" << proof->msg_count_ << ", size=" << tl_proof->queue_proofs_.size(); td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::add_out_msg_queue_proof, shard_id, std::move(proof)); } @@ -236,9 +240,10 @@ void FullNodeFastSyncOverlay::send_out_msg_queue_proof_broadcast(td::Ref(broadcast->max_bytes, broadcast->max_msgs), create_tl_object(broadcast->queue_proofs.clone(), broadcast->block_state_proofs.clone(), - std::vector(broadcast->msg_counts))); - VLOG(FULL_NODE_DEBUG) << "Sending outMsgQueueProof in fast sync overlay: " << broadcast->dst_shard.to_str() << " " - << broadcast->block_id.to_str(); + std::vector(1, broadcast->msg_count))); + VLOG(FULL_NODE_DEBUG) << "Sending outMsgQueueProof in fast sync overlay to " << broadcast->dst_shard.to_str() + << " from " << broadcast->block_id.to_str() << ", msgs=" << broadcast->msg_count + << " bytes=" << broadcast->queue_proofs.size(); td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_, local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), std::move(B)); } diff --git a/validator/full-node-shard.cpp b/validator/full-node-shard.cpp index 29950363d..c351653f4 100644 --- a/validator/full-node-shard.cpp +++ b/validator/full-node-shard.cpp @@ -703,8 +703,13 @@ void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNod promise.set_result(serialize_tl_object(R.move_as_ok(), true)); } }); - VLOG(FULL_NODE_DEBUG) << "Got query getOutMsgQueueProof (" << blocks.size() << " blocks) to shard " - << dst_shard.to_str() << " from " << src; + FLOG(DEBUG) { + sb << "Got query getOutMsgQueueProof to shard " << dst_shard.to_str() << " from blocks"; + for (const BlockIdExt &id : blocks) { + sb << " " << id.id.to_str(); + } + sb << " from " << src; + }; td::actor::create_actor("buildqueueproof", dst_shard, std::move(blocks), limits, validator_manager_, std::move(P)) .release(); diff --git a/validator/impl/collator.cpp b/validator/impl/collator.cpp index cd97e3fcc..dd8d55169 100644 --- a/validator/impl/collator.cpp +++ b/validator/impl/collator.cpp @@ -5854,33 +5854,39 @@ bool Collator::create_block_candidate() { block_candidate = std::make_unique(created_by_, new_block_id_ext, block::compute_file_hash(cdata_slice.as_slice()), blk_slice.clone(), cdata_slice.clone()); - const bool 
need_out_msg_queue_broadcasts = true;
+  const bool need_out_msg_queue_broadcasts = !is_masterchain();
   if (need_out_msg_queue_broadcasts) {
     // we can't generate two proofs at the same time for the same root (it is not currently supported by cells)
     // so we can't reuse the new state and have to regenerate it with a merkle update
     auto new_state = vm::MerkleUpdate::apply(prev_state_root_pure_, state_update);
     CHECK(new_state.not_null());
     CHECK(new_state->get_hash() == state_root->get_hash());
-    assert(config_ && shard_conf_);
+    CHECK(shard_conf_);
     auto neighbor_list = shard_conf_->get_neighbor_shard_hash_ids(shard_);
     LOG(INFO) << "Build OutMsgQueueProofs for " << neighbor_list.size() << " neighbours";
-    for (ton::BlockId blk_id : neighbor_list) {
+    for (BlockId blk_id : neighbor_list) {
       auto prefix = blk_id.shard_full();
+      if (shard_intersects(prefix, shard_)) {
+        continue;
+      }
       auto limits = mc_state_->get_imported_msg_queue_limits(blk_id.workchain);
       // one could use monitor_min_split_depth here, to decrease the number of broadcasts
       // but the current implementation of OutMsgQueueImporter doesn't support it
       auto r_proof = OutMsgQueueProof::build(
-          prefix, {OutMsgQueueProof::OneBlock{.id = new_block_id_ext, .state_root = new_state, .block_root = new_block}}, limits);
+          prefix,
+          {OutMsgQueueProof::OneBlock{.id = new_block_id_ext, .state_root = new_state, .block_root = new_block}},
+          limits);
       if (r_proof.is_ok()) {
         auto proof = r_proof.move_as_ok();
+        CHECK(proof->msg_counts_.size() == 1);
         block_candidate->out_msg_queue_proof_broadcasts.push_back(td::Ref(
             true, OutMsgQueueProofBroadcast(prefix, new_block_id_ext, limits.max_bytes, limits.max_msgs,
                                             std::move(proof->queue_proofs_), std::move(proof->block_state_proofs_),
-                                            std::move(proof->msg_counts_))));
+                                            proof->msg_counts_[0])));
       } else {
-        LOG(ERROR) << "Failed to build OutMsgQueueProof: " << r_proof.error();
+        LOG(ERROR) << "Failed to build OutMsgQueueProof to " << prefix.to_str() << ": " << r_proof.error();
       }
     }
   }
diff --git a/validator/impl/out-msg-queue-proof.cpp b/validator/impl/out-msg-queue-proof.cpp
index 4e8802c7e..edd20a8d2 100644
--- a/validator/impl/out-msg-queue-proof.cpp
+++ b/validator/impl/out-msg-queue-proof.cpp
@@ -92,29 +92,6 @@ static td::Result> process_queue(
       ++msg_count[kv->source];
       ++msg_count_total;

-      // TODO: Get processed_upto from destination shard (in request?)
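The shard_intersects(prefix, shard_) check added in the collator.cpp hunk above skips neighbors that overlap the shard being collated, since their queues need no broadcast. A minimal sketch of such an intersection test, assuming TON's usual 64-bit shard encoding (prefix bits followed by a single marker bit and zero padding); shards_intersect and lowest_bit here are illustrative stand-ins, not the real ton::shard_intersects implementation:

    #include <algorithm>
    #include <cstdint>

    // The lowest set bit is the marker that terminates the shard prefix.
    static uint64_t lowest_bit(uint64_t x) {
      return x & (~x + 1);
    }

    // Two shards of one workchain intersect iff the shorter prefix is a
    // prefix of the longer one, i.e. all bits above the coarser marker match.
    bool shards_intersect(uint64_t a, uint64_t b) {
      uint64_t marker = std::max(lowest_bit(a), lowest_bit(b));
      uint64_t above_marker = ~(marker | (marker - 1));
      return ((a ^ b) & above_marker) == 0;
    }

For example, shards_intersect(0x8000000000000000, 0x2000000000000000) is true (the root shard contains shard "00"), while the two halves 0x4000000000000000 and 0xC000000000000000 do not intersect.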
- /* - // Parse message to check if it was processed (as in Collator::process_inbound_message) - ton::LogicalTime enqueued_lt = kv->msg->prefetch_ulong(64); - auto msg_env = kv->msg->prefetch_ref(); - block::tlb::MsgEnvelope::Record_std env; - if (!tlb::unpack_cell(msg_env, env)) { - return td::Status::Error("cannot unpack MsgEnvelope of an internal message"); - } - vm::CellSlice cs{vm::NoVmOrd{}, env.msg}; - block::gen::CommonMsgInfo::Record_int_msg_info info; - if (!tlb::unpack(cs, info)) { - return td::Status::Error("cannot unpack CommonMsgInfo of an internal message"); - } - auto src_prefix = block::tlb::MsgAddressInt::get_prefix(info.src); - auto dest_prefix = block::tlb::MsgAddressInt::get_prefix(info.dest); - auto cur_prefix = block::interpolate_addr(src_prefix, dest_prefix, env.cur_addr); - auto next_prefix = block::interpolate_addr(src_prefix, dest_prefix, env.next_addr); - block::EnqueuedMsgDescr descr{cur_prefix, next_prefix, kv->lt, enqueued_lt, env.msg->get_hash().bits()}; - if (dst_processed_upto->already_processed(descr)) { - } else { - }*/ - dfs_cs(*kv->msg); TRY_STATUS_PREFIX(check_no_prunned(*kv->msg), "invalid message proof: ") if (estimated_proof_size >= limits.max_bytes || msg_count_total >= (long long)limits.max_msgs) { @@ -301,7 +278,12 @@ void OutMsgQueueImporter::get_neighbor_msg_queue_proofs( return; } - LOG(DEBUG) << "Importing neighbor msg queues for shard " << dst_shard.to_str() << ", " << blocks.size() << " blocks"; + FLOG(DEBUG) { + sb << "Importing neighbor msg queues for shard " << dst_shard.to_str() << ", " << blocks.size() << " blocks:"; + for (const BlockIdExt& block : blocks) { + sb << " " << block.id.to_str(); + } + }; cache_[{dst_shard, blocks}] = entry = std::make_shared(); entry->dst_shard = dst_shard; @@ -321,7 +303,7 @@ void OutMsgQueueImporter::get_neighbor_msg_queue_proofs( prefix = shard_prefix(prefix, min_split); } - LOG(INFO) << "search for out msg queue proof " << prefix.to_str() << block.to_str(); + LOG(DEBUG) << "search for out msg queue proof " << prefix.to_str() << " " << block.to_str(); auto& small_entry = small_cache_[std::make_pair(dst_shard, block)]; if (!small_entry.result.is_null()) { entry->result[block] = small_entry.result; @@ -397,7 +379,13 @@ void OutMsgQueueImporter::get_proof_import(std::shared_ptr entry, st [=, SelfId = actor_id(this), retry_after = td::Timestamp::in(0.1), dst_shard = entry->dst_shard](td::Result>> R) { if (R.is_error()) { - LOG(DEBUG) << "Failed to get out msg queue for " << dst_shard.to_str() << ": " << R.move_as_error(); + FLOG(DEBUG) { + sb << "Failed to get out msg queue for " << dst_shard.to_str() << " from"; + for (const BlockIdExt &block : blocks) { + sb << " " << block.id.to_str(); + } + sb << ": " << R.move_as_error(); + }; delay_action( [=]() { td::actor::send_closure(SelfId, &OutMsgQueueImporter::get_proof_import, entry, std::move(blocks), @@ -443,8 +431,11 @@ void OutMsgQueueImporter::got_proof(std::shared_ptr entry, std::vect void OutMsgQueueImporter::finish_query(std::shared_ptr entry) { FLOG(INFO) { - sb << "Done importing neighbor msg queues for shard " << entry->dst_shard.to_str() << ", " << entry->blocks.size() - << " blocks in " << entry->timer.elapsed() << "s"; + sb << "Done importing neighbor msg queues for shard " << entry->dst_shard.to_str() << " from"; + for (const BlockIdExt &block : entry->blocks) { + sb << " " << block.id.to_str(); + } + sb << " in " << entry->timer.elapsed() << "s"; sb << " sources{"; if (entry->from_broadcast) { sb << " broadcast=" << entry->from_broadcast; @@ 
-479,8 +470,13 @@ void OutMsgQueueImporter::finish_query(std::shared_ptr entry) {

 bool OutMsgQueueImporter::check_timeout(std::shared_ptr entry) {
   if (entry->timeout.is_in_past()) {
-    LOG(DEBUG) << "Aborting importing neighbor msg queues for shard " << entry->dst_shard.to_str() << ", "
-               << entry->blocks.size() << " blocks: timeout";
+    FLOG(DEBUG) {
+      sb << "Aborting importing neighbor msg queues for shard " << entry->dst_shard.to_str() << " from";
+      for (const BlockIdExt &block : entry->blocks) {
+        sb << " " << block.id.to_str();
+      }
+      sb << ": timeout";
+    };
     for (auto& p : entry->promises) {
       p.first.set_error(td::Status::Error(ErrorCode::timeout, "timeout"));
     }
@@ -499,8 +495,13 @@ void OutMsgQueueImporter::alarm() {
     auto& promises = it->second->promises;
     if (it->second->timeout.is_in_past()) {
       if (!it->second->done) {
-        LOG(DEBUG) << "Aborting importing neighbor msg queues for shard " << it->second->dst_shard.to_str() << ", "
-                   << it->second->blocks.size() << " blocks: timeout";
+        FLOG(DEBUG) {
+          sb << "Aborting importing neighbor msg queues for shard " << it->second->dst_shard.to_str() << " from";
+          for (const BlockIdExt &block : it->second->blocks) {
+            sb << " " << block.id.to_str();
+          }
+          sb << ": timeout";
+        };
         for (auto& p : promises) {
           p.first.set_error(td::Status::Error(ErrorCode::timeout, "timeout"));
         }
@@ -540,7 +541,7 @@ void OutMsgQueueImporter::alarm() {
 }

 void OutMsgQueueImporter::add_out_msg_queue_proof(ShardIdFull dst_shard, td::Ref proof) {
-  LOG(INFO) << "add out msg queue proof " << dst_shard.to_str() << proof->block_id_.to_str();
+  LOG(INFO) << "add out msg queue proof " << dst_shard.to_str() << " " << proof->block_id_.to_str();
   auto& small_entry = small_cache_[std::make_pair(dst_shard, proof->block_id_)];
   if (!small_entry.result.is_null()) {
     return;
@@ -556,7 +557,13 @@ void OutMsgQueueImporter::add_out_msg_queue_proof(ShardIdFull dst_shard, td::Ref

 void BuildOutMsgQueueProof::abort_query(td::Status reason) {
   if (promise_) {
-    LOG(DEBUG) << "failed to build msg queue proof to " << dst_shard_.to_str() << ": " << reason;
+    FLOG(DEBUG) {
+      sb << "failed to build msg queue proof to " << dst_shard_.to_str() << " from";
+      for (const auto& block : blocks_) {
+        sb << " " << block.id.id.to_str();
+      }
+      sb << ": " << reason;
+    };
     promise_.set_error(
         reason.move_as_error_prefix(PSTRING() << "failed to build msg queue proof to " << dst_shard_.to_str() << ": "));
   }
diff --git a/validator/manager.cpp b/validator/manager.cpp
index 3ff33d234..903e31554 100644
--- a/validator/manager.cpp
+++ b/validator/manager.cpp
@@ -798,7 +798,9 @@ void ValidatorManagerImpl::wait_neighbor_msg_queue_proofs(
   if (dst_shard.is_masterchain()) {
     // We split queries for masterchain {dst_shard, {block_1, ..., block_n}} into separate queries
     // {dst_shard, {block_1}}, ..., {dst_shard, {block_n}}
-    // Also, use cache
+    // Also, use cache.
+ // This is performed here and not in OutMsgQueueImporter because it's important to use + // cached_msg_queue_to_masterchain_, which is related to the current list of shard block descriptions class Worker : public td::actor::Actor { public: Worker(size_t pending, td::Promise>> promise) @@ -2958,12 +2960,15 @@ PublicKeyHash ValidatorManagerImpl::get_validator(ShardIdFull shard, td::Refget_collators_list()->self_collate; } bool ValidatorManagerImpl::Collator::can_collate_shard(ShardIdFull shard) const { @@ -3524,7 +3529,7 @@ void ValidatorManagerImpl::del_collator(adnl::AdnlNodeIdShort id, ShardIdFull sh } else { td::actor::send_closure(it->second.actor, &CollatorNode::del_shard, shard); } -}; +} void ValidatorManagerImpl::get_collation_manager_stats( td::Promise> promise) { @@ -3575,15 +3580,18 @@ void ValidatorManagerImpl::get_collation_manager_stats( } void ValidatorManagerImpl::add_out_msg_queue_proof(ShardIdFull dst_shard, td::Ref proof) { - if (!collator_nodes_.empty()) { + if (is_shard_collator(dst_shard)) { if (out_msg_queue_importer_.empty()) { out_msg_queue_importer_ = td::actor::create_actor("outmsgqueueimporter", actor_id(this), opts_, last_masterchain_state_); } td::actor::send_closure(out_msg_queue_importer_, &OutMsgQueueImporter::add_out_msg_queue_proof, dst_shard, std::move(proof)); + } else { + VLOG(VALIDATOR_DEBUG) << "Dropping unneeded out msg queue proof to shard " << dst_shard.to_str(); } } + void ValidatorManagerImpl::add_persistent_state_description(td::Ref desc) { auto now = (UnixTime)td::Clocks::system(); if (desc->end_time <= now) { diff --git a/validator/validator-group.cpp b/validator/validator-group.cpp index 344f5c1c3..97f34ea7c 100644 --- a/validator/validator-group.cpp +++ b/validator/validator-group.cpp @@ -32,13 +32,14 @@ namespace ton { namespace validator { static bool need_send_candidate_broadcast(const validatorsession::BlockSourceInfo &source_info, bool is_masterchain) { - return source_info.first_block_round == source_info.round && source_info.source_priority == 0 && !is_masterchain; + return source_info.priority.first_block_round == source_info.priority.round && source_info.priority.priority == 0 && + !is_masterchain; } void ValidatorGroup::generate_block_candidate( validatorsession::BlockSourceInfo source_info, td::Promise promise) { - td::uint32 round_id = source_info.round; + td::uint32 round_id = source_info.priority.round; if (round_id > last_known_round_id_) { last_known_round_id_ = round_id; } @@ -66,15 +67,10 @@ void ValidatorGroup::generate_block_candidate( std::move(R)); }; td::uint64 max_answer_size = config_.max_block_size + config_.max_collated_data_size + 1024; - auto block_candidate_priority = BlockCandidatePriority{ - .round = source_info.round, - .first_block_round = source_info.first_block_round, - .priority = source_info.source_priority - }; td::actor::send_closure(collation_manager_, &CollationManager::collate_block, shard_, min_masterchain_block_id_, prev_block_ids_, Ed25519_PublicKey{local_id_full_.ed25519_value().raw()}, - block_candidate_priority, validator_set_, - max_answer_size, cancellation_token_source_.get_cancellation_token(), std::move(P)); + source_info.priority, validator_set_, max_answer_size, + cancellation_token_source_.get_cancellation_token(), std::move(P)); } void ValidatorGroup::generated_block_candidate(validatorsession::BlockSourceInfo source_info, @@ -103,7 +99,7 @@ void ValidatorGroup::generated_block_candidate(validatorsession::BlockSourceInfo void 
ValidatorGroup::validate_block_candidate(validatorsession::BlockSourceInfo source_info, BlockCandidate block, td::Promise> promise) { - td::uint32 round_id = source_info.round; + td::uint32 round_id = source_info.priority.round; if (round_id > last_known_round_id_) { last_known_round_id_ = round_id; } @@ -174,7 +170,7 @@ void ValidatorGroup::accept_block_candidate(validatorsession::BlockSourceInfo so validatorsession::ValidatorSessionStats stats, td::Promise promise) { stats.cc_seqno = validator_set_->get_catchain_seqno(); - td::uint32 round_id = source_info.round; + td::uint32 round_id = source_info.priority.round; if (round_id >= last_known_round_id_) { last_known_round_id_ = round_id + 1; } From 5fae8db7a03d9d16b8bbaedd16a3d70202d22c28 Mon Sep 17 00:00:00 2001 From: SpyCheese Date: Thu, 28 Nov 2024 13:09:40 +0300 Subject: [PATCH 3/5] Adapt "get msg queue sizes" in lite-client and tonlib to non-full liteservers --- lite-client/lite-client.cpp | 90 +++++++++++++++--- lite-client/lite-client.h | 3 +- tonlib/tonlib/TonlibClient.cpp | 167 ++++++++++++++++++++++++++++++--- tonlib/tonlib/tonlib-cli.cpp | 19 ++++ 4 files changed, 253 insertions(+), 26 deletions(-) diff --git a/lite-client/lite-client.cpp b/lite-client/lite-client.cpp index dc09ae52b..ce3dd4b20 100644 --- a/lite-client/lite-client.cpp +++ b/lite-client/lite-client.cpp @@ -1627,27 +1627,93 @@ void TestNode::send_compute_complaint_price_query(ton::StdSmcAddress elector_add } bool TestNode::get_msg_queue_sizes() { - auto q = ton::serialize_tl_object(ton::create_tl_object(0, 0, 0), true); - return envelope_send_query(std::move(q), [Self = actor_id(this)](td::Result res) -> void { - if (res.is_error()) { - LOG(ERROR) << "liteServer.getOutMsgQueueSizes error: " << res.move_as_error(); + ton::BlockIdExt blkid = mc_last_id_; + if (!blkid.is_valid_full()) { + return set_error("must obtain last block information before making other queries"); + } + if (!(ready_ && !client_.empty())) { + return set_error("server connection not ready"); + } + auto b = + ton::create_serialize_tl_object(ton::create_tl_lite_block_id(blkid)); + LOG(INFO) << "requesting recent shard configuration"; + return envelope_send_query(std::move(b), [Self = actor_id(this), blkid](td::Result R) -> void { + if (R.is_error()) { return; } - auto F = ton::fetch_tl_object(res.move_as_ok(), true); + auto F = ton::fetch_tl_object(R.move_as_ok(), true); if (F.is_error()) { - LOG(ERROR) << "cannot parse answer to liteServer.getOutMsgQueueSizes"; - return; + LOG(ERROR) << "cannot parse answer to liteServer.getAllShardsInfo"; + } else { + auto f = F.move_as_ok(); + td::actor::send_closure_later(Self, &TestNode::get_msg_queue_sizes_cont, blkid, std::move(f->data_)); } - td::actor::send_closure_later(Self, &TestNode::got_msg_queue_sizes, F.move_as_ok()); }); } -void TestNode::got_msg_queue_sizes(ton::tl_object_ptr f) { +void TestNode::get_msg_queue_sizes_cont(ton::BlockIdExt mc_blkid, td::BufferSlice data) { + LOG(INFO) << "got shard configuration with respect to block " << mc_blkid.to_str(); + std::vector blocks; + blocks.push_back(mc_blkid); + auto R = vm::std_boc_deserialize(data.clone()); + if (R.is_error()) { + set_error(R.move_as_error_prefix("cannot deserialize shard configuration: ")); + return; + } + auto root = R.move_as_ok(); + block::ShardConfig sh_conf; + if (!sh_conf.unpack(vm::load_cell_slice_ref(root))) { + set_error("cannot extract shard block list from shard configuration"); + return; + } + auto ids = sh_conf.get_shard_hash_ids(true); + for (auto id : ids) { + auto 
ref = sh_conf.get_shard_hash(ton::ShardIdFull(id)); + if (ref.not_null()) { + blocks.push_back(ref->top_block_id()); + } + } + + struct QueryInfo { + std::vector blocks; + std::vector sizes; + size_t pending; + }; + auto info = std::make_shared(); + info->blocks = std::move(blocks); + info->sizes.resize(info->blocks.size(), 0); + info->pending = info->blocks.size(); + + for (size_t i = 0; i < info->blocks.size(); ++i) { + ton::BlockIdExt block_id = info->blocks[i]; + auto b = ton::create_serialize_tl_object( + 0, ton::create_tl_lite_block_id(block_id), false); + LOG(DEBUG) << "requesting queue size for block " << block_id.to_str(); + envelope_send_query(std::move(b), [=, this](td::Result R) -> void { + if (R.is_error()) { + return; + } + auto F = ton::fetch_tl_object(R.move_as_ok(), true); + if (F.is_error()) { + set_error(F.move_as_error_prefix("failed to get queue size: ")); + return; + } + auto f = F.move_as_ok(); + LOG(DEBUG) << "got queue size for block " << block_id.to_str() << " : " << f->size_; + info->sizes[i] = f->size_; + if (--info->pending == 0) { + get_msg_queue_sizes_finish(std::move(info->blocks), std::move(info->sizes)); + } + }); + } +} + +void TestNode::get_msg_queue_sizes_finish(std::vector blocks, std::vector sizes) { + CHECK(blocks.size() == sizes.size()); td::TerminalIO::out() << "Outbound message queue sizes:" << std::endl; - for (auto &x : f->shards_) { - td::TerminalIO::out() << ton::create_block_id(x->id_).id.to_str() << " " << x->size_ << std::endl; + for (size_t i = 0; i < blocks.size(); ++i) { + td::TerminalIO::out() << blocks[i].id.to_str() << " " << sizes[i] << std::endl; } - td::TerminalIO::out() << "External message queue size limit: " << f->ext_msg_queue_size_limit_ << std::endl; } bool TestNode::get_dispatch_queue_info(ton::BlockIdExt block_id) { diff --git a/lite-client/lite-client.h b/lite-client/lite-client.h index 721d2b20d..57804418f 100644 --- a/lite-client/lite-client.h +++ b/lite-client/lite-client.h @@ -324,7 +324,8 @@ class TestNode : public td::actor::Actor { void send_compute_complaint_price_query(ton::StdSmcAddress elector_addr, unsigned expires_in, unsigned bits, unsigned refs, td::Bits256 chash, std::string filename); bool get_msg_queue_sizes(); - void got_msg_queue_sizes(ton::tl_object_ptr f); + void get_msg_queue_sizes_cont(ton::BlockIdExt mc_blkid, td::BufferSlice data); + void get_msg_queue_sizes_finish(std::vector blocks, std::vector sizes); bool get_dispatch_queue_info(ton::BlockIdExt block_id); bool get_dispatch_queue_info_cont(ton::BlockIdExt block_id, bool first, td::Bits256 after_addr); void got_dispatch_queue_info(ton::BlockIdExt block_id, diff --git a/tonlib/tonlib/TonlibClient.cpp b/tonlib/tonlib/TonlibClient.cpp index b9ff4899c..73cb2a16a 100644 --- a/tonlib/tonlib/TonlibClient.cpp +++ b/tonlib/tonlib/TonlibClient.cpp @@ -1800,6 +1800,132 @@ class GetShardBlockProof : public td::actor::Actor { std::vector> links_; }; +class GetOutMsgQueueSizes : public td::actor::Actor { + public: + GetOutMsgQueueSizes(ExtClientRef ext_client_ref, std::vector blocks, + td::actor::ActorShared<> parent, + td::Promise>&& promise) + : blocks_(std::move(blocks)), parent_(std::move(parent)), promise_(std::move(promise)) { + client_.set_client(ext_client_ref); + } + + void start_up() override { + sizes_.resize(blocks_.size()); + pending_ = blocks_.size() + 1; + + for (size_t i = 0; i < blocks_.size(); ++i) { + client_.send_query( + ton::lite_api::liteServer_getBlockOutMsgQueueSize(1, ton::create_tl_lite_block_id(blocks_[i]), true), + [SelfId = 
actor_id(this), i](td::Result> R) { + if (R.is_error()) { + td::actor::send_closure(SelfId, &GetOutMsgQueueSizes::abort, R.move_as_error()); + } else { + td::actor::send_closure(SelfId, &GetOutMsgQueueSizes::got_block_queue_size, i, R.move_as_ok()); + } + }); + } + + client_.send_query( + ton::lite_api::liteServer_getOutMsgQueueSizes(1, ton::masterchainId, ton::shardIdAll), + [SelfId = actor_id(this)](td::Result> R) { + if (R.is_error()) { + td::actor::send_closure(SelfId, &GetOutMsgQueueSizes::abort, R.move_as_error()); + } else { + td::actor::send_closure(SelfId, &GetOutMsgQueueSizes::got_ext_msg_queue_size_limit, + R.ok()->ext_msg_queue_size_limit_); + } + }); + } + + void got_block_queue_size(size_t i, lite_api_ptr f) { + try { + auto S = [&, this]() -> td::Status { + TRY_RESULT_PREFIX(roots, vm::std_boc_deserialize_multi(f->proof_), "cannot deserialize proof: "); + if (roots.size() != 2) { + return td::Status::Error("expected 2 roots in proof"); + } + auto state_root = vm::MerkleProof::virtualize(std::move(roots[1]), 1); + if (state_root.is_null()) { + return td::Status::Error("state proof is invalid"); + } + ton::Bits256 state_hash = state_root->get_hash().bits(); + TRY_STATUS_PREFIX(block::check_block_header_proof(vm::MerkleProof::virtualize(std::move(roots[0]), 1), + blocks_[i], &state_hash, true, nullptr, nullptr), + "error in block header proof: "); + + block::gen::ShardStateUnsplit::Record sstate; + block::gen::OutMsgQueueInfo::Record out_msg_queue_info; + if (!tlb::unpack_cell(state_root, sstate) || !tlb::unpack_cell(sstate.out_msg_queue_info, out_msg_queue_info)) { + return td::Status::Error("cannot unpack shard state"); + } + vm::CellSlice& extra_slice = out_msg_queue_info.extra.write(); + if (extra_slice.fetch_long(1) == 0) { + return td::Status::Error("no out_msg_queue_size in shard state"); + } + block::gen::OutMsgQueueExtra::Record out_msg_queue_extra; + if (!tlb::unpack(extra_slice, out_msg_queue_extra)) { + return td::Status::Error("cannot unpack OutMsgQueueExtra"); + } + vm::CellSlice& size_slice = out_msg_queue_extra.out_queue_size.write(); + if (size_slice.fetch_long(1) == 0) { + return td::Status::Error("no out_msg_queue_size in shard state"); + } + td::uint64 size = size_slice.prefetch_ulong(48); + if (size != f->size_) { + return td::Status::Error("queue size mismatch"); + } + return td::Status::OK(); + }(); + if (S.is_error()) { + abort(std::move(S)); + return; + } + } catch (vm::VmError& err) { + abort(err.as_status()); + return; + } catch (vm::VmVirtError& err) { + abort(err.as_status()); + return; + } + + sizes_[i] = f->size_; + dec_pending(); + } + + void got_ext_msg_queue_size_limit(td::uint32 value) { + ext_msg_queue_size_limit_ = value; + dec_pending(); + } + + void dec_pending() { + if (--pending_ == 0) { + std::vector> shards; + for (size_t i = 0; i < blocks_.size(); ++i) { + shards.push_back( + tonlib_api::make_object(to_tonlib_api(blocks_[i]), sizes_[i])); + } + promise_.set_result( + tonlib_api::make_object(std::move(shards), ext_msg_queue_size_limit_)); + stop(); + } + } + + void abort(td::Status error) { + promise_.set_error(std::move(error)); + stop(); + } + + private: + std::vector blocks_; + td::actor::ActorShared<> parent_; + td::Promise> promise_; + ExtClient client_; + + std::vector sizes_; + td::uint32 ext_msg_queue_size_limit_ = 0; + size_t pending_ = 0; +}; + auto to_lite_api(const tonlib_api::ton_blockIdExt& blk) -> td::Result>; auto to_tonlib_api(const ton::lite_api::liteServer_transactionId& txid) -> tonlib_api_ptr; @@ -6129,19 +6255,34 
@@ td::Status TonlibClient::do_request(const tonlib_api::blocks_getShardBlockProof&

 td::Status TonlibClient::do_request(const tonlib_api::blocks_getOutMsgQueueSizes& request,
                                     td::Promise>&& promise) {
-  client_.send_query(ton::lite_api::liteServer_getOutMsgQueueSizes(request.mode_, request.wc_, request.shard_),
-                     promise.wrap([](lite_api_ptr&& queue_sizes) {
-                       tonlib_api::blocks_outMsgQueueSizes result;
-                       result.ext_msg_queue_size_limit_ = queue_sizes->ext_msg_queue_size_limit_;
-                       for (auto &x : queue_sizes->shards_) {
-                         tonlib_api::blocks_outMsgQueueSize shard;
-                         shard.id_ = to_tonlib_api(*x->id_);
-                         shard.size_ = x->size_;
-                         result.shards_.push_back(tonlib_api::make_object(std::move(shard)));
-                       }
-                       return tonlib_api::make_object(std::move(result));
-                     }));
-
+  auto req_mode = request.mode_;
+  auto req_shard = ton::ShardIdFull{request.wc_, (ton::ShardId)request.shard_};
+  if ((req_mode & 1) && !req_shard.is_valid_ext()) {
+    return td::Status::Error("invalid shard");
+  }
+  client_.with_last_block(
+      [=, self = this, promise = std::move(promise)](td::Result r_last_block) mutable {
+        TRY_RESULT_PROMISE_PREFIX(promise, last_block, std::move(r_last_block), "get last block failed: ");
+        do_request(tonlib_api::blocks_getShards(to_tonlib_api(last_block.last_block_id)),
+                   [=, mc_blkid = last_block.last_block_id,
+                    promise = std::move(promise)](td::Result> R) mutable {
+                     TRY_RESULT_PROMISE_PREFIX(promise, shards, std::move(R), "get shards failed: ");
+                     std::vector blocks;
+                     if (!(req_mode & 1) || ton::shard_intersects(mc_blkid.shard_full(), req_shard)) {
+                       blocks.push_back(mc_blkid);
+                     }
+                     for (const auto& shard : shards->shards_) {
+                       TRY_RESULT_PROMISE(promise, block_id, to_block_id(*shard));
+                       if (!(req_mode & 1) || ton::shard_intersects(block_id.shard_full(), req_shard)) {
+                         blocks.push_back(block_id);
+                       }
+                     }
+                     auto actor_id = self->actor_id_++;
+                     self->actors_[actor_id] = td::actor::create_actor(
+                         "GetOutMsgQueueSizes", self->client_.get_client(), std::move(blocks),
+                         actor_shared(this, actor_id), std::move(promise));
+                   });
+      });
   return td::Status::OK();
 }

diff --git a/tonlib/tonlib/tonlib-cli.cpp b/tonlib/tonlib/tonlib-cli.cpp
index 2c7100f24..364b8f663 100644
--- a/tonlib/tonlib/tonlib-cli.cpp
+++ b/tonlib/tonlib/tonlib-cli.cpp
@@ -430,6 +430,7 @@ class TonlibCli : public td::actor::Actor {
                             << "\t 'k' modifier - use fake key\n"
                             << "\t 'c' modifier - just estimate fees\n";
       td::TerminalIO::out() << "getmasterchainsignatures - get signatures of masterchain block \n";
+      td::TerminalIO::out() << "msgqueuesizes - get out msg queue sizes in the latest shard states\n";
     } else if (cmd == "genkey") {
       generate_key();
     } else if (cmd == "exit" || cmd == "quit") {
@@ -517,6 +518,8 @@ class TonlibCli : public td::actor::Actor {
     } else if (cmd == "getmasterchainsignatures") {
       auto seqno = parser.read_word();
       run_get_masterchain_block_signatures(seqno, std::move(cmd_promise));
+    } else if (cmd == "msgqueuesizes") {
+      run_get_out_msg_queue_sizes(std::move(cmd_promise));
     } else if (cmd == "showtransactions") {
       run_show_transactions(parser, std::move(cmd_promise));
     } else {
@@ -2161,6 +2164,22 @@ class TonlibCli : public td::actor::Actor {
         }));
   }

+  void run_get_out_msg_queue_sizes(td::Promise promise) {
+    send_query(make_object(0, 0, 0),
+               promise.wrap([](tonlib_api::object_ptr&& f) {
+                 td::TerminalIO::out() << "Outbound message queue sizes:" << std::endl;
+                 for (const auto& shard : f->shards_) {
+                   td::TerminalIO::out() << ton::BlockId{shard->id_->workchain_, (ton::ShardId)shard->id_->shard_,
(ton::BlockSeqno)shard->id_->seqno_}
+                                                .to_str()
+                                         << " " << shard->size_ << std::endl;
+                 }
+                 td::TerminalIO::out() << "External message queue size limit: " << f->ext_msg_queue_size_limit_
+                                       << std::endl;
+                 return td::Unit();
+               }));
+  }
+
   void run_show_transactions(td::ConstParser& parser, td::Promise promise) {
     TRY_RESULT_PROMISE(promise, address, to_account_address(parser.read_word(), false));
     TRY_RESULT_PROMISE(promise, lt, td::to_integer_safe(parser.read_word()));

From 923f1cd69b6d31f1e131764f0f9d2a208cfd4d63 Mon Sep 17 00:00:00 2001
From: SpyCheese
Date: Thu, 28 Nov 2024 14:13:49 +0300
Subject: [PATCH 4/5] Improve collator node pings and collation manager stats

---
 tl/generate/scheme/ton_api.tl              |  2 +-
 tl/generate/scheme/ton_api.tlo             | Bin 108888 -> 108972 bytes
 .../validator-engine-console-query.cpp     | 20 +++++++++++++--
 validator/collation-manager.cpp            | 11 +++++---
 validator/collation-manager.hpp            |  2 ++
 validator/collator-node.cpp                | 24 +++++++++++++++++-
 validator/collator-node.hpp                |  4 +++
 validator/manager.cpp                      | 10 ++++++++
 8 files changed, 66 insertions(+), 7 deletions(-)

diff --git a/tl/generate/scheme/ton_api.tl b/tl/generate/scheme/ton_api.tl
index 720f8d496..4bfd57883 100644
--- a/tl/generate/scheme/ton_api.tl
+++ b/tl/generate/scheme/ton_api.tl
@@ -750,7 +750,7 @@ engine.validator.perfTimerStats stats:(vector engine.validator.PerfTimerStatsByN
 engine.validator.shardOutQueueSize size:long = engine.validator.ShardOutQueueSize;

 engine.validator.collationManagerStats.shard shard_id:tonNode.shardId self_collate:Bool select_mode:string active:Bool collators:(vector int256) = engine.validator.collationManagerStats.Shard;
-engine.validator.collationManagerStats.collator adnl_id:int256 active:Bool alive:Bool ping_in:double = engine.validator.collationManagerStats.Collator;
+engine.validator.collationManagerStats.collator adnl_id:int256 active:Bool alive:Bool ping_in:double last_ping_ago:double last_ping_status:string = engine.validator.collationManagerStats.Collator;
 engine.validator.collationManagerStats.localId adnl_id:int256 shards:(vector engine.validator.collationManagerStats.shard) collators:(vector engine.validator.collationManagerStats.collator) = engine.validator.collationManagerStats.LocalId;
 engine.validator.collationManagerStats local_ids:(vector engine.validator.collationManagerStats.localId) = engine.validator.CollationManagerStats;

diff --git a/tl/generate/scheme/ton_api.tlo b/tl/generate/scheme/ton_api.tlo
index 19a8d13138d85dd2bea8b7e28793cbc85e441aa9..d5ebffaa061ab442de15cdea2f087fbaf6254856 100644
GIT binary patch
delta 126
zcmca{iEYhgwhcP+ERLK?;+qZRcPt09CL73W2qx#}UB)%XsFFignJ%4gtwzL33q_`xpq_lW);BJTwleZi{
L1F@>*#FYmC=kqRX

delta 77
zcmZ2;neE0UwhcP+ELtBY&)IAszhgO=HQ7L3Lohi%CnvEazX&9C^O%qi%jCcl8k2pG
WM^B!xhXbOF

last_ping_status_;
+        std::erase_if(status, [](char c) { return c < (char)32; });
+        if (status.size() > 128) {
+          status.resize(128);
+        }
+        sb << td::StringBuilder::FixedDouble(collator->last_ping_ago_, 3) << ": " << status;
+      }
+      td::TerminalIO::out() << sb.as_cslice() << "\n";
     }
   }
 }
diff --git a/validator/collation-manager.cpp b/validator/collation-manager.cpp
index e84470595..879b8d895 100644
--- a/validator/collation-manager.cpp
+++ b/validator/collation-manager.cpp
@@ -261,6 +261,8 @@ void CollationManager::get_stats(
     } else {
       obj->ping_in_ = -1.0;
     }
+    obj->last_ping_ago_ = collator.last_ping_at ? td::Time::now() - collator.last_ping_at.at() : -1.0;
+    obj->last_ping_status_ = collator.last_ping_status.is_ok() ?
"OK" : collator.last_ping_status.message().str(); stats->collators_.push_back(std::move(obj)); } promise.set_value(std::move(stats)); @@ -323,7 +325,7 @@ void CollationManager::alarm() { td::actor::send_closure(SelfId, &CollationManager::got_pong, id, std::move(R)); }; LOG(DEBUG) << "sending ping to " << id; - td::actor::send_closure(rldp_, &rldp::Rldp::send_query, local_id_, id, "collatorping", std::move(P), + td::actor::send_closure(rldp_, &rldp::Rldp::send_query, local_id_, id, "ping", std::move(P), td::Timestamp::in(2.0), std::move(query)); } else { alarm_timestamp().relax(collator.ping_at); @@ -340,7 +342,7 @@ void CollationManager::got_pong(adnl::AdnlNodeIdShort id, td::Result td::Result> { - TRY_RESULT_PREFIX(data, std::move(R), "rldp query error: "); + TRY_RESULT(data, std::move(R)); auto r_error = fetch_tl_object(data, true); if (r_error.is_ok()) { auto error = r_error.move_as_ok(); @@ -348,12 +350,15 @@ void CollationManager::got_pong(adnl::AdnlNodeIdShort id, td::Result(data, true); }(); + collator.last_ping_at = td::Timestamp::now(); if (r_pong.is_error()) { - LOG(DEBUG) << "pong from " << id << " : " << r_pong.move_as_error(); + LOG(DEBUG) << "pong from " << id << " : " << r_pong.error(); collator.alive = false; + collator.last_ping_status = r_pong.move_as_error(); } else { LOG(DEBUG) << "pong from " << id << " : OK"; collator.alive = true; + collator.last_ping_status = td::Status::OK(); } collator.ping_at = td::Timestamp::in(td::Random::fast(10.0, 20.0)); if (collator.active_cnt && !collator.sent_ping) { diff --git a/validator/collation-manager.hpp b/validator/collation-manager.hpp index 9ca69814b..0ca4617d0 100644 --- a/validator/collation-manager.hpp +++ b/validator/collation-manager.hpp @@ -65,6 +65,8 @@ class CollationManager : public td::actor::Actor { td::Timestamp ping_at = td::Timestamp::now(); bool sent_ping = false; size_t active_cnt = 0; + td::Timestamp last_ping_at = td::Timestamp::never(); + td::Status last_ping_status = td::Status::Error("not pinged"); }; std::map collators_; diff --git a/validator/collator-node.cpp b/validator/collator-node.cpp index 1b8eb79e8..968835c22 100644 --- a/validator/collator-node.cpp +++ b/validator/collator-node.cpp @@ -167,6 +167,10 @@ void CollatorNode::new_masterchain_block_notification(td::Ref } } +void CollatorNode::update_shard_client_handle(BlockHandle shard_client_handle) { + shard_client_handle_ = shard_client_handle; +} + void CollatorNode::update_validator_group_info(ShardIdFull shard, std::vector prev, CatchainSeqno cc_seqno) { if (!can_collate_shard(shard)) { @@ -225,7 +229,12 @@ void CollatorNode::update_validator_group_info(ShardIdFull shard, std::vector) {}); + auto S = check_out_of_sync(); + if (S.is_ok()) { + generate_block(shard, cc_seqno, info.prev, {}, td::Timestamp::in(10.0), [](td::Result) {}); + } else { + LOG(DEBUG) << "not generating block automatically: " << S; + } } return; } @@ -535,9 +544,22 @@ void CollatorNode::process_result(std::shared_ptr cache_entry, td::R cache_entry->promises.clear(); } +td::Status CollatorNode::check_out_of_sync() { + if (last_masterchain_state_.is_null() || !shard_client_handle_) { + return td::Status::Error("not inited"); + } + auto now = (UnixTime)td::Clocks::system(); + if (last_masterchain_state_->get_unix_time() < now - 60 || shard_client_handle_->unix_time() < now - 60) { + return td::Status::Error(PSTRING() << "out of sync: mc " << now - last_masterchain_state_->get_unix_time() + << "s ago, shardclient " << now - shard_client_handle_->unix_time() << "s ago"); + } + 
return td::Status::OK(); +} + void CollatorNode::process_ping(adnl::AdnlNodeIdShort src, ton_api::collatorNode_ping& ping, td::Promise promise) { LOG(DEBUG) << "got ping from " << src; + TRY_STATUS_PROMISE(promise, check_out_of_sync()); promise.set_result(create_serialize_tl_object(0)); } diff --git a/validator/collator-node.hpp b/validator/collator-node.hpp index 54876c35f..cca77d26c 100644 --- a/validator/collator-node.hpp +++ b/validator/collator-node.hpp @@ -36,6 +36,7 @@ class CollatorNode : public td::actor::Actor { void del_shard(ShardIdFull shard); void new_masterchain_block_notification(td::Ref state); + void update_shard_client_handle(BlockHandle shard_client_handle); void update_validator_group_info(ShardIdFull shard, std::vector prev, CatchainSeqno cc_seqno); void update_options(td::Ref opts) { @@ -84,6 +85,7 @@ class CollatorNode : public td::actor::Actor { std::map, FutureValidatorGroup> future_validator_groups_; td::Ref last_masterchain_state_; + BlockHandle shard_client_handle_; td::Result get_future_validator_group(ShardIdFull shard, CatchainSeqno cc_seqno); @@ -92,6 +94,8 @@ class CollatorNode : public td::actor::Actor { td::Promise promise); void process_result(std::shared_ptr cache_entry, td::Result R); + td::Status check_out_of_sync(); + public: static tl_object_ptr serialize_candidate(const BlockCandidate& block, bool compress); static td::Result deserialize_candidate(tl_object_ptr f, diff --git a/validator/manager.cpp b/validator/manager.cpp index 903e31554..b8394bdc5 100644 --- a/validator/manager.cpp +++ b/validator/manager.cpp @@ -2754,6 +2754,9 @@ void ValidatorManagerImpl::update_shard_client_block_handle(BlockHandle handle, last_liteserver_state_ = std::move(state); } } + for (auto &c : collator_nodes_) { + td::actor::send_closure(c.second.actor, &CollatorNode::update_shard_client_handle, shard_client_handle_); + } shard_client_update(seqno); promise.set_value(td::Unit()); } @@ -3509,6 +3512,13 @@ void ValidatorManagerImpl::add_collator(adnl::AdnlNodeIdShort id, ShardIdFull sh if (it == collator_nodes_.end()) { it = collator_nodes_.emplace(id, Collator()).first; it->second.actor = td::actor::create_actor("collatornode", id, opts_, actor_id(this), adnl_, rldp_); + if (last_masterchain_state_.not_null()) { + td::actor::send_closure(it->second.actor, &CollatorNode::new_masterchain_block_notification, + last_masterchain_state_); + } + if (shard_client_handle_) { + td::actor::send_closure(it->second.actor, &CollatorNode::update_shard_client_handle, shard_client_handle_); + } } if (!it->second.shards.insert(shard).second) { return; From 0280a288c6c038f2cfcce001cd71e828f76aee58 Mon Sep 17 00:00:00 2001 From: SpyCheese Date: Fri, 29 Nov 2024 10:34:01 +0300 Subject: [PATCH 5/5] Check supported version in collator-node --- validator/collator-node.cpp | 88 +++++++++++++++++++++++++--------- validator/collator-node.hpp | 4 ++ validator/impl/collator-impl.h | 3 ++ 3 files changed, 73 insertions(+), 22 deletions(-) diff --git a/validator/collator-node.cpp b/validator/collator-node.cpp index 968835c22..869ac9ba0 100644 --- a/validator/collator-node.cpp +++ b/validator/collator-node.cpp @@ -21,6 +21,7 @@ #include "block-db.h" #include "td/utils/lz4.h" #include "checksum.h" +#include "impl/collator-impl.h" #include "impl/shard.hpp" #include "validator-session/candidate-serializer.h" @@ -85,6 +86,15 @@ void CollatorNode::del_shard(ShardIdFull shard) { void CollatorNode::new_masterchain_block_notification(td::Ref state) { last_masterchain_state_ = state; + + if 
(state->last_key_block_id().seqno() != last_key_block_seqno_) { + last_key_block_seqno_ = state->last_key_block_id().seqno(); + mc_config_status_ = check_mc_config(); + if (mc_config_status_.is_error()) { + LOG(ERROR) << "Cannot validate masterchain config (possibly outdated software):" << mc_config_status_; + } + } + if (validator_adnl_ids_.empty() || state->is_key_state()) { validator_adnl_ids_.clear(); for (int next : {-1, 0, 1}) { @@ -216,7 +226,7 @@ void CollatorNode::update_validator_group_info(ShardIdFull shard, std::vectorhas_external_query_at && !cache_entry->has_internal_query_at) { LOG(INFO) << "generate block query" @@ -230,11 +240,15 @@ void CollatorNode::update_validator_group_info(ShardIdFull shard, std::vector) {}); - } else { + if (S.is_error()) { LOG(DEBUG) << "not generating block automatically: " << S; + return; + } + if (mc_config_status_.is_error()) { + LOG(DEBUG) << "not generating block automatically: unsupported mc config: " << mc_config_status_; + return; } + generate_block(shard, cc_seqno, info.prev, {}, td::Timestamp::in(10.0), [](td::Result) {}); } return; } @@ -322,7 +336,7 @@ static BlockCandidate change_creator(BlockCandidate block, Ed25519_PublicKey cre for (auto& broadcast_ref : block.out_msg_queue_proof_broadcasts) { auto block_state_proof = create_block_state_proof(root).move_as_ok(); - auto &broadcast = broadcast_ref.write(); + auto& broadcast = broadcast_ref.write(); broadcast.block_id = block.id; broadcast.block_state_proofs = vm::std_boc_serialize(std::move(block_state_proof), 31).move_as_ok(); } @@ -363,11 +377,9 @@ void CollatorNode::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice data for (const auto& b : f->prev_blocks_) { prev_blocks.push_back(create_block_id(b)); } - auto priority = BlockCandidatePriority { - .round = static_cast(f->round_), - .first_block_round = static_cast(f->first_block_round_), - .priority = f->priority_ - }; + auto priority = BlockCandidatePriority{.round = static_cast(f->round_), + .first_block_round = static_cast(f->first_block_round_), + .priority = f->priority_}; Ed25519_PublicKey creator(f->creator_); td::Promise new_promise = [promise = std::move(promise), src, shard](td::Result R) mutable { @@ -447,22 +459,21 @@ void CollatorNode::generate_block(ShardIdFull shard, CatchainSeqno cc_seqno, std return; } - static auto prefix_inner = [] (auto &sb, auto &shard, auto cc_seqno, auto block_seqno, - const std::optional &o_priority) { + static auto prefix_inner = [](auto& sb, auto& shard, auto cc_seqno, auto block_seqno, + const std::optional& o_priority) { sb << "generate block query" << ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno << ", next_block_seqno=" << block_seqno; if (o_priority) { sb << " external{"; - sb << "round_offset=" << o_priority->round - o_priority->first_block_round << ",priority=" << o_priority->priority; + sb << "round_offset=" << o_priority->round - o_priority->first_block_round + << ",priority=" << o_priority->priority; sb << ",first_block_round=" << o_priority->first_block_round; sb << "}"; } else { - sb << " internal" ; + sb << " internal"; } }; - auto prefix = [&] (auto &sb) { - prefix_inner(sb, shard, cc_seqno, block_seqno, o_priority); - }; + auto prefix = [&](auto& sb) { prefix_inner(sb, shard, cc_seqno, block_seqno, o_priority); }; auto cache_entry = validator_group_info.cache[prev_blocks]; if (cache_entry == nullptr) { @@ -473,7 +484,8 @@ void CollatorNode::generate_block(ShardIdFull shard, CatchainSeqno cc_seqno, std if (cache_entry->has_internal_query_at && 
cache_entry->has_external_query_at) { FLOG(INFO) { prefix(sb); - sb << ": got external query " << cache_entry->has_external_query_at.at() - cache_entry->has_internal_query_at.at() + sb << ": got external query " + << cache_entry->has_external_query_at.at() - cache_entry->has_internal_query_at.at() << "s after internal query [WON]"; }; } @@ -483,7 +495,8 @@ void CollatorNode::generate_block(ShardIdFull shard, CatchainSeqno cc_seqno, std if (cache_entry->has_internal_query_at && cache_entry->has_external_query_at) { FLOG(INFO) { prefix(sb); - sb << ": got internal query " << cache_entry->has_internal_query_at.at() - cache_entry->has_external_query_at.at() + sb << ": got internal query " + << cache_entry->has_internal_query_at.at() - cache_entry->has_external_query_at.at() << "s after external query [LOST]"; }; } @@ -503,8 +516,8 @@ void CollatorNode::generate_block(ShardIdFull shard, CatchainSeqno cc_seqno, std if (cache_entry->started) { FLOG(INFO) { - prefix(sb); - sb << ": collation in progress, waiting"; + prefix(sb); + sb << ": collation in progress, waiting"; }; return; } @@ -519,7 +532,7 @@ void CollatorNode::generate_block(ShardIdFull shard, CatchainSeqno cc_seqno, std last_masterchain_state_->get_validator_set(shard), opts_->get_collator_options(), manager_, timeout, [=, SelfId = actor_id(this), timer = td::Timer{}](td::Result R) { FLOG(INFO) { - prefix_inner(sb, shard, cc_seqno, block_seqno,o_priority); + prefix_inner(sb, shard, cc_seqno, block_seqno, o_priority); sb << timer.elapsed() << ": " << (R.is_ok() ? "OK" : R.error().to_string()); }; td::actor::send_closure(SelfId, &CollatorNode::process_result, cache_entry, std::move(R)); @@ -556,10 +569,41 @@ td::Status CollatorNode::check_out_of_sync() { return td::Status::OK(); } +td::Status CollatorNode::check_mc_config() { + if (last_masterchain_state_.is_null()) { + return td::Status::Error("not inited"); + } + TRY_RESULT_PREFIX( + config, + block::ConfigInfo::extract_config(last_masterchain_state_->root_cell(), block::ConfigInfo::needCapabilities), + "cannot unpack masterchain config"); + if (config->get_global_version() > Collator::supported_version()) { + return td::Status::Error(PSTRING() << "unsupported global version " << config->get_global_version() + << " (supported: " << Collator::supported_version() << ")"); + } + if (config->get_capabilities() & ~Collator::supported_capabilities()) { + return td::Status::Error(PSTRING() << "unsupported capabilities " << config->get_capabilities() + << " (supported: " << Collator::supported_capabilities() << ")"); + } + td::Status S = td::Status::OK(); + config->foreach_config_param([&](int idx, td::Ref param) { + if (idx < 0) { + return true; + } + if (!block::gen::ConfigParam{idx}.validate_ref(1024, std::move(param))) { + S = td::Status::Error(PSTRING() << "unknown ConfigParam " << idx); + return false; + } + return true; + }); + return S; +} + void CollatorNode::process_ping(adnl::AdnlNodeIdShort src, ton_api::collatorNode_ping& ping, td::Promise promise) { LOG(DEBUG) << "got ping from " << src; TRY_STATUS_PROMISE(promise, check_out_of_sync()); + TRY_STATUS_PROMISE_PREFIX(promise, mc_config_status_.clone(), "unsupported mc config: "); promise.set_result(create_serialize_tl_object(0)); } diff --git a/validator/collator-node.hpp b/validator/collator-node.hpp index cca77d26c..5aab24635 100644 --- a/validator/collator-node.hpp +++ b/validator/collator-node.hpp @@ -87,6 +87,9 @@ class CollatorNode : public td::actor::Actor { td::Ref last_masterchain_state_; BlockHandle shard_client_handle_; 
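check_mc_config above accepts a masterchain config only when its global version does not exceed the collator's supported one and its capability set introduces no unknown bits. A compact model of that gate, under the assumption that capabilities form a 64-bit flag set (the function name and signature are illustrative, not the real block::ConfigInfo API):

    #include <cstdint>
    #include <optional>
    #include <string>

    // Reject configs that need a newer global version or any capability bit
    // outside the supported mask; nullopt means the config is collatable.
    std::optional<std::string> config_gate(int global_version, uint64_t capabilities,
                                           int supported_version, uint64_t supported_caps) {
      if (global_version > supported_version) {
        return "unsupported global version " + std::to_string(global_version);
      }
      if (uint64_t unknown = capabilities & ~supported_caps) {
        return "unsupported capabilities " + std::to_string(unknown);
      }
      return std::nullopt;
    }

Since mc_config_status_ is recomputed only when last_key_block_seqno_ changes, the verdict is refreshed at most once per key block rather than on every masterchain block.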
+ td::Status mc_config_status_ = td::Status::Error("not inited"); + BlockSeqno last_key_block_seqno_ = (BlockSeqno)-1; + td::Result get_future_validator_group(ShardIdFull shard, CatchainSeqno cc_seqno); void generate_block(ShardIdFull shard, CatchainSeqno cc_seqno, std::vector prev_blocks, @@ -95,6 +98,7 @@ class CollatorNode : public td::actor::Actor { void process_result(std::shared_ptr cache_entry, td::Result R); td::Status check_out_of_sync(); + td::Status check_mc_config(); public: static tl_object_ptr serialize_candidate(const BlockCandidate& block, bool compress); diff --git a/validator/impl/collator-impl.h b/validator/impl/collator-impl.h index 0090e95de..b207242fb 100644 --- a/validator/impl/collator-impl.h +++ b/validator/impl/collator-impl.h @@ -40,6 +40,7 @@ namespace validator { using td::Ref; class Collator final : public td::actor::Actor { + public: static constexpr int supported_version() { return SUPPORTED_VERSION; } @@ -47,6 +48,8 @@ class Collator final : public td::actor::Actor { return ton::capCreateStatsEnabled | ton::capBounceMsgBody | ton::capReportVersion | ton::capShortDequeue | ton::capStoreOutMsgQueueSize | ton::capMsgMetadata | ton::capDeferMessages | ton::capFullCollatedData; } + + private: using LtCellRef = block::LtCellRef; using NewOutMsg = block::NewOutMsg; const ShardIdFull shard_;
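check_mc_config also walks every config parameter and fails on the first one that does not validate against the TLB schema; making supported_version() and supported_capabilities() public in the final collator-impl.h hunk is what lets CollatorNode consult them. The same collect-first-error loop in isolation (the validate callback is a stand-in for block::gen::ConfigParam{idx}.validate_ref, which is not reproduced here; negative indices are skipped exactly as in the patch):

    #include <functional>
    #include <string>
    #include <vector>

    // Walk config params, skipping negative indices, and stop at the first
    // one that fails validation; an empty result means all parameters passed.
    std::string first_config_error(const std::vector<int>& param_ids,
                                   const std::function<bool(int)>& validate) {
      for (int idx : param_ids) {
        if (idx < 0) {
          continue;
        }
        if (!validate(idx)) {
          return "unknown ConfigParam " + std::to_string(idx);
        }
      }
      return "";
    }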