From 18e2c080c1e61451329e5eb2e2124385637ab81c Mon Sep 17 00:00:00 2001 From: Lasse Rosenow Date: Thu, 24 Oct 2024 14:16:26 +0000 Subject: [PATCH 01/18] Migrate to new NetworkChannel interface --- include/reactor-uc/network_channel.h | 19 +- .../platform/posix/tcp_ip_channel.h | 4 +- src/federated.c | 4 +- src/platform/posix/tcp_ip_channel.c | 169 +++++++++--------- 4 files changed, 101 insertions(+), 95 deletions(-) diff --git a/include/reactor-uc/network_channel.h b/include/reactor-uc/network_channel.h index 1fd4b83a..d74b75d1 100644 --- a/include/reactor-uc/network_channel.h +++ b/include/reactor-uc/network_channel.h @@ -13,16 +13,15 @@ typedef struct FederatedConnectionBundle FederatedConnectionBundle; typedef struct NetworkChannel NetworkChannel; struct NetworkChannel { - lf_ret_t (*bind)(NetworkChannel *self); - lf_ret_t (*connect)(NetworkChannel *self); - bool (*accept)(NetworkChannel *self); - void (*close)(NetworkChannel *self); - void (*register_callback)(NetworkChannel *self, - void (*receive_callback)(FederatedConnectionBundle *conn, TaggedMessage *message), - FederatedConnectionBundle *conn); - lf_ret_t (*send)(NetworkChannel *self, TaggedMessage *message); - TaggedMessage *(*receive)(NetworkChannel *self); - void (*free)(NetworkChannel *self); + size_t dest_channel_id; // So that we can "address" one of several NetworkChannel's at the other end. + lf_ret_t (*open_connection)(NetworkChannel *self); + lf_ret_t (*try_connect)(NetworkChannel *self); + lf_ret_t (*close_connection)(NetworkChannel *self); + lf_ret_t (*send_blocking)(NetworkChannel *self, TaggedMessage *message); + void (*register_receive_callback)(NetworkChannel *self, + void (*receive_callback)(FederatedConnectionBundle *conn, TaggedMessage *message), + FederatedConnectionBundle *conn); + lf_ret_t (*free)(NetworkChannel *self); }; #endif // REACTOR_UC_NETWORK_CHANNEL_H diff --git a/include/reactor-uc/platform/posix/tcp_ip_channel.h b/include/reactor-uc/platform/posix/tcp_ip_channel.h index e3491414..51c34bc1 100644 --- a/include/reactor-uc/platform/posix/tcp_ip_channel.h +++ b/include/reactor-uc/platform/posix/tcp_ip_channel.h @@ -45,8 +45,6 @@ struct TcpIpChannel { void (*receive_callback)(FederatedConnectionBundle *conn, TaggedMessage *message); }; -void TcpIpChannel_ctor(TcpIpChannel *self, const char *host, unsigned short port, int protocol_family); - -void TcpIpChannel_free(NetworkChannel *self); +void TcpIpChannel_ctor(TcpIpChannel *self, const char *host, unsigned short port, int protocol_family, bool server); #endif diff --git a/src/federated.c b/src/federated.c index 0f7d10a3..2e34f8c5 100644 --- a/src/federated.c +++ b/src/federated.c @@ -57,7 +57,7 @@ void FederatedOutputConnection_cleanup(Trigger *trigger) { LF_DEBUG(FED, "FedOutConn %p sending message with tag=%" PRId64 ":%" PRIu32, trigger, msg.tag.time, msg.tag.microstep); - lf_ret_t ret = channel->send(channel, &msg); + lf_ret_t ret = channel->send_blocking(channel, &msg); if (ret != LF_OK) { LF_ERR(FED, "FedOutConn %p failed to send message", trigger); @@ -204,5 +204,5 @@ void FederatedConnectionBundle_ctor(FederatedConnectionBundle *self, Reactor *pa self->serialize_hooks = serialize_hooks; // Register callback function for message received. - self->net_channel->register_callback(self->net_channel, FederatedConnectionBundle_msg_received_cb, self); + self->net_channel->register_receive_callback(self->net_channel, FederatedConnectionBundle_msg_received_cb, self); } diff --git a/src/platform/posix/tcp_ip_channel.c b/src/platform/posix/tcp_ip_channel.c index 982431e0..306012c9 100644 --- a/src/platform/posix/tcp_ip_channel.c +++ b/src/platform/posix/tcp_ip_channel.c @@ -1,6 +1,6 @@ +#include "reactor-uc/platform/posix/tcp_ip_channel.h" #include "reactor-uc/encoding.h" #include "reactor-uc/logging.h" -#include "reactor-uc/platform/posix/tcp_ip_channel.h" #include #include @@ -19,72 +19,83 @@ #define MIN(a, b) ((a) < (b) ? (a) : (b)) -lf_ret_t TcpIpChannel_bind(NetworkChannel *untyped_self) { +/** + * @brief If is server: Bind and Listen for connections + * If is client: Do nothing + */ +static lf_ret_t _open_connection(NetworkChannel *untyped_self) { TcpIpChannel *self = (TcpIpChannel *)untyped_self; - struct sockaddr_in serv_addr; - serv_addr.sin_family = self->protocol_family; - serv_addr.sin_port = htons(self->port); + if (self->server) { + struct sockaddr_in serv_addr; + serv_addr.sin_family = self->protocol_family; + serv_addr.sin_port = htons(self->port); - // turn human-readable address into something the os can work with - if (inet_pton(self->protocol_family, self->host, &serv_addr.sin_addr) <= 0) { - return LF_INVALID_VALUE; - } + // turn human-readable address into something the os can work with + if (inet_pton(self->protocol_family, self->host, &serv_addr.sin_addr) <= 0) { + return LF_INVALID_VALUE; + } - // bind the socket to that address - int ret = bind(self->fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)); - if (ret < 0) { - return LF_NETWORK_SETUP_FAILED; - } + // bind the socket to that address + int ret = bind(self->fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)); + if (ret < 0) { + return LF_NETWORK_SETUP_FAILED; + } - // start listening - if (listen(self->fd, 1) < 0) { - return LF_NETWORK_SETUP_FAILED; + // start listening + if (listen(self->fd, 1) < 0) { + return LF_NETWORK_SETUP_FAILED; + } } return LF_OK; } -lf_ret_t TcpIpChannel_connect(NetworkChannel *untyped_self) { +/** + * @brief If is server: Try to accept + * If is client: Try to connect + */ +static lf_ret_t _try_connect(NetworkChannel *untyped_self) { TcpIpChannel *self = (TcpIpChannel *)untyped_self; - self->server = false; - - struct sockaddr_in serv_addr; + /* Server -> Accept */ + if (self->server) { + int new_socket; + struct sockaddr_in address; + socklen_t addrlen = sizeof(address); - serv_addr.sin_family = self->protocol_family; - serv_addr.sin_port = htons(self->port); + new_socket = accept(self->fd, (struct sockaddr *)&address, &addrlen); + if (new_socket >= 0) { + self->client = new_socket; + FD_SET(new_socket, &self->set); - if (inet_pton(self->protocol_family, self->host, &serv_addr.sin_addr) <= 0) { - return LF_INVALID_VALUE; - } + return true; + } - int ret = connect(self->fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)); - if (ret < 0) { - return LF_COULD_NOT_CONNECT; + return false; } - return LF_OK; -} - -bool TcpIpChannel_accept(NetworkChannel *untyped_self) { - TcpIpChannel *self = (TcpIpChannel *)untyped_self; + /* Client -> Connect */ + if (!self->server) { + struct sockaddr_in serv_addr; - int new_socket; - struct sockaddr_in address; - socklen_t addrlen = sizeof(address); + serv_addr.sin_family = self->protocol_family; + serv_addr.sin_port = htons(self->port); - new_socket = accept(self->fd, (struct sockaddr *)&address, &addrlen); - if (new_socket >= 0) { - self->client = new_socket; - FD_SET(new_socket, &self->set); + if (inet_pton(self->protocol_family, self->host, &serv_addr.sin_addr) <= 0) { + return LF_INVALID_VALUE; + } - return true; + int ret = connect(self->fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)); + if (ret < 0) { + return LF_COULD_NOT_CONNECT; + } } - return false; + + return LF_OK; } -lf_ret_t TcpIpChannel_send(NetworkChannel *untyped_self, TaggedMessage *message) { +static lf_ret_t _send_blocking(NetworkChannel *untyped_self, TaggedMessage *message) { LF_DEBUG(NET, "TcpIpChannel sending message"); TcpIpChannel *self = (TcpIpChannel *)untyped_self; @@ -132,7 +143,7 @@ lf_ret_t TcpIpChannel_send(NetworkChannel *untyped_self, TaggedMessage *message) return LF_OK; } -TaggedMessage *TcpIpChannel_receive(NetworkChannel *untyped_self) { +static TaggedMessage *_receive(NetworkChannel *untyped_self) { TcpIpChannel *self = (TcpIpChannel *)untyped_self; int socket; @@ -173,7 +184,7 @@ TaggedMessage *TcpIpChannel_receive(NetworkChannel *untyped_self) { return &self->output; } -void TcpIpChannel_close(NetworkChannel *untyped_self) { +static void _close_connection(NetworkChannel *untyped_self) { LF_DEBUG(NET, "Closing TCP/IP Channel"); TcpIpChannel *self = (TcpIpChannel *)untyped_self; @@ -188,7 +199,7 @@ void TcpIpChannel_close(NetworkChannel *untyped_self) { } } -void *TcpIpChannel_receive_thread(void *untyped_self) { +static void *_receive_thread(void *untyped_self) { LF_INFO(NET, "Starting TCP/IP receive thread"); TcpIpChannel *self = untyped_self; @@ -196,7 +207,7 @@ void *TcpIpChannel_receive_thread(void *untyped_self) { self->terminate = false; while (!self->terminate) { - TaggedMessage *msg = self->super.receive(untyped_self); + TaggedMessage *msg = _receive(untyped_self); if (msg) { self->receive_callback(self->federated_connection, msg); @@ -206,9 +217,9 @@ void *TcpIpChannel_receive_thread(void *untyped_self) { return NULL; } -void TcpIpChannel_register_callback(NetworkChannel *untyped_self, - void (*receive_callback)(FederatedConnectionBundle *conn, TaggedMessage *msg), - FederatedConnectionBundle *conn) { +static void _register_receive_callback(NetworkChannel *untyped_self, + void (*receive_callback)(FederatedConnectionBundle *conn, TaggedMessage *msg), + FederatedConnectionBundle *conn) { int res; LF_INFO(NET, "TCP/IP registering callback thread"); TcpIpChannel *self = (TcpIpChannel *)untyped_self; @@ -234,13 +245,31 @@ void TcpIpChannel_register_callback(NetworkChannel *untyped_self, throw("pthread_attr_setstack failed"); } #endif - res = pthread_create(&self->receive_thread, &self->receive_thread_attr, TcpIpChannel_receive_thread, self); + res = pthread_create(&self->receive_thread, &self->receive_thread_attr, _receive_thread, self); if (res < 0) { throw("pthread_create failed"); } } -void TcpIpChannel_ctor(TcpIpChannel *self, const char *host, unsigned short port, int protocol_family) { +static void _free(NetworkChannel *untyped_self) { + LF_DEBUG(NET, "Freeing TCP/IP Channel"); + TcpIpChannel *self = (TcpIpChannel *)untyped_self; + self->terminate = true; + + if (self->receive_thread != 0) { + LF_DEBUG(NET, "Stopping receive thread"); + if (pthread_cancel(self->receive_thread) != 0) { + LF_ERR(NET, "Error canceling receive thread"); + } + + if (pthread_join(self->receive_thread, NULL) != 0) { + LF_ERR(NET, "Error joining receive thread"); + } + } + self->super.close_connection((NetworkChannel *)self); +} + +void TcpIpChannel_ctor(TcpIpChannel *self, const char *host, unsigned short port, int protocol_family, bool server) { FD_ZERO(&self->set); if ((self->fd = socket(protocol_family, SOCK_STREAM, 0)) < 0) { @@ -250,7 +279,7 @@ void TcpIpChannel_ctor(TcpIpChannel *self, const char *host, unsigned short port throw("Could not set SO_REUSEADDR"); } - self->server = true; + self->server = server; self->terminate = true; self->protocol_family = protocol_family; self->host = host; @@ -258,32 +287,12 @@ void TcpIpChannel_ctor(TcpIpChannel *self, const char *host, unsigned short port self->read_index = 0; self->client = 0; - self->super.accept = TcpIpChannel_accept; - self->super.bind = TcpIpChannel_bind; - self->super.connect = TcpIpChannel_connect; - self->super.close = TcpIpChannel_close; - self->super.receive = TcpIpChannel_receive; - self->super.send = TcpIpChannel_send; - self->super.register_callback = TcpIpChannel_register_callback; - self->super.free = TcpIpChannel_free; + self->super.open_connection = _open_connection; + self->super.try_connect = _try_connect; + self->super.close_connection = _close_connection; + self->super.send_blocking = _send_blocking; + self->super.register_receive_callback = _register_receive_callback; + self->super.free = _free; self->receive_callback = NULL; self->federated_connection = NULL; -} - -void TcpIpChannel_free(NetworkChannel *untyped_self) { - LF_DEBUG(NET, "Freeing TCP/IP Channel"); - TcpIpChannel *self = (TcpIpChannel *)untyped_self; - self->terminate = true; - - if (self->receive_thread != 0) { - LF_DEBUG(NET, "Stopping receive thread"); - if (pthread_cancel(self->receive_thread) != 0) { - LF_ERR(NET, "Error canceling receive thread"); - } - - if (pthread_join(self->receive_thread, NULL) != 0) { - LF_ERR(NET, "Error joining receive thread"); - } - } - self->super.close((NetworkChannel *)self); } \ No newline at end of file From 361a0e473753228021cc70ea88305371ba7da4c6 Mon Sep 17 00:00:00 2001 From: Lasse Rosenow Date: Thu, 24 Oct 2024 14:19:50 +0000 Subject: [PATCH 02/18] Fix close_connection interface --- include/reactor-uc/network_channel.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/reactor-uc/network_channel.h b/include/reactor-uc/network_channel.h index d74b75d1..be4446c9 100644 --- a/include/reactor-uc/network_channel.h +++ b/include/reactor-uc/network_channel.h @@ -16,7 +16,7 @@ struct NetworkChannel { size_t dest_channel_id; // So that we can "address" one of several NetworkChannel's at the other end. lf_ret_t (*open_connection)(NetworkChannel *self); lf_ret_t (*try_connect)(NetworkChannel *self); - lf_ret_t (*close_connection)(NetworkChannel *self); + void (*close_connection)(NetworkChannel *self); lf_ret_t (*send_blocking)(NetworkChannel *self, TaggedMessage *message); void (*register_receive_callback)(NetworkChannel *self, void (*receive_callback)(FederatedConnectionBundle *conn, TaggedMessage *message), From d9c10943f68ce134d205fa1bef042351475e0a2c Mon Sep 17 00:00:00 2001 From: Lasse Rosenow Date: Thu, 24 Oct 2024 14:33:46 +0000 Subject: [PATCH 03/18] Fix network channel free --- include/reactor-uc/network_channel.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/reactor-uc/network_channel.h b/include/reactor-uc/network_channel.h index be4446c9..b3e5037e 100644 --- a/include/reactor-uc/network_channel.h +++ b/include/reactor-uc/network_channel.h @@ -21,7 +21,7 @@ struct NetworkChannel { void (*register_receive_callback)(NetworkChannel *self, void (*receive_callback)(FederatedConnectionBundle *conn, TaggedMessage *message), FederatedConnectionBundle *conn); - lf_ret_t (*free)(NetworkChannel *self); + void (*free)(NetworkChannel *self); }; #endif // REACTOR_UC_NETWORK_CHANNEL_H From c15b4072df584d808f3aa4dc6b3cfa41132638a5 Mon Sep 17 00:00:00 2001 From: Lasse Rosenow Date: Thu, 24 Oct 2024 15:29:18 +0000 Subject: [PATCH 04/18] Fix tcp_channel_try_connect return types --- src/platform/posix/tcp_ip_channel.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/platform/posix/tcp_ip_channel.c b/src/platform/posix/tcp_ip_channel.c index 306012c9..a0605f83 100644 --- a/src/platform/posix/tcp_ip_channel.c +++ b/src/platform/posix/tcp_ip_channel.c @@ -69,10 +69,10 @@ static lf_ret_t _try_connect(NetworkChannel *untyped_self) { self->client = new_socket; FD_SET(new_socket, &self->set); - return true; + return LF_OK; } - return false; + return LF_COULD_NOT_CONNECT; } /* Client -> Connect */ @@ -231,7 +231,7 @@ static void _register_receive_callback(NetworkChannel *untyped_self, throw("pthread_attr_init failed"); } /* TODO: RIOT posix-wrappers don't have pthread_attr_setstack yet */ -#ifdef PLATFORM_RIOT +#if defined(PLATFORM_RIOT) && !defined(__USE_XOPEN2K) if (pthread_attr_setstackaddr(&self->receive_thread_attr, self->receive_thread_stack) != 0) { throw("pthread_attr_setstackaddr failed"); } From 70e87644ed4150c5667b0967715eb26931b10d75 Mon Sep 17 00:00:00 2001 From: Lasse Rosenow Date: Thu, 24 Oct 2024 15:30:11 +0000 Subject: [PATCH 05/18] Add WIP test_tcp_channel inside riot examples --- examples/riot/test_tcp_channel/Makefile | 18 ++++++ examples/riot/test_tcp_channel/main.c | 83 +++++++++++++++++++++++++ 2 files changed, 101 insertions(+) create mode 100755 examples/riot/test_tcp_channel/Makefile create mode 100755 examples/riot/test_tcp_channel/main.c diff --git a/examples/riot/test_tcp_channel/Makefile b/examples/riot/test_tcp_channel/Makefile new file mode 100755 index 00000000..61759277 --- /dev/null +++ b/examples/riot/test_tcp_channel/Makefile @@ -0,0 +1,18 @@ +# name of your application +APPLICATION = lf-test + +# If no BOARD is found in the environment, use this default: +BOARD ?= native + +# Comment this out to disable code in RIOT that does safety checking +# which is not needed in a production environment but helps in the +# development process: +DEVELHELP ?= 1 + +# Change this to 0 show compiler invocation lines by default: +QUIET ?= 1 + +# Enable reactor-uc features +CFLAGS += -DNETWORK_POSIX_TCP + +include $(CURDIR)/../../../make/riot/riot.mk diff --git a/examples/riot/test_tcp_channel/main.c b/examples/riot/test_tcp_channel/main.c new file mode 100755 index 00000000..158f6987 --- /dev/null +++ b/examples/riot/test_tcp_channel/main.c @@ -0,0 +1,83 @@ +#include "reactor-uc/platform/posix/tcp_ip_channel.h" +#include "reactor-uc/reactor-uc.h" +#include +#include + +NetworkChannel *server_channel; +NetworkChannel *client_channel; + +void callback_handler(FederatedConnectionBundle *self, TaggedMessage *msg) { + (void)self; + printf("Received message with connection number %i and content %s\n", msg->conn_id, (char *)msg->payload.bytes); + // channel.super.send_blocking(&channel.super, msg); +} + +NetworkChannel *server_init(TcpIpChannel *tcp_channel, const char *host, unsigned short port) { + NetworkChannel *channel = &tcp_channel->super; + + // creating a server that listens on loopback device on port 8903 + TcpIpChannel_ctor(tcp_channel, host, port, AF_INET, true); + + // binding to that address + channel->open_connection(channel); + + // register receive callback for handling incoming messages + channel->register_receive_callback(channel, callback_handler, NULL); + + return channel; +} + +NetworkChannel *client_init(TcpIpChannel *tcp_channel, const char *host, unsigned short port) { + NetworkChannel *channel = &tcp_channel->super; + + // creating a server that listens on loopback device on port 8900 + TcpIpChannel_ctor(tcp_channel, host, port, AF_INET, false); + + // binding to that address + channel->open_connection(channel); + + return channel; +} + +int main() { + const char *host = "127.0.0.1"; + unsigned short port = 8903; // NOLINT + + /* init server and wait for messages */ + TcpIpChannel _server_tcp_channel; + server_channel = server_init(&_server_tcp_channel, host, port); + + /* init client and send messages to server */ + TcpIpChannel _client_tcp_channel; + client_channel = client_init(&_client_tcp_channel, host, port); + + bool client_connected = false; + bool server_connected = false; + while (!client_connected && !server_connected) { + /* client tries to connect to server */ + if (client_channel->try_connect(client_channel) == LF_OK) { + client_connected = true; + } + + /* server tries to connect to client */ + if (server_channel->try_connect(server_channel) == LF_OK) { + server_connected = true; + } + } + + /* send data to server */ + TaggedMessage port_message; + port_message.conn_id = 42; // NOLINT + const char *message = "Hello World1234"; + memcpy(port_message.payload.bytes, message, sizeof("Hello World1234")); // NOLINT + port_message.payload.size = sizeof("Hello World1234"); + for (int i = 0; i < 10; i++) { + client_channel->send_blocking(client_channel, &port_message); + + // waiting for reply + // TaggedMessage *received_message = channel->receive(channel); + + // printf("Received message with connection number %i and content %s\n", received_message->conn_id, (char + // *)received_message->payload.bytes); + } +} From 3d359c06c75f4122d342aa3dda2ef5e9bd11c7eb Mon Sep 17 00:00:00 2001 From: Lasse Rosenow Date: Fri, 25 Oct 2024 15:08:20 +0000 Subject: [PATCH 06/18] Fix tcp-ip-channel function prefix --- src/platform/posix/tcp_ip_channel.c | 35 +++++++++++++++-------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/src/platform/posix/tcp_ip_channel.c b/src/platform/posix/tcp_ip_channel.c index a0605f83..efddce5a 100644 --- a/src/platform/posix/tcp_ip_channel.c +++ b/src/platform/posix/tcp_ip_channel.c @@ -23,7 +23,7 @@ * @brief If is server: Bind and Listen for connections * If is client: Do nothing */ -static lf_ret_t _open_connection(NetworkChannel *untyped_self) { +static lf_ret_t TcpIpChannel_open_connection(NetworkChannel *untyped_self) { TcpIpChannel *self = (TcpIpChannel *)untyped_self; if (self->server) { @@ -55,7 +55,7 @@ static lf_ret_t _open_connection(NetworkChannel *untyped_self) { * @brief If is server: Try to accept * If is client: Try to connect */ -static lf_ret_t _try_connect(NetworkChannel *untyped_self) { +static lf_ret_t TcpIpChannel_try_connect(NetworkChannel *untyped_self) { TcpIpChannel *self = (TcpIpChannel *)untyped_self; /* Server -> Accept */ @@ -95,7 +95,7 @@ static lf_ret_t _try_connect(NetworkChannel *untyped_self) { return LF_OK; } -static lf_ret_t _send_blocking(NetworkChannel *untyped_self, TaggedMessage *message) { +static lf_ret_t TcpIpChannel_send_blocking(NetworkChannel *untyped_self, TaggedMessage *message) { LF_DEBUG(NET, "TcpIpChannel sending message"); TcpIpChannel *self = (TcpIpChannel *)untyped_self; @@ -143,7 +143,7 @@ static lf_ret_t _send_blocking(NetworkChannel *untyped_self, TaggedMessage *mess return LF_OK; } -static TaggedMessage *_receive(NetworkChannel *untyped_self) { +static TaggedMessage *TcpIpChannel_receive(NetworkChannel *untyped_self) { TcpIpChannel *self = (TcpIpChannel *)untyped_self; int socket; @@ -184,7 +184,7 @@ static TaggedMessage *_receive(NetworkChannel *untyped_self) { return &self->output; } -static void _close_connection(NetworkChannel *untyped_self) { +static void TcpIpChannel_close_connection(NetworkChannel *untyped_self) { LF_DEBUG(NET, "Closing TCP/IP Channel"); TcpIpChannel *self = (TcpIpChannel *)untyped_self; @@ -199,7 +199,7 @@ static void _close_connection(NetworkChannel *untyped_self) { } } -static void *_receive_thread(void *untyped_self) { +static void *TcpIpChannel_receive_thread(void *untyped_self) { LF_INFO(NET, "Starting TCP/IP receive thread"); TcpIpChannel *self = untyped_self; @@ -217,9 +217,10 @@ static void *_receive_thread(void *untyped_self) { return NULL; } -static void _register_receive_callback(NetworkChannel *untyped_self, - void (*receive_callback)(FederatedConnectionBundle *conn, TaggedMessage *msg), - FederatedConnectionBundle *conn) { +static void TcpIpChannel_register_receive_callback(NetworkChannel *untyped_self, + void (*receive_callback)(FederatedConnectionBundle *conn, + TaggedMessage *msg), + FederatedConnectionBundle *conn) { int res; LF_INFO(NET, "TCP/IP registering callback thread"); TcpIpChannel *self = (TcpIpChannel *)untyped_self; @@ -245,13 +246,13 @@ static void _register_receive_callback(NetworkChannel *untyped_self, throw("pthread_attr_setstack failed"); } #endif - res = pthread_create(&self->receive_thread, &self->receive_thread_attr, _receive_thread, self); + res = pthread_create(&self->receive_thread, &self->receive_thread_attr, TcpIpChannel_receive_thread, self); if (res < 0) { throw("pthread_create failed"); } } -static void _free(NetworkChannel *untyped_self) { +static void TcpIpChannel_free(NetworkChannel *untyped_self) { LF_DEBUG(NET, "Freeing TCP/IP Channel"); TcpIpChannel *self = (TcpIpChannel *)untyped_self; self->terminate = true; @@ -287,12 +288,12 @@ void TcpIpChannel_ctor(TcpIpChannel *self, const char *host, unsigned short port self->read_index = 0; self->client = 0; - self->super.open_connection = _open_connection; - self->super.try_connect = _try_connect; - self->super.close_connection = _close_connection; - self->super.send_blocking = _send_blocking; - self->super.register_receive_callback = _register_receive_callback; - self->super.free = _free; + self->super.open_connection = TcpIpChannel_open_connection; + self->super.try_connect = TcpIpChannel_try_connect; + self->super.close_connection = TcpIpChannel_close_connection; + self->super.send_blocking = TcpIpChannel_send_blocking; + self->super.register_receive_callback = TcpIpChannel_register_receive_callback; + self->super.free = TcpIpChannel_free; self->receive_callback = NULL; self->federated_connection = NULL; } \ No newline at end of file From dbb90ea99ff27170bc28091666ccee76a6fbf879 Mon Sep 17 00:00:00 2001 From: Lasse Rosenow Date: Fri, 25 Oct 2024 16:13:10 +0000 Subject: [PATCH 07/18] Fix function name --- src/platform/posix/tcp_ip_channel.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/platform/posix/tcp_ip_channel.c b/src/platform/posix/tcp_ip_channel.c index efddce5a..bb438d5a 100644 --- a/src/platform/posix/tcp_ip_channel.c +++ b/src/platform/posix/tcp_ip_channel.c @@ -207,7 +207,7 @@ static void *TcpIpChannel_receive_thread(void *untyped_self) { self->terminate = false; while (!self->terminate) { - TaggedMessage *msg = _receive(untyped_self); + TaggedMessage *msg = TcpIpChannel_receive(untyped_self); if (msg) { self->receive_callback(self->federated_connection, msg); From ff1eb7c8e39b7de5600e6fe1ced4d934daf5ffbc Mon Sep 17 00:00:00 2001 From: Lasse Rosenow Date: Fri, 25 Oct 2024 17:11:57 +0000 Subject: [PATCH 08/18] Remove tcp ip channel tests and move them to a more generic network channel test inside the unit tests folder --- .../testing_posix_tcp_ip_channel_client.c | 41 ------ .../testing_posix_tcp_ip_channel_server.c | 36 ------ .../testing_tcp_ip_channel_server_callback.c | 35 ----- examples/riot/test_tcp_channel/Makefile | 18 --- examples/riot/test_tcp_channel/main.c | 83 ------------ src/platform/posix/tcp_ip_channel.c | 3 +- test/unit/network_channels_test.c | 122 ++++++++++++++++++ 7 files changed, 123 insertions(+), 215 deletions(-) delete mode 100644 examples/posix/testing_posix_tcp_ip_channel_client.c delete mode 100644 examples/posix/testing_posix_tcp_ip_channel_server.c delete mode 100644 examples/posix/testing_tcp_ip_channel_server_callback.c delete mode 100755 examples/riot/test_tcp_channel/Makefile delete mode 100755 examples/riot/test_tcp_channel/main.c create mode 100644 test/unit/network_channels_test.c diff --git a/examples/posix/testing_posix_tcp_ip_channel_client.c b/examples/posix/testing_posix_tcp_ip_channel_client.c deleted file mode 100644 index b795ffc2..00000000 --- a/examples/posix/testing_posix_tcp_ip_channel_client.c +++ /dev/null @@ -1,41 +0,0 @@ -#include "reactor-uc/platform/posix/tcp_ip_channel.h" -#include "reactor-uc/reactor-uc.h" -#include -#include - -#include "proto/message.pb.h" -#define NUM_ITER 10 - -int main() { - TcpIpChannel channel; - - // server address - const char *host = "127.0.0.1"; - unsigned short port = 8902; // NOLINT - - // message for server - TaggedMessage port_message; - port_message.conn_id = 42; // NOLINT - const char *message = "Hello World1234"; - memcpy(port_message.payload.bytes, message, sizeof("Hello World1234")); // NOLINT - port_message.payload.size = sizeof("Hello World1234"); - - // creating a server that listens on loopback device on port 8900 - TcpIpChannel_ctor(&channel, host, port, AF_INET); - - // binding to that address - channel.super.connect(&channel.super); - - for (int i = 0; i < NUM_ITER; i++) { - // sending message - channel.super.send(&channel.super, &port_message); - - // waiting for reply - TaggedMessage *received_message = channel.super.receive(&channel.super); - - printf("Received message with connection number %i and content %s\n", received_message->conn_id, - (char *)received_message->payload.bytes); - } - - channel.super.close(&channel.super); -} diff --git a/examples/posix/testing_posix_tcp_ip_channel_server.c b/examples/posix/testing_posix_tcp_ip_channel_server.c deleted file mode 100644 index 68bee971..00000000 --- a/examples/posix/testing_posix_tcp_ip_channel_server.c +++ /dev/null @@ -1,36 +0,0 @@ -#include "reactor-uc/platform/posix/tcp_ip_channel.h" -#include "reactor-uc/reactor-uc.h" -#include -#include - -#define NUM_ITER 10 - -int main() { - TcpIpChannel channel; - - const char *host = "127.0.0.1"; - unsigned short port = 8902; // NOLINT - - // creating a server that listens on loopback device on port 8900 - TcpIpChannel_ctor(&channel, host, port, AF_INET); - - // binding to that address - channel.super.bind(&channel.super); - - // accept one connection - bool new_connection; - do { - new_connection = channel.super.accept(&channel.super); - } while (!new_connection); - - for (int i = 0; i < NUM_ITER; i++) { - // waiting for messages from client - TaggedMessage *message = channel.super.receive(&channel.super); - printf("Received message with connection number %i and content %s\n", message->conn_id, - (char *)message->payload.bytes); - - channel.super.send(&channel.super, message); - } - - channel.super.close(&channel.super); -} diff --git a/examples/posix/testing_tcp_ip_channel_server_callback.c b/examples/posix/testing_tcp_ip_channel_server_callback.c deleted file mode 100644 index c6a4e50c..00000000 --- a/examples/posix/testing_tcp_ip_channel_server_callback.c +++ /dev/null @@ -1,35 +0,0 @@ -#include "reactor-uc/federated.h" -#include "reactor-uc/platform/posix/tcp_ip_channel.h" -#include "reactor-uc/reactor-uc.h" -#include -#include -TcpIpChannel channel; - -void callback_handler(FederatedConnectionBundle *self, TaggedMessage *msg) { - printf("Received message with connection number %i and content %s\n", msg->conn_id, (char *)msg->payload.bytes); - channel.super.send(&channel.super, msg); -} - -int main() { - - const char *host = "127.0.0.1"; - unsigned short port = 8903; // NOLINT - - // creating a server that listens on loopback device on port 8900 - TcpIpChannel_ctor(&channel, host, port, AF_INET); - - // binding to that address - channel.super.bind(&channel.super); - - // accept one connection - bool new_connection; - do { - new_connection = channel.super.accept(&channel.super); - } while (!new_connection); - - channel.super.register_callback(&channel.super, callback_handler, NULL); - - sleep(100); - - channel.super.close(&channel.super); -} diff --git a/examples/riot/test_tcp_channel/Makefile b/examples/riot/test_tcp_channel/Makefile deleted file mode 100755 index 61759277..00000000 --- a/examples/riot/test_tcp_channel/Makefile +++ /dev/null @@ -1,18 +0,0 @@ -# name of your application -APPLICATION = lf-test - -# If no BOARD is found in the environment, use this default: -BOARD ?= native - -# Comment this out to disable code in RIOT that does safety checking -# which is not needed in a production environment but helps in the -# development process: -DEVELHELP ?= 1 - -# Change this to 0 show compiler invocation lines by default: -QUIET ?= 1 - -# Enable reactor-uc features -CFLAGS += -DNETWORK_POSIX_TCP - -include $(CURDIR)/../../../make/riot/riot.mk diff --git a/examples/riot/test_tcp_channel/main.c b/examples/riot/test_tcp_channel/main.c deleted file mode 100755 index 158f6987..00000000 --- a/examples/riot/test_tcp_channel/main.c +++ /dev/null @@ -1,83 +0,0 @@ -#include "reactor-uc/platform/posix/tcp_ip_channel.h" -#include "reactor-uc/reactor-uc.h" -#include -#include - -NetworkChannel *server_channel; -NetworkChannel *client_channel; - -void callback_handler(FederatedConnectionBundle *self, TaggedMessage *msg) { - (void)self; - printf("Received message with connection number %i and content %s\n", msg->conn_id, (char *)msg->payload.bytes); - // channel.super.send_blocking(&channel.super, msg); -} - -NetworkChannel *server_init(TcpIpChannel *tcp_channel, const char *host, unsigned short port) { - NetworkChannel *channel = &tcp_channel->super; - - // creating a server that listens on loopback device on port 8903 - TcpIpChannel_ctor(tcp_channel, host, port, AF_INET, true); - - // binding to that address - channel->open_connection(channel); - - // register receive callback for handling incoming messages - channel->register_receive_callback(channel, callback_handler, NULL); - - return channel; -} - -NetworkChannel *client_init(TcpIpChannel *tcp_channel, const char *host, unsigned short port) { - NetworkChannel *channel = &tcp_channel->super; - - // creating a server that listens on loopback device on port 8900 - TcpIpChannel_ctor(tcp_channel, host, port, AF_INET, false); - - // binding to that address - channel->open_connection(channel); - - return channel; -} - -int main() { - const char *host = "127.0.0.1"; - unsigned short port = 8903; // NOLINT - - /* init server and wait for messages */ - TcpIpChannel _server_tcp_channel; - server_channel = server_init(&_server_tcp_channel, host, port); - - /* init client and send messages to server */ - TcpIpChannel _client_tcp_channel; - client_channel = client_init(&_client_tcp_channel, host, port); - - bool client_connected = false; - bool server_connected = false; - while (!client_connected && !server_connected) { - /* client tries to connect to server */ - if (client_channel->try_connect(client_channel) == LF_OK) { - client_connected = true; - } - - /* server tries to connect to client */ - if (server_channel->try_connect(server_channel) == LF_OK) { - server_connected = true; - } - } - - /* send data to server */ - TaggedMessage port_message; - port_message.conn_id = 42; // NOLINT - const char *message = "Hello World1234"; - memcpy(port_message.payload.bytes, message, sizeof("Hello World1234")); // NOLINT - port_message.payload.size = sizeof("Hello World1234"); - for (int i = 0; i < 10; i++) { - client_channel->send_blocking(client_channel, &port_message); - - // waiting for reply - // TaggedMessage *received_message = channel->receive(channel); - - // printf("Received message with connection number %i and content %s\n", received_message->conn_id, (char - // *)received_message->payload.bytes); - } -} diff --git a/src/platform/posix/tcp_ip_channel.c b/src/platform/posix/tcp_ip_channel.c index bb438d5a..1d9b7c97 100644 --- a/src/platform/posix/tcp_ip_channel.c +++ b/src/platform/posix/tcp_ip_channel.c @@ -160,12 +160,11 @@ static TaggedMessage *TcpIpChannel_receive(NetworkChannel *untyped_self) { bool read_more = true; while (read_more) { - // reading from socket ssize_t bytes_read = recv(socket, self->read_buffer + self->read_index, bytes_available, 0); if (bytes_read < 0) { - LF_ERR(NET, "Error recv from socket %d", errno); + LF_ERR(NET, "[%s] Error recv from socket %d", self->server ? "server" : "client", errno); continue; } diff --git a/test/unit/network_channels_test.c b/test/unit/network_channels_test.c new file mode 100644 index 00000000..3604fc3b --- /dev/null +++ b/test/unit/network_channels_test.c @@ -0,0 +1,122 @@ +#include "reactor-uc/platform/posix/tcp_ip_channel.h" +#include "reactor-uc/reactor-uc.h" +#include "unity.h" +#include +#include +#include + +#define MESSAGE_CONTENT "Hello World1234" +#define MESSAGE_CONNECTION_ID 42 + +NetworkChannel *server_channel; +NetworkChannel *client_channel; +TcpIpChannel _server_tcp_channel; +TcpIpChannel _client_tcp_channel; + +bool server_callback_called = false; +bool client_callback_called = false; + +void test_init_tcp_ip(void) { + const char *host = "127.0.0.1"; + unsigned short port = 8903; // NOLINT + + /* init server */ + TcpIpChannel_ctor(&_server_tcp_channel, host, port, AF_INET, true); + server_channel = &_server_tcp_channel.super; + + /* init client */ + TcpIpChannel_ctor(&_client_tcp_channel, host, port, AF_INET, false); + client_channel = &_client_tcp_channel.super; +} + +void test_connect(void) { + // open connection + server_channel->open_connection(server_channel); + client_channel->open_connection(client_channel); + + // try connect + bool client_connected = false; + bool server_connected = false; + while (!client_connected && !server_connected) { + /* client tries to connect to server */ + if (client_channel->try_connect(client_channel) == LF_OK) { + client_connected = true; + } + + /* server tries to connect to client */ + if (server_channel->try_connect(server_channel) == LF_OK) { + server_connected = true; + } + } +} + +void server_callback_handler(FederatedConnectionBundle *self, TaggedMessage *msg) { + (void)self; + printf("\nServer: Received message with connection number %i and content %s\n", msg->conn_id, + (char *)msg->payload.bytes); + TEST_ASSERT_EQUAL_STRING(MESSAGE_CONTENT, (char *)msg->payload.bytes); + TEST_ASSERT_EQUAL(MESSAGE_CONNECTION_ID, msg->conn_id); + + server_callback_called = true; +} + +void test_client_send_and_server_recv(void) { + // register receive callback for handling incoming messages + server_channel->register_receive_callback(server_channel, server_callback_handler, NULL); + + /* create message */ + TaggedMessage port_message; + port_message.conn_id = MESSAGE_CONNECTION_ID; + const char *message = MESSAGE_CONTENT; + memcpy(port_message.payload.bytes, message, sizeof(MESSAGE_CONTENT)); + port_message.payload.size = sizeof(MESSAGE_CONTENT); + + /* send message */ + client_channel->send_blocking(client_channel, &port_message); + + /* wait for the callback */ + sleep(1); + + /* check if the callback was called */ + TEST_ASSERT_TRUE(server_callback_called); +} + +void client_callback_handler(FederatedConnectionBundle *self, TaggedMessage *msg) { + (void)self; + printf("\nClient: Received message with connection number %i and content %s\n", msg->conn_id, + (char *)msg->payload.bytes); + TEST_ASSERT_EQUAL_STRING(MESSAGE_CONTENT, (char *)msg->payload.bytes); + TEST_ASSERT_EQUAL(MESSAGE_CONNECTION_ID, msg->conn_id); + + client_callback_called = true; +} + +void test_server_send_and_client_recv(void) { + // register receive callback for handling incoming messages + client_channel->register_receive_callback(client_channel, client_callback_handler, NULL); + + /* create message */ + TaggedMessage port_message; + port_message.conn_id = MESSAGE_CONNECTION_ID; + const char *message = MESSAGE_CONTENT; + memcpy(port_message.payload.bytes, message, sizeof(MESSAGE_CONTENT)); + port_message.payload.size = sizeof(MESSAGE_CONTENT); + + /* send message */ + server_channel->send_blocking(server_channel, &port_message); + + /* wait for the callback */ + sleep(1); + + /* check if the callback was called */ + TEST_ASSERT_TRUE(client_callback_called); +} + +int main(void) { + UNITY_BEGIN(); + RUN_TEST(test_init_tcp_ip); + RUN_TEST(test_connect); + RUN_TEST(test_client_send_and_server_recv); + RUN_TEST(test_server_send_and_client_recv); + return UNITY_END(); +} From 0003576c1107e4cd353511e079fce26536619c97 Mon Sep 17 00:00:00 2001 From: Lasse Rosenow Date: Fri, 25 Oct 2024 17:34:48 +0000 Subject: [PATCH 09/18] Fix zephyr and posix federated examples --- examples/posix/testing_fed_conn_receiver.c | 5 +++-- examples/posix/testing_fed_conn_sender.c | 10 ++++++---- .../federated_receiver1/src/receiver.c | 5 +++-- .../federated_receiver2/src/receiver.c | 5 +++-- .../federated_sender/src/sender.c | 20 +++++++++++-------- 5 files changed, 27 insertions(+), 18 deletions(-) diff --git a/examples/posix/testing_fed_conn_receiver.c b/examples/posix/testing_fed_conn_receiver.c index 154566fc..a155b7dc 100644 --- a/examples/posix/testing_fed_conn_receiver.c +++ b/examples/posix/testing_fed_conn_receiver.c @@ -63,14 +63,15 @@ typedef struct { void RecvSenderBundle_ctor(RecvSenderBundle *self, Reactor *parent) { ConnRecv_ctor(&self->conn, parent); - TcpIpChannel_ctor(&self->channel, "127.0.0.1", PORT_NUM, AF_INET); + TcpIpChannel_ctor(&self->channel, "127.0.0.1", PORT_NUM, AF_INET, false); self->inputs[0] = &self->conn.super; NetworkChannel *channel = (NetworkChannel *)&self->channel; + channel->open_connection(channel); lf_ret_t ret; do { - ret = channel->connect(channel); + ret = channel->try_connect(channel); } while (ret != LF_OK); validate(ret == LF_OK); printf("Recv: Connected\n"); diff --git a/examples/posix/testing_fed_conn_sender.c b/examples/posix/testing_fed_conn_sender.c index 802f0d3c..e8da205e 100644 --- a/examples/posix/testing_fed_conn_sender.c +++ b/examples/posix/testing_fed_conn_sender.c @@ -106,12 +106,12 @@ typedef struct { } SenderRecvBundle; void SenderRecvConn_ctor(SenderRecvBundle *self, Sender *parent) { - TcpIpChannel_ctor(&self->channel, "127.0.0.1", PORT_NUM, AF_INET); + TcpIpChannel_ctor(&self->channel, "127.0.0.1", PORT_NUM, AF_INET, true); ConnSender_ctor(&self->conn, &parent->super, &self->super); self->output[0] = &self->conn.super; NetworkChannel *channel = (NetworkChannel *)&self->channel; - int ret = channel->bind(channel); + lf_ret_t ret = channel->open_connection(channel); if (ret != LF_OK) { printf("bind failed with %d\n", errno); exit(1); @@ -120,8 +120,10 @@ void SenderRecvConn_ctor(SenderRecvBundle *self, Sender *parent) { printf("Sender: Bound\n"); // accept one connection - bool new_connection = channel->accept(channel); - validate(new_connection); + do { + ret = channel->try_connect(channel); + } while (ret != LF_OK); + validate(ret == LF_OK); printf("Sender: Accepted\n"); self->serialize_hooks[0] = serialize_msg_t; diff --git a/examples/zephyr/basic_federated/federated_receiver1/src/receiver.c b/examples/zephyr/basic_federated/federated_receiver1/src/receiver.c index ad7c02f7..b2f0f80c 100644 --- a/examples/zephyr/basic_federated/federated_receiver1/src/receiver.c +++ b/examples/zephyr/basic_federated/federated_receiver1/src/receiver.c @@ -61,15 +61,16 @@ typedef struct { void RecvSenderBundle_ctor(RecvSenderBundle *self, Reactor *parent) { ConnRecv_ctor(&self->conn, parent); - TcpIpChannel_ctor(&self->chan, "192.168.1.100", PORT_NUM, AF_INET); + TcpIpChannel_ctor(&self->chan, "192.168.1.100", PORT_NUM, AF_INET, false); self->inputs[0] = &self->conn.super; NetworkChannel *chan = &self->chan.super; + chan->open_connection(channel); lf_ret_t ret; LF_DEBUG(ENV, "Recv: Connecting"); do { - ret = chan->connect(chan); + ret = chan->try_connect(chan); } while (ret != LF_OK); LF_DEBUG(ENV, "Recv: Connected"); diff --git a/examples/zephyr/basic_federated/federated_receiver2/src/receiver.c b/examples/zephyr/basic_federated/federated_receiver2/src/receiver.c index 50d0f1f8..51230ae8 100644 --- a/examples/zephyr/basic_federated/federated_receiver2/src/receiver.c +++ b/examples/zephyr/basic_federated/federated_receiver2/src/receiver.c @@ -61,15 +61,16 @@ typedef struct { void RecvSenderBundle_ctor(RecvSenderBundle *self, Reactor *parent) { ConnRecv_ctor(&self->conn, parent); - TcpIpChannel_ctor(&self->chan, "192.168.1.100", PORT_NUM, AF_INET); + TcpIpChannel_ctor(&self->chan, "192.168.1.100", PORT_NUM, AF_INET, false); self->inputs[0] = &self->conn.super; NetworkChannel *chan = &self->chan.super; + chan->open_connection(chan); lf_ret_t ret; LF_DEBUG(ENV, "Recv: Connecting"); do { - ret = chan->connect(chan); + ret = chan->try_connect(chan); } while (ret != LF_OK); LF_DEBUG(ENV, "Recv: Connected"); diff --git a/examples/zephyr/basic_federated/federated_sender/src/sender.c b/examples/zephyr/basic_federated/federated_sender/src/sender.c index 3be64e9b..6fcb5cd7 100644 --- a/examples/zephyr/basic_federated/federated_sender/src/sender.c +++ b/examples/zephyr/basic_federated/federated_sender/src/sender.c @@ -115,18 +115,20 @@ typedef struct { } SenderRecv2Bundle; void SenderRecv1Bundle_ctor(SenderRecv1Bundle *self, Reactor *parent) { - TcpIpChannel_ctor(&self->chan, "192.168.1.100", PORT_CONN_1, AF_INET); + TcpIpChannel_ctor(&self->chan, "192.168.1.100", PORT_CONN_1, AF_INET, true); ConnSender1_ctor(&self->conn, parent, &self->super); self->output[0] = &self->conn.super; NetworkChannel *chan = &self->chan.super; - int ret = chan->bind(chan); + lf_ret_t ret = chan->open_connection(chan); validate(ret == LF_OK); printf("Sender: Bound 1\n"); // accept one connection - bool new_connection = chan->accept(chan); - validate(new_connection); + do { + ret = chan->try_connect(chan); + } while (ret != LF_OK); + validate(ret == LF_OK); printf("Sender: Accepted 1\n"); FederatedConnectionBundle_ctor(&self->super, parent, &self->chan.super, NULL, 0, @@ -134,18 +136,20 @@ void SenderRecv1Bundle_ctor(SenderRecv1Bundle *self, Reactor *parent) { } void SenderRecv2Bundle_ctor(SenderRecv2Bundle *self, Reactor *parent) { - TcpIpChannel_ctor(&self->chan, "192.168.1.100", PORT_CONN_2, AF_INET); + TcpIpChannel_ctor(&self->chan, "192.168.1.100", PORT_CONN_2, AF_INET, true); ConnSender2_ctor(&self->conn, parent, &self->super); self->output[0] = &self->conn.super; NetworkChannel *chan = &self->chan.super; - int ret = chan->bind(chan); + lf_ret_t ret = chan->open_connection(chan); validate(ret == LF_OK); printf("Sender: Bound 2\n"); // accept one connection - bool new_connection = chan->accept(chan); - validate(new_connection); + do { + ret = chan->try_connect(chan); + } while (ret != LF_OK); + validate(ret == LF_OK); printf("Sender: Accepted 2\n"); FederatedConnectionBundle_ctor(&self->super, parent, &self->chan.super, NULL, 0, From 9dde67a0c8331176ae31503597f76c92807f9b37 Mon Sep 17 00:00:00 2001 From: Lasse Rosenow Date: Fri, 25 Oct 2024 17:38:39 +0000 Subject: [PATCH 10/18] Fix --- test/unit/network_channels_test.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/unit/network_channels_test.c b/test/unit/network_channels_test.c index 3604fc3b..7c4e7a83 100644 --- a/test/unit/network_channels_test.c +++ b/test/unit/network_channels_test.c @@ -1,3 +1,5 @@ +#define NETWORK_POSIX_TCP + #include "reactor-uc/platform/posix/tcp_ip_channel.h" #include "reactor-uc/reactor-uc.h" #include "unity.h" From 9f3d1d2c992745b58c6ce9b2160aaf7a8f8e6ec8 Mon Sep 17 00:00:00 2001 From: Lasse Rosenow Date: Fri, 25 Oct 2024 17:42:18 +0000 Subject: [PATCH 11/18] Test --- test/unit/network_channels_test.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/unit/network_channels_test.c b/test/unit/network_channels_test.c index 7c4e7a83..96b832ae 100644 --- a/test/unit/network_channels_test.c +++ b/test/unit/network_channels_test.c @@ -1,4 +1,4 @@ -#define NETWORK_POSIX_TCP +#define NETWORK_POSIX_TCP true #include "reactor-uc/platform/posix/tcp_ip_channel.h" #include "reactor-uc/reactor-uc.h" From 434ca790a366bb8fd24b18eb6f16124524b7576b Mon Sep 17 00:00:00 2001 From: Lasse Rosenow Date: Fri, 25 Oct 2024 17:47:38 +0000 Subject: [PATCH 12/18] Revert test --- test/unit/network_channels_test.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/unit/network_channels_test.c b/test/unit/network_channels_test.c index 96b832ae..3604fc3b 100644 --- a/test/unit/network_channels_test.c +++ b/test/unit/network_channels_test.c @@ -1,5 +1,3 @@ -#define NETWORK_POSIX_TCP true - #include "reactor-uc/platform/posix/tcp_ip_channel.h" #include "reactor-uc/reactor-uc.h" #include "unity.h" From 9fb286119302c1e21ce76f246d0c6e3ea639fca9 Mon Sep 17 00:00:00 2001 From: Lasse Rosenow Date: Fri, 25 Oct 2024 18:00:58 +0000 Subject: [PATCH 13/18] Make accept non blocking --- examples/riot/blinky/Makefile | 2 +- src/platform/posix/tcp_ip_channel.c | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/examples/riot/blinky/Makefile b/examples/riot/blinky/Makefile index a444960e..61759277 100755 --- a/examples/riot/blinky/Makefile +++ b/examples/riot/blinky/Makefile @@ -13,6 +13,6 @@ DEVELHELP ?= 1 QUIET ?= 1 # Enable reactor-uc features -# CFLAGS += -DNETWORK_POSIX_TCP +CFLAGS += -DNETWORK_POSIX_TCP include $(CURDIR)/../../../make/riot/riot.mk diff --git a/src/platform/posix/tcp_ip_channel.c b/src/platform/posix/tcp_ip_channel.c index 1d9b7c97..6675decc 100644 --- a/src/platform/posix/tcp_ip_channel.c +++ b/src/platform/posix/tcp_ip_channel.c @@ -4,6 +4,7 @@ #include #include +#include #include #include #include @@ -279,6 +280,11 @@ void TcpIpChannel_ctor(TcpIpChannel *self, const char *host, unsigned short port throw("Could not set SO_REUSEADDR"); } + // Set server socket to non-blocking + if (server) { + fcntl(self->fd, F_SETFL, O_NONBLOCK); + } + self->server = server; self->terminate = true; self->protocol_family = protocol_family; From 6574a65edf89c9da05209485e0054ab74ac09db0 Mon Sep 17 00:00:00 2001 From: Lasse Rosenow Date: Fri, 25 Oct 2024 19:15:14 +0000 Subject: [PATCH 14/18] Fix tcpchannel close --- .../platform/posix/tcp_ip_channel.h | 1 - src/platform/posix/tcp_ip_channel.c | 21 ++++++++++++++----- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/include/reactor-uc/platform/posix/tcp_ip_channel.h b/include/reactor-uc/platform/posix/tcp_ip_channel.h index 51c34bc1..681482f2 100644 --- a/include/reactor-uc/platform/posix/tcp_ip_channel.h +++ b/include/reactor-uc/platform/posix/tcp_ip_channel.h @@ -33,7 +33,6 @@ struct TcpIpChannel { fd_set set; bool server; - bool blocking; bool terminate; // required for callbacks diff --git a/src/platform/posix/tcp_ip_channel.c b/src/platform/posix/tcp_ip_channel.c index 6675decc..0585d9a1 100644 --- a/src/platform/posix/tcp_ip_channel.c +++ b/src/platform/posix/tcp_ip_channel.c @@ -188,7 +188,7 @@ static void TcpIpChannel_close_connection(NetworkChannel *untyped_self) { LF_DEBUG(NET, "Closing TCP/IP Channel"); TcpIpChannel *self = (TcpIpChannel *)untyped_self; - if (self->server) { + if (self->server && self->client != 0) { if (close(self->client) < 0) { LF_ERR(NET, "Error closing client socket %d", errno); } @@ -258,13 +258,22 @@ static void TcpIpChannel_free(NetworkChannel *untyped_self) { self->terminate = true; if (self->receive_thread != 0) { + int err = 0; LF_DEBUG(NET, "Stopping receive thread"); - if (pthread_cancel(self->receive_thread) != 0) { - LF_ERR(NET, "Error canceling receive thread"); + + err = pthread_cancel(self->receive_thread); + if (err != 0) { + LF_ERR(NET, "Error canceling receive thread %d", err); } - if (pthread_join(self->receive_thread, NULL) != 0) { - LF_ERR(NET, "Error joining receive thread"); + err = pthread_join(self->receive_thread, NULL); + if (err != 0) { + LF_ERR(NET, "Error joining receive thread %d", err); + } + + err = pthread_attr_destroy(&self->receive_thread_attr); + if (err != 0) { + LF_ERR(NET, "Error destroying pthread attr %d", err); } } self->super.close_connection((NetworkChannel *)self); @@ -301,4 +310,6 @@ void TcpIpChannel_ctor(TcpIpChannel *self, const char *host, unsigned short port self->super.free = TcpIpChannel_free; self->receive_callback = NULL; self->federated_connection = NULL; + + self->receive_thread = 0; } \ No newline at end of file From 638ff5ed3a7f6b05c7288c73587c23fad8fe2179 Mon Sep 17 00:00:00 2001 From: Lasse Rosenow Date: Fri, 25 Oct 2024 19:15:24 +0000 Subject: [PATCH 15/18] Improve networkchannel unit test --- test/unit/mock.c | 4 +- test/unit/network_channels_test.c | 70 ++++++++++++++++++++----------- 2 files changed, 48 insertions(+), 26 deletions(-) diff --git a/test/unit/mock.c b/test/unit/mock.c index 7ed111e1..65538679 100644 --- a/test/unit/mock.c +++ b/test/unit/mock.c @@ -1,3 +1,3 @@ #include "unity.h" -void setUp(void) {} -void tearDown(void) {} \ No newline at end of file +// void setUp(void) {} +// void tearDown(void) {} \ No newline at end of file diff --git a/test/unit/network_channels_test.c b/test/unit/network_channels_test.c index 3604fc3b..637d86d0 100644 --- a/test/unit/network_channels_test.c +++ b/test/unit/network_channels_test.c @@ -7,6 +7,8 @@ #define MESSAGE_CONTENT "Hello World1234" #define MESSAGE_CONNECTION_ID 42 +#define HOST "127.0.0.1" +#define PORT 8903 NetworkChannel *server_channel; NetworkChannel *client_channel; @@ -16,38 +18,40 @@ TcpIpChannel _client_tcp_channel; bool server_callback_called = false; bool client_callback_called = false; -void test_init_tcp_ip(void) { - const char *host = "127.0.0.1"; - unsigned short port = 8903; // NOLINT - +void setUp(void) { /* init server */ - TcpIpChannel_ctor(&_server_tcp_channel, host, port, AF_INET, true); + TcpIpChannel_ctor(&_server_tcp_channel, HOST, PORT, AF_INET, true); server_channel = &_server_tcp_channel.super; /* init client */ - TcpIpChannel_ctor(&_client_tcp_channel, host, port, AF_INET, false); + TcpIpChannel_ctor(&_client_tcp_channel, HOST, PORT, AF_INET, false); client_channel = &_client_tcp_channel.super; } -void test_connect(void) { - // open connection +void tearDown(void) { + server_channel->free(server_channel); + client_channel->free(client_channel); +} + +/* TESTS */ +void test_server_open_connection_non_blocking(void) { server_channel->open_connection(server_channel); } + +void test_client_open_connection_non_blocking(void) { client_channel->open_connection(client_channel); } + +void test_server_try_connect_non_blocking(void) { + /* open connection */ server_channel->open_connection(server_channel); + + /* try connect */ + server_channel->try_connect(server_channel); +} + +void test_client_try_connect_non_blocking(void) { + /* open connection */ client_channel->open_connection(client_channel); - // try connect - bool client_connected = false; - bool server_connected = false; - while (!client_connected && !server_connected) { - /* client tries to connect to server */ - if (client_channel->try_connect(client_channel) == LF_OK) { - client_connected = true; - } - - /* server tries to connect to client */ - if (server_channel->try_connect(server_channel) == LF_OK) { - server_connected = true; - } - } + /* try connect */ + int ret = client_channel->try_connect(client_channel); } void server_callback_handler(FederatedConnectionBundle *self, TaggedMessage *msg) { @@ -61,6 +65,14 @@ void server_callback_handler(FederatedConnectionBundle *self, TaggedMessage *msg } void test_client_send_and_server_recv(void) { + // open connection + server_channel->open_connection(server_channel); + client_channel->open_connection(client_channel); + + // try connect + client_channel->try_connect(client_channel); + server_channel->try_connect(server_channel); + // register receive callback for handling incoming messages server_channel->register_receive_callback(server_channel, server_callback_handler, NULL); @@ -92,6 +104,14 @@ void client_callback_handler(FederatedConnectionBundle *self, TaggedMessage *msg } void test_server_send_and_client_recv(void) { + // open connection + server_channel->open_connection(server_channel); + client_channel->open_connection(client_channel); + + // try connect + client_channel->try_connect(client_channel); + server_channel->try_connect(server_channel); + // register receive callback for handling incoming messages client_channel->register_receive_callback(client_channel, client_callback_handler, NULL); @@ -114,8 +134,10 @@ void test_server_send_and_client_recv(void) { int main(void) { UNITY_BEGIN(); - RUN_TEST(test_init_tcp_ip); - RUN_TEST(test_connect); + RUN_TEST(test_server_open_connection_non_blocking); + RUN_TEST(test_client_open_connection_non_blocking); + RUN_TEST(test_server_try_connect_non_blocking); + RUN_TEST(test_client_try_connect_non_blocking); RUN_TEST(test_client_send_and_server_recv); RUN_TEST(test_server_send_and_client_recv); return UNITY_END(); From 663a07def41ddb84d7211701bf3fb06befa7c359 Mon Sep 17 00:00:00 2001 From: erlingrj Date: Fri, 25 Oct 2024 12:39:45 -0700 Subject: [PATCH 16/18] Rename to tcp_channel_test as this is more accurate --- test/unit/{network_channels_test.c => tcp_channel_test.c} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename test/unit/{network_channels_test.c => tcp_channel_test.c} (100%) diff --git a/test/unit/network_channels_test.c b/test/unit/tcp_channel_test.c similarity index 100% rename from test/unit/network_channels_test.c rename to test/unit/tcp_channel_test.c From 1cd197ccb446af3fd2cf592b157bdcaac9efb534 Mon Sep 17 00:00:00 2001 From: erlingrj Date: Fri, 25 Oct 2024 12:40:09 -0700 Subject: [PATCH 17/18] Include the TcpIpChannel in the build when we build the tests (this is a hack/workaround) --- CMakeLists.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/CMakeLists.txt b/CMakeLists.txt index ef79a893..86df9763 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -30,6 +30,7 @@ endif() if(BUILD_TESTS) set(BUILD_LF_TESTS ON) set(BUILD_UNIT_TESTS ON) + set(NETWORK_POSIX_TCP ON) # TODO: This is currently needed because one of the tests uses this stack, we need a nicer way of selecting build options for tests and apps. set(CMAKE_BUILD_TYPE "Debug") find_program(CLANG_TIDY clang-tidy) if (CLANG_TIDY) From 9c59e414eeb96523b60577fb5ab6090c08e69fda Mon Sep 17 00:00:00 2001 From: erlingrj Date: Fri, 25 Oct 2024 12:40:27 -0700 Subject: [PATCH 18/18] Use weak linkage on setUp and tearDown such that they can be overridden in each test file --- test/unit/mock.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test/unit/mock.c b/test/unit/mock.c index 65538679..d792b769 100644 --- a/test/unit/mock.c +++ b/test/unit/mock.c @@ -1,3 +1,5 @@ #include "unity.h" -// void setUp(void) {} -// void tearDown(void) {} \ No newline at end of file + +// These can be overridden in the test files if needed +__attribute__((weak)) void setUp(void) {} +__attribute__((weak)) void tearDown(void) {} \ No newline at end of file