Skip to content

Commit

Permalink
Bugfix in processing message queues; improve out_msg_queue_cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
SpyCheese committed Jul 6, 2023
1 parent 5dd0c15 commit f10c7f5
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 17 deletions.
2 changes: 1 addition & 1 deletion crypto/block/output-queue-merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ bool OutputQueueMerger::load() {
}
}
msg_list.resize(j);
return true;
return msg_list.size() > orig_size;
}

} // namespace block
1 change: 1 addition & 0 deletions crypto/block/output-queue-merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ struct OutputQueueMerger {
td::Ref<vm::Cell> outmsg_root_;
bool disabled_;
td::int32 msg_limit_; // -1 - unlimited
Neighbor() = default;
Neighbor(ton::BlockIdExt block_id, td::Ref<vm::Cell> outmsg_root, bool disabled = false, td::int32 msg_limit = -1)
: block_id_(block_id), outmsg_root_(std::move(outmsg_root)), disabled_(disabled), msg_limit_(msg_limit) {
}
Expand Down
5 changes: 3 additions & 2 deletions validator/impl/collator-impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ class Collator final : public td::actor::Actor {
std::unique_ptr<block::ConfigInfo> config_;
std::unique_ptr<block::ShardConfig> shard_conf_;
std::map<BlockSeqno, Ref<MasterchainStateQ>> aux_mc_states_;
std::vector<block::OutputQueueMerger::Neighbor> neighbor_queues_;
vm::Dictionary neighbor_msg_queues_limits_{32 + 64};
std::map<ShardIdFull, td::int32> neighbor_msg_queues_limits_;
vm::Dictionary neighbor_msg_queues_limits_dict_{32 + 64};
std::vector<block::McShardDescr> neighbors_;
std::unique_ptr<block::OutputQueueMerger> nb_out_msgs_;
std::vector<ton::StdSmcAddress> special_smcs;
Expand Down Expand Up @@ -241,6 +241,7 @@ class Collator final : public td::actor::Actor {
bool register_shard_block_creators(std::vector<td::Bits256> creator_list);
bool init_block_limits();
bool compute_minted_amount(block::CurrencyCollection& to_mint);
bool create_output_queue_merger();
bool init_value_create();
bool try_collate();
bool do_preinit();
Expand Down
69 changes: 55 additions & 14 deletions validator/impl/collator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -672,9 +672,9 @@ void Collator::got_neighbor_msg_queue(unsigned i, td::Result<Ref<OutMsgQueueProo
fatal_error(outq_descr_res.move_as_error());
return;
}
LOG(DEBUG) << "obtained outbound queue for neighbor #" << i;
Ref<MessageQueue> outq_descr = outq_descr_res.move_as_ok();
block::McShardDescr& descr = neighbors_.at(i);
LOG(DEBUG) << "obtained outbound queue for neighbor #" << i << "(" << descr.shard().to_str() << ")";
if (outq_descr->get_block_id() != descr.blk_) {
LOG(DEBUG) << "outq_descr->id = " << outq_descr->get_block_id().to_str() << " ; descr.id = " << descr.blk_.to_str();
fatal_error(
Expand All @@ -692,12 +692,13 @@ void Collator::got_neighbor_msg_queue(unsigned i, td::Result<Ref<OutMsgQueueProo
}
auto queue_root = qinfo.out_queue->prefetch_ref(0);
descr.set_queue_root(queue_root);
neighbor_queues_.emplace_back(descr.top_block_id(), queue_root, descr.is_disabled(), res->msg_count_);
if (res->msg_count_ != -1) {
LOG(DEBUG) << "Neighbor " << descr.shard().to_str() << " has msg_limit=" << res->msg_count_;
td::BitArray<96> key;
key.bits().store_int(block_id.id.workchain, 32);
(key.bits() + 32).store_uint(block_id.id.shard, 64);
neighbor_msg_queues_limits_.set_builder(key, vm::CellBuilder().store_long(res->msg_count_, 32));
neighbor_msg_queues_limits_dict_.set_builder(key, vm::CellBuilder().store_long(res->msg_count_, 32));
neighbor_msg_queues_limits_[block_id.shard_full()] = res->msg_count_;
}
// comment the next two lines in the future when the output queues become huge
// CHECK(block::gen::t_OutMsgQueueInfo.validate_ref(1000000, outq_descr->root_cell()));
Expand Down Expand Up @@ -1674,6 +1675,17 @@ bool Collator::compute_minted_amount(block::CurrencyCollection& to_mint) {
return true;
}

bool Collator::create_output_queue_merger() {
std::vector<block::OutputQueueMerger::Neighbor> neighbor_queues;
for (const auto& descr : neighbors_) {
auto it = neighbor_msg_queues_limits_.find(descr.shard());
td::int32 msg_limit = it == neighbor_msg_queues_limits_.end() ? -1 : it->second;
neighbor_queues.emplace_back(descr.top_block_id(), descr.outmsg_root, descr.disabled_, msg_limit);
}
nb_out_msgs_ = std::make_unique<block::OutputQueueMerger>(shard_, neighbor_queues);
return true;
}

bool Collator::init_value_create() {
value_flow_.created.set_zero();
value_flow_.minted.set_zero();
Expand Down Expand Up @@ -1731,7 +1743,9 @@ bool Collator::do_collate() {
// 1.3. create OutputQueueMerger from adjusted neighbors
CHECK(!nb_out_msgs_);
LOG(DEBUG) << "creating OutputQueueMerger";
nb_out_msgs_ = std::make_unique<block::OutputQueueMerger>(shard_, neighbor_queues_);
if (!create_output_queue_merger()) {
return fatal_error("cannot compute the value to be created / minted / recovered");
}
// 1.4. compute created / minted / recovered
if (!init_value_create()) {
return fatal_error("cannot compute the value to be created / minted / recovered");
Expand Down Expand Up @@ -1861,18 +1875,34 @@ bool Collator::out_msg_queue_cleanup() {
}
}

auto res = out_msg_queue_->filter([&](vm::CellSlice& cs, td::ConstBitPtr key, int n) -> int {
auto queue_root = out_msg_queue_->get_root_cell();
if (queue_root.is_null()) {
LOG(DEBUG) << "out_msg_queue is empty";
return true;
}
// Unwrap UsageCell: don't build proof for visiting output queue (unless something is deleted)
auto r_cell = queue_root->load_cell();
if (r_cell.is_error()) {
return fatal_error(r_cell.move_as_error());
}
auto pure_out_msg_queue =
std::make_unique<vm::AugmentedDictionary>(r_cell.move_as_ok().data_cell, 352, block::tlb::aug_OutMsgQueue);

int deleted = 0;
bool fail = false;
pure_out_msg_queue->check_for_each([&](Ref<vm::CellSlice> value, td::ConstBitPtr key, int n) -> bool {
assert(n == 352);
vm::CellSlice& cs = value.write();
// LOG(DEBUG) << "key is " << key.to_hex(n);
if (queue_cleanup_timeout_.is_in_past(td::Timestamp::now())) {
LOG(WARNING) << "cleaning up outbound queue takes too long, ending";
outq_cleanup_partial_ = true;
return (1 << 30) + 1; // retain all remaining outbound queue entries including this one without processing
return false; // retain all remaining outbound queue entries including this one without processing
}
if (block_full_) {
LOG(WARNING) << "BLOCK FULL while cleaning up outbound queue, cleanup completed only partially";
outq_cleanup_partial_ = true;
return (1 << 30) + 1; // retain all remaining outbound queue entries including this one without processing
return false; // retain all remaining outbound queue entries including this one without processing
}
block::EnqueuedMsgDescr enq_msg_descr;
unsigned long long created_lt;
Expand All @@ -1881,7 +1911,8 @@ bool Collator::out_msg_queue_cleanup() {
&& enq_msg_descr.check_key(key) // check key
&& enq_msg_descr.lt_ == created_lt)) {
LOG(ERROR) << "cannot unpack EnqueuedMsg with key " << key.to_hex(n);
return -1;
fail = true;
return false;
}
LOG(DEBUG) << "scanning outbound message with (lt,hash)=(" << enq_msg_descr.lt_ << ","
<< enq_msg_descr.hash_.to_hex() << ") enqueued_lt=" << enq_msg_descr.enqueued_lt_;
Expand All @@ -1899,20 +1930,30 @@ bool Collator::out_msg_queue_cleanup() {
if (delivered) {
LOG(DEBUG) << "outbound message with (lt,hash)=(" << enq_msg_descr.lt_ << "," << enq_msg_descr.hash_.to_hex()
<< ") enqueued_lt=" << enq_msg_descr.enqueued_lt_ << " has been already delivered, dequeueing";
// Get value from out_msg_queue_ instead of pure_out_msg_queue (for proof)
auto value2 = out_msg_queue_->lookup_delete_with_extra(key, n);
CHECK(value2.not_null());
vm::CellSlice& cs2 = value2.write();
CHECK(cs2.fetch_ulong_bool(64, created_lt) // augmentation
&& enq_msg_descr.unpack(cs2) // unpack EnqueuedMsg
&& enq_msg_descr.check_key(key) // check key
&& enq_msg_descr.lt_ == created_lt);

if (!dequeue_message(std::move(enq_msg_descr.msg_env_), deliver_lt)) {
fatal_error(PSTRING() << "cannot dequeue outbound message with (lt,hash)=(" << enq_msg_descr.lt_ << ","
<< enq_msg_descr.hash_.to_hex() << ") by inserting a msg_export_deq record");
return -1;
fail = true;
return false;
}
register_out_msg_queue_op();
if (!block_limit_status_->fits(block::ParamLimits::cl_normal)) {
block_full_ = true;
}
}
return !delivered;
return true;
});
LOG(DEBUG) << "deleted " << res << " messages from out_msg_queue";
if (res < 0) {
LOG(DEBUG) << "deleted " << deleted << " messages from out_msg_queue";
if (fail) {
return fatal_error("error scanning/updating OutMsgQueue");
}
auto rt = out_msg_queue_->get_root();
Expand Down Expand Up @@ -4026,10 +4067,10 @@ bool Collator::create_collated_data() {
collated_roots_.push_back(std::move(cell));
}
// 2. Message count for neighbors' out queues
if (!neighbor_msg_queues_limits_.is_empty()) {
if (!neighbor_msg_queues_limits_dict_.is_empty()) {
vm::CellBuilder cb;
cb.store_long(block::gen::t_NeighborMsgQueueLimits.cons_tag[0], 32);
cb.store_maybe_ref(neighbor_msg_queues_limits_.get_root_cell());
cb.store_maybe_ref(neighbor_msg_queues_limits_dict_.get_root_cell());
collated_roots_.push_back(cb.finalize_novm());
}
if (!full_collated_data_) {
Expand Down
8 changes: 8 additions & 0 deletions validator/impl/validate-query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,7 @@ bool ValidateQuery::extract_collated_data_from(Ref<vm::Cell> croot, int idx) {
return reject_query("invalid NeighborMsgQueueLimits");
}
neighbor_msg_queues_limits_ = vm::Dictionary{cs.prefetch_ref(0), 32 + 64};
return true;
}
LOG(WARNING) << "collated datum # " << idx << " has unknown type (magic " << cs.prefetch_ulong(32) << "), ignoring";
return true;
Expand Down Expand Up @@ -4123,7 +4124,14 @@ bool ValidateQuery::check_in_queue() {
if (msg_limit < -1) {
return reject_query("invalid value in NeighborMsgQueueLimits");
}
LOG(DEBUG) << "Neighbor " << descr.shard().to_str() << " has msg_limit=" << msg_limit;
neighbor_queues.emplace_back(descr.top_block_id(), descr.outmsg_root, descr.disabled_, msg_limit);
if (msg_limit != -1 && descr.shard().is_masterchain()) {
return reject_query("masterchain out message queue cannot be limited");
}
if (msg_limit != -1 && shard_intersects(descr.shard(), shard_)) {
return reject_query("prev block out message queue cannot be limited");
}
}
block::OutputQueueMerger nb_out_msgs(shard_, std::move(neighbor_queues));
while (!nb_out_msgs.is_eof()) {
Expand Down

0 comments on commit f10c7f5

Please sign in to comment.