diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index e0120ad53e..5c34b93081 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -561,7 +561,14 @@ func (c *consumer) ReconsumeLaterWithCustomProperties(msg Message, customPropert msgID: msgID, }, } - if uint32(reconsumeTimes) > c.dlq.policy.MaxDeliveries { + + maxDeliveries := c.dlq.policy.MaxDeliveries + if s, ok := props[MsgPropertyMaxReconsumeTimes]; ok { + md, _ := strconv.Atoi(s) + maxDeliveries = uint32(md) + } + + if uint32(reconsumeTimes) > maxDeliveries { c.dlq.Chan() <- consumerMsg } else { c.rlq.Chan() <- RetryMessage{ diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 11e72a7240..f2e3d2a4f7 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1839,6 +1839,108 @@ func TestRLQWithCustomProperties(t *testing.T) { assert.Nil(t, checkMsg) } +func TestRLQWithCustomRetryPerMsg(t *testing.T) { + topic := newTopicName() + testURL := adminURL + "/" + "admin/v2/persistent/public/default/" + topic + "/partitions" + makeHTTPCall(t, http.MethodPut, testURL, "3") + + subName := fmt.Sprintf("sub01-%d", time.Now().Unix()) + maxRedeliveries := 100 + N := 100 + ctx := context.Background() + + client, err := NewClient(ClientOptions{URL: lookupURL}) + assert.Nil(t, err) + defer client.Close() + + // 1. Pre-produce N messages + producer, err := client.CreateProducer(ProducerOptions{Topic: topic}) + assert.Nil(t, err) + defer producer.Close() + + for i := 0; i < N; i++ { + _, err = producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("MESSAGE_%d", i)), + Properties: map[string]string{ + MsgPropertyMaxReconsumeTimes: fmt.Sprintf("%d", i%maxRedeliveries), + }, + }) + assert.Nil(t, err) + } + + // 2. Create consumer on the Retry Topic to reconsume N messages + rlqConsumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: subName, + Type: Shared, + SubscriptionInitialPosition: SubscriptionPositionEarliest, + DLQ: &DLQPolicy{ + MaxDeliveries: uint32(maxRedeliveries), + }, + RetryEnable: true, + NackRedeliveryDelay: 1 * time.Second, + }) + assert.Nil(t, err) + defer rlqConsumer.Close() + + rlqReceived := 0 + for rlqReceived < (N/maxRedeliveries)*(maxRedeliveries*(maxRedeliveries+1)/2) { + msg, err := rlqConsumer.Receive(ctx) + assert.Nil(t, err) + rlqConsumer.ReconsumeLater(msg, 1*time.Second) + rlqReceived++ + } + fmt.Println("retry consumed:", rlqReceived) // 5050 + + // No more messages on the Retry Topic + rlqCtx, rlqCancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer rlqCancel() + msg, err := rlqConsumer.Receive(rlqCtx) + assert.Error(t, err) + assert.Nil(t, msg) + + // 3. Create consumer on the DLQ topic to verify the routing + dlqConsumer, err := client.Subscribe(ConsumerOptions{ + Topic: "persistent://public/default/" + topic + "-" + subName + "-DLQ", + SubscriptionName: subName, + SubscriptionInitialPosition: SubscriptionPositionEarliest, + }) + assert.Nil(t, err) + defer dlqConsumer.Close() + + dlqReceived := 0 + for dlqReceived < N { + msg, err := dlqConsumer.Receive(ctx) + assert.Nil(t, err) + dlqConsumer.Ack(msg) + dlqReceived++ + } + fmt.Println("dlq received:", dlqReceived) // 100 + + // No more messages on the DLQ Topic + dlqCtx, dlqCancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer dlqCancel() + msg, err = dlqConsumer.Receive(dlqCtx) + assert.Error(t, err) + assert.Nil(t, msg) + + // 4. No more messages for same subscription + checkConsumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: subName, + Type: Shared, + SubscriptionInitialPosition: SubscriptionPositionEarliest, + }) + assert.Nil(t, err) + defer checkConsumer.Close() + + checkCtx, checkCancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer checkCancel() + checkMsg, err := checkConsumer.Receive(checkCtx) + assert.Error(t, err) + assert.Nil(t, checkMsg) +} + func TestAckWithResponse(t *testing.T) { now := time.Now().Unix() topic01 := fmt.Sprintf("persistent://public/default/topic-%d-01", now) diff --git a/pulsar/retry_router.go b/pulsar/retry_router.go index 7b5f6b8900..38a18e6c20 100644 --- a/pulsar/retry_router.go +++ b/pulsar/retry_router.go @@ -30,6 +30,8 @@ const ( RetryTopicSuffix = "-RETRY" MaxReconsumeTimes = 16 + MsgPropertyMaxReconsumeTimes = "MAX_RECONSUME_TIMES" + SysPropertyDelayTime = "DELAY_TIME" SysPropertyRealTopic = "REAL_TOPIC" SysPropertyRetryTopic = "RETRY_TOPIC"