node: update database after snapshot sync (#1284)
canepat authored Jun 25, 2023
1 parent 2ca63e8 commit 5c7c607
Showing 2 changed files with 103 additions and 53 deletions.
145 changes: 95 additions & 50 deletions silkworm/node/snapshot/sync.cpp
@@ -65,26 +65,38 @@ bool SnapshotSync::download_and_index_snapshots(db::RWTxn& txn) {
const bool download_completed = download_snapshots(snapshot_file_names);
if (!download_completed) return false;

reopen();

db::write_snapshots(txn, snapshot_file_names);

log::Info() << "[Snapshots] file names saved into db" << log::Args{"count", std::to_string(snapshot_file_names.size())};

- return index_snapshots(txn, snapshot_file_names);
+ index_snapshots();
+
+ const auto max_block_available = repository_->max_block_available();
+ log::Info() << "[Snapshots] max block available: " << max_block_available
+ << " (segment max block: " << repository_->segment_max_block()
+ << ", idx max block: " << repository_->idx_max_block() << ")";
+
+ const auto snapshot_config = snapshot::Config::lookup_known_config(config_.chain_id, snapshot_file_names);
+ const auto configured_max_block_number = snapshot_config->max_block_number();
+ log::Info() << "[Snapshots] configured max block: " << configured_max_block_number;
+
+ // Update chain and stage progresses in database according to available snapshots
+ update_database(txn, max_block_available);
+
+ return true;
}

void SnapshotSync::reopen() {
repository_->reopen_folder();
log::Info() << "[Snapshots] open_and_verify completed"
log::Info() << "[Snapshots] reopen completed"
<< log::Args{"segment_max_block", std::to_string(repository_->segment_max_block()),
"idx_max_block", std::to_string(repository_->idx_max_block())};
}

bool SnapshotSync::download_snapshots(const std::vector<std::string>& snapshot_file_names) {
const auto missing_block_ranges = repository_->missing_block_ranges();
if (not missing_block_ranges.empty()) {
- SILK_WARN << "[Snapshots] downloading missing snapshots";
+ log::Warning() << "[Snapshots] downloading missing snapshots";
}

for (auto [block_from, block_to] : missing_block_ranges) {
@@ -93,7 +105,7 @@ bool SnapshotSync::download_snapshots(const std::vector<std::string>& snapshot_file_names) {
if (!snapshot_path.torrent_file_needed()) {
continue;
}
SILK_INFO << "[Snapshots] seeding a new snapshot [DOING NOTHING NOW]";
log::Info() << "[Snapshots] seeding a new snapshot [DOING NOTHING NOW]";
// TODO(canepat) create torrent file
// client.add_torrent(snapshot_path.create_torrent_file());
}
@@ -105,12 +117,12 @@ bool SnapshotSync::download_snapshots(const std::vector<std::string>& snapshot_file_names) {
return false;
}
for (const auto& preverified_snapshot : snapshot_config->preverified_snapshots()) {
SILK_INFO << "[Snapshots] adding info hash for preverified: " << preverified_snapshot.file_name;
log::Info() << "[Snapshots] adding info hash for preverified: " << preverified_snapshot.file_name;
client_.add_info_hash(preverified_snapshot.file_name, preverified_snapshot.torrent_hash);
}

auto log_added = [](const std::filesystem::path& snapshot_file) {
SILK_INFO << "[Snapshots] download started for: " << snapshot_file.filename().string();
log::Info() << "[Snapshots] download started for: " << snapshot_file.filename().string();
};
client_.added_subscription.connect(log_added);

@@ -123,18 +135,18 @@ bool SnapshotSync::download_snapshots(const std::vector<std::string>& snapshot_file_names) {
counters_dump.append(std::to_string(counters[i]));
if (i != counters.size() - 1) counters_dump.append(", ");
}
SILK_DEBUG << "[Snapshots] download progress: [" << counters_dump << "]";
log::Debug() << "[Snapshots] download progress: [" << counters_dump << "]";
};
client_.stats_subscription.connect(log_stats);

const auto num_snapshots{std::ptrdiff_t(snapshot_config->preverified_snapshots().size())};
SILK_INFO << "[Snapshots] sync started: [0/" << num_snapshots << "]";
log::Info() << "[Snapshots] sync started: [0/" << num_snapshots << "]";

std::latch download_done{num_snapshots};
auto log_completed = [&](const std::filesystem::path& snapshot_file) {
static int completed{0};
SILK_INFO << "[Snapshots] download completed for: " << snapshot_file.filename().string() << " [" << ++completed
<< "/" << num_snapshots << "]";
log::Info() << "[Snapshots] download completed for: " << snapshot_file.filename().string()
<< " [" << ++completed << "/" << num_snapshots << "]";
download_done.count_down();
};
client_.completed_subscription.connect(log_completed);
@@ -149,40 +161,24 @@ bool SnapshotSync::download_snapshots(const std::vector<std::string>& snapshot_file_names) {
std::this_thread::sleep_for(kCheckCompletionInterval);
}

SILK_INFO << "[Snapshots] sync completed: [" << num_snapshots << "/" << num_snapshots << "]";
log::Info() << "[Snapshots] sync completed: [" << num_snapshots << "/" << num_snapshots << "]";

reopen();
return true;
}

- bool SnapshotSync::index_snapshots(db::RWTxn& txn, const std::vector<std::string>& snapshot_file_names) {
+ void SnapshotSync::index_snapshots() {
if (!settings_.enabled) {
log::Info() << "[Snapshots] snapshot sync disabled, no index must be created";
- return true;
+ return;
}

// Build any missing snapshot index if needed, then reopen
if (repository_->idx_max_block() < repository_->segment_max_block()) {
log::Info() << "[Snapshots] missing indexes detected, rebuild started";
build_missing_indexes();
- repository_->reopen_folder();
}

- const auto max_block_available = repository_->max_block_available();
- log::Info() << "[Snapshots] max block available: " << max_block_available
- << " (segment max block: " << repository_->segment_max_block()
- << ", idx max block: " << repository_->idx_max_block() << ")";
-
- const auto snapshot_config = snapshot::Config::lookup_known_config(config_.chain_id, snapshot_file_names);
- const auto configured_max_block_number = snapshot_config->max_block_number();
- log::Info() << "[Snapshots] configured max block: " << configured_max_block_number;
- if (max_block_available < configured_max_block_number) {
- // Iterate on block header snapshots and write header-related tables
- return save(txn, max_block_available);
+ reopen();
}

// TODO(canepat) add_headers_from_snapshots towards header downloader persisted link queue

- return true;
}

bool SnapshotSync::stop() {
@@ -216,13 +212,28 @@ void SnapshotSync::build_missing_indexes() {
workers.wait_for_tasks();
}

- bool SnapshotSync::save(db::RWTxn& txn, BlockNum max_block_available) {
+ void SnapshotSync::update_database(db::RWTxn& txn, BlockNum max_block_available) {
+ update_block_headers(txn, max_block_available);
+ update_block_bodies(txn, max_block_available);
+ update_block_hashes(txn, max_block_available);
+ update_block_senders(txn, max_block_available);
+ }
+
+ void SnapshotSync::update_block_headers(db::RWTxn& txn, BlockNum max_block_available) {
+ // Check if Headers stage progress has already reached the max block in snapshots
+ const auto last_progress{db::stages::read_stage_progress(txn, db::stages::kHeadersKey)};
+ if (last_progress >= max_block_available) {
+ return;
+ }

log::Info() << "[Snapshots] Database update started";

// Iterate on block header snapshots and write header-related tables
etl::Collector hash2bn_collector{};
intx::uint256 total_difficulty{0};
uint64_t block_count{0};
repository_->for_each_header([&](const BlockHeader* header) -> bool {
SILK_DEBUG << "Header number: " << header->number << " hash: " << to_hex(header->hash());
log::Debug() << "[Snapshots] Header number: " << header->number << " hash: " << to_hex(header->hash());
const auto block_number = header->number;
const auto block_hash = header->hash();

@@ -239,38 +250,72 @@ bool SnapshotSync::save(db::RWTxn& txn, BlockNum max_block_available) {
hash2bn_collector.collect({block_hash.bytes, encoded_block_number});

if (++block_count % 1'000'000 == 0) {
log::Info() << "[Snapshots] processing block header: " << block_number << " count=" << block_count;
log::Info() << "[Snapshots] Processing block header: " << block_number << " count=" << block_count;
if (is_stopping()) return false;
}

return true;
});
db::PooledCursor header_numbers_cursor{txn, db::table::kHeaderNumbers};
hash2bn_collector.load(header_numbers_cursor);

- // Reset sequence for kBlockTransactions table
- const auto view_result = repository_->view_tx_segment(max_block_available, [&](const auto* tx_sn) {
- const auto last_tx_id = tx_sn->idx_txn_hash()->base_data_id() + tx_sn->item_count();
- db::reset_map_sequence(txn, db::table::kBlockTransactions.name, last_tx_id + 1);
- return true;
- });
- if (view_result != SnapshotRepository::ViewResult::kWalkSuccess) {
- log::Error() << "[Snapshots] snapshot not found for block: " << max_block_available;
- return false;
- }
log::Info() << "[Snapshots] Database table HeaderNumbers updated";

// Update head block header in kHeadHeader table
- const auto canonical_hash{db::read_canonical_hash(txn, repository_->max_block_available())};
+ const auto canonical_hash{db::read_canonical_hash(txn, max_block_available)};
ensure(canonical_hash.has_value(), "SnapshotSync::save no canonical head hash found");
db::write_head_header_hash(txn, *canonical_hash);
log::Info() << "[Snapshots] Database table HeadHeader updated";

- // Update progress for related stages
+ // Update Headers stage progress to the max block in snapshots
db::stages::write_stage_progress(txn, db::stages::kHeadersKey, max_block_available);

log::Info() << "[Snapshots] Database Headers stage progress updated";
}

void SnapshotSync::update_block_bodies(db::RWTxn& txn, BlockNum max_block_available) {
// Check if BlockBodies stage progress has already reached the max block in snapshots
const auto last_progress{db::stages::read_stage_progress(txn, db::stages::kBlockBodiesKey)};
if (last_progress >= max_block_available) {
return;
}

// Reset sequence for kBlockTransactions table
const auto tx_snapshot = repository_->find_tx_segment(max_block_available);
ensure(tx_snapshot, "SnapshotSync::update_block_bodies snapshots max block not found in any snapshot");
const auto last_tx_id = tx_snapshot->idx_txn_hash()->base_data_id() + tx_snapshot->item_count();
db::reset_map_sequence(txn, db::table::kBlockTransactions.name, last_tx_id + 1);
log::Info() << "[Snapshots] Database table BlockTransactions sequence reset";

// Update BlockBodies stage progress to the max block in snapshots
db::stages::write_stage_progress(txn, db::stages::kBlockBodiesKey, max_block_available);

log::Info() << "[Snapshots] Database BlockBodies stage progress updated";
}

void SnapshotSync::update_block_hashes(db::RWTxn& txn, BlockNum max_block_available) {
// Check if BlockHashes stage progress has already reached the max block in snapshots
const auto last_progress{db::stages::read_stage_progress(txn, db::stages::kBlockHashesKey)};
if (last_progress >= max_block_available) {
return;
}

// Update BlockHashes stage progress to the max block in snapshots
db::stages::write_stage_progress(txn, db::stages::kBlockHashesKey, max_block_available);

log::Info() << "[Snapshots] Database BlockHashes stage progress updated";
}

void SnapshotSync::update_block_senders(db::RWTxn& txn, BlockNum max_block_available) {
// Check if Senders stage progress has already reached the max block in snapshots
const auto last_progress{db::stages::read_stage_progress(txn, db::stages::kSendersKey)};
if (last_progress >= max_block_available) {
return;
}

// Update Senders stage progress to the max block in snapshots
db::stages::write_stage_progress(txn, db::stages::kSendersKey, max_block_available);

- return true;
+ log::Info() << "[Snapshots] Database Senders stage progress updated";
}

} // namespace silkworm::snapshot
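
The recurring piece of this change is the per-stage idempotency guard: each new update_block_* helper reads the stage progress, returns early if it already covers the block range served by snapshots, and only then writes the new progress. The snippet below is a minimal standalone sketch of that guard, assuming a plain std::map in place of silkworm's MDBX-backed stage table; read_stage_progress/write_stage_progress here are simplified stand-ins, not the real db::stages API.

    // Minimal standalone sketch of the idempotent stage-progress guard used by the
    // new update_block_* helpers. The in-memory "stages" map stands in for the
    // database-backed stage table; these free functions mimic the db::stages calls
    // visible in the diff but are NOT silkworm's real API.
    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <string>

    using BlockNum = uint64_t;

    static std::map<std::string, BlockNum> stages;  // stage name -> highest processed block

    BlockNum read_stage_progress(const std::string& stage) { return stages[stage]; }
    void write_stage_progress(const std::string& stage, BlockNum block) { stages[stage] = block; }

    // Returns true when the stage progress was actually advanced.
    bool update_stage(const std::string& stage, BlockNum max_block_available) {
        // Skip if the stage already reached (or passed) the max block covered by snapshots
        if (read_stage_progress(stage) >= max_block_available) return false;
        write_stage_progress(stage, max_block_available);
        return true;
    }

    int main() {
        stages["Headers"] = 18'000'000;  // already beyond the snapshot range
        const BlockNum max_block_available = 17'000'000;

        for (const auto& stage : {std::string{"Headers"}, std::string{"Bodies"}, std::string{"Senders"}}) {
            const bool advanced = update_stage(stage, max_block_available);
            std::cout << stage << (advanced ? " advanced to " : " already at ")
                      << stages[stage] << "\n";
        }
    }

Because the write only happens when the stored progress is behind the snapshots, rerunning the whole update after a restart leaves already-synced stages untouched, which is what makes update_database safe to call on every snapshot sync.
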
11 changes: 8 additions & 3 deletions silkworm/node/snapshot/sync.hpp
@@ -34,15 +34,20 @@ class SnapshotSync : public Stoppable {
SnapshotSync(SnapshotRepository* repository, const ChainConfig& config);
~SnapshotSync() override;

+ bool stop() override;
+
bool download_and_index_snapshots(db::RWTxn& txn);
bool download_snapshots(const std::vector<std::string>& snapshot_file_names);
- bool index_snapshots(db::RWTxn& txn, const std::vector<std::string>& snapshot_file_names);
- bool stop() override;
+ void index_snapshots();

private:
void reopen();
void build_missing_indexes();
- bool save(db::RWTxn& txn, BlockNum max_block_available);
+ void update_database(db::RWTxn& txn, BlockNum max_block_available);
+ void update_block_headers(db::RWTxn& txn, BlockNum max_block_available);
+ void update_block_bodies(db::RWTxn& txn, BlockNum max_block_available);
+ static void update_block_hashes(db::RWTxn& txn, BlockNum max_block_available);
+ static void update_block_senders(db::RWTxn& txn, BlockNum max_block_available);

SnapshotRepository* repository_;
const SnapshotSettings& settings_;
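
For context on the unchanged completion logic in download_snapshots above: the method sizes a std::latch to the number of preverified snapshots, every completed_subscription callback counts it down once, and the caller loops with a sleep until all downloads have reported in. Below is a self-contained C++20 sketch of that pattern, with worker threads standing in for the torrent client callbacks; the names are illustrative, not silkworm's.

    // Standalone sketch of the wait-for-N-downloads pattern built around std::latch.
    #include <chrono>
    #include <iostream>
    #include <latch>
    #include <thread>
    #include <vector>

    int main() {
        const std::ptrdiff_t num_snapshots = 3;
        std::latch download_done{num_snapshots};

        auto on_completed = [&](int index) {
            std::this_thread::sleep_for(std::chrono::milliseconds(50 * index));  // fake download time
            std::cout << "download completed [" << index + 1 << "/" << num_snapshots << "]\n";
            download_done.count_down();
        };

        std::vector<std::jthread> workers;
        for (int i = 0; i < num_snapshots; ++i) {
            workers.emplace_back(on_completed, i);  // each worker plays one completion callback
        }

        // Block until every snapshot counted the latch down; the code in the diff
        // instead sleeps in a loop between checks, so it can also react to stop requests.
        download_done.wait();
        std::cout << "sync completed [" << num_snapshots << "/" << num_snapshots << "]\n";
    }
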

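The HeaderNumbers rebuild in update_block_headers follows a collect-then-load shape: gather (block_hash, block_number) pairs while scanning the header snapshots, then load them into the kHeaderNumbers cursor in one pass. The sketch below is a self-contained mock of that flow under simplifying assumptions: a vector plays the etl::Collector, a std::map plays the table, and the tiny fake hashes are illustrative only; the real collector additionally spills sorted batches to disk, which this mock skips.

    // Standalone mock of the collect-then-load flow used to rebuild HeaderNumbers.
    #include <algorithm>
    #include <array>
    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <vector>

    using Hash = std::array<uint8_t, 32>;

    struct Entry {
        Hash key;        // block hash
        uint64_t value;  // block number
    };

    int main() {
        // 1) Collect phase: append entries in whatever order the snapshots yield them
        std::vector<Entry> collector;
        for (uint64_t block_number : {3u, 1u, 2u}) {
            Hash fake_hash{};
            fake_hash[0] = static_cast<uint8_t>(block_number);  // stand-in for the real header hash
            collector.push_back({fake_hash, block_number});
        }

        // 2) Load phase: sort by key so the "cursor" receives keys in ascending order,
        //    the usual rationale for an ETL pass before a bulk table load
        std::sort(collector.begin(), collector.end(),
                  [](const Entry& a, const Entry& b) { return a.key < b.key; });

        std::map<Hash, uint64_t> header_numbers_table;
        for (const auto& e : collector) {
            header_numbers_table.emplace(e.key, e.value);
        }

        std::cout << "loaded " << header_numbers_table.size() << " hash -> block number mappings\n";
    }
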