From 9666acc55957528caecbd83928efff633ff4f9a6 Mon Sep 17 00:00:00 2001 From: ulatekh Date: Sat, 10 Feb 2018 14:43:41 -0700 Subject: [PATCH 1/5] Implemented a queue of worker-records. This performs much better than the stack of worker-records, while accomplishing the same goal. ringbuf_register() can still be called in this variant, i.e. it allows for both permanent and temporary registration of worker-records. I think the version-number in the first-used/free-worker indices is unnecessary if both indices are tracked; I also realize that the use of a first-used-worker index interferes with mixed permanent and temporary registration, but then the version-number in the worker-indices must be kept. Both variants will be tried in further commits. --- src/Makefile | 4 +- src/ringbuf.c | 170 ++++++++++++++++++++++++++++++++++++++++++++---- src/ringbuf.h | 6 +- src/t_ringbuf.c | 76 +++++++++------------- src/t_stress.c | 15 +++-- 5 files changed, 203 insertions(+), 68 deletions(-) diff --git a/src/Makefile b/src/Makefile index 7f85353..b4a0c10 100644 --- a/src/Makefile +++ b/src/Makefile @@ -2,7 +2,7 @@ # This file is in the Public Domain. 
# -CFLAGS= -std=c11 -O2 -g -W -Wextra -Werror +CFLAGS= -std=c11 -g -W -Wextra -Werror CFLAGS+= -D_POSIX_C_SOURCE=200809L CFLAGS+= -D_GNU_SOURCE -D_DEFAULT_SOURCE @@ -23,7 +23,7 @@ endif ifeq ($(DEBUG),1) CFLAGS+= -O0 -DDEBUG -fno-omit-frame-pointer else -CFLAGS+= -DNDEBUG +CFLAGS+= -O2 -DNDEBUG endif LIB= libringbuf diff --git a/src/ringbuf.c b/src/ringbuf.c index e0f2c9e..9671eaa 100644 --- a/src/ringbuf.c +++ b/src/ringbuf.c @@ -76,10 +76,20 @@ #define WRAP_INCR(x) (((x) + 0x100000000UL) & WRAP_COUNTER) typedef uint64_t ringbuf_off_t; +typedef uint64_t worker_off_t; +typedef uint64_t registered_t; + +enum +{ + not_registered, + being_registered, /* Being registered in register_worker() */ + perm_registered, /* Registered in ringbuf_register() */ + temp_registered /* Registered in ringbuf_acquire() */ +}; struct ringbuf_worker { volatile ringbuf_off_t seen_off; - int registered; + registered_t registered; }; struct ringbuf { @@ -94,6 +104,9 @@ struct ringbuf { volatile ringbuf_off_t next; ringbuf_off_t end; + /* The range of worker-records in use. */ + worker_off_t first_used_worker, first_free_worker; + /* The following are updated by the consumer. */ ringbuf_off_t written; unsigned nworkers; @@ -130,25 +143,135 @@ ringbuf_get_sizes(unsigned nworkers, *ringbuf_worker_size = sizeof(ringbuf_worker_t); } +/* + * prune_unregistered_workers: consume any range of unregistered + * worker-records. + */ +static worker_off_t +prune_unregistered_workers(ringbuf_t *rbuf) +{ + worker_off_t volatile *p_used_worker; + worker_off_t volatile const *p_free_worker; + worker_off_t old_used_worker, new_used_worker; + bool should_continue = true; + + p_used_worker = &rbuf->first_used_worker; + p_free_worker = &rbuf->first_free_worker; + + while (should_continue) { + + do { + worker_off_t old_used_worker_index, free_worker_index; + ringbuf_worker_t *w; + + /* If there is no more range of used worker-records, stop. 
*/ + old_used_worker = *p_used_worker; + old_used_worker_index = (old_used_worker & RBUF_OFF_MASK); + free_worker_index = ((*p_free_worker) & RBUF_OFF_MASK); + if (old_used_worker_index == free_worker_index) { + should_continue = false; + break; + } + + /* If this worker-record is still in use, stop. */ + w = &rbuf->workers[old_used_worker_index]; + if (w->registered != not_registered) { + should_continue = false; + break; + } + + /* Advance past this unused worker-record. */ + new_used_worker = (old_used_worker_index + 1) % rbuf->nworkers; + new_used_worker |= WRAP_INCR(old_used_worker); + } while (!atomic_compare_exchange_weak(p_used_worker, old_used_worker, new_used_worker)); + } + + return old_used_worker; +} + +/* + * register_worker: allocate a worker-record for a thread/process, + * and pass back the pointer to its local store. + * Returns NULL if none are available. + */ +static ringbuf_worker_t * +register_worker(ringbuf_t *rbuf, unsigned registration_type) +{ + worker_off_t volatile *p_used_worker, *p_free_worker; + int acquired; + ringbuf_worker_t *w = NULL; + + /* Try to find a worker-record that can be registered. */ + p_used_worker = &rbuf->first_used_worker; + p_free_worker = &rbuf->first_free_worker; + acquired = false; + while (!acquired) { + worker_off_t old_used_worker, prev_free_worker, + old_free_worker, new_free_worker; + + /* + * Get the current range of used worker-records, after trying + * to prune recently-unregistered workers. + */ + old_used_worker = prune_unregistered_workers(rbuf); + prev_free_worker = *p_free_worker; + + /* + * If there are no potentially free worker-records, + * let our caller know. + */ + if ((((prev_free_worker & RBUF_OFF_MASK) + 1) % rbuf->nworkers) + == (old_used_worker & RBUF_OFF_MASK)) + return NULL; + + /* Exclusively acquire a worker-record index. */ + while (!acquired) { + /* Prepare to acquire a worker-record index. 
*/ + old_free_worker = (*p_free_worker); + if (old_free_worker != prev_free_worker) + break; + new_free_worker = ((old_free_worker & RBUF_OFF_MASK) + 1) + % rbuf->nworkers; + new_free_worker |= WRAP_INCR(old_free_worker); + + /* Remember that this worker-record is being registered. */ + w = &rbuf->workers[new_free_worker & RBUF_OFF_MASK]; + if (!atomic_compare_exchange_weak(&w->registered, not_registered, being_registered)) + break; + acquired = true; + + /* Swap the indexes to exclusively acquire the worker-record. */ + if (atomic_compare_exchange_weak(p_free_worker, prev_free_worker, new_free_worker)) { + w->seen_off = RBUF_OFF_MAX; + atomic_thread_fence(memory_order_release); + w->registered = registration_type; + break; + } + + /* The worker-record was not successfully acquired. */ + w->registered = not_registered; + acquired = false; + } + } + + /* Register this worker-record. */ + return w; +} + /* * ringbuf_register: register the worker (thread/process) as a producer * and pass the pointer to its local store. */ ringbuf_worker_t * -ringbuf_register(ringbuf_t *rbuf, unsigned i) +ringbuf_register(ringbuf_t *rbuf) { - ringbuf_worker_t *w = &rbuf->workers[i]; - - w->seen_off = RBUF_OFF_MAX; - atomic_thread_fence(memory_order_release); - w->registered = true; - return w; + return register_worker(rbuf, perm_registered); } void ringbuf_unregister(ringbuf_t *rbuf, ringbuf_worker_t *w) { - w->registered = false; + w->registered = not_registered; (void)rbuf; } @@ -176,11 +299,22 @@ stable_nextoff(ringbuf_t *rbuf) * => On failure: returns -1. */ ssize_t -ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len) +ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t **pw, size_t len) { ringbuf_off_t seen, next, target; + ringbuf_worker_t *w; ASSERT(len > 0 && len <= rbuf->space); + + /* If necessary, acquire a worker-record. 
*/ + if (*pw == NULL) { + w = register_worker(rbuf, temp_registered); + if (w == NULL) + return -1; + *pw = w; + } else { + w = *pw; + } ASSERT(w->seen_off == RBUF_OFF_MAX); do { @@ -272,13 +406,22 @@ ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len) * and is ready to be consumed. */ void -ringbuf_produce(ringbuf_t *rbuf, ringbuf_worker_t *w) +ringbuf_produce(ringbuf_t *rbuf, ringbuf_worker_t **pw) { + ringbuf_worker_t *w = *pw; + (void)rbuf; - ASSERT(w->registered); + ASSERT(w->registered != not_registered + && w->registered != being_registered); ASSERT(w->seen_off != RBUF_OFF_MAX); atomic_thread_fence(memory_order_release); w->seen_off = RBUF_OFF_MAX; + + /* Free any temporarily-allocated worker-record. */ + if (w->registered == temp_registered) { + w->registered = not_registered; + *pw = NULL; + } } /* @@ -317,7 +460,8 @@ ringbuf_consume(ringbuf_t *rbuf, size_t *offset) ringbuf_off_t seen_off; /* Skip if the worker has not registered. */ - if (!w->registered) { + if (w->registered == not_registered + || w->registered == being_registered) { continue; } diff --git a/src/ringbuf.h b/src/ringbuf.h index e8fc767..c728c15 100644 --- a/src/ringbuf.h +++ b/src/ringbuf.h @@ -16,11 +16,11 @@ typedef struct ringbuf_worker ringbuf_worker_t; int ringbuf_setup(ringbuf_t *, unsigned, size_t); void ringbuf_get_sizes(unsigned, size_t *, size_t *); -ringbuf_worker_t *ringbuf_register(ringbuf_t *, unsigned); +ringbuf_worker_t *ringbuf_register(ringbuf_t *); void ringbuf_unregister(ringbuf_t *, ringbuf_worker_t *); -ssize_t ringbuf_acquire(ringbuf_t *, ringbuf_worker_t *, size_t); -void ringbuf_produce(ringbuf_t *, ringbuf_worker_t *); +ssize_t ringbuf_acquire(ringbuf_t *, ringbuf_worker_t **, size_t); +void ringbuf_produce(ringbuf_t *, ringbuf_worker_t **); size_t ringbuf_consume(ringbuf_t *, size_t *); void ringbuf_release(ringbuf_t *, size_t); diff --git a/src/t_ringbuf.c b/src/t_ringbuf.c index 05aa4d5..04d4e1f 100644 --- a/src/t_ringbuf.c +++ b/src/t_ringbuf.c 
@@ -11,7 +11,7 @@ #include "ringbuf.h" -#define MAX_WORKERS 2 +#define MAX_WORKERS 3 static size_t ringbuf_obj_size; @@ -20,20 +20,19 @@ test_wraparound(void) { const size_t n = 1000; ringbuf_t *r = malloc(ringbuf_obj_size); - ringbuf_worker_t *w; + ringbuf_worker_t *w = NULL; size_t len, woff; ssize_t off; /* Size n, but only (n - 1) can be produced at a time. */ ringbuf_setup(r, MAX_WORKERS, n); - w = ringbuf_register(r, 0); /* Produce (n / 2 + 1) and then attempt another (n / 2 - 1). */ - off = ringbuf_acquire(r, w, n / 2 + 1); + off = ringbuf_acquire(r, &w, n / 2 + 1); assert(off == 0); - ringbuf_produce(r, w); + ringbuf_produce(r, &w); - off = ringbuf_acquire(r, w, n / 2 - 1); + off = ringbuf_acquire(r, &w, n / 2 - 1); assert(off == -1); /* Consume (n / 2 + 1) bytes. */ @@ -42,20 +41,19 @@ test_wraparound(void) ringbuf_release(r, len); /* All consumed, attempt (n / 2 + 1) now. */ - off = ringbuf_acquire(r, w, n / 2 + 1); + off = ringbuf_acquire(r, &w, n / 2 + 1); assert(off == -1); /* However, wraparound can be successful with (n / 2). */ - off = ringbuf_acquire(r, w, n / 2); + off = ringbuf_acquire(r, &w, n / 2); assert(off == 0); - ringbuf_produce(r, w); + ringbuf_produce(r, &w); /* Consume (n / 2) bytes. */ len = ringbuf_consume(r, &woff); assert(len == (n / 2) && woff == 0); ringbuf_release(r, len); - ringbuf_unregister(r, w); free(r); } @@ -63,26 +61,25 @@ static void test_multi(void) { ringbuf_t *r = malloc(ringbuf_obj_size); - ringbuf_worker_t *w; + ringbuf_worker_t *w = NULL; size_t len, woff; ssize_t off; ringbuf_setup(r, MAX_WORKERS, 3); - w = ringbuf_register(r, 0); /* * Produce 2 bytes. 
*/ - off = ringbuf_acquire(r, w, 1); + off = ringbuf_acquire(r, &w, 1); assert(off == 0); - ringbuf_produce(r, w); + ringbuf_produce(r, &w); - off = ringbuf_acquire(r, w, 1); + off = ringbuf_acquire(r, &w, 1); assert(off == 1); - ringbuf_produce(r, w); + ringbuf_produce(r, &w); - off = ringbuf_acquire(r, w, 1); + off = ringbuf_acquire(r, &w, 1); assert(off == -1); /* @@ -99,18 +96,18 @@ test_multi(void) * Produce another 2 with wrap-around. */ - off = ringbuf_acquire(r, w, 2); + off = ringbuf_acquire(r, &w, 2); assert(off == -1); - off = ringbuf_acquire(r, w, 1); + off = ringbuf_acquire(r, &w, 1); assert(off == 2); - ringbuf_produce(r, w); + ringbuf_produce(r, &w); - off = ringbuf_acquire(r, w, 1); + off = ringbuf_acquire(r, &w, 1); assert(off == 0); - ringbuf_produce(r, w); + ringbuf_produce(r, &w); - off = ringbuf_acquire(r, w, 1); + off = ringbuf_acquire(r, &w, 1); assert(off == -1); /* @@ -125,7 +122,6 @@ test_multi(void) assert(len == 1 && woff == 0); ringbuf_release(r, len); - ringbuf_unregister(r, w); free(r); } @@ -133,18 +129,16 @@ static void test_overlap(void) { ringbuf_t *r = malloc(ringbuf_obj_size); - ringbuf_worker_t *w1, *w2; + ringbuf_worker_t *w1 = NULL, *w2 = NULL; size_t len, woff; ssize_t off; ringbuf_setup(r, MAX_WORKERS, 10); - w1 = ringbuf_register(r, 0); - w2 = ringbuf_register(r, 1); /* * Producer 1: acquire 5 bytes. Consumer should fail. */ - off = ringbuf_acquire(r, w1, 5); + off = ringbuf_acquire(r, &w1, 5); assert(off == 0); len = ringbuf_consume(r, &woff); @@ -153,7 +147,7 @@ test_overlap(void) /* * Producer 2: acquire 3 bytes. Consumer should still fail. */ - off = ringbuf_acquire(r, w2, 3); + off = ringbuf_acquire(r, &w2, 3); assert(off == 5); len = ringbuf_consume(r, &woff); @@ -162,7 +156,7 @@ test_overlap(void) /* * Producer 1: commit. Consumer can get the first range. 
*/ - ringbuf_produce(r, w1); + ringbuf_produce(r, &w1); len = ringbuf_consume(r, &woff); assert(len == 5 && woff == 0); ringbuf_release(r, len); @@ -174,13 +168,13 @@ test_overlap(void) * Producer 1: acquire-produce 4 bytes, triggering wrap-around. * Consumer should still fail. */ - off = ringbuf_acquire(r, w1, 4); + off = ringbuf_acquire(r, &w1, 4); assert(off == 0); len = ringbuf_consume(r, &woff); assert(len == 0); - ringbuf_produce(r, w1); + ringbuf_produce(r, &w1); len = ringbuf_consume(r, &woff); assert(len == 0); @@ -188,7 +182,7 @@ test_overlap(void) * Finally, producer 2 commits its 3 bytes. * Consumer can proceed for both ranges. */ - ringbuf_produce(r, w2); + ringbuf_produce(r, &w2); len = ringbuf_consume(r, &woff); assert(len == 3 && woff == 5); ringbuf_release(r, len); @@ -197,8 +191,6 @@ test_overlap(void) assert(len == 4 && woff == 0); ringbuf_release(r, len); - ringbuf_unregister(r, w1); - ringbuf_unregister(r, w2); free(r); } @@ -206,14 +198,12 @@ static void test_random(void) { ringbuf_t *r = malloc(ringbuf_obj_size); - ringbuf_worker_t *w1, *w2; + ringbuf_worker_t *w1 = NULL, *w2 = NULL; ssize_t off1 = -1, off2 = -1; unsigned n = 1000 * 1000 * 50; unsigned char buf[500]; ringbuf_setup(r, MAX_WORKERS, sizeof(buf)); - w1 = ringbuf_register(r, 0); - w2 = ringbuf_register(r, 1); while (n--) { size_t len, woff; @@ -237,32 +227,30 @@ test_random(void) break; case 1: // producer 1 if (off1 == -1) { - if ((off1 = ringbuf_acquire(r, w1, len)) >= 0) { + if ((off1 = ringbuf_acquire(r, &w1, len)) >= 0) { assert((size_t)off1 < sizeof(buf)); buf[off1] = len - 1; } } else { buf[off1]++; - ringbuf_produce(r, w1); + ringbuf_produce(r, &w1); off1 = -1; } break; case 2: // producer 2 if (off2 == -1) { - if ((off2 = ringbuf_acquire(r, w2, len)) >= 0) { + if ((off2 = ringbuf_acquire(r, &w2, len)) >= 0) { assert((size_t)off2 < sizeof(buf)); buf[off2] = len - 1; } } else { buf[off2]++; - ringbuf_produce(r, w2); + ringbuf_produce(r, &w2); off2 = -1; } break; } } - 
ringbuf_unregister(r, w1); - ringbuf_unregister(r, w2); free(r); } diff --git a/src/t_stress.c b/src/t_stress.c index e32fe94..7bac066 100644 --- a/src/t_stress.c +++ b/src/t_stress.c @@ -101,10 +101,10 @@ static void * ringbuf_stress(void *arg) { const unsigned id = (uintptr_t)arg; - ringbuf_worker_t *w; - - w = ringbuf_register(ringbuf, id); - assert(w != NULL); + ringbuf_worker_t *w = NULL; + //ringbuf_worker_t *w = ringbuf_register(ringbuf); + //assert (w != NULL); + uint64_t total_recv = 0; /* * There are NCPU threads concurrently generating and producing @@ -123,6 +123,7 @@ ringbuf_stress(void *arg) if (id == 0) { if ((len = ringbuf_consume(ringbuf, &off)) != 0) { + total_recv += len; size_t rem = len; assert(off < RBUF_SIZE); while (rem) { @@ -136,14 +137,16 @@ ringbuf_stress(void *arg) continue; } len = generate_message(buf, sizeof(buf) - 1); - if ((ret = ringbuf_acquire(ringbuf, w, len)) != -1) { + if ((ret = ringbuf_acquire(ringbuf, &w, len)) != -1) { off = (size_t)ret; assert(off < RBUF_SIZE); memcpy(&rbuf[off], buf, len); - ringbuf_produce(ringbuf, w); + ringbuf_produce(ringbuf, &w); } } pthread_barrier_wait(&barrier); + if (id == 0) + printf ("Total received: %" PRIu64 "\n", total_recv); pthread_exit(NULL); return NULL; } From 6ce2969a30bacd4826abe78f5714e2de56333532 Mon Sep 17 00:00:00 2001 From: ulatekh Date: Sat, 10 Feb 2018 15:48:42 -0700 Subject: [PATCH 2/5] Now the worker-queue feature works when mixing permanent and temporary worker-record registration. --- src/ringbuf.c | 84 ++++++++++++-------------------------------------- src/t_stress.c | 6 ++-- 2 files changed, 23 insertions(+), 67 deletions(-) diff --git a/src/ringbuf.c b/src/ringbuf.c index 9671eaa..3032a12 100644 --- a/src/ringbuf.c +++ b/src/ringbuf.c @@ -104,8 +104,8 @@ struct ringbuf { volatile ringbuf_off_t next; ringbuf_off_t end; - /* The range of worker-records in use. 
*/ - worker_off_t first_used_worker, first_free_worker; + /* The index of the first potentially free worker-record. */ + worker_off_t first_free_worker; /* The following are updated by the consumer. */ ringbuf_off_t written; @@ -143,52 +143,6 @@ ringbuf_get_sizes(unsigned nworkers, *ringbuf_worker_size = sizeof(ringbuf_worker_t); } -/* - * prune_unregistered_workers: consume any range of unregistered - * worker-records. - */ -static worker_off_t -prune_unregistered_workers(ringbuf_t *rbuf) -{ - worker_off_t volatile *p_used_worker; - worker_off_t volatile const *p_free_worker; - worker_off_t old_used_worker, new_used_worker; - bool should_continue = true; - - p_used_worker = &rbuf->first_used_worker; - p_free_worker = &rbuf->first_free_worker; - - while (should_continue) { - - do { - worker_off_t old_used_worker_index, free_worker_index; - ringbuf_worker_t *w; - - /* If there is no more range of used worker-records, stop. */ - old_used_worker = *p_used_worker; - old_used_worker_index = (old_used_worker & RBUF_OFF_MASK); - free_worker_index = ((*p_free_worker) & RBUF_OFF_MASK); - if (old_used_worker_index == free_worker_index) { - should_continue = false; - break; - } - - /* If this worker-record is still in use, stop. */ - w = &rbuf->workers[old_used_worker_index]; - if (w->registered != not_registered) { - should_continue = false; - break; - } - - /* Advance past this unused worker-record. */ - new_used_worker = (old_used_worker_index + 1) % rbuf->nworkers; - new_used_worker |= WRAP_INCR(old_used_worker); - } while (!atomic_compare_exchange_weak(p_used_worker, old_used_worker, new_used_worker)); - } - - return old_used_worker; -} - /* * register_worker: allocate a worker-record for a thread/process, * and pass back the pointer to its local store. 
@@ -197,47 +151,37 @@ prune_unregistered_workers(ringbuf_t *rbuf) static ringbuf_worker_t * register_worker(ringbuf_t *rbuf, unsigned registration_type) { - worker_off_t volatile *p_used_worker, *p_free_worker; + worker_off_t volatile *p_free_worker; int acquired; ringbuf_worker_t *w = NULL; /* Try to find a worker-record that can be registered. */ - p_used_worker = &rbuf->first_used_worker; p_free_worker = &rbuf->first_free_worker; acquired = false; while (!acquired) { - worker_off_t old_used_worker, prev_free_worker, - old_free_worker, new_free_worker; + worker_off_t prev_free_worker, old_free_worker, new_free_worker, + i; /* * Get the current range of used worker-records, after trying * to prune recently-unregistered workers. */ - old_used_worker = prune_unregistered_workers(rbuf); prev_free_worker = *p_free_worker; - /* - * If there are no potentially free worker-records, - * let our caller know. - */ - if ((((prev_free_worker & RBUF_OFF_MASK) + 1) % rbuf->nworkers) - == (old_used_worker & RBUF_OFF_MASK)) - return NULL; - /* Exclusively acquire a worker-record index. */ - while (!acquired) { + for (i = 0; !acquired && i < rbuf->nworkers; ++i) { /* Prepare to acquire a worker-record index. */ old_free_worker = (*p_free_worker); if (old_free_worker != prev_free_worker) break; - new_free_worker = ((old_free_worker & RBUF_OFF_MASK) + 1) - % rbuf->nworkers; + new_free_worker = ((old_free_worker & RBUF_OFF_MASK) + + i) % rbuf->nworkers; new_free_worker |= WRAP_INCR(old_free_worker); /* Remember that this worker-record is being registered. */ w = &rbuf->workers[new_free_worker & RBUF_OFF_MASK]; if (!atomic_compare_exchange_weak(&w->registered, not_registered, being_registered)) - break; + continue; acquired = true; /* Swap the indexes to exclusively acquire the worker-record. */ @@ -343,6 +287,11 @@ ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t **pw, size_t len) if (__predict_false(next < written && target >= written)) { /* The producer must wait. 
*/ w->seen_off = RBUF_OFF_MAX; + if (w->registered == temp_registered) { + *pw = NULL; + atomic_thread_fence(memory_order_release); + w->registered = not_registered; + } return -1; } @@ -362,6 +311,11 @@ ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t **pw, size_t len) target = exceed ? (WRAP_LOCK_BIT | len) : 0; if ((target & RBUF_OFF_MASK) >= written) { w->seen_off = RBUF_OFF_MAX; + if (w->registered == temp_registered) { + *pw = NULL; + atomic_thread_fence(memory_order_release); + w->registered = not_registered; + } return -1; } /* Increment the wrap-around counter. */ diff --git a/src/t_stress.c b/src/t_stress.c index 7bac066..924516c 100644 --- a/src/t_stress.c +++ b/src/t_stress.c @@ -102,8 +102,10 @@ ringbuf_stress(void *arg) { const unsigned id = (uintptr_t)arg; ringbuf_worker_t *w = NULL; - //ringbuf_worker_t *w = ringbuf_register(ringbuf); - //assert (w != NULL); + if (id == 1) { + w = ringbuf_register(ringbuf); + assert (w != NULL); + } uint64_t total_recv = 0; /* From c3f19e6127f0acc383448cc0a07dc67d4d9dbaa0 Mon Sep 17 00:00:00 2001 From: ulatekh Date: Sun, 11 Feb 2018 10:10:53 -0700 Subject: [PATCH 3/5] If register_worker() acquires the worker-record, advancing the index becomes optional, i.e. it's OK if some other thread advances the index. This should be slightly faster, i.e. fewer reasons to retry. Also modified the stress-test to print the number of bytes transmitted, and the number of bytes each thread tried to transmit. 
--- src/ringbuf.c | 16 +++++----------- src/t_stress.c | 13 +++++++++---- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/src/ringbuf.c b/src/ringbuf.c index 3032a12..155b716 100644 --- a/src/ringbuf.c +++ b/src/ringbuf.c @@ -183,18 +183,12 @@ register_worker(ringbuf_t *rbuf, unsigned registration_type) if (!atomic_compare_exchange_weak(&w->registered, not_registered, being_registered)) continue; acquired = true; + w->seen_off = RBUF_OFF_MAX; + atomic_thread_fence(memory_order_release); + w->registered = registration_type; - /* Swap the indexes to exclusively acquire the worker-record. */ - if (atomic_compare_exchange_weak(p_free_worker, prev_free_worker, new_free_worker)) { - w->seen_off = RBUF_OFF_MAX; - atomic_thread_fence(memory_order_release); - w->registered = registration_type; - break; - } - - /* The worker-record was not successfully acquired. */ - w->registered = not_registered; - acquired = false; + /* Advance the index if no one else has. */ + atomic_compare_exchange_weak(p_free_worker, prev_free_worker, new_free_worker); } } diff --git a/src/t_stress.c b/src/t_stress.c index 924516c..14880dc 100644 --- a/src/t_stress.c +++ b/src/t_stress.c @@ -106,7 +106,7 @@ ringbuf_stress(void *arg) w = ringbuf_register(ringbuf); assert (w != NULL); } - uint64_t total_recv = 0; + uint64_t total_xmit = 0, total_not_xmit = 0; /* * There are NCPU threads concurrently generating and producing @@ -125,7 +125,7 @@ ringbuf_stress(void *arg) if (id == 0) { if ((len = ringbuf_consume(ringbuf, &off)) != 0) { - total_recv += len; + total_xmit += len; size_t rem = len; assert(off < RBUF_SIZE); while (rem) { @@ -140,15 +140,20 @@ ringbuf_stress(void *arg) } len = generate_message(buf, sizeof(buf) - 1); if ((ret = ringbuf_acquire(ringbuf, &w, len)) != -1) { + total_xmit += len; off = (size_t)ret; assert(off < RBUF_SIZE); memcpy(&rbuf[off], buf, len); ringbuf_produce(ringbuf, &w); - } + } else + total_not_xmit += len; } pthread_barrier_wait(&barrier); if (id == 0) - 
printf ("Total received: %" PRIu64 "\n", total_recv); + printf ("Thread 0: received %" PRIu64 "\n", total_xmit); + else + printf ("Thread %d: sent %" PRIu64 ", unsent %" PRIu64 "\n", + id, total_xmit, total_not_xmit); pthread_exit(NULL); return NULL; } From 46917a16b258288821b7c0d549dc558d36ef1586 Mon Sep 17 00:00:00 2001 From: ulatekh Date: Mon, 12 Feb 2018 18:26:23 -0700 Subject: [PATCH 4/5] One more slight refinement. --- src/ringbuf.c | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/ringbuf.c b/src/ringbuf.c index 155b716..a44cdb4 100644 --- a/src/ringbuf.c +++ b/src/ringbuf.c @@ -162,23 +162,17 @@ register_worker(ringbuf_t *rbuf, unsigned registration_type) worker_off_t prev_free_worker, old_free_worker, new_free_worker, i; - /* - * Get the current range of used worker-records, after trying - * to prune recently-unregistered workers. - */ + /* Get the index of the first worker-record to try registering. */ prev_free_worker = *p_free_worker; - /* Exclusively acquire a worker-record index. */ for (i = 0; !acquired && i < rbuf->nworkers; ++i) { /* Prepare to acquire a worker-record index. */ old_free_worker = (*p_free_worker); - if (old_free_worker != prev_free_worker) - break; new_free_worker = ((old_free_worker & RBUF_OFF_MASK) + i) % rbuf->nworkers; new_free_worker |= WRAP_INCR(old_free_worker); - /* Remember that this worker-record is being registered. */ + /* Try to acquire a worker-record. */ w = &rbuf->workers[new_free_worker & RBUF_OFF_MASK]; if (!atomic_compare_exchange_weak(&w->registered, not_registered, being_registered)) continue; @@ -190,6 +184,13 @@ register_worker(ringbuf_t *rbuf, unsigned registration_type) /* Advance the index if no one else has. */ atomic_compare_exchange_weak(p_free_worker, prev_free_worker, new_free_worker); } + + /* + * If no worker-record could be registered, and no one else was + * trying to register at the same time, then stop searching. 
+ */ + if (!acquired && old_free_worker == prev_free_worker) + break; } /* Register this worker-record. */ From ee7eef04015108b8d587e44517708c2c9978ab30 Mon Sep 17 00:00:00 2001 From: ulatekh Date: Sun, 18 Feb 2018 18:40:53 -0700 Subject: [PATCH 5/5] Re-enabled registering worker-records by index. This makes externally-registered workers more resilient to crashed clients. Now ringbuf_get_sizes() and ringbuf_setup() take a separate count of "temporary" worker-records. These are allocated during a single acquire/produce, if no registered worker-record is provided. --- src/ringbuf.c | 44 +++++++++++++++++++++++++++----------------- src/ringbuf.h | 6 +++--- src/t_ringbuf.c | 13 +++++++------ src/t_stress.c | 6 +++--- 4 files changed, 40 insertions(+), 29 deletions(-) diff --git a/src/ringbuf.c b/src/ringbuf.c index a44cdb4..42f7bc9 100644 --- a/src/ringbuf.c +++ b/src/ringbuf.c @@ -109,7 +109,7 @@ struct ringbuf { /* The following are updated by the consumer. */ ringbuf_off_t written; - unsigned nworkers; + unsigned nworkers, ntempworkers; ringbuf_worker_t workers[]; }; @@ -117,16 +117,17 @@ struct ringbuf { * ringbuf_setup: initialise a new ring buffer of a given length. */ int -ringbuf_setup(ringbuf_t *rbuf, unsigned nworkers, size_t length) +ringbuf_setup(ringbuf_t *rbuf, unsigned nworkers, unsigned ntempworkers, size_t length) { if (length >= RBUF_OFF_MASK) { errno = EINVAL; return -1; } - memset(rbuf, 0, offsetof(ringbuf_t, workers[nworkers])); + memset(rbuf, 0, offsetof(ringbuf_t, workers[nworkers + ntempworkers])); rbuf->space = length; rbuf->end = RBUF_OFF_MAX; rbuf->nworkers = nworkers; + rbuf->ntempworkers = ntempworkers; return 0; } @@ -134,11 +135,11 @@ ringbuf_setup(ringbuf_t *rbuf, unsigned nworkers, size_t length) * ringbuf_get_sizes: return the sizes of the ringbuf_t and ringbuf_worker_t. 
*/ void -ringbuf_get_sizes(unsigned nworkers, +ringbuf_get_sizes(unsigned nworkers, unsigned ntempworkers, size_t *ringbuf_size, size_t *ringbuf_worker_size) { if (ringbuf_size) - *ringbuf_size = offsetof(ringbuf_t, workers[nworkers]); + *ringbuf_size = offsetof(ringbuf_t, workers[nworkers + ntempworkers]); if (ringbuf_worker_size) *ringbuf_worker_size = sizeof(ringbuf_worker_t); } @@ -159,21 +160,20 @@ register_worker(ringbuf_t *rbuf, unsigned registration_type) p_free_worker = &rbuf->first_free_worker; acquired = false; while (!acquired) { - worker_off_t prev_free_worker, old_free_worker, new_free_worker, - i; + worker_off_t prev_free_worker, i; /* Get the index of the first worker-record to try registering. */ prev_free_worker = *p_free_worker; - for (i = 0; !acquired && i < rbuf->nworkers; ++i) { + for (i = 0; !acquired && i < rbuf->ntempworkers; ++i) { + worker_off_t new_free_worker; + /* Prepare to acquire a worker-record index. */ - old_free_worker = (*p_free_worker); - new_free_worker = ((old_free_worker & RBUF_OFF_MASK) - + i) % rbuf->nworkers; - new_free_worker |= WRAP_INCR(old_free_worker); + new_free_worker = ((prev_free_worker & RBUF_OFF_MASK) + + i) % rbuf->ntempworkers; /* Try to acquire a worker-record. */ - w = &rbuf->workers[new_free_worker & RBUF_OFF_MASK]; + w = &rbuf->workers[new_free_worker + rbuf->nworkers]; if (!atomic_compare_exchange_weak(&w->registered, not_registered, being_registered)) continue; acquired = true; @@ -182,6 +182,7 @@ register_worker(ringbuf_t *rbuf, unsigned registration_type) w->registered = registration_type; /* Advance the index if no one else has. */ + new_free_worker |= WRAP_INCR(prev_free_worker); atomic_compare_exchange_weak(p_free_worker, prev_free_worker, new_free_worker); } @@ -189,7 +190,7 @@ register_worker(ringbuf_t *rbuf, unsigned registration_type) * If no worker-record could be registered, and no one else was * trying to register at the same time, then stop searching. 
*/ - if (!acquired && old_free_worker == prev_free_worker) + if (!acquired && (*p_free_worker) == prev_free_worker) break; } @@ -202,9 +203,16 @@ register_worker(ringbuf_t *rbuf, unsigned registration_type) * and pass the pointer to its local store. */ ringbuf_worker_t * -ringbuf_register(ringbuf_t *rbuf) +ringbuf_register(ringbuf_t *rbuf, unsigned i) { - return register_worker(rbuf, perm_registered); + ASSERT (i < rbuf->nworkers); + + ringbuf_worker_t *w = &rbuf->workers[i]; + + w->seen_off = RBUF_OFF_MAX; + atomic_thread_fence(memory_order_release); + w->registered = perm_registered; + return w; } void @@ -381,6 +389,7 @@ ringbuf_consume(ringbuf_t *rbuf, size_t *offset) { ringbuf_off_t written = rbuf->written, next, ready; size_t towrite; + unsigned total_workers; retry: /* * Get the stable 'next' offset. Note: stable_nextoff() issued @@ -403,7 +412,8 @@ ringbuf_consume(ringbuf_t *rbuf, size_t *offset) */ ready = RBUF_OFF_MAX; - for (unsigned i = 0; i < rbuf->nworkers; i++) { + total_workers = rbuf->nworkers + rbuf->ntempworkers; + for (unsigned i = 0; i < total_workers; i++) { ringbuf_worker_t *w = &rbuf->workers[i]; unsigned count = SPINLOCK_BACKOFF_MIN; ringbuf_off_t seen_off; diff --git a/src/ringbuf.h b/src/ringbuf.h index c728c15..562e0e8 100644 --- a/src/ringbuf.h +++ b/src/ringbuf.h @@ -13,10 +13,10 @@ __BEGIN_DECLS typedef struct ringbuf ringbuf_t; typedef struct ringbuf_worker ringbuf_worker_t; -int ringbuf_setup(ringbuf_t *, unsigned, size_t); -void ringbuf_get_sizes(unsigned, size_t *, size_t *); +int ringbuf_setup(ringbuf_t *, unsigned, unsigned, size_t); +void ringbuf_get_sizes(unsigned, unsigned, size_t *, size_t *); -ringbuf_worker_t *ringbuf_register(ringbuf_t *); +ringbuf_worker_t *ringbuf_register(ringbuf_t *, unsigned); void ringbuf_unregister(ringbuf_t *, ringbuf_worker_t *); ssize_t ringbuf_acquire(ringbuf_t *, ringbuf_worker_t **, size_t); diff --git a/src/t_ringbuf.c b/src/t_ringbuf.c index 04d4e1f..8a0402d 100644 --- a/src/t_ringbuf.c +++ 
b/src/t_ringbuf.c @@ -11,7 +11,8 @@ #include "ringbuf.h" -#define MAX_WORKERS 3 +#define MAX_WORKERS 1 +#define MAX_TEMP_WORKERS 3 static size_t ringbuf_obj_size; @@ -25,7 +26,7 @@ test_wraparound(void) ssize_t off; /* Size n, but only (n - 1) can be produced at a time. */ - ringbuf_setup(r, MAX_WORKERS, n); + ringbuf_setup(r, MAX_WORKERS, MAX_TEMP_WORKERS, n); /* Produce (n / 2 + 1) and then attempt another (n / 2 - 1). */ off = ringbuf_acquire(r, &w, n / 2 + 1); @@ -65,7 +66,7 @@ test_multi(void) size_t len, woff; ssize_t off; - ringbuf_setup(r, MAX_WORKERS, 3); + ringbuf_setup(r, MAX_WORKERS, MAX_TEMP_WORKERS, 3); /* * Produce 2 bytes. @@ -133,7 +134,7 @@ test_overlap(void) size_t len, woff; ssize_t off; - ringbuf_setup(r, MAX_WORKERS, 10); + ringbuf_setup(r, MAX_WORKERS, MAX_TEMP_WORKERS, 10); /* * Producer 1: acquire 5 bytes. Consumer should fail. @@ -203,7 +204,7 @@ test_random(void) unsigned n = 1000 * 1000 * 50; unsigned char buf[500]; - ringbuf_setup(r, MAX_WORKERS, sizeof(buf)); + ringbuf_setup(r, MAX_WORKERS, MAX_TEMP_WORKERS, sizeof(buf)); while (n--) { size_t len, woff; @@ -257,7 +258,7 @@ test_random(void) int main(void) { - ringbuf_get_sizes(MAX_WORKERS, &ringbuf_obj_size, NULL); + ringbuf_get_sizes(MAX_WORKERS, MAX_TEMP_WORKERS, &ringbuf_obj_size, NULL); test_wraparound(); test_multi(); test_overlap(); diff --git a/src/t_stress.c b/src/t_stress.c index 14880dc..d52ab93 100644 --- a/src/t_stress.c +++ b/src/t_stress.c @@ -103,7 +103,7 @@ ringbuf_stress(void *arg) const unsigned id = (uintptr_t)arg; ringbuf_worker_t *w = NULL; if (id == 1) { - w = ringbuf_register(ringbuf); + w = ringbuf_register(ringbuf, 0); assert (w != NULL); } uint64_t total_xmit = 0, total_not_xmit = 0; @@ -188,11 +188,11 @@ run_test(void *func(void *)) /* * Create a ring buffer. 
*/ - ringbuf_get_sizes(nworkers, &ringbuf_obj_size, NULL); + ringbuf_get_sizes(1, nworkers, &ringbuf_obj_size, NULL); ringbuf = malloc(ringbuf_obj_size); assert(ringbuf != NULL); - ringbuf_setup(ringbuf, nworkers, RBUF_SIZE); + ringbuf_setup(ringbuf, 1, nworkers, RBUF_SIZE); memset(rbuf, MAGIC_BYTE, sizeof(rbuf)); /*