Skip to content

Commit

Permalink
make indexes mandatory in reopen
Browse files Browse the repository at this point in the history
  • Loading branch information
battlmonstr committed Apr 24, 2024
1 parent 0eaf8cd commit c553693
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 67 deletions.
7 changes: 0 additions & 7 deletions silkworm/capi/silkworm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -332,9 +332,7 @@ SILKWORM_EXPORT int silkworm_add_snapshot(SilkwormHandle handle, SilkwormChainSn
return SILKWORM_INVALID_PATH;
}
snapshots::Snapshot header_snapshot{*headers_segment_path, make_region(hs.segment)};
header_snapshot.reopen_segment();
snapshots::Index idx_header_hash{headers_segment_path->index_file(), make_region(hs.header_hash_index)};
idx_header_hash.reopen_index();

const SilkwormBodiesSnapshot& bs = snapshot->bodies;
if (!bs.segment.file_path || !bs.block_num_index.file_path) {
Expand All @@ -345,9 +343,7 @@ SILKWORM_EXPORT int silkworm_add_snapshot(SilkwormHandle handle, SilkwormChainSn
return SILKWORM_INVALID_PATH;
}
snapshots::Snapshot body_snapshot{*bodies_segment_path, make_region(bs.segment)};
body_snapshot.reopen_segment();
snapshots::Index idx_body_number{bodies_segment_path->index_file(), make_region(bs.block_num_index)};
idx_body_number.reopen_index();

const SilkwormTransactionsSnapshot& ts = snapshot->transactions;
if (!ts.segment.file_path || !ts.tx_hash_index.file_path || !ts.tx_hash_2_block_index.file_path) {
Expand All @@ -358,11 +354,8 @@ SILKWORM_EXPORT int silkworm_add_snapshot(SilkwormHandle handle, SilkwormChainSn
return SILKWORM_INVALID_PATH;
}
snapshots::Snapshot txn_snapshot{*transactions_segment_path, make_region(ts.segment)};
txn_snapshot.reopen_segment();
snapshots::Index idx_txn_hash{transactions_segment_path->index_file_for_type(snapshots::SnapshotType::transactions), make_region(ts.tx_hash_index)};
idx_txn_hash.reopen_index();
snapshots::Index idx_txn_hash_2_block{transactions_segment_path->index_file_for_type(snapshots::SnapshotType::transactions_to_block), make_region(ts.tx_hash_2_block_index)};
idx_txn_hash_2_block.reopen_index();

snapshots::SnapshotBundle bundle{
.header_snapshot = std::move(header_snapshot),
Expand Down
8 changes: 5 additions & 3 deletions silkworm/db/snapshots/path.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ std::optional<SnapshotPath> SnapshotPath::parse(fs::path path) {
const std::string filename_no_ext = path.stem().string();

// Expected stem format: <version>-<6_digit_block_from>-<6_digit_block_to>-<tag>
const std::vector<absl::string_view> tokens = absl::StrSplit(filename_no_ext, '-');
const std::vector<absl::string_view> tokens = absl::StrSplit(filename_no_ext, absl::MaxSplits('-', 3));
if (tokens.size() != 4) {
return std::nullopt;
}
Expand Down Expand Up @@ -79,8 +79,10 @@ std::optional<SnapshotPath> SnapshotPath::parse(fs::path path) {
return std::nullopt;
}

// Expected tag format: headers|bodies|transactions (parsing relies on magic_enum, so SnapshotType items must match exactly)
std::string_view tag_str{tag.data(), tag.size()};
// Expected tag format: headers|bodies|transactions|transactions-to-block
// parsing relies on magic_enum, so SnapshotType item names must match the tag exactly once '-' is replaced with '_'
std::string tag_str{tag.data(), tag.size()};
std::replace(tag_str.begin(), tag_str.end(), '-', '_');
const auto type = magic_enum::enum_cast<SnapshotType>(tag_str);
if (!type) {
return std::nullopt;
Expand Down
114 changes: 60 additions & 54 deletions silkworm/db/snapshots/repository.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,29 +61,40 @@ void SnapshotRepository::add_snapshot_bundle(SnapshotBundle bundle) {
BlockNum block_from = bundle.block_from();
BlockNum block_to = bundle.block_to();

bundle.reopen();

bundles_.emplace(block_from, std::move(bundle));

segment_max_block_ = std::max(segment_max_block_, block_to - 1);
idx_max_block_ = max_idx_available();
}

void SnapshotRepository::reopen_folder() {
SILK_INFO << "Reopen snapshot repository folder: " << settings_.repository_dir.string();
SnapshotPathList segment_files = get_segment_files();
reopen_list(segment_files);
SILK_INFO << "Total reopened snapshots: " << total_snapshots_count();
void SnapshotBundle::reopen() {
for (auto& snapshot_ref : snapshots()) {
snapshot_ref.get().reopen_segment();
ensure(!snapshot_ref.get().empty(), [&]() {
return "invalid empty snapshot " + snapshot_ref.get().fs_path().string();
});
}
for (auto& index_ref : indexes()) {
index_ref.get().reopen_index();
}
}

void SnapshotBundle::close() {
for (auto& index_ref : indexes()) {
index_ref.get().close_index();
}
for (auto& snapshot_ref : snapshots()) {
snapshot_ref.get().close();
}
}

//! Close every snapshot bundle currently held by the repository.
//! The per-bundle work (closing indexes, then snapshot segments) lives in SnapshotBundle::close(),
//! so it is not repeated inline here: doing both would close each index and segment twice.
void SnapshotRepository::close() {
    SILK_TRACE << "Close snapshot repository folder: " << settings_.repository_dir.string();
    for (auto& entry : bundles_) {
        auto& bundle = entry.second;
        bundle.close();
    }
}

Expand Down Expand Up @@ -228,65 +239,59 @@ std::vector<std::shared_ptr<IndexBuilder>> SnapshotRepository::missing_indexes()
return missing_index_list;
}

void SnapshotRepository::reopen_list(const SnapshotPathList& segment_files) {
std::map<BlockNum, SnapshotPath> header_snapshot_paths;
std::map<BlockNum, SnapshotPath> body_snapshot_paths;
std::map<BlockNum, SnapshotPath> txn_snapshot_paths;

for (const SnapshotPath& path : segment_files) {
switch (path.type()) {
case SnapshotType::headers:
header_snapshot_paths.emplace(path.block_from(), path);
break;
case SnapshotType::bodies:
body_snapshot_paths.emplace(path.block_from(), path);
break;
case SnapshotType::transactions:
txn_snapshot_paths.emplace(path.block_from(), path);
break;
case SnapshotType::transactions_to_block:
assert(false);
break;
}
void SnapshotRepository::reopen_folder() {
SILK_INFO << "Reopen snapshot repository folder: " << settings_.repository_dir.string();
SnapshotPathList all_snapshot_paths = get_segment_files();
SnapshotPathList all_index_paths = get_idx_files();

std::map<BlockNum, std::map<bool, std::map<SnapshotType, size_t>>> groups;

for (size_t i = 0; i < all_snapshot_paths.size(); i++) {
auto& path = all_snapshot_paths[i];
auto& group = groups[path.block_from()][false];
group[path.type()] = i;
}

for (size_t i = 0; i < all_index_paths.size(); i++) {
auto& path = all_index_paths[i];
auto& group = groups[path.block_from()][true];
group[path.type()] = i;
}

BlockNum num = 0;
if (!header_snapshot_paths.empty()) {
num = header_snapshot_paths.begin()->first;
if (!groups.empty()) {
num = groups.begin()->first;
}

while (
header_snapshot_paths.contains(num) &&
body_snapshot_paths.contains(num) &&
txn_snapshot_paths.contains(num)) {
while (groups.contains(num) &&
(groups[num][false].size() == SnapshotBundle::kSnapshotsCount) &&
(groups[num][true].size() == SnapshotBundle::kIndexesCount)) {
if (!bundles_.contains(num)) {
auto snapshot_path = [&](SnapshotType type) {
return all_snapshot_paths[groups[num][false][type]];
};
auto index_path = [&](SnapshotType type) {
return all_index_paths[groups[num][true][type]];
};

SnapshotBundle bundle{
.header_snapshot = Snapshot(header_snapshot_paths.at(num)),
.idx_header_hash = Index(header_snapshot_paths.at(num).index_file()),
.header_snapshot = Snapshot(snapshot_path(SnapshotType::headers)),
.idx_header_hash = Index(index_path(SnapshotType::headers)),

.body_snapshot = Snapshot(body_snapshot_paths.at(num)),
.idx_body_number = Index(body_snapshot_paths.at(num).index_file()),
.body_snapshot = Snapshot(snapshot_path(SnapshotType::bodies)),
.idx_body_number = Index(index_path(SnapshotType::bodies)),

.txn_snapshot = Snapshot(txn_snapshot_paths.at(num)),
.idx_txn_hash = Index(txn_snapshot_paths.at(num).index_file_for_type(SnapshotType::transactions)),
.idx_txn_hash_2_block = Index(txn_snapshot_paths.at(num).index_file_for_type(SnapshotType::transactions_to_block)),
.txn_snapshot = Snapshot(snapshot_path(SnapshotType::transactions)),
.idx_txn_hash = Index(index_path(SnapshotType::transactions)),
.idx_txn_hash_2_block = Index(index_path(SnapshotType::transactions_to_block)),
};

for (auto& snapshot_ref : bundle.snapshots()) {
snapshot_ref.get().reopen_segment();
ensure(!snapshot_ref.get().empty(), [&]() {
return "invalid empty snapshot " + snapshot_ref.get().fs_path().string();
});
}
bundle.reopen();

bundles_.emplace(num, std::move(bundle));
}

auto& bundle = bundles_.at(num);
for (auto& index_ref : bundle.indexes()) {
index_ref.get().reopen_index();
}

segment_max_block_ = std::max(segment_max_block_, bundle.block_to() - 1);

if (num < bundle.block_to()) {
Expand All @@ -297,6 +302,7 @@ void SnapshotRepository::reopen_list(const SnapshotPathList& segment_files) {
}

idx_max_block_ = max_idx_available();
SILK_INFO << "Total reopened snapshots: " << total_snapshots_count();
}

const SnapshotBundle* SnapshotRepository::find_bundle(BlockNum number) const {
Expand Down
11 changes: 8 additions & 3 deletions silkworm/db/snapshots/repository.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,18 @@ struct SnapshotBundle {
//! Index transaction_hash -> block_num
Index idx_txn_hash_2_block;

std::array<std::reference_wrapper<Snapshot>, 3> snapshots() {
static constexpr size_t kSnapshotsCount = 3;
static constexpr size_t kIndexesCount = 4;

std::array<std::reference_wrapper<Snapshot>, kSnapshotsCount> snapshots() {
return {
header_snapshot,
body_snapshot,
txn_snapshot,
};
}

std::array<std::reference_wrapper<Index>, 4> indexes() {
std::array<std::reference_wrapper<Index>, kIndexesCount> indexes() {
return {
idx_header_hash,
idx_body_number,
Expand Down Expand Up @@ -100,6 +103,9 @@ struct SnapshotBundle {
// assume that all snapshots have the same block range, and use one of them
BlockNum block_from() const { return header_snapshot.block_from(); }
BlockNum block_to() const { return header_snapshot.block_to(); }

void reopen();
void close();
};

//! Read-only repository for all snapshot files.
Expand Down Expand Up @@ -163,7 +169,6 @@ class SnapshotRepository {
[[nodiscard]] std::optional<BlockNum> find_block_number(Hash txn_hash) const;

private:
void reopen_list(const SnapshotPathList& segment_files);
std::size_t view_segments(SnapshotType type, const SnapshotWalker& walker);
const SnapshotBundle* find_bundle(BlockNum number) const;
std::optional<SnapshotRepository::SnapshotAndIndex> find_segment(SnapshotType type, BlockNum number) const;
Expand Down

0 comments on commit c553693

Please sign in to comment.