From 59b92183a311118a2dff8c4eb0ba623afe000394 Mon Sep 17 00:00:00 2001 From: feynmanlin <315157973@qq.com> Date: Thu, 28 Dec 2023 20:09:12 +0800 Subject: [PATCH] Fix sending lag caused by thread safety --- .../handlers/kop/storage/ReplicaManager.java | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java index 0dadd1bfc8..6c877affe7 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java @@ -26,6 +26,15 @@ import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation; import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey; import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.stream.Collectors; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.util.OrderedExecutor; @@ -46,16 +55,6 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.util.FutureUtil; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; -import java.util.function.Function; -import java.util.stream.Collectors; - /** * Used to append records. Mapping to Kafka ReplicaManager.scala. */