Skip to content

Commit

Permalink
Include Async ports to the calculation of nSamplesToTag.
Browse files Browse the repository at this point in the history
  • Loading branch information
drslebedev authored and wirew0rm committed Jan 25, 2024
1 parent 33d93f1 commit 017eb01
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 11 deletions.
24 changes: 13 additions & 11 deletions core/include/gnuradio-4.0/Block.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -482,18 +482,20 @@ struct Block : protected std::tuple<Arguments...> {

auto adjust_for_input_port = [&ps = ports_status]<PortLike Port>(Port &port) {
if constexpr (std::remove_cvref_t<Port>::kIsSynch) {
if (!port.isConnected()) {
return;
if (port.isConnected()) {
ps.has_sync_input_ports = true;
ps.in_min_samples = std::max(ps.in_min_samples, port.min_samples);
ps.in_max_samples = std::min(ps.in_max_samples, port.max_samples);
ps.in_available = std::min(ps.in_available, port.streamReader().available());
ps.in_at_least_one_port_has_data = ps.in_at_least_one_port_has_data | (port.streamReader().available() > 0);
ps.in_at_least_one_tag_available = ps.in_at_least_one_port_has_data | (port.tagReader().available() > 0);
}
ps.has_sync_input_ports = true;
ps.in_min_samples = std::max(ps.in_min_samples, port.min_samples);
ps.in_max_samples = std::min(ps.in_max_samples, port.max_samples);
ps.in_available = std::min(ps.in_available, port.streamReader().available());
ps.nSamplesToNextTag = std::min(ps.nSamplesToNextTag, nSamplesUntilNextTag(port).value_or(std::numeric_limits<std::size_t>::max()));
ps.nSamplesToNextTagAfter = std::min(ps.nSamplesToNextTagAfter, nSamplesUntilNextTag(port, 1).value_or(std::numeric_limits<std::size_t>::max())); /* 1: in case nextTag == 0 */
ps.nSamplesToEosTag = std::min(ps.nSamplesToEosTag, samples_to_eos_tag(port).value_or(std::numeric_limits<std::size_t>::max()));
ps.in_at_least_one_port_has_data = ps.in_at_least_one_port_has_data | (port.streamReader().available() > 0);
ps.in_at_least_one_tag_available = ps.in_at_least_one_port_has_data | (port.tagReader().available() > 0);
}
// if Async ports are present then we still want to process Tags fo these ports
if (port.isConnected()) {
ps.nSamplesToNextTag = std::min(ps.nSamplesToNextTag, nSamplesUntilNextTag(port).value_or(std::numeric_limits<std::size_t>::max()));
ps.nSamplesToNextTagAfter = std::min(ps.nSamplesToNextTagAfter, nSamplesUntilNextTag(port, 1).value_or(std::numeric_limits<std::size_t>::max())); /* 1: in case nextTag == 0 */
ps.nSamplesToEosTag = std::min(ps.nSamplesToEosTag, samples_to_eos_tag(port).value_or(std::numeric_limits<std::size_t>::max()));
}
};
for_each_port([&adjust_for_input_port](PortLike auto &port) { adjust_for_input_port(port); }, inputPorts(&self()));
Expand Down
42 changes: 42 additions & 0 deletions core/test/qa_Block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

#include <fmt/format.h>

#include "gnuradio-4.0/basic/clock_source.hpp"
#include <gnuradio-4.0/Block.hpp>
#include <gnuradio-4.0/Graph.hpp>
#include <gnuradio-4.0/Scheduler.hpp>
#include <gnuradio-4.0/testing/TagMonitors.hpp>

#if defined(__clang__) && __clang_major__ >= 16
// clang 16 does not like ut's default reporter_junit due to some issues with stream buffers and output redirection
Expand Down Expand Up @@ -98,8 +100,29 @@ struct IntDecBlock : public gr::Block<IntDecBlock<T>, gr::ResamplingRatio<>, gr:
}
};

template<typename T>
struct AsyncBlock : gr::Block<AsyncBlock<T>> {
gr::PortIn<T, gr::Async> in;
gr::PortOut<T, gr::Async> out;

gr::work::Status
processBulk(gr::PortIn<T, gr::Async>::ReaderType *inReader, gr::PortOut<T, gr::Async>::WriterType *outputWriter) {
auto available = std::min(inReader->available(), outputWriter->available());
if (available == 0) {
return gr::work::Status::INSUFFICIENT_INPUT_ITEMS;
}
auto inSpan = inReader->get(available);
auto outSpan = outputWriter->reserve_output_range(available);
std::copy(inSpan.begin(), std::next(inSpan.begin(), static_cast<std::ptrdiff_t>(available)), outSpan.begin());
boost::ut::expect(inReader->consume(available)) << "Samples were not consumed";
outSpan.publish(available);
return gr::work::Status::OK;
}
};

ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (CountSource<T>), out, count, n_samples);
ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (IntDecBlock<T>), in, out);
ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (AsyncBlock<T>), in, out);

void
interpolation_decimation_test(const IntDecTestData &data, std::shared_ptr<gr::thread_pool::BasicThreadPool> thread_pool) {
Expand Down Expand Up @@ -287,6 +310,25 @@ const boost::ut::suite _stride_tests = [] {
stride_test( {.n_samples = 1000000, .stride = 249900, .in_port_max = 100, .exp_in = 100, .exp_out = 100, .exp_counter = 5, .exp_total_in = 500, .exp_total_out = 500 }, thread_pool);
};
// clang-format on

"Async ports tests"_test = [&thread_pool] {
using namespace gr;
using namespace gr::testing;
constexpr std::uint64_t n_samples = 1000;
constexpr float sample_rate = 1000.f;
Graph testGraph;
auto &tagSrc = testGraph.emplaceBlock<TagSource<float>>({ { "sample_rate", sample_rate }, { "n_samples_max", n_samples }, { "name", "TagSource" } });
auto &asyncBlock = testGraph.emplaceBlock<AsyncBlock<float>>({ { "name", "AsyncBlock" } });
auto &sink = testGraph.emplaceBlock<TagSink<float, ProcessFunction::USE_PROCESS_ONE>>({ { "name", "TagSink" }, { "verbose_console", true } });

expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(tagSrc).to<"in">(asyncBlock)));
expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(asyncBlock).to<"in">(sink)));

scheduler::Simple sched{ std::move(testGraph) };
sched.runAndWait();

expect(eq(n_samples, static_cast<std::uint32_t>(sink.n_samples_produced))) << "Number of samples does not match";
};
};

int
Expand Down

0 comments on commit 017eb01

Please sign in to comment.