Skip to content

Commit

Permalink
fix cross cluster operations for real this time
Browse files Browse the repository at this point in the history
  • Loading branch information
patschuh committed Oct 19, 2023
1 parent 0b12405 commit 506901b
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ java {
}

javafx {
version = '17'
version = '21'
modules = ['javafx.controls', 'javafx.fxml']
}

Expand Down
6 changes: 4 additions & 2 deletions src/main/java/at/esque/kafka/CrossClusterController.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.text.MessageFormat;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
Expand Down Expand Up @@ -193,8 +194,9 @@ private void startOperation(UUID operationId) {
while (!operation.getStop().get() && (limit == null || count.get() < limit) && !operation.getStatus().equals("Error")) {
ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));
Iterable<ConsumerRecord> records = consumerRecords.records(operation.getFromTopic().getName());
while ((limit == null || count.get() < limit) && records.iterator().hasNext()) {
ConsumerRecord consumerRecord = records.iterator().next();
Iterator<ConsumerRecord> iterator = records.iterator();
while ((limit == null || count.get() < limit) && iterator.hasNext()) {
ConsumerRecord consumerRecord = iterator.next();
try {
if (operation.getFilterFunction().test(consumerRecord)) {
if (reserializeMessagesToggle.isSelected()) {
Expand Down

0 comments on commit 506901b

Please sign in to comment.