
Commit
Add persistent watcher for persistentNode changes
bryanlb committed Feb 16, 2024
1 parent f251fd8 commit 8eb6c94
Showing 7 changed files with 348 additions and 33 deletions.
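The core of the change: each persistent ephemeral node now gets a Curator PersistentWatcher, so that when another client writes to the node, the owner copies the new data into its local PersistentNode rather than restoring stale data the next time the node is recreated. Below is a minimal, self-contained sketch of that pattern; the connection string, the /example/node path, and the payloads are illustrative placeholders and not part of this commit.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.nodes.PersistentNode;
import org.apache.curator.framework.recipes.watch.PersistentWatcher;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;

public class PersistentNodeWatchSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder connection string; any reachable ZooKeeper ensemble works.
    CuratorFramework client =
        CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
    client.start();

    // An ephemeral node owned by this process; Curator recreates it if the session is lost.
    PersistentNode node =
        new PersistentNode(
            client,
            CreateMode.EPHEMERAL,
            false,
            "/example/node",
            "v1".getBytes(StandardCharsets.UTF_8));
    node.start();
    node.waitForInitialCreate(30, TimeUnit.SECONDS);

    // Watch the node so writes made by other clients are copied back into the owner's local
    // PersistentNode; otherwise the owner would restore stale data when it recreates the node.
    PersistentWatcher watcher = new PersistentWatcher(client, node.getActualPath(), false);
    watcher
        .getListenable()
        .addListener(
            event -> {
              if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
                try {
                  byte[] updated = client.getData().forPath(event.getPath());
                  // Only call setData when the bytes actually differ, so the resulting
                  // NodeDataChanged event doesn't re-trigger this listener forever.
                  if (!Arrays.equals(node.getData(), updated)) {
                    node.setData(updated);
                  }
                } catch (Exception e) {
                  throw new RuntimeException(e);
                }
              }
            });
    watcher.start();
  }
}

In the commit itself the read goes through the modeled Curator client and deserializes into the metadata model, and a fatal ZK error halts the runtime; the sketch above inlines a raw getData for brevity.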

@@ -81,7 +81,6 @@ public static AsyncCuratorFramework build(
});

curator.start();

LOG.info(
"Started curator server with the following config zkhost: {}, path prefix: {}, "
+ "connection timeout ms: {}, session timeout ms {} and retry policy {}",

@@ -6,6 +6,7 @@
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import java.io.Closeable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -18,6 +19,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.curator.framework.recipes.nodes.PersistentNode;
import org.apache.curator.framework.recipes.watch.PersistentWatcher;
import org.apache.curator.x.async.AsyncCuratorFramework;
import org.apache.curator.x.async.api.CreateOption;
import org.apache.curator.x.async.modeled.ModelSerializer;
@@ -27,6 +29,7 @@
import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +66,7 @@ public class KaldbMetadataStore<T extends KaldbMetadata> implements Closeable {
new ConcurrentHashMap<>();

private final Map<String, PersistentNode> persistentNodeMap = new ConcurrentHashMap<>();
private final Map<String, PersistentWatcher> persistentWatcherMap = new ConcurrentHashMap<>();

public static final String PERSISTENT_NODE_RECREATED_COUNTER =
"metadata_persistent_node_recreated";
@@ -112,34 +116,92 @@ public static boolean persistentEphemeralModeEnabled() {
public CompletionStage<String> createAsync(T metadataNode) {
if (createMode == CreateMode.EPHEMERAL && persistentEphemeralModeEnabled()) {
String nodePath = resolvePath(metadataNode);
// persistent node already implements NodeDataChanged
PersistentNode node =
new PersistentNode(
curator.unwrap(),
createMode,
false,
nodePath,
modelSpec.serializer().serialize(metadataNode));
persistentNodeMap.put(nodePath, node);
node.start();
// todo - what happens when attempting to create over existing?
return CompletableFuture.supplyAsync(
() -> {
try {
node.waitForInitialCreate(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
node.getListenable().addListener(_ -> persistentNodeRecreatedCounter.increment());
return nodePath;
} catch (Exception e) {
throw new CompletionException(e);
}
});
return hasAsync(metadataNode.name)
.thenApplyAsync(
(stat) -> {
// it is possible that we have a node that hasn't yet been asynchronously persisted to ZK
if (stat != null || persistentNodeMap.containsKey(nodePath)) {
throw new CompletionException(
new IllegalArgumentException(
String.format("Node already exists at '%s'", nodePath)));
}

PersistentNode node =
new PersistentNode(
curator.unwrap(),
createMode,
false,
nodePath,
modelSpec.serializer().serialize(metadataNode));
persistentNodeMap.put(nodePath, node);
node.start();

try {
node.waitForInitialCreate(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
node.getListenable().addListener(_ -> persistentNodeRecreatedCounter.increment());

// add a persistent watcher for node data changes on this persistent ephemeral node,
// so that when someone else updates a field on the ephemeral node the owner also
// updates their local copy
PersistentWatcher persistentWatcher =
new PersistentWatcher(curator.unwrap(), node.getActualPath(), false);
persistentWatcher
.getListenable()
.addListener(
event -> {
try {
if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
node.waitForInitialCreate(
DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
modeledClient
.withPath(ZPath.parse(event.getPath()))
.read()
.thenAcceptAsync(
(updated) -> {
try {
if (node.getActualPath() != null) {
byte[] updatedBytes =
modelSpec.serializer().serialize(updated);
if (!Arrays.equals(node.getData(), updatedBytes)) {
// only trigger a setData if something actually changed,
// otherwise we end up in a death loop
node.setData(
modelSpec.serializer().serialize(updated));
}
}
} catch (Exception e) {
LOG.error(
"Error attempting to set local node data - fatal ZK error",
e);
new RuntimeHalterImpl().handleFatal(e);
}
});
}
} catch (Exception e) {
LOG.error(
"Error attempting to watch NodeDataChanged - fatal ZK error", e);
new RuntimeHalterImpl().handleFatal(e);
}
});
persistentWatcherMap.put(nodePath, persistentWatcher);
persistentWatcher.start();
return nodePath;
} catch (Exception e) {
throw new CompletionException(e);
}
});
} else {
// by passing the version 0, this will throw if we attempt to create and it already exists
return modeledClient.set(metadataNode, 0);
}
}

// resolveForSet
/**
* Based on the private ModeledFrameworkImpl resolveForSet
*
* @see org.apache.curator.x.async.modeled.details.ModeledFrameworkImpl.resolveForSet
*/
private String resolvePath(T model) {
if (modelSpec.path().isResolved()) {
return modelSpec.path().fullPath();
@@ -158,6 +220,11 @@ public void createSync(T metadataNode) {
}

public CompletionStage<T> getAsync(String path) {
PersistentNode node = getPersistentNodeIfExists(path);
if (node != null) {
return CompletableFuture.supplyAsync(
() -> modelSpec.serializer().deserialize(node.getData()));
}
if (cachedModeledFramework != null) {
return cachedModeledFramework.withPath(zPath.resolved(path)).readThrough();
}
@@ -173,6 +240,8 @@ public T getSync(String path) {
}

public CompletionStage<Stat> hasAsync(String path) {
// We don't use the persistent node here, as we want the actual Stat details, which aren't
// available on the PersistentNode
if (cachedModeledFramework != null) {
awaitCacheInitialized();
return cachedModeledFramework.withPath(zPath.resolved(path)).checkExists();
@@ -317,11 +386,23 @@ private PersistentNode getPersistentNodeIfExists(T metadataNode) {
return persistentNodeMap.getOrDefault(resolvePath(metadataNode), null);
}

private PersistentNode getPersistentNodeIfExists(String path) {
return persistentNodeMap.getOrDefault(zPath.resolved(path).fullPath(), null);
}

private PersistentNode removePersistentNodeIfExists(T metadataNode) {
PersistentWatcher watcher = persistentWatcherMap.remove(resolvePath(metadataNode));
if (watcher != null) {
watcher.close();
}
return persistentNodeMap.remove(resolvePath(metadataNode));
}

private PersistentNode removePersistentNodeIfExists(String path) {
PersistentWatcher watcher = persistentWatcherMap.remove(zPath.resolved(path).fullPath());
if (watcher != null) {
watcher.close();
}
return persistentNodeMap.remove(zPath.resolved(path).fullPath());
}

@@ -342,11 +423,21 @@ public void initialized() {

@Override
public void close() {
persistentWatcherMap.forEach(
(_, persistentWatcher) -> {
try {
persistentWatcher.close();
} catch (Exception e) {
LOG.error("Error removing persistent watchers", e);
}
});

persistentNodeMap.forEach(
(_, persistentNode) -> {
try {
persistentNode.close();
} catch (Exception ignored) {
} catch (Exception e) {
LOG.error("Error removing persistent nodes", e);
}
});


@@ -179,6 +179,13 @@ protected void shutDown() throws Exception {
private void recoveryNodeListener(RecoveryNodeMetadata recoveryNodeMetadata) {
Metadata.RecoveryNodeMetadata.RecoveryNodeState newRecoveryNodeState =
recoveryNodeMetadata.recoveryNodeState;
if (recoveryNodeLastKnownState.equals(recoveryNodeMetadata.recoveryNodeState)) {
// todo - consider moving this to a model where it deduplicates with a scheduled executor,
// similar to the manager
// This can fire duplicate events if the ephemeral node is re-created
LOG.info("Recovery node - listener fired with no state change, skipping event");
return;
}

if (newRecoveryNodeState.equals(Metadata.RecoveryNodeMetadata.RecoveryNodeState.ASSIGNED)) {
LOG.info("Recovery node - ASSIGNED received");

@@ -154,8 +154,11 @@ public void shouldHandleChunkLivecycle() throws Exception {

// ensure that the chunk was marked LIVE
await().until(() -> KaldbMetadataTestUtils.listSyncUncached(searchMetadataStore).size() == 1);
assertThat(readOnlyChunk.getChunkMetadataState())
.isEqualTo(Metadata.CacheSlotMetadata.CacheSlotState.LIVE);
await()
.until(
() ->
readOnlyChunk.getChunkMetadataState()
== Metadata.CacheSlotMetadata.CacheSlotState.LIVE);

SearchResult<LogMessage> logMessageSearchResult =
readOnlyChunk.query(
@@ -230,6 +233,10 @@ public void shouldHandleChunkLivecycle() throws Exception {
assertThat(meterRegistry.get(CHUNK_ASSIGNMENT_TIMER).tag("successful", "false").timer().count())
.isEqualTo(0);

cacheSlotMetadataStore.close();
searchMetadataStore.close();
snapshotMetadataStore.close();
replicaMetadataStore.close();
curatorFramework.unwrap().close();
}

@@ -300,6 +307,10 @@ public void shouldHandleMissingS3Assets() throws Exception {
assertThat(meterRegistry.get(CHUNK_ASSIGNMENT_TIMER).tag("successful", "false").timer().count())
.isEqualTo(1);

cacheSlotMetadataStore.close();
searchMetadataStore.close();
snapshotMetadataStore.close();
replicaMetadataStore.close();
curatorFramework.unwrap().close();
}

@@ -370,6 +381,10 @@ public void shouldHandleMissingZkData() throws Exception {
assertThat(meterRegistry.get(CHUNK_ASSIGNMENT_TIMER).tag("successful", "false").timer().count())
.isEqualTo(1);

cacheSlotMetadataStore.close();
searchMetadataStore.close();
snapshotMetadataStore.close();
replicaMetadataStore.close();
curatorFramework.unwrap().close();
}

@@ -473,6 +488,10 @@ public void closeShouldCleanupLiveChunkCorrectly() throws Exception {
assertThat(files.findFirst().isPresent()).isFalse();
}

cacheSlotMetadataStore.close();
searchMetadataStore.close();
snapshotMetadataStore.close();
replicaMetadataStore.close();
curatorFramework.unwrap().close();
}


@@ -52,6 +52,7 @@ public void setUp() throws Exception {

@AfterEach
public void tearDown() throws IOException {
store.close();
curatorFramework.unwrap().close();
testingServer.close();
meterRegistry.close();

0 comments on commit 8eb6c94
