Skip to content

Commit

Permalink
Implemented a queue of worker-records.
Browse files Browse the repository at this point in the history
This performs much better than the stack of worker-records, while
accomplishing the same goal.
ringbuf_register() can still be called in this variant, i.e. it allows
for both permanent registration of worker-records as well as temporary
registration.

I think the version-numbers in the first-used/first-free worker indices are
unnecessary if both indices are tracked. I also realize that the use
of a first-used-worker index interferes with mixed permanent and
temporary registration; but if that index is dropped, the version-number
in the remaining worker-index must be kept.
Both variants will be tried in further commits.
  • Loading branch information
ulatekh committed Feb 10, 2018
1 parent 425c184 commit 9666acc
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 68 deletions.
4 changes: 2 additions & 2 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# This file is in the Public Domain.
#

CFLAGS= -std=c11 -O2 -g -W -Wextra -Werror
CFLAGS= -std=c11 -g -W -Wextra -Werror
CFLAGS+= -D_POSIX_C_SOURCE=200809L
CFLAGS+= -D_GNU_SOURCE -D_DEFAULT_SOURCE

Expand All @@ -23,7 +23,7 @@ endif
ifeq ($(DEBUG),1)
CFLAGS+= -O0 -DDEBUG -fno-omit-frame-pointer
else
CFLAGS+= -DNDEBUG
CFLAGS+= -O2 -DNDEBUG
endif

LIB= libringbuf
Expand Down
170 changes: 157 additions & 13 deletions src/ringbuf.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,20 @@
#define WRAP_INCR(x) (((x) + 0x100000000UL) & WRAP_COUNTER)

typedef uint64_t ringbuf_off_t;
typedef uint64_t worker_off_t;
typedef uint64_t registered_t;

/*
 * Registration states stored in a worker-record's `registered` field.
 * The numeric values are spelled out; they match the implicit
 * numbering of the declaration order.
 */
enum {
	not_registered   = 0,	/* Free; may be claimed by register_worker() */
	being_registered = 1,	/* Being registered in register_worker() */
	perm_registered  = 2,	/* Registered in ringbuf_register() */
	temp_registered  = 3	/* Registered in ringbuf_acquire() */
};

struct ringbuf_worker {
volatile ringbuf_off_t seen_off;
int registered;
registered_t registered;
};

struct ringbuf {
Expand All @@ -94,6 +104,9 @@ struct ringbuf {
volatile ringbuf_off_t next;
ringbuf_off_t end;

/* The range of worker-records in use. */
worker_off_t first_used_worker, first_free_worker;

/* The following are updated by the consumer. */
ringbuf_off_t written;
unsigned nworkers;
Expand Down Expand Up @@ -130,25 +143,135 @@ ringbuf_get_sizes(unsigned nworkers,
*ringbuf_worker_size = sizeof(ringbuf_worker_t);
}

/*
 * prune_unregistered_workers: consume any range of unregistered
 * worker-records.
 *
 * Starting at rbuf->first_used_worker, steps that index forward past
 * every worker-record whose `registered` field is not_registered.
 * Each step is published with one compare-and-swap.  The scan stops
 * when the used index equals the index held in rbuf->first_free_worker,
 * or when it reaches a record that is not not_registered (this
 * includes records in the being_registered state).
 *
 * Both fields keep the record index in their RBUF_OFF_MASK bits.  A
 * stored value additionally carries WRAP_INCR() of the previous value,
 * so consecutive values differ even when the index itself repeats
 * (presumably a guard against ABA on the compare-and-swap -- confirm).
 *
 * Returns the value of rbuf->first_used_worker read on the final
 * iteration, i.e. the one at which the scan stopped, including its
 * non-index bits.  It is a snapshot: nothing in this function prevents
 * the field from changing after it was read.
 *
 * NOTE(review): atomic_compare_exchange_weak() is called here with the
 * expected value passed by value, unlike the pointer form in C11
 * <stdatomic.h>; presumably a project-local macro -- confirm.
 */
static worker_off_t
prune_unregistered_workers(ringbuf_t *rbuf)
{
worker_off_t volatile *p_used_worker;
worker_off_t volatile const *p_free_worker;
worker_off_t old_used_worker, new_used_worker;
bool should_continue = true;

p_used_worker = &rbuf->first_used_worker;
p_free_worker = &rbuf->first_free_worker;

/*
 * Each pass of the outer loop tries to skip one worker-record.
 * The `break` statements below leave only the inner do/while;
 * should_continue is what ends the outer loop.
 */
while (should_continue) {

do {
worker_off_t old_used_worker_index, free_worker_index;
ringbuf_worker_t *w;

/* If there is no more range of used worker-records, stop. */
old_used_worker = *p_used_worker;
old_used_worker_index = (old_used_worker & RBUF_OFF_MASK);
free_worker_index = ((*p_free_worker) & RBUF_OFF_MASK);
if (old_used_worker_index == free_worker_index) {
should_continue = false;
break;
}

/* If this worker-record is still in use, stop. */
w = &rbuf->workers[old_used_worker_index];
if (w->registered != not_registered) {
should_continue = false;
break;
}

/*
 * Advance past this unused worker-record: next index modulo the
 * number of records, combined with the WRAP_INCR() bits.
 */
new_used_worker = (old_used_worker_index + 1) % rbuf->nworkers;
new_used_worker |= WRAP_INCR(old_used_worker);
/*
 * Store the new value only if the field still holds the value read
 * above; otherwise re-read and re-evaluate from the top.
 */
} while (!atomic_compare_exchange_weak(p_used_worker, old_used_worker, new_used_worker));
}

return old_used_worker;
}

/*
 * register_worker: allocate a worker-record for a thread/process,
 * and pass back the pointer to its local store.
 * Returns NULL if none are available.
 *
 * registration_type is the value stored in the record's `registered`
 * field once the record has been claimed; the callers visible in this
 * file pass perm_registered or temp_registered.
 *
 * The record claimed is the one at index
 * (index in first_free_worker + 1) % nworkers, and first_free_worker
 * is advanced to that index.  Claiming is a two-step handshake:
 *   1. compare-and-swap the record's `registered` field from
 *      not_registered to being_registered;
 *   2. compare-and-swap first_free_worker to the new value.
 * If step 2 fails, step 1 is undone and the attempt is repeated.
 *
 * NULL is returned when the index following the free index equals the
 * used index reported by prune_unregistered_workers().
 *
 * NOTE(review): with nworkers == 1 the expression (index + 1) % 1 is
 * always 0, so if the used index is also 0 this appears to return NULL
 * on every call -- confirm whether a single worker is meant to be
 * supported.
 *
 * NOTE(review): when step 1 fails because the candidate record is not
 * not_registered, control returns to the outer loop and the same
 * candidate is tried again; nothing visible here bounds the retries
 * while that record stays registered -- confirm this is intended.
 */
static ringbuf_worker_t *
register_worker(ringbuf_t *rbuf, unsigned registration_type)
{
worker_off_t volatile *p_used_worker, *p_free_worker;
int acquired;
ringbuf_worker_t *w = NULL;

/* Try to find a worker-record that can be registered. */
p_used_worker = &rbuf->first_used_worker;
p_free_worker = &rbuf->first_free_worker;
acquired = false;
while (!acquired) {
worker_off_t old_used_worker, prev_free_worker,
old_free_worker, new_free_worker;

/*
 * Get the current range of used worker-records, after trying
 * to prune recently-unregistered workers.
 */
old_used_worker = prune_unregistered_workers(rbuf);
prev_free_worker = *p_free_worker;

/*
 * If there are no potentially free worker-records,
 * let our caller know.
 */
if ((((prev_free_worker & RBUF_OFF_MASK) + 1) % rbuf->nworkers)
== (old_used_worker & RBUF_OFF_MASK))
return NULL;

/* Exclusively acquire a worker-record index. */
while (!acquired) {
/*
 * Prepare to acquire a worker-record index.  If the free index
 * no longer matches the snapshot used for the fullness check
 * above, go back to the outer loop and redo that check.
 */
old_free_worker = (*p_free_worker);
if (old_free_worker != prev_free_worker)
break;
new_free_worker = ((old_free_worker & RBUF_OFF_MASK) + 1)
% rbuf->nworkers;
new_free_worker |= WRAP_INCR(old_free_worker);

/*
 * Remember that this worker-record is being registered.
 * Failure means the record is not in the not_registered state;
 * fall back to the outer loop.
 */
w = &rbuf->workers[new_free_worker & RBUF_OFF_MASK];
if (!atomic_compare_exchange_weak(&w->registered, not_registered, being_registered))
break;
/* Set optimistically; cleared again below if the index swap fails. */
acquired = true;

/* Swap the indexes to exclusively acquire the worker-record. */
if (atomic_compare_exchange_weak(p_free_worker, prev_free_worker, new_free_worker)) {
/*
 * Initialise seen_off first, then issue a release fence, then
 * store the final registration state, so that seen_off is
 * written before the record stops being being_registered.
 */
w->seen_off = RBUF_OFF_MAX;
atomic_thread_fence(memory_order_release);
w->registered = registration_type;
break;
}

/*
 * The worker-record was not successfully acquired: undo the
 * being_registered marker and try again.
 */
w->registered = not_registered;
acquired = false;
}
}

/* Hand back the worker-record claimed above. */
return w;
}

/*
* ringbuf_register: register the worker (thread/process) as a producer
* and pass the pointer to its local store.
*/
ringbuf_worker_t *
ringbuf_register(ringbuf_t *rbuf, unsigned i)
ringbuf_register(ringbuf_t *rbuf)
{
ringbuf_worker_t *w = &rbuf->workers[i];

w->seen_off = RBUF_OFF_MAX;
atomic_thread_fence(memory_order_release);
w->registered = true;
return w;
return register_worker(rbuf, perm_registered);
}

void
ringbuf_unregister(ringbuf_t *rbuf, ringbuf_worker_t *w)
{
w->registered = false;
w->registered = not_registered;
(void)rbuf;
}

Expand Down Expand Up @@ -176,11 +299,22 @@ stable_nextoff(ringbuf_t *rbuf)
* => On failure: returns -1.
*/
ssize_t
ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len)
ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t **pw, size_t len)
{
ringbuf_off_t seen, next, target;
ringbuf_worker_t *w;

ASSERT(len > 0 && len <= rbuf->space);

/* If necessary, acquire a worker-record. */
if (*pw == NULL) {
w = register_worker(rbuf, temp_registered);
if (w == NULL)
return -1;
*pw = w;
} else {
w = *pw;
}
ASSERT(w->seen_off == RBUF_OFF_MAX);

do {
Expand Down Expand Up @@ -272,13 +406,22 @@ ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len)
* and is ready to be consumed.
*/
void
ringbuf_produce(ringbuf_t *rbuf, ringbuf_worker_t *w)
ringbuf_produce(ringbuf_t *rbuf, ringbuf_worker_t **pw)
{
ringbuf_worker_t *w = *pw;

(void)rbuf;
ASSERT(w->registered);
ASSERT(w->registered != not_registered
&& w->registered != being_registered);
ASSERT(w->seen_off != RBUF_OFF_MAX);
atomic_thread_fence(memory_order_release);
w->seen_off = RBUF_OFF_MAX;

/* Free any temporarily-allocated worker-record. */
if (w->registered == temp_registered) {
w->registered = not_registered;
*pw = NULL;
}
}

/*
Expand Down Expand Up @@ -317,7 +460,8 @@ ringbuf_consume(ringbuf_t *rbuf, size_t *offset)
ringbuf_off_t seen_off;

/* Skip if the worker has not registered. */
if (!w->registered) {
if (w->registered == not_registered
|| w->registered == being_registered) {
continue;
}

Expand Down
6 changes: 3 additions & 3 deletions src/ringbuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ typedef struct ringbuf_worker ringbuf_worker_t;
int ringbuf_setup(ringbuf_t *, unsigned, size_t);
void ringbuf_get_sizes(unsigned, size_t *, size_t *);

ringbuf_worker_t *ringbuf_register(ringbuf_t *, unsigned);
ringbuf_worker_t *ringbuf_register(ringbuf_t *);
void ringbuf_unregister(ringbuf_t *, ringbuf_worker_t *);

ssize_t ringbuf_acquire(ringbuf_t *, ringbuf_worker_t *, size_t);
void ringbuf_produce(ringbuf_t *, ringbuf_worker_t *);
ssize_t ringbuf_acquire(ringbuf_t *, ringbuf_worker_t **, size_t);
void ringbuf_produce(ringbuf_t *, ringbuf_worker_t **);
size_t ringbuf_consume(ringbuf_t *, size_t *);
void ringbuf_release(ringbuf_t *, size_t);

Expand Down
Loading

0 comments on commit 9666acc

Please sign in to comment.