diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java index d7043dc8c9a..9b77260fa12 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java @@ -1608,6 +1608,11 @@ public CompletableFuture getLedgerMetadata(long ledgerId) { }); } + @Override + public CompletableFuture isDriverMetadataServiceAvailable() { + return metadataDriver.isMetadataServiceAvailable(); + } + private final ClientContext clientCtx = new ClientContext() { @Override public ClientInternalConf getConf() { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BookKeeper.java index 1e7fa35670f..6e0bffe3a0e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BookKeeper.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BookKeeper.java @@ -81,6 +81,12 @@ static BookKeeperBuilder newBuilder(final ClientConfiguration clientConfiguratio */ CompletableFuture getLedgerMetadata(long ledgerId); + /** + * Return driver metadata service is available + * + * @return the metadata service is available. + */ + CompletableFuture isDriverMetadataServiceAvailable(); /** * Close the client and release every resource. * diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataClientDriver.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataClientDriver.java index 1615f04ecdc..89919f0f769 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataClientDriver.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataClientDriver.java @@ -114,4 +114,13 @@ interface SessionStateListener { default CompletableFuture isHealthCheckEnabled() { return FutureUtils.value(true); } + + /** + * Return driver metadata service is available + * + * @return the metadata service is available. + */ + default CompletableFuture isMetadataServiceAvailable() { + return FutureUtils.value(true); + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java index 1fe6210ce9b..a89dd31f145 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.conf.ClientConfiguration; @@ -111,4 +112,8 @@ public void setSessionStateListener(SessionStateListener sessionStateListener) { } }); } + + public CompletableFuture isMetadataServiceAvailable() { + return CompletableFuture.completedFuture(metadataServiceAvailable); + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBase.java index 643ddd3ced2..9e469fc7be8 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBase.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBase.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.net.URI; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -53,6 +54,7 @@ import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; @@ -64,6 +66,8 @@ public class ZKMetadataDriverBase implements AutoCloseable { protected static final String SCHEME = "zk"; + + protected volatile boolean metadataServiceAvailable; private static final int ZK_CLIENT_WAIT_FOR_SHUTDOWN_TIMEOUT_MS = 5000; public static String getZKServersFromServiceUri(URI uri) { @@ -179,6 +183,7 @@ protected void initialize(AbstractConfiguration conf, // if an external zookeeper is added, use the zookeeper instance this.zk = (ZooKeeper) (optionalCtx.get()); this.ownZKHandle = false; + this.metadataServiceAvailable = true; } else { final String metadataServiceUriStr; try { @@ -212,6 +217,10 @@ protected void initialize(AbstractConfiguration conf, .sessionTimeoutMs(conf.getZkTimeout()) .operationRetryPolicy(zkRetryPolicy) .requestRateLimit(conf.getZkRequestRateLimit()) + .watchers(Collections.singleton(watchedEvent -> { + log.info("Got ZK session watch event: {}", watchedEvent); + handleState(watchedEvent.getState()); + })) .statsLogger(statsLogger) .build(); @@ -247,6 +256,19 @@ protected void initialize(AbstractConfiguration conf, acls); } + private synchronized void handleState(Watcher.Event.KeeperState zkClientState) { + switch (zkClientState) { + case Expired: + case Disconnected: + this.metadataServiceAvailable = false; + break; + + default: + this.metadataServiceAvailable = true; + break; + } + } + public LayoutManager getLayoutManager() { return layoutManager; } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/DriverMetadataServiceAvailableTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/DriverMetadataServiceAvailableTest.java new file mode 100644 index 00000000000..0bc5f28cc8d --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/DriverMetadataServiceAvailableTest.java @@ -0,0 +1,55 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.client.api; + +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.awaitility.Awaitility; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * Bookkeeper Client API driver metadata service available test. + */ +public class DriverMetadataServiceAvailableTest extends BookKeeperClusterTestCase { + + + public DriverMetadataServiceAvailableTest() { + super(3); + } + + @Test + public void testDriverMetadataServiceAvailable() + throws Exception { + + ClientConfiguration conf = new ClientConfiguration(); + conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri()); + conf.setZkTimeout(3000); + try (BookKeeper bkc = BookKeeper.newBuilder(conf).build()) { + Awaitility.await().until(() -> bkc.isDriverMetadataServiceAvailable().get()); + zkUtil.sleepCluster(5, TimeUnit.SECONDS, new CountDownLatch(1)); + Awaitility.await().until(() -> !bkc.isDriverMetadataServiceAvailable().get()); + Awaitility.await().until(() -> bkc.isDriverMetadataServiceAvailable().get()); + } + } +}