From 41a51d11fc902a42b0384a61c836cb779f1be8b4 Mon Sep 17 00:00:00 2001 From: Boris Erakhtin Date: Thu, 26 Dec 2024 13:05:27 +0500 Subject: [PATCH 01/20] Parallel blocks download --- core/application/app_configuration.hpp | 2 + .../impl/app_configuration_impl.cpp | 18 +++-- .../impl/app_configuration_impl.hpp | 5 ++ core/network/impl/synchronizer_impl.cpp | 67 ++++++++++++++----- core/network/impl/synchronizer_impl.hpp | 3 + .../application/app_configuration_mock.hpp | 2 + 6 files changed, 75 insertions(+), 22 deletions(-) diff --git a/core/application/app_configuration.hpp b/core/application/app_configuration.hpp index 6a5807553c..ec81119f58 100644 --- a/core/application/app_configuration.hpp +++ b/core/application/app_configuration.hpp @@ -325,6 +325,8 @@ namespace kagome::application { const = 0; virtual std::optional precompileWasm() const = 0; + + virtual uint32_t maxParallelDownloads() const = 0; }; } // namespace kagome::application diff --git a/core/application/impl/app_configuration_impl.cpp b/core/application/impl/app_configuration_impl.cpp index 7e34615377..6fd5eed680 100644 --- a/core/application/impl/app_configuration_impl.cpp +++ b/core/application/impl/app_configuration_impl.cpp @@ -115,6 +115,7 @@ namespace { #endif const uint32_t def_db_cache_size = 1024; const uint32_t def_parachain_runtime_instance_cache_size = 100; + const uint32_t def_max_parallel_downloads = 5; /** * Generate once at run random node name if form of UUID @@ -176,12 +177,11 @@ namespace { static constexpr std::array - interpreters { + interpreters{ #if KAGOME_WASM_COMPILER_WASM_EDGE == 1 - "WasmEdge", + "WasmEdge", #endif - "Binaryen" - }; + "Binaryen"}; static const std::string interpreters_str = fmt::format("[{}]", fmt::join(interpreters, ", ")); @@ -841,6 +841,9 @@ namespace kagome::application { ("rpc-methods", po::value(), R"("auto" (default), "unsafe", "safe")") ("no-mdns", po::bool_switch(), "(unused, zombienet stub)") ("prometheus-external", po::bool_switch(), "alias for \"--prometheus-host 0.0.0.0\"") + ("max-parallel-downloads", po::value()->default_value(def_max_parallel_downloads), + "Maximum number of peers from which to ask for the same blocks in parallel." + "This allows downloading announced blocks from multiple peers. Decrease to save traffic and risk increased latency.") ; po::options_description development_desc("Additional options"); @@ -912,8 +915,8 @@ namespace kagome::application { } if (vm.count("help") > 0) { - std::cout - << "Available subcommands: storage-explorer db-editor benchmark key\n"; + std::cout << "Available subcommands: storage-explorer db-editor " + "benchmark key\n"; std::cout << desc << '\n'; return false; } @@ -1600,6 +1603,9 @@ namespace kagome::application { runtime_exec_method_ = RuntimeExecutionMethod::Compile; } + max_parallel_downloads_ = + find_argument(vm, "max-parallel-downloads") + .value_or(def_max_parallel_downloads); // if something wrong with config print help message if (not validate_config()) { std::cout << desc << '\n'; diff --git a/core/application/impl/app_configuration_impl.hpp b/core/application/impl/app_configuration_impl.hpp index 2421b9f874..f676485dd9 100644 --- a/core/application/impl/app_configuration_impl.hpp +++ b/core/application/impl/app_configuration_impl.hpp @@ -239,6 +239,10 @@ namespace kagome::application { return precompile_wasm_; } + uint32_t maxParallelDownloads() const override { + return max_parallel_downloads_; + } + private: void parse_general_segment(const rapidjson::Value &val); void parse_blockchain_segment(const rapidjson::Value &val); @@ -382,6 +386,7 @@ namespace kagome::application { std::max(std::thread::hardware_concurrency(), 1)}; bool disable_secure_mode_{false}; std::optional precompile_wasm_; + uint32_t max_parallel_downloads_; }; } // namespace kagome::application diff --git a/core/network/impl/synchronizer_impl.cpp b/core/network/impl/synchronizer_impl.cpp index a2f9292b37..1857af6fd6 100644 --- a/core/network/impl/synchronizer_impl.cpp +++ b/core/network/impl/synchronizer_impl.cpp @@ -86,6 +86,8 @@ namespace { namespace kagome::network { + static constexpr auto default_max_peers_for_block_request = 5; + SynchronizerImpl::SynchronizerImpl( const application::AppConfiguration &app_config, application::AppStateManager &app_state_manager, @@ -122,7 +124,8 @@ namespace kagome::network { chain_sub_engine_(std::move(chain_sub_engine)), main_pool_handler_{ poolHandlerReadyMake(app_state_manager, main_thread_pool)}, - block_storage_{std::move(block_storage)} { + block_storage_{std::move(block_storage)}, + best_block_number_{} { BOOST_ASSERT(block_tree_); BOOST_ASSERT(block_executor_); BOOST_ASSERT(trie_node_db_); @@ -137,6 +140,7 @@ namespace kagome::network { BOOST_ASSERT(block_storage_); sync_method_ = app_config.syncMethod(); + max_parallel_downloads_ = app_config.maxParallelDownloads(); // Register metrics metrics_registry_->registerGaugeFamily( @@ -352,6 +356,8 @@ namespace kagome::network { known_blocks_.find(header.parent_hash) != known_blocks_.end() or block_tree_->has(header.parent_hash); + best_block_number_ = std::max(best_block_number_, header.number); + if (parent_is_known) { loadBlocks(peer_id, block_info, [wp{weak_from_this()}](auto res) { if (auto self = wp.lock()) { @@ -1192,7 +1198,39 @@ namespace kagome::network { } } }; - + // TODO: remove active_peers filling mechanism, when peer manager + // methods are implemented correctly + std::vector active_peers; + peer_manager_->enumeratePeerState( + [wp{weak_from_this()}, &active_peers, &peer_id]( + const libp2p::peer::PeerId &peer, network::PeerState &state) { + if (auto self = wp.lock()) { + if (self->best_block_number_ <= state.best_block.number + and peer != peer_id) { + active_peers.push_back(peer); + } + } + return true; + }); + std::vector selected_peers; + selected_peers.push_back(peer_id); + static const auto number_of_peers_to_add = + max_parallel_downloads_ ? max_parallel_downloads_ - 1 : 0; + if (const auto active_peers_size = active_peers.size(); + active_peers_size <= number_of_peers_to_add) { + for (const auto &p_id : active_peers) { + selected_peers.push_back(p_id); + } + } else { + std::vector indices(active_peers_size); + std::iota(indices.begin(), indices.end(), 0); + std::random_device rd; + std::mt19937 gen(rd()); + std::ranges::shuffle(indices.begin(), indices.end(), gen); + for (uint32_t i = 0; i < number_of_peers_to_add; ++i) { + selected_peers.push_back(active_peers[indices[i]]); + } + } if (sync_method_ == application::SyncMethod::Full) { auto lower = generations_.begin()->number; auto upper = generations_.rbegin()->number + 1; @@ -1209,7 +1247,10 @@ namespace kagome::network { lower, upper, hint, - [wp{weak_from_this()}, peer_id, handler = std::move(handler)]( + [wp{weak_from_this()}, + peer_id, + handler = std::move(handler), + selected_peers = std::move(selected_peers)]( outcome::result res) { if (auto self = wp.lock()) { if (not res.has_value()) { @@ -1221,22 +1262,16 @@ namespace kagome::network { return; } auto &common_block_info = res.value(); - SL_DEBUG(self->log_, - "Start to load next portion of blocks from {} " - "since block {}", - peer_id, - common_block_info); - self->loadBlocks( - peer_id, common_block_info, std::move(handler)); + for (const auto &p_id : selected_peers) { + self->loadBlocks( + p_id, common_block_info, std::move(handler)); + } } }); } else { - SL_DEBUG(log_, - "Start to load next portion of blocks from {} " - "since block {}", - peer_id, - block_info); - loadBlocks(peer_id, block_info, std::move(handler)); + for (const auto &p_id : selected_peers) { + loadBlocks(p_id, block_info, std::move(handler)); + } } return; } diff --git a/core/network/impl/synchronizer_impl.hpp b/core/network/impl/synchronizer_impl.hpp index 6f932976c4..65664160f0 100644 --- a/core/network/impl/synchronizer_impl.hpp +++ b/core/network/impl/synchronizer_impl.hpp @@ -261,6 +261,9 @@ namespace kagome::network { primitives::events::ChainSubscriptionEnginePtr chain_sub_engine_; std::shared_ptr main_pool_handler_; std::shared_ptr block_storage_; + primitives::BlockNumber best_block_number_; + uint32_t max_parallel_downloads_; + application::SyncMethod sync_method_; diff --git a/test/mock/core/application/app_configuration_mock.hpp b/test/mock/core/application/app_configuration_mock.hpp index 3fa0d2761e..8e67103f16 100644 --- a/test/mock/core/application/app_configuration_mock.hpp +++ b/test/mock/core/application/app_configuration_mock.hpp @@ -197,6 +197,8 @@ namespace kagome::application { precompileWasm, (), (const, override)); + + MOCK_METHOD(uint32_t, maxParallelDownloads, (), (const, override)); }; } // namespace kagome::application From d65d6805ea843ff16ec78893210992b994309108 Mon Sep 17 00:00:00 2001 From: Boris Erakhtin Date: Thu, 26 Dec 2024 13:41:15 +0500 Subject: [PATCH 02/20] Correct TODO format --- core/network/impl/synchronizer_impl.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/network/impl/synchronizer_impl.cpp b/core/network/impl/synchronizer_impl.cpp index 7afd6bf6c2..ef86d94989 100644 --- a/core/network/impl/synchronizer_impl.cpp +++ b/core/network/impl/synchronizer_impl.cpp @@ -1199,8 +1199,9 @@ namespace kagome::network { } } }; - // TODO: remove active_peers filling mechanism, when peer manager - // methods are implemented correctly + // TODO(ErakhtinB): #2326, review peer manager + // remove active_peers filling mechanism over peer states, when peer + // manager methods are implemented correctly std::vector active_peers; peer_manager_->enumeratePeerState( [wp{weak_from_this()}, &active_peers, &peer_id]( From 7714e6121bf73dde549122b110341256b9e2eebf Mon Sep 17 00:00:00 2001 From: Boris Erakhtin Date: Thu, 26 Dec 2024 13:45:13 +0500 Subject: [PATCH 03/20] Ranges for indices --- core/network/impl/synchronizer_impl.cpp | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/network/impl/synchronizer_impl.cpp b/core/network/impl/synchronizer_impl.cpp index ef86d94989..940b6f3e97 100644 --- a/core/network/impl/synchronizer_impl.cpp +++ b/core/network/impl/synchronizer_impl.cpp @@ -1224,11 +1224,13 @@ namespace kagome::network { selected_peers.push_back(p_id); } } else { - std::vector indices(active_peers_size); - std::iota(indices.begin(), indices.end(), 0); + std::vector indices; + for (int i = 0; i < active_peers_size; ++i) { + indices.push_back(i); + } std::random_device rd; std::mt19937 gen(rd()); - std::ranges::shuffle(indices.begin(), indices.end(), gen); + std::ranges::shuffle(indices, gen); for (uint32_t i = 0; i < number_of_peers_to_add; ++i) { selected_peers.push_back(active_peers[indices[i]]); } From 2aa0cf060d07a09a55589a71a49536676538e440 Mon Sep 17 00:00:00 2001 From: Boris Erakhtin Date: Thu, 26 Dec 2024 14:45:18 +0500 Subject: [PATCH 04/20] Build fix --- core/network/impl/synchronizer_impl.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/network/impl/synchronizer_impl.cpp b/core/network/impl/synchronizer_impl.cpp index dc8bef758e..236355c4d6 100644 --- a/core/network/impl/synchronizer_impl.cpp +++ b/core/network/impl/synchronizer_impl.cpp @@ -1224,9 +1224,9 @@ namespace kagome::network { selected_peers.push_back(p_id); } } else { - std::vector indices; + std::vector indices; indices.reserve(active_peers_size); - for (int i = 0; i < active_peers_size; ++i) { + for (uint32_t i = 0; i < active_peers_size; ++i) { indices.push_back(i); } std::random_device rd; From 35daeb75d46d241f39a3dda10dd82a36f801b9da Mon Sep 17 00:00:00 2001 From: Boris Erakhtin Date: Thu, 26 Dec 2024 19:57:34 +0500 Subject: [PATCH 05/20] No peer manager is used to work with active peers --- core/network/impl/synchronizer_impl.cpp | 99 ++++++++----------------- core/network/impl/synchronizer_impl.hpp | 1 - 2 files changed, 32 insertions(+), 68 deletions(-) diff --git a/core/network/impl/synchronizer_impl.cpp b/core/network/impl/synchronizer_impl.cpp index 236355c4d6..30b3fd55e5 100644 --- a/core/network/impl/synchronizer_impl.cpp +++ b/core/network/impl/synchronizer_impl.cpp @@ -87,8 +87,6 @@ namespace { namespace kagome::network { - static constexpr auto default_max_peers_for_block_request = 5; - SynchronizerImpl::SynchronizerImpl( const application::AppConfiguration &app_config, application::AppStateManager &app_state_manager, @@ -125,8 +123,7 @@ namespace kagome::network { chain_sub_engine_(std::move(chain_sub_engine)), main_pool_handler_{ poolHandlerReadyMake(app_state_manager, main_thread_pool)}, - block_storage_{std::move(block_storage)}, - best_block_number_{} { + block_storage_{std::move(block_storage)} { BOOST_ASSERT(block_tree_); BOOST_ASSERT(block_executor_); BOOST_ASSERT(trie_node_db_); @@ -357,8 +354,6 @@ namespace kagome::network { known_blocks_.find(header.parent_hash) != known_blocks_.end() or block_tree_->has(header.parent_hash); - best_block_number_ = std::max(best_block_number_, header.number); - if (parent_is_known) { loadBlocks(peer_id, block_info, [wp{weak_from_this()}](auto res) { if (auto self = wp.lock()) { @@ -1163,19 +1158,24 @@ namespace kagome::network { continue; } - for (auto p_it = peers.begin(); p_it != peers.end();) { - auto cp_it = p_it++; - - auto &peer_id = *cp_it; - - if (busy_peers_.find(peer_id) != busy_peers_.end()) { - SL_TRACE(log_, "Peer {} for block {} is busy", peer_id, block_info); - continue; + std::vector active_peers; + for (auto it = peers.begin(); it != peers.end(); ++it) { + const auto &peer_id = *it; + if (busy_peers_.find(peer_id) == busy_peers_.end()) { + active_peers.push_back(peer_id); } - - busy_peers_.insert(peers.extract(cp_it)); - SL_TRACE(log_, "Peer {} marked as busy", peer_id); - + } + std::random_device rd; + std::mt19937 gen(rd()); + std::ranges::shuffle(active_peers, gen); + while (active_peers.size() + > static_cast(max_parallel_downloads_)) { + active_peers.pop_back(); + } + for (auto it = active_peers.begin(); it != active_peers.end(); ++it) { + auto &peer_id = *it; + busy_peers_.emplace(peer_id); + peers.erase(peer_id); auto handler = [wp{weak_from_this()}, peer_id](const auto &res) { if (auto self = wp.lock()) { if (self->busy_peers_.erase(peer_id) > 0) { @@ -1199,43 +1199,6 @@ namespace kagome::network { } } }; - // TODO(ErakhtinB): #2326, review peer manager - // remove active_peers filling mechanism over peer states, when peer - // manager methods are implemented correctly - std::vector active_peers; - peer_manager_->enumeratePeerState( - [wp{weak_from_this()}, &active_peers, &peer_id]( - const libp2p::peer::PeerId &peer, network::PeerState &state) { - if (auto self = wp.lock()) { - if (self->best_block_number_ <= state.best_block.number - and peer != peer_id) { - active_peers.push_back(peer); - } - } - return true; - }); - std::vector selected_peers; - selected_peers.push_back(peer_id); - static const auto number_of_peers_to_add = - max_parallel_downloads_ ? max_parallel_downloads_ - 1 : 0; - if (const auto active_peers_size = active_peers.size(); - active_peers_size <= number_of_peers_to_add) { - for (const auto &p_id : active_peers) { - selected_peers.push_back(p_id); - } - } else { - std::vector indices; - indices.reserve(active_peers_size); - for (uint32_t i = 0; i < active_peers_size; ++i) { - indices.push_back(i); - } - std::random_device rd; - std::mt19937 gen(rd()); - std::ranges::shuffle(indices, gen); - for (uint32_t i = 0; i < number_of_peers_to_add; ++i) { - selected_peers.push_back(active_peers[indices[i]]); - } - } if (sync_method_ == application::SyncMethod::Full) { auto lower = generations_.begin()->number; auto upper = generations_.rbegin()->number + 1; @@ -1252,10 +1215,7 @@ namespace kagome::network { lower, upper, hint, - [wp{weak_from_this()}, - peer_id, - handler = std::move(handler), - selected_peers = std::move(selected_peers)]( + [wp{weak_from_this()}, peer_id, handler = std::move(handler)]( outcome::result res) { if (auto self = wp.lock()) { if (not res.has_value()) { @@ -1267,18 +1227,23 @@ namespace kagome::network { return; } auto &common_block_info = res.value(); - for (const auto &p_id : selected_peers) { - self->loadBlocks( - p_id, common_block_info, std::move(handler)); - } + SL_DEBUG(self->log_, + "Start to load next portion of blocks from {} " + "since block {}", + peer_id, + common_block_info); + self->loadBlocks( + peer_id, common_block_info, std::move(handler)); } }); } else { - for (const auto &p_id : selected_peers) { - loadBlocks(p_id, block_info, std::move(handler)); - } + SL_DEBUG(log_, + "Start to load next portion of blocks from {} " + "since block {}", + peer_id, + block_info); + loadBlocks(peer_id, block_info, std::move(handler)); } - return; } SL_TRACE(log_, diff --git a/core/network/impl/synchronizer_impl.hpp b/core/network/impl/synchronizer_impl.hpp index 65664160f0..daa610b9b3 100644 --- a/core/network/impl/synchronizer_impl.hpp +++ b/core/network/impl/synchronizer_impl.hpp @@ -261,7 +261,6 @@ namespace kagome::network { primitives::events::ChainSubscriptionEnginePtr chain_sub_engine_; std::shared_ptr main_pool_handler_; std::shared_ptr block_storage_; - primitives::BlockNumber best_block_number_; uint32_t max_parallel_downloads_; From dd28c19972c6da874fbec9f986727adb7372632f Mon Sep 17 00:00:00 2001 From: Boris Erakhtin Date: Thu, 26 Dec 2024 20:38:45 +0500 Subject: [PATCH 06/20] clang tidy fixes --- core/network/impl/synchronizer_impl.cpp | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/core/network/impl/synchronizer_impl.cpp b/core/network/impl/synchronizer_impl.cpp index 30b3fd55e5..26333af9a2 100644 --- a/core/network/impl/synchronizer_impl.cpp +++ b/core/network/impl/synchronizer_impl.cpp @@ -1159,8 +1159,7 @@ namespace kagome::network { } std::vector active_peers; - for (auto it = peers.begin(); it != peers.end(); ++it) { - const auto &peer_id = *it; + for (const auto &peer_id : peers) { if (busy_peers_.find(peer_id) == busy_peers_.end()) { active_peers.push_back(peer_id); } @@ -1172,8 +1171,7 @@ namespace kagome::network { > static_cast(max_parallel_downloads_)) { active_peers.pop_back(); } - for (auto it = active_peers.begin(); it != active_peers.end(); ++it) { - auto &peer_id = *it; + for (const auto& peer_id : active_peers) { busy_peers_.emplace(peer_id); peers.erase(peer_id); auto handler = [wp{weak_from_this()}, peer_id](const auto &res) { From 5c6d66d61bf41535370a4417003e538e487c1f48 Mon Sep 17 00:00:00 2001 From: Boris Erakhtin Date: Thu, 26 Dec 2024 21:11:56 +0500 Subject: [PATCH 07/20] Better erasing from active_peers --- core/network/impl/synchronizer_impl.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/network/impl/synchronizer_impl.cpp b/core/network/impl/synchronizer_impl.cpp index 26333af9a2..78ee8792b2 100644 --- a/core/network/impl/synchronizer_impl.cpp +++ b/core/network/impl/synchronizer_impl.cpp @@ -1167,11 +1167,11 @@ namespace kagome::network { std::random_device rd; std::mt19937 gen(rd()); std::ranges::shuffle(active_peers, gen); - while (active_peers.size() - > static_cast(max_parallel_downloads_)) { - active_peers.pop_back(); + if (active_peers.size() > static_cast(max_parallel_downloads_)) { + active_peers.erase(active_peers.begin() + max_parallel_downloads_, + active_peers.end()); } - for (const auto& peer_id : active_peers) { + for (const auto &peer_id : active_peers) { busy_peers_.emplace(peer_id); peers.erase(peer_id); auto handler = [wp{weak_from_this()}, peer_id](const auto &res) { From 4f9a09bcab2847aff59c7bd2320afb95e7fea363 Mon Sep 17 00:00:00 2001 From: Boris Erakhtin Date: Fri, 27 Dec 2024 11:15:33 +0500 Subject: [PATCH 08/20] random gen is member of sync impl --- core/network/impl/synchronizer_impl.cpp | 5 ++--- core/network/impl/synchronizer_impl.hpp | 3 ++- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/network/impl/synchronizer_impl.cpp b/core/network/impl/synchronizer_impl.cpp index 78ee8792b2..ec824ca073 100644 --- a/core/network/impl/synchronizer_impl.cpp +++ b/core/network/impl/synchronizer_impl.cpp @@ -139,6 +139,7 @@ namespace kagome::network { sync_method_ = app_config.syncMethod(); max_parallel_downloads_ = app_config.maxParallelDownloads(); + random_gen_ = std::mt19937(std::random_device{}()); // Register metrics metrics_registry_->registerGaugeFamily( @@ -1164,9 +1165,7 @@ namespace kagome::network { active_peers.push_back(peer_id); } } - std::random_device rd; - std::mt19937 gen(rd()); - std::ranges::shuffle(active_peers, gen); + std::ranges::shuffle(active_peers, random_gen_); if (active_peers.size() > static_cast(max_parallel_downloads_)) { active_peers.erase(active_peers.begin() + max_parallel_downloads_, active_peers.end()); diff --git a/core/network/impl/synchronizer_impl.hpp b/core/network/impl/synchronizer_impl.hpp index daa610b9b3..fa485a0e65 100644 --- a/core/network/impl/synchronizer_impl.hpp +++ b/core/network/impl/synchronizer_impl.hpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include @@ -262,7 +263,7 @@ namespace kagome::network { std::shared_ptr main_pool_handler_; std::shared_ptr block_storage_; uint32_t max_parallel_downloads_; - + std::mt19937 random_gen_; application::SyncMethod sync_method_; From 0807169d98b94a00a7378acd04b94e45edd03976 Mon Sep 17 00:00:00 2001 From: Boris Erakhtin Date: Fri, 27 Dec 2024 13:07:57 +0500 Subject: [PATCH 09/20] Just log, remove --- core/network/impl/synchronizer_impl.cpp | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/core/network/impl/synchronizer_impl.cpp b/core/network/impl/synchronizer_impl.cpp index ec824ca073..a678198a08 100644 --- a/core/network/impl/synchronizer_impl.cpp +++ b/core/network/impl/synchronizer_impl.cpp @@ -1170,7 +1170,9 @@ namespace kagome::network { active_peers.erase(active_peers.begin() + max_parallel_downloads_, active_peers.end()); } + SL_INFO(log_, "active_peers size {}", active_peers.size()); for (const auto &peer_id : active_peers) { + SL_INFO(log_, "Iterating peer_id {}", peer_id); busy_peers_.emplace(peer_id); peers.erase(peer_id); auto handler = [wp{weak_from_this()}, peer_id](const auto &res) { @@ -1181,13 +1183,13 @@ namespace kagome::network { SL_TRACE(self->log_, "End asking portion of blocks"); self->asking_blocks_portion_in_progress_ = false; if (not res.has_value()) { - SL_DEBUG(self->log_, + SL_INFO(self->log_, "Loading next portion of blocks from {} is failed: {}", peer_id, res.error()); return; } - SL_DEBUG(self->log_, + SL_INFO(self->log_, "Portion of blocks from {} is loaded till {}", peer_id, res.value()); @@ -1216,7 +1218,7 @@ namespace kagome::network { outcome::result res) { if (auto self = wp.lock()) { if (not res.has_value()) { - SL_DEBUG(self->log_, + SL_INFO(self->log_, "Can't load next portion of blocks from {}: {}", peer_id, res.error()); @@ -1224,7 +1226,7 @@ namespace kagome::network { return; } auto &common_block_info = res.value(); - SL_DEBUG(self->log_, + SL_INFO(self->log_, "Start to load next portion of blocks from {} " "since block {}", peer_id, @@ -1234,7 +1236,7 @@ namespace kagome::network { } }); } else { - SL_DEBUG(log_, + SL_INFO(log_, "Start to load next portion of blocks from {} " "since block {}", peer_id, From c8e46b6645c4489aa22e160eadb738b0b3d3e6db Mon Sep 17 00:00:00 2001 From: Boris Erakhtin Date: Fri, 27 Dec 2024 18:02:44 +0500 Subject: [PATCH 10/20] Multiple blocks download by announce --- core/network/impl/synchronizer_impl.cpp | 67 ++++++++++++++++--------- 1 file changed, 43 insertions(+), 24 deletions(-) diff --git a/core/network/impl/synchronizer_impl.cpp b/core/network/impl/synchronizer_impl.cpp index a678198a08..8689618e5b 100644 --- a/core/network/impl/synchronizer_impl.cpp +++ b/core/network/impl/synchronizer_impl.cpp @@ -356,11 +356,31 @@ namespace kagome::network { or block_tree_->has(header.parent_hash); if (parent_is_known) { - loadBlocks(peer_id, block_info, [wp{weak_from_this()}](auto res) { - if (auto self = wp.lock()) { - SL_TRACE(self->log_, "Block(s) enqueued to apply by announce"); + std::vector selected_peers = {peer_id}; + if (auto b_it = known_blocks_.find(from.hash); + b_it != known_blocks_.end()) { + std::vector active_peers; + for (const auto &peer : b_it->second.peers) { + if (peer != peer_id) { + active_peers.push_back(peer); + } } - }); + std::ranges::shuffle(active_peers, random_gen_); + for (auto it = active_peers.begin(); it != active_peers.end(); ++it) { + if (selected_peers.size() >= max_parallel_downloads_) { + break; + } + selected_peers.push_back(*it); + } + } + + for (const auto &p_id : selected_peers) { + loadBlocks(p_id, block_info, [wp{weak_from_this()}](auto res) { + if (auto self = wp.lock()) { + SL_TRACE(self->log_, "Block(s) enqueued to apply by announce"); + } + }); + } return true; } @@ -1159,22 +1179,19 @@ namespace kagome::network { continue; } - std::vector active_peers; - for (const auto &peer_id : peers) { - if (busy_peers_.find(peer_id) == busy_peers_.end()) { - active_peers.push_back(peer_id); + for (auto p_it = peers.begin(); p_it != peers.end();) { + auto cp_it = p_it++; + + auto &peer_id = *cp_it; + + if (busy_peers_.find(peer_id) != busy_peers_.end()) { + SL_TRACE(log_, "Peer {} for block {} is busy", peer_id, block_info); + continue; } - } - std::ranges::shuffle(active_peers, random_gen_); - if (active_peers.size() > static_cast(max_parallel_downloads_)) { - active_peers.erase(active_peers.begin() + max_parallel_downloads_, - active_peers.end()); - } - SL_INFO(log_, "active_peers size {}", active_peers.size()); - for (const auto &peer_id : active_peers) { - SL_INFO(log_, "Iterating peer_id {}", peer_id); - busy_peers_.emplace(peer_id); - peers.erase(peer_id); + + busy_peers_.insert(peers.extract(cp_it)); + SL_TRACE(log_, "Peer {} marked as busy", peer_id); + auto handler = [wp{weak_from_this()}, peer_id](const auto &res) { if (auto self = wp.lock()) { if (self->busy_peers_.erase(peer_id) > 0) { @@ -1183,13 +1200,13 @@ namespace kagome::network { SL_TRACE(self->log_, "End asking portion of blocks"); self->asking_blocks_portion_in_progress_ = false; if (not res.has_value()) { - SL_INFO(self->log_, + SL_DEBUG(self->log_, "Loading next portion of blocks from {} is failed: {}", peer_id, res.error()); return; } - SL_INFO(self->log_, + SL_DEBUG(self->log_, "Portion of blocks from {} is loaded till {}", peer_id, res.value()); @@ -1198,6 +1215,7 @@ namespace kagome::network { } } }; + if (sync_method_ == application::SyncMethod::Full) { auto lower = generations_.begin()->number; auto upper = generations_.rbegin()->number + 1; @@ -1218,7 +1236,7 @@ namespace kagome::network { outcome::result res) { if (auto self = wp.lock()) { if (not res.has_value()) { - SL_INFO(self->log_, + SL_DEBUG(self->log_, "Can't load next portion of blocks from {}: {}", peer_id, res.error()); @@ -1226,7 +1244,7 @@ namespace kagome::network { return; } auto &common_block_info = res.value(); - SL_INFO(self->log_, + SL_DEBUG(self->log_, "Start to load next portion of blocks from {} " "since block {}", peer_id, @@ -1236,13 +1254,14 @@ namespace kagome::network { } }); } else { - SL_INFO(log_, + SL_DEBUG(log_, "Start to load next portion of blocks from {} " "since block {}", peer_id, block_info); loadBlocks(peer_id, block_info, std::move(handler)); } + return; } SL_TRACE(log_, From 9604a316ade02177f393e9e440a0cbcbfd68f8e0 Mon Sep 17 00:00:00 2001 From: Boris Erakhtin Date: Fri, 27 Dec 2024 19:24:46 +0500 Subject: [PATCH 11/20] Random peers downloading only in syncByHeader --- core/network/impl/synchronizer_impl.cpp | 61 +++++++++++++------------ 1 file changed, 33 insertions(+), 28 deletions(-) diff --git a/core/network/impl/synchronizer_impl.cpp b/core/network/impl/synchronizer_impl.cpp index 8689618e5b..8a0c1926ad 100644 --- a/core/network/impl/synchronizer_impl.cpp +++ b/core/network/impl/synchronizer_impl.cpp @@ -330,11 +330,30 @@ namespace kagome::network { return false; } + std::vector selected_peers = {peer_id}; + std::vector active_peers; + peer_manager_->enumeratePeerState( + [&active_peers, &block_info](const PeerId &peer_id, + PeerState &peer_state) { + if (peer_state.best_block >= block_info) { + active_peers.push_back(peer_id); + } + return true; + }); + std::ranges::shuffle(active_peers, random_gen_); + for (auto it = active_peers.begin(); it != active_peers.end(); ++it) { + if (selected_peers.size() >= max_parallel_downloads_) { + break; + } + selected_peers.push_back(*it); + } // Block is already enqueued if (auto it = known_blocks_.find(block_info.hash); it != known_blocks_.end()) { auto &block_in_queue = it->second; - block_in_queue.peers.emplace(peer_id); + for (const auto &p_id : selected_peers) { + block_in_queue.peers.emplace(p_id); + } return false; } @@ -356,24 +375,6 @@ namespace kagome::network { or block_tree_->has(header.parent_hash); if (parent_is_known) { - std::vector selected_peers = {peer_id}; - if (auto b_it = known_blocks_.find(from.hash); - b_it != known_blocks_.end()) { - std::vector active_peers; - for (const auto &peer : b_it->second.peers) { - if (peer != peer_id) { - active_peers.push_back(peer); - } - } - std::ranges::shuffle(active_peers, random_gen_); - for (auto it = active_peers.begin(); it != active_peers.end(); ++it) { - if (selected_peers.size() >= max_parallel_downloads_) { - break; - } - selected_peers.push_back(*it); - } - } - for (const auto &p_id : selected_peers) { loadBlocks(p_id, block_info, [wp{weak_from_this()}](auto res) { if (auto self = wp.lock()) { @@ -385,15 +386,19 @@ namespace kagome::network { } // Otherwise, is using base way to enqueue - return syncByBlockInfo( - block_info, - peer_id, - [wp{weak_from_this()}](auto res) { - if (auto self = wp.lock()) { - SL_TRACE(self->log_, "Block(s) enqueued to load by announce"); - } - }, - false); + auto res = true; + for (const auto &p_id : selected_peers) { + res &= syncByBlockInfo( + block_info, + p_id, + [wp{weak_from_this()}](auto res) { + if (auto self = wp.lock()) { + SL_TRACE(self->log_, "Block(s) enqueued to load by announce"); + } + }, + false); + } + return res; } void SynchronizerImpl::findCommonBlock( From ab41109565574e6941aeff0afbe8bcce698d1b96 Mon Sep 17 00:00:00 2001 From: Boris Erakhtin Date: Fri, 27 Dec 2024 19:42:10 +0500 Subject: [PATCH 12/20] Check for peer id while iteratring over peer states --- core/network/impl/synchronizer_impl.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/network/impl/synchronizer_impl.cpp b/core/network/impl/synchronizer_impl.cpp index 8a0c1926ad..d2c569fa2c 100644 --- a/core/network/impl/synchronizer_impl.cpp +++ b/core/network/impl/synchronizer_impl.cpp @@ -333,10 +333,10 @@ namespace kagome::network { std::vector selected_peers = {peer_id}; std::vector active_peers; peer_manager_->enumeratePeerState( - [&active_peers, &block_info](const PeerId &peer_id, - PeerState &peer_state) { - if (peer_state.best_block >= block_info) { - active_peers.push_back(peer_id); + [&active_peers, &block_info, &peer_id](const PeerId &p_id, + PeerState &peer_state) { + if (peer_state.best_block >= block_info and p_id != peer_id) { + active_peers.push_back(p_id); } return true; }); From b5630f49e1b6cfda7d3816c223235bd75e3c0b09 Mon Sep 17 00:00:00 2001 From: Boris Erakhtin Date: Tue, 31 Dec 2024 16:17:49 +0500 Subject: [PATCH 13/20] Maximum downloads are handled correct --- core/blockchain/impl/block_tree_impl.cpp | 1 - core/network/impl/synchronizer_impl.cpp | 77 +++++++++++++++++------- core/network/impl/synchronizer_impl.hpp | 3 +- 3 files changed, 56 insertions(+), 25 deletions(-) diff --git a/core/blockchain/impl/block_tree_impl.cpp b/core/blockchain/impl/block_tree_impl.cpp index 115d556a1a..c17bf6e557 100644 --- a/core/blockchain/impl/block_tree_impl.cpp +++ b/core/blockchain/impl/block_tree_impl.cpp @@ -270,7 +270,6 @@ namespace kagome::blockchain { std::move(justification_storage_policy), state_pruner, main_thread_pool)); - // Add non-finalized block to the block tree for (auto &e : collected) { const auto &block = e.first; diff --git a/core/network/impl/synchronizer_impl.cpp b/core/network/impl/synchronizer_impl.cpp index d2c569fa2c..11fd22bf8b 100644 --- a/core/network/impl/synchronizer_impl.cpp +++ b/core/network/impl/synchronizer_impl.cpp @@ -330,31 +330,43 @@ namespace kagome::network { return false; } + // Block is already enqueued + if (auto it = known_blocks_.find(block_info.hash); + it != known_blocks_.end()) { + auto &block_in_queue = it->second; + block_in_queue.peers.emplace(peer_id); + return false; + } + + // Already enough parallel downloads + { + std::shared_lock lock{load_blocks_mutex_}; + if (auto it = load_blocks_.find(block_info); it != load_blocks_.end()) { + if (it->second >= max_parallel_downloads_) { + return false; + } + } + } + std::vector selected_peers = {peer_id}; std::vector active_peers; peer_manager_->enumeratePeerState( - [&active_peers, &block_info, &peer_id](const PeerId &p_id, - PeerState &peer_state) { - if (peer_state.best_block >= block_info and p_id != peer_id) { + [&active_peers, &peer_id](const PeerId &p_id, PeerState &) { + if (p_id != peer_id) { active_peers.push_back(p_id); } return true; }); - std::ranges::shuffle(active_peers, random_gen_); - for (auto it = active_peers.begin(); it != active_peers.end(); ++it) { - if (selected_peers.size() >= max_parallel_downloads_) { - break; - } - selected_peers.push_back(*it); - } - // Block is already enqueued - if (auto it = known_blocks_.find(block_info.hash); - it != known_blocks_.end()) { - auto &block_in_queue = it->second; - for (const auto &p_id : selected_peers) { - block_in_queue.peers.emplace(p_id); - } - return false; + static const auto peers_to_add_number = + max_parallel_downloads_ ? max_parallel_downloads_ - 1 : 0; + if (active_peers.size() <= peers_to_add_number) { + selected_peers.insert( + selected_peers.end(), active_peers.begin(), active_peers.end()); + } else { + std::ranges::shuffle(active_peers, random_gen_); + selected_peers.insert(selected_peers.end(), + active_peers.begin(), + active_peers.begin() + peers_to_add_number); } // Number of provided block header greater currently watched. @@ -580,12 +592,20 @@ namespace kagome::network { return; } - if (not load_blocks_.emplace(from).second) { - if (handler) { - handler(Error::ALREADY_IN_QUEUE); + { + std::unique_lock lock{load_blocks_mutex_}; + if (auto [it, ok] = load_blocks_.emplace(from, 1); not ok) { + auto &requests_number = it->second; + if (requests_number >= max_parallel_downloads_) { + if (handler) { + handler(Error::ALREADY_IN_QUEUE); + } + return; + } + ++requests_number; } - return; } + load_blocks_max_ = {from.number, now}; auto response_handler = @@ -600,7 +620,16 @@ namespace kagome::network { if (not self) { return; } - self->load_blocks_.erase(from); + { + std::unique_lock lock{self->load_blocks_mutex_}; + if (auto it = self->load_blocks_.find(from); + it != self->load_blocks_.end()) { + auto &requests_number = it->second; + if (requests_number) { + --requests_number; + } + } + } // Any error interrupts loading of blocks if (response_res.has_error()) { @@ -782,6 +811,8 @@ namespace kagome::network { }); self->metric_import_queue_length_->set( self->known_blocks_.size()); + std::unique_lock lock{self->load_blocks_mutex_}; + self->load_blocks_.erase(from); } else { it->second.peers.emplace(peer_id); SL_TRACE(self->log_, diff --git a/core/network/impl/synchronizer_impl.hpp b/core/network/impl/synchronizer_impl.hpp index fa485a0e65..18dcb344ad 100644 --- a/core/network/impl/synchronizer_impl.hpp +++ b/core/network/impl/synchronizer_impl.hpp @@ -312,7 +312,8 @@ namespace kagome::network { std::atomic_bool asking_blocks_portion_in_progress_ = false; std::set busy_peers_; - std::unordered_set load_blocks_; + std::unordered_map load_blocks_; + std::shared_mutex load_blocks_mutex_; std::pair load_blocks_max_{}; From 4a3ddce8850206a829aaf6ee94d10afc0f361df9 Mon Sep 17 00:00:00 2001 From: Boris Erakhtin Date: Tue, 31 Dec 2024 20:00:30 +0500 Subject: [PATCH 14/20] Best way to handle --- core/network/impl/synchronizer_impl.cpp | 420 ++++++++++++------------ 1 file changed, 211 insertions(+), 209 deletions(-) diff --git a/core/network/impl/synchronizer_impl.cpp b/core/network/impl/synchronizer_impl.cpp index 11fd22bf8b..5c6f2966b5 100644 --- a/core/network/impl/synchronizer_impl.cpp +++ b/core/network/impl/synchronizer_impl.cpp @@ -608,245 +608,247 @@ namespace kagome::network { load_blocks_max_ = {from.number, now}; - auto response_handler = - [wp{weak_from_this()}, - from, - peer_id, - handler = std::move(handler), - need_body = has(request.fields, BlockAttribute::BODY), - parent_hash = primitives::BlockHash{}]( - outcome::result response_res) mutable { - auto self = wp.lock(); - if (not self) { - return; - } - { - std::unique_lock lock{self->load_blocks_mutex_}; - if (auto it = self->load_blocks_.find(from); - it != self->load_blocks_.end()) { - auto &requests_number = it->second; - if (requests_number) { - --requests_number; - } - } - } + auto response_handler = [wp{weak_from_this()}, + from, + peer_id, + handler = std::move(handler), + need_body = + has(request.fields, BlockAttribute::BODY), + parent_hash = primitives::BlockHash{}]( + outcome::result + response_res) mutable { + auto self = wp.lock(); + if (not self) { + return; + } - // Any error interrupts loading of blocks - if (response_res.has_error()) { - SL_VERBOSE(self->log_, - "Can't load blocks from {} beginning block {}: {}", - peer_id, - from, - response_res.error()); - if (handler) { - handler(response_res.as_failure()); - } - return; - } - auto &blocks = response_res.value().blocks; - - // No block in response is abnormal situation. - // At least one starting block should be returned as existing - if (blocks.empty()) { - SL_VERBOSE(self->log_, - "Can't load blocks from {} beginning block {}: " - "Response does not have any blocks", - peer_id, - from); - if (handler) { - handler(Error::EMPTY_RESPONSE); - } - return; - } + // Any error interrupts loading of blocks + if (response_res.has_error()) { + SL_VERBOSE(self->log_, + "Can't load blocks from {} beginning block {}: {}", + peer_id, + from, + response_res.error()); + if (handler) { + handler(response_res.as_failure()); + } + return; + } + auto &blocks = response_res.value().blocks; - SL_TRACE(self->log_, - "{} blocks are loaded from {} beginning block {}", - blocks.size(), + // No block in response is abnormal situation. + // At least one starting block should be returned as existing + if (blocks.empty()) { + SL_VERBOSE(self->log_, + "Can't load blocks from {} beginning block {}: " + "Response does not have any blocks", peer_id, from); + if (handler) { + handler(Error::EMPTY_RESPONSE); + } + return; + } - if (blocks[0].header - and blocks[0].header->number - > self->block_tree_->getLastFinalized().number - and not self->known_blocks_.contains( - blocks[0].header->parent_hash) - and not self->block_tree_->has(blocks[0].header->parent_hash)) { - if (handler) { - handler(Error::DISCARDED_BLOCK); + SL_TRACE(self->log_, + "{} blocks are loaded from {} beginning block {}", + blocks.size(), + peer_id, + from); + + if (blocks[0].header + and blocks[0].header->number + > self->block_tree_->getLastFinalized().number + and not self->known_blocks_.contains(blocks[0].header->parent_hash) + and not self->block_tree_->has(blocks[0].header->parent_hash)) { + if (handler) { + handler(Error::DISCARDED_BLOCK); + } + return; + } + + bool some_blocks_added = false; + primitives::BlockInfo last_loaded_block; + + for (auto &block : blocks) { + // Check if header is provided + if (not block.header.has_value()) { + SL_VERBOSE(self->log_, + "Can't load blocks from {} starting from block {}: " + "Received block without header", + peer_id, + from); + if (handler) { + handler(Error::RESPONSE_WITHOUT_BLOCK_HEADER); + } + return; + } + auto &header = block.header.value(); + { + std::unique_lock lock{self->load_blocks_mutex_}; + if (auto it = + self->load_blocks_.find(BlockInfo(header.number, block.hash)); + it != self->load_blocks_.end()) { + auto &requests_number = it->second; + if (requests_number > 1) { + --requests_number; + } else { + self->load_blocks_.erase(it); } - return; } + } + + // Check if body is provided + if (need_body and block.header->number != 0 + and not block.body.has_value()) { + SL_VERBOSE(self->log_, + "Can't load blocks from {} starting from block {}: " + "Received block without body", + peer_id, + from); + if (handler) { + handler(Error::RESPONSE_WITHOUT_BLOCK_BODY); + } + return; + } - bool some_blocks_added = false; - primitives::BlockInfo last_loaded_block; + const auto &last_finalized_block = + self->block_tree_->getLastFinalized(); - for (auto &block : blocks) { - // Check if header is provided - if (not block.header.has_value()) { + // Check by number if block is not finalized yet + if (last_finalized_block.number >= header.number) { + if (last_finalized_block.number == header.number) { + if (last_finalized_block.hash != block.hash) { SL_VERBOSE(self->log_, "Can't load blocks from {} starting from block {}: " - "Received block without header", + "Received discarded block {}", peer_id, - from); - if (handler) { - handler(Error::RESPONSE_WITHOUT_BLOCK_HEADER); - } - return; - } - // Check if body is provided - if (need_body and block.header->number != 0 - and not block.body.has_value()) { - SL_VERBOSE(self->log_, - "Can't load blocks from {} starting from block {}: " - "Received block without body", - peer_id, - from); + from, + BlockInfo(header.number, block.hash)); if (handler) { - handler(Error::RESPONSE_WITHOUT_BLOCK_BODY); + handler(Error::DISCARDED_BLOCK); } return; } - auto &header = block.header.value(); - - const auto &last_finalized_block = - self->block_tree_->getLastFinalized(); - - // Check by number if block is not finalized yet - if (last_finalized_block.number >= header.number) { - if (last_finalized_block.number == header.number) { - if (last_finalized_block.hash != block.hash) { - SL_VERBOSE( - self->log_, - "Can't load blocks from {} starting from block {}: " - "Received discarded block {}", - peer_id, - from, - BlockInfo(header.number, block.hash)); - if (handler) { - handler(Error::DISCARDED_BLOCK); - } - return; - } - - SL_TRACE(self->log_, - "Skip block {} received from {}: " - "it is finalized with block #{}", - BlockInfo(header.number, block.hash), - peer_id, - last_finalized_block.number); - continue; - } - SL_TRACE(self->log_, - "Skip block {} received from {}: " - "it is below the last finalized block #{}", - BlockInfo(header.number, block.hash), - peer_id, - last_finalized_block.number); - continue; - } + SL_TRACE(self->log_, + "Skip block {} received from {}: " + "it is finalized with block #{}", + BlockInfo(header.number, block.hash), + peer_id, + last_finalized_block.number); + continue; + } - // Check if block is not discarded - if (last_finalized_block.number + 1 == header.number) { - if (last_finalized_block.hash != header.parent_hash) { - SL_ERROR(self->log_, - "Can't complete blocks loading from {} starting from " - "block {}: Received discarded block {}", - peer_id, - from, - BlockInfo(header.number, header.parent_hash)); - if (handler) { - handler(Error::DISCARDED_BLOCK); - } - return; - } + SL_TRACE(self->log_, + "Skip block {} received from {}: " + "it is below the last finalized block #{}", + BlockInfo(header.number, block.hash), + peer_id, + last_finalized_block.number); + continue; + } - // Start to check parents - parent_hash = header.parent_hash; + // Check if block is not discarded + if (last_finalized_block.number + 1 == header.number) { + if (last_finalized_block.hash != header.parent_hash) { + SL_ERROR(self->log_, + "Can't complete blocks loading from {} starting from " + "block {}: Received discarded block {}", + peer_id, + from, + BlockInfo(header.number, header.parent_hash)); + if (handler) { + handler(Error::DISCARDED_BLOCK); } + return; + } - // Check if block is in chain - static const primitives::BlockHash zero_hash; - if (parent_hash != header.parent_hash && parent_hash != zero_hash) { - SL_ERROR(self->log_, - "Can't complete blocks loading from {} starting from " - "block {}: Received block is not descendant of previous", - peer_id, - from); - if (handler) { - handler(Error::WRONG_ORDER); - } - return; - } + // Start to check parents + parent_hash = header.parent_hash; + } + + // Check if block is in chain + static const primitives::BlockHash zero_hash; + if (parent_hash != header.parent_hash && parent_hash != zero_hash) { + SL_ERROR(self->log_, + "Can't complete blocks loading from {} starting from " + "block {}: Received block is not descendant of previous", + peer_id, + from); + if (handler) { + handler(Error::WRONG_ORDER); + } + return; + } - // Calculate and save hash, 'cause it's new received block - primitives::calculateBlockHash(header, *self->hasher_); + // Calculate and save hash, 'cause it's new received block + primitives::calculateBlockHash(header, *self->hasher_); - // Check if hash is valid - if (block.hash != header.hash()) { - SL_ERROR(self->log_, - "Can't complete blocks loading from {} starting from " - "block {}: " - "Received block whose hash does not match the header", - peer_id, - from); - if (handler) { - handler(Error::INVALID_HASH); - } - return; - } + // Check if hash is valid + if (block.hash != header.hash()) { + SL_ERROR(self->log_, + "Can't complete blocks loading from {} starting from " + "block {}: " + "Received block whose hash does not match the header", + peer_id, + from); + if (handler) { + handler(Error::INVALID_HASH); + } + return; + } - last_loaded_block = header.blockInfo(); - - parent_hash = block.hash; - - // Add block in queue and save peer or just add peer for existing - // record - auto it = self->known_blocks_.find(block.hash); - if (it == self->known_blocks_.end()) { - self->known_blocks_.emplace(block.hash, - KnownBlock{ - .data = block, - .peers = {peer_id}, - }); - self->metric_import_queue_length_->set( - self->known_blocks_.size()); - std::unique_lock lock{self->load_blocks_mutex_}; - self->load_blocks_.erase(from); - } else { - it->second.peers.emplace(peer_id); - SL_TRACE(self->log_, - "Skip block {} received from {}: already enqueued", - BlockInfo(header.number, block.hash), - peer_id); - continue; - } + last_loaded_block = header.blockInfo(); + + parent_hash = block.hash; + + // Add block in queue and save peer or just add peer for existing + // record + auto it = self->known_blocks_.find(block.hash); + if (it == self->known_blocks_.end()) { + self->known_blocks_.emplace(block.hash, + KnownBlock{ + .data = block, + .peers = {peer_id}, + }); + self->metric_import_queue_length_->set(self->known_blocks_.size()); + std::unique_lock lock{self->load_blocks_mutex_}; + self->load_blocks_.erase(BlockInfo(header.number, block.hash)); + } else { + it->second.peers.emplace(peer_id); + SL_TRACE(self->log_, + "Skip block {} received from {}: already enqueued", + BlockInfo(header.number, block.hash), + peer_id); + continue; + } - SL_TRACE(self->log_, - "Enqueue block {} received from {}", - BlockInfo(header.number, block.hash), - peer_id); + SL_TRACE(self->log_, + "Enqueue block {} received from {}", + BlockInfo(header.number, block.hash), + peer_id); - self->generations_.emplace(header.blockInfo()); - self->ancestry_.emplace(header.parent_hash, block.hash); + self->generations_.emplace(header.blockInfo()); + self->ancestry_.emplace(header.parent_hash, block.hash); - some_blocks_added = true; - } + some_blocks_added = true; + } - SL_TRACE(self->log_, "Block loading is finished"); - if (handler) { - handler(last_loaded_block); - } + SL_TRACE(self->log_, "Block loading is finished"); + if (handler) { + handler(last_loaded_block); + } - if (some_blocks_added) { - SL_TRACE(self->log_, "Enqueued some new blocks: schedule applying"); - self->scheduler_->schedule([wp] { - if (auto self = wp.lock()) { - self->applyNextBlock(); - } - }); + if (some_blocks_added) { + SL_TRACE(self->log_, "Enqueued some new blocks: schedule applying"); + self->scheduler_->schedule([wp] { + if (auto self = wp.lock()) { + self->applyNextBlock(); } - }; + }); + } + }; fetch(peer_id, std::move(request), From 6164c3bd470fbc9142ee77a4e8a530632507c633 Mon Sep 17 00:00:00 2001 From: Boris Erakhtin Date: Mon, 6 Jan 2025 11:25:54 +0500 Subject: [PATCH 15/20] Only first block of a pack checked in load_blocks --- core/network/impl/synchronizer_impl.cpp | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/core/network/impl/synchronizer_impl.cpp b/core/network/impl/synchronizer_impl.cpp index 5c6f2966b5..69cdf004d1 100644 --- a/core/network/impl/synchronizer_impl.cpp +++ b/core/network/impl/synchronizer_impl.cpp @@ -669,8 +669,9 @@ namespace kagome::network { bool some_blocks_added = false; primitives::BlockInfo last_loaded_block; - - for (auto &block : blocks) { + for (size_t i = 0; i < blocks.size(); ++i) { + const auto &block = blocks + [i]; // NOLINT(cppcoreguidelines-pro-bounds-constant-array-index) // Check if header is provided if (not block.header.has_value()) { SL_VERBOSE(self->log_, @@ -684,7 +685,7 @@ namespace kagome::network { return; } auto &header = block.header.value(); - { + if (i == 0) { std::unique_lock lock{self->load_blocks_mutex_}; if (auto it = self->load_blocks_.find(BlockInfo(header.number, block.hash)); @@ -813,8 +814,10 @@ namespace kagome::network { .peers = {peer_id}, }); self->metric_import_queue_length_->set(self->known_blocks_.size()); - std::unique_lock lock{self->load_blocks_mutex_}; - self->load_blocks_.erase(BlockInfo(header.number, block.hash)); + if (i == 0) { + std::unique_lock lock{self->load_blocks_mutex_}; + self->load_blocks_.erase(BlockInfo(header.number, block.hash)); + } } else { it->second.peers.emplace(peer_id); SL_TRACE(self->log_, From 3b621de02ca84249d21aa40e80f286187228ca89 Mon Sep 17 00:00:00 2001 From: Boris Erakhtin Date: Mon, 6 Jan 2025 17:02:39 +0500 Subject: [PATCH 16/20] Better request start res handling --- core/network/impl/synchronizer_impl.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/network/impl/synchronizer_impl.cpp b/core/network/impl/synchronizer_impl.cpp index 69cdf004d1..a92e12c17a 100644 --- a/core/network/impl/synchronizer_impl.cpp +++ b/core/network/impl/synchronizer_impl.cpp @@ -398,9 +398,9 @@ namespace kagome::network { } // Otherwise, is using base way to enqueue - auto res = true; + auto res = false; for (const auto &p_id : selected_peers) { - res &= syncByBlockInfo( + res |= syncByBlockInfo( block_info, p_id, [wp{weak_from_this()}](auto res) { From d533fb5e8a6f087c6ea7410e85c6541d941b1663 Mon Sep 17 00:00:00 2001 From: Boris Erakhtin Date: Wed, 8 Jan 2025 13:07:24 +0500 Subject: [PATCH 17/20] Better iteration over blocks --- core/network/impl/synchronizer_impl.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/network/impl/synchronizer_impl.cpp b/core/network/impl/synchronizer_impl.cpp index a92e12c17a..cb1bcf01cf 100644 --- a/core/network/impl/synchronizer_impl.cpp +++ b/core/network/impl/synchronizer_impl.cpp @@ -669,9 +669,9 @@ namespace kagome::network { bool some_blocks_added = false; primitives::BlockInfo last_loaded_block; - for (size_t i = 0; i < blocks.size(); ++i) { - const auto &block = blocks - [i]; // NOLINT(cppcoreguidelines-pro-bounds-constant-array-index) + auto i = 0; + for (const auto &block : blocks) { + const auto first_block_of_pack = i++ == 0; // Check if header is provided if (not block.header.has_value()) { SL_VERBOSE(self->log_, @@ -685,7 +685,7 @@ namespace kagome::network { return; } auto &header = block.header.value(); - if (i == 0) { + if (first_block_of_pack) { std::unique_lock lock{self->load_blocks_mutex_}; if (auto it = self->load_blocks_.find(BlockInfo(header.number, block.hash)); @@ -814,7 +814,7 @@ namespace kagome::network { .peers = {peer_id}, }); self->metric_import_queue_length_->set(self->known_blocks_.size()); - if (i == 0) { + if (first_block_of_pack) { std::unique_lock lock{self->load_blocks_mutex_}; self->load_blocks_.erase(BlockInfo(header.number, block.hash)); } From 49f3845a6bcdbb38f5df27e51d5cc2145acae67c Mon Sep 17 00:00:00 2001 From: Boris Erakhtin Date: Fri, 10 Jan 2025 12:24:24 +0500 Subject: [PATCH 18/20] Erase load_blocks Logic adjustment --- core/network/impl/synchronizer_impl.cpp | 29 +++++++++++-------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/core/network/impl/synchronizer_impl.cpp b/core/network/impl/synchronizer_impl.cpp index cb1bcf01cf..a2af48b77f 100644 --- a/core/network/impl/synchronizer_impl.cpp +++ b/core/network/impl/synchronizer_impl.cpp @@ -621,7 +621,18 @@ namespace kagome::network { if (not self) { return; } - + { + std::unique_lock lock{self->load_blocks_mutex_}; + if (auto it = self->load_blocks_.find(from); + it != self->load_blocks_.end()) { + auto &requests_number = it->second; + if (requests_number > 1) { + --requests_number; + } else { + self->load_blocks_.erase(it); + } + } + } // Any error interrupts loading of blocks if (response_res.has_error()) { SL_VERBOSE(self->log_, @@ -685,23 +696,9 @@ namespace kagome::network { return; } auto &header = block.header.value(); - if (first_block_of_pack) { - std::unique_lock lock{self->load_blocks_mutex_}; - if (auto it = - self->load_blocks_.find(BlockInfo(header.number, block.hash)); - it != self->load_blocks_.end()) { - auto &requests_number = it->second; - if (requests_number > 1) { - --requests_number; - } else { - self->load_blocks_.erase(it); - } - } - } // Check if body is provided - if (need_body and block.header->number != 0 - and not block.body.has_value()) { + if (need_body and header.number != 0 and not block.body.has_value()) { SL_VERBOSE(self->log_, "Can't load blocks from {} starting from block {}: " "Received block without body", From 6df59acdb7eb9c3b43e76161b4e46c1d15023981 Mon Sep 17 00:00:00 2001 From: Boris Erakhtin Date: Fri, 10 Jan 2025 13:27:54 +0500 Subject: [PATCH 19/20] better init --- core/network/impl/synchronizer_impl.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/network/impl/synchronizer_impl.cpp b/core/network/impl/synchronizer_impl.cpp index a2af48b77f..2c53b58a3b 100644 --- a/core/network/impl/synchronizer_impl.cpp +++ b/core/network/impl/synchronizer_impl.cpp @@ -123,7 +123,9 @@ namespace kagome::network { chain_sub_engine_(std::move(chain_sub_engine)), main_pool_handler_{ poolHandlerReadyMake(app_state_manager, main_thread_pool)}, - block_storage_{std::move(block_storage)} { + block_storage_{std::move(block_storage)}, + max_parallel_downloads_{app_config.maxParallelDownloads()}, + random_gen_{std::random_device{}()} { BOOST_ASSERT(block_tree_); BOOST_ASSERT(block_executor_); BOOST_ASSERT(trie_node_db_); @@ -138,8 +140,6 @@ namespace kagome::network { BOOST_ASSERT(block_storage_); sync_method_ = app_config.syncMethod(); - max_parallel_downloads_ = app_config.maxParallelDownloads(); - random_gen_ = std::mt19937(std::random_device{}()); // Register metrics metrics_registry_->registerGaugeFamily( From 9337b285ad107e4a6b74ca537d73f817f0851ae4 Mon Sep 17 00:00:00 2001 From: Boris Erakhtin Date: Fri, 10 Jan 2025 17:16:02 +0500 Subject: [PATCH 20/20] Logging load blocks bad result --- core/network/impl/synchronizer_impl.cpp | 26 ++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/core/network/impl/synchronizer_impl.cpp b/core/network/impl/synchronizer_impl.cpp index 2c53b58a3b..5b601d4089 100644 --- a/core/network/impl/synchronizer_impl.cpp +++ b/core/network/impl/synchronizer_impl.cpp @@ -388,11 +388,27 @@ namespace kagome::network { if (parent_is_known) { for (const auto &p_id : selected_peers) { - loadBlocks(p_id, block_info, [wp{weak_from_this()}](auto res) { - if (auto self = wp.lock()) { - SL_TRACE(self->log_, "Block(s) enqueued to apply by announce"); - } - }); + loadBlocks( + p_id, + block_info, + [wp{weak_from_this()}, p_id, block_info](auto res) { + if (auto self = wp.lock()) { + if (res.has_error()) { + SL_DEBUG( + self->log_, + "Can't load blocks starting from {} from peer {}: {}", + block_info, + p_id, + res.error()); + } else { + SL_TRACE(self->log_, + "Block(s) from {} enqueued to load by announce from " + "peer {}", + block_info, + p_id); + } + } + }); } return true; }