[Bug] consumers stops receiving new messages due to invalid blockedConsumerOnUnackedMsgs state #22657

Open
2 of 3 tasks
180254 opened this issue May 6, 2024 · 23 comments · May be fixed by #23796
Labels: release/blocker, type/bug

Comments

@180254

180254 commented May 6, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

Version

pulsar server: docker image apachepulsar/pulsar:3.0.4 + helm chart pulsar-helm-chart
pulsar client: java client org.apache.pulsar:pulsar-client:3.0.4

Minimal reproduce step

After updating Apache Pulsar, we noticed that one of our consumers sometimes stops receiving new messages for some topics.
The last fully working version for us is 3.0.1. I have tested every later release so far, as well as a build of branch-3.0.

I looked through the commits and determined when our service stops working:

  • Last commit where our service works properly: 80a8f8d
  • Commit which breaks our service (it no longer works properly): 6e59208

I ran a test on the last commit from branch-3.0 (fd823f6) with the individualAckNormal method reverted to its last version before the "commit which breaks". The change looks as follows: 180254@6dac4bf. With the modified code, the problem does not occur.

I found nothing in the logs that would indicate that a consumer was suspended or anything similar. There are no unusual log entries at all. Restarting the Kubernetes pod that runs the consumers helps for a while.

What did you expect to see?

The consumer receives all messages.

What did you see instead?

Consumers stop receiving new messages for some topics.

Anything else?

The configuration we use:

  • broker configuration:
broker:
    managedLedgerDefaultEnsembleSize: "3"
    managedLedgerDefaultWriteQuorum: "3"
    managedLedgerDefaultAckQuorum: "2"
    brokerDeduplicationEnabled: "true"
    bookkeeperClientTimeoutInSeconds: "5"
    bookkeeperClientHealthCheckErrorThresholdPerInterval: "3"
    bookkeeperClientHealthCheckQuarantineTimeInSeconds: "600"
  • namespace configuration:
bin/pulsar-admin --admin-url "${ADMIN_URL}" namespaces create "${TENANT}/service"
bin/pulsar-admin --admin-url "${ADMIN_URL}" namespaces set-max-unacked-messages-per-consumer -c 10 "${TENANT}/service"
bin/pulsar-admin --admin-url "${ADMIN_URL}" namespaces set-max-unacked-messages-per-subscription -c 20 "${TENANT}/service"
  • the topic is persistent:
persistent://public/service/service12_some_topic_someotherpart
  • we use PatternMultiTopicsConsumerImpl
   return pulsarClient
        .newConsumer(Schema.STRING)
        .subscriptionName(pulsarServerBasename)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscriptionType(SubscriptionType.Shared)
        .topicsPattern(Pattern.compile("persistent://public/service/service12_.+"))
        .negativeAckRedeliveryDelay(1000, TimeUnit.MILLISECONDS)
        .patternAutoDiscoveryPeriod(60, TimeUnit.SECONDS)
        .receiverQueueSize(1)

We can reproduce it on our service. Test scenario: the service handles approximately 20 customers (== 20 topics), each with about 20 messages per second. One message is processed in approximately 200 ms. The problem occurs for some of the topics in the test, not for all of them.
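As a rough back-of-the-envelope view of that load, assuming steady state and the approximate figures above, the number of messages being processed at any moment is the arrival rate times the processing time. The class and method names below are made up for illustration only.

```java
class LoadEstimate {
    /** Messages concurrently in processing = arrival rate (msg/s) x processing time (s). */
    static double inFlight(double messagesPerSecond, double processingMillis) {
        return messagesPerSecond * processingMillis / 1000.0;
    }

    public static void main(String[] args) {
        int topics = 20;                       // approximate number of topics in the test
        double perTopic = inFlight(20, 200);   // 20 msg/s, about 200 ms per message
        System.out.println("in flight per topic: " + perTopic);
        System.out.println("in flight in total:  " + perTopic * topics);
    }
}
```

With the numbers from the test scenario this gives about 4 messages in processing per topic and about 80 across all 20 topics.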

When a problem occurs:

  • pulsar_subscription_back_log metric shows that the backlog is growing
  • pulsar_subscription_unacked_messages metric shows 0
  • pulsar_subscription_blocked_on_unacked_messages metric shows 0
  • in the service metrics (consume & process pod), I do not see that any messages for the broken topic are being processed

Are you willing to submit a PR?

  • I'm willing to submit a PR!
180254 added the type/bug label on May 6, 2024
@lhotari
Member

lhotari commented May 8, 2024

Thanks for the great issue report @180254.

@poorbarcode or @Technoboy- do you have a chance to take a look at this issue report?

@lhotari
Member

lhotari commented May 8, 2024

@180254 in your case, can you detect the issue from the topic stats? For example, do they show "blockedSubscriptionOnUnackedMsgs": true?

@lhotari
Member

lhotari commented May 8, 2024

  • pulsar_subscription_blocked_on_unacked_messages metric shows 0

Sorry, I only noticed this now. I guess the topic stats wouldn't show "blockedSubscriptionOnUnackedMsgs": true either?

@AdrianPedziwiatr-TomTom

AdrianPedziwiatr-TomTom commented May 9, 2024

Here are the statistics for a "broken topic", collected after a test.

{
  "msgRateIn" : 0.0,
  "msgThroughputIn" : 0.0,
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "bytesInCounter" : 943097,
  "msgInCounter" : 6052,
  "bytesOutCounter" : 15027,
  "msgOutCounter" : 109,
  "averageMsgSize" : 0.0,
  "msgChunkPublished" : false,
  "storageSize" : 943097,
  "backlogSize" : 934725,
  "publishRateLimitedTimes" : 0,
  "earliestMsgPublishTimeInBacklogs" : 0,
  "offloadedStorageSize" : 0,
  "lastOffloadLedgerId" : 0,
  "lastOffloadSuccessTimeStamp" : 0,
  "lastOffloadFailureTimeStamp" : 0,
  "ongoingTxnCount" : 0,
  "abortedTxnCount" : 0,
  "committedTxnCount" : 0,
  "publishers" : [ {
    "accessMode" : "Shared",
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "chunkedMessageRate" : 0.0,
    "producerId" : 9,
    "supportsPartialProducer" : false,
    "metadata" : { },
    "address" : "/10.240.0.18:42256",
    "producerName" : "pulsar-3-11",
    "connectedSince" : "2024-05-09T07:11:32.288324634Z",
    "clientVersion" : "Pulsar-Java-v3.0.4"
  }, {
    "accessMode" : "Shared",
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "chunkedMessageRate" : 0.0,
    "producerId" : 14,
    "supportsPartialProducer" : false,
    "metadata" : { },
    "address" : "/10.240.0.131:44020",
    "producerName" : "pulsar-3-13",
    "connectedSince" : "2024-05-09T07:11:32.28602009Z",
    "clientVersion" : "Pulsar-Java-v3.0.4"
  }, {
    "accessMode" : "Shared",
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "chunkedMessageRate" : 0.0,
    "producerId" : 21,
    "supportsPartialProducer" : false,
    "metadata" : { },
    "address" : "/10.240.0.18:37600",
    "producerName" : "pulsar-3-17",
    "connectedSince" : "2024-05-09T07:11:32.289208551Z",
    "clientVersion" : "Pulsar-Java-v3.0.4"
  }, {
    "accessMode" : "Shared",
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "chunkedMessageRate" : 0.0,
    "producerId" : 20,
    "supportsPartialProducer" : false,
    "metadata" : { },
    "address" : "/10.240.0.152:43668",
    "producerName" : "pulsar-3-22",
    "connectedSince" : "2024-05-09T07:11:32.315173653Z",
    "clientVersion" : "Pulsar-Java-v3.0.4"
  } ],
  "waitingPublishers" : 0,
  "subscriptions" : {
    "pulsartestad10" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesOutCounter" : 15027,
      "msgOutCounter" : 109,
      "msgRateRedeliver" : 0.0,
      "messageAckRate" : 0.0,
      "chunkedMessageRate" : 0,
      "msgBacklog" : 5777,
      "backlogSize" : 934725,
      "earliestMsgPublishTimeInBacklog" : 0,
      "msgBacklogNoDelayed" : 5777,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "type" : "Shared",
      "msgRateExpired" : 0.0,
      "totalMsgExpired" : 0,
      "lastExpireTimestamp" : 0,
      "lastConsumedFlowTimestamp" : 1715238695970,
      "lastConsumedTimestamp" : 1715238695970,
      "lastAckedTimestamp" : 1715238696259,
      "lastMarkDeleteAdvancedTimestamp" : 1715238696259,
      "consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 2346,
        "msgOutCounter" : 19,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.0,
        "chunkedMessageRate" : 0.0,
        "consumerName" : "9a385",
        "availablePermits" : -17,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 3,
        "blockedConsumerOnUnackedMsgs" : true,
        "lastAckedTimestamp" : 1715238694438,
        "lastConsumedTimestamp" : 1715238694116,
        "lastConsumedFlowTimestamp" : 1715238694124,
        "metadata" : { },
        "address" : "/10.240.0.131:44036",
        "connectedSince" : "2024-05-09T07:11:32.290486476Z",
        "clientVersion" : "Pulsar-Java-v3.0.4",
        "lastAckedTime" : "2024-05-09T07:11:34.438Z",
        "lastConsumedTime" : "2024-05-09T07:11:34.116Z"
      }, {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 1357,
        "msgOutCounter" : 11,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.0,
        "chunkedMessageRate" : 0.0,
        "consumerName" : "58a5f",
        "availablePermits" : -10,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 11,
        "blockedConsumerOnUnackedMsgs" : true,
        "lastAckedTimestamp" : 1715238693873,
        "lastConsumedTimestamp" : 1715238692374,
        "lastConsumedFlowTimestamp" : 1715238692409,
        "metadata" : { },
        "address" : "/10.240.0.131:44042",
        "connectedSince" : "2024-05-09T07:11:32.291014886Z",
        "clientVersion" : "Pulsar-Java-v3.0.4",
        "lastAckedTime" : "2024-05-09T07:11:33.873Z",
        "lastConsumedTime" : "2024-05-09T07:11:32.374Z"
      }, {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 1476,
        "msgOutCounter" : 12,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.0,
        "chunkedMessageRate" : 0.0,
        "consumerName" : "c4fba",
        "availablePermits" : -11,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 12,
        "blockedConsumerOnUnackedMsgs" : true,
        "lastAckedTimestamp" : 1715238694112,
        "lastConsumedTimestamp" : 1715238692413,
        "lastConsumedFlowTimestamp" : 1715238692543,
        "metadata" : { },
        "address" : "/10.240.0.131:44008",
        "connectedSince" : "2024-05-09T07:11:32.291343692Z",
        "clientVersion" : "Pulsar-Java-v3.0.4",
        "lastAckedTime" : "2024-05-09T07:11:34.112Z",
        "lastConsumedTime" : "2024-05-09T07:11:32.413Z"
      }, {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 9848,
        "msgOutCounter" : 67,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.0,
        "chunkedMessageRate" : 0.0,
        "consumerName" : "cff0d",
        "availablePermits" : -5,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 2,
        "blockedConsumerOnUnackedMsgs" : true,
        "lastAckedTimestamp" : 1715238696259,
        "lastConsumedTimestamp" : 1715238695970,
        "lastConsumedFlowTimestamp" : 1715238695972,
        "metadata" : { },
        "address" : "/10.240.0.18:42270",
        "connectedSince" : "2024-05-09T07:11:32.291622498Z",
        "clientVersion" : "Pulsar-Java-v3.0.4",
        "lastAckedTime" : "2024-05-09T07:11:36.259Z",
        "lastConsumedTime" : "2024-05-09T07:11:35.97Z"
      } ],
      "isDurable" : true,
      "isReplicated" : false,
      "allowOutOfOrderDelivery" : false,
      "consumersAfterMarkDeletePosition" : { },
      "nonContiguousDeletedMessagesRanges" : 0,
      "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
      "delayedMessageIndexSizeInBytes" : 0,
      "subscriptionProperties" : { },
      "filterProcessedMsgCount" : 0,
      "filterAcceptedMsgCount" : 0,
      "filterRejectedMsgCount" : 0,
      "filterRescheduledMsgCount" : 0,
      "durable" : true,
      "replicated" : false
    }
  },
  "replication" : { },
  "deduplicationStatus" : "Enabled",
  "nonContiguousDeletedMessagesRanges" : 0,
  "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
  "delayedMessageIndexSizeInBytes" : 0,
  "compaction" : {
    "lastCompactionRemovedEventCount" : 0,
    "lastCompactionSucceedTimestamp" : 0,
    "lastCompactionFailedTimestamp" : 0,
    "lastCompactionDurationTimeInMills" : 0
  },
  "ownerBroker" : "pulsar-broker-2.pulsar-broker.default.svc.cluster.local:8080"
}

There are 4 consumers; each of them reports "unackedMessages" : 0 and, at the same time, "blockedConsumerOnUnackedMsgs" : true.
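A minimal sketch of that check, assuming the "consumers" entries of the topic stats have already been parsed into plain Java objects. The class, field and method names are hypothetical and not part of the Pulsar API; the values are copied from the stats above.

```java
import java.util.ArrayList;
import java.util.List;

class StuckConsumerCheck {
    /** Minimal view of one entry in the "consumers" array of the topic stats. */
    static final class ConsumerSnapshot {
        final String name;
        final int unackedMessages;
        final boolean blockedOnUnackedMsgs;

        ConsumerSnapshot(String name, int unackedMessages, boolean blockedOnUnackedMsgs) {
            this.name = name;
            this.unackedMessages = unackedMessages;
            this.blockedOnUnackedMsgs = blockedOnUnackedMsgs;
        }
    }

    /** Names of consumers flagged as blocked although they hold no unacked messages. */
    static List<String> findInconsistent(List<ConsumerSnapshot> consumers) {
        List<String> result = new ArrayList<>();
        for (ConsumerSnapshot c : consumers) {
            if (c.blockedOnUnackedMsgs && c.unackedMessages == 0) {
                result.add(c.name);
            }
        }
        return result;
    }

    /** The four consumers from the stats posted in this comment. */
    static List<ConsumerSnapshot> reportedConsumers() {
        return List.of(
                new ConsumerSnapshot("9a385", 0, true),
                new ConsumerSnapshot("58a5f", 0, true),
                new ConsumerSnapshot("c4fba", 0, true),
                new ConsumerSnapshot("cff0d", 0, true));
    }

    public static void main(String[] args) {
        // Prints [9a385, 58a5f, c4fba, cff0d]
        System.out.println(findInconsistent(reportedConsumers()));
    }
}
```

A consumer that is blocked while it still holds unacked messages is not reported, since that combination is the expected behaviour of the limit.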

@lhotari
Member

lhotari commented May 9, 2024

There are 4 consumers; each of them reports "unackedMessages" : 0 and, at the same time, "blockedConsumerOnUnackedMsgs" : true.

@AdrianPedziwiatr-TomTom thanks for sharing. This is an interesting detail.

@180254
Author

180254 commented May 11, 2024

A little progress in reproducing the problem in the unit test:

180254@5822ab6 (test22657_1_parameterized)

v3.0.1: fails when [maxUnackedMsgPerConsumer=1,maxUnackedMsgPerSubscription=1], other variants ok
v3.0.2: fails when [maxUnackedMsgPerConsumer=1,maxUnackedMsgPerSubscription=any value], other variants ok
branch-3.0 (b178084): fails when [maxUnackedMsgPerConsumer=1,maxUnackedMsgPerSubscription=any value], other variants ok

@180254
Author

180254 commented May 14, 2024

I also reproduced the problem for larger values of maxUnackedMsgPerConsumer.
The new test represents the issue we are struggling with well.

Please see:
180254@5822ab6 (test22657_3, test22657_3_moreconsumers)

Test results:

  • v3.0.1: fails when [maxUnackedMsgPerConsumer=1,maxUnackedMsgPerSubscription=1], other variants ok
  • v3.0.2: fails in all cases
  • branch-3.0 (2da571e): fails in all cases

Log output from a failed case:

2024-05-14T17:06:13,335 - INFO  - [awaitility-thread:BrokerServiceTest] - ----
2024-05-14T17:06:13,336 - INFO  - [awaitility-thread:BrokerServiceTest] - subscriptionStats: SubscriptionStatsImpl(msgRateOut=0.0, msgThroughputOut=0.0, bytesOutCounter=744, msgOutCounter=40, msgRateRedeliver=0.0, messageAckRate=0.0, chunkedMessageRate=0, msgBacklog=0, backlogSize=-1, earliestMsgPublishTimeInBacklog=0, msgBacklogNoDelayed=0, blockedSubscriptionOnUnackedMsgs=false, msgDelayed=0, unackedMessages=0, type=Shared, activeConsumerName=null, msgRateExpired=0.0, totalMsgExpired=0, lastExpireTimestamp=0, lastConsumedFlowTimestamp=1715699172372, lastConsumedTimestamp=1715699172383, lastAckedTimestamp=1715699172386, lastMarkDeleteAdvancedTimestamp=1715699172386, consumers=[ConsumerStatsImpl(msgRateOut=0.0, msgThroughputOut=0.0, bytesOutCounter=744, msgOutCounter=40, msgRateRedeliver=0.0, messageAckRate=0.0, chunkedMessageRate=0.0, consumerName=2b07e, availablePermits=-39, unackedMessages=0, avgMessagesPerEntry=40, blockedConsumerOnUnackedMsgs=true, readPositionWhenJoining=null, addressOffset=0, addressLength=16, connectedSinceOffset=34, connectedSinceLength=35, clientVersionOffset=16, clientVersionLength=18, lastAckedTimestamp=1715699172386, lastConsumedTimestamp=1715699172383, lastConsumedFlowTimestamp=1715699172386, keyHashRanges=null, metadata={}, stringBuffer=/127.0.0.1:45914Pulsar-Java-v3.0.52024-05-14T17:06:12.371810907+02:00)], isDurable=true, isReplicated=false, allowOutOfOrderDelivery=false, keySharedMode=null, consumersAfterMarkDeletePosition={}, nonContiguousDeletedMessagesRanges=0, nonContiguousDeletedMessagesRangesSerializedSize=0, delayedMessageIndexSizeInBytes=0, bucketDelayedIndexStats={}, subscriptionProperties={}, filterProcessedMsgCount=0, filterAcceptedMsgCount=0, filterRejectedMsgCount=0, filterRescheduledMsgCount=0)
2024-05-14T17:06:13,336 - INFO  - [awaitility-thread:BrokerServiceTest] - currentReceiverQueueSize: 1
2024-05-14T17:06:13,336 - INFO  - [awaitility-thread:BrokerServiceTest] - numMessagesInQueue: 0
2024-05-14T17:06:13,336 - INFO  - [awaitility-thread:BrokerServiceTest] - unackedMessagesSubscription: 0
2024-05-14T17:06:13,336 - INFO  - [awaitility-thread:BrokerServiceTest] - blockedSubscriptionOnUnackedMsgs: false
2024-05-14T17:06:13,336 - INFO  - [awaitility-thread:BrokerServiceTest] - unackedMessagesConsumer: 0
2024-05-14T17:06:13,336 - INFO  - [awaitility-thread:BrokerServiceTest] - blockedConsumerOnUnackedMsgs: true
2024-05-14T17:06:13,336 - INFO  - [awaitility-thread:BrokerServiceTest] - ----

(in summary)

  unackedMessagesSubscription: 0
  blockedSubscriptionOnUnackedMsgs: false
  unackedMessagesConsumer: 0
  blockedConsumerOnUnackedMsgs: true

At branch-3.0...180254:pulsar-issue-22657:branch-3.0 you can find all my tests and the restored old version of the individualAckNormal method for testing/comparison.

@prasathsekar

Getting the same issue, consumer stops receiving new messages

{
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesInCounter" : 388755,
"msgInCounter" : 136,
"bytesOutCounter" : 4481887,
"msgOutCounter" : 1636,
"averageMsgSize" : 0.0,
"msgChunkPublished" : false,
"storageSize" : 391126,
"backlogSize" : 391126,
"publishRateLimitedTimes" : 0,
"earliestMsgPublishTimeInBacklogs" : 0,
"offloadedStorageSize" : 0,
"lastOffloadLedgerId" : 0,
"lastOffloadSuccessTimeStamp" : 0,
"lastOffloadFailureTimeStamp" : 0,
"ongoingTxnCount" : 0,
"abortedTxnCount" : 39,
"committedTxnCount" : 76,
"publishers" : [ ],
"waitingPublishers" : 0,
"subscriptions" : {
"my-subscription" : {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 4481829,
"msgOutCounter" : 1635,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0,
"msgBacklog" : 54,
"backlogSize" : 359284,
"earliestMsgPublishTimeInBacklog" : 0,
"msgBacklogNoDelayed" : 54,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 56,
"type" : "Shared",
"msgRateExpired" : 0.0,
"totalMsgExpired" : 0,
"lastExpireTimestamp" : 0,
"lastConsumedFlowTimestamp" : 1716211257096,
"lastConsumedTimestamp" : 1716211552131,
"lastAckedTimestamp" : 0,
"lastMarkDeleteAdvancedTimestamp" : 1716204988009,
"consumers" : [ {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 340432,
"msgOutCounter" : 125,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0.0,
"consumerName" : "8c286d32f5",
"availablePermits" : 875,
"unackedMessages" : 56,
"avgMessagesPerEntry" : 1,
"blockedConsumerOnUnackedMsgs" : false,
"address" : "/10.20.1.13:35118",
"connectedSince" : "2024-05-20T13:20:57.088603363Z",
"clientVersion" : "Pulsar-CPP-v3.5.1",
"lastAckedTimestamp" : 0,
"lastConsumedTimestamp" : 1716211552131,
"lastConsumedFlowTimestamp" : 1716211257096,
"metadata" : { },
"lastAckedTime" : "1970-01-01T00:00:00Z",
"lastConsumedTime" : "2024-05-20T13:25:52.131Z"
} ],
"isDurable" : true,
"isReplicated" : false,
"allowOutOfOrderDelivery" : false,
"consumersAfterMarkDeletePosition" : { },
"nonContiguousDeletedMessagesRanges" : 49,
"nonContiguousDeletedMessagesRangesSerializedSize" : 731,
"delayedMessageIndexSizeInBytes" : 0,
"subscriptionProperties" : { },
"filterProcessedMsgCount" : 0,
"filterAcceptedMsgCount" : 0,
"filterRejectedMsgCount" : 0,
"filterRescheduledMsgCount" : 0,
"durable" : true,
"replicated" : false
},
"cmi-cdr-webhooks-subscription" : {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0,
"msgBacklog" : 221,
"backlogSize" : 391126,
"earliestMsgPublishTimeInBacklog" : 0,
"msgBacklogNoDelayed" : 221,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 0,
"type" : "Shared",
"msgRateExpired" : 0.0,
"totalMsgExpired" : 0,
"lastExpireTimestamp" : 0,
"lastConsumedFlowTimestamp" : 1716115069064,
"lastConsumedTimestamp" : 0,
"lastAckedTimestamp" : 0,
"lastMarkDeleteAdvancedTimestamp" : 0,
"consumers" : [ ],
"isDurable" : true,
"isReplicated" : false,
"allowOutOfOrderDelivery" : false,
"consumersAfterMarkDeletePosition" : { },
"nonContiguousDeletedMessagesRanges" : 0,
"nonContiguousDeletedMessagesRangesSerializedSize" : 0,
"delayedMessageIndexSizeInBytes" : 0,
"subscriptionProperties" : { },
"filterProcessedMsgCount" : 0,
"filterAcceptedMsgCount" : 0,
"filterRejectedMsgCount" : 0,
"filterRescheduledMsgCount" : 0,
"durable" : true,
"replicated" : false
}
},
"replication" : { },
"deduplicationStatus" : "Disabled",
"nonContiguousDeletedMessagesRanges" : 49,
"nonContiguousDeletedMessagesRangesSerializedSize" : 731,
"delayedMessageIndexSizeInBytes" : 0,
"compaction" : {
"lastCompactionRemovedEventCount" : 0,
"lastCompactionSucceedTimestamp" : 0,
"lastCompactionFailedTimestamp" : 0,
"lastCompactionDurationTimeInMills" : 0
},
"ownerBroker" : "10.0.0.1:8080"
}

pulsar version 3.2.1

@lhotari
Member

lhotari commented May 21, 2024

pulsar version 3.2.1

@prasathsekar You might be facing another bug that is already fixed in 3.2.3 with #22454. Please upgrade to Pulsar 3.2.3 and then comment whether the problem is resolved.

180254 changed the title from "[Bug] consumers stops receiving new messages" to "[Bug] consumers stops receiving new messages due to invalid blockedConsumerOnUnackedMsgs value" on May 22, 2024
180254 changed the title from "[Bug] consumers stops receiving new messages due to invalid blockedConsumerOnUnackedMsgs value" to "[Bug] consumers stops receiving new messages due to invalid blockedConsumerOnUnackedMsgs state" on May 23, 2024
@MichalKoziorowski-TomTom
Contributor

Hi @Technoboy-, @poorbarcode, will you have a chance to look at this? You probably know this code best, since the commit that changed it was yours.

@lhotari
Member

lhotari commented Jun 5, 2024

@MichalKoziorowski-TomTom please confirm whether this problem reproduces on 3.2.3 or 3.0.5.

@180254
Author

180254 commented Jun 5, 2024

Before submitting a ticket, we also checked 3.2.x. The problem with our service also occurred there.

I reran the proposed BrokerServiceTest.java tests that I shared in previous messages.
branch-3.0, commit 46b5419: doesn't work, results as reported previously
master, commit 342d88d: doesn't work, results as reported previously

@dao-jun
Member

dao-jun commented Jun 5, 2024

I checked the code. My first impression is that there may be a race condition here, but I haven't dug deeper.

@lhotari
Member

lhotari commented Jun 5, 2024

@180254 I experimented with some changes in lhotari#192 and added test cases based on your work. There are multiple inconsistencies in how the unacked message counts are handled and how dispatchers are blocked and unblocked. The main gap in the experiment is the handling of negative acknowledgements.
The changes I made fixed the test cases, but I didn't run other tests to verify that the change doesn't cause regressions. Most likely it does, because the negative ack handling in the experiment is invalid.

@MichalKoziorowski-TomTom
Contributor

MichalKoziorowski-TomTom commented Aug 7, 2024

Hi.

Is there any chance someone will look at this race condition? We're trying to figure out some workaround to not see this problem.

@lhotari
Member

lhotari commented Aug 30, 2024

Hi.

Is there any chance someone will look at this race condition? We're trying to figure out some workaround to not see this problem.

I'm doing this now. I'm sorry for the long delay.

lhotari self-assigned this on Aug 30, 2024
lhotari added the release/blocker label on Aug 30, 2024
lhotari added this to the 4.0.0 milestone on Aug 30, 2024
@lhotari
Member

lhotari commented Aug 30, 2024

I remember seeing that this is a regression caused by #20990.

@lhotari
Member

lhotari commented Aug 30, 2024

It looks like #23072 makes improvements to the part where the regression happened.

@lhotari
Member

lhotari commented Aug 30, 2024

There's #21126 before #23072.

@lhotari
Member

lhotari commented Aug 30, 2024

I rebased the test cases added by @180254 here: lhotari@ff0c8a5. It looks like the problem persists after #21126 and #23072.

@lhotari
Member

lhotari commented Aug 30, 2024

Great job by @180254 in doing the troubleshooting, thank you!

@lhotari
Member

lhotari commented Sep 19, 2024

@180254 I experimented with some changes in lhotari#192 and added test cases based on your work. There are multiple inconsistencies in how the unacked message counts are handled and how dispatchers are blocked and unblocked. The main gap in the experiment is the handling of negative acknowledgements.
The changes I made fixed the test cases, but I didn't run other tests to verify that the change doesn't cause regressions. Most likely it does, because the negative ack handling in the experiment is invalid.

This work needs to be resumed.

@summeriiii
Contributor

@lhotari I found that this issue was introduced by #20990. I opened a PR to fix it: it updates the value of blockedConsumerOnUnackedMsgs after unackedMessages has changed.
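A toy model of the ordering problem described above, as a sketch only. It is not the broker code: the class, the unblock rule (unacked count below the limit) and the "stale" ordering are assumptions made for illustration. It only shows why recomputing the flag after the counter has changed avoids the state seen in the stats, where unackedMessages is 0 while blockedConsumerOnUnackedMsgs is true.

```java
class UnackedBlockModel {
    /** Toy consumer holding only the two fields under discussion (hypothetical, not Pulsar's Consumer). */
    static final class ToyConsumer {
        final int maxUnacked;
        final boolean recheckAfterUpdate;
        int unackedMessages;
        boolean blockedConsumerOnUnackedMsgs;

        ToyConsumer(int maxUnacked, boolean recheckAfterUpdate) {
            this.maxUnacked = maxUnacked;
            this.recheckAfterUpdate = recheckAfterUpdate;
        }

        /** Messages sent to the consumer: block once the limit is reached. */
        void dispatch(int count) {
            unackedMessages += count;
            if (unackedMessages >= maxUnacked) {
                blockedConsumerOnUnackedMsgs = true;
            }
        }

        /** Messages acknowledged by the consumer. */
        void ack(int count) {
            if (recheckAfterUpdate) {
                // Change the counter first, then derive the flag from the new value.
                unackedMessages -= count;
                blockedConsumerOnUnackedMsgs = unackedMessages >= maxUnacked;
            } else {
                // Assumed stale ordering: decide from the old count, then change the counter.
                boolean stillOverLimit = unackedMessages >= maxUnacked;
                unackedMessages -= count;
                blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs && stillOverLimit;
            }
        }
    }

    /** Dispatches one message and acks it, with a per-consumer limit of 1. */
    static ToyConsumer run(boolean recheckAfterUpdate) {
        ToyConsumer consumer = new ToyConsumer(1, recheckAfterUpdate);
        consumer.dispatch(1);
        consumer.ack(1);
        return consumer;
    }

    public static void main(String[] args) {
        ToyConsumer stale = run(false);
        ToyConsumer fixed = run(true);
        System.out.println("stale: unacked=" + stale.unackedMessages
                + " blocked=" + stale.blockedConsumerOnUnackedMsgs);
        System.out.println("fixed: unacked=" + fixed.unackedMessages
                + " blocked=" + fixed.blockedConsumerOnUnackedMsgs);
    }
}
```

In this model the stale ordering ends with no unacked messages and the consumer still blocked, while the recheck-after-update ordering ends with the consumer unblocked.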
