Skip to content

Commit

Permalink
StreamToDataSet: if n_max is set, produce only one DataSet with maxim…
Browse files Browse the repository at this point in the history
…um n_max samples

Signed-off-by: drslebedev <[email protected]>
  • Loading branch information
drslebedev authored and RalphSteinhagen committed Sep 30, 2024
1 parent 24dd79b commit f94b029
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 40 deletions.
54 changes: 29 additions & 25 deletions blocks/basic/include/gnuradio-4.0/basic/StreamToDataSet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,15 @@ If multiple 'start' or 'stop' Tags arrive in a single merged tag, only one DataS
newBuffer.push_back_bulk(_history);
_history = std::move(newBuffer);
}

if constexpr (!streamOut) {
if (newSettings.contains("n_pre") || newSettings.contains("n_post") || newSettings.contains("n_max")) {
if (n_max != 0UZ && n_pre + n_post > n_max) {
using namespace gr::message;
throw gr::exception(fmt::format("ill-formed settings: n_pre({}) + n_post({}) > n_max({})", n_pre, n_post, n_max));
}
}
}
}

gr::work::Status processBulk(ConsumableSpan auto& inSamples /* equivalent to std::span<const T> */, PublishableSpan auto& outSamples /* equivalent to std::span<T> */) {
Expand Down Expand Up @@ -222,13 +231,16 @@ If multiple 'start' or 'stop' Tags arrive in a single merged tag, only one DataS
auto& accState = _accState[i];

// Note: Tags for pre-samples are not added to DataSet
const Tag& mergedTag = this->mergedInputTag();
if (!ds.timing_events.empty() && !mergedTag.map.empty() && accState.isActive) {
ds.timing_events[0].emplace_back(Tag{static_cast<Tag::signed_index_type>(ds.signal_values.size()), mergedTag.map});
if (n_max.value > ds.signal_values.size()) { // do not add Tags if DataSet is full
const Tag& mergedTag = this->mergedInputTag();
if (!ds.timing_events.empty() && !mergedTag.map.empty() && accState.isActive) {
ds.timing_events[0].emplace_back(Tag{static_cast<Tag::signed_index_type>(ds.signal_values.size()), mergedTag.map});
}
}

// pre samples data accumulation
if (accState.isPreActive) {
// no need to check for n_max here: n_pre + n_post <= n_max
const std::size_t nPreSamplesToCopy = std::min(static_cast<std::size_t>(n_pre.value), _history.size()); // partially write pre samples if not enough samples stored in HistoryBuffer
const auto historyEnd = std::next(_history.cbegin(), static_cast<std::ptrdiff_t>(nPreSamplesToCopy));
ds.signal_values.insert(ds.signal_values.end(), std::make_reverse_iterator(historyEnd), std::make_reverse_iterator(_history.cbegin()));
Expand All @@ -239,14 +251,21 @@ If multiple 'start' or 'stop' Tags arrive in a single merged tag, only one DataS
}

if (!accState.isPostActive) { // normal data accumulation
ds.signal_values.insert(ds.signal_values.end(), inSamples.begin(), inSamples.end());
fillAxisValues(ds, static_cast<int>(accState.nSamples - accState.nPreSamples), inSamples.size());
accState.nSamples += inSamples.size();
const std::size_t nSamplesToCopy = std::min(n_max.value - ds.signal_values.size(), inSamples.size());
if (nSamplesToCopy > 0) {
ds.signal_values.insert(ds.signal_values.end(), inSamples.begin(), inSamples.begin() + nSamplesToCopy);
fillAxisValues(ds, static_cast<int>(accState.nSamples - accState.nPreSamples), nSamplesToCopy);
accState.nSamples += nSamplesToCopy;
}
} else { // post samples data accumulation
const std::size_t nPostSamplesToCopy = std::min(accState.nPostSamplesRemain, inSamples.size());
ds.signal_values.insert(ds.signal_values.end(), inSamples.begin(), std::next(inSamples.begin(), static_cast<std::ptrdiff_t>(nPostSamplesToCopy)));
fillAxisValues(ds, static_cast<int>(accState.nSamples - accState.nPreSamples), nPostSamplesToCopy);
accState.updatePostSamples(nPostSamplesToCopy);
const std::size_t nPostSamplesToCopy = std::min({n_max.value - ds.signal_values.size(), accState.nPostSamplesRemain, inSamples.size()});
if (nPostSamplesToCopy > 0) {
ds.signal_values.insert(ds.signal_values.end(), inSamples.begin(), std::next(inSamples.begin(), static_cast<std::ptrdiff_t>(nPostSamplesToCopy)));
fillAxisValues(ds, static_cast<int>(accState.nSamples - accState.nPreSamples), nPostSamplesToCopy);
accState.updatePostSamples(nPostSamplesToCopy);
} else {
accState.isActive = false;
}
}
}
this->_mergedInputTag.map.clear(); // ensure that the input tag is only propagated once
Expand Down Expand Up @@ -274,21 +293,6 @@ If multiple 'start' or 'stop' Tags arrive in a single merged tag, only one DataS
break;
}
}

// publish dataset partially if signal_values.size() > n_max
for (std::size_t i = 0; i < _tempDataSets.size(); i++) {
auto& ds = _tempDataSets[i];
if (n_max != 0UZ && ds.signal_values.size() >= n_max) {
assert(!ds.extents.empty());
ds.extents[1] = static_cast<std::int32_t>(ds.signal_values.size());
gr::dataset::updateMinMax(ds);
outSamples[publishedCounter] = std::move(ds);
DataSet<T> newDs;
initNewDataSet(newDs);
_tempDataSets[i] = newDs; // start again from empty DataSet
publishedCounter++;
}
}
outSamples.publish(publishedCounter);

return work::Status::OK;
Expand Down
54 changes: 39 additions & 15 deletions blocks/basic/test/qa_StreamToDataSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ const boost::ut::suite<"StreamToStream test"> streamToStreamTest = [] {
using namespace gr::basic;
using namespace gr::testing;

auto runTestStream = [](gr::Size_t nSamples, std::string filter, gr::Size_t preSamples, gr::Size_t postSamples, const std::vector<float>& expectedValues, std::size_t nTags, gr::Size_t maxSamples = 100000U) {
auto runTestStream = [](gr::Size_t nSamples, std::string filter, gr::Size_t preSamples, gr::Size_t postSamples, const std::vector<float>& expectedValues, std::size_t nTags) {
constexpr float sample_rate = 1'000.f;
Graph graph;

Expand All @@ -140,7 +140,7 @@ const boost::ut::suite<"StreamToStream test"> streamToStreamTest = [] {
genTrigger(22, "CMD_DIAG_TRIGGER1", "CMD_DIAG_TRIGGER1") // it is also used as end trigger for "including" mode
};

const property_map blockSettings = {{"filter", filter}, {"n_pre", preSamples}, {"n_post", postSamples}, {"n_max", maxSamples}};
const property_map blockSettings = {{"filter", filter}, {"n_pre", preSamples}, {"n_post", postSamples}};
auto& filterStreamToStream = graph.emplaceBlock<StreamFilter<float>>(blockSettings);
auto& streamSink = graph.emplaceBlock<TagSink<float, ProcessFunction::USE_PROCESS_ONE>>({{"name", "streamSink"}, {"log_tags", true}, {"log_samples", true}, {"verbose_console", false}});
expect(eq(gr::ConnectionResult::SUCCESS, graph.connect<"out">(tagSrc).template to<"in">(filterStreamToStream)));
Expand Down Expand Up @@ -244,28 +244,52 @@ const boost::ut::suite<"StreamToDataSet test"> streamToDataSetTest = [] {
};

std::vector<std::vector<float>> expectedValues = {{5, 6, 7, 8, 9}, {15, 16, 17, 18, 19, 20, 21, 22, 23, 24}, {20, 21, 22, 23, 24, 25, 26, 27, 28, 29}};
"start->stop matcher (excluding)"_test = [&runTestDataSet, &expectedValues] { runTestDataSet(50U, "[CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1, CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=2]", 0, 0, expectedValues, {3UZ, 3UZ, 4UZ}); };
"start->stop (excluding)"_test = [&runTestDataSet, &expectedValues] { runTestDataSet(50U, "[CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1, CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=2]", 0, 0, expectedValues, {3UZ, 3UZ, 4UZ}); };

expectedValues = {{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, //
{8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31}, //
{13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36}};
"start->stop matcher (excluding +pre/post)"_test = [&runTestDataSet, &expectedValues] { runTestDataSet(50U, "[CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1, CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=2]", 7, 7, expectedValues, {5UZ, 5UZ, 5UZ}); };
expectedValues = {{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, //
{8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31}, //
{13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36}};
"start->stop (excluding +pre/post)"_test = [&runTestDataSet, &expectedValues] { runTestDataSet(50U, "[CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1, CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=2]", 7, 7, expectedValues, {5UZ, 5UZ, 5UZ}); };

expectedValues = {{5, 6, 7, 8, 9, 10, 11}, //
{15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26}, //
{20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31}};
"start->^stop matcher (including)"_test = [&runTestDataSet, &expectedValues] { runTestDataSet(50U, "[CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1, CMD_BP_START/^FAIR.SELECTOR.C=1:S=1:P=2]", 0, 0, expectedValues, {4UZ, 4UZ, 5UZ}); };
expectedValues = {{5, 6, 7, 8, 9, 10, 11}, //
{15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26}, //
{20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31}};
"start->^stop (including)"_test = [&runTestDataSet, &expectedValues] { runTestDataSet(50U, "[CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1, CMD_BP_START/^FAIR.SELECTOR.C=1:S=1:P=2]", 0, 0, expectedValues, {4UZ, 4UZ, 5UZ}); };

expectedValues = {{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18}, //
{8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33}, //
{13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38}};
"start->^stop matcher (including. +pre/post)"_test = [&runTestDataSet, &expectedValues] { runTestDataSet(50U, "[CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1, CMD_BP_START/^FAIR.SELECTOR.C=1:S=1:P=2]", 7, 7, expectedValues, {5UZ, 6UZ, 5UZ}); };
expectedValues = {{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18}, //
{8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33}, //
{13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38}};
"start->^stop (including. +pre/post)"_test = [&runTestDataSet, &expectedValues] { runTestDataSet(50U, "[CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1, CMD_BP_START/^FAIR.SELECTOR.C=1:S=1:P=2]", 7, 7, expectedValues, {5UZ, 6UZ, 5UZ}); };

expectedValues = {{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14}, //
{5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18}, //
{20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33}, //
{25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38}};
"single trigger (+pre/post)"_test = [&runTestDataSet, &expectedValues] { runTestDataSet(50U, "CMD_DIAG_TRIGGER1", 7, 7, expectedValues, {3UZ, 2UZ, 3UZ, 1UZ}); };

// n_max test
gr::Size_t nMaxSamples = 6;
expectedValues = {{5, 6, 7, 8, 9}, {15, 16, 17, 18, 19, 20}, {20, 21, 22, 23, 24, 25}};
"start->stop (excluding, n_max)"_test = [&runTestDataSet, &expectedValues, &nMaxSamples] { runTestDataSet(50U, "[CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1, CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=2]", 0, 0, expectedValues, {3UZ, 2UZ, 2UZ}, nMaxSamples); };

nMaxSamples = 14;
expectedValues = {{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}, {8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21}, {13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26}};
"start->stop (excluding +pre/post, n_max)"_test = [&runTestDataSet, &expectedValues, &nMaxSamples] { runTestDataSet(50U, "[CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1, CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=2]", 7, 7, expectedValues, {4UZ, 2UZ, 2UZ}, nMaxSamples); };

nMaxSamples = 6;
expectedValues = {{5, 6, 7, 8, 9, 10}, {15, 16, 17, 18, 19, 20}, {20, 21, 22, 23, 24, 25}};
"start->^stop (including, n_max)"_test = [&runTestDataSet, &expectedValues, &nMaxSamples] { runTestDataSet(50U, "[CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1, CMD_BP_START/^FAIR.SELECTOR.C=1:S=1:P=2]", 0, 0, expectedValues, {3UZ, 2UZ, 2UZ}, nMaxSamples); };

nMaxSamples = 14;
expectedValues = {{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}, {8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21}, {13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26}};
"start->^stop (including. +pre/post, n_max)"_test = [&runTestDataSet, &expectedValues, &nMaxSamples] { runTestDataSet(50U, "[CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1, CMD_BP_START/^FAIR.SELECTOR.C=1:S=1:P=2]", 7, 7, expectedValues, {4UZ, 2UZ, 2UZ}, nMaxSamples); };

nMaxSamples = 14;
expectedValues = {{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14}, //
{5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18}, //
{20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33}, //
{25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38}};
"single trigger (+pre/post, n_max)"_test = [&runTestDataSet, &expectedValues, &nMaxSamples] { runTestDataSet(50U, "CMD_DIAG_TRIGGER1", 7, 7, expectedValues, {3UZ, 2UZ, 3UZ, 1UZ}, nMaxSamples); };
};

int main() { /* not needed for UT */ }

0 comments on commit f94b029

Please sign in to comment.