Skip to content

Commit

Permalink
Move msg queue limits to config
Browse files Browse the repository at this point in the history
  • Loading branch information
SpyCheese committed Jul 24, 2023
1 parent 66b98b6 commit f1e62d0
Show file tree
Hide file tree
Showing 21 changed files with 109 additions and 48 deletions.
20 changes: 16 additions & 4 deletions crypto/block/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,12 @@ bool EnqueuedMsgDescr::check_key(td::ConstBitPtr key) const {
hash_ == key + 96;
}

bool ImportedMsgQueueLimits::deserialize(vm::CellSlice& cs) {
return cs.fetch_ulong(8) == 0xd3 // imported_msg_queue_limits#d3
&& cs.fetch_uint_to(32, max_bytes) // max_bytes:#
&& cs.fetch_uint_to(32, max_msgs); // max_msgs:#
}

bool ParamLimits::deserialize(vm::CellSlice& cs) {
return cs.fetch_ulong(8) == 0xc3 // param_limits#c3
&& cs.fetch_uint_to(32, limits_[0]) // underload:uint32
Expand All @@ -666,10 +672,16 @@ bool ParamLimits::deserialize(vm::CellSlice& cs) {
}

bool BlockLimits::deserialize(vm::CellSlice& cs) {
return cs.fetch_ulong(8) == 0x5d // block_limits#5d
&& bytes.deserialize(cs) // bytes:ParamLimits
&& gas.deserialize(cs) // gas:ParamLimits
&& lt_delta.deserialize(cs); // lt_delta:ParamLimits
auto tag = cs.fetch_ulong(8);
if (tag != 0x5d && tag != 0x5e) {
return false;
}
// block_limits#5d
// block_limits_v2#5e
return bytes.deserialize(cs) // bytes:ParamLimits
&& gas.deserialize(cs) // gas:ParamLimits
&& lt_delta.deserialize(cs) // lt_delta:ParamLimits
&& (tag == 0x5d || imported_msg_queue.deserialize(cs)); // imported_msg_queue:ImportedMsgQueueLimits
}

int ParamLimits::classify(td::uint64 value) const {
Expand Down
8 changes: 8 additions & 0 deletions crypto/block/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,13 @@ static inline std::ostream& operator<<(std::ostream& os, const MsgProcessedUptoC
return proc_coll.print(os);
}

struct ImportedMsgQueueLimits {
// Default values
td::uint32 max_bytes = 1 << 18;
td::uint32 max_msgs = 40;
bool deserialize(vm::CellSlice& cs);
};

struct ParamLimits {
enum { limits_cnt = 4 };
enum { cl_underload = 0, cl_normal = 1, cl_soft = 2, cl_medium = 3, cl_hard = 4 };
Expand Down Expand Up @@ -247,6 +254,7 @@ struct ParamLimits {
struct BlockLimits {
ParamLimits bytes, gas, lt_delta;
ton::LogicalTime start_lt{0};
ImportedMsgQueueLimits imported_msg_queue;
const vm::CellUsageTree* usage_tree{nullptr};
bool deserialize(vm::CellSlice& cs);
int classify_size(td::uint64 size) const;
Expand Down
6 changes: 5 additions & 1 deletion crypto/block/block.tlb
Original file line number Diff line number Diff line change
Expand Up @@ -704,9 +704,13 @@ config_gas_prices#_ GasLimitsPrices = ConfigParam 21;

param_limits#c3 underload:# soft_limit:# { underload <= soft_limit }
hard_limit:# { soft_limit <= hard_limit } = ParamLimits;
imported_msg_queue_limits#d3 max_bytes:# max_msgs:# = ImportedMsgQueueLimits;
block_limits#5d bytes:ParamLimits gas:ParamLimits lt_delta:ParamLimits
= BlockLimits;

block_limits_v2#5e bytes:ParamLimits gas:ParamLimits lt_delta:ParamLimits
imported_msg_queue:ImportedMsgQueueLimits
= BlockLimits;

config_mc_block_limits#_ BlockLimits = ConfigParam 22;
config_block_limits#_ BlockLimits = ConfigParam 23;

Expand Down
4 changes: 3 additions & 1 deletion tl/generate/scheme/ton_api.tl
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ tonNode.success = tonNode.Success;
tonNode.archiveNotFound = tonNode.ArchiveInfo;
tonNode.archiveInfo id:long = tonNode.ArchiveInfo;

tonNode.importedMsgQueueLimits max_bytes:int max_msgs:int = ImportedMsgQueueLimits;
tonNode.outMsgQueueProof queue_proof:bytes block_state_proof:bytes msg_count:int = tonNode.OutMsgQueueProof;
tonNode.outMsgQueueProofEmpty = tonNode.OutMsgQueueProof;

Expand Down Expand Up @@ -451,7 +452,8 @@ tonNode.downloadBlockProofLinks blocks:(vector tonNode.blockIdExt) = tonNode.Dat
tonNode.downloadKeyBlockProofLinks blocks:(vector tonNode.blockIdExt) = tonNode.DataList;
tonNode.getArchiveInfo masterchain_seqno:int = tonNode.ArchiveInfo;
tonNode.getArchiveSlice archive_id:long offset:long max_size:int = tonNode.Data;
tonNode.getOutMsgQueueProof block_id:tonNode.blockIdExt dst_workchain:int dst_shard:long = tonNode.OutMsgQueueProof;
tonNode.getOutMsgQueueProof block_id:tonNode.blockIdExt dst_workchain:int dst_shard:long
limits:tonNode.importedMsgQueueLimits = tonNode.OutMsgQueueProof;

tonNode.getCapabilities = tonNode.Capabilities;
tonNode.getCapabilitiesV2 = tonNode.Capabilities;
Expand Down
Binary file modified tl/generate/scheme/ton_api.tlo
Binary file not shown.
17 changes: 12 additions & 5 deletions validator/full-node-shard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,7 @@ void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNod
td::Promise<td::BufferSlice> promise) {
BlockIdExt block_id = create_block_id(query.block_id_);
ShardIdFull dst_shard(query.dst_workchain_, query.dst_shard_);
block::ImportedMsgQueueLimits limits{(td::uint32)query.limits_->max_bytes_, (td::uint32)query.limits_->max_msgs_};
if (!block_id.is_valid_ext()) {
promise.set_error(td::Status::Error("invalid block_id"));
return;
Expand All @@ -674,6 +675,10 @@ void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNod
promise.set_error(td::Status::Error("invalid shard"));
return;
}
if (limits.max_bytes > (1 << 21)) {
promise.set_error(td::Status::Error("max_bytes is too big"));
return;
}
auto P = td::PromiseCreator::lambda(
[promise = std::move(promise)](td::Result<tl_object_ptr<ton_api::tonNode_outMsgQueueProof>> R) mutable {
if (R.is_error()) {
Expand All @@ -684,7 +689,7 @@ void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNod
});
VLOG(FULL_NODE_DEBUG) << "Got query getOutMsgQueueProof " << block_id.to_str() << " " << dst_shard.to_str()
<< " from " << src;
td::actor::create_actor<BuildOutMsgQueueProof>("buildqueueproof", block_id, dst_shard, validator_manager_,
td::actor::create_actor<BuildOutMsgQueueProof>("buildqueueproof", block_id, dst_shard, limits, validator_manager_,
std::move(P))
.release();
}
Expand Down Expand Up @@ -924,7 +929,8 @@ void FullNodeShardImpl::download_archive(BlockSeqno masterchain_seqno, std::stri
.release();
}

void FullNodeShardImpl::download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, td::Timestamp timeout,
void FullNodeShardImpl::download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<td::Ref<OutMsgQueueProof>> promise) {
// TODO: maybe more complex download (like other requests here)
auto &b = choose_neighbour(true);
Expand All @@ -944,13 +950,14 @@ void FullNodeShardImpl::download_out_msg_queue_proof(BlockIdExt block_id, ShardI
promise.set_error(td::Status::Error("node doesn't have this block"));
},
[&](ton_api::tonNode_outMsgQueueProof &x) {
promise.set_result(OutMsgQueueProof::fetch(block_id, dst_shard, x));
promise.set_result(OutMsgQueueProof::fetch(block_id, dst_shard, limits, x));
}));
});
td::BufferSlice query = create_serialize_tl_object<ton_api::tonNode_getOutMsgQueueProof>(
create_tl_block_id(block_id), dst_shard.workchain, dst_shard.shard);
create_tl_block_id(block_id), dst_shard.workchain, dst_shard.shard,
create_tl_object<ton_api::tonNode_importedMsgQueueLimits>(limits.max_bytes, limits.max_msgs));
td::actor::send_closure(overlays_, &overlay::Overlays::send_query_via, b.adnl_id, adnl_id_, overlay_id_,
"get_msg_queue", std::move(P), timeout, std::move(query), 1 << 20, rldp_);
"get_msg_queue", std::move(P), timeout, std::move(query), 1 << 22, rldp_);
}

void FullNodeShardImpl::set_handle(BlockHandle handle, td::Promise<td::Unit> promise) {
Expand Down
3 changes: 2 additions & 1 deletion validator/full-node-shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ class FullNodeShard : public td::actor::Actor {
td::Promise<std::vector<BlockIdExt>> promise) = 0;
virtual void download_archive(BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout,
td::Promise<std::string> promise) = 0;
virtual void download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, td::Timestamp timeout,
virtual void download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<td::Ref<OutMsgQueueProof>> promise) = 0;

virtual void set_handle(BlockHandle handle, td::Promise<td::Unit> promise) = 0;
Expand Down
4 changes: 2 additions & 2 deletions validator/full-node-shard.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,8 @@ class FullNodeShardImpl : public FullNodeShard {
td::Promise<std::vector<BlockIdExt>> promise) override;
void download_archive(BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout,
td::Promise<std::string> promise) override;
void download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, td::Timestamp timeout,
td::Promise<td::Ref<OutMsgQueueProof>> promise) override;
void download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, block::ImportedMsgQueueLimits limits,
td::Timestamp timeout, td::Promise<td::Ref<OutMsgQueueProof>> promise) override;

void set_handle(BlockHandle handle, td::Promise<td::Unit> promise) override;

Expand Down
11 changes: 6 additions & 5 deletions validator/full-node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,15 +378,16 @@ void FullNodeImpl::download_archive(BlockSeqno masterchain_seqno, std::string tm
std::move(promise));
}

void FullNodeImpl::download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, td::Timestamp timeout,
void FullNodeImpl::download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<td::Ref<OutMsgQueueProof>> promise) {
auto shard = get_shard(block_id.shard_full());
if (shard.empty()) {
VLOG(FULL_NODE_WARNING) << "dropping download msg queue query to unknown shard";
promise.set_error(td::Status::Error(ErrorCode::notready, "shard not ready"));
return;
}
td::actor::send_closure(shard, &FullNodeShard::download_out_msg_queue_proof, block_id, dst_shard, timeout,
td::actor::send_closure(shard, &FullNodeShard::download_out_msg_queue_proof, block_id, dst_shard, limits, timeout,
std::move(promise));
}

Expand Down Expand Up @@ -587,9 +588,9 @@ void FullNodeImpl::start_up() {
td::actor::send_closure(id_, &FullNodeImpl::download_archive, masterchain_seqno, std::move(tmp_dir), timeout,
std::move(promise));
}
void download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, td::Timestamp timeout,
td::Promise<td::Ref<OutMsgQueueProof>> promise) override {
td::actor::send_closure(id_, &FullNodeImpl::download_out_msg_queue_proof, block_id, dst_shard, timeout,
void download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, block::ImportedMsgQueueLimits limits,
td::Timestamp timeout, td::Promise<td::Ref<OutMsgQueueProof>> promise) override {
td::actor::send_closure(id_, &FullNodeImpl::download_out_msg_queue_proof, block_id, dst_shard, limits, timeout,
std::move(promise));
}

Expand Down
4 changes: 2 additions & 2 deletions validator/full-node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ class FullNodeImpl : public FullNode {
void get_next_key_blocks(BlockIdExt block_id, td::Timestamp timeout, td::Promise<std::vector<BlockIdExt>> promise);
void download_archive(BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout,
td::Promise<std::string> promise);
void download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, td::Timestamp timeout,
td::Promise<td::Ref<OutMsgQueueProof>> promise);
void download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, block::ImportedMsgQueueLimits limits,
td::Timestamp timeout, td::Promise<td::Ref<OutMsgQueueProof>> promise);

void got_key_block_proof(td::Ref<ProofLink> proof);
void got_zero_block_state(td::Ref<ShardState> state);
Expand Down
19 changes: 10 additions & 9 deletions validator/impl/out-msg-queue-proof.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ static td::Status check_no_prunned(const vm::CellSlice& cs) {
}

static td::Result<td::int32> process_queue(BlockIdExt block_id, ShardIdFull dst_shard,
block::ImportedMsgQueueLimits limits,
const block::gen::OutMsgQueueInfo::Record& qinfo) {
td::uint64 estimated_proof_size = 0;

Expand Down Expand Up @@ -113,17 +114,16 @@ static td::Result<td::int32> process_queue(BlockIdExt block_id, ShardIdFull dst_

dfs_cs(*kv->msg);
TRY_STATUS_PREFIX(check_no_prunned(*kv->msg), "invalid message proof: ")
if (estimated_proof_size > OutMsgQueueProof::QUEUE_SIZE_THRESHOLD) {
if (estimated_proof_size >= limits.max_bytes || msg_count >= (long long)limits.max_msgs) {
limit_reached = true;
}
}
return limit_reached ? msg_count : -1;
}

td::Result<tl_object_ptr<ton_api::tonNode_outMsgQueueProof>> OutMsgQueueProof::build(BlockIdExt block_id,
ShardIdFull dst_shard,
Ref<vm::Cell> state_root,
Ref<vm::Cell> block_root) {
td::Result<tl_object_ptr<ton_api::tonNode_outMsgQueueProof>> OutMsgQueueProof::build(
BlockIdExt block_id, ShardIdFull dst_shard,
block::ImportedMsgQueueLimits limits, Ref<vm::Cell> state_root, Ref<vm::Cell> block_root) {
if (!dst_shard.is_valid_ext()) {
return td::Status::Error("invalid shard");
}
Expand All @@ -135,7 +135,7 @@ td::Result<tl_object_ptr<ton_api::tonNode_outMsgQueueProof>> OutMsgQueueProof::b
if (!tlb::unpack_cell(outq_descr->root_cell(), qinfo)) {
return td::Status::Error("invalid message queue");
}
TRY_RESULT(cnt, process_queue(block_id, dst_shard, qinfo));
TRY_RESULT(cnt, process_queue(block_id, dst_shard, limits, qinfo));

TRY_RESULT(queue_proof, mpb.extract_proof_boc());
td::BufferSlice block_state_proof;
Expand All @@ -148,6 +148,7 @@ td::Result<tl_object_ptr<ton_api::tonNode_outMsgQueueProof>> OutMsgQueueProof::b
}

td::Result<td::Ref<OutMsgQueueProof>> OutMsgQueueProof::fetch(BlockIdExt block_id, ShardIdFull dst_shard,
block::ImportedMsgQueueLimits limits,
const ton_api::tonNode_outMsgQueueProof& f) {
try {
Ref<vm::Cell> block_state_proof;
Expand Down Expand Up @@ -181,7 +182,7 @@ td::Result<td::Ref<OutMsgQueueProof>> OutMsgQueueProof::fetch(BlockIdExt block_i
}
TRY_STATUS_PREFIX(check_no_prunned(qinfo.proc_info->prefetch_ref(0)), "invalid proc_info: ")
TRY_STATUS_PREFIX(check_no_prunned(qinfo.ihr_pending->prefetch_ref(0)), "invalid ihr_pending: ")
TRY_RESULT(cnt, process_queue(block_id, dst_shard, qinfo));
TRY_RESULT(cnt, process_queue(block_id, dst_shard, limits, qinfo));
if (cnt != f.msg_count_) {
return td::Status::Error(PSTRING() << "invalid msg_count: expected=" << f.msg_count_ << ", found=" << cnt);
}
Expand Down Expand Up @@ -295,7 +296,7 @@ void WaitOutMsgQueueProof::run_net() {
});

td::actor::send_closure(manager_, &ValidatorManager::send_get_out_msg_queue_proof_request, block_id_, dst_shard_,
priority_, std::move(P));
limits_, priority_, std::move(P));
}

void BuildOutMsgQueueProof::abort_query(td::Status reason) {
Expand Down Expand Up @@ -349,7 +350,7 @@ void BuildOutMsgQueueProof::got_block_root(Ref<vm::Cell> root) {
}

void BuildOutMsgQueueProof::build_proof() {
auto result = OutMsgQueueProof::build(block_id_, dst_shard_, std::move(state_root_), std::move(block_root_));
auto result = OutMsgQueueProof::build(block_id_, dst_shard_, limits_, std::move(state_root_), std::move(block_root_));
if (result.is_error()) {
LOG(ERROR) << "Failed to build msg queue proof: " << result.error();
}
Expand Down
15 changes: 11 additions & 4 deletions validator/impl/out-msg-queue-proof.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ class ValidatorManagerInterface;

class WaitOutMsgQueueProof : public td::actor::Actor {
public:
WaitOutMsgQueueProof(BlockIdExt block_id, ShardIdFull dst_shard, bool local, td::uint32 priority,
td::actor::ActorId<ValidatorManager> manager, td::Timestamp timeout,
WaitOutMsgQueueProof(BlockIdExt block_id, ShardIdFull dst_shard, block::ImportedMsgQueueLimits limits, bool local,
td::uint32 priority, td::actor::ActorId<ValidatorManager> manager, td::Timestamp timeout,
td::Promise<Ref<OutMsgQueueProof>> promise)
: block_id_(std::move(block_id))
, dst_shard_(dst_shard)
, limits_(limits)
, local_(local)
, priority_(priority)
, manager_(manager)
Expand Down Expand Up @@ -65,6 +66,7 @@ class WaitOutMsgQueueProof : public td::actor::Actor {
private:
BlockIdExt block_id_;
ShardIdFull dst_shard_;
block::ImportedMsgQueueLimits limits_;
bool local_;
td::uint32 priority_;

Expand All @@ -78,10 +80,14 @@ class WaitOutMsgQueueProof : public td::actor::Actor {

class BuildOutMsgQueueProof : public td::actor::Actor {
public:
BuildOutMsgQueueProof(BlockIdExt block_id, ShardIdFull dst_shard,
BuildOutMsgQueueProof(BlockIdExt block_id, ShardIdFull dst_shard, block::ImportedMsgQueueLimits limits,
td::actor::ActorId<ValidatorManagerInterface> manager,
td::Promise<tl_object_ptr<ton_api::tonNode_outMsgQueueProof>> promise)
: block_id_(std::move(block_id)), dst_shard_(dst_shard), manager_(manager), promise_(std::move(promise)) {
: block_id_(std::move(block_id))
, dst_shard_(dst_shard)
, limits_(limits)
, manager_(manager)
, promise_(std::move(promise)) {
}

void abort_query(td::Status reason);
Expand All @@ -93,6 +99,7 @@ class BuildOutMsgQueueProof : public td::actor::Actor {
private:
BlockIdExt block_id_;
ShardIdFull dst_shard_;
block::ImportedMsgQueueLimits limits_;

td::actor::ActorId<ValidatorManagerInterface> manager_;
td::Promise<tl_object_ptr<ton_api::tonNode_outMsgQueueProof>> promise_;
Expand Down
7 changes: 7 additions & 0 deletions validator/impl/shard.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,13 @@ class MasterchainStateQ : public MasterchainState, public ShardStateQ {
auto R = config_->get_size_limits_config();
return R.is_error() ? block::SizeLimitsConfig::ExtMsgLimits() : R.ok_ref().ext_msg_limits;
}
block::ImportedMsgQueueLimits get_imported_msg_queue_limits(bool is_masterchain) const override {
auto R = config_->get_block_limits(is_masterchain);
if (R.is_ok() && R.ok()) {
return R.ok()->imported_msg_queue;
}
return {};
}
BlockIdExt last_key_block_id() const override;
BlockIdExt next_key_block_id(BlockSeqno seqno) const override;
BlockIdExt prev_key_block_id(BlockSeqno seqno) const override;
Expand Down
3 changes: 2 additions & 1 deletion validator/interfaces/out-msg-queue-proof.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ struct OutMsgQueueProof : public td::CntObject {
td::int32 msg_count_; // -1 - up to end of queue

static td::Result<td::Ref<OutMsgQueueProof>> fetch(BlockIdExt block_id, ShardIdFull dst_shard,
block::ImportedMsgQueueLimits limits,
const ton_api::tonNode_outMsgQueueProof &f);
static td::Result<tl_object_ptr<ton_api::tonNode_outMsgQueueProof>> build(BlockIdExt block_id, ShardIdFull dst_shard,
block::ImportedMsgQueueLimits limits,
Ref<vm::Cell> state_root,
Ref<vm::Cell> block_root);

static const td::uint64 QUEUE_SIZE_THRESHOLD = 128 * 1024;
};

} // namespace validator
Expand Down
1 change: 1 addition & 0 deletions validator/interfaces/shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class MasterchainState : virtual public ShardState {
return td::Status::OK();
}
virtual block::SizeLimitsConfig::ExtMsgLimits get_ext_msg_limits() const = 0;
virtual block::ImportedMsgQueueLimits get_imported_msg_queue_limits(bool is_masterchain) const = 0;
};

} // namespace validator
Expand Down
Loading

0 comments on commit f1e62d0

Please sign in to comment.