Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exception in Consumer breaks subscription #441

Open
jmattheis opened this issue Oct 11, 2023 · 0 comments
Open

Exception in Consumer breaks subscription #441

jmattheis opened this issue Oct 11, 2023 · 0 comments

Comments

@jmattheis
Copy link

When an exception is thrown in the com.rabbitmq.client.Consumer#handleDelivery method, the consumer doesn't receive messages afterwards because the delivery thread is dead.

The test throwKillsConsumer, will throw an exception when receiving the first message. The second message won't be delivered. This was hand to debug because no exception is logged about the dying thread.

The test noThrowIsFine, delivers all messages as expected.

Click to expand

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.github.fridujo.rabbitmq.mock.MockConnectionFactory;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;

public class MockConnectionTest3 {

    private static final String EXCHANGE = "exchange";
    private static final String QUEUE = "queue";

    @Test
    void throwKillsConsumer() throws Exception {
        var factory = new MockConnectionFactory();

        var conn = factory.newConnection();
        var channel = conn.createChannel();
        channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare(QUEUE, false, true, true, Map.of());
        channel.queueBind(QUEUE, EXCHANGE, "");

        channel.basicPublish(EXCHANGE, "", new AMQP.BasicProperties(), "throw".getBytes());
        channel.basicPublish(EXCHANGE, "", new AMQP.BasicProperties(), "complete".getBytes());

        var queue = readFromQueue(conn);
        assertEquals("throw", queue.poll(1, TimeUnit.SECONDS));
        assertEquals("complete", queue.poll(1, TimeUnit.SECONDS));
    }

    @Test
    void noThrowIsFine() throws Exception {
        var factory = new MockConnectionFactory();

        var conn = factory.newConnection();
        var channel = conn.createChannel();
        channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare(QUEUE, false, true, true, Map.of());
        channel.queueBind(QUEUE, EXCHANGE, "");

        channel.basicPublish(EXCHANGE, "", new AMQP.BasicProperties(), "first".getBytes());
        channel.basicPublish(EXCHANGE, "", new AMQP.BasicProperties(), "complete".getBytes());

        var queue = readFromQueue(conn);
        assertEquals("first", queue.poll(1, TimeUnit.SECONDS));
        assertEquals("complete", queue.poll(1, TimeUnit.SECONDS));
    }

    ArrayBlockingQueue<String> readFromQueue(Connection conn) throws IOException {
        var channel = conn.createChannel();
        var queue = new ArrayBlockingQueue<String>(3);

        var consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                try {
                    queue.put(new String(body));
                } catch (InterruptedException e) {}

                if (new String(body).equals("throw")) {
                    throw new RuntimeException("oops");
                }
            }
        };

        channel.basicConsume(QUEUE, true, consumer);
        return queue;
    }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant