Skip to content

Commit

Permalink
Improve importing msg queues
Browse files Browse the repository at this point in the history
  • Loading branch information
SpyCheese committed Aug 3, 2023
1 parent e814973 commit 9e02853
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 28 deletions.
7 changes: 5 additions & 2 deletions crypto/block/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,12 @@ static inline std::ostream& operator<<(std::ostream& os, const MsgProcessedUptoC

struct ImportedMsgQueueLimits {
// Default values
td::uint32 max_bytes = 1 << 19;
td::uint32 max_msgs = 500;
td::uint32 max_bytes = 1 << 16;
td::uint32 max_msgs = 30;
bool deserialize(vm::CellSlice& cs);
ImportedMsgQueueLimits operator*(td::uint32 x) const {
return {max_bytes * x, max_msgs * x};
}
};

struct ParamLimits {
Expand Down
40 changes: 20 additions & 20 deletions validator/full-node-shard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -960,26 +960,26 @@ void FullNodeShardImpl::download_out_msg_queue_proof(ShardIdFull dst_shard, std:
create_tl_shard_id(dst_shard), std::move(blocks_tl),
create_tl_object<ton_api::tonNode_importedMsgQueueLimits>(limits.max_bytes, limits.max_msgs));

auto P = td::PromiseCreator::lambda([=, promise = create_neighbour_promise(b, std::move(promise), true),
blocks = std::move(blocks)](td::Result<td::BufferSlice> R) mutable {
if (R.is_error()) {
promise.set_result(R.move_as_error());
return;
}
TRY_RESULT_PROMISE(promise, f, fetch_tl_object<ton_api::tonNode_OutMsgQueueProof>(R.move_as_ok(), true));
ton_api::downcast_call(
*f, td::overloaded(
[&](ton_api::tonNode_outMsgQueueProofEmpty &x) {
promise.set_error(td::Status::Error("node doesn't have this block"));
},
[&](ton_api::tonNode_outMsgQueueProof &x) {
delay_action(
[=, promise = std::move(promise), blocks = std::move(blocks), x = std::move(x)]() mutable {
promise.set_result(OutMsgQueueProof::fetch(dst_shard, blocks, limits, x));
},
td::Timestamp::now());
}));
});
auto P = td::PromiseCreator::lambda(
[=, promise = std::move(promise), blocks = std::move(blocks)](td::Result<td::BufferSlice> R) mutable {
if (R.is_error()) {
promise.set_result(R.move_as_error());
return;
}
TRY_RESULT_PROMISE(promise, f, fetch_tl_object<ton_api::tonNode_OutMsgQueueProof>(R.move_as_ok(), true));
ton_api::downcast_call(
*f, td::overloaded(
[&](ton_api::tonNode_outMsgQueueProofEmpty &x) {
promise.set_error(td::Status::Error("node doesn't have this block"));
},
[&](ton_api::tonNode_outMsgQueueProof &x) {
delay_action(
[=, promise = std::move(promise), blocks = std::move(blocks), x = std::move(x)]() mutable {
promise.set_result(OutMsgQueueProof::fetch(dst_shard, blocks, limits, x));
},
td::Timestamp::now());
}));
});
td::actor::send_closure(overlays_, &overlay::Overlays::send_query_via, b.adnl_id, adnl_id_, overlay_id_,
"get_msg_queue", std::move(P), timeout, std::move(query), 1 << 22, rldp_);
}
Expand Down
55 changes: 52 additions & 3 deletions validator/impl/out-msg-queue-proof.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,54 @@ void OutMsgQueueImporter::new_masterchain_block_notification(td::Ref<Masterchain
void OutMsgQueueImporter::get_neighbor_msg_queue_proofs(
ShardIdFull dst_shard, std::vector<BlockIdExt> blocks, td::Timestamp timeout,
td::Promise<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> promise) {
if (blocks.empty()) {
promise.set_value({});
return;
}
if (dst_shard.is_masterchain() && blocks.size() != 1) {
// We spit queries for masterchain {dst_shard, {block_1, ..., block_n}} into separate queries
// {dst_shard, {block_1}}, ..., {dst_shard, {block_n}}
class Worker : public td::actor::Actor {
public:
Worker(size_t pending, td::Promise<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> promise)
: pending_(pending), promise_(std::move(promise)) {
CHECK(pending_ > 0);
}

void on_result(td::Ref<OutMsgQueueProof> res) {
result_[res->block_id_] = res;
if (--pending_ == 0) {
promise_.set_result(std::move(result_));
stop();
}
}

void on_error(td::Status error) {
promise_.set_error(std::move(error));
stop();
}

private:
size_t pending_;
td::Promise<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> promise_;
std::map<BlockIdExt, td::Ref<OutMsgQueueProof>> result_;
};
auto worker = td::actor::create_actor<Worker>("queueworker", blocks.size(), std::move(promise)).release();
for (const BlockIdExt& block : blocks) {
get_neighbor_msg_queue_proofs(dst_shard, {block}, timeout,
[=](td::Result<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> R) {
if (R.is_error()) {
td::actor::send_closure(worker, &Worker::on_error, R.move_as_error());
} else {
auto res = R.move_as_ok();
CHECK(res.size() == 1);
td::actor::send_closure(worker, &Worker::on_result, res.begin()->second);
}
});
}
return;
}

std::sort(blocks.begin(), blocks.end());
auto entry = cache_[{dst_shard, blocks}];
if (entry) {
Expand Down Expand Up @@ -326,7 +374,8 @@ void OutMsgQueueImporter::get_neighbor_msg_queue_proofs(
++entry->pending;
for (size_t i = 0; i < p.second.size(); i += 16) {
size_t j = std::min(i + 16, p.second.size());
get_proof_import(entry, std::vector<BlockIdExt>(p.second.begin() + i, p.second.begin() + j), limits);
get_proof_import(entry, std::vector<BlockIdExt>(p.second.begin() + i, p.second.begin() + j),
limits * (td::uint32)(j - i));
}
}
if (entry->pending == 0) {
Expand All @@ -341,7 +390,7 @@ void OutMsgQueueImporter::get_proof_local(std::shared_ptr<CacheEntry> entry, Blo
td::actor::send_closure(
manager_, &ValidatorManager::wait_block_state_short, block, 0, entry->timeout,
[=, SelfId = actor_id(this), manager = manager_, timeout = entry->timeout,
retry_after = td::Timestamp::in(0.5)](td::Result<Ref<ShardState>> R) mutable {
retry_after = td::Timestamp::in(0.1)](td::Result<Ref<ShardState>> R) mutable {
if (R.is_error()) {
LOG(DEBUG) << "Failed to get block state for " << block.to_str() << ": " << R.move_as_error();
delay_action([=]() { td::actor::send_closure(SelfId, &OutMsgQueueImporter::get_proof_local, entry, block); },
Expand Down Expand Up @@ -380,7 +429,7 @@ void OutMsgQueueImporter::get_proof_import(std::shared_ptr<CacheEntry> entry, st
}
td::actor::send_closure(
manager_, &ValidatorManager::send_get_out_msg_queue_proof_request, entry->dst_shard, blocks, limits,
[=, SelfId = actor_id(this), retry_after = td::Timestamp::in(0.5),
[=, SelfId = actor_id(this), retry_after = td::Timestamp::in(0.1),
dst_shard = entry->dst_shard](td::Result<std::vector<td::Ref<OutMsgQueueProof>>> R) {
if (R.is_error()) {
LOG(DEBUG) << "Failed to get out msg queue for " << dst_shard.to_str() << ": " << R.move_as_error();
Expand Down
31 changes: 28 additions & 3 deletions validator/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,10 @@ void ValidatorManagerImpl::add_shard_block_description(td::Ref<ShardTopBlockDesc
});
wait_block_state_short(desc->block_id(), 0, td::Timestamp::in(60.0), std::move(P));
}
if (collating_masterchain() && desc->generated_at() > td::Clocks::system() - 20) {
wait_neighbor_msg_queue_proofs(ShardIdFull{masterchainId}, {desc->block_id()}, td::Timestamp::in(15.0),
[](td::Result<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>>) {});
}
}
}

Expand Down Expand Up @@ -2525,6 +2529,16 @@ bool ValidatorManagerImpl::validating_masterchain() {
.is_zero();
}

bool ValidatorManagerImpl::collating_masterchain() {
if (masterchain_collators_) {
return true;
}
if (opts_->validator_mode() == ValidatorManagerOptions::validator_lite_all) {
return false;
}
return validating_masterchain();
}

PublicKeyHash ValidatorManagerImpl::get_validator(ShardIdFull shard, td::Ref<ValidatorSet> val_set) {
for (auto &key : temp_keys_) {
if (val_set->is_validator(key.bits256_value())) {
Expand Down Expand Up @@ -2726,7 +2740,12 @@ void ValidatorManagerImpl::add_collator(adnl::AdnlNodeIdShort id, ShardIdFull sh
it = collator_nodes_.emplace(id, Collator()).first;
it->second.actor = td::actor::create_actor<CollatorNode>("collatornode", id, actor_id(this), adnl_, rldp_);
}
it->second.shards.insert(shard);
if (!it->second.shards.insert(shard).second) {
return;
}
if (shard.is_masterchain()) {
++masterchain_collators_;
}
td::actor::send_closure(it->second.actor, &CollatorNode::add_shard, shard);
}

Expand All @@ -2735,10 +2754,16 @@ void ValidatorManagerImpl::del_collator(adnl::AdnlNodeIdShort id, ShardIdFull sh
if (it == collator_nodes_.end()) {
return;
}
td::actor::send_closure(it->second.actor, &CollatorNode::del_shard, shard);
it->second.shards.erase(shard);
if (!it->second.shards.erase(shard)) {
return;
}
if (shard.is_masterchain()) {
--masterchain_collators_;
}
if (it->second.shards.empty()) {
collator_nodes_.erase(it);
} else {
td::actor::send_closure(it->second.actor, &CollatorNode::del_shard, shard);
}
}

Expand Down
2 changes: 2 additions & 0 deletions validator/manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ class ValidatorManagerImpl : public ValidatorManager {
bool is_validator();
bool is_collator();
bool validating_masterchain();
bool collating_masterchain();
PublicKeyHash get_validator(ShardIdFull shard, td::Ref<ValidatorSet> val_set);

ValidatorManagerImpl(td::Ref<ValidatorManagerOptions> opts, std::string db_root,
Expand Down Expand Up @@ -650,6 +651,7 @@ class ValidatorManagerImpl : public ValidatorManager {
std::set<ShardIdFull> shards;
};
std::map<adnl::AdnlNodeIdShort, Collator> collator_nodes_;
size_t masterchain_collators_ = 0;

std::set<ShardIdFull> extra_active_shards_;
std::map<ShardIdFull, BlockSeqno> last_validated_blocks_;
Expand Down

0 comments on commit 9e02853

Please sign in to comment.