Skip to content

Commit

Permalink
Add rabbitmq test
Browse files Browse the repository at this point in the history
  • Loading branch information
Xharos committed May 18, 2024
1 parent ad43d01 commit d2da491
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,5 @@ public interface NetOutput {

void writeVarLong(long l) throws IOException;

byte[] getBuffer();
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ public ByteBuffer getByteBuffer() {
return buffer;
}

@Override
public byte[] getBuffer() {
return getByteBuffer().array();
}

@Override
public void writeBigInteger(BigInteger bInt) {
if (bInt == null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package fr.islandswars.commons.service.rabbitmq;

import com.rabbitmq.client.*;
import fr.islandswars.commons.network.NetInput;
import fr.islandswars.commons.network.nio.InputByteBuffer;

import java.io.IOException;
import java.nio.ByteBuffer;

/**
* File <b>ConsumerHandler</b> located on fr.islandswars.commons.service.rabbitmq
* ConsumerHandler is a part of commons.
* <p>
* Copyright (c) 2017 - 2024 Islands Wars.
* <p>
* commons is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
* <p>
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* <p>
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <a href="http://www.gnu.org/licenses/">GNU license</a>.
* <p>
*
* @author Jangliu, {@literal <[email protected]>}
* Created the 18/05/2024 at 21:50
* @since 0.1
*/
public abstract class ConsumerHandler implements Consumer {

protected final String id;
protected String tag;
protected Channel channel;

public ConsumerHandler(String queueName) {
this.id = queueName;
}

@Override
public void handleConsumeOk(String consumerTag) {

}

@Override
public void handleCancelOk(String consumerTag) {

}

@Override
public void handleCancel(String consumerTag) throws IOException {

}

@Override
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {

}

@Override
public void handleRecoverOk(String consumerTag) {

}

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
var buffer = new InputByteBuffer(ByteBuffer.wrap(body));
handleDelivery(envelope, buffer);
}

public abstract void handleDelivery(Envelope envelope, NetInput output);

}
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package fr.islandswars.commons.service.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.*;
import fr.islandswars.commons.network.NetOutput;
import fr.islandswars.commons.secrets.DockerSecretsLoader;
import fr.islandswars.commons.service.ServiceConnection;
import fr.islandswars.commons.service.ServiceType;
import fr.islandswars.commons.utils.LogUtils;
import fr.islandswars.commons.utils.Preconditions;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -96,4 +97,32 @@ public void load() throws NullPointerException {
factory.setHost(host);
factory.setPort(Integer.decode(port));
}

public void notifyTopic(NetOutput buffer, String queueName, String routingKey) throws Exception {
if (!isClosed()) {
var channel = getConnection();
try {
channel.queueDeclare(queueName, false, false, false, null);
channel.basicPublish("", queueName, null, buffer.getBuffer());
} catch (Exception e) {
LogUtils.error(e);
}
}
}

public void registerConsumer(String queueName, int prefetchCount) {
if (!isClosed()) {
var channel = getConnection();
try {
channel.queueDeclare(queueName, false, false, false, null);
channel.basicQos(prefetchCount);
channel.basicConsume(queueName, true, (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
}, consumerTag -> { });
} catch (IOException e) {
LogUtils.error(e);
}
}
}
}
90 changes: 80 additions & 10 deletions src/test/java/fr/islandswars/test/RabbitMQTest.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package fr.islandswars.test;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import fr.islandswars.commons.service.rabbitmq.RabbitMQConnection;
import org.junit.jupiter.api.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.*;

/**
* File <b>RabbitMQTest</b> located on fr.islandswars.test
Expand Down Expand Up @@ -37,28 +41,94 @@
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class RabbitMQTest {

private static RabbitMQConnection rabbitClient;
private static Channel connection;
private static final String QUEUE_NAME = "TEST";
private static final String EXCHANGE_NAME = "TEST_EXCHANGE";
private static final BuiltinExchangeType EXCHANGE_TYPE = BuiltinExchangeType.FANOUT;
private static RabbitMQConnection rabbitClient;
private static Channel channel;

@BeforeAll
public static void setup() throws IOException, TimeoutException {
public static void setupConnection() throws IOException, TimeoutException {
rabbitClient = new RabbitMQConnection();
rabbitClient.load();
rabbitClient.connect();
connection = rabbitClient.getConnection();
channel = rabbitClient.getConnection();
}

@AfterAll
public static void tearDown() throws Exception {
public static void closeConnection() throws Exception {
if (channel != null && channel.isOpen()) {
channel.close();
}
rabbitClient.close();
}

@BeforeEach
public void setup() throws Exception {
// Ensure the queue is declared before each test
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
}

@AfterEach
public void tearDown() throws Exception {
// Clean up the queue after each test
if (channel != null && channel.isOpen()) {
channel.queueDelete(QUEUE_NAME);
channel.exchangeDelete(EXCHANGE_NAME);
}
}

@Test
@Order(1)
public void testConnection() {
assertNotNull(connection, "Broker connection should not be null");
assertNotNull(channel, "Broker connection should not be null");
assertTrue(channel.isOpen(), "The channel should be open");
}

@Test
@Order(2)
public void registerConsumer() throws Exception {
assertNotNull(channel, "Broker connection should not be null");

final String message = "Hello RabbitMQ!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
var response = channel.basicGet(QUEUE_NAME, true);

assertEquals(new String(response.getBody(), StandardCharsets.UTF_8), message, "The message received should be the same as the message sent");
}

@Test
@Order(3)
public void testMultipleConsumers() throws Exception {
assertNotNull(channel, "Broker connection should not be null");

final String message = "Hello RabbitMQ!";
final int numberOfConsumers = 3;
final CountDownLatch latch = new CountDownLatch(numberOfConsumers);

// Create a consumer that counts down the latch
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String receivedMessage = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(receivedMessage);
assertEquals(message, receivedMessage, "The message received should be the same as the message sent");
latch.countDown();
};

// Register multiple consumers
for (int i = 0; i < numberOfConsumers; i++) {
String uniqueQueueName = QUEUE_NAME+"_" + i;
channel.queueDeclare(uniqueQueueName, false, false, false, null);
channel.queueBind(uniqueQueueName, EXCHANGE_NAME, "");
channel.basicConsume(uniqueQueueName, true, deliverCallback, consumerTag -> {});
}

// Publish a message to the queue
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));

var result = connection.isOpen();
assertTrue(result);
// Wait for the latch to count down
boolean allMessagesReceived = latch.await(5, TimeUnit.SECONDS);
assertTrue(allMessagesReceived, "All consumers should receive the message");
}
}

0 comments on commit d2da491

Please sign in to comment.