diff --git a/src/Makefile b/src/Makefile index 7f85353..b4a0c10 100644 --- a/src/Makefile +++ b/src/Makefile @@ -2,7 +2,7 @@ # This file is in the Public Domain. # -CFLAGS= -std=c11 -O2 -g -W -Wextra -Werror +CFLAGS= -std=c11 -g -W -Wextra -Werror CFLAGS+= -D_POSIX_C_SOURCE=200809L CFLAGS+= -D_GNU_SOURCE -D_DEFAULT_SOURCE @@ -23,7 +23,7 @@ endif ifeq ($(DEBUG),1) CFLAGS+= -O0 -DDEBUG -fno-omit-frame-pointer else -CFLAGS+= -DNDEBUG +CFLAGS+= -O2 -DNDEBUG endif LIB= libringbuf diff --git a/src/ringbuf.c b/src/ringbuf.c index e0f2c9e..42f7bc9 100644 --- a/src/ringbuf.c +++ b/src/ringbuf.c @@ -76,10 +76,20 @@ #define WRAP_INCR(x) (((x) + 0x100000000UL) & WRAP_COUNTER) typedef uint64_t ringbuf_off_t; +typedef uint64_t worker_off_t; +typedef uint64_t registered_t; + +enum +{ + not_registered, + being_registered, /* Being registered in register_worker() */ + perm_registered, /* Registered in ringbuf_register() */ + temp_registered /* Registered in ringbuf_acquire() */ +}; struct ringbuf_worker { volatile ringbuf_off_t seen_off; - int registered; + registered_t registered; }; struct ringbuf { @@ -94,9 +104,12 @@ struct ringbuf { volatile ringbuf_off_t next; ringbuf_off_t end; + /* The index of the first potentially free worker-record. */ + worker_off_t first_free_worker; + /* The following are updated by the consumer. */ ringbuf_off_t written; - unsigned nworkers; + unsigned nworkers, ntempworkers; ringbuf_worker_t workers[]; }; @@ -104,16 +117,17 @@ struct ringbuf { * ringbuf_setup: initialise a new ring buffer of a given length. */ int -ringbuf_setup(ringbuf_t *rbuf, unsigned nworkers, size_t length) +ringbuf_setup(ringbuf_t *rbuf, unsigned nworkers, unsigned ntempworkers, size_t length) { if (length >= RBUF_OFF_MASK) { errno = EINVAL; return -1; } - memset(rbuf, 0, offsetof(ringbuf_t, workers[nworkers])); + memset(rbuf, 0, offsetof(ringbuf_t, workers[nworkers + ntempworkers])); rbuf->space = length; rbuf->end = RBUF_OFF_MAX; rbuf->nworkers = nworkers; + rbuf->ntempworkers = ntempworkers; return 0; } @@ -121,15 +135,69 @@ ringbuf_setup(ringbuf_t *rbuf, unsigned nworkers, size_t length) * ringbuf_get_sizes: return the sizes of the ringbuf_t and ringbuf_worker_t. */ void -ringbuf_get_sizes(unsigned nworkers, +ringbuf_get_sizes(unsigned nworkers, unsigned ntempworkers, size_t *ringbuf_size, size_t *ringbuf_worker_size) { if (ringbuf_size) - *ringbuf_size = offsetof(ringbuf_t, workers[nworkers]); + *ringbuf_size = offsetof(ringbuf_t, workers[nworkers + ntempworkers]); if (ringbuf_worker_size) *ringbuf_worker_size = sizeof(ringbuf_worker_t); } +/* + * register_worker: allocate a worker-record for a thread/process, + * and pass back the pointer to its local store. + * Returns NULL if none are available. + */ +static ringbuf_worker_t * +register_worker(ringbuf_t *rbuf, unsigned registration_type) +{ + worker_off_t volatile *p_free_worker; + int acquired; + ringbuf_worker_t *w = NULL; + + /* Try to find a worker-record that can be registered. */ + p_free_worker = &rbuf->first_free_worker; + acquired = false; + while (!acquired) { + worker_off_t prev_free_worker, i; + + /* Get the index of the first worker-record to try registering. */ + prev_free_worker = *p_free_worker; + + for (i = 0; !acquired && i < rbuf->ntempworkers; ++i) { + worker_off_t new_free_worker; + + /* Prepare to acquire a worker-record index. */ + new_free_worker = ((prev_free_worker & RBUF_OFF_MASK) + + i) % rbuf->ntempworkers; + + /* Try to acquire a worker-record. */ + w = &rbuf->workers[new_free_worker + rbuf->nworkers]; + if (!atomic_compare_exchange_weak(&w->registered, not_registered, being_registered)) + continue; + acquired = true; + w->seen_off = RBUF_OFF_MAX; + atomic_thread_fence(memory_order_release); + w->registered = registration_type; + + /* Advance the index if no one else has. */ + new_free_worker |= WRAP_INCR(prev_free_worker); + atomic_compare_exchange_weak(p_free_worker, prev_free_worker, new_free_worker); + } + + /* + * If no worker-record could be registered, and no one else was + * trying to register at the same time, then stop searching. + */ + if (!acquired && (*p_free_worker) == prev_free_worker) + break; + } + + /* Register this worker-record. */ + return w; +} + /* * ringbuf_register: register the worker (thread/process) as a producer * and pass the pointer to its local store. @@ -137,18 +205,20 @@ ringbuf_get_sizes(unsigned nworkers, ringbuf_worker_t * ringbuf_register(ringbuf_t *rbuf, unsigned i) { + ASSERT (i < rbuf->nworkers); + ringbuf_worker_t *w = &rbuf->workers[i]; w->seen_off = RBUF_OFF_MAX; atomic_thread_fence(memory_order_release); - w->registered = true; + w->registered = perm_registered; return w; } void ringbuf_unregister(ringbuf_t *rbuf, ringbuf_worker_t *w) { - w->registered = false; + w->registered = not_registered; (void)rbuf; } @@ -176,11 +246,22 @@ stable_nextoff(ringbuf_t *rbuf) * => On failure: returns -1. */ ssize_t -ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len) +ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t **pw, size_t len) { ringbuf_off_t seen, next, target; + ringbuf_worker_t *w; ASSERT(len > 0 && len <= rbuf->space); + + /* If necessary, acquire a worker-record. */ + if (*pw == NULL) { + w = register_worker(rbuf, temp_registered); + if (w == NULL) + return -1; + *pw = w; + } else { + w = *pw; + } ASSERT(w->seen_off == RBUF_OFF_MAX); do { @@ -209,6 +290,11 @@ ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len) if (__predict_false(next < written && target >= written)) { /* The producer must wait. */ w->seen_off = RBUF_OFF_MAX; + if (w->registered == temp_registered) { + *pw = NULL; + atomic_thread_fence(memory_order_release); + w->registered = not_registered; + } return -1; } @@ -228,6 +314,11 @@ ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len) target = exceed ? (WRAP_LOCK_BIT | len) : 0; if ((target & RBUF_OFF_MASK) >= written) { w->seen_off = RBUF_OFF_MAX; + if (w->registered == temp_registered) { + *pw = NULL; + atomic_thread_fence(memory_order_release); + w->registered = not_registered; + } return -1; } /* Increment the wrap-around counter. */ @@ -272,13 +363,22 @@ ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len) * and is ready to be consumed. */ void -ringbuf_produce(ringbuf_t *rbuf, ringbuf_worker_t *w) +ringbuf_produce(ringbuf_t *rbuf, ringbuf_worker_t **pw) { + ringbuf_worker_t *w = *pw; + (void)rbuf; - ASSERT(w->registered); + ASSERT(w->registered != not_registered + && w->registered != being_registered); ASSERT(w->seen_off != RBUF_OFF_MAX); atomic_thread_fence(memory_order_release); w->seen_off = RBUF_OFF_MAX; + + /* Free any temporarily-allocated worker-record. */ + if (w->registered == temp_registered) { + w->registered = not_registered; + *pw = NULL; + } } /* @@ -289,6 +389,7 @@ ringbuf_consume(ringbuf_t *rbuf, size_t *offset) { ringbuf_off_t written = rbuf->written, next, ready; size_t towrite; + unsigned total_workers; retry: /* * Get the stable 'next' offset. Note: stable_nextoff() issued @@ -311,13 +412,15 @@ ringbuf_consume(ringbuf_t *rbuf, size_t *offset) */ ready = RBUF_OFF_MAX; - for (unsigned i = 0; i < rbuf->nworkers; i++) { + total_workers = rbuf->nworkers + rbuf->ntempworkers; + for (unsigned i = 0; i < total_workers; i++) { ringbuf_worker_t *w = &rbuf->workers[i]; unsigned count = SPINLOCK_BACKOFF_MIN; ringbuf_off_t seen_off; /* Skip if the worker has not registered. */ - if (!w->registered) { + if (w->registered == not_registered + || w->registered == being_registered) { continue; } diff --git a/src/ringbuf.h b/src/ringbuf.h index e8fc767..562e0e8 100644 --- a/src/ringbuf.h +++ b/src/ringbuf.h @@ -13,14 +13,14 @@ __BEGIN_DECLS typedef struct ringbuf ringbuf_t; typedef struct ringbuf_worker ringbuf_worker_t; -int ringbuf_setup(ringbuf_t *, unsigned, size_t); -void ringbuf_get_sizes(unsigned, size_t *, size_t *); +int ringbuf_setup(ringbuf_t *, unsigned, unsigned, size_t); +void ringbuf_get_sizes(unsigned, unsigned, size_t *, size_t *); ringbuf_worker_t *ringbuf_register(ringbuf_t *, unsigned); void ringbuf_unregister(ringbuf_t *, ringbuf_worker_t *); -ssize_t ringbuf_acquire(ringbuf_t *, ringbuf_worker_t *, size_t); -void ringbuf_produce(ringbuf_t *, ringbuf_worker_t *); +ssize_t ringbuf_acquire(ringbuf_t *, ringbuf_worker_t **, size_t); +void ringbuf_produce(ringbuf_t *, ringbuf_worker_t **); size_t ringbuf_consume(ringbuf_t *, size_t *); void ringbuf_release(ringbuf_t *, size_t); diff --git a/src/t_ringbuf.c b/src/t_ringbuf.c index 05aa4d5..8a0402d 100644 --- a/src/t_ringbuf.c +++ b/src/t_ringbuf.c @@ -11,7 +11,8 @@ #include "ringbuf.h" -#define MAX_WORKERS 2 +#define MAX_WORKERS 1 +#define MAX_TEMP_WORKERS 3 static size_t ringbuf_obj_size; @@ -20,20 +21,19 @@ test_wraparound(void) { const size_t n = 1000; ringbuf_t *r = malloc(ringbuf_obj_size); - ringbuf_worker_t *w; + ringbuf_worker_t *w = NULL; size_t len, woff; ssize_t off; /* Size n, but only (n - 1) can be produced at a time. */ - ringbuf_setup(r, MAX_WORKERS, n); - w = ringbuf_register(r, 0); + ringbuf_setup(r, MAX_WORKERS, MAX_TEMP_WORKERS, n); /* Produce (n / 2 + 1) and then attempt another (n / 2 - 1). */ - off = ringbuf_acquire(r, w, n / 2 + 1); + off = ringbuf_acquire(r, &w, n / 2 + 1); assert(off == 0); - ringbuf_produce(r, w); + ringbuf_produce(r, &w); - off = ringbuf_acquire(r, w, n / 2 - 1); + off = ringbuf_acquire(r, &w, n / 2 - 1); assert(off == -1); /* Consume (n / 2 + 1) bytes. */ @@ -42,20 +42,19 @@ test_wraparound(void) ringbuf_release(r, len); /* All consumed, attempt (n / 2 + 1) now. */ - off = ringbuf_acquire(r, w, n / 2 + 1); + off = ringbuf_acquire(r, &w, n / 2 + 1); assert(off == -1); /* However, wraparound can be successful with (n / 2). */ - off = ringbuf_acquire(r, w, n / 2); + off = ringbuf_acquire(r, &w, n / 2); assert(off == 0); - ringbuf_produce(r, w); + ringbuf_produce(r, &w); /* Consume (n / 2) bytes. */ len = ringbuf_consume(r, &woff); assert(len == (n / 2) && woff == 0); ringbuf_release(r, len); - ringbuf_unregister(r, w); free(r); } @@ -63,26 +62,25 @@ static void test_multi(void) { ringbuf_t *r = malloc(ringbuf_obj_size); - ringbuf_worker_t *w; + ringbuf_worker_t *w = NULL; size_t len, woff; ssize_t off; - ringbuf_setup(r, MAX_WORKERS, 3); - w = ringbuf_register(r, 0); + ringbuf_setup(r, MAX_WORKERS, MAX_TEMP_WORKERS, 3); /* * Produce 2 bytes. */ - off = ringbuf_acquire(r, w, 1); + off = ringbuf_acquire(r, &w, 1); assert(off == 0); - ringbuf_produce(r, w); + ringbuf_produce(r, &w); - off = ringbuf_acquire(r, w, 1); + off = ringbuf_acquire(r, &w, 1); assert(off == 1); - ringbuf_produce(r, w); + ringbuf_produce(r, &w); - off = ringbuf_acquire(r, w, 1); + off = ringbuf_acquire(r, &w, 1); assert(off == -1); /* @@ -99,18 +97,18 @@ test_multi(void) * Produce another 2 with wrap-around. */ - off = ringbuf_acquire(r, w, 2); + off = ringbuf_acquire(r, &w, 2); assert(off == -1); - off = ringbuf_acquire(r, w, 1); + off = ringbuf_acquire(r, &w, 1); assert(off == 2); - ringbuf_produce(r, w); + ringbuf_produce(r, &w); - off = ringbuf_acquire(r, w, 1); + off = ringbuf_acquire(r, &w, 1); assert(off == 0); - ringbuf_produce(r, w); + ringbuf_produce(r, &w); - off = ringbuf_acquire(r, w, 1); + off = ringbuf_acquire(r, &w, 1); assert(off == -1); /* @@ -125,7 +123,6 @@ test_multi(void) assert(len == 1 && woff == 0); ringbuf_release(r, len); - ringbuf_unregister(r, w); free(r); } @@ -133,18 +130,16 @@ static void test_overlap(void) { ringbuf_t *r = malloc(ringbuf_obj_size); - ringbuf_worker_t *w1, *w2; + ringbuf_worker_t *w1 = NULL, *w2 = NULL; size_t len, woff; ssize_t off; - ringbuf_setup(r, MAX_WORKERS, 10); - w1 = ringbuf_register(r, 0); - w2 = ringbuf_register(r, 1); + ringbuf_setup(r, MAX_WORKERS, MAX_TEMP_WORKERS, 10); /* * Producer 1: acquire 5 bytes. Consumer should fail. */ - off = ringbuf_acquire(r, w1, 5); + off = ringbuf_acquire(r, &w1, 5); assert(off == 0); len = ringbuf_consume(r, &woff); @@ -153,7 +148,7 @@ test_overlap(void) /* * Producer 2: acquire 3 bytes. Consumer should still fail. */ - off = ringbuf_acquire(r, w2, 3); + off = ringbuf_acquire(r, &w2, 3); assert(off == 5); len = ringbuf_consume(r, &woff); @@ -162,7 +157,7 @@ test_overlap(void) /* * Producer 1: commit. Consumer can get the first range. */ - ringbuf_produce(r, w1); + ringbuf_produce(r, &w1); len = ringbuf_consume(r, &woff); assert(len == 5 && woff == 0); ringbuf_release(r, len); @@ -174,13 +169,13 @@ test_overlap(void) * Producer 1: acquire-produce 4 bytes, triggering wrap-around. * Consumer should still fail. */ - off = ringbuf_acquire(r, w1, 4); + off = ringbuf_acquire(r, &w1, 4); assert(off == 0); len = ringbuf_consume(r, &woff); assert(len == 0); - ringbuf_produce(r, w1); + ringbuf_produce(r, &w1); len = ringbuf_consume(r, &woff); assert(len == 0); @@ -188,7 +183,7 @@ test_overlap(void) * Finally, producer 2 commits its 3 bytes. * Consumer can proceed for both ranges. */ - ringbuf_produce(r, w2); + ringbuf_produce(r, &w2); len = ringbuf_consume(r, &woff); assert(len == 3 && woff == 5); ringbuf_release(r, len); @@ -197,8 +192,6 @@ test_overlap(void) assert(len == 4 && woff == 0); ringbuf_release(r, len); - ringbuf_unregister(r, w1); - ringbuf_unregister(r, w2); free(r); } @@ -206,14 +199,12 @@ static void test_random(void) { ringbuf_t *r = malloc(ringbuf_obj_size); - ringbuf_worker_t *w1, *w2; + ringbuf_worker_t *w1 = NULL, *w2 = NULL; ssize_t off1 = -1, off2 = -1; unsigned n = 1000 * 1000 * 50; unsigned char buf[500]; - ringbuf_setup(r, MAX_WORKERS, sizeof(buf)); - w1 = ringbuf_register(r, 0); - w2 = ringbuf_register(r, 1); + ringbuf_setup(r, MAX_WORKERS, MAX_TEMP_WORKERS, sizeof(buf)); while (n--) { size_t len, woff; @@ -237,39 +228,37 @@ test_random(void) break; case 1: // producer 1 if (off1 == -1) { - if ((off1 = ringbuf_acquire(r, w1, len)) >= 0) { + if ((off1 = ringbuf_acquire(r, &w1, len)) >= 0) { assert((size_t)off1 < sizeof(buf)); buf[off1] = len - 1; } } else { buf[off1]++; - ringbuf_produce(r, w1); + ringbuf_produce(r, &w1); off1 = -1; } break; case 2: // producer 2 if (off2 == -1) { - if ((off2 = ringbuf_acquire(r, w2, len)) >= 0) { + if ((off2 = ringbuf_acquire(r, &w2, len)) >= 0) { assert((size_t)off2 < sizeof(buf)); buf[off2] = len - 1; } } else { buf[off2]++; - ringbuf_produce(r, w2); + ringbuf_produce(r, &w2); off2 = -1; } break; } } - ringbuf_unregister(r, w1); - ringbuf_unregister(r, w2); free(r); } int main(void) { - ringbuf_get_sizes(MAX_WORKERS, &ringbuf_obj_size, NULL); + ringbuf_get_sizes(MAX_WORKERS, MAX_TEMP_WORKERS, &ringbuf_obj_size, NULL); test_wraparound(); test_multi(); test_overlap(); diff --git a/src/t_stress.c b/src/t_stress.c index e32fe94..d52ab93 100644 --- a/src/t_stress.c +++ b/src/t_stress.c @@ -101,10 +101,12 @@ static void * ringbuf_stress(void *arg) { const unsigned id = (uintptr_t)arg; - ringbuf_worker_t *w; - - w = ringbuf_register(ringbuf, id); - assert(w != NULL); + ringbuf_worker_t *w = NULL; + if (id == 1) { + w = ringbuf_register(ringbuf, 0); + assert (w != NULL); + } + uint64_t total_xmit = 0, total_not_xmit = 0; /* * There are NCPU threads concurrently generating and producing @@ -123,6 +125,7 @@ ringbuf_stress(void *arg) if (id == 0) { if ((len = ringbuf_consume(ringbuf, &off)) != 0) { + total_xmit += len; size_t rem = len; assert(off < RBUF_SIZE); while (rem) { @@ -136,14 +139,21 @@ ringbuf_stress(void *arg) continue; } len = generate_message(buf, sizeof(buf) - 1); - if ((ret = ringbuf_acquire(ringbuf, w, len)) != -1) { + if ((ret = ringbuf_acquire(ringbuf, &w, len)) != -1) { + total_xmit += len; off = (size_t)ret; assert(off < RBUF_SIZE); memcpy(&rbuf[off], buf, len); - ringbuf_produce(ringbuf, w); - } + ringbuf_produce(ringbuf, &w); + } else + total_not_xmit += len; } pthread_barrier_wait(&barrier); + if (id == 0) + printf ("Thread 0: received %" PRIu64 "\n", total_xmit); + else + printf ("Thread %d: sent %" PRIu64 ", unsent %" PRIu64 "\n", + id, total_xmit, total_not_xmit); pthread_exit(NULL); return NULL; } @@ -178,11 +188,11 @@ run_test(void *func(void *)) /* * Create a ring buffer. */ - ringbuf_get_sizes(nworkers, &ringbuf_obj_size, NULL); + ringbuf_get_sizes(1, nworkers, &ringbuf_obj_size, NULL); ringbuf = malloc(ringbuf_obj_size); assert(ringbuf != NULL); - ringbuf_setup(ringbuf, nworkers, RBUF_SIZE); + ringbuf_setup(ringbuf, 1, nworkers, RBUF_SIZE); memset(rbuf, MAGIC_BYTE, sizeof(rbuf)); /*