From d2da491ac5d4f76b58f240659fbafe114e60794e Mon Sep 17 00:00:00 2001 From: Xharos Date: Sat, 18 May 2024 22:57:15 +0200 Subject: [PATCH] Add rabbitmq test --- .../commons/network/NetOutput.java | 1 + .../commons/network/nio/OutputByteBuffer.java | 5 ++ .../service/rabbitmq/ConsumerHandler.java | 77 ++++++++++++++++ .../service/rabbitmq/RabbitMQConnection.java | 35 +++++++- .../fr/islandswars/test/RabbitMQTest.java | 90 ++++++++++++++++--- 5 files changed, 195 insertions(+), 13 deletions(-) create mode 100644 src/main/java/fr/islandswars/commons/service/rabbitmq/ConsumerHandler.java diff --git a/src/main/java/fr/islandswars/commons/network/NetOutput.java b/src/main/java/fr/islandswars/commons/network/NetOutput.java index 16aef7f..730f50d 100644 --- a/src/main/java/fr/islandswars/commons/network/NetOutput.java +++ b/src/main/java/fr/islandswars/commons/network/NetOutput.java @@ -72,4 +72,5 @@ public interface NetOutput { void writeVarLong(long l) throws IOException; + byte[] getBuffer(); } \ No newline at end of file diff --git a/src/main/java/fr/islandswars/commons/network/nio/OutputByteBuffer.java b/src/main/java/fr/islandswars/commons/network/nio/OutputByteBuffer.java index 0b12298..88c4f10 100644 --- a/src/main/java/fr/islandswars/commons/network/nio/OutputByteBuffer.java +++ b/src/main/java/fr/islandswars/commons/network/nio/OutputByteBuffer.java @@ -45,6 +45,11 @@ public ByteBuffer getByteBuffer() { return buffer; } + @Override + public byte[] getBuffer() { + return getByteBuffer().array(); + } + @Override public void writeBigInteger(BigInteger bInt) { if (bInt == null) diff --git a/src/main/java/fr/islandswars/commons/service/rabbitmq/ConsumerHandler.java b/src/main/java/fr/islandswars/commons/service/rabbitmq/ConsumerHandler.java new file mode 100644 index 0000000..8dba841 --- /dev/null +++ b/src/main/java/fr/islandswars/commons/service/rabbitmq/ConsumerHandler.java @@ -0,0 +1,77 @@ +package fr.islandswars.commons.service.rabbitmq; + +import com.rabbitmq.client.*; +import fr.islandswars.commons.network.NetInput; +import fr.islandswars.commons.network.nio.InputByteBuffer; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * File ConsumerHandler located on fr.islandswars.commons.service.rabbitmq + * ConsumerHandler is a part of commons. + *

+ * Copyright (c) 2017 - 2024 Islands Wars. + *

+ * commons is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + *

+ * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + *

+ * You should have received a copy of the GNU General Public License + * along with this program. If not, see GNU license. + *

+ * + * @author Jangliu, {@literal } + * Created the 18/05/2024 at 21:50 + * @since 0.1 + */ +public abstract class ConsumerHandler implements Consumer { + + protected final String id; + protected String tag; + protected Channel channel; + + public ConsumerHandler(String queueName) { + this.id = queueName; + } + + @Override + public void handleConsumeOk(String consumerTag) { + + } + + @Override + public void handleCancelOk(String consumerTag) { + + } + + @Override + public void handleCancel(String consumerTag) throws IOException { + + } + + @Override + public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { + + } + + @Override + public void handleRecoverOk(String consumerTag) { + + } + + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + var buffer = new InputByteBuffer(ByteBuffer.wrap(body)); + handleDelivery(envelope, buffer); + } + + public abstract void handleDelivery(Envelope envelope, NetInput output); + +} diff --git a/src/main/java/fr/islandswars/commons/service/rabbitmq/RabbitMQConnection.java b/src/main/java/fr/islandswars/commons/service/rabbitmq/RabbitMQConnection.java index b3335c1..a56b795 100644 --- a/src/main/java/fr/islandswars/commons/service/rabbitmq/RabbitMQConnection.java +++ b/src/main/java/fr/islandswars/commons/service/rabbitmq/RabbitMQConnection.java @@ -1,14 +1,15 @@ package fr.islandswars.commons.service.rabbitmq; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.*; +import fr.islandswars.commons.network.NetOutput; import fr.islandswars.commons.secrets.DockerSecretsLoader; import fr.islandswars.commons.service.ServiceConnection; import fr.islandswars.commons.service.ServiceType; +import fr.islandswars.commons.utils.LogUtils; import fr.islandswars.commons.utils.Preconditions; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -96,4 +97,32 @@ public void load() throws NullPointerException { factory.setHost(host); factory.setPort(Integer.decode(port)); } + + public void notifyTopic(NetOutput buffer, String queueName, String routingKey) throws Exception { + if (!isClosed()) { + var channel = getConnection(); + try { + channel.queueDeclare(queueName, false, false, false, null); + channel.basicPublish("", queueName, null, buffer.getBuffer()); + } catch (Exception e) { + LogUtils.error(e); + } + } + } + + public void registerConsumer(String queueName, int prefetchCount) { + if (!isClosed()) { + var channel = getConnection(); + try { + channel.queueDeclare(queueName, false, false, false, null); + channel.basicQos(prefetchCount); + channel.basicConsume(queueName, true, (consumerTag, delivery) -> { + String message = new String(delivery.getBody(), StandardCharsets.UTF_8); + System.out.println(" [x] Received '" + message + "'"); + }, consumerTag -> { }); + } catch (IOException e) { + LogUtils.error(e); + } + } + } } diff --git a/src/test/java/fr/islandswars/test/RabbitMQTest.java b/src/test/java/fr/islandswars/test/RabbitMQTest.java index aec4ff9..28cc74d 100644 --- a/src/test/java/fr/islandswars/test/RabbitMQTest.java +++ b/src/test/java/fr/islandswars/test/RabbitMQTest.java @@ -1,14 +1,18 @@ package fr.islandswars.test; +import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; +import com.rabbitmq.client.DeliverCallback; import fr.islandswars.commons.service.rabbitmq.RabbitMQConnection; import org.junit.jupiter.api.*; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.*; /** * File RabbitMQTest located on fr.islandswars.test @@ -37,28 +41,94 @@ @TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class RabbitMQTest { - private static RabbitMQConnection rabbitClient; - private static Channel connection; + private static final String QUEUE_NAME = "TEST"; + private static final String EXCHANGE_NAME = "TEST_EXCHANGE"; + private static final BuiltinExchangeType EXCHANGE_TYPE = BuiltinExchangeType.FANOUT; + private static RabbitMQConnection rabbitClient; + private static Channel channel; @BeforeAll - public static void setup() throws IOException, TimeoutException { + public static void setupConnection() throws IOException, TimeoutException { rabbitClient = new RabbitMQConnection(); rabbitClient.load(); rabbitClient.connect(); - connection = rabbitClient.getConnection(); + channel = rabbitClient.getConnection(); } @AfterAll - public static void tearDown() throws Exception { + public static void closeConnection() throws Exception { + if (channel != null && channel.isOpen()) { + channel.close(); + } rabbitClient.close(); } + @BeforeEach + public void setup() throws Exception { + // Ensure the queue is declared before each test + channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE); + channel.queueDeclare(QUEUE_NAME, false, false, false, null); + channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); + } + + @AfterEach + public void tearDown() throws Exception { + // Clean up the queue after each test + if (channel != null && channel.isOpen()) { + channel.queueDelete(QUEUE_NAME); + channel.exchangeDelete(EXCHANGE_NAME); + } + } + @Test @Order(1) public void testConnection() { - assertNotNull(connection, "Broker connection should not be null"); + assertNotNull(channel, "Broker connection should not be null"); + assertTrue(channel.isOpen(), "The channel should be open"); + } + + @Test + @Order(2) + public void registerConsumer() throws Exception { + assertNotNull(channel, "Broker connection should not be null"); + + final String message = "Hello RabbitMQ!"; + channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); + var response = channel.basicGet(QUEUE_NAME, true); + + assertEquals(new String(response.getBody(), StandardCharsets.UTF_8), message, "The message received should be the same as the message sent"); + } + + @Test + @Order(3) + public void testMultipleConsumers() throws Exception { + assertNotNull(channel, "Broker connection should not be null"); + + final String message = "Hello RabbitMQ!"; + final int numberOfConsumers = 3; + final CountDownLatch latch = new CountDownLatch(numberOfConsumers); + + // Create a consumer that counts down the latch + DeliverCallback deliverCallback = (consumerTag, delivery) -> { + String receivedMessage = new String(delivery.getBody(), StandardCharsets.UTF_8); + System.out.println(receivedMessage); + assertEquals(message, receivedMessage, "The message received should be the same as the message sent"); + latch.countDown(); + }; + + // Register multiple consumers + for (int i = 0; i < numberOfConsumers; i++) { + String uniqueQueueName = QUEUE_NAME+"_" + i; + channel.queueDeclare(uniqueQueueName, false, false, false, null); + channel.queueBind(uniqueQueueName, EXCHANGE_NAME, ""); + channel.basicConsume(uniqueQueueName, true, deliverCallback, consumerTag -> {}); + } + + // Publish a message to the queue + channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8)); - var result = connection.isOpen(); - assertTrue(result); + // Wait for the latch to count down + boolean allMessagesReceived = latch.await(5, TimeUnit.SECONDS); + assertTrue(allMessagesReceived, "All consumers should receive the message"); } }