diff --git a/tddb/td/db/KeyValue.h b/tddb/td/db/KeyValue.h index 4e0d85384..4f30a272b 100644 --- a/tddb/td/db/KeyValue.h +++ b/tddb/td/db/KeyValue.h @@ -19,6 +19,7 @@ #pragma once #include "td/utils/Status.h" #include "td/utils/logging.h" +#include namespace td { class KeyValueReader { public: @@ -27,6 +28,9 @@ class KeyValueReader { virtual Result get(Slice key, std::string &value) = 0; virtual Result count(Slice prefix) = 0; + virtual Status for_each(std::function f) { + return Status::Error("for_each is not supported"); + } }; class PrefixedKeyValueReader : public KeyValueReader { diff --git a/tddb/td/db/RocksDb.cpp b/tddb/td/db/RocksDb.cpp index 91c5ca662..f8688c006 100644 --- a/tddb/td/db/RocksDb.cpp +++ b/tddb/td/db/RocksDb.cpp @@ -56,7 +56,7 @@ RocksDb::~RocksDb() { } RocksDb RocksDb::clone() const { - return RocksDb{db_, statistics_}; + return RocksDb{db_, options_}; } Result RocksDb::open(std::string path, RocksDbOptions options) { @@ -64,16 +64,16 @@ Result RocksDb::open(std::string path, RocksDbOptions options) { { rocksdb::Options db_options; - static auto cache = rocksdb::NewLRUCache(1 << 30); + static auto default_cache = rocksdb::NewLRUCache(1 << 30); + if (options.block_cache == nullptr) { + options.block_cache = default_cache; + } rocksdb::BlockBasedTableOptions table_options; - if (options.block_cache_size) { - table_options.block_cache = rocksdb::NewLRUCache(options.block_cache_size.value()); - } else { - table_options.block_cache = cache; - } + table_options.block_cache = options.block_cache; db_options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options)); + db_options.use_direct_reads = options.use_direct_reads; db_options.manual_wal_flush = true; db_options.create_if_missing = true; db_options.max_background_compactions = 4; @@ -94,7 +94,7 @@ Result RocksDb::open(std::string path, RocksDbOptions options) { // default column family delete handles[0]; } - return RocksDb(std::shared_ptr(db), std::move(options.statistics)); + return RocksDb(std::shared_ptr(db), std::move(options)); } std::shared_ptr RocksDb::create_statistics() { @@ -109,6 +109,10 @@ void RocksDb::reset_statistics(const std::shared_ptr statis statistics->Reset(); } +std::shared_ptr RocksDb::create_cache(size_t capacity) { + return rocksdb::NewLRUCache(capacity); +} + std::unique_ptr RocksDb::snapshot() { auto res = std::make_unique(clone()); res->begin_snapshot().ensure(); @@ -120,7 +124,6 @@ std::string RocksDb::stats() const { db_->GetProperty("rocksdb.stats", &out); //db_->GetProperty("rocksdb.cur-size-all-mem-tables", &out); return out; - return statistics_->ToString(); } Result RocksDb::get(Slice key, std::string &value) { @@ -187,6 +190,28 @@ Result RocksDb::count(Slice prefix) { return res; } +Status RocksDb::for_each(std::function f) { + rocksdb::ReadOptions options; + options.snapshot = snapshot_.get(); + std::unique_ptr iterator; + if (snapshot_ || !transaction_) { + iterator.reset(db_->NewIterator(options)); + } else { + iterator.reset(transaction_->GetIterator(options)); + } + + iterator->SeekToFirst(); + for (; iterator->Valid(); iterator->Next()) { + auto key = from_rocksdb(iterator->key()); + auto value = from_rocksdb(iterator->value()); + TRY_STATUS(f(key, value)); + } + if (!iterator->status().ok()) { + return from_rocksdb(iterator->status()); + } + return Status::OK(); +} + Status RocksDb::begin_write_batch() { CHECK(!transaction_); write_batch_ = std::make_unique(); @@ -243,7 +268,7 @@ Status RocksDb::end_snapshot() { return td::Status::OK(); } -RocksDb::RocksDb(std::shared_ptr db, std::shared_ptr statistics) - : db_(std::move(db)), statistics_(std::move(statistics)) { +RocksDb::RocksDb(std::shared_ptr db, RocksDbOptions options) + : db_(std::move(db)), options_(options) { } } // namespace td diff --git a/tddb/td/db/RocksDb.h b/tddb/td/db/RocksDb.h index babfadb73..5efcd0f48 100644 --- a/tddb/td/db/RocksDb.h +++ b/tddb/td/db/RocksDb.h @@ -27,6 +27,7 @@ #include "td/utils/optional.h" namespace rocksdb { +class Cache; class OptimisticTransactionDB; class Transaction; class WriteBatch; @@ -38,7 +39,8 @@ namespace td { struct RocksDbOptions { std::shared_ptr statistics = nullptr; - optional block_cache_size; // Default - one 1G cache for all RocksDb + std::shared_ptr block_cache; // Default - one 1G cache for all RocksDb + bool use_direct_reads = false; }; class RocksDb : public KeyValue { @@ -51,6 +53,7 @@ class RocksDb : public KeyValue { Status set(Slice key, Slice value) override; Status erase(Slice key) override; Result count(Slice prefix) override; + Status for_each(std::function f) override; Status begin_write_batch() override; Status commit_write_batch() override; @@ -71,6 +74,8 @@ class RocksDb : public KeyValue { static std::string statistics_to_string(const std::shared_ptr statistics); static void reset_statistics(const std::shared_ptr statistics); + static std::shared_ptr create_cache(size_t capacity); + RocksDb(RocksDb &&); RocksDb &operator=(RocksDb &&); ~RocksDb(); @@ -81,7 +86,7 @@ class RocksDb : public KeyValue { private: std::shared_ptr db_; - std::shared_ptr statistics_; + RocksDbOptions options_; std::unique_ptr transaction_; std::unique_ptr write_batch_; @@ -94,7 +99,6 @@ class RocksDb : public KeyValue { }; std::unique_ptr snapshot_; - explicit RocksDb(std::shared_ptr db, - std::shared_ptr statistics); + explicit RocksDb(std::shared_ptr db, RocksDbOptions options); }; } // namespace td diff --git a/validator-engine/validator-engine.cpp b/validator-engine/validator-engine.cpp index 0c423f7a9..06ca89b56 100644 --- a/validator-engine/validator-engine.cpp +++ b/validator-engine/validator-engine.cpp @@ -1372,6 +1372,8 @@ td::Status ValidatorEngine::load_global_config() { if (celldb_cache_size_) { validator_options_.write().set_celldb_cache_size(celldb_cache_size_.value()); } + validator_options_.write().set_celldb_direct_io(celldb_direct_io_); + validator_options_.write().set_celldb_preload_all(celldb_preload_all_); if (catchain_max_block_delay_) { validator_options_.write().set_catchain_max_block_delay(catchain_max_block_delay_.value()); } @@ -3984,6 +3986,14 @@ int main(int argc, char *argv[]) { acts.push_back([&x, v]() { td::actor::send_closure(x, &ValidatorEngine::set_celldb_cache_size, v); }); return td::Status::OK(); }); + p.add_option('\0', "celldb-direct-io", "enable direct I/O mode for RocksDb in CellDb", [&]() { + acts.push_back([&x]() { td::actor::send_closure(x, &ValidatorEngine::set_celldb_direct_io); }); + }); + p.add_option( + '\0', "celldb-preload-all", + "preload all cells from CellDb on startup (recommended to use with big enough celldb-cache-size and " + "celldb-direct-io)", + [&]() { acts.push_back([&x]() { td::actor::send_closure(x, &ValidatorEngine::set_celldb_preload_all); }); }); p.add_checked_option( '\0', "catchain-max-block-delay", "delay before creating a new catchain block, in seconds (default: 0.5)", [&](td::Slice s) -> td::Status { diff --git a/validator-engine/validator-engine.hpp b/validator-engine/validator-engine.hpp index 265bb9e40..94ced8ba3 100644 --- a/validator-engine/validator-engine.hpp +++ b/validator-engine/validator-engine.hpp @@ -210,6 +210,8 @@ class ValidatorEngine : public td::actor::Actor { bool disable_rocksdb_stats_ = false; bool nonfinal_ls_queries_enabled_ = false; td::optional celldb_cache_size_; + bool celldb_direct_io_ = false; + bool celldb_preload_all_ = false; td::optional catchain_max_block_delay_; bool read_config_ = false; bool started_keyring_ = false; @@ -286,6 +288,12 @@ class ValidatorEngine : public td::actor::Actor { void set_celldb_cache_size(td::uint64 value) { celldb_cache_size_ = value; } + void set_celldb_direct_io() { + celldb_direct_io_ = true; + } + void set_celldb_preload_all() { + celldb_preload_all_ = true; + } void set_catchain_max_block_delay(double value) { catchain_max_block_delay_ = value; } diff --git a/validator/db/celldb.cpp b/validator/db/celldb.cpp index bc315b599..dfbee0a1a 100644 --- a/validator/db/celldb.cpp +++ b/validator/db/celldb.cpp @@ -91,9 +91,10 @@ void CellDbIn::start_up() { td::RocksDbOptions db_options; db_options.statistics = statistics_; if (opts_->get_celldb_cache_size()) { - db_options.block_cache_size = opts_->get_celldb_cache_size().value(); - LOG(WARNING) << "Set CellDb block cache size to " << td::format::as_size(db_options.block_cache_size.value()); + db_options.block_cache = td::RocksDb::create_cache(opts_->get_celldb_cache_size().value()); + LOG(WARNING) << "Set CellDb block cache size to " << td::format::as_size(opts_->get_celldb_cache_size().value()); } + db_options.use_direct_reads = opts_->get_celldb_direct_io(); cell_db_ = std::make_shared(td::RocksDb::open(path_, std::move(db_options)).move_as_ok()); @@ -111,6 +112,27 @@ void CellDbIn::start_up() { set_block(empty, std::move(e)); cell_db_->commit_write_batch().ensure(); } + + if (opts_->get_celldb_preload_all()) { + // Iterate whole DB in a separate thread + delay_action([snapshot = cell_db_->snapshot()]() { + LOG(WARNING) << "CellDb: pre-loading all keys"; + td::uint64 total = 0; + td::Timer timer; + auto S = snapshot->for_each([&](td::Slice, td::Slice) { + ++total; + if (total % 1000000 == 0) { + LOG(INFO) << "CellDb: iterated " << total << " keys"; + } + return td::Status::OK(); + }); + if (S.is_error()) { + LOG(ERROR) << "CellDb: pre-load failed: " << S.move_as_error(); + } else { + LOG(WARNING) << "CellDb: iterated " << total << " keys in " << timer.elapsed() << "s"; + } + }, td::Timestamp::now()); + } } void CellDbIn::load_cell(RootHash hash, td::Promise> promise) { diff --git a/validator/validator-options.hpp b/validator/validator-options.hpp index 34f7fdf9f..593c75bf6 100644 --- a/validator/validator-options.hpp +++ b/validator/validator-options.hpp @@ -132,6 +132,12 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions { td::optional get_celldb_cache_size() const override { return celldb_cache_size_; } + bool get_celldb_direct_io() const override { + return celldb_direct_io_; + } + bool get_celldb_preload_all() const override { + return celldb_preload_all_; + } td::optional get_catchain_max_block_delay() const override { return catchain_max_block_delay_; } @@ -206,6 +212,12 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions { void set_celldb_cache_size(td::uint64 value) override { celldb_cache_size_ = value; } + void set_celldb_direct_io(bool value) override { + celldb_direct_io_ = value; + } + void set_celldb_preload_all(bool value) override { + celldb_preload_all_ = value; + } void set_catchain_max_block_delay(double value) override { catchain_max_block_delay_ = value; } @@ -257,6 +269,8 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions { bool disable_rocksdb_stats_; bool nonfinal_ls_queries_enabled_ = false; td::optional celldb_cache_size_; + bool celldb_direct_io_ = false; + bool celldb_preload_all_ = false; td::optional catchain_max_block_delay_; }; diff --git a/validator/validator.h b/validator/validator.h index 81dfe7a39..245acbd49 100644 --- a/validator/validator.h +++ b/validator/validator.h @@ -87,6 +87,8 @@ struct ValidatorManagerOptions : public td::CntObject { virtual bool get_disable_rocksdb_stats() const = 0; virtual bool nonfinal_ls_queries_enabled() const = 0; virtual td::optional get_celldb_cache_size() const = 0; + virtual bool get_celldb_direct_io() const = 0; + virtual bool get_celldb_preload_all() const = 0; virtual td::optional get_catchain_max_block_delay() const = 0; virtual void set_zero_block_id(BlockIdExt block_id) = 0; @@ -113,6 +115,8 @@ struct ValidatorManagerOptions : public td::CntObject { virtual void set_disable_rocksdb_stats(bool value) = 0; virtual void set_nonfinal_ls_queries_enabled(bool value) = 0; virtual void set_celldb_cache_size(td::uint64 value) = 0; + virtual void set_celldb_direct_io(bool value) = 0; + virtual void set_celldb_preload_all(bool value) = 0; virtual void set_catchain_max_block_delay(double value) = 0; static td::Ref create(