This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Do not throw recursive update exception when producer state recovery failed #1982

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
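The PR has no written description, so the following context is inferred from the diff. In the old PartitionLogManager.getLog, initialise() could complete exceptionally while the computeIfAbsent mapping function was still on the stack, and the whenComplete cleanup then called logMap.remove(key) from inside that mapping function. Since JDK 9, ConcurrentHashMap detects such re-entrant modification and throws IllegalStateException: Recursive update, which appears to be the exception the title refers to. A minimal, self-contained sketch of the failure mode (all names here are illustrative, not from the PR):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RecursiveUpdateDemo {
    public static void main(String[] args) {
        Map<String, String> logMap = new ConcurrentHashMap<>();
        // The mapping function mutates the map it is running inside of,
        // mirroring a failure callback that fires synchronously during
        // computeIfAbsent. On JDK 9+ this throws
        // java.lang.IllegalStateException: Recursive update.
        logMap.computeIfAbsent("tenant/ns/topic-0", key -> {
            logMap.remove(key);
            return "partition-log";
        });
    }
}
```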
==== File 1 of 4: PartitionLog ====

@@ -180,18 +180,6 @@ public CompletableFuture<PartitionLog> initialise() {
         return initFuture;
     }
 
-    public CompletableFuture<PartitionLog> awaitInitialisation() {
-        return initFuture;
-    }
-
-    public boolean isInitialised() {
-        return initFuture.isDone() && !initFuture.isCompletedExceptionally();
-    }
-
-    public boolean isInitialisationFailed() {
-        return initFuture.isDone() && initFuture.isCompletedExceptionally();
-    }
-
     private CompletableFuture<Void> loadTopicProperties() {
         CompletableFuture<Optional<PersistentTopic>> persistentTopicFuture =
                 kafkaTopicLookupService.getTopic(fullPartitionName, this);
==== File 2 of 4: PartitionLogManager ====

@@ -13,15 +13,16 @@
  */
 package io.streamnative.pulsar.handlers.kop.storage;
 
-import com.google.common.collect.Maps;
+import com.google.common.annotations.VisibleForTesting;
 import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
 import io.streamnative.pulsar.handlers.kop.KafkaTopicLookupService;
 import io.streamnative.pulsar.handlers.kop.RequestStats;
 import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -30,7 +31,6 @@
 import org.apache.kafka.common.utils.Time;
 import org.apache.pulsar.broker.service.plugin.EntryFilter;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.util.FutureUtil;
 
 /**
  * Manage {@link PartitionLog}.
@@ -39,9 +39,9 @@
 @Slf4j
 public class PartitionLogManager {
 
+    private final Map<String, CompletableFuture<PartitionLog>> logMap = new ConcurrentHashMap<>();
     private final KafkaServiceConfiguration kafkaConfig;
     private final RequestStats requestStats;
-    private final Map<String, PartitionLog> logMap;
     private final Time time;
     private final List<EntryFilter> entryFilters;
 
@@ -60,62 +60,43 @@ public PartitionLogManager(KafkaServiceConfiguration kafkaConfig,
                                OrderedExecutor recoveryExecutor) {
         this.kafkaConfig = kafkaConfig;
         this.requestStats = requestStats;
-        this.logMap = Maps.newConcurrentMap();
         this.entryFilters = entryFilters;
         this.time = time;
         this.kafkaTopicLookupService = kafkaTopicLookupService;
         this.producerStateManagerSnapshotBuffer = producerStateManagerSnapshotBuffer;
         this.recoveryExecutor = recoveryExecutor;
     }
 
-    public PartitionLog getLog(TopicPartition topicPartition, String namespacePrefix) {
+    public CompletableFuture<PartitionLog> getLog(TopicPartition topicPartition, String namespacePrefix) {
         String kopTopic = KopTopic.toString(topicPartition, namespacePrefix);
         String tenant = TopicName.get(kopTopic).getTenant();
         ProducerStateManagerSnapshotBuffer prodPerTenant = producerStateManagerSnapshotBuffer.apply(tenant);
-        PartitionLog res = logMap.computeIfAbsent(kopTopic, key -> {
-            PartitionLog partitionLog = new PartitionLog(kafkaConfig, requestStats,
-                    time, topicPartition, key, entryFilters,
-                    kafkaTopicLookupService,
-                    prodPerTenant, recoveryExecutor);
-
-            CompletableFuture<PartitionLog> initialiseResult = partitionLog
-                    .initialise();
-
-            initialiseResult.whenComplete((___, error) -> {
-                if (error != null) {
-                    // in case of failure we have to remove the CompletableFuture from the map
-                    log.error("Failed to recovery of {}", key, error);
-                    logMap.remove(key, partitionLog);
-                }
-            });
-
-            return partitionLog;
-        });
-        if (res.isInitialisationFailed()) {
-            log.error("Failed to initialize of {}", kopTopic);
-            logMap.remove(kopTopic, res);
+        CompletableFuture<PartitionLog> future = logMap.computeIfAbsent(kopTopic, key -> new PartitionLog(
+                kafkaConfig, requestStats, time, topicPartition, key, entryFilters, kafkaTopicLookupService,
+                prodPerTenant, recoveryExecutor
+        ).initialise());
+        // We need to wait here to avoid the message disorder
+        try {
+            future.get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        } catch (ExecutionException e) {
+            log.error("Failed to get PartitionLog for {} under {}", topicPartition, namespacePrefix, e.getCause());
 
    Review thread on the line above:

    gaoran10 (Contributor), Jul 28, 2023:
        Maybe we need to remove the failed future.

    Author (Collaborator):
        It will be removed in the exceptionally callback of the future returned.

    Author (Collaborator):
        This PR has a serious problem, so it is a draft for now.

    (A sketch of the removal pattern the author describes follows this file's diff.)
 
         }
-        return res;
+        return future;
     }
 
-    public PartitionLog removeLog(String topicName) {
-        log.info("removePartitionLog {}", topicName);
+    CompletableFuture<PartitionLog> removeLog(String topicName) {
         return logMap.remove(topicName);
     }
 
-    public int size() {
+    @VisibleForTesting
+    int size() {
         return logMap.size();
     }
 
-    public CompletableFuture<?> updatePurgeAbortedTxnsOffsets() {
-        List<CompletableFuture<?>> handles = new ArrayList<>();
-        logMap.values().forEach(log -> {
-            if (log.isInitialised()) {
-                handles.add(log.updatePurgeAbortedTxnsOffset());
-            }
-        });
-        return FutureUtil
-                .waitForAll(handles);
+    void updatePurgeAbortedTxnsOffsets() {
+        logMap.values().forEach(future -> future.thenApply(PartitionLog::updatePurgeAbortedTxnsOffset));
     }
 }
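
The review thread above says the failed future "will be removed in the exceptionally callback of the future returned". That callback is not visible in this diff (it may live in the second commit), so the following is only a sketch of the pattern being described, with hypothetical names: the cleanup is registered after computeIfAbsent has returned, so even a future that has already failed cannot re-enter the mapping function and trigger the recursive-update check.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class SelfCleaningLogMap {
    private final Map<String, CompletableFuture<String>> logMap = new ConcurrentHashMap<>();

    CompletableFuture<String> getLog(String kopTopic) {
        CompletableFuture<String> future =
                logMap.computeIfAbsent(kopTopic, SelfCleaningLogMap::initialise);
        // Registered outside the mapping function: if recovery already failed,
        // this runs after computeIfAbsent has returned, so no recursive update.
        future.whenComplete((log, error) -> {
            if (error != null) {
                logMap.remove(kopTopic, future); // remove only our own failed entry
            }
        });
        return future;
    }

    private static CompletableFuture<String> initialise(String key) {
        // Stand-in for PartitionLog#initialise (producer state recovery).
        return CompletableFuture.failedFuture(new RuntimeException("recovery failed: " + key));
    }
}
```

The two-argument remove(key, future) matters here: it only evicts this caller's failed entry and cannot delete a newer, healthy future that a concurrent getLog may already have installed under the same key.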

==== File 3 of 4: ReplicaManager ====

@@ -79,13 +79,13 @@ public ReplicaManager(KafkaServiceConfiguration kafkaConfig,
         this.metadataNamespace = kafkaConfig.getKafkaMetadataNamespace();
     }
 
-    public PartitionLog getPartitionLog(TopicPartition topicPartition,
-                                        String namespacePrefix) {
+    @VisibleForTesting
+    public CompletableFuture<PartitionLog> getPartitionLog(TopicPartition topicPartition, String namespacePrefix) {
         return logManager.getLog(topicPartition, namespacePrefix);
     }
 
     public void removePartitionLog(String topicName) {
-        PartitionLog partitionLog = logManager.removeLog(topicName);
+        CompletableFuture<PartitionLog> partitionLog = logManager.removeLog(topicName);
         if (log.isDebugEnabled() && partitionLog != null) {
             log.debug("PartitionLog: {} has bean removed.", topicName);
         }
@@ -168,8 +168,9 @@ public CompletableFuture<Map<TopicPartition, ProduceResponse.PartitionResponse>>
                 addPartitionResponse.accept(topicPartition, new ProduceResponse.PartitionResponse(
                         Errors.forException(new InvalidTopicException(
                                 String.format("Cannot append to internal topic %s", topicPartition.topic())))));
-            } else {
-                PartitionLog partitionLog = getPartitionLog(topicPartition, namespacePrefix);
+                return;
+            }
+            getPartitionLog(topicPartition, namespacePrefix).thenAccept(partitionLog -> {
                 if (requiredAcks == 0) {
                     partitionLog.appendRecords(memoryRecords, origin, appendRecordsContext);
                     return;
@@ -208,7 +209,13 @@ public CompletableFuture<Map<TopicPartition, ProduceResponse.PartitionResponse>>
                     }
                     return null;
                 });
-            }
+            }).exceptionally(e -> {
+                log.error("Failed to get PartitionLog for {}", topicPartition, e);
+                logManager.removeLog(fullPartitionName);
+                addPartitionResponse.accept(topicPartition,
+                        new ProduceResponse.PartitionResponse(Errors.UNKNOWN_SERVER_ERROR));
+                return null;
+            });
         });
         // delay produce
         if (timeout <= 0) {
@@ -307,7 +314,6 @@ public CompletableFuture<Map<TopicPartition, PartitionLog.ReadRecordsResult>> re
         };
         readPartitionInfo.forEach((tp, fetchInfo) -> {
             getPartitionLog(tp, context.getNamespacePrefix())
-                    .awaitInitialisation()
                     .whenComplete((partitionLog, failed) ->{
                         if (failed != null) {
                             result.put(tp,
@@ -337,8 +343,8 @@ public void tryCompleteDelayedFetch(DelayedOperationKey key) {
         }
     }
 
-    public CompletableFuture<?> updatePurgeAbortedTxnsOffsets() {
-        return logManager.updatePurgeAbortedTxnsOffsets();
+    public void updatePurgeAbortedTxnsOffsets() {
+        logManager.updatePurgeAbortedTxnsOffsets();
     }
 
 }
==== File 4 of 4: transaction recovery tests ====

@@ -477,7 +477,7 @@ private void takeSnapshot(String topicName) throws Exception {
         for (int i = 0; i < numPartitions; i++) {
             PartitionLog partitionLog = protocolHandler
                     .getReplicaManager()
-                    .getPartitionLog(new TopicPartition(topicName, i), tenant + "/" + namespace);
+                    .getPartitionLog(new TopicPartition(topicName, i), tenant + "/" + namespace).join();
 
             // we can only take the snapshot on the only thread that is allowed to process mutations
             // on the state
@@ -728,7 +728,7 @@ public void basicRecoveryAbortedTransactionDueToProducerTimedOut(boolean takeSna
         consumeTxnMessage(topicName, 2, lastMessage, isolation);
     }
 
-    @Test(timeOut = 10000, dataProvider = "takeSnapshotBeforeRecovery")
+    @Test(timeOut = 20000, dataProvider = "takeSnapshotBeforeRecovery")
     public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Exception {
 
         String topicName = "testPurgeAbortedTx_" + takeSnapshotBeforeRecovery;
@@ -760,8 +760,7 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except
 
         PartitionLog partitionLog = protocolHandler
                 .getReplicaManager()
-                .getPartitionLog(topicPartition, namespacePrefix);
-        partitionLog.awaitInitialisation().get();
+                .getPartitionLog(topicPartition, namespacePrefix).join();
         assertEquals(0, partitionLog.fetchOldestAvailableIndexFromTopic().get().longValue());
 
         List<FetchResponse.AbortedTransaction> abortedIndexList =
@@ -795,8 +794,7 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except
 
         partitionLog = protocolHandler
                 .getReplicaManager()
-                .getPartitionLog(topicPartition, namespacePrefix);
-        partitionLog.awaitInitialisation().get();
+                .getPartitionLog(topicPartition, namespacePrefix).join();
         assertEquals(0L, partitionLog.fetchOldestAvailableIndexFromTopic().get().longValue());
 
         abortedIndexList =
@@ -821,8 +819,7 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except
         // validate that the topic has been trimmed
         partitionLog = protocolHandler
                 .getReplicaManager()
-                .getPartitionLog(topicPartition, namespacePrefix);
-        partitionLog.awaitInitialisation().get();
+                .getPartitionLog(topicPartition, namespacePrefix).join();
         assertEquals(0L, partitionLog.fetchOldestAvailableIndexFromTopic().get().longValue());
 
         // all the messages up to here will be trimmed
@@ -832,7 +829,7 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except
 
         assertSame(partitionLog, protocolHandler
                 .getReplicaManager()
-                .getPartitionLog(topicPartition, namespacePrefix));
+                .getPartitionLog(topicPartition, namespacePrefix).join());
 
         assertEquals(7L, partitionLog.fetchOldestAvailableIndexFromTopic().get().longValue());
         abortedIndexList =
@@ -851,8 +848,7 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except
 
         partitionLog = protocolHandler
                 .getReplicaManager()
-                .getPartitionLog(topicPartition, namespacePrefix);
-        partitionLog.awaitInitialisation().get();
+                .getPartitionLog(topicPartition, namespacePrefix).join();
         assertEquals(8L, partitionLog.fetchOldestAvailableIndexFromTopic().get().longValue());
 
         // this TX is aborted and must not be purged
@@ -883,8 +879,7 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except
 
         partitionLog = protocolHandler
                 .getReplicaManager()
-                .getPartitionLog(topicPartition, namespacePrefix);
-        partitionLog.awaitInitialisation().get();
+                .getPartitionLog(topicPartition, namespacePrefix).join();
 
         // verify that we have 2 aborted TX in memory
         assertTrue(partitionLog.getProducerStateManager().hasSomeAbortedTransactions());
@@ -1058,8 +1053,7 @@ public void testRecoverFromInvalidSnapshotAfterTrim() throws Exception {
                 pulsar.getProtocolHandlers().protocol("kafka");
         PartitionLog partitionLog = protocolHandler
                 .getReplicaManager()
-                .getPartitionLog(topicPartition, namespacePrefix);
-        partitionLog.awaitInitialisation().get();
+                .getPartitionLog(topicPartition, namespacePrefix).join();
         assertEquals(0L, partitionLog.fetchOldestAvailableIndexFromTopic().get().longValue());
 
         // all the messages up to here will be trimmed
@@ -1078,8 +1072,7 @@ public void testRecoverFromInvalidSnapshotAfterTrim() throws Exception {
 
         partitionLog = protocolHandler
                 .getReplicaManager()
-                .getPartitionLog(topicPartition, namespacePrefix);
-        partitionLog.awaitInitialisation().get();
+                .getPartitionLog(topicPartition, namespacePrefix).join();
         assertEquals(8L, partitionLog.fetchOldestAvailableIndexFromTopic().get().longValue());
 
         // use a new consumer group, it will read from the beginning of the topic
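
A side note on the test changes: awaitInitialisation().get() becomes join() on the future returned by getPartitionLog. Both block until completion; they differ only in how failures surface, as this small standalone sketch shows:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class JoinVsGet {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        CompletableFuture<String> future = CompletableFuture.completedFuture("partition-log");
        // get() declares checked exceptions, forcing throws clauses in tests...
        String viaGet = future.get();
        // ...while join() wraps failures in an unchecked CompletionException,
        // which keeps test bodies free of checked-exception boilerplate.
        String viaJoin = future.join();
        System.out.println(viaGet.equals(viaJoin)); // prints: true
    }
}
```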