Skip to content

Commit

Permalink
Merge pull request #15405 from tomazfernandes/BAEL-7349
Browse files Browse the repository at this point in the history
BAEL-7349 - Introduction to Spring Cloud AWS SQS.
  • Loading branch information
theangrydev authored Jan 13, 2024
2 parents 0743fa0 + 5ff9dfe commit 5923092
Show file tree
Hide file tree
Showing 12 changed files with 332 additions and 0 deletions.
1 change: 1 addition & 0 deletions spring-cloud-modules/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
<!-- <module>spring-cloud-stream-starters</module>-->
<module>spring-cloud-connectors-heroku</module>
<module>spring-cloud-aws</module>
<module>spring-cloud-aws-v3</module>
<module>spring-cloud-consul</module>
<!-- <module>spring-cloud-zuul-eureka-integration</module>-->
<!-- <module>spring-cloud-contract</module>-->
Expand Down
3 changes: 3 additions & 0 deletions spring-cloud-modules/spring-cloud-aws-v3/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Spring Cloud AWS

TBD
68 changes: 68 additions & 0 deletions spring-cloud-modules/spring-cloud-aws-v3/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.baeldung.spring.cloud</groupId>
<artifactId>spring-cloud-aws-v3</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-cloud-aws-v3</name>
<packaging>jar</packaging>
<description>Spring Cloud AWS Examples</description>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws</artifactId>
<version>${spring-cloud-aws.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter</artifactId>
</dependency>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter-sqs</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<properties>
<start-class>com.baeldung.spring.cloud.aws.SpringCloudAwsApplication</start-class>
<spring-cloud-aws.version>3.1.0</spring-cloud-aws.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.baeldung.spring.cloud.aws;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;

import com.baeldung.spring.cloud.aws.sqs.EventQueuesProperties;

@SpringBootApplication
@EnableConfigurationProperties(EventQueuesProperties.class)
public class SpringCloudAwsApplication {

public static void main(String[] args) {
SpringApplication.run(SpringCloudAwsApplication.class, args);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.baeldung.spring.cloud.aws.sqs;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "events.queues")
public class EventQueuesProperties {

private String userCreatedByNameQueue;
private String userCreatedRecordQueue;
private String userCreatedEventTypeQueue;

public String getUserCreatedByNameQueue() {
return userCreatedByNameQueue;
}

public void setUserCreatedByNameQueue(String userCreatedByNameQueue) {
this.userCreatedByNameQueue = userCreatedByNameQueue;
}

public String getUserCreatedRecordQueue() {
return userCreatedRecordQueue;
}

public void setUserCreatedRecordQueue(String userCreatedRecordQueue) {
this.userCreatedRecordQueue = userCreatedRecordQueue;
}

public String getUserCreatedEventTypeQueue() {
return userCreatedEventTypeQueue;
}

public void setUserCreatedEventTypeQueue(String userCreatedEventTypeQueue) {
this.userCreatedEventTypeQueue = userCreatedEventTypeQueue;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.baeldung.spring.cloud.aws.sqs;

public record User(String id, String name, String email) {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.baeldung.spring.cloud.aws.sqs;

public record UserCreatedEvent(String id, String username, String email) {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.baeldung.spring.cloud.aws.sqs;

import static io.awspring.cloud.sqs.listener.SqsHeaders.MessageSystemAttributes.SQS_APPROXIMATE_FIRST_RECEIVE_TIMESTAMP;

import java.util.UUID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import io.awspring.cloud.sqs.annotation.SqsListener;

@Component
public class UserEventListeners {

private static final Logger logger = LoggerFactory.getLogger(UserEventListeners.class);

public static final String EVENT_TYPE_CUSTOM_HEADER = "eventType";

private final UserRepository userRepository;

public UserEventListeners(UserRepository userRepository) {
this.userRepository = userRepository;
}

@SqsListener("${events.queues.user-created-by-name-queue}")
public void receiveStringMessage(String username) {
logger.info("Received message: {}", username);
userRepository.save(new User(UUID.randomUUID()
.toString(), username, null));
}

@SqsListener(value = "${events.queues.user-created-record-queue}")
public void receiveRecordMessage(UserCreatedEvent event) {
logger.info("Received message: {}", event);
userRepository.save(new User(event.id(), event.username(), event.email()));
}

@SqsListener("${events.queues.user-created-event-type-queue}")
public void customHeaderMessage(Message<UserCreatedEvent> message, @Header(EVENT_TYPE_CUSTOM_HEADER) String eventType,
@Header(SQS_APPROXIMATE_FIRST_RECEIVE_TIMESTAMP) Long firstReceive) {
logger.info("Received message {} with event type {}. First received at approximately {}.", message, eventType, firstReceive);
UserCreatedEvent payload = message.getPayload();
userRepository.save(new User(payload.id(), payload.username(), payload.email()));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.baeldung.spring.cloud.aws.sqs;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.stereotype.Repository;

@Repository
public class UserRepository {

private final Map<String, User> persistedUsers = new ConcurrentHashMap<>();

public void save(User userToSave) {
persistedUsers.put(userToSave.id(), userToSave);
}

public Optional<User> findById(String userId) {
return Optional.ofNullable(persistedUsers.get(userId));
}

public Optional<User> findByName(String name) {
return persistedUsers.values().stream()
.filter(user -> user.name().equals(name))
.findFirst();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
events:
queues:
user-created-by-name-queue: user_created_by_name_queue
user-created-record-queue: user_created_record_queue
user-created-event-type-queue: user_created_event_type_queue
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.baeldung.spring.cloud.aws.sqs;

import static org.testcontainers.containers.localstack.LocalStackContainer.Service.SQS;

import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@SpringBootTest
@Testcontainers
public class BaseSqsIntegrationTest {

private static final String LOCAL_STACK_VERSION = "localstack/localstack:2.3.2";

@Container
static LocalStackContainer localStack = new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION));

@DynamicPropertySource
static void overrideProperties(DynamicPropertyRegistry registry) {
registry.add("spring.cloud.aws.region.static", () -> localStack.getRegion());
registry.add("spring.cloud.aws.credentials.access-key", () -> localStack.getAccessKey());
registry.add("spring.cloud.aws.credentials.secret-key", () -> localStack.getSecretKey());
registry.add("spring.cloud.aws.sqs.endpoint", () -> localStack.getEndpointOverride(SQS)
.toString());
// ...other AWS services endpoints can be added here
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package com.baeldung.spring.cloud.aws.sqs;

import static com.baeldung.spring.cloud.aws.sqs.UserEventListeners.EVENT_TYPE_CUSTOM_HEADER;
import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.Map;
import java.util.UUID;

import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import io.awspring.cloud.sqs.operations.SqsTemplate;

public class SpringCloudAwsSQSLiveTest extends BaseSqsIntegrationTest {

private static final Logger logger = LoggerFactory.getLogger(SpringCloudAwsSQSLiveTest.class);

@Autowired
private SqsTemplate sqsTemplate;

@Autowired
private UserRepository userRepository;

@Autowired
private EventQueuesProperties eventQueuesProperties;

@Test
void givenAStringPayload_whenSend_shouldReceive() {
// given
var userName = "Albert";

// when
sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedByNameQueue())
.payload(userName));
logger.info("Message sent with payload {}", userName);

// then
await().atMost(Duration.ofSeconds(3))
.until(() -> userRepository.findByName(userName)
.isPresent());
}

@Test
void givenARecordPayload_whenSend_shouldReceive() {
// given
String userId = UUID.randomUUID()
.toString();
var payload = new UserCreatedEvent(userId, "John", "[email protected]");

// when
sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedRecordQueue())
.payload(payload));

// then
logger.info("Message sent with payload: {}", payload);
await().atMost(Duration.ofSeconds(3))
.until(() -> userRepository.findById(userId)
.isPresent());
}

@Test
void givenCustomHeaders_whenSend_shouldReceive() {
// given
String userId = UUID.randomUUID()
.toString();
var payload = new UserCreatedEvent(userId, "John", "[email protected]");
var headers = Map.<String, Object> of(EVENT_TYPE_CUSTOM_HEADER, "UserCreatedEvent");

// when
sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedEventTypeQueue())
.payload(payload)
.headers(headers));

// then
logger.info("Sent message with payload {} and custom headers: {}", payload, headers);
await().atMost(Duration.ofSeconds(3))
.until(() -> userRepository.findById(userId)
.isPresent());
}

}

0 comments on commit 5923092

Please sign in to comment.