Skip to content

Commit

Permalink
add functions to set serialize/deserialize functions
Browse files Browse the repository at this point in the history
  • Loading branch information
tanneberger committed Oct 24, 2024
1 parent 66fb853 commit a0c1d75
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 7 deletions.
4 changes: 3 additions & 1 deletion examples/posix/testing_tcp_ip_channel_server_callback.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
#include <unistd.h>
TcpIpChannel channel;

void callback_handler(FederatedConnectionBundle *self, TaggedMessage *msg) {
void callback_handler(FederatedConnectionBundle *self, void *raw_msg) {
TaggedMessage* msg = raw_msg;

printf("Received message with connection number %i and content %s\n", msg->conn_id, (char *)msg->payload.bytes);
channel.super.send(&channel.super, msg);
}
Expand Down
8 changes: 6 additions & 2 deletions include/reactor-uc/encoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
#include <nanopb/pb_decode.h>
#include <nanopb/pb_encode.h>

int encode_protobuf(const TaggedMessage *message, unsigned char *buffer, size_t buffer_size) {
int encode_protobuf(const void *raw_message, unsigned char *buffer, size_t buffer_size) {
const TaggedMessage *message = raw_message;

// turing write buffer into pb_ostream buffer
pb_ostream_t stream_out = pb_ostream_from_buffer(buffer, buffer_size);

Expand All @@ -17,7 +19,9 @@ int encode_protobuf(const TaggedMessage *message, unsigned char *buffer, size_t
return (int)stream_out.bytes_written;
}

int decode_protobuf(TaggedMessage *message, const unsigned char *buffer, size_t buffer_size) {
int decode_protobuf(void *raw_message, const unsigned char *buffer, size_t buffer_size) {
TaggedMessage *message = raw_message;

pb_istream_t stream_in = pb_istream_from_buffer(buffer, buffer_size);

if (!pb_decode(&stream_in, TaggedMessage_fields, message)) {
Expand Down
8 changes: 7 additions & 1 deletion include/reactor-uc/network_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,23 @@
typedef struct FederatedConnectionBundle FederatedConnectionBundle;
typedef struct NetworkChannel NetworkChannel;

typedef int (*encode_message_hook)(const void *message, unsigned char *buffer, size_t buffer_size);
typedef int (*decode_message_hook)(void *message, const unsigned char *buffer, size_t buffer_size);

struct NetworkChannel {
lf_ret_t (*bind)(NetworkChannel *self);
lf_ret_t (*connect)(NetworkChannel *self);
bool (*accept)(NetworkChannel *self);
void (*close)(NetworkChannel *self);
void (*register_callback)(NetworkChannel *self,
void (*receive_callback)(FederatedConnectionBundle *conn, TaggedMessage *message),
void (*receive_callback)(FederatedConnectionBundle *conn, void *message),
FederatedConnectionBundle *conn);
lf_ret_t (*send)(NetworkChannel *self, TaggedMessage *message);
TaggedMessage *(*receive)(NetworkChannel *self);
void (*free)(NetworkChannel *self);

void (*register_encode_hook)(NetworkChannel *self, encode_message_hook encode_hook);
void (*register_decode_hook)(NetworkChannel *self, decode_message_hook decode_hook);
};

#endif // REACTOR_UC_NETWORK_CHANNEL_H
5 changes: 4 additions & 1 deletion include/reactor-uc/platform/posix/tcp_ip_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ struct TcpIpChannel {
char receive_thread_stack[TCP_IP_CHANNEL_RECV_THREAD_STACK_SIZE];

FederatedConnectionBundle *federated_connection;
void (*receive_callback)(FederatedConnectionBundle *conn, TaggedMessage *message);
void (*receive_callback)(FederatedConnectionBundle *conn, void *message);

decode_message_hook decode_hook;
encode_message_hook encode_hook;
};

void TcpIpChannel_ctor(TcpIpChannel *self, const char *host, unsigned short port, int protocol_family);
Expand Down
4 changes: 3 additions & 1 deletion src/federated.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ void FederatedInputConnection_ctor(FederatedInputConnection *self, Reactor *pare

// Callback registered with the NetworkChannel. Is called asynchronously when there is a
// a TaggedMessage available.
void FederatedConnectionBundle_msg_received_cb(FederatedConnectionBundle *self, TaggedMessage *msg) {
void FederatedConnectionBundle_msg_received_cb(FederatedConnectionBundle *self, void *raw_msg) {
TaggedMessage *msg = raw_msg;

LF_DEBUG(FED, "Callback on FedConnBundle %p for message with tag=%" PRId64 ":%" PRIu32, self, msg->tag.time,
msg->tag.microstep);
assert(((size_t)msg->conn_id) < self->inputs_size);
Expand Down
14 changes: 13 additions & 1 deletion src/platform/posix/tcp_ip_channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ void *TcpIpChannel_receive_thread(void *untyped_self) {
}

void TcpIpChannel_register_callback(NetworkChannel *untyped_self,
void (*receive_callback)(FederatedConnectionBundle *conn, TaggedMessage *msg),
void (*receive_callback)(FederatedConnectionBundle *conn, void *msg),
FederatedConnectionBundle *conn) {
int res;
LF_INFO(NET, "TCP/IP registering callback thread");
Expand Down Expand Up @@ -238,6 +238,14 @@ void TcpIpChannel_register_callback(NetworkChannel *untyped_self,
}
}

void TcpIpChannel_register_decode_hook(NetworkChannel *self, decode_message_hook hook) {
((TcpIpChannel *)self)->decode_hook = hook;
}

void TcpIpChannel_register_encode_hook(NetworkChannel *self, encode_message_hook hook) {
((TcpIpChannel *)self)->encode_hook = hook;
}

void TcpIpChannel_ctor(TcpIpChannel *self, const char *host, unsigned short port, int protocol_family) {
FD_ZERO(&self->set);

Expand All @@ -260,9 +268,13 @@ void TcpIpChannel_ctor(TcpIpChannel *self, const char *host, unsigned short port
self->super.receive = TcpIpChannel_receive;
self->super.send = TcpIpChannel_send;
self->super.register_callback = TcpIpChannel_register_callback;
self->super.register_decode_hook = TcpIpChannel_register_decode_hook;
self->super.register_encode_hook = TcpIpChannel_register_encode_hook;
self->super.free = TcpIpChannel_free;
self->receive_callback = NULL;
self->federated_connection = NULL;
self->decode_hook = decode_protobuf;
self->encode_hook = encode_protobuf;
}

void TcpIpChannel_free(NetworkChannel *untyped_self) {
Expand Down

0 comments on commit a0c1d75

Please sign in to comment.