Skip to content

Commit

Permalink
chore(queue): add queue as pkg (stackrox#7884)
Browse files Browse the repository at this point in the history
Co-authored-by: Alex Rukletsov <[email protected]>
  • Loading branch information
dhaus67 and rukletsov authored Sep 30, 2023
1 parent 478ddec commit 0b86824
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 94 deletions.
90 changes: 0 additions & 90 deletions pkg/administration/events/stream/queue.go

This file was deleted.

32 changes: 28 additions & 4 deletions pkg/administration/events/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,38 @@ package stream
import (
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/stackrox/rox/pkg/administration/events"
"github.com/stackrox/rox/pkg/concurrency"
"github.com/stackrox/rox/pkg/metrics"
"github.com/stackrox/rox/pkg/queue"
)

const (
// Sample calculation with a sample administration event (250 chars in message + hint):
// 1 Administration event = 160 bytes
// 100000 *160 bytes = 16 MB
maxQueueSize = 100000
)

var (
administrationEventsQueueCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: metrics.PrometheusNamespace,
Subsystem: metrics.CentralSubsystem.String(),
Name: "administration_events_queue_size_total",
Help: "A counter that tracks the size of the administration events queue",
}, []string{"Operation"})
)

func init() {
prometheus.MustRegister(administrationEventsQueueCounter)
}

// newStream creates a new event stream.
func newStream() *streamImpl {
return &streamImpl{
queue: newQueue(),
queue: queue.NewQueue[*events.AdministrationEvent](queue.WithMaxSize[*events.AdministrationEvent](maxQueueSize),
queue.WithCounterVec[*events.AdministrationEvent](administrationEventsQueueCounter)),
}
}

Expand All @@ -20,16 +44,16 @@ func GetStreamForTesting(_ *testing.T) *streamImpl {
}

type streamImpl struct {
queue *administrationEventsQueue
queue *queue.Queue[*events.AdministrationEvent]
}

// Consume returns an event.
// Note that this is blocking and waits for events to be emitted before returning.
func (s *streamImpl) Consume(waitable concurrency.Waitable) *events.AdministrationEvent {
return s.queue.pullBlocking(waitable)
return s.queue.PullBlocking(waitable)
}

// Produce adds an event to the stream.
func (s *streamImpl) Produce(event *events.AdministrationEvent) {
s.queue.push(event)
s.queue.Push(event)
}
119 changes: 119 additions & 0 deletions pkg/queue/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package queue

import (
"container/list"

"github.com/prometheus/client_golang/prometheus"
"github.com/stackrox/rox/pkg/concurrency"
"github.com/stackrox/rox/pkg/logging"
"github.com/stackrox/rox/pkg/metrics"
"github.com/stackrox/rox/pkg/sync"
)

var (
log = logging.LoggerForModule()
)

// Queue provides a thread-safe queue for type T.
// The queue allows to push, pull, and blocking pull.
// Additionally, it exposes safety guards such as a max size as well as metrics to track the queue growth and size.
type Queue[T comparable] struct {
maxSize int
counterMetric *prometheus.CounterVec
queue *list.List
notEmptySignal concurrency.Signal
mutex sync.Mutex
}

// OptionFunc provides options for the queue.
// Note that the comparable type is currently required, once we upgrade to go1.20 we can switch this to
// any and creation will be much easier.
type OptionFunc[T comparable] func(queue *Queue[T])

// WithCounterVec provides a counter vec which tracks added and removed items from the queue.
func WithCounterVec[T comparable](vec *prometheus.CounterVec) OptionFunc[T] {
return func(queue *Queue[T]) {
queue.counterMetric = vec
}
}

// WithMaxSize provides a limit to the size of the queue. By default, no size limit is set so the queue is
// unbounded.
func WithMaxSize[T comparable](size int) OptionFunc[T] {
return func(queue *Queue[T]) {
queue.maxSize = size
}
}

// NewQueue creates a new queue. Optionally, a metric can be included.
func NewQueue[T comparable](opts ...OptionFunc[T]) *Queue[T] {
queue := &Queue[T]{
notEmptySignal: concurrency.NewSignal(),
queue: list.New(),
}

for _, opt := range opts {
opt(queue)
}

return queue
}

// Pull will pull an item from the queue. If the queue is empty, the default value of T will be returned.
// Note that his does not wait for items to be available in the queue, use PullBlocking instead.
func (q *Queue[T]) Pull() T {
q.mutex.Lock()
defer q.mutex.Unlock()

if q.queue.Len() == 0 {
var nilT T
return nilT
}

item := q.queue.Remove(q.queue.Front()).(T)

if q.counterMetric != nil {
q.counterMetric.With(prometheus.Labels{"Operation": metrics.Remove.String()}).Inc()
}

if q.queue.Len() == 0 {
q.notEmptySignal.Reset()
}

return item
}

// PullBlocking will pull an item from the queue, potentially waiting until one is available.
// In case the waitable signals done, the default value of T will be returned.
func (q *Queue[T]) PullBlocking(waitable concurrency.Waitable) T {
var item T
// In case multiple go routines are pull blocking, we have to ensure that the result of pull
// is non-zero, hence the additional for loop here.
for item == *new(T) {
select {
case <-waitable.Done():
return item
case <-q.notEmptySignal.Done():
item = q.Pull()
}
}
return item
}

// Push adds an item to the queue.
// Note that in case the queue is full, no error will be returned but rather only a log emitted.
func (q *Queue[T]) Push(item T) {
q.mutex.Lock()
defer q.mutex.Unlock()

if q.maxSize != 0 && q.queue.Len() >= q.maxSize {
log.Warnf("Queue size limit reached (%d). New items added to the queue will be dropped.", q.maxSize)
return
}

defer q.notEmptySignal.Signal()
if q.counterMetric != nil {
q.counterMetric.With(prometheus.Labels{"Operation": metrics.Add.String()}).Inc()
}
q.queue.PushBack(item)
}
38 changes: 38 additions & 0 deletions pkg/queue/queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package queue

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestQueue(t *testing.T) {
t.Parallel()
q := NewQueue[*string]()

// 1. Adding a new item to the queue.
item := "first-item"
q.Push(&item)

// 2. Using pull should retrieve the previously added item.
queueItem := q.Pull()
assert.Equal(t, item, *queueItem)

// 3. Add an item after 500ms of waiting. Meanwhile, call pull blocking. It should wait until an item is added
// and afterward return it.
time.AfterFunc(500*time.Millisecond, func() {
item := "second-item"
q.Push(&item)
})

assert.Eventually(t, func() bool {
queueItem := q.PullBlocking(context.Background())
return "second-item" == *queueItem
}, 1*time.Second, 100*time.Millisecond)

// 4. Another pull should now return an empty value.
queueItem = q.Pull()
assert.Nil(t, queueItem)
}

0 comments on commit 0b86824

Please sign in to comment.