diff --git a/create-hardfork/create-hardfork.cpp b/create-hardfork/create-hardfork.cpp index 3fb57f41b..165748f98 100644 --- a/create-hardfork/create-hardfork.cpp +++ b/create-hardfork/create-hardfork.cpp @@ -268,11 +268,11 @@ class HardforkCreator : public td::actor::Actor { td::Promise> promise) override { } void download_archive(ton::BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout, - td::Promise promise) override { } - void download_out_msg_queue_proof(ton::BlockIdExt block_id, ton::ShardIdFull dst_shard, td::Timestamp timeout, - td::Promise> promise) override { + void download_out_msg_queue_proof( + ton::ShardIdFull dst_shard, std::vector blocks, block::ImportedMsgQueueLimits limits, + td::Timestamp timeout, td::Promise>> promise) override { } void new_key_block(ton::validator::BlockHandle handle) override { diff --git a/crypto/block/block.h b/crypto/block/block.h index 5f3f31630..6a94f0aad 100644 --- a/crypto/block/block.h +++ b/crypto/block/block.h @@ -218,8 +218,8 @@ static inline std::ostream& operator<<(std::ostream& os, const MsgProcessedUptoC struct ImportedMsgQueueLimits { // Default values - td::uint32 max_bytes = 1 << 18; - td::uint32 max_msgs = 40; + td::uint32 max_bytes = 1 << 19; + td::uint32 max_msgs = 500; bool deserialize(vm::CellSlice& cs); }; diff --git a/test/test-ton-collator.cpp b/test/test-ton-collator.cpp index 8ef469b5f..aa4c256b6 100644 --- a/test/test-ton-collator.cpp +++ b/test/test-ton-collator.cpp @@ -372,8 +372,9 @@ class TestNode : public td::actor::Actor { td::Promise promise) override { } - void download_out_msg_queue_proof(ton::BlockIdExt block_id, ton::ShardIdFull dst_shard, td::Timestamp timeout, - td::Promise> promise) override { + void download_out_msg_queue_proof( + ton::ShardIdFull dst_shard, std::vector blocks, block::ImportedMsgQueueLimits limits, + td::Timestamp timeout, td::Promise>> promise) override { } void new_key_block(ton::validator::BlockHandle handle) override { diff --git a/tl/generate/scheme/ton_api.tl b/tl/generate/scheme/ton_api.tl index 5444c87af..648b5ef44 100644 --- a/tl/generate/scheme/ton_api.tl +++ b/tl/generate/scheme/ton_api.tl @@ -417,7 +417,7 @@ tonNode.archiveNotFound = tonNode.ArchiveInfo; tonNode.archiveInfo id:long = tonNode.ArchiveInfo; tonNode.importedMsgQueueLimits max_bytes:int max_msgs:int = ImportedMsgQueueLimits; -tonNode.outMsgQueueProof queue_proof:bytes block_state_proof:bytes msg_count:int = tonNode.OutMsgQueueProof; +tonNode.outMsgQueueProof queue_proofs:bytes block_state_proofs:bytes msg_counts:(vector int) = tonNode.OutMsgQueueProof; tonNode.outMsgQueueProofEmpty = tonNode.OutMsgQueueProof; tonNode.forgetPeer = tonNode.ForgetPeer; @@ -453,7 +453,7 @@ tonNode.downloadBlockProofLinks blocks:(vector tonNode.blockIdExt) = tonNode.Dat tonNode.downloadKeyBlockProofLinks blocks:(vector tonNode.blockIdExt) = tonNode.DataList; tonNode.getArchiveInfo masterchain_seqno:int = tonNode.ArchiveInfo; tonNode.getArchiveSlice archive_id:long offset:long max_size:int = tonNode.Data; -tonNode.getOutMsgQueueProof block_id:tonNode.blockIdExt dst_workchain:int dst_shard:long +tonNode.getOutMsgQueueProof dst_shard:tonNode.shardId blocks:(vector tonNode.blockIdExt) limits:tonNode.importedMsgQueueLimits = tonNode.OutMsgQueueProof; tonNode.getCapabilities = tonNode.Capabilities; diff --git a/tl/generate/scheme/ton_api.tlo b/tl/generate/scheme/ton_api.tlo index b7c06f5cc..12a8888d7 100644 Binary files a/tl/generate/scheme/ton_api.tlo and b/tl/generate/scheme/ton_api.tlo differ diff --git a/validator/collator-node.cpp b/validator/collator-node.cpp index c4a849f46..feccff1ae 100644 --- a/validator/collator-node.cpp +++ b/validator/collator-node.cpp @@ -77,37 +77,8 @@ void CollatorNode::new_masterchain_block_notification(td::Ref last_masterchain_block_ = state->get_block_id(); last_top_blocks_.clear(); last_top_blocks_[ShardIdFull{masterchainId, shardIdAll}] = last_masterchain_block_; - if (state->get_unix_time() > (td::uint32)td::Clocks::system() - 20) { - std::vector next_shards; - if (can_collate_shard(ShardIdFull(masterchainId))) { - next_shards.push_back(ShardIdFull(masterchainId)); - } - for (const auto& desc : state->get_shards()) { - last_top_blocks_[desc->shard()] = desc->top_block_id(); - ShardIdFull shard = desc->shard(); - if (desc->before_split()) { - if (can_collate_shard(shard_child(shard, true))) { - next_shards.push_back(shard_child(shard, true)); - } - if (can_collate_shard(shard_child(shard, false))) { - next_shards.push_back(shard_child(shard, false)); - } - } else if (desc->before_merge()) { - if (is_left_child(shard) && can_collate_shard(shard_parent(shard))) { - next_shards.push_back(shard_parent(shard)); - } - } else if (can_collate_shard(shard)) { - next_shards.push_back(shard); - } - } - for (const ShardIdFull& shard : next_shards) { - for (const auto& neighbor : last_top_blocks_) { - if (neighbor.first != shard && block::ShardConfig::is_neighbor(shard, neighbor.first)) { - td::actor::send_closure(manager_, &ValidatorManager::wait_out_msg_queue_proof, neighbor.second, shard, 0, - td::Timestamp::in(10.0), [](td::Ref) {}); - } - } - } + for (const auto& desc : state->get_shards()) { + last_top_blocks_[desc->shard()] = desc->top_block_id(); } if (init || state->is_key_state()) { @@ -274,7 +245,8 @@ void CollatorNode::receive_query_cont(ShardIdFull shard, td::Ref prev_blocks, Ed25519_PublicKey creator, td::Promise promise) { run_collate_query(shard, min_mc_state->get_block_id(), std::move(prev_blocks), creator, - min_mc_state->get_validator_set(shard), manager_, td::Timestamp::in(10.0), std::move(promise)); + min_mc_state->get_validator_set(shard), manager_, td::Timestamp::in(10.0), std::move(promise), + CollateMode::skip_store_candidate); } void CollatorNode::process_result(std::shared_ptr cache_entry, td::Result R) { diff --git a/validator/fabric.h b/validator/fabric.h index d1611bea3..bd26b0e39 100644 --- a/validator/fabric.h +++ b/validator/fabric.h @@ -26,6 +26,7 @@ namespace ton { namespace validator { enum ValidateMode { fake = 1, full_collated_data = 2 }; +enum CollateMode { skip_store_candidate = 1 }; td::actor::ActorOwn create_db_actor(td::actor::ActorId manager, std::string db_root_); td::actor::ActorOwn create_liteserver_cache_actor(td::actor::ActorId manager, @@ -83,9 +84,9 @@ void run_validate_query(ShardIdFull shard, BlockIdExt min_masterchain_block_id, td::actor::ActorId manager, td::Timestamp timeout, td::Promise promise, unsigned mode = 0); void run_collate_query(ShardIdFull shard, const BlockIdExt& min_masterchain_block_id, std::vector prev, - Ed25519_PublicKey local_id, td::Ref validator_set, + Ed25519_PublicKey creator, td::Ref validator_set, td::actor::ActorId manager, td::Timestamp timeout, - td::Promise promise); + td::Promise promise, unsigned mode = 0); void run_collate_hardfork(ShardIdFull shard, const BlockIdExt& min_masterchain_block_id, std::vector prev, td::actor::ActorId manager, td::Timestamp timeout, td::Promise promise); diff --git a/validator/full-node-shard.cpp b/validator/full-node-shard.cpp index cefa2e80c..c6891a50d 100644 --- a/validator/full-node-shard.cpp +++ b/validator/full-node-shard.cpp @@ -666,18 +666,26 @@ void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNod void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getOutMsgQueueProof &query, td::Promise promise) { - BlockIdExt block_id = create_block_id(query.block_id_); - ShardIdFull dst_shard(query.dst_workchain_, query.dst_shard_); - block::ImportedMsgQueueLimits limits{(td::uint32)query.limits_->max_bytes_, (td::uint32)query.limits_->max_msgs_}; - if (!block_id.is_valid_ext()) { - promise.set_error(td::Status::Error("invalid block_id")); - return; + std::vector blocks; + for (const auto& x : query.blocks_) { + BlockIdExt id = create_block_id(x); + if (!id.is_valid_ext()) { + promise.set_error(td::Status::Error("invalid block_id")); + return; + } + if (!shard_is_ancestor(shard_, id.shard_full())) { + promise.set_error(td::Status::Error("query in wrong overlay")); + return; + } + blocks.push_back(create_block_id(x)); } + ShardIdFull dst_shard = create_shard_id(query.dst_shard_); if (!dst_shard.is_valid_ext()) { promise.set_error(td::Status::Error("invalid shard")); return; } - if (limits.max_bytes > (1 << 21)) { + block::ImportedMsgQueueLimits limits{(td::uint32)query.limits_->max_bytes_, (td::uint32)query.limits_->max_msgs_}; + if (limits.max_bytes > (1 << 24)) { promise.set_error(td::Status::Error("max_bytes is too big")); return; } @@ -689,10 +697,10 @@ void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNod promise.set_result(serialize_tl_object(R.move_as_ok(), true)); } }); - VLOG(FULL_NODE_DEBUG) << "Got query getOutMsgQueueProof " << block_id.to_str() << " " << dst_shard.to_str() - << " from " << src; - td::actor::create_actor("buildqueueproof", block_id, dst_shard, limits, validator_manager_, - std::move(P)) + VLOG(FULL_NODE_DEBUG) << "Got query getOutMsgQueueProof (" << blocks.size() << " blocks) to shard " + << dst_shard.to_str() << " from " << src; + td::actor::create_actor("buildqueueproof", dst_shard, std::move(blocks), limits, + validator_manager_, std::move(P)) .release(); } @@ -931,33 +939,43 @@ void FullNodeShardImpl::download_archive(BlockSeqno masterchain_seqno, std::stri .release(); } -void FullNodeShardImpl::download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, +void FullNodeShardImpl::download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector blocks, block::ImportedMsgQueueLimits limits, td::Timestamp timeout, - td::Promise> promise) { + td::Promise>> promise) { // TODO: maybe more complex download (like other requests here) auto &b = choose_neighbour(true); if (b.adnl_id == adnl::AdnlNodeIdShort::zero()) { promise.set_error(td::Status::Error(ErrorCode::notready, "no nodes")); return; } - auto P = td::PromiseCreator::lambda( - [=, promise = create_neighbour_promise(b, std::move(promise), true)](td::Result R) mutable { - if (R.is_error()) { - promise.set_result(R.move_as_error()); - return; - } - TRY_RESULT_PROMISE(promise, f, fetch_tl_object(R.move_as_ok(), true)); - ton_api::downcast_call(*f, td::overloaded( - [&](ton_api::tonNode_outMsgQueueProofEmpty &x) { - promise.set_error(td::Status::Error("node doesn't have this block")); - }, - [&](ton_api::tonNode_outMsgQueueProof &x) { - promise.set_result(OutMsgQueueProof::fetch(block_id, dst_shard, limits, x)); - })); - }); + std::vector> blocks_tl; + for (const BlockIdExt &id : blocks) { + blocks_tl.push_back(create_tl_block_id(id)); + } td::BufferSlice query = create_serialize_tl_object( - create_tl_block_id(block_id), dst_shard.workchain, dst_shard.shard, + create_tl_shard_id(dst_shard), std::move(blocks_tl), create_tl_object(limits.max_bytes, limits.max_msgs)); + + auto P = td::PromiseCreator::lambda([=, promise = create_neighbour_promise(b, std::move(promise), true), + blocks = std::move(blocks)](td::Result R) mutable { + if (R.is_error()) { + promise.set_result(R.move_as_error()); + return; + } + TRY_RESULT_PROMISE(promise, f, fetch_tl_object(R.move_as_ok(), true)); + ton_api::downcast_call( + *f, td::overloaded( + [&](ton_api::tonNode_outMsgQueueProofEmpty &x) { + promise.set_error(td::Status::Error("node doesn't have this block")); + }, + [&](ton_api::tonNode_outMsgQueueProof &x) { + delay_action( + [=, promise = std::move(promise), blocks = std::move(blocks), x = std::move(x)]() mutable { + promise.set_result(OutMsgQueueProof::fetch(dst_shard, blocks, limits, x)); + }, + td::Timestamp::now()); + })); + }); td::actor::send_closure(overlays_, &overlay::Overlays::send_query_via, b.adnl_id, adnl_id_, overlay_id_, "get_msg_queue", std::move(P), timeout, std::move(query), 1 << 22, rldp_); } @@ -1352,9 +1370,9 @@ td::actor::ActorOwn FullNodeShard::create( td::actor::ActorId rldp, td::actor::ActorId rldp2, td::actor::ActorId overlays, td::actor::ActorId validator_manager, td::actor::ActorId client, FullNodeShardMode mode) { - return td::actor::create_actor("tonnode", shard, local_id, adnl_id, zero_state_file_hash, config, - keyring, adnl, rldp, rldp2, overlays, validator_manager, client, - mode); + return td::actor::create_actor(PSTRING() << "fullnode" << shard.to_str(), shard, local_id, adnl_id, + zero_state_file_hash, config, keyring, adnl, rldp, rldp2, overlays, + validator_manager, client, mode); } } // namespace fullnode diff --git a/validator/full-node-shard.h b/validator/full-node-shard.h index fca17bf4b..2131939fe 100644 --- a/validator/full-node-shard.h +++ b/validator/full-node-shard.h @@ -70,9 +70,9 @@ class FullNodeShard : public td::actor::Actor { td::Promise> promise) = 0; virtual void download_archive(BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout, td::Promise promise) = 0; - virtual void download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, + virtual void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector blocks, block::ImportedMsgQueueLimits limits, td::Timestamp timeout, - td::Promise> promise) = 0; + td::Promise>> promise) = 0; virtual void set_handle(BlockHandle handle, td::Promise promise) = 0; diff --git a/validator/full-node-shard.hpp b/validator/full-node-shard.hpp index af16ccc91..f2b8bb669 100644 --- a/validator/full-node-shard.hpp +++ b/validator/full-node-shard.hpp @@ -187,8 +187,9 @@ class FullNodeShardImpl : public FullNodeShard { td::Promise> promise) override; void download_archive(BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout, td::Promise promise) override; - void download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, block::ImportedMsgQueueLimits limits, - td::Timestamp timeout, td::Promise> promise) override; + void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector blocks, + block::ImportedMsgQueueLimits limits, td::Timestamp timeout, + td::Promise>> promise) override; void set_handle(BlockHandle handle, td::Promise promise) override; diff --git a/validator/full-node.cpp b/validator/full-node.cpp index eebc49f8b..bd4132705 100644 --- a/validator/full-node.cpp +++ b/validator/full-node.cpp @@ -378,17 +378,22 @@ void FullNodeImpl::download_archive(BlockSeqno masterchain_seqno, std::string tm std::move(promise)); } -void FullNodeImpl::download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, +void FullNodeImpl::download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector blocks, block::ImportedMsgQueueLimits limits, td::Timestamp timeout, - td::Promise> promise) { - auto shard = get_shard(block_id.shard_full()); + td::Promise>> promise) { + if (blocks.empty()) { + promise.set_value({}); + return; + } + // All blocks are expected to have the same minsplit shard prefix + auto shard = get_shard(blocks[0].shard_full()); if (shard.empty()) { VLOG(FULL_NODE_WARNING) << "dropping download msg queue query to unknown shard"; promise.set_error(td::Status::Error(ErrorCode::notready, "shard not ready")); return; } - td::actor::send_closure(shard, &FullNodeShard::download_out_msg_queue_proof, block_id, dst_shard, limits, timeout, - std::move(promise)); + td::actor::send_closure(shard, &FullNodeShard::download_out_msg_queue_proof, dst_shard, std::move(blocks), limits, + timeout, std::move(promise)); } td::actor::ActorId FullNodeImpl::get_shard(ShardIdFull shard) { @@ -588,10 +593,11 @@ void FullNodeImpl::start_up() { td::actor::send_closure(id_, &FullNodeImpl::download_archive, masterchain_seqno, std::move(tmp_dir), timeout, std::move(promise)); } - void download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, block::ImportedMsgQueueLimits limits, - td::Timestamp timeout, td::Promise> promise) override { - td::actor::send_closure(id_, &FullNodeImpl::download_out_msg_queue_proof, block_id, dst_shard, limits, timeout, - std::move(promise)); + void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector blocks, + block::ImportedMsgQueueLimits limits, td::Timestamp timeout, + td::Promise>> promise) override { + td::actor::send_closure(id_, &FullNodeImpl::download_out_msg_queue_proof, dst_shard, std::move(blocks), limits, + timeout, std::move(promise)); } void new_key_block(BlockHandle handle) override { diff --git a/validator/full-node.hpp b/validator/full-node.hpp index 99ea49cc3..fd0240cd9 100644 --- a/validator/full-node.hpp +++ b/validator/full-node.hpp @@ -74,8 +74,9 @@ class FullNodeImpl : public FullNode { void get_next_key_blocks(BlockIdExt block_id, td::Timestamp timeout, td::Promise> promise); void download_archive(BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout, td::Promise promise); - void download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, block::ImportedMsgQueueLimits limits, - td::Timestamp timeout, td::Promise> promise); + void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector blocks, + block::ImportedMsgQueueLimits limits, td::Timestamp timeout, + td::Promise>> promise); void got_key_block_proof(td::Ref proof); void got_zero_block_state(td::Ref state); diff --git a/validator/impl/collator-impl.h b/validator/impl/collator-impl.h index 2839a0e5b..10953a97f 100644 --- a/validator/impl/collator-impl.h +++ b/validator/impl/collator-impl.h @@ -75,6 +75,7 @@ class Collator final : public td::actor::Actor { td::Timestamp timeout; td::Timestamp queue_cleanup_timeout_, soft_timeout_, medium_timeout_; td::Promise main_promise; + unsigned mode_ = 0; ton::BlockSeqno last_block_seqno{0}; ton::BlockSeqno prev_mc_block_seqno{0}; ton::BlockSeqno new_block_seqno{0}; @@ -89,7 +90,8 @@ class Collator final : public td::actor::Actor { public: Collator(ShardIdFull shard, bool is_hardfork, BlockIdExt min_masterchain_block_id, std::vector prev, Ref validator_set, Ed25519_PublicKey collator_id, - td::actor::ActorId manager, td::Timestamp timeout, td::Promise promise); + td::actor::ActorId manager, td::Timestamp timeout, td::Promise promise, + unsigned mode); ~Collator() override = default; bool is_busy() const { return busy_; @@ -231,7 +233,8 @@ class Collator final : public td::actor::Actor { void after_get_aux_shard_state(ton::BlockIdExt blkid, td::Result> res); bool fix_one_processed_upto(block::MsgProcessedUpto& proc, const ton::ShardIdFull& owner); bool fix_processed_upto(block::MsgProcessedUptoCollection& upto); - void got_neighbor_msg_queue(unsigned i, td::Result> R); + void got_neighbor_msg_queues(td::Result>> R); + void got_neighbor_msg_queue(unsigned i, Ref res); bool adjust_shard_config(); bool store_shard_fees(ShardIdFull shard, const block::CurrencyCollection& fees, const block::CurrencyCollection& created); diff --git a/validator/impl/collator.cpp b/validator/impl/collator.cpp index f11bd1169..d2ace04f6 100644 --- a/validator/impl/collator.cpp +++ b/validator/impl/collator.cpp @@ -57,7 +57,7 @@ static inline bool dbg(int c) { Collator::Collator(ShardIdFull shard, bool is_hardfork, BlockIdExt min_masterchain_block_id, std::vector prev, td::Ref validator_set, Ed25519_PublicKey collator_id, td::actor::ActorId manager, td::Timestamp timeout, - td::Promise promise) + td::Promise promise, unsigned mode) : shard_(shard) , is_hardfork_(is_hardfork) , min_mc_block_id{min_masterchain_block_id} @@ -71,6 +71,7 @@ Collator::Collator(ShardIdFull shard, bool is_hardfork, BlockIdExt min_mastercha , soft_timeout_(td::Timestamp::at(timeout.at() - 3.0)) , medium_timeout_(td::Timestamp::at(timeout.at() - 1.5)) , main_promise(std::move(promise)) + , mode_(mode) , perf_timer_("collate", 0.1, [manager](double duration) { send_closure(manager, &ValidatorManager::add_perf_timer_stat, "collate", duration); }) { @@ -631,27 +632,45 @@ bool Collator::request_neighbor_msg_queues() { } neighbors_.emplace_back(*shard_ptr); } + std::vector top_blocks; unsigned i = 0; for (block::McShardDescr& descr : neighbors_) { LOG(DEBUG) << "neighbor #" << i << " : " << descr.blk_.to_str(); - ++pending; - send_closure_later(manager, &ValidatorManager::wait_out_msg_queue_proof, descr.blk_, shard_, priority(), timeout, - [self = get_self(), i](td::Result> res) { - LOG(DEBUG) << "got msg queue for neighbor #" << i; - send_closure_later(std::move(self), &Collator::got_neighbor_msg_queue, i, std::move(res)); - }); + top_blocks.push_back(descr.blk_); ++i; } + ++pending; + td::actor::send_closure_later( + manager, &ValidatorManager::wait_neighbor_msg_queue_proofs, shard_, std::move(top_blocks), timeout, + [self = get_self()](td::Result>> res) { + td::actor::send_closure_later(std::move(self), &Collator::got_neighbor_msg_queues, std::move(res)); + }); return true; } -void Collator::got_neighbor_msg_queue(unsigned i, td::Result> R) { +void Collator::got_neighbor_msg_queues(td::Result>> R) { --pending; if (R.is_error()) { - fatal_error(R.move_as_error()); + fatal_error(R.move_as_error_prefix("failed to get neighbor msg queues: ")); return; } + LOG(INFO) << "neighbor output queues fetched"; auto res = R.move_as_ok(); + unsigned i = 0; + for (block::McShardDescr& descr : neighbors_) { + LOG(DEBUG) << "neighbor #" << i << " : " << descr.blk_.to_str(); + auto it = res.find(descr.blk_); + if (it == res.end()) { + fatal_error(PSTRING() << "no msg queue from neighbor #" << i); + return; + } + got_neighbor_msg_queue(i, it->second); + ++i; + } + check_pending(); +} + +void Collator::got_neighbor_msg_queue(unsigned i, Ref res) { BlockIdExt block_id = neighbors_.at(i).blk_; if (res->block_state_proof_.not_null() && !block_id.is_masterchain()) { block_state_proofs_.emplace(block_id.root_hash, res->block_state_proof_); @@ -731,10 +750,6 @@ void Collator::got_neighbor_msg_queue(unsigned i, td::Resultid, - block_candidate->clone(), [self = get_self()](td::Result saved) -> void { - LOG(DEBUG) << "got answer to set_block_candidate"; - td::actor::send_closure_later(std::move(self), &Collator::return_block_candidate, - std::move(saved)); - }); + if (mode_ & CollateMode::skip_store_candidate) { + td::actor::send_closure_later(actor_id(this), &Collator::return_block_candidate, td::Unit()); + } else { + LOG(INFO) << "saving new BlockCandidate"; + td::actor::send_closure_later(manager, &ValidatorManager::set_block_candidate, block_candidate->id, + block_candidate->clone(), [self = get_self()](td::Result saved) -> void { + LOG(DEBUG) << "got answer to set_block_candidate"; + td::actor::send_closure_later(std::move(self), &Collator::return_block_candidate, + std::move(saved)); + }); + } // 5. communicate about bad and delayed external messages if (!bad_ext_msgs_.empty() || !delay_ext_msgs_.empty()) { LOG(INFO) << "sending complete_external_messages() to Manager"; diff --git a/validator/impl/fabric.cpp b/validator/impl/fabric.cpp index 5d8b64400..a5c52f7cc 100644 --- a/validator/impl/fabric.cpp +++ b/validator/impl/fabric.cpp @@ -210,9 +210,9 @@ void run_validate_query(ShardIdFull shard, BlockIdExt min_masterchain_block_id, } void run_collate_query(ShardIdFull shard, const BlockIdExt& min_masterchain_block_id, std::vector prev, - Ed25519_PublicKey collator_id, td::Ref validator_set, + Ed25519_PublicKey creator, td::Ref validator_set, td::actor::ActorId manager, td::Timestamp timeout, - td::Promise promise) { + td::Promise promise, unsigned mode) { BlockSeqno seqno = 0; for (auto& p : prev) { if (p.seqno() > seqno) { @@ -220,8 +220,8 @@ void run_collate_query(ShardIdFull shard, const BlockIdExt& min_masterchain_bloc } } td::actor::create_actor(PSTRING() << "collate" << shard.to_str() << ":" << (seqno + 1), shard, false, - min_masterchain_block_id, std::move(prev), std::move(validator_set), collator_id, - std::move(manager), timeout, std::move(promise)) + min_masterchain_block_id, std::move(prev), std::move(validator_set), creator, + std::move(manager), timeout, std::move(promise), mode) .release(); } @@ -236,7 +236,8 @@ void run_collate_hardfork(ShardIdFull shard, const BlockIdExt& min_masterchain_b } td::actor::create_actor(PSTRING() << "collate" << shard.to_str() << ":" << (seqno + 1), shard, true, min_masterchain_block_id, std::move(prev), td::Ref{}, - Ed25519_PublicKey{Bits256::zero()}, std::move(manager), timeout, std::move(promise)) + Ed25519_PublicKey{Bits256::zero()}, std::move(manager), timeout, std::move(promise), + 0) .release(); } diff --git a/validator/impl/out-msg-queue-proof.cpp b/validator/impl/out-msg-queue-proof.cpp index 365913760..601f083aa 100644 --- a/validator/impl/out-msg-queue-proof.cpp +++ b/validator/impl/out-msg-queue-proof.cpp @@ -46,9 +46,9 @@ static td::Status check_no_prunned(const vm::CellSlice& cs) { return td::Status::OK(); } -static td::Result process_queue(BlockIdExt block_id, ShardIdFull dst_shard, - block::ImportedMsgQueueLimits limits, - const block::gen::OutMsgQueueInfo::Record& qinfo) { +static td::Result> process_queue( + ShardIdFull dst_shard, std::vector> blocks, + block::ImportedMsgQueueLimits limits) { td::uint64 estimated_proof_size = 0; td::HashSet visited; @@ -66,14 +66,18 @@ static td::Result process_queue(BlockIdExt block_id, ShardIdFull dst_ dfs(cs.prefetch_ref(i)); } }; - TRY_STATUS_PREFIX(check_no_prunned(*qinfo.proc_info), "invalid proc_info proof: ") - TRY_STATUS_PREFIX(check_no_prunned(*qinfo.ihr_pending), "invalid ihr_pending proof: ") - dfs_cs(*qinfo.proc_info); - dfs_cs(*qinfo.ihr_pending); - - block::OutputQueueMerger queue_merger{ - dst_shard, {block::OutputQueueMerger::Neighbor{block_id, qinfo.out_queue->prefetch_ref()}}}; - td::int32 msg_count = 0; + std::vector neighbors; + for (auto& b : blocks) { + TRY_STATUS_PREFIX(check_no_prunned(*b.second.proc_info), "invalid proc_info proof: ") + TRY_STATUS_PREFIX(check_no_prunned(*b.second.ihr_pending), "invalid ihr_pending proof: ") + dfs_cs(*b.second.proc_info); + dfs_cs(*b.second.ihr_pending); + neighbors.emplace_back(b.first, b.second.out_queue->prefetch_ref()); + } + + block::OutputQueueMerger queue_merger{dst_shard, std::move(neighbors)}; + std::vector msg_count(blocks.size()); + td::int32 msg_count_total = 0; bool limit_reached = false; while (!queue_merger.is_eof()) { @@ -87,7 +91,8 @@ static td::Result process_queue(BlockIdExt block_id, ShardIdFull dst_ if (limit_reached) { break; } - ++msg_count; + ++msg_count[kv->source]; + ++msg_count_total; // TODO: Get processed_upto from destination shard (in request?) /* @@ -114,234 +119,410 @@ static td::Result process_queue(BlockIdExt block_id, ShardIdFull dst_ dfs_cs(*kv->msg); TRY_STATUS_PREFIX(check_no_prunned(*kv->msg), "invalid message proof: ") - if (estimated_proof_size >= limits.max_bytes || msg_count >= (long long)limits.max_msgs) { + if (estimated_proof_size >= limits.max_bytes || msg_count_total >= (long long)limits.max_msgs) { limit_reached = true; } } - return limit_reached ? msg_count : -1; + if (!limit_reached) { + std::fill(msg_count.begin(), msg_count.end(), -1); + } + return msg_count; } td::Result> OutMsgQueueProof::build( - BlockIdExt block_id, ShardIdFull dst_shard, - block::ImportedMsgQueueLimits limits, Ref state_root, Ref block_root) { + ShardIdFull dst_shard, std::vector blocks, block::ImportedMsgQueueLimits limits) { if (!dst_shard.is_valid_ext()) { return td::Status::Error("invalid shard"); } + if (blocks.empty()) { + return create_tl_object(td::BufferSlice{}, td::BufferSlice{}, + std::vector{}); + } + + std::vector> block_state_proofs; + for (auto& block : blocks) { + if (block.id.seqno() != 0) { + if (block.block_root.is_null()) { + return td::Status::Error("block is null"); + } + TRY_RESULT(proof, create_block_state_proof(block.block_root)); + block_state_proofs.push_back(std::move(proof)); + } + if (!block::ShardConfig::is_neighbor(dst_shard, block.id.shard_full())) { + return td::Status::Error("shards are not neighbors"); + } + } + TRY_RESULT(block_state_proof, vm::std_boc_serialize_multi(block_state_proofs)); - vm::MerkleProofBuilder mpb{std::move(state_root)}; - TRY_RESULT(state, ShardStateQ::fetch(block_id, {}, mpb.root())); - TRY_RESULT(outq_descr, state->message_queue()); - block::gen::OutMsgQueueInfo::Record qinfo; - if (!tlb::unpack_cell(outq_descr->root_cell(), qinfo)) { - return td::Status::Error("invalid message queue"); + vm::Dictionary states_dict_pure{32}; + for (size_t i = 0; i < blocks.size(); ++i) { + if (blocks[i].state_root.is_null()) { + return td::Status::Error("state is null"); + } + states_dict_pure.set_ref(td::BitArray<32>{(long long)i}, blocks[i].state_root); } - TRY_RESULT(cnt, process_queue(block_id, dst_shard, limits, qinfo)); - TRY_RESULT(queue_proof, mpb.extract_proof_boc()); - td::BufferSlice block_state_proof; - if (block_id.seqno() != 0) { - TRY_RESULT(proof, create_block_state_proof(std::move(block_root))); - TRY_RESULT_ASSIGN(block_state_proof, vm::std_boc_serialize(std::move(proof), 31)); + vm::MerkleProofBuilder mpb{states_dict_pure.get_root_cell()}; + vm::Dictionary states_dict{mpb.root(), 32}; + std::vector> data(blocks.size()); + for (size_t i = 0; i < blocks.size(); ++i) { + data[i].first = blocks[i].id; + TRY_RESULT(state, ShardStateQ::fetch(blocks[i].id, {}, states_dict.lookup_ref(td::BitArray<32>{(long long)i}))); + TRY_RESULT(outq_descr, state->message_queue()); + block::gen::OutMsgQueueInfo::Record qinfo; + if (!tlb::unpack_cell(outq_descr->root_cell(), data[i].second)) { + return td::Status::Error("invalid message queue"); + } } + TRY_RESULT(msg_count, process_queue(dst_shard, std::move(data), limits)); + + TRY_RESULT(proof, mpb.extract_proof()); + vm::Dictionary states_dict_proof{vm::CellSlice{vm::NoVm(), proof}.prefetch_ref(), 32}; + std::vector> state_proofs; + for (size_t i = 0; i < blocks.size(); ++i) { + td::Ref proof_raw = states_dict_proof.lookup_ref(td::BitArray<32>{(long long)i}); + CHECK(proof_raw.not_null()); + state_proofs.push_back(vm::CellBuilder::create_merkle_proof(proof_raw)); + } + TRY_RESULT(queue_proof, vm::std_boc_serialize_multi(state_proofs)); return create_tl_object(std::move(queue_proof), std::move(block_state_proof), - cnt); + std::move(msg_count)); } -td::Result> OutMsgQueueProof::fetch(BlockIdExt block_id, ShardIdFull dst_shard, - block::ImportedMsgQueueLimits limits, - const ton_api::tonNode_outMsgQueueProof& f) { +td::Result>> OutMsgQueueProof::fetch(ShardIdFull dst_shard, + std::vector blocks, + block::ImportedMsgQueueLimits limits, + const ton_api::tonNode_outMsgQueueProof& f) { try { - Ref block_state_proof; - td::Bits256 state_root_hash; - if (block_id.seqno() == 0) { - if (!f.block_state_proof_.empty()) { - return td::Status::Error("expected empty block state proof"); - } - state_root_hash = block_id.root_hash; - } else { - TRY_RESULT_ASSIGN(block_state_proof, vm::std_boc_deserialize(f.block_state_proof_.as_slice())); - TRY_RESULT_ASSIGN(state_root_hash, unpack_block_state_proof(block_id, block_state_proof)); + std::vector> res; + TRY_RESULT(queue_proofs, vm::std_boc_deserialize_multi(f.queue_proofs_, (int)blocks.size())); + TRY_RESULT(block_state_proofs, vm::std_boc_deserialize_multi(f.block_state_proofs_, (int)blocks.size())); + if (queue_proofs.size() != blocks.size()) { + return td::Status::Error("invalid size of queue_proofs"); } - - TRY_RESULT(queue_proof, vm::std_boc_deserialize(f.queue_proof_.as_slice())); - auto virtual_root = vm::MerkleProof::virtualize(queue_proof, 1); - if (virtual_root.is_null()) { - return td::Status::Error("invalid queue proof"); + if (f.msg_counts_.size() != blocks.size()) { + return td::Status::Error("invalid size of msg_counts"); } - if (virtual_root->get_hash().as_slice() != state_root_hash.as_slice()) { - return td::Status::Error("state root hash mismatch"); + size_t j = 0; + std::vector> data(blocks.size()); + for (size_t i = 0; i < blocks.size(); ++i) { + td::Bits256 state_root_hash; + Ref block_state_proof = {}; + if (blocks[i].seqno() == 0) { + state_root_hash = blocks[i].root_hash; + } else { + if (j == block_state_proofs.size()) { + return td::Status::Error("invalid size of block_state_proofs"); + } + block_state_proof = block_state_proofs[j++]; + TRY_RESULT_ASSIGN(state_root_hash, unpack_block_state_proof(blocks[i], block_state_proof)); + } + auto state_root = vm::MerkleProof::virtualize(queue_proofs[i], 1); + if (state_root->get_hash().as_slice() != state_root_hash.as_slice()) { + return td::Status::Error("state root hash mismatch"); + } + res.emplace_back(true, blocks[i], state_root, block_state_proof, f.msg_counts_[i]); + + data[i].first = blocks[i]; + TRY_RESULT(state, ShardStateQ::fetch(blocks[i], {}, state_root)); + TRY_RESULT(outq_descr, state->message_queue()); + block::gen::OutMsgQueueInfo::Record qinfo; + if (!tlb::unpack_cell(outq_descr->root_cell(), data[i].second)) { + return td::Status::Error("invalid message queue"); + } } - - // Validate proof - TRY_RESULT_PREFIX(state, ShardStateQ::fetch(block_id, {}, virtual_root), "invalid proof: "); - TRY_RESULT_PREFIX(outq_descr, state->message_queue(), "invalid proof: "); - - block::gen::OutMsgQueueInfo::Record qinfo; - if (!tlb::unpack_cell(outq_descr->root_cell(), qinfo)) { - return td::Status::Error("invalid proof: invalid message queue"); + if (j != block_state_proofs.size()) { + return td::Status::Error("invalid size of block_state_proofs"); } - TRY_STATUS_PREFIX(check_no_prunned(qinfo.proc_info->prefetch_ref(0)), "invalid proc_info: ") - TRY_STATUS_PREFIX(check_no_prunned(qinfo.ihr_pending->prefetch_ref(0)), "invalid ihr_pending: ") - TRY_RESULT(cnt, process_queue(block_id, dst_shard, limits, qinfo)); - if (cnt != f.msg_count_) { - return td::Status::Error(PSTRING() << "invalid msg_count: expected=" << f.msg_count_ << ", found=" << cnt); + TRY_RESULT(msg_count, process_queue(dst_shard, std::move(data), limits)); + if (msg_count != f.msg_counts_) { + return td::Status::Error("incorrect msg_count"); } - return Ref(true, std::move(virtual_root), std::move(block_state_proof), cnt); + return res; } catch (vm::VmVirtError& err) { return td::Status::Error(PSTRING() << "invalid proof: " << err.get_msg()); } } -void WaitOutMsgQueueProof::alarm() { - abort_query(td::Status::Error(ErrorCode::timeout, "timeout")); +void OutMsgQueueImporter::new_masterchain_block_notification(td::Ref state, + std::set collating_shards) { + last_masterchain_state_ = state; + if (collating_shards.empty() || state->get_unix_time() < (td::uint32)td::Clocks::system() - 20) { + return; + } + auto can_collate_shard = [&](const ShardIdFull& shard) -> bool { + return std::any_of(collating_shards.begin(), collating_shards.end(), + [&](ShardIdFull our_shard) { return shard_intersects(shard, our_shard); }); + }; + auto shards = state->get_shards(); + auto process_dst_shard = [&](const ShardIdFull& dst_shard) { + if (!can_collate_shard(dst_shard)) { + return; + } + std::vector top_blocks; + for (const auto& shard : shards) { + if (block::ShardConfig::is_neighbor(dst_shard, shard->shard())) { + top_blocks.push_back(shard->top_block_id()); + } + } + get_neighbor_msg_queue_proofs(dst_shard, std::move(top_blocks), td::Timestamp::in(15.0), + [](td::Result>>) {}); + }; + for (const auto& shard : shards) { + if (shard->before_merge()) { + if (is_left_child(shard->shard())) { + process_dst_shard(shard_parent(shard->shard())); + } + } else if (shard->before_split()) { + process_dst_shard(shard_child(shard->shard(), true)); + process_dst_shard(shard_child(shard->shard(), false)); + } else { + process_dst_shard(shard->shard()); + } + } } -void WaitOutMsgQueueProof::abort_query(td::Status reason) { - if (promise_) { - LOG(DEBUG) << "aborting wait msg queue query for " << block_id_.to_str() << " priority=" << priority_ << ": " - << reason; - promise_.set_error( - reason.move_as_error_prefix(PSTRING() << "failed to get msg queue for " << block_id_.to_str() << ": ")); +void OutMsgQueueImporter::get_neighbor_msg_queue_proofs( + ShardIdFull dst_shard, std::vector blocks, td::Timestamp timeout, + td::Promise>> promise) { + std::sort(blocks.begin(), blocks.end()); + auto entry = cache_[{dst_shard, blocks}]; + if (entry) { + if (entry->done) { + promise.set_result(entry->result); + alarm_timestamp().relax(entry->timeout = td::Timestamp::in(CACHE_TTL)); + } else { + entry->timeout = std::max(entry->timeout, timeout); + entry->promises.emplace_back(std::move(promise), timeout); + alarm_timestamp().relax(timeout); + } + return; } - stop(); -} -void WaitOutMsgQueueProof::finish_query(Ref result) { - promise_.set_result(std::move(result)); - stop(); + LOG(DEBUG) << "Importing neighbor msg queues for shard " << dst_shard.to_str() << ", " << blocks.size() << " blocks"; + + cache_[{dst_shard, blocks}] = entry = std::make_shared(); + entry->dst_shard = dst_shard; + entry->blocks = blocks; + entry->promises.emplace_back(std::move(promise), timeout); + alarm_timestamp().relax(entry->timeout = timeout); + + std::map> new_queries; + for (const BlockIdExt& block : blocks) { + if (opts_->need_monitor(block.shard_full(), last_masterchain_state_)) { + ++entry->pending; + get_proof_local(entry, block); + } else { + ShardIdFull prefix = block.shard_full(); + int min_split = last_masterchain_state_->monitor_min_split_depth(prefix.workchain); + if (prefix.pfx_len() > min_split) { + prefix = shard_prefix(prefix, min_split); + } + new_queries[prefix].push_back(block); + } + }; + auto limits = last_masterchain_state_->get_imported_msg_queue_limits(dst_shard.workchain); + for (auto& p : new_queries) { + ++entry->pending; + get_proof_import(entry, std::move(p.second), limits); + } + if (entry->pending == 0) { + finish_query(entry); + } } -void WaitOutMsgQueueProof::start_up() { - alarm_timestamp() = timeout_; - if (local_) { - run_local(); - } else { - run_net(); +void OutMsgQueueImporter::get_proof_local(std::shared_ptr entry, BlockIdExt block) { + if (!check_timeout(entry)) { + return; } + td::actor::send_closure( + manager_, &ValidatorManager::wait_block_state_short, block, 0, entry->timeout, + [=, SelfId = actor_id(this), manager = manager_, timeout = entry->timeout, + retry_after = td::Timestamp::in(0.5)](td::Result> R) mutable { + if (R.is_error()) { + LOG(DEBUG) << "Failed to get block state for " << block.to_str() << ": " << R.move_as_error(); + delay_action([=]() { td::actor::send_closure(SelfId, &OutMsgQueueImporter::get_proof_local, entry, block); }, + retry_after); + return; + } + auto state = R.move_as_ok(); + if (block.seqno() == 0) { + std::vector> proof = { + td::Ref(true, block, state->root_cell(), td::Ref{})}; + td::actor::send_closure(SelfId, &OutMsgQueueImporter::got_proof, entry, std::move(proof)); + return; + } + td::actor::send_closure( + manager, &ValidatorManager::wait_block_data_short, block, 0, timeout, + [=](td::Result> R) mutable { + if (R.is_error()) { + LOG(DEBUG) << "Failed to get block data for " << block.to_str() << ": " << R.move_as_error(); + delay_action( + [=]() { td::actor::send_closure(SelfId, &OutMsgQueueImporter::get_proof_local, entry, block); }, + retry_after); + return; + } + Ref block_state_proof = create_block_state_proof(R.ok()->root_cell()).move_as_ok(); + std::vector> proof = { + td::Ref(true, block, state->root_cell(), std::move(block_state_proof))}; + td::actor::send_closure(SelfId, &OutMsgQueueImporter::got_proof, entry, std::move(proof)); + }); + }); } -void WaitOutMsgQueueProof::run_local() { - ++pending; - td::actor::send_closure(manager_, &ValidatorManager::wait_block_state_short, block_id_, priority_, timeout_, - [SelfId = actor_id(this)](td::Result> R) { - if (R.is_error()) { - td::actor::send_closure(SelfId, &WaitOutMsgQueueProof::abort_query, - R.move_as_error_prefix("failed to get shard state: ")); - } else { - td::actor::send_closure(SelfId, &WaitOutMsgQueueProof::got_state_root, - R.move_as_ok()->root_cell()); - } - }); - if (block_id_.seqno() != 0) { - ++pending; - td::actor::send_closure(manager_, &ValidatorManager::wait_block_data_short, block_id_, priority_, timeout_, - [SelfId = actor_id(this)](td::Result> R) { - if (R.is_error()) { - td::actor::send_closure(SelfId, &WaitOutMsgQueueProof::abort_query, - R.move_as_error_prefix("failed to get block data: ")); - } else { - td::actor::send_closure(SelfId, &WaitOutMsgQueueProof::got_block_root, - R.move_as_ok()->root_cell()); - } - }); +void OutMsgQueueImporter::get_proof_import(std::shared_ptr entry, std::vector blocks, + block::ImportedMsgQueueLimits limits) { + if (!check_timeout(entry)) { + return; } + td::actor::send_closure( + manager_, &ValidatorManager::send_get_out_msg_queue_proof_request, entry->dst_shard, blocks, limits, + [=, SelfId = actor_id(this), retry_after = td::Timestamp::in(0.5), + dst_shard = entry->dst_shard](td::Result>> R) { + if (R.is_error()) { + LOG(DEBUG) << "Failed to get out msg queue for " << dst_shard.to_str() << ": " << R.move_as_error(); + delay_action( + [=]() { + td::actor::send_closure(SelfId, &OutMsgQueueImporter::get_proof_import, entry, std::move(blocks), + limits); + }, + retry_after); + return; + } + td::actor::send_closure(SelfId, &OutMsgQueueImporter::got_proof, entry, R.move_as_ok()); + }); } -void WaitOutMsgQueueProof::got_state_root(Ref root) { - state_root_ = std::move(root); - if (--pending == 0) { - run_local_cont(); +void OutMsgQueueImporter::got_proof(std::shared_ptr entry, std::vector> proofs) { + if (!check_timeout(entry)) { + return; + } + for (auto& p : proofs) { + entry->result[p->block_id_] = std::move(p); + } + CHECK(entry->pending > 0); + if (--entry->pending == 0) { + finish_query(entry); } } -void WaitOutMsgQueueProof::got_block_root(Ref root) { - block_root_ = std::move(root); - if (--pending == 0) { - run_local_cont(); +void OutMsgQueueImporter::finish_query(std::shared_ptr entry) { + LOG(DEBUG) << "Done importing neighbor msg queues for shard " << entry->dst_shard.to_str() << ", " + << entry->blocks.size() << " blocks in " << entry->timer.elapsed() << "s"; + entry->done = true; + alarm_timestamp().relax(entry->timeout = td::Timestamp::in(CACHE_TTL)); + for (auto& p : entry->promises) { + p.first.set_result(entry->result); } + entry->promises.clear(); } -void WaitOutMsgQueueProof::run_local_cont() { - Ref block_state_proof; - if (block_id_.seqno() != 0) { - auto R = create_block_state_proof(std::move(block_root_)); - if (R.is_error()) { - abort_query(R.move_as_error_prefix("failed to create block state proof")); - return; +bool OutMsgQueueImporter::check_timeout(std::shared_ptr entry) { + if (entry->timeout.is_in_past()) { + LOG(DEBUG) << "Aborting importing neighbor msg queues for shard " << entry->dst_shard.to_str() << ", " + << entry->blocks.size() << " blocks: timeout"; + for (auto& p : entry->promises) { + p.first.set_error(td::Status::Error(ErrorCode::timeout, "timeout")); } - block_state_proof = R.move_as_ok(); + entry->promises.clear(); + auto it = cache_.find({entry->dst_shard, entry->blocks}); + if (it != cache_.end() && it->second == entry) { + cache_.erase(it); + } + return false; } - finish_query(td::Ref(true, std::move(state_root_), std::move(block_state_proof))); + return true; } -void WaitOutMsgQueueProof::run_net() { - auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), block_id = block_id_, - retry_after = td::Timestamp::in(0.5)](td::Result> R) { - if (R.is_error()) { - LOG(DEBUG) << "failed to get msg queue for " << block_id.to_str() << " from net: " << R.move_as_error(); - delay_action([SelfId]() mutable { td::actor::send_closure(SelfId, &WaitOutMsgQueueProof::run_net); }, - retry_after); - } else { - td::actor::send_closure(SelfId, &WaitOutMsgQueueProof::finish_query, R.move_as_ok()); +void OutMsgQueueImporter::alarm() { + auto it = cache_.begin(); + while (it != cache_.end()) { + auto& promises = it->second->promises; + if (it->second->timeout.is_in_past()) { + if (!it->second->done) { + LOG(DEBUG) << "Aborting importing neighbor msg queues for shard " << it->second->dst_shard.to_str() << ", " + << it->second->blocks.size() << " blocks: timeout"; + for (auto& p : promises) { + p.first.set_error(td::Status::Error(ErrorCode::timeout, "timeout")); + } + promises.clear(); + } + it = cache_.erase(it); + continue; } - }); - - td::actor::send_closure(manager_, &ValidatorManager::send_get_out_msg_queue_proof_request, block_id_, dst_shard_, - limits_, priority_, std::move(P)); + alarm_timestamp().relax(it->second->timeout); + size_t j = 0; + for (auto& p : promises) { + if (p.second.is_in_past()) { + p.first.set_error(td::Status::Error(ErrorCode::timeout, "timeout")); + } else { + alarm_timestamp().relax(p.second); + promises[j++] = std::move(p); + } + } + promises.resize(j); + ++it; + } } void BuildOutMsgQueueProof::abort_query(td::Status reason) { if (promise_) { - LOG(DEBUG) << "failed to build msg queue proof for " << block_id_.to_str() << ": " << reason; + LOG(DEBUG) << "failed to build msg queue proof to " << dst_shard_.to_str() << ": " << reason; promise_.set_error( - reason.move_as_error_prefix(PSTRING() << "failed to build msg queue proof for " << block_id_.to_str() << ": ")); + reason.move_as_error_prefix(PSTRING() << "failed to build msg queue proof to " << dst_shard_.to_str() << ": ")); } stop(); } void BuildOutMsgQueueProof::start_up() { - ++pending; - td::actor::send_closure(manager_, &ValidatorManagerInterface::get_shard_state_from_db_short, block_id_, - [SelfId = actor_id(this)](td::Result> R) { - if (R.is_error()) { - td::actor::send_closure(SelfId, &BuildOutMsgQueueProof::abort_query, - R.move_as_error_prefix("failed to get shard state: ")); - } else { - td::actor::send_closure(SelfId, &BuildOutMsgQueueProof::got_state_root, - R.move_as_ok()->root_cell()); - } - }); - if (block_id_.seqno() != 0) { + for (size_t i = 0; i < blocks_.size(); ++i) { + BlockIdExt id = blocks_[i].id; ++pending; - td::actor::send_closure(manager_, &ValidatorManagerInterface::get_block_data_from_db_short, block_id_, - [SelfId = actor_id(this)](td::Result> R) { + td::actor::send_closure(manager_, &ValidatorManagerInterface::get_shard_state_from_db_short, id, + [SelfId = actor_id(this), i](td::Result> R) { if (R.is_error()) { td::actor::send_closure(SelfId, &BuildOutMsgQueueProof::abort_query, - R.move_as_error_prefix("failed to get block data: ")); + R.move_as_error_prefix("failed to get shard state: ")); } else { - td::actor::send_closure(SelfId, &BuildOutMsgQueueProof::got_block_root, + td::actor::send_closure(SelfId, &BuildOutMsgQueueProof::got_state_root, i, R.move_as_ok()->root_cell()); } }); + if (id.seqno() != 0) { + ++pending; + td::actor::send_closure(manager_, &ValidatorManagerInterface::get_block_data_from_db_short, id, + [SelfId = actor_id(this), i](td::Result> R) { + if (R.is_error()) { + td::actor::send_closure(SelfId, &BuildOutMsgQueueProof::abort_query, + R.move_as_error_prefix("failed to get block data: ")); + } else { + td::actor::send_closure(SelfId, &BuildOutMsgQueueProof::got_block_root, i, + R.move_as_ok()->root_cell()); + } + }); + } + } + if (pending == 0) { + build_proof(); } } -void BuildOutMsgQueueProof::got_state_root(Ref root) { - state_root_ = std::move(root); +void BuildOutMsgQueueProof::got_state_root(size_t i, Ref root) { + blocks_[i].state_root = std::move(root); if (--pending == 0) { build_proof(); } } -void BuildOutMsgQueueProof::got_block_root(Ref root) { - block_root_ = std::move(root); +void BuildOutMsgQueueProof::got_block_root(size_t i, Ref root) { + blocks_[i].block_root = std::move(root); if (--pending == 0) { build_proof(); } } void BuildOutMsgQueueProof::build_proof() { - auto result = OutMsgQueueProof::build(block_id_, dst_shard_, limits_, std::move(state_root_), std::move(block_root_)); + auto result = OutMsgQueueProof::build(dst_shard_, std::move(blocks_), limits_); if (result.is_error()) { LOG(ERROR) << "Failed to build msg queue proof: " << result.error(); } diff --git a/validator/impl/out-msg-queue-proof.hpp b/validator/impl/out-msg-queue-proof.hpp index 9c5acfef9..33f6c399e 100644 --- a/validator/impl/out-msg-queue-proof.hpp +++ b/validator/impl/out-msg-queue-proof.hpp @@ -20,6 +20,8 @@ #include "auto/tl/ton_api.h" #include "interfaces/out-msg-queue-proof.h" #include "td/actor/actor.h" +#include "interfaces/shard.h" +#include "validator.h" namespace ton { @@ -29,83 +31,77 @@ using td::Ref; class ValidatorManager; class ValidatorManagerInterface; -class WaitOutMsgQueueProof : public td::actor::Actor { +class OutMsgQueueImporter : public td::actor::Actor { public: - WaitOutMsgQueueProof(BlockIdExt block_id, ShardIdFull dst_shard, block::ImportedMsgQueueLimits limits, bool local, - td::uint32 priority, td::actor::ActorId manager, td::Timestamp timeout, - td::Promise> promise) - : block_id_(std::move(block_id)) - , dst_shard_(dst_shard) - , limits_(limits) - , local_(local) - , priority_(priority) - , manager_(manager) - , timeout_(timeout) - , promise_(std::move(promise)) { + OutMsgQueueImporter(td::actor::ActorId manager, td::Ref opts, + td::Ref last_masterchain_state) + : manager_(manager), opts_(opts), last_masterchain_state_(last_masterchain_state) { } - void update_timeout(td::Timestamp timeout, td::uint32 priority) { - timeout_ = timeout; - alarm_timestamp() = timeout_; - priority_ = priority; + void new_masterchain_block_notification(td::Ref state, std::set collating_shards); + void get_neighbor_msg_queue_proofs(ShardIdFull dst_shard, std::vector blocks, td::Timestamp timeout, + td::Promise>> promise); + + void update_options(td::Ref opts) { + opts_ = std::move(opts); } - void abort_query(td::Status reason); - void finish_query(Ref result); void alarm() override; - void start_up() override; - - void run_local(); - void got_state_root(Ref root); - void got_block_root(Ref root); - void run_local_cont(); - - void run_net(); - private: - BlockIdExt block_id_; - ShardIdFull dst_shard_; - block::ImportedMsgQueueLimits limits_; - bool local_; - td::uint32 priority_; - td::actor::ActorId manager_; - td::Timestamp timeout_; - td::Promise> promise_; - - Ref state_root_, block_root_; - unsigned pending = 0; + td::Ref opts_; + td::Ref last_masterchain_state_; + + struct CacheEntry { + ShardIdFull dst_shard; + std::vector blocks; + bool done = false; + std::map> result; + std::vector>>, td::Timestamp>> promises; + td::Timestamp timeout = td::Timestamp::never(); + td::Timer timer; + size_t pending = 0; + }; + std::map>, std::shared_ptr> cache_; + + void get_proof_local(std::shared_ptr entry, BlockIdExt block); + void get_proof_import(std::shared_ptr entry, std::vector blocks, + block::ImportedMsgQueueLimits limits); + void got_proof(std::shared_ptr entry, std::vector> proofs); + void finish_query(std::shared_ptr entry); + bool check_timeout(std::shared_ptr entry); + + constexpr static const double CACHE_TTL = 30.0; }; class BuildOutMsgQueueProof : public td::actor::Actor { public: - BuildOutMsgQueueProof(BlockIdExt block_id, ShardIdFull dst_shard, block::ImportedMsgQueueLimits limits, + BuildOutMsgQueueProof(ShardIdFull dst_shard, std::vector blocks, block::ImportedMsgQueueLimits limits, td::actor::ActorId manager, td::Promise> promise) - : block_id_(std::move(block_id)) - , dst_shard_(dst_shard) - , limits_(limits) - , manager_(manager) - , promise_(std::move(promise)) { + : dst_shard_(dst_shard), limits_(limits), manager_(manager), promise_(std::move(promise)) { + blocks_.resize(blocks.size()); + for (size_t i = 0; i < blocks_.size(); ++i) { + blocks_[i].id = blocks[i]; + } } void abort_query(td::Status reason); void start_up() override; - void got_state_root(Ref root); - void got_block_root(Ref root); + void got_state_root(size_t i, Ref root); + void got_block_root(size_t i, Ref root); void build_proof(); private: - BlockIdExt block_id_; ShardIdFull dst_shard_; + std::vector blocks_; block::ImportedMsgQueueLimits limits_; td::actor::ActorId manager_; td::Promise> promise_; - Ref state_root_, block_root_; - unsigned pending = 0; + size_t pending = 0; }; } // namespace validator diff --git a/validator/interfaces/out-msg-queue-proof.h b/validator/interfaces/out-msg-queue-proof.h index 524168d0d..c0aa56106 100644 --- a/validator/interfaces/out-msg-queue-proof.h +++ b/validator/interfaces/out-msg-queue-proof.h @@ -26,22 +26,31 @@ namespace validator { using td::Ref; struct OutMsgQueueProof : public td::CntObject { - OutMsgQueueProof(Ref state_root, Ref block_state_proof, td::int32 msg_count = -1) - : state_root_(std::move(state_root)), block_state_proof_(std::move(block_state_proof)), msg_count_(msg_count) { + OutMsgQueueProof(BlockIdExt block_id, Ref state_root, Ref block_state_proof, + td::int32 msg_count = -1) + : block_id_(block_id) + , state_root_(std::move(state_root)) + , block_state_proof_(std::move(block_state_proof)) + , msg_count_(msg_count) { } + BlockIdExt block_id_; Ref state_root_; Ref block_state_proof_; - td::int32 msg_count_; // -1 - up to end of queue - - static td::Result> fetch(BlockIdExt block_id, ShardIdFull dst_shard, - block::ImportedMsgQueueLimits limits, - const ton_api::tonNode_outMsgQueueProof &f); - static td::Result> build(BlockIdExt block_id, ShardIdFull dst_shard, - block::ImportedMsgQueueLimits limits, - Ref state_root, - Ref block_root); - + td::int32 msg_count_; // -1 - no limit + + static td::Result>> fetch(ShardIdFull dst_shard, std::vector blocks, + block::ImportedMsgQueueLimits limits, + const ton_api::tonNode_outMsgQueueProof &f); + + struct OneBlock { + BlockIdExt id; + Ref state_root; + Ref block_root; + }; + static td::Result> build(ShardIdFull dst_shard, + std::vector blocks, + block::ImportedMsgQueueLimits limits); }; } // namespace validator diff --git a/validator/interfaces/validator-manager.h b/validator/interfaces/validator-manager.h index a4a1dd167..c09d42937 100644 --- a/validator/interfaces/validator-manager.h +++ b/validator/interfaces/validator-manager.h @@ -129,9 +129,9 @@ class ValidatorManager : public ValidatorManagerInterface { virtual void send_ihr_message(td::Ref message) = 0; virtual void send_top_shard_block_description(td::Ref desc) = 0; virtual void send_block_broadcast(BlockBroadcast broadcast) = 0; - virtual void send_get_out_msg_queue_proof_request(BlockIdExt id, ShardIdFull dst_shard, - block::ImportedMsgQueueLimits limits, td::uint32 priority, - td::Promise> promise) = 0; + virtual void send_get_out_msg_queue_proof_request(ShardIdFull dst_shard, std::vector blocks, + block::ImportedMsgQueueLimits limits, + td::Promise>> promise) = 0; virtual void update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise promise) = 0; virtual void get_shard_client_state(bool from_db, td::Promise promise) = 0; diff --git a/validator/manager-disk.hpp b/validator/manager-disk.hpp index 17a1bffae..bad3fc35f 100644 --- a/validator/manager-disk.hpp +++ b/validator/manager-disk.hpp @@ -152,8 +152,8 @@ class ValidatorManagerImpl : public ValidatorManager { td::Promise> promise) override; void wait_block_state_short(BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout, td::Promise> promise) override; - void wait_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, td::uint32 priority, td::Timestamp timeout, - td::Promise> promise) override { + void wait_neighbor_msg_queue_proofs(ShardIdFull dst_shard, std::vector blocks, td::Timestamp timeout, + td::Promise>> promise) override { UNREACHABLE(); } @@ -255,9 +255,9 @@ class ValidatorManagerImpl : public ValidatorManager { void send_top_shard_block_description(td::Ref desc) override; void send_block_broadcast(BlockBroadcast broadcast) override { } - void send_get_out_msg_queue_proof_request(BlockIdExt id, ShardIdFull dst_shard, block::ImportedMsgQueueLimits limits, - td::uint32 priority, - td::Promise> promise) override { + void send_get_out_msg_queue_proof_request(ShardIdFull dst_shard, std::vector blocks, + block::ImportedMsgQueueLimits limits, + td::Promise>> promise) override { UNREACHABLE(); } diff --git a/validator/manager-hardfork.hpp b/validator/manager-hardfork.hpp index 65d2b3c0d..33b6a1a71 100644 --- a/validator/manager-hardfork.hpp +++ b/validator/manager-hardfork.hpp @@ -182,8 +182,8 @@ class ValidatorManagerImpl : public ValidatorManager { td::Promise> promise) override; void wait_block_state_short(BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout, td::Promise> promise) override; - void wait_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, td::uint32 priority, td::Timestamp timeout, - td::Promise> promise) override { + void wait_neighbor_msg_queue_proofs(ShardIdFull dst_shard, std::vector blocks, td::Timestamp timeout, + td::Promise>> promise) override { UNREACHABLE(); } @@ -321,9 +321,9 @@ class ValidatorManagerImpl : public ValidatorManager { } void send_block_broadcast(BlockBroadcast broadcast) override { } - void send_get_out_msg_queue_proof_request(BlockIdExt id, ShardIdFull dst_shard, block::ImportedMsgQueueLimits limits, - td::uint32 priority, - td::Promise> promise) override { + void send_get_out_msg_queue_proof_request(ShardIdFull dst_shard, std::vector blocks, + block::ImportedMsgQueueLimits limits, + td::Promise>> promise) override { UNREACHABLE(); } diff --git a/validator/manager.cpp b/validator/manager.cpp index d419f80ee..955334dbc 100644 --- a/validator/manager.cpp +++ b/validator/manager.cpp @@ -618,33 +618,15 @@ void ValidatorManagerImpl::wait_block_state_short(BlockIdExt block_id, td::uint3 get_block_handle(block_id, true, std::move(P)); } -void ValidatorManagerImpl::wait_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, td::uint32 priority, - td::Timestamp timeout, - td::Promise> promise) { - auto key = std::make_pair(block_id, dst_shard); - auto it = wait_out_msg_queue_proof_.find(key); - if (it == wait_out_msg_queue_proof_.end()) { - auto P = td::PromiseCreator::lambda( - [SelfId = actor_id(this), block_id, dst_shard](td::Result> R) { - td::actor::send_closure(SelfId, &ValidatorManagerImpl::finished_wait_msg_queue, block_id, dst_shard, - std::move(R)); - }); - auto id = td::actor::create_actor( - "waitmsgqueue", block_id, dst_shard, - last_masterchain_state_->get_imported_msg_queue_limits(block_id.is_masterchain()), - need_monitor(block_id.shard_full()), priority, actor_id(this), td::Timestamp::at(timeout.at() + 10.0), - std::move(P)) - .release(); - wait_out_msg_queue_proof_[key].actor_ = id; - it = wait_out_msg_queue_proof_.find(key); - } else if (it->second.done_) { - promise.set_result(it->second.result_); - it->second.remove_at_ = td::Timestamp::in(30.0); - return; +void ValidatorManagerImpl::wait_neighbor_msg_queue_proofs( + ShardIdFull dst_shard, std::vector blocks, td::Timestamp timeout, + td::Promise>> promise) { + if (out_msg_queue_importer_.empty()) { + out_msg_queue_importer_ = td::actor::create_actor("outmsgqueueimporter", actor_id(this), opts_, + last_masterchain_state_); } - it->second.waiting_.emplace_back(timeout, priority, std::move(promise)); - auto X = it->second.get_timeout(); - td::actor::send_closure(it->second.actor_, &WaitOutMsgQueueProof::update_timeout, X.first, X.second); + td::actor::send_closure(out_msg_queue_importer_, &OutMsgQueueImporter::get_neighbor_msg_queue_proofs, dst_shard, + std::move(blocks), timeout, std::move(promise)); } void ValidatorManagerImpl::wait_block_data(BlockHandle handle, td::uint32 priority, td::Timestamp timeout, @@ -1082,44 +1064,6 @@ void ValidatorManagerImpl::finished_wait_data(BlockHandle handle, td::Result> R) { - auto it = wait_out_msg_queue_proof_.find({block_id, dst_shard}); - if (it != wait_out_msg_queue_proof_.end()) { - if (R.is_error()) { - auto S = R.move_as_error(); - if (S.code() != ErrorCode::timeout) { - for (auto &X : it->second.waiting_) { - X.promise.set_error(S.clone()); - } - } else { - auto X = it->second.get_timeout(); - auto P = td::PromiseCreator::lambda( - [SelfId = actor_id(this), block_id, dst_shard](td::Result> R) { - td::actor::send_closure(SelfId, &ValidatorManagerImpl::finished_wait_msg_queue, block_id, dst_shard, - std::move(R)); - }); - auto id = td::actor::create_actor( - "waitmsgqueue", block_id, dst_shard, - last_masterchain_state_->get_imported_msg_queue_limits(block_id.is_masterchain()), - need_monitor(block_id.shard_full()), X.second, actor_id(this), X.first, std::move(P)) - .release(); - it->second.actor_ = id; - return; - } - wait_out_msg_queue_proof_.erase(it); - } else { - auto r = R.move_as_ok(); - for (auto &X : it->second.waiting_) { - X.promise.set_result(r); - } - it->second.done_ = true; - it->second.result_ = std::move(r); - it->second.remove_at_ = td::Timestamp::in(30.0); - } - } -} - void ValidatorManagerImpl::set_block_state(BlockHandle handle, td::Ref state, td::Promise> promise) { auto P = td::PromiseCreator::lambda( @@ -1513,11 +1457,11 @@ void ValidatorManagerImpl::send_block_broadcast(BlockBroadcast broadcast) { callback_->send_broadcast(std::move(broadcast)); } -void ValidatorManagerImpl::send_get_out_msg_queue_proof_request(BlockIdExt id, ShardIdFull dst_shard, - block::ImportedMsgQueueLimits limits, - td::uint32 priority, - td::Promise> promise) { - callback_->download_out_msg_queue_proof(id, dst_shard, limits, td::Timestamp::in(10.0), std::move(promise)); +void ValidatorManagerImpl::send_get_out_msg_queue_proof_request( + ShardIdFull dst_shard, std::vector blocks, block::ImportedMsgQueueLimits limits, + td::Promise>> promise) { + callback_->download_out_msg_queue_proof(dst_shard, std::move(blocks), limits, td::Timestamp::in(10.0), + std::move(promise)); } void ValidatorManagerImpl::start_up() { @@ -1836,18 +1780,26 @@ void ValidatorManagerImpl::new_masterchain_block() { td::actor::send_closure(shard_client_, &ShardClient::new_masterchain_block_notification, last_masterchain_block_handle_, last_masterchain_state_); } + if (is_collator()) { + std::set collating_shards; + if (validating_masterchain()) { + collating_shards.emplace(masterchainId); + } + for (const auto &collator : collator_nodes_) { + collating_shards.insert(collator.second.shards.begin(), collator.second.shards.end()); + } + if (!collating_shards.empty()) { + if (out_msg_queue_importer_.empty()) { + out_msg_queue_importer_ = td::actor::create_actor("outmsgqueueimporter", actor_id(this), + opts_, last_masterchain_state_); + } + td::actor::send_closure(out_msg_queue_importer_, &OutMsgQueueImporter::new_masterchain_block_notification, + last_masterchain_state_, std::move(collating_shards)); + } + } for (auto &c : collator_nodes_) { td::actor::send_closure(c.second.actor, &CollatorNode::new_masterchain_block_notification, last_masterchain_state_); } - if (opts_->validator_mode() == ValidatorManagerOptions::validator_lite_shards && validating_masterchain() && - last_masterchain_state_->get_unix_time() > (td::uint32)td::Clocks::system() - 20) { - // Prepare neighboours' queues for collating masterchain - for (const auto &desc : last_masterchain_state_->get_shards()) { - wait_out_msg_queue_proof(desc->top_block_id(), ShardIdFull(masterchainId), 0, td::Timestamp::in(10.0), - [](td::Result>) {}); - } - } - if (last_masterchain_seqno_ % 1024 == 0) { LOG(WARNING) << "applied masterchain block " << last_masterchain_block_id_; } @@ -2514,18 +2466,6 @@ void ValidatorManagerImpl::alarm() { } } alarm_timestamp().relax(check_shard_clients_); - if (cleanup_wait_caches_at_.is_in_past()) { - auto it = wait_out_msg_queue_proof_.begin(); - while (it != wait_out_msg_queue_proof_.end()) { - if (it->second.done_ && it->second.remove_at_.is_in_past()) { - it = wait_out_msg_queue_proof_.erase(it); - } else { - ++it; - } - } - cleanup_wait_caches_at_ = td::Timestamp::in(10.0); - } - alarm_timestamp().relax(cleanup_wait_caches_at_); } void ValidatorManagerImpl::update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise promise) { @@ -2809,6 +2749,9 @@ void ValidatorManagerImpl::update_options(td::Ref opts) if (!serializer_.empty()) { td::actor::send_closure(serializer_, &AsyncStateSerializer::update_options, opts); } + if (!out_msg_queue_importer_.empty()) { + td::actor::send_closure(out_msg_queue_importer_, &OutMsgQueueImporter::update_options, opts); + } opts_ = std::move(opts); } diff --git a/validator/manager.hpp b/validator/manager.hpp index a37fcb4e0..ce5254acf 100644 --- a/validator/manager.hpp +++ b/validator/manager.hpp @@ -189,15 +189,14 @@ class ValidatorManagerImpl : public ValidatorManager { }; std::map>> wait_state_; std::map>> wait_block_data_; - std::map, WaitListCaching>> - wait_out_msg_queue_proof_; - td::Timestamp cleanup_wait_caches_at_ = td::Timestamp::now(); struct WaitBlockHandle { std::vector> waiting_; }; std::map wait_block_handle_; + td::actor::ActorOwn out_msg_queue_importer_; + private: // HANDLES CACHE std::map> handles_; @@ -372,8 +371,8 @@ class ValidatorManagerImpl : public ValidatorManager { td::Promise> promise) override; void wait_block_state_short(BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout, td::Promise> promise) override; - void wait_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, td::uint32 priority, td::Timestamp timeout, - td::Promise> promise) override; + void wait_neighbor_msg_queue_proofs(ShardIdFull dst_shard, std::vector blocks, td::Timestamp timeout, + td::Promise>> promise) override; void set_block_data(BlockHandle handle, td::Ref data, td::Promise promise) override; void wait_block_data(BlockHandle handle, td::uint32 priority, td::Timestamp, @@ -462,9 +461,9 @@ class ValidatorManagerImpl : public ValidatorManager { void send_ihr_message(td::Ref message) override; void send_top_shard_block_description(td::Ref desc) override; void send_block_broadcast(BlockBroadcast broadcast) override; - void send_get_out_msg_queue_proof_request(BlockIdExt id, ShardIdFull dst_shard, block::ImportedMsgQueueLimits limits, - td::uint32 priority, - td::Promise> promise) override; + void send_get_out_msg_queue_proof_request(ShardIdFull dst_shard, std::vector blocks, + block::ImportedMsgQueueLimits limits, + td::Promise>> promise) override; void update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise promise) override; void get_shard_client_state(bool from_db, td::Promise promise) override; @@ -499,7 +498,6 @@ class ValidatorManagerImpl : public ValidatorManager { void finished_wait_state(BlockHandle handle, td::Result> R); void finished_wait_data(BlockHandle handle, td::Result> R); - void finished_wait_msg_queue(BlockIdExt block_id, ShardIdFull dst_shard, td::Result> R); void start_up() override; void started(ValidatorManagerInitResult result); diff --git a/validator/validator.h b/validator/validator.h index 5fa23467f..506950e7f 100644 --- a/validator/validator.h +++ b/validator/validator.h @@ -140,9 +140,9 @@ class ValidatorManagerInterface : public td::actor::Actor { td::Promise> promise) = 0; virtual void download_archive(BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout, td::Promise promise) = 0; - virtual void download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, + virtual void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector blocks, block::ImportedMsgQueueLimits limits, td::Timestamp timeout, - td::Promise> promise) = 0; + td::Promise>> promise) = 0; virtual void new_key_block(BlockHandle handle) = 0; }; @@ -226,9 +226,9 @@ class ValidatorManagerInterface : public td::actor::Actor { virtual void wait_block_state_short(BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout, td::Promise> promise) = 0; - virtual void wait_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, td::uint32 priority, - td::Timestamp timeout, - td::Promise> promise) = 0; + virtual void wait_neighbor_msg_queue_proofs(ShardIdFull dst_shard, std::vector blocks, + td::Timestamp timeout, + td::Promise>> promise) = 0; virtual void get_archive_id(BlockSeqno masterchain_seqno, td::Promise promise) = 0; virtual void get_archive_slice(td::uint64 archive_id, td::uint64 offset, td::uint32 limit,