Skip to content

Commit

Permalink
sync: move block when passing new target to block exchange (#1286)
Browse files Browse the repository at this point in the history
  • Loading branch information
canepat authored Jun 25, 2023
1 parent 2e0c13b commit 1829741
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 25 deletions.
10 changes: 5 additions & 5 deletions silkworm/sync/block_exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ BlockExchange::BlockExchange(SentryClient& sentry, const db::ROAccess& dba, cons
}

BlockExchange::~BlockExchange() {
stop();
BlockExchange::stop();
}

const ChainConfig& BlockExchange::chain_config() const { return chain_config_; }
Expand Down Expand Up @@ -284,11 +284,11 @@ void BlockExchange::stop_downloading() {
downloading_active_ = false;
}

void BlockExchange::new_target_block(const Block& block) {
void BlockExchange::new_target_block(std::shared_ptr<Block> block) {
auto message = std::make_shared<InternalMessage<void>>(
[=](HeaderChain& hc, BodySequence& bc) {
hc.add_header(block.header, std::chrono::system_clock::now());
bc.accept_new_block(block, no_peer);
[block = std::move(block)](HeaderChain& hc, BodySequence& bc) {
hc.add_header(block->header, std::chrono::system_clock::now());
bc.accept_new_block(*block, no_peer);
});

accept(message);
Expand Down
2 changes: 1 addition & 1 deletion silkworm/sync/block_exchange.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class BlockExchange final : public ActiveComponent {
};
void download_blocks(BlockNum current_height, Target_Tracking); // start downloading blocks from current_height

void new_target_block(const Block&); // set a new target block to download, to use with Target_Tracking::kByNewPayloads
void new_target_block(std::shared_ptr<Block> block); // set a new target block to download, to use with Target_Tracking::kByNewPayloads

void stop_downloading(); // stop downloading blocks

Expand Down
55 changes: 36 additions & 19 deletions silkworm/sync/sync_pos.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ awaitable<void> PoSSync::download_blocks() {

StopWatch timing(StopWatch::kStart);
RepeatedMeasure<BlockNum> downloaded_headers(initial_block_progress);
log::Info("Sync") << "Waiting for blocks... from=" << initial_block_progress;
log::Info() << "[PoSSync] Waiting for blocks... from=" << initial_block_progress;

asio::steady_timer timer(executor);

Expand All @@ -97,16 +97,12 @@ awaitable<void> PoSSync::download_blocks() {
co_await exec_engine_.insert_blocks(to_plain_blocks(blocks));

downloaded_headers.set(block_progress);
log::Info("Sync") << "Downloading progress: +" << downloaded_headers.delta() << " blocks downloaded, "
<< downloaded_headers.high_res_throughput<seconds_t>() << " headers/secs"
<< ", last=" << downloaded_headers.get()
<< ", head=" << chain_fork_view_.head_height()
<< ", lap.duration=" << StopWatch::format(timing.since_start());
};

block_exchange_.stop_downloading();

log::Warning("Sync") << "PoS sync block downloading stopped";
log::Info() << "[PoSSync] Downloading progress: +" << downloaded_headers.delta() << " blocks downloaded, "
<< downloaded_headers.high_res_throughput<seconds_t>() << " headers/secs"
<< ", last=" << downloaded_headers.get()
<< ", head=" << chain_fork_view_.head_height()
<< ", lap.duration=" << StopWatch::format(timing.since_start());
}
}

// Convert an ExecutionPayload to a Block as per Engine API spec
Expand Down Expand Up @@ -190,6 +186,7 @@ auto PoSSync::new_payload(const rpc::ExecutionPayload& payload) -> asio::awaitab

Hash block_hash = block->header.hash();
if (payload.block_hash != block_hash) co_return rpc::PayloadStatus::InvalidBlockHash;
log::Info() << "[PoSSync] new_payload block_hash=" << block_hash << " block_number: " << block->header.number;

auto [valid, last_valid] = has_valid_ancestor(block_hash);
if (!valid) co_return rpc::PayloadStatus{rpc::PayloadStatus::kInvalid, last_valid, "bad ancestor"};
Expand All @@ -200,10 +197,12 @@ auto PoSSync::new_payload(const rpc::ExecutionPayload& payload) -> asio::awaitab
// if not found, try to get it from the execution engine
auto parent = co_await exec_engine_.get_header(block->header.number - 1, block->header.parent_hash);
if (!parent) {
log::Trace() << "[PoSSync] new_payload parent=" << to_hex(block->header.parent_hash) << " NOT found, extend the chain";
// send payload to the block exchange to extend the chain up to it
block_exchange_.new_target_block(*block);
block_exchange_.new_target_block(std::move(block));
co_return rpc::PayloadStatus::Syncing;
}
log::Trace() << "[PoSSync] new_payload parent=" << to_hex(block->header.parent_hash) << " found, add to chain fork";
// if found, add it to the chain_fork_view_ and calc total difficulty
parent_td = co_await exec_engine_.get_header_td(block->header.parent_hash, block->header.number - 1);
chain_fork_view_.add(*parent, *parent_td);
Expand All @@ -218,39 +217,45 @@ auto PoSSync::new_payload(const rpc::ExecutionPayload& payload) -> asio::awaitab
std::vector<std::shared_ptr<Block>> blocks{block};
co_await exec_engine_.insert_blocks(blocks);
// auto inserted = co_await exec_engine_.insert_block(block); this is not working due to proto interface limitations
auto inserted = co_await exec_engine_.get_block_num(block_hash);
const auto inserted = co_await exec_engine_.get_block_num(block_hash);
if (!inserted) {
co_return rpc::PayloadStatus::Accepted;
}
log::Trace() << "[PoSSync] new_payload block_number=" << *inserted << " inserted";

// NOTE: from here the method execution can be cancelled
auto verification = co_await exec_engine_.validate_chain(block_hash);

if (std::holds_alternative<ValidChain>(verification)) {
// VALID
log::Info() << "[PoSSync] new_payload VALID current_head=" << std::get<ValidChain>(verification).current_head;
co_return rpc::PayloadStatus{.status = rpc::PayloadStatus::kValid, .latest_valid_hash = block_hash};
} else if (std::holds_alternative<InvalidChain>(verification)) {
// INVALID
auto invalid_chain = std::get<InvalidChain>(verification);
const auto invalid_chain = std::get<InvalidChain>(verification);
// auto latest_valid_height = sync_wait(in(exec_engine_), exec_engine_.get_block_num(invalid_chain.latest_valid_head));
auto unwind_point_td = chain_fork_view_.get_total_difficulty(invalid_chain.latest_valid_head);
Hash latest_valid_hash = unwind_point_td < terminal_total_difficulty
? kZeroHash
: invalid_chain.latest_valid_head;
log::Info() << "[PoSSync] new_payload INVALID latest_valid_hash=" << latest_valid_hash;
co_return rpc::PayloadStatus{.status = rpc::PayloadStatus::kInvalid, .latest_valid_hash = latest_valid_hash};
} else {
// ERROR
const auto validation_error = std::get<ValidationError>(verification);
log::Info() << "[PoSSync] new_payload INVALID latest_valid_hash=" << validation_error.latest_valid_head
<< " missing_block=" << validation_error.missing_block;
co_return rpc::PayloadStatus{rpc::PayloadStatus::kInvalid, no_latest_valid_hash, "unknown execution error"};
}

} catch (const PayloadValidationError& e) {
log::Error("Sync") << "Payload validation error: " << e.what();
log::Info() << "[PoSSync] new_payload payload validation error: " << e.what();
co_return rpc::PayloadStatus{rpc::PayloadStatus::kInvalid, no_latest_valid_hash, e.what()};
} catch (const boost::system::system_error& e) {
log::Error("Sync") << "Error processing payload: " << e.what();
log::Error() << "PoSSync: error processing payload: " << e.what();
throw;
} catch (const std::exception& e) {
log::Error("Sync") << "Unexpected error processing payload: " << e.what();
log::Error() << "PoSSync: unexpected error processing payload: " << e.what();
throw;
}
}
Expand All @@ -267,13 +272,16 @@ auto PoSSync::fork_choice_update(const rpc::ForkChoiceState& state,
if (!state.head_block_hash) {
co_return rpc::ForkChoiceUpdatedReply{{rpc::PayloadStatus::kInvalid, no_latest_valid_hash, "invalid head block hash"}, no_payload_id};
}
log::Info() << "[PoSSync] fork_choice_update head_block_hash=" << to_hex(state.head_block_hash)
<< " safe_block_hash=" << to_hex(state.safe_block_hash) << " finalized_block_hash=" << to_hex(state.finalized_block_hash);

Hash head_header_hash = state.head_block_hash;
auto head_header = co_await exec_engine_.get_header(head_header_hash); // todo: decide whether to use chain_fork_view_ cache instead
if (!head_header) {
auto [valid, last_valid] = has_valid_ancestor(head_header_hash);
if (!valid) co_return rpc::ForkChoiceUpdatedReply{{rpc::PayloadStatus::kInvalid, last_valid, "bad ancestor"}, no_payload_id};

log::Info() << "[PoSSync] fork_choice_update head header not found => SYNCING";
// send payload to the block exchange to extend the chain up to it
// block_exchange_.new_target_block(head_header_hash); // todo: implement this!
co_return rpc::ForkChoiceUpdatedReply{rpc::PayloadStatus::Syncing, no_payload_id};
Expand All @@ -283,10 +291,13 @@ auto PoSSync::fork_choice_update(const rpc::ForkChoiceState& state,

auto parent = co_await exec_engine_.get_header(head_header->parent_hash); // todo: decide whether to use chain_fork_view_ cache instead
if (!parent) {
log::Info() << "[PoSSync] fork_choice_update parent header not found => SYNCING";
co_return rpc::ForkChoiceUpdatedReply{rpc::PayloadStatus::Syncing, no_payload_id};
}
auto parent_td = chain_fork_view_.get_total_difficulty(head_header->number - 1, head_header->parent_hash);
if (!parent_td) {
log::Info() << "[PoSSync] fork_choice_update TD not found for parent block number=" << (head_header->number - 1)
<< " hash=" << to_hex(head_header->parent_hash) << " => SYNCING";
co_return rpc::ForkChoiceUpdatedReply{rpc::PayloadStatus::Syncing, no_payload_id};
}

Expand All @@ -302,9 +313,13 @@ auto PoSSync::fork_choice_update(const rpc::ForkChoiceState& state,
Hash latest_valid_hash = unwind_point_td < terminal_total_difficulty
? kZeroHash
: invalid_chain.latest_valid_head;
log::Info() << "[PoSSync] fork_choice_update INVALID latest_valid_hash=" << latest_valid_hash;
co_return rpc::ForkChoiceUpdatedReply{{rpc::PayloadStatus::kInvalid, latest_valid_hash}, no_payload_id};
} else if (!std::holds_alternative<ValidChain>(verification)) {
// ERROR
const auto validation_error = std::get<ValidationError>(verification);
log::Info() << "[PoSSync] fork_choice_update INVALID latest_valid_hash=" << validation_error.latest_valid_head
<< " missing_block=" << validation_error.missing_block;
co_return rpc::ForkChoiceUpdatedReply{{rpc::PayloadStatus::kInvalid, no_latest_valid_hash, "unknown execution error"}, no_payload_id};
}

Expand All @@ -315,6 +330,8 @@ auto PoSSync::fork_choice_update(const rpc::ForkChoiceState& state,
state.finalized_block_hash != kZeroHash ? std::optional<Hash>{state.finalized_block_hash} : std::nullopt;

auto application = co_await exec_engine_.update_fork_choice(state.head_block_hash, finalized_block_hash);
log::Info() << "[PoSSync] fork_choice_update " << (application.success ? "OK" : "KO")
<< " current_head=" << application.current_head << " current_height=" << application.current_height;
if (!application.success) {
// at the moment application doesn't carry information to disambiguate between invalid head and
// finalized_block_hash not found, so we need additional calls:
Expand Down Expand Up @@ -345,10 +362,10 @@ auto PoSSync::fork_choice_update(const rpc::ForkChoiceState& state,
co_return rpc::ForkChoiceUpdatedReply{{rpc::PayloadStatus::kValid, state.head_block_hash}, buildProcessId};

} catch (const boost::system::system_error& e) {
log::Error("Sync") << "Error processing fork-choice: " << e.what();
log::Error() << "PoSSync: error processing fork-choice: " << e.what();
throw;
} catch (const std::exception& e) {
log::Error("Sync") << "Unexpected error processing fork-choice: " << e.what();
log::Error() << "PoSSync: unexpected error processing fork-choice: " << e.what();
throw;
}
}
Expand Down

0 comments on commit 1829741

Please sign in to comment.