Skip to content

Commit

Permalink
Some lifecycle management changes
Browse files Browse the repository at this point in the history
  • Loading branch information
bryce-anderson committed Dec 7, 2023
1 parent ebb41e4 commit b7365af
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ abstract class BaseHostSelector<ResolvedAddress, C extends LoadBalancedConnectio
implements HostSelector<ResolvedAddress, C> {

private final String targetResource;
private final boolean isEmpty;
BaseHostSelector(final boolean isEmpty, final String targetResource) {
this.isEmpty = isEmpty;
private final List<Host<ResolvedAddress, C>> hosts;
BaseHostSelector(final List<Host<ResolvedAddress, C>> hosts, final String targetResource) {
this.hosts = hosts;
this.targetResource = requireNonNull(targetResource, "targetResource");
}

Expand All @@ -43,7 +43,12 @@ protected abstract Single<C> selectConnection0(@Nonnull Predicate<C> selector, @
@Override
public final Single<C> selectConnection(@Nonnull Predicate<C> selector, @Nullable ContextMap context,
boolean forceNewConnectionAndReserve) {
return isEmpty ? noHostsFailure() : selectConnection0(selector, context, forceNewConnectionAndReserve);
return hosts.isEmpty() ? noHostsFailure() : selectConnection0(selector, context, forceNewConnectionAndReserve);
}

@Override
public boolean isHealthy() {
return !allUnhealthy(hosts);
}

protected final String getTargetResource() {
Expand All @@ -59,6 +64,19 @@ protected final Single<C> noActiveHostsFailure(List<Host<ResolvedAddress, C>> us
private Single<C> noHostsFailure() {
return failed(Exceptions.StacklessNoAvailableHostException.newInstance(
"No hosts are available to connect for " + targetResource + ".",
this.getClass(), "selectConnection0(...)"));
this.getClass(), "selectConnection(...)"));
}

// This will be faster than `allHealthy` in the typical case since we expect hosts to be healthy most of the time.
private static <ResolvedAddress, C extends LoadBalancedConnection> boolean allUnhealthy(
final List<Host<ResolvedAddress, C>> usedHosts) {
boolean allUnhealthy = !usedHosts.isEmpty();
for (Host<ResolvedAddress, C> host : usedHosts) {
if (!host.isUnhealthy()) {
allUnhealthy = false;
break;
}
}
return allUnhealthy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.servicetalk.client.api.ConnectionFactory;
import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.client.api.NoActiveHostException;
import io.servicetalk.client.api.ServiceDiscovererEvent;
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.PublisherSource.Processor;
Expand Down Expand Up @@ -44,6 +45,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Predicate;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import static io.servicetalk.client.api.LoadBalancerReadyEvent.LOAD_BALANCER_NOT_READY_EVENT;
Expand Down Expand Up @@ -87,7 +89,7 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
// writes to these fields protected by `executeSequentially` but they can be read from any thread.
private volatile List<Host<ResolvedAddress, C>> usedHosts = emptyList();
private volatile HostSelector<ResolvedAddress, C> hostSelector;
private volatile boolean isClosed;
private boolean isClosed;

private final String targetResource;
private final SequentialExecutor sequentialExecutor;
Expand Down Expand Up @@ -177,10 +179,10 @@ private Completable doClose(final boolean graceful) {
SourceAdapters.toSource((graceful ? compositeCloseable.closeAsyncGracefully() :
// We only want to empty the host list on error if we're closing non-gracefully.
compositeCloseable.closeAsync().beforeOnError(t ->
sequentialExecutor.execute(() -> updateUsedHosts(emptyList())))
sequentialExecutor.execute(this::finishClosed)
)
// we want to always empty out the host list if we complete successfully
.beforeOnComplete(() -> sequentialExecutor.execute(() -> updateUsedHosts(emptyList()))))
.beforeOnComplete(() -> sequentialExecutor.execute(this::finishClosed))))
.subscribe(processor);
} catch (Throwable ex) {
processor.onError(ex);
Expand All @@ -189,6 +191,11 @@ private Completable doClose(final boolean graceful) {
return SourceAdapters.fromSource(processor);
}

private void finishClosed() {
hostSelector = new ClosedHostSelector();
usedHosts = emptyList();
}

private static <R, C extends LoadBalancedConnection> long nextResubscribeTime(
final HealthCheckConfig config, final DefaultLoadBalancer<R, C> lb) {
final long lowerNanos = config.healthCheckResubscribeLowerBound;
Expand All @@ -201,18 +208,6 @@ private static <R, C extends LoadBalancedConnection> long nextResubscribeTime(
return result;
}

private static <ResolvedAddress, C extends LoadBalancedConnection> boolean allUnhealthy(
final List<Host<ResolvedAddress, C>> usedHosts) {
boolean allUnhealthy = !usedHosts.isEmpty();
for (Host<ResolvedAddress, C> host : usedHosts) {
if (!host.isUnhealthy()) {
allUnhealthy = false;
break;
}
}
return allUnhealthy;
}

private static <ResolvedAddress> boolean onlyAvailable(
final Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events) {
boolean onlyAvailable = !events.isEmpty();
Expand Down Expand Up @@ -438,10 +433,6 @@ private void updateUsedHosts(List<Host<ResolvedAddress, C>> nextHosts) {
this.hostSelector = hostSelector.rebuildWithHosts(usedHosts);
}

private static <T> Single<T> failedLBClosed(String targetResource) {
return failed(new IllegalStateException("LoadBalancer for " + targetResource + " has closed"));
}

@Override
public Single<C> selectConnection(final Predicate<C> selector, @Nullable final ContextMap context) {
return defer(() -> selectConnection0(selector, context, false).shareContextOnSubscribe());
Expand All @@ -454,21 +445,11 @@ public Single<C> newConnection(@Nullable final ContextMap context) {

private Single<C> selectConnection0(final Predicate<C> selector, @Nullable final ContextMap context,
final boolean forceNewConnectionAndReserve) {
final List<Host<ResolvedAddress, C>> currentHosts = this.usedHosts;
// It's possible that we're racing with updates from the `onNext` method but since it's intrinsically
// racy it's fine to do these 'are there any hosts at all' checks here using the total host set.
if (currentHosts.isEmpty()) {
return isClosed ? failedLBClosed(targetResource) :
// This is the case when SD has emitted some items but none of the hosts are available.
failed(Exceptions.StacklessNoAvailableHostException.newInstance(
"No hosts are available to connect for " + targetResource + ".",
this.getClass(), "selectConnection0(...)"));
}

Single<C> result = hostSelector.selectConnection(selector, context, forceNewConnectionAndReserve);
final HostSelector<ResolvedAddress, C> currentHostSelector = hostSelector;
Single<C> result = currentHostSelector.selectConnection(selector, context, forceNewConnectionAndReserve);
if (healthCheckConfig != null) {
result = result.beforeOnError(exn -> {
if (exn instanceof Exceptions.StacklessNoActiveHostException && allUnhealthy(currentHosts)) {
if (exn instanceof NoActiveHostException && !currentHostSelector.isHealthy()) {
final long currNextResubscribeTime = nextResubscribeTime;
if (currNextResubscribeTime >= 0 &&
healthCheckConfig.executor.currentTime(NANOSECONDS) >= currNextResubscribeTime &&
Expand Down Expand Up @@ -522,4 +503,22 @@ private String makeDescription(String id, String targetResource) {
", targetResource=" + targetResource +
'}';
}

private class ClosedHostSelector implements HostSelector<ResolvedAddress, C> {
@Override
public Single<C> selectConnection(@Nonnull Predicate<C> selector, @Nullable ContextMap context,
boolean forceNewConnectionAndReserve) {
return failed(new IllegalStateException("LoadBalancer for " + targetResource + " has closed"));
}

@Override
public HostSelector<ResolvedAddress, C> rebuildWithHosts(@Nonnull List<Host<ResolvedAddress, C>> hosts) {
return this;
}

@Override
public boolean isHealthy() {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,14 @@ Single<C> selectConnection(@Nonnull Predicate<C> selector, @Nullable ContextMap
* @return the next selector that should be used for host selection.
*/
HostSelector<ResolvedAddress, C> rebuildWithHosts(@Nonnull List<Host<ResolvedAddress, C>> hosts);

/**
* Whether the load balancer believes itself healthy enough to serve traffic.
* <p>
* Note that this is both racy and best effort: just because it is healthy doesn't guarantee that
* this selector will be able to successfully serve a request or that if unhealthy a request is
* guaranteed to fail.
* @return whether the load balancer believes itself healthy enough to serve traffic.
*/
boolean isHealthy();
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ final class P2CSelector<ResolvedAddress, C extends LoadBalancedConnection>

P2CSelector(@Nonnull List<Host<ResolvedAddress, C>> hosts,
final String targetResource, final int maxEffort, @Nullable final Random random) {
super(hosts.isEmpty(), targetResource);
super(hosts, targetResource);
this.hosts = hosts;
this.maxEffort = maxEffort;
this.random = random;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ final class RoundRobinSelector<ResolvedAddress, C extends LoadBalancedConnection

private RoundRobinSelector(final AtomicInteger index, final List<Host<ResolvedAddress, C>> usedHosts,
final String targetResource) {
super(usedHosts.isEmpty(), targetResource);
super(usedHosts, targetResource);
this.index = index;
this.usedHosts = usedHosts;
}
Expand Down

0 comments on commit b7365af

Please sign in to comment.