Skip to content

Commit

Permalink
Fix data race checking the queue length in dequeue (#454)
Browse files Browse the repository at this point in the history
If we are working with q.queue we must hold the lock to make sure there
is no race conditions. During Dequeue we weren't holding the lock when
we checked the length of queue.

After this commit we hold the lock to check the length of q.queue. If
the queue is empty we unlock the mutex to allow an item to be added to
the queue while we wait for a itemAdded notification. The lock is then
regained to dequeue the item that was added.

If the queue isn't empty we continue to hold the lock and move straight
to dequeue the item defering the unlock until the function completes.
  • Loading branch information
Sam Betts committed Jul 13, 2023
1 parent b624ade commit d8248a0
Showing 1 changed file with 8 additions and 3 deletions.
11 changes: 8 additions & 3 deletions runtime_scan/pkg/orchestrator/common/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,19 @@ func NewQueue[T ReconcileEvent]() *Queue[T] {
// "Done" is called with the dequeued item to acknowledge its processing is
// completed.
func (q *Queue[T]) Dequeue(ctx context.Context) (T, error) {
// Grab the lock so that we can check the length of q.queue safely.
q.l.Lock()
if len(q.queue) == 0 {
// Unlock while we wait for an item to be added to the queue
q.l.Unlock()

// If the queue is empty, block waiting for the itemAdded
// notification or context timeout.
select {
case <-q.itemAdded:
// continue
// We know we have an item added to the queue so grab
// the lock so that we can dequeue it safely
q.l.Lock()
case <-ctx.Done():
var empty T
return empty, fmt.Errorf("failed to get item: %w", ctx.Err())
Expand All @@ -100,8 +107,6 @@ func (q *Queue[T]) Dequeue(ctx context.Context) (T, error) {
default:
}
}

q.l.Lock()
defer q.l.Unlock()

item := q.queue[0]
Expand Down

0 comments on commit d8248a0

Please sign in to comment.