Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shutdown #98

Merged
merged 8 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions examples/posix/testing_fed_conn_receiver.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#include "reactor-uc/platform/posix/tcp_ip_channel.h"
#include "reactor-uc/reactor-uc.h"

#include <pthread.h>
#include <sys/socket.h>

#define PORT_NUM 8901

typedef struct {
int size;
char msg[512];
} msg_t;

lf_ret_t deserialize_msg_t(void *user_struct, const unsigned char *msg_buf, size_t msg_size) {
msg_t *msg = user_struct;
memcpy(&msg->size, msg_buf, sizeof(msg->size));
memcpy(msg->msg, msg_buf + sizeof(msg->size), sizeof(msg->size));

return LF_OK;
}

DEFINE_REACTION_STRUCT(Receiver, 0, 1)
DEFINE_INPUT_PORT_STRUCT(In, 1, msg_t, 0)
DEFINE_INPUT_PORT_CTOR(In, 1, msg_t, 0)

typedef struct {
Reactor super;
Receiver_Reaction0 reaction;
In inp;
int cnt;
Reaction *_reactions[1];
Trigger *_triggers[1];
} Receiver;

DEFINE_REACTION_BODY(Receiver, 0) {
Receiver *self = (Receiver *)_self->parent;
Environment *env = self->super.env;
In *inp = &self->inp;
printf("Input triggered @ %" PRId64 " with %s\n", env->get_elapsed_logical_time(env), inp->value.msg);
}
DEFINE_REACTION_CTOR(Receiver, 0)

void Receiver_ctor(Receiver *self, Reactor *parent, Environment *env) {
self->_reactions[0] = (Reaction *)&self->reaction;
self->_triggers[0] = (Trigger *)&self->inp;
Reactor_ctor(&self->super, "Receiver", env, parent, NULL, 0, self->_reactions, 1, self->_triggers, 1);
Receiver_Reaction0_ctor(&self->reaction, &self->super);
In_ctor(&self->inp, &self->super);

// Register reaction as an effect of in
INPUT_REGISTER_EFFECT(self->inp, self->reaction);
}

DEFINE_FEDERATED_INPUT_CONNECTION(ConnRecv, 1, msg_t, 5, MSEC(100), false)

typedef struct {
FederatedConnectionBundle super;
TcpIpChannel channel;
ConnRecv conn;
FederatedInputConnection *inputs[1];
deserialize_hook deserialize_hooks[1];
} RecvSenderBundle;

void RecvSenderBundle_ctor(RecvSenderBundle *self, Reactor *parent) {
ConnRecv_ctor(&self->conn, parent);
TcpIpChannel_ctor(&self->channel, "127.0.0.1", PORT_NUM, AF_INET);
self->inputs[0] = &self->conn.super;

NetworkChannel *channel = (NetworkChannel *)&self->channel;

lf_ret_t ret;
do {
ret = channel->connect(channel);
} while (ret != LF_OK);
validate(ret == LF_OK);
printf("Recv: Connected\n");
self->deserialize_hooks[0] = deserialize_msg_t;

FederatedConnectionBundle_ctor(&self->super, parent, &self->channel.super, (FederatedInputConnection **)&self->inputs,
self->deserialize_hooks, 1, NULL, NULL, 0);
}

typedef struct {
Reactor super;
Receiver receiver;
RecvSenderBundle bundle;

FederatedConnectionBundle *_bundles[1];
Reactor *_children[1];
} MainRecv;

void MainRecv_ctor(MainRecv *self, Environment *env) {
self->_children[0] = &self->receiver.super;
Receiver_ctor(&self->receiver, &self->super, env);

RecvSenderBundle_ctor(&self->bundle, &self->super);
self->_bundles[0] = &self->bundle.super;

CONN_REGISTER_DOWNSTREAM(self->bundle.conn, self->receiver.inp);
Reactor_ctor(&self->super, "MainRecv", env, NULL, self->_children, 1, NULL, 0, NULL, 0);
}

ENTRY_POINT_FEDERATED(MainRecv, SEC(1), true, true, 1)

int main() {
lf_start();
return 0;
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "reactor-uc/platform/posix/tcp_ip_channel.h"
#include "reactor-uc/reactor-uc.h"

#include <errno.h>
#include <pthread.h>
#include <sys/socket.h>

Expand All @@ -11,24 +12,15 @@ typedef struct {
char msg[512];
} msg_t;

lf_ret_t deserialize_msg_t(void *user_struct, const unsigned char *msg_buf, size_t msg_size) {
msg_t* msg = user_struct;
memcpy(&msg->size, msg_buf, sizeof(msg->size));
memcpy(msg->msg, msg_buf + sizeof(msg->size), sizeof(msg->size));

return LF_OK;
}

size_t serialize_msg_t(const void *user_struct, size_t user_struct_size, unsigned char *msg_buf) {
const msg_t* msg = user_struct;
const msg_t *msg = user_struct;

memcpy(msg_buf, &msg->size, sizeof(msg->size));
memcpy(msg_buf + sizeof(msg->size), msg->msg, sizeof(msg->size));

return sizeof(msg->size) + msg->size;
}


DEFINE_TIMER_STRUCT(Timer1, 1)
DEFINE_TIMER_CTOR_FIXED(Timer1, 1, MSEC(0), SEC(1))
DEFINE_REACTION_STRUCT(Sender, 0, 1)
Expand Down Expand Up @@ -57,7 +49,7 @@ DEFINE_REACTION_BODY(Sender, 0) {
}
DEFINE_REACTION_CTOR(Sender, 0)

void Sender_ctor(Sender *self, Reactor *parent, Environment *env, Connection** conn_out, size_t conn_out_num) {
void Sender_ctor(Sender *self, Reactor *parent, Environment *env, Connection **conn_out, size_t conn_out_num) {
self->_reactions[0] = (Reaction *)&self->reaction;
self->_triggers[0] = (Trigger *)&self->timer;
Reactor_ctor(&self->super, "Sender", env, parent, NULL, 0, self->_reactions, 1, self->_triggers, 1);
Expand Down Expand Up @@ -87,7 +79,8 @@ DEFINE_REACTION_BODY(Receiver, 0) {
Receiver *self = (Receiver *)_self->parent;
Environment *env = self->super.env;
In *inp = &self->inp;
printf("Input triggered @ %" PRId64 " with %s size %i\n", env->get_elapsed_logical_time(env), inp->value.msg, inp->value.size);
printf("Input triggered @ %" PRId64 " with %s size %i\n", env->get_elapsed_logical_time(env), inp->value.msg,
inp->value.size);
}
DEFINE_REACTION_CTOR(Receiver, 0)

Expand Down Expand Up @@ -119,6 +112,10 @@ void SenderRecvConn_ctor(SenderRecvBundle *self, Sender *parent) {

NetworkChannel *channel = (NetworkChannel *)&self->channel;
int ret = channel->bind(channel);
if (ret != LF_OK) {
printf("bind failed with %d\n", errno);
exit(1);
}
validate(ret == LF_OK);
printf("Sender: Bound\n");

Expand All @@ -129,44 +126,11 @@ void SenderRecvConn_ctor(SenderRecvBundle *self, Sender *parent) {

self->serialize_hooks[0] = serialize_msg_t;

FederatedConnectionBundle_ctor(&self->super, &parent->super, &self->channel.super,
NULL, NULL, 0,
FederatedConnectionBundle_ctor(&self->super, &parent->super, &self->channel.super, NULL, NULL, 0,
(FederatedOutputConnection **)&self->output, self->serialize_hooks, 1);
}

DEFINE_FEDERATED_INPUT_CONNECTION(ConnRecv, 1, msg_t, 5, MSEC(100), false)

typedef struct {
FederatedConnectionBundle super;
TcpIpChannel channel;
ConnRecv conn;
FederatedInputConnection *inputs[1];
deserialize_hook deserialize_hooks[1];
} RecvSenderBundle;

void RecvSenderBundle_ctor(RecvSenderBundle *self, Reactor *parent) {
ConnRecv_ctor(&self->conn, parent);
TcpIpChannel_ctor(&self->channel, "127.0.0.1", PORT_NUM, AF_INET);
self->inputs[0] = &self->conn.super;

NetworkChannel *channel = (NetworkChannel *)&self->channel;

lf_ret_t ret;
do {
ret = channel->connect(channel);
} while (ret != LF_OK);
validate(ret == LF_OK);
printf("Recv: Connected\n");

self->deserialize_hooks[0] = deserialize_msg_t;

FederatedConnectionBundle_ctor(&self->super, parent, &self->channel.super,
(FederatedInputConnection **)&self->inputs, self->deserialize_hooks, 1,
NULL, NULL, 0);
}

// Reactor main

typedef struct {
Reactor super;
Sender sender;
Expand All @@ -178,15 +142,6 @@ typedef struct {
Connection *_conn_sender_out[1];
} MainSender;

typedef struct {
Reactor super;
Receiver receiver;
RecvSenderBundle bundle;

FederatedConnectionBundle *_bundles[1];
Reactor *_children[1];
} MainRecv;

void MainSender_ctor(MainSender *self, Environment *env) {
self->_children[0] = &self->sender.super;
Sender_ctor(&self->sender, &self->super, env, self->_conn_sender_out, 1);
Expand All @@ -196,60 +151,9 @@ void MainSender_ctor(MainSender *self, Environment *env) {
CONN_REGISTER_UPSTREAM(self->bundle.conn, self->sender.out);
Reactor_ctor(&self->super, "MainSender", env, NULL, self->_children, 1, NULL, 0, NULL, 0);
}

void MainRecv_ctor(MainRecv *self, Environment *env) {
self->_children[0] = &self->receiver.super;
Receiver_ctor(&self->receiver, &self->super, env);

RecvSenderBundle_ctor(&self->bundle, &self->super);
self->_bundles[0] = &self->bundle.super;

CONN_REGISTER_DOWNSTREAM(self->bundle.conn, self->receiver.inp);
Reactor_ctor(&self->super, "MainRecv", env, NULL, self->_children, 1, NULL, 0, NULL, 0);
}

ENTRY_POINT_FEDERATED(MainSender, SEC(1), true, false, 1)
ENTRY_POINT_FEDERATED(MainRecv, SEC(1), true, true, 1)

void *recv_thread(void *unused) {
(void)unused;
lf_MainRecv_start();
return NULL;
}
void *sender_thread(void *unused) {
(void)unused;
lf_MainSender_start();
return NULL;
}

void lf_exit(void) {
Environment_free(&MainSender_env);
Environment_free(&MainRecv_env);
}

int main() {
pthread_t thread1;
pthread_t thread2;
if (atexit(lf_exit) != 0) {
validate(false);
}

// Create the first thread running func1
if (pthread_create(&thread1, NULL, recv_thread, NULL)) {
printf("Error creating thread 1\n");
return 1;
}

// Create the second thread running func2
if (pthread_create(&thread2, NULL, sender_thread, NULL)) {
printf("Error creating thread 2\n");
return 1;
}

// Wait for both threads to finish
pthread_join(thread1, NULL);
pthread_join(thread2, NULL);

printf("Both threads have finished\n");
lf_start();
return 0;
}
2 changes: 2 additions & 0 deletions include/reactor-uc/environment.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ struct Environment {

void (*enter_critical_section)(Environment *self);
void (*leave_critical_section)(Environment *self);

void (*request_shutdown)(Environment *self);
};

void Environment_ctor(Environment *self, Reactor *main);
Expand Down
1 change: 0 additions & 1 deletion include/reactor-uc/logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

// Add color codes to the output
#define LF_COLORIZE_LOGS 1
#define LF_LOG_LEVEL_ALL LF_LOG_LEVEL_ERR

// The default log level for any unspecified module
#ifndef LF_LOG_LEVEL_ALL
Expand Down
34 changes: 18 additions & 16 deletions include/reactor-uc/macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,32 +257,34 @@ typedef struct FederatedInputConnection FederatedInputConnection;
#define ENTRY_POINT(MainReactorName, Timeout, KeepAlive) \
MainReactorName main_reactor; \
Environment env; \
void lf_exit(void) { Environment_free(&env); } \
void lf_start() { \
Environment_ctor(&env, (Reactor *)&main_reactor); \
MainReactorName##_ctor(&main_reactor, &env); \
env.scheduler.set_timeout(&env.scheduler, Timeout); \
env.assemble(&env); \
env.scheduler.keep_alive = KeepAlive; \
env.start(&env); \
lf_exit(); \
}

#define ENTRY_POINT_FEDERATED(FederateName, Timeout, KeepAlive, HasInputs, NumBundles) \
FederateName FederateName##_main; \
Environment FederateName##_env; \
void lf_##FederateName##_start() { \
Environment *env = &FederateName##_env; \
FederateName *main = &FederateName##_main; \
Environment_ctor(env, (Reactor *)main); \
env->scheduler.set_timeout(&env->scheduler, Timeout); \
env->scheduler.keep_alive = KeepAlive; \
env->has_async_events = HasInputs; \
env->enter_critical_section(env); \
FederateName##_ctor(main, env); \
env->net_bundles_size = NumBundles; \
env->net_bundles = (FederatedConnectionBundle **)&main->_bundles; \
env->assemble(env); \
env->leave_critical_section(env); \
env->start(env); \
FederateName main_reactor; \
Environment env; \
void lf_exit(void) { Environment_free(&env); } \
void lf_start() { \
Environment_ctor(&env, (Reactor *)&main_reactor); \
env.scheduler.set_timeout(&env.scheduler, Timeout); \
env.scheduler.keep_alive = KeepAlive; \
env.has_async_events = HasInputs; \
env.enter_critical_section(&env); \
FederateName##_ctor(&main_reactor, &env); \
env.net_bundles_size = NumBundles; \
env.net_bundles = (FederatedConnectionBundle **)&main_reactor._bundles; \
env.assemble(&env); \
env.leave_critical_section(&env); \
env.start(&env); \
lf_exit(); \
}

// TODO: The following macro is defined to avoid compiler warnings. Ideally we would
Expand Down
4 changes: 3 additions & 1 deletion include/reactor-uc/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ struct Scheduler {
/**
* @brief Called to execute all reactions triggered by a shutdown trigger.
*/
void (*terminate)(Scheduler *self);
void (*do_shutdown)(Scheduler *self, tag_t stop_tag);

void (*request_shutdown)(Scheduler *self);

/**
* @brief Set the stop tag of the program based on a timeout duration.
Expand Down
3 changes: 3 additions & 0 deletions src/environment.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ void Environment_leave_critical_section(Environment *self) {
}
}

void Environment_request_shutdown(Environment *self) { self->scheduler.request_shutdown(&self->scheduler); }

void Environment_ctor(Environment *self, Reactor *main) {
self->main = main;
self->platform = Platform_new();
Expand All @@ -72,6 +74,7 @@ void Environment_ctor(Environment *self, Reactor *main) {
self->get_elapsed_physical_time = Environment_get_elapsed_physical_time;
self->leave_critical_section = Environment_leave_critical_section;
self->enter_critical_section = Environment_enter_critical_section;
self->request_shutdown = Environment_request_shutdown;
self->has_async_events = false;
self->startup = NULL;
self->shutdown = NULL;
Expand Down
Loading
Loading