Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Priority Queue Multithreaded Test #15

Merged
merged 3 commits into from
Dec 27, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions tests/spsc/priority_queue.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <algorithm>
#include <math.h>
#include <thread>

#include <catch2/catch_test_macros.hpp>

Expand Down Expand Up @@ -66,6 +67,93 @@ TEST_CASE("Write multiple with different priority and read back ensuring "
REQUIRE(read == 1024);
}

TEST_CASE("Multithreaded read/write", "[pq_multithreaded]") {
lockfree::spsc::PriorityQueue<uint64_t, 10, 4> queue;
std::vector<std::thread> threads;
std::vector<uint64_t> written;
std::vector<uint64_t> read;

/* The following code pushes a large number of values into a priority
queue using four different priorities. The priority is also encoded
into the value pushed, as its two lower significance bits.

Both consumer and producer store in two vectors the numbers written
and read.

After the multi-threaded execution, special code (described in detail
below) checks no higher priority value was written to the priority
queue at the time a lower priority one was read.
*/

// consumer, it just pops values from the queue and stores them in the
// main thread vector
threads.emplace_back([&]() {
uint64_t value = 0;
uint64_t cnt = 0;
do {
bool pop_success = queue.Pop(value);
if (pop_success) {
read.push_back(value);
cnt++;
}
} while (cnt < TEST_MT_TRANSFER_CNT);
});

// producer, uses alternative priorities and pushes a counter shifted to
// accommodate the priority on its lower bits.
threads.emplace_back([&]() {
uint64_t cnt = 0;
uint64_t value = 0;
uint8_t prio = 0;
do {
value = cnt << 2 + prio;
bool push_success = queue.Push(value, prio);
if (push_success) {
written.push_back(value);
prio = (prio + 1) % 4; // this could be also randomly generated
cnt++;
}
} while (cnt < TEST_MT_TRANSFER_CNT + 1);
});
for (auto &t : threads) {
t.join();
}
/* The following code checks that at all times no higher priority value was
present in the `written` vector.

It needs to keep track which values were already read, it does that with
the help of the `consumed` Boolean vector.
*/
std::vector<bool> consumed(written.size(), false);
uint64_t value1, value2;
uint8_t prio1, prio2;
bool found;
for(size_t idx=0; idx<read.size(); idx++) {
// the value was read
value1 = read[idx];
// extract the priority encoded in the value
prio1 = value1 & ((1<<2) - 1);

found = false;
for(size_t idx2=0; idx2<written.size(); idx2++) {
if(consumed[idx2]) { // consumed values are skipped
continue;
}
// find when the value was written
value2 = written[idx2];
prio2 = value2 & ((1<<2) - 1);
if(written[idx2] == value1) {
consumed[idx2] = true; // this value is now accounted for
found = true;
break;
} else { // intermediate value, should be lower priority
REQUIRE(prio2 <= prio1);
}
}
REQUIRE(found);
}
}

TEST_CASE("Optional API", "[pq_optional_api]") {
lockfree::spsc::PriorityQueue<int16_t, 20, 3> queue;

Expand Down