diff --git a/core/include/gnuradio-4.0/Block.hpp b/core/include/gnuradio-4.0/Block.hpp index 04e8a134..399f8be5 100644 --- a/core/include/gnuradio-4.0/Block.hpp +++ b/core/include/gnuradio-4.0/Block.hpp @@ -327,7 +327,7 @@ enum class Category { * } * * void start() override { - * propertyCallbacks.emplace(kMyCustomProperty, &MyBlock::propertyCallbackMyCustom); + * propertyCallbacks.emplace(kMyCustomProperty, std::mem_fn(&MyBlock::propertyCallbackMyCustom)); * } * }; * @endcode @@ -428,18 +428,18 @@ class Block : public lifecycle::StateMachine { MsgPortInBuiltin msgIn; MsgPortOutBuiltin msgOut; - using PropertyCallback = std::optional (Derived::*)(std::string_view, Message); + using PropertyCallback = std::function(Derived&, std::string_view, Message)>; std::map propertyCallbacks{ - {block::property::kHeartbeat, &Block::propertyCallbackHeartbeat}, // - {block::property::kEcho, &Block::propertyCallbackEcho}, // - {block::property::kLifeCycleState, &Block::propertyCallbackLifecycleState}, // - {block::property::kSetting, &Block::propertyCallbackSettings}, // - {block::property::kStagedSetting, &Block::propertyCallbackStagedSettings}, // - {block::property::kStoreDefaults, &Block::propertyCallbackStoreDefaults}, // - {block::property::kResetDefaults, &Block::propertyCallbackResetDefaults}, // - {block::property::kActiveContext, &Block::propertyCallbackActiveContext}, // - {block::property::kSettingsCtx, &Block::propertyCallbackSettingsCtx}, // - {block::property::kSettingsContexts, &Block::propertyCallbackSettingsContexts}, // + {block::property::kHeartbeat, std::mem_fn(&Block::propertyCallbackHeartbeat)}, // + {block::property::kEcho, std::mem_fn(&Block::propertyCallbackEcho)}, // + {block::property::kLifeCycleState, std::mem_fn(&Block::propertyCallbackLifecycleState)}, // + {block::property::kSetting, std::mem_fn(&Block::propertyCallbackSettings)}, // + {block::property::kStagedSetting, std::mem_fn(&Block::propertyCallbackStagedSettings)}, // + {block::property::kStoreDefaults, std::mem_fn(&Block::propertyCallbackStoreDefaults)}, // + {block::property::kResetDefaults, std::mem_fn(&Block::propertyCallbackResetDefaults)}, // + {block::property::kActiveContext, std::mem_fn(&Block::propertyCallbackActiveContext)}, // + {block::property::kSettingsCtx, std::mem_fn(&Block::propertyCallbackSettingsCtx)}, // + {block::property::kSettingsContexts, std::mem_fn(&Block::propertyCallbackSettingsContexts)}, // }; std::map> propertySubscriptions; @@ -1358,17 +1358,20 @@ class Block : public lifecycle::StateMachine { } std::size_t getMergedBlockLimit() { - if constexpr (requires(const Derived& d) { - { available_samples(d) } -> std::same_as; - }) { + if constexpr (Derived::blockCategory != block::Category::NormalBlock) { + return 0; + } else if constexpr (requires(const Derived& d) { + { available_samples(d) } -> std::same_as; + }) { return available_samples(self()); } else if constexpr (traits::block::stream_input_port_types::size == 0 && traits::block::stream_output_port_types::size == 0) { // allow blocks that have neither input nor output ports (by merging source to sink block) -> use internal buffer size constexpr gr::Size_t chunkSize = Derived::merged_work_chunk_size(); static_assert(chunkSize != std::dynamic_extent && chunkSize > 0, "At least one internal port must define a maximum number of samples or the non-member/hidden " "friend function `available_samples(const BlockType&)` must be defined."); return chunkSize; + } else { + return std::numeric_limits::max(); } - return std::numeric_limits::max(); } template @@ -1670,7 +1673,11 @@ class Block : public lifecycle::StateMachine { } } } else { // block does not define any valid processing function - static_assert(meta::always_false>, "neither processBulk(...) nor processOne(...) implemented"); + if constexpr (Derived::blockCategory != block::Category::NormalBlock) { + return {requestedWork, 0UZ, OK}; + } else { + static_assert(meta::always_false>, "neither processBulk(...) nor processOne(...) implemented"); + } } // sanitise input/output samples based on explicit user-defined processBulk(...) return status @@ -1852,7 +1859,7 @@ class Block : public lifecycle::StateMachine { std::optional retMessage; try { - retMessage = (self().*callback)(message.endpoint, message); // N.B. life-time: message is copied + retMessage = callback(self(), message.endpoint, message); // N.B. life-time: message is copied } catch (const gr::exception& e) { retMessage = Message{message}; retMessage->data = std::unexpected(Error(e)); diff --git a/core/include/gnuradio-4.0/BlockModel.hpp b/core/include/gnuradio-4.0/BlockModel.hpp index 2a044960..c1617739 100644 --- a/core/include/gnuradio-4.0/BlockModel.hpp +++ b/core/include/gnuradio-4.0/BlockModel.hpp @@ -160,10 +160,10 @@ class BlockModel { MsgPortInBuiltin* msgIn; MsgPortOutBuiltin* msgOut; - static std::string_view portName(const DynamicPortOrCollection& portOrCollection) { + static std::string portName(const DynamicPortOrCollection& portOrCollection) { return std::visit(meta::overloaded{ // [](const gr::DynamicPort& port) { return port.name; }, // - [](const NamedPortCollection& namedCollection) { return namedCollection.name; }}, + [](const NamedPortCollection& namedCollection) { return std::string(namedCollection.name); }}, portOrCollection); } @@ -375,11 +375,12 @@ constexpr bool contains_type = (std::is_same_v || ...); template requires std::is_constructible_v class BlockWrapper : public BlockModel { -private: +protected: static_assert(std::is_same_v>); T _block; std::string _type_name = gr::meta::type_name(); +protected: [[nodiscard]] constexpr const auto& blockRef() const noexcept { if constexpr (requires { *_block; }) { return *_block; @@ -401,36 +402,37 @@ class BlockWrapper : public BlockModel { msgOut = std::addressof(_block.msgOut); } - template - constexpr static auto& processPort(auto& where, TPort& port) noexcept { - where.push_back(gr::DynamicPort(port, DynamicPort::non_owned_reference_tag{})); - return where.back(); - } - - void dynamicPortLoader() { + void dynamicPortsLoader() { if (_dynamicPortsLoaded) { return; } - auto registerPort = [this](DynamicPorts& where, auto, CurrentPortType*) noexcept { - if constexpr (CurrentPortType::kIsDynamicCollection) { - auto& collection = CurrentPortType::getPortObject(blockRef()); - NamedPortCollection result; - result.name = CurrentPortType::Name; - for (auto& port : collection) { - processPort(result.ports, port); - } - where.push_back(std::move(result)); - } else { - auto& port = CurrentPortType::getPortObject(blockRef()); - port.name = CurrentPortType::Name; - processPort(where, port); - } + auto processPort = [](auto& where, TPort& port) -> auto& { + where.push_back(gr::DynamicPort(port, DynamicPort::non_owned_reference_tag{})); + return where.back(); }; - using Node = std::remove_cvref_t; - traits::block::all_input_ports::for_each(registerPort, _dynamicInputPorts); - traits::block::all_output_ports::for_each(registerPort, _dynamicOutputPorts); + using TBlock = std::remove_cvref_t; + if constexpr (TBlock::blockCategory == block::Category::NormalBlock) { + auto registerPort = [this, processPort](DynamicPorts& where, auto, CurrentPortType*) noexcept { + if constexpr (CurrentPortType::kIsDynamicCollection) { + auto& collection = CurrentPortType::getPortObject(blockRef()); + NamedPortCollection result; + result.name = CurrentPortType::Name; + for (auto& port : collection) { + processPort(result.ports, port); + } + where.push_back(std::move(result)); + } else { + auto& port = CurrentPortType::getPortObject(blockRef()); + port.name = CurrentPortType::Name; + processPort(where, port); + } + }; + + traits::block::all_input_ports::for_each(registerPort, _dynamicInputPorts); + traits::block::all_output_ports::for_each(registerPort, _dynamicOutputPorts); + } _dynamicPortsLoaded = true; } @@ -439,7 +441,7 @@ class BlockWrapper : public BlockModel { BlockWrapper() : BlockWrapper(gr::property_map()) {} explicit BlockWrapper(gr::property_map initParameter) : _block(std::move(initParameter)) { initMessagePorts(); - _dynamicPortsLoader = std::bind_front(&BlockWrapper::dynamicPortLoader, this); + _dynamicPortsLoader = [this] { this->dynamicPortsLoader(); }; } BlockWrapper(const BlockWrapper& other) = delete; diff --git a/core/include/gnuradio-4.0/Buffer.hpp b/core/include/gnuradio-4.0/Buffer.hpp index e092b9a6..017c53b8 100644 --- a/core/include/gnuradio-4.0/Buffer.hpp +++ b/core/include/gnuradio-4.0/Buffer.hpp @@ -1,6 +1,7 @@ #ifndef GNURADIO_BUFFER2_H #define GNURADIO_BUFFER2_H +#include #include #include #include diff --git a/core/include/gnuradio-4.0/Graph.hpp b/core/include/gnuradio-4.0/Graph.hpp index 3cd63aa8..a95958d5 100644 --- a/core/include/gnuradio-4.0/Graph.hpp +++ b/core/include/gnuradio-4.0/Graph.hpp @@ -54,6 +54,14 @@ inline static const char* kGraphInspect = "GraphInspect"; inline static const char* kGraphInspected = "GraphInspected"; inline static const char* kRegistryBlockTypes = "RegistryBlockTypes"; + +inline static const char* kSubgraphCreate = "SubgraphCreate"; +inline static const char* kSubgraphExportPorts = "SubgraphExportPorts"; +inline static const char* kSubgraphDisband = "SubgraphDisband"; + +inline static const char* kSubgraphCreated = "SubgraphCreated"; +inline static const char* kSubgraphExportedPorts = "SubgraphExportedPorts"; +inline static const char* kSubgraphDisbanded = "SubgraphDisbanded"; } // namespace graph::property class Graph : public gr::Block { @@ -184,14 +192,16 @@ class Graph : public gr::Block { Graph(property_map settings = {}) : gr::Block(std::move(settings)) { _blocks.reserve(100); // TODO: remove - propertyCallbacks[graph::property::kEmplaceBlock] = &Graph::propertyCallbackEmplaceBlock; - propertyCallbacks[graph::property::kRemoveBlock] = &Graph::propertyCallbackRemoveBlock; - propertyCallbacks[graph::property::kInspectBlock] = &Graph::propertyCallbackInspectBlock; - propertyCallbacks[graph::property::kReplaceBlock] = &Graph::propertyCallbackReplaceBlock; - propertyCallbacks[graph::property::kEmplaceEdge] = &Graph::propertyCallbackEmplaceEdge; - propertyCallbacks[graph::property::kRemoveEdge] = &Graph::propertyCallbackRemoveEdge; - propertyCallbacks[graph::property::kGraphInspect] = &Graph::propertyCallbackGraphInspect; - propertyCallbacks[graph::property::kRegistryBlockTypes] = &Graph::propertyCallbackRegistryBlockTypes; + propertyCallbacks[graph::property::kEmplaceBlock] = std::mem_fn(&Graph::propertyCallbackEmplaceBlock); + propertyCallbacks[graph::property::kRemoveBlock] = std::mem_fn(&Graph::propertyCallbackRemoveBlock); + propertyCallbacks[graph::property::kInspectBlock] = std::mem_fn(&Graph::propertyCallbackInspectBlock); + propertyCallbacks[graph::property::kReplaceBlock] = std::mem_fn(&Graph::propertyCallbackReplaceBlock); + propertyCallbacks[graph::property::kEmplaceEdge] = std::mem_fn(&Graph::propertyCallbackEmplaceEdge); + propertyCallbacks[graph::property::kRemoveEdge] = std::mem_fn(&Graph::propertyCallbackRemoveEdge); + propertyCallbacks[graph::property::kGraphInspect] = std::mem_fn(&Graph::propertyCallbackGraphInspect); + propertyCallbacks[graph::property::kRegistryBlockTypes] = std::mem_fn(&Graph::propertyCallbackRegistryBlockTypes); + propertyCallbacks[graph::property::kSubgraphCreate] = std::mem_fn(&Graph::propertyCallbackSubgraphCreate); + propertyCallbacks[graph::property::kSubgraphDisband] = std::mem_fn(&Graph::propertyCallbackSubgraphDisband); } Graph(Graph&) = delete; // there can be only one owner of Graph Graph& operator=(Graph&) = delete; // there can be only one owner of Graph @@ -318,13 +328,13 @@ class Graph : public gr::Block { property_map inputPorts; for (auto& portOrCollection : block->dynamicInputPorts()) { - inputPorts[std::string(BlockModel::portName(portOrCollection))] = serializePortOrCollection(portOrCollection); + inputPorts[BlockModel::portName(portOrCollection)] = serializePortOrCollection(portOrCollection); } result["inputPorts"] = std::move(inputPorts); property_map outputPorts; for (auto& portOrCollection : block->dynamicOutputPorts()) { - outputPorts[std::string(BlockModel::portName(portOrCollection))] = serializePortOrCollection(portOrCollection); + outputPorts[BlockModel::portName(portOrCollection)] = serializePortOrCollection(portOrCollection); } result["outputPorts"] = std::move(outputPorts); @@ -555,6 +565,19 @@ class Graph : public gr::Block { return message; } + std::optional propertyCallbackSubgraphCreate([[maybe_unused]] std::string_view propertyName, Message message) { + // creates a subgraph given a list of uniqueNames + // edges that are split end up being the exported ports + // managed/unmanaged? + return {}; + } + + std::optional propertyCallbackSubgraphDisband([[maybe_unused]] std::string_view propertyName, Message message) { + // move all the blocks from the subgraph into the top-level, + // keeping the connections + return {}; + } + // connect using the port index template [[nodiscard]] auto connectInternal(Source& source) { @@ -903,10 +926,10 @@ class MergedGraph : public Block::size() - InId - 1>(std::forward_as_tuple(std::forward(inputs)...), std::move(std::get(left_out))); if constexpr (traits::block::stream_output_port_types::size == 2 && traits::block::stream_output_port_types::size == 1) { - return std::make_tuple(std::move(std::get(left_out)), std::move(right_out)); + return std::make_tuple(std::move(std::get < OutId ^ 1 > (left_out)), std::move(right_out)); } else if constexpr (traits::block::stream_output_port_types::size == 2) { - return std::tuple_cat(std::make_tuple(std::move(std::get(left_out))), std::move(right_out)); + return std::tuple_cat(std::make_tuple(std::move(std::get < OutId ^ 1 > (left_out))), std::move(right_out)); } else if constexpr (traits::block::stream_output_port_types::size == 1) { return [&](std::index_sequence, std::index_sequence) { return std::make_tuple(std::move(std::get(left_out))..., std::move(std::get(left_out))..., std::move(right_out)); }(std::make_index_sequence(), std::make_index_sequence::size - OutId - 1>()); diff --git a/core/include/gnuradio-4.0/Message.hpp b/core/include/gnuradio-4.0/Message.hpp index 504561c3..af5e2f65 100644 --- a/core/include/gnuradio-4.0/Message.hpp +++ b/core/include/gnuradio-4.0/Message.hpp @@ -134,6 +134,7 @@ void sendMessage(auto& port, std::string_view serviceName, std::string_view endp } WriterSpanLike auto msgSpan = port.streamWriter().template reserve(1UZ); msgSpan[0] = std::move(message); + msgSpan.publish(1UZ); } } // namespace detail diff --git a/core/include/gnuradio-4.0/Port.hpp b/core/include/gnuradio-4.0/Port.hpp index 68101a8d..4c55d95d 100644 --- a/core/include/gnuradio-4.0/Port.hpp +++ b/core/include/gnuradio-4.0/Port.hpp @@ -937,15 +937,19 @@ static_assert(std::is_default_constructible_v>); */ class DynamicPort { public: - std::string_view name; - std::int16_t& priority; // → dependents of a higher-prio port should be scheduled first (Q: make this by order of ports?) - std::size_t& min_samples; - std::size_t& max_samples; + std::string name; + std::int16_t priority; // → dependents of a higher-prio port should be scheduled first (Q: make this by order of ports?) + std::size_t min_samples; + std::size_t max_samples; private: struct model { // intentionally class-private definition to limit interface exposure and enhance composition virtual ~model() = default; + [[nodiscard]] virtual DynamicPort weakRef() const noexcept = 0; + + [[nodiscard]] virtual std::intptr_t internalId() const noexcept = 0; + [[nodiscard]] virtual std::any defaultValue() const noexcept = 0; [[nodiscard]] virtual bool setDefaultValue(const std::any& val) noexcept = 0; @@ -1021,6 +1025,10 @@ class DynamicPort { ~PortWrapper() override = default; + [[nodiscard]] virtual DynamicPort weakRef() const noexcept override; + + [[nodiscard]] std::intptr_t internalId() const noexcept override { return reinterpret_cast(std::addressof(_value)); } + [[nodiscard]] std::any defaultValue() const noexcept override { return _value.defaultValue(); } [[nodiscard]] bool setDefaultValue(const std::any& val) noexcept override { return _value.setDefaultValue(val); } @@ -1071,8 +1079,19 @@ class DynamicPort { DynamicPort(const DynamicPort& arg) = delete; DynamicPort& operator=(const DynamicPort& arg) = delete; - DynamicPort(DynamicPort&& arg) = default; - DynamicPort& operator=(DynamicPort&& arg) = delete; + DynamicPort(DynamicPort&& other) noexcept : name(other.name), priority(other.priority), min_samples(other.min_samples), max_samples(other.max_samples), _accessor(std::move(other._accessor)) {} + auto& operator=(DynamicPort&& other) noexcept { + auto tmp = std::move(other); + std::swap(_accessor, tmp._accessor); + std::swap(name, tmp.name); + std::swap(priority, tmp.priority); + std::swap(min_samples, tmp.min_samples); + std::swap(max_samples, tmp.max_samples); + return *this; + } + + bool operator==(const DynamicPort& other) const noexcept { return _accessor->internalId() == other._accessor->internalId(); } + bool operator!=(const DynamicPort& other) const noexcept { return _accessor->internalId() != other._accessor->internalId(); } // TODO: The lifetime of ports is a problem here, if we keep a reference to the port in DynamicPort, the port object/ can not be reallocated template @@ -1081,6 +1100,8 @@ class DynamicPort { template explicit constexpr DynamicPort(T&& arg, owned_value_tag) noexcept : name(arg.name), priority(arg.priority), min_samples(arg.min_samples), max_samples(arg.max_samples), _accessor{std::make_unique>(std::forward(arg))} {} + [[nodiscard]] DynamicPort weakRef() const noexcept { return _accessor->weakRef(); } + [[nodiscard]] std::any defaultValue() const noexcept { return _accessor->defaultValue(); } [[nodiscard]] bool setDefaultValue(const std::any& val) noexcept { return _accessor->setDefaultValue(val); } @@ -1113,6 +1134,11 @@ class DynamicPort { [[nodiscard]] ConnectionResult connect(DynamicPort& dst_port) { return _accessor->connect(dst_port); } }; +template +[[nodiscard]] DynamicPort DynamicPort::PortWrapper::weakRef() const noexcept { + return DynamicPort(_value, DynamicPort::non_owned_reference_tag{}); +} + static_assert(PortLike); namespace detail { diff --git a/core/test/message_utils.hpp b/core/test/message_utils.hpp new file mode 100644 index 00000000..b1ee1856 --- /dev/null +++ b/core/test/message_utils.hpp @@ -0,0 +1,101 @@ +#ifndef CORE_TEST_MESSAGE_UTILS_HPP +#define CORE_TEST_MESSAGE_UTILS_HPP + +#include +#include +#include + +#include + +#include +#include +#include + +namespace gr::testing { + +using namespace boost::ut; +using namespace gr; + +using namespace std::chrono_literals; +using enum gr::message::Command; + +template +bool awaitCondition(std::chrono::milliseconds timeout, Condition condition) { + auto start = std::chrono::steady_clock::now(); + while (std::chrono::steady_clock::now() - start < timeout) { + if (condition()) { + return true; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + return false; +} + +inline auto returnReplyMsg(gr::MsgPortIn& port, std::optional expectedEndpoint = {}, std::source_location caller = std::source_location::current()) { + auto available = port.streamReader().available(); + expect(gt(available, 0UZ)) << "didn't receive a reply message, caller: " << caller.file_name() << ":" << caller.line(); + ReaderSpanLike auto messages = port.streamReader().get(available); + Message result; + + if (expectedEndpoint) { + auto it = std::ranges::find_if(messages, [endpoint = *expectedEndpoint](const auto& message) { return message.endpoint == endpoint; }); + expect(gt(available, 0UZ)) << "didn't receive the expected reply message, caller: " << caller.file_name() << ":" << caller.line(); + } else { + result = messages[0]; + } + + expect(messages.consume(messages.size())); + fmt::print("Test got a reply: {}\n", result); + return result; +}; + +inline auto awaitReplyMsg(auto& graph, std::chrono::milliseconds timeout, gr::MsgPortIn& port, std::optional expectedEndpoint = {}, std::source_location caller = std::source_location::current()) { + Message msg; + + awaitCondition(timeout, [&port, &graph] { + graph.processScheduledMessages(); + return port.streamReader().available() > 0; + }); + + return returnReplyMsg(port, expectedEndpoint, caller); +}; + +inline auto waitForAReply(gr::MsgPortIn& fromGraph, std::chrono::milliseconds maxWait = 1s, std::source_location currentSource = std::source_location::current()) { + auto startedAt = std::chrono::system_clock::now(); + while (fromGraph.streamReader().available() == 0) { + std::this_thread::sleep_for(100ms); + if (std::chrono::system_clock::now() - startedAt > maxWait) { + break; + } + } + expect(fromGraph.streamReader().available() > 0) << "Caller at" << currentSource.file_name() << ":" << currentSource.line(); + return fromGraph.streamReader().available() > 0; +}; + +inline auto sendEmplaceTestBlockMsg(gr::MsgPortOut& toGraph, gr::MsgPortIn& fromGraph, std::string type, std::string params, property_map properties) { + sendMessage(toGraph, "" /* serviceName */, graph::property::kEmplaceBlock /* endpoint */, // + {{"type", std::move(type)}, {"parameters", std::move(params)}, {"properties", std::move(properties)}} /* data */); + expect(waitForAReply(fromGraph)) << "didn't receive a reply message"; + + const Message reply = returnReplyMsg(fromGraph); + expect(reply.data.has_value()) << "emplace block failed and returned an error"; + return reply.data.has_value() ? std::get(reply.data.value().at("uniqueName"s)) : std::string{}; +}; + +inline auto sendEmplaceTestEdgeMsg(gr::MsgPortOut& toGraph, gr::MsgPortIn& fromGraph, std::string sourceBlock, std::string sourcePort, std::string destinationBlock, std::string destinationPort) { + gr::property_map data = {{"sourceBlock", sourceBlock}, {"sourcePort", sourcePort}, // + {"destinationBlock", destinationBlock}, {"destinationPort", destinationPort}, // + {"minBufferSize", gr::Size_t()}, {"weight", 0}, {"edgeName", "unnamed edge"}}; + sendMessage(toGraph, "" /* serviceName */, graph::property::kEmplaceEdge /* endpoint */, data /* data */); + if (!waitForAReply(fromGraph)) { + fmt::println("didn't receive a reply message for {}", data); + return false; + } + + const Message reply = returnReplyMsg(fromGraph); + return reply.data.has_value(); +}; + +} // namespace gr::testing + +#endif // include guard diff --git a/core/test/qa_GraphMessages.cpp b/core/test/qa_GraphMessages.cpp index d47f58df..3c1173fc 100644 --- a/core/test/qa_GraphMessages.cpp +++ b/core/test/qa_GraphMessages.cpp @@ -1,7 +1,7 @@ #include -#include "gnuradio-4.0/Block.hpp" -#include "gnuradio-4.0/Message.hpp" +#include +#include #include #include #include @@ -10,7 +10,7 @@ #include #include -#include +#include "message_utils.hpp" using namespace std::chrono_literals; using namespace std::string_literals; @@ -55,47 +55,6 @@ const boost::ut::suite<"Graph Formatter Tests"> graphFormatterTests = [] { }; }; -template -bool awaitCondition(std::chrono::milliseconds timeout, Condition condition) { - auto start = std::chrono::steady_clock::now(); - while (std::chrono::steady_clock::now() - start < timeout) { - if (condition()) { - return true; - } - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } - return false; -} - -auto returnReplyMsg(gr::MsgPortIn& port, std::optional expectedEndpoint = {}, std::source_location caller = std::source_location::current()) { - auto available = port.streamReader().available(); - expect(gt(available, 0UZ)) << "didn't receive a reply message, caller: " << caller.file_name() << ":" << caller.line(); - ReaderSpanLike auto messages = port.streamReader().get(available); - Message result; - - if (expectedEndpoint) { - auto it = std::ranges::find_if(messages, [endpoint = *expectedEndpoint](const auto& message) { return message.endpoint == endpoint; }); - expect(gt(available, 0UZ)) << "didn't receive the expected reply message, caller: " << caller.file_name() << ":" << caller.line(); - } else { - result = messages[0]; - } - - expect(messages.consume(messages.size())); - fmt::print("Test got a reply: {}\n", result); - return result; -}; - -auto awaitReplyMsg(auto& graph, std::chrono::milliseconds timeout, gr::MsgPortIn& port, std::optional expectedEndpoint = {}, std::source_location caller = std::source_location::current()) { - Message msg; - - awaitCondition(timeout, [&port, &graph] { - graph.processScheduledMessages(); - return port.streamReader().available() > 0; - }); - - return returnReplyMsg(port, expectedEndpoint, caller); -}; - const boost::ut::suite NonRunningGraphTests = [] { using namespace std::string_literals; using namespace boost::ut; @@ -348,42 +307,6 @@ const boost::ut::suite RunningGraphTests = [] { expect(eq(ConnectionResult::SUCCESS, toGraph.connect(scheduler.msgIn))); expect(eq(ConnectionResult::SUCCESS, scheduler.msgOut.connect(fromGraph))); - auto waitForAReply = [&](std::chrono::milliseconds maxWait = 1s, std::source_location currentSource = std::source_location::current()) { - auto startedAt = std::chrono::system_clock::now(); - while (fromGraph.streamReader().available() == 0) { - std::this_thread::sleep_for(100ms); - if (std::chrono::system_clock::now() - startedAt > maxWait) { - break; - } - } - expect(fromGraph.streamReader().available() > 0) << "Caller at" << currentSource.file_name() << ":" << currentSource.line(); - return fromGraph.streamReader().available() > 0; - }; - - auto emplaceTestBlock = [&](std::string type, std::string params, property_map properties) { - sendMessage(toGraph, "" /* serviceName */, graph::property::kEmplaceBlock /* endpoint */, // - {{"type", std::move(type)}, {"parameters", std::move(params)}, {"properties", std::move(properties)}} /* data */); - expect(waitForAReply()) << "didn't receive a reply message"; - - const Message reply = returnReplyMsg(fromGraph); - expect(reply.data.has_value()) << "emplace block failed and returned an error"; - return reply.data.has_value() ? std::get(reply.data.value().at("uniqueName"s)) : std::string{}; - }; - - auto emplaceTestEdge = [&](std::string sourceBlock, std::string sourcePort, std::string destinationBlock, std::string destinationPort) { - property_map data = {{"sourceBlock", sourceBlock}, {"sourcePort", sourcePort}, // - {"destinationBlock", destinationBlock}, {"destinationPort", destinationPort}, // - {"minBufferSize", gr::Size_t()}, {"weight", 0}, {"edgeName", "unnamed edge"}}; - sendMessage(toGraph, "" /* serviceName */, graph::property::kEmplaceEdge /* endpoint */, data /* data */); - if (!waitForAReply()) { - fmt::println("didn't receive a reply message for {}", data); - return false; - } - - const Message reply = returnReplyMsg(fromGraph); - return reply.data.has_value(); - }; - std::expected schedulerRet; auto runScheduler = [&scheduler, &schedulerRet] { schedulerRet = scheduler.runAndWait(); }; @@ -397,8 +320,8 @@ const boost::ut::suite RunningGraphTests = [] { fmt::println("executed basic graph"); // Adding a few blocks - auto multiply1 = emplaceTestBlock("gr::testing::Copy"s, "float"s, property_map{}); - auto multiply2 = emplaceTestBlock("gr::testing::Copy"s, "float"s, property_map{}); + auto multiply1 = sendEmplaceTestBlockMsg(toGraph, fromGraph, "gr::testing::Copy"s, "float"s, property_map{}); + auto multiply2 = sendEmplaceTestBlockMsg(toGraph, fromGraph, "gr::testing::Copy"s, "float"s, property_map{}); scheduler.processScheduledMessages(); for (const auto& block : scheduler.graph().blocks()) { @@ -406,15 +329,15 @@ const boost::ut::suite RunningGraphTests = [] { } expect(eq(scheduler.graph().blocks().size(), 4UZ)) << "should contain sink->multiply1->multiply2->sink"; - expect(emplaceTestEdge(source.unique_name, "out", multiply1, "in")) << "emplace edge source -> multiply1 failed and returned an error"; - expect(emplaceTestEdge(multiply1, "out", multiply2, "in")) << "emplace edge multiply1 -> multiply2 failed and returned an error"; - expect(emplaceTestEdge(multiply2, "out", sink.unique_name, "in")) << "emplace edge multiply2 -> sink failed and returned an error"; + expect(sendEmplaceTestEdgeMsg(toGraph, fromGraph, source.unique_name, "out", multiply1, "in")) << "emplace edge source -> multiply1 failed and returned an error"; + expect(sendEmplaceTestEdgeMsg(toGraph, fromGraph, multiply1, "out", multiply2, "in")) << "emplace edge multiply1 -> multiply2 failed and returned an error"; + expect(sendEmplaceTestEdgeMsg(toGraph, fromGraph, multiply2, "out", sink.unique_name, "in")) << "emplace edge multiply2 -> sink failed and returned an error"; scheduler.processScheduledMessages(); // Get the whole graph { sendMessage(toGraph, "" /* serviceName */, graph::property::kGraphInspect /* endpoint */, property_map{} /* data */); - if (!waitForAReply()) { + if (!waitForAReply(fromGraph)) { fmt::println("didn't receive a reply message for kGraphInspect"); expect(false); } diff --git a/core/test/qa_HierBlock.cpp b/core/test/qa_HierBlock.cpp index 7312261a..6694c9ad 100644 --- a/core/test/qa_HierBlock.cpp +++ b/core/test/qa_HierBlock.cpp @@ -1,202 +1,276 @@ -#include +#include #include #include #include +#include +#include +#include -template() * std::declval())> -struct scale : public gr::Block> { - gr::PortIn original; - gr::PortOut scaled; - GR_MAKE_REFLECTABLE(scale, original, scaled); +#include "message_utils.hpp" - template V> - [[nodiscard]] constexpr auto processOne(V a) const noexcept { - return a * 2; - } -}; +namespace gr::testing { + +using namespace std::chrono_literals; +using namespace std::string_literals; + +using namespace boost::ut; +using namespace gr; +using namespace gr::message; + +template +struct DemoSubGraph : public gr::Graph { +public: + gr::testing::Copy* pass1 = nullptr; + gr::testing::Copy* pass2 = nullptr; -template() + std::declval())> -struct adder : public gr::Block> { - gr::PortIn addend0; - gr::PortIn addend1; - gr::PortOut sum; - GR_MAKE_REFLECTABLE(adder, addend0, addend1, sum); + DemoSubGraph(const gr::property_map& init) : gr::Graph(init) { + pass1 = std::addressof(emplaceBlock>()); + pass2 = std::addressof(emplaceBlock>()); - template V> - [[nodiscard]] constexpr auto processOne(V a, V b) const noexcept { - return a + b; + std::ignore = gr::Graph::connect(*pass1, PortDefinition("out"), *pass2, PortDefinition("in")); } }; -// TODO: These lines is commented out -// HieBlock has to be reimplemented to support recent changes -/* -template -class HierBlock : public gr::lifecycle::StateMachine>, public gr::BlockModel { +template +class DynamicSubGraph : public gr::BlockWrapper { private: - static std::atomic_size_t _unique_id_counter; - const std::size_t _unique_id = _unique_id_counter++; - const std::string _unique_name = fmt::format("multi_adder#{}", _unique_id); - -protected: - using setting_map = std::map>; - std::string _name = "multi_adder"; - std::string _type_name = "multi_adder"; - gr::property_map _meta_information; /// used to store non-graph-processing information like UI block position etc. - bool _input_tags_present = false; - bool _output_tags_changed = false; - std::vector _tags_at_input; - std::vector _tags_at_output; - gr::CtxSettings> _settings = gr::CtxSettings>(*this); - - using in_port_t = gr::PortIn; - - gr::scheduler::Simple<> _scheduler; - - gr::Graph make_graph() { - gr::Graph graph; - auto& adder_block = graph.emplaceBlock>({{"name", "adder"}}); - auto& left_scale_block = graph.emplaceBlock>(); - auto& right_scale_block = graph.emplaceBlock>(); - - assert(gr::ConnectionResult::SUCCESS == graph.connect<"scaled">(left_scale_block).to<"addend0">(adder_block)); - assert(gr::ConnectionResult::SUCCESS == graph.connect<"scaled">(right_scale_block).to<"addend1">(adder_block)); - - _dynamicInputPorts.emplace_back(gr::DynamicPort(gr::inputPort<0, gr::PortType::STREAM>(&left_scale_block), gr::DynamicPort::non_owned_reference_tag{})); - _dynamicInputPorts.emplace_back(gr::DynamicPort(gr::inputPort<0, gr::PortType::STREAM>(&right_scale_block), gr::DynamicPort::non_owned_reference_tag{})); - _dynamicOutputPorts.emplace_back(gr::DynamicPort(gr::outputPort<0, gr::PortType::STREAM>(&adder_block), gr::DynamicPort::non_owned_reference_tag{})); - - _dynamicPortsLoaded = true; - return graph; - } + std::unordered_multimap _exportedInputPortsForBlock; + std::unordered_multimap _exportedOutputPortsForBlock; public: - HierBlock() : _scheduler(make_graph()) {}; + DynamicSubGraph() { + // We need to make sure nobody touches our dynamic ports + // as this class will handle them + this->_dynamicPortsLoader = [] {}; + + this->_block.propertyCallbacks[gr::graph::property::kSubgraphExportPorts] = [this](auto& self, std::string_view property, gr::Message message) -> std::optional { + const auto& data = message.data.value(); + const std::string& uniqueBlockName = std::get(data.at("uniqueBlockName"s)); + auto portDirection = std::get(data.at("portDirection"s)) == "input" ? PortDirection::INPUT : PortDirection::OUTPUT; + const std::string& portName = std::get(data.at("portName"s)); + const bool exportFlag = std::get(data.at("exportFlag"s)); + + exportPort(exportFlag, uniqueBlockName, portDirection, portName); + + message.endpoint = graph::property::kSubgraphExportedPorts; + return message; + }; + } - ~HierBlock() override = default; + void exportPort(bool exportFlag, const std::string& uniqueBlockName, gr::PortDirection portDirection, const std::string& portName) { + auto [infoIt, infoFound] = findExportedPortInfo(uniqueBlockName, portDirection, portName); + if (infoFound == exportFlag) { + throw gr::Error(fmt::format("Port {} in block {} export status already as desired {}", portName, uniqueBlockName, exportFlag)); + } - void init(std::shared_ptr progress, std::shared_ptr ioThreadPool) override {} + auto& port = findPortInBlock(uniqueBlockName, portDirection, portName); + auto& bookkeepingCollection = portDirection == gr::PortDirection::INPUT ? _exportedInputPortsForBlock : _exportedOutputPortsForBlock; + auto& portCollection = portDirection == gr::PortDirection::INPUT ? this->_dynamicInputPorts : this->_dynamicOutputPorts; + if (exportFlag) { + bookkeepingCollection.emplace(uniqueBlockName, portName); + portCollection.push_back(port.weakRef()); + } else { + bookkeepingCollection.erase(infoIt); + // TODO: Add support for exporting port collections + auto portIt = std::ranges::find_if(portCollection, [needleName = port.name](const auto& portOrCollection) { + return std::visit(gr::meta::overloaded{ + // + [&](gr::DynamicPort& in) { return in.name == needleName; }, // + [](auto&) { return false; } // + }, + portOrCollection); + }); + if (portIt != portCollection.end()) { + portCollection.erase(portIt); + } else { + throw gr::Error("Port was not exported, while it is registered as such"); + } + } - [[nodiscard]] std::string_view name() const override { return _unique_name; } + updateMetaInformation(); + } - std::string_view typeName() const override { return _type_name; } + auto& blockRef() { return gr::BlockWrapper::blockRef(); } + auto& blockRef() const { return gr::BlockWrapper::blockRef(); } - constexpr bool isBlocking() const noexcept override { return false; } +private: + gr::BlockModel& findBlock(std::string uniqueBlockName) { + for (const auto& block : this->blocks()) { + if (std::string(block->uniqueName()) == uniqueBlockName) { + return *block; + } + } + throw gr::Error(fmt::format("Block {} not found in {}", uniqueBlockName, this->uniqueName())); + } - [[nodiscard]] std::expected changeState(gr::lifecycle::State newState) noexcept override { return this->changeStateTo(newState); } + gr::DynamicPort& findPortInBlock(const std::string& uniqueBlockName, gr::PortDirection portDirection, const std::string& portName) { + auto& block = findBlock(uniqueBlockName); - constexpr gr::lifecycle::State state() const noexcept override { return gr::lifecycle::StateMachine>::state(); } + if (portDirection == gr::PortDirection::INPUT) { + return block.dynamicInputPort(portName); + } else { + return block.dynamicOutputPort(portName); + } + } - gr::work::Result work(std::size_t requested_work) override { - if (state() == gr::lifecycle::State::STOPPED) { - return {requested_work, 0UL, gr::work::Status::DONE}; + auto findExportedPortInfo(const std::string& uniqueBlockName, gr::PortDirection portDirection, const std::string& portName) const { + auto& bookkeepingCollection = portDirection == gr::PortDirection::INPUT ? _exportedInputPortsForBlock : _exportedOutputPortsForBlock; + const auto& [from, to] = bookkeepingCollection.equal_range(std::string(uniqueBlockName)); + for (auto it = from; it != to; it++) { + if (it->second == portName) { + return std::make_pair(it, true); + } } - _scheduler.runAndWait(); - bool ok = this->changeStateTo(gr::lifecycle::State::STOPPED).has_value(); - return {requested_work, requested_work, ok ? gr::work::Status::DONE : gr::work::Status::ERROR}; + return std::make_pair(bookkeepingCollection.end(), false); } - gr::work::Status draw(const property_map& config = {}) override { return gr::work::Status::OK; } + void updateMetaInformation() { + auto& info = gr::BlockWrapper::metaInformation(); + + auto fillMetaInformation = [](gr::property_map& dest, auto& bookkeepingCollection) { + std::string previousUniqueName; + std::vector collectedPorts; + for (const auto& [blockUniqueName, portName] : bookkeepingCollection) { + if (previousUniqueName != blockUniqueName && !collectedPorts.empty()) { + dest[previousUniqueName] = std::move(collectedPorts); + collectedPorts.clear(); + } + collectedPorts.push_back(portName); + previousUniqueName = blockUniqueName; + } + if (!collectedPorts.empty()) { + dest[previousUniqueName] = std::move(collectedPorts); + collectedPorts.clear(); + } + }; + + gr::property_map exportedInputPorts, exportedOutputPorts; + fillMetaInformation(exportedInputPorts, _exportedInputPortsForBlock); + fillMetaInformation(exportedOutputPorts, _exportedOutputPortsForBlock); - void processScheduledMessages() override { - // TODO + info["exportedInputPorts"] = std::move(exportedInputPorts); + info["exportedOutputPorts"] = std::move(exportedOutputPorts); } +}; - void* raw() override { return this; } +const boost::ut::suite RunningGraphTests_ = [] { + using namespace std::string_literals; + using namespace boost::ut; + using namespace gr; + using enum gr::message::Command; - void setName(std::string name) noexcept override {} + gr::scheduler::Simple scheduler{gr::Graph()}; - [[nodiscard]] gr::property_map& metaInformation() noexcept override { return _meta_information; } + // Basic source and sink + auto& source = scheduler.graph().emplaceBlock>(); + auto& sink = scheduler.graph().emplaceBlock>(); - [[nodiscard]] const gr::property_map& metaInformation() const override { return _meta_information; } + // Subgraph with a single block inside + using SubGraphType = DynamicSubGraph>; + auto& subGraph = scheduler.graph().addBlock(std::make_unique()); + SubGraphType* subGraphDirect = dynamic_cast(&subGraph); - [[nodiscard]] gr::SettingsBase& settings() override { return _settings; } + // Connecting the message ports + gr::MsgPortOut toGraph; + gr::MsgPortIn fromGraph; + expect(eq(ConnectionResult::SUCCESS, toGraph.connect(scheduler.msgIn))); + expect(eq(ConnectionResult::SUCCESS, scheduler.msgOut.connect(fromGraph))); - [[nodiscard]] const gr::SettingsBase& settings() const override { return _settings; } + std::expected schedulerRet; + auto runScheduler = [&scheduler, &schedulerRet] { schedulerRet = scheduler.runAndWait(); }; - [[nodiscard]] std::string_view uniqueName() const override { return _unique_name; } -}; + std::thread schedulerThread1(runScheduler); -static_assert(gr::BlockLike>); + expect(awaitCondition(1s, [&scheduler] { return scheduler.state() == lifecycle::State::RUNNING; })) << "scheduler thread up and running w/ timeout"; + expect(scheduler.state() == lifecycle::State::RUNNING) << "scheduler thread up and running"; -template -std::atomic_size_t HierBlock::_unique_id_counter = 0; -*/ -template -struct fixed_source : public gr::Block> { - gr::PortOut> out; - std::size_t remaining_events_count; + for (const auto& block : scheduler.graph().blocks()) { + fmt::println("block in list: {} - state() : {}", block->name(), magic_enum::enum_name(block->state())); + } + expect(eq(scheduler.graph().blocks().size(), 3UZ)) << "should contain sink->(copy->copy)->sink"; + + // Export ports from the sub-graph + + sendMessage(toGraph, subGraph.uniqueName(), graph::property::kSubgraphExportPorts, + property_map{ + {"uniqueBlockName"s, subGraphDirect->blockRef().pass2->unique_name}, // + {"portDirection"s, "output"}, // + {"portName"s, "out"}, // + {"exportFlag"s, true} // + }); + sendMessage(toGraph, subGraph.uniqueName(), graph::property::kSubgraphExportPorts, + property_map{ + {"uniqueBlockName"s, subGraphDirect->blockRef().pass1->unique_name}, // + {"portDirection"s, "input"}, // + {"portName"s, "in"}, // + {"exportFlag"s, true} // + }); + if (!waitForAReply(fromGraph)) { + fmt::println("didn't receive a reply message for kSubgraphExportPorts"); + expect(false); + } - GR_MAKE_REFLECTABLE(fixed_source, out, remaining_events_count); + // Make connections - T value = 1; + expect(sendEmplaceTestEdgeMsg(toGraph, fromGraph, source.unique_name, "out", std::string(subGraph.uniqueName()), "in")) << "emplace edge source -> group failed and returned an error"; + expect(sendEmplaceTestEdgeMsg(toGraph, fromGraph, std::string(subGraph.uniqueName()), "out", sink.unique_name, "in")) << "emplace edge multiply2 -> sink failed and returned an error"; - gr::work::Result work(std::size_t requested_work) { - if (this->state() == gr::lifecycle::State::STOPPED) { - return {requested_work, 0UL, gr::work::Status::DONE}; + // Get the whole graph + { + sendMessage(toGraph, "" /* serviceName */, graph::property::kGraphInspect /* endpoint */, property_map{} /* data */); + if (!waitForAReply(fromGraph)) { + fmt::println("didn't receive a reply message for kGraphInspect"); + expect(false); } - if (remaining_events_count != 0) { - auto& writer = out.streamWriter(); - auto data = writer.reserve(1UZ); - data[0] = value; - data.publish(1UZ); + const Message reply = returnReplyMsg(fromGraph); + expect(reply.data.has_value()); - remaining_events_count--; - if (remaining_events_count == 0) { - fmt::print("Last value sent was {}\n", value); - } + const auto& data = reply.data.value(); + const auto& children = std::get(data.at("children"s)); + expect(eq(children.size(), 3UZ)); - value += 1; - return {requested_work, 1UL, gr::work::Status::OK}; - } else { - // TODO: Investigate what schedulers do when there is an event written, but we return DONE - if (auto ret = this->changeStateTo(gr::lifecycle::State::STOPPED); !ret) { - this->emitErrorMessage("requested STOPPED", ret.error()); - } - this->publishTag({{gr::tag::END_OF_STREAM, true}}, 0); - return {requested_work, 1UL, gr::work::Status::DONE}; - } - } -}; + const auto& edges = std::get(data.at("edges"s)); + expect(eq(edges.size(), 2UZ)); -template -struct cout_sink : public gr::Block> { - gr::PortIn> in; - std::size_t remaining = 0; + std::size_t subGraphInConnections = 0UZ; + std::size_t subGraphOutConnections = 0UZ; - GR_MAKE_REFLECTABLE(cout_sink, in, remaining); + // Check that the subgraph is connected properly - void processOne(T value) { - remaining--; - if (remaining == 0) { - std::cerr << "last value was: " << value << "\n"; + for (const auto& [index, edge_] : edges) { + const auto& edge = std::get(edge_); + if (std::get(edge.at("destinationBlock")) == subGraph.uniqueName()) { + subGraphInConnections++; + } + if (std::get(edge.at("sourceBlock")) == subGraph.uniqueName()) { + subGraphOutConnections++; + } } + expect(eq(subGraphInConnections, 1UZ)); + expect(eq(subGraphOutConnections, 1UZ)); + + // Check subgraph topology + const auto& subGraphData = std::get(children.at(std::string(subGraph.uniqueName()))); + const auto& subGraphChildren = std::get(subGraphData.at("children"s)); + const auto& subGraphEdges = std::get(subGraphData.at("edges"s)); + expect(eq(subGraphChildren.size(), 2UZ)); + expect(eq(subGraphEdges.size(), 1UZ)); } -}; -gr::Graph make_graph(std::size_t /*events_count*/) { - gr::Graph graph; - - // auto& source_leftBlock = graph.emplaceBlock>({{"remaining_events_count", events_count}}); - // auto& source_rightBlock = graph.emplaceBlock>({{"remaining_events_count", events_count}}); - // auto& sink = graph.emplaceBlock>({{"remaining", events_count}}); - - // auto& hier = graph.addBlock(std::make_unique>()); - - // graph.connect(source_leftBlock, 0, hier, 0); - // graph.connect(source_rightBlock, 0, hier, 1); - // graph.connect(hier, 0, sink, 0); + // Stopping scheduler + scheduler.requestStop(); + schedulerThread1.join(); + if (!schedulerRet.has_value()) { + expect(false) << fmt::format("scheduler.runAndWait() failed:\n{}\n", schedulerRet.error()); + } - return graph; -} + // return to initial state + expect(scheduler.changeStateTo(lifecycle::State::INITIALISED).has_value()) << "could switch to INITIALISED?"; + expect(awaitCondition(1s, [&scheduler] { return scheduler.state() == lifecycle::State::INITIALISED; })) << "scheduler INITIALISED w/ timeout"; + expect(scheduler.state() == lifecycle::State::INITIALISED) << fmt::format("scheduler INITIALISED - actual: {}\n", magic_enum::enum_name(scheduler.state())); +}; -int main() { - // TODO: These lines is commented because of failing tests - // TODO: HierBlock as it is implemented now does not support tag handling and can not be used with new DONE mechanism via EOS tag - // TODO: Review HierBlock implementation - // auto thread_pool = std::make_shared("custom pool", gr::thread_pool::CPU_BOUND, 2, 2); // use custom pool to limit number of threads for emscripten - // gr::scheduler::Simple scheduler(make_graph(10), thread_pool); - // scheduler.runAndWait(); -} +} // namespace gr::testing +int main() { /* tests are statically executed */ }