Skip to content

Commit

Permalink
Now the worker-queue feature works when mixing permanant and temporary
Browse files Browse the repository at this point in the history
worker-record registration.
  • Loading branch information
ulatekh committed Feb 10, 2018
1 parent 9666acc commit 6ce2969
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 67 deletions.
84 changes: 19 additions & 65 deletions src/ringbuf.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ struct ringbuf {
volatile ringbuf_off_t next;
ringbuf_off_t end;

/* The range of worker-records in use. */
worker_off_t first_used_worker, first_free_worker;
/* The index of the first potentially free worker-record. */
worker_off_t first_free_worker;

/* The following are updated by the consumer. */
ringbuf_off_t written;
Expand Down Expand Up @@ -143,52 +143,6 @@ ringbuf_get_sizes(unsigned nworkers,
*ringbuf_worker_size = sizeof(ringbuf_worker_t);
}

/*
* prune_unregistered_workers: consume any range of unregistered
* worker-records.
*/
static worker_off_t
prune_unregistered_workers(ringbuf_t *rbuf)
{
worker_off_t volatile *p_used_worker;
worker_off_t volatile const *p_free_worker;
worker_off_t old_used_worker, new_used_worker;
bool should_continue = true;

p_used_worker = &rbuf->first_used_worker;
p_free_worker = &rbuf->first_free_worker;

while (should_continue) {

do {
worker_off_t old_used_worker_index, free_worker_index;
ringbuf_worker_t *w;

/* If there is no more range of used worker-records, stop. */
old_used_worker = *p_used_worker;
old_used_worker_index = (old_used_worker & RBUF_OFF_MASK);
free_worker_index = ((*p_free_worker) & RBUF_OFF_MASK);
if (old_used_worker_index == free_worker_index) {
should_continue = false;
break;
}

/* If this worker-record is still in use, stop. */
w = &rbuf->workers[old_used_worker_index];
if (w->registered != not_registered) {
should_continue = false;
break;
}

/* Advance past this unused worker-record. */
new_used_worker = (old_used_worker_index + 1) % rbuf->nworkers;
new_used_worker |= WRAP_INCR(old_used_worker);
} while (!atomic_compare_exchange_weak(p_used_worker, old_used_worker, new_used_worker));
}

return old_used_worker;
}

/*
* register_worker: allocate a worker-record for a thread/process,
* and pass back the pointer to its local store.
Expand All @@ -197,47 +151,37 @@ prune_unregistered_workers(ringbuf_t *rbuf)
static ringbuf_worker_t *
register_worker(ringbuf_t *rbuf, unsigned registration_type)
{
worker_off_t volatile *p_used_worker, *p_free_worker;
worker_off_t volatile *p_free_worker;
int acquired;
ringbuf_worker_t *w = NULL;

/* Try to find a worker-record that can be registered. */
p_used_worker = &rbuf->first_used_worker;
p_free_worker = &rbuf->first_free_worker;
acquired = false;
while (!acquired) {
worker_off_t old_used_worker, prev_free_worker,
old_free_worker, new_free_worker;
worker_off_t prev_free_worker, old_free_worker, new_free_worker,
i;

/*
* Get the current range of used worker-records, after trying
* to prune recently-unregistered workers.
*/
old_used_worker = prune_unregistered_workers(rbuf);
prev_free_worker = *p_free_worker;

/*
* If there are no potentially free worker-records,
* let our caller know.
*/
if ((((prev_free_worker & RBUF_OFF_MASK) + 1) % rbuf->nworkers)
== (old_used_worker & RBUF_OFF_MASK))
return NULL;

/* Exclusively acquire a worker-record index. */
while (!acquired) {
for (i = 0; !acquired && i < rbuf->nworkers; ++i) {
/* Prepare to acquire a worker-record index. */
old_free_worker = (*p_free_worker);
if (old_free_worker != prev_free_worker)
break;
new_free_worker = ((old_free_worker & RBUF_OFF_MASK) + 1)
% rbuf->nworkers;
new_free_worker = ((old_free_worker & RBUF_OFF_MASK)
+ i) % rbuf->nworkers;
new_free_worker |= WRAP_INCR(old_free_worker);

/* Remember that this worker-record is being registered. */
w = &rbuf->workers[new_free_worker & RBUF_OFF_MASK];
if (!atomic_compare_exchange_weak(&w->registered, not_registered, being_registered))
break;
continue;
acquired = true;

/* Swap the indexes to exclusively acquire the worker-record. */
Expand Down Expand Up @@ -343,6 +287,11 @@ ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t **pw, size_t len)
if (__predict_false(next < written && target >= written)) {
/* The producer must wait. */
w->seen_off = RBUF_OFF_MAX;
if (w->registered == temp_registered) {
*pw = NULL;
atomic_thread_fence(memory_order_release);
w->registered = not_registered;
}
return -1;
}

Expand All @@ -362,6 +311,11 @@ ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t **pw, size_t len)
target = exceed ? (WRAP_LOCK_BIT | len) : 0;
if ((target & RBUF_OFF_MASK) >= written) {
w->seen_off = RBUF_OFF_MAX;
if (w->registered == temp_registered) {
*pw = NULL;
atomic_thread_fence(memory_order_release);
w->registered = not_registered;
}
return -1;
}
/* Increment the wrap-around counter. */
Expand Down
6 changes: 4 additions & 2 deletions src/t_stress.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,10 @@ ringbuf_stress(void *arg)
{
const unsigned id = (uintptr_t)arg;
ringbuf_worker_t *w = NULL;
//ringbuf_worker_t *w = ringbuf_register(ringbuf);
//assert (w != NULL);
if (id == 1) {
w = ringbuf_register(ringbuf);
assert (w != NULL);
}
uint64_t total_recv = 0;

/*
Expand Down

0 comments on commit 6ce2969

Please sign in to comment.