From 460696f233c6e256743966e060e9b570a87479f4 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 27 Jul 2023 20:49:19 +0800 Subject: [PATCH 1/2] Do not throw recursive update exception when producer state recovery failed ### Motivation When transaction is enabled, `PartitionLog#initialise` will recover the state from the local snapshot. It's an asynchronous operation that could fail. In this case, an "recursive update" `IllegalStateException` will be thrown, which is unexpected. ``` Suppressed: java.lang.IllegalStateException: Recursive update at java.util.concurrent.ConcurrentHashMap.replaceNode(ConcurrentHashMap.java:1167) ~[?:?] at java.util.concurrent.ConcurrentHashMap.remove(ConcurrentHashMap.java:1552) ~[?:?] at io.streamnative.pulsar.handlers.kop.storage.PartitionLogManager.lambda$getLog$0(PartitionLogManager.java:88) ~[?:?] ``` The reason is that in `PartitionLogManager#getLog`, `logMap.remove` is called in the callback of `whenComplete`, which could be called in the same thread. Then the `remove` method is just called in the 2nd argument of `computeIfAbsent`. https://github.com/streamnative/kop/blob/3602c9e826d903d97091af1cc608b9d88c1b8cf3/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLogManager.java#L88 ### Modifications Store the future of `PartitionLog` in `PartitionLogManager`, move the `remove` call out of the `computeIfAbsent` in the `exceptionally` callback of `ReplicaManager#getPartitionLog`. --- .../handlers/kop/storage/PartitionLog.java | 12 ---- .../kop/storage/PartitionLogManager.java | 55 +++++-------------- .../handlers/kop/storage/ReplicaManager.java | 24 +++++--- .../transaction/TransactionTest.java | 27 ++++----- 4 files changed, 38 insertions(+), 80 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java index 3b5b50c390..a0454e706c 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java @@ -180,18 +180,6 @@ public CompletableFuture initialise() { return initFuture; } - public CompletableFuture awaitInitialisation() { - return initFuture; - } - - public boolean isInitialised() { - return initFuture.isDone() && !initFuture.isCompletedExceptionally(); - } - - public boolean isInitialisationFailed() { - return initFuture.isDone() && initFuture.isCompletedExceptionally(); - } - private CompletableFuture loadTopicProperties() { CompletableFuture> persistentTopicFuture = kafkaTopicLookupService.getTopic(fullPartitionName, this); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLogManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLogManager.java index 02144d60c4..bf7a3b7127 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLogManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLogManager.java @@ -13,15 +13,15 @@ */ package io.streamnative.pulsar.handlers.kop.storage; -import com.google.common.collect.Maps; +import com.google.common.annotations.VisibleForTesting; import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration; import io.streamnative.pulsar.handlers.kop.KafkaTopicLookupService; import io.streamnative.pulsar.handlers.kop.RequestStats; import io.streamnative.pulsar.handlers.kop.utils.KopTopic; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -30,7 +30,6 @@ import org.apache.kafka.common.utils.Time; import org.apache.pulsar.broker.service.plugin.EntryFilter; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.util.FutureUtil; /** * Manage {@link PartitionLog}. @@ -39,9 +38,9 @@ @Slf4j public class PartitionLogManager { + private final Map> logMap = new ConcurrentHashMap<>(); private final KafkaServiceConfiguration kafkaConfig; private final RequestStats requestStats; - private final Map logMap; private final Time time; private final List entryFilters; @@ -60,7 +59,6 @@ public PartitionLogManager(KafkaServiceConfiguration kafkaConfig, OrderedExecutor recoveryExecutor) { this.kafkaConfig = kafkaConfig; this.requestStats = requestStats; - this.logMap = Maps.newConcurrentMap(); this.entryFilters = entryFilters; this.time = time; this.kafkaTopicLookupService = kafkaTopicLookupService; @@ -68,54 +66,27 @@ public PartitionLogManager(KafkaServiceConfiguration kafkaConfig, this.recoveryExecutor = recoveryExecutor; } - public PartitionLog getLog(TopicPartition topicPartition, String namespacePrefix) { + public CompletableFuture getLog(TopicPartition topicPartition, String namespacePrefix) { String kopTopic = KopTopic.toString(topicPartition, namespacePrefix); String tenant = TopicName.get(kopTopic).getTenant(); ProducerStateManagerSnapshotBuffer prodPerTenant = producerStateManagerSnapshotBuffer.apply(tenant); - PartitionLog res = logMap.computeIfAbsent(kopTopic, key -> { - PartitionLog partitionLog = new PartitionLog(kafkaConfig, requestStats, - time, topicPartition, key, entryFilters, - kafkaTopicLookupService, - prodPerTenant, recoveryExecutor); - - CompletableFuture initialiseResult = partitionLog - .initialise(); - - initialiseResult.whenComplete((___, error) -> { - if (error != null) { - // in case of failure we have to remove the CompletableFuture from the map - log.error("Failed to recovery of {}", key, error); - logMap.remove(key, partitionLog); - } - }); - - return partitionLog; - }); - if (res.isInitialisationFailed()) { - log.error("Failed to initialize of {}", kopTopic); - logMap.remove(kopTopic, res); - } - return res; + return logMap.computeIfAbsent(kopTopic, key -> new PartitionLog( + kafkaConfig, requestStats, time, topicPartition, key, entryFilters, kafkaTopicLookupService, + prodPerTenant, recoveryExecutor + ).initialise()); } - public PartitionLog removeLog(String topicName) { - log.info("removePartitionLog {}", topicName); + CompletableFuture removeLog(String topicName) { return logMap.remove(topicName); } - public int size() { + @VisibleForTesting + int size() { return logMap.size(); } - public CompletableFuture updatePurgeAbortedTxnsOffsets() { - List> handles = new ArrayList<>(); - logMap.values().forEach(log -> { - if (log.isInitialised()) { - handles.add(log.updatePurgeAbortedTxnsOffset()); - } - }); - return FutureUtil - .waitForAll(handles); + void updatePurgeAbortedTxnsOffsets() { + logMap.values().forEach(future -> future.thenApply(PartitionLog::updatePurgeAbortedTxnsOffset)); } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java index 03f9939882..8c7ed41fa0 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java @@ -79,13 +79,13 @@ public ReplicaManager(KafkaServiceConfiguration kafkaConfig, this.metadataNamespace = kafkaConfig.getKafkaMetadataNamespace(); } - public PartitionLog getPartitionLog(TopicPartition topicPartition, - String namespacePrefix) { + @VisibleForTesting + public CompletableFuture getPartitionLog(TopicPartition topicPartition, String namespacePrefix) { return logManager.getLog(topicPartition, namespacePrefix); } public void removePartitionLog(String topicName) { - PartitionLog partitionLog = logManager.removeLog(topicName); + CompletableFuture partitionLog = logManager.removeLog(topicName); if (log.isDebugEnabled() && partitionLog != null) { log.debug("PartitionLog: {} has bean removed.", topicName); } @@ -168,8 +168,9 @@ public CompletableFuture> addPartitionResponse.accept(topicPartition, new ProduceResponse.PartitionResponse( Errors.forException(new InvalidTopicException( String.format("Cannot append to internal topic %s", topicPartition.topic()))))); - } else { - PartitionLog partitionLog = getPartitionLog(topicPartition, namespacePrefix); + return; + } + getPartitionLog(topicPartition, namespacePrefix).thenAccept(partitionLog -> { if (requiredAcks == 0) { partitionLog.appendRecords(memoryRecords, origin, appendRecordsContext); return; @@ -208,7 +209,13 @@ public CompletableFuture> } return null; }); - } + }).exceptionally(e -> { + log.error("Failed to get PartitionLog for {}", topicPartition, e); + logManager.removeLog(fullPartitionName); + addPartitionResponse.accept(topicPartition, + new ProduceResponse.PartitionResponse(Errors.UNKNOWN_SERVER_ERROR)); + return null; + }); }); // delay produce if (timeout <= 0) { @@ -307,7 +314,6 @@ public CompletableFuture> re }; readPartitionInfo.forEach((tp, fetchInfo) -> { getPartitionLog(tp, context.getNamespacePrefix()) - .awaitInitialisation() .whenComplete((partitionLog, failed) ->{ if (failed != null) { result.put(tp, @@ -337,8 +343,8 @@ public void tryCompleteDelayedFetch(DelayedOperationKey key) { } } - public CompletableFuture updatePurgeAbortedTxnsOffsets() { - return logManager.updatePurgeAbortedTxnsOffsets(); + public void updatePurgeAbortedTxnsOffsets() { + logManager.updatePurgeAbortedTxnsOffsets(); } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java index f043c56330..930ab223ed 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java @@ -477,7 +477,7 @@ private void takeSnapshot(String topicName) throws Exception { for (int i = 0; i < numPartitions; i++) { PartitionLog partitionLog = protocolHandler .getReplicaManager() - .getPartitionLog(new TopicPartition(topicName, i), tenant + "/" + namespace); + .getPartitionLog(new TopicPartition(topicName, i), tenant + "/" + namespace).join(); // we can only take the snapshot on the only thread that is allowed to process mutations // on the state @@ -728,7 +728,7 @@ public void basicRecoveryAbortedTransactionDueToProducerTimedOut(boolean takeSna consumeTxnMessage(topicName, 2, lastMessage, isolation); } - @Test(timeOut = 10000, dataProvider = "takeSnapshotBeforeRecovery") + @Test(timeOut = 20000, dataProvider = "takeSnapshotBeforeRecovery") public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Exception { String topicName = "testPurgeAbortedTx_" + takeSnapshotBeforeRecovery; @@ -760,8 +760,7 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except PartitionLog partitionLog = protocolHandler .getReplicaManager() - .getPartitionLog(topicPartition, namespacePrefix); - partitionLog.awaitInitialisation().get(); + .getPartitionLog(topicPartition, namespacePrefix).join(); assertEquals(0, partitionLog.fetchOldestAvailableIndexFromTopic().get().longValue()); List abortedIndexList = @@ -795,8 +794,7 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except partitionLog = protocolHandler .getReplicaManager() - .getPartitionLog(topicPartition, namespacePrefix); - partitionLog.awaitInitialisation().get(); + .getPartitionLog(topicPartition, namespacePrefix).join(); assertEquals(0L, partitionLog.fetchOldestAvailableIndexFromTopic().get().longValue()); abortedIndexList = @@ -821,8 +819,7 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except // validate that the topic has been trimmed partitionLog = protocolHandler .getReplicaManager() - .getPartitionLog(topicPartition, namespacePrefix); - partitionLog.awaitInitialisation().get(); + .getPartitionLog(topicPartition, namespacePrefix).join(); assertEquals(0L, partitionLog.fetchOldestAvailableIndexFromTopic().get().longValue()); // all the messages up to here will be trimmed @@ -832,7 +829,7 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except assertSame(partitionLog, protocolHandler .getReplicaManager() - .getPartitionLog(topicPartition, namespacePrefix)); + .getPartitionLog(topicPartition, namespacePrefix).join()); assertEquals(7L, partitionLog.fetchOldestAvailableIndexFromTopic().get().longValue()); abortedIndexList = @@ -851,8 +848,7 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except partitionLog = protocolHandler .getReplicaManager() - .getPartitionLog(topicPartition, namespacePrefix); - partitionLog.awaitInitialisation().get(); + .getPartitionLog(topicPartition, namespacePrefix).join(); assertEquals(8L, partitionLog.fetchOldestAvailableIndexFromTopic().get().longValue()); // this TX is aborted and must not be purged @@ -883,8 +879,7 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except partitionLog = protocolHandler .getReplicaManager() - .getPartitionLog(topicPartition, namespacePrefix); - partitionLog.awaitInitialisation().get(); + .getPartitionLog(topicPartition, namespacePrefix).join(); // verify that we have 2 aborted TX in memory assertTrue(partitionLog.getProducerStateManager().hasSomeAbortedTransactions()); @@ -1058,8 +1053,7 @@ public void testRecoverFromInvalidSnapshotAfterTrim() throws Exception { pulsar.getProtocolHandlers().protocol("kafka"); PartitionLog partitionLog = protocolHandler .getReplicaManager() - .getPartitionLog(topicPartition, namespacePrefix); - partitionLog.awaitInitialisation().get(); + .getPartitionLog(topicPartition, namespacePrefix).join(); assertEquals(0L, partitionLog.fetchOldestAvailableIndexFromTopic().get().longValue()); // all the messages up to here will be trimmed @@ -1078,8 +1072,7 @@ public void testRecoverFromInvalidSnapshotAfterTrim() throws Exception { partitionLog = protocolHandler .getReplicaManager() - .getPartitionLog(topicPartition, namespacePrefix); - partitionLog.awaitInitialisation().get(); + .getPartitionLog(topicPartition, namespacePrefix).join(); assertEquals(8L, partitionLog.fetchOldestAvailableIndexFromTopic().get().longValue()); // use a new consumer group, it will read from the beginning of the topic From e6488617c01de32daf09dddd96127885eab786fa Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 27 Jul 2023 22:38:33 +0800 Subject: [PATCH 2/2] Fix message disorder --- .../handlers/kop/storage/PartitionLogManager.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLogManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLogManager.java index bf7a3b7127..40723b641c 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLogManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLogManager.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.function.Function; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -70,10 +71,19 @@ public CompletableFuture getLog(TopicPartition topicPartition, Str String kopTopic = KopTopic.toString(topicPartition, namespacePrefix); String tenant = TopicName.get(kopTopic).getTenant(); ProducerStateManagerSnapshotBuffer prodPerTenant = producerStateManagerSnapshotBuffer.apply(tenant); - return logMap.computeIfAbsent(kopTopic, key -> new PartitionLog( + CompletableFuture future = logMap.computeIfAbsent(kopTopic, key -> new PartitionLog( kafkaConfig, requestStats, time, topicPartition, key, entryFilters, kafkaTopicLookupService, prodPerTenant, recoveryExecutor ).initialise()); + // We need to wait here to avoid the message disorder + try { + future.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + log.error("Failed to get PartitionLog for {} under {}", topicPartition, namespacePrefix, e.getCause()); + } + return future; } CompletableFuture removeLog(String topicName) {