Skip to content

Commit

Permalink
Merge branch 'block-generation' into perfnet
Browse files Browse the repository at this point in the history
  • Loading branch information
SpyCheese committed Jul 31, 2023
2 parents 28a42a7 + 5c02459 commit 2e7b955
Show file tree
Hide file tree
Showing 24 changed files with 604 additions and 454 deletions.
6 changes: 3 additions & 3 deletions create-hardfork/create-hardfork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,11 +268,11 @@ class HardforkCreator : public td::actor::Actor {
td::Promise<std::vector<ton::BlockIdExt>> promise) override {
}
void download_archive(ton::BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout,

td::Promise<std::string> promise) override {
}
void download_out_msg_queue_proof(ton::BlockIdExt block_id, ton::ShardIdFull dst_shard, td::Timestamp timeout,
td::Promise<td::Ref<ton::validator::OutMsgQueueProof>> promise) override {
void download_out_msg_queue_proof(
ton::ShardIdFull dst_shard, std::vector<ton::BlockIdExt> blocks, block::ImportedMsgQueueLimits limits,
td::Timestamp timeout, td::Promise<std::vector<td::Ref<ton::validator::OutMsgQueueProof>>> promise) override {
}

void new_key_block(ton::validator::BlockHandle handle) override {
Expand Down
4 changes: 2 additions & 2 deletions crypto/block/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ static inline std::ostream& operator<<(std::ostream& os, const MsgProcessedUptoC

struct ImportedMsgQueueLimits {
// Default values
td::uint32 max_bytes = 1 << 18;
td::uint32 max_msgs = 40;
td::uint32 max_bytes = 1 << 19;
td::uint32 max_msgs = 500;
bool deserialize(vm::CellSlice& cs);
};

Expand Down
5 changes: 3 additions & 2 deletions test/test-ton-collator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,9 @@ class TestNode : public td::actor::Actor {

td::Promise<std::string> promise) override {
}
void download_out_msg_queue_proof(ton::BlockIdExt block_id, ton::ShardIdFull dst_shard, td::Timestamp timeout,
td::Promise<td::Ref<ton::validator::OutMsgQueueProof>> promise) override {
void download_out_msg_queue_proof(
ton::ShardIdFull dst_shard, std::vector<ton::BlockIdExt> blocks, block::ImportedMsgQueueLimits limits,
td::Timestamp timeout, td::Promise<std::vector<td::Ref<ton::validator::OutMsgQueueProof>>> promise) override {
}

void new_key_block(ton::validator::BlockHandle handle) override {
Expand Down
4 changes: 2 additions & 2 deletions tl/generate/scheme/ton_api.tl
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ tonNode.archiveNotFound = tonNode.ArchiveInfo;
tonNode.archiveInfo id:long = tonNode.ArchiveInfo;

tonNode.importedMsgQueueLimits max_bytes:int max_msgs:int = ImportedMsgQueueLimits;
tonNode.outMsgQueueProof queue_proof:bytes block_state_proof:bytes msg_count:int = tonNode.OutMsgQueueProof;
tonNode.outMsgQueueProof queue_proofs:bytes block_state_proofs:bytes msg_counts:(vector int) = tonNode.OutMsgQueueProof;
tonNode.outMsgQueueProofEmpty = tonNode.OutMsgQueueProof;

tonNode.forgetPeer = tonNode.ForgetPeer;
Expand Down Expand Up @@ -453,7 +453,7 @@ tonNode.downloadBlockProofLinks blocks:(vector tonNode.blockIdExt) = tonNode.Dat
tonNode.downloadKeyBlockProofLinks blocks:(vector tonNode.blockIdExt) = tonNode.DataList;
tonNode.getArchiveInfo masterchain_seqno:int = tonNode.ArchiveInfo;
tonNode.getArchiveSlice archive_id:long offset:long max_size:int = tonNode.Data;
tonNode.getOutMsgQueueProof block_id:tonNode.blockIdExt dst_workchain:int dst_shard:long
tonNode.getOutMsgQueueProof dst_shard:tonNode.shardId blocks:(vector tonNode.blockIdExt)
limits:tonNode.importedMsgQueueLimits = tonNode.OutMsgQueueProof;

tonNode.getCapabilities = tonNode.Capabilities;
Expand Down
Binary file modified tl/generate/scheme/ton_api.tlo
Binary file not shown.
36 changes: 4 additions & 32 deletions validator/collator-node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,37 +77,8 @@ void CollatorNode::new_masterchain_block_notification(td::Ref<MasterchainState>
last_masterchain_block_ = state->get_block_id();
last_top_blocks_.clear();
last_top_blocks_[ShardIdFull{masterchainId, shardIdAll}] = last_masterchain_block_;
if (state->get_unix_time() > (td::uint32)td::Clocks::system() - 20) {
std::vector<ShardIdFull> next_shards;
if (can_collate_shard(ShardIdFull(masterchainId))) {
next_shards.push_back(ShardIdFull(masterchainId));
}
for (const auto& desc : state->get_shards()) {
last_top_blocks_[desc->shard()] = desc->top_block_id();
ShardIdFull shard = desc->shard();
if (desc->before_split()) {
if (can_collate_shard(shard_child(shard, true))) {
next_shards.push_back(shard_child(shard, true));
}
if (can_collate_shard(shard_child(shard, false))) {
next_shards.push_back(shard_child(shard, false));
}
} else if (desc->before_merge()) {
if (is_left_child(shard) && can_collate_shard(shard_parent(shard))) {
next_shards.push_back(shard_parent(shard));
}
} else if (can_collate_shard(shard)) {
next_shards.push_back(shard);
}
}
for (const ShardIdFull& shard : next_shards) {
for (const auto& neighbor : last_top_blocks_) {
if (neighbor.first != shard && block::ShardConfig::is_neighbor(shard, neighbor.first)) {
td::actor::send_closure(manager_, &ValidatorManager::wait_out_msg_queue_proof, neighbor.second, shard, 0,
td::Timestamp::in(10.0), [](td::Ref<OutMsgQueueProof>) {});
}
}
}
for (const auto& desc : state->get_shards()) {
last_top_blocks_[desc->shard()] = desc->top_block_id();
}

if (init || state->is_key_state()) {
Expand Down Expand Up @@ -274,7 +245,8 @@ void CollatorNode::receive_query_cont(ShardIdFull shard, td::Ref<MasterchainStat
std::vector<BlockIdExt> prev_blocks, Ed25519_PublicKey creator,
td::Promise<BlockCandidate> promise) {
run_collate_query(shard, min_mc_state->get_block_id(), std::move(prev_blocks), creator,
min_mc_state->get_validator_set(shard), manager_, td::Timestamp::in(10.0), std::move(promise));
min_mc_state->get_validator_set(shard), manager_, td::Timestamp::in(10.0), std::move(promise),
CollateMode::skip_store_candidate);
}

void CollatorNode::process_result(std::shared_ptr<CacheEntry> cache_entry, td::Result<BlockCandidate> R) {
Expand Down
5 changes: 3 additions & 2 deletions validator/fabric.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ namespace ton {
namespace validator {

enum ValidateMode { fake = 1, full_collated_data = 2 };
enum CollateMode { skip_store_candidate = 1 };

td::actor::ActorOwn<Db> create_db_actor(td::actor::ActorId<ValidatorManager> manager, std::string db_root_);
td::actor::ActorOwn<LiteServerCache> create_liteserver_cache_actor(td::actor::ActorId<ValidatorManager> manager,
Expand Down Expand Up @@ -83,9 +84,9 @@ void run_validate_query(ShardIdFull shard, BlockIdExt min_masterchain_block_id,
td::actor::ActorId<ValidatorManager> manager, td::Timestamp timeout,
td::Promise<ValidateCandidateResult> promise, unsigned mode = 0);
void run_collate_query(ShardIdFull shard, const BlockIdExt& min_masterchain_block_id, std::vector<BlockIdExt> prev,
Ed25519_PublicKey local_id, td::Ref<ValidatorSet> validator_set,
Ed25519_PublicKey creator, td::Ref<ValidatorSet> validator_set,
td::actor::ActorId<ValidatorManager> manager, td::Timestamp timeout,
td::Promise<BlockCandidate> promise);
td::Promise<BlockCandidate> promise, unsigned mode = 0);
void run_collate_hardfork(ShardIdFull shard, const BlockIdExt& min_masterchain_block_id, std::vector<BlockIdExt> prev,
td::actor::ActorId<ValidatorManager> manager, td::Timestamp timeout,
td::Promise<BlockCandidate> promise);
Expand Down
82 changes: 50 additions & 32 deletions validator/full-node-shard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -666,18 +666,26 @@ void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNod

void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getOutMsgQueueProof &query,
td::Promise<td::BufferSlice> promise) {
BlockIdExt block_id = create_block_id(query.block_id_);
ShardIdFull dst_shard(query.dst_workchain_, query.dst_shard_);
block::ImportedMsgQueueLimits limits{(td::uint32)query.limits_->max_bytes_, (td::uint32)query.limits_->max_msgs_};
if (!block_id.is_valid_ext()) {
promise.set_error(td::Status::Error("invalid block_id"));
return;
std::vector<BlockIdExt> blocks;
for (const auto& x : query.blocks_) {
BlockIdExt id = create_block_id(x);
if (!id.is_valid_ext()) {
promise.set_error(td::Status::Error("invalid block_id"));
return;
}
if (!shard_is_ancestor(shard_, id.shard_full())) {
promise.set_error(td::Status::Error("query in wrong overlay"));
return;
}
blocks.push_back(create_block_id(x));
}
ShardIdFull dst_shard = create_shard_id(query.dst_shard_);
if (!dst_shard.is_valid_ext()) {
promise.set_error(td::Status::Error("invalid shard"));
return;
}
if (limits.max_bytes > (1 << 21)) {
block::ImportedMsgQueueLimits limits{(td::uint32)query.limits_->max_bytes_, (td::uint32)query.limits_->max_msgs_};
if (limits.max_bytes > (1 << 24)) {
promise.set_error(td::Status::Error("max_bytes is too big"));
return;
}
Expand All @@ -689,10 +697,10 @@ void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNod
promise.set_result(serialize_tl_object(R.move_as_ok(), true));
}
});
VLOG(FULL_NODE_DEBUG) << "Got query getOutMsgQueueProof " << block_id.to_str() << " " << dst_shard.to_str()
<< " from " << src;
td::actor::create_actor<BuildOutMsgQueueProof>("buildqueueproof", block_id, dst_shard, limits, validator_manager_,
std::move(P))
VLOG(FULL_NODE_DEBUG) << "Got query getOutMsgQueueProof (" << blocks.size() << " blocks) to shard "
<< dst_shard.to_str() << " from " << src;
td::actor::create_actor<BuildOutMsgQueueProof>("buildqueueproof", dst_shard, std::move(blocks), limits,
validator_manager_, std::move(P))
.release();
}

Expand Down Expand Up @@ -931,33 +939,43 @@ void FullNodeShardImpl::download_archive(BlockSeqno masterchain_seqno, std::stri
.release();
}

void FullNodeShardImpl::download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard,
void FullNodeShardImpl::download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<td::Ref<OutMsgQueueProof>> promise) {
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) {
// TODO: maybe more complex download (like other requests here)
auto &b = choose_neighbour(true);
if (b.adnl_id == adnl::AdnlNodeIdShort::zero()) {
promise.set_error(td::Status::Error(ErrorCode::notready, "no nodes"));
return;
}
auto P = td::PromiseCreator::lambda(
[=, promise = create_neighbour_promise(b, std::move(promise), true)](td::Result<td::BufferSlice> R) mutable {
if (R.is_error()) {
promise.set_result(R.move_as_error());
return;
}
TRY_RESULT_PROMISE(promise, f, fetch_tl_object<ton_api::tonNode_OutMsgQueueProof>(R.move_as_ok(), true));
ton_api::downcast_call(*f, td::overloaded(
[&](ton_api::tonNode_outMsgQueueProofEmpty &x) {
promise.set_error(td::Status::Error("node doesn't have this block"));
},
[&](ton_api::tonNode_outMsgQueueProof &x) {
promise.set_result(OutMsgQueueProof::fetch(block_id, dst_shard, limits, x));
}));
});
std::vector<tl_object_ptr<ton_api::tonNode_blockIdExt>> blocks_tl;
for (const BlockIdExt &id : blocks) {
blocks_tl.push_back(create_tl_block_id(id));
}
td::BufferSlice query = create_serialize_tl_object<ton_api::tonNode_getOutMsgQueueProof>(
create_tl_block_id(block_id), dst_shard.workchain, dst_shard.shard,
create_tl_shard_id(dst_shard), std::move(blocks_tl),
create_tl_object<ton_api::tonNode_importedMsgQueueLimits>(limits.max_bytes, limits.max_msgs));

auto P = td::PromiseCreator::lambda([=, promise = create_neighbour_promise(b, std::move(promise), true),
blocks = std::move(blocks)](td::Result<td::BufferSlice> R) mutable {
if (R.is_error()) {
promise.set_result(R.move_as_error());
return;
}
TRY_RESULT_PROMISE(promise, f, fetch_tl_object<ton_api::tonNode_OutMsgQueueProof>(R.move_as_ok(), true));
ton_api::downcast_call(
*f, td::overloaded(
[&](ton_api::tonNode_outMsgQueueProofEmpty &x) {
promise.set_error(td::Status::Error("node doesn't have this block"));
},
[&](ton_api::tonNode_outMsgQueueProof &x) {
delay_action(
[=, promise = std::move(promise), blocks = std::move(blocks), x = std::move(x)]() mutable {
promise.set_result(OutMsgQueueProof::fetch(dst_shard, blocks, limits, x));
},
td::Timestamp::now());
}));
});
td::actor::send_closure(overlays_, &overlay::Overlays::send_query_via, b.adnl_id, adnl_id_, overlay_id_,
"get_msg_queue", std::move(P), timeout, std::move(query), 1 << 22, rldp_);
}
Expand Down Expand Up @@ -1352,9 +1370,9 @@ td::actor::ActorOwn<FullNodeShard> FullNodeShard::create(
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<rldp2::Rldp> rldp2,
td::actor::ActorId<overlay::Overlays> overlays, td::actor::ActorId<ValidatorManagerInterface> validator_manager,
td::actor::ActorId<adnl::AdnlExtClient> client, FullNodeShardMode mode) {
return td::actor::create_actor<FullNodeShardImpl>("tonnode", shard, local_id, adnl_id, zero_state_file_hash, config,
keyring, adnl, rldp, rldp2, overlays, validator_manager, client,
mode);
return td::actor::create_actor<FullNodeShardImpl>(PSTRING() << "fullnode" << shard.to_str(), shard, local_id, adnl_id,
zero_state_file_hash, config, keyring, adnl, rldp, rldp2, overlays,
validator_manager, client, mode);
}

} // namespace fullnode
Expand Down
4 changes: 2 additions & 2 deletions validator/full-node-shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ class FullNodeShard : public td::actor::Actor {
td::Promise<std::vector<BlockIdExt>> promise) = 0;
virtual void download_archive(BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout,
td::Promise<std::string> promise) = 0;
virtual void download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard,
virtual void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<td::Ref<OutMsgQueueProof>> promise) = 0;
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) = 0;

virtual void set_handle(BlockHandle handle, td::Promise<td::Unit> promise) = 0;

Expand Down
5 changes: 3 additions & 2 deletions validator/full-node-shard.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,9 @@ class FullNodeShardImpl : public FullNodeShard {
td::Promise<std::vector<BlockIdExt>> promise) override;
void download_archive(BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout,
td::Promise<std::string> promise) override;
void download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, block::ImportedMsgQueueLimits limits,
td::Timestamp timeout, td::Promise<td::Ref<OutMsgQueueProof>> promise) override;
void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) override;

void set_handle(BlockHandle handle, td::Promise<td::Unit> promise) override;

Expand Down
24 changes: 15 additions & 9 deletions validator/full-node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,17 +378,22 @@ void FullNodeImpl::download_archive(BlockSeqno masterchain_seqno, std::string tm
std::move(promise));
}

void FullNodeImpl::download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard,
void FullNodeImpl::download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<td::Ref<OutMsgQueueProof>> promise) {
auto shard = get_shard(block_id.shard_full());
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) {
if (blocks.empty()) {
promise.set_value({});
return;
}
// All blocks are expected to have the same minsplit shard prefix
auto shard = get_shard(blocks[0].shard_full());
if (shard.empty()) {
VLOG(FULL_NODE_WARNING) << "dropping download msg queue query to unknown shard";
promise.set_error(td::Status::Error(ErrorCode::notready, "shard not ready"));
return;
}
td::actor::send_closure(shard, &FullNodeShard::download_out_msg_queue_proof, block_id, dst_shard, limits, timeout,
std::move(promise));
td::actor::send_closure(shard, &FullNodeShard::download_out_msg_queue_proof, dst_shard, std::move(blocks), limits,
timeout, std::move(promise));
}

td::actor::ActorId<FullNodeShard> FullNodeImpl::get_shard(ShardIdFull shard) {
Expand Down Expand Up @@ -588,10 +593,11 @@ void FullNodeImpl::start_up() {
td::actor::send_closure(id_, &FullNodeImpl::download_archive, masterchain_seqno, std::move(tmp_dir), timeout,
std::move(promise));
}
void download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, block::ImportedMsgQueueLimits limits,
td::Timestamp timeout, td::Promise<td::Ref<OutMsgQueueProof>> promise) override {
td::actor::send_closure(id_, &FullNodeImpl::download_out_msg_queue_proof, block_id, dst_shard, limits, timeout,
std::move(promise));
void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) override {
td::actor::send_closure(id_, &FullNodeImpl::download_out_msg_queue_proof, dst_shard, std::move(blocks), limits,
timeout, std::move(promise));
}

void new_key_block(BlockHandle handle) override {
Expand Down
5 changes: 3 additions & 2 deletions validator/full-node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ class FullNodeImpl : public FullNode {
void get_next_key_blocks(BlockIdExt block_id, td::Timestamp timeout, td::Promise<std::vector<BlockIdExt>> promise);
void download_archive(BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout,
td::Promise<std::string> promise);
void download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, block::ImportedMsgQueueLimits limits,
td::Timestamp timeout, td::Promise<td::Ref<OutMsgQueueProof>> promise);
void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise);

void got_key_block_proof(td::Ref<ProofLink> proof);
void got_zero_block_state(td::Ref<ShardState> state);
Expand Down
Loading

0 comments on commit 2e7b955

Please sign in to comment.