From 4691fc3c5899b8a42354943ce85c45f34914c3ee Mon Sep 17 00:00:00 2001 From: Tassilo Tanneberger Date: Fri, 25 Oct 2024 03:42:53 +0200 Subject: [PATCH] serialize/deserialize hooks (#97) * add functions to set serialize/deserialize functions * clean up fixing some smells * use hooks * moving serialization/deserialization into FederatedConnection * adding example with custom serialize/deserialize * fix array * old callback interface * clean up * fix * last fixes --- examples/posix/testing_fed_conn.c | 40 ++++++++++++++--- include/reactor-uc/federated.h | 14 +++++- src/action.c | 1 - src/federated.c | 67 ++++++++++++++++++++--------- src/platform/posix/posix.c | 2 - src/platform/posix/tcp_ip_channel.c | 12 ++---- src/reaction.c | 2 - src/scheduler.c | 3 +- src/tag.c | 1 - src/timer.c | 2 - src/trigger.c | 4 -- 11 files changed, 97 insertions(+), 51 deletions(-) diff --git a/examples/posix/testing_fed_conn.c b/examples/posix/testing_fed_conn.c index e92a5a9e..902a335f 100644 --- a/examples/posix/testing_fed_conn.c +++ b/examples/posix/testing_fed_conn.c @@ -7,9 +7,28 @@ #define PORT_NUM 8901 typedef struct { - char msg[32]; + int size; + char msg[512]; } msg_t; +lf_ret_t deserialize_msg_t(void *user_struct, const unsigned char *msg_buf, size_t msg_size) { + msg_t* msg = user_struct; + memcpy(&msg->size, msg_buf, sizeof(msg->size)); + memcpy(msg->msg, msg_buf + sizeof(msg->size), sizeof(msg->size)); + + return LF_OK; +} + +size_t serialize_msg_t(const void *user_struct, size_t user_struct_size, unsigned char *msg_buf) { + const msg_t* msg = user_struct; + + memcpy(msg_buf, &msg->size, sizeof(msg->size)); + memcpy(msg_buf + sizeof(msg->size), msg->msg, sizeof(msg->size)); + + return sizeof(msg->size) + msg->size; +} + + DEFINE_TIMER_STRUCT(Timer1, 1) DEFINE_TIMER_CTOR_FIXED(Timer1, 1, MSEC(0), SEC(1)) DEFINE_REACTION_STRUCT(Sender, 0, 1) @@ -33,6 +52,7 @@ DEFINE_REACTION_BODY(Sender, 0) { printf("Timer triggered @ %" PRId64 "\n", env->get_elapsed_logical_time(env)); msg_t val; strcpy(val.msg, "Hello From Sender"); + val.size = sizeof("Hello From Sender"); lf_set(out, val); } DEFINE_REACTION_CTOR(Sender, 0) @@ -67,7 +87,7 @@ DEFINE_REACTION_BODY(Receiver, 0) { Receiver *self = (Receiver *)_self->parent; Environment *env = self->super.env; In *inp = &self->inp; - printf("Input triggered @ %" PRId64 " with %s\n", env->get_elapsed_logical_time(env), inp->value.msg); + printf("Input triggered @ %" PRId64 " with %s size %i\n", env->get_elapsed_logical_time(env), inp->value.msg, inp->value.size); } DEFINE_REACTION_CTOR(Receiver, 0) @@ -89,6 +109,7 @@ typedef struct { TcpIpChannel channel; ConnSender conn; FederatedOutputConnection *output[1]; + serialize_hook serialize_hooks[1]; } SenderRecvBundle; void SenderRecvConn_ctor(SenderRecvBundle *self, Sender *parent) { @@ -106,8 +127,11 @@ void SenderRecvConn_ctor(SenderRecvBundle *self, Sender *parent) { validate(new_connection); printf("Sender: Accepted\n"); - FederatedConnectionBundle_ctor(&self->super, &parent->super, &self->channel.super, NULL, 0, - (FederatedOutputConnection **)&self->output, 1); + self->serialize_hooks[0] = serialize_msg_t; + + FederatedConnectionBundle_ctor(&self->super, &parent->super, &self->channel.super, + NULL, NULL, 0, + (FederatedOutputConnection **)&self->output, self->serialize_hooks, 1); } DEFINE_FEDERATED_INPUT_CONNECTION(ConnRecv, 1, msg_t, 5, MSEC(100), false) @@ -117,6 +141,7 @@ typedef struct { TcpIpChannel channel; ConnRecv conn; FederatedInputConnection *inputs[1]; + deserialize_hook deserialize_hooks[1]; } RecvSenderBundle; void RecvSenderBundle_ctor(RecvSenderBundle *self, Reactor *parent) { @@ -133,8 +158,11 @@ void RecvSenderBundle_ctor(RecvSenderBundle *self, Reactor *parent) { validate(ret == LF_OK); printf("Recv: Connected\n"); - FederatedConnectionBundle_ctor(&self->super, parent, &self->channel.super, (FederatedInputConnection **)&self->inputs, - 1, NULL, 0); + self->deserialize_hooks[0] = deserialize_msg_t; + + FederatedConnectionBundle_ctor(&self->super, parent, &self->channel.super, + (FederatedInputConnection **)&self->inputs, self->deserialize_hooks, 1, + NULL, NULL, 0); } // Reactor main diff --git a/include/reactor-uc/federated.h b/include/reactor-uc/federated.h index 9642d9bf..7b10f3cf 100644 --- a/include/reactor-uc/federated.h +++ b/include/reactor-uc/federated.h @@ -10,6 +10,12 @@ typedef struct FederatedOutputConnection FederatedOutputConnection; typedef struct FederatedInputConnection FederatedInputConnection; typedef struct NetworkChannel NetworkChannel; +// returns how many bytes of the buffer were used by the serialized string +typedef size_t (*serialize_hook)(const void *user_struct, size_t user_struct_size, unsigned char *msg_buffer); + +// returns if the deserialization was successful +typedef lf_ret_t (*deserialize_hook)(void *user_struct, const unsigned char *msg_buffer, size_t msg_size); + // Wrapping all connections going both ways between this federated and // another federated of. struct FederatedConnectionBundle { @@ -17,16 +23,20 @@ struct FederatedConnectionBundle { NetworkChannel *net_channel; // Pointer to the network super doing the actual I/O // Pointer to an array of input connections which should live in the derived struct. FederatedInputConnection **inputs; + deserialize_hook *deserialize_hooks; size_t inputs_size; + // Pointer to an array of output connections which should live in the derived struct. FederatedOutputConnection **outputs; + serialize_hook *serialize_hooks; size_t outputs_size; bool server; // Does this federate work as server or client }; void FederatedConnectionBundle_ctor(FederatedConnectionBundle *self, Reactor *parent, NetworkChannel *net_channel, - FederatedInputConnection **inputs, size_t inputs_size, - FederatedOutputConnection **outputs, size_t outputs_size); + FederatedInputConnection **inputs, deserialize_hook *deserialize_hooks, + size_t inputs_size, FederatedOutputConnection **outputs, + serialize_hook *serialize_hooks, size_t outputs_size); // A single output connection from this federate. Might connect to several // downstream ports, but all of them must be in the same federate diff --git a/src/action.c b/src/action.c index 589a575c..9a4b0b80 100644 --- a/src/action.c +++ b/src/action.c @@ -3,7 +3,6 @@ #include "reactor-uc/logging.h" #include "reactor-uc/trigger.h" -#include #include void Action_cleanup(Trigger *self) { diff --git a/src/federated.c b/src/federated.c index cfef3acd..0f7d10a3 100644 --- a/src/federated.c +++ b/src/federated.c @@ -3,6 +3,9 @@ #include "reactor-uc/logging.h" #include "reactor-uc/platform.h" +#undef MIN +#define MIN(x, y) (((x) < (y)) ? (x) : (y)) + // TODO: Refactor so this function is available void LogicalConnection_trigger_downstreams(Connection *self, const void *value, size_t value_size); @@ -48,8 +51,9 @@ void FederatedOutputConnection_cleanup(Trigger *trigger) { msg.tag.time = sched->current_tag.time; msg.tag.microstep = sched->current_tag.microstep; - memcpy(msg.payload.bytes, self->staged_payload_ptr, self->payload_pool.size); - msg.payload.size = self->payload_pool.size; + size_t msg_size = (*self->bundle->serialize_hooks[self->conn_id])(self->staged_payload_ptr, self->payload_pool.size, + msg.payload.bytes); + msg.payload.size = msg_size; LF_DEBUG(FED, "FedOutConn %p sending message with tag=%" PRId64 ":%" PRIu32, trigger, msg.tag.time, msg.tag.microstep); @@ -141,23 +145,28 @@ void FederatedConnectionBundle_msg_received_cb(FederatedConnectionBundle *self, if (ret != LF_OK) { LF_ERR(FED, "Input buffer at Connection %p is full. Dropping incoming msg", input); } else { - memcpy(payload, msg->payload.bytes, msg->payload.size); - Event event = EVENT_INIT(tag, &input->super.super, payload); - ret = sched->schedule_at_locked(sched, &event); - switch (ret) { - case LF_AFTER_STOP_TAG: - LF_WARN(FED, "Tried scheduling event after stop tag. Dropping\n"); - break; - case LF_PAST_TAG: - LF_WARN(FED, "Tried scheduling event to a past tag. Dropping\n"); - break; - case LF_OK: - env->platform->new_async_event(env->platform); - break; - default: - LF_ERR(FED, "Unknown return value `%d` from schedule_at_locked\n", ret); - validate(false); - break; + lf_ret_t status = (*self->deserialize_hooks[msg->conn_id])(payload, msg->payload.bytes, msg->payload.size); + + if (status == LF_OK) { + Event event = EVENT_INIT(tag, &input->super.super, payload); + ret = sched->schedule_at_locked(sched, &event); + switch (ret) { + case LF_AFTER_STOP_TAG: + LF_WARN(FED, "Tried scheduling event after stop tag. Dropping\n"); + break; + case LF_PAST_TAG: + LF_WARN(FED, "Tried scheduling event to a past tag. Dropping\n"); + break; + case LF_OK: + env->platform->new_async_event(env->platform); + break; + default: + LF_ERR(FED, "Unknown return value `%d` from schedule_at_locked\n", ret); + validate(false); + break; + } + } else { + LF_ERR(FED, "Cannot deserialize message from other Federate. Dropping\n"); } if (lf_tag_compare(input->last_known_tag, tag) < 0) { @@ -169,15 +178,31 @@ void FederatedConnectionBundle_msg_received_cb(FederatedConnectionBundle *self, env->platform->leave_critical_section(env->platform); } +lf_ret_t standard_deserialization(void *user_struct, const unsigned char *msg_buf, size_t msg_size) { + memcpy(user_struct, msg_buf, MIN(msg_size, 832)); + + return LF_OK; +} + +size_t standard_serialization(const void *user_struct, size_t user_struct_size, unsigned char *msg_buf) { + memcpy(msg_buf, user_struct, MIN(user_struct_size, 832)); + + return MIN(user_struct_size, 832); +} + void FederatedConnectionBundle_ctor(FederatedConnectionBundle *self, Reactor *parent, NetworkChannel *net_channel, - FederatedInputConnection **inputs, size_t inputs_size, - FederatedOutputConnection **outputs, size_t outputs_size) { + FederatedInputConnection **inputs, deserialize_hook *deserialize_hooks, + size_t inputs_size, FederatedOutputConnection **outputs, + serialize_hook *serialize_hooks, size_t outputs_size) { self->inputs = inputs; self->inputs_size = inputs_size; self->net_channel = net_channel; self->outputs = outputs; self->outputs_size = outputs_size; self->parent = parent; + self->deserialize_hooks = deserialize_hooks; + self->serialize_hooks = serialize_hooks; + // Register callback function for message received. self->net_channel->register_callback(self->net_channel, FederatedConnectionBundle_msg_received_cb, self); } diff --git a/src/platform/posix/posix.c b/src/platform/posix/posix.c index b100bfd0..c7faa8e6 100644 --- a/src/platform/posix/posix.c +++ b/src/platform/posix/posix.c @@ -1,9 +1,7 @@ #include "reactor-uc/platform/posix/posix.h" #include "reactor-uc/logging.h" -#include #include #include -#include #include #include diff --git a/src/platform/posix/tcp_ip_channel.c b/src/platform/posix/tcp_ip_channel.c index 4a2ff5ed..df34775e 100644 --- a/src/platform/posix/tcp_ip_channel.c +++ b/src/platform/posix/tcp_ip_channel.c @@ -3,11 +3,7 @@ #include "reactor-uc/logging.h" #include -#include #include -#include -#include -#include #include #include #include @@ -115,7 +111,7 @@ lf_ret_t TcpIpChannel_send(NetworkChannel *untyped_self, TaggedMessage *message) while (bytes_written < message_size && timeout > 0) { LF_DEBUG(NET, "Sending %d bytes", message_size - bytes_written); - int bytes_send = send(socket, self->write_buffer + bytes_written, message_size - bytes_written, 0); + ssize_t bytes_send = send(socket, self->write_buffer + bytes_written, message_size - bytes_written, 0); LF_DEBUG(NET, "%d bytes sent", bytes_send); if (bytes_send < 0) { @@ -149,13 +145,13 @@ TaggedMessage *TcpIpChannel_receive(NetworkChannel *untyped_self) { // calculating the maximum amount of bytes we can read int bytes_available = TCP_IP_CHANNEL_BUFFERSIZE - self->read_index; - int bytes_left = 0; + int bytes_left; bool read_more = true; while (read_more) { // reading from socket - int bytes_read = recv(socket, self->read_buffer + self->read_index, bytes_available, 0); + ssize_t bytes_read = recv(socket, self->read_buffer + self->read_index, bytes_available, 0); if (bytes_read < 0) { LF_ERR(NET, "Error recv from socket %d", errno); @@ -214,7 +210,7 @@ void TcpIpChannel_register_callback(NetworkChannel *untyped_self, self->receive_callback = receive_callback; self->federated_connection = conn; memset(&self->receive_thread_stack, 0, TCP_IP_CHANNEL_RECV_THREAD_STACK_SIZE); - if (pthread_attr_init(&self->receive_thread_attr) < 0) { + if (pthread_attr_init(&self->receive_thread_attr) != 0) { throw("pthread_attr_init failed"); } /* TODO: RIOT posix-wrappers don't have pthread_attr_setstack yet */ diff --git a/src/reaction.c b/src/reaction.c index 5144712b..84d7a5ac 100644 --- a/src/reaction.c +++ b/src/reaction.c @@ -3,8 +3,6 @@ #include "reactor-uc/port.h" #include "reactor-uc/trigger.h" -#include - static size_t calculate_input_port_level(Input *port); size_t Reaction_get_level(Reaction *self) { diff --git a/src/scheduler.c b/src/scheduler.c index 09f75b89..1706c3af 100644 --- a/src/scheduler.c +++ b/src/scheduler.c @@ -1,7 +1,6 @@ #include "reactor-uc/scheduler.h" #include "reactor-uc/environment.h" #include "reactor-uc/logging.h" -#include "reactor-uc/reactor-uc.h" // Private functions @@ -147,7 +146,7 @@ void Scheduler_terminate(Scheduler *self) { void Scheduler_run(Scheduler *self) { Environment *env = self->env; - lf_ret_t res = 0; + lf_ret_t res; bool do_shutdown = false; bool non_terminating = self->keep_alive || env->has_async_events; LF_INFO(SCHED, "Scheduler running with non_terminating=%d has_async_events=%d", non_terminating, diff --git a/src/tag.c b/src/tag.c index e69d8226..a770859e 100644 --- a/src/tag.c +++ b/src/tag.c @@ -9,7 +9,6 @@ */ #include "reactor-uc/tag.h" -#include "reactor-uc/environment.h" instant_t lf_time_add(instant_t time, interval_t interval) { if (time == NEVER || interval == NEVER) { diff --git a/src/timer.c b/src/timer.c index 4bf24aa9..d6e60db0 100644 --- a/src/timer.c +++ b/src/timer.c @@ -2,8 +2,6 @@ #include "reactor-uc/environment.h" #include "reactor-uc/logging.h" -#include - void Timer_prepare(Trigger *_self, Event *event) { (void)event; LF_DEBUG(TRIG, "Preparing timer %p", _self); diff --git a/src/trigger.c b/src/trigger.c index 7e2c4025..395d135d 100644 --- a/src/trigger.c +++ b/src/trigger.c @@ -1,9 +1,5 @@ #include "reactor-uc/trigger.h" #include "reactor-uc/environment.h" -#include "reactor-uc/logging.h" -#include "reactor-uc/timer.h" - -#include void Trigger_ctor(Trigger *self, TriggerType type, Reactor *parent, EventPayloadPool *payload_pool, void (*prepare)(Trigger *, Event *), void (*cleanup)(Trigger *)) {