Skip to content

Commit

Permalink
If register_worker() acquires the worker-record, advancing the index
Browse files Browse the repository at this point in the history
becomes optional, i.e. it's OK if some other thread advances the index.

This should be slightly faster, i.e. less reasons to retry.

Also modified the stress-test to print the number of bytes transmitted,
and the number of bytes each thread tried to transmit.
  • Loading branch information
ulatekh committed Feb 11, 2018
1 parent 6ce2969 commit c3f19e6
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 15 deletions.
16 changes: 5 additions & 11 deletions src/ringbuf.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,18 +183,12 @@ register_worker(ringbuf_t *rbuf, unsigned registration_type)
if (!atomic_compare_exchange_weak(&w->registered, not_registered, being_registered))
continue;
acquired = true;
w->seen_off = RBUF_OFF_MAX;
atomic_thread_fence(memory_order_release);
w->registered = registration_type;

/* Swap the indexes to exclusively acquire the worker-record. */
if (atomic_compare_exchange_weak(p_free_worker, prev_free_worker, new_free_worker)) {
w->seen_off = RBUF_OFF_MAX;
atomic_thread_fence(memory_order_release);
w->registered = registration_type;
break;
}

/* The worker-record was not successfully acquired. */
w->registered = not_registered;
acquired = false;
/* Advance the index if no one else has. */
atomic_compare_exchange_weak(p_free_worker, prev_free_worker, new_free_worker);
}
}

Expand Down
13 changes: 9 additions & 4 deletions src/t_stress.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ ringbuf_stress(void *arg)
w = ringbuf_register(ringbuf);
assert (w != NULL);
}
uint64_t total_recv = 0;
uint64_t total_xmit = 0, total_not_xmit = 0;

/*
* There are NCPU threads concurrently generating and producing
Expand All @@ -125,7 +125,7 @@ ringbuf_stress(void *arg)

if (id == 0) {
if ((len = ringbuf_consume(ringbuf, &off)) != 0) {
total_recv += len;
total_xmit += len;
size_t rem = len;
assert(off < RBUF_SIZE);
while (rem) {
Expand All @@ -140,15 +140,20 @@ ringbuf_stress(void *arg)
}
len = generate_message(buf, sizeof(buf) - 1);
if ((ret = ringbuf_acquire(ringbuf, &w, len)) != -1) {
total_xmit += len;
off = (size_t)ret;
assert(off < RBUF_SIZE);
memcpy(&rbuf[off], buf, len);
ringbuf_produce(ringbuf, &w);
}
} else
total_not_xmit += len;
}
pthread_barrier_wait(&barrier);
if (id == 0)
printf ("Total received: %" PRIu64 "\n", total_recv);
printf ("Thread 0: received %" PRIu64 "\n", total_xmit);
else
printf ("Thread %d: sent %" PRIu64 ", unsent %" PRIu64 "\n",
id, total_xmit, total_not_xmit);
pthread_exit(NULL);
return NULL;
}
Expand Down

0 comments on commit c3f19e6

Please sign in to comment.