Skip to content

Commit

Permalink
Add nSamplesConsumed() and nSamplesPublished() to BufferReader/Writer
Browse files Browse the repository at this point in the history
  • Loading branch information
drslebedev authored and RalphSteinhagen committed May 1, 2024
1 parent 76e8491 commit 86e88f6
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 41 deletions.
3 changes: 3 additions & 0 deletions core/include/gnuradio-4.0/Buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ concept BufferReader = requires(T /*const*/ t, const std::size_t n_items) {
{ t.position() } -> std::same_as<std::make_signed_t<std::size_t>>;
{ t.available() } -> std::same_as<std::size_t>;
{ t.buffer() };
{ t.nSamplesConsumed()} -> std::same_as<std::size_t>;
{ t.isConsumeRequested()} -> std::same_as<bool>;
};

template<class Fn, typename T, typename ...Args>
Expand All @@ -92,6 +94,7 @@ concept BufferWriter = requires(T t, const std::size_t n_items, std::pair<std::s
{ t.reserve(n_items) };// TODO: reserve() returns CircularBuffer::buffer_writer::PublishableOutputRange
{ t.available() } -> std::same_as<std::size_t>;
{ t.buffer() };
{ t.nSamplesPublished()} -> std::same_as<std::size_t>;
};

template<class T, typename ...Args>
Expand Down
21 changes: 17 additions & 4 deletions core/include/gnuradio-4.0/BufferSkeleton.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ class BufferSkeleton {
return BufferSkeleton(_buffer);
};

[[nodiscard]] constexpr std::size_t
nSamplesConsumed() const noexcept {
return 0UZ;
};

[[nodiscard]] constexpr bool
isConsumeRequested() const noexcept {
return false;
}

template<bool strict_check = true>
[[nodiscard]] std::span<const U>
get(const std::size_t /* n_requested = 0*/) const noexcept(!strict_check) {
Expand Down Expand Up @@ -83,19 +93,22 @@ class BufferSkeleton {
return BufferSkeleton(_buffer);
};

[[nodiscard]] constexpr std::size_t
nSamplesPublished() const noexcept {
return 0UZ;
};

[[nodiscard]] constexpr auto
reserve(std::size_t n) noexcept -> std::span<U> {
return { &_buffer->_data[0], n };
}

constexpr void
publish(std::pair<std::size_t, std::make_signed<std::size_t>>, std::size_t) const { /* empty */
}
publish(std::pair<std::size_t, std::make_signed<std::size_t>>, std::size_t) const { /* empty */ }

template<typename... Args, WriterCallback<U, Args...> Translator>
void
publish(Translator && /* translator */, std::size_t /* n_slots_to_claim = 1 */, Args &&.../* args */) const noexcept { /* empty */
} // blocks until elements are available
publish(Translator && /* translator */, std::size_t /* n_slots_to_claim = 1 */, Args &&.../* args */) const noexcept { /* empty */ } // blocks until elements are available

template<typename... Args, WriterCallback<U, Args...> Translator>
[[nodiscard]] bool
Expand Down
71 changes: 40 additions & 31 deletions core/include/gnuradio-4.0/CircularBuffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ class CircularBuffer

if constexpr (isMultiThreadedStrategy()) {
if (_parent->_rangesCounter == 0 && !isFullyPublished()) {
fmt::print(stderr, "CircularBuffer::multiple_writer::PublishableOutputRange() - did not publish {} samples\n", _parent->_internalSpan.size() - _parent->_nSlotsPublished);
fmt::print(stderr, "CircularBuffer::multiple_writer::PublishableOutputRange() - did not publish {} samples\n", _parent->_internalSpan.size() - _parent->_nSamplesPublished);
std::abort();
}

Expand All @@ -351,7 +351,7 @@ class CircularBuffer

[[nodiscard]] constexpr bool
isFullyPublished() const noexcept {
return _parent->_internalSpan.size() == _parent->_nSlotsPublished;
return _parent->_internalSpan.size() == _parent->_nSamplesPublished;
}

[[nodiscard]] constexpr static bool
Expand Down Expand Up @@ -379,21 +379,21 @@ class CircularBuffer
explicit(false) operator std::span<T>&() const noexcept { return _parent->_internalSpan; }
operator std::span<T>&() noexcept { return _parent->_internalSpan; }

constexpr void publish(std::size_t nSlotsToPublish) noexcept {
assert(nSlotsToPublish <= _parent->_internalSpan.size() - _parent->_nSlotsPublished && "n_produced must be <= than unpublished slots");
constexpr void publish(std::size_t nSamplesToPublish) noexcept {
assert(nSamplesToPublish <= _parent->_internalSpan.size() - _parent->_nSamplesPublished && "n_produced must be <= than unpublished samples");
if (!_parent->_isMmapAllocated) {
const std::size_t size = _parent->_size;
// mirror samples below/above the buffer's wrap-around point
const size_t nFirstHalf = std::min(size - _parent->_index, nSlotsToPublish);
const size_t nSecondHalf = nSlotsToPublish - nFirstHalf;
const size_t nFirstHalf = std::min(size - _parent->_index, nSamplesToPublish);
const size_t nSecondHalf = nSamplesToPublish - nFirstHalf;

auto &data = _parent->_buffer->_data;
std::copy(&data[_parent->_index], &data[_parent->_index + nFirstHalf], &data[_parent->_index + size]);
std::copy(&data[size], &data[size + nSecondHalf], &data[0]);
}
_parent->_claimStrategy->publish(_parent->_offset, nSlotsToPublish);
_parent->_offset += static_cast<signed_index_type>(nSlotsToPublish);
_parent->_nSlotsPublished += nSlotsToPublish;
_parent->_claimStrategy->publish(_parent->_offset, nSamplesToPublish);
_parent->_offset += static_cast<signed_index_type>(nSamplesToPublish);
_parent->_nSamplesPublished += nSamplesToPublish;
_parent->_isRangePublished = true;
}
}; // class PublishableOutputRange
Expand All @@ -415,7 +415,7 @@ class CircularBuffer

// doesn't have to be atomic because this writer is accessed (by design) always by the same thread.
// These are the parameters for PublishableOutputRange, only one PublishableOutputRange can be reserved per writer
std::size_t _nSlotsPublished {0UZ}; // controls how many slots wre already published, multiple publish() calls are allowed,
std::size_t _nSamplesPublished {0UZ}; // controls how many samples were already published, multiple publish() calls are allowed
bool _isRangePublished {true};// controls if publish() was invoked
std::size_t _index {0UZ};
signed_index_type _offset {0};
Expand All @@ -435,7 +435,7 @@ class CircularBuffer
, _isMmapAllocated(_buffer->_isMmapAllocated)
, _size(_buffer->_size)
, _claimStrategy(std::addressof(_buffer->_claimStrategy))
, _nSlotsPublished(std::exchange(other._nSlotsPublished, 0UZ))
, _nSamplesPublished(std::exchange(other._nSamplesPublished, 0UZ))
, _isRangePublished(std::exchange(other._isRangePublished, true))
, _index(std::exchange(other._index, 0UZ))
, _offset(std::exchange(other._offset, 0))
Expand All @@ -445,7 +445,7 @@ class CircularBuffer
std::swap(_buffer, tmp._buffer);
_isMmapAllocated = _buffer->_isMmapAllocated;
_size = _buffer->_size;
std::swap(_nSlotsPublished, tmp._nSlotsPublished);
std::swap(_nSamplesPublished, tmp._nSamplesPublished);
std::swap(_isRangePublished, tmp._isRangePublished);
_claimStrategy = std::addressof(_buffer->_claimStrategy);
std::swap(_index, tmp._index);
Expand All @@ -457,46 +457,48 @@ class CircularBuffer

[[nodiscard]] constexpr BufferType buffer() const noexcept { return CircularBuffer(_buffer); };

[[nodiscard]] constexpr std::size_t nSamplesPublished() const noexcept {return _nSamplesPublished;};

template<SpanReleasePolicy policy = SpanReleasePolicy::ProcessNone>
[[nodiscard]] constexpr auto reserve(std::size_t nSlotsToClaim) noexcept -> PublishableOutputRange<U, policy> {
[[nodiscard]] constexpr auto reserve(std::size_t nSamples) noexcept -> PublishableOutputRange<U, policy> {
checkIfCanReserveAndAbortIfNeeded();
_isRangePublished = false;
_nSlotsPublished = 0UZ;
_nSamplesPublished = 0UZ;

if (nSlotsToClaim == 0) {
if (nSamples == 0) {
return PublishableOutputRange<U, policy>(this);
}

try {
const auto sequence = _claimStrategy->next(*_buffer->_read_indices, nSlotsToClaim); // alt: try_next
const std::size_t index = (static_cast<std::size_t>(sequence) + _size - nSlotsToClaim) % _size;
return PublishableOutputRange<U, policy>(this, index, sequence, nSlotsToClaim);
const auto sequence = _claimStrategy->next(*_buffer->_read_indices, nSamples); // alt: try_next
const std::size_t index = (static_cast<std::size_t>(sequence) + _size - nSamples) % _size;
return PublishableOutputRange<U, policy>(this, index, sequence, nSamples);
} catch (const NoCapacityException &) {
return PublishableOutputRange<U, policy>(this);
}
}

template <typename... Args, WriterCallback<U, Args...> Translator>
constexpr void publish(Translator&& translator, std::size_t nSlotsToClaim = 1, Args&&... args) {
constexpr void publish(Translator&& translator, std::size_t nSamples = 1, Args&&... args) {
_isRangePublished = true;
_nSlotsPublished += nSlotsToClaim;
if (nSlotsToClaim <= 0 || _buffer->_read_indices->empty()) {
_nSamplesPublished += nSamples;
if (nSamples <= 0 || _buffer->_read_indices->empty()) {
return;
}
const auto sequence = _claimStrategy->next(*_buffer->_read_indices, nSlotsToClaim);
translate_and_publish(std::forward<Translator>(translator), nSlotsToClaim, sequence, std::forward<Args>(args)...);
const auto sequence = _claimStrategy->next(*_buffer->_read_indices, nSamples);
translate_and_publish(std::forward<Translator>(translator), nSamples, sequence, std::forward<Args>(args)...);
} // blocks until elements are available

template <typename... Args, WriterCallback<U, Args...> Translator>
constexpr bool try_publish(Translator&& translator, std::size_t nSlotsToClaim = 1, Args&&... args) {
constexpr bool try_publish(Translator&& translator, std::size_t nSamples = 1, Args&&... args) {
_isRangePublished = true;
_nSlotsPublished += nSlotsToClaim;
if (nSlotsToClaim <= 0 || _buffer->_read_indices->empty()) {
_nSamplesPublished += nSamples;
if (nSamples <= 0 || _buffer->_read_indices->empty()) {
return true;
}
try {
const auto sequence = _claimStrategy->tryNext(*_buffer->_read_indices, nSlotsToClaim);
translate_and_publish(std::forward<Translator>(translator), nSlotsToClaim, sequence, std::forward<Args>(args)...);
const auto sequence = _claimStrategy->tryNext(*_buffer->_read_indices, nSamples);
translate_and_publish(std::forward<Translator>(translator), nSamples, sequence, std::forward<Args>(args)...);
return true;
} catch (const NoCapacityException &) {
return false;
Expand Down Expand Up @@ -542,9 +544,9 @@ class CircularBuffer

constexpr void checkIfCanReserveAndAbortIfNeeded() const noexcept {
if constexpr (std::is_base_of_v<MultiThreadedStrategy<SIZE, WAIT_STRATEGY>, ClaimType>) {
if (_internalSpan.size() - _nSlotsPublished != 0) {
if (_internalSpan.size() - _nSamplesPublished != 0) {
fmt::print(stderr, "An error occurred: The method CircularBuffer::multiple_writer::reserve() was invoked for the second time in succession, "
"a previous PublishableOutputRange was not fully published, {} samples remain unpublished.", _internalSpan.size() - _nSlotsPublished);
"a previous PublishableOutputRange was not fully published, {} samples remain unpublished.", _internalSpan.size() - _nSamplesPublished);
std::abort();
}

Expand Down Expand Up @@ -685,6 +687,7 @@ class CircularBuffer
}
}
_parent->_readIndexCached = _parent->_readIndex->addAndGet(static_cast<signed_index_type>(nSamples));
_parent->_nSamplesConsumed = nSamples;
return true;
}

Expand All @@ -710,6 +713,7 @@ class CircularBuffer
// Samples are now consumed in a delayed manner. When the consume() method is called, the actual consumption does not happen immediately.
// Instead, the real consume() operation is invoked in the destructor, when the last ConsumableInputRange is destroyed.
mutable std::size_t _nSamplesToConsume {std::numeric_limits<std::size_t>::max()}; // The number of samples requested for consumption by explicitly invoking the consume() method.
mutable std::size_t _nSamplesConsumed {0UZ}; // The number of samples actually consumed.

std::size_t
buffer_index() const noexcept {
Expand All @@ -731,7 +735,8 @@ class CircularBuffer
, _size(_buffer->_size)
, _nSamplesFirstGet(std::move(other._nSamplesFirstGet))
, _rangesCounter(std::move(other._rangesCounter))
, _nSamplesToConsume(std::move(other._nSamplesToConsume)){
, _nSamplesToConsume(std::move(other._nSamplesToConsume))
, _nSamplesConsumed(std::move(other._nSamplesConsumed)){
}
buffer_reader& operator=(buffer_reader tmp) noexcept {
std::swap(_readIndex, tmp._readIndex);
Expand All @@ -740,13 +745,16 @@ class CircularBuffer
std::swap(_nSamplesFirstGet, tmp._nSamplesFirstGet);
std::swap(_rangesCounter, tmp._rangesCounter);
std::swap(_nSamplesToConsume, tmp._nSamplesToConsume);
std::swap(_nSamplesConsumed, tmp._nSamplesConsumed);
_size = _buffer->_size;
return *this;
};
~buffer_reader() { gr::detail::removeSequence( _buffer->_read_indices, _readIndex); }

[[nodiscard]] constexpr BufferType buffer() const noexcept { return CircularBuffer(_buffer); };

[[nodiscard]] constexpr std::size_t nSamplesConsumed() const noexcept {return _nSamplesConsumed;};

[[nodiscard]] constexpr bool isConsumeRequested() const noexcept {
return _nSamplesToConsume != std::numeric_limits<std::size_t>::max();
}
Expand All @@ -766,6 +774,7 @@ class CircularBuffer

if (_nSamplesFirstGet == std::numeric_limits<std::size_t>::max() ) {
_nSamplesFirstGet = nSamples;
_nSamplesConsumed = 0UZ;
} else {
nSamples = std::min(nSamples, _nSamplesFirstGet);
}
Expand Down
Loading

0 comments on commit 86e88f6

Please sign in to comment.