Skip to content

Commit

Permalink
Optimize masterchain collation
Browse files Browse the repository at this point in the history
Use only shard blocks with ready msg queues
  • Loading branch information
SpyCheese committed Aug 30, 2023
1 parent 47c60d8 commit 1e3a122
Show file tree
Hide file tree
Showing 10 changed files with 151 additions and 82 deletions.
23 changes: 13 additions & 10 deletions validator/impl/collator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,12 @@ void Collator::start_up() {
}
if (is_masterchain() && !is_hardfork_) {
// 5. load shard block info messages
LOG(DEBUG) << "sending get_shard_blocks() query to Manager";
LOG(DEBUG) << "sending get_shard_blocks_for_collator() query to Manager";
++pending;
td::actor::send_closure_later(
manager, &ValidatorManager::get_shard_blocks, prev_blocks[0],
manager, &ValidatorManager::get_shard_blocks_for_collator, prev_blocks[0],
[self = get_self()](td::Result<std::vector<Ref<ShardTopBlockDescription>>> res) -> void {
LOG(DEBUG) << "got answer to get_shard_blocks() query";
LOG(DEBUG) << "got answer to get_shard_blocks_for_collator() query";
td::actor::send_closure_later(std::move(self), &Collator::after_get_shard_blocks, std::move(res));
});
}
Expand Down Expand Up @@ -1405,8 +1405,8 @@ bool Collator::import_new_shard_top_blocks() {
prev_descr.clear();
descr.clear();
} else {
LOG(INFO) << "updated top shard block information with " << sh_bd->block_id().to_str() << " and "
<< prev_bd->block_id().to_str();
LOG(DEBUG) << "updated top shard block information with " << sh_bd->block_id().to_str() << " and "
<< prev_bd->block_id().to_str();
CHECK(ures.move_as_ok());
store_shard_fees(std::move(prev_descr));
store_shard_fees(std::move(descr));
Expand Down Expand Up @@ -1448,18 +1448,21 @@ bool Collator::import_new_shard_top_blocks() {
store_shard_fees(std::move(descr));
register_shard_block_creators(sh_bd->get_creator_list(chain_len));
shards_max_end_lt_ = std::max(shards_max_end_lt_, end_lt);
LOG(INFO) << "updated top shard block information with " << sh_bd->block_id().to_str();
LOG(DEBUG) << "updated top shard block information with " << sh_bd->block_id().to_str();
CHECK(ures.move_as_ok());
++tb_act;
used_shard_block_descr_.emplace_back(sh_bd);
}
if (tb_act) {
shard_conf_adjusted_ = true;
}
if (tb_act && verbosity >= 0) { // DEBUG
LOG(INFO) << "updated shard block configuration to ";
auto csr = shard_conf_->get_root_csr();
block::gen::t_ShardHashes.print(std::cerr, csr.write());
if (tb_act) {
LOG(INFO) << "updated shard block configuration: " << tb_act << " new top shard blocks";
if (verbosity >= 1) {
LOG(INFO) << "updated shard block configuration to ";
auto csr = shard_conf_->get_root_csr();
block::gen::t_ShardHashes.print(std::cerr, csr.write());
}
}
block::gen::ShardFeeCreated::Record fc;
if (!(tlb::csr_unpack(fees_import_dict_->get_root_extra(),
Expand Down
44 changes: 0 additions & 44 deletions validator/impl/out-msg-queue-proof.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,50 +289,6 @@ void OutMsgQueueImporter::get_neighbor_msg_queue_proofs(
promise.set_value({});
return;
}
if (dst_shard.is_masterchain() && blocks.size() != 1) {
// We spit queries for masterchain {dst_shard, {block_1, ..., block_n}} into separate queries
// {dst_shard, {block_1}}, ..., {dst_shard, {block_n}}
class Worker : public td::actor::Actor {
public:
Worker(size_t pending, td::Promise<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> promise)
: pending_(pending), promise_(std::move(promise)) {
CHECK(pending_ > 0);
}

void on_result(td::Ref<OutMsgQueueProof> res) {
result_[res->block_id_] = res;
if (--pending_ == 0) {
promise_.set_result(std::move(result_));
stop();
}
}

void on_error(td::Status error) {
promise_.set_error(std::move(error));
stop();
}

private:
size_t pending_;
td::Promise<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> promise_;
std::map<BlockIdExt, td::Ref<OutMsgQueueProof>> result_;
};
auto worker = td::actor::create_actor<Worker>("queueworker", blocks.size(), std::move(promise)).release();
for (const BlockIdExt& block : blocks) {
get_neighbor_msg_queue_proofs(dst_shard, {block}, timeout,
[=](td::Result<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> R) {
if (R.is_error()) {
td::actor::send_closure(worker, &Worker::on_error, R.move_as_error());
} else {
auto res = R.move_as_ok();
CHECK(res.size() == 1);
td::actor::send_closure(worker, &Worker::on_result, res.begin()->second);
}
});
}
return;
}

std::sort(blocks.begin(), blocks.end());
auto entry = cache_[{dst_shard, blocks}];
if (entry) {
Expand Down
2 changes: 1 addition & 1 deletion validator/impl/out-msg-queue-proof.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class OutMsgQueueImporter : public td::actor::Actor {
void finish_query(std::shared_ptr<CacheEntry> entry);
bool check_timeout(std::shared_ptr<CacheEntry> entry);

constexpr static const double CACHE_TTL = 30.0;
constexpr static const double CACHE_TTL = 60.0;
};

class BuildOutMsgQueueProof : public td::actor::Actor {
Expand Down
4 changes: 2 additions & 2 deletions validator/interfaces/validator-manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ class ValidatorManager : public ValidatorManagerInterface {
td::Promise<td::Ref<MessageQueue>> promise) = 0;
virtual void get_external_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<ExtMessage>>> promise) = 0;
virtual void get_ihr_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<IhrMessage>>> promise) = 0;
virtual void get_shard_blocks(BlockIdExt masterchain_block_id,
td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) = 0;
virtual void get_shard_blocks_for_collator(BlockIdExt masterchain_block_id,
td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) = 0;
virtual void complete_external_messages(std::vector<ExtMessage::Hash> to_delay,
std::vector<ExtMessage::Hash> to_delete) = 0;
virtual void complete_ihr_messages(std::vector<IhrMessage::Hash> to_delay,
Expand Down
4 changes: 2 additions & 2 deletions validator/manager-disk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -516,8 +516,8 @@ void ValidatorManagerImpl::get_ihr_messages(ShardIdFull shard, td::Promise<std::
promise.set_result(ihr_messages_);
}

void ValidatorManagerImpl::get_shard_blocks(BlockIdExt masterchain_block_id,
td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) {
void ValidatorManagerImpl::get_shard_blocks_for_collator(
BlockIdExt masterchain_block_id, td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) {
if (!last_masterchain_block_handle_) {
promise.set_result(std::vector<td::Ref<ShardTopBlockDescription>>{});
return;
Expand Down
4 changes: 2 additions & 2 deletions validator/manager-disk.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ class ValidatorManagerImpl : public ValidatorManager {
td::Promise<td::Ref<MessageQueue>> promise) override;
void get_external_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<ExtMessage>>> promise) override;
void get_ihr_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<IhrMessage>>> promise) override;
void get_shard_blocks(BlockIdExt masterchain_block_id,
td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) override;
void get_shard_blocks_for_collator(BlockIdExt masterchain_block_id,
td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) override;
void complete_external_messages(std::vector<ExtMessage::Hash> to_delay,
std::vector<ExtMessage::Hash> to_delete) override;
void complete_ihr_messages(std::vector<IhrMessage::Hash> to_delay, std::vector<IhrMessage::Hash> to_delete) override;
Expand Down
4 changes: 2 additions & 2 deletions validator/manager-hardfork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,8 @@ void ValidatorManagerImpl::get_ihr_messages(ShardIdFull shard, td::Promise<std::
promise.set_result(ihr_messages_);
}

void ValidatorManagerImpl::get_shard_blocks(BlockIdExt masterchain_block_id,
td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) {
void ValidatorManagerImpl::get_shard_blocks_for_collator(
BlockIdExt masterchain_block_id, td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) {
}

void ValidatorManagerImpl::get_block_data_from_db(ConstBlockHandle handle, td::Promise<td::Ref<BlockData>> promise) {
Expand Down
2 changes: 1 addition & 1 deletion validator/manager-hardfork.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ class ValidatorManagerImpl : public ValidatorManager {
td::Promise<td::Ref<MessageQueue>> promise) override;
void get_external_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<ExtMessage>>> promise) override;
void get_ihr_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<IhrMessage>>> promise) override;
void get_shard_blocks(BlockIdExt masterchain_block_id,
void get_shard_blocks_for_collator(BlockIdExt masterchain_block_id,
td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) override;
void complete_external_messages(std::vector<ExtMessage::Hash> to_delay,
std::vector<ExtMessage::Hash> to_delete) override {
Expand Down
130 changes: 115 additions & 15 deletions validator/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ void ValidatorManagerImpl::new_shard_block(BlockIdExt block_id, CatchainSeqno cc
return;
}
auto it = shard_blocks_.find(ShardTopBlockDescriptionId{block_id.shard_full(), cc_seqno});
if (it != shard_blocks_.end() && block_id.id.seqno <= it->second->block_id().id.seqno) {
if (it != shard_blocks_.end() && block_id.id.seqno <= it->second.latest_desc->block_id().id.seqno) {
VLOG(VALIDATOR_DEBUG) << "dropping duplicate shard block broadcast";
return;
}
Expand All @@ -459,11 +459,11 @@ void ValidatorManagerImpl::new_shard_block(BlockIdExt block_id, CatchainSeqno cc
void ValidatorManagerImpl::add_shard_block_description(td::Ref<ShardTopBlockDescription> desc) {
if (desc->may_be_valid(last_masterchain_block_handle_, last_masterchain_state_)) {
auto it = shard_blocks_.find(ShardTopBlockDescriptionId{desc->shard(), desc->catchain_seqno()});
if (it != shard_blocks_.end() && desc->block_id().id.seqno <= it->second->block_id().id.seqno) {
if (it != shard_blocks_.end() && desc->block_id().id.seqno <= it->second.latest_desc->block_id().id.seqno) {
VLOG(VALIDATOR_DEBUG) << "dropping duplicate shard block broadcast";
return;
}
shard_blocks_[ShardTopBlockDescriptionId{desc->block_id().shard_full(), desc->catchain_seqno()}] = desc;
shard_blocks_[ShardTopBlockDescriptionId{desc->block_id().shard_full(), desc->catchain_seqno()}].latest_desc = desc;
VLOG(VALIDATOR_DEBUG) << "new shard block descr for " << desc->block_id();
if (need_monitor(desc->block_id().shard_full())) {
auto P = td::PromiseCreator::lambda([](td::Result<td::Ref<ShardState>> R) {
Expand All @@ -478,13 +478,53 @@ void ValidatorManagerImpl::add_shard_block_description(td::Ref<ShardTopBlockDesc
});
wait_block_state_short(desc->block_id(), 0, td::Timestamp::in(60.0), std::move(P));
}
if (collating_masterchain() && desc->generated_at() > td::Clocks::system() - 20) {
wait_neighbor_msg_queue_proofs(ShardIdFull{masterchainId}, {desc->block_id()}, td::Timestamp::in(15.0),
[](td::Result<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>>) {});
if (collating_masterchain()) {
preload_msg_queue_to_masterchain(desc);
}
}
}

void ValidatorManagerImpl::preload_msg_queue_to_masterchain(td::Ref<ShardTopBlockDescription> desc) {
auto id = ShardTopBlockDescriptionId{desc->block_id().shard_full(), desc->catchain_seqno()};
auto it = shard_blocks_.find(id);
if (!collating_masterchain() || it == shard_blocks_.end() || it->second.latest_desc->block_id() != desc->block_id()) {
return;
}
wait_neighbor_msg_queue_proofs(
ShardIdFull{masterchainId}, {desc->block_id()}, td::Timestamp::in(10.0),
[=, SelfId = actor_id(this),
retry_at = td::Timestamp::in(1.0)](td::Result<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> R) {
if (R.is_error()) {
delay_action(
[=]() { td::actor::send_closure(SelfId, &ValidatorManagerImpl::preload_msg_queue_to_masterchain, desc); },
retry_at);
return;
}
auto res = R.move_as_ok();
auto &queue = res[desc->block_id()];
CHECK(queue.not_null());
td::actor::send_closure(SelfId, &ValidatorManagerImpl::loaded_msg_queue_to_masterchain, desc, std::move(queue));
});
}

void ValidatorManagerImpl::loaded_msg_queue_to_masterchain(td::Ref<ShardTopBlockDescription> desc,
td::Ref<OutMsgQueueProof> res) {
auto id = ShardTopBlockDescriptionId{desc->block_id().shard_full(), desc->catchain_seqno()};
auto it = shard_blocks_.find(id);
if (it == shard_blocks_.end()) {
return;
}
auto &info = it->second;
if (info.ready_desc.is_null() || info.ready_desc->block_id().seqno() < desc->block_id().seqno()) {
VLOG(VALIDATOR_DEBUG) << "loaded out msg queue to masterchain from " << desc->block_id();
if (info.ready_desc.not_null()) {
cached_msg_queue_to_masterchain_.erase(info.ready_desc->block_id());
}
info.ready_desc = desc;
cached_msg_queue_to_masterchain_[desc->block_id()] = std::move(res);
}
}

void ValidatorManagerImpl::add_ext_server_id(adnl::AdnlNodeIdShort id) {
class Cb : public adnl::Adnl::Callback {
private:
Expand Down Expand Up @@ -629,6 +669,56 @@ void ValidatorManagerImpl::wait_neighbor_msg_queue_proofs(
out_msg_queue_importer_ = td::actor::create_actor<OutMsgQueueImporter>("outmsgqueueimporter", actor_id(this), opts_,
last_masterchain_state_);
}
if (dst_shard.is_masterchain()) {
// We spit queries for masterchain {dst_shard, {block_1, ..., block_n}} into separate queries
// {dst_shard, {block_1}}, ..., {dst_shard, {block_n}}
// Also, use cache
class Worker : public td::actor::Actor {
public:
Worker(size_t pending, td::Promise<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> promise)
: pending_(pending), promise_(std::move(promise)) {
CHECK(pending_ > 0);
}

void on_result(td::Ref<OutMsgQueueProof> res) {
result_[res->block_id_] = res;
if (--pending_ == 0) {
promise_.set_result(std::move(result_));
stop();
}
}

void on_error(td::Status error) {
promise_.set_error(std::move(error));
stop();
}

private:
size_t pending_;
td::Promise<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> promise_;
std::map<BlockIdExt, td::Ref<OutMsgQueueProof>> result_;
};
auto worker = td::actor::create_actor<Worker>("queueworker", blocks.size(), std::move(promise)).release();
for (const BlockIdExt &block : blocks) {
auto it = cached_msg_queue_to_masterchain_.find(block);
if (it != cached_msg_queue_to_masterchain_.end()) {
td::actor::send_closure(worker, &Worker::on_result, it->second);
continue;
}
td::actor::send_closure(out_msg_queue_importer_, &OutMsgQueueImporter::get_neighbor_msg_queue_proofs, dst_shard,
std::vector<BlockIdExt>{1, block}, timeout,
[=](td::Result<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> R) {
if (R.is_error()) {
td::actor::send_closure(worker, &Worker::on_error, R.move_as_error());
} else {
auto res = R.move_as_ok();
CHECK(res.size() == 1);
td::actor::send_closure(worker, &Worker::on_result, res.begin()->second);
}
});
}
return;
}
td::actor::send_closure(out_msg_queue_importer_, &OutMsgQueueImporter::get_neighbor_msg_queue_proofs, dst_shard,
std::move(blocks), timeout, std::move(promise));
}
Expand Down Expand Up @@ -843,11 +933,14 @@ void ValidatorManagerImpl::get_ihr_messages(ShardIdFull shard, td::Promise<std::
promise.set_value(std::move(res));
}

void ValidatorManagerImpl::get_shard_blocks(BlockIdExt masterchain_block_id,
td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) {
void ValidatorManagerImpl::get_shard_blocks_for_collator(
BlockIdExt masterchain_block_id, td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) {
std::vector<td::Ref<ShardTopBlockDescription>> v;
for (auto &b : shard_blocks_) {
v.push_back(b.second);
if (b.second.ready_desc.is_null()) {
continue;
}
v.push_back(b.second.ready_desc);
}
promise.set_value(std::move(v));
}
Expand Down Expand Up @@ -1786,7 +1879,7 @@ void ValidatorManagerImpl::new_masterchain_block() {
}
if (is_collator()) {
std::set<ShardIdFull> collating_shards;
if (validating_masterchain()) {
if (collating_masterchain()) {
collating_shards.emplace(masterchainId);
}
for (const auto &collator : collator_nodes_) {
Expand Down Expand Up @@ -2080,12 +2173,19 @@ void ValidatorManagerImpl::update_shard_blocks() {
auto it = shard_blocks_.begin();
while (it != shard_blocks_.end()) {
auto &B = it->second;
if (!B->may_be_valid(last_masterchain_block_handle_, last_masterchain_state_)) {
auto it2 = it++;
shard_blocks_.erase(it2);
} else {
++it;
if (!B.latest_desc->may_be_valid(last_masterchain_block_handle_, last_masterchain_state_)) {
if (B.ready_desc.not_null()) {
cached_msg_queue_to_masterchain_.erase(B.ready_desc->block_id());
}
it = shard_blocks_.erase(it);
continue;
}
if (B.ready_desc.not_null() &&
!B.ready_desc->may_be_valid(last_masterchain_block_handle_, last_masterchain_state_)) {
cached_msg_queue_to_masterchain_.erase(B.ready_desc->block_id());
B.ready_desc = {};
}
++it;
}
}

Expand Down
Loading

0 comments on commit 1e3a122

Please sign in to comment.