Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Huge Number of Watches in ZooKeeper #17482

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@
import com.google.inject.Inject;
import org.apache.curator.framework.CuratorFramework;
import org.apache.druid.curator.CuratorUtils;
import org.apache.druid.curator.announcement.Announcer;
import org.apache.druid.curator.announcement.NodeAnnouncer;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
Expand All @@ -54,7 +53,7 @@ public class WorkerCuratorCoordinator
private final ObjectMapper jsonMapper;
private final RemoteTaskRunnerConfig config;
private final CuratorFramework curatorFramework;
private final Announcer announcer;
private final NodeAnnouncer announcer;

private final String baseAnnouncementsPath;
private final String baseTaskPath;
Expand All @@ -77,7 +76,7 @@ public WorkerCuratorCoordinator(
this.curatorFramework = curatorFramework;
this.worker = worker;

this.announcer = new Announcer(curatorFramework, Execs.directExecutor());
this.announcer = new NodeAnnouncer(curatorFramework);

this.baseAnnouncementsPath = getPath(Arrays.asList(indexerZkConfig.getAnnouncementsPath(), worker.getHost()));
this.baseTaskPath = getPath(Arrays.asList(indexerZkConfig.getTasksPath(), worker.getHost()));
Expand All @@ -87,7 +86,7 @@ public WorkerCuratorCoordinator(
@LifecycleStart
public void start() throws Exception
{
log.info("WorkerCuratorCoordinator good to go sir. Server[%s]", worker.getHost());
log.info("WorkerCuratorCoordinator good to go. Server[%s]", worker.getHost());
synchronized (lock) {
if (started) {
return;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.curator.announcement;

/**
* The {@link Announceable} is a representation of an announcement to be made in ZooKeeper.
*/
class Announceable
{
/**
* Represents the path in ZooKeeper where the announcement will be made.
*/
final String path;

/**
* Holds the actual data to be announced.
*/
final byte[] bytes;

/**
* Indicates whether parent nodes should be removed if the announcement is created successfully.
* This can be useful for cleaning up unused paths in ZooKeeper.
*/
final boolean removeParentsIfCreated;

public Announceable(String path, byte[] bytes, boolean removeParentsIfCreated)
{
this.path = path;
this.bytes = bytes;
this.removeParentsIfCreated = removeParentsIfCreated;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.curator.announcement;

import com.google.common.annotations.VisibleForTesting;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
Expand Down Expand Up @@ -53,7 +54,13 @@
import java.util.concurrent.atomic.AtomicReference;

/**
* Announces things on ZooKeeper.
* The {@link Announcer} class manages the announcement of a node, and watches all child
* nodes under the specified path in a ZooKeeper ensemble. It monitors these nodes
* to ensure their existence and manage their lifecycle collectively.
*
* <p>Use this class when you need to manage the lifecycle of all child nodes under the
* specified path. Should your use case involve the management of a standalone node
* instead, see {@link NodeAnnouncer}.</p>
*/
public class Announcer
{
Expand All @@ -63,11 +70,13 @@ public class Announcer
private final PathChildrenCacheFactory factory;
private final ExecutorService pathChildrenCacheExecutor;

@GuardedBy("toAnnounce")
private final List<Announceable> toAnnounce = new ArrayList<>();
@GuardedBy("toAnnounce")
private final List<Announceable> toUpdate = new ArrayList<>();
private final ConcurrentMap<String, PathChildrenCache> listeners = new ConcurrentHashMap<>();
private final ConcurrentMap<String, ConcurrentMap<String, byte[]>> announcements = new ConcurrentHashMap<>();
private final List<String> parentsIBuilt = new CopyOnWriteArrayList<String>();
private final List<String> parentsIBuilt = new CopyOnWriteArrayList<>();

// Used for testing
private Set<String> addedChildren;
Expand Down Expand Up @@ -160,14 +169,14 @@ public void stop()
transaction = transaction.delete().forPath(parent).and();
}
catch (Exception e) {
log.info(e, "Unable to delete parent[%s], boooo.", parent);
log.info(e, "Unable to delete parent[%s].", parent);
}
}
try {
((CuratorTransactionFinal) transaction).commit();
}
catch (Exception e) {
log.info(e, "Unable to commit transaction. Please feed the hamsters");
log.info(e, "Unable to commit transaction.");
}
}
}
Expand Down Expand Up @@ -228,7 +237,7 @@ public void announce(String path, byte[] bytes, boolean removeParentIfCreated)
cache.getListenable().addListener(
new PathChildrenCacheListener()
{
private final AtomicReference<Set<String>> pathsLost = new AtomicReference<Set<String>>(null);
private final AtomicReference<Set<String>> pathsLost = new AtomicReference<>(null);

@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
Expand Down Expand Up @@ -347,7 +356,7 @@ public void update(final String path, final byte[] bytes)
ConcurrentMap<String, byte[]> subPaths = announcements.get(parentPath);

if (subPaths == null || subPaths.get(nodePath) == null) {
throw new ISE("Cannot update a path[%s] that hasn't been announced!", path);
throw new ISE("Cannot update path[%s] that hasn't been announced!", path);
}

synchronized (toAnnounce) {
Expand Down Expand Up @@ -430,18 +439,4 @@ private void createPath(String parentPath, boolean removeParentsIfCreated)
log.info(e, "Problem creating parentPath[%s], someone else created it first?", parentPath);
}
}

/**
 * Value holder for a single pending announcement: target path, payload and parent-cleanup flag.
 * The constructor stores its arguments as given, without validation or copying.
 */
private static class Announceable
{
  /** ZooKeeper path of the announcement. */
  final String path;

  /** Payload to announce; the caller's array is kept by reference. */
  final byte[] bytes;

  /** Flag, carried as supplied, saying whether parents created for this announcement should be removed. */
  final boolean removeParentsIfCreated;

  /**
   * @param path                   ZooKeeper path of the announcement
   * @param bytes                  payload to announce; stored by reference
   * @param removeParentsIfCreated whether parents created for this announcement should be removed
   */
  public Announceable(String path, byte[] bytes, boolean removeParentsIfCreated)
  {
    this.removeParentsIfCreated = removeParentsIfCreated;
    this.bytes = bytes;
    this.path = path;
  }
}
}
Loading
Loading