From c3f19e6127f0acc383448cc0a07dc67d4d9dbaa0 Mon Sep 17 00:00:00 2001 From: ulatekh Date: Sun, 11 Feb 2018 10:10:53 -0700 Subject: [PATCH] If register_worker() acquires the worker-record, advancing the index becomes optional, i.e. it's OK if some other thread advances the index. This should be slightly faster, i.e. less reasons to retry. Also modified the stress-test to print the number of bytes transmitted, and the number of bytes each thread tried to transmit. --- src/ringbuf.c | 16 +++++----------- src/t_stress.c | 13 +++++++++---- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/src/ringbuf.c b/src/ringbuf.c index 3032a12..155b716 100644 --- a/src/ringbuf.c +++ b/src/ringbuf.c @@ -183,18 +183,12 @@ register_worker(ringbuf_t *rbuf, unsigned registration_type) if (!atomic_compare_exchange_weak(&w->registered, not_registered, being_registered)) continue; acquired = true; + w->seen_off = RBUF_OFF_MAX; + atomic_thread_fence(memory_order_release); + w->registered = registration_type; - /* Swap the indexes to exclusively acquire the worker-record. */ - if (atomic_compare_exchange_weak(p_free_worker, prev_free_worker, new_free_worker)) { - w->seen_off = RBUF_OFF_MAX; - atomic_thread_fence(memory_order_release); - w->registered = registration_type; - break; - } - - /* The worker-record was not successfully acquired. */ - w->registered = not_registered; - acquired = false; + /* Advance the index if no one else has. */ + atomic_compare_exchange_weak(p_free_worker, prev_free_worker, new_free_worker); } } diff --git a/src/t_stress.c b/src/t_stress.c index 924516c..14880dc 100644 --- a/src/t_stress.c +++ b/src/t_stress.c @@ -106,7 +106,7 @@ ringbuf_stress(void *arg) w = ringbuf_register(ringbuf); assert (w != NULL); } - uint64_t total_recv = 0; + uint64_t total_xmit = 0, total_not_xmit = 0; /* * There are NCPU threads concurrently generating and producing @@ -125,7 +125,7 @@ ringbuf_stress(void *arg) if (id == 0) { if ((len = ringbuf_consume(ringbuf, &off)) != 0) { - total_recv += len; + total_xmit += len; size_t rem = len; assert(off < RBUF_SIZE); while (rem) { @@ -140,15 +140,20 @@ ringbuf_stress(void *arg) } len = generate_message(buf, sizeof(buf) - 1); if ((ret = ringbuf_acquire(ringbuf, &w, len)) != -1) { + total_xmit += len; off = (size_t)ret; assert(off < RBUF_SIZE); memcpy(&rbuf[off], buf, len); ringbuf_produce(ringbuf, &w); - } + } else + total_not_xmit += len; } pthread_barrier_wait(&barrier); if (id == 0) - printf ("Total received: %" PRIu64 "\n", total_recv); + printf ("Thread 0: received %" PRIu64 "\n", total_xmit); + else + printf ("Thread %d: sent %" PRIu64 ", unsent %" PRIu64 "\n", + id, total_xmit, total_not_xmit); pthread_exit(NULL); return NULL; }