-
Notifications
You must be signed in to change notification settings - Fork 12
/
SqsEcho.scala
153 lines (134 loc) · 5.49 KB
/
SqsEcho.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package alpakka.sqs
import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
import org.apache.commons.validator.routines.UrlValidator
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.connectors.sqs.*
import org.apache.pekko.stream.connectors.sqs.scaladsl.{SqsAckSink, SqsPublishSink, SqsSource}
import org.apache.pekko.stream.scaladsl.{Flow, Sink, Source}
import org.slf4j.{Logger, LoggerFactory}
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{CreateQueueRequest, DeleteQueueRequest, DeleteQueueResponse, Message}
import java.net.URI
import scala.collection.immutable
import scala.concurrent.duration.{DurationInt, SECONDS}
import scala.concurrent.{Await, ExecutionContextExecutor}
/**
  * Echo flow via a SQS standard queue:
  *  - produce n String msgs
  *  - consume n String msgs
  *
  * Run this class against your AWS account using hardcoded accessKey/secretKey
  * or
  * Run via [[alpakka.sqs.SqsEchoIT]] against localStack docker container
  *
  * Remarks:
  *  - For convenience we use the async `awsSqsClient` to create/delete the queue
  *  - Be warned that running against AWS (and thus setting up resources) can cost you money
  *
  * Doc:
  * https://pekko.apache.org/docs/pekko-connectors/current/sqs.html
  * https://docs.localstack.cloud/user-guide/aws/sqs
  *
  * @param urlWithMappedPort endpoint override used when it passes `UrlValidator`;
  *                          otherwise (e.g. the empty default) the client is built for AWS
  * @param accessKey         access key, only used together with a valid `urlWithMappedPort`
  * @param secretKey         secret key, only used together with a valid `urlWithMappedPort`
  * @param region            region name, only used together with a valid `urlWithMappedPort`
  */
class SqsEcho(urlWithMappedPort: URI = new URI(""),
              accessKey: String = "",
              secretKey: String = "",
              region: String = "") {
  val logger: Logger = LoggerFactory.getLogger(this.getClass)
  implicit val system: ActorSystem = ActorSystem("SqsEcho")
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  val queueName: String = "mysqs-queue"

  // Assigned by run() once the queue has been created; empty until then
  private var queueUrl = ""

  // Both variants share the actor system for the underlying HTTP client
  implicit val awsSqsClient: SqsAsyncClient =
    if (new UrlValidator().isValid(urlWithMappedPort.toString)) {
      logger.info("Running against localStack on local container...")

      val credentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey))
      SqsAsyncClient
        .builder()
        .endpointOverride(urlWithMappedPort)
        .credentialsProvider(credentialsProvider)
        .region(Region.of(region))
        .httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
        .build()
    }
    else {
      logger.info("Running against AWS...")

      // Add your credentials
      val credentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create("***", "***"))
      SqsAsyncClient
        .builder()
        .credentialsProvider(credentialsProvider)
        .region(Region.EU_WEST_1)
        .httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
        .build()
    }

  // Close the SQS client as soon as the actor system terminates
  system.registerOnTermination(awsSqsClient.close())

  /**
    * Creates the queue, publishes the messages and then consumes them again.
    * Blocks the calling thread for up to 10 seconds while waiting for the streams.
    *
    * @return the number of consumed messages
    */
  def run(): Int = {
    // Local import: scheduler based delay, so no dispatcher thread is blocked
    import org.apache.pekko.pattern.after

    queueUrl = createStandardQueue()
    logger.info(s"Created queue with URL: $queueUrl")

    val done = for {
      _ <- producerClient()
      // Grace time to process msgs on server
      consumed <- after(1.second)(consumerClient())
    } yield consumed

    val result = Await.result(done, 10.seconds)
    logger.info(s"Successfully consumed: ${result.size} msgs")
    result.size
  }

  // Publishes 10 String msgs to the queue, in batches of max. 2
  private def producerClient() = {
    logger.info("About to start producing msgs to URL: {}", queueUrl)

    val messages = for (i <- 0 until 10) yield s"Message - $i"
    Source(messages)
      .runWith(SqsPublishSink.grouped(queueUrl, SqsPublishGroupedSettings.Defaults.withMaxBatchSize(2)))
  }

  // Reads msgs from the queue, deletes each of them and collects them into a Seq
  private def consumerClient() = {
    logger.info("About to start consuming msgs from URL: {}", queueUrl)

    val settings = SqsSourceSettings()
      // Long polling to avoid (expensive) empty reads, MAX is 20s
      // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html
      .withWaitTime(2.seconds)
      .withMaxBufferSize(100)
      .withMaxBatchSize(2)
      .withAttributes(immutable.Seq(SenderId, SentTimestamp))
      // Let the stream complete when there are no more messages on the queue
      // In realistic scenarios, you should add a KillSwitch to the stream
      .withCloseOnEmptyReceive(true)
      // Invisible for other concurrent consumers
      // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html
      .withVisibilityTimeout(10.seconds)

    val ackFlow = Flow[Message]
      .map(MessageAction.Delete(_))
      .to(SqsAckSink(queueUrl)) // Handle ack/deletion (via internal ReceiptHandle)

    SqsSource(queueUrl, settings)
      .alsoTo(ackFlow)
      .wireTap(msg => logger.info(s"Received msg: $msg"))
      .runWith(Sink.seq)
  }

  // When the queue already exists, return its queueUrl
  // Blocks for up to 10 seconds while waiting for the response
  private def createStandardQueue(): String = {
    val response = awsSqsClient
      .createQueue(
        CreateQueueRequest.builder()
          .queueName(queueName)
          .build())
      .get(10, SECONDS)
    response.queueUrl()
  }

  // Deletes the queue behind `queueUrl`, blocks for up to 10 seconds while waiting for the response
  private def deleteQueue(): DeleteQueueResponse = {
    val response = awsSqsClient
      .deleteQueue(DeleteQueueRequest.builder()
        .queueUrl(queueUrl)
        .build())
      .get(10, SECONDS)
    logger.info("Successfully deleted SQS queue with URL: {}", queueUrl)
    response
  }
}
/**
  * Entry point: runs the echo flow once with the default constructor arguments
  * and cleans up afterwards, regardless of whether the run succeeded.
  */
object SqsEcho extends App {
  val echo = new SqsEcho()
  try {
    echo.run()
  } finally {
    try {
      // Avoid dangling resources on AWS, comment out for testing
      // Skipped when the queue was never created, so a createQueue failure is not masked
      if (echo.queueUrl.nonEmpty) echo.deleteQueue()
    } finally {
      // Fires the termination hook which closes the SQS client
      echo.system.terminate()
    }
  }
}