From 930e502d3b67c1be46297811bb7adf1bc00943e6 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Thu, 2 Mar 2023 10:40:37 +0100 Subject: [PATCH 1/4] Update Kafka wireprotocol to 3.4.0 and implement KIP-699 and KIP-709 (cherry picked from commit 71b77b2fd65fd1a2d481ed7067939fab3c0ad002) --- .../pulsar/handlers/kop/DelayedFetch.java | 6 +- .../handlers/kop/KafkaCommandDecoder.java | 9 +- .../handlers/kop/KafkaRequestHandler.java | 319 +++++++++++++----- .../handlers/kop/storage/PartitionLog.java | 76 +++-- .../kop/storage/ProducerStateManager.java | 10 +- .../handlers/kop/storage/ReplicaManager.java | 6 +- .../kop/utils/KafkaResponseUtils.java | 111 +++++- .../kop/utils/MessageMetadataUtils.java | 4 + .../requests/ResponseCallbackWrapper.java | 5 + .../kop/format/EntryFormatterTest.java | 10 +- pom.xml | 2 +- .../kop/EntryPublishTimeKafkaFormatTest.java | 2 +- .../pulsar/handlers/kop/KafkaApisTest.java | 77 +++-- .../handlers/kop/KafkaCommonTestUtils.java | 11 +- .../handlers/kop/KafkaRequestHandlerTest.java | 2 +- ...kaRequestHandlerWithAuthorizationTest.java | 108 ++++-- .../kop/KopProtocolHandlerTestBase.java | 4 +- .../TransactionStateManagerTest.java | 2 + .../transaction/TransactionTest.java | 20 +- .../kop/e2e/DistributedClusterTest.java | 1 + .../kop/metrics/MetricsProviderTest.java | 6 +- .../kop/streams/GlobalKTableTest.java | 14 +- .../kop/streams/KStreamAggregationTest.java | 31 +- .../handlers/kop/streams/KTableTest.java | 7 +- .../kop/streams/KafkaStreamsTestBase.java | 4 +- 25 files changed, 623 insertions(+), 224 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/DelayedFetch.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/DelayedFetch.java index 4c88e801ce..edd7f516b0 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/DelayedFetch.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/DelayedFetch.java @@ -24,7 +24,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.message.FetchRequestData; @Slf4j public class DelayedFetch extends DelayedOperation { @@ -33,7 +33,7 @@ public class DelayedFetch extends DelayedOperation { private final long bytesReadable; private final int fetchMaxBytes; private final boolean readCommitted; - private final Map readPartitionInfo; + private final Map readPartitionInfo; private final Map readRecordsResult; private final MessageFetchContext context; protected volatile Boolean hasError; @@ -55,7 +55,7 @@ public DelayedFetch(final long delayMs, final boolean readCommitted, final MessageFetchContext context, final ReplicaManager replicaManager, - final Map readPartitionInfo, + final Map readPartitionInfo, final Map readRecordsResult, final CompletableFuture> callback) { super(delayMs, Optional.empty()); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index 2157782e99..ad7b3f9d61 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -454,7 +454,14 @@ protected void writeAndFlushResponseToClient(Channel channel) { request, response); } - final ByteBuf result = responseToByteBuf(response, request, true); + final ByteBuf result; + try { + result = responseToByteBuf(response, request, true); + } catch (Throwable error) { + log.error("[{}] Failed to convert response {} to ByteBuf", channel, response, error); + sendErrorResponse(request, channel, error, true); + return; + } final int resultSize = result.readableBytes(); channel.writeAndFlush(result).addListener(future -> { if (response instanceof ResponseCallbackWrapper) { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 9cffb86d0b..466da180d0 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -17,6 +17,7 @@ import static com.google.common.base.Preconditions.checkState; import static io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration.TENANT_ALLNAMESPACES_PLACEHOLDER; import static io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration.TENANT_PLACEHOLDER; +import static io.streamnative.pulsar.handlers.kop.utils.KafkaResponseUtils.buildOffsetFetchResponse; import static java.nio.charset.StandardCharsets.UTF_8; import com.google.common.annotations.VisibleForTesting; @@ -58,7 +59,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -116,6 +116,9 @@ import org.apache.kafka.common.message.DescribeConfigsResponseData; import org.apache.kafka.common.message.EndTxnRequestData; import org.apache.kafka.common.message.EndTxnResponseData; +import org.apache.kafka.common.message.FetchRequestData; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.FindCoordinatorResponseData; import org.apache.kafka.common.message.InitProducerIdRequestData; import org.apache.kafka.common.message.InitProducerIdResponseData; import org.apache.kafka.common.message.JoinGroupRequestData; @@ -134,7 +137,6 @@ import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MutableRecordBatch; import org.apache.kafka.common.record.RecordBatch; -import org.apache.kafka.common.record.Records; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.requests.AddOffsetsToTxnRequest; @@ -959,18 +961,53 @@ protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinato checkArgument(findCoordinator.getRequest() instanceof FindCoordinatorRequest); FindCoordinatorRequest request = (FindCoordinatorRequest) findCoordinator.getRequest(); + List coordinatorKeys = request.version() < FindCoordinatorRequest.MIN_BATCHED_VERSION + ? Collections.singletonList(request.data().key()) : request.data().coordinatorKeys(); + + List> futures = + new ArrayList<>(coordinatorKeys.size()); + for (String coordinatorKey : coordinatorKeys) { + CompletableFuture future = + findSingleCoordinator(coordinatorKey, findCoordinator); + futures.add(future); + } + + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) + .whenComplete((ignore, ex) -> { + if (ex != null) { + resultFuture.completeExceptionally(ex); + return; + } + List coordinators = new ArrayList<>(futures.size()); + for (CompletableFuture future : futures) { + coordinators.add(future.join()); + } + resultFuture.complete(KafkaResponseUtils.newFindCoordinator(coordinators, request.version())); + }); + + } + + private CompletableFuture findSingleCoordinator( + String coordinatorKey, KafkaHeaderAndRequest findCoordinator) { + + FindCoordinatorRequest request = (FindCoordinatorRequest) findCoordinator.getRequest(); + CompletableFuture findSingleCoordinatorResult = + new CompletableFuture<>(); + if (request.data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id()) { TransactionCoordinator transactionCoordinator = getTransactionCoordinator(); - int partition = transactionCoordinator.partitionFor(request.data().key()); + int partition = transactionCoordinator.partitionFor(coordinatorKey); String pulsarTopicName = transactionCoordinator.getTopicPartitionName(partition); findBroker(TopicName.get(pulsarTopicName)) .whenComplete((KafkaResponseUtils.BrokerLookupResult result, Throwable throwable) -> { if (result.error != Errors.NONE || throwable != null) { log.error("[{}] Request {}: Error while find coordinator.", ctx.channel(), findCoordinator.getHeader(), throwable); - - resultFuture.complete(KafkaResponseUtils - .newFindCoordinator(Errors.LEADER_NOT_AVAILABLE)); + findSingleCoordinatorResult.complete( + new FindCoordinatorResponseData.Coordinator() + .setErrorCode(Errors.LEADER_NOT_AVAILABLE.code()) + .setErrorMessage(Errors.LEADER_NOT_AVAILABLE.message()) + .setKey(coordinatorKey)); return; } @@ -978,7 +1015,14 @@ protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinato log.debug("[{}] Found node {} as coordinator for key {} partition {}.", ctx.channel(), result.node, request.data().key(), partition); } - resultFuture.complete(KafkaResponseUtils.newFindCoordinator(result.node)); + findSingleCoordinatorResult.complete( + new FindCoordinatorResponseData.Coordinator() + .setNodeId(result.node.id()) + .setHost(result.node.host()) + .setPort(result.node.port()) + .setErrorCode(result.error.code()) + .setErrorMessage(result.error.message()) + .setKey(coordinatorKey)); }); } else if (request.data().keyType() == FindCoordinatorRequest.CoordinatorType.GROUP.id()) { authorize(AclOperation.DESCRIBE, Resource.of(ResourceType.GROUP, request.data().key())) @@ -986,27 +1030,33 @@ protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinato if (ex != null) { log.error("Describe group authorize failed, group - {}. {}", request.data().key(), ex.getMessage()); - resultFuture.complete(KafkaResponseUtils - .newFindCoordinator(Errors.GROUP_AUTHORIZATION_FAILED)); + findSingleCoordinatorResult.complete( + new FindCoordinatorResponseData.Coordinator() + .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code()) + .setErrorMessage(Errors.GROUP_AUTHORIZATION_FAILED.message()) + .setKey(coordinatorKey)); + return; } if (!isAuthorized) { - resultFuture.complete( - KafkaResponseUtils - .newFindCoordinator(Errors.GROUP_AUTHORIZATION_FAILED)); + findSingleCoordinatorResult.complete( + new FindCoordinatorResponseData.Coordinator() + .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code()) + .setErrorMessage(Errors.GROUP_AUTHORIZATION_FAILED.message()) + .setKey(coordinatorKey)); + return; } CompletableFuture storeGroupIdFuture; - int partition = getGroupCoordinator().partitionFor(request.data().key()); + int partition = getGroupCoordinator().partitionFor(coordinatorKey); String pulsarTopicName = getGroupCoordinator().getTopicPartitionName(partition); if (kafkaConfig.isKopEnableGroupLevelConsumerMetrics()) { - String groupId = request.data().key(); String groupIdPath = GroupIdUtils.groupIdPathFormat(findCoordinator.getClientHost(), findCoordinator.getHeader().clientId()); currentConnectedClientId.add(findCoordinator.getHeader().clientId()); // Store group name to metadata store for current client, use to collect consumer metrics. - storeGroupIdFuture = storeGroupId(groupId, groupIdPath); + storeGroupIdFuture = storeGroupId(coordinatorKey, groupIdPath); } else { storeGroupIdFuture = CompletableFuture.completedFuture(null); } @@ -1022,8 +1072,11 @@ protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinato log.error("[{}] Request {}: Error while find coordinator.", ctx.channel(), findCoordinator.getHeader(), throwable); - resultFuture.complete(KafkaResponseUtils - .newFindCoordinator(Errors.LEADER_NOT_AVAILABLE)); + findSingleCoordinatorResult.complete( + new FindCoordinatorResponseData.Coordinator() + .setErrorCode(Errors.LEADER_NOT_AVAILABLE.code()) + .setErrorMessage(Errors.LEADER_NOT_AVAILABLE.message()) + .setKey(coordinatorKey)); return; } @@ -1031,14 +1084,23 @@ protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinato log.debug("[{}] Found node {} as coordinator for key {} partition {}.", ctx.channel(), result.node, request.data().key(), partition); } - resultFuture.complete(KafkaResponseUtils.newFindCoordinator(result.node)); + findSingleCoordinatorResult.complete( + new FindCoordinatorResponseData.Coordinator() + .setNodeId(result.node.id()) + .setHost(result.node.host()) + .setPort(result.node.port()) + .setErrorCode(result.error.code()) + .setErrorMessage(result.error.message()) + .setKey(coordinatorKey)); }); }); }); } else { - throw new NotImplementedException("FindCoordinatorRequest not support unknown type " - + request.data().keyType()); + findSingleCoordinatorResult.completeExceptionally( + new NotImplementedException("FindCoordinatorRequest not support unknown type " + + request.data().keyType())); } + return findSingleCoordinatorResult; } @VisibleForTesting @@ -1082,6 +1144,57 @@ protected void handleOffsetFetchRequest(KafkaHeaderAndRequest offsetFetch, checkState(getGroupCoordinator() != null, "Group Coordinator not started"); + List> futures = new ArrayList<>(); + if (request.version() >= 8) { + request.data().groups().forEach(group -> { + String groupId = group.groupId(); + List partitions = new ArrayList<>(); + // null topics means no partitions specified, so we should fetch all partitions + if (group.topics() != null) { + group + .topics() + .forEach(topic -> { + topic.partitionIndexes() + .forEach(partition -> partitions.add(new TopicPartition(topic.name(), partition))); + }); + } + futures.add(getOffsetFetchForGroup(groupId, partitions)); + }); + + } else { + // old clients + String groupId = request.data().groupId(); + List partitions = new ArrayList<>(); + request.data().topics().forEach(topic -> { + topic + .partitionIndexes() + .forEach(partition -> partitions.add(new TopicPartition(topic.name(), partition))); + }); + futures.add(getOffsetFetchForGroup(groupId, partitions)); + } + + FutureUtil.waitForAll(futures).whenComplete((___, error) -> { + if (error != null) { + resultFuture.complete(request.getErrorResponse(error)); + return; + } + List partitionsResponses = new ArrayList<>(); + futures.forEach(f -> { + partitionsResponses.add(f.join()); + }); + + resultFuture.complete(buildOffsetFetchResponse(partitionsResponses, request.version())); + }); + + } + + protected CompletableFuture getOffsetFetchForGroup( + String groupId, + List partitions + ) { + + CompletableFuture resultFuture = new CompletableFuture<>(); + CompletableFuture> authorizeFuture = new CompletableFuture<>(); // replace @@ -1093,10 +1206,11 @@ protected void handleOffsetFetchRequest(KafkaHeaderAndRequest offsetFetch, Map unknownPartitionData = Maps.newConcurrentMap(); - if (request.partitions() == null || request.partitions().isEmpty()) { + if (partitions == null || partitions.isEmpty()) { + // fetch all partitions authorizeFuture.complete(null); } else { - AtomicInteger partitionCount = new AtomicInteger(request.partitions().size()); + AtomicInteger partitionCount = new AtomicInteger(partitions.size()); Runnable completeOneAuthorization = () -> { if (partitionCount.decrementAndGet() == 0) { @@ -1104,7 +1218,7 @@ protected void handleOffsetFetchRequest(KafkaHeaderAndRequest offsetFetch, } }; final String namespacePrefix = currentNamespacePrefix(); - request.partitions().forEach(tp -> { + partitions.forEach(tp -> { try { String fullName = new KopTopic(tp.topic(), namespacePrefix).getFullName(); authorize(AclOperation.DESCRIBE, Resource.of(ResourceType.TOPIC, fullName)) @@ -1137,7 +1251,7 @@ protected void handleOffsetFetchRequest(KafkaHeaderAndRequest offsetFetch, authorizeFuture.whenComplete((partitionList, ex) -> { KeyValue> keyValue = getGroupCoordinator().handleFetchOffsets( - request.groupId(), + groupId, Optional.ofNullable(partitionList) ); if (log.isDebugEnabled()) { @@ -1153,14 +1267,21 @@ protected void handleOffsetFetchRequest(KafkaHeaderAndRequest offsetFetch, } // recover to original topic name - replaceTopicPartition(keyValue.getValue(), replacingIndex); - keyValue.getValue().putAll(unauthorizedPartitionData); - keyValue.getValue().putAll(unknownPartitionData); - - resultFuture.complete(new OffsetFetchResponse(keyValue.getKey(), keyValue.getValue())); + Map partitionsResponses = keyValue.getValue(); + replaceTopicPartition(partitionsResponses, replacingIndex); + partitionsResponses.putAll(unauthorizedPartitionData); + partitionsResponses.putAll(unknownPartitionData); + + Errors errors = keyValue.getKey(); + resultFuture.complete(new KafkaResponseUtils.OffsetFetchResponseGroupData(groupId, errors, + partitionsResponses)); }); + + return resultFuture; } + + private CompletableFuture> fetchOffset(String topicName, long timestamp) { CompletableFuture> partitionData = new CompletableFuture<>(); @@ -1631,30 +1752,31 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch, checkArgument(fetch.getRequest() instanceof FetchRequest); FetchRequest request = (FetchRequest) fetch.getRequest(); + FetchRequestData data = request.data(); if (log.isDebugEnabled()) { log.debug("[{}] Request {} Fetch request. Size: {}. Each item: ", - ctx.channel(), fetch.getHeader(), request.fetchData().size()); + ctx.channel(), fetch.getHeader(), data.topics().size()); - request.fetchData().forEach((topic, data) -> { - log.debug("Fetch request topic:{} data:{}.", topic, data.toString()); + data.topics().forEach((topicData) -> { + log.debug("Fetch request topic: data:{}.", topicData.toString()); }); } - if (request.fetchData().isEmpty()) { - resultFuture.complete(new FetchResponse<>( - Errors.NONE, - new LinkedHashMap<>(), - THROTTLE_TIME_MS, - request.metadata().sessionId())); + int numPartitions = data.topics().stream().mapToInt(topic -> topic.partitions().size()).sum(); + if (numPartitions == 0) { + resultFuture.complete(new FetchResponse(new FetchResponseData() + .setErrorCode(Errors.NONE.code()) + .setSessionId(request.metadata().sessionId()) + .setResponses(new ArrayList<>()))); return; } - ConcurrentHashMap> erroneous = + ConcurrentHashMap erroneous = new ConcurrentHashMap<>(); - ConcurrentHashMap interesting = + ConcurrentHashMap interesting = new ConcurrentHashMap<>(); - AtomicInteger unfinishedAuthorizationCount = new AtomicInteger(request.fetchData().size()); + AtomicInteger unfinishedAuthorizationCount = new AtomicInteger(numPartitions); Runnable completeOne = () -> { if (unfinishedAuthorizationCount.decrementAndGet() == 0) { TransactionCoordinator transactionCoordinator = null; @@ -1667,13 +1789,12 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch, int fetchMinBytes = Math.min(request.minBytes(), fetchMaxBytes); if (interesting.isEmpty()) { if (log.isDebugEnabled()) { - log.debug("Fetch interesting is empty. Partitions: [{}]", request.fetchData()); + log.debug("Fetch interesting is empty. Partitions: [{}]", data.topics()); } - resultFuture.complete(new FetchResponse<>( - Errors.NONE, - new LinkedHashMap<>(erroneous), - THROTTLE_TIME_MS, - request.metadata().sessionId())); + resultFuture.complete(new FetchResponse(new FetchResponseData() + .setErrorCode(Errors.NONE.code()) + .setSessionId(request.metadata().sessionId()) + .setResponses(buildFetchResponses(erroneous)))); } else { MessageFetchContext context = MessageFetchContext .get(this, transactionCoordinator, maxReadEntriesNum, namespacePrefix, @@ -1686,18 +1807,17 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch, request.isolationLevel(), context ).thenAccept(resultMap -> { - LinkedHashMap> partitions = - new LinkedHashMap<>(); - resultMap.forEach((tp, data) -> { - partitions.put(tp, data.toPartitionData()); + Map all = new HashMap<>(); + resultMap.forEach((tp, results) -> { + all.put(tp, results.toPartitionData()); }); - partitions.putAll(erroneous); + all.putAll(erroneous); boolean triggeredCompletion = resultFuture.complete(new ResponseCallbackWrapper( - new FetchResponse<>( - Errors.NONE, - partitions, - 0, - request.metadata().sessionId()), + new FetchResponse(new FetchResponseData() + .setErrorCode(Errors.NONE.code()) + .setThrottleTimeMs(0) + .setSessionId(request.metadata().sessionId()) + .setResponses(buildFetchResponses(all))), () -> resultMap.forEach((__, readRecordsResult) -> { readRecordsResult.recycle(); }) @@ -1714,36 +1834,72 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch, }; // Regular Kafka consumers need READ permission on each partition they are fetching. - request.fetchData().forEach((topicPartition, partitionData) -> { - final String fullTopicName = KopTopic.toString(topicPartition, this.currentNamespacePrefix()); - authorize(AclOperation.READ, Resource.of(ResourceType.TOPIC, fullTopicName)) - .whenComplete((isAuthorized, ex) -> { - if (ex != null) { - log.error("Read topic authorize failed, topic - {}. {}", - fullTopicName, ex.getMessage()); - erroneous.put(topicPartition, errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED)); - completeOne.run(); - return; - } - if (!isAuthorized) { - erroneous.put(topicPartition, errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED)); + data.topics().forEach(topicData -> { + topicData.partitions().forEach((partitionData) -> { + TopicPartition topicPartition = new TopicPartition(topicData.topic(), partitionData.partition()); + final String fullTopicName = KopTopic.toString(topicPartition, this.currentNamespacePrefix()); + authorize(AclOperation.READ, Resource.of(ResourceType.TOPIC, fullTopicName)) + .whenComplete((isAuthorized, ex) -> { + if (ex != null) { + log.error("Read topic authorize failed, topic - {}. {}", + fullTopicName, ex.getMessage()); + erroneous.put(topicPartition, errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED)); + completeOne.run(); + return; + } + if (!isAuthorized) { + erroneous.put(topicPartition, errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED)); + completeOne.run(); + return; + } + interesting.put(topicPartition, partitionData); completeOne.run(); - return; - } - interesting.put(topicPartition, partitionData); - completeOne.run(); - }); + }); + }); }); } - private static FetchResponse.PartitionData errorResponse(Errors error) { - return new FetchResponse.PartitionData<>(error, - FetchResponse.INVALID_HIGHWATERMARK, - FetchResponse.INVALID_LAST_STABLE_OFFSET, - FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY); + public static List buildFetchResponses( + Map partitionData) { + List result = new ArrayList<>(); + partitionData.keySet() + .stream() + .map(topicPartition -> topicPartition.topic()) + .distinct() + .forEach(topic -> { + FetchResponseData.FetchableTopicResponse fetchableTopicResponse = + new FetchResponseData.FetchableTopicResponse() + .setTopic(topic) + .setPartitions(new ArrayList<>()); + result.add(fetchableTopicResponse); + + partitionData.forEach((tp, data) -> { + if (tp.topic().equals(topic)) { + fetchableTopicResponse.partitions().add(new FetchResponseData.PartitionData() + .setPartitionIndex(tp.partition()) + .setErrorCode(data.errorCode()) + .setHighWatermark(data.highWatermark()) + .setLastStableOffset(data.lastStableOffset()) + .setLogStartOffset(data.logStartOffset()) + .setAbortedTransactions(data.abortedTransactions()) + .setPreferredReadReplica(data.preferredReadReplica()) + .setRecords(data.records())); + } + }); + }); + return result; + } + + private static FetchResponseData.PartitionData errorResponse(Errors error) { + return new FetchResponseData.PartitionData() + .setErrorCode(error.code()) + .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK) + .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET) + .setLogStartOffset(FetchResponse.INVALID_LOG_START_OFFSET) + .setRecords(MemoryRecords.EMPTY); } @Override @@ -1779,7 +1935,8 @@ protected void handleJoinGroupRequest(KafkaHeaderAndRequest joinGroup, joinGroupResult.getProtocolType(), joinGroupResult.getMemberId(), joinGroupResult.getLeaderId(), - members + members, + request.version() ); if (log.isTraceEnabled()) { log.trace("Sending join group response {} for correlation id {} to client {}.", diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java index 3b5b50c390..63f2221025 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java @@ -75,12 +75,12 @@ import org.apache.kafka.common.errors.NotLeaderOrFollowerException; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.message.FetchRequestData; +import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.RecordBatch; -import org.apache.kafka.common.record.Records; -import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.FetchResponse; import org.apache.kafka.common.utils.Time; import org.apache.pulsar.broker.service.Topic; @@ -284,7 +284,7 @@ protected ReadRecordsResult newObject(Handle handle) { private final Recycler.Handle recyclerHandle; private DecodeResult decodeResult; - private List abortedTransactions; + private List abortedTransactions; private long highWatermark; private long lastStableOffset; private Position lastPosition; @@ -301,7 +301,7 @@ public Errors errors() { } public static ReadRecordsResult get(DecodeResult decodeResult, - List abortedTransactions, + List abortedTransactions, long highWatermark, long lastStableOffset, Position lastPosition, @@ -317,7 +317,7 @@ public static ReadRecordsResult get(DecodeResult decodeResult, } public static ReadRecordsResult get(DecodeResult decodeResult, - List abortedTransactions, + List abortedTransactions, long highWatermark, long lastStableOffset, Position lastPosition, @@ -361,7 +361,7 @@ public static ReadRecordsResult error(Position position, Errors errors, Partitio partitionLog); } - public FetchResponse.PartitionData toPartitionData() { + public FetchResponseData.PartitionData toPartitionData() { // There are three cases: // @@ -372,21 +372,20 @@ public FetchResponse.PartitionData toPartitionData() { // 3. errors == Others error : // Get errors. if (errors != null) { - return new FetchResponse.PartitionData<>( - errors, - FetchResponse.INVALID_HIGHWATERMARK, - FetchResponse.INVALID_LAST_STABLE_OFFSET, - FetchResponse.INVALID_LOG_START_OFFSET, - null, - MemoryRecords.EMPTY); + return new FetchResponseData.PartitionData() + .setErrorCode(errors.code()) + .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK) + .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET) + .setLogStartOffset(FetchResponse.INVALID_LOG_START_OFFSET) + .setRecords(MemoryRecords.EMPTY); } - return new FetchResponse.PartitionData<>( - Errors.NONE, - highWatermark, - lastStableOffset, - highWatermark, // TODO: should it be changed to the logStartOffset? - abortedTransactions, - decodeResult.getRecords()); + return new FetchResponseData.PartitionData() + .setErrorCode(Errors.NONE.code()) + .setHighWatermark(highWatermark) + .setLastStableOffset(lastStableOffset) + .setHighWatermark(highWatermark) // TODO: should it be changed to the logStartOffset? + .setAbortedTransactions(abortedTransactions) + .setRecords(decodeResult.getRecords()); } public void recycle() { @@ -453,7 +452,7 @@ public Optional firstUndecidedOffset() { return producerStateManager.firstUndecidedOffset(); } - public List getAbortedIndexList(long fetchOffset) { + public List getAbortedIndexList(long fetchOffset) { return producerStateManager.getAbortedIndexList(fetchOffset); } @@ -530,14 +529,14 @@ public Position getLastPosition() { return persistentTopic.getLastPosition(); } - public CompletableFuture readRecords(final FetchRequest.PartitionData partitionData, + public CompletableFuture readRecords(final FetchRequestData.FetchPartition partitionData, final boolean readCommitted, final AtomicLong limitBytes, final int maxReadEntriesNum, final MessageFetchContext context) { final long startPrepareMetadataNanos = MathUtils.nowInNano(); final CompletableFuture future = new CompletableFuture<>(); - final long offset = partitionData.fetchOffset; + final long offset = partitionData.fetchOffset(); KafkaTopicManager topicManager = context.getTopicManager(); // The future that is returned by getTopicConsumerManager is always completed normally topicManager.getTopicConsumerManager(fullPartitionName).thenAccept(tcm -> { @@ -585,7 +584,7 @@ public CompletableFuture readRecords(final FetchRequest.Parti requestStats.getPrepareMetadataStats().registerSuccessfulEvent( MathUtils.elapsedNanos(startPrepareMetadataNanos), TimeUnit.NANOSECONDS); - long adjustedMaxBytes = Math.min(partitionData.maxBytes, limitBytes.get()); + long adjustedMaxBytes = Math.min(partitionData.partitionMaxBytes(), limitBytes.get()); if (readCommitted) { long firstUndecidedOffset = producerStateManager.firstUndecidedOffset().orElse(-1L); if (firstUndecidedOffset >= 0 && firstUndecidedOffset <= offset) { @@ -667,7 +666,7 @@ private void registerPrepareMetadataFailedEvent(long startPrepareMetadataNanos) private void handleEntries(final CompletableFuture future, final List entries, - final FetchRequest.PartitionData partitionData, + final FetchRequestData.FetchPartition partitionData, final KafkaTopicConsumerManager tcm, final ManagedCursor cursor, final boolean readCommitted, @@ -709,9 +708,9 @@ private void handleEntries(final CompletableFuture future, // collect consumer metrics decodeResult.updateConsumerStats(topicPartition, committedEntries.size(), groupName, requestStats); - List abortedTransactions = null; + List abortedTransactions = null; if (readCommitted) { - abortedTransactions = this.getAbortedIndexList(partitionData.fetchOffset); + abortedTransactions = this.getAbortedIndexList(partitionData.fetchOffset()); } if (log.isDebugEnabled()) { log.debug("Partition {} read entry completed in {} ns", @@ -1116,7 +1115,15 @@ public CompletableFuture fetchOldestAvailableIndexFromTopic() { // look for the first entry with data PositionImpl nextValidPosition = managedLedger.getNextValidPosition(firstPosition); - managedLedger.asyncReadEntry(nextValidPosition, new AsyncCallbacks.ReadEntryCallback() { + fetchOldestAvailableIndexFromTopicReadNext(future, managedLedger, nextValidPosition); + + return future; + + } + + private void fetchOldestAvailableIndexFromTopicReadNext(CompletableFuture future, + ManagedLedgerImpl managedLedger, PositionImpl position) { + managedLedger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() { @Override public void readEntryComplete(Entry entry, Object ctx) { try { @@ -1124,6 +1131,13 @@ public void readEntryComplete(Entry entry, Object ctx) { log.info("First offset for topic {} is {} - position {}", fullPartitionName, startOffset, entry.getPosition()); future.complete(startOffset); + } catch (MetadataCorruptedException.NoBrokerEntryMetadata noBrokerEntryMetadata) { + long currentOffset = MessageMetadataUtils.getCurrentOffset(managedLedger); + log.info("Legacy entry for topic {} - position {} - returning current offset {}", + fullPartitionName, + entry.getPosition(), + currentOffset); + future.complete(currentOffset); } catch (Exception err) { future.completeExceptionally(err); } finally { @@ -1136,9 +1150,6 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { future.completeExceptionally(exception); } }, null); - - return future; - } public CompletableFuture takeProducerSnapshot() { @@ -1168,7 +1179,8 @@ public CompletableFuture forcePurgeAbortTx() { public CompletableFuture recoverTxEntries( long offset, Executor executor) { - if (!kafkaConfig.isKafkaTransactionCoordinatorEnabled()) { + if (!kafkaConfig.isKafkaTransactionCoordinatorEnabled() + || !MessageMetadataUtils.isInterceptorConfigured(persistentTopic.getManagedLedger())) { // no need to scan the topic, because transactions are disabled return CompletableFuture.completedFuture(Long.valueOf(0)); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java index df6fef09fd..a19fe5ee8b 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java @@ -27,8 +27,8 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.util.SafeRunnable; +import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.record.RecordBatch; -import org.apache.kafka.common.requests.FetchResponse; /** * Producer state manager. @@ -328,13 +328,15 @@ public long purgeAbortedTxns(long offset) { return count.get(); } - public List getAbortedIndexList(long fetchOffset) { + public List getAbortedIndexList(long fetchOffset) { synchronized (abortedIndexList) { - List abortedTransactions = new ArrayList<>(); + List abortedTransactions = new ArrayList<>(); for (AbortedTxn abortedTxn : abortedIndexList) { if (abortedTxn.lastOffset() >= fetchOffset) { abortedTransactions.add( - new FetchResponse.AbortedTransaction(abortedTxn.producerId(), abortedTxn.firstOffset())); + new FetchResponseData.AbortedTransaction() + .setProducerId(abortedTxn.producerId()) + .setFirstOffset(abortedTxn.firstOffset())); } } return abortedTransactions; diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java index 03f9939882..711b7d2395 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java @@ -42,9 +42,9 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.NotLeaderOrFollowerException; +import org.apache.kafka.common.message.FetchRequestData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.MemoryRecords; -import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.ProduceResponse; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; @@ -239,7 +239,7 @@ public CompletableFuture> fe final long timeout, final int fetchMinBytes, final int fetchMaxBytes, - final ConcurrentHashMap fetchInfos, + final ConcurrentHashMap fetchInfos, final IsolationLevel isolationLevel, final MessageFetchContext context) { CompletableFuture> future = @@ -293,7 +293,7 @@ public CompletableFuture> re final boolean readCommitted, final int fetchMaxBytes, final int maxReadEntriesNum, - final Map readPartitionInfo, + final Map readPartitionInfo, final MessageFetchContext context) { AtomicLong limitBytes = new AtomicLong(fetchMaxBytes); CompletableFuture> resultFuture = new CompletableFuture<>(); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java index fa386b26f7..f20fc5251c 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java @@ -24,6 +24,7 @@ import java.util.stream.Collectors; import lombok.AllArgsConstructor; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; @@ -41,6 +42,7 @@ import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.ListOffsetsResponseData; import org.apache.kafka.common.message.MetadataResponseData; +import org.apache.kafka.common.message.OffsetFetchResponseData; import org.apache.kafka.common.message.SaslAuthenticateResponseData; import org.apache.kafka.common.message.SaslHandshakeResponseData; import org.apache.kafka.common.message.SyncGroupResponseData; @@ -54,6 +56,7 @@ import org.apache.kafka.common.requests.DeleteRecordsResponse; import org.apache.kafka.common.requests.DeleteTopicsResponse; import org.apache.kafka.common.requests.DescribeGroupsResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest; import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.HeartbeatResponse; import org.apache.kafka.common.requests.JoinGroupResponse; @@ -68,6 +71,7 @@ import org.apache.kafka.common.requests.SyncGroupResponse; import org.apache.pulsar.common.schema.KeyValue; +@Slf4j public class KafkaResponseUtils { public static ApiVersionsResponse newApiVersions(List versionList) { @@ -184,22 +188,52 @@ public static DescribeGroupsResponse newDescribeGroups( return new DescribeGroupsResponse(data); } - public static FindCoordinatorResponse newFindCoordinator(Node node) { + public static FindCoordinatorResponse newFindCoordinator(List coordinatorKeys, + Node node, + int version) { FindCoordinatorResponseData data = new FindCoordinatorResponseData(); - data.setNodeId(node.id()); - data.setHost(node.host()); - data.setPort(node.port()); - data.setErrorCode(Errors.NONE.code()); + if (version < FindCoordinatorRequest.MIN_BATCHED_VERSION) { + data.setErrorMessage(Errors.NONE.message()) + .setErrorCode(Errors.NONE.code()) + .setPort(node.port()) + .setHost(node.host()) + .setNodeId(node.id()); + } else { + // for new clients + data.setCoordinators(coordinatorKeys + .stream() + .map(key -> new FindCoordinatorResponseData.Coordinator() + .setErrorCode(Errors.NONE.code()) + .setErrorMessage(Errors.NONE.message()) + .setHost(node.host()) + .setPort(node.port()) + .setNodeId(node.id()) + .setKey(key)) + .collect(Collectors.toList())); + } + return new FindCoordinatorResponse(data); } - public static FindCoordinatorResponse newFindCoordinator(Errors errors) { + public static FindCoordinatorResponse newFindCoordinator(List coordinators, + int version) { FindCoordinatorResponseData data = new FindCoordinatorResponseData(); - data.setErrorCode(errors.code()); - data.setErrorMessage(errors.message()); + if (version < FindCoordinatorRequest.MIN_BATCHED_VERSION) { + FindCoordinatorResponseData.Coordinator coordinator = coordinators.get(0); + data.setErrorMessage(coordinator.errorMessage()) + .setErrorCode(coordinator.errorCode()) + .setPort(coordinator.port()) + .setHost(coordinator.host()) + .setNodeId(coordinator.nodeId()); + } else { + // for new clients + data.setCoordinators(coordinators); + } + return new FindCoordinatorResponse(data); } + public static HeartbeatResponse newHeartbeat(Errors errors) { HeartbeatResponseData data = new HeartbeatResponseData(); data.setErrorCode(errors.code()); @@ -212,7 +246,8 @@ public static JoinGroupResponse newJoinGroup(Errors errors, String groupProtocolType, String memberId, String leaderId, - Map groupMembers) { + Map groupMembers, + short requestVersion) { JoinGroupResponseData data = new JoinGroupResponseData() .setErrorCode(errors.code()) .setLeader(leaderId) @@ -234,7 +269,7 @@ public static JoinGroupResponse newJoinGroup(Errors errors, data.setThrottleTimeMs(1000); } - return new JoinGroupResponse(data); + return new JoinGroupResponse(data, requestVersion); } public static LeaveGroupResponse newLeaveGroup(Errors errors) { @@ -347,6 +382,7 @@ public static MetadataResponse newMetadata(List nodes, return new MetadataResponse(data, apiVersion); } + @Getter @AllArgsConstructor public static class BrokerLookupResult { @@ -435,4 +471,59 @@ public static SyncGroupResponse newSyncGroup(Errors errors, data.setAssignment(assignment); return new SyncGroupResponse(data); } + + @AllArgsConstructor + public static class OffsetFetchResponseGroupData { + String groupId; + Errors errors; + Map partitionsResponses; + } + + public static OffsetFetchResponse buildOffsetFetchResponse( + List groups, + int version) { + + if (version < 8) { + // old clients + OffsetFetchResponseGroupData offsetFetchResponseGroupData = groups.get(0); + return new OffsetFetchResponse(offsetFetchResponseGroupData.errors, + offsetFetchResponseGroupData.partitionsResponses); + } else { + // new clients + OffsetFetchResponseData data = new OffsetFetchResponseData(); + for (OffsetFetchResponseGroupData groupData : groups) { + OffsetFetchResponseData.OffsetFetchResponseGroup offsetFetchResponseGroup = + new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setErrorCode(groupData.errors.code()) + .setGroupId(groupData.groupId) + .setTopics(new ArrayList<>()); + data.groups().add(offsetFetchResponseGroup); + Set topics = groupData.partitionsResponses.keySet().stream().map(TopicPartition::topic) + .collect(Collectors.toSet()); + topics.forEach(topic -> { + offsetFetchResponseGroup.topics().add(new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setName(topic) + .setPartitions(groupData.partitionsResponses.entrySet() + .stream() + .filter(e -> e.getKey().topic().equals(topic)) + .map(entry -> { + OffsetFetchResponse.PartitionData value = entry.getValue(); + if (log.isDebugEnabled()) { + log.debug("Add resp for group {} topic {}: {}", + groupData.groupId, topic, value); + } + return new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setErrorCode(value.error.code()) + .setMetadata(value.metadata) + .setPartitionIndex(entry.getKey().partition()) + .setCommittedOffset(value.offset) + .setCommittedLeaderEpoch(value.leaderEpoch.orElse(-1)); + }) + .collect(Collectors.toList()))); + }); + } + return new OffsetFetchResponse(data); + } + + } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageMetadataUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageMetadataUtils.java index 7940f063ad..52bb5a3b1f 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageMetadataUtils.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageMetadataUtils.java @@ -38,6 +38,10 @@ @Slf4j public class MessageMetadataUtils { + public static boolean isInterceptorConfigured(ManagedLedger managedLedger) { + return managedLedger.getManagedLedgerInterceptor() instanceof ManagedLedgerInterceptorImpl; + } + public static long getCurrentOffset(ManagedLedger managedLedger) { return ((ManagedLedgerInterceptorImpl) managedLedger.getManagedLedgerInterceptor()).getIndex(); } diff --git a/kafka-impl/src/main/java/org/apache/kafka/common/requests/ResponseCallbackWrapper.java b/kafka-impl/src/main/java/org/apache/kafka/common/requests/ResponseCallbackWrapper.java index 21cb50eb95..038db82591 100644 --- a/kafka-impl/src/main/java/org/apache/kafka/common/requests/ResponseCallbackWrapper.java +++ b/kafka-impl/src/main/java/org/apache/kafka/common/requests/ResponseCallbackWrapper.java @@ -56,4 +56,9 @@ public int throttleTimeMs() { public ApiMessage data() { return abstractResponse.data(); } + + @Override + public void maybeSetThrottleTimeMs(int i) { + abstractResponse.maybeSetThrottleTimeMs(i); + } } diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterTest.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterTest.java index 948d58e1b4..18cbcad50d 100644 --- a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterTest.java +++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterTest.java @@ -339,15 +339,15 @@ public MineMemoryRecordsBuilder(ByteBufferOutputStream bufferStream, } @Override - public Long appendWithOffset(long offset, SimpleRecord record) { - return appendWithOffset(offset, + public void appendWithOffset(long offset, SimpleRecord record) { + appendWithOffset(offset, record.timestamp(), record.key(), record.value(), record.headers()); } - public Long appendWithOffset(long offset, + public void appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBuffer value, @@ -359,9 +359,9 @@ public Long appendWithOffset(long offset, if (magic > RecordBatch.MAGIC_VALUE_V1) { appendDefaultRecord(offset, timestamp, key, value, headers); - return null; + } else { - return appendLegacyRecord(offset, timestamp, key, value); + appendLegacyRecord(offset, timestamp, key, value); } } catch (IOException e) { throw new KafkaException("I/O exception when writing to the append stream, closing", e); diff --git a/pom.xml b/pom.xml index f2d2dc7b81..fd058bf8c6 100644 --- a/pom.xml +++ b/pom.xml @@ -48,7 +48,7 @@ 2.14.0 2.13.4.2 - 2.8.0 + 3.4.0 1.18.24 4.11.0 io.streamnative diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/EntryPublishTimeKafkaFormatTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/EntryPublishTimeKafkaFormatTest.java index 35efa41895..165726969a 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/EntryPublishTimeKafkaFormatTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/EntryPublishTimeKafkaFormatTest.java @@ -92,7 +92,7 @@ public void testPublishTime() throws Exception { // time before first message ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(KafkaCommonTestUtils.newListOffsetTargetTimes(tp, startTime)); KafkaCommandDecoder.KafkaHeaderAndRequest request = buildRequest(builder); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java index 056a7cc0ce..d97f726cea 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java @@ -73,10 +73,12 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.message.FindCoordinatorRequestData; import org.apache.kafka.common.message.ListOffsetsResponseData; import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.message.ProduceRequestData; +import org.apache.kafka.common.message.ProduceResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.CompressionType; @@ -108,6 +110,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.util.netty.EventLoopUtil; +import org.jetbrains.annotations.NotNull; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -285,7 +288,7 @@ public void testReadUncommittedConsumerListOffsetEarliestOffsetEquals() throws E // 2. real test, for ListOffset request verify Earliest get earliest ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(KafkaCommonTestUtils.newListOffsetTargetTimes(tp, EARLIEST_TIMESTAMP)); KafkaHeaderAndRequest request = buildRequest(builder); @@ -338,7 +341,7 @@ public void testConsumerListOffsetLatest() throws Exception { // 2. real test, for ListOffset request verify Earliest get earliest ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(KafkaCommonTestUtils.newListOffsetTargetTimes(tp, ListOffsetsRequest.LATEST_TIMESTAMP)); KafkaHeaderAndRequest request = buildRequest(builder); @@ -572,7 +575,7 @@ public void testConsumerListOffset() throws Exception { private ListOffsetsResponse listOffset(long timestamp, TopicPartition tp) throws Exception { ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(KafkaCommonTestUtils.newListOffsetTargetTimes(tp, timestamp)); KafkaHeaderAndRequest request = buildRequest(builder); @@ -586,21 +589,24 @@ private ListOffsetsResponse listOffset(long timestamp, TopicPartition tp) throws /// Add test for FetchRequest private void checkFetchResponse(List expectedPartitions, - FetchResponse fetchResponse, + FetchResponse fetchResponse, int maxPartitionBytes, int maxResponseBytes, int numMessagesPerPartition) { - assertEquals(expectedPartitions.size(), fetchResponse.responseData().size()); - expectedPartitions.forEach(tp -> assertTrue(fetchResponse.responseData().get(tp) != null)); + assertEquals(expectedPartitions.size(), fetchResponse + .data().responses().stream().mapToInt(r->r.partitions().size())); + expectedPartitions.forEach(tp + -> assertTrue(getPartitionDataFromFetchResponse(fetchResponse, tp) != null)); final AtomicBoolean emptyResponseSeen = new AtomicBoolean(false); AtomicInteger responseSize = new AtomicInteger(0); AtomicInteger responseBufferSize = new AtomicInteger(0); expectedPartitions.forEach(tp -> { - FetchResponse.PartitionData partitionData = fetchResponse.responseData().get(tp); - assertEquals(Errors.NONE, partitionData.error()); + FetchResponseData.PartitionData partitionData = getPartitionDataFromFetchResponse(fetchResponse, tp); + + assertEquals(Errors.NONE.code(), partitionData.errorCode()); assertTrue(partitionData.highWatermark() > 0); MemoryRecords records = (MemoryRecords) partitionData.records(); @@ -631,6 +637,18 @@ private void checkFetchResponse(List expectedPartitions, // In Kop implementation, fetch at least 1 item for each topicPartition in the request. } + @NotNull + private static FetchResponseData.PartitionData getPartitionDataFromFetchResponse(FetchResponse fetchResponse, + TopicPartition tp) { + FetchResponseData.PartitionData partitionData = fetchResponse + .data() + .responses().stream().filter(t->t.topic().equals(tp.topic())) + .flatMap(r-> r.partitions().stream()) + .filter(p -> p.partitionIndex() == tp.partition()) + .findFirst().orElse(null); + return partitionData; + } + private Map createPartitionMap(int maxPartitionBytes, List topicPartitions, Map offsetMap) { @@ -649,7 +667,8 @@ private KafkaHeaderAndRequest createFetchRequest(int maxResponseBytes, Map offsetMap) { AbstractRequest.Builder builder = FetchRequest.Builder - .forConsumer(Integer.MAX_VALUE, 0, createPartitionMap(maxPartitionBytes, topicPartitions, offsetMap)) + .forConsumer(ApiKeys.FETCH.latestVersion(), + Integer.MAX_VALUE, 0, createPartitionMap(maxPartitionBytes, topicPartitions, offsetMap)) .setMaxBytes(maxResponseBytes); return buildRequest(builder); @@ -795,8 +814,8 @@ public void testBrokerRespectsPartitionsOrderAndSizeLimits() throws Exception { Collections.EMPTY_MAP); CompletableFuture responseFuture1 = new CompletableFuture<>(); kafkaRequestHandler.handleFetchRequest(fetchRequest1, responseFuture1); - FetchResponse fetchResponse1 = - (FetchResponse) responseFuture1.get(); + FetchResponse fetchResponse1 = + (FetchResponse) responseFuture1.get(); checkFetchResponse(shuffledTopicPartitions1, fetchResponse1, maxPartitionBytes, maxResponseBytes, messagesPerPartition); @@ -814,8 +833,8 @@ public void testBrokerRespectsPartitionsOrderAndSizeLimits() throws Exception { Collections.EMPTY_MAP); CompletableFuture responseFuture2 = new CompletableFuture<>(); kafkaRequestHandler.handleFetchRequest(fetchRequest2, responseFuture2); - FetchResponse fetchResponse2 = - (FetchResponse) responseFuture2.get(); + FetchResponse fetchResponse2 = + (FetchResponse) responseFuture2.get(); checkFetchResponse(shuffledTopicPartitions2, fetchResponse2, maxPartitionBytes, maxResponseBytes, messagesPerPartition); @@ -836,8 +855,8 @@ public void testBrokerRespectsPartitionsOrderAndSizeLimits() throws Exception { offsetMaps); CompletableFuture responseFuture3 = new CompletableFuture<>(); kafkaRequestHandler.handleFetchRequest(fetchRequest3, responseFuture3); - FetchResponse fetchResponse3 = - (FetchResponse) responseFuture3.get(); + FetchResponse fetchResponse3 = + (FetchResponse) responseFuture3.get(); checkFetchResponse(shuffledTopicPartitions3, fetchResponse3, maxPartitionBytes, maxResponseBytes, messagesPerPartition); @@ -895,7 +914,7 @@ public void testGetOffsetsForUnknownTopic() throws Exception { TopicPartition tp = new TopicPartition(topicName, 0); ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder - .forConsumer(false, IsolationLevel.READ_UNCOMMITTED) + .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(KafkaCommonTestUtils.newListOffsetTargetTimes(tp, ListOffsetsRequest.LATEST_TIMESTAMP)); KafkaHeaderAndRequest request = buildRequest(builder); @@ -989,11 +1008,17 @@ private void verifySendMessageToPartition(final TopicPartition topicPartition, ApiKeys.PRODUCE.latestVersion(), produceRequestData)); final CompletableFuture future = new CompletableFuture<>(); kafkaRequestHandler.handleProduceRequest(request, future); - final ProduceResponse.PartitionResponse response = - ((ProduceResponse) future.get()).responses().get(topicPartition); + final ProduceResponseData.PartitionProduceResponse response = + ((ProduceResponse) future.get()).data().responses() + .stream() + .filter(r->r.name().equals(topicPartition.topic())) + .flatMap(r->r.partitionResponses().stream()) + .filter(p->p.index() == topicPartition.partition()) + .findFirst() + .get(); assertNotNull(response); - assertEquals(response.error, expectedError); - assertEquals(response.baseOffset, expectedOffset); + assertEquals(response.errorCode(), expectedError.code()); + assertEquals(response.baseOffset(), expectedOffset); } private static MemoryRecords newIdempotentRecords( @@ -1142,10 +1167,13 @@ public void testFetchMinBytesSingleConsumer() throws Exception { final int minBytes = 1; @Cleanup - final KafkaHeaderAndRequest request = buildRequest(FetchRequest.Builder.forConsumer(maxWaitMs, minBytes, - Collections.singletonMap(topicPartition, new FetchRequest.PartitionData( + final KafkaHeaderAndRequest request = buildRequest(FetchRequest.Builder + .forConsumer(ApiKeys.FETCH.oldestVersion(), + maxWaitMs, minBytes, + Collections.singletonMap(topicPartition, new FetchRequest.PartitionData(null, 0L, -1L, 1024 * 1024, Optional.empty() )))); + final CompletableFuture future = new CompletableFuture<>(); final long startTime = System.currentTimeMillis(); kafkaRequestHandler.handleFetchRequest(request, future); @@ -1157,11 +1185,10 @@ public void testFetchMinBytesSingleConsumer() throws Exception { AbstractResponse abstractResponse = ((ResponseCallbackWrapper) future.get(maxWaitMs + 1000, TimeUnit.MILLISECONDS)).getResponse(); assertTrue(abstractResponse instanceof FetchResponse); - final FetchResponse response = (FetchResponse) abstractResponse; + final FetchResponse response = (FetchResponse) abstractResponse; assertEquals(response.error(), Errors.NONE); final long endTime = System.currentTimeMillis(); - log.info("Take {} ms to process FETCH request, record count: {}", - endTime - startTime, response.responseData().size()); + log.info("Take {} ms to process FETCH request", endTime - startTime); assertTrue(endTime - startTime <= maxWaitMs); Long waitingFetchesTriggered = kafkaRequestHandler.getRequestStats().getWaitingFetchesTriggered().get(); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java index 129a5928d4..71b8a9fb6c 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java @@ -13,6 +13,8 @@ */ package io.streamnative.pulsar.handlers.kop; +import static org.testng.Assert.assertEquals; + import io.netty.buffer.ByteBuf; import java.net.SocketAddress; import java.nio.ByteBuffer; @@ -48,7 +50,7 @@ public static List newListOffsetTargetT public static FetchRequest.PartitionData newFetchRequestPartitionData(long fetchOffset, long logStartOffset, int maxBytes) { - return new FetchRequest.PartitionData(fetchOffset, + return new FetchRequest.PartitionData(null, fetchOffset, logStartOffset, maxBytes, Optional.empty() @@ -110,7 +112,12 @@ public static ListOffsetsResponseData.ListOffsetsPartitionResponse getListOffset public static KafkaCommandDecoder.KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder builder, SocketAddress serviceAddress) { - AbstractRequest request = builder.build(builder.apiKey().latestVersion()); + return buildRequest(builder, serviceAddress, builder.latestAllowedVersion()); + } + public static KafkaCommandDecoder.KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder builder, + SocketAddress serviceAddress, short version) { + AbstractRequest request = builder.build(version); + assertEquals(version, request.version()); RequestHeader mockHeader = new RequestHeader(builder.apiKey(), request.version(), "dummy", 1233); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java index 6ded463c1c..847cdb7283 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java @@ -639,7 +639,7 @@ public void testListOffsetsForNotExistedTopic() throws Exception { final RequestHeader header = new RequestHeader(ApiKeys.LIST_OFFSETS, ApiKeys.LIST_OFFSETS.latestVersion(), "client", 0); final ListOffsetsRequest request = - ListOffsetsRequest.Builder.forConsumer(true, IsolationLevel.READ_UNCOMMITTED) + ListOffsetsRequest.Builder.forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(KafkaCommonTestUtils .newListOffsetTargetTimes(topicPartition, ListOffsetsRequest.EARLIEST_TIMESTAMP)) .build(ApiKeys.LIST_OFFSETS.latestVersion()); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerWithAuthorizationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerWithAuthorizationTest.java index c10026058a..9e4f0ed880 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerWithAuthorizationTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerWithAuthorizationTest.java @@ -341,9 +341,26 @@ public void testHandleProduceRequest() throws ExecutionException, InterruptedExc final ProduceResponse response = (ProduceResponse) responseFuture.get(); //Topic: "topic2" authorize success. Error is not TOPIC_AUTHORIZATION_FAILED - assertEquals(response.responses().get(topicPartition2).error, Errors.NOT_LEADER_OR_FOLLOWER); + assertEquals(response + .data() + .responses() + .stream() + .filter(t -> t.name().equals(topicPartition2.topic())) + .flatMap(t->t.partitionResponses().stream()) + .filter(p -> p.index() == topicPartition2.partition()) + .findFirst() + .get().errorCode(), Errors.NOT_LEADER_OR_FOLLOWER.code()); + //Topic: `TOPIC` authorize failed. - assertEquals(response.responses().get(topicPartition1).error, Errors.TOPIC_AUTHORIZATION_FAILED); + assertEquals(response + .data() + .responses() + .stream() + .filter(t -> t.name().equals(topicPartition1.topic())) + .flatMap(t->t.partitionResponses().stream()) + .filter(p -> p.index() == topicPartition1.partition()) + .findFirst() + .get().errorCode(), Errors.TOPIC_AUTHORIZATION_FAILED.code()); } @Test(timeOut = 20000) @@ -384,7 +401,7 @@ public void testHandleListOffsetRequestAuthorizationSuccess() throws Exception { // Test for ListOffset request verify Earliest get earliest ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(KafkaCommonTestUtils .newListOffsetTargetTimes(tp, ListOffsetsRequest.EARLIEST_TIMESTAMP)); @@ -412,7 +429,7 @@ public void testHandleListOffsetRequestAuthorizationFailed() throws Exception { TopicPartition tp = new TopicPartition(topicName, 0); ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_UNCOMMITTED) + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false) .setTargetTimes(KafkaCommonTestUtils .newListOffsetTargetTimes(tp, ListOffsetsRequest.EARLIEST_TIMESTAMP)); @@ -428,12 +445,19 @@ public void testHandleListOffsetRequestAuthorizationFailed() throws Exception { } - @Test(timeOut = 20000) - public void testHandleOffsetFetchRequestAuthorizationSuccess() + @DataProvider(name = "offsetFetchVersions") + public static Object[][] offsetFetchVersions() { + return new Object[][]{ + { (short) 7 }, + { (short) ApiKeys.OFFSET_FETCH.latestVersion() } }; + } + + @Test(timeOut = 20000, dataProvider = "offsetFetchVersions") + public void testHandleOffsetFetchRequestAuthorizationSuccess(short version) throws PulsarAdminException, ExecutionException, InterruptedException { KafkaRequestHandler spyHandler = spy(handler); String topicName = "persistent://" + TENANT + "/" + NAMESPACE + "/" - + "testHandleOffsetFetchRequestAuthorizationSuccess"; + + "testHandleOffsetFetchRequestAuthorizationSuccess_" + version; String groupId = "DemoKafkaOnPulsarConsumer"; // create partitioned topic. @@ -450,7 +474,8 @@ public void testHandleOffsetFetchRequestAuthorizationSuccess() new OffsetFetchRequest.Builder(groupId, false, Collections.singletonList(tp), false); - KafkaCommandDecoder.KafkaHeaderAndRequest request = buildRequest(builder); + KafkaCommandDecoder.KafkaHeaderAndRequest request = buildRequest(builder, version); + assertEquals(version, request.getRequest().version()); CompletableFuture responseFuture = new CompletableFuture<>(); spyHandler.handleOffsetFetchRequest(request, responseFuture); @@ -459,18 +484,35 @@ public void testHandleOffsetFetchRequestAuthorizationSuccess() assertTrue(response instanceof OffsetFetchResponse); OffsetFetchResponse offsetFetchResponse = (OffsetFetchResponse) response; - assertEquals(offsetFetchResponse.responseData().size(), 1); - assertEquals(offsetFetchResponse.error(), Errors.NONE); - offsetFetchResponse.responseData() - .forEach((topicPartition, partitionData) -> assertEquals(partitionData.error, Errors.NONE)); + + if (request.getRequest().version() >= 8) { + assertEquals(offsetFetchResponse.data() + .groups() + .stream().flatMap(g -> g.topics().stream()) + .flatMap(t -> t.partitions().stream()) + .count(), 1); + assertTrue(offsetFetchResponse.data() + .groups() + .stream().flatMap(g -> g.topics().stream()) + .flatMap(t -> t.partitions().stream()) + .allMatch(d->d.errorCode() == Errors.NONE.code())); + } else { + assertEquals(offsetFetchResponse.data() + .topics() + .stream().flatMap(t->t.partitions().stream()) + .count(), 1); + assertEquals(offsetFetchResponse.error(), Errors.NONE); + offsetFetchResponse.data().topics().stream().flatMap(t->t.partitions().stream()) + .forEach((partitionData) -> assertEquals(partitionData.errorCode(), Errors.NONE.code())); + } } - @Test(timeOut = 20000) - public void testHandleOffsetFetchRequestAuthorizationFailed() + @Test(timeOut = 20000, dataProvider = "offsetFetchVersions") + public void testHandleOffsetFetchRequestAuthorizationFailed(short version) throws PulsarAdminException, ExecutionException, InterruptedException { KafkaRequestHandler spyHandler = spy(handler); String topicName = "persistent://" + TENANT + "/" + NAMESPACE + "/" - + "testHandleOffsetFetchRequestAuthorizationFailed"; + + "testHandleOffsetFetchRequestAuthorizationFailed_" + version; String groupId = "DemoKafkaOnPulsarConsumer"; // create partitioned topic. @@ -480,7 +522,8 @@ public void testHandleOffsetFetchRequestAuthorizationFailed() OffsetFetchRequest.Builder builder = new OffsetFetchRequest.Builder(groupId, false, Collections.singletonList(tp), false); - KafkaCommandDecoder.KafkaHeaderAndRequest request = buildRequest(builder); + KafkaCommandDecoder.KafkaHeaderAndRequest request = buildRequest(builder, version); + assertEquals(request.getRequest().version(), version); CompletableFuture responseFuture = new CompletableFuture<>(); spyHandler.handleOffsetFetchRequest(request, responseFuture); @@ -489,10 +532,27 @@ public void testHandleOffsetFetchRequestAuthorizationFailed() assertTrue(response instanceof OffsetFetchResponse); OffsetFetchResponse offsetFetchResponse = (OffsetFetchResponse) response; - assertEquals(offsetFetchResponse.responseData().size(), 1); - assertEquals(offsetFetchResponse.error(), Errors.NONE); - offsetFetchResponse.responseData().forEach((topicPartition, partitionData) -> assertEquals(partitionData.error, - Errors.TOPIC_AUTHORIZATION_FAILED)); + + if (request.getRequest().version() >= 8) { + assertEquals(offsetFetchResponse.data() + .groups() + .stream().flatMap(g -> g.topics().stream()) + .flatMap(t -> t.partitions().stream()) + .count(), 1); + assertTrue(offsetFetchResponse.data() + .groups() + .stream().flatMap(g -> g.topics().stream()) + .flatMap(t -> t.partitions().stream()) + .allMatch(d->d.errorCode() == Errors.TOPIC_AUTHORIZATION_FAILED.code())); + } else { + assertTrue(offsetFetchResponse.data() + .topics() + .stream().flatMap(t -> t.partitions().stream()) + .allMatch(d->d.errorCode() == Errors.TOPIC_AUTHORIZATION_FAILED.code())); + + assertEquals(offsetFetchResponse.error(), Errors.NONE); + } + } @Test(timeOut = 20000) @@ -586,7 +646,7 @@ public void testHandleTxnOffsetCommitAuthorizationFailed() throws ExecutionExcep offsetData.put(topicPartition, KafkaCommonTestUtils.newTxnOffsetCommitRequestCommittedOffset(1L, "")); TxnOffsetCommitRequest.Builder builder = new TxnOffsetCommitRequest.Builder( - "1", group, 1, (short) 1, offsetData, false); + "1", group, 1L, (short) 1, offsetData); KafkaCommandDecoder.KafkaHeaderAndRequest headerAndRequest = buildRequest(builder); // Handle request @@ -618,7 +678,7 @@ public void testHandleTxnOffsetCommitPartAuthorizationFailed() throws ExecutionE TxnOffsetCommitRequest.Builder builder = new TxnOffsetCommitRequest.Builder( - "1", group, 1, (short) 1, offsetData, false); + "1", group, 1L, (short) 1, offsetData); KafkaCommandDecoder.KafkaHeaderAndRequest headerAndRequest = buildRequest(builder); // Topic: `test1` authorize success. @@ -815,6 +875,10 @@ private KafkaCommandDecoder.KafkaHeaderAndRequest buildRequest(AbstractRequest.B return KafkaCommonTestUtils.buildRequest(builder, serviceAddress); } + private KafkaCommandDecoder.KafkaHeaderAndRequest buildRequest(AbstractRequest.Builder builder, short version) { + return KafkaCommonTestUtils.buildRequest(builder, serviceAddress, version); + } + private void handleGroupImmigration() { GroupCoordinator groupCoordinator = handler.getGroupCoordinator(); for (int i = 0; i < conf.getOffsetsTopicNumPartitions(); i++) { diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java index e022552681..bf0abaec14 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java @@ -190,7 +190,8 @@ protected void resetConfig() { // kafka related settings. kafkaConfig.setOffsetsTopicNumPartitions(1); - kafkaConfig.setKafkaTransactionCoordinatorEnabled(false); + // kafka 3.1.x clients init the producerId by default, so we need to enable it. + kafkaConfig.setKafkaTransactionCoordinatorEnabled(true); kafkaConfig.setKafkaTxnLogTopicNumPartitions(1); kafkaConfig.setKafkaListeners( @@ -309,6 +310,7 @@ protected final void internalSetup(boolean startBroker) throws Exception { createClient(); MetadataUtils.createOffsetMetadataIfMissing(conf.getKafkaMetadataTenant(), admin, clusterData, this.conf); + if (conf.isKafkaTransactionCoordinatorEnabled()) { MetadataUtils.createTxnMetadataIfMissing(conf.getKafkaMetadataTenant(), admin, clusterData, this.conf); } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManagerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManagerTest.java index 8feaeb5d52..cfb3ee5ce2 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManagerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManagerTest.java @@ -107,6 +107,8 @@ private static TransactionMetadata transactionMetadata(String transactionalId, @BeforeClass @Override protected void setup() throws Exception { + // we need to disable the kafka transaction coordinator to avoid the conflict + this.conf.setKafkaTransactionCoordinatorEnabled(false); this.conf.setKafkaTxnLogTopicNumPartitions(numPartitions); internalSetup(); MetadataUtils.createTxnMetadataIfMissing(conf.getKafkaMetadataTenant(), admin, clusterData, this.conf); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java index f043c56330..4432e886b5 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java @@ -57,8 +57,8 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.requests.FetchResponse; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -764,12 +764,12 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except partitionLog.awaitInitialisation().get(); assertEquals(0, partitionLog.fetchOldestAvailableIndexFromTopic().get().longValue()); - List abortedIndexList = + List abortedIndexList = partitionLog.getProducerStateManager().getAbortedIndexList(Long.MIN_VALUE); abortedIndexList.forEach(tx -> { log.info("TX {}", tx); }); - assertEquals(0, abortedIndexList.get(0).firstOffset); + assertEquals(0, abortedIndexList.get(0).firstOffset()); producer.beginTransaction(); String lastMessage = "msg1b"; @@ -804,7 +804,7 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except abortedIndexList.forEach(tx -> { log.info("TX {}", tx); }); - assertEquals(0, abortedIndexList.get(0).firstOffset); + assertEquals(0, abortedIndexList.get(0).firstOffset()); assertEquals(1, abortedIndexList.size()); waitForTransactionsToBeInStableState(transactionalId); @@ -842,7 +842,7 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except }); assertEquals(1, abortedIndexList.size()); - assertEquals(0, abortedIndexList.get(0).firstOffset); + assertEquals(0, abortedIndexList.get(0).firstOffset()); producer.beginTransaction(); producer.send(new ProducerRecord<>(topicName, 0, "msg4")).get(); // OFFSET 8 @@ -869,8 +869,8 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except log.info("TX {}", tx); }); - assertEquals(0, abortedIndexList.get(0).firstOffset); - assertEquals(11, abortedIndexList.get(1).firstOffset); + assertEquals(0, abortedIndexList.get(0).firstOffset()); + assertEquals(11, abortedIndexList.get(1).firstOffset()); assertEquals(2, abortedIndexList.size()); producer.beginTransaction(); @@ -894,8 +894,8 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except log.info("TX {}", tx); }); - assertEquals(0, abortedIndexList.get(0).firstOffset); - assertEquals(11, abortedIndexList.get(1).firstOffset); + assertEquals(0, abortedIndexList.get(0).firstOffset()); + assertEquals(11, abortedIndexList.get(1).firstOffset()); assertEquals(2, abortedIndexList.size()); @@ -910,7 +910,7 @@ public void testPurgeAbortedTx(boolean takeSnapshotBeforeRecovery) throws Except log.info("TX {}", tx); }); assertEquals(1, abortedIndexList.size()); - assertEquals(11, abortedIndexList.get(0).firstOffset); + assertEquals(11, abortedIndexList.get(0).firstOffset()); // use a new consumer group, it will read from the beginning of the topic assertEquals( diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/e2e/DistributedClusterTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/e2e/DistributedClusterTest.java index 97cf46e17d..992c609d2a 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/e2e/DistributedClusterTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/e2e/DistributedClusterTest.java @@ -85,6 +85,7 @@ protected KafkaServiceConfiguration resetConfig(int brokerPort, int webPort, int kConfig.setOffsetsTopicNumPartitions(offsetsTopicNumPartitions); kConfig.setAdvertisedAddress("localhost"); + kConfig.setKafkaTransactionCoordinatorEnabled(true); kConfig.setClusterName(configClusterName); kConfig.setManagedLedgerCacheSizeMB(8); kConfig.setActiveConsumerFailoverDelayTimeMillis(0); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/metrics/MetricsProviderTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/metrics/MetricsProviderTest.java index 17109602ae..e76f85591c 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/metrics/MetricsProviderTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/metrics/MetricsProviderTest.java @@ -127,7 +127,7 @@ public void testMetricsProvider() throws Exception { } Assert.assertEquals(getApiKeysSet(), new TreeSet<>( - Arrays.asList(ApiKeys.API_VERSIONS, ApiKeys.METADATA, ApiKeys.PRODUCE))); + Arrays.asList(ApiKeys.API_VERSIONS, ApiKeys.METADATA, ApiKeys.PRODUCE, ApiKeys.INIT_PRODUCER_ID))); // 2. consume messages with Kafka consumer @Cleanup @@ -154,14 +154,14 @@ public void testMetricsProvider() throws Exception { Assert.assertEquals(getApiKeysSet(), new TreeSet<>(Arrays.asList( ApiKeys.API_VERSIONS, ApiKeys.METADATA, ApiKeys.PRODUCE, ApiKeys.FIND_COORDINATOR, ApiKeys.LIST_OFFSETS, - ApiKeys.OFFSET_FETCH, ApiKeys.FETCH + ApiKeys.OFFSET_FETCH, ApiKeys.FETCH, ApiKeys.INIT_PRODUCER_ID ))); // commit offsets kConsumer.getConsumer().commitSync(Duration.ofSeconds(5)); Assert.assertEquals(getApiKeysSet(), new TreeSet<>(Arrays.asList( ApiKeys.API_VERSIONS, ApiKeys.METADATA, ApiKeys.PRODUCE, ApiKeys.FIND_COORDINATOR, ApiKeys.LIST_OFFSETS, - ApiKeys.OFFSET_FETCH, ApiKeys.FETCH, ApiKeys.OFFSET_COMMIT + ApiKeys.OFFSET_FETCH, ApiKeys.FETCH, ApiKeys.OFFSET_COMMIT, ApiKeys.INIT_PRODUCER_ID ))); try { diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/GlobalKTableTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/GlobalKTableTest.java index af5869529f..e8ac8ee348 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/GlobalKTableTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/GlobalKTableTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.ForeachAction; @@ -109,7 +110,8 @@ public void shouldKStreamGlobalKTableLeftJoin() throws Exception { produceGlobalTableValues(); final ReadOnlyKeyValueStore replicatedStore = - kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore()); + kafkaStreams.store( + StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore())); TestUtils.waitForCondition(() -> "J".equals(replicatedStore.get(5L)), 30000, "waiting for data in replicated store"); @@ -143,7 +145,9 @@ public void shouldKStreamGlobalKTableJoin() throws Exception { produceGlobalTableValues(); final ReadOnlyKeyValueStore replicatedStore = - kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore()); + kafkaStreams.store( + StoreQueryParameters + .fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore())); TestUtils.waitForCondition(() -> "J".equals(replicatedStore.get(5L)), 30000, "waiting for data in replicated store"); @@ -173,13 +177,15 @@ public void shouldRestoreGlobalInMemoryKTableOnRestart() throws Exception { Thread.sleep(1000); // NOTE: it may take a few milliseconds to wait streams started ReadOnlyKeyValueStore store = - kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore()); + kafkaStreams.store( + StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore())); assertEquals(store.approximateNumEntries(), 4L); kafkaStreams.close(); startStreams(); Thread.sleep(1000); // NOTE: it may take a few milliseconds to wait streams started - store = kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore()); + store = kafkaStreams.store( + StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore())); assertEquals(store.approximateNumEntries(), 4L); } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KStreamAggregationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KStreamAggregationTest.java index b156fb7106..1794bb2676 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KStreamAggregationTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KStreamAggregationTest.java @@ -48,9 +48,11 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; @@ -58,7 +60,6 @@ import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Reducer; -import org.apache.kafka.streams.kstream.Serialized; import org.apache.kafka.streams.kstream.SessionWindowedDeserializer; import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; @@ -115,7 +116,7 @@ protected void extraSetup() throws Exception { groupedStream = stream .groupBy( mapper, - Serialized.with(Serdes.String(), Serdes.String())); + Grouped.with(Serdes.String(), Serdes.String())); reducer = (value1, value2) -> value1 + ":" + value2; initializer = () -> 0; @@ -174,7 +175,7 @@ public void shouldReduceWindowed() throws Exception { final Serde> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class); groupedStream - .windowedBy(TimeWindows.of(500L)) + .windowedBy(TimeWindows.of(Duration.ofMillis(500L))) .reduce(reducer) .toStream() .to(outputTopic, Produced.with(windowedSerde, Serdes.String())); @@ -279,7 +280,7 @@ public void shouldAggregateWindowed() throws Exception { produceMessages(secondTimestamp); final Serde> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class); - groupedStream.windowedBy(TimeWindows.of(500L)) + groupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(500L))) .aggregate( initializer, aggregator, @@ -415,8 +416,8 @@ public void shouldGroupByKey() throws Exception { produceMessages(timestamp); produceMessages(timestamp); - stream.groupByKey(Serialized.with(Serdes.Integer(), Serdes.String())) - .windowedBy(TimeWindows.of(500L)) + stream.groupByKey(Grouped.with(Serdes.Integer(), Serdes.String())) + .windowedBy(TimeWindows.of(Duration.ofMillis(500L))) .count() .toStream((windowedKey, value) -> windowedKey.key() + "@" + windowedKey.window().start()).to(outputTopic, Produced.with(Serdes.String(), Serdes.Long())); @@ -509,8 +510,9 @@ public void shouldCountSessionWindows() throws Exception { final CountDownLatch latch = new CountDownLatch(11); builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String())) - .groupByKey(Serialized.with(Serdes.String(), Serdes.String())) - .windowedBy(SessionWindows.with(sessionGap).until(maintainMillis)) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofMillis(sessionGap), + Duration.ofMillis(maintainMillis))) .count() .toStream() .transform(() -> new Transformer, Long, KeyValue>() { @@ -608,8 +610,9 @@ public void shouldReduceSessionWindows() throws Exception { final CountDownLatch latch = new CountDownLatch(11); final String userSessionsStore = "UserSessionsStore"; builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String())) - .groupByKey(Serialized.with(Serdes.String(), Serdes.String())) - .windowedBy(SessionWindows.with(sessionGap).until(maintainMillis)) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofMillis(sessionGap), + Duration.ofMillis(maintainMillis))) .reduce((value1, value2) -> value1 + ":" + value2, Materialized.as(userSessionsStore)) .toStream() .foreach((key, value) -> { @@ -620,7 +623,8 @@ public void shouldReduceSessionWindows() throws Exception { startStreams(); latch.await(30, TimeUnit.SECONDS); final ReadOnlySessionStore sessionStore = - kafkaStreams.store(userSessionsStore, QueryableStoreTypes.sessionStore()); + kafkaStreams.store( + StoreQueryParameters.fromNameAndType(userSessionsStore, QueryableStoreTypes.sessionStore())); // verify correct data received assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), equalTo("start")); @@ -690,6 +694,9 @@ private List> receiveMessages(final Deserializer keyDes consumerProperties.setProperty(StreamsConfig.WINDOW_SIZE_MS_CONFIG, Long.MAX_VALUE + ""); if (keyDeserializer instanceof TimeWindowedDeserializer || keyDeserializer instanceof SessionWindowedDeserializer) { + consumerProperties.setProperty(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, + Serdes.serdeFrom(innerClass).getClass().getName()); + consumerProperties.setProperty(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, Serdes.serdeFrom(innerClass).getClass().getName()); } @@ -740,6 +747,8 @@ private String readWindowedKeyedMessagesViaConsoleConsumer(final Deserial final Map configs = new HashMap<>(); Serde serde = Serdes.serdeFrom(innerClass); configs.put(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, serde.getClass().getName()); + configs.put(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, serde.getClass().getName()); + serde.close(); // https://issues.apache.org/jira/browse/KAFKA-10366 configs.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, Long.toString(Long.MAX_VALUE)); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KTableTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KTableTest.java index b27b455ee8..73197e75f1 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KTableTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KTableTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.Consumed; @@ -115,14 +116,16 @@ public void shouldRestoreInMemoryKTableOnRestart() throws Exception { startStreams(); Thread.sleep(1000); // NOTE: it may take a few milliseconds to wait streams started final ReadOnlyKeyValueStore store = - kafkaStreams.store(this.store, QueryableStoreTypes.keyValueStore()); + kafkaStreams.store( + StoreQueryParameters.fromNameAndType(this.store, QueryableStoreTypes.keyValueStore())); TestUtils.waitForCondition(() -> store.approximateNumEntries() == 4L, 30000L, "waiting for values"); kafkaStreams.close(); startStreams(); Thread.sleep(1000); // NOTE: it may take a few milliseconds to wait streams started final ReadOnlyKeyValueStore recoveredStore = - kafkaStreams.store(this.store, QueryableStoreTypes.keyValueStore()); + kafkaStreams.store( + StoreQueryParameters.fromNameAndType(this.store, QueryableStoreTypes.keyValueStore())); TestUtils.waitForCondition(() -> { try { return recoveredStore.approximateNumEntries() == 4L; diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KafkaStreamsTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KafkaStreamsTestBase.java index 88a0ad7db7..41c676eb28 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KafkaStreamsTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KafkaStreamsTestBase.java @@ -15,8 +15,8 @@ import io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase; import io.streamnative.pulsar.handlers.kop.utils.timer.MockTime; +import java.time.Duration; import java.util.Properties; -import java.util.concurrent.TimeUnit; import lombok.Getter; import lombok.NonNull; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -83,7 +83,7 @@ protected void setupTestCase() throws Exception { @AfterMethod protected void cleanupTestCase() throws Exception { if (kafkaStreams != null) { - kafkaStreams.close(3, TimeUnit.SECONDS); + kafkaStreams.close(Duration.ofSeconds(3)); TestUtils.purgeLocalStreamsState(streamsConfiguration); } } From 3a4e749a9269a638e6c832a7e796ef4a63080894 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Thu, 9 Mar 2023 20:41:29 +0100 Subject: [PATCH 2/4] [transactions] Implement KIP-664 listTransactions (#76) (cherry picked from commit 5ef4a8531cde389af898cf36f57c518b340e426c) --- .../handlers/kop/KafkaCommandDecoder.java | 6 + .../handlers/kop/KafkaRequestHandler.java | 26 +++- .../coordinator/group/GroupCoordinator.java | 15 +-- .../transaction/TransactionCoordinator.java | 18 ++- .../transaction/TransactionState.java | 22 ++++ .../transaction/TransactionStateManager.java | 67 ++++++++++ .../kop/utils/KafkaResponseUtils.java | 19 +-- .../group/GroupCoordinatorTest.java | 22 ++-- .../transaction/TransactionTest.java | 114 ++++++++++++++++++ 9 files changed, 279 insertions(+), 30 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index ad7b3f9d61..6e61ceb50b 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -314,6 +314,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception case LIST_GROUPS: handleListGroupsRequest(kafkaHeaderAndRequest, responseFuture); break; + case LIST_TRANSACTIONS: + handleListTransactionsRequest(kafkaHeaderAndRequest, responseFuture); + break; case DELETE_GROUPS: handleDeleteGroupsRequest(kafkaHeaderAndRequest, responseFuture); break; @@ -572,6 +575,9 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest, protected abstract void handleListGroupsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture response); + protected abstract void + handleListTransactionsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture response); + protected abstract void handleDeleteGroupsRequest(KafkaHeaderAndRequest deleteGroups, CompletableFuture response); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 466da180d0..cf69d9b4f1 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -31,6 +31,7 @@ import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicException; import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata; import io.streamnative.pulsar.handlers.kop.offset.OffsetMetadata; +import io.streamnative.pulsar.handlers.kop.scala.Either; import io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator; import io.streamnative.pulsar.handlers.kop.security.Session; import io.streamnative.pulsar.handlers.kop.security.auth.Authorizer; @@ -125,6 +126,7 @@ import org.apache.kafka.common.message.LeaveGroupRequestData; import org.apache.kafka.common.message.ListOffsetsRequestData; import org.apache.kafka.common.message.ListOffsetsResponseData; +import org.apache.kafka.common.message.ListTransactionsResponseData; import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.message.ProduceRequestData; import org.apache.kafka.common.message.SaslAuthenticateResponseData; @@ -173,6 +175,8 @@ import org.apache.kafka.common.requests.ListOffsetRequestV0; import org.apache.kafka.common.requests.ListOffsetsRequest; import org.apache.kafka.common.requests.ListOffsetsResponse; +import org.apache.kafka.common.requests.ListTransactionsRequest; +import org.apache.kafka.common.requests.ListTransactionsResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata; import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata; @@ -2038,8 +2042,26 @@ protected void handleDescribeGroupRequest(KafkaHeaderAndRequest describeGroup, protected void handleListGroupsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture resultFuture) { checkArgument(listGroups.getRequest() instanceof ListGroupsRequest); - KeyValue> listResult = getGroupCoordinator().handleListGroups(); - resultFuture.complete(KafkaResponseUtils.newListGroups(listResult.getKey(), listResult.getValue())); + Either> listResult = getGroupCoordinator().handleListGroups(); + resultFuture.complete(KafkaResponseUtils.newListGroups(listResult)); + } + + @Override + protected void handleListTransactionsRequest(KafkaHeaderAndRequest listTransactions, + CompletableFuture resultFuture) { + checkArgument(listTransactions.getRequest() instanceof ListTransactionsRequest); + ListTransactionsRequest request = (ListTransactionsRequest) listTransactions.getRequest(); + List stateFilters = request.data().stateFilters(); + if (stateFilters == null) { + stateFilters = Collections.emptyList(); + } + List producerIdFilters = request.data().producerIdFilters(); + if (producerIdFilters == null) { + producerIdFilters = Collections.emptyList(); + } + ListTransactionsResponseData listResult = getTransactionCoordinator() + .handleListTransactions(stateFilters, producerIdFilters); + resultFuture.complete(new ListTransactionsResponse(listResult)); } @Override diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java index f1f5d90ac1..98813604f7 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java @@ -29,6 +29,7 @@ import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupOverview; import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupSummary; import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata; +import io.streamnative.pulsar.handlers.kop.scala.Either; import io.streamnative.pulsar.handlers.kop.utils.CoreUtils; import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey.GroupKey; import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey.MemberKey; @@ -832,22 +833,16 @@ public KeyValue> handleFetchOffsets( ); } - public KeyValue> handleListGroups() { + public Either> handleListGroups() { if (!isActive.get()) { - return new KeyValue<>(Errors.COORDINATOR_NOT_AVAILABLE, new ArrayList<>()); + return Either.left(Errors.COORDINATOR_NOT_AVAILABLE); } else { - Errors errors; if (groupManager.isLoading()) { - errors = Errors.COORDINATOR_LOAD_IN_PROGRESS; - } else { - errors = Errors.NONE; + return Either.left(Errors.COORDINATOR_LOAD_IN_PROGRESS); } List overviews = new ArrayList<>(); groupManager.currentGroups().forEach(group -> overviews.add(group.overview())); - return new KeyValue<>( - errors, - overviews - ); + return Either.right(overviews); } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java index 498cbb0e2d..f438c6790f 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java @@ -32,6 +32,7 @@ import io.streamnative.pulsar.handlers.kop.storage.PulsarTopicProducerStateManagerSnapshotBuffer; import io.streamnative.pulsar.handlers.kop.utils.MetadataUtils; import io.streamnative.pulsar.handlers.kop.utils.ProducerIdAndEpoch; +import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -51,6 +52,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.ListTransactionsResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.requests.TransactionResult; @@ -79,6 +81,8 @@ public class TransactionCoordinator { private final Time time; + private final AtomicBoolean isActive = new AtomicBoolean(false); + private static final BiConsumer onEndTransactionComplete = (txnIdAndPidEpoch, errors) -> { @@ -215,6 +219,17 @@ public static String getTopicPartitionName(String topicPartitionName, int partit return topicPartitionName + PARTITIONED_TOPIC_SUFFIX + partitionId; } + public ListTransactionsResponseData handleListTransactions(List filteredStates, + List filteredProducerIds) { + // https://github.com/apache/kafka/blob/915991445fde106d02e61a70425ae2601c813db0/core/ + // src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L259 + if (!isActive.get()) { + log.warn("The transaction coordinator is not active, so it will reject list transaction request"); + return new ListTransactionsResponseData().setErrorCode(Errors.NOT_COORDINATOR.code()); + } + return this.txnManager.listTransactionStates(filteredProducerIds, filteredStates); + } + @Data @EqualsAndHashCode @AllArgsConstructor @@ -925,7 +940,8 @@ public CompletableFuture startup(boolean enableTransactionalIdExpiration) txnManager.startup(enableTransactionalIdExpiration); return this.producerIdManager.initialize().thenCompose(ignored -> { - log.info("Startup transaction coordinator complete."); + log.info("{} Startup transaction coordinator complete.", namespacePrefixForMetadata); + isActive.set(true); return CompletableFuture.completedFuture(null); }); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionState.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionState.java index 7919449097..01d626f391 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionState.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionState.java @@ -117,4 +117,26 @@ public boolean isExpirationAllowed() { return false; } } + + public org.apache.kafka.clients.admin.TransactionState toAdminState() { + switch (this) { + case EMPTY: + return org.apache.kafka.clients.admin.TransactionState.EMPTY; + case ONGOING: + return org.apache.kafka.clients.admin.TransactionState.ONGOING; + case PREPARE_COMMIT: + return org.apache.kafka.clients.admin.TransactionState.PREPARE_COMMIT; + case PREPARE_ABORT: + return org.apache.kafka.clients.admin.TransactionState.PREPARE_ABORT; + case COMPLETE_COMMIT: + return org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT; + case COMPLETE_ABORT: + return org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT; + case PREPARE_EPOCH_FENCE: + return org.apache.kafka.clients.admin.TransactionState.PREPARE_EPOCH_FENCE; + case DEAD: + default: + return org.apache.kafka.clients.admin.TransactionState.UNKNOWN; + } + } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java index 2855f88429..9c62affed7 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java @@ -25,6 +25,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -40,6 +41,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.ListTransactionsResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.SchemaException; import org.apache.kafka.common.requests.ProduceResponse; @@ -244,6 +246,71 @@ private boolean shouldExpire(TransactionMetadata txnMetadata, Long currentTimeMs <= (currentTimeMs - transactionConfig.getTransactionalIdExpirationMs()); } + private static boolean shouldInclude(TransactionMetadata txnMetadata, + List filterProducerIds, Set filterStateNames) { + if (txnMetadata.getState() == TransactionState.DEAD) { + // We filter the `Dead` state since it is a transient state which + // indicates that the transactionalId and its metadata are in the + // process of expiration and removal. + return false; + } else if (!filterProducerIds.isEmpty() && !filterProducerIds.contains(txnMetadata.getProducerId())) { + return false; + } else if (!filterStateNames.isEmpty() && !filterStateNames.contains( + txnMetadata.getState().toAdminState().toString())) { + return false; + } else { + return true; + } + } + + public ListTransactionsResponseData listTransactionStates(List filteredProducerIds, + List filteredStates) { + return CoreUtils.inReadLock(stateLock, () -> { + ListTransactionsResponseData response = new ListTransactionsResponseData(); + if (!loadingPartitions.isEmpty()) { + response.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()); + } else { + Set filterStates = new HashSet<>(); + for (TransactionState stateName : TransactionState.values()) { + String nameForTheClient = stateName.toAdminState().toString(); + if (filteredStates.contains(nameForTheClient)) { + filterStates.add(nameForTheClient); + } else { + response.unknownStateFilters().add(nameForTheClient); + } + } + List states = new ArrayList<>(); + transactionMetadataCache.forEach((__, cache) -> { + cache.values().forEach(txnMetadata -> { + txnMetadata.inLock(() -> { + // use toString() to get the name of the state according to the protocol + ListTransactionsResponseData.TransactionState transactionState = + new ListTransactionsResponseData.TransactionState() + .setTransactionalId(txnMetadata.getTransactionalId()) + .setProducerId(txnMetadata.getProducerId()) + .setTransactionState(txnMetadata.getState().toAdminState().toString()); + + if (shouldInclude(txnMetadata, filteredProducerIds, filterStates)) { + if (log.isDebugEnabled()) { + log.debug("add transaction state: {}", transactionState); + } + states.add(transactionState); + } else { + if (log.isDebugEnabled()) { + log.debug("Skip transaction state: {}", transactionState); + } + } + return null; + }); + }); + }); + response.setErrorCode(Errors.NONE.code()) + .setTransactionStates(states); + } + return response; + }); + } + @Data @AllArgsConstructor private static class TransactionalIdCoordinatorEpochAndMetadata { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java index f20fc5251c..d779862b87 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java @@ -15,6 +15,7 @@ import io.streamnative.pulsar.handlers.kop.ApiVersion; import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata; +import io.streamnative.pulsar.handlers.kop.scala.Either; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -278,14 +279,18 @@ public static LeaveGroupResponse newLeaveGroup(Errors errors) { return new LeaveGroupResponse(data); } - public static ListGroupsResponse newListGroups(Errors errors, - List groups) { + public static ListGroupsResponse newListGroups(Either> results) { ListGroupsResponseData data = new ListGroupsResponseData(); - data.setErrorCode(errors.code()); - data.setGroups(groups.stream().map(overView -> new ListGroupsResponseData.ListedGroup() - .setGroupId(overView.groupId()) - .setProtocolType(overView.protocolType())) - .collect(Collectors.toList())); + data.setErrorCode(results.isLeft() ? results.getLeft().code() : Errors.NONE.code()); + if (!results.isLeft()) { + data.setGroups(results.getRight().stream().map(overView -> new ListGroupsResponseData.ListedGroup() + .setGroupId(overView.groupId()) + .setProtocolType(overView.protocolType())) + .collect(Collectors.toList())); + + } else { + data.setGroups(Collections.emptyList()); + } return new ListGroupsResponse(data); } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinatorTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinatorTest.java index 105958975d..3696b926dc 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinatorTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinatorTest.java @@ -15,6 +15,7 @@ import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -28,6 +29,7 @@ import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupSummary; import io.streamnative.pulsar.handlers.kop.coordinator.group.MemberMetadata.MemberSummary; import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata; +import io.streamnative.pulsar.handlers.kop.scala.Either; import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory; import io.streamnative.pulsar.handlers.kop.utils.timer.MockTimer; import java.util.ArrayList; @@ -218,8 +220,8 @@ public void testRequestHandlingWhileLoadingInProgress() throws Exception { assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, describeGroupResult.getKey()); // ListGroups - KeyValue> listGroupsResult = groupCoordinator.handleListGroups(); - assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, listGroupsResult.getKey()); + Either> listGroupsResult = groupCoordinator.handleListGroups(); + assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, listGroupsResult.getLeft()); // DeleteGroups Map deleteGroupsErrors = groupCoordinator.handleDeleteGroups( @@ -1695,12 +1697,12 @@ groupId, memberId, protocolType, newProtocols() ).get(); assertEquals(Errors.NONE, syncGroupResult.getKey()); - KeyValue> groups = groupCoordinator.handleListGroups(); - assertEquals(Errors.NONE, groups.getKey()); - assertEquals(1, groups.getValue().size()); + Either> groups = groupCoordinator.handleListGroups(); + assertFalse(groups.isLeft()); + assertEquals(1, groups.getRight().size()); assertEquals( new GroupOverview("groupId", "consumer"), - groups.getValue().get(0) + groups.getRight().get(0) ); } @@ -1712,12 +1714,12 @@ groupId, memberId, protocolType, newProtocols() ); assertEquals(Errors.NONE, joinGroupResult.getError()); - KeyValue> groups = groupCoordinator.handleListGroups(); - assertEquals(Errors.NONE, groups.getKey()); - assertEquals(1, groups.getValue().size()); + Either> groups = groupCoordinator.handleListGroups(); + assertFalse(groups.isLeft()); + assertEquals(1, groups.getRight().size()); assertEquals( new GroupOverview("groupId", "consumer"), - groups.getValue().get(0) + groups.getRight().get(0) ); } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java index 4432e886b5..df0f599750 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java @@ -31,6 +31,7 @@ import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -46,7 +47,10 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ListTransactionsOptions; +import org.apache.kafka.clients.admin.ListTransactionsResult; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TransactionListing; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -1320,6 +1324,116 @@ public void testNotFencedWithBeginTransaction() throws Exception { producer2.close(); } + @Test(timeOut = 100000 * 30) + public void testListTransactions() throws Exception { + + String topicName = "testListTransactions"; + String transactionalId = "myProducer_" + UUID.randomUUID(); + + @Cleanup + KafkaProducer producer = buildTransactionProducer(transactionalId); + @Cleanup + AdminClient kafkaAdmin = AdminClient.create(newKafkaAdminClientProperties()); + + producer.initTransactions(); + producer.beginTransaction(); + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.EMPTY); + producer.send(new ProducerRecord<>(topicName, 1, "bar")).get(); + producer.flush(); + + ListTransactionsResult listTransactionsResult = kafkaAdmin.listTransactions(); + listTransactionsResult.all().get().forEach(t -> { + log.info("Found transactionalId: {} {} {}", + t.transactionalId(), + t.producerId(), + t.state()); + }); + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.ONGOING); + Awaitility.await().untilAsserted(() -> { + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.ONGOING); + }); + producer.commitTransaction(); + Awaitility.await().untilAsserted(() -> { + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT); + }); + producer.beginTransaction(); + + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT); + + producer.send(new ProducerRecord<>(topicName, 1, "bar")).get(); + producer.flush(); + producer.abortTransaction(); + Awaitility.await().untilAsserted(() -> { + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT); + }); + producer.close(); + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT); + } + + private static void assertTransactionState(AdminClient kafkaAdmin, String transactionalId, + org.apache.kafka.clients.admin.TransactionState transactionState) + throws Exception { + ListTransactionsResult listTransactionsResult = kafkaAdmin.listTransactions(); + Collection transactionListings = listTransactionsResult.all().get(); + transactionListings.forEach(t -> { + log.info("Found transactionalId: {} {} {}", + t.transactionalId(), + t.producerId(), + t.state()); + }); + TransactionListing transactionListing = transactionListings + .stream() + .filter(t -> t.transactionalId().equals(transactionalId)) + .findFirst() + .get(); + assertEquals(transactionState, transactionListing.state()); + + // filter for the same state + ListTransactionsOptions optionFilterState = new ListTransactionsOptions() + .filterStates(Collections.singleton(transactionState)); + listTransactionsResult = kafkaAdmin.listTransactions(optionFilterState); + transactionListings = listTransactionsResult.all().get(); + transactionListing = transactionListings + .stream() + .filter(t -> t.transactionalId().equals(transactionalId)) + .findFirst() + .get(); + assertEquals(transactionState, transactionListing.state()); + + + // filter for the same producer id + ListTransactionsOptions optionFilterProducer = new ListTransactionsOptions() + .filterProducerIds(Collections.singleton(transactionListing.producerId())); + listTransactionsResult = kafkaAdmin.listTransactions(optionFilterProducer); + transactionListings = listTransactionsResult.all().get(); + transactionListing = transactionListings + .stream() + .filter(t -> t.transactionalId().equals(transactionalId)) + .findFirst() + .get(); + assertEquals(transactionState, transactionListing.state()); + + // filter for the same producer id and state + ListTransactionsOptions optionFilterProducerAndState = new ListTransactionsOptions() + .filterStates(Collections.singleton(transactionState)) + .filterProducerIds(Collections.singleton(transactionListing.producerId())); + listTransactionsResult = kafkaAdmin.listTransactions(optionFilterProducerAndState); + transactionListings = listTransactionsResult.all().get(); + transactionListing = transactionListings + .stream() + .filter(t -> t.transactionalId().equals(transactionalId)) + .findFirst() + .get(); + assertEquals(transactionState, transactionListing.state()); + } + /** * Get the Kafka server address. */ From c9e90c29f55daf7c6d284fc342c870865221a33d Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Fri, 10 Mar 2023 16:29:14 +0100 Subject: [PATCH 3/4] [transactions] Implement KIP-664 - DESCRIBE_TRANSACTIONS (#77) (cherry picked from commit 1f2fe99e0e88c6bba1291525c242563a5eb94ef2) --- .../handlers/kop/KafkaCommandDecoder.java | 6 ++ .../handlers/kop/KafkaRequestHandler.java | 13 ++++ .../transaction/TransactionCoordinator.java | 72 +++++++++++++++++++ .../transaction/TransactionTest.java | 39 ++++++++-- 4 files changed, 126 insertions(+), 4 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index 6e61ceb50b..e0934a6db5 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -317,6 +317,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception case LIST_TRANSACTIONS: handleListTransactionsRequest(kafkaHeaderAndRequest, responseFuture); break; + case DESCRIBE_TRANSACTIONS: + handleDescribeTransactionsRequest(kafkaHeaderAndRequest, responseFuture); + break; case DELETE_GROUPS: handleDeleteGroupsRequest(kafkaHeaderAndRequest, responseFuture); break; @@ -578,6 +581,9 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest, protected abstract void handleListTransactionsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture response); + protected abstract void + handleDescribeTransactionsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture response); + protected abstract void handleDeleteGroupsRequest(KafkaHeaderAndRequest deleteGroups, CompletableFuture response); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index cf69d9b4f1..75dabce732 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -115,6 +115,7 @@ import org.apache.kafka.common.message.DescribeClusterResponseData; import org.apache.kafka.common.message.DescribeConfigsRequestData; import org.apache.kafka.common.message.DescribeConfigsResponseData; +import org.apache.kafka.common.message.DescribeTransactionsResponseData; import org.apache.kafka.common.message.EndTxnRequestData; import org.apache.kafka.common.message.EndTxnResponseData; import org.apache.kafka.common.message.FetchRequestData; @@ -159,6 +160,8 @@ import org.apache.kafka.common.requests.DescribeConfigsRequest; import org.apache.kafka.common.requests.DescribeConfigsResponse; import org.apache.kafka.common.requests.DescribeGroupsRequest; +import org.apache.kafka.common.requests.DescribeTransactionsRequest; +import org.apache.kafka.common.requests.DescribeTransactionsResponse; import org.apache.kafka.common.requests.EndTxnRequest; import org.apache.kafka.common.requests.EndTxnResponse; import org.apache.kafka.common.requests.FetchRequest; @@ -2064,6 +2067,16 @@ protected void handleListTransactionsRequest(KafkaHeaderAndRequest listTransacti resultFuture.complete(new ListTransactionsResponse(listResult)); } + @Override + protected void handleDescribeTransactionsRequest(KafkaHeaderAndRequest listGroups, + CompletableFuture response) { + checkArgument(listGroups.getRequest() instanceof DescribeTransactionsRequest); + DescribeTransactionsRequest request = (DescribeTransactionsRequest) listGroups.getRequest(); + DescribeTransactionsResponseData describeResult = getTransactionCoordinator() + .handleDescribeTransactions(request.data().transactionalIds()); + response.complete(new DescribeTransactionsResponse(describeResult)); + } + @Override protected void handleDeleteGroupsRequest(KafkaHeaderAndRequest deleteGroups, CompletableFuture resultFuture) { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java index f438c6790f..9d92b52bdf 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java @@ -13,6 +13,7 @@ */ package io.streamnative.pulsar.handlers.kop.coordinator.transaction; +import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.DEAD; import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.ONGOING; import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.PREPARE_ABORT; import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.PREPARE_COMMIT; @@ -52,6 +53,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.DescribeTransactionsResponseData; import org.apache.kafka.common.message.ListTransactionsResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.RecordBatch; @@ -230,6 +232,76 @@ public ListTransactionsResponseData handleListTransactions(List filtered return this.txnManager.listTransactionStates(filteredProducerIds, filteredStates); } + public DescribeTransactionsResponseData handleDescribeTransactions(List transactionalIds) { + DescribeTransactionsResponseData response = new DescribeTransactionsResponseData(); + if (transactionalIds != null) { + transactionalIds.forEach(transactionalId -> { + DescribeTransactionsResponseData.TransactionState transactionState = + handleDescribeTransactions(transactionalId); + response.transactionStates().add(transactionState); + }); + } + return response; + } + + private DescribeTransactionsResponseData.TransactionState handleDescribeTransactions(String transactionalId) { + // https://github.com/apache/kafka/blob/915991445fde106d02e61a70425ae2601c813db0/core/ + // src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L270 + if (transactionalId == null) { + throw new IllegalArgumentException("Invalid null transactionalId"); + } + + DescribeTransactionsResponseData.TransactionState transactionState = + new DescribeTransactionsResponseData.TransactionState() + .setTransactionalId(transactionalId); + + if (!isActive.get()) { + transactionState.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()); + } else if (transactionalId.isEmpty()) { + transactionState.setErrorCode(Errors.INVALID_REQUEST.code()); + } else { + Either> tState = + txnManager.getTransactionState(transactionalId); + if (tState.isLeft()) { + transactionState.setErrorCode(tState.getLeft().code()); + } else { + Optional right = tState.getRight(); + if (!right.isPresent()) { + transactionState.setErrorCode(Errors.TRANSACTIONAL_ID_NOT_FOUND.code()); + } else { + CoordinatorEpochAndTxnMetadata coordinatorEpochAndMetadata = right.get(); + TransactionMetadata txnMetadata = coordinatorEpochAndMetadata.getTransactionMetadata(); + txnMetadata.inLock(() -> { + if (txnMetadata.getState() == DEAD) { + // The transaction state is being expired, so ignore it + transactionState.setErrorCode(Errors.TRANSACTIONAL_ID_NOT_FOUND.code()); + } else { + txnMetadata.getTopicPartitions().forEach(topicPartition -> { + var topicData = transactionState.topics().find(topicPartition.topic()); + if (topicData == null) { + topicData = new DescribeTransactionsResponseData.TopicData() + .setTopic(topicPartition.topic()); + transactionState.topics().add(topicData); + } + topicData.partitions().add(topicPartition.partition()); + }); + + transactionState + .setErrorCode(Errors.NONE.code()) + .setProducerId(txnMetadata.getProducerId()) + .setProducerEpoch(txnMetadata.getProducerEpoch()) + .setTransactionState(txnMetadata.getState().toAdminState().toString()) + .setTransactionTimeoutMs(txnMetadata.getTxnTimeoutMs()) + .setTransactionStartTimeMs(txnMetadata.getTxnStartTimestamp()); + } + return null; + }); + } + } + } + return transactionState; + } + @Data @EqualsAndHashCode @AllArgsConstructor diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java index df0f599750..470a3e10cc 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java @@ -17,6 +17,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertSame; import static org.testng.Assert.assertTrue; import static org.testng.Assert.expectThrows; @@ -50,6 +51,7 @@ import org.apache.kafka.clients.admin.ListTransactionsOptions; import org.apache.kafka.clients.admin.ListTransactionsResult; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TransactionDescription; import org.apache.kafka.clients.admin.TransactionListing; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -81,6 +83,8 @@ @Slf4j public class TransactionTest extends KopProtocolHandlerTestBase { + private static final int TRANSACTION_TIMEOUT_CONFIG_VALUE = 600 * 1000; + protected void setupTransactions() { this.conf.setDefaultNumberOfNamespaceBundles(4); this.conf.setOffsetsTopicNumPartitions(10); @@ -1157,7 +1161,7 @@ private KafkaProducer buildTransactionProducer(String transacti producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, txTimeout); } else { // very long time-out - producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 600 * 1000); + producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, TRANSACTION_TIMEOUT_CONFIG_VALUE); } addCustomizeProps(producerProps); @@ -1324,10 +1328,10 @@ public void testNotFencedWithBeginTransaction() throws Exception { producer2.close(); } - @Test(timeOut = 100000 * 30) - public void testListTransactions() throws Exception { + @Test(timeOut = 1000 * 30) + public void testListAndDescribeTransactions() throws Exception { - String topicName = "testListTransactions"; + String topicName = "testListAndDescribeTransactions"; String transactionalId = "myProducer_" + UUID.randomUUID(); @Cleanup @@ -1432,6 +1436,33 @@ private static void assertTransactionState(AdminClient kafkaAdmin, String transa .findFirst() .get(); assertEquals(transactionState, transactionListing.state()); + + Map map = + kafkaAdmin.describeTransactions(Collections.singleton(transactionalId)) + .all().get(); + assertEquals(1, map.size()); + TransactionDescription transactionDescription = map.get(transactionalId); + log.info("transactionDescription {}", transactionDescription); + assertNotNull(transactionDescription); + assertEquals(transactionDescription.state(), transactionState); + assertTrue(transactionDescription.producerEpoch() >= 0); + assertEquals(TRANSACTION_TIMEOUT_CONFIG_VALUE, transactionDescription.transactionTimeoutMs()); + assertTrue(transactionDescription.transactionStartTimeMs().isPresent()); + assertTrue(transactionDescription.coordinatorId() >= 0); + + switch (transactionState) { + case EMPTY: + case COMPLETE_COMMIT: + case COMPLETE_ABORT: + assertEquals(0, transactionDescription.topicPartitions().size()); + break; + case ONGOING: + assertEquals(1, transactionDescription.topicPartitions().size()); + break; + default: + fail("unhandled " + transactionState); + } + } /** From d60d786c646ff370635f4df3368110abb27f98ec Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Mon, 13 Mar 2023 19:36:38 +0100 Subject: [PATCH 4/4] [transactions] Implement KIP-664 DescribeProducers (#78) (cherry picked from commit c3376e597b51cb198d73870d15c596550c09ed21) --- .../handlers/kop/KafkaCommandDecoder.java | 10 +- .../handlers/kop/KafkaRequestHandler.java | 96 +++++++++++++++++++ .../handlers/kop/storage/PartitionLog.java | 27 ++++++ .../kop/storage/ProducerStateManager.java | 4 + .../handlers/kop/storage/ReplicaManager.java | 11 +++ .../transaction/TransactionTest.java | 88 +++++++++++++---- 6 files changed, 217 insertions(+), 19 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index e0934a6db5..8203307eeb 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -311,6 +311,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception case DESCRIBE_GROUPS: handleDescribeGroupRequest(kafkaHeaderAndRequest, responseFuture); break; + case DESCRIBE_PRODUCERS: + handleDescribeProducersRequest(kafkaHeaderAndRequest, responseFuture); + break; case LIST_GROUPS: handleListGroupsRequest(kafkaHeaderAndRequest, responseFuture); break; @@ -573,7 +576,12 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest, handleLeaveGroupRequest(KafkaHeaderAndRequest leaveGroup, CompletableFuture response); protected abstract void - handleDescribeGroupRequest(KafkaHeaderAndRequest describeGroup, CompletableFuture response); + handleDescribeGroupRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, + CompletableFuture response); + + protected abstract void + handleDescribeProducersRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, + CompletableFuture response); protected abstract void handleListGroupsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture response); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 75dabce732..df9cb66eae 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -115,6 +115,7 @@ import org.apache.kafka.common.message.DescribeClusterResponseData; import org.apache.kafka.common.message.DescribeConfigsRequestData; import org.apache.kafka.common.message.DescribeConfigsResponseData; +import org.apache.kafka.common.message.DescribeProducersResponseData; import org.apache.kafka.common.message.DescribeTransactionsResponseData; import org.apache.kafka.common.message.EndTxnRequestData; import org.apache.kafka.common.message.EndTxnResponseData; @@ -160,6 +161,8 @@ import org.apache.kafka.common.requests.DescribeConfigsRequest; import org.apache.kafka.common.requests.DescribeConfigsResponse; import org.apache.kafka.common.requests.DescribeGroupsRequest; +import org.apache.kafka.common.requests.DescribeProducersRequest; +import org.apache.kafka.common.requests.DescribeProducersResponse; import org.apache.kafka.common.requests.DescribeTransactionsRequest; import org.apache.kafka.common.requests.DescribeTransactionsResponse; import org.apache.kafka.common.requests.EndTxnRequest; @@ -2041,6 +2044,99 @@ protected void handleDescribeGroupRequest(KafkaHeaderAndRequest describeGroup, )); } + @Override + protected void handleDescribeProducersRequest(KafkaHeaderAndRequest describeGroup, + CompletableFuture responseFuture) { + // https://github.com/apache/kafka/blob/79c19da68d6a93a729a07dfdd37f238246653a46/ + // core/src/main/scala/kafka/server/KafkaApis.scala#L3397 + checkArgument(describeGroup.getRequest() instanceof DescribeProducersRequest); + DescribeProducersRequest request = (DescribeProducersRequest) describeGroup.getRequest(); + Map allResponses = Maps.newConcurrentMap(); + Map errors = Maps.newConcurrentMap(); + String namespacePrefix = currentNamespacePrefix(); + final int numPartitions = request.data().topics().stream() + .mapToInt(t->t.partitionIndexes().size()) + .sum(); + Runnable completeOne = () -> { + if (errors.size() + allResponses.size() != numPartitions) { + // not enough responses + return; + } + errors.forEach((topicPartition, tpErrors) -> { + DescribeProducersResponseData.PartitionResponse topicResponse = + new DescribeProducersResponseData.PartitionResponse() + .setPartitionIndex(topicPartition.partition()) + .setErrorCode(tpErrors.code()) + .setErrorMessage(tpErrors.message()); + allResponses.put(topicPartition, topicResponse); + }); + DescribeProducersResponseData response = new DescribeProducersResponseData(); + allResponses + .entrySet() + .stream() + .collect(Collectors.groupingBy( + entry -> entry.getKey().topic(), + Collectors.mapping( + entry -> entry.getValue(), + Collectors.toList() + ) + )) + .forEach((topic, partitionResponses) -> { + DescribeProducersResponseData.TopicResponse topicResponse = + new DescribeProducersResponseData.TopicResponse() + .setName(topic) + .setPartitions(partitionResponses); + response.topics().add(topicResponse); + }); + responseFuture.complete(new DescribeProducersResponse(response)); + }; + + request.data().topics().forEach ((topicRequest) -> { + topicRequest.partitionIndexes().forEach(partition -> { + TopicPartition tp = new TopicPartition(topicRequest.name(), partition); + String fullPartitionName; + try { + fullPartitionName = KopTopic.toString(tp, namespacePrefix); + } catch (KoPTopicException e) { + log.warn("Invalid topic name: {}", tp.topic(), e); + errors.put(tp, Errors.UNKNOWN_TOPIC_OR_PARTITION); + completeOne.run(); + return; + } + authorize(AclOperation.WRITE, Resource.of(ResourceType.TOPIC, fullPartitionName)) + .whenComplete((isAuthorized, ex) -> { + if (ex != null) { + log.error("AddPartitionsToTxn topic authorize failed, topic - {}. {}", + fullPartitionName, ex.getMessage()); + errors.put(tp, Errors.TOPIC_AUTHORIZATION_FAILED); + completeOne.run(); + return; + } + if (!isAuthorized) { + errors.put(tp, Errors.TOPIC_AUTHORIZATION_FAILED); + completeOne.run(); + return; + } + CompletableFuture topicResponse = + replicaManager.activeProducerState(tp, namespacePrefix); + topicResponse.whenComplete((response, throwable) -> { + if (throwable != null) { + log.error("DescribeProducersRequest failed, topic - {}. {}", + fullPartitionName, throwable.getMessage()); + errors.put(tp, Errors.UNKNOWN_TOPIC_OR_PARTITION); + } else { + allResponses.put(tp, response); + } + completeOne.run(); + }); + + }); + }); + }); + + + } + @Override protected void handleListGroupsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture resultFuture) { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java index 63f2221025..ed3f57faa3 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java @@ -46,6 +46,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import lombok.AllArgsConstructor; @@ -75,6 +76,7 @@ import org.apache.kafka.common.errors.NotLeaderOrFollowerException; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.message.DescribeProducersResponseData; import org.apache.kafka.common.message.FetchRequestData; import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.protocol.Errors; @@ -138,6 +140,8 @@ public class PartitionLog { private volatile String kafkaTopicUUID; + private final AtomicBoolean unloaded = new AtomicBoolean(); + public PartitionLog(KafkaServiceConfiguration kafkaConfig, RequestStats requestStats, Time time, @@ -1176,6 +1180,29 @@ public CompletableFuture forcePurgeAbortTx() { }); } + public DescribeProducersResponseData.PartitionResponse activeProducerState() { + DescribeProducersResponseData.PartitionResponse producerState = + new DescribeProducersResponseData.PartitionResponse() + .setPartitionIndex(topicPartition.partition()) + .setErrorCode(Errors.NONE.code()) + .setActiveProducers(new ArrayList<>()); + + // this utility is only for monitoring, it is fine to access this structure directly from any thread + Map producers = producerStateManager.getProducers(); + producers.values().forEach(producerStateEntry -> { + producerState.activeProducers().add(new DescribeProducersResponseData.ProducerState() + .setProducerId(producerStateEntry.producerId()) + .setLastSequence(-1) // NOT HANDLED YET + .setProducerEpoch(producerStateEntry.producerEpoch() != null + ? producerStateEntry.producerEpoch().intValue() : -1) + .setLastTimestamp(producerStateEntry.lastTimestamp() != null + ? producerStateEntry.lastTimestamp().longValue() : -1) + .setCoordinatorEpoch(producerStateEntry.coordinatorEpoch()) + .setCurrentTxnStartOffset(producerStateEntry.currentTxnFirstOffset().orElse(-1L))); + }); + return producerState; + } + public CompletableFuture recoverTxEntries( long offset, Executor executor) { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java index a19fe5ee8b..19de9b9c79 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java @@ -363,4 +363,8 @@ public void handleMissingDataBeforeRecovery(long minOffset, long snapshotOffset) } } + public Map getProducers() { + return producers; + } + } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java index 711b7d2395..d89597dbbc 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java @@ -42,6 +42,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.NotLeaderOrFollowerException; +import org.apache.kafka.common.message.DescribeProducersResponseData; import org.apache.kafka.common.message.FetchRequestData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.MemoryRecords; @@ -341,4 +342,14 @@ public CompletableFuture updatePurgeAbortedTxnsOffsets() { return logManager.updatePurgeAbortedTxnsOffsets(); } + public CompletableFuture activeProducerState( + TopicPartition topicPartition, + String namespacePrefix) { + PartitionLog partitionLog = getPartitionLog(topicPartition, namespacePrefix); + // https://github.com/apache/kafka/blob/5514f372b3e12db1df35b257068f6bb5083111c7/ + // core/src/main/scala/kafka/server/ReplicaManager.scala#L535 + return partitionLog.awaitInitialisation() + .thenApply(log -> log.activeProducerState()); + } + } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java index 470a3e10cc..40cda3b6ef 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java @@ -18,6 +18,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertSame; import static org.testng.Assert.assertTrue; import static org.testng.Assert.expectThrows; @@ -43,14 +44,17 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; import java.util.function.Function; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.DescribeProducersResult; import org.apache.kafka.clients.admin.ListTransactionsOptions; import org.apache.kafka.clients.admin.ListTransactionsResult; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.ProducerState; import org.apache.kafka.clients.admin.TransactionDescription; import org.apache.kafka.clients.admin.TransactionListing; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -1342,47 +1346,63 @@ public void testListAndDescribeTransactions() throws Exception { producer.initTransactions(); producer.beginTransaction(); assertTransactionState(kafkaAdmin, transactionalId, - org.apache.kafka.clients.admin.TransactionState.EMPTY); + org.apache.kafka.clients.admin.TransactionState.EMPTY, (stateOnBroker, stateOnCoodinator) -> { + assertNull(stateOnBroker); + }); producer.send(new ProducerRecord<>(topicName, 1, "bar")).get(); producer.flush(); - ListTransactionsResult listTransactionsResult = kafkaAdmin.listTransactions(); - listTransactionsResult.all().get().forEach(t -> { - log.info("Found transactionalId: {} {} {}", - t.transactionalId(), - t.producerId(), - t.state()); - }); + // the transaction is in ONGOING state assertTransactionState(kafkaAdmin, transactionalId, - org.apache.kafka.clients.admin.TransactionState.ONGOING); + org.apache.kafka.clients.admin.TransactionState.ONGOING, + (stateOnBroker, stateOnCoodinator) -> {}); + + // wait for the brokers to update the state Awaitility.await().untilAsserted(() -> { - assertTransactionState(kafkaAdmin, transactionalId, - org.apache.kafka.clients.admin.TransactionState.ONGOING); + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.ONGOING, + (stateOnBroker, stateOnCoodinator) -> { + // THESE ASSERTIONS ARE NOT VALID YET + //log.info("stateOnBroker: {}", stateOnBroker); + //log.info("stateOnCoodinator: {}", stateOnCoodinator); + // assertTrue(stateOnBroker.lastTimestamp() + // >= stateOnCoodinator.transactionStartTimeMs().orElseThrow()); + }); }); producer.commitTransaction(); Awaitility.await().untilAsserted(() -> { - assertTransactionState(kafkaAdmin, transactionalId, - org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT); - }); + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT, + (stateOnBroker, stateOnCoodinator) -> { + }); + }); producer.beginTransaction(); assertTransactionState(kafkaAdmin, transactionalId, - org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT); + org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT, + (stateOnBroker, stateOnCoodinator) -> { + }); producer.send(new ProducerRecord<>(topicName, 1, "bar")).get(); producer.flush(); producer.abortTransaction(); Awaitility.await().untilAsserted(() -> { assertTransactionState(kafkaAdmin, transactionalId, - org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT); + org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT, + (stateOnBroker, stateOnCoodinator) -> { + }); }); producer.close(); assertTransactionState(kafkaAdmin, transactionalId, - org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT); + org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT, + (stateOnBroker, stateOnCoodinator) -> { + }); } private static void assertTransactionState(AdminClient kafkaAdmin, String transactionalId, - org.apache.kafka.clients.admin.TransactionState transactionState) + org.apache.kafka.clients.admin.TransactionState transactionState, + BiConsumer + producerStateValidator) throws Exception { ListTransactionsResult listTransactionsResult = kafkaAdmin.listTransactions(); Collection transactionListings = listTransactionsResult.all().get(); @@ -1457,12 +1477,44 @@ private static void assertTransactionState(AdminClient kafkaAdmin, String transa assertEquals(0, transactionDescription.topicPartitions().size()); break; case ONGOING: + assertTrue(transactionDescription.transactionStartTimeMs().orElseThrow() > 0); assertEquals(1, transactionDescription.topicPartitions().size()); break; default: fail("unhandled " + transactionState); } + DescribeProducersResult producers = kafkaAdmin.describeProducers(transactionDescription.topicPartitions()); + Map topicPartitionPartitionProducerStateMap = + producers.all().get(); + log.debug("topicPartitionPartitionProducerStateMap {}", topicPartitionPartitionProducerStateMap); + + + switch (transactionState) { + case EMPTY: + case COMPLETE_COMMIT: + case COMPLETE_ABORT: + producerStateValidator.accept(null, transactionDescription); + assertEquals(0, topicPartitionPartitionProducerStateMap.size()); + break; + case ONGOING: + assertEquals(1, topicPartitionPartitionProducerStateMap.size()); + TopicPartition tp = transactionDescription.topicPartitions().iterator().next(); + DescribeProducersResult.PartitionProducerState partitionProducerState = + topicPartitionPartitionProducerStateMap.get(tp); + List producerStates = partitionProducerState.activeProducers(); + assertEquals(1, producerStates.size()); + ProducerState producerState = producerStates.get(0); + assertEquals(producerState.producerId(), transactionDescription.producerId()); + producerStateValidator.accept(producerState, transactionDescription); + + + break; + default: + fail("unhandled " + transactionState); + } + + } /**