From 50c1f7ace2843216fb4e11dce59c4bee78b477a8 Mon Sep 17 00:00:00 2001 From: asdf2014 Date: Mon, 13 Jan 2020 00:33:18 +0800 Subject: [PATCH 01/13] * Fix huge number of watches in zk * Tear down nodeAnnouncer * Remove useless Logger and ExecutorService * Init CuratorListener by lambda * Improve explicit type * Using CuratorMultiTransaction instead of CuratorTransaction * Add @GuardedBy("toAnnounce") for toUpdate field * Improve docs --- .../worker/WorkerCuratorCoordinator.java | 7 +- .../druid/curator/announcement/Announcer.java | 2 + .../curator/announcement/NodeAnnouncer.java | 350 ++++++++++++++++++ .../discovery/CuratorDruidNodeAnnouncer.java | 6 +- .../apache/druid/guice/AnnouncerModule.java | 8 + .../LookupResourceListenerAnnouncer.java | 4 +- .../org/apache/druid/server/ZKPathsUtils.java | 30 ++ .../CuratorDataSegmentServerAnnouncer.java | 6 +- .../announcer/ListenerResourceAnnouncer.java | 8 +- .../client/BatchServerInventoryViewTest.java | 8 +- .../announcement/NodeAnnouncerTest.java | 285 ++++++++++++++ ...torDruidNodeAnnouncerAndDiscoveryTest.java | 8 +- .../ListenerResourceAnnouncerTest.java | 23 +- 13 files changed, 702 insertions(+), 43 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java create mode 100644 server/src/main/java/org/apache/druid/server/ZKPathsUtils.java create mode 100644 server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerCuratorCoordinator.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerCuratorCoordinator.java index 7e7c09893b9d..7ed8aea24f0b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerCuratorCoordinator.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerCuratorCoordinator.java @@ -25,11 +25,10 @@ import com.google.inject.Inject; import org.apache.curator.framework.CuratorFramework; import org.apache.druid.curator.CuratorUtils; -import org.apache.druid.curator.announcement.Announcer; +import org.apache.druid.curator.announcement.NodeAnnouncer; import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.logger.Logger; @@ -54,7 +53,7 @@ public class WorkerCuratorCoordinator private final ObjectMapper jsonMapper; private final RemoteTaskRunnerConfig config; private final CuratorFramework curatorFramework; - private final Announcer announcer; + private final NodeAnnouncer announcer; private final String baseAnnouncementsPath; private final String baseTaskPath; @@ -77,7 +76,7 @@ public WorkerCuratorCoordinator( this.curatorFramework = curatorFramework; this.worker = worker; - this.announcer = new Announcer(curatorFramework, Execs.directExecutor()); + this.announcer = new NodeAnnouncer(curatorFramework); this.baseAnnouncementsPath = getPath(Arrays.asList(indexerZkConfig.getAnnouncementsPath(), worker.getHost())); this.baseTaskPath = getPath(Arrays.asList(indexerZkConfig.getTasksPath(), worker.getHost())); diff --git a/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java b/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java index 533389a02c3e..89694d67c557 100644 --- a/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java +++ b/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java @@ -40,6 +40,7 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; +import javax.annotation.concurrent.GuardedBy; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; @@ -64,6 +65,7 @@ public class Announcer private final ExecutorService pathChildrenCacheExecutor; private final List toAnnounce = new ArrayList<>(); + @GuardedBy("toAnnounce") private final List toUpdate = new ArrayList<>(); private final ConcurrentMap listeners = new ConcurrentHashMap<>(); private final ConcurrentMap> announcements = new ConcurrentHashMap<>(); diff --git a/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java b/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java new file mode 100644 index 000000000000..a68b426e5402 --- /dev/null +++ b/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.curator.announcement; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.transaction.CuratorOp; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.guava.CloseQuietly; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.ZKPathsUtils; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; + +import javax.annotation.concurrent.GuardedBy; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * NodeAnnouncer announces single node on Zookeeper and only watches this node, + * while {@link Announcer} watches all child paths, not only this node + */ +public class NodeAnnouncer +{ + private static final Logger log = new Logger(NodeAnnouncer.class); + + private final CuratorFramework curator; + + /** + * In case a path is added to this collection in {@link #announce} before zk is connected, + * should remember the path and do announce in {@link #start} later. + */ + private final List toAnnounce = new ArrayList<>(); + /** + * In case a path is added to this collection in {@link #update} before zk is connected, + * should remember the path and do update in {@link #start} later. + */ + @GuardedBy("toAnnounce") + private final List toUpdate = new ArrayList<>(); + private final ConcurrentMap listeners = new ConcurrentHashMap<>(); + private final ConcurrentMap announcedPaths = new ConcurrentHashMap<>(); + /** + * Only the one created the parent path can drop the parent path, so should remember these created parents. + */ + private final List pathsCreatedInThisAnnouncer = new CopyOnWriteArrayList<>(); + + private boolean started = false; + + public NodeAnnouncer(CuratorFramework curator) + { + this.curator = curator; + } + + @VisibleForTesting + Set getAddedPaths() + { + return announcedPaths.keySet(); + } + + @LifecycleStart + @SuppressWarnings("DuplicatedCode") + public void start() + { + log.info("Starting announcer"); + synchronized (toAnnounce) { + if (started) { + return; + } + + started = true; + + for (Announceable announceable : toAnnounce) { + announce(announceable.path, announceable.bytes, announceable.removeParentsIfCreated); + } + toAnnounce.clear(); + + for (Announceable announceable : toUpdate) { + update(announceable.path, announceable.bytes); + } + toUpdate.clear(); + } + } + + @LifecycleStop + public void stop() + { + log.info("Stopping announcer"); + synchronized (toAnnounce) { + if (!started) { + return; + } + + started = false; + + Closer closer = Closer.create(); + for (NodeCache cache : listeners.values()) { + closer.register(cache); + } + CloseQuietly.close(closer); + + for (String announcementPath : announcedPaths.keySet()) { + unannounce(announcementPath); + } + + if (!pathsCreatedInThisAnnouncer.isEmpty()) { + final List deleteOps = new ArrayList<>(pathsCreatedInThisAnnouncer.size()); + for (String parent : pathsCreatedInThisAnnouncer) { + try { + deleteOps.add(curator.transactionOp().delete().forPath(parent)); + } + catch (Exception e) { + log.error(e, "Unable to delete parent[%s].", parent); + } + } + try { + curator.transaction().forOperations(deleteOps); + } + catch (Exception e) { + log.error(e, "Unable to commit transaction."); + } + } + } + } + + /** + * Like announce(path, bytes, true). + */ + public void announce(String path, byte[] bytes) + { + announce(path, bytes, true); + } + + /** + * Announces the provided bytes at the given path. Announcement means that it will create an ephemeral node + * and monitor it to make sure that it always exists until it is unannounced or this object is closed. + * + * @param path The path to announce at + * @param bytes The payload to announce + * @param removeParentIfCreated remove parent of "path" if we had created that parent + */ + public void announce(String path, byte[] bytes, boolean removeParentIfCreated) + { + synchronized (toAnnounce) { + if (!started) { + toAnnounce.add(new Announceable(path, bytes, removeParentIfCreated)); + return; + } + } + + final String parentPath = ZKPathsUtils.getParentPath(path); + boolean buildParentPath = false; + + byte[] value = announcedPaths.get(path); + + if (value == null) { + try { + if (curator.checkExists().forPath(parentPath) == null) { + buildParentPath = true; + } + } + catch (Exception e) { + log.debug(e, "Problem checking if the parent existed, ignoring."); + } + + // Synchronize to make sure that I only create a listener once. + synchronized (toAnnounce) { + if (!listeners.containsKey(path)) { + final NodeCache cache = new NodeCache(curator, path, true); + cache.getListenable().addListener( + () -> { + ChildData currentData = cache.getCurrentData(); + if (currentData == null) { + final byte[] value1 = announcedPaths.get(path); + if (value1 != null) { + log.info("Node[%s] dropped, reinstating.", path); + createAnnouncement(path, value1); + } + } + } + ); + + if (started) { + if (buildParentPath) { + createPath(parentPath, removeParentIfCreated); + } + startCache(cache); + listeners.put(path, cache); + } + } + } + } + + boolean created = false; + synchronized (toAnnounce) { + if (started) { + byte[] oldBytes = announcedPaths.putIfAbsent(path, bytes); + + if (oldBytes == null) { + created = true; + } else if (!Arrays.equals(oldBytes, bytes)) { + throw new IAE("Cannot reannounce different values under the same path"); + } + } + } + + if (created) { + try { + createAnnouncement(path, bytes); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + public void update(final String path, final byte[] bytes) + { + synchronized (toAnnounce) { + if (!started) { + // removeParentsIfCreated is not relevant for updates; use dummy value "false". + toUpdate.add(new Announceable(path, bytes, false)); + return; + } + } + + byte[] oldBytes = announcedPaths.get(path); + + if (oldBytes == null) { + throw new ISE("Cannot update a path[%s] that hasn't been announced!", path); + } + + synchronized (toAnnounce) { + try { + if (!Arrays.equals(oldBytes, bytes)) { + announcedPaths.put(path, bytes); + updateAnnouncement(path, bytes); + } + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + private void createAnnouncement(final String path, byte[] value) throws Exception + { + curator.create().compressed().withMode(CreateMode.EPHEMERAL).inBackground().forPath(path, value); + } + + private void updateAnnouncement(final String path, final byte[] value) throws Exception + { + curator.setData().compressed().inBackground().forPath(path, value); + } + + /** + * Unannounces an announcement created at path. Note that if all announcements get removed, the Announcer + * will continue to have ZK watches on paths because clearing them out is a source of ugly race conditions. + *

+ * If you need to completely clear all the state of what is being watched and announced, stop() the Announcer. + * + * @param path the path to unannounce + */ + public void unannounce(String path) + { + log.info("unannouncing [%s]", path); + final byte[] value = announcedPaths.remove(path); + + if (value == null) { + log.error("Path[%s] not announced, cannot unannounce.", path); + return; + } + + try { + curator.transaction().forOperations(curator.transactionOp().delete().forPath(path)); + } + catch (KeeperException.NoNodeException e) { + log.info("node[%s] didn't exist anyway...", path); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void startCache(NodeCache cache) + { + try { + cache.start(); + } + catch (Exception e) { + CloseQuietly.close(cache); + throw new RuntimeException(e); + } + } + + private void createPath(String parentPath, boolean removeParentsIfCreated) + { + try { + curator.create().creatingParentsIfNeeded().forPath(parentPath); + if (removeParentsIfCreated) { + pathsCreatedInThisAnnouncer.add(parentPath); + } + log.debug("Created parentPath[%s], %s remove on stop() called.", parentPath, removeParentsIfCreated ? "will" : "will not"); + } + catch (Exception e) { + log.error(e, "Problem creating parentPath[%s], someone else created it first?", parentPath); + } + } + + private static class Announceable + { + final String path; + final byte[] bytes; + final boolean removeParentsIfCreated; + + public Announceable(String path, byte[] bytes, boolean removeParentsIfCreated) + { + this.path = path; + this.bytes = bytes; + this.removeParentsIfCreated = removeParentsIfCreated; + } + } +} diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java index 5b536c65d39d..0dc4d85ec8ea 100644 --- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java +++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java @@ -23,7 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import org.apache.curator.utils.ZKPaths; -import org.apache.druid.curator.announcement.Announcer; +import org.apache.druid.curator.announcement.NodeAnnouncer; import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeAnnouncer; import org.apache.druid.discovery.NodeRole; @@ -42,12 +42,12 @@ static String makeNodeAnnouncementPath(ZkPathsConfig config, NodeRole nodeRole, private static final Logger log = new Logger(CuratorDruidNodeAnnouncer.class); - private final Announcer announcer; + private final NodeAnnouncer announcer; private final ZkPathsConfig config; private final ObjectMapper jsonMapper; @Inject - public CuratorDruidNodeAnnouncer(Announcer announcer, ZkPathsConfig config, @Json ObjectMapper jsonMapper) + public CuratorDruidNodeAnnouncer(NodeAnnouncer announcer, ZkPathsConfig config, @Json ObjectMapper jsonMapper) { this.announcer = announcer; this.config = config; diff --git a/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java b/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java index 3a48183bdc1a..13a2372b86c9 100644 --- a/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java +++ b/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java @@ -24,6 +24,7 @@ import com.google.inject.Provides; import org.apache.curator.framework.CuratorFramework; import org.apache.druid.curator.announcement.Announcer; +import org.apache.druid.curator.announcement.NodeAnnouncer; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.server.coordination.BatchDataSegmentAnnouncer; import org.apache.druid.server.coordination.CuratorDataSegmentServerAnnouncer; @@ -52,4 +53,11 @@ public Announcer getAnnouncer(CuratorFramework curator) { return new Announcer(curator, Execs.singleThreaded("Announcer-%s")); } + + @Provides + @ManageLifecycle + public NodeAnnouncer getNodeAnnouncer(CuratorFramework curator) + { + return new NodeAnnouncer(curator); + } } diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupResourceListenerAnnouncer.java b/server/src/main/java/org/apache/druid/query/lookup/LookupResourceListenerAnnouncer.java index d58e2b69d7bc..07a8c4d80250 100644 --- a/server/src/main/java/org/apache/druid/query/lookup/LookupResourceListenerAnnouncer.java +++ b/server/src/main/java/org/apache/druid/query/lookup/LookupResourceListenerAnnouncer.java @@ -20,7 +20,7 @@ package org.apache.druid.query.lookup; import com.google.inject.Inject; -import org.apache.druid.curator.announcement.Announcer; +import org.apache.druid.curator.announcement.NodeAnnouncer; import org.apache.druid.guice.annotations.Self; import org.apache.druid.server.DruidNode; import org.apache.druid.server.http.HostAndPortWithScheme; @@ -31,7 +31,7 @@ class LookupResourceListenerAnnouncer extends ListenerResourceAnnouncer { @Inject public LookupResourceListenerAnnouncer( - Announcer announcer, + NodeAnnouncer announcer, LookupListeningAnnouncerConfig lookupListeningAnnouncerConfig, @Self DruidNode node ) diff --git a/server/src/main/java/org/apache/druid/server/ZKPathsUtils.java b/server/src/main/java/org/apache/druid/server/ZKPathsUtils.java new file mode 100644 index 000000000000..9832abde27bf --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/ZKPathsUtils.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server; + +import org.apache.curator.utils.ZKPaths; + +public class ZKPathsUtils +{ + public static String getParentPath(String path) + { + return ZKPaths.getPathAndNode(path).getPath(); + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordination/CuratorDataSegmentServerAnnouncer.java b/server/src/main/java/org/apache/druid/server/coordination/CuratorDataSegmentServerAnnouncer.java index 1f456bd3445d..a337c78b948d 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/CuratorDataSegmentServerAnnouncer.java +++ b/server/src/main/java/org/apache/druid/server/coordination/CuratorDataSegmentServerAnnouncer.java @@ -23,7 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import org.apache.curator.utils.ZKPaths; -import org.apache.druid.curator.announcement.Announcer; +import org.apache.druid.curator.announcement.NodeAnnouncer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.initialization.ZkPathsConfig; @@ -37,7 +37,7 @@ public class CuratorDataSegmentServerAnnouncer implements DataSegmentServerAnnou private final DruidServerMetadata server; private final ZkPathsConfig config; - private final Announcer announcer; + private final NodeAnnouncer announcer; private final ObjectMapper jsonMapper; private final Object lock = new Object(); @@ -48,7 +48,7 @@ public class CuratorDataSegmentServerAnnouncer implements DataSegmentServerAnnou public CuratorDataSegmentServerAnnouncer( DruidServerMetadata server, ZkPathsConfig config, - Announcer announcer, + NodeAnnouncer announcer, ObjectMapper jsonMapper ) { diff --git a/server/src/main/java/org/apache/druid/server/listener/announcer/ListenerResourceAnnouncer.java b/server/src/main/java/org/apache/druid/server/listener/announcer/ListenerResourceAnnouncer.java index 04d51df0b41d..f0d84d81fff7 100644 --- a/server/src/main/java/org/apache/druid/server/listener/announcer/ListenerResourceAnnouncer.java +++ b/server/src/main/java/org/apache/druid/server/listener/announcer/ListenerResourceAnnouncer.java @@ -20,7 +20,7 @@ package org.apache.druid.server.listener.announcer; import org.apache.curator.utils.ZKPaths; -import org.apache.druid.curator.announcement.Announcer; +import org.apache.druid.curator.announcement.NodeAnnouncer; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.logger.Logger; @@ -41,11 +41,11 @@ public abstract class ListenerResourceAnnouncer private static final Logger LOG = new Logger(ListenerResourceAnnouncer.class); private final Object startStopSync = new Object(); private volatile boolean started = false; - private final Announcer announcer; + private final NodeAnnouncer announcer; private final String announcePath; public ListenerResourceAnnouncer( - Announcer announcer, + NodeAnnouncer announcer, ListeningAnnouncerConfig listeningAnnouncerConfig, String listener_key, HostAndPortWithScheme node @@ -59,7 +59,7 @@ public ListenerResourceAnnouncer( } ListenerResourceAnnouncer( - Announcer announcer, + NodeAnnouncer announcer, String announceBasePath, HostAndPortWithScheme node ) diff --git a/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java b/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java index 9ba446659633..76d959e5d2f7 100644 --- a/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java +++ b/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java @@ -39,6 +39,7 @@ import org.apache.druid.client.ServerView; import org.apache.druid.curator.PotentiallyGzippedCompressionProvider; import org.apache.druid.curator.announcement.Announcer; +import org.apache.druid.curator.announcement.NodeAnnouncer; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; @@ -90,6 +91,7 @@ public class BatchServerInventoryViewTest private CuratorFramework cf; private ObjectMapper jsonMapper; private Announcer announcer; + private NodeAnnouncer nodeAnnouncer; private BatchDataSegmentAnnouncer segmentAnnouncer; private DataSegmentServerAnnouncer serverAnnouncer; private Set testSegments; @@ -123,6 +125,9 @@ public void setUp() throws Exception ); announcer.start(); + nodeAnnouncer = new NodeAnnouncer(cf); + nodeAnnouncer.start(); + DruidServerMetadata serverMetadata = new DruidServerMetadata( "id", "host", @@ -145,7 +150,7 @@ public String getBase() serverAnnouncer = new CuratorDataSegmentServerAnnouncer( serverMetadata, zkPathsConfig, - announcer, + nodeAnnouncer, jsonMapper ); serverAnnouncer.announce(); @@ -225,6 +230,7 @@ public void tearDown() throws Exception filteredBatchServerInventoryView.stop(); serverAnnouncer.unannounce(); announcer.stop(); + nodeAnnouncer.stop(); cf.close(); testingCluster.stop(); } diff --git a/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java b/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java new file mode 100644 index 000000000000..220c7e40e160 --- /dev/null +++ b/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.curator.announcement; + +import com.google.common.collect.Sets; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.framework.api.transaction.CuratorOp; +import org.apache.curator.framework.api.transaction.CuratorTransactionResult; +import org.apache.curator.test.KillSession; +import org.apache.curator.utils.ZKPaths; +import org.apache.druid.curator.CuratorTestBase; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.zookeeper.KeeperException.Code; +import org.apache.zookeeper.data.Stat; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CountDownLatch; + +/** + */ +public class NodeAnnouncerTest extends CuratorTestBase +{ + + @Before + public void setUp() throws Exception + { + setupServerAndCurator(); + } + + @After + public void tearDown() + { + tearDownServerAndCurator(); + } + + @Test(timeout = 60_000L) + public void testSanity() throws Exception + { + curator.start(); + curator.blockUntilConnected(); + NodeAnnouncer announcer = new NodeAnnouncer(curator); + + final byte[] billy = StringUtils.toUtf8("billy"); + final String testPath1 = "/test1"; + final String testPath2 = "/somewhere/test2"; + announcer.announce(testPath1, billy); + + Assert.assertNull("/test1 does not exists", curator.checkExists().forPath(testPath1)); + Assert.assertNull("/somewhere/test2 does not exists", curator.checkExists().forPath(testPath2)); + + announcer.start(); + while (!announcer.getAddedPaths().contains("/test1")) { + Thread.sleep(100); + } + + try { + Assert.assertArrayEquals("/test1 has data", billy, curator.getData().decompressed().forPath(testPath1)); + Assert.assertNull("/somewhere/test2 still does not exist", curator.checkExists().forPath(testPath2)); + + announcer.announce(testPath2, billy); + + Assert.assertArrayEquals("/test1 still has data", billy, curator.getData().decompressed().forPath(testPath1)); + Assert.assertArrayEquals( + "/somewhere/test2 has data", + billy, + curator.getData().decompressed().forPath(testPath2) + ); + + final CountDownLatch latch = new CountDownLatch(1); + curator.getCuratorListenable().addListener( + (client, event) -> { + if (event.getType() == CuratorEventType.CREATE && event.getPath().equals(testPath1)) { + latch.countDown(); + } + } + ); + final CuratorOp deleteOp = curator.transactionOp().delete().forPath(testPath1); + final Collection results = curator.transaction().forOperations(deleteOp); + Assert.assertEquals(1, results.size()); + final CuratorTransactionResult result = results.iterator().next(); + Assert.assertEquals(Code.OK.intValue(), result.getError()); // assert delete + + Assert.assertTrue("Wait for /test1 to be created", timing.forWaiting().awaitLatch(latch)); + + Assert.assertArrayEquals( + "expect /test1 data is restored", + billy, + curator.getData().decompressed().forPath(testPath1) + ); + Assert.assertArrayEquals( + "expect /somewhere/test2 is still there", + billy, + curator.getData().decompressed().forPath(testPath2) + ); + + announcer.unannounce(testPath1); + Assert.assertNull("expect /test1 unannounced", curator.checkExists().forPath(testPath1)); + Assert.assertArrayEquals( + "expect /somewhere/test2 is still still there", + billy, + curator.getData().decompressed().forPath(testPath2) + ); + } + finally { + announcer.stop(); + } + + Assert.assertNull("expect /test1 remains unannounced", curator.checkExists().forPath(testPath1)); + Assert.assertNull("expect /somewhere/test2 unannounced", curator.checkExists().forPath(testPath2)); + } + + @Test(timeout = 60_000L) + public void testSessionKilled() throws Exception + { + curator.start(); + curator.blockUntilConnected(); + NodeAnnouncer announcer = new NodeAnnouncer(curator); + try { + curator.inTransaction().create().forPath("/somewhere").and().commit(); + announcer.start(); + + final byte[] billy = StringUtils.toUtf8("billy"); + final String testPath1 = "/test1"; + final String testPath2 = "/somewhere/test2"; + final Set paths = Sets.newHashSet(testPath1, testPath2); + announcer.announce(testPath1, billy); + announcer.announce(testPath2, billy); + + Assert.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath1)); + Assert.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath2)); + + final CountDownLatch latch = new CountDownLatch(1); + curator.getCuratorListenable().addListener( + (client, event) -> { + if (event.getType() == CuratorEventType.CREATE) { + paths.remove(event.getPath()); + if (paths.isEmpty()) { + latch.countDown(); + } + } + } + ); + KillSession.kill(curator.getZookeeperClient().getZooKeeper(), server.getConnectString()); + + Assert.assertTrue(timing.forWaiting().awaitLatch(latch)); + + Assert.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath1)); + Assert.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath2)); + + announcer.stop(); + + while ((curator.checkExists().forPath(testPath1) != null) || (curator.checkExists().forPath(testPath2) != null)) { + Thread.sleep(100); + } + + Assert.assertNull(curator.checkExists().forPath(testPath1)); + Assert.assertNull(curator.checkExists().forPath(testPath2)); + } + finally { + announcer.stop(); + } + } + + @Test + public void testCleansUpItsLittleTurdlings() throws Exception + { + curator.start(); + curator.blockUntilConnected(); + NodeAnnouncer announcer = new NodeAnnouncer(curator); + + final byte[] billy = StringUtils.toUtf8("billy"); + final String testPath = "/somewhere/test2"; + final String parent = ZKPaths.getPathAndNode(testPath).getPath(); + + announcer.start(); + try { + Assert.assertNull(curator.checkExists().forPath(parent)); + + awaitAnnounce(announcer, testPath, billy, true); + + Assert.assertNotNull(curator.checkExists().forPath(parent)); + } + finally { + announcer.stop(); + } + + Assert.assertNull(curator.checkExists().forPath(parent)); + } + + @Test + public void testLeavesBehindTurdlingsThatAlreadyExisted() throws Exception + { + curator.start(); + curator.blockUntilConnected(); + NodeAnnouncer announcer = new NodeAnnouncer(curator); + + final byte[] billy = StringUtils.toUtf8("billy"); + final String testPath = "/somewhere/test2"; + final String parent = ZKPaths.getPathAndNode(testPath).getPath(); + + curator.create().forPath(parent); + final Stat initialStat = curator.checkExists().forPath(parent); + + announcer.start(); + try { + Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid()); + + awaitAnnounce(announcer, testPath, billy, true); + + Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid()); + } + finally { + announcer.stop(); + } + + Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid()); + } + + @Test + public void testLeavesBehindTurdlingsWhenToldTo() throws Exception + { + curator.start(); + curator.blockUntilConnected(); + NodeAnnouncer announcer = new NodeAnnouncer(curator); + + final byte[] billy = StringUtils.toUtf8("billy"); + final String testPath = "/somewhere/test2"; + final String parent = ZKPaths.getPathAndNode(testPath).getPath(); + + announcer.start(); + try { + Assert.assertNull(curator.checkExists().forPath(parent)); + + awaitAnnounce(announcer, testPath, billy, false); + + Assert.assertNotNull(curator.checkExists().forPath(parent)); + } + finally { + announcer.stop(); + } + + Assert.assertNotNull(curator.checkExists().forPath(parent)); + } + + private void awaitAnnounce( + final NodeAnnouncer announcer, + final String path, + final byte[] bytes, + boolean removeParentsIfCreated + ) throws InterruptedException + { + final CountDownLatch latch = new CountDownLatch(1); + curator.getCuratorListenable().addListener( + (client, event) -> { + if (event.getType() == CuratorEventType.CREATE && event.getPath().equals(path)) { + latch.countDown(); + } + } + ); + announcer.announce(path, bytes, removeParentsIfCreated); + latch.await(); + } +} diff --git a/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java b/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java index 1851cc749614..12a87fc5e3d3 100644 --- a/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java +++ b/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java @@ -24,12 +24,11 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.druid.curator.CuratorTestBase; -import org.apache.druid.curator.announcement.Announcer; +import org.apache.druid.curator.announcement.NodeAnnouncer; import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeDiscovery; import org.apache.druid.discovery.NodeRole; import org.apache.druid.jackson.DefaultObjectMapper; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.server.DruidNode; import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.initialization.ZkPathsConfig; @@ -69,10 +68,7 @@ public void testAnnouncementAndDiscovery() throws Exception curator.start(); curator.blockUntilConnected(); - Announcer announcer = new Announcer( - curator, - Execs.directExecutor() - ); + NodeAnnouncer announcer = new NodeAnnouncer(curator); announcer.start(); CuratorDruidNodeAnnouncer druidNodeAnnouncer = new CuratorDruidNodeAnnouncer( diff --git a/server/src/test/java/org/apache/druid/server/listener/announcer/ListenerResourceAnnouncerTest.java b/server/src/test/java/org/apache/druid/server/listener/announcer/ListenerResourceAnnouncerTest.java index 96e2a18bde7b..5d86e2e7e1c9 100644 --- a/server/src/test/java/org/apache/druid/server/listener/announcer/ListenerResourceAnnouncerTest.java +++ b/server/src/test/java/org/apache/druid/server/listener/announcer/ListenerResourceAnnouncerTest.java @@ -21,21 +21,17 @@ import org.apache.curator.utils.ZKPaths; import org.apache.druid.curator.CuratorTestBase; -import org.apache.druid.curator.announcement.Announcer; +import org.apache.druid.curator.announcement.NodeAnnouncer; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.segment.CloserRule; import org.apache.druid.server.http.HostAndPortWithScheme; import org.apache.druid.server.initialization.ZkPathsConfig; import org.easymock.EasyMock; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; import org.junit.Rule; import org.junit.Test; import java.io.Closeable; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; public class ListenerResourceAnnouncerTest extends CuratorTestBase @@ -45,19 +41,6 @@ public class ListenerResourceAnnouncerTest extends CuratorTestBase private final String announcePath = listeningAnnouncerConfig.getAnnouncementPath(listenerKey); @Rule public CloserRule closerRule = new CloserRule(true); - private ExecutorService executorService; - - @Before - public void setUp() - { - executorService = Execs.singleThreaded("listener-resource--%d"); - } - - @After - public void tearDown() - { - executorService.shutdownNow(); - } @Test public void testAnnouncerBehaves() throws Exception @@ -68,7 +51,7 @@ public void testAnnouncerBehaves() throws Exception closerRule.closeLater(curator); Assert.assertNotNull(curator.create().forPath("/druid")); Assert.assertTrue(curator.blockUntilConnected(10, TimeUnit.SECONDS)); - final Announcer announcer = new Announcer(curator, executorService); + final NodeAnnouncer announcer = new NodeAnnouncer(curator); final HostAndPortWithScheme node = HostAndPortWithScheme.fromString("localhost"); final ListenerResourceAnnouncer listenerResourceAnnouncer = new ListenerResourceAnnouncer( announcer, @@ -109,7 +92,7 @@ public void close() @Test public void testStartCorrect() { - final Announcer announcer = EasyMock.createStrictMock(Announcer.class); + final NodeAnnouncer announcer = EasyMock.createStrictMock(NodeAnnouncer.class); final HostAndPortWithScheme node = HostAndPortWithScheme.fromString("some_host"); final ListenerResourceAnnouncer resourceAnnouncer = new ListenerResourceAnnouncer( From 8420d73df63250d0a1dcd7d5b1acb72365d601dd Mon Sep 17 00:00:00 2001 From: asdf2014 Date: Mon, 20 Jan 2020 00:02:34 +0800 Subject: [PATCH 02/13] Patch comments --- .../curator/announcement/Announceable.java | 15 ++++++++++ .../druid/curator/announcement/Announcer.java | 21 +++----------- .../curator/announcement/NodeAnnouncer.java | 29 ++++++------------- .../org/apache/druid/server/ZKPathsUtils.java | 5 ++++ .../curator/announcement/AnnouncerTest.java | 8 ++--- .../announcement/NodeAnnouncerTest.java | 8 ++--- 6 files changed, 41 insertions(+), 45 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/curator/announcement/Announceable.java diff --git a/server/src/main/java/org/apache/druid/curator/announcement/Announceable.java b/server/src/main/java/org/apache/druid/curator/announcement/Announceable.java new file mode 100644 index 000000000000..2520cb3012bc --- /dev/null +++ b/server/src/main/java/org/apache/druid/curator/announcement/Announceable.java @@ -0,0 +1,15 @@ +package org.apache.druid.curator.announcement; + +class Announceable +{ + final String path; + final byte[] bytes; + final boolean removeParentsIfCreated; + + public Announceable(String path, byte[] bytes, boolean removeParentsIfCreated) + { + this.path = path; + this.bytes = bytes; + this.removeParentsIfCreated = removeParentsIfCreated; + } +} diff --git a/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java b/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java index 89694d67c557..a58e35430997 100644 --- a/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java +++ b/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java @@ -36,6 +36,7 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.ZKPathsUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; @@ -54,7 +55,8 @@ import java.util.concurrent.atomic.AtomicReference; /** - * Announces things on Zookeeper. + * {@link NodeAnnouncer} announces single node on Zookeeper and only watches this node, + * while {@link Announcer} watches all child paths, not only this node. */ public class Announcer { @@ -242,8 +244,7 @@ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) th switch (event.getType()) { case CHILD_REMOVED: final ChildData child = event.getData(); - final ZKPaths.PathAndNode childPath = ZKPaths.getPathAndNode(child.getPath()); - final byte[] value = finalSubPaths.get(childPath.getNode()); + final byte[] value = finalSubPaths.get(ZKPathsUtils.getParentNode(child.getPath())); if (value != null) { log.info("Node[%s] dropped, reinstating.", child.getPath()); createAnnouncement(child.getPath(), value); @@ -434,18 +435,4 @@ private void createPath(String parentPath, boolean removeParentsIfCreated) log.info(e, "Problem creating parentPath[%s], someone else created it first?", parentPath); } } - - private static class Announceable - { - final String path; - final byte[] bytes; - final boolean removeParentsIfCreated; - - public Announceable(String path, byte[] bytes, boolean removeParentsIfCreated) - { - this.path = path; - this.bytes = bytes; - this.removeParentsIfCreated = removeParentsIfCreated; - } - } } diff --git a/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java b/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java index a68b426e5402..3f05fce84dd3 100644 --- a/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java +++ b/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java @@ -42,11 +42,10 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; /** - * NodeAnnouncer announces single node on Zookeeper and only watches this node, - * while {@link Announcer} watches all child paths, not only this node + * {@link NodeAnnouncer} announces single node on Zookeeper and only watches this node, + * while {@link Announcer} watches all child paths, not only this node. */ public class NodeAnnouncer { @@ -68,10 +67,13 @@ public class NodeAnnouncer private final ConcurrentMap listeners = new ConcurrentHashMap<>(); private final ConcurrentMap announcedPaths = new ConcurrentHashMap<>(); /** - * Only the one created the parent path can drop the parent path, so should remember these created parents. + * Only the one created the parent path can drop it, so should remember these created parents. + * This comment sounds confusing, shouldn't one of "parent path" occurrences in it be something else? */ - private final List pathsCreatedInThisAnnouncer = new CopyOnWriteArrayList<>(); + @GuardedBy("toAnnounce") + private final List pathsCreatedInThisAnnouncer = new ArrayList<>(); + @GuardedBy("toAnnounce") private boolean started = false; public NodeAnnouncer(CuratorFramework curator) @@ -187,7 +189,7 @@ public void announce(String path, byte[] bytes, boolean removeParentIfCreated) } } catch (Exception e) { - log.debug(e, "Problem checking if the parent existed, ignoring."); + log.debug(e, "Problem checking if the parent path doesn't exist, ignoring."); } // Synchronize to make sure that I only create a listener once. @@ -320,6 +322,7 @@ private void startCache(NodeCache cache) } } + @GuardedBy("toAnnounce") private void createPath(String parentPath, boolean removeParentsIfCreated) { try { @@ -333,18 +336,4 @@ private void createPath(String parentPath, boolean removeParentsIfCreated) log.error(e, "Problem creating parentPath[%s], someone else created it first?", parentPath); } } - - private static class Announceable - { - final String path; - final byte[] bytes; - final boolean removeParentsIfCreated; - - public Announceable(String path, byte[] bytes, boolean removeParentsIfCreated) - { - this.path = path; - this.bytes = bytes; - this.removeParentsIfCreated = removeParentsIfCreated; - } - } } diff --git a/server/src/main/java/org/apache/druid/server/ZKPathsUtils.java b/server/src/main/java/org/apache/druid/server/ZKPathsUtils.java index 9832abde27bf..59a9c8c2aeee 100644 --- a/server/src/main/java/org/apache/druid/server/ZKPathsUtils.java +++ b/server/src/main/java/org/apache/druid/server/ZKPathsUtils.java @@ -27,4 +27,9 @@ public static String getParentPath(String path) { return ZKPaths.getPathAndNode(path).getPath(); } + + public static String getParentNode(String path) + { + return ZKPaths.getPathAndNode(path).getNode(); + } } diff --git a/server/src/test/java/org/apache/druid/curator/announcement/AnnouncerTest.java b/server/src/test/java/org/apache/druid/curator/announcement/AnnouncerTest.java index e12d8bc47bb7..3f0c7e545e42 100644 --- a/server/src/test/java/org/apache/druid/curator/announcement/AnnouncerTest.java +++ b/server/src/test/java/org/apache/druid/curator/announcement/AnnouncerTest.java @@ -27,11 +27,11 @@ import org.apache.curator.framework.api.transaction.CuratorOp; import org.apache.curator.framework.api.transaction.CuratorTransactionResult; import org.apache.curator.test.KillSession; -import org.apache.curator.utils.ZKPaths; import org.apache.druid.curator.CuratorTestBase; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.ZKPathsUtils; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.data.Stat; import org.junit.After; @@ -212,7 +212,7 @@ public void testCleansUpItsLittleTurdlings() throws Exception final byte[] billy = StringUtils.toUtf8("billy"); final String testPath = "/somewhere/test2"; - final String parent = ZKPaths.getPathAndNode(testPath).getPath(); + final String parent = ZKPathsUtils.getParentPath(testPath); announcer.start(); try { @@ -238,7 +238,7 @@ public void testLeavesBehindTurdlingsThatAlreadyExisted() throws Exception final byte[] billy = StringUtils.toUtf8("billy"); final String testPath = "/somewhere/test2"; - final String parent = ZKPaths.getPathAndNode(testPath).getPath(); + final String parent = ZKPathsUtils.getParentPath(testPath); curator.create().forPath(parent); final Stat initialStat = curator.checkExists().forPath(parent); @@ -267,7 +267,7 @@ public void testLeavesBehindTurdlingsWhenToldTo() throws Exception final byte[] billy = StringUtils.toUtf8("billy"); final String testPath = "/somewhere/test2"; - final String parent = ZKPaths.getPathAndNode(testPath).getPath(); + final String parent = ZKPathsUtils.getParentPath(testPath); announcer.start(); try { diff --git a/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java b/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java index 220c7e40e160..42fe943349d6 100644 --- a/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java +++ b/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java @@ -24,9 +24,9 @@ import org.apache.curator.framework.api.transaction.CuratorOp; import org.apache.curator.framework.api.transaction.CuratorTransactionResult; import org.apache.curator.test.KillSession; -import org.apache.curator.utils.ZKPaths; import org.apache.druid.curator.CuratorTestBase; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.server.ZKPathsUtils; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.data.Stat; import org.junit.After; @@ -192,7 +192,7 @@ public void testCleansUpItsLittleTurdlings() throws Exception final byte[] billy = StringUtils.toUtf8("billy"); final String testPath = "/somewhere/test2"; - final String parent = ZKPaths.getPathAndNode(testPath).getPath(); + final String parent = ZKPathsUtils.getParentPath(testPath); announcer.start(); try { @@ -218,7 +218,7 @@ public void testLeavesBehindTurdlingsThatAlreadyExisted() throws Exception final byte[] billy = StringUtils.toUtf8("billy"); final String testPath = "/somewhere/test2"; - final String parent = ZKPaths.getPathAndNode(testPath).getPath(); + final String parent = ZKPathsUtils.getParentPath(testPath); curator.create().forPath(parent); final Stat initialStat = curator.checkExists().forPath(parent); @@ -247,7 +247,7 @@ public void testLeavesBehindTurdlingsWhenToldTo() throws Exception final byte[] billy = StringUtils.toUtf8("billy"); final String testPath = "/somewhere/test2"; - final String parent = ZKPaths.getPathAndNode(testPath).getPath(); + final String parent = ZKPathsUtils.getParentPath(testPath); announcer.start(); try { From da8538b709cc8b617a1923d74b5758896d3a3e65 Mon Sep 17 00:00:00 2001 From: asdf2014 Date: Mon, 20 Jan 2020 00:10:06 +0800 Subject: [PATCH 03/13] Add license --- .../curator/announcement/Announceable.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/server/src/main/java/org/apache/druid/curator/announcement/Announceable.java b/server/src/main/java/org/apache/druid/curator/announcement/Announceable.java index 2520cb3012bc..59f26d2f902e 100644 --- a/server/src/main/java/org/apache/druid/curator/announcement/Announceable.java +++ b/server/src/main/java/org/apache/druid/curator/announcement/Announceable.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.druid.curator.announcement; class Announceable From 67b5064d31a072605c32c6fe0abc1d06326a68d9 Mon Sep 17 00:00:00 2001 From: asdf2014 Date: Thu, 23 Jan 2020 21:27:45 +0800 Subject: [PATCH 04/13] Patch comments --- .../apache/druid/curator/announcement/Announcer.java | 7 ++++--- .../druid/curator/announcement/NodeAnnouncer.java | 12 ++++++------ 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java b/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java index a58e35430997..e47916814cd4 100644 --- a/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java +++ b/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java @@ -55,7 +55,7 @@ import java.util.concurrent.atomic.AtomicReference; /** - * {@link NodeAnnouncer} announces single node on Zookeeper and only watches this node, + * {@link NodeAnnouncer} announces a single node on Zookeeper and only watches this node, * while {@link Announcer} watches all child paths, not only this node. */ public class Announcer @@ -66,12 +66,13 @@ public class Announcer private final PathChildrenCacheFactory factory; private final ExecutorService pathChildrenCacheExecutor; + @GuardedBy("toAnnounce") private final List toAnnounce = new ArrayList<>(); @GuardedBy("toAnnounce") private final List toUpdate = new ArrayList<>(); private final ConcurrentMap listeners = new ConcurrentHashMap<>(); private final ConcurrentMap> announcements = new ConcurrentHashMap<>(); - private final List parentsIBuilt = new CopyOnWriteArrayList(); + private final List parentsIBuilt = new CopyOnWriteArrayList<>(); // Used for testing private Set addedChildren; @@ -233,7 +234,7 @@ public void announce(String path, byte[] bytes, boolean removeParentIfCreated) cache.getListenable().addListener( new PathChildrenCacheListener() { - private final AtomicReference> pathsLost = new AtomicReference>(null); + private final AtomicReference> pathsLost = new AtomicReference<>(null); @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception diff --git a/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java b/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java index 3f05fce84dd3..0648d2a7d48d 100644 --- a/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java +++ b/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java @@ -44,7 +44,7 @@ import java.util.concurrent.ConcurrentMap; /** - * {@link NodeAnnouncer} announces single node on Zookeeper and only watches this node, + * {@link NodeAnnouncer} announces a single node on Zookeeper and only watches this node, * while {@link Announcer} watches all child paths, not only this node. */ public class NodeAnnouncer @@ -57,6 +57,7 @@ public class NodeAnnouncer * In case a path is added to this collection in {@link #announce} before zk is connected, * should remember the path and do announce in {@link #start} later. */ + @GuardedBy("toAnnounce") private final List toAnnounce = new ArrayList<>(); /** * In case a path is added to this collection in {@link #update} before zk is connected, @@ -67,8 +68,8 @@ public class NodeAnnouncer private final ConcurrentMap listeners = new ConcurrentHashMap<>(); private final ConcurrentMap announcedPaths = new ConcurrentHashMap<>(); /** - * Only the one created the parent path can drop it, so should remember these created parents. - * This comment sounds confusing, shouldn't one of "parent path" occurrences in it be something else? + * This list is to remember all paths this node announcer has created. + * On {@link #stop}, the node announcer is responsible for deleting all paths in this list. */ @GuardedBy("toAnnounce") private final List pathsCreatedInThisAnnouncer = new ArrayList<>(); @@ -126,11 +127,10 @@ public void stop() for (NodeCache cache : listeners.values()) { closer.register(cache); } - CloseQuietly.close(closer); - for (String announcementPath : announcedPaths.keySet()) { - unannounce(announcementPath); + closer.register(() -> unannounce(announcementPath)); } + CloseQuietly.close(closer); if (!pathsCreatedInThisAnnouncer.isEmpty()) { final List deleteOps = new ArrayList<>(pathsCreatedInThisAnnouncer.size()); From da2f21283acd88e768ed89fad85d2a11a0da8b81 Mon Sep 17 00:00:00 2001 From: GWphua Date: Mon, 11 Nov 2024 15:46:33 +0800 Subject: [PATCH 05/13] Update to use CloseableUtils instead of CloseQuietly. --- .../apache/druid/curator/announcement/NodeAnnouncer.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java b/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java index 0648d2a7d48d..3ba68f28b8db 100644 --- a/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java +++ b/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java @@ -26,12 +26,12 @@ import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.ZKPathsUtils; +import org.apache.druid.utils.CloseableUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -130,7 +130,7 @@ public void stop() for (String announcementPath : announcedPaths.keySet()) { closer.register(() -> unannounce(announcementPath)); } - CloseQuietly.close(closer); + CloseableUtils.closeAndWrapExceptions(closer); if (!pathsCreatedInThisAnnouncer.isEmpty()) { final List deleteOps = new ArrayList<>(pathsCreatedInThisAnnouncer.size()); @@ -317,8 +317,7 @@ private void startCache(NodeCache cache) cache.start(); } catch (Exception e) { - CloseQuietly.close(cache); - throw new RuntimeException(e); + throw CloseableUtils.closeInCatch(new RuntimeException(e), cache); } } From a5d0669ab7205f158e10600d9b9ae8d8a57e991b Mon Sep 17 00:00:00 2001 From: GWphua Date: Mon, 11 Nov 2024 18:11:10 +0800 Subject: [PATCH 06/13] Add Javadocs for Announceable class --- .../druid/curator/announcement/Announceable.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/server/src/main/java/org/apache/druid/curator/announcement/Announceable.java b/server/src/main/java/org/apache/druid/curator/announcement/Announceable.java index 59f26d2f902e..c2ad7c588163 100644 --- a/server/src/main/java/org/apache/druid/curator/announcement/Announceable.java +++ b/server/src/main/java/org/apache/druid/curator/announcement/Announceable.java @@ -19,10 +19,25 @@ package org.apache.druid.curator.announcement; +/** + * The {@link Announceable} is a representation of an announcement to be made in ZooKeeper. + */ class Announceable { + /** + * Represents the path in ZooKeeper where the announcement will be made. + */ final String path; + + /** + * Holds the actual data to be announced. + */ final byte[] bytes; + + /** + * Indicates whether parent nodes should be removed if the announcement is created successfully. + * This can be useful for cleaning up unused paths in ZooKeeper. + */ final boolean removeParentsIfCreated; public Announceable(String path, byte[] bytes, boolean removeParentsIfCreated) From f3b21a4ae68c006aea87f6964e5cee315adaf81d Mon Sep 17 00:00:00 2001 From: GWphua Date: Mon, 11 Nov 2024 18:16:13 +0800 Subject: [PATCH 07/13] Javadocs on Announcers --- .../druid/curator/announcement/Announcer.java | 9 +- .../curator/announcement/NodeAnnouncer.java | 88 +++++++++++++------ 2 files changed, 69 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java b/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java index a25d89f38a3a..828ee3a6c793 100644 --- a/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java +++ b/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java @@ -54,8 +54,13 @@ import java.util.concurrent.atomic.AtomicReference; /** - * {@link NodeAnnouncer} announces a single node on Zookeeper and only watches this node, - * while {@link Announcer} watches all child paths, not only this node. + * The {@link Announcer} class manages the announcement of multiple child nodes + * under a specified parent path in a ZooKeeper ensemble. It monitors these nodes + * to ensure their existence and manage their lifecycle collectively. + * + *

Utilize this class when you need to handle complex node structures, + * including relationships between multiple child nodes. Should your use case + * involve the management of a standalone node instead, see {@link NodeAnnouncer}.

*/ public class Announcer { diff --git a/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java b/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java index 3ba68f28b8db..b7eaeca371a9 100644 --- a/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java +++ b/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java @@ -44,39 +44,54 @@ import java.util.concurrent.ConcurrentMap; /** - * {@link NodeAnnouncer} announces a single node on Zookeeper and only watches this node, - * while {@link Announcer} watches all child paths, not only this node. + * The {@link NodeAnnouncer} class is responsible for announcing a single node + * in a ZooKeeper ensemble. It creates an ephemeral node at a specified path + * and monitors its existence to ensure that it remains active until it is + * explicitly unannounced or the object is closed. + * + *

This class provides methods to announce and update the content of the + * node as well as handle path creation if required.

+ * + *

Use this class when you need to manage the lifecycle of a standalone + * node without concerns about its children or siblings. Should your use case + * involve the management of child nodes under a specific parent path in a + * ZooKeeper ensemble, see {@link Announcer}.

*/ public class NodeAnnouncer { private static final Logger log = new Logger(NodeAnnouncer.class); private final CuratorFramework curator; + private final ConcurrentMap listeners = new ConcurrentHashMap<>(); + private final ConcurrentMap announcedPaths = new ConcurrentHashMap<>(); + + @GuardedBy("toAnnounce") + private boolean started = false; /** - * In case a path is added to this collection in {@link #announce} before zk is connected, - * should remember the path and do announce in {@link #start} later. + * This list holds paths that need to be announced. If a path is added to this list + * in the {@link #announce} method before the connection to ZooKeeper is established, + * it will be stored here and announced later during the {@link #start} method. */ @GuardedBy("toAnnounce") private final List toAnnounce = new ArrayList<>(); + /** - * In case a path is added to this collection in {@link #update} before zk is connected, - * should remember the path and do update in {@link #start} later. + * This list holds paths that need to be updated. If a path is added to this list + * in the {@link #update} method before the connection to ZooKeeper is established, + * it will be stored here and updated later during the {@link #start} method. */ @GuardedBy("toAnnounce") private final List toUpdate = new ArrayList<>(); - private final ConcurrentMap listeners = new ConcurrentHashMap<>(); - private final ConcurrentMap announcedPaths = new ConcurrentHashMap<>(); + /** - * This list is to remember all paths this node announcer has created. - * On {@link #stop}, the node announcer is responsible for deleting all paths in this list. + * This list keeps track of all the paths created by this node announcer. + * When the {@link #stop} method is called, + * the node announcer is responsible for deleting all paths stored in this list. */ @GuardedBy("toAnnounce") private final List pathsCreatedInThisAnnouncer = new ArrayList<>(); - @GuardedBy("toAnnounce") - private boolean started = false; - public NodeAnnouncer(CuratorFramework curator) { this.curator = curator; @@ -89,12 +104,12 @@ Set getAddedPaths() } @LifecycleStart - @SuppressWarnings("DuplicatedCode") public void start() { - log.info("Starting announcer"); + log.info("Starting NodeAnnouncer"); synchronized (toAnnounce) { if (started) { + log.debug("Called start() but NodeAnnouncer have already started."); return; } @@ -115,23 +130,37 @@ public void start() @LifecycleStop public void stop() { - log.info("Stopping announcer"); + log.info("Stopping NodeAnnouncer"); synchronized (toAnnounce) { if (!started) { + log.debug("Called stop() but NodeAnnouncer have not started."); return; } started = false; + closeResources(); + deletePaths(); + } + } - Closer closer = Closer.create(); - for (NodeCache cache : listeners.values()) { - closer.register(cache); - } - for (String announcementPath : announcedPaths.keySet()) { - closer.register(() -> unannounce(announcementPath)); - } - CloseableUtils.closeAndWrapExceptions(closer); + private void closeResources() + { + Closer closer = Closer.create(); + for (NodeCache cache : listeners.values()) { + closer.register(cache); + } + for (String announcementPath : announcedPaths.keySet()) { + closer.register(() -> unannounce(announcementPath)); + } + CloseableUtils.closeAndWrapExceptions(closer); + } + private void deletePaths() + { + // deletePaths method is only used in stop(), which already has synchronized(toAnnounce), + // this line is here just to prevent the static analysis from throwing + // "Access to field 'pathsCreatedInThisAnnouncer' outside declared guards". + synchronized (toAnnounce) { if (!pathsCreatedInThisAnnouncer.isEmpty()) { final List deleteOps = new ArrayList<>(pathsCreatedInThisAnnouncer.size()); for (String parent : pathsCreatedInThisAnnouncer) { @@ -142,6 +171,7 @@ public void stop() log.error(e, "Unable to delete parent[%s].", parent); } } + try { curator.transaction().forOperations(deleteOps); } @@ -152,8 +182,9 @@ public void stop() } } + /** - * Like announce(path, bytes, true). + * Overload of {@link #announce(String, byte[],boolean)}, but removes parent node of path after announcement. */ public void announce(String path, byte[] bytes) { @@ -172,6 +203,7 @@ public void announce(String path, byte[] bytes, boolean removeParentIfCreated) { synchronized (toAnnounce) { if (!started) { + log.debug("NodeAnnouncer has not started yet, queuing announcement for later processing..."); toAnnounce.add(new Announceable(path, bytes, removeParentIfCreated)); return; } @@ -329,7 +361,11 @@ private void createPath(String parentPath, boolean removeParentsIfCreated) if (removeParentsIfCreated) { pathsCreatedInThisAnnouncer.add(parentPath); } - log.debug("Created parentPath[%s], %s remove on stop() called.", parentPath, removeParentsIfCreated ? "will" : "will not"); + log.debug( + "Created parentPath[%s], %s remove on stop() called.", + parentPath, + removeParentsIfCreated ? "will" : "will not" + ); } catch (Exception e) { log.error(e, "Problem creating parentPath[%s], someone else created it first?", parentPath); From 10530420b6763739f44b3c8c2c9e0d2fcb3f385e Mon Sep 17 00:00:00 2001 From: GWphua Date: Tue, 12 Nov 2024 11:07:29 +0800 Subject: [PATCH 08/13] Refactor announce method --- .../curator/announcement/NodeAnnouncer.java | 182 ++++++++++-------- 1 file changed, 102 insertions(+), 80 deletions(-) diff --git a/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java b/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java index b7eaeca371a9..72df62fd828a 100644 --- a/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java +++ b/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java @@ -70,7 +70,7 @@ public class NodeAnnouncer /** * This list holds paths that need to be announced. If a path is added to this list - * in the {@link #announce} method before the connection to ZooKeeper is established, + * in the {@link #announce(String, byte[], boolean)} method before the connection to ZooKeeper is established, * it will be stored here and announced later during the {@link #start} method. */ @GuardedBy("toAnnounce") @@ -139,10 +139,11 @@ public void stop() started = false; closeResources(); - deletePaths(); + dropPathsCreatedInThisAnnouncer(); } } + @GuardedBy("toAnnounce") private void closeResources() { Closer closer = Closer.create(); @@ -155,36 +156,32 @@ private void closeResources() CloseableUtils.closeAndWrapExceptions(closer); } - private void deletePaths() + @GuardedBy("toAnnounce") + private void dropPathsCreatedInThisAnnouncer() { - // deletePaths method is only used in stop(), which already has synchronized(toAnnounce), - // this line is here just to prevent the static analysis from throwing - // "Access to field 'pathsCreatedInThisAnnouncer' outside declared guards". - synchronized (toAnnounce) { - if (!pathsCreatedInThisAnnouncer.isEmpty()) { - final List deleteOps = new ArrayList<>(pathsCreatedInThisAnnouncer.size()); - for (String parent : pathsCreatedInThisAnnouncer) { - try { - deleteOps.add(curator.transactionOp().delete().forPath(parent)); - } - catch (Exception e) { - log.error(e, "Unable to delete parent[%s].", parent); - } - } - + if (!pathsCreatedInThisAnnouncer.isEmpty()) { + final List deleteOps = new ArrayList<>(pathsCreatedInThisAnnouncer.size()); + for (String parent : pathsCreatedInThisAnnouncer) { try { - curator.transaction().forOperations(deleteOps); + deleteOps.add(curator.transactionOp().delete().forPath(parent)); } catch (Exception e) { - log.error(e, "Unable to commit transaction."); + log.error(e, "Unable to delete parent[%s].", parent); } } + + try { + curator.transaction().forOperations(deleteOps); + } + catch (Exception e) { + log.error(e, "Unable to commit transaction."); + } } } /** - * Overload of {@link #announce(String, byte[],boolean)}, but removes parent node of path after announcement. + * Overload of {@link #announce(String, byte[], boolean)}, but removes parent node of path after announcement. */ public void announce(String path, byte[] bytes) { @@ -202,6 +199,8 @@ public void announce(String path, byte[] bytes) public void announce(String path, byte[] bytes, boolean removeParentIfCreated) { synchronized (toAnnounce) { + // In the case that this method is called by other components or thread that assumes the NodeAnnouncer + // is ready when NodeAnnouncer has not started, we will queue the announcement request. if (!started) { log.debug("NodeAnnouncer has not started yet, queuing announcement for later processing..."); toAnnounce.add(new Announceable(path, bytes, removeParentIfCreated)); @@ -210,39 +209,19 @@ public void announce(String path, byte[] bytes, boolean removeParentIfCreated) } final String parentPath = ZKPathsUtils.getParentPath(path); - boolean buildParentPath = false; - byte[] value = announcedPaths.get(path); + // We have yet to announce this path. Check if we need to build a parent path. if (value == null) { - try { - if (curator.checkExists().forPath(parentPath) == null) { - buildParentPath = true; - } - } - catch (Exception e) { - log.debug(e, "Problem checking if the parent path doesn't exist, ignoring."); - } + boolean shouldBuildParentPath = canBuildParentPath(parentPath); // Synchronize to make sure that I only create a listener once. synchronized (toAnnounce) { if (!listeners.containsKey(path)) { - final NodeCache cache = new NodeCache(curator, path, true); - cache.getListenable().addListener( - () -> { - ChildData currentData = cache.getCurrentData(); - if (currentData == null) { - final byte[] value1 = announcedPaths.get(path); - if (value1 != null) { - log.info("Node[%s] dropped, reinstating.", path); - createAnnouncement(path, value1); - } - } - } - ); + final NodeCache cache = setupNodeCache(path); if (started) { - if (buildParentPath) { + if (shouldBuildParentPath) { createPath(parentPath, removeParentIfCreated); } startCache(cache); @@ -252,6 +231,57 @@ public void announce(String path, byte[] bytes, boolean removeParentIfCreated) } } + final boolean readyToCreateAnnouncement = updateAnnouncedPaths(path, bytes); + + if (readyToCreateAnnouncement) { + try { + createAnnouncement(path, bytes); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + private boolean canBuildParentPath(String parentPath) + { + try { + return (curator.checkExists().forPath(parentPath) == null); + } + catch (Exception e) { + log.debug(e, "Failed to check existence of parent path. Proceeding without creating parent path."); + return false; + } + } + + @GuardedBy("toAnnounce") + private NodeCache setupNodeCache(String path) + { + final NodeCache cache = new NodeCache(curator, path, true); + cache.getListenable().addListener( + () -> { + ChildData currentData = cache.getCurrentData(); + + if (currentData == null) { + // If currentData is null, and we record having announced the data, + // this means that the ephemeral node was unexpectedly removed. + // We will recreate the node again using the previous data. + final byte[] previouslyAnnouncedData = announcedPaths.get(path); + if (previouslyAnnouncedData != null) { + log.info( + "Ephemeral node at path [%s] was unexpectedly removed. Recreating node with previous data.", + path + ); + createAnnouncement(path, previouslyAnnouncedData); + } + } + } + ); + return cache; + } + + private boolean updateAnnouncedPaths(String path, byte[] bytes) + { boolean created = false; synchronized (toAnnounce) { if (started) { @@ -264,14 +294,35 @@ public void announce(String path, byte[] bytes, boolean removeParentIfCreated) } } } + return created; + } - if (created) { - try { - createAnnouncement(path, bytes); - } - catch (Exception e) { - throw new RuntimeException(e); + @GuardedBy("toAnnounce") + private void createPath(String parentPath, boolean removeParentsIfCreated) + { + try { + curator.create().creatingParentsIfNeeded().forPath(parentPath); + if (removeParentsIfCreated) { + pathsCreatedInThisAnnouncer.add(parentPath); } + log.debug( + "Created parentPath[%s], %s remove on stop() called.", + parentPath, + removeParentsIfCreated ? "will" : "will not" + ); + } + catch (Exception e) { + log.error(e, "Problem creating parentPath[%s], someone else created it first?", parentPath); + } + } + + private void startCache(NodeCache cache) + { + try { + cache.start(); + } + catch (Exception e) { + throw CloseableUtils.closeInCatch(new RuntimeException(e), cache); } } @@ -342,33 +393,4 @@ public void unannounce(String path) throw new RuntimeException(e); } } - - private void startCache(NodeCache cache) - { - try { - cache.start(); - } - catch (Exception e) { - throw CloseableUtils.closeInCatch(new RuntimeException(e), cache); - } - } - - @GuardedBy("toAnnounce") - private void createPath(String parentPath, boolean removeParentsIfCreated) - { - try { - curator.create().creatingParentsIfNeeded().forPath(parentPath); - if (removeParentsIfCreated) { - pathsCreatedInThisAnnouncer.add(parentPath); - } - log.debug( - "Created parentPath[%s], %s remove on stop() called.", - parentPath, - removeParentsIfCreated ? "will" : "will not" - ); - } - catch (Exception e) { - log.error(e, "Problem creating parentPath[%s], someone else created it first?", parentPath); - } - } } From 903db88262c1d270038672c960bc56d24d8a1d28 Mon Sep 17 00:00:00 2001 From: GWphua Date: Tue, 12 Nov 2024 15:47:06 +0800 Subject: [PATCH 09/13] Refactor NodeAnnouncer --- .../curator/announcement/NodeAnnouncer.java | 64 +++++++++++-------- 1 file changed, 39 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java b/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java index 72df62fd828a..99cf82c29910 100644 --- a/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java +++ b/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java @@ -63,7 +63,7 @@ public class NodeAnnouncer private final CuratorFramework curator; private final ConcurrentMap listeners = new ConcurrentHashMap<>(); - private final ConcurrentMap announcedPaths = new ConcurrentHashMap<>(); + private final ConcurrentHashMap announcedPaths = new ConcurrentHashMap<>(); @GuardedBy("toAnnounce") private boolean started = false; @@ -282,19 +282,23 @@ private NodeCache setupNodeCache(String path) private boolean updateAnnouncedPaths(String path, byte[] bytes) { - boolean created = false; synchronized (toAnnounce) { - if (started) { - byte[] oldBytes = announcedPaths.putIfAbsent(path, bytes); - - if (oldBytes == null) { - created = true; - } else if (!Arrays.equals(oldBytes, bytes)) { - throw new IAE("Cannot reannounce different values under the same path"); - } + if (!started) { + return false; // Do nothing if not started } } - return created; + + final byte[] updatedAnnouncementData = announcedPaths.compute(path, (key, oldBytes) -> { + if (oldBytes == null) { + return bytes; // Insert the new value + } else if (!Arrays.equals(oldBytes, bytes)) { + throw new IAE("Cannot reannounce different values under the same path"); + } + return oldBytes; // No change if values are equal + }); + + // Return true if we have updated the paths. + return Arrays.equals(updatedAnnouncementData, bytes); } @GuardedBy("toAnnounce") @@ -306,13 +310,16 @@ private void createPath(String parentPath, boolean removeParentsIfCreated) pathsCreatedInThisAnnouncer.add(parentPath); } log.debug( - "Created parentPath[%s], %s remove on stop() called.", + "Created parentPath[%s], %s remove when stop() is called.", parentPath, removeParentsIfCreated ? "will" : "will not" ); } + catch (KeeperException.NodeExistsException e) { + log.error(e, "The parentPath[%s] already exists.", parentPath); + } catch (Exception e) { - log.error(e, "Problem creating parentPath[%s], someone else created it first?", parentPath); + log.error(e, "Failed to create parentPath[%s].", parentPath); } } @@ -342,17 +349,22 @@ public void update(final String path, final byte[] bytes) throw new ISE("Cannot update a path[%s] that hasn't been announced!", path); } + boolean canUpdate = false; synchronized (toAnnounce) { - try { - if (!Arrays.equals(oldBytes, bytes)) { - announcedPaths.put(path, bytes); - updateAnnouncement(path, bytes); - } + if (!Arrays.equals(oldBytes, bytes)) { + announcedPaths.put(path, bytes); + canUpdate = true; } - catch (Exception e) { - throw new RuntimeException(e); + } + + try { + if (canUpdate) { + updateAnnouncement(path, bytes); } } + catch (Exception e) { + throw new RuntimeException(e); + } } private void createAnnouncement(final String path, byte[] value) throws Exception @@ -375,12 +387,14 @@ private void updateAnnouncement(final String path, final byte[] value) throws Ex */ public void unannounce(String path) { - log.info("unannouncing [%s]", path); - final byte[] value = announcedPaths.remove(path); + synchronized (toAnnounce) { + log.info("unannouncing [%s]", path); + final byte[] value = announcedPaths.remove(path); - if (value == null) { - log.error("Path[%s] not announced, cannot unannounce.", path); - return; + if (value == null) { + log.error("Path[%s] not announced, cannot unannounce.", path); + return; + } } try { From 954fa534d607d81a08f9ed2d197e3420d695ab17 Mon Sep 17 00:00:00 2001 From: GWphua Date: Tue, 12 Nov 2024 18:38:31 +0800 Subject: [PATCH 10/13] Add Unit Tests --- .../curator/announcement/NodeAnnouncer.java | 1 - .../announcement/NodeAnnouncerTest.java | 89 +++++++++++++++++++ 2 files changed, 89 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java b/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java index 99cf82c29910..4c17993631ba 100644 --- a/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java +++ b/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java @@ -179,7 +179,6 @@ private void dropPathsCreatedInThisAnnouncer() } } - /** * Overload of {@link #announce(String, byte[], boolean)}, but removes parent node of path after announcement. */ diff --git a/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java b/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java index 42fe943349d6..c50bdef41dfd 100644 --- a/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java +++ b/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java @@ -25,6 +25,7 @@ import org.apache.curator.framework.api.transaction.CuratorTransactionResult; import org.apache.curator.test.KillSession; import org.apache.druid.curator.CuratorTestBase; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.server.ZKPathsUtils; import org.apache.zookeeper.KeeperException.Code; @@ -39,6 +40,7 @@ import java.util.concurrent.CountDownLatch; /** + * */ public class NodeAnnouncerTest extends CuratorTestBase { @@ -55,6 +57,93 @@ public void tearDown() tearDownServerAndCurator(); } + @Test + public void testAnnounceBeforeStartingNodeAnnouncer() throws Exception + { + curator.start(); + curator.blockUntilConnected(); + NodeAnnouncer announcer = new NodeAnnouncer(curator); + final byte[] billy = StringUtils.toUtf8("billy"); + final String testPath = "/testAnnounce"; + + announcer.announce(testPath, billy); + announcer.start(); + + // Verify that the path was announced + Assert.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath)); + announcer.stop(); + } + + @Test + public void testCreateParentPath() throws Exception + { + curator.start(); + curator.blockUntilConnected(); + NodeAnnouncer announcer = new NodeAnnouncer(curator); + final byte[] billy = StringUtils.toUtf8("billy"); + final String testPath = "/newParent/testPath"; + final String parentPath = ZKPathsUtils.getParentPath(testPath); + + announcer.start(); + + Assert.assertNull("Parent path should not exist before announcement", curator.checkExists().forPath(parentPath)); + + // Announce with parent creation + announcer.announce(testPath, billy); + + // Wait for the announcement to be processed + while (curator.checkExists().forPath(testPath) == null) { + Thread.sleep(100); + } + + // Verify the parent path has been created + Assert.assertNotNull("Parent path should be created", curator.checkExists().forPath(parentPath)); + Assert.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath)); + + announcer.stop(); + } + + @Test + public void testUpdateSuccessfully() throws Exception + { + curator.start(); + curator.blockUntilConnected(); + NodeAnnouncer announcer = new NodeAnnouncer(curator); + final byte[] billy = StringUtils.toUtf8("billy"); + final byte[] tilly = StringUtils.toUtf8("tilly"); + final String testPath = "/testUpdate"; + + announcer.start(); + announcer.announce(testPath, billy); + Assert.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath)); + + announcer.update(testPath, billy); + Assert.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath)); + + announcer.update(testPath, tilly); + Assert.assertArrayEquals(tilly, curator.getData().decompressed().forPath(testPath)); + announcer.stop(); + } + + @Test + public void testUpdateWithNonExistentPath() throws Exception + { + curator.start(); + curator.blockUntilConnected(); + NodeAnnouncer announcer = new NodeAnnouncer(curator); + final byte[] billy = StringUtils.toUtf8("billy"); + final String testPath = "/testUpdate"; + + announcer.start(); + + Exception exception = Assert.assertThrows(ISE.class, () -> { + announcer.update(testPath, billy); + }); + Assert.assertTrue(exception.getMessage().contains("Cannot update a path")); + + announcer.stop(); + } + @Test(timeout = 60_000L) public void testSanity() throws Exception { From 25de238871d0345162ddbf9eaa5fd6dcb82818b8 Mon Sep 17 00:00:00 2001 From: GWphua Date: Wed, 13 Nov 2024 10:19:34 +0800 Subject: [PATCH 11/13] Remove humor from logs --- .../druid/indexing/worker/WorkerCuratorCoordinator.java | 2 +- .../org/apache/druid/curator/announcement/Announcer.java | 6 +++--- .../apache/druid/curator/announcement/NodeAnnouncer.java | 2 +- .../druid/curator/announcement/NodeAnnouncerTest.java | 3 +-- 4 files changed, 6 insertions(+), 7 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerCuratorCoordinator.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerCuratorCoordinator.java index 7ed8aea24f0b..6e09217e958e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerCuratorCoordinator.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerCuratorCoordinator.java @@ -86,7 +86,7 @@ public WorkerCuratorCoordinator( @LifecycleStart public void start() throws Exception { - log.info("WorkerCuratorCoordinator good to go sir. Server[%s]", worker.getHost()); + log.info("WorkerCuratorCoordinator good to go. Server[%s]", worker.getHost()); synchronized (lock) { if (started) { return; diff --git a/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java b/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java index 828ee3a6c793..2c12a73488cd 100644 --- a/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java +++ b/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java @@ -169,14 +169,14 @@ public void stop() transaction = transaction.delete().forPath(parent).and(); } catch (Exception e) { - log.info(e, "Unable to delete parent[%s], boooo.", parent); + log.info(e, "Unable to delete parent[%s].", parent); } } try { ((CuratorTransactionFinal) transaction).commit(); } catch (Exception e) { - log.info(e, "Unable to commit transaction. Please feed the hamsters"); + log.info(e, "Unable to commit transaction."); } } } @@ -356,7 +356,7 @@ public void update(final String path, final byte[] bytes) ConcurrentMap subPaths = announcements.get(parentPath); if (subPaths == null || subPaths.get(nodePath) == null) { - throw new ISE("Cannot update a path[%s] that hasn't been announced!", path); + throw new ISE("Cannot update path[%s] that hasn't been announced!", path); } synchronized (toAnnounce) { diff --git a/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java b/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java index 4c17993631ba..4eae8a31915b 100644 --- a/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java +++ b/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java @@ -345,7 +345,7 @@ public void update(final String path, final byte[] bytes) byte[] oldBytes = announcedPaths.get(path); if (oldBytes == null) { - throw new ISE("Cannot update a path[%s] that hasn't been announced!", path); + throw new ISE("Cannot update path[%s] that hasn't been announced!", path); } boolean canUpdate = false; diff --git a/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java b/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java index c50bdef41dfd..16beef3ccf67 100644 --- a/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java +++ b/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java @@ -139,8 +139,7 @@ public void testUpdateWithNonExistentPath() throws Exception Exception exception = Assert.assertThrows(ISE.class, () -> { announcer.update(testPath, billy); }); - Assert.assertTrue(exception.getMessage().contains("Cannot update a path")); - + Assert.assertEquals(exception.getMessage(), "Cannot update path[/testUpdate] that hasn't been announced!"); announcer.stop(); } From a7733e3a73b556b8784fd23100ac03633adfe8cf Mon Sep 17 00:00:00 2001 From: GWphua Date: Wed, 13 Nov 2024 15:28:48 +0800 Subject: [PATCH 12/13] Add unit test for NodeAnnouncer announcing same path different payload. --- .../curator/announcement/NodeAnnouncer.java | 8 +-- .../announcement/NodeAnnouncerTest.java | 58 ++++++++++++++++--- 2 files changed, 54 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java b/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java index 4eae8a31915b..e85822916f9a 100644 --- a/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java +++ b/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java @@ -208,10 +208,10 @@ public void announce(String path, byte[] bytes, boolean removeParentIfCreated) } final String parentPath = ZKPathsUtils.getParentPath(path); - byte[] value = announcedPaths.get(path); + byte[] announcedPayload = announcedPaths.get(path); - // We have yet to announce this path. Check if we need to build a parent path. - if (value == null) { + if (announcedPayload == null) { + // Payload does not exist. We have yet to announce this path. Check if we need to build a parent path. boolean shouldBuildParentPath = canBuildParentPath(parentPath); // Synchronize to make sure that I only create a listener once. @@ -291,7 +291,7 @@ private boolean updateAnnouncedPaths(String path, byte[] bytes) if (oldBytes == null) { return bytes; // Insert the new value } else if (!Arrays.equals(oldBytes, bytes)) { - throw new IAE("Cannot reannounce different values under the same path"); + throw new IAE("Cannot reannounce different values under the same path."); } return oldBytes; // No change if values are equal }); diff --git a/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java b/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java index 16beef3ccf67..bcf0a2d14e33 100644 --- a/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java +++ b/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java @@ -25,6 +25,7 @@ import org.apache.curator.framework.api.transaction.CuratorTransactionResult; import org.apache.curator.test.KillSession; import org.apache.druid.curator.CuratorTestBase; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.server.ZKPathsUtils; @@ -74,7 +75,7 @@ public void testAnnounceBeforeStartingNodeAnnouncer() throws Exception announcer.stop(); } - @Test + @Test(timeout = 60_000L) public void testCreateParentPath() throws Exception { curator.start(); @@ -85,10 +86,7 @@ public void testCreateParentPath() throws Exception final String parentPath = ZKPathsUtils.getParentPath(testPath); announcer.start(); - Assert.assertNull("Parent path should not exist before announcement", curator.checkExists().forPath(parentPath)); - - // Announce with parent creation announcer.announce(testPath, billy); // Wait for the announcement to be processed @@ -96,10 +94,56 @@ public void testCreateParentPath() throws Exception Thread.sleep(100); } - // Verify the parent path has been created Assert.assertNotNull("Parent path should be created", curator.checkExists().forPath(parentPath)); Assert.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath)); + announcer.stop(); + } + + @Test(timeout = 60_000L) + public void testAnnounceSamePathWithDifferentPayloadThrowsIAE() throws Exception + { + curator.start(); + curator.blockUntilConnected(); + NodeAnnouncer announcer = new NodeAnnouncer(curator); + final byte[] billy = StringUtils.toUtf8("billy"); + final byte[] tilly = StringUtils.toUtf8("tilly"); + final String testPath = "/testPath"; + + announcer.start(); + announcer.announce(testPath, billy); + while (curator.checkExists().forPath(testPath) == null) { + Thread.sleep(100); + } + Assert.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath)); + // Nothing wrong when we announce same path. + announcer.announce(testPath, billy); + + // Something wrong when we announce different path. + Exception exception = Assert.assertThrows(IAE.class, () -> announcer.announce(testPath, tilly)); + Assert.assertEquals(exception.getMessage(), "Cannot reannounce different values under the same path."); + + // Confirm that the new announcement is invalidated, and we still have payload from previous announcement. + Assert.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath)); + announcer.stop(); + } + + @Test + public void testUpdateBeforeStartingNodeAnnouncer() throws Exception + { + curator.start(); + curator.blockUntilConnected(); + NodeAnnouncer announcer = new NodeAnnouncer(curator); + final byte[] billy = StringUtils.toUtf8("billy"); + final byte[] tilly = StringUtils.toUtf8("tilly"); + final String testPath = "/testAnnounce"; + + announcer.update(testPath, tilly); + announcer.announce(testPath, billy); + announcer.start(); + + // Verify that the path was announced + Assert.assertArrayEquals(tilly, curator.getData().decompressed().forPath(testPath)); announcer.stop(); } @@ -136,9 +180,7 @@ public void testUpdateWithNonExistentPath() throws Exception announcer.start(); - Exception exception = Assert.assertThrows(ISE.class, () -> { - announcer.update(testPath, billy); - }); + Exception exception = Assert.assertThrows(ISE.class, () -> announcer.update(testPath, billy)); Assert.assertEquals(exception.getMessage(), "Cannot update path[/testUpdate] that hasn't been announced!"); announcer.stop(); } From f541b84a44d7d2458bb349e859da926b05146a24 Mon Sep 17 00:00:00 2001 From: GWphua Date: Fri, 15 Nov 2024 16:20:20 +0800 Subject: [PATCH 13/13] Tweak import and change Javadocs --- .../apache/druid/curator/announcement/Announcer.java | 12 ++++++------ .../druid/curator/announcement/NodeAnnouncer.java | 10 +++------- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java b/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java index 2c12a73488cd..ba87ec07e258 100644 --- a/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java +++ b/server/src/main/java/org/apache/druid/curator/announcement/Announcer.java @@ -20,6 +20,7 @@ package org.apache.druid.curator.announcement; import com.google.common.annotations.VisibleForTesting; +import com.google.errorprone.annotations.concurrent.GuardedBy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.transaction.CuratorTransaction; import org.apache.curator.framework.api.transaction.CuratorTransactionFinal; @@ -39,7 +40,6 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; -import javax.annotation.concurrent.GuardedBy; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -54,13 +54,13 @@ import java.util.concurrent.atomic.AtomicReference; /** - * The {@link Announcer} class manages the announcement of multiple child nodes - * under a specified parent path in a ZooKeeper ensemble. It monitors these nodes + * The {@link Announcer} class manages the announcement of a node, and watches all child + * nodes under the specified path in a ZooKeeper ensemble. It monitors these nodes * to ensure their existence and manage their lifecycle collectively. * - *

Utilize this class when you need to handle complex node structures, - * including relationships between multiple child nodes. Should your use case - * involve the management of a standalone node instead, see {@link NodeAnnouncer}.

+ *

Use this class when you need to manage the lifecycle of all child nodes under the + * specified path. Should your use case involve the management of a standalone node + * instead, see {@link NodeAnnouncer}.

*/ public class Announcer { diff --git a/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java b/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java index e85822916f9a..35d7e27b0640 100644 --- a/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java +++ b/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java @@ -20,6 +20,7 @@ package org.apache.druid.curator.announcement; import com.google.common.annotations.VisibleForTesting; +import com.google.errorprone.annotations.concurrent.GuardedBy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.transaction.CuratorOp; import org.apache.curator.framework.recipes.cache.ChildData; @@ -35,7 +36,6 @@ import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; -import javax.annotation.concurrent.GuardedBy; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -49,13 +49,9 @@ * and monitors its existence to ensure that it remains active until it is * explicitly unannounced or the object is closed. * - *

This class provides methods to announce and update the content of the - * node as well as handle path creation if required.

- * *

Use this class when you need to manage the lifecycle of a standalone - * node without concerns about its children or siblings. Should your use case - * involve the management of child nodes under a specific parent path in a - * ZooKeeper ensemble, see {@link Announcer}.

+ * node. Should your use case involve watching all child nodes of your specified + * path in a ZooKeeper ensemble, see {@link Announcer}.

*/ public class NodeAnnouncer {