-
Notifications
You must be signed in to change notification settings - Fork 138
Do not throw recursive update exception when producer state recovery failed #1982
base: master
Are you sure you want to change the base?
Do not throw recursive update exception when producer state recovery failed #1982
Conversation
…failed ### Motivation When transaction is enabled, `PartitionLog#initialise` will recover the state from the local snapshot. It's an asynchronous operation that could fail. In this case, an "recursive update" `IllegalStateException` will be thrown, which is unexpected. ``` Suppressed: java.lang.IllegalStateException: Recursive update at java.util.concurrent.ConcurrentHashMap.replaceNode(ConcurrentHashMap.java:1167) ~[?:?] at java.util.concurrent.ConcurrentHashMap.remove(ConcurrentHashMap.java:1552) ~[?:?] at io.streamnative.pulsar.handlers.kop.storage.PartitionLogManager.lambda$getLog$0(PartitionLogManager.java:88) ~[?:?] ``` The reason is that in `PartitionLogManager#getLog`, `logMap.remove` is called in the callback of `whenComplete`, which could be called in the same thread. Then the `remove` method is just called in the 2nd argument of `computeIfAbsent`. https://github.com/streamnative/kop/blob/3602c9e826d903d97091af1cc608b9d88c1b8cf3/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLogManager.java#L88 ### Modifications Store the future of `PartitionLog` in `PartitionLogManager`, move the `remove` call out of the `computeIfAbsent` in the `exceptionally` callback of `ReplicaManager#getPartitionLog`.
@BewareMyPower:Thanks for your contribution. For this PR, do we need to update docs? |
1 similar comment
@BewareMyPower:Thanks for your contribution. For this PR, do we need to update docs? |
Codecov Report
@@ Coverage Diff @@
## master #1982 +/- ##
============================================
+ Coverage 17.72% 17.75% +0.03%
- Complexity 751 752 +1
============================================
Files 195 195
Lines 14156 14146 -10
Branches 1322 1319 -3
============================================
+ Hits 2509 2512 +3
+ Misses 11464 11452 -12
+ Partials 183 182 -1
|
} catch (InterruptedException e) { | ||
Thread.currentThread().interrupt(); | ||
} catch (ExecutionException e) { | ||
log.error("Failed to get PartitionLog for {} under {}", topicPartition, namespacePrefix, e.getCause()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we need to remove the failed future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will be removed in the exceptionally
callback of the future returned.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This PR has a serious problem so that it's draft now.
Motivation
When transaction is enabled,
PartitionLog#initialise
will recover the state from the local snapshot. It's an asynchronous operation that could fail. In this case, an "recursive update"IllegalStateException
will be thrown, which is unexpected.The reason is that in
PartitionLogManager#getLog
,logMap.remove
is called in the callback ofwhenComplete
, which could be called in the same thread. Then theremove
method is just called in the 2nd argument ofcomputeIfAbsent
.kop/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLogManager.java
Line 88 in 3602c9e
Modifications
Store the future of
PartitionLog
inPartitionLogManager
, move theremove
call out of thecomputeIfAbsent
in theexceptionally
callback ofReplicaManager#getPartitionLog
.