From 357c4b4cfa7b097afa2f5de4c86d1f474e7828f2 Mon Sep 17 00:00:00 2001 From: patschuh <46817726+patschuh@users.noreply.github.com> Date: Wed, 17 Apr 2024 20:49:49 +0200 Subject: [PATCH] fix StartEmbeddedKafka --- .../kafka/dialogs/TestDataProducer3.java | 26 ----- src/test/java/kafka/StartEmbeddedKafka.java | 5 +- .../java/kafka/WindowsEmbeddedKafkaRule.java | 94 ------------------- 3 files changed, 4 insertions(+), 121 deletions(-) delete mode 100644 src/test/java/at/esque/kafka/dialogs/TestDataProducer3.java delete mode 100644 src/test/java/kafka/WindowsEmbeddedKafkaRule.java diff --git a/src/test/java/at/esque/kafka/dialogs/TestDataProducer3.java b/src/test/java/at/esque/kafka/dialogs/TestDataProducer3.java deleted file mode 100644 index c97edd6..0000000 --- a/src/test/java/at/esque/kafka/dialogs/TestDataProducer3.java +++ /dev/null @@ -1,26 +0,0 @@ -package at.esque.kafka.dialogs; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; - -import java.util.Properties; - -public class TestDataProducer3 { - public static void main(String[] args) { - final String bootstrapServers = "localhost:9092"; - Properties props = new Properties(); - props.put("bootstrap.servers", bootstrapServers); - props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "pk"); - try (Producer producer = new KafkaProducer<>(props)) { - producer.initTransactions(); - producer.beginTransaction(); - producer.send(new ProducerRecord<>("pk_input", "Test", "Test")); - producer.commitTransaction(); - } - - } -} \ No newline at end of file diff --git a/src/test/java/kafka/StartEmbeddedKafka.java b/src/test/java/kafka/StartEmbeddedKafka.java index 3b079c3..02b3a11 100644 --- a/src/test/java/kafka/StartEmbeddedKafka.java +++ b/src/test/java/kafka/StartEmbeddedKafka.java @@ -3,6 +3,7 @@ import org.junit.ClassRule; import org.junit.Ignore; import org.junit.Test; +import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.rule.EmbeddedKafkaRule; import static com.google.code.tempusfugit.concurrency.ThreadUtils.sleep; @@ -13,10 +14,12 @@ public class StartEmbeddedKafka { @ClassRule public static EmbeddedKafkaRule embeddedKafkaRule = - new WindowsEmbeddedKafkaRule(1, true, "test.me").kafkaPorts(59000); + new EmbeddedKafkaRule(1, true, "test.me").kafkaPorts(59000); @Test public void startKafkaForManualTestsLaterWriterRealTests() { + EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka(); + embeddedKafka.afterPropertiesSet(); sleep(seconds(60)); } } diff --git a/src/test/java/kafka/WindowsEmbeddedKafkaRule.java b/src/test/java/kafka/WindowsEmbeddedKafkaRule.java deleted file mode 100644 index 2ab1a40..0000000 --- a/src/test/java/kafka/WindowsEmbeddedKafkaRule.java +++ /dev/null @@ -1,94 +0,0 @@ -package kafka; - -import kafka.server.KafkaServer; -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.test.TestUtils; -import org.springframework.kafka.test.EmbeddedKafkaBroker; -import org.springframework.kafka.test.rule.EmbeddedKafkaRule; - -import java.io.File; -import java.io.IOException; - -/** - * Fixes delete failure bug on Windows. - *

- * Overriding {@link EmbeddedKafkaRule#after()}, so that all temporary folders are deleted also on - * Windows. - *

- * This is achieved by calling getZookeeper().zookeeper().getZKDatabase().close(). - *

- * In addition, the number of try-catch blocks in the original implementation have been refactored to - * the {@link #swallow(SimpleFunction)} method. - *

- * The solution is based on following Gist: - * https://gist.github.com/grofoli/cffa0d06840cff34117d244f2bd7f628 - */ -public class WindowsEmbeddedKafkaRule extends EmbeddedKafkaRule { - - public WindowsEmbeddedKafkaRule(int count) { - super(count); - } - - public WindowsEmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics) { - super(count, controlledShutdown, topics); - } - - public WindowsEmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics) { - super(count, controlledShutdown, partitions, topics); - } - - @Override - public void before() { - /* really brutal hack for logs of topic partitions that have received messages */ - for (File file : new File(TestUtils.tempDirectory().getParent()).listFiles()) { - if (file.getName().startsWith("kafka-")) { - swallow(() -> Utils.delete(file)); - } - } - super.before(); - } - - @Override - public void after() { - EmbeddedKafkaBroker embeddedKafka = getEmbeddedKafka(); - System.getProperties().remove(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS); - System.getProperties().remove(EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT); - for (KafkaServer kafkaServer : embeddedKafka.getKafkaServers()) { - swallow(() -> shutdown(kafkaServer)); - swallow(() -> deleteLogDir(kafkaServer)); - } - swallow(this::closeZkClient); - swallow(this::shutdownZookeeper); - } - - private void shutdown(KafkaServer kafkaServer) { - kafkaServer.shutdown(); - kafkaServer.awaitShutdown(); - } - - private void deleteLogDir(KafkaServer kafkaServer) { - // no need to make this delete here because of the shutdown hook in Testutils - // CoreUtils.delete(kafkaServer.config().logDirs()); - } - - private void closeZkClient() { - getEmbeddedKafka().getZooKeeperClient().close(); - } - - private void shutdownZookeeper() throws IOException { - getEmbeddedKafka().getZookeeper().shutdown(); - } - - private void swallow(SimpleFunction function) { - try { - function.execute(); - } catch (Exception e) { - // do nothing - } - } - - @FunctionalInterface - interface SimpleFunction { - void execute() throws Exception; - } -}