Skip to content

Commit

Permalink
Implement SyncBlock
Browse files Browse the repository at this point in the history
Signed-off-by: drslebedev <[email protected]>
  • Loading branch information
drslebedev committed Nov 19, 2024
1 parent f3e8810 commit 47a2067
Show file tree
Hide file tree
Showing 7 changed files with 584 additions and 13 deletions.
6 changes: 2 additions & 4 deletions blocks/basic/include/gnuradio-4.0/basic/StreamToDataSet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,8 @@ If multiple 'start' or 'stop' Tags arrive in a single merged tag, only one DataS

void copyInputSamplesToHistory(InputSpanLike auto& inSamples, std::size_t maxSamplesToCopy) {
if (n_pre > 0) {
const auto samplesToCopy = std::min(maxSamplesToCopy, inSamples.size());
const auto end = std::next(inSamples.begin(), static_cast<std::ptrdiff_t>(samplesToCopy));
const auto optimizedSamplesToCopy = std::min(static_cast<std::size_t>(_history.capacity()), samplesToCopy);
_history.push_back_bulk(std::prev(end, static_cast<std::ptrdiff_t>(optimizedSamplesToCopy)), end);
const auto samplesToCopy = std::min(maxSamplesToCopy, inSamples.size());
_history.push_back_bulk(inSamples.begin(), std::next(inSamples.begin(), static_cast<std::ptrdiff_t>(samplesToCopy)));
}
}

Expand Down
363 changes: 363 additions & 0 deletions blocks/basic/include/gnuradio-4.0/basic/SyncBlock.hpp

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions blocks/basic/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ if (ENABLE_BLOCK_REGISTRY AND ENABLE_BLOCK_PLUGINS)
add_ut_test(qa_BasicKnownBlocks)
endif ()
add_ut_test(qa_StreamToDataSet)
add_ut_test(qa_SyncBlock)

message(STATUS "###Python Include Dirs: ${Python3_INCLUDE_DIRS}")
if (PYTHON_AVAILABL AND ENABLE_BLOCK_REGISTRY AND ENABLE_BLOCK_PLUGINS)
add_ut_test(qa_PythonBlock)
endif ()

207 changes: 207 additions & 0 deletions blocks/basic/test/qa_SyncBlock.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
#include <boost/ut.hpp>
#include <gnuradio-4.0/Graph.hpp>
#include <gnuradio-4.0/Scheduler.hpp>
#include <gnuradio-4.0/basic/SyncBlock.hpp>
#include <gnuradio-4.0/testing/TagMonitors.hpp>

#include <fmt/format.h>

struct TestParams {
std::string testName = "";
gr::Size_t nSamples = 0U; // 0 -> take inValues[i].size()
gr::Size_t maxHistorySize = 0U; // if 0 -> take default
std::string filter = ""; // if "" -> take default
std::uint64_t tolerance = std::numeric_limits<std::uint64_t>::max(); // if max() -> take default

std::vector<std::vector<int>> inValues;
std::vector<std::vector<gr::Tag>> inTags;
std::vector<std::vector<int>> expectedValues;
std::vector<std::vector<gr::Tag>> expectedTags;
std::size_t expectedNSamples = 0UZ;
};

void runTest(const TestParams& par) {
using namespace boost::ut;
using namespace gr;
using namespace gr::basic;
using namespace gr::testing;

expect(eq(par.inValues.size(), par.inTags.size()));

gr::Graph graph;

std::size_t nPorts = par.inValues.size();

property_map syncBlockParams = {{"n_ports", static_cast<gr::Size_t>(nPorts)}};
if (par.maxHistorySize != 0) {
syncBlockParams.insert_or_assign("max_history_size", par.maxHistorySize);
}
if (par.tolerance != std::numeric_limits<std::uint64_t>::max()) {
syncBlockParams.insert_or_assign("tolerance", par.tolerance);
}
if (par.filter != "") {
syncBlockParams.insert_or_assign("filter", par.filter);
}
auto& syncBlock = graph.emplaceBlock<SyncBlock<int>>(syncBlockParams);

std::vector<TagSource<int, ProcessFunction::USE_PROCESS_BULK>*> sources;
std::vector<TagSink<int, ProcessFunction::USE_PROCESS_BULK>*> sinks;

for (std::size_t i = 0; i < nPorts; i++) {
property_map srcParams = {{"values", par.inValues[i]}, {"verbose_console", false}, {"disconnect_on_done", false}};
if (par.nSamples != 0) {
srcParams.insert_or_assign("n_samples_max", par.nSamples);
} else {
srcParams.insert_or_assign("n_samples_max", static_cast<gr::Size_t>(par.inValues[i].size()));
}

sources.push_back(std::addressof(graph.emplaceBlock<TagSource<int, ProcessFunction::USE_PROCESS_BULK>>(srcParams)));
expect(sources[i]->settings().applyStagedParameters().forwardParameters.empty());
sources[i]->_tags = par.inTags[i];
expect(gr::ConnectionResult::SUCCESS == graph.connect(*sources[i], "out"s, syncBlock, "inputs#"s + std::to_string(i)));
}

for (std::size_t i = 0; i < nPorts; i++) {
property_map sinkParams = {{"verbose_console", false}, {"disconnect_on_done", false}};
if (par.expectedValues.empty()) {
sinkParams.insert_or_assign("log_samples", false);
}
sinks.push_back(std::addressof(graph.emplaceBlock<TagSink<int, ProcessFunction::USE_PROCESS_BULK>>(sinkParams)));
expect(sinks[i]->settings().applyStagedParameters().forwardParameters.empty());
expect(gr::ConnectionResult::SUCCESS == graph.connect(syncBlock, "outputs#"s + std::to_string(i), *sinks[i], "in"s));
}

gr::scheduler::Simple sched{std::move(graph)};
sched.runAndWait();

for (std::size_t i = 0; i < sinks.size(); i++) {
if (par.expectedValues.empty()) {
expect(eq(par.expectedNSamples, sinks[i]->_nSamplesProduced));
} else {
expect(std::ranges::equal(sinks[i]->_samples, par.expectedValues[i])) << fmt::format("sinks[{}]->_samples does not match to expected values:\nSink:{}\nExpected:{}\n", i, sinks[i]->_samples, par.expectedValues[i]);
}
}

for (std::size_t i = 0; i < sinks.size(); i++) {
expect(equal_tag_lists(sinks[i]->_tags, par.expectedTags[i], {}));
}
}

gr::Tag genSyncTag(std::size_t index, std::uint64_t triggerTime, std::string triggerName = "TriggerName") { //
return {index, {{gr::tag::TRIGGER_NAME.shortKey(), triggerName}, {gr::tag::TRIGGER_TIME.shortKey(), triggerTime}}};
};

gr::Tag genDropTag(std::size_t index, std::uint64_t nSamplesDropped) { //
return {index, {{gr::tag::N_DROPPED_SAMPLES.shortKey(), nSamplesDropped}}};
};

gr::Tag genDropSyncTag(std::size_t index, std::uint64_t nSamplesDropped, std::uint64_t triggerTime, std::string triggerName = "TriggerName") { //
return {index, {{gr::tag::N_DROPPED_SAMPLES.shortKey(), nSamplesDropped}, {gr::tag::TRIGGER_NAME.shortKey(), triggerName}, {gr::tag::TRIGGER_TIME.shortKey(), triggerTime}}};
};

const boost::ut::suite SyncBlockTests = [] {
using namespace boost::ut;
using namespace gr;
using namespace gr::basic;
using namespace gr::testing;

"SyncBlock basic test"_test = [] {
runTest({ //
.tolerance = 2ULL, //
.inValues = //
{ //
{1, 0, 1, 2, 3, 0, 1, 2, 3, 4, 0, 1}, //
{1, 2, 0, 1, 2, 3, 4, 0, 1, 2, 3, 0, 1, 2}, //
{1, 2, 3, 0, 1, 2, 3, 4, 5, 0, 1, 2, 0, 1, 2, 3}}, //
.inTags = //
{ //
{genSyncTag(1, 99), genSyncTag(5, 201), genSyncTag(10, 301)}, //
{genSyncTag(2, 100), genSyncTag(7, 199), genSyncTag(11, 299)}, //
{genSyncTag(3, 101), genSyncTag(9, 200), genSyncTag(12, 300)}}, //
.expectedValues = //
{ //
{1, 0, 1, 2, 3, 0, 1, 2, 0, 1}, //
{2, 0, 1, 2, 3, 0, 1, 2, 0, 1}, //
{3, 0, 1, 2, 3, 0, 1, 2, 0, 1}}, //
.expectedTags = //
{ //
{genSyncTag(1, 99), genSyncTag(5, 201), genDropSyncTag(8, 2, 301)}, //
{genDropTag(0, 1), genSyncTag(1, 100), genDropSyncTag(5, 1, 199), genDropSyncTag(8, 1, 299)}, //
{genDropTag(0, 2), genSyncTag(1, 101), genDropSyncTag(5, 2, 200), genSyncTag(8, 300)}}});
};

"SyncBlock missing tag test"_test = [] {
runTest({ //
.tolerance = 2ULL, //
.inValues = //
{ //
{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, //
{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, //
{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}}, //
.inTags = //
{ //
{genSyncTag(1, 100), genSyncTag(5, 200), genSyncTag(10, 300)}, //
{genSyncTag(2, 100), genSyncTag(10, 300)}, //
{genSyncTag(4, 200), genSyncTag(10, 300)}}, //
.expectedValues = //
{ //
{5, 6, 7, 8, 9, 10, 11}, //
{5, 6, 7, 8, 9, 10, 11}, //
{5, 6, 7, 8, 9, 10, 11}}, //
.expectedTags = //
{ //
{genDropSyncTag(0, 5, 200), genSyncTag(5, 300)}, // Sample 5 was copied to the output, including the sync tag, even though the tag was not used
{genDropTag(0, 5), genSyncTag(5, 300)}, //
{genDropTag(0, 5), genSyncTag(5, 300)}}});
};

"SyncBlock isSync test"_test = [] {
runTest({ //
.nSamples = 300'000, //
.maxHistorySize = 32'000, //
.tolerance = 2ULL, //
.inValues = {{}, {}}, //
.inTags = //
{ //
{genSyncTag(10, 100), genSyncTag(100'100, 200), genSyncTag(201'000, 300)}, //
{genSyncTag(1, 100), genSyncTag(100'000, 200), genSyncTag(200'000, 300)}}, //
.expectedValues = {}, // //
.expectedTags = //
{ //
{genDropTag(0, 9), genSyncTag(1, 100), genDropTag(65537, 91), genSyncTag(100'000, 200), genDropSyncTag(200'000, 900, 300)}, // 65537 -> depends on buffer size
{genSyncTag(1, 100), genSyncTag(100'000, 200), genSyncTag(200'000, 300)}}, //
.expectedNSamples = 299'000});
};

"SyncBlock back pressure test"_test = [] {
runTest({ //
.nSamples = 300'000, //
.maxHistorySize = 32'000, //
.tolerance = 2ULL, //
.inValues = {{}, {}}, //
.inTags = //
{ //
{genSyncTag(1, 100), genSyncTag(1000, 200), genSyncTag(200'000, 300)}, //
{genSyncTag(1, 100), genSyncTag(100'000, 200), genSyncTag(200'000, 300)}}, //
.expectedValues = {}, //
.expectedTags = //
{ //
{genSyncTag(1, 100), genDropTag(1000, 167000), genSyncTag(33'000, 300)}, //
{genSyncTag(1, 100), genDropTag(1000, 167000), genSyncTag(33'000, 300)}}, //
.expectedNSamples = 133'000});
};

"SyncBlock back pressure test 2"_test = [] {
runTest({ //
.nSamples = 300'000, //
.maxHistorySize = 32'000, //
.tolerance = 2ULL, //
.inValues = {{}, {}}, //
.inTags = {{genSyncTag(100'000, 100)}, {genSyncTag(101'000, 100)}}, //
.expectedValues = {}, //
.expectedTags = {{genDropTag(0, 68000), genSyncTag(32'000, 100)}, {genDropTag(0, 69000), genSyncTag(32'000, 100)}}, //
.expectedNSamples = 231'000});
};
};

int main() {}
8 changes: 4 additions & 4 deletions core/include/gnuradio-4.0/HistoryBuffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ class HistoryBuffer {
*/
template<typename Iter>
constexpr void push_back_bulk(Iter cbegin, Iter cend) noexcept {
for (auto it = cbegin; it != cend; ++it) {
const auto nSamplesToCopy = std::distance(cbegin, cend);
Iter optimizedBegin = static_cast<std::size_t>(nSamplesToCopy) > _capacity ? std::prev(cend, static_cast<std::ptrdiff_t>(_capacity)) : cbegin;
for (auto it = optimizedBegin; it != cend; ++it) {
push_back(*it);
}
}
Expand All @@ -112,9 +114,7 @@ class HistoryBuffer {
*/
template<typename Range>
constexpr void push_back_bulk(const Range& range) noexcept {
for (const auto& item : range) {
push_back(item);
}
push_back_bulk(range.cbegin(), range.cend());
}

/**
Expand Down
3 changes: 2 additions & 1 deletion core/include/gnuradio-4.0/Tag.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ inline EM_CONSTEXPR_STATIC DefaultTag<"signal_name", std::string, "", "signal na
inline EM_CONSTEXPR_STATIC DefaultTag<"signal_unit", std::string, "", "signal's physical SI unit"> SIGNAL_UNIT;
inline EM_CONSTEXPR_STATIC DefaultTag<"signal_min", float, "a.u.", "signal physical max. (e.g. DAQ) limit"> SIGNAL_MIN;
inline EM_CONSTEXPR_STATIC DefaultTag<"signal_max", float, "a.u.", "signal physical max. (e.g. DAQ) limit"> SIGNAL_MAX;
inline EM_CONSTEXPR_STATIC DefaultTag<"n_dropped_samples", std::size_t, "", "number of dropped samples"> N_DROPPED_SAMPLES;
inline EM_CONSTEXPR_STATIC DefaultTag<"trigger_name", std::string> TRIGGER_NAME;
inline EM_CONSTEXPR_STATIC DefaultTag<"trigger_time", uint64_t, "ns", "UTC-based time-stamp"> TRIGGER_TIME;
inline EM_CONSTEXPR_STATIC DefaultTag<"trigger_offset", float, "s", "sample delay w.r.t. the trigger (e.g.compensating analog group delays)"> TRIGGER_OFFSET;
Expand All @@ -179,7 +180,7 @@ inline EM_CONSTEXPR_STATIC DefaultTag<"reset_default", bool, "", "reset block st
inline EM_CONSTEXPR_STATIC DefaultTag<"store_default", bool, "", "store block settings as default"> STORE_DEFAULTS;
inline EM_CONSTEXPR_STATIC DefaultTag<"end_of_stream", bool, "", "end of stream, receiver should change to DONE state"> END_OF_STREAM;

inline constexpr std::array<std::string_view, 14> kDefaultTags = {"sample_rate", "signal_name", "signal_quantity", "signal_unit", "signal_min", "signal_max", "trigger_name", "trigger_time", "trigger_offset", "trigger_meta_info", "context", "reset_default", "store_default", "end_of_stream"};
inline constexpr std::array<std::string_view, 15> kDefaultTags = {"sample_rate", "signal_name", "signal_quantity", "signal_unit", "signal_min", "signal_max", "n_dropped_samples", "trigger_name", "trigger_time", "trigger_offset", "trigger_meta_info", "context", "reset_default", "store_default", "end_of_stream"};

} // namespace tag

Expand Down
8 changes: 4 additions & 4 deletions core/test/qa_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -792,10 +792,10 @@ const boost::ut::suite HistoryBufferTest = [] {
auto in = std::vector{1, 2, 3, 4, 5, 6};
hb_overflow.push_back_bulk(in.begin(), in.end());
expect(eq(hb_overflow[0], 6));
hb_overflow.push_back_bulk(std::vector{7, 8, 9});
expect(eq(hb_overflow[0], 9));
hb_overflow.push_back_bulk(std::array{10, 11, 12});
expect(eq(hb_overflow[0], 12));
hb_overflow.push_back_bulk(std::vector{7, 8, 9, 10, 11, 12, 13, 14});
expect(eq(hb_overflow[0], 14));
hb_overflow.push_back_bulk(std::array{15, 16, 17});
expect(eq(hb_overflow[0], 17));

// Test with different types, e.g., double
HistoryBuffer<double> hb_double(5);
Expand Down

0 comments on commit 47a2067

Please sign in to comment.