[improve][broker][PIP-379] Add observability stats for "draining hashes" #23429

Merged on Oct 10, 2024 (15 commits)

@lhotari (Member) commented Oct 10, 2024

Motivation

PIP-379: Key_Shared Draining Hashes for Improved Message Ordering was implemented in #23352.

One of the major benefits of PIP-379 is the easy-to-understand model of when a hash is blocked.

When a new consumer is added, hash range assignments move from existing consumers to the new consumer. (In some cases, hash range assignments can move between existing consumers after a consumer is added or removed.)

The PIP-379 implementation ensures that no new messages for a moved hash range can be delivered until all unacknowledged messages for a given hash have been cleared, either by acknowledgement or by the consumer disconnecting.
This applies to the AUTO_SPLIT ordered mode of the Key_Shared subscription type.

PIP-379 introduces the concept of "draining hashes", which is now reflected in the consumer stats. This internal detail is exposed intentionally, since the user needs this information to understand why messages aren't being delivered.

Since there's no mapping between external and internal concepts, the abstraction isn't leaky. The user doesn't need to know about the internal details of how the draining hashes are implemented, but they need to know that the consumer is blocked on unacknowledged messages for a specific hash range. This is all relevant information and doesn't contain unnecessary implementation details.

This PR contains the "consumer stats" changes that provide the information in a clear way.

Modifications

Added consumer-level stats:

  • drainingHashesCount - the current number of hashes in the draining state for this consumer
  • drainingHashesClearedTotal - the total number of hashes cleared from the draining state since the consumer connected
  • drainingHashesUnackedMessages - the total number of unacknowledged messages for all draining hashes for this consumer
  • drainingHashes - draining hashes information for this consumer
    • hash - the sticky key hash which is draining
    • unackMsgs - the number of unacknowledged messages for this hash
    • blockedAttempts - the number of times the hash has blocked an attempted delivery of a message

In addition:

  • keyHashRangeArrays - the consumer's hash range assignments in a list of lists where each item contains the start and end as elements.
    • example [ [ 2960, 5968 ], [ 22258, 43033 ], [ 49261, 54464 ], [ 55155, 61273 ] ]

This information had to be added under a new field name, keyHashRangeArrays, because a keyHashRanges field already exists. Changing the format of the existing field isn't possible since it would break compatibility: a newer admin client couldn't read stats from an older broker, and vice versa.
The previous keyHashRanges field, which uses a different format, is now deprecated.

Example of both fields where the difference is visible:

{
        "keyHashRangeArrays" : [ [ 2960, 5968 ], [ 22258, 43033 ], [ 49261, 54464 ], [ 55155, 61273 ] ],
        "keyHashRanges" : [ "[2960, 5968]", "[22258, 43033]", "[49261, 54464]", "[55155, 61273]" ]
}

The field keyHashRanges contains the information as a list of string values, which isn't very usable for most use cases since it would need to be parsed before it can be used.
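
To illustrate the parsing step that the legacy field requires, here is a minimal Python sketch. The helper name parse_legacy_ranges is hypothetical and not part of Pulsar; it relies on the observation that each string in the example above happens to be a valid JSON array.

```python
import json


def parse_legacy_ranges(key_hash_ranges):
    """Convert legacy entries such as "[2960, 5968]" into [2960, 5968] pairs.

    Hypothetical helper: it assumes every string has the "[start, end]"
    shape shown in the example, which is also valid JSON.
    """
    parsed = []
    for text in key_hash_ranges:
        start, end = json.loads(text)
        parsed.append([int(start), int(end)])
    return parsed


legacy = ["[2960, 5968]", "[22258, 43033]", "[49261, 54464]", "[55155, 61273]"]
arrays = [[2960, 5968], [22258, 43033], [49261, 54464], [55155, 61273]]

# The parsed legacy values carry the same information as keyHashRangeArrays.
print(parse_legacy_ranges(legacy) == arrays)  # prints True
```

With keyHashRangeArrays, this step is unnecessary because the values are already nested lists of integers.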

The stats will continue to contain keyHashRanges and readPositionWhenJoining when the "classic" (3.3.x) implementation of Key_Shared is enabled by configuring subscriptionKeySharedUseClassicPersistentImplementation=true ("classic" support was added in #23424).
In the default configuration, these fields are removed from the topic stats output, but the client continues to support them for backward and forward compatibility.
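
Tooling that has to work against both kinds of brokers could read whichever field is present. The following Python sketch assumes the consumer stats have already been loaded into a dict; the function name consumer_hash_ranges is hypothetical, and treating a missing or null keyHashRangeArrays as the signal to fall back is an assumption made for illustration, not documented behavior.

```python
import json


def consumer_hash_ranges(consumer):
    """Return hash ranges as [start, end] integer pairs from a consumer stats dict.

    Prefers keyHashRangeArrays and falls back to the deprecated
    keyHashRanges string list when the newer field is absent (assumption).
    """
    arrays = consumer.get("keyHashRangeArrays")
    if arrays is not None:
        return [list(pair) for pair in arrays]
    legacy = consumer.get("keyHashRanges") or []
    return [json.loads(text) for text in legacy]


new_style = {"keyHashRangeArrays": [[2960, 5968], [22258, 43033]]}
classic_style = {"keyHashRanges": ["[2960, 5968]", "[22258, 43033]"]}

# Both shapes yield the same list of integer pairs.
print(consumer_hash_ranges(new_style) == consumer_hash_ranges(classic_style))
```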

Example of consumer stats for a subscription

{      
      "consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 1560,
        "msgOutCounter" : 30,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.0,
        "chunkedMessageRate" : 0.0,
        "consumerName" : "c1",
        "availablePermits" : 70,
        "unackedMessages" : 30,
        "avgMessagesPerEntry" : 1,
        "blockedConsumerOnUnackedMsgs" : false,
        "drainingHashesCount" : 5,
        "drainingHashesClearedTotal" : 0,
        "drainingHashesUnackedMessages" : 10,
        "drainingHashes" : [ {
          "hash" : 2862,
          "unackMsgs" : 2,
          "blockedAttempts" : 5
        }, {
          "hash" : 11707,
          "unackMsgs" : 2,
          "blockedAttempts" : 9
        }, {
          "hash" : 15786,
          "unackMsgs" : 2,
          "blockedAttempts" : 6
        }, {
          "hash" : 43539,
          "unackMsgs" : 2,
          "blockedAttempts" : 6
        }, {
          "hash" : 45436,
          "unackMsgs" : 2,
          "blockedAttempts" : 9
        } ],
        "address" : "/127.0.0.1:55829",
        "connectedSince" : "2024-10-10T05:39:39.077284+03:00",
        "clientVersion" : "Pulsar-Java-v4.0.0-SNAPSHOT",
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 1728527979411,
        "lastConsumedFlowTimestamp" : 1728527979106,
        "keyHashRangeArrays" : [ [ 2960, 5968 ], [ 22258, 43033 ], [ 49261, 54464 ], [ 55155, 61273 ] ],
        "metadata" : { },
        "lastAckedTime" : "1970-01-01T02:00:00+02:00",
        "lastConsumedTime" : "2024-10-10T05:39:39.411+03:00"
      }, {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 0,
        "msgOutCounter" : 0,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.0,
        "chunkedMessageRate" : 0.0,
        "consumerName" : "c2",
        "availablePermits" : 1000,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "drainingHashesCount" : 0,
        "drainingHashesClearedTotal" : 0,
        "drainingHashesUnackedMessages" : 0,
        "drainingHashes" : [ ],
        "address" : "/127.0.0.1:55829",
        "connectedSince" : "2024-10-10T05:39:39.294216+03:00",
        "clientVersion" : "Pulsar-Java-v4.0.0-SNAPSHOT",
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0,
        "lastConsumedFlowTimestamp" : 1728527979297,
        "keyHashRangeArrays" : [ [ 1, 2959 ], [ 5969, 22257 ], [ 43034, 49260 ], [ 54465, 55154 ], [ 61274, 65535 ] ],
        "metadata" : { },
        "lastAckedTime" : "1970-01-01T02:00:00+02:00",
        "lastConsumedTime" : "1970-01-01T02:00:00+02:00"
      } ]
}

Relevant information for consumer c1:

{
        "drainingHashesCount" : 5,
        "drainingHashesClearedTotal" : 0,
        "drainingHashesUnackedMessages" : 10,
        "drainingHashes" : [ {
          "hash" : 2862,
          "unackMsgs" : 2,
          "blockedAttempts" : 5
        }, {
          "hash" : 11707,
          "unackMsgs" : 2,
          "blockedAttempts" : 9
        }, {
          "hash" : 15786,
          "unackMsgs" : 2,
          "blockedAttempts" : 6
        }, {
          "hash" : 43539,
          "unackMsgs" : 2,
          "blockedAttempts" : 6
        }, {
          "hash" : 45436,
          "unackMsgs" : 2,
          "blockedAttempts" : 9
        } ]
}
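
As a rough illustration of how a tool might summarize these fields, the Python sketch below recomputes the totals from the per-hash entries for consumer c1. The function name summarize_draining is hypothetical. The cross-check assumes the aggregate counters and the per-hash list describe the same instant, which holds for the example above but is not guaranteed by anything in this PR description.

```python
def summarize_draining(consumer):
    """Summarize the draining-hash fields of one consumer stats dict."""
    hashes = consumer.get("drainingHashes", [])
    unacked = sum(entry["unackMsgs"] for entry in hashes)
    return {
        "count": len(hashes),
        "unacked": unacked,
        "max_blocked_attempts": max(
            (entry["blockedAttempts"] for entry in hashes), default=0
        ),
        # Compare the recomputed values with the aggregate counters.
        "consistent": (
            consumer.get("drainingHashesCount", 0) == len(hashes)
            and consumer.get("drainingHashesUnackedMessages", 0) == unacked
        ),
    }


# Values copied from the c1 excerpt above.
c1 = {
    "drainingHashesCount": 5,
    "drainingHashesClearedTotal": 0,
    "drainingHashesUnackedMessages": 10,
    "drainingHashes": [
        {"hash": 2862, "unackMsgs": 2, "blockedAttempts": 5},
        {"hash": 11707, "unackMsgs": 2, "blockedAttempts": 9},
        {"hash": 15786, "unackMsgs": 2, "blockedAttempts": 6},
        {"hash": 43539, "unackMsgs": 2, "blockedAttempts": 6},
        {"hash": 45436, "unackMsgs": 2, "blockedAttempts": 9},
    ],
}

print(summarize_draining(c1))
```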

Relevant information for consumer c2:

{
        "keyHashRangeArrays" : [ [ 1, 2959 ], [ 5969, 22257 ], [ 43034, 49260 ], [ 54465, 55154 ], [ 61274, 65535 ] ]
}

The PIP-379 implementation blocks only the hashes that need to be blocked. For each blocked hash, detailed information is available to find out why delivery is blocked.
The major difference from the previous readPositionWhenJoining solution is that it's now possible to build automation, CLI tools and web user interfaces that assist a user, which makes it much easier to troubleshoot cases where message delivery is blocked by unacknowledged messages in Key_Shared subscriptions.

Client-side tooling can already use the information provided in this PR to determine which consumer is blocked by a hash when there are multiple consumers.
In the above example, hash 2862 is draining on consumer c1 and falls within the hash range [1, 2959], which is now assigned to consumer c2. This means that the 2 unacknowledged messages for that hash are preventing further messages with hash 2862 from being delivered to consumer c2.
The blockedAttempts field is a counter that increments each time the dispatcher skips delivery to a consumer due to this hash. This information alone makes it convenient to observe Key_Shared AUTO_SPLIT subscriptions and find out why message delivery is blocked.
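
The lookup described above could be sketched in Python as follows. The function names owner_of_hash and blocked_deliveries are hypothetical, and the sketch assumes that both ends of each range in keyHashRangeArrays are inclusive, which matches the adjacent ranges in the example ([1, 2959] followed by [2960, 5968]) but is not stated explicitly here.

```python
def owner_of_hash(consumers, sticky_hash):
    """Return the name of the consumer whose assigned ranges contain the hash."""
    for consumer in consumers:
        for start, end in consumer.get("keyHashRangeArrays", []):
            if start <= sticky_hash <= end:  # assumption: inclusive bounds
                return consumer["consumerName"]
    return None


def blocked_deliveries(consumers):
    """List each draining hash with the consumer that is waiting for it."""
    report = []
    for consumer in consumers:
        for entry in consumer.get("drainingHashes", []):
            report.append({
                "hash": entry["hash"],
                "drainingOn": consumer["consumerName"],
                "waitingConsumer": owner_of_hash(consumers, entry["hash"]),
                "unackMsgs": entry["unackMsgs"],
                "blockedAttempts": entry["blockedAttempts"],
            })
    return report


# Trimmed copy of the consumer stats example above.
consumers = [
    {
        "consumerName": "c1",
        "drainingHashes": [
            {"hash": 2862, "unackMsgs": 2, "blockedAttempts": 5},
            {"hash": 11707, "unackMsgs": 2, "blockedAttempts": 9},
            {"hash": 15786, "unackMsgs": 2, "blockedAttempts": 6},
            {"hash": 43539, "unackMsgs": 2, "blockedAttempts": 6},
            {"hash": 45436, "unackMsgs": 2, "blockedAttempts": 9},
        ],
        "keyHashRangeArrays": [[2960, 5968], [22258, 43033], [49261, 54464], [55155, 61273]],
    },
    {
        "consumerName": "c2",
        "drainingHashes": [],
        "keyHashRangeArrays": [[1, 2959], [5969, 22257], [43034, 49260], [54465, 55154], [61274, 65535]],
    },
]

for row in blocked_deliveries(consumers):
    print(row)
```

In this data, every draining hash on c1 falls inside a range assigned to c2, so each row reports c2 as the waiting consumer.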

A future improvement will be to add a REST API for looking up the message IDs of the unacknowledged messages for a hash. With that information, it will be possible to find out the details of the message that is blocking a particular hash.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@lhotari lhotari added this to the 4.0.0 milestone Oct 10, 2024
@lhotari lhotari self-assigned this Oct 10, 2024
@github-actions github-actions bot added the doc-required Your PR changes impact docs and you will update later. label Oct 10, 2024
@codecov-commenter

Codecov Report

Attention: Patch coverage is 73.94958% with 31 lines in your changes missing coverage. Please review.

Project coverage is 74.33%. Comparing base (bbc6224) to head (87af5f4).
Report is 654 commits behind head on master.

Files with missing lines | Patch % | Lines
...e/pulsar/broker/service/DrainingHashesTracker.java | 70.76% | 12 Missing and 7 partials ⚠️
...oker/service/persistent/RescheduleReadHandler.java | 11.11% | 4 Missing and 4 partials ⚠️
...ersistentStickyKeyDispatcherMultipleConsumers.java | 50.00% | 2 Missing ⚠️
...ava/org/apache/pulsar/broker/service/Consumer.java | 75.00% | 1 Missing ⚠️
...r/common/policies/data/stats/DrainingHashImpl.java | 75.00% | 1 Missing ⚠️
Additional details and impacted files

@@             Coverage Diff              @@
##             master   #23429      +/-   ##
============================================
+ Coverage     73.57%   74.33%   +0.76%     
- Complexity    32624    34398    +1774     
============================================
  Files          1877     1950      +73     
  Lines        139502   146980    +7478     
  Branches      15299    16184     +885     
============================================
+ Hits         102638   109261    +6623     
- Misses        28908    29298     +390     
- Partials       7956     8421     +465     
Flag | Coverage Δ
inttests | 27.33% <24.36%> (+2.74%) ⬆️
systests | 24.38% <28.57%> (+0.05%) ⬆️
unittests | 73.71% <73.94%> (+0.86%) ⬆️

Files with missing lines | Coverage Δ
...ker/service/persistent/PersistentSubscription.java | 76.54% <100.00%> (-0.15%) ⬇️
.../common/policies/data/stats/ConsumerStatsImpl.java | 98.14% <100.00%> (+0.52%) ⬆️
...mon/policies/data/stats/SubscriptionStatsImpl.java | 94.69% <100.00%> (+0.57%) ⬆️
...apache/pulsar/common/util/ObjectMapperFactory.java | 93.75% <100.00%> (+0.06%) ⬆️
...ava/org/apache/pulsar/broker/service/Consumer.java | 85.44% <75.00%> (-1.23%) ⬇️
...r/common/policies/data/stats/DrainingHashImpl.java | 75.00% <75.00%> (ø)
...ersistentStickyKeyDispatcherMultipleConsumers.java | 83.78% <50.00%> (-1.86%) ⬇️
...oker/service/persistent/RescheduleReadHandler.java | 80.00% <11.11%> (ø)
...e/pulsar/broker/service/DrainingHashesTracker.java | 76.92% <70.76%> (ø)

... and 620 files with indirect coverage changes

@nicoloboschi (Contributor) left a comment

LGTM

@Technoboy- (Contributor) left a comment

LGTM

@lhotari lhotari merged commit acac72e into apache:master Oct 10, 2024
54 checks passed
Labels
area/broker doc-required Your PR changes impact docs and you will update later. ready-to-test
4 participants