From 36e906826db1645241b06867912e7f1d46a9bd53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ivan=20=C4=8Cuki=C4=87?= Date: Wed, 4 Oct 2023 18:57:17 +0200 Subject: [PATCH] Added the example selector node (#178) * Added the example selector node Added: - support for collections of ports - support for async ports - settings report errors when a property is not set due to incompatible types - Monitor works even if nothing is connected to its input - connection examples to the block doc --- bench/bm_fft.cpp | 2 +- include/graph.hpp | 174 +++------ include/node.hpp | 256 ++++++++++--- include/node_traits.hpp | 197 +++++++--- include/port.hpp | 11 +- include/port_traits.hpp | 21 +- include/settings.hpp | 26 +- include/typelist.hpp | 11 + include/utils.hpp | 15 +- test/CMakeLists.txt | 1 + test/blocklib/core/selector.hpp | 209 ++++++++++ test/blocklib/core/unit-test/common_nodes.hpp | 4 +- test/qa_dynamic_port.cpp | 4 +- test/qa_fft.cpp | 6 +- test/qa_hier_node.cpp | 6 +- test/qa_selector.cpp | 358 ++++++++++++++++++ 16 files changed, 1039 insertions(+), 262 deletions(-) create mode 100644 test/blocklib/core/selector.hpp create mode 100644 test/qa_selector.cpp diff --git a/bench/bm_fft.cpp b/bench/bm_fft.cpp index 269376c9..1258c669 100644 --- a/bench/bm_fft.cpp +++ b/bench/bm_fft.cpp @@ -111,4 +111,4 @@ inline const boost::ut::suite _fft_bm_tests = [] { int main() { /* not needed by the UT framework */ -} \ No newline at end of file +} diff --git a/include/graph.hpp b/include/graph.hpp index 88a9e810..23ab3f78 100644 --- a/include/graph.hpp +++ b/include/graph.hpp @@ -34,106 +34,13 @@ namespace fair::graph { using namespace fair::literals; -#define ENABLE_PYTHON_INTEGRATION -#ifdef ENABLE_PYTHON_INTEGRATION - -// TODO: Not yet implemented -class dynamic_node { -private: - // TODO: replace the following with array<2, vector> - using dynamic_ports = std::vector; - dynamic_ports _dynamic_input_ports; - dynamic_ports _dynamic_output_ports; - - std::function _process; - -public: - void - work() { - _process(_dynamic_input_ports, _dynamic_output_ports); - } - - template - void - add_port(T &&port) { - switch (port.direction()) { - case port_direction_t::INPUT: - if (auto portID = port_index(port.name()); portID.has_value()) { - throw std::invalid_argument(fmt::format("port already has a defined input port named '{}' at ID {}", port.name(), portID.value())); - } - _dynamic_input_ports.emplace_back(std::forward(port)); - break; - - case port_direction_t::OUTPUT: - if (auto portID = port_index(port.name()); portID.has_value()) { - throw std::invalid_argument(fmt::format("port already has a defined output port named '{}' at ID {}", port.name(), portID.value())); - } - _dynamic_output_ports.emplace_back(std::forward(port)); - break; - - default: assert(false && "cannot add port with ANY designation"); - } - } - - [[nodiscard]] std::optional - dynamic_input_port(std::size_t index) { - return index < _dynamic_input_ports.size() ? std::optional{ &_dynamic_input_ports[index] } : std::nullopt; - } - - [[nodiscard]] std::optional - dynamic_input_port_index(std::string_view name) const { - auto portNameMatches = [name](const auto &port) { return port.name == name; }; - const auto it = std::find_if(_dynamic_input_ports.cbegin(), _dynamic_input_ports.cend(), portNameMatches); - return it != _dynamic_input_ports.cend() ? std::optional{ std::distance(_dynamic_input_ports.cbegin(), it) } : std::nullopt; - } - - [[nodiscard]] std::optional - dynamic_input_port(std::string_view name) { - if (const auto index = dynamic_input_port_index(name); index.has_value()) { - return &_dynamic_input_ports[*index]; - } - return std::nullopt; - } - - [[nodiscard]] std::optional - dynamic_output_port(std::size_t index) { - return index < _dynamic_output_ports.size() ? std::optional{ &_dynamic_output_ports[index] } : std::nullopt; - } - - [[nodiscard]] std::optional - dynamic_output_port_index(std::string_view name) const { - auto portNameMatches = [name](const auto &port) { return port.name == name; }; - const auto it = std::find_if(_dynamic_output_ports.cbegin(), _dynamic_output_ports.cend(), portNameMatches); - return it != _dynamic_output_ports.cend() ? std::optional{ std::distance(_dynamic_output_ports.cbegin(), it) } : std::nullopt; - } - - [[nodiscard]] std::optional - dynamic_output_port(std::string_view name) { - if (const auto index = dynamic_output_port_index(name); index.has_value()) { - return &_dynamic_output_ports[*index]; - } - return std::nullopt; - } - - [[nodiscard]] std::span - dynamic_input_ports() const noexcept { - return _dynamic_input_ports; - } - - [[nodiscard]] std::span - dynamic_output_ports() const noexcept { - return _dynamic_output_ports; - } -}; - -#endif - class node_model { protected: - using dynamic_ports = std::vector; - bool _dynamic_ports_loaded = false; - dynamic_ports _dynamic_input_ports; - dynamic_ports _dynamic_output_ports; + using dynamic_ports = std::vector; + bool _dynamic_ports_loaded = false; + std::function _dynamic_ports_loader; + dynamic_ports _dynamic_input_ports; + dynamic_ports _dynamic_output_ports; node_model() = default; @@ -147,27 +54,32 @@ class node_model { operator=(node_model &&other) = delete; + void + init_dynamic_ports() const { + if (!_dynamic_ports_loaded) _dynamic_ports_loader(); + } + fair::graph::dynamic_port & dynamic_input_port(std::size_t index) { - assert(_dynamic_ports_loaded); - return _dynamic_input_ports[index]; + init_dynamic_ports(); + return _dynamic_input_ports.at(index); } fair::graph::dynamic_port & dynamic_output_port(std::size_t index) { - assert(_dynamic_ports_loaded); - return _dynamic_output_ports[index]; + init_dynamic_ports(); + return _dynamic_output_ports.at(index); } [[nodiscard]] auto dynamic_input_ports_size() const { - assert(_dynamic_ports_loaded); + init_dynamic_ports(); return _dynamic_input_ports.size(); } [[nodiscard]] auto dynamic_output_ports_size() const { - assert(_dynamic_ports_loaded); + init_dynamic_ports(); return _dynamic_output_ports.size(); } @@ -280,21 +192,40 @@ class node_wrapper : public node_model { } void - init_dynamic_ports() { - using Node = std::remove_cvref_t; + create_dynamic_ports_loader() { + _dynamic_ports_loader = [this] { + if (_dynamic_ports_loaded) return; - constexpr std::size_t input_port_count = fair::graph::traits::node::template input_port_types::size; - [this](std::index_sequence) { - (this->_dynamic_input_ports.emplace_back(fair::graph::input_port(&node_ref())), ...); - }(std::make_index_sequence()); + using Node = std::remove_cvref_t; - constexpr std::size_t output_port_count = fair::graph::traits::node::template output_port_types::size; - [this](std::index_sequence) { - (this->_dynamic_output_ports.push_back(fair::graph::dynamic_port(fair::graph::output_port(&node_ref()))), ...); - }(std::make_index_sequence()); + auto register_port = [](PortOrCollection &port_or_collection, auto &where) { + auto process_port = [&where] (Port& port) { + where.push_back(fair::graph::dynamic_port(port, dynamic_port::non_owned_reference_tag{})); + }; + + if constexpr (traits::port::is_port_v) { + process_port(port_or_collection); + + } else { + for (auto &port : port_or_collection) { + process_port(port); + } + } + }; - static_assert(input_port_count + output_port_count > 0); - _dynamic_ports_loaded = true; + constexpr std::size_t input_port_count = fair::graph::traits::node::template input_port_types::size; + [this, register_port](std::index_sequence) { + (register_port(fair::graph::input_port(&node_ref()), this->_dynamic_input_ports), ...); + }(std::make_index_sequence()); + + constexpr std::size_t output_port_count = fair::graph::traits::node::template output_port_types::size; + [this, register_port](std::index_sequence) { + (register_port(fair::graph::output_port(&node_ref()), this->_dynamic_output_ports), ...); + }(std::make_index_sequence()); + + static_assert(input_port_count + output_port_count > 0); + _dynamic_ports_loaded = true; + }; } public: @@ -309,23 +240,24 @@ class node_wrapper : public node_model { ~node_wrapper() override = default; - node_wrapper() { init_dynamic_ports(); } + node_wrapper() { + create_dynamic_ports_loader(); + } template requires(!std::is_same_v, T>) explicit node_wrapper(Arg &&arg) : _node(std::forward(arg)) { - init_dynamic_ports(); + create_dynamic_ports_loader(); } template requires(sizeof...(Args) > 1) explicit node_wrapper(Args &&...args) : _node{ std::forward(args)... } { - init_dynamic_ports(); + create_dynamic_ports_loader(); } explicit node_wrapper(std::initializer_list> init_parameter) : _node{ std::move(init_parameter) } { - init_dynamic_ports(); - _node.settings().update_active_parameters(); + create_dynamic_ports_loader(); } void diff --git a/include/node.hpp b/include/node.hpp index 1e78ec44..bc4fb626 100644 --- a/include/node.hpp +++ b/include/node.hpp @@ -342,14 +342,17 @@ struct node : protected std::tuple { bool in_at_least_one_port_has_data{ false }; // at least one port has data bool in_at_least_one_tag_available{ false }; // at least one port has a tag + bool has_sync_input_ports{ false }; // if all ports are async, status is not important + bool has_sync_output_ports{ false }; // if all ports are async, status is not important + constexpr bool enough_samples_for_output_ports(std::size_t n) { - return n >= out_min_samples; + return !has_sync_output_ports || n >= out_min_samples; } constexpr bool space_available_on_output_ports(std::size_t n) { - return n <= out_available; + return !has_sync_output_ports || n <= out_available; } }; @@ -376,23 +379,48 @@ struct node : protected std::tuple { void update_ports_status() { - ports_status = ports_status_t(); + ports_status = ports_status_t(); + + auto adjust_for_input_port = [&ps = ports_status](Port &port) { + if constexpr (std::remove_cvref_t::synchronous) { + ps.has_sync_input_ports = true; + ps.in_min_samples = std::max(ps.in_min_samples, port.min_buffer_size()); + ps.in_max_samples = std::min(ps.in_max_samples, port.max_buffer_size()); + ps.in_available = std::min(ps.in_available, port.streamReader().available()); + ps.in_samples_to_next_tag = std::min(ps.in_samples_to_next_tag, samples_to_next_tag(port)); + ps.in_at_least_one_port_has_data = ps.in_at_least_one_port_has_data | (port.streamReader().available() > 0); + ps.in_at_least_one_tag_available = ps.in_at_least_one_port_has_data | (port.tagReader().available() > 0); + } + }; meta::tuple_for_each( - [&ps = ports_status](PortType auto &port) { - ps.in_min_samples = std::max(ps.in_min_samples, port.min_buffer_size()); - ps.in_max_samples = std::min(ps.in_max_samples, port.max_buffer_size()); - ps.in_available = std::min(ps.in_available, port.streamReader().available()); - ps.in_samples_to_next_tag = std::min(ps.in_samples_to_next_tag, samples_to_next_tag(port)); - ps.in_at_least_one_port_has_data = ps.in_at_least_one_port_has_data | (port.streamReader().available() > 0); - ps.in_at_least_one_tag_available = ps.in_at_least_one_port_has_data | (port.tagReader().available() > 0); + [&adjust_for_input_port](Port &port_or_collection) { + if constexpr (traits::port::is_port_v) { + adjust_for_input_port(port_or_collection); + } else { + for (auto &port : port_or_collection) { + adjust_for_input_port(port); + } + } }, input_ports(&self())); + auto adjust_for_output_port = [&ps = ports_status](Port &port) { + if constexpr (std::remove_cvref_t::synchronous) { + ps.has_sync_output_ports = true; + ps.out_min_samples = std::max(ps.out_min_samples, port.min_buffer_size()); + ps.out_max_samples = std::min(ps.out_max_samples, port.max_buffer_size()); + ps.out_available = std::min(ps.out_available, port.streamWriter().available()); + } + }; meta::tuple_for_each( - [&ps = ports_status](PortType auto &port) { - ps.out_min_samples = std::max(ps.out_min_samples, port.min_buffer_size()); - ps.out_max_samples = std::min(ps.out_max_samples, port.max_buffer_size()); - ps.out_available = std::min(ps.out_available, port.streamWriter().available()); + [&adjust_for_output_port](Port &port_or_collection) { + if constexpr (traits::port::is_port_v) { + adjust_for_output_port(port_or_collection); + } else { + for (auto &port : port_or_collection) { + adjust_for_output_port(port); + } + } }, output_ports(&self())); @@ -404,21 +432,33 @@ struct node : protected std::tuple { // TODO: adjust `samples_to_proceed` to output limits? ports_status.out_samples = ports_status.in_samples; - if (ports_status.in_min_samples > ports_status.in_max_samples) + if (ports_status.has_sync_input_ports && ports_status.in_min_samples > ports_status.in_max_samples) throw std::runtime_error(fmt::format("Min samples for input ports ({}) is larger then max samples for input ports ({})", ports_status.in_min_samples, ports_status.in_max_samples)); - if (ports_status.out_min_samples > ports_status.out_max_samples) + if (ports_status.has_sync_output_ports && ports_status.out_min_samples > ports_status.out_max_samples) throw std::runtime_error(fmt::format("Min samples for output ports ({}) is larger then max samples for output ports ({})", ports_status.out_min_samples, ports_status.out_max_samples)); + + if (!ports_status.has_sync_input_ports) { + ports_status.in_samples = 0; + ports_status.in_available = 0; + } + if (!ports_status.has_sync_output_ports) { + ports_status.out_samples = 0; + ports_status.out_available = 0; + } } public: node() noexcept : node({}) {} - node(std::initializer_list> init_parameter) noexcept + node(std::initializer_list> init_parameter) : _tags_at_input(traits::node::input_port_types::size()) , _tags_at_output(traits::node::output_port_types::size()) , _settings(std::make_unique>(*static_cast(this))) { // N.B. safe delegated use of this (i.e. not used during construction) if (init_parameter.size() != 0) { - std::ignore = settings().set(init_parameter); + const auto failed = settings().set(init_parameter); + if (!failed.empty()) { + throw std::invalid_argument("Settings not applied successfully"); + } } } @@ -466,7 +506,18 @@ struct node : protected std::tuple { } else { static_assert(fair::meta::always_false, "type not supported"); } - meta::tuple_for_each_enumerate([&data](auto index, auto &input_port) { data[index] = input_port.streamReader().available(); }, input_ports(&self())); + meta::tuple_for_each_enumerate( + [&data](auto index, Port &input_port) { + if constexpr (traits::port::is_port_v) { + data[index] = input_port.streamReader().available(); + } else { + data[index] = 0; + for (auto &port : input_port) { + data[index] += port.streamReader().available(); + } + } + }, + input_ports(&self())); return traits::node::input_port_types::size; } @@ -480,7 +531,18 @@ struct node : protected std::tuple { } else { static_assert(fair::meta::always_false, "type not supported"); } - meta::tuple_for_each_enumerate([&data](auto index, auto &output_port) { data[index] = output_port.streamWriter().available(); }, output_ports(&self())); + meta::tuple_for_each_enumerate( + [&data](auto index, Port &output_port) { + if constexpr (traits::port::is_port_v) { + data[index] = output_port.streamWriter().available(); + } else { + data[index] = 0; + for (auto &port : output_port) { + data[index] += port.streamWriter().available(); + } + } + }, + output_ports(&self())); return traits::node::output_port_types::size; } @@ -555,12 +617,31 @@ struct node : protected std::tuple { write_to_outputs(std::size_t available_values_count, auto &writers_tuple) noexcept { if constexpr (traits::node::output_ports::size > 0) { meta::tuple_for_each_enumerate( - [available_values_count](auto i, auto &output_range) { + [available_values_count](auto i, OutputRange &output_range) { if constexpr (traits::node::can_process_one or traits::node::process_bulk_requires_ith_output_as_span) { - output_range.publish(available_values_count); - } else if (not output_range.is_published()) { - fmt::print(stderr, "process_bulk failed to publish one of its outputs. Use a std::span argument if you do not want to publish manually.\n"); - std::abort(); + auto process_out = [available_values_count](Out &out) { + // This will be a pointer if the port was async + // TODO: Make this check more specific + if constexpr (not std::is_pointer_v>) { + out.publish(available_values_count); + } + }; + if (available_values_count) { + if constexpr (refl::trait::is_instance_of_v>) { + for (auto &out : output_range) { + process_out(out); + } + } else { + process_out(output_range); + } + } + } else { + if constexpr (requires { output_range.is_published(); }) { + if (not output_range.is_published()) { + fmt::print(stderr, "process_bulk failed to publish one of its outputs. Use a std::span argument if you do not want to publish manually.\n"); + std::abort(); + } + } } }, writers_tuple); @@ -574,7 +655,20 @@ struct node : protected std::tuple { consume_readers(Self &self, std::size_t available_values_count) { bool success = true; if constexpr (traits::node::input_ports::size > 0) { - std::apply([available_values_count, &success](auto &...input_port) { ((success = success && input_port.streamReader().consume(available_values_count)), ...); }, input_ports(&self)); + std::apply( + [available_values_count, &success](auto &...input_port) { + auto consume_port = [&](Port &port_or_collection) { + if constexpr (traits::port::is_port_v) { + success = success && port_or_collection.streamReader().consume(available_values_count); + } else { + for (auto &port : port_or_collection) { + success = success && port.streamReader().consume(available_values_count); + } + } + }; + (consume_port(input_port), ...); + }, + input_ports(&self)); } return success; } @@ -618,17 +712,22 @@ struct node : protected std::tuple { // TODO: following function does not call the lvalue but erroneously the lvalue version of publish_tag(...) ?!?! // meta::tuple_for_each([&port_id, this](auto &output_port) noexcept { publish_tag2(output_port, _tags_at_output[port_id++]); }, output_ports(&self())); meta::tuple_for_each( - [&port_id, this](auto &output_port) noexcept { - if (_tags_at_output[port_id].map.empty()) { - port_id++; + [&port_id, this](Port &output_port) noexcept { + if constexpr (!traits::port::is_port_v) { + // TODO Add tag support to port collections? return; + } else { + if (_tags_at_output[port_id].map.empty()) { + port_id++; + return; + } + auto data = output_port.tagWriter().reserve_output_range(1); + auto stream_writer_offset = std::max(static_cast(0), output_port.streamWriter().position() + 1); + data[0].index = stream_writer_offset + _tags_at_output[port_id].index; + data[0].map = _tags_at_output[port_id].map; + data.publish(1); + port_id++; } - auto data = output_port.tagWriter().reserve_output_range(1); - auto stream_writer_offset = std::max(static_cast(0), output_port.streamWriter().position() + 1); - data[0].index = stream_writer_offset + _tags_at_output[port_id].index; - data[0].map = _tags_at_output[port_id].map; - data.publish(1); - port_id++; }, output_ports(&self())); // clear input/output tags after processing, N.B. ranges omitted because of missing Clang/Emscripten support @@ -698,6 +797,7 @@ struct node : protected std::tuple { } ports_status.in_samples = std::min(samples_to_process, requested_work); ports_status.out_samples = ports_status.in_samples; + } else if constexpr (requires(const Derived &d) { { available_samples(d) } -> std::same_as; }) { @@ -714,6 +814,7 @@ struct node : protected std::tuple { } ports_status.in_samples = std::min(samples_to_process, requested_work); ports_status.out_samples = ports_status.in_samples; + } else if constexpr (is_sink_node) { // no input or output buffers, derive from internal "buffer sizes" (i.e. what the // buffer size would be if the node were not merged) @@ -722,6 +823,7 @@ struct node : protected std::tuple { "friend function `available_samples(const NodeType&)` must be defined."); ports_status.in_samples = std::min(chunk_size, requested_work); ports_status.out_samples = ports_status.in_samples; + } else { // derive value from output buffer size std::size_t samples_to_process = std::min(ports_status.out_available, ports_status.out_max_samples); @@ -732,11 +834,12 @@ struct node : protected std::tuple { ports_status.out_samples = ports_status.in_samples; // space_available_on_output_ports is true by construction of samples_to_process } + } else { ports_status.in_samples = std::min(ports_status.in_samples, requested_work); ports_status.out_samples = ports_status.in_samples; - if (ports_status.in_available == 0) { + if (ports_status.has_sync_input_ports && ports_status.in_available == 0) { return { requested_work, 0_UZ, ports_status.in_at_least_one_port_has_data ? work_return_status_t::INSUFFICIENT_INPUT_ITEMS : work_return_status_t::DONE }; } @@ -788,24 +891,27 @@ struct node : protected std::tuple { _input_tags_present = true; std::size_t port_index = 0; // TODO absorb this as optional tuple_for_each argument meta::tuple_for_each( - [&merged_tag_map, &port_index, this](auto &input_port) noexcept { - auto &tag_at_present_input = _tags_at_input[port_index++]; - tag_at_present_input.reset(); - if (!input_port.tagReader().available()) { - return; - } - const auto tags = input_port.tagReader().get(1_UZ); - const auto readPos = input_port.streamReader().position(); - const auto tag_stream_pos = tags[0].index - 1 - readPos; - if ((readPos == -1 && tags[0].index <= 0) // first tag on initialised stream - || tag_stream_pos <= 0) { - for (const auto &[index, map] : tags) { - for (const auto &[key, value] : map) { - tag_at_present_input.map.insert_or_assign(key, value); - merged_tag_map.insert_or_assign(key, value); + [&merged_tag_map, &port_index, this](Port &input_port) noexcept { + // TODO: Do we want to support tags for non-compile-time ports? [ivan][port_group][move_to_policy?] + if constexpr (traits::port::is_port_v) { + auto &tag_at_present_input = _tags_at_input[port_index++]; + tag_at_present_input.reset(); + if (!input_port.tagReader().available()) { + return; + } + const auto tags = input_port.tagReader().get(1_UZ); + const auto readPos = input_port.streamReader().position(); + const auto tag_stream_pos = tags[0].index - 1 - readPos; + if ((readPos == -1 && tags[0].index <= 0) // first tag on initialised stream + || tag_stream_pos <= 0) { + for (const auto &[index, map] : tags) { + for (const auto &[key, value] : map) { + tag_at_present_input.map.insert_or_assign(key, value); + merged_tag_map.insert_or_assign(key, value); + } } + std::ignore = input_port.tagReader().consume(1_UZ); } - std::ignore = input_port.tagReader().consume(1_UZ); } }, input_ports(&self())); @@ -876,9 +982,48 @@ struct node : protected std::tuple { } } - const auto input_spans = meta::tuple_transform([in_samples = ports_status.in_samples](auto &input_port) noexcept { return input_port.streamReader().get(in_samples); }, input_ports(&self())); - auto writers_tuple = meta::tuple_transform([out_samples = ports_status.out_samples](auto &output_port) noexcept { return output_port.streamWriter().reserve_output_range(out_samples); }, - output_ports(&self())); + const auto input_spans = meta::tuple_transform( + [&self = self(), sync_in_samples = self().ports_status.in_samples]( PortOrCollection &input_port_or_collection) noexcept { + auto in_samples = sync_in_samples; + + auto process_single_port = [&in_samples](Port &&port) { + if constexpr (std::remove_cvref_t::synchronous) { + return std::forward(port).streamReader().get(in_samples); + } else { + return std::addressof(std::forward(port).streamReader()); + } + }; + if constexpr (traits::port::is_port_v) { + return process_single_port(input_port_or_collection); + } else { + using value_span = decltype(process_single_port(std::declval())); + std::vector result; + std::transform(input_port_or_collection.begin(), input_port_or_collection.end(), std::back_inserter(result), process_single_port); + return result; + } + }, + input_ports(&self())); + auto writers_tuple = meta::tuple_transform( + [&self = self(), sync_out_samples = ports_status.out_samples]( PortOrCollection &output_port_or_collection) noexcept { + auto out_samples = sync_out_samples; + + auto process_single_port = [&out_samples](Port &&port) { + if constexpr (std::remove_cvref_t::synchronous) { + return std::forward(port).streamWriter().reserve_output_range(out_samples); + } else { + return std::addressof(std::forward(port).streamWriter()); + } + }; + if constexpr (traits::port::is_port_v) { + return process_single_port(output_port_or_collection); + } else { + using value_span = decltype(process_single_port(std::declval())); + std::vector result; + std::transform(output_port_or_collection.begin(), output_port_or_collection.end(), std::back_inserter(result), process_single_port); + return result; + } + }, + output_ports(&self())); if constexpr (HasProcessBulkFunction) { // cannot use std::apply because it requires tuple_cat(input_spans, writers_tuple). The latter doesn't work because writers_tuple isn't copyable. @@ -890,6 +1035,7 @@ struct node : protected std::tuple { const bool success = consume_readers(self(), n_samples_to_consume); forward_tags(); return { requested_work, ports_status.in_samples, success ? ret : work_return_status_t::ERROR }; + } else if constexpr (HasProcessOneFunction) { if (ports_status.in_samples != ports_status.out_samples) throw std::runtime_error(fmt::format("N input samples ({}) does not equal to N output samples ({}) for process_one() method.", ports_status.in_samples, ports_status.out_samples)); diff --git a/include/node_traits.hpp b/include/node_traits.hpp index 8b284b04..7d959cb9 100644 --- a/include/node_traits.hpp +++ b/include/node_traits.hpp @@ -3,9 +3,9 @@ #include -#include // localinclude +#include // localinclude #include // localinclude -#include // localinclude +#include // localinclude #include @@ -16,49 +16,82 @@ enum class work_return_status_t; namespace fair::graph::traits::node { namespace detail { - template - using member_type = typename FieldDescriptor::value_type; +template +using member_type = typename FieldDescriptor::value_type; - template - using is_port = std::integral_constant>; +template +auto +unwrap_port_helper() { + if constexpr (port::is_port_v) { + return static_cast(nullptr); + } else if constexpr (port::is_port_collection_v) { + return static_cast(nullptr); + } else { + static_assert(meta::always_false, "Not a port or a collection of ports"); + } +} - template - constexpr bool is_port_descriptor_v = port::is_port_v>; +template +using unwrap_port = std::remove_pointer_t())>; - template - using is_port_descriptor = std::integral_constant>; +template +using is_port_or_collection = std::integral_constant || port::is_port_collection_v>; - template - using member_to_named_port = typename PortDescriptor::value_type::template with_name; +template +using is_input_port_or_collection = std::integral_constant() && port::is_input_v>>; - template - struct member_ports_detector { - static constexpr bool value = false; - }; +template +using is_output_port_or_collection = std::integral_constant() && port::is_output_v>>; - template> - concept Reflectable = refl::is_reflectable(); +template +constexpr bool is_port_descriptor_v = port::is_port_v>; - template - struct member_ports_detector { - using member_ports = - typename meta::to_typelist> - ::template filter - ::template transform; +template +constexpr bool is_port_collection_descriptor_v = port::is_port_collection_v>; - static constexpr bool value = member_ports::size != 0; - }; +template +using is_port_or_collection_descriptor = std::integral_constant || is_port_collection_descriptor_v>; - template - using port_name = typename Node::static_name(); +template +constexpr auto +member_to_named_port_helper() { + // Collections of ports don't get names inside the type as + // the ports inside are dynamically created + if constexpr (is_port_descriptor_v) { + return static_cast *>(nullptr); + } else if constexpr (is_port_collection_descriptor_v) { + return static_cast(nullptr); + } else { + return static_cast(nullptr); + } +} - template - struct member_descriptor_has_type { - template - using matches = std::is_same>; - }; +template +using member_to_named_port = std::remove_pointer_t())>; + +template +struct member_ports_detector { + static constexpr bool value = false; +}; +template> +concept Reflectable = refl::is_reflectable(); +template +struct member_ports_detector { + using member_ports = typename meta::to_typelist>::template filter::template transform; + + static constexpr bool value = member_ports::size != 0; +}; + +template +using port_name = typename Node::static_name(); + +template +struct member_descriptor_has_type { + template + using matches = std::is_same>; +}; } // namespace detail @@ -68,8 +101,8 @@ struct fixed_node_ports_data_helper; // This specialization defines node attributes when the node is created // with two type lists - one list for input and one for output ports template - requires InputPorts::template all_of &&OutputPorts::template all_of -struct fixed_node_ports_data_helper { + requires InputPorts::template +all_of &&OutputPorts::template all_of struct fixed_node_ports_data_helper { using member_ports_detector = std::false_type; // using member_ports_detector = detail::member_ports_detector; @@ -89,20 +122,19 @@ template struct fixed_node_ports_data_helper { using member_ports_detector = detail::member_ports_detector; - using all_ports = std::remove_pointer_t< - decltype([] { - if constexpr (member_ports_detector::value) { - return static_cast(nullptr); - } else { - return static_cast, Ports, meta::typelist>...>*>(nullptr); - } - }())>; + using all_ports = std::remove_pointer_t(nullptr); + } else { + return static_cast, Ports, meta::typelist>...> *>(nullptr); + } + }())>; - using input_ports = typename all_ports ::template filter; - using output_ports = typename all_ports ::template filter; + using input_ports = typename all_ports ::template filter; + using output_ports = typename all_ports ::template filter; - using input_port_types = typename input_ports ::template transform; - using output_port_types = typename output_ports ::template transform; + using input_port_types = typename input_ports ::template transform; + using output_port_types = typename output_ports ::template transform; }; // clang-format off @@ -146,16 +178,14 @@ template constexpr bool node_defines_ports_as_member_variables = fixed_node_ports_data::member_ports_detector::value; template -using get_port_member_descriptor = - typename meta::to_typelist> - ::template filter - ::template filter::template matches>::template at<0>; +using get_port_member_descriptor = typename meta::to_typelist>::template filter::template filter< + detail::member_descriptor_has_type::template matches>::template at<0>; +// TODO: Why is this not done with requires? namespace detail { template auto -can_process_one_invoke_test(auto &node, const auto &input, std::index_sequence) - -> decltype(node.process_one(std::get(input)...)); +can_process_one_invoke_test(auto &node, const auto &input, std::index_sequence) -> decltype(node.process_one(std::get(input)...)); template struct exact_argument_type { @@ -169,7 +199,7 @@ can_process_one_with_offset_invoke_test(auto &node, const auto &input, std::inde template using simd_return_type_of_can_process_one = meta::simdize, meta::simdize_size_v>>>; -} +} // namespace detail /* A node "can process simd" if its `process_one` function takes at least one argument and all * arguments can be simdized types of the actual port data types. @@ -185,6 +215,7 @@ concept can_process_one_simd = #if DISABLE_SIMD false; #else + traits::node::input_ports::template all_of and // checks we don't have port collections inside traits::node::input_port_types::size() > 0 and requires(Node &node, const meta::simdize> &input_simds) { { detail::can_process_one_invoke_test(node, input_simds, std::make_index_sequence::size()>()) @@ -197,6 +228,7 @@ concept can_process_one_simd_with_offset = #if DISABLE_SIMD false; #else + traits::node::input_ports::template all_of and // checks we don't have port collections inside traits::node::input_port_types::size() > 0 && requires(Node &node, const meta::simdize> &input_simds) { { detail::can_process_one_with_offset_invoke_test(node, input_simds, std::make_index_sequence::size()>()) @@ -222,19 +254,68 @@ concept can_process_one_with_offset = can_process_one_scalar_with_offset o namespace detail { template -struct dummy_input_span : public std::span { // NOSONAR +struct dummy_input_span : std::span { // NOSONAR dummy_input_span(const dummy_input_span &) = delete; // NOSONAR dummy_input_span(dummy_input_span &&) noexcept; // NOSONAR constexpr void consume(std::size_t) noexcept; }; template -struct dummy_output_span : public std::span { // NOSONAR +struct dummy_output_span : std::span { // NOSONAR dummy_output_span(const dummy_output_span &) = delete; // NOSONAR dummy_output_span(dummy_output_span &&) noexcept; // NOSONAR constexpr void publish(std::size_t) noexcept; }; +struct to_any_vector { + template + operator std::vector() const { return {}; } // NOSONAR + + template + operator std::vector&() const { return {}; } // NOSONAR +}; + +struct to_any_pointer { + template + operator Any*() const { return {}; } // NOSONAR +}; + +template +struct dummy_reader { + using type = to_any_pointer; +}; + +template +struct dummy_writer { + using type = to_any_pointer; +}; + +template +constexpr auto * +port_to_process_bulk_argument_helper() { + if constexpr (requires(Port p) { typename Port::value_type; p.cbegin() != p.cend(); }) { + return static_cast(nullptr); + + } else if constexpr (Port::synchronous) { + if constexpr (Port::IS_INPUT) { + return static_cast *>(nullptr); + } else if constexpr (Port::IS_OUTPUT) { + return static_cast *>(nullptr); + } + } else { + if constexpr (Port::IS_INPUT) { + return static_cast(nullptr); + } else if constexpr (Port::IS_OUTPUT) { + return static_cast(nullptr); + } + } +} + +template +struct port_to_process_bulk_argument { + using type = std::remove_pointer_t())>; +}; + template struct nothing_you_ever_wanted {}; @@ -249,8 +330,8 @@ can_process_bulk_invoke_test(auto &node, const auto &inputs, auto &outputs, std: } // namespace detail template -concept can_process_bulk = requires(Node &n, typename meta::transform_types>::tuple_type inputs, - typename meta::transform_types>::tuple_type outputs) { +concept can_process_bulk = requires(Node &n, typename meta::transform_types_nested>::tuple_type inputs, + typename meta::transform_types_nested>::tuple_type outputs) { { detail::can_process_bulk_invoke_test(n, inputs, outputs, std::make_index_sequence::size>(), std::make_index_sequence::size>()) } -> std::same_as; diff --git a/include/port.hpp b/include/port.hpp index 0af4d1d6..fca37c0b 100644 --- a/include/port.hpp +++ b/include/port.hpp @@ -750,6 +750,9 @@ class dynamic_port { public: using value_type = void; // a sterile port + struct owned_value_tag {}; + struct non_owned_reference_tag {}; + constexpr dynamic_port() = delete; dynamic_port(const dynamic_port &arg) = delete; @@ -762,13 +765,15 @@ class dynamic_port { operator=(dynamic_port &&arg) = delete; - // TODO: Make owning versus non-owning API more explicit + // TODO: The lifetime of ports is a problem here, if we keep + // a reference to the port in dynamic_port, the port object + // can not be reallocated template - explicit constexpr dynamic_port(T &arg) noexcept + explicit constexpr dynamic_port(T &arg, non_owned_reference_tag) noexcept : name(arg.name), priority(arg.priority), min_samples(arg.min_samples), max_samples(arg.max_samples), _accessor{ std::make_unique>(arg) } {} template - explicit constexpr dynamic_port(T &&arg) noexcept + explicit constexpr dynamic_port(T &&arg, owned_value_tag) noexcept : name(arg.name), priority(arg.priority), min_samples(arg.min_samples), max_samples(arg.max_samples), _accessor{ std::make_unique>(std::forward(arg)) } {} [[nodiscard]] supported_type diff --git a/include/port_traits.hpp b/include/port_traits.hpp index 11b9b7d1..48a5477b 100644 --- a/include/port_traits.hpp +++ b/include/port_traits.hpp @@ -28,9 +28,6 @@ template requires(meta::is_typelist_v and T::template all_of) struct has_fixed_info_or_is_typelist : std::true_type {}; -template -using type = typename Port::value_type; - template using is_input = std::integral_constant; @@ -46,6 +43,24 @@ concept is_output_v = is_output::value; template concept is_port_v = is_output_v || is_input_v; +template +using is_port = std::integral_constant>; + +template +concept is_port_collection_v = is_port_v; + +template +auto type_helper() { + if constexpr (is_port_v) { + return static_cast(nullptr); + } else { + return static_cast*>(nullptr); + } +} + +template +using type = std::remove_pointer_t())>; + template struct min_samples : std::integral_constant {}; diff --git a/include/settings.hpp b/include/settings.hpp index 60444d18..2723010e 100644 --- a/include/settings.hpp +++ b/include/settings.hpp @@ -1,17 +1,19 @@ #ifndef GRAPH_PROTOTYPE_SETTINGS_HPP #define GRAPH_PROTOTYPE_SETTINGS_HPP -#include "annotated.hpp" #include #include #include #include #include -#include #include -#include #include +#include +#include +#include +#include + namespace fair::graph { namespace detail { @@ -267,7 +269,7 @@ class basic_settings : public settings_base { auto iterate_over_member = [&](auto member) { using RawType = std::remove_cvref_t; using Type = unwrap_if_wrapped_t; - if constexpr (is_writable(member) && (std::is_arithmetic_v || std::is_same_v || fair::meta::vector_type) ) { + if constexpr ((!traits::node::detail::is_port_or_collection()) && is_writable(member) && (std::is_arithmetic_v || std::is_same_v || fair::meta::vector_type) ) { auto matchesIgnoringPrefix = [](std::string_view str, std::string_view prefix, std::string_view target) { if (str.starts_with(prefix)) { str.remove_prefix(prefix.size()); @@ -363,7 +365,7 @@ class basic_settings : public settings_base { bool is_set = false; auto iterate_over_member = [&, this](auto member) { using Type = unwrap_if_wrapped_t>; - if constexpr (is_writable(member) && (std::is_arithmetic_v || std::is_same_v || fair::meta::vector_type) ) { + if constexpr ((!traits::node::detail::is_port_or_collection()) && is_writable(member) && (std::is_arithmetic_v || std::is_same_v || fair::meta::vector_type) ) { if (std::string(get_display_name(member)) == key && std::holds_alternative(value)) { if (_auto_update.contains(key)) { _auto_update.erase(key); @@ -372,6 +374,9 @@ class basic_settings : public settings_base { settings_base::_changed.store(true); is_set = true; } + if (std::string(get_display_name(member)) == key && !std::holds_alternative(value)) { + throw std::invalid_argument(fmt::format("The {} has a wrong type", key)); + } } }; if constexpr (detail::HasBaseType) { @@ -379,6 +384,7 @@ class basic_settings : public settings_base { } refl::util::for_each(refl::reflect().members, iterate_over_member); if (!is_set) { + fmt::print("The property {} was not set\n", key); ret.insert_or_assign(key, pmtv::pmt(value)); } } @@ -418,7 +424,7 @@ class basic_settings : public settings_base { const auto &value = localValue; auto iterate_over_member = [&](auto member) { using Type = unwrap_if_wrapped_t>; - if constexpr (is_writable(member) && (std::is_arithmetic_v || std::is_same_v || fair::meta::vector_type) ) { + if constexpr ((!traits::node::detail::is_port_or_collection()) && is_writable(member) && (std::is_arithmetic_v || std::is_same_v || fair::meta::vector_type) ) { if (std::string(get_display_name(member)) == key && std::holds_alternative(value)) { _staged.insert_or_assign(key, value); settings_base::_changed.store(true); @@ -509,7 +515,7 @@ class basic_settings : public settings_base { auto apply_member_changes = [&key, &staged, &forward_parameters, &staged_value, this](auto member) { using RawType = std::remove_cvref_t; using Type = unwrap_if_wrapped_t; - if constexpr (is_writable(member) && is_supported_type()) { + if constexpr ((!traits::node::detail::is_port_or_collection()) && is_writable(member) && is_supported_type()) { if (std::string(get_display_name(member)) == key && std::holds_alternative(staged_value)) { if constexpr (is_annotated()) { if (member(*_node).validate_and_set(std::get(staged_value))) { @@ -554,7 +560,7 @@ class basic_settings : public settings_base { // update active parameters auto update_active = [this](auto member) { using Type = unwrap_if_wrapped_t>; - if constexpr (is_readable(member) && is_supported_type()) { + if constexpr ((!traits::node::detail::is_port_or_collection()) && is_readable(member) && is_supported_type()) { _active.insert_or_assign(get_display_name(member), pmtv::pmt(member(*_node))); } }; @@ -593,7 +599,7 @@ class basic_settings : public settings_base { auto iterate_over_member = [&, this](auto member) { using Type = unwrap_if_wrapped_t>; - if constexpr (is_readable(member) && is_supported_type()) { + if constexpr ((!traits::node::detail::is_port_or_collection()) && is_readable(member) && is_supported_type()) { _active.insert_or_assign(get_display_name_const(member).str(), member(*_node)); } }; @@ -612,7 +618,7 @@ class basic_settings : public settings_base { auto iterate_over_member = [&, this](auto member) { using Type = unwrap_if_wrapped_t>; - if constexpr (is_readable(member) && is_supported_type()) { + if constexpr ((!traits::node::detail::is_port_or_collection()) && is_readable(member) && is_supported_type()) { oldSettings.insert_or_assign(get_display_name(member), pmtv::pmt(member(*_node))); } }; diff --git a/include/typelist.hpp b/include/typelist.hpp index bf789677..6df6de6b 100644 --- a/include/typelist.hpp +++ b/include/typelist.hpp @@ -180,11 +180,22 @@ template class Template, typename... Ts> struct transform_types_impl> { using type = typelist...>; }; + +template class Template, typename List> +struct transform_types_nested_impl; + +template class Template, typename... Ts> +struct transform_types_nested_impl> { + using type = typelist::type...>; +}; } // namespace detail template class Template, typename List> using transform_types = typename detail::transform_types_impl::type; +template class Template, typename List> +using transform_types_nested = typename detail::transform_types_nested_impl::type; + // transform_value_type template using transform_value_type = typename T::value_type; diff --git a/include/utils.hpp b/include/utils.hpp index 8d16e0e5..5da3e6fd 100644 --- a/include/utils.hpp +++ b/include/utils.hpp @@ -440,7 +440,20 @@ auto tuple_transform(Function &&function, Tuple &&tuple, Tuples &&...tuples) { static_assert(((std::tuple_size_v> == std::tuple_size_v>) &&...)); return [&](std::index_sequence) { - return std::make_tuple([&function, &tuple, &tuples...](auto I) { return function(std::get(tuple), std::get(tuples)...); }(std::integral_constant{})...); + return std::make_tuple([&function, &tuple, &tuples...](auto I) { + return function(std::get(tuple), std::get(tuples)...); + }(std::integral_constant{})...); + }(std::make_index_sequence>>()); +} + +template +auto +tuple_transform_enumerated(Function &&function, Tuple &&tuple, Tuples &&...tuples) { + static_assert(((std::tuple_size_v> == std::tuple_size_v>) &&...)); + return [&](std::index_sequence) { + return std::make_tuple([&function, &tuple, &tuples...](auto I) { + return function(I, std::get(tuple), std::get(tuples)...); + }(std::integral_constant{})...); }(std::make_index_sequence>>()); } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index eaa2bb95..e4f736b4 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -40,6 +40,7 @@ add_ut_test(qa_node) add_ut_test(qa_scheduler) add_ut_test(qa_reader_writer_lock) add_ut_test(qa_settings) +add_ut_test(qa_selector) add_ut_test(qa_sources) add_ut_test(qa_tags) add_ut_test(qa_thread_affinity) diff --git a/test/blocklib/core/selector.hpp b/test/blocklib/core/selector.hpp new file mode 100644 index 00000000..cfd09acb --- /dev/null +++ b/test/blocklib/core/selector.hpp @@ -0,0 +1,209 @@ +#ifndef GRAPH_PROTOTYPE_SELECTOR_HPP +#define GRAPH_PROTOTYPE_SELECTOR_HPP + +#include +#include + +namespace gr::blocks::basic { +using namespace fair::graph; + +// optional shortening +template +using A = Annotated; + +using SelectorDoc = Doc 1-|- + -|-2 >------> 2-|- + -|-3 >------> 3-|- + -|-4 \----> 4-|- + +--------------+ + +And for arrays: + + mapIn = {1, 2, 3, 4} + mapOut = {2, 2, 3, 3} + +The mapping is as follows: + + +--------------+ + -|-1 >-----\ 1-|- + -|-2 >------> 2-|- + -|-3 >------> 3-|- + -|-4 >-----/ 4-|- + +--------------+ + +It also contains two additional ports -- `selectOut` and `monitorOut`. Port +`selectOut` which can be used to define which input port is passed on to the +`monitorOut` port. + +For uses where all input ports should be read even if they are not connected +to any output port (thus reading and ignoring all the values from the input), +you can set the `backPressure` property to false. + +)"">; + +template +struct Selector : node, SelectorDoc> { + // port definitions + PortIn selectOut; + PortOut monitorOut; // optional monitor output (more for demo/API purposes than actual need) + std::vector> inputs; // TODO: need to add exception to pmt_t that this isn't interpreted as a settings type + std::vector> outputs; + + // settings + A, Limits<1U, 32U>> nInputs = 0U; + A, Limits<1U, 32U>> nOutputs = 0U; + A, "mapIn", Visible, Doc<"input port index to route from">> mapIn; // N.B. need two vectors since pmt_t doesn't support pairs (yet!?!) + A, "mapOut", Visible, Doc<"output port index to route to">> mapOut; + A> backPressure = false; + std::map> _internalMapping; + std::uint32_t _selectedSrc = -1U; + + void + settings_changed(const fair::graph::property_map &old_settings, const fair::graph::property_map &new_settings) { + if (new_settings.contains("nInputs") || new_settings.contains("nOutputs")) { + fmt::print("{}: configuration changed: nInputs {} -> {}, nOutputs {} -> {}\n", static_cast(this), old_settings.at("nInputs"), + new_settings.contains("nInputs") ? new_settings.at("nInputs") : "same", old_settings.at("nOutputs"), new_settings.contains("nOutputs") ? new_settings.at("nOutputs") : "same"); + inputs.resize(nInputs); + outputs.resize(nOutputs); + } + if (new_settings.contains("mapIn") || new_settings.contains("mapOut")) { + assert(mapIn.value.size() == mapOut.value.size() && "mapIn and mapOut must have the same length"); + _internalMapping.clear(); + + if (mapIn.value.size() != mapOut.value.size()) { + throw std::invalid_argument("Input and output map need to have the same number of elements"); + } + + for (std::size_t i = 0U; i < mapOut.value.size(); ++i) { + _internalMapping[mapIn.value[i]].push_back(mapOut.value[i]); + } + } + } + + using select_reader_t = typename PortIn::ReaderType; + using monitor_writer_t = typename PortOut::WriterType; + using input_reader_t = typename PortIn::ReaderType; + using output_writer_t = typename PortOut::WriterType; + + fair::graph::work_return_status_t + process_bulk(select_reader_t *select, // + const std::vector &ins, + monitor_writer_t *monOut, // + const std::vector &outs) { + if (_internalMapping.empty()) { + if (backPressure) { + std::for_each(ins.begin(), ins.end(), [](auto *input) { std::ignore = input->consume(0_UZ); }); + } else { + // make the implicit consume all available behaviour explicit + std::for_each(ins.begin(), ins.end(), [](auto *input) { std::ignore = input->consume(input->available()); }); + } + return work_return_status_t::OK; + } + + std::set used_inputs; + + if (const auto select_available = select->available(); select_available > 0) { + auto select_span = select->get(select_available); + _selectedSrc = select_span.back(); + std::ignore = select->consume(select_available); // consume all samples on the 'select' streaming input port + } + + auto copy_to_output = [](auto input_available, auto &input_span, auto *output_writer) { + auto output_span = output_writer->reserve_output_range(input_available); + + for (std::size_t i = 0; i < input_span.size(); ++i) { + output_span[i] = input_span[i]; + } + + output_span.publish(input_available); + }; + + bool monitor_out_processed = false; + + for (const auto &[input_index, output_indices] : _internalMapping) { + auto *input_reader = ins[input_index]; + auto input_available = input_reader->available(); + + for (const auto output_index : output_indices) { + auto *writer = outs[output_index]; + if (input_available > writer->available()) { + input_available = writer->available(); + } + } + + if (_selectedSrc == input_index) { + if (input_available > monOut->available()) { + input_available = monOut->available(); + } + } + + if (input_available == 0) { + continue; + } + + auto input_span = input_reader->get(input_available); + for (const auto output_index : output_indices) { + auto *output_writer = outs[output_index]; + copy_to_output(input_available, input_span, output_writer); + } + + if (_selectedSrc == input_index) { + monitor_out_processed = true; + copy_to_output(input_available, input_span, monOut); + } + + std::ignore = input_reader->consume(input_available); + used_inputs.insert(input_index); + } + + if (!monitor_out_processed && _selectedSrc < ins.size()) { + auto *input_reader = ins[_selectedSrc]; + auto input_available = std::min(input_reader->available(), monOut->available()); + auto input_span = input_reader->get(input_available); + copy_to_output(input_available, input_span, monOut); + std::ignore = input_reader->consume(input_available); + } + + for (auto src_port = 0U; src_port < ins.size(); ++src_port) { + if (used_inputs.contains(src_port)) continue; + + if (backPressure) { + std::ignore = ins[src_port]->consume(0_UZ); + + } else { + // make the implicit consume all available behaviour explicit + std::ignore = ins[src_port]->consume(ins[src_port]->available()); + } + } + + return work_return_status_t::OK; + } +}; +} // namespace gr::blocks::basic + +ENABLE_REFLECTION_FOR_TEMPLATE(gr::blocks::basic::Selector, selectOut, inputs, monitorOut, outputs, nInputs, nOutputs, mapIn, mapOut, backPressure); +static_assert(fair::graph::HasProcessBulkFunction>); + +#endif // include guard diff --git a/test/blocklib/core/unit-test/common_nodes.hpp b/test/blocklib/core/unit-test/common_nodes.hpp index 1b95a9b8..7dc3be27 100644 --- a/test/blocklib/core/unit-test/common_nodes.hpp +++ b/test/blocklib/core/unit-test/common_nodes.hpp @@ -94,10 +94,10 @@ class multi_adder : public fair::graph::node_model { _dynamic_input_ports.clear(); for (auto &input_port : _input_ports) { - _dynamic_input_ports.emplace_back(input_port); + _dynamic_input_ports.emplace_back(input_port, fair::graph::dynamic_port::non_owned_reference_tag{}); } if (_dynamic_output_ports.empty()) { - _dynamic_output_ports.emplace_back(_output_port); + _dynamic_output_ports.emplace_back(_output_port, fair::graph::dynamic_port::non_owned_reference_tag{}); } _dynamic_ports_loaded = true; } diff --git a/test/qa_dynamic_port.cpp b/test/qa_dynamic_port.cpp index 576eb89d..93ea3a48 100644 --- a/test/qa_dynamic_port.cpp +++ b/test/qa_dynamic_port.cpp @@ -153,8 +153,8 @@ const boost::ut::suite PortApiTests = [] { PortIn in; std::vector port_list; - port_list.emplace_back(out); - port_list.emplace_back(in); + port_list.emplace_back(out, dynamic_port::non_owned_reference_tag{}); + port_list.emplace_back(in, dynamic_port::non_owned_reference_tag{}); expect(eq(port_list.size(), 2_UZ)); }; diff --git a/test/qa_fft.cpp b/test/qa_fft.cpp index e9c4d673..9a4fe42b 100644 --- a/test/qa_fft.cpp +++ b/test/qa_fft.cpp @@ -278,7 +278,7 @@ const boost::ut::suite<"Fourier Transforms"> fftTests = [] { auto threadPool = std::make_shared("custom pool", fair::thread_pool::CPU_BOUND, 2, 2); fg::graph flow1; auto &source1 = flow1.make_node>(); - auto &fftBlock = flow1.make_node>({ { "fftSize", 16 } }); + auto &fftBlock = flow1.make_node>({ { "fftSize", static_cast(16) } }); std::ignore = flow1.connect<"out">(source1).to<"in">(fftBlock); auto sched1 = Scheduler(std::move(flow1), threadPool); @@ -286,7 +286,7 @@ const boost::ut::suite<"Fourier Transforms"> fftTests = [] { for (int i = 0; i < 2; i++) { fg::graph flow2; auto &source2 = flow2.make_node>(); - auto &fft2 = flow2.make_node>({ { "fftSize", 16 } }); + auto &fft2 = flow2.make_node>({ { "fftSize", static_cast(16) } }); std::ignore = flow2.connect<"out">(source2).to<"in">(fft2); auto sched2 = Scheduler(std::move(flow2), threadPool); sched2.run_and_wait(); @@ -433,4 +433,4 @@ const boost::ut::suite<"window functions"> windowTests = [] { int main() { /* not needed for UT */ -} \ No newline at end of file +} diff --git a/test/qa_hier_node.cpp b/test/qa_hier_node.cpp index a8e8c174..0131c043 100644 --- a/test/qa_hier_node.cpp +++ b/test/qa_hier_node.cpp @@ -55,9 +55,9 @@ class hier_node : public fg::node_model { std::ignore = graph.connect<"scaled">(left_scale_block).to<"addend0">(adder_block); std::ignore = graph.connect<"scaled">(right_scale_block).to<"addend1">(adder_block); - _dynamic_input_ports.emplace_back(fg::input_port<0>(&left_scale_block)); - _dynamic_input_ports.emplace_back(fg::input_port<0>(&right_scale_block)); - _dynamic_output_ports.emplace_back(fg::output_port<0>(&adder_block)); + _dynamic_input_ports.emplace_back(fg::input_port<0>(&left_scale_block), fg::dynamic_port::non_owned_reference_tag{}); + _dynamic_input_ports.emplace_back(fg::input_port<0>(&right_scale_block), fg::dynamic_port::non_owned_reference_tag{}); + _dynamic_output_ports.emplace_back(fg::output_port<0>(&adder_block), fg::dynamic_port::non_owned_reference_tag{}); _dynamic_ports_loaded = true; return graph; diff --git a/test/qa_selector.cpp b/test/qa_selector.cpp new file mode 100644 index 00000000..2a610413 --- /dev/null +++ b/test/qa_selector.cpp @@ -0,0 +1,358 @@ +#include + +#include +#include +#include +#include + +#include +#include + +#include "blocklib/core/selector.hpp" + +namespace fg = fair::graph; +using namespace fair::literals; + +template +struct repeated_source : public fg::node> { + std::uint32_t identifier = 0; + std::uint32_t remaining_events_count; + std::vector values; + std::vector::const_iterator values_next; + + fg::PortOut out; + + void + settings_changed(const fair::graph::property_map & /*old_settings*/, const fair::graph::property_map &new_settings) noexcept { + if (new_settings.contains("values")) { + values_next = values.cbegin(); + } + } + + fg::work_return_t + work(std::size_t requested_work) { + if (values_next == values.cend()) { + values_next = values.cbegin(); + } + + if (remaining_events_count != 0) { + using namespace fair::literals; + auto &port = fg::output_port<0>(this); + auto &writer = port.streamWriter(); + auto data = writer.reserve_output_range(1_UZ); + + data[0] = *values_next; + data.publish(1_UZ); + + remaining_events_count--; + if (remaining_events_count == 0) { + fmt::print("{}: Last value sent was {}\n", static_cast(this), *values_next); + } + + values_next++; + + return { requested_work, 1UL, fg::work_return_status_t::OK }; + } else { + // TODO: Investigate what schedulers do when there is an event written, but we return DONE + return { requested_work, 1UL, fg::work_return_status_t::DONE }; + } + } +}; + +ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (repeated_source), identifier, remaining_events_count, values, out); + +template +struct validator_sink : public fg::node> { + std::uint32_t identifier = 0; + fg::PortIn in; + + std::vector expected_values; + std::vector::const_iterator expected_values_next; + bool all_ok = true; + + bool + verify() const { + return all_ok && (expected_values_next == expected_values.cend()); + } + + void + settings_changed(const fair::graph::property_map & /*old_settings*/, const fair::graph::property_map &new_settings) noexcept { + if (new_settings.contains("expected_values")) { + expected_values_next = expected_values.cbegin(); + } + } + + void + process_one(T value) { + if (expected_values_next == expected_values.cend()) { + all_ok = false; + fmt::print("Error: {}#{}: We got more values than expected\n", static_cast(this), identifier); + + } else { + if (value != *expected_values_next) { + all_ok = false; + fmt::print("Error: {}#{}: Got a value {}, but wanted {} (position {})\n", static_cast(this), identifier, value, *expected_values_next, + std::distance(expected_values.cbegin(), expected_values_next)); + } + + expected_values_next++; + } + } +}; + +ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (validator_sink), identifier, expected_values, in); + +template() + std::declval())> +struct adder : public fg::node> { + fg::PortIn addend0; + fg::PortIn addend1; + fg::PortOut sum; + + template V> + [[nodiscard]] constexpr auto + process_one(V a, V b) const noexcept { + return a + b; + } +}; + +ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (adder), addend0, addend1, sum); + +struct test_definition { + std::uint32_t value_count; + std::vector> mapping; + std::vector> input_values; + std::vector> output_values; + std::uint32_t monitor_source; + std::vector monitor_values; + bool back_pressure; +}; + +void +execute_selector_test(test_definition definition) { + using namespace boost::ut; + + const std::uint32_t sources_count = definition.input_values.size(); + const std::uint32_t sinks_count = definition.output_values.size(); + + fg::graph graph; + std::vector *> sources; + std::vector *> sinks; + gr::blocks::basic::Selector *selector; + + std::vector mapIn(definition.mapping.size()); + std::vector mapOut(definition.mapping.size()); + std::ranges::transform(definition.mapping, mapIn.begin(), [](auto &p) { return p.first; }); + std::ranges::transform(definition.mapping, mapOut.begin(), [](auto &p) { return p.second; }); + + selector = std::addressof(graph.make_node>({ { "nInputs", sources_count }, // + { "nOutputs", sinks_count }, // + { "mapIn", mapIn }, // + { "mapOut", mapOut }, // + { "backPressure", definition.back_pressure } })); + + for (std::uint32_t source_index = 0; source_index < sources_count; ++source_index) { + sources.push_back(std::addressof(graph.make_node>({ { "remaining_events_count", definition.value_count }, // + { "identifier", source_index }, // + { "values", definition.input_values[source_index] } }))); + expect(sources[source_index]->settings().apply_staged_parameters().empty()); + expect(fair::graph::connection_result_t::SUCCESS == graph.dynamic_connect(*sources[source_index], 0, *selector, source_index + 1 /* there's one port before the inputs */)); + } + + for (std::uint32_t sink_index = 0; sink_index < sinks_count; ++sink_index) { + sinks.push_back(std::addressof(graph.make_node>({ { "identifier", sink_index }, // + { "expected_values", definition.output_values[sink_index] } }))); + expect(sinks[sink_index]->settings().apply_staged_parameters().empty()); + expect(fair::graph::connection_result_t::SUCCESS == graph.dynamic_connect(*selector, sink_index + 1 /* there's one port before the outputs */, *sinks[sink_index], 0)); + } + + validator_sink *monitor_sink = std::addressof(graph.make_node>({ { "identifier", static_cast(-1) }, // + { "expected_values", definition.monitor_values } })); + expect(monitor_sink->settings().apply_staged_parameters().empty()); + expect(fair::graph::connection_result_t::SUCCESS == graph.dynamic_connect(*selector, 0, *monitor_sink, 0)); + + for (std::size_t iterration = 0; iterration < definition.value_count * sources_count; ++iterration) { + const auto max = std::numeric_limits::max(); + for (auto *source : sources) { + source->work(max); + } + selector->work(max); + for (auto *sink : sinks) { + sink->work(max); + } + monitor_sink->work(max); + } + + if (!definition.back_pressure) { + for (const auto &input : selector->inputs) { + expect(eq(input.streamReader().available(), 0)); + } + } + + for (auto *sink : sinks) { + expect(sink->verify()); + } +} + +const boost::ut::suite SelectorTest = [] { + using namespace boost::ut; + using namespace gr::blocks::basic; + + "Selector constructor"_test = [] { + Selector block_nop({ { "name", "block_nop" } }); + expect(block_nop.settings().apply_staged_parameters().empty()); + expect(eq(block_nop.nInputs, 0U)); + expect(eq(block_nop.nOutputs, 0U)); + expect(eq(block_nop.inputs.size(), 0U)); + expect(eq(block_nop.outputs.size(), 0U)); + expect(eq(block_nop._internalMapping.size(), 0U)); + + Selector block({ { "name", "block" }, { "nInputs", 4U }, { "nOutputs", 3U } }); + expect(block.settings().apply_staged_parameters().empty()); + expect(eq(block.nInputs, 4U)); + expect(eq(block.nOutputs, 3U)); + expect(eq(block.inputs.size(), 4U)); + expect(eq(block.outputs.size(), 3U)); + expect(eq(block._internalMapping.size(), 0U)); + }; + + "basic Selector"_test = [] { + using T = double; + const std::vector outputMap{ 1U, 0U }; + Selector block({ { "nInputs", 3U }, { "nOutputs", 2U }, { "mapIn", std::vector{ 0U, 1U } }, { "mapOut", outputMap } }); // N.B. 3rd input is unconnected + expect(block.settings().apply_staged_parameters().empty()); + expect(eq(block._internalMapping.size(), 2U)); + + using internal_mapping_t = decltype(block._internalMapping); + expect(block._internalMapping == internal_mapping_t{ { 0U, { outputMap[0] } }, { 1U, { outputMap[1] } } }); + }; + + // Tests without the back pressure + + "Selector 1 to 1 mapping"_test = [] { + execute_selector_test({ .value_count = 5, // + .mapping = { { 0, 0 }, { 1, 1 }, { 2, 2 } }, // + .input_values = { { 1 }, { 2 }, { 3 } }, // + .output_values = { { 1, 1, 1, 1, 1 }, { 2, 2, 2, 2, 2 }, { 3, 3, 3, 3, 3 } }, // + .monitor_source = -1U, // + .monitor_values = {}, // + .back_pressure = false }); + }; + + "Selector only one input used"_test = [] { + execute_selector_test({ .value_count = 5, // + .mapping = { { 1, 1 } }, // + .input_values = { { 1 }, { 2 }, { 3 } }, // + .output_values = { {}, { 2, 2, 2, 2, 2 }, {} }, // + .monitor_source = -1U, // + .monitor_values = {}, // + .back_pressure = false }); + }; + + "Selector all for one"_test = [] { + execute_selector_test({ .value_count = 5, // + .mapping = { { 0, 1 }, { 1, 1 }, { 2, 1 } }, // + .input_values = { { 1 }, { 2 }, { 3 } }, // + .output_values = { {}, { 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3 }, {} }, // + .monitor_source = -1U, // + .monitor_values = {}, // + .back_pressure = false }); + }; + + "Selector one for all"_test = [] { + execute_selector_test({ .value_count = 5, // + .mapping = { { 1, 0 }, { 1, 1 }, { 1, 2 } }, // + .input_values = { { 1 }, { 2 }, { 3 } }, // + .output_values = { { 2, 2, 2, 2, 2 }, { 2, 2, 2, 2, 2 }, { 2, 2, 2, 2, 2 } }, // + .monitor_source = -1U, // + .monitor_values = {}, // + .back_pressure = false }); + }; + + // tests with the back pressure + + "Selector 1 to 1 mapping, with back pressure"_test = [] { + execute_selector_test({ .value_count = 5, // + .mapping = { { 0, 0 }, { 1, 1 }, { 2, 2 } }, // + .input_values = { { 1 }, { 2 }, { 3 } }, // + .output_values = { { 1, 1, 1, 1, 1 }, { 2, 2, 2, 2, 2 }, { 3, 3, 3, 3, 3 } }, // + .monitor_source = -1U, // + .monitor_values = {}, // + .back_pressure = true }); + }; + + "Selector only one input used, with back pressure"_test = [] { + execute_selector_test({ .value_count = 5, // + .mapping = { { 1, 1 } }, // + .input_values = { { 1 }, { 2 }, { 3 } }, // + .output_values = { {}, { 2, 2, 2, 2, 2 }, {} }, // + .monitor_source = -1U, // + .monitor_values = {}, // + .back_pressure = true }); + }; + + "Selector all for one, with back pressure"_test = [] { + execute_selector_test({ .value_count = 5, // + .mapping = { { 0, 1 }, { 1, 1 }, { 2, 1 } }, // + .input_values = { { 1 }, { 2 }, { 3 } }, // + .output_values = { {}, { 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3 }, {} }, // + .monitor_source = -1U, // + .monitor_values = {}, // + .back_pressure = true }); + }; + + "Selector one for all, with back pressure"_test = [] { + execute_selector_test({ .value_count = 5, // + .mapping = { { 1, 0 }, { 1, 1 }, { 1, 2 } }, // + .input_values = { { 1 }, { 2 }, { 3 } }, // + .output_values = { { 2, 2, 2, 2, 2 }, { 2, 2, 2, 2, 2 }, { 2, 2, 2, 2, 2 } }, // + .monitor_source = -1U, // + .monitor_values = {}, // + .back_pressure = true }); + }; + + // Tests with a monitor + + "Selector 1 to 1 mapping, with monitor, monitor source already mapped"_test = [] { + execute_selector_test({ .value_count = 5, // + .mapping = { { 0, 0 }, { 1, 1 }, { 2, 2 } }, // + .input_values = { { 1 }, { 2 }, { 3 } }, // + .output_values = { { 1, 1, 1, 1, 1 }, { 2, 2, 2, 2, 2 }, { 3, 3, 3, 3, 3 } }, // + .monitor_source = 0U, // + .monitor_values = { 1, 1, 1, 1, 1 }, // + .back_pressure = false }); + }; + + "Selector only one input used, with monitor, monitor source not mapped"_test = [] { + execute_selector_test({ .value_count = 5, // + .mapping = { { 1, 1 } }, // + .input_values = { { 1 }, { 2 }, { 3 } }, // + .output_values = { {}, { 2, 2, 2, 2, 2 }, {} }, // + .monitor_source = 0U, // + .monitor_values = { 1, 1, 1, 1, 1 }, // + .back_pressure = false }); + }; + + "Selector all for one, with monitor, monitor source already mapped"_test = [] { + execute_selector_test({ .value_count = 5, // + .mapping = { { 0, 1 }, { 1, 1 }, { 2, 1 } }, // + .input_values = { { 1 }, { 2 }, { 3 } }, // + .output_values = { {}, { 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3 }, {} }, // + .monitor_source = 1U, // + .monitor_values = { 2, 2, 2, 2, 2 }, // + .back_pressure = false }); + }; + + "Selector one for all, with monitor, monitor source already mapped"_test = [] { + execute_selector_test({ .value_count = 5, // + .mapping = { { 1, 0 }, { 1, 1 }, { 1, 2 } }, // + .input_values = { { 1 }, { 2 }, { 3 } }, // + .output_values = { { 2, 2, 2, 2, 2 }, { 2, 2, 2, 2, 2 }, { 2, 2, 2, 2, 2 } }, // + .monitor_source = 1U, // + .monitor_values = { 2, 2, 2, 2, 2 }, // + .back_pressure = false }); + }; +}; + +int +main() { /* not needed for UT */ +}