diff --git a/core/network/impl/synchronizer_impl.cpp b/core/network/impl/synchronizer_impl.cpp index 236355c4d6..030eb2f8e7 100644 --- a/core/network/impl/synchronizer_impl.cpp +++ b/core/network/impl/synchronizer_impl.cpp @@ -87,8 +87,6 @@ namespace { namespace kagome::network { - static constexpr auto default_max_peers_for_block_request = 5; - SynchronizerImpl::SynchronizerImpl( const application::AppConfiguration &app_config, application::AppStateManager &app_state_manager, @@ -125,8 +123,7 @@ namespace kagome::network { chain_sub_engine_(std::move(chain_sub_engine)), main_pool_handler_{ poolHandlerReadyMake(app_state_manager, main_thread_pool)}, - block_storage_{std::move(block_storage)}, - best_block_number_{} { + block_storage_{std::move(block_storage)} { BOOST_ASSERT(block_tree_); BOOST_ASSERT(block_executor_); BOOST_ASSERT(trie_node_db_); @@ -357,8 +354,6 @@ namespace kagome::network { known_blocks_.find(header.parent_hash) != known_blocks_.end() or block_tree_->has(header.parent_hash); - best_block_number_ = std::max(best_block_number_, header.number); - if (parent_is_known) { loadBlocks(peer_id, block_info, [wp{weak_from_this()}](auto res) { if (auto self = wp.lock()) { @@ -1163,19 +1158,32 @@ namespace kagome::network { continue; } - for (auto p_it = peers.begin(); p_it != peers.end();) { - auto cp_it = p_it++; - - auto &peer_id = *cp_it; - - if (busy_peers_.find(peer_id) != busy_peers_.end()) { - SL_TRACE(log_, "Peer {} for block {} is busy", peer_id, block_info); - continue; + std::vector active_peers; + for (auto it = peers.begin(); it != peers.end(); ++it) { + const auto &peer_id = *it; + if (busy_peers_.find(peer_id) == busy_peers_.end()) { + active_peers.push_back(peer_id); } - - busy_peers_.insert(peers.extract(cp_it)); - SL_TRACE(log_, "Peer {} marked as busy", peer_id); - + } + std::random_device rd; + std::mt19937 gen(rd()); + std::ranges::shuffle(active_peers, gen); + while (active_peers.size() + > static_cast(max_parallel_downloads_)) { + active_peers.pop_back(); + } + SL_INFO(log_, + "Active peers number is {} for block {}", + active_peers.size(), + block_info); + for (auto it = active_peers.begin(); it != active_peers.end(); ++it) { + SL_INFO(log_, + "Ask portion of blocks from peer {} for block {}", + *it, + block_info); + auto &peer_id = *it; + busy_peers_.emplace(peer_id); + peers.erase(peer_id); auto handler = [wp{weak_from_this()}, peer_id](const auto &res) { if (auto self = wp.lock()) { if (self->busy_peers_.erase(peer_id) > 0) { @@ -1199,43 +1207,6 @@ namespace kagome::network { } } }; - // TODO(ErakhtinB): #2326, review peer manager - // remove active_peers filling mechanism over peer states, when peer - // manager methods are implemented correctly - std::vector active_peers; - peer_manager_->enumeratePeerState( - [wp{weak_from_this()}, &active_peers, &peer_id]( - const libp2p::peer::PeerId &peer, network::PeerState &state) { - if (auto self = wp.lock()) { - if (self->best_block_number_ <= state.best_block.number - and peer != peer_id) { - active_peers.push_back(peer); - } - } - return true; - }); - std::vector selected_peers; - selected_peers.push_back(peer_id); - static const auto number_of_peers_to_add = - max_parallel_downloads_ ? max_parallel_downloads_ - 1 : 0; - if (const auto active_peers_size = active_peers.size(); - active_peers_size <= number_of_peers_to_add) { - for (const auto &p_id : active_peers) { - selected_peers.push_back(p_id); - } - } else { - std::vector indices; - indices.reserve(active_peers_size); - for (uint32_t i = 0; i < active_peers_size; ++i) { - indices.push_back(i); - } - std::random_device rd; - std::mt19937 gen(rd()); - std::ranges::shuffle(indices, gen); - for (uint32_t i = 0; i < number_of_peers_to_add; ++i) { - selected_peers.push_back(active_peers[indices[i]]); - } - } if (sync_method_ == application::SyncMethod::Full) { auto lower = generations_.begin()->number; auto upper = generations_.rbegin()->number + 1; @@ -1252,10 +1223,7 @@ namespace kagome::network { lower, upper, hint, - [wp{weak_from_this()}, - peer_id, - handler = std::move(handler), - selected_peers = std::move(selected_peers)]( + [wp{weak_from_this()}, peer_id, handler = std::move(handler)]( outcome::result res) { if (auto self = wp.lock()) { if (not res.has_value()) { @@ -1267,18 +1235,24 @@ namespace kagome::network { return; } auto &common_block_info = res.value(); - for (const auto &p_id : selected_peers) { - self->loadBlocks( - p_id, common_block_info, std::move(handler)); - } + SL_INFO(self->log_, + "Start to load next portion of blocks from {} " + "beginning from {}", + peer_id, + common_block_info); + self->loadBlocks( + peer_id, common_block_info, std::move(handler)); } }); } else { - for (const auto &p_id : selected_peers) { - loadBlocks(p_id, block_info, std::move(handler)); - } + SL_INFO( + log_, + "Start to load next portion of blocks from {} beginning from {}" + "block {}", + peer_id, + block_info); + loadBlocks(peer_id, block_info, std::move(handler)); } - return; } SL_TRACE(log_, diff --git a/core/network/impl/synchronizer_impl.hpp b/core/network/impl/synchronizer_impl.hpp index 65664160f0..daa610b9b3 100644 --- a/core/network/impl/synchronizer_impl.hpp +++ b/core/network/impl/synchronizer_impl.hpp @@ -261,7 +261,6 @@ namespace kagome::network { primitives::events::ChainSubscriptionEnginePtr chain_sub_engine_; std::shared_ptr main_pool_handler_; std::shared_ptr block_storage_; - primitives::BlockNumber best_block_number_; uint32_t max_parallel_downloads_;