Skip to content

Commit

Permalink
polish client bootstrap
Browse files Browse the repository at this point in the history
  • Loading branch information
ipipman committed Jul 14, 2024
1 parent 5f34d43 commit 67c8d6d
Show file tree
Hide file tree
Showing 12 changed files with 139 additions and 70 deletions.
10 changes: 10 additions & 0 deletions mq-client-spring-demo/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,13 @@ spring:

server:
port: 8081



mq:
client:
host: "127.0.0.1"
port: 8765
pool-max-total: 10
pool-max-idle: 5
pool-min-idle: 2
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package cn.ipman.mq.client.annotation;

import cn.ipman.mq.client.broker.MQListenerProcessor;
import cn.ipman.mq.client.config.MQClientConfig;
import cn.ipman.mq.client.config.MQClientBootstrapConfig;
import org.springframework.context.annotation.Import;

import java.lang.annotation.*;
Expand All @@ -16,7 +16,7 @@
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Inherited
@Import({MQClientConfig.class, MQListenerProcessor.class})
@Import({MQClientBootstrapConfig.class, MQListenerProcessor.class})
@SuppressWarnings("unused")
public @interface EnableMqMan {
}
42 changes: 20 additions & 22 deletions mq-client/src/main/java/cn/ipman/mq/client/broker/MQBroker.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@ public class MQBroker {
@Getter
public static MQBroker Default = new MQBroker();

/**
* 网络请求客户端
*/
public ClientService clientService = new NettyClientImpl("127.0.0.1", 6666);
public ClientService clientService;

public MQBroker() {
this.clientService = new NettyClientImpl(
"127.0.0.1", 8765, 10, 5, 2);
}

public MQBroker(ClientService clientService) {
this.clientService = clientService;
}

/**
* 消息代理服务的URL。
*/
public static String brokerUrl = "http://localhost:8765/mq";

/**
* 所有消费者的集合。
Expand All @@ -42,25 +44,21 @@ public class MQBroker {
/**
* 添加消费者到指定主题。
*
* @param topic 主题。
* @param consumer 消费者。
* @param topic 主题。
* @param consumer 消费者。
*/
public void addConsumer(String topic, MQConsumer<?> consumer) {
consumers.add(topic, consumer);
}

static {
init();
}

/**
* 初始化消息代理,启动定时任务轮询消费者以处理消息。
*/
public static void init() {
public void init() {
// 定时轮询消息队列,并调用监听器处理消息
ThreadUtils.getDefault().init(1);
ThreadUtils.getDefault().schedule(() -> {
MultiValueMap<String, MQConsumer<?>> consumers = getDefault().consumers;
// 遍历所有topic下的消费者, 分别取server端获取数据, 并调用监听器处理消息
consumers.forEach((topic, c) -> c.forEach(consumer -> {
// 消费数据
Expand Down Expand Up @@ -110,7 +108,7 @@ public MQConsumer<?> createConsumer(String topic, int customCid) {
/**
* 发送消息到指定主题。
*
* @param topic 消息主题。
* @param topic 消息主题。
* @param message 消息对象。
* @return 发送是否成功。
*/
Expand All @@ -121,7 +119,7 @@ public boolean send(String topic, Message<?> message) {
/**
* 订阅指定主题。
*
* @param topic 主题。
* @param topic 主题。
* @param consumerId 消费者ID。
*/
public void subscribe(String topic, String consumerId) {
Expand All @@ -131,7 +129,7 @@ public void subscribe(String topic, String consumerId) {
/**
* 接收指定主题的消息。
*
* @param topic 主题。
* @param topic 主题。
* @param consumerId 消费者ID。
* @return 消息对象。
*/
Expand All @@ -143,7 +141,7 @@ public <T> Message<T> receive(String topic, String consumerId) {
/**
* 取消订阅指定主题。
*
* @param topic 主题。
* @param topic 主题。
* @param consumerId 消费者ID。
*/
public void unSubscribe(String topic, String consumerId) {
Expand All @@ -153,9 +151,9 @@ public void unSubscribe(String topic, String consumerId) {
/**
* 确认消息消费。
*
* @param topic 主题。
* @param topic 主题。
* @param consumerId 消费者ID。
* @param offset 消息偏移量。
* @param offset 消息偏移量。
* @return 确认是否成功。
*/
public boolean ack(String topic, String consumerId, int offset) {
Expand All @@ -166,7 +164,7 @@ public boolean ack(String topic, String consumerId, int offset) {
/**
* 获取指定主题和消费者ID的统计信息。
*
* @param topic 主题。
* @param topic 主题。
* @param consumerId 消费者ID。
* @return 统计信息对象。
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,8 @@ public class NettyClientImpl implements ClientService {
int port;
NettyMQClientPool clientPool;

int maxTotal = 10;
int maxIdle = 5;
int minIdle = 2;

public NettyClientImpl(String host, int port) {
public NettyClientImpl(String host, int port, int maxTotal, int maxIdle, int minIdle) {
this.host = host;
this.port = port;
this.clientPool = new NettyMQClientPool(host, port, maxTotal, maxIdle, minIdle);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package cn.ipman.mq.client.config;

import cn.ipman.mq.client.broker.MQBroker;
import cn.ipman.mq.client.broker.MQListenerContainerFactory;
import cn.ipman.mq.client.broker.MQProducer;
import cn.ipman.mq.client.client.ClientService;
import cn.ipman.mq.client.client.netty.NettyClientImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

/**
* Description for this class
*
* @Author IpMan
* @Date 2024/7/14 08:47
*/
@Configuration
@Import({MQConfigProperties.class})
public class MQClientBootstrapConfig {

@Autowired
private MQConfigProperties mqConfigProperties;

@Bean
public ClientService clientService() {
return new NettyClientImpl(
mqConfigProperties.getHost(),
mqConfigProperties.getPort(),
mqConfigProperties.getPoolMaxTotal(),
mqConfigProperties.getPoolMaxIdle(),
mqConfigProperties.getPoolMinIdle()
);
}

@Bean(initMethod = "init")
public MQBroker brokerFactory(@Autowired ClientService clientService) {
return new MQBroker(clientService);
}

@Bean
public MQProducer producerFactory(@Autowired MQBroker broker) {
return broker.createProducer();
}

@Bean
public MQListenerContainerFactory listenerContainerFactory(@Autowired MQBroker broker) {
return new MQListenerContainerFactory(broker);
}

}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package cn.ipman.mq.client.config;

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
* Description for this class
*
* @Author IpMan
* @Date 2024/7/14 10:21
*/

@Data
@Configuration
@ConfigurationProperties(prefix = "mq.client")
public class MQConfigProperties {

private String host = "127.0.0.1";
private int port = 8765;
private int poolMaxTotal = 10;
private int poolMaxIdle = 5;
private int poolMinIdle = 2;

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,19 @@
public class NettyMQServer {

int port;
int bossThreads;
int workerThreads;

public NettyMQServer(int port) {
public NettyMQServer(int port, int bossThreads, int workerThreads) {
this.port = port;
this.bossThreads = bossThreads;
this.workerThreads = workerThreads;
}

public void run() throws Exception {

EventLoopGroup bossGroup = new NioEventLoopGroup(2);
EventLoopGroup workerGroup = new NioEventLoopGroup(16);
EventLoopGroup bossGroup = new NioEventLoopGroup(bossThreads);
EventLoopGroup workerGroup = new NioEventLoopGroup(workerThreads);

try {
ServerBootstrap b = new ServerBootstrap();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cn.ipman.mq.server.server;

import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
Expand All @@ -16,11 +17,20 @@
@Component
public class NettyServerBootstrap implements ApplicationListener<ApplicationEvent> {

@Value("${mq.server.port}")
private int serverPort;

@Value("${mq.server.boss.threads}")
private int bossThreads;

@Value("${mq.server.worker.threads}")
private int workerThreads;

@Override
public void onApplicationEvent(@NotNull ApplicationEvent event) {
if (event instanceof ApplicationReadyEvent) {
Thread thread = new Thread(() -> {
NettyMQServer server = new NettyMQServer(6666);
NettyMQServer server = new NettyMQServer(serverPort, bossThreads, workerThreads);
try {
server.run();
} catch (Exception e) {
Expand Down
10 changes: 9 additions & 1 deletion mq-server/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,12 @@ spring:
name: mq-server

server:
port: 8765
port: 9765

mq:
server:
port: 8765
boss:
threads: 2
worker:
threads: 16
Loading

0 comments on commit 67c8d6d

Please sign in to comment.