diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/BaseHostSelector.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/BaseHostSelector.java index ff41e3fd8b..18e9927294 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/BaseHostSelector.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/BaseHostSelector.java @@ -31,9 +31,9 @@ abstract class BaseHostSelector { private final String targetResource; - private final boolean isEmpty; - BaseHostSelector(final boolean isEmpty, final String targetResource) { - this.isEmpty = isEmpty; + private final List> hosts; + BaseHostSelector(final List> hosts, final String targetResource) { + this.hosts = hosts; this.targetResource = requireNonNull(targetResource, "targetResource"); } @@ -43,7 +43,12 @@ protected abstract Single selectConnection0(@Nonnull Predicate selector, @ @Override public final Single selectConnection(@Nonnull Predicate selector, @Nullable ContextMap context, boolean forceNewConnectionAndReserve) { - return isEmpty ? noHostsFailure() : selectConnection0(selector, context, forceNewConnectionAndReserve); + return hosts.isEmpty() ? noHostsFailure() : selectConnection0(selector, context, forceNewConnectionAndReserve); + } + + @Override + public boolean isHealthy() { + return !allUnhealthy(hosts); } protected final String getTargetResource() { @@ -59,6 +64,19 @@ protected final Single noActiveHostsFailure(List> us private Single noHostsFailure() { return failed(Exceptions.StacklessNoAvailableHostException.newInstance( "No hosts are available to connect for " + targetResource + ".", - this.getClass(), "selectConnection0(...)")); + this.getClass(), "selectConnection(...)")); + } + + // This will be faster than `allHealthy` in the typical case since we expect hosts to be healthy most of the time. + private static boolean allUnhealthy( + final List> usedHosts) { + boolean allUnhealthy = !usedHosts.isEmpty(); + for (Host host : usedHosts) { + if (!host.isUnhealthy()) { + allUnhealthy = false; + break; + } + } + return allUnhealthy; } } diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java index ec0deaeed4..6e5562e17d 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java @@ -17,6 +17,7 @@ import io.servicetalk.client.api.ConnectionFactory; import io.servicetalk.client.api.LoadBalancedConnection; +import io.servicetalk.client.api.NoActiveHostException; import io.servicetalk.client.api.ServiceDiscovererEvent; import io.servicetalk.concurrent.CompletableSource; import io.servicetalk.concurrent.PublisherSource.Processor; @@ -44,6 +45,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.function.Predicate; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import static io.servicetalk.client.api.LoadBalancerReadyEvent.LOAD_BALANCER_NOT_READY_EVENT; @@ -87,7 +89,7 @@ final class DefaultLoadBalancer> usedHosts = emptyList(); private volatile HostSelector hostSelector; - private volatile boolean isClosed; + private boolean isClosed; private final String targetResource; private final SequentialExecutor sequentialExecutor; @@ -177,10 +179,10 @@ private Completable doClose(final boolean graceful) { SourceAdapters.toSource((graceful ? compositeCloseable.closeAsyncGracefully() : // We only want to empty the host list on error if we're closing non-gracefully. compositeCloseable.closeAsync().beforeOnError(t -> - sequentialExecutor.execute(() -> updateUsedHosts(emptyList()))) + sequentialExecutor.execute(this::finishClosed) ) // we want to always empty out the host list if we complete successfully - .beforeOnComplete(() -> sequentialExecutor.execute(() -> updateUsedHosts(emptyList())))) + .beforeOnComplete(() -> sequentialExecutor.execute(this::finishClosed)))) .subscribe(processor); } catch (Throwable ex) { processor.onError(ex); @@ -189,6 +191,11 @@ private Completable doClose(final boolean graceful) { return SourceAdapters.fromSource(processor); } + private void finishClosed() { + hostSelector = new ClosedHostSelector(); + usedHosts = emptyList(); + } + private static long nextResubscribeTime( final HealthCheckConfig config, final DefaultLoadBalancer lb) { final long lowerNanos = config.healthCheckResubscribeLowerBound; @@ -201,18 +208,6 @@ private static long nextResubscribeTime( return result; } - private static boolean allUnhealthy( - final List> usedHosts) { - boolean allUnhealthy = !usedHosts.isEmpty(); - for (Host host : usedHosts) { - if (!host.isUnhealthy()) { - allUnhealthy = false; - break; - } - } - return allUnhealthy; - } - private static boolean onlyAvailable( final Collection> events) { boolean onlyAvailable = !events.isEmpty(); @@ -438,10 +433,6 @@ private void updateUsedHosts(List> nextHosts) { this.hostSelector = hostSelector.rebuildWithHosts(usedHosts); } - private static Single failedLBClosed(String targetResource) { - return failed(new IllegalStateException("LoadBalancer for " + targetResource + " has closed")); - } - @Override public Single selectConnection(final Predicate selector, @Nullable final ContextMap context) { return defer(() -> selectConnection0(selector, context, false).shareContextOnSubscribe()); @@ -454,21 +445,11 @@ public Single newConnection(@Nullable final ContextMap context) { private Single selectConnection0(final Predicate selector, @Nullable final ContextMap context, final boolean forceNewConnectionAndReserve) { - final List> currentHosts = this.usedHosts; - // It's possible that we're racing with updates from the `onNext` method but since it's intrinsically - // racy it's fine to do these 'are there any hosts at all' checks here using the total host set. - if (currentHosts.isEmpty()) { - return isClosed ? failedLBClosed(targetResource) : - // This is the case when SD has emitted some items but none of the hosts are available. - failed(Exceptions.StacklessNoAvailableHostException.newInstance( - "No hosts are available to connect for " + targetResource + ".", - this.getClass(), "selectConnection0(...)")); - } - - Single result = hostSelector.selectConnection(selector, context, forceNewConnectionAndReserve); + final HostSelector currentHostSelector = hostSelector; + Single result = currentHostSelector.selectConnection(selector, context, forceNewConnectionAndReserve); if (healthCheckConfig != null) { result = result.beforeOnError(exn -> { - if (exn instanceof Exceptions.StacklessNoActiveHostException && allUnhealthy(currentHosts)) { + if (exn instanceof NoActiveHostException && !currentHostSelector.isHealthy()) { final long currNextResubscribeTime = nextResubscribeTime; if (currNextResubscribeTime >= 0 && healthCheckConfig.executor.currentTime(NANOSECONDS) >= currNextResubscribeTime && @@ -522,4 +503,22 @@ private String makeDescription(String id, String targetResource) { ", targetResource=" + targetResource + '}'; } + + private class ClosedHostSelector implements HostSelector { + @Override + public Single selectConnection(@Nonnull Predicate selector, @Nullable ContextMap context, + boolean forceNewConnectionAndReserve) { + return failed(new IllegalStateException("LoadBalancer for " + targetResource + " has closed")); + } + + @Override + public HostSelector rebuildWithHosts(@Nonnull List> hosts) { + return this; + } + + @Override + public boolean isHealthy() { + return false; + } + } } diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/HostSelector.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/HostSelector.java index 82b2c64164..f9d8ef8b9f 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/HostSelector.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/HostSelector.java @@ -60,4 +60,14 @@ Single selectConnection(@Nonnull Predicate selector, @Nullable ContextMap * @return the next selector that should be used for host selection. */ HostSelector rebuildWithHosts(@Nonnull List> hosts); + + /** + * Whether the load balancer believes itself healthy enough to serve traffic. + *

+ * Note that this is both racy and best effort: just because it is healthy doesn't guarantee that + * this selector will be able to successfully serve a request or that if unhealthy a request is + * guaranteed to fail. + * @return whether the load balancer believes itself healthy enough to serve traffic. + */ + boolean isHealthy(); } diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/P2CSelector.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/P2CSelector.java index 296949a651..88ec2f0508 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/P2CSelector.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/P2CSelector.java @@ -46,7 +46,7 @@ final class P2CSelector P2CSelector(@Nonnull List> hosts, final String targetResource, final int maxEffort, @Nullable final Random random) { - super(hosts.isEmpty(), targetResource); + super(hosts, targetResource); this.hosts = hosts; this.maxEffort = maxEffort; this.random = random; diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinSelector.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinSelector.java index 8c0151cd63..c59e7157f8 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinSelector.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinSelector.java @@ -39,7 +39,7 @@ final class RoundRobinSelector> usedHosts, final String targetResource) { - super(usedHosts.isEmpty(), targetResource); + super(usedHosts, targetResource); this.index = index; this.usedHosts = usedHosts; }