Skip to content

Commit

Permalink
Send validator telemetry to overlay
Browse files Browse the repository at this point in the history
  • Loading branch information
SpyCheese committed Sep 18, 2024
1 parent eea95ae commit e5eb2a3
Show file tree
Hide file tree
Showing 19 changed files with 564 additions and 13 deletions.
15 changes: 12 additions & 3 deletions overlay/overlay-manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ void OverlayManager::register_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdS
}
overlays_[local_id][overlay_id] = OverlayDescription{std::move(overlay), std::move(cert)};

if (!with_db_) {
return;
}
auto P =
td::PromiseCreator::lambda([id = overlays_[local_id][overlay_id].overlay.get()](td::Result<DbType::GetResult> R) {
R.ensure();
Expand Down Expand Up @@ -404,13 +407,19 @@ OverlayManager::OverlayManager(std::string db_root, td::actor::ActorId<keyring::
}

// Actor start-up hook: opens the overlay peer database when a database root
// was configured.
//
// When db_root_ is empty the database is not opened and with_db_ keeps its
// default value (false); register_overlay() and save_to_db() check that flag
// before touching db_.
void OverlayManager::start_up() {
  if (db_root_.empty()) {
    return;
  }
  with_db_ = true;
  // The store lives under "<db_root_>/overlays". move_as_ok() aborts if the
  // database cannot be opened.
  std::shared_ptr<td::KeyValue> kv =
      std::make_shared<td::RocksDb>(td::RocksDb::open(PSTRING() << db_root_ << "/overlays").move_as_ok());
  db_ = DbType{std::move(kv)};
}

void OverlayManager::save_to_db(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id,
std::vector<OverlayNode> nodes) {
if (!with_db_) {
return;
}
std::vector<tl_object_ptr<ton_api::overlay_node>> nodes_vec;
for (auto &n : nodes) {
nodes_vec.push_back(n.tl());
Expand Down
1 change: 1 addition & 0 deletions overlay/overlay-manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ class OverlayManager : public Overlays {
td::actor::ActorId<dht::Dht> dht_node_;

using DbType = td::KeyValueAsync<td::Bits256, td::BufferSlice>;
bool with_db_ = false;
DbType db_;

class AdnlCallback : public adnl::Adnl::Callback {
Expand Down
4 changes: 2 additions & 2 deletions overlay/overlay-peers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ void OverlayImpl::add_peer(OverlayNode node) {
peer_list_.peers_.insert(id, OverlayPeer(std::move(node)));
del_some_peers();
auto X = peer_list_.peers_.get(id);
if (X != nullptr && peer_list_.neighbours_.size() < max_neighbours() &&
if (X != nullptr && !X->is_neighbour() && peer_list_.neighbours_.size() < max_neighbours() &&
!(X->get_node()->flags() & OverlayMemberFlags::DoNotReceiveBroadcasts) && X->get_id() != local_id_) {
peer_list_.neighbours_.push_back(X->get_id());
X->set_neighbour(true);
Expand Down Expand Up @@ -439,7 +439,7 @@ void OverlayImpl::update_neighbours(td::uint32 nodes_to_change) {
VLOG(OVERLAY_INFO) << this << ": adding new neighbour " << X->get_id();
peer_list_.neighbours_.push_back(X->get_id());
X->set_neighbour(true);
} else {
} else if (X->is_alive()) {
CHECK(nodes_to_change > 0);
auto i = td::Random::fast(0, static_cast<td::uint32>(peer_list_.neighbours_.size()) - 1);
auto Y = peer_list_.peers_.get(peer_list_.neighbours_[i]);
Expand Down
7 changes: 6 additions & 1 deletion overlay/overlay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,12 @@ void OverlayImpl::alarm() {
update_db_at_ = td::Timestamp::in(60.0);
}

update_neighbours(0);
if (update_neighbours_at_.is_in_past()) {
update_neighbours(2);
update_neighbours_at_ = td::Timestamp::in(td::Random::fast(30.0, 120.0));
} else {
update_neighbours(0);
}
alarm_timestamp() = td::Timestamp::in(1.0);
} else {
update_neighbours(0);
Expand Down
1 change: 1 addition & 0 deletions overlay/overlay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ class OverlayImpl : public Overlay {
td::Timestamp next_dht_store_query_ = td::Timestamp::in(1.0);
td::Timestamp update_db_at_;
td::Timestamp update_throughput_at_;
td::Timestamp update_neighbours_at_;
td::Timestamp last_throughput_update_;

std::unique_ptr<Overlays::Callback> callback_;
Expand Down
41 changes: 41 additions & 0 deletions tdutils/td/utils/port/Stat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -461,4 +461,45 @@ Result<uint64> get_total_ram() {
#endif
}

// Counts the CPU entries listed in /proc/cpuinfo.
//
// On TD_LINUX builds the whole file is read and every line whose first token
// (the text before the first space, tab or ':') equals "processor" is counted.
// Returns the count, or the error from opening/reading the file.
// On other builds returns Status::Error("Not supported").
Result<uint32> get_cpu_cores() {
#if TD_LINUX
  uint32 result = 0;
  TRY_RESULT(fd, FileFd::open("/proc/cpuinfo", FileFd::Read));
  SCOPE_EXIT {
    fd.close();
  };
  std::string data;
  char buf[10000];
  while (true) {
    TRY_RESULT(size, fd.read(MutableSlice{buf, sizeof(buf)}));
    if (size == 0) {
      break;
    }
    // Append exactly the bytes that were read instead of treating the buffer
    // as a NUL-terminated string: no data is lost on an embedded '\0' and the
    // chunk is not rescanned to find its length.
    data.append(buf, size);
  }
  size_t i = 0;
  while (i < data.size()) {
    const char *line_begin = data.data() + i;
    while (i < data.size() && data[i] != '\n') {
      ++i;
    }
    const char *line_end = data.data() + i;
    ++i;  // step over the '\n' (or past the end for an unterminated last line)
    Slice line{line_begin, line_end};
    // The key is everything up to the first separator.
    size_t j = 0;
    while (j < line.size() && line[j] != ' ' && line[j] != '\t' && line[j] != ':') {
      ++j;
    }
    Slice name = line.substr(0, j);
    if (name == "processor") {
      ++result;
    }
  }
  return result;
#else
  return Status::Error("Not supported");
#endif
}

} // namespace td
2 changes: 2 additions & 0 deletions tdutils/td/utils/port/Stat.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,6 @@ Status update_atime(CSlice path) TD_WARN_UNUSED_RESULT;

Result<uint64> get_total_ram() TD_WARN_UNUSED_RESULT;

// Returns the number of lines in /proc/cpuinfo whose first token is "processor";
// returns an error when the file cannot be read or when TD_LINUX is not set.
Result<uint32> get_cpu_cores() TD_WARN_UNUSED_RESULT;

} // namespace td
7 changes: 6 additions & 1 deletion tl/generate/scheme/ton_api.tl
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,12 @@ db.files.package.value package_id:int key:Bool temp:Bool firstblocks:(vector db.
validator.groupMember public_key_hash:int256 adnl:int256 weight:long = engine.validator.GroupMember;
validator.group workchain:int shard:long catchain_seqno:int config_hash:int256 members:(vector validator.groupMember) = validator.Group;
validator.groupEx workchain:int shard:long vertical_seqno:int catchain_seqno:int config_hash:int256 members:(vector validator.groupMember) = validator.Group;
validator.groupNew workchain:int shard:long vertical_seqno:int last_key_block_seqno:int catchain_seqno:int config_hash:int256 members:(vector validator.groupMember) = validator.Group;
validator.groupNew workchain:int shard:long vertical_seqno:int last_key_block_seqno:int catchain_seqno:int config_hash:int256 members:(vector validator.groupMember) = validator.Group;

validator.telemetryOverlayId zero_state_file_hash:int256 = validator.TelemetryOverlayId;
validator.telemetry flags:# timestamp:double adnl_id:int256
node_version:string os_version:string
ram_size:long cpu_cores:int = validator.Telemetry;

---functions---

Expand Down
Binary file modified tl/generate/scheme/ton_api.tlo
Binary file not shown.
4 changes: 4 additions & 0 deletions utils/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,8 @@ target_link_libraries(opcode-timing ton_crypto)
target_include_directories(pack-viewer PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>/..)

add_executable(telemetry-collector telemetry-collector.cpp)
target_include_directories(telemetry-collector PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/..>)
target_link_libraries(telemetry-collector PRIVATE tonhttp rldp dht tonlib git terminal overlay)

install(TARGETS generate-random-id RUNTIME DESTINATION bin)
196 changes: 196 additions & 0 deletions utils/telemetry-collector.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
#include "dht.hpp"
#include "td/utils/port/signals.h"
#include "td/utils/OptionParser.h"
#include "td/utils/filesystem.h"
#include "common/delay.h"
#include <fstream>
#include "overlay/overlays.h"

#include "auto/tl/ton_api_json.h"
#include "common/errorcode.h"

#include "tonlib/tonlib/TonlibClient.h"

#include "adnl/adnl.h"
#include "dht/dht.h"

#include <algorithm>
#include "td/utils/port/path.h"
#include "td/utils/JsonBuilder.h"
#include "auto/tl/ton_api_json.h"
#include "tl/tl_json.h"

#include "git.h"

using namespace ton;

td::IPAddress ip_addr;
std::string global_config;

class TelemetryCollector : public td::actor::Actor {
public:
TelemetryCollector() = default;

td::Status load_global_config() {
TRY_RESULT_PREFIX(conf_data, td::read_file(global_config), "failed to read: ");
TRY_RESULT_PREFIX(conf_json, td::json_decode(conf_data.as_slice()), "failed to parse json: ");
ton_api::config_global conf;
TRY_STATUS_PREFIX(ton_api::from_json(conf, conf_json.get_object()), "json does not fit TL scheme: ");
if (!conf.dht_) {
return td::Status::Error(ErrorCode::error, "does not contain [dht] section");
}
TRY_RESULT_PREFIX(dht, dht::Dht::create_global_config(std::move(conf.dht_)), "bad [dht] section: ");
dht_config_ = std::move(dht);
zerostate_hash_ = conf.validator_->zero_state_->file_hash_;
return td::Status::OK();
}

void run() {
keyring_ = keyring::Keyring::create("");
load_global_config().ensure();

adnl_network_manager_ = adnl::AdnlNetworkManager::create(0);
adnl_ = adnl::Adnl::create("", keyring_.get());
td::actor::send_closure(adnl_, &adnl::Adnl::register_network_manager, adnl_network_manager_.get());
adnl::AdnlCategoryMask cat_mask;
cat_mask[0] = true;
td::actor::send_closure(adnl_network_manager_, &adnl::AdnlNetworkManager::add_self_addr, ip_addr,
std::move(cat_mask), 0);
addr_list_.set_version(static_cast<td::int32>(td::Clocks::system()));
addr_list_.set_reinit_date(adnl::Adnl::adnl_start_time());
addr_list_.add_udp_address(ip_addr);
{
auto pk = PrivateKey{privkeys::Ed25519::random()};
auto pub = pk.compute_public_key();
td::actor::send_closure(keyring_, &keyring::Keyring::add_key, std::move(pk), true, [](td::Unit) {});
dht_id_ = adnl::AdnlNodeIdShort{pub.compute_short_id()};
td::actor::send_closure(adnl_, &adnl::Adnl::add_id, adnl::AdnlNodeIdFull{pub}, addr_list_,
static_cast<td::uint8>(0));
}
{
auto pk = PrivateKey{privkeys::Ed25519::random()};
auto pub = pk.compute_public_key();
td::actor::send_closure(keyring_, &keyring::Keyring::add_key, std::move(pk), true, [](td::Unit) {});
local_id_ = adnl::AdnlNodeIdShort{pub.compute_short_id()};
td::actor::send_closure(adnl_, &adnl::Adnl::add_id, adnl::AdnlNodeIdFull{pub}, addr_list_,
static_cast<td::uint8>(0));
}
auto D = dht::Dht::create_client(dht_id_, "", dht_config_, keyring_.get(), adnl_.get());
D.ensure();
dht_ = D.move_as_ok();
td::actor::send_closure(adnl_, &adnl::Adnl::register_dht_node, dht_.get());

overlays_ = overlay::Overlays::create("", keyring_.get(), adnl_.get(), dht_.get());

class Callback : public overlay::Overlays::Callback {
public:
explicit Callback(td::actor::ActorId<TelemetryCollector> id) : id_(id) {
}
void receive_message(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id,
td::BufferSlice data) override {
}
void receive_query(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id, td::BufferSlice data,
td::Promise<td::BufferSlice> promise) override {
}
void receive_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data) override {
td::actor::send_closure(id_, &TelemetryCollector::receive_broadcast, src, std::move(data));
}
void check_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data,
td::Promise<td::Unit> promise) override {
}

private:
td::actor::ActorId<TelemetryCollector> id_;
};

auto X = create_hash_tl_object<ton_api::validator_telemetryOverlayId>(zerostate_hash_);
td::BufferSlice b{32};
b.as_slice().copy_from(as_slice(X));
overlay::OverlayIdFull overlay_id_full{std::move(b)};
overlay::OverlayPrivacyRules rules{
8192, overlay::CertificateFlags::AllowFec | overlay::CertificateFlags::Trusted, {}};
overlay::OverlayOptions opts;
opts.frequent_dht_lookup_ = true;
LOG(WARNING) << "Overlay id : " << overlay_id_full.compute_short_id();
td::actor::send_closure(overlays_, &overlay::Overlays::create_public_overlay_ex, local_id_,
std::move(overlay_id_full), std::make_unique<Callback>(actor_id(this)), std::move(rules),
R"({ "type": "telemetry" })", opts);
}

void receive_broadcast(PublicKeyHash src, td::BufferSlice data) {
auto R = fetch_tl_prefix<ton_api::validator_telemetry>(data, true);
if (R.is_error()) {
LOG(INFO) << "Invalid broadcast from " << src << ": " << R.move_as_error();
return;
}
auto telemetry = R.move_as_ok();
if (telemetry->adnl_id_ != src.bits256_value()) {
LOG(INFO) << "Invalid broadcast from " << src << ": adnl_id mismatch";
return;
}
auto s = td::json_encode<std::string>(td::ToJson(*telemetry), false);
s.erase(std::remove_if(s.begin(), s.end(), [](char c) { return c == '\n' || c == '\r'; }), s.end());
std::cout << s << "\n";
std::cout.flush();
}

private:
adnl::AdnlNodeIdShort dht_id_, local_id_;
adnl::AdnlAddressList addr_list_;

td::actor::ActorOwn<keyring::Keyring> keyring_;
td::actor::ActorOwn<adnl::AdnlNetworkManager> adnl_network_manager_;
td::actor::ActorOwn<adnl::Adnl> adnl_;
td::actor::ActorOwn<dht::Dht> dht_;
td::actor::ActorOwn<overlay::Overlays> overlays_;

std::shared_ptr<dht::DhtGlobalConfig> dht_config_;
td::Bits256 zerostate_hash_;
};

// Entry point of the telemetry collector.
//
// Registers the command-line options, creates the TelemetryCollector actor on
// a scheduler, parses argv inside the scheduler context, starts the collector
// and then runs the scheduler loop until it stops.
int main(int argc, char* argv[]) {
  SET_VERBOSITY_LEVEL(verbosity_INFO);

  td::set_default_failure_signal_handler().ensure();

  td::actor::ActorOwn<TelemetryCollector> x;
  // Restore the default log interface when main() exits.
  SCOPE_EXIT {
    td::log_interface = td::default_log_interface;
  };

  td::OptionParser p;
  p.set_description("collect validator telemetry from the overlay, print as json to stdout\n");
  p.add_option('v', "verbosity", "set verbosity level", [&](td::Slice arg) {
    // The argument is an offset relative to the FATAL verbosity level.
    int v = VERBOSITY_NAME(FATAL) + (td::to_integer<int>(arg));
    SET_VERBOSITY_LEVEL(v);
  });
  p.add_option('h', "help", "prints a help message", [&]() {
    char b[10240];
    td::StringBuilder sb(td::MutableSlice{b, 10000});
    sb << p;
    std::cout << sb.as_cslice().c_str();
    std::exit(2);
  });
  p.add_option('V', "version", "shows build information", [&]() {
    std::cout << "telemetry-collector build information: [ Commit: " << GitMetadata::CommitSHA1()
              << ", Date: " << GitMetadata::CommitDate() << "]\n";
    std::exit(0);
  });
  p.add_option('C', "global-config", "global TON configuration file",
               [&](td::Slice arg) { global_config = arg.str(); });
  p.add_checked_option('a', "addr", "ip:port", [&](td::Slice arg) {
    TRY_STATUS(ip_addr.init_host_port(arg.str()));
    return td::Status::OK();
  });

  td::actor::Scheduler scheduler({3});

  // Create the actor first, then parse the options (which fill the globals the
  // actor reads), and only then ask the actor to start.
  scheduler.run_in_context([&] { x = td::actor::create_actor<TelemetryCollector>("collector"); });
  scheduler.run_in_context([&] { p.run(argc, argv).ensure(); });
  scheduler.run_in_context([&] { td::actor::send_closure(x, &TelemetryCollector::run); });
  while (scheduler.run(1)) {
  }

  return 0;
}
2 changes: 2 additions & 0 deletions validator/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ set(VALIDATOR_HEADERS

import-db-slice.hpp
queue-size-counter.hpp
validator-telemetry.hpp

manager-disk.h
manager-disk.hpp
Expand All @@ -82,6 +83,7 @@ set(VALIDATOR_SOURCE
validator-group.cpp
validator-options.cpp
queue-size-counter.cpp
validator-telemetry.cpp

downloaders/wait-block-data.cpp
downloaders/wait-block-state.cpp
Expand Down
Loading

0 comments on commit e5eb2a3

Please sign in to comment.