Skip to content

Commit

Permalink
Performance optimization for MultiProducerStrategy
Browse files Browse the repository at this point in the history
* AtomicBitset bulk set/reset
* Rename to Single(Multi)ProducerStrategy
* Eliminate usage of reserve(), use tryReserve() instead
* Remove warnings

Signed-off-by: drslebedev <[email protected]>
  • Loading branch information
drslebedev committed Jul 12, 2024
1 parent e6c0183 commit e5f466f
Show file tree
Hide file tree
Showing 9 changed files with 445 additions and 255 deletions.
101 changes: 66 additions & 35 deletions blocks/basic/include/gnuradio-4.0/basic/DataSink.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -698,28 +698,34 @@ synchronously (/asynchronously) if handled by the same (/different) sink block.

void setMetadata(detail::Metadata metadata) override { dataset_template = detail::makeDataSetTemplate<T>(std::move(metadata)); }

inline void publishDataSet(DataSet<T>&& data) {
inline bool publishDataSet(DataSet<T>&& data) {
if constexpr (!std::is_same_v<Callback, gr::meta::null_type>) {
callback(std::move(data));
return true;
} else {
auto poller = polling_handler.lock();
if (!poller) {
this->setExpired();
return;
return false;
}

auto writeData = poller->writer.reserve(1);
if (block) {
writeData[0] = std::move(data);
writeData.publish(1);
auto writeData = poller->writer.tryReserve(1);
if (writeData.empty()) {
return false;
} else {
if (poller->writer.available() > 0) {
if (block) {
writeData[0] = std::move(data);
writeData.publish(1);
} else {
poller->drop_count++;
if (poller->writer.available() > 0) {
writeData[0] = std::move(data);
writeData.publish(1);
} else {
poller->drop_count++;
}
}
}
return true;
}
}

Expand All @@ -742,8 +748,10 @@ synchronously (/asynchronously) if handled by the same (/different) sink block.
window->pending_post_samples -= postSampleView.size();

if (window->pending_post_samples == 0) {
this->publishDataSet(std::move(window->dataset));
window = pending_trigger_windows.erase(window);
bool published = this->publishDataSet(std::move(window->dataset));
if (published) {
window = pending_trigger_windows.erase(window);
}
} else {
++window;
}
Expand All @@ -753,7 +761,10 @@ synchronously (/asynchronously) if handled by the same (/different) sink block.
void stop() override {
for (auto& window : pending_trigger_windows) {
if (!window.dataset.signal_values.empty()) {
this->publishDataSet(std::move(window.dataset));
bool published = this->publishDataSet(std::move(window.dataset));
if (!published) {
throw gr::exception("DataSink::TriggerListener::stop() can not publish pending datasets\n");
}
}
}
pending_trigger_windows.clear();
Expand Down Expand Up @@ -781,28 +792,34 @@ synchronously (/asynchronously) if handled by the same (/different) sink block.

void setMetadata(detail::Metadata metadata) override { dataset_template = detail::makeDataSetTemplate<T>(std::move(metadata)); }

inline void publishDataSet(DataSet<T>&& data) {
inline bool publishDataSet(DataSet<T>&& data) {
if constexpr (!std::is_same_v<Callback, gr::meta::null_type>) {
callback(std::move(data));
return true;
} else {
auto poller = polling_handler.lock();
if (!poller) {
this->setExpired();
return;
return false;
}

auto writeData = poller->writer.reserve(1);
if (block) {
writeData[0] = std::move(data);
writeData.publish(1);
auto writeData = poller->writer.tryReserve(1);
if (writeData.empty()) {
return false;
} else {
if (poller->writer.available() > 0) {
if (block) {
writeData[0] = std::move(data);
writeData.publish(1);
} else {
poller->drop_count++;
if (poller->writer.available() > 0) {
writeData[0] = std::move(data);
writeData.publish(1);
} else {
poller->drop_count++;
}
}
}
return true;
}
}

Expand All @@ -814,8 +831,10 @@ synchronously (/asynchronously) if handled by the same (/different) sink block.
if (obsr == trigger::MatchResult::NotMatching) {
pending_dataset->timing_events[0].push_back({static_cast<Tag::signed_index_type>(pending_dataset->signal_values.size()), *tagData0});
}
this->publishDataSet(std::move(*pending_dataset));
pending_dataset.reset();
bool published = this->publishDataSet(std::move(*pending_dataset));
if (published) {
pending_dataset.reset();
}
}
}
if (obsr == trigger::MatchResult::Matching) {
Expand All @@ -830,15 +849,20 @@ synchronously (/asynchronously) if handled by the same (/different) sink block.
pending_dataset->signal_values.insert(pending_dataset->signal_values.end(), view.begin(), view.end());

if (pending_dataset->signal_values.size() == maximumWindowSize) {
this->publishDataSet(std::move(*pending_dataset));
pending_dataset.reset();
bool published = this->publishDataSet(std::move(*pending_dataset));
if (published) {
pending_dataset.reset();
}
}
}
}

void stop() override {
if (pending_dataset) {
this->publishDataSet(std::move(*pending_dataset));
bool published = this->publishDataSet(std::move(*pending_dataset));
if (!published) {
throw gr::exception("DataSink::MultiplexedListener::stop() can not publish pending datasets\n");
}
pending_dataset.reset();
}
if (auto p = polling_handler.lock()) {
Expand Down Expand Up @@ -875,28 +899,34 @@ synchronously (/asynchronously) if handled by the same (/different) sink block.
dataset_template = detail::makeDataSetTemplate<T>(std::move(metadata));
}

inline void publishDataSet(DataSet<T>&& data) {
inline bool publishDataSet(DataSet<T>&& data) {
if constexpr (!std::is_same_v<Callback, gr::meta::null_type>) {
callback(std::move(data));
return true;
} else {
auto poller = polling_handler.lock();
if (!poller) {
this->setExpired();
return;
return false;
}

auto writeData = poller->writer.reserve(1);
if (block) {
writeData[0] = std::move(data);
writeData.publish(1);
auto writeData = poller->writer.tryReserve(1);
if (writeData.empty()) {
return false;
} else {
if (poller->writer.available() > 0) {
if (block) {
writeData[0] = std::move(data);
writeData.publish(1);
} else {
poller->drop_count++;
if (poller->writer.available() > 0) {
writeData[0] = std::move(data);
writeData.publish(1);
} else {
poller->drop_count++;
}
}
}
return true;
}
}

Expand All @@ -918,9 +948,10 @@ synchronously (/asynchronously) if handled by the same (/different) sink block.
DataSet<T> dataset = dataset_template;
dataset.timing_events = {{{-static_cast<Tag::signed_index_type>(it->delay), std::move(it->tag_data)}}};
dataset.signal_values = {inData[it->pending_samples]};
this->publishDataSet(std::move(dataset));

it = pending.erase(it);
bool published = this->publishDataSet(std::move(dataset));
if (published) {
it = pending.erase(it);
}
}
}

Expand Down
132 changes: 132 additions & 0 deletions core/include/gnuradio-4.0/AtomicBitset.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
#ifndef GNURADIO_ATOMICBITSET_HPP
#define GNURADIO_ATOMICBITSET_HPP

#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <span>
#include <type_traits>
#include <vector>

#ifndef forceinline
// use this for hot-spots only <-> may bloat code size, not fit into cache and consequently slow down execution
#define forceinline inline __attribute__((always_inline))
#endif

namespace gr {
/*
 * `AtomicBitset` is a thread-safe bitset whose bits are packed into `std::atomic<std::size_t>` words.
 *
 * - `Size` selects the storage: a compile-time size uses a `std::array` of words, while the default
 *   `std::dynamic_extent` uses a `std::vector` of words sized by the constructor argument.
 * - Every modification is a single atomic operation on one word with release ordering; `test()` reads
 *   with acquire ordering.
 * - Bulk set/reset over `[begin, end)` is atomic per word only: a concurrent reader may observe a range
 *   that spans several words as partially updated.
 *
 * N.B. `set(a, b)`/`reset(a, b)` with plain `int` arguments can be ambiguous between the `(position, bool)`
 * and the `(begin, end)` overloads -- pass `std::size_t` values to select the range overload.
 */
template<std::size_t Size = std::dynamic_extent>
class AtomicBitset {
    static_assert(Size > 0, "Size must be greater than 0");
    static constexpr bool isSizeDynamic = Size == std::dynamic_extent;

    static constexpr std::size_t _bitsPerWord  = sizeof(std::size_t) * 8UZ;
    static constexpr std::size_t _nStaticWords = isSizeDynamic ? 1UZ : (Size + _bitsPerWord - 1UZ) / _bitsPerWord;

    using DynamicArrayType = std::vector<std::atomic<std::size_t>>;
    using StaticArrayType  = std::array<std::atomic<std::size_t>, _nStaticWords>;
    using ArrayType        = std::conditional_t<isSizeDynamic, DynamicArrayType, StaticArrayType>;

    std::size_t _size = Size; // number of addressable bits
    ArrayType   _bits;        // packed storage, _bitsPerWord bits per element

public:
    /// Fixed-size bitset: all `Size` bits start cleared.
    AtomicBitset()
    requires(!isSizeDynamic)
    {
        for (auto& word : _bits) {
            word.store(0UZ, std::memory_order_relaxed);
        }
    }

    /// Dynamically sized bitset holding `size` bits, all cleared.
    /// Allocates one word per `_bitsPerWord` bits (rounded up) rather than one word per bit.
    explicit AtomicBitset(std::size_t size = 0UZ)
    requires(isSizeDynamic)
        : _size(size), _bits(size / _bitsPerWord + (size % _bitsPerWord != 0UZ ? 1UZ : 0UZ)) {
        for (auto& word : _bits) {
            word.store(0UZ, std::memory_order_relaxed);
        }
    }

    /// Sets (`value == true`) or clears (`value == false`) the bit at `bitPosition` (must be `< size()`).
    void set(std::size_t bitPosition, bool value) {
        assert(bitPosition < _size);
        // the mask is built from a std::size_t literal so that the shift is valid for every bit of the word,
        // also on platforms where `unsigned long` is narrower than `std::size_t`
        applyMask(bitPosition / _bitsPerWord, 1UZ << (bitPosition % _bitsPerWord), value);
    }

    /// Sets or clears every bit in the half-open range `[begin, end)`; an empty range is a no-op.
    /// Each touched word is updated atomically, the range as a whole is not.
    void set(std::size_t begin, std::size_t end, bool value) {
        assert(begin <= end && end <= _size); // [begin, end)
        if (begin == end) {
            return;
        }

        std::size_t       beginWord   = begin / _bitsPerWord;
        std::size_t       endWord     = end / _bitsPerWord;
        const std::size_t beginOffset = begin % _bitsPerWord;
        const std::size_t endOffset   = end % _bitsPerWord;

        if (beginWord == endWord) {
            // the range is within a single word
            setBitsInWord(beginWord, beginOffset, endOffset, value);
            return;
        }

        // leading bits in the first (partially covered) word
        if (beginOffset != 0UZ) {
            setBitsInWord(beginWord, beginOffset, _bitsPerWord, value);
            ++beginWord;
        }
        // trailing bits in the last (partially covered) word; when endOffset == 0 the word at index
        // `endWord` lies entirely outside the range (and possibly outside the storage) and is not touched
        if (endOffset != 0UZ) {
            setBitsInWord(endWord, 0UZ, endOffset, value);
        }
        // whole words in the middle: [beginWord, endWord)
        for (std::size_t wordIndex = beginWord; wordIndex < endWord; ++wordIndex) {
            setFullWord(wordIndex, value);
        }
    }

    void set(std::size_t bitPosition) { set(bitPosition, true); }

    void set(std::size_t begin, std::size_t end) { set(begin, end, true); }

    void reset(std::size_t bitPosition) { set(bitPosition, false); }

    void reset(std::size_t begin, std::size_t end) { set(begin, end, false); }

    /// Returns whether the bit at `bitPosition` (must be `< size()`) is currently set.
    bool test(std::size_t bitPosition) const {
        assert(bitPosition < _size);
        const std::size_t mask = 1UZ << (bitPosition % _bitsPerWord);
        return (_bits[bitPosition / _bitsPerWord].load(std::memory_order_acquire) & mask) != 0UZ;
    }

    /// Number of addressable bits.
    [[nodiscard]] constexpr std::size_t size() const { return _size; }

private:
    /// Mask with the bits `[begin, end)` of a single word set; requires `begin < end <= _bitsPerWord`.
    [[nodiscard]] static constexpr std::size_t rangeMask(std::size_t begin, std::size_t end) noexcept {
        const std::size_t belowBegin = (1UZ << begin) - 1UZ;
        // shifting by the full word width would be undefined, hence the explicit all-ones case
        const std::size_t belowEnd = (end == _bitsPerWord) ? ~0UZ : (1UZ << end) - 1UZ;
        return belowEnd & ~belowBegin;
    }

    /// Atomically ORs `mask` into (value == true) or clears `mask` from (value == false) one word.
    void applyMask(std::size_t wordIndex, std::size_t mask, bool value) {
        if (value) {
            _bits[wordIndex].fetch_or(mask, std::memory_order_release);
        } else {
            _bits[wordIndex].fetch_and(~mask, std::memory_order_release);
        }
    }

    /// Sets or clears the bits `[begin, end)` inside the word at `wordIndex`, leaving its other bits untouched.
    void setBitsInWord(std::size_t wordIndex, std::size_t begin, std::size_t end, bool value) {
        assert(begin < end && end <= _bitsPerWord);
        applyMask(wordIndex, rangeMask(begin, end), value);
    }

    /// Overwrites a whole word; only valid when every bit of that word belongs to the range being updated.
    forceinline void setFullWord(std::size_t wordIndex, bool value) { _bits[wordIndex].store(value ? ~0UZ : 0UZ, std::memory_order_release); }
};

} // namespace gr

#endif // GNURADIO_ATOMICBITSET_HPP
10 changes: 7 additions & 3 deletions core/include/gnuradio-4.0/Block.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ class Block : public lifecycle::StateMachine<Derived>, public std::tuple<Argumen
meta::tuple_for_each_enumerate(
[nSamples]<typename OutputRange>(auto, OutputRange& outputRange) {
auto processOneRange = [nSamples]<typename Out>(Out& out) {
if constexpr (Out::isMultiThreadedStrategy()) {
if constexpr (Out::isMultiProducerStrategy()) {
if (!out.isFullyPublished()) {
std::abort();
}
Expand Down Expand Up @@ -1737,8 +1737,12 @@ class Block : public lifecycle::StateMachine<Derived>, public std::tuple<Argumen

retMessage->cmd = Final; // N.B. could enable/allow for partial if we return multiple messages (e.g. using coroutines?)
retMessage->serviceName = unique_name;
PublishableSpan auto msgSpan = msgOut.streamWriter().reserve<SpanReleasePolicy::ProcessAll>(1UZ);
msgSpan[0] = *retMessage;
PublishableSpan auto msgSpan = msgOut.streamWriter().tryReserve<SpanReleasePolicy::ProcessAll>(1UZ);
if (msgSpan.empty()) {
throw gr::exception(fmt::format("{}::processMessages() can not reserve span for message\n", name));
} else {
msgSpan[0] = *retMessage;
}
} // - end - for (const auto &message : messages) { ..
}

Expand Down
10 changes: 5 additions & 5 deletions core/include/gnuradio-4.0/CircularBuffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,15 +335,15 @@ class CircularBuffer {
_parent->_buffer->_claimStrategy.publish(_parent->_offset, _parent->nSamplesPublished());
_parent->_offset += static_cast<signed_index_type>(_parent->nSamplesPublished());
#ifndef NDEBUG
if constexpr (isMultiThreadedStrategy()) {
if constexpr (isMultiProducerStrategy()) {
if (!isFullyPublished()) {
fmt::print(stderr, "CircularBuffer::multiple_writer::PublishableOutputRange() - did not publish {} samples\n", _parent->_internalSpan.size() - _parent->_nSamplesPublished);
fmt::print(stderr, "CircularBuffer::MultiWriter::PublishableOutputRange() - did not publish {} samples\n", _parent->_internalSpan.size() - _parent->_nSamplesPublished);
std::abort();
}

} else {
if (!_parent->_internalSpan.empty() && !isPublished()) {
fmt::print(stderr, "CircularBuffer::single_writer::PublishableOutputRange() - omitted publish call for {} reserved samples\n", _parent->_internalSpan.size());
fmt::print(stderr, "CircularBuffer::SingleWriter::PublishableOutputRange() - omitted publish call for {} reserved samples\n", _parent->_internalSpan.size());
std::abort();
}
}
Expand All @@ -370,7 +370,7 @@ class CircularBuffer {
[[nodiscard]] constexpr std::size_t samplesToPublish() const noexcept { return _parent->_nSamplesPublished; }
[[nodiscard]] constexpr bool isPublished() const noexcept { return _parent->_isRangePublished; }
[[nodiscard]] constexpr bool isFullyPublished() const noexcept { return _parent->_internalSpan.size() == _parent->_nSamplesPublished; }
[[nodiscard]] constexpr static bool isMultiThreadedStrategy() noexcept { return std::is_base_of_v<MultiThreadedStrategy<SIZE, TWaitStrategy>, ClaimType>; }
[[nodiscard]] constexpr static bool isMultiProducerStrategy() noexcept { return std::is_base_of_v<MultiProducerStrategy<SIZE, TWaitStrategy>, ClaimType>; }
[[nodiscard]] constexpr static SpanReleasePolicy spanReleasePolicy() noexcept { return policy; }

constexpr void publish(std::size_t nSamplesToPublish) noexcept {
Expand Down Expand Up @@ -478,7 +478,7 @@ class CircularBuffer {

private:
constexpr void checkIfCanReserveAndAbortIfNeeded() const noexcept {
if constexpr (std::is_base_of_v<MultiThreadedStrategy<SIZE, TWaitStrategy>, ClaimType>) {
if constexpr (std::is_base_of_v<MultiProducerStrategy<SIZE, TWaitStrategy>, ClaimType>) {
if (_internalSpan.size() - _nSamplesPublished != 0) {
fmt::print(stderr,
"An error occurred: The method CircularBuffer::MultiWriter::reserve() was invoked for the second time in succession, "
Expand Down
Loading

0 comments on commit e5f466f

Please sign in to comment.