Skip to content

Commit

Permalink
admission: lock work queue before reading waiting length
Browse files Browse the repository at this point in the history
When replicated work is submitted for admission, it returns early and
admission proceeds asynchronously to the caller. When `V(1)` is enabled,
we also log the current queue length in this code path, which is
protected by a mutex that wasn't acquired previously.

Acquire the queue mutex when `V(1)` is enabled to prevent a race.

Part of: #130187
Release note: None
  • Loading branch information
kvoli committed Sep 19, 2024
1 parent 6aaa174 commit 097cdb7
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions pkg/util/admission/work_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,9 +756,12 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err
q.metrics.recordStartWait(info.Priority)
if info.ReplicatedWorkInfo.Enabled {
if log.V(1) {
q.mu.Lock()
queueLen := tenant.waitingWorkHeap.Len()
q.mu.Unlock()

log.Infof(ctx, "async-path: len(waiting-work)=%d: enqueued t%d pri=%s r%s origin=n%s log-position=%s ingested=%t",
tenant.waitingWorkHeap.Len(),
tenant.id, info.Priority,
queueLen, tenant.id, info.Priority,
info.ReplicatedWorkInfo.RangeID,
info.ReplicatedWorkInfo.Origin,
info.ReplicatedWorkInfo.LogPosition,
Expand Down Expand Up @@ -916,9 +919,12 @@ func (q *WorkQueue) granted(grantChainID grantChainID) int64 {
// NB: We don't use grant chains for store tokens, so they don't apply
// to replicated writes.
if log.V(1) {
q.mu.Lock()
queueLen := tenant.waitingWorkHeap.Len()
q.mu.Unlock()

log.Infof(q.ambientCtx, "async-path: len(waiting-work)=%d dequeued t%d pri=%s r%s origin=n%s log-position=%s ingested=%t",
tenant.waitingWorkHeap.Len(),
tenant.id, item.priority,
queueLen, tenant.id, item.priority,
item.replicated.RangeID,
item.replicated.Origin,
item.replicated.LogPosition,
Expand Down

0 comments on commit 097cdb7

Please sign in to comment.