Skip to content

Commit

Permalink
lifecycle::StateMachine integration into Block<T>
Browse files Browse the repository at this point in the history
Signed-off-by: Ralph J. Steinhagen <[email protected]>
  • Loading branch information
RalphSteinhagen committed Feb 6, 2024
1 parent de7ab7a commit c96c823
Show file tree
Hide file tree
Showing 10 changed files with 213 additions and 210 deletions.
35 changes: 23 additions & 12 deletions blocks/basic/include/gnuradio-4.0/basic/clock_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,14 @@ struct ClockSource : public gr::Block<ClockSource<T, useIoThread, ClockSourceTyp
return false; // use Block<T>::work generated thread
}
if (verbose_console) {
fmt::println("initial ClockSource state: {}", magic_enum::enum_name(this->state.load()));
fmt::println("initial ClockSource state: {}", magic_enum::enum_name(this->state()));
}
if (lifecycle::State expectedThreadState = lifecycle::State::INITIALISED; this->state.compare_exchange_strong(expectedThreadState, lifecycle::State::RUNNING, std::memory_order_acq_rel)) {
if (lifecycle::State expectedThreadState = lifecycle::State::INITIALISED; this->_state.compare_exchange_strong(expectedThreadState, lifecycle::State::RUNNING, std::memory_order_acq_rel)) {
// mocks re-using a user-provided thread
if (verbose_console) {
fmt::println("mocking a user-provided io-Thread for {}", this->name);
}
this->state.notify_all();
this->_state.notify_all();
auto createManagedThread = [](auto &&threadFunction, auto &&threadDeleter) {
return std::shared_ptr<std::thread>(new std::thread(std::forward<decltype(threadFunction)>(threadFunction)), std::forward<decltype(threadDeleter)>(threadDeleter));
};
Expand All @@ -205,29 +205,40 @@ struct ClockSource : public gr::Block<ClockSource<T, useIoThread, ClockSourceTyp
if (verbose_console) {
fmt::println("started user-provided thread");
}
lifecycle::State actualThreadState = this->state.load();
lifecycle::State actualThreadState = this->state();
while (lifecycle::isActive(actualThreadState)) {
std::this_thread::sleep_until(nextTimePoint);
// invoke and execute work function from user-provided thread
const work::Status status = this->invokeWork();
if (status == work::Status::DONE) {
std::atomic_store_explicit(&this->state, lifecycle::State::REQUESTED_STOP, std::memory_order_release);
this->state.notify_all();
if (auto ret = this->changeStateTo(lifecycle::State::REQUESTED_STOP); !ret) {
using namespace gr::message;
this->emitMessage(this->msgOut, { { key::Sender, this->unique_name },
{ key::Kind, kind::Error },
{ key::ErrorInfo, ret.error().message },
{ key::Location, ret.error().srcLoc() } });
}
break;
}
actualThreadState = this->state.load();
actualThreadState = this->state();
this->ioLastWorkStatus.exchange(status, std::memory_order_relaxed);
}

if (verbose_console) {
fmt::println("stopped user-provided thread - state: {}", magic_enum::enum_name(this->state.load()));
fmt::println("stopped user-provided thread - state: {}", magic_enum::enum_name(this->state()));
}
if (auto ret = this->changeStateTo(lifecycle::State::STOPPED); !ret) {
using namespace gr::message;
this->emitMessage(this->msgOut,
{ { key::Sender, this->unique_name }, { key::Kind, kind::Error }, { key::ErrorInfo, ret.error().message }, { key::Location, ret.error().srcLoc() } });
}
std::atomic_store_explicit(&this->state, lifecycle::State::STOPPED, std::memory_order_release);
this->state.notify_all();
},
[this](std::thread *t) {
std::atomic_store_explicit(&this->state, lifecycle::State::STOPPED, std::memory_order_release);
this->state.notify_all();
if (auto ret = this->changeStateTo(lifecycle::State::STOPPED); !ret) {
using namespace gr::message;
this->emitMessage(this->msgOut,
{ { key::Sender, this->unique_name }, { key::Kind, kind::Error }, { key::ErrorInfo, ret.error().message }, { key::Location, ret.error().srcLoc() } });
}
if (t->joinable()) {
t->join();
}
Expand Down
26 changes: 8 additions & 18 deletions blocks/basic/include/gnuradio-4.0/basic/common_blocks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ ENABLE_REFLECTION_FOR_TEMPLATE(builtin_counter, in, out);
// - use Block::set_name instead of returning an empty name
// TODO: Inherit from Block class when create new block.
template<typename T>
class multi_adder : public gr::BlockModel {
class multi_adder : public gr::lifecycle::StateMachine<multi_adder<T>>, public gr::BlockModel {
static std::atomic_size_t _unique_id_counter;

public:
Expand Down Expand Up @@ -112,21 +112,6 @@ class multi_adder : public gr::BlockModel {
void
init(std::shared_ptr<gr::Sequence> /*progress*/, std::shared_ptr<gr::thread_pool::BasicThreadPool> /*ioThreadPool*/) override {}

void
start() override {}

void
stop() override {}

void
pause() override {}

void
resume() override {}

void
reset() override {}

[[nodiscard]] std::string_view
name() const override {
return unique_name_;
Expand All @@ -142,9 +127,14 @@ class multi_adder : public gr::BlockModel {
return false;
}

constexpr gr::lifecycle::State
[[nodiscard]] std::expected<void, gr::lifecycle::ErrorType>
changeState(gr::lifecycle::State newState) noexcept override {
return this->changeStateTo(newState);
}

[[nodiscard]] constexpr gr::lifecycle::State
state() const noexcept override {
return gr::lifecycle::State::RUNNING;
return this->state();
}

[[nodiscard]] constexpr std::size_t
Expand Down
5 changes: 4 additions & 1 deletion blocks/testing/include/gnuradio-4.0/testing/TagMonitors.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,10 @@ struct TagSink : public Block<TagSink<T, UseProcessVariant>> {
}
n_samples_produced++;
if (n_samples_expected > 0 && n_samples_produced >= n_samples_expected) {
this->state = lifecycle::State::STOPPED;
if (auto ret = this->changeStateTo(lifecycle::State::REQUESTED_STOP); !ret) {
using namespace gr::message;
this->emitMessage(this->msgOut, { { key::Sender, this->unique_name }, { key::Kind, kind::Error }, { key::ErrorInfo, ret.error().message }, { key::Location, ret.error().srcLoc() } });
}
}
_timeLastSample = ClockSourceType::now();
}
Expand Down
Loading

0 comments on commit c96c823

Please sign in to comment.