@@ -3814,7 +3814,7 @@ public AlterShareGroupOffsetsResult alterShareGroupOffsets(final String groupId,
@Override
public ListShareGroupOffsetsResult listShareGroupOffsets(final Map<String, ListShareGroupOffsetsSpec> groupSpecs,
final ListShareGroupOffsetsOptions options) {
SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> future = ListShareGroupOffsetsHandler.newFuture(groupSpecs.keySet());
SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, SharePartitionOffsetInfo>> future = ListShareGroupOffsetsHandler.newFuture(groupSpecs.keySet());
ListShareGroupOffsetsHandler handler = new ListShareGroupOffsetsHandler(groupSpecs, logContext);
invokeDriver(handler, future, options.timeoutMs);
return new ListShareGroupOffsetsResult(future.all());
@@ -18,7 +18,6 @@
package org.apache.kafka.clients.admin;

import org.apache.kafka.clients.admin.internals.CoordinatorKey;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
@@ -36,22 +35,22 @@
@InterfaceStability.Evolving
public class ListShareGroupOffsetsResult {

private final Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures;
private final Map<String, KafkaFuture<Map<TopicPartition, SharePartitionOffsetInfo>>> futures;

ListShareGroupOffsetsResult(final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
ListShareGroupOffsetsResult(final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, SharePartitionOffsetInfo>>> futures) {
this.futures = futures.entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().idValue, Map.Entry::getValue));
}

/**
* Return the future when the requests for all groups succeed.
*
* @return Future which yields all {@code Map<String, Map<TopicPartition, OffsetAndMetadata>>} objects, if requests for all the groups succeed.
* @return Future which yields all {@code Map<String, Map<TopicPartition, SharePartitionOffsetInfo>>} objects, if requests for all the groups succeed.
*/
public KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>> all() {
public KafkaFuture<Map<String, Map<TopicPartition, SharePartitionOffsetInfo>>> all() {
return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture<?>[0])).thenApply(
nil -> {
Map<String, Map<TopicPartition, OffsetAndMetadata>> offsets = new HashMap<>(futures.size());
Map<String, Map<TopicPartition, SharePartitionOffsetInfo>> offsets = new HashMap<>(futures.size());
futures.forEach((groupId, future) -> {
try {
offsets.put(groupId, future.get());
@@ -67,9 +66,9 @@ public KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>> all() {

/**
* Return a future which yields a map of topic partitions to offsets for the specified group. If the group doesn't
* have a committed offset for a specific partition, the corresponding value in the returned map will be null.
* have offset information for a specific partition, the corresponding value in the returned map will be null.
*/
public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> partitionsToOffsetAndMetadata(String groupId) {
public KafkaFuture<Map<TopicPartition, SharePartitionOffsetInfo>> partitionsToOffsetInfo(String groupId) {
if (!futures.containsKey(groupId)) {
throw new IllegalArgumentException("Group ID not found: " + groupId);
}
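
For context, a minimal usage sketch of the renamed accessor (not part of this diff). It assumes an already-constructed Admin client named admin and a share group ID "my-share-group"; both names are hypothetical, and the no-argument ListShareGroupOffsetsSpec constructor is assumed to request offset information for all share-partitions of the group.

import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
import org.apache.kafka.clients.admin.SharePartitionOffsetInfo;
import org.apache.kafka.common.TopicPartition;

public class ListShareGroupOffsetsExample {
    // Prints the start offset and lag (if present) for every share-partition of the group.
    public static void printShareGroupOffsets(Admin admin) throws ExecutionException, InterruptedException {
        String groupId = "my-share-group";  // hypothetical group ID
        Map<TopicPartition, SharePartitionOffsetInfo> offsets = admin
            .listShareGroupOffsets(Map.of(groupId, new ListShareGroupOffsetsSpec()))
            .partitionsToOffsetInfo(groupId)
            .get();

        offsets.forEach((tp, info) -> {
            // A null value means the group has no offset information for this partition.
            if (info == null) {
                System.out.println(tp + ": no offset information");
            } else {
                System.out.println(tp + ": startOffset=" + info.startOffset()
                    + ", lag=" + info.lag().map(String::valueOf).orElse("-"));
            }
        });
    }
}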
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Objects;
import java.util.Optional;

/**
* This class is used to contain the offset and lag information for a share-partition.
*/
@InterfaceStability.Evolving
public class SharePartitionOffsetInfo {
private final long startOffset;
private final Optional<Integer> leaderEpoch;
private final Optional<Long> lag;

/**
* Construct a new SharePartitionOffsetInfo.
*
* @param startOffset The share-partition start offset
Member:
Sorry, I'm a bit confused by this "start offset". For example, if a partition has a latest offset of 23547540 and a share consumer has a "start offset" of 23541472, how do we explain the difference between those numbers?

chia7712@fedora:~/project/kafka$ ./bin/kafka-get-offsets.sh --bootstrap-server localhost:20000 --topic chia
chia:0:23547540
chia7712@fedora:~/project/kafka$ ./bin/kafka-share-groups.sh  --bootstrap-server localhost:20000 --offsets --describe --all-groups

GROUP               TOPIC           PARTITION  START-OFFSET  LAG
perf-share-consumer chia            0          23541472      -

Member Author:
The start offset is a little like the committed offset for a consumer group. The lag is the number of records to be delivered between the start offset and the latest offset.

Member:
@AndrewJSchofield thanks for your response! I have a follow-up question.

> The start offset is a little like the committed offset for a consumer group.

Will the start offset eventually equal the latest offset of the partition?

Member Author:
When consumption catches up with production, yes.

* @param leaderEpoch The optional leader epoch of the share-partition
* @param lag The optional lag for the share-partition
*/
public SharePartitionOffsetInfo(long startOffset, Optional<Integer> leaderEpoch, Optional<Long> lag) {
this.startOffset = startOffset;
this.leaderEpoch = leaderEpoch;
this.lag = lag;
}

public long startOffset() {
return startOffset;
}

public Optional<Integer> leaderEpoch() {
return leaderEpoch;
}

public Optional<Long> lag() {
return lag;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SharePartitionOffsetInfo that = (SharePartitionOffsetInfo) o;
return startOffset == that.startOffset &&
Objects.equals(leaderEpoch, that.leaderEpoch) &&
Objects.equals(lag, that.lag);
}

@Override
public int hashCode() {
return Objects.hash(startOffset, leaderEpoch, lag);
}

@Override
public String toString() {
return "SharePartitionOffsetInfo{" +
"startOffset=" + startOffset +
", leaderEpoch=" + leaderEpoch.orElse(null) +
", lag=" + lag.orElse(null) +
'}';
}
}
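
To make the start offset / lag relationship discussed in the review thread above concrete: with the numbers from that thread, the partition's latest offset is 23547540 and the share group's start offset is 23541472, so the lag is roughly 23547540 - 23541472 = 6068 records still to be delivered. A small illustrative snippet follows (not part of the PR; the leader epoch of 0 is made up).

import java.util.Optional;

import org.apache.kafka.clients.admin.SharePartitionOffsetInfo;

public class SharePartitionOffsetInfoExample {
    public static void main(String[] args) {
        long latestOffset = 23547540L;  // reported by kafka-get-offsets.sh in the review thread
        long startOffset = 23541472L;   // START-OFFSET reported by kafka-share-groups.sh
        long approximateLag = latestOffset - startOffset;  // 6068 records still to be delivered

        SharePartitionOffsetInfo info =
            new SharePartitionOffsetInfo(startOffset, Optional.of(0), Optional.of(approximateLag));
        System.out.println(info);
        // Prints: SharePartitionOffsetInfo{startOffset=23541472, leaderEpoch=0, lag=6068}
    }
}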
@@ -19,7 +19,7 @@
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListShareGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.admin.SharePartitionOffsetInfo;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
@@ -47,7 +47,7 @@
/**
* This class is the handler for {@link KafkaAdminClient#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)} call
*/
public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> {
public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, SharePartitionOffsetInfo>> {

private final Map<String, ListShareGroupOffsetsSpec> groupSpecs;
private final Logger log;
@@ -60,7 +60,7 @@ public ListShareGroupOffsetsHandler(Map<String, ListShareGroupOffsetsSpec> group
this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext);
}

public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> newFuture(Collection<String> groupIds) {
public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, SharePartitionOffsetInfo>> newFuture(Collection<String> groupIds) {
return AdminApiFuture.forKeys(coordinatorKeys(groupIds));
}

@@ -110,13 +110,13 @@ public DescribeShareGroupOffsetsRequest.Builder buildBatchedRequest(int coordina
}

@Override
public ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleResponse(Node coordinator,
Set<CoordinatorKey> groupIds,
AbstractResponse abstractResponse) {
public ApiResult<CoordinatorKey, Map<TopicPartition, SharePartitionOffsetInfo>> handleResponse(Node coordinator,
Set<CoordinatorKey> groupIds,
AbstractResponse abstractResponse) {
validateKeys(groupIds);

final DescribeShareGroupOffsetsResponse response = (DescribeShareGroupOffsetsResponse) abstractResponse;
final Map<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> completed = new HashMap<>();
final Map<CoordinatorKey, Map<TopicPartition, SharePartitionOffsetInfo>> completed = new HashMap<>();
final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
final List<CoordinatorKey> unmapped = new ArrayList<>();

@@ -125,7 +125,7 @@ public ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleR
if (response.hasGroupError(groupId)) {
handleGroupError(coordinatorKey, response.groupError(groupId), failed, unmapped);
} else {
Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>();
Map<TopicPartition, SharePartitionOffsetInfo> groupOffsetsListing = new HashMap<>();
response.data().groups().stream().filter(g -> g.groupId().equals(groupId)).forEach(groupResponse -> {
for (DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic topicResponse : groupResponse.topics()) {
for (DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition partitionResponse : topicResponse.partitions()) {
Expand All @@ -137,7 +137,7 @@ public ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleR
if (partitionResponse.startOffset() < 0) {
groupOffsetsListing.put(tp, null);
} else {
groupOffsetsListing.put(tp, new OffsetAndMetadata(startOffset, leaderEpoch, ""));
groupOffsetsListing.put(tp, new SharePartitionOffsetInfo(startOffset, leaderEpoch, Optional.empty()));
Member:
Pardon me, will the lag be implemented later?

Member Author:
Yes, very soon. See https://issues.apache.org/jira/browse/KAFKA-19778 for the tasks we are implementing in parallel.

}
} else {
log.warn("Skipping return offset for {} due to error {}: {}.", tp, partitionResponse.errorCode(), partitionResponse.errorMessage());
@@ -186,8 +186,8 @@ public static ListConfigResourcesResult listConfigResourcesResult(KafkaException
return new ListConfigResourcesResult(future);
}

public static ListShareGroupOffsetsResult createListShareGroupOffsetsResult(Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> groupOffsets) {
Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> coordinatorFutures = groupOffsets.entrySet().stream()
public static ListShareGroupOffsetsResult createListShareGroupOffsetsResult(Map<String, KafkaFuture<Map<TopicPartition, SharePartitionOffsetInfo>>> groupOffsets) {
Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, SharePartitionOffsetInfo>>> coordinatorFutures = groupOffsets.entrySet().stream()
.collect(Collectors.toMap(
entry -> CoordinatorKey.byGroupId(entry.getKey()),
Map.Entry::getValue
@@ -11186,15 +11186,15 @@ public void testListShareGroupOffsets() throws Exception {
env.kafkaClient().prepareResponse(new DescribeShareGroupOffsetsResponse(data));

final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs);
final Map<TopicPartition, OffsetAndMetadata> partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata(GROUP_ID).get();
final Map<TopicPartition, SharePartitionOffsetInfo> partitionToOffsetInfo = result.partitionsToOffsetInfo(GROUP_ID).get();

assertEquals(6, partitionToOffsetAndMetadata.size());
assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""), partitionToOffsetAndMetadata.get(myTopicPartition0));
assertEquals(new OffsetAndMetadata(11, Optional.of(0), ""), partitionToOffsetAndMetadata.get(myTopicPartition1));
assertEquals(new OffsetAndMetadata(40, Optional.of(0), ""), partitionToOffsetAndMetadata.get(myTopicPartition2));
assertEquals(new OffsetAndMetadata(50, Optional.of(1), ""), partitionToOffsetAndMetadata.get(myTopicPartition3));
assertEquals(new OffsetAndMetadata(100, Optional.of(2), ""), partitionToOffsetAndMetadata.get(myTopicPartition4));
assertEquals(new OffsetAndMetadata(500, Optional.of(3), ""), partitionToOffsetAndMetadata.get(myTopicPartition5));
assertEquals(6, partitionToOffsetInfo.size());
assertEquals(new SharePartitionOffsetInfo(10, Optional.of(0), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition0));
assertEquals(new SharePartitionOffsetInfo(11, Optional.of(0), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition1));
assertEquals(new SharePartitionOffsetInfo(40, Optional.of(0), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition2));
assertEquals(new SharePartitionOffsetInfo(50, Optional.of(1), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition3));
assertEquals(new SharePartitionOffsetInfo(100, Optional.of(2), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition4));
assertEquals(new SharePartitionOffsetInfo(500, Optional.of(3), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition5));
}
}

@@ -11257,17 +11257,17 @@ public void testListShareGroupOffsetsMultipleGroups() throws Exception {
final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs);
assertEquals(2, result.all().get().size());

final Map<TopicPartition, OffsetAndMetadata> partitionToOffsetAndMetadataGroup0 = result.partitionsToOffsetAndMetadata(GROUP_ID).get();
assertEquals(4, partitionToOffsetAndMetadataGroup0.size());
assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""), partitionToOffsetAndMetadataGroup0.get(myTopicPartition0));
assertEquals(new OffsetAndMetadata(11, Optional.of(0), ""), partitionToOffsetAndMetadataGroup0.get(myTopicPartition1));
assertEquals(new OffsetAndMetadata(40, Optional.of(0), ""), partitionToOffsetAndMetadataGroup0.get(myTopicPartition2));
assertEquals(new OffsetAndMetadata(50, Optional.of(1), ""), partitionToOffsetAndMetadataGroup0.get(myTopicPartition3));
final Map<TopicPartition, SharePartitionOffsetInfo> partitionToOffsetInfoGroup0 = result.partitionsToOffsetInfo(GROUP_ID).get();
assertEquals(4, partitionToOffsetInfoGroup0.size());
assertEquals(new SharePartitionOffsetInfo(10, Optional.of(0), Optional.empty()), partitionToOffsetInfoGroup0.get(myTopicPartition0));
assertEquals(new SharePartitionOffsetInfo(11, Optional.of(0), Optional.empty()), partitionToOffsetInfoGroup0.get(myTopicPartition1));
assertEquals(new SharePartitionOffsetInfo(40, Optional.of(0), Optional.empty()), partitionToOffsetInfoGroup0.get(myTopicPartition2));
assertEquals(new SharePartitionOffsetInfo(50, Optional.of(1), Optional.empty()), partitionToOffsetInfoGroup0.get(myTopicPartition3));

final Map<TopicPartition, OffsetAndMetadata> partitionToOffsetAndMetadataGroup1 = result.partitionsToOffsetAndMetadata("group-1").get();
assertEquals(2, partitionToOffsetAndMetadataGroup1.size());
assertEquals(new OffsetAndMetadata(100, Optional.of(2), ""), partitionToOffsetAndMetadataGroup1.get(myTopicPartition4));
assertEquals(new OffsetAndMetadata(500, Optional.of(2), ""), partitionToOffsetAndMetadataGroup1.get(myTopicPartition5));
final Map<TopicPartition, SharePartitionOffsetInfo> partitionToOffsetInfoGroup1 = result.partitionsToOffsetInfo("group-1").get();
assertEquals(2, partitionToOffsetInfoGroup1.size());
assertEquals(new SharePartitionOffsetInfo(100, Optional.of(2), Optional.empty()), partitionToOffsetInfoGroup1.get(myTopicPartition4));
assertEquals(new SharePartitionOffsetInfo(500, Optional.of(2), Optional.empty()), partitionToOffsetInfoGroup1.get(myTopicPartition5));
}
}

@@ -11290,9 +11290,9 @@ public void testListShareGroupOffsetsEmpty() throws Exception {
env.kafkaClient().prepareResponse(new DescribeShareGroupOffsetsResponse(data));

final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs);
final Map<TopicPartition, OffsetAndMetadata> partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata(GROUP_ID).get();
final Map<TopicPartition, SharePartitionOffsetInfo> partitionToOffsetInfo = result.partitionsToOffsetInfo(GROUP_ID).get();

assertEquals(0, partitionToOffsetAndMetadata.size());
assertEquals(0, partitionToOffsetInfo.size());
}
}

@@ -11342,13 +11342,13 @@ public void testListShareGroupOffsetsWithErrorInOnePartition() throws Exception
env.kafkaClient().prepareResponse(new DescribeShareGroupOffsetsResponse(data));

final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs);
final Map<TopicPartition, OffsetAndMetadata> partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata(GROUP_ID).get();
final Map<TopicPartition, SharePartitionOffsetInfo> partitionToOffsetInfo = result.partitionsToOffsetInfo(GROUP_ID).get();

// For myTopicPartition2 we have set an error as the response. Thus, it should be skipped from the final result
assertEquals(3, partitionToOffsetAndMetadata.size());
assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""), partitionToOffsetAndMetadata.get(myTopicPartition0));
assertEquals(new OffsetAndMetadata(11, Optional.of(1), ""), partitionToOffsetAndMetadata.get(myTopicPartition1));
assertEquals(new OffsetAndMetadata(500, Optional.of(2), ""), partitionToOffsetAndMetadata.get(myTopicPartition3));
assertEquals(3, partitionToOffsetInfo.size());
assertEquals(new SharePartitionOffsetInfo(10, Optional.of(0), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition0));
assertEquals(new SharePartitionOffsetInfo(11, Optional.of(1), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition1));
assertEquals(new SharePartitionOffsetInfo(500, Optional.of(2), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition3));
}
}
