From 545dddc950e206333caa209ba073e0f756f0514b Mon Sep 17 00:00:00 2001 From: Jakob Otto Date: Fri, 12 Jun 2020 14:59:08 +0200 Subject: [PATCH 1/8] Add datagram adaptor --- libcaf_net/caf/net/datagram_adaptor.hpp | 51 +++++++++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 libcaf_net/caf/net/datagram_adaptor.hpp diff --git a/libcaf_net/caf/net/datagram_adaptor.hpp b/libcaf_net/caf/net/datagram_adaptor.hpp new file mode 100644 index 00000000..3aff63fe --- /dev/null +++ b/libcaf_net/caf/net/datagram_adaptor.hpp @@ -0,0 +1,51 @@ +/****************************************************************************** + * ____ _ _____ * + * / ___| / \ | ___| C++ * + * | | / _ \ | |_ Actor * + * | |___ / ___ \| _| Framework * + * \____/_/ \_|_| * + * * + * Copyright 2011-2020 Dominik Charousset * + * * + * Distributed under the terms and conditions of the BSD 3-Clause License or * + * (at your option) under the terms and conditions of the Boost Software * + * License 1.0. See accompanying files LICENSE and LICENSE_ALTERNATIVE. * + * * + * If you did not receive a copy of the license files, see * + * http://opensource.org/licenses/BSD-3-Clause and * + * http://www.boost.org/LICENSE_1_0.txt. * + ******************************************************************************/ + +#pragma once + +#include "caf/byte_buffer.hpp" +#include "caf/net/basp/header.hpp" + +namespace caf::net { + +/// Implements an adaption layer for datagram oriented transport protocols. +template +class datagram_adaptor { +public: + datagram_adaptor(Application application) + : application_(std::move(application)), missing_(0), passed_(0) { + // nop + } + + template + error handle_data(Parent&, span received) { + auto writer = make_packet_writer_decorator(*this, parent); + if (auto err = application_.handle_data( + writer_, make_span(received.data(), basp::header_size))) + return err; + if (auto err = application_.handle_data( + writer_, make_span(received.data() + basp::header_size, + received.size() - basp::header_size)) + return err; + } + +private: + Application application_; +}; + +} // namespace caf::net From c2602506f7522613d2ec26b18d3ffe95adccc78a Mon Sep 17 00:00:00 2001 From: Jakob Otto Date: Fri, 12 Jun 2020 17:31:30 +0200 Subject: [PATCH 2/8] Add udp-backend --- libcaf_net/CMakeLists.txt | 2 + libcaf_net/caf/net/backend/udp.hpp | 92 ++++++++++++ libcaf_net/caf/net/datagram_transport.hpp | 7 + libcaf_net/caf/net/defaults.hpp | 3 + libcaf_net/caf/net/doorman.hpp | 4 + libcaf_net/caf/net/endpoint_manager.hpp | 2 + libcaf_net/caf/net/endpoint_manager_impl.hpp | 4 + libcaf_net/caf/net/transport_base.hpp | 4 + .../caf/net/transport_worker_dispatcher.hpp | 42 ++++-- libcaf_net/src/defaults.cpp | 2 + libcaf_net/src/net/backend/udp.cpp | 102 +++++++++++++ libcaf_net/test/endpoint_manager.cpp | 4 + libcaf_net/test/net/backend/udp.cpp | 139 ++++++++++++++++++ 13 files changed, 397 insertions(+), 10 deletions(-) create mode 100644 libcaf_net/caf/net/backend/udp.hpp create mode 100644 libcaf_net/src/net/backend/udp.cpp create mode 100644 libcaf_net/test/net/backend/udp.cpp diff --git a/libcaf_net/CMakeLists.txt b/libcaf_net/CMakeLists.txt index e4f78b24..1f6a3982 100644 --- a/libcaf_net/CMakeLists.txt +++ b/libcaf_net/CMakeLists.txt @@ -52,6 +52,7 @@ add_library(libcaf_net_obj OBJECT ${CAF_NET_HEADERS} src/multiplexer.cpp src/net/backend/test.cpp src/net/backend/tcp.cpp + src/net/backend/udp.cpp src/net/endpoint_manager_queue.cpp src/net/middleman.cpp src/net/middleman_backend.cpp @@ -149,4 +150,5 @@ caf_incubator_add_test_suites(caf-net-test udp_datagram_socket network_socket net.backend.tcp + net.backend.udp ) diff --git a/libcaf_net/caf/net/backend/udp.hpp b/libcaf_net/caf/net/backend/udp.hpp new file mode 100644 index 00000000..52e7f46e --- /dev/null +++ b/libcaf_net/caf/net/backend/udp.hpp @@ -0,0 +1,92 @@ +/****************************************************************************** + * ____ _ _____ * + * / ___| / \ | ___| C++ * + * | | / _ \ | |_ Actor * + * | |___ / ___ \| _| Framework * + * \____/_/ \_|_| * + * * + * Copyright 2011-2020 Dominik Charousset * + * * + * Distributed under the terms and conditions of the BSD 3-Clause License or * + * (at your option) under the terms and conditions of the Boost Software * + * License 1.0. See accompanying files LICENSE and LICENSE_ALTERNATIVE. * + * * + * If you did not receive a copy of the license files, see * + * http://opensource.org/licenses/BSD-3-Clause and * + * http://www.boost.org/LICENSE_1_0.txt. * + ******************************************************************************/ + +#pragma once + +#include + +#include "caf/detail/net_export.hpp" +#include "caf/error.hpp" +#include "caf/expected.hpp" +#include "caf/net/basp/application.hpp" +#include "caf/net/fwd.hpp" +#include "caf/net/make_endpoint_manager.hpp" +#include "caf/net/middleman.hpp" +#include "caf/net/middleman_backend.hpp" +#include "caf/net/multiplexer.hpp" +#include "caf/net/udp_datagram_socket.hpp" +#include "caf/node_id.hpp" + +namespace caf::net::backend { + +/// Minimal backend for udp communication. +/// @warning this backend is *not* thread safe. +class CAF_NET_EXPORT udp : public middleman_backend { +public: + // -- constructors, destructors, and assignment operators -------------------- + + udp(middleman& mm); + + ~udp() override; + + // -- interface functions ---------------------------------------------------- + + error init() override; + + void stop() override; + + expected connect(const uri& locator) override; + + endpoint_manager_ptr peer(const node_id&) override; + + void resolve(const uri& locator, const actor& listener) override; + + strong_actor_ptr make_proxy(node_id nid, actor_id aid) override; + + void set_last_hop(node_id*) override; + + // -- properties ------------------------------------------------------------- + + udp_datagram_socket socket(const node_id&) { + return socket_cast(ep_manager_->handle()); + } + + uint16_t port() const noexcept override { + return listening_port_; + } + + template + expected emplace(const uri& locator) { + if (auto err = ep_manager_->emplace(locator)) + return err; + return ep_manager_; + } + +private: + middleman& mm_; + + endpoint_manager_ptr ep_manager_; + + std::vector node_ids_; + + proxy_registry proxies_; + + uint16_t listening_port_; +}; + +} // namespace caf::net::backend diff --git a/libcaf_net/caf/net/datagram_transport.hpp b/libcaf_net/caf/net/datagram_transport.hpp index babd7331..4c6739f5 100644 --- a/libcaf_net/caf/net/datagram_transport.hpp +++ b/libcaf_net/caf/net/datagram_transport.hpp @@ -163,6 +163,13 @@ class datagram_transport : public datagram_transport_base { } }; + error emplace(const uri& locator) override { + if (auto ret = this->next_layer_.emplace(*this, locator)) + return none; + else + return ret.error(); + } + private: // -- utility functions ------------------------------------------------------ diff --git a/libcaf_net/caf/net/defaults.hpp b/libcaf_net/caf/net/defaults.hpp index 15a54457..4b1e9abe 100644 --- a/libcaf_net/caf/net/defaults.hpp +++ b/libcaf_net/caf/net/defaults.hpp @@ -36,4 +36,7 @@ CAF_NET_EXPORT extern const size_t max_header_buffers; /// Port to listen on for tcp. CAF_NET_EXPORT extern const uint16_t tcp_port; +/// Port to listen on for udp. +CAF_NET_EXPORT extern const uint16_t udp_port; + } // namespace caf::defaults::middleman diff --git a/libcaf_net/caf/net/doorman.hpp b/libcaf_net/caf/net/doorman.hpp index e7b93ed2..f98fd642 100644 --- a/libcaf_net/caf/net/doorman.hpp +++ b/libcaf_net/caf/net/doorman.hpp @@ -121,6 +121,10 @@ class doorman { CAF_LOG_ERROR("doorman encounterd error: " << err); } + error emplace(const uri& locator) { + return make_error(sec::runtime_error, "function not implemented"); + } + private: net::tcp_accept_socket acceptor_; diff --git a/libcaf_net/caf/net/endpoint_manager.hpp b/libcaf_net/caf/net/endpoint_manager.hpp index 14bf249b..c44f522b 100644 --- a/libcaf_net/caf/net/endpoint_manager.hpp +++ b/libcaf_net/caf/net/endpoint_manager.hpp @@ -77,6 +77,8 @@ class CAF_NET_EXPORT endpoint_manager : public socket_manager { /// Initializes the manager before adding it to the multiplexer's event loop. virtual error init() = 0; + virtual error emplace(const uri& locator) = 0; + protected: bool enqueue(endpoint_manager_queue::element* ptr); diff --git a/libcaf_net/caf/net/endpoint_manager_impl.hpp b/libcaf_net/caf/net/endpoint_manager_impl.hpp index c6523387..66548333 100644 --- a/libcaf_net/caf/net/endpoint_manager_impl.hpp +++ b/libcaf_net/caf/net/endpoint_manager_impl.hpp @@ -119,6 +119,10 @@ class endpoint_manager_impl : public endpoint_manager { transport_.handle_error(code); } + error emplace(const uri& locator) override { + return transport_.emplace(locator); + } + private: transport_type transport_; diff --git a/libcaf_net/caf/net/transport_base.hpp b/libcaf_net/caf/net/transport_base.hpp index 462ad9dd..a549e832 100644 --- a/libcaf_net/caf/net/transport_base.hpp +++ b/libcaf_net/caf/net/transport_base.hpp @@ -192,6 +192,10 @@ class transport_base { /// @param buffers Pointers to the buffers that make up the packet content. virtual void write_packet(id_type id, span buffers) = 0; + virtual error emplace(const uri&) { + return make_error(sec::runtime_error, "function not implemented"); + } + // -- buffer management ------------------------------------------------------ /// Returns the next cached header buffer or creates a new one if no buffers diff --git a/libcaf_net/caf/net/transport_worker_dispatcher.hpp b/libcaf_net/caf/net/transport_worker_dispatcher.hpp index 6dab4e16..cc04cb93 100644 --- a/libcaf_net/caf/net/transport_worker_dispatcher.hpp +++ b/libcaf_net/caf/net/transport_worker_dispatcher.hpp @@ -20,11 +20,14 @@ #include +#include "caf/ip_endpoint.hpp" #include "caf/logger.hpp" #include "caf/net/endpoint_manager_queue.hpp" #include "caf/net/fwd.hpp" +#include "caf/net/ip.hpp" #include "caf/net/packet_writer_decorator.hpp" #include "caf/net/transport_worker.hpp" +#include "caf/node_id.hpp" #include "caf/sec.hpp" #include "caf/send.hpp" @@ -91,11 +94,16 @@ class transport_worker_dispatcher { template void resolve(Parent& parent, const uri& locator, const actor& listener) { - if (auto worker = find_worker(make_node_id(locator))) + if (auto worker = find_worker(make_node_id(*locator.authority_only()))) worker->resolve(parent, locator.path(), listener); - else - anon_send(listener, - make_error(sec::runtime_error, "could not resolve node")); + else { + if (auto ret = emplace(parent, locator)) { + auto& worker = *ret; + worker->resolve(parent, locator.path(), listener); + } else { + CAF_LOG_ERROR("emplace failed: " << ret.error()); + } + } } template @@ -132,8 +140,24 @@ class transport_worker_dispatcher { } template - expected add_new_worker(Parent& parent, node_id node, - id_type id) { + expected emplace(Parent& parent, const uri& locator) { + auto& auth = locator.authority(); + ip_address addr; + if (auto hostname = get_if(&auth.host)) { + auto addrs = ip::resolve(*hostname); + if (addrs.empty()) + return sec::cannot_connect_to_node; + addr = addrs.at(0); + } else { + addr = *get_if(&auth.host); + } + return add_new_worker(parent, make_node_id(*locator.authority_only()), + ip_endpoint{addr, auth.port}); + } + + template + expected + add_new_worker(Parent& parent, node_id node, id_type id) { CAF_LOG_TRACE(CAF_ARG(node) << CAF_ARG(id)); auto application = factory_.make(); auto worker = std::make_shared(std::move(application), id); @@ -141,7 +165,7 @@ class transport_worker_dispatcher { return err; workers_by_id_.emplace(std::move(id), worker); workers_by_node_.emplace(std::move(node), worker); - return worker; + return std::move(worker); } private: @@ -156,10 +180,8 @@ class transport_worker_dispatcher { template worker_ptr find_worker_impl(const std::unordered_map& map, const Key& key) { - if (map.count(key) == 0) { - CAF_LOG_DEBUG("could not find worker: " << CAF_ARG(key)); + if (map.count(key) == 0) return nullptr; - } return map.at(key); } diff --git a/libcaf_net/src/defaults.cpp b/libcaf_net/src/defaults.cpp index d4d2f9df..a9ddeb1d 100644 --- a/libcaf_net/src/defaults.cpp +++ b/libcaf_net/src/defaults.cpp @@ -26,4 +26,6 @@ const size_t max_header_buffers = 10; const uint16_t tcp_port = 0; +const uint16_t udp_port = 0; + } // namespace caf::defaults::middleman diff --git a/libcaf_net/src/net/backend/udp.cpp b/libcaf_net/src/net/backend/udp.cpp new file mode 100644 index 00000000..172d5825 --- /dev/null +++ b/libcaf_net/src/net/backend/udp.cpp @@ -0,0 +1,102 @@ +/****************************************************************************** + * ____ _ _____ * + * / ___| / \ | ___| C++ * + * | | / _ \ | |_ Actor * + * | |___ / ___ \| _| Framework * + * \____/_/ \_|_| * + * * + * Copyright 2011-2020 Dominik Charousset * + * * + * Distributed under the terms and conditions of the BSD 3-Clause License or * + * (at your option) under the terms and conditions of the Boost Software * + * License 1.0. See accompanying files LICENSE and LICENSE_ALTERNATIVE. * + * * + * If you did not receive a copy of the license files, see * + * http://opensource.org/licenses/BSD-3-Clause and * + * http://www.boost.org/LICENSE_1_0.txt. * + ******************************************************************************/ + +#include "caf/net/backend/udp.hpp" + +#include + +#include "caf/net/actor_proxy_impl.hpp" +#include "caf/net/basp/application.hpp" +#include "caf/net/basp/application_factory.hpp" +#include "caf/net/basp/ec.hpp" +#include "caf/net/datagram_transport.hpp" +#include "caf/net/ip.hpp" +#include "caf/net/make_endpoint_manager.hpp" +#include "caf/net/middleman.hpp" +#include "caf/net/socket_guard.hpp" +#include "caf/net/stream_transport.hpp" +#include "caf/net/udp_datagram_socket.hpp" +#include "caf/send.hpp" + +namespace caf::net::backend { + +udp::udp(middleman& mm) + : middleman_backend("udp"), mm_(mm), proxies_(mm.system(), *this) { + // nop +} + +udp::~udp() { + // nop +} + +error udp::init() { + uint16_t conf_port = get_or( + mm_.system().config(), "middleman.udp-port", defaults::middleman::udp_port); + ip_endpoint ep; + auto local_address = std::string("[::]:") + std::to_string(conf_port); + if (auto err = detail::parse(local_address, ep)) + return err; + auto sock = make_udp_datagram_socket(ep, true); + if (!sock) + return sock.error(); + auto guard = make_socket_guard(sock->first); + nonblocking(guard.socket(), true); + listening_port_ = sock->second; + CAF_LOG_INFO("udp socket spawned on " << CAF_ARG(listening_port)); + auto& mpx = mm_.mpx(); + ep_manager_ = make_endpoint_manager( + mpx, mm_.system(), + datagram_transport{guard.release(), basp::application_factory{proxies_}}); + if (auto err = ep_manager_->init()) { + CAF_LOG_ERROR("mgr->init() failed: " << err); + return err; + } + return none; +} + +void udp::stop() { + for (const auto& id : node_ids_) + proxies_.erase(id); + ep_manager_.reset(); +} + +expected udp::connect(const uri& locator) { + return make_error(sec::runtime_error, "connect called on udp backend"); +} + +endpoint_manager_ptr udp::peer(const node_id&) { + return ep_manager_; +} + +void udp::resolve(const uri& locator, const actor& listener) { + ep_manager_->resolve(locator, listener); +} + +strong_actor_ptr udp::make_proxy(node_id nid, actor_id aid) { + using impl_type = actor_proxy_impl; + using hdl_type = strong_actor_ptr; + actor_config cfg; + return make_actor(aid, nid, &mm_.system(), cfg, + peer(nid)); +} + +void udp::set_last_hop(node_id*) { + // nop +} + +} // namespace caf::net::backend diff --git a/libcaf_net/test/endpoint_manager.cpp b/libcaf_net/test/endpoint_manager.cpp index 8513b42c..9e42d2f0 100644 --- a/libcaf_net/test/endpoint_manager.cpp +++ b/libcaf_net/test/endpoint_manager.cpp @@ -145,6 +145,10 @@ class dummy_transport { // nop } + error emplace(const uri&) { + return none; + } + private: stream_socket handle_; diff --git a/libcaf_net/test/net/backend/udp.cpp b/libcaf_net/test/net/backend/udp.cpp new file mode 100644 index 00000000..2f49784c --- /dev/null +++ b/libcaf_net/test/net/backend/udp.cpp @@ -0,0 +1,139 @@ +/****************************************************************************** + * ____ _ _____ * + * / ___| / \ | ___| C++ * + * | | / _ \ | |_ Actor * + * | |___ / ___ \| _| Framework * + * \____/_/ \_|_| * + * * + * Copyright 2011-2019 Dominik Charousset * + * * + * Distributed under the terms and conditions of the BSD 3-Clause License or * + * (at your option) under the terms and conditions of the Boost Software * + * License 1.0. See accompanying files LICENSE and LICENSE_ALTERNATIVE. * + * * + * If you did not receive a copy of the license files, see * + * http://opensource.org/licenses/BSD-3-Clause and * + * http://www.boost.org/LICENSE_1_0.txt. * + ******************************************************************************/ + +#define CAF_SUITE net.backend.udp + +#include "caf/net/backend/udp.hpp" + +#include "caf/net/test/host_fixture.hpp" +#include "caf/test/dsl.hpp" + +#include +#include + +#include "caf/actor_system_config.hpp" +#include "caf/ip_endpoint.hpp" +#include "caf/net/middleman.hpp" +#include "caf/net/socket_guard.hpp" +#include "caf/uri.hpp" + +using namespace caf; +using namespace caf::net; +using namespace std::literals::string_literals; + +namespace { + +behavior dummy_actor(event_based_actor*) { + return { + // nop + }; +} + +struct earth_node { + uri operator()() { + return unbox(make_uri("udp://earth")); + } +}; + +struct mars_node { + uri operator()() { + return unbox(make_uri("udp://mars")); + } +}; + +template +struct config : actor_system_config { + config() { + Node this_node; + put(content, "middleman.this-node", this_node()); + load(); + } +}; + +class planet_driver { +public: + virtual ~planet_driver() = default; + + virtual bool handle_io_event() = 0; +}; + +template +class planet : public test_coordinator_fixture> { +public: + planet(planet_driver& driver) + : mm(this->sys.network_manager()), mpx(mm.mpx()), driver_(driver) { + mpx->set_thread_id(); + } + + node_id id() const { + return this->sys.node(); + } + + bool handle_io_event() override { + return driver_.handle_io_event(); + } + + net::middleman& mm; + multiplexer_ptr mpx; + +private: + planet_driver& driver_; +}; + +struct fixture : host_fixture, planet_driver { + fixture() : earth(*this), mars(*this) { + earth.run(); + mars.run(); + CAF_REQUIRE_EQUAL(earth.mpx->num_socket_managers(), 2); + CAF_REQUIRE_EQUAL(mars.mpx->num_socket_managers(), 2); + } + + bool handle_io_event() override { + return earth.mpx->poll_once(false) || mars.mpx->poll_once(false); + } + + void run() { + earth.run(); + } + + planet earth; + planet mars; +}; + +} // namespace + +CAF_TEST_FIXTURE_SCOPE(udp_backend_tests, fixture) + +CAF_TEST(doorman accept) { + // nop +} + +CAF_TEST(publish) { + auto dummy = earth.sys.spawn(dummy_actor); + auto path = "dummy"s; + CAF_MESSAGE("publishing actor " << CAF_ARG(path)); + earth.mm.publish(dummy, path); + CAF_MESSAGE("check registry for " << CAF_ARG(path)); + CAF_CHECK_NOT_EQUAL(earth.sys.registry().get(path), nullptr); +} + +CAF_TEST(resolve) { + // nop +} + +CAF_TEST_FIXTURE_SCOPE_END() From 3f74c77a41d49c9e484a1e5e3f33e82332fe121c Mon Sep 17 00:00:00 2001 From: Jakob Otto Date: Fri, 12 Jun 2020 18:35:29 +0200 Subject: [PATCH 3/8] Fix test, address nodiscard warnings --- libcaf_net/caf/net/doorman.hpp | 2 +- libcaf_net/src/net/backend/udp.cpp | 7 ++++--- libcaf_net/test/datagram_transport.cpp | 4 ++-- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/libcaf_net/caf/net/doorman.hpp b/libcaf_net/caf/net/doorman.hpp index f98fd642..7461b9d9 100644 --- a/libcaf_net/caf/net/doorman.hpp +++ b/libcaf_net/caf/net/doorman.hpp @@ -121,7 +121,7 @@ class doorman { CAF_LOG_ERROR("doorman encounterd error: " << err); } - error emplace(const uri& locator) { + error emplace(const uri&) { return make_error(sec::runtime_error, "function not implemented"); } diff --git a/libcaf_net/src/net/backend/udp.cpp b/libcaf_net/src/net/backend/udp.cpp index 172d5825..74602fb8 100644 --- a/libcaf_net/src/net/backend/udp.cpp +++ b/libcaf_net/src/net/backend/udp.cpp @@ -55,9 +55,10 @@ error udp::init() { if (!sock) return sock.error(); auto guard = make_socket_guard(sock->first); - nonblocking(guard.socket(), true); + if (auto err = nonblocking(guard.socket(), true)) + return err; listening_port_ = sock->second; - CAF_LOG_INFO("udp socket spawned on " << CAF_ARG(listening_port)); + CAF_LOG_INFO("udp socket spawned on " << CAF_ARG(listening_port_)); auto& mpx = mm_.mpx(); ep_manager_ = make_endpoint_manager( mpx, mm_.system(), @@ -75,7 +76,7 @@ void udp::stop() { ep_manager_.reset(); } -expected udp::connect(const uri& locator) { +expected udp::connect(const uri&) { return make_error(sec::runtime_error, "connect called on udp backend"); } diff --git a/libcaf_net/test/datagram_transport.cpp b/libcaf_net/test/datagram_transport.cpp index 4697fec4..020cf260 100644 --- a/libcaf_net/test/datagram_transport.cpp +++ b/libcaf_net/test/datagram_transport.cpp @@ -135,7 +135,7 @@ class dummy_application { template void resolve(Parent& parent, string_view path, const actor& listener) { actor_id aid = 42; - auto uri = unbox(make_uri("test:/id/42")); + auto uri = unbox(make_uri("test://earth/id/42")); auto nid = make_node_id(uri); actor_config cfg; endpoint_manager_ptr ptr{&parent.manager()}; @@ -214,7 +214,7 @@ CAF_TEST(receive) { CAF_TEST(resolve and proxy communication) { using transport_type = datagram_transport; byte_buffer recv_buf(1024); - auto uri = unbox(make_uri("test:/id/42")); + auto uri = unbox(make_uri("test://earth/id/42")); auto mgr = make_endpoint_manager( mpx, sys, transport_type{send_socket, dummy_application_factory{shared_buf}}); From c125c14a44349e1f7d281ec4d48ee33c33041b3d Mon Sep 17 00:00:00 2001 From: Jakob Otto Date: Sat, 13 Jun 2020 16:06:28 +0200 Subject: [PATCH 4/8] Fix dispatcher receive/send --- libcaf_net/caf/net/backend/udp.hpp | 7 +---- libcaf_net/caf/net/datagram_transport.hpp | 4 +++ .../caf/net/transport_worker_dispatcher.hpp | 29 ++++++++++++------- libcaf_net/src/net/backend/udp.cpp | 16 ++++++---- libcaf_net/test/net/backend/udp.cpp | 25 +++++++++++++--- 5 files changed, 55 insertions(+), 26 deletions(-) diff --git a/libcaf_net/caf/net/backend/udp.hpp b/libcaf_net/caf/net/backend/udp.hpp index 52e7f46e..1db203e4 100644 --- a/libcaf_net/caf/net/backend/udp.hpp +++ b/libcaf_net/caf/net/backend/udp.hpp @@ -70,12 +70,7 @@ class CAF_NET_EXPORT udp : public middleman_backend { return listening_port_; } - template - expected emplace(const uri& locator) { - if (auto err = ep_manager_->emplace(locator)) - return err; - return ep_manager_; - } + expected emplace(const uri& locator); private: middleman& mm_; diff --git a/libcaf_net/caf/net/datagram_transport.hpp b/libcaf_net/caf/net/datagram_transport.hpp index 4c6739f5..132aef0e 100644 --- a/libcaf_net/caf/net/datagram_transport.hpp +++ b/libcaf_net/caf/net/datagram_transport.hpp @@ -170,6 +170,10 @@ class datagram_transport : public datagram_transport_base { return ret.error(); } + const transport_worker_dispatcher& next_layer() { + return this->next_layer_; + } + private: // -- utility functions ------------------------------------------------------ diff --git a/libcaf_net/caf/net/transport_worker_dispatcher.hpp b/libcaf_net/caf/net/transport_worker_dispatcher.hpp index cc04cb93..e6a9ba00 100644 --- a/libcaf_net/caf/net/transport_worker_dispatcher.hpp +++ b/libcaf_net/caf/net/transport_worker_dispatcher.hpp @@ -68,8 +68,11 @@ class transport_worker_dispatcher { error handle_data(Parent& parent, span data, id_type id) { if (auto worker = find_worker(id)) return worker->handle_data(parent, data); - // TODO: Where to get node_id from here? - auto worker = add_new_worker(parent, node_id{}, id); + + auto locator = make_uri("udp://" + to_string(id)); + if (!locator) + return locator.error(); + auto worker = add_new_worker(parent, make_node_id(*locator), id); if (worker) return (*worker)->handle_data(parent, data); else @@ -80,16 +83,16 @@ class transport_worker_dispatcher { void write_message(Parent& parent, std::unique_ptr msg) { auto receiver = msg->receiver; - if (!receiver) + if (!receiver) { + CAF_LOG_ERROR("no receiver was specified"); return; + } auto nid = receiver->node(); - if (auto worker = find_worker(nid)) { + auto worker = find_worker(nid); + if (!worker) + CAF_LOG_ERROR("could not find worker for endpoint"); + else worker->write_message(parent, std::move(msg)); - return; - } - // TODO: where to get id_type from here? - if (auto worker = add_new_worker(parent, nid, id_type{})) - (*worker)->write_message(parent, std::move(msg)); } template @@ -101,7 +104,7 @@ class transport_worker_dispatcher { auto& worker = *ret; worker->resolve(parent, locator.path(), listener); } else { - CAF_LOG_ERROR("emplace failed: " << ret.error()); + anon_send(listener, ret.error()); } } } @@ -146,7 +149,7 @@ class transport_worker_dispatcher { if (auto hostname = get_if(&auth.host)) { auto addrs = ip::resolve(*hostname); if (addrs.empty()) - return sec::cannot_connect_to_node; + return sec::remote_lookup_failed; addr = addrs.at(0); } else { addr = *get_if(&auth.host); @@ -168,6 +171,10 @@ class transport_worker_dispatcher { return std::move(worker); } + size_t num_workers() const { + return workers_by_id_.size(); + } + private: worker_ptr find_worker(const node_id& nid) { return find_worker_impl(workers_by_node_, nid); diff --git a/libcaf_net/src/net/backend/udp.cpp b/libcaf_net/src/net/backend/udp.cpp index 74602fb8..c8e4cd0d 100644 --- a/libcaf_net/src/net/backend/udp.cpp +++ b/libcaf_net/src/net/backend/udp.cpp @@ -45,12 +45,12 @@ udp::~udp() { } error udp::init() { - uint16_t conf_port = get_or( - mm_.system().config(), "middleman.udp-port", defaults::middleman::udp_port); - ip_endpoint ep; - auto local_address = std::string("[::]:") + std::to_string(conf_port); - if (auto err = detail::parse(local_address, ep)) + auto conf_port = get_or(mm_.system().config(), "middleman.udp-port", + defaults::middleman::udp_port); + ip_address addr; + if (auto err = parse("0.0.0.0", addr)) return err; + auto ep = ip_endpoint(addr, conf_port); auto sock = make_udp_datagram_socket(ep, true); if (!sock) return sock.error(); @@ -100,4 +100,10 @@ void udp::set_last_hop(node_id*) { // nop } +expected udp::emplace(const uri& locator) { + if (auto err = ep_manager_->emplace(locator)) + return err; + return ep_manager_; +} + } // namespace caf::net::backend diff --git a/libcaf_net/test/net/backend/udp.cpp b/libcaf_net/test/net/backend/udp.cpp index 2f49784c..c1a549f3 100644 --- a/libcaf_net/test/net/backend/udp.cpp +++ b/libcaf_net/test/net/backend/udp.cpp @@ -28,6 +28,7 @@ #include "caf/actor_system_config.hpp" #include "caf/ip_endpoint.hpp" +#include "caf/net/ip.hpp" #include "caf/net/middleman.hpp" #include "caf/net/socket_guard.hpp" #include "caf/uri.hpp" @@ -46,13 +47,21 @@ behavior dummy_actor(event_based_actor*) { struct earth_node { uri operator()() { - return unbox(make_uri("udp://earth")); + return unbox(make_uri("udp://127.0.0.1:12345")); + } + + uint16_t port() { + return 12345; } }; struct mars_node { uri operator()() { - return unbox(make_uri("udp://mars")); + return unbox(make_uri("udp://127.0.0.1:12346")); + } + + uint16_t port() { + return 12346; } }; @@ -60,6 +69,7 @@ template struct config : actor_system_config { config() { Node this_node; + put(content, "middleman.udp-port", this_node.port()); put(content, "middleman.this-node", this_node()); load(); } @@ -88,6 +98,13 @@ class planet : public test_coordinator_fixture> { return driver_.handle_io_event(); } + uint16_t port() { + return mm.port("udp"); + } + + uri locator() { + } + net::middleman& mm; multiplexer_ptr mpx; @@ -119,8 +136,8 @@ struct fixture : host_fixture, planet_driver { CAF_TEST_FIXTURE_SCOPE(udp_backend_tests, fixture) -CAF_TEST(doorman accept) { - // nop +CAF_TEST(worker creation) { + // CAF_CHECK(earth.mm.backend("udp").emplace(make_node_id())); } CAF_TEST(publish) { From 616b4cfa41096a3feb21545371dcf9d92d61d3dc Mon Sep 17 00:00:00 2001 From: Jakob Otto Date: Tue, 16 Jun 2020 17:25:23 +0200 Subject: [PATCH 5/8] Fix datagram_adaptor::handle_data() --- libcaf_net/caf/net/backend/udp.hpp | 3 ++ .../net/basp/datagram_application_factory.hpp | 52 +++++++++++++++++++ libcaf_net/caf/net/datagram_adaptor.hpp | 51 +++++++++++++++--- .../caf/net/transport_worker_dispatcher.hpp | 7 ++- libcaf_net/src/net/backend/udp.cpp | 34 +++++++----- libcaf_net/test/net/backend/udp.cpp | 6 +-- 6 files changed, 125 insertions(+), 28 deletions(-) create mode 100644 libcaf_net/caf/net/basp/datagram_application_factory.hpp diff --git a/libcaf_net/caf/net/backend/udp.hpp b/libcaf_net/caf/net/backend/udp.hpp index 1db203e4..2514a682 100644 --- a/libcaf_net/caf/net/backend/udp.hpp +++ b/libcaf_net/caf/net/backend/udp.hpp @@ -72,6 +72,9 @@ class CAF_NET_EXPORT udp : public middleman_backend { expected emplace(const uri& locator); + expected emplace(udp_datagram_socket sock, + uint16_t port); + private: middleman& mm_; diff --git a/libcaf_net/caf/net/basp/datagram_application_factory.hpp b/libcaf_net/caf/net/basp/datagram_application_factory.hpp new file mode 100644 index 00000000..4695ba32 --- /dev/null +++ b/libcaf_net/caf/net/basp/datagram_application_factory.hpp @@ -0,0 +1,52 @@ +/****************************************************************************** + * ____ _ _____ * + * / ___| / \ | ___| C++ * + * | | / _ \ | |_ Actor * + * | |___ / ___ \| _| Framework * + * \____/_/ \_|_| * + * * + * Copyright 2011-2020 Dominik Charousset * + * * + * Distributed under the terms and conditions of the BSD 3-Clause License or * + * (at your option) under the terms and conditions of the Boost Software * + * License 1.0. See accompanying files LICENSE and LICENSE_ALTERNATIVE. * + * * + * If you did not receive a copy of the license files, see * + * http://opensource.org/licenses/BSD-3-Clause and * + * http://www.boost.org/LICENSE_1_0.txt. * + ******************************************************************************/ + +#pragma once + +#include "caf/detail/net_export.hpp" +#include "caf/error.hpp" +#include "caf/net/basp/application.hpp" +#include "caf/net/datagram_adaptor.hpp" +#include "caf/proxy_registry.hpp" + +namespace caf::net::basp { + +/// Factory for datagram oriented basp::applications. +/// @relates doorman +class CAF_NET_EXPORT datagram_application_factory { +public: + using application_type = datagram_adaptor; + + datagram_application_factory(proxy_registry& proxies) : proxies_(proxies) { + // nop + } + + template + error init(Parent&) { + return none; + } + + application_type make() const { + return application_type{basp::application{proxies_}}; + } + +private: + proxy_registry& proxies_; +}; + +} // namespace caf::net::basp diff --git a/libcaf_net/caf/net/datagram_adaptor.hpp b/libcaf_net/caf/net/datagram_adaptor.hpp index 3aff63fe..c24f0088 100644 --- a/libcaf_net/caf/net/datagram_adaptor.hpp +++ b/libcaf_net/caf/net/datagram_adaptor.hpp @@ -28,20 +28,55 @@ template class datagram_adaptor { public: datagram_adaptor(Application application) - : application_(std::move(application)), missing_(0), passed_(0) { + : application_(std::move(application)) { // nop } template - error handle_data(Parent&, span received) { - auto writer = make_packet_writer_decorator(*this, parent); - if (auto err = application_.handle_data( - writer_, make_span(received.data(), basp::header_size))) - return err; + error init(Parent& parent) { + return application_.init(parent); + } + + template + error write_message(Parent& parent, + std::unique_ptr msg) { + return application_.write_message(parent, std::move(msg)); + } + + template + error handle_data(Parent& parent, span received) { if (auto err = application_.handle_data( - writer_, make_span(received.data() + basp::header_size, - received.size() - basp::header_size)) + parent, make_span(received.data(), basp::header_size))) return err; + auto data = received.data() + basp::header_size; + auto size = received.size() - basp::header_size; + if (size > 0) + return application_.handle_data(parent, make_span(data, size)); + return none; + } + + template + void resolve(Parent& parent, string_view path, const actor& listener) { + application_.resolve(parent, path, listener); + } + + template + void new_proxy(Parent& parent, actor_id id) { + application_.new_proxy(parent, id); + } + + template + void local_actor_down(Parent& parent, actor_id id, error reason) { + application_.local_actor_down(parent, id, std::move(reason)); + } + + template + void timeout(Parent& parent, std::string tag, uint64_t id) { + application_.timeout(parent, std::move(tag), id); + } + + void handle_error(sec error) { + application_.handle_error(error); } private: diff --git a/libcaf_net/caf/net/transport_worker_dispatcher.hpp b/libcaf_net/caf/net/transport_worker_dispatcher.hpp index e6a9ba00..33ca2f84 100644 --- a/libcaf_net/caf/net/transport_worker_dispatcher.hpp +++ b/libcaf_net/caf/net/transport_worker_dispatcher.hpp @@ -66,14 +66,13 @@ class transport_worker_dispatcher { template error handle_data(Parent& parent, span data, id_type id) { - if (auto worker = find_worker(id)) + if (auto worker = find_worker(id)) { return worker->handle_data(parent, data); - + } auto locator = make_uri("udp://" + to_string(id)); if (!locator) return locator.error(); - auto worker = add_new_worker(parent, make_node_id(*locator), id); - if (worker) + if (auto worker = add_new_worker(parent, make_node_id(*locator), id)) return (*worker)->handle_data(parent, data); else return std::move(worker.error()); diff --git a/libcaf_net/src/net/backend/udp.cpp b/libcaf_net/src/net/backend/udp.cpp index c8e4cd0d..aa7c1369 100644 --- a/libcaf_net/src/net/backend/udp.cpp +++ b/libcaf_net/src/net/backend/udp.cpp @@ -22,7 +22,7 @@ #include "caf/net/actor_proxy_impl.hpp" #include "caf/net/basp/application.hpp" -#include "caf/net/basp/application_factory.hpp" +#include "caf/net/basp/datagram_application_factory.hpp" #include "caf/net/basp/ec.hpp" #include "caf/net/datagram_transport.hpp" #include "caf/net/ip.hpp" @@ -55,18 +55,8 @@ error udp::init() { if (!sock) return sock.error(); auto guard = make_socket_guard(sock->first); - if (auto err = nonblocking(guard.socket(), true)) - return err; - listening_port_ = sock->second; - CAF_LOG_INFO("udp socket spawned on " << CAF_ARG(listening_port_)); - auto& mpx = mm_.mpx(); - ep_manager_ = make_endpoint_manager( - mpx, mm_.system(), - datagram_transport{guard.release(), basp::application_factory{proxies_}}); - if (auto err = ep_manager_->init()) { - CAF_LOG_ERROR("mgr->init() failed: " << err); - return err; - } + CAF_LOG_INFO("udp socket spawned on " << CAF_ARG2("port", sock->second)); + emplace(sock->first, sock->second); return none; } @@ -106,4 +96,22 @@ expected udp::emplace(const uri& locator) { return ep_manager_; } +expected udp::emplace(udp_datagram_socket sock, + uint16_t port) { + auto guard = make_socket_guard(sock); + listening_port_ = port; + if (auto err = nonblocking(guard.socket(), true)) + return err; + auto& mpx = mm_.mpx(); + ep_manager_ = make_endpoint_manager( + mpx, mm_.system(), + datagram_transport{guard.release(), + basp::datagram_application_factory{proxies_}}); + if (auto err = ep_manager_->init()) { + CAF_LOG_ERROR("mgr->init() failed: " << err); + return err; + } + return ep_manager_; +} + } // namespace caf::net::backend diff --git a/libcaf_net/test/net/backend/udp.cpp b/libcaf_net/test/net/backend/udp.cpp index c1a549f3..bffd4e5b 100644 --- a/libcaf_net/test/net/backend/udp.cpp +++ b/libcaf_net/test/net/backend/udp.cpp @@ -116,8 +116,8 @@ struct fixture : host_fixture, planet_driver { fixture() : earth(*this), mars(*this) { earth.run(); mars.run(); - CAF_REQUIRE_EQUAL(earth.mpx->num_socket_managers(), 2); - CAF_REQUIRE_EQUAL(mars.mpx->num_socket_managers(), 2); + // CAF_REQUIRE_EQUAL(earth.mpx->num_socket_managers(), 2); + // CAF_REQUIRE_EQUAL(mars.mpx->num_socket_managers(), 2); } bool handle_io_event() override { @@ -137,7 +137,7 @@ struct fixture : host_fixture, planet_driver { CAF_TEST_FIXTURE_SCOPE(udp_backend_tests, fixture) CAF_TEST(worker creation) { - // CAF_CHECK(earth.mm.backend("udp").emplace(make_node_id())); + // CAF_CHECK(earth.mm.backend("udp").emplace(make_node_id())); } CAF_TEST(publish) { From 2a39d52eaa6b350a50938d8f7bc66ceeaa3f3188 Mon Sep 17 00:00:00 2001 From: Jakob Otto Date: Tue, 16 Jun 2020 17:45:30 +0200 Subject: [PATCH 6/8] Cleanup --- libcaf_net/caf/net/backend/udp.hpp | 7 ++++--- .../caf/net/basp/datagram_application_factory.hpp | 2 +- libcaf_net/caf/net/datagram_transport.hpp | 13 ++----------- libcaf_net/caf/net/endpoint_manager.hpp | 2 -- libcaf_net/caf/net/endpoint_manager_impl.hpp | 4 ---- libcaf_net/caf/net/transport_base.hpp | 4 ---- libcaf_net/caf/net/transport_worker_dispatcher.hpp | 13 +++++-------- libcaf_net/src/net/backend/udp.cpp | 9 ++------- libcaf_net/test/endpoint_manager.cpp | 4 ---- 9 files changed, 14 insertions(+), 44 deletions(-) diff --git a/libcaf_net/caf/net/backend/udp.hpp b/libcaf_net/caf/net/backend/udp.hpp index 2514a682..d3c47f77 100644 --- a/libcaf_net/caf/net/backend/udp.hpp +++ b/libcaf_net/caf/net/backend/udp.hpp @@ -19,6 +19,7 @@ #pragma once #include +#include #include "caf/detail/net_export.hpp" #include "caf/error.hpp" @@ -50,7 +51,7 @@ class CAF_NET_EXPORT udp : public middleman_backend { void stop() override; - expected connect(const uri& locator) override; + expected get_or_connect(const uri& locator) override; endpoint_manager_ptr peer(const node_id&) override; @@ -70,8 +71,6 @@ class CAF_NET_EXPORT udp : public middleman_backend { return listening_port_; } - expected emplace(const uri& locator); - expected emplace(udp_datagram_socket sock, uint16_t port); @@ -85,6 +84,8 @@ class CAF_NET_EXPORT udp : public middleman_backend { proxy_registry proxies_; uint16_t listening_port_; + + std::mutex lock_; }; } // namespace caf::net::backend diff --git a/libcaf_net/caf/net/basp/datagram_application_factory.hpp b/libcaf_net/caf/net/basp/datagram_application_factory.hpp index 4695ba32..83f40866 100644 --- a/libcaf_net/caf/net/basp/datagram_application_factory.hpp +++ b/libcaf_net/caf/net/basp/datagram_application_factory.hpp @@ -27,7 +27,7 @@ namespace caf::net::basp { /// Factory for datagram oriented basp::applications. -/// @relates doorman +/// @relates transport_worker_dispatcher class CAF_NET_EXPORT datagram_application_factory { public: using application_type = datagram_adaptor; diff --git a/libcaf_net/caf/net/datagram_transport.hpp b/libcaf_net/caf/net/datagram_transport.hpp index 132aef0e..8cee2df3 100644 --- a/libcaf_net/caf/net/datagram_transport.hpp +++ b/libcaf_net/caf/net/datagram_transport.hpp @@ -61,6 +61,8 @@ class datagram_transport : public datagram_transport_base { using buffer_cache_type = typename super::buffer_cache_type; + using dispatcher_type = transport_worker_dispatcher; + // -- constructors, destructors, and assignment operators -------------------- datagram_transport(udp_datagram_socket handle, factory_type factory) @@ -163,17 +165,6 @@ class datagram_transport : public datagram_transport_base { } }; - error emplace(const uri& locator) override { - if (auto ret = this->next_layer_.emplace(*this, locator)) - return none; - else - return ret.error(); - } - - const transport_worker_dispatcher& next_layer() { - return this->next_layer_; - } - private: // -- utility functions ------------------------------------------------------ diff --git a/libcaf_net/caf/net/endpoint_manager.hpp b/libcaf_net/caf/net/endpoint_manager.hpp index c44f522b..14bf249b 100644 --- a/libcaf_net/caf/net/endpoint_manager.hpp +++ b/libcaf_net/caf/net/endpoint_manager.hpp @@ -77,8 +77,6 @@ class CAF_NET_EXPORT endpoint_manager : public socket_manager { /// Initializes the manager before adding it to the multiplexer's event loop. virtual error init() = 0; - virtual error emplace(const uri& locator) = 0; - protected: bool enqueue(endpoint_manager_queue::element* ptr); diff --git a/libcaf_net/caf/net/endpoint_manager_impl.hpp b/libcaf_net/caf/net/endpoint_manager_impl.hpp index 66548333..c6523387 100644 --- a/libcaf_net/caf/net/endpoint_manager_impl.hpp +++ b/libcaf_net/caf/net/endpoint_manager_impl.hpp @@ -119,10 +119,6 @@ class endpoint_manager_impl : public endpoint_manager { transport_.handle_error(code); } - error emplace(const uri& locator) override { - return transport_.emplace(locator); - } - private: transport_type transport_; diff --git a/libcaf_net/caf/net/transport_base.hpp b/libcaf_net/caf/net/transport_base.hpp index a549e832..462ad9dd 100644 --- a/libcaf_net/caf/net/transport_base.hpp +++ b/libcaf_net/caf/net/transport_base.hpp @@ -192,10 +192,6 @@ class transport_base { /// @param buffers Pointers to the buffers that make up the packet content. virtual void write_packet(id_type id, span buffers) = 0; - virtual error emplace(const uri&) { - return make_error(sec::runtime_error, "function not implemented"); - } - // -- buffer management ------------------------------------------------------ /// Returns the next cached header buffer or creates a new one if no buffers diff --git a/libcaf_net/caf/net/transport_worker_dispatcher.hpp b/libcaf_net/caf/net/transport_worker_dispatcher.hpp index 33ca2f84..a652179f 100644 --- a/libcaf_net/caf/net/transport_worker_dispatcher.hpp +++ b/libcaf_net/caf/net/transport_worker_dispatcher.hpp @@ -66,9 +66,8 @@ class transport_worker_dispatcher { template error handle_data(Parent& parent, span data, id_type id) { - if (auto worker = find_worker(id)) { + if (auto worker = find_worker(id)) return worker->handle_data(parent, data); - } auto locator = make_uri("udp://" + to_string(id)); if (!locator) return locator.error(); @@ -96,9 +95,9 @@ class transport_worker_dispatcher { template void resolve(Parent& parent, const uri& locator, const actor& listener) { - if (auto worker = find_worker(make_node_id(*locator.authority_only()))) + if (auto worker = find_worker(make_node_id(*locator.authority_only()))) { worker->resolve(parent, locator.path(), listener); - else { + } else { if (auto ret = emplace(parent, locator)) { auto& worker = *ret; worker->resolve(parent, locator.path(), listener); @@ -135,10 +134,8 @@ class transport_worker_dispatcher { } void handle_error(sec error) { - for (const auto& p : workers_by_id_) { - auto worker = p.second; - worker->handle_error(error); - } + for (const auto& p : workers_by_id_) + p.second->handle_error(error); } template diff --git a/libcaf_net/src/net/backend/udp.cpp b/libcaf_net/src/net/backend/udp.cpp index aa7c1369..e043eeaa 100644 --- a/libcaf_net/src/net/backend/udp.cpp +++ b/libcaf_net/src/net/backend/udp.cpp @@ -66,7 +66,7 @@ void udp::stop() { ep_manager_.reset(); } -expected udp::connect(const uri&) { +expected udp::get_or_connect(const uri&) { return make_error(sec::runtime_error, "connect called on udp backend"); } @@ -90,12 +90,6 @@ void udp::set_last_hop(node_id*) { // nop } -expected udp::emplace(const uri& locator) { - if (auto err = ep_manager_->emplace(locator)) - return err; - return ep_manager_; -} - expected udp::emplace(udp_datagram_socket sock, uint16_t port) { auto guard = make_socket_guard(sock); @@ -103,6 +97,7 @@ expected udp::emplace(udp_datagram_socket sock, if (auto err = nonblocking(guard.socket(), true)) return err; auto& mpx = mm_.mpx(); + const std::lock_guard lock(lock_); ep_manager_ = make_endpoint_manager( mpx, mm_.system(), datagram_transport{guard.release(), diff --git a/libcaf_net/test/endpoint_manager.cpp b/libcaf_net/test/endpoint_manager.cpp index 9e42d2f0..8513b42c 100644 --- a/libcaf_net/test/endpoint_manager.cpp +++ b/libcaf_net/test/endpoint_manager.cpp @@ -145,10 +145,6 @@ class dummy_transport { // nop } - error emplace(const uri&) { - return none; - } - private: stream_socket handle_; From f9891c1aa6810ef56da2ddc5eb80ffec8a0cbcb4 Mon Sep 17 00:00:00 2001 From: Jakob Otto Date: Tue, 16 Jun 2020 18:46:03 +0200 Subject: [PATCH 7/8] Add resolve test for udp --- .../caf/net/transport_worker_dispatcher.hpp | 2 +- libcaf_net/src/net/backend/udp.cpp | 4 - libcaf_net/test/net/backend/tcp.cpp | 14 +-- libcaf_net/test/net/backend/udp.cpp | 117 +++++++----------- 4 files changed, 45 insertions(+), 92 deletions(-) diff --git a/libcaf_net/caf/net/transport_worker_dispatcher.hpp b/libcaf_net/caf/net/transport_worker_dispatcher.hpp index a652179f..115cc32e 100644 --- a/libcaf_net/caf/net/transport_worker_dispatcher.hpp +++ b/libcaf_net/caf/net/transport_worker_dispatcher.hpp @@ -164,7 +164,7 @@ class transport_worker_dispatcher { return err; workers_by_id_.emplace(std::move(id), worker); workers_by_node_.emplace(std::move(node), worker); - return std::move(worker); + return worker; } size_t num_workers() const { diff --git a/libcaf_net/src/net/backend/udp.cpp b/libcaf_net/src/net/backend/udp.cpp index e043eeaa..8b2be5cb 100644 --- a/libcaf_net/src/net/backend/udp.cpp +++ b/libcaf_net/src/net/backend/udp.cpp @@ -18,14 +18,10 @@ #include "caf/net/backend/udp.hpp" -#include - #include "caf/net/actor_proxy_impl.hpp" #include "caf/net/basp/application.hpp" #include "caf/net/basp/datagram_application_factory.hpp" -#include "caf/net/basp/ec.hpp" #include "caf/net/datagram_transport.hpp" -#include "caf/net/ip.hpp" #include "caf/net/make_endpoint_manager.hpp" #include "caf/net/middleman.hpp" #include "caf/net/socket_guard.hpp" diff --git a/libcaf_net/test/net/backend/tcp.cpp b/libcaf_net/test/net/backend/tcp.cpp index f48ef926..48892690 100644 --- a/libcaf_net/test/net/backend/tcp.cpp +++ b/libcaf_net/test/net/backend/tcp.cpp @@ -155,15 +155,6 @@ CAF_TEST(connect) { CAF_CHECK_EQUAL(earth.mpx->num_socket_managers(), 3); } -CAF_TEST(publish) { - auto dummy = earth.sys.spawn(dummy_actor); - auto path = "dummy"s; - CAF_MESSAGE("publishing actor " << CAF_ARG(path)); - earth.mm.publish(dummy, path); - CAF_MESSAGE("check registry for " << CAF_ARG(path)); - CAF_CHECK_NOT_EQUAL(earth.sys.registry().get(path), nullptr); -} - CAF_TEST(resolve) { using std::chrono::milliseconds; using std::chrono::seconds; @@ -180,9 +171,8 @@ CAF_TEST(resolve) { auto locator = unbox(make_uri("tcp://earth/name/dummy"s)); CAF_MESSAGE("resolve " << CAF_ARG(locator)); mars.mm.resolve(locator, mars.self); - mars.run(); - earth.run(); - mars.run(); + while (handle_io_event()) + ; mars.self->receive( [](strong_actor_ptr& ptr, const std::set&) { CAF_MESSAGE("resolved actor!"); diff --git a/libcaf_net/test/net/backend/udp.cpp b/libcaf_net/test/net/backend/udp.cpp index bffd4e5b..0807c4fd 100644 --- a/libcaf_net/test/net/backend/udp.cpp +++ b/libcaf_net/test/net/backend/udp.cpp @@ -45,112 +45,79 @@ behavior dummy_actor(event_based_actor*) { }; } -struct earth_node { - uri operator()() { - return unbox(make_uri("udp://127.0.0.1:12345")); - } - - uint16_t port() { - return 12345; - } -}; - -struct mars_node { - uri operator()() { - return unbox(make_uri("udp://127.0.0.1:12346")); - } - - uint16_t port() { - return 12346; - } -}; - -template struct config : actor_system_config { config() { - Node this_node; - put(content, "middleman.udp-port", this_node.port()); - put(content, "middleman.this-node", this_node()); + ip_endpoint ep; + CAF_REQUIRE_EQUAL(detail::parse("127.0.0.1:0"s, ep), none); + auto ret = unbox(make_udp_datagram_socket(ep)); + sock = ret.first; + port = ret.second; + this_node_str = "udp://127.0.0.1:"s + std::to_string(port); + auto this_node = unbox(make_uri(this_node_str)); + CAF_MESSAGE("datagram_socket spawned on " << CAF_ARG(this_node) << " " + << CAF_ARG(sock.id)); + put(content, "middleman.this-node", this_node); load(); } -}; -class planet_driver { -public: - virtual ~planet_driver() = default; - - virtual bool handle_io_event() = 0; + udp_datagram_socket sock; + uint16_t port; + std::string this_node_str; }; -template -class planet : public test_coordinator_fixture> { +class planet : public test_coordinator_fixture { public: - planet(planet_driver& driver) - : mm(this->sys.network_manager()), mpx(mm.mpx()), driver_(driver) { + planet() : mm(this->sys.network_manager()), mpx(mm.mpx()) { mpx->set_thread_id(); } - node_id id() const { - return this->sys.node(); - } - - bool handle_io_event() override { - return driver_.handle_io_event(); - } - - uint16_t port() { - return mm.port("udp"); - } - - uri locator() { + std::string locator_str() { + return cfg.this_node_str; } net::middleman& mm; multiplexer_ptr mpx; - -private: - planet_driver& driver_; }; -struct fixture : host_fixture, planet_driver { - fixture() : earth(*this), mars(*this) { - earth.run(); - mars.run(); - // CAF_REQUIRE_EQUAL(earth.mpx->num_socket_managers(), 2); - // CAF_REQUIRE_EQUAL(mars.mpx->num_socket_managers(), 2); - } - - bool handle_io_event() override { - return earth.mpx->poll_once(false) || mars.mpx->poll_once(false); +struct fixture : host_fixture { + fixture() { + dynamic_cast(earth.mm.backend("udp")) + ->emplace(earth.cfg.sock, earth.cfg.port); + dynamic_cast(mars.mm.backend("udp")) + ->emplace(mars.cfg.sock, mars.cfg.port); } - void run() { - earth.run(); + bool handle_io_event() { + return mars.mpx->poll_once(false) || earth.mpx->poll_once(false); } - planet earth; - planet mars; + planet earth; + planet mars; }; } // namespace CAF_TEST_FIXTURE_SCOPE(udp_backend_tests, fixture) -CAF_TEST(worker creation) { - // CAF_CHECK(earth.mm.backend("udp").emplace(make_node_id())); -} - -CAF_TEST(publish) { +CAF_TEST(resolve) { auto dummy = earth.sys.spawn(dummy_actor); auto path = "dummy"s; CAF_MESSAGE("publishing actor " << CAF_ARG(path)); earth.mm.publish(dummy, path); - CAF_MESSAGE("check registry for " << CAF_ARG(path)); - CAF_CHECK_NOT_EQUAL(earth.sys.registry().get(path), nullptr); -} - -CAF_TEST(resolve) { - // nop + auto locator = unbox(make_uri(earth.locator_str() + "/name/" + path)); + mars.mm.resolve(locator, mars.self); + while (handle_io_event()) + ; + mars.self->receive( + [](strong_actor_ptr& ptr, const std::set&) { + CAF_MESSAGE("resolved actor!"); + CAF_CHECK_NOT_EQUAL(ptr, nullptr); + }, + [](const error& err) { + CAF_FAIL("got error while resolving: " << CAF_ARG(err)); + }, + after(std::chrono::seconds(0)) >> + [] { CAF_FAIL("manager did not respond with a proxy."); }); } CAF_TEST_FIXTURE_SCOPE_END() From a0d726fea1085eb2dd806a95fb5ccfd4cac260a9 Mon Sep 17 00:00:00 2001 From: Jakob Otto Date: Tue, 16 Jun 2020 19:05:08 +0200 Subject: [PATCH 8/8] Cleanup test --- libcaf_net/caf/net/backend/udp.hpp | 5 ----- .../caf/net/transport_worker_dispatcher.hpp | 19 +++++++++---------- libcaf_net/test/net/backend/udp.cpp | 14 ++++++++------ 3 files changed, 17 insertions(+), 21 deletions(-) diff --git a/libcaf_net/caf/net/backend/udp.hpp b/libcaf_net/caf/net/backend/udp.hpp index d3c47f77..ec3a21d9 100644 --- a/libcaf_net/caf/net/backend/udp.hpp +++ b/libcaf_net/caf/net/backend/udp.hpp @@ -36,7 +36,6 @@ namespace caf::net::backend { /// Minimal backend for udp communication. -/// @warning this backend is *not* thread safe. class CAF_NET_EXPORT udp : public middleman_backend { public: // -- constructors, destructors, and assignment operators -------------------- @@ -63,10 +62,6 @@ class CAF_NET_EXPORT udp : public middleman_backend { // -- properties ------------------------------------------------------------- - udp_datagram_socket socket(const node_id&) { - return socket_cast(ep_manager_->handle()); - } - uint16_t port() const noexcept override { return listening_port_; } diff --git a/libcaf_net/caf/net/transport_worker_dispatcher.hpp b/libcaf_net/caf/net/transport_worker_dispatcher.hpp index 115cc32e..2a09e197 100644 --- a/libcaf_net/caf/net/transport_worker_dispatcher.hpp +++ b/libcaf_net/caf/net/transport_worker_dispatcher.hpp @@ -95,15 +95,18 @@ class transport_worker_dispatcher { template void resolve(Parent& parent, const uri& locator, const actor& listener) { - if (auto worker = find_worker(make_node_id(*locator.authority_only()))) { - worker->resolve(parent, locator.path(), listener); - } else { - if (auto ret = emplace(parent, locator)) { - auto& worker = *ret; + if (auto auth = locator.authority_only()) { + if (auto worker = find_worker(make_node_id(*auth))) { worker->resolve(parent, locator.path(), listener); } else { - anon_send(listener, ret.error()); + if (auto ret = emplace(parent, locator)) + (*ret)->resolve(parent, locator.path(), listener); + else + anon_send(listener, ret.error()); } + } else { + anon_send(listener, + make_error(sec::runtime_error, "could not get authority")); } } @@ -167,10 +170,6 @@ class transport_worker_dispatcher { return worker; } - size_t num_workers() const { - return workers_by_id_.size(); - } - private: worker_ptr find_worker(const node_id& nid) { return find_worker_impl(workers_by_node_, nid); diff --git a/libcaf_net/test/net/backend/udp.cpp b/libcaf_net/test/net/backend/udp.cpp index 0807c4fd..a9352784 100644 --- a/libcaf_net/test/net/backend/udp.cpp +++ b/libcaf_net/test/net/backend/udp.cpp @@ -5,7 +5,7 @@ * | |___ / ___ \| _| Framework * * \____/_/ \_|_| * * * - * Copyright 2011-2019 Dominik Charousset * + * Copyright 2011-2020 Dominik Charousset * * * * Distributed under the terms and conditions of the BSD 3-Clause License or * * (at your option) under the terms and conditions of the Boost Software * @@ -28,7 +28,6 @@ #include "caf/actor_system_config.hpp" #include "caf/ip_endpoint.hpp" -#include "caf/net/ip.hpp" #include "caf/net/middleman.hpp" #include "caf/net/socket_guard.hpp" #include "caf/uri.hpp" @@ -69,22 +68,24 @@ class planet : public test_coordinator_fixture { public: planet() : mm(this->sys.network_manager()), mpx(mm.mpx()) { mpx->set_thread_id(); + backend()->emplace(cfg.sock, cfg.port); } std::string locator_str() { return cfg.this_node_str; } + net::backend::udp* backend() { + return dynamic_cast(mm.backend("udp")); + } + net::middleman& mm; multiplexer_ptr mpx; }; struct fixture : host_fixture { fixture() { - dynamic_cast(earth.mm.backend("udp")) - ->emplace(earth.cfg.sock, earth.cfg.port); - dynamic_cast(mars.mm.backend("udp")) - ->emplace(mars.cfg.sock, mars.cfg.port); + // nop } bool handle_io_event() { @@ -105,6 +106,7 @@ CAF_TEST(resolve) { CAF_MESSAGE("publishing actor " << CAF_ARG(path)); earth.mm.publish(dummy, path); auto locator = unbox(make_uri(earth.locator_str() + "/name/" + path)); + CAF_MESSAGE("resolving " << CAF_ARG(locator)); mars.mm.resolve(locator, mars.self); while (handle_io_event()) ;