Skip to content

Commit

Permalink
Move much of the RR connection selection process to Host
Browse files Browse the repository at this point in the history
Motivation:

RoundRobinLoadBalancer contains the logic that dictates how an
individual connection is selected from a Host. This means
the RR lb is not just a load balancer but also the connection
pool.

Modifications:

- Extract the connection selection out of the RR balancer
  and put it in Host.

Result:

We're continuing to separate concerns into their own
abstractions. The current process of connection selection
by host can now be more easily shared and refined in the future.
  • Loading branch information
bryce-anderson committed Oct 5, 2023
1 parent c667bb5 commit 549b94e
Show file tree
Hide file tree
Showing 3 changed files with 214 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import io.servicetalk.concurrent.api.AsyncContext;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.internal.DelayedCancellable;
import io.servicetalk.context.api.ContextMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -31,21 +33,43 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.AsyncCloseables.toAsyncCloseable;
import static io.servicetalk.concurrent.api.Completable.completed;
import static io.servicetalk.concurrent.api.Publisher.from;
import static io.servicetalk.concurrent.api.RetryStrategies.retryWithConstantBackoffDeltaJitter;
import static io.servicetalk.concurrent.api.Single.failed;
import static io.servicetalk.concurrent.api.Single.succeeded;
import static io.servicetalk.concurrent.internal.FlowControlUtils.addWithOverflowProtection;
import static java.lang.Math.min;
import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater;
import static java.util.stream.Collectors.toList;

final class Host<Addr, C extends LoadBalancedConnection> implements ListenableAsyncCloseable {

/**
* With a relatively small number of connections we can minimize connection creation under moderate concurrency by
* exhausting the full search space without sacrificing too much latency caused by the cost of a CAS operation per
* selection attempt.
*/
private static final int MIN_RANDOM_SEARCH_SPACE = 64;

/**
* For larger search spaces, due to the cost of a CAS operation per selection attempt we see diminishing returns for
* trying to locate an available connection when most connections are in use. This increases tail latencies, thus
* after some number of failed attempts it appears to be more beneficial to open a new connection instead.
* <p>
* The current heuristics were chosen based on a set of benchmarks under various circumstances, low connection
* counts, larger connection counts, low connection churn, high connection churn.
*/
private static final float RANDOM_SEARCH_FACTOR = 0.75f;

private static final Object[] EMPTY_ARRAY = new Object[0];
private static final Logger LOGGER = LoggerFactory.getLogger(Host.class);

Expand All @@ -70,13 +94,18 @@ private enum State {
final Addr address;
@Nullable
private final HealthCheckConfig healthCheckConfig;
private final ConnectionFactory<Addr, ? extends C> connectionFactory;
private final int linearSearchSpace;
private final ListenableAsyncCloseable closeable;
private volatile ConnState connState = ACTIVE_EMPTY_CONN_STATE;

Host(String lbDescription, Addr address, @Nullable HealthCheckConfig healthCheckConfig) {
/**
 * Creates a host that owns its own connection creation and selection.
 *
 * @param lbDescription description of the owning load balancer, used in exception messages.
 * @param address the address this host represents; passed to the connection factory.
 * @param connectionFactory factory used by {@code newConnection(...)} to establish connections to {@code address}.
 * @param linearSearchSpace upper bound on how many leading connections {@code pickConnection(...)} tests in order
 * before it falls back to random probing.
 * @param healthCheckConfig health check configuration, or {@code null}; when non-{@code null} a failed connection
 * attempt in {@code newConnection(...)} invokes {@code markUnhealthy(...)}.
 */
Host(String lbDescription, Addr address, ConnectionFactory<Addr, ? extends C> connectionFactory,
int linearSearchSpace, @Nullable HealthCheckConfig healthCheckConfig) {
this.lbDescription = lbDescription;
this.address = address;
this.healthCheckConfig = healthCheckConfig;
this.connectionFactory = connectionFactory;
this.linearSearchSpace = linearSearchSpace;
// Both graceful and immediate close funnel through doClose; only the per-connection close function differs.
this.closeable = toAsyncCloseable(graceful ->
graceful ? doClose(AsyncCloseable::closeAsyncGracefully) : doClose(AsyncCloseable::closeAsync));
}
Expand Down Expand Up @@ -140,7 +169,89 @@ void markExpired() {
}
}

void markHealthy(final HealthCheck<Addr, C> originalHealthCheckState) {
@Nullable
C pickConnection(Predicate<C> selector, @Nullable final ContextMap context) {
    // TODO: `context` is not consulted during selection; clarify what the connection factory uses it for.
    // Read the connection array exactly once; the whole search operates on this snapshot.
    final Object[] snapshot = connState.connections;
    final int total = snapshot.length;

    // Phase 1: walk the leading `linearSearchSpace` connections in order.
    final int linearLimit = min(total, linearSearchSpace);
    for (int index = 0; index < linearLimit; index++) {
        @SuppressWarnings("unchecked")
        final C candidate = (C) snapshot[index];
        if (selector.test(candidate)) {
            return candidate;
        }
    }

    // Nothing beyond the linear region: the search space is exhausted.
    final int remaining = total - linearLimit;
    if (remaining <= 0) {
        return null;
    }

    // Phase 2: probe the rest at random. A small remainder gets one attempt per remaining connection;
    // a larger one is capped at a fraction of its size because further attempts give diminishing returns.
    final int randomBudget;
    if (remaining < MIN_RANDOM_SEARCH_SPACE) {
        randomBudget = remaining;
    } else {
        randomBudget = (int) (remaining * RANDOM_SEARCH_FACTOR);
    }
    final ThreadLocalRandom random = ThreadLocalRandom.current();
    for (int attemptsLeft = randomBudget; attemptsLeft > 0; attemptsLeft--) {
        @SuppressWarnings("unchecked")
        final C candidate = (C) snapshot[random.nextInt(linearLimit, total)];
        if (selector.test(candidate)) {
            return candidate;
        }
    }

    // No connection was accepted by the selector.
    return null;
}

/**
 * Asks the connection factory for a new connection to this host's address and, if it is accepted, adds it to
 * this host's connections.
 * <p>
 * The returned {@link Single} fails with a {@code StacklessConnectionRejectedException} when the connection
 * could not be reserved (only checked if {@code forceNewConnectionAndReserve} is set), when {@code selector}
 * rejects it, or when {@code addConnection} refuses it.
 *
 * @param selector predicate that must accept the new connection before it is handed to the caller.
 * @param forceNewConnectionAndReserve if {@code true}, the new connection must be successfully reserved via
 * {@code tryReserve()} or it is closed and the result fails.
 * @param context optional context forwarded to the connection factory.
 * @return a {@link Single} that emits the new connection or fails as described above.
 */
Single<C> newConnection(
Predicate<C> selector, final boolean forceNewConnectionAndReserve, @Nullable final ContextMap context) {
// This LB implementation does not automatically provide TransportObserver. Therefore, we pass "null" here.
// Users can apply a ConnectionFactoryFilter if they need to override this "null" value with TransportObserver.
Single<? extends C> establishConnection = connectionFactory.newConnection(address, context, null);
if (healthCheckConfig != null) {
// Health checking is configured: report a failed connection attempt via markUnhealthy before the error
// reaches the subscriber.
establishConnection = establishConnection.beforeOnError(t -> markUnhealthy(t));
}
return establishConnection
.flatMap(newCnx -> {
// A caller that demands a reserved connection gets a failure (and the connection is closed) if the
// reservation cannot be obtained.
if (forceNewConnectionAndReserve && !newCnx.tryReserve()) {
return newCnx.closeAsync().<C>concat(failed(
LBExceptions.StacklessConnectionRejectedException.newInstance(
"Newly created connection " + newCnx + " for " + lbDescription
+ " could not be reserved.",
RoundRobinLoadBalancer.class, "selectConnection0(...)")))
.shareContextOnSubscribe();
}

// Invoke the selector before adding the connection to the pool, otherwise, connection can be
// used concurrently and hence a new connection can be rejected by the selector.
if (!selector.test(newCnx)) {
// Failure in selection could be the result of connection factory returning cached connection,
// and not having visibility into max-concurrent-requests, or other threads already selected the
// connection which uses all the max concurrent request count.

// In either case, propagate the exception and rely upon the retry strategy.
Single<C> failedSingle = failed(LBExceptions.StacklessConnectionRejectedException.newInstance(
"Newly created connection " + newCnx + " for " + lbDescription
+ " was rejected by the selection filter.",
RoundRobinLoadBalancer.class, "selectConnection0(...)"));

// Just in case the connection is not closed add it to the host so we don't lose track,
// duplicates will be filtered out.
// If the host does not take the connection, close it before failing.
return (addConnection(newCnx, null) ?
failedSingle : newCnx.closeAsync().concat(failedSingle)).shareContextOnSubscribe();
}
// Selector accepted the connection: hand it out only if the host also accepts it.
if (addConnection(newCnx, null)) {
return succeeded(newCnx).shareContextOnSubscribe();
}
// The host refused the connection: close it and fail.
return newCnx.closeAsync().<C>concat(
failed(LBExceptions.StacklessConnectionRejectedException.newInstance(
"Failed to add newly created connection " + newCnx + " for " + toString(),
RoundRobinLoadBalancer.class, "selectConnection0(...)")))
.shareContextOnSubscribe();
});
}

private void markHealthy(final HealthCheck<Addr, C> originalHealthCheckState) {
// Marking healthy is called when we need to recover from an unexpected error.
// However, it is possible that in the meantime, the host entered an EXPIRED state, then ACTIVE, then failed
// to open connections and entered the UNHEALTHY state before the original thread continues execution here.
Expand All @@ -159,7 +270,7 @@ void markHealthy(final HealthCheck<Addr, C> originalHealthCheckState) {
}
}

void markUnhealthy(final Throwable cause, final ConnectionFactory<Addr, ? extends C> connectionFactory) {
private void markUnhealthy(final Throwable cause) {
assert healthCheckConfig != null;
for (;;) {
ConnState previous = connStateUpdater.get(this);
Expand Down Expand Up @@ -215,7 +326,7 @@ private static boolean isUnhealthy(ConnState connState) {
return HealthCheck.class.equals(connState.state.getClass());
}

boolean addConnection(final C connection, final @Nullable HealthCheck<Addr, C> currentHealthCheck) {
private boolean addConnection(final C connection, final @Nullable HealthCheck<Addr, C> currentHealthCheck) {
int addAttempt = 0;
for (;;) {
final ConnState previous = connStateUpdater.get(this);
Expand Down Expand Up @@ -325,10 +436,6 @@ Map.Entry<Addr, List<C>> asEntry() {
Stream.of(connState.connections).map(conn -> (C) conn).collect(toList()));
}

public Object[] connections() {
return connState.connections;
}

@Override
public Completable closeAsync() {
return closeable.closeAsync();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright © 2021-2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;

import io.servicetalk.client.api.ConnectionRejectedException;
import io.servicetalk.client.api.NoActiveHostException;
import io.servicetalk.client.api.NoAvailableHostException;
import io.servicetalk.concurrent.internal.ThrowableUtils;

/**
 * Namespace for the exception types raised by the load balancer. Each type overrides
 * {@link Throwable#fillInStackTrace()} to return itself without capturing a stack trace, and is created
 * through a {@code newInstance} factory that passes the new exception through
 * {@link ThrowableUtils#unknownStackTrace(Throwable, Class, String)} together with the supplied class and
 * method name.
 */
final class LBExceptions {

    private LBExceptions() {
        // no instances
    }

    /**
     * {@link NoAvailableHostException} variant that does not capture a stack trace.
     */
    static final class StacklessNoAvailableHostException extends NoAvailableHostException {
        private static final long serialVersionUID = 5942960040738091793L;

        private StacklessNoAvailableHostException(final String message) {
            super(message);
        }

        /**
         * Creates a new instance.
         *
         * @param message the exception message.
         * @param clazz the class forwarded to {@code ThrowableUtils.unknownStackTrace}.
         * @param method the method name forwarded to {@code ThrowableUtils.unknownStackTrace}.
         * @return the new exception.
         */
        public static StacklessNoAvailableHostException newInstance(
                String message, Class<?> clazz, String method) {
            final StacklessNoAvailableHostException exception = new StacklessNoAvailableHostException(message);
            return ThrowableUtils.unknownStackTrace(exception, clazz, method);
        }

        @Override
        public Throwable fillInStackTrace() {
            // Deliberately does not delegate to super: no stack trace is captured.
            return this;
        }
    }

    /**
     * {@link NoActiveHostException} variant that does not capture a stack trace.
     */
    static final class StacklessNoActiveHostException extends NoActiveHostException {
        private static final long serialVersionUID = 7500474499335155869L;

        private StacklessNoActiveHostException(final String message) {
            super(message);
        }

        /**
         * Creates a new instance.
         *
         * @param message the exception message.
         * @param clazz the class forwarded to {@code ThrowableUtils.unknownStackTrace}.
         * @param method the method name forwarded to {@code ThrowableUtils.unknownStackTrace}.
         * @return the new exception.
         */
        public static StacklessNoActiveHostException newInstance(
                String message, Class<?> clazz, String method) {
            final StacklessNoActiveHostException exception = new StacklessNoActiveHostException(message);
            return ThrowableUtils.unknownStackTrace(exception, clazz, method);
        }

        @Override
        public Throwable fillInStackTrace() {
            // Deliberately does not delegate to super: no stack trace is captured.
            return this;
        }
    }

    /**
     * {@link ConnectionRejectedException} variant that does not capture a stack trace.
     */
    static final class StacklessConnectionRejectedException extends ConnectionRejectedException {
        private static final long serialVersionUID = -4940708893680455819L;

        private StacklessConnectionRejectedException(final String message) {
            super(message);
        }

        /**
         * Creates a new instance.
         *
         * @param message the exception message.
         * @param clazz the class forwarded to {@code ThrowableUtils.unknownStackTrace}.
         * @param method the method name forwarded to {@code ThrowableUtils.unknownStackTrace}.
         * @return the new exception.
         */
        public static StacklessConnectionRejectedException newInstance(
                String message, Class<?> clazz, String method) {
            final StacklessConnectionRejectedException exception =
                    new StacklessConnectionRejectedException(message);
            return ThrowableUtils.unknownStackTrace(exception, clazz, method);
        }

        @Override
        public Throwable fillInStackTrace() {
            // Deliberately does not delegate to super: no stack trace is captured.
            return this;
        }
    }
}
Loading

0 comments on commit 549b94e

Please sign in to comment.