A modern Spring Boot implementation demonstrating Kafka integration with Reactive Streams. Features real-time Wikimedia data processing with DynamoDB persistence.
## Features

- Real-time data streaming from Wikimedia
- Reactive Kafka producers/consumers
- DynamoDB integration for data persistence
- Custom serialization/deserialization
- Consumer group management
- Offset tracking implementation
## Start Kafka Services

```bash
# Start ZooKeeper
$ bin/zookeeper-server-start.sh config/zookeeper.properties

# Start Kafka Broker (in a new terminal)
$ bin/kafka-server-start.sh config/server.properties
```
## Clone and Run

```bash
$ git clone https://github.com/Asirwad/Reactive-RESTAPI-with-Kafka-and-Webflux
$ cd Reactive-RESTAPI-with-Kafka-and-Webflux
$ ./mvnw spring-boot:run
```
## 📦 Kafka Architecture Overview

| Component | Description |
|---|---|
| Producer | Publishes messages to topics |
| Consumer | Subscribes to topics and processes messages |
| Broker | Manages data storage and distribution |
| ZooKeeper | Handles cluster coordination |
### 🏗️ Kafka Cluster

- Distributed message broker system
- Horizontal scaling capabilities
- Automatic failover handling
### 📮 Kafka Producer

```java
// Example reactive producer
public Mono<SenderResult<Void>> sendMessage(String topic, String message) {
    // SenderRecord.create(topic, partition, timestamp, key, value, correlationMetadata);
    // null partition/timestamp/key let Kafka pick defaults
    return kafkaSender.send(
            Mono.just(SenderRecord.create(topic, null, null, null, message, null)))
        .next(); // send() returns a Flux; next() narrows it to the single result
}
```
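The `kafkaSender` above is built from a producer configuration map (with reactor-kafka, the map feeds `SenderOptions.create(props)` and then `KafkaSender.create(senderOptions)`). A minimal sketch of that map, assuming the default single-broker address and String serialization used in this project:

```java
import java.util.HashMap;
import java.util.Map;

public class ProducerProps {
    // Minimal producer configuration; localhost:9092 is the default dev broker address.
    public static Map<String, Object> producerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().get("bootstrap.servers"));
    }
}
```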
### 📥 Kafka Consumer

```java
// Example reactive consumer: subscribe to the Wikimedia topic
@Bean
public ReceiverOptions<String, String> receiverOptions() {
    return ReceiverOptions.<String, String>create(consumerProps())
        .subscription(Collections.singleton("wikimedia.recentchange"));
}
```
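The snippet above references `consumerProps()` without showing it. A plausible minimal version, where the group id and the `earliest` reset policy are illustrative assumptions, not values confirmed by this repo:

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerProps {
    // Minimal consumer configuration; the group id below is hypothetical.
    public static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "wikimedia-consumer-group"); // hypothetical group id
        props.put("auto.offset.reset", "earliest");        // read from the start for new groups
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().get("group.id"));
    }
}
```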
### 📂 Topics & Partitions

| Feature | Benefit |
|---|---|
| Partitions | Enable parallel processing |
| Replication | Ensure data redundancy |
| Retention | Configurable message persistence |
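These features map directly onto topic-level settings: partition count and replication factor are fixed at creation time, while retention is a topic config. A sketch with the stock Kafka CLI tools (the topic name matches this project; the partition count and retention value are illustrative):

```shell
# Create the topic with 3 partitions and a single replica (single-broker dev setup)
bin/kafka-topics.sh --create \
  --topic wikimedia.recentchange \
  --partitions 3 \
  --replication-factor 1 \
  --bootstrap-server localhost:9092

# Retention: keep messages for 1 hour (3,600,000 ms)
bin/kafka-configs.sh --alter \
  --entity-type topics \
  --entity-name wikimedia.recentchange \
  --add-config retention.ms=3600000 \
  --bootstrap-server localhost:9092
```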
### 🔢 Offsets & Consumer Groups

- **Offset Tracking**: Consumer position management
- **Group Coordination**: Parallel message processing
- **Rebalancing**: Automatic partition redistribution
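Conceptually, the broker stores one committed offset per (group, topic, partition), and a restarting consumer resumes from the record after it. A toy stdlib sketch of that bookkeeping, purely to illustrate the idea (not the real broker protocol):

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetTracker {
    // Committed offset per "group:topic:partition" key
    private final Map<String, Long> committed = new HashMap<>();

    public void commit(String group, String topic, int partition, long offset) {
        committed.put(group + ":" + topic + ":" + partition, offset);
    }

    // Position to resume from: last committed offset + 1, or 0 for a brand-new group
    public long resumeFrom(String group, String topic, int partition) {
        Long offset = committed.get(group + ":" + topic + ":" + partition);
        return offset == null ? 0L : offset + 1;
    }

    public static void main(String[] args) {
        OffsetTracker tracker = new OffsetTracker();
        tracker.commit("g1", "wikimedia.recentchange", 0, 41);
        System.out.println(tracker.resumeFrom("g1", "wikimedia.recentchange", 0)); // prints 42
        System.out.println(tracker.resumeFrom("g2", "wikimedia.recentchange", 0)); // prints 0
    }
}
```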
## 📡 Producer Implementation

### Wikimedia Stream Processor

- Reactive HTTP client for stream consumption
- Kafka Template for message publishing
- Backpressure management
- Error handling with retries
```java
webClient.get()
    .uri("/v2/stream/recentchange")
    .retrieve()
    .bodyToFlux(String.class)
    // buffer bursts from the stream rather than erroring on overflow
    .onBackpressureBuffer()
    // retry transient stream failures with exponential backoff
    .retryWhen(Retry.backoff(3, Duration.ofSeconds(2)))
    .doOnNext(event -> kafkaTemplate.send("wikimedia.recentchange", event))
    .subscribe();
```
## 💾 Consumer Implementation

### DynamoDB Persistence

- Batch record processing
- Exponential backoff strategy
- Consumer group management
- Offset commit strategies
```java
// Consume records as a reactive stream and persist them to DynamoDB in batches
@Bean
public Consumer<Flux<ConsumerRecord<String, String>>> dynamoDbSaver() {
    return flux -> flux
        // flush a batch at 100 records or after 500 ms, whichever comes first
        .bufferTimeout(100, Duration.ofMillis(500))
        .flatMap(batch -> dynamoService.saveBatch(batch))
        .subscribe();
}
```
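`bufferTimeout(100, Duration.ofMillis(500))` emits a batch when either 100 records accumulate or 500 ms elapse, whichever comes first. The size-based half of that behavior can be sketched in plain Java to make the batching rule concrete:

```java
import java.util.ArrayList;
import java.util.List;

public class BatchBuffer<T> {
    private final int maxSize;
    private final List<T> buffer = new ArrayList<>();

    public BatchBuffer(int maxSize) {
        this.maxSize = maxSize;
    }

    // Add a record; return a full batch once the size threshold is hit, else null.
    // (The real bufferTimeout also flushes a partial batch on a 500 ms timer.)
    public List<T> add(T record) {
        buffer.add(record);
        if (buffer.size() >= maxSize) {
            List<T> batch = new ArrayList<>(buffer);
            buffer.clear();
            return batch;
        }
        return null;
    }

    public static void main(String[] args) {
        BatchBuffer<String> b = new BatchBuffer<>(3);
        System.out.println(b.add("a")); // prints null
        System.out.println(b.add("b")); // prints null
        System.out.println(b.add("c")); // prints [a, b, c]
    }
}
```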
## Common Issues

- Verify the `zookeeper.properties` configuration
- Check for port conflicts (ZooKeeper defaults to port 2181)
- Monitor consumer lag with `kafka-consumer-groups.sh`
- Adjust `max.poll.records` if needed
- Validate message formats
- Check key/value serializer configurations
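For the monitoring step above, the stock CLI can list groups and show per-partition offsets and lag (the group name below is an assumption, not one confirmed by this repo):

```shell
# List all consumer groups known to the broker
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# Show current offset, log-end offset, and lag per partition for one group
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group wikimedia-consumer-group
```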