diff --git a/validator/db/archive-manager.cpp b/validator/db/archive-manager.cpp index d4d9e76c4..fa043ca25 100644 --- a/validator/db/archive-manager.cpp +++ b/validator/db/archive-manager.cpp @@ -830,9 +830,9 @@ void ArchiveManager::start_up() { archive_lru_ = td::actor::create_actor("archive_lru", opts_->get_max_open_archive_files()); } if (!opts_->get_disable_rocksdb_stats()) { - statistics_ = td::RocksDb::create_statistics(); + statistics_.init(); } - index_ = std::make_shared(td::RocksDb::open(db_root_ + "/files/globalindex", statistics_).move_as_ok()); + index_ = std::make_shared(td::RocksDb::open(db_root_ + "/files/globalindex", statistics_.rocksdb_statistics).move_as_ok()); std::string value; auto v = index_->get(create_serialize_tl_object().as_slice(), value); v.ensure(); @@ -914,7 +914,7 @@ void ArchiveManager::start_up() { void ArchiveManager::alarm() { alarm_timestamp() = td::Timestamp::in(60.0); - auto stats = td::RocksDb::statistics_to_string(statistics_); + auto stats = statistics_.to_string_and_reset(); auto to_file_r = td::FileFd::open(db_root_ + "/db_stats.txt", td::FileFd::Truncate | td::FileFd::Create | td::FileFd::Write, 0644); if (to_file_r.is_error()) { LOG(ERROR) << "Failed to open db_stats.txt: " << to_file_r.move_as_error(); @@ -927,7 +927,6 @@ void ArchiveManager::alarm() { LOG(ERROR) << "Failed to write to db_stats.txt: " << res.move_as_error(); return; } - td::RocksDb::reset_statistics(statistics_); } void ArchiveManager::run_gc(UnixTime mc_ts, UnixTime gc_ts, UnixTime archive_ttl) { diff --git a/validator/db/archive-manager.hpp b/validator/db/archive-manager.hpp index aff765445..23cbb9c89 100644 --- a/validator/db/archive-manager.hpp +++ b/validator/db/archive-manager.hpp @@ -174,7 +174,7 @@ class ArchiveManager : public td::actor::Actor { bool huge_transaction_started_ = false; td::uint32 huge_transaction_size_ = 0; - std::shared_ptr statistics_; + DbStatistics statistics_; FileMap &get_file_map(const PackageId &p) { return p.key ? key_files_ : p.temp ? temp_files_ : files_; diff --git a/validator/db/archive-slice.cpp b/validator/db/archive-slice.cpp index b38fbb7fd..43a02ec48 100644 --- a/validator/db/archive-slice.cpp +++ b/validator/db/archive-slice.cpp @@ -29,29 +29,151 @@ namespace ton { namespace validator { +class PackageStatistics { + public: + void record_open(uint64_t count = 1) { + open_count.fetch_add(count, std::memory_order_relaxed); + } + + void record_close(uint64_t count = 1) { + close_count.fetch_add(count, std::memory_order_relaxed); + } + + void record_read(double time, uint64_t bytes) { + read_bytes.fetch_add(bytes, std::memory_order_relaxed); + std::lock_guard guard(read_mutex); + read_time.insert(time); + read_time_sum += time; + } + + void record_write(double time, uint64_t bytes) { + write_bytes.fetch_add(bytes, std::memory_order_relaxed); + std::lock_guard guard(write_mutex); + write_time.insert(time); + write_time_sum += time; + } + + std::string to_string_and_reset() { + std::stringstream ss; + ss.setf(std::ios::fixed); + ss.precision(6); + + ss << "ton.pack.open COUNT : " << open_count.exchange(0, std::memory_order_relaxed) << "\n"; + ss << "ton.pack.close COUNT : " << close_count.exchange(0, std::memory_order_relaxed) << "\n"; + + ss << "ton.pack.read.bytes COUNT : " << read_bytes.exchange(0, std::memory_order_relaxed) << "\n"; + ss << "ton.pack.write.bytes COUNT : " << write_bytes.exchange(0, std::memory_order_relaxed) << "\n"; + + std::multiset temp_read_time; + double temp_read_time_sum; + { + std::lock_guard guard(read_mutex); + temp_read_time = std::move(read_time); + read_time.clear(); + temp_read_time_sum = read_time_sum; + read_time_sum = 0; + } + auto read_stats = calculate_statistics(temp_read_time); + ss << "ton.pack.read.micros P50 : " << read_stats[0] << + " P95 : " << read_stats[1] << + " P99 : " << read_stats[2] << + " P100 : " << read_stats[3] << + " COUNT : " << temp_read_time.size() << + " SUM : " << temp_read_time_sum << "\n"; + + std::multiset temp_write_time; + double temp_write_time_sum; + { + std::lock_guard guard(write_mutex); + temp_write_time = std::move(write_time); + write_time.clear(); + temp_write_time_sum = write_time_sum; + write_time_sum = 0; + } + auto write_stats = calculate_statistics(temp_write_time); + ss << "ton.pack.write.micros P50 : " << write_stats[0] << + " P95 : " << write_stats[1] << + " P99 : " << write_stats[2] << + " P100 : " << write_stats[3] << + " COUNT : " << temp_write_time.size() << + " SUM : " << temp_write_time_sum << "\n"; + + return ss.str(); + } + + private: + std::atomic_uint64_t open_count; + std::atomic_uint64_t close_count; + std::multiset read_time; + std::atomic_uint64_t read_bytes; + std::multiset write_time; + std::atomic_uint64_t write_bytes; + double read_time_sum; + double write_time_sum; + + mutable std::mutex read_mutex; + mutable std::mutex write_mutex; + + std::vector calculate_statistics(const std::multiset& data) const { + if (data.empty()) return {0, 0, 0, 0}; + + auto size = data.size(); + auto calc_percentile = [&](double p) -> double { + auto it = data.begin(); + std::advance(it, static_cast(std::ceil(p * double(size)) - 1)); + return *it; + }; + + return {calc_percentile(0.5), calc_percentile(0.95), calc_percentile(0.99), *data.rbegin()}; + } +}; + +void DbStatistics::init() { + rocksdb_statistics = td::RocksDb::create_statistics(); + pack_statistics = std::make_shared(); +} + +std::string DbStatistics::to_string_and_reset() { + std::stringstream ss; + ss << td::RocksDb::statistics_to_string(rocksdb_statistics) << pack_statistics->to_string_and_reset(); + td::RocksDb::reset_statistics(rocksdb_statistics); + return ss.str(); +} + void PackageWriter::append(std::string filename, td::BufferSlice data, td::Promise> promise) { td::uint64 offset, size; + auto data_size = data.size(); + td::Timestamp start, end; { auto p = package_.lock(); if (!p) { promise.set_error(td::Status::Error("Package is closed")); return; } - offset = p->append(std::move(filename), std::move(data), !async_mode_); + start = td::Timestamp::now(); + offset = p->append(std::move(filename), std::move(data), !async_mode_); + end = td::Timestamp::now(); size = p->size(); } + if (statistics_) { + statistics_->record_write((end.at() - start.at()) * 1e6, data_size); + } promise.set_value(std::pair{offset, size}); } class PackageReader : public td::actor::Actor { public: PackageReader(std::shared_ptr package, td::uint64 offset, - td::Promise> promise) - : package_(std::move(package)), offset_(offset), promise_(std::move(promise)) { + td::Promise> promise, std::shared_ptr statistics) + : package_(std::move(package)), offset_(offset), promise_(std::move(promise)), statistics_(std::move(statistics)) { } void start_up() override { + auto start = td::Timestamp::now(); auto result = package_->read(offset_); + if (statistics_ && result.is_ok()) { + statistics_->record_read((td::Timestamp::now().at() - start.at()) * 1e6, result.ok_ref().second.size()); + } package_ = {}; promise_.set_result(std::move(result)); stop(); @@ -61,6 +183,7 @@ class PackageReader : public td::actor::Actor { std::shared_ptr package_; td::uint64 offset_; td::Promise> promise_; + std::shared_ptr statistics_; }; void ArchiveSlice::add_handle(BlockHandle handle, td::Promise promise) { @@ -297,7 +420,7 @@ void ArchiveSlice::get_file(ConstBlockHandle handle, FileReference ref_id, td::P promise.set_value(std::move(R.move_as_ok().second)); } }); - td::actor::create_actor("reader", p->package, offset, std::move(P)).release(); + td::actor::create_actor("reader", p->package, offset, std::move(P), statistics_.pack_statistics).release(); } void ArchiveSlice::get_block_common(AccountIdPrefixFull account_id, @@ -465,7 +588,7 @@ void ArchiveSlice::get_archive_id(BlockSeqno masterchain_seqno, td::Promise(td::RocksDb::open(db_path_, statistics_).move_as_ok()); + kv_ = std::make_unique(td::RocksDb::open(db_path_, statistics_.rocksdb_statistics).move_as_ok()); std::string value; auto R2 = kv_->get("status", value); R2.ensure(); @@ -547,6 +670,7 @@ void ArchiveSlice::do_close() { LOG(DEBUG) << "Closing archive slice " << db_path_; status_ = st_closed; kv_ = {}; + statistics_.pack_statistics->record_close(packages_.size()); packages_.clear(); } @@ -604,7 +728,7 @@ void ArchiveSlice::set_async_mode(bool mode, td::Promise promise) { } ArchiveSlice::ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, bool finalized, std::string db_root, - td::actor::ActorId archive_lru, std::shared_ptr statistics) + td::actor::ActorId archive_lru, DbStatistics statistics) : archive_id_(archive_id) , key_blocks_only_(key_blocks_only) , temp_(temp) @@ -650,6 +774,7 @@ void ArchiveSlice::add_package(td::uint32 seqno, td::uint64 size, td::uint32 ver LOG(FATAL) << "failed to open/create archive '" << path << "': " << R.move_as_error(); return; } + statistics_.pack_statistics->record_open(); auto idx = td::narrow_cast(packages_.size()); if (finalized_) { packages_.emplace_back(nullptr, td::actor::ActorOwn(), seqno, path, idx, version); @@ -659,7 +784,7 @@ void ArchiveSlice::add_package(td::uint32 seqno, td::uint64 size, td::uint32 ver if (version >= 1) { pack->truncate(size).ensure(); } - auto writer = td::actor::create_actor("writer", pack, async_mode_); + auto writer = td::actor::create_actor("writer", pack, async_mode_, statistics_.pack_statistics); packages_.emplace_back(std::move(pack), std::move(writer), seqno, path, idx, version); } @@ -692,7 +817,7 @@ void ArchiveSlice::destroy(td::Promise promise) { for (auto &p : packages_) { td::unlink(p.path).ensure(); } - + statistics_.pack_statistics->record_close(packages_.size()); packages_.clear(); kv_ = nullptr; @@ -898,7 +1023,8 @@ void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handl for (auto idx = pack->idx + 1; idx < packages_.size(); idx++) { td::unlink(packages_[idx].path).ensure(); } - packages_.erase(packages_.begin() + pack->idx + 1); + statistics_.pack_statistics->record_close(packages_.size() - pack->idx - 1); + packages_.erase(packages_.begin() + pack->idx + 1, packages_.end()); kv_->commit_transaction().ensure(); diff --git a/validator/db/archive-slice.hpp b/validator/db/archive-slice.hpp index a58df32b6..faec2fb83 100644 --- a/validator/db/archive-slice.hpp +++ b/validator/db/archive-slice.hpp @@ -21,6 +21,7 @@ #include "validator/interfaces/db.h" #include "package.hpp" #include "fileref.hpp" +#include "td/db/RocksDb.h" #include namespace rocksdb { @@ -57,10 +58,20 @@ struct PackageId { } }; +class PackageStatistics; + +struct DbStatistics { + void init(); + std::string to_string_and_reset(); + + std::shared_ptr pack_statistics; + std::shared_ptr rocksdb_statistics; +}; + class PackageWriter : public td::actor::Actor { public: - PackageWriter(std::weak_ptr package, bool async_mode = false) - : package_(std::move(package)), async_mode_(async_mode) { + PackageWriter(std::weak_ptr package, bool async_mode = false, std::shared_ptr statistics = nullptr) + : package_(std::move(package)), async_mode_(async_mode), statistics_(std::move(statistics)) { } void append(std::string filename, td::BufferSlice data, td::Promise> promise); @@ -78,6 +89,7 @@ class PackageWriter : public td::actor::Actor { private: std::weak_ptr package_; bool async_mode_ = false; + std::shared_ptr statistics_; }; class ArchiveLru; @@ -85,7 +97,7 @@ class ArchiveLru; class ArchiveSlice : public td::actor::Actor { public: ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, bool finalized, std::string db_root, - td::actor::ActorId archive_lru, std::shared_ptr statistics = nullptr); + td::actor::ActorId archive_lru, DbStatistics statistics = {}); void get_archive_id(BlockSeqno masterchain_seqno, td::Promise promise); @@ -155,7 +167,7 @@ class ArchiveSlice : public td::actor::Actor { std::string db_root_; td::actor::ActorId archive_lru_; - std::shared_ptr statistics_; + DbStatistics statistics_; std::unique_ptr kv_; struct PackageInfo {