From 9a27b0e07943efb918a61163f79af1baba57ccb8 Mon Sep 17 00:00:00 2001 From: ipipman Date: Mon, 8 Jul 2024 04:44:06 +0800 Subject: [PATCH] support multiple file manager --- cn.ipman.test/0.dat | Bin 0 -> 10240 bytes im.order/0.dat | 1 + im.order/1.dat | Bin 0 -> 10240 bytes pom.xml | 12 ++ .../cn/ipman/mq/annotation/MQListener.java | 11 + .../Broker.java => broker/MQBroker.java} | 82 +++----- .../Consumer.java => broker/MQConsumer.java} | 14 +- .../Listener.java => broker/MQListener.java} | 11 +- .../mq/broker/MQListenerContainerFactory.java | 38 ++++ .../ipman/mq/broker/MQListenerEndpoint.java | 23 +++ .../ipman/mq/broker/MQListenerProcessor.java | 57 +++++ .../Producer.java => broker/MQProducer.java} | 8 +- .../cn/ipman/mq/client/ClientService.java | 69 +++++++ .../ipman/mq/client/http/HttpClientImpl.java | 128 ++++++++++++ .../mq/client/netty/NettyClientImpl.java | 161 +++++++++++++++ .../ipman/mq/client/netty/NettyMQClient.java | 100 +++++++++ .../mq/client/netty/NettyMQClientFactory.java | 50 +++++ .../mq/client/netty/NettyMQClientHandler.java | 54 +++++ .../mq/client/netty/NettyMQClientPool.java | 37 ++++ .../java/cn/ipman/mq/config/BrokerConfig.java | 21 ++ .../cn/ipman/mq/config/ConsumerConfig.java | 24 +++ .../cn/ipman/mq/config/ProducerConfig.java | 25 +++ .../java/cn/ipman/mq/demo/ConsumerDemo1.java | 18 +- .../java/cn/ipman/mq/demo/ConsumerDemo2.java | 33 +-- .../java/cn/ipman/mq/demo/ConsumerDemo3.java | 14 +- .../cn/ipman/mq/demo/NettyClientDemo.java | 194 ++++++++++++++++++ src/main/java/cn/ipman/mq/demo/Order.java | 0 .../mq/demo/SpringAnnotationListenerDemo.java | 23 +++ .../java/cn/ipman/mq/model/Constants.java | 17 ++ .../java/cn/ipman/mq/model/HttpResult.java | 44 ++++ src/main/java/cn/ipman/mq/model/Message.java | 0 .../java/cn/ipman/mq/model/NettyRequest.java | 21 ++ .../java/cn/ipman/mq/model/NettyResponse.java | 15 ++ src/main/java/cn/ipman/mq/model/Result.java | 44 ---- .../java/cn/ipman/mq/model/Statistical.java | 0 .../java/cn/ipman/mq/model/Subscription.java | 0 .../java/cn/ipman/mq/server/MQServer.java | 66 +++--- .../java/cn/ipman/mq/server/MessageQueue.java | 91 ++++---- .../cn/ipman/mq/server/NettyMQServer.java | 75 +++++++ .../ipman/mq/server/NettyMQServerHandler.java | 113 ++++++++++ .../ipman/mq/server/NettyServerBootstrap.java | 33 +++ src/main/java/cn/ipman/mq/store/Indexer.java | 54 +---- .../java/cn/ipman/mq/store/MessageStore.java | 177 ++++++++-------- .../java/cn/ipman/mq/store/StoreDemo.java | 2 +- .../java/cn/ipman/mq/utils/HttpUtils.java | 0 .../java/cn/ipman/mq/utils/ThreadUtils.java | 0 46 files changed, 1622 insertions(+), 338 deletions(-) create mode 100644 cn.ipman.test/0.dat create mode 100644 im.order/0.dat create mode 100644 im.order/1.dat create mode 100644 src/main/java/cn/ipman/mq/annotation/MQListener.java rename src/main/java/cn/ipman/mq/{client/Broker.java => broker/MQBroker.java} (53%) mode change 100644 => 100755 rename src/main/java/cn/ipman/mq/{client/Consumer.java => broker/MQConsumer.java} (93%) mode change 100644 => 100755 rename src/main/java/cn/ipman/mq/{client/Listener.java => broker/MQListener.java} (75%) mode change 100644 => 100755 create mode 100644 src/main/java/cn/ipman/mq/broker/MQListenerContainerFactory.java create mode 100644 src/main/java/cn/ipman/mq/broker/MQListenerEndpoint.java create mode 100644 src/main/java/cn/ipman/mq/broker/MQListenerProcessor.java rename src/main/java/cn/ipman/mq/{client/Producer.java => broker/MQProducer.java} (88%) mode change 100644 => 100755 create mode 100644 src/main/java/cn/ipman/mq/client/ClientService.java create mode 100644 src/main/java/cn/ipman/mq/client/http/HttpClientImpl.java create mode 100644 src/main/java/cn/ipman/mq/client/netty/NettyClientImpl.java create mode 100644 src/main/java/cn/ipman/mq/client/netty/NettyMQClient.java create mode 100644 src/main/java/cn/ipman/mq/client/netty/NettyMQClientFactory.java create mode 100644 src/main/java/cn/ipman/mq/client/netty/NettyMQClientHandler.java create mode 100644 src/main/java/cn/ipman/mq/client/netty/NettyMQClientPool.java create mode 100644 src/main/java/cn/ipman/mq/config/BrokerConfig.java create mode 100644 src/main/java/cn/ipman/mq/config/ConsumerConfig.java create mode 100644 src/main/java/cn/ipman/mq/config/ProducerConfig.java mode change 100644 => 100755 src/main/java/cn/ipman/mq/demo/ConsumerDemo1.java mode change 100644 => 100755 src/main/java/cn/ipman/mq/demo/ConsumerDemo2.java mode change 100644 => 100755 src/main/java/cn/ipman/mq/demo/ConsumerDemo3.java create mode 100644 src/main/java/cn/ipman/mq/demo/NettyClientDemo.java mode change 100644 => 100755 src/main/java/cn/ipman/mq/demo/Order.java create mode 100644 src/main/java/cn/ipman/mq/demo/SpringAnnotationListenerDemo.java create mode 100644 src/main/java/cn/ipman/mq/model/Constants.java create mode 100755 src/main/java/cn/ipman/mq/model/HttpResult.java mode change 100644 => 100755 src/main/java/cn/ipman/mq/model/Message.java create mode 100644 src/main/java/cn/ipman/mq/model/NettyRequest.java create mode 100644 src/main/java/cn/ipman/mq/model/NettyResponse.java delete mode 100644 src/main/java/cn/ipman/mq/model/Result.java mode change 100644 => 100755 src/main/java/cn/ipman/mq/model/Statistical.java mode change 100644 => 100755 src/main/java/cn/ipman/mq/model/Subscription.java mode change 100644 => 100755 src/main/java/cn/ipman/mq/server/MQServer.java mode change 100644 => 100755 src/main/java/cn/ipman/mq/server/MessageQueue.java create mode 100644 src/main/java/cn/ipman/mq/server/NettyMQServer.java create mode 100644 src/main/java/cn/ipman/mq/server/NettyMQServerHandler.java create mode 100644 src/main/java/cn/ipman/mq/server/NettyServerBootstrap.java mode change 100644 => 100755 src/main/java/cn/ipman/mq/store/Indexer.java mode change 100644 => 100755 src/main/java/cn/ipman/mq/store/MessageStore.java mode change 100644 => 100755 src/main/java/cn/ipman/mq/store/StoreDemo.java mode change 100644 => 100755 src/main/java/cn/ipman/mq/utils/HttpUtils.java mode change 100644 => 100755 src/main/java/cn/ipman/mq/utils/ThreadUtils.java diff --git a/cn.ipman.test/0.dat b/cn.ipman.test/0.dat new file mode 100644 index 0000000000000000000000000000000000000000..9df64990f7be3c1f7194a0c22852a1ab3a09f3c5 GIT binary patch literal 10240 zcmeIu0Sy2E0K%a6Pi+o2h(KY$fB^#r3>YwAz<>b*1`HT5V8DO@0|pEjFkrxd0RsjM P7%*VKfB^#r47?2tC;$Kf literal 0 HcmV?d00001 diff --git a/im.order/0.dat b/im.order/0.dat new file mode 100644 index 0000000..3ca459e --- /dev/null +++ b/im.order/0.dat @@ -0,0 +1 @@ +0000000088{"body":"{\"id\":0,\"item\":\"item0\",\"price\":0.0}","headers":{"X-offset":"0"},"id":0}0000000091{"body":"{\"id\":1,\"item\":\"item1\",\"price\":100.0}","headers":{"X-offset":"98"},"id":1}0000000092{"body":"{\"id\":2,\"item\":\"item2\",\"price\":200.0}","headers":{"X-offset":"199"},"id":2}0000000092{"body":"{\"id\":3,\"item\":\"item3\",\"price\":300.0}","headers":{"X-offset":"301"},"id":3}0000000092{"body":"{\"id\":4,\"item\":\"item4\",\"price\":400.0}","headers":{"X-offset":"403"},"id":4}0000000092{"body":"{\"id\":5,\"item\":\"item5\",\"price\":500.0}","headers":{"X-offset":"505"},"id":5}0000000092{"body":"{\"id\":6,\"item\":\"item6\",\"price\":600.0}","headers":{"X-offset":"607"},"id":6}0000000092{"body":"{\"id\":7,\"item\":\"item7\",\"price\":700.0}","headers":{"X-offset":"709"},"id":7}0000000092{"body":"{\"id\":8,\"item\":\"item8\",\"price\":800.0}","headers":{"X-offset":"811"},"id":8}0000000092{"body":"{\"id\":9,\"item\":\"item9\",\"price\":900.0}","headers":{"X-offset":"913"},"id":9}0000000097{"body":"{\"id\":10,\"item\":\"item10\",\"price\":1000.0}","headers":{"X-offset":"1015"},"id":10}0000000097{"body":"{\"id\":11,\"item\":\"item11\",\"price\":1100.0}","headers":{"X-offset":"1122"},"id":11}0000000097{"body":"{\"id\":12,\"item\":\"item12\",\"price\":1200.0}","headers":{"X-offset":"1229"},"id":12}0000000097{"body":"{\"id\":13,\"item\":\"item13\",\"price\":1300.0}","headers":{"X-offset":"1336"},"id":13}0000000097{"body":"{\"id\":14,\"item\":\"item14\",\"price\":1400.0}","headers":{"X-offset":"1443"},"id":14}0000000097{"body":"{\"id\":15,\"item\":\"item15\",\"price\":1500.0}","headers":{"X-offset":"1550"},"id":15}0000000097{"body":"{\"id\":16,\"item\":\"item16\",\"price\":1600.0}","headers":{"X-offset":"1657"},"id":16}0000000097{"body":"{\"id\":17,\"item\":\"item17\",\"price\":1700.0}","headers":{"X-offset":"1764"},"id":17}0000000097{"body":"{\"id\":18,\"item\":\"item18\",\"price\":1800.0}","headers":{"X-offset":"1871"},"id":18}0000000097{"body":"{\"id\":19,\"item\":\"item19\",\"price\":1900.0}","headers":{"X-offset":"1978"},"id":19}0000000097{"body":"{\"id\":20,\"item\":\"item20\",\"price\":2000.0}","headers":{"X-offset":"2085"},"id":20}0000000097{"body":"{\"id\":21,\"item\":\"item21\",\"price\":2100.0}","headers":{"X-offset":"2192"},"id":21}0000000097{"body":"{\"id\":22,\"item\":\"item22\",\"price\":2200.0}","headers":{"X-offset":"2299"},"id":22}0000000097{"body":"{\"id\":23,\"item\":\"item23\",\"price\":2300.0}","headers":{"X-offset":"2406"},"id":23}0000000097{"body":"{\"id\":24,\"item\":\"item24\",\"price\":2400.0}","headers":{"X-offset":"2513"},"id":24}0000000097{"body":"{\"id\":25,\"item\":\"item25\",\"price\":2500.0}","headers":{"X-offset":"2620"},"id":25}0000000097{"body":"{\"id\":26,\"item\":\"item26\",\"price\":2600.0}","headers":{"X-offset":"2727"},"id":26}0000000097{"body":"{\"id\":27,\"item\":\"item27\",\"price\":2700.0}","headers":{"X-offset":"2834"},"id":27}0000000097{"body":"{\"id\":28,\"item\":\"item28\",\"price\":2800.0}","headers":{"X-offset":"2941"},"id":28}0000000097{"body":"{\"id\":29,\"item\":\"item29\",\"price\":2900.0}","headers":{"X-offset":"3048"},"id":29}0000000097{"body":"{\"id\":30,\"item\":\"item30\",\"price\":3000.0}","headers":{"X-offset":"3155"},"id":30}0000000097{"body":"{\"id\":31,\"item\":\"item31\",\"price\":3100.0}","headers":{"X-offset":"3262"},"id":31}0000000097{"body":"{\"id\":32,\"item\":\"item32\",\"price\":3200.0}","headers":{"X-offset":"3369"},"id":32}0000000097{"body":"{\"id\":33,\"item\":\"item33\",\"price\":3300.0}","headers":{"X-offset":"3476"},"id":33}0000000097{"body":"{\"id\":34,\"item\":\"item34\",\"price\":3400.0}","headers":{"X-offset":"3583"},"id":34}0000000097{"body":"{\"id\":35,\"item\":\"item35\",\"price\":3500.0}","headers":{"X-offset":"3690"},"id":35}0000000097{"body":"{\"id\":36,\"item\":\"item36\",\"price\":3600.0}","headers":{"X-offset":"3797"},"id":36}0000000097{"body":"{\"id\":37,\"item\":\"item37\",\"price\":3700.0}","headers":{"X-offset":"3904"},"id":37}0000000097{"body":"{\"id\":38,\"item\":\"item38\",\"price\":3800.0}","headers":{"X-offset":"4011"},"id":38}0000000097{"body":"{\"id\":39,\"item\":\"item39\",\"price\":3900.0}","headers":{"X-offset":"4118"},"id":39}0000000097{"body":"{\"id\":40,\"item\":\"item40\",\"price\":4000.0}","headers":{"X-offset":"4225"},"id":40}0000000097{"body":"{\"id\":41,\"item\":\"item41\",\"price\":4100.0}","headers":{"X-offset":"4332"},"id":41}0000000097{"body":"{\"id\":42,\"item\":\"item42\",\"price\":4200.0}","headers":{"X-offset":"4439"},"id":42}0000000097{"body":"{\"id\":43,\"item\":\"item43\",\"price\":4300.0}","headers":{"X-offset":"4546"},"id":43}0000000097{"body":"{\"id\":44,\"item\":\"item44\",\"price\":4400.0}","headers":{"X-offset":"4653"},"id":44}0000000097{"body":"{\"id\":45,\"item\":\"item45\",\"price\":4500.0}","headers":{"X-offset":"4760"},"id":45}0000000097{"body":"{\"id\":46,\"item\":\"item46\",\"price\":4600.0}","headers":{"X-offset":"4867"},"id":46}0000000097{"body":"{\"id\":47,\"item\":\"item47\",\"price\":4700.0}","headers":{"X-offset":"4974"},"id":47}0000000097{"body":"{\"id\":48,\"item\":\"item48\",\"price\":4800.0}","headers":{"X-offset":"5081"},"id":48}0000000097{"body":"{\"id\":49,\"item\":\"item49\",\"price\":4900.0}","headers":{"X-offset":"5188"},"id":49}0000000097{"body":"{\"id\":50,\"item\":\"item50\",\"price\":5000.0}","headers":{"X-offset":"5295"},"id":50}0000000097{"body":"{\"id\":51,\"item\":\"item51\",\"price\":5100.0}","headers":{"X-offset":"5402"},"id":51}0000000097{"body":"{\"id\":52,\"item\":\"item52\",\"price\":5200.0}","headers":{"X-offset":"5509"},"id":52}0000000097{"body":"{\"id\":53,\"item\":\"item53\",\"price\":5300.0}","headers":{"X-offset":"5616"},"id":53}0000000097{"body":"{\"id\":54,\"item\":\"item54\",\"price\":5400.0}","headers":{"X-offset":"5723"},"id":54}0000000097{"body":"{\"id\":55,\"item\":\"item55\",\"price\":5500.0}","headers":{"X-offset":"5830"},"id":55}0000000097{"body":"{\"id\":56,\"item\":\"item56\",\"price\":5600.0}","headers":{"X-offset":"5937"},"id":56}0000000097{"body":"{\"id\":57,\"item\":\"item57\",\"price\":5700.0}","headers":{"X-offset":"6044"},"id":57}0000000097{"body":"{\"id\":58,\"item\":\"item58\",\"price\":5800.0}","headers":{"X-offset":"6151"},"id":58}0000000097{"body":"{\"id\":59,\"item\":\"item59\",\"price\":5900.0}","headers":{"X-offset":"6258"},"id":59}0000000097{"body":"{\"id\":60,\"item\":\"item60\",\"price\":6000.0}","headers":{"X-offset":"6365"},"id":60}0000000097{"body":"{\"id\":61,\"item\":\"item61\",\"price\":6100.0}","headers":{"X-offset":"6472"},"id":61}0000000097{"body":"{\"id\":62,\"item\":\"item62\",\"price\":6200.0}","headers":{"X-offset":"6579"},"id":62}0000000097{"body":"{\"id\":63,\"item\":\"item63\",\"price\":6300.0}","headers":{"X-offset":"6686"},"id":63}0000000097{"body":"{\"id\":64,\"item\":\"item64\",\"price\":6400.0}","headers":{"X-offset":"6793"},"id":64}0000000097{"body":"{\"id\":65,\"item\":\"item65\",\"price\":6500.0}","headers":{"X-offset":"6900"},"id":65}0000000097{"body":"{\"id\":66,\"item\":\"item66\",\"price\":6600.0}","headers":{"X-offset":"7007"},"id":66}0000000097{"body":"{\"id\":67,\"item\":\"item67\",\"price\":6700.0}","headers":{"X-offset":"7114"},"id":67}0000000097{"body":"{\"id\":68,\"item\":\"item68\",\"price\":6800.0}","headers":{"X-offset":"7221"},"id":68}0000000097{"body":"{\"id\":69,\"item\":\"item69\",\"price\":6900.0}","headers":{"X-offset":"7328"},"id":69}0000000097{"body":"{\"id\":70,\"item\":\"item70\",\"price\":7000.0}","headers":{"X-offset":"7435"},"id":70}0000000097{"body":"{\"id\":71,\"item\":\"item71\",\"price\":7100.0}","headers":{"X-offset":"7542"},"id":71}0000000097{"body":"{\"id\":72,\"item\":\"item72\",\"price\":7200.0}","headers":{"X-offset":"7649"},"id":72}0000000097{"body":"{\"id\":73,\"item\":\"item73\",\"price\":7300.0}","headers":{"X-offset":"7756"},"id":73}0000000097{"body":"{\"id\":74,\"item\":\"item74\",\"price\":7400.0}","headers":{"X-offset":"7863"},"id":74}0000000097{"body":"{\"id\":75,\"item\":\"item75\",\"price\":7500.0}","headers":{"X-offset":"7970"},"id":75}0000000097{"body":"{\"id\":76,\"item\":\"item76\",\"price\":7600.0}","headers":{"X-offset":"8077"},"id":76}0000000097{"body":"{\"id\":77,\"item\":\"item77\",\"price\":7700.0}","headers":{"X-offset":"8184"},"id":77}0000000097{"body":"{\"id\":78,\"item\":\"item78\",\"price\":7800.0}","headers":{"X-offset":"8291"},"id":78}0000000097{"body":"{\"id\":79,\"item\":\"item79\",\"price\":7900.0}","headers":{"X-offset":"8398"},"id":79}0000000097{"body":"{\"id\":80,\"item\":\"item80\",\"price\":8000.0}","headers":{"X-offset":"8505"},"id":80}0000000097{"body":"{\"id\":81,\"item\":\"item81\",\"price\":8100.0}","headers":{"X-offset":"8612"},"id":81}0000000097{"body":"{\"id\":82,\"item\":\"item82\",\"price\":8200.0}","headers":{"X-offset":"8719"},"id":82}0000000097{"body":"{\"id\":83,\"item\":\"item83\",\"price\":8300.0}","headers":{"X-offset":"8826"},"id":83}0000000097{"body":"{\"id\":84,\"item\":\"item84\",\"price\":8400.0}","headers":{"X-offset":"8933"},"id":84}0000000097{"body":"{\"id\":85,\"item\":\"item85\",\"price\":8500.0}","headers":{"X-offset":"9040"},"id":85}0000000097{"body":"{\"id\":86,\"item\":\"item86\",\"price\":8600.0}","headers":{"X-offset":"9147"},"id":86}0000000097{"body":"{\"id\":87,\"item\":\"item87\",\"price\":8700.0}","headers":{"X-offset":"9254"},"id":87}0000000097{"body":"{\"id\":88,\"item\":\"item88\",\"price\":8800.0}","headers":{"X-offset":"9361"},"id":88}0000000097{"body":"{\"id\":89,\"item\":\"item89\",\"price\":8900.0}","headers":{"X-offset":"9468"},"id":89}0000000097{"body":"{\"id\":90,\"item\":\"item90\",\"price\":9000.0}","headers":{"X-offset":"9575"},"id":90}0000000097{"body":"{\"id\":91,\"item\":\"item91\",\"price\":9100.0}","headers":{"X-offset":"9682"},"id":91}0000000097{"body":"{\"id\":92,\"item\":\"item92\",\"price\":9200.0}","headers":{"X-offset":"9789"},"id":92}0000000097{"body":"{\"id\":93,\"item\":\"item93\",\"price\":9300.0}","headers":{"X-offset":"9896"},"id":93}0000000098{"body":"{\"id\":94,\"item\":\"item94\",\"price\":9400.0}","headers":{"X-offset":"10003"},"id":94}0000000098{"body":"{\"id\":95,\"item\":\"item95\",\"price\":9500.0}","headers":{"X-offset":"10111"},"id":95} \ No newline at end of file diff --git a/im.order/1.dat b/im.order/1.dat new file mode 100644 index 0000000000000000000000000000000000000000..af3a8c5ecfa919e3b9e3df7b856f7cd357cc6a4c GIT binary patch literal 10240 zcmeIuK@Ng25I|ASGHWzNp|qUD1sjzTSr`nkF{F35iXkBwxWJ#)q-o|SPpNUN8RXiO zFUe&nq%I4|t&YOhyGPi^#p*)B+q122?)yQhgYpu|-Cau8c9IWr-Zxd%xjr0<)iJi> zqj2tX&HqyBUn^U*vUw}pYMl}@wX)TgS+vZ&Wwu&Ls;8D&ZP`W3&Rce?mFi?_S-)ct zKmY**5I_I{1Q0*~0R#|0009ILKmY**5I_I{1Q0*~0R#|0009ILKmY**5C{Tq-2r2.0.12 compile + + + io.netty + netty-all + 4.1.68.Final + + + + org.apache.commons + commons-pool2 + 2.11.1 + diff --git a/src/main/java/cn/ipman/mq/annotation/MQListener.java b/src/main/java/cn/ipman/mq/annotation/MQListener.java new file mode 100644 index 0000000..9b296e7 --- /dev/null +++ b/src/main/java/cn/ipman/mq/annotation/MQListener.java @@ -0,0 +1,11 @@ +package cn.ipman.mq.annotation; + +import java.lang.annotation.*; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +@Documented +@Inherited +public @interface MQListener { + String[] topic(); +} diff --git a/src/main/java/cn/ipman/mq/client/Broker.java b/src/main/java/cn/ipman/mq/broker/MQBroker.java old mode 100644 new mode 100755 similarity index 53% rename from src/main/java/cn/ipman/mq/client/Broker.java rename to src/main/java/cn/ipman/mq/broker/MQBroker.java index 131ec79..29d47c9 --- a/src/main/java/cn/ipman/mq/client/Broker.java +++ b/src/main/java/cn/ipman/mq/broker/MQBroker.java @@ -1,12 +1,10 @@ -package cn.ipman.mq.client; +package cn.ipman.mq.broker; +import cn.ipman.mq.client.ClientService; +import cn.ipman.mq.client.netty.NettyClientImpl; import cn.ipman.mq.model.Message; -import cn.ipman.mq.model.Result; import cn.ipman.mq.model.Statistical; -import cn.ipman.mq.utils.HttpUtils; import cn.ipman.mq.utils.ThreadUtils; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.TypeReference; import lombok.Getter; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; @@ -18,13 +16,18 @@ * @Author IpMan * @Date 2024/6/29 19:44 */ -public class Broker { +public class MQBroker { /** * 默认的消息代理实例。 */ @Getter - public static Broker Default = new Broker(); + public static MQBroker Default = new MQBroker(); + + /** + * 网络请求客户端 + */ + public ClientService clientService = new NettyClientImpl("127.0.0.1", 6666); /** * 消息代理服务的URL。 @@ -34,7 +37,7 @@ public class Broker { /** * 所有消费者的集合。 */ - final MultiValueMap> consumers = new LinkedMultiValueMap<>(); + final MultiValueMap> consumers = new LinkedMultiValueMap<>(); /** * 添加消费者到指定主题。 @@ -42,7 +45,7 @@ public class Broker { * @param topic 主题。 * @param consumer 消费者。 */ - public void addConsumer(String topic, Consumer consumer) { + public void addConsumer(String topic, MQConsumer consumer) { consumers.add(topic, consumer); } @@ -57,9 +60,10 @@ public static void init() { // 定时轮询消息队列,并调用监听器处理消息 ThreadUtils.getDefault().init(1); ThreadUtils.getDefault().schedule(() -> { - MultiValueMap> consumers = getDefault().consumers; + MultiValueMap> consumers = getDefault().consumers; // 遍历所有topic下的消费者, 分别取server端获取数据, 并调用监听器处理消息 consumers.forEach((topic, c) -> c.forEach(consumer -> { + // 消费数据 Message receive = consumer.receive(topic); if (receive == null) return; try { @@ -67,7 +71,7 @@ public static void init() { consumer.listener.onMessage(receive); consumer.ack(topic, receive); } catch (Exception e) { - //todo + //todo retry } })); }, 100, 100); @@ -79,8 +83,8 @@ public static void init() { * * @return 生产者实例。 */ - public Producer createProducer() { - return new Producer(this); + public MQProducer createProducer() { + return new MQProducer(this); } /** @@ -90,20 +94,19 @@ public Producer createProducer() { * @return 消费者实例。 */ @SuppressWarnings("unused") - public Consumer createConsumer(String topic) { - Consumer consumer = new Consumer<>(this); + public MQConsumer createConsumer(String topic) { + MQConsumer consumer = new MQConsumer<>(this); consumer.subscribe(topic); return consumer; } - public Consumer createConsumer(String topic, int customCid) { - Consumer consumer = new Consumer<>(this, customCid); + public MQConsumer createConsumer(String topic, int customCid) { + MQConsumer consumer = new MQConsumer<>(this, customCid); consumer.subscribe(topic); return consumer; } - /** * 发送消息到指定主题。 * @@ -112,13 +115,7 @@ public Consumer createConsumer(String topic, int customCid) { * @return 发送是否成功。 */ public boolean send(String topic, Message message) { - System.out.println(" ==>> send topic/message: " + topic + "/" + message); - System.out.println(JSON.toJSONString(message)); - Result result = HttpUtils.httpPost(JSON.toJSONString(message), - brokerUrl + "/send?t=" + topic, new TypeReference>() { - }); - System.out.println(" ==>> send result: " + result); - return result.getCode() == 1; + return clientService.send(topic, message); } /** @@ -128,11 +125,7 @@ public boolean send(String topic, Message message) { * @param consumerId 消费者ID。 */ public void subscribe(String topic, String consumerId) { - System.out.println(" ==>> subscribe topic/consumerID: " + topic + "/" + consumerId); - Result result = HttpUtils.httpGet(brokerUrl + "/sub?t=" + topic + "&cid=" + consumerId, - new TypeReference>() { - }); - System.out.println(" ==>> subscribe result: " + result); + clientService.subscribe(topic, consumerId); } /** @@ -144,12 +137,7 @@ public void subscribe(String topic, String consumerId) { */ @SuppressWarnings("unchecked") public Message receive(String topic, String consumerId) { - System.out.println(" ==>> receive topic/cid: " + topic + "/" + consumerId); - Result> result = HttpUtils.httpGet(brokerUrl + "/receive?t=" + topic + "&cid=" + consumerId, - new TypeReference>>() { - }); - System.out.println(" ==>> receive result: " + result); - return (Message) result.getData(); + return clientService.receive(topic, consumerId); } /** @@ -159,11 +147,7 @@ public Message receive(String topic, String consumerId) { * @param consumerId 消费者ID。 */ public void unSubscribe(String topic, String consumerId) { - System.out.println(" ==>> unSubscribe topic/cid: " + topic + "/" + consumerId); - Result result = HttpUtils.httpGet(brokerUrl + "/unsub?t=" + topic + "&cid=" + consumerId, - new TypeReference>() { - }); - System.out.println(" ==>> unSubscribe result: " + result); + clientService.unSubscribe(topic, consumerId); } /** @@ -175,13 +159,7 @@ public void unSubscribe(String topic, String consumerId) { * @return 确认是否成功。 */ public boolean ack(String topic, String consumerId, int offset) { - System.out.println(" ==>> ack topic/cid/offset: " + topic + "/" + consumerId + "/" + offset); - Result result = HttpUtils.httpGet( - brokerUrl + "/ack?t=" + topic + "&cid=" + consumerId + "&offset=" + offset, - new TypeReference>() { - }); - System.out.println(" ==>> ack result: " + result); - return result.getCode() == 1; + return clientService.ack(topic, consumerId, offset); } @@ -193,12 +171,6 @@ public boolean ack(String topic, String consumerId, int offset) { * @return 统计信息对象。 */ public Statistical statistical(String topic, String consumerId) { - System.out.println(" ==>> statistical topic/cid: " + topic + "/" + consumerId); - Result result = HttpUtils.httpGet( - brokerUrl + "/stat?t=" + topic + "&cid=" + consumerId, - new TypeReference>() { - }); - System.out.println(" ==>> statistical result: " + result); - return result.getData(); + return clientService.statistical(topic, consumerId); } } diff --git a/src/main/java/cn/ipman/mq/client/Consumer.java b/src/main/java/cn/ipman/mq/broker/MQConsumer.java old mode 100644 new mode 100755 similarity index 93% rename from src/main/java/cn/ipman/mq/client/Consumer.java rename to src/main/java/cn/ipman/mq/broker/MQConsumer.java index 55ca1da..dc6caef --- a/src/main/java/cn/ipman/mq/client/Consumer.java +++ b/src/main/java/cn/ipman/mq/broker/MQConsumer.java @@ -1,4 +1,4 @@ -package cn.ipman.mq.client; +package cn.ipman.mq.broker; import cn.ipman.mq.model.Message; import cn.ipman.mq.model.Statistical; @@ -15,7 +15,7 @@ * @Author IpMan * @Date 2024/6/29 19:55 */ -public class Consumer { +public class MQConsumer { /** * 使用AtomicInteger来全局唯一标识消费者 @@ -30,7 +30,7 @@ public class Consumer { /** * 与之交互的Broker实例 */ - Broker broker; + MQBroker broker; /** * Consumer构造函数。 @@ -39,13 +39,13 @@ public class Consumer { * * @param broker 与消费者交互的Broker实例。 */ - public Consumer(Broker broker) { + public MQConsumer(MQBroker broker) { this.broker = broker; this.id = "CID" + CID.getAndIncrement(); } - public Consumer(Broker broker, int customCid) { + public MQConsumer(MQBroker broker, int customCid) { this.broker = broker; this.id = "CID" + customCid; } @@ -119,7 +119,7 @@ public boolean ack(String topic, Message message) { /** * 消息监听器,用于异步处理接收到的消息 */ - public Listener listener; + public MQListener listener; /** * 注册消息监听器。 @@ -129,7 +129,7 @@ public boolean ack(String topic, Message message) { * @param topic 监听的消息主题。 * @param listener 用于处理消息的监听器实例。 */ - public void listen(String topic, Listener listener) { + public void addListen(String topic, MQListener listener) { this.listener = listener; broker.addConsumer(topic, this); } diff --git a/src/main/java/cn/ipman/mq/client/Listener.java b/src/main/java/cn/ipman/mq/broker/MQListener.java old mode 100644 new mode 100755 similarity index 75% rename from src/main/java/cn/ipman/mq/client/Listener.java rename to src/main/java/cn/ipman/mq/broker/MQListener.java index 94b2ba5..8a28e25 --- a/src/main/java/cn/ipman/mq/client/Listener.java +++ b/src/main/java/cn/ipman/mq/broker/MQListener.java @@ -1,10 +1,17 @@ -package cn.ipman.mq.client; +package cn.ipman.mq.broker; import cn.ipman.mq.model.Message; + +/** + * Description for this class + * + * @Author IpMan + * @Date 2024/6/29 20:07 + */ @FunctionalInterface -public interface Listener { +public interface MQListener { /** * 当收到消息时调用此方法。 diff --git a/src/main/java/cn/ipman/mq/broker/MQListenerContainerFactory.java b/src/main/java/cn/ipman/mq/broker/MQListenerContainerFactory.java new file mode 100644 index 0000000..40c406f --- /dev/null +++ b/src/main/java/cn/ipman/mq/broker/MQListenerContainerFactory.java @@ -0,0 +1,38 @@ +package cn.ipman.mq.broker; + + +import java.util.Arrays; + +/** + * Description for this class + * + * @Author IpMan + * @Date 2024/6/29 20:07 + */ +public class MQListenerContainerFactory { + + MQBroker broker; + + public MQListenerContainerFactory(MQBroker broker) { + this.broker = broker; + } + + public void registryListener(MQListenerEndpoint endpoint) { + Arrays.stream(endpoint.getTopic()).forEach(topic -> { + // created and subscribe + MQConsumer consumer = broker.createConsumer(topic); + endpoint.getConsumerMap().putIfAbsent(topic, consumer); + // 添加订阅者 + consumer.addListen(topic, message -> { + try { + // 业务处理 + endpoint.getMethod().invoke(endpoint.getBean(), message); + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + }); + }); + } + +} diff --git a/src/main/java/cn/ipman/mq/broker/MQListenerEndpoint.java b/src/main/java/cn/ipman/mq/broker/MQListenerEndpoint.java new file mode 100644 index 0000000..b97b730 --- /dev/null +++ b/src/main/java/cn/ipman/mq/broker/MQListenerEndpoint.java @@ -0,0 +1,23 @@ +package cn.ipman.mq.broker; + +import lombok.Data; + +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; + +/** + * Description for this class + * + * @Author IpMan + * @Date 2024/6/29 20:07 + */ +@Data +public class MQListenerEndpoint { + + private Method method; + private Object bean; + private String[] topic; + private Map> consumerMap = new HashMap<>(); + +} diff --git a/src/main/java/cn/ipman/mq/broker/MQListenerProcessor.java b/src/main/java/cn/ipman/mq/broker/MQListenerProcessor.java new file mode 100644 index 0000000..1fe4f46 --- /dev/null +++ b/src/main/java/cn/ipman/mq/broker/MQListenerProcessor.java @@ -0,0 +1,57 @@ +package cn.ipman.mq.broker; + +import cn.ipman.mq.annotation.MQListener; +import lombok.Data; +import lombok.NonNull; +import org.springframework.aop.framework.AopProxyUtils; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.context.EnvironmentAware; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.MethodIntrospector; +import org.springframework.core.annotation.AnnotatedElementUtils; +import org.springframework.core.env.Environment; +import org.springframework.util.ReflectionUtils; + +import java.lang.reflect.Method; +import java.util.Set; + +/** + * Description for this class + * + * @Author IpMan + * @Date 2024/6/29 20:07 + */ +@Configuration +@Data +public class MQListenerProcessor implements BeanPostProcessor, ApplicationContextAware, EnvironmentAware { + + private ApplicationContext applicationContext; + private Environment environment; + + @Override + public Object postProcessAfterInitialization(@NonNull Object bean, @NonNull String beanName) throws BeansException { + // 为了方便测试... + if ("8765".equals(environment.getProperty("server.port"))) { + return bean; + } + Class targetClass = AopProxyUtils.ultimateTargetClass(bean); + Set methods = MethodIntrospector.selectMethods(targetClass, + (ReflectionUtils.MethodFilter) method -> AnnotatedElementUtils.hasAnnotation(method, MQListener.class)); + + // 查找消费者程序 + methods.forEach(method -> { + MQListener listener = method.getAnnotation(MQListener.class); + MQListenerEndpoint endpoint = new MQListenerEndpoint(); + endpoint.setBean(bean); + endpoint.setMethod(method); + endpoint.setTopic(listener.topic()); + + MQListenerContainerFactory factory = applicationContext.getBean(MQListenerContainerFactory.class); + factory.registryListener(endpoint); + }); + return bean; + } +} diff --git a/src/main/java/cn/ipman/mq/client/Producer.java b/src/main/java/cn/ipman/mq/broker/MQProducer.java old mode 100644 new mode 100755 similarity index 88% rename from src/main/java/cn/ipman/mq/client/Producer.java rename to src/main/java/cn/ipman/mq/broker/MQProducer.java index c2ef006..4206605 --- a/src/main/java/cn/ipman/mq/client/Producer.java +++ b/src/main/java/cn/ipman/mq/broker/MQProducer.java @@ -1,4 +1,4 @@ -package cn.ipman.mq.client; +package cn.ipman.mq.broker; import cn.ipman.mq.model.Message; @@ -10,20 +10,20 @@ * @Author IpMan * @Date 2024/6/29 19:41 */ -public class Producer { +public class MQProducer { /** * 消息代理实例。 * 用于发送消息到指定的主题。 */ - Broker broker; + MQBroker broker; /** * 构造方法,初始化消息队列生产者。 * * @param broker 消息代理实例,用于实际发送消息。 */ - public Producer(Broker broker) { + public MQProducer(MQBroker broker) { this.broker = broker; } diff --git a/src/main/java/cn/ipman/mq/client/ClientService.java b/src/main/java/cn/ipman/mq/client/ClientService.java new file mode 100644 index 0000000..9f863d9 --- /dev/null +++ b/src/main/java/cn/ipman/mq/client/ClientService.java @@ -0,0 +1,69 @@ +package cn.ipman.mq.client; + +import cn.ipman.mq.model.Message; +import cn.ipman.mq.model.Statistical; + + +/** + * Description for this class + * + * @Author IpMan + * @Date 2024/6/29 20:07 + */ +public interface ClientService { + + /** + * 发送消息到指定主题。 + * + * @param topic 消息主题。 + * @param message 消息对象。 + * @return 发送是否成功。 + */ + Boolean send(String topic, Message message); + + + /** + * 订阅指定主题。 + * + * @param topic 主题。 + * @param consumerId 消费者ID。 + */ + void subscribe(String topic, String consumerId); + + /** + * 接收指定主题的消息。 + * + * @param topic 主题。 + * @param consumerId 消费者ID。 + * @return 消息对象。 + */ + @SuppressWarnings("unchecked") + Message receive(String topic, String consumerId); + + /** + * 取消订阅指定主题。 + * + * @param topic 主题。 + * @param consumerId 消费者ID。 + */ + void unSubscribe(String topic, String consumerId); + + /** + * 确认消息消费。 + * + * @param topic 主题。 + * @param consumerId 消费者ID。 + * @param offset 消息偏移量。 + * @return 确认是否成功。 + */ + Boolean ack(String topic, String consumerId, int offset); + + /** + * 获取指定主题和消费者ID的统计信息。 + * + * @param topic 主题。 + * @param consumerId 消费者ID。 + * @return 统计信息对象。 + */ + Statistical statistical(String topic, String consumerId); +} diff --git a/src/main/java/cn/ipman/mq/client/http/HttpClientImpl.java b/src/main/java/cn/ipman/mq/client/http/HttpClientImpl.java new file mode 100644 index 0000000..672d728 --- /dev/null +++ b/src/main/java/cn/ipman/mq/client/http/HttpClientImpl.java @@ -0,0 +1,128 @@ +package cn.ipman.mq.client.http; + +import cn.ipman.mq.client.ClientService; +import cn.ipman.mq.model.Message; +import cn.ipman.mq.model.HttpResult; +import cn.ipman.mq.model.Statistical; +import cn.ipman.mq.utils.HttpUtils; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.TypeReference; + +/** + * Description for this class + * + * @Author IpMan + * @Date 2024/6/29 20:07 + */ +public class HttpClientImpl implements ClientService { + + String brokerUrl = null; + + public HttpClientImpl(String brokerUrl) { + this.brokerUrl = brokerUrl; + } + + /** + * 发送消息到指定主题。 + * + * @param topic 消息主题。 + * @param message 消息对象。 + * @return 发送是否成功。 + */ + @Override + public Boolean send(String topic, Message message) { + System.out.println(" ==>> send topic/message: " + topic + "/" + message); + System.out.println(JSON.toJSONString(message)); + HttpResult result = HttpUtils.httpPost(JSON.toJSONString(message), + brokerUrl + "/send?t=" + topic, new TypeReference>() { + }); + System.out.println(" ==>> send result: " + result); + return result.getCode() == 1; + } + + /** + * 订阅指定主题。 + * + * @param topic 主题。 + * @param consumerId 消费者ID。 + */ + @Override + public void subscribe(String topic, String consumerId) { + System.out.println(" ==>> subscribe topic/consumerID: " + topic + "/" + consumerId); + HttpResult result = HttpUtils.httpGet(brokerUrl + "/sub?t=" + topic + "&cid=" + consumerId, + new TypeReference>() { + }); + System.out.println(" ==>> subscribe result: " + result); + } + + /** + * 接收指定主题的消息。 + * + * @param topic 主题。 + * @param consumerId 消费者ID。 + * @return 消息对象。 + */ + @SuppressWarnings("unchecked") + @Override + public Message receive(String topic, String consumerId) { + System.out.println(" ==>> receive topic/cid: " + topic + "/" + consumerId); + HttpResult> result = HttpUtils.httpGet(brokerUrl + "/receive?t=" + topic + "&cid=" + consumerId, + new TypeReference>>() { + }); + System.out.println(" ==>> receive result: " + result); + return (Message) result.getData(); + } + + /** + * 取消订阅指定主题。 + * + * @param topic 主题。 + * @param consumerId 消费者ID。 + */ + @Override + public void unSubscribe(String topic, String consumerId) { + System.out.println(" ==>> unSubscribe topic/cid: " + topic + "/" + consumerId); + HttpResult result = HttpUtils.httpGet(brokerUrl + "/unsub?t=" + topic + "&cid=" + consumerId, + new TypeReference>() { + }); + System.out.println(" ==>> unSubscribe result: " + result); + } + + /** + * 确认消息消费。 + * + * @param topic 主题。 + * @param consumerId 消费者ID。 + * @param offset 消息偏移量。 + * @return 确认是否成功。 + */ + @Override + public Boolean ack(String topic, String consumerId, int offset) { + System.out.println(" ==>> ack topic/cid/offset: " + topic + "/" + consumerId + "/" + offset); + HttpResult result = HttpUtils.httpGet( + brokerUrl + "/ack?t=" + topic + "&cid=" + consumerId + "&offset=" + offset, + new TypeReference>() { + }); + System.out.println(" ==>> ack result: " + result); + return result.getCode() == 1; + } + + + /** + * 获取指定主题和消费者ID的统计信息。 + * + * @param topic 主题。 + * @param consumerId 消费者ID。 + * @return 统计信息对象。 + */ + @Override + public Statistical statistical(String topic, String consumerId) { + System.out.println(" ==>> statistical topic/cid: " + topic + "/" + consumerId); + HttpResult result = HttpUtils.httpGet( + brokerUrl + "/stat?t=" + topic + "&cid=" + consumerId, + new TypeReference>() { + }); + System.out.println(" ==>> statistical result: " + result); + return result.getData(); + } +} diff --git a/src/main/java/cn/ipman/mq/client/netty/NettyClientImpl.java b/src/main/java/cn/ipman/mq/client/netty/NettyClientImpl.java new file mode 100644 index 0000000..b4d38f8 --- /dev/null +++ b/src/main/java/cn/ipman/mq/client/netty/NettyClientImpl.java @@ -0,0 +1,161 @@ +package cn.ipman.mq.client.netty; + +import cn.ipman.mq.client.ClientService; +import cn.ipman.mq.model.Message; +import cn.ipman.mq.model.NettyResponse; +import cn.ipman.mq.model.Statistical; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.TypeReference; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +/** + * Description for this class + * + * @Author IpMan + * @Date 2024/6/29 20:07 + */ +public class NettyClientImpl implements ClientService { + + String host; + int port; + NettyMQClientPool clientPool; + + int maxTotal = 10; + int maxIdle = 5; + int minIdle = 2; + + public NettyClientImpl(String host, int port) { + this.host = host; + this.port = port; + this.clientPool = new NettyMQClientPool(host, port, maxTotal, maxIdle, minIdle); + } + + private T executeWithClient(Function function) { + NettyMQClient client = null; + try { + client = clientPool.borrowClient(); + return function.apply(client); + } catch (Exception e) { + e.printStackTrace(); + return null; + } finally { + if (client != null) { + clientPool.returnClient(client); + } + } + } + + @Override + public Boolean send(String topic, Message message) { + return executeWithClient(client -> { + try { + Map params = Map.of("t", topic); + CompletableFuture future = client.sendMessage("send", params, message); + NettyResponse response = JSON.parseObject(future.get(), + new TypeReference>() { + }); + System.out.println("【send】 Received response: " + response); + return response.getCode() == 1; + } catch (Exception e) { + e.printStackTrace(); + return false; + } + }); + } + + @Override + public void subscribe(String topic, String consumerId) { + executeWithClient(client -> { + try { + Map params = Map.of("t", topic, "cid", consumerId); + CompletableFuture future = client.sendMessage("sub", params, null); + NettyResponse response = JSON.parseObject(future.get(), + new TypeReference>() { + }); + System.out.println("【sub】 Received response: " + response); + } catch (Exception e) { + e.printStackTrace(); + } + return null; + }); + } + + @Override + @SuppressWarnings("unchecked") + public Message receive(String topic, String consumerId) { + return executeWithClient(client -> { + try { + Map params = Map.of("t", topic, "cid", consumerId); + CompletableFuture future = client.sendMessage("receive", params, null); + NettyResponse> response = JSON.parseObject(future.get(), + new TypeReference>>() { + }); + System.out.println("【receive】 Received response: " + response); + return (Message) response.getData(); + } catch (Exception e) { + e.printStackTrace(); + return null; + } + }); + } + + @Override + public void unSubscribe(String topic, String consumerId) { + executeWithClient(client -> { + try { + Map params = Map.of("t", topic, "cid", consumerId); + CompletableFuture future = client.sendMessage("unsub", params, null); + NettyResponse response = JSON.parseObject(future.get(), + new TypeReference>() { + }); + System.out.println("【unsub】 Received response: " + response); + } catch (Exception e) { + e.printStackTrace(); + } + return null; + }); + } + + @Override + public Boolean ack(String topic, String consumerId, int offset) { + return executeWithClient(client -> { + try { + Map params = Map.of( + "t", topic, + "cid", consumerId, + "offset", String.valueOf(offset) + ); + CompletableFuture future = client.sendMessage("ack", params, null); + NettyResponse response = JSON.parseObject(future.get(), + new TypeReference>() { + }); + System.out.println("【ack】 Received response: " + response); + return response.getCode() == 1; + } catch (Exception e) { + e.printStackTrace(); + return false; + } + }); + } + + @Override + public Statistical statistical(String topic, String consumerId) { + return executeWithClient(client -> { + try { + Map params = Map.of("t", topic, "cid", consumerId); + CompletableFuture future = client.sendMessage("stat", params, null); + NettyResponse response = JSON.parseObject(future.get(), + new TypeReference>() { + }); + System.out.println("【stat】 Received response: " + response); + return response.getData(); + } catch (Exception e) { + e.printStackTrace(); + return null; + } + }); + } +} diff --git a/src/main/java/cn/ipman/mq/client/netty/NettyMQClient.java b/src/main/java/cn/ipman/mq/client/netty/NettyMQClient.java new file mode 100644 index 0000000..b561766 --- /dev/null +++ b/src/main/java/cn/ipman/mq/client/netty/NettyMQClient.java @@ -0,0 +1,100 @@ +package cn.ipman.mq.client.netty; + +import cn.ipman.mq.model.Message; +import com.alibaba.fastjson.JSON; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.DelimiterBasedFrameDecoder; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; + +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import static cn.ipman.mq.model.Constants.DELIMITER; +import static cn.ipman.mq.model.Constants.MAX_FRAME_LENGTH; + +/** + * Description for this class + * + * @Author IpMan + * @Date 2024/6/29 20:07 + */ +public class NettyMQClient { + + String host; + int port; + NettyMQClientHandler clientHandler; + int timeout = 2_000; + + private Channel channel; + private EventLoopGroup group; + + public NettyMQClient(String host, int port) { + this.host = host; + this.port = port; + this.clientHandler = new NettyMQClientHandler(); + } + + public void start() { + group = new NioEventLoopGroup(10); + try { + Bootstrap b = new Bootstrap(); + b.group(group); + b.channel(NioSocketChannel.class); + //b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout); // 连接超时 + b.option(ChannelOption.SO_KEEPALIVE, true); + b.handler(new LoggingHandler(LogLevel.INFO)); + b.handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER.getBytes(StandardCharsets.UTF_8)); + p.addLast(new DelimiterBasedFrameDecoder(MAX_FRAME_LENGTH, delimiter)); + //p.addLast(new ReadTimeoutHandler(timeout, TimeUnit.MILLISECONDS)); // 读超时 + p.addLast(new StringDecoder()); + p.addLast(new StringEncoder()); + p.addLast(clientHandler); + } + }); + + ChannelFuture f = b.connect(host, port).sync(); + channel = f.channel(); + } catch (Exception e) { + group.shutdownGracefully(); + throw new RuntimeException(e); + } + } + + public boolean isActive() { + return channel != null && channel.isActive(); + } + + public void close() { + if (channel != null) { + channel.close(); + } + if (group != null) { + group.shutdownGracefully(); + } + } + + public CompletableFuture sendMessage(String action, Map params, Message message) { + if (channel != null && channel.isActive()) { + String jsonMessage = JSON.toJSONString(message); + return clientHandler.sendMessage(channel, action, params, jsonMessage); + } else { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new IllegalStateException("Channel is not active")); + return future; + } + } +} diff --git a/src/main/java/cn/ipman/mq/client/netty/NettyMQClientFactory.java b/src/main/java/cn/ipman/mq/client/netty/NettyMQClientFactory.java new file mode 100644 index 0000000..29da92d --- /dev/null +++ b/src/main/java/cn/ipman/mq/client/netty/NettyMQClientFactory.java @@ -0,0 +1,50 @@ +package cn.ipman.mq.client.netty; + +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.PooledObjectFactory; +import org.apache.commons.pool2.impl.DefaultPooledObject; + +/** + * Description for this class + * + * @Author IpMan + * @Date 2024/6/29 20:07 + */ +public class NettyMQClientFactory implements PooledObjectFactory { + + private final String host; + private final int port; + + public NettyMQClientFactory(String host, int port) { + this.host = host; + this.port = port; + } + + @Override + public PooledObject makeObject() throws Exception { + NettyMQClient client = new NettyMQClient(host, port); + client.start(); + return new DefaultPooledObject<>(client); + } + + @Override + public void destroyObject(PooledObject p) throws Exception { + p.getObject().close(); + } + + @Override + public boolean validateObject(PooledObject p) { + return p.getObject().isActive(); + } + + @Override + public void activateObject(PooledObject p) throws Exception { + // No activation required + } + + @Override + public void passivateObject(PooledObject p) throws Exception { + // No passivation required + } + +} diff --git a/src/main/java/cn/ipman/mq/client/netty/NettyMQClientHandler.java b/src/main/java/cn/ipman/mq/client/netty/NettyMQClientHandler.java new file mode 100644 index 0000000..10e453b --- /dev/null +++ b/src/main/java/cn/ipman/mq/client/netty/NettyMQClientHandler.java @@ -0,0 +1,54 @@ +package cn.ipman.mq.client.netty; + +import cn.ipman.mq.model.NettyRequest; +import cn.ipman.mq.model.NettyResponse; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.TypeReference; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +import static cn.ipman.mq.model.Constants.DELIMITER; + +/** + * Description for this class + * + * @Author IpMan + * @Date 2024/6/29 20:07 + */ +public class NettyMQClientHandler extends SimpleChannelInboundHandler { + + private static final AtomicLong requestIdGenerator = new AtomicLong(0); + private final ConcurrentHashMap> paddingRequests = new ConcurrentHashMap<>(); + + @Override + protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { + NettyResponse response = JSON.parseObject(msg, new TypeReference>(){}); + long requestId = response.getTraceId(); + CompletableFuture future = paddingRequests.remove(requestId); + if (future != null) { + future.complete(msg); + } + } + + public CompletableFuture sendMessage(Channel channel, String action, + Map params, String message) { + long requestId = requestIdGenerator.getAndIncrement(); + NettyRequest request = new NettyRequest<>(); + request.setTraceId(requestId); + request.setAction(action); + request.setParams(params); + request.setBody(message); + String jsonRequest = JSON.toJSONString(request); + + CompletableFuture future = new CompletableFuture<>(); + paddingRequests.put(requestId, future); + channel.writeAndFlush( jsonRequest + DELIMITER); + return future; + } +} diff --git a/src/main/java/cn/ipman/mq/client/netty/NettyMQClientPool.java b/src/main/java/cn/ipman/mq/client/netty/NettyMQClientPool.java new file mode 100644 index 0000000..65b89a2 --- /dev/null +++ b/src/main/java/cn/ipman/mq/client/netty/NettyMQClientPool.java @@ -0,0 +1,37 @@ +package cn.ipman.mq.client.netty; + +import org.apache.commons.pool2.impl.GenericObjectPool; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; + +/** + * Description for this class + * + * @Author IpMan + * @Date 2024/6/29 20:07 + */ +public class NettyMQClientPool { + + private final GenericObjectPool clientPool; + + public NettyMQClientPool(String host, int port, int maxTotal, int maxIdle, int minIdle) { + GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig<>(); + poolConfig.setMaxTotal(maxTotal); + poolConfig.setMaxIdle(maxIdle); + poolConfig.setMinIdle(minIdle); + + NettyMQClientFactory factory = new NettyMQClientFactory(host, port); + clientPool = new GenericObjectPool<>(factory, poolConfig); + } + + public NettyMQClient borrowClient() throws Exception { + return clientPool.borrowObject(); + } + + public void returnClient(NettyMQClient client) { + clientPool.returnObject(client); + } + + public void close() { + clientPool.close(); + } +} diff --git a/src/main/java/cn/ipman/mq/config/BrokerConfig.java b/src/main/java/cn/ipman/mq/config/BrokerConfig.java new file mode 100644 index 0000000..8f0f81d --- /dev/null +++ b/src/main/java/cn/ipman/mq/config/BrokerConfig.java @@ -0,0 +1,21 @@ +package cn.ipman.mq.config; + +import cn.ipman.mq.broker.MQBroker; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + + +/** + * Description for this class + * + * @Author IpMan + * @Date 2024/6/29 20:07 + */ +@Configuration +public class BrokerConfig { + + @Bean + public MQBroker brokerFactory() { + return MQBroker.getDefault(); + } +} diff --git a/src/main/java/cn/ipman/mq/config/ConsumerConfig.java b/src/main/java/cn/ipman/mq/config/ConsumerConfig.java new file mode 100644 index 0000000..b549547 --- /dev/null +++ b/src/main/java/cn/ipman/mq/config/ConsumerConfig.java @@ -0,0 +1,24 @@ +package cn.ipman.mq.config; + +import cn.ipman.mq.broker.MQBroker; +import cn.ipman.mq.broker.MQListenerContainerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + + +/** + * Description for this class + * + * @Author IpMan + * @Date 2024/6/29 20:07 + */ +@Configuration +public class ConsumerConfig { + + @Bean + public MQListenerContainerFactory listenerContainerFactory(@Autowired MQBroker broker) { + return new MQListenerContainerFactory(broker); + } + +} diff --git a/src/main/java/cn/ipman/mq/config/ProducerConfig.java b/src/main/java/cn/ipman/mq/config/ProducerConfig.java new file mode 100644 index 0000000..00effb1 --- /dev/null +++ b/src/main/java/cn/ipman/mq/config/ProducerConfig.java @@ -0,0 +1,25 @@ +package cn.ipman.mq.config; + + +import cn.ipman.mq.broker.MQBroker; +import cn.ipman.mq.broker.MQProducer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + + +/** + * Description for this class + * + * @Author IpMan + * @Date 2024/6/29 20:07 + */ +@Configuration +public class ProducerConfig { + + @Bean + public MQProducer producerFactory(@Autowired MQBroker broker){ + return broker.createProducer(); + } + +} diff --git a/src/main/java/cn/ipman/mq/demo/ConsumerDemo1.java b/src/main/java/cn/ipman/mq/demo/ConsumerDemo1.java old mode 100644 new mode 100755 index c3f6f46..e8d4da7 --- a/src/main/java/cn/ipman/mq/demo/ConsumerDemo1.java +++ b/src/main/java/cn/ipman/mq/demo/ConsumerDemo1.java @@ -1,9 +1,9 @@ package cn.ipman.mq.demo; -import cn.ipman.mq.client.Broker; -import cn.ipman.mq.client.Consumer; +import cn.ipman.mq.broker.MQBroker; +import cn.ipman.mq.broker.MQConsumer; import cn.ipman.mq.model.Message; -import cn.ipman.mq.client.Producer; +import cn.ipman.mq.broker.MQProducer; import cn.ipman.mq.model.Statistical; import com.alibaba.fastjson.JSON; import lombok.SneakyThrows; @@ -23,15 +23,15 @@ public static void main(String[] args) { // 创建broker, 绑定topic String topic = "im.order"; - Broker broker = Broker.getDefault(); + MQBroker broker = MQBroker.getDefault(); // 通过broker创建producer和consumer - Producer producer = broker.createProducer(); + MQProducer producer = broker.createProducer(); // consumer-1 - Consumer consumer1 = broker.createConsumer(topic, 1); + MQConsumer consumer1 = broker.createConsumer(topic, 1); // ------------ 生产、消费 ------------------ - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 100; i++) { Order order = new Order(ids, "item" + ids, 100 * ids); producer.send(topic, new Message<>(ids++, JSON.toJSONString(order), null)); System.out.println("send ok => " + order); @@ -42,12 +42,13 @@ public static void main(String[] args) { System.out.println("poll ok => " + message); // 做业务处理... consumer1.ack(topic, message); } + System.out.println("===>>===>>===>>===>>===>> " ); while (true) { char c = (char) System.in.read(); if (c == 'q' || c == 'e') { // 退出 consumer1.unSubscribe(topic); - break; + System.exit(0); } if (c == 'p') { // 生产 Order order = new Order(ids, "item" + ids, 100 * ids); @@ -72,6 +73,5 @@ public static void main(String[] args) { System.out.println("send 10 orders... "); } } - } } diff --git a/src/main/java/cn/ipman/mq/demo/ConsumerDemo2.java b/src/main/java/cn/ipman/mq/demo/ConsumerDemo2.java old mode 100644 new mode 100755 index 80ada75..fa806b9 --- a/src/main/java/cn/ipman/mq/demo/ConsumerDemo2.java +++ b/src/main/java/cn/ipman/mq/demo/ConsumerDemo2.java @@ -1,8 +1,8 @@ package cn.ipman.mq.demo; -import cn.ipman.mq.client.Broker; -import cn.ipman.mq.client.Consumer; -import cn.ipman.mq.client.Producer; +import cn.ipman.mq.broker.MQBroker; +import cn.ipman.mq.broker.MQConsumer; +import cn.ipman.mq.broker.MQProducer; import cn.ipman.mq.model.Message; import com.alibaba.fastjson.JSON; import lombok.SneakyThrows; @@ -15,6 +15,8 @@ */ public class ConsumerDemo2 { + static int count = 0; + @SneakyThrows @SuppressWarnings("unchecked") public static void main(String[] args) { @@ -22,23 +24,28 @@ public static void main(String[] args) { // 创建broker, 绑定topic String topic = "im.order"; - Broker broker = Broker.getDefault(); + MQBroker broker = MQBroker.getDefault(); // 通过broker创建producer和consumer - Producer producer = broker.createProducer(); + MQProducer producer = broker.createProducer(); // consumer-0 - Consumer consumer = broker.createConsumer(topic, 2); + MQConsumer consumer = broker.createConsumer(topic, 3); // 测试listen监听topic - consumer.listen(topic, message -> { + + + consumer.addListen(topic, message -> { System.out.println("listener onMessage => " + message); + System.out.println(count++); }); - // ------------ 生产、消费 ------------------ - for (int i = 0; i < 10; i++) { - Order order = new Order(ids, "item" + ids, 100 * ids); - producer.send(topic, new Message<>(ids++, JSON.toJSONString(order), null)); - System.out.println("send ok => " + order); - } + + +// // ------------ 生产、消费 ------------------ +// for (int i = 0; i < 10; i++) { +// Order order = new Order(ids, "item" + ids, 100 * ids); +// producer.send(topic, new Message<>(ids++, JSON.toJSONString(order), null)); +// System.out.println("send ok => " + order); +// } } } diff --git a/src/main/java/cn/ipman/mq/demo/ConsumerDemo3.java b/src/main/java/cn/ipman/mq/demo/ConsumerDemo3.java old mode 100644 new mode 100755 index 3121eed..2e2c2a7 --- a/src/main/java/cn/ipman/mq/demo/ConsumerDemo3.java +++ b/src/main/java/cn/ipman/mq/demo/ConsumerDemo3.java @@ -1,8 +1,8 @@ package cn.ipman.mq.demo; -import cn.ipman.mq.client.Broker; -import cn.ipman.mq.client.Consumer; -import cn.ipman.mq.client.Producer; +import cn.ipman.mq.broker.MQBroker; +import cn.ipman.mq.broker.MQConsumer; +import cn.ipman.mq.broker.MQProducer; import cn.ipman.mq.model.Message; import cn.ipman.mq.model.Statistical; import com.alibaba.fastjson.JSON; @@ -22,14 +22,14 @@ public static void main(String[] args) { int ids = 0; // 创建broker, 绑定topic - String topic = "cn.ipman.test"; - Broker broker = Broker.getDefault(); + String topic = "im.order"; + MQBroker broker = MQBroker.getDefault(); // 通过broker创建producer和consumer - Producer producer = broker.createProducer(); + MQProducer producer = broker.createProducer(); // consumer-1 - Consumer consumer1 = broker.createConsumer(topic, 3); + MQConsumer consumer1 = broker.createConsumer(topic, 3); // ------------ 生产、消费 ------------------ for (int i = 0; i < 10; i++) { Order order = new Order(ids, "item" + ids, 100 * ids); diff --git a/src/main/java/cn/ipman/mq/demo/NettyClientDemo.java b/src/main/java/cn/ipman/mq/demo/NettyClientDemo.java new file mode 100644 index 0000000..e5c50bb --- /dev/null +++ b/src/main/java/cn/ipman/mq/demo/NettyClientDemo.java @@ -0,0 +1,194 @@ +package cn.ipman.mq.demo; + +import cn.ipman.mq.client.netty.NettyMQClient; +import cn.ipman.mq.client.netty.NettyMQClientPool; +import cn.ipman.mq.model.Message; +import cn.ipman.mq.model.NettyResponse; +import cn.ipman.mq.model.Statistical; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.TypeReference; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + + +/** + * Description for this class + * + * @Author IpMan + * @Date 2024/6/29 20:07 + */ +public class NettyClientDemo { + + + public static void main(String[] args) throws Exception { + String host = "127.0.0.1"; + int port = 6666; + int maxTotal = 10; + int maxIdle = 5; + int minIdle = 2; + + NettyMQClientPool clientPool = new NettyMQClientPool(host, port, maxTotal, maxIdle, minIdle); + try { + // 借用客户端 + NettyMQClient client = clientPool.borrowClient(); + + sub(client); + statistical(client); + + for (int i = 0; i < 10; i++) { + send(client); + } + + NettyResponse> receive = receive(client); + assert receive != null; + int offset = Integer.parseInt(receive.getData().getHeaders().get("X-offset")); + ack(client, offset); + + NettyResponse>> batchReceive = batchReceive(client, 50); + assert batchReceive != null; + int maxOffset = batchReceive.getData().stream() + .mapToInt(msg -> Integer.parseInt(msg.getHeaders().get("X-offset"))) + .max() + .orElse(0); + long totalOffset = batchReceive.getData().size(); + System.out.println("batch receive total/max = " + totalOffset + "/" + maxOffset); + ack(client, maxOffset); + + statistical(client); + + unsub(client); + + // 归还客户端 + clientPool.returnClient(client); + } finally { + clientPool.close(); + } + } + + public static void sub(NettyMQClient client) { + Map params = Map.of( + "t", "cn.ipman.test", + "cid", "123"); + // 发送消息 + CompletableFuture future = client.sendMessage("sub", params, null); + try { + NettyResponse response = JSON.parseObject(future.get(), + new TypeReference>() { + }); + System.out.println("【sub】 Received response: " + response); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static void unsub(NettyMQClient client) { + Map params = Map.of( + "t", "cn.ipman.test", + "cid", "123"); + // 发送消息 + CompletableFuture future = client.sendMessage("unsub", params, null); + try { + NettyResponse response = JSON.parseObject(future.get(), + new TypeReference>() { + }); + System.out.println("【unsub】 Received response: " + response); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static void send(NettyMQClient client) { + Map params = Map.of( + "t", "cn.ipman.test", + "cid", "123"); + Order order = new Order(1, "item" + 1, 100); + Message message = new Message<>(1, JSON.toJSONString(order), null); + + // 发送消息 + CompletableFuture future = client.sendMessage("send", params, message); + try { + NettyResponse response = JSON.parseObject(future.get(), + new TypeReference>() { + }); + System.out.println("【send】 Received response: " + response); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static NettyResponse> receive(NettyMQClient client) { + Map params = Map.of( + "t", "cn.ipman.test", + "cid", "123"); + // 发送消息 + CompletableFuture future = client.sendMessage("receive", params, null); + try { + NettyResponse> response = JSON.parseObject(future.get(), + new TypeReference>>() { + }); + System.out.println("【batch receive】 Received response: " + response); + return response; + } catch (Exception e) { + e.printStackTrace(); + } + return null; + } + + public static NettyResponse>> batchReceive(NettyMQClient client, int size) { + Map params = Map.of( + "t", "cn.ipman.test", + "cid", "123", + "size", String.valueOf(size)); + // 发送消息 + CompletableFuture future = client.sendMessage("batch-receive", params, null); + try { + NettyResponse>> response = JSON.parseObject(future.get(), + new TypeReference>>>() { + }); + System.out.println("【receive】 Received response: " + response); + return response; + } catch (Exception e) { + e.printStackTrace(); + } + return null; + } + + + public static void ack(NettyMQClient client, int ack) { + Map params = Map.of( + "t", "cn.ipman.test", + "cid", "123", + "offset", String.valueOf(ack)); + + // 发送消息 + CompletableFuture future = client.sendMessage("ack", params, null); + try { + NettyResponse response = JSON.parseObject(future.get(), + new TypeReference>() { + }); + System.out.println("【ack】 Received response: " + response); + } catch (Exception e) { + e.printStackTrace(); + } + } + + + public static NettyResponse statistical(NettyMQClient client) { + Map params = Map.of("t", "cn.ipman.test", "cid", "123"); + // 发送消息 + CompletableFuture future = client.sendMessage("stat", params, null); + try { + NettyResponse response = JSON.parseObject(future.get(), + new TypeReference>() { + }); + System.out.println("【stat】 Received response: " + response); + return response; + } catch (Exception e) { + e.printStackTrace(); + } + return null; + } + +} diff --git a/src/main/java/cn/ipman/mq/demo/Order.java b/src/main/java/cn/ipman/mq/demo/Order.java old mode 100644 new mode 100755 diff --git a/src/main/java/cn/ipman/mq/demo/SpringAnnotationListenerDemo.java b/src/main/java/cn/ipman/mq/demo/SpringAnnotationListenerDemo.java new file mode 100644 index 0000000..be7054c --- /dev/null +++ b/src/main/java/cn/ipman/mq/demo/SpringAnnotationListenerDemo.java @@ -0,0 +1,23 @@ +package cn.ipman.mq.demo; + +import cn.ipman.mq.annotation.MQListener; +import cn.ipman.mq.model.Message; +import org.springframework.stereotype.Component; + + + +/** + * Description for this class + * + * @Author IpMan + * @Date 2024/6/29 20:07 + */ +@Component +public class SpringAnnotationListenerDemo { + + @MQListener(topic = {"im.order", "cn.ipman.test"}) + public void demo(Message msg) { + System.out.println("........." + msg); + } + +} diff --git a/src/main/java/cn/ipman/mq/model/Constants.java b/src/main/java/cn/ipman/mq/model/Constants.java new file mode 100644 index 0000000..21df1e8 --- /dev/null +++ b/src/main/java/cn/ipman/mq/model/Constants.java @@ -0,0 +1,17 @@ +package cn.ipman.mq.model; + + + +/** + * Description for this class + * + * @Author IpMan + * @Date 2024/6/29 20:07 + */ +public class Constants { + + public final static String DELIMITER = "\nnnn"; + public final static int MAX_FRAME_LENGTH = 8192; + + +} diff --git a/src/main/java/cn/ipman/mq/model/HttpResult.java b/src/main/java/cn/ipman/mq/model/HttpResult.java new file mode 100755 index 0000000..160c6cf --- /dev/null +++ b/src/main/java/cn/ipman/mq/model/HttpResult.java @@ -0,0 +1,44 @@ +package cn.ipman.mq.model; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.util.List; + +/** + * Result for MQServer. + * + * @Author IpMan + * @Date 2024/6/30 20:36 + */ +@AllArgsConstructor +@Data +public class HttpResult { + + private int code; + private T data; + + public static HttpResult ok() { + return new HttpResult<>(1, "OK"); + } + + public static HttpResult> msg(String message) { + return new HttpResult<>(1, Message.createMessage(message, null)); + } + + public static HttpResult> msg(Message message) { + return new HttpResult<>(1, message); + } + + public static HttpResult>> msg(List> messages) { + return new HttpResult<>(1, messages); + } + + public static HttpResult ok(String msg) { + return new HttpResult<>(1, msg); + } + + public static HttpResult stat(Statistical statistical) { + return new HttpResult<>(1, statistical); + } +} diff --git a/src/main/java/cn/ipman/mq/model/Message.java b/src/main/java/cn/ipman/mq/model/Message.java old mode 100644 new mode 100755 diff --git a/src/main/java/cn/ipman/mq/model/NettyRequest.java b/src/main/java/cn/ipman/mq/model/NettyRequest.java new file mode 100644 index 0000000..e020a36 --- /dev/null +++ b/src/main/java/cn/ipman/mq/model/NettyRequest.java @@ -0,0 +1,21 @@ +package cn.ipman.mq.model; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Map; + + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class NettyRequest { + + private long traceId; + private String action; + private Map params; + private T body; + + +} diff --git a/src/main/java/cn/ipman/mq/model/NettyResponse.java b/src/main/java/cn/ipman/mq/model/NettyResponse.java new file mode 100644 index 0000000..3c1a7ff --- /dev/null +++ b/src/main/java/cn/ipman/mq/model/NettyResponse.java @@ -0,0 +1,15 @@ +package cn.ipman.mq.model; + +import lombok.*; + + + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class NettyResponse { + + private long traceId; + private int code; + private T data; +} diff --git a/src/main/java/cn/ipman/mq/model/Result.java b/src/main/java/cn/ipman/mq/model/Result.java deleted file mode 100644 index 0a7ffac..0000000 --- a/src/main/java/cn/ipman/mq/model/Result.java +++ /dev/null @@ -1,44 +0,0 @@ -package cn.ipman.mq.model; - -import lombok.AllArgsConstructor; -import lombok.Data; - -import java.util.List; - -/** - * Result for MQServer. - * - * @Author IpMan - * @Date 2024/6/30 20:36 - */ -@AllArgsConstructor -@Data -public class Result { - - private int code; - private T data; - - public static Result ok() { - return new Result<>(1, "OK"); - } - - public static Result> msg(String message) { - return new Result<>(1, Message.createMessage(message, null)); - } - - public static Result> msg(Message message) { - return new Result<>(1, message); - } - - public static Result>> msg(List> messages) { - return new Result<>(1, messages); - } - - public static Result ok(String msg) { - return new Result<>(1, msg); - } - - public static Result stat(Statistical statistical) { - return new Result<>(1, statistical); - } -} diff --git a/src/main/java/cn/ipman/mq/model/Statistical.java b/src/main/java/cn/ipman/mq/model/Statistical.java old mode 100644 new mode 100755 diff --git a/src/main/java/cn/ipman/mq/model/Subscription.java b/src/main/java/cn/ipman/mq/model/Subscription.java old mode 100644 new mode 100755 diff --git a/src/main/java/cn/ipman/mq/server/MQServer.java b/src/main/java/cn/ipman/mq/server/MQServer.java old mode 100644 new mode 100755 index 5a5a37d..4759bd5 --- a/src/main/java/cn/ipman/mq/server/MQServer.java +++ b/src/main/java/cn/ipman/mq/server/MQServer.java @@ -1,9 +1,13 @@ package cn.ipman.mq.server; +import cn.ipman.mq.broker.MQProducer; +import cn.ipman.mq.demo.Order; import cn.ipman.mq.model.Message; -import cn.ipman.mq.model.Result; +import cn.ipman.mq.model.HttpResult; import cn.ipman.mq.model.Statistical; import cn.ipman.mq.model.Subscription; +import com.alibaba.fastjson.JSON; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; @@ -23,57 +27,71 @@ public class MQServer { // send @RequestMapping("/send") - public Result send(@RequestParam("t") String topic, - @RequestBody Message message) { - return Result.ok("msg" + MessageQueue.send(topic, message)); + public HttpResult send(@RequestParam("t") String topic, + @RequestBody Message message) { + return HttpResult.ok("msg" + MessageQueue.send(topic, message)); } + // receive @RequestMapping("/receive") - public Result> receive(@RequestParam("t") String topic, - @RequestParam("cid") String consumerId) { - return Result.msg(MessageQueue.receive(topic, consumerId)); + public HttpResult> receive(@RequestParam("t") String topic, + @RequestParam("cid") String consumerId) { + return HttpResult.msg(MessageQueue.receive(topic, consumerId)); } // receive @RequestMapping("/batch-receive") - public Result>> batchReceive(@RequestParam("t") String topic, - @RequestParam("cid") String consumerId, - @RequestParam(name = "size", required = false, defaultValue = "1000") int size) { - return Result.msg(MessageQueue.batchReceive(topic, consumerId, size)); + public HttpResult>> batchReceive(@RequestParam("t") String topic, + @RequestParam("cid") String consumerId, + @RequestParam(name = "size", required = false, defaultValue = "1000") int size) { + return HttpResult.msg(MessageQueue.batchReceive(topic, consumerId, size)); } // ack @RequestMapping("/ack") - public Result ack(@RequestParam("t") String topic, - @RequestParam("cid") String consumerId, - @RequestParam("offset") Integer offset) { - return Result.ok("" + MessageQueue.ack(topic, consumerId, offset)); + public HttpResult ack(@RequestParam("t") String topic, + @RequestParam("cid") String consumerId, + @RequestParam("offset") Integer offset) { + return HttpResult.ok("" + MessageQueue.ack(topic, consumerId, offset)); } // 1. subscriber @RequestMapping("/sub") - public Result subscribe(@RequestParam("t") String topic, - @RequestParam("cid") String consumerId) { + public HttpResult subscribe(@RequestParam("t") String topic, + @RequestParam("cid") String consumerId) { MessageQueue.sub(new Subscription(topic, consumerId, -1)); - return Result.ok(); + return HttpResult.ok(); } // unsubscribe @RequestMapping("/unsub") - public Result unSubscribe(@RequestParam("t") String topic, - @RequestParam("cid") String consumerId) { + public HttpResult unSubscribe(@RequestParam("t") String topic, + @RequestParam("cid") String consumerId) { MessageQueue.unsub(new Subscription(topic, consumerId, -1)); - return Result.ok(); + return HttpResult.ok(); } // stat @RequestMapping("/stat") - public Result stat(@RequestParam("t") String topic, - @RequestParam("cid") String consumerId) { - return Result.stat(MessageQueue.stat(topic, consumerId)); + public HttpResult stat(@RequestParam("t") String topic, + @RequestParam("cid") String consumerId) { + return HttpResult.stat(MessageQueue.stat(topic, consumerId)); + } + + + @Autowired + private MQProducer producer; + + // test product + @RequestMapping("/test-send") + public HttpResult testSend(){ + Order order = new Order(1, "item" + 1, 100); + producer.send("cn.ipman.test", new Message<>(1, JSON.toJSONString(order), null)); + return HttpResult.ok(); } + } diff --git a/src/main/java/cn/ipman/mq/server/MessageQueue.java b/src/main/java/cn/ipman/mq/server/MessageQueue.java old mode 100644 new mode 100755 index 6e23020..6b79cab --- a/src/main/java/cn/ipman/mq/server/MessageQueue.java +++ b/src/main/java/cn/ipman/mq/server/MessageQueue.java @@ -49,49 +49,49 @@ public MessageQueue(String topic) { /** * 批量接收消息。 * - * @param topic 消息的主题。 + * @param topic 消息的主题。 * @param consumerId 消费者的ID。 - * @param size 批量接收的消息数量。 + * @param size 批量接收的消息数量。 * @return 消息列表。 */ public static List> batchReceive(String topic, String consumerId, int size) { MessageQueue messageQueue = queues.get(topic); - if (messageQueue == null) throw new RuntimeException("topic not found"); - if (messageQueue.subscriptions.containsKey(consumerId)) { - - int offset = messageQueue.subscriptions.get(consumerId).getOffset(); - int nextOffset = 0; - if (offset > -1) { - Indexer.Entry entry = Indexer.getEntry(topic, offset); - if (entry == null) return null; - nextOffset = offset + entry.getLength(); - } - - List> result = new ArrayList<>(); - Message receive = messageQueue.receive(nextOffset); - // 如果能拿到数据, 并且不超过batch size的限制时 - while (receive != null) { - result.add(receive); - if (result.size() >= size) break; - - offset = Integer.parseInt(receive.getHeaders().get("X-offset")); - Indexer.Entry entry = Indexer.getEntry(topic, offset); - if (entry == null) break; - nextOffset = offset + entry.getLength(); - receive = messageQueue.receive(nextOffset); + if (messageQueue == null) { + throw new RuntimeException("Topic not found: " + topic); + } + if (!messageQueue.subscriptions.containsKey(consumerId)) { + throw new RuntimeException("Subscriptions not found for topic/consumerId = " + topic + "/" + consumerId); + } + // 寻找consumerId的消费位置 + int offset = messageQueue.subscriptions.get(consumerId).getOffset(); + int nextOffset = 0; + if (offset > -1) { + Indexer.Entry entry = Indexer.getEntry(topic, offset); + if (entry == null) return null; + nextOffset = messageQueue.store.nextOffset(offset, entry) + entry.getLength(); + } + // 批量获取数据 + List> result = new ArrayList<>(); + Message receive = messageQueue.receive(nextOffset); + while (receive != null && result.size() < size) { + result.add(receive); + offset = Integer.parseInt(receive.getHeaders().get("X-offset")); + Indexer.Entry entry = Indexer.getEntry(topic, offset); + if (entry == null) { + break; } - System.out.println(" ===>> batch: topic/cid/size = " + topic + "/" + consumerId + "/" + offset + "/" + result.size()); - System.out.println(" ===>> batch: last message = " + receive); - return result; + // 获取下一条消息 + nextOffset = messageQueue.store.nextOffset(offset, entry) + entry.getLength(); + receive = messageQueue.receive(nextOffset); } - throw new RuntimeException("subscriptions not found for topic/consumerId = " + topic + "/" + consumerId); + return result; } /** * 统计消息队列的状态。 * - * @param topic 消息的主题。 + * @param topic 消息的主题。 * @param consumerId 消费者的ID。 * @return 消息队列的统计信息。 */ @@ -175,8 +175,8 @@ public static void unsub(Subscription subscription) { /** * 发送消息的公共接口。 * - * @param topic 消息的主题。 - * @param message 要发送的消息。 + * @param topic 消息的主题。 + * @param message 要发送的消息。 * @return 消息的偏移量。 */ public static int send(String topic, Message message) { @@ -189,9 +189,9 @@ public static int send(String topic, Message message) { /** * 根据偏移量接收消息的公共接口。 * - * @param topic 消息的主题。 + * @param topic 消息的主题。 * @param consumerId 消费者的ID。 - * @param offset 消息的偏移量。 + * @param offset 消息的偏移量。 * @return 消息对象。 */ public static Message receive(String topic, String consumerId, int offset) { @@ -206,7 +206,7 @@ public static Message receive(String topic, String consumerId, int offset) { /** * 接收消息,不传入偏移量。 * - * @param topic 消息的主题。 + * @param topic 消息的主题。 * @param consumerId 消费者的ID。 * @return 消息对象。 */ @@ -219,10 +219,24 @@ public static Message receive(String topic, String consumerId) { int offset = messageQueue.subscriptions.get(consumerId).getOffset(); int nextOffset = 0; if (offset > -1) { + //offset = messageQueue.store.nextOffset(offset); + System.out.println(" ===>> receive: start = " + topic + "/" + consumerId + "/" + offset); Indexer.Entry entry = Indexer.getEntry(topic, offset); if (entry == null) return null; - nextOffset = offset + entry.getLength(); + nextOffset = messageQueue.store.nextOffset(offset, entry) + entry.getLength() ; + + // 拿到偏移量,再获取数据 + Message receive = messageQueue.receive(nextOffset); + if (receive == null) { + System.out.println("$$$$$$" + Indexer.getMappings()); + System.out.println("$$$$$$" + entry); + System.out.println("$$$$$$" + offset); + System.out.println("$$$$$$" + nextOffset); + } + System.out.println(" ===>> receive: topic/cid/idx = " + topic + "/" + consumerId + "/" + offset); + System.out.println(" ===>> receive: message = " + receive); + return receive; } // 拿到偏移量,再获取数据 @@ -230,6 +244,7 @@ public static Message receive(String topic, String consumerId) { System.out.println(" ===>> receive: topic/cid/idx = " + topic + "/" + consumerId + "/" + offset); System.out.println(" ===>> receive: message = " + receive); return receive; + } throw new RuntimeException("subscriptions not found for topic/consumerId = " + topic + "/" + consumerId); } @@ -238,9 +253,9 @@ public static Message receive(String topic, String consumerId) { /** * 确认消息消费并更新消费者偏移量。 * - * @param topic 消息的主题。 + * @param topic 消息的主题。 * @param consumerId 消费者的ID。 - * @param offset 消息的偏移量。 + * @param offset 消息的偏移量。 * @return 更新后的偏移量。 */ public static int ack(String topic, String consumerId, int offset) { diff --git a/src/main/java/cn/ipman/mq/server/NettyMQServer.java b/src/main/java/cn/ipman/mq/server/NettyMQServer.java new file mode 100644 index 0000000..bd650c2 --- /dev/null +++ b/src/main/java/cn/ipman/mq/server/NettyMQServer.java @@ -0,0 +1,75 @@ +package cn.ipman.mq.server; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.Unpooled; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.DelimiterBasedFrameDecoder; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; + +import java.nio.charset.StandardCharsets; + +import static cn.ipman.mq.model.Constants.DELIMITER; +import static cn.ipman.mq.model.Constants.MAX_FRAME_LENGTH; + + +/** + * Description for this class + * + * @Author IpMan + * @Date 2024/6/29 20:07 + */ +public class NettyMQServer { + + int port; + + public NettyMQServer(int port) { + this.port = port; + } + + public void run() throws Exception { + + EventLoopGroup bossGroup = new NioEventLoopGroup(2); + EventLoopGroup workerGroup = new NioEventLoopGroup(16); + + try { + ServerBootstrap b = new ServerBootstrap(); + b.option(ChannelOption.SO_BACKLOG, 128) // 连接队列大小 + .childOption(ChannelOption.TCP_NODELAY, true) // 关闭Nagle,即时传输 + .childOption(ChannelOption.SO_KEEPALIVE, true) // 支持长连接 + .childOption(ChannelOption.SO_REUSEADDR, true) // 共享端口 + .childOption(ChannelOption.SO_RCVBUF, 32 * 1024) // 操作缓冲区的大小 + .childOption(ChannelOption.SO_SNDBUF, 32 * 1024) // 发送缓冲区的大小 + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER.getBytes(StandardCharsets.UTF_8)); + p.addLast(new DelimiterBasedFrameDecoder(MAX_FRAME_LENGTH, delimiter)); + p.addLast(new StringDecoder()); + p.addLast(new StringEncoder()); + p.addLast(new NettyMQServerHandler()); + } + }); + + System.out.println("netty server starting....."); + ChannelFuture f = b.bind(port).sync(); + f.channel().closeFuture().sync(); + } finally { + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } + } + + +} diff --git a/src/main/java/cn/ipman/mq/server/NettyMQServerHandler.java b/src/main/java/cn/ipman/mq/server/NettyMQServerHandler.java new file mode 100644 index 0000000..9ad1291 --- /dev/null +++ b/src/main/java/cn/ipman/mq/server/NettyMQServerHandler.java @@ -0,0 +1,113 @@ +package cn.ipman.mq.server; + +import cn.ipman.mq.model.*; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.TypeReference; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +import java.util.List; + +import static cn.ipman.mq.model.Constants.DELIMITER; + + +/** + * Description for this class + * + * @Author IpMan + * @Date 2024/6/29 20:07 + */ +public class NettyMQServerHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + // 解析请求 + String message = (String) msg; + System.out.println("Received message: " + message); + NettyRequest request = JSON.parseObject(message, new TypeReference>() { + }); + // 处理请求并返回 + NettyResponse response = handlerRequest(request); + String jsonResponse = JSON.toJSONString(response) + DELIMITER; + ctx.writeAndFlush(jsonResponse); + } + + private NettyResponse handlerRequest(NettyRequest request) { + return switch (request.getAction()) { + case "send" -> handleSend(request); + case "receive" -> handleReceive(request); + case "batch-receive" -> handleBatchReceive(request); + case "ack" -> handleAck(request); + case "sub" -> handleSubscribe(request); + case "unsub" -> handleUnSubscribe(request); + case "stat" -> handleStat(request); + default -> new NettyResponse<>(request.getTraceId(), 0, "Unknown action"); + }; + } + + private NettyResponse handleSend(NettyRequest request) { + // 调用 MessageQueue.send 方法 + String topic = request.getParams().get("t"); + String result = String.valueOf(MessageQueue.send(topic, + JSON.parseObject((String) request.getBody(), new TypeReference>(){}))); + return new NettyResponse<>(request.getTraceId(), 1, "msg" + result); + } + + private NettyResponse handleReceive(NettyRequest request) { + // 调用 MessageQueue.receive 方法 + System.out.println("===>>===>>===>>===>>===>> " + request); + String topic = request.getParams().get("t"); + String consumerId = request.getParams().get("cid"); + Message message = MessageQueue.receive(topic, consumerId); + return new NettyResponse<>(request.getTraceId(), 1, message); + } + + private NettyResponse handleBatchReceive(NettyRequest request) { + // 调用 MessageQueue.batchReceive 方法 + String topic = request.getParams().get("t"); + String consumerId = request.getParams().get("cid"); + int size = Integer.parseInt(request.getParams().getOrDefault("size", "1000")); + List> messages = MessageQueue.batchReceive(topic, consumerId, size); + return new NettyResponse<>(request.getTraceId(), 1, messages); + } + + private NettyResponse handleAck(NettyRequest request) { + // 调用 MessageQueue.ack 方法 + String topic = request.getParams().get("t"); + String consumerId = request.getParams().get("cid"); + int offset = Integer.parseInt(request.getParams().get("offset")); + String result = String.valueOf(MessageQueue.ack(topic, consumerId, offset)); + return new NettyResponse<>(request.getTraceId(), 1, result); + } + + private NettyResponse handleSubscribe(NettyRequest request) { + // 调用 MessageQueue.sub 方法 + String topic = request.getParams().get("t"); + String consumerId = request.getParams().get("cid"); + MessageQueue.sub(new Subscription(topic, consumerId, -1)); + return new NettyResponse<>(request.getTraceId(), 1, null); + } + + private NettyResponse handleUnSubscribe(NettyRequest request) { + // 调用 MessageQueue.unsub 方法 + String topic = request.getParams().get("t"); + String consumerId = request.getParams().get("cid"); + MessageQueue.unsub(new Subscription(topic, consumerId, -1)); + return new NettyResponse<>(request.getTraceId(), 1, null); + } + + private NettyResponse handleStat(NettyRequest request) { + // 调用 MessageQueue.stat 方法 + String topic = request.getParams().get("t"); + String consumerId = request.getParams().get("cid"); + Statistical stat = MessageQueue.stat(topic, consumerId); + return new NettyResponse<>(request.getTraceId(), 1, stat); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); + } + +} diff --git a/src/main/java/cn/ipman/mq/server/NettyServerBootstrap.java b/src/main/java/cn/ipman/mq/server/NettyServerBootstrap.java new file mode 100644 index 0000000..4caab11 --- /dev/null +++ b/src/main/java/cn/ipman/mq/server/NettyServerBootstrap.java @@ -0,0 +1,33 @@ +package cn.ipman.mq.server; + +import org.jetbrains.annotations.NotNull; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.ApplicationEvent; +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Component; + + +/** + * Description for this class + * + * @Author IpMan + * @Date 2024/6/29 20:07 + */ +@Component +public class NettyServerBootstrap implements ApplicationListener { + + @Override + public void onApplicationEvent(@NotNull ApplicationEvent event) { + if (event instanceof ApplicationReadyEvent) { + Thread thread = new Thread(() -> { + NettyMQServer server = new NettyMQServer(6666); + try { + server.run(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + thread.start(); + } + } +} diff --git a/src/main/java/cn/ipman/mq/store/Indexer.java b/src/main/java/cn/ipman/mq/store/Indexer.java old mode 100644 new mode 100755 index 332e62a..3c9b33a --- a/src/main/java/cn/ipman/mq/store/Indexer.java +++ b/src/main/java/cn/ipman/mq/store/Indexer.java @@ -2,12 +2,14 @@ import lombok.AllArgsConstructor; import lombok.Data; +import lombok.Getter; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * 索引器类,用于存储和检索消息的偏移量和长度信息。 @@ -18,72 +20,34 @@ */ public class Indexer { - // 存储主题与入口(偏移量和长度)的映射,使用LinkedMultiValueMap保持插入顺序 static MultiValueMap indexers = new LinkedMultiValueMap<>(); - - // 根据偏移量键映射到具体的入口,提高检索效率 - static Map mappings = new HashMap<>(); // 根据offset索引映射 - - // 偏移量的占位符,用于构造唯一的偏移量键 + @Getter + static Map mappings = new ConcurrentHashMap<>(); public final static String OFFSET_PLACEHOLDER = "||__offset_key__||"; - /** - * 入口类,包含消息的偏移量和长度信息。 - */ @AllArgsConstructor @Data public static class Entry { - int offset; // 偏移量 - int length; // 消息的长度 + int offset; + int length; + int fileIndex; // 新增:文件索引 } - /** - * 根据主题和偏移量构造偏移量键。 - * - * @param topic 消息主题 - * @param offset 消息偏移量 - * @return 构造的偏移量键 - */ public static String getOffsetKey(String topic, int offset) { return topic + OFFSET_PLACEHOLDER + offset; } - - /** - * 添加入口到索引器中。 - * - * @param topic 消息主题 - * @param offset 消息偏移量 - * @param length 消息长度 - */ - public static void addEntry(String topic, int offset, int length) { - // 按topic创建, 一个topic创建一次 - Entry entry = new Entry(offset, length); + public static void addEntry(String topic, int offset, int length, int fileIndex) { + Entry entry = new Entry(offset, length, fileIndex); indexers.add(topic, entry); - // 在topic下, 按offset添加入口 - System.out.println("add offset entry , key = " + getOffsetKey(topic, offset)); mappings.put(getOffsetKey(topic, offset), entry); } - /** - * 根据主题获取所有入口。 - * - * @param topic 消息主题 - * @return 主题对应的入口列表 - */ public static List getEntries(String topic) { return indexers.get(topic); } - /** - * 根据主题和偏移量获取特定的入口。 - * - * @param topic 消息主题 - * @param offset 消息偏移量 - * @return 对应的入口,如果不存在则返回null - */ public static Entry getEntry(String topic, int offset) { - System.out.println("get offset entry , key = " + getOffsetKey(topic, offset)); return mappings.get(getOffsetKey(topic, offset)); } diff --git a/src/main/java/cn/ipman/mq/store/MessageStore.java b/src/main/java/cn/ipman/mq/store/MessageStore.java old mode 100644 new mode 100755 index 8244007..b8d4865 --- a/src/main/java/cn/ipman/mq/store/MessageStore.java +++ b/src/main/java/cn/ipman/mq/store/MessageStore.java @@ -6,6 +6,7 @@ import lombok.SneakyThrows; import java.io.File; +import java.io.IOException; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; @@ -14,6 +15,8 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; +import java.util.HashMap; +import java.util.Map; /** * 消息存储类,用于存储和检索消息。 @@ -22,141 +25,137 @@ public class MessageStore { String topic; - public static final int LEN = 1024 * 100; //100KB,每个文件的大小 + public static final int LEN = 1024 * 10; // 100KB 每个文件的大小 + MappedByteBuffer mappedByteBuffer = null; + FileChannel channel = null; + int currentFileIndex = 0; + int currentOffset = 0; + Map fileBuffers = new HashMap<>(); - /** - * 构造函数初始化MessageStore,指定消息的主题。 - * @param topic 消息主题,用于分类消息。 - */ public MessageStore(String topic) { this.topic = topic; } - /** - * 当前文件的位置 - */ - MappedByteBuffer mappedByteBuffer = null; - - /** - * 初始化消息存储,创建必要的文件和映射内存。 - */ @SneakyThrows public void init() { - File file = new File(this.topic + ".dat"); - if (!file.exists()) { - file.createNewFile(); + File dir = new File(this.topic); + if (!dir.exists()) { + dir.mkdirs(); } - Path path = Paths.get(file.getAbsolutePath()); - FileChannel channel = (FileChannel) Files.newByteChannel(path, - StandardOpenOption.READ, StandardOpenOption.WRITE); + File[] files = dir.listFiles((d, name) -> name.endsWith(".dat")); + if (files != null) { + for (File file : files) { + int fileIndex = Integer.parseInt(file.getName().replace(".dat", "")); + loadFile(fileIndex); + } + } + if (files == null || files.length == 0) { + openFile(0); + } else { + currentFileIndex = files.length - 1; + mappedByteBuffer = fileBuffers.get(currentFileIndex); + currentOffset = mappedByteBuffer.position(); + } + } - mappedByteBuffer = channel - .map(FileChannel.MapMode.READ_WRITE, 0, LEN); + public int nextOffset(int offset, Indexer.Entry entry) { + int result = offset + entry.getFileIndex() * LEN; + System.out.println("************::" + result); + return result; + } + @SneakyThrows + private void loadFile(int fileIndex) { + File file = new File(this.topic + File.separator + fileIndex + ".dat"); + Path path = Paths.get(file.getAbsolutePath()); + FileChannel channel = (FileChannel) Files.newByteChannel(path, StandardOpenOption.READ, StandardOpenOption.WRITE); + MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, LEN); + fileBuffers.put(fileIndex, buffer); - // 初始化时判断topic文件中是否已经有了数据, 如果有的话需要将文件指针指向最新位置 - ByteBuffer buffer = mappedByteBuffer.asReadOnlyBuffer(); - byte[] header = new byte[10]; // 隐藏头,用来声明单个message的长度 - buffer.get(header); + ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); + byte[] header = new byte[10]; + readOnlyBuffer.get(header); int offset = 0; while (header[9] > 0) { String trim = new String(header, StandardCharsets.UTF_8).trim(); int len = Integer.parseInt(trim) + 10; - System.out.println("store init topic = " + topic + ", len = " + len + ", header = " + trim); - Indexer.addEntry(topic, offset, len); // 初始化历史数据 - // 计算下一个position,并尝试读取 + Indexer.addEntry(topic, offset + fileIndex * LEN, len, fileIndex); offset += len; - System.out.println(" next = " + offset); - buffer.position(offset); - buffer.get(header); + readOnlyBuffer.position(offset); + if (readOnlyBuffer.remaining() < 10) break; + readOnlyBuffer.get(header); + } + readOnlyBuffer.clear(); + } + + @SneakyThrows + private void openFile(int fileIndex) { + File file = new File(this.topic + File.separator + fileIndex + ".dat"); + if (!file.exists()) { + file.createNewFile(); } - buffer.clear(); - System.out.println("store init topic = " + topic + ", last position = " + offset); - mappedByteBuffer.position(offset); + Path path = Paths.get(file.getAbsolutePath()); + channel = (FileChannel) Files.newByteChannel(path, StandardOpenOption.READ, StandardOpenOption.WRITE); + mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, LEN); + fileBuffers.put(fileIndex, mappedByteBuffer); + currentFileIndex = fileIndex; + currentOffset = 0; } - /** - * 写入消息到存储。 - * @param message 要写入的消息对象。 - * @return 写入消息的起始位置。 - */ public int write(Message message) { - System.out.println("write position -> " + mappedByteBuffer.position()); // offset String json = JSON.toJSONString(message); - - // 写入header头,方便store初始化时读取所有Message的索引信息 int len = json.getBytes(StandardCharsets.UTF_8).length; - String format = String.format("%010d", len); // 用10个长度表示 + String format = String.format("%010d", len); String msg = format + json; len = len + 10; - // 写入数据 - int position = mappedByteBuffer.position(); // 获取当前偏移量 - Indexer.addEntry(this.topic, position, len); + if (mappedByteBuffer.remaining() < len) { + System.out.println("写满了......"); + try { + channel.close(); + } catch (IOException e) { + e.printStackTrace(); + } + openFile(++currentFileIndex); + } + int position = mappedByteBuffer.position(); + Indexer.addEntry(this.topic, position + currentFileIndex * LEN, len, currentFileIndex); mappedByteBuffer.put(StandardCharsets.UTF_8.encode(msg)); - return position; + currentOffset = mappedByteBuffer.position(); + return position + currentFileIndex * LEN; } - /** - * 获取当前写位置。 - * @return 当前的写位置。 - */ public int pos() { - return mappedByteBuffer.position(); + return currentOffset + currentFileIndex * LEN; } - /** - * 从存储中读取消息。 - * @param offset 消息的偏移量,从存储的起始位置开始。 - * @return 读取到的消息对象,如果找不到则返回null。 - */ public Message read(int offset) { - ByteBuffer readOnlyBuffer = mappedByteBuffer.asReadOnlyBuffer(); + int fileIndex = offset / LEN; + int localOffset = offset % LEN; + + MappedByteBuffer buffer = fileBuffers.get(fileIndex); + if (buffer == null) { + return null; + } + + ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); Indexer.Entry entry = Indexer.getEntry(this.topic, offset); if (entry == null) return null; - readOnlyBuffer.position(entry.getOffset() + 10); // 10是隐藏的header头 + readOnlyBuffer.position(localOffset + 10); - int len = entry.getLength() - 10; // 数据的长度 + int len = entry.getLength() - 10; byte[] bytes = new byte[len]; readOnlyBuffer.get(bytes, 0, len); String json = new String(bytes, StandardCharsets.UTF_8); - System.out.println("read only ==>> " + json); - // 反序列化 Message message = JSON.parseObject(json, new TypeReference>() { }); - System.out.println("message.body = " + message); readOnlyBuffer.clear(); return message; } - /** - * 获取当前存储中的消息总数。 - * @return 消息总数。 - */ public int total() { - return Indexer.getEntries(topic).size(); - } - - public static void main(String[] args) { - System.out.println( - "{\"body\":\"{\\\"id\\\":0,\\\"item\\\":\\\"item0\\\",\\\"price\\\":0.0}\",\"headers\":{\"X-offset\":\"0\"},\"id\":0}" - .getBytes(StandardCharsets.UTF_8).length); - - String a = "{\"body\":\"{\\\"id\\\":0,\\\"item\\\":\\\"item0\\\",\\\"price\\\":0.0}\",\"headers\":{\"X-offset\":\"98\"},\"id\":0}"; - String b = "0000000090"; - String c = b + a; - System.out.println(c); - System.out.println(c.getBytes(StandardCharsets.UTF_8).length); - - System.out.println("=======> "); - - String e = String.format("%010d", 90); - String d = e + a; - System.out.println(d); - System.out.println(d.getBytes(StandardCharsets.UTF_8).length); - - + return Indexer.getEntries(topic) != null ? Indexer.getEntries(topic).size() : 0; } } diff --git a/src/main/java/cn/ipman/mq/store/StoreDemo.java b/src/main/java/cn/ipman/mq/store/StoreDemo.java old mode 100644 new mode 100755 index 7597bc7..f8c7114 --- a/src/main/java/cn/ipman/mq/store/StoreDemo.java +++ b/src/main/java/cn/ipman/mq/store/StoreDemo.java @@ -49,7 +49,7 @@ public static void main(String[] args) throws IOException { @SuppressWarnings("unchecked") Message message = (Message) Message.createMessage(content, null); String msg = JSON.toJSONString(message); - Indexer.addEntry("im.order", mappedByteBuffer.position(), msg.getBytes(StandardCharsets.UTF_8).length); + Indexer.addEntry("im.order", mappedByteBuffer.position(), msg.getBytes(StandardCharsets.UTF_8).length, 0); mappedByteBuffer.put(StandardCharsets.UTF_8.encode(msg)); } diff --git a/src/main/java/cn/ipman/mq/utils/HttpUtils.java b/src/main/java/cn/ipman/mq/utils/HttpUtils.java old mode 100644 new mode 100755 diff --git a/src/main/java/cn/ipman/mq/utils/ThreadUtils.java b/src/main/java/cn/ipman/mq/utils/ThreadUtils.java old mode 100644 new mode 100755