From 1829741b496191caa9eba74887c0641f8516d41d Mon Sep 17 00:00:00 2001 From: canepat <16927169+canepat@users.noreply.github.com> Date: Sun, 25 Jun 2023 20:49:22 +0200 Subject: [PATCH] sync: move block when passing new target to block exchange (#1286) --- silkworm/sync/block_exchange.cpp | 10 +++--- silkworm/sync/block_exchange.hpp | 2 +- silkworm/sync/sync_pos.cpp | 55 +++++++++++++++++++++----------- 3 files changed, 42 insertions(+), 25 deletions(-) diff --git a/silkworm/sync/block_exchange.cpp b/silkworm/sync/block_exchange.cpp index 88fd48c16d..f6cef36714 100644 --- a/silkworm/sync/block_exchange.cpp +++ b/silkworm/sync/block_exchange.cpp @@ -42,7 +42,7 @@ BlockExchange::BlockExchange(SentryClient& sentry, const db::ROAccess& dba, cons } BlockExchange::~BlockExchange() { - stop(); + BlockExchange::stop(); } const ChainConfig& BlockExchange::chain_config() const { return chain_config_; } @@ -284,11 +284,11 @@ void BlockExchange::stop_downloading() { downloading_active_ = false; } -void BlockExchange::new_target_block(const Block& block) { +void BlockExchange::new_target_block(std::shared_ptr block) { auto message = std::make_shared>( - [=](HeaderChain& hc, BodySequence& bc) { - hc.add_header(block.header, std::chrono::system_clock::now()); - bc.accept_new_block(block, no_peer); + [block = std::move(block)](HeaderChain& hc, BodySequence& bc) { + hc.add_header(block->header, std::chrono::system_clock::now()); + bc.accept_new_block(*block, no_peer); }); accept(message); diff --git a/silkworm/sync/block_exchange.hpp b/silkworm/sync/block_exchange.hpp index c3f7f946de..1c49ce54df 100644 --- a/silkworm/sync/block_exchange.hpp +++ b/silkworm/sync/block_exchange.hpp @@ -48,7 +48,7 @@ class BlockExchange final : public ActiveComponent { }; void download_blocks(BlockNum current_height, Target_Tracking); // start downloading blocks from current_height - void new_target_block(const Block&); // set a new target block to download, to use with Target_Tracking::kByNewPayloads + void new_target_block(std::shared_ptr block); // set a new target block to download, to use with Target_Tracking::kByNewPayloads void stop_downloading(); // stop downloading blocks diff --git a/silkworm/sync/sync_pos.cpp b/silkworm/sync/sync_pos.cpp index 78b96e902a..22666df246 100644 --- a/silkworm/sync/sync_pos.cpp +++ b/silkworm/sync/sync_pos.cpp @@ -71,7 +71,7 @@ awaitable PoSSync::download_blocks() { StopWatch timing(StopWatch::kStart); RepeatedMeasure downloaded_headers(initial_block_progress); - log::Info("Sync") << "Waiting for blocks... from=" << initial_block_progress; + log::Info() << "[PoSSync] Waiting for blocks... from=" << initial_block_progress; asio::steady_timer timer(executor); @@ -97,16 +97,12 @@ awaitable PoSSync::download_blocks() { co_await exec_engine_.insert_blocks(to_plain_blocks(blocks)); downloaded_headers.set(block_progress); - log::Info("Sync") << "Downloading progress: +" << downloaded_headers.delta() << " blocks downloaded, " - << downloaded_headers.high_res_throughput() << " headers/secs" - << ", last=" << downloaded_headers.get() - << ", head=" << chain_fork_view_.head_height() - << ", lap.duration=" << StopWatch::format(timing.since_start()); - }; - - block_exchange_.stop_downloading(); - - log::Warning("Sync") << "PoS sync block downloading stopped"; + log::Info() << "[PoSSync] Downloading progress: +" << downloaded_headers.delta() << " blocks downloaded, " + << downloaded_headers.high_res_throughput() << " headers/secs" + << ", last=" << downloaded_headers.get() + << ", head=" << chain_fork_view_.head_height() + << ", lap.duration=" << StopWatch::format(timing.since_start()); + } } // Convert an ExecutionPayload to a Block as per Engine API spec @@ -190,6 +186,7 @@ auto PoSSync::new_payload(const rpc::ExecutionPayload& payload) -> asio::awaitab Hash block_hash = block->header.hash(); if (payload.block_hash != block_hash) co_return rpc::PayloadStatus::InvalidBlockHash; + log::Info() << "[PoSSync] new_payload block_hash=" << block_hash << " block_number: " << block->header.number; auto [valid, last_valid] = has_valid_ancestor(block_hash); if (!valid) co_return rpc::PayloadStatus{rpc::PayloadStatus::kInvalid, last_valid, "bad ancestor"}; @@ -200,10 +197,12 @@ auto PoSSync::new_payload(const rpc::ExecutionPayload& payload) -> asio::awaitab // if not found, try to get it from the execution engine auto parent = co_await exec_engine_.get_header(block->header.number - 1, block->header.parent_hash); if (!parent) { + log::Trace() << "[PoSSync] new_payload parent=" << to_hex(block->header.parent_hash) << " NOT found, extend the chain"; // send payload to the block exchange to extend the chain up to it - block_exchange_.new_target_block(*block); + block_exchange_.new_target_block(std::move(block)); co_return rpc::PayloadStatus::Syncing; } + log::Trace() << "[PoSSync] new_payload parent=" << to_hex(block->header.parent_hash) << " found, add to chain fork"; // if found, add it to the chain_fork_view_ and calc total difficulty parent_td = co_await exec_engine_.get_header_td(block->header.parent_hash, block->header.number - 1); chain_fork_view_.add(*parent, *parent_td); @@ -218,39 +217,45 @@ auto PoSSync::new_payload(const rpc::ExecutionPayload& payload) -> asio::awaitab std::vector> blocks{block}; co_await exec_engine_.insert_blocks(blocks); // auto inserted = co_await exec_engine_.insert_block(block); this is not working due to proto interface limitations - auto inserted = co_await exec_engine_.get_block_num(block_hash); + const auto inserted = co_await exec_engine_.get_block_num(block_hash); if (!inserted) { co_return rpc::PayloadStatus::Accepted; } + log::Trace() << "[PoSSync] new_payload block_number=" << *inserted << " inserted"; // NOTE: from here the method execution can be cancelled auto verification = co_await exec_engine_.validate_chain(block_hash); if (std::holds_alternative(verification)) { // VALID + log::Info() << "[PoSSync] new_payload VALID current_head=" << std::get(verification).current_head; co_return rpc::PayloadStatus{.status = rpc::PayloadStatus::kValid, .latest_valid_hash = block_hash}; } else if (std::holds_alternative(verification)) { // INVALID - auto invalid_chain = std::get(verification); + const auto invalid_chain = std::get(verification); // auto latest_valid_height = sync_wait(in(exec_engine_), exec_engine_.get_block_num(invalid_chain.latest_valid_head)); auto unwind_point_td = chain_fork_view_.get_total_difficulty(invalid_chain.latest_valid_head); Hash latest_valid_hash = unwind_point_td < terminal_total_difficulty ? kZeroHash : invalid_chain.latest_valid_head; + log::Info() << "[PoSSync] new_payload INVALID latest_valid_hash=" << latest_valid_hash; co_return rpc::PayloadStatus{.status = rpc::PayloadStatus::kInvalid, .latest_valid_hash = latest_valid_hash}; } else { // ERROR + const auto validation_error = std::get(verification); + log::Info() << "[PoSSync] new_payload INVALID latest_valid_hash=" << validation_error.latest_valid_head + << " missing_block=" << validation_error.missing_block; co_return rpc::PayloadStatus{rpc::PayloadStatus::kInvalid, no_latest_valid_hash, "unknown execution error"}; } } catch (const PayloadValidationError& e) { - log::Error("Sync") << "Payload validation error: " << e.what(); + log::Info() << "[PoSSync] new_payload payload validation error: " << e.what(); co_return rpc::PayloadStatus{rpc::PayloadStatus::kInvalid, no_latest_valid_hash, e.what()}; } catch (const boost::system::system_error& e) { - log::Error("Sync") << "Error processing payload: " << e.what(); + log::Error() << "PoSSync: error processing payload: " << e.what(); throw; } catch (const std::exception& e) { - log::Error("Sync") << "Unexpected error processing payload: " << e.what(); + log::Error() << "PoSSync: unexpected error processing payload: " << e.what(); throw; } } @@ -267,6 +272,8 @@ auto PoSSync::fork_choice_update(const rpc::ForkChoiceState& state, if (!state.head_block_hash) { co_return rpc::ForkChoiceUpdatedReply{{rpc::PayloadStatus::kInvalid, no_latest_valid_hash, "invalid head block hash"}, no_payload_id}; } + log::Info() << "[PoSSync] fork_choice_update head_block_hash=" << to_hex(state.head_block_hash) + << " safe_block_hash=" << to_hex(state.safe_block_hash) << " finalized_block_hash=" << to_hex(state.finalized_block_hash); Hash head_header_hash = state.head_block_hash; auto head_header = co_await exec_engine_.get_header(head_header_hash); // todo: decide whether to use chain_fork_view_ cache instead @@ -274,6 +281,7 @@ auto PoSSync::fork_choice_update(const rpc::ForkChoiceState& state, auto [valid, last_valid] = has_valid_ancestor(head_header_hash); if (!valid) co_return rpc::ForkChoiceUpdatedReply{{rpc::PayloadStatus::kInvalid, last_valid, "bad ancestor"}, no_payload_id}; + log::Info() << "[PoSSync] fork_choice_update head header not found => SYNCING"; // send payload to the block exchange to extend the chain up to it // block_exchange_.new_target_block(head_header_hash); // todo: implement this! co_return rpc::ForkChoiceUpdatedReply{rpc::PayloadStatus::Syncing, no_payload_id}; @@ -283,10 +291,13 @@ auto PoSSync::fork_choice_update(const rpc::ForkChoiceState& state, auto parent = co_await exec_engine_.get_header(head_header->parent_hash); // todo: decide whether to use chain_fork_view_ cache instead if (!parent) { + log::Info() << "[PoSSync] fork_choice_update parent header not found => SYNCING"; co_return rpc::ForkChoiceUpdatedReply{rpc::PayloadStatus::Syncing, no_payload_id}; } auto parent_td = chain_fork_view_.get_total_difficulty(head_header->number - 1, head_header->parent_hash); if (!parent_td) { + log::Info() << "[PoSSync] fork_choice_update TD not found for parent block number=" << (head_header->number - 1) + << " hash=" << to_hex(head_header->parent_hash) << " => SYNCING"; co_return rpc::ForkChoiceUpdatedReply{rpc::PayloadStatus::Syncing, no_payload_id}; } @@ -302,9 +313,13 @@ auto PoSSync::fork_choice_update(const rpc::ForkChoiceState& state, Hash latest_valid_hash = unwind_point_td < terminal_total_difficulty ? kZeroHash : invalid_chain.latest_valid_head; + log::Info() << "[PoSSync] fork_choice_update INVALID latest_valid_hash=" << latest_valid_hash; co_return rpc::ForkChoiceUpdatedReply{{rpc::PayloadStatus::kInvalid, latest_valid_hash}, no_payload_id}; } else if (!std::holds_alternative(verification)) { // ERROR + const auto validation_error = std::get(verification); + log::Info() << "[PoSSync] fork_choice_update INVALID latest_valid_hash=" << validation_error.latest_valid_head + << " missing_block=" << validation_error.missing_block; co_return rpc::ForkChoiceUpdatedReply{{rpc::PayloadStatus::kInvalid, no_latest_valid_hash, "unknown execution error"}, no_payload_id}; } @@ -315,6 +330,8 @@ auto PoSSync::fork_choice_update(const rpc::ForkChoiceState& state, state.finalized_block_hash != kZeroHash ? std::optional{state.finalized_block_hash} : std::nullopt; auto application = co_await exec_engine_.update_fork_choice(state.head_block_hash, finalized_block_hash); + log::Info() << "[PoSSync] fork_choice_update " << (application.success ? "OK" : "KO") + << " current_head=" << application.current_head << " current_height=" << application.current_height; if (!application.success) { // at the moment application doesn't carry information to disambiguate between invalid head and // finalized_block_hash not found, so we need additional calls: @@ -345,10 +362,10 @@ auto PoSSync::fork_choice_update(const rpc::ForkChoiceState& state, co_return rpc::ForkChoiceUpdatedReply{{rpc::PayloadStatus::kValid, state.head_block_hash}, buildProcessId}; } catch (const boost::system::system_error& e) { - log::Error("Sync") << "Error processing fork-choice: " << e.what(); + log::Error() << "PoSSync: error processing fork-choice: " << e.what(); throw; } catch (const std::exception& e) { - log::Error("Sync") << "Unexpected error processing fork-choice: " << e.what(); + log::Error() << "PoSSync: unexpected error processing fork-choice: " << e.what(); throw; } }