Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

snapshots refactoring #1970

Merged
merged 37 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
51b9e41
extract header serialization
battlmonstr Apr 16, 2024
0972bb7
extract body serialization
battlmonstr Apr 16, 2024
5ac4cb0
refactor slice_tx_payload
battlmonstr Apr 16, 2024
361e887
rename txn_hash -> txn_snapshot_word_serializer
battlmonstr Apr 16, 2024
fe4644b
extract tx serialization
battlmonstr Apr 16, 2024
55094c3
snapshot iterator using SnapshotWordSerializer
battlmonstr Apr 16, 2024
dacc3d8
move Snapshot class to snapshot_base.hpp
battlmonstr Apr 16, 2024
1b44c19
refactor SnapshotWordSerializer-s: rename value, make check_sanity_wi…
battlmonstr Apr 16, 2024
c465358
HeaderSnapshotReader
battlmonstr Apr 16, 2024
a6bb467
rename snapshot_base -> snapshot_reader
battlmonstr Apr 18, 2024
41ff53f
BodySnapshotReader
battlmonstr Apr 16, 2024
73f7760
refactor next_item/seek, TransactionSnapshotReader
battlmonstr Apr 17, 2024
8caa0d5
walkers using references
battlmonstr Apr 17, 2024
75cd594
ordinal_lookup refactoring
battlmonstr Apr 17, 2024
d9bb192
txn_range refactoring
battlmonstr Apr 17, 2024
bb05598
basic queries
battlmonstr Apr 18, 2024
5d70d51
BodyTxsAmountQuery
battlmonstr Apr 18, 2024
a596a2c
TransactionBlockNumByTxnHashQuery
battlmonstr Apr 18, 2024
acc047a
refactor query call sites to use query objects
battlmonstr Apr 18, 2024
52f6858
remove path copies from SnapshotBundle
battlmonstr Apr 18, 2024
3733503
refactor to separate Index-es from snapshots
battlmonstr Apr 18, 2024
aa3b101
rename serializer -> deserializer
battlmonstr Apr 19, 2024
f61a88f
SnapshotReader: mutable iterators and fix move support
battlmonstr Apr 22, 2024
f9aa2ec
static_assert iterators
battlmonstr Apr 22, 2024
22e8ae5
simplify queries and readers with typedefs
battlmonstr Apr 22, 2024
d54cf6e
concepts for deserializers and readers
battlmonstr Apr 23, 2024
401cef8
remove read_senders from snapshots access_layer
battlmonstr Apr 23, 2024
95ff21d
rename ordinal_lookup -> lookup_by_ordinal
battlmonstr Apr 23, 2024
1226e02
lookup_by_data_id: return optional, check upper bound
battlmonstr Apr 23, 2024
e7a9c79
simplify SnapshotSync::download_and_index_snapshots
battlmonstr Apr 23, 2024
668937e
remove SnapshotRepository::view_segment
battlmonstr Apr 23, 2024
f8bb9ef
move stale index removal logic to repository and use it from Snapshot…
battlmonstr Apr 24, 2024
0eaf8cd
reopen_index: throw if path is not found
battlmonstr Apr 24, 2024
c553693
make indexes mandatory in reopen
battlmonstr Apr 24, 2024
d09764a
simplify count/max computations
battlmonstr Apr 24, 2024
7d1b167
demote log::Info to Debug: ETL collector flushed file
battlmonstr Apr 25, 2024
14c4ef9
rebuild a missing transactions_to_block index if transactions index e…
battlmonstr Apr 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 62 additions & 76 deletions cmd/capi/execute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,68 +145,69 @@ const char* make_path(const snapshots::SnapshotPath& p) {
return path;
}

std::vector<SilkwormChainSnapshot> collect_all_snapshots(const SnapshotRepository& snapshot_repository) {
std::vector<SilkwormChainSnapshot> collect_all_snapshots(SnapshotRepository& snapshot_repository) {
std::vector<SilkwormHeadersSnapshot> headers_snapshot_sequence;
std::vector<SilkwormBodiesSnapshot> bodies_snapshot_sequence;
std::vector<SilkwormTransactionsSnapshot> transactions_snapshot_sequence;

for (const auto& segment_file : snapshot_repository.get_segment_files()) {
switch (segment_file.type()) {
case SnapshotType::headers: {
const auto* header_snapshot{snapshot_repository.get_header_segment(segment_file)};
const auto* idx_header_hash{header_snapshot->idx_header_hash()};
snapshot_repository.view_bundles(
[&](const SnapshotBundle& bundle) {
{
SilkwormHeadersSnapshot raw_headers_snapshot{
.segment{
.file_path = make_path(segment_file),
.memory_address = header_snapshot->memory_file_region().data(),
.memory_length = header_snapshot->memory_file_region().size()},
.file_path = make_path(bundle.header_snapshot.path()),
.memory_address = bundle.header_snapshot.memory_file_region().data(),
.memory_length = bundle.header_snapshot.memory_file_region().size(),
},
.header_hash_index{
.file_path = make_path(segment_file.index_file()),
.memory_address = idx_header_hash->memory_file_region().data(),
.memory_length = idx_header_hash->memory_file_region().size()}};
.file_path = make_path(bundle.idx_header_hash.path()),
.memory_address = bundle.idx_header_hash.memory_file_region().data(),
.memory_length = bundle.idx_header_hash.memory_file_region().size(),
},
};
headers_snapshot_sequence.push_back(raw_headers_snapshot);
} break;
case SnapshotType::bodies: {
const auto* body_snapshot{snapshot_repository.get_body_segment(segment_file)};
const auto* idx_body_number{body_snapshot->idx_body_number()};
}
{
SilkwormBodiesSnapshot raw_bodies_snapshot{
.segment{
.file_path = make_path(segment_file),
.memory_address = body_snapshot->memory_file_region().data(),
.memory_length = body_snapshot->memory_file_region().size()},
.file_path = make_path(bundle.body_snapshot.path()),
.memory_address = bundle.body_snapshot.memory_file_region().data(),
.memory_length = bundle.body_snapshot.memory_file_region().size(),
},
.block_num_index{
.file_path = make_path(segment_file.index_file()),
.memory_address = idx_body_number->memory_file_region().data(),
.memory_length = idx_body_number->memory_file_region().size()}};
.file_path = make_path(bundle.idx_body_number.path()),
.memory_address = bundle.idx_body_number.memory_file_region().data(),
.memory_length = bundle.idx_body_number.memory_file_region().size(),
},
};
bodies_snapshot_sequence.push_back(raw_bodies_snapshot);
} break;
case SnapshotType::transactions: {
const auto* tx_snapshot{snapshot_repository.get_tx_segment(segment_file)};
const auto* idx_txn_hash{tx_snapshot->idx_txn_hash()};
const auto* idx_txn_hash_2_block{tx_snapshot->idx_txn_hash_2_block()};
}
{
SilkwormTransactionsSnapshot raw_transactions_snapshot{
.segment{
.file_path = make_path(segment_file),
.memory_address = tx_snapshot->memory_file_region().data(),
.memory_length = tx_snapshot->memory_file_region().size()},
.file_path = make_path(bundle.txn_snapshot.path()),
.memory_address = bundle.txn_snapshot.memory_file_region().data(),
.memory_length = bundle.txn_snapshot.memory_file_region().size(),
},
.tx_hash_index{
.file_path = make_path(segment_file.index_file()),
.memory_address = idx_txn_hash->memory_file_region().data(),
.memory_length = idx_txn_hash->memory_file_region().size()},
.file_path = make_path(bundle.idx_txn_hash.path()),
.memory_address = bundle.idx_txn_hash.memory_file_region().data(),
.memory_length = bundle.idx_txn_hash.memory_file_region().size(),
},
.tx_hash_2_block_index{
.file_path = make_path(segment_file.index_file_for_type(SnapshotType::transactions_to_block)),
.memory_address = idx_txn_hash_2_block->memory_file_region().data(),
.memory_length = idx_txn_hash_2_block->memory_file_region().size()}};
.file_path = make_path(bundle.idx_txn_hash_2_block.path()),
.memory_address = bundle.idx_txn_hash_2_block.memory_file_region().data(),
.memory_length = bundle.idx_txn_hash_2_block.memory_file_region().size(),
},
};
transactions_snapshot_sequence.push_back(raw_transactions_snapshot);
} break;
default:
ensure(false, [&]() { return "unexpected snapshot type: " + std::string{magic_enum::enum_name(segment_file.type())}; });
}
}
}
return true;
});

ensure(headers_snapshot_sequence.size() == snapshot_repository.header_snapshots_count(), "invalid header snapshot count");
ensure(bodies_snapshot_sequence.size() == snapshot_repository.body_snapshots_count(), "invalid body snapshot count");
ensure(transactions_snapshot_sequence.size() == snapshot_repository.tx_snapshots_count(), "invalid tx snapshot count");
ensure(headers_snapshot_sequence.size() == snapshot_repository.bundles_count(), "invalid header snapshot count");
ensure(bodies_snapshot_sequence.size() == snapshot_repository.bundles_count(), "invalid body snapshot count");
ensure(transactions_snapshot_sequence.size() == snapshot_repository.bundles_count(), "invalid tx snapshot count");

std::vector<SilkwormChainSnapshot> snapshot_sequence;
snapshot_sequence.reserve(headers_snapshot_sequence.size());
Expand Down Expand Up @@ -281,7 +282,7 @@ int execute_with_external_txn(SilkwormHandle handle, ExecuteBlocksSettings setti
return SILKWORM_OK;
}

int execute_blocks(SilkwormHandle handle, ExecuteBlocksSettings settings, const SnapshotRepository& repository, const DataDirectory& data_dir) {
int execute_blocks(SilkwormHandle handle, ExecuteBlocksSettings settings, SnapshotRepository& repository, const DataDirectory& data_dir) {
// Open chain database
silkworm::db::EnvConfig config{
.path = data_dir.chaindata().path().string(),
Expand Down Expand Up @@ -318,58 +319,43 @@ int execute_blocks(SilkwormHandle handle, ExecuteBlocksSettings settings, const
}
}

int build_indexes(SilkwormHandle handle, const BuildIndexesSettings& settings, const SnapshotRepository& repository, const DataDirectory& data_dir) {
int build_indexes(SilkwormHandle handle, const BuildIndexesSettings& settings, const DataDirectory& data_dir) {
SILK_INFO << "Building indexes for snapshots: " << settings.snapshot_names;

std::vector<SilkwormMemoryMappedFile*> snapshots;
std::vector<Snapshot> snapshots;
std::vector<SilkwormMemoryMappedFile*> snapshot_files;
// Parse snapshot paths and create memory mapped files
for (auto& snapshot_name : settings.snapshot_names) {
auto raw_snapshot_path = data_dir.snapshots().path() / snapshot_name;
auto snapshot_path = SnapshotPath::parse(raw_snapshot_path);
if (!snapshot_path.has_value())
throw std::runtime_error("Invalid snapshot path");

const Snapshot* snapshot{nullptr};
switch (snapshot_path->type()) {
case headers:
snapshot = repository.get_header_segment(*snapshot_path);
break;
case bodies:
snapshot = repository.get_body_segment(*snapshot_path);
break;
case transactions:
case transactions_to_block:
snapshot = repository.get_tx_segment(*snapshot_path);
break;
default:
throw std::runtime_error("Invalid snapshot type");
}
if (!snapshot) {
throw std::runtime_error("Snapshot not found in the repository:" + snapshot_name);
}
Snapshot& snapshot = snapshots.emplace_back(*snapshot_path);
snapshot.reopen_segment();

auto mmf = new SilkwormMemoryMappedFile{
.file_path = make_path(snapshot->path()),
.memory_address = snapshot->memory_file_region().data(),
.memory_length = snapshot->memory_file_region().size(),
.file_path = make_path(*snapshot_path),
.memory_address = snapshot.memory_file_region().data(),
.memory_length = snapshot.memory_file_region().size(),
};
snapshots.push_back(mmf);
snapshot_files.push_back(mmf);
}

// Call api to build indexes
const auto start_time{std::chrono::high_resolution_clock::now()};

const int status_code = silkworm_build_recsplit_indexes(handle, snapshots.data(), snapshots.size());
const int status_code = silkworm_build_recsplit_indexes(handle, snapshot_files.data(), snapshot_files.size());
if (status_code != SILKWORM_OK) return status_code;

auto elapsed = std::chrono::high_resolution_clock::now() - start_time;
SILK_INFO << "Building indexes for snapshots done in "
<< std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count() << "ms";

// Free memory mapped files
for (auto snapshot : snapshots) {
delete[] snapshot->file_path;
delete snapshot;
for (auto mmf : snapshot_files) {
delete[] mmf->file_path;
delete mmf;
}

return SILKWORM_OK;
Expand Down Expand Up @@ -446,16 +432,16 @@ int main(int argc, char* argv[]) {
// Add snapshots to Silkworm API library
SnapshotSettings snapshot_settings{};
snapshot_settings.repository_dir = data_dir.snapshots().path();
SnapshotRepository repository{snapshot_settings};
repository.reopen_folder();

int status_code = -1;
if (settings.execute_blocks_settings) {
// Execute specified block range using Silkworm API library
SnapshotRepository repository{snapshot_settings};
repository.reopen_folder();
status_code = execute_blocks(handle, *settings.execute_blocks_settings, repository, data_dir);
} else if (settings.build_indexes_settings) {
// Build index for a specific snapshot using Silkworm API library
status_code = build_indexes(handle, *settings.build_indexes_settings, repository, data_dir);
status_code = build_indexes(handle, *settings.build_indexes_settings, data_dir);
} else if (settings.rpcdaemon_settings) {
// Start RPC Daemon using Silkworm API library
status_code = start_rpcdaemon(handle, *settings.rpcdaemon_settings, data_dir);
Expand Down
Loading
Loading