Skip to content

Commit

Permalink
serialize/deserialize hooks (#97)
Browse files Browse the repository at this point in the history
* add functions to set serialize/deserialize functions

* clean up fixing some smells

* use hooks

* moving serialization/deserialization into FederatedConnection

* adding example with custom serialize/deserialize

* fix array

* old callback interface

* clean up

* fix

* last fixes
  • Loading branch information
tanneberger authored Oct 25, 2024
1 parent b2c7758 commit 4691fc3
Show file tree
Hide file tree
Showing 11 changed files with 97 additions and 51 deletions.
40 changes: 34 additions & 6 deletions examples/posix/testing_fed_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,28 @@
#define PORT_NUM 8901

typedef struct {
char msg[32];
int size;
char msg[512];
} msg_t;

lf_ret_t deserialize_msg_t(void *user_struct, const unsigned char *msg_buf, size_t msg_size) {
msg_t* msg = user_struct;
memcpy(&msg->size, msg_buf, sizeof(msg->size));
memcpy(msg->msg, msg_buf + sizeof(msg->size), sizeof(msg->size));

return LF_OK;
}

size_t serialize_msg_t(const void *user_struct, size_t user_struct_size, unsigned char *msg_buf) {
const msg_t* msg = user_struct;

memcpy(msg_buf, &msg->size, sizeof(msg->size));
memcpy(msg_buf + sizeof(msg->size), msg->msg, sizeof(msg->size));

return sizeof(msg->size) + msg->size;
}


DEFINE_TIMER_STRUCT(Timer1, 1)
DEFINE_TIMER_CTOR_FIXED(Timer1, 1, MSEC(0), SEC(1))
DEFINE_REACTION_STRUCT(Sender, 0, 1)
Expand All @@ -33,6 +52,7 @@ DEFINE_REACTION_BODY(Sender, 0) {
printf("Timer triggered @ %" PRId64 "\n", env->get_elapsed_logical_time(env));
msg_t val;
strcpy(val.msg, "Hello From Sender");
val.size = sizeof("Hello From Sender");
lf_set(out, val);
}
DEFINE_REACTION_CTOR(Sender, 0)
Expand Down Expand Up @@ -67,7 +87,7 @@ DEFINE_REACTION_BODY(Receiver, 0) {
Receiver *self = (Receiver *)_self->parent;
Environment *env = self->super.env;
In *inp = &self->inp;
printf("Input triggered @ %" PRId64 " with %s\n", env->get_elapsed_logical_time(env), inp->value.msg);
printf("Input triggered @ %" PRId64 " with %s size %i\n", env->get_elapsed_logical_time(env), inp->value.msg, inp->value.size);
}
DEFINE_REACTION_CTOR(Receiver, 0)

Expand All @@ -89,6 +109,7 @@ typedef struct {
TcpIpChannel channel;
ConnSender conn;
FederatedOutputConnection *output[1];
serialize_hook serialize_hooks[1];
} SenderRecvBundle;

void SenderRecvConn_ctor(SenderRecvBundle *self, Sender *parent) {
Expand All @@ -106,8 +127,11 @@ void SenderRecvConn_ctor(SenderRecvBundle *self, Sender *parent) {
validate(new_connection);
printf("Sender: Accepted\n");

FederatedConnectionBundle_ctor(&self->super, &parent->super, &self->channel.super, NULL, 0,
(FederatedOutputConnection **)&self->output, 1);
self->serialize_hooks[0] = serialize_msg_t;

FederatedConnectionBundle_ctor(&self->super, &parent->super, &self->channel.super,
NULL, NULL, 0,
(FederatedOutputConnection **)&self->output, self->serialize_hooks, 1);
}

DEFINE_FEDERATED_INPUT_CONNECTION(ConnRecv, 1, msg_t, 5, MSEC(100), false)
Expand All @@ -117,6 +141,7 @@ typedef struct {
TcpIpChannel channel;
ConnRecv conn;
FederatedInputConnection *inputs[1];
deserialize_hook deserialize_hooks[1];
} RecvSenderBundle;

void RecvSenderBundle_ctor(RecvSenderBundle *self, Reactor *parent) {
Expand All @@ -133,8 +158,11 @@ void RecvSenderBundle_ctor(RecvSenderBundle *self, Reactor *parent) {
validate(ret == LF_OK);
printf("Recv: Connected\n");

FederatedConnectionBundle_ctor(&self->super, parent, &self->channel.super, (FederatedInputConnection **)&self->inputs,
1, NULL, 0);
self->deserialize_hooks[0] = deserialize_msg_t;

FederatedConnectionBundle_ctor(&self->super, parent, &self->channel.super,
(FederatedInputConnection **)&self->inputs, self->deserialize_hooks, 1,
NULL, NULL, 0);
}

// Reactor main
Expand Down
14 changes: 12 additions & 2 deletions include/reactor-uc/federated.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,33 @@ typedef struct FederatedOutputConnection FederatedOutputConnection;
typedef struct FederatedInputConnection FederatedInputConnection;
typedef struct NetworkChannel NetworkChannel;

// returns how many bytes of the buffer were used by the serialized string
typedef size_t (*serialize_hook)(const void *user_struct, size_t user_struct_size, unsigned char *msg_buffer);

// returns if the deserialization was successful
typedef lf_ret_t (*deserialize_hook)(void *user_struct, const unsigned char *msg_buffer, size_t msg_size);

// Wrapping all connections going both ways between this federated and
// another federated of.
struct FederatedConnectionBundle {
Reactor *parent; // Pointer to the federate
NetworkChannel *net_channel; // Pointer to the network super doing the actual I/O
// Pointer to an array of input connections which should live in the derived struct.
FederatedInputConnection **inputs;
deserialize_hook *deserialize_hooks;
size_t inputs_size;

// Pointer to an array of output connections which should live in the derived struct.
FederatedOutputConnection **outputs;
serialize_hook *serialize_hooks;
size_t outputs_size;
bool server; // Does this federate work as server or client
};

void FederatedConnectionBundle_ctor(FederatedConnectionBundle *self, Reactor *parent, NetworkChannel *net_channel,
FederatedInputConnection **inputs, size_t inputs_size,
FederatedOutputConnection **outputs, size_t outputs_size);
FederatedInputConnection **inputs, deserialize_hook *deserialize_hooks,
size_t inputs_size, FederatedOutputConnection **outputs,
serialize_hook *serialize_hooks, size_t outputs_size);

// A single output connection from this federate. Might connect to several
// downstream ports, but all of them must be in the same federate
Expand Down
1 change: 0 additions & 1 deletion src/action.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#include "reactor-uc/logging.h"
#include "reactor-uc/trigger.h"

#include <assert.h>
#include <string.h>

void Action_cleanup(Trigger *self) {
Expand Down
67 changes: 46 additions & 21 deletions src/federated.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
#include "reactor-uc/logging.h"
#include "reactor-uc/platform.h"

#undef MIN
#define MIN(x, y) (((x) < (y)) ? (x) : (y))

// TODO: Refactor so this function is available
void LogicalConnection_trigger_downstreams(Connection *self, const void *value, size_t value_size);

Expand Down Expand Up @@ -48,8 +51,9 @@ void FederatedOutputConnection_cleanup(Trigger *trigger) {
msg.tag.time = sched->current_tag.time;
msg.tag.microstep = sched->current_tag.microstep;

memcpy(msg.payload.bytes, self->staged_payload_ptr, self->payload_pool.size);
msg.payload.size = self->payload_pool.size;
size_t msg_size = (*self->bundle->serialize_hooks[self->conn_id])(self->staged_payload_ptr, self->payload_pool.size,
msg.payload.bytes);
msg.payload.size = msg_size;

LF_DEBUG(FED, "FedOutConn %p sending message with tag=%" PRId64 ":%" PRIu32, trigger, msg.tag.time,
msg.tag.microstep);
Expand Down Expand Up @@ -141,23 +145,28 @@ void FederatedConnectionBundle_msg_received_cb(FederatedConnectionBundle *self,
if (ret != LF_OK) {
LF_ERR(FED, "Input buffer at Connection %p is full. Dropping incoming msg", input);
} else {
memcpy(payload, msg->payload.bytes, msg->payload.size);
Event event = EVENT_INIT(tag, &input->super.super, payload);
ret = sched->schedule_at_locked(sched, &event);
switch (ret) {
case LF_AFTER_STOP_TAG:
LF_WARN(FED, "Tried scheduling event after stop tag. Dropping\n");
break;
case LF_PAST_TAG:
LF_WARN(FED, "Tried scheduling event to a past tag. Dropping\n");
break;
case LF_OK:
env->platform->new_async_event(env->platform);
break;
default:
LF_ERR(FED, "Unknown return value `%d` from schedule_at_locked\n", ret);
validate(false);
break;
lf_ret_t status = (*self->deserialize_hooks[msg->conn_id])(payload, msg->payload.bytes, msg->payload.size);

if (status == LF_OK) {
Event event = EVENT_INIT(tag, &input->super.super, payload);
ret = sched->schedule_at_locked(sched, &event);
switch (ret) {
case LF_AFTER_STOP_TAG:
LF_WARN(FED, "Tried scheduling event after stop tag. Dropping\n");
break;
case LF_PAST_TAG:
LF_WARN(FED, "Tried scheduling event to a past tag. Dropping\n");
break;
case LF_OK:
env->platform->new_async_event(env->platform);
break;
default:
LF_ERR(FED, "Unknown return value `%d` from schedule_at_locked\n", ret);
validate(false);
break;
}
} else {
LF_ERR(FED, "Cannot deserialize message from other Federate. Dropping\n");
}

if (lf_tag_compare(input->last_known_tag, tag) < 0) {
Expand All @@ -169,15 +178,31 @@ void FederatedConnectionBundle_msg_received_cb(FederatedConnectionBundle *self,
env->platform->leave_critical_section(env->platform);
}

lf_ret_t standard_deserialization(void *user_struct, const unsigned char *msg_buf, size_t msg_size) {
memcpy(user_struct, msg_buf, MIN(msg_size, 832));

return LF_OK;
}

size_t standard_serialization(const void *user_struct, size_t user_struct_size, unsigned char *msg_buf) {
memcpy(msg_buf, user_struct, MIN(user_struct_size, 832));

return MIN(user_struct_size, 832);
}

void FederatedConnectionBundle_ctor(FederatedConnectionBundle *self, Reactor *parent, NetworkChannel *net_channel,
FederatedInputConnection **inputs, size_t inputs_size,
FederatedOutputConnection **outputs, size_t outputs_size) {
FederatedInputConnection **inputs, deserialize_hook *deserialize_hooks,
size_t inputs_size, FederatedOutputConnection **outputs,
serialize_hook *serialize_hooks, size_t outputs_size) {
self->inputs = inputs;
self->inputs_size = inputs_size;
self->net_channel = net_channel;
self->outputs = outputs;
self->outputs_size = outputs_size;
self->parent = parent;
self->deserialize_hooks = deserialize_hooks;
self->serialize_hooks = serialize_hooks;

// Register callback function for message received.
self->net_channel->register_callback(self->net_channel, FederatedConnectionBundle_msg_received_cb, self);
}
2 changes: 0 additions & 2 deletions src/platform/posix/posix.c
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
#include "reactor-uc/platform/posix/posix.h"
#include "reactor-uc/logging.h"
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdbool.h>
#include <string.h>
#include <time.h>

Expand Down
12 changes: 4 additions & 8 deletions src/platform/posix/tcp_ip_channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@
#include "reactor-uc/logging.h"

#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <nanopb/pb_decode.h>
#include <nanopb/pb_encode.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
Expand Down Expand Up @@ -115,7 +111,7 @@ lf_ret_t TcpIpChannel_send(NetworkChannel *untyped_self, TaggedMessage *message)

while (bytes_written < message_size && timeout > 0) {
LF_DEBUG(NET, "Sending %d bytes", message_size - bytes_written);
int bytes_send = send(socket, self->write_buffer + bytes_written, message_size - bytes_written, 0);
ssize_t bytes_send = send(socket, self->write_buffer + bytes_written, message_size - bytes_written, 0);
LF_DEBUG(NET, "%d bytes sent", bytes_send);

if (bytes_send < 0) {
Expand Down Expand Up @@ -149,13 +145,13 @@ TaggedMessage *TcpIpChannel_receive(NetworkChannel *untyped_self) {

// calculating the maximum amount of bytes we can read
int bytes_available = TCP_IP_CHANNEL_BUFFERSIZE - self->read_index;
int bytes_left = 0;
int bytes_left;
bool read_more = true;

while (read_more) {

// reading from socket
int bytes_read = recv(socket, self->read_buffer + self->read_index, bytes_available, 0);
ssize_t bytes_read = recv(socket, self->read_buffer + self->read_index, bytes_available, 0);

if (bytes_read < 0) {
LF_ERR(NET, "Error recv from socket %d", errno);
Expand Down Expand Up @@ -214,7 +210,7 @@ void TcpIpChannel_register_callback(NetworkChannel *untyped_self,
self->receive_callback = receive_callback;
self->federated_connection = conn;
memset(&self->receive_thread_stack, 0, TCP_IP_CHANNEL_RECV_THREAD_STACK_SIZE);
if (pthread_attr_init(&self->receive_thread_attr) < 0) {
if (pthread_attr_init(&self->receive_thread_attr) != 0) {
throw("pthread_attr_init failed");
}
/* TODO: RIOT posix-wrappers don't have pthread_attr_setstack yet */
Expand Down
2 changes: 0 additions & 2 deletions src/reaction.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
#include "reactor-uc/port.h"
#include "reactor-uc/trigger.h"

#include <assert.h>

static size_t calculate_input_port_level(Input *port);

size_t Reaction_get_level(Reaction *self) {
Expand Down
3 changes: 1 addition & 2 deletions src/scheduler.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include "reactor-uc/scheduler.h"
#include "reactor-uc/environment.h"
#include "reactor-uc/logging.h"
#include "reactor-uc/reactor-uc.h"

// Private functions

Expand Down Expand Up @@ -147,7 +146,7 @@ void Scheduler_terminate(Scheduler *self) {

void Scheduler_run(Scheduler *self) {
Environment *env = self->env;
lf_ret_t res = 0;
lf_ret_t res;
bool do_shutdown = false;
bool non_terminating = self->keep_alive || env->has_async_events;
LF_INFO(SCHED, "Scheduler running with non_terminating=%d has_async_events=%d", non_terminating,
Expand Down
1 change: 0 additions & 1 deletion src/tag.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
*/

#include "reactor-uc/tag.h"
#include "reactor-uc/environment.h"

instant_t lf_time_add(instant_t time, interval_t interval) {
if (time == NEVER || interval == NEVER) {
Expand Down
2 changes: 0 additions & 2 deletions src/timer.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
#include "reactor-uc/environment.h"
#include "reactor-uc/logging.h"

#include <assert.h>

void Timer_prepare(Trigger *_self, Event *event) {
(void)event;
LF_DEBUG(TRIG, "Preparing timer %p", _self);
Expand Down
4 changes: 0 additions & 4 deletions src/trigger.c
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
#include "reactor-uc/trigger.h"
#include "reactor-uc/environment.h"
#include "reactor-uc/logging.h"
#include "reactor-uc/timer.h"

#include <assert.h>

void Trigger_ctor(Trigger *self, TriggerType type, Reactor *parent, EventPayloadPool *payload_pool,
void (*prepare)(Trigger *, Event *), void (*cleanup)(Trigger *)) {
Expand Down

0 comments on commit 4691fc3

Please sign in to comment.