Skip to content

Commit

Permalink
Rename proton examples -> AMQP examples and use Vert.x AMQP client in…
Browse files Browse the repository at this point in the history
…stead of Proton for sender/receiver verticles.
  • Loading branch information
vietj committed Nov 7, 2024
1 parent acd53fa commit a037521
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 567 deletions.
4 changes: 4 additions & 0 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ mail server and the user credentials in the `MailLogin` example.
The link:service-proxy-examples/README.adoc[Vert.x Service Proxy examples] contains an example of service proxy usage.
It depicts how a service provider can be implemented and how the published service can be consumed.

=== Vert.x AMQP examples

The link:amqp-examples/README.adoc[Vert.x AMQP examples] shows how to interact with an AMQP broker.

=== Spring Examples

The link:spring-examples/README.adoc[Vert.x Spring Examples] shows how vert.x application can be integrated inside a Spring
Expand Down
19 changes: 7 additions & 12 deletions amqp-proton-examples/README.adoc → amqp-examples/README.adoc
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
= Vert.x Proton examples
= Vert.x AMQP 1.0 examples

Here you will find examples demonstrating the Vert.x Proton AMQP library in action.
Here you will find examples demonstrating the Vert.x AMQP Client and Vert.x Proton AMQP library in action.

This component facilitates AMQP integrations for Vert.x by providing a thin wrapper around the link:http://qpid.apache.org[Apache Qpid] Proton-J AMQP 1.0 protocol engine.
Please consult the Vert.x Proton documentation for more information.
Please consult the Vert.x AMQP Client and Proton documentation for more information.

**NOTE: The Client examples require a server with AMQP 1.0 support listening at port 5672 on localhost and offering SASL ANONYMOUS.**

Expand All @@ -15,16 +15,11 @@ These examples demonstrate messaging between a client sender, client receiver, a

The Receiver example consumes incoming messages from an address, prints their content, and accepts them. The Sender example sends a message to that address every second.

link:src/main/java/io/vertx/example/proton/client/Receiver.java[Receiver] +
link:src/main/java/io/vertx/example/proton/client/Sender.java[Sender] +

The ReconnectReceiver example furthers the above to reconnects on connection closure or disconnect. The ReconnectSender similarly reconnects, until it confirms sending a set number of messages. They each try connecting to localhost:5672 and also localhost:15672.

link:src/main/java/io/vertx/example/proton/client/ReconnectReceiver.java[ReconnectReceiver] +
link:src/main/java/io/vertx/example/proton/client/ReconnectSender.java[ReconnectSender] +

link:src/main/java/io/vertx/example/amqp/client/Receiver.java[Receiver] +
link:src/main/java/io/vertx/example/amqp/client/Sender.java[Sender] +

== Server

This server example listens for incoming connections on port 5672. It prints any messages received from client senders, and periodically sends generated messages to client receivers.
link:src/main/java/io/vertx/example/proton/server/HelloServer.java[HelloServer] +

link:src/main/java/io/vertx/example/amqp/server/HelloServer.java[HelloServer] +
7 changes: 6 additions & 1 deletion amqp-proton-examples/pom.xml → amqp-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<version>5.0.0-SNAPSHOT</version>
</parent>

<artifactId>amqp-proton-examples</artifactId>
<artifactId>amqp-examples</artifactId>

<dependencies>
<!-- primary deps -->
Expand All @@ -19,6 +19,11 @@
<artifactId>vertx-proton</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-amqp-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-launcher-application</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.vertx.example.amqp.client;

import io.vertx.amqp.AmqpClient;
import io.vertx.amqp.AmqpClientOptions;
import io.vertx.amqp.AmqpConnection;
import io.vertx.amqp.AmqpReceiver;
import io.vertx.core.Future;
import io.vertx.core.VerticleBase;
import io.vertx.launcher.application.VertxApplication;

public class Receiver extends VerticleBase {

private String address = "examples";

public static void main(String[] args) {
VertxApplication.main(new String[]{Receiver.class.getName()});
}

private AmqpClient client;
private AmqpConnection connection;
private AmqpReceiver receiver;

@Override
public Future<?> start() {
client = AmqpClient.create(vertx, new AmqpClientOptions()
.setPort(5672)
.setHost("localhost")
);

return client.connect().compose(conn -> conn
.createReceiver(address)
.andThen(ar -> {
if (ar.succeeded()) {
connection = conn;
receiver = ar.result();

receiver.handler(msg -> {
System.out.println("Received message with content: " + msg.bodyAsString());
});
} else {
conn.close();
}
}))
.onSuccess(v -> System.out.println("Received created"));
}

@Override
public Future<?> stop() {
return client.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.vertx.example.amqp.client;

import io.vertx.amqp.*;
import io.vertx.core.Future;
import io.vertx.core.VerticleBase;
import io.vertx.launcher.application.VertxApplication;

import java.util.concurrent.atomic.AtomicInteger;

import static io.vertx.proton.ProtonHelper.message;

public class Sender extends VerticleBase {

private String address = "examples";
private AtomicInteger sent = new AtomicInteger();

public static void main(String[] args) {
VertxApplication.main(new String[]{Sender.class.getName()});
}

private AmqpClient client;
private AmqpConnection connection;
private AmqpSender sender;

@Override
public Future<?> start() throws Exception {
AmqpClient client = AmqpClient.create(vertx, new AmqpClientOptions()
.setPort(5672)
.setHost("localhost"));

return client.connect().compose(conn -> conn
.createSender(address)
.andThen(ar -> {
if (ar.succeeded()) {
connection = conn;
sender = ar.result();

// Schedule sending of a message every second
System.out.println("Sender created, scheduling sends.");

vertx.setPeriodic(1000, x -> {
if(!sender.writeQueueFull()) {
final int msgNum = sent.incrementAndGet();
AmqpMessage message = AmqpMessage.create().withBody("Hello " + msgNum + " from Sender").build();

sender.sendWithAck(message).onComplete(ack -> {
if (ack.succeeded()) {
System.out.printf("Message " + msgNum + " was received by the server%n");
} else {
System.out.println("Ack failed " + ack.cause().getMessage());
}
});

System.out.println("Sent message: " + msgNum);
} else {
System.out.println("No credit to send, waiting.");
}
});
} else {
conn.close();
}
}));
}
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.vertx.example.proton.server;

import io.vertx.core.AbstractVerticle;
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.vertx.example.amqp.server;

import io.vertx.core.Future;
import io.vertx.core.VerticleBase;
import io.vertx.core.Vertx;
import io.vertx.launcher.application.VertxApplication;
import io.vertx.proton.ProtonConnection;
Expand All @@ -35,12 +36,12 @@

/**
* HelloServer
*
* <p>
* Allows attaching senders and receivers to any address, printing the messages
* received from producers, and periodically sending any consumers a message.
*/

public class HelloServer extends AbstractVerticle {
public class HelloServer extends VerticleBase {

private static final int PORT = 5672;

Expand All @@ -49,22 +50,17 @@ public static void main(String[] args) {
}

@Override
public void start() throws Exception {
public Future<?> start() throws Exception {
ProtonServer server = ProtonServer.create(vertx);

// Configure how new connections are handled
server.connectHandler((connection) -> {
initConnection(vertx, connection);
});

server.listen(PORT, (res) -> {
if (res.succeeded()) {
System.out.println("Listening on port " + res.result().actualPort());
} else {
System.out.println("Failed to start listening on port " + PORT + ":");
res.cause().printStackTrace();
}
});
return Future
.<ProtonServer>future(promise -> server.listen(PORT, promise::handle))
.onSuccess(v -> System.out.println("Listening on port " + v.actualPort()));
}

// Initialise then open new connections
Expand Down Expand Up @@ -103,7 +99,7 @@ private static void initConnection(Vertx vertx, ProtonConnection connection) {
// Initialise then open new sender (when a client receiver/consumer attaches)
private static void initSender(Vertx vertx, ProtonConnection connection, ProtonSender sender) {
org.apache.qpid.proton.amqp.messaging.Source remoteSource = (org.apache.qpid.proton.amqp.messaging.Source) sender.getRemoteSource();
if(remoteSource == null) {
if (remoteSource == null) {
// Example doesn't support 'looking up' existing links, so we will just close with an error
sender.setTarget(null);
sender.setCondition(new ErrorCondition(AmqpError.INVALID_FIELD, "No source terminus specified"));
Expand All @@ -117,7 +113,7 @@ private static void initSender(Vertx vertx, ProtonConnection connection, ProtonS
// This is rather naive, for example use only, proper servers should
// ensure that they advertise their own Source settings which actually
// reflect what is in place.
if(remoteSource.getDynamic()) {
if (remoteSource.getDynamic()) {
String dynamicAddress = UUID.randomUUID().toString();
remoteSource.setAddress(dynamicAddress);
}
Expand All @@ -133,7 +129,7 @@ private static void initSender(Vertx vertx, ProtonConnection connection, ProtonS
if (connection.isDisconnected()) {
vertx.cancelTimer(t);
} else {
if(!sender.sendQueueFull()) {
if (!sender.sendQueueFull()) {
int msgNum = sent.incrementAndGet();
System.out.println("Sending message " + msgNum + " to client, for address: " + remoteSource.getAddress());
Message m = message("Hello " + msgNum + " from Server!");
Expand Down Expand Up @@ -162,7 +158,7 @@ private static void initSender(Vertx vertx, ProtonConnection connection, ProtonS
// Initialise then open new receiver (when a client sender/producer attaches)
private static void initReceiver(ProtonReceiver receiver) {
org.apache.qpid.proton.amqp.messaging.Target remoteTarget = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
if(remoteTarget == null) {
if (remoteTarget == null) {
// Example doesn't support 'looking up' existing links, so we will just close with an error.
receiver.setTarget(null);
receiver.setCondition(new ErrorCondition(AmqpError.INVALID_FIELD, "No target terminus specified"));
Expand All @@ -176,7 +172,7 @@ private static void initReceiver(ProtonReceiver receiver) {
// This is rather naive, for example use only, proper servers should
// ensure that they advertise their own Target settings which actually
// reflect what is in place.
if(remoteTarget.getDynamic()) {
if (remoteTarget.getDynamic()) {
String dynamicAddress = UUID.randomUUID().toString();
remoteTarget.setAddress(dynamicAddress);
}
Expand All @@ -189,7 +185,7 @@ private static void initReceiver(ProtonReceiver receiver) {
// handler returns if another disposition hasn't been applied, and also grants
// credit when opened and replenishes it as messages are received.
receiver.handler((delivery, msg) -> {
String address = remoteTarget.getAddress() ;
String address = remoteTarget.getAddress();
if (address == null) {
address = msg.getAddress();
}
Expand Down
Loading

0 comments on commit a037521

Please sign in to comment.