Skip to content

Commit

Permalink
Collect statistics for .pack files (#944)
Browse files Browse the repository at this point in the history
* Statistics for .pack files

* optimizations

* fix typo

* fix erasing packages
  • Loading branch information
dungeon-master-666 authored Mar 27, 2024
1 parent b076143 commit 10487b1
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 18 deletions.
7 changes: 3 additions & 4 deletions validator/db/archive-manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -830,9 +830,9 @@ void ArchiveManager::start_up() {
archive_lru_ = td::actor::create_actor<ArchiveLru>("archive_lru", opts_->get_max_open_archive_files());
}
if (!opts_->get_disable_rocksdb_stats()) {
statistics_ = td::RocksDb::create_statistics();
statistics_.init();
}
index_ = std::make_shared<td::RocksDb>(td::RocksDb::open(db_root_ + "/files/globalindex", statistics_).move_as_ok());
index_ = std::make_shared<td::RocksDb>(td::RocksDb::open(db_root_ + "/files/globalindex", statistics_.rocksdb_statistics).move_as_ok());
std::string value;
auto v = index_->get(create_serialize_tl_object<ton_api::db_files_index_key>().as_slice(), value);
v.ensure();
Expand Down Expand Up @@ -914,7 +914,7 @@ void ArchiveManager::start_up() {

void ArchiveManager::alarm() {
alarm_timestamp() = td::Timestamp::in(60.0);
auto stats = td::RocksDb::statistics_to_string(statistics_);
auto stats = statistics_.to_string_and_reset();
auto to_file_r = td::FileFd::open(db_root_ + "/db_stats.txt", td::FileFd::Truncate | td::FileFd::Create | td::FileFd::Write, 0644);
if (to_file_r.is_error()) {
LOG(ERROR) << "Failed to open db_stats.txt: " << to_file_r.move_as_error();
Expand All @@ -927,7 +927,6 @@ void ArchiveManager::alarm() {
LOG(ERROR) << "Failed to write to db_stats.txt: " << res.move_as_error();
return;
}
td::RocksDb::reset_statistics(statistics_);
}

void ArchiveManager::run_gc(UnixTime mc_ts, UnixTime gc_ts, UnixTime archive_ttl) {
Expand Down
2 changes: 1 addition & 1 deletion validator/db/archive-manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ class ArchiveManager : public td::actor::Actor {
bool huge_transaction_started_ = false;
td::uint32 huge_transaction_size_ = 0;

std::shared_ptr<rocksdb::Statistics> statistics_;
DbStatistics statistics_;

FileMap &get_file_map(const PackageId &p) {
return p.key ? key_files_ : p.temp ? temp_files_ : files_;
Expand Down
144 changes: 135 additions & 9 deletions validator/db/archive-slice.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,151 @@ namespace ton {

namespace validator {

// Collects counters and latency samples for .pack file activity and renders
// them as text in the same "name KEY : value" layout used for the RocksDB
// statistics dump.
//
// Counters are atomics updated with relaxed ordering; latency samples are kept
// in ordered multisets guarded by one mutex per direction (read / write), so
// the record_* methods may be called concurrently.
class PackageStatistics {
 public:
  // Adds `count` to the number of packages opened since the last reset.
  void record_open(uint64_t count = 1) {
    open_count.fetch_add(count, std::memory_order_relaxed);
  }

  // Adds `count` to the number of packages closed since the last reset.
  void record_close(uint64_t count = 1) {
    close_count.fetch_add(count, std::memory_order_relaxed);
  }

  // Records one read of `bytes` bytes that took `time` (reported under the
  // "ton.pack.read.micros" key, so callers pass microseconds).
  void record_read(double time, uint64_t bytes) {
    read_bytes.fetch_add(bytes, std::memory_order_relaxed);
    std::lock_guard<std::mutex> guard(read_mutex);
    read_time.insert(time);
    read_time_sum += time;
  }

  // Records one write of `bytes` bytes that took `time` (reported under the
  // "ton.pack.write.micros" key, so callers pass microseconds).
  void record_write(double time, uint64_t bytes) {
    write_bytes.fetch_add(bytes, std::memory_order_relaxed);
    std::lock_guard<std::mutex> guard(write_mutex);
    write_time.insert(time);
    write_time_sum += time;
  }

  // Renders every metric as one line of text and resets all of them to zero.
  // Each counter is read-and-cleared atomically; each latency set is drained
  // under its own mutex, so samples recorded concurrently land either in this
  // report or in the next one.
  std::string to_string_and_reset() {
    std::stringstream ss;
    ss.setf(std::ios::fixed);
    ss.precision(6);

    ss << "ton.pack.open COUNT : " << open_count.exchange(0, std::memory_order_relaxed) << "\n";
    ss << "ton.pack.close COUNT : " << close_count.exchange(0, std::memory_order_relaxed) << "\n";

    ss << "ton.pack.read.bytes COUNT : " << read_bytes.exchange(0, std::memory_order_relaxed) << "\n";
    ss << "ton.pack.write.bytes COUNT : " << write_bytes.exchange(0, std::memory_order_relaxed) << "\n";

    drain_and_print(ss, "ton.pack.read.micros", read_mutex, read_time, read_time_sum);
    drain_and_print(ss, "ton.pack.write.micros", write_mutex, write_time, write_time_sum);

    return ss.str();
  }

 private:
  // Every member carries an explicit initializer: without them the atomics and
  // the sums are indeterminate whenever the object is default-initialized
  // (stack instance, plain `new`), and only happen to be zero when the object
  // is value-initialized.
  std::atomic_uint64_t open_count{0};
  std::atomic_uint64_t close_count{0};
  std::multiset<double> read_time;
  std::atomic_uint64_t read_bytes{0};
  std::multiset<double> write_time;
  std::atomic_uint64_t write_bytes{0};
  double read_time_sum = 0.0;
  double write_time_sum = 0.0;

  mutable std::mutex read_mutex;
  mutable std::mutex write_mutex;

  // Moves the samples and their sum out from under `mutex`, leaving the live
  // containers empty, then appends the percentile line for `name` to `ss`.
  // The percentile computation happens outside the lock.
  void drain_and_print(std::stringstream& ss, const char* name, std::mutex& mutex,
                       std::multiset<double>& samples, double& sum) const {
    std::multiset<double> drained;
    double drained_sum = 0.0;
    {
      std::lock_guard<std::mutex> guard(mutex);
      drained.swap(samples);
      drained_sum = sum;
      sum = 0.0;
    }
    auto stats = calculate_statistics(drained);
    ss << name << " P50 : " << stats[0] <<
        " P95 : " << stats[1] <<
        " P99 : " << stats[2] <<
        " P100 : " << stats[3] <<
        " COUNT : " << drained.size() <<
        " SUM : " << drained_sum << "\n";
  }

  // Returns {P50, P95, P99, max} of `data` using the nearest-rank method
  // (rank = ceil(p * size), 1-based). An empty set yields four zeros.
  std::vector<double> calculate_statistics(const std::multiset<double>& data) const {
    if (data.empty()) return {0.0, 0.0, 0.0, 0.0};

    const auto size = data.size();
    auto calc_percentile = [&](double p) -> double {
      auto rank = static_cast<std::size_t>(std::ceil(p * static_cast<double>(size)));
      // Clamp so that rounding can never step outside [1, size].
      if (rank < 1) {
        rank = 1;
      }
      if (rank > size) {
        rank = size;
      }
      auto it = data.begin();
      std::advance(it, static_cast<std::ptrdiff_t>(rank - 1));
      return *it;
    };

    return {calc_percentile(0.5), calc_percentile(0.95), calc_percentile(0.99), *data.rbegin()};
  }
};

void DbStatistics::init() {
rocksdb_statistics = td::RocksDb::create_statistics();
pack_statistics = std::make_shared<PackageStatistics>();
}

// Renders the RocksDB statistics followed by the .pack file statistics as one
// string, then resets both collectors.
//
// A DbStatistics on which init() was never called holds null pointers; each
// collector is therefore skipped when absent instead of being dereferenced,
// and the result is an empty string when neither exists.
std::string DbStatistics::to_string_and_reset() {
  std::stringstream ss;
  if (rocksdb_statistics) {
    ss << td::RocksDb::statistics_to_string(rocksdb_statistics);
  }
  if (pack_statistics) {
    // PackageStatistics resets itself as part of rendering.
    ss << pack_statistics->to_string_and_reset();
  }
  if (rocksdb_statistics) {
    td::RocksDb::reset_statistics(rocksdb_statistics);
  }
  return ss.str();
}

// Appends `data` under `filename` to the package and fulfils `promise` with
// the pair {offset returned by Package::append, package size after the append}.
// Fails the promise with "Package is closed" when the weakly-held package has
// already been destroyed.
//
// When a statistics collector is attached, the duration of the append call and
// the payload size are reported to it.
void PackageWriter::append(std::string filename, td::BufferSlice data,
                           td::Promise<std::pair<td::uint64, td::uint64>> promise) {
  td::uint64 offset, size;
  // Captured up front: `data` is moved into the package below.
  auto data_size = data.size();
  td::Timestamp start, end;
  {
    auto p = package_.lock();
    if (!p) {
      promise.set_error(td::Status::Error("Package is closed"));
      return;
    }
    // Exactly one append: `filename` and `data` are moved-from afterwards, so
    // a second call would write an empty entry.
    start = td::Timestamp::now();
    offset = p->append(std::move(filename), std::move(data), !async_mode_);
    end = td::Timestamp::now();
    size = p->size();
  }
  if (statistics_) {
    // NOTE(review): assumes td::Timestamp::at() is in seconds, hence the 1e6
    // factor for the "micros" metric — confirm against td::Timestamp.
    statistics_->record_write((end.at() - start.at()) * 1e6, data_size);
  }
  promise.set_value(std::pair<td::uint64, td::uint64>{offset, size});
}

class PackageReader : public td::actor::Actor {
public:
PackageReader(std::shared_ptr<Package> package, td::uint64 offset,
td::Promise<std::pair<std::string, td::BufferSlice>> promise)
: package_(std::move(package)), offset_(offset), promise_(std::move(promise)) {
td::Promise<std::pair<std::string, td::BufferSlice>> promise, std::shared_ptr<PackageStatistics> statistics)
: package_(std::move(package)), offset_(offset), promise_(std::move(promise)), statistics_(std::move(statistics)) {
}
void start_up() override {
auto start = td::Timestamp::now();
auto result = package_->read(offset_);
if (statistics_ && result.is_ok()) {
statistics_->record_read((td::Timestamp::now().at() - start.at()) * 1e6, result.ok_ref().second.size());
}
package_ = {};
promise_.set_result(std::move(result));
stop();
Expand All @@ -61,6 +183,7 @@ class PackageReader : public td::actor::Actor {
std::shared_ptr<Package> package_;
td::uint64 offset_;
td::Promise<std::pair<std::string, td::BufferSlice>> promise_;
std::shared_ptr<PackageStatistics> statistics_;
};

void ArchiveSlice::add_handle(BlockHandle handle, td::Promise<td::Unit> promise) {
Expand Down Expand Up @@ -297,7 +420,7 @@ void ArchiveSlice::get_file(ConstBlockHandle handle, FileReference ref_id, td::P
promise.set_value(std::move(R.move_as_ok().second));
}
});
td::actor::create_actor<PackageReader>("reader", p->package, offset, std::move(P)).release();
td::actor::create_actor<PackageReader>("reader", p->package, offset, std::move(P), statistics_.pack_statistics).release();
}

void ArchiveSlice::get_block_common(AccountIdPrefixFull account_id,
Expand Down Expand Up @@ -465,7 +588,7 @@ void ArchiveSlice::get_archive_id(BlockSeqno masterchain_seqno, td::Promise<td::
void ArchiveSlice::before_query() {
if (status_ == st_closed) {
LOG(DEBUG) << "Opening archive slice " << db_path_;
kv_ = std::make_unique<td::RocksDb>(td::RocksDb::open(db_path_, statistics_).move_as_ok());
kv_ = std::make_unique<td::RocksDb>(td::RocksDb::open(db_path_, statistics_.rocksdb_statistics).move_as_ok());
std::string value;
auto R2 = kv_->get("status", value);
R2.ensure();
Expand Down Expand Up @@ -547,6 +670,7 @@ void ArchiveSlice::do_close() {
LOG(DEBUG) << "Closing archive slice " << db_path_;
status_ = st_closed;
kv_ = {};
statistics_.pack_statistics->record_close(packages_.size());
packages_.clear();
}

Expand Down Expand Up @@ -604,7 +728,7 @@ void ArchiveSlice::set_async_mode(bool mode, td::Promise<td::Unit> promise) {
}

ArchiveSlice::ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, bool finalized, std::string db_root,
td::actor::ActorId<ArchiveLru> archive_lru, std::shared_ptr<rocksdb::Statistics> statistics)
td::actor::ActorId<ArchiveLru> archive_lru, DbStatistics statistics)
: archive_id_(archive_id)
, key_blocks_only_(key_blocks_only)
, temp_(temp)
Expand Down Expand Up @@ -650,6 +774,7 @@ void ArchiveSlice::add_package(td::uint32 seqno, td::uint64 size, td::uint32 ver
LOG(FATAL) << "failed to open/create archive '" << path << "': " << R.move_as_error();
return;
}
statistics_.pack_statistics->record_open();
auto idx = td::narrow_cast<td::uint32>(packages_.size());
if (finalized_) {
packages_.emplace_back(nullptr, td::actor::ActorOwn<PackageWriter>(), seqno, path, idx, version);
Expand All @@ -659,7 +784,7 @@ void ArchiveSlice::add_package(td::uint32 seqno, td::uint64 size, td::uint32 ver
if (version >= 1) {
pack->truncate(size).ensure();
}
auto writer = td::actor::create_actor<PackageWriter>("writer", pack, async_mode_);
auto writer = td::actor::create_actor<PackageWriter>("writer", pack, async_mode_, statistics_.pack_statistics);
packages_.emplace_back(std::move(pack), std::move(writer), seqno, path, idx, version);
}

Expand Down Expand Up @@ -692,7 +817,7 @@ void ArchiveSlice::destroy(td::Promise<td::Unit> promise) {
for (auto &p : packages_) {
td::unlink(p.path).ensure();
}

statistics_.pack_statistics->record_close(packages_.size());
packages_.clear();
kv_ = nullptr;

Expand Down Expand Up @@ -898,7 +1023,8 @@ void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handl
for (auto idx = pack->idx + 1; idx < packages_.size(); idx++) {
td::unlink(packages_[idx].path).ensure();
}
packages_.erase(packages_.begin() + pack->idx + 1);
statistics_.pack_statistics->record_close(packages_.size() - pack->idx - 1);
packages_.erase(packages_.begin() + pack->idx + 1, packages_.end());

kv_->commit_transaction().ensure();

Expand Down
20 changes: 16 additions & 4 deletions validator/db/archive-slice.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "validator/interfaces/db.h"
#include "package.hpp"
#include "fileref.hpp"
#include "td/db/RocksDb.h"
#include <map>

namespace rocksdb {
Expand Down Expand Up @@ -57,10 +58,20 @@ struct PackageId {
}
};

class PackageStatistics;

// Bundle of the two statistics collectors used by the archive database code:
// one for RocksDB and one for .pack files. Both pointers are null in a
// default-constructed instance.
struct DbStatistics {
  // Allocates both collectors.
  void init();
  // Returns the RocksDB statistics text followed by the .pack statistics text
  // and resets both collectors.
  std::string to_string_and_reset();

  // Collector for .pack file open/close/read/write activity.
  std::shared_ptr<PackageStatistics> pack_statistics;
  // RocksDB statistics object passed to td::RocksDb::open().
  std::shared_ptr<rocksdb::Statistics> rocksdb_statistics;
};

class PackageWriter : public td::actor::Actor {
public:
PackageWriter(std::weak_ptr<Package> package, bool async_mode = false)
: package_(std::move(package)), async_mode_(async_mode) {
PackageWriter(std::weak_ptr<Package> package, bool async_mode = false, std::shared_ptr<PackageStatistics> statistics = nullptr)
: package_(std::move(package)), async_mode_(async_mode), statistics_(std::move(statistics)) {
}

void append(std::string filename, td::BufferSlice data, td::Promise<std::pair<td::uint64, td::uint64>> promise);
Expand All @@ -78,14 +89,15 @@ class PackageWriter : public td::actor::Actor {
private:
std::weak_ptr<Package> package_;
bool async_mode_ = false;
std::shared_ptr<PackageStatistics> statistics_;
};

class ArchiveLru;

class ArchiveSlice : public td::actor::Actor {
public:
ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, bool finalized, std::string db_root,
td::actor::ActorId<ArchiveLru> archive_lru, std::shared_ptr<rocksdb::Statistics> statistics = nullptr);
td::actor::ActorId<ArchiveLru> archive_lru, DbStatistics statistics = {});

void get_archive_id(BlockSeqno masterchain_seqno, td::Promise<td::uint64> promise);

Expand Down Expand Up @@ -155,7 +167,7 @@ class ArchiveSlice : public td::actor::Actor {

std::string db_root_;
td::actor::ActorId<ArchiveLru> archive_lru_;
std::shared_ptr<rocksdb::Statistics> statistics_;
DbStatistics statistics_;
std::unique_ptr<td::KeyValue> kv_;

struct PackageInfo {
Expand Down

0 comments on commit 10487b1

Please sign in to comment.