Skip to content

Commit 0072bd9

Browse files
authored
KAFKA-19820 remove the unnecessary copy from AbstractFetch#fetchablePartitions (#20745)
Remove the redundant copy and change the return type to List. This change also align `ShareConsumer` Reviewers: Kirk True <[email protected]>, Ken Huang <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 23b809a commit 0072bd9

File tree

1 file changed

+4
-4
lines changed

1 file changed

+4
-4
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -341,15 +341,15 @@ protected FetchRequest.Builder createFetchRequest(final Node fetchTarget,
341341
* until the previously-fetched data has been processed.
342342
*
343343
* @param buffered The set of partitions we have in our buffer
344-
* @return {@link Set} of {@link TopicPartition topic partitions} for which we should fetch data
344+
* @return {@link List} of {@link TopicPartition topic partitions} for which we should fetch data
345345
*/
346-
private Set<TopicPartition> fetchablePartitions(Set<TopicPartition> buffered) {
346+
private List<TopicPartition> fetchablePartitions(Set<TopicPartition> buffered) {
347347
// This is the test that returns true if the partition is *not* buffered
348348
Predicate<TopicPartition> isNotBuffered = tp -> !buffered.contains(tp);
349349

350350
// Return all partitions that are in an otherwise fetchable state *and* for which we don't already have some
351351
// messages sitting in our buffer.
352-
return new HashSet<>(subscriptions.fetchablePartitions(isNotBuffered));
352+
return subscriptions.fetchablePartitions(isNotBuffered);
353353
}
354354

355355
/**
@@ -430,7 +430,7 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests()
430430
Set<TopicPartition> buffered = Collections.unmodifiableSet(fetchBuffer.bufferedPartitions());
431431

432432
// This is the set of partitions that do not have buffered data
433-
Set<TopicPartition> unbuffered = fetchablePartitions(buffered);
433+
List<TopicPartition> unbuffered = fetchablePartitions(buffered);
434434

435435
if (unbuffered.isEmpty()) {
436436
// If there are no partitions that don't already have data locally buffered, there's no need to issue

0 commit comments

Comments
 (0)