Skip to content

Commit 4726185

Browse files
BAEL-8967: implementation for shared test broker (#18903)
Co-authored-by: bala <[email protected]>
1 parent 2183c26 commit 4726185

File tree

6 files changed

+199
-0
lines changed

6 files changed

+199
-0
lines changed
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.baeldung.kafka.sharedbroker;
2+
3+
import org.springframework.boot.SpringApplication;
4+
import org.springframework.boot.autoconfigure.SpringBootApplication;
5+
6+
@SpringBootApplication
7+
public class Application {
8+
9+
public static void main(String[] args) {
10+
SpringApplication.run(Application.class, args);
11+
}
12+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package com.baeldung.kafka.sharedbroker;
2+
3+
import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
4+
5+
import java.util.Map;
6+
import java.util.concurrent.CountDownLatch;
7+
8+
import org.apache.kafka.clients.admin.AdminClient;
9+
import org.apache.kafka.clients.consumer.ConsumerRecord;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
import org.springframework.beans.factory.annotation.Value;
13+
import org.springframework.kafka.annotation.KafkaListener;
14+
import org.springframework.stereotype.Component;
15+
16+
@Component
17+
public class OrderListener {
18+
19+
private static final Logger LOG = LoggerFactory.getLogger(OrderListener.class);
20+
private final static CountDownLatch latch = new CountDownLatch(1);
21+
22+
@Value("${spring.kafka.bootstrap-servers}")
23+
private String bootstrapServers;
24+
25+
@KafkaListener(topics = "order")
26+
public void receive(ConsumerRecord<String, String> consumerRecord) throws Exception {
27+
try (AdminClient admin = AdminClient.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers))) {
28+
LOG.info("Received customer order request [{}] from broker [{}]", consumerRecord.value(), admin.describeCluster()
29+
.clusterId()
30+
.get());
31+
}
32+
latch.countDown();
33+
}
34+
35+
public static CountDownLatch getLatch() {
36+
return latch;
37+
}
38+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package com.baeldung.kafka.sharedbroker;
2+
3+
import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
4+
5+
import java.util.Map;
6+
import java.util.concurrent.CountDownLatch;
7+
8+
import org.apache.kafka.clients.admin.AdminClient;
9+
import org.apache.kafka.clients.consumer.ConsumerRecord;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
import org.springframework.beans.factory.annotation.Value;
13+
import org.springframework.kafka.annotation.KafkaListener;
14+
import org.springframework.stereotype.Component;
15+
16+
@Component
17+
public class PaymentListener {
18+
19+
private static final Logger LOG = LoggerFactory.getLogger(PaymentListener.class);
20+
private static final CountDownLatch latch = new CountDownLatch(1);
21+
22+
@Value("${spring.kafka.bootstrap-servers}")
23+
private String bootstrapServers;
24+
25+
@KafkaListener(topics = "payment")
26+
public void receive(ConsumerRecord<String, String> consumerRecord) throws Exception {
27+
try (AdminClient admin = AdminClient.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers))) {
28+
LOG.info("Received payment request [{}] from broker [{}]", consumerRecord.value(), admin.describeCluster()
29+
.clusterId()
30+
.get());
31+
}
32+
latch.countDown();
33+
}
34+
35+
public static CountDownLatch getLatch() {
36+
return latch;
37+
}
38+
39+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package com.baeldung.kafka.sharedbroker;
2+
3+
import org.springframework.kafka.KafkaException;
4+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
5+
import org.springframework.kafka.test.EmbeddedKafkaKraftBroker;
6+
7+
public final class EmbeddedKafkaHolder {
8+
9+
private static final EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaKraftBroker(1, 1, "order", "payment").brokerListProperty(
10+
"spring.kafka.bootstrap-servers");
11+
12+
private static volatile boolean started;
13+
14+
private EmbeddedKafkaHolder() {
15+
}
16+
17+
public static EmbeddedKafkaBroker getEmbeddedKafka() {
18+
if (!started) {
19+
synchronized (EmbeddedKafkaBroker.class) {
20+
if (!started) {
21+
try {
22+
embeddedKafka.afterPropertiesSet();
23+
} catch (Exception e) {
24+
throw new KafkaException("Embedded broker failed to start", e);
25+
}
26+
started = true;
27+
}
28+
}
29+
}
30+
return embeddedKafka;
31+
}
32+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package com.baeldung.kafka.sharedbroker;
2+
3+
import java.util.UUID;
4+
import java.util.concurrent.TimeUnit;
5+
6+
import org.junit.jupiter.api.Test;
7+
import org.springframework.beans.factory.annotation.Autowired;
8+
import org.springframework.boot.test.context.SpringBootTest;
9+
import org.springframework.kafka.core.KafkaTemplate;
10+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
11+
import org.springframework.test.context.DynamicPropertyRegistry;
12+
import org.springframework.test.context.DynamicPropertySource;
13+
import org.testcontainers.shaded.org.awaitility.Awaitility;
14+
15+
@SpringBootTest
16+
class OrderListenerTest {
17+
18+
private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();
19+
20+
@Autowired
21+
private KafkaTemplate<String, String> kafkaTemplate;
22+
23+
@DynamicPropertySource
24+
static void kafkaProps(DynamicPropertyRegistry registry) {
25+
registry.add("spring.kafka.bootstrap-servers", broker::getBrokersAsString);
26+
}
27+
28+
@Test
29+
void givenKafkaBroker_whenOrderMessageIsSent_thenListenerConsumesMessages() {
30+
kafkaTemplate.send("order", "key", "{\"orderId\":%s}".formatted(UUID.randomUUID()
31+
.toString()));
32+
Awaitility.await()
33+
.atMost(10, TimeUnit.SECONDS)
34+
.pollInterval(500, TimeUnit.MILLISECONDS)
35+
.until(() -> OrderListener.getLatch()
36+
.getCount() == 0);
37+
}
38+
39+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package com.baeldung.kafka.sharedbroker;
2+
3+
import java.util.UUID;
4+
import java.util.concurrent.TimeUnit;
5+
6+
import org.junit.jupiter.api.Test;
7+
import org.springframework.beans.factory.annotation.Autowired;
8+
import org.springframework.boot.test.context.SpringBootTest;
9+
import org.springframework.kafka.core.KafkaTemplate;
10+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
11+
import org.springframework.test.context.DynamicPropertyRegistry;
12+
import org.springframework.test.context.DynamicPropertySource;
13+
import org.testcontainers.shaded.org.awaitility.Awaitility;
14+
15+
@SpringBootTest
16+
class PaymentListenerTest {
17+
18+
private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();
19+
20+
@Autowired
21+
private KafkaTemplate<String, String> kafkaTemplate;
22+
23+
@DynamicPropertySource
24+
static void kafkaProps(DynamicPropertyRegistry registry) {
25+
registry.add("spring.kafka.bootstrap-servers", broker::getBrokersAsString);
26+
}
27+
28+
@Test
29+
void givenKafkaBroker_whenPaymentMessageIsSent_thenListenerConsumesMessages() {
30+
kafkaTemplate.send("payment", "key", "{\"paymentId\":%s}".formatted(UUID.randomUUID()
31+
.toString()));
32+
Awaitility.await()
33+
.atMost(10, TimeUnit.SECONDS)
34+
.pollInterval(500, TimeUnit.MILLISECONDS)
35+
.until(() -> PaymentListener.getLatch()
36+
.getCount() == 0);
37+
}
38+
39+
}

0 commit comments

Comments
 (0)