diff --git a/go.mod b/go.mod index 6c3882091..0e9fb7d88 100644 --- a/go.mod +++ b/go.mod @@ -6,9 +6,11 @@ require ( github.com/99designs/keyring v1.2.1 github.com/AthenZ/athenz v1.12.13 github.com/DataDog/zstd v1.5.0 - github.com/bits-and-blooms/bitset v1.4.0 + github.com/RoaringBitmap/roaring/v2 v2.8.0 + github.com/bits-and-blooms/bitset v1.12.0 github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc + github.com/emirpasic/gods v1.18.1 github.com/docker/docker v28.0.0+incompatible github.com/docker/go-connections v0.5.0 github.com/golang-jwt/jwt/v5 v5.2.2 @@ -78,6 +80,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/morikuni/aec v1.0.0 // indirect + github.com/mschoch/smat v0.2.0 // indirect github.com/mtibben/percent v0.2.1 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/nxadm/tail v1.4.8 // indirect diff --git a/go.sum b/go.sum index c71833344..44ec10eb9 100644 --- a/go.sum +++ b/go.sum @@ -14,12 +14,14 @@ github.com/DataDog/zstd v1.5.0 h1:+K/VEwIAaPcHiMtQvpLD4lqW7f0Gk3xdYZmI1hD+CXo= github.com/DataDog/zstd v1.5.0/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= +github.com/RoaringBitmap/roaring/v2 v2.8.0 h1:y1rdtixfXvaITKzkfiKvScI0hlBJHe9sfzJp8cgeM7w= +github.com/RoaringBitmap/roaring/v2 v2.8.0/go.mod h1:FiJcsfkGje/nZBZgCu0ZxCPOKD/hVXDS2dXi7/eUFE0= github.com/ardielle/ardielle-go v1.5.2 h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4= github.com/ardielle/ardielle-go v1.5.2/go.mod h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAFAONtv2Dr7HUI= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/bits-and-blooms/bitset v1.4.0 h1:+YZ8ePm+He2pU3dZlIZiOeAKfrBkXi1lSrXJ/Xzgbu8= -github.com/bits-and-blooms/bitset v1.4.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA= +github.com/bits-and-blooms/bitset v1.12.0 h1:U/q1fAF7xXRhFCrhROzIfffYnu+dlS38vCZtmFVPHmA= +github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b h1:AP/Y7sqYicnjGDfD5VcY4CIfh1hRXBUavxrvELjTiOE= github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= @@ -53,6 +55,8 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4 github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dvsekhvalnov/jose2go v1.6.0 h1:Y9gnSnP4qEI0+/uQkHvFXeD2PLPJeXEL+ySMEA2EjTY= github.com/dvsekhvalnov/jose2go v1.6.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU= +github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= +github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= @@ -154,6 +158,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= +github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= +github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs= github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= @@ -346,6 +352,7 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU= diff --git a/pulsar/consumer.go b/pulsar/consumer.go index 80bbf01c3..7996335a8 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -191,6 +191,11 @@ type ConsumerOptions struct { // processed. Default is 1 min. (See `Consumer.Nack()`) NackRedeliveryDelay time.Duration + // NackPrecisionBit specifies the precision bit for nack redelivery delay. + // This is used to trim the lower bits of the nack redelivery delay to reduce memory usage. + // Default is 8 bits. + NackPrecisionBit *int64 + // Name specifies the consumer name. Name string diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 20227beec..b3d3194cc 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -121,6 +121,12 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { options.NackBackoffPolicy = new(defaultNackBackoffPolicy) } + if options.NackPrecisionBit == nil { + options.NackPrecisionBit = ptr(defaultNackPrecisionBit) + } else if *options.NackPrecisionBit < 0 { + return nil, newError(InvalidConfiguration, "NackPrecisionBit cannot be negative") + } + // did the user pass in a message channel? messageCh := options.MessageChannel if options.MessageChannel == nil { @@ -460,6 +466,7 @@ func newPartitionConsumerOpts(topic, consumerName string, idx int, options Consu receiverQueueSize: options.ReceiverQueueSize, nackRedeliveryDelay: nackRedeliveryDelay, nackBackoffPolicy: options.NackBackoffPolicy, + nackPrecisionBit: options.NackPrecisionBit, metadata: options.Properties, subProperties: options.SubscriptionProperties, replicateSubscriptionState: options.ReplicateSubscriptionState, diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index fcde4f3aa..bfd44b183 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -104,6 +104,7 @@ type partitionConsumerOpts struct { autoReceiverQueueSize bool nackRedeliveryDelay time.Duration nackBackoffPolicy NackBackoffPolicy + nackPrecisionBit *int64 metadata map[string]string subProperties map[string]string replicateSubscriptionState bool @@ -424,7 +425,8 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon pc.decryptor = decryptor - pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay, options.nackBackoffPolicy, pc.log) + pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay, options.nackBackoffPolicy, pc.log, + options.nackPrecisionBit) err := pc.grabConn("") if err != nil { diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index c36f1f592..9f20ed841 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1225,6 +1225,102 @@ func TestConsumerNack(t *testing.T) { } } +func TestNegativeAckPrecisionBitCnt(t *testing.T) { + // Validate behavior across precision bits and default (nil -> 8) + const delay = 300 * time.Millisecond // Tracker scans every 100ms (delay/3) + ctx := context.Background() + + client, err := NewClient(ClientOptions{URL: lookupURL}) + assert.Nil(t, err) + defer client.Close() + + // Helper to verify behavior for a given NackPrecisionBit and boundary bits. + testPrecisionBitBehavior := func(nackPrecisionBit *int64, boundaryBits int64) { + // Create topic, consumer and producer inside the function + topicName := fmt.Sprintf("testNackPrecisionBit-%d-%d", boundaryBits, time.Now().UnixNano()) + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topicName, + SubscriptionName: fmt.Sprintf("sub-%d", boundaryBits), + Type: Shared, + NackRedeliveryDelay: delay, + NackPrecisionBit: nackPrecisionBit, // can be nil for default behavior + }) + assert.Nil(t, err) + defer consumer.Close() + + producer, err := client.CreateProducer(ProducerOptions{Topic: topicName}) + assert.Nil(t, err) + defer producer.Close() + + // Align to the next window boundary based on boundaryBits + windowMs := int64(1) << boundaryBits + nowMs := time.Now().UnixMilli() + nextBoundaryMs := ((nowMs / windowMs) + 1) * windowMs // Next boundary + time.Sleep(time.Duration(nextBoundaryMs-nowMs) * time.Millisecond) + + // Send first message at the boundary + content1 := fmt.Sprintf("msg1-p%d", boundaryBits) + _, err = producer.Send(ctx, &ProducerMessage{Payload: []byte(content1)}) + assert.Nil(t, err) + + // Send second message around 3/4 into the window (still in same window) + time.Sleep(time.Duration(windowMs*3/4) * time.Millisecond) + content2 := fmt.Sprintf("msg2-p%d", boundaryBits) + _, err = producer.Send(ctx, &ProducerMessage{Payload: []byte(content2)}) + assert.Nil(t, err) + + // Receive and nack both messages + m1, err := consumer.Receive(ctx) + assert.Nil(t, err) + assert.Equal(t, content1, string(m1.Payload())) + consumer.Nack(m1) + m2, err := consumer.Receive(ctx) + assert.Nil(t, err) + assert.Equal(t, content2, string(m2.Payload())) + consumer.Nack(m2) + + // Expected redelivery window considering precision and tracker tick + expected := time.Now().Add(delay) + deviation := time.Duration(windowMs) * time.Millisecond + + // Both should be redelivered in the same cycle + rm1, err := consumer.Receive(ctx) + assert.Nil(t, err) + redeliveryTime1 := time.Now() + rm2, err := consumer.Receive(ctx) + assert.Nil(t, err) + redeliveryTime2 := time.Now() + + // For both the default precision (nil) and precisionBit=8, boundaryBits is 8. + // This checks that the default precisionBit is correctly set to 8, + // and that its redelivery behavior matches a consumer explicitly configured with precisionBit=8. + if boundaryBits == 8 { + assert.InDelta(t, redeliveryTime1.UnixMilli(), redeliveryTime2.UnixMilli(), 1) + } + + // Redelivery should occur within [expected-window, expected+buffer] + minExpected := expected.Add(-deviation) + maxExpected := expected.Add(150 * time.Millisecond) + assert.GreaterOrEqual(t, redeliveryTime1.UnixMilli(), minExpected.UnixMilli()) + assert.LessOrEqual(t, redeliveryTime2.UnixMilli(), maxExpected.UnixMilli()) + + consumer.Ack(rm1) + consumer.Ack(rm2) + } + + // Run for precision bits 1...8 with matching boundary bits + for bits := int64(1); bits <= int64(8); bits++ { + t.Run(fmt.Sprintf("PrecisionBits=%d", bits), func(_ *testing.T) { + testPrecisionBitBehavior(ptr(bits), bits) + }) + } + + // Default behavior (nil) should match precision bit 8 + t.Run("DefaultPrecisionBits=8", func(_ *testing.T) { + testPrecisionBitBehavior(nil, int64(8)) + }) +} + func TestConsumerCompression(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go index 1331c7dfa..ac9f08d79 100644 --- a/pulsar/negative_acks_tracker.go +++ b/pulsar/negative_acks_tracker.go @@ -21,35 +21,57 @@ import ( "sync" "time" + "github.com/RoaringBitmap/roaring/v2/roaring64" log "github.com/apache/pulsar-client-go/pulsar/log" + "github.com/emirpasic/gods/trees/avltree" ) type redeliveryConsumer interface { Redeliver(msgIDs []messageID) } +type ledgerID = int64 + type negativeAcksTracker struct { sync.Mutex - doneCh chan interface{} - doneOnce sync.Once - negativeAcks map[messageID]time.Time - rc redeliveryConsumer - nackBackoff NackBackoffPolicy - tick *time.Ticker - delay time.Duration - log log.Logger + doneCh chan interface{} + doneOnce sync.Once + negativeAcks *avltree.Tree + nackPrecisionBit *int64 + rc redeliveryConsumer + nackBackoff NackBackoffPolicy + tick *time.Ticker + delay time.Duration + log log.Logger } func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, - nackBackoffPolicy NackBackoffPolicy, logger log.Logger) *negativeAcksTracker { + nackBackoffPolicy NackBackoffPolicy, logger log.Logger, nackPrecisionBit *int64) *negativeAcksTracker { t := &negativeAcksTracker{ - doneCh: make(chan interface{}), - negativeAcks: make(map[messageID]time.Time), - rc: rc, - nackBackoff: nackBackoffPolicy, - log: logger, + doneCh: make(chan interface{}), + negativeAcks: avltree.NewWith(func(a, b interface{}) int { + // Perform type assertions and handle invalid types. + timeA, okA := a.(time.Time) + timeB, okB := b.(time.Time) + + if !okA || !okB { + panic("invalid type: both values must be of type time.Time") + } + + // Compare the two time.Time values. + if timeA.Before(timeB) { + return -1 + } else if timeA.After(timeB) { + return 1 + } + return 0 // Equal times. + }), + rc: rc, + nackBackoff: nackBackoffPolicy, + log: logger, + nackPrecisionBit: nackPrecisionBit, } if nackBackoffPolicy != nil { @@ -65,6 +87,37 @@ func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, return t } +func trimLowerBit(ts int64, precisionBit int64) int64 { + if precisionBit <= 0 { + return ts + } + mask := ^((int64(1) << precisionBit) - 1) + return ts & mask +} + +func putNackEntry(t *negativeAcksTracker, batchMsgID *messageID, delay time.Duration) { + t.Lock() + defer t.Unlock() + + targetTime := time.Now().Add(delay) + trimmedTime := time.UnixMilli(trimLowerBit(targetTime.UnixMilli(), *t.nackPrecisionBit)) + // try get trimmedTime + value, exists := t.negativeAcks.Get(trimmedTime) + if !exists { + newMap := make(map[ledgerID]*roaring64.Bitmap) + t.negativeAcks.Put(trimmedTime, newMap) + value = newMap + } + bitmapMap, ok := value.(map[ledgerID]*roaring64.Bitmap) + if !ok { + panic("negativeAcksTracker: value is not of expected type map[LedgerID]*roaring64.Bitmap") + } + if _, exists := bitmapMap[batchMsgID.ledgerID]; !exists { + bitmapMap[batchMsgID.ledgerID] = roaring64.NewBitmap() + } + bitmapMap[batchMsgID.ledgerID].Add(uint64(batchMsgID.entryID)) +} + func (t *negativeAcksTracker) Add(msgID *messageID) { // Always clear up the batch index since we want to track the nack // for the entire batch @@ -74,17 +127,7 @@ func (t *negativeAcksTracker) Add(msgID *messageID) { batchIdx: 0, } - t.Lock() - defer t.Unlock() - - _, present := t.negativeAcks[batchMsgID] - if present { - // The batch is already being tracked - return - } - - targetTime := time.Now().Add(t.delay) - t.negativeAcks[batchMsgID] = targetTime + putNackEntry(t, &batchMsgID, t.delay) } func (t *negativeAcksTracker) AddMessage(msg Message) { @@ -100,17 +143,7 @@ func (t *negativeAcksTracker) AddMessage(msg Message) { batchIdx: 0, } - t.Lock() - defer t.Unlock() - - _, present := t.negativeAcks[batchMsgID] - if present { - // The batch is already being tracked - return - } - - targetTime := time.Now().Add(nackBackoffDelay) - t.negativeAcks[batchMsgID] = targetTime + putNackEntry(t, &batchMsgID, nackBackoffDelay) } func (t *negativeAcksTracker) track() { @@ -127,13 +160,28 @@ func (t *negativeAcksTracker) track() { t.Lock() - for msgID, targetTime := range t.negativeAcks { - t.log.Debugf("MsgId: %v -- targetTime: %v -- now: %v", msgID, targetTime, now) - if targetTime.Before(now) { - t.log.Debugf("Adding MsgId: %v", msgID) - msgIDs = append(msgIDs, msgID) - delete(t.negativeAcks, msgID) + iterator := t.negativeAcks.Iterator() + for iterator.Next() { + targetTime := iterator.Key().(time.Time) + // because use ordered map, so we can early break + if targetTime.After(now) { + break + } + + ledgerMap := iterator.Value().(map[ledgerID]*roaring64.Bitmap) + for ledgerID, entrySet := range ledgerMap { + for _, entryID := range entrySet.ToArray() { + msgID := messageID{ + ledgerID: ledgerID, + entryID: int64(entryID), + batchIdx: 0, + } + msgIDs = append(msgIDs, msgID) + } } + + // Safe deletion during iteration + t.negativeAcks.Remove(targetTime) } t.Unlock() @@ -153,3 +201,7 @@ func (t *negativeAcksTracker) Close() { t.doneCh <- nil }) } + +func ptr[T any](v T) *T { return &v } + +const defaultNackPrecisionBit = int64(8) diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go index 3f03ac446..bf3953462 100644 --- a/pulsar/negative_acks_tracker_test.go +++ b/pulsar/negative_acks_tracker_test.go @@ -29,6 +29,8 @@ import ( const testNackDelay = 300 * time.Millisecond +var testNackPrecisionBit = ptr(defaultNackPrecisionBit) + type nackMockedConsumer struct { ch chan messageID closed bool @@ -80,7 +82,7 @@ func (nmc *nackMockedConsumer) Wait() <-chan messageID { func TestNacksTracker(t *testing.T) { nmc := newNackMockedConsumer(nil) - nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, log.DefaultNopLogger()) + nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, log.DefaultNopLogger(), testNackPrecisionBit) nacks.Add(&messageID{ ledgerID: 1, @@ -113,7 +115,7 @@ func TestNacksTracker(t *testing.T) { func TestNacksWithBatchesTracker(t *testing.T) { nmc := newNackMockedConsumer(nil) - nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, log.DefaultNopLogger()) + nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, log.DefaultNopLogger(), testNackPrecisionBit) nacks.Add(&messageID{ ledgerID: 1, @@ -156,7 +158,8 @@ func TestNacksWithBatchesTracker(t *testing.T) { func TestNackBackoffTracker(t *testing.T) { nmc := newNackMockedConsumer(new(defaultNackBackoffPolicy)) - nacks := newNegativeAcksTracker(nmc, testNackDelay, new(defaultNackBackoffPolicy), log.DefaultNopLogger()) + nacks := newNegativeAcksTracker(nmc, testNackDelay, new(defaultNackBackoffPolicy), log.DefaultNopLogger(), + testNackPrecisionBit) nacks.AddMessage(new(mockMessage1)) nacks.AddMessage(new(mockMessage2))