From 6ce2969a30bacd4826abe78f5714e2de56333532 Mon Sep 17 00:00:00 2001 From: ulatekh Date: Sat, 10 Feb 2018 15:48:42 -0700 Subject: [PATCH] Now the worker-queue feature works when mixing permanant and temporary worker-record registration. --- src/ringbuf.c | 84 ++++++++++++-------------------------------------- src/t_stress.c | 6 ++-- 2 files changed, 23 insertions(+), 67 deletions(-) diff --git a/src/ringbuf.c b/src/ringbuf.c index 9671eaa..3032a12 100644 --- a/src/ringbuf.c +++ b/src/ringbuf.c @@ -104,8 +104,8 @@ struct ringbuf { volatile ringbuf_off_t next; ringbuf_off_t end; - /* The range of worker-records in use. */ - worker_off_t first_used_worker, first_free_worker; + /* The index of the first potentially free worker-record. */ + worker_off_t first_free_worker; /* The following are updated by the consumer. */ ringbuf_off_t written; @@ -143,52 +143,6 @@ ringbuf_get_sizes(unsigned nworkers, *ringbuf_worker_size = sizeof(ringbuf_worker_t); } -/* - * prune_unregistered_workers: consume any range of unregistered - * worker-records. - */ -static worker_off_t -prune_unregistered_workers(ringbuf_t *rbuf) -{ - worker_off_t volatile *p_used_worker; - worker_off_t volatile const *p_free_worker; - worker_off_t old_used_worker, new_used_worker; - bool should_continue = true; - - p_used_worker = &rbuf->first_used_worker; - p_free_worker = &rbuf->first_free_worker; - - while (should_continue) { - - do { - worker_off_t old_used_worker_index, free_worker_index; - ringbuf_worker_t *w; - - /* If there is no more range of used worker-records, stop. */ - old_used_worker = *p_used_worker; - old_used_worker_index = (old_used_worker & RBUF_OFF_MASK); - free_worker_index = ((*p_free_worker) & RBUF_OFF_MASK); - if (old_used_worker_index == free_worker_index) { - should_continue = false; - break; - } - - /* If this worker-record is still in use, stop. */ - w = &rbuf->workers[old_used_worker_index]; - if (w->registered != not_registered) { - should_continue = false; - break; - } - - /* Advance past this unused worker-record. */ - new_used_worker = (old_used_worker_index + 1) % rbuf->nworkers; - new_used_worker |= WRAP_INCR(old_used_worker); - } while (!atomic_compare_exchange_weak(p_used_worker, old_used_worker, new_used_worker)); - } - - return old_used_worker; -} - /* * register_worker: allocate a worker-record for a thread/process, * and pass back the pointer to its local store. @@ -197,47 +151,37 @@ prune_unregistered_workers(ringbuf_t *rbuf) static ringbuf_worker_t * register_worker(ringbuf_t *rbuf, unsigned registration_type) { - worker_off_t volatile *p_used_worker, *p_free_worker; + worker_off_t volatile *p_free_worker; int acquired; ringbuf_worker_t *w = NULL; /* Try to find a worker-record that can be registered. */ - p_used_worker = &rbuf->first_used_worker; p_free_worker = &rbuf->first_free_worker; acquired = false; while (!acquired) { - worker_off_t old_used_worker, prev_free_worker, - old_free_worker, new_free_worker; + worker_off_t prev_free_worker, old_free_worker, new_free_worker, + i; /* * Get the current range of used worker-records, after trying * to prune recently-unregistered workers. */ - old_used_worker = prune_unregistered_workers(rbuf); prev_free_worker = *p_free_worker; - /* - * If there are no potentially free worker-records, - * let our caller know. - */ - if ((((prev_free_worker & RBUF_OFF_MASK) + 1) % rbuf->nworkers) - == (old_used_worker & RBUF_OFF_MASK)) - return NULL; - /* Exclusively acquire a worker-record index. */ - while (!acquired) { + for (i = 0; !acquired && i < rbuf->nworkers; ++i) { /* Prepare to acquire a worker-record index. */ old_free_worker = (*p_free_worker); if (old_free_worker != prev_free_worker) break; - new_free_worker = ((old_free_worker & RBUF_OFF_MASK) + 1) - % rbuf->nworkers; + new_free_worker = ((old_free_worker & RBUF_OFF_MASK) + + i) % rbuf->nworkers; new_free_worker |= WRAP_INCR(old_free_worker); /* Remember that this worker-record is being registered. */ w = &rbuf->workers[new_free_worker & RBUF_OFF_MASK]; if (!atomic_compare_exchange_weak(&w->registered, not_registered, being_registered)) - break; + continue; acquired = true; /* Swap the indexes to exclusively acquire the worker-record. */ @@ -343,6 +287,11 @@ ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t **pw, size_t len) if (__predict_false(next < written && target >= written)) { /* The producer must wait. */ w->seen_off = RBUF_OFF_MAX; + if (w->registered == temp_registered) { + *pw = NULL; + atomic_thread_fence(memory_order_release); + w->registered = not_registered; + } return -1; } @@ -362,6 +311,11 @@ ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t **pw, size_t len) target = exceed ? (WRAP_LOCK_BIT | len) : 0; if ((target & RBUF_OFF_MASK) >= written) { w->seen_off = RBUF_OFF_MAX; + if (w->registered == temp_registered) { + *pw = NULL; + atomic_thread_fence(memory_order_release); + w->registered = not_registered; + } return -1; } /* Increment the wrap-around counter. */ diff --git a/src/t_stress.c b/src/t_stress.c index 7bac066..924516c 100644 --- a/src/t_stress.c +++ b/src/t_stress.c @@ -102,8 +102,10 @@ ringbuf_stress(void *arg) { const unsigned id = (uintptr_t)arg; ringbuf_worker_t *w = NULL; - //ringbuf_worker_t *w = ringbuf_register(ringbuf); - //assert (w != NULL); + if (id == 1) { + w = ringbuf_register(ringbuf); + assert (w != NULL); + } uint64_t total_recv = 0; /*