Skip to content

Commit

Permalink
Limit the number of ephemeral nodes a session can create (#118)
Browse files Browse the repository at this point in the history
This throttles ephemeral node creation if the total size of all characters of ephemeral node paths exceeds threshold.
  • Loading branch information
GrantPSpencer authored Nov 8, 2023
1 parent a9a109f commit 6433cbd
Show file tree
Hide file tree
Showing 9 changed files with 384 additions and 6 deletions.
7 changes: 7 additions & 0 deletions zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
Original file line number Diff line number Diff line change
Expand Up @@ -1070,6 +1070,13 @@ property, when available, is noted below.
**New in 3.6.0:**
The size threshold after which a request is considered a large request. If it is -1, then all requests are considered small, effectively turning off large request throttling. The default is -1.

* *ephemeralNodes.total.byte.limit* :
(Java system property: **zookeeper.ephemeralNodes.total.byte.limit**)
This property set a limit on the amount of ephemeral nodes that can be created in one session. The limit is the number
of bytes it takes to store the serialized path strings for all the session's ephemeral nodes.
This limit should always be under the jute maxbuffer, as exceeding will cause the server to crash when the connection is closed
and a transaction to delete all the ephemeral nodes for that session are deleted. This limit will be ignored if not explicitly set.

* *outstandingHandshake.limit*
(Jave system property only: **zookeeper.netty.server.outstandingHandshake.limit**)
The maximum in-flight TLS handshake connections could have in ZooKeeper,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,29 @@ private ByteBuffer stringToByteBuffer(CharSequence s) {
return bb;
}

public static int getSerializedStringByteSize(String s) throws ArithmeticException {
if (s == null) {
return 0;
}

// Always add 4 bytes to size as we call writeInt(bb.remaining(), "len") when writing to DataOutput
int length_descriptor_size = 4;

int size = 0;
final int len = s.length();
for (int i = 0; i < len; i++) {
char c = s.charAt(i);
if (c < 0x80) {
size = Math.addExact(size, 1);
} else if (c < 0x800) {
size = Math.addExact(size, 2);
} else {
size = Math.addExact(size, 3);
}
}
return Math.addExact(size, length_descriptor_size);
}

public void writeString(String s, String tag) throws IOException {
if (s == null) {
writeInt(-1, "len");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ public static KeeperException create(Code code) {
return new SessionClosedRequireAuthException();
case REQUESTTIMEOUT:
return new RequestTimeoutException();
case TOTALEPHEMERALLIMITEXCEEDED:
return new TotalEphemeralLimitExceeded();
case OK:
default:
throw new IllegalArgumentException("Invalid exception code");
Expand Down Expand Up @@ -404,7 +406,10 @@ public enum Code implements CodeDeprecated {
/** The session has been closed by server because server requires client to do SASL authentication,
* but client is not configured with SASL authentication or configuted with SASL but failed
* (i.e. wrong credential used.). */
SESSIONCLOSEDREQUIRESASLAUTH(-124);
SESSIONCLOSEDREQUIRESASLAUTH(-124),
/** Request to create ephemeral node was rejected because the total byte limit for the session was exceeded.
* This limit is manually set through the "zookeeper.ephemeralNodes.total.byte.limit" system property. */
TOTALEPHEMERALLIMITEXCEEDED(-125);

private static final Map<Integer, Code> lookup = new HashMap<Integer, Code>();

Expand Down Expand Up @@ -495,6 +500,8 @@ static String getCodeMessage(Code code) {
return "Reconfig is disabled";
case SESSIONCLOSEDREQUIRESASLAUTH:
return "Session closed because client failed to authenticate";
case TOTALEPHEMERALLIMITEXCEEDED:
return "Ephemeral count exceeded for session";
default:
return "Unknown error " + code;
}
Expand Down Expand Up @@ -940,4 +947,10 @@ public RequestTimeoutException() {

}

public static class TotalEphemeralLimitExceeded extends KeeperException {
public TotalEphemeralLimitExceeded() {
super(Code.TOTALEPHEMERALLIMITEXCEEDED);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,10 @@
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.InputArchive;
import org.apache.jute.OutputArchive;
import org.apache.jute.Record;

import org.apache.jute.*;
import org.apache.zookeeper.DigestWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
Expand Down Expand Up @@ -160,6 +159,7 @@ public class DataTree {
* This hashtable lists the paths of the ephemeral nodes of a session.
*/
private final Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<Long, HashSet<String>>();
private final Map<Long, AtomicInteger> ephemeralsByteSizeMap = new ConcurrentHashMap<Long, AtomicInteger>();

/**
* This set contains the paths of all container nodes
Expand Down Expand Up @@ -222,6 +222,11 @@ public Set<String> getEphemerals(long sessionId) {
return cloned;
}

public int getTotalEphemeralsByteSize(long sessionID) {
AtomicInteger byteSize = ephemeralsByteSizeMap.get(sessionID);
return byteSize != null ? byteSize.get() : 0;
}

public Set<String> getContainers() {
return new HashSet<String>(containers);
}
Expand Down Expand Up @@ -554,13 +559,22 @@ public void createNode(final String path, byte[] data, List<ACL> acl, long ephem
ttls.add(path);
} else if (ephemeralOwner != 0) {
HashSet<String> list = ephemerals.get(ephemeralOwner);
AtomicInteger totalEphemeralsByteSize = ephemeralsByteSizeMap.get(ephemeralOwner);
if (list == null) {
list = new HashSet<String>();
ephemerals.put(ephemeralOwner, list);
}
synchronized (list) {
list.add(path);
}
// Only store sum of ephemeral node byte sizes if we're enforcing a limit
if (ZooKeeperServer.getEphemeralNodesTotalByteLimit() != -1) {
if (totalEphemeralsByteSize == null) {
totalEphemeralsByteSize = new AtomicInteger();
ephemeralsByteSizeMap.put(ephemeralOwner, totalEphemeralsByteSize);
}
totalEphemeralsByteSize.addAndGet(BinaryOutputArchive.getSerializedStringByteSize(path));
}
}
if (outputStat != null) {
child.copyStat(outputStat);
Expand Down Expand Up @@ -645,9 +659,15 @@ public void deleteNode(String path, long zxid) throws KeeperException.NoNodeExce
ttls.remove(path);
} else if (eowner != 0) {
Set<String> nodes = ephemerals.get(eowner);
AtomicInteger totalEphemeralsByteSize = ephemeralsByteSizeMap.get(eowner);
if (nodes != null) {
Boolean nodeExisted;
synchronized (nodes) {
nodes.remove(path);
nodeExisted = nodes.remove(path);
}
// Only store sum of ephemeral node byte sizes if we're enforcing a limit
if (ZooKeeperServer.getEphemeralNodesTotalByteLimit() != -1 && nodeExisted && totalEphemeralsByteSize != null) {
totalEphemeralsByteSize.addAndGet(-(BinaryOutputArchive.getSerializedStringByteSize(path)));
}
}
}
Expand Down Expand Up @@ -1206,6 +1226,7 @@ public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTx
case OpCode.closeSession:
long sessionId = header.getClientId();
if (txn != null) {
ephemeralsByteSizeMap.remove(sessionId);
killSession(sessionId, header.getZxid(),
ephemerals.remove(sessionId),
((CloseSessionTxn) txn).getPaths2Delete());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.io.StringReader;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -708,6 +709,7 @@ private void pRequest2TxnCreate(int type, Request request, Record record, boolea
request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(), newCversion));
}


TxnHeader hdr = request.getHdr();
long ephemeralOwner = 0;
if (createMode.isContainer()) {
Expand All @@ -716,6 +718,12 @@ private void pRequest2TxnCreate(int type, Request request, Record record, boolea
ephemeralOwner = EphemeralType.TTL.toEphemeralOwner(ttl);
} else if (createMode.isEphemeral()) {
ephemeralOwner = request.sessionId;
int currentByteSize = zks.getZKDatabase().getDataTree().getTotalEphemeralsByteSize(ephemeralOwner);
if (ZooKeeperServer.getEphemeralNodesTotalByteLimit() != -1 && currentByteSize + BinaryOutputArchive.getSerializedStringByteSize(path)
> ZooKeeperServer.getEphemeralNodesTotalByteLimit()) {
ServerMetrics.getMetrics().EPHEMERAL_NODE_LIMIT_VIOLATION.inc();
throw new KeeperException.TotalEphemeralLimitExceeded();
}
}
StatPersisted s = DataTree.createStat(hdr.getZxid(), hdr.getTime(), ephemeralOwner);
parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ private ServerMetrics(MetricsProvider metricsProvider) {
COMMITS_QUEUED = metricsContext.getCounter("request_commit_queued");
READS_ISSUED_IN_COMMIT_PROC = metricsContext.getSummary("read_commit_proc_issued", DetailLevel.BASIC);
WRITES_ISSUED_IN_COMMIT_PROC = metricsContext.getSummary("write_commit_proc_issued", DetailLevel.BASIC);
EPHEMERAL_NODE_LIMIT_VIOLATION = metricsContext.getCounter("ephemeral_node_limit_violation");

/**
* Time spent by a read request in the commit processor.
Expand Down Expand Up @@ -386,6 +387,7 @@ private ServerMetrics(MetricsProvider metricsProvider) {
public final Counter COMMITS_QUEUED;
public final Summary READS_ISSUED_IN_COMMIT_PROC;
public final Summary WRITES_ISSUED_IN_COMMIT_PROC;
public final Counter EPHEMERAL_NODE_LIMIT_VIOLATION;

/**
* Time spent by a read request in the commit processor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {

public static final String ENABLE_EAGER_ACL_CHECK = "zookeeper.enableEagerACLCheck";
public static final String SKIP_ACL = "zookeeper.skipACL";
public static final String EPHEMERAL_NODES_TOTAL_BYTE_LIMIT_KEY = "zookeeper.ephemeralNodes.total.byte.limit";
public static final int DEFAULT_EPHEMERAL_NODES_TOTAL_BYTE_LIMIT = -1;
private static int ephemeralCountLimit;
private static int ephemeralNodesTotalByteLimit;

// When enabled, will check ACL constraints appertained to the requests first,
// before sending the requests to the quorum.
Expand Down Expand Up @@ -139,6 +143,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
closeSessionTxnEnabled = Boolean.parseBoolean(
System.getProperty(CLOSE_SESSION_TXN_ENABLED, "true"));
LOG.info("{} = {}", CLOSE_SESSION_TXN_ENABLED, closeSessionTxnEnabled);

ephemeralNodesTotalByteLimit = Integer.getInteger(EPHEMERAL_NODES_TOTAL_BYTE_LIMIT_KEY, DEFAULT_EPHEMERAL_NODES_TOTAL_BYTE_LIMIT);
LOG.info("{} = {}",EPHEMERAL_NODES_TOTAL_BYTE_LIMIT_KEY, ephemeralNodesTotalByteLimit);
}

// @VisibleForTesting
Expand All @@ -162,6 +169,8 @@ public static void setCloseSessionTxnEnabled(boolean enabled) {
ZooKeeperServer.closeSessionTxnEnabled);
}

public static int getEphemeralNodesTotalByteLimit() {return ephemeralNodesTotalByteLimit;}

protected ZooKeeperServerBean jmxServerBean;
protected DataTreeBean jmxDataTreeBean;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package org.apache.zookeeper.server.quorum;

import org.apache.jute.BinaryOutputArchive;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;

public class EphemeralNodeThrottlingMultithreadTest extends QuorumPeerTestBase {

public static final String EPHEMERAL_BYTE_LIMIT_KEY = "zookeeper.ephemeralNodes.total.byte.limit";
static final String TEST_PATH = "/ephemeral-throttling-multithread-test";
static final int DEFAULT_EPHEMERALNODES_TOTAL_BYTE_LIMIT = (int) (Math.pow(2d, 20d) * .5);
static final int NUM_SERVERS = 5;

@BeforeClass
public static void setUpClass() {
System.setProperty(EPHEMERAL_BYTE_LIMIT_KEY, Integer.toString(DEFAULT_EPHEMERALNODES_TOTAL_BYTE_LIMIT));
}

@AfterClass
public static void tearDownClass() {
System.clearProperty(EPHEMERAL_BYTE_LIMIT_KEY);
}

// Tests multithreaded creates and deletes against the leader and a follower server
@Test
public void multithreadedRequestsTest() throws Exception {
// 50% of 1mb jute max buffer
int totalEphemeralNodesByteLimit = (int) (Math.pow(2d, 20d) * .5);
System.setProperty("zookeeper.ephemeralNodes.total.byte.limit", Integer.toString(totalEphemeralNodesByteLimit));

servers = LaunchServers(NUM_SERVERS);
ZooKeeper leaderServer = servers.zk[servers.findLeader()];
ZooKeeper followerServer = servers.zk[servers.findAnyFollower()];

runMultithreadedRequests(leaderServer);
runMultithreadedRequests(followerServer);

int leaderSessionEphemeralsByteSum = 0;
for (String nodePath : leaderServer.getEphemerals()) {
leaderSessionEphemeralsByteSum += BinaryOutputArchive.getSerializedStringByteSize(nodePath);
}
// TODO: What % delta do we want to allow here?
assertEquals(totalEphemeralNodesByteLimit, leaderSessionEphemeralsByteSum, totalEphemeralNodesByteLimit/20d);

int followerSessionEphemeralsByteSum = 0;
for (String nodePath : leaderServer.getEphemerals()) {
followerSessionEphemeralsByteSum += BinaryOutputArchive.getSerializedStringByteSize(nodePath);
}
assertEquals(totalEphemeralNodesByteLimit, followerSessionEphemeralsByteSum, totalEphemeralNodesByteLimit/20d);

servers.shutDownAllServers();
}

private void runMultithreadedRequests(ZooKeeper server) {
int threadPoolCount = 8;
int deleteRequestThreads = 2;
int createRequestThreads = threadPoolCount - deleteRequestThreads;
// Spin up threads to repeatedly send CREATE requests to server
ExecutorService executor = Executors.newFixedThreadPool(threadPoolCount);
for (int i = 0; i < createRequestThreads; i++) {
final int threadID = i;
executor.submit(() ->{
long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() - startTime < 10000) {
try {
server.create(TEST_PATH +"_"+threadID+"_", new byte[512], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
} catch (KeeperException.TotalEphemeralLimitExceeded expectedException) {
// Ignore Ephemeral Count exceeded exception, as this is expected to occur
} catch (Exception e) {
LOG.warn("Thread encountered an exception but ignored it:\n" + e.getMessage());
}
}
});
}

// Spin up threads to repeatedly send DELETE requests to server
// After a 1-second sleep, this should run concurrently with the create threads, but then end before create threads end
// so that we still have time to hit the limit and can then assert that limit was upheld correctly
for (int i = 0; i < deleteRequestThreads; i++) {
executor.submit(() -> {
long startTime = System.currentTimeMillis();
try {
// Brief sleep to reduce chance that ephemeral nodes not yet created
Thread.sleep(1000);
while (System.currentTimeMillis() - startTime < 4000) {
for (String ephemeralNode : server.getEphemerals()) {
server.delete(ephemeralNode, -1);
}
}
} catch (KeeperException.TotalEphemeralLimitExceeded expectedException) {
// Ignore Ephemeral Count exceeded exception, as this is expected to occur
} catch (Exception e) {
LOG.warn("Thread encountered an exception but ignored it:\n" + e.getMessage());
}
});
}

executor.shutdown();
try {
if(!executor.awaitTermination(12000, TimeUnit.MILLISECONDS)) {
LOG.warn("Threads did not finish in the given time!");
executor.shutdownNow();
}
} catch (InterruptedException e) {
LOG.error(e.getMessage());
executor.shutdownNow();
}
}
}
Loading

0 comments on commit 6433cbd

Please sign in to comment.