This Vert.x client allows Amazon SQS access in two ways:
- As a @VertxGen service bridge to Amazon SQS Async Client methods
- As an Amazon SQS queue consuming verticle
Add the following dependency:
uy.kohesive.vertx:vertx-sqs:1.0.0-BETA-01
The client must be configured with a region. AWS credentials can also be supplied in the config; otherwise the default credentials file (~/.aws/credentials) is used:
JsonObject config = new JsonObject()
    .put("accessKey", "someAccessKey")
    .put("secretKey", "someSecretKey")
    .put("region", "us-west-2");
The client is initialized asynchronously:
SqsClient client = SqsClient.create(vertx, config);
client.start(result -> {
    if (result.succeeded()) {
        System.out.println("Client is initialized");
    }
});
Once the client is initialized, it can be used to access the Amazon SQS API asynchronously:
client.sendMessage("MyQueue", "Hello World", result -> {
    if (result.succeeded()) {
        System.out.println("Message is sent");
    }
});
An SQS queue consumer can be configured to poll the queue periodically and forward the polled messages to a Vert.x event bus address. The verticle is deployed with a config containing AWS credentials (see above), region, SQS queue URL, Vert.x address and a polling interval in milliseconds:
JsonObject config = new JsonObject()
    .put("accessKey", "someAccessKey")
    .put("secretKey", "someSecretKey")
    .put("region", "us-west-2")
    .put("pollingInterval", 1000)
    .put("queueUrl", "https://sqs.us-west-2.amazonaws.com/1000/MyQueue")
    .put("address", "sqs.queue.MyQueue");
vertx.deployVerticle("uy.kohesive.vertx.sqs.SqsQueueConsumerVerticle", new DeploymentOptions().setConfig(config));
Once the verticle is successfully deployed, it starts polling the SQS queue and routing the messages to the configured Vert.x address.
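A typo in a key name is easy to make, so it can help to check the config before deploying. The sketch below is plain Java over a Map (standing in for the JsonObject) and is not part of the library: the class ConsumerConfigCheck and its methods are hypothetical. It assumes that the four non-credential keys listed above are all required, and that the region is the second label of the queue URL host, as in the example URL.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ConsumerConfigCheck {

    // Keys the consumer description lists besides the optional credentials.
    // Treating all four as mandatory is an assumption of this sketch.
    private static final String[] REQUIRED = {"region", "queueUrl", "address", "pollingInterval"};

    /** Returns the required keys that are absent from the config (empty list if none). */
    public static List<String> missingKeys(Map<String, Object> config) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            if (config.get(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }

    /**
     * Extracts the region from a queue URL of the form
     * https://sqs.<region>.amazonaws.com/<account>/<queue>,
     * or returns null if the host does not follow that pattern.
     */
    public static String regionOf(String queueUrl) {
        String host = URI.create(queueUrl).getHost();
        if (host == null) {
            return null;
        }
        String[] labels = host.split("\\.");
        if (labels.length >= 4 && labels[0].equals("sqs")) {
            return labels[1];
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, Object> config = Map.of(
            "region", "us-west-2",
            "pollingInterval", 1000,
            "queueUrl", "https://sqs.us-west-2.amazonaws.com/1000/MyQueue",
            "address", "sqs.queue.MyQueue");
        System.out.println("missing keys: " + missingKeys(config));
        System.out.println("region in URL: " + regionOf((String) config.get("queueUrl")));
    }
}
```

Comparing regionOf(queueUrl) with the configured region is a cheap way to catch a queue URL copied from another region.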
To delete a message from the SQS queue, reply to it with null in your Vert.x event bus consumer:
vertx.eventBus().consumer("sqs.queue.MyQueue", message -> {
    // Process the message
    // ...
    message.reply(null);
});
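The practical consequence is that a message the consumer never replies to is presumably not deleted. The toy model below illustrates this in plain Java, without Vert.x or AWS. AckDeleteModel and its methods are hypothetical names, and real SQS behaviour such as visibility timeouts is deliberately left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AckDeleteModel {

    // Messages still stored in the simulated queue, keyed by a receipt handle.
    private final Map<String, String> stored = new LinkedHashMap<>();

    public void send(String handle, String body) {
        stored.put(handle, body);
    }

    /** Polling hands out the handles but leaves every message in the queue. */
    public List<String> poll() {
        return new ArrayList<>(stored.keySet());
    }

    /** Models the reply from the event bus consumer: only now is the message deleted. */
    public void reply(String handle) {
        stored.remove(handle);
    }

    public int size() {
        return stored.size();
    }

    public static void main(String[] args) {
        AckDeleteModel queue = new AckDeleteModel();
        queue.send("h1", "first");
        queue.send("h2", "second");
        for (String handle : queue.poll()) {
            if (handle.equals("h1")) {
                queue.reply(handle); // processed successfully, so acknowledge
            }
            // "h2" gets no reply and therefore stays in the queue
        }
        System.out.println("messages left: " + queue.size());
    }
}
```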
SqsSequentialQueueConsumerVerticle limits the rate at which SQS messages are consumed. It uses a thread pool, sized by the workersCount parameter, to fetch messages from the queue, and waits for message acknowledgment (see above) before fetching more. Messages are buffered; the buffer size is set by the bufferSize parameter.
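One way to picture this rate limiting is as a counter of free buffer slots: a worker may fetch only as many messages as there are free slots, and each acknowledgment frees a slot. The sketch below models that idea in plain Java with a Semaphore. It is not the verticle's actual implementation, and the names BoundedFetchModel, tryFetch and acknowledge are hypothetical.

```java
import java.util.concurrent.Semaphore;

public class BoundedFetchModel {

    // One permit per free buffer slot; plays the role of bufferSize.
    private final Semaphore freeSlots;

    public BoundedFetchModel(int bufferSize) {
        this.freeSlots = new Semaphore(bufferSize);
    }

    /**
     * Claims up to `wanted` buffer slots without blocking and returns how many
     * messages the caller may fetch right now (0 when the buffer is full).
     */
    public int tryFetch(int wanted) {
        int granted = 0;
        while (granted < wanted && freeSlots.tryAcquire()) {
            granted++;
        }
        return granted;
    }

    /** Called when a consumer acknowledges a message; frees one slot. */
    public void acknowledge() {
        freeSlots.release();
    }

    public static void main(String[] args) {
        BoundedFetchModel model = new BoundedFetchModel(3);
        System.out.println(model.tryFetch(5)); // only 3 slots are free
        System.out.println(model.tryFetch(1)); // buffer full, nothing granted
        model.acknowledge();                   // one message acknowledged
        System.out.println(model.tryFetch(1)); // a slot is free again
    }
}
```

In this model a slow consumer throttles fetching automatically, because no new slots open up until it acknowledges what it already holds.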
An SQS message producer verticle can be configured to route event bus messages to an SQS queue. The verticle is deployed with a config containing AWS credentials (see above), region, SQS queue URL, Vert.x address and a local flag, which specifies whether the verticle should start a local message consumer (false by default):
JsonObject config = new JsonObject()
    .put("accessKey", "someAccessKey")
    .put("secretKey", "someSecretKey")
    .put("region", "us-west-2")
    .put("queueUrl", "https://sqs.us-west-2.amazonaws.com/1000/MyQueue")
    .put("address", "sqs.queue.MyQueue")
    .put("local", true);
vertx.deployVerticle("uy.kohesive.vertx.sqs.SqsQueueProducerVerticle", new DeploymentOptions().setConfig(config));
Once the verticle is successfully deployed, it starts routing event bus messages from the configured address to the SQS queue. The message body is expected to be a String.
All the verticles mentioned above can be constructed using a secondary constructor that accepts an AWSCredentialsProvider instance. In that case no AWS credentials need to be configured via the JsonObject, but cluster deployment becomes a problem, because the verticle can no longer be deployed using only its ID.