-
Notifications
You must be signed in to change notification settings - Fork 81
/
Copy pathringbuffer.c
375 lines (339 loc) · 13.9 KB
/
ringbuffer.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
/**
* Ring buffer is a fixed-size queue, implemented as a table of
* pointers. Head and tail pointers are modified atomically, allowing
* concurrent access to it. It has the following features:
* - FIFO (First In First Out)
* - Maximum size is fixed; the pointers are stored in a table.
* - Lockless implementation.
*
* The ring buffer implementation is not preemptable.
*/
#include <errno.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/* typically 64 bytes on x86/x64 CPUs */
#define CACHE_LINE_SIZE 64
#ifndef __compiler_barrier
#define __compiler_barrier() \
do { \
asm volatile("" : : : "memory"); \
} while (0)
#endif
/* The producer and the consumer have a head and a tail index. The particularity
* of these index is that they are not between 0 and size(ring). These indexes
* are between 0 and 2^32, and we mask their value when we access the ring[]
* field. Thanks to this assumption, we can do subtractions between 2 index
* values in a modulo-32bit base: that is why the overflow of the indexes is not
* a problem.
*/
typedef struct {
struct { /** Ring producer status. */
uint32_t watermark; /**< Maximum items before EDQUOT. */
uint32_t size; /**< Size of ring buffer. */
uint32_t mask; /**< Mask (size - 1) of ring buffer. */
volatile uint32_t head, tail; /**< Producer head and tail. */
} prod __attribute__((__aligned__(CACHE_LINE_SIZE)));
struct { /** Ring consumer status. */
uint32_t size; /**< Size of the ring buffer. */
uint32_t mask; /**< Mask (size - 1) of ring buffer. */
volatile uint32_t head, tail; /**< Consumer head and tail. */
} cons __attribute__((__aligned__(CACHE_LINE_SIZE)));
void *ring[] __attribute__((__aligned__(CACHE_LINE_SIZE)));
} ringbuf_t;
/* true if x is a power of 2 */
#define IS_POWEROF2(x) ((((x) -1) & (x)) == 0)
#define RING_SIZE_MASK (unsigned) (0x0fffffff) /**< Ring size mask */
#define ALIGN_CEIL(val, align) \
(typeof(val))((val) + (-(typeof(val))(val) & ((align) -1)))
/* Calculate the memory size needed for a ring buffer.
*
* This function returns the number of bytes needed for a ring buffer, given
* the number of elements in it. This value is the sum of the size of the
* structure ringbuf and the size of the memory needed by the objects pointers.
* The value is aligned to a cache line size.
*
* @param count
* The number of elements in the ring buffer (must be a power of 2).
* @return
* - The memory size occupied by the ring buffer on success.
* - -EINVAL if count is not a power of 2.
*/
ssize_t ringbuf_get_memsize(const unsigned count)
{
/* Requested size is invalid, must be power of 2, and do not exceed the
* size limit RING_SIZE_MASK.
*/
if ((!IS_POWEROF2(count)) || (count > RING_SIZE_MASK))
return -EINVAL;
ssize_t sz = sizeof(ringbuf_t) + count * sizeof(void *);
sz = ALIGN_CEIL(sz, CACHE_LINE_SIZE);
return sz;
}
/* Initialize a ring buffer.
*
* The ring size is set to *count*, which must be a power of two. Water
* marking is disabled by default. The real usable ring size is (count - 1)
* instead of (count) to differentiate a free ring from an empty ring buffer.
*
* @param r
* The pointer to the ring buffer structure followed by the objects table.
* @param count
* The number of elements in the ring buffer (must be a power of 2).
* @return
* 0 on success, or a negative value on error.
*/
int ringbuf_init(ringbuf_t *r, const unsigned count)
{
memset(r, 0, sizeof(*r));
r->prod.watermark = count, r->prod.size = r->cons.size = count;
r->prod.mask = r->cons.mask = count - 1;
r->prod.head = r->cons.head = 0, r->prod.tail = r->cons.tail = 0;
return 0;
}
/* Create a ring buffer.
*
* The real usable ring size is (count - 1) instead of (count) to
* differentiate a free ring from an empty ring buffer.
*
* @param count
* The size of the ring (must be a power of 2).
* @return
* On success, the pointer to the new allocated ring buffer. NULL on error
* with errno set appropriately. Possible errno values include:
* - EINVAL - count provided is not a power of 2
* - ENOSPC - the maximum number of memzones has already been allocated
* - EEXIST - a memzone with the same name already exists
* - ENOMEM - no appropriate memory area found in which to create memzone
*/
ringbuf_t *ringbuf_create(const unsigned count)
{
ssize_t ring_size = ringbuf_get_memsize(count);
if (ring_size < 0)
return NULL;
ringbuf_t *r = malloc(ring_size);
if (r)
ringbuf_init(r, count);
return r;
}
/* Release all memory used by the ring buffer.
*
* @param r
* Ring to free
*/
void ringbuf_free(ringbuf_t *r)
{
free(r);
}
/* The actual enqueue of pointers on the ring buffer.
* Placed here since identical code needed in both single- and multi- producer
* enqueue functions.
*/
#define ENQUEUE_PTRS() \
do { \
const uint32_t size = r->prod.size; \
uint32_t i, idx = prod_head & mask; \
if (idx + n < size) { \
for (i = 0; i < (n & ((~(unsigned) 0x3))); i += 4, idx += 4) { \
r->ring[idx] = obj_table[i]; \
r->ring[idx + 1] = obj_table[i + 1]; \
r->ring[idx + 2] = obj_table[i + 2]; \
r->ring[idx + 3] = obj_table[i + 3]; \
} \
switch (n & 0x3) { \
case 3: \
r->ring[idx++] = obj_table[i++]; \
case 2: \
r->ring[idx++] = obj_table[i++]; \
case 1: \
r->ring[idx++] = obj_table[i++]; \
} \
} else { \
for (i = 0; idx < size; i++, idx++) \
r->ring[idx] = obj_table[i]; \
for (idx = 0; i < n; i++, idx++) \
r->ring[idx] = obj_table[i]; \
} \
} while (0)
/* The actual copy of pointers on the ring to obj_table.
* Placed here since identical code needed in both single- and multi- consumer
* dequeue functions.
*/
#define DEQUEUE_PTRS() \
do { \
uint32_t idx = cons_head & mask; \
uint32_t i, size = r->cons.size; \
if (idx + n < size) { \
for (i = 0; i < (n & (~(unsigned) 0x3)); i += 4, idx += 4) { \
obj_table[i] = r->ring[idx]; \
obj_table[i + 1] = r->ring[idx + 1]; \
obj_table[i + 2] = r->ring[idx + 2]; \
obj_table[i + 3] = r->ring[idx + 3]; \
} \
switch (n & 0x3) { \
case 3: \
obj_table[i++] = r->ring[idx++]; \
case 2: \
obj_table[i++] = r->ring[idx++]; \
case 1: \
obj_table[i++] = r->ring[idx++]; \
} \
} else { \
for (i = 0; idx < size; i++, idx++) \
obj_table[i] = r->ring[idx]; \
for (idx = 0; i < n; i++, idx++) \
obj_table[i] = r->ring[idx]; \
} \
} while (0)
/* Enqueue several objects on a ring buffer (NOT multi-producers safe).
*
* @param r
* A pointer to the ring buffer structure.
* @param obj_table
* A pointer to a table of void * pointers (objects).
* @param n
* The number of objects to add in the ring buffer from the obj_table.
* @return
* - 0: Success; objects enqueue.
* - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
* high water mark is exceeded.
* - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
*/
static inline int ringbuffer_sp_do_enqueue(ringbuf_t *r,
void *const *obj_table,
const unsigned n)
{
uint32_t mask = r->prod.mask;
uint32_t prod_head = r->prod.head, cons_tail = r->cons.tail;
/* The subtraction is done between two unsigned 32-bits value (the result
* is always modulo 32 bits even if we have prod_head > cons_tail). So
* @free_entries is always between 0 and size(ring) - 1.
*/
uint32_t free_entries = mask + cons_tail - prod_head;
/* check that we have enough room in ring buffer */
if ((n > free_entries))
return -ENOBUFS;
uint32_t prod_next = prod_head + n;
r->prod.head = prod_next;
/* write entries in ring buffer */
ENQUEUE_PTRS();
__compiler_barrier();
r->prod.tail = prod_next;
/* if we exceed the watermark */
return ((mask + 1) - free_entries + n) > r->prod.watermark ? -EDQUOT : 0;
}
/* Dequeue several objects from a ring buffer (NOT multi-consumers safe).
* When the request objects are more than the available objects, only dequeue
* the actual number of objects
*
* @param r
* A pointer to the ring buffer structure.
* @param obj_table
* A pointer to a table of void * pointers (objects) that will be filled.
* @param n
* The number of objects to dequeue from the ring buffer to the obj_table.
* @return
* - 0: Success; objects dequeued.
* - -ENOENT: Not enough entries in the ring buffer to dequeue; no object is
* dequeued.
*/
static inline int ringbuffer_sc_do_dequeue(ringbuf_t *r,
void **obj_table,
const unsigned n)
{
uint32_t mask = r->prod.mask;
uint32_t cons_head = r->cons.head, prod_tail = r->prod.tail;
/* The subtraction is done between two unsigned 32-bits value (the result
* is always modulo 32 bits even if we have cons_head > prod_tail). So
* @entries is always between 0 and size(ring) - 1.
*/
uint32_t entries = prod_tail - cons_head;
if (n > entries)
return -ENOENT;
uint32_t cons_next = cons_head + n;
r->cons.head = cons_next;
/* copy in table */
DEQUEUE_PTRS();
__compiler_barrier();
r->cons.tail = cons_next;
return 0;
}
/* Enqueue one object on a ring buffer (NOT multi-producers safe).
*
* @param r
* A pointer to the ring buffer structure.
* @param obj
* A pointer to the object to be added.
* @return
* - 0: Success; objects enqueued.
* - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
* high water mark is exceeded.
* - -ENOBUFS: Not enough room in the ring buffer to enqueue; no object
* is enqueued.
*/
static inline int ringbuf_sp_enqueue(ringbuf_t *r, void *obj)
{
return ringbuffer_sp_do_enqueue(r, &obj, 1);
}
/**
* Dequeue one object from a ring buffer (NOT multi-consumers safe).
*
* @param r
* A pointer to the ring structure.
* @param obj_p
* A pointer to a void * pointer (object) that will be filled.
* @return
* - 0: Success; objects dequeued.
* - -ENOENT: Not enough entries in the ring buffer to dequeue, no object
* is dequeued.
*/
static inline int ringbuf_sc_dequeue(ringbuf_t *r, void **obj_p)
{
return ringbuffer_sc_do_dequeue(r, obj_p, 1);
}
/* Test if a ring buffer is full.
*
* @param r
* A pointer to the ring structure.
* @return
* - true: The ring is full.
* - false: The ring is not full.
*/
static inline bool ringbuf_is_full(const ringbuf_t *r)
{
uint32_t prod_tail = r->prod.tail, cons_tail = r->cons.tail;
return ((cons_tail - prod_tail - 1) & r->prod.mask) == 0;
}
/* Test if a ring buffer is empty.
*
* @param r
* A pointer to the ring structure.
* @return
* - true: The ring is empty.
* - false: The ring is not empty.
*/
static inline bool ringbuf_is_empty(const ringbuf_t *r)
{
uint32_t prod_tail = r->prod.tail, cons_tail = r->cons.tail;
return cons_tail == prod_tail;
}
#include <assert.h>
int main(void)
{
ringbuf_t *r = ringbuf_create((1 << 6));
if (!r) {
printf("Fail to create ring buffer.\n");
return -1;
}
for (int i = 0; !ringbuf_is_full(r); i++)
ringbuf_sp_enqueue(r, *(void **) &i);
for (int i = 0; !ringbuf_is_empty(r); i++) {
void *obj;
ringbuf_sc_dequeue(r, &obj);
assert(i == *(int *) &obj);
}
ringbuf_free(r);
return 0;
}