From bba5cd01ebac27685250c4f4d6aaf96fa483f6c2 Mon Sep 17 00:00:00 2001 From: Tassilo Tanneberger Date: Fri, 18 Oct 2024 00:59:37 +0200 Subject: [PATCH 1/2] Macro Madness (#69) * first wave of macro magic * Macro Reactions * adding physical and logical actions * fixing problem with actions * clang format * merging construct and define * unifing api of timers * default entry point * new way to express reactions --- include/reactor-uc/macros.h | 157 ++++++++++++++++++++++++++++++ test/unit/action_microstep_test.c | 60 +++--------- test/unit/action_test.c | 57 +++-------- test/unit/delayed_conn_test.c | 109 ++++++--------------- test/unit/physical_action_test.c | 103 +++++--------------- test/unit/port_test.c | 110 +++++++-------------- test/unit/shutdown_test.c | 77 +++++---------- test/unit/startup_test.c | 43 +++----- test/unit/timer_test.c | 32 ++---- 9 files changed, 322 insertions(+), 426 deletions(-) diff --git a/include/reactor-uc/macros.h b/include/reactor-uc/macros.h index c224c200..93c2e077 100644 --- a/include/reactor-uc/macros.h +++ b/include/reactor-uc/macros.h @@ -90,6 +90,163 @@ ((Connection *)&(conn))->upstream = (Port *)&(up); \ } while (0) +// Convenience macro to register upstream and downstream on a connection +#define CONNECT(ConnectionVariable, SourcePort, DestinationPort) \ + CONN_REGISTER_UPSTREAM(ConnectionVariable, SourcePort); \ + CONN_REGISTER_DOWNSTREAM(ConnectionVariable, DestinationPort) + +typedef struct Output Output; + +#define DEFINE_OUTPUT_PORT(PortName, SourceSize) \ + typedef struct { \ + Output super; \ + Reaction *sources[(SourceSize)]; \ + } PortName; \ + \ + void PortName##_ctor(PortName *self, Reactor *parent) { \ + Output_ctor(&self->super, parent, self->sources, SourceSize); \ + } + +typedef struct Input Input; + +#define DEFINE_INPUT_PORT(PortName, EffectSize, BufferType, BufferSize) \ + typedef struct { \ + Input super; \ + Reaction *effects[(EffectSize)]; \ + BufferType buffer[(BufferSize)]; \ + } PortName; \ + \ + void PortName##_ctor(PortName *self, Reactor *parent) { \ + Input_ctor(&self->super, parent, self->effects, (EffectSize), self->buffer, sizeof(self->buffer[0])); \ + } + +typedef struct Timer Timer; + +#define DEFINE_TIMER(TimerName, EffectSize, Offset, Period) \ + typedef struct { \ + Timer super; \ + Reaction *effects[(EffectSize)]; \ + } TimerName; \ + \ + void TimerName##_ctor(TimerName *self, Reactor *parent) { \ + Timer_ctor(&self->super, parent, Offset, Period, self->effects, EffectSize); \ + } + +typedef struct Reaction Reaction; + +#define DEFINE_REACTION(ReactorName, ReactionIndex, EffectSize) \ + typedef struct { \ + Reaction super; \ + Trigger *effects[(EffectSize)]; \ + } ReactorName##_##ReactionIndex; + +#define REACTION_BODY(ReactorName, ReactionIndex, ReactionBody) \ + void ReactorName##_body_##ReactionIndex(Reaction *_self) { \ + ReactorName *self = (ReactorName *)_self->parent; \ + Environment *env = self->super.env; \ + ReactionBody \ + } \ + void ReactorName##_##ReactionIndex##_ctor(ReactorName##_##ReactionIndex *self, Reactor *parent) { \ + Reaction_ctor(&self->super, parent, ReactorName##_body_##ReactionIndex, self->effects, \ + sizeof(self->effects) / sizeof(self->effects[0]), ReactionIndex); \ + } + +typedef struct Startup Startup; + +#define DEFINE_STARTUP(StartupName, EffectSize) \ + typedef struct { \ + Startup super; \ + Reaction *effects[(EffectSize)]; \ + } StartupName; \ + \ + void StartupName##_ctor(StartupName *self, Reactor *parent) { \ + Startup_ctor(&self->super, parent, self->effects, sizeof(self->effects) / sizeof(self->effects[0])); \ + } + +typedef struct Shutdown Shutdown; + +#define DEFINE_SHUTDOWN(ShutdownName, EffectSize) \ + typedef struct { \ + Shutdown super; \ + Reaction *effects[(EffectSize)]; \ + } ShutdownName; \ + \ + void ShutdownName##_ctor(ShutdownName *self, Reactor *parent) { \ + Shutdown_ctor(&self->super, parent, self->effects, sizeof(self->effects) / sizeof(self->effects[0])); \ + } + +typedef struct LogicalAction LogicalAction; + +#define DEFINE_LOGICAL_ACTION(ActionName, EffectSize, SourceSize, BufferTyp, BufferSize, Offset, Spacing) \ + typedef struct { \ + LogicalAction super; \ + BufferTyp buffer[(BufferSize) + 1]; \ + Reaction *sources[(SourceSize)]; \ + Reaction *effects[(EffectSize)]; \ + } ActionName; \ + \ + void ActionName##_ctor(ActionName *self, Reactor *parent) { \ + LogicalAction_ctor(&self->super, Offset, Spacing, parent, self->sources, \ + sizeof(self->sources) / sizeof(self->sources[0]), self->effects, \ + sizeof(self->effects) / sizeof(self->effects[0]), &self->buffer, sizeof(self->buffer[0]), \ + sizeof(self->buffer) / sizeof(self->buffer[0])); \ + } + +typedef struct PhysicalAction PhysicalAction; + +#define DEFINE_PHYSICAL_ACTION(ActionName, EffectSize, SourceSize, BufferTyp, BufferSize, Offset, Spacing) \ + typedef struct { \ + PhysicalAction super; \ + BufferTyp buffer[(BufferSize)]; \ + Reaction *sources[(SourceSize)]; \ + Reaction *effects[(EffectSize)]; \ + } ActionName; \ + \ + void ActionName##_ctor(ActionName *self, Reactor *parent) { \ + PhysicalAction_ctor(&self->super, Offset, Spacing, parent, self->sources, \ + sizeof(self->sources) / sizeof(self->sources[0]), self->effects, \ + sizeof(self->effects) / sizeof(self->effects[0]), &self->buffer, sizeof(self->buffer[0]), \ + sizeof(self->buffer) / sizeof(self->buffer[0])); \ + } + +typedef struct LogicalConnection LogicalConnection; + +#define DEFINE_LOGICAL_CONNECTION(ConnectionName, DownstreamSize) \ + typedef struct { \ + LogicalConnection super; \ + Input *downstreams[(DownstreamSize)]; \ + } ConnectionName; \ + \ + void ConnectionName##_ctor(ConnectionName *self, Reactor *parent) { \ + LogicalConnection_ctor(&self->super, parent, (Port **)self->downstreams, \ + sizeof(self->downstreams) / sizeof(self->downstreams[0])); \ + } + +typedef struct DelayedConnection DelayedConnection; + +#define DEFINE_DELAYED_CONNECTION(ConnectionName, DownstreamSize, BufferType, BufferSize, Delay) \ + typedef struct { \ + DelayedConnection super; \ + BufferType buffer[(BufferSize)]; \ + Input *downstreams[(BufferSize)]; \ + } ConnectionName; \ + \ + void ConnectionName##_ctor(ConnectionName *self, Reactor *parent) { \ + DelayedConnection_ctor(&self->super, parent, (Port **)self->downstreams, \ + sizeof(self->downstreams) / sizeof(self->downstreams[0]), Delay, self->buffer, \ + sizeof(self->buffer[0]), sizeof(self->buffer) / sizeof(self->buffer[0])); \ + } + +#define ENTRY_POINT(MainReactorName) \ + void lf_start() { \ + MainReactorName main_reactor; \ + Environment env; \ + Environment_ctor(&env, (Reactor *)&main_reactor); \ + MyReactor_ctor(&main_reactor, &env); \ + env.assemble(&env); \ + env.start(&env); \ + } + // TODO: The following macro is defined to avoid compiler warnings. Ideally we would // not have to specify any alignment on any structs. It is a TODO to understand exactly why // the compiler complains and what we can do about it. diff --git a/test/unit/action_microstep_test.c b/test/unit/action_microstep_test.c index 32e297dd..ce876289 100644 --- a/test/unit/action_microstep_test.c +++ b/test/unit/action_microstep_test.c @@ -1,50 +1,23 @@ #include "reactor-uc/reactor-uc.h" #include "unity.h" -typedef struct { - LogicalAction super; - int buffer[1]; - - Reaction *sources[1]; - Reaction *effects[1]; -} MyAction; - -typedef struct MyStartup MyStartup; - -struct MyStartup { - Startup super; - Reaction *effects_[1]; -}; +DEFINE_LOGICAL_ACTION(MyAction, 1, 1, int, 1, MSEC(0), MSEC(0)); +DEFINE_STARTUP(MyStartup, 1); +DEFINE_REACTION(MyReactor, 0, 1); typedef struct { - Reaction super; - Trigger *effects[1]; -} MyReaction; - -struct MyReactor { Reactor super; - MyReaction my_reaction; + MyReactor_0 my_reaction; MyAction my_action; MyStartup startup; Reaction *_reactions[1]; Trigger *_triggers[2]; int cnt; -}; - -void MyAction_ctor(MyAction *self, struct MyReactor *parent) { - LogicalAction_ctor(&self->super, MSEC(0), MSEC(0), &parent->super, self->sources, 1, self->effects, 1, &self->buffer, - sizeof(self->buffer[0]), 2); -} +} MyReactor ; -void MyStartup_ctor(struct MyStartup *self, Reactor *parent, Reaction *effects) { - self->effects_[0] = effects; - Startup_ctor(&self->super, parent, self->effects_, 1); -} - -void action_handler(Reaction *_self) { - struct MyReactor *self = (struct MyReactor *)_self->parent; - Environment *env = self->super.env; +REACTION_BODY(MyReactor, 0, { MyAction *my_action = &self->my_action; + if (self->cnt == 0) { TEST_ASSERT_EQUAL(lf_is_present(my_action), false); } else { @@ -66,28 +39,27 @@ void action_handler(Reaction *_self) { if (self->cnt < 100) { lf_schedule(my_action, ++self->cnt, 0); } -} +}) -void MyReaction_ctor(MyReaction *self, Reactor *parent) { - Reaction_ctor(&self->super, parent, action_handler, self->effects, 1, 0); -} - -void MyReactor_ctor(struct MyReactor *self, Environment *env) { +void MyReactor_ctor(MyReactor *self, Environment *env) { self->_reactions[0] = (Reaction *)&self->my_reaction; self->_triggers[0] = (Trigger *)&self->startup; self->_triggers[1] = (Trigger *)&self->my_action; + Reactor_ctor(&self->super, "MyReactor", env, NULL, NULL, 0, self->_reactions, 1, self->_triggers, 2); - MyAction_ctor(&self->my_action, self); - MyReaction_ctor(&self->my_reaction, &self->super); - MyStartup_ctor(&self->startup, &self->super, &self->my_reaction.super); + MyAction_ctor(&self->my_action, &self->super); + MyReactor_0_ctor(&self->my_reaction, &self->super); + MyStartup_ctor(&self->startup, &self->super); + ACTION_REGISTER_EFFECT(self->my_action, self->my_reaction); REACTION_REGISTER_EFFECT(self->my_reaction, self->my_action); ACTION_REGISTER_SOURCE(self->my_action, self->my_reaction); + STARTUP_REGISTER_EFFECT(self->startup, self->my_reaction); self->cnt = 0; } void test_simple() { - struct MyReactor my_reactor; + MyReactor my_reactor; Environment env; Environment_ctor(&env, (Reactor *)&my_reactor); MyReactor_ctor(&my_reactor, &env); diff --git a/test/unit/action_test.c b/test/unit/action_test.c index e6a0796e..4e4248cd 100644 --- a/test/unit/action_test.c +++ b/test/unit/action_test.c @@ -1,48 +1,21 @@ #include "reactor-uc/reactor-uc.h" #include "unity.h" -typedef struct { - LogicalAction super; - int buffer[2]; - - Reaction *sources[1]; - Reaction *effects[1]; -} MyAction; - -typedef struct MyStartup MyStartup; - -struct MyStartup { - Startup super; - Reaction *effects_[1]; -}; +DEFINE_LOGICAL_ACTION(MyAction, 1, 1, int, 2, MSEC(0), MSEC(0)); +DEFINE_STARTUP(MyStartup, 1); +DEFINE_REACTION(MyReactor, 0, 1); typedef struct { - Reaction super; - Trigger *effects[1]; -} MyReaction; - -struct MyReactor { Reactor super; - MyReaction my_reaction; + MyReactor_0 my_reaction; MyAction my_action; MyStartup startup; Reaction *_reactions[1]; Trigger *_triggers[2]; int cnt; -}; - -void MyAction_ctor(MyAction *self, struct MyReactor *parent) { - LogicalAction_ctor(&self->super, MSEC(0), MSEC(0), &parent->super, self->sources, 1, self->effects, 1, &self->buffer, - sizeof(self->buffer[0]), 2); -} +} MyReactor ; -void MyStartup_ctor(struct MyStartup *self, Reactor *parent, Reaction *effects) { - self->effects_[0] = effects; - Startup_ctor(&self->super, parent, self->effects_, 1); -} - -void action_handler(Reaction *_self) { - struct MyReactor *self = (struct MyReactor *)_self->parent; +REACTION_BODY(MyReactor, 0, { MyAction *my_action = &self->my_action; if (self->cnt == 0) { TEST_ASSERT_EQUAL(lf_is_present(my_action), false); @@ -57,28 +30,26 @@ void action_handler(Reaction *_self) { } lf_schedule(my_action, ++self->cnt, MSEC(100)); -} +}) -void MyReaction_ctor(MyReaction *self, Reactor *parent) { - Reaction_ctor(&self->super, parent, action_handler, self->effects, 1, 0); -} - -void MyReactor_ctor(struct MyReactor *self, Environment *env) { +void MyReactor_ctor(MyReactor *self, Environment *env) { self->_reactions[0] = (Reaction *)&self->my_reaction; self->_triggers[0] = (Trigger *)&self->startup; self->_triggers[1] = (Trigger *)&self->my_action; Reactor_ctor(&self->super, "MyReactor", env, NULL, NULL, 0, self->_reactions, 1, self->_triggers, 2); - MyAction_ctor(&self->my_action, self); - MyReaction_ctor(&self->my_reaction, &self->super); - MyStartup_ctor(&self->startup, &self->super, &self->my_reaction.super); + MyAction_ctor(&self->my_action, &self->super); + MyReactor_0_ctor(&self->my_reaction, &self->super); + MyStartup_ctor(&self->startup, &self->super); ACTION_REGISTER_EFFECT(self->my_action, self->my_reaction); REACTION_REGISTER_EFFECT(self->my_reaction, self->my_action); ACTION_REGISTER_SOURCE(self->my_action, self->my_reaction); + STARTUP_REGISTER_EFFECT(self->startup, self->my_reaction); + self->cnt = 0; } void test_simple() { - struct MyReactor my_reactor; + MyReactor my_reactor; Environment env; Environment_ctor(&env, (Reactor *)&my_reactor); MyReactor_ctor(&my_reactor, &env); diff --git a/test/unit/delayed_conn_test.c b/test/unit/delayed_conn_test.c index 9295d666..7ddb2cc9 100644 --- a/test/unit/delayed_conn_test.c +++ b/test/unit/delayed_conn_test.c @@ -1,56 +1,35 @@ #include "reactor-uc/reactor-uc.h" #include "unity.h" -// Reactor Sender -typedef struct { - Timer super; - Reaction *effects[1]; -} Timer1; - -typedef struct { - Reaction super; -} Reaction1; +// Components of Reactor Sender +DEFINE_TIMER(Timer1, 1, 0, MSEC(100)) +DEFINE_REACTION(Sender, 0, 0); +DEFINE_OUTPUT_PORT(Out, 1) typedef struct { - Output super; - Reaction *sources[1]; - interval_t value; -} Out; - -struct Sender { Reactor super; - Reaction1 reaction; + Sender_0 reaction; Timer1 timer; Out out; Reaction *_reactions[1]; Trigger *_triggers[1]; -}; +} Sender; -void timer_handler(Reaction *_self) { - struct Sender *self = (struct Sender *)_self->parent; - Environment *env = self->super.env; +REACTION_BODY(Sender, 0, { Out *out = &self->out; printf("Timer triggered @ %ld\n", env->get_elapsed_logical_time(env)); lf_set(out, env->get_elapsed_logical_time(env)); -} +}) -void Reaction1_ctor(Reaction1 *self, Reactor *parent) { - Reaction_ctor(&self->super, parent, timer_handler, NULL, 0, 0); -} - -void Out_ctor(Out *self, struct Sender *parent) { - self->sources[0] = &parent->reaction.super; - Output_ctor(&self->super, &parent->super, self->sources, 1); -} - -void Sender_ctor(struct Sender *self, Reactor *parent, Environment *env) { +void Sender_ctor(Sender *self, Reactor *parent, Environment *env) { self->_reactions[0] = (Reaction *)&self->reaction; self->_triggers[0] = (Trigger *)&self->timer; Reactor_ctor(&self->super, "Sender", env, parent, NULL, 0, self->_reactions, 1, self->_triggers, 1); - Reaction1_ctor(&self->reaction, &self->super); - Timer_ctor(&self->timer.super, &self->super, 0, MSEC(100), self->timer.effects, 1); - Out_ctor(&self->out, self); + Sender_0_ctor(&self->reaction, &self->super); + Timer1_ctor(&self->timer, &self->super); + Out_ctor(&self->out, &self->super); + TIMER_REGISTER_EFFECT(self->timer, self->reaction); // Register reaction as a source for out @@ -58,75 +37,50 @@ void Sender_ctor(struct Sender *self, Reactor *parent, Environment *env) { } // Reactor Receiver -typedef struct { - Reaction super; -} Reaction2; +DEFINE_REACTION(Receiver, 0, 0) +DEFINE_INPUT_PORT(In, 1, interval_t, 1) typedef struct { - Input super; - interval_t buffer[1]; - Reaction *effects[1]; -} In; - -struct Receiver { Reactor super; - Reaction2 reaction; + Receiver_0 reaction; In inp; int cnt; Reaction *_reactions[1]; Trigger *_triggers[1]; -}; +} Receiver ; -void In_ctor(In *self, struct Receiver *parent) { - Input_ctor(&self->super, &parent->super, self->effects, 1, self->buffer, sizeof(self->buffer[0])); -} - -void input_handler(Reaction *_self) { - struct Receiver *self = (struct Receiver *)_self->parent; - Environment *env = self->super.env; +REACTION_BODY(Receiver, 0, { In *inp = &self->inp; printf("Input triggered @ %ld with %ld\n", env->get_elapsed_logical_time(env), lf_get(inp)); TEST_ASSERT_EQUAL(lf_get(inp) + MSEC(150), env->get_elapsed_logical_time(env)); -} +}) -void Reaction2_ctor(Reaction2 *self, Reactor *parent) { - Reaction_ctor(&self->super, parent, input_handler, NULL, 0, 0); -} - -void Receiver_ctor(struct Receiver *self, Reactor *parent, Environment *env) { +void Receiver_ctor(Receiver *self, Reactor *parent, Environment *env) { self->_reactions[0] = (Reaction *)&self->reaction; self->_triggers[0] = (Trigger *)&self->inp; Reactor_ctor(&self->super, "Receiver", env, parent, NULL, 0, self->_reactions, 1, self->_triggers, 1); - Reaction2_ctor(&self->reaction, &self->super); - In_ctor(&self->inp, self); + Receiver_0_ctor(&self->reaction, &self->super); + In_ctor(&self->inp, &self->super); // Register reaction as an effect of in INPUT_REGISTER_EFFECT(self->inp, self->reaction); } -struct Conn1 { - DelayedConnection super; - interval_t buffer[2]; - Input *downstreams[1]; -}; +DEFINE_DELAYED_CONNECTION(Conn1, 1, interval_t, 1, MSEC(150)) -void Conn1_ctor(struct Conn1 *self, Reactor *parent) { - DelayedConnection_ctor(&self->super, parent, (Port **)self->downstreams, 1, MSEC(150), self->buffer, - sizeof(self->buffer[0]), 2); -} // Reactor main -struct Main { +typedef struct { Reactor super; - struct Sender sender; - struct Receiver receiver; - struct Conn1 conn; + Sender sender; + Receiver receiver; + Conn1 conn; Reactor *_children[2]; -}; +} Main; -void Main_ctor(struct Main *self, Environment *env) { +void Main_ctor(Main *self, Environment *env) { self->_children[0] = &self->sender.super; Sender_ctor(&self->sender, &self->super, env); @@ -134,14 +88,13 @@ void Main_ctor(struct Main *self, Environment *env) { Receiver_ctor(&self->receiver, &self->super, env); Conn1_ctor(&self->conn, &self->super); - CONN_REGISTER_UPSTREAM(self->conn, self->sender.out); - CONN_REGISTER_DOWNSTREAM(self->conn, self->receiver.inp); + CONNECT(self->conn, self->sender.out, self->receiver.inp); Reactor_ctor(&self->super, "Main", env, NULL, self->_children, 2, NULL, 0, NULL, 0); } void test_simple() { - struct Main main; + Main main; Environment env; Environment_ctor(&env, (Reactor *)&main); Main_ctor(&main, &env); diff --git a/test/unit/physical_action_test.c b/test/unit/physical_action_test.c index e1b01b29..274bcb3b 100644 --- a/test/unit/physical_action_test.c +++ b/test/unit/physical_action_test.c @@ -4,57 +4,26 @@ Environment env; -typedef struct { - PhysicalAction super; - int buffer[1]; - Reaction *sources[1]; - Reaction *effects[1]; -} MyAction; - -typedef struct MyStartup MyStartup; -struct MyStartup { - Startup super; - Reaction *effects_[1]; -}; - -typedef struct { - Shutdown super; - Reaction *effects_[1]; -} MyShutdown; - -typedef struct { - Reaction super; -} ShutdownReaction; - -typedef struct { - Reaction super; - Trigger *effects[1]; -} MyReaction; +DEFINE_PHYSICAL_ACTION(MyAction, 1, 1, int, 1, MSEC(0), MSEC(0)) +DEFINE_STARTUP(MyStartup, 1) +DEFINE_SHUTDOWN(MyShutdown, 1) +DEFINE_REACTION(MyReactor, 0, 1) +DEFINE_REACTION(MyReactor, 1, 1) +DEFINE_REACTION(MyReactor, 2, 0) typedef struct { - Reaction super; - Trigger *effects[1]; -} StartupReaction; - -struct MyReactor { Reactor super; - StartupReaction startup_reaction; - ShutdownReaction shutdown_reaction; - MyReaction my_reaction; + MyReactor_0 startup_reaction; + MyReactor_1 my_reaction; + MyReactor_2 shutdown_reaction; MyAction my_action; MyStartup startup; MyShutdown shutdown; Reaction *_reactions[3]; Trigger *_triggers[3]; int cnt; -}; +} MyReactor; -void MyAction_ctor(MyAction *self, struct MyReactor *parent) { - self->sources[0] = &parent->startup_reaction.super; - self->effects[0] = &parent->my_reaction.super; - PhysicalAction_ctor(&self->super, MSEC(0), MSEC(0), &parent->super, self->sources, 1, self->effects, 1, self->buffer, - sizeof(self->buffer[0]), 2); -} bool run_thread = true; void *async_action_scheduler(void *_action) { MyAction *action = (MyAction *)_action; @@ -67,45 +36,27 @@ void *async_action_scheduler(void *_action) { } pthread_t thread; -void startup_handler(Reaction *_self) { - struct MyReactor *self = (struct MyReactor *)_self->parent; + +REACTION_BODY(MyReactor, 0, { MyAction *action = &self->my_action; pthread_create(&thread, NULL, async_action_scheduler, (void *)action); -} +}); -void shutdown_handler(Reaction *_self) { - (void)_self; - run_thread = false; - void *retval; - int ret = pthread_join(thread, &retval); -} - -void MyStartup_ctor(struct MyStartup *self, Reactor *parent) { Startup_ctor(&self->super, parent, self->effects_, 1); } - -void MyShutdown_ctor(MyShutdown *self, Reactor *parent) { Shutdown_ctor(&self->super, parent, self->effects_, 1); } - -void StartupReaction_ctor(StartupReaction *self, Reactor *parent) { - Reaction_ctor(&self->super, parent, startup_handler, self->effects, 1, 0); -} - -void ShutdownReaction_ctor(ShutdownReaction *self, Reactor *parent) { - Reaction_ctor(&self->super, parent, shutdown_handler, NULL, 0, 2); -} - -void action_handler(Reaction *_self) { - struct MyReactor *self = (struct MyReactor *)_self->parent; +REACTION_BODY(MyReactor, 1, { MyAction *my_action = &self->my_action; printf("Hello World\n"); printf("PhysicalAction = %d\n", lf_get(my_action)); TEST_ASSERT_EQUAL(lf_get(my_action), self->cnt++); -} +}) -void MyReaction_ctor(MyReaction *self, Reactor *parent) { - Reaction_ctor(&self->super, parent, action_handler, self->effects, 1, 1); -} +REACTION_BODY(MyReactor, 2, { + run_thread = false; + void *retval; + int ret = pthread_join(thread, &retval); +}) -void MyReactor_ctor(struct MyReactor *self, Environment *env) { +void MyReactor_ctor(MyReactor *self, Environment *_env) { self->_reactions[1] = (Reaction *)&self->my_reaction; self->_reactions[2] = (Reaction *)&self->shutdown_reaction; self->_reactions[0] = (Reaction *)&self->startup_reaction; @@ -113,11 +64,11 @@ void MyReactor_ctor(struct MyReactor *self, Environment *env) { self->_triggers[1] = (Trigger *)&self->my_action; self->_triggers[2] = (Trigger *)&self->shutdown; - Reactor_ctor(&self->super, "MyReactor", env, NULL, NULL, 0, self->_reactions, 3, self->_triggers, 3); - MyAction_ctor(&self->my_action, self); - MyReaction_ctor(&self->my_reaction, &self->super); - StartupReaction_ctor(&self->startup_reaction, &self->super); - ShutdownReaction_ctor(&self->shutdown_reaction, &self->super); + Reactor_ctor(&self->super, "MyReactor", _env, NULL, NULL, 0, self->_reactions, 3, self->_triggers, 3); + MyAction_ctor(&self->my_action, &self->super); + MyReactor_0_ctor(&self->startup_reaction, &self->super); + MyReactor_1_ctor(&self->my_reaction, &self->super); + MyReactor_2_ctor(&self->shutdown_reaction, &self->super); MyStartup_ctor(&self->startup, &self->super); MyShutdown_ctor(&self->shutdown, &self->super); @@ -132,7 +83,7 @@ void MyReactor_ctor(struct MyReactor *self, Environment *env) { } void test_simple() { - struct MyReactor my_reactor; + MyReactor my_reactor; Environment_ctor(&env, (Reactor *)&my_reactor); MyReactor_ctor(&my_reactor, &env); env.scheduler.set_timeout(&env.scheduler, SEC(1)); diff --git a/test/unit/port_test.c b/test/unit/port_test.c index 38f4984b..1eb3a06f 100644 --- a/test/unit/port_test.c +++ b/test/unit/port_test.c @@ -1,126 +1,82 @@ #include "reactor-uc/reactor-uc.h" #include "unity.h" -// Reactor Sender -typedef struct { - Timer super; - Reaction *effects[1]; -} Timer1; - -typedef struct { - Reaction super; -} Reaction1; +// Components of Reactor Sender +DEFINE_TIMER(Timer1, 1, 0, SEC(1)) +DEFINE_REACTION(Sender, 0, 0) +DEFINE_OUTPUT_PORT(Out, 1) typedef struct { - Output super; - Reaction *sources[1]; -} Out; - -struct Sender { Reactor super; - Reaction1 reaction; + Sender_0 reaction; Timer1 timer; Out out; Reaction *_reactions[1]; Trigger *_triggers[1]; -}; +} Sender; -void timer_handler(Reaction *_self) { - struct Sender *self = (struct Sender *)_self->parent; - Environment *env = self->super.env; +REACTION_BODY(Sender, 0, { Out *out = &self->out; printf("Timer triggered @ %ld\n", env->get_elapsed_logical_time(env)); lf_set(out, env->get_elapsed_logical_time(env)); -} - -void Reaction1_ctor(Reaction1 *self, Reactor *parent) { - Reaction_ctor(&self->super, parent, timer_handler, NULL, 0, 0); -} - -void Out_ctor(Out *self, struct Sender *parent) { - self->sources[0] = &parent->reaction.super; - Output_ctor(&self->super, &parent->super, self->sources, 1); -} +}) -void Sender_ctor(struct Sender *self, Reactor *parent, Environment *env) { +void Sender_ctor(Sender *self, Reactor *parent, Environment *env) { self->_reactions[0] = (Reaction *)&self->reaction; self->_triggers[0] = (Trigger *)&self->timer; Reactor_ctor(&self->super, "Sender", env, parent, NULL, 0, self->_reactions, 1, self->_triggers, 1); - Reaction1_ctor(&self->reaction, &self->super); - Timer_ctor(&self->timer.super, &self->super, 0, SEC(1), self->timer.effects, 1); - Out_ctor(&self->out, self); + Sender_0_ctor(&self->reaction, &self->super); + Timer1_ctor(&self->timer, &self->super); + Out_ctor(&self->out, &self->super); TIMER_REGISTER_EFFECT(self->timer, self->reaction); OUTPUT_REGISTER_SOURCE(self->out, self->reaction); } // Reactor Receiver -typedef struct { - Reaction super; -} Reaction2; -typedef struct { - Input super; - instant_t buffer[1]; - Reaction *effects[1]; -} In; +DEFINE_REACTION(Receiver, 0, 0) +DEFINE_INPUT_PORT(In, 1, instant_t, 1) -struct Receiver { +typedef struct { Reactor super; - Reaction2 reaction; + Receiver_0 reaction; In inp; Reaction *_reactions[1]; Trigger *_triggers[1]; -}; +} Receiver ; -void In_ctor(In *self, struct Receiver *parent) { - Input_ctor(&self->super, &parent->super, self->effects, 1, self->buffer, sizeof(self->buffer[0])); -} - -void input_handler(Reaction *_self) { - struct Receiver *self = (struct Receiver *)_self->parent; - Environment *env = self->super.env; +REACTION_BODY(Receiver, 0, { In *inp = &self->inp; printf("Input triggered @ %ld with %ld\n", env->get_elapsed_logical_time(env), lf_get(inp)); TEST_ASSERT_EQUAL(lf_get(inp), env->get_elapsed_logical_time(env)); -} +}) -void Reaction2_ctor(Reaction2 *self, Reactor *parent) { - Reaction_ctor(&self->super, parent, input_handler, NULL, 0, 0); -} - -void Receiver_ctor(struct Receiver *self, Reactor *parent, Environment *env) { +void Receiver_ctor(Receiver *self, Reactor *parent, Environment *env) { self->_reactions[0] = (Reaction *)&self->reaction; self->_triggers[0] = (Trigger *)&self->inp; Reactor_ctor(&self->super, "Receiver", env, parent, NULL, 0, self->_reactions, 1, self->_triggers, 1); - Reaction2_ctor(&self->reaction, &self->super); - In_ctor(&self->inp, self); + Receiver_0_ctor(&self->reaction, &self->super); + In_ctor(&self->inp, &self->super); // Register reaction as an effect of in INPUT_REGISTER_EFFECT(self->inp, self->reaction); } -struct Conn1 { - LogicalConnection super; - Input *downstreams[1]; -}; - -void Conn1_ctor(struct Conn1 *self, Reactor *parent) { - LogicalConnection_ctor(&self->super, parent, (Port **)self->downstreams, 1); -} - // Reactor main -struct Main { +DEFINE_LOGICAL_CONNECTION(Conn1, 1) + +typedef struct { Reactor super; - struct Sender sender; - struct Receiver receiver; - struct Conn1 conn; + Sender sender; + Receiver receiver; + Conn1 conn; Reactor *_children[2]; -}; +} Main; -void Main_ctor(struct Main *self, Environment *env) { +void Main_ctor(Main *self, Environment *env) { self->_children[0] = &self->sender.super; Sender_ctor(&self->sender, &self->super, env); @@ -128,14 +84,14 @@ void Main_ctor(struct Main *self, Environment *env) { Receiver_ctor(&self->receiver, &self->super, env); Conn1_ctor(&self->conn, &self->super); - CONN_REGISTER_UPSTREAM(self->conn, self->sender.out); - CONN_REGISTER_DOWNSTREAM(self->conn, self->receiver.inp); + + CONNECT(self->conn, self->sender.out, self->receiver.inp); Reactor_ctor(&self->super, "Main", env, NULL, self->_children, 2, NULL, 0, NULL, 0); } void test_simple() { - struct Main main; + Main main; Environment env; Environment_ctor(&env, (Reactor *)&main); Main_ctor(&main, &env); diff --git a/test/unit/shutdown_test.c b/test/unit/shutdown_test.c index 25b118ca..6192174d 100644 --- a/test/unit/shutdown_test.c +++ b/test/unit/shutdown_test.c @@ -2,80 +2,49 @@ #include "reactor-uc/reactor-uc.h" #include "unity.h" -typedef struct { - Startup super; - Reaction *effects_[1]; -} MyStartup; - -typedef struct { - Shutdown super; - Reaction *effects_[1]; -} MyShutdown; - -typedef struct { - Reaction super; -} Reaction1; - -typedef struct { - Reaction super; -} Reaction2; +DEFINE_STARTUP(MyStartup, 1) +DEFINE_SHUTDOWN(MyShutdown, 1) +DEFINE_REACTION(MyReactor, 0, 0) +DEFINE_REACTION(MyReactor, 1, 0) typedef struct { Reactor super; - Reaction1 reaction1; - Reaction2 reaction2; + MyReactor_0 reaction0; + MyReactor_1 reaction1; MyStartup startup; MyShutdown shutdown; Reaction *_reactions[2]; Trigger *_triggers[2]; } MyReactor; -void MyStartup_ctor(MyStartup *self, MyReactor *parent) { - Startup_ctor(&self->super, &parent->super, self->effects_, 1); -} +REACTION_BODY(MyReactor, 0, { + printf("Startup reaction executing\n"); +}) -void Reaction1_body(Reaction *_self) { printf("Startup reaction executing\n"); } - -void Reaction1_ctor(Reaction1 *self, Reactor *parent) { - Reaction_ctor(&self->super, parent, Reaction1_body, NULL, 0, 0); -} - -void MyShutdown_ctor(MyShutdown *self, MyReactor *parent) { - Shutdown_ctor(&self->super, &parent->super, self->effects_, 1); -} - -void Reaction2_body(Reaction *_self) { printf("Shutdown reaction executing\n"); } - -void Reaction2_ctor(Reaction2 *self, Reactor *parent) { - Reaction_ctor(&self->super, parent, Reaction2_body, NULL, 0, 0); -} +REACTION_BODY(MyReactor, 1, { + printf("Shutdown reaction executing\n"); +}) void MyReactor_ctor(MyReactor *self, Environment *env) { - self->_reactions[0] = (Reaction *)&self->reaction1; - self->_reactions[1] = (Reaction *)&self->reaction2; + self->_reactions[0] = (Reaction *)&self->reaction0; + self->_reactions[1] = (Reaction *)&self->reaction1; self->_triggers[0] = (Trigger *)&self->startup; self->_triggers[1] = (Trigger *)&self->shutdown; + Reactor_ctor(&self->super, "MyReactor", env, NULL, NULL, 0, self->_reactions, 2, self->_triggers, 2); - Reaction1_ctor(&self->reaction1, &self->super); - Reaction2_ctor(&self->reaction2, &self->super); - MyStartup_ctor(&self->startup, self); - MyShutdown_ctor(&self->shutdown, self); + MyReactor_0_ctor(&self->reaction0, &self->super); + MyReactor_1_ctor(&self->reaction1, &self->super); + MyStartup_ctor(&self->startup, &self->super); + MyShutdown_ctor(&self->shutdown, &self->super); - STARTUP_REGISTER_EFFECT(self->startup, self->reaction1); - SHUTDOWN_REGISTER_EFFECT(self->shutdown, self->reaction2); + STARTUP_REGISTER_EFFECT(self->startup, self->reaction0); + SHUTDOWN_REGISTER_EFFECT(self->shutdown, self->reaction1); } -void test_simple() { - MyReactor my_reactor; - Environment env; - Environment_ctor(&env, (Reactor *)&my_reactor); - MyReactor_ctor(&my_reactor, &env); - env.assemble(&env); - env.start(&env); -} +ENTRY_POINT(MyReactor) int main() { UNITY_BEGIN(); - RUN_TEST(test_simple); + RUN_TEST(lf_start); return UNITY_END(); } diff --git a/test/unit/startup_test.c b/test/unit/startup_test.c index 43c47132..3a1636c4 100644 --- a/test/unit/startup_test.c +++ b/test/unit/startup_test.c @@ -1,58 +1,37 @@ #include "reactor-uc/reactor-uc.h" #include "unity.h" -typedef struct { - Startup super; - Reaction *effects_[1]; -} MyStartup; -typedef struct { - Reaction super; -} MyReaction; +DEFINE_STARTUP(MyStartup, 1) +DEFINE_REACTION(MyReactor, 0, 0) typedef struct { Reactor super; - MyReaction my_reaction; + MyReactor_0 my_reaction; MyStartup startup; Reaction *_reactions[1]; Trigger *_triggers[1]; int cnt; } MyReactor; -void MyStartup_ctor(MyStartup *self, Reactor *parent, Reaction *effects) { - self->effects_[0] = effects; - Startup_ctor(&self->super, parent, self->effects_, 1); -} - -void startup_handler(Reaction *_self) { - MyReactor *self = (MyReactor *)_self->parent; - (void)self; +REACTION_BODY(MyReactor, 0, { printf("Hello World\n"); -} - -void MyReaction_ctor(MyReaction *self, Reactor *parent) { - Reaction_ctor(&self->super, parent, startup_handler, NULL, 0, 0); -} +}) void MyReactor_ctor(MyReactor *self, Environment *env) { self->_reactions[0] = (Reaction *)&self->my_reaction; self->_triggers[0] = (Trigger *)&self->startup; Reactor_ctor(&self->super, "MyReactor", env, NULL, NULL, 0, self->_reactions, 1, self->_triggers, 1); - MyReaction_ctor(&self->my_reaction, &self->super); - MyStartup_ctor(&self->startup, &self->super, &self->my_reaction.super); -} + MyReactor_0_ctor(&self->my_reaction, &self->super); + MyStartup_ctor(&self->startup, &self->super); -void test_simple() { - MyReactor my_reactor; - Environment env; - Environment_ctor(&env, (Reactor *)&my_reactor); - MyReactor_ctor(&my_reactor, &env); - env.assemble(&env); - env.start(&env); + STARTUP_REGISTER_EFFECT(self->startup, self->my_reaction); } +ENTRY_POINT(MyReactor); + int main() { UNITY_BEGIN(); - RUN_TEST(test_simple); + RUN_TEST(lf_start); return UNITY_END(); } \ No newline at end of file diff --git a/test/unit/timer_test.c b/test/unit/timer_test.c index 9e0acdb7..07996115 100644 --- a/test/unit/timer_test.c +++ b/test/unit/timer_test.c @@ -1,44 +1,32 @@ #include "reactor-uc/reactor-uc.h" #include "unity.h" -typedef struct { - Timer super; - Reaction *effects[1]; -} MyTimer; +DEFINE_TIMER(MyTimer, 1, 0, MSEC(100)) +DEFINE_REACTION(MyReactor, 0, 0) typedef struct { - Reaction super; -} MyReaction; - -struct MyReactor { Reactor super; - MyReaction my_reaction; + MyReactor_0 my_reaction; MyTimer timer; Reaction *_reactions[1]; Trigger *_triggers[1]; -}; +} MyReactor; -void timer_handler(Reaction *_self) { - struct MyReactor *self = (struct MyReactor *)_self->parent; - Environment *env = self->super.env; +REACTION_BODY(MyReactor, 0, { printf("Hello World @ %ld\n", env->get_elapsed_logical_time(env)); -} - -void MyReaction_ctor(MyReaction *self, Reactor *parent) { - Reaction_ctor(&self->super, parent, timer_handler, NULL, 0, 0); -} +}) -void MyReactor_ctor(struct MyReactor *self, Environment *env) { +void MyReactor_ctor(MyReactor *self, Environment *env) { self->_reactions[0] = (Reaction *)&self->my_reaction; self->_triggers[0] = (Trigger *)&self->timer; Reactor_ctor(&self->super, "MyReactor", env, NULL, NULL, 0, self->_reactions, 1, self->_triggers, 1); - MyReaction_ctor(&self->my_reaction, &self->super); - Timer_ctor(&self->timer.super, &self->super, MSEC(0), MSEC(100), self->timer.effects, 1); + MyReactor_0_ctor(&self->my_reaction, &self->super); + MyTimer_ctor(&self->timer, &self->super); TIMER_REGISTER_EFFECT(self->timer, self->my_reaction); } void test_simple() { - struct MyReactor my_reactor; + MyReactor my_reactor; Environment env; Environment_ctor(&env, (Reactor *)&my_reactor); env.scheduler.set_timeout(&env.scheduler, SEC(1)); From cec02034716775a0c10a5b6ea5623a11920e4f7f Mon Sep 17 00:00:00 2001 From: Lasse Rosenow <10547444+LasseRosenow@users.noreply.github.com> Date: Fri, 18 Oct 2024 02:27:20 +0200 Subject: [PATCH 2/2] Remove ioctl from TcpIpChannel (#61) * Remove ioctl from TcpIpChannel * Make NetworkcChannel blocking. Fix some typos. Remove some debug prints. * Include posix TcpIpChannel when platform is RIOT --------- Co-authored-by: erlingrj --- .../testing_posix_tcp_ip_channel_client.c | 12 +-- .../testing_posix_tcp_ip_channel_server.c | 22 ++--- .../testing_tcp_ip_channel_server_callback.c | 3 - include/reactor-uc/encoding.h | 1 - include/reactor-uc/network_channel.h | 1 - src/platform.c | 1 + src/platform/posix/tcp_ip_channel.c | 86 +++++-------------- 7 files changed, 34 insertions(+), 92 deletions(-) diff --git a/examples/posix/testing_posix_tcp_ip_channel_client.c b/examples/posix/testing_posix_tcp_ip_channel_client.c index 7ea20bce..b795ffc2 100644 --- a/examples/posix/testing_posix_tcp_ip_channel_client.c +++ b/examples/posix/testing_posix_tcp_ip_channel_client.c @@ -11,7 +11,7 @@ int main() { // server address const char *host = "127.0.0.1"; - unsigned short port = 8901; // NOLINT + unsigned short port = 8902; // NOLINT // message for server TaggedMessage port_message; @@ -26,23 +26,15 @@ int main() { // binding to that address channel.super.connect(&channel.super); - // change the super to non-blocking - channel.super.change_block_state(&channel.super, false); - for (int i = 0; i < NUM_ITER; i++) { // sending message channel.super.send(&channel.super, &port_message); // waiting for reply - TaggedMessage *received_message = NULL; - do { - received_message = channel.super.receive(&channel.super); - } while (received_message == NULL); + TaggedMessage *received_message = channel.super.receive(&channel.super); printf("Received message with connection number %i and content %s\n", received_message->conn_id, (char *)received_message->payload.bytes); - - sleep(i); } channel.super.close(&channel.super); diff --git a/examples/posix/testing_posix_tcp_ip_channel_server.c b/examples/posix/testing_posix_tcp_ip_channel_server.c index 9c1c8516..68bee971 100644 --- a/examples/posix/testing_posix_tcp_ip_channel_server.c +++ b/examples/posix/testing_posix_tcp_ip_channel_server.c @@ -3,6 +3,8 @@ #include #include +#define NUM_ITER 10 + int main() { TcpIpChannel channel; @@ -15,26 +17,20 @@ int main() { // binding to that address channel.super.bind(&channel.super); - // change the super to non-blocking - channel.super.change_block_state(&channel.super, false); - // accept one connection bool new_connection; do { new_connection = channel.super.accept(&channel.super); } while (!new_connection); - // waiting for messages from client - TaggedMessage *message = NULL; - do { - message = channel.super.receive(&channel.super); - sleep(1); - } while (message == NULL); - - printf("Received message with connection number %i and content %s\n", message->conn_id, - (char *)message->payload.bytes); + for (int i = 0; i < NUM_ITER; i++) { + // waiting for messages from client + TaggedMessage *message = channel.super.receive(&channel.super); + printf("Received message with connection number %i and content %s\n", message->conn_id, + (char *)message->payload.bytes); - channel.super.send(&channel.super, message); + channel.super.send(&channel.super, message); + } channel.super.close(&channel.super); } diff --git a/examples/posix/testing_tcp_ip_channel_server_callback.c b/examples/posix/testing_tcp_ip_channel_server_callback.c index 9f6306c0..c6a4e50c 100644 --- a/examples/posix/testing_tcp_ip_channel_server_callback.c +++ b/examples/posix/testing_tcp_ip_channel_server_callback.c @@ -21,9 +21,6 @@ int main() { // binding to that address channel.super.bind(&channel.super); - // change the super to non-blocking - channel.super.change_block_state(&channel.super, false); - // accept one connection bool new_connection; do { diff --git a/include/reactor-uc/encoding.h b/include/reactor-uc/encoding.h index 2d8599ed..4e89a455 100644 --- a/include/reactor-uc/encoding.h +++ b/include/reactor-uc/encoding.h @@ -22,7 +22,6 @@ int decode_protobuf(TaggedMessage *message, const unsigned char *buffer, size_t pb_istream_t stream_in = pb_istream_from_buffer(buffer, buffer_size); if (!pb_decode(&stream_in, TaggedMessage_fields, message)) { - printf("protobuf decoding error %s\n", stream_in.errmsg); return -1; } diff --git a/include/reactor-uc/network_channel.h b/include/reactor-uc/network_channel.h index 03679b8a..1fd4b83a 100644 --- a/include/reactor-uc/network_channel.h +++ b/include/reactor-uc/network_channel.h @@ -17,7 +17,6 @@ struct NetworkChannel { lf_ret_t (*connect)(NetworkChannel *self); bool (*accept)(NetworkChannel *self); void (*close)(NetworkChannel *self); - void (*change_block_state)(NetworkChannel *self, bool blocking); void (*register_callback)(NetworkChannel *self, void (*receive_callback)(FederatedConnectionBundle *conn, TaggedMessage *message), FederatedConnectionBundle *conn); diff --git a/src/platform.c b/src/platform.c index 4fd45271..22ab40d0 100644 --- a/src/platform.c +++ b/src/platform.c @@ -2,6 +2,7 @@ #include "platform/posix/posix.c" #include "platform/posix/tcp_ip_channel.c" #elif defined(PLATFORM_RIOT) +#include "platform/posix/tcp_ip_channel.c" #include "platform/riot/riot.c" #elif defined(PLATFORM_ZEPHYR) #include "platform/zephyr/zephyr.c" diff --git a/src/platform/posix/tcp_ip_channel.c b/src/platform/posix/tcp_ip_channel.c index fa97ba52..faab5724 100644 --- a/src/platform/posix/tcp_ip_channel.c +++ b/src/platform/posix/tcp_ip_channel.c @@ -1,20 +1,20 @@ #include "reactor-uc/platform/posix/tcp_ip_channel.h" #include "reactor-uc/encoding.h" +#include "reactor-uc/logging.h" #include #include #include #include +#include +#include #include #include #include -#include +#include #include #include -#include -#include - #include "proto/message.pb.h" #define MIN(a, b) ((a) < (b) ? (a) : (b)) @@ -34,8 +34,6 @@ lf_ret_t TcpIpChannel_bind(NetworkChannel *untyped_self) { // bind the socket to that address int ret = bind(self->fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)); if (ret < 0) { - printf("ret = %d, errno=%d\n", ret, errno); - return LF_NETWORK_SETUP_FAILED; } @@ -62,7 +60,6 @@ lf_ret_t TcpIpChannel_connect(NetworkChannel *untyped_self) { } int ret = connect(self->fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)); - printf("ret=%d errno=%d\n", ret, errno); if (ret < 0) { return LF_COULD_NOT_CONNECT; } @@ -84,7 +81,6 @@ bool TcpIpChannel_accept(NetworkChannel *untyped_self) { return true; } - printf("accept ret=%d, errno=%d\n", new_socket, errno); return false; } @@ -112,7 +108,7 @@ lf_ret_t TcpIpChannel_send(NetworkChannel *untyped_self, TaggedMessage *message) int timeout = TCP_IP_CHANNEL_NUM_RETRIES; while (bytes_written < message_size && timeout > 0) { - int bytes_send = write(socket, self->write_buffer + bytes_written, message_size - bytes_written); + int bytes_send = send(socket, self->write_buffer + bytes_written, message_size - bytes_written, 0); if (bytes_send < 0) { return LF_ERR; @@ -132,7 +128,6 @@ lf_ret_t TcpIpChannel_send(NetworkChannel *untyped_self, TaggedMessage *message) TaggedMessage *TcpIpChannel_receive(NetworkChannel *untyped_self) { TcpIpChannel *self = (TcpIpChannel *)untyped_self; - int bytes_available; int socket; // based if this super is in the server or client role we need to select different sockets @@ -142,37 +137,28 @@ TaggedMessage *TcpIpChannel_receive(NetworkChannel *untyped_self) { socket = self->fd; } - if (self->blocking) { - ioctl(socket, FIONREAD, &bytes_available); - bytes_available = MIN(8, bytes_available); - } else { - // peek into file descriptor to figure how many bytes are available. - ioctl(socket, FIONREAD, &bytes_available); - } - - if (bytes_available == 0) { - return NULL; - } - // calculating the maximum amount of bytes we can read - if (bytes_available + self->read_index >= TCP_IP_CHANNEL_BUFFERSIZE) { - bytes_available = TCP_IP_CHANNEL_BUFFERSIZE - self->read_index; - } + int bytes_available = TCP_IP_CHANNEL_BUFFERSIZE - self->read_index; + int bytes_left = 0; + bool read_more = true; - // reading from socket - int bytes_read = read(socket, self->read_buffer + self->read_index, bytes_available); + while (read_more) { - if (bytes_read < 0) { - printf("error during reading errno: %i\n", errno); - return NULL; - } - - self->read_index += bytes_read; + // reading from socket + int bytes_read = recv(socket, self->read_buffer + self->read_index, bytes_available, 0); - int bytes_left = decode_protobuf(&self->output, self->read_buffer, self->read_index); + if (bytes_read < 0) { + LF_ERR(NET, "Error recv from socket %d", errno); + continue; + } - if (bytes_left < 0) { - return NULL; + self->read_index += bytes_read; + bytes_left = decode_protobuf(&self->output, self->read_buffer, self->read_index); + if (bytes_left < 0) { + read_more = true; + } else { + read_more = false; + } } memcpy(self->read_buffer, self->read_buffer + (self->read_index - bytes_left), bytes_left); @@ -190,36 +176,9 @@ void TcpIpChannel_close(NetworkChannel *untyped_self) { close(self->fd); } -void TcpIpChannel_change_block_state(NetworkChannel *untyped_self, bool blocking) { - TcpIpChannel *self = (TcpIpChannel *)untyped_self; - - self->blocking = blocking; - - int fd_socket_config = fcntl(self->fd, F_GETFL); - - if (blocking) { - fcntl(self->fd, F_SETFL, fd_socket_config | (~O_NONBLOCK)); - - if (self->server) { - fcntl(self->client, F_SETFL, fd_socket_config | (~O_NONBLOCK)); - } - } else { - // configure the socket to be non-blocking - - fcntl(self->fd, F_SETFL, fd_socket_config | O_NONBLOCK); - - if (self->server) { - fcntl(self->client, F_SETFL, fd_socket_config | O_NONBLOCK); - } - } -} - void *TcpIpChannel_receive_thread(void *untyped_self) { TcpIpChannel *self = untyped_self; - // turning on blocking receive on this socket - self->super.change_block_state(untyped_self, true); - // set terminate to false so the loop runs self->terminate = false; @@ -265,7 +224,6 @@ void TcpIpChannel_ctor(TcpIpChannel *self, const char *host, unsigned short port self->super.close = TcpIpChannel_close; self->super.receive = TcpIpChannel_receive; self->super.send = TcpIpChannel_send; - self->super.change_block_state = TcpIpChannel_change_block_state; self->super.register_callback = TcpIpChannel_register_callback; self->super.free = TcpIpChannel_free; self->receive_callback = NULL;