Skip to content

Commit

Permalink
ConsumableInputRange and buffer_reader updates:
Browse files Browse the repository at this point in the history
* ConsumableInputRange counter
* Deferred consume(), consume is executed in destructor of the last
  range using proper SpanReleasePolicy
* Default SpanReleasePolicy is ProcessNone
* reader.get() by default returns all available samples
* Multiple consecutive reader.get() calls are possible, nSamples of the first
  get() call is the max for all subsequent calls.
* Remove consume() method from buffer_reader, consume is possible only
  via ConsumableInputRange.consume()
  • Loading branch information
drslebedev committed Apr 18, 2024
1 parent 07de569 commit ac7f40a
Show file tree
Hide file tree
Showing 7 changed files with 279 additions and 205 deletions.
20 changes: 10 additions & 10 deletions blocks/basic/include/gnuradio-4.0/basic/DataSink.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -432,21 +432,21 @@ synchronously (/asynchronously) if handled by the same (/different) sink block.

const auto readData = reader.get(nProcess);
if constexpr (requires { fnc(std::span<const T>(), std::span<const Tag>()); }) {
const auto tags = tag_reader.get(tag_reader.available());
const auto tags = tag_reader.get();
const auto it = std::find_if_not(tags.begin(), tags.end(), [until = static_cast<int64_t>(samples_read + nProcess)](const auto &tag) { return tag.index < until; });
auto relevantTags = std::vector<Tag>(tags.begin(), it);
for (auto &t : relevantTags) {
t.index -= static_cast<int64_t>(samples_read);
}
fnc(readData, std::span<const Tag>(relevantTags));
std::ignore = tag_reader.consume(relevantTags.size());
std::ignore = tags.consume(relevantTags.size());
} else {
const auto tags = tag_reader.get(tag_reader.available());
std::ignore = tag_reader.consume(tags.size());
const auto tags = tag_reader.get();
std::ignore = tags.consume(tags.size());
fnc(readData);
}

std::ignore = reader.consume(nProcess);
std::ignore = readData.consume(nProcess);
samples_read += nProcess;
return true;
}
Expand All @@ -469,7 +469,7 @@ synchronously (/asynchronously) if handled by the same (/different) sink block.

const auto readData = reader.get(nProcess);
fnc(readData);
std::ignore = reader.consume(nProcess);
std::ignore = readData.consume(nProcess);
return true;
}
};
Expand Down Expand Up @@ -651,9 +651,9 @@ synchronously (/asynchronously) if handled by the same (/different) sink block.
static constexpr auto callbackTakesTags = std::is_invocable_v<Callback, std::span<const T>, std::span<const Tag>>
|| std::is_invocable_v<Callback, std::span<const T>, std::span<const Tag>, const DataSink<T> &>;

const DataSink<T> &parent_sink;
bool block = false;
std::size_t samples_written = 0;
const DataSink<T> &parent_sink;
bool block = false;
std::size_t samples_written = 0;
std::optional<detail::Metadata> _pendingMetadata;

// callback-only
Expand All @@ -663,7 +663,7 @@ synchronously (/asynchronously) if handled by the same (/different) sink block.

// polling-only
std::weak_ptr<Poller> polling_handler = {};
Callback callback;
Callback callback;

template<typename CallbackFW>
explicit ContinuousListener(std::size_t maxChunkSize, CallbackFW &&c, const DataSink<T> &parent) : parent_sink(parent), buffer(maxChunkSize), callback{ std::forward<CallbackFW>(c) } {}
Expand Down
14 changes: 7 additions & 7 deletions core/benchmarks/bm-nosonar_node_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,10 @@ class gen_operation_SIMD : public gr::Block<gen_operation_SIMD<T, op>, gr::PortI
return { requested_work, 0UL, gr::work::Status::INSUFFICIENT_OUTPUT_ITEMS };
}
const std::size_t n_to_publish = std::min(n_readable, n_writable);
const auto input = reader.get();

writer.publish( //
[&reader, n_to_publish, this](std::span<T> output) {
const auto input = reader.get(reader.available());
[&input, n_to_publish, this](std::span<T> output) {
// #### N.B. later high-level user-function starts here

using namespace vir::stdx;
Expand Down Expand Up @@ -248,7 +248,7 @@ class gen_operation_SIMD : public gr::Block<gen_operation_SIMD<T, op>, gr::PortI
},
n_to_publish);

if (!reader.consume(n_to_publish)) {
if (!input.consume(n_to_publish)) {
return { requested_work, n_to_publish, gr::work::Status::ERROR };
}
return { requested_work, n_to_publish, gr::work::Status::OK };
Expand Down Expand Up @@ -294,22 +294,22 @@ class copy : public gr::Block<copy<T, N_MIN, N_MAX, use_bulk_operation, use_memc
return { requested_work, 0UL, gr::work::Status::INSUFFICIENT_OUTPUT_ITEMS };
}
const std::size_t n_to_publish = std::min(n_readable, n_writable);
const auto input = reader.get();

if constexpr (use_memcopy) {
// fmt::print("n_to_publish {} - {} {}\n", n_to_publish, use_bulk_operation, use_memcopy);
writer.publish( //
[&reader, n_to_publish](std::span<T> output) { std::memcpy(output.data(), reader.get(reader.available()).data(), n_to_publish * sizeof(T)); }, n_to_publish);
[&reader, n_to_publish](std::span<T> output) { std::memcpy(output.data(), reader.get().data(), n_to_publish * sizeof(T)); }, n_to_publish);
} else {
writer.publish( //
[&reader, n_to_publish](std::span<T> output) {
const auto input = reader.get();
[&input, n_to_publish](std::span<T> output) {
for (std::size_t i = 0; i < n_to_publish; i++) {
output[i] = input[i];
}
},
n_to_publish);
}
if (!reader.consume(n_to_publish)) {
if (!input.consume(n_to_publish)) {
return { requested_work, 0UL, gr::work::Status::ERROR };
}
return { requested_work, 0UL, gr::work::Status::OK };
Expand Down
Loading

0 comments on commit ac7f40a

Please sign in to comment.