Skip to content

Commit

Permalink
support multiple file manager
Browse files Browse the repository at this point in the history
  • Loading branch information
ipipman committed Jul 7, 2024
1 parent 08d5bec commit 9a27b0e
Show file tree
Hide file tree
Showing 46 changed files with 1,622 additions and 338 deletions.
Binary file added cn.ipman.test/0.dat
Binary file not shown.
1 change: 1 addition & 0 deletions im.order/0.dat
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
0000000088{"body":"{\"id\":0,\"item\":\"item0\",\"price\":0.0}","headers":{"X-offset":"0"},"id":0}0000000091{"body":"{\"id\":1,\"item\":\"item1\",\"price\":100.0}","headers":{"X-offset":"98"},"id":1}0000000092{"body":"{\"id\":2,\"item\":\"item2\",\"price\":200.0}","headers":{"X-offset":"199"},"id":2}0000000092{"body":"{\"id\":3,\"item\":\"item3\",\"price\":300.0}","headers":{"X-offset":"301"},"id":3}0000000092{"body":"{\"id\":4,\"item\":\"item4\",\"price\":400.0}","headers":{"X-offset":"403"},"id":4}0000000092{"body":"{\"id\":5,\"item\":\"item5\",\"price\":500.0}","headers":{"X-offset":"505"},"id":5}0000000092{"body":"{\"id\":6,\"item\":\"item6\",\"price\":600.0}","headers":{"X-offset":"607"},"id":6}0000000092{"body":"{\"id\":7,\"item\":\"item7\",\"price\":700.0}","headers":{"X-offset":"709"},"id":7}0000000092{"body":"{\"id\":8,\"item\":\"item8\",\"price\":800.0}","headers":{"X-offset":"811"},"id":8}0000000092{"body":"{\"id\":9,\"item\":\"item9\",\"price\":900.0}","headers":{"X-offset":"913"},"id":9}0000000097{"body":"{\"id\":10,\"item\":\"item10\",\"price\":1000.0}","headers":{"X-offset":"1015"},"id":10}0000000097{"body":"{\"id\":11,\"item\":\"item11\",\"price\":1100.0}","headers":{"X-offset":"1122"},"id":11}0000000097{"body":"{\"id\":12,\"item\":\"item12\",\"price\":1200.0}","headers":{"X-offset":"1229"},"id":12}0000000097{"body":"{\"id\":13,\"item\":\"item13\",\"price\":1300.0}","headers":{"X-offset":"1336"},"id":13}0000000097{"body":"{\"id\":14,\"item\":\"item14\",\"price\":1400.0}","headers":{"X-offset":"1443"},"id":14}0000000097{"body":"{\"id\":15,\"item\":\"item15\",\"price\":1500.0}","headers":{"X-offset":"1550"},"id":15}0000000097{"body":"{\"id\":16,\"item\":\"item16\",\"price\":1600.0}","headers":{"X-offset":"1657"},"id":16}0000000097{"body":"{\"id\":17,\"item\":\"item17\",\"price\":1700.0}","headers":{"X-offset":"1764"},"id":17}0000000097{"body":"{\"id\":18,\"item\":\"item18\",\"price\":1800.0}","headers":{"X-offset":"1871"},"id":18}0000000097{"body":"{\"id\":19,\"item\":\"item19\",\"price\":1900.0}","headers":{"X-offset":"1978"},"id":19}0000000097{"body":"{\"id\":20,\"item\":\"item20\",\"price\":2000.0}","headers":{"X-offset":"2085"},"id":20}0000000097{"body":"{\"id\":21,\"item\":\"item21\",\"price\":2100.0}","headers":{"X-offset":"2192"},"id":21}0000000097{"body":"{\"id\":22,\"item\":\"item22\",\"price\":2200.0}","headers":{"X-offset":"2299"},"id":22}0000000097{"body":"{\"id\":23,\"item\":\"item23\",\"price\":2300.0}","headers":{"X-offset":"2406"},"id":23}0000000097{"body":"{\"id\":24,\"item\":\"item24\",\"price\":2400.0}","headers":{"X-offset":"2513"},"id":24}0000000097{"body":"{\"id\":25,\"item\":\"item25\",\"price\":2500.0}","headers":{"X-offset":"2620"},"id":25}0000000097{"body":"{\"id\":26,\"item\":\"item26\",\"price\":2600.0}","headers":{"X-offset":"2727"},"id":26}0000000097{"body":"{\"id\":27,\"item\":\"item27\",\"price\":2700.0}","headers":{"X-offset":"2834"},"id":27}0000000097{"body":"{\"id\":28,\"item\":\"item28\",\"price\":2800.0}","headers":{"X-offset":"2941"},"id":28}0000000097{"body":"{\"id\":29,\"item\":\"item29\",\"price\":2900.0}","headers":{"X-offset":"3048"},"id":29}0000000097{"body":"{\"id\":30,\"item\":\"item30\",\"price\":3000.0}","headers":{"X-offset":"3155"},"id":30}0000000097{"body":"{\"id\":31,\"item\":\"item31\",\"price\":3100.0}","headers":{"X-offset":"3262"},"id":31}0000000097{"body":"{\"id\":32,\"item\":\"item32\",\"price\":3200.0}","headers":{"X-offset":"3369"},"id":32}0000000097{"body":"{\"id\":33,\"item\":\"item33\",\"price\":3300.0}","headers":{"X-offset":"3476"},"id":33}0000000097{"body":"{\"id\":34,\"item\":\"item34\",\"price\":3400.0}","headers":{"X-offset":"3583"},"id":34}0000000097{"body":"{\"id\":35,\"item\":\"item35\",\"price\":3500.0}","headers":{"X-offset":"3690"},"id":35}0000000097{"body":"{\"id\":36,\"item\":\"item36\",\"price\":3600.0}","headers":{"X-offset":"3797"},"id":36}0000000097{"body":"{\"id\":37,\"item\":\"item37\",\"price\":3700.0}","headers":{"X-offset":"3904"},"id":37}0000000097{"body":"{\"id\":38,\"item\":\"item38\",\"price\":3800.0}","headers":{"X-offset":"4011"},"id":38}0000000097{"body":"{\"id\":39,\"item\":\"item39\",\"price\":3900.0}","headers":{"X-offset":"4118"},"id":39}0000000097{"body":"{\"id\":40,\"item\":\"item40\",\"price\":4000.0}","headers":{"X-offset":"4225"},"id":40}0000000097{"body":"{\"id\":41,\"item\":\"item41\",\"price\":4100.0}","headers":{"X-offset":"4332"},"id":41}0000000097{"body":"{\"id\":42,\"item\":\"item42\",\"price\":4200.0}","headers":{"X-offset":"4439"},"id":42}0000000097{"body":"{\"id\":43,\"item\":\"item43\",\"price\":4300.0}","headers":{"X-offset":"4546"},"id":43}0000000097{"body":"{\"id\":44,\"item\":\"item44\",\"price\":4400.0}","headers":{"X-offset":"4653"},"id":44}0000000097{"body":"{\"id\":45,\"item\":\"item45\",\"price\":4500.0}","headers":{"X-offset":"4760"},"id":45}0000000097{"body":"{\"id\":46,\"item\":\"item46\",\"price\":4600.0}","headers":{"X-offset":"4867"},"id":46}0000000097{"body":"{\"id\":47,\"item\":\"item47\",\"price\":4700.0}","headers":{"X-offset":"4974"},"id":47}0000000097{"body":"{\"id\":48,\"item\":\"item48\",\"price\":4800.0}","headers":{"X-offset":"5081"},"id":48}0000000097{"body":"{\"id\":49,\"item\":\"item49\",\"price\":4900.0}","headers":{"X-offset":"5188"},"id":49}0000000097{"body":"{\"id\":50,\"item\":\"item50\",\"price\":5000.0}","headers":{"X-offset":"5295"},"id":50}0000000097{"body":"{\"id\":51,\"item\":\"item51\",\"price\":5100.0}","headers":{"X-offset":"5402"},"id":51}0000000097{"body":"{\"id\":52,\"item\":\"item52\",\"price\":5200.0}","headers":{"X-offset":"5509"},"id":52}0000000097{"body":"{\"id\":53,\"item\":\"item53\",\"price\":5300.0}","headers":{"X-offset":"5616"},"id":53}0000000097{"body":"{\"id\":54,\"item\":\"item54\",\"price\":5400.0}","headers":{"X-offset":"5723"},"id":54}0000000097{"body":"{\"id\":55,\"item\":\"item55\",\"price\":5500.0}","headers":{"X-offset":"5830"},"id":55}0000000097{"body":"{\"id\":56,\"item\":\"item56\",\"price\":5600.0}","headers":{"X-offset":"5937"},"id":56}0000000097{"body":"{\"id\":57,\"item\":\"item57\",\"price\":5700.0}","headers":{"X-offset":"6044"},"id":57}0000000097{"body":"{\"id\":58,\"item\":\"item58\",\"price\":5800.0}","headers":{"X-offset":"6151"},"id":58}0000000097{"body":"{\"id\":59,\"item\":\"item59\",\"price\":5900.0}","headers":{"X-offset":"6258"},"id":59}0000000097{"body":"{\"id\":60,\"item\":\"item60\",\"price\":6000.0}","headers":{"X-offset":"6365"},"id":60}0000000097{"body":"{\"id\":61,\"item\":\"item61\",\"price\":6100.0}","headers":{"X-offset":"6472"},"id":61}0000000097{"body":"{\"id\":62,\"item\":\"item62\",\"price\":6200.0}","headers":{"X-offset":"6579"},"id":62}0000000097{"body":"{\"id\":63,\"item\":\"item63\",\"price\":6300.0}","headers":{"X-offset":"6686"},"id":63}0000000097{"body":"{\"id\":64,\"item\":\"item64\",\"price\":6400.0}","headers":{"X-offset":"6793"},"id":64}0000000097{"body":"{\"id\":65,\"item\":\"item65\",\"price\":6500.0}","headers":{"X-offset":"6900"},"id":65}0000000097{"body":"{\"id\":66,\"item\":\"item66\",\"price\":6600.0}","headers":{"X-offset":"7007"},"id":66}0000000097{"body":"{\"id\":67,\"item\":\"item67\",\"price\":6700.0}","headers":{"X-offset":"7114"},"id":67}0000000097{"body":"{\"id\":68,\"item\":\"item68\",\"price\":6800.0}","headers":{"X-offset":"7221"},"id":68}0000000097{"body":"{\"id\":69,\"item\":\"item69\",\"price\":6900.0}","headers":{"X-offset":"7328"},"id":69}0000000097{"body":"{\"id\":70,\"item\":\"item70\",\"price\":7000.0}","headers":{"X-offset":"7435"},"id":70}0000000097{"body":"{\"id\":71,\"item\":\"item71\",\"price\":7100.0}","headers":{"X-offset":"7542"},"id":71}0000000097{"body":"{\"id\":72,\"item\":\"item72\",\"price\":7200.0}","headers":{"X-offset":"7649"},"id":72}0000000097{"body":"{\"id\":73,\"item\":\"item73\",\"price\":7300.0}","headers":{"X-offset":"7756"},"id":73}0000000097{"body":"{\"id\":74,\"item\":\"item74\",\"price\":7400.0}","headers":{"X-offset":"7863"},"id":74}0000000097{"body":"{\"id\":75,\"item\":\"item75\",\"price\":7500.0}","headers":{"X-offset":"7970"},"id":75}0000000097{"body":"{\"id\":76,\"item\":\"item76\",\"price\":7600.0}","headers":{"X-offset":"8077"},"id":76}0000000097{"body":"{\"id\":77,\"item\":\"item77\",\"price\":7700.0}","headers":{"X-offset":"8184"},"id":77}0000000097{"body":"{\"id\":78,\"item\":\"item78\",\"price\":7800.0}","headers":{"X-offset":"8291"},"id":78}0000000097{"body":"{\"id\":79,\"item\":\"item79\",\"price\":7900.0}","headers":{"X-offset":"8398"},"id":79}0000000097{"body":"{\"id\":80,\"item\":\"item80\",\"price\":8000.0}","headers":{"X-offset":"8505"},"id":80}0000000097{"body":"{\"id\":81,\"item\":\"item81\",\"price\":8100.0}","headers":{"X-offset":"8612"},"id":81}0000000097{"body":"{\"id\":82,\"item\":\"item82\",\"price\":8200.0}","headers":{"X-offset":"8719"},"id":82}0000000097{"body":"{\"id\":83,\"item\":\"item83\",\"price\":8300.0}","headers":{"X-offset":"8826"},"id":83}0000000097{"body":"{\"id\":84,\"item\":\"item84\",\"price\":8400.0}","headers":{"X-offset":"8933"},"id":84}0000000097{"body":"{\"id\":85,\"item\":\"item85\",\"price\":8500.0}","headers":{"X-offset":"9040"},"id":85}0000000097{"body":"{\"id\":86,\"item\":\"item86\",\"price\":8600.0}","headers":{"X-offset":"9147"},"id":86}0000000097{"body":"{\"id\":87,\"item\":\"item87\",\"price\":8700.0}","headers":{"X-offset":"9254"},"id":87}0000000097{"body":"{\"id\":88,\"item\":\"item88\",\"price\":8800.0}","headers":{"X-offset":"9361"},"id":88}0000000097{"body":"{\"id\":89,\"item\":\"item89\",\"price\":8900.0}","headers":{"X-offset":"9468"},"id":89}0000000097{"body":"{\"id\":90,\"item\":\"item90\",\"price\":9000.0}","headers":{"X-offset":"9575"},"id":90}0000000097{"body":"{\"id\":91,\"item\":\"item91\",\"price\":9100.0}","headers":{"X-offset":"9682"},"id":91}0000000097{"body":"{\"id\":92,\"item\":\"item92\",\"price\":9200.0}","headers":{"X-offset":"9789"},"id":92}0000000097{"body":"{\"id\":93,\"item\":\"item93\",\"price\":9300.0}","headers":{"X-offset":"9896"},"id":93}0000000098{"body":"{\"id\":94,\"item\":\"item94\",\"price\":9400.0}","headers":{"X-offset":"10003"},"id":94}0000000098{"body":"{\"id\":95,\"item\":\"item95\",\"price\":9500.0}","headers":{"X-offset":"10111"},"id":95}
Binary file added im.order/1.dat
Binary file not shown.
12 changes: 12 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@
<version>2.0.12</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.68.Final</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.11.1</version>
</dependency>
</dependencies>

<build>
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/cn/ipman/mq/annotation/MQListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package cn.ipman.mq.annotation;

import java.lang.annotation.*;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Documented
@Inherited
public @interface MQListener {
String[] topic();
}
82 changes: 27 additions & 55 deletions src/main/java/cn/ipman/mq/client/Broker.java → ...ain/java/cn/ipman/mq/broker/MQBroker.java
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package cn.ipman.mq.client;
package cn.ipman.mq.broker;

import cn.ipman.mq.client.ClientService;
import cn.ipman.mq.client.netty.NettyClientImpl;
import cn.ipman.mq.model.Message;
import cn.ipman.mq.model.Result;
import cn.ipman.mq.model.Statistical;
import cn.ipman.mq.utils.HttpUtils;
import cn.ipman.mq.utils.ThreadUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import lombok.Getter;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
Expand All @@ -18,13 +16,18 @@
* @Author IpMan
* @Date 2024/6/29 19:44
*/
public class Broker {
public class MQBroker {

/**
* 默认的消息代理实例。
*/
@Getter
public static Broker Default = new Broker();
public static MQBroker Default = new MQBroker();

/**
* 网络请求客户端
*/
public ClientService clientService = new NettyClientImpl("127.0.0.1", 6666);

/**
* 消息代理服务的URL。
Expand All @@ -34,15 +37,15 @@ public class Broker {
/**
* 所有消费者的集合。
*/
final MultiValueMap<String, Consumer<?>> consumers = new LinkedMultiValueMap<>();
final MultiValueMap<String, MQConsumer<?>> consumers = new LinkedMultiValueMap<>();

/**
* 添加消费者到指定主题。
*
* @param topic 主题。
* @param consumer 消费者。
*/
public void addConsumer(String topic, Consumer<?> consumer) {
public void addConsumer(String topic, MQConsumer<?> consumer) {
consumers.add(topic, consumer);
}

Expand All @@ -57,17 +60,18 @@ public static void init() {
// 定时轮询消息队列,并调用监听器处理消息
ThreadUtils.getDefault().init(1);
ThreadUtils.getDefault().schedule(() -> {
MultiValueMap<String, Consumer<?>> consumers = getDefault().consumers;
MultiValueMap<String, MQConsumer<?>> consumers = getDefault().consumers;
// 遍历所有topic下的消费者, 分别取server端获取数据, 并调用监听器处理消息
consumers.forEach((topic, c) -> c.forEach(consumer -> {
// 消费数据
Message<?> receive = consumer.receive(topic);
if (receive == null) return;
try {
// 通知监听器处理消息
consumer.listener.onMessage(receive);
consumer.ack(topic, receive);
} catch (Exception e) {
//todo
//todo retry
}
}));
}, 100, 100);
Expand All @@ -79,8 +83,8 @@ public static void init() {
*
* @return 生产者实例。
*/
public Producer createProducer() {
return new Producer(this);
public MQProducer createProducer() {
return new MQProducer(this);
}

/**
Expand All @@ -90,20 +94,19 @@ public Producer createProducer() {
* @return 消费者实例。
*/
@SuppressWarnings("unused")
public Consumer<?> createConsumer(String topic) {
Consumer<?> consumer = new Consumer<>(this);
public MQConsumer<?> createConsumer(String topic) {
MQConsumer<?> consumer = new MQConsumer<>(this);
consumer.subscribe(topic);
return consumer;
}

public Consumer<?> createConsumer(String topic, int customCid) {
Consumer<?> consumer = new Consumer<>(this, customCid);
public MQConsumer<?> createConsumer(String topic, int customCid) {
MQConsumer<?> consumer = new MQConsumer<>(this, customCid);
consumer.subscribe(topic);
return consumer;
}



/**
* 发送消息到指定主题。
*
Expand All @@ -112,13 +115,7 @@ public Consumer<?> createConsumer(String topic, int customCid) {
* @return 发送是否成功。
*/
public boolean send(String topic, Message<?> message) {
System.out.println(" ==>> send topic/message: " + topic + "/" + message);
System.out.println(JSON.toJSONString(message));
Result<String> result = HttpUtils.httpPost(JSON.toJSONString(message),
brokerUrl + "/send?t=" + topic, new TypeReference<Result<String>>() {
});
System.out.println(" ==>> send result: " + result);
return result.getCode() == 1;
return clientService.send(topic, message);
}

/**
Expand All @@ -128,11 +125,7 @@ public boolean send(String topic, Message<?> message) {
* @param consumerId 消费者ID。
*/
public void subscribe(String topic, String consumerId) {
System.out.println(" ==>> subscribe topic/consumerID: " + topic + "/" + consumerId);
Result<String> result = HttpUtils.httpGet(brokerUrl + "/sub?t=" + topic + "&cid=" + consumerId,
new TypeReference<Result<String>>() {
});
System.out.println(" ==>> subscribe result: " + result);
clientService.subscribe(topic, consumerId);
}

/**
Expand All @@ -144,12 +137,7 @@ public void subscribe(String topic, String consumerId) {
*/
@SuppressWarnings("unchecked")
public <T> Message<T> receive(String topic, String consumerId) {
System.out.println(" ==>> receive topic/cid: " + topic + "/" + consumerId);
Result<Message<String>> result = HttpUtils.httpGet(brokerUrl + "/receive?t=" + topic + "&cid=" + consumerId,
new TypeReference<Result<Message<String>>>() {
});
System.out.println(" ==>> receive result: " + result);
return (Message<T>) result.getData();
return clientService.receive(topic, consumerId);
}

/**
Expand All @@ -159,11 +147,7 @@ public <T> Message<T> receive(String topic, String consumerId) {
* @param consumerId 消费者ID。
*/
public void unSubscribe(String topic, String consumerId) {
System.out.println(" ==>> unSubscribe topic/cid: " + topic + "/" + consumerId);
Result<String> result = HttpUtils.httpGet(brokerUrl + "/unsub?t=" + topic + "&cid=" + consumerId,
new TypeReference<Result<String>>() {
});
System.out.println(" ==>> unSubscribe result: " + result);
clientService.unSubscribe(topic, consumerId);
}

/**
Expand All @@ -175,13 +159,7 @@ public void unSubscribe(String topic, String consumerId) {
* @return 确认是否成功。
*/
public boolean ack(String topic, String consumerId, int offset) {
System.out.println(" ==>> ack topic/cid/offset: " + topic + "/" + consumerId + "/" + offset);
Result<String> result = HttpUtils.httpGet(
brokerUrl + "/ack?t=" + topic + "&cid=" + consumerId + "&offset=" + offset,
new TypeReference<Result<String>>() {
});
System.out.println(" ==>> ack result: " + result);
return result.getCode() == 1;
return clientService.ack(topic, consumerId, offset);
}


Expand All @@ -193,12 +171,6 @@ public boolean ack(String topic, String consumerId, int offset) {
* @return 统计信息对象。
*/
public Statistical statistical(String topic, String consumerId) {
System.out.println(" ==>> statistical topic/cid: " + topic + "/" + consumerId);
Result<Statistical> result = HttpUtils.httpGet(
brokerUrl + "/stat?t=" + topic + "&cid=" + consumerId,
new TypeReference<Result<Statistical>>() {
});
System.out.println(" ==>> statistical result: " + result);
return result.getData();
return clientService.statistical(topic, consumerId);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package cn.ipman.mq.client;
package cn.ipman.mq.broker;

import cn.ipman.mq.model.Message;
import cn.ipman.mq.model.Statistical;
Expand All @@ -15,7 +15,7 @@
* @Author IpMan
* @Date 2024/6/29 19:55
*/
public class Consumer<T> {
public class MQConsumer<T> {

/**
* 使用AtomicInteger来全局唯一标识消费者
Expand All @@ -30,7 +30,7 @@ public class Consumer<T> {
/**
* 与之交互的Broker实例
*/
Broker broker;
MQBroker broker;

/**
* Consumer构造函数。
Expand All @@ -39,13 +39,13 @@ public class Consumer<T> {
*
* @param broker 与消费者交互的Broker实例。
*/
public Consumer(Broker broker) {
public MQConsumer(MQBroker broker) {
this.broker = broker;
this.id = "CID" + CID.getAndIncrement();
}


public Consumer(Broker broker, int customCid) {
public MQConsumer(MQBroker broker, int customCid) {
this.broker = broker;
this.id = "CID" + customCid;
}
Expand Down Expand Up @@ -119,7 +119,7 @@ public boolean ack(String topic, Message<?> message) {
/**
* 消息监听器,用于异步处理接收到的消息
*/
public Listener<?> listener;
public MQListener<?> listener;

/**
* 注册消息监听器。
Expand All @@ -129,7 +129,7 @@ public boolean ack(String topic, Message<?> message) {
* @param topic 监听的消息主题。
* @param listener 用于处理消息的监听器实例。
*/
public void listen(String topic, Listener<?> listener) {
public void addListen(String topic, MQListener<?> listener) {
this.listener = listener;
broker.addConsumer(topic, this);
}
Expand Down
Loading

0 comments on commit 9a27b0e

Please sign in to comment.