From 2baf96e4cbb95e9a305ef87c8fedcf81f8865572 Mon Sep 17 00:00:00 2001 From: drslebedev Date: Thu, 7 Nov 2024 12:16:39 +0100 Subject: [PATCH] Manual Tag Forwarding with InputSpan and OutputSpan API The changes include: * Removed TagPropagationPolicy and introduced NoDefaultTagForwarding * Resolved tag forwarding for scenarios where some output ports are asynchronous and lack samples * Updated Selector block as a test case and example for manual tag forwarding * Added synch_combined_ports parameter and possibility to have item-by-item interleaved output for multi mapped output ports * Added tag propagation test to qa_Selector * Added back pressure test to qa_Selector * Added unit test in qa_Block to handle cases where input_chunk_size > 1 and input range has multiple tags * Fix: Port::consumeTags to make untilLocalIndex exclusive * Fix: Port::mergedTag to make untilLocalIndex exclusive * Fix: TagMonitor to limit nSamples to the remaining samples for processing Signed-off-by: drslebedev --- .../include/gnuradio-4.0/basic/Selector.hpp | 193 +++++++++++------- blocks/basic/test/qa_Selector.cpp | 183 ++++++++++++----- .../gnuradio-4.0/testing/TagMonitors.hpp | 1 + core/include/gnuradio-4.0/Block.hpp | 24 +-- core/include/gnuradio-4.0/Port.hpp | 14 +- core/include/gnuradio-4.0/Tag.hpp | 22 -- core/include/gnuradio-4.0/annotated.hpp | 17 ++ core/test/qa_Block.cpp | 30 +++ core/test/qa_Tags.cpp | 3 +- 9 files changed, 319 insertions(+), 168 deletions(-) diff --git a/blocks/basic/include/gnuradio-4.0/basic/Selector.hpp b/blocks/basic/include/gnuradio-4.0/basic/Selector.hpp index e47d8389..bdddde4c 100644 --- a/blocks/basic/include/gnuradio-4.0/basic/Selector.hpp +++ b/blocks/basic/include/gnuradio-4.0/basic/Selector.hpp @@ -10,7 +10,7 @@ namespace gr::basic { using namespace gr; template -struct Selector : Block> { +struct Selector : Block, NoDefaultTagForwarding> { using Description = Doc-----/ 4-|- +--------------+ + +When multiple input ports are mapped to a single output port, their samples are synchronized. +This means that all input ports are aligned to have the same number of samples. +It is assumed that these ports should have the same sample rate; otherwise, the buffer may fill up quickly. + +Example: Assuming there are 3 input ports with sample counts of 1, 2, and 3, and these are mapped to one output, the output would look like: +`1, 2, 3, 1, 2, 3, 1, 2, 3...` + +If you want to change this behavior, set `sync_combined_ports = false;`. +In this case, samples are taken in order from each input, resulting in a sequence that copies samples sequentially from the first input, then the second, and so on. + + It also contains two additional ports -- `selectOut` and `monitorOut`. Port `selectOut` which can be used to define which input port is passed on to the `monitorOut` port. @@ -72,16 +84,19 @@ you can set the `backPressure` property to false. std::vector> outputs{}; // settings - A, Limits<1U, 32U>> n_inputs = 0U; - A, Limits<1U, 32U>> n_outputs = 0U; - A, "map_in", Visible, Doc<"input port index to route from">> map_in{}; // N.B. need two vectors since pmt_t doesn't support pairs (yet!?!) - A, "map_out", Visible, Doc<"output port index to route to">> map_out{}; - A> back_pressure = false; + A, Limits<1U, 32U>> n_inputs = 0U; + A, Limits<1U, 32U>> n_outputs = 0U; + A, "map_in", Visible, Doc<"input port index to route from">> map_in{}; // N.B. need two vectors since pmt_t doesn't support pairs (yet!?!) + A, "map_out", Visible, Doc<"output port index to route to">> map_out{}; + A> back_pressure = false; + A> sync_combined_ports = true; + + GR_MAKE_REFLECTABLE(Selector, select, inputs, monitor, outputs, n_inputs, n_outputs, map_in, map_out, back_pressure, sync_combined_ports); - GR_MAKE_REFLECTABLE(Selector, select, inputs, monitor, outputs, n_inputs, n_outputs, map_in, map_out, back_pressure); + std::map> _internalMappingInOut{}; + std::map> _internalMappingOutIn{}; - std::map> _internalMapping{}; - gr::Size_t _selectedSrc = -1U; + std::size_t _selectedSrc = 0UZ; void settingsChanged(const gr::property_map& oldSettings, const gr::property_map& newSettings) { if (newSettings.contains("n_inputs") || newSettings.contains("n_outputs")) { @@ -91,101 +106,141 @@ you can set the `backPressure` property to false. } if (newSettings.contains("map_in") || newSettings.contains("map_out")) { assert(map_in.value.size() == map_out.value.size() && "map_in and map_out must have the same length"); - _internalMapping.clear(); + _internalMappingInOut.clear(); + _internalMappingOutIn.clear(); if (map_in.value.size() != map_out.value.size()) { throw std::invalid_argument("Input and output map need to have the same number of elements"); } + std::set> duplicateSet{}; + for (std::size_t i = 0U; i < map_out.value.size(); ++i) { - _internalMapping[map_in.value[i]].push_back(map_out.value[i]); + _internalMappingInOut[static_cast(map_in.value[i])].push_back(static_cast(map_out.value[i])); + _internalMappingOutIn[static_cast(map_out.value[i])].push_back(static_cast(map_in.value[i])); + + const auto isDuplicate = !duplicateSet.insert({map_in.value[i], map_out.value[i]}).second; + if (isDuplicate) { + throw std::invalid_argument(fmt::format("map_in[{}]:{} and map_out[{}]:{} are duplicated", i, map_in.value[i], i, map_out.value[i])); + } + + if (map_in.value[i] >= n_inputs) { + throw std::invalid_argument(fmt::format("map_in[{}] contains port index ({}) > n_inputs ({})", i, map_in.value[i], n_inputs)); + } + + if (map_out.value[i] >= n_outputs) { + throw std::invalid_argument(fmt::format("map_out[{}] contains port index ({}) > n_outputs ({})", i, map_in.value[i], n_outputs)); + } } } } template gr::work::Status processBulk(InputSpanLike auto& selectSpan, std::span& ins, OutputSpanLike auto& monOut, std::span& outs) { - if (_internalMapping.empty()) { - if (back_pressure) { - std::for_each(ins.begin(), ins.end(), [](auto& input) { std::ignore = input.consume(0UZ); }); - } else { - // make the implicit consume all available behaviour explicit - std::for_each(ins.begin(), ins.end(), [](auto& input) { std::ignore = input.consume(input.size()); }); - } + if (_internalMappingInOut.empty()) { + std::ranges::for_each(ins, [this](auto& input) { std::ignore = input.consume(back_pressure ? 0UZ : input.size()); }); return work::Status::OK; } - std::set usedInputs; - if (const auto selectAvailable = selectSpan.size(); selectAvailable > 0) { _selectedSrc = selectSpan.back(); std::ignore = selectSpan.consume(selectAvailable); // consume all samples on the 'select' streaming input port + } else { + _selectedSrc = std::numeric_limits::max(); } - std::vector outOffsets(outs.size(), 0U); - - auto copyToOutput = [&outOffsets](auto inputAvailable, auto& inputSpan, auto& outputSpan, int outIndex) { - const auto offset = (outIndex < 0) ? 0 : outOffsets[static_cast(outIndex)]; - std::copy_n(inputSpan.begin(), inputAvailable, std::next(outputSpan.begin(), offset)); - if (outIndex >= 0) { - outOffsets[static_cast(outIndex)] += static_cast(inputAvailable); + std::vector outOffsets(outs.size(), 0UZ); + auto copyToOutput = [&outOffsets](std::size_t nSamplesToCopy, auto& inputSpan, auto& outputSpan, std::size_t outIndex) { + const std::size_t offset = outIndex == std::numeric_limits::max() ? 0UZ : outOffsets[outIndex]; + std::copy_n(inputSpan.begin(), nSamplesToCopy, std::next(outputSpan.begin(), offset)); + if (outIndex != std::numeric_limits::max()) { + outOffsets[outIndex] += nSamplesToCopy; } - outputSpan.publish(inputAvailable); - }; - - bool monitorOutProcessed = false; - for (const auto& [inIndex, outIndices] : _internalMapping) { - InputSpanLike auto inputSpan = ins[inIndex]; - auto available = inputSpan.size(); - - for (const auto outIndex : outIndices) { - const auto remainingSize = outs[static_cast(outIndex)].size() - static_cast(outOffsets[static_cast(outIndex)]); - if (available > remainingSize) { - available = remainingSize; + const auto tags = inputSpan.tags(); + for (const auto& tag : tags) { + if (tag.first < nSamplesToCopy) { + outputSpan.publishTag(tag.second, tag.first + offset); } } + outputSpan.publish(nSamplesToCopy); + }; - if (_selectedSrc == inIndex) { - if (available > monOut.size()) { - available = monOut.size(); + std::vector nSamplesToConsume(ins.size(), std::numeric_limits::max()); + if (sync_combined_ports && std::ranges::any_of(_internalMappingOutIn, [](const auto& pair) { return pair.second.size() > 1; })) { + for (const auto& [outIndex, inIndices] : _internalMappingOutIn) { + std::size_t nInSamplesAvailable = std::ranges::min(inIndices | std::views::transform([&ins](std::size_t i) { return ins[i].size(); })); + nInSamplesAvailable = std::min(nInSamplesAvailable, outs[outIndex].size() / inIndices.size()); + if (std::ranges::find(inIndices, _selectedSrc) != inIndices.end()) { + nInSamplesAvailable = std::min(nInSamplesAvailable, monOut.size()); + } + for (const std::size_t inIndex : inIndices) { + nSamplesToConsume[inIndex] = std::min(nSamplesToConsume[inIndex], nInSamplesAvailable); } } - if (available == 0) { - continue; + // we need to iterate until no changes occur to include all dependencies -> Think about alternative approach + bool changed = true; + while (changed) { + changed = false; + for (const auto& [outIndex, inIndices] : _internalMappingOutIn) { + const std::size_t currentMin = std::ranges::min(inIndices | std::views::transform([&nSamplesToConsume](std::size_t i) { return nSamplesToConsume[i]; })); + for (const std::size_t inIndex : inIndices) { + if (nSamplesToConsume[inIndex] > currentMin) { + changed = true; + } + nSamplesToConsume[inIndex] = currentMin; + } + } } - for (const auto outIndex : outIndices) { - copyToOutput(available, inputSpan, outs[outIndex], static_cast(outIndex)); + for (const auto& [outIndex, inIndices] : _internalMappingOutIn) { + if (inIndices.size() == 1) { + const std::size_t inIndex = inIndices[0]; + copyToOutput(nSamplesToConsume[inIndex], ins[inIndex], outs[outIndex], std::numeric_limits::max()); + } else if (inIndices.size() > 1) { + auto& outSpan = outs[outIndex]; + std::size_t nSamplesToPublish = 0UZ; + for (std::size_t iS = 0UZ; iS < nSamplesToConsume[0]; iS++) { + for (const std::size_t inIndex : inIndices) { + outSpan[nSamplesToPublish] = ins[inIndex][iS]; + for (const auto& tag : ins[inIndex].rawTags) { + const auto relIndex = tag.index >= ins[inIndex].streamIndex ? static_cast(tag.index - ins[inIndex].streamIndex) : -static_cast(ins[inIndex].streamIndex - tag.index); + if (relIndex == iS) { + outSpan.publishTag(tag.map, nSamplesToPublish); + } + } + nSamplesToPublish++; + } + } + outSpan.publish(nSamplesToPublish); + } } - - if (_selectedSrc == inIndex) { - monitorOutProcessed = true; - copyToOutput(available, inputSpan, monOut, -1); + } else { + for (const auto& [inIndex, outIndices] : _internalMappingInOut) { + InputSpanLike auto inSpan = ins[inIndex]; + std::size_t monOutSize = _selectedSrc == inIndex ? monOut.size() : std::numeric_limits::max(); + std::size_t nSamplesToCopy = std::min({inSpan.size(), monOutSize, // + std::ranges::min(outIndices | std::views::transform([&](std::size_t outIndex) { return outs[outIndex].size() - outOffsets[outIndex]; }))}); + + for (const std::size_t outIndex : outIndices) { + copyToOutput(nSamplesToCopy, inSpan, outs[outIndex], outIndex); + } + nSamplesToConsume[inIndex] = nSamplesToCopy; } - - std::ignore = inputSpan.consume(available); - usedInputs.insert(inIndex); } - if (!monitorOutProcessed && _selectedSrc < ins.size()) { - InputSpanLike auto inputSpan = ins[_selectedSrc]; - auto available = std::min(inputSpan.size(), monOut.size()); - copyToOutput(available, inputSpan, monOut, -1); - std::ignore = inputSpan.consume(available); + if (_selectedSrc < ins.size()) { + InputSpanLike auto inSpan = ins[_selectedSrc]; + std::size_t nSamplesToCopy = std::min({inSpan.size(), monOut.size(), nSamplesToConsume[_selectedSrc]}); + copyToOutput(nSamplesToCopy, inSpan, monOut, std::numeric_limits::max()); + nSamplesToConsume[_selectedSrc] = nSamplesToCopy; } - for (auto iPort = 0U; iPort < ins.size(); ++iPort) { - if (usedInputs.contains(iPort)) { - continue; - } - - if (back_pressure) { - std::ignore = ins[iPort].consume(0UZ); - } else { - // make the implicit consume all available behaviour explicit - std::ignore = ins[iPort].consume(ins[iPort].size()); - } + for (std::size_t inIndex = 0UZ; inIndex < ins.size(); inIndex++) { + const std::size_t nBackPressure = back_pressure ? 0UZ : ins[inIndex].size(); + const std::size_t nFinal = nSamplesToConsume[inIndex] == std::numeric_limits::max() ? nBackPressure : nSamplesToConsume[inIndex]; + std::ignore = ins[inIndex].consume(nFinal); + ins[inIndex].consumeTags(nFinal); } return work::Status::OK; } diff --git a/blocks/basic/test/qa_Selector.cpp b/blocks/basic/test/qa_Selector.cpp index ca6d2903..e89ff5f2 100644 --- a/blocks/basic/test/qa_Selector.cpp +++ b/blocks/basic/test/qa_Selector.cpp @@ -19,9 +19,13 @@ struct TestParams { std::vector> mapping; std::vector> inValues; std::vector> outValues; + std::vector> inTags; + std::vector> outTags; gr::Size_t monitorSource; std::vector monitorValues; bool backPressure; + std::vector nSamplesSelectorInput; // check back pressure + bool syncCombinedPorts{true}; bool ignoreOrder{false}; }; @@ -42,35 +46,30 @@ void execute_selector_test(TestParams params) { std::ranges::transform(params.mapping, mapIn.begin(), [](auto& p) { return p.first; }); std::ranges::transform(params.mapping, mapOut.begin(), [](auto& p) { return p.second; }); - selector = std::addressof(graph.emplaceBlock>({{"n_inputs", nSources}, // - {"n_outputs", nSinks}, // - {"map_in", mapIn}, // - {"map_out", mapOut}, // - {"back_pressure", params.backPressure}})); + selector = std::addressof(graph.emplaceBlock>({{"n_inputs", nSources}, {"n_outputs", nSinks}, {"map_in", mapIn}, {"map_out", mapOut}, {"back_pressure", params.backPressure}, {"sync_combined_ports", params.syncCombinedPorts}, {"disconnect_on_done", false}})); for (gr::Size_t i = 0; i < nSources; ++i) { - sources.push_back(std::addressof(graph.emplaceBlock>({{"n_samples_max", params.nSamples}, {"values", params.inValues[i]}}))); + sources.push_back(std::addressof(graph.emplaceBlock>({{"n_samples_max", params.nSamples}, {"values", params.inValues[i]}, {"disconnect_on_done", false}}))); expect(sources[i]->settings().applyStagedParameters().forwardParameters.empty()); + sources[i]->_tags = params.inTags[i]; expect(gr::ConnectionResult::SUCCESS == graph.connect(*sources[i], "out"s, *selector, "inputs#"s + std::to_string(i))); } for (gr::Size_t i = 0; i < nSinks; ++i) { - sinks.push_back(std::addressof(graph.emplaceBlock>())); + sinks.push_back(std::addressof(graph.emplaceBlock>({{"disconnect_on_done", false}}))); expect(sinks[i]->settings().applyStagedParameters().forwardParameters.empty()); expect(gr::ConnectionResult::SUCCESS == graph.connect(*selector, "outputs#"s + std::to_string(i), *sinks[i], "in"s)); } - TagSink* monitorSink = std::addressof(graph.emplaceBlock>()); + TagSink* monitorSink = std::addressof(graph.emplaceBlock>({{"disconnect_on_done", false}})); expect(monitorSink->settings().applyStagedParameters().forwardParameters.empty()); expect(gr::ConnectionResult::SUCCESS == graph.connect<"monitor">(*selector).to<"in">(*monitorSink)); gr::scheduler::Simple sched{std::move(graph)}; expect(sched.runAndWait().has_value()); - if (!params.backPressure) { - for (const auto& input : selector->inputs) { - expect(eq(input.streamReader().available(), 0U)); - } + for (std::size_t i = 0; i < selector->inputs.size(); i++) { + expect(eq(selector->inputs[i].streamReader().available(), params.nSamplesSelectorInput[i])); } for (std::size_t i = 0; i < sinks.size(); i++) { @@ -80,6 +79,10 @@ void execute_selector_test(TestParams params) { } expect(std::ranges::equal(sinks[i]->_samples, params.outValues[i])) << fmt::format("sinks[{}]->_samples does not match to expected values:\nSink:{}\nExpected:{}\n", i, sinks[i]->_samples, params.outValues[i]); } + + for (std::size_t i = 0; i < sinks.size(); i++) { + expect(equal_tag_lists(sinks[i]->_tags, params.outTags[i], {})); + } } const boost::ut::suite SelectorTest = [] { @@ -93,7 +96,7 @@ const boost::ut::suite SelectorTest = [] { expect(eq(block_nop.n_outputs, 0U)); expect(eq(block_nop.inputs.size(), 0U)); expect(eq(block_nop.outputs.size(), 0U)); - expect(eq(block_nop._internalMapping.size(), 0U)); + expect(eq(block_nop._internalMappingInOut.size(), 0U)); Selector block({{"name", "block"}, {"n_inputs", 4U}, {"n_outputs", 3U}}); block.init(block.progress, block.ioThreadPool); @@ -101,7 +104,7 @@ const boost::ut::suite SelectorTest = [] { expect(eq(block.n_outputs, 3U)); expect(eq(block.inputs.size(), 4U)); expect(eq(block.outputs.size(), 3U)); - expect(eq(block._internalMapping.size(), 0U)); + expect(eq(block._internalMappingInOut.size(), 0U)); }; "basic Selector"_test = [] { @@ -109,147 +112,215 @@ const boost::ut::suite SelectorTest = [] { const std::vector outputMap{1U, 0U}; Selector block({{"n_inputs", 3U}, {"n_outputs", 2U}, {"map_in", std::vector{0U, 1U}}, {"map_out", outputMap}}); // N.B. 3rd input is unconnected block.init(block.progress, block.ioThreadPool); - expect(eq(block._internalMapping.size(), 2U)); + expect(eq(block._internalMappingInOut.size(), 2U)); - using internal_mapping_t = decltype(block._internalMapping); - expect(block._internalMapping == internal_mapping_t{{0U, {outputMap[0]}}, {1U, {outputMap[1]}}}); + using internal_mapping_t = decltype(block._internalMappingInOut); + expect(block._internalMappingInOut == internal_mapping_t{{0U, {outputMap[0]}}, {1U, {outputMap[1]}}}); }; + gr::Tag tag1{1, {{"key1", "value1"}}}; + gr::Tag tag2{2, {{"key2", "value2"}}}; + gr::Tag tag3{3, {{"key3", "value3"}}}; + // Tests without the back pressure - "Selector 1 to 1 mapping"_test = [] { + "Selector 1 to 1 mapping"_test = [tag1, tag2, tag3] { execute_selector_test({.nSamples = 5, // .mapping = {{0, 0}, {1, 1}, {2, 2}}, // .inValues = {{1}, {2}, {3}}, // .outValues = {{1, 1, 1, 1, 1}, {2, 2, 2, 2, 2}, {3, 3, 3, 3, 3}}, // - .monitorSource = -1U, // - .monitorValues = {}, // + .inTags = {{tag1}, {tag2}, {tag3}}, // + .outTags = {{tag1}, {tag2}, {tag3}}, + .monitorSource = -1U, // + .monitorValues = {}, // .backPressure = false, - .ignoreOrder = false}); + .nSamplesSelectorInput = {0, 0, 0}, + .ignoreOrder = true}); }; - "Selector only one input used"_test = [] { + "Selector only one input used"_test = [tag1, tag2, tag3] { execute_selector_test({.nSamples = 5, // .mapping = {{1, 1}}, // .inValues = {{1}, {2}, {3}}, // .outValues = {{}, {2, 2, 2, 2, 2}, {}}, // - .monitorSource = -1U, // - .monitorValues = {}, // + .inTags = {{tag1}, {tag2}, {tag3}}, // + .outTags = {{}, {tag2}, {}}, + .monitorSource = -1U, // + .monitorValues = {}, // + .backPressure = false, + .nSamplesSelectorInput = {0, 0, 0}, + .ignoreOrder = true}); + }; + + "Selector all for one synch_combined_ports = false"_test = [tag1, tag2, tag3] { + const Tag newTag1{6, tag1.map}; + const Tag newTag2{10, tag2.map}; + const Tag newTag3{13, tag3.map}; + execute_selector_test({.nSamples = 5, // + .mapping = {{0, 1}, {1, 1}, {2, 1}}, // + .inValues = {{1}, {2}, {3}}, // + .outValues = {{}, {1, 2, 2, 3, 3, 3, 1, 1, 1, 1, 2, 2, 2, 3, 3}, {}}, // + .inTags = {{tag1}, {tag2}, {tag3}}, // + .outTags = {{}, {newTag1, newTag2, newTag3}, {}}, + .monitorSource = -1U, // + .monitorValues = {}, // .backPressure = false, + .nSamplesSelectorInput = {0, 0, 0}, + .syncCombinedPorts = false, .ignoreOrder = false}); }; - "Selector all for one"_test = [] { + "Selector all for one synch_combined_ports = true"_test = [tag1, tag2, tag3] { + const Tag newTag1{3, tag1.map}; + const Tag newTag2{7, tag2.map}; + const Tag newTag3{11, tag3.map}; execute_selector_test({.nSamples = 5, // .mapping = {{0, 1}, {1, 1}, {2, 1}}, // .inValues = {{1}, {2}, {3}}, // .outValues = {{}, {1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3}, {}}, // - .monitorSource = -1U, // - .monitorValues = {}, // + .inTags = {{tag1}, {tag2}, {tag3}}, // + .outTags = {{}, {newTag1, newTag2, newTag3}, {}}, + .monitorSource = -1U, // + .monitorValues = {}, // .backPressure = false, - .ignoreOrder = true}); + .nSamplesSelectorInput = {0, 0, 0}, + .syncCombinedPorts = true, + .ignoreOrder = false}); }; - "Selector one for all"_test = [] { + "Selector one for all"_test = [tag1, tag2, tag3] { execute_selector_test({.nSamples = 5, // .mapping = {{1, 0}, {1, 1}, {1, 2}}, // .inValues = {{1}, {2}, {3}}, // .outValues = {{2, 2, 2, 2, 2}, {2, 2, 2, 2, 2}, {2, 2, 2, 2, 2}}, // - .monitorSource = -1U, // - .monitorValues = {}, // + .inTags = {{tag1}, {tag2}, {tag3}}, // + .outTags = {{tag2}, {tag2}, {tag2}}, + .monitorSource = -1U, // + .monitorValues = {}, // .backPressure = false, + .nSamplesSelectorInput = {0, 0, 0}, .ignoreOrder = false}); }; // tests with the back pressure - "Selector 1 to 1 mapping, with back pressure"_test = [] { + "Selector 1 to 1 mapping, with back pressure"_test = [tag1, tag2, tag3] { execute_selector_test({.nSamples = 5, // .mapping = {{0, 0}, {1, 1}, {2, 2}}, // .inValues = {{1}, {2}, {3}}, // .outValues = {{1, 1, 1, 1, 1}, {2, 2, 2, 2, 2}, {3, 3, 3, 3, 3}}, // - .monitorSource = -1U, // - .monitorValues = {}, // + .inTags = {{tag1}, {tag2}, {tag3}}, // + .outTags = {{tag1}, {tag2}, {tag3}}, + .monitorSource = -1U, // + .monitorValues = {}, // .backPressure = true, + .nSamplesSelectorInput = {0, 0, 0}, .ignoreOrder = false}); }; - "Selector only one input used, with back pressure"_test = [] { + "Selector only one input used, with back pressure"_test = [tag1, tag2, tag3] { execute_selector_test({.nSamples = 5, // .mapping = {{1, 1}}, // .inValues = {{1}, {2}, {3}}, // .outValues = {{}, {2, 2, 2, 2, 2}, {}}, // - .monitorSource = -1U, // - .monitorValues = {}, // + .inTags = {{tag1}, {tag2}, {tag3}}, // + .outTags = {{}, {tag2}, {}}, + .monitorSource = -1U, // + .monitorValues = {}, // .backPressure = true, + .nSamplesSelectorInput = {5, 0, 5}, .ignoreOrder = false}); }; - "Selector all for one, with back pressure"_test = [] { + "Selector all for one, with back pressure"_test = [tag1, tag2, tag3] { + const Tag newTag1{3, tag1.map}; + const Tag newTag2{7, tag2.map}; + const Tag newTag3{11, tag3.map}; execute_selector_test({.nSamples = 5, // .mapping = {{0, 1}, {1, 1}, {2, 1}}, // .inValues = {{1}, {2}, {3}}, // .outValues = {{}, {1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3}, {}}, // - .monitorSource = -1U, // - .monitorValues = {}, // + .inTags = {{tag1}, {tag2}, {tag3}}, // + .outTags = {{}, {newTag1, newTag2, newTag3}, {}}, + .monitorSource = -1U, // + .monitorValues = {}, // .backPressure = true, - .ignoreOrder = true}); + .nSamplesSelectorInput = {0, 0, 0}, + .ignoreOrder = false}); }; - "Selector one for all, with back pressure"_test = [] { + "Selector one for all, with back pressure"_test = [tag1, tag2, tag3] { execute_selector_test({.nSamples = 5, // .mapping = {{1, 0}, {1, 1}, {1, 2}}, // .inValues = {{1}, {2}, {3}}, // .outValues = {{2, 2, 2, 2, 2}, {2, 2, 2, 2, 2}, {2, 2, 2, 2, 2}}, // - .monitorSource = -1U, // - .monitorValues = {}, // + .inTags = {{tag1}, {tag2}, {tag3}}, // + .outTags = {{tag2}, {tag2}, {tag2}}, + .monitorSource = -1U, // + .monitorValues = {}, // .backPressure = true, + .nSamplesSelectorInput = {5, 0, 5}, .ignoreOrder = false}); }; // Tests with a monitor - "Selector 1 to 1 mapping, with monitor, monitor source already mapped"_test = [] { + "Selector 1 to 1 mapping, with monitor, monitor source already mapped"_test = [tag1, tag2, tag3] { execute_selector_test({.nSamples = 5, // .mapping = {{0, 0}, {1, 1}, {2, 2}}, // .inValues = {{1}, {2}, {3}}, // .outValues = {{1, 1, 1, 1, 1}, {2, 2, 2, 2, 2}, {3, 3, 3, 3, 3}}, // - .monitorSource = 0U, // - .monitorValues = {1, 1, 1, 1, 1}, // + .inTags = {{tag1}, {tag2}, {tag3}}, // + .outTags = {{tag1}, {tag2}, {tag3}}, + .monitorSource = 0U, // set monitor index + .monitorValues = {1, 1, 1, 1, 1}, // .backPressure = false, + .nSamplesSelectorInput = {0, 0, 0}, .ignoreOrder = false}); }; - "Selector only one input used, with monitor, monitor source not mapped"_test = [] { + "Selector only one input used, with monitor, monitor source not mapped"_test = [tag1, tag2, tag3] { execute_selector_test({.nSamples = 5, // .mapping = {{1, 1}}, // .inValues = {{1}, {2}, {3}}, // .outValues = {{}, {2, 2, 2, 2, 2}, {}}, // - .monitorSource = 0U, // - .monitorValues = {1, 1, 1, 1, 1}, // + .inTags = {{tag1}, {tag2}, {tag3}}, // + .outTags = {{}, {tag2}, {}}, + .monitorSource = 0U, // set monitor index + .monitorValues = {1, 1, 1, 1, 1}, // monitor has values even if port is not mapped .backPressure = false, + .nSamplesSelectorInput = {0, 0, 0}, .ignoreOrder = false}); }; - "Selector all for one, with monitor, monitor source already mapped"_test = [] { + "Selector all for one, with monitor, monitor source already mapped"_test = [tag1, tag2, tag3] { + const Tag newTag1{3, tag1.map}; + const Tag newTag2{7, tag2.map}; + const Tag newTag3{11, tag3.map}; execute_selector_test({.nSamples = 5, // .mapping = {{0, 1}, {1, 1}, {2, 1}}, // .inValues = {{1}, {2}, {3}}, // .outValues = {{}, {1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3}, {}}, // - .monitorSource = 1U, // - .monitorValues = {2, 2, 2, 2, 2}, // + .inTags = {{tag1}, {tag2}, {tag3}}, // + .outTags = {{}, {newTag1, newTag2, newTag3}, {}}, + .monitorSource = 1U, // set monitor index + .monitorValues = {2, 2, 2, 2, 2}, // .backPressure = false, + .nSamplesSelectorInput = {0, 0, 0}, .ignoreOrder = true}); }; - "Selector one for all, with monitor, monitor source already mapped"_test = [] { + "Selector one for all, with monitor, monitor source already mapped"_test = [tag1, tag2, tag3] { execute_selector_test({.nSamples = 5, // .mapping = {{1, 0}, {1, 1}, {1, 2}}, // .inValues = {{1}, {2}, {3}}, // .outValues = {{2, 2, 2, 2, 2}, {2, 2, 2, 2, 2}, {2, 2, 2, 2, 2}}, // - .monitorSource = 1U, // - .monitorValues = {2, 2, 2, 2, 2}, // + .inTags = {{tag1}, {tag2}, {tag3}}, // + .outTags = {{tag2}, {tag2}, {tag2}}, + .monitorSource = 1U, // + .monitorValues = {2, 2, 2, 2, 2}, // .backPressure = false, + .nSamplesSelectorInput = {0, 0, 0}, .ignoreOrder = false}); }; }; diff --git a/blocks/testing/include/gnuradio-4.0/testing/TagMonitors.hpp b/blocks/testing/include/gnuradio-4.0/testing/TagMonitors.hpp index d198179a..a362c126 100644 --- a/blocks/testing/include/gnuradio-4.0/testing/TagMonitors.hpp +++ b/blocks/testing/include/gnuradio-4.0/testing/TagMonitors.hpp @@ -163,6 +163,7 @@ struct TagSource : public Block> { if (_tagIndex < _tags.size()) { if (static_cast(_tags[_tagIndex].index) > nSamplesRemainder) { nextTagIn = static_cast(_tags[_tagIndex].index) - nSamplesRemainder; + nextTagIn = std::min(nextTagIn, n_samples_max - _nSamplesProduced); } } else { nextTagIn = isInfinite() ? static_cast(outSpan.size()) : n_samples_max - _nSamplesProduced; diff --git a/core/include/gnuradio-4.0/Block.hpp b/core/include/gnuradio-4.0/Block.hpp index c0695755..60404918 100644 --- a/core/include/gnuradio-4.0/Block.hpp +++ b/core/include/gnuradio-4.0/Block.hpp @@ -344,7 +344,9 @@ class Block : public lifecycle::StateMachine { using AllowIncompleteFinalUpdate = ArgumentsTypeList::template find_or_default>; using DrawableControl = ArgumentsTypeList::template find_or_default>; - constexpr static bool blockingIO = std::disjunction_v, Arguments>..., std::is_same, Arguments>...>; + constexpr static bool blockingIO = std::disjunction_v, Arguments>..., std::is_same, Arguments>...>; + constexpr static bool noDefaultTagForwarding = std::disjunction_v...>; + constexpr static block::Category blockCategory = block::Category::NormalBlock; template @@ -365,8 +367,6 @@ class Block : public lifecycle::StateMachine { alignas(hardware_destructive_interference_size) std::shared_ptr ioThreadPool; alignas(hardware_destructive_interference_size) std::atomic ioThreadRunning{false}; - constexpr static TagPropagationPolicy tag_policy = TagPropagationPolicy::TPP_ALL_TO_ALL; - using ResamplingValue = std::conditional_t; using ResamplingLimit = Limits<1UL, std::numeric_limits::max()>; using ResamplingDoc = Doc<"For each `input_chunk_size` input samples, `output_chunk_size` output samples are published (in>out: Decimate, in; @@ -526,7 +526,7 @@ class Block : public lifecycle::StateMachine { // important: these tags need to be queued because at this stage the block is not yet connected to other downstream blocks invokeUserProvidedFunction("init() - applyStagedParameters", [this] noexcept(false) { if (const auto applyResult = settings().applyStagedParameters(); !applyResult.forwardParameters.empty()) { - if constexpr (Derived::tag_policy == TagPropagationPolicy::TPP_ALL_TO_ALL) { + if constexpr (!noDefaultTagForwarding) { publishTag(applyResult.forwardParameters, 0); } notifyListeners(block::property::kSetting, settings().get()); @@ -684,9 +684,8 @@ class Block : public lifecycle::StateMachine { } constexpr void forwardTags(auto& outputSpanTuple) noexcept { - if (inputTagsPresent()) { - if constexpr (Derived::tag_policy == TagPropagationPolicy::TPP_ALL_TO_ALL) { - // publishTag(mergedInputTag().map, 0); + if constexpr (!noDefaultTagForwarding) { + if (inputTagsPresent()) { for_each_writer_span([this](auto& outSpan) { outSpan.publishTag(mergedInputTag().map, 0); }, outputSpanTuple); } } @@ -718,9 +717,9 @@ class Block : public lifecycle::StateMachine { } }; if constexpr (InputSpanLike>) { - mergeSrcMapInto(inputSpanOrVector.getMergedTag(0).map, _mergedInputTag.map); + mergeSrcMapInto(inputSpanOrVector.getMergedTag(1).map, _mergedInputTag.map); } else { - std::ranges::for_each(inputSpanOrVector, [this, &mergeSrcMapInto](auto& inputSpan) { mergeSrcMapInto(inputSpan.getMergedTag(0).map, _mergedInputTag.map); }); + std::ranges::for_each(inputSpanOrVector, [this, &mergeSrcMapInto](auto& inputSpan) { mergeSrcMapInto(inputSpan.getMergedTag(1).map, _mergedInputTag.map); }); } }, inputSpans); @@ -1480,10 +1479,9 @@ class Block : public lifecycle::StateMachine { if constexpr (HasProcessBulkFunction) { invokeUserProvidedFunction("invokeProcessBulk", [&userReturnStatus, &inputSpans, &outputSpans, this] noexcept(HasNoexceptProcessBulkFunction) { userReturnStatus = invokeProcessBulk(inputSpans, outputSpans); }); - // See https://github.com/fair-acc/gnuradio4/issues/444 for_each_reader_span( [&processedIn](auto& in) { - if (in.isConsumeRequested() && in.isConnected) { + if (in.isConsumeRequested() && in.isConnected && in.isSync) { processedIn = std::min(processedIn, in.nRequestedSamplesToConsume()); } }, @@ -1491,7 +1489,7 @@ class Block : public lifecycle::StateMachine { for_each_writer_span( [&processedOut](auto& out) { - if (out.isPublishRequested() && out.isConnected) { + if (out.isPublishRequested() && out.isConnected && out.isSync) { processedOut = std::min(processedOut, out.nRequestedSamplesToPublish()); } }, @@ -1544,7 +1542,7 @@ class Block : public lifecycle::StateMachine { processedOut = 0UZ; } - if (processedIn > 0 && processedOut > 0) { + if (processedOut > 0) { copyCachedOutputTags(outputSpans); if (mergedInputTag().map.contains(gr::tag::END_OF_STREAM)) { diff --git a/core/include/gnuradio-4.0/Port.hpp b/core/include/gnuradio-4.0/Port.hpp index dbf9adf6..0d2a92c7 100644 --- a/core/include/gnuradio-4.0/Port.hpp +++ b/core/include/gnuradio-4.0/Port.hpp @@ -280,7 +280,7 @@ struct Async {}; * - Using `tags()`: Returns a `range::view` of input tags. Indices are relative to the first sample in the span and can be negative for unconsumed tags. * - Using `rawTags`: Provides direct access to the underlying `ReaderSpan` for advanced manipulation. * - Consuming Tags: By default, tags associated with samples up to and including the first sample are consumed. One can manually consume tags up to a specific sample index using `consumeTags(streamSampleIndex)`. - * - Merging Tags: Use `getMergedTag(untilLocalIndex)` to obtain a single tag that merges all tags up to `untilLocalIndex` and including first sample. + * - Merging Tags: Use `getMergedTag(untilLocalIndex)` to obtain a single tag that merges all tags up to `untilLocalIndex` (exclusively). */ template concept InputSpanLike = std::ranges::contiguous_range && ConstSpanLike && requires(T& span, std::size_t n) { @@ -468,10 +468,12 @@ struct Port { if (rawTags.isConsumeRequested()) { // the user has already manually consumed tags return; } - if ((ReaderSpanType::isConsumeRequested() && ReaderSpanType::nRequestedSamplesToConsume() == 0) || this->empty()) { + if ((ReaderSpanType::isConsumeRequested() && ReaderSpanType::nRequestedSamplesToConsume() == 0) // + || ReaderSpanType::spanReleasePolicy() == SpanReleasePolicy::ProcessNone // + || this->empty()) { return; // no samples to be consumed -> do not consume any tags } - consumeTags(0); // consume all tags including the one on the first sample + consumeTags(1); // consume all tags including the one on the first sample } } @@ -483,11 +485,11 @@ struct Port { } void consumeTags(std::size_t untilLocalIndex) { - std::size_t tagsToConsume = static_cast(std::ranges::count_if(rawTags | std::views::take_while([untilLocalIndex, this](auto& t) { return t.index <= streamIndex + untilLocalIndex; }), [](auto /*v*/) { return true; })); + std::size_t tagsToConsume = static_cast(std::ranges::count_if(rawTags | std::views::take_while([untilLocalIndex, this](auto& t) { return t.index < streamIndex + untilLocalIndex; }), [](auto /*v*/) { return true; })); std::ignore = rawTags.tryConsume(tagsToConsume); } - [[nodiscard]] inline Tag getMergedTag(std::size_t untilLocalIndex = 0) const { + [[nodiscard]] inline Tag getMergedTag(std::size_t untilLocalIndex = 1) const { auto mergeSrcMapInto = [](const property_map& sourceMap, property_map& destinationMap) { assert(&sourceMap != &destinationMap); for (const auto& [key, value] : sourceMap) { @@ -495,7 +497,7 @@ struct Port { } }; Tag result{0UZ, {}}; - std::ranges::for_each(rawTags | std::views::take_while([untilLocalIndex, this](auto& t) { return t.index <= streamIndex + untilLocalIndex; }), [&mergeSrcMapInto, &result](const Tag& tag) { mergeSrcMapInto(tag.map, result.map); }); + std::ranges::for_each(rawTags | std::views::take_while([untilLocalIndex, this](auto& t) { return t.index < streamIndex + untilLocalIndex; }), [&mergeSrcMapInto, &result](const Tag& tag) { mergeSrcMapInto(tag.map, result.map); }); return result; } diff --git a/core/include/gnuradio-4.0/Tag.hpp b/core/include/gnuradio-4.0/Tag.hpp index adaa88b9..f892cc65 100644 --- a/core/include/gnuradio-4.0/Tag.hpp +++ b/core/include/gnuradio-4.0/Tag.hpp @@ -28,28 +28,6 @@ inline constexpr std::size_t hardware_constructive_interference_size = 64; namespace gr { -/*** - * Controls automatic propagation of stream tags on sync ports. - * ``` - * ┌───────┐ ┌───────┐ ┌───────┐ ┌───────┐ - * ┌┤ ├┐ ┌┤ ├┐ ┌┤ ├┐ ┌┤ ├┐ - * ││ ││ ││ ────► ││ ││ ────► ││ ││ ││ - * └┤ ├┘ └┤ \ / ├┘ └┤ ├┘ └┤work(){├┘ - * │ │ │ X │ │ │ │ get();│ - * ┌┤ ├┐ ┌┤ / \ ├┐ ┌┤ ├┐ ┌┤ pub();├┐ - * ││ ││ ││ ────► ││ ││ ────► ││ ││} ││ - * └┤ ├┘ └┤ ├┘ └┤ ├┘ └┤ ├┘ - * └───────┘ └───────┘ └───────┘ └───────┘ - * `DONT` `ALL_TO_ALL `ONE_TO_ONE` `TPP_CUSTOM` - * ``` - */ -enum class TagPropagationPolicy { - TPP_DONT = 0, /*!< Scheduler doesn't propagate tags from in- to output. The block itself is free to insert tags. */ - TPP_ALL_TO_ALL = 1, /*!< Propagate tags from all in- to all outputs. The scheduler takes care of that. */ - TPP_ONE_TO_ONE = 2, /*!< Propagate tags from n. input to n. output. Requires same number of in- and outputs */ - TPP_CUSTOM = 3 /*!< Like TPP_DONT, but signals the block it should implement application-specific forwarding behaviour. */ -}; - using property_map = pmtv::map_t; template diff --git a/core/include/gnuradio-4.0/annotated.hpp b/core/include/gnuradio-4.0/annotated.hpp index 56909a00..67c3e75f 100644 --- a/core/include/gnuradio-4.0/annotated.hpp +++ b/core/include/gnuradio-4.0/annotated.hpp @@ -69,6 +69,23 @@ struct BlockingIO { [[maybe_unused]] constexpr static bool useIoThread = UseIoThread; }; +/** + * @brief Disable default tag forwarding. + * + * There are two types of tag forwarding: (1) default All-To-All, and (2) user-implemented. + * + * By default, tag forwarding operates as All-To-All. Before tags on input ports are forwarded, they are merged. + * If a block has multiple ports and tags on these ports contain maps with identical keys, only one value for each key + * will be retained in the merged tag. This may lead to potential information loss, as it’s not guaranteed which + * value will be kept. + * + * This default behavior is generally sufficient. However, if it’s not suitable for your use case, you can disable it + * by adding the `NoDefaultTagForwarding` attribute to the template parameters. In such cases, the block should implement + * custom tag forwarding in the `processBulk` function. The `InputSpanLike` and `OutputSpanLike` APIs are available to simplify + * with custom tag forwarding. + */ +struct NoDefaultTagForwarding {}; + /** * @brief Annotates block, indicating to perform resampling based on the provided `inputChunkSize` and `outputChunkSize`. * For each `inputChunkSize` input samples, `outputChunkSize` output samples are published. diff --git a/core/test/qa_Block.cpp b/core/test/qa_Block.cpp index 192ccca6..aea8499c 100644 --- a/core/test/qa_Block.cpp +++ b/core/test/qa_Block.cpp @@ -720,6 +720,36 @@ const boost::ut::suite<"Stride Tests"> _stride_tests = [] { stride_test({.n_samples = 1000000, .output_chunk_size = 100, .input_chunk_size = 100, .stride = 249900, .exp_in = 100, .exp_out = 100, .exp_counter = 5, .exp_total_in = 500, .exp_total_out = 500}, thread_pool); }; + "Interpolation/Decimation with many tags, tags forward policy"_test = [] { + using namespace boost::ut; + using namespace gr::testing; + + gr::Graph testGraph; + auto& source = testGraph.emplaceBlock>({{"n_samples_max", gr::Size_t(20)}}); + source._tags = { + {0, {{"key0", "value@0"}}}, // + {2, {{"key2", "value@2"}}}, // + {4, {{"key4", "value@4"}}}, // + {6, {{"key6", "value@6"}}}, // + {8, {{"ke8", "value@8"}}}, // + {10, {{"key10", "value@10"}}}, // + {12, {{"key12", "value@12"}}}, // + {14, {{"key14", "value@14"}}} // + }; + + auto& intDecBlock = testGraph.emplaceBlock>({{"output_chunk_size", gr::Size_t(10)}, {"input_chunk_size", gr::Size_t(10)}}); + auto& sink = testGraph.emplaceBlock>(); + expect(eq(gr::ConnectionResult::SUCCESS, testGraph.connect<"out">(source).to<"in">(intDecBlock))); + expect(eq(gr::ConnectionResult::SUCCESS, testGraph.connect<"out">(intDecBlock).to<"in">(sink))); + + gr::scheduler::Simple sched{std::move(testGraph)}; + expect(sched.runAndWait().has_value()); + + expect(eq(intDecBlock.status.process_counter, 2UZ)); + expect(eq(intDecBlock.status.total_in, 20UZ)); + expect(eq(intDecBlock.status.total_out, 20UZ)); + }; + "SyncOrAsync ports tests"_test = [] { syncOrAsyncTest(); syncOrAsyncTest(); diff --git a/core/test/qa_Tags.cpp b/core/test/qa_Tags.cpp index 71482cc5..1ec7d250 100644 --- a/core/test/qa_Tags.cpp +++ b/core/test/qa_Tags.cpp @@ -35,8 +35,7 @@ or next chunk, whichever is closer. Also adds an "offset" key to the tag map sig GR_MAKE_REFLECTABLE(RealignTagsToChunks, inPort, outPort); - double sampling_rate = 1.0; - constexpr static gr::TagPropagationPolicy tag_policy = gr::TagPropagationPolicy::TPP_DONT; + double sampling_rate = 1.0; // TODO: References are required here because InputSpan and OutputSpan have internal states // (e.g., tagsPublished) that are unique to each instance. Copying these objects without proper