Skip to content

Commit

Permalink
Manual Tag Forwarding with InputSpan and OutputSpan API
Browse files Browse the repository at this point in the history
The changes include:
* Removed TagPropagationPolicy and introduced NoDefaultTagForwarding
* Resolved tag forwarding for scenarios where some output ports are asynchronous and lack samples
* Updated Selector block as a test case and example for manual tag forwarding
* Added synch_combined_ports parameter and possibility to have item-by-item interleaved output for multi mapped output ports
* Added tag propagation test to qa_Selector
* Added back pressure test to qa_Selector
* Added unit test in qa_Block to handle cases where input_chunk_size > 1 and input range has multiple tags
* Fix: Port::consumeTags to make untilLocalIndex exclusive
* Fix: Port::mergedTag to make untilLocalIndex exclusive
* Fix: TagMonitor to limit nSamples to the remaining samples for processing

Signed-off-by: drslebedev <[email protected]>
  • Loading branch information
drslebedev committed Nov 11, 2024
1 parent 3273d08 commit 2baf96e
Show file tree
Hide file tree
Showing 9 changed files with 319 additions and 168 deletions.
193 changes: 124 additions & 69 deletions blocks/basic/include/gnuradio-4.0/basic/Selector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace gr::basic {
using namespace gr;

template<typename T>
struct Selector : Block<Selector<T>> {
struct Selector : Block<Selector<T>, NoDefaultTagForwarding> {
using Description = Doc<R""(
@brief basic multiplexing class to route arbitrary inputs to outputs
Expand Down Expand Up @@ -53,6 +53,18 @@ The mapping is as follows:
-|-4 >-----/ 4-|-
+--------------+
When multiple input ports are mapped to a single output port, their samples are synchronized.
This means that all input ports are aligned to have the same number of samples.
It is assumed that these ports should have the same sample rate; otherwise, the buffer may fill up quickly.
Example: Assuming there are 3 input ports with sample counts of 1, 2, and 3, and these are mapped to one output, the output would look like:
`1, 2, 3, 1, 2, 3, 1, 2, 3...`
If you want to change this behavior, set `sync_combined_ports = false;`.
In this case, samples are taken in order from each input, resulting in a sequence that copies samples sequentially from the first input, then the second, and so on.
It also contains two additional ports -- `selectOut` and `monitorOut`. Port
`selectOut` which can be used to define which input port is passed on to the
`monitorOut` port.
Expand All @@ -72,16 +84,19 @@ you can set the `backPressure` property to false.
std::vector<PortOut<T, Async>> outputs{};

// settings
A<gr::Size_t, "n_inputs", Visible, Doc<"variable number of inputs">, Limits<1U, 32U>> n_inputs = 0U;
A<gr::Size_t, "n_outputs", Visible, Doc<"variable number of inputs">, Limits<1U, 32U>> n_outputs = 0U;
A<std::vector<gr::Size_t>, "map_in", Visible, Doc<"input port index to route from">> map_in{}; // N.B. need two vectors since pmt_t doesn't support pairs (yet!?!)
A<std::vector<gr::Size_t>, "map_out", Visible, Doc<"output port index to route to">> map_out{};
A<bool, "back_pressure", Visible, Doc<"true: do not consume samples from un-routed ports">> back_pressure = false;
A<gr::Size_t, "n_inputs", Visible, Doc<"variable number of inputs">, Limits<1U, 32U>> n_inputs = 0U;
A<gr::Size_t, "n_outputs", Visible, Doc<"variable number of inputs">, Limits<1U, 32U>> n_outputs = 0U;
A<std::vector<gr::Size_t>, "map_in", Visible, Doc<"input port index to route from">> map_in{}; // N.B. need two vectors since pmt_t doesn't support pairs (yet!?!)
A<std::vector<gr::Size_t>, "map_out", Visible, Doc<"output port index to route to">> map_out{};
A<bool, "back_pressure", Visible, Doc<"true: do not consume samples from un-routed ports">> back_pressure = false;
A<bool, "sync combined port", Doc<"true: input ports connected to the same output port are synchronised">> sync_combined_ports = true;

GR_MAKE_REFLECTABLE(Selector, select, inputs, monitor, outputs, n_inputs, n_outputs, map_in, map_out, back_pressure, sync_combined_ports);

GR_MAKE_REFLECTABLE(Selector, select, inputs, monitor, outputs, n_inputs, n_outputs, map_in, map_out, back_pressure);
std::map<std::size_t, std::vector<std::size_t>> _internalMappingInOut{};
std::map<std::size_t, std::vector<std::size_t>> _internalMappingOutIn{};

std::map<gr::Size_t, std::vector<gr::Size_t>> _internalMapping{};
gr::Size_t _selectedSrc = -1U;
std::size_t _selectedSrc = 0UZ;

void settingsChanged(const gr::property_map& oldSettings, const gr::property_map& newSettings) {
if (newSettings.contains("n_inputs") || newSettings.contains("n_outputs")) {
Expand All @@ -91,101 +106,141 @@ you can set the `backPressure` property to false.
}
if (newSettings.contains("map_in") || newSettings.contains("map_out")) {
assert(map_in.value.size() == map_out.value.size() && "map_in and map_out must have the same length");
_internalMapping.clear();
_internalMappingInOut.clear();
_internalMappingOutIn.clear();

if (map_in.value.size() != map_out.value.size()) {
throw std::invalid_argument("Input and output map need to have the same number of elements");
}

std::set<std::pair<gr::Size_t, gr::Size_t>> duplicateSet{};

for (std::size_t i = 0U; i < map_out.value.size(); ++i) {
_internalMapping[map_in.value[i]].push_back(map_out.value[i]);
_internalMappingInOut[static_cast<std::size_t>(map_in.value[i])].push_back(static_cast<std::size_t>(map_out.value[i]));
_internalMappingOutIn[static_cast<std::size_t>(map_out.value[i])].push_back(static_cast<std::size_t>(map_in.value[i]));

const auto isDuplicate = !duplicateSet.insert({map_in.value[i], map_out.value[i]}).second;
if (isDuplicate) {
throw std::invalid_argument(fmt::format("map_in[{}]:{} and map_out[{}]:{} are duplicated", i, map_in.value[i], i, map_out.value[i]));
}

if (map_in.value[i] >= n_inputs) {
throw std::invalid_argument(fmt::format("map_in[{}] contains port index ({}) > n_inputs ({})", i, map_in.value[i], n_inputs));
}

if (map_out.value[i] >= n_outputs) {
throw std::invalid_argument(fmt::format("map_out[{}] contains port index ({}) > n_outputs ({})", i, map_in.value[i], n_outputs));
}
}
}
}

template<gr::InputSpanLike TInSpan, gr::OutputSpanLike TOutSpan>
gr::work::Status processBulk(InputSpanLike auto& selectSpan, std::span<TInSpan>& ins, OutputSpanLike auto& monOut, std::span<TOutSpan>& outs) {
if (_internalMapping.empty()) {
if (back_pressure) {
std::for_each(ins.begin(), ins.end(), [](auto& input) { std::ignore = input.consume(0UZ); });
} else {
// make the implicit consume all available behaviour explicit
std::for_each(ins.begin(), ins.end(), [](auto& input) { std::ignore = input.consume(input.size()); });
}
if (_internalMappingInOut.empty()) {
std::ranges::for_each(ins, [this](auto& input) { std::ignore = input.consume(back_pressure ? 0UZ : input.size()); });
return work::Status::OK;
}

std::set<std::size_t> usedInputs;

if (const auto selectAvailable = selectSpan.size(); selectAvailable > 0) {
_selectedSrc = selectSpan.back();
std::ignore = selectSpan.consume(selectAvailable); // consume all samples on the 'select' streaming input port
} else {
_selectedSrc = std::numeric_limits<std::size_t>::max();
}

std::vector<int> outOffsets(outs.size(), 0U);

auto copyToOutput = [&outOffsets](auto inputAvailable, auto& inputSpan, auto& outputSpan, int outIndex) {
const auto offset = (outIndex < 0) ? 0 : outOffsets[static_cast<std::size_t>(outIndex)];
std::copy_n(inputSpan.begin(), inputAvailable, std::next(outputSpan.begin(), offset));
if (outIndex >= 0) {
outOffsets[static_cast<std::size_t>(outIndex)] += static_cast<int>(inputAvailable);
std::vector<std::size_t> outOffsets(outs.size(), 0UZ);
auto copyToOutput = [&outOffsets](std::size_t nSamplesToCopy, auto& inputSpan, auto& outputSpan, std::size_t outIndex) {
const std::size_t offset = outIndex == std::numeric_limits<std::size_t>::max() ? 0UZ : outOffsets[outIndex];
std::copy_n(inputSpan.begin(), nSamplesToCopy, std::next(outputSpan.begin(), offset));
if (outIndex != std::numeric_limits<std::size_t>::max()) {
outOffsets[outIndex] += nSamplesToCopy;
}
outputSpan.publish(inputAvailable);
};

bool monitorOutProcessed = false;
for (const auto& [inIndex, outIndices] : _internalMapping) {
InputSpanLike auto inputSpan = ins[inIndex];
auto available = inputSpan.size();

for (const auto outIndex : outIndices) {
const auto remainingSize = outs[static_cast<std::size_t>(outIndex)].size() - static_cast<std::size_t>(outOffsets[static_cast<std::size_t>(outIndex)]);
if (available > remainingSize) {
available = remainingSize;
const auto tags = inputSpan.tags();
for (const auto& tag : tags) {
if (tag.first < nSamplesToCopy) {
outputSpan.publishTag(tag.second, tag.first + offset);
}
}
outputSpan.publish(nSamplesToCopy);
};

if (_selectedSrc == inIndex) {
if (available > monOut.size()) {
available = monOut.size();
std::vector<std::size_t> nSamplesToConsume(ins.size(), std::numeric_limits<std::size_t>::max());
if (sync_combined_ports && std::ranges::any_of(_internalMappingOutIn, [](const auto& pair) { return pair.second.size() > 1; })) {
for (const auto& [outIndex, inIndices] : _internalMappingOutIn) {
std::size_t nInSamplesAvailable = std::ranges::min(inIndices | std::views::transform([&ins](std::size_t i) { return ins[i].size(); }));
nInSamplesAvailable = std::min(nInSamplesAvailable, outs[outIndex].size() / inIndices.size());
if (std::ranges::find(inIndices, _selectedSrc) != inIndices.end()) {
nInSamplesAvailable = std::min(nInSamplesAvailable, monOut.size());
}
for (const std::size_t inIndex : inIndices) {
nSamplesToConsume[inIndex] = std::min(nSamplesToConsume[inIndex], nInSamplesAvailable);
}
}

if (available == 0) {
continue;
// we need to iterate until no changes occur to include all dependencies -> Think about alternative approach
bool changed = true;
while (changed) {
changed = false;
for (const auto& [outIndex, inIndices] : _internalMappingOutIn) {
const std::size_t currentMin = std::ranges::min(inIndices | std::views::transform([&nSamplesToConsume](std::size_t i) { return nSamplesToConsume[i]; }));
for (const std::size_t inIndex : inIndices) {
if (nSamplesToConsume[inIndex] > currentMin) {
changed = true;
}
nSamplesToConsume[inIndex] = currentMin;
}
}
}

for (const auto outIndex : outIndices) {
copyToOutput(available, inputSpan, outs[outIndex], static_cast<int>(outIndex));
for (const auto& [outIndex, inIndices] : _internalMappingOutIn) {
if (inIndices.size() == 1) {
const std::size_t inIndex = inIndices[0];
copyToOutput(nSamplesToConsume[inIndex], ins[inIndex], outs[outIndex], std::numeric_limits<std::size_t>::max());
} else if (inIndices.size() > 1) {
auto& outSpan = outs[outIndex];
std::size_t nSamplesToPublish = 0UZ;
for (std::size_t iS = 0UZ; iS < nSamplesToConsume[0]; iS++) {
for (const std::size_t inIndex : inIndices) {
outSpan[nSamplesToPublish] = ins[inIndex][iS];
for (const auto& tag : ins[inIndex].rawTags) {
const auto relIndex = tag.index >= ins[inIndex].streamIndex ? static_cast<std::ptrdiff_t>(tag.index - ins[inIndex].streamIndex) : -static_cast<std::ptrdiff_t>(ins[inIndex].streamIndex - tag.index);
if (relIndex == iS) {
outSpan.publishTag(tag.map, nSamplesToPublish);
}
}
nSamplesToPublish++;
}
}
outSpan.publish(nSamplesToPublish);
}
}

if (_selectedSrc == inIndex) {
monitorOutProcessed = true;
copyToOutput(available, inputSpan, monOut, -1);
} else {
for (const auto& [inIndex, outIndices] : _internalMappingInOut) {
InputSpanLike auto inSpan = ins[inIndex];
std::size_t monOutSize = _selectedSrc == inIndex ? monOut.size() : std::numeric_limits<std::size_t>::max();
std::size_t nSamplesToCopy = std::min({inSpan.size(), monOutSize, //
std::ranges::min(outIndices | std::views::transform([&](std::size_t outIndex) { return outs[outIndex].size() - outOffsets[outIndex]; }))});

for (const std::size_t outIndex : outIndices) {
copyToOutput(nSamplesToCopy, inSpan, outs[outIndex], outIndex);
}
nSamplesToConsume[inIndex] = nSamplesToCopy;
}

std::ignore = inputSpan.consume(available);
usedInputs.insert(inIndex);
}

if (!monitorOutProcessed && _selectedSrc < ins.size()) {
InputSpanLike auto inputSpan = ins[_selectedSrc];
auto available = std::min(inputSpan.size(), monOut.size());
copyToOutput(available, inputSpan, monOut, -1);
std::ignore = inputSpan.consume(available);
if (_selectedSrc < ins.size()) {
InputSpanLike auto inSpan = ins[_selectedSrc];
std::size_t nSamplesToCopy = std::min({inSpan.size(), monOut.size(), nSamplesToConsume[_selectedSrc]});
copyToOutput(nSamplesToCopy, inSpan, monOut, std::numeric_limits<std::size_t>::max());
nSamplesToConsume[_selectedSrc] = nSamplesToCopy;
}

for (auto iPort = 0U; iPort < ins.size(); ++iPort) {
if (usedInputs.contains(iPort)) {
continue;
}

if (back_pressure) {
std::ignore = ins[iPort].consume(0UZ);
} else {
// make the implicit consume all available behaviour explicit
std::ignore = ins[iPort].consume(ins[iPort].size());
}
for (std::size_t inIndex = 0UZ; inIndex < ins.size(); inIndex++) {
const std::size_t nBackPressure = back_pressure ? 0UZ : ins[inIndex].size();
const std::size_t nFinal = nSamplesToConsume[inIndex] == std::numeric_limits<std::size_t>::max() ? nBackPressure : nSamplesToConsume[inIndex];
std::ignore = ins[inIndex].consume(nFinal);
ins[inIndex].consumeTags(nFinal);
}
return work::Status::OK;
}
Expand Down
Loading

0 comments on commit 2baf96e

Please sign in to comment.