Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Support large number of unack message store for cursor recovery #9292

Merged
merged 2 commits into from
Oct 1, 2024

Conversation

rdhabalia
Copy link
Contributor

@rdhabalia rdhabalia commented Jan 24, 2021

Motivation

Right now, managed-cursor serializes individually deleted-message ranges and persist them in bookie ledger-entry for the recovery. managed-cursor can persist max 150K ranges due to limited bookie entry size (5MB). 150K ranges might not be enough for some of the use cases while recovering cursor. so, we need a mechanism to persist a couple of million ranges to support such usecases.

Modification

with #3818 and #3819 managed-cursor manages individually deleted messages in bitset with OpenRangeSet. Serializing OpenRangeSet can allow managed-cursor to store 10M ranges with 5MB data size.

Result

Usecases require a large number of individually deleted-messages that can be supported with this change.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@rdhabalia
Copy link
Contributor Author

/pulsarbot run-failure-checks

1 similar comment
@rdhabalia
Copy link
Contributor Author

/pulsarbot run-failure-checks

Copy link
Contributor

@codelipenghui codelipenghui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rdhabalia I think this can only improve the cases that do not contain the too big ack holes such as 10000 unacked messages with a hole. In other words, this optimization will increase the burden on the scenario which has big ack holes right?

For example, if the message with index 0 acked and with index 10000 acked but [1-9999] does not acked. I think the new approach needs 157 long value to persistent to the bookie.

If I understand correctly, I think we should to add a flag for enabling or disabling this enhancement.

Please correct me if I miss some context here

@rdhabalia
Copy link
Contributor Author

@codelipenghui
this improvement serializes long-array to protobuf directly which should not cost cpu, and managed-ledger has a cap on max entries per ledger so, that will make sure to have a limit on size of long array. encoded 150 long shouldn't be a problem in terms of size and cpu as broker persist into bookie into a separate non-io background thread. we can also control size with managedLedgerMaxUnackedRangesToPersist config.

I think we should to add a flag for enabling or disabling this enhancement.

Sure, let me add the flag.

Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

awesome work.

what happens if I upgrade my broker to a new version and then I rollback to the previous version in case of problems ?
IIUC we are going to write data that would not be understood by the old version.
does it lead to some bad state of the system ? like having consumers that receive again the same messages ?

sijie
sijie previously requested changes Jan 28, 2021
Copy link
Member

@sijie sijie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rdhabalia This is a great change. However, I also had the same concern as what Penghui and Lin had. It is an improvement and doesn't solve all the problems. I would like to see how these concerns are addressed before merging this.

Also, Penghui suggested you adding a flag to enable/disable this feature. I think we should take one more step to abstract the "unack range management" into an interface. So we can load the implementation from class instead of using a flag to turn it on/off.

@rdhabalia
Copy link
Contributor Author

what happens if I upgrade my broker to a new version and then I rollback to the previous version in case of problems ? IIUC we are going to write data that would not be understood by the old version. does it lead to some bad state of the system ? like having consumers that receive again the same messages ?

no, it will not cause bad state in broker. broker understands both the format as it has separate buckets for each. broker recovers unack message ranges based on which ever bucket has ranges and later persist ranges in only new bucket. Broker rollback will not be able to recover unack messages from new bucket and in that case it will redeliver messages newer markDeleteMessageOffset.

It is an improvement and doesn't solve all the problems.

Yes, I shared my point of view in email regarding having unack message management. However, regardless abstraction changes, I think we can proceed with this PR as this can be used with existing default unack message implementation.

@sijie
Copy link
Member

sijie commented Feb 1, 2021

Yes, I shared my point of view in email regarding having unack message management. However, regardless abstraction changes, I think we can proceed with this PR as this can be used with existing default unack message implementation.

I would like to have a consensus on providing an abstraction before merging this implementation. Otherwise, it becomes really hard to allow people to implement other algorithms.

@sijie
Copy link
Member

sijie commented Feb 1, 2021

no, it will not cause bad state in broker. broker understands both the format as it has separate buckets for each.

Can you add a backward compatibility integration test?

@codelipenghui codelipenghui modified the milestones: 2.8.0, 2.9.0 May 20, 2021
@rdhabalia
Copy link
Contributor Author

@sijie @merlimat what's the conclusion on this PR. If PR will be blocked for no reason then it will not be merged. so, we can close in that case?

@eolivelli eolivelli modified the milestones: 2.9.0, 2.10.0 Oct 6, 2021
@codelipenghui codelipenghui modified the milestones: 2.10.0, 2.11.0 Jan 18, 2022
@codelipenghui
Copy link
Contributor

The pr had no activity for 30 days, mark with Stale label.

@rdhabalia rdhabalia changed the title [pulsar-broker] Support large number of unack message store for cursor recovery [fix][broker] Support large number of unack message store for cursor recovery Sep 27, 2024
@rdhabalia rdhabalia added doc-not-needed Your PR changes do not impact docs ready-to-test and removed doc-label-missing labels Sep 27, 2024
@rdhabalia rdhabalia force-pushed the ml_unack branch 2 times, most recently from aeb11ab to 1b7bb32 Compare September 27, 2024 23:44
Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@rdhabalia rdhabalia merged commit 9eeffe5 into apache:master Oct 1, 2024
51 checks passed
@rdhabalia rdhabalia deleted the ml_unack branch October 1, 2024 19:53
@codecov-commenter
Copy link

Codecov Report

Attention: Patch coverage is 59.34066% with 37 lines in your changes missing coverage. Please review.

Project coverage is 74.54%. Comparing base (bbc6224) to head (29b3c41).
Report is 619 commits behind head on master.

Files with missing lines Patch % Lines
...il/collections/ConcurrentOpenLongPairRangeSet.java 0.00% 17 Missing ⚠️
...che/bookkeeper/mledger/impl/ManagedCursorImpl.java 82.50% 4 Missing and 3 partials ⚠️
.../common/util/collections/OpenLongPairRangeSet.java 66.66% 4 Missing and 3 partials ⚠️
...pache/bookkeeper/mledger/impl/RangeSetWrapper.java 50.00% 3 Missing and 2 partials ⚠️
...lsar/common/util/collections/LongPairRangeSet.java 50.00% 1 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master    #9292      +/-   ##
============================================
+ Coverage     73.57%   74.54%   +0.97%     
+ Complexity    32624     2758   -29866     
============================================
  Files          1877     1934      +57     
  Lines        139502   145222    +5720     
  Branches      15299    15875     +576     
============================================
+ Hits         102638   108258    +5620     
+ Misses        28908    28665     -243     
- Partials       7956     8299     +343     
Flag Coverage Δ
inttests 27.77% <31.86%> (+3.19%) ⬆️
systests 24.49% <31.86%> (+0.16%) ⬆️
unittests 73.90% <59.34%> (+1.05%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...apache/bookkeeper/mledger/ManagedLedgerConfig.java 96.38% <100.00%> (+0.08%) ⬆️
...lsar/common/util/collections/LongPairRangeSet.java 80.00% <50.00%> (-0.96%) ⬇️
...pache/bookkeeper/mledger/impl/RangeSetWrapper.java 87.30% <50.00%> (-7.04%) ⬇️
...che/bookkeeper/mledger/impl/ManagedCursorImpl.java 79.74% <82.50%> (+0.44%) ⬆️
.../common/util/collections/OpenLongPairRangeSet.java 88.01% <66.66%> (ø)
...il/collections/ConcurrentOpenLongPairRangeSet.java 46.08% <0.00%> (-44.42%) ⬇️

... and 599 files with indirect coverage changes

Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR is very useful if I understand it correctly. Since individual acks get encoded as long[] array, it will compress the information a lot. I guess a single long entry in the array will hold 64 bits and 64 individual acks. In theory, 1 million individual bits can be held in 128kB of memory (1024kB/8). @rdhabalia Is this the correct understanding?

@rdhabalia
Copy link
Contributor Author

I have already replied in the email but also updating the same reply here.

Is this the correct understanding of how PR 9292 efficiently stores individual acks?

Yes, that's correct. It's like serializing a Position object to a bit which could be the smallest serializable size we could achieve among any ser/des approaches.

In the past, there were two fundamental challenges we were facing to serve a large set of unack messages: (a) in memory pressure/ large GC-Pauses due to the large number of Position objects (b) serializing such a number of objects to store in bookie ledger for topic recovery.

(a) was handled by #3819 to replace a Position object with a bit which can allow brokers to run with a large number of unack messages for a topic. But it also comes with a certain limit for large scale multi-tenant systems where a broker is serving a large number of topics and serving several millions of unack messages per topic can create memory pressure on the broker. Therefore, even if we solve (b) to store billions of unack messages while topic recovery, the broker might not run with stability beyond sever millions of unack messages.
So, we don't have to solve (b) to store more than 1M-10M unack messages because keeping > 10M unack messages can impact broker stability in large scale multi-tenant env. #9292 solves this acceptable range of unack messages with which we can also run broker with stability.

Talking about PIP-381, we might be able to solve storing > 10M-100M unack messages but the question is if a broker really has that many unack messages then will broker run with such huge memory pressure and will it really serve large scale usecases for which Pulsar was built? I am sure, it might be useful for small usecases and clients can use it if needed but it might not be useful for most of the usecases.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

10 participants