Skip to content

Commit

Permalink
Implement new DONE logic for the work function using END_OF_STREAM tag.
Browse files Browse the repository at this point in the history
  • Loading branch information
drslebedev committed Dec 1, 2023
1 parent 593556f commit 6ca1845
Show file tree
Hide file tree
Showing 17 changed files with 834 additions and 569 deletions.
37 changes: 18 additions & 19 deletions blocks/basic/include/gnuradio-4.0/basic/DataSink.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ class DataSink : public Block<DataSink<T>> {
Annotated<T, "signal min", Doc<"signal physical min. (e.g. DAQ) limit">> signal_min = std::numeric_limits<T>::lowest();
Annotated<T, "signal max", Doc<"signal physical max. (e.g. DAQ) limit">> signal_max = std::numeric_limits<T>::max();

PortIn<T, RequiredSamples<std::dynamic_extent, _listener_buffer_size>> in;
PortIn<T, RequiredSamples<std::dynamic_extent, _listener_buffer_size>> in;

struct Poller {
// TODO consider whether reusing port<T> here makes sense
Expand Down Expand Up @@ -336,12 +336,12 @@ class DataSink : public Block<DataSink<T>> {
};

struct DataSetPoller {
gr::CircularBuffer<DataSet<T>> buffer = gr::CircularBuffer<DataSet<T>>(_listener_buffer_size);
decltype(buffer.new_reader()) reader = buffer.new_reader();
decltype(buffer.new_writer()) writer = buffer.new_writer();
gr::CircularBuffer<DataSet<T>> buffer = gr::CircularBuffer<DataSet<T>>(_listener_buffer_size);
decltype(buffer.new_reader()) reader = buffer.new_reader();
decltype(buffer.new_writer()) writer = buffer.new_writer();

std::atomic<bool> finished = false;
std::atomic<std::size_t> drop_count = 0;
std::atomic<bool> finished = false;
std::atomic<std::size_t> drop_count = 0;

[[nodiscard]] bool
process(std::invocable<std::span<DataSet<T>>> auto fnc, std::size_t requested = std::numeric_limits<std::size_t>::max()) {
Expand Down Expand Up @@ -373,21 +373,21 @@ class DataSink : public Block<DataSink<T>> {

std::shared_ptr<Poller>
getStreamingPoller(BlockingMode blockMode = BlockingMode::Blocking) {
const auto block = blockMode == BlockingMode::Blocking;
auto handler = std::make_shared<Poller>();
const auto block = blockMode == BlockingMode::Blocking;
auto handler = std::make_shared<Poller>();
std::lock_guard lg(_listener_mutex);
handler->finished = _listeners_finished;
handler->finished = _listeners_finished;
addListener(std::make_unique<ContinuousListener<gr::meta::null_type>>(handler, block, *this), block);
return handler;
}

template<TriggerMatcher M>
std::shared_ptr<DataSetPoller>
getTriggerPoller(M &&matcher, std::size_t preSamples, std::size_t postSamples, BlockingMode blockMode = BlockingMode::Blocking) {
const auto block = blockMode == BlockingMode::Blocking;
auto handler = std::make_shared<DataSetPoller>();
const auto block = blockMode == BlockingMode::Blocking;
auto handler = std::make_shared<DataSetPoller>();
std::lock_guard lg(_listener_mutex);
handler->finished = _listeners_finished;
handler->finished = _listeners_finished;
addListener(std::make_unique<TriggerListener<gr::meta::null_type, M>>(std::forward<M>(matcher), handler, preSamples, postSamples, block), block);
ensureHistorySize(preSamples);
return handler;
Expand Down Expand Up @@ -475,7 +475,6 @@ class DataSink : public Block<DataSink<T>> {
_history->push_back_bulk(inData.last(toWrite).begin(), inData.last(toWrite).end());
}
}

return work::Status::OK;
}

Expand Down Expand Up @@ -554,7 +553,7 @@ class DataSink : public Block<DataSink<T>> {
}

struct AbstractListener {
bool expired = false;
bool expired = false;

virtual ~AbstractListener() = default;

Expand Down Expand Up @@ -594,7 +593,7 @@ class DataSink : public Block<DataSink<T>> {
// polling-only
std::weak_ptr<Poller> polling_handler = {};

Callback callback;
Callback callback;

template<typename CallbackFW>
explicit ContinuousListener(std::size_t maxChunkSize, CallbackFW &&c, const DataSink<T> &parent) : parent_sink(parent), buffer(maxChunkSize), callback{ std::forward<CallbackFW>(c) } {}
Expand Down Expand Up @@ -709,16 +708,16 @@ class DataSink : public Block<DataSink<T>> {

template<typename Callback, TriggerMatcher M>
struct TriggerListener : public AbstractListener {
bool block = false;
std::size_t preSamples = 0;
std::size_t postSamples = 0;
bool block = false;
std::size_t preSamples = 0;
std::size_t postSamples = 0;

DataSet<T> dataset_template;
M trigger_matcher = {};
std::deque<PendingWindow> pending_trigger_windows; // triggers that still didn't receive all their data
std::weak_ptr<DataSetPoller> polling_handler = {};

Callback callback;
Callback callback;

template<TriggerMatcher Matcher>
explicit TriggerListener(Matcher &&matcher, std::shared_ptr<DataSetPoller> handler, std::size_t pre, std::size_t post, bool doBlock)
Expand Down
12 changes: 6 additions & 6 deletions blocks/basic/include/gnuradio-4.0/basic/clock_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,23 +106,23 @@ ClockSource Documentation -- add here
return work::Status::INSUFFICIENT_OUTPUT_ITEMS;
}

const std::uint32_t remaining_samples = n_samples_max - n_samples_produced;
const std::uint32_t limit = std::min(writableSamples, remaining_samples);
const std::uint32_t n_available = std::min(limit, chunk_size.value);
const std::uint32_t remaining_samples = n_samples_max - n_samples_produced;
const std::uint32_t limit = std::min(writableSamples, remaining_samples);
const std::uint32_t n_available = std::min(limit, chunk_size.value);

std::uint32_t samples_to_produce = n_available;
std::uint32_t samples_to_produce = n_available;
while (next_tag < tags.size() && tags[next_tag].index <= static_cast<std::make_signed_t<std::size_t>>(n_samples_produced + n_available)) {
gr::testing::print_tag(tags[next_tag], fmt::format("{}::processBulk(...)\t publish tag at {:6}", this->name, n_samples_produced));
Tag &out_tag = this->output_tags()[0];
out_tag = tags[next_tag];
out_tag.index = tags[next_tag].index - static_cast<std::make_signed_t<std::size_t>>(n_samples_produced);
samples_to_produce = static_cast<std::uint32_t>(tags[next_tag].index) - n_samples_produced;
this->forward_tags();
this->forwardTags();
next_tag++;
}
samples_to_produce = std::min(samples_to_produce, n_samples_max.value);

output.publish(samples_to_produce);
output.publish(static_cast<std::size_t>(samples_to_produce));
n_samples_produced += samples_to_produce;

if constexpr (basicPeriodAlgorithm) {
Expand Down
Loading

0 comments on commit 6ca1845

Please sign in to comment.