diff --git a/CMake/FindLZ4.cmake b/CMake/FindLZ4.cmake new file mode 100644 index 000000000..e13eca556 --- /dev/null +++ b/CMake/FindLZ4.cmake @@ -0,0 +1,39 @@ +############################################################################### +# Find LZ4 +# +# This sets the following variables: +# LZ4_FOUND - True if LZ4 was found. +# LZ4_INCLUDE_DIRS - Directories containing the LZ4 include files. +# LZ4_LIBRARIES - Libraries needed to use LZ4. +# LZ4_LIBRARY - Library needed to use LZ4. +# LZ4_LIBRARY_DIRS - Library needed to use LZ4. + +find_package(PkgConfig REQUIRED) + +# If found, LZ$_* variables will be defined +pkg_check_modules(LZ4 REQUIRED liblz4) + +if(NOT LZ4_FOUND) + find_path(LZ4_INCLUDE_DIR lz4.h + HINTS "${LZ4_ROOT}" "$ENV{LZ4_ROOT}" + PATHS "$ENV{PROGRAMFILES}/lz4" "$ENV{PROGRAMW6432}/lz4" + PATH_SUFFIXES include) + + find_library(LZ4_LIBRARY + NAMES lz4 lz4_static + HINTS "${LZ4_ROOT}" "$ENV{LZ4_ROOT}" + PATHS "$ENV{PROGRAMFILES}/lz4" "$ENV{PROGRAMW6432}/lz4" + PATH_SUFFIXES lib) + + if(LZ4_LIBRARY) + set(LZ4_LIBRARIES ${LZ4_LIBRARY}) + get_filename_component(LZ4_LIBRARY_DIRS ${LZ4_LIBRARY} DIRECTORY) + endif() +else() + find_library(LZ4_LIBRARY + NAMES lz4 lz4_static + PATHS ${LZ4_LIBRARY_DIRS} + NO_DEFAULT_PATH) +endif() + +mark_as_advanced(LZ4_LIBRARY LZ4_INCLUDE_DIRS LZ4_LIBRARY_DIRS LZ4_LIBRARIES) \ No newline at end of file diff --git a/tdutils/CMakeLists.txt b/tdutils/CMakeLists.txt index 7b577e4f8..ae364a9fe 100644 --- a/tdutils/CMakeLists.txt +++ b/tdutils/CMakeLists.txt @@ -15,6 +15,7 @@ if (NOT DEFINED CMAKE_INSTALL_LIBDIR) endif() find_package(PkgConfig REQUIRED) +find_package(LZ4) if (NOT ZLIB_FOUND) pkg_check_modules(ZLIB zlib) endif() @@ -280,6 +281,15 @@ if (TDUTILS_MIME_TYPE) ) endif() +if (LZ4_FOUND) + set(TD_HAVE_LZ4 1) + set(TDUTILS_SOURCE + ${TDUTILS_SOURCE} + td/utils/lz4.cpp + td/utils/lz4.h + ) +endif() + set(TDUTILS_TEST_SOURCE ${CMAKE_CURRENT_SOURCE_DIR}/test/buffer.cpp ${CMAKE_CURRENT_SOURCE_DIR}/test/ConcurrentHashMap.cpp @@ -338,6 +348,11 @@ endif() if (CRC32C_FOUND) target_link_libraries(tdutils PRIVATE crc32c) endif() + +if (LZ4_FOUND) + target_link_libraries(tdutils PRIVATE ${LZ4_LIBRARIES}) +endif() + if (ABSL_FOUND) target_link_libraries_system(tdutils absl::flat_hash_map absl::flat_hash_set absl::hash) endif() diff --git a/tdutils/td/utils/config.h.in b/tdutils/td/utils/config.h.in index f8b89aeb5..3f4e1bf2b 100644 --- a/tdutils/td/utils/config.h.in +++ b/tdutils/td/utils/config.h.in @@ -3,6 +3,7 @@ #cmakedefine01 TD_HAVE_OPENSSL #cmakedefine01 TD_HAVE_ZLIB #cmakedefine01 TD_HAVE_CRC32C +#cmakedefine01 TD_HAVE_LZ4 #cmakedefine01 TD_HAVE_COROUTINES #cmakedefine01 TD_HAVE_ABSL #cmakedefine01 TD_FD_DEBUG diff --git a/tdutils/td/utils/lz4.cpp b/tdutils/td/utils/lz4.cpp new file mode 100644 index 000000000..ebf456aaa --- /dev/null +++ b/tdutils/td/utils/lz4.cpp @@ -0,0 +1,48 @@ +/* + This file is part of TON Blockchain Library. + + TON Blockchain Library is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + TON Blockchain Library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with TON Blockchain Library. If not, see . +*/ +#include "td/utils/buffer.h" +#include "td/utils/misc.h" +#include + +namespace td { + +td::BufferSlice lz4_compress(td::Slice data) { + int size = narrow_cast(data.size()); + int buf_size = LZ4_compressBound(size); + td::BufferSlice compressed(buf_size); + int compressed_size = LZ4_compress_default(data.data(), compressed.data(), size, buf_size); + CHECK(compressed_size > 0); + return td::BufferSlice{compressed.as_slice().substr(0, compressed_size)}; +} + +td::Result lz4_decompress(td::Slice data, int max_decompressed_size) { + TRY_RESULT(size, narrow_cast_safe(data.size())); + if (max_decompressed_size < 0) { + return td::Status::Error("invalid max_decompressed_size"); + } + td::BufferSlice decompressed(max_decompressed_size); + int result = LZ4_decompress_safe(data.data(), decompressed.data(), size, max_decompressed_size); + if (result < 0) { + return td::Status::Error(PSTRING() << "lz4 decompression failed, error code: " << result); + } + if (result == max_decompressed_size) { + return decompressed; + } + return td::BufferSlice{decompressed.as_slice().substr(0, result)}; +} + +} // namespace td diff --git a/tdutils/td/utils/lz4.h b/tdutils/td/utils/lz4.h new file mode 100644 index 000000000..fbbc470fa --- /dev/null +++ b/tdutils/td/utils/lz4.h @@ -0,0 +1,27 @@ +/* + This file is part of TON Blockchain Library. + + TON Blockchain Library is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + TON Blockchain Library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with TON Blockchain Library. If not, see . +*/ +#pragma once + +#include "td/utils/buffer.h" +#include "td/utils/Status.h" + +namespace td { + +td::BufferSlice lz4_compress(td::Slice data); +td::Result lz4_decompress(td::Slice data, int max_decompressed_size); + +} // namespace td diff --git a/tl/generate/scheme/ton_api.tl b/tl/generate/scheme/ton_api.tl index 5d636e9c3..cbe27dfa3 100644 --- a/tl/generate/scheme/ton_api.tl +++ b/tl/generate/scheme/ton_api.tl @@ -311,6 +311,7 @@ validatorSession.candidateId src:int256 root_hash:int256 file_hash:int256 collat validatorSession.blockUpdate ts:long actions:(vector validatorSession.round.Message) state:int = validatorSession.BlockUpdate; validatorSession.candidate src:int256 round:int root_hash:int256 data:bytes collated_data:bytes = validatorSession.Candidate; +validatorSession.compressedCandidate flags:# src:int256 round:int root_hash:int256 decompressed_size:int data:bytes = validatorSession.Candidate; validatorSession.config catchain_idle_timeout:double catchain_max_deps:int round_candidates:int next_candidate_delay:double round_attempt_duration:int max_round_attempts:int max_block_size:int max_collated_data_size:int = validatorSession.Config; @@ -784,7 +785,9 @@ validatorSession.stats id:tonNode.blockId timestamp:long self:int256 creator:int signatures:int signatures_weight:long approve_signatures:int approve_signatures_weight:long first_round:int rounds:(vector validatorSession.statsRound) = validatorSession.Stats; -collatorNode.generateBlockSuccess candidate:db.Candidate = collatorNode.GenerateBlockResult; +collatorNode.candidate source:PublicKey id:tonNode.blockIdExt data:bytes collated_data:bytes = collatorNode.Candidate; +collatorNode.compressedCandidate flags:# source:PublicKey id:tonNode.blockIdExt decompressed_size:int data:bytes = collatorNode.Candidate; +collatorNode.generateBlockSuccess candidate:collatorNode.Candidate = collatorNode.GenerateBlockResult; collatorNode.generateBlockError code:int message:string = collatorNode.GenerateBlockResult; ---functions--- diff --git a/tl/generate/scheme/ton_api.tlo b/tl/generate/scheme/ton_api.tlo index 60e942922..f83b746c2 100644 Binary files a/tl/generate/scheme/ton_api.tlo and b/tl/generate/scheme/ton_api.tlo differ diff --git a/validator-session/CMakeLists.txt b/validator-session/CMakeLists.txt index 4931e464d..6f82d0a3a 100644 --- a/validator-session/CMakeLists.txt +++ b/validator-session/CMakeLists.txt @@ -5,12 +5,14 @@ if (NOT OPENSSL_FOUND) endif() set(VALIDATOR_SESSION_SOURCE + candidate-serializer.cpp persistent-vector.cpp validator-session-description.cpp validator-session-state.cpp validator-session.cpp validator-session-round-attempt-state.cpp + candidate-serializer.h persistent-vector.h validator-session-description.h validator-session-description.hpp diff --git a/validator-session/candidate-serializer.cpp b/validator-session/candidate-serializer.cpp new file mode 100644 index 000000000..2e92bf076 --- /dev/null +++ b/validator-session/candidate-serializer.cpp @@ -0,0 +1,77 @@ +/* + This file is part of TON Blockchain Library. + + TON Blockchain Library is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + TON Blockchain Library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with TON Blockchain Library. If not, see . +*/ +#pragma once +#include "candidate-serializer.h" +#include "tl-utils/tl-utils.hpp" +#include "vm/boc.h" +#include "td/utils/lz4.h" + +namespace ton { + +namespace validatorsession { + +td::Result serialize_candidate(const tl_object_ptr &block, + bool compression_enabled) { + if (!compression_enabled) { + return serialize_tl_object(block, true); + } + vm::BagOfCells boc1, boc2; + TRY_STATUS(boc1.deserialize(block->data_)); + if (boc1.get_root_count() != 1) { + return td::Status::Error("block candidate should have exactly one root"); + } + std::vector> roots = {boc1.get_root_cell()}; + TRY_STATUS(boc2.deserialize(block->collated_data_)); + for (int i = 0; i < boc2.get_root_count(); ++i) { + roots.push_back(boc2.get_root_cell(i)); + } + TRY_RESULT(data, vm::std_boc_serialize_multi(std::move(roots), 2)); + td::BufferSlice compressed = td::lz4_compress(data); + LOG(DEBUG) << "Compressing candidate: " << block->data_.size() + block->collated_data_.size() << " -> " + << compressed.size(); + return create_serialize_tl_object( + 0, block->src_, block->round_, block->root_hash_, (int)data.size(), std::move(compressed)); +} + +td::Result> deserialize_candidate(td::Slice data, + int max_decompressed_data_size) { + auto R = fetch_tl_object(data, true); + if (R.is_ok()) { + return R; + } + TRY_RESULT(f, fetch_tl_object(data, true)); + if (f->decompressed_size_ > max_decompressed_data_size) { + return td::Status::Error("decompressed size is too big"); + } + TRY_RESULT(decompressed, td::lz4_decompress(f->data_, f->decompressed_size_)); + if (decompressed.size() != (size_t)f->decompressed_size_) { + return td::Status::Error("decompressed size mismatch"); + } + TRY_RESULT(roots, vm::std_boc_deserialize_multi(decompressed)); + if (roots.empty()) { + return td::Status::Error("boc is empty"); + } + TRY_RESULT(block_data, vm::std_boc_serialize(roots[0], 31)); + roots.erase(roots.begin()); + TRY_RESULT(collated_data, vm::std_boc_serialize_multi(std::move(roots), 31)); + return create_tl_object(f->src_, f->round_, f->root_hash_, std::move(block_data), + std::move(collated_data)); +} + +} // namespace validatorsession + +} // namespace ton diff --git a/validator-session/candidate-serializer.h b/validator-session/candidate-serializer.h new file mode 100644 index 000000000..4ccfbc73c --- /dev/null +++ b/validator-session/candidate-serializer.h @@ -0,0 +1,32 @@ +/* + This file is part of TON Blockchain Library. + + TON Blockchain Library is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + TON Blockchain Library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with TON Blockchain Library. If not, see . +*/ +#pragma once +#include "ton/ton-types.h" +#include "auto/tl/ton_api.h" + +namespace ton { + +namespace validatorsession { + +td::Result serialize_candidate(const tl_object_ptr &block, + bool compression_enabled); +td::Result> deserialize_candidate(td::Slice data, + int max_decompressed_data_size); + +} // namespace validatorsession + +} // namespace ton diff --git a/validator-session/validator-session.cpp b/validator-session/validator-session.cpp index dc2aff492..fed7ec0f6 100644 --- a/validator-session/validator-session.cpp +++ b/validator-session/validator-session.cpp @@ -20,6 +20,7 @@ #include "td/utils/Random.h" #include "td/utils/crypto.h" #include "ton/ton-tl.hpp" +#include "candidate-serializer.h" namespace ton { @@ -205,7 +206,7 @@ void ValidatorSessionImpl::preprocess_block(catchain::CatChainBlock *block) { } bool ValidatorSessionImpl::ensure_candidate_unique(td::uint32 src_idx, td::uint32 round, - ValidatorSessionCandidateId block_id) { + ValidatorSessionCandidateId block_id) { auto it = src_round_candidate_[src_idx].find(round); if (it != src_round_candidate_[src_idx].end() && it->second != block_id) { VLOG(VALIDATOR_SESSION_WARNING) << this << "[node " << description_->get_source_adnl_id(src_idx) << "][candidate " @@ -222,7 +223,8 @@ void ValidatorSessionImpl::process_broadcast(PublicKeyHash src, td::BufferSlice // Note: src is not necessarily equal to the sender of this message: // If requested using get_broadcast_p2p, src is the creator of the block, sender possibly is some other node. auto src_idx = description().get_source_idx(src); - auto R = fetch_tl_object(data.clone(), true); + auto R = deserialize_candidate( + data, description().opts().max_block_size + description().opts().max_collated_data_size + 1024); if (R.is_error()) { VLOG(VALIDATOR_SESSION_WARNING) << this << "[node " << src << "][broadcast " << sha256_bits256(data.as_slice()) << "]: failed to parse: " << R.move_as_error(); @@ -344,17 +346,17 @@ void ValidatorSessionImpl::process_query(PublicKeyHash src, td::BufferSlice data } CHECK(block); - auto P = td::PromiseCreator::lambda( - [promise = std::move(promise), src = f->id_->src_, round_id](td::Result R) mutable { - if (R.is_error()) { - promise.set_error(R.move_as_error_prefix("failed to get candidate: ")); - } else { - auto c = R.move_as_ok(); - auto obj = create_tl_object( - src, round_id, c.id.root_hash, std::move(c.data), std::move(c.collated_data)); - promise.set_value(serialize_tl_object(obj, true)); - } - }); + auto P = td::PromiseCreator::lambda([promise = std::move(promise), src = f->id_->src_, round_id, + compress = compress_block_candidates_](td::Result R) mutable { + if (R.is_error()) { + promise.set_error(R.move_as_error_prefix("failed to get candidate: ")); + } else { + auto c = R.move_as_ok(); + auto obj = create_tl_object(src, round_id, c.id.root_hash, std::move(c.data), + std::move(c.collated_data)); + promise.set_result(serialize_candidate(obj, compress)); + } + }); callback_->get_approved_candidate(description().get_source_public_key(block->get_src_idx()), f->id_->root_hash_, f->id_->file_hash_, f->id_->collated_data_file_hash_, std::move(P)); @@ -430,7 +432,7 @@ void ValidatorSessionImpl::generated_block(td::uint32 round, ValidatorSessionCan auto b = create_tl_object(local_id().tl(), round, root_hash, std::move(data), std::move(collated_data)); - auto B = serialize_tl_object(b, true); + auto B = serialize_candidate(b, compress_block_candidates_).move_as_ok(); auto block_id = description().candidate_id(local_idx(), root_hash, file_hash, collated_data_file_hash); @@ -561,8 +563,8 @@ void ValidatorSessionImpl::try_approve_block(const SentBlock *block) { td::actor::send_closure(SelfId, &ValidatorSessionImpl::candidate_decision_ok, round, hash, root_hash, file_hash, src, R.ok_from()); } else { - td::actor::send_closure(SelfId, &ValidatorSessionImpl::candidate_decision_fail, round, hash, R.reason(), - src, R.proof()); + td::actor::send_closure(SelfId, &ValidatorSessionImpl::candidate_decision_fail, round, hash, R.reason(), src, + R.proof()); } }); pending_approve_.insert(block_id); @@ -871,8 +873,8 @@ void ValidatorSessionImpl::on_catchain_started() { auto broadcast = create_tl_object( src.tl(), round, root_hash, std::move(B.data), std::move(B.collated_data)); td::actor::send_closure(SelfId, &ValidatorSessionImpl::process_broadcast, src, - serialize_tl_object(broadcast, true), td::optional(), - false); + serialize_candidate(broadcast, false).move_as_ok(), + td::optional(), false); } }); callback_->get_approved_candidate(description().get_source_public_key(x->get_src_idx()), x->get_root_hash(), @@ -899,6 +901,7 @@ ValidatorSessionImpl::ValidatorSessionImpl(catchain::CatChainSessionId session_i , rldp_(rldp) , overlay_manager_(overlays) , allow_unsafe_self_blocks_resync_(allow_unsafe_self_blocks_resync) { + compress_block_candidates_ = opts.proto_version >= 3; // TODO: if used, change to an appropriate value description_ = ValidatorSessionDescription::create(std::move(opts), nodes, local_id); src_round_candidate_.resize(description_->get_total_nodes()); } @@ -909,9 +912,9 @@ void ValidatorSessionImpl::start() { auto w = description().export_catchain_nodes(); - catchain_ = catchain::CatChain::create( - make_catchain_callback(), description().opts().catchain_opts, keyring_, adnl_, overlay_manager_, std::move(w), - local_id(), unique_hash_, db_root_, db_suffix_, allow_unsafe_self_blocks_resync_); + catchain_ = catchain::CatChain::create(make_catchain_callback(), description().opts().catchain_opts, keyring_, adnl_, + overlay_manager_, std::move(w), local_id(), unique_hash_, db_root_, db_suffix_, + allow_unsafe_self_blocks_resync_); check_all(); } @@ -947,7 +950,7 @@ void ValidatorSessionImpl::stats_init() { void ValidatorSessionImpl::stats_add_round() { cur_stats_.rounds.emplace_back(); - auto& round = cur_stats_.rounds.back(); + auto &round = cur_stats_.rounds.back(); round.timestamp = (td::uint64)td::Clocks::system(); round.producers.resize(description().get_max_priority() + 1); for (td::uint32 i = 0; i < description().get_total_nodes(); i++) { @@ -966,9 +969,9 @@ void ValidatorSessionImpl::stats_set_candidate_status(td::uint32 round, PublicKe if (round < cur_stats_.first_round || round - cur_stats_.first_round >= cur_stats_.rounds.size()) { return; } - auto& stats_round = cur_stats_.rounds[round - cur_stats_.first_round]; + auto &stats_round = cur_stats_.rounds[round - cur_stats_.first_round]; auto it = std::find_if(stats_round.producers.begin(), stats_round.producers.end(), - [&](const ValidatorSessionStats::Producer& p) { return p.id == src; }); + [&](const ValidatorSessionStats::Producer &p) { return p.id == src; }); if (it == stats_round.producers.end()) { return; } @@ -986,8 +989,8 @@ void ValidatorSessionImpl::get_session_info( next_producers.push_back(description().get_source_id(node).bits256_value()); } promise.set_result(create_tl_object( - create_tl_block_id_simple(BlockId{}), description().get_source_id(local_idx()).bits256_value(), - cur_round_, std::move(next_producers))); + create_tl_block_id_simple(BlockId{}), description().get_source_id(local_idx()).bits256_value(), cur_round_, + std::move(next_producers))); } td::actor::ActorOwn ValidatorSession::create( diff --git a/validator-session/validator-session.hpp b/validator-session/validator-session.hpp index 741be11bd..d6883de91 100644 --- a/validator-session/validator-session.hpp +++ b/validator-session/validator-session.hpp @@ -156,6 +156,7 @@ class ValidatorSessionImpl : public ValidatorSession { bool started_ = false; bool catchain_started_ = false; bool allow_unsafe_self_blocks_resync_; + bool compress_block_candidates_ = false; ValidatorSessionStats cur_stats_; void stats_init(); diff --git a/validator/collator-node.cpp b/validator/collator-node.cpp index 2cc06b340..c8ada4d1d 100644 --- a/validator/collator-node.cpp +++ b/validator/collator-node.cpp @@ -19,6 +19,8 @@ #include "fabric.h" #include "block-auto.h" #include "block-db.h" +#include "td/utils/lz4.h" +#include "checksum.h" namespace ton { @@ -71,6 +73,7 @@ void CollatorNode::del_shard(ShardIdFull shard) { } void CollatorNode::new_masterchain_block_notification(td::Ref state) { + bool init = !last_masterchain_block_.is_valid(); last_masterchain_block_ = state->get_block_id(); last_top_blocks_.clear(); last_top_blocks_[ShardIdFull{masterchainId, shardIdAll}] = last_masterchain_block_; @@ -104,7 +107,8 @@ void CollatorNode::new_masterchain_block_notification(td::Ref } } } - if (validators_.empty() || state->is_key_state()) { + + if (init || state->is_key_state()) { validators_.clear(); for (int next : {-1, 0, 1}) { td::Ref vals = state->get_total_validator_set(next); @@ -119,7 +123,9 @@ void CollatorNode::new_masterchain_block_notification(td::Ref } } } + use_compression_ = state->get_consensus_config().proto_version >= 3; // As in ValidatorSessionImpl } + // Remove old cache entries auto it = cache_.begin(); while (it != cache_.end()) { @@ -137,12 +143,6 @@ static td::BufferSlice serialize_error(td::Status error) { return create_serialize_tl_object(error.code(), error.message().c_str()); } -static td::BufferSlice serialize_response(BlockCandidate block) { - return create_serialize_tl_object(create_tl_object( - PublicKey{pubkeys::Ed25519{block.pubkey.as_bits256()}}.tl(), create_tl_block_id(block.id), std::move(block.data), - std::move(block.collated_data))); -} - static BlockCandidate change_creator(BlockCandidate block, Ed25519_PublicKey creator) { CHECK(!block.id.is_masterchain()); if (block.pubkey == creator) { @@ -166,13 +166,15 @@ static BlockCandidate change_creator(BlockCandidate block, Ed25519_PublicKey cre void CollatorNode::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice data, td::Promise promise) { - td::Promise new_promise = [promise = std::move(promise), src](td::Result R) mutable { + td::Promise new_promise = [promise = std::move(promise), src, + compress = use_compression_](td::Result R) mutable { if (R.is_error()) { LOG(WARNING) << "Query from " << src << ", error: " << R.error(); promise.set_result(serialize_error(R.move_as_error())); } else { LOG(INFO) << "Query from " << src << ", success"; - promise.set_result(serialize_response(R.move_as_ok())); + promise.set_result(create_serialize_tl_object( + serialize_candidate(R.move_as_ok(), compress))); } }; if (!last_masterchain_block_.is_valid()) { @@ -297,6 +299,77 @@ bool CollatorNode::can_collate_shard(ShardIdFull shard) const { return false; } +tl_object_ptr CollatorNode::serialize_candidate(const BlockCandidate& block, + bool compress) { + if (!compress) { + return create_tl_object( + PublicKey{pubkeys::Ed25519{block.pubkey.as_bits256()}}.tl(), create_tl_block_id(block.id), block.data.clone(), + block.collated_data.clone()); + } + vm::BagOfCells boc1, boc2; + boc1.deserialize(block.data).ensure(); + std::vector> roots = {boc1.get_root_cell()}; + boc2.deserialize(block.collated_data).ensure(); + for (int i = 0; i < boc2.get_root_count(); ++i) { + roots.push_back(boc2.get_root_cell(i)); + } + auto data = vm::std_boc_serialize_multi(std::move(roots), 2).move_as_ok(); + td::BufferSlice compressed = td::lz4_compress(data); + LOG(DEBUG) << "Compressing candidate: " << block.data.size() + block.collated_data.size() << " -> " + << compressed.size(); + return create_tl_object( + 0, PublicKey{pubkeys::Ed25519{block.pubkey.as_bits256()}}.tl(), create_tl_block_id(block.id), (int)data.size(), + std::move(compressed)); +} + +td::Result CollatorNode::deserialize_candidate(tl_object_ptr f, + int max_decompressed_data_size) { + td::Result res; + ton_api::downcast_call(*f, td::overloaded( + [&](ton_api::collatorNode_candidate& c) { + res = [&]() -> td::Result { + auto hash = td::sha256_bits256(c.collated_data_); + auto key = ton::PublicKey{c.source_}; + if (!key.is_ed25519()) { + return td::Status::Error("invalid pubkey"); + } + auto e_key = Ed25519_PublicKey{key.ed25519_value().raw()}; + return BlockCandidate{e_key, create_block_id(c.id_), hash, std::move(c.data_), + std::move(c.collated_data_)}; + }(); + }, + [&](ton_api::collatorNode_compressedCandidate& c) { + res = [&]() -> td::Result { + if (c.decompressed_size_ <= 0) { + return td::Status::Error("invalid decompressed size"); + } + if (c.decompressed_size_ > max_decompressed_data_size) { + return td::Status::Error("decompressed size is too big"); + } + TRY_RESULT(decompressed, td::lz4_decompress(c.data_, c.decompressed_size_)); + if (decompressed.size() != (size_t)c.decompressed_size_) { + return td::Status::Error("decompressed size mismatch"); + } + TRY_RESULT(roots, vm::std_boc_deserialize_multi(decompressed)); + if (roots.empty()) { + return td::Status::Error("boc is empty"); + } + TRY_RESULT(block_data, vm::std_boc_serialize(roots[0], 31)); + roots.erase(roots.begin()); + TRY_RESULT(collated_data, vm::std_boc_serialize_multi(std::move(roots), 31)); + auto hash = td::sha256_bits256(collated_data); + auto key = ton::PublicKey{c.source_}; + if (!key.is_ed25519()) { + return td::Status::Error("invalid pubkey"); + } + auto e_key = Ed25519_PublicKey{key.ed25519_value().raw()}; + return BlockCandidate{e_key, create_block_id(c.id_), hash, std::move(block_data), + std::move(collated_data)}; + }(); + })); + return res; +} + } // namespace validator } // namespace ton diff --git a/validator/collator-node.hpp b/validator/collator-node.hpp index d640244a7..c1334a850 100644 --- a/validator/collator-node.hpp +++ b/validator/collator-node.hpp @@ -54,6 +54,7 @@ class CollatorNode : public td::actor::Actor { BlockIdExt last_masterchain_block_{}; std::map last_top_blocks_; + bool use_compression_ = false; struct CacheEntry { bool started = false; @@ -77,6 +78,11 @@ class CollatorNode : public td::actor::Actor { } void process_result(std::shared_ptr cache_entry, td::Result R); + + public: + static tl_object_ptr serialize_candidate(const BlockCandidate& block, bool compress); + static td::Result deserialize_candidate(tl_object_ptr f, + int max_decompressed_data_size); }; } // namespace validator diff --git a/validator/validator-group.cpp b/validator/validator-group.cpp index 5c7bef5b3..8f73620c3 100644 --- a/validator/validator-group.cpp +++ b/validator/validator-group.cpp @@ -23,6 +23,7 @@ #include "common/delay.h" #include "ton/ton-tl.hpp" #include "td/utils/Random.h" +#include "collator-node.hpp" namespace ton { @@ -450,7 +451,7 @@ void ValidatorGroup::send_collate_query(td::uint32 round_id, td::Timestamp timeo std::move(promise)); }); LOG(INFO) << "sending collate query for " << next_block_id.to_str() << ": send to " << collator; - size_t max_answer_size = config_.max_block_size + config_.max_collated_data_size + 256; + size_t max_answer_size = config_.max_block_size + config_.max_collated_data_size + 1024; td::Timestamp query_timeout = td::Timestamp::in(10.0); query_timeout.relax(timeout); td::actor::send_closure(rldp_, &rldp::Rldp::send_query_ex, local_adnl_id_, collator, "collatequery", std::move(P), @@ -464,29 +465,26 @@ void ValidatorGroup::receive_collate_query_response(td::uint32 round_id, td::Buf return; } TRY_RESULT_PROMISE(promise, f, fetch_tl_object(data, true)); - tl_object_ptr b; + td::Result res; ton_api::downcast_call(*f, td::overloaded( [&](ton_api::collatorNode_generateBlockError &r) { td::Status error = td::Status::Error(r.code_, r.message_); - promise.set_error(error.move_as_error_prefix("collate query: ")); + res = error.move_as_error_prefix("collate query: "); }, - [&](ton_api::collatorNode_generateBlockSuccess &r) { b = std::move(r.candidate_); })); - if (!b) { - return; - } - auto key = PublicKey{b->source_}; - if (key != local_id_full_) { + [&](ton_api::collatorNode_generateBlockSuccess &r) { + res = CollatorNode::deserialize_candidate( + std::move(r.candidate_), + config_.max_block_size + config_.max_collated_data_size + 1024); + })); + TRY_RESULT_PROMISE(promise, candidate, std::move(res)); + if (candidate.pubkey.as_bits256() != local_id_full_.ed25519_value().raw()) { promise.set_error(td::Status::Error("collate query: block candidate source mismatch")); return; } - auto e_key = Ed25519_PublicKey{key.ed25519_value().raw()}; - auto block_id = ton::create_block_id(b->id_); - if (block_id.shard_full() != shard_) { + if (candidate.id.shard_full() != shard_) { promise.set_error(td::Status::Error("collate query: shard mismatch")); return; } - auto collated_data_hash = td::sha256_bits256(b->collated_data_); - BlockCandidate candidate(e_key, block_id, collated_data_hash, std::move(b->data_), std::move(b->collated_data_)); auto P = td::PromiseCreator::lambda( [candidate = candidate.clone(), promise = std::move(promise)](td::Result R) mutable {