From c2c6d0ce8adddaaf8f378dec7a0ad7edf4172115 Mon Sep 17 00:00:00 2001 From: ywy2090 <912554887@qq.com> Date: Wed, 11 Mar 2020 10:11:30 +0800 Subject: [PATCH] sync update topic func to 2.2.3 branch (#521) (#526) * sync update topic func to 2.2.3 branch (#521) * add topics update * add Channel2ServerUpdateTopics.java * sync update topic func to 2.2.3 branch * update keepalive nodes count log to trace level. --- Changelog.md | 10 ++++ release_note.txt | 2 +- .../fisco/bcos/channel/client/Service.java | 50 ++++++++++++++++++- .../channel/handler/ChannelConnections.java | 2 +- .../channel/handler/ConnectionCallback.java | 10 ++-- .../test/amop/Channel2ServerUpdateTopics.java | 46 +++++++++++++++++ 6 files changed, 111 insertions(+), 9 deletions(-) create mode 100644 src/test/java/org/fisco/bcos/channel/test/amop/Channel2ServerUpdateTopics.java diff --git a/Changelog.md b/Changelog.md index 47b9da03a..37cfea007 100644 --- a/Changelog.md +++ b/Changelog.md @@ -1,3 +1,13 @@ +### v2.2.3 + +(2020-03-03) + +* 新增 +1. Service对象添加updateTopicsToNode接口,SDK可以运行时更新订阅的topic + +* 兼容 +1. 适配fisco-bcos 2.2.0版本,支持Channel Message v1/V2/V3协议 + ### v2.2.2 (2020-01-17) diff --git a/release_note.txt b/release_note.txt index 6c8b2a3c3..0b1f88b80 100644 --- a/release_note.txt +++ b/release_note.txt @@ -1 +1 @@ -v2.2.2 +v2.2.3 diff --git a/src/main/java/org/fisco/bcos/channel/client/Service.java b/src/main/java/org/fisco/bcos/channel/client/Service.java index ba0009996..e4a957b5c 100644 --- a/src/main/java/org/fisco/bcos/channel/client/Service.java +++ b/src/main/java/org/fisco/bcos/channel/client/Service.java @@ -716,6 +716,52 @@ public void run(Timeout timeout) throws Exception { } } + /** + * When SDK start, the initial subscribed topics information set by user will be sent to the + * node. User can update subscribed topics again by following steps: 1. Set the topics you want + * to subscribe to again Service service // Servcie object Set topics // topics that + * subscribe again service.setTopics(topics) 2. send update topics message to all nodes + * service.updateTopicsToNode(); + */ + public void updateTopicsToNode() { + + logger.info(" updateTopicToNode, groupId: {}, topics: {}", groupId, getTopics()); + + // select send node + ChannelConnections channelConnections = + allChannelConnections + .getAllChannelConnections() + .stream() + .filter(x -> x.getGroupId() == groupId) + .findFirst() + .get(); + + if (Objects.isNull(channelConnections)) { + throw new IllegalArgumentException( + " No group configuration was found, groupId: " + groupId); + } + + ConnectionCallback callback = (ConnectionCallback) channelConnections.getCallback(); + if (Objects.isNull(callback)) { + throw new IllegalArgumentException( + " No callback was found for ChannelConnections, service is not initialized"); + } + callback.setTopics(getTopics()); + + /** send update topic message to all connected nodes */ + Map networkConnections = + channelConnections.getNetworkConnections(); + for (ChannelHandlerContext ctx : networkConnections.values()) { + if (Objects.nonNull(ctx) && ChannelHandlerContextHelper.isChannelAvailable(ctx)) { + try { + callback.sendUpdateTopicMessage(ctx); + } catch (Exception e) { + logger.debug(" e: ", e); + } + } + } + } + public void asyncMulticastChannelMessageForVerifyTopic(ChannelRequest request) { String toTopic = request.getToTopic(); request.setToTopic(getNeedVerifyTopics(toTopic)); @@ -1133,7 +1179,7 @@ public void onReceiveChannelMessage2(ChannelHandlerContext ctx, ChannelMessage2 push.setSeq(message.getSeq()); push.setMessageID(message.getSeq()); - logger.info("msg:{}", Arrays.toString(message.getData())); + logger.debug("msg:{}", Arrays.toString(message.getData())); push.setContent(message.getData()); pushCallback.onPush(push); } else { @@ -1145,7 +1191,7 @@ public void onReceiveChannelMessage2(ChannelHandlerContext ctx, ChannelMessage2 } } else if (message.getType() == ChannelMessageType.AMOP_RESPONSE.getType()) { - logger.info("channel message:{}", message.getSeq()); + logger.debug("channel message:{}", message.getSeq()); if (callback != null) { logger.debug("found callback response"); diff --git a/src/main/java/org/fisco/bcos/channel/handler/ChannelConnections.java b/src/main/java/org/fisco/bcos/channel/handler/ChannelConnections.java index 505caa864..c46633cdb 100644 --- a/src/main/java/org/fisco/bcos/channel/handler/ChannelConnections.java +++ b/src/main/java/org/fisco/bcos/channel/handler/ChannelConnections.java @@ -664,7 +664,7 @@ public void reconnect() { } } - logger.info(" Keepalive nodes count: {}", aliveConnectionCount); + logger.trace(" Keepalive nodes count: {}", aliveConnectionCount); for (ConnectionInfo connectionInfo : connectionInfoList) { diff --git a/src/main/java/org/fisco/bcos/channel/handler/ConnectionCallback.java b/src/main/java/org/fisco/bcos/channel/handler/ConnectionCallback.java index b9b49da4c..380a68ca5 100644 --- a/src/main/java/org/fisco/bcos/channel/handler/ConnectionCallback.java +++ b/src/main/java/org/fisco/bcos/channel/handler/ConnectionCallback.java @@ -177,7 +177,7 @@ public void onResponse(BcosResponse response) { .set(channelProtocol); // - subBlockNotification(ctx); + sendUpdateTopicMessage(ctx); queryBlockNumber(ctx); // channelService.getEventLogFilterManager().sendFilter(); @@ -234,7 +234,7 @@ public void onResponse(BcosResponse response) { logger.info( " query node version timeout, content: {}", response.getContent()); - subBlockNotification(ctx); + sendUpdateTopicMessage(ctx); queryBlockNumber(ctx); return; } else if (response.getErrorCode() != 0) { @@ -282,7 +282,7 @@ public void onResponse(BcosResponse response) { nodeVersion.getResult(), response.getContent()); - subBlockNotification(ctx); + sendUpdateTopicMessage(ctx); queryBlockNumber(ctx); // channelService.getEventLogFilterManager().sendFilter(); } @@ -316,7 +316,7 @@ public void run(Timeout timeout) throws Exception { channelService.getSeq2Callback().put(seq, callback); } - private void subBlockNotification(ChannelHandlerContext ctx) throws JsonProcessingException { + public void sendUpdateTopicMessage(ChannelHandlerContext ctx) throws JsonProcessingException { Message message = new Message(); message.setResult(0); @@ -336,7 +336,7 @@ private void subBlockNotification(ChannelHandlerContext ctx) throws JsonProcessi ctx.writeAndFlush(out); logger.info( - " send sub block notification request, seq: {}, content: {}", + " send update topic message request, seq: {}, content: {}", message.getSeq(), content); } diff --git a/src/test/java/org/fisco/bcos/channel/test/amop/Channel2ServerUpdateTopics.java b/src/test/java/org/fisco/bcos/channel/test/amop/Channel2ServerUpdateTopics.java new file mode 100644 index 000000000..93e46efb5 --- /dev/null +++ b/src/test/java/org/fisco/bcos/channel/test/amop/Channel2ServerUpdateTopics.java @@ -0,0 +1,46 @@ +package org.fisco.bcos.channel.test.amop; + +import java.util.HashSet; +import java.util.Set; +import org.fisco.bcos.channel.client.Service; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; + +public class Channel2ServerUpdateTopics { + private static final Logger logger = LoggerFactory.getLogger(Channel2Server.class); + + public static void main(String[] args) throws Exception { + if (args.length < 1) { + System.out.println("Param: topic"); + return; + } + String topic = args[0]; + logger.debug("init Server"); + ApplicationContext context = + new ClassPathXmlApplicationContext("classpath:applicationContext.xml"); + Service service = context.getBean(Service.class); + PushCallback cb = new PushCallback(); + service.setPushCallback(cb); + System.out.println("3s..."); + Thread.sleep(1000); + System.out.println("2s..."); + Thread.sleep(1000); + System.out.println("1s..."); + Thread.sleep(1000); + + System.out.println("start test"); + System.out.println("==================================================================="); + service.run(); + + Thread.sleep(10000); + + System.out.println("set topics"); + System.out.println("==================================================================="); + Set topics = new HashSet(); + topics.add(topic); + service.setTopics(topics); + service.updateTopicsToNode(); + } +}