From 046acb4b60204e314d7aa5c6fb63d6434f4c6ba5 Mon Sep 17 00:00:00 2001 From: tanneberger Date: Tue, 21 Jan 2025 00:59:12 +0100 Subject: [PATCH] clean up --- examples/CMakeLists.txt | 1 - examples/multiport_mutation/CMakeLists.txt | 5 +- examples/multiport_mutation/consumer.hh | 6 +- examples/multiport_mutation/load_balancer.hh | 11 +- examples/multiport_mutation/main.cc | 17 +-- .../multiport_mutation/multiport_to_bank.hh | 114 +++++++++--------- examples/multiport_mutation/producer.hh | 2 +- include/reactor-cpp/connection.hh | 12 +- include/reactor-cpp/reactor.hh | 8 +- include/reactor-cpp/statistics.hh | 5 + include/reactor-cpp/time_barrier.hh | 4 +- include/reactor-cpp/value_ptr.hh | 20 +-- lib/environment.cc | 3 +- lib/reactor.cc | 28 ++++- lib/reactor_element.cc | 10 +- 15 files changed, 134 insertions(+), 112 deletions(-) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 5b9bbf09..5f05c960 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -7,4 +7,3 @@ add_subdirectory(count) add_subdirectory(ports) add_subdirectory(hello) add_subdirectory(power_train) -add_subdirectory(multiport_mutation) diff --git a/examples/multiport_mutation/CMakeLists.txt b/examples/multiport_mutation/CMakeLists.txt index b19dd98a..6361e8e4 100644 --- a/examples/multiport_mutation/CMakeLists.txt +++ b/examples/multiport_mutation/CMakeLists.txt @@ -1,3 +1,2 @@ -add_executable(mutation_multiports main.cc) -target_link_libraries(mutation_multiports reactor-cpp) -add_dependencies(examples mutation_multiports) +add_executable(mutation_multiports main.cc) target_link_libraries(mutation_multiports reactor - cpp) + add_dependencies(examples mutation_multiports) diff --git a/examples/multiport_mutation/consumer.hh b/examples/multiport_mutation/consumer.hh index 8b263f75..fc899b17 100644 --- a/examples/multiport_mutation/consumer.hh +++ b/examples/multiport_mutation/consumer.hh @@ -15,7 +15,7 @@ class Consumer final : public Reactor { // NOLINT std::size_t index_ = 0; void reaction_1(const Input& in) const { - std::cout << "consumer: " << index_ << " received value:" << *in.get() << '\n'; + // std::cout << "consumer: " << index_ << " received value:" << *in.get() << '\n'; } friend Consumer; @@ -34,9 +34,7 @@ public: Input in{"in", this}; // NOLINT - void assemble() override { - handle.declare_trigger(&in); - } + void assemble() override { handle.declare_trigger(&in); } }; #endif // CONSUMER_HH diff --git a/examples/multiport_mutation/load_balancer.hh b/examples/multiport_mutation/load_balancer.hh index 913c86a6..11820c69 100644 --- a/examples/multiport_mutation/load_balancer.hh +++ b/examples/multiport_mutation/load_balancer.hh @@ -16,16 +16,15 @@ class LoadBalancer final : public Reactor { // NOLINT // reaction bodies static void reaction_1(const Input& inbound, Output& scale_bank, Multiport>& outbound) { - if (std::rand() % 15 == 0) { // NOLINT + if (std::rand() % 15 == 0) { // NOLINT scale_bank.set(std::rand() % 20 + 1); // NOLINT } const unsigned sel = std::rand() % outbound.size(); // NOLINT - std::cout << "Sending out to:" << sel << '\n'; + // std::cout << "Sending out to:" << sel << '\n'; outbound[sel].set(inbound.get()); outbound[std::min(4ul, outbound.size() - 1)].set(inbound.get()); } - friend LoadBalancer; }; @@ -44,10 +43,10 @@ public: } ~LoadBalancer() override = default; - ModifableMultiport> out{"out", this}; // NOLINT + ModifableMultiport> out{"out", this}; // NOLINT std::size_t out_size_ = 0; - Input inbound{"inbound", this}; // NOLINT - Output scale_bank{"scale_bank", this}; // NOLINT + Input inbound{"inbound", this}; // NOLINT + Output scale_bank{"scale_bank", this}; // NOLINT void assemble() override { std::cout << "assemble LoadBalancer\n"; diff --git a/examples/multiport_mutation/main.cc b/examples/multiport_mutation/main.cc index f1fb9241..0fa82558 100644 --- a/examples/multiport_mutation/main.cc +++ b/examples/multiport_mutation/main.cc @@ -4,9 +4,9 @@ #include #include -#include "./multiport_to_bank.hh" #include "./consumer.hh" #include "./load_balancer.hh" +#include "./multiport_to_bank.hh" #include "./producer.hh" #include @@ -19,7 +19,6 @@ class Deployment final : public Reactor { // NOLINT Reaction scale_bank{"scale_bank", 1, this, [this]() { this->_inner.reaction_1(this->scale, this->consumers_, load_balancer_->out); }}; - class Inner : public MutableScope { int state = 0; @@ -35,15 +34,9 @@ class Deployment final : public Reactor { // NOLINT return std::make_unique(_lf_inst_name, env, index); }; - std::function get_input_port = [](const std::unique_ptr& consumer) { - return &consumer->in; - }; - auto rescale = std::make_shared>( - &load_balancer, - &reactor_bank, - get_input_port, - lambda, - new_size); + std::function get_input_port = [](const std::unique_ptr& consumer) { return &consumer->in; }; + auto rescale = std::make_shared>(&load_balancer, &reactor_bank, + get_input_port, lambda, new_size); add_to_transaction(rescale); @@ -53,7 +46,7 @@ class Deployment final : public Reactor { // NOLINT friend LoadBalancer; }; -Inner _inner; + Inner _inner; public: Deployment(const std::string& name, Environment* env) diff --git a/examples/multiport_mutation/multiport_to_bank.hh b/examples/multiport_mutation/multiport_to_bank.hh index ac48fc2d..974534d7 100644 --- a/examples/multiport_mutation/multiport_to_bank.hh +++ b/examples/multiport_mutation/multiport_to_bank.hh @@ -5,12 +5,12 @@ #ifndef MULTIPORT_TO_BANK_HH #define MULTIPORT_TO_BANK_HH -#include #include -#include -#include +#include #include #include +#include +#include #include #include "../../lib/mutations/bank.cc" @@ -21,75 +21,71 @@ namespace reactor { - template - class ResizeMultiportToBank : public Mutation { - ModifableMultiport>* multiport_; - std::vector>* bank_; - std::function*(const std::unique_ptr&)> get_input_port_; - std::function(Environment* env, std::size_t index)> create_lambda_; - std::size_t new_size_ = 0; - public: - ResizeMultiportToBank(ModifableMultiport>* multiport, - std::vector>* bank, - std::function*(const std::unique_ptr&)> get_input_port, - std::function(Environment* env, std::size_t index)> create_lambda, - std::size_t new_size) : - multiport_(multiport), bank_(bank), get_input_port_(get_input_port), create_lambda_(create_lambda), new_size_(new_size) {} - - ~ResizeMultiportToBank() = default; - auto run() -> MutationResult { - if (multiport_->size() != bank_->size()) { - return NotMatchingBankSize; - } - auto old_size = multiport_->size(); - - if (new_size_ > old_size) { - // TODO: this is an assumption - auto change_multiport_size = - std::make_shared>(multiport_, new_size_); +template class ResizeMultiportToBank : public Mutation { + ModifableMultiport>* multiport_; + std::vector>* bank_; + std::function*(const std::unique_ptr&)> get_input_port_; + std::function(Environment* env, std::size_t index)> create_lambda_; + std::size_t new_size_ = 0; - change_multiport_size->run(); +public: + ResizeMultiportToBank(ModifableMultiport>* multiport, + std::vector>* bank, + std::function*(const std::unique_ptr&)> get_input_port, + std::function(Environment* env, std::size_t index)> create_lambda, + std::size_t new_size) + : multiport_(multiport) + , bank_(bank) + , get_input_port_(get_input_port) + , create_lambda_(create_lambda) + , new_size_(new_size) {} - auto change_bank_size = std::make_shared>>( - bank_, (*bank_)[0]->environment(), new_size_, create_lambda_); + ~ResizeMultiportToBank() = default; + auto run() -> MutationResult { + if (multiport_->size() != bank_->size()) { + return NotMatchingBankSize; + } + auto old_size = multiport_->size(); - change_bank_size->run(); + if (new_size_ > old_size) { + auto change_multiport_size = std::make_shared>(multiport_, new_size_); - for (auto i = old_size; i < new_size_; i++) { - auto add_conn = std::make_shared, Input>>( - &(*multiport_)[i], get_input_port_((*bank_)[i]), (*bank_)[0]->environment(), true); + change_multiport_size->run(); - add_conn->run(); - } - } else if (new_size_ < old_size) { - for (auto i = old_size - 1; i >= new_size_; i--) { - auto add_conn = std::make_shared, Input>>( - &(*multiport_)[i], get_input_port_((*bank_)[i]), (*bank_)[0]->environment(), false); + auto change_bank_size = std::make_shared>>( + bank_, (*bank_)[0]->environment(), new_size_, create_lambda_); - add_conn->run(); - } + change_bank_size->run(); - auto change_multiport_size = - std::make_shared>(multiport_, new_size_); + for (auto i = old_size; i < new_size_; i++) { + auto add_conn = std::make_shared, Input>>( + &(*multiport_)[i], get_input_port_((*bank_)[i]), (*bank_)[0]->environment(), true); - change_multiport_size->run(); + add_conn->run(); + } + } else if (new_size_ < old_size) { + for (auto i = old_size - 1; i >= new_size_; i--) { + auto add_conn = std::make_shared, Input>>( + &(*multiport_)[i], get_input_port_((*bank_)[i]), (*bank_)[0]->environment(), false); - auto change_bank_size = std::make_shared>>( - bank_, (*bank_)[0]->environment(), new_size_, create_lambda_); + add_conn->run(); + } - change_bank_size->run(); - } + auto change_multiport_size = std::make_shared>(multiport_, new_size_); + change_multiport_size->run(); - return Success; - } + auto change_bank_size = std::make_shared>>( + bank_, (*bank_)[0]->environment(), new_size_, create_lambda_); - auto rollback() -> MutationResult { - return Success; - } - }; -} + change_bank_size->run(); + } + return Success; + } + auto rollback() -> MutationResult { return Success; } +}; +} // namespace reactor -#endif //MULTIPORT_TO_BANK_HH +#endif // MULTIPORT_TO_BANK_HH diff --git a/examples/multiport_mutation/producer.hh b/examples/multiport_mutation/producer.hh index d6416e9a..6baf3aa4 100644 --- a/examples/multiport_mutation/producer.hh +++ b/examples/multiport_mutation/producer.hh @@ -15,7 +15,7 @@ private: unsigned int counter_ = 0; void reaction_1([[maybe_unused]] Output& out) { - std::cout << "producing value:" << counter_ << "\n"; + // std::cout << "producing value:" << counter_ << "\n"; out.set(counter_++); } diff --git a/include/reactor-cpp/connection.hh b/include/reactor-cpp/connection.hh index 106738fc..07000a6c 100644 --- a/include/reactor-cpp/connection.hh +++ b/include/reactor-cpp/connection.hh @@ -143,8 +143,8 @@ public: }; } - auto acquire_tag(const Tag& tag, std::unique_lock& lock, - const std::function& abort_waiting) -> bool override { + auto acquire_tag(const Tag& tag, std::unique_lock& lock, const std::function& abort_waiting) + -> bool override { reactor_assert(lock.owns_lock()); log_.debug() << "downstream tries to acquire tag " << tag; @@ -210,8 +210,8 @@ public: }; } - auto acquire_tag(const Tag& tag, std::unique_lock& lock, - const std::function& abort_waiting) -> bool override { + auto acquire_tag(const Tag& tag, std::unique_lock& lock, const std::function& abort_waiting) + -> bool override { // Since this is a delayed connection, we can go back in time and need to // acquire the latest upstream tag that can create an event at the given // tag. We also need to consider that given a delay d and a tag g=(t, n), @@ -240,8 +240,8 @@ public: }; } - auto acquire_tag(const Tag& tag, std::unique_lock& lock, - const std::function& abort_waiting) -> bool override { + auto acquire_tag(const Tag& tag, std::unique_lock& lock, const std::function& abort_waiting) + -> bool override { this->log_.debug() << "downstream tries to acquire tag " << tag; return PhysicalTimeBarrier::acquire_tag(tag, lock, this->environment()->scheduler(), abort_waiting); } diff --git a/include/reactor-cpp/reactor.hh b/include/reactor-cpp/reactor.hh index 74b9e430..8f1635bb 100644 --- a/include/reactor-cpp/reactor.hh +++ b/include/reactor-cpp/reactor.hh @@ -31,12 +31,16 @@ private: void register_action(BaseAction* action); void register_input(BasePort* port); - void unregister_input(BasePort* port); void register_output(BasePort* port); - void unregister_output(BasePort* port); void register_reaction(Reaction* reaction); void register_reactor(Reactor* reactor); + void unregister_action(BaseAction* action); + void unregister_input(BasePort* port); + void unregister_output(BasePort* port); + void unregister_reaction(Reaction* reaction); + void unregister_reactor(Reactor* reactor); + public: Reactor(const std::string& name, Reactor* container); Reactor(const std::string& name, Environment* environment); diff --git a/include/reactor-cpp/statistics.hh b/include/reactor-cpp/statistics.hh index 9232824a..76a91562 100644 --- a/include/reactor-cpp/statistics.hh +++ b/include/reactor-cpp/statistics.hh @@ -54,12 +54,17 @@ public: static void increment_reactions() { increment(reactions_); } static void increment_actions() { increment(actions_); } static void increment_ports() { increment(ports_); } + static void increment_processed_events() { increment(processed_events_); } static void increment_processed_reactions() { increment(processed_reactions_); } static void increment_triggered_actions() { increment(triggered_actions_); } static void increment_set_ports() { increment(set_ports_); } static void increment_scheduled_actions() { increment(scheduled_actions_); } + static void decrement_reactor_instances() { decrement(reactor_instances_); } + static void decrement_connections() { decrement(connections_); } + static void decrement_reactions() { decrement(reactions_); } + static void decrement_actions() { decrement(actions_); } static void decrement_ports() { decrement(ports_); } static auto reactor_instances() { return reactor_instances_.load(std::memory_order_acquire); } diff --git a/include/reactor-cpp/time_barrier.hh b/include/reactor-cpp/time_barrier.hh index a5874106..b69e2045 100644 --- a/include/reactor-cpp/time_barrier.hh +++ b/include/reactor-cpp/time_barrier.hh @@ -80,8 +80,8 @@ public: // The caller must hold a lock on the scheduler mutex auto try_acquire_tag(const Tag& tag) { return tag <= released_time_; } - auto acquire_tag(const Tag& tag, std::unique_lock& lock, - const std::function& abort_waiting) -> bool { + auto acquire_tag(const Tag& tag, std::unique_lock& lock, const std::function& abort_waiting) + -> bool { if (try_acquire_tag(tag)) { return true; } diff --git a/include/reactor-cpp/value_ptr.hh b/include/reactor-cpp/value_ptr.hh index a6d55089..7a8f6c75 100644 --- a/include/reactor-cpp/value_ptr.hh +++ b/include/reactor-cpp/value_ptr.hh @@ -517,23 +517,23 @@ public: // Comparison operators template -auto operator==(const MutableValuePtr& ptr1, - const MutableValuePtr& ptr2) noexcept -> bool { +auto operator==(const MutableValuePtr& ptr1, const MutableValuePtr& ptr2) noexcept + -> bool { return ptr1.get() == ptr2.get(); } template -auto operator==(const ImmutableValuePtr& ptr1, - const ImmutableValuePtr& ptr2) noexcept -> bool { +auto operator==(const ImmutableValuePtr& ptr1, const ImmutableValuePtr& ptr2) noexcept + -> bool { return ptr1.get() == ptr2.get(); } template -auto operator==(const ImmutableValuePtr& ptr1, - const MutableValuePtr& ptr2) noexcept -> bool { +auto operator==(const ImmutableValuePtr& ptr1, const MutableValuePtr& ptr2) noexcept + -> bool { return ptr1.get() == ptr2.get(); } template -auto operator==(const MutableValuePtr& ptr1, - const ImmutableValuePtr& ptr2) noexcept -> bool { +auto operator==(const MutableValuePtr& ptr1, const ImmutableValuePtr& ptr2) noexcept + -> bool { return ptr1.get() == ptr2.get(); } template @@ -554,8 +554,8 @@ auto operator==(std::nullptr_t, const ImmutableValuePtr& ptr1) no } template -auto operator!=(const MutableValuePtr& ptr1, - const MutableValuePtr& ptr2) noexcept -> bool { +auto operator!=(const MutableValuePtr& ptr1, const MutableValuePtr& ptr2) noexcept + -> bool { return ptr1.get() != ptr2.get(); } diff --git a/lib/environment.cc b/lib/environment.cc index 4ec92de0..5a659e60 100644 --- a/lib/environment.cc +++ b/lib/environment.cc @@ -59,8 +59,7 @@ void Environment::unregister_reactor(Reactor* reactor) { validate(this->phase() == Phase::Construction || this->phase() == Phase::Mutation, "Reactors may only be unregistered during construction phase!"); validate(reactor->is_top_level(), "The environment may only contain top level reactors!"); - [[maybe_unused]] std::size_t result = top_level_reactors_.erase(reactor); - reactor_assert(result > 0); + top_level_reactors_.erase(reactor); } void Environment::register_input_action(BaseAction* action) { diff --git a/lib/reactor.cc b/lib/reactor.cc index 491050a8..784ba48d 100644 --- a/lib/reactor.cc +++ b/lib/reactor.cc @@ -24,7 +24,7 @@ Reactor::Reactor(const std::string& name, Environment* environment) environment->register_reactor(this); } -Reactor::~Reactor() { environment()->unregister_reactor(this); } +Reactor::~Reactor() = default; void Reactor::register_action([[maybe_unused]] BaseAction* action) { reactor_assert(action != nullptr); @@ -36,6 +36,15 @@ void Reactor::register_action([[maybe_unused]] BaseAction* action) { Statistics::increment_actions(); } +void Reactor::unregister_action([[maybe_unused]] BaseAction* action) { + reactor_assert(action != nullptr); + reactor::validate(this->environment()->phase() == Phase::Construction || + this->environment()->phase() == Phase::Assembly, + "Actions can only be registered during construction phase!"); + actions_.erase(action); + Statistics::decrement_actions(); +} + void Reactor::register_input(BasePort* port) { reactor_assert(port != nullptr); reactor::validate(this->environment()->phase() == Phase::Construction || @@ -86,6 +95,15 @@ void Reactor::register_reaction([[maybe_unused]] Reaction* reaction) { Statistics::increment_reactions(); } +void Reactor::unregister_reaction([[maybe_unused]] Reaction* reaction) { + reactor_assert(reaction != nullptr); + + validate(this->environment()->phase() == Phase::Construction || this->environment()->phase() == Phase::Mutation, + "Reactions can only be registered during construction phase!"); + reactions_.erase(reaction); + Statistics::decrement_reactions(); +} + void Reactor::register_reactor([[maybe_unused]] Reactor* reactor) { reactor_assert(reactor != nullptr); validate(this->environment()->phase() == Phase::Construction || this->environment()->phase() == Phase::Mutation, @@ -95,6 +113,14 @@ void Reactor::register_reactor([[maybe_unused]] Reactor* reactor) { Statistics::increment_reactor_instances(); } +void Reactor::unregister_reactor([[maybe_unused]] Reactor* reactor) { + reactor_assert(reactor != nullptr); + validate(this->environment()->phase() == Phase::Construction || this->environment()->phase() == Phase::Mutation, + "Reactions can only be registered during construction phase!"); + reactors_.erase(reactor); + Statistics::decrement_reactor_instances(); +} + void Reactor::register_connection([[maybe_unused]] std::unique_ptr&& connection) { reactor_assert(connection != nullptr); [[maybe_unused]] auto result = connections_.insert(std::move(connection)).second; diff --git a/lib/reactor_element.cc b/lib/reactor_element.cc index e4811d08..278eff7b 100644 --- a/lib/reactor_element.cc +++ b/lib/reactor_element.cc @@ -93,7 +93,7 @@ ReactorElement::~ReactorElement() { switch (type_) { case Type::Action: // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - // container->register_action(reinterpret_cast(this)); + container_->unregister_action(reinterpret_cast(this)); break; case Type::Input: // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) @@ -105,11 +105,15 @@ ReactorElement::~ReactorElement() { break; case Type::Reaction: // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - // container->register_reaction(reinterpret_cast(this)); + container_->unregister_reaction(reinterpret_cast(this)); break; case Type::Reactor: // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - // container->register_reactor(reinterpret_cast(this)); + if (container_ == nullptr) { + environment_->unregister_reactor(reinterpret_cast(this)); + } else { + container_->unregister_reactor(reinterpret_cast(this)); + } break; default: break;