Skip to content

Commit

Permalink
sync update topic func to 2.2.3 branch (#521) (#526)
Browse files Browse the repository at this point in the history
* sync update topic func to 2.2.3 branch (#521)

* add topics update

* add Channel2ServerUpdateTopics.java

* sync update topic func to 2.2.3 branch

* update keepalive nodes count log to trace level.
  • Loading branch information
ywy2090 authored Mar 11, 2020
1 parent 53377e3 commit c2c6d0c
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 9 deletions.
10 changes: 10 additions & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
### v2.2.3

(2020-03-03)

* 新增
1. Service对象添加updateTopicsToNode接口,SDK可以运行时更新订阅的topic

* 兼容
1. 适配fisco-bcos 2.2.0版本,支持Channel Message v1/V2/V3协议

### v2.2.2

(2020-01-17)
Expand Down
2 changes: 1 addition & 1 deletion release_note.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v2.2.2
v2.2.3
50 changes: 48 additions & 2 deletions src/main/java/org/fisco/bcos/channel/client/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,52 @@ public void run(Timeout timeout) throws Exception {
}
}

/**
* When SDK start, the initial subscribed topics information set by user will be sent to the
* node. User can update subscribed topics again by following steps: 1. Set the topics you want
* to subscribe to again Service service // Servcie object Set<String> topics // topics that
* subscribe again service.setTopics(topics) 2. send update topics message to all nodes
* service.updateTopicsToNode();
*/
public void updateTopicsToNode() {

logger.info(" updateTopicToNode, groupId: {}, topics: {}", groupId, getTopics());

// select send node
ChannelConnections channelConnections =
allChannelConnections
.getAllChannelConnections()
.stream()
.filter(x -> x.getGroupId() == groupId)
.findFirst()
.get();

if (Objects.isNull(channelConnections)) {
throw new IllegalArgumentException(
" No group configuration was found, groupId: " + groupId);
}

ConnectionCallback callback = (ConnectionCallback) channelConnections.getCallback();
if (Objects.isNull(callback)) {
throw new IllegalArgumentException(
" No callback was found for ChannelConnections, service is not initialized");
}
callback.setTopics(getTopics());

/** send update topic message to all connected nodes */
Map<String, ChannelHandlerContext> networkConnections =
channelConnections.getNetworkConnections();
for (ChannelHandlerContext ctx : networkConnections.values()) {
if (Objects.nonNull(ctx) && ChannelHandlerContextHelper.isChannelAvailable(ctx)) {
try {
callback.sendUpdateTopicMessage(ctx);
} catch (Exception e) {
logger.debug(" e: ", e);
}
}
}
}

public void asyncMulticastChannelMessageForVerifyTopic(ChannelRequest request) {
String toTopic = request.getToTopic();
request.setToTopic(getNeedVerifyTopics(toTopic));
Expand Down Expand Up @@ -1133,7 +1179,7 @@ public void onReceiveChannelMessage2(ChannelHandlerContext ctx, ChannelMessage2

push.setSeq(message.getSeq());
push.setMessageID(message.getSeq());
logger.info("msg:{}", Arrays.toString(message.getData()));
logger.debug("msg:{}", Arrays.toString(message.getData()));
push.setContent(message.getData());
pushCallback.onPush(push);
} else {
Expand All @@ -1145,7 +1191,7 @@ public void onReceiveChannelMessage2(ChannelHandlerContext ctx, ChannelMessage2
}

} else if (message.getType() == ChannelMessageType.AMOP_RESPONSE.getType()) {
logger.info("channel message:{}", message.getSeq());
logger.debug("channel message:{}", message.getSeq());
if (callback != null) {
logger.debug("found callback response");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ public void reconnect() {
}
}

logger.info(" Keepalive nodes count: {}", aliveConnectionCount);
logger.trace(" Keepalive nodes count: {}", aliveConnectionCount);

for (ConnectionInfo connectionInfo : connectionInfoList) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public void onResponse(BcosResponse response) {
.set(channelProtocol);

//
subBlockNotification(ctx);
sendUpdateTopicMessage(ctx);
queryBlockNumber(ctx);
// channelService.getEventLogFilterManager().sendFilter();

Expand Down Expand Up @@ -234,7 +234,7 @@ public void onResponse(BcosResponse response) {
logger.info(
" query node version timeout, content: {}",
response.getContent());
subBlockNotification(ctx);
sendUpdateTopicMessage(ctx);
queryBlockNumber(ctx);
return;
} else if (response.getErrorCode() != 0) {
Expand Down Expand Up @@ -282,7 +282,7 @@ public void onResponse(BcosResponse response) {
nodeVersion.getResult(),
response.getContent());

subBlockNotification(ctx);
sendUpdateTopicMessage(ctx);
queryBlockNumber(ctx);
// channelService.getEventLogFilterManager().sendFilter();
}
Expand Down Expand Up @@ -316,7 +316,7 @@ public void run(Timeout timeout) throws Exception {
channelService.getSeq2Callback().put(seq, callback);
}

private void subBlockNotification(ChannelHandlerContext ctx) throws JsonProcessingException {
public void sendUpdateTopicMessage(ChannelHandlerContext ctx) throws JsonProcessingException {

Message message = new Message();
message.setResult(0);
Expand All @@ -336,7 +336,7 @@ private void subBlockNotification(ChannelHandlerContext ctx) throws JsonProcessi
ctx.writeAndFlush(out);

logger.info(
" send sub block notification request, seq: {}, content: {}",
" send update topic message request, seq: {}, content: {}",
message.getSeq(),
content);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package org.fisco.bcos.channel.test.amop;

import java.util.HashSet;
import java.util.Set;
import org.fisco.bcos.channel.client.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Channel2ServerUpdateTopics {
private static final Logger logger = LoggerFactory.getLogger(Channel2Server.class);

public static void main(String[] args) throws Exception {
if (args.length < 1) {
System.out.println("Param: topic");
return;
}
String topic = args[0];
logger.debug("init Server");
ApplicationContext context =
new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
Service service = context.getBean(Service.class);
PushCallback cb = new PushCallback();
service.setPushCallback(cb);
System.out.println("3s...");
Thread.sleep(1000);
System.out.println("2s...");
Thread.sleep(1000);
System.out.println("1s...");
Thread.sleep(1000);

System.out.println("start test");
System.out.println("===================================================================");
service.run();

Thread.sleep(10000);

System.out.println("set topics");
System.out.println("===================================================================");
Set<String> topics = new HashSet<String>();
topics.add(topic);
service.setTopics(topics);
service.updateTopicsToNode();
}
}

0 comments on commit c2c6d0c

Please sign in to comment.