Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.baeldung.kafka.sharedbroker;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.baeldung.kafka.sharedbroker;

import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;

import java.util.Map;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class OrderListener {

private static final Logger LOG = LoggerFactory.getLogger(OrderListener.class);
private final static CountDownLatch latch = new CountDownLatch(1);

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@KafkaListener(topics = "order")
public void receive(ConsumerRecord<String, String> consumerRecord) throws Exception {
try (AdminClient admin = AdminClient.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers))) {
LOG.info("Received customer order request [{}] from broker [{}]", consumerRecord.value(), admin.describeCluster()
.clusterId()
.get());
}
latch.countDown();
}

public static CountDownLatch getLatch() {
return latch;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.baeldung.kafka.sharedbroker;

import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;

import java.util.Map;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class PaymentListener {

private static final Logger LOG = LoggerFactory.getLogger(PaymentListener.class);
private static final CountDownLatch latch = new CountDownLatch(1);

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@KafkaListener(topics = "payment")
public void receive(ConsumerRecord<String, String> consumerRecord) throws Exception {
try (AdminClient admin = AdminClient.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers))) {
LOG.info("Received payment request [{}] from broker [{}]", consumerRecord.value(), admin.describeCluster()
.clusterId()
.get());
}
latch.countDown();
}

public static CountDownLatch getLatch() {
return latch;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.baeldung.kafka.sharedbroker;

import org.springframework.kafka.KafkaException;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.EmbeddedKafkaKraftBroker;

public final class EmbeddedKafkaHolder {

private static final EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaKraftBroker(1, 1, "order", "payment").brokerListProperty(
"spring.kafka.bootstrap-servers");

private static volatile boolean started;

private EmbeddedKafkaHolder() {
}

public static EmbeddedKafkaBroker getEmbeddedKafka() {
if (!started) {
synchronized (EmbeddedKafkaBroker.class) {
if (!started) {
try {
embeddedKafka.afterPropertiesSet();
} catch (Exception e) {
throw new KafkaException("Embedded broker failed to start", e);
}
started = true;
}
}
}
return embeddedKafka;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.baeldung.kafka.sharedbroker;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.shaded.org.awaitility.Awaitility;

@SpringBootTest
class OrderListenerTest {

private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@DynamicPropertySource
static void kafkaProps(DynamicPropertyRegistry registry) {
registry.add("spring.kafka.bootstrap-servers", broker::getBrokersAsString);
}

@Test
void givenKafkaBroker_whenOrderMessageIsSent_thenListenerConsumesMessages() {
kafkaTemplate.send("order", "key", "{\"orderId\":%s}".formatted(UUID.randomUUID()
.toString()));
Awaitility.await()
.atMost(10, TimeUnit.SECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.until(() -> OrderListener.getLatch()
.getCount() == 0);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.baeldung.kafka.sharedbroker;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.shaded.org.awaitility.Awaitility;

@SpringBootTest
class PaymentListenerTest {

private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@DynamicPropertySource
static void kafkaProps(DynamicPropertyRegistry registry) {
registry.add("spring.kafka.bootstrap-servers", broker::getBrokersAsString);
}

@Test
void givenKafkaBroker_whenPaymentMessageIsSent_thenListenerConsumesMessages() {
kafkaTemplate.send("payment", "key", "{\"paymentId\":%s}".formatted(UUID.randomUUID()
.toString()));
Awaitility.await()
.atMost(10, TimeUnit.SECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.until(() -> PaymentListener.getLatch()
.getCount() == 0);
}

}