Skip to content

Commit

Permalink
Update proto messages and have a start_tag_signal that is handled and…
Browse files Browse the repository at this point in the history
… used to distribute start tags
  • Loading branch information
erlingrj committed Oct 21, 2024
1 parent c0d75de commit 7a44b16
Show file tree
Hide file tree
Showing 11 changed files with 285 additions and 32 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ test: unit-test lf-test examples

# Generate protobuf code
proto:
python external/nanopb/generator/nanopb_generator.py -Iexternal/nanopb/generator/proto/ -Iexternal/proto -L'#include "nanopb/%s"' -Dexternal/proto message.proto
python3 external/nanopb/generator/nanopb_generator.py -Iexternal/nanopb/generator/proto/ -Iexternal/proto -L'#include "nanopb/%s"' -Dexternal/proto message.proto

# Build reactor-uc as a static library
lib:
Expand Down
17 changes: 17 additions & 0 deletions external/proto/message.pb.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,24 @@
PB_BIND(Tag, Tag, AUTO)


PB_BIND(StartTagSignal, StartTagSignal, AUTO)


PB_BIND(TaggedMessage, TaggedMessage, 2)


PB_BIND(RequestAbsentSignal, RequestAbsentSignal, AUTO)


PB_BIND(AbsentSignal, AbsentSignal, AUTO)


PB_BIND(ConditionalAbsentSignal, ConditionalAbsentSignal, AUTO)


PB_BIND(FederateMessage, FederateMessage, 2)





133 changes: 132 additions & 1 deletion external/proto/message.pb.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,36 +9,111 @@
#error Regenerate this file with the current version of nanopb generator.
#endif

/* Enum definitions */
typedef enum _MessageType {
MessageType_TAGGED_MESSAGE = 1,
MessageType_START_TAG_SIGNAL = 2,
MessageType_REQUEST_ABSENT_SIGNAL = 3,
MessageType_ABSENT_SIGNAL = 4,
MessageType_CONDITIONAL_ABSENT_SIGNAL = 5
} MessageType;

/* Struct definitions */
typedef struct _Tag {
int64_t time;
uint32_t microstep;
} Tag;

typedef struct _StartTagSignal {
Tag tag;
} StartTagSignal;

typedef PB_BYTES_ARRAY_T(832) TaggedMessage_payload_t;
typedef struct _TaggedMessage {
Tag tag;
int32_t conn_id;
TaggedMessage_payload_t payload;
} TaggedMessage;

typedef struct _RequestAbsentSignal {
Tag tag;
int32_t id;
} RequestAbsentSignal;

typedef struct _AbsentSignal {
Tag tag;
} AbsentSignal;

typedef struct _ConditionalAbsentSignal {
Tag tag;
int32_t id;
} ConditionalAbsentSignal;

typedef struct _FederateMessage {
MessageType type;
pb_size_t which_message;
union {
TaggedMessage tagged_message;
StartTagSignal start_tag_signal;
RequestAbsentSignal request_absent_signal;
AbsentSignal absent_signal;
ConditionalAbsentSignal conditional_absent_signal;
} message;
} FederateMessage;


#ifdef __cplusplus
extern "C" {
#endif

/* Helper constants for enums */
#define _MessageType_MIN MessageType_TAGGED_MESSAGE
#define _MessageType_MAX MessageType_CONDITIONAL_ABSENT_SIGNAL
#define _MessageType_ARRAYSIZE ((MessageType)(MessageType_CONDITIONAL_ABSENT_SIGNAL+1))







#define FederateMessage_type_ENUMTYPE MessageType


/* Initializer values for message structs */
#define Tag_init_default {0, 0}
#define StartTagSignal_init_default {Tag_init_default}
#define TaggedMessage_init_default {Tag_init_default, 0, {0, {0}}}
#define RequestAbsentSignal_init_default {Tag_init_default, 0}
#define AbsentSignal_init_default {Tag_init_default}
#define ConditionalAbsentSignal_init_default {Tag_init_default, 0}
#define FederateMessage_init_default {_MessageType_MIN, 0, {TaggedMessage_init_default}}
#define Tag_init_zero {0, 0}
#define StartTagSignal_init_zero {Tag_init_zero}
#define TaggedMessage_init_zero {Tag_init_zero, 0, {0, {0}}}
#define RequestAbsentSignal_init_zero {Tag_init_zero, 0}
#define AbsentSignal_init_zero {Tag_init_zero}
#define ConditionalAbsentSignal_init_zero {Tag_init_zero, 0}
#define FederateMessage_init_zero {_MessageType_MIN, 0, {TaggedMessage_init_zero}}

/* Field tags (for use in manual encoding/decoding) */
#define Tag_time_tag 1
#define Tag_microstep_tag 2
#define StartTagSignal_tag_tag 1
#define TaggedMessage_tag_tag 1
#define TaggedMessage_conn_id_tag 2
#define TaggedMessage_payload_tag 3
#define RequestAbsentSignal_tag_tag 1
#define RequestAbsentSignal_id_tag 2
#define AbsentSignal_tag_tag 1
#define ConditionalAbsentSignal_tag_tag 1
#define ConditionalAbsentSignal_id_tag 2
#define FederateMessage_type_tag 1
#define FederateMessage_tagged_message_tag 2
#define FederateMessage_start_tag_signal_tag 3
#define FederateMessage_request_absent_signal_tag 4
#define FederateMessage_absent_signal_tag 5
#define FederateMessage_conditional_absent_signal_tag 6

/* Struct field encoding specification for nanopb */
#define Tag_FIELDLIST(X, a) \
Expand All @@ -47,6 +122,12 @@ X(a, STATIC, REQUIRED, UINT32, microstep, 2)
#define Tag_CALLBACK NULL
#define Tag_DEFAULT NULL

#define StartTagSignal_FIELDLIST(X, a) \
X(a, STATIC, REQUIRED, MESSAGE, tag, 1)
#define StartTagSignal_CALLBACK NULL
#define StartTagSignal_DEFAULT NULL
#define StartTagSignal_tag_MSGTYPE Tag

#define TaggedMessage_FIELDLIST(X, a) \
X(a, STATIC, REQUIRED, MESSAGE, tag, 1) \
X(a, STATIC, REQUIRED, INT32, conn_id, 2) \
Expand All @@ -55,15 +136,65 @@ X(a, STATIC, REQUIRED, BYTES, payload, 3)
#define TaggedMessage_DEFAULT NULL
#define TaggedMessage_tag_MSGTYPE Tag

#define RequestAbsentSignal_FIELDLIST(X, a) \
X(a, STATIC, REQUIRED, MESSAGE, tag, 1) \
X(a, STATIC, REQUIRED, INT32, id, 2)
#define RequestAbsentSignal_CALLBACK NULL
#define RequestAbsentSignal_DEFAULT NULL
#define RequestAbsentSignal_tag_MSGTYPE Tag

#define AbsentSignal_FIELDLIST(X, a) \
X(a, STATIC, REQUIRED, MESSAGE, tag, 1)
#define AbsentSignal_CALLBACK NULL
#define AbsentSignal_DEFAULT NULL
#define AbsentSignal_tag_MSGTYPE Tag

#define ConditionalAbsentSignal_FIELDLIST(X, a) \
X(a, STATIC, REQUIRED, MESSAGE, tag, 1) \
X(a, STATIC, REQUIRED, INT32, id, 2)
#define ConditionalAbsentSignal_CALLBACK NULL
#define ConditionalAbsentSignal_DEFAULT NULL
#define ConditionalAbsentSignal_tag_MSGTYPE Tag

#define FederateMessage_FIELDLIST(X, a) \
X(a, STATIC, REQUIRED, UENUM, type, 1) \
X(a, STATIC, ONEOF, MESSAGE, (message,tagged_message,message.tagged_message), 2) \
X(a, STATIC, ONEOF, MESSAGE, (message,start_tag_signal,message.start_tag_signal), 3) \
X(a, STATIC, ONEOF, MESSAGE, (message,request_absent_signal,message.request_absent_signal), 4) \
X(a, STATIC, ONEOF, MESSAGE, (message,absent_signal,message.absent_signal), 5) \
X(a, STATIC, ONEOF, MESSAGE, (message,conditional_absent_signal,message.conditional_absent_signal), 6)
#define FederateMessage_CALLBACK NULL
#define FederateMessage_DEFAULT (const pb_byte_t*)"\x08\x01\x00"
#define FederateMessage_message_tagged_message_MSGTYPE TaggedMessage
#define FederateMessage_message_start_tag_signal_MSGTYPE StartTagSignal
#define FederateMessage_message_request_absent_signal_MSGTYPE RequestAbsentSignal
#define FederateMessage_message_absent_signal_MSGTYPE AbsentSignal
#define FederateMessage_message_conditional_absent_signal_MSGTYPE ConditionalAbsentSignal

extern const pb_msgdesc_t Tag_msg;
extern const pb_msgdesc_t StartTagSignal_msg;
extern const pb_msgdesc_t TaggedMessage_msg;
extern const pb_msgdesc_t RequestAbsentSignal_msg;
extern const pb_msgdesc_t AbsentSignal_msg;
extern const pb_msgdesc_t ConditionalAbsentSignal_msg;
extern const pb_msgdesc_t FederateMessage_msg;

/* Defines for backwards compatibility with code written before nanopb-0.4.0 */
#define Tag_fields &Tag_msg
#define StartTagSignal_fields &StartTagSignal_msg
#define TaggedMessage_fields &TaggedMessage_msg
#define RequestAbsentSignal_fields &RequestAbsentSignal_msg
#define AbsentSignal_fields &AbsentSignal_msg
#define ConditionalAbsentSignal_fields &ConditionalAbsentSignal_msg
#define FederateMessage_fields &FederateMessage_msg

/* Maximum encoded size of messages (where known) */
#define MESSAGE_PB_H_MAX_SIZE TaggedMessage_size
#define AbsentSignal_size 19
#define ConditionalAbsentSignal_size 30
#define FederateMessage_size 870
#define MESSAGE_PB_H_MAX_SIZE FederateMessage_size
#define RequestAbsentSignal_size 30
#define StartTagSignal_size 19
#define Tag_size 17
#define TaggedMessage_size 865

Expand Down
29 changes: 29 additions & 0 deletions external/proto/message.proto
Original file line number Diff line number Diff line change
@@ -1,12 +1,41 @@
import "nanopb.proto";

enum MessageType {
TAGGED_MESSAGE = 1; START_TAG_SIGNAL = 2; REQUEST_ABSENT_SIGNAL = 3; ABSENT_SIGNAL = 4; CONDITIONAL_ABSENT_SIGNAL = 5;
}

message Tag {
required int64 time = 1;
required uint32 microstep = 2;
}

message StartTagSignal { required Tag tag = 1; }

message TaggedMessage {
required Tag tag = 1;
required int32 conn_id = 2;
required bytes payload = 3 [(nanopb).max_size = 832];
}

message RequestAbsentSignal {
required Tag tag = 1;
required int32 id = 2;
}

message AbsentSignal { required Tag tag = 1; }

message ConditionalAbsentSignal {
required Tag tag = 1;
required int32 id = 2;
}

message FederateMessage {
required MessageType type = 1;
oneof message {
TaggedMessage tagged_message = 2;
StartTagSignal start_tag_signal = 3;
RequestAbsentSignal request_absent_signal = 4;
AbsentSignal absent_signal = 5;
ConditionalAbsentSignal conditional_absent_signal = 6;
}
}
8 changes: 4 additions & 4 deletions include/reactor-uc/encoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,23 @@
#include <nanopb/pb_decode.h>
#include <nanopb/pb_encode.h>

int encode_protobuf(const TaggedMessage *message, unsigned char *buffer, size_t buffer_size) {
int encode_protobuf(const FederateMessage *message, unsigned char *buffer, size_t buffer_size) {
// turing write buffer into pb_ostream buffer
pb_ostream_t stream_out = pb_ostream_from_buffer(buffer, buffer_size);

// serializing protobuf into buffer
if (!pb_encode(&stream_out, TaggedMessage_fields, message)) {
if (!pb_encode(&stream_out, FederateMessage_fields, message)) {
printf("protobuf encoding error %s\n", stream_out.errmsg);
return -1;
}

return (int)stream_out.bytes_written;
}

int decode_protobuf(TaggedMessage *message, const unsigned char *buffer, size_t buffer_size) {
int decode_protobuf(FederateMessage *message, const unsigned char *buffer, size_t buffer_size) {
pb_istream_t stream_in = pb_istream_from_buffer(buffer, buffer_size);

if (!pb_decode(&stream_in, TaggedMessage_fields, message)) {
if (!pb_decode(&stream_in, FederateMessage_fields, message)) {
return -1;
}

Expand Down
6 changes: 3 additions & 3 deletions include/reactor-uc/network_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ struct NetworkChannel {
bool (*accept)(NetworkChannel *self);
void (*close)(NetworkChannel *self);
void (*register_callback)(NetworkChannel *self,
void (*receive_callback)(FederatedConnectionBundle *conn, TaggedMessage *message),
void (*receive_callback)(FederatedConnectionBundle *conn, const FederateMessage *message),
FederatedConnectionBundle *conn);
lf_ret_t (*send)(NetworkChannel *self, TaggedMessage *message);
TaggedMessage *(*receive)(NetworkChannel *self);
lf_ret_t (*send)(NetworkChannel *self, const FederateMessage *message);
const FederateMessage *(*receive)(NetworkChannel *self);
void (*free)(NetworkChannel *self);
};

Expand Down
4 changes: 2 additions & 2 deletions include/reactor-uc/platform/posix/tcp_ip_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ struct TcpIpChannel {
unsigned short port;
int protocol_family;

TaggedMessage output;
FederateMessage output;
unsigned char write_buffer[TCP_IP_CHANNEL_BUFFERSIZE];
unsigned char read_buffer[TCP_IP_CHANNEL_BUFFERSIZE];
unsigned int read_index;
Expand All @@ -37,7 +37,7 @@ struct TcpIpChannel {
// required for callbacks
pthread_t receive_thread;
FederatedConnectionBundle *federated_connection;
void (*receive_callback)(FederatedConnectionBundle *conn, TaggedMessage *message);
void (*receive_callback)(FederatedConnectionBundle *conn, const FederateMessage *message);
};

void TcpIpChannel_ctor(TcpIpChannel *self, const char *host, unsigned short port, int protocol_family);
Expand Down
1 change: 1 addition & 0 deletions include/reactor-uc/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ struct Scheduler {
// that are registered for cleanup at the end of the current tag.
Trigger *cleanup_ll_head;
Trigger *cleanup_ll_tail;
bool leader; // Whether this scheduler is the leader in a federated program and selects the start tag.
instant_t start_time; // The physical time at which the program started.
tag_t stop_tag; // The tag at which the program should stop. This is set by the user or by the scheduler.
tag_t current_tag; // The current logical tag. Set by the scheduler and read by user in the reaction bodies.
Expand Down
Loading

0 comments on commit 7a44b16

Please sign in to comment.