Skip to content

Commit

Permalink
Clean up processSeeks() duplicate code (#3411)
Browse files Browse the repository at this point in the history
* Use utility method for `processSeeks()` duplicate code
* Refactor utility method
  • Loading branch information
bky373 authored Aug 14, 2024
1 parent d8de5b9 commit 821b383
Showing 1 changed file with 11 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3004,6 +3004,15 @@ private void timedAcks() {
}
}

private boolean checkPartitionAssignedBeforeSeek(@Nullable Collection<TopicPartition> assigned, TopicPartition topicPartition) {
if (assigned != null && assigned.contains(topicPartition)) {
return true;
}
this.logger.warn("No current assignment for partition '" + topicPartition +
"' due to partition reassignment prior to seeking.");
return false;
}

private void processSeeks() {
Collection<TopicPartition> assigned = getAssignedPartitions();
processTimestampSeeks(assigned);
Expand All @@ -3012,9 +3021,7 @@ private void processSeeks() {
traceSeek(offset);
try {
TopicPartition topicPartition = offset.getTopicPartition();
if (assigned == null || !assigned.contains(topicPartition)) {
this.logger.warn("No current assignment for partition " + topicPartition +
" due to partition reassignment prior to seeking.");
if (!checkPartitionAssignedBeforeSeek(assigned, topicPartition)) {
offset = this.seeks.poll();
continue;
}
Expand Down Expand Up @@ -3068,9 +3075,7 @@ private void processTimestampSeeks(@Nullable Collection<TopicPartition> assigned
Map<TopicPartition, Long> timestampSeeks = null;
while (seekIterator.hasNext()) {
TopicPartitionOffset tpo = seekIterator.next();
if (assigned == null || !assigned.contains(tpo.getTopicPartition())) {
this.logger.warn("No current assignment for partition " + tpo.getTopicPartition() +
" due to partition reassignment prior to seeking.");
if (!checkPartitionAssignedBeforeSeek(assigned, tpo.getTopicPartition())) {
seekIterator.remove();
continue;
}
Expand Down

0 comments on commit 821b383

Please sign in to comment.