Skip to content

Commit

Permalink
add pushEmptyProtectionLock to protect pull & listener
Browse files Browse the repository at this point in the history
  • Loading branch information
laywin committed Dec 19, 2023
1 parent d423833 commit 52bc5ab
Showing 1 changed file with 17 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public class RedisRegistryServiceImpl implements RegistryService<RedisListener>
private static final long KEY_TTL = 5L;
private static final long KEY_REFRESH_PERIOD = 2000L;

private final Object pushEmptyProtectionLock = new Object();

private ScheduledExecutorService threadPoolExecutorForSubscribe = new ScheduledThreadPoolExecutor(1,
new NamedThreadFactory("RedisRegistryService-subscribe", 1));
private ScheduledExecutorService threadPoolExecutorForUpdateMap = new ScheduledThreadPoolExecutor(1,
Expand Down Expand Up @@ -267,15 +269,19 @@ private void removeServerAddressByPushEmptyProtection(String notifyCluserName, S

Set<InetSocketAddress> socketAddresses = CLUSTER_ADDRESS_MAP.computeIfAbsent(notifyCluserName, value -> ConcurrentHashMap.newKeySet(2));
InetSocketAddress inetSocketAddress = NetUtil.toInetSocketAddress(serverAddr);
if (socketAddresses.size() == 1 && socketAddresses.contains(inetSocketAddress)) {
String txServiceGroupName = ConfigurationFactory.getInstance()
.getConfig(ConfigurationKeys.TX_SERVICE_GROUP);
String clusterName = getServiceGroup(txServiceGroupName);
if (notifyCluserName.equals(clusterName)) {
return;

synchronized (pushEmptyProtectionLock) {
if (socketAddresses.size() == 1 && socketAddresses.contains(inetSocketAddress)) {
String txServiceGroupName = ConfigurationFactory.getInstance().getConfig(ConfigurationKeys.TX_SERVICE_GROUP);
if (StringUtils.isNotBlank(txServiceGroupName)) {
String clusterName = getServiceGroup(txServiceGroupName);
if (notifyCluserName.equals(clusterName)) {
return;
}
}
}
CLUSTER_ADDRESS_MAP.get(notifyCluserName).remove(inetSocketAddress);
}
CLUSTER_ADDRESS_MAP.get(notifyCluserName).remove(inetSocketAddress);
}

@Override
Expand Down Expand Up @@ -332,8 +338,10 @@ private void updateClusterAddressMap(Jedis jedis, String redisRegistryKey, Strin
}
} while (!cursor.equals(ScanParams.SCAN_POINTER_START));

if (CollectionUtils.isNotEmpty(newAddressSet) && !newAddressSet.equals(CLUSTER_ADDRESS_MAP.get(clusterName))) {
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressSet);
synchronized (pushEmptyProtectionLock) {
if (CollectionUtils.isNotEmpty(newAddressSet) && !newAddressSet.equals(CLUSTER_ADDRESS_MAP.get(clusterName))) {
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressSet);
}
}
}

Expand Down

0 comments on commit 52bc5ab

Please sign in to comment.