From e5f466feca3c29c26c088209ed0dcfc6994ebc08 Mon Sep 17 00:00:00 2001 From: drslebedev Date: Thu, 11 Jul 2024 16:25:56 +0200 Subject: [PATCH] Performance optimization for MultiProducerStrategy * AtomicBitset bulk set/reset * Rename to Single(Multi)ProducerStrategy * Eliminate usage of reserve(), use tryReserve() instead * Remove warnings Signed-off-by: drslebedev --- .../include/gnuradio-4.0/basic/DataSink.hpp | 101 ++++++---- core/include/gnuradio-4.0/AtomicBitset.hpp | 132 +++++++++++++ core/include/gnuradio-4.0/Block.hpp | 10 +- core/include/gnuradio-4.0/CircularBuffer.hpp | 10 +- core/include/gnuradio-4.0/ClaimStrategy.hpp | 160 +++++----------- core/include/gnuradio-4.0/Port.hpp | 12 +- core/test/CMakeLists.txt | 1 + core/test/qa_AtomicBitset.cpp | 177 ++++++++++++++++++ core/test/qa_buffer.cpp | 97 ---------- 9 files changed, 445 insertions(+), 255 deletions(-) create mode 100644 core/include/gnuradio-4.0/AtomicBitset.hpp create mode 100644 core/test/qa_AtomicBitset.cpp diff --git a/blocks/basic/include/gnuradio-4.0/basic/DataSink.hpp b/blocks/basic/include/gnuradio-4.0/basic/DataSink.hpp index f284a02d..0b6a4116 100644 --- a/blocks/basic/include/gnuradio-4.0/basic/DataSink.hpp +++ b/blocks/basic/include/gnuradio-4.0/basic/DataSink.hpp @@ -698,28 +698,34 @@ synchronously (/asynchronously) if handled by the same (/different) sink block. void setMetadata(detail::Metadata metadata) override { dataset_template = detail::makeDataSetTemplate(std::move(metadata)); } - inline void publishDataSet(DataSet&& data) { + inline bool publishDataSet(DataSet&& data) { if constexpr (!std::is_same_v) { callback(std::move(data)); + return true; } else { auto poller = polling_handler.lock(); if (!poller) { this->setExpired(); - return; + return false; } - auto writeData = poller->writer.reserve(1); - if (block) { - writeData[0] = std::move(data); - writeData.publish(1); + auto writeData = poller->writer.tryReserve(1); + if (writeData.empty()) { + return false; } else { - if (poller->writer.available() > 0) { + if (block) { writeData[0] = std::move(data); writeData.publish(1); } else { - poller->drop_count++; + if (poller->writer.available() > 0) { + writeData[0] = std::move(data); + writeData.publish(1); + } else { + poller->drop_count++; + } } } + return true; } } @@ -742,8 +748,10 @@ synchronously (/asynchronously) if handled by the same (/different) sink block. window->pending_post_samples -= postSampleView.size(); if (window->pending_post_samples == 0) { - this->publishDataSet(std::move(window->dataset)); - window = pending_trigger_windows.erase(window); + bool published = this->publishDataSet(std::move(window->dataset)); + if (published) { + window = pending_trigger_windows.erase(window); + } } else { ++window; } @@ -753,7 +761,10 @@ synchronously (/asynchronously) if handled by the same (/different) sink block. void stop() override { for (auto& window : pending_trigger_windows) { if (!window.dataset.signal_values.empty()) { - this->publishDataSet(std::move(window.dataset)); + bool published = this->publishDataSet(std::move(window.dataset)); + if (!published) { + throw gr::exception("DataSink::TriggerListener::stop() can not publish pending datasets\n"); + } } } pending_trigger_windows.clear(); @@ -781,28 +792,34 @@ synchronously (/asynchronously) if handled by the same (/different) sink block. void setMetadata(detail::Metadata metadata) override { dataset_template = detail::makeDataSetTemplate(std::move(metadata)); } - inline void publishDataSet(DataSet&& data) { + inline bool publishDataSet(DataSet&& data) { if constexpr (!std::is_same_v) { callback(std::move(data)); + return true; } else { auto poller = polling_handler.lock(); if (!poller) { this->setExpired(); - return; + return false; } - auto writeData = poller->writer.reserve(1); - if (block) { - writeData[0] = std::move(data); - writeData.publish(1); + auto writeData = poller->writer.tryReserve(1); + if (writeData.empty()) { + return false; } else { - if (poller->writer.available() > 0) { + if (block) { writeData[0] = std::move(data); writeData.publish(1); } else { - poller->drop_count++; + if (poller->writer.available() > 0) { + writeData[0] = std::move(data); + writeData.publish(1); + } else { + poller->drop_count++; + } } } + return true; } } @@ -814,8 +831,10 @@ synchronously (/asynchronously) if handled by the same (/different) sink block. if (obsr == trigger::MatchResult::NotMatching) { pending_dataset->timing_events[0].push_back({static_cast(pending_dataset->signal_values.size()), *tagData0}); } - this->publishDataSet(std::move(*pending_dataset)); - pending_dataset.reset(); + bool published = this->publishDataSet(std::move(*pending_dataset)); + if (published) { + pending_dataset.reset(); + } } } if (obsr == trigger::MatchResult::Matching) { @@ -830,15 +849,20 @@ synchronously (/asynchronously) if handled by the same (/different) sink block. pending_dataset->signal_values.insert(pending_dataset->signal_values.end(), view.begin(), view.end()); if (pending_dataset->signal_values.size() == maximumWindowSize) { - this->publishDataSet(std::move(*pending_dataset)); - pending_dataset.reset(); + bool published = this->publishDataSet(std::move(*pending_dataset)); + if (published) { + pending_dataset.reset(); + } } } } void stop() override { if (pending_dataset) { - this->publishDataSet(std::move(*pending_dataset)); + bool published = this->publishDataSet(std::move(*pending_dataset)); + if (!published) { + throw gr::exception("DataSink::MultiplexedListener::stop() can not publish pending datasets\n"); + } pending_dataset.reset(); } if (auto p = polling_handler.lock()) { @@ -875,28 +899,34 @@ synchronously (/asynchronously) if handled by the same (/different) sink block. dataset_template = detail::makeDataSetTemplate(std::move(metadata)); } - inline void publishDataSet(DataSet&& data) { + inline bool publishDataSet(DataSet&& data) { if constexpr (!std::is_same_v) { callback(std::move(data)); + return true; } else { auto poller = polling_handler.lock(); if (!poller) { this->setExpired(); - return; + return false; } - auto writeData = poller->writer.reserve(1); - if (block) { - writeData[0] = std::move(data); - writeData.publish(1); + auto writeData = poller->writer.tryReserve(1); + if (writeData.empty()) { + return false; } else { - if (poller->writer.available() > 0) { + if (block) { writeData[0] = std::move(data); writeData.publish(1); } else { - poller->drop_count++; + if (poller->writer.available() > 0) { + writeData[0] = std::move(data); + writeData.publish(1); + } else { + poller->drop_count++; + } } } + return true; } } @@ -918,9 +948,10 @@ synchronously (/asynchronously) if handled by the same (/different) sink block. DataSet dataset = dataset_template; dataset.timing_events = {{{-static_cast(it->delay), std::move(it->tag_data)}}}; dataset.signal_values = {inData[it->pending_samples]}; - this->publishDataSet(std::move(dataset)); - - it = pending.erase(it); + bool published = this->publishDataSet(std::move(dataset)); + if (published) { + it = pending.erase(it); + } } } diff --git a/core/include/gnuradio-4.0/AtomicBitset.hpp b/core/include/gnuradio-4.0/AtomicBitset.hpp new file mode 100644 index 00000000..6c52b798 --- /dev/null +++ b/core/include/gnuradio-4.0/AtomicBitset.hpp @@ -0,0 +1,132 @@ +#ifndef GNURADIO_ATOMICBITSET_HPP +#define GNURADIO_ATOMICBITSET_HPP + +#include + +#ifndef forceinline +// use this for hot-spots only <-> may bloat code size, not fit into cache and consequently slow down execution +#define forceinline inline __attribute__((always_inline)) +#endif + +namespace gr { +/* + * `AtomicBitset` is a lock-free, thread-safe bitset. + * It allows for efficient and thread-safe manipulation of individual bits. + * For bulk set or reset atomic operation is guaranteed per individual bit (word). + */ +template +class AtomicBitset { + static_assert(Size > 0, "Size must be greater than 0"); + static constexpr bool isSizeDynamic = Size == std::dynamic_extent; + + static constexpr std::size_t _bitsPerWord = sizeof(size_t) * 8UZ; + static constexpr std::size_t _nStaticWords = isSizeDynamic ? 1UZ : (Size + _bitsPerWord - 1UZ) / _bitsPerWord; + + // using DynamicArrayType = std::unique_ptr[]>; + using DynamicArrayType = std::vector>; + using StaticArrayType = std::array, _nStaticWords>; + using ArrayType = std::conditional_t; + + std::size_t _size = Size; + ArrayType _bits; + +public: + AtomicBitset() + requires(!isSizeDynamic) + { + for (auto& word : _bits) { + word.store(0UL, std::memory_order_relaxed); + } + } + + explicit AtomicBitset(std::size_t size = 0UZ) + requires(isSizeDynamic) + : _size(size), _bits(std::vector>(size)) { + // assert(size > 0UZ); + for (std::size_t i = 0; i < _size; i++) { + _bits[i].store(0UL, std::memory_order_relaxed); + } + } + + void set(std::size_t bitPosition, bool value) { + assert(bitPosition < _size); + const std::size_t wordIndex = bitPosition / _bitsPerWord; + const std::size_t bitIndex = bitPosition % _bitsPerWord; + const std::size_t mask = 1UL << bitIndex; + + std::size_t oldBits; + std::size_t newBits; + do { + oldBits = _bits[wordIndex].load(std::memory_order_relaxed); + newBits = value ? (oldBits | mask) : (oldBits & ~mask); + } while (!_bits[wordIndex].compare_exchange_weak(oldBits, newBits, std::memory_order_release, std::memory_order_relaxed)); + } + + void set(std::size_t begin, std::size_t end, bool value) { + assert(begin <= end && end <= _size); // [begin, end) + + std::size_t beginWord = begin / _bitsPerWord; + std::size_t endWord = end / _bitsPerWord; + const std::size_t beginOffset = begin % _bitsPerWord; + const std::size_t endOffset = end % _bitsPerWord; + + if (begin == end) { + return; + } else if (beginWord == endWord) { + // the range is within a single word + setBitsInWord(beginWord, beginOffset, endOffset, value); + } else { + // leading bits in the first word + if (beginOffset != 0) { + setBitsInWord(beginWord, beginOffset, _bitsPerWord, value); + beginWord++; + } + // trailing bits in the last word + if (endOffset != 0) { + setBitsInWord(endWord, 0, endOffset, value); + } + endWord--; + // whole words in the middle + for (std::size_t wordIndex = beginWord; wordIndex <= endWord; ++wordIndex) { + setFullWord(wordIndex, value); + } + } + } + + void set(std::size_t bitPosition) { set(bitPosition, true); } + + void set(std::size_t begin, std::size_t end) { set(begin, end, true); } + + void reset(std::size_t bitPosition) { set(bitPosition, false); } + + void reset(std::size_t begin, std::size_t end) { set(begin, end, false); } + + bool test(std::size_t bitPosition) const { + assert(bitPosition < _size); + const std::size_t wordIndex = bitPosition / _bitsPerWord; + const std::size_t bitIndex = bitPosition % _bitsPerWord; + const std::size_t mask = 1UL << bitIndex; + + return (_bits[wordIndex].load(std::memory_order_acquire) & mask) != 0; + } + + [[nodiscard]] constexpr std::size_t size() const { return _size; } + +private: + void setBitsInWord(std::size_t wordIndex, std::size_t begin, std::size_t end, bool value) { + assert(begin < end && end <= _bitsPerWord); + const std::size_t mask = (end == _bitsPerWord) ? ~((1UL << begin) - 1) : ((1UL << end) - 1) & ~((1UL << begin) - 1); + std::size_t oldBits; + std::size_t newBits; + do { + oldBits = _bits[wordIndex].load(std::memory_order_relaxed); + newBits = value ? (oldBits | mask) : (oldBits & ~mask); + } while (!_bits[wordIndex].compare_exchange_weak(oldBits, newBits, std::memory_order_release, std::memory_order_relaxed)); + } + + forceinline void setFullWord(std::size_t wordIndex, bool value) { _bits[wordIndex].store(value ? ~0UL : 0UL, std::memory_order_release); } +}; + +} // namespace gr + +#endif // GNURADIO_ATOMICBITSET_HPP diff --git a/core/include/gnuradio-4.0/Block.hpp b/core/include/gnuradio-4.0/Block.hpp index c22ad32e..0a4cfc35 100644 --- a/core/include/gnuradio-4.0/Block.hpp +++ b/core/include/gnuradio-4.0/Block.hpp @@ -688,7 +688,7 @@ class Block : public lifecycle::StateMachine, public std::tuple(auto, OutputRange& outputRange) { auto processOneRange = [nSamples](Out& out) { - if constexpr (Out::isMultiThreadedStrategy()) { + if constexpr (Out::isMultiProducerStrategy()) { if (!out.isFullyPublished()) { std::abort(); } @@ -1737,8 +1737,12 @@ class Block : public lifecycle::StateMachine, public std::tuplecmd = Final; // N.B. could enable/allow for partial if we return multiple messages (e.g. using coroutines?) retMessage->serviceName = unique_name; - PublishableSpan auto msgSpan = msgOut.streamWriter().reserve(1UZ); - msgSpan[0] = *retMessage; + PublishableSpan auto msgSpan = msgOut.streamWriter().tryReserve(1UZ); + if (msgSpan.empty()) { + throw gr::exception(fmt::format("{}::processMessages() can not reserve span for message\n", name)); + } else { + msgSpan[0] = *retMessage; + } } // - end - for (const auto &message : messages) { .. } diff --git a/core/include/gnuradio-4.0/CircularBuffer.hpp b/core/include/gnuradio-4.0/CircularBuffer.hpp index 1c6211c6..50d9398c 100644 --- a/core/include/gnuradio-4.0/CircularBuffer.hpp +++ b/core/include/gnuradio-4.0/CircularBuffer.hpp @@ -335,15 +335,15 @@ class CircularBuffer { _parent->_buffer->_claimStrategy.publish(_parent->_offset, _parent->nSamplesPublished()); _parent->_offset += static_cast(_parent->nSamplesPublished()); #ifndef NDEBUG - if constexpr (isMultiThreadedStrategy()) { + if constexpr (isMultiProducerStrategy()) { if (!isFullyPublished()) { - fmt::print(stderr, "CircularBuffer::multiple_writer::PublishableOutputRange() - did not publish {} samples\n", _parent->_internalSpan.size() - _parent->_nSamplesPublished); + fmt::print(stderr, "CircularBuffer::MultiWriter::PublishableOutputRange() - did not publish {} samples\n", _parent->_internalSpan.size() - _parent->_nSamplesPublished); std::abort(); } } else { if (!_parent->_internalSpan.empty() && !isPublished()) { - fmt::print(stderr, "CircularBuffer::single_writer::PublishableOutputRange() - omitted publish call for {} reserved samples\n", _parent->_internalSpan.size()); + fmt::print(stderr, "CircularBuffer::SingleWriter::PublishableOutputRange() - omitted publish call for {} reserved samples\n", _parent->_internalSpan.size()); std::abort(); } } @@ -370,7 +370,7 @@ class CircularBuffer { [[nodiscard]] constexpr std::size_t samplesToPublish() const noexcept { return _parent->_nSamplesPublished; } [[nodiscard]] constexpr bool isPublished() const noexcept { return _parent->_isRangePublished; } [[nodiscard]] constexpr bool isFullyPublished() const noexcept { return _parent->_internalSpan.size() == _parent->_nSamplesPublished; } - [[nodiscard]] constexpr static bool isMultiThreadedStrategy() noexcept { return std::is_base_of_v, ClaimType>; } + [[nodiscard]] constexpr static bool isMultiProducerStrategy() noexcept { return std::is_base_of_v, ClaimType>; } [[nodiscard]] constexpr static SpanReleasePolicy spanReleasePolicy() noexcept { return policy; } constexpr void publish(std::size_t nSamplesToPublish) noexcept { @@ -478,7 +478,7 @@ class CircularBuffer { private: constexpr void checkIfCanReserveAndAbortIfNeeded() const noexcept { - if constexpr (std::is_base_of_v, ClaimType>) { + if constexpr (std::is_base_of_v, ClaimType>) { if (_internalSpan.size() - _nSamplesPublished != 0) { fmt::print(stderr, "An error occurred: The method CircularBuffer::MultiWriter::reserve() was invoked for the second time in succession, " diff --git a/core/include/gnuradio-4.0/ClaimStrategy.hpp b/core/include/gnuradio-4.0/ClaimStrategy.hpp index d2c3b276..fd61c6fa 100644 --- a/core/include/gnuradio-4.0/ClaimStrategy.hpp +++ b/core/include/gnuradio-4.0/ClaimStrategy.hpp @@ -11,93 +11,12 @@ #include +#include "AtomicBitset.hpp" #include "Sequence.hpp" #include "WaitStrategy.hpp" namespace gr { -namespace detail { - -/* - * `AtomicBitset` is a lock-free, thread-safe bitset. - * It allows for efficient and thread-safe manipulation of individual bits. - */ -template -class AtomicBitset { - static_assert(Size > 0, "Size must be greater than 0"); - static constexpr bool isSizeDynamic = Size == std::dynamic_extent; - - static constexpr std::size_t _bitsPerWord = sizeof(size_t) * 8UZ; - static constexpr std::size_t _nStaticWords = isSizeDynamic ? 1UZ : (Size + _bitsPerWord - 1UZ) / _bitsPerWord; - - // using DynamicArrayType = std::unique_ptr[]>; - using DynamicArrayType = std::vector>; - using StaticArrayType = std::array, _nStaticWords>; - using ArrayType = std::conditional_t; - - std::size_t _size = Size; - ArrayType _bits; - -public: - AtomicBitset() - requires(!isSizeDynamic) - { - for (auto& word : _bits) { - word.store(0, std::memory_order_relaxed); - } - } - - explicit AtomicBitset(std::size_t size = 0UZ) - requires(isSizeDynamic) - : _size(size), _bits(std::vector>(size)) { - // assert(size > 0UZ); - for (std::size_t i = 0; i < _size; i++) { - _bits[i].store(0, std::memory_order_relaxed); - } - } - - void set(std::size_t bitPosition) { - assert(bitPosition < _size); - const std::size_t wordIndex = bitPosition / _bitsPerWord; - const std::size_t bitIndex = bitPosition % _bitsPerWord; - const std::size_t mask = 1UL << bitIndex; - - std::size_t oldBits; - std::size_t newBits; - do { - oldBits = _bits[wordIndex].load(std::memory_order_relaxed); - newBits = oldBits | mask; - } while (!_bits[wordIndex].compare_exchange_weak(oldBits, newBits, std::memory_order_release, std::memory_order_relaxed)); - } - - void reset(std::size_t bitPosition) { - assert(bitPosition < _size); - const std::size_t wordIndex = bitPosition / _bitsPerWord; - const std::size_t bitIndex = bitPosition % _bitsPerWord; - const std::size_t mask = ~(1UL << bitIndex); - - std::size_t oldBits; - std::size_t newBits; - do { - oldBits = _bits[wordIndex].load(std::memory_order_relaxed); - newBits = oldBits & mask; - } while (!_bits[wordIndex].compare_exchange_weak(oldBits, newBits, std::memory_order_release, std::memory_order_relaxed)); - } - - bool test(std::size_t bitPosition) const { - assert(bitPosition < _size); - const std::size_t wordIndex = bitPosition / _bitsPerWord; - const std::size_t bitIndex = bitPosition % _bitsPerWord; - const std::size_t mask = 1UL << bitIndex; - - return (_bits[wordIndex].load(std::memory_order_acquire) & mask) != 0; - } - - [[nodiscard]] constexpr std::size_t size() const { return _size; } -}; - -} // namespace detail - template concept ClaimStrategyLike = requires(T /*const*/ t, const Sequence::signed_index_type sequence, const Sequence::signed_index_type offset, const std::size_t nSlotsToClaim) { { t.next(nSlotsToClaim) } -> std::same_as; @@ -107,7 +26,7 @@ concept ClaimStrategyLike = requires(T /*const*/ t, const Sequence::signed_index }; template -class alignas(hardware_constructive_interference_size) SingleThreadedStrategy { +class alignas(hardware_constructive_interference_size) SingleProducerStrategy { using signed_index_type = Sequence::signed_index_type; const std::size_t _size = SIZE; @@ -118,16 +37,16 @@ class alignas(hardware_constructive_interference_size) SingleThreadedStrategy { TWaitStrategy _waitStrategy; std::shared_ptr>> _readSequences{std::make_shared>>()}; // list of dependent reader sequences - explicit SingleThreadedStrategy(const std::size_t bufferSize = SIZE) : _size(bufferSize){}; - SingleThreadedStrategy(const SingleThreadedStrategy&) = delete; - SingleThreadedStrategy(const SingleThreadedStrategy&&) = delete; - void operator=(const SingleThreadedStrategy&) = delete; + explicit SingleProducerStrategy(const std::size_t bufferSize = SIZE) : _size(bufferSize){}; + SingleProducerStrategy(const SingleProducerStrategy&) = delete; + SingleProducerStrategy(const SingleProducerStrategy&&) = delete; + void operator=(const SingleProducerStrategy&) = delete; signed_index_type next(const std::size_t nSlotsToClaim = 1) noexcept { assert((nSlotsToClaim > 0 && nSlotsToClaim <= static_cast(_size)) && "nSlotsToClaim must be > 0 and <= bufferSize"); SpinWait spinWait; - while (getRemainingCapacity() < nSlotsToClaim) { // while not enough slots in buffer + while (getRemainingCapacity() < static_cast(nSlotsToClaim)) { // while not enough slots in buffer if constexpr (hasSignalAllWhenBlocking) { _waitStrategy.signalAllWhenBlocking(); } @@ -139,9 +58,8 @@ class alignas(hardware_constructive_interference_size) SingleThreadedStrategy { [[nodiscard]] std::optional tryNext(const std::size_t nSlotsToClaim) noexcept { assert((nSlotsToClaim > 0 && nSlotsToClaim <= static_cast(_size)) && "nSlotsToClaim must be > 0 and <= bufferSize"); - static_cast(_size) < nSlotsToClaim + _reserveCursor - getMinReaderCursor(); - if (getRemainingCapacity() < nSlotsToClaim) { // not enough slots in buffer + if (getRemainingCapacity() < static_cast(nSlotsToClaim)) { // not enough slots in buffer return std::nullopt; } _reserveCursor += nSlotsToClaim; @@ -168,7 +86,7 @@ class alignas(hardware_constructive_interference_size) SingleThreadedStrategy { } }; -static_assert(ClaimStrategyLike>); +static_assert(ClaimStrategyLike>); /** * Claim strategy for claiming sequences for access to a data structure while tracking dependent Sequences. @@ -180,12 +98,12 @@ static_assert(ClaimStrategyLike>); */ template requires(SIZE == std::dynamic_extent || std::has_single_bit(SIZE)) -class alignas(hardware_constructive_interference_size) MultiThreadedStrategy { +class alignas(hardware_constructive_interference_size) MultiProducerStrategy { using signed_index_type = Sequence::signed_index_type; - detail::AtomicBitset _slotStates; // tracks the state of each ringbuffer slot, true -> completed and ready to be read - const std::size_t _size = SIZE; - const std::size_t _mask = SIZE - 1; + AtomicBitset _slotStates; // tracks the state of each ringbuffer slot, true -> completed and ready to be read + const std::size_t _size = SIZE; + const std::size_t _mask = SIZE - 1; public: Sequence _reserveCursor; // slots can be reserved starting from _reserveCursor @@ -193,19 +111,19 @@ class alignas(hardware_constructive_interference_size) MultiThreadedStrategy { TWaitStrategy _waitStrategy; std::shared_ptr>> _readSequences{std::make_shared>>()}; // list of dependent reader sequences - MultiThreadedStrategy() = delete; + MultiProducerStrategy() = delete; - explicit MultiThreadedStrategy() + explicit MultiProducerStrategy() requires(SIZE != std::dynamic_extent) {} - explicit MultiThreadedStrategy(std::size_t bufferSize) + explicit MultiProducerStrategy(std::size_t bufferSize) requires(SIZE == std::dynamic_extent) - : _slotStates(detail::AtomicBitset<>(bufferSize)), _size(bufferSize), _mask(bufferSize - 1) {} + : _slotStates(AtomicBitset<>(bufferSize)), _size(bufferSize), _mask(bufferSize - 1) {} - MultiThreadedStrategy(const MultiThreadedStrategy&) = delete; - MultiThreadedStrategy(const MultiThreadedStrategy&&) = delete; - void operator=(const MultiThreadedStrategy&) = delete; + MultiProducerStrategy(const MultiProducerStrategy&) = delete; + MultiProducerStrategy(const MultiProducerStrategy&&) = delete; + void operator=(const MultiProducerStrategy&) = delete; [[nodiscard]] signed_index_type next(std::size_t nSlotsToClaim = 1) { assert((nSlotsToClaim > 0 && nSlotsToClaim <= static_cast(_size)) && "nSlotsToClaim must be > 0 and <= bufferSize"); @@ -249,9 +167,10 @@ class alignas(hardware_constructive_interference_size) MultiThreadedStrategy { [[nodiscard]] forceinline signed_index_type getRemainingCapacity() const noexcept { return static_cast(_size) - (_reserveCursor.value() - getMinReaderCursor()); } void publish(signed_index_type offset, std::size_t nSlotsToClaim) { - for (std::size_t i = 0; i < nSlotsToClaim; i++) { - _slotStates.set((offset + i) & _mask); // mark slots as published + if (nSlotsToClaim == 0) { + return; } + setSlotsStates(offset, offset + static_cast(nSlotsToClaim), true); // ensure publish cursor is only advanced after all prior slots are published signed_index_type currentPublishCursor; @@ -260,15 +179,13 @@ class alignas(hardware_constructive_interference_size) MultiThreadedStrategy { currentPublishCursor = _publishCursor.value(); nextPublishCursor = currentPublishCursor; - while (_slotStates.test(nextPublishCursor & _mask) && nextPublishCursor - currentPublishCursor < _slotStates.size()) { + while (_slotStates.test(static_cast(nextPublishCursor) & _mask) && static_cast(nextPublishCursor - currentPublishCursor) < _slotStates.size()) { nextPublishCursor++; } } while (!_publishCursor.compareAndSet(currentPublishCursor, nextPublishCursor)); // clear completed slots up to the new published cursor - for (std::size_t seq = static_cast(currentPublishCursor); seq < nextPublishCursor; seq++) { - _slotStates.reset(seq & _mask); - } + setSlotsStates(currentPublishCursor, nextPublishCursor, false); if constexpr (hasSignalAllWhenBlocking) { _waitStrategy.signalAllWhenBlocking(); @@ -282,9 +199,30 @@ class alignas(hardware_constructive_interference_size) MultiThreadedStrategy { } return std::ranges::min(*_readSequences | std::views::transform([](const auto& cursor) { return cursor->value(); })); } + + void setSlotsStates(signed_index_type seqBegin, signed_index_type seqEnd, bool value) { + const std::size_t beginSet = static_cast(seqBegin) & _mask; + const std::size_t endSet = static_cast(seqEnd) & _mask; + const auto diffReset = static_cast(seqEnd - seqBegin); + + if (beginSet == endSet && beginSet == 0UZ && diffReset == _size) { + _slotStates.set(0UZ, _size, value); + } else if (beginSet <= endSet) { + _slotStates.set(beginSet, endSet, value); + } else { + _slotStates.set(beginSet, _size, value); + if (endSet > 0UZ) { + _slotStates.set(0UZ, endSet, value); + } + } + // Non-bulk AtomicBitset API + // for (std::size_t seq = static_cast(seqBegin); seq < static_cast(seqEnd); seq++) { + // _slotStates.set(seq & _mask, value); + // } + } }; -static_assert(ClaimStrategyLike>); +static_assert(ClaimStrategyLike>); enum class ProducerType { /** @@ -304,12 +242,12 @@ struct producer_type; template struct producer_type { - using value_type = SingleThreadedStrategy; + using value_type = SingleProducerStrategy; }; template struct producer_type { - using value_type = MultiThreadedStrategy; + using value_type = MultiProducerStrategy; }; template diff --git a/core/include/gnuradio-4.0/Port.hpp b/core/include/gnuradio-4.0/Port.hpp index 5409515f..43264a4e 100644 --- a/core/include/gnuradio-4.0/Port.hpp +++ b/core/include/gnuradio-4.0/Port.hpp @@ -670,10 +670,14 @@ struct Port { return false; } { - auto outTags = tagWriter().reserve(1UZ); - outTags[0].index = _cachedTag.index; - outTags[0].map = _cachedTag.map; - outTags.publish(1UZ); + PublishableSpan auto outTags = tagWriter().tryReserve(1UZ); + if (!outTags.empty()) { + outTags[0].index = _cachedTag.index; + outTags[0].map = _cachedTag.map; + outTags.publish(1UZ); + } else { + return false; + } } _cachedTag.reset(); diff --git a/core/test/CMakeLists.txt b/core/test/CMakeLists.txt index 731121a9..0aa71440 100644 --- a/core/test/CMakeLists.txt +++ b/core/test/CMakeLists.txt @@ -32,6 +32,7 @@ function(add_app_test TEST_NAME) endfunction() add_ut_test(qa_buffer) +add_ut_test(qa_AtomicBitset) add_ut_test(qa_DynamicBlock) add_ut_test(qa_DynamicPort) add_ut_test(qa_HierBlock) diff --git a/core/test/qa_AtomicBitset.cpp b/core/test/qa_AtomicBitset.cpp new file mode 100644 index 00000000..da6cf5cf --- /dev/null +++ b/core/test/qa_AtomicBitset.cpp @@ -0,0 +1,177 @@ +#include +#include +#include +#include +#include +#include + +#include + +#include +#include + +#include +#include +#include +#include +#include +#include + +template +void runAtomicBitsetTest(TBitset& bitset, std::size_t bitsetSize) { + using namespace boost::ut; + + expect(eq(bitset.size(), bitsetSize)) << fmt::format("Bitset size should be {}", bitsetSize); + + // test default values + for (std::size_t i = 0; i < bitset.size(); i++) { + expect(!bitset.test(i)) << fmt::format("Bit {} should be false", i); + } + + // set true for test positions + std::vector testPositions = {0UZ, 1UZ, 5UZ, 31UZ, 32UZ, 47UZ, 63UZ, 64UZ, 100UZ, 127UZ}; + for (const std::size_t pos : testPositions) { + bitset.set(pos); + } + + // only test positions should be set + for (std::size_t i = 0; i < bitset.size(); i++) { + if (std::ranges::find(testPositions, i) != testPositions.end()) { + expect(bitset.test(i)) << fmt::format("Bit {} should be set", i); + } else { + expect(!bitset.test(i)) << fmt::format("Bit {} should be false", i); + } + } + + // reset test positions + for (const std::size_t pos : testPositions) { + bitset.reset(pos); + } + + // all positions should be reset + for (std::size_t i = 0; i < bitset.size(); i++) { + expect(!bitset.test(i)) << fmt::format("Bit {} should be false", i); + } + + // Bulk operations + std::vector> testPositionsBulk = {{10UZ, 20UZ}, {10UZ, 10UZ}, {50UZ, 70UZ}, {0UZ, 127UZ}, {0UZ, 128UZ}, {63UZ, 64UZ}, {127UZ, 128UZ}, {0UZ, 1UZ}, {128UZ, 128UZ}}; + for (const auto& pos : testPositionsBulk) { + bitset.set(pos.first, pos.second); + + for (std::size_t i = 0; i < bitset.size(); ++i) { + if (i >= pos.first && i < pos.second) { + expect(bitset.test(i)) << fmt::format("Bulk [{},{}) Bit {} should be true", pos.first, pos.second, i); + } else { + expect(!bitset.test(i)) << fmt::format("Bulk [{},{}) Bit {} should be false", pos.first, pos.second, i); + } + } + + // all positions should be reset + bitset.reset(pos.first, pos.second); + for (std::size_t i = 0; i < bitset.size(); i++) { + expect(!bitset.test(i)) << fmt::format("Bulk [{},{}) Bit {} should be false", pos.first, pos.second, i); + } + } + +#if not defined(__EMSCRIPTEN__) && not defined(NDEBUG) + expect(aborts([&] { bitset.set(bitsetSize); })) << "Setting bit should throw an assertion."; + expect(aborts([&] { bitset.reset(bitsetSize); })) << "Resetting bit should throw an assertion."; + expect(aborts([&] { bitset.test(bitsetSize); })) << "Testing bit should throw an assertion."; + // bulk operations + expect(aborts([&] { bitset.set(100UZ, 200UZ); })) << "Setting bulk bits should throw an assertion."; + expect(aborts([&] { bitset.reset(100UZ, 200UZ); })) << "Resetting bulk bits should throw an assertion."; + expect(aborts([&] { bitset.set(200UZ, 100UZ); })) << "Setting bulk begin > end should throw an assertion."; + expect(aborts([&] { bitset.reset(200UZ, 100UZ); })) << "Resetting bulk begin > end should throw an assertion."; +#endif +} + +const boost::ut::suite AtomicBitsetTests = [] { + using namespace boost::ut; + using namespace gr; + using namespace gr::detail; + + "basics set/reset/test"_test = []() { + auto dynamicBitset = AtomicBitset<>(128UZ); + runAtomicBitsetTest(dynamicBitset, 128UZ); + + auto staticBitset = AtomicBitset<128UZ>(); + runAtomicBitsetTest(staticBitset, 128UZ); + }; + + "multithreads"_test = [] { + constexpr std::size_t bitsetSize = 256UZ; + constexpr std::size_t nThreads = 16UZ; + constexpr std::size_t nRepeats = 100UZ; + AtomicBitset bitset; + std::vector threads; + + for (std::size_t iThread = 0; iThread < nThreads; iThread++) { + threads.emplace_back([&] { + for (std::size_t iR = 0; iR < nRepeats; iR++) { + for (std::size_t i = 0; i < bitsetSize; i++) { + if (i < bitsetSize / 2) { + bitset.set(i); + bitset.reset(i); + std::ignore = bitset.test(i); + } else { + bitset.reset(i); + bitset.set(i); + std::ignore = bitset.test(i); + } + } + } + }); + } + + for (auto& thread : threads) { + thread.join(); + } + + // Verify final state: first half should be reset, second half should be set + for (std::size_t i = 0; i < bitsetSize; i++) { + if (i < bitsetSize / 2) { + expect(!bitset.test(i)) << std::format("Bit {} should be reset", i); + } else { + expect(bitset.test(i)) << std::format("Bit {} should be set", i); + } + } + }; + + "multithreads bulk"_test = [] { + constexpr std::size_t bitsetSize = 2000UZ; + constexpr std::size_t nThreads = 10UZ; + constexpr std::size_t chunkSize = bitsetSize / nThreads; + constexpr std::size_t nRepeats = 1000UZ; + AtomicBitset bitset; + std::vector threads; + + for (std::size_t iThread = 0; iThread < nThreads; iThread++) { + threads.emplace_back([&bitset, iThread] { + for (std::size_t iR = 0; iR < nRepeats; iR++) { + if (iThread % 2 == 0) { + bitset.set(iThread * chunkSize, (iThread + 1) * chunkSize); + bitset.reset(iThread * chunkSize, (iThread + 1) * chunkSize); + } else { + bitset.reset(iThread * chunkSize, (iThread + 1) * chunkSize); + bitset.set(iThread * chunkSize, (iThread + 1) * chunkSize); + } + } + }); + } + + for (auto& thread : threads) { + thread.join(); + } + + // Verify final state + for (std::size_t i = 0; i < bitsetSize; i++) { + if ((i / chunkSize) % 2 == 0) { + expect(!bitset.test(i)) << std::format("Bit {} should be reset", i); + } else { + expect(bitset.test(i)) << std::format("Bit {} should be set", i); + } + } + }; +}; + +int main() { /* not needed for UT */ } diff --git a/core/test/qa_buffer.cpp b/core/test/qa_buffer.cpp index a5e76bd0..70a724da 100644 --- a/core/test/qa_buffer.cpp +++ b/core/test/qa_buffer.cpp @@ -109,102 +109,6 @@ const boost::ut::suite BasicConceptsTests = [] { } | CircularBufferTypesToTest(); }; -template -void runAtomicBitsetTest(TBitset& bitset, std::size_t bitsetSize) { - using namespace boost::ut; - - expect(eq(bitset.size(), bitsetSize)) << fmt::format("Bitset size should be {}", bitsetSize); - - // test default values - for (std::size_t i = 0; i < bitset.size(); i++) { - expect(!bitset.test(i)) << fmt::format("Bit {} should be false", i); - } - - // set true for test positions - std::vector testPositions = {0UZ, 1UZ, 5UZ, 31UZ, 32UZ, 47UZ, 63UZ, 64UZ, 100UZ, 127UZ}; - for (const std::size_t pos : testPositions) { - bitset.set(pos); - } - - // only test positions should be set - for (std::size_t i = 0; i < bitset.size(); i++) { - if (std::ranges::find(testPositions, i) != testPositions.end()) { - expect(bitset.test(i)) << fmt::format("Bit {} should be set", i); - } else { - expect(!bitset.test(i)) << fmt::format("Bit {} should be false", i); - } - } - - // reset test positions - for (const std::size_t pos : testPositions) { - bitset.reset(pos); - } - - // all positions should be reset - for (std::size_t i = 0; i < bitset.size(); i++) { - expect(!bitset.test(i)) << fmt::format("Bit {} should be false", i); - } - -#if not defined(__EMSCRIPTEN__) && not defined(NDEBUG) - expect(aborts([&] { bitset.set(bitsetSize); })) << "Setting bit should throw an assertion."; - expect(aborts([&] { bitset.reset(bitsetSize); })) << "Resetting bit should throw an assertion."; - expect(aborts([&] { bitset.test(bitsetSize); })) << "Testing bit should throw an assertion."; -#endif -} - -const boost::ut::suite AtomicBitsetTests = [] { - using namespace boost::ut; - using namespace gr; - using namespace gr::detail; - - "basics set/reset/test"_test = []() { - auto dynamicBitset = AtomicBitset<>(128UZ); - runAtomicBitsetTest(dynamicBitset, 128UZ); - - auto staticBitset = AtomicBitset<128UZ>(); - runAtomicBitsetTest(staticBitset, 128UZ); - }; - - "multithreads"_test = [] { - constexpr std::size_t bitsetSize = 256UZ; - constexpr std::size_t nThreads = 16UZ; - constexpr std::size_t nRepeats = 100UZ; - AtomicBitset bitset; - std::vector threads; - - for (std::size_t iThread = 0; iThread < nThreads; iThread++) { - threads.emplace_back([&] { - for (std::size_t iR = 0; iR < nRepeats; iR++) { - for (std::size_t i = 0; i < bitsetSize; i++) { - if (i < bitsetSize / 2) { - bitset.set(i); - bitset.reset(i); - std::ignore = bitset.test(i); - } else { - bitset.reset(i); - bitset.set(i); - std::ignore = bitset.test(i); - } - } - } - }); - } - - for (auto& thread : threads) { - thread.join(); - } - - // Verify final state: first half should be reset, second half should be set - for (std::size_t i = 0; i < bitsetSize; i++) { - if (i < bitsetSize / 2) { - expect(!bitset.test(i)) << std::format("Bit {} should be reset", i); - } else { - expect(bitset.test(i)) << std::format("Bit {} should be set", i); - } - } - }; -}; - const boost::ut::suite SequenceTests = [] { using namespace boost::ut; @@ -501,7 +405,6 @@ const boost::ut::suite CircularBufferTests = [] { expect(eq(reader.nSamplesConsumed(), 0UZ)); } } - // basic expert writer api for (int k = 0; k < 3; k++) { // case 0: write fully reserved data