-
Notifications
You must be signed in to change notification settings - Fork 4
/
bcq.h
210 lines (191 loc) · 10.3 KB
/
bcq.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
/* Copyright (c) 2010 The Regents of the University of California
* Barret Rhoden <[email protected]>
* See LICENSE for details.
*
* Multi-producer, multi-consumer queues. Designed initially for an untrusted
* consumer.
*/
#ifndef BCQ_H
#define BCQ_H
#include <parlib/atomic.h>
/* Shared control block for a BCQ: three free-running uint32_t counters.
 * Slots are addressed elsewhere as (idx & (num_elems - 1)); the counters
 * themselves are never masked, so the unsigned wraparound arithmetic stays
 * correct as long as num_elems is a power of two (asserted in bcq_init). */
struct bcq_header {
	uint32_t prod_idx;		/* next to be produced in */
	uint32_t cons_pub_idx;	/* last completely consumed */
	uint32_t cons_pvt_idx;	/* last a consumer has dibs on */
};
/* Instantiates the concrete types for one BCQ variant: a per-element wrapper
 * (element plus its "ready for consumption" flag) and the queue itself (the
 * shared header followed by __num_elems wrapped slots).  __num_elems must be
 * a power of two; see the usage example in the big comment below. */
#define DEFINE_BCQ_TYPES(__name, __elem_t, __num_elems) \
\
/* Wrapper, per element, with the consumption bool */ \
struct __name##_bcq_wrap { \
__elem_t elem; \
bool rdy_for_cons; /* elem is ready for consumption */ \
}; \
\
/* The actual BC queue */ \
struct __name##_bcq { \
struct bcq_header hdr; \
struct __name##_bcq_wrap wraps[__num_elems]; \
};
/* Bounded Concurrent Queues, untrusted consumer
*
* This is a producer/consumer circular buffer, in which the producer never
* blocks and does not need to trust the data structure (which is writable by
* the consumer).
*
* A producer enqueues an item, based on the indexes of the producer and
* consumer. Enqueue cannot block, but can fail if the queue is full or if it
* fails to enqueue after a certain amount of tries.
*
* prod_idx: the next item to be produced
* cons_pvt_idx: the next item a consumer can claim
* cons_pub_idx: the last item (farthest left / oldest) that hasn't been
* consumed/made ready to be clobbered by the producer (it is
* what the consumer produces). Once all are clear, this will be
* the same as the prod_idx.
*
* The number of free slots in the buffer is: BufSize - (prod_idx - cons_pub)
* The power-of-two nature of the number of elements makes this work when it
* wraps around, just like with Xen. Check it yourself with a counter of 8 and
* bufsizes of 8 and 4.
*
*
* General plan:
*
* Producers compete among themselves, using the prod_idx, to get a free spot.
* Once they have a spot, they fill in the item, and then toggle the "ready for
* consumption" bool for a client. If it cannot find one after a number of
* tries, it simply fails (could be a DoS from the client).
*
* Consumers fight with their private index, which they use to determine who is
* consuming which item. If there is an unconsumed item, they try to advance
* the pvt counter. If they succeed, they can consume the item. The item
* might not be there yet, so they must spin until it is there. Then, the
* consumer copies the item out, and clears the bool (rdy_for_cons).
*
* At this point, the consumer needs to make sure the pub_idx is advanced
* enough so the producer knows the item is free. If pub_idx was their item,
* they move it forward to the next item. If it is not, currently, they spin
* and wait until the previous consumer finishes, and then move it forward.
* This isn't ideal, and we can deal with this in the future.
*
* Enqueue will enqueue the item pointed to by elem. Dequeue will write an
* item into the memory pointed to by elem.
*
* The number of items must be a power of two. In the future, we'll probably
* use Xen's rounding macros. Not using powers of two is a pain, esp with mods
* of negative numbers.
*
* Here's how to use it:
*
* DEFINE_BCQ_TYPES(my_name, my_type, my_size);
* struct my_name_bcq some_bcq;
* bcq_init(&some_bcq, my_type, my_size);
*
* bcq_enqueue(&some_bcq, &some_my_type, my_size, num_fails_okay);
* bcq_dequeue(&some_bcq, &some_my_type, my_size);
*
* They both return 0 on success, or some error code on failure.
*
* TODO later:
* How about an atomic_add_return for the prod? Now that we use powers of two,
* CAS is probably overkill.
*
* Automatically round up.
*
* Watch out for ABA. Could use ctrs in the top of the indexes. Not really an
* issue, since that would be a wraparound.
*
* Consumers could also just set their bit, and have whoever has the pub_idx
* right before them be the one to advance it all the way up.
*
* Using uint32_t for now, since that's the comp_and_swap we have. We'll
* probably get other sizes once we're sure we like the current one. */
/* Reference copy only: mirrors declarations that live above (pulled in via
 * the included header in the original layout) so a reader of this comment
 * block has them in view.  The #if 0 keeps it from ever compiling. */
#if 0 // Defined in the included header
struct bcq_header {
uint32_t prod_idx; /* next to be produced in */
uint32_t cons_pub_idx; /* last completely consumed */
uint32_t cons_pvt_idx; /* last a consumer has dibs on */
};
// This is there too:
#define DEFINE_BCQ_TYPES(__name, __elem_t, __num_elems)
#endif
/* Functions */
/* Initializes a BCQ in place: checks that _num_elems is a power of two, then
 * zeroes the whole structure (indexes and all rdy_for_cons flags).
 * _ele_type is unused; it is kept so existing call sites keep compiling.
 * Note: the assert runs BEFORE the memset, so a bad size is caught before
 * the caller's memory is clobbered. */
#define bcq_init(_bcq, _ele_type, _num_elems)                                  \
({                                                                             \
	/* Validate the precondition first; memset only on a sane size */          \
	assert((_num_elems) == ROUNDUPPWR2(_num_elems));                           \
	memset((_bcq), 0, sizeof(*(_bcq)));                                        \
})
/* Count of unclaimed producer slots.  Works across counter wraparound
 * because the subtraction is done in unsigned arithmetic. */
#define BCQ_FREE_SLOTS(_p, _cp, _ne) ((_ne) - ((_p) - (_cp)))
/* Nothing outstanding at all: every slot is free */
#define BCQ_EMPTY(_p, _cp, _ne) (BCQ_FREE_SLOTS(_p, _cp, _ne) == (_ne))
/* Every produced item has already been claimed by some consumer */
#define BCQ_NO_WORK(_p, _cpv) ((_cpv) == (_p))
/* No room left for the producer */
#define BCQ_FULL(_p, _cp, _ne) (BCQ_FREE_SLOTS(_p, _cp, _ne) == 0)
/* Enqueues *(_elem) into the BCQ.  Evaluates to 0 on success, -EAGAIN if it
 * gave up after (_num_fail) CAS attempts (pass 0 to retry without limit; the
 * short-circuit on (_num_fail) skips the give-up check entirely), or -EBUSY
 * if the queue was full.
 *
 * Producers race on hdr.prod_idx with compare-and-swap to claim a slot; the
 * winner copies the element into wraps[idx & (num_elems - 1)], issues a
 * wmb(), and only then sets rdy_for_cons, so a consumer that observes the
 * flag is guaranteed to see a fully written element.  A break out of the
 * do/while leaves __retval nonzero, which skips the fill-in step.
 * Sorry, the macro got a bit ugly, esp with the __retval hackery. */
#define bcq_enqueue(_bcq, _elem, _num_elems, _num_fail) \
({ \
uint32_t __prod, __new_prod, __cons_pub, __failctr = 0; \
int __retval = 0; \
do { \
cmb(); \
if (((_num_fail)) && (__failctr++ >= (_num_fail))) { \
__retval = -EAGAIN; \
break; \
} \
__prod = (_bcq)->hdr.prod_idx; \
__cons_pub = (_bcq)->hdr.cons_pub_idx; \
if (BCQ_FULL(__prod, __cons_pub, (_num_elems))) { \
__retval = -EBUSY; \
break; \
} \
__new_prod = __prod + 1; \
} while (!atomic_cas_u32(&(_bcq)->hdr.prod_idx, __prod, __new_prod)); \
if (!__retval) { \
/* from here out, __prod is the local __prod that we won */ \
(_bcq)->wraps[__prod & ((_num_elems)-1)].elem = *(_elem); \
wmb(); \
(_bcq)->wraps[__prod & ((_num_elems)-1)].rdy_for_cons = TRUE; \
} \
__retval; \
})
/* Dequeues one item into *(_elem).  Evaluates to 0 on success or -EBUSY when
 * there is no unclaimed work (prod_idx == cons_pvt_idx).
 *
 * Consumers race on hdr.cons_pvt_idx with compare-and-swap to claim a slot.
 * The winner spins until the producer has set rdy_for_cons (the item may be
 * claimed before the producer finishes copying it in), copies the element
 * out, clears the flag, then spins until cons_pub_idx reaches its own slot
 * and publishes the slot back to the producer by advancing cons_pub_idx.
 * The in-order pub_idx handoff means a slow consumer stalls later ones at
 * the publish step; the big comment above notes this isn't ideal.
 *
 * NOTE(review): there is no explicit read barrier between the rdy_for_cons
 * spin and the elem copy-out — presumably cpu_relax()/cmb() plus the target
 * memory model cover this; verify on weakly-ordered architectures. */
#define bcq_dequeue(_bcq, _elem, _num_elems) \
({ \
uint32_t __prod, __cons_pvt, __new_cons_pvt, __cons_pub; \
int __retval = 0; \
do { \
cmb(); \
__prod = (_bcq)->hdr.prod_idx; \
__cons_pvt = (_bcq)->hdr.cons_pvt_idx; \
if (BCQ_NO_WORK(__prod, __cons_pvt)) { \
__retval = -EBUSY; \
break; \
} \
__new_cons_pvt = (__cons_pvt + 1); \
} while (!atomic_cas_u32(&(_bcq)->hdr.cons_pvt_idx, __cons_pvt, \
__new_cons_pvt)); \
if (!__retval) { \
/* from here out, __cons_pvt is the local __cons_pvt that we won */ \
/* wait for the producer to finish copying it in */ \
while (!(_bcq)->wraps[__cons_pvt & ((_num_elems)-1)].rdy_for_cons) \
cpu_relax(); \
*(_elem) = (_bcq)->wraps[__cons_pvt & ((_num_elems)-1)].elem; \
(_bcq)->wraps[__cons_pvt & ((_num_elems)-1)].rdy_for_cons = FALSE; \
/* wait til we're the cons_pub, then advance it by one */ \
while ((_bcq)->hdr.cons_pub_idx != __cons_pvt) \
cpu_relax(); \
(_bcq)->hdr.cons_pub_idx = __cons_pvt + 1; \
} \
__retval; \
})
/* Checks if a bcq is empty (meaning no unclaimed work), instead of trying to
 * dequeue.  Note this compares against cons_pvt_idx: items claimed by a
 * consumer but not yet published back count as "empty" here. */
#define bcq_empty(_bcq) \
BCQ_NO_WORK((_bcq)->hdr.prod_idx, (_bcq)->hdr.cons_pvt_idx)
/* Number of items produced but not yet fully consumed/published */
#define bcq_nr_full(_bcq) \
((_bcq)->hdr.prod_idx - (_bcq)->hdr.cons_pub_idx)
#endif /* BCQ_H */