Skip to content

Commit

Permalink
Refactor ZookeeperRepository#watch to avoid unless refresh metadata. (#…
Browse files Browse the repository at this point in the history
…31403)

* Refactor ZookeeperRepository#watch to avoid unless refresh metadata.

* Fix deletes
  • Loading branch information
zhaojinchao95 authored May 27, 2024
1 parent a44fa72 commit da31876
Showing 1 changed file with 9 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
Expand Down Expand Up @@ -53,6 +52,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -242,31 +242,19 @@ public void watch(final String key, final DataChangedEventListener listener) {
caches.put(key, cache);
}
CuratorCacheListener curatorCacheListener = CuratorCacheListener.builder()
.afterInitialized()
.forTreeCache(client, (framework, treeCacheListener) -> {
Type changedType = getChangedType(treeCacheListener.getType());
if (Type.IGNORED != changedType) {
listener.onChange(new DataChangedEvent(treeCacheListener.getData().getPath(),
new String(treeCacheListener.getData().getData(), StandardCharsets.UTF_8), changedType));
.forCreates(childData -> listener.onChange(new DataChangedEvent(childData.getPath(), new String(childData.getData(), StandardCharsets.UTF_8), Type.ADDED)))
.forChanges((oldData, newData) -> {
if (!Objects.equals(oldData, newData)) {
listener.onChange(new DataChangedEvent(newData.getPath(), new String(newData.getData(), StandardCharsets.UTF_8), Type.UPDATED));
}
}).build();
})
.forDeletes(oldData -> listener.onChange(new DataChangedEvent(oldData.getPath(), new String(oldData.getData(), StandardCharsets.UTF_8), Type.DELETED)))
.afterInitialized()
.build();
cache.listenable().addListener(curatorCacheListener);
cache.start();
}

private Type getChangedType(final TreeCacheEvent.Type type) {
switch (type) {
case NODE_ADDED:
return Type.ADDED;
case NODE_UPDATED:
return Type.UPDATED;
case NODE_REMOVED:
return Type.DELETED;
default:
return Type.IGNORED;
}
}

@Override
public void close() {
caches.values().forEach(CuratorCache::close);
Expand Down

0 comments on commit da31876

Please sign in to comment.