diff --git a/kaldb/src/main/java/com/slack/kaldb/metadata/core/CuratorBuilder.java b/kaldb/src/main/java/com/slack/kaldb/metadata/core/CuratorBuilder.java
index 08000139e9..5c11749054 100644
--- a/kaldb/src/main/java/com/slack/kaldb/metadata/core/CuratorBuilder.java
+++ b/kaldb/src/main/java/com/slack/kaldb/metadata/core/CuratorBuilder.java
@@ -81,7 +81,6 @@ public static AsyncCuratorFramework build(
             });
     curator.start();
-
     LOG.info(
         "Started curator server with the following config zkhost: {}, path prefix: {}, "
             + "connection timeout ms: {}, session timeout ms {} and retry policy {}",
diff --git a/kaldb/src/main/java/com/slack/kaldb/metadata/core/KaldbMetadataStore.java b/kaldb/src/main/java/com/slack/kaldb/metadata/core/KaldbMetadataStore.java
index b685e9261f..9c9f69d8ad 100644
--- a/kaldb/src/main/java/com/slack/kaldb/metadata/core/KaldbMetadataStore.java
+++ b/kaldb/src/main/java/com/slack/kaldb/metadata/core/KaldbMetadataStore.java
@@ -6,6 +6,7 @@
 import io.micrometer.core.instrument.Counter;
 import io.micrometer.core.instrument.MeterRegistry;
 import java.io.Closeable;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -18,6 +19,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.curator.framework.recipes.nodes.PersistentNode;
+import org.apache.curator.framework.recipes.watch.PersistentWatcher;
 import org.apache.curator.x.async.AsyncCuratorFramework;
 import org.apache.curator.x.async.api.CreateOption;
 import org.apache.curator.x.async.modeled.ModelSerializer;
@@ -27,6 +29,7 @@
 import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
 import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,6 +66,7 @@ public class KaldbMetadataStore<T extends KaldbMetadata> implements Closeable {
       new ConcurrentHashMap<>();
 
   private final Map<String, PersistentNode> persistentNodeMap = new ConcurrentHashMap<>();
+  private final Map<String, PersistentWatcher> persistentWatcherMap = new ConcurrentHashMap<>();
 
   public static final String PERSISTENT_NODE_RECREATED_COUNTER =
       "metadata_persistent_node_recreated";
@@ -112,34 +116,92 @@ public static boolean persistentEphemeralModeEnabled() {
   public CompletionStage<String> createAsync(T metadataNode) {
     if (createMode == CreateMode.EPHEMERAL && persistentEphemeralModeEnabled()) {
       String nodePath = resolvePath(metadataNode);
-      // persistent node already implements NodeDataChanged
-      PersistentNode node =
-          new PersistentNode(
-              curator.unwrap(),
-              createMode,
-              false,
-              nodePath,
-              modelSpec.serializer().serialize(metadataNode));
-      persistentNodeMap.put(nodePath, node);
-      node.start();
-      // todo - what happens when attempting to create over existing?
-      return CompletableFuture.supplyAsync(
-          () -> {
-            try {
-              node.waitForInitialCreate(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
-              node.getListenable().addListener(_ -> persistentNodeRecreatedCounter.increment());
-              return nodePath;
-            } catch (Exception e) {
-              throw new CompletionException(e);
-            }
-          });
+      return hasAsync(metadataNode.name)
+          .thenApplyAsync(
+              (stat) -> {
+                // it is possible that we have a node that hasn't yet been async persisted to ZK
+                if (stat != null || persistentNodeMap.containsKey(nodePath)) {
+                  throw new CompletionException(
+                      new IllegalArgumentException(
+                          String.format("Node already exists at '%s'", nodePath)));
+                }
+
+                PersistentNode node =
+                    new PersistentNode(
+                        curator.unwrap(),
+                        createMode,
+                        false,
+                        nodePath,
+                        modelSpec.serializer().serialize(metadataNode));
+                persistentNodeMap.put(nodePath, node);
+                node.start();
+
+                try {
+                  node.waitForInitialCreate(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
+                  node.getListenable().addListener(_ -> persistentNodeRecreatedCounter.increment());
+
+                  // add a persistent watcher for node data changes on this persistent ephemeral
+                  // node, so that when someone else updates a field on the ephemeral node the
+                  // owner also updates their local copy
+                  PersistentWatcher persistentWatcher =
+                      new PersistentWatcher(curator.unwrap(), node.getActualPath(), false);
+                  persistentWatcher
+                      .getListenable()
+                      .addListener(
+                          event -> {
+                            try {
+                              if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
+                                node.waitForInitialCreate(
+                                    DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
+                                modeledClient
+                                    .withPath(ZPath.parse(event.getPath()))
+                                    .read()
+                                    .thenAcceptAsync(
+                                        (updated) -> {
+                                          try {
+                                            if (node.getActualPath() != null) {
+                                              byte[] updatedBytes =
+                                                  modelSpec.serializer().serialize(updated);
+                                              if (!Arrays.equals(node.getData(), updatedBytes)) {
+                                                // only trigger a setData if something actually
+                                                // changed, otherwise we end up in an infinite
+                                                // update loop
+                                                node.setData(
+                                                    modelSpec.serializer().serialize(updated));
+                                              }
+                                            }
+                                          } catch (Exception e) {
+                                            LOG.error(
+                                                "Error attempting to set local node data - fatal ZK error",
+                                                e);
+                                            new RuntimeHalterImpl().handleFatal(e);
+                                          }
+                                        });
+                              }
+                            } catch (Exception e) {
+                              LOG.error(
+                                  "Error attempting to watch NodeDataChanged - fatal ZK error", e);
+                              new RuntimeHalterImpl().handleFatal(e);
+                            }
+                          });
+                  persistentWatcherMap.put(nodePath, persistentWatcher);
+                  persistentWatcher.start();
+                  return nodePath;
+                } catch (Exception e) {
+                  throw new CompletionException(e);
+                }
+              });
     } else {
       // by passing the version 0, this will throw if we attempt to create and it already exists
       return modeledClient.set(metadataNode, 0);
     }
   }
 
-  // resolveForSet
+  /**
+   * Based on the private ModeledFrameworkImpl resolveForSet method.
+   *
+   * @see org.apache.curator.x.async.modeled.details.ModeledFrameworkImpl.resolveForSet
+   */
   private String resolvePath(T model) {
     if (modelSpec.path().isResolved()) {
       return modelSpec.path().fullPath();
@@ -158,6 +220,11 @@ public void createSync(T metadataNode) {
   }
 
   public CompletionStage<T> getAsync(String path) {
+    PersistentNode node = getPersistentNodeIfExists(path);
+    if (node != null) {
+      return CompletableFuture.supplyAsync(
+          () -> modelSpec.serializer().deserialize(node.getData()));
+    }
     if (cachedModeledFramework != null) {
       return cachedModeledFramework.withPath(zPath.resolved(path)).readThrough();
     }
@@ -173,6 +240,8 @@ public T getSync(String path) {
   }
   public CompletionStage<Stat> hasAsync(String path) {
+    // We don't use the persistent node here, as we want the actual stat details, which aren't
+    // available on the PersistentNode
     if (cachedModeledFramework != null) {
       awaitCacheInitialized();
       return cachedModeledFramework.withPath(zPath.resolved(path)).checkExists();
     }
@@ -317,11 +386,23 @@ private PersistentNode getPersistentNodeIfExists(T metadataNode) {
     return persistentNodeMap.getOrDefault(resolvePath(metadataNode), null);
   }
 
+  private PersistentNode getPersistentNodeIfExists(String path) {
+    return persistentNodeMap.getOrDefault(zPath.resolved(path).fullPath(), null);
+  }
+
   private PersistentNode removePersistentNodeIfExists(T metadataNode) {
+    PersistentWatcher watcher = persistentWatcherMap.remove(resolvePath(metadataNode));
+    if (watcher != null) {
+      watcher.close();
+    }
     return persistentNodeMap.remove(resolvePath(metadataNode));
   }
 
   private PersistentNode removePersistentNodeIfExists(String path) {
+    PersistentWatcher watcher = persistentWatcherMap.remove(zPath.resolved(path).fullPath());
+    if (watcher != null) {
+      watcher.close();
+    }
     return persistentNodeMap.remove(zPath.resolved(path).fullPath());
   }
@@ -342,11 +423,21 @@ public void initialized() {
 
   @Override
   public void close() {
+    persistentWatcherMap.forEach(
+        (_, persistentWatcher) -> {
+          try {
+            persistentWatcher.close();
+          } catch (Exception e) {
+            LOG.error("Error removing persistent watchers", e);
+          }
+        });
+
     persistentNodeMap.forEach(
         (_, persistentNode) -> {
           try {
             persistentNode.close();
-          } catch (Exception ignored) {
+          } catch (Exception e) {
+            LOG.error("Error removing persistent nodes", e);
           }
         });
 
diff --git a/kaldb/src/main/java/com/slack/kaldb/recovery/RecoveryService.java b/kaldb/src/main/java/com/slack/kaldb/recovery/RecoveryService.java
index 0057db7713..12acac3176 100644
--- a/kaldb/src/main/java/com/slack/kaldb/recovery/RecoveryService.java
+++ b/kaldb/src/main/java/com/slack/kaldb/recovery/RecoveryService.java
@@ -179,6 +179,13 @@ protected void shutDown() throws Exception {
   private void recoveryNodeListener(RecoveryNodeMetadata recoveryNodeMetadata) {
     Metadata.RecoveryNodeMetadata.RecoveryNodeState newRecoveryNodeState =
         recoveryNodeMetadata.recoveryNodeState;
+    if (recoveryNodeLastKnownState.equals(recoveryNodeMetadata.recoveryNodeState)) {
+      // todo - consider moving this to a model where it deduplicates with a scheduled executor,
+      // similar to the manager. This listener can fire duplicate events when the ephemeral node
+      // is re-created.
+      LOG.info("Recovery node - listener fired with no state change, skipping event");
+      return;
+    }
 
     if (newRecoveryNodeState.equals(Metadata.RecoveryNodeMetadata.RecoveryNodeState.ASSIGNED)) {
       LOG.info("Recovery node - ASSIGNED received");
diff --git a/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java b/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java
index 79dd313323..17587ce877 100644
--- a/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java
+++ b/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java
@@ -154,8 +154,11 @@ public void shouldHandleChunkLivecycle() throws Exception {
 
     // ensure that the chunk was marked LIVE
     await().until(() -> KaldbMetadataTestUtils.listSyncUncached(searchMetadataStore).size() == 1);
-    assertThat(readOnlyChunk.getChunkMetadataState())
-        .isEqualTo(Metadata.CacheSlotMetadata.CacheSlotState.LIVE);
+    await()
+        .until(
+            () ->
+                readOnlyChunk.getChunkMetadataState()
+                    == Metadata.CacheSlotMetadata.CacheSlotState.LIVE);
 
     SearchResult<LogMessage> logMessageSearchResult =
         readOnlyChunk.query(
@@ -230,6 +233,10 @@ public void shouldHandleChunkLivecycle() throws Exception {
     assertThat(meterRegistry.get(CHUNK_ASSIGNMENT_TIMER).tag("successful", "false").timer().count())
         .isEqualTo(0);
 
+    cacheSlotMetadataStore.close();
+    searchMetadataStore.close();
+    snapshotMetadataStore.close();
+    replicaMetadataStore.close();
     curatorFramework.unwrap().close();
   }
 
@@ -300,6 +307,10 @@ public void shouldHandleMissingS3Assets() throws Exception {
     assertThat(meterRegistry.get(CHUNK_ASSIGNMENT_TIMER).tag("successful", "false").timer().count())
         .isEqualTo(1);
 
+    cacheSlotMetadataStore.close();
+    searchMetadataStore.close();
+    snapshotMetadataStore.close();
+    replicaMetadataStore.close();
     curatorFramework.unwrap().close();
   }
 
@@ -370,6 +381,10 @@ public void shouldHandleMissingZkData() throws Exception {
     assertThat(meterRegistry.get(CHUNK_ASSIGNMENT_TIMER).tag("successful", "false").timer().count())
         .isEqualTo(1);
 
+    cacheSlotMetadataStore.close();
+    searchMetadataStore.close();
+    snapshotMetadataStore.close();
+    replicaMetadataStore.close();
     curatorFramework.unwrap().close();
   }
 
@@ -473,6 +488,10 @@ public void closeShouldCleanupLiveChunkCorrectly() throws Exception {
       assertThat(files.findFirst().isPresent()).isFalse();
     }
 
+    cacheSlotMetadataStore.close();
+    searchMetadataStore.close();
+    snapshotMetadataStore.close();
+    replicaMetadataStore.close();
     curatorFramework.unwrap().close();
   }
 
diff --git a/kaldb/src/test/java/com/slack/kaldb/metadata/cache/CacheSlotMetadataStoreTest.java b/kaldb/src/test/java/com/slack/kaldb/metadata/cache/CacheSlotMetadataStoreTest.java
index 7cfa648dd1..304b12f260 100644
--- a/kaldb/src/test/java/com/slack/kaldb/metadata/cache/CacheSlotMetadataStoreTest.java
+++ b/kaldb/src/test/java/com/slack/kaldb/metadata/cache/CacheSlotMetadataStoreTest.java
@@ -52,6 +52,7 @@ public void setUp() throws Exception {
 
   @AfterEach
   public void tearDown() throws IOException {
+    store.close();
     curatorFramework.unwrap().close();
     testingServer.close();
     meterRegistry.close();
diff --git a/kaldb/src/test/java/com/slack/kaldb/metadata/core/KaldbMetadataStoreTest.java b/kaldb/src/test/java/com/slack/kaldb/metadata/core/KaldbMetadataStoreTest.java
index 11524b56bd..edb513363d 100644
--- a/kaldb/src/test/java/com/slack/kaldb/metadata/core/KaldbMetadataStoreTest.java
+++ b/kaldb/src/test/java/com/slack/kaldb/metadata/core/KaldbMetadataStoreTest.java
@@ -31,7 +31,6 @@
 import org.junit.jupiter.api.Test;
 
 public class KaldbMetadataStoreTest {
-
   private TestingServer testingServer;
 
   private MeterRegistry meterRegistry;
@@ -50,7 +49,7 @@ public void setUp() throws Exception {
         KaldbConfigs.ZookeeperConfig.newBuilder()
             .setZkConnectString(testingServer.getConnectString())
             .setZkPathPrefix("Test")
-            .setZkSessionTimeoutMs(10000)
+            .setZkSessionTimeoutMs(2500)
             .setZkConnectionTimeoutMs(1000)
             .setSleepBetweenRetriesMs(500)
             .build();
@@ -712,4 +711,181 @@ public TestMetadataStore() {
 
     curator.close();
   }
+
+  @Test
+  public void testWhenPersistentEphemeralIsUpdatedByAnotherCurator() {
+    System.setProperty(KaldbMetadataStore.PERSISTENT_EPHEMERAL_PROPERTY, "true");
+    RetryPolicy retryPolicy = new RetryNTimes(1, 10);
+    CuratorFramework curator1 =
+        CuratorFrameworkFactory.builder()
+            .connectString(zookeeperConfig.getZkConnectString())
+            .namespace(zookeeperConfig.getZkPathPrefix())
+            .connectionTimeoutMs(50)
+            .sessionTimeoutMs(50)
+            .retryPolicy(retryPolicy)
+            .build();
+    curator1.start();
+    AsyncCuratorFramework asyncCuratorFramework1 = AsyncCuratorFramework.wrap(curator1);
+
+    CuratorFramework curator2 =
+        CuratorFrameworkFactory.builder()
+            .connectString(zookeeperConfig.getZkConnectString())
+            .namespace(zookeeperConfig.getZkPathPrefix())
+            .connectionTimeoutMs(50)
+            .sessionTimeoutMs(50)
+            .retryPolicy(retryPolicy)
+            .build();
+    curator2.start();
+    AsyncCuratorFramework asyncCuratorFramework2 = AsyncCuratorFramework.wrap(curator2);
+
+    class TestMetadataStore extends KaldbMetadataStore<TestMetadata> {
+      public TestMetadataStore(AsyncCuratorFramework curator) {
+        super(
+            curator,
+            CreateMode.EPHEMERAL,
+            true,
+            new JacksonModelSerializer<>(TestMetadata.class),
+            STORE_FOLDER,
+            meterRegistry);
+      }
+    }
+
+    TestMetadata metadata1 = new TestMetadata("foo", "val1");
+    try (KaldbMetadataStore<TestMetadata> store1 = new TestMetadataStore(asyncCuratorFramework1)) {
+      store1.createSync(metadata1);
+
+      try (KaldbMetadataStore<TestMetadata> store2 =
+          new TestMetadataStore(asyncCuratorFramework2)) {
+        // curator2 updates the value
+        TestMetadata metadata2 = store2.getSync(metadata1.name);
+        metadata2.value = "val2";
+        store2.updateSync(metadata2);
+      }
+
+      // curator1 should pick up the new update
+      await().until(() -> store1.getSync(metadata1.name).getValue().equals("val2"));
+      assertThat(store1.hasSync(metadata1.name)).isTrue();
+    }
+
+    curator2.close();
+    curator1.close();
+  }
+
+  @Test
+  public void testDoubleCreateEphemeral() {
+    System.setProperty(KaldbMetadataStore.PERSISTENT_EPHEMERAL_PROPERTY, "true");
+    RetryPolicy retryPolicy = new RetryNTimes(1, 10);
+    CuratorFramework curator1 =
+        CuratorFrameworkFactory.builder()
+            .connectString(zookeeperConfig.getZkConnectString())
+            .namespace(zookeeperConfig.getZkPathPrefix())
+            .connectionTimeoutMs(50)
+            .sessionTimeoutMs(50)
+            .retryPolicy(retryPolicy)
+            .build();
+    curator1.start();
+    AsyncCuratorFramework asyncCuratorFramework1 = AsyncCuratorFramework.wrap(curator1);
+
+    CuratorFramework curator2 =
+        CuratorFrameworkFactory.builder()
+            .connectString(zookeeperConfig.getZkConnectString())
+            .namespace(zookeeperConfig.getZkPathPrefix())
+            .connectionTimeoutMs(50)
+            .sessionTimeoutMs(50)
+            .retryPolicy(retryPolicy)
+            .build();
+    curator2.start();
+    AsyncCuratorFramework asyncCuratorFramework2 = AsyncCuratorFramework.wrap(curator2);
+
+    class TestMetadataStore extends KaldbMetadataStore<TestMetadata> {
+      public TestMetadataStore(AsyncCuratorFramework curator) {
+        super(
+            curator,
+            CreateMode.EPHEMERAL,
+            true,
+            new JacksonModelSerializer<>(TestMetadata.class),
+            STORE_FOLDER,
+            meterRegistry);
+      }
+    }
+    TestMetadata testMetadata = new TestMetadata("foo", "vbr");
+    try (KaldbMetadataStore<TestMetadata> store1 = new TestMetadataStore(asyncCuratorFramework1)) {
+      store1.createSync(testMetadata);
+      assertThrows(InternalMetadataStoreException.class, () -> store1.createSync(testMetadata));
+
+      try (KaldbMetadataStore<TestMetadata> store2 =
+          new TestMetadataStore(asyncCuratorFramework2)) {
+        assertThrows(InternalMetadataStoreException.class, () -> store2.createSync(testMetadata));
+      }
+    }
+
+    curator2.close();
+    curator1.close();
+  }
+
+  @Test
+  public void testWhenPersistentEphemeralIsUpdatedByAnotherCuratorWhileAway() throws Exception {
+    System.setProperty(KaldbMetadataStore.PERSISTENT_EPHEMERAL_PROPERTY, "true");
+    RetryPolicy retryPolicy = new RetryNTimes(1, 10);
+    CuratorFramework curator1 =
+        CuratorFrameworkFactory.builder()
+            .connectString(zookeeperConfig.getZkConnectString())
+            .namespace(zookeeperConfig.getZkPathPrefix())
+            .connectionTimeoutMs(50)
+            .sessionTimeoutMs(50)
+            .retryPolicy(retryPolicy)
+            .build();
+    curator1.start();
+    AsyncCuratorFramework asyncCuratorFramework1 = AsyncCuratorFramework.wrap(curator1);
+
+    CuratorFramework curator2 =
+        CuratorFrameworkFactory.builder()
+            .connectString(zookeeperConfig.getZkConnectString())
+            .namespace(zookeeperConfig.getZkPathPrefix())
+            .connectionTimeoutMs(50)
+            .sessionTimeoutMs(50)
+            .retryPolicy(retryPolicy)
+            .build();
+    curator2.start();
+    AsyncCuratorFramework asyncCuratorFramework2 = AsyncCuratorFramework.wrap(curator2);
+
+    class TestMetadataStore extends KaldbMetadataStore<TestMetadata> {
+      public TestMetadataStore(AsyncCuratorFramework curator) {
+        super(
+            curator,
+            CreateMode.EPHEMERAL,
+            true,
+            new JacksonModelSerializer<>(TestMetadata.class),
+            STORE_FOLDER,
+            meterRegistry);
+      }
+    }
+
+    TestMetadata metadata1 = new TestMetadata("foo", "val1");
+    try (KaldbMetadataStore<TestMetadata> store1 = new TestMetadataStore(asyncCuratorFramework1)) {
+      store1.createSync(metadata1);
+
+      long sessionId1 = curator1.getZookeeperClient().getZooKeeper().getSessionId();
+      try (KaldbMetadataStore<TestMetadata> store2 =
+          new TestMetadataStore(asyncCuratorFramework2)) {
+        // curator2 updates the value
+        TestMetadata metadata2 = store2.getSync(metadata1.name);
+        metadata2.value = "val2";
+
+        // force a ZK session close
+        curator1.getZookeeperClient().getZooKeeper().close();
+        assertThrows(InternalMetadataStoreException.class, () -> store2.updateSync(metadata2));
+      }
+
+      await()
+          .until(() -> curator1.getZookeeperClient().getZooKeeper().getSessionId() != sessionId1);
+
+      // wait until the node shows back up on ZK
+      await().until(() -> store1.hasSync(metadata1.name));
+      assertThat(store1.getSync(metadata1.name).getValue()).isEqualTo("val1");
+    }
+
+    curator2.close();
+    curator1.close();
+  }
 }
diff --git a/kaldb/src/test/java/com/slack/kaldb/recovery/RecoveryServiceTest.java b/kaldb/src/test/java/com/slack/kaldb/recovery/RecoveryServiceTest.java
index 912f1db9df..f6aa048233 100644
--- a/kaldb/src/test/java/com/slack/kaldb/recovery/RecoveryServiceTest.java
+++ b/kaldb/src/test/java/com/slack/kaldb/recovery/RecoveryServiceTest.java
@@ -184,6 +184,8 @@ public void testShouldHandleRecoveryTask() throws Exception {
     assertThat(getCount(ROLLOVERS_INITIATED, meterRegistry)).isEqualTo(1);
     assertThat(getCount(ROLLOVERS_COMPLETED, meterRegistry)).isEqualTo(1);
     assertThat(getCount(ROLLOVERS_FAILED, meterRegistry)).isEqualTo(0);
+
+    snapshotMetadataStore.close();
   }
 
   @Test
@@ -269,6 +271,8 @@ public void testShouldHandleRecoveryTaskWithCompletelyUnavailableOffsets() throw
     assertThat(getCount(ROLLOVERS_INITIATED, meterRegistry)).isEqualTo(0);
     assertThat(getCount(ROLLOVERS_COMPLETED, meterRegistry)).isEqualTo(0);
     assertThat(getCount(ROLLOVERS_FAILED, meterRegistry)).isEqualTo(0);
+
+    snapshotMetadataStore.close();
   }
 
   @Test
@@ -357,6 +361,8 @@ public void testShouldHandleRecoveryTaskWithPartiallyUnavailableOffsets() throws
     assertThat(getCount(ROLLOVERS_INITIATED, meterRegistry)).isEqualTo(0);
     assertThat(getCount(ROLLOVERS_COMPLETED, meterRegistry)).isEqualTo(0);
     assertThat(getCount(ROLLOVERS_FAILED, meterRegistry)).isEqualTo(0);
+
+    snapshotMetadataStore.close();
   }
 
   @Test
@@ -398,6 +404,8 @@ public void testShouldHandleRecoveryTaskFailure() throws Exception {
     assertThat(getCount(ROLLOVERS_INITIATED, meterRegistry)).isEqualTo(1);
     assertThat(getCount(ROLLOVERS_COMPLETED, meterRegistry)).isEqualTo(0);
     assertThat(getCount(ROLLOVERS_FAILED, meterRegistry)).isEqualTo(1);
+
+    snapshotMetadataStore.close();
   }
 
   @Test
@@ -482,6 +490,10 @@ public void testShouldHandleRecoveryTaskAssignmentSuccess() throws Exception {
     assertThat(getCount(ROLLOVERS_INITIATED, meterRegistry)).isEqualTo(1);
     assertThat(getCount(ROLLOVERS_COMPLETED, meterRegistry)).isEqualTo(1);
     assertThat(getCount(ROLLOVERS_FAILED, meterRegistry)).isEqualTo(0);
+
+    snapshotMetadataStore.close();
+    recoveryTaskMetadataStore.close();
+    recoveryNodeMetadataStore.close();
   }
 
   @Test
@@ -571,6 +583,10 @@ public void testShouldHandleRecoveryTaskAssignmentFailure() throws Exception {
     assertThat(getCount(ROLLOVERS_INITIATED, meterRegistry)).isEqualTo(1);
     assertThat(getCount(ROLLOVERS_COMPLETED, meterRegistry)).isEqualTo(0);
     assertThat(getCount(ROLLOVERS_FAILED, meterRegistry)).isEqualTo(1);
+
+    snapshotMetadataStore.close();
+    recoveryTaskMetadataStore.close();
+    recoveryNodeMetadataStore.close();
   }
 
   @Test
@@ -709,11 +725,17 @@ public void shouldHandleInvalidRecoveryTasks() throws Exception {
     // Post recovery checks
     assertThat(KaldbMetadataTestUtils.listSyncUncached(recoveryNodeMetadataStore).size())
         .isEqualTo(1);
-    assertThat(
-            KaldbMetadataTestUtils.listSyncUncached(recoveryNodeMetadataStore)
-                .get(0)
-                .recoveryNodeState)
-        .isEqualTo(Metadata.RecoveryNodeMetadata.RecoveryNodeState.FREE);
+
+    await()
+        .until(
+            () ->
+                KaldbMetadataTestUtils.listSyncUncached(recoveryNodeMetadataStore)
+                        .get(0)
+                        .recoveryNodeState
+                    == Metadata.RecoveryNodeMetadata.RecoveryNodeState.FREE);
+
+    recoveryNodeMetadataStore.close();
+    recoveryTaskMetadataStore.close();
   }
 
   // returns startOffset or endOffset based on the supplied OffsetSpec
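
For context, the core change in KaldbMetadataStore pairs Curator's PersistentNode recipe with a PersistentWatcher so that the owner's local copy of a persistent-ephemeral node tracks writes made by other clients, and a forced re-create after session loss does not restore stale data. The stand-alone sketch below shows that pattern against the stock Curator APIs; it is illustrative only (the connect string, node path, and payload are placeholders, not values from this patch).

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.nodes.PersistentNode;
import org.apache.curator.framework.recipes.watch.PersistentWatcher;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;

public class PersistentEphemeralSketch {
  public static void main(String[] args) throws Exception {
    CuratorFramework client =
        CuratorFrameworkFactory.newClient("localhost:2181", new RetryNTimes(3, 100));
    client.start();

    // An ephemeral node that Curator re-creates automatically after connection/session loss.
    PersistentNode node =
        new PersistentNode(
            client,
            CreateMode.EPHEMERAL,
            false,
            "/example/node",
            "v1".getBytes(StandardCharsets.UTF_8));
    node.start();
    node.waitForInitialCreate(30, TimeUnit.SECONDS);

    // Watch the node so that writes from other clients are copied back into the local
    // PersistentNode; otherwise a re-create after session loss would restore stale data.
    PersistentWatcher watcher = new PersistentWatcher(client, node.getActualPath(), false);
    watcher
        .getListenable()
        .addListener(
            event -> {
              if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
                try {
                  byte[] updated = client.getData().forPath(event.getPath());
                  // Only push the update if it differs, to avoid a setData loop.
                  if (!Arrays.equals(node.getData(), updated)) {
                    node.setData(updated);
                  }
                } catch (Exception e) {
                  throw new RuntimeException(e);
                }
              }
            });
    watcher.start();

    // ... application logic ...

    watcher.close();
    node.close();
    client.close();
  }
}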