Skip to content

Commit

Permalink
Add subgraphs with exported ports
Browse files Browse the repository at this point in the history
Signed-off-by: Ivan Čukić <[email protected]>
  • Loading branch information
ivan-cukic committed Dec 4, 2024
1 parent 4829478 commit 233e6fd
Show file tree
Hide file tree
Showing 9 changed files with 459 additions and 301 deletions.
43 changes: 25 additions & 18 deletions core/include/gnuradio-4.0/Block.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ enum class Category {
* }
*
* void start() override {
* propertyCallbacks.emplace(kMyCustomProperty, &MyBlock::propertyCallbackMyCustom);
* propertyCallbacks.emplace(kMyCustomProperty, std::mem_fn(&MyBlock::propertyCallbackMyCustom));
* }
* };
* @endcode
Expand Down Expand Up @@ -428,18 +428,18 @@ class Block : public lifecycle::StateMachine<Derived> {
MsgPortInBuiltin msgIn;
MsgPortOutBuiltin msgOut;

using PropertyCallback = std::optional<Message> (Derived::*)(std::string_view, Message);
using PropertyCallback = std::function<std::optional<Message>(Derived&, std::string_view, Message)>;
std::map<std::string, PropertyCallback> propertyCallbacks{
{block::property::kHeartbeat, &Block::propertyCallbackHeartbeat}, //
{block::property::kEcho, &Block::propertyCallbackEcho}, //
{block::property::kLifeCycleState, &Block::propertyCallbackLifecycleState}, //
{block::property::kSetting, &Block::propertyCallbackSettings}, //
{block::property::kStagedSetting, &Block::propertyCallbackStagedSettings}, //
{block::property::kStoreDefaults, &Block::propertyCallbackStoreDefaults}, //
{block::property::kResetDefaults, &Block::propertyCallbackResetDefaults}, //
{block::property::kActiveContext, &Block::propertyCallbackActiveContext}, //
{block::property::kSettingsCtx, &Block::propertyCallbackSettingsCtx}, //
{block::property::kSettingsContexts, &Block::propertyCallbackSettingsContexts}, //
{block::property::kHeartbeat, std::mem_fn(&Block::propertyCallbackHeartbeat)}, //
{block::property::kEcho, std::mem_fn(&Block::propertyCallbackEcho)}, //
{block::property::kLifeCycleState, std::mem_fn(&Block::propertyCallbackLifecycleState)}, //
{block::property::kSetting, std::mem_fn(&Block::propertyCallbackSettings)}, //
{block::property::kStagedSetting, std::mem_fn(&Block::propertyCallbackStagedSettings)}, //
{block::property::kStoreDefaults, std::mem_fn(&Block::propertyCallbackStoreDefaults)}, //
{block::property::kResetDefaults, std::mem_fn(&Block::propertyCallbackResetDefaults)}, //
{block::property::kActiveContext, std::mem_fn(&Block::propertyCallbackActiveContext)}, //
{block::property::kSettingsCtx, std::mem_fn(&Block::propertyCallbackSettingsCtx)}, //
{block::property::kSettingsContexts, std::mem_fn(&Block::propertyCallbackSettingsContexts)}, //
};
std::map<std::string, std::set<std::string>> propertySubscriptions;

Expand Down Expand Up @@ -1358,17 +1358,20 @@ class Block : public lifecycle::StateMachine<Derived> {
}

std::size_t getMergedBlockLimit() {
if constexpr (requires(const Derived& d) {
{ available_samples(d) } -> std::same_as<std::size_t>;
}) {
if constexpr (Derived::blockCategory != block::Category::NormalBlock) {
return 0;
} else if constexpr (requires(const Derived& d) {
{ available_samples(d) } -> std::same_as<std::size_t>;
}) {
return available_samples(self());
} else if constexpr (traits::block::stream_input_port_types<Derived>::size == 0 && traits::block::stream_output_port_types<Derived>::size == 0) { // allow blocks that have neither input nor output ports (by merging source to sink block) -> use internal buffer size
constexpr gr::Size_t chunkSize = Derived::merged_work_chunk_size();
static_assert(chunkSize != std::dynamic_extent && chunkSize > 0, "At least one internal port must define a maximum number of samples or the non-member/hidden "
"friend function `available_samples(const BlockType&)` must be defined.");
return chunkSize;
} else {
return std::numeric_limits<std::size_t>::max();
}
return std::numeric_limits<std::size_t>::max();
}

template<typename TIn, typename TOut>
Expand Down Expand Up @@ -1670,7 +1673,11 @@ class Block : public lifecycle::StateMachine<Derived> {
}
}
} else { // block does not define any valid processing function
static_assert(meta::always_false<traits::block::stream_input_port_types_tuple<Derived>>, "neither processBulk(...) nor processOne(...) implemented");
if constexpr (Derived::blockCategory != block::Category::NormalBlock) {
return {requestedWork, 0UZ, OK};
} else {
static_assert(meta::always_false<traits::block::stream_input_port_types_tuple<Derived>>, "neither processBulk(...) nor processOne(...) implemented");
}
}

// sanitise input/output samples based on explicit user-defined processBulk(...) return status
Expand Down Expand Up @@ -1852,7 +1859,7 @@ class Block : public lifecycle::StateMachine<Derived> {

std::optional<Message> retMessage;
try {
retMessage = (self().*callback)(message.endpoint, message); // N.B. life-time: message is copied
retMessage = callback(self(), message.endpoint, message); // N.B. life-time: message is copied
} catch (const gr::exception& e) {
retMessage = Message{message};
retMessage->data = std::unexpected(Error(e));
Expand Down
58 changes: 30 additions & 28 deletions core/include/gnuradio-4.0/BlockModel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,10 @@ class BlockModel {
MsgPortInBuiltin* msgIn;
MsgPortOutBuiltin* msgOut;

static std::string_view portName(const DynamicPortOrCollection& portOrCollection) {
static std::string portName(const DynamicPortOrCollection& portOrCollection) {
return std::visit(meta::overloaded{ //
[](const gr::DynamicPort& port) { return port.name; }, //
[](const NamedPortCollection& namedCollection) { return namedCollection.name; }},
[](const NamedPortCollection& namedCollection) { return std::string(namedCollection.name); }},
portOrCollection);
}

Expand Down Expand Up @@ -375,11 +375,12 @@ constexpr bool contains_type = (std::is_same_v<T, Ts> || ...);
template<BlockLike T>
requires std::is_constructible_v<T, property_map>
class BlockWrapper : public BlockModel {
private:
protected:
static_assert(std::is_same_v<T, std::remove_reference_t<T>>);
T _block;
std::string _type_name = gr::meta::type_name<T>();

protected:
[[nodiscard]] constexpr const auto& blockRef() const noexcept {
if constexpr (requires { *_block; }) {
return *_block;
Expand All @@ -401,36 +402,37 @@ class BlockWrapper : public BlockModel {
msgOut = std::addressof(_block.msgOut);
}

template<typename TPort>
constexpr static auto& processPort(auto& where, TPort& port) noexcept {
where.push_back(gr::DynamicPort(port, DynamicPort::non_owned_reference_tag{}));
return where.back();
}

void dynamicPortLoader() {
void dynamicPortsLoader() {
if (_dynamicPortsLoaded) {
return;
}

auto registerPort = [this]<gr::detail::PortDescription CurrentPortType>(DynamicPorts& where, auto, CurrentPortType*) noexcept {
if constexpr (CurrentPortType::kIsDynamicCollection) {
auto& collection = CurrentPortType::getPortObject(blockRef());
NamedPortCollection result;
result.name = CurrentPortType::Name;
for (auto& port : collection) {
processPort(result.ports, port);
}
where.push_back(std::move(result));
} else {
auto& port = CurrentPortType::getPortObject(blockRef());
port.name = CurrentPortType::Name;
processPort(where, port);
}
auto processPort = []<typename TPort>(auto& where, TPort& port) -> auto& {
where.push_back(gr::DynamicPort(port, DynamicPort::non_owned_reference_tag{}));
return where.back();
};

using Node = std::remove_cvref_t<decltype(blockRef())>;
traits::block::all_input_ports<Node>::for_each(registerPort, _dynamicInputPorts);
traits::block::all_output_ports<Node>::for_each(registerPort, _dynamicOutputPorts);
using TBlock = std::remove_cvref_t<decltype(blockRef())>;
if constexpr (TBlock::blockCategory == block::Category::NormalBlock) {
auto registerPort = [this, processPort]<gr::detail::PortDescription CurrentPortType>(DynamicPorts& where, auto, CurrentPortType*) noexcept {
if constexpr (CurrentPortType::kIsDynamicCollection) {
auto& collection = CurrentPortType::getPortObject(blockRef());
NamedPortCollection result;
result.name = CurrentPortType::Name;
for (auto& port : collection) {
processPort(result.ports, port);
}
where.push_back(std::move(result));
} else {
auto& port = CurrentPortType::getPortObject(blockRef());
port.name = CurrentPortType::Name;
processPort(where, port);
}
};

traits::block::all_input_ports<TBlock>::for_each(registerPort, _dynamicInputPorts);
traits::block::all_output_ports<TBlock>::for_each(registerPort, _dynamicOutputPorts);
}

_dynamicPortsLoaded = true;
}
Expand All @@ -439,7 +441,7 @@ class BlockWrapper : public BlockModel {
BlockWrapper() : BlockWrapper(gr::property_map()) {}
explicit BlockWrapper(gr::property_map initParameter) : _block(std::move(initParameter)) {
initMessagePorts();
_dynamicPortsLoader = std::bind_front(&BlockWrapper::dynamicPortLoader, this);
_dynamicPortsLoader = [this] { this->dynamicPortsLoader(); };
}

BlockWrapper(const BlockWrapper& other) = delete;
Expand Down
1 change: 1 addition & 0 deletions core/include/gnuradio-4.0/Buffer.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef GNURADIO_BUFFER2_H
#define GNURADIO_BUFFER2_H

#include <array>
#include <bit>
#include <concepts>
#include <cstdint>
Expand Down
47 changes: 35 additions & 12 deletions core/include/gnuradio-4.0/Graph.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ inline static const char* kGraphInspect = "GraphInspect";
inline static const char* kGraphInspected = "GraphInspected";

inline static const char* kRegistryBlockTypes = "RegistryBlockTypes";

inline static const char* kSubgraphCreate = "SubgraphCreate";
inline static const char* kSubgraphExportPorts = "SubgraphExportPorts";
inline static const char* kSubgraphDisband = "SubgraphDisband";

inline static const char* kSubgraphCreated = "SubgraphCreated";
inline static const char* kSubgraphExportedPorts = "SubgraphExportedPorts";
inline static const char* kSubgraphDisbanded = "SubgraphDisbanded";
} // namespace graph::property

class Graph : public gr::Block<Graph> {
Expand Down Expand Up @@ -184,14 +192,16 @@ class Graph : public gr::Block<Graph> {

Graph(property_map settings = {}) : gr::Block<Graph>(std::move(settings)) {
_blocks.reserve(100); // TODO: remove
propertyCallbacks[graph::property::kEmplaceBlock] = &Graph::propertyCallbackEmplaceBlock;
propertyCallbacks[graph::property::kRemoveBlock] = &Graph::propertyCallbackRemoveBlock;
propertyCallbacks[graph::property::kInspectBlock] = &Graph::propertyCallbackInspectBlock;
propertyCallbacks[graph::property::kReplaceBlock] = &Graph::propertyCallbackReplaceBlock;
propertyCallbacks[graph::property::kEmplaceEdge] = &Graph::propertyCallbackEmplaceEdge;
propertyCallbacks[graph::property::kRemoveEdge] = &Graph::propertyCallbackRemoveEdge;
propertyCallbacks[graph::property::kGraphInspect] = &Graph::propertyCallbackGraphInspect;
propertyCallbacks[graph::property::kRegistryBlockTypes] = &Graph::propertyCallbackRegistryBlockTypes;
propertyCallbacks[graph::property::kEmplaceBlock] = std::mem_fn(&Graph::propertyCallbackEmplaceBlock);
propertyCallbacks[graph::property::kRemoveBlock] = std::mem_fn(&Graph::propertyCallbackRemoveBlock);
propertyCallbacks[graph::property::kInspectBlock] = std::mem_fn(&Graph::propertyCallbackInspectBlock);
propertyCallbacks[graph::property::kReplaceBlock] = std::mem_fn(&Graph::propertyCallbackReplaceBlock);
propertyCallbacks[graph::property::kEmplaceEdge] = std::mem_fn(&Graph::propertyCallbackEmplaceEdge);
propertyCallbacks[graph::property::kRemoveEdge] = std::mem_fn(&Graph::propertyCallbackRemoveEdge);
propertyCallbacks[graph::property::kGraphInspect] = std::mem_fn(&Graph::propertyCallbackGraphInspect);
propertyCallbacks[graph::property::kRegistryBlockTypes] = std::mem_fn(&Graph::propertyCallbackRegistryBlockTypes);
propertyCallbacks[graph::property::kSubgraphCreate] = std::mem_fn(&Graph::propertyCallbackSubgraphCreate);
propertyCallbacks[graph::property::kSubgraphDisband] = std::mem_fn(&Graph::propertyCallbackSubgraphDisband);
}
Graph(Graph&) = delete; // there can be only one owner of Graph
Graph& operator=(Graph&) = delete; // there can be only one owner of Graph
Expand Down Expand Up @@ -318,13 +328,13 @@ class Graph : public gr::Block<Graph> {

property_map inputPorts;
for (auto& portOrCollection : block->dynamicInputPorts()) {
inputPorts[std::string(BlockModel::portName(portOrCollection))] = serializePortOrCollection(portOrCollection);
inputPorts[BlockModel::portName(portOrCollection)] = serializePortOrCollection(portOrCollection);
}
result["inputPorts"] = std::move(inputPorts);

property_map outputPorts;
for (auto& portOrCollection : block->dynamicOutputPorts()) {
outputPorts[std::string(BlockModel::portName(portOrCollection))] = serializePortOrCollection(portOrCollection);
outputPorts[BlockModel::portName(portOrCollection)] = serializePortOrCollection(portOrCollection);
}
result["outputPorts"] = std::move(outputPorts);

Expand Down Expand Up @@ -555,6 +565,19 @@ class Graph : public gr::Block<Graph> {
return message;
}

std::optional<Message> propertyCallbackSubgraphCreate([[maybe_unused]] std::string_view propertyName, Message message) {
// creates a subgraph given a list of uniqueNames
// edges that are split end up being the exported ports
// managed/unmanaged?
return {};
}

std::optional<Message> propertyCallbackSubgraphDisband([[maybe_unused]] std::string_view propertyName, Message message) {
// move all the blocks from the subgraph into the top-level,
// keeping the connections
return {};
}

// connect using the port index
template<std::size_t sourcePortIndex, std::size_t sourcePortSubIndex, typename Source>
[[nodiscard]] auto connectInternal(Source& source) {
Expand Down Expand Up @@ -903,10 +926,10 @@ class MergedGraph<Left, Right, OutId, InId> : public Block<MergedGraph<Left, Rig
auto right_out = apply_right<InId, traits::block::stream_input_port_types<Right>::size() - InId - 1>(std::forward_as_tuple(std::forward<Ts>(inputs)...), std::move(std::get<OutId>(left_out)));

if constexpr (traits::block::stream_output_port_types<Left>::size == 2 && traits::block::stream_output_port_types<Right>::size == 1) {
return std::make_tuple(std::move(std::get<OutId ^ 1>(left_out)), std::move(right_out));
return std::make_tuple(std::move(std::get < OutId ^ 1 > (left_out)), std::move(right_out));

} else if constexpr (traits::block::stream_output_port_types<Left>::size == 2) {
return std::tuple_cat(std::make_tuple(std::move(std::get<OutId ^ 1>(left_out))), std::move(right_out));
return std::tuple_cat(std::make_tuple(std::move(std::get < OutId ^ 1 > (left_out))), std::move(right_out));

} else if constexpr (traits::block::stream_output_port_types<Right>::size == 1) {
return [&]<std::size_t... Is, std::size_t... Js>(std::index_sequence<Is...>, std::index_sequence<Js...>) { return std::make_tuple(std::move(std::get<Is>(left_out))..., std::move(std::get<OutId + 1 + Js>(left_out))..., std::move(right_out)); }(std::make_index_sequence<OutId>(), std::make_index_sequence<traits::block::stream_output_port_types<Left>::size - OutId - 1>());
Expand Down
1 change: 1 addition & 0 deletions core/include/gnuradio-4.0/Message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ void sendMessage(auto& port, std::string_view serviceName, std::string_view endp
}
WriterSpanLike auto msgSpan = port.streamWriter().template reserve<SpanReleasePolicy::ProcessAll>(1UZ);
msgSpan[0] = std::move(message);
msgSpan.publish(1UZ);
}
} // namespace detail

Expand Down
Loading

0 comments on commit 233e6fd

Please sign in to comment.