Skip to content

Commit

Permalink
Release event pointers faster as they go through the pipeline (elasti…
Browse files Browse the repository at this point in the history
…c#38166)

Make two changes to allow event data to be freed from memory faster:
- Clear event pointers from the memory queue buffer when they are vended instead of when they're acknowledged. (The data will be preserved in the event batch structure until acknowledgment.)
- Clear event pointers from the batch structure immediately after it is acknowledged instead of waiting for the batch to be freed naturally.

In benchmarks of Filebeat with saturated Filestream input going to an Elasticsearch output, this lowered average memory footprint by ~10%.
  • Loading branch information
faec authored Mar 20, 2024
1 parent 16f8d09 commit 2e4cbdb
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 9 deletions.
4 changes: 4 additions & 0 deletions libbeat/publisher/pipeline/ttl_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,14 @@ func (b *ttlBatch) Events() []publisher.Event {
}

// ACK releases the batch's reference to its events and then invokes the
// batch's done callback.
func (b *ttlBatch) ACK() {
// Clear the events slice before signaling completion so the garbage
// collector can reclaim the event data a little faster, instead of waiting
// for the batch itself to become unreachable.
b.events = nil
b.done()
}

// Drop releases the batch's reference to its events and then invokes the
// batch's done callback. In this implementation it performs the same steps
// as ACK.
func (b *ttlBatch) Drop() {
// Clear the events slice before signaling completion so the garbage
// collector can reclaim the event data a little faster, instead of waiting
// for the batch itself to become unreachable.
b.events = nil
b.done()
}

Expand Down
45 changes: 45 additions & 0 deletions libbeat/publisher/pipeline/ttl_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package pipeline

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -91,6 +92,50 @@ func TestNestedBatchSplit(t *testing.T) {
assert.True(t, doneWasCalled, "Original callback should be invoked when all children are")
}

// TestBatchCallsDoneAndFreesEvents checks that both ACK and Drop clear the
// batch's events slice and invoke the done callback.
func TestBatchCallsDoneAndFreesEvents(t *testing.T) {
	doneCalled := false
	batch := &ttlBatch{
		done: func() { doneCalled = true },
	}

	// Both completion paths are exercised on the same batch, one after the
	// other, with the state reset before each.
	steps := []struct {
		finish     func()
		clearedMsg string
		doneMsg    string
	}{
		{
			finish:     batch.ACK,
			clearedMsg: "Calling batch.ACK should clear the events array",
			doneMsg:    "Calling batch.ACK should invoke the done callback",
		},
		{
			finish:     batch.Drop,
			clearedMsg: "Calling batch.Drop should clear the events array",
			doneMsg:    "Calling batch.Drop should invoke the done callback",
		},
	}

	for _, step := range steps {
		doneCalled = false
		batch.events = []publisher.Event{{}}
		require.NotNil(t, batch.events, "Initial batch events must be non-nil")

		step.finish()

		require.Nil(t, batch.events, step.clearedMsg)
		require.True(t, doneCalled, step.doneMsg)
	}
}

// TestNewBatchFreesEvents checks that constructing a ttlBatch calls
// FreeEntries exactly once on the queue batch it was built from.
func TestNewBatchFreesEvents(t *testing.T) {
	source := &mockQueueBatch{}

	// Only the side effect on the mock matters; the returned batch is unused.
	_ = newBatch(nil, source, 0)

	assert.Equal(t, 1, source.freeEntriesCalled, "Creating a new ttlBatch should call FreeEntries on the underlying queue.Batch")
}

// mockQueueBatch is a minimal queue batch stand-in for tests. It always
// reports a single entry and records how often FreeEntries was called.
type mockQueueBatch struct {
	// freeEntriesCalled counts the invocations of FreeEntries.
	freeEntriesCalled int
}

// Count always reports exactly one entry.
func (m *mockQueueBatch) Count() int {
	const entries = 1
	return entries
}

// Done is a no-op; the mock has nothing to release.
func (m *mockQueueBatch) Done() {}

// Entry returns a placeholder string naming the requested index.
func (m *mockQueueBatch) Entry(i int) interface{} {
	label := fmt.Sprintf("event %v", i)
	return label
}

// FreeEntries records that it was called.
func (m *mockQueueBatch) FreeEntries() {
	m.freeEntriesCalled += 1
}

type mockRetryer struct {
batches []*ttlBatch
}
Expand Down
8 changes: 6 additions & 2 deletions libbeat/publisher/queue/memqueue/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,8 +403,12 @@ func (b *batch) Entry(i int) interface{} {
}

// FreeEntries signals that the event data has been copied out of the batch
// and is safe to free from the queue buffer. It sets the event pointer of
// every buffer slot covered by this batch to nil.
func (b *batch) FreeEntries() {
	for offset := 0; offset < b.count; offset++ {
		// The batch may wrap around the end of the circular buffer, so the
		// slot position is taken modulo the buffer length.
		size := len(b.queue.buf)
		slot := &b.queue.buf[(b.start+offset)%size]
		slot.event = nil
	}
}

func (b *batch) Done() {
Expand Down
38 changes: 38 additions & 0 deletions libbeat/publisher/queue/memqueue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,44 @@ func TestEntryIDs(t *testing.T) {
})
}

// TestBatchFreeEntries checks that calling FreeEntries on a batch clears only
// that batch's event pointers in the queue buffer, leaving other batches'
// entries untouched.
func TestBatchFreeEntries(t *testing.T) {
	const queueSize = 10
	const batchSize = 5
	// 1. Add 10 events to the queue, request two batches with 5 events each
	// 2. Make sure the queue buffer has 10 non-nil events
	// 3. Call FreeEntries on the second batch
	// 4. Make sure only events 6-10 are nil
	// 5. Call FreeEntries on the first batch
	// 6. Make sure all events are nil
	testQueue := NewQueue(nil, nil, Settings{Events: queueSize, MaxGetRequest: batchSize, FlushTimeout: time.Second}, 0)
	producer := testQueue.Producer(queue.ProducerConfig{})
	for i := 0; i < queueSize; i++ {
		_, ok := producer.Publish(i)
		require.True(t, ok, "Queue publish must succeed")
	}
	batch1, err := testQueue.Get(batchSize)
	require.NoError(t, err, "Queue read must succeed")
	require.Equal(t, batchSize, batch1.Count(), "Returned batch size must match request")
	batch2, err := testQueue.Get(batchSize)
	require.NoError(t, err, "Queue read must succeed")
	require.Equal(t, batchSize, batch2.Count(), "Returned batch size must match request")
	// Slight concurrency subtlety: we check events are non-nil after the queue
	// reads, since if we do it before we have no way to be sure the insert
	// has been completed.
	for i := 0; i < queueSize; i++ {
		require.NotNil(t, testQueue.buf[i].event, "All queue events must be non-nil")
	}
	batch2.FreeEntries()
	for i := 0; i < batchSize; i++ {
		require.NotNilf(t, testQueue.buf[i].event, "Queue index %v: batch 1's events should be unaffected by calling FreeEntries on Batch 2", i)
		require.Nilf(t, testQueue.buf[batchSize+i].event, "Queue index %v: batch 2's events should be nil after FreeEntries", batchSize+i)
	}
	batch1.FreeEntries()
	for i := 0; i < queueSize; i++ {
		// The index must be passed for the %v verb; without it a failure
		// message would render as "%!v(MISSING)" instead of the queue index.
		require.Nilf(t, testQueue.buf[i].event, "Queue index %v: all events should be nil after calling FreeEntries on both batches", i)
	}
}

// producerACKWaiter is a helper that can listen to queue producer callbacks
// and wait on them from the test thread, so we can test the queue's asynchronous
// behavior without relying on time.Sleep.
Expand Down
9 changes: 2 additions & 7 deletions libbeat/publisher/queue/memqueue/runloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,8 @@ func (l *runLoop) handleGetReply(req *getRequest) {
}

func (l *runLoop) handleDelete(count int) {
// Clear the internal event pointers so they can be garbage collected
for i := 0; i < count; i++ {
index := (l.bufPos + i) % len(l.broker.buf)
l.broker.buf[index].event = nil
}

// Advance position and counters
// Advance position and counters. Event data was already cleared in
// batch.FreeEntries when the events were vended.
l.bufPos = (l.bufPos + count) % len(l.broker.buf)
l.eventCount -= count
l.consumedCount -= count
Expand Down

0 comments on commit 2e4cbdb

Please sign in to comment.