diff --git a/runtime_scan/pkg/orchestrator/common/queue.go b/runtime_scan/pkg/orchestrator/common/queue.go index 943a3b956..67c1ff342 100644 --- a/runtime_scan/pkg/orchestrator/common/queue.go +++ b/runtime_scan/pkg/orchestrator/common/queue.go @@ -82,12 +82,19 @@ func NewQueue[T ReconcileEvent]() *Queue[T] { // "Done" is called with the dequeued item to acknowledge its processing is // completed. func (q *Queue[T]) Dequeue(ctx context.Context) (T, error) { + // Grab the lock so that we can check the length of q.queue safely. + q.l.Lock() if len(q.queue) == 0 { + // Unlock while we wait for an item to be added to the queue + q.l.Unlock() + // If the queue is empty, block waiting for the itemAdded // notification or context timeout. select { case <-q.itemAdded: - // continue + // We know we have an item added to the queue so grab + // the lock so that we can dequeue it safely + q.l.Lock() case <-ctx.Done(): var empty T return empty, fmt.Errorf("failed to get item: %w", ctx.Err()) @@ -100,8 +107,6 @@ func (q *Queue[T]) Dequeue(ctx context.Context) (T, error) { default: } } - - q.l.Lock() defer q.l.Unlock() item := q.queue[0]