Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ require (
github.com/99designs/keyring v1.2.1
github.com/AthenZ/athenz v1.12.13
github.com/DataDog/zstd v1.5.0
github.com/bits-and-blooms/bitset v1.4.0
github.com/RoaringBitmap/roaring/v2 v2.8.0
github.com/bits-and-blooms/bitset v1.12.0
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/emirpasic/gods v1.18.1
github.com/docker/docker v28.0.0+incompatible
github.com/docker/go-connections v0.5.0
github.com/golang-jwt/jwt/v5 v5.2.2
Expand Down Expand Up @@ -78,6 +80,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/mtibben/percent v0.2.1 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nxadm/tail v1.4.8 // indirect
Expand Down
11 changes: 9 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ github.com/DataDog/zstd v1.5.0 h1:+K/VEwIAaPcHiMtQvpLD4lqW7f0Gk3xdYZmI1hD+CXo=
github.com/DataDog/zstd v1.5.0/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/RoaringBitmap/roaring/v2 v2.8.0 h1:y1rdtixfXvaITKzkfiKvScI0hlBJHe9sfzJp8cgeM7w=
github.com/RoaringBitmap/roaring/v2 v2.8.0/go.mod h1:FiJcsfkGje/nZBZgCu0ZxCPOKD/hVXDS2dXi7/eUFE0=
github.com/ardielle/ardielle-go v1.5.2 h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4=
github.com/ardielle/ardielle-go v1.5.2/go.mod h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAFAONtv2Dr7HUI=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bits-and-blooms/bitset v1.4.0 h1:+YZ8ePm+He2pU3dZlIZiOeAKfrBkXi1lSrXJ/Xzgbu8=
github.com/bits-and-blooms/bitset v1.4.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA=
github.com/bits-and-blooms/bitset v1.12.0 h1:U/q1fAF7xXRhFCrhROzIfffYnu+dlS38vCZtmFVPHmA=
github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b h1:AP/Y7sqYicnjGDfD5VcY4CIfh1hRXBUavxrvELjTiOE=
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
Expand Down Expand Up @@ -53,6 +55,8 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dvsekhvalnov/jose2go v1.6.0 h1:Y9gnSnP4qEI0+/uQkHvFXeD2PLPJeXEL+ySMEA2EjTY=
github.com/dvsekhvalnov/jose2go v1.6.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
Expand Down Expand Up @@ -154,6 +158,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs=
github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
Expand Down Expand Up @@ -346,6 +352,7 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
Expand Down
5 changes: 5 additions & 0 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ type ConsumerOptions struct {
// processed. Default is 1 min. (See `Consumer.Nack()`)
NackRedeliveryDelay time.Duration

// NackPrecisionBit specifies the precision bit for nack redelivery delay.
// This is used to trim the lower bits of the nack redelivery delay to reduce memory usage.
// Default is 8 bits.
NackPrecisionBit *int64

// Name specifies the consumer name.
Name string

Expand Down
7 changes: 7 additions & 0 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
options.NackBackoffPolicy = new(defaultNackBackoffPolicy)
}

if options.NackPrecisionBit == nil {
options.NackPrecisionBit = ptr(defaultNackPrecisionBit)
} else if *options.NackPrecisionBit < 0 {
return nil, newError(InvalidConfiguration, "NackPrecisionBit cannot be negative")
}

// did the user pass in a message channel?
messageCh := options.MessageChannel
if options.MessageChannel == nil {
Expand Down Expand Up @@ -460,6 +466,7 @@ func newPartitionConsumerOpts(topic, consumerName string, idx int, options Consu
receiverQueueSize: options.ReceiverQueueSize,
nackRedeliveryDelay: nackRedeliveryDelay,
nackBackoffPolicy: options.NackBackoffPolicy,
nackPrecisionBit: options.NackPrecisionBit,
metadata: options.Properties,
subProperties: options.SubscriptionProperties,
replicateSubscriptionState: options.ReplicateSubscriptionState,
Expand Down
4 changes: 3 additions & 1 deletion pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ type partitionConsumerOpts struct {
autoReceiverQueueSize bool
nackRedeliveryDelay time.Duration
nackBackoffPolicy NackBackoffPolicy
nackPrecisionBit *int64
metadata map[string]string
subProperties map[string]string
replicateSubscriptionState bool
Expand Down Expand Up @@ -424,7 +425,8 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon

pc.decryptor = decryptor

pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay, options.nackBackoffPolicy, pc.log)
pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay, options.nackBackoffPolicy, pc.log,
options.nackPrecisionBit)

err := pc.grabConn("")
if err != nil {
Expand Down
96 changes: 96 additions & 0 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1225,6 +1225,102 @@ func TestConsumerNack(t *testing.T) {
}
}

func TestNegativeAckPrecisionBitCnt(t *testing.T) {
// Validate behavior across precision bits and default (nil -> 8)
const delay = 300 * time.Millisecond // Tracker scans every 100ms (delay/3)
ctx := context.Background()

client, err := NewClient(ClientOptions{URL: lookupURL})
assert.Nil(t, err)
defer client.Close()

// Helper to verify behavior for a given NackPrecisionBit and boundary bits.
testPrecisionBitBehavior := func(nackPrecisionBit *int64, boundaryBits int64) {
// Create topic, consumer and producer inside the function
topicName := fmt.Sprintf("testNackPrecisionBit-%d-%d", boundaryBits, time.Now().UnixNano())
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: fmt.Sprintf("sub-%d", boundaryBits),
Type: Shared,
NackRedeliveryDelay: delay,
NackPrecisionBit: nackPrecisionBit, // can be nil for default behavior
})
assert.Nil(t, err)
defer consumer.Close()

producer, err := client.CreateProducer(ProducerOptions{Topic: topicName})
assert.Nil(t, err)
defer producer.Close()

// Align to the next window boundary based on boundaryBits
windowMs := int64(1) << boundaryBits
nowMs := time.Now().UnixMilli()
nextBoundaryMs := ((nowMs / windowMs) + 1) * windowMs // Next boundary
time.Sleep(time.Duration(nextBoundaryMs-nowMs) * time.Millisecond)

// Send first message at the boundary
content1 := fmt.Sprintf("msg1-p%d", boundaryBits)
_, err = producer.Send(ctx, &ProducerMessage{Payload: []byte(content1)})
assert.Nil(t, err)

// Send second message around 3/4 into the window (still in same window)
time.Sleep(time.Duration(windowMs*3/4) * time.Millisecond)
content2 := fmt.Sprintf("msg2-p%d", boundaryBits)
_, err = producer.Send(ctx, &ProducerMessage{Payload: []byte(content2)})
assert.Nil(t, err)

// Receive and nack both messages
m1, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, content1, string(m1.Payload()))
consumer.Nack(m1)
m2, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, content2, string(m2.Payload()))
consumer.Nack(m2)

// Expected redelivery window considering precision and tracker tick
expected := time.Now().Add(delay)
deviation := time.Duration(windowMs) * time.Millisecond

// Both should be redelivered in the same cycle
rm1, err := consumer.Receive(ctx)
assert.Nil(t, err)
redeliveryTime1 := time.Now()
rm2, err := consumer.Receive(ctx)
assert.Nil(t, err)
redeliveryTime2 := time.Now()

// For both the default precision (nil) and precisionBit=8, boundaryBits is 8.
// This checks that the default precisionBit is correctly set to 8,
// and that its redelivery behavior matches a consumer explicitly configured with precisionBit=8.
if boundaryBits == 8 {
assert.InDelta(t, redeliveryTime1.UnixMilli(), redeliveryTime2.UnixMilli(), 1)
}

// Redelivery should occur within [expected-window, expected+buffer]
minExpected := expected.Add(-deviation)
maxExpected := expected.Add(150 * time.Millisecond)
assert.GreaterOrEqual(t, redeliveryTime1.UnixMilli(), minExpected.UnixMilli())
assert.LessOrEqual(t, redeliveryTime2.UnixMilli(), maxExpected.UnixMilli())

consumer.Ack(rm1)
consumer.Ack(rm2)
}

// Run for precision bits 1...8 with matching boundary bits
for bits := int64(1); bits <= int64(8); bits++ {
t.Run(fmt.Sprintf("PrecisionBits=%d", bits), func(_ *testing.T) {
testPrecisionBitBehavior(ptr(bits), bits)
})
}

// Default behavior (nil) should match precision bit 8
t.Run("DefaultPrecisionBits=8", func(_ *testing.T) {
testPrecisionBitBehavior(nil, int64(8))
})
}

func TestConsumerCompression(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
Expand Down
Loading