Skip to content

Commit

Permalink
#2528 add test
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangheng0027 committed Sep 28, 2023
1 parent c0ef590 commit 3506b5d
Showing 1 changed file with 129 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,45 @@

package org.springframework.amqp.rabbit.listener.adapter;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.utils.test.TestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import java.lang.reflect.Method;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;

import org.springframework.amqp.utils.test.TestUtils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;

/**
* @author Gary Russell
* @author heng zhang
*
* @since 3.0
*
*/
@SpringJUnitConfig
@RabbitAvailable(queues = "test.batchQueue")
public class BatchMessagingMessageListenerAdapterTests {

@Test
Expand All @@ -52,4 +76,104 @@ public void listen(String in) {
public void listen(List<String> in) {
}


@Test
public void errorMsgConvert(@Autowired BatchMessagingMessageListenerAdapterTests.Config config,
@Autowired RabbitTemplate template) throws Exception {

Message message = MessageBuilder.withBody("""
{
"name" : "Tom",
"age" : 18
}
""".getBytes()).andProperties(
MessagePropertiesBuilder.newInstance()
.setContentType("application/json")
.setReplyTo("nowhere")
.build())
.build();

Message errorMessage = MessageBuilder.withBody("".getBytes()).andProperties(
MessagePropertiesBuilder.newInstance()
.setContentType("application/json")
.setReplyTo("nowhere")
.build())
.build();

for (int i = 0; i < config.count; i++) {
template.send("test.batchQueue", message);
template.send("test.batchQueue", errorMessage);
}

assertThat(config.countDownLatch.await(config.count * 1000L, TimeUnit.SECONDS)).isTrue();
}



@Configuration
@EnableRabbit
public static class Config {
volatile int count = 5;
volatile CountDownLatch countDownLatch = new CountDownLatch(count);

@RabbitListener(
queues = "test.batchQueue",
containerFactory = "batchListenerContainerFactory"
)
public void listen(List<Model> list) {
for (Model model : list) {
countDownLatch.countDown();
}

}

@Bean
ConnectionFactory cf() {
return new CachingConnectionFactory(RabbitAvailableCondition.getBrokerRunning().getConnectionFactory());
}

@Bean(name = "batchListenerContainerFactory")
public RabbitListenerContainerFactory<SimpleMessageListenerContainer> rc(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPrefetchCount(1);
factory.setConcurrentConsumers(1);
factory.setBatchListener(true);
factory.setBatchSize(3);
factory.setConsumerBatchEnabled(true);

Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(new ObjectMapper());
factory.setMessageConverter(jackson2JsonMessageConverter);

return factory;
}

@Bean
RabbitTemplate template(ConnectionFactory cf) {
return new RabbitTemplate(cf);
}


}
public static class Model {
String name;
String age;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getAge() {
return age;
}

public void setAge(String age) {
this.age = age;
}
}

}

0 comments on commit 3506b5d

Please sign in to comment.