This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

[transactions] Implement KIP-664 listTransactions #1984

Open · wants to merge 1 commit into base: master
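For context, KIP-664 exposes the new broker-side ListTransactions API through the standard Kafka AdminClient (`Admin#listTransactions`). A minimal sketch of how a client could exercise the handler added in this PR, assuming a KoP broker reachable at the placeholder address below:

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListTransactionsOptions;
import org.apache.kafka.clients.admin.TransactionListing;
import org.apache.kafka.clients.admin.TransactionState;

public class ListTransactionsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder address: point this at the Kafka listener exposed by the KoP broker.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Only list transactions that are currently ongoing; leaving the
            // filters empty would return transactions in every state.
            ListTransactionsOptions options = new ListTransactionsOptions()
                    .filterStates(Collections.singletonList(TransactionState.ONGOING));
            Collection<TransactionListing> listings =
                    admin.listTransactions(options).all().get();
            for (TransactionListing listing : listings) {
                System.out.printf("%s producerId=%d state=%s%n",
                        listing.transactionalId(), listing.producerId(), listing.state());
            }
        }
    }
}
```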
@@ -325,6 +325,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
case LIST_GROUPS:
handleListGroupsRequest(kafkaHeaderAndRequest, responseFuture);
break;
case LIST_TRANSACTIONS:
handleListTransactionsRequest(kafkaHeaderAndRequest, responseFuture);
break;
case DELETE_GROUPS:
handleDeleteGroupsRequest(kafkaHeaderAndRequest, responseFuture);
break;
@@ -583,6 +586,9 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest,
protected abstract void
handleListGroupsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture<AbstractResponse> response);

protected abstract void
handleListTransactionsRequest(KafkaHeaderAndRequest listTransactions, CompletableFuture<AbstractResponse> response);

protected abstract void
handleDeleteGroupsRequest(KafkaHeaderAndRequest deleteGroups, CompletableFuture<AbstractResponse> response);

@@ -32,6 +32,7 @@
import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicException;
import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata;
import io.streamnative.pulsar.handlers.kop.offset.OffsetMetadata;
import io.streamnative.pulsar.handlers.kop.scala.Either;
import io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator;
import io.streamnative.pulsar.handlers.kop.security.Session;
import io.streamnative.pulsar.handlers.kop.security.auth.Authorizer;
@@ -127,6 +128,7 @@
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.ListOffsetsRequestData;
import org.apache.kafka.common.message.ListOffsetsResponseData;
import org.apache.kafka.common.message.ListTransactionsResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.message.SaslAuthenticateResponseData;
@@ -175,6 +177,8 @@
import org.apache.kafka.common.requests.ListOffsetRequestV0;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.requests.ListTransactionsRequest;
import org.apache.kafka.common.requests.ListTransactionsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata;
import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata;
@@ -2017,8 +2021,26 @@ protected void handleDescribeGroupRequest(KafkaHeaderAndRequest describeGroup,
protected void handleListGroupsRequest(KafkaHeaderAndRequest listGroups,
CompletableFuture<AbstractResponse> resultFuture) {
checkArgument(listGroups.getRequest() instanceof ListGroupsRequest);
KeyValue<Errors, List<GroupOverview>> listResult = getGroupCoordinator().handleListGroups();
resultFuture.complete(KafkaResponseUtils.newListGroups(listResult.getKey(), listResult.getValue()));
Either<Errors, List<GroupOverview>> listResult = getGroupCoordinator().handleListGroups();
resultFuture.complete(KafkaResponseUtils.newListGroups(listResult));
}

@Override
protected void handleListTransactionsRequest(KafkaHeaderAndRequest listTransactions,
CompletableFuture<AbstractResponse> resultFuture) {
checkArgument(listTransactions.getRequest() instanceof ListTransactionsRequest);
ListTransactionsRequest request = (ListTransactionsRequest) listTransactions.getRequest();
List<String> stateFilters = request.data().stateFilters();
if (stateFilters == null) {
stateFilters = Collections.emptyList();
}
List<Long> producerIdFilters = request.data().producerIdFilters();
if (producerIdFilters == null) {
producerIdFilters = Collections.emptyList();
}
ListTransactionsResponseData listResult = getTransactionCoordinator()
.handleListTransactions(stateFilters, producerIdFilters);
resultFuture.complete(new ListTransactionsResponse(listResult));
}
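For reference, a self-contained sketch of the request payload this handler reads; the filter values are illustrative, and an absent filter simply arrives as an empty list:

```java
import java.util.Arrays;
import java.util.Collections;
import org.apache.kafka.common.message.ListTransactionsRequestData;

public class ListTransactionsRequestShape {
    public static void main(String[] args) {
        // What request.data() carries when a client filters by state and producer id.
        ListTransactionsRequestData data = new ListTransactionsRequestData()
                .setStateFilters(Arrays.asList("Ongoing", "PrepareCommit"))
                .setProducerIdFilters(Collections.singletonList(42L));
        System.out.println(data.stateFilters());      // [Ongoing, PrepareCommit]
        System.out.println(data.producerIdFilters()); // [42]
    }
}
```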

@Override
@@ -29,6 +29,7 @@
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupOverview;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupSummary;
import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata;
import io.streamnative.pulsar.handlers.kop.scala.Either;
import io.streamnative.pulsar.handlers.kop.utils.CoreUtils;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey.GroupKey;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey.MemberKey;
@@ -832,22 +833,16 @@ public KeyValue<Errors, Map<TopicPartition, PartitionData>> handleFetchOffsets(
);
}

public KeyValue<Errors, List<GroupOverview>> handleListGroups() {
public Either<Errors, List<GroupOverview>> handleListGroups() {
if (!isActive.get()) {
return new KeyValue<>(Errors.COORDINATOR_NOT_AVAILABLE, new ArrayList<>());
return Either.left(Errors.COORDINATOR_NOT_AVAILABLE);
} else {
Errors errors;
if (groupManager.isLoading()) {
errors = Errors.COORDINATOR_LOAD_IN_PROGRESS;
} else {
errors = Errors.NONE;
return Either.left(Errors.COORDINATOR_LOAD_IN_PROGRESS);
}
List<GroupOverview> overviews = new ArrayList<>();
groupManager.currentGroups().forEach(group -> overviews.add(group.overview()));
return new KeyValue<>(
errors,
overviews
);
return Either.right(overviews);
}
}
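The refactor above relies on the small KoP `Either` helper (`io.streamnative.pulsar.handlers.kop.scala.Either`). Its full API is not shown in this diff, so the following is only a minimal sketch, under the assumption that it offers left/right factories plus `isLeft`, `getLeft` and `getRight`, which is all these call sites need:

```java
// Minimal sketch only: the real io.streamnative.pulsar.handlers.kop.scala.Either
// may differ; this mirrors just the members the diff relies on.
public final class Either<L, R> {
    private final L left;
    private final R right;
    private final boolean isLeft;

    private Either(L left, R right, boolean isLeft) {
        this.left = left;
        this.right = right;
        this.isLeft = isLeft;
    }

    // An error outcome, e.g. Either.left(Errors.COORDINATOR_NOT_AVAILABLE).
    public static <L, R> Either<L, R> left(L value) {
        return new Either<>(value, null, true);
    }

    // A successful outcome, e.g. Either.right(overviews).
    public static <L, R> Either<L, R> right(R value) {
        return new Either<>(null, value, false);
    }

    public boolean isLeft() {
        return isLeft;
    }

    public L getLeft() {
        return left;
    }

    public R getRight() {
        return right;
    }
}
```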

@@ -32,6 +32,7 @@
import io.streamnative.pulsar.handlers.kop.storage.PulsarPartitionedTopicProducerStateManagerSnapshotBuffer;
import io.streamnative.pulsar.handlers.kop.utils.MetadataUtils;
import io.streamnative.pulsar.handlers.kop.utils.ProducerIdAndEpoch;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -52,6 +53,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.ListTransactionsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.TransactionResult;
@@ -80,6 +82,8 @@ public class TransactionCoordinator {

private final Time time;

private final AtomicBoolean isActive = new AtomicBoolean(false);

private static final BiConsumer<TransactionStateManager.TransactionalIdAndProducerIdEpoch, Errors>
onEndTransactionComplete =
(txnIdAndPidEpoch, errors) -> {
@@ -218,6 +222,17 @@ public static String getTopicPartitionName(String topicPartitionName, int partit
return topicPartitionName + PARTITIONED_TOPIC_SUFFIX + partitionId;
}

public ListTransactionsResponseData handleListTransactions(List<String> filteredStates,
List<Long> filteredProducerIds) {
// https://github.com/apache/kafka/blob/915991445fde106d02e61a70425ae2601c813db0/core/
// src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L259
if (!isActive.get()) {
log.warn("The transaction coordinator is not active, so it will reject list transaction request");
return new ListTransactionsResponseData().setErrorCode(Errors.NOT_COORDINATOR.code());
}
return this.txnManager.listTransactionStates(filteredProducerIds, filteredStates);
}

@Data
@EqualsAndHashCode
@AllArgsConstructor
@@ -956,6 +971,7 @@ public CompletableFuture<Void> startup(boolean enableTransactionalIdExpiration)

return this.producerIdManager.initialize().thenCompose(ignored -> {
log.info("{} Startup transaction coordinator complete.", namespacePrefixForMetadata);
isActive.set(true);
return CompletableFuture.completedFuture(null);
});
}
@@ -117,4 +117,26 @@ public boolean isExpirationAllowed() {
return false;
}
}

public org.apache.kafka.clients.admin.TransactionState toAdminState() {
switch (this) {
case EMPTY:
return org.apache.kafka.clients.admin.TransactionState.EMPTY;
case ONGOING:
return org.apache.kafka.clients.admin.TransactionState.ONGOING;
case PREPARE_COMMIT:
return org.apache.kafka.clients.admin.TransactionState.PREPARE_COMMIT;
case PREPARE_ABORT:
return org.apache.kafka.clients.admin.TransactionState.PREPARE_ABORT;
case COMPLETE_COMMIT:
return org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT;
case COMPLETE_ABORT:
return org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT;
case PREPARE_EPOCH_FENCE:
return org.apache.kafka.clients.admin.TransactionState.PREPARE_EPOCH_FENCE;
case DEAD:
default:
return org.apache.kafka.clients.admin.TransactionState.UNKNOWN;
}
}
}
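The state names a client sends in `StateFilters` (and gets back in each listing) are the human-readable names used by `org.apache.kafka.clients.admin.TransactionState`, such as "Ongoing" or "CompleteCommit", not the broker-side enum constant names; that is why the filter matching below compares against `toAdminState().toString()`. A small self-contained reminder of what those names look like:

```java
import org.apache.kafka.clients.admin.TransactionState;

public class AdminStateNames {
    public static void main(String[] args) {
        // Prints pairs like "COMPLETE_COMMIT -> CompleteCommit"; the right-hand
        // side is the wire-level name used for state filters.
        for (TransactionState state : TransactionState.values()) {
            System.out.println(state.name() + " -> " + state);
        }
    }
}
```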
@@ -25,6 +25,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -40,6 +41,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.ListTransactionsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.requests.ProduceResponse;
@@ -245,6 +247,71 @@ private boolean shouldExpire(TransactionMetadata txnMetadata, Long currentTimeMs
<= (currentTimeMs - transactionConfig.getTransactionalIdExpirationMs());
}

private static boolean shouldInclude(TransactionMetadata txnMetadata,
List<Long> filterProducerIds, Set<String> filterStateNames) {
if (txnMetadata.getState() == TransactionState.DEAD) {
// We filter the `Dead` state since it is a transient state which
// indicates that the transactionalId and its metadata are in the
// process of expiration and removal.
return false;
} else if (!filterProducerIds.isEmpty() && !filterProducerIds.contains(txnMetadata.getProducerId())) {
return false;
} else if (!filterStateNames.isEmpty() && !filterStateNames.contains(
txnMetadata.getState().toAdminState().toString())) {
return false;
} else {
return true;
}
}

public ListTransactionsResponseData listTransactionStates(List<Long> filteredProducerIds,
List<String> filteredStates) {
return CoreUtils.inReadLock(stateLock, () -> {
ListTransactionsResponseData response = new ListTransactionsResponseData();
if (!loadingPartitions.isEmpty()) {
response.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
} else {
Set<String> filterStates = new HashSet<>();
for (TransactionState stateName : TransactionState.values()) {
String nameForTheClient = stateName.toAdminState().toString();
if (filteredStates.contains(nameForTheClient)) {
filterStates.add(nameForTheClient);
} else {
response.unknownStateFilters().add(nameForTheClient);
}
}
List<ListTransactionsResponseData.TransactionState> states = new ArrayList<>();
transactionMetadataCache.forEach((__, cache) -> {
cache.values().forEach(txnMetadata -> {
txnMetadata.inLock(() -> {
// use toString() to get the name of the state according to the protocol
ListTransactionsResponseData.TransactionState transactionState =
new ListTransactionsResponseData.TransactionState()
.setTransactionalId(txnMetadata.getTransactionalId())
.setProducerId(txnMetadata.getProducerId())
.setTransactionState(txnMetadata.getState().toAdminState().toString());

if (shouldInclude(txnMetadata, filteredProducerIds, filterStates)) {
if (log.isDebugEnabled()) {
log.debug("add transaction state: {}", transactionState);
}
states.add(transactionState);
} else {
if (log.isDebugEnabled()) {
log.debug("Skip transaction state: {}", transactionState);
}
}
return null;
});
});
});
response.setErrorCode(Errors.NONE.code())
.setTransactionStates(states);
}
return response;
});
}
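For reference, a self-contained sketch of the response shape that `listTransactionStates` populates; the transactional id, producer id and state below are illustrative values only:

```java
import java.util.Collections;
import org.apache.kafka.common.message.ListTransactionsResponseData;
import org.apache.kafka.common.protocol.Errors;

public class ListTransactionsResponseShape {
    public static void main(String[] args) {
        // One listed transaction, using the admin-client state name format ("Ongoing").
        ListTransactionsResponseData.TransactionState entry =
                new ListTransactionsResponseData.TransactionState()
                        .setTransactionalId("example-txn-id")
                        .setProducerId(42L)
                        .setTransactionState("Ongoing");

        ListTransactionsResponseData response = new ListTransactionsResponseData()
                .setErrorCode(Errors.NONE.code())
                .setTransactionStates(Collections.singletonList(entry));

        // A state filter the coordinator did not recognise is echoed back here.
        response.unknownStateFilters().add("NotARealState");

        System.out.println(response);
    }
}
```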

@Data
@AllArgsConstructor
private static class TransactionalIdCoordinatorEpochAndMetadata {
@@ -15,6 +15,7 @@

import io.streamnative.pulsar.handlers.kop.ApiVersion;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata;
import io.streamnative.pulsar.handlers.kop.scala.Either;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -269,14 +270,18 @@ public static LeaveGroupResponse newLeaveGroup(Errors errors) {
return new LeaveGroupResponse(data);
}

public static ListGroupsResponse newListGroups(Errors errors,
List<GroupMetadata.GroupOverview> groups) {
public static ListGroupsResponse newListGroups(Either<Errors, List<GroupMetadata.GroupOverview>> results) {
ListGroupsResponseData data = new ListGroupsResponseData();
data.setErrorCode(errors.code());
data.setGroups(groups.stream().map(overView -> new ListGroupsResponseData.ListedGroup()
.setGroupId(overView.groupId())
.setProtocolType(overView.protocolType()))
.collect(Collectors.toList()));
data.setErrorCode(results.isLeft() ? results.getLeft().code() : Errors.NONE.code());
if (!results.isLeft()) {
data.setGroups(results.getRight().stream().map(overView -> new ListGroupsResponseData.ListedGroup()
.setGroupId(overView.groupId())
.setProtocolType(overView.protocolType()))
.collect(Collectors.toList()));

} else {
data.setGroups(Collections.emptyList());
}
return new ListGroupsResponse(data);
}

@@ -15,6 +15,7 @@

import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
Expand All @@ -28,6 +29,7 @@
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupSummary;
import io.streamnative.pulsar.handlers.kop.coordinator.group.MemberMetadata.MemberSummary;
import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata;
import io.streamnative.pulsar.handlers.kop.scala.Either;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory;
import io.streamnative.pulsar.handlers.kop.utils.timer.MockTimer;
import java.util.ArrayList;
@@ -218,8 +220,8 @@ public void testRequestHandlingWhileLoadingInProgress() throws Exception {
assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, describeGroupResult.getKey());

// ListGroups
KeyValue<Errors, List<GroupOverview>> listGroupsResult = groupCoordinator.handleListGroups();
assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, listGroupsResult.getKey());
Either<Errors, List<GroupOverview>> listGroupsResult = groupCoordinator.handleListGroups();
assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, listGroupsResult.getLeft());

// DeleteGroups
Map<String, Errors> deleteGroupsErrors = groupCoordinator.handleDeleteGroups(
@@ -1695,12 +1697,12 @@ groupId, memberId, protocolType, newProtocols()
).get();
assertEquals(Errors.NONE, syncGroupResult.getKey());

KeyValue<Errors, List<GroupOverview>> groups = groupCoordinator.handleListGroups();
assertEquals(Errors.NONE, groups.getKey());
assertEquals(1, groups.getValue().size());
Either<Errors, List<GroupOverview>> groups = groupCoordinator.handleListGroups();
assertFalse(groups.isLeft());
assertEquals(1, groups.getRight().size());
assertEquals(
new GroupOverview("groupId", "consumer"),
groups.getValue().get(0)
groups.getRight().get(0)
);
}

@@ -1712,12 +1714,12 @@ groupId, memberId, protocolType, newProtocols()
);
assertEquals(Errors.NONE, joinGroupResult.getError());

KeyValue<Errors, List<GroupOverview>> groups = groupCoordinator.handleListGroups();
assertEquals(Errors.NONE, groups.getKey());
assertEquals(1, groups.getValue().size());
Either<Errors, List<GroupOverview>> groups = groupCoordinator.handleListGroups();
assertFalse(groups.isLeft());
assertEquals(1, groups.getRight().size());
assertEquals(
new GroupOverview("groupId", "consumer"),
groups.getValue().get(0)
groups.getRight().get(0)
);
}
