Skip to content

Commit

Permalink
Add --celldb-direct-io and --celldb-preload-all
Browse files Browse the repository at this point in the history
  • Loading branch information
SpyCheese committed May 13, 2024
1 parent 816dd9c commit c58c331
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 17 deletions.
4 changes: 4 additions & 0 deletions tddb/td/db/KeyValue.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#pragma once
#include "td/utils/Status.h"
#include "td/utils/logging.h"
#include <functional>
namespace td {
class KeyValueReader {
public:
Expand All @@ -27,6 +28,9 @@ class KeyValueReader {

virtual Result<GetStatus> get(Slice key, std::string &value) = 0;
virtual Result<size_t> count(Slice prefix) = 0;
virtual Status for_each(std::function<Status(Slice, Slice)> f) {
return Status::Error("for_each is not supported");
}
};

class PrefixedKeyValueReader : public KeyValueReader {
Expand Down
47 changes: 36 additions & 11 deletions tddb/td/db/RocksDb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,24 +56,24 @@ RocksDb::~RocksDb() {
}

RocksDb RocksDb::clone() const {
return RocksDb{db_, statistics_};
return RocksDb{db_, options_};
}

Result<RocksDb> RocksDb::open(std::string path, RocksDbOptions options) {
rocksdb::OptimisticTransactionDB *db;
{
rocksdb::Options db_options;

static auto cache = rocksdb::NewLRUCache(1 << 30);
static auto default_cache = rocksdb::NewLRUCache(1 << 30);
if (options.block_cache == nullptr) {
options.block_cache = default_cache;
}

rocksdb::BlockBasedTableOptions table_options;
if (options.block_cache_size) {
table_options.block_cache = rocksdb::NewLRUCache(options.block_cache_size.value());
} else {
table_options.block_cache = cache;
}
table_options.block_cache = options.block_cache;
db_options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));

db_options.use_direct_reads = options.use_direct_reads;
db_options.manual_wal_flush = true;
db_options.create_if_missing = true;
db_options.max_background_compactions = 4;
Expand All @@ -94,7 +94,7 @@ Result<RocksDb> RocksDb::open(std::string path, RocksDbOptions options) {
// default column family
delete handles[0];
}
return RocksDb(std::shared_ptr<rocksdb::OptimisticTransactionDB>(db), std::move(options.statistics));
return RocksDb(std::shared_ptr<rocksdb::OptimisticTransactionDB>(db), std::move(options));
}

std::shared_ptr<rocksdb::Statistics> RocksDb::create_statistics() {
Expand All @@ -109,6 +109,10 @@ void RocksDb::reset_statistics(const std::shared_ptr<rocksdb::Statistics> statis
statistics->Reset();
}

std::shared_ptr<rocksdb::Cache> RocksDb::create_cache(size_t capacity) {
return rocksdb::NewLRUCache(capacity);
}

std::unique_ptr<KeyValueReader> RocksDb::snapshot() {
auto res = std::make_unique<RocksDb>(clone());
res->begin_snapshot().ensure();
Expand All @@ -120,7 +124,6 @@ std::string RocksDb::stats() const {
db_->GetProperty("rocksdb.stats", &out);
//db_->GetProperty("rocksdb.cur-size-all-mem-tables", &out);
return out;
return statistics_->ToString();
}

Result<RocksDb::GetStatus> RocksDb::get(Slice key, std::string &value) {
Expand Down Expand Up @@ -187,6 +190,28 @@ Result<size_t> RocksDb::count(Slice prefix) {
return res;
}

Status RocksDb::for_each(std::function<Status(Slice, Slice)> f) {
rocksdb::ReadOptions options;
options.snapshot = snapshot_.get();
std::unique_ptr<rocksdb::Iterator> iterator;
if (snapshot_ || !transaction_) {
iterator.reset(db_->NewIterator(options));
} else {
iterator.reset(transaction_->GetIterator(options));
}

iterator->SeekToFirst();
for (; iterator->Valid(); iterator->Next()) {
auto key = from_rocksdb(iterator->key());
auto value = from_rocksdb(iterator->value());
TRY_STATUS(f(key, value));
}
if (!iterator->status().ok()) {
return from_rocksdb(iterator->status());
}
return Status::OK();
}

Status RocksDb::begin_write_batch() {
CHECK(!transaction_);
write_batch_ = std::make_unique<rocksdb::WriteBatch>();
Expand Down Expand Up @@ -243,7 +268,7 @@ Status RocksDb::end_snapshot() {
return td::Status::OK();
}

RocksDb::RocksDb(std::shared_ptr<rocksdb::OptimisticTransactionDB> db, std::shared_ptr<rocksdb::Statistics> statistics)
: db_(std::move(db)), statistics_(std::move(statistics)) {
RocksDb::RocksDb(std::shared_ptr<rocksdb::OptimisticTransactionDB> db, RocksDbOptions options)
: db_(std::move(db)), options_(options) {
}
} // namespace td
12 changes: 8 additions & 4 deletions tddb/td/db/RocksDb.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "td/utils/optional.h"

namespace rocksdb {
class Cache;
class OptimisticTransactionDB;
class Transaction;
class WriteBatch;
Expand All @@ -38,7 +39,8 @@ namespace td {

struct RocksDbOptions {
std::shared_ptr<rocksdb::Statistics> statistics = nullptr;
optional<uint64> block_cache_size; // Default - one 1G cache for all RocksDb
std::shared_ptr<rocksdb::Cache> block_cache; // Default - one 1G cache for all RocksDb
bool use_direct_reads = false;
};

class RocksDb : public KeyValue {
Expand All @@ -51,6 +53,7 @@ class RocksDb : public KeyValue {
Status set(Slice key, Slice value) override;
Status erase(Slice key) override;
Result<size_t> count(Slice prefix) override;
Status for_each(std::function<Status(Slice, Slice)> f) override;

Status begin_write_batch() override;
Status commit_write_batch() override;
Expand All @@ -71,6 +74,8 @@ class RocksDb : public KeyValue {
static std::string statistics_to_string(const std::shared_ptr<rocksdb::Statistics> statistics);
static void reset_statistics(const std::shared_ptr<rocksdb::Statistics> statistics);

static std::shared_ptr<rocksdb::Cache> create_cache(size_t capacity);

RocksDb(RocksDb &&);
RocksDb &operator=(RocksDb &&);
~RocksDb();
Expand All @@ -81,7 +86,7 @@ class RocksDb : public KeyValue {

private:
std::shared_ptr<rocksdb::OptimisticTransactionDB> db_;
std::shared_ptr<rocksdb::Statistics> statistics_;
RocksDbOptions options_;

std::unique_ptr<rocksdb::Transaction> transaction_;
std::unique_ptr<rocksdb::WriteBatch> write_batch_;
Expand All @@ -94,7 +99,6 @@ class RocksDb : public KeyValue {
};
std::unique_ptr<const rocksdb::Snapshot, UnreachableDeleter> snapshot_;

explicit RocksDb(std::shared_ptr<rocksdb::OptimisticTransactionDB> db,
std::shared_ptr<rocksdb::Statistics> statistics);
explicit RocksDb(std::shared_ptr<rocksdb::OptimisticTransactionDB> db, RocksDbOptions options);
};
} // namespace td
10 changes: 10 additions & 0 deletions validator-engine/validator-engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1372,6 +1372,8 @@ td::Status ValidatorEngine::load_global_config() {
if (celldb_cache_size_) {
validator_options_.write().set_celldb_cache_size(celldb_cache_size_.value());
}
validator_options_.write().set_celldb_direct_io(celldb_direct_io_);
validator_options_.write().set_celldb_preload_all(celldb_preload_all_);
if (catchain_max_block_delay_) {
validator_options_.write().set_catchain_max_block_delay(catchain_max_block_delay_.value());
}
Expand Down Expand Up @@ -3984,6 +3986,14 @@ int main(int argc, char *argv[]) {
acts.push_back([&x, v]() { td::actor::send_closure(x, &ValidatorEngine::set_celldb_cache_size, v); });
return td::Status::OK();
});
p.add_option('\0', "celldb-direct-io", "enable direct I/O mode for RocksDb in CellDb", [&]() {
acts.push_back([&x]() { td::actor::send_closure(x, &ValidatorEngine::set_celldb_direct_io); });
});
p.add_option(
'\0', "celldb-preload-all",
"preload all cells from CellDb on startup (recommended to use with big enough celldb-cache-size and "
"celldb-direct-io)",
[&]() { acts.push_back([&x]() { td::actor::send_closure(x, &ValidatorEngine::set_celldb_preload_all); }); });
p.add_checked_option(
'\0', "catchain-max-block-delay", "delay before creating a new catchain block, in seconds (default: 0.5)",
[&](td::Slice s) -> td::Status {
Expand Down
8 changes: 8 additions & 0 deletions validator-engine/validator-engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ class ValidatorEngine : public td::actor::Actor {
bool disable_rocksdb_stats_ = false;
bool nonfinal_ls_queries_enabled_ = false;
td::optional<td::uint64> celldb_cache_size_;
bool celldb_direct_io_ = false;
bool celldb_preload_all_ = false;
td::optional<double> catchain_max_block_delay_;
bool read_config_ = false;
bool started_keyring_ = false;
Expand Down Expand Up @@ -286,6 +288,12 @@ class ValidatorEngine : public td::actor::Actor {
void set_celldb_cache_size(td::uint64 value) {
celldb_cache_size_ = value;
}
void set_celldb_direct_io() {
celldb_direct_io_ = true;
}
void set_celldb_preload_all() {
celldb_preload_all_ = true;
}
void set_catchain_max_block_delay(double value) {
catchain_max_block_delay_ = value;
}
Expand Down
26 changes: 24 additions & 2 deletions validator/db/celldb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,10 @@ void CellDbIn::start_up() {
td::RocksDbOptions db_options;
db_options.statistics = statistics_;
if (opts_->get_celldb_cache_size()) {
db_options.block_cache_size = opts_->get_celldb_cache_size().value();
LOG(WARNING) << "Set CellDb block cache size to " << td::format::as_size(db_options.block_cache_size.value());
db_options.block_cache = td::RocksDb::create_cache(opts_->get_celldb_cache_size().value());
LOG(WARNING) << "Set CellDb block cache size to " << td::format::as_size(opts_->get_celldb_cache_size().value());
}
db_options.use_direct_reads = opts_->get_celldb_direct_io();
cell_db_ = std::make_shared<td::RocksDb>(td::RocksDb::open(path_, std::move(db_options)).move_as_ok());


Expand All @@ -111,6 +112,27 @@ void CellDbIn::start_up() {
set_block(empty, std::move(e));
cell_db_->commit_write_batch().ensure();
}

if (opts_->get_celldb_preload_all()) {
// Iterate whole DB in a separate thread
delay_action([snapshot = cell_db_->snapshot()]() {
LOG(WARNING) << "CellDb: pre-loading all keys";
td::uint64 total = 0;
td::Timer timer;
auto S = snapshot->for_each([&](td::Slice, td::Slice) {
++total;
if (total % 1000000 == 0) {
LOG(INFO) << "CellDb: iterated " << total << " keys";
}
return td::Status::OK();
});
if (S.is_error()) {
LOG(ERROR) << "CellDb: pre-load failed: " << S.move_as_error();
} else {
LOG(WARNING) << "CellDb: iterated " << total << " keys in " << timer.elapsed() << "s";
}
}, td::Timestamp::now());
}
}

void CellDbIn::load_cell(RootHash hash, td::Promise<td::Ref<vm::DataCell>> promise) {
Expand Down
14 changes: 14 additions & 0 deletions validator/validator-options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions {
td::optional<td::uint64> get_celldb_cache_size() const override {
return celldb_cache_size_;
}
bool get_celldb_direct_io() const override {
return celldb_direct_io_;
}
bool get_celldb_preload_all() const override {
return celldb_preload_all_;
}
td::optional<double> get_catchain_max_block_delay() const override {
return catchain_max_block_delay_;
}
Expand Down Expand Up @@ -206,6 +212,12 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions {
void set_celldb_cache_size(td::uint64 value) override {
celldb_cache_size_ = value;
}
void set_celldb_direct_io(bool value) override {
celldb_direct_io_ = value;
}
void set_celldb_preload_all(bool value) override {
celldb_preload_all_ = value;
}
void set_catchain_max_block_delay(double value) override {
catchain_max_block_delay_ = value;
}
Expand Down Expand Up @@ -257,6 +269,8 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions {
bool disable_rocksdb_stats_;
bool nonfinal_ls_queries_enabled_ = false;
td::optional<td::uint64> celldb_cache_size_;
bool celldb_direct_io_ = false;
bool celldb_preload_all_ = false;
td::optional<double> catchain_max_block_delay_;
};

Expand Down
4 changes: 4 additions & 0 deletions validator/validator.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ struct ValidatorManagerOptions : public td::CntObject {
virtual bool get_disable_rocksdb_stats() const = 0;
virtual bool nonfinal_ls_queries_enabled() const = 0;
virtual td::optional<td::uint64> get_celldb_cache_size() const = 0;
virtual bool get_celldb_direct_io() const = 0;
virtual bool get_celldb_preload_all() const = 0;
virtual td::optional<double> get_catchain_max_block_delay() const = 0;

virtual void set_zero_block_id(BlockIdExt block_id) = 0;
Expand All @@ -113,6 +115,8 @@ struct ValidatorManagerOptions : public td::CntObject {
virtual void set_disable_rocksdb_stats(bool value) = 0;
virtual void set_nonfinal_ls_queries_enabled(bool value) = 0;
virtual void set_celldb_cache_size(td::uint64 value) = 0;
virtual void set_celldb_direct_io(bool value) = 0;
virtual void set_celldb_preload_all(bool value) = 0;
virtual void set_catchain_max_block_delay(double value) = 0;

static td::Ref<ValidatorManagerOptions> create(
Expand Down

0 comments on commit c58c331

Please sign in to comment.