Skip to content

Commit

Permalink
Merge pull request #229 from insertinterestingnamehere/shepherd_atomics
Browse files Browse the repository at this point in the history
Explicit Atomics For Shepherd And Worker Active Fields
  • Loading branch information
insertinterestingnamehere authored Feb 6, 2024
2 parents 86accb0 + d4b479b commit 3f4d01a
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 18 deletions.
4 changes: 2 additions & 2 deletions include/qt_shepherd_innards.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ struct qthread_worker_s {
#ifdef QTHREAD_PERFORMANCE
struct qtperfdata_s* performance_data;
#endif
Q_ALIGNED(8) uint_fast8_t QTHREAD_CASLOCK(active);
_Atomic Q_ALIGNED(8) uint_fast8_t active;
};
typedef struct qthread_worker_s qthread_worker_t;

Expand All @@ -66,7 +66,7 @@ struct qthread_shepherd_s {
#endif /* ifdef QTHREAD_LOCAL_PRIORITY */
/* round robin scheduler - can probably be smarter */
aligned_t sched_shepherd;
uintptr_t QTHREAD_CASLOCK(active);
_Atomic uintptr_t active;
/* affinity information */
unsigned int node; /* whereami */
#ifdef QTHREAD_HAVE_LGRP
Expand Down
25 changes: 12 additions & 13 deletions src/qthread.c
Original file line number Diff line number Diff line change
Expand Up @@ -466,13 +466,13 @@ static void *qthread_master(void *arg)
#endif
qthread_debug(SHEPHERD_DETAILS, "id(%i): fetching a thread from my queue...\n", my_id);

while (!QTHREAD_CASLOCK_READ_UI(me_worker->active)) {
while (!atomic_load_explicit(&me_worker->active, memory_order_relaxed)) {
SPINLOCK_BODY();
}
#ifdef QTHREAD_LOCAL_PRIORITY
t = qt_scheduler_get_thread(threadqueue, localpriorityqueue, localqueue, QTHREAD_CASLOCK_READ_UI(me->active));
t = qt_scheduler_get_thread(threadqueue, localpriorityqueue, localqueue, atomic_load_explicit(&me->active, memory_order_relaxed));
#else
t = qt_scheduler_get_thread(threadqueue, localqueue, QTHREAD_CASLOCK_READ_UI(me->active));
t = qt_scheduler_get_thread(threadqueue, localqueue, atomic_load_explicit(&me->active, memory_order_relaxed));
#endif /* ifdef QTHREAD_LOCAL_PRIORITY */
assert(t);
#ifdef QTHREAD_SHEPHERD_PROFILING
Expand Down Expand Up @@ -521,15 +521,15 @@ static void *qthread_master(void *arg)
}

if ((t->target_shepherd != NO_SHEPHERD) && (t->target_shepherd != my_id) &&
QTHREAD_CASLOCK_READ_UI(qlib->shepherds[t->target_shepherd].active)) {
atomic_load_explicit(&qlib->shepherds[t->target_shepherd].active, memory_order_relaxed)) {
/* send this thread home */
qthread_debug(THREAD_DETAILS,
"id(%u): thread %u going back home to shep %u\n",
my_id, t->thread_id, t->target_shepherd);
t->rdata->shepherd_ptr = &qlib->shepherds[t->target_shepherd];
assert(t->rdata->shepherd_ptr->ready != NULL);
qt_threadqueue_enqueue(qlib->shepherds[t->target_shepherd].ready, t);
} else if (!QTHREAD_CASLOCK_READ_UI(me->active)) {
} else if (!atomic_load_explicit(&me->active, memory_order_relaxed)) {
qthread_debug(THREAD_DETAILS,
"id(%u): skipping thread exec because I've been disabled!\n",
my_id);
Expand Down Expand Up @@ -613,10 +613,10 @@ static void *qthread_master(void *arg)
{
#ifdef QTHREAD_LOCAL_PRIORITY
qthread_t *f = qt_scheduler_get_thread(threadqueue, localpriorityqueue, NULL,
QTHREAD_CASLOCK_READ_UI(me->active));
atomic_load_explicit(&me->active, memory_order_relaxed));
#else
qthread_t *f = qt_scheduler_get_thread(threadqueue, NULL,
QTHREAD_CASLOCK_READ_UI(me->active));
atomic_load_explicit(&me->active, memory_order_relaxed));
#endif /* ifdef QTHREAD_LOCAL_PRIORITY */
qt_threadqueue_enqueue(me->ready, t);
qt_threadqueue_enqueue(me->ready, f);
Expand Down Expand Up @@ -1071,8 +1071,8 @@ int API_FUNC qthread_initialize(void)
qthread_debug(CORE_DETAILS, "calling qthread_makecontext\n");
qlib->shepherds[0].workers[0].worker = pthread_self();
qlib->shepherds[0].workers[0].shepherd = &qlib->shepherds[0];
QTHREAD_CASLOCK_INIT(qlib->shepherds[0].workers[0].active, 1);
qthread_debug(CORE_DETAILS, "initialized caslock 0,0 %p\n", &qlib->shepherds[0].workers[0].active);
atomic_store_explicit(&qlib->shepherds[0].workers[0].active, 1, memory_order_relaxed);
qthread_debug(CORE_DETAILS, "initialized caslock 0,0 %p\n", atomic_load_explicit(&qlib->shepherds[0].workers[0].active, memory_order_relaxed));
qlib->shepherds[0].workers[0].worker_id = 0;
qlib->shepherds[0].workers[0].unique_id = qthread_internal_incr(&(qlib->max_unique_id),
&qlib->max_unique_id_lock, 1);
Expand Down Expand Up @@ -1155,10 +1155,10 @@ int API_FUNC qthread_initialize(void)

if ((j * nshepherds) + i + 1 > hw_par) {
qthread_debug(CORE_DETAILS, "deactivate shep %i's worker %i\n", (int)i, (int)j);
QTHREAD_CASLOCK_INIT(qlib->shepherds[i].workers[j].active, 0);
atomic_store_explicit(&qlib->shepherds[i].workers[j].active, 0, memory_order_relaxed);
} else {
qthread_debug(CORE_DETAILS, "activate shep %i's worker %i\n", (int)i, (int)j);
QTHREAD_CASLOCK_INIT(qlib->shepherds[i].workers[j].active, 1);
atomic_store_explicit(&qlib->shepherds[i].workers[j].active, 1, memory_order_relaxed);
}
qthread_debug(CORE_DETAILS, "initialized caslock %i,%i %p\n", i, j, &qlib->shepherds[i].workers[j].active);
qlib->shepherds[i].workers[j].shepherd = &qlib->shepherds[i];
Expand Down Expand Up @@ -1430,7 +1430,7 @@ void API_FUNC qthread_finalize(void)
t->thread_id = QTHREAD_NON_TASK_ID;
t->flags = QTHREAD_UNSTEALABLE;
qt_threadqueue_enqueue(qlib->shepherds[i].ready, t);
if (!QTHREAD_CASLOCK_READ_UI(qlib->shepherds[i].workers[j].active)) {
if (!atomic_load_explicit(&qlib->shepherds[i].workers[j].active, memory_order_relaxed)) {
qthread_debug(SHEPHERD_DETAILS, "re-enabling worker %i:%i, so he can exit\n", (int)i, (int)j);
(void)QT_CAS(qlib->shepherds[i].workers[j].active, 0, 1);
}
Expand Down Expand Up @@ -1501,7 +1501,6 @@ void API_FUNC qthread_finalize(void)
}
FREE(qlib->shepherds[i].workers, qlib->nworkerspershep * sizeof(qthread_worker_t));
if (i == 0) { continue; }
QTHREAD_CASLOCK_DESTROY(shep->active);
qt_threadqueue_free(shep->ready);
#ifdef QTHREAD_LOCAL_PRIORITY
qt_threadqueue_free(shep->local_priority_queue);
Expand Down
6 changes: 3 additions & 3 deletions src/shepherds.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ int API_FUNC qthread_shep_ok(void)
if (ret == NULL) {
return QTHREAD_PTHREAD_ERROR;
} else {
return QTHREAD_CASLOCK_READ_UI(ret->active);
return atomic_load_explicit(&ret->active, memory_order_relaxed);
}
} /*}}} */

Expand Down Expand Up @@ -212,7 +212,7 @@ qthread_shepherd_t INTERNAL *qthread_find_active_shepherd(qthread_shepherd_id_t
int found = 0;

for (size_t i = 0; i < nsheps; i++) {
if (QTHREAD_CASLOCK_READ_UI(sheps[i].active)) {
if (atomic_load_explicit(&sheps[i].active, memory_order_relaxed)) {
ssize_t shep_busy_level = qt_threadqueue_advisory_queuelen(sheps[i].ready);

if (found == 0) {
Expand Down Expand Up @@ -252,7 +252,7 @@ qthread_shepherd_t INTERNAL *qthread_find_active_shepherd(qthread_shepherd_id_t
saligned_t busyness;
unsigned int target_dist;

while (target < (nsheps - 1) && QTHREAD_CASLOCK_READ_UI(sheps[l[target]].active) == 0) {
while (target < (nsheps - 1) && atomic_load_explicit(&sheps[l[target]].active, memory_order_relaxed) == 0) {
target++;
}
if (target >= (nsheps - 1)) {
Expand Down

0 comments on commit 3f4d01a

Please sign in to comment.