Skip to content

Commit

Permalink
review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
moh-osman3 committed Apr 23, 2024
1 parent 0352178 commit 19161d8
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 4 deletions.
4 changes: 3 additions & 1 deletion collector/admission/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

## Overview

The admission package provides a BoundedQueue object which is a semaphore implementation that limits the number of bytes admitted into a collector pipeline. Additionally the BoundedQueue limits the number of waiters that can block on a call to `bq.Acquire(sz int64)`. The motivation for this object is to improve memory issues that can occur with collectors experiencing high traffic. The `exporterhelper` was a huge pain point for memory issues (An option to avoid data drops due to a full queue is to increase the exporterhelper queue size which requires the collector to hold more memory). The `concurrentbatchprocessor` can mitigate some of the issues of the exporterhelper by applying backpressure and using an inflight memory limiter within the processor, but memory issues can still occur in preceding components (e.g. the otelarrow receiver). Therefore, the BoundedQueue should help limit memory within the entire collector pipeline by limiting two dimensions that cause memory issues
The admission package provides a BoundedQueue object which is a semaphore implementation that limits the number of bytes admitted into a collector pipeline. Additionally the BoundedQueue limits the number of waiters that can block on a call to `bq.Acquire(sz int64)`.

This package is an experiment to improve the behavior of Collector pipelines having their `exporterhelper` configured to apply backpressure. This package is meant to be used in receivers, via an interceptor or custom logic. Therefore, the BoundedQueue helps limit memory within the entire collector pipeline by limiting two dimensions that cause memory issues:
1. bytes: large requests that enter the collector pipeline can require large allocations even if downstream components will eventually limit or ratelimit the request.
2. waiters: limiting on bytes alone is not enough because requests that enter the pipeline and block on `bq.Acquire()` can still consume memory within the receiver. If there are enough waiters this can be a significant contribution to memory usage.

Expand Down
6 changes: 4 additions & 2 deletions collector/admission/boundedqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import (
orderedmap "github.com/wk8/go-ordered-map/v2"
)

var ErrTooManyWaiters = fmt.Errorf("rejecting request, too many waiters")

type BoundedQueue struct {
maxLimitBytes int64
maxLimitWaiters int64
currentBytes int64
currentWaiters int64
lock sync.Mutex
// waiters waiters
waiters *orderedmap.OrderedMap[uuid.UUID, waiter]
}

Expand Down Expand Up @@ -48,7 +49,7 @@ func (bq *BoundedQueue) admit(pendingBytes int64) (bool, error) {

// since we were unable to admit, check if we can wait.
if bq.currentWaiters + 1 > bq.maxLimitWaiters { // too many waiters
return false, fmt.Errorf("rejecting request, too many waiters")
return false, ErrTooManyWaiters
}

// if we got to this point we need to wait to acquire bytes, so update currentWaiters before releasing mutex.
Expand Down Expand Up @@ -128,6 +129,7 @@ func (bq *BoundedQueue) Release(pendingBytes int64) error {
if !found {
return fmt.Errorf("deleting waiter that doesn't exist")
}
continue

} else {
break
Expand Down
2 changes: 1 addition & 1 deletion collector/admission/boundedqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func TestAcquireBoundedWithWaiters(t *testing.T) {

bq.lock.Lock()
if tooManyWaiters {
assert.ErrorContains(t, errs, "rejecting request, too many waiters")
assert.ErrorContains(t, errs, ErrTooManyWaiters.Error())
} else {
assert.NoError(t, errs)
}
Expand Down

0 comments on commit 19161d8

Please sign in to comment.