Skip to content

Commit

Permalink
optimize redis message handle
Browse files Browse the repository at this point in the history
  • Loading branch information
laywin committed Dec 19, 2023
1 parent 52bc5ab commit ac1a55f
Showing 1 changed file with 103 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import io.seata.common.ConfigurationKeys;
Expand Down Expand Up @@ -74,7 +76,9 @@ public class RedisRegistryServiceImpl implements RegistryService<RedisListener>
private static final long KEY_TTL = 5L;
private static final long KEY_REFRESH_PERIOD = 2000L;

private final Object pushEmptyProtectionLock = new Object();
private final ReentrantLock reentrantLock = new ReentrantLock();

private RedisMessageHandler redisMessageHandler = new RedisMessageHandler();

private ScheduledExecutorService threadPoolExecutorForSubscribe = new ScheduledThreadPoolExecutor(1,
new NamedThreadFactory("RedisRegistryService-subscribe", 1));
Expand Down Expand Up @@ -236,22 +240,8 @@ List<InetSocketAddress> lookupByCluster(String clusterName) {
try (Jedis jedis = jedisPool.getResource()) {
updateClusterAddressMap(jedis, redisRegistryKey, clusterName);
}
subscribe(clusterName, msg -> {
String[] msgr = msg.split("-");
String serverAddr = msgr[0];
String eventType = msgr[1];
switch (eventType) {
case RedisListener.REGISTER:
CLUSTER_ADDRESS_MAP.computeIfAbsent(clusterName, value -> ConcurrentHashMap.newKeySet(2))
.add(NetUtil.toInetSocketAddress(serverAddr));
break;
case RedisListener.UN_REGISTER:
removeServerAddressByPushEmptyProtection(clusterName, serverAddr);
break;
default:
throw new ShouldNeverHappenException("unknown redis msg:" + msg);
}
});
subscribe(clusterName, msg -> redisMessageHandler.accept(new RedisMessage(msg, clusterName,
MessageType.SUBSCRIBE)));
}
return new ArrayList<>(CLUSTER_ADDRESS_MAP.computeIfAbsent(clusterName, value -> ConcurrentHashMap.newKeySet(2)));
}
Expand All @@ -270,7 +260,8 @@ private void removeServerAddressByPushEmptyProtection(String notifyCluserName, S
Set<InetSocketAddress> socketAddresses = CLUSTER_ADDRESS_MAP.computeIfAbsent(notifyCluserName, value -> ConcurrentHashMap.newKeySet(2));
InetSocketAddress inetSocketAddress = NetUtil.toInetSocketAddress(serverAddr);

synchronized (pushEmptyProtectionLock) {
reentrantLock.lock();
try {
if (socketAddresses.size() == 1 && socketAddresses.contains(inetSocketAddress)) {
String txServiceGroupName = ConfigurationFactory.getInstance().getConfig(ConfigurationKeys.TX_SERVICE_GROUP);
if (StringUtils.isNotBlank(txServiceGroupName)) {
Expand All @@ -281,6 +272,8 @@ private void removeServerAddressByPushEmptyProtection(String notifyCluserName, S
}
}
CLUSTER_ADDRESS_MAP.get(notifyCluserName).remove(inetSocketAddress);
} finally {
reentrantLock.unlock();
}
}

Expand Down Expand Up @@ -338,9 +331,49 @@ private void updateClusterAddressMap(Jedis jedis, String redisRegistryKey, Strin
}
} while (!cursor.equals(ScanParams.SCAN_POINTER_START));

synchronized (pushEmptyProtectionLock) {
if (CollectionUtils.isNotEmpty(newAddressSet) && !newAddressSet.equals(CLUSTER_ADDRESS_MAP.get(clusterName))) {
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressSet);
redisMessageHandler.accept(new RedisMessage(newAddressSet, clusterName, MessageType.SCAN));
}

/**
*
* handle redis message
*
*/
public class RedisMessageHandler implements Consumer<RedisMessage> {

/***
*
* @param redisMessage
*/
@Override
public void accept(RedisMessage redisMessage) {
String clusterName = redisMessage.getCluster();
if (redisMessage.getMessageType() == MessageType.SUBSCRIBE) {
String msg = (String)redisMessage.getMessage();
String[] msgr = msg.split("-");
String serverAddr = msgr[0];
String eventType = msgr[1];
switch (eventType) {
case RedisListener.REGISTER:
CLUSTER_ADDRESS_MAP.computeIfAbsent(redisMessage.getCluster(), value -> ConcurrentHashMap.newKeySet(2))
.add(NetUtil.toInetSocketAddress(serverAddr));
break;
case RedisListener.UN_REGISTER:
removeServerAddressByPushEmptyProtection(redisMessage.getCluster(), serverAddr);
break;
default:
throw new ShouldNeverHappenException("unknown redis msg:" + msg);
}
} else if (redisMessage.getMessageType() == MessageType.SCAN){
Set<InetSocketAddress> newAddressSet = (Set<InetSocketAddress>)redisMessage.getMessage();
if (CollectionUtils.isNotEmpty(newAddressSet) && !newAddressSet.equals(CLUSTER_ADDRESS_MAP.get(clusterName))) {
reentrantLock.lock();
try {
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressSet);
} finally {
reentrantLock.unlock();
}
}
}
}
}
Expand All @@ -361,4 +394,53 @@ private String getRedisDbFileKey() {
return REDIS_FILEKEY_PREFIX + REDIS_DB;
}

private enum MessageType {

SCAN, SUBSCRIBE;
}

/**
*
* redis message
*
*/
private static class RedisMessage {

private Object message;

private String cluster;

private MessageType messageType;

public RedisMessage(Object message, String cluster, MessageType messageType) {
this.message = message;
this.cluster = cluster;
this.messageType = messageType;
}

public Object getMessage() {
return message;
}

public void setMessage(Object message) {
this.message = message;
}

public String getCluster() {
return cluster;
}

public void setCluster(String cluster) {
this.cluster = cluster;
}

public MessageType getMessageType() {
return messageType;
}

public void setMessageType(MessageType messageType) {
this.messageType = messageType;
}
}

}

0 comments on commit ac1a55f

Please sign in to comment.