Skip to content

Commit

Permalink
node: refactor database ROTxn/RWTxn hierarchy (#1327)
Browse files Browse the repository at this point in the history
  • Loading branch information
canepat authored Jul 13, 2023
1 parent 720ee34 commit 3b4a35c
Show file tree
Hide file tree
Showing 33 changed files with 137 additions and 102 deletions.
2 changes: 1 addition & 1 deletion cmd/common/db_checklist.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ void run_db_checklist(NodeSettings& node_settings, bool init_if_empty) {
// Open chaindata environment and check tables are consistent
log::Message("Opening database", {"path", node_settings.data_directory->chaindata().path().string()});
auto chaindata_env{db::open_env(node_settings.chaindata_env_config)};
db::RWTxn tx(chaindata_env);
db::RWTxnManaged tx(chaindata_env);

// Ensures all tables are present
db::table::check_or_create_chaindata_tables(tx);
Expand Down
2 changes: 1 addition & 1 deletion cmd/dev/backend_kv_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ int main(int argc, char* argv[]) {
SILK_INFO << "BackEndKvServer MDBX max readers: " << database_env.max_readers();

// Read chain config from database (this allows for custom config)
db::ROTxn ro_txn{database_env};
db::ROTxnManaged ro_txn{database_env};
node_settings.chain_config = db::read_chain_config(ro_txn);
if (!node_settings.chain_config.has_value()) {
throw std::runtime_error("invalid chain config in database");
Expand Down
2 changes: 1 addition & 1 deletion cmd/dev/check_changes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ int main(int argc, char* argv[]) {
data_dir.deploy();
db::EnvConfig db_config{data_dir.chaindata().path().string()};
auto env{db::open_env(db_config)};
db::RWTxn txn{env};
db::RWTxnManaged txn{env};
auto chain_config{db::read_chain_config(txn)};
if (!chain_config) {
throw std::runtime_error("Unable to retrieve chain config");
Expand Down
2 changes: 1 addition & 1 deletion cmd/dev/check_pow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ int main(int argc, char* argv[]) {
// Set database parameters
db::EnvConfig db_config{options.datadir};
auto env{db::open_env(db_config)};
db::ROTxn txn{env};
db::ROTxnManaged txn{env};

auto config{db::read_chain_config(txn)};
if (!config.has_value()) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/dev/check_senders.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ int main(int argc, char* argv[]) {
etl::Collector collector(data_dir.etl().path().string().c_str(), /* flush size */ 512 * kMebi);

auto env{db::open_env(db_config)};
db::ROTxn txn{env};
db::ROTxnManaged txn{env};

auto canonical_hashes_cursor = txn.ro_cursor(db::table::kCanonicalHashes);
auto bodies_cursor = txn.ro_cursor(db::table::kBlockBodies);
Expand Down
2 changes: 1 addition & 1 deletion cmd/dev/scan_txs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ int main(int argc, char* argv[]) {
data_dir.deploy();
db::EnvConfig db_config{data_dir.chaindata().path().string()};
auto env{db::open_env(db_config)};
db::RWTxn txn{env};
db::RWTxnManaged txn{env};
auto chain_config{db::read_chain_config(txn)};
if (!chain_config) {
throw std::runtime_error("Unable to retrieve chain config");
Expand Down
18 changes: 9 additions & 9 deletions cmd/dev/toolbox.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ void do_stage_set(db::EnvConfig& config, std::string&& stage_name, uint32_t new_
}

auto env{silkworm::db::open_env(config)};
db::RWTxn txn{env};
db::RWTxnManaged txn{env};
if (!db::stages::is_known_stage(stage_name.c_str())) {
throw std::runtime_error("Stage name " + stage_name + " is not known");
}
Expand Down Expand Up @@ -551,7 +551,7 @@ void do_freelist(db::EnvConfig& config, bool detail) {

void do_schema(db::EnvConfig& config) {
auto env{silkworm::db::open_env(config)};
db::ROTxn txn{env};
db::ROTxnManaged txn{env};

auto schema_version{db::read_schema_version(txn)};
if (!schema_version.has_value()) {
Expand Down Expand Up @@ -845,7 +845,7 @@ void do_init_genesis(DataDirectory& data_dir, const std::string&& json_file, uin
// Prime database
db::EnvConfig config{data_dir.chaindata().path().string(), /*create*/ true};
auto env{db::open_env(config)};
db::RWTxn txn{env};
db::RWTxnManaged txn{env};
db::table::check_or_create_chaindata_tables(txn);
db::initialize_genesis(txn, genesis_json, /*allow_exceptions=*/true);

Expand All @@ -863,7 +863,7 @@ void do_init_genesis(DataDirectory& data_dir, const std::string&& json_file, uin

void do_chainconfig(db::EnvConfig& config) {
auto env{silkworm::db::open_env(config)};
db::ROTxn txn{env};
db::ROTxnManaged txn{env};
auto chain_config{db::read_chain_config(txn)};
if (!chain_config.has_value()) {
throw std::runtime_error("Not an initialized Silkworm db or unknown/custom chain ");
Expand All @@ -882,7 +882,7 @@ void do_first_byte_analysis(db::EnvConfig& config) {
}

auto env{silkworm::db::open_env(config)};
db::ROTxn txn{env};
db::ROTxnManaged txn{env};

std::cout << "\n"
<< (boost::format(fmt_hdr) % "Table name" % "%") << "\n"
Expand Down Expand Up @@ -945,7 +945,7 @@ void do_extract_headers(db::EnvConfig& config, const std::string& file_name, uin
}

auto env{silkworm::db::open_env(config)};
db::ROTxn txn{env};
db::ROTxnManaged txn{env};

// We can store all header hashes into a single byte array given all
// hashes are same in length. By consequence we only need to assert
Expand Down Expand Up @@ -1401,7 +1401,7 @@ void do_trie_reset(db::EnvConfig& config, bool always_yes) {
}

auto env{silkworm::db::open_env(config)};
db::RWTxn txn{env};
db::RWTxnManaged txn{env};
log::Info("Clearing ...", {"table", db::table::kTrieOfAccounts.name});
txn->clear_map(db::table::kTrieOfAccounts.name);
log::Info("Clearing ...", {"table", db::table::kTrieOfStorage.name});
Expand All @@ -1420,7 +1420,7 @@ void do_trie_root(db::EnvConfig& config) {
}

auto env{silkworm::db::open_env(config)};
db::ROTxn txn{env};
db::ROTxnManaged txn{env};
db::PooledCursor trie_accounts(txn, db::table::kTrieOfAccounts);
std::string source{db::table::kTrieOfAccounts.name};

Expand Down Expand Up @@ -1473,7 +1473,7 @@ void do_reset_to_download(db::EnvConfig& config, bool keep_senders) {
log::Info() << "Ok boss ... you say it. Please be patient...";

auto env{silkworm::db::open_env(config)};
db::RWTxn txn(env);
db::RWTxnManaged txn(env);

StopWatch sw(/*auto_start=*/true);
// Void finish stage
Expand Down
4 changes: 2 additions & 2 deletions silkworm/node/backend/remote/grpc/kv_calls.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ awaitable<void> TxCall::operator()(const EthereumBackEnd& backend) {
grpc::Status status{grpc::Status::OK};
try {
// Create a new read-only transaction.
read_only_txn_ = db::ROTxn{*chaindata_env};
read_only_txn_ = db::ROTxnManaged{*chaindata_env};
SILK_DEBUG << "TxCall peer: " << peer() << " started tx: " << read_only_txn_->id();

// Send an unsolicited message containing the transaction ID.
Expand Down Expand Up @@ -336,7 +336,7 @@ void TxCall::handle_max_ttl_timer_expired(const EthereumBackEnd& backend) {

// Close and reopen to avoid long-lived transactions (resource-consuming for MDBX)
read_only_txn_.abort();
read_only_txn_ = db::ROTxn{*chaindata_env};
read_only_txn_ = db::ROTxnManaged{*chaindata_env};

// Restore the whole state of the transaction (i.e. all cursor positions)
const bool restore_success = restore_cursors(positions);
Expand Down
2 changes: 1 addition & 1 deletion silkworm/node/backend/remote/grpc/kv_calls.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class TxCall : public server::BidiStreamingCall<remote::Cursor, remote::Pair> {

static std::chrono::milliseconds max_ttl_duration_;

db::ROTxn read_only_txn_;
db::ROTxnManaged read_only_txn_;
std::map<uint32_t, TxCursor> cursors_;
uint32_t last_cursor_id_{0};
};
Expand Down
2 changes: 1 addition & 1 deletion silkworm/node/db/db_utils_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace db {

auto db = db::open_env(db_config);
db::RWAccess rw_access(db);
db::RWTxn tx = rw_access.start_rw_tx();
db::RWTxnManaged tx = rw_access.start_rw_tx();

db::table::check_or_create_chaindata_tables(tx);

Expand Down
14 changes: 7 additions & 7 deletions silkworm/node/db/mdbx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,31 +225,31 @@ size_t max_value_size_for_leaf_page(const mdbx::txn& txn, const size_t key_size)
return max_value_size_for_leaf_page(page_size, key_size);
}

std::unique_ptr<ROCursor> ROTxn::ro_cursor(const MapConfig& config) {
std::unique_ptr<ROCursor> ROTxnManaged::ro_cursor(const MapConfig& config) {
return std::make_unique<PooledCursor>(*this, config);
}

std::unique_ptr<ROCursorDupSort> ROTxn::ro_cursor_dup_sort(const MapConfig& config) {
std::unique_ptr<ROCursorDupSort> ROTxnManaged::ro_cursor_dup_sort(const MapConfig& config) {
return std::make_unique<PooledCursor>(*this, config);
}

std::unique_ptr<RWCursor> RWTxn::rw_cursor(const MapConfig& config) {
std::unique_ptr<RWCursor> RWTxnManaged::rw_cursor(const MapConfig& config) {
return std::make_unique<PooledCursor>(*this, config);
}

std::unique_ptr<RWCursorDupSort> RWTxn::rw_cursor_dup_sort(const MapConfig& config) {
std::unique_ptr<RWCursorDupSort> RWTxnManaged::rw_cursor_dup_sort(const MapConfig& config) {
return std::make_unique<PooledCursor>(*this, config);
}

void RWTxn::commit_and_renew() {
void RWTxnManaged::commit_and_renew() {
if (!commit_disabled_) {
mdbx::env env = db();
mdbx::env env = ROTxnManaged::db();
managed_txn_.commit();
managed_txn_ = env.start_write(); // renew transaction
}
}

void RWTxn::commit_and_stop() {
void RWTxnManaged::commit_and_stop() {
if (!commit_disabled_) {
managed_txn_.commit();
}
Expand Down
113 changes: 75 additions & 38 deletions silkworm/node/db/mdbx.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,42 +191,63 @@ struct MapConfig {
const ::mdbx::value_mode value_mode{::mdbx::value_mode::single}; // Data Storage Mode
};

//! \brief This class wraps a read-only transaction.
//! \brief ROTxn represents a read-only transaction.
//! It is used in function signatures to clarify that read-only access is sufficient, read-write access is not required.
class ROTxn {
public:
explicit ROTxn() = default;
explicit ROTxn(mdbx::env& env) : managed_txn_{env.start_read()} {}
explicit ROTxn(mdbx::env&& env) : managed_txn_{env.start_read()} {}
virtual ~ROTxn() = default;

// Access to the underling raw mdbx transaction
virtual mdbx::txn& operator*() = 0;
virtual mdbx::txn* operator->() = 0;
virtual operator mdbx::txn&() = 0; // NOLINT(google-explicit-constructor)

[[nodiscard]] virtual uint64_t id() const = 0;
[[nodiscard]] virtual bool is_open() const = 0;
[[nodiscard]] virtual mdbx::env db() const = 0;

virtual std::unique_ptr<ROCursor> ro_cursor(const MapConfig& config) = 0;
virtual std::unique_ptr<ROCursorDupSort> ro_cursor_dup_sort(const MapConfig& config) = 0;

virtual void abort() = 0;
};

//! \brief ROTxnManaged wraps a *managed* read-only transaction, which means the underlying transaction lifecycle
//! is entirely managed by this class.
class ROTxnManaged : public virtual ROTxn {
public:
explicit ROTxnManaged() = default;
explicit ROTxnManaged(mdbx::env& env) : managed_txn_{env.start_read()} {}
explicit ROTxnManaged(mdbx::env&& env) : managed_txn_{env.start_read()} {}
~ROTxnManaged() override = default;

// Not copyable
ROTxn(const ROTxn&) = delete;
ROTxn& operator=(const ROTxn&) = delete;
ROTxnManaged(const ROTxnManaged&) = delete;
ROTxnManaged& operator=(const ROTxnManaged&) = delete;

// Only movable
ROTxn(ROTxn&& source) noexcept : managed_txn_(std::move(source.managed_txn_)) {}
ROTxn& operator=(ROTxn&& other) noexcept {
ROTxnManaged(ROTxnManaged&& source) noexcept : managed_txn_(std::move(source.managed_txn_)) {}
ROTxnManaged& operator=(ROTxnManaged&& other) noexcept {
managed_txn_ = std::move(other.managed_txn_);
return *this;
}

// Access to the underling raw mdbx transaction
mdbx::txn& operator*() { return managed_txn_; }
mdbx::txn* operator->() { return &managed_txn_; }
operator mdbx::txn&() { return managed_txn_; } // NOLINT(google-explicit-constructor)
mdbx::txn& operator*() override { return managed_txn_; }
mdbx::txn* operator->() override { return &managed_txn_; }
operator mdbx::txn&() override { return managed_txn_; } // NOLINT(google-explicit-constructor)

[[nodiscard]] uint64_t id() const { return managed_txn_.id(); }
[[nodiscard]] bool is_open() const { return managed_txn_.txn::operator bool(); }
[[nodiscard]] mdbx::env db() const { return managed_txn_.env(); }
[[nodiscard]] uint64_t id() const override { return managed_txn_.id(); }
[[nodiscard]] bool is_open() const override { return managed_txn_.txn::operator bool(); }
[[nodiscard]] mdbx::env db() const override { return managed_txn_.env(); }

virtual std::unique_ptr<ROCursor> ro_cursor(const MapConfig& config);
virtual std::unique_ptr<ROCursorDupSort> ro_cursor_dup_sort(const MapConfig& config);
std::unique_ptr<ROCursor> ro_cursor(const MapConfig& config) override;
std::unique_ptr<ROCursorDupSort> ro_cursor_dup_sort(const MapConfig& config) override;

void abort() { managed_txn_.abort(); }
void abort() override { managed_txn_.abort(); }

protected:
explicit ROTxn(mdbx::txn_managed&& source) : managed_txn_{std::move(source)} {}
explicit ROTxnManaged(mdbx::txn_managed&& source) : managed_txn_{std::move(source)} {}

mdbx::txn_managed managed_txn_;
};
Expand All @@ -235,38 +256,55 @@ class ROTxn {
//! It is used in function signatures to clarify that read-write access is required.
//! It supports explicit disable/enable of commit capabilities.
//! Disabling commit is useful for running several stages on a handful of blocks atomically.
class RWTxn : public ROTxn {
class RWTxn : public virtual ROTxn {
public:
explicit RWTxn() = default;
// This variant creates new mdbx transactions as need be.
explicit RWTxn(mdbx::env& env) : ROTxn{env.start_write()} {}
~RWTxn() override = default;

virtual void disable_commit() = 0;
virtual void enable_commit() = 0;

virtual std::unique_ptr<RWCursor> rw_cursor(const MapConfig& config) = 0;
virtual std::unique_ptr<RWCursorDupSort> rw_cursor_dup_sort(const MapConfig& config) = 0;

virtual void commit_and_renew() = 0;
virtual void commit_and_stop() = 0;
};

//! \brief RWTxnManaged wraps a *managed* read-write transaction, which means the underlying transaction lifecycle
//! is entirely managed by this class.
class RWTxnManaged : public RWTxn, public ROTxnManaged {
public:
explicit RWTxnManaged() = default;
// This variant creates new mdbx transactions as need be.
explicit RWTxnManaged(mdbx::env& env) : ROTxnManaged{env.start_write()} {}
~RWTxnManaged() override = default;

// Not copyable
RWTxn(const RWTxn&) = delete;
RWTxn& operator=(const RWTxn&) = delete;
RWTxnManaged(const RWTxnManaged&) = delete;
RWTxnManaged& operator=(const RWTxnManaged&) = delete;

// Only movable
RWTxn(RWTxn&& source) noexcept : ROTxn(std::move(source)), commit_disabled_{source.commit_disabled_} {}
RWTxn& operator=(RWTxn&& other) noexcept {
RWTxnManaged(RWTxnManaged&& source) noexcept
: ROTxnManaged(std::move(source)), commit_disabled_{source.commit_disabled_} {}
RWTxnManaged& operator=(RWTxnManaged&& other) noexcept {
commit_disabled_ = other.commit_disabled_;
ROTxn::operator=(std::move(other));
ROTxnManaged::operator=(std::move(other));
return *this;
}

void disable_commit() { commit_disabled_ = true; }
void enable_commit() { commit_disabled_ = false; }
void disable_commit() override { commit_disabled_ = true; }
void enable_commit() override { commit_disabled_ = false; }

virtual std::unique_ptr<RWCursor> rw_cursor(const MapConfig& config);
virtual std::unique_ptr<RWCursorDupSort> rw_cursor_dup_sort(const MapConfig& config);
std::unique_ptr<RWCursor> rw_cursor(const MapConfig& config) override;
std::unique_ptr<RWCursorDupSort> rw_cursor_dup_sort(const MapConfig& config) override;

void commit_and_renew();
void commit_and_stop();
void commit_and_renew() override;
void commit_and_stop() override;

void reopen(mdbx::env& env) { managed_txn_ = env.start_write(); }

protected:
explicit RWTxn(mdbx::txn_managed&& source) : ROTxn{std::move(source)} {}
explicit RWTxnManaged(mdbx::txn_managed&& source) : ROTxnManaged{std::move(source)} {}

bool commit_disabled_{false};
};
Expand All @@ -277,7 +315,7 @@ class ROAccess {
explicit ROAccess(mdbx::env& env) : env_{env} {}
ROAccess(const ROAccess& copy) = default;

ROTxn start_ro_tx() { return ROTxn(env_); }
ROTxnManaged start_ro_tx() { return ROTxnManaged(env_); }

mdbx::env& operator*() { return env_; }

Expand All @@ -291,7 +329,7 @@ class RWAccess : public ROAccess {
explicit RWAccess(mdbx::env& env) : ROAccess{env} {}
RWAccess(const RWAccess& copy) = default;

RWTxn start_rw_tx() { return RWTxn(env_); }
RWTxnManaged start_rw_tx() { return RWTxnManaged(env_); }
};

//! \brief Reference to a processing function invoked by cursor_for_each & cursor_for_count on each record
Expand Down Expand Up @@ -344,13 +382,12 @@ size_t max_value_size_for_leaf_page(const ::mdbx::txn& txn, size_t key_size);

//! \brief Managed cursor class to access cursor API
//! \remarks Unlike ::mdbx::cursor_managed this class withdraws and deposits allocated MDBX_cursor handles in a
//! thread_local pool for reuse. This helps avoiding multiple mallocs on cursor creation.
//! thread-local pool for reuse. This helps avoiding multiple malloc on cursor creation.
class PooledCursor : public RWCursorDupSort, protected ::mdbx::cursor {
public:
explicit PooledCursor();
explicit PooledCursor(RWTxn& txn, ::mdbx::map_handle map);
explicit PooledCursor(::mdbx::txn& txn, const MapConfig& config);
explicit PooledCursor(RWTxn& txn, const MapConfig& config) : PooledCursor(*txn, config) {}
~PooledCursor() override;

PooledCursor(PooledCursor&& other) noexcept;
Expand Down
Loading

0 comments on commit 3b4a35c

Please sign in to comment.