Commit d6728d4

KAFKA-19789: Log an error when we get duplicate acquired offsets in ShareFetchResponse. (#20752)
*What*

https://issues.apache.org/jira/browse/KAFKA-19789

- In some scenarios `ShareFetchResponse` contained duplicate acquired records; this was a broker-side bug.
- Although ideally this should never happen, the client did not expect this case and acknowledged every duplicate occurrence with the `GAP` type.
- The client should log this case as an error, and it must not acknowledge the duplicate offsets, since the broker is already in a bad state.
- This PR adds an error log for the case and a unit test covering it.

Reviewers: Chia-Ping Tsai <[email protected]>, Andrew Schofield <[email protected]>
1 parent 7337631 commit d6728d4
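
The heart of the client-side fix is first-occurrence de-duplication over the acquired offset ranges: a `HashSet` records the offsets already seen, and any repeat is logged as an error and skipped rather than acknowledged. Below is a minimal standalone sketch of that idea; `AcquiredRange`, `OffsetAndDeliveryCount`, and the `System.err` logging are simplified stand-ins for the client's internal types and logger, not the real Kafka classes.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DuplicateOffsetSketch {

    // Simplified stand-in for ShareFetchResponseData.AcquiredRecords.
    record AcquiredRange(long firstOffset, long lastOffset, short deliveryCount) { }

    // Simplified stand-in for the client's internal OffsetAndDeliveryCount.
    record OffsetAndDeliveryCount(long offset, short deliveryCount) { }

    // Expand the acquired ranges into per-offset entries, keeping only the
    // first occurrence of each offset and reporting any duplicate.
    static List<OffsetAndDeliveryCount> buildAcquiredList(List<AcquiredRange> ranges) {
        List<OffsetAndDeliveryCount> result = new ArrayList<>();
        Set<Long> seen = new HashSet<>();
        for (AcquiredRange range : ranges) {
            for (long offset = range.firstOffset(); offset <= range.lastOffset(); offset++) {
                if (!seen.add(offset)) {
                    // Duplicate: log and skip; never acknowledge it back to the broker.
                    System.err.printf("Duplicate acquired record offset %d in share fetch response%n", offset);
                } else {
                    result.add(new OffsetAndDeliveryCount(offset, range.deliveryCount()));
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Overlapping ranges [0-9] and [5-14]: offsets 5-9 are duplicates.
        List<OffsetAndDeliveryCount> list = buildAcquiredList(List.of(
                new AcquiredRange(0L, 9L, (short) 1),
                new AcquiredRange(5L, 14L, (short) 2)));
        System.out.println(list.size()); // 15 unique offsets, 0-14
    }
}
```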

File tree

- clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
- clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java

2 files changed (+74, -3 lines)

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java

Lines changed: 16 additions & 3 deletions

```diff
@@ -41,9 +41,9 @@
 
 import java.io.Closeable;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Optional;
@@ -99,10 +99,23 @@ public class ShareCompletedFetch {
     }
 
     private List<OffsetAndDeliveryCount> buildAcquiredRecordList(List<ShareFetchResponseData.AcquiredRecords> partitionAcquiredRecords) {
-        List<OffsetAndDeliveryCount> acquiredRecordList = new LinkedList<>();
+        // Setting the size of the array to the size of the first batch of acquired records. In case there is only 1 batch acquired, resizing would not happen.
+        if (partitionAcquiredRecords.isEmpty()) {
+            return List.of();
+        }
+        int initialListSize = (int) (partitionAcquiredRecords.get(0).lastOffset() - partitionAcquiredRecords.get(0).firstOffset() + 1);
+        List<OffsetAndDeliveryCount> acquiredRecordList = new ArrayList<>(initialListSize);
+
+        // Set to find duplicates in case of overlapping acquired records
+        Set<Long> offsets = new HashSet<>();
         partitionAcquiredRecords.forEach(acquiredRecords -> {
             for (long offset = acquiredRecords.firstOffset(); offset <= acquiredRecords.lastOffset(); offset++) {
-                acquiredRecordList.add(new OffsetAndDeliveryCount(offset, acquiredRecords.deliveryCount()));
+                if (!offsets.add(offset)) {
+                    log.error("Duplicate acquired record offset {} found in share fetch response for partition {}. " +
+                            "This indicates a broker processing issue.", offset, partition.topicPartition());
+                } else {
+                    acquiredRecordList.add(new OffsetAndDeliveryCount(offset, acquiredRecords.deliveryCount()));
+                }
             }
         });
         return acquiredRecordList;
```
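
Two details of this hunk are worth calling out: the backing list changed from `LinkedList` to an `ArrayList` presized to the first acquired batch, so the common single-batch case allocates exactly once, and the `HashSet` membership check keeps duplicate detection at O(1) per offset, leaving the whole pass linear in the number of acquired offsets.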

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java

Lines changed: 58 additions & 0 deletions

```diff
@@ -60,6 +60,7 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class ShareCompletedFetchTest {
     private static final String TOPIC_NAME = "test";
@@ -356,6 +357,63 @@ record = records.get(1);
         assertEquals(0, records.size());
     }
 
+    @Test
+    public void testOverlappingAcquiredRecordsLogsErrorAndRetainsFirstOccurrence() {
+        int startingOffset = 0;
+        int numRecords = 20; // Records for 0-19
+
+        // Create overlapping acquired records: [0-9] and [5-14]
+        // Offsets 5-9 will be duplicates
+        List<ShareFetchResponseData.AcquiredRecords> acquiredRecords = new ArrayList<>();
+        acquiredRecords.add(new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(0L)
+                .setLastOffset(9L)
+                .setDeliveryCount((short) 1));
+        acquiredRecords.add(new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(5L)
+                .setLastOffset(14L)
+                .setDeliveryCount((short) 2));
+
+        ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
+                .setRecords(newRecords(startingOffset, numRecords))
+                .setAcquiredRecords(acquiredRecords);
+
+        ShareCompletedFetch completedFetch = newShareCompletedFetch(partitionData);
+
+        Deserializers<String, String> deserializers = newStringDeserializers();
+
+        // Fetch records and verify that only 15 unique records are returned (0-14)
+        ShareInFlightBatch<String, String> batch = completedFetch.fetchRecords(deserializers, 20, true);
+        List<ConsumerRecord<String, String>> records = batch.getInFlightRecords();
+
+        // Should get 15 unique records: 0-9 from first range (with deliveryCount=1)
+        // and 10-14 from second range (with deliveryCount=2)
+        assertEquals(15, records.size());
+
+        // Verify first occurrence (offset 5 should have deliveryCount=1 from first range)
+        ConsumerRecord<String, String> record5 = records.stream()
+                .filter(r -> r.offset() == 5L)
+                .findFirst()
+                .orElse(null);
+        assertNotNull(record5);
+        assertEquals(Optional.of((short) 1), record5.deliveryCount());
+
+        // Verify offset 10 has deliveryCount=2 from second range
+        ConsumerRecord<String, String> record10 = records.stream()
+                .filter(r -> r.offset() == 10L)
+                .findFirst()
+                .orElse(null);
+        assertNotNull(record10);
+        assertEquals(Optional.of((short) 2), record10.deliveryCount());
+
+        // Verify all offsets are unique
+        Set<Long> offsetSet = new HashSet<>();
+        for (ConsumerRecord<String, String> record : records) {
+            assertTrue(offsetSet.add(record.offset()),
+                    "Duplicate offset found in results: " + record.offset());
+        }
+    }
+
     private ShareCompletedFetch newShareCompletedFetch(ShareFetchResponseData.PartitionData partitionData) {
         LogContext logContext = new LogContext();
         ShareFetchMetricsRegistry shareFetchMetricsRegistry = new ShareFetchMetricsRegistry();
```