Skip to content

Commit

Permalink
Merge branch 'dht-overlay-patch' into perfnet
Browse files Browse the repository at this point in the history
  • Loading branch information
SpyCheese committed Jul 26, 2023
2 parents 518921b + 92f8ded commit 0875caf
Show file tree
Hide file tree
Showing 16 changed files with 162 additions and 89 deletions.
5 changes: 1 addition & 4 deletions dht/dht-in.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,7 @@ class DhtMemberImpl : public DhtMember {
}
}

void add_full_node(DhtKeyId id, DhtNode node) override {
add_full_node_impl(id, std::move(node));
}
void add_full_node_impl(DhtKeyId id, DhtNode node, bool set_active = false);
void add_full_node(DhtKeyId id, DhtNode node, bool set_active) override;

adnl::AdnlNodeIdShort get_id() const override {
return id_;
Expand Down
87 changes: 49 additions & 38 deletions dht/dht-query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,33 @@ namespace ton {
namespace dht {

void DhtQuery::send_queries() {
while (pending_queries_.size() > k_ * 2) {
pending_queries_.erase(--pending_queries_.end());
}
VLOG(DHT_EXTRA_DEBUG) << this << ": sending new queries. active=" << active_queries_ << " max_active=" << a_;
while (pending_ids_.size() > 0 && active_queries_ < a_) {
while (pending_queries_.size() > 0 && active_queries_ < a_) {
auto id_xor = *pending_queries_.begin();
if (result_list_.size() == k_ && *result_list_.rbegin() < id_xor) {
break;
}
active_queries_++;
auto id_xor = *pending_ids_.begin();
auto id = id_xor ^ key_;
VLOG(DHT_EXTRA_DEBUG) << this << ": sending " << get_name() << " query to " << id;
pending_ids_.erase(id_xor);
pending_queries_.erase(id_xor);

auto it = list_.find(id_xor);
CHECK(it != list_.end());
td::actor::send_closure(adnl_, &adnl::Adnl::add_peer, get_src(), it->second.adnl_id(), it->second.addr_list());
auto it = nodes_.find(id_xor);
CHECK(it != nodes_.end());
td::actor::send_closure(adnl_, &adnl::Adnl::add_peer, get_src(), it->second.node.adnl_id(),
it->second.node.addr_list());
send_one_query(id.to_adnl());
}
if (active_queries_ == 0) {
CHECK(pending_ids_.size() == 0);
pending_queries_.clear();
DhtNodesList list;
for (auto &node : list_) {
list.push_back(std::move(node.second));
for (auto id_xor : result_list_) {
auto it = nodes_.find(id_xor);
CHECK(it != nodes_.end());
list.push_back(it->second.node.clone());
}
CHECK(list.size() <= k_);
VLOG(DHT_EXTRA_DEBUG) << this << ": finalizing " << get_name() << " query. List size=" << list.size();
Expand All @@ -65,30 +74,32 @@ void DhtQuery::add_nodes(DhtNodesList list) {
for (auto &node : list.list()) {
auto id = node.get_key();
auto id_xor = key_ ^ id;
if (list_.find(id_xor) != list_.end()) {
if (nodes_.find(id_xor) != nodes_.end()) {
continue;
}
td::actor::send_closure(node_, &DhtMember::add_full_node, id, node.clone());
VLOG(DHT_EXTRA_DEBUG) << this << ": " << get_name() << " query: adding " << id << " key";
td::actor::send_closure(node_, &DhtMember::add_full_node, id, node.clone(), false);
nodes_[id_xor].node = std::move(node);
pending_queries_.insert(id_xor);
}
}

DhtKeyId last_id_xor;
if (list_.size() > 0) {
last_id_xor = list_.rbegin()->first;
void DhtQuery::finish_query(adnl::AdnlNodeIdShort id, bool success) {
active_queries_--;
CHECK(active_queries_ <= k_);
auto id_xor = key_ ^ DhtKeyId(id);
if (success) {
result_list_.insert(id_xor);
if (result_list_.size() > k_) {
result_list_.erase(--result_list_.end());
}

if (list_.size() < k_ || id_xor < last_id_xor) {
list_[id_xor] = std::move(node);
pending_ids_.insert(id_xor);
if (list_.size() > k_) {
CHECK(id_xor != last_id_xor);
VLOG(DHT_EXTRA_DEBUG) << this << ": " << get_name() << " query: replacing " << (last_id_xor ^ key_)
<< " key with " << id;
pending_ids_.erase(last_id_xor);
list_.erase(last_id_xor);
} else {
VLOG(DHT_EXTRA_DEBUG) << this << ": " << get_name() << " query: adding " << id << " key";
}
} else {
NodeInfo &info = nodes_[id_xor];
if (++info.failed_attempts < MAX_ATTEMPTS) {
pending_queries_.insert(id_xor);
}
}
send_queries();
}

void DhtQueryFindNodes::send_one_query(adnl::AdnlNodeIdShort id) {
Expand All @@ -111,7 +122,7 @@ void DhtQueryFindNodes::send_one_query(adnl::AdnlNodeIdShort id) {
void DhtQueryFindNodes::on_result(td::Result<td::BufferSlice> R, adnl::AdnlNodeIdShort dst) {
if (R.is_error()) {
VLOG(DHT_INFO) << this << ": failed find nodes query " << get_src() << "->" << dst << ": " << R.move_as_error();
finish_query();
finish_query(dst, false);
return;
}

Expand All @@ -122,7 +133,7 @@ void DhtQueryFindNodes::on_result(td::Result<td::BufferSlice> R, adnl::AdnlNodeI
} else {
add_nodes(DhtNodesList{Res.move_as_ok(), our_network_id()});
}
finish_query();
finish_query(dst);
}

void DhtQueryFindNodes::finish(DhtNodesList list) {
Expand Down Expand Up @@ -166,14 +177,14 @@ void DhtQueryFindValue::send_one_query_nodes(adnl::AdnlNodeIdShort id) {
void DhtQueryFindValue::on_result(td::Result<td::BufferSlice> R, adnl::AdnlNodeIdShort dst) {
if (R.is_error()) {
VLOG(DHT_INFO) << this << ": failed find value query " << get_src() << "->" << dst << ": " << R.move_as_error();
finish_query();
finish_query(dst, false);
return;
}
auto Res = fetch_tl_object<ton_api::dht_ValueResult>(R.move_as_ok(), true);
if (Res.is_error()) {
VLOG(DHT_WARNING) << this << ": dropping incorrect answer on dht.findValue query from " << dst << ": "
<< Res.move_as_error();
finish_query();
finish_query(dst, false);
return;
}

Expand Down Expand Up @@ -210,26 +221,26 @@ void DhtQueryFindValue::on_result(td::Result<td::BufferSlice> R, adnl::AdnlNodeI
} else if (send_get_nodes) {
send_one_query_nodes(dst);
} else {
finish_query();
finish_query(dst);
}
}

void DhtQueryFindValue::on_result_nodes(td::Result<td::BufferSlice> R, adnl::AdnlNodeIdShort dst) {
if (R.is_error()) {
VLOG(DHT_INFO) << this << ": failed find nodes query " << get_src() << "->" << dst << ": " << R.move_as_error();
finish_query();
finish_query(dst, false);
return;
}
auto Res = fetch_tl_object<ton_api::dht_nodes>(R.move_as_ok(), true);
if (Res.is_error()) {
VLOG(DHT_WARNING) << this << ": dropping incorrect answer on dht.findNodes query from " << dst << ": "
<< Res.move_as_error();
finish_query();
finish_query(dst, false);
return;
}
auto r = Res.move_as_ok();
add_nodes(DhtNodesList{create_tl_object<ton_api::dht_nodes>(std::move(r->nodes_)), our_network_id()});
finish_query();
finish_query(dst);
}

void DhtQueryFindValue::finish(DhtNodesList list) {
Expand Down Expand Up @@ -422,14 +433,14 @@ void DhtQueryRequestReversePing::send_one_query(adnl::AdnlNodeIdShort id) {
void DhtQueryRequestReversePing::on_result(td::Result<td::BufferSlice> R, adnl::AdnlNodeIdShort dst) {
if (R.is_error()) {
VLOG(DHT_INFO) << this << ": failed reverse ping query " << get_src() << "->" << dst << ": " << R.move_as_error();
finish_query();
finish_query(dst, false);
return;
}
auto Res = fetch_tl_object<ton_api::dht_ReversePingResult>(R.move_as_ok(), true);
if (Res.is_error()) {
VLOG(DHT_WARNING) << this << ": dropping incorrect answer on dht.requestReversePing query from " << dst << ": "
<< Res.move_as_error();
finish_query();
finish_query(dst, false);
return;
}

Expand All @@ -441,7 +452,7 @@ void DhtQueryRequestReversePing::on_result(td::Result<td::BufferSlice> R, adnl::
},
[&](ton_api::dht_clientNotFound &v) {
add_nodes(DhtNodesList{std::move(v.nodes_), our_network_id()});
finish_query();
finish_query(dst);
}));
}

Expand Down
16 changes: 9 additions & 7 deletions dht/dht-query.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,7 @@ class DhtQuery : public td::actor::Actor {
}
void send_queries();
void add_nodes(DhtNodesList list);
void finish_query() {
active_queries_--;
CHECK(active_queries_ <= k_);
send_queries();
}
void finish_query(adnl::AdnlNodeIdShort id, bool success = true);
DhtKeyId get_key() const {
return key_;
}
Expand All @@ -88,16 +84,22 @@ class DhtQuery : public td::actor::Actor {
virtual std::string get_name() const = 0;

private:
struct NodeInfo {
DhtNode node;
int failed_attempts = 0;
};
DhtMember::PrintId print_id_;
adnl::AdnlNodeIdShort src_;
std::map<DhtKeyId, DhtNode> list_;
std::set<DhtKeyId> pending_ids_;
std::map<DhtKeyId, NodeInfo> nodes_;
std::set<DhtKeyId> result_list_, pending_queries_;
td::uint32 k_;
td::uint32 a_;
td::int32 our_network_id_;
td::actor::ActorId<DhtMember> node_;
td::uint32 active_queries_ = 0;

static const int MAX_ATTEMPTS = 1;

protected:
td::actor::ActorId<adnl::Adnl> adnl_;
};
Expand Down
22 changes: 11 additions & 11 deletions dht/dht.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ td::Result<td::actor::ActorOwn<Dht>> Dht::create(adnl::AdnlNodeIdShort id, std::

for (auto &node : nodes.list()) {
auto key = node.get_key();
td::actor::send_closure(D, &DhtMember::add_full_node, key, node.clone());
td::actor::send_closure(D, &DhtMember::add_full_node, key, node.clone(), true);
}
return std::move(D);
}
Expand All @@ -74,7 +74,7 @@ td::Result<td::actor::ActorOwn<Dht>> Dht::create_client(adnl::AdnlNodeIdShort id

for (auto &node : nodes.list()) {
auto key = node.get_key();
td::actor::send_closure(D, &DhtMember::add_full_node, key, node.clone());
td::actor::send_closure(D, &DhtMember::add_full_node, key, node.clone(), true);
}
return std::move(D);
}
Expand Down Expand Up @@ -368,7 +368,7 @@ void DhtMemberImpl::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice dat
auto node = N.move_as_ok();
if (node.adnl_id().compute_short_id() == src) {
auto key = node.get_key();
add_full_node_impl(key, std::move(node), true);
add_full_node(key, std::move(node), true);
} else {
VLOG(DHT_WARNING) << this << ": dropping bad node: unexpected adnl id";
}
Expand Down Expand Up @@ -398,7 +398,7 @@ void DhtMemberImpl::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice dat
ton_api::downcast_call(*Q, [&](auto &object) { this->process_query(src, object, std::move(promise)); });
}

void DhtMemberImpl::add_full_node_impl(DhtKeyId key, DhtNode node, bool set_active) {
void DhtMemberImpl::add_full_node(DhtKeyId key, DhtNode node, bool set_active) {
VLOG(DHT_EXTRA_DEBUG) << this << ": adding full node " << key;

auto eid = key ^ key_;
Expand Down Expand Up @@ -466,7 +466,7 @@ void DhtMemberImpl::set_value(DhtValue value, td::Promise<td::Unit> promise) {

void DhtMemberImpl::get_value_in(DhtKeyId key, td::Promise<DhtValue> result) {
auto P = td::PromiseCreator::lambda([key, promise = std::move(result), SelfId = actor_id(this), print_id = print_id(),
adnl = adnl_, list = get_nearest_nodes(key, k_), k = k_, a = a_,
adnl = adnl_, list = get_nearest_nodes(key, k_ * 2), k = k_, a = a_,
network_id = network_id_, id = id_,
client_only = client_only_](td::Result<DhtNode> R) mutable {
R.ensure();
Expand All @@ -485,7 +485,7 @@ void DhtMemberImpl::register_reverse_connection(adnl::AdnlNodeIdFull client, td:
auto key_id = get_reverse_connection_key(client_short).compute_key_id();
td::actor::send_closure(keyring_, &keyring::Keyring::sign_message, client_short.pubkey_hash(),
register_reverse_connection_to_sign(client_short, id_, ttl),
[=, print_id = print_id(), list = get_nearest_nodes(key_id, k_), SelfId = actor_id(this),
[=, print_id = print_id(), list = get_nearest_nodes(key_id, k_ * 2), SelfId = actor_id(this),
promise = std::move(promise)](td::Result<td::BufferSlice> R) mutable {
TRY_RESULT_PROMISE_PREFIX(promise, signature, std::move(R), "Failed to sign: ");
td::actor::send_closure(SelfId, &DhtMemberImpl::get_self_node,
Expand Down Expand Up @@ -532,7 +532,7 @@ void DhtMemberImpl::request_reverse_ping_cont(adnl::AdnlNode target, td::BufferS
}
auto key_id = get_reverse_connection_key(client).compute_key_id();
get_self_node([=, target = std::move(target), signature = std::move(signature), promise = std::move(promise),
SelfId = actor_id(this), print_id = print_id(), list = get_nearest_nodes(key_id, k_),
SelfId = actor_id(this), print_id = print_id(), list = get_nearest_nodes(key_id, k_ * 2),
client_only = client_only_](td::Result<DhtNode> R) mutable {
R.ensure();
td::actor::create_actor<DhtQueryRequestReversePing>(
Expand Down Expand Up @@ -651,8 +651,8 @@ void DhtMemberImpl::check() {

DhtKeyId key{x};
auto P = td::PromiseCreator::lambda([key, promise = std::move(promise), SelfId = actor_id(this),
print_id = print_id(), adnl = adnl_, list = get_nearest_nodes(key, k_), k = k_,
a = a_, network_id = network_id_, id = id_,
print_id = print_id(), adnl = adnl_, list = get_nearest_nodes(key, k_ * 2),
k = k_, a = a_, network_id = network_id_, id = id_,
client_only = client_only_](td::Result<DhtNode> R) mutable {
R.ensure();
td::actor::create_actor<DhtQueryFindNodes>("FindNodesQuery", key, print_id, id, std::move(list), k, a, network_id,
Expand All @@ -677,8 +677,8 @@ void DhtMemberImpl::send_store(DhtValue value, td::Promise<td::Unit> promise) {
auto key_id = value.key_id();

auto P = td::PromiseCreator::lambda([value = std::move(value), print_id = print_id(), id = id_,
client_only = client_only_, list = get_nearest_nodes(key_id, k_), k = k_, a = a_,
network_id = network_id_, SelfId = actor_id(this), adnl = adnl_,
client_only = client_only_, list = get_nearest_nodes(key_id, k_ * 2), k = k_,
a = a_, network_id = network_id_, SelfId = actor_id(this), adnl = adnl_,
promise = std::move(promise)](td::Result<DhtNode> R) mutable {
R.ensure();
td::actor::create_actor<DhtQueryStore>("StoreQuery", std::move(value), print_id, id, std::move(list), k, a,
Expand Down
2 changes: 1 addition & 1 deletion dht/dht.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class DhtMember : public Dht {

//virtual void update_addr_list(tl_object_ptr<ton_api::adnl_addressList> addr_list) = 0;
//virtual void add_node(adnl::AdnlNodeIdShort id) = 0;
virtual void add_full_node(DhtKeyId id, DhtNode node) = 0;
virtual void add_full_node(DhtKeyId id, DhtNode node, bool set_active) = 0;

virtual void receive_ping(DhtKeyId id, DhtNode result) = 0;

Expand Down
6 changes: 3 additions & 3 deletions overlay/overlay-manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,17 +92,17 @@ void OverlayManager::create_public_overlay(adnl::AdnlNodeIdShort local_id, Overl
std::unique_ptr<Callback> callback, OverlayPrivacyRules rules,
td::string scope) {
create_public_overlay_ex(local_id, std::move(overlay_id), std::move(callback), std::move(rules), std::move(scope),
true);
{});
}

void OverlayManager::create_public_overlay_ex(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id,
std::unique_ptr<Callback> callback, OverlayPrivacyRules rules,
td::string scope, bool announce_self) {
td::string scope, OverlayOptions opts) {
CHECK(!dht_node_.empty());
auto id = overlay_id.compute_short_id();
register_overlay(local_id, id,
Overlay::create(keyring_, adnl_, actor_id(this), dht_node_, local_id, std::move(overlay_id),
std::move(callback), std::move(rules), scope, announce_self));
std::move(callback), std::move(rules), scope, std::move(opts)));
}

void OverlayManager::create_private_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id,
Expand Down
2 changes: 1 addition & 1 deletion overlay/overlay-manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class OverlayManager : public Overlays {
std::unique_ptr<Callback> callback, OverlayPrivacyRules rules, td::string scope) override;
void create_public_overlay_ex(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id,
std::unique_ptr<Callback> callback, OverlayPrivacyRules rules, td::string scope,
bool announce_self) override;
OverlayOptions opts) override;
void create_private_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id,
std::vector<adnl::AdnlNodeIdShort> nodes, std::unique_ptr<Callback> callback,
OverlayPrivacyRules rules) override;
Expand Down
Loading

0 comments on commit 0875caf

Please sign in to comment.