diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index 4def7adecb..dbce355edf 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -325,6 +325,12 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception case LIST_GROUPS: handleListGroupsRequest(kafkaHeaderAndRequest, responseFuture); break; + case LIST_TRANSACTIONS: + handleListTransactionsRequest(kafkaHeaderAndRequest, responseFuture); + break; + case DESCRIBE_TRANSACTIONS: + handleDescribeTransactionsRequest(kafkaHeaderAndRequest, responseFuture); + break; case DELETE_GROUPS: handleDeleteGroupsRequest(kafkaHeaderAndRequest, responseFuture); break; @@ -583,6 +589,12 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest, protected abstract void handleListGroupsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture response); + protected abstract void + handleListTransactionsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture response); + + protected abstract void + handleDescribeTransactionsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture response); + protected abstract void handleDeleteGroupsRequest(KafkaHeaderAndRequest deleteGroups, CompletableFuture response); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 233b7892c2..818d331abe 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -32,6 +32,7 @@ import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicException; import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata; import io.streamnative.pulsar.handlers.kop.offset.OffsetMetadata; +import io.streamnative.pulsar.handlers.kop.scala.Either; import io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator; import io.streamnative.pulsar.handlers.kop.security.Session; import io.streamnative.pulsar.handlers.kop.security.auth.Authorizer; @@ -116,6 +117,7 @@ import org.apache.kafka.common.message.DescribeClusterResponseData; import org.apache.kafka.common.message.DescribeConfigsRequestData; import org.apache.kafka.common.message.DescribeConfigsResponseData; +import org.apache.kafka.common.message.DescribeTransactionsResponseData; import org.apache.kafka.common.message.EndTxnRequestData; import org.apache.kafka.common.message.EndTxnResponseData; import org.apache.kafka.common.message.FetchRequestData; @@ -127,6 +129,7 @@ import org.apache.kafka.common.message.LeaveGroupRequestData; import org.apache.kafka.common.message.ListOffsetsRequestData; import org.apache.kafka.common.message.ListOffsetsResponseData; +import org.apache.kafka.common.message.ListTransactionsResponseData; import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.message.ProduceRequestData; import org.apache.kafka.common.message.SaslAuthenticateResponseData; @@ -159,6 +162,8 @@ import org.apache.kafka.common.requests.DescribeConfigsRequest; import org.apache.kafka.common.requests.DescribeConfigsResponse; import org.apache.kafka.common.requests.DescribeGroupsRequest; +import org.apache.kafka.common.requests.DescribeTransactionsRequest; +import org.apache.kafka.common.requests.DescribeTransactionsResponse; import org.apache.kafka.common.requests.EndTxnRequest; import org.apache.kafka.common.requests.EndTxnResponse; import org.apache.kafka.common.requests.FetchRequest; @@ -175,6 +180,8 @@ import org.apache.kafka.common.requests.ListOffsetRequestV0; import org.apache.kafka.common.requests.ListOffsetsRequest; import org.apache.kafka.common.requests.ListOffsetsResponse; +import org.apache.kafka.common.requests.ListTransactionsRequest; +import org.apache.kafka.common.requests.ListTransactionsResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata; import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata; @@ -2017,8 +2024,36 @@ protected void handleDescribeGroupRequest(KafkaHeaderAndRequest describeGroup, protected void handleListGroupsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture resultFuture) { checkArgument(listGroups.getRequest() instanceof ListGroupsRequest); - KeyValue> listResult = getGroupCoordinator().handleListGroups(); - resultFuture.complete(KafkaResponseUtils.newListGroups(listResult.getKey(), listResult.getValue())); + Either> listResult = getGroupCoordinator().handleListGroups(); + resultFuture.complete(KafkaResponseUtils.newListGroups(listResult)); + } + + @Override + protected void handleListTransactionsRequest(KafkaHeaderAndRequest listTransactions, + CompletableFuture resultFuture) { + checkArgument(listTransactions.getRequest() instanceof ListTransactionsRequest); + ListTransactionsRequest request = (ListTransactionsRequest) listTransactions.getRequest(); + List stateFilters = request.data().stateFilters(); + if (stateFilters == null) { + stateFilters = Collections.emptyList(); + } + List producerIdFilters = request.data().producerIdFilters(); + if (producerIdFilters == null) { + producerIdFilters = Collections.emptyList(); + } + ListTransactionsResponseData listResult = getTransactionCoordinator() + .handleListTransactions(stateFilters, producerIdFilters); + resultFuture.complete(new ListTransactionsResponse(listResult)); + } + + @Override + protected void handleDescribeTransactionsRequest(KafkaHeaderAndRequest listGroups, + CompletableFuture response) { + checkArgument(listGroups.getRequest() instanceof DescribeTransactionsRequest); + DescribeTransactionsRequest request = (DescribeTransactionsRequest) listGroups.getRequest(); + DescribeTransactionsResponseData describeResult = getTransactionCoordinator() + .handleDescribeTransactions(request.data().transactionalIds()); + response.complete(new DescribeTransactionsResponse(describeResult)); } @Override diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java index f1f5d90ac1..98813604f7 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java @@ -29,6 +29,7 @@ import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupOverview; import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupSummary; import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata; +import io.streamnative.pulsar.handlers.kop.scala.Either; import io.streamnative.pulsar.handlers.kop.utils.CoreUtils; import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey.GroupKey; import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey.MemberKey; @@ -832,22 +833,16 @@ public KeyValue> handleFetchOffsets( ); } - public KeyValue> handleListGroups() { + public Either> handleListGroups() { if (!isActive.get()) { - return new KeyValue<>(Errors.COORDINATOR_NOT_AVAILABLE, new ArrayList<>()); + return Either.left(Errors.COORDINATOR_NOT_AVAILABLE); } else { - Errors errors; if (groupManager.isLoading()) { - errors = Errors.COORDINATOR_LOAD_IN_PROGRESS; - } else { - errors = Errors.NONE; + return Either.left(Errors.COORDINATOR_LOAD_IN_PROGRESS); } List overviews = new ArrayList<>(); groupManager.currentGroups().forEach(group -> overviews.add(group.overview())); - return new KeyValue<>( - errors, - overviews - ); + return Either.right(overviews); } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java index dfd3f08580..5b887057a8 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java @@ -13,6 +13,7 @@ */ package io.streamnative.pulsar.handlers.kop.coordinator.transaction; +import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.DEAD; import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.ONGOING; import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.PREPARE_ABORT; import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.PREPARE_COMMIT; @@ -32,6 +33,7 @@ import io.streamnative.pulsar.handlers.kop.storage.PulsarPartitionedTopicProducerStateManagerSnapshotBuffer; import io.streamnative.pulsar.handlers.kop.utils.MetadataUtils; import io.streamnative.pulsar.handlers.kop.utils.ProducerIdAndEpoch; +import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -52,6 +54,8 @@ import org.apache.commons.lang3.StringUtils; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.DescribeTransactionsResponseData; +import org.apache.kafka.common.message.ListTransactionsResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.requests.TransactionResult; @@ -80,6 +84,8 @@ public class TransactionCoordinator { private final Time time; + private final AtomicBoolean isActive = new AtomicBoolean(false); + private static final BiConsumer onEndTransactionComplete = (txnIdAndPidEpoch, errors) -> { @@ -218,6 +224,87 @@ public static String getTopicPartitionName(String topicPartitionName, int partit return topicPartitionName + PARTITIONED_TOPIC_SUFFIX + partitionId; } + public ListTransactionsResponseData handleListTransactions(List filteredStates, + List filteredProducerIds) { + // https://github.com/apache/kafka/blob/915991445fde106d02e61a70425ae2601c813db0/core/ + // src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L259 + if (!isActive.get()) { + log.warn("The transaction coordinator is not active, so it will reject list transaction request"); + return new ListTransactionsResponseData().setErrorCode(Errors.NOT_COORDINATOR.code()); + } + return this.txnManager.listTransactionStates(filteredProducerIds, filteredStates); + } + + public DescribeTransactionsResponseData handleDescribeTransactions(List transactionalIds) { + DescribeTransactionsResponseData response = new DescribeTransactionsResponseData(); + if (transactionalIds != null) { + transactionalIds.forEach(transactionalId -> { + DescribeTransactionsResponseData.TransactionState transactionState = + handleDescribeTransactions(transactionalId); + response.transactionStates().add(transactionState); + }); + } + return response; + } + + private DescribeTransactionsResponseData.TransactionState handleDescribeTransactions(String transactionalId) { + // https://github.com/apache/kafka/blob/915991445fde106d02e61a70425ae2601c813db0/core/ + // src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L270 + if (transactionalId == null) { + throw new IllegalArgumentException("Invalid null transactionalId"); + } + + DescribeTransactionsResponseData.TransactionState transactionState = + new DescribeTransactionsResponseData.TransactionState() + .setTransactionalId(transactionalId); + + if (!isActive.get()) { + transactionState.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()); + } else if (transactionalId.isEmpty()) { + transactionState.setErrorCode(Errors.INVALID_REQUEST.code()); + } else { + Either> tState = + txnManager.getTransactionState(transactionalId); + if (tState.isLeft()) { + transactionState.setErrorCode(tState.getLeft().code()); + } else { + Optional right = tState.getRight(); + if (!right.isPresent()) { + transactionState.setErrorCode(Errors.TRANSACTIONAL_ID_NOT_FOUND.code()); + } else { + CoordinatorEpochAndTxnMetadata coordinatorEpochAndMetadata = right.get(); + TransactionMetadata txnMetadata = coordinatorEpochAndMetadata.getTransactionMetadata(); + txnMetadata.inLock(() -> { + if (txnMetadata.getState() == DEAD) { + // The transaction state is being expired, so ignore it + transactionState.setErrorCode(Errors.TRANSACTIONAL_ID_NOT_FOUND.code()); + } else { + txnMetadata.getTopicPartitions().forEach(topicPartition -> { + var topicData = transactionState.topics().find(topicPartition.topic()); + if (topicData == null) { + topicData = new DescribeTransactionsResponseData.TopicData() + .setTopic(topicPartition.topic()); + transactionState.topics().add(topicData); + } + topicData.partitions().add(topicPartition.partition()); + }); + + transactionState + .setErrorCode(Errors.NONE.code()) + .setProducerId(txnMetadata.getProducerId()) + .setProducerEpoch(txnMetadata.getProducerEpoch()) + .setTransactionState(txnMetadata.getState().toAdminState().toString()) + .setTransactionTimeoutMs(txnMetadata.getTxnTimeoutMs()) + .setTransactionStartTimeMs(txnMetadata.getTxnStartTimestamp()); + } + return null; + }); + } + } + } + return transactionState; + } + @Data @EqualsAndHashCode @AllArgsConstructor @@ -956,6 +1043,7 @@ public CompletableFuture startup(boolean enableTransactionalIdExpiration) return this.producerIdManager.initialize().thenCompose(ignored -> { log.info("{} Startup transaction coordinator complete.", namespacePrefixForMetadata); + isActive.set(true); return CompletableFuture.completedFuture(null); }); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionState.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionState.java index 7919449097..01d626f391 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionState.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionState.java @@ -117,4 +117,26 @@ public boolean isExpirationAllowed() { return false; } } + + public org.apache.kafka.clients.admin.TransactionState toAdminState() { + switch (this) { + case EMPTY: + return org.apache.kafka.clients.admin.TransactionState.EMPTY; + case ONGOING: + return org.apache.kafka.clients.admin.TransactionState.ONGOING; + case PREPARE_COMMIT: + return org.apache.kafka.clients.admin.TransactionState.PREPARE_COMMIT; + case PREPARE_ABORT: + return org.apache.kafka.clients.admin.TransactionState.PREPARE_ABORT; + case COMPLETE_COMMIT: + return org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT; + case COMPLETE_ABORT: + return org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT; + case PREPARE_EPOCH_FENCE: + return org.apache.kafka.clients.admin.TransactionState.PREPARE_EPOCH_FENCE; + case DEAD: + default: + return org.apache.kafka.clients.admin.TransactionState.UNKNOWN; + } + } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java index f2762c57b5..08835b265d 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java @@ -25,6 +25,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -40,6 +41,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.ListTransactionsResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.SchemaException; import org.apache.kafka.common.requests.ProduceResponse; @@ -245,6 +247,71 @@ private boolean shouldExpire(TransactionMetadata txnMetadata, Long currentTimeMs <= (currentTimeMs - transactionConfig.getTransactionalIdExpirationMs()); } + private static boolean shouldInclude(TransactionMetadata txnMetadata, + List filterProducerIds, Set filterStateNames) { + if (txnMetadata.getState() == TransactionState.DEAD) { + // We filter the `Dead` state since it is a transient state which + // indicates that the transactionalId and its metadata are in the + // process of expiration and removal. + return false; + } else if (!filterProducerIds.isEmpty() && !filterProducerIds.contains(txnMetadata.getProducerId())) { + return false; + } else if (!filterStateNames.isEmpty() && !filterStateNames.contains( + txnMetadata.getState().toAdminState().toString())) { + return false; + } else { + return true; + } + } + + public ListTransactionsResponseData listTransactionStates(List filteredProducerIds, + List filteredStates) { + return CoreUtils.inReadLock(stateLock, () -> { + ListTransactionsResponseData response = new ListTransactionsResponseData(); + if (!loadingPartitions.isEmpty()) { + response.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()); + } else { + Set filterStates = new HashSet<>(); + for (TransactionState stateName : TransactionState.values()) { + String nameForTheClient = stateName.toAdminState().toString(); + if (filteredStates.contains(nameForTheClient)) { + filterStates.add(nameForTheClient); + } else { + response.unknownStateFilters().add(nameForTheClient); + } + } + List states = new ArrayList<>(); + transactionMetadataCache.forEach((__, cache) -> { + cache.values().forEach(txnMetadata -> { + txnMetadata.inLock(() -> { + // use toString() to get the name of the state according to the protocol + ListTransactionsResponseData.TransactionState transactionState = + new ListTransactionsResponseData.TransactionState() + .setTransactionalId(txnMetadata.getTransactionalId()) + .setProducerId(txnMetadata.getProducerId()) + .setTransactionState(txnMetadata.getState().toAdminState().toString()); + + if (shouldInclude(txnMetadata, filteredProducerIds, filterStates)) { + if (log.isDebugEnabled()) { + log.debug("add transaction state: {}", transactionState); + } + states.add(transactionState); + } else { + if (log.isDebugEnabled()) { + log.debug("Skip transaction state: {}", transactionState); + } + } + return null; + }); + }); + }); + response.setErrorCode(Errors.NONE.code()) + .setTransactionStates(states); + } + return response; + }); + } + @Data @AllArgsConstructor private static class TransactionalIdCoordinatorEpochAndMetadata { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java index 3a654dc2b3..0fb479f517 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java @@ -15,6 +15,7 @@ import io.streamnative.pulsar.handlers.kop.ApiVersion; import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata; +import io.streamnative.pulsar.handlers.kop.scala.Either; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -269,14 +270,18 @@ public static LeaveGroupResponse newLeaveGroup(Errors errors) { return new LeaveGroupResponse(data); } - public static ListGroupsResponse newListGroups(Errors errors, - List groups) { + public static ListGroupsResponse newListGroups(Either> results) { ListGroupsResponseData data = new ListGroupsResponseData(); - data.setErrorCode(errors.code()); - data.setGroups(groups.stream().map(overView -> new ListGroupsResponseData.ListedGroup() - .setGroupId(overView.groupId()) - .setProtocolType(overView.protocolType())) - .collect(Collectors.toList())); + data.setErrorCode(results.isLeft() ? results.getLeft().code() : Errors.NONE.code()); + if (!results.isLeft()) { + data.setGroups(results.getRight().stream().map(overView -> new ListGroupsResponseData.ListedGroup() + .setGroupId(overView.groupId()) + .setProtocolType(overView.protocolType())) + .collect(Collectors.toList())); + + } else { + data.setGroups(Collections.emptyList()); + } return new ListGroupsResponse(data); } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java index fe939f3689..399b9c201d 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java @@ -312,6 +312,7 @@ protected final void internalSetup(boolean startBroker) throws Exception { createClient(); MetadataUtils.createOffsetMetadataIfMissing(conf.getKafkaMetadataTenant(), admin, clusterData, this.conf); + if (conf.isKafkaTransactionCoordinatorEnabled()) { MetadataUtils.createTxnMetadataIfMissing(conf.getKafkaMetadataTenant(), admin, clusterData, this.conf); } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinatorTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinatorTest.java index 105958975d..3696b926dc 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinatorTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinatorTest.java @@ -15,6 +15,7 @@ import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -28,6 +29,7 @@ import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupSummary; import io.streamnative.pulsar.handlers.kop.coordinator.group.MemberMetadata.MemberSummary; import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata; +import io.streamnative.pulsar.handlers.kop.scala.Either; import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory; import io.streamnative.pulsar.handlers.kop.utils.timer.MockTimer; import java.util.ArrayList; @@ -218,8 +220,8 @@ public void testRequestHandlingWhileLoadingInProgress() throws Exception { assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, describeGroupResult.getKey()); // ListGroups - KeyValue> listGroupsResult = groupCoordinator.handleListGroups(); - assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, listGroupsResult.getKey()); + Either> listGroupsResult = groupCoordinator.handleListGroups(); + assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, listGroupsResult.getLeft()); // DeleteGroups Map deleteGroupsErrors = groupCoordinator.handleDeleteGroups( @@ -1695,12 +1697,12 @@ groupId, memberId, protocolType, newProtocols() ).get(); assertEquals(Errors.NONE, syncGroupResult.getKey()); - KeyValue> groups = groupCoordinator.handleListGroups(); - assertEquals(Errors.NONE, groups.getKey()); - assertEquals(1, groups.getValue().size()); + Either> groups = groupCoordinator.handleListGroups(); + assertFalse(groups.isLeft()); + assertEquals(1, groups.getRight().size()); assertEquals( new GroupOverview("groupId", "consumer"), - groups.getValue().get(0) + groups.getRight().get(0) ); } @@ -1712,12 +1714,12 @@ groupId, memberId, protocolType, newProtocols() ); assertEquals(Errors.NONE, joinGroupResult.getError()); - KeyValue> groups = groupCoordinator.handleListGroups(); - assertEquals(Errors.NONE, groups.getKey()); - assertEquals(1, groups.getValue().size()); + Either> groups = groupCoordinator.handleListGroups(); + assertFalse(groups.isLeft()); + assertEquals(1, groups.getRight().size()); assertEquals( new GroupOverview("groupId", "consumer"), - groups.getValue().get(0) + groups.getRight().get(0) ); } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java index ea6ef0e23b..415846c46d 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java @@ -36,6 +36,7 @@ import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -52,7 +53,11 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ListTransactionsOptions; +import org.apache.kafka.clients.admin.ListTransactionsResult; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TransactionDescription; +import org.apache.kafka.clients.admin.TransactionListing; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -83,6 +88,8 @@ @Slf4j public class TransactionTest extends KopProtocolHandlerTestBase { + private static final int TRANSACTION_TIMEOUT_CONFIG_VALUE = 600 * 1000; + protected void setupTransactions() { this.conf.setDefaultNumberOfNamespaceBundles(4); this.conf.setOffsetsTopicNumPartitions(10); @@ -1158,7 +1165,7 @@ private KafkaProducer buildTransactionProducer(String transacti producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, txTimeout); } else { // very long time-out - producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 600 * 1000); + producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, TRANSACTION_TIMEOUT_CONFIG_VALUE); } producerProps.put(CLIENT_ID_CONFIG, "dummy_client_" + UUID.randomUUID()); addCustomizeProps(producerProps); @@ -1487,6 +1494,143 @@ public void testAbortedTxEventuallyPurged() throws Exception { } } + @Test(timeOut = 1000 * 30) + public void testListAndDescribeTransactions() throws Exception { + + String topicName = "testListAndDescribeTransactions"; + String transactionalId = "myProducer_" + UUID.randomUUID(); + + @Cleanup + KafkaProducer producer = buildTransactionProducer(transactionalId); + @Cleanup + AdminClient kafkaAdmin = AdminClient.create(newKafkaAdminClientProperties()); + + producer.initTransactions(); + producer.beginTransaction(); + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.EMPTY); + producer.send(new ProducerRecord<>(topicName, 1, "bar")).get(); + producer.flush(); + + ListTransactionsResult listTransactionsResult = kafkaAdmin.listTransactions(); + listTransactionsResult.all().get().forEach(t -> { + log.info("Found transactionalId: {} {} {}", + t.transactionalId(), + t.producerId(), + t.state()); + }); + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.ONGOING); + Awaitility.await().untilAsserted(() -> { + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.ONGOING); + }); + producer.commitTransaction(); + Awaitility.await().untilAsserted(() -> { + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT); + }); + producer.beginTransaction(); + + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT); + + producer.send(new ProducerRecord<>(topicName, 1, "bar")).get(); + producer.flush(); + producer.abortTransaction(); + Awaitility.await().untilAsserted(() -> { + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT); + }); + producer.close(); + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT); + } + + private static void assertTransactionState(AdminClient kafkaAdmin, String transactionalId, + org.apache.kafka.clients.admin.TransactionState transactionState) + throws Exception { + ListTransactionsResult listTransactionsResult = kafkaAdmin.listTransactions(); + Collection transactionListings = listTransactionsResult.all().get(); + transactionListings.forEach(t -> { + log.info("Found transactionalId: {} {} {}", + t.transactionalId(), + t.producerId(), + t.state()); + }); + TransactionListing transactionListing = transactionListings + .stream() + .filter(t -> t.transactionalId().equals(transactionalId)) + .findFirst() + .get(); + assertEquals(transactionState, transactionListing.state()); + + // filter for the same state + ListTransactionsOptions optionFilterState = new ListTransactionsOptions() + .filterStates(Collections.singleton(transactionState)); + listTransactionsResult = kafkaAdmin.listTransactions(optionFilterState); + transactionListings = listTransactionsResult.all().get(); + transactionListing = transactionListings + .stream() + .filter(t -> t.transactionalId().equals(transactionalId)) + .findFirst() + .get(); + assertEquals(transactionState, transactionListing.state()); + + + // filter for the same producer id + ListTransactionsOptions optionFilterProducer = new ListTransactionsOptions() + .filterProducerIds(Collections.singleton(transactionListing.producerId())); + listTransactionsResult = kafkaAdmin.listTransactions(optionFilterProducer); + transactionListings = listTransactionsResult.all().get(); + transactionListing = transactionListings + .stream() + .filter(t -> t.transactionalId().equals(transactionalId)) + .findFirst() + .get(); + assertEquals(transactionState, transactionListing.state()); + + // filter for the same producer id and state + ListTransactionsOptions optionFilterProducerAndState = new ListTransactionsOptions() + .filterStates(Collections.singleton(transactionState)) + .filterProducerIds(Collections.singleton(transactionListing.producerId())); + listTransactionsResult = kafkaAdmin.listTransactions(optionFilterProducerAndState); + transactionListings = listTransactionsResult.all().get(); + transactionListing = transactionListings + .stream() + .filter(t -> t.transactionalId().equals(transactionalId)) + .findFirst() + .get(); + assertEquals(transactionState, transactionListing.state()); + + Map map = + kafkaAdmin.describeTransactions(Collections.singleton(transactionalId)) + .all().get(); + assertEquals(1, map.size()); + TransactionDescription transactionDescription = map.get(transactionalId); + log.info("transactionDescription {}", transactionDescription); + assertNotNull(transactionDescription); + assertEquals(transactionDescription.state(), transactionState); + assertTrue(transactionDescription.producerEpoch() >= 0); + assertEquals(TRANSACTION_TIMEOUT_CONFIG_VALUE, transactionDescription.transactionTimeoutMs()); + assertTrue(transactionDescription.transactionStartTimeMs().isPresent()); + assertTrue(transactionDescription.coordinatorId() >= 0); + + switch (transactionState) { + case EMPTY: + case COMPLETE_COMMIT: + case COMPLETE_ABORT: + assertEquals(0, transactionDescription.topicPartitions().size()); + break; + case ONGOING: + assertEquals(1, transactionDescription.topicPartitions().size()); + break; + default: + fail("unhandled " + transactionState); + } + + } + /** * Get the Kafka server address. */