Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
yanhom1314 committed Jul 13, 2023
2 parents 974d972 + dbacde9 commit 13c00e3
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@

package org.dromara.dynamictp.adapter.rabbitmq;

import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.commons.collections4.MapUtils;
import org.dromara.dynamictp.adapter.common.AbstractDtpAdapter;
import org.dromara.dynamictp.common.spring.ApplicationContextHolder;
import org.dromara.dynamictp.common.properties.DtpProperties;
import org.dromara.dynamictp.common.spring.ApplicationContextHolder;
import org.dromara.dynamictp.common.util.ReflectionUtil;
import org.dromara.dynamictp.core.support.ExecutorWrapper;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.commons.collections4.MapUtils;
import org.springframework.amqp.rabbit.connection.AbstractConnectionFactory;

import java.util.Objects;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ExecutorService;

/**
* RabbitMqDtpAdapter related
Expand Down Expand Up @@ -59,7 +59,7 @@ protected void initialize() {
}
beans.forEach((k, v) -> {
AbstractConnectionFactory abstractConnectionFactory = (AbstractConnectionFactory) v;
ThreadPoolExecutor executor = (ThreadPoolExecutor) ReflectionUtil.getFieldValue(
ExecutorService executor = (ExecutorService) ReflectionUtil.getFieldValue(
AbstractConnectionFactory.class, CONSUME_EXECUTOR_FIELD_NAME, abstractConnectionFactory);

if (Objects.nonNull(executor)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
* @author fabian4
*/
Expand All @@ -40,22 +42,24 @@ public Queue queue() {
return new Queue("testQueue");
}

/**
* 往abstractConnectionFactory里面设置线程池
* 这里需要注意 配置文件 rabbitmqTp对应的threadPoolName 要与 RabbitMqDtpAdapter的genTpName方法生成的名字对上
*/
@Bean
public AbstractRabbitListenerContainerFactory<?> defaultRabbitListenerContainerFactory(AbstractConnectionFactory abstractConnectionFactory) {

public AbstractRabbitListenerContainerFactory<?> rabbitListenerContainerFactory(AbstractConnectionFactory abstractConnectionFactory) {
DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
factory.setMessageConverter(jackson2JsonMessageConverter());
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setMaxPoolSize(5);
executor.setCorePoolSize(5);
executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("RabbitMqTaskExecutor-");

executor.setCorePoolSize(2);
executor.setMaxPoolSize(4);
executor.setQueueCapacity(10);
executor.setThreadNamePrefix("rabbitConnectionFactoryTp");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();

DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
factory.setConnectionFactory(abstractConnectionFactory);
factory.setMessageConverter(jackson2JsonMessageConverter());
factory.setConsumersPerQueue(10);
abstractConnectionFactory.setExecutor(executor);
//factory.setConsumersPerQueue(10);
abstractConnectionFactory.setExecutor(executor.getThreadPoolExecutor());
return factory;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,14 @@
@RabbitListener(queues = "testQueue")
public class RabbitMqConsumer {

@RabbitListener(containerFactory = "rabbitListenerContainerFactory")
@RabbitHandler
public void process(String text) {
log.info("Receiver : " + text);
public void process(String text) throws InterruptedException {
Thread thread = Thread.currentThread();
String name = thread.getName();
long id = thread.getId();
//Thread.sleep(2000L);
//TimeUnit.SECONDS.sleep(1);
log.info("thread id :"+id+";thread name :"+name+" Receiver : " + text);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,34 @@ spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: admin
#virtual-host: /playlet-channel
#username: rabbit
#password: rabbit
#addresses: 111111

dynamic:
tp:
enabled: true
enabledBanner: true # 是否开启banner打印,默认true
enabledCollect: true # 是否开启监控指标采集,默认false
collectorTypes: internal_logging # 监控数据采集器类型(logging | micrometer | internal_logging),默认micrometer
monitorInterval: 5
monitorInterval: 5
rabbitmqTp: # rabbitmq 线程池配置 配置自行修改
- threadPoolName: rabbitConnectionFactoryTp
threadPoolAliasName: rabbit线程池
threadNamePrefix: rabbitExecutor
corePoolSize: 8
maximumPoolSize: 16
#单位秒
keepAliveTime: 60
notifyEnabled: true
notifyItems:
- type: change
enable: true
- type: liveness
enabled: true
threshold: 10
interval: 5
- type: capacity
enable: true
threshold: 10

0 comments on commit 13c00e3

Please sign in to comment.