Skip to content

Commit

Permalink
first jab at s4noc channel implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
tanneberger committed Jan 23, 2025
1 parent f0c75a1 commit 5e23862
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 12 deletions.
8 changes: 7 additions & 1 deletion include/reactor-uc/network_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ typedef enum {
typedef enum {
NETWORK_CHANNEL_TYPE_TCP_IP,
NETWORK_CHANNEL_TYPE_COAP_UDP_IP,
NETWORK_CHANNEL_TYPE_UART
NETWORK_CHANNEL_TYPE_UART,
NETWORK_CHANNEL_TYPE_S4NOC
} NetworkChannelType;

typedef enum {
Expand Down Expand Up @@ -151,6 +152,11 @@ struct AsyncNetworkChannel {
#error "NETWORK_POSIX_TCP not supported on FlexPRET"
#endif

#elif defined(PLATFORM_PATMOS)
#ifdef NETWORK_CHANNEL_S4NOC
#include "platform/patmos/s4noc_channel.h"
#endif

#else
#error "Platform not supported"
#endif
Expand Down
39 changes: 39 additions & 0 deletions include/reactor-uc/platform/patmos/s4noc_channel.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#ifndef REACTOR_UC_S4NOC_CHANNEL_H
#define REACTOR_UC_S4NOC_CHANNEL_H

#include "reactor-uc/network_channel.h"
#include "reactor-uc/environment.h"

typedef struct FederatedConnectionBundle FederatedConnectionBundle;
typedef struct S4NOCPollChannel S4NOCPollChannel;
typedef struct S4NOCGlobalState S4NOCGlobalState;

#define S4NOC_CHANNEL_BUFFERSIZE 1024

#ifndef S4NOC_CORE_COUNT
#define S4NOC_CORE_COUNT 4
#endif

struct S4NOCGlobalState {
S4NOCPollChannel *core_channels[S4NOC_CORE_COUNT][S4NOC_CORE_COUNT];
};

S4NOCGlobalState s4noc_global_state;

struct S4NOCPollChannel {
SyncNetworkChannel super;
NetworkChannelState state;

FederateMessage output;
unsigned char write_buffer[S4NOC_CHANNEL_BUFFERSIZE];
unsigned char receive_buffer[S4NOC_CHANNEL_BUFFERSIZE];
unsigned int receive_buffer_index;
unsigned int destination_core;

FederatedConnectionBundle *federated_connection;
void (*receive_callback)(FederatedConnectionBundle *conn, const FederateMessage *message);
};

void S4NOCPollChannel_ctor(S4NOCPollChannel *self, Environment *env, unsigned int destination_core);

#endif
3 changes: 3 additions & 0 deletions include/reactor-uc/serialization.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
int serialize_to_protobuf(const FederateMessage *message, unsigned char *buffer, size_t buffer_size);
int deserialize_from_protobuf(FederateMessage *message, const unsigned char *buffer, size_t buffer_size);

int serialize_message(const FederateMessage *message, unsigned char *buffer, size_t buffer_size);
int deserialize_message(FederateMessage *message, const unsigned char *buffer, size_t buffer_size);

lf_ret_t deserialize_payload_default(void *user_struct, const unsigned char *msg_buf, size_t msg_size);

size_t serialize_payload_default(const void *user_struct, size_t user_struct_size, unsigned char *msg_buf);
Expand Down
5 changes: 5 additions & 0 deletions src/network_channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
#ifdef NETWORK_CHANNEL_TCP_POSIX
#error "NETWORK_POSIX_TCP not supported on Patmos"
#endif

#ifdef NETWORK_CHANNEL_S4NOC
#include "platform/patmos/s4noc_channel.c"
#endif

#else
#error "Platform not supported"
#endif
Expand Down
126 changes: 126 additions & 0 deletions src/platform/patmos/s4noc_channel.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
#include "reactor-uc/platform/patmos/s4noc_channel.h"
#include "reactor-uc/logging.h"
#include "reactor-uc/serialization.h"
#include "machine/patmos.h"
#include <arpa/inet.h>

#define S4NOC_CHANNEL_ERR(fmt, ...) LF_ERR(NET, "S4NOCPollChannel: " fmt, ##__VA_ARGS__)
#define S4NOC_CHANNEL_WARN(fmt, ...) LF_WARN(NET, "S4NOCPollChannel: " fmt, ##__VA_ARGS__)
#define S4NOC_CHANNEL_INFO(fmt, ...) LF_INFO(NET, "S4NOCPollChannel: " fmt, ##__VA_ARGS__)
#define S4NOC_CHANNEL_DEBUG(fmt, ...) LF_DEBUG(NET, "S4NOCPollChannel: " fmt, ##__VA_ARGS__)

static lf_ret_t S4NOCPollChannel_open_connection(NetworkChannel *untyped_self) {
S4NOC_CHANNEL_DEBUG("Open connection");
(void)untyped_self;
return LF_OK;
}

static void S4NOCPollChannel_close_connection(NetworkChannel *untyped_self) {
S4NOC_CHANNEL_DEBUG("Close connection");
(void)untyped_self;
}

static void S4NOCPollChannel_free(NetworkChannel *untyped_self) {
S4NOC_CHANNEL_DEBUG("Free");
(void)untyped_self;
}

static bool S4NOCPollChannel_is_connected(NetworkChannel *untyped_self) {
S4NOCPollChannel *self = (S4NOCPollChannel *)untyped_self;

return self->state == NETWORK_CHANNEL_STATE_CONNECTED;
}

static lf_ret_t S4NOCPollChannel_send_blocking(NetworkChannel *untyped_self, const FederateMessage *message) {
S4NOCPollChannel *self = (S4NOCPollChannel *)untyped_self;

volatile _IODEV int *s4noc_data = (volatile _IODEV int *)(PATMOS_IO_S4NOC + 4);
volatile _IODEV int *s4noc_dest = (volatile _IODEV int *)(PATMOS_IO_S4NOC + 8);

if (self->state == NETWORK_CHANNEL_STATE_CONNECTED) {
int message_size = serialize_message(message, self->write_buffer, S4NOC_CHANNEL_BUFFERSIZE);
S4NOC_CHANNEL_DEBUG("Sending Message of Size: %i", message_size);

*s4noc_dest = self->destination_core;
int bytes_send = 0;
while (bytes_send < message_size) {
*s4noc_data = ((int *)self->write_buffer)[bytes_send / 4];
bytes_send += 4;
}

return LF_OK;
} else {
return LF_ERR;
}
}

static void S4NOCPollChannel_register_receive_callback(NetworkChannel *untyped_self,
void (*receive_callback)(FederatedConnectionBundle *conn,
const FederateMessage *msg),
FederatedConnectionBundle *conn) {
S4NOC_CHANNEL_INFO("Register receive callback");
S4NOCPollChannel *self = (S4NOCPollChannel *)untyped_self;

self->receive_callback = receive_callback;
self->federated_connection = conn;
}

void S4NOCPollChannel_poll(NetworkChannel *untyped_self) {
S4NOCPollChannel *self = (S4NOCPollChannel *)untyped_self;

volatile _IODEV int *s4noc_status = (volatile _IODEV int *)PATMOS_IO_S4NOC;
volatile _IODEV int *s4noc_data = (volatile _IODEV int *)(PATMOS_IO_S4NOC + 4);
volatile _IODEV int *s4noc_source = (volatile _IODEV int *)(PATMOS_IO_S4NOC + 8);

if (((*s4noc_status) & 0x02) == 0) {
return;
}

int value = *s4noc_data;
int source = *s4noc_source;

S4NOCPollChannel *receive_channel = s4noc_global_state.core_channels[get_cpuid()][source];

((int *)receive_channel->receive_buffer)[receive_channel->receive_buffer_index / 4] = value;
receive_channel->receive_buffer_index += 4;

int expected_message_size = ntohl(*((int *)receive_channel->receive_buffer));
if (expected_message_size + 4 > receive_channel->receive_buffer_index) {
int bytes_left = deserialize_from_protobuf(&receive_channel->output, receive_channel->receive_buffer + 1,
self->receive_buffer_index - 4);
S4NOC_CHANNEL_DEBUG("Bytes Left after attempted to deserialize %d", bytes_left);

if (bytes_left >= 0) {
int receive_buffer_index = receive_channel->receive_buffer_index;
receive_channel->receive_buffer_index = bytes_left;

if (receive_channel->receive_callback != NULL) {
S4NOC_CHANNEL_DEBUG("calling user callback!");
receive_channel->receive_callback(self->federated_connection, &self->output);
}
}
}
}

void S4NOCPollChannel_ctor(S4NOCPollChannel *self, Environment *env, unsigned int destination_core) {
assert(self != NULL);
assert(env != NULL);

self->super.super.mode = NETWORK_CHANNEL_MODE_POLL;
self->super.super.expected_connect_duration = SEC(0);
self->super.super.type = NETWORK_CHANNEL_TYPE_S4NOC;
self->super.super.is_connected = S4NOCPollChannel_is_connected;
self->super.super.open_connection = S4NOCPollChannel_open_connection;
self->super.super.close_connection = S4NOCPollChannel_close_connection;
self->super.super.send_blocking = S4NOCPollChannel_send_blocking;
self->super.super.register_receive_callback = S4NOCPollChannel_register_receive_callback;
self->super.super.free = S4NOCPollChannel_free;
self->super.poll = S4NOCPollChannel_poll;

// Concrete fields
self->receive_buffer_index = 0;
self->receive_callback = NULL;
self->federated_connection = NULL;
self->state = NETWORK_CHANNEL_STATE_CONNECTED;
self->destination_core = destination_core;
}
22 changes: 11 additions & 11 deletions src/platform/riot/uart_channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ void _UARTSyncChannel_interrupt_callback(void *arg, uint8_t received_byte) {
UARTSyncChannel *self = (UARTSyncChannel *)arg;
const uint32_t minimum_message_size = 12;

self->receive_buffer[self->receive_buffer_index] = received_byte;
self->receive_buffer_index++;
self->read_buffer[self->read_index] = received_byte;
self->read_index++;

if (self->receive_buffer_index >= minimum_message_size) {
if (self->super.super.mode == NETWORK_CHANNEL_MODE_ASYNC) {
if (self->read_index >= minimum_message_size) {
if (self->super.super.category == NETWORK_CHANNEL_CATEGORY_ASYNC) {
cond_signal(&((UARTAsyncChannel *)self)->receive_cv);
}
}
Expand All @@ -73,14 +73,14 @@ void UARTSyncChannel_poll(NetworkChannel *untyped_self) {
UARTSyncChannel *self = (UARTSyncChannel *)untyped_self;
const uint32_t minimum_message_size = 12;

while (self->receive_buffer_index > minimum_message_size) {
int bytes_left = deserialize_from_protobuf(&self->output, self->receive_buffer, self->receive_buffer_index);
while (self->read_index > minimum_message_size) {
int bytes_left = deserialize_from_protobuf(&self->output, self->read_buffer, self->read_index);
UART_CHANNEL_DEBUG("Bytes Left after attempted to deserialize %d", bytes_left);

if (bytes_left >= 0) {
int receive_buffer_index = self->receive_buffer_index;
self->receive_buffer_index = bytes_left;
memcpy(self->receive_buffer, self->receive_buffer + (receive_buffer_index - bytes_left), bytes_left);
int read_index = self->read_index;
self->read_index = bytes_left;
memcpy(self->read_buffer, self->read_buffer + (read_index - bytes_left), bytes_left);

if (self->receive_callback != NULL) {
UART_CHANNEL_DEBUG("calling user callback!");
Expand Down Expand Up @@ -110,7 +110,7 @@ void UARTSyncChannel_ctor(UARTSyncChannel *self, Environment *env, uint32_t uart

self->uart_dev = UART_DEV(uart_device);

int result = uart_init(self->uart_dev, baud, _UARTSyncChannel_interrupt_callback, self);
int result = uart_init(self->uart_dev, baud, _UARTSyncChannel_e_callback, self);

if (result == -ENODEV) {
UART_CHANNEL_ERR("Invalid UART device!");
Expand Down Expand Up @@ -142,7 +142,7 @@ void UARTSyncChannel_ctor(UARTSyncChannel *self, Environment *env, uint32_t uart
self->super.poll = UARTSyncChannel_poll;

// Concrete fields
self->receive_buffer_index = 0;
self->read_index = 0;
self->receive_callback = NULL;
self->federated_connection = NULL;
self->state = NETWORK_CHANNEL_STATE_CONNECTED;
Expand Down
7 changes: 7 additions & 0 deletions src/serialization.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,10 @@ size_t serialize_payload_default(const void *user_struct, size_t user_struct_siz
memcpy(msg_buf, user_struct, MIN(user_struct_size, 832)); // TODO: 832 is a magic number
return MIN(user_struct_size, 832); // TODO: 832 is a magic number
}

int serialize_message(const FederateMessage *message, unsigned char *buffer, size_t buffer_size) {
int message_size = serialize_to_protobuf(message, buffer + 4, buffer_size);
buffer[0] = htonl(message_size);

return message_size + 4;
}

0 comments on commit 5e23862

Please sign in to comment.