Skip to content

Commit

Permalink
Rework limiting imported msg queues
Browse files Browse the repository at this point in the history
  • Loading branch information
SpyCheese committed Jul 20, 2023
1 parent e6b77ef commit 869c6fe
Show file tree
Hide file tree
Showing 9 changed files with 30 additions and 51 deletions.
1 change: 0 additions & 1 deletion crypto/block/block.tlb
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,6 @@ top_block_descr#d5 proof_for:BlockIdExt signatures:(Maybe ^BlockSignatures)
// COLLATED DATA
//
top_block_descr_set#4ac789f3 collection:(HashmapE 96 ^TopBlockDescr) = TopBlockDescrSet;
neighbor_msg_queue_limits#7e549333 neighbors:(HashmapE 96 int32) = NeighborMsgQueueLimits;

//
// VALIDATOR MISBEHAVIOR COMPLAINTS
Expand Down
14 changes: 5 additions & 9 deletions crypto/block/output-queue-merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,6 @@ bool OutputQueueMerger::load() {
unsigned long long lt = heap[0]->lt;
std::size_t orig_size = msg_list.size();
do {
if (src_remaining_msgs_[heap[0]->source] == 0) {
std::pop_heap(heap.begin(), heap.end(), MsgKeyValue::greater);
continue;
}
while (heap[0]->is_fork()) {
auto other = std::make_unique<MsgKeyValue>();
if (!heap[0]->split(*other)) {
Expand All @@ -218,17 +214,17 @@ bool OutputQueueMerger::load() {
heap.pop_back();
} while (!heap.empty() && heap[0]->lt <= lt);
std::sort(msg_list.begin() + orig_size, msg_list.end(), MsgKeyValue::less);
size_t j = orig_size;
for (size_t i = orig_size; i < msg_list.size(); ++i) {
td::int32 &remaining = src_remaining_msgs_[msg_list[i]->source];
if (remaining != 0) {
if (remaining > 0) {
if (remaining != -1) {
if (remaining == 0) {
limit_exceeded = true;
} else {
--remaining;
}
msg_list[j++] = std::move(msg_list[i]);
}
msg_list[i]->limit_exceeded = limit_exceeded;
}
msg_list.resize(j);
return msg_list.size() > orig_size;
}

Expand Down
2 changes: 2 additions & 0 deletions crypto/block/output-queue-merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ struct OutputQueueMerger {
int source;
int key_len{0};
td::BitArray<max_key_len> key;
bool limit_exceeded{false};
MsgKeyValue() = default;
MsgKeyValue(int src, Ref<vm::Cell> node);
MsgKeyValue(td::ConstBitPtr key_pfx, int key_pfx_len, int src, Ref<vm::Cell> node);
Expand Down Expand Up @@ -82,6 +83,7 @@ struct OutputQueueMerger {
std::vector<td::int32> src_remaining_msgs_;
bool eof;
bool failed;
bool limit_exceeded{false};
void add_root(int src, Ref<vm::Cell> outmsg_root, td::int32 msg_limit);
bool load();
};
Expand Down
1 change: 0 additions & 1 deletion validator/impl/collator-impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ class Collator final : public td::actor::Actor {
std::unique_ptr<block::ShardConfig> shard_conf_;
std::map<BlockSeqno, Ref<MasterchainStateQ>> aux_mc_states_;
std::map<ShardIdFull, td::int32> neighbor_msg_queues_limits_;
vm::Dictionary neighbor_msg_queues_limits_dict_{32 + 64};
std::vector<block::McShardDescr> neighbors_;
std::unique_ptr<block::OutputQueueMerger> nb_out_msgs_;
std::vector<ton::StdSmcAddress> special_smcs;
Expand Down
25 changes: 10 additions & 15 deletions validator/impl/collator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -701,11 +701,7 @@ void Collator::got_neighbor_msg_queue(unsigned i, td::Result<Ref<OutMsgQueueProo
auto queue_root = qinfo.out_queue->prefetch_ref(0);
descr.set_queue_root(queue_root);
if (res->msg_count_ != -1) {
LOG(DEBUG) << "Neighbor " << descr.shard().to_str() << " has msg_limit=" << res->msg_count_;
td::BitArray<96> key;
key.bits().store_int(block_id.id.workchain, 32);
(key.bits() + 32).store_uint(block_id.id.shard, 64);
neighbor_msg_queues_limits_dict_.set_builder(key, vm::CellBuilder().store_long(res->msg_count_, 32));
LOG(INFO) << "neighbor " << descr.shard().to_str() << " has msg_limit=" << res->msg_count_;
neighbor_msg_queues_limits_[block_id.shard_full()] = res->msg_count_;
}
// comment the next two lines in the future when the output queues become huge
Expand Down Expand Up @@ -2833,6 +2829,12 @@ bool Collator::process_inbound_internal_messages() {
block_full_ = !block_limit_status_->fits(block::ParamLimits::cl_normal);
auto kv = nb_out_msgs_->extract_cur();
CHECK(kv && kv->msg.not_null());
if (kv->limit_exceeded) {
LOG(INFO) << "limit for imported messages is reached, stop processing inbound internal messages";
block::EnqueuedMsgDescr enq;
enq.unpack(kv->msg.write()); // Visit cells to include it in proof
break;
}
if (!precheck_inbound_message(kv->msg, kv->lt)) {
if (verbosity > 1) {
std::cerr << "invalid inbound message: lt=" << kv->lt << " from=" << kv->source << " key=" << kv->key.to_hex()
Expand Down Expand Up @@ -4089,21 +4091,14 @@ bool Collator::create_collated_data() {
}
collated_roots_.push_back(std::move(cell));
}
// 2. Message count for neighbors' out queues
if (!neighbor_msg_queues_limits_dict_.is_empty()) {
vm::CellBuilder cb;
cb.store_long(block::gen::t_NeighborMsgQueueLimits.cons_tag[0], 32);
cb.store_maybe_ref(neighbor_msg_queues_limits_dict_.get_root_cell());
collated_roots_.push_back(cb.finalize_novm());
}
if (!full_collated_data_) {
return true;
}
// 3. Proofs for hashes of states: previous states + neighbors
// 2. Proofs for hashes of states: previous states + neighbors
for (const auto& p : block_state_proofs_) {
collated_roots_.push_back(p.second);
}
// 4. Previous state proof (only shadchains)
// 3. Previous state proof (only shadchains)
std::map<td::Bits256, Ref<vm::Cell>> proofs;
if (!is_masterchain()) {
if (!prepare_msg_queue_proof()) {
Expand All @@ -4117,7 +4112,7 @@ bool Collator::create_collated_data() {
}
proofs[prev_state_root_->get_hash().bits()] = std::move(state_proof);
}
// 5. Proofs for message queues
// 4. Proofs for message queues
for (vm::MerkleProofBuilder &mpb : neighbor_proof_builders_) {
auto r_proof = mpb.extract_proof();
if (r_proof.is_error()) {
Expand Down
13 changes: 11 additions & 2 deletions validator/impl/out-msg-queue-proof.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ static td::Result<td::int32> process_queue(BlockIdExt block_id, ShardIdFull dst_

td::HashSet<vm::Cell::Hash> visited;
std::function<void(const vm::CellSlice&)> dfs_cs;
auto dfs = [&](Ref<vm::Cell> cell) {
auto dfs = [&](const Ref<vm::Cell>& cell) {
if (cell.is_null() || !visited.insert(cell->get_hash()).second) {
return;
}
Expand All @@ -65,6 +65,8 @@ static td::Result<td::int32> process_queue(BlockIdExt block_id, ShardIdFull dst_
dfs(cs.prefetch_ref(i));
}
};
TRY_STATUS_PREFIX(check_no_prunned(*qinfo.proc_info), "invalid proc_info proof: ")
TRY_STATUS_PREFIX(check_no_prunned(*qinfo.ihr_pending), "invalid ihr_pending proof: ")
dfs_cs(*qinfo.proc_info);
dfs_cs(*qinfo.ihr_pending);

Expand All @@ -76,6 +78,14 @@ static td::Result<td::int32> process_queue(BlockIdExt block_id, ShardIdFull dst_
while (!queue_merger.is_eof()) {
auto kv = queue_merger.extract_cur();
queue_merger.next();
block::EnqueuedMsgDescr enq;
auto msg = kv->msg;
if (!enq.unpack(msg.write())) {
return td::Status::Error("cannot unpack EnqueuedMsgDescr");
}
if (limit_reached) {
break;
}
++msg_count;

// TODO: Get processed_upto from destination shard (in request?)
Expand Down Expand Up @@ -105,7 +115,6 @@ static td::Result<td::int32> process_queue(BlockIdExt block_id, ShardIdFull dst_
TRY_STATUS_PREFIX(check_no_prunned(*kv->msg), "invalid message proof: ")
if (estimated_proof_size > OutMsgQueueProof::QUEUE_SIZE_THRESHOLD) {
limit_reached = true;
break;
}
}
return limit_reached ? msg_count : -1;
Expand Down
22 changes: 1 addition & 21 deletions validator/impl/validate-query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -526,14 +526,6 @@ bool ValidateQuery::extract_collated_data_from(Ref<vm::Cell> croot, int idx) {
top_shard_descr_dict_ = std::make_unique<vm::Dictionary>(cs.prefetch_ref(), 96);
return true;
}
if (block::gen::t_NeighborMsgQueueLimits.has_valid_tag(cs)) {
LOG(DEBUG) << "collated datum # " << idx << " is a NeighborMsgQueueLimits";
if (!block::gen::t_NeighborMsgQueueLimits.validate_upto(10000, cs)) {
return reject_query("invalid NeighborMsgQueueLimits");
}
neighbor_msg_queues_limits_ = vm::Dictionary{cs.prefetch_ref(0), 32 + 64};
return true;
}
LOG(WARNING) << "collated datum # " << idx << " has unknown type (magic " << cs.prefetch_ulong(32) << "), ignoring";
return true;
}
Expand Down Expand Up @@ -4115,19 +4107,7 @@ bool ValidateQuery::check_in_queue() {
td::BitArray<96> key;
key.bits().store_int(descr.workchain(), 32);
(key.bits() + 32).store_uint(descr.shard().shard, 64);
auto r = neighbor_msg_queues_limits_.lookup(key);
td::int32 msg_limit = r.is_null() ? -1 : (td::int32)r->prefetch_long(32);
if (msg_limit < -1) {
return reject_query("invalid value in NeighborMsgQueueLimits");
}
LOG(DEBUG) << "Neighbor " << descr.shard().to_str() << " has msg_limit=" << msg_limit;
neighbor_queues.emplace_back(descr.top_block_id(), descr.outmsg_root, descr.disabled_, msg_limit);
if (msg_limit != -1 && descr.shard().is_masterchain()) {
return reject_query("masterchain out message queue cannot be limited");
}
if (msg_limit != -1 && shard_intersects(descr.shard(), shard_)) {
return reject_query("prev block out message queue cannot be limited");
}
neighbor_queues.emplace_back(descr.top_block_id(), descr.outmsg_root, descr.disabled_);
}
block::OutputQueueMerger nb_out_msgs(shard_, std::move(neighbor_queues));
while (!nb_out_msgs.is_eof()) {
Expand Down
1 change: 0 additions & 1 deletion validator/impl/validate-query.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ class ValidateQuery : public td::actor::Actor {
block::ActionPhaseConfig action_phase_cfg_;
td::RefInt256 masterchain_create_fee_, basechain_create_fee_;

vm::Dictionary neighbor_msg_queues_limits_{32 + 64};
std::vector<block::McShardDescr> neighbors_;
std::map<BlockSeqno, Ref<MasterchainStateQ>> aux_mc_states_;

Expand Down
2 changes: 1 addition & 1 deletion validator/validator-group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ void ValidatorGroup::send_collate_query(td::uint32 round_id, td::Timestamp timeo
promise = td::PromiseCreator::lambda([=, SelfId = actor_id(this), promise = std::move(promise),
timer = td::Timer()](td::Result<BlockCandidate> R) mutable {
if (R.is_ok()) {
LOG(WARNING) << "collate query for " << next_block_id.to_str() << ": success, time=" << timer.elapsed() << "s";
LOG(INFO) << "collate query for " << next_block_id.to_str() << ": success, time=" << timer.elapsed() << "s";
promise.set_result(R.move_as_ok());
return;
}
Expand Down

0 comments on commit 869c6fe

Please sign in to comment.