Skip to content

Commit

Permalink
Refactor ZK persistent node logic into wrapper class
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanlb committed Feb 16, 2024
1 parent 8eb6c94 commit d3ca51f
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@
import static com.slack.kaldb.server.KaldbConfig.DEFAULT_ZK_TIMEOUT_SECS;

import com.slack.kaldb.util.RuntimeHalterImpl;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import java.io.Closeable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -18,8 +16,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.curator.framework.recipes.nodes.PersistentNode;
import org.apache.curator.framework.recipes.watch.PersistentWatcher;
import org.apache.curator.x.async.AsyncCuratorFramework;
import org.apache.curator.x.async.api.CreateOption;
import org.apache.curator.x.async.modeled.ModelSerializer;
Expand All @@ -29,7 +25,6 @@
import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -65,12 +60,8 @@ public class KaldbMetadataStore<T extends KaldbMetadata> implements Closeable {
private final Map<KaldbMetadataStoreChangeListener<T>, ModeledCacheListener<T>> listenerMap =
new ConcurrentHashMap<>();

private final Map<String, PersistentNode> persistentNodeMap = new ConcurrentHashMap<>();
private final Map<String, PersistentWatcher> persistentWatcherMap = new ConcurrentHashMap<>();

public static final String PERSISTENT_NODE_RECREATED_COUNTER =
"metadata_persistent_node_recreated";
private final Counter persistentNodeRecreatedCounter;
private final Map<String, PersistentWatchedNode> persistentNodeMap = new ConcurrentHashMap<>();
private final MeterRegistry meterRegistry;

public KaldbMetadataStore(
AsyncCuratorFramework curator,
Expand Down Expand Up @@ -102,15 +93,15 @@ public KaldbMetadataStore(
cachedModeledFramework = null;
}

persistentNodeRecreatedCounter = meterRegistry.counter(PERSISTENT_NODE_RECREATED_COUNTER);
this.meterRegistry = meterRegistry;
LOG.info(
"Persistent ephemeral mode '{}' enabled - {}",
PERSISTENT_EPHEMERAL_PROPERTY,
persistentEphemeralModeEnabled());
}

public static boolean persistentEphemeralModeEnabled() {
return Boolean.parseBoolean(System.getProperty(PERSISTENT_EPHEMERAL_PROPERTY, "true"));
return Boolean.parseBoolean(System.getProperty(PERSISTENT_EPHEMERAL_PROPERTY, "false"));
}

public CompletionStage<String> createAsync(T metadataNode) {
Expand All @@ -125,71 +116,17 @@ public CompletionStage<String> createAsync(T metadataNode) {
new IllegalArgumentException(
String.format("Node already exists at '%s'", nodePath)));
}

PersistentNode node =
new PersistentNode(
PersistentWatchedNode node =
new PersistentWatchedNode(
curator.unwrap(),
createMode,
false,
nodePath,
modelSpec.serializer().serialize(metadataNode));
modelSpec.serializer().serialize(metadataNode),
meterRegistry);
persistentNodeMap.put(nodePath, node);
node.start();

try {
node.waitForInitialCreate(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
node.getListenable().addListener(_ -> persistentNodeRecreatedCounter.increment());

// add a persistent watcher for node data changes on this persistent ephemeral
// node this is so when someone else updates a field on the ephemeral node, the
// owner also updates their local copy
PersistentWatcher persistentWatcher =
new PersistentWatcher(curator.unwrap(), node.getActualPath(), false);
persistentWatcher
.getListenable()
.addListener(
event -> {
try {
if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
node.waitForInitialCreate(
DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
modeledClient
.withPath(ZPath.parse(event.getPath()))
.read()
.thenAcceptAsync(
(updated) -> {
try {
if (node.getActualPath() != null) {
byte[] updatedBytes =
modelSpec.serializer().serialize(updated);
if (!Arrays.equals(node.getData(), updatedBytes)) {
// only trigger a setData if something actually
// changed, otherwise
// we end up in a deathloop
node.setData(
modelSpec.serializer().serialize(updated));
}
}
} catch (Exception e) {
LOG.error(
"Error attempting to set local node data - fatal ZK error",
e);
new RuntimeHalterImpl().handleFatal(e);
}
});
}
} catch (Exception e) {
LOG.error(
"Error attempting to watch NodeDataChanged - fatal ZK error", e);
new RuntimeHalterImpl().handleFatal(e);
}
});
persistentWatcherMap.put(nodePath, persistentWatcher);
persistentWatcher.start();
return nodePath;
} catch (Exception e) {
throw new CompletionException(e);
}
return nodePath;
});
} else {
// by passing the version 0, this will throw if we attempt to create and it already exists
Expand Down Expand Up @@ -220,7 +157,7 @@ public void createSync(T metadataNode) {
}

public CompletionStage<T> getAsync(String path) {
PersistentNode node = getPersistentNodeIfExists(path);
PersistentWatchedNode node = getPersistentNodeIfExists(path);
if (node != null) {
return CompletableFuture.supplyAsync(
() -> modelSpec.serializer().deserialize(node.getData()));
Expand Down Expand Up @@ -259,10 +196,9 @@ public boolean hasSync(String path) {
}

public CompletionStage<Stat> updateAsync(T metadataNode) {
PersistentNode node = getPersistentNodeIfExists(metadataNode);
PersistentWatchedNode node = getPersistentNodeIfExists(metadataNode);
if (node != null) {
try {
node.waitForInitialCreate(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
node.setData(modelSpec.serializer().serialize(metadataNode));
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
Expand All @@ -284,10 +220,9 @@ public void updateSync(T metadataNode) {
}

public CompletionStage<Void> deleteAsync(String path) {
PersistentNode node = removePersistentNodeIfExists(path);
PersistentWatchedNode node = removePersistentNodeIfExists(path);
if (node != null) {
try {
node.waitForInitialCreate(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
node.close();
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
Expand All @@ -307,10 +242,9 @@ public void deleteSync(String path) {
}

public CompletionStage<Void> deleteAsync(T metadataNode) {
PersistentNode node = removePersistentNodeIfExists(metadataNode);
PersistentWatchedNode node = removePersistentNodeIfExists(metadataNode);
if (node != null) {
try {
node.waitForInitialCreate(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
node.close();
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
Expand Down Expand Up @@ -382,27 +316,19 @@ private void awaitCacheInitialized() {
}
}

private PersistentNode getPersistentNodeIfExists(T metadataNode) {
private PersistentWatchedNode getPersistentNodeIfExists(T metadataNode) {
return persistentNodeMap.getOrDefault(resolvePath(metadataNode), null);
}

private PersistentNode getPersistentNodeIfExists(String path) {
private PersistentWatchedNode getPersistentNodeIfExists(String path) {
return persistentNodeMap.getOrDefault(zPath.resolved(path).fullPath(), null);
}

private PersistentNode removePersistentNodeIfExists(T metadataNode) {
PersistentWatcher watcher = persistentWatcherMap.remove(resolvePath(metadataNode));
if (watcher != null) {
watcher.close();
}
private PersistentWatchedNode removePersistentNodeIfExists(T metadataNode) {
return persistentNodeMap.remove(resolvePath(metadataNode));
}

private PersistentNode removePersistentNodeIfExists(String path) {
PersistentWatcher watcher = persistentWatcherMap.remove(zPath.resolved(path).fullPath());
if (watcher != null) {
watcher.close();
}
private PersistentWatchedNode removePersistentNodeIfExists(String path) {
return persistentNodeMap.remove(zPath.resolved(path).fullPath());
}

Expand All @@ -423,15 +349,6 @@ public void initialized() {

@Override
public void close() {
persistentWatcherMap.forEach(
(_, persistentWatcher) -> {
try {
persistentWatcher.close();
} catch (Exception e) {
LOG.error("Error removing persistent watchers", e);
}
});

persistentNodeMap.forEach(
(_, persistentNode) -> {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package com.slack.kaldb.metadata.core;

import com.slack.kaldb.util.RuntimeHalterImpl;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.nodes.PersistentNode;
import org.apache.curator.framework.recipes.watch.PersistentWatcher;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Couples a Curator {@link PersistentNode} with a {@link PersistentWatcher} on the same ZK path.
 *
 * <p>The persistent node owns the data for the path. The watcher listens for {@code
 * NodeDataChanged} events on that path, re-reads the data from ZK and copies it into the
 * persistent node whenever it differs from what the node currently holds.
 *
 * <p>Lifecycle: construct, call {@link #start()} once, then {@link #close()} when done.
 */
public class PersistentWatchedNode implements Closeable {

  private static final Logger LOG = LoggerFactory.getLogger(PersistentWatchedNode.class);

  public static final String PERSISTENT_NODE_RECREATED_COUNTER =
      "metadata_persistent_node_recreated";

  /** Maximum time {@link #start()} waits for the node to be created in ZK. */
  private static final long INITIAL_CREATE_TIMEOUT_SECS = 30;

  private final PersistentNode node;
  private final Counter persistentNodeRecreatedCounter;
  private final CuratorFramework givenClient;

  // Assigned in start() and read in close(); volatile so a close() issued from another thread
  // observes the watcher that start() created.
  private volatile PersistentWatcher watcher;

  /**
   * Builds the underlying persistent node; nothing is written to ZK until {@link #start()}.
   *
   * @param givenClient Curator client used for both the node and the watcher
   * @param mode create mode passed to the {@link PersistentNode}
   * @param useProtection passed through to the {@link PersistentNode}
   * @param basePath path passed to the {@link PersistentNode}
   * @param initData initial node data
   * @param meterRegistry registry in which the {@link #PERSISTENT_NODE_RECREATED_COUNTER}
   *     counter is registered
   */
  public PersistentWatchedNode(
      CuratorFramework givenClient,
      final CreateMode mode,
      boolean useProtection,
      final String basePath,
      byte[] initData,
      MeterRegistry meterRegistry) {
    this.givenClient = givenClient;
    persistentNodeRecreatedCounter = meterRegistry.counter(PERSISTENT_NODE_RECREATED_COUNTER);
    node = new PersistentNode(givenClient, mode, useProtection, basePath, initData);
    // NOTE(review): the listener is attached before start(), so the very first creation may be
    // counted as a "recreate" as well - confirm this is the intended metric semantics.
    node.getListenable().addListener(_ -> persistentNodeRecreatedCounter.increment());
  }

  /**
   * Creates the watcher callback that reacts to {@code NodeDataChanged} events by reading the
   * current data from ZK in the background and handing it to {@link #applyRemoteData(byte[])}.
   */
  private Watcher nodeWatcher() {
    return event -> {
      if (event.getType() != Watcher.Event.EventType.NodeDataChanged) {
        return;
      }
      try {
        givenClient
            .getData()
            .inBackground((_, readEvent) -> applyRemoteData(readEvent.getData()))
            .forPath(event.getPath());
      } catch (Exception e) {
        LOG.error("Error attempting to watch NodeDataChanged - fatal ZK error", e);
        new RuntimeHalterImpl().handleFatal(e);
      }
    };
  }

  /**
   * Copies data read from ZK into the persistent node when it differs from the node's copy.
   *
   * <p>This runs inside the background read callback, which is outside the try/catch of the
   * watcher lambda, so failures are handled here rather than being left to the callback's caller.
   *
   * @param updatedBytes data returned by the background read; may be null, presumably when the
   *     read did not succeed (e.g. the node vanished in between) - in that case nothing is done
   */
  private void applyRemoteData(byte[] updatedBytes) {
    try {
      if (updatedBytes == null || node.getActualPath() == null) {
        return;
      }
      // only trigger a setData if something actually changed, otherwise we end up in a
      // deathloop (our own setData fires another NodeDataChanged event)
      if (!Arrays.equals(node.getData(), updatedBytes)) {
        node.setData(updatedBytes);
      }
    } catch (Exception e) {
      LOG.error("Error attempting to set local node data - fatal ZK error", e);
      new RuntimeHalterImpl().handleFatal(e);
    }
  }

  /**
   * Starts the persistent node, waits for its initial creation, then starts watching it.
   *
   * @throws IllegalStateException if the node was not created within the timeout
   * @throws RuntimeException if the calling thread is interrupted while waiting (the interrupt
   *     flag is restored before throwing)
   */
  public void start() {
    node.start();

    final boolean created;
    try {
      created = node.waitForInitialCreate(INITIAL_CREATE_TIMEOUT_SECS, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }

    String actualPath = node.getActualPath();
    if (!created || actualPath == null) {
      // Without a created node there is no path to watch; fail with a clear message instead of
      // handing a null path to the watcher.
      throw new IllegalStateException(
          String.format(
              "Persistent node was not created within %d seconds", INITIAL_CREATE_TIMEOUT_SECS));
    }

    PersistentWatcher newWatcher = new PersistentWatcher(givenClient, actualPath, false);
    // register the listener before starting so no event can arrive while nothing is listening
    newWatcher.getListenable().addListener(nodeWatcher());
    watcher = newWatcher;
    newWatcher.start();
  }

  /**
   * Replaces the data held by the persistent node.
   *
   * @param data new node data
   * @throws Exception if the underlying {@link PersistentNode#setData(byte[])} fails
   */
  public void setData(byte[] data) throws Exception {
    node.setData(data);
  }

  /** Returns the data currently held by the persistent node. */
  public byte[] getData() {
    return node.getData();
  }

  /**
   * Closes the watcher (if {@link #start()} created one) and then the persistent node. The node
   * is closed even when closing the watcher fails.
   */
  @Override
  public void close() throws IOException {
    PersistentWatcher currentWatcher = watcher;
    try {
      if (currentWatcher != null) {
        currentWatcher.close();
      }
    } finally {
      node.close();
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.curator.RetryPolicy;
Expand Down Expand Up @@ -62,8 +63,8 @@ public void tearDown() throws Exception {
testingServer.close();
meterRegistry.close();

// reset to true
System.setProperty(KaldbMetadataStore.PERSISTENT_EPHEMERAL_PROPERTY, "true");
// clear any overrides
System.clearProperty(KaldbMetadataStore.PERSISTENT_EPHEMERAL_PROPERTY);
}

private static class TestMetadata extends KaldbMetadata {
Expand Down Expand Up @@ -528,7 +529,7 @@ public TestMetadataStore() {
.until(
() ->
MetricsUtil.getCount(
KaldbMetadataStore.PERSISTENT_NODE_RECREATED_COUNTER, meterRegistry)
PersistentWatchedNode.PERSISTENT_NODE_RECREATED_COUNTER, meterRegistry)
>= 1);

Stat afterStat = store.hasAsync(metadata1.name).toCompletableFuture().get();
Expand Down Expand Up @@ -596,7 +597,7 @@ public TestMetadataStore() {
.until(
() ->
MetricsUtil.getCount(
KaldbMetadataStore.PERSISTENT_NODE_RECREATED_COUNTER, meterRegistry)
PersistentWatchedNode.PERSISTENT_NODE_RECREATED_COUNTER, meterRegistry)
>= 1);

Stat afterStat = store.hasAsync(metadata1.name).toCompletableFuture().get();
Expand Down Expand Up @@ -655,7 +656,7 @@ public TestMetadataStore() {
assertThrows(InternalMetadataStoreException.class, () -> store.getSync(metadata1.name));
assertThat(
MetricsUtil.getCount(
KaldbMetadataStore.PERSISTENT_NODE_RECREATED_COUNTER, meterRegistry))
PersistentWatchedNode.PERSISTENT_NODE_RECREATED_COUNTER, meterRegistry))
.isEqualTo(0);
}

Expand Down Expand Up @@ -705,7 +706,7 @@ public TestMetadataStore() {
assertThrows(InternalMetadataStoreException.class, () -> store.getSync(metadata1.name));
assertThat(
MetricsUtil.getCount(
KaldbMetadataStore.PERSISTENT_NODE_RECREATED_COUNTER, meterRegistry))
PersistentWatchedNode.PERSISTENT_NODE_RECREATED_COUNTER, meterRegistry))
.isEqualTo(0);
}

Expand Down Expand Up @@ -878,6 +879,7 @@ public TestMetadataStore(AsyncCuratorFramework curator) {
}

await()
.atMost(30, TimeUnit.SECONDS)
.until(() -> curator1.getZookeeperClient().getZooKeeper().getSessionId() != sessionId1);

// wait until the node shows back up on ZK
Expand Down

0 comments on commit d3ca51f

Please sign in to comment.