[improve] PIP-381: Handle PositionInfo that's too large to serialize as a single entry #22799
LGTM
Since this is touching on-disk formats it would be good to have a PIP discussion.
I wonder if this is related to PIP-81, which doesn't seem to have an implementation: https://github.com/apache/pulsar/wiki/PIP-81%3A-Split-the-individual-acknowledgments-into-multiple-entries
There was a larger PR for PIP-81 that was closed: #10729
Btw, I'm currently investigating a Key_Shared subscription type issue where ordinary message consumption leads to a very large number of "ack holes". The WIP test app that reproduces it is https://github.com/lhotari/pulsar-playground/blob/master/src/main/java/com/github/lhotari/pulsar/playground/TestScenarioIssueKeyShared.java . No messages get lost; it's just that some messages don't get delivered until all other messages have been processed.
In the 1M message experiment, the number of ack holes drops from about 150k to fewer than 500 with this experiment: lhotari@a3b0639
Force-pushed from 6fd14cc to 319ad5f
Force-pushed from 319ad5f to 1c405fe
This is a real problem, and it has already been solved with a simple, fundamentally proven solution backed by perf numbers: #9292. However, I still don't know why some folks blocked that PR without stating a reason, even after being asked multiple times, which stalled progress on it.
Since a PR implemented compression of PositionInfo, its size can be greatly reduced and the problem of the entry size exceeding the threshold no longer occurs, so this PIP was not pursued further.
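To illustrate why compression helps so much here, the following is a minimal sketch that packs synthetic ack ranges into bytes and compresses them with ZLIB from `java.util.zip`. The class name, the four-longs-per-range layout, and the sample data are assumptions made for illustration; this is not Pulsar's actual PositionInfo format, and real compression ratios depend on the data.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Hypothetical helper, not part of Pulsar.
final class AckRangeCompressionSketch {

    // Encodes `count` ranges as four 8-byte longs each:
    // (lowerLedger, lowerEntry, upperLedger, upperEntry). Illustrative layout only.
    static byte[] syntheticRanges(int count) {
        ByteBuffer buf = ByteBuffer.allocate(count * 4 * Long.BYTES);
        long ledgerId = 1000L;
        for (int i = 0; i < count; i++) {
            buf.putLong(ledgerId).putLong(2L * i);      // range start
            buf.putLong(ledgerId).putLong(2L * i + 1);  // range end
        }
        return buf.array();
    }

    // Compresses the whole input with the default ZLIB settings.
    static byte[] compress(byte[] input) {
        Deflater deflater = new Deflater();
        try {
            deflater.setInput(input);
            deflater.finish();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            while (!deflater.finished()) {
                int n = deflater.deflate(chunk);
                out.write(chunk, 0, n);
            }
            return out.toByteArray();
        } finally {
            deflater.end();
        }
    }

    // Decompresses into a buffer of the known original size.
    static byte[] decompress(byte[] input, int originalSize) {
        Inflater inflater = new Inflater();
        try {
            inflater.setInput(input);
            byte[] out = new byte[originalSize];
            int off = 0;
            while (off < originalSize && !inflater.finished()) {
                int n = inflater.inflate(out, off, originalSize - off);
                if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    break; // input is truncated or unsupported
                }
                off += n;
            }
            return out;
        } catch (DataFormatException e) {
            throw new IllegalStateException("corrupt compressed data", e);
        } finally {
            inflater.end();
        }
    }
}
```

Ack ranges within one ledger share most of their bytes, which is the kind of repetition a general-purpose compressor handles well.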
Great work Andrey!
If there's a way to refactor the logic to avoid byte[] and use Netty ByteBuf where possible, the solution would be more aligned with the "no garbage" style in Pulsar.
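As a rough illustration of the "no garbage" idea, the sketch below contrasts allocating a fresh byte[] per call with rewriting one reusable buffer. It deliberately uses `java.nio.ByteBuffer` as a stand-in for Netty's ByteBuf so that it runs on a plain JDK; the class and method names are hypothetical, and Netty's pooled, reference-counted buffers behave differently in detail.

```java
import java.nio.ByteBuffer;

// Illustrative only: java.nio.ByteBuffer stands in for Netty's ByteBuf.
final class ReusableBufferSketch {

    // One scratch buffer owned by this instance and rewritten on each call.
    private final ByteBuffer scratch = ByteBuffer.allocate(2 * Long.BYTES);

    // Allocating style: every call creates a new array that becomes garbage after use.
    static byte[] toBytes(long ledgerId, long entryId) {
        return ByteBuffer.allocate(2 * Long.BYTES)
                .putLong(ledgerId)
                .putLong(entryId)
                .array();
    }

    // Buffer-reuse style: no allocation per call. The caller must finish
    // reading the returned buffer before the next call overwrites it.
    ByteBuffer writePosition(long ledgerId, long entryId) {
        scratch.clear();
        scratch.putLong(ledgerId);
        scratch.putLong(entryId);
        scratch.flip();
        return scratch;
    }
}
```

The trade-off is ownership: a reused buffer avoids allocation but is only safe when a single writer controls its lifetime.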
@315157973 Are you referring to PIP-146: ManagedCursorInfo compression?
(cherry picked from commit 1ef9664)
* serialize/compress without intermediate byte arrays
* use lightproto for cursor serialization to the ledger
* Reuse PositionInfo
(cherry picked from commit 1887c44)
(cherry picked from commit 98a3d25)
* ManagedCursor: manually serialise PositionInfo
* Add tests and save last serialized size to prevent reallocations
(cherry picked from commit 8a365d0)
(cherry picked from commit 44ba614)
(cherry picked from commit f1323c6)
(cherry picked from commit d4b94ab)
(cherry picked from commit 5f07f0c)
(cherry picked from commit 6d2e494)
…pache#275) (cherry picked from commit 6a2a010)
(cherry picked from commit 4c5387d)
(cherry picked from commit c3fe80e)
…footer of the chunked data (apache#282) (cherry picked from commit 6e72ecb)
Force-pushed from 1c405fe to 1397faf
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #22799 +/- ##
============================================
+ Coverage 73.57% 74.19% +0.61%
- Complexity 32624 34041 +1417
============================================
Files 1877 1937 +60
Lines 139502 146587 +7085
Branches 15299 16087 +788
============================================
+ Hits 102638 108756 +6118
- Misses 28908 29472 +564
- Partials 7956 8359 +403
@lhotari I got rid of byte[] usage and added a feature flag, per your comments.
I added a few review comments.
LGTM
conf/broker.conf
#persistentUnackedRangesMaxEntrySize=1048576

# Set the compression type to use for cursor info.
# Possible options are NONE, LZ4, ZLIB, ZSTD, SNAPPY
#cursorInfoCompressionType=NONE
It's better to uncomment this line. When the key is present without the comment marker, setting it doesn't require the PULSAR_PREFIX_ prefix with the k8s config hack that Pulsar uses.
Please also add the config changes to standalone.conf.
Requested change has been performed. PIP-381 is the PIP.
LGTM
I noticed that this PR adds a lot of info-level logs. I'm wondering whether that could cause log traffic to grow in unexpected ways. In Pulsar, configuring loggers is a bit cumbersome, but I think that we should primarily use log4j2 features to achieve it.
Btw, SLF4J 2.0 comes with a fluent API that can also support a pattern where the log level is parameterized. However, the performance implications of using that API are unknown to me. Regarding SLF4J 2.0 API usage, we can use the SLF4J 2.0 API in broker-side code. For client-side code, however, we should avoid the SLF4J 2.0 APIs so that clients can continue to use SLF4J 1.7.x implementations in their apps.
Instead of using the fluent API with a log level parameter, a better approach would be to use a unique logger name for specific log groups, since that would make it possible to enable and disable certain types of logging when necessary.
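The dedicated-logger idea can be sketched as follows. To stay dependency-free, this example uses `java.util.logging` rather than SLF4J or log4j2; the logger names and the class are hypothetical, and only the pattern (one named logger per log group, with its own level) carries over.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Illustrative only: java.util.logging stands in for SLF4J/log4j2.
final class LogGroupSketch {
    // General logger for the component.
    static final Logger LOG = Logger.getLogger("sketch.ManagedCursor");
    // Dedicated logger for one group of messages, switchable on its own.
    static final Logger CHUNK_LOG = Logger.getLogger("sketch.ManagedCursor.chunkedPersist");

    static void configure() {
        LOG.setLevel(Level.INFO);
        // Silence the noisy group without touching the general logger.
        CHUNK_LOG.setLevel(Level.WARNING);
    }

    static void persistChunk(int index, int total) {
        // Dropped while CHUNK_LOG is at WARNING; emitted once it is set back to INFO.
        CHUNK_LOG.log(Level.INFO, "persisting chunk {0} of {1}", new Object[] {index, total});
    }
}
```

With log4j2, the equivalent switch would be a per-logger level entry for the dedicated logger name in the logging configuration, so no code change is needed to turn the group on or off.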
LGTM

# Set the compression type to use for cursor info.
# Possible options are NONE, LZ4, ZLIB, ZSTD, SNAPPY
cursorInfoCompressionType=NONE
We already have a lot of ServiceConfig settings. I think we should determine the best compression type and use it by default instead of exposing a tuning parameter. Introducing 3 tuning configs for a simple feature is not required; we can introduce them later if we see the need.
Let's not break backwards compatibility
# If enabled, the maximum "acknowledgment holes" will not be limited and "acknowledgment holes" are stored in
# multiple entries.
# If enabled, the maximum "acknowledgment holes" (as defined by managedLedgerMaxUnackedRangesToPersist)
# can be stored in multiple entries, allowing the higher limits.
We should define that higher limit; otherwise it will confuse users, and no one will have any idea what the limit is. PR #9292 by default supports up to 10M unacked messages in a single entry, so we should define the higher limit as > 10M unacked messages.
Let's not break backwards compatibility given that feature is disabled by default
How does just updating the comment break compatibility?
Also, this is a new feature that writes to multiple entries, so where does backward compatibility come into it? :-)
It writes to the same ledger as existing single entry write.
@@ -45,7 +45,9 @@ public class ManagedLedgerConfig {
private boolean createIfMissing = true;
private int maxUnackedRangesToPersist = 10000;
private int maxBatchDeletedIndexToPersist = 10000;
private String cursorInfoCompressionType = "NONE";
Let's use the best possible compression algorithm, chosen by running various tests. Running perf tests for each type is difficult for any user when the implementation is this internal, and it's the author's responsibility to provide those numbers.
Let's not break backwards compatibility
Isn't it part of this new feature? Then how would it impact backward compatibility? If we introduce it in this feature, then we will have a compatibility issue, and that's what I would like to avoid by not adding it here.
return;
LedgerEntry entry = seq.nextElement();
ByteBuf data = entry.getEntryBuffer();
try {
This should go into a separate method, as the current method is already very long.
callback.operationFailed(createManagedLedgerException(rc1));
return;
LedgerEntry entry = seq.nextElement();
ByteBuf data = entry.getEntryBuffer();
do we have to release the ByteBuf?
startCreatingNewMetadataLedger();
break;
case NoLedger:
pendingMarkDeleteOps.add(mdEntry);
extra space in format?
}

individualDeletedMessages.forEachRawRange((lowerKey, lowerValue, upperKey, upperValue) -> {
acksSerializedSize.addAndGet(16 * 4);
Can we create a constant with a meaningful name and use it here?
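One way to address this is sketched below. The meaning assigned to 16 and 4 is an assumption, not something stated in the PR: the sketch treats 16 as a rough per-field size estimate in bytes and 4 as the number of values passed to the callback (lowerKey, lowerValue, upperKey, upperValue). All names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: the interpretation of 16 and 4 is assumed.
final class AckSizeEstimateSketch {
    // Assumed: rough serialized size of one field, in bytes.
    static final int ESTIMATED_BYTES_PER_FIELD = 16;
    // Assumed: a range is described by lowerKey, lowerValue, upperKey, upperValue.
    static final int FIELDS_PER_RANGE = 4;
    static final int ESTIMATED_BYTES_PER_RANGE = ESTIMATED_BYTES_PER_FIELD * FIELDS_PER_RANGE;

    // Mirrors the accumulation in the diff, with the magic numbers named.
    static long estimateSerializedSize(int rangeCount) {
        AtomicLong acksSerializedSize = new AtomicLong();
        for (int i = 0; i < rangeCount; i++) {
            acksSerializedSize.addAndGet(ESTIMATED_BYTES_PER_RANGE);
        }
        return acksSerializedSize.get();
    }
}
```

Naming the factors separately also documents why they are multiplied, which a single literal such as 64 would not.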
@@ -3164,58 +3365,252 @@ private List<MLDataFormats.BatchedEntryDeletionIndexInfo> buildBatchEntryDeletio
}
}

private void buildBatchEntryDeletionIndexInfoList(
Now I feel we have multiple implementations of cursor recovery, and it's high time to create an interface and move all the implementations behind it, since they have produced a huge file with multiple heavy implementations.
While this is a good idea, it goes beyond the scope of this PR. The blast radius of the refactoring, including the tests, is too large to fit into the release schedule and other obligations.
Hmm, I would avoid creating tech debt by adding such a heavy implementation to ManagedCursorImpl when it only relates to ser/deser of Position ranges.
}
}

static ByteBuf decompressDataIfNeeded(ByteBuf data, LedgerHandle lh) {
We need a separate interface and implementation so that this heavy implementation lives in a separate file.
While this is a good idea, it goes beyond the scope of this PR. The blast radius of the refactoring, including the tests, is too large to fit into the release schedule and other obligations.
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;

final class PositionInfoUtils {
I'd ask that we avoid creating a util class for the Position object. I have seen Position being cast in clients and in other systems (e.g. the Apache Beam connector), and once we have a util class we are opening a can of worms. Instead, if we have a CursorRecovery interface, this entire logic can go into a MultiEntryCursorRecoveryImplementation class.
PositionInfoUtils addresses a problem that appears when serializing a large PositionInfo. IIRC, LightProto holds on to the memory allocated for larger repeated objects, and when the size of PositionInfo drops, the memory isn't freed.
It is used only for serialization, in a compatible way that can be read by protobuf, and it doesn't create problems for any system that relies on the proto definitions.
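To make "serialized by hand but still readable by protobuf" concrete, here is a minimal sketch that writes two int64 fields in protobuf wire format (a varint field tag followed by a varint value). It is not the PR's PositionInfoUtils: the class name is hypothetical, the field numbers 1 and 2 are assumed for illustration, and a ByteArrayOutputStream is used only to keep the example short.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical sketch: hand-encodes two int64 fields in protobuf wire format.
final class ManualProtoSketch {
    private static final int WIRE_TYPE_VARINT = 0;

    // Emits 7 bits at a time, least significant group first;
    // the high bit of each byte marks that more bytes follow.
    static void writeVarint(ByteArrayOutputStream out, long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.write((int) value);
    }

    // A field is its tag (field number and wire type) followed by the value.
    static void writeInt64Field(ByteArrayOutputStream out, int fieldNumber, long value) {
        writeVarint(out, ((long) fieldNumber << 3) | WIRE_TYPE_VARINT);
        writeVarint(out, value);
    }

    // Assumed field numbers: 1 for ledgerId, 2 for entryId.
    static byte[] encodePosition(long ledgerId, long entryId) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeInt64Field(out, 1, ledgerId);
        writeInt64Field(out, 2, entryId);
        return out.toByteArray();
    }
}
```

Because the bytes follow the wire format, any protobuf parser generated from a matching message definition can read them, regardless of how they were produced.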
Is PositionInfoUtils auto-generated? I don't think it is, and I'm confused about why it has such complex serialization/deserialization code, which could be avoided by using a proto generation framework.
It is generated by LightProto, with serializePositionInfo() crafted manually to avoid some of the inefficiencies. Unfortunately, the generated code couldn't be (re)used directly because some of the required pieces are hidden behind private access.
Oh, this seems like a big red flag, as we are combining auto-generated code with hand-written code. It can become a nightmare when the auto-generated code changes and we need to merge it with the hand-written code. This is a huge red flag in this PR and we should avoid it.
This is a trick of sorts, but there is no better option at the moment: Protobuf cannot handle serialization/deserialization efficiently, while LightProto does it well.
Unfortunately there is no good way to let the LightProto compiler fully create this file at build time, as we (I worked on the original patch) had to amend the generated code.
All the tricks are in this file and nowhere else.
As @dlg99 mentioned, introducing an interface/abstraction would make the change more complicated.
But I would avoid adding such a hack to Pulsar just for this feature. This shows that we should create an interface for unacked-message store/recovery and make it pluggable, so that anyone can add a pluggable solution, with a hack if needed. Given the amount of code, complexity, and hacks this feature requires in this PR, it's a great example of something to make pluggable, injecting custom code to keep the Pulsar project free from hacks.
Overall, let us organize the code more appropriately. I am not comfortable with such a heavy implementation for a simple feature; it should go into a separate interface implementation. I will not block this PR, but let's not rush to merge it, and please do not merge it without addressing this fundamental separation, to avoid creating a messy code base.
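The separation requested in this thread could look roughly like the sketch below. None of these types exist in Pulsar: CursorRecovery follows the reviewer's wording, while the method names, the in-memory implementation, and the CursorSketch class are assumptions made only to show the plug point.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plug point for storing and recovering ack ranges.
interface CursorRecovery {
    void persist(List<long[]> ackRanges);
    List<long[]> recover();
}

// Trivial implementation for illustration; a real one would write to a ledger.
final class InMemoryCursorRecovery implements CursorRecovery {
    private List<long[]> stored = new ArrayList<>();

    @Override
    public void persist(List<long[]> ackRanges) {
        stored = copyOf(ackRanges);
    }

    @Override
    public List<long[]> recover() {
        return copyOf(stored);
    }

    // Defensive deep copy so callers cannot mutate the stored state.
    private static List<long[]> copyOf(List<long[]> ranges) {
        List<long[]> copy = new ArrayList<>(ranges.size());
        for (long[] range : ranges) {
            copy.add(range.clone());
        }
        return copy;
    }
}

// The cursor depends only on the interface, so serialization details stay out of it.
final class CursorSketch {
    private final CursorRecovery recovery;
    private final List<long[]> ackRanges = new ArrayList<>();

    CursorSketch(CursorRecovery recovery) {
        this.recovery = recovery;
    }

    void ack(long fromEntry, long toEntry) {
        ackRanges.add(new long[] {fromEntry, toEntry});
    }

    void flush() {
        recovery.persist(ackRanges);
    }

    int recoveredRangeCount() {
        return recovery.recover().size();
    }
}
```

Whether this abstraction is worth its cost in this PR is exactly what the reviewers disagree on; the sketch only shows the shape of the proposal.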
Motivation
In some cases cursor position info can be too large to serialize as a single entry, e.g. in case of too many deleted ranges. Also the serialization can be too slow.
Cherry-picks of changes by @eolivelli, @nicoloboschi, and me.
Modifications
Cursor PositionInfo serialization is reworked to produce less garbage and serialize faster; the serialized data can be compressed.
If the serialized data is too large, it is chunked and saved as a sequence of entries.
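The chunking step can be sketched as below. This shows only the general idea of splitting a payload by a maximum entry size and joining it back; the class and method names are hypothetical, and the PR's actual entry layout (including any header or footer metadata) is not reproduced. Plain byte arrays are used for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of splitting serialized data into bounded entries.
final class ChunkingSketch {

    // Splits data into consecutive chunks of at most maxEntrySize bytes.
    static List<byte[]> split(byte[] data, int maxEntrySize) {
        if (maxEntrySize <= 0) {
            throw new IllegalArgumentException("maxEntrySize must be positive");
        }
        List<byte[]> chunks = new ArrayList<>();
        int off = 0;
        while (off < data.length) {
            int len = Math.min(maxEntrySize, data.length - off);
            chunks.add(Arrays.copyOfRange(data, off, off + len));
            off += len;
        }
        return chunks;
    }

    // Reassembles the original payload; chunks must be supplied in write order.
    static byte[] join(List<byte[]> chunks) {
        int total = 0;
        for (byte[] chunk : chunks) {
            total += chunk.length;
        }
        byte[] out = new byte[total];
        int off = 0;
        for (byte[] chunk : chunks) {
            System.arraycopy(chunk, 0, out, off, chunk.length);
            off += chunk.length;
        }
        return out;
    }
}
```

A real implementation also needs a way for the reader to know how many entries belong together, which is why extra metadata alongside the chunks matters.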
Verifying this change
This change added tests.
Does this pull request potentially affect one of the following parts:
NO
Matching PR in forked repository
PR in forked repository: dlg99#17