From 074c1dd6573ec512e3e86ce41af14f836397e159 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Fri, 31 May 2024 11:46:52 -0400 Subject: [PATCH] [libbeat] Remove "producer cancel" features from queue API (#39760) "Producer cancel" is a feature that allows closing queue producers to also cancel any pending events created by that producer that have not yet been sent to a queue reader. It was introduced as a small part of a [very large refactor](https://github.com/elastic/beats/pull/4492) in 2017, but current code doesn't depend on it for anything. Since this feature adds considerable complexity to the queue API and implementation, this PR removes the feature and associated helpers. This PR should cause no user-visible behavior change. --- libbeat/publisher/pipeline/client.go | 15 +-- libbeat/publisher/pipeline/controller.go | 3 +- libbeat/publisher/pipeline/pipeline.go | 16 --- libbeat/publisher/pipeline/pipeline_test.go | 23 ++-- libbeat/publisher/queue/diskqueue/producer.go | 9 +- libbeat/publisher/queue/memqueue/broker.go | 7 +- .../publisher/queue/memqueue/internal_api.go | 9 -- libbeat/publisher/queue/memqueue/produce.go | 30 +---- .../publisher/queue/memqueue/queue_test.go | 14 +-- libbeat/publisher/queue/memqueue/runloop.go | 74 ++---------- .../publisher/queue/memqueue/runloop_test.go | 4 +- libbeat/publisher/queue/queue.go | 20 +--- .../queue/queuetest/producer_cancel.go | 106 ------------------ 13 files changed, 36 insertions(+), 294 deletions(-) delete mode 100644 libbeat/publisher/queue/queuetest/producer_cancel.go diff --git a/libbeat/publisher/pipeline/client.go b/libbeat/publisher/pipeline/client.go index a5c02faace6d..7ecce6fd8c70 100644 --- a/libbeat/publisher/pipeline/client.go +++ b/libbeat/publisher/pipeline/client.go @@ -146,8 +146,8 @@ func (c *client) Close() error { c.logger.Debug("client: done closing acker") c.logger.Debug("client: close queue producer") - cancelledEventCount := c.producer.Cancel() - c.onClosed(cancelledEventCount) + c.producer.Close() + c.onClosed() c.logger.Debug("client: done producer close") if c.processors != nil { @@ -168,16 +168,7 @@ func (c *client) onClosing() { } } -func (c *client) onClosed(cancelledEventCount int) { - c.logger.Debugf("client: cancelled %v events", cancelledEventCount) - - if c.eventWaitGroup != nil { - c.logger.Debugf("client: remove client events") - if cancelledEventCount > 0 { - c.eventWaitGroup.Add(-cancelledEventCount) - } - } - +func (c *client) onClosed() { c.observer.clientClosed() if c.clientListener != nil { c.clientListener.Closed() diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index bb75c9619c57..b34d6a64d2c0 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -303,6 +303,5 @@ func (emptyProducer) TryPublish(_ queue.Entry) (queue.EntryID, bool) { return 0, false } -func (emptyProducer) Cancel() int { - return 0 +func (emptyProducer) Close() { } diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index 85eeb0e64977..dbe87681ea63 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -71,10 +71,6 @@ type Pipeline struct { waitCloseTimeout time.Duration eventWaitGroup *sync.WaitGroup - // closeRef signal propagation support - guardStartSigPropagation sync.Once - sigNewClient chan *client - processors processing.Supporter } @@ -250,18 +246,6 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { producerCfg := queue.ProducerConfig{} - if client.eventWaitGroup != nil || cfg.ClientListener != nil { - producerCfg.OnDrop = func(event queue.Entry) { - publisherEvent, _ := event.(publisher.Event) - if cfg.ClientListener != nil { - cfg.ClientListener.DroppedOnPublish(publisherEvent.Content) - } - if client.eventWaitGroup != nil { - client.eventWaitGroup.Add(-1) - } - } - } - var waiter *clientCloseWaiter if waitClose > 0 { waiter = newClientCloseWaiter(waitClose) diff --git a/libbeat/publisher/pipeline/pipeline_test.go b/libbeat/publisher/pipeline/pipeline_test.go index feb01c4fa6e0..78725b043f1a 100644 --- a/libbeat/publisher/pipeline/pipeline_test.go +++ b/libbeat/publisher/pipeline/pipeline_test.go @@ -93,7 +93,6 @@ func makeDiscardQueue() queue.Queue { producer: func(cfg queue.ProducerConfig) queue.Producer { producerID.Inc() - id := producerID.Load() // count is a counter that increments on every published event // it's also the returned Event ID @@ -103,10 +102,8 @@ func makeDiscardQueue() queue.Queue { count++ return queue.EntryID(count), true }, - cancel: func() int { - + cancel: func() { wg.Done() - return id }, } @@ -125,7 +122,7 @@ type testQueue struct { type testProducer struct { publish func(try bool, event queue.Entry) (queue.EntryID, bool) - cancel func() int + cancel func() } func (q *testQueue) Metrics() (queue.Metrics, error) { @@ -178,11 +175,10 @@ func (p *testProducer) TryPublish(event queue.Entry) (queue.EntryID, bool) { return 0, false } -func (p *testProducer) Cancel() int { +func (p *testProducer) Close() { if p.cancel != nil { - return p.cancel() + p.cancel() } - return 0 } func makeTestQueue() queue.Queue { @@ -194,7 +190,7 @@ func makeTestQueue() queue.Queue { close: func() error { mux.Lock() for producer := range producers { - producer.Cancel() + producer.Close() } mux.Unlock() @@ -216,15 +212,11 @@ func makeTestQueue() queue.Queue { } return p.Publish(event) }, - cancel: func() int { - i := p.Cancel() - + cancel: func() { mux.Lock() defer mux.Unlock() delete(producers, producer) wg.Done() - - return i }, } @@ -248,9 +240,8 @@ func blockingProducer(_ queue.ProducerConfig) queue.Producer { return 0, false }, - cancel: func() int { + cancel: func() { close(sig) - return waiting.Load() }, } } diff --git a/libbeat/publisher/queue/diskqueue/producer.go b/libbeat/publisher/queue/diskqueue/producer.go index 69725c62ccc1..7d084adf5ea4 100644 --- a/libbeat/publisher/queue/diskqueue/producer.go +++ b/libbeat/publisher/queue/diskqueue/producer.go @@ -94,15 +94,10 @@ func (producer *diskQueueProducer) publish( } } -func (producer *diskQueueProducer) Cancel() int { +func (producer *diskQueueProducer) Close() { if producer.cancelled { - return 0 + return } producer.cancelled = true close(producer.done) - - // TODO (possibly?): message the core loop to remove any pending events that - // were sent through this producer. If we do, return the number of cancelled - // events here instead of zero. - return 0 } diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index a42215f48a67..d9aff10bd3ac 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -66,10 +66,6 @@ type broker struct { // Consumers send requests to getChan to read events from the queue. getChan chan getRequest - // Producers send requests to cancelChan to cancel events they've - // sent so far that have not yet reached a consumer. - cancelChan chan producerCancelRequest - // Metrics() sends requests to metricChan to expose internal queue // metrics to external callers. metricChan chan metricsRequest @@ -224,7 +220,6 @@ func newQueue( // broker API channels pushChan: make(chan pushRequest, chanSize), getChan: make(chan getRequest), - cancelChan: make(chan producerCancelRequest, 5), metricChan: make(chan metricsRequest), // internal runLoop and ackLoop channels @@ -264,7 +259,7 @@ func (b *broker) Producer(cfg queue.ProducerConfig) queue.Producer { if b.encoderFactory != nil { encoder = b.encoderFactory() } - return newProducer(b, cfg.ACK, cfg.OnDrop, cfg.DropOnCancel, encoder) + return newProducer(b, cfg.ACK, encoder) } func (b *broker) Get(count int) (queue.Batch, error) { diff --git a/libbeat/publisher/queue/memqueue/internal_api.go b/libbeat/publisher/queue/memqueue/internal_api.go index 95b5e0eba90f..6575472edbd0 100644 --- a/libbeat/publisher/queue/memqueue/internal_api.go +++ b/libbeat/publisher/queue/memqueue/internal_api.go @@ -38,15 +38,6 @@ type pushRequest struct { resp chan queue.EntryID } -type producerCancelRequest struct { - producer *ackProducer - resp chan producerCancelResponse -} - -type producerCancelResponse struct { - removed int -} - // consumer -> broker API type getRequest struct { diff --git a/libbeat/publisher/queue/memqueue/produce.go b/libbeat/publisher/queue/memqueue/produce.go index 55f15a8cc869..a206e357aacb 100644 --- a/libbeat/publisher/queue/memqueue/produce.go +++ b/libbeat/publisher/queue/memqueue/produce.go @@ -29,7 +29,6 @@ type forgetfulProducer struct { type ackProducer struct { broker *broker - dropOnCancel bool producedCount uint64 state produceState openState openState @@ -50,15 +49,13 @@ type openState struct { type producerID uint64 type produceState struct { - cb ackHandler - dropCB func(queue.Entry) - cancelled bool - lastACK producerID + cb ackHandler + lastACK producerID } type ackHandler func(count int) -func newProducer(b *broker, cb ackHandler, dropCB func(queue.Entry), dropOnCancel bool, encoder queue.Encoder) queue.Producer { +func newProducer(b *broker, cb ackHandler, encoder queue.Encoder) queue.Producer { openState := openState{ log: b.logger, done: make(chan struct{}), @@ -68,9 +65,8 @@ func newProducer(b *broker, cb ackHandler, dropCB func(queue.Entry), dropOnCance } if cb != nil { - p := &ackProducer{broker: b, dropOnCancel: dropOnCancel, openState: openState} + p := &ackProducer{broker: b, openState: openState} p.state.cb = cb - p.state.dropCB = dropCB return p } return &forgetfulProducer{broker: b, openState: openState} @@ -91,9 +87,8 @@ func (p *forgetfulProducer) TryPublish(event queue.Entry) (queue.EntryID, bool) return p.openState.tryPublish(p.makePushRequest(event)) } -func (p *forgetfulProducer) Cancel() int { +func (p *forgetfulProducer) Close() { p.openState.Close() - return 0 } func (p *ackProducer) makePushRequest(event queue.Entry) pushRequest { @@ -123,21 +118,8 @@ func (p *ackProducer) TryPublish(event queue.Entry) (queue.EntryID, bool) { return id, published } -func (p *ackProducer) Cancel() int { +func (p *ackProducer) Close() { p.openState.Close() - - if p.dropOnCancel { - ch := make(chan producerCancelResponse) - p.broker.cancelChan <- producerCancelRequest{ - producer: p, - resp: ch, - } - - // wait for cancel to being processed - resp := <-ch - return resp.removed - } - return 0 } func (st *openState) Close() { diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go index 41228046c532..5ebf6b6f6fb5 100644 --- a/libbeat/publisher/queue/memqueue/queue_test.go +++ b/libbeat/publisher/queue/memqueue/queue_test.go @@ -96,9 +96,7 @@ func TestProducerDoesNotBlockWhenQueueClosed(t *testing.T) { p := q.Producer(queue.ProducerConfig{ // We do not read from the queue, so the callbacks are never called - ACK: func(count int) {}, - OnDrop: func(e queue.Entry) {}, - DropOnCancel: false, + ACK: func(count int) {}, }) success := atomic.Bool{} @@ -170,10 +168,6 @@ func TestProducerClosePreservesEventCount(t *testing.T) { ACK: func(count int) { activeEvents.Add(-int64(count)) }, - OnDrop: func(e queue.Entry) { - //activeEvents.Add(-1) - }, - DropOnCancel: false, }) // Asynchronously, send 4 events to the queue. @@ -209,7 +203,7 @@ func TestProducerClosePreservesEventCount(t *testing.T) { // Cancel the producer, then read and acknowledge two batches. If the // Publish calls and the queue code are working, activeEvents should // _usually_ end up as 0, but _always_ end up non-negative. - p.Cancel() + p.Close() // The queue reads also need to be done in a goroutine, in case the // producer cancellation signal went through before the Publish @@ -297,10 +291,6 @@ func queueMetricsAreValid(t *testing.T, q queue.Queue, evtCount, evtLimit, occup assert.Equal(t, testMetrics.UnackedConsumedEvents.ValueOr(0), uint64(occupied), "incorrect OccupiedRead for %s", test) } -func TestProducerCancelRemovesEvents(t *testing.T) { - queuetest.TestProducerCancelRemovesEvents(t, makeTestQueue(1024, 0, 0)) -} - func makeTestQueue(sz, minEvents int, flushTimeout time.Duration) queuetest.QueueFactory { return func(_ *testing.T) queue.Queue { return NewQueue(nil, nil, Settings{ diff --git a/libbeat/publisher/queue/memqueue/runloop.go b/libbeat/publisher/queue/memqueue/runloop.go index 45ae3c0a1a2b..ed14106f20c9 100644 --- a/libbeat/publisher/queue/memqueue/runloop.go +++ b/libbeat/publisher/queue/memqueue/runloop.go @@ -122,9 +122,6 @@ func (l *runLoop) runIteration() { case req := <-pushChan: // producer pushing new event l.handleInsert(&req) - case req := <-l.broker.cancelChan: // producer cancelling active events - l.handleCancel(&req) - case req := <-getChan: // consumer asking for next batch l.handleGetRequest(&req) @@ -195,16 +192,15 @@ func (l *runLoop) handleDelete(count int) { } func (l *runLoop) handleInsert(req *pushRequest) { - if l.insert(req, l.nextEntryID) { - // Send back the new event id. - req.resp <- l.nextEntryID + l.insert(req, l.nextEntryID) + // Send back the new event id. + req.resp <- l.nextEntryID - l.nextEntryID++ - l.eventCount++ + l.nextEntryID++ + l.eventCount++ - // See if this gave us enough for a new batch - l.maybeUnblockGetRequest() - } + // See if this gave us enough for a new batch + l.maybeUnblockGetRequest() } // Checks if we can handle pendingGetRequest yet, and handles it if so @@ -223,13 +219,7 @@ func (l *runLoop) maybeUnblockGetRequest() { } } -// Returns true if the event was inserted, false if insertion was cancelled. -func (l *runLoop) insert(req *pushRequest, id queue.EntryID) bool { - if req.producer != nil && req.producer.state.cancelled { - reportCancelledState(req) - return false - } - +func (l *runLoop) insert(req *pushRequest, id queue.EntryID) { index := (l.bufPos + l.eventCount) % len(l.broker.buf) l.broker.buf[index] = queueEntry{ event: req.event, @@ -237,7 +227,6 @@ func (l *runLoop) insert(req *pushRequest, id queue.EntryID) bool { producer: req.producer, producerID: req.producerID, } - return true } func (l *runLoop) handleMetricsRequest(req *metricsRequest) { @@ -253,50 +242,3 @@ func (l *runLoop) handleMetricsRequest(req *metricsRequest) { oldestEntryID: oldestEntryID, } } - -func (l *runLoop) handleCancel(req *producerCancelRequest) { - var removedCount int - - // Traverse all unconsumed events in the buffer, removing any with - // the specified producer. As we go we condense all the remaining - // events to be sequential. - buf := l.broker.buf - startIndex := l.bufPos + l.consumedCount - unconsumedEventCount := l.eventCount - l.consumedCount - for i := 0; i < unconsumedEventCount; i++ { - readIndex := (startIndex + i) % len(buf) - if buf[readIndex].producer == req.producer { - // The producer matches, skip this event - removedCount++ - } else { - // Move the event to its final position after accounting for any - // earlier indices that were removed. - // (Count backwards from (startIndex + i), not from readIndex, to avoid - // sign issues when the buffer wraps.) - writeIndex := (startIndex + i - removedCount) % len(buf) - buf[writeIndex] = buf[readIndex] - } - } - - // Clear the event pointers at the end of the buffer so we don't keep - // old events in memory by accident. - for i := 0; i < removedCount; i++ { - index := (l.bufPos + l.eventCount - removedCount + i) % len(buf) - buf[index].event = nil - } - - // Subtract removed events from the internal event count - l.eventCount -= removedCount - - // signal cancel request being finished - if req.resp != nil { - req.resp <- producerCancelResponse{removed: removedCount} - } -} - -func reportCancelledState(req *pushRequest) { - // do not add waiting events if producer did send cancel signal - if cb := req.producer.state.dropCB; cb != nil { - cb(req.event) - } -} diff --git a/libbeat/publisher/queue/memqueue/runloop_test.go b/libbeat/publisher/queue/memqueue/runloop_test.go index d25537265ea3..266704fc1fde 100644 --- a/libbeat/publisher/queue/memqueue/runloop_test.go +++ b/libbeat/publisher/queue/memqueue/runloop_test.go @@ -44,7 +44,7 @@ func TestFlushSettingsDoNotBlockFullBatches(t *testing.T) { }, 10, nil) - producer := newProducer(broker, nil, nil, false, nil) + producer := newProducer(broker, nil, nil) rl := broker.runLoop for i := 0; i < 100; i++ { // Pair each publish call with an iteration of the run loop so we @@ -83,7 +83,7 @@ func TestFlushSettingsBlockPartialBatches(t *testing.T) { }, 10, nil) - producer := newProducer(broker, nil, nil, false, nil) + producer := newProducer(broker, nil, nil) rl := broker.runLoop for i := 0; i < 100; i++ { // Pair each publish call with an iteration of the run loop so we diff --git a/libbeat/publisher/queue/queue.go b/libbeat/publisher/queue/queue.go index 8758c055945f..9c186ad30d0d 100644 --- a/libbeat/publisher/queue/queue.go +++ b/libbeat/publisher/queue/queue.go @@ -106,16 +106,6 @@ type ProducerConfig struct { // if ACK is set, the callback will be called with number of events produced // by the producer instance and being ACKed by the queue. ACK func(count int) - - // OnDrop is called to report events being silently dropped by - // the queue. Currently this can only happen when a Publish call is sent - // to the memory queue's request channel but the producer is cancelled - // before it reaches the queue buffer. - OnDrop func(Entry) - - // DropOnCancel is a hint to the queue to drop events if the producer disconnects - // via Cancel. - DropOnCancel bool } type EntryID uint64 @@ -134,12 +124,10 @@ type Producer interface { // the event's assigned ID, and false otherwise. TryPublish(entry Entry) (EntryID, bool) - // Cancel closes this Producer endpoint. If the producer is configured to - // drop its entries on Cancel, the number of dropped entries is returned. - // Note: A queue may still send ACK signals even after Cancel is called on - // the originating Producer. The pipeline client must accept and - // discard these ACKs. - Cancel() int + // Close closes this Producer endpoint. + // Note: A queue may still send ACK signals even after Close is called on + // the originating Producer. The pipeline client must accept these ACKs. + Close() } // Batch of entries (usually publisher.Event) to be returned to Consumers. diff --git a/libbeat/publisher/queue/queuetest/producer_cancel.go b/libbeat/publisher/queue/queuetest/producer_cancel.go deleted file mode 100644 index 6bb8a9bdd083..000000000000 --- a/libbeat/publisher/queue/queuetest/producer_cancel.go +++ /dev/null @@ -1,106 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package queuetest - -import ( - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/elastic/beats/v7/libbeat/publisher" - "github.com/elastic/beats/v7/libbeat/publisher/queue" - "github.com/elastic/elastic-agent-libs/mapstr" -) - -// TestSingleProducerConsumer tests buffered events for a producer getting -// cancelled will not be consumed anymore. Concurrent producer/consumer pairs -// might still have active events not yet ACKed (not tested here). -// -// Note: queues not requiring consumers to ACK a events in order to -// return ACKs to the producer are not supported by this test. -func TestProducerCancelRemovesEvents(t *testing.T, factory QueueFactory) { - fn := withOptLogOutput(true, func(t *testing.T) { - var ( - i int - N1 = 3 - N2 = 10 - ) - - log := NewTestLogger(t) - b := factory(t) - defer b.Close() - - log.Debug("create first producer") - producer := b.Producer(queue.ProducerConfig{ - ACK: func(int) {}, // install function pointer, so 'cancel' will remove events - DropOnCancel: true, - }) - - for ; i < N1; i++ { - log.Debugf("send event %v to first producer", i) - producer.Publish(MakeEvent(mapstr.M{ - "value": i, - })) - } - - // cancel producer - log.Debugf("cancel producer") - producer.Cancel() - - // reconnect and send some more events - log.Debug("connect new producer") - producer = b.Producer(queue.ProducerConfig{}) - for ; i < N2; i++ { - log.Debugf("send event %v to new producer", i) - producer.Publish(MakeEvent(mapstr.M{ - "value": i, - })) - } - - // consume all events - total := N2 - N1 - events := make([]interface{}, 0, total) - for len(events) < total { - batch, err := b.Get(-1) // collect all events - if err != nil { - panic(err) - } - - for i := 0; i < batch.Count(); i++ { - events = append(events, batch.Entry(i)) - } - batch.Done() - } - - // verify - if total != len(events) { - assert.Equal(t, total, len(events)) - return - } - - for i, event := range events { - pubEvent, ok := event.(publisher.Event) - assert.True(t, ok, "queue output should be the same type as its input") - value, ok := pubEvent.Content.Fields["value"].(int) - assert.True(t, ok, "event.value should be an int") - assert.Equal(t, i+N1, value) - } - }) - - fn(t) -}