diff --git a/libbeat/publisher/pipeline/ttl_batch.go b/libbeat/publisher/pipeline/ttl_batch.go index c374ac88d724..dcc2790f2316 100644 --- a/libbeat/publisher/pipeline/ttl_batch.go +++ b/libbeat/publisher/pipeline/ttl_batch.go @@ -93,10 +93,14 @@ func (b *ttlBatch) Events() []publisher.Event { } func (b *ttlBatch) ACK() { + // Help the garbage collector clean up the event data a little faster + b.events = nil b.done() } func (b *ttlBatch) Drop() { + // Help the garbage collector clean up the event data a little faster + b.events = nil b.done() } diff --git a/libbeat/publisher/pipeline/ttl_batch_test.go b/libbeat/publisher/pipeline/ttl_batch_test.go index a56f4b0fca10..5e277d5042c4 100644 --- a/libbeat/publisher/pipeline/ttl_batch_test.go +++ b/libbeat/publisher/pipeline/ttl_batch_test.go @@ -18,6 +18,7 @@ package pipeline import ( + "fmt" "testing" "github.com/stretchr/testify/assert" @@ -91,6 +92,50 @@ func TestNestedBatchSplit(t *testing.T) { assert.True(t, doneWasCalled, "Original callback should be invoked when all children are") } +func TestBatchCallsDoneAndFreesEvents(t *testing.T) { + doneCalled := false + batch := &ttlBatch{ + done: func() { doneCalled = true }, + events: []publisher.Event{{}}, + } + require.NotNil(t, batch.events, "Initial batch events must be non-nil") + batch.ACK() + require.Nil(t, batch.events, "Calling batch.ACK should clear the events array") + require.True(t, doneCalled, "Calling batch.ACK should invoke the done callback") + + doneCalled = false + batch.events = []publisher.Event{{}} + require.NotNil(t, batch.events, "Initial batch events must be non-nil") + batch.Drop() + require.Nil(t, batch.events, "Calling batch.Drop should clear the events array") + require.True(t, doneCalled, "Calling batch.Drop should invoke the done callback") +} + +func TestNewBatchFreesEvents(t *testing.T) { + queueBatch := &mockQueueBatch{} + _ = newBatch(nil, queueBatch, 0) + assert.Equal(t, 1, queueBatch.freeEntriesCalled, "Creating a new ttlBatch should call FreeEntries on the underlying queue.Batch") +} + +type mockQueueBatch struct { + freeEntriesCalled int +} + +func (b *mockQueueBatch) Count() int { + return 1 +} + +func (b *mockQueueBatch) Done() { +} + +func (b *mockQueueBatch) Entry(i int) interface{} { + return fmt.Sprintf("event %v", i) +} + +func (b *mockQueueBatch) FreeEntries() { + b.freeEntriesCalled++ +} + type mockRetryer struct { batches []*ttlBatch } diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index e1d0fd46c004..1455745961c7 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -403,8 +403,12 @@ func (b *batch) Entry(i int) interface{} { } func (b *batch) FreeEntries() { - // Memory queue can't release event references until they're fully acknowledged, - // so do nothing. + // This signals that the event data has been copied out of the batch, and is + // safe to free from the queue buffer, so set all the event pointers to nil. + for i := 0; i < b.count; i++ { + index := (b.start + i) % len(b.queue.buf) + b.queue.buf[index].event = nil + } } func (b *batch) Done() { diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go index 53f8da4b77c6..637e7ccd4fbb 100644 --- a/libbeat/publisher/queue/memqueue/queue_test.go +++ b/libbeat/publisher/queue/memqueue/queue_test.go @@ -438,6 +438,44 @@ func TestEntryIDs(t *testing.T) { }) } +func TestBatchFreeEntries(t *testing.T) { + const queueSize = 10 + const batchSize = 5 + // 1. Add 10 events to the queue, request two batches with 5 events each + // 2. Make sure the queue buffer has 10 non-nil events + // 3. Call FreeEntries on the second batch + // 4. Make sure only events 6-10 are nil + // 5. Call FreeEntries on the first batch + // 6. Make sure all events are nil + testQueue := NewQueue(nil, nil, Settings{Events: queueSize, MaxGetRequest: batchSize, FlushTimeout: time.Second}, 0) + producer := testQueue.Producer(queue.ProducerConfig{}) + for i := 0; i < queueSize; i++ { + _, ok := producer.Publish(i) + require.True(t, ok, "Queue publish must succeed") + } + batch1, err := testQueue.Get(batchSize) + require.NoError(t, err, "Queue read must succeed") + require.Equal(t, batchSize, batch1.Count(), "Returned batch size must match request") + batch2, err := testQueue.Get(batchSize) + require.NoError(t, err, "Queue read must succeed") + require.Equal(t, batchSize, batch2.Count(), "Returned batch size must match request") + // Slight concurrency subtlety: we check events are non-nil after the queue + // reads, since if we do it before we have no way to be sure the insert + // has been completed. + for i := 0; i < queueSize; i++ { + require.NotNil(t, testQueue.buf[i].event, "All queue events must be non-nil") + } + batch2.FreeEntries() + for i := 0; i < batchSize; i++ { + require.NotNilf(t, testQueue.buf[i].event, "Queue index %v: batch 1's events should be unaffected by calling FreeEntries on Batch 2", i) + require.Nilf(t, testQueue.buf[batchSize+i].event, "Queue index %v: batch 2's events should be nil after FreeEntries", batchSize+i) + } + batch1.FreeEntries() + for i := 0; i < queueSize; i++ { + require.Nilf(t, testQueue.buf[i].event, "Queue index %v: all events should be nil after calling FreeEntries on both batches") + } +} + // producerACKWaiter is a helper that can listen to queue producer callbacks // and wait on them from the test thread, so we can test the queue's asynchronous // behavior without relying on time.Sleep. diff --git a/libbeat/publisher/queue/memqueue/runloop.go b/libbeat/publisher/queue/memqueue/runloop.go index 0f7788c62098..45ae3c0a1a2b 100644 --- a/libbeat/publisher/queue/memqueue/runloop.go +++ b/libbeat/publisher/queue/memqueue/runloop.go @@ -187,13 +187,8 @@ func (l *runLoop) handleGetReply(req *getRequest) { } func (l *runLoop) handleDelete(count int) { - // Clear the internal event pointers so they can be garbage collected - for i := 0; i < count; i++ { - index := (l.bufPos + i) % len(l.broker.buf) - l.broker.buf[index].event = nil - } - - // Advance position and counters + // Advance position and counters. Event data was already cleared in + // batch.FreeEntries when the events were vended. l.bufPos = (l.bufPos + count) % len(l.broker.buf) l.eventCount -= count l.consumedCount -= count