Skip to content

Commit

Permalink
Enable ZooKeeper client to establish connection in read-only mode
Browse files Browse the repository at this point in the history
  • Loading branch information
massakam committed Mar 21, 2024
1 parent 382184e commit 933d81f
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,9 @@ public ZooKeeperClient build() throws IOException, KeeperException, InterruptedE

// Create a watcher manager
StatsLogger watcherStatsLogger = statsLogger.scope("watcher");
ZooKeeperWatcherBase watcherManager =
null == watchers ? new ZooKeeperWatcherBase(sessionTimeoutMs, watcherStatsLogger) :
new ZooKeeperWatcherBase(sessionTimeoutMs, watchers, watcherStatsLogger);
ZooKeeperWatcherBase watcherManager = (null == watchers)
? new ZooKeeperWatcherBase(sessionTimeoutMs, allowReadOnlyMode, watcherStatsLogger)
: new ZooKeeperWatcherBase(sessionTimeoutMs, allowReadOnlyMode, watchers, watcherStatsLogger);
ZooKeeperClient client = new ZooKeeperClient(
connectString,
sessionTimeoutMs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class ZooKeeperWatcherBase implements Watcher {
.getLogger(ZooKeeperWatcherBase.class);

private final int zkSessionTimeOut;
private final boolean allowReadOnlyMode;
private volatile CountDownLatch clientConnectLatch = new CountDownLatch(1);
private final CopyOnWriteArraySet<Watcher> childWatchers =
new CopyOnWriteArraySet<Watcher>();
Expand All @@ -53,18 +54,20 @@ public class ZooKeeperWatcherBase implements Watcher {
private final ConcurrentHashMap<EventType, Counter> eventCounters =
new ConcurrentHashMap<EventType, Counter>();

public ZooKeeperWatcherBase(int zkSessionTimeOut) {
this(zkSessionTimeOut, NullStatsLogger.INSTANCE);
public ZooKeeperWatcherBase(int zkSessionTimeOut, boolean allowReadOnlyMode) {
this(zkSessionTimeOut, allowReadOnlyMode, NullStatsLogger.INSTANCE);
}

public ZooKeeperWatcherBase(int zkSessionTimeOut, StatsLogger statsLogger) {
this(zkSessionTimeOut, new HashSet<Watcher>(), statsLogger);
public ZooKeeperWatcherBase(int zkSessionTimeOut, boolean allowReadOnlyMode, StatsLogger statsLogger) {
this(zkSessionTimeOut, allowReadOnlyMode, new HashSet<Watcher>(), statsLogger);
}

public ZooKeeperWatcherBase(int zkSessionTimeOut,
boolean allowReadOnlyMode,
Set<Watcher> childWatchers,
StatsLogger statsLogger) {
this.zkSessionTimeOut = zkSessionTimeOut;
this.allowReadOnlyMode = allowReadOnlyMode;
this.childWatchers.addAll(childWatchers);
this.statsLogger = statsLogger;
}
Expand Down Expand Up @@ -130,6 +133,14 @@ public void process(WatchedEvent event) {
LOG.info("ZooKeeper client is connected now.");
clientConnectLatch.countDown();
break;
case ConnectedReadOnly:
if (allowReadOnlyMode) {
LOG.info("ZooKeeper client is connected in read-only mode now.");
clientConnectLatch.countDown();
} else {
LOG.warn("ZooKeeper client is connected in read-only mode, which is not allowed.");
}
break;
case Disconnected:
LOG.info("ZooKeeper client is disconnected from zookeeper now,"
+ " but it is OK unless we received EXPIRED event.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void run() {
byte[] sessionPasswd = bkc.getZkHandle().getSessionPasswd();

try {
ZooKeeperWatcherBase watcher = new ZooKeeperWatcherBase(10000);
ZooKeeperWatcherBase watcher = new ZooKeeperWatcherBase(10000, false);
ZooKeeper zk = new ZooKeeper(zkUtil.getZooKeeperConnectString(), 10000,
watcher, sessionId, sessionPasswd);
zk.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1080,7 +1080,7 @@ protected ZooKeeper createZooKeeper() throws IOException {
public void testZKConnectionLossForLedgerCreation() throws Exception {
int zkSessionTimeOut = 10000;
AtomicLong ledgerIdToInjectFailure = new AtomicLong(INVALID_LEDGERID);
ZooKeeperWatcherBase zooKeeperWatcherBase = new ZooKeeperWatcherBase(zkSessionTimeOut,
ZooKeeperWatcherBase zooKeeperWatcherBase = new ZooKeeperWatcherBase(zkSessionTimeOut, false,
NullStatsLogger.INSTANCE);
MockZooKeeperClient zkFaultInjectionWrapper = new MockZooKeeperClient(zkUtil.getZooKeeperConnectString(),
zkSessionTimeOut, zooKeeperWatcherBase, ledgerIdToInjectFailure);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1103,7 +1103,7 @@ public void testRWShutDownInTheCaseOfZKOperationFailures() throws Exception {
* create MockZooKeeperClient instance and wait for it to be connected.
*/
int zkSessionTimeOut = 10000;
ZooKeeperWatcherBase zooKeeperWatcherBase = new ZooKeeperWatcherBase(zkSessionTimeOut,
ZooKeeperWatcherBase zooKeeperWatcherBase = new ZooKeeperWatcherBase(zkSessionTimeOut, false,
NullStatsLogger.INSTANCE);
MockZooKeeperClient zkFaultInjectionWrapper = new MockZooKeeperClient(zkUtil.getZooKeeperConnectString(),
zkSessionTimeOut, zooKeeperWatcherBase);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void sleepCluster(int time, TimeUnit timeUnit, CountDownLatch l)
default void expireSession(ZooKeeper zk) throws Exception {
long id = zk.getSessionId();
byte[] password = zk.getSessionPasswd();
ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(10000);
ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(10000, false);
ZooKeeper zk2 = new ZooKeeper(getZooKeeperConnectString(), zk.getSessionTimeout(), w, id, password);
w.waitForConnection();
zk2.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public void process(WatchedEvent event) {
};
final int timeout = 2000;
ZooKeeperWatcherBase watcherManager =
new ZooKeeperWatcherBase(timeout).addChildWatcher(testWatcher);
new ZooKeeperWatcherBase(timeout, false).addChildWatcher(testWatcher);
List<Watcher> watchers = new ArrayList<Watcher>(1);
watchers.add(testWatcher);
ZooKeeperClient client = new ShutdownZkServerClient(
Expand Down

0 comments on commit 933d81f

Please sign in to comment.