Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

serialize/deserialize hooks #97

Merged
merged 10 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 34 additions & 6 deletions examples/posix/testing_fed_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,28 @@
#define PORT_NUM 8901

typedef struct {
char msg[32];
int size;
char msg[512];
} msg_t;

lf_ret_t deserialize_msg_t(void *user_struct, const unsigned char *msg_buf, size_t msg_size) {
msg_t* msg = user_struct;
memcpy(&msg->size, msg_buf, sizeof(msg->size));
memcpy(msg->msg, msg_buf + sizeof(msg->size), sizeof(msg->size));

return LF_OK;
}

size_t serialize_msg_t(const void *user_struct, size_t user_struct_size, unsigned char *msg_buf) {
const msg_t* msg = user_struct;

memcpy(msg_buf, &msg->size, sizeof(msg->size));
memcpy(msg_buf + sizeof(msg->size), msg->msg, sizeof(msg->size));

return sizeof(msg->size) + msg->size;
}


DEFINE_TIMER_STRUCT(Timer1, 1)
DEFINE_TIMER_CTOR_FIXED(Timer1, 1, MSEC(0), SEC(1))
DEFINE_REACTION_STRUCT(Sender, 0, 1)
Expand All @@ -33,6 +52,7 @@ DEFINE_REACTION_BODY(Sender, 0) {
printf("Timer triggered @ %" PRId64 "\n", env->get_elapsed_logical_time(env));
msg_t val;
strcpy(val.msg, "Hello From Sender");
val.size = sizeof("Hello From Sender");
lf_set(out, val);
}
DEFINE_REACTION_CTOR(Sender, 0)
Expand Down Expand Up @@ -67,7 +87,7 @@ DEFINE_REACTION_BODY(Receiver, 0) {
Receiver *self = (Receiver *)_self->parent;
Environment *env = self->super.env;
In *inp = &self->inp;
printf("Input triggered @ %" PRId64 " with %s\n", env->get_elapsed_logical_time(env), inp->value.msg);
printf("Input triggered @ %" PRId64 " with %s size %i\n", env->get_elapsed_logical_time(env), inp->value.msg, inp->value.size);
}
DEFINE_REACTION_CTOR(Receiver, 0)

Expand All @@ -89,6 +109,7 @@ typedef struct {
TcpIpChannel channel;
ConnSender conn;
FederatedOutputConnection *output[1];
serialize_hook serialize_hooks[1];
} SenderRecvBundle;

void SenderRecvConn_ctor(SenderRecvBundle *self, Sender *parent) {
Expand All @@ -106,8 +127,11 @@ void SenderRecvConn_ctor(SenderRecvBundle *self, Sender *parent) {
validate(new_connection);
printf("Sender: Accepted\n");

FederatedConnectionBundle_ctor(&self->super, &parent->super, &self->channel.super, NULL, 0,
(FederatedOutputConnection **)&self->output, 1);
self->serialize_hooks[0] = serialize_msg_t;

FederatedConnectionBundle_ctor(&self->super, &parent->super, &self->channel.super,
NULL, NULL, 0,
(FederatedOutputConnection **)&self->output, self->serialize_hooks, 1);
}

DEFINE_FEDERATED_INPUT_CONNECTION(ConnRecv, 1, msg_t, 5, MSEC(100), false)
Expand All @@ -117,6 +141,7 @@ typedef struct {
TcpIpChannel channel;
ConnRecv conn;
FederatedInputConnection *inputs[1];
deserialize_hook deserialize_hooks[1];
} RecvSenderBundle;

void RecvSenderBundle_ctor(RecvSenderBundle *self, Reactor *parent) {
Expand All @@ -133,8 +158,11 @@ void RecvSenderBundle_ctor(RecvSenderBundle *self, Reactor *parent) {
validate(ret == LF_OK);
printf("Recv: Connected\n");

FederatedConnectionBundle_ctor(&self->super, parent, &self->channel.super, (FederatedInputConnection **)&self->inputs,
1, NULL, 0);
self->deserialize_hooks[0] = deserialize_msg_t;

FederatedConnectionBundle_ctor(&self->super, parent, &self->channel.super,
(FederatedInputConnection **)&self->inputs, self->deserialize_hooks, 1,
NULL, NULL, 0);
}

// Reactor main
Expand Down
14 changes: 12 additions & 2 deletions include/reactor-uc/federated.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,33 @@ typedef struct FederatedOutputConnection FederatedOutputConnection;
typedef struct FederatedInputConnection FederatedInputConnection;
typedef struct NetworkChannel NetworkChannel;

// returns how many bytes of the buffer were used by the serialized string
typedef size_t (*serialize_hook)(const void *user_struct, size_t user_struct_size, unsigned char *msg_buffer);

// returns if the deserialization was successful
typedef lf_ret_t (*deserialize_hook)(void *user_struct, const unsigned char *msg_buffer, size_t msg_size);

// Wrapping all connections going both ways between this federated and
// another federated of.
struct FederatedConnectionBundle {
Reactor *parent; // Pointer to the federate
NetworkChannel *net_channel; // Pointer to the network super doing the actual I/O
// Pointer to an array of input connections which should live in the derived struct.
FederatedInputConnection **inputs;
deserialize_hook *deserialize_hooks;
size_t inputs_size;

// Pointer to an array of output connections which should live in the derived struct.
FederatedOutputConnection **outputs;
serialize_hook *serialize_hooks;
size_t outputs_size;
bool server; // Does this federate work as server or client
};

void FederatedConnectionBundle_ctor(FederatedConnectionBundle *self, Reactor *parent, NetworkChannel *net_channel,
FederatedInputConnection **inputs, size_t inputs_size,
FederatedOutputConnection **outputs, size_t outputs_size);
FederatedInputConnection **inputs, deserialize_hook *deserialize_hooks,
size_t inputs_size, FederatedOutputConnection **outputs,
serialize_hook *serialize_hooks, size_t outputs_size);

// A single output connection from this federate. Might connect to several
// downstream ports, but all of them must be in the same federate
Expand Down
1 change: 0 additions & 1 deletion src/action.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#include "reactor-uc/logging.h"
#include "reactor-uc/trigger.h"

#include <assert.h>
#include <string.h>

void Action_cleanup(Trigger *self) {
Expand Down
67 changes: 46 additions & 21 deletions src/federated.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
#include "reactor-uc/logging.h"
#include "reactor-uc/platform.h"

#undef MIN
#define MIN(x, y) (((x) < (y)) ? (x) : (y))

// TODO: Refactor so this function is available
void LogicalConnection_trigger_downstreams(Connection *self, const void *value, size_t value_size);

Expand Down Expand Up @@ -48,8 +51,9 @@ void FederatedOutputConnection_cleanup(Trigger *trigger) {
msg.tag.time = sched->current_tag.time;
msg.tag.microstep = sched->current_tag.microstep;

memcpy(msg.payload.bytes, self->staged_payload_ptr, self->payload_pool.size);
msg.payload.size = self->payload_pool.size;
size_t msg_size = (*self->bundle->serialize_hooks[self->conn_id])(self->staged_payload_ptr, self->payload_pool.size,
msg.payload.bytes);
msg.payload.size = msg_size;

LF_DEBUG(FED, "FedOutConn %p sending message with tag=%" PRId64 ":%" PRIu32, trigger, msg.tag.time,
msg.tag.microstep);
Expand Down Expand Up @@ -141,23 +145,28 @@ void FederatedConnectionBundle_msg_received_cb(FederatedConnectionBundle *self,
if (ret != LF_OK) {
LF_ERR(FED, "Input buffer at Connection %p is full. Dropping incoming msg", input);
} else {
memcpy(payload, msg->payload.bytes, msg->payload.size);
Event event = EVENT_INIT(tag, &input->super.super, payload);
ret = sched->schedule_at_locked(sched, &event);
switch (ret) {
case LF_AFTER_STOP_TAG:
LF_WARN(FED, "Tried scheduling event after stop tag. Dropping\n");
break;
case LF_PAST_TAG:
LF_WARN(FED, "Tried scheduling event to a past tag. Dropping\n");
break;
case LF_OK:
env->platform->new_async_event(env->platform);
break;
default:
LF_ERR(FED, "Unknown return value `%d` from schedule_at_locked\n", ret);
validate(false);
break;
lf_ret_t status = (*self->deserialize_hooks[msg->conn_id])(payload, msg->payload.bytes, msg->payload.size);

if (status == LF_OK) {
Event event = EVENT_INIT(tag, &input->super.super, payload);
ret = sched->schedule_at_locked(sched, &event);
switch (ret) {
case LF_AFTER_STOP_TAG:
LF_WARN(FED, "Tried scheduling event after stop tag. Dropping\n");
break;
case LF_PAST_TAG:
LF_WARN(FED, "Tried scheduling event to a past tag. Dropping\n");
break;
case LF_OK:
env->platform->new_async_event(env->platform);
break;
default:
LF_ERR(FED, "Unknown return value `%d` from schedule_at_locked\n", ret);
validate(false);
break;
}
} else {
LF_ERR(FED, "Cannot deserialize message from other Federate. Dropping\n");
}

if (lf_tag_compare(input->last_known_tag, tag) < 0) {
Expand All @@ -169,15 +178,31 @@ void FederatedConnectionBundle_msg_received_cb(FederatedConnectionBundle *self,
env->platform->leave_critical_section(env->platform);
}

lf_ret_t standard_deserialization(void *user_struct, const unsigned char *msg_buf, size_t msg_size) {
memcpy(user_struct, msg_buf, MIN(msg_size, 832));

return LF_OK;
}

size_t standard_serialization(const void *user_struct, size_t user_struct_size, unsigned char *msg_buf) {
memcpy(msg_buf, user_struct, MIN(user_struct_size, 832));

return MIN(user_struct_size, 832);
}

void FederatedConnectionBundle_ctor(FederatedConnectionBundle *self, Reactor *parent, NetworkChannel *net_channel,
FederatedInputConnection **inputs, size_t inputs_size,
FederatedOutputConnection **outputs, size_t outputs_size) {
FederatedInputConnection **inputs, deserialize_hook *deserialize_hooks,
size_t inputs_size, FederatedOutputConnection **outputs,
serialize_hook *serialize_hooks, size_t outputs_size) {
self->inputs = inputs;
self->inputs_size = inputs_size;
self->net_channel = net_channel;
self->outputs = outputs;
self->outputs_size = outputs_size;
self->parent = parent;
self->deserialize_hooks = deserialize_hooks;
self->serialize_hooks = serialize_hooks;

// Register callback function for message received.
self->net_channel->register_callback(self->net_channel, FederatedConnectionBundle_msg_received_cb, self);
}
2 changes: 0 additions & 2 deletions src/platform/posix/posix.c
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
#include "reactor-uc/platform/posix/posix.h"
#include "reactor-uc/logging.h"
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdbool.h>
#include <string.h>
#include <time.h>

Expand Down
12 changes: 4 additions & 8 deletions src/platform/posix/tcp_ip_channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@
#include "reactor-uc/logging.h"

#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <nanopb/pb_decode.h>
#include <nanopb/pb_encode.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
Expand Down Expand Up @@ -115,7 +111,7 @@ lf_ret_t TcpIpChannel_send(NetworkChannel *untyped_self, TaggedMessage *message)

while (bytes_written < message_size && timeout > 0) {
LF_DEBUG(NET, "Sending %d bytes", message_size - bytes_written);
int bytes_send = send(socket, self->write_buffer + bytes_written, message_size - bytes_written, 0);
ssize_t bytes_send = send(socket, self->write_buffer + bytes_written, message_size - bytes_written, 0);
LF_DEBUG(NET, "%d bytes sent", bytes_send);

if (bytes_send < 0) {
Expand Down Expand Up @@ -149,13 +145,13 @@ TaggedMessage *TcpIpChannel_receive(NetworkChannel *untyped_self) {

// calculating the maximum amount of bytes we can read
int bytes_available = TCP_IP_CHANNEL_BUFFERSIZE - self->read_index;
int bytes_left = 0;
int bytes_left;
bool read_more = true;

while (read_more) {

// reading from socket
int bytes_read = recv(socket, self->read_buffer + self->read_index, bytes_available, 0);
ssize_t bytes_read = recv(socket, self->read_buffer + self->read_index, bytes_available, 0);

if (bytes_read < 0) {
LF_ERR(NET, "Error recv from socket %d", errno);
Expand Down Expand Up @@ -214,7 +210,7 @@ void TcpIpChannel_register_callback(NetworkChannel *untyped_self,
self->receive_callback = receive_callback;
self->federated_connection = conn;
memset(&self->receive_thread_stack, 0, TCP_IP_CHANNEL_RECV_THREAD_STACK_SIZE);
if (pthread_attr_init(&self->receive_thread_attr) < 0) {
if (pthread_attr_init(&self->receive_thread_attr) != 0) {
throw("pthread_attr_init failed");
}
/* TODO: RIOT posix-wrappers don't have pthread_attr_setstack yet */
Expand Down
2 changes: 0 additions & 2 deletions src/reaction.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
#include "reactor-uc/port.h"
#include "reactor-uc/trigger.h"

#include <assert.h>

static size_t calculate_input_port_level(Input *port);

size_t Reaction_get_level(Reaction *self) {
Expand Down
3 changes: 1 addition & 2 deletions src/scheduler.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include "reactor-uc/scheduler.h"
#include "reactor-uc/environment.h"
#include "reactor-uc/logging.h"
#include "reactor-uc/reactor-uc.h"

// Private functions

Expand Down Expand Up @@ -147,7 +146,7 @@ void Scheduler_terminate(Scheduler *self) {

void Scheduler_run(Scheduler *self) {
Environment *env = self->env;
lf_ret_t res = 0;
lf_ret_t res;
bool do_shutdown = false;
bool non_terminating = self->keep_alive || env->has_async_events;
LF_INFO(SCHED, "Scheduler running with non_terminating=%d has_async_events=%d", non_terminating,
Expand Down
1 change: 0 additions & 1 deletion src/tag.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
*/

#include "reactor-uc/tag.h"
#include "reactor-uc/environment.h"

instant_t lf_time_add(instant_t time, interval_t interval) {
if (time == NEVER || interval == NEVER) {
Expand Down
2 changes: 0 additions & 2 deletions src/timer.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
#include "reactor-uc/environment.h"
#include "reactor-uc/logging.h"

#include <assert.h>

void Timer_prepare(Trigger *_self, Event *event) {
(void)event;
LF_DEBUG(TRIG, "Preparing timer %p", _self);
Expand Down
4 changes: 0 additions & 4 deletions src/trigger.c
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
#include "reactor-uc/trigger.h"
#include "reactor-uc/environment.h"
#include "reactor-uc/logging.h"
#include "reactor-uc/timer.h"

#include <assert.h>

void Trigger_ctor(Trigger *self, TriggerType type, Reactor *parent, EventPayloadPool *payload_pool,
void (*prepare)(Trigger *, Event *), void (*cleanup)(Trigger *)) {
Expand Down
Loading