Skip to content

Commit

Permalink
clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
tanneberger committed Jan 20, 2025
1 parent a723f07 commit d7c0f98
Show file tree
Hide file tree
Showing 19 changed files with 145 additions and 132 deletions.
12 changes: 3 additions & 9 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@
include_directories(
"${PROJECT_SOURCE_DIR}/include"
)
include_directories("${PROJECT_SOURCE_DIR}/include")

add_custom_target(examples)
add_subdirectory(count)
add_subdirectory(ports)
add_subdirectory(hello)
add_subdirectory(power_train)
add_subdirectory(multiport_mutation)
add_custom_target(examples) add_subdirectory(count) add_subdirectory(ports) add_subdirectory(hello)
add_subdirectory(power_train) add_subdirectory(multiport_mutation)
5 changes: 2 additions & 3 deletions examples/count/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
add_executable(count EXCLUDE_FROM_ALL main.cc)
target_link_libraries(count reactor-cpp)
add_dependencies(examples count)
add_executable(count EXCLUDE_FROM_ALL main.cc) target_link_libraries(count reactor - cpp)
add_dependencies(examples count)
5 changes: 2 additions & 3 deletions examples/hello/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
add_executable(hello EXCLUDE_FROM_ALL main.cc)
target_link_libraries(hello reactor-cpp)
add_dependencies(examples hello)
add_executable(hello EXCLUDE_FROM_ALL main.cc) target_link_libraries(hello reactor - cpp)
add_dependencies(examples hello)
5 changes: 2 additions & 3 deletions examples/multiport_mutation/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
add_executable(mutation_multiports main.cc)
target_link_libraries(mutation_multiports reactor-cpp)
add_dependencies(examples mutation_multiports)
add_executable(mutation_multiports main.cc) target_link_libraries(mutation_multiports reactor - cpp)
add_dependencies(examples mutation_multiports)
6 changes: 2 additions & 4 deletions examples/multiport_mutation/consumer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class Consumer final : public Reactor { // NOLINT
std::size_t index_ = 0;

void reaction_1(const Input<unsigned>& in) const {
std::cout << "consumer: " << index_ << " received value:" << *in.get() << '\n';
// std::cout << "consumer: " << index_ << " received value:" << *in.get() << '\n';
}

friend Consumer;
Expand All @@ -34,9 +34,7 @@ public:

Input<unsigned> in{"in", this}; // NOLINT

void assemble() override {
handle.declare_trigger(&in);
}
void assemble() override { handle.declare_trigger(&in); }
};

#endif // CONSUMER_HH
11 changes: 5 additions & 6 deletions examples/multiport_mutation/load_balancer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@ class LoadBalancer final : public Reactor { // NOLINT
// reaction bodies
static void reaction_1(const Input<unsigned>& inbound, Output<unsigned>& scale_bank,
Multiport<Output<unsigned>>& outbound) {
if (std::rand() % 15 == 0) { // NOLINT
if (std::rand() % 15 == 0) { // NOLINT
scale_bank.set(std::rand() % 20 + 1); // NOLINT
}
const unsigned sel = std::rand() % outbound.size(); // NOLINT
std::cout << "Sending out to:" << sel << '\n';
// std::cout << "Sending out to:" << sel << '\n';
outbound[sel].set(inbound.get());
outbound[std::min(4ul, outbound.size() - 1)].set(inbound.get());
}


friend LoadBalancer;
};

Expand All @@ -44,10 +43,10 @@ public:
}
~LoadBalancer() override = default;

ModifableMultiport<Output<unsigned>> out{"out", this}; // NOLINT
ModifableMultiport<Output<unsigned>> out{"out", this}; // NOLINT
std::size_t out_size_ = 0;
Input<unsigned> inbound{"inbound", this}; // NOLINT
Output<unsigned> scale_bank{"scale_bank", this}; // NOLINT
Input<unsigned> inbound{"inbound", this}; // NOLINT
Output<unsigned> scale_bank{"scale_bank", this}; // NOLINT

void assemble() override {
std::cout << "assemble LoadBalancer\n";
Expand Down
17 changes: 5 additions & 12 deletions examples/multiport_mutation/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
#include <reactor-cpp/mutations/bank.hh>
#include <reactor-cpp/mutations/connection.hh>

#include "./multiport_to_bank.hh"
#include "./consumer.hh"
#include "./load_balancer.hh"
#include "./multiport_to_bank.hh"
#include "./producer.hh"
#include <reactor-cpp/reactor-cpp.hh>

Expand All @@ -19,7 +19,6 @@ class Deployment final : public Reactor { // NOLINT
Reaction scale_bank{"scale_bank", 1, this,
[this]() { this->_inner.reaction_1(this->scale, this->consumers_, load_balancer_->out); }};


class Inner : public MutableScope {
int state = 0;

Expand All @@ -35,15 +34,9 @@ class Deployment final : public Reactor { // NOLINT
return std::make_unique<Consumer>(_lf_inst_name, env, index);
};

std::function get_input_port = [](const std::unique_ptr<Consumer>& consumer) {
return &consumer->in;
};
auto rescale = std::make_shared<ResizeMultiportToBank<unsigned, Consumer>>(
&load_balancer,
&reactor_bank,
get_input_port,
lambda,
new_size);
std::function get_input_port = [](const std::unique_ptr<Consumer>& consumer) { return &consumer->in; };
auto rescale = std::make_shared<ResizeMultiportToBank<unsigned, Consumer>>(&load_balancer, &reactor_bank,
get_input_port, lambda, new_size);

add_to_transaction(rescale);

Expand All @@ -53,7 +46,7 @@ class Deployment final : public Reactor { // NOLINT
friend LoadBalancer;
};

Inner _inner;
Inner _inner;

public:
Deployment(const std::string& name, Environment* env)
Expand Down
114 changes: 55 additions & 59 deletions examples/multiport_mutation/multiport_to_bank.hh
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
#ifndef MULTIPORT_TO_BANK_HH
#define MULTIPORT_TO_BANK_HH

#include <reactor-cpp/mutations.hh>
#include <reactor-cpp/multiport.hh>
#include <reactor-cpp/port.hh>
#include <reactor-cpp/mutations/multiport.hh>
#include <reactor-cpp/mutations.hh>
#include <reactor-cpp/mutations/bank.hh>
#include <reactor-cpp/mutations/connection.hh>
#include <reactor-cpp/mutations/multiport.hh>
#include <reactor-cpp/port.hh>
#include <reactor-cpp/reactor.hh>

#include "../../lib/mutations/bank.cc"
Expand All @@ -21,75 +21,71 @@

namespace reactor {

template<class PortType, class ReactorType>
class ResizeMultiportToBank : public Mutation {
ModifableMultiport<Output<PortType>>* multiport_;
std::vector<std::unique_ptr<ReactorType>>* bank_;
std::function<Input<PortType>*(const std::unique_ptr<ReactorType>&)> get_input_port_;
std::function<std::unique_ptr<ReactorType>(Environment* env, std::size_t index)> create_lambda_;
std::size_t new_size_ = 0;
public:
ResizeMultiportToBank(ModifableMultiport<Output<PortType>>* multiport,
std::vector<std::unique_ptr<ReactorType>>* bank,
std::function<Input<PortType>*(const std::unique_ptr<ReactorType>&)> get_input_port,
std::function<std::unique_ptr<ReactorType>(Environment* env, std::size_t index)> create_lambda,
std::size_t new_size) :
multiport_(multiport), bank_(bank), get_input_port_(get_input_port), create_lambda_(create_lambda), new_size_(new_size) {}

~ResizeMultiportToBank() = default;
auto run() -> MutationResult {
if (multiport_->size() != bank_->size()) {
return NotMatchingBankSize;
}
auto old_size = multiport_->size();

if (new_size_ > old_size) {
// TODO: this is an assumption
auto change_multiport_size =
std::make_shared<MutationChangeOutputMultiportSize<unsigned>>(multiport_, new_size_);
template <class PortType, class ReactorType> class ResizeMultiportToBank : public Mutation {
ModifableMultiport<Output<PortType>>* multiport_;
std::vector<std::unique_ptr<ReactorType>>* bank_;
std::function<Input<PortType>*(const std::unique_ptr<ReactorType>&)> get_input_port_;
std::function<std::unique_ptr<ReactorType>(Environment* env, std::size_t index)> create_lambda_;
std::size_t new_size_ = 0;

change_multiport_size->run();
public:
ResizeMultiportToBank(ModifableMultiport<Output<PortType>>* multiport,
std::vector<std::unique_ptr<ReactorType>>* bank,
std::function<Input<PortType>*(const std::unique_ptr<ReactorType>&)> get_input_port,
std::function<std::unique_ptr<ReactorType>(Environment* env, std::size_t index)> create_lambda,
std::size_t new_size)
: multiport_(multiport)
, bank_(bank)
, get_input_port_(get_input_port)
, create_lambda_(create_lambda)
, new_size_(new_size) {}

auto change_bank_size = std::make_shared<MutationChangeBankSize<std::unique_ptr<ReactorType>>>(
bank_, (*bank_)[0]->environment(), new_size_, create_lambda_);
~ResizeMultiportToBank() = default;
auto run() -> MutationResult {
if (multiport_->size() != bank_->size()) {
return NotMatchingBankSize;
}
auto old_size = multiport_->size();

change_bank_size->run();
if (new_size_ > old_size) {
auto change_multiport_size = std::make_shared<MutationChangeOutputMultiportSize<unsigned>>(multiport_, new_size_);

for (auto i = old_size; i < new_size_; i++) {
auto add_conn = std::make_shared<MutationAddConnection<Output<PortType>, Input<PortType>>>(
&(*multiport_)[i], get_input_port_((*bank_)[i]), (*bank_)[0]->environment(), true);
change_multiport_size->run();

add_conn->run();
}
} else if (new_size_ < old_size) {
for (auto i = old_size - 1; i >= new_size_; i--) {
auto add_conn = std::make_shared<MutationAddConnection<Output<PortType>, Input<PortType>>>(
&(*multiport_)[i], get_input_port_((*bank_)[i]), (*bank_)[0]->environment(), false);
auto change_bank_size = std::make_shared<MutationChangeBankSize<std::unique_ptr<ReactorType>>>(
bank_, (*bank_)[0]->environment(), new_size_, create_lambda_);

add_conn->run();
}
change_bank_size->run();

auto change_multiport_size =
std::make_shared<MutationChangeOutputMultiportSize<unsigned>>(multiport_, new_size_);
for (auto i = old_size; i < new_size_; i++) {
auto add_conn = std::make_shared<MutationAddConnection<Output<PortType>, Input<PortType>>>(
&(*multiport_)[i], get_input_port_((*bank_)[i]), (*bank_)[0]->environment(), true);

change_multiport_size->run();
add_conn->run();
}
} else if (new_size_ < old_size) {
for (auto i = old_size - 1; i >= new_size_; i--) {
auto add_conn = std::make_shared<MutationAddConnection<Output<PortType>, Input<PortType>>>(
&(*multiport_)[i], get_input_port_((*bank_)[i]), (*bank_)[0]->environment(), false);

auto change_bank_size = std::make_shared<MutationChangeBankSize<std::unique_ptr<ReactorType>>>(
bank_, (*bank_)[0]->environment(), new_size_, create_lambda_);
add_conn->run();
}

change_bank_size->run();
}
auto change_multiport_size = std::make_shared<MutationChangeOutputMultiportSize<unsigned>>(multiport_, new_size_);

change_multiport_size->run();

return Success;
}
auto change_bank_size = std::make_shared<MutationChangeBankSize<std::unique_ptr<ReactorType>>>(
bank_, (*bank_)[0]->environment(), new_size_, create_lambda_);

auto rollback() -> MutationResult {
return Success;
}
};
}
change_bank_size->run();
}

return Success;
}

auto rollback() -> MutationResult { return Success; }
};
} // namespace reactor

#endif //MULTIPORT_TO_BANK_HH
#endif // MULTIPORT_TO_BANK_HH
2 changes: 1 addition & 1 deletion examples/multiport_mutation/producer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ private:
unsigned int counter_ = 0;

void reaction_1([[maybe_unused]] Output<unsigned>& out) {
std::cout << "producing value:" << counter_ << "\n";
// std::cout << "producing value:" << counter_ << "\n";
out.set(counter_++);
}

Expand Down
5 changes: 2 additions & 3 deletions examples/ports/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
add_executable(ports EXCLUDE_FROM_ALL main.cc)
target_link_libraries(ports reactor-cpp)
add_dependencies(examples ports)
add_executable(ports EXCLUDE_FROM_ALL main.cc) target_link_libraries(ports reactor - cpp)
add_dependencies(examples ports)
5 changes: 2 additions & 3 deletions examples/power_train/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
add_executable(power_train EXCLUDE_FROM_ALL main.cc)
target_link_libraries(power_train reactor-cpp)
add_dependencies(examples power_train)
add_executable(power_train EXCLUDE_FROM_ALL main.cc) target_link_libraries(power_train reactor - cpp)
add_dependencies(examples power_train)
12 changes: 6 additions & 6 deletions include/reactor-cpp/connection.hh
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ public:
};
}

auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock,
const std::function<bool(void)>& abort_waiting) -> bool override {
auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock, const std::function<bool(void)>& abort_waiting)
-> bool override {
reactor_assert(lock.owns_lock());
log_.debug() << "downstream tries to acquire tag " << tag;

Expand Down Expand Up @@ -210,8 +210,8 @@ public:
};
}

auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock,
const std::function<bool(void)>& abort_waiting) -> bool override {
auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock, const std::function<bool(void)>& abort_waiting)
-> bool override {
// Since this is a delayed connection, we can go back in time and need to
// acquire the latest upstream tag that can create an event at the given
// tag. We also need to consider that given a delay d and a tag g=(t, n),
Expand Down Expand Up @@ -240,8 +240,8 @@ public:
};
}

auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock,
const std::function<bool(void)>& abort_waiting) -> bool override {
auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock, const std::function<bool(void)>& abort_waiting)
-> bool override {
this->log_.debug() << "downstream tries to acquire tag " << tag;
return PhysicalTimeBarrier::acquire_tag(tag, lock, this->environment()->scheduler(), abort_waiting);
}
Expand Down
8 changes: 6 additions & 2 deletions include/reactor-cpp/reactor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,16 @@ private:

void register_action(BaseAction* action);
void register_input(BasePort* port);
void unregister_input(BasePort* port);
void register_output(BasePort* port);
void unregister_output(BasePort* port);
void register_reaction(Reaction* reaction);
void register_reactor(Reactor* reactor);

void unregister_action(BaseAction* action);
void unregister_input(BasePort* port);
void unregister_output(BasePort* port);
void unregister_reaction(Reaction* reaction);
void unregister_reactor(Reactor* reactor);

public:
Reactor(const std::string& name, Reactor* container);
Reactor(const std::string& name, Environment* environment);
Expand Down
5 changes: 5 additions & 0 deletions include/reactor-cpp/statistics.hh
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,17 @@ public:
static void increment_reactions() { increment(reactions_); }
static void increment_actions() { increment(actions_); }
static void increment_ports() { increment(ports_); }

static void increment_processed_events() { increment(processed_events_); }
static void increment_processed_reactions() { increment(processed_reactions_); }
static void increment_triggered_actions() { increment(triggered_actions_); }
static void increment_set_ports() { increment(set_ports_); }
static void increment_scheduled_actions() { increment(scheduled_actions_); }

static void decrement_reactor_instances() { decrement(reactor_instances_); }
static void decrement_connections() { decrement(connections_); }
static void decrement_reactions() { decrement(reactions_); }
static void decrement_actions() { decrement(actions_); }
static void decrement_ports() { decrement(ports_); }

static auto reactor_instances() { return reactor_instances_.load(std::memory_order_acquire); }
Expand Down
4 changes: 2 additions & 2 deletions include/reactor-cpp/time_barrier.hh
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ public:
// The caller must hold a lock on the scheduler mutex
auto try_acquire_tag(const Tag& tag) { return tag <= released_time_; }

auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock,
const std::function<bool(void)>& abort_waiting) -> bool {
auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock, const std::function<bool(void)>& abort_waiting)
-> bool {
if (try_acquire_tag(tag)) {
return true;
}
Expand Down
Loading

0 comments on commit d7c0f98

Please sign in to comment.