From 2e4cbdb3fedb93bea32c85d329f5741d61f8585c Mon Sep 17 00:00:00 2001
From: Fae Charlton
Date: Wed, 20 Mar 2024 15:41:06 -0400
Subject: [PATCH] Release event pointers faster as they go through the
 pipeline (#38166)

Make two changes to allow event data to be freed from memory faster:

- Clear event pointers from the memory queue buffer when they are vended
  instead of when they're acknowledged. (The data will be preserved in the
  event batch structure until acknowledgment.)
- Clear event pointers from the batch structure immediately after it is
  acknowledged instead of waiting for the batch to be freed naturally.

In benchmarks of Filebeat with saturated Filestream input going to an
Elasticsearch output, this lowered average memory footprint by ~10%.
---
 libbeat/publisher/pipeline/ttl_batch.go      |  4 ++
 libbeat/publisher/pipeline/ttl_batch_test.go | 45 +++++++++++++++++++
 libbeat/publisher/queue/memqueue/broker.go   |  8 +++-
 .../publisher/queue/memqueue/queue_test.go   | 38 ++++++++++++++++
 libbeat/publisher/queue/memqueue/runloop.go  |  9 +---
 5 files changed, 95 insertions(+), 9 deletions(-)

diff --git a/libbeat/publisher/pipeline/ttl_batch.go b/libbeat/publisher/pipeline/ttl_batch.go
index c374ac88d724..dcc2790f2316 100644
--- a/libbeat/publisher/pipeline/ttl_batch.go
+++ b/libbeat/publisher/pipeline/ttl_batch.go
@@ -93,10 +93,14 @@ func (b *ttlBatch) Events() []publisher.Event {
 }
 
 func (b *ttlBatch) ACK() {
+	// Help the garbage collector clean up the event data a little faster
+	b.events = nil
 	b.done()
 }
 
 func (b *ttlBatch) Drop() {
+	// Help the garbage collector clean up the event data a little faster
+	b.events = nil
 	b.done()
 }
 
diff --git a/libbeat/publisher/pipeline/ttl_batch_test.go b/libbeat/publisher/pipeline/ttl_batch_test.go
index a56f4b0fca10..5e277d5042c4 100644
--- a/libbeat/publisher/pipeline/ttl_batch_test.go
+++ b/libbeat/publisher/pipeline/ttl_batch_test.go
@@ -18,6 +18,7 @@
 package pipeline
 
 import (
+	"fmt"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
@@ -91,6 +92,50 @@ func TestNestedBatchSplit(t *testing.T) {
 	assert.True(t, doneWasCalled, "Original callback should be invoked when all children are")
 }
 
+func TestBatchCallsDoneAndFreesEvents(t *testing.T) {
+	doneCalled := false
+	batch := &ttlBatch{
+		done:   func() { doneCalled = true },
+		events: []publisher.Event{{}},
+	}
+	require.NotNil(t, batch.events, "Initial batch events must be non-nil")
+	batch.ACK()
+	require.Nil(t, batch.events, "Calling batch.ACK should clear the events array")
+	require.True(t, doneCalled, "Calling batch.ACK should invoke the done callback")
+
+	doneCalled = false
+	batch.events = []publisher.Event{{}}
+	require.NotNil(t, batch.events, "Initial batch events must be non-nil")
+	batch.Drop()
+	require.Nil(t, batch.events, "Calling batch.Drop should clear the events array")
+	require.True(t, doneCalled, "Calling batch.Drop should invoke the done callback")
+}
+
+func TestNewBatchFreesEvents(t *testing.T) {
+	queueBatch := &mockQueueBatch{}
+	_ = newBatch(nil, queueBatch, 0)
+	assert.Equal(t, 1, queueBatch.freeEntriesCalled, "Creating a new ttlBatch should call FreeEntries on the underlying queue.Batch")
+}
+
+type mockQueueBatch struct {
+	freeEntriesCalled int
+}
+
+func (b *mockQueueBatch) Count() int {
+	return 1
+}
+
+func (b *mockQueueBatch) Done() {
+}
+
+func (b *mockQueueBatch) Entry(i int) interface{} {
+	return fmt.Sprintf("event %v", i)
+}
+
+func (b *mockQueueBatch) FreeEntries() {
+	b.freeEntriesCalled++
+}
+
 type mockRetryer struct {
 	batches []*ttlBatch
 }
diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go
index e1d0fd46c004..1455745961c7 100644
--- a/libbeat/publisher/queue/memqueue/broker.go
+++ b/libbeat/publisher/queue/memqueue/broker.go
@@ -403,8 +403,12 @@ func (b *batch) Entry(i int) interface{} {
 }
 
 func (b *batch) FreeEntries() {
-	// Memory queue can't release event references until they're fully acknowledged,
-	// so do nothing.
+	// This signals that the event data has been copied out of the batch, and is
+	// safe to free from the queue buffer, so set all the event pointers to nil.
+	for i := 0; i < b.count; i++ {
+		index := (b.start + i) % len(b.queue.buf)
+		b.queue.buf[index].event = nil
+	}
 }
 
 func (b *batch) Done() {
diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go
index 53f8da4b77c6..637e7ccd4fbb 100644
--- a/libbeat/publisher/queue/memqueue/queue_test.go
+++ b/libbeat/publisher/queue/memqueue/queue_test.go
@@ -438,6 +438,44 @@ func TestEntryIDs(t *testing.T) {
 	})
 }
 
+func TestBatchFreeEntries(t *testing.T) {
+	const queueSize = 10
+	const batchSize = 5
+	// 1. Add 10 events to the queue, request two batches with 5 events each
+	// 2. Make sure the queue buffer has 10 non-nil events
+	// 3. Call FreeEntries on the second batch
+	// 4. Make sure only events 6-10 are nil
+	// 5. Call FreeEntries on the first batch
+	// 6. Make sure all events are nil
+	testQueue := NewQueue(nil, nil, Settings{Events: queueSize, MaxGetRequest: batchSize, FlushTimeout: time.Second}, 0)
+	producer := testQueue.Producer(queue.ProducerConfig{})
+	for i := 0; i < queueSize; i++ {
+		_, ok := producer.Publish(i)
+		require.True(t, ok, "Queue publish must succeed")
+	}
+	batch1, err := testQueue.Get(batchSize)
+	require.NoError(t, err, "Queue read must succeed")
+	require.Equal(t, batchSize, batch1.Count(), "Returned batch size must match request")
+	batch2, err := testQueue.Get(batchSize)
+	require.NoError(t, err, "Queue read must succeed")
+	require.Equal(t, batchSize, batch2.Count(), "Returned batch size must match request")
+	// Slight concurrency subtlety: we check events are non-nil after the queue
+	// reads, since if we do it before we have no way to be sure the insert
+	// has been completed.
+	for i := 0; i < queueSize; i++ {
+		require.NotNil(t, testQueue.buf[i].event, "All queue events must be non-nil")
+	}
+	batch2.FreeEntries()
+	for i := 0; i < batchSize; i++ {
+		require.NotNilf(t, testQueue.buf[i].event, "Queue index %v: batch 1's events should be unaffected by calling FreeEntries on Batch 2", i)
+		require.Nilf(t, testQueue.buf[batchSize+i].event, "Queue index %v: batch 2's events should be nil after FreeEntries", batchSize+i)
+	}
+	batch1.FreeEntries()
+	for i := 0; i < queueSize; i++ {
+		require.Nilf(t, testQueue.buf[i].event, "Queue index %v: all events should be nil after calling FreeEntries on both batches", i)
+	}
+}
+
 // producerACKWaiter is a helper that can listen to queue producer callbacks
 // and wait on them from the test thread, so we can test the queue's asynchronous
 // behavior without relying on time.Sleep.
diff --git a/libbeat/publisher/queue/memqueue/runloop.go b/libbeat/publisher/queue/memqueue/runloop.go
index 0f7788c62098..45ae3c0a1a2b 100644
--- a/libbeat/publisher/queue/memqueue/runloop.go
+++ b/libbeat/publisher/queue/memqueue/runloop.go
@@ -187,13 +187,8 @@ func (l *runLoop) handleGetReply(req *getRequest) {
 }
 
 func (l *runLoop) handleDelete(count int) {
-	// Clear the internal event pointers so they can be garbage collected
-	for i := 0; i < count; i++ {
-		index := (l.bufPos + i) % len(l.broker.buf)
-		l.broker.buf[index].event = nil
-	}
-
-	// Advance position and counters
+	// Advance position and counters. Event data was already cleared in
+	// batch.FreeEntries when the events were vended.
 	l.bufPos = (l.bufPos + count) % len(l.broker.buf)
 	l.eventCount -= count
 	l.consumedCount -= count
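
Reading aid (not part of the patch): a minimal, self-contained Go sketch of the ownership
pattern the commit message describes. The queue's ring buffer drops its reference to an
event as soon as the event is handed out in a batch, while the batch keeps its own slice
alive only until acknowledgment. The names below (event, ringBuffer, batch, get, ack) are
illustrative stand-ins, not the actual Beats types or APIs.

package main

import "fmt"

type event struct{ payload string }

// ringBuffer stands in for the memory queue's circular buffer.
type ringBuffer struct {
	buf []*event
}

// batch stands in for ttlBatch: it owns the vended events until acknowledged.
type batch struct {
	events []*event
}

// get copies up to n events into a new batch and immediately clears the
// buffer slots, analogous to batch.FreeEntries: the queue stops pinning
// the event data as soon as it is vended.
func (r *ringBuffer) get(n int) *batch {
	if n > len(r.buf) {
		n = len(r.buf)
	}
	b := &batch{events: make([]*event, n)}
	copy(b.events, r.buf[:n])
	for i := 0; i < n; i++ {
		r.buf[i] = nil // queue-side reference released at vend time
	}
	return b
}

// ack releases the batch's own references as soon as delivery is confirmed,
// analogous to ttlBatch.ACK, instead of waiting for the batch to be collected.
func (b *batch) ack() {
	b.events = nil
}

func main() {
	rb := &ringBuffer{buf: []*event{{"a"}, {"b"}, {"c"}}}
	bt := rb.get(2)
	fmt.Println(rb.buf[0] == nil, rb.buf[1] == nil, rb.buf[2] == nil) // true true false
	fmt.Println(len(bt.events))                                       // 2
	bt.ack()
	fmt.Println(bt.events == nil) // true
}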