|
27 | 27 |
|
28 | 28 | import java.io.IOException;
|
29 | 29 | import java.net.URI;
|
| 30 | +import java.util.Collections; |
30 | 31 | import java.util.List;
|
31 | 32 | import java.util.Optional;
|
32 | 33 | import java.util.concurrent.CompletableFuture;
|
|
53 | 54 | import org.apache.zookeeper.AsyncCallback;
|
54 | 55 | import org.apache.zookeeper.CreateMode;
|
55 | 56 | import org.apache.zookeeper.KeeperException;
|
| 57 | +import org.apache.zookeeper.Watcher; |
56 | 58 | import org.apache.zookeeper.ZooKeeper;
|
57 | 59 | import org.apache.zookeeper.data.ACL;
|
58 | 60 | import org.apache.zookeeper.data.Stat;
|
|
64 | 66 | public class ZKMetadataDriverBase implements AutoCloseable {
|
65 | 67 |
|
66 | 68 | protected static final String SCHEME = "zk";
|
| 69 | + |
| 70 | + protected volatile boolean metadataServiceAvailable; |
| 71 | + |
67 | 72 | private static final int ZK_CLIENT_WAIT_FOR_SHUTDOWN_TIMEOUT_MS = 5000;
|
68 | 73 |
|
69 | 74 | public static String getZKServersFromServiceUri(URI uri) {
|
@@ -179,6 +184,7 @@ protected void initialize(AbstractConfiguration<?> conf,
|
179 | 184 | // if an external zookeeper is added, use the zookeeper instance
|
180 | 185 | this.zk = (ZooKeeper) (optionalCtx.get());
|
181 | 186 | this.ownZKHandle = false;
|
| 187 | + this.metadataServiceAvailable = true; |
182 | 188 | } else {
|
183 | 189 | final String metadataServiceUriStr;
|
184 | 190 | try {
|
@@ -212,6 +218,12 @@ protected void initialize(AbstractConfiguration<?> conf,
|
212 | 218 | .sessionTimeoutMs(conf.getZkTimeout())
|
213 | 219 | .operationRetryPolicy(zkRetryPolicy)
|
214 | 220 | .requestRateLimit(conf.getZkRequestRateLimit())
|
| 221 | + .watchers(Collections.singleton(watchedEvent -> { |
| 222 | + if (log.isDebugEnabled()) { |
| 223 | + log.debug("Got ZK session watch event: {}", watchedEvent); |
| 224 | + } |
| 225 | + handleState(watchedEvent.getState()); |
| 226 | + })) |
215 | 227 | .statsLogger(statsLogger)
|
216 | 228 | .build();
|
217 | 229 |
|
@@ -247,6 +259,17 @@ protected void initialize(AbstractConfiguration<?> conf,
|
247 | 259 | acls);
|
248 | 260 | }
|
249 | 261 |
|
| 262 | + private void handleState(Watcher.Event.KeeperState zkClientState) { |
| 263 | + switch (zkClientState) { |
| 264 | + case Expired: |
| 265 | + case Disconnected: |
| 266 | + this.metadataServiceAvailable = false; |
| 267 | + break; |
| 268 | + default: |
| 269 | + this.metadataServiceAvailable = true; |
| 270 | + } |
| 271 | + } |
| 272 | + |
250 | 273 | public LayoutManager getLayoutManager() {
|
251 | 274 | return layoutManager;
|
252 | 275 | }
|
|
0 commit comments