diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 420e76de91..99505f3f22 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -1129,7 +1129,10 @@ private CompletableFuture> fetchOffset(String topicName, Long return; } if (position.compareTo(lac) > 0) { - partitionData.complete(Pair.of(Errors.NONE, latestOffset)); + // If all the data expires, the entry of lac becomes -1, + // and the actual offset will be 1 smaller than LATEST, + // here is special treatment, add 1 back + partitionData.complete(Pair.of(Errors.NONE, latestOffset + 1)); } else { MessageMetadataUtils.getOffsetOfPosition(managedLedger, position, false, timestamp, skipMessagesWithoutIndex)