Skip to content

Commit

Permalink
Implement SyncBlock
Browse files Browse the repository at this point in the history
Signed-off-by: drslebedev <[email protected]>
  • Loading branch information
drslebedev committed Nov 21, 2024
1 parent 711226e commit 84aaf00
Show file tree
Hide file tree
Showing 9 changed files with 694 additions and 14 deletions.
6 changes: 2 additions & 4 deletions blocks/basic/include/gnuradio-4.0/basic/StreamToDataSet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,8 @@ If multiple 'start' or 'stop' Tags arrive in a single merged tag, only one DataS

void copyInputSamplesToHistory(InputSpanLike auto& inSamples, std::size_t maxSamplesToCopy) {
if (n_pre > 0) {
const auto samplesToCopy = std::min(maxSamplesToCopy, inSamples.size());
const auto end = std::next(inSamples.begin(), static_cast<std::ptrdiff_t>(samplesToCopy));
const auto optimizedSamplesToCopy = std::min(static_cast<std::size_t>(_history.capacity()), samplesToCopy);
_history.push_back_bulk(std::prev(end, static_cast<std::ptrdiff_t>(optimizedSamplesToCopy)), end);
const auto samplesToCopy = std::min(maxSamplesToCopy, inSamples.size());
_history.push_back_bulk(inSamples.begin(), std::next(inSamples.begin(), static_cast<std::ptrdiff_t>(samplesToCopy)));
}
}

Expand Down
363 changes: 363 additions & 0 deletions blocks/basic/include/gnuradio-4.0/basic/SyncBlock.hpp

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions blocks/basic/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ if (ENABLE_BLOCK_REGISTRY AND ENABLE_BLOCK_PLUGINS)
add_ut_test(qa_BasicKnownBlocks)
endif ()
add_ut_test(qa_StreamToDataSet)
add_ut_test(qa_SyncBlock)

message(STATUS "###Python Include Dirs: ${Python3_INCLUDE_DIRS}")
if (PYTHON_AVAILABL AND ENABLE_BLOCK_REGISTRY AND ENABLE_BLOCK_PLUGINS)
add_ut_test(qa_PythonBlock)
endif ()

205 changes: 205 additions & 0 deletions blocks/basic/test/qa_SyncBlock.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
#include <boost/ut.hpp>
#include <gnuradio-4.0/Graph.hpp>
#include <gnuradio-4.0/Scheduler.hpp>
#include <gnuradio-4.0/basic/SyncBlock.hpp>
#include <gnuradio-4.0/testing/TagMonitors.hpp>

#include <fmt/format.h>

struct TestParams {
std::string testName = "";
gr::Size_t nSamples = 0U; // 0 -> take inValues[i].size()
gr::Size_t maxHistorySize = 0U; // if 0 -> take default
std::string filter = ""; // if "" -> take default
std::uint64_t tolerance = std::numeric_limits<std::uint64_t>::max(); // if max() -> take default

std::vector<std::vector<int>> inValues;
std::vector<std::vector<gr::Tag>> inTags;
std::vector<std::vector<int>> expectedValues;
std::vector<std::vector<gr::Tag>> expectedTags;
std::size_t expectedNSamples = 0UZ;
};

void runTest(const TestParams& par) {
using namespace boost::ut;
using namespace gr;
using namespace gr::basic;
using namespace gr::testing;

expect(eq(par.inValues.size(), par.inTags.size()));

gr::Graph graph;

std::size_t nPorts = par.inValues.size();

property_map syncBlockParams = {{"n_ports", static_cast<gr::Size_t>(nPorts)}};
if (par.maxHistorySize != 0) {
syncBlockParams.insert_or_assign("max_history_size", par.maxHistorySize);
}
if (par.tolerance != std::numeric_limits<std::uint64_t>::max()) {
syncBlockParams.insert_or_assign("tolerance", par.tolerance);
}
if (par.filter != "") {
syncBlockParams.insert_or_assign("filter", par.filter);
}
auto& syncBlock = graph.emplaceBlock<SyncBlock<int>>(syncBlockParams);

std::vector<TagSource<int, ProcessFunction::USE_PROCESS_BULK>*> sources;
std::vector<TagSink<int, ProcessFunction::USE_PROCESS_BULK>*> sinks;

for (std::size_t i = 0; i < nPorts; i++) {
property_map srcParams = {{"values", par.inValues[i]}, {"verbose_console", false}, {"disconnect_on_done", false}};
if (par.nSamples != 0) {
srcParams.insert_or_assign("n_samples_max", par.nSamples);
} else {
srcParams.insert_or_assign("n_samples_max", static_cast<gr::Size_t>(par.inValues[i].size()));
}

sources.push_back(std::addressof(graph.emplaceBlock<TagSource<int, ProcessFunction::USE_PROCESS_BULK>>(srcParams)));
sources[i]->_tags = par.inTags[i];
expect(gr::ConnectionResult::SUCCESS == graph.connect(*sources[i], "out"s, syncBlock, "inputs#"s + std::to_string(i)));
}

for (std::size_t i = 0; i < nPorts; i++) {
property_map sinkParams = {{"verbose_console", false}, {"disconnect_on_done", false}};
if (par.expectedValues.empty()) {
sinkParams.insert_or_assign("log_samples", false);
}
sinks.push_back(std::addressof(graph.emplaceBlock<TagSink<int, ProcessFunction::USE_PROCESS_BULK>>(sinkParams)));
expect(gr::ConnectionResult::SUCCESS == graph.connect(syncBlock, "outputs#"s + std::to_string(i), *sinks[i], "in"s));
}

gr::scheduler::Simple sched{std::move(graph)};
sched.runAndWait();

for (std::size_t i = 0; i < sinks.size(); i++) {
if (par.expectedValues.empty()) {
expect(eq(par.expectedNSamples, sinks[i]->_nSamplesProduced));
} else {
expect(std::ranges::equal(sinks[i]->_samples, par.expectedValues[i])) << fmt::format("sinks[{}]->_samples does not match to expected values:\nSink:{}\nExpected:{}\n", i, sinks[i]->_samples, par.expectedValues[i]);
}
}

for (std::size_t i = 0; i < sinks.size(); i++) {
expect(equal_tag_lists(sinks[i]->_tags, par.expectedTags[i], {}));
}
}

gr::Tag genSyncTag(std::size_t index, std::uint64_t triggerTime, std::string triggerName = "TriggerName") { //
return {index, {{gr::tag::TRIGGER_NAME.shortKey(), triggerName}, {gr::tag::TRIGGER_TIME.shortKey(), triggerTime}}};
};

gr::Tag genDropTag(std::size_t index, std::uint64_t nSamplesDropped) { //
return {index, {{gr::tag::N_DROPPED_SAMPLES.shortKey(), nSamplesDropped}}};
};

gr::Tag genDropSyncTag(std::size_t index, std::uint64_t nSamplesDropped, std::uint64_t triggerTime, std::string triggerName = "TriggerName") { //
return {index, {{gr::tag::N_DROPPED_SAMPLES.shortKey(), nSamplesDropped}, {gr::tag::TRIGGER_NAME.shortKey(), triggerName}, {gr::tag::TRIGGER_TIME.shortKey(), triggerTime}}};
};

const boost::ut::suite SyncBlockTests = [] {
using namespace boost::ut;
using namespace gr;
using namespace gr::basic;
using namespace gr::testing;

"SyncBlock basic test"_test = [] {
runTest({ //
.tolerance = 2ULL, //
.inValues = //
{ //
{1, 0, 1, 2, 3, 0, 1, 2, 3, 4, 0, 1}, //
{1, 2, 0, 1, 2, 3, 4, 0, 1, 2, 3, 0, 1, 2}, //
{1, 2, 3, 0, 1, 2, 3, 4, 5, 0, 1, 2, 0, 1, 2, 3}}, //
.inTags = //
{ //
{genSyncTag(1, 99), genSyncTag(5, 201), genSyncTag(10, 301)}, //
{genSyncTag(2, 100), genSyncTag(7, 199), genSyncTag(11, 299)}, //
{genSyncTag(3, 101), genSyncTag(9, 200), genSyncTag(12, 300)}}, //
.expectedValues = //
{ //
{1, 0, 1, 2, 3, 0, 1, 2, 0, 1}, //
{2, 0, 1, 2, 3, 0, 1, 2, 0, 1}, //
{3, 0, 1, 2, 3, 0, 1, 2, 0, 1}}, //
.expectedTags = //
{ //
{genSyncTag(1, 99), genSyncTag(5, 201), genDropSyncTag(8, 2, 301)}, //
{genDropTag(0, 1), genSyncTag(1, 100), genDropSyncTag(5, 1, 199), genDropSyncTag(8, 1, 299)}, //
{genDropTag(0, 2), genSyncTag(1, 101), genDropSyncTag(5, 2, 200), genSyncTag(8, 300)}}});
};

"SyncBlock missing tag test"_test = [] {
runTest({ //
.tolerance = 2ULL, //
.inValues = //
{ //
{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, //
{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, //
{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}}, //
.inTags = //
{ //
{genSyncTag(1, 100), genSyncTag(5, 200), genSyncTag(10, 300)}, //
{genSyncTag(2, 100), genSyncTag(10, 300)}, //
{genSyncTag(4, 200), genSyncTag(10, 300)}}, //
.expectedValues = //
{ //
{5, 6, 7, 8, 9, 10, 11}, //
{5, 6, 7, 8, 9, 10, 11}, //
{5, 6, 7, 8, 9, 10, 11}}, //
.expectedTags = //
{ //
{genDropSyncTag(0, 5, 200), genSyncTag(5, 300)}, // Sample 5 was copied to the output, including the sync tag, even though the tag was not used
{genDropTag(0, 5), genSyncTag(5, 300)}, //
{genDropTag(0, 5), genSyncTag(5, 300)}}});
};

"SyncBlock isSync test"_test = [] {
runTest({ //
.nSamples = 300'000, //
.maxHistorySize = 32'000, //
.tolerance = 2ULL, //
.inValues = {{}, {}}, //
.inTags = //
{ //
{genSyncTag(10, 100), genSyncTag(100'100, 200), genSyncTag(201'000, 300)}, //
{genSyncTag(1, 100), genSyncTag(100'000, 200), genSyncTag(200'000, 300)}}, //
.expectedValues = {}, // //
.expectedTags = //
{ //
{genDropTag(0, 9), genSyncTag(1, 100), genDropTag(65537, 91), genSyncTag(100'000, 200), genDropSyncTag(200'000, 900, 300)}, // 65537 -> depends on buffer size
{genSyncTag(1, 100), genSyncTag(100'000, 200), genSyncTag(200'000, 300)}}, //
.expectedNSamples = 299'000});
};

"SyncBlock back pressure test"_test = [] {
runTest({ //
.nSamples = 300'000, //
.maxHistorySize = 32'000, //
.tolerance = 2ULL, //
.inValues = {{}, {}}, //
.inTags = //
{ //
{genSyncTag(1, 100), genSyncTag(1000, 200), genSyncTag(200'000, 300)}, //
{genSyncTag(1, 100), genSyncTag(100'000, 200), genSyncTag(200'000, 300)}}, //
.expectedValues = {}, //
.expectedTags = //
{ //
{genSyncTag(1, 100), genDropTag(1000, 167000), genSyncTag(33'000, 300)}, //
{genSyncTag(1, 100), genDropTag(1000, 167000), genSyncTag(33'000, 300)}}, //
.expectedNSamples = 133'000});
};

"SyncBlock back pressure test 2"_test = [] {
runTest({ //
.nSamples = 300'000, //
.maxHistorySize = 32'000, //
.tolerance = 2ULL, //
.inValues = {{}, {}}, //
.inTags = {{genSyncTag(100'000, 100)}, {genSyncTag(101'000, 100)}}, //
.expectedValues = {}, //
.expectedTags = {{genDropTag(0, 68000), genSyncTag(32'000, 100)}, {genDropTag(0, 69000), genSyncTag(32'000, 100)}}, //
.expectedNSamples = 231'000});
};
};

int main() {}
3 changes: 2 additions & 1 deletion core/benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
function (add_gr_benchmark BM_NAME)
function(add_gr_benchmark BM_NAME)
add_benchmark(${BM_NAME})
target_link_libraries(${BM_NAME} PRIVATE gnuradio-core fmt gr-basic gr-fileio gr-math gr-testing)
target_compile_options(${BM_NAME} PRIVATE -O3) # performance related benchmarks should be optimised during compile-time
Expand All @@ -10,4 +10,5 @@ add_gr_benchmark(bm_Profiler)
add_gr_benchmark(bm_Scheduler)
add_gr_benchmark(bm-nosonar_node_api)
add_gr_benchmark(bm_fft)
add_gr_benchmark(bm_sync)
target_link_libraries(bm_fft PRIVATE gr-fourier)
Loading

0 comments on commit 84aaf00

Please sign in to comment.