Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

[transactions] Implement KIP-664 DescribeProducers #1986

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.message.FetchRequestData;

@Slf4j
public class DelayedFetch extends DelayedOperation {
Expand All @@ -33,7 +33,7 @@ public class DelayedFetch extends DelayedOperation {
private final long bytesReadable;
private final int fetchMaxBytes;
private final boolean readCommitted;
private final Map<TopicPartition, FetchRequest.PartitionData> readPartitionInfo;
private final Map<TopicPartition, FetchRequestData.FetchPartition> readPartitionInfo;
private final Map<TopicPartition, PartitionLog.ReadRecordsResult> readRecordsResult;
private final MessageFetchContext context;
protected volatile Boolean hasError;
Expand All @@ -55,7 +55,7 @@ public DelayedFetch(final long delayMs,
final boolean readCommitted,
final MessageFetchContext context,
final ReplicaManager replicaManager,
final Map<TopicPartition, FetchRequest.PartitionData> readPartitionInfo,
final Map<TopicPartition, FetchRequestData.FetchPartition> readPartitionInfo,
final Map<TopicPartition, PartitionLog.ReadRecordsResult> readRecordsResult,
final CompletableFuture<Map<TopicPartition, PartitionLog.ReadRecordsResult>> callback) {
super(delayMs, Optional.empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,18 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
case DESCRIBE_GROUPS:
handleDescribeGroupRequest(kafkaHeaderAndRequest, responseFuture);
break;
case DESCRIBE_PRODUCERS:
handleDescribeProducersRequest(kafkaHeaderAndRequest, responseFuture);
break;
case LIST_GROUPS:
handleListGroupsRequest(kafkaHeaderAndRequest, responseFuture);
break;
case LIST_TRANSACTIONS:
handleListTransactionsRequest(kafkaHeaderAndRequest, responseFuture);
break;
case DESCRIBE_TRANSACTIONS:
handleDescribeTransactionsRequest(kafkaHeaderAndRequest, responseFuture);
break;
case DELETE_GROUPS:
handleDeleteGroupsRequest(kafkaHeaderAndRequest, responseFuture);
break;
Expand Down Expand Up @@ -454,7 +463,14 @@ protected void writeAndFlushResponseToClient(Channel channel) {
request, response);
}

final ByteBuf result = responseToByteBuf(response, request, true);
final ByteBuf result;
try {
result = responseToByteBuf(response, request, true);
} catch (Throwable error) {
log.error("[{}] Failed to convert response {} to ByteBuf", channel, response, error);
sendErrorResponse(request, channel, error, true);
return;
}
final int resultSize = result.readableBytes();
channel.writeAndFlush(result).addListener(future -> {
if (response instanceof ResponseCallbackWrapper) {
Expand Down Expand Up @@ -560,11 +576,22 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest,
handleLeaveGroupRequest(KafkaHeaderAndRequest leaveGroup, CompletableFuture<AbstractResponse> response);

protected abstract void
handleDescribeGroupRequest(KafkaHeaderAndRequest describeGroup, CompletableFuture<AbstractResponse> response);
handleDescribeGroupRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest,
CompletableFuture<AbstractResponse> response);

protected abstract void
handleDescribeProducersRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest,
CompletableFuture<AbstractResponse> response);

protected abstract void
handleListGroupsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture<AbstractResponse> response);

protected abstract void
handleListTransactionsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture<AbstractResponse> response);

protected abstract void
handleDescribeTransactionsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture<AbstractResponse> response);

protected abstract void
handleDeleteGroupsRequest(KafkaHeaderAndRequest deleteGroups, CompletableFuture<AbstractResponse> response);

Expand Down
Loading