Skip to content

Commit

Permalink
Store out msg queue size in state
Browse files Browse the repository at this point in the history
  • Loading branch information
SpyCheese committed May 21, 2024
1 parent 77f8db6 commit 5ad18fe
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 22 deletions.
23 changes: 17 additions & 6 deletions crypto/block/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,9 @@ td::Status ShardState::unpack_out_msg_queue_info(Ref<vm::Cell> out_msg_queue_inf
return td::Status::Error(-666, "cannot unpack OutMsgQueueExtre in the state of "s + id_.to_str());
}
dispatch_queue_ = std::make_unique<vm::AugmentedDictionary>(extra.dispatch_queue, 256, tlb::aug_DispatchQueue);
if (extra.out_queue_size.write().fetch_long(1)) {
out_msg_queue_size_ = extra.out_queue_size->prefetch_ulong(48);
}
} else {
dispatch_queue_ = std::make_unique<vm::AugmentedDictionary>(256, tlb::aug_DispatchQueue);
}
Expand Down Expand Up @@ -1009,6 +1012,12 @@ td::Status ShardState::merge_with(ShardState& sib) {
return td::Status::Error(-666, "cannot merge dispatch queues of the two ancestors");
}
sib.dispatch_queue_.reset();
// 11. merge out_msg_queue_size
if (out_msg_queue_size_ && sib.out_msg_queue_size_) {
out_msg_queue_size_.value() += sib.out_msg_queue_size_.value();
} else {
out_msg_queue_size_ = {};
}
// Anything else? add here
// ...

Expand All @@ -1024,16 +1033,16 @@ td::Status ShardState::merge_with(ShardState& sib) {
return td::Status::OK();
}

td::Result<std::unique_ptr<vm::AugmentedDictionary>> ShardState::compute_split_out_msg_queue(ton::ShardIdFull subshard,
td::uint32* queue_size) {
td::Result<std::unique_ptr<vm::AugmentedDictionary>> ShardState::compute_split_out_msg_queue(
ton::ShardIdFull subshard) {
auto shard = id_.shard_full();
if (!ton::shard_is_parent(shard, subshard)) {
return td::Status::Error(-666, "cannot split subshard "s + subshard.to_str() + " from state of " + id_.to_str() +
" because it is not a parent");
}
CHECK(out_msg_queue_);
auto subqueue = std::make_unique<vm::AugmentedDictionary>(*out_msg_queue_);
int res = block::filter_out_msg_queue(*subqueue, shard, subshard, queue_size);
int res = block::filter_out_msg_queue(*subqueue, shard, subshard);
if (res < 0) {
return td::Status::Error(-666, "error splitting OutMsgQueue of "s + id_.to_str());
}
Expand All @@ -1055,7 +1064,7 @@ td::Result<std::shared_ptr<block::MsgProcessedUptoCollection>> ShardState::compu
return std::move(sub_processed_upto);
}

td::Status ShardState::split(ton::ShardIdFull subshard, td::uint32* queue_size) {
td::Status ShardState::split(ton::ShardIdFull subshard) {
if (!ton::shard_is_parent(id_.shard_full(), subshard)) {
return td::Status::Error(-666, "cannot split subshard "s + subshard.to_str() + " from state of " + id_.to_str() +
" because it is not a parent");
Expand All @@ -1073,10 +1082,12 @@ td::Status ShardState::split(ton::ShardIdFull subshard, td::uint32* queue_size)
auto shard1 = id_.shard_full();
CHECK(ton::shard_is_parent(shard1, subshard));
CHECK(out_msg_queue_);
int res1 = block::filter_out_msg_queue(*out_msg_queue_, shard1, subshard, queue_size);
td::uint64 queue_size;
int res1 = block::filter_out_msg_queue(*out_msg_queue_, shard1, subshard, &queue_size);
if (res1 < 0) {
return td::Status::Error(-666, "error splitting OutMsgQueue of "s + id_.to_str());
}
out_msg_queue_size_ = queue_size;
LOG(DEBUG) << "split counters: " << res1;
// 3. processed_upto
LOG(DEBUG) << "splitting ProcessedUpto";
Expand Down Expand Up @@ -1119,7 +1130,7 @@ td::Status ShardState::split(ton::ShardIdFull subshard, td::uint32* queue_size)
}

int filter_out_msg_queue(vm::AugmentedDictionary& out_queue, ton::ShardIdFull old_shard, ton::ShardIdFull subshard,
td::uint32* queue_size) {
td::uint64* queue_size) {
if (queue_size) {
*queue_size = 0;
}
Expand Down
8 changes: 4 additions & 4 deletions crypto/block/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ struct ShardState {
std::unique_ptr<vm::Dictionary> block_create_stats_;
std::shared_ptr<block::MsgProcessedUptoCollection> processed_upto_;
std::unique_ptr<vm::AugmentedDictionary> dispatch_queue_;
td::optional<td::uint64> out_msg_queue_size_;

bool is_valid() const {
return id_.is_valid();
Expand All @@ -434,11 +435,10 @@ struct ShardState {
ton::BlockSeqno prev_mc_block_seqno, bool after_split, bool clear_history,
std::function<bool(ton::BlockSeqno)> for_each_mcseqno);
td::Status merge_with(ShardState& sib);
td::Result<std::unique_ptr<vm::AugmentedDictionary>> compute_split_out_msg_queue(ton::ShardIdFull subshard,
td::uint32* queue_size = nullptr);
td::Result<std::unique_ptr<vm::AugmentedDictionary>> compute_split_out_msg_queue(ton::ShardIdFull subshard);
td::Result<std::shared_ptr<block::MsgProcessedUptoCollection>> compute_split_processed_upto(
ton::ShardIdFull subshard);
td::Status split(ton::ShardIdFull subshard, td::uint32* queue_size = nullptr);
td::Status split(ton::ShardIdFull subshard);
td::Status unpack_out_msg_queue_info(Ref<vm::Cell> out_msg_queue_info);
bool clear_load_history() {
overload_history_ = underload_history_ = 0;
Expand Down Expand Up @@ -659,7 +659,7 @@ class MtCarloComputeShare {
};

int filter_out_msg_queue(vm::AugmentedDictionary& out_queue, ton::ShardIdFull old_shard, ton::ShardIdFull subshard,
td::uint32* queue_size = nullptr);
td::uint64* queue_size = nullptr);

std::ostream& operator<<(std::ostream& os, const ShardId& shard_id);

Expand Down
3 changes: 2 additions & 1 deletion ton/ton-types.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ enum GlobalCapabilities {
capBounceMsgBody = 4,
capReportVersion = 8,
capSplitMergeTransactions = 16,
capShortDequeue = 32
capShortDequeue = 32,
capStoreOutMsgQueueSize = 64
};

inline int shard_pfx_len(ShardId shard) {
Expand Down
9 changes: 6 additions & 3 deletions validator/impl/collator-impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class Collator final : public td::actor::Actor {
return SUPPORTED_VERSION;
}
static constexpr long long supported_capabilities() {
return ton::capCreateStatsEnabled | ton::capBounceMsgBody | ton::capReportVersion | ton::capShortDequeue;
return ton::capCreateStatsEnabled | ton::capBounceMsgBody | ton::capReportVersion | ton::capShortDequeue |
ton::capStoreOutMsgQueueSize;
}
using LtCellRef = block::LtCellRef;
using NewOutMsg = block::NewOutMsg;
Expand Down Expand Up @@ -192,7 +193,8 @@ class Collator final : public td::actor::Actor {
std::priority_queue<NewOutMsg, std::vector<NewOutMsg>, std::greater<NewOutMsg>> new_msgs;
std::pair<ton::LogicalTime, ton::Bits256> last_proc_int_msg_, first_unproc_int_msg_;
std::unique_ptr<vm::AugmentedDictionary> in_msg_dict, out_msg_dict, out_msg_queue_, sibling_out_msg_queue_;
td::uint32 out_msg_queue_size_ = 0;
td::uint64 out_msg_queue_size_ = 0;
bool have_out_msg_queue_size_in_state_ = false;
std::unique_ptr<vm::Dictionary> ihr_pending;
std::shared_ptr<block::MsgProcessedUptoCollection> processed_upto_, sibling_processed_upto_;
std::unique_ptr<vm::Dictionary> block_create_stats_;
Expand All @@ -207,7 +209,8 @@ class Collator final : public td::actor::Actor {
std::map<StdSmcAddress, td::uint32> sender_generated_messages_count_;
unsigned dispatch_queue_ops_{0};
std::map<StdSmcAddress, LogicalTime> last_enqueued_deferred_lt_;
bool msg_metadata_enabled_ = true; // TODO: enable by config
bool msg_metadata_enabled_ = true; // TODO: enable by config
bool store_out_msg_queue_size_ = true; // TODO: enable by config

td::PerfWarningTimer perf_timer_;
//
Expand Down
24 changes: 17 additions & 7 deletions validator/impl/collator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,7 @@ bool Collator::unpack_last_mc_state() {
create_stats_enabled_ = config_->create_stats_enabled();
report_version_ = config_->has_capability(ton::capReportVersion);
short_dequeue_records_ = config_->has_capability(ton::capShortDequeue);
store_out_msg_queue_size_ = config_->has_capability(ton::capStoreOutMsgQueueSize);
shard_conf_ = std::make_unique<block::ShardConfig>(*config_);
prev_key_block_exists_ = config_->get_last_key_block(prev_key_block_, prev_key_block_lt_);
if (prev_key_block_exists_) {
Expand Down Expand Up @@ -794,15 +795,16 @@ bool Collator::request_neighbor_msg_queues() {
}

/**
* Requests the size of the outbound message queue from the previous state(s).
* Requests the size of the outbound message queue from the previous state(s) if needed.
*
* @returns True if the request was successful, false otherwise.
*/
bool Collator::request_out_msg_queue_size() {
if (after_split_) {
// If block is after split, the size is calculated during split (see Collator::split_last_state)
if (have_out_msg_queue_size_in_state_) {
// if after_split then have_out_msg_queue_size_in_state_ is always true, since the size is calculated during split
return true;
}
out_msg_queue_size_ = 0;
for (size_t i = 0; i < prev_blocks.size(); ++i) {
++pending;
send_closure_later(manager, &ValidatorManager::get_out_msg_queue_size, prev_blocks[i],
Expand Down Expand Up @@ -1016,7 +1018,7 @@ bool Collator::split_last_state(block::ShardState& ss) {
return fatal_error(res2.move_as_error());
}
sibling_processed_upto_ = res2.move_as_ok();
auto res3 = ss.split(shard_, &out_msg_queue_size_);
auto res3 = ss.split(shard_);
if (res3.is_error()) {
return fatal_error(std::move(res3));
}
Expand Down Expand Up @@ -1054,6 +1056,10 @@ bool Collator::import_shard_state_data(block::ShardState& ss) {
ihr_pending = std::move(ss.ihr_pending_);
dispatch_queue_ = std::move(ss.dispatch_queue_);
block_create_stats_ = std::move(ss.block_create_stats_);
if (ss.out_msg_queue_size_) {
have_out_msg_queue_size_in_state_ = true;
out_msg_queue_size_ = ss.out_msg_queue_size_.value();
}
return true;
}

Expand Down Expand Up @@ -4928,9 +4934,13 @@ bool Collator::compute_out_msg_queue_info(Ref<vm::Cell>& out_msg_queue_info) {
vm::CellBuilder cb;
// out_msg_queue_extra#0 dispatch_queue:DispatchQueue out_queue_size:(Maybe uint48) = OutMsgQueueExtra;
// ... extra:(Maybe OutMsgQueueExtra)
if (!dispatch_queue_->is_empty()) {
if (!(cb.store_long_bool(1, 1) && cb.store_long_bool(0, 4) && dispatch_queue_->append_dict_to_bool(cb) &&
cb.store_long_bool(0, 1))) {
bool ok = false;
if (!dispatch_queue_->is_empty() || store_out_msg_queue_size_) {
if (!(cb.store_long_bool(1, 1) && cb.store_long_bool(0, 4) && dispatch_queue_->append_dict_to_bool(cb))) {
return false;
}
if (!(cb.store_bool_bool(store_out_msg_queue_size_) &&
(!store_out_msg_queue_size_ || cb.store_long_bool(out_msg_queue_size_, 48)))) {
return false;
}
} else {
Expand Down
67 changes: 67 additions & 0 deletions validator/impl/validate-query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,7 @@ bool ValidateQuery::try_unpack_mc_state() {
if (!is_masterchain() && !check_this_shard_mc_info()) {
return fatal_error("masterchain configuration does not admit creating block "s + id_.to_str());
}
store_out_msg_queue_size_ = config_->has_capability(ton::capStoreOutMsgQueueSize);
} catch (vm::VmError& err) {
return fatal_error(-666, err.get_msg());
} catch (vm::VmVirtError& err) {
Expand Down Expand Up @@ -2195,6 +2196,50 @@ bool ValidateQuery::check_utime_lt() {
return true;
}

/**
* Reads the size of the outbound message queue from the previous state(s), or requests it if needed.
*
* @returns True if the request was successful, false otherwise.
*/
bool ValidateQuery::prepare_out_msg_queue_size() {
if (ps_.out_msg_queue_size_) {
// if after_split then out_msg_queue_size is always present, since it is calculated during split
old_out_msg_queue_size_ = ps_.out_msg_queue_size_.value();
return true;
}
old_out_msg_queue_size_ = 0;
for (size_t i = 0; i < prev_blocks.size(); ++i) {
++pending;
send_closure_later(manager, &ValidatorManager::get_out_msg_queue_size, prev_blocks[i],
[self = get_self(), i](td::Result<td::uint32> res) {
td::actor::send_closure(std::move(self), &ValidateQuery::got_out_queue_size, i,
std::move(res));
});
}
return true;
}

/**
* Handles the result of obtaining the size of the outbound message queue.
*
* If the block is after merge then the two sizes are added.
*
* @param i The index of the previous block (0 or 1).
* @param res The result object containing the size of the queue.
*/
void ValidateQuery::got_out_queue_size(size_t i, td::Result<td::uint32> res) {
--pending;
if (res.is_error()) {
fatal_error(
res.move_as_error_prefix(PSTRING() << "failed to get message queue size from prev block #" << i << ": "));
return;
}
td::uint32 size = res.move_as_ok();
LOG(DEBUG) << "got outbound queue size from prev block #" << i << ": " << size;
old_out_msg_queue_size_ += size;
try_validate();
}

/*
*
* METHODS CALLED FROM try_validate() stage 1
Expand Down Expand Up @@ -3041,6 +3086,7 @@ bool ValidateQuery::precheck_one_message_queue_update(td::ConstBitPtr out_msg_id
return reject_query("new EnqueuedMsg with key "s + out_msg_id.to_hex(352) + " is invalid");
}
if (new_value.not_null()) {
++new_out_msg_queue_size_;
if (!block::gen::t_EnqueuedMsg.validate_csr(new_value)) {
return reject_query("new EnqueuedMsg with key "s + out_msg_id.to_hex(352) +
" failed to pass automated validity checks");
Expand All @@ -3057,6 +3103,7 @@ bool ValidateQuery::precheck_one_message_queue_update(td::ConstBitPtr out_msg_id
}
}
if (old_value.not_null()) {
--new_out_msg_queue_size_;
if (!block::gen::t_EnqueuedMsg.validate_csr(old_value)) {
return reject_query("old EnqueuedMsg with key "s + out_msg_id.to_hex(352) +
" failed to pass automated validity checks");
Expand Down Expand Up @@ -3209,6 +3256,7 @@ bool ValidateQuery::precheck_message_queue_update() {
try {
CHECK(ps_.out_msg_queue_ && ns_.out_msg_queue_);
CHECK(out_msg_dict_);
new_out_msg_queue_size_ = old_out_msg_queue_size_;
if (!ps_.out_msg_queue_->scan_diff(
*ns_.out_msg_queue_,
[this](td::ConstBitPtr key, int key_len, Ref<vm::CellSlice> old_val_extra,
Expand All @@ -3223,6 +3271,22 @@ bool ValidateQuery::precheck_message_queue_update() {
return reject_query("invalid OutMsgQueue dictionary difference between the old and the new state: "s +
err.get_msg());
}
LOG(INFO) << "outbound message queue size: " << old_out_msg_queue_size_ << " -> " << new_out_msg_queue_size_;
if (store_out_msg_queue_size_) {
if (!ns_.out_msg_queue_size_) {
return reject_query(PSTRING() << "outbound message queue size in the new state is not correct (expected: "
<< new_out_msg_queue_size_ << ", found: none)");
}
if (ns_.out_msg_queue_size_.value() != new_out_msg_queue_size_) {
return reject_query(PSTRING() << "outbound message queue size in the new state is not correct (expected: "
<< new_out_msg_queue_size_ << ", found: " << ns_.out_msg_queue_size_.value()
<< ")");
}
} else {
if (ns_.out_msg_queue_size_) {
return reject_query("outbound message queue size in the new state is present, but shouldn't");
}
}
return true;
}

Expand Down Expand Up @@ -6621,6 +6685,9 @@ bool ValidateQuery::try_validate() {
if (!check_utime_lt()) {
return reject_query("creation utime/lt of the new block is invalid");
}
if (!prepare_out_msg_queue_size()) {
return reject_query("cannot request out msg queue size");
}
stage_ = 1;
if (pending) {
return true;
Expand Down
7 changes: 6 additions & 1 deletion validator/impl/validate-query.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ class ValidateQuery : public td::actor::Actor {
return SUPPORTED_VERSION;
}
static constexpr long long supported_capabilities() {
return ton::capCreateStatsEnabled | ton::capBounceMsgBody | ton::capReportVersion | ton::capShortDequeue;
return ton::capCreateStatsEnabled | ton::capBounceMsgBody | ton::capReportVersion | ton::capShortDequeue |
ton::capStoreOutMsgQueueSize;
}

public:
Expand Down Expand Up @@ -234,6 +235,8 @@ class ValidateQuery : public td::actor::Actor {
std::map<std::pair<StdSmcAddress, td::uint64>, Ref<vm::Cell>> new_dispatch_queue_messages_;
bool msg_metadata_enabled_ = true;
std::set<StdSmcAddress> account_expected_defer_all_messages_;
td::uint64 old_out_msg_queue_size_ = 0, new_out_msg_queue_size_ = 0;
bool store_out_msg_queue_size_ = true;

td::PerfWarningTimer perf_timer_;

Expand Down Expand Up @@ -314,6 +317,8 @@ class ValidateQuery : public td::actor::Actor {
bool check_cur_validator_set();
bool check_mc_validator_info(bool update_mc_cc);
bool check_utime_lt();
bool prepare_out_msg_queue_size();
void got_out_queue_size(size_t i, td::Result<td::uint32> res);

bool fix_one_processed_upto(block::MsgProcessedUpto& proc, ton::ShardIdFull owner, bool allow_cur = false);
bool fix_processed_upto(block::MsgProcessedUptoCollection& upto, bool allow_cur = false);
Expand Down

0 comments on commit 5ad18fe

Please sign in to comment.