From f10c7f54a82bc8ead6aaae9e6fd01db02493e3c0 Mon Sep 17 00:00:00 2001 From: SpyCheese Date: Thu, 6 Jul 2023 11:44:21 +0300 Subject: [PATCH] Bugfix in processing message queues; improve out_msg_queue_cleanup --- crypto/block/output-queue-merger.cpp | 2 +- crypto/block/output-queue-merger.h | 1 + validator/impl/collator-impl.h | 5 +- validator/impl/collator.cpp | 69 ++++++++++++++++++++++------ validator/impl/validate-query.cpp | 8 ++++ 5 files changed, 68 insertions(+), 17 deletions(-) diff --git a/crypto/block/output-queue-merger.cpp b/crypto/block/output-queue-merger.cpp index ca7021a89..2955b3e01 100644 --- a/crypto/block/output-queue-merger.cpp +++ b/crypto/block/output-queue-merger.cpp @@ -229,7 +229,7 @@ bool OutputQueueMerger::load() { } } msg_list.resize(j); - return true; + return msg_list.size() > orig_size; } } // namespace block diff --git a/crypto/block/output-queue-merger.h b/crypto/block/output-queue-merger.h index f5c15ab4d..3eb4b3799 100644 --- a/crypto/block/output-queue-merger.h +++ b/crypto/block/output-queue-merger.h @@ -60,6 +60,7 @@ struct OutputQueueMerger { td::Ref outmsg_root_; bool disabled_; td::int32 msg_limit_; // -1 - unlimited + Neighbor() = default; Neighbor(ton::BlockIdExt block_id, td::Ref outmsg_root, bool disabled = false, td::int32 msg_limit = -1) : block_id_(block_id), outmsg_root_(std::move(outmsg_root)), disabled_(disabled), msg_limit_(msg_limit) { } diff --git a/validator/impl/collator-impl.h b/validator/impl/collator-impl.h index 80a8e9a7d..598118966 100644 --- a/validator/impl/collator-impl.h +++ b/validator/impl/collator-impl.h @@ -133,8 +133,8 @@ class Collator final : public td::actor::Actor { std::unique_ptr config_; std::unique_ptr shard_conf_; std::map> aux_mc_states_; - std::vector neighbor_queues_; - vm::Dictionary neighbor_msg_queues_limits_{32 + 64}; + std::map neighbor_msg_queues_limits_; + vm::Dictionary neighbor_msg_queues_limits_dict_{32 + 64}; std::vector neighbors_; std::unique_ptr nb_out_msgs_; std::vector special_smcs; @@ -241,6 +241,7 @@ class Collator final : public td::actor::Actor { bool register_shard_block_creators(std::vector creator_list); bool init_block_limits(); bool compute_minted_amount(block::CurrencyCollection& to_mint); + bool create_output_queue_merger(); bool init_value_create(); bool try_collate(); bool do_preinit(); diff --git a/validator/impl/collator.cpp b/validator/impl/collator.cpp index c6ac6f2fb..1ce0a4b4c 100644 --- a/validator/impl/collator.cpp +++ b/validator/impl/collator.cpp @@ -672,9 +672,9 @@ void Collator::got_neighbor_msg_queue(unsigned i, td::Result outq_descr = outq_descr_res.move_as_ok(); block::McShardDescr& descr = neighbors_.at(i); + LOG(DEBUG) << "obtained outbound queue for neighbor #" << i << "(" << descr.shard().to_str() << ")"; if (outq_descr->get_block_id() != descr.blk_) { LOG(DEBUG) << "outq_descr->id = " << outq_descr->get_block_id().to_str() << " ; descr.id = " << descr.blk_.to_str(); fatal_error( @@ -692,12 +692,13 @@ void Collator::got_neighbor_msg_queue(unsigned i, td::Resultprefetch_ref(0); descr.set_queue_root(queue_root); - neighbor_queues_.emplace_back(descr.top_block_id(), queue_root, descr.is_disabled(), res->msg_count_); if (res->msg_count_ != -1) { + LOG(DEBUG) << "Neighbor " << descr.shard().to_str() << " has msg_limit=" << res->msg_count_; td::BitArray<96> key; key.bits().store_int(block_id.id.workchain, 32); (key.bits() + 32).store_uint(block_id.id.shard, 64); - neighbor_msg_queues_limits_.set_builder(key, vm::CellBuilder().store_long(res->msg_count_, 32)); + neighbor_msg_queues_limits_dict_.set_builder(key, vm::CellBuilder().store_long(res->msg_count_, 32)); + neighbor_msg_queues_limits_[block_id.shard_full()] = res->msg_count_; } // comment the next two lines in the future when the output queues become huge // CHECK(block::gen::t_OutMsgQueueInfo.validate_ref(1000000, outq_descr->root_cell())); @@ -1674,6 +1675,17 @@ bool Collator::compute_minted_amount(block::CurrencyCollection& to_mint) { return true; } +bool Collator::create_output_queue_merger() { + std::vector neighbor_queues; + for (const auto& descr : neighbors_) { + auto it = neighbor_msg_queues_limits_.find(descr.shard()); + td::int32 msg_limit = it == neighbor_msg_queues_limits_.end() ? -1 : it->second; + neighbor_queues.emplace_back(descr.top_block_id(), descr.outmsg_root, descr.disabled_, msg_limit); + } + nb_out_msgs_ = std::make_unique(shard_, neighbor_queues); + return true; +} + bool Collator::init_value_create() { value_flow_.created.set_zero(); value_flow_.minted.set_zero(); @@ -1731,7 +1743,9 @@ bool Collator::do_collate() { // 1.3. create OutputQueueMerger from adjusted neighbors CHECK(!nb_out_msgs_); LOG(DEBUG) << "creating OutputQueueMerger"; - nb_out_msgs_ = std::make_unique(shard_, neighbor_queues_); + if (!create_output_queue_merger()) { + return fatal_error("cannot compute the value to be created / minted / recovered"); + } // 1.4. compute created / minted / recovered if (!init_value_create()) { return fatal_error("cannot compute the value to be created / minted / recovered"); @@ -1861,18 +1875,34 @@ bool Collator::out_msg_queue_cleanup() { } } - auto res = out_msg_queue_->filter([&](vm::CellSlice& cs, td::ConstBitPtr key, int n) -> int { + auto queue_root = out_msg_queue_->get_root_cell(); + if (queue_root.is_null()) { + LOG(DEBUG) << "out_msg_queue is empty"; + return true; + } + // Unwrap UsageCell: don't build proof for visiting output queue (unless something is deleted) + auto r_cell = queue_root->load_cell(); + if (r_cell.is_error()) { + return fatal_error(r_cell.move_as_error()); + } + auto pure_out_msg_queue = + std::make_unique(r_cell.move_as_ok().data_cell, 352, block::tlb::aug_OutMsgQueue); + + int deleted = 0; + bool fail = false; + pure_out_msg_queue->check_for_each([&](Ref value, td::ConstBitPtr key, int n) -> bool { assert(n == 352); + vm::CellSlice& cs = value.write(); // LOG(DEBUG) << "key is " << key.to_hex(n); if (queue_cleanup_timeout_.is_in_past(td::Timestamp::now())) { LOG(WARNING) << "cleaning up outbound queue takes too long, ending"; outq_cleanup_partial_ = true; - return (1 << 30) + 1; // retain all remaining outbound queue entries including this one without processing + return false; // retain all remaining outbound queue entries including this one without processing } if (block_full_) { LOG(WARNING) << "BLOCK FULL while cleaning up outbound queue, cleanup completed only partially"; outq_cleanup_partial_ = true; - return (1 << 30) + 1; // retain all remaining outbound queue entries including this one without processing + return false; // retain all remaining outbound queue entries including this one without processing } block::EnqueuedMsgDescr enq_msg_descr; unsigned long long created_lt; @@ -1881,7 +1911,8 @@ bool Collator::out_msg_queue_cleanup() { && enq_msg_descr.check_key(key) // check key && enq_msg_descr.lt_ == created_lt)) { LOG(ERROR) << "cannot unpack EnqueuedMsg with key " << key.to_hex(n); - return -1; + fail = true; + return false; } LOG(DEBUG) << "scanning outbound message with (lt,hash)=(" << enq_msg_descr.lt_ << "," << enq_msg_descr.hash_.to_hex() << ") enqueued_lt=" << enq_msg_descr.enqueued_lt_; @@ -1899,20 +1930,30 @@ bool Collator::out_msg_queue_cleanup() { if (delivered) { LOG(DEBUG) << "outbound message with (lt,hash)=(" << enq_msg_descr.lt_ << "," << enq_msg_descr.hash_.to_hex() << ") enqueued_lt=" << enq_msg_descr.enqueued_lt_ << " has been already delivered, dequeueing"; + // Get value from out_msg_queue_ instead of pure_out_msg_queue (for proof) + auto value2 = out_msg_queue_->lookup_delete_with_extra(key, n); + CHECK(value2.not_null()); + vm::CellSlice& cs2 = value2.write(); + CHECK(cs2.fetch_ulong_bool(64, created_lt) // augmentation + && enq_msg_descr.unpack(cs2) // unpack EnqueuedMsg + && enq_msg_descr.check_key(key) // check key + && enq_msg_descr.lt_ == created_lt); + if (!dequeue_message(std::move(enq_msg_descr.msg_env_), deliver_lt)) { fatal_error(PSTRING() << "cannot dequeue outbound message with (lt,hash)=(" << enq_msg_descr.lt_ << "," << enq_msg_descr.hash_.to_hex() << ") by inserting a msg_export_deq record"); - return -1; + fail = true; + return false; } register_out_msg_queue_op(); if (!block_limit_status_->fits(block::ParamLimits::cl_normal)) { block_full_ = true; } } - return !delivered; + return true; }); - LOG(DEBUG) << "deleted " << res << " messages from out_msg_queue"; - if (res < 0) { + LOG(DEBUG) << "deleted " << deleted << " messages from out_msg_queue"; + if (fail) { return fatal_error("error scanning/updating OutMsgQueue"); } auto rt = out_msg_queue_->get_root(); @@ -4026,10 +4067,10 @@ bool Collator::create_collated_data() { collated_roots_.push_back(std::move(cell)); } // 2. Message count for neighbors' out queues - if (!neighbor_msg_queues_limits_.is_empty()) { + if (!neighbor_msg_queues_limits_dict_.is_empty()) { vm::CellBuilder cb; cb.store_long(block::gen::t_NeighborMsgQueueLimits.cons_tag[0], 32); - cb.store_maybe_ref(neighbor_msg_queues_limits_.get_root_cell()); + cb.store_maybe_ref(neighbor_msg_queues_limits_dict_.get_root_cell()); collated_roots_.push_back(cb.finalize_novm()); } if (!full_collated_data_) { diff --git a/validator/impl/validate-query.cpp b/validator/impl/validate-query.cpp index 3329e4fd5..f9e5be275 100644 --- a/validator/impl/validate-query.cpp +++ b/validator/impl/validate-query.cpp @@ -528,6 +528,7 @@ bool ValidateQuery::extract_collated_data_from(Ref croot, int idx) { return reject_query("invalid NeighborMsgQueueLimits"); } neighbor_msg_queues_limits_ = vm::Dictionary{cs.prefetch_ref(0), 32 + 64}; + return true; } LOG(WARNING) << "collated datum # " << idx << " has unknown type (magic " << cs.prefetch_ulong(32) << "), ignoring"; return true; @@ -4123,7 +4124,14 @@ bool ValidateQuery::check_in_queue() { if (msg_limit < -1) { return reject_query("invalid value in NeighborMsgQueueLimits"); } + LOG(DEBUG) << "Neighbor " << descr.shard().to_str() << " has msg_limit=" << msg_limit; neighbor_queues.emplace_back(descr.top_block_id(), descr.outmsg_root, descr.disabled_, msg_limit); + if (msg_limit != -1 && descr.shard().is_masterchain()) { + return reject_query("masterchain out message queue cannot be limited"); + } + if (msg_limit != -1 && shard_intersects(descr.shard(), shard_)) { + return reject_query("prev block out message queue cannot be limited"); + } } block::OutputQueueMerger nb_out_msgs(shard_, std::move(neighbor_queues)); while (!nb_out_msgs.is_eof()) {