Merge pull request #1562 from janbar/read_write_lock_reentrant
Read write lock fully reentrant
Framstag authored Feb 17, 2024
2 parents 16af10b + 804a1e6 commit 79a9065
Showing 3 changed files with 199 additions and 61 deletions.
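For context: with this change, a thread that already holds the shared lock can take the exclusive lock and then re-enter shared mode underneath it, which is exactly what the new test below exercises. A minimal usage sketch (the Latch and the ReadLock/WriteLock RAII guards are the ones used in the tests; the global latch and worker function here are illustrative only):

#include <osmscout/async/ReadWriteLock.h>

osmscout::Latch latch;

void reentrantWorker() {
  osmscout::ReadLock outer(latch);     // S: take the shared lock
  {
    osmscout::WriteLock wl(latch);     // X: blocks until this thread is the only remaining reader
    {
      osmscout::ReadLock inner(latch); // S under X: permitted for the exclusive owner
      // ... read and modify the protected state ...
    }
  }
}                                      // guards release in reverse order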
73 changes: 53 additions & 20 deletions Tests/src/Latch.cpp
@@ -487,6 +487,43 @@ TEST_CASE("Multi Recursive Writer Worker") {
REQUIRE(refCounter == pc);
}

TEST_CASE("Check Re-Entrant One Reader Writer") {
osmscout::StopClock stopClock;
volatile int i=0;
osmscout::ReadLock rl(latch);

std::thread t([&i](){
osmscout::ReadLock rl0(latch);
{
osmscout::WriteLock wl(latch);
{
osmscout::ReadLock rl1(latch);
i++;
}
}
});

// wait until writer lock is requested
while (true) {
if (!latch.try_lock_shared()) {
// writer lock is requested already
break;
}
latch.unlock_shared();
std::this_thread::yield();
}

REQUIRE(i == 0);
rl.unlock();
t.join();

stopClock.Stop();

REQUIRE(i == 1);
std::cout << "<<< ReentrantOneReaderWriter...done: " << stopClock.ResultString() << std::endl;
std::cout << std::endl;
}

TEST_CASE("Check write precedence") {
volatile int i=0;
osmscout::ReadLock rl(latch);
@@ -517,9 +554,9 @@ TEST_CASE("Check write precedence") {
TEST_CASE("Second shared lock should be blocked when exclusive is requested") {
int nbreader=4;
volatile int i=0;
- std::atomic<int> j=0;
- std::atomic<int> blocked=0;
- std::atomic<int> notblocked=0;
+ int blocked=0;
+ std::atomic<int> beg=0;
+ std::atomic<int> end=0;
std::vector<std::thread*> pools;
{
osmscout::ReadLock rl(latch);
@@ -540,40 +577,36 @@ TEST_CASE("Second shared lock should be blocked when exclusive is requested") {
}

for (int nr=0; nr < nbreader; ++nr) {
- std::thread * tr = new std::thread([&j, &blocked, &notblocked](){
- if (latch.try_lock_shared()) {
- notblocked++;
- latch.unlock_shared();
- } else {
- blocked++;
- osmscout::ReadLock rl(latch);
- j++;
- }
+ std::thread * tr = new std::thread([&beg, &end](){
+ beg++;
+ osmscout::ReadLock rl(latch);
+ end++;
});
pools.push_back(tr);
}

// wait for everyone to get set up
int k=0;
- while ((notblocked.load() + blocked.load()) < 1 && k++ < 1000) {
+ while (beg.load() != nbreader && k++ < 1000) {
std::this_thread::yield();
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
REQUIRE(i == 0); // write lock is still waiting
- REQUIRE((blocked.load() + notblocked.load()) > 0);
+ blocked = beg.load() - end.load();
rl.unlock();
t.join();
}
std::cout << "#blocked: " << blocked.load() << "/" << nbreader << std::endl;
// hoping that 1 read lock have been blocked because exclusive lock was requested
REQUIRE((blocked.load() > 0));
std::cout << "#blocked: " << blocked << "/" << nbreader << std::endl;
// hoping that all read locks have been blocked because exclusive lock was requested
REQUIRE((blocked == nbreader));
// check BUG: thread was not awakened after broadcast signal
// wait for all readers, or fail when lost reader
int k=0;
- while (j.load() != blocked.load() && k++ < 1000) {
+ while (end.load() != nbreader && k++ < 1000) {
std::this_thread::yield();
}
- // all blocked readers must be finalized
- REQUIRE(j.load() == blocked.load());
+ // all readers must be finalized
+ REQUIRE(end.load() == nbreader);
// cleanup
while (!pools.empty()) {
pools.back()->join();
32 changes: 21 additions & 11 deletions libosmscout/include/osmscout/async/ReadWriteLock.h
@@ -24,7 +24,6 @@

#include <osmscout/private/Config.h>

- #include <array>
#include <atomic>
#include <thread>
#include <mutex>
@@ -39,14 +38,11 @@ namespace osmscout {
* reverts to lock and wait in race condition.
*/

- constexpr size_t latch_bucket_count = 64;

class OSMSCOUT_API Latch {
private:
- mutable std::atomic_flag s_spin = ATOMIC_FLAG_INIT;
+ mutable std::atomic<bool> s_spin = false;

volatile int x_wait = 0; /* counts requests in wait for X */
- volatile int s_count = 0; /* counts held S locks */
volatile int x_flag = 0; /* X status: 0, 1, 2, or 3 */
std::thread::id x_owner; /* X owner (thread id) */

@@ -56,25 +52,39 @@ class OSMSCOUT_API Latch {
std::condition_variable s_gate; /* wait for release of S */

bool px = true; /* enable X precedence */
- std::array<int,latch_bucket_count> s_buckets{};

+ struct TNode {
+ TNode * _prev = nullptr;
+ TNode * _next = nullptr;
+ std::thread::id id;
+ int count = 0;
+ };
+ TNode * s_freed = nullptr;
+ TNode * s_nodes = nullptr;

void spin_lock() {
- while (s_spin.test_and_set(std::memory_order_acquire)) {
- std::this_thread::yield();
+ while (s_spin.exchange(true, std::memory_order_acquire)) {
+ do {
+ std::this_thread::yield();
+ } while (s_spin.load(std::memory_order_relaxed));
}
}
void spin_unlock() {
- s_spin.clear(std::memory_order_release);
+ s_spin.store(false, std::memory_order_release);
}

+ TNode * find_node(const std::thread::id& id);
+ TNode * new_node(const std::thread::id& id);
+ void free_node(TNode * n);

public:
- Latch() = default;
+ Latch();
explicit Latch(bool _px) : px(_px) { }
Latch(const Latch&) = delete;
Latch(Latch&&) = delete;
Latch& operator=(const Latch&) = delete;
Latch& operator=(Latch&&) = delete;
- ~Latch() = default;
+ ~Latch();

/* Locks the latch for exclusive ownership,
* blocks if the latch is not available
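A detail worth noting in the header above: the spin lock moves from a plain test-and-set on std::atomic_flag to a test-and-test-and-set (TTAS) loop on std::atomic<bool>. The inner relaxed load spins read-only on the locally cached flag instead of repeatedly issuing atomic read-modify-writes, so waiters stop bouncing the cache line between cores while the latch bookkeeping is busy. A standalone sketch of the same idiom (illustrative, not part of the patch):

#include <atomic>
#include <thread>

class SpinLock {
  std::atomic<bool> flag{false};
public:
  void lock() {
    // Try to grab the lock; on failure, spin on plain loads until it looks free.
    while (flag.exchange(true, std::memory_order_acquire)) {
      do {
        std::this_thread::yield();
      } while (flag.load(std::memory_order_relaxed));
    }
  }
  void unlock() {
    flag.store(false, std::memory_order_release);
  }
};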
155 changes: 125 additions & 30 deletions libosmscout/src/osmscout/async/ReadWriteLock.cpp
@@ -41,6 +41,85 @@ constexpr int X_STEP_1 = 1;
constexpr int X_STEP_2 = 2;
constexpr int X_STEP_3 = 3;

Latch::Latch() {
/* preallocate free list with 2 nodes */
TNode * n1 = new_node(std::thread::id());
TNode * n2 = new_node(std::thread::id());
free_node(n1);
free_node(n2);
}

Latch::~Latch() {
/* destroy free nodes */
while (s_freed != nullptr) {
TNode * n = s_freed;
s_freed = s_freed->_next;
delete n;
}
/* it should be empty, but still tries to destroy any existing busy node */
while (s_nodes != nullptr) {
TNode * n = s_nodes;
s_nodes = s_nodes->_next;
delete n;
}
}

Latch::TNode * Latch::find_node(const std::thread::id& id)
{
TNode * p = s_nodes;
while (p != nullptr && p->id != id) {
p = p->_next;
}
return p;
}

Latch::TNode * Latch::new_node(const std::thread::id& id)
{
TNode * p;
if (s_freed == nullptr) {
/* create node */
p = new TNode();
} else {
/* pop front from free list */
p = s_freed;
s_freed = p->_next;
}

/* setup */
p->id = id;
p->count = 0;

/* push front in list */
p->_prev = nullptr;
p->_next = s_nodes;
if (s_nodes != nullptr) {
s_nodes->_prev = p;
}
s_nodes = p;
return p;
}

void Latch::free_node(TNode * n)
{
/* remove from list */
if (n == s_nodes) {
s_nodes = n->_next;
} else {
n->_prev->_next = n->_next;
}
if (n->_next != nullptr) {
n->_next->_prev = n->_prev;
}

/* push front in free list */
if (s_freed != nullptr) {
s_freed->_prev = n;
}
n->_next = s_freed;
n->_prev = nullptr;
s_freed = n;
}
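Taken together, find_node/new_node/free_node implement a small intrusive bookkeeping list that replaces the old hashed s_buckets counters: s_nodes holds one node per thread currently owning a shared lock, with that thread's recursion count, and s_freed recycles retired nodes so the steady state performs no heap allocation while the spin lock is held. The lifecycle, as the locking paths below use it (sketch only; this runs inside a Latch member function with the spin lock held):

// acquire path (see lock_shared / try_lock_shared below)
std::thread::id tid = std::this_thread::get_id();
TNode * n = find_node(tid);   // linear scan over current readers
if (n == nullptr) {
  n = new_node(tid);          // pops a node from s_freed, or allocates one
}
++n->count;                   // one recursion level deeper

// release path (see unlock_shared below)
if (--n->count == 0) {
  free_node(n);               // unlink from s_nodes, push onto s_freed
}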

void Latch::lock() {
/* Depending on the internal implementation of conditional variable,
* a race condition could arise, permanently blocking the thread;
@@ -73,11 +152,14 @@ void Latch::lock() {
spin_lock();
}

+ /* find the thread node */
+ TNode * n = find_node(tid);
/* X = 1, check the releasing of S */
for (;;) {
- /* if the count of S is zeroed then it finalize with no wait,
- * in other case it have to wait for S gate */
- if (s_count == 0) {
+ /* if the count of S is zeroed, or equal to self count, then it finalizes
+ * with no wait, in other case it has to wait for S gate
+ */
+ if (s_nodes == nullptr || (s_nodes == n && s_nodes->_next == nullptr)) {
x_flag = X_STEP_3;
break;
} else {
@@ -127,10 +209,6 @@ void Latch::unlock() {
}
}

- namespace {
- static const std::hash<std::thread::id> thread_hash;
- }

void Latch::lock_shared() {
/* Depending on the internal implementation of conditional variable,
* a race condition could arise, permanently blocking the thread;
@@ -141,6 +219,10 @@
std::thread::id tid = std::this_thread::get_id();

spin_lock();

+ /* find the thread node */
+ TNode * n = find_node(tid);

if (x_owner != tid) {
/* if flag is 0 or 1 then it hold S with no wait,
* in other case it have to wait for X gate
@@ -153,10 +235,9 @@
}
} else {
/* X precedence is true,
- * estimate if this thread holds a recursive S lock
+ * test if this thread holds a recursive S lock
*/
- if (x_flag == X_STEP_0 || (x_flag == X_STEP_1 &&
- s_buckets[thread_hash(tid) % latch_bucket_count] > 0)) {
+ if (x_flag == X_STEP_0 || (x_flag == X_STEP_1 && n != nullptr)) {
break;
}
}
@@ -168,30 +249,41 @@
spin_lock();
}
}
- ++s_count;
- if (px) {
- /* X precedence is true */
- ++s_buckets[thread_hash(tid) % latch_bucket_count];
+ if (n == nullptr) {
+ n = new_node(tid);
}
+ /* increment recursive count for this thread */
+ ++n->count;

spin_unlock();
}

void Latch::unlock_shared() {
std::thread::id tid = std::this_thread::get_id();

spin_lock();
- if (px) {
- /* X precedence is true */
- --s_buckets[thread_hash(tid) % latch_bucket_count];
- }
- /* on last S, finalize X request in wait, and notify */
- if (--s_count == 0 && x_flag == X_STEP_1 && x_owner != tid) {
- x_flag = X_STEP_3;
- /* !!! unlock spin then pop gate (reverse order for X receiver) */
- spin_unlock();
- std::unique_lock<std::mutex> lk(s_gate_lock);
- s_gate.notify_one();
- lk.unlock();

+ /* find the thread node */
+ TNode * n = find_node(tid);
+ /* does it own shared lock ? */
+ assert(n != nullptr);

+ /* decrement recursive count for this thread, finally free */
+ if (--n->count == 0) {
+ free_node(n);
+ /* on last S, finalize X request in wait, and notify */
+ if (x_flag == X_STEP_1 && x_owner != tid) {
+ if (s_nodes == nullptr) {
+ x_flag = X_STEP_3;
+ }
+ /* !!! unlock spin then pop gate (reverse order for X receiver) */
+ spin_unlock();
+ std::unique_lock<std::mutex> lk(s_gate_lock);
+ s_gate.notify_one();
+ lk.unlock();
+ } else {
+ spin_unlock();
+ }
} else {
spin_unlock();
}
@@ -206,11 +298,14 @@ bool Latch::try_lock_shared()
* in other case fails
*/
if (x_flag == X_STEP_0 || x_owner == tid) {
- ++s_count;
- if (px) {
- /* X precedence is true */
- ++s_buckets[thread_hash(tid) % latch_bucket_count];
+ /* find the thread node, else create */
+ TNode * n = find_node(tid);
+ if (n == nullptr) {
+ n = new_node(tid);
}
+ /* increment recursive count for this thread */
+ ++n->count;

spin_unlock();
return true;
}
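With the per-thread nodes in place, try_lock_shared keeps its non-blocking contract while honoring reentrancy: it succeeds when no exclusive lock is active (x_flag == X_STEP_0) or when the caller itself owns the exclusive lock, and fails otherwise. A small usage sketch (caller code is illustrative):

if (latch.try_lock_shared()) {
  // shared access granted without blocking
  // ... read the protected state ...
  latch.unlock_shared();
} else {
  // an exclusive lock is held or pending: fall back to lock_shared(), or retry later
}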
