node: fix header numbers update after snapshot sync (#1311)
canepat authored Jul 3, 2023
1 parent 1c09f16 · commit 41d4f58
Showing 1 changed file with 36 additions and 36 deletions.
silkworm/node/snapshot/sync.cpp: 36 additions & 36 deletions
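Aside from migrating these log statements to the SILK_* macros, the substantive change is in SnapshotSync::update_block_headers: the key handed to the kHeaderNumbers ETL collector is now built as a Bytes object with an explicit kHashLength (block_hash_bytes) instead of passing the raw block_hash.bytes array. A bare byte pointer selects the null-terminated-string constructor, so a hash containing a 0x00 byte would presumably produce a truncated key and a wrong hash-to-number entry. A minimal sketch of the difference, written with std::string for portability (Silkworm's Bytes is a basic_string of unsigned bytes with the same constructor semantics); hash_bytes is a made-up example value:

#include <cassert>
#include <cstddef>
#include <string>

int main() {
    constexpr std::size_t kHashLength{32};

    // Hypothetical 32-byte block hash whose third byte happens to be 0x00.
    const char hash_bytes[kHashLength] = {'\xab', '\xcd', '\x00', '\x01'};  // remaining bytes zero

    // Old construction (sketch): a bare pointer picks the null-terminated
    // constructor, so the key stops at the first zero byte.
    std::string truncated_key{hash_bytes};
    assert(truncated_key.size() == 2);

    // New construction: the explicit-length constructor keeps all 32 bytes,
    // which is what a kHeaderNumbers key (a full header hash) needs.
    std::string full_key{hash_bytes, kHashLength};
    assert(full_key.size() == kHashLength);
    return 0;
}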
@@ -49,11 +49,11 @@ SnapshotSync::~SnapshotSync() {

bool SnapshotSync::download_and_index_snapshots(db::RWTxn& txn) {
if (!settings_.enabled) {
log::Info() << "[Snapshots] snapshot sync disabled, no snapshot must be downloaded";
SILK_INFO << "SnapshotSync: snapshot sync disabled, no snapshot must be downloaded";
return true;
}

log::Info() << "[Snapshots] snapshot repository: " << settings_.repository_dir.string();
SILK_INFO << "SnapshotSync: snapshot repository: " << settings_.repository_dir.string();

if (settings_.no_downloader) {
reopen();
@@ -67,18 +67,18 @@ bool SnapshotSync::download_and_index_snapshots(db::RWTxn& txn) {

db::write_snapshots(txn, snapshot_file_names);

log::Info() << "[Snapshots] file names saved into db" << log::Args{"count", std::to_string(snapshot_file_names.size())};
SILK_INFO << "SnapshotSync: file names saved into db count=" << std::to_string(snapshot_file_names.size());

index_snapshots();

const auto max_block_available = repository_->max_block_available();
log::Info() << "[Snapshots] max block available: " << max_block_available
<< " (segment max block: " << repository_->segment_max_block()
<< ", idx max block: " << repository_->idx_max_block() << ")";
SILK_INFO << "SnapshotSync: max block available: " << max_block_available
<< " (segment max block: " << repository_->segment_max_block()
<< ", idx max block: " << repository_->idx_max_block() << ")";

const auto snapshot_config = snapshot::Config::lookup_known_config(config_.chain_id, snapshot_file_names);
const auto configured_max_block_number = snapshot_config->max_block_number();
log::Info() << "[Snapshots] configured max block: " << configured_max_block_number;
SILK_INFO << "SnapshotSync: configured max block: " << configured_max_block_number;

// Update chain and stage progresses in database according to available snapshots
update_database(txn, max_block_available);
@@ -88,15 +88,14 @@ bool SnapshotSync::download_and_index_snapshots(db::RWTxn& txn) {

void SnapshotSync::reopen() {
repository_->reopen_folder();
log::Info() << "[Snapshots] reopen completed"
<< log::Args{"segment_max_block", std::to_string(repository_->segment_max_block()),
"idx_max_block", std::to_string(repository_->idx_max_block())};
SILK_INFO << "SnapshotSync: reopen completed segment_max_block=" << std::to_string(repository_->segment_max_block())
<< " idx_max_block=" << std::to_string(repository_->idx_max_block());
}

bool SnapshotSync::download_snapshots(const std::vector<std::string>& snapshot_file_names) {
const auto missing_block_ranges = repository_->missing_block_ranges();
if (not missing_block_ranges.empty()) {
log::Warning() << "[Snapshots] downloading missing snapshots";
SILK_WARN << "SnapshotSync: downloading missing snapshots";
}

for (auto [block_from, block_to] : missing_block_ranges) {
@@ -105,24 +104,24 @@ bool SnapshotSync::download_snapshots(const std::vector<std::string>& snapshot_f
if (!snapshot_path.torrent_file_needed()) {
continue;
}
log::Info() << "[Snapshots] seeding a new snapshot [DOING NOTHING NOW]";
SILK_INFO << "SnapshotSync: seeding a new snapshot [DOING NOTHING NOW]";
// TODO(canepat) create torrent file
// client.add_torrent(snapshot_path.create_torrent_file());
}
}

const auto snapshot_config = snapshot::Config::lookup_known_config(config_.chain_id, snapshot_file_names);
if (snapshot_config->preverified_snapshots().empty()) {
log::Error() << "[Snapshots] no preverified snapshots found";
SILK_ERROR << "SnapshotSync: no preverified snapshots found";
return false;
}
for (const auto& preverified_snapshot : snapshot_config->preverified_snapshots()) {
log::Info() << "[Snapshots] adding info hash for preverified: " << preverified_snapshot.file_name;
SILK_TRACE << "SnapshotSync: adding info hash for preverified: " << preverified_snapshot.file_name;
client_.add_info_hash(preverified_snapshot.file_name, preverified_snapshot.torrent_hash);
}

auto log_added = [](const std::filesystem::path& snapshot_file) {
log::Info() << "[Snapshots] download started for: " << snapshot_file.filename().string();
SILK_INFO << "SnapshotSync: download started for: " << snapshot_file.filename().string();
};
client_.added_subscription.connect(log_added);

@@ -135,18 +134,18 @@ bool SnapshotSync::download_snapshots(const std::vector<std::string>& snapshot_f
counters_dump.append(std::to_string(counters[i]));
if (i != counters.size() - 1) counters_dump.append(", ");
}
log::Debug() << "[Snapshots] download progress: [" << counters_dump << "]";
SILK_TRACE << "SnapshotSync: download progress: [" << counters_dump << "]";
};
client_.stats_subscription.connect(log_stats);

const auto num_snapshots{std::ptrdiff_t(snapshot_config->preverified_snapshots().size())};
log::Info() << "[Snapshots] sync started: [0/" << num_snapshots << "]";
SILK_INFO << "SnapshotSync: sync started: [0/" << num_snapshots << "]";

std::latch download_done{num_snapshots};
auto log_completed = [&](const std::filesystem::path& snapshot_file) {
static int completed{0};
log::Info() << "[Snapshots] download completed for: " << snapshot_file.filename().string()
<< " [" << ++completed << "/" << num_snapshots << "]";
SILK_INFO << "SnapshotSync: download completed for: " << snapshot_file.filename().string()
<< " [" << ++completed << "/" << num_snapshots << "]";
download_done.count_down();
};
client_.completed_subscription.connect(log_completed);
@@ -161,21 +160,21 @@ bool SnapshotSync::download_snapshots(const std::vector<std::string>& snapshot_f
std::this_thread::sleep_for(kCheckCompletionInterval);
}

log::Info() << "[Snapshots] sync completed: [" << num_snapshots << "/" << num_snapshots << "]";
SILK_INFO << "SnapshotSync: sync completed: [" << num_snapshots << "/" << num_snapshots << "]";

reopen();
return true;
}

void SnapshotSync::index_snapshots() {
if (!settings_.enabled) {
log::Info() << "[Snapshots] snapshot sync disabled, no index must be created";
SILK_INFO << "SnapshotSync: snapshot sync disabled, no index must be created";
return;
}

// Build any missing snapshot index if needed, then reopen
if (repository_->idx_max_block() < repository_->segment_max_block()) {
log::Info() << "[Snapshots] missing indexes detected, rebuild started";
SILK_INFO << "SnapshotSync: missing indexes detected, rebuild started";
build_missing_indexes();
reopen();
}
@@ -197,9 +196,9 @@ void SnapshotSync::build_missing_indexes() {
const auto missing_indexes = repository_->missing_indexes();
for (const auto& index : missing_indexes) {
workers.push_task([=]() {
log::Info() << "[Snapshots] Build index: " << index->path().filename() << " start";
SILK_INFO << "SnapshotSync: build index: " << index->path().filename() << " start";
index->build();
log::Info() << "[Snapshots] Build index: " << index->path().filename() << " end";
SILK_INFO << "SnapshotSync: build index: " << index->path().filename() << " end";
});
}

@@ -226,14 +225,14 @@ void SnapshotSync::update_block_headers(db::RWTxn& txn, BlockNum max_block_avail
return;
}

log::Info() << "[Snapshots] Database update started";
SILK_INFO << "SnapshotSync: database update started";

// Iterate on block header snapshots and write header-related tables
etl::Collector hash2bn_collector{};
intx::uint256 total_difficulty{0};
uint64_t block_count{0};
repository_->for_each_header([&](const BlockHeader* header) -> bool {
log::Debug() << "[Snapshots] Header number: " << header->number << " hash: " << to_hex(header->hash());
SILK_TRACE << "SnapshotSync: header number=" << header->number << " hash=" << to_hex(header->hash());
const auto block_number = header->number;
if (block_number > max_block_available) return true;

@@ -247,31 +246,32 @@ void SnapshotSync::update_block_headers(db::RWTxn& txn, BlockNum max_block_avail
db::write_canonical_hash(txn, block_number, block_hash);

// Collect entries for later loading kHeaderNumbers table
+ Bytes block_hash_bytes{block_hash.bytes, kHashLength};
Bytes encoded_block_number{sizeof(uint64_t), '\0'};
endian::store_big_u64(encoded_block_number.data(), block_number);
- hash2bn_collector.collect({block_hash.bytes, encoded_block_number});
+ hash2bn_collector.collect({std::move(block_hash_bytes), std::move(encoded_block_number)});

if (++block_count % 1'000'000 == 0) {
log::Info() << "[Snapshots] Processing block header: " << block_number << " count=" << block_count;
SILK_INFO << "SnapshotSync: processing block header=" << block_number << " count=" << block_count;
if (is_stopping()) return false;
}

return true;
});
db::PooledCursor header_numbers_cursor{txn, db::table::kHeaderNumbers};
hash2bn_collector.load(header_numbers_cursor);
log::Info() << "[Snapshots] Database table HeaderNumbers updated";
SILK_INFO << "SnapshotSync: database table HeaderNumbers updated";

// Update head block header in kHeadHeader table
const auto canonical_hash{db::read_canonical_hash(txn, max_block_available)};
ensure(canonical_hash.has_value(), "SnapshotSync::save no canonical head hash found");
db::write_head_header_hash(txn, *canonical_hash);
log::Info() << "[Snapshots] Database table HeadHeader updated";
SILK_INFO << "SnapshotSync: database table HeadHeader updated";

// Update Headers stage progress to the max block in snapshots
db::stages::write_stage_progress(txn, db::stages::kHeadersKey, max_block_available);

log::Info() << "[Snapshots] Database Headers stage progress updated";
SILK_INFO << "SnapshotSync: database Headers stage progress updated";
}

void SnapshotSync::update_block_bodies(db::RWTxn& txn, BlockNum max_block_available) {
@@ -283,15 +283,15 @@ void SnapshotSync::update_block_bodies(db::RWTxn& txn, BlockNum max_block_availa

// Reset sequence for kBlockTransactions table
const auto tx_snapshot = repository_->find_tx_segment(max_block_available);
- ensure(tx_snapshot, "SnapshotSync::update_block_bodies snapshots max block not found in any snapshot");
+ ensure(tx_snapshot, "SnapshotSync: snapshots max block not found in any snapshot");
const auto last_tx_id = tx_snapshot->idx_txn_hash()->base_data_id() + tx_snapshot->item_count();
db::reset_map_sequence(txn, db::table::kBlockTransactions.name, last_tx_id + 1);
log::Info() << "[Snapshots] Database table BlockTransactions sequence reset";
SILK_INFO << "SnapshotSync: database table BlockTransactions sequence reset";

// Update BlockBodies stage progress to the max block in snapshots
db::stages::write_stage_progress(txn, db::stages::kBlockBodiesKey, max_block_available);

log::Info() << "[Snapshots] Database BlockBodies stage progress updated";
SILK_INFO << "SnapshotSync: database BlockBodies stage progress updated";
}

void SnapshotSync::update_block_hashes(db::RWTxn& txn, BlockNum max_block_available) {
@@ -304,7 +304,7 @@ void SnapshotSync::update_block_hashes(db::RWTxn& txn, BlockNum max_block_availa
// Update BlockHashes stage progress to the max block in snapshots
db::stages::write_stage_progress(txn, db::stages::kBlockHashesKey, max_block_available);

log::Info() << "[Snapshots] Database BlockHashes stage progress updated";
SILK_INFO << "SnapshotSync: database BlockHashes stage progress updated";
}

void SnapshotSync::update_block_senders(db::RWTxn& txn, BlockNum max_block_available) {
@@ -317,7 +317,7 @@ void SnapshotSync::update_block_senders(db::RWTxn& txn, BlockNum max_block_avail
// Update Senders stage progress to the max block in snapshots
db::stages::write_stage_progress(txn, db::stages::kSendersKey, max_block_available);

log::Info() << "[Snapshots] Database Senders stage progress updated";
SILK_INFO << "SnapshotSync: database Senders stage progress updated";
}

} // namespace silkworm::snapshot
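For reference, each kHeaderNumbers entry produced above maps a 32-byte header hash to the block number serialized as an 8-byte big-endian word (the endian::store_big_u64 call in update_block_headers). A small self-contained sketch of that value encoding, with the helper name chosen here purely for illustration:

#include <array>
#include <cstddef>
#include <cstdint>

// Encode a block number the way the collector stores it: 8 bytes, most significant byte first.
std::array<std::uint8_t, 8> encode_block_number(std::uint64_t block_number) {
    std::array<std::uint8_t, 8> out{};
    for (std::size_t i = 8; i-- > 0;) {
        out[i] = static_cast<std::uint8_t>(block_number & 0xFF);
        block_number >>= 8;
    }
    return out;
}

// Example: block 17'000'000 encodes as 00 00 00 00 01 03 66 40.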
