Skip to content

Commit

Permalink
cmd: add command-line parameters in C API execute tool (#1381)
Browse files Browse the repository at this point in the history
cmd: open collect and pass all snapshots in execute tool
api, node: move unmanaged ROTxn/RWTxn implementations to access layer
node: access memory-mapped file address and size in snapshot segment and index
  • Loading branch information
canepat authored Jul 28, 2023
1 parent 384f17d commit e5807f0
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 91 deletions.
10 changes: 9 additions & 1 deletion cmd/api/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,12 @@ find_package(CLI11 REQUIRED)

add_executable(execute_cpp execute.cpp)
target_include_directories(execute_cpp PUBLIC ${CMAKE_SOURCE_DIR} "${SILKWORM_MAIN_DIR}/third_party/libmdbx")
target_link_libraries(execute_cpp PRIVATE Boost::headers Boost::filesystem Boost::system CLI11::CLI11 silkworm_node)
target_link_libraries(
execute_cpp
PRIVATE Boost::headers
Boost::filesystem
Boost::system
CLI11::CLI11
silkworm_node
cmd_common
)
215 changes: 151 additions & 64 deletions cmd/api/execute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
limitations under the License.
*/

#include <iostream>
#include <limits>
#include <stdexcept>

#include <CLI/CLI.hpp>
#include <boost/dll.hpp>
#include <boost/process/environment.hpp>
#include <magic_enum.hpp>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wimplicit-fallthrough"
#pragma GCC diagnostic ignored "-Wold-style-cast"
Expand All @@ -31,7 +31,16 @@

#include <silkworm/api/silkworm_api.h>
#include <silkworm/infra/common/directories.hpp>
#include <silkworm/infra/common/log.hpp>
#include <silkworm/node/db/access_layer.hpp>
#include <silkworm/node/db/mdbx.hpp>
#include <silkworm/node/snapshot/repository.hpp>

#include "../common/common.hpp"

using namespace silkworm;
using namespace silkworm::snapshot;
using namespace silkworm::cmd::common;

const char* kSilkwormApiLibPath = "../../silkworm/api/libsilkworm_api.dylib";
const char* kSilkwormInitSymbol = "silkworm_init";
Expand All @@ -52,87 +61,156 @@ using SilkwormExecuteBlocksSig =
//! Function signature for silkworm_fini C API
using SilkwormFiniSig = int(SilkwormHandle*);

constexpr const char* kHeader1{"/Users/tullio/Library/Silkworm/snapshots/v1-000000-000500-headers.seg"};
constexpr const char* kBody1{"/Users/tullio/Library/Silkworm/snapshots/v1-000000-000500-bodies.seg"};
constexpr const char* kTransaction1{"/Users/tullio/Library/Silkworm/snapshots/v1-000000-000500-transactions.seg"};
//! \brief Options of the block execution tool, filled in from the command line.
struct ExecuteSettings {
    //! Logging configuration (populated through add_logging_options)
    log::Settings log_settings;
    //! The start block number to execute (--from)
    BlockNum start_block{0};
    //! The maximum block number to execute (--to)
    BlockNum max_block{0};
    //! The block batch size to use (--batch_size)
    uint64_t batch_size = 1;
    //! Whether transaction receipts must be written or not (--write_receipts)
    bool write_receipts = false;
};

void parse_command_line(int argc, char* argv[], CLI::App& app, ExecuteSettings& settings) {
auto& log_settings = settings.log_settings;

add_logging_options(app, log_settings);

app.add_option("--from", settings.start_block, "The start block number to execute")
->capture_default_str();
app.add_option("--to", settings.max_block, "The maximum block number to execute")
->capture_default_str();
app.add_option("--batch_size", settings.batch_size, "The block batch size to use")
->capture_default_str();
app.add_flag("--write_receipts", settings.write_receipts, "Flag indicating if transaction receipts must be written or not")
->capture_default_str();

app.parse(argc, argv);
}

std::vector<SilkwormChainSnapshot> collect_all_snapshots(const SnapshotRepository& snapshot_repository) {
std::vector<SilkwormHeadersSnapshot> headers_snapshot_sequence;
std::vector<SilkwormBodiesSnapshot> bodies_snapshot_sequence;
std::vector<SilkwormTransactionsSnapshot> transactions_snapshot_sequence;

for (const auto& segment_file : snapshot_repository.get_segment_files()) {
switch (segment_file.type()) {
case SnapshotType::headers: {
const auto* header_snapshot{snapshot_repository.get_header_segment(segment_file)};
const auto* idx_header_hash{header_snapshot->idx_header_hash()};
SilkwormHeadersSnapshot raw_headers_snapshot{
.segment{
.file_path = segment_file.path().c_str(),
.memory_address = header_snapshot->memory_file_address(),
.memory_length = header_snapshot->memory_file_size()},
.header_hash_index{
.file_path = segment_file.index_file().path().c_str(),
.memory_address = idx_header_hash->memory_file_address(),
.memory_length = idx_header_hash->memory_file_size()}};
headers_snapshot_sequence.push_back(raw_headers_snapshot);
} break;
case SnapshotType::bodies: {
const auto* body_snapshot{snapshot_repository.get_body_segment(segment_file)};
const auto* idx_body_number{body_snapshot->idx_body_number()};
SilkwormBodiesSnapshot raw_bodies_snapshot{
.segment{
.file_path = segment_file.path().c_str(),
.memory_address = body_snapshot->memory_file_address(),
.memory_length = body_snapshot->memory_file_size()},
.block_num_index{
.file_path = segment_file.index_file().path().c_str(),
.memory_address = idx_body_number->memory_file_address(),
.memory_length = idx_body_number->memory_file_size()}};
bodies_snapshot_sequence.push_back(raw_bodies_snapshot);
} break;
case SnapshotType::transactions: {
const auto* tx_snapshot{snapshot_repository.get_tx_segment(segment_file)};
const auto* idx_txn_hash{tx_snapshot->idx_txn_hash()};
const auto* idx_txn_hash_2_block{tx_snapshot->idx_txn_hash_2_block()};
SilkwormTransactionsSnapshot raw_transactions_snapshot{
.segment{
.file_path = segment_file.path().c_str(),
.memory_address = tx_snapshot->memory_file_address(),
.memory_length = tx_snapshot->memory_file_size()},
.tx_hash_index{
.file_path = segment_file.index_file().path().c_str(),
.memory_address = idx_txn_hash->memory_file_address(),
.memory_length = idx_txn_hash->memory_file_size()},
.tx_hash_2_block_index{
.file_path = segment_file.index_file_for_type(SnapshotType::transactions2block).path().c_str(),
.memory_address = idx_txn_hash_2_block->memory_file_address(),
.memory_length = idx_txn_hash_2_block->memory_file_size()}};
transactions_snapshot_sequence.push_back(raw_transactions_snapshot);
} break;
default:
ensure(false, "unexpected snapshot type: " + std::string{magic_enum::enum_name(segment_file.type())});
}
}

ensure(headers_snapshot_sequence.size() == snapshot_repository.header_snapshots_count(), "invalid header snapshot count");
ensure(bodies_snapshot_sequence.size() == snapshot_repository.body_snapshots_count(), "invalid body snapshot count");
ensure(transactions_snapshot_sequence.size() == snapshot_repository.tx_snapshots_count(), "invalid tx snapshot count");

std::vector<SilkwormChainSnapshot> snapshot_sequence;
snapshot_sequence.reserve(headers_snapshot_sequence.size());
for (std::size_t i{0}; i < headers_snapshot_sequence.size(); ++i) {
SilkwormChainSnapshot chain_snapshot{
std::move(headers_snapshot_sequence[i]),
std::move(bodies_snapshot_sequence[i]),
std::move(transactions_snapshot_sequence[i])};
snapshot_sequence.push_back(chain_snapshot);
}
return snapshot_sequence;
}

int main(int /*argc*/, char* /*argv*/[]) {
int main(int argc, char* argv[]) {
CLI::App app{"Execute blocks"};

try {
ExecuteSettings settings;
parse_command_line(argc, argv, app, settings);

log::init(settings.log_settings);

const auto pid = boost::this_process::get_id();
std::cout << "Execute blocks starting [pid=" << std::to_string(pid) << "]\n";
// parse_command_line(argc, argv, app, settings);
SILK_INFO << "Execute blocks starting [pid=" << std::to_string(pid) << "]";

// Import the silkworm_init symbol from silkworm API library
// Import the silkworm_init symbol from Silkworm API library
const auto silkworm_init{
boost::dll::import_symbol<SilkwormInitSig>(kSilkwormApiLibPath, kSilkwormInitSymbol)};

// Import the silkworm_add_snapshot symbol from silkworm API library
// Import the silkworm_add_snapshot symbol from Silkworm API library
const auto silkworm_add_snapshot{
boost::dll::import_symbol<SilkwormAddSnapshotSig>(kSilkwormApiLibPath, kSilkwormAddSnapshotSymbol)};

// Import the silkworm_execute_blocks symbol from silkworm API library
// Import the silkworm_execute_blocks symbol from Silkworm API library
const auto silkworm_execute_blocks{
boost::dll::import_symbol<SilkwormExecuteBlocksSig>(kSilkwormApiLibPath, kSilkwormExecuteBlocksSymbol)};

// Import the silkworm_fini symbol from silkworm API library
// Import the silkworm_fini symbol from Silkworm API library
const auto silkworm_fini{
boost::dll::import_symbol<SilkwormFiniSig>(kSilkwormApiLibPath, kSilkwormFiniSymbol)};

// Initialize SilkwormAPI library
// Initialize Silkworm API library
SilkwormHandle* handle{nullptr};
const int init_status_code = silkworm_init(&handle);
if (init_status_code != SILKWORM_OK) {
std::cerr << "Execute blocks silkworm_init failed [code=" << std::to_string(init_status_code) << "]\n";
SILK_ERROR << "Execute blocks silkworm_init failed [code=" << std::to_string(init_status_code) << "]";
return init_status_code;
}

// Add snapshots to SilkwormAPI library
SilkwormChainSnapshot chain_snapshot{
.headers{
.segment{
.file_path = kHeader1,
.memory_address = 0,
.memory_length = 0,
},
.header_hash_index{
.file_path = "",
.memory_address = 0,
.memory_length = 0,
}},
.bodies{
.segment{
.file_path = kBody1,
.memory_address = 0,
.memory_length = 0,
},
.block_num_index{
.file_path = "",
.memory_address = 0,
.memory_length = 0,
}},
.transactions{
.segment{
.file_path = kTransaction1,
.memory_address = 0,
.memory_length = 0,
},
.tx_hash_index{
.file_path = "",
.memory_address = 0,
.memory_length = 0,
},
.tx_hash_2_block_index{
.file_path = "",
.memory_address = 0,
.memory_length = 0,
}}};
const int add_snapshot_status_code{silkworm_add_snapshot(handle, &chain_snapshot)};
if (add_snapshot_status_code != SILKWORM_OK) {
return init_status_code;
// Add snapshots to Silkworm API library
SnapshotRepository repository;
repository.reopen_folder();

auto all_chain_snapshots{collect_all_snapshots(repository)};
for (auto& chain_snapshot : all_chain_snapshots) {
const int add_snapshot_status_code{silkworm_add_snapshot(handle, &chain_snapshot)};
if (add_snapshot_status_code != SILKWORM_OK) {
SILK_ERROR << "Execute blocks silkworm_add_snapshot failed [code=" << std::to_string(add_snapshot_status_code) << "]";
return add_snapshot_status_code;
}
}

// Execute specified block range using Silkworm API library
silkworm::DataDirectory data_dir{};
silkworm::db::EnvConfig config{
.path = data_dir.chaindata().path().string(),
Expand All @@ -141,30 +219,39 @@ int main(int /*argc*/, char* /*argv*/[]) {
::mdbx::env_managed env{silkworm::db::open_env(config)};
::mdbx::txn_managed rw_txn{env.start_write()};

db::ROTxnUnmanaged ro_txn{rw_txn};
const auto chain_config{db::read_chain_config(ro_txn)};
ensure(chain_config.has_value(), "no chain configuration in database");
const auto chain_id{chain_config->chain_id};

const auto start_block{settings.start_block};
const auto max_block{settings.max_block};
const auto batch_size{settings.batch_size};
const auto write_receipts{settings.write_receipts};
uint64_t last_executed_block{std::numeric_limits<uint64_t>::max()};
int mdbx_error_code{0};
const int status_code{
silkworm_execute_blocks(handle, &*rw_txn, 1, 46147, 46147, 1, false, &last_executed_block, &mdbx_error_code)};
std::cout << "Execute blocks status code: " << std::to_string(status_code) << "\n";
silkworm_execute_blocks(handle, &*rw_txn, chain_id, start_block, max_block, batch_size, write_receipts, &last_executed_block, &mdbx_error_code)};
SILK_INFO << "Execute blocks status code: " << std::to_string(status_code);

// Finalize SilkwormAPI library
// Finalize Silkworm API library
const int fini_status_code = silkworm_fini(handle);
if (fini_status_code != SILKWORM_OK) {
std::cerr << "Execute blocks silkworm_fini failed [code=" << std::to_string(fini_status_code) << "]\n";
SILK_ERROR << "Execute blocks silkworm_fini failed [code=" << std::to_string(fini_status_code) << "]";
return fini_status_code;
}

rw_txn.abort(); // We do not want to commit anything now
rw_txn.abort(); // We do *not* want to commit anything

std::cout << "Execute blocks exiting [pid=" << std::to_string(pid) << "]\n";
SILK_INFO << "Execute blocks exiting [pid=" << std::to_string(pid) << "]";
return status_code;
} catch (const CLI::ParseError& pe) {
return app.exit(pe);
} catch (const std::exception& e) {
// SILK_CRIT << "Execute blocks exiting due to exception: " << e.what();
SILK_CRIT << "Execute blocks exiting due to exception: " << e.what();
return -2;
} catch (...) {
// SILK_CRIT << "Execute blocks exiting due to unexpected exception";
SILK_CRIT << "Execute blocks exiting due to unexpected exception";
return -3;
}
}
26 changes: 0 additions & 26 deletions silkworm/api/silkworm_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,6 @@
#include <silkworm/node/db/access_layer.hpp>
#include <silkworm/node/db/buffer.hpp>

// Wrappers giving ROTxn/RWTxn access to a transaction whose lifecycle is controlled by someone else.
namespace silkworm::db {

//! \brief ROTxnUnmanaged wraps an *unmanaged* read-only transaction, which means the underlying transaction
//! lifecycle is not touched by this class. This implies that this class does not abort the transaction.
class ROTxnUnmanaged : public ROTxn, protected ::mdbx::txn {
public:
// Builds the ::mdbx::txn base from the raw handle and hands ROTxn a reference to that base.
// NOTE(review): ROTxn is the first base, so it receives the reference before the ::mdbx::txn base is
// constructed; this presumably relies on ROTxn only storing the reference — confirm.
explicit ROTxnUnmanaged(MDBX_txn* ptr) : ROTxn{static_cast<::mdbx::txn&>(*this)}, ::mdbx::txn{ptr} {}
~ROTxnUnmanaged() override = default;

// Deliberately empty: aborting is left to whoever controls the underlying transaction
void abort() override {}
};

//! \brief RWTxnUnmanaged wraps an *unmanaged* read-write transaction, which means the underlying transaction
//! lifecycle is not touched by this class. This implies that this class does not commit nor abort the transaction.
class RWTxnUnmanaged : public RWTxn, protected ::mdbx::txn {
public:
// Builds the ::mdbx::txn base from the raw handle and hands RWTxn a reference to that base.
// NOTE(review): RWTxn is the first base, so it receives the reference before the ::mdbx::txn base is
// constructed; this presumably relies on RWTxn only storing the reference — confirm.
explicit RWTxnUnmanaged(MDBX_txn* ptr) : RWTxn{static_cast<::mdbx::txn&>(*this)}, ::mdbx::txn{ptr} {}
~RWTxnUnmanaged() override = default;

// Deliberately empty: abort and commit are left to whoever controls the underlying transaction
void abort() override {}
void commit_and_renew() override {}
void commit_and_stop() override {}
};

} // namespace silkworm::db

using namespace silkworm;

SILKWORM_EXPORT int silkworm_init(SilkwormHandle** handle) SILKWORM_NOEXCEPT {
Expand Down
22 changes: 22 additions & 0 deletions silkworm/node/db/mdbx.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,16 @@ class ROTxnManaged : public ROTxn {
mdbx::txn_managed managed_txn_;
};

//! \brief ROTxnUnmanaged wraps an *unmanaged* read-only transaction, which means the underlying transaction
//! lifecycle is not touched by this class. This implies that this class does not abort the transaction.
class ROTxnUnmanaged : public ROTxn, protected ::mdbx::txn {
public:
// Builds the ::mdbx::txn base from the raw handle and hands ROTxn a reference to that base.
// NOTE(review): ROTxn is the first base, so it receives the reference before the ::mdbx::txn base is
// constructed; this presumably relies on ROTxn only storing the reference — confirm.
explicit ROTxnUnmanaged(MDBX_txn* ptr) : ROTxn{static_cast<::mdbx::txn&>(*this)}, ::mdbx::txn{ptr} {}
~ROTxnUnmanaged() override = default;

// Deliberately empty: aborting is left to whoever controls the underlying transaction
void abort() override {}
};

//! \brief This class wraps a read-write transaction.
//! It is used in function signatures to clarify that read-write access is required.
//! It supports explicit disable/enable of commit capabilities.
Expand Down Expand Up @@ -306,6 +316,18 @@ class RWTxnManaged : public RWTxn {
mdbx::txn_managed managed_txn_;
};

//! \brief RWTxnUnmanaged wraps an *unmanaged* read-write transaction, which means the underlying transaction
//! lifecycle is not touched by this class. This implies that this class does not commit nor abort the transaction.
class RWTxnUnmanaged : public RWTxn, protected ::mdbx::txn {
public:
// Builds the ::mdbx::txn base from the raw handle and hands RWTxn a reference to that base.
// NOTE(review): RWTxn is the first base, so it receives the reference before the ::mdbx::txn base is
// constructed; this presumably relies on RWTxn only storing the reference — confirm.
explicit RWTxnUnmanaged(MDBX_txn* ptr) : RWTxn{static_cast<::mdbx::txn&>(*this)}, ::mdbx::txn{ptr} {}
~RWTxnUnmanaged() override = default;

// Deliberately empty: abort and commit are left to whoever controls the underlying transaction
void abort() override {}
void commit_and_renew() override {}
void commit_and_stop() override {}
};

//! \brief This class create ROTxn(s) on demand, it is used to enforce in some method signatures the type of db access
class ROAccess {
public:
Expand Down
2 changes: 2 additions & 0 deletions silkworm/node/huffman/decompressor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ class Decompressor {

//! Tell whether compressed_file_ currently converts to true
[[nodiscard]] bool is_open() const {
    return static_cast<bool>(compressed_file_);
}

//! Non-owning access to whatever compressed_file_.get() yields
[[nodiscard]] const MemoryMappedFile* memory_file() const {
    const MemoryMappedFile* mapped_file = compressed_file_.get();
    return mapped_file;
}

void open();

//! Read the data stream eagerly applying the specified function, expected read in sequential order
Expand Down
3 changes: 3 additions & 0 deletions silkworm/node/recsplit/rec_split.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,9 @@ class RecSplit {
return std::filesystem::last_write_time(index_path_);
}

//! Address reported by the encoded file, or nullptr when encoded_file_ is not set
const void* memory_file_address() const {
    if (!encoded_file_) {
        return nullptr;
    }
    return encoded_file_->address();
}
//! Length reported by the encoded file, or zero when encoded_file_ is not set
std::size_t memory_file_size() const {
    if (!encoded_file_) {
        return 0;
    }
    return encoded_file_->length();
}

private:
static inline std::size_t skip_bits(std::size_t m) { return memo[m] & 0xFFFF; }

Expand Down
Loading

0 comments on commit e5807f0

Please sign in to comment.