From 5ad18fe7e4b3f5942998d0f3c48da3289925bc62 Mon Sep 17 00:00:00 2001 From: SpyCheese Date: Tue, 21 May 2024 14:21:34 +0300 Subject: [PATCH] Store out msg queue size in state --- crypto/block/block.cpp | 23 ++++++++--- crypto/block/block.h | 8 ++-- ton/ton-types.h | 3 +- validator/impl/collator-impl.h | 9 +++-- validator/impl/collator.cpp | 24 +++++++---- validator/impl/validate-query.cpp | 67 +++++++++++++++++++++++++++++++ validator/impl/validate-query.hpp | 7 +++- 7 files changed, 119 insertions(+), 22 deletions(-) diff --git a/crypto/block/block.cpp b/crypto/block/block.cpp index 427dd2f10..ff51ed978 100644 --- a/crypto/block/block.cpp +++ b/crypto/block/block.cpp @@ -871,6 +871,9 @@ td::Status ShardState::unpack_out_msg_queue_info(Ref out_msg_queue_inf return td::Status::Error(-666, "cannot unpack OutMsgQueueExtre in the state of "s + id_.to_str()); } dispatch_queue_ = std::make_unique(extra.dispatch_queue, 256, tlb::aug_DispatchQueue); + if (extra.out_queue_size.write().fetch_long(1)) { + out_msg_queue_size_ = extra.out_queue_size->prefetch_ulong(48); + } } else { dispatch_queue_ = std::make_unique(256, tlb::aug_DispatchQueue); } @@ -1009,6 +1012,12 @@ td::Status ShardState::merge_with(ShardState& sib) { return td::Status::Error(-666, "cannot merge dispatch queues of the two ancestors"); } sib.dispatch_queue_.reset(); + // 11. merge out_msg_queue_size + if (out_msg_queue_size_ && sib.out_msg_queue_size_) { + out_msg_queue_size_.value() += sib.out_msg_queue_size_.value(); + } else { + out_msg_queue_size_ = {}; + } // Anything else? add here // ... @@ -1024,8 +1033,8 @@ td::Status ShardState::merge_with(ShardState& sib) { return td::Status::OK(); } -td::Result> ShardState::compute_split_out_msg_queue(ton::ShardIdFull subshard, - td::uint32* queue_size) { +td::Result> ShardState::compute_split_out_msg_queue( + ton::ShardIdFull subshard) { auto shard = id_.shard_full(); if (!ton::shard_is_parent(shard, subshard)) { return td::Status::Error(-666, "cannot split subshard "s + subshard.to_str() + " from state of " + id_.to_str() + @@ -1033,7 +1042,7 @@ td::Result> ShardState::compute_split_o } CHECK(out_msg_queue_); auto subqueue = std::make_unique(*out_msg_queue_); - int res = block::filter_out_msg_queue(*subqueue, shard, subshard, queue_size); + int res = block::filter_out_msg_queue(*subqueue, shard, subshard); if (res < 0) { return td::Status::Error(-666, "error splitting OutMsgQueue of "s + id_.to_str()); } @@ -1055,7 +1064,7 @@ td::Result> ShardState::compu return std::move(sub_processed_upto); } -td::Status ShardState::split(ton::ShardIdFull subshard, td::uint32* queue_size) { +td::Status ShardState::split(ton::ShardIdFull subshard) { if (!ton::shard_is_parent(id_.shard_full(), subshard)) { return td::Status::Error(-666, "cannot split subshard "s + subshard.to_str() + " from state of " + id_.to_str() + " because it is not a parent"); @@ -1073,10 +1082,12 @@ td::Status ShardState::split(ton::ShardIdFull subshard, td::uint32* queue_size) auto shard1 = id_.shard_full(); CHECK(ton::shard_is_parent(shard1, subshard)); CHECK(out_msg_queue_); - int res1 = block::filter_out_msg_queue(*out_msg_queue_, shard1, subshard, queue_size); + td::uint64 queue_size; + int res1 = block::filter_out_msg_queue(*out_msg_queue_, shard1, subshard, &queue_size); if (res1 < 0) { return td::Status::Error(-666, "error splitting OutMsgQueue of "s + id_.to_str()); } + out_msg_queue_size_ = queue_size; LOG(DEBUG) << "split counters: " << res1; // 3. processed_upto LOG(DEBUG) << "splitting ProcessedUpto"; @@ -1119,7 +1130,7 @@ td::Status ShardState::split(ton::ShardIdFull subshard, td::uint32* queue_size) } int filter_out_msg_queue(vm::AugmentedDictionary& out_queue, ton::ShardIdFull old_shard, ton::ShardIdFull subshard, - td::uint32* queue_size) { + td::uint64* queue_size) { if (queue_size) { *queue_size = 0; } diff --git a/crypto/block/block.h b/crypto/block/block.h index e5e39bfee..5f3dadff4 100644 --- a/crypto/block/block.h +++ b/crypto/block/block.h @@ -418,6 +418,7 @@ struct ShardState { std::unique_ptr block_create_stats_; std::shared_ptr processed_upto_; std::unique_ptr dispatch_queue_; + td::optional out_msg_queue_size_; bool is_valid() const { return id_.is_valid(); @@ -434,11 +435,10 @@ struct ShardState { ton::BlockSeqno prev_mc_block_seqno, bool after_split, bool clear_history, std::function for_each_mcseqno); td::Status merge_with(ShardState& sib); - td::Result> compute_split_out_msg_queue(ton::ShardIdFull subshard, - td::uint32* queue_size = nullptr); + td::Result> compute_split_out_msg_queue(ton::ShardIdFull subshard); td::Result> compute_split_processed_upto( ton::ShardIdFull subshard); - td::Status split(ton::ShardIdFull subshard, td::uint32* queue_size = nullptr); + td::Status split(ton::ShardIdFull subshard); td::Status unpack_out_msg_queue_info(Ref out_msg_queue_info); bool clear_load_history() { overload_history_ = underload_history_ = 0; @@ -659,7 +659,7 @@ class MtCarloComputeShare { }; int filter_out_msg_queue(vm::AugmentedDictionary& out_queue, ton::ShardIdFull old_shard, ton::ShardIdFull subshard, - td::uint32* queue_size = nullptr); + td::uint64* queue_size = nullptr); std::ostream& operator<<(std::ostream& os, const ShardId& shard_id); diff --git a/ton/ton-types.h b/ton/ton-types.h index 24d542599..97231dac5 100644 --- a/ton/ton-types.h +++ b/ton/ton-types.h @@ -57,7 +57,8 @@ enum GlobalCapabilities { capBounceMsgBody = 4, capReportVersion = 8, capSplitMergeTransactions = 16, - capShortDequeue = 32 + capShortDequeue = 32, + capStoreOutMsgQueueSize = 64 }; inline int shard_pfx_len(ShardId shard) { diff --git a/validator/impl/collator-impl.h b/validator/impl/collator-impl.h index 4c5ae37a4..201f9dc22 100644 --- a/validator/impl/collator-impl.h +++ b/validator/impl/collator-impl.h @@ -44,7 +44,8 @@ class Collator final : public td::actor::Actor { return SUPPORTED_VERSION; } static constexpr long long supported_capabilities() { - return ton::capCreateStatsEnabled | ton::capBounceMsgBody | ton::capReportVersion | ton::capShortDequeue; + return ton::capCreateStatsEnabled | ton::capBounceMsgBody | ton::capReportVersion | ton::capShortDequeue | + ton::capStoreOutMsgQueueSize; } using LtCellRef = block::LtCellRef; using NewOutMsg = block::NewOutMsg; @@ -192,7 +193,8 @@ class Collator final : public td::actor::Actor { std::priority_queue, std::greater> new_msgs; std::pair last_proc_int_msg_, first_unproc_int_msg_; std::unique_ptr in_msg_dict, out_msg_dict, out_msg_queue_, sibling_out_msg_queue_; - td::uint32 out_msg_queue_size_ = 0; + td::uint64 out_msg_queue_size_ = 0; + bool have_out_msg_queue_size_in_state_ = false; std::unique_ptr ihr_pending; std::shared_ptr processed_upto_, sibling_processed_upto_; std::unique_ptr block_create_stats_; @@ -207,7 +209,8 @@ class Collator final : public td::actor::Actor { std::map sender_generated_messages_count_; unsigned dispatch_queue_ops_{0}; std::map last_enqueued_deferred_lt_; - bool msg_metadata_enabled_ = true; // TODO: enable by config + bool msg_metadata_enabled_ = true; // TODO: enable by config + bool store_out_msg_queue_size_ = true; // TODO: enable by config td::PerfWarningTimer perf_timer_; // diff --git a/validator/impl/collator.cpp b/validator/impl/collator.cpp index 1ad197a9c..f46e44a68 100644 --- a/validator/impl/collator.cpp +++ b/validator/impl/collator.cpp @@ -694,6 +694,7 @@ bool Collator::unpack_last_mc_state() { create_stats_enabled_ = config_->create_stats_enabled(); report_version_ = config_->has_capability(ton::capReportVersion); short_dequeue_records_ = config_->has_capability(ton::capShortDequeue); + store_out_msg_queue_size_ = config_->has_capability(ton::capStoreOutMsgQueueSize); shard_conf_ = std::make_unique(*config_); prev_key_block_exists_ = config_->get_last_key_block(prev_key_block_, prev_key_block_lt_); if (prev_key_block_exists_) { @@ -794,15 +795,16 @@ bool Collator::request_neighbor_msg_queues() { } /** - * Requests the size of the outbound message queue from the previous state(s). + * Requests the size of the outbound message queue from the previous state(s) if needed. * * @returns True if the request was successful, false otherwise. */ bool Collator::request_out_msg_queue_size() { - if (after_split_) { - // If block is after split, the size is calculated during split (see Collator::split_last_state) + if (have_out_msg_queue_size_in_state_) { + // if after_split then have_out_msg_queue_size_in_state_ is always true, since the size is calculated during split return true; } + out_msg_queue_size_ = 0; for (size_t i = 0; i < prev_blocks.size(); ++i) { ++pending; send_closure_later(manager, &ValidatorManager::get_out_msg_queue_size, prev_blocks[i], @@ -1016,7 +1018,7 @@ bool Collator::split_last_state(block::ShardState& ss) { return fatal_error(res2.move_as_error()); } sibling_processed_upto_ = res2.move_as_ok(); - auto res3 = ss.split(shard_, &out_msg_queue_size_); + auto res3 = ss.split(shard_); if (res3.is_error()) { return fatal_error(std::move(res3)); } @@ -1054,6 +1056,10 @@ bool Collator::import_shard_state_data(block::ShardState& ss) { ihr_pending = std::move(ss.ihr_pending_); dispatch_queue_ = std::move(ss.dispatch_queue_); block_create_stats_ = std::move(ss.block_create_stats_); + if (ss.out_msg_queue_size_) { + have_out_msg_queue_size_in_state_ = true; + out_msg_queue_size_ = ss.out_msg_queue_size_.value(); + } return true; } @@ -4928,9 +4934,13 @@ bool Collator::compute_out_msg_queue_info(Ref& out_msg_queue_info) { vm::CellBuilder cb; // out_msg_queue_extra#0 dispatch_queue:DispatchQueue out_queue_size:(Maybe uint48) = OutMsgQueueExtra; // ... extra:(Maybe OutMsgQueueExtra) - if (!dispatch_queue_->is_empty()) { - if (!(cb.store_long_bool(1, 1) && cb.store_long_bool(0, 4) && dispatch_queue_->append_dict_to_bool(cb) && - cb.store_long_bool(0, 1))) { + bool ok = false; + if (!dispatch_queue_->is_empty() || store_out_msg_queue_size_) { + if (!(cb.store_long_bool(1, 1) && cb.store_long_bool(0, 4) && dispatch_queue_->append_dict_to_bool(cb))) { + return false; + } + if (!(cb.store_bool_bool(store_out_msg_queue_size_) && + (!store_out_msg_queue_size_ || cb.store_long_bool(out_msg_queue_size_, 48)))) { return false; } } else { diff --git a/validator/impl/validate-query.cpp b/validator/impl/validate-query.cpp index e8de70ef6..2069fa851 100644 --- a/validator/impl/validate-query.cpp +++ b/validator/impl/validate-query.cpp @@ -895,6 +895,7 @@ bool ValidateQuery::try_unpack_mc_state() { if (!is_masterchain() && !check_this_shard_mc_info()) { return fatal_error("masterchain configuration does not admit creating block "s + id_.to_str()); } + store_out_msg_queue_size_ = config_->has_capability(ton::capStoreOutMsgQueueSize); } catch (vm::VmError& err) { return fatal_error(-666, err.get_msg()); } catch (vm::VmVirtError& err) { @@ -2195,6 +2196,50 @@ bool ValidateQuery::check_utime_lt() { return true; } +/** + * Reads the size of the outbound message queue from the previous state(s), or requests it if needed. + * + * @returns True if the request was successful, false otherwise. + */ +bool ValidateQuery::prepare_out_msg_queue_size() { + if (ps_.out_msg_queue_size_) { + // if after_split then out_msg_queue_size is always present, since it is calculated during split + old_out_msg_queue_size_ = ps_.out_msg_queue_size_.value(); + return true; + } + old_out_msg_queue_size_ = 0; + for (size_t i = 0; i < prev_blocks.size(); ++i) { + ++pending; + send_closure_later(manager, &ValidatorManager::get_out_msg_queue_size, prev_blocks[i], + [self = get_self(), i](td::Result res) { + td::actor::send_closure(std::move(self), &ValidateQuery::got_out_queue_size, i, + std::move(res)); + }); + } + return true; +} + +/** + * Handles the result of obtaining the size of the outbound message queue. + * + * If the block is after merge then the two sizes are added. + * + * @param i The index of the previous block (0 or 1). + * @param res The result object containing the size of the queue. + */ +void ValidateQuery::got_out_queue_size(size_t i, td::Result res) { + --pending; + if (res.is_error()) { + fatal_error( + res.move_as_error_prefix(PSTRING() << "failed to get message queue size from prev block #" << i << ": ")); + return; + } + td::uint32 size = res.move_as_ok(); + LOG(DEBUG) << "got outbound queue size from prev block #" << i << ": " << size; + old_out_msg_queue_size_ += size; + try_validate(); +} + /* * * METHODS CALLED FROM try_validate() stage 1 @@ -3041,6 +3086,7 @@ bool ValidateQuery::precheck_one_message_queue_update(td::ConstBitPtr out_msg_id return reject_query("new EnqueuedMsg with key "s + out_msg_id.to_hex(352) + " is invalid"); } if (new_value.not_null()) { + ++new_out_msg_queue_size_; if (!block::gen::t_EnqueuedMsg.validate_csr(new_value)) { return reject_query("new EnqueuedMsg with key "s + out_msg_id.to_hex(352) + " failed to pass automated validity checks"); @@ -3057,6 +3103,7 @@ bool ValidateQuery::precheck_one_message_queue_update(td::ConstBitPtr out_msg_id } } if (old_value.not_null()) { + --new_out_msg_queue_size_; if (!block::gen::t_EnqueuedMsg.validate_csr(old_value)) { return reject_query("old EnqueuedMsg with key "s + out_msg_id.to_hex(352) + " failed to pass automated validity checks"); @@ -3209,6 +3256,7 @@ bool ValidateQuery::precheck_message_queue_update() { try { CHECK(ps_.out_msg_queue_ && ns_.out_msg_queue_); CHECK(out_msg_dict_); + new_out_msg_queue_size_ = old_out_msg_queue_size_; if (!ps_.out_msg_queue_->scan_diff( *ns_.out_msg_queue_, [this](td::ConstBitPtr key, int key_len, Ref old_val_extra, @@ -3223,6 +3271,22 @@ bool ValidateQuery::precheck_message_queue_update() { return reject_query("invalid OutMsgQueue dictionary difference between the old and the new state: "s + err.get_msg()); } + LOG(INFO) << "outbound message queue size: " << old_out_msg_queue_size_ << " -> " << new_out_msg_queue_size_; + if (store_out_msg_queue_size_) { + if (!ns_.out_msg_queue_size_) { + return reject_query(PSTRING() << "outbound message queue size in the new state is not correct (expected: " + << new_out_msg_queue_size_ << ", found: none)"); + } + if (ns_.out_msg_queue_size_.value() != new_out_msg_queue_size_) { + return reject_query(PSTRING() << "outbound message queue size in the new state is not correct (expected: " + << new_out_msg_queue_size_ << ", found: " << ns_.out_msg_queue_size_.value() + << ")"); + } + } else { + if (ns_.out_msg_queue_size_) { + return reject_query("outbound message queue size in the new state is present, but shouldn't"); + } + } return true; } @@ -6621,6 +6685,9 @@ bool ValidateQuery::try_validate() { if (!check_utime_lt()) { return reject_query("creation utime/lt of the new block is invalid"); } + if (!prepare_out_msg_queue_size()) { + return reject_query("cannot request out msg queue size"); + } stage_ = 1; if (pending) { return true; diff --git a/validator/impl/validate-query.hpp b/validator/impl/validate-query.hpp index 197095475..c94be27d9 100644 --- a/validator/impl/validate-query.hpp +++ b/validator/impl/validate-query.hpp @@ -112,7 +112,8 @@ class ValidateQuery : public td::actor::Actor { return SUPPORTED_VERSION; } static constexpr long long supported_capabilities() { - return ton::capCreateStatsEnabled | ton::capBounceMsgBody | ton::capReportVersion | ton::capShortDequeue; + return ton::capCreateStatsEnabled | ton::capBounceMsgBody | ton::capReportVersion | ton::capShortDequeue | + ton::capStoreOutMsgQueueSize; } public: @@ -234,6 +235,8 @@ class ValidateQuery : public td::actor::Actor { std::map, Ref> new_dispatch_queue_messages_; bool msg_metadata_enabled_ = true; std::set account_expected_defer_all_messages_; + td::uint64 old_out_msg_queue_size_ = 0, new_out_msg_queue_size_ = 0; + bool store_out_msg_queue_size_ = true; td::PerfWarningTimer perf_timer_; @@ -314,6 +317,8 @@ class ValidateQuery : public td::actor::Actor { bool check_cur_validator_set(); bool check_mc_validator_info(bool update_mc_cc); bool check_utime_lt(); + bool prepare_out_msg_queue_size(); + void got_out_queue_size(size_t i, td::Result res); bool fix_one_processed_upto(block::MsgProcessedUpto& proc, ton::ShardIdFull owner, bool allow_cur = false); bool fix_processed_upto(block::MsgProcessedUptoCollection& upto, bool allow_cur = false);