Skip to content

Commit

Permalink
Cleanup the concurrrency model in DefaultLoadBalancer a bit more
Browse files Browse the repository at this point in the history
  • Loading branch information
bryce-anderson committed Dec 7, 2023
1 parent b7365af commit 6ed40e2
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public final Single<C> selectConnection(@Nonnull Predicate<C> selector, @Nullabl
}

@Override
public boolean isHealthy() {
public final boolean isHealthy() {
return !allUnhealthy(hosts);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Predicate;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -86,9 +88,11 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio

private volatile long nextResubscribeTime = RESUBSCRIBING;

// writes to these fields protected by `executeSequentially` but they can be read from any thread.
private volatile List<Host<ResolvedAddress, C>> usedHosts = emptyList();
// writes are protected by `executeSequentially` but the field can be read by any thread.
private volatile HostSelector<ResolvedAddress, C> hostSelector;
// reads and writes are protected by `executeSequentially`.
private List<Host<ResolvedAddress, C>> usedHosts = emptyList();
// reads and writes are protected by `executeSequentially`.
private boolean isClosed;

private final String targetResource;
Expand Down Expand Up @@ -179,10 +183,10 @@ private Completable doClose(final boolean graceful) {
SourceAdapters.toSource((graceful ? compositeCloseable.closeAsyncGracefully() :
// We only want to empty the host list on error if we're closing non-gracefully.
compositeCloseable.closeAsync().beforeOnError(t ->
sequentialExecutor.execute(this::finishClosed)
sequentialExecutor.execute(this::sequentialCompleteClosed)
)
// we want to always empty out the host list if we complete successfully
.beforeOnComplete(() -> sequentialExecutor.execute(this::finishClosed))))
.beforeOnComplete(() -> sequentialExecutor.execute(this::sequentialCompleteClosed))))
.subscribe(processor);
} catch (Throwable ex) {
processor.onError(ex);
Expand All @@ -191,9 +195,10 @@ private Completable doClose(final boolean graceful) {
return SourceAdapters.fromSource(processor);
}

private void finishClosed() {
hostSelector = new ClosedHostSelector();
// must be called from within the sequential executor.
private void sequentialCompleteClosed() {
usedHosts = emptyList();
hostSelector = new ClosedHostSelector();
}

private static <R, C extends LoadBalancedConnection> long nextResubscribeTime(
Expand Down Expand Up @@ -286,6 +291,7 @@ private void sequentialOnNext(Collection<? extends ServiceDiscovererEvent<Resolv
// First thing we do is go through the existing hosts and see if we need to transfer them. These
// will be all existing hosts that either don't have a matching discovery event or are not marked
// as unavailable. If they are marked unavailable, we need to close them.
boolean hostSetChanged = false;
for (Host<ResolvedAddress, C> host : oldUsedHosts) {
ServiceDiscovererEvent<ResolvedAddress> event = eventMap.remove(host.address());
if (event == null) {
Expand All @@ -299,14 +305,20 @@ private void sequentialOnNext(Collection<? extends ServiceDiscovererEvent<Resolv
if (host.markActiveIfNotClosed()) {
nextHosts.add(host);
} else {
// It's a new host, so the set changed.
hostSetChanged = true;
nextHosts.add(createHost(event.address()));
}
} else if (EXPIRED.equals(event.status())) {
if (!host.markExpired()) {
nextHosts.add(host);
} else {
// Marking it expired also resulted in removing it from the set.
hostSetChanged = true;
}
} else if (UNAVAILABLE.equals(event.status())) {
host.markClosed();
hostSetChanged = true;
} else {
LOGGER.warn("{}: Unsupported Status in event:" +
" {} (mapped to {}). Leaving usedHosts unchanged: {}",
Expand All @@ -319,11 +331,14 @@ private void sequentialOnNext(Collection<? extends ServiceDiscovererEvent<Resolv
for (ServiceDiscovererEvent<ResolvedAddress> event : eventMap.values()) {
if (AVAILABLE.equals(event.status())) {
sendReadyEvent = true;
hostSetChanged = true;
nextHosts.add(createHost(event.address()));
}
}
// We've built the new list so now set it for consumption and then send our events.
updateUsedHosts(nextHosts);
// We've built a materially different host set so now set it for consumption and send our events.
if (hostSetChanged) {
sequentialUpdateUsedHosts(nextHosts);
}

LOGGER.debug("{}: now using addresses (size={}): {}.",
DefaultLoadBalancer.this, nextHosts.size(), nextHosts);
Expand Down Expand Up @@ -371,10 +386,13 @@ private Host<ResolvedAddress, C> createHost(ResolvedAddress addr) {
}
final List<Host<ResolvedAddress, C>> nextHosts = listWithHostRemoved(
currentHosts, current -> current == host);
updateUsedHosts(nextHosts);
if (nextHosts.isEmpty()) {
// We transitioned from non-empty to empty. That means we're not ready.
eventStreamProcessor.onNext(LOAD_BALANCER_NOT_READY_EVENT);
// we only need to do anything else if we actually removed the host
if (nextHosts.size() != currentHosts.size()) {
sequentialUpdateUsedHosts(nextHosts);
if (nextHosts.isEmpty()) {
// We transitioned from non-empty to empty. That means we're not ready.
eventStreamProcessor.onNext(LOAD_BALANCER_NOT_READY_EVENT);
}
}
})).subscribe();
return host;
Expand Down Expand Up @@ -406,29 +424,34 @@ private List<Host<ResolvedAddress, C>> listWithHostRemoved(

@Override
public void onError(final Throwable t) {
List<Host<ResolvedAddress, C>> hosts = usedHosts;
if (healthCheckConfig == null) {
// Terminate processor only if we will never re-subscribe
eventStreamProcessor.onError(t);
}
LOGGER.error(
"{}: service discoverer {} emitted an error. Last seen addresses (size={}): {}.",
DefaultLoadBalancer.this, eventPublisher, hosts.size(), hosts, t);
sequentialExecutor.execute(() -> {
if (healthCheckConfig == null) {
// Terminate processor only if we will never re-subscribe
eventStreamProcessor.onError(t);
}
List<Host<ResolvedAddress, C>> hosts = usedHosts;
LOGGER.error(
"{}: service discoverer {} emitted an error. Last seen addresses (size={}): {}.",
DefaultLoadBalancer.this, eventPublisher, hosts.size(), hosts, t);
});
}

@Override
public void onComplete() {
List<Host<ResolvedAddress, C>> hosts = usedHosts;
if (healthCheckConfig == null) {
// Terminate processor only if we will never re-subscribe
eventStreamProcessor.onComplete();
}
LOGGER.error("{}: service discoverer completed. Last seen addresses (size={}): {}.",
DefaultLoadBalancer.this, hosts.size(), hosts);
sequentialExecutor.execute(() -> {
List<Host<ResolvedAddress, C>> hosts = usedHosts;
if (healthCheckConfig == null) {
// Terminate processor only if we will never re-subscribe
eventStreamProcessor.onComplete();
}
LOGGER.error("{}: service discoverer completed. Last seen addresses (size={}): {}.",
DefaultLoadBalancer.this, hosts.size(), hosts);
});
}
}

private void updateUsedHosts(List<Host<ResolvedAddress, C>> nextHosts) {
// must be called from within the SequentialExecutor
private void sequentialUpdateUsedHosts(List<Host<ResolvedAddress, C>> nextHosts) {
this.usedHosts = nextHosts;
this.hostSelector = hostSelector.rebuildWithHosts(usedHosts);
}
Expand Down Expand Up @@ -494,7 +517,15 @@ public Completable closeAsyncGracefully() {

@Override
public List<Entry<ResolvedAddress, List<C>>> usedAddresses() {
return usedHosts.stream().map(host -> ((DefaultHost<ResolvedAddress, C>) host).asEntry()).collect(toList());
// This method is just for testing so we can use some awaiting to get the results in a thread safe way.
CompletableFuture<List<Entry<ResolvedAddress, List<C>>>> future = new CompletableFuture<>();
sequentialExecutor.execute(() -> future.complete(
usedHosts.stream().map(host -> ((DefaultHost<ResolvedAddress, C>) host).asEntry()).collect(toList())));
try {
return future.get(5, TimeUnit.SECONDS);
} catch (Exception ex) {
throw new AssertionError("Failed to get results", ex);
}
}

private String makeDescription(String id, String targetResource) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,20 @@
package io.servicetalk.loadbalancer;

import io.servicetalk.client.api.ServiceDiscovererEvent;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.TestPublisher;
import io.servicetalk.context.api.ContextMap;

import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;

import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.Single.succeeded;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static io.servicetalk.concurrent.api.Single.failed;
import static org.junit.jupiter.api.Assertions.assertEquals;

class DefaultLoadBalancerTest extends LoadBalancerTestScaffold {

Expand All @@ -42,7 +42,7 @@ protected boolean eagerConnectionShutdown() {
}

@Test
void hostUpdatesRebuildsSelector() {
void newHostsRebuildsSelector() throws Exception {
// necessary because we're making a new lb.
serviceDiscoveryPublisher.onComplete();

Expand All @@ -51,8 +51,46 @@ void hostUpdatesRebuildsSelector() {
lb = newTestLoadBalancer();

sendServiceDiscoveryEvents(upEvent("address-1"));
// We should have rebuilt the LB due to a host update.
assertEquals(1, lbPolicy.rebuilds);
}

@Test
void removingHostsRebuildsSelector() throws Exception {
// necessary because we're making a new lb.
serviceDiscoveryPublisher.onComplete();

final TestLoadBalancerPolicy lbPolicy = new TestLoadBalancerPolicy();
loadBalancingPolicy = lbPolicy;
lb = newTestLoadBalancer();

sendServiceDiscoveryEvents(upEvent("address-1"));
// We should have rebuilt the LB due to a host update.
assertEquals(1, lbPolicy.rebuilds);
// take it back down immediate. No connections means we close in the sd event.
sendServiceDiscoveryEvents(downEvent("address-1"));
verify(lbPolicy.hostSelector, times(2)).rebuildWithHosts(ArgumentMatchers.anyList());
assertEquals(2, lbPolicy.rebuilds);
}

@Test
void lazyHostExpirationRebuildsSelector() throws Exception {
// necessary because we're making a new lb.
serviceDiscoveryPublisher.onComplete();

final TestLoadBalancerPolicy lbPolicy = new TestLoadBalancerPolicy();
loadBalancingPolicy = lbPolicy;
lb = newTestLoadBalancer();

sendServiceDiscoveryEvents(upEvent("address-1"));
// We should have rebuilt the LB due to a host update.
assertEquals(1, lbPolicy.rebuilds);
// should be an expired but not gone yet because a connection remains.
TestLoadBalancedConnection cxn = lb.selectConnection(any(), null).toFuture().get();
sendServiceDiscoveryEvents(downEvent("address-1"));
assertEquals(1, lbPolicy.rebuilds);
// Close the connection and we should see a rebuild.
cxn.closeAsync().subscribe();
assertEquals(2, lbPolicy.rebuilds);
}

@Override
Expand All @@ -75,17 +113,7 @@ private LoadBalancerBuilder<String, TestLoadBalancedConnection> baseLoadBalancer

private static class TestLoadBalancerPolicy implements LoadBalancingPolicy<String, TestLoadBalancedConnection> {

HostSelector<String, TestLoadBalancedConnection> hostSelector;

TestLoadBalancerPolicy() {
TestLoadBalancedConnection connection = TestLoadBalancedConnection.mockConnection("address");
hostSelector = mock(HostSelector.class);
when(hostSelector.selectConnection(ArgumentMatchers.any(), ArgumentMatchers.any(),
ArgumentMatchers.anyBoolean()))
.thenReturn(succeeded(connection));
when(hostSelector.rebuildWithHosts(ArgumentMatchers.anyList()))
.thenReturn(hostSelector);
}
int rebuilds;

@Override
public String name() {
Expand All @@ -95,7 +123,36 @@ public String name() {
@Override
public HostSelector<String, TestLoadBalancedConnection> buildSelector(
@Nonnull List<Host<String, TestLoadBalancedConnection>> hosts, String targetResource) {
return hostSelector;
return new TestSelector(hosts);
}

private class TestSelector implements HostSelector<String, TestLoadBalancedConnection> {

private final List<Host<String, TestLoadBalancedConnection>> hosts;

TestSelector(final List<Host<String, TestLoadBalancedConnection>> hosts) {
this.hosts = hosts;
}

@Override
public Single<TestLoadBalancedConnection> selectConnection(
@Nonnull Predicate<TestLoadBalancedConnection> selector, @Nullable ContextMap context,
boolean forceNewConnectionAndReserve) {
return hosts.isEmpty() ? failed(new IllegalStateException("shouldn't be empty"))
: hosts.get(0).newConnection(selector, false, context);
}

@Override
public HostSelector<String, TestLoadBalancedConnection> rebuildWithHosts(
@Nonnull List<Host<String, TestLoadBalancedConnection>> hosts) {
rebuilds++;
return new TestSelector(hosts);
}

@Override
public boolean isHealthy() {
return true;
}
}
}
}

0 comments on commit 6ed40e2

Please sign in to comment.