datastore: SnapshotBundle schema (#2484)
Previously SnapshotBundle contained a fixed number of predefined entities. Now SnapshotBundle is dynamic and based on a snapshots::Schema::RepositoryDef. The actual schema is defined in db::blocks::make_blocks_repository_schema(); snapshots::Schema follows a "fluent config builder" pattern. Currently the rec_split_index_defs_ values there are trivial, but in the future index builders will be configured there as well.
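
To make the "fluent config builder" idea concrete, here is a self-contained toy sketch in that style (the types and member names are invented for illustration; they are not the actual snapshots::Schema API):

    // Toy illustration of a fluent config builder; NOT the actual Silkworm types.
    #include <string>
    #include <utility>
    #include <vector>

    struct RepositoryDefToy {
        std::vector<std::string> segment_names;
        std::vector<std::string> rec_split_index_names;

        RepositoryDefToy& segment(std::string name) {
            segment_names.push_back(std::move(name));
            return *this;  // returning *this is what makes the calls chainable
        }
        RepositoryDefToy& rec_split_index(std::string name) {
            rec_split_index_names.push_back(std::move(name));
            return *this;
        }
    };

    // Analogous in spirit to db::blocks::make_blocks_repository_schema():
    // every segment and recsplit index of a bundle is declared in one chained expression.
    RepositoryDefToy make_blocks_repository_schema_toy() {
        RepositoryDefToy schema;
        schema.segment("headers").rec_split_index("idx_header_hash")
              .segment("bodies").rec_split_index("idx_body_number")
              .segment("transactions").rec_split_index("idx_txn_hash")
              .rec_split_index("idx_txn_hash_2_block");
        return schema;
    }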

snapshots::Schema::RepositoryDef is part of datastore::Schema, which is configured by db::DataStore::make_schema. Currently datastore::Schema just wraps snapshots::Schema, but in the future it will also contain DHIIs.
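
A minimal sketch of that wrapping, assuming datastore::Schema simply holds the repository definition as a member (the member name and free-function form below are assumptions, not the actual declarations):

    // Sketch only: datastore::Schema as a thin wrapper around the snapshots schema.
    struct SchemaSketch {
        snapshots::Schema::RepositoryDef blocks_repository;  // hypothetical member name
        // in the future: DHII definitions are expected to live here as well
    };

    SchemaSketch make_schema_sketch() {
        // Roughly what db::DataStore::make_schema is described to do for blocks.
        return SchemaSketch{db::blocks::make_blocks_repository_schema()};
    }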

For DAL client code, a db::blocks::BundleDataRef wrapper is provided for convenient access into the block repository bundles (much like db::DataStoreRef provides convenient access to the block repository).
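
A short usage sketch of the wrapper, mirroring the CAPI changes further down in this diff (repository is assumed to be an already-opened snapshots::SnapshotRepository, and process() is a placeholder for the caller's own logic):

    for (const auto& bundle_ptr : repository.view_bundles()) {
        // BundleDataRef wraps the dynamic bundle and exposes typed accessors
        // in place of the previously hard-coded data members.
        db::blocks::BundleDataRef bundle{**bundle_ptr};
        const auto& header_segment = bundle.header_segment();
        const auto& header_index = bundle.idx_header_hash();
        process(header_segment.path(), header_index.memory_file_region());  // placeholder call
    }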

Related refactorings:

* separate the bundle from its bundle paths: SnapshotBundlePaths is useful for file manipulation without opening the snapshots.
* always open SnapshotBundle on creation (RAII)
* open the repository by default; it is left unopened only in the CAPI case, where bundles are mmap-ed and provided externally (see the before/after sketch after this list).
* move the db/transactions subfolder into db/blocks; db/blocks now contains everything related to "block snapshots", while db/state will contain "state snapshots" and DHII.
* rename snapshot_repository variables to just repository
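
Several of these items mostly remove boilerplate at call sites, roughly as in this before/after sketch (adapted from the merge() change in cmd/dev/snapshots.cpp below; make_repository stands in for whichever repository factory a caller uses):

    // Before: a freshly constructed repository had to be opened explicitly.
    auto repository = make_repository(settings);
    repository.reopen_folder();
    db::SnapshotMerger merger{repository, tmp_dir.path()};

    // After: make_repository() hands back an already-opened repository,
    // so the explicit reopen_folder() call disappears. Only the CAPI keeps the
    // repository unopened, because its bundles are mmap-ed and provided externally.
    auto repository = make_repository(settings);
    db::SnapshotMerger merger{repository, tmp_dir.path()};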
battlmonstr authored Nov 7, 2024
2 parents c7651ce + 8741a76 commit 8c149fc
Showing 58 changed files with 646 additions and 374 deletions.
55 changes: 27 additions & 28 deletions cmd/capi/execute.cpp
@@ -145,70 +145,70 @@ const char* make_path(const snapshots::SnapshotPath& p) {
return path;
}

std::vector<SilkwormChainSnapshot> collect_all_snapshots(const SnapshotRepository& snapshot_repository) {
std::vector<SilkwormChainSnapshot> collect_all_snapshots(const SnapshotRepository& repository) {
std::vector<SilkwormHeadersSnapshot> headers_snapshot_sequence;
std::vector<SilkwormBodiesSnapshot> bodies_snapshot_sequence;
std::vector<SilkwormTransactionsSnapshot> transactions_snapshot_sequence;

for (const auto& bundle_ptr : snapshot_repository.view_bundles()) {
const auto& bundle = *bundle_ptr;
for (const auto& bundle_ptr : repository.view_bundles()) {
db::blocks::BundleDataRef bundle{**bundle_ptr};
{
{
SilkwormHeadersSnapshot raw_headers_snapshot{
.segment{
.file_path = make_path(bundle.header_segment.path()),
.memory_address = bundle.header_segment.memory_file_region().data(),
.memory_length = bundle.header_segment.memory_file_region().size(),
.file_path = make_path(bundle.header_segment().path()),
.memory_address = bundle.header_segment().memory_file_region().data(),
.memory_length = bundle.header_segment().memory_file_region().size(),
},
.header_hash_index{
.file_path = make_path(bundle.idx_header_hash.path()),
.memory_address = bundle.idx_header_hash.memory_file_region().data(),
.memory_length = bundle.idx_header_hash.memory_file_region().size(),
.file_path = make_path(bundle.idx_header_hash().path()),
.memory_address = bundle.idx_header_hash().memory_file_region().data(),
.memory_length = bundle.idx_header_hash().memory_file_region().size(),
},
};
headers_snapshot_sequence.push_back(raw_headers_snapshot);
}
{
SilkwormBodiesSnapshot raw_bodies_snapshot{
.segment{
.file_path = make_path(bundle.body_segment.path()),
.memory_address = bundle.body_segment.memory_file_region().data(),
.memory_length = bundle.body_segment.memory_file_region().size(),
.file_path = make_path(bundle.body_segment().path()),
.memory_address = bundle.body_segment().memory_file_region().data(),
.memory_length = bundle.body_segment().memory_file_region().size(),
},
.block_num_index{
.file_path = make_path(bundle.idx_body_number.path()),
.memory_address = bundle.idx_body_number.memory_file_region().data(),
.memory_length = bundle.idx_body_number.memory_file_region().size(),
.file_path = make_path(bundle.idx_body_number().path()),
.memory_address = bundle.idx_body_number().memory_file_region().data(),
.memory_length = bundle.idx_body_number().memory_file_region().size(),
},
};
bodies_snapshot_sequence.push_back(raw_bodies_snapshot);
}
{
SilkwormTransactionsSnapshot raw_transactions_snapshot{
.segment{
.file_path = make_path(bundle.txn_segment.path()),
.memory_address = bundle.txn_segment.memory_file_region().data(),
.memory_length = bundle.txn_segment.memory_file_region().size(),
.file_path = make_path(bundle.txn_segment().path()),
.memory_address = bundle.txn_segment().memory_file_region().data(),
.memory_length = bundle.txn_segment().memory_file_region().size(),
},
.tx_hash_index{
.file_path = make_path(bundle.idx_txn_hash.path()),
.memory_address = bundle.idx_txn_hash.memory_file_region().data(),
.memory_length = bundle.idx_txn_hash.memory_file_region().size(),
.file_path = make_path(bundle.idx_txn_hash().path()),
.memory_address = bundle.idx_txn_hash().memory_file_region().data(),
.memory_length = bundle.idx_txn_hash().memory_file_region().size(),
},
.tx_hash_2_block_index{
.file_path = make_path(bundle.idx_txn_hash_2_block.path()),
.memory_address = bundle.idx_txn_hash_2_block.memory_file_region().data(),
.memory_length = bundle.idx_txn_hash_2_block.memory_file_region().size(),
.file_path = make_path(bundle.idx_txn_hash_2_block().path()),
.memory_address = bundle.idx_txn_hash_2_block().memory_file_region().data(),
.memory_length = bundle.idx_txn_hash_2_block().memory_file_region().size(),
},
};
transactions_snapshot_sequence.push_back(raw_transactions_snapshot);
}
}
}

ensure(headers_snapshot_sequence.size() == snapshot_repository.bundles_count(), "invalid header snapshot count");
ensure(bodies_snapshot_sequence.size() == snapshot_repository.bundles_count(), "invalid body snapshot count");
ensure(transactions_snapshot_sequence.size() == snapshot_repository.bundles_count(), "invalid tx snapshot count");
ensure(headers_snapshot_sequence.size() == repository.bundles_count(), "invalid header snapshot count");
ensure(bodies_snapshot_sequence.size() == repository.bundles_count(), "invalid body snapshot count");
ensure(transactions_snapshot_sequence.size() == repository.bundles_count(), "invalid tx snapshot count");

std::vector<SilkwormChainSnapshot> snapshot_sequence;
snapshot_sequence.reserve(headers_snapshot_sequence.size());
@@ -294,7 +294,6 @@ int execute_blocks(SilkwormHandle handle, ExecuteBlocksSettings settings, const
};

auto& repository = data_store.ref().repository;
repository.reopen_folder();

// Collect all snapshots
auto all_chain_snapshots{collect_all_snapshots(repository)};
4 changes: 1 addition & 3 deletions cmd/dev/check_changes.cpp
@@ -115,9 +115,7 @@ int main(int argc, char* argv[]) {
throw std::runtime_error("Unable to retrieve chain config");
}

auto& repository = data_store.ref().repository;
repository.reopen_folder();
db::DataModel access_layer{txn, repository};
db::DataModel access_layer = db::DataModelFactory{data_store.ref()}(txn);

AnalysisCache analysis_cache{/*max_size=*/5'000};
std::vector<Receipt> receipts;
11 changes: 7 additions & 4 deletions cmd/dev/db_toolbox.cpp
@@ -2267,10 +2267,13 @@ void do_freeze(EnvConfig& config, const DataDirectory& data_dir, bool keep_block
};
StageSchedulerAdapter stage_scheduler{data_store.chaindata_rw()};

auto& repository = data_store.ref().repository;
repository.reopen_folder();

Freezer freezer{data_store.chaindata(), repository, stage_scheduler, data_dir.temp().path(), keep_blocks};
Freezer freezer{
data_store.chaindata(),
data_store.ref().repository,
stage_scheduler,
data_dir.temp().path(),
keep_blocks,
};

test_util::TaskRunner runner;
runner.run(freezer.exec() || stage_scheduler.async_run("StageSchedulerAdapter"));
54 changes: 23 additions & 31 deletions cmd/dev/snapshots.cpp
@@ -39,6 +39,9 @@
#include <silkworm/db/blocks/headers/header_index.hpp>
#include <silkworm/db/blocks/headers/header_queries.hpp>
#include <silkworm/db/blocks/schema_config.hpp>
#include <silkworm/db/blocks/transactions/txn_index.hpp>
#include <silkworm/db/blocks/transactions/txn_queries.hpp>
#include <silkworm/db/blocks/transactions/txn_to_block_index.hpp>
#include <silkworm/db/datastore/snapshot_merger.hpp>
#include <silkworm/db/datastore/snapshots/bittorrent/client.hpp>
#include <silkworm/db/datastore/snapshots/bittorrent/web_seed_client.hpp>
@@ -52,9 +55,6 @@
#include <silkworm/db/snapshot_recompress.hpp>
#include <silkworm/db/snapshot_sync.hpp>
#include <silkworm/db/tables.hpp>
#include <silkworm/db/transactions/txn_index.hpp>
#include <silkworm/db/transactions/txn_queries.hpp>
#include <silkworm/db/transactions/txn_to_block_index.hpp>
#include <silkworm/infra/common/ensure.hpp>
#include <silkworm/infra/common/log.hpp>
#include <silkworm/infra/test_util/task_runner.hpp>
@@ -324,13 +324,12 @@ BodyCounters count_bodies_in_one(const SnapshotSubcommandSettings& settings, con
}

BodyCounters count_bodies_in_all(const SnapshotSubcommandSettings& settings) {
auto snapshot_repository = make_repository(settings.settings);
snapshot_repository.reopen_folder();
auto repository = make_repository(settings.settings);
int num_bodies = 0;
uint64_t num_txns = 0;
for (const auto& bundle_ptr : snapshot_repository.view_bundles()) {
const auto& bundle = *bundle_ptr;
const auto [body_count, txn_count] = count_bodies_in_one(settings, bundle.body_segment);
for (const auto& bundle_ptr : repository.view_bundles()) {
db::blocks::BundleDataRef bundle{**bundle_ptr};
const auto [body_count, txn_count] = count_bodies_in_one(settings, bundle.body_segment());
num_bodies += body_count;
num_txns += txn_count;
}
@@ -374,12 +373,11 @@ int count_headers_in_one(const SnapshotSubcommandSettings& settings, const Segme
}

int count_headers_in_all(const SnapshotSubcommandSettings& settings) {
auto snapshot_repository = make_repository(settings.settings);
snapshot_repository.reopen_folder();
auto repository = make_repository(settings.settings);
int num_headers{0};
for (const auto& bundle_ptr : snapshot_repository.view_bundles()) {
const auto& bundle = *bundle_ptr;
const auto header_count = count_headers_in_one(settings, bundle.header_segment);
for (const auto& bundle_ptr : repository.view_bundles()) {
db::blocks::BundleDataRef bundle{**bundle_ptr};
const auto header_count = count_headers_in_one(settings, bundle.header_segment());
num_headers += header_count;
}
return num_headers;
@@ -721,9 +719,8 @@ void lookup_header_by_hash(const SnapshotSubcommandSettings& settings) {

std::optional<SnapshotPath> matching_snapshot_path;
std::optional<BlockHeader> matching_header;
auto snapshot_repository = make_repository(settings.settings);
snapshot_repository.reopen_folder();
for (const auto& bundle_ptr : snapshot_repository.view_bundles_reverse()) {
auto repository = make_repository(settings.settings);
for (const auto& bundle_ptr : repository.view_bundles_reverse()) {
const auto& bundle = *bundle_ptr;
auto segment_and_index = bundle.segment_and_index(SnapshotType::headers);
const auto header = HeaderFindByHashQuery{segment_and_index}.exec(*hash);
@@ -751,9 +748,8 @@ void lookup_header_by_number(const SnapshotSubcommandSettings& settings) {
SILK_INFO << "Lookup header number: " << block_number;
std::chrono::time_point start{std::chrono::steady_clock::now()};

auto snapshot_repository = make_repository(settings.settings);
snapshot_repository.reopen_folder();
const auto [segment_and_index, _] = snapshot_repository.find_segment(SnapshotType::headers, block_number);
auto repository = make_repository(settings.settings);
const auto [segment_and_index, _] = repository.find_segment(SnapshotType::headers, block_number);
if (segment_and_index) {
const auto header = HeaderFindByBlockNumQuery{*segment_and_index}.exec(block_number);
ensure(header.has_value(),
@@ -811,11 +807,10 @@ void lookup_body_in_one(const SnapshotSubcommandSettings& settings, BlockNum blo
}

void lookup_body_in_all(const SnapshotSubcommandSettings& settings, BlockNum block_number) {
auto snapshot_repository = make_repository(settings.settings);
snapshot_repository.reopen_folder();
auto repository = make_repository(settings.settings);

std::chrono::time_point start{std::chrono::steady_clock::now()};
const auto [segment_and_index, _] = snapshot_repository.find_segment(SnapshotType::bodies, block_number);
const auto [segment_and_index, _] = repository.find_segment(SnapshotType::bodies, block_number);
if (segment_and_index) {
const auto body = BodyFindByBlockNumQuery{*segment_and_index}.exec(block_number);
ensure(body.has_value(),
@@ -918,12 +913,11 @@ void lookup_txn_by_hash_in_one(const SnapshotSubcommandSettings& settings, const
}

void lookup_txn_by_hash_in_all(const SnapshotSubcommandSettings& settings, const Hash& hash) {
auto snapshot_repository = make_repository(settings.settings);
snapshot_repository.reopen_folder();
auto repository = make_repository(settings.settings);

std::optional<SnapshotPath> matching_snapshot_path;
std::chrono::time_point start{std::chrono::steady_clock::now()};
for (const auto& bundle_ptr : snapshot_repository.view_bundles_reverse()) {
for (const auto& bundle_ptr : repository.view_bundles_reverse()) {
const auto& bundle = *bundle_ptr;
auto segment_and_index = bundle.segment_and_index(SnapshotType::transactions);
const auto transaction = TransactionFindByHashQuery{segment_and_index}.exec(hash);
@@ -983,12 +977,11 @@ void lookup_txn_by_id_in_one(const SnapshotSubcommandSettings& settings, uint64_
}

void lookup_txn_by_id_in_all(const SnapshotSubcommandSettings& settings, uint64_t txn_id) {
auto snapshot_repository = make_repository(settings.settings);
snapshot_repository.reopen_folder();
auto repository = make_repository(settings.settings);

std::optional<SnapshotPath> matching_snapshot_path;
std::chrono::time_point start{std::chrono::steady_clock::now()};
for (const auto& bundle_ptr : snapshot_repository.view_bundles_reverse()) {
for (const auto& bundle_ptr : repository.view_bundles_reverse()) {
const auto& bundle = *bundle_ptr;
auto segment_and_index = bundle.segment_and_index(SnapshotType::transactions);
const auto transaction = TransactionFindByIdQuery{segment_and_index}.exec(txn_id);
@@ -1029,10 +1022,9 @@ void lookup_transaction(const SnapshotSubcommandSettings& settings) {
}

void merge(const SnapshotSettings& settings) {
auto snapshot_repository = make_repository(settings);
snapshot_repository.reopen_folder();
auto repository = make_repository(settings);
TemporaryDirectory tmp_dir;
db::SnapshotMerger merger{snapshot_repository, tmp_dir.path()};
db::SnapshotMerger merger{repository, tmp_dir.path()};
test_util::TaskRunner runner;
runner.run(merger.exec());
}
2 changes: 1 addition & 1 deletion silkworm/capi/fork_validator.cpp
@@ -126,7 +126,7 @@ SILKWORM_EXPORT int silkworm_start_fork_validator(SilkwormHandle handle, MDBX_en

silkworm::db::EnvUnmanaged unmanaged_env{mdbx_env};
silkworm::db::RWAccess rw_access{unmanaged_env};
silkworm::db::DataStoreRef data_store{rw_access, *handle->snapshot_repository};
silkworm::db::DataStoreRef data_store{rw_access, *handle->repository};
silkworm::db::DataModelFactory data_model_factory{std::move(data_store)};

handle->execution_engine = std::make_unique<silkworm::stagedsync::ExecutionEngine>(
2 changes: 1 addition & 1 deletion silkworm/capi/instance.hpp
@@ -33,7 +33,7 @@ struct SilkwormInstance {
silkworm::concurrency::ContextPoolSettings context_pool_settings;
std::filesystem::path data_dir_path;
silkworm::NodeSettings node_settings;
std::unique_ptr<silkworm::snapshots::SnapshotRepository> snapshot_repository;
std::unique_ptr<silkworm::snapshots::SnapshotRepository> repository;
std::unique_ptr<silkworm::rpc::Daemon> rpcdaemon;
std::unique_ptr<silkworm::stagedsync::ExecutionEngine> execution_engine;

2 changes: 1 addition & 1 deletion silkworm/capi/rpcdaemon.cpp
@@ -103,7 +103,7 @@ SILKWORM_EXPORT int silkworm_start_rpcdaemon(SilkwormHandle handle, MDBX_env* en
auto daemon_settings = make_daemon_settings(handle, *settings);
db::DataStoreRef data_store{
db::RWAccess{db::EnvUnmanaged{env}},
*handle->snapshot_repository,
*handle->repository,
};

// Create the one-and-only Silkrpc daemon