Skip to content

Commit

Permalink
Use requestedWork as a soft limit in the work function
Browse files Browse the repository at this point in the history
* work() function should aim to honor the requestedWork value whenever feasible

Signed-off-by: drslebedev <[email protected]>
  • Loading branch information
drslebedev authored and RalphSteinhagen committed Sep 6, 2024
1 parent 02b5587 commit c723af1
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 20 deletions.
57 changes: 37 additions & 20 deletions core/include/gnuradio-4.0/Block.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1235,32 +1235,39 @@ class Block : public lifecycle::StateMachine<Derived>, public std::tuple<Argumen
return 0UZ;
}

auto computeResampling(std::size_t minSyncIn, std::size_t maxSyncIn, std::size_t minSyncOut, std::size_t maxSyncOut) {
auto computeResampling(std::size_t minSyncIn, std::size_t maxSyncIn, std::size_t minSyncOut, std::size_t maxSyncOut, std::size_t requestedWork = std::numeric_limits<std::size_t>::max()) {
if (requestedWork == 0UZ) {
requestedWork = std::numeric_limits<std::size_t>::max();
}
struct ResamplingResult {
std::size_t resampledIn;
std::size_t resampledOut;
work::Status status = work::Status::OK;
};

if constexpr (!ResamplingControl::kEnabled) { // no resampling
const std::size_t n = std::min(maxSyncIn, maxSyncOut);
if (n < minSyncIn) {
const std::size_t maxSync = std::min(maxSyncIn, maxSyncOut);
if (maxSync < minSyncIn) {
return ResamplingResult{.resampledIn = 0UZ, .resampledOut = 0UZ, .status = work::Status::INSUFFICIENT_INPUT_ITEMS};
}
if (n < minSyncOut) {
if (maxSync < minSyncOut) {
return ResamplingResult{.resampledIn = 0UZ, .resampledOut = 0UZ, .status = work::Status::INSUFFICIENT_OUTPUT_ITEMS};
}
return ResamplingResult{.resampledIn = n, .resampledOut = n};
const auto minSync = std::max(minSyncIn, minSyncOut);
const auto resampled = std::clamp(requestedWork, minSync, maxSync);
return ResamplingResult{.resampledIn = resampled, .resampledOut = resampled};
}
if (input_chunk_size == 1UL && output_chunk_size == 1UL) { // no resampling
const std::size_t n = std::min(maxSyncIn, maxSyncOut);
if (n < minSyncIn) {
const std::size_t maxSync = std::min(maxSyncIn, maxSyncOut);
if (maxSync < minSyncIn) {
return ResamplingResult{.resampledIn = 0UZ, .resampledOut = 0UZ, .status = work::Status::INSUFFICIENT_INPUT_ITEMS};
}
if (n < minSyncOut) {
if (maxSync < minSyncOut) {
return ResamplingResult{.resampledIn = 0UZ, .resampledOut = 0UZ, .status = work::Status::INSUFFICIENT_OUTPUT_ITEMS};
}
return ResamplingResult{.resampledIn = n, .resampledOut = n};
const auto minSync = std::max(minSyncIn, minSyncOut);
const auto resampled = std::clamp(requestedWork, minSync, maxSync);
return ResamplingResult{.resampledIn = resampled, .resampledOut = resampled};
}
std::size_t nResamplingChunks;
if constexpr (StrideControl::kEnabled) { // with stride, we cannot process more than one chunk
Expand All @@ -1278,6 +1285,14 @@ class Block : public lifecycle::StateMachine<Derived>, public std::tuple<Argumen
} else if (nResamplingChunks * output_chunk_size < minSyncOut) {
return ResamplingResult{.resampledIn = 0UZ, .resampledOut = 0UZ, .status = work::Status::INSUFFICIENT_OUTPUT_ITEMS};
} else {
if (requestedWork < nResamplingChunks * input_chunk_size) { // if we still can apply requestedWork soft cut
const auto minSync = std::max(minSyncIn, static_cast<std::size_t>(input_chunk_size));
requestedWork = std::clamp(requestedWork, minSync, maxSyncIn);
const std::size_t nResamplingChunks2 = std::min(requestedWork / input_chunk_size, maxSyncOut / output_chunk_size);
if (static_cast<std::size_t>(nResamplingChunks2 * input_chunk_size) >= minSyncIn && static_cast<std::size_t>(nResamplingChunks2 * output_chunk_size) >= minSyncOut) {
return ResamplingResult{.resampledIn = static_cast<std::size_t>(nResamplingChunks2 * input_chunk_size), .resampledOut = static_cast<std::size_t>(nResamplingChunks2 * output_chunk_size)};
}
}
return ResamplingResult{.resampledIn = static_cast<std::size_t>(nResamplingChunks * input_chunk_size), .resampledOut = static_cast<std::size_t>(nResamplingChunks * output_chunk_size)};
}
}
Expand Down Expand Up @@ -1464,7 +1479,7 @@ class Block : public lifecycle::StateMachine<Derived>, public std::tuple<Argumen
* - consume in samples (has to be last to correctly propagate back-pressure)
* @return struct { std::size_t produced_work, work_return_t}
*/
work::Result workInternal(std::size_t requested_work) {
work::Result workInternal(std::size_t requestedWork) {
using enum gr::work::Status;
using TInputTypes = traits::block::stream_input_port_types<Derived>;
using TOutputTypes = traits::block::stream_output_port_types<Derived>;
Expand All @@ -1486,7 +1501,7 @@ class Block : public lifecycle::StateMachine<Derived>, public std::tuple<Argumen

if (this->state() == lifecycle::State::STOPPED) {
disconnectFromUpStreamParents();
return {requested_work, 0UZ, DONE};
return {requestedWork, 0UZ, DONE};
}

// evaluate number of available and processable samples
Expand All @@ -1499,7 +1514,7 @@ class Block : public lifecycle::StateMachine<Derived>, public std::tuple<Argumen
const auto ensureMinimalDecimation = nextTagLimit >= input_chunk_size ? nextTagLimit : static_cast<long unsigned int>(input_chunk_size); // ensure to process at least one input_chunk_size (may shift tags)
const auto availableToProcess = std::min({maxSyncIn, maxChunk, (maxSyncAvailableIn - inputSkipBefore), ensureMinimalDecimation, (nextEosTag - inputSkipBefore)});
const auto availableToPublish = std::min({maxSyncOut, maxSyncAvailableOut});
const auto [resampledIn, resampledOut, resampledStatus] = computeResampling(std::min(minSyncIn, nextEosTag), availableToProcess, minSyncOut, availableToPublish);
auto [resampledIn, resampledOut, resampledStatus] = computeResampling(std::min(minSyncIn, nextEosTag), availableToProcess, minSyncOut, availableToPublish, requestedWork);
const auto nextEosTagSkipBefore = nextEosTag - inputSkipBefore;
const bool isEosTagPresent = nextEosTag <= 0 || nextEosTagSkipBefore < minSyncIn || nextEosTagSkipBefore < input_chunk_size || output_chunk_size * (nextEosTagSkipBefore / input_chunk_size) < minSyncOut;

Expand All @@ -1513,10 +1528,11 @@ class Block : public lifecycle::StateMachine<Derived>, public std::tuple<Argumen
emitErrorMessageIfAny("workInternal(): EOS tag arrived -> REQUESTED_STOP", this->changeStateTo(lifecycle::State::REQUESTED_STOP));
publishEoS();
this->setAndNotifyState(lifecycle::State::STOPPED);
return {requested_work, 0UZ, DONE};
return {requestedWork, 0UZ, DONE};
}

if (asyncEoS || (resampledIn == 0 && resampledOut == 0 && !hasAsyncIn && !hasAsyncOut)) {
return {requested_work, 0UZ, resampledStatus};
return {requestedWork, 0UZ, resampledStatus};
}

// for non-bulk processing, the processed span has to be limited to the first sample if it contains a tag s.t. the tag is not applied to every sample
Expand All @@ -1526,11 +1542,12 @@ class Block : public lifecycle::StateMachine<Derived>, public std::tuple<Argumen
work::Status userReturnStatus = ERROR; // default if nothing has been set
std::size_t processedIn = limitByFirstTag ? 1UZ : resampledIn;
std::size_t processedOut = limitByFirstTag ? 1UZ : resampledOut;
const auto inputSpans = prepareStreams(inputPorts<PortType::STREAM>(&self()), processedIn);
auto outputSpans = prepareStreams(outputPorts<PortType::STREAM>(&self()), processedOut);

const auto inputSpans = prepareStreams(inputPorts<PortType::STREAM>(&self()), processedIn);
auto outputSpans = prepareStreams(outputPorts<PortType::STREAM>(&self()), processedOut);

if (containsEmptyOutputSpans(outputSpans)) {
return {requested_work, 0UZ, INSUFFICIENT_OUTPUT_ITEMS};
return {requestedWork, 0UZ, INSUFFICIENT_OUTPUT_ITEMS};
}

updateInputAndOutputTags();
Expand Down Expand Up @@ -1648,7 +1665,7 @@ class Block : public lifecycle::StateMachine<Derived>, public std::tuple<Argumen
if (userReturnStatus == OK) {
constexpr bool kIsSourceBlock = traits::block::stream_input_port_types<Derived>::size == 0;
constexpr bool kIsSinkBlock = traits::block::stream_output_port_types<Derived>::size == 0;
if constexpr (kIsSourceBlock && kIsSinkBlock) {
if constexpr (!kIsSourceBlock && !kIsSinkBlock) { // normal block with input(s) and output(s)
performedWork = processedIn;
} else if constexpr (kIsSinkBlock) {
performedWork = processedIn;
Expand All @@ -1663,8 +1680,8 @@ class Block : public lifecycle::StateMachine<Derived>, public std::tuple<Argumen
progress->notify_all();
}
}
return {requested_work, performedWork, userReturnStatus};
} // end: work::Result workInternal(std::size_t requested_work) { ... }
return {requestedWork, performedWork, userReturnStatus};
} // end: work::Result workInternal(std::size_t requestedWork) { ... }

public:
work::Status invokeWork()
Expand Down
55 changes: 55 additions & 0 deletions core/test/qa_Block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -835,4 +835,59 @@ const boost::ut::suite<"Port MetaInfo Tests"> _portMetaInfoTests = [] {
};
};

const boost::ut::suite<"Requested Work Tests"> _requestedWorkTests = [] {
    using namespace boost::ut;
    using namespace gr;
    using namespace gr::testing;

    "work() test"_test = [] {
        // Build a three-stage flow graph: tag source -> resampling block -> tag sink.
        gr::Size_t totalSamples = 1000000;

        gr::Graph flowGraph;
        auto& source    = flowGraph.emplaceBlock<TagSource<float, ProcessFunction::USE_PROCESS_BULK>>({{"n_samples_max", totalSamples}, {"disconnect_on_done", false}});
        auto& resampler = flowGraph.emplaceBlock<IntDecBlock<float>>({{"disconnect_on_done", false}});
        auto& tagSink   = flowGraph.emplaceBlock<TagSink<float, ProcessFunction::USE_PROCESS_BULK>>({{"disconnect_on_done", false}});

        expect(eq(ConnectionResult::SUCCESS, flowGraph.connect<"out">(source).to<"in">(resampler)));
        expect(eq(ConnectionResult::SUCCESS, flowGraph.connect<"out">(resampler).template to<"in">(tagSink)));

        flowGraph.reconnectAllEdges();

        // Drive a block through IDLE -> INITIALISED -> RUNNING so work() may be invoked manually.
        auto startBlock = [](auto& blk) {
            if (blk.state() == lifecycle::State::IDLE) {
                std::ignore = blk.changeStateTo(lifecycle::State::INITIALISED);
            }
            std::ignore = blk.changeStateTo(lifecycle::State::RUNNING);
            expect(blk.state() == lifecycle::State::RUNNING);
        };
        startBlock(source);
        startBlock(resampler);
        startBlock(tagSink);

        auto srcResult = source.work(100);
        expect(eq(srcResult.performed_work, 100UZ));

        auto blkResult = resampler.work(10); // requestedWork honored exactly: 10 samples processed
        expect(eq(blkResult.requested_work, 10UZ));
        expect(eq(blkResult.performed_work, 10UZ));

        expect(resampler.settings().set({{"output_chunk_size", gr::Size_t(7)}, {"input_chunk_size", gr::Size_t(7)}}).empty());
        expect(resampler.settings().activateContext() != std::nullopt);

        blkResult = resampler.work(8); // only one full input_chunk_size (7) fits within requestedWork = 8
        expect(eq(blkResult.requested_work, 8UZ));
        expect(eq(blkResult.performed_work, 7UZ));

        blkResult = resampler.work(28); // four chunks of 7 fit exactly into requestedWork = 28
        expect(eq(blkResult.requested_work, 28UZ));
        expect(eq(blkResult.performed_work, 28UZ));

        blkResult = resampler.work(5); // requestedWork below one chunk is clamped up to input_chunk_size
        expect(eq(blkResult.requested_work, 5UZ));
        expect(eq(blkResult.performed_work, 7UZ)); // a full chunk of 7 samples is processed

        expect(resampler.settings().set({{"output_chunk_size", gr::Size_t(1)}, {"input_chunk_size", gr::Size_t(1)}}).empty());
        expect(resampler.settings().activateContext() != std::nullopt);

        blkResult = resampler.work(); // no explicit limit: drain the remaining 48 samples
        expect(eq(blkResult.requested_work, std::numeric_limits<std::size_t>::max()));
        expect(eq(blkResult.performed_work, 48UZ));

        auto snkResult = tagSink.work(100);
        expect(eq(snkResult.performed_work, 100UZ));
    };
};
int main() { /* not needed for UT */ } // boost::ut suites self-register and run automatically; an empty main() suffices

0 comments on commit c723af1

Please sign in to comment.