Skip to content

Commit

Permalink
[FEAT] Messaging : replace aws kafka broker with embeded kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
ilyasabdellaoui committed May 26, 2024
1 parent fc22f31 commit 7c3b025
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 42 deletions.
44 changes: 23 additions & 21 deletions .idea/workspace.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions api-gateway/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ spring:
uri: http://posts-service:8081
predicates:
- Path=/api/posts/**
- id: angular
uri: http://localhost:4200
predicates:
- Path=/**
### Local config :
# - id: kafka-messaging
# uri: http://localhost:8083
Expand Down
26 changes: 18 additions & 8 deletions messaging-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,28 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>2.3.0</version>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.19.3</version>
<scope>test</scope>
</dependency>

<dependency>
Expand All @@ -57,6 +61,12 @@
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>

<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>2.3.0</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.example.kafkamessaging.Config;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;

@Configuration
public class KafkaConfig {

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Bean
public AdminClient adminClient() {
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return AdminClient.create(properties);
}
}
Original file line number Diff line number Diff line change
@@ -1,23 +1,39 @@
package com.example.kafkamessaging.Controller;

import com.example.kafkamessaging.MessageRecord;
import org.springframework.beans.factory.annotation.Autowired;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.Collections;
import java.util.concurrent.ExecutionException;

@RestController
@RequestMapping("api/kafka/send")
@CrossOrigin(origins = "http://localhost:8888")
@RequestMapping("api/kafka")
@RequiredArgsConstructor
public class KafkaController {

private final KafkaTemplate<String, String> kafkaTemplate;
private final AdminClient adminClient;

public KafkaController(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
@PostMapping("send/{topic}")
public void publish(@PathVariable("topic") String topic, @RequestBody MessageRecord messageRecord) {
kafkaTemplate.send(topic, messageRecord.message());
}

@PostMapping
public void publish(@RequestBody MessageRecord messageRecord) {
kafkaTemplate.send("sporterz", messageRecord.message());
@PostMapping("create-topic")
public ResponseEntity<String> createTopic(@RequestParam String topicName) {
NewTopic newTopic = new NewTopic(topicName, 1, (short) 1);
try {
adminClient.createTopics(Collections.singleton(newTopic)).all().get();
return ResponseEntity.status(HttpStatus.CREATED).body("Topic created successfully: " + topicName);
} catch (InterruptedException | ExecutionException e) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Failed to create topic: " + topicName);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ public class KafkaListeners {
public void listen(MessageRecord messageRecord) {
System.out.println("Received message: " + messageRecord.message());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,24 @@

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.EmbeddedKafkaZKBroker;

@SpringBootApplication
@Configuration
public class KafkaMessagingApplication {

public static void main(String[] args) {
SpringApplication.run(KafkaMessagingApplication.class, args);
}

}
@Bean
EmbeddedKafkaBroker broker() {
return new EmbeddedKafkaZKBroker(1)
.kafkaPorts(9092)
.brokerListProperty("spring.kafka.bootstrap-servers");
}

}
3 changes: 2 additions & 1 deletion messaging-service/src/main/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
server.port=8083
spring.application.name=messaging-service
spring.kafka.bootstrap-servers=ec2-3-65-182-112.eu-central-1.compute.amazonaws.com:9092
#spring.kafka.bootstrap-servers=ec2-3-65-182-112.eu-central-1.compute.amazonaws.com:9092
spring.kafka.bootstrap-servers=localhost:9092

management.endpoints.web.exposure.include=prometheus
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
class KafkaMessagingApplicationTests {
Expand All @@ -10,4 +12,4 @@ class KafkaMessagingApplicationTests {
void contextLoads() {
}

}
}

0 comments on commit 7c3b025

Please sign in to comment.