Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Worker-record queue #6

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# This file is in the Public Domain.
#

CFLAGS= -std=c11 -O2 -g -W -Wextra -Werror
CFLAGS= -std=c11 -g -W -Wextra -Werror
CFLAGS+= -D_POSIX_C_SOURCE=200809L
CFLAGS+= -D_GNU_SOURCE -D_DEFAULT_SOURCE

Expand All @@ -23,7 +23,7 @@ endif
ifeq ($(DEBUG),1)
CFLAGS+= -O0 -DDEBUG -fno-omit-frame-pointer
else
CFLAGS+= -DNDEBUG
CFLAGS+= -O2 -DNDEBUG
endif

LIB= libringbuf
Expand Down
129 changes: 116 additions & 13 deletions src/ringbuf.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,20 @@
#define WRAP_INCR(x) (((x) + 0x100000000UL) & WRAP_COUNTER)

typedef uint64_t ringbuf_off_t;
typedef uint64_t worker_off_t;
typedef uint64_t registered_t;

enum
{
not_registered,
being_registered, /* Being registered in register_worker() */
perm_registered, /* Registered in ringbuf_register() */
temp_registered /* Registered in ringbuf_acquire() */
};

struct ringbuf_worker {
volatile ringbuf_off_t seen_off;
int registered;
registered_t registered;
};

struct ringbuf {
Expand All @@ -94,61 +104,121 @@ struct ringbuf {
volatile ringbuf_off_t next;
ringbuf_off_t end;

/* The index of the first potentially free worker-record. */
worker_off_t first_free_worker;

/* The following are updated by the consumer. */
ringbuf_off_t written;
unsigned nworkers;
unsigned nworkers, ntempworkers;
ringbuf_worker_t workers[];
};

/*
* ringbuf_setup: initialise a new ring buffer of a given length.
*/
int
ringbuf_setup(ringbuf_t *rbuf, unsigned nworkers, size_t length)
ringbuf_setup(ringbuf_t *rbuf, unsigned nworkers, unsigned ntempworkers, size_t length)
{
if (length >= RBUF_OFF_MASK) {
errno = EINVAL;
return -1;
}
memset(rbuf, 0, offsetof(ringbuf_t, workers[nworkers]));
memset(rbuf, 0, offsetof(ringbuf_t, workers[nworkers + ntempworkers]));
rbuf->space = length;
rbuf->end = RBUF_OFF_MAX;
rbuf->nworkers = nworkers;
rbuf->ntempworkers = ntempworkers;
return 0;
}

/*
* ringbuf_get_sizes: return the sizes of the ringbuf_t and ringbuf_worker_t.
*/
void
ringbuf_get_sizes(unsigned nworkers,
ringbuf_get_sizes(unsigned nworkers, unsigned ntempworkers,
size_t *ringbuf_size, size_t *ringbuf_worker_size)
{
if (ringbuf_size)
*ringbuf_size = offsetof(ringbuf_t, workers[nworkers]);
*ringbuf_size = offsetof(ringbuf_t, workers[nworkers + ntempworkers]);
if (ringbuf_worker_size)
*ringbuf_worker_size = sizeof(ringbuf_worker_t);
}

/*
* register_worker: allocate a worker-record for a thread/process,
* and pass back the pointer to its local store.
* Returns NULL if none are available.
*/
static ringbuf_worker_t *
register_worker(ringbuf_t *rbuf, unsigned registration_type)
{
worker_off_t volatile *p_free_worker;
int acquired;
ringbuf_worker_t *w = NULL;

/* Try to find a worker-record that can be registered. */
p_free_worker = &rbuf->first_free_worker;
acquired = false;
while (!acquired) {
worker_off_t prev_free_worker, i;

/* Get the index of the first worker-record to try registering. */
prev_free_worker = *p_free_worker;

for (i = 0; !acquired && i < rbuf->ntempworkers; ++i) {
worker_off_t new_free_worker;

/* Prepare to acquire a worker-record index. */
new_free_worker = ((prev_free_worker & RBUF_OFF_MASK)
+ i) % rbuf->ntempworkers;

/* Try to acquire a worker-record. */
w = &rbuf->workers[new_free_worker + rbuf->nworkers];
if (!atomic_compare_exchange_weak(&w->registered, not_registered, being_registered))
continue;
acquired = true;
w->seen_off = RBUF_OFF_MAX;
atomic_thread_fence(memory_order_release);
w->registered = registration_type;

/* Advance the index if no one else has. */
new_free_worker |= WRAP_INCR(prev_free_worker);
atomic_compare_exchange_weak(p_free_worker, prev_free_worker, new_free_worker);
}

/*
* If no worker-record could be registered, and no one else was
* trying to register at the same time, then stop searching.
*/
if (!acquired && (*p_free_worker) == prev_free_worker)
break;
}

/* Register this worker-record. */
return w;
}

/*
* ringbuf_register: register the worker (thread/process) as a producer
* and pass the pointer to its local store.
*/
ringbuf_worker_t *
ringbuf_register(ringbuf_t *rbuf, unsigned i)
{
ASSERT (i < rbuf->nworkers);

ringbuf_worker_t *w = &rbuf->workers[i];

w->seen_off = RBUF_OFF_MAX;
atomic_thread_fence(memory_order_release);
w->registered = true;
w->registered = perm_registered;
return w;
}

void
ringbuf_unregister(ringbuf_t *rbuf, ringbuf_worker_t *w)
{
w->registered = false;
w->registered = not_registered;
(void)rbuf;
}

Expand Down Expand Up @@ -176,11 +246,22 @@ stable_nextoff(ringbuf_t *rbuf)
* => On failure: returns -1.
*/
ssize_t
ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len)
ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t **pw, size_t len)
{
ringbuf_off_t seen, next, target;
ringbuf_worker_t *w;

ASSERT(len > 0 && len <= rbuf->space);

/* If necessary, acquire a worker-record. */
if (*pw == NULL) {
w = register_worker(rbuf, temp_registered);
if (w == NULL)
return -1;
*pw = w;
} else {
w = *pw;
}
ASSERT(w->seen_off == RBUF_OFF_MAX);

do {
Expand Down Expand Up @@ -209,6 +290,11 @@ ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len)
if (__predict_false(next < written && target >= written)) {
/* The producer must wait. */
w->seen_off = RBUF_OFF_MAX;
if (w->registered == temp_registered) {
*pw = NULL;
atomic_thread_fence(memory_order_release);
w->registered = not_registered;
}
return -1;
}

Expand All @@ -228,6 +314,11 @@ ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len)
target = exceed ? (WRAP_LOCK_BIT | len) : 0;
if ((target & RBUF_OFF_MASK) >= written) {
w->seen_off = RBUF_OFF_MAX;
if (w->registered == temp_registered) {
*pw = NULL;
atomic_thread_fence(memory_order_release);
w->registered = not_registered;
}
return -1;
}
/* Increment the wrap-around counter. */
Expand Down Expand Up @@ -272,13 +363,22 @@ ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len)
* and is ready to be consumed.
*/
void
ringbuf_produce(ringbuf_t *rbuf, ringbuf_worker_t *w)
ringbuf_produce(ringbuf_t *rbuf, ringbuf_worker_t **pw)
{
ringbuf_worker_t *w = *pw;

(void)rbuf;
ASSERT(w->registered);
ASSERT(w->registered != not_registered
&& w->registered != being_registered);
ASSERT(w->seen_off != RBUF_OFF_MAX);
atomic_thread_fence(memory_order_release);
w->seen_off = RBUF_OFF_MAX;

/* Free any temporarily-allocated worker-record. */
if (w->registered == temp_registered) {
w->registered = not_registered;
*pw = NULL;
}
}

/*
Expand All @@ -289,6 +389,7 @@ ringbuf_consume(ringbuf_t *rbuf, size_t *offset)
{
ringbuf_off_t written = rbuf->written, next, ready;
size_t towrite;
unsigned total_workers;
retry:
/*
* Get the stable 'next' offset. Note: stable_nextoff() issued
Expand All @@ -311,13 +412,15 @@ ringbuf_consume(ringbuf_t *rbuf, size_t *offset)
*/
ready = RBUF_OFF_MAX;

for (unsigned i = 0; i < rbuf->nworkers; i++) {
total_workers = rbuf->nworkers + rbuf->ntempworkers;
for (unsigned i = 0; i < total_workers; i++) {
ringbuf_worker_t *w = &rbuf->workers[i];
unsigned count = SPINLOCK_BACKOFF_MIN;
ringbuf_off_t seen_off;

/* Skip if the worker has not registered. */
if (!w->registered) {
if (w->registered == not_registered
|| w->registered == being_registered) {
continue;
}

Expand Down
8 changes: 4 additions & 4 deletions src/ringbuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ __BEGIN_DECLS
typedef struct ringbuf ringbuf_t;
typedef struct ringbuf_worker ringbuf_worker_t;

int ringbuf_setup(ringbuf_t *, unsigned, size_t);
void ringbuf_get_sizes(unsigned, size_t *, size_t *);
int ringbuf_setup(ringbuf_t *, unsigned, unsigned, size_t);
void ringbuf_get_sizes(unsigned, unsigned, size_t *, size_t *);

ringbuf_worker_t *ringbuf_register(ringbuf_t *, unsigned);
void ringbuf_unregister(ringbuf_t *, ringbuf_worker_t *);

ssize_t ringbuf_acquire(ringbuf_t *, ringbuf_worker_t *, size_t);
void ringbuf_produce(ringbuf_t *, ringbuf_worker_t *);
ssize_t ringbuf_acquire(ringbuf_t *, ringbuf_worker_t **, size_t);
void ringbuf_produce(ringbuf_t *, ringbuf_worker_t **);
size_t ringbuf_consume(ringbuf_t *, size_t *);
void ringbuf_release(ringbuf_t *, size_t);

Expand Down
Loading