Skip to content

Commit

Permalink
Stateful allocator support added. Resolves #45.
Browse files Browse the repository at this point in the history
  • Loading branch information
Maxim Egorushkin committed Feb 13, 2024
1 parent 363d3cd commit 7c8c456
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 48 deletions.
105 changes: 59 additions & 46 deletions include/atomic_queue/atomic_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -420,17 +420,16 @@ class AtomicQueue2 : public AtomicQueueCommon<AtomicQueue2<T, SIZE, MINIMIZE_CON
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

template<class T, class A = std::allocator<T>, T NIL = details::nil<T>(), bool MAXIMIZE_THROUGHPUT = true, bool TOTAL_ORDER = false, bool SPSC = false>
class AtomicQueueB : public AtomicQueueCommon<AtomicQueueB<T, A, NIL, MAXIMIZE_THROUGHPUT, TOTAL_ORDER, SPSC>>,
private std::allocator_traits<A>::template rebind_alloc<std::atomic<T>> {
class AtomicQueueB : private std::allocator_traits<A>::template rebind_alloc<std::atomic<T>>,
public AtomicQueueCommon<AtomicQueueB<T, A, NIL, MAXIMIZE_THROUGHPUT, TOTAL_ORDER, SPSC>> {
using AllocatorElements = typename std::allocator_traits<A>::template rebind_alloc<std::atomic<T>>;
using Base = AtomicQueueCommon<AtomicQueueB<T, A, NIL, MAXIMIZE_THROUGHPUT, TOTAL_ORDER, SPSC>>;
friend Base;

static constexpr bool total_order_ = TOTAL_ORDER;
static constexpr bool spsc_ = SPSC;
static constexpr bool maximize_throughput_ = MAXIMIZE_THROUGHPUT;

using AllocatorElements = typename std::allocator_traits<A>::template rebind_alloc<std::atomic<T>>;

static constexpr auto ELEMENTS_PER_CACHE_LINE = CACHE_LINE_SIZE / sizeof(std::atomic<T>);
static_assert(ELEMENTS_PER_CACHE_LINE, "Unexpected ELEMENTS_PER_CACHE_LINE.");

Expand All @@ -454,25 +453,25 @@ class AtomicQueueB : public AtomicQueueCommon<AtomicQueueB<T, A, NIL, MAXIMIZE_T

public:
using value_type = T;
using allocator_type = A;

// The special member functions are not thread-safe.

AtomicQueueB(unsigned size)
: size_(std::max(details::round_up_to_power_of_2(size), 1u << (SHUFFLE_BITS * 2)))
AtomicQueueB(unsigned size, A const& allocator = A{})
: AllocatorElements(allocator)
, size_(std::max(details::round_up_to_power_of_2(size), 1u << (SHUFFLE_BITS * 2)))
, elements_(AllocatorElements::allocate(size_)) {
assert(std::atomic<T>{NIL}.is_lock_free()); // Queue element type T is not atomic. Use AtomicQueue2/AtomicQueueB2 for such element types.
for(auto p = elements_, q = elements_ + size_; p < q; ++p)
p->store(NIL, X);
}

AtomicQueueB(AtomicQueueB&& b) noexcept
: Base(static_cast<Base&&>(b))
, AllocatorElements(static_cast<AllocatorElements&&>(b)) // TODO: This must be noexcept, static_assert that.
, size_(b.size_)
, elements_(b.elements_) {
b.size_ = 0;
b.elements_ = 0;
}
: AllocatorElements(static_cast<AllocatorElements&&>(b)) // TODO: This must be noexcept, static_assert that.
, Base(static_cast<Base&&>(b))
, size_(std::exchange(b.size_, 0))
, elements_(std::exchange(b.elements_, nullptr))
{}

AtomicQueueB& operator=(AtomicQueueB&& b) noexcept {
b.swap(*this);
Expand All @@ -484,10 +483,14 @@ class AtomicQueueB : public AtomicQueueCommon<AtomicQueueB<T, A, NIL, MAXIMIZE_T
AllocatorElements::deallocate(elements_, size_); // TODO: This must be noexcept, static_assert that.
}

// Returns, by value, an allocator of the user-facing type A.
// The queue stores its allocator as a privately inherited rebound allocator (for std::atomic<T>),
// so `*this` is converted to A here; this relies on A being constructible from that rebound allocator type.
A get_allocator() const noexcept {
return *this;
}

void swap(AtomicQueueB& b) noexcept {
using std::swap;
this->Base::swap(b);
swap(static_cast<AllocatorElements&>(*this), static_cast<AllocatorElements&>(b));
Base::swap(b);
swap(size_, b.size_);
swap(elements_, b.elements_);
}
Expand All @@ -500,27 +503,25 @@ class AtomicQueueB : public AtomicQueueCommon<AtomicQueueB<T, A, NIL, MAXIMIZE_T
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

template<class T, class A = std::allocator<T>, bool MAXIMIZE_THROUGHPUT = true, bool TOTAL_ORDER = false, bool SPSC = false>
class AtomicQueueB2 : public AtomicQueueCommon<AtomicQueueB2<T, A, MAXIMIZE_THROUGHPUT, TOTAL_ORDER, SPSC>>,
private A,
private std::allocator_traits<A>::template rebind_alloc<std::atomic<uint8_t>> {
class AtomicQueueB2 : private std::allocator_traits<A>::template rebind_alloc<unsigned char>,
public AtomicQueueCommon<AtomicQueueB2<T, A, MAXIMIZE_THROUGHPUT, TOTAL_ORDER, SPSC>> {
using StorageAllocator = typename std::allocator_traits<A>::template rebind_alloc<unsigned char>;
using Base = AtomicQueueCommon<AtomicQueueB2<T, A, MAXIMIZE_THROUGHPUT, TOTAL_ORDER, SPSC>>;
using State = typename Base::State;
using AtomicState = std::atomic<unsigned char>;
friend Base;

static constexpr bool total_order_ = TOTAL_ORDER;
static constexpr bool spsc_ = SPSC;
static constexpr bool maximize_throughput_ = MAXIMIZE_THROUGHPUT;

using AllocatorElements = A;
using AllocatorStates = typename std::allocator_traits<A>::template rebind_alloc<std::atomic<uint8_t>>;

// AtomicQueueCommon members are stored into by readers and writers.
// Allocate these immutable members on another cache line which never gets invalidated by stores.
alignas(CACHE_LINE_SIZE) unsigned size_;
std::atomic<unsigned char>* states_;
AtomicState* states_;
T* elements_;

static constexpr auto STATES_PER_CACHE_LINE = CACHE_LINE_SIZE / sizeof(State);
static constexpr auto STATES_PER_CACHE_LINE = CACHE_LINE_SIZE / sizeof(AtomicState);
static_assert(STATES_PER_CACHE_LINE, "Unexpected STATES_PER_CACHE_LINE.");

static constexpr auto SHUFFLE_BITS = details::GetCacheLineIndexBits<STATES_PER_CACHE_LINE>::value;
Expand All @@ -537,34 +538,43 @@ class AtomicQueueB2 : public AtomicQueueCommon<AtomicQueueB2<T, A, MAXIMIZE_THRO
Base::template do_push_any(std::forward<U>(element), states_[index], elements_[index]);
}

// Obtains raw storage for size_ objects of type U from StorageAllocator and returns it as U*.
// StorageAllocator is A rebound to unsigned char, so the request is expressed in bytes.
// No U objects are constructed here; the caller initializes the storage.
template<class U>
U* allocate_() {
U* p = reinterpret_cast<U*>(StorageAllocator::allocate(size_ * sizeof(U))); // Byte count for size_ objects of U.
assert(reinterpret_cast<uintptr_t>(p) % alignof(U) == 0); // Allocated storage must be suitably aligned for U.
return p;
}

// Hands storage for size_ objects of type U back to StorageAllocator.
// p is expected to be a pointer obtained from allocate_<U>(); the byte count is recomputed from the
// current size_, so size_ must still hold the value used for the allocation.
// No U objects are destroyed here; the caller destroys them beforehand where needed.
template<class U>
void deallocate_(U* p) noexcept {
StorageAllocator::deallocate(reinterpret_cast<unsigned char*>(p), size_ * sizeof(U)); // TODO: This must be noexcept, static_assert that.
}

public:
using value_type = T;
using allocator_type = A;

// The special member functions are not thread-safe.

AtomicQueueB2(unsigned size)
: size_(std::max(details::round_up_to_power_of_2(size), 1u << (SHUFFLE_BITS * 2)))
, states_(AllocatorStates::allocate(size_))
, elements_(AllocatorElements::allocate(size_)) {
AtomicQueueB2(unsigned size, A const& allocator = A{})
: StorageAllocator(allocator)
, size_(std::max(details::round_up_to_power_of_2(size), 1u << (SHUFFLE_BITS * 2)))
, states_(allocate_<AtomicState>())
, elements_(allocate_<T>()) {
for(auto p = states_, q = states_ + size_; p < q; ++p)
p->store(Base::EMPTY, X);

AllocatorElements& ae = *this;
A a = get_allocator();
for(auto p = elements_, q = elements_ + size_; p < q; ++p)
std::allocator_traits<AllocatorElements>::construct(ae, p);
std::allocator_traits<A>::construct(a, p);
}

AtomicQueueB2(AtomicQueueB2&& b) noexcept
: Base(static_cast<Base&&>(b))
, AllocatorElements(static_cast<AllocatorElements&&>(b)) // TODO: This must be noexcept, static_assert that.
, AllocatorStates(static_cast<AllocatorStates&&>(b)) // TODO: This must be noexcept, static_assert that.
, size_(b.size_)
, states_(b.states_)
, elements_(b.elements_) {
b.size_ = 0;
b.states_ = 0;
b.elements_ = 0;
}
: StorageAllocator(static_cast<StorageAllocator&&>(b)) // TODO: This must be noexcept, static_assert that.
, Base(static_cast<Base&&>(b))
, size_(std::exchange(b.size_, 0))
, states_(std::exchange(b.states_, nullptr))
, elements_(std::exchange(b.elements_, nullptr))
{}

AtomicQueueB2& operator=(AtomicQueueB2&& b) noexcept {
b.swap(*this);
Expand All @@ -573,19 +583,22 @@ class AtomicQueueB2 : public AtomicQueueCommon<AtomicQueueB2<T, A, MAXIMIZE_THRO

~AtomicQueueB2() noexcept {
if(elements_) {
AllocatorElements& ae = *this;
A a = get_allocator();
for(auto p = elements_, q = elements_ + size_; p < q; ++p)
std::allocator_traits<AllocatorElements>::destroy(ae, p);
AllocatorElements::deallocate(elements_, size_); // TODO: This must be noexcept, static_assert that.
AllocatorStates::deallocate(states_, size_); // TODO: This must be noexcept, static_assert that.
std::allocator_traits<A>::destroy(a, p);
deallocate_(elements_);
deallocate_(states_);
}
}

// Returns, by value, an allocator of the user-facing type A.
// The queue stores its allocator as a privately inherited allocator rebound to unsigned char,
// so `*this` is converted to A here; this relies on A being constructible from that rebound allocator type.
A get_allocator() const noexcept {
return *this;
}

void swap(AtomicQueueB2& b) noexcept {
using std::swap;
this->Base::swap(b);
swap(static_cast<AllocatorElements&>(*this), static_cast<AllocatorElements&>(b));
swap(static_cast<AllocatorStates&>(*this), static_cast<AllocatorStates&>(b));
swap(static_cast<StorageAllocator&>(*this), static_cast<StorageAllocator&>(b));
Base::swap(b);
swap(size_, b.size_);
swap(states_, b.states_);
swap(elements_, b.elements_);
Expand Down
12 changes: 10 additions & 2 deletions src/huge_pages.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@ struct HugePageAllocator : HugePageAllocatorBase

using value_type = T;

// Default constructor; explicitly defaulted because the converting constructor below
// would otherwise suppress the implicitly declared one.
HugePageAllocator() noexcept = default;

// Converting constructor from a HugePageAllocator of a different value type U.
// The argument is unnamed and unused: nothing is copied from it.
template<class U>
HugePageAllocator(HugePageAllocator<U>) noexcept
{}

// Allocates storage for n objects of type T by requesting n * sizeof(T) bytes from `hp`
// (declared outside this view) and returns the result as T*.
T* allocate(size_t n) const {
return static_cast<T*>(hp->allocate(n * sizeof(T)));
}
Expand All @@ -151,11 +157,13 @@ struct HugePageAllocator : HugePageAllocatorBase
hp->deallocate(p, n * sizeof(T));
}

bool operator==(HugePageAllocator b) const {
template<class U>
bool operator==(HugePageAllocator<U> b) const {
return hp == b.hp;
}

bool operator!=(HugePageAllocator b) const {
template<class U>
bool operator!=(HugePageAllocator<U> b) const {
return hp != b.hp;
}
};
Expand Down

0 comments on commit 7c8c456

Please sign in to comment.