From 097cdb77c70d1644947e1233a71ac872cf89966e Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Thu, 19 Sep 2024 16:44:58 +0000 Subject: [PATCH] admission: lock work queue before reading waiting length When replicated work is submitted for admission, it returns early and admission proceeds asynchronously to the caller. When `V(1)` is enabled, we also log the current queue length in this code path, which is protected by a mutex that wasn't acquired previously. Acquire the queue mutex when `V(1)` is enabled to prevent a race. Part of: #130187 Release note: None --- pkg/util/admission/work_queue.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/pkg/util/admission/work_queue.go b/pkg/util/admission/work_queue.go index 7db1cd6a03b3..4a12577be025 100644 --- a/pkg/util/admission/work_queue.go +++ b/pkg/util/admission/work_queue.go @@ -756,9 +756,12 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err q.metrics.recordStartWait(info.Priority) if info.ReplicatedWorkInfo.Enabled { if log.V(1) { + q.mu.Lock() + queueLen := tenant.waitingWorkHeap.Len() + q.mu.Unlock() + log.Infof(ctx, "async-path: len(waiting-work)=%d: enqueued t%d pri=%s r%s origin=n%s log-position=%s ingested=%t", - tenant.waitingWorkHeap.Len(), - tenant.id, info.Priority, + queueLen, tenant.id, info.Priority, info.ReplicatedWorkInfo.RangeID, info.ReplicatedWorkInfo.Origin, info.ReplicatedWorkInfo.LogPosition, @@ -916,9 +919,12 @@ func (q *WorkQueue) granted(grantChainID grantChainID) int64 { // NB: We don't use grant chains for store tokens, so they don't apply // to replicated writes. if log.V(1) { + q.mu.Lock() + queueLen := tenant.waitingWorkHeap.Len() + q.mu.Unlock() + log.Infof(q.ambientCtx, "async-path: len(waiting-work)=%d dequeued t%d pri=%s r%s origin=n%s log-position=%s ingested=%t", - tenant.waitingWorkHeap.Len(), - tenant.id, item.priority, + queueLen, tenant.id, item.priority, item.replicated.RangeID, item.replicated.Origin, item.replicated.LogPosition,