Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

[transactions] Implement KIP-664 - DESCRIBE_TRANSACTIONS #1985

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,12 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
case LIST_GROUPS:
handleListGroupsRequest(kafkaHeaderAndRequest, responseFuture);
break;
case LIST_TRANSACTIONS:
handleListTransactionsRequest(kafkaHeaderAndRequest, responseFuture);
break;
case DESCRIBE_TRANSACTIONS:
handleDescribeTransactionsRequest(kafkaHeaderAndRequest, responseFuture);
break;
case DELETE_GROUPS:
handleDeleteGroupsRequest(kafkaHeaderAndRequest, responseFuture);
break;
Expand Down Expand Up @@ -583,6 +589,12 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest,
protected abstract void
handleListGroupsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture<AbstractResponse> response);

protected abstract void
handleListTransactionsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture<AbstractResponse> response);

protected abstract void
handleDescribeTransactionsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture<AbstractResponse> response);

protected abstract void
handleDeleteGroupsRequest(KafkaHeaderAndRequest deleteGroups, CompletableFuture<AbstractResponse> response);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicException;
import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata;
import io.streamnative.pulsar.handlers.kop.offset.OffsetMetadata;
import io.streamnative.pulsar.handlers.kop.scala.Either;
import io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator;
import io.streamnative.pulsar.handlers.kop.security.Session;
import io.streamnative.pulsar.handlers.kop.security.auth.Authorizer;
Expand Down Expand Up @@ -116,6 +117,7 @@
import org.apache.kafka.common.message.DescribeClusterResponseData;
import org.apache.kafka.common.message.DescribeConfigsRequestData;
import org.apache.kafka.common.message.DescribeConfigsResponseData;
import org.apache.kafka.common.message.DescribeTransactionsResponseData;
import org.apache.kafka.common.message.EndTxnRequestData;
import org.apache.kafka.common.message.EndTxnResponseData;
import org.apache.kafka.common.message.FetchRequestData;
Expand All @@ -127,6 +129,7 @@
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.ListOffsetsRequestData;
import org.apache.kafka.common.message.ListOffsetsResponseData;
import org.apache.kafka.common.message.ListTransactionsResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.message.SaslAuthenticateResponseData;
Expand Down Expand Up @@ -159,6 +162,8 @@
import org.apache.kafka.common.requests.DescribeConfigsRequest;
import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.kafka.common.requests.DescribeTransactionsRequest;
import org.apache.kafka.common.requests.DescribeTransactionsResponse;
import org.apache.kafka.common.requests.EndTxnRequest;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FetchRequest;
Expand All @@ -175,6 +180,8 @@
import org.apache.kafka.common.requests.ListOffsetRequestV0;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.requests.ListTransactionsRequest;
import org.apache.kafka.common.requests.ListTransactionsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata;
import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata;
Expand Down Expand Up @@ -2017,8 +2024,36 @@ protected void handleDescribeGroupRequest(KafkaHeaderAndRequest describeGroup,
protected void handleListGroupsRequest(KafkaHeaderAndRequest listGroups,
CompletableFuture<AbstractResponse> resultFuture) {
checkArgument(listGroups.getRequest() instanceof ListGroupsRequest);
KeyValue<Errors, List<GroupOverview>> listResult = getGroupCoordinator().handleListGroups();
resultFuture.complete(KafkaResponseUtils.newListGroups(listResult.getKey(), listResult.getValue()));
Either<Errors, List<GroupOverview>> listResult = getGroupCoordinator().handleListGroups();
resultFuture.complete(KafkaResponseUtils.newListGroups(listResult));
}

@Override
protected void handleListTransactionsRequest(KafkaHeaderAndRequest listTransactions,
CompletableFuture<AbstractResponse> resultFuture) {
checkArgument(listTransactions.getRequest() instanceof ListTransactionsRequest);
ListTransactionsRequest request = (ListTransactionsRequest) listTransactions.getRequest();
List<String> stateFilters = request.data().stateFilters();
if (stateFilters == null) {
stateFilters = Collections.emptyList();
}
List<Long> producerIdFilters = request.data().producerIdFilters();
if (producerIdFilters == null) {
producerIdFilters = Collections.emptyList();
}
ListTransactionsResponseData listResult = getTransactionCoordinator()
.handleListTransactions(stateFilters, producerIdFilters);
resultFuture.complete(new ListTransactionsResponse(listResult));
}

@Override
protected void handleDescribeTransactionsRequest(KafkaHeaderAndRequest listGroups,
CompletableFuture<AbstractResponse> response) {
checkArgument(listGroups.getRequest() instanceof DescribeTransactionsRequest);
DescribeTransactionsRequest request = (DescribeTransactionsRequest) listGroups.getRequest();
DescribeTransactionsResponseData describeResult = getTransactionCoordinator()
.handleDescribeTransactions(request.data().transactionalIds());
response.complete(new DescribeTransactionsResponse(describeResult));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupOverview;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupSummary;
import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata;
import io.streamnative.pulsar.handlers.kop.scala.Either;
import io.streamnative.pulsar.handlers.kop.utils.CoreUtils;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey.GroupKey;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey.MemberKey;
Expand Down Expand Up @@ -832,22 +833,16 @@ public KeyValue<Errors, Map<TopicPartition, PartitionData>> handleFetchOffsets(
);
}

public KeyValue<Errors, List<GroupOverview>> handleListGroups() {
public Either<Errors, List<GroupOverview>> handleListGroups() {
if (!isActive.get()) {
return new KeyValue<>(Errors.COORDINATOR_NOT_AVAILABLE, new ArrayList<>());
return Either.left(Errors.COORDINATOR_NOT_AVAILABLE);
} else {
Errors errors;
if (groupManager.isLoading()) {
errors = Errors.COORDINATOR_LOAD_IN_PROGRESS;
} else {
errors = Errors.NONE;
return Either.left(Errors.COORDINATOR_LOAD_IN_PROGRESS);
}
List<GroupOverview> overviews = new ArrayList<>();
groupManager.currentGroups().forEach(group -> overviews.add(group.overview()));
return new KeyValue<>(
errors,
overviews
);
return Either.right(overviews);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.streamnative.pulsar.handlers.kop.coordinator.transaction;

import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.DEAD;
import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.ONGOING;
import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.PREPARE_ABORT;
import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.PREPARE_COMMIT;
Expand All @@ -32,6 +33,7 @@
import io.streamnative.pulsar.handlers.kop.storage.PulsarPartitionedTopicProducerStateManagerSnapshotBuffer;
import io.streamnative.pulsar.handlers.kop.utils.MetadataUtils;
import io.streamnative.pulsar.handlers.kop.utils.ProducerIdAndEpoch;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -52,6 +54,8 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.DescribeTransactionsResponseData;
import org.apache.kafka.common.message.ListTransactionsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.TransactionResult;
Expand Down Expand Up @@ -80,6 +84,8 @@ public class TransactionCoordinator {

private final Time time;

private final AtomicBoolean isActive = new AtomicBoolean(false);

private static final BiConsumer<TransactionStateManager.TransactionalIdAndProducerIdEpoch, Errors>
onEndTransactionComplete =
(txnIdAndPidEpoch, errors) -> {
Expand Down Expand Up @@ -218,6 +224,87 @@ public static String getTopicPartitionName(String topicPartitionName, int partit
return topicPartitionName + PARTITIONED_TOPIC_SUFFIX + partitionId;
}

public ListTransactionsResponseData handleListTransactions(List<String> filteredStates,
List<Long> filteredProducerIds) {
// https://github.com/apache/kafka/blob/915991445fde106d02e61a70425ae2601c813db0/core/
// src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L259
if (!isActive.get()) {
log.warn("The transaction coordinator is not active, so it will reject list transaction request");
return new ListTransactionsResponseData().setErrorCode(Errors.NOT_COORDINATOR.code());
}
return this.txnManager.listTransactionStates(filteredProducerIds, filteredStates);
}

public DescribeTransactionsResponseData handleDescribeTransactions(List<String> transactionalIds) {
DescribeTransactionsResponseData response = new DescribeTransactionsResponseData();
if (transactionalIds != null) {
transactionalIds.forEach(transactionalId -> {
DescribeTransactionsResponseData.TransactionState transactionState =
handleDescribeTransactions(transactionalId);
response.transactionStates().add(transactionState);
});
}
return response;
}

private DescribeTransactionsResponseData.TransactionState handleDescribeTransactions(String transactionalId) {
// https://github.com/apache/kafka/blob/915991445fde106d02e61a70425ae2601c813db0/core/
// src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L270
if (transactionalId == null) {
throw new IllegalArgumentException("Invalid null transactionalId");
}

DescribeTransactionsResponseData.TransactionState transactionState =
new DescribeTransactionsResponseData.TransactionState()
.setTransactionalId(transactionalId);

if (!isActive.get()) {
transactionState.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
} else if (transactionalId.isEmpty()) {
transactionState.setErrorCode(Errors.INVALID_REQUEST.code());
} else {
Either<Errors, Optional<CoordinatorEpochAndTxnMetadata>> tState =
txnManager.getTransactionState(transactionalId);
if (tState.isLeft()) {
transactionState.setErrorCode(tState.getLeft().code());
} else {
Optional<CoordinatorEpochAndTxnMetadata> right = tState.getRight();
if (!right.isPresent()) {
transactionState.setErrorCode(Errors.TRANSACTIONAL_ID_NOT_FOUND.code());
} else {
CoordinatorEpochAndTxnMetadata coordinatorEpochAndMetadata = right.get();
TransactionMetadata txnMetadata = coordinatorEpochAndMetadata.getTransactionMetadata();
txnMetadata.inLock(() -> {
if (txnMetadata.getState() == DEAD) {
// The transaction state is being expired, so ignore it
transactionState.setErrorCode(Errors.TRANSACTIONAL_ID_NOT_FOUND.code());
} else {
txnMetadata.getTopicPartitions().forEach(topicPartition -> {
var topicData = transactionState.topics().find(topicPartition.topic());
if (topicData == null) {
topicData = new DescribeTransactionsResponseData.TopicData()
.setTopic(topicPartition.topic());
transactionState.topics().add(topicData);
}
topicData.partitions().add(topicPartition.partition());
});

transactionState
.setErrorCode(Errors.NONE.code())
.setProducerId(txnMetadata.getProducerId())
.setProducerEpoch(txnMetadata.getProducerEpoch())
.setTransactionState(txnMetadata.getState().toAdminState().toString())
.setTransactionTimeoutMs(txnMetadata.getTxnTimeoutMs())
.setTransactionStartTimeMs(txnMetadata.getTxnStartTimestamp());
}
return null;
});
}
}
}
return transactionState;
}

@Data
@EqualsAndHashCode
@AllArgsConstructor
Expand Down Expand Up @@ -956,6 +1043,7 @@ public CompletableFuture<Void> startup(boolean enableTransactionalIdExpiration)

return this.producerIdManager.initialize().thenCompose(ignored -> {
log.info("{} Startup transaction coordinator complete.", namespacePrefixForMetadata);
isActive.set(true);
return CompletableFuture.completedFuture(null);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,26 @@ public boolean isExpirationAllowed() {
return false;
}
}

public org.apache.kafka.clients.admin.TransactionState toAdminState() {
switch (this) {
case EMPTY:
return org.apache.kafka.clients.admin.TransactionState.EMPTY;
case ONGOING:
return org.apache.kafka.clients.admin.TransactionState.ONGOING;
case PREPARE_COMMIT:
return org.apache.kafka.clients.admin.TransactionState.PREPARE_COMMIT;
case PREPARE_ABORT:
return org.apache.kafka.clients.admin.TransactionState.PREPARE_ABORT;
case COMPLETE_COMMIT:
return org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT;
case COMPLETE_ABORT:
return org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT;
case PREPARE_EPOCH_FENCE:
return org.apache.kafka.clients.admin.TransactionState.PREPARE_EPOCH_FENCE;
case DEAD:
default:
return org.apache.kafka.clients.admin.TransactionState.UNKNOWN;
}
}
}
Loading