Skip to content

Commit

Permalink
kad put (#274)
Browse files Browse the repository at this point in the history
Signed-off-by: turuslan <[email protected]>
  • Loading branch information
turuslan authored Oct 9, 2024
1 parent 883d3a2 commit d85fdc6
Show file tree
Hide file tree
Showing 16 changed files with 89 additions and 54 deletions.
2 changes: 1 addition & 1 deletion example/02-kademlia/rendezvous_chat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ int main(int argc, char *argv[]) {
auto peer_id =
libp2p::peer::PeerId::fromHash(cid.content_address).value();

[[maybe_unused]] auto res = kademlia->findPeer(peer_id, [&](auto) {
[[maybe_unused]] auto res = kademlia->findPeer(peer_id, [&](auto, auto) {
// Say to world about his providing
provide();

Expand Down
3 changes: 2 additions & 1 deletion include/libp2p/protocol/kademlia/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ namespace libp2p::protocol::kademlia {
using Time = std::chrono::milliseconds;
using ValueAndTime = std::pair<Value, Time>;

using FoundPeerInfoHandler = std::function<void(outcome::result<PeerInfo>)>;
using FoundPeerInfoHandler =
std::function<void(outcome::result<PeerInfo>, std::vector<PeerId>)>;
using FoundProvidersHandler =
std::function<void(outcome::result<std::vector<PeerInfo>>)>;
using FoundValueHandler = std::function<void(outcome::result<Value>)>;
Expand Down
3 changes: 3 additions & 0 deletions include/libp2p/protocol/kademlia/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ namespace libp2p::protocol::kademlia {

// https://github.com/libp2p/rust-libp2p/blob/c6cf7fec6913aa590622aeea16709fce6e9c99a5/protocols/kad/src/query/peers/closest.rs#L110-L120
size_t query_initial_peers = K_VALUE;

// https://github.com/libp2p/rust-libp2p/blob/9a45db3f82b760c93099e66ec77a7a772d1f6cd3/protocols/kad/src/query/peers/closest.rs#L336-L346
size_t replication_factor = K_VALUE;
};

} // namespace libp2p::protocol::kademlia
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ namespace libp2p::protocol::kademlia {
ContentId sought_key, FoundProvidersHandler handler) = 0;

virtual std::shared_ptr<FindPeerExecutor> createFindPeerExecutor(
PeerId peer_id, FoundPeerInfoHandler handler) = 0;
HashedKey key, FoundPeerInfoHandler handler) = 0;
};

} // namespace libp2p::protocol::kademlia
6 changes: 3 additions & 3 deletions include/libp2p/protocol/kademlia/impl/find_peer_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace libp2p::protocol::kademlia {
std::shared_ptr<basic::Scheduler> scheduler,
std::shared_ptr<SessionHost> session_host,
const std::shared_ptr<PeerRoutingTable> &peer_routing_table,
PeerId peer_id,
HashedKey target,
FoundPeerInfoHandler handler);

~FindPeerExecutor() override;
Expand Down Expand Up @@ -70,9 +70,9 @@ namespace libp2p::protocol::kademlia {
std::shared_ptr<SessionHost> session_host_;

// Secondary
const PeerId sought_peer_id_;
const NodeId target_;
HashedKey target_;
std::unordered_set<PeerId> nearest_peer_ids_;
std::vector<PeerId> succeeded_peers_;
FoundPeerInfoHandler handler_;

// Auxiliary
Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ namespace libp2p::protocol::kademlia {
ContentId content_id, FoundProvidersHandler handler) override;

std::shared_ptr<FindPeerExecutor> createFindPeerExecutor(
PeerId peer_id, FoundPeerInfoHandler handler) override;
HashedKey key, FoundPeerInfoHandler handler) override;

outcome::result<void> findRandomPeer() override;
void randomWalk();
Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/protocol/kademlia/message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ namespace libp2p::protocol::kademlia {
Message createGetProvidersRequest(const Key &key,
boost::optional<PeerInfo> self_announce);

Message createFindNodeRequest(const PeerId &node,
Message createFindNodeRequest(Key key,
boost::optional<PeerInfo> self_announce);

} // namespace libp2p::protocol::kademlia
Expand Down
14 changes: 14 additions & 0 deletions include/libp2p/protocol/kademlia/node_id.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <climits>
#include <cstring>
#include <memory>
#include <optional>
#include <span>
#include <vector>

Expand Down Expand Up @@ -106,4 +107,17 @@ namespace libp2p::protocol::kademlia {
Hash256 data_;
};

struct HashedKey {
HashedKey(Key key, std::optional<PeerId> peer)
: key{std::move(key)},
hash{NodeId::hash(this->key)},
peer{std::move(peer)} {}
// NOLINTNEXTLINE(google-explicit-constructor)
HashedKey(Key key) : HashedKey{std::move(key), std::nullopt} {}
// NOLINTNEXTLINE(google-explicit-constructor)
HashedKey(const PeerId &peer) : HashedKey{peer.toVector(), peer} {}
Key key;
NodeId hash;
std::optional<PeerId> peer;
};
} // namespace libp2p::protocol::kademlia
4 changes: 2 additions & 2 deletions src/peer/address_repository/inmem_address_repository.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ namespace libp2p::peer {
auto &addresses = *peer_it->second;

auto expires_at = Clock::now() + ttl;
std::for_each(addresses.begin(), addresses.end(), [expires_at](auto &item) {
for (auto &item : addresses) {
item.second = expires_at;
});
}

return outcome::success();
} // namespace libp2p::peer
Expand Down
6 changes: 3 additions & 3 deletions src/protocol/kademlia/impl/add_provider_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ namespace libp2p::protocol::kademlia {
nearest_peer_ids_.insert(std::move_iterator(nearest_peer_ids.begin()),
std::move_iterator(nearest_peer_ids.end()));

std::for_each(nearest_peer_ids_.begin(),
nearest_peer_ids_.end(),
[this](auto &peer_id) { queue_.emplace(peer_id, target_); });
for (auto &peer_id : nearest_peer_ids_) {
queue_.emplace(peer_id, target_);
}

log_.debug("created");
}
Expand Down
30 changes: 17 additions & 13 deletions src/protocol/kademlia/impl/find_peer_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,24 @@ namespace libp2p::protocol::kademlia {
std::shared_ptr<basic::Scheduler> scheduler,
std::shared_ptr<SessionHost> session_host,
const std::shared_ptr<PeerRoutingTable> &peer_routing_table,
PeerId sought_peer_id,
HashedKey target,
FoundPeerInfoHandler handler)
: config_(config),
host_(std::move(host)),
scheduler_(std::move(scheduler)),
session_host_(std::move(session_host)),
sought_peer_id_(std::move(sought_peer_id)),
target_(sought_peer_id_),
target_{std::move(target)},
handler_(std::move(handler)),
log_("KademliaExecutor", "kademlia", "FindPeer", ++instance_number) {
auto nearest_peer_ids = peer_routing_table->getNearestPeers(
target_, config_.query_initial_peers);
target_.hash, config_.query_initial_peers);

nearest_peer_ids_.insert(std::move_iterator(nearest_peer_ids.begin()),
std::move_iterator(nearest_peer_ids.end()));

std::for_each(nearest_peer_ids_.begin(),
nearest_peer_ids_.end(),
[this](auto &peer_id) { queue_.emplace(peer_id, target_); });
for (auto &peer_id : nearest_peer_ids_) {
queue_.emplace(peer_id, target_.hash);
}

log_.debug("created");
}
Expand All @@ -69,7 +68,7 @@ namespace libp2p::protocol::kademlia {
}

Message request =
createFindNodeRequest(sought_peer_id_, std::move(self_announce));
createFindNodeRequest(target_.key, std::move(self_announce));
if (!request.serialize(*serialized_request_)) {
done_ = true;
return Error::MESSAGE_SERIALIZE_ERROR;
Expand Down Expand Up @@ -100,7 +99,7 @@ namespace libp2p::protocol::kademlia {
} else {
log_.debug("done: {}", result.error());
}
handler_(result);
handler_(result, std::move(succeeded_peers_));
}

void FindPeerExecutor::spawn() {
Expand Down Expand Up @@ -220,8 +219,6 @@ namespace libp2p::protocol::kademlia {
return
// Check if message type is appropriate
msg.type == Message::Type::kFindNode;
// Check if response is accorded to request
// && msg.key == sought_peer_id_.toVector()
}

void FindPeerExecutor::onResult(const std::shared_ptr<Session> &session,
Expand Down Expand Up @@ -258,6 +255,8 @@ namespace libp2p::protocol::kademlia {
requests_in_progress_,
queue_.size());

succeeded_peers_.emplace_back(remote_peer_id);

// Append gotten peer to queue
if (msg.closer_peers) {
for (auto &peer : msg.closer_peers.value()) {
Expand All @@ -283,7 +282,7 @@ namespace libp2p::protocol::kademlia {
}

// Found
if (peer.info.id == sought_peer_id_) {
if (peer.info.id == target_.peer) {
done(peer.info);
}

Expand All @@ -299,10 +298,15 @@ namespace libp2p::protocol::kademlia {

// New peer add to queue
if (auto [it, ok] = nearest_peer_ids_.emplace(peer.info.id); ok) {
queue_.emplace(*it, target_);
queue_.emplace(*it, target_.hash);
}
}
}

if (succeeded_peers_.size() >= config_.replication_factor) {
// https://github.com/libp2p/rust-libp2p/blob/9a45db3f82b760c93099e66ec77a7a772d1f6cd3/protocols/kad/src/query/peers/closest.rs#L336-L346
done(Error::VALUE_NOT_FOUND);
}
}

} // namespace libp2p::protocol::kademlia
6 changes: 3 additions & 3 deletions src/protocol/kademlia/impl/find_providers_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ namespace libp2p::protocol::kademlia {
nearest_peer_ids_.insert(std::move_iterator(nearest_peer_ids.begin()),
std::move_iterator(nearest_peer_ids.end()));

std::for_each(nearest_peer_ids_.begin(),
nearest_peer_ids_.end(),
[this](auto &peer_id) { queue_.emplace(peer_id, target_); });
for (auto &peer_id : nearest_peer_ids_) {
queue_.emplace(peer_id, target_);
}

log_.debug("created");
}
Expand Down
6 changes: 3 additions & 3 deletions src/protocol/kademlia/impl/get_value_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ namespace libp2p::protocol::kademlia {
nearest_peer_ids_.insert(std::move_iterator(nearest_peer_ids.begin()),
std::move_iterator(nearest_peer_ids.end()));

std::for_each(nearest_peer_ids_.begin(),
nearest_peer_ids_.end(),
[this](auto &peer_id) { queue_.emplace(peer_id, target_); });
for (auto &peer_id : nearest_peer_ids_) {
queue_.emplace(peer_id, target_);
}

received_records_ = std::make_unique<Table>();
log_.debug("created");
Expand Down
48 changes: 30 additions & 18 deletions src/protocol/kademlia/impl/kademlia_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,23 @@ namespace libp2p::protocol::kademlia {
outcome::result<void> KademliaImpl::putValue(Key key, Value value) {
log_.debug("CALL: PutValue ({})", multi::detail::encodeBase58(key));

if (auto res = storage_->putValue(key, std::move(value));
not res.has_value()) {
return res.as_failure();
}

return outcome::success();
OUTCOME_TRY(storage_->putValue(key, value));

// `FindPeerExecutor` holds itself using `shared_from_this`
return createFindPeerExecutor(
key,
[weak_self{weak_from_this()}, key, value](
const outcome::result<PeerInfo> &,
std::vector<PeerId> succeeded_peers) {
auto self = weak_self.lock();
if (not self) {
return;
}
std::ignore = self->createPutValueExecutor(
key, value, std::move(succeeded_peers))
->start();
})
->start();
}

outcome::result<void> KademliaImpl::getValue(const Key &key,
Expand Down Expand Up @@ -274,7 +285,7 @@ namespace libp2p::protocol::kademlia {
if (not peer_info.addresses.empty()) {
scheduler_->schedule(
[handler = std::move(handler), peer_info = std::move(peer_info)] {
handler(peer_info);
handler(peer_info, {});
});

log_.debug("{} found locally", peer_id.toBase58());
Expand Down Expand Up @@ -330,7 +341,7 @@ namespace libp2p::protocol::kademlia {
return;
}

auto res = putValue(key, value);
auto res = storage_->putValue(key, value);
if (!res) {
log_.warn("incoming PutValue failed: {}", res.error());
return;
Expand Down Expand Up @@ -555,14 +566,15 @@ namespace libp2p::protocol::kademlia {
multi::Multihash::create(multi::HashType::sha256, hash).value())
.value();

FoundPeerInfoHandler handler =
[wp = weak_from_this()](outcome::result<PeerInfo> res) {
if (auto self = wp.lock()) {
if (res.has_value()) {
self->addPeer(res.value(), false);
}
}
};
FoundPeerInfoHandler handler = [wp = weak_from_this()](
outcome::result<PeerInfo> res,
const std::vector<PeerId> &) {
if (auto self = wp.lock()) {
if (res.has_value()) {
self->addPeer(res.value(), false);
}
}
};

return findPeer(peer_id, handler);
}
Expand Down Expand Up @@ -682,13 +694,13 @@ namespace libp2p::protocol::kademlia {
}

std::shared_ptr<FindPeerExecutor> KademliaImpl::createFindPeerExecutor(
PeerId peer_id, FoundPeerInfoHandler handler) {
HashedKey key, FoundPeerInfoHandler handler) {
return std::make_shared<FindPeerExecutor>(config_,
host_,
scheduler_,
shared_from_this(),
peer_routing_table_,
std::move(peer_id),
std::move(key),
std::move(handler));
}

Expand Down
4 changes: 2 additions & 2 deletions src/protocol/kademlia/message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,11 @@ namespace libp2p::protocol::kademlia {
return msg;
}

Message createFindNodeRequest(const PeerId &node,
Message createFindNodeRequest(Key key,
boost::optional<PeerInfo> self_announce) {
Message msg;
msg.type = Message::Type::kFindNode;
msg.key = node.toVector();
msg.key = std::move(key);
if (self_announce) {
msg.selfAnnounce(std::move(self_announce.value()));
}
Expand Down
5 changes: 3 additions & 2 deletions test/libp2p/transport/tcp/tcp_integration_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,9 @@ TEST(TCP, SingleListenerCanAcceptManyClients) {
});

context->run_for(500ms);
std::for_each(
clients.begin(), clients.end(), [](std::thread &t) { t.join(); });
for (auto &t : clients) {
t.join();
}

ASSERT_EQ(counter, kClients) << "not all clients' requests were handled";
}
Expand Down

0 comments on commit d85fdc6

Please sign in to comment.