From 549b94ecbfd9d72dcf57a967dfc3c9503a98d9de Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Wed, 4 Oct 2023 16:18:37 -0600 Subject: [PATCH] Move much of the RR connection selection process to Host Motivation: RoundRobinLoadBalancer contains the logic that dictates how an individual connection is selected from a Host. This means the RR lb is not just a load balancer but also the connection pool. Modifications: - Extract the connection selection out of the RR balancer and put it in Host. Result: We're continuing to separate concerns into their own abstractions. The current process of connection selection by host can now more easily shared and refined in the future. --- .../io/servicetalk/loadbalancer/Host.java | 123 ++++++++++++- .../loadbalancer/LBExceptions.java | 80 ++++++++ .../loadbalancer/RoundRobinLoadBalancer.java | 174 ++---------------- 3 files changed, 214 insertions(+), 163 deletions(-) create mode 100644 servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/LBExceptions.java diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/Host.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/Host.java index 7289837516..c32f8c7846 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/Host.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/Host.java @@ -22,7 +22,9 @@ import io.servicetalk.concurrent.api.AsyncContext; import io.servicetalk.concurrent.api.Completable; import io.servicetalk.concurrent.api.ListenableAsyncCloseable; +import io.servicetalk.concurrent.api.Single; import io.servicetalk.concurrent.internal.DelayedCancellable; +import io.servicetalk.context.api.ContextMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,8 +33,10 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Stream; import javax.annotation.Nullable; @@ -40,12 +44,32 @@ import static io.servicetalk.concurrent.api.Completable.completed; import static io.servicetalk.concurrent.api.Publisher.from; import static io.servicetalk.concurrent.api.RetryStrategies.retryWithConstantBackoffDeltaJitter; +import static io.servicetalk.concurrent.api.Single.failed; +import static io.servicetalk.concurrent.api.Single.succeeded; import static io.servicetalk.concurrent.internal.FlowControlUtils.addWithOverflowProtection; +import static java.lang.Math.min; import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater; import static java.util.stream.Collectors.toList; final class Host implements ListenableAsyncCloseable { + /** + * With a relatively small number of connections we can minimize connection creation under moderate concurrency by + * exhausting the full search space without sacrificing too much latency caused by the cost of a CAS operation per + * selection attempt. + */ + private static final int MIN_RANDOM_SEARCH_SPACE = 64; + + /** + * For larger search spaces, due to the cost of a CAS operation per selection attempt we see diminishing returns for + * trying to locate an available connection when most connections are in use. This increases tail latencies, thus + * after some number of failed attempts it appears to be more beneficial to open a new connection instead. + *

+ * The current heuristics were chosen based on a set of benchmarks under various circumstances, low connection + * counts, larger connection counts, low connection churn, high connection churn. + */ + private static final float RANDOM_SEARCH_FACTOR = 0.75f; + private static final Object[] EMPTY_ARRAY = new Object[0]; private static final Logger LOGGER = LoggerFactory.getLogger(Host.class); @@ -70,13 +94,18 @@ private enum State { final Addr address; @Nullable private final HealthCheckConfig healthCheckConfig; + private final ConnectionFactory connectionFactory; + private final int linearSearchSpace; private final ListenableAsyncCloseable closeable; private volatile ConnState connState = ACTIVE_EMPTY_CONN_STATE; - Host(String lbDescription, Addr address, @Nullable HealthCheckConfig healthCheckConfig) { + Host(String lbDescription, Addr address, ConnectionFactory connectionFactory, + int linearSearchSpace, @Nullable HealthCheckConfig healthCheckConfig) { this.lbDescription = lbDescription; this.address = address; this.healthCheckConfig = healthCheckConfig; + this.connectionFactory = connectionFactory; + this.linearSearchSpace = linearSearchSpace; this.closeable = toAsyncCloseable(graceful -> graceful ? doClose(AsyncCloseable::closeAsyncGracefully) : doClose(AsyncCloseable::closeAsync)); } @@ -140,7 +169,89 @@ void markExpired() { } } - void markHealthy(final HealthCheck originalHealthCheckState) { + @Nullable + C pickConnection(Predicate selector, @Nullable final ContextMap context) { + // TODO: what is the deal with not needing a context here? What is it used for in the factory? + final Object[] connections = connState.connections; + // Exhaust the linear search space first: + final int linearAttempts = min(connections.length, linearSearchSpace); + for (int j = 0; j < linearAttempts; ++j) { + @SuppressWarnings("unchecked") + final C connection = (C) connections[j]; + if (selector.test(connection)) { + return connection; + } + } + // Try other connections randomly: + if (connections.length > linearAttempts) { + final int diff = connections.length - linearAttempts; + // With small enough search space, attempt number of times equal to number of remaining connections. + // Back off after exploring most of the search space, it gives diminishing returns. + final int randomAttempts = diff < MIN_RANDOM_SEARCH_SPACE ? diff : + (int) (diff * RANDOM_SEARCH_FACTOR); + final ThreadLocalRandom rnd = ThreadLocalRandom.current(); + for (int j = 0; j < randomAttempts; ++j) { + @SuppressWarnings("unchecked") + final C connection = (C) connections[rnd.nextInt(linearAttempts, connections.length)]; + if (selector.test(connection)) { + return connection; + } + } + } + // So sad, we didn't find a healthy connection. + return null; + } + + Single newConnection( + Predicate selector, final boolean forceNewConnectionAndReserve, @Nullable final ContextMap context) { + // This LB implementation does not automatically provide TransportObserver. Therefore, we pass "null" here. + // Users can apply a ConnectionFactoryFilter if they need to override this "null" value with TransportObserver. + Single establishConnection = connectionFactory.newConnection(address, context, null); + if (healthCheckConfig != null) { + // Schedule health check before returning + establishConnection = establishConnection.beforeOnError(t -> markUnhealthy(t)); + } + return establishConnection + .flatMap(newCnx -> { + if (forceNewConnectionAndReserve && !newCnx.tryReserve()) { + return newCnx.closeAsync().concat(failed( + LBExceptions.StacklessConnectionRejectedException.newInstance( + "Newly created connection " + newCnx + " for " + lbDescription + + " could not be reserved.", + RoundRobinLoadBalancer.class, "selectConnection0(...)"))) + .shareContextOnSubscribe(); + } + + // Invoke the selector before adding the connection to the pool, otherwise, connection can be + // used concurrently and hence a new connection can be rejected by the selector. + if (!selector.test(newCnx)) { + // Failure in selection could be the result of connection factory returning cached connection, + // and not having visibility into max-concurrent-requests, or other threads already selected the + // connection which uses all the max concurrent request count. + + // If there is caching Propagate the exception and rely upon retry strategy. + Single failedSingle = failed(LBExceptions.StacklessConnectionRejectedException.newInstance( + "Newly created connection " + newCnx + " for " + lbDescription + + " was rejected by the selection filter.", + RoundRobinLoadBalancer.class, "selectConnection0(...)")); + + // Just in case the connection is not closed add it to the host so we don't lose track, + // duplicates will be filtered out. + return (addConnection(newCnx, null) ? + failedSingle : newCnx.closeAsync().concat(failedSingle)).shareContextOnSubscribe(); + } + if (addConnection(newCnx, null)) { + return succeeded(newCnx).shareContextOnSubscribe(); + } + return newCnx.closeAsync().concat( + failed(LBExceptions.StacklessConnectionRejectedException.newInstance( + "Failed to add newly created connection " + newCnx + " for " + toString(), + RoundRobinLoadBalancer.class, "selectConnection0(...)"))) + .shareContextOnSubscribe(); + }); + } + + private void markHealthy(final HealthCheck originalHealthCheckState) { // Marking healthy is called when we need to recover from an unexpected error. // However, it is possible that in the meantime, the host entered an EXPIRED state, then ACTIVE, then failed // to open connections and entered the UNHEALTHY state before the original thread continues execution here. @@ -159,7 +270,7 @@ void markHealthy(final HealthCheck originalHealthCheckState) { } } - void markUnhealthy(final Throwable cause, final ConnectionFactory connectionFactory) { + private void markUnhealthy(final Throwable cause) { assert healthCheckConfig != null; for (;;) { ConnState previous = connStateUpdater.get(this); @@ -215,7 +326,7 @@ private static boolean isUnhealthy(ConnState connState) { return HealthCheck.class.equals(connState.state.getClass()); } - boolean addConnection(final C connection, final @Nullable HealthCheck currentHealthCheck) { + private boolean addConnection(final C connection, final @Nullable HealthCheck currentHealthCheck) { int addAttempt = 0; for (;;) { final ConnState previous = connStateUpdater.get(this); @@ -325,10 +436,6 @@ Map.Entry> asEntry() { Stream.of(connState.connections).map(conn -> (C) conn).collect(toList())); } - public Object[] connections() { - return connState.connections; - } - @Override public Completable closeAsync() { return closeable.closeAsync(); diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/LBExceptions.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/LBExceptions.java new file mode 100644 index 0000000000..cd413df6a8 --- /dev/null +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/LBExceptions.java @@ -0,0 +1,80 @@ +/* + * Copyright © 2021-2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.loadbalancer; + +import io.servicetalk.client.api.ConnectionRejectedException; +import io.servicetalk.client.api.NoActiveHostException; +import io.servicetalk.client.api.NoAvailableHostException; +import io.servicetalk.concurrent.internal.ThrowableUtils; + +final class LBExceptions { + + static final class StacklessNoAvailableHostException extends NoAvailableHostException { + private static final long serialVersionUID = 5942960040738091793L; + + private StacklessNoAvailableHostException(final String message) { + super(message); + } + + @Override + public Throwable fillInStackTrace() { + return this; + } + + public static StacklessNoAvailableHostException newInstance(String message, Class clazz, String method) { + return ThrowableUtils.unknownStackTrace(new StacklessNoAvailableHostException(message), clazz, method); + } + } + + static final class StacklessNoActiveHostException extends NoActiveHostException { + + private static final long serialVersionUID = 7500474499335155869L; + + private StacklessNoActiveHostException(final String message) { + super(message); + } + + @Override + public Throwable fillInStackTrace() { + return this; + } + + public static StacklessNoActiveHostException newInstance(String message, Class clazz, String method) { + return ThrowableUtils.unknownStackTrace(new StacklessNoActiveHostException(message), clazz, method); + } + } + + static final class StacklessConnectionRejectedException extends ConnectionRejectedException { + private static final long serialVersionUID = -4940708893680455819L; + + private StacklessConnectionRejectedException(final String message) { + super(message); + } + + @Override + public Throwable fillInStackTrace() { + return this; + } + + public static StacklessConnectionRejectedException newInstance(String message, Class clazz, String method) { + return ThrowableUtils.unknownStackTrace(new StacklessConnectionRejectedException(message), clazz, method); + } + } + + private LBExceptions() { + // no instances + } +} diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java index 0f162648cf..ee40d3a99d 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java @@ -16,11 +16,8 @@ package io.servicetalk.loadbalancer; import io.servicetalk.client.api.ConnectionFactory; -import io.servicetalk.client.api.ConnectionRejectedException; import io.servicetalk.client.api.LoadBalancedConnection; import io.servicetalk.client.api.LoadBalancer; -import io.servicetalk.client.api.NoActiveHostException; -import io.servicetalk.client.api.NoAvailableHostException; import io.servicetalk.client.api.ServiceDiscovererEvent; import io.servicetalk.concurrent.PublisherSource.Processor; import io.servicetalk.concurrent.PublisherSource.Subscriber; @@ -31,7 +28,6 @@ import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.Single; import io.servicetalk.concurrent.internal.SequentialCancellable; -import io.servicetalk.concurrent.internal.ThrowableUtils; import io.servicetalk.context.api.ContextMap; import org.slf4j.Logger; @@ -69,7 +65,6 @@ import static io.servicetalk.concurrent.api.SourceAdapters.fromSource; import static io.servicetalk.concurrent.api.SourceAdapters.toSource; import static java.lang.Integer.toHexString; -import static java.lang.Math.min; import static java.lang.System.identityHashCode; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; @@ -100,30 +95,13 @@ final class RoundRobinLoadBalancer - * The current heuristics were chosen based on a set of benchmarks under various circumstances, low connection - * counts, larger connection counts, low connection churn, high connection churn. - */ - private static final float RANDOM_SEARCH_FACTOR = 0.75f; - private volatile long nextResubscribeTime = RESUBSCRIBING; @SuppressWarnings("unused") private volatile int index; private volatile List> usedHosts = emptyList(); - private final String id; private final String targetResource; + private final String lbDescription; private final Publisher>> eventPublisher; private final Processor eventStreamProcessor = newPublisherProcessorDropHeadOnOverflow(32); private final Publisher eventStream; @@ -154,8 +132,8 @@ final class RoundRobinLoadBalancer connectionFactory, final int linearSearchSpace, @Nullable final HealthCheckConfig healthCheckConfig) { - this.id = id + '@' + toHexString(identityHashCode(this)); this.targetResource = requireNonNull(targetResourceName); + this.lbDescription = makeDescription(id, targetResource); this.eventPublisher = requireNonNull(eventPublisher); this.eventStream = fromSource(eventStreamProcessor) .replay(1); // Allow for multiple subscribers and provide new subscribers with last signal. @@ -368,7 +346,8 @@ private List> markHostAsExpired( private Host createHost(ResolvedAddress addr) { // All hosts will share the healthcheck config of the parent RR loadbalancer. - Host host = new Host<>(RoundRobinLoadBalancer.this.toString(), addr, healthCheckConfig); + Host host = new Host<>(RoundRobinLoadBalancer.this.toString(), addr, connectionFactory, + linearSearchSpace, healthCheckConfig); host.onClose().afterFinally(() -> usedHostsUpdater.updateAndGet(RoundRobinLoadBalancer.this, previousHosts -> { @SuppressWarnings("unchecked") @@ -472,10 +451,7 @@ public Publisher eventStream() { @Override public String toString() { - return "RoundRobinLoadBalancer{" + - "id=" + id + - ", targetResource=" + targetResource + - '}'; + return lbDescription; } private Single selectConnection0(final Predicate selector, @Nullable final ContextMap context, @@ -484,14 +460,13 @@ private Single selectConnection0(final Predicate selector, @Nullable final if (usedHosts.isEmpty()) { return isClosedList(usedHosts) ? failedLBClosed(targetResource) : // This is the case when SD has emitted some items but none of the hosts are available. - failed(StacklessNoAvailableHostException.newInstance( + failed(LBExceptions.StacklessNoAvailableHostException.newInstance( "No hosts are available to connect for " + targetResource + ".", RoundRobinLoadBalancer.class, "selectConnection0(...)")); } // try one loop over hosts and if all are expired, give up final int cursor = (indexUpdater.getAndIncrement(this) & Integer.MAX_VALUE) % usedHosts.size(); - final ThreadLocalRandom rnd = ThreadLocalRandom.current(); Host pickedHost = null; for (int i = 0; i < usedHosts.size(); ++i) { // for a particular iteration we maintain a local cursor without contention with other requests @@ -500,31 +475,10 @@ private Single selectConnection0(final Predicate selector, @Nullable final assert host != null : "Host can't be null."; if (!forceNewConnectionAndReserve) { - // Try first to see if an existing connection can be used - final Object[] connections = host.connections(); - // Exhaust the linear search space first: - final int linearAttempts = min(connections.length, linearSearchSpace); - for (int j = 0; j < linearAttempts; ++j) { - @SuppressWarnings("unchecked") - final C connection = (C) connections[j]; - if (selector.test(connection)) { - return succeeded(connection); - } - } - // Try other connections randomly: - if (connections.length > linearAttempts) { - final int diff = connections.length - linearAttempts; - // With small enough search space, attempt number of times equal to number of remaining connections. - // Back off after exploring most of the search space, it gives diminishing returns. - final int randomAttempts = diff < MIN_RANDOM_SEARCH_SPACE ? diff : - (int) (diff * RANDOM_SEARCH_FACTOR); - for (int j = 0; j < randomAttempts; ++j) { - @SuppressWarnings("unchecked") - final C connection = (C) connections[rnd.nextInt(linearAttempts, connections.length)]; - if (selector.test(connection)) { - return succeeded(connection); - } - } + // First see if an existing connection can be used + C connection = host.pickConnection(selector, context); + if (connection != null) { + return succeeded(connection); } } @@ -544,57 +498,12 @@ private Single selectConnection0(final Predicate selector, @Nullable final subscribeToEvents(true); } } - return failed(StacklessNoActiveHostException.newInstance("Failed to pick an active host for " + + return failed(LBExceptions.StacklessNoActiveHostException.newInstance("Failed to pick an active host for " + targetResource + ". Either all are busy, expired, or unhealthy: " + usedHosts, RoundRobinLoadBalancer.class, "selectConnection0(...)")); } // No connection was selected: create a new one. - final Host host = pickedHost; - - // This LB implementation does not automatically provide TransportObserver. Therefore, we pass "null" here. - // Users can apply a ConnectionFactoryFilter if they need to override this "null" value with TransportObserver. - Single establishConnection = connectionFactory.newConnection(host.address, context, null); - if (healthCheckConfig != null) { - // Schedule health check before returning - establishConnection = establishConnection.beforeOnError(t -> host.markUnhealthy(t, connectionFactory)); - } - return establishConnection - .flatMap(newCnx -> { - if (forceNewConnectionAndReserve && !newCnx.tryReserve()) { - return newCnx.closeAsync().concat(failed(StacklessConnectionRejectedException.newInstance( - "Newly created connection " + newCnx + " for " + targetResource - + " could not be reserved.", - RoundRobinLoadBalancer.class, "selectConnection0(...)"))) - .shareContextOnSubscribe(); - } - - // Invoke the selector before adding the connection to the pool, otherwise, connection can be - // used concurrently and hence a new connection can be rejected by the selector. - if (!selector.test(newCnx)) { - // Failure in selection could be the result of connection factory returning cached connection, - // and not having visibility into max-concurrent-requests, or other threads already selected the - // connection which uses all the max concurrent request count. - - // If there is caching Propagate the exception and rely upon retry strategy. - Single failedSingle = failed(StacklessConnectionRejectedException.newInstance( - "Newly created connection " + newCnx + " for " + targetResource - + " was rejected by the selection filter.", - RoundRobinLoadBalancer.class, "selectConnection0(...)")); - - // Just in case the connection is not closed add it to the host so we don't lose track, - // duplicates will be filtered out. - return (host.addConnection(newCnx, null) ? - failedSingle : newCnx.closeAsync().concat(failedSingle)).shareContextOnSubscribe(); - } - if (host.addConnection(newCnx, null)) { - return succeeded(newCnx).shareContextOnSubscribe(); - } - return newCnx.closeAsync().concat(isClosedList(this.usedHosts) ? failedLBClosed(targetResource) : - failed(StacklessConnectionRejectedException.newInstance( - "Failed to add newly created connection " + newCnx + " for " + targetResource - + " for " + host, RoundRobinLoadBalancer.class, "selectConnection0(...)"))) - .shareContextOnSubscribe(); - }); + return pickedHost.newConnection(selector, forceNewConnectionAndReserve, context); } @Override @@ -622,62 +531,17 @@ List>> usedAddresses() { return usedHosts.stream().map(Host::asEntry).collect(toList()); } - private static final class StacklessNoAvailableHostException extends NoAvailableHostException { - private static final long serialVersionUID = 5942960040738091793L; - - private StacklessNoAvailableHostException(final String message) { - super(message); - } - - @Override - public Throwable fillInStackTrace() { - return this; - } - - public static StacklessNoAvailableHostException newInstance(String message, Class clazz, String method) { - return ThrowableUtils.unknownStackTrace(new StacklessNoAvailableHostException(message), clazz, method); - } - } - - private static final class StacklessNoActiveHostException extends NoActiveHostException { - - private static final long serialVersionUID = 7500474499335155869L; - - private StacklessNoActiveHostException(final String message) { - super(message); - } - - @Override - public Throwable fillInStackTrace() { - return this; - } - - public static StacklessNoActiveHostException newInstance(String message, Class clazz, String method) { - return ThrowableUtils.unknownStackTrace(new StacklessNoActiveHostException(message), clazz, method); - } - } - - private static final class StacklessConnectionRejectedException extends ConnectionRejectedException { - private static final long serialVersionUID = -4940708893680455819L; - - private StacklessConnectionRejectedException(final String message) { - super(message); - } - - @Override - public Throwable fillInStackTrace() { - return this; - } - - public static StacklessConnectionRejectedException newInstance(String message, Class clazz, String method) { - return ThrowableUtils.unknownStackTrace(new StacklessConnectionRejectedException(message), clazz, method); - } - } - private static boolean isClosedList(List list) { return list.getClass().equals(ClosedList.class); } + private String makeDescription(String id, String targetResource) { + return "RoundRobinLoadBalancer{" + + "id=" + id + '@' + toHexString(identityHashCode(this)) + + ", targetResource=" + targetResource + + '}'; + } + private static final class ClosedList implements List { private final List delegate;