From c723af132e7018a7919babde00171f19af41533c Mon Sep 17 00:00:00 2001 From: drslebedev Date: Fri, 23 Aug 2024 10:57:14 +0200 Subject: [PATCH] Use requestedWork as a soft limit in the work function * work() function should aim to honor the requestedWork value whenever feasible Signed-off-by: drslebedev --- core/include/gnuradio-4.0/Block.hpp | 57 +++++++++++++++++++---------- core/test/qa_Block.cpp | 55 ++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 20 deletions(-) diff --git a/core/include/gnuradio-4.0/Block.hpp b/core/include/gnuradio-4.0/Block.hpp index b1dd8f76..8a0fd931 100644 --- a/core/include/gnuradio-4.0/Block.hpp +++ b/core/include/gnuradio-4.0/Block.hpp @@ -1235,7 +1235,10 @@ class Block : public lifecycle::StateMachine, public std::tuple::max()) { + if (requestedWork == 0UZ) { + requestedWork = std::numeric_limits::max(); + } struct ResamplingResult { std::size_t resampledIn; std::size_t resampledOut; @@ -1243,24 +1246,28 @@ class Block : public lifecycle::StateMachine, public std::tuple, public std::tuple(input_chunk_size)); + requestedWork = std::clamp(requestedWork, minSync, maxSyncIn); + const std::size_t nResamplingChunks2 = std::min(requestedWork / input_chunk_size, maxSyncOut / output_chunk_size); + if (static_cast(nResamplingChunks2 * input_chunk_size) >= minSyncIn && static_cast(nResamplingChunks2 * output_chunk_size) >= minSyncOut) { + return ResamplingResult{.resampledIn = static_cast(nResamplingChunks2 * input_chunk_size), .resampledOut = static_cast(nResamplingChunks2 * output_chunk_size)}; + } + } return ResamplingResult{.resampledIn = static_cast(nResamplingChunks * input_chunk_size), .resampledOut = static_cast(nResamplingChunks * output_chunk_size)}; } } @@ -1464,7 +1479,7 @@ class Block : public lifecycle::StateMachine, public std::tuple; using TOutputTypes = traits::block::stream_output_port_types; @@ -1486,7 +1501,7 @@ class Block : public lifecycle::StateMachine, public std::tuplestate() == 
lifecycle::State::STOPPED) { disconnectFromUpStreamParents(); - return {requested_work, 0UZ, DONE}; + return {requestedWork, 0UZ, DONE}; } // evaluate number of available and processable samples @@ -1499,7 +1514,7 @@ class Block : public lifecycle::StateMachine, public std::tuple= input_chunk_size ? nextTagLimit : static_cast(input_chunk_size); // ensure to process at least one input_chunk_size (may shift tags) const auto availableToProcess = std::min({maxSyncIn, maxChunk, (maxSyncAvailableIn - inputSkipBefore), ensureMinimalDecimation, (nextEosTag - inputSkipBefore)}); const auto availableToPublish = std::min({maxSyncOut, maxSyncAvailableOut}); - const auto [resampledIn, resampledOut, resampledStatus] = computeResampling(std::min(minSyncIn, nextEosTag), availableToProcess, minSyncOut, availableToPublish); + auto [resampledIn, resampledOut, resampledStatus] = computeResampling(std::min(minSyncIn, nextEosTag), availableToProcess, minSyncOut, availableToPublish, requestedWork); const auto nextEosTagSkipBefore = nextEosTag - inputSkipBefore; const bool isEosTagPresent = nextEosTag <= 0 || nextEosTagSkipBefore < minSyncIn || nextEosTagSkipBefore < input_chunk_size || output_chunk_size * (nextEosTagSkipBefore / input_chunk_size) < minSyncOut; @@ -1513,10 +1528,11 @@ class Block : public lifecycle::StateMachine, public std::tuple REQUESTED_STOP", this->changeStateTo(lifecycle::State::REQUESTED_STOP)); publishEoS(); this->setAndNotifyState(lifecycle::State::STOPPED); - return {requested_work, 0UZ, DONE}; + return {requestedWork, 0UZ, DONE}; } + if (asyncEoS || (resampledIn == 0 && resampledOut == 0 && !hasAsyncIn && !hasAsyncOut)) { - return {requested_work, 0UZ, resampledStatus}; + return {requestedWork, 0UZ, resampledStatus}; } // for non-bulk processing, the processed span has to be limited to the first sample if it contains a tag s.t. 
the tag is not applied to every sample @@ -1526,11 +1542,12 @@ class Block : public lifecycle::StateMachine, public std::tuple(&self()), processedIn); - auto outputSpans = prepareStreams(outputPorts(&self()), processedOut); + + const auto inputSpans = prepareStreams(inputPorts(&self()), processedIn); + auto outputSpans = prepareStreams(outputPorts(&self()), processedOut); if (containsEmptyOutputSpans(outputSpans)) { - return {requested_work, 0UZ, INSUFFICIENT_OUTPUT_ITEMS}; + return {requestedWork, 0UZ, INSUFFICIENT_OUTPUT_ITEMS}; } updateInputAndOutputTags(); @@ -1648,7 +1665,7 @@ class Block : public lifecycle::StateMachine, public std::tuple::size == 0; constexpr bool kIsSinkBlock = traits::block::stream_output_port_types::size == 0; - if constexpr (kIsSourceBlock && kIsSinkBlock) { + if constexpr (!kIsSourceBlock && !kIsSinkBlock) { // normal block with input(s) and output(s) performedWork = processedIn; } else if constexpr (kIsSinkBlock) { performedWork = processedIn; @@ -1663,8 +1680,8 @@ class Block : public lifecycle::StateMachine, public std::tuplenotify_all(); } } - return {requested_work, performedWork, userReturnStatus}; - } // end: work::Result workInternal(std::size_t requested_work) { ... } + return {requestedWork, performedWork, userReturnStatus}; + } // end: work::Result workInternal(std::size_t requestedWork) { ... 
} public: work::Status invokeWork() diff --git a/core/test/qa_Block.cpp b/core/test/qa_Block.cpp index 03a4c82b..c2e0e180 100644 --- a/core/test/qa_Block.cpp +++ b/core/test/qa_Block.cpp @@ -835,4 +835,59 @@ const boost::ut::suite<"Port MetaInfo Tests"> _portMetaInfoTests = [] { }; }; +const boost::ut::suite<"Requested Work Tests"> _requestedWorkTests = [] { + using namespace boost::ut; + using namespace gr; + using namespace gr::testing; + + "work() test"_test = [] { + gr::Size_t nSamples = 1000000; + + gr::Graph graph; + auto& src = graph.emplaceBlock>({{"n_samples_max", nSamples}, {"disconnect_on_done", false}}); + auto& block = graph.emplaceBlock>({{"disconnect_on_done", false}}); + auto& sink = graph.emplaceBlock>({{"disconnect_on_done", false}}); + + expect(eq(ConnectionResult::SUCCESS, graph.connect<"out">(src).to<"in">(block))); + expect(eq(ConnectionResult::SUCCESS, graph.connect<"out">(block).template to<"in">(sink))); + + graph.reconnectAllEdges(); + auto blockInit = [](auto& block) { + if (block.state() == lifecycle::State::IDLE) { + std::ignore = block.changeStateTo(lifecycle::State::INITIALISED); + } + std::ignore = block.changeStateTo(lifecycle::State::RUNNING); + expect(block.state() == lifecycle::State::RUNNING); + }; + blockInit(src); + blockInit(block); + blockInit(sink); + + auto resultSrc = src.work(100); + expect(eq(resultSrc.performed_work, 100UZ)); + + auto resultBlock = block.work(10); // requestedWork is applied, process 10 samples + expect(eq(resultBlock.requested_work, 10UZ)); + expect(eq(resultBlock.performed_work, 10UZ)); + + expect(block.settings().set({{"output_chunk_size", gr::Size_t(7)}, {"input_chunk_size", gr::Size_t(7)}}).empty()); + expect(block.settings().activateContext() != std::nullopt); + resultBlock = block.work(8); // requestedWork is applied, process only one `input_chunk_size` which fits to requestedWork + expect(eq(resultBlock.requested_work, 8UZ)); + expect(eq(resultBlock.performed_work, 7UZ)); + resultBlock = 
block.work(28); // requestedWork is applied, process 4 `input_chunk_size` chunks, which fit into requestedWork + expect(eq(resultBlock.requested_work, 28UZ)); + expect(eq(resultBlock.performed_work, 28UZ)); + resultBlock = block.work(5); // requestedWork is clamped to `input_chunk_size` + expect(eq(resultBlock.requested_work, 5UZ)); + expect(eq(resultBlock.performed_work, 7UZ)); // 7 samples are processed + expect(block.settings().set({{"output_chunk_size", gr::Size_t(1)}, {"input_chunk_size", gr::Size_t(1)}}).empty()); + expect(block.settings().activateContext() != std::nullopt); + resultBlock = block.work(); // process last 48 samples + expect(eq(resultBlock.requested_work, std::numeric_limits<std::size_t>::max())); + expect(eq(resultBlock.performed_work, 48UZ)); + auto resultSink = sink.work(100); + expect(eq(resultSink.performed_work, 100UZ)); + }; +}; int main() { /* not needed for UT */ }