diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/Host.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/Host.java index f4961c2eb4..ed49b4a5a1 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/Host.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/Host.java @@ -22,7 +22,9 @@ import io.servicetalk.concurrent.api.AsyncContext; import io.servicetalk.concurrent.api.Completable; import io.servicetalk.concurrent.api.ListenableAsyncCloseable; +import io.servicetalk.concurrent.api.Single; import io.servicetalk.concurrent.internal.DelayedCancellable; +import io.servicetalk.context.api.ContextMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,8 +33,10 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Stream; import javax.annotation.Nullable; @@ -40,12 +44,32 @@ import static io.servicetalk.concurrent.api.Completable.completed; import static io.servicetalk.concurrent.api.Publisher.from; import static io.servicetalk.concurrent.api.RetryStrategies.retryWithConstantBackoffDeltaJitter; +import static io.servicetalk.concurrent.api.Single.failed; +import static io.servicetalk.concurrent.api.Single.succeeded; import static io.servicetalk.concurrent.internal.FlowControlUtils.addWithOverflowProtection; +import static java.lang.Math.min; import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater; import static java.util.stream.Collectors.toList; final class Host implements ListenableAsyncCloseable { + /** + * With a relatively small number of connections we can minimize connection creation under moderate concurrency by + * exhausting the full search space without sacrificing too much latency caused by the cost of a CAS operation per + * selection attempt. + */ + private static final int MIN_RANDOM_SEARCH_SPACE = 64; + + /** + * For larger search spaces, due to the cost of a CAS operation per selection attempt we see diminishing returns for + * trying to locate an available connection when most connections are in use. This increases tail latencies, thus + * after some number of failed attempts it appears to be more beneficial to open a new connection instead. + *

+ * The current heuristics were chosen based on a set of benchmarks under various circumstances, low connection + * counts, larger connection counts, low connection churn, high connection churn. + */ + private static final float RANDOM_SEARCH_FACTOR = 0.75f; + private static final Object[] EMPTY_ARRAY = new Object[0]; private static final Logger LOGGER = LoggerFactory.getLogger(Host.class); @@ -70,13 +94,18 @@ private enum State { final Addr address; @Nullable private final HealthCheckConfig healthCheckConfig; + private final ConnectionFactory connectionFactory; + private final int linearSearchSpace; private final ListenableAsyncCloseable closeable; private volatile ConnState connState = ACTIVE_EMPTY_CONN_STATE; - Host(String lbDescription, Addr address, @Nullable HealthCheckConfig healthCheckConfig) { + Host(String lbDescription, Addr address, ConnectionFactory connectionFactory, + int linearSearchSpace, @Nullable HealthCheckConfig healthCheckConfig) { this.lbDescription = lbDescription; this.address = address; this.healthCheckConfig = healthCheckConfig; + this.connectionFactory = connectionFactory; + this.linearSearchSpace = linearSearchSpace; this.closeable = toAsyncCloseable(graceful -> graceful ? doClose(AsyncCloseable::closeAsyncGracefully) : doClose(AsyncCloseable::closeAsync)); } @@ -140,7 +169,89 @@ void markExpired() { } } - void markHealthy(final HealthCheck originalHealthCheckState) { + @Nullable + C pickConnection(Predicate selector, @Nullable final ContextMap context) { + // TODO: what is the deal with not needing a context here? What is it used for in the factory? + final Object[] connections = connState.connections; + // Exhaust the linear search space first: + final int linearAttempts = min(connections.length, linearSearchSpace); + for (int j = 0; j < linearAttempts; ++j) { + @SuppressWarnings("unchecked") + final C connection = (C) connections[j]; + if (selector.test(connection)) { + return connection; + } + } + // Try other connections randomly: + if (connections.length > linearAttempts) { + final int diff = connections.length - linearAttempts; + // With small enough search space, attempt number of times equal to number of remaining connections. + // Back off after exploring most of the search space, it gives diminishing returns. + final int randomAttempts = diff < MIN_RANDOM_SEARCH_SPACE ? diff : + (int) (diff * RANDOM_SEARCH_FACTOR); + final ThreadLocalRandom rnd = ThreadLocalRandom.current(); + for (int j = 0; j < randomAttempts; ++j) { + @SuppressWarnings("unchecked") + final C connection = (C) connections[rnd.nextInt(linearAttempts, connections.length)]; + if (selector.test(connection)) { + return connection; + } + } + } + // So sad, we didn't find a healthy connection. + return null; + } + + Single newConnection( + Predicate selector, final boolean forceNewConnectionAndReserve, @Nullable final ContextMap context) { + // This LB implementation does not automatically provide TransportObserver. Therefore, we pass "null" here. + // Users can apply a ConnectionFactoryFilter if they need to override this "null" value with TransportObserver. + Single establishConnection = connectionFactory.newConnection(address, context, null); + if (healthCheckConfig != null) { + // Schedule health check before returning + establishConnection = establishConnection.beforeOnError(t -> markUnhealthy(t)); + } + return establishConnection + .flatMap(newCnx -> { + if (forceNewConnectionAndReserve && !newCnx.tryReserve()) { + return newCnx.closeAsync().concat(failed( + Exceptions.StacklessConnectionRejectedException.newInstance( + "Newly created connection " + newCnx + " for " + lbDescription + + " could not be reserved.", + RoundRobinLoadBalancer.class, "selectConnection0(...)"))) + .shareContextOnSubscribe(); + } + + // Invoke the selector before adding the connection to the pool, otherwise, connection can be + // used concurrently and hence a new connection can be rejected by the selector. + if (!selector.test(newCnx)) { + // Failure in selection could be the result of connection factory returning cached connection, + // and not having visibility into max-concurrent-requests, or other threads already selected the + // connection which uses all the max concurrent request count. + + // If there is caching Propagate the exception and rely upon retry strategy. + Single failedSingle = failed(Exceptions.StacklessConnectionRejectedException.newInstance( + "Newly created connection " + newCnx + " for " + lbDescription + + " was rejected by the selection filter.", + RoundRobinLoadBalancer.class, "selectConnection0(...)")); + + // Just in case the connection is not closed add it to the host so we don't lose track, + // duplicates will be filtered out. + return (addConnection(newCnx, null) ? + failedSingle : newCnx.closeAsync().concat(failedSingle)).shareContextOnSubscribe(); + } + if (addConnection(newCnx, null)) { + return succeeded(newCnx).shareContextOnSubscribe(); + } + return newCnx.closeAsync().concat( + failed(Exceptions.StacklessConnectionRejectedException.newInstance( + "Failed to add newly created connection " + newCnx + " for " + toString(), + RoundRobinLoadBalancer.class, "selectConnection0(...)"))) + .shareContextOnSubscribe(); + }); + } + + private void markHealthy(final HealthCheck originalHealthCheckState) { // Marking healthy is called when we need to recover from an unexpected error. // However, it is possible that in the meantime, the host entered an EXPIRED state, then ACTIVE, then failed // to open connections and entered the UNHEALTHY state before the original thread continues execution here. @@ -159,7 +270,7 @@ void markHealthy(final HealthCheck originalHealthCheckState) { } } - void markUnhealthy(final Throwable cause, final ConnectionFactory connectionFactory) { + private void markUnhealthy(final Throwable cause) { assert healthCheckConfig != null; for (;;) { ConnState previous = connStateUpdater.get(this); @@ -215,7 +326,7 @@ private static boolean isUnhealthy(ConnState connState) { return HealthCheck.class.equals(connState.state.getClass()); } - boolean addConnection(final C connection, final @Nullable HealthCheck currentHealthCheck) { + private boolean addConnection(final C connection, final @Nullable HealthCheck currentHealthCheck) { int addAttempt = 0; for (;;) { final ConnState previous = connStateUpdater.get(this); @@ -325,10 +436,6 @@ Map.Entry> asEntry() { Stream.of(connState.connections).map(conn -> (C) conn).collect(toList())); } - Object[] connections() { - return connState.connections; - } - @Override public Completable closeAsync() { return closeable.closeAsync(); diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NewRoundRobinLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NewRoundRobinLoadBalancer.java index 54bbce2343..96896c244e 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NewRoundRobinLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NewRoundRobinLoadBalancer.java @@ -29,9 +29,6 @@ import io.servicetalk.concurrent.api.Single; import io.servicetalk.concurrent.internal.SequentialCancellable; import io.servicetalk.context.api.ContextMap; -import io.servicetalk.loadbalancer.Exceptions.StacklessConnectionRejectedException; -import io.servicetalk.loadbalancer.Exceptions.StacklessNoActiveHostException; -import io.servicetalk.loadbalancer.Exceptions.StacklessNoAvailableHostException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +39,7 @@ import java.util.Iterator; import java.util.List; import java.util.ListIterator; +import java.util.Map; import java.util.Map.Entry; import java.util.Spliterator; import java.util.concurrent.ThreadLocalRandom; @@ -68,7 +66,6 @@ import static io.servicetalk.concurrent.api.SourceAdapters.fromSource; import static io.servicetalk.concurrent.api.SourceAdapters.toSource; import static java.lang.Integer.toHexString; -import static java.lang.Math.min; import static java.lang.System.identityHashCode; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; @@ -99,30 +96,13 @@ final class NewRoundRobinLoadBalancer - * The current heuristics were chosen based on a set of benchmarks under various circumstances, low connection - * counts, larger connection counts, low connection churn, high connection churn. - */ - private static final float RANDOM_SEARCH_FACTOR = 0.75f; - private volatile long nextResubscribeTime = RESUBSCRIBING; @SuppressWarnings("unused") private volatile int index; private volatile List> usedHosts = emptyList(); - private final String id; private final String targetResource; + private final String lbDescription; private final Publisher>> eventPublisher; private final Processor eventStreamProcessor = newPublisherProcessorDropHeadOnOverflow(32); private final Publisher eventStream; @@ -153,8 +133,8 @@ final class NewRoundRobinLoadBalancer connectionFactory, final int linearSearchSpace, @Nullable final HealthCheckConfig healthCheckConfig) { - this.id = id + '@' + toHexString(identityHashCode(this)); this.targetResource = requireNonNull(targetResourceName); + this.lbDescription = makeDescription(id, targetResource); this.eventPublisher = requireNonNull(eventPublisher); this.eventStream = fromSource(eventStreamProcessor) .replay(1); // Allow for multiple subscribers and provide new subscribers with last signal. @@ -367,8 +347,8 @@ private List> markHostAsExpired( private Host createHost(ResolvedAddress addr) { // All hosts will share the healthcheck config of the parent RR loadbalancer. - Host host = new Host<>(NewRoundRobinLoadBalancer.this.toString(), addr, - healthCheckConfig); + Host host = new Host<>(NewRoundRobinLoadBalancer.this.toString(), addr, connectionFactory, + linearSearchSpace, healthCheckConfig); host.onClose().afterFinally(() -> usedHostsUpdater.updateAndGet(NewRoundRobinLoadBalancer.this, previousHosts -> { @SuppressWarnings("unchecked") @@ -408,7 +388,7 @@ private List> addHostToList( private List> listWithHostRemoved( List> oldHostsTyped, Predicate> hostPredicate) { if (oldHostsTyped.isEmpty()) { - // this can happen when an expired host is removed during closing of the RoundRobinLoadBalancer, + // this can happen when an expired host is removed during closing of the NewRoundRobinLoadBalancer, // but all of its connections have already been closed return oldHostsTyped; } @@ -472,10 +452,7 @@ public Publisher eventStream() { @Override public String toString() { - return "RoundRobinLoadBalancer{" + - "id=" + id + - ", targetResource=" + targetResource + - '}'; + return lbDescription; } private Single selectConnection0(final Predicate selector, @Nullable final ContextMap context, @@ -484,14 +461,13 @@ private Single selectConnection0(final Predicate selector, @Nullable final if (usedHosts.isEmpty()) { return isClosedList(usedHosts) ? failedLBClosed(targetResource) : // This is the case when SD has emitted some items but none of the hosts are available. - failed(StacklessNoAvailableHostException.newInstance( + failed(Exceptions.StacklessNoAvailableHostException.newInstance( "No hosts are available to connect for " + targetResource + ".", NewRoundRobinLoadBalancer.class, "selectConnection0(...)")); } // try one loop over hosts and if all are expired, give up final int cursor = (indexUpdater.getAndIncrement(this) & Integer.MAX_VALUE) % usedHosts.size(); - final ThreadLocalRandom rnd = ThreadLocalRandom.current(); Host pickedHost = null; for (int i = 0; i < usedHosts.size(); ++i) { // for a particular iteration we maintain a local cursor without contention with other requests @@ -500,31 +476,10 @@ private Single selectConnection0(final Predicate selector, @Nullable final assert host != null : "Host can't be null."; if (!forceNewConnectionAndReserve) { - // Try first to see if an existing connection can be used - final Object[] connections = host.connections(); - // Exhaust the linear search space first: - final int linearAttempts = min(connections.length, linearSearchSpace); - for (int j = 0; j < linearAttempts; ++j) { - @SuppressWarnings("unchecked") - final C connection = (C) connections[j]; - if (selector.test(connection)) { - return succeeded(connection); - } - } - // Try other connections randomly: - if (connections.length > linearAttempts) { - final int diff = connections.length - linearAttempts; - // With small enough search space, attempt number of times equal to number of remaining connections. - // Back off after exploring most of the search space, it gives diminishing returns. - final int randomAttempts = diff < MIN_RANDOM_SEARCH_SPACE ? diff : - (int) (diff * RANDOM_SEARCH_FACTOR); - for (int j = 0; j < randomAttempts; ++j) { - @SuppressWarnings("unchecked") - final C connection = (C) connections[rnd.nextInt(linearAttempts, connections.length)]; - if (selector.test(connection)) { - return succeeded(connection); - } - } + // First see if an existing connection can be used + C connection = host.pickConnection(selector, context); + if (connection != null) { + return succeeded(connection); } } @@ -544,57 +499,12 @@ private Single selectConnection0(final Predicate selector, @Nullable final subscribeToEvents(true); } } - return failed(StacklessNoActiveHostException.newInstance("Failed to pick an active host for " + + return failed(Exceptions.StacklessNoActiveHostException.newInstance("Failed to pick an active host for " + targetResource + ". Either all are busy, expired, or unhealthy: " + usedHosts, NewRoundRobinLoadBalancer.class, "selectConnection0(...)")); } // No connection was selected: create a new one. - final Host host = pickedHost; - - // This LB implementation does not automatically provide TransportObserver. Therefore, we pass "null" here. - // Users can apply a ConnectionFactoryFilter if they need to override this "null" value with TransportObserver. - Single establishConnection = connectionFactory.newConnection(host.address, context, null); - if (healthCheckConfig != null) { - // Schedule health check before returning - establishConnection = establishConnection.beforeOnError(t -> host.markUnhealthy(t, connectionFactory)); - } - return establishConnection - .flatMap(newCnx -> { - if (forceNewConnectionAndReserve && !newCnx.tryReserve()) { - return newCnx.closeAsync().concat(failed(StacklessConnectionRejectedException.newInstance( - "Newly created connection " + newCnx + " for " + targetResource - + " could not be reserved.", - NewRoundRobinLoadBalancer.class, "selectConnection0(...)"))) - .shareContextOnSubscribe(); - } - - // Invoke the selector before adding the connection to the pool, otherwise, connection can be - // used concurrently and hence a new connection can be rejected by the selector. - if (!selector.test(newCnx)) { - // Failure in selection could be the result of connection factory returning cached connection, - // and not having visibility into max-concurrent-requests, or other threads already selected the - // connection which uses all the max concurrent request count. - - // If there is caching Propagate the exception and rely upon retry strategy. - Single failedSingle = failed(StacklessConnectionRejectedException.newInstance( - "Newly created connection " + newCnx + " for " + targetResource - + " was rejected by the selection filter.", - NewRoundRobinLoadBalancer.class, "selectConnection0(...)")); - - // Just in case the connection is not closed add it to the host so we don't lose track, - // duplicates will be filtered out. - return (host.addConnection(newCnx, null) ? - failedSingle : newCnx.closeAsync().concat(failedSingle)).shareContextOnSubscribe(); - } - if (host.addConnection(newCnx, null)) { - return succeeded(newCnx).shareContextOnSubscribe(); - } - return newCnx.closeAsync().concat(isClosedList(this.usedHosts) ? failedLBClosed(targetResource) : - failed(StacklessConnectionRejectedException.newInstance( - "Failed to add newly created connection " + newCnx + " for " + targetResource - + " for " + host, NewRoundRobinLoadBalancer.class, "selectConnection0(...)"))) - .shareContextOnSubscribe(); - }); + return pickedHost.newConnection(selector, forceNewConnectionAndReserve, context); } @Override @@ -617,7 +527,6 @@ public Completable closeAsyncGracefully() { return asyncCloseable.closeAsyncGracefully(); } - // Visible for testing @Override public List>> usedAddresses() { return usedHosts.stream().map(Host::asEntry).collect(toList()); @@ -627,6 +536,13 @@ private static boolean isClosedList(List list) { return list.getClass().equals(ClosedList.class); } + private String makeDescription(String id, String targetResource) { + return "NewRoundRobinLoadBalancer{" + + "id=" + id + '@' + toHexString(identityHashCode(this)) + + ", targetResource=" + targetResource + + '}'; + } + private static final class ClosedList implements List { private final List delegate; diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java index 3c17c9cf23..4d80742079 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java @@ -16,17 +16,21 @@ package io.servicetalk.loadbalancer; import io.servicetalk.client.api.ConnectionFactory; +import io.servicetalk.client.api.ConnectionLimitReachedException; import io.servicetalk.client.api.LoadBalancedConnection; import io.servicetalk.client.api.LoadBalancer; import io.servicetalk.client.api.ServiceDiscovererEvent; import io.servicetalk.concurrent.PublisherSource.Processor; import io.servicetalk.concurrent.PublisherSource.Subscriber; import io.servicetalk.concurrent.PublisherSource.Subscription; +import io.servicetalk.concurrent.api.AsyncCloseable; +import io.servicetalk.concurrent.api.AsyncContext; import io.servicetalk.concurrent.api.Completable; import io.servicetalk.concurrent.api.CompositeCloseable; import io.servicetalk.concurrent.api.ListenableAsyncCloseable; import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.Single; +import io.servicetalk.concurrent.internal.DelayedCancellable; import io.servicetalk.concurrent.internal.SequentialCancellable; import io.servicetalk.context.api.ContextMap; import io.servicetalk.loadbalancer.Exceptions.StacklessConnectionRejectedException; @@ -36,13 +40,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.AbstractMap.SimpleImmutableEntry; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.ListIterator; -import java.util.Map; import java.util.Map.Entry; import java.util.Spliterator; import java.util.concurrent.ThreadLocalRandom; @@ -50,6 +55,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Predicate; import java.util.function.UnaryOperator; import java.util.stream.Stream; @@ -62,12 +68,16 @@ import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.UNAVAILABLE; import static io.servicetalk.concurrent.api.AsyncCloseables.newCompositeCloseable; import static io.servicetalk.concurrent.api.AsyncCloseables.toAsyncCloseable; +import static io.servicetalk.concurrent.api.Completable.completed; import static io.servicetalk.concurrent.api.Processors.newPublisherProcessorDropHeadOnOverflow; +import static io.servicetalk.concurrent.api.Publisher.from; +import static io.servicetalk.concurrent.api.RetryStrategies.retryWithConstantBackoffDeltaJitter; import static io.servicetalk.concurrent.api.Single.defer; import static io.servicetalk.concurrent.api.Single.failed; import static io.servicetalk.concurrent.api.Single.succeeded; import static io.servicetalk.concurrent.api.SourceAdapters.fromSource; import static io.servicetalk.concurrent.api.SourceAdapters.toSource; +import static io.servicetalk.concurrent.internal.FlowControlUtils.addWithOverflowProtection; import static java.lang.Integer.toHexString; import static java.lang.Math.min; import static java.lang.System.identityHashCode; @@ -75,6 +85,7 @@ import static java.util.Collections.singletonList; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater; import static java.util.stream.Collectors.toList; /** @@ -88,6 +99,8 @@ final class RoundRobinLoadBalancer usedHostsUpdater = AtomicReferenceFieldUpdater.newUpdater(RoundRobinLoadBalancer.class, List.class, "usedHosts"); @@ -218,7 +231,7 @@ private static boolean allUn final List> usedHosts) { boolean allUnhealthy = !usedHosts.isEmpty(); for (Host host : usedHosts) { - if (!host.isUnhealthy()) { + if (!Host.isUnhealthy(host.connState)) { allUnhealthy = false; break; } @@ -367,7 +380,6 @@ private List> markHostAsExpired( } private Host createHost(ResolvedAddress addr) { - // All hosts will share the healthcheck config of the parent RR loadbalancer. Host host = new Host<>(RoundRobinLoadBalancer.this.toString(), addr, healthCheckConfig); host.onClose().afterFinally(() -> usedHostsUpdater.updateAndGet(RoundRobinLoadBalancer.this, previousHosts -> { @@ -501,7 +513,7 @@ private Single selectConnection0(final Predicate selector, @Nullable final if (!forceNewConnectionAndReserve) { // Try first to see if an existing connection can be used - final Object[] connections = host.connections(); + final Object[] connections = host.connState.connections; // Exhaust the linear search space first: final int linearAttempts = min(connections.length, linearSearchSpace); for (int j = 0; j < linearAttempts; ++j) { @@ -554,7 +566,7 @@ private Single selectConnection0(final Predicate selector, @Nullable final // This LB implementation does not automatically provide TransportObserver. Therefore, we pass "null" here. // Users can apply a ConnectionFactoryFilter if they need to override this "null" value with TransportObserver. Single establishConnection = connectionFactory.newConnection(host.address, context, null); - if (healthCheckConfig != null) { + if (host.healthCheckConfig != null) { // Schedule health check before returning establishConnection = establishConnection.beforeOnError(t -> host.markUnhealthy(t, connectionFactory)); } @@ -618,7 +630,7 @@ public Completable closeAsyncGracefully() { } @Override - public List>> usedAddresses() { + public List>> usedAddresses() { return usedHosts.stream().map(Host::asEntry).collect(toList()); } @@ -626,6 +638,441 @@ private static boolean isClosedList(List list) { return list.getClass().equals(ClosedList.class); } + private static final class Host implements ListenableAsyncCloseable { + + private enum State { + // The enum is not exhaustive, as other states have dynamic properties. + // For clarity, the other state classes are listed as comments: + // ACTIVE - see ActiveState + // UNHEALTHY - see HealthCheck + EXPIRED, + CLOSED + } + + private static final ActiveState STATE_ACTIVE_NO_FAILURES = new ActiveState(); + private static final ConnState ACTIVE_EMPTY_CONN_STATE = new ConnState(EMPTY_ARRAY, STATE_ACTIVE_NO_FAILURES); + private static final ConnState CLOSED_CONN_STATE = new ConnState(EMPTY_ARRAY, State.CLOSED); + + @SuppressWarnings("rawtypes") + private static final AtomicReferenceFieldUpdater connStateUpdater = + newUpdater(Host.class, ConnState.class, "connState"); + + private final String lbDescription; + final Addr address; + @Nullable + private final HealthCheckConfig healthCheckConfig; + private final ListenableAsyncCloseable closeable; + private volatile ConnState connState = ACTIVE_EMPTY_CONN_STATE; + + Host(String lbDescription, Addr address, @Nullable HealthCheckConfig healthCheckConfig) { + this.lbDescription = lbDescription; + this.address = address; + this.healthCheckConfig = healthCheckConfig; + this.closeable = toAsyncCloseable(graceful -> + graceful ? doClose(AsyncCloseable::closeAsyncGracefully) : doClose(AsyncCloseable::closeAsync)); + } + + boolean markActiveIfNotClosed() { + final Object oldState = connStateUpdater.getAndUpdate(this, oldConnState -> { + if (oldConnState.state == State.EXPIRED) { + return new ConnState(oldConnState.connections, STATE_ACTIVE_NO_FAILURES); + } + // If oldConnState.state == State.ACTIVE this could mean either a duplicate event, + // or a repeated CAS operation. We could issue a warning, but as we don't know, we don't log anything. + // UNHEALTHY state cannot transition to ACTIVE without passing the health check. + return oldConnState; + }).state; + return oldState != State.CLOSED; + } + + void markClosed() { + final ConnState oldState = closeConnState(); + final Object[] toRemove = oldState.connections; + cancelIfHealthCheck(oldState); + LOGGER.debug("{}: closing {} connection(s) gracefully to the closed address: {}.", + lbDescription, toRemove.length, address); + for (Object conn : toRemove) { + @SuppressWarnings("unchecked") + final C cConn = (C) conn; + cConn.closeAsyncGracefully().subscribe(); + } + } + + private ConnState closeConnState() { + for (;;) { + // We need to keep the oldState.connections around even if we are closed because the user may do + // closeGracefully with a timeout, which fails, and then force close. If we discard connections when + // closeGracefully is started we may leak connections. + final ConnState oldState = connState; + if (oldState.state == State.CLOSED || connStateUpdater.compareAndSet(this, oldState, + new ConnState(oldState.connections, State.CLOSED))) { + return oldState; + } + } + } + + void markExpired() { + for (;;) { + ConnState oldState = connStateUpdater.get(this); + if (oldState.state == State.EXPIRED || oldState.state == State.CLOSED) { + break; + } + Object nextState = oldState.connections.length == 0 ? State.CLOSED : State.EXPIRED; + + if (connStateUpdater.compareAndSet(this, oldState, + new ConnState(oldState.connections, nextState))) { + cancelIfHealthCheck(oldState); + if (nextState == State.CLOSED) { + // Trigger the callback to remove the host from usedHosts array. + this.closeAsync().subscribe(); + } + break; + } + } + } + + void markHealthy(final HealthCheck originalHealthCheckState) { + // Marking healthy is called when we need to recover from an unexpected error. + // However, it is possible that in the meantime, the host entered an EXPIRED state, then ACTIVE, then failed + // to open connections and entered the UNHEALTHY state before the original thread continues execution here. + // In such case, the flipped state is not the same as the one that just succeeded to open a connection. + // In an unlikely scenario that the following connection attempts fail indefinitely, a health check task + // would leak and would not be cancelled. Therefore, we cancel it here and allow failures to trigger a new + // health check. + ConnState oldState = connStateUpdater.getAndUpdate(this, previous -> { + if (Host.isUnhealthy(previous)) { + return new ConnState(previous.connections, STATE_ACTIVE_NO_FAILURES); + } + return previous; + }); + if (oldState.state != originalHealthCheckState) { + cancelIfHealthCheck(oldState); + } + } + + void markUnhealthy(final Throwable cause, final ConnectionFactory connectionFactory) { + assert healthCheckConfig != null; + for (;;) { + ConnState previous = connStateUpdater.get(this); + + if (!Host.isActive(previous) || previous.connections.length > 0 + || cause instanceof ConnectionLimitReachedException) { + LOGGER.debug("{}: failed to open a new connection to the host on address {}. {}.", + lbDescription, address, previous, cause); + break; + } + + ActiveState previousState = (ActiveState) previous.state; + if (previousState.failedConnections + 1 < healthCheckConfig.failedThreshold) { + final ActiveState nextState = previousState.forNextFailedConnection(); + if (connStateUpdater.compareAndSet(this, previous, + new ConnState(previous.connections, nextState))) { + LOGGER.debug("{}: failed to open a new connection to the host on address {}" + + " {} time(s) ({} consecutive failures will trigger health-checking).", + lbDescription, address, nextState.failedConnections, + healthCheckConfig.failedThreshold, cause); + break; + } + // another thread won the race, try again + continue; + } + + final HealthCheck healthCheck = new HealthCheck<>(connectionFactory, this, cause); + final ConnState nextState = new ConnState(previous.connections, healthCheck); + if (connStateUpdater.compareAndSet(this, previous, nextState)) { + LOGGER.info("{}: failed to open a new connection to the host on address {} " + + "{} time(s) in a row. Error counting threshold reached, marking this host as " + + "UNHEALTHY for the selection algorithm and triggering background health-checking.", + lbDescription, address, healthCheckConfig.failedThreshold, cause); + healthCheck.schedule(cause); + break; + } + } + } + + boolean isActiveAndHealthy() { + return isActive(connState); + } + + static boolean isActive(final ConnState connState) { + return ActiveState.class.equals(connState.state.getClass()); + } + + static boolean isUnhealthy(final ConnState connState) { + return HealthCheck.class.equals(connState.state.getClass()); + } + + boolean addConnection(final C connection, final @Nullable HealthCheck currentHealthCheck) { + int addAttempt = 0; + for (;;) { + final ConnState previous = connStateUpdater.get(this); + if (previous.state == State.CLOSED) { + return false; + } + ++addAttempt; + + final Object[] existing = previous.connections; + // Brute force iteration to avoid duplicates. If connections grow larger and faster lookup is required + // we can keep a Set for faster lookups (at the cost of more memory) as well as array. + for (final Object o : existing) { + if (o.equals(connection)) { + return true; + } + } + Object[] newList = Arrays.copyOf(existing, existing.length + 1); + newList[existing.length] = connection; + + // If we were able to add a new connection to the list, we should mark the host as ACTIVE again and + // reset its failures counter. + final Object newState = Host.isActive(previous) || Host.isUnhealthy(previous) ? + STATE_ACTIVE_NO_FAILURES : previous.state; + + if (connStateUpdater.compareAndSet(this, + previous, new ConnState(newList, newState))) { + // It could happen that the Host turned into UNHEALTHY state either concurrently with adding a new + // connection or with passing a previous health-check (if SD turned it into ACTIVE state). In both + // cases we have to cancel the "previous" ongoing health check. See "markHealthy" for more context. + if (Host.isUnhealthy(previous) && + (currentHealthCheck == null || previous.state != currentHealthCheck)) { + assert newState == STATE_ACTIVE_NO_FAILURES; + cancelIfHealthCheck(previous); + } + break; + } + } + + LOGGER.trace("{}: added a new connection {} to {} after {} attempt(s).", + lbDescription, connection, this, addAttempt); + // Instrument the new connection so we prune it on close + connection.onClose().beforeFinally(() -> { + int removeAttempt = 0; + for (;;) { + final ConnState currentConnState = this.connState; + if (currentConnState.state == State.CLOSED) { + break; + } + assert currentConnState.connections.length > 0; + ++removeAttempt; + int i = 0; + final Object[] connections = currentConnState.connections; + // Search for the connection in the list. + for (; i < connections.length; ++i) { + if (connections[i].equals(connection)) { + break; + } + } + if (i == connections.length) { + // Connection was already removed, nothing to do. + break; + } else if (connections.length == 1) { + assert !Host.isUnhealthy(currentConnState) : "Cannot be UNHEALTHY with #connections > 0"; + if (Host.isActive(currentConnState)) { + if (connStateUpdater.compareAndSet(this, currentConnState, + new ConnState(EMPTY_ARRAY, currentConnState.state))) { + break; + } + } else if (currentConnState.state == State.EXPIRED + // We're closing the last connection, close the Host. + // Closing the host will trigger the Host's onClose method, which will remove the host + // from used hosts list. If a race condition appears and a new connection was added + // in the meantime, that would mean the host is available again and the CAS operation + // will allow for determining that. It will prevent closing the Host and will only + // remove the connection (previously considered as the last one) from the array + // in the next iteration. + && connStateUpdater.compareAndSet(this, currentConnState, CLOSED_CONN_STATE)) { + this.closeAsync().subscribe(); + break; + } + } else { + Object[] newList = new Object[connections.length - 1]; + System.arraycopy(connections, 0, newList, 0, i); + System.arraycopy(connections, i + 1, newList, i, newList.length - i); + if (connStateUpdater.compareAndSet(this, + currentConnState, new ConnState(newList, currentConnState.state))) { + break; + } + } + } + LOGGER.trace("{}: removed connection {} from {} after {} attempt(s).", + lbDescription, connection, this, removeAttempt); + }).onErrorComplete(t -> { + // Use onErrorComplete instead of whenOnError to avoid double logging of an error inside subscribe(): + // SimpleCompletableSubscriber. + LOGGER.error("{}: unexpected error while processing connection.onClose() for {}.", + lbDescription, connection, t); + return true; + }).subscribe(); + return true; + } + + // Used for testing only + @SuppressWarnings("unchecked") + Entry> asEntry() { + return new SimpleImmutableEntry<>(address, + Stream.of(connState.connections).map(conn -> (C) conn).collect(toList())); + } + + @Override + public Completable closeAsync() { + return closeable.closeAsync(); + } + + @Override + public Completable closeAsyncGracefully() { + return closeable.closeAsyncGracefully(); + } + + @Override + public Completable onClose() { + return closeable.onClose(); + } + + @Override + public Completable onClosing() { + return closeable.onClosing(); + } + + @SuppressWarnings("unchecked") + private Completable doClose(final Function closeFunction) { + return Completable.defer(() -> { + final ConnState oldState = closeConnState(); + cancelIfHealthCheck(oldState); + final Object[] connections = oldState.connections; + return (connections.length == 0 ? completed() : + from(connections).flatMapCompletableDelayError(conn -> closeFunction.apply((C) conn))) + .shareContextOnSubscribe(); + }); + } + + private void cancelIfHealthCheck(ConnState connState) { + if (Host.isUnhealthy(connState)) { + @SuppressWarnings("unchecked") + HealthCheck healthCheck = (HealthCheck) connState.state; + LOGGER.debug("{}: health check cancelled for {}.", lbDescription, healthCheck.host); + healthCheck.cancel(); + } + } + + @Override + public String toString() { + final ConnState connState = this.connState; + return "Host{" + + "lbDescription=" + lbDescription + + ", address=" + address + + ", state=" + connState.state + + ", #connections=" + connState.connections.length + + '}'; + } + + private static final class ActiveState { + private final int failedConnections; + + ActiveState() { + this(0); + } + + private ActiveState(int failedConnections) { + this.failedConnections = failedConnections; + } + + ActiveState forNextFailedConnection() { + return new ActiveState(addWithOverflowProtection(this.failedConnections, 1)); + } + + @Override + public String toString() { + return "ACTIVE(failedConnections=" + failedConnections + ')'; + } + } + + private static final class HealthCheck + extends DelayedCancellable { + private final ConnectionFactory connectionFactory; + private final Host host; + private final Throwable lastError; + + private HealthCheck(final ConnectionFactory connectionFactory, + final Host host, final Throwable lastError) { + this.connectionFactory = connectionFactory; + this.host = host; + this.lastError = lastError; + } + + public void schedule(final Throwable originalCause) { + assert host.healthCheckConfig != null; + delayedCancellable( + // Use retry strategy to utilize jitter. + retryWithConstantBackoffDeltaJitter(cause -> true, + host.healthCheckConfig.healthCheckInterval, + host.healthCheckConfig.jitter, + host.healthCheckConfig.executor) + .apply(0, originalCause) + // Remove any state from async context + .beforeOnSubscribe(__ -> AsyncContext.clear()) + .concat(connectionFactory.newConnection(host.address, null, null) + // There is no risk for StackOverflowError because result of each connection + // attempt will be invoked on IoExecutor as a new task. + .retryWhen(retryWithConstantBackoffDeltaJitter( + cause -> { + LOGGER.debug("{}: health check failed for {}.", + host.lbDescription, host, cause); + return true; + }, + host.healthCheckConfig.healthCheckInterval, + host.healthCheckConfig.jitter, + host.healthCheckConfig.executor))) + .flatMapCompletable(newCnx -> { + if (host.addConnection(newCnx, this)) { + LOGGER.info("{}: health check passed for {}, marked this " + + "host as ACTIVE for the selection algorithm.", + host.lbDescription, host); + return completed(); + } else { + // This happens only if the host is closed, no need to mark as healthy. + assert host.connState.state == State.CLOSED; + LOGGER.debug("{}: health check passed for {}, but the " + + "host rejected a new connection {}. Closing it now.", + host.lbDescription, host, newCnx); + return newCnx.closeAsync(); + } + }) + // Use onErrorComplete instead of whenOnError to avoid double logging of an error inside + // subscribe(): SimpleCompletableSubscriber. + .onErrorComplete(t -> { + LOGGER.error("{}: health check terminated with " + + "an unexpected error for {}. Marking this host as ACTIVE as a fallback " + + "to allow connection attempts.", host.lbDescription, host, t); + host.markHealthy(this); + return true; + }) + .subscribe()); + } + + @Override + public String toString() { + return "UNHEALTHY(" + lastError + ')'; + } + } + + private static final class ConnState { + final Object[] connections; + final Object state; + + ConnState(final Object[] connections, final Object state) { + this.connections = connections; + this.state = state; + } + + @Override + public String toString() { + return "ConnState{" + + "state=" + state + + ", #connections=" + connections.length + + '}'; + } + } + } + private static final class ClosedList implements List { private final List delegate;