diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md index 0b5732693f6..3032c3e7411 100644 --- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md +++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md @@ -1070,6 +1070,13 @@ property, when available, is noted below. **New in 3.6.0:** The size threshold after which a request is considered a large request. If it is -1, then all requests are considered small, effectively turning off large request throttling. The default is -1. +* *ephemeralNodes.total.byte.limit* : + (Java system property: **zookeeper.ephemeralNodes.total.byte.limit**) + This property set a limit on the amount of ephemeral nodes that can be created in one session. The limit is the number + of bytes it takes to store the serialized path strings for all the session's ephemeral nodes. + This limit should always be under the jute maxbuffer, as exceeding will cause the server to crash when the connection is closed + and a transaction to delete all the ephemeral nodes for that session are deleted. This limit will be ignored if not explicitly set. + * *outstandingHandshake.limit* (Jave system property only: **zookeeper.netty.server.outstandingHandshake.limit**) The maximum in-flight TLS handshake connections could have in ZooKeeper, diff --git a/zookeeper-jute/src/main/java/org/apache/jute/BinaryOutputArchive.java b/zookeeper-jute/src/main/java/org/apache/jute/BinaryOutputArchive.java index 4752d8461ab..8e936557545 100644 --- a/zookeeper-jute/src/main/java/org/apache/jute/BinaryOutputArchive.java +++ b/zookeeper-jute/src/main/java/org/apache/jute/BinaryOutputArchive.java @@ -102,6 +102,29 @@ private ByteBuffer stringToByteBuffer(CharSequence s) { return bb; } + public static int getSerializedStringByteSize(String s) throws ArithmeticException { + if (s == null) { + return 0; + } + + // Always add 4 bytes to size as we call writeInt(bb.remaining(), "len") when writing to DataOutput + int length_descriptor_size = 4; + + int size = 0; + final int len = s.length(); + for (int i = 0; i < len; i++) { + char c = s.charAt(i); + if (c < 0x80) { + size = Math.addExact(size, 1); + } else if (c < 0x800) { + size = Math.addExact(size, 2); + } else { + size = Math.addExact(size, 3); + } + } + return Math.addExact(size, length_descriptor_size); + } + public void writeString(String s, String tag) throws IOException { if (s == null) { writeInt(-1, "len"); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java b/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java index 5cff6f3180c..12381b5bb79 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java @@ -148,6 +148,8 @@ public static KeeperException create(Code code) { return new SessionClosedRequireAuthException(); case REQUESTTIMEOUT: return new RequestTimeoutException(); + case TOTALEPHEMERALLIMITEXCEEDED: + return new TotalEphemeralLimitExceeded(); case OK: default: throw new IllegalArgumentException("Invalid exception code"); @@ -404,7 +406,10 @@ public enum Code implements CodeDeprecated { /** The session has been closed by server because server requires client to do SASL authentication, * but client is not configured with SASL authentication or configuted with SASL but failed * (i.e. wrong credential used.). */ - SESSIONCLOSEDREQUIRESASLAUTH(-124); + SESSIONCLOSEDREQUIRESASLAUTH(-124), + /** Request to create ephemeral node was rejected because the total byte limit for the session was exceeded. + * This limit is manually set through the "zookeeper.ephemeralNodes.total.byte.limit" system property. */ + TOTALEPHEMERALLIMITEXCEEDED(-125); private static final Map lookup = new HashMap(); @@ -495,6 +500,8 @@ static String getCodeMessage(Code code) { return "Reconfig is disabled"; case SESSIONCLOSEDREQUIRESASLAUTH: return "Session closed because client failed to authenticate"; + case TOTALEPHEMERALLIMITEXCEEDED: + return "Ephemeral count exceeded for session"; default: return "Unknown error " + code; } @@ -940,4 +947,10 @@ public RequestTimeoutException() { } + public static class TotalEphemeralLimitExceeded extends KeeperException { + public TotalEphemeralLimitExceeded() { + super(Code.TOTALEPHEMERALLIMITEXCEEDED); + } + } + } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java index 959bc59bd70..650fba0df28 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java @@ -35,11 +35,10 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import org.apache.jute.BinaryInputArchive; -import org.apache.jute.InputArchive; -import org.apache.jute.OutputArchive; -import org.apache.jute.Record; + +import org.apache.jute.*; import org.apache.zookeeper.DigestWatcher; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; @@ -160,6 +159,7 @@ public class DataTree { * This hashtable lists the paths of the ephemeral nodes of a session. */ private final Map> ephemerals = new ConcurrentHashMap>(); + private final Map ephemeralsByteSizeMap = new ConcurrentHashMap(); /** * This set contains the paths of all container nodes @@ -222,6 +222,11 @@ public Set getEphemerals(long sessionId) { return cloned; } + public int getTotalEphemeralsByteSize(long sessionID) { + AtomicInteger byteSize = ephemeralsByteSizeMap.get(sessionID); + return byteSize != null ? byteSize.get() : 0; + } + public Set getContainers() { return new HashSet(containers); } @@ -554,6 +559,7 @@ public void createNode(final String path, byte[] data, List acl, long ephem ttls.add(path); } else if (ephemeralOwner != 0) { HashSet list = ephemerals.get(ephemeralOwner); + AtomicInteger totalEphemeralsByteSize = ephemeralsByteSizeMap.get(ephemeralOwner); if (list == null) { list = new HashSet(); ephemerals.put(ephemeralOwner, list); @@ -561,6 +567,14 @@ public void createNode(final String path, byte[] data, List acl, long ephem synchronized (list) { list.add(path); } + // Only store sum of ephemeral node byte sizes if we're enforcing a limit + if (ZooKeeperServer.getEphemeralNodesTotalByteLimit() != -1) { + if (totalEphemeralsByteSize == null) { + totalEphemeralsByteSize = new AtomicInteger(); + ephemeralsByteSizeMap.put(ephemeralOwner, totalEphemeralsByteSize); + } + totalEphemeralsByteSize.addAndGet(BinaryOutputArchive.getSerializedStringByteSize(path)); + } } if (outputStat != null) { child.copyStat(outputStat); @@ -645,9 +659,15 @@ public void deleteNode(String path, long zxid) throws KeeperException.NoNodeExce ttls.remove(path); } else if (eowner != 0) { Set nodes = ephemerals.get(eowner); + AtomicInteger totalEphemeralsByteSize = ephemeralsByteSizeMap.get(eowner); if (nodes != null) { + Boolean nodeExisted; synchronized (nodes) { - nodes.remove(path); + nodeExisted = nodes.remove(path); + } + // Only store sum of ephemeral node byte sizes if we're enforcing a limit + if (ZooKeeperServer.getEphemeralNodesTotalByteLimit() != -1 && nodeExisted && totalEphemeralsByteSize != null) { + totalEphemeralsByteSize.addAndGet(-(BinaryOutputArchive.getSerializedStringByteSize(path))); } } } @@ -1206,6 +1226,7 @@ public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTx case OpCode.closeSession: long sessionId = header.getClientId(); if (txn != null) { + ephemeralsByteSizeMap.remove(sessionId); killSession(sessionId, header.getZxid(), ephemerals.remove(sessionId), ((CloseSessionTxn) txn).getPaths2Delete()); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java index 8bce3de0f49..b670024b6b4 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.StringReader; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -708,6 +709,7 @@ private void pRequest2TxnCreate(int type, Request request, Record record, boolea request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(), newCversion)); } + TxnHeader hdr = request.getHdr(); long ephemeralOwner = 0; if (createMode.isContainer()) { @@ -716,6 +718,12 @@ private void pRequest2TxnCreate(int type, Request request, Record record, boolea ephemeralOwner = EphemeralType.TTL.toEphemeralOwner(ttl); } else if (createMode.isEphemeral()) { ephemeralOwner = request.sessionId; + int currentByteSize = zks.getZKDatabase().getDataTree().getTotalEphemeralsByteSize(ephemeralOwner); + if (ZooKeeperServer.getEphemeralNodesTotalByteLimit() != -1 && currentByteSize + BinaryOutputArchive.getSerializedStringByteSize(path) + > ZooKeeperServer.getEphemeralNodesTotalByteLimit()) { + ServerMetrics.getMetrics().EPHEMERAL_NODE_LIMIT_VIOLATION.inc(); + throw new KeeperException.TotalEphemeralLimitExceeded(); + } } StatPersisted s = DataTree.createStat(hdr.getZxid(), hdr.getTime(), ephemeralOwner); parentRecord = parentRecord.duplicate(request.getHdr().getZxid()); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java index cee22056d7e..c9cfcc12a52 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java @@ -156,6 +156,7 @@ private ServerMetrics(MetricsProvider metricsProvider) { COMMITS_QUEUED = metricsContext.getCounter("request_commit_queued"); READS_ISSUED_IN_COMMIT_PROC = metricsContext.getSummary("read_commit_proc_issued", DetailLevel.BASIC); WRITES_ISSUED_IN_COMMIT_PROC = metricsContext.getSummary("write_commit_proc_issued", DetailLevel.BASIC); + EPHEMERAL_NODE_LIMIT_VIOLATION = metricsContext.getCounter("ephemeral_node_limit_violation"); /** * Time spent by a read request in the commit processor. @@ -386,6 +387,7 @@ private ServerMetrics(MetricsProvider metricsProvider) { public final Counter COMMITS_QUEUED; public final Summary READS_ISSUED_IN_COMMIT_PROC; public final Summary WRITES_ISSUED_IN_COMMIT_PROC; + public final Counter EPHEMERAL_NODE_LIMIT_VIOLATION; /** * Time spent by a read request in the commit processor. diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index 7adfb9e1297..db062ed26d4 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -99,6 +99,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { public static final String ENABLE_EAGER_ACL_CHECK = "zookeeper.enableEagerACLCheck"; public static final String SKIP_ACL = "zookeeper.skipACL"; + public static final String EPHEMERAL_NODES_TOTAL_BYTE_LIMIT_KEY = "zookeeper.ephemeralNodes.total.byte.limit"; + public static final int DEFAULT_EPHEMERAL_NODES_TOTAL_BYTE_LIMIT = -1; + private static int ephemeralCountLimit; + private static int ephemeralNodesTotalByteLimit; // When enabled, will check ACL constraints appertained to the requests first, // before sending the requests to the quorum. @@ -139,6 +143,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { closeSessionTxnEnabled = Boolean.parseBoolean( System.getProperty(CLOSE_SESSION_TXN_ENABLED, "true")); LOG.info("{} = {}", CLOSE_SESSION_TXN_ENABLED, closeSessionTxnEnabled); + + ephemeralNodesTotalByteLimit = Integer.getInteger(EPHEMERAL_NODES_TOTAL_BYTE_LIMIT_KEY, DEFAULT_EPHEMERAL_NODES_TOTAL_BYTE_LIMIT); + LOG.info("{} = {}",EPHEMERAL_NODES_TOTAL_BYTE_LIMIT_KEY, ephemeralNodesTotalByteLimit); } // @VisibleForTesting @@ -162,6 +169,8 @@ public static void setCloseSessionTxnEnabled(boolean enabled) { ZooKeeperServer.closeSessionTxnEnabled); } + public static int getEphemeralNodesTotalByteLimit() {return ephemeralNodesTotalByteLimit;} + protected ZooKeeperServerBean jmxServerBean; protected DataTreeBean jmxDataTreeBean; diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EphemeralNodeThrottlingMultithreadTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EphemeralNodeThrottlingMultithreadTest.java new file mode 100644 index 00000000000..8eb3b8743a5 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EphemeralNodeThrottlingMultithreadTest.java @@ -0,0 +1,120 @@ +package org.apache.zookeeper.server.quorum; + +import org.apache.jute.BinaryOutputArchive; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +public class EphemeralNodeThrottlingMultithreadTest extends QuorumPeerTestBase { + + public static final String EPHEMERAL_BYTE_LIMIT_KEY = "zookeeper.ephemeralNodes.total.byte.limit"; + static final String TEST_PATH = "/ephemeral-throttling-multithread-test"; + static final int DEFAULT_EPHEMERALNODES_TOTAL_BYTE_LIMIT = (int) (Math.pow(2d, 20d) * .5); + static final int NUM_SERVERS = 5; + + @BeforeClass + public static void setUpClass() { + System.setProperty(EPHEMERAL_BYTE_LIMIT_KEY, Integer.toString(DEFAULT_EPHEMERALNODES_TOTAL_BYTE_LIMIT)); + } + + @AfterClass + public static void tearDownClass() { + System.clearProperty(EPHEMERAL_BYTE_LIMIT_KEY); + } + + // Tests multithreaded creates and deletes against the leader and a follower server + @Test + public void multithreadedRequestsTest() throws Exception { + // 50% of 1mb jute max buffer + int totalEphemeralNodesByteLimit = (int) (Math.pow(2d, 20d) * .5); + System.setProperty("zookeeper.ephemeralNodes.total.byte.limit", Integer.toString(totalEphemeralNodesByteLimit)); + + servers = LaunchServers(NUM_SERVERS); + ZooKeeper leaderServer = servers.zk[servers.findLeader()]; + ZooKeeper followerServer = servers.zk[servers.findAnyFollower()]; + + runMultithreadedRequests(leaderServer); + runMultithreadedRequests(followerServer); + + int leaderSessionEphemeralsByteSum = 0; + for (String nodePath : leaderServer.getEphemerals()) { + leaderSessionEphemeralsByteSum += BinaryOutputArchive.getSerializedStringByteSize(nodePath); + } + // TODO: What % delta do we want to allow here? + assertEquals(totalEphemeralNodesByteLimit, leaderSessionEphemeralsByteSum, totalEphemeralNodesByteLimit/20d); + + int followerSessionEphemeralsByteSum = 0; + for (String nodePath : leaderServer.getEphemerals()) { + followerSessionEphemeralsByteSum += BinaryOutputArchive.getSerializedStringByteSize(nodePath); + } + assertEquals(totalEphemeralNodesByteLimit, followerSessionEphemeralsByteSum, totalEphemeralNodesByteLimit/20d); + + servers.shutDownAllServers(); + } + + private void runMultithreadedRequests(ZooKeeper server) { + int threadPoolCount = 8; + int deleteRequestThreads = 2; + int createRequestThreads = threadPoolCount - deleteRequestThreads; + // Spin up threads to repeatedly send CREATE requests to server + ExecutorService executor = Executors.newFixedThreadPool(threadPoolCount); + for (int i = 0; i < createRequestThreads; i++) { + final int threadID = i; + executor.submit(() ->{ + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime < 10000) { + try { + server.create(TEST_PATH +"_"+threadID+"_", new byte[512], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); + } catch (KeeperException.TotalEphemeralLimitExceeded expectedException) { + // Ignore Ephemeral Count exceeded exception, as this is expected to occur + } catch (Exception e) { + LOG.warn("Thread encountered an exception but ignored it:\n" + e.getMessage()); + } + } + }); + } + + // Spin up threads to repeatedly send DELETE requests to server + // After a 1-second sleep, this should run concurrently with the create threads, but then end before create threads end + // so that we still have time to hit the limit and can then assert that limit was upheld correctly + for (int i = 0; i < deleteRequestThreads; i++) { + executor.submit(() -> { + long startTime = System.currentTimeMillis(); + try { + // Brief sleep to reduce chance that ephemeral nodes not yet created + Thread.sleep(1000); + while (System.currentTimeMillis() - startTime < 4000) { + for (String ephemeralNode : server.getEphemerals()) { + server.delete(ephemeralNode, -1); + } + } + } catch (KeeperException.TotalEphemeralLimitExceeded expectedException) { + // Ignore Ephemeral Count exceeded exception, as this is expected to occur + } catch (Exception e) { + LOG.warn("Thread encountered an exception but ignored it:\n" + e.getMessage()); + } + }); + } + + executor.shutdown(); + try { + if(!executor.awaitTermination(12000, TimeUnit.MILLISECONDS)) { + LOG.warn("Threads did not finish in the given time!"); + executor.shutdownNow(); + } + } catch (InterruptedException e) { + LOG.error(e.getMessage()); + executor.shutdownNow(); + } + } +} diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EphemeralNodeThrottlingTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EphemeralNodeThrottlingTest.java new file mode 100644 index 00000000000..82dbc55c59f --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EphemeralNodeThrottlingTest.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import org.apache.jute.BinaryOutputArchive; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.metrics.MetricsUtils; +import org.junit.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.*; + +public class EphemeralNodeThrottlingTest extends QuorumPeerTestBase { + + protected static final Logger LOG = LoggerFactory.getLogger(EphemeralNodeThrottlingTest.class); + + public static final String EPHEMERAL_BYTE_LIMIT_KEY = "zookeeper.ephemeralNodes.total.byte.limit"; + public static final String EPHEMERAL_BYTE_LIMIT_VIOLATION_KEY = "ephemeral_node_limit_violation"; + static final int DEFAULT_EPHEMERALNODES_TOTAL_BYTE_LIMIT = 2000; + static final int NUM_SERVERS = 5; + static final String TEST_PATH = "/ephemeral-throttling-test"; + + @BeforeClass + public static void setUpClass() { + System.setProperty(EPHEMERAL_BYTE_LIMIT_KEY, Integer.toString(DEFAULT_EPHEMERALNODES_TOTAL_BYTE_LIMIT)); + } + + @AfterClass + public static void tearDownClass() { + System.clearProperty(EPHEMERAL_BYTE_LIMIT_KEY); + } + + @Before + public void setUpMethod() throws Exception { + servers = LaunchServers(NUM_SERVERS); + } + + @After + public void tearDownMethod() throws Exception { + servers.shutDownAllServers(); + } + + @Test + public void byteSizeTest() { + System.setProperty(EPHEMERAL_BYTE_LIMIT_KEY, Integer.toString(DEFAULT_EPHEMERALNODES_TOTAL_BYTE_LIMIT)); + ZooKeeper leaderServer = servers.zk[servers.findLeader()]; + int cumulativeBytes = 0; + int i = 0; + while (cumulativeBytes <= DEFAULT_EPHEMERALNODES_TOTAL_BYTE_LIMIT) { + cumulativeBytes += BinaryOutputArchive.getSerializedStringByteSize(TEST_PATH +i); + try { + leaderServer.create(TEST_PATH + i++, new byte[512], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + } catch (Exception e) { + break; + } + } + long actual = (long) MetricsUtils.currentServerMetrics().get(EPHEMERAL_BYTE_LIMIT_VIOLATION_KEY); + System.out.println("byte limit property is: " + Integer.getInteger(EPHEMERAL_BYTE_LIMIT_KEY)); + assertEquals(1, actual); + } + + @Test + public void limitingEphemeralsTest() throws Exception { + System.setProperty(EPHEMERAL_BYTE_LIMIT_KEY, Integer.toString(DEFAULT_EPHEMERALNODES_TOTAL_BYTE_LIMIT)); + ZooKeeper leaderServer = servers.zk[servers.findLeader()]; + String leaderSubPath = TEST_PATH + "-leader-"; + assertTrue(checkLimitEnforcedForServer(leaderServer, leaderSubPath, CreateMode.EPHEMERAL)); + + ZooKeeper followerServer = servers.zk[servers.findAnyFollower()]; + String followerSubPath = TEST_PATH + "-follower-"; + assertTrue(checkLimitEnforcedForServer(followerServer, followerSubPath, CreateMode.EPHEMERAL)); + + // Assert both servers emitted failure metric + long actual = (long) MetricsUtils.currentServerMetrics().get(EPHEMERAL_BYTE_LIMIT_VIOLATION_KEY); + assertEquals(2, actual); + } + + @Test + public void limitingSequentialEphemeralsTest() throws Exception { + System.setProperty(EPHEMERAL_BYTE_LIMIT_KEY, Integer.toString(DEFAULT_EPHEMERALNODES_TOTAL_BYTE_LIMIT)); + ZooKeeper leaderServer = servers.zk[servers.findLeader()]; + String leaderSubPath = TEST_PATH + "-leader-"; + assertTrue(checkLimitEnforcedForServer(leaderServer, leaderSubPath, CreateMode.EPHEMERAL_SEQUENTIAL)); + + ZooKeeper followerServer = servers.zk[servers.findAnyFollower()]; + String followerSubPath = TEST_PATH + "-follower-"; + assertTrue(checkLimitEnforcedForServer(followerServer, followerSubPath, CreateMode.EPHEMERAL_SEQUENTIAL)); + + // Assert both servers emitted failure metric + long actual = (long) MetricsUtils.currentServerMetrics().get(EPHEMERAL_BYTE_LIMIT_VIOLATION_KEY); + assertEquals(2, actual); + } + + public boolean checkLimitEnforcedForServer(ZooKeeper server, String subPath, CreateMode mode) throws Exception { + if (!mode.isEphemeral()) { + return false; + } + + int limit = Integer.getInteger(EPHEMERAL_BYTE_LIMIT_KEY); + int cumulativeBytes = 0; + + if (mode.isSequential()) { + int lastPathBytes = 0; + while (cumulativeBytes + lastPathBytes <= limit) { + String path = server.create(TEST_PATH + "-leader-", new byte[512], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); + lastPathBytes = BinaryOutputArchive.getSerializedStringByteSize(path); + cumulativeBytes += lastPathBytes; + } + + try { + server.create(TEST_PATH + "-leader-", new byte[512], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); + } catch (KeeperException.TotalEphemeralLimitExceeded e) { + return true; + } + return false; + } else { + int i = 0; + while (cumulativeBytes + BinaryOutputArchive.getSerializedStringByteSize(subPath + i) + <= limit) { + server.create(subPath + i, new byte[512], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + cumulativeBytes += BinaryOutputArchive.getSerializedStringByteSize(subPath + i); + i++; + } + try { + server.create(subPath + "-follower-" + i, new byte[512], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + } catch (KeeperException.TotalEphemeralLimitExceeded e) { + return true; + } + return false; + } + } + + @Test + public void rejectedEphemeralMetricsTest() throws Exception { + ZooKeeper leaderServer = servers.zk[servers.findLeader()]; + int expectedLimitExceededAttempts = 10; + int i = expectedLimitExceededAttempts; + int limit = Integer.getInteger(EPHEMERAL_BYTE_LIMIT_KEY); + int cumulativeBytes = 0; + int lastPathBytes = 0; + while (i > 0 || cumulativeBytes + lastPathBytes <= limit) { + try { + String path = leaderServer.create(TEST_PATH + "-leader-", new byte[512], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); + lastPathBytes = BinaryOutputArchive.getSerializedStringByteSize(path); + cumulativeBytes += lastPathBytes; + } catch (KeeperException.TotalEphemeralLimitExceeded e) { + LOG.info("Encountered TotalEphemeralLimitExceeded as expected, continuing..."); + i--; + } + } + + long actual = (long) MetricsUtils.currentServerMetrics().get(EPHEMERAL_BYTE_LIMIT_VIOLATION_KEY); + assertEquals(expectedLimitExceededAttempts, actual); + } +}