Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge safe features branch #1156

Merged
merged 6 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions adnl/adnl-channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,16 @@ void AdnlChannelImpl::send_message(td::uint32 priority, td::actor::ActorId<AdnlN
}

void AdnlChannelImpl::receive(td::IPAddress addr, td::BufferSlice data) {
auto P = td::PromiseCreator::lambda(
[peer = peer_pair_, channel_id = channel_in_id_, addr, id = print_id()](td::Result<AdnlPacket> R) {
if (R.is_error()) {
VLOG(ADNL_WARNING) << id << ": dropping IN message: can not decrypt: " << R.move_as_error();
} else {
auto packet = R.move_as_ok();
packet.set_remote_addr(addr);
td::actor::send_closure(peer, &AdnlPeerPair::receive_packet_from_channel, channel_id, std::move(packet));
}
});
auto P = td::PromiseCreator::lambda([peer = peer_pair_, channel_id = channel_in_id_, addr, id = print_id(),
size = data.size()](td::Result<AdnlPacket> R) {
if (R.is_error()) {
VLOG(ADNL_WARNING) << id << ": dropping IN message: can not decrypt: " << R.move_as_error();
} else {
auto packet = R.move_as_ok();
packet.set_remote_addr(addr);
td::actor::send_closure(peer, &AdnlPeerPair::receive_packet_from_channel, channel_id, std::move(packet), size);
}
});

decrypt(std::move(data), std::move(P));
}
Expand Down
97 changes: 86 additions & 11 deletions adnl/adnl-local-id.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,34 @@ AdnlAddressList AdnlLocalId::get_addr_list() const {
}

void AdnlLocalId::receive(td::IPAddress addr, td::BufferSlice data) {
auto P = td::PromiseCreator::lambda(
[peer_table = peer_table_, dst = short_id_, addr, id = print_id()](td::Result<AdnlPacket> R) {
if (R.is_error()) {
VLOG(ADNL_WARNING) << id << ": dropping IN message: cannot decrypt: " << R.move_as_error();
} else {
auto packet = R.move_as_ok();
packet.set_remote_addr(addr);
td::actor::send_closure(peer_table, &AdnlPeerTable::receive_decrypted_packet, dst, std::move(packet));
}
});

InboundRateLimiter& rate_limiter = inbound_rate_limiter_[addr];
if (!rate_limiter.rate_limiter.take()) {
VLOG(ADNL_NOTICE) << this << ": dropping IN message: rate limit exceeded";
add_dropped_packet_stats(addr);
return;
}
++rate_limiter.currently_decrypting_packets;
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), peer_table = peer_table_, dst = short_id_, addr,
id = print_id(), size = data.size()](td::Result<AdnlPacket> R) {
td::actor::send_closure(SelfId, &AdnlLocalId::decrypt_packet_done, addr);
if (R.is_error()) {
VLOG(ADNL_WARNING) << id << ": dropping IN message: cannot decrypt: " << R.move_as_error();
} else {
auto packet = R.move_as_ok();
packet.set_remote_addr(addr);
td::actor::send_closure(peer_table, &AdnlPeerTable::receive_decrypted_packet, dst, std::move(packet), size);
}
});
decrypt(std::move(data), std::move(P));
}

void AdnlLocalId::decrypt_packet_done(td::IPAddress addr) {
auto it = inbound_rate_limiter_.find(addr);
CHECK(it != inbound_rate_limiter_.end());
--it->second.currently_decrypting_packets;
add_decrypted_packet_stats(addr);
}

void AdnlLocalId::deliver(AdnlNodeIdShort src, td::BufferSlice data) {
auto s = std::move(data);
for (auto &cb : cb_) {
Expand Down Expand Up @@ -292,6 +306,67 @@ void AdnlLocalId::update_packet(AdnlPacket packet, bool update_id, bool sign, td
}
}

void AdnlLocalId::get_stats(td::Promise<tl_object_ptr<ton_api::adnl_stats_localId>> promise) {
auto stats = create_tl_object<ton_api::adnl_stats_localId>();
stats->short_id_ = short_id_.bits256_value();
for (auto &[ip, x] : inbound_rate_limiter_) {
if (x.currently_decrypting_packets != 0) {
stats->current_decrypt_.push_back(create_tl_object<ton_api::adnl_stats_ipPackets>(
ip.is_valid() ? PSTRING() << ip.get_ip_str() << ":" << ip.get_port() : "", x.currently_decrypting_packets));
}
}
prepare_packet_stats();
stats->packets_recent_ = packet_stats_prev_.tl();
stats->packets_total_ = packet_stats_total_.tl();
stats->packets_total_->ts_start_ = (double)Adnl::adnl_start_time();
stats->packets_total_->ts_end_ = td::Clocks::system();
promise.set_result(std::move(stats));
}

void AdnlLocalId::add_decrypted_packet_stats(td::IPAddress addr) {
prepare_packet_stats();
++packet_stats_cur_.decrypted_packets[addr];
++packet_stats_total_.decrypted_packets[addr];
}

void AdnlLocalId::add_dropped_packet_stats(td::IPAddress addr) {
prepare_packet_stats();
++packet_stats_cur_.dropped_packets[addr];
++packet_stats_total_.dropped_packets[addr];
}

void AdnlLocalId::prepare_packet_stats() {
double now = td::Clocks::system();
if (now >= packet_stats_cur_.ts_end) {
packet_stats_prev_ = std::move(packet_stats_cur_);
packet_stats_cur_ = {};
auto now_int = (int)td::Clocks::system();
packet_stats_cur_.ts_start = (double)(now_int / 60 * 60);
packet_stats_cur_.ts_end = packet_stats_cur_.ts_start + 60.0;
if (packet_stats_prev_.ts_end < now - 60.0) {
packet_stats_prev_ = {};
packet_stats_prev_.ts_end = packet_stats_cur_.ts_start;
packet_stats_prev_.ts_start = packet_stats_prev_.ts_end - 60.0;
}
}
}

tl_object_ptr<ton_api::adnl_stats_localIdPackets> AdnlLocalId::PacketStats::tl() const {
auto obj = create_tl_object<ton_api::adnl_stats_localIdPackets>();
obj->ts_start_ = ts_start;
obj->ts_end_ = ts_end;
for (const auto &[ip, packets] : decrypted_packets) {
obj->decrypted_packets_.push_back(create_tl_object<ton_api::adnl_stats_ipPackets>(
ip.is_valid() ? PSTRING() << ip.get_ip_str() << ":" << ip.get_port() : "", packets));
}
for (const auto &[ip, packets] : dropped_packets) {
obj->dropped_packets_.push_back(create_tl_object<ton_api::adnl_stats_ipPackets>(
ip.is_valid() ? PSTRING() << ip.get_ip_str() << ":" << ip.get_port() : "", packets));
}
return obj;
}


} // namespace adnl

} // namespace ton
19 changes: 19 additions & 0 deletions adnl/adnl-local-id.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class AdnlLocalId : public td::actor::Actor {
void deliver(AdnlNodeIdShort src, td::BufferSlice data);
void deliver_query(AdnlNodeIdShort src, td::BufferSlice data, td::Promise<td::BufferSlice> promise);
void receive(td::IPAddress addr, td::BufferSlice data);
void decrypt_packet_done(td::IPAddress addr);

void subscribe(std::string prefix, std::unique_ptr<AdnlPeerTable::Callback> callback);
void unsubscribe(std::string prefix);
Expand All @@ -77,6 +78,8 @@ class AdnlLocalId : public td::actor::Actor {
void update_packet(AdnlPacket packet, bool update_id, bool sign, td::int32 update_addr_list_if,
td::int32 update_priority_addr_list_if, td::Promise<AdnlPacket> promise);

void get_stats(td::Promise<tl_object_ptr<ton_api::adnl_stats_localId>> promise);

td::uint32 get_mode() {
return mode_;
}
Expand All @@ -101,6 +104,22 @@ class AdnlLocalId : public td::actor::Actor {

td::uint32 mode_;

struct InboundRateLimiter {
RateLimiter rate_limiter = RateLimiter(75, 0.33);
td::uint64 currently_decrypting_packets = 0;
};
std::map<td::IPAddress, InboundRateLimiter> inbound_rate_limiter_;
struct PacketStats {
double ts_start = 0.0, ts_end = 0.0;
std::map<td::IPAddress, td::uint64> decrypted_packets;
std::map<td::IPAddress, td::uint64> dropped_packets;

tl_object_ptr<ton_api::adnl_stats_localIdPackets> tl() const;
} packet_stats_cur_, packet_stats_prev_, packet_stats_total_;
void add_decrypted_packet_stats(td::IPAddress addr);
void add_dropped_packet_stats(td::IPAddress addr);
void prepare_packet_stats();

void publish_address_list();
};

Expand Down
86 changes: 84 additions & 2 deletions adnl/adnl-peer-table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ void AdnlPeerTableImpl::receive_packet(td::IPAddress addr, AdnlCategoryMask cat_
<< " (len=" << (data.size() + 32) << ")";
}

void AdnlPeerTableImpl::receive_decrypted_packet(AdnlNodeIdShort dst, AdnlPacket packet) {
void AdnlPeerTableImpl::receive_decrypted_packet(AdnlNodeIdShort dst, AdnlPacket packet, td::uint64 serialized_size) {
packet.run_basic_checks().ensure();

if (!packet.inited_from_short()) {
Expand Down Expand Up @@ -119,7 +119,7 @@ void AdnlPeerTableImpl::receive_decrypted_packet(AdnlNodeIdShort dst, AdnlPacket
return;
}
td::actor::send_closure(it->second, &AdnlPeer::receive_packet, dst, it2->second.mode, it2->second.local_id.get(),
std::move(packet));
std::move(packet), serialized_size);
}

void AdnlPeerTableImpl::add_peer(AdnlNodeIdShort local_id, AdnlNodeIdFull id, AdnlAddressList addr_list) {
Expand Down Expand Up @@ -385,6 +385,88 @@ void AdnlPeerTableImpl::get_conn_ip_str(AdnlNodeIdShort l_id, AdnlNodeIdShort p_
td::actor::send_closure(it->second, &AdnlPeer::get_conn_ip_str, l_id, std::move(promise));
}

void AdnlPeerTableImpl::get_stats(td::Promise<tl_object_ptr<ton_api::adnl_stats>> promise) {
class Cb : public td::actor::Actor {
public:
explicit Cb(td::Promise<tl_object_ptr<ton_api::adnl_stats>> promise) : promise_(std::move(promise)) {
}

void got_local_id_stats(tl_object_ptr<ton_api::adnl_stats_localId> local_id) {
auto &local_id_stats = local_id_stats_[local_id->short_id_];
if (local_id_stats) {
local_id->peers_ = std::move(local_id_stats->peers_);
}
local_id_stats = std::move(local_id);
dec_pending();
}

void got_peer_stats(std::vector<tl_object_ptr<ton_api::adnl_stats_peerPair>> peer_pairs) {
for (auto &peer_pair : peer_pairs) {
auto &local_id_stats = local_id_stats_[peer_pair->local_id_];
if (local_id_stats == nullptr) {
local_id_stats = create_tl_object<ton_api::adnl_stats_localId>();
local_id_stats->short_id_ = peer_pair->local_id_;
}
local_id_stats->peers_.push_back(std::move(peer_pair));
}
dec_pending();
}

void inc_pending() {
++pending_;
}

void dec_pending() {
CHECK(pending_ > 0);
--pending_;
if (pending_ == 0) {
auto stats = create_tl_object<ton_api::adnl_stats>();
stats->timestamp_ = td::Clocks::system();
for (auto &[id, local_id_stats] : local_id_stats_) {
stats->local_ids_.push_back(std::move(local_id_stats));
}
promise_.set_result(std::move(stats));
stop();
}
}

private:
td::Promise<tl_object_ptr<ton_api::adnl_stats>> promise_;
size_t pending_ = 1;

std::map<td::Bits256, tl_object_ptr<ton_api::adnl_stats_localId>> local_id_stats_;
};
auto callback = td::actor::create_actor<Cb>("adnlstats", std::move(promise)).release();

for (auto &[id, local_id] : local_ids_) {
td::actor::send_closure(callback, &Cb::inc_pending);
td::actor::send_closure(local_id.local_id, &AdnlLocalId::get_stats,
[id = id, callback](td::Result<tl_object_ptr<ton_api::adnl_stats_localId>> R) {
if (R.is_error()) {
VLOG(ADNL_NOTICE)
<< "failed to get stats for local id " << id << " : " << R.move_as_error();
td::actor::send_closure(callback, &Cb::dec_pending);
} else {
td::actor::send_closure(callback, &Cb::got_local_id_stats, R.move_as_ok());
}
});
}
for (auto &[id, peer] : peers_) {
td::actor::send_closure(callback, &Cb::inc_pending);
td::actor::send_closure(
peer, &AdnlPeer::get_stats,
[id = id, callback](td::Result<std::vector<tl_object_ptr<ton_api::adnl_stats_peerPair>>> R) {
if (R.is_error()) {
VLOG(ADNL_NOTICE) << "failed to get stats for peer " << id << " : " << R.move_as_error();
td::actor::send_closure(callback, &Cb::dec_pending);
} else {
td::actor::send_closure(callback, &Cb::got_peer_stats, R.move_as_ok());
}
});
}
td::actor::send_closure(callback, &Cb::dec_pending);
}

} // namespace adnl

} // namespace ton
2 changes: 1 addition & 1 deletion adnl/adnl-peer-table.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class AdnlPeerTable : public Adnl {
virtual void answer_query(AdnlNodeIdShort src, AdnlNodeIdShort dst, AdnlQueryId query_id, td::BufferSlice data) = 0;

virtual void receive_packet(td::IPAddress addr, AdnlCategoryMask cat_mask, td::BufferSlice data) = 0;
virtual void receive_decrypted_packet(AdnlNodeIdShort dst, AdnlPacket packet) = 0;
virtual void receive_decrypted_packet(AdnlNodeIdShort dst, AdnlPacket packet, td::uint64 serialized_size) = 0;
virtual void send_message_in(AdnlNodeIdShort src, AdnlNodeIdShort dst, AdnlMessage message, td::uint32 flags) = 0;

virtual void register_channel(AdnlChannelIdShort id, AdnlNodeIdShort local_id,
Expand Down
4 changes: 3 additions & 1 deletion adnl/adnl-peer-table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class AdnlPeerTableImpl : public AdnlPeerTable {
void add_static_nodes_from_config(AdnlNodesList nodes) override;

void receive_packet(td::IPAddress addr, AdnlCategoryMask cat_mask, td::BufferSlice data) override;
void receive_decrypted_packet(AdnlNodeIdShort dst, AdnlPacket data) override;
void receive_decrypted_packet(AdnlNodeIdShort dst, AdnlPacket data, td::uint64 serialized_size) override;
void send_message_in(AdnlNodeIdShort src, AdnlNodeIdShort dst, AdnlMessage message, td::uint32 flags) override;
void send_message(AdnlNodeIdShort src, AdnlNodeIdShort dst, td::BufferSlice data) override {
send_message_ex(src, dst, std::move(data), 0);
Expand Down Expand Up @@ -108,6 +108,8 @@ class AdnlPeerTableImpl : public AdnlPeerTable {
td::Promise<std::pair<td::actor::ActorOwn<AdnlTunnel>, AdnlAddress>> promise) override;
void get_conn_ip_str(AdnlNodeIdShort l_id, AdnlNodeIdShort p_id, td::Promise<td::string> promise) override;

void get_stats(td::Promise<tl_object_ptr<ton_api::adnl_stats>> promise) override;

struct PrintId {};
PrintId print_id() const {
return PrintId{};
Expand Down
Loading
Loading