Skip to content

Commit

Permalink
Merge pull request #420
Browse files Browse the repository at this point in the history
  • Loading branch information
Neverlord committed Jul 16, 2024
2 parents 7ec638a + 8724ffe commit 0b1f47d
Show file tree
Hide file tree
Showing 7 changed files with 7 additions and 331 deletions.
4 changes: 4 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
2.8.0-dev.113 | 2024-07-16 16:45:13 +0200

* Remove obsolete test scaffolding (Dominik Charousset, Corelight)

2.8.0-dev.111 | 2024-07-16 16:38:44 +0200

* Switch to clang-format 18 in CI (Dominik Charousset, Corelight)
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.8.0-dev.111
2.8.0-dev.113
120 changes: 0 additions & 120 deletions libbroker/broker/broker-test.test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,126 +143,6 @@ char ids_fixture::id_by_value(const broker::endpoint_id& value) {
FAIL("value not found: " << value);
}

base_fixture::base_fixture()
: ep(make_config()),
sys(internal::endpoint_access{&ep}.sys()),
self(sys),
sched(dynamic_cast<scheduler_type&>(sys.scheduler())) {
// nop
}

base_fixture::~base_fixture() {
run();
// Our core might do some messaging in its dtor, hence we need to make sure
// messages are handled when enqueued to avoid blocking.
sched.inline_all_enqueues();
}

configuration base_fixture::make_config() {
broker_options options;
options.disable_ssl = true;
configuration cfg{options};
auto& nat_cfg = internal::configuration_access{&cfg}.cfg();
caf::put(nat_cfg.content, "broker.disable-connector", true);
test_coordinator_fixture<caf::actor_system_config>::init_config(nat_cfg);
return cfg;
}

namespace {

struct bridge_state {
static inline const char* name = "broker.test.bridge";
};

using bridge_actor = caf::stateful_actor<bridge_state>;

} // namespace

base_fixture::endpoint_state base_fixture::ep_state(caf::actor core) {
auto& st = deref<internal::core_actor>(core).state;
return endpoint_state{st.id, st.filter->read(), core};
}

caf::actor base_fixture::bridge(const endpoint_state& left,
const endpoint_state& right) {
using caf::async::make_spsc_buffer_resource;
auto& sys = left.hdl.home_system();
auto [self, launch] = sys.spawn_inactive<bridge_actor>();
{
CAF_PUSH_AID_FROM_PTR(self);
auto [con1, prod1] = make_spsc_buffer_resource<node_message>();
auto [con2, prod2] = make_spsc_buffer_resource<node_message>();
caf::anon_send(left.hdl, atom::peer_v, right.id,
network_info{to_string(right.id), 42}, right.filter, con1,
prod2);
auto [con3, prod3] = make_spsc_buffer_resource<node_message>();
auto [con4, prod4] = make_spsc_buffer_resource<node_message>();
caf::anon_send(right.hdl, atom::peer_v, left.id,
network_info{to_string(left.id), 42}, left.filter, con3,
prod4);
self->make_observable().from_resource(con2).subscribe(prod3);
self->make_observable().from_resource(con4).subscribe(prod1);
}
auto hdl = caf::actor{self};
launch();
return hdl;
}

caf::actor base_fixture::bridge(const endpoint& left, const endpoint& right) {
return bridge(ep_state(native(left.core())), ep_state(native(right.core())));
}

caf::actor base_fixture::bridge(caf::actor left_core, caf::actor right_core) {
return bridge(ep_state(left_core), ep_state(right_core));
}

void base_fixture::push_data(caf::actor core,
std::vector<broker::data_message> xs) {
for (auto& x : xs)
caf::anon_send(core, atom::publish_v, std::move(x));
}

namespace {

struct data_collector_state {
static inline const char* name = "broker.test.data-collector";
};

using data_collector_actor = caf::stateful_actor<data_collector_state>;

void data_collector_impl(data_collector_actor* self,
std::shared_ptr<std::vector<data_message>> buf,
caf::async::consumer_resource<data_message> res) {
self->make_observable()
.from_resource(std::move(res))
.for_each([buf](const data_message& msg) { buf->emplace_back(msg); });
}

} // namespace

std::shared_ptr<std::vector<data_message>>
base_fixture::collect_data(caf::actor core, filter_type filter) {
using actor_t = data_collector_actor;
auto& sys = core.home_system();
auto [con, prod] = caf::async::make_spsc_buffer_resource<data_message>();
auto buf = std::make_shared<std::vector<data_message>>();
sys.spawn(data_collector_impl, buf, std::move(con));
anon_send(core, std::move(filter), std::move(prod));
return buf;
}

void base_fixture::run() {
while (sched.has_job() || sched.has_pending_timeout()) {
sched.run();
sched.trigger_timeouts();
}
}

void base_fixture::consume_message() {
if (!sched.try_run_once())
CAF_FAIL("no message to consume");
}

int main(int argc, char** argv) {
caf::init_global_meta_objects<caf::id_block::broker_test>();
endpoint::system_guard sys_guard; // Initialize global state.
Expand Down
194 changes: 1 addition & 193 deletions libbroker/broker/broker-test.test.hh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include <caf/test/bdd_dsl.hpp>

#include <caf/actor_system.hpp>
#include <caf/scheduler/test_coordinator.hpp>
#include <caf/scoped_actor.hpp>

#include "broker/configuration.hh"
Expand Down Expand Up @@ -135,46 +134,7 @@ normalize_status_log(const std::vector<broker::data_message>& xs,

// -- fixtures -----------------------------------------------------------------

struct empty_fixture_base {};

template <class Derived, class Base = empty_fixture_base>
class time_aware_fixture : public Base {
public:
void run(caf::timespan t) {
auto& sched = dref().sched;
for (;;) {
sched.run();
auto& clk = sched.clock();
if (!clk.has_pending_timeout()) {
sched.advance_time(t);
sched.run();
return;
} else {
auto next_timeout = clk.next_timeout();
auto delta = next_timeout - clk.now();
if (delta >= t) {
sched.advance_time(t);
sched.run();
return;
} else {
sched.advance_time(delta);
t -= delta;
}
}
}
}

template <class Rep, class Period>
void run(std::chrono::duration<Rep, Period> t) {
run(std::chrono::duration_cast<caf::timespan>(t));
}

private:
Derived& dref() {
return *static_cast<Derived*>(this);
}
};

/// Adds broker endpoint IDs with keys A-Z to the test environment.
class ids_fixture {
public:
ids_fixture();
Expand All @@ -190,150 +150,8 @@ public:
std::map<char, std::string> str_ids;
};

/// A fixture that hosts an endpoint configured with `test_coordinator` as
/// scheduler as well as a `scoped_actor`.
class base_fixture : public time_aware_fixture<base_fixture>,
public ids_fixture {
public:
struct endpoint_state {
broker::endpoint_id id;
broker::filter_type filter;
caf::actor hdl;
};

using super = time_aware_fixture<base_fixture>;

using scheduler_type = caf::scheduler::test_coordinator;

base_fixture();

virtual ~base_fixture();

broker::endpoint ep;
caf::actor_system& sys;
caf::scoped_actor self;
scheduler_type& sched;

using super::run;

void run();

void consume_message();

/// Dereferences `hdl` and downcasts it to `T`.
template <class T = caf::scheduled_actor, class Handle = caf::actor>
static T& deref(const Handle& hdl) {
auto ptr = caf::actor_cast<caf::abstract_actor*>(hdl);
if (ptr == nullptr)
CAF_FAIL("unable to cast handle to abstract_actor*");
return dynamic_cast<T&>(*ptr);
}

static endpoint_state ep_state(caf::actor core);

static broker::configuration make_config();

/// Establishes a peering relation between `left` and `right`.
static caf::actor bridge(const endpoint_state& left,
const endpoint_state& right);

/// Establishes a peering relation between `left` and `right`.
static caf::actor bridge(const broker::endpoint& left,
const broker::endpoint& right);

static caf::actor bridge(caf::actor left_core, caf::actor right_core);

/// Collect data directly at a Broker core without using a
/// `broker::subscriber` or other public API.
static std::shared_ptr<std::vector<broker::data_message>>
collect_data(caf::actor core, broker::filter_type filter);

static void push_data(caf::actor core, std::vector<broker::data_message> xs);
};

template <class Fixture>
class net_fixture {
public:
using planet_type = Fixture;

planet_type earth;
planet_type mars;

auto bridge(planet_type& left, planet_type& right) {
return planet_type::bridge(left.ep, right.ep);
}

void run() {
while (earth.sched.has_job() || earth.sched.has_pending_timeout()
|| mars.sched.has_job() || mars.sched.has_pending_timeout()) {
earth.sched.run();
earth.sched.trigger_timeouts();
mars.sched.run();
mars.sched.trigger_timeouts();
}
}

void run(caf::timespan t) {
auto& n1 = this->earth;
auto& n2 = this->mars;
assert(n1.sched.clock().now() == n2.sched.clock().now());
auto advance = [](auto& n) {
return n.sched.try_run_once() || n.mpx.try_exec_runnable()
|| n.mpx.read_data();
};
auto exhaust = [&] {
while (advance(n1) || advance(n2))
; // repeat
};
auto get_next_timeout = [](auto& result, auto& node) {
if (node.sched.has_pending_timeout()) {
auto t = node.sched.clock().schedule().begin()->first;
if (result)
result = std::min(*result, t);
else
result = t;
}
};
for (;;) {
exhaust();
caf::optional<caf::actor_clock::time_point> next_timeout;
get_next_timeout(next_timeout, n1);
get_next_timeout(next_timeout, n2);
if (!next_timeout) {
n1.sched.advance_time(t);
n2.sched.advance_time(t);
exhaust();
return;
}
auto delta = *next_timeout - n1.sched.clock().now();
if (delta >= t) {
n1.sched.advance_time(t);
n2.sched.advance_time(t);
exhaust();
return;
}
n1.sched.advance_time(delta);
n2.sched.advance_time(delta);
t -= delta;
}
}

template <class Rep, class Period>
void run(std::chrono::duration<Rep, Period> t) {
run(std::chrono::duration_cast<caf::timespan>(t));
}
};

// -- utility ------------------------------------------------------------------

template <class T>
T unbox(broker::expected<T> x) {
if (!x)
FAIL(to_string(x.error()));
else
return std::move(*x);
}

inline broker::data value_of(broker::expected<broker::data> x) {
if (!x) {
FAIL("cannot unbox expected<data>: " << to_string(x.error()));
Expand All @@ -348,13 +166,3 @@ inline broker::error error_of(broker::expected<broker::data> x) {
}
return std::move(x.error());
}

/// Convenience function for creating a vector of events from topic and data
/// pairs.
inline std::vector<broker::data_message>
data_msgs(std::initializer_list<std::pair<broker::topic, broker::data>> xs) {
std::vector<broker::data_message> result;
for (auto& x : xs)
result.push_back(broker::make_data_message(x.first, x.second));
return result;
}
14 changes: 1 addition & 13 deletions libbroker/broker/internal/core_actor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -290,15 +290,6 @@ caf::behavior core_actor_state::make_behavior() {
try_connect(addr, rp);
return rp;
},
[this](atom::peer, endpoint_id peer, const network_info& addr,
const filter_type& filter, node_consumer_res in_res,
node_producer_res out_res) -> caf::result<void> {
if (auto err = init_new_peer(peer, addr, filter, std::move(in_res),
std::move(out_res)))
return err;
else
return caf::unit;
},
// -- unpeering ------------------------------------------------------------
[this](atom::unpeer, const network_info& peer_addr) { //
unpeer(peer_addr);
Expand Down Expand Up @@ -485,9 +476,6 @@ caf::behavior core_actor_state::make_behavior() {
[this](atom::shutdown, shutdown_options opts) { //
shutdown(opts);
},
[this](atom::no_events) { //
disable_notifications = true;
},
[this](atom::await, endpoint_id peer_id) {
auto rp = self->make_response_promise();
if (auto i = peers.find(peer_id); i != peers.end())
Expand Down Expand Up @@ -580,7 +568,7 @@ void core_actor_state::finalize_shutdown() {
template <class Info, class EnumConstant>
void core_actor_state::emit(Info&& ep, EnumConstant code, const char* msg) {
// Sanity checking.
if (disable_notifications || !data_outputs)
if (!data_outputs)
return;
// Pick the right topic and factory based on the event type.
using value_type = typename EnumConstant::value_type;
Expand Down
Loading

0 comments on commit 0b1f47d

Please sign in to comment.