diff --git a/spring-kafka-4/src/main/java/com/baeldung/kafka/sharedbroker/Application.java b/spring-kafka-4/src/main/java/com/baeldung/kafka/sharedbroker/Application.java new file mode 100644 index 000000000000..bc097bd3364f --- /dev/null +++ b/spring-kafka-4/src/main/java/com/baeldung/kafka/sharedbroker/Application.java @@ -0,0 +1,12 @@ +package com.baeldung.kafka.sharedbroker; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class Application { + + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } +} diff --git a/spring-kafka-4/src/main/java/com/baeldung/kafka/sharedbroker/OrderListener.java b/spring-kafka-4/src/main/java/com/baeldung/kafka/sharedbroker/OrderListener.java new file mode 100644 index 000000000000..fe125ce7c83d --- /dev/null +++ b/spring-kafka-4/src/main/java/com/baeldung/kafka/sharedbroker/OrderListener.java @@ -0,0 +1,38 @@ +package com.baeldung.kafka.sharedbroker; + +import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG; + +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +public class OrderListener { + + private static final Logger LOG = LoggerFactory.getLogger(OrderListener.class); + private final static CountDownLatch latch = new CountDownLatch(1); + + @Value("${spring.kafka.bootstrap-servers}") + private String bootstrapServers; + + @KafkaListener(topics = "order") + public void receive(ConsumerRecord consumerRecord) throws Exception { + try (AdminClient admin = AdminClient.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers))) { + LOG.info("Received customer order request [{}] from broker [{}]", consumerRecord.value(), admin.describeCluster() + .clusterId() + .get()); + } + latch.countDown(); + } + + public static CountDownLatch getLatch() { + return latch; + } +} diff --git a/spring-kafka-4/src/main/java/com/baeldung/kafka/sharedbroker/PaymentListener.java b/spring-kafka-4/src/main/java/com/baeldung/kafka/sharedbroker/PaymentListener.java new file mode 100644 index 000000000000..78dd3dfc33a0 --- /dev/null +++ b/spring-kafka-4/src/main/java/com/baeldung/kafka/sharedbroker/PaymentListener.java @@ -0,0 +1,39 @@ +package com.baeldung.kafka.sharedbroker; + +import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG; + +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +public class PaymentListener { + + private static final Logger LOG = LoggerFactory.getLogger(PaymentListener.class); + private static final CountDownLatch latch = new CountDownLatch(1); + + @Value("${spring.kafka.bootstrap-servers}") + private String bootstrapServers; + + @KafkaListener(topics = "payment") + public void receive(ConsumerRecord consumerRecord) throws Exception { + try (AdminClient admin = AdminClient.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers))) { + LOG.info("Received payment request [{}] from broker [{}]", consumerRecord.value(), admin.describeCluster() + .clusterId() + .get()); + } + latch.countDown(); + } + + public static CountDownLatch getLatch() { + return latch; + } + +} diff --git a/spring-kafka-4/src/test/java/com/baeldung/kafka/sharedbroker/EmbeddedKafkaHolder.java b/spring-kafka-4/src/test/java/com/baeldung/kafka/sharedbroker/EmbeddedKafkaHolder.java new file mode 100644 index 000000000000..b2e6e2db25c6 --- /dev/null +++ b/spring-kafka-4/src/test/java/com/baeldung/kafka/sharedbroker/EmbeddedKafkaHolder.java @@ -0,0 +1,32 @@ +package com.baeldung.kafka.sharedbroker; + +import org.springframework.kafka.KafkaException; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.EmbeddedKafkaKraftBroker; + +public final class EmbeddedKafkaHolder { + + private static final EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaKraftBroker(1, 1, "order", "payment").brokerListProperty( + "spring.kafka.bootstrap-servers"); + + private static volatile boolean started; + + private EmbeddedKafkaHolder() { + } + + public static EmbeddedKafkaBroker getEmbeddedKafka() { + if (!started) { + synchronized (EmbeddedKafkaBroker.class) { + if (!started) { + try { + embeddedKafka.afterPropertiesSet(); + } catch (Exception e) { + throw new KafkaException("Embedded broker failed to start", e); + } + started = true; + } + } + } + return embeddedKafka; + } +} diff --git a/spring-kafka-4/src/test/java/com/baeldung/kafka/sharedbroker/OrderListenerTest.java b/spring-kafka-4/src/test/java/com/baeldung/kafka/sharedbroker/OrderListenerTest.java new file mode 100644 index 000000000000..01782c6c2fc8 --- /dev/null +++ b/spring-kafka-4/src/test/java/com/baeldung/kafka/sharedbroker/OrderListenerTest.java @@ -0,0 +1,39 @@ +package com.baeldung.kafka.sharedbroker; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.shaded.org.awaitility.Awaitility; + +@SpringBootTest +class OrderListenerTest { + + private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka(); + + @Autowired + private KafkaTemplate kafkaTemplate; + + @DynamicPropertySource + static void kafkaProps(DynamicPropertyRegistry registry) { + registry.add("spring.kafka.bootstrap-servers", broker::getBrokersAsString); + } + + @Test + void givenKafkaBroker_whenOrderMessageIsSent_thenListenerConsumesMessages() { + kafkaTemplate.send("order", "key", "{\"orderId\":%s}".formatted(UUID.randomUUID() + .toString())); + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .until(() -> OrderListener.getLatch() + .getCount() == 0); + } + +} \ No newline at end of file diff --git a/spring-kafka-4/src/test/java/com/baeldung/kafka/sharedbroker/PaymentListenerTest.java b/spring-kafka-4/src/test/java/com/baeldung/kafka/sharedbroker/PaymentListenerTest.java new file mode 100644 index 000000000000..c88c75b1eaee --- /dev/null +++ b/spring-kafka-4/src/test/java/com/baeldung/kafka/sharedbroker/PaymentListenerTest.java @@ -0,0 +1,39 @@ +package com.baeldung.kafka.sharedbroker; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.shaded.org.awaitility.Awaitility; + +@SpringBootTest +class PaymentListenerTest { + + private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka(); + + @Autowired + private KafkaTemplate kafkaTemplate; + + @DynamicPropertySource + static void kafkaProps(DynamicPropertyRegistry registry) { + registry.add("spring.kafka.bootstrap-servers", broker::getBrokersAsString); + } + + @Test + void givenKafkaBroker_whenPaymentMessageIsSent_thenListenerConsumesMessages() { + kafkaTemplate.send("payment", "key", "{\"paymentId\":%s}".formatted(UUID.randomUUID() + .toString())); + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .until(() -> PaymentListener.getLatch() + .getCount() == 0); + } + +} \ No newline at end of file