Skip to content

Commit

Permalink
Merge pull request #186 from insertinterestingnamehere/tsan
Browse files Browse the repository at this point in the history
Resolve Race Condition in Nemesis Threadqueue (#187)
  • Loading branch information
insertinterestingnamehere authored Feb 7, 2024
2 parents b1171cb + 85c5389 commit a2d1dba
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 68 deletions.
13 changes: 2 additions & 11 deletions include/qt_atomics.h
Original file line number Diff line number Diff line change
@@ -1,23 +1,14 @@
#ifndef QT_ATOMICS_H
#define QT_ATOMICS_H

#include <stdatomic.h>
#include <sys/time.h>

#include <qthread/common.h>
#include <qthread/qthread.h>

#if (__STDC_VERSION__ >= 201112L) && (!defined(__STDC_NO_ATOMICS__))
#define USE_C11_MEMORY_FENCE
#include <stdatomic.h>
#endif

#ifdef USE_C11_MEMORY_FENCE
#define THREAD_FENCE_MEM_ACQUIRE_IMPL atomic_thread_fence(memory_order_acquire)
#define THREAD_FENCE_MEM_RELEASE_IMPL atomic_thread_fence(memory_order_release)
#else
#define THREAD_FENCE_MEM_ACQUIRE_IMPL MACHINE_FENCE
#define THREAD_FENCE_MEM_RELEASE_IMPL MACHINE_FENCE
#endif

#if (QTHREAD_ASSEMBLY_ARCH == QTHREAD_IA32) || \
(QTHREAD_ASSEMBLY_ARCH == QTHREAD_AMD64)
Expand Down Expand Up @@ -750,7 +741,7 @@ static QINLINE aligned_t qthread_internal_incr_mod_(aligned_t *opera
static QINLINE void *qt_internal_atomic_swap_ptr(void **addr,
void *newval)
{ /*{{{*/
void *oldval = *addr;
void *oldval = atomic_load_explicit((void *_Atomic *)addr, memory_order_relaxed);
void *tmp;

while ((tmp = qthread_cas_ptr(addr, oldval, newval)) != oldval) {
Expand Down
123 changes: 66 additions & 57 deletions src/threadqueues/nemesis_threadqueues.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
/* System Headers */
#include <pthread.h>
#include <sys/types.h>
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>

Expand Down Expand Up @@ -42,14 +43,14 @@ int num_spins_before_condwait;

/* Data Structures */
struct _qt_threadqueue_node {
struct _qt_threadqueue_node *next;
struct _qt_threadqueue_node *_Atomic next;
qthread_t *thread;
};

typedef struct {
/* The First Cacheline */
void *head;
void *tail;
void *_Atomic head;
void *_Atomic tail;
uint8_t pad1[CACHELINE_WIDTH - (2 * sizeof(void *))];
/* The Second Cacheline */
void *shadow_head;
Expand Down Expand Up @@ -110,7 +111,9 @@ qt_threadqueue_t INTERNAL *qt_threadqueue_new(void)

qassert_ret(q != NULL, NULL);

q->q.shadow_head = q->q.head = q->q.tail = NULL;
atomic_init(&q->q.head, NULL);
atomic_init(&q->q.tail, NULL);
q->q.shadow_head = NULL;
q->advisory_queuelen = 0;
q->q.nemesis_advisory_queuelen = 0; // redundant
#ifdef QTHREAD_CONDWAIT_BLOCKING_QUEUE
Expand All @@ -124,27 +127,31 @@ qt_threadqueue_t INTERNAL *qt_threadqueue_new(void)
static inline qt_threadqueue_node_t *qt_internal_NEMESIS_dequeue(NEMESIS_queue *q)
{ /*{{{ */
if (!q->shadow_head) {
if (!q->head) {
void *head = atomic_load_explicit(&q->head, memory_order_relaxed);
if (!head) {
return NULL;
}
q->shadow_head = q->head;
q->head = NULL;
q->shadow_head = head;

atomic_store_explicit(&q->head, NULL, memory_order_relaxed);
}

qt_threadqueue_node_t *const retval = (void *volatile)(q->shadow_head);

if ((retval != NULL) && (retval != (void *)1)) {
if (retval->next != NULL) {
q->shadow_head = retval->next;
retval->next = NULL;
struct _qt_threadqueue_node *next_loc = atomic_load_explicit(&retval->next, memory_order_acquire);
if (next_loc != NULL) {
q->shadow_head = next_loc;
atomic_store_explicit(&retval->next, NULL, memory_order_relaxed);
} else {
qt_threadqueue_node_t *old;
q->shadow_head = NULL;
old = qthread_cas_ptr(&(q->tail), retval, NULL);
old = qthread_cas_ptr((void**)&(q->tail), retval, NULL);
if (old != retval) {
while (retval->next == NULL) SPINLOCK_BODY();
q->shadow_head = retval->next;
retval->next = NULL;
void *retval_next_tmp;
while ((retval_next_tmp = atomic_load_explicit(&retval->next, memory_order_relaxed)) == NULL) SPINLOCK_BODY();
q->shadow_head = retval_next_tmp;
atomic_store_explicit(&retval->next, NULL, memory_order_relaxed);
}
}
}
Expand All @@ -154,27 +161,29 @@ static inline qt_threadqueue_node_t *qt_internal_NEMESIS_dequeue(NEMESIS_queue *
static inline qt_threadqueue_node_t *qt_internal_NEMESIS_dequeue_st(NEMESIS_queue *q)
{ /*{{{ */
if (!q->shadow_head) {
if (!q->head) {
void *head = atomic_load_explicit(&q->head, memory_order_relaxed);
if (!head) {
return NULL;
}
q->shadow_head = q->head;
q->head = NULL;
q->shadow_head = head;
atomic_store_explicit(&q->head, NULL, memory_order_relaxed);
}

qt_threadqueue_node_t *const retval = (void *volatile)(q->shadow_head);

if ((retval != NULL) && (retval != (void *)1)) {
if (retval->next != NULL) {
q->shadow_head = retval->next;
retval->next = NULL;
void *retval_next_tmp = atomic_load_explicit(&retval->next, memory_order_relaxed);
if (retval_next_tmp != NULL) {
q->shadow_head = retval_next_tmp;
atomic_store_explicit(&retval->next, NULL, memory_order_relaxed);
} else {
q->shadow_head = NULL;
if (q->tail == retval) {
q->tail = NULL;
if (atomic_load_explicit(&q->tail, memory_order_relaxed) == retval) {
atomic_store_explicit(&q->tail, NULL, memory_order_relaxed);
}
}
}
qthread_debug(THREADQUEUE_DETAILS, "nemesis q:%p head:%p tail:%p shadow_head:%p\n", q, q->head, q->tail, q->shadow_head);
qthread_debug(THREADQUEUE_DETAILS, "nemesis q:%p head:%p tail:%p shadow_head:%p\n", q, atomic_load_explicit(&q->head, memory_order_relaxed), atomic_load_explicit(&q->tail, memory_order_relaxed), q->shadow_head);
return retval;
} /*}}} */

Expand All @@ -185,7 +194,7 @@ void INTERNAL qt_threadqueue_free(qt_threadqueue_t *q)
qt_threadqueue_node_t *node = qt_internal_NEMESIS_dequeue_st(&q->q);
if (node) {
qthread_t *retval = node->thread;
assert(node->next == NULL);
assert(atomic_load_explicit(&node->next, memory_order_relaxed) == NULL);
(void)qthread_incr(&(q->advisory_queuelen), -1);
FREE_TQNODE(node);
qthread_thread_free(retval);
Expand Down Expand Up @@ -242,18 +251,18 @@ static void sanity_check_tq(NEMESIS_queue *q)
if (q->shadow_head) {
assert(q->head != q->shadow_head);
}
if (q->tail != NULL) {
if (q->head == NULL) {
if (atomic_load_explicit(&q->tail, memory_order_relaxed) != NULL) {
if (atomic_load_explicit(&q->head, memory_order_relaxed) == NULL) {
assert(q->shadow_head != NULL);
}
}
if ((q->head != NULL) || (q->tail != NULL)) {
if ((atomic_load_explicit(&q->head, memory_order_relaxed) != NULL) || (atomic_load_explicit(&q->tail, memory_order_relaxed) != NULL)) {
if (q->shadow_head) {
curs = q->shadow_head;
assert(curs->thread);
assert(curs->thread != (void *)0x7777777777777777);
while (curs->next) {
curs = curs->next;
while (atomic_load_explicit(&curs->next, memory_order_relaxed)) {
curs = atomic_load_explicit(&curs->next, memory_order_relaxed);
assert(curs->thread);
assert(curs->thread != (void *)0x7777777777777777);
}
Expand All @@ -262,8 +271,8 @@ static void sanity_check_tq(NEMESIS_queue *q)
curs = q->head;
assert(curs->thread);
assert(curs->thread != (void *)0x7777777777777777);
while (curs->next) {
curs = curs->next;
while (atomic_load_explicit(&curs->next, memory_order_relaxed)) {
curs = atomic_load_explicit(&curs->next, memory_order_relaxed);
assert(curs->thread);
assert(curs->thread != (void *)0x7777777777777777);
}
Expand Down Expand Up @@ -305,14 +314,14 @@ void INTERNAL qt_threadqueue_enqueue(qt_threadqueue_t *restrict q,
node = ALLOC_TQNODE();
assert(node != NULL);
node->thread = t;
node->next = NULL;
atomic_store_explicit(&node->next, NULL, memory_order_release);

prev = qt_internal_atomic_swap_ptr((void **)&(q->q.tail), node);

if (prev == NULL) {
q->q.head = node;
atomic_store_explicit(&q->q.head, node, memory_order_relaxed);
} else {
prev->next = node;
atomic_store_explicit(&prev->next, node, memory_order_relaxed);
}
PARANOIA(sanity_check_tq(&q->q));
(void)qthread_incr(&(q->advisory_queuelen), 1);
Expand Down Expand Up @@ -354,12 +363,12 @@ qthread_t INTERNAL *qt_scheduler_get_thread(qt_threadqueue_t *q,
#ifdef QTHREAD_USE_EUREKAS
qt_eureka_disable();
#endif /* QTHREAD_USE_EUREKAS */
qthread_debug(THREADQUEUE_DETAILS, "q(%p)->q {head:%p tail:%p sh:%p} q->advisory_queuelen:%u\n", q, q->q.head, q->q.tail, q->q.shadow_head, q->advisory_queuelen);
qthread_debug(THREADQUEUE_DETAILS, "q(%p)->q {head:%p tail:%p sh:%p} q->advisory_queuelen:%u\n", q, atomic_load_explicit(&q->q.head, memory_order_relaxed), atomic_load_explicit(&q->q.tail, memory_order_relaxed), q->q.shadow_head, q->advisory_queuelen);
PARANOIA(sanity_check_tq(&q->q));
qt_threadqueue_node_t *node = qt_internal_NEMESIS_dequeue(&q->q);
qthread_t *retval;

qthread_debug(THREADQUEUE_DETAILS, "q(%p)->q {head:%p tail:%p sh:%p} q->advisory_queuelen:%u\n", q, q->q.head, q->q.tail, q->q.shadow_head, q->advisory_queuelen);
qthread_debug(THREADQUEUE_DETAILS, "q(%p)->q {head:%p tail:%p sh:%p} q->advisory_queuelen:%u\n", q, atomic_load_explicit(&q->q.head, memory_order_relaxed), atomic_load_explicit(&q->q.tail), q->q.shadow_head, q->advisory_queuelen);
PARANOIA(sanity_check_tq(&q->q));
if (node == NULL) {
#ifdef QTHREAD_USE_EUREKAS
Expand All @@ -368,13 +377,13 @@ qthread_t INTERNAL *qt_scheduler_get_thread(qt_threadqueue_t *q,

#ifdef QTHREAD_CONDWAIT_BLOCKING_QUEUE
i = num_spins_before_condwait;
while (q->q.shadow_head == NULL && q->q.head == NULL && i > 0) {
while (q->q.shadow_head == NULL && atomic_load_explicit(&q->q.head, memory_order_relaxed) == NULL && i > 0) {
SPINLOCK_BODY();
i--;
}
#endif /* QTHREAD_CONDWAIT_BLOCKING_QUEUE */

while (q->q.shadow_head == NULL && q->q.head == NULL) {
while (q->q.shadow_head == NULL && atomic_load_explicit(&q->q.head, memory_order_relaxed) == NULL) {
#ifndef QTHREAD_CONDWAIT_BLOCKING_QUEUE
SPINLOCK_BODY();
#else
Expand All @@ -393,11 +402,11 @@ qthread_t INTERNAL *qt_scheduler_get_thread(qt_threadqueue_t *q,
node = qt_internal_NEMESIS_dequeue(&q->q);
}
assert(node);
assert(node->next == NULL);
assert(atomic_load_explicit(&node->next, memory_order_relaxed) == NULL);
(void)qthread_incr(&(q->advisory_queuelen), -1);
retval = node->thread;
FREE_TQNODE(node);
qthread_debug(THREADQUEUE_DETAILS, "q(%p)->q {head:%p tail:%p sh:%p} q->advisory_queuelen:%u\n", q, q->q.head, q->q.tail, q->q.shadow_head, q->advisory_queuelen);
qthread_debug(THREADQUEUE_DETAILS, "q(%p)->q {head:%p tail:%p sh:%p} q->advisory_queuelen:%u\n", q, atomic_load_explicit(&q->q.head, memory_order_relaxed), atomic_load_explicit(&q->q.tail), q->q.shadow_head, q->advisory_queuelen);
PARANOIA(sanity_check_tq(&q->q));
return retval;
} /*}}} */
Expand All @@ -412,33 +421,33 @@ void INTERNAL qt_threadqueue_filter(qt_threadqueue_t *q,
assert(q != NULL);
qthread_debug(THREADQUEUE_FUNCTIONS, "begin q:%p f:%p\n", q, f);

tmp.head = NULL;
tmp.tail = NULL;
atomic_init(&tmp.head, NULL);
atomic_init(&tmp.tail, NULL);
tmp.shadow_head = NULL;
tmp.nemesis_advisory_queuelen = 0;
qthread_debug(THREADQUEUE_DETAILS, "q(%p)->q {head:%p tail:%p} q->advisory_queuelen:%u\n", q, q->q.head, q->q.tail, q->advisory_queuelen);
qthread_debug(THREADQUEUE_DETAILS, "q(%p)->q {head:%p tail:%p} q->advisory_queuelen:%u\n", q, atomic_load_explicit(&q->q.head, memory_order_relaxed), atomic_load_explicit(&q->q.tail, memory_order_relaxed), q->advisory_queuelen);
PARANOIA(sanity_check_tq(&q->q));
while ((curs = qt_internal_NEMESIS_dequeue_st(&q->q))) {
qthread_t *t = curs->thread;
qthread_debug(THREADQUEUE_DETAILS, "q(%p)->q {head:%p tail:%p} q->advisory_queuelen:%u\n", q, q->q.head, q->q.tail, q->advisory_queuelen);
qthread_debug(THREADQUEUE_DETAILS, "q(%p)->q {head:%p tail:%p} q->advisory_queuelen:%u\n", q, atomic_load_explicit(&q->q.head, memory_order_relaxed), atomic_load_explicit(&q->q.tail, memory_order_relaxed), q->advisory_queuelen);
PARANOIA(sanity_check_tq(&tmp));
PARANOIA(sanity_check_tq(&q->q));
switch (f(t)) {
case IGNORE_AND_CONTINUE: // ignore, move on
prev = qt_internal_atomic_swap_ptr((void **)&(tmp.tail), curs);
if (prev == NULL) {
tmp.head = curs;
atomic_store_explicit(&tmp.head, curs, memory_order_relaxed);
} else {
prev->next = curs;
atomic_store_explicit(&prev->next, curs, memory_order_relaxed);
}
tmp.nemesis_advisory_queuelen++;
break;
case IGNORE_AND_STOP: // ignore, stop looking
prev = qt_internal_atomic_swap_ptr((void **)&(tmp.tail), curs);
if (prev == NULL) {
tmp.head = curs;
atomic_store_explicit(&tmp.head, curs, memory_order_relaxed);
} else {
prev->next = curs;
atomic_store_explicit(&prev->next, curs, memory_order_relaxed);
}
tmp.nemesis_advisory_queuelen++;
goto pushback;
Expand All @@ -458,24 +467,24 @@ void INTERNAL qt_threadqueue_filter(qt_threadqueue_t *q,
}
pushback:
/* dequeue the rest of the queue */
qthread_debug(THREADQUEUE_DETAILS, "q(%p)->q {head:%p tail:%p} q->advisory_queuelen:%u\n", q, q->q.head, q->q.tail, q->advisory_queuelen);
qthread_debug(THREADQUEUE_DETAILS, "tmp {head:%p tail:%p} tmp->advisory_queuelen:%u\n", tmp.head, tmp.tail, tmp.nemesis_advisory_queuelen);
qthread_debug(THREADQUEUE_DETAILS, "q(%p)->q {head:%p tail:%p} q->advisory_queuelen:%u\n", q, atomic_load_explicit(&q->q.head, memory_order_relaxed), atomic_load_explicit(&q->q.tail, memory_order_relaxed), q->advisory_queuelen);
qthread_debug(THREADQUEUE_DETAILS, "tmp {head:%p tail:%p} tmp->advisory_queuelen:%u\n", atomic_load_explicit(&tmp.head, memory_order_relaxed), atomic_load_explicit(&tmp.tail, memory_order_relaxed), tmp.nemesis_advisory_queuelen);
PARANOIA(sanity_check_tq(&tmp));
if (q->q.head) {
if (atomic_load_explicit(&q->q.head, memory_order_relaxed)) {
prev = qt_internal_atomic_swap_ptr((void **)&(tmp.tail), q->q.head);
if (prev == NULL) {
tmp.head = q->q.head;
atomic_store_explicit(&tmp.head, atomic_load_explicit(&q->q.head, memory_order_relaxed), memory_order_relaxed);
} else {
prev->next = q->q.head;
atomic_store_explicit(&prev->next, atomic_load_explicit(&q->q.head, memory_order_relaxed), memory_order_relaxed);
}
tmp.nemesis_advisory_queuelen += q->advisory_queuelen;
tmp.tail = q->q.tail;
atomic_store_explicit(&tmp.tail, atomic_load_explicit(&q->q.tail, memory_order_relaxed), memory_order_relaxed);
}
q->q.head = tmp.head;
q->q.tail = tmp.tail;
atomic_store_explicit(&q->q.head, atomic_load_explicit(&tmp.head, memory_order_relaxed), memory_order_relaxed);
atomic_store_explicit(&q->q.tail, atomic_load_explicit(&tmp.tail, memory_order_relaxed), memory_order_relaxed);
q->q.shadow_head = NULL;
q->advisory_queuelen = tmp.nemesis_advisory_queuelen;
qthread_debug(THREADQUEUE_DETAILS, "q(%p)->q {head:%p tail:%p} q->advisory_queuelen:%u\n", q, q->q.head, q->q.tail, q->advisory_queuelen);
qthread_debug(THREADQUEUE_DETAILS, "q(%p)->q {head:%p tail:%p} q->advisory_queuelen:%u\n", q, atomic_load_explicit(&q->q.head, memory_order_relaxed), atomic_load_explicit(&q->q.tail, memory_order_relaxed), q->advisory_queuelen);
PARANOIA(sanity_check_tq(&q->q));
} /*}}}*/

Expand Down

0 comments on commit a2d1dba

Please sign in to comment.