From 82f7b1e0cbf914c813c07243cc2f2c1b176c3f3c Mon Sep 17 00:00:00 2001 From: Stefano Colli Date: Mon, 9 Dec 2024 17:44:50 +0100 Subject: [PATCH] Add Receiver endpoint mode --- src/endpoint.cpp | 26 ++++++++++++++++++++++---- src/endpoint.h | 9 ++++++--- src/mainloop.cpp | 26 ++++++++++++++++++++++++-- 3 files changed, 52 insertions(+), 9 deletions(-) diff --git a/src/endpoint.cpp b/src/endpoint.cpp index e95b0360..9a03256c 100644 --- a/src/endpoint.cpp +++ b/src/endpoint.cpp @@ -1108,6 +1108,7 @@ bool UdpEndpoint::setup(UdpEndpointConfig conf) return false; } + this->_mode = conf.mode; if (!this->open(conf.address.c_str(), conf.port, conf.mode)) { log_error("Could not open %s:%ld", conf.address.c_str(), conf.port); return false; @@ -1184,7 +1185,7 @@ int UdpEndpoint::open_ipv6(const char *ip, unsigned long port, UdpEndpointConfig sockaddr6.sin6_port = htons(port); /* multicast address needs to listen to all, but "filter" incoming packets */ - if (mode == UdpEndpointConfig::Mode::Server && ipv6_is_multicast(ip_str)) { + if ((mode == UdpEndpointConfig::Mode::Server || mode == UdpEndpointConfig::Mode::Receiver) && ipv6_is_multicast(ip_str)) { sockaddr6.sin6_addr = in6addr_any; struct ipv6_mreq group; @@ -1204,7 +1205,7 @@ int UdpEndpoint::open_ipv6(const char *ip, unsigned long port, UdpEndpointConfig sockaddr6.sin6_scope_id = ipv6_get_scope_id(ip_str); } - if (mode == UdpEndpointConfig::Mode::Server) { + if (mode == UdpEndpointConfig::Mode::Server || mode == UdpEndpointConfig::Mode::Receiver) { if (bind(fd, (struct sockaddr *)&sockaddr6, sizeof(sockaddr6)) < 0) { log_error("Error binding IPv6 socket for [%s]:%lu (%m)", ip_str, port); goto fail; @@ -1236,7 +1237,7 @@ int UdpEndpoint::open_ipv4(const char *ip, unsigned long port, UdpEndpointConfig sockaddr.sin_addr.s_addr = inet_addr(ip); sockaddr.sin_port = htons(port); - if (mode == UdpEndpointConfig::Mode::Server) { + if (mode == UdpEndpointConfig::Mode::Server || mode == UdpEndpointConfig::Mode::Receiver) { if (bind(fd, (struct sockaddr *)&sockaddr, sizeof(sockaddr)) < 0) { log_error("Error binding IPv4 socket for %s:%lu (%m)", ip, port); goto fail; @@ -1292,6 +1293,8 @@ bool UdpEndpoint::open(const char *ip, unsigned long port, UdpEndpointConfig::Mo if (mode == UdpEndpointConfig::Mode::Server) { log_info("Opened UDP Server [%d]%s: %s:%lu", fd, _name.c_str(), ip, port); + } else if (mode == UdpEndpointConfig::Mode::Receiver) { + log_info("Opened UDP Receiver [%d]%s: %s:%lu", fd, _name.c_str(), ip, port); } else { log_info("Opened UDP Client [%d]%s: %s:%lu", fd, _name.c_str(), ip, port); } @@ -1484,6 +1487,8 @@ int UdpEndpoint::parse_udp_mode(const char *val, size_t val_len, void *storage, *udp_mode = UdpEndpointConfig::Mode::Server; } else if (memcaseeq(val, val_len, "server", sizeof("server") - 1)) { *udp_mode = UdpEndpointConfig::Mode::Server; + } else if (memcaseeq(val, val_len, "receiver", sizeof("receiver") - 1)) { + *udp_mode = UdpEndpointConfig::Mode::Receiver; } else { log_error("Unknown 'mode' key: %.*s", (int)val_len, val); return -EINVAL; @@ -1514,13 +1519,26 @@ bool UdpEndpoint::validate_config(const UdpEndpointConfig &config) } if (config.mode != UdpEndpointConfig::Mode::Client - && config.mode != UdpEndpointConfig::Mode::Server) { + && config.mode != UdpEndpointConfig::Mode::Server + && config.mode != UdpEndpointConfig::Mode::Receiver) { return false; } return true; } +Endpoint::AcceptState UdpEndpoint::accept_msg(const struct buffer *pbuf) const +{ + // reject when UDP endpoint is in receiver mode + if (this->_mode == UdpEndpointConfig::Mode::Receiver) { + log_trace("Endpoint [%d]%s: in Receiver mode, not sending back any msg", fd, _name.c_str()); + return Endpoint::AcceptState::Filtered; + } + + // otherwise: refer to standard accept rules + return Endpoint::accept_msg(pbuf); +} + TcpEndpoint::TcpEndpoint(std::string name) : Endpoint{ENDPOINT_TYPE_TCP, std::move(name)} { diff --git a/src/endpoint.h b/src/endpoint.h index b2329c20..794b43a0 100644 --- a/src/endpoint.h +++ b/src/endpoint.h @@ -58,7 +58,7 @@ struct UartEndpointConfig { }; struct UdpEndpointConfig { - enum class Mode { Undefined = 0, Server, Client }; + enum class Mode { Undefined = 0, Server, Client, Receiver }; std::string name; std::string address; @@ -184,7 +184,7 @@ class Endpoint : public Pollable { return has_sys_comp_id(sys_comp_id); } - AcceptState accept_msg(const struct buffer *pbuf) const; + AcceptState virtual accept_msg(const struct buffer *pbuf) const; void filter_add_allowed_out_msg_id(uint32_t msg_id) { @@ -346,6 +346,8 @@ class UdpEndpoint : public Endpoint { static int parse_udp_mode(const char *val, size_t val_len, void *storage, size_t storage_len); static bool validate_config(const UdpEndpointConfig &config); + Endpoint::AcceptState accept_msg(const struct buffer *pbuf) const override; + void add_no_coalesce_msg_id(uint32_t msg_id) { _coalesce_nodelay.insert(msg_id); @@ -375,6 +377,7 @@ class UdpEndpoint : public Endpoint { unsigned int _coalesce_bytes = 0UL; // max coalescence size unsigned long _coalesce_ms = 0UL; // max time to hold data to try to coalesce packets together + UdpEndpointConfig::Mode _mode = UdpEndpointConfig::Mode::Undefined; private: bool is_ipv6; struct sockaddr_in sockaddr; @@ -393,7 +396,7 @@ class TcpEndpoint : public Endpoint { bool is_valid() override { return _valid; }; bool is_critical() override { return false; }; - Endpoint::AcceptState accept_msg(const struct buffer *pbuf) const; + Endpoint::AcceptState accept_msg(const struct buffer *pbuf) const override; int accept(int listener_fd); ///< accept incoming connection bool setup(TcpEndpointConfig conf); ///< open connection and apply config diff --git a/src/mainloop.cpp b/src/mainloop.cpp index 7d91ec05..21fcb47c 100644 --- a/src/mainloop.cpp +++ b/src/mainloop.cpp @@ -298,15 +298,37 @@ void Mainloop::handle_command_pipe() log_trace("Malformed port in add command"); return; } - UdpEndpointConfig::Mode mode = a[5] == "server" ? UdpEndpointConfig::Mode::Server : UdpEndpointConfig::Mode::Client; + auto to_create = std::find_if(g_endpoints.begin(), g_endpoints.end(), + [&a](const std::shared_ptr e) {return e->get_name() == a[2];}); + if (to_create != g_endpoints.end()) { + log_error("Endpoint named \"%s\" already exists, please choose another name", a[2].c_str()); + return; + } // Command to UDP endpoint configuration UdpEndpointConfig conf{}; - conf.mode = mode; + conf.name = a[2]; conf.address = a[3]; conf.port = port; + // UDP endpoint mode + if(a[5] == "server" || a[5] == "Server") { + conf.mode = UdpEndpointConfig::Mode::Server; + } + else if (a[5] == "eavesdropping" || a[5] == "Eavesdropping") { + conf.mode = UdpEndpointConfig::Mode::Server; + } + else if (a[5] == "receiver" || a[5] == "Receiver") { + conf.mode = UdpEndpointConfig::Mode::Receiver; + } + else if (a[5] == "client" || a[5] == "Client") { + conf.mode = UdpEndpointConfig::Mode::Client; + } + else { + conf.mode = UdpEndpointConfig::Mode::Undefined; + } + if (a.size() > 6) { // group name provided conf.group = a[6] == "NULL" ? "" : a[6]; }