diff --git a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java index a33513e4624e1..76e20a97f79f4 100644 --- a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java +++ b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java @@ -25,7 +25,6 @@ import org.apache.curator.framework.api.ACLProvider; import org.apache.curator.framework.recipes.cache.CuratorCache; import org.apache.curator.framework.recipes.cache.CuratorCacheListener; -import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.CloseableUtils; import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext; @@ -53,6 +52,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -242,31 +242,19 @@ public void watch(final String key, final DataChangedEventListener listener) { caches.put(key, cache); } CuratorCacheListener curatorCacheListener = CuratorCacheListener.builder() - .afterInitialized() - .forTreeCache(client, (framework, treeCacheListener) -> { - Type changedType = getChangedType(treeCacheListener.getType()); - if (Type.IGNORED != changedType) { - listener.onChange(new DataChangedEvent(treeCacheListener.getData().getPath(), - new String(treeCacheListener.getData().getData(), StandardCharsets.UTF_8), changedType)); + .forCreates(childData -> listener.onChange(new DataChangedEvent(childData.getPath(), new String(childData.getData(), StandardCharsets.UTF_8), Type.ADDED))) + .forChanges((oldData, newData) -> { + if (!Objects.equals(oldData, newData)) { + listener.onChange(new DataChangedEvent(newData.getPath(), new String(newData.getData(), StandardCharsets.UTF_8), Type.UPDATED)); } - }).build(); + }) + .forDeletes(oldData -> listener.onChange(new DataChangedEvent(oldData.getPath(), new String(oldData.getData(), StandardCharsets.UTF_8), Type.DELETED))) + .afterInitialized() + .build(); cache.listenable().addListener(curatorCacheListener); cache.start(); } - private Type getChangedType(final TreeCacheEvent.Type type) { - switch (type) { - case NODE_ADDED: - return Type.ADDED; - case NODE_UPDATED: - return Type.UPDATED; - case NODE_REMOVED: - return Type.DELETED; - default: - return Type.IGNORED; - } - } - @Override public void close() { caches.values().forEach(CuratorCache::close);