diff --git a/src/Makefile b/src/Makefile index 7f85353..b4a0c10 100644 --- a/src/Makefile +++ b/src/Makefile @@ -2,7 +2,7 @@ # This file is in the Public Domain. # -CFLAGS= -std=c11 -O2 -g -W -Wextra -Werror +CFLAGS= -std=c11 -g -W -Wextra -Werror CFLAGS+= -D_POSIX_C_SOURCE=200809L CFLAGS+= -D_GNU_SOURCE -D_DEFAULT_SOURCE @@ -23,7 +23,7 @@ endif ifeq ($(DEBUG),1) CFLAGS+= -O0 -DDEBUG -fno-omit-frame-pointer else -CFLAGS+= -DNDEBUG +CFLAGS+= -O2 -DNDEBUG endif LIB= libringbuf diff --git a/src/ringbuf.c b/src/ringbuf.c index e0f2c9e..9671eaa 100644 --- a/src/ringbuf.c +++ b/src/ringbuf.c @@ -76,10 +76,20 @@ #define WRAP_INCR(x) (((x) + 0x100000000UL) & WRAP_COUNTER) typedef uint64_t ringbuf_off_t; +typedef uint64_t worker_off_t; +typedef uint64_t registered_t; + +enum +{ + not_registered, + being_registered, /* Being registered in register_worker() */ + perm_registered, /* Registered in ringbuf_register() */ + temp_registered /* Registered in ringbuf_acquire() */ +}; struct ringbuf_worker { volatile ringbuf_off_t seen_off; - int registered; + registered_t registered; }; struct ringbuf { @@ -94,6 +104,9 @@ struct ringbuf { volatile ringbuf_off_t next; ringbuf_off_t end; + /* The range of worker-records in use. */ + worker_off_t first_used_worker, first_free_worker; + /* The following are updated by the consumer. */ ringbuf_off_t written; unsigned nworkers; @@ -130,25 +143,135 @@ ringbuf_get_sizes(unsigned nworkers, *ringbuf_worker_size = sizeof(ringbuf_worker_t); } +/* + * prune_unregistered_workers: consume any range of unregistered + * worker-records. + */ +static worker_off_t +prune_unregistered_workers(ringbuf_t *rbuf) +{ + worker_off_t volatile *p_used_worker; + worker_off_t volatile const *p_free_worker; + worker_off_t old_used_worker, new_used_worker; + bool should_continue = true; + + p_used_worker = &rbuf->first_used_worker; + p_free_worker = &rbuf->first_free_worker; + + while (should_continue) { + + do { + worker_off_t old_used_worker_index, free_worker_index; + ringbuf_worker_t *w; + + /* If there is no more range of used worker-records, stop. */ + old_used_worker = *p_used_worker; + old_used_worker_index = (old_used_worker & RBUF_OFF_MASK); + free_worker_index = ((*p_free_worker) & RBUF_OFF_MASK); + if (old_used_worker_index == free_worker_index) { + should_continue = false; + break; + } + + /* If this worker-record is still in use, stop. */ + w = &rbuf->workers[old_used_worker_index]; + if (w->registered != not_registered) { + should_continue = false; + break; + } + + /* Advance past this unused worker-record. */ + new_used_worker = (old_used_worker_index + 1) % rbuf->nworkers; + new_used_worker |= WRAP_INCR(old_used_worker); + } while (!atomic_compare_exchange_weak(p_used_worker, old_used_worker, new_used_worker)); + } + + return old_used_worker; +} + +/* + * register_worker: allocate a worker-record for a thread/process, + * and pass back the pointer to its local store. + * Returns NULL if none are available. + */ +static ringbuf_worker_t * +register_worker(ringbuf_t *rbuf, unsigned registration_type) +{ + worker_off_t volatile *p_used_worker, *p_free_worker; + int acquired; + ringbuf_worker_t *w = NULL; + + /* Try to find a worker-record that can be registered. */ + p_used_worker = &rbuf->first_used_worker; + p_free_worker = &rbuf->first_free_worker; + acquired = false; + while (!acquired) { + worker_off_t old_used_worker, prev_free_worker, + old_free_worker, new_free_worker; + + /* + * Get the current range of used worker-records, after trying + * to prune recently-unregistered workers. + */ + old_used_worker = prune_unregistered_workers(rbuf); + prev_free_worker = *p_free_worker; + + /* + * If there are no potentially free worker-records, + * let our caller know. + */ + if ((((prev_free_worker & RBUF_OFF_MASK) + 1) % rbuf->nworkers) + == (old_used_worker & RBUF_OFF_MASK)) + return NULL; + + /* Exclusively acquire a worker-record index. */ + while (!acquired) { + /* Prepare to acquire a worker-record index. */ + old_free_worker = (*p_free_worker); + if (old_free_worker != prev_free_worker) + break; + new_free_worker = ((old_free_worker & RBUF_OFF_MASK) + 1) + % rbuf->nworkers; + new_free_worker |= WRAP_INCR(old_free_worker); + + /* Remember that this worker-record is being registered. */ + w = &rbuf->workers[new_free_worker & RBUF_OFF_MASK]; + if (!atomic_compare_exchange_weak(&w->registered, not_registered, being_registered)) + break; + acquired = true; + + /* Swap the indexes to exclusively acquire the worker-record. */ + if (atomic_compare_exchange_weak(p_free_worker, prev_free_worker, new_free_worker)) { + w->seen_off = RBUF_OFF_MAX; + atomic_thread_fence(memory_order_release); + w->registered = registration_type; + break; + } + + /* The worker-record was not successfully acquired. */ + w->registered = not_registered; + acquired = false; + } + } + + /* Register this worker-record. */ + return w; +} + /* * ringbuf_register: register the worker (thread/process) as a producer * and pass the pointer to its local store. */ ringbuf_worker_t * -ringbuf_register(ringbuf_t *rbuf, unsigned i) +ringbuf_register(ringbuf_t *rbuf) { - ringbuf_worker_t *w = &rbuf->workers[i]; - - w->seen_off = RBUF_OFF_MAX; - atomic_thread_fence(memory_order_release); - w->registered = true; - return w; + return register_worker(rbuf, perm_registered); } void ringbuf_unregister(ringbuf_t *rbuf, ringbuf_worker_t *w) { - w->registered = false; + w->registered = not_registered; (void)rbuf; } @@ -176,11 +299,22 @@ stable_nextoff(ringbuf_t *rbuf) * => On failure: returns -1. */ ssize_t -ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len) +ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t **pw, size_t len) { ringbuf_off_t seen, next, target; + ringbuf_worker_t *w; ASSERT(len > 0 && len <= rbuf->space); + + /* If necessary, acquire a worker-record. */ + if (*pw == NULL) { + w = register_worker(rbuf, temp_registered); + if (w == NULL) + return -1; + *pw = w; + } else { + w = *pw; + } ASSERT(w->seen_off == RBUF_OFF_MAX); do { @@ -272,13 +406,22 @@ ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len) * and is ready to be consumed. */ void -ringbuf_produce(ringbuf_t *rbuf, ringbuf_worker_t *w) +ringbuf_produce(ringbuf_t *rbuf, ringbuf_worker_t **pw) { + ringbuf_worker_t *w = *pw; + (void)rbuf; - ASSERT(w->registered); + ASSERT(w->registered != not_registered + && w->registered != being_registered); ASSERT(w->seen_off != RBUF_OFF_MAX); atomic_thread_fence(memory_order_release); w->seen_off = RBUF_OFF_MAX; + + /* Free any temporarily-allocated worker-record. */ + if (w->registered == temp_registered) { + w->registered = not_registered; + *pw = NULL; + } } /* @@ -317,7 +460,8 @@ ringbuf_consume(ringbuf_t *rbuf, size_t *offset) ringbuf_off_t seen_off; /* Skip if the worker has not registered. */ - if (!w->registered) { + if (w->registered == not_registered + || w->registered == being_registered) { continue; } diff --git a/src/ringbuf.h b/src/ringbuf.h index e8fc767..c728c15 100644 --- a/src/ringbuf.h +++ b/src/ringbuf.h @@ -16,11 +16,11 @@ typedef struct ringbuf_worker ringbuf_worker_t; int ringbuf_setup(ringbuf_t *, unsigned, size_t); void ringbuf_get_sizes(unsigned, size_t *, size_t *); -ringbuf_worker_t *ringbuf_register(ringbuf_t *, unsigned); +ringbuf_worker_t *ringbuf_register(ringbuf_t *); void ringbuf_unregister(ringbuf_t *, ringbuf_worker_t *); -ssize_t ringbuf_acquire(ringbuf_t *, ringbuf_worker_t *, size_t); -void ringbuf_produce(ringbuf_t *, ringbuf_worker_t *); +ssize_t ringbuf_acquire(ringbuf_t *, ringbuf_worker_t **, size_t); +void ringbuf_produce(ringbuf_t *, ringbuf_worker_t **); size_t ringbuf_consume(ringbuf_t *, size_t *); void ringbuf_release(ringbuf_t *, size_t); diff --git a/src/t_ringbuf.c b/src/t_ringbuf.c index 05aa4d5..04d4e1f 100644 --- a/src/t_ringbuf.c +++ b/src/t_ringbuf.c @@ -11,7 +11,7 @@ #include "ringbuf.h" -#define MAX_WORKERS 2 +#define MAX_WORKERS 3 static size_t ringbuf_obj_size; @@ -20,20 +20,19 @@ test_wraparound(void) { const size_t n = 1000; ringbuf_t *r = malloc(ringbuf_obj_size); - ringbuf_worker_t *w; + ringbuf_worker_t *w = NULL; size_t len, woff; ssize_t off; /* Size n, but only (n - 1) can be produced at a time. */ ringbuf_setup(r, MAX_WORKERS, n); - w = ringbuf_register(r, 0); /* Produce (n / 2 + 1) and then attempt another (n / 2 - 1). */ - off = ringbuf_acquire(r, w, n / 2 + 1); + off = ringbuf_acquire(r, &w, n / 2 + 1); assert(off == 0); - ringbuf_produce(r, w); + ringbuf_produce(r, &w); - off = ringbuf_acquire(r, w, n / 2 - 1); + off = ringbuf_acquire(r, &w, n / 2 - 1); assert(off == -1); /* Consume (n / 2 + 1) bytes. */ @@ -42,20 +41,19 @@ test_wraparound(void) ringbuf_release(r, len); /* All consumed, attempt (n / 2 + 1) now. */ - off = ringbuf_acquire(r, w, n / 2 + 1); + off = ringbuf_acquire(r, &w, n / 2 + 1); assert(off == -1); /* However, wraparound can be successful with (n / 2). */ - off = ringbuf_acquire(r, w, n / 2); + off = ringbuf_acquire(r, &w, n / 2); assert(off == 0); - ringbuf_produce(r, w); + ringbuf_produce(r, &w); /* Consume (n / 2) bytes. */ len = ringbuf_consume(r, &woff); assert(len == (n / 2) && woff == 0); ringbuf_release(r, len); - ringbuf_unregister(r, w); free(r); } @@ -63,26 +61,25 @@ static void test_multi(void) { ringbuf_t *r = malloc(ringbuf_obj_size); - ringbuf_worker_t *w; + ringbuf_worker_t *w = NULL; size_t len, woff; ssize_t off; ringbuf_setup(r, MAX_WORKERS, 3); - w = ringbuf_register(r, 0); /* * Produce 2 bytes. */ - off = ringbuf_acquire(r, w, 1); + off = ringbuf_acquire(r, &w, 1); assert(off == 0); - ringbuf_produce(r, w); + ringbuf_produce(r, &w); - off = ringbuf_acquire(r, w, 1); + off = ringbuf_acquire(r, &w, 1); assert(off == 1); - ringbuf_produce(r, w); + ringbuf_produce(r, &w); - off = ringbuf_acquire(r, w, 1); + off = ringbuf_acquire(r, &w, 1); assert(off == -1); /* @@ -99,18 +96,18 @@ test_multi(void) * Produce another 2 with wrap-around. */ - off = ringbuf_acquire(r, w, 2); + off = ringbuf_acquire(r, &w, 2); assert(off == -1); - off = ringbuf_acquire(r, w, 1); + off = ringbuf_acquire(r, &w, 1); assert(off == 2); - ringbuf_produce(r, w); + ringbuf_produce(r, &w); - off = ringbuf_acquire(r, w, 1); + off = ringbuf_acquire(r, &w, 1); assert(off == 0); - ringbuf_produce(r, w); + ringbuf_produce(r, &w); - off = ringbuf_acquire(r, w, 1); + off = ringbuf_acquire(r, &w, 1); assert(off == -1); /* @@ -125,7 +122,6 @@ test_multi(void) assert(len == 1 && woff == 0); ringbuf_release(r, len); - ringbuf_unregister(r, w); free(r); } @@ -133,18 +129,16 @@ static void test_overlap(void) { ringbuf_t *r = malloc(ringbuf_obj_size); - ringbuf_worker_t *w1, *w2; + ringbuf_worker_t *w1 = NULL, *w2 = NULL; size_t len, woff; ssize_t off; ringbuf_setup(r, MAX_WORKERS, 10); - w1 = ringbuf_register(r, 0); - w2 = ringbuf_register(r, 1); /* * Producer 1: acquire 5 bytes. Consumer should fail. */ - off = ringbuf_acquire(r, w1, 5); + off = ringbuf_acquire(r, &w1, 5); assert(off == 0); len = ringbuf_consume(r, &woff); @@ -153,7 +147,7 @@ test_overlap(void) /* * Producer 2: acquire 3 bytes. Consumer should still fail. */ - off = ringbuf_acquire(r, w2, 3); + off = ringbuf_acquire(r, &w2, 3); assert(off == 5); len = ringbuf_consume(r, &woff); @@ -162,7 +156,7 @@ test_overlap(void) /* * Producer 1: commit. Consumer can get the first range. */ - ringbuf_produce(r, w1); + ringbuf_produce(r, &w1); len = ringbuf_consume(r, &woff); assert(len == 5 && woff == 0); ringbuf_release(r, len); @@ -174,13 +168,13 @@ test_overlap(void) * Producer 1: acquire-produce 4 bytes, triggering wrap-around. * Consumer should still fail. */ - off = ringbuf_acquire(r, w1, 4); + off = ringbuf_acquire(r, &w1, 4); assert(off == 0); len = ringbuf_consume(r, &woff); assert(len == 0); - ringbuf_produce(r, w1); + ringbuf_produce(r, &w1); len = ringbuf_consume(r, &woff); assert(len == 0); @@ -188,7 +182,7 @@ test_overlap(void) * Finally, producer 2 commits its 3 bytes. * Consumer can proceed for both ranges. */ - ringbuf_produce(r, w2); + ringbuf_produce(r, &w2); len = ringbuf_consume(r, &woff); assert(len == 3 && woff == 5); ringbuf_release(r, len); @@ -197,8 +191,6 @@ test_overlap(void) assert(len == 4 && woff == 0); ringbuf_release(r, len); - ringbuf_unregister(r, w1); - ringbuf_unregister(r, w2); free(r); } @@ -206,14 +198,12 @@ static void test_random(void) { ringbuf_t *r = malloc(ringbuf_obj_size); - ringbuf_worker_t *w1, *w2; + ringbuf_worker_t *w1 = NULL, *w2 = NULL; ssize_t off1 = -1, off2 = -1; unsigned n = 1000 * 1000 * 50; unsigned char buf[500]; ringbuf_setup(r, MAX_WORKERS, sizeof(buf)); - w1 = ringbuf_register(r, 0); - w2 = ringbuf_register(r, 1); while (n--) { size_t len, woff; @@ -237,32 +227,30 @@ test_random(void) break; case 1: // producer 1 if (off1 == -1) { - if ((off1 = ringbuf_acquire(r, w1, len)) >= 0) { + if ((off1 = ringbuf_acquire(r, &w1, len)) >= 0) { assert((size_t)off1 < sizeof(buf)); buf[off1] = len - 1; } } else { buf[off1]++; - ringbuf_produce(r, w1); + ringbuf_produce(r, &w1); off1 = -1; } break; case 2: // producer 2 if (off2 == -1) { - if ((off2 = ringbuf_acquire(r, w2, len)) >= 0) { + if ((off2 = ringbuf_acquire(r, &w2, len)) >= 0) { assert((size_t)off2 < sizeof(buf)); buf[off2] = len - 1; } } else { buf[off2]++; - ringbuf_produce(r, w2); + ringbuf_produce(r, &w2); off2 = -1; } break; } } - ringbuf_unregister(r, w1); - ringbuf_unregister(r, w2); free(r); } diff --git a/src/t_stress.c b/src/t_stress.c index e32fe94..7bac066 100644 --- a/src/t_stress.c +++ b/src/t_stress.c @@ -101,10 +101,10 @@ static void * ringbuf_stress(void *arg) { const unsigned id = (uintptr_t)arg; - ringbuf_worker_t *w; - - w = ringbuf_register(ringbuf, id); - assert(w != NULL); + ringbuf_worker_t *w = NULL; + //ringbuf_worker_t *w = ringbuf_register(ringbuf); + //assert (w != NULL); + uint64_t total_recv = 0; /* * There are NCPU threads concurrently generating and producing @@ -123,6 +123,7 @@ ringbuf_stress(void *arg) if (id == 0) { if ((len = ringbuf_consume(ringbuf, &off)) != 0) { + total_recv += len; size_t rem = len; assert(off < RBUF_SIZE); while (rem) { @@ -136,14 +137,16 @@ ringbuf_stress(void *arg) continue; } len = generate_message(buf, sizeof(buf) - 1); - if ((ret = ringbuf_acquire(ringbuf, w, len)) != -1) { + if ((ret = ringbuf_acquire(ringbuf, &w, len)) != -1) { off = (size_t)ret; assert(off < RBUF_SIZE); memcpy(&rbuf[off], buf, len); - ringbuf_produce(ringbuf, w); + ringbuf_produce(ringbuf, &w); } } pthread_barrier_wait(&barrier); + if (id == 0) + printf ("Total received: %" PRIu64 "\n", total_recv); pthread_exit(NULL); return NULL; }