diff --git a/blocks/basic/include/gnuradio-4.0/basic/StreamToDataSet.hpp b/blocks/basic/include/gnuradio-4.0/basic/StreamToDataSet.hpp index aa37015f..ac8b3015 100644 --- a/blocks/basic/include/gnuradio-4.0/basic/StreamToDataSet.hpp +++ b/blocks/basic/include/gnuradio-4.0/basic/StreamToDataSet.hpp @@ -2,20 +2,30 @@ #define GNURADIO_STREAMTODATASET_HPP #include "gnuradio-4.0/TriggerMatcher.hpp" -#include #include #include #include +#include #include namespace gr::basic { template - requires(std::is_arithmetic_v || gr::meta::complex_like) -struct StreamFilterImpl : Block, Doc || gr::meta::complex_like) +struct StreamFilterImpl : Block> { + using Description = Doc based on tag-based pre- / post-conditions +Notes: +* n_pre must be less than the size of the output port's CircularBuffer. The block waits until all n_pre samples can be written to the output in a single iteration. +* In stream-to-stream mode, the 'stop' Tag is not included in the output stream. +* In stream-to-dataset mode, the 'stop' Tag is included in the DataSet output. +* Stream-to-stream mode has limited support for overlapping triggers. Configurations such as Start-Stop-Start-Stop work as expected, +but configurations like Start-Start-Stop-Stop result in undefined behavior. +If data accumulation is active, the second 'start' is ignored, and the output may vary depending on subsequent tags. +* Tags for pre-samples are not stored and are not included in the output. +* It is expected that 'start' and 'stop' Tags will come from different iterations. +If multiple 'start' or 'stop' Tags arrive in a single merged tag, only one DataSet is created. This may lead to incorrect or unexpected output.)">; -)"">> { constexpr static std::size_t MIN_BUFFER_SIZE = 1024U; template // optional annotation shortening using A = Annotated; @@ -27,8 +37,7 @@ struct StreamFilterImpl : Block, Doc/, /]'">> filter; - A> filterState; - A> n_pre = 0U; + A> n_pre = 0U; // Note: It is assumed that n_pre <= output port CircularBuffer size, and we wait until all n_pre samples can be written to the output in a single iteration. A> n_post = 0U; A> n_max = 0U; @@ -41,150 +50,285 @@ struct StreamFilterImpl : Block, Doc> signal_max = 1.f; // internal trigger state - HistoryBuffer _history{ MIN_BUFFER_SIZE + n_pre }; + HistoryBuffer _history{MIN_BUFFER_SIZE + n_pre}; TMatcher _matcher{}; - bool _accumulationActive = false; - std::size_t _nSamplesWritten = 0UZ; - std::size_t _nPostSamplesWritten = 0UZ; - DataSet _tempDataSet; - bool _dataSetAccumulation = false; - - void - reset() { - filterState.value.clear(); - _accumulationActive = false; - _dataSetAccumulation = false; + + struct AccumulationState { + bool isActive = false; + bool isPreActive = false; + bool isPostActive = false; + bool isSingleTrigger = false; + std::size_t nPostSamplesRemain = 0UZ; + std::size_t nPreSamples = 0UZ; + std::size_t nSamples = 0UZ; + + void update(bool startTrigger, bool endTrigger, bool isSingle, gr::Size_t nPre, gr::Size_t nPost) { + isSingleTrigger = isSingle; + if (!isActive) { + if (startTrigger) { + isPreActive = nPre > 0; // No pre samples -> Done + isActive = true; + nSamples = 0UZ; + if (isSingleTrigger) { + isPostActive = true; + nPostSamplesRemain = nPost; + } + } + } + + if (isActive && !isPostActive && endTrigger) { + isPostActive = true; + nPostSamplesRemain = nPost; + } + } + + void updatePostSamples(std::size_t nPostSamplesToCopy) { + nPostSamplesRemain -= nPostSamplesToCopy; + nSamples += nPostSamplesToCopy; + + if (nPostSamplesRemain == 0UZ) { + isActive = false; + isPostActive = false; + } + } + + void reset() { + isActive = false; + isPreActive = false; + isPostActive = false; + nPostSamplesRemain = 0UZ; + } + }; + + std::conditional_t> _accState{}; + std::deque> _tempDataSets; + std::conditional_t> _filterState; + + void reset() { + _filterState.clear(); + if constexpr (streamOut) { + _accState.reset(); + } else { + _tempDataSets.clear(); + _accState.clear(); + } } - void - settingsChanged(const gr::property_map & /*oldSettings*/, const gr::property_map &newSettings) { + void settingsChanged(const gr::property_map& /*oldSettings*/, const gr::property_map& newSettings) { if (newSettings.contains("n_pre")) { + if constexpr (streamOut) { + if (n_pre.value > out.buffer().streamBuffer.size()) { + using namespace gr::message; + throw gr::exception("n_pre must be <= output port CircularBuffer size"); + } + } auto newBuffer = HistoryBuffer(MIN_BUFFER_SIZE + std::get(newSettings.at("n_pre"))); newBuffer.push_back_bulk(_history); _history = std::move(newBuffer); } } - gr::work::Status - processBulk(ConsumableSpan auto inSamples /* equivalent to std::span */, PublishableSpan auto &outSamples /* equivalent to std::span */) { - bool firstTrigger = false; - bool lastTrigger = false; - if (const trigger::MatchResult matchResult = _matcher(filter.value, this->mergedInputTag(), filterState.value); matchResult != trigger::MatchResult::Ignore) { - assert(filterState.value.contains("isSingleTrigger")); - _accumulationActive = (matchResult == trigger::MatchResult::Matching); - firstTrigger = _accumulationActive; - lastTrigger = !_accumulationActive || std::get(filterState.value.at("isSingleTrigger")); - if (std::get(filterState.value.at("isSingleTrigger"))) { // handled by the n_pre and n_post settings - _accumulationActive = false; - } - if constexpr (!streamOut) { - if (firstTrigger) { - _tempDataSet = DataSet(); - initNewDataSet(_tempDataSet); - } - } - } - if (lastTrigger) { - _nPostSamplesWritten = n_post; + gr::work::Status processBulk(ConsumableSpan auto& inSamples /* equivalent to std::span */, PublishableSpan auto& outSamples /* equivalent to std::span */) { + if constexpr (streamOut) { + return processBulkStream(inSamples, outSamples); + } else { + return processBulkDataSet(inSamples, outSamples); } + } - const std::size_t nInAvailable = inSamples.size(); - std::size_t nOutAvailable = outSamples.size(); - std::size_t nSamplesToPublish = 0UZ; + gr::work::Status processBulkStream(ConsumableSpan auto& inSamples, PublishableSpan auto& outSamples) { + const auto [startTrigger, endTrigger, isSingleTrigger] = detectTrigger(_filterState); + _accState.update(startTrigger, endTrigger, isSingleTrigger, n_pre, n_post); - if (firstTrigger) { // handle pre-trigger samples kept in _history - _nSamplesWritten = 0UZ; - if constexpr (streamOut) { - if (n_pre >= nOutAvailable) { + if (!_accState.isActive) { // If accumulation is not active, consume all input samples and publish 0 samples. + copyInputSamplesToHistory(inSamples, inSamples.size()); + std::ignore = inSamples.consume(inSamples.size()); + outSamples.publish(0UZ); + } else { // accumulation is active + std::size_t nOutAvailable = outSamples.size(); + std::size_t nSamplesToPublish = 0UZ; + + // pre samples data accumulation + auto nPreSamplesToCopy = 0UZ; + if (_accState.isPreActive) { + // Note: It is assumed that n_pre <= output port CircularBuffer size, and we wait until all n_pre samples can be written to the output in a single iteration. + nPreSamplesToCopy = std::min(static_cast(n_pre.value), _history.size()); // partially write pre samples if not enough samples stored in HistoryBuffer + if (nPreSamplesToCopy > nOutAvailable) { std::ignore = inSamples.consume(0UZ); outSamples.publish(0UZ); return work::Status::INSUFFICIENT_OUTPUT_ITEMS; } - const auto nPreSamplesToPublish = static_cast(n_pre.value); - std::ranges::copy_n(_history.cbegin(), static_cast(std::min(nPreSamplesToPublish, nOutAvailable)), outSamples.begin()); - nSamplesToPublish += n_pre; - nOutAvailable -= n_pre; - } else { - _tempDataSet.signal_values.insert(_tempDataSet.signal_values.end(), _history.cbegin(), std::next(_history.cbegin(), static_cast(n_pre.value))); - for (int i = -static_cast(n_pre); i < 0; i++) { - _tempDataSet.axis_values[0].emplace_back(static_cast(i) / sample_rate); - } + auto startIt = std::next(_history.begin(), static_cast(nPreSamplesToCopy)); + std::ranges::copy_n(std::make_reverse_iterator(startIt), static_cast(nPreSamplesToCopy), outSamples.begin()); + nSamplesToPublish += nPreSamplesToCopy; + nOutAvailable -= nPreSamplesToCopy; + _accState.isPreActive = false; + _accState.nSamples += nPreSamplesToCopy; } - _nSamplesWritten += n_pre; - } - if constexpr (!streamOut) { // move tags into DataSet - const Tag &mergedTag = this->mergedInputTag(); - if (!_tempDataSet.timing_events.empty() && !mergedTag.map.empty() && (_accumulationActive || _nPostSamplesWritten > 0 || std::get(filterState.value.at("isSingleTrigger")))) { - _tempDataSet.timing_events[0].emplace_back(Tag{ static_cast(_nSamplesWritten), mergedTag.map }); + if (!_accState.isPostActive) { // normal data accumulation + const std::size_t nSamplesToCopy = std::min(inSamples.size(), nOutAvailable); + std::ranges::copy_n(inSamples.begin(), static_cast(nSamplesToCopy), std::next(outSamples.begin(), static_cast(nSamplesToPublish))); + nSamplesToPublish += nSamplesToCopy; + _accState.nSamples += nSamplesToCopy; + } else { // post samples data accumulation + const std::size_t nPostSamplesToCopy = std::min(_accState.nPostSamplesRemain, std::min(inSamples.size(), nOutAvailable)); + std::ranges::copy_n(inSamples.begin(), static_cast(nPostSamplesToCopy), std::next(outSamples.begin(), static_cast(nSamplesToPublish))); + nSamplesToPublish += nPostSamplesToCopy; + _accState.updatePostSamples(nPostSamplesToCopy); } - this->_mergedInputTag.map.clear(); // ensure that the input tag is only propagated once + + copyInputSamplesToHistory(inSamples, nSamplesToPublish - nPreSamplesToCopy); + std::ignore = inSamples.consume(nSamplesToPublish - nPreSamplesToCopy); + outSamples.publish(nSamplesToPublish); } + return work::Status::OK; + } - std::size_t nSamplesToCopy = 0UZ; - if (_accumulationActive) { // handle normal data accumulation - if constexpr (streamOut) { - nSamplesToCopy += std::min(nInAvailable, nOutAvailable); - std::copy_n(inSamples.begin(), static_cast(nSamplesToCopy), std::next(outSamples.begin(), static_cast(nSamplesToPublish))); - nOutAvailable -= nSamplesToCopy; - } else { - nSamplesToCopy += nInAvailable; - _tempDataSet.signal_values.insert(_tempDataSet.signal_values.end(), inSamples.begin(), inSamples.end()); - _tempDataSet.axis_values[0].reserve(_nSamplesWritten + nInAvailable); - for (auto i = 0U; i < nInAvailable; i++) { - _tempDataSet.axis_values[0].emplace_back(static_cast(_nSamplesWritten - n_pre + i) / sample_rate); + gr::work::Status processBulkDataSet(ConsumableSpan auto& inSamples, PublishableSpan auto& outSamples) { + // This is a workaround to support cases of overlapping datasets, for example, Start1-Start2-Stop1-Stop2 case. + // always add new DataSet when Start trigger is present + property_map tmpFilterState; + const auto [startTrigger, endTrigger, isSingleTrigger] = detectTrigger(tmpFilterState); + if (startTrigger) { + _tempDataSets.emplace_back(); + initNewDataSet(_tempDataSets.back()); + + _accState.emplace_back(); + _accState.back().update(startTrigger, endTrigger, isSingleTrigger, n_pre, n_post); + + _filterState.push_back(tmpFilterState); + } + + // Update state only for the front dataset which is not in the isPostActiveState + for (std::size_t i = 0; i < _tempDataSets.size(); i++) { + if (!_accState[i].isPostActive) { + const auto [startTrigger2, endTrigger2, isSingleTrigger2] = detectTrigger(_filterState[i]); + if (endTrigger2) { + _accState[i].update(startTrigger2, endTrigger2, isSingleTrigger2, n_pre, n_post); } + break; // only the first one should be updated } - _nSamplesWritten += nSamplesToCopy; - nSamplesToPublish += nSamplesToCopy; } - if (_nPostSamplesWritten > 0) { // handle post-trigger samples - if constexpr (streamOut) { - const std::size_t nPostSamplesToPublish = std::min(_nPostSamplesWritten, std::min(nInAvailable, nOutAvailable)); - std::copy_n(inSamples.begin(), nPostSamplesToPublish, std::next(outSamples.begin(), static_cast(nSamplesToPublish))); - nSamplesToCopy += nPostSamplesToPublish; - nSamplesToPublish += nPostSamplesToPublish; - _nSamplesWritten += nPostSamplesToPublish; - _nPostSamplesWritten -= nPostSamplesToPublish; - } else { - const std::size_t nPostSamplesToPublish = std::min(_nPostSamplesWritten, nInAvailable); - _tempDataSet.signal_values.insert(_tempDataSet.signal_values.end(), inSamples.begin(), std::next(inSamples.begin(), static_cast(nPostSamplesToPublish))); - for (std::size_t i = 0; i < nPostSamplesToPublish; i++) { - _tempDataSet.axis_values[0].emplace_back(static_cast(_nSamplesWritten - n_pre + i) / sample_rate); + if (_tempDataSets.empty()) { // If accumulation is not active (no active DataSets), consume all input samples and publish 0 samples. + copyInputSamplesToHistory(inSamples, inSamples.size()); + std::ignore = inSamples.consume(inSamples.size()); + outSamples.publish(0UZ); + return work::Status::OK; + } else { // accumulation is active (at least one DataSets is active) + for (std::size_t i = 0; i < _tempDataSets.size(); i++) { + auto& ds = _tempDataSets[i]; + auto& accState = _accState[i]; + + // Note: Tags for pre-samples are not added to DataSet + const Tag& mergedTag = this->mergedInputTag(); + if (!ds.timing_events.empty() && !mergedTag.map.empty() && accState.isActive) { + ds.timing_events[0].emplace_back(Tag{static_cast(ds.signal_values.size()), mergedTag.map}); + } + + // pre samples data accumulation + if (accState.isPreActive) { + const std::size_t nPreSamplesToCopy = std::min(static_cast(n_pre.value), _history.size()); // partially write pre samples if not enough samples stored in HistoryBuffer + const auto historyEnd = std::next(_history.cbegin(), static_cast(nPreSamplesToCopy)); + ds.signal_values.insert(ds.signal_values.end(), std::make_reverse_iterator(historyEnd), std::make_reverse_iterator(_history.cbegin())); + fillAxisValues(ds, -static_cast(nPreSamplesToCopy), nPreSamplesToCopy); + accState.isPreActive = false; + accState.nPreSamples = nPreSamplesToCopy; + accState.nSamples += nPreSamplesToCopy; + } + + if (!accState.isPostActive) { // normal data accumulation + ds.signal_values.insert(ds.signal_values.end(), inSamples.begin(), inSamples.end()); + fillAxisValues(ds, static_cast(accState.nSamples - accState.nPreSamples), inSamples.size()); + accState.nSamples += inSamples.size(); + } else { // post samples data accumulation + const std::size_t nPostSamplesToCopy = std::min(accState.nPostSamplesRemain, inSamples.size()); + ds.signal_values.insert(ds.signal_values.end(), inSamples.begin(), std::next(inSamples.begin(), static_cast(nPostSamplesToCopy))); + fillAxisValues(ds, static_cast(accState.nSamples - accState.nPreSamples), nPostSamplesToCopy); + accState.updatePostSamples(nPostSamplesToCopy); } - _nSamplesWritten += nPostSamplesToPublish; - _nPostSamplesWritten -= nPostSamplesToPublish; } + this->_mergedInputTag.map.clear(); // ensure that the input tag is only propagated once } - if (n_pre > 0) { // copy the last min(n_pre, nInAvailable) samples to the history buffer - _history.push_back_bulk(std::prev(inSamples.end(), static_cast(std::min(static_cast(n_pre), nInAvailable))), inSamples.end()); - } + copyInputSamplesToHistory(inSamples, inSamples.size()); + std::ignore = inSamples.consume(inSamples.size()); - std::ignore = inSamples.consume(nInAvailable); - if constexpr (streamOut) { - outSamples.publish(nSamplesToPublish); - } else { - // publish a single sample if the DataSet accumulation is complete, otherwise publish 0UZ - if (((lastTrigger || (std::get(filterState.value.at("isSingleTrigger")) && _nSamplesWritten > 0)) && _nPostSamplesWritten == 0) - || (_dataSetAccumulation && _tempDataSet.signal_values.size() >= n_max)) { - assert(!_tempDataSet.extents.empty()); - _tempDataSet.extents[1] = static_cast(_nSamplesWritten); - gr::dataset::updateMinMax(_tempDataSet); - outSamples[0] = std::move(_tempDataSet); - outSamples.publish(1UZ); - _nSamplesWritten = 0UZ; + // publish all completed DataSet + std::size_t publishedCounter = 0UZ; + while (true) { + if (!_tempDataSets.empty() && !_accState.front().isActive) { + auto& ds = _tempDataSets.front(); + assert(!ds.extents.empty()); + ds.extents[1] = static_cast(ds.signal_values.size()); + if (!ds.signal_values.empty()) { // TODO: do we need to publish empty DataSet at all, empty DataSet can occur when n_max is set. + gr::dataset::updateMinMax(ds); + } + outSamples[publishedCounter] = std::move(ds); + _tempDataSets.pop_front(); + _accState.pop_front(); + _filterState.pop_front(); + publishedCounter++; } else { - outSamples.publish(0UZ); + break; } } + + // publish dataset partially if signal_values.size() > n_max + for (std::size_t i = 0; i < _tempDataSets.size(); i++) { + auto& ds = _tempDataSets[i]; + if (n_max != 0UZ && ds.signal_values.size() >= n_max) { + assert(!ds.extents.empty()); + ds.extents[1] = static_cast(ds.signal_values.size()); + gr::dataset::updateMinMax(ds); + outSamples[publishedCounter] = std::move(ds); + DataSet newDs; + initNewDataSet(newDs); + _tempDataSets[i] = newDs; // start again from empty DataSet + publishedCounter++; + } + } + outSamples.publish(publishedCounter); + return work::Status::OK; } private: - void - initNewDataSet(DataSet &dataSet) const { + auto detectTrigger(property_map& filterState) { + struct { + bool startTrigger = false; + bool endTrigger = false; + bool isSingleTrigger = false; + } result; + + const trigger::MatchResult matchResult = _matcher(filter.value, this->mergedInputTag(), filterState); + if (matchResult != trigger::MatchResult::Ignore) { + assert(filterState.contains("isSingleTrigger")); + result.startTrigger = matchResult == trigger::MatchResult::Matching; + result.endTrigger = matchResult == trigger::MatchResult::NotMatching; + result.isSingleTrigger = std::get(filterState.at("isSingleTrigger")); + } + return result; + } + + void copyInputSamplesToHistory(ConsumableSpan auto& inSamples, std::size_t maxSamplesToCopy) { + if (n_pre > 0) { + const auto samplesToCopy = std::min(maxSamplesToCopy, inSamples.size()); + const auto end = std::next(inSamples.begin(), static_cast(samplesToCopy)); + const auto optimizedSamplesToCopy = std::min(static_cast(_history.capacity()), samplesToCopy); + _history.push_back_bulk(std::prev(end, static_cast(optimizedSamplesToCopy)), end); + } + } + + void fillAxisValues(DataSet& ds, int start, std::size_t nSamples) { + ds.axis_values[0].reserve(ds.axis_values[0].size() + nSamples); + for (int j = 0; j < static_cast(nSamples); j++) { + ds.axis_values[0].emplace_back(static_cast(start + j) / sample_rate); + } + } + + void initNewDataSet(DataSet& dataSet) const { dataSet.axis_names.emplace_back("time"); dataSet.axis_units.emplace_back("s"); dataSet.axis_values.resize(1UZ); @@ -214,14 +358,9 @@ using StreamFilter = StreamFilterImpl; } // namespace gr::basic -ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T, bool streamOut, typename Matcher), (gr::basic::StreamFilterImpl), filter, in, out, filter, n_pre, n_post, n_max, sample_rate, - signal_name, signal_quantity, signal_unit, signal_min, signal_max); +ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T, bool streamOut, typename Matcher), (gr::basic::StreamFilterImpl), filter, in, out, filter, n_pre, n_post, n_max, sample_rate, signal_name, signal_quantity, signal_unit, signal_min, signal_max); static_assert(gr::HasProcessBulkFunction>); -inline static auto registerStreamFilters - = gr::registerBlock, std::complex>( - gr::globalBlockRegistry()) - | gr::registerBlock, std::complex>( - gr::globalBlockRegistry()); +inline static auto registerStreamFilters = gr::registerBlock, std::complex>(gr::globalBlockRegistry()) | gr::registerBlock, std::complex>(gr::globalBlockRegistry()); #endif // GNURADIO_STREAMTODATASET_HPP diff --git a/blocks/basic/test/qa_StreamToDataSet.cpp b/blocks/basic/test/qa_StreamToDataSet.cpp index c19fc154..0ecb751e 100644 --- a/blocks/basic/test/qa_StreamToDataSet.cpp +++ b/blocks/basic/test/qa_StreamToDataSet.cpp @@ -111,81 +111,52 @@ const boost::ut::suite<"StreamToDataSet Block"> selectorTest = [] { tag("visual") / "default ^matcher"_test = [&runUIExample] { runUIExample(5, "[CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1, CMD_BP_START/^FAIR.SELECTOR.C=1:S=1:P=3]", 0, 0); }; tag("visual") / "default ^matcher + pre/post"_test = [&runUIExample] { runUIExample(5, "[CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1, CMD_BP_START/^FAIR.SELECTOR.C=1:S=1:P=3]", 30, 30); }; tag("visual") / "single trigger"_test = [&runUIExample] { runUIExample(10, "CMD_DIAG_TRIGGER1", 30, 30); }; +}; - auto runTest = [](std::string filter, gr::Size_t preSamples, gr::Size_t postSamples, std::array expectedValues, std::size_t nTags, gr::Size_t maxSamples = 10000U) { - using namespace gr; - using namespace gr::basic; - using namespace gr::testing; - using gr::tag::TRIGGER_NAME; - using gr::tag::CONTEXT; - - using namespace function_generator; - constexpr gr::Size_t N_SAMPLES = 402U; - constexpr float sample_rate = 1'000.f; - - Graph graph; +gr::Tag genTrigger(gr::Tag::signed_index_type index, std::string triggerName, std::string triggerCtx = {}) { + return {index, {{gr::tag::TRIGGER_NAME.shortKey(), triggerName}, {gr::tag::TRIGGER_TIME.shortKey(), std::uint64_t(0)}, {gr::tag::TRIGGER_OFFSET.shortKey(), 0.f}, // + {gr::tag::TRIGGER_META_INFO.shortKey(), gr::property_map{{gr::tag::CONTEXT.shortKey(), triggerCtx}}}}}; +}; - // all times are in nanoseconds - auto& clockSrc = graph.emplaceBlock>({{"sample_rate", sample_rate}, // - {"n_samples_max", N_SAMPLES}, // - {"name", "TagSource"}, // - {"verbose_console", false}, // - {"repeat_tags", true}}); - - auto genTrigger = [](Tag::signed_index_type index, std::string triggerName, std::string triggerCtx = {}) -> Tag { - return {index, {{tag::TRIGGER_NAME.shortKey(), triggerName}, // - {tag::TRIGGER_TIME.shortKey(), std::uint64_t(0)}, // - {tag::TRIGGER_OFFSET.shortKey(), 0.f}, // - {tag::TRIGGER_META_INFO.shortKey(), property_map{{tag::CONTEXT.shortKey(), triggerCtx}}}}}; +const boost::ut::suite<"StreamToStream test"> streamToStreamTest = [] { + using namespace boost::ut; + using namespace gr; + using namespace gr::basic; + using namespace gr::testing; + + auto runTestStream = [](gr::Size_t nSamples, std::string filter, gr::Size_t preSamples, gr::Size_t postSamples, const std::vector& expectedValues, std::size_t nTags, gr::Size_t maxSamples = 100000U) { + constexpr float sample_rate = 1'000.f; + Graph graph; + + auto& tagSrc = graph.emplaceBlock>({{"sample_rate", sample_rate}, // + {"n_samples_max", nSamples}, {"name", "TagSource"}, {"verbose_console", false}, {"repeat_tags", false}, {"mark_tag", false}}); + tagSrc._tags = { + genTrigger(5, "CMD_BP_START", "CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1"), // start + genTrigger(8, "CMD_DIAG_TRIGGER1", "CMD_DIAG_TRIGGER1"), // it is also used to split samples processing into 2 iterations + genTrigger(10, "CMD_BP_START", "CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=2"), // stop + genTrigger(12, "CMD_DIAG_TRIGGER1", "CMD_DIAG_TRIGGER1"), // it is also used as end trigger for "including" mode + genTrigger(15, "CMD_BP_START", "CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1"), // start + genTrigger(20, "CMD_BP_START", "CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=2"), // stop + genTrigger(22, "CMD_DIAG_TRIGGER1", "CMD_DIAG_TRIGGER1") // it is also used as end trigger for "including" mode }; - clockSrc._tags = { // - // TODO: refactor triggerCtx do only contain the 'FAIR.SELECTOR...' info (-> needs changes in FunctionGenerator) - genTrigger(10, "CMD_BP_START", "CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1"), // - genTrigger(50, "CMD_DIAG_TRIGGER1", "CMD_DIAG_TRIGGER1"), // - genTrigger(100, "CMD_BP_START", "CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=2"), // - genTrigger(200, "CMD_BP_START", "CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=3"), // - genTrigger(300, "CMD_BP_START", "CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=4"), // - genTrigger(400, "CMD_BP_START", "CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=5")}; - - auto& funcGen = graph.emplaceBlock>({{"sample_rate", sample_rate}, {"name", "FunctionGenerator"}}); - const auto now = settings::convertTimePointToUint64Ns(std::chrono::system_clock::now()); - - // TODO: add graphic - expect(funcGen.settings().set(createConstPropertyMap(1.f), SettingsCtx{now, "CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1"}).empty()); - expect(funcGen.settings().set(createConstPropertyMap(2.f), SettingsCtx{now, "CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=2"}).empty()); - expect(funcGen.settings().set(createConstPropertyMap(3.f), SettingsCtx{now, "CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=3"}).empty()); - expect(funcGen.settings().set(createConstPropertyMap(4.f), SettingsCtx{now, "CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=4"}).empty()); - expect(funcGen.settings().set(createConstPropertyMap(5.f), SettingsCtx{now, "CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=5"}).empty()); - expect(eq(ConnectionResult::SUCCESS, graph.connect<"out">(clockSrc).to<"in">(funcGen))) << "connect clockSrc->funcGen"; - - const property_map blockSettings = {{"filter", filter}, {"n_pre", preSamples}, {"n_post", postSamples}, {"n_max", maxSamples}}; - // producing stream (filtered) - auto& filterStreamToStream = graph.emplaceBlock>(blockSettings); - auto& streamSink = graph.emplaceBlock>({{"name", "streamSink"}, {"log_tags", true}, {"log_samples", true}, {"n_samples_expected", N_SAMPLES}, {"verbose_console", false}}); - expect(eq(gr::ConnectionResult::SUCCESS, graph.connect<"out">(funcGen).template to<"in">(filterStreamToStream))) << "connect funcGen->filterStreamToStream"; - expect(eq(gr::ConnectionResult::SUCCESS, graph.connect<"out">(filterStreamToStream).template to<"in">(streamSink))) << "connect filterStreamToStream->streamSink"; - // producing DataSet (filtered) - auto& filterStreamToDataSet = graph.emplaceBlock>(blockSettings); - auto& dataSetSink = graph.emplaceBlock, ProcessFunction::USE_PROCESS_BULK>>({{"name", "dataSetSink"}, {"log_tags", true}, {"log_samples", true}, {"n_samples_expected", gr::Size_t(1)}, {"verbose_console", false}}); - expect(eq(gr::ConnectionResult::SUCCESS, graph.connect<"out">(funcGen).template to<"in">(filterStreamToDataSet))) << "connect funcGen->filterStreamToDataSet"; - expect(eq(gr::ConnectionResult::SUCCESS, graph.connect<"out">(filterStreamToDataSet).template to<"in">(dataSetSink))) << "connect filterStreamToDataSet->dataSetSink"; + const property_map blockSettings = {{"filter", filter}, {"n_pre", preSamples}, {"n_post", postSamples}, {"n_max", maxSamples}}; + auto& filterStreamToStream = graph.emplaceBlock>(blockSettings); + auto& streamSink = graph.emplaceBlock>({{"name", "streamSink"}, {"log_tags", true}, {"log_samples", true}, {"verbose_console", false}}); + expect(eq(gr::ConnectionResult::SUCCESS, graph.connect<"out">(tagSrc).template to<"in">(filterStreamToStream))); + expect(eq(gr::ConnectionResult::SUCCESS, graph.connect<"out">(filterStreamToStream).template to<"in">(streamSink))); gr::scheduler::Simple sched{std::move(graph)}; - fmt::println("start test with filter: {}", filter); + fmt::println("start -> Stream-to-Stream with filter: {} n_pre:{} n_post:{}", filter, preSamples, postSamples); expect(sched.runAndWait().has_value()) << fmt::format("runAndWait - filter {}", filter); - fmt::println("start test with filter: {} -- DONE", filter); + fmt::println("done -> Stream-to-Stream with filter: {} n_pre:{} n_post:{}", filter, preSamples, postSamples); - expect(eq(clockSrc.sample_rate, sample_rate)) << "clockSrc.sample_rate"; - expect(eq(funcGen.sample_rate, sample_rate)) << "funcGen.sample_rate"; - expect(eq(filterStreamToStream.sample_rate, sample_rate)) << "filterStreamToStream.sample_rate"; - expect(eq(filterStreamToDataSet.sample_rate, sample_rate)) << "filterStreamToDataSet.sample_rate"; - expect(eq(streamSink.sample_rate, sample_rate)) << "streamSink.sample_rate"; - expect(eq(dataSetSink.sample_rate, sample_rate)) << "dataSetSink.sample_rate"; + expect(eq(tagSrc.sample_rate, sample_rate)); + expect(eq(filterStreamToStream.sample_rate, sample_rate)); + expect(eq(streamSink.sample_rate, sample_rate)); - expect(!streamSink._samples.empty()) << "streamSink._samples.empty()"; - expect(eq(streamSink._samples.front(), expectedValues.front())) << fmt::format("streamSink._samples - first sample does not match({}): {}", streamSink._samples.size(), fmt::join(streamSink._samples, ", ")); - expect(eq(streamSink._samples.back(), expectedValues.back())) << fmt::format("streamSink._samples - last sample does not match({}): {}", streamSink._samples.size(), fmt::join(streamSink._samples, ", ")); + expect(eq(streamSink._samples.size(), expectedValues.size())); + expect(std::ranges::equal(streamSink._samples, expectedValues)); expect(eq(streamSink._tags.size(), nTags)) << [&]() { std::string ret = fmt::format("Stream nTags: {}\n", streamSink._tags.size()); @@ -194,33 +165,107 @@ const boost::ut::suite<"StreamToDataSet Block"> selectorTest = [] { } return ret; }(); + }; + // We always test scenarios where no overlaps occur. If accumulation is currently active in the block, no new "Start" should happen. + // Any new Start events are ignored, and this behavior is considered undefined for stream-to-stream mode + std::vector expectedValues = {5, 6, 7, 8, 9, 15, 16, 17, 18, 19}; + "start->stop matcher (excluding)"_test = [&runTestStream, &expectedValues] { runTestStream(50U, "[CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1, CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=2]", 0, 0, expectedValues, 3UZ /* 2 BPs (2 starts) + custom diag trigger */); }; + expectedValues = {3, 4, 5, 6, 7, 8, 9, 10, 11, 13, 14, 15, 16, 17, 18, 19, 20, 21}; + "start->stop matcher (excluding +pre/post)"_test = [&runTestStream, &expectedValues] { runTestStream(50U, "[CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1, CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=2]", 2, 2, expectedValues, 5UZ /* 4 BPs (2 starts + 2 stops) + custom diag trigger */); }; + + expectedValues = {5, 6, 7, 8, 9, 10, 11, 15, 16, 17, 18, 19, 20, 21}; + "start->^stop matcher (including)"_test = [&runTestStream, &expectedValues] { runTestStream(50U, "[CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1, CMD_BP_START/^FAIR.SELECTOR.C=1:S=1:P=2]", 0, 0, expectedValues, 5UZ /* 4 BPs (2 starts + 2 stops) + custom diag trigger */); }; + expectedValues = {3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23}; + "start->^stop matcher (including. +pre/post)"_test = [&runTestStream, &expectedValues] { runTestStream(50U, "[CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1, CMD_BP_START/^FAIR.SELECTOR.C=1:S=1:P=2]", 2, 2, expectedValues, 7UZ /* 4 BPs (2 starts + 2 stops) + 3 custom diag event */); }; + + expectedValues = {6, 7, 8, 9, 10, 11, 12, 13, 20, 21, 22, 23}; + "single trigger (+pre/post)"_test = [&runTestStream, &expectedValues] { runTestStream(50U, "CMD_DIAG_TRIGGER1", 2, 2, expectedValues, 3UZ); }; +}; + +const boost::ut::suite<"StreamToDataSet test"> streamToDataSetTest = [] { + using namespace boost::ut; + using namespace gr; + using namespace gr::basic; + using namespace gr::testing; + + auto runTestDataSet = [](gr::Size_t nSamples, std::string filter, gr::Size_t preSamples, gr::Size_t postSamples, const std::vector>& expectedValues, const std::vector& nTags, gr::Size_t maxSamples = 100000U) { + using namespace gr; + using namespace gr::basic; + using namespace gr::testing; + + constexpr float sample_rate = 1'000.f; + Graph graph; + + auto& tagSrc = graph.emplaceBlock>({{"sample_rate", sample_rate}, // + {"n_samples_max", nSamples}, {"name", "TagSource"}, {"verbose_console", false}, {"repeat_tags", false}, {"mark_tag", false}}); + tagSrc._tags = { + genTrigger(5, "CMD_BP_START", "CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1"), // start + genTrigger(8, "CMD_DIAG_TRIGGER1", "CMD_DIAG_TRIGGER1"), // it is also used to split samples processing into 2 iterations + genTrigger(10, "CMD_BP_START", "CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=2"), // stop + genTrigger(12, "CMD_DIAG_TRIGGER1", "CMD_DIAG_TRIGGER1"), // it is also used as end trigger for "including" mode + genTrigger(15, "CMD_BP_START", "CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1"), // start + genTrigger(20, "CMD_BP_START", "CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1"), // start + genTrigger(25, "CMD_BP_START", "CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=2"), // stop + genTrigger(27, "CMD_DIAG_TRIGGER1", "CMD_DIAG_TRIGGER1"), // it is also used as end trigger for "including" mode + genTrigger(30, "CMD_BP_START", "CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=2"), // stop + genTrigger(32, "CMD_DIAG_TRIGGER1", "CMD_DIAG_TRIGGER1") // it is also used as end trigger for "including" mode + }; + + const property_map blockSettings = {{"filter", filter}, {"n_pre", preSamples}, {"n_post", postSamples}, {"n_max", maxSamples}}; + auto& filterStreamToDataSet = graph.emplaceBlock>(blockSettings); + auto& dataSetSink = graph.emplaceBlock, ProcessFunction::USE_PROCESS_BULK>>({{"name", "dataSetSink"}, {"log_tags", true}, {"log_samples", true}, {"verbose_console", false}}); + expect(eq(gr::ConnectionResult::SUCCESS, graph.connect<"out">(tagSrc).template to<"in">(filterStreamToDataSet))); + expect(eq(gr::ConnectionResult::SUCCESS, graph.connect<"out">(filterStreamToDataSet).template to<"in">(dataSetSink))); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - expect(!dataSetSink._samples.empty()) << "dataSetSink did not receive the required minimum data"; - if (dataSetSink._samples.empty()) { - return; + gr::scheduler::Simple sched{std::move(graph)}; + fmt::println("start -> Stream-to-DataSet with filter: {} n_pre:{} n_post:{}", filter, preSamples, postSamples); + expect(sched.runAndWait().has_value()) << fmt::format("runAndWait - filter {}", filter); + fmt::println("done -> Stream-to-DataSet with filter: {} n_pre:{} n_post:{}", filter, preSamples, postSamples); + + expect(eq(tagSrc.sample_rate, sample_rate)); + expect(eq(filterStreamToDataSet.sample_rate, sample_rate)); + expect(eq(dataSetSink.sample_rate, sample_rate)); + + expect(eq(dataSetSink._samples.size(), expectedValues.size())); + for (std::size_t i = 0; i < dataSetSink._samples.size(); i++) { + const DataSet& ds = dataSetSink._samples.at(i); + expect(std::ranges::equal(ds.signal_values, expectedValues[i])); + + expect(fatal(eq(ds.timing_events.size(), 1UZ))); + const std::vector& timingEvt0 = ds.timing_events[0]; + expect(eq(timingEvt0.size(), nTags[i])) << [&]() { + std::string ret = fmt::format("DataSet nTags: {}\n", timingEvt0.size()); + for (const auto& tag : timingEvt0) { + ret += fmt::format("tag.index: {} .map: {}\n", tag.index, tag.map); + } + return ret; + }(); } - const DataSet& dataSet = dataSetSink._samples.at(0UZ); - expect(ge(streamSink._samples.size(), dataSet.signal_values.size())) << "streamSink needs to receive correct amount of samples"; - - expect(fatal(!dataSet.signal_values.empty())) << "no samples in DataSet"; - expect(eq(dataSet.signal_values.front(), expectedValues.front())) << fmt::format("dataSet.signal_values - first sample does not match ({}): {}", dataSet.signal_values.size(), fmt::join(dataSet.signal_values, ", ")); - expect(eq(dataSet.signal_values.back(), expectedValues.back())) << fmt::format("dataSet.signal_values - last sample does not match({}): {}", dataSet.signal_values.size(), fmt::join(dataSet.signal_values, ", ")); - expect(fatal(eq(dataSet.timing_events.size(), 1UZ))) << "dataSetSink._samples[0] -> DataSet - timing_events match"; - const std::vector& timingEvt0 = dataSet.timing_events[0]; - expect(eq(timingEvt0.size(), nTags)) << [&]() { - std::string ret = fmt::format("DataSet nTags: {}\n", timingEvt0.size()); - for (const auto& tag : timingEvt0) { - ret += fmt::format("tag.index: {} .map: {}\n", tag.index, tag.map); - } - return ret; - }(); }; - "start->stop matcher (excluding)"_test = [&runTest] { runTest("[CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1, CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=4]", 0, 0, {1.f, 3.f}, 4UZ /* 3 BPs + custom diag event */); }; - "start->^stop matcher (including)"_test = [&runTest] { runTest("[CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1, CMD_BP_START/^FAIR.SELECTOR.C=1:S=1:P=3]", 0, 0, {1.f, 3.f}, 4UZ /* 3 BPs + custom diag event */); }; - "start->^stop matcher (including. +pre/post)"_test = [&runTest] { runTest("[CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1, CMD_BP_START/^FAIR.SELECTOR.C=1:S=1:P=3]", 30, 30, {0.f, 4.f}, 5UZ /* 3+1 BPs (because of +30 post samples ranging into P=4) + custom diag event */); }; - "single trigger (+pre/post)"_test = [&runTest] { runTest("CMD_DIAG_TRIGGER1", 30, 30, {1.f, 1.f}, 1UZ); }; + std::vector> expectedValues = {{5, 6, 7, 8, 9}, {15, 16, 17, 18, 19, 20, 21, 22, 23, 24}, {20, 21, 22, 23, 24, 25, 26, 27, 28, 29}}; + "start->stop matcher (excluding)"_test = [&runTestDataSet, &expectedValues] { runTestDataSet(50U, "[CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1, CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=2]", 0, 0, expectedValues, {3UZ, 3UZ, 4UZ}); }; + + expectedValues = {{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, // + {8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31}, // + {13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36}}; + "start->stop matcher (excluding +pre/post)"_test = [&runTestDataSet, &expectedValues] { runTestDataSet(50U, "[CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1, CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=2]", 7, 7, expectedValues, {5UZ, 5UZ, 5UZ}); }; + + expectedValues = {{5, 6, 7, 8, 9, 10, 11}, // + {15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26}, // + {20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31}}; + "start->^stop matcher (including)"_test = [&runTestDataSet, &expectedValues] { runTestDataSet(50U, "[CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1, CMD_BP_START/^FAIR.SELECTOR.C=1:S=1:P=2]", 0, 0, expectedValues, {4UZ, 4UZ, 5UZ}); }; + + expectedValues = {{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18}, // + {8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33}, // + {13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38}}; + "start->^stop matcher (including. +pre/post)"_test = [&runTestDataSet, &expectedValues] { runTestDataSet(50U, "[CMD_BP_START/FAIR.SELECTOR.C=1:S=1:P=1, CMD_BP_START/^FAIR.SELECTOR.C=1:S=1:P=2]", 7, 7, expectedValues, {5UZ, 6UZ, 5UZ}); }; + + expectedValues = {{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14}, // + {5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18}, // + {20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33}, // + {25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38}}; + "single trigger (+pre/post)"_test = [&runTestDataSet, &expectedValues] { runTestDataSet(50U, "CMD_DIAG_TRIGGER1", 7, 7, expectedValues, {3UZ, 2UZ, 3UZ, 1UZ}); }; }; int main() { /* not needed for UT */ }