Skip to content

Commit

Permalink
Use Publisher#replay operator
Browse files Browse the repository at this point in the history
Motivation:
Publisher#replay was recently added, and there are use cases
that can be simplified/improved by using this new operator.
  • Loading branch information
Scottmitch committed Oct 2, 2023
1 parent da1ea3b commit 93531d5
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 220 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ default Single<C> newConnection(@Nullable ContextMap context) {
/**
* A {@link Publisher} of events provided by this {@link LoadBalancer}. This maybe used to broadcast internal state
* of this {@link LoadBalancer} to provide hints/visibility for external usage.
* <p>
* Note the {@link Publisher} maybe subscribed to multiple times. Using operators such as
* {@link Publisher#replay(int)} is recommended.
* @return A {@link Publisher} of events provided by this {@link LoadBalancer}.
*/
Publisher<Object> eventStream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public DefaultHealthService() {
*/
public DefaultHealthService(Predicate<String> watchAllowed) {
this.watchAllowed = requireNonNull(watchAllowed);
serviceToStatusMap.put(OVERALL_SERVICE_NAME, new HealthValue(SERVING));
serviceToStatusMap.put(OVERALL_SERVICE_NAME, HealthValue.newInstance(SERVING));
}

@Override
Expand All @@ -85,7 +85,7 @@ public Single<HealthCheckResponse> check(final GrpcServiceContext ctx, final Hea
return Single.failed(new GrpcStatusException(
new GrpcStatus(NOT_FOUND, "unknown service: " + request.getService())));
}
return Single.succeeded(health.last);
return health.publisher.takeAtMost(1).firstOrError();
}

@Override
Expand All @@ -103,13 +103,13 @@ public Publisher<HealthCheckResponse> watch(final GrpcServiceContext ctx, final
return Publisher.from(newBuilder().setStatus(NOT_SERVING).build());
}
healthValue = serviceToStatusMap.computeIfAbsent(request.getService(),
__ -> new HealthValue(SERVICE_UNKNOWN));
__ -> HealthValue.newInstance(SERVICE_UNKNOWN));
} finally {
lock.unlock();
}
}

return Publisher.from(healthValue.last).concat(healthValue.publisher);
return healthValue.publisher;
}

/**
Expand All @@ -130,7 +130,7 @@ public boolean setStatus(String service, ServingStatus status) {
return false;
}
resp = newBuilder().setStatus(status).build();
healthValue = serviceToStatusMap.computeIfAbsent(service, __ -> new HealthValue(resp));
healthValue = serviceToStatusMap.computeIfAbsent(service, __ -> new HealthValue());
} finally {
lock.unlock();
}
Expand Down Expand Up @@ -181,24 +181,29 @@ public boolean terminate() {
private static final class HealthValue {
private final Processor<HealthCheckResponse, HealthCheckResponse> processor;
private final Publisher<HealthCheckResponse> publisher;
private volatile HealthCheckResponse last;

private HealthValue(final HealthCheckResponse initialState) {
HealthValue() {
this.processor = newPublisherProcessorDropHeadOnOverflow(4);
this.publisher = fromSource(processor)
// Allow multiple subscribers to Subscribe to the resulting Publisher.
.multicast(1, false);
this.last = initialState;
// Allow multiple subscribers to Subscribe to the resulting Publisher, use a history of 1
// so each new subscriber gets the latest state.
.replay(1);
// Maintain a Subscriber so signals are always delivered to replay and new Subscribers get the latest
// signal.
publisher.ignoreElements().subscribe();
}

private HealthValue(final ServingStatus status) {
this(newBuilder().setStatus(status).build());
static HealthValue newInstance(final HealthCheckResponse initialState) {
HealthValue value = new HealthValue();
value.next(initialState);
return value;
}

static HealthValue newInstance(final ServingStatus status) {
return newInstance(newBuilder().setStatus(status).build());
}

void next(HealthCheckResponse response) {
// Set the status here instead of in an operator because we need the status to be updated regardless if
// anyone is consuming the status.
last = response;
processor.onNext(response);
}

Expand All @@ -208,7 +213,12 @@ void next(HealthCheckResponse response) {
* @param status The last status to set.
*/
void completeMultipleTerminalSafe(ServingStatus status) {
next(newBuilder().setStatus(status).build());
try {
next(newBuilder().setStatus(status).build());
} catch (Throwable cause) {
processor.onError(cause);
return;
}
processor.onComplete();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ abstract class AbstractStreamingHttpConnection<CC extends NettyConnectionContext
maxConcurrencySetting = from(new IgnoreConsumedEvent<>(maxPipelinedRequests))
.concat(connection.onClosing())
.concat(succeeded(ZERO_MAX_CONCURRENCY_EVENT))
.multicast(1); // Allows multiple Subscribers to consume the event stream.
.replay(1); // Allow multiple Subscribers to consume, new Subscribers get last signal.
// Maintain a Subscriber so signals are always delivered to replay and new Subscribers get the latest signal.
maxConcurrencySetting.ignoreElements().subscribe();
this.headersFactory = headersFactory;
this.allowDropTrailersReadFromTransport = allowDropTrailersReadFromTransport;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,10 @@ private static final class DefaultH2ClientParentConnection extends AbstractH2Par
maxConcurrencyProcessor.onNext(DEFAULT_H2_MAX_CONCURRENCY_EVENT);
bs = new Http2StreamChannelBootstrap(connection.channel());
maxConcurrencyPublisher = fromSource(maxConcurrencyProcessor)
.multicast(1); // Allows multiple Subscribers to consume the event stream.
.replay(1); // Allow multiple Subscribers to consume, new Subscribers get last signal.
// Maintain a Subscriber so signals are always delivered to replay and new Subscribers get the latest
// signal.
maxConcurrencyPublisher.ignoreElements().subscribe();
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
import io.servicetalk.client.api.LoadBalancerReadyEvent;
import io.servicetalk.client.api.NoAvailableHostException;
import io.servicetalk.client.api.ServiceDiscoverer;
import io.servicetalk.concurrent.api.AsyncCloseable;
import io.servicetalk.concurrent.PublisherSource.Subscriber;
import io.servicetalk.concurrent.PublisherSource.Subscription;
import io.servicetalk.concurrent.api.AsyncContext;
import io.servicetalk.concurrent.api.BiIntFunction;
import io.servicetalk.concurrent.api.Completable;
Expand Down Expand Up @@ -54,15 +55,12 @@
import java.util.function.UnaryOperator;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.AsyncCloseables.emptyAsyncCloseable;
import static io.servicetalk.concurrent.api.AsyncCloseables.toAsyncCloseable;
import static io.servicetalk.concurrent.api.Completable.completed;
import static io.servicetalk.concurrent.api.Completable.failed;
import static io.servicetalk.concurrent.api.RetryStrategies.retryWithConstantBackoffDeltaJitter;
import static io.servicetalk.concurrent.api.RetryStrategies.retryWithConstantBackoffFullJitter;
import static io.servicetalk.concurrent.api.RetryStrategies.retryWithExponentialBackoffDeltaJitter;
import static io.servicetalk.concurrent.api.RetryStrategies.retryWithExponentialBackoffFullJitter;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.http.api.HeaderUtils.DEFAULT_HEADER_FILTER;
import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY;
import static io.servicetalk.http.api.HttpHeaderNames.EXPECT;
Expand Down Expand Up @@ -129,15 +127,10 @@ public HttpExecutionStrategy requiredOffloads() {
}

final class ContextAwareRetryingHttpClientFilter extends StreamingHttpClientFilter {

@Nullable
private Completable sdStatus;

@Nullable
private AsyncCloseable closeAsync;

@Nullable
private LoadBalancerReadySubscriber loadBalancerReadySubscriber;
private Publisher<Object> lbEventStream;

/**
* Create a new instance.
Expand All @@ -150,21 +143,8 @@ private ContextAwareRetryingHttpClientFilter(final FilterableStreamingHttpClient

void inject(@Nullable final Publisher<Object> lbEventStream,
@Nullable final Completable sdStatus) {
assert lbEventStream != null;
assert sdStatus != null;
this.sdStatus = ignoreSdErrors ? null : sdStatus;

if (waitForLb) {
loadBalancerReadySubscriber = new LoadBalancerReadySubscriber();
closeAsync = toAsyncCloseable(__ -> {
loadBalancerReadySubscriber.cancel();
return completed();
});
toSource(lbEventStream).subscribe(loadBalancerReadySubscriber);
} else {
loadBalancerReadySubscriber = null;
closeAsync = emptyAsyncCloseable();
}
this.sdStatus = ignoreSdErrors ? null : requireNonNull(sdStatus);
this.lbEventStream = waitForLb ? requireNonNull(lbEventStream) : null;
}

private final class OuterRetryStrategy implements BiIntFunction<Throwable, Completable> {
Expand All @@ -187,9 +167,37 @@ public Completable apply(final int count, final Throwable t) {
return failed(t);
}

if (loadBalancerReadySubscriber != null && t instanceof NoAvailableHostException) {
if (lbEventStream != null && t instanceof NoAvailableHostException) {
++lbNotReadyCount;
final Completable onHostsAvailable = loadBalancerReadySubscriber.onHostsAvailable();
final Completable onHostsAvailable = lbEventStream
.liftSync(subscriber -> new Subscriber<Object>() {
@Override
public void onSubscribe(final Subscription subscription) {
subscriber.onSubscribe(subscription);
}

@Override
public void onNext(@Nullable final Object o) {
subscriber.onNext(o);
}

@Override
public void onError(final Throwable t1) {
subscriber.onError(t1);
}

@Override
public void onComplete() {
subscriber.onError(new IllegalStateException("Subscriber listening for " +
LoadBalancerReadyEvent.class.getSimpleName() +
" completed unexpectedly"));
}
})
.takeWhile(lbEvent ->
// Don't complete until we get a LoadBalancerReadyEvent that is ready.
!(lbEvent instanceof LoadBalancerReadyEvent) ||
!((LoadBalancerReadyEvent) lbEvent).isReady())
.ignoreElements();
return sdStatus == null ? onHostsAvailable : onHostsAvailable.ambWith(sdStatus);
}

Expand Down Expand Up @@ -264,22 +272,6 @@ protected Single<StreamingHttpResponse> request(final StreamingHttpRequester del
// duplicatedRequest isn't used below because retryWhen must be applied outside the defer operator for (2).
return single.retryWhen(retryStrategy(request, executionContext()));
}

@Override
public Completable closeAsync() {
if (closeAsync != null) {
closeAsync.closeAsync();
}
return super.closeAsync();
}

@Override
public Completable closeAsyncGracefully() {
if (closeAsync != null) {
closeAsync.closeAsyncGracefully();
}
return super.closeAsyncGracefully();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.servicetalk.http.api.FilterableStreamingHttpConnection;
import io.servicetalk.http.api.Http2Exception;
import io.servicetalk.http.api.HttpCookiePair;
import io.servicetalk.http.api.HttpEventKey;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpHeaders;
import io.servicetalk.http.api.HttpHeadersFactory;
Expand Down Expand Up @@ -1636,12 +1635,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
Processor<Buffer, Buffer> requestPayload = newProcessor();
client.request(client.post("/0").payloadBody(fromSource(requestPayload))).toFuture().get();

Iterator<? extends ConsumableEvent<Integer>> maxItr = maxConcurrentPubQueue.take().toIterable().iterator();

serverChannelLatch.await();
Channel serverParentChannel = serverParentChannelRef.get();
serverParentChannel.writeAndFlush(new DefaultHttp2SettingsFrame(
new Http2Settings().maxConcurrentStreams(expectedMaxConcurrent))).sync();

Iterator<? extends ConsumableEvent<Integer>> maxItr = maxConcurrentPubQueue.take().toIterable().iterator();
// Verify that the initial maxConcurrency value is the default number
assertThat("No initial maxConcurrency value", maxItr.hasNext(), is(true));
ConsumableEvent<Integer> next = maxItr.next();
Expand Down Expand Up @@ -2011,22 +2011,12 @@ public void onComplete() {
}

private static final class TestConnectionFilter extends StreamingHttpConnectionFilter {
private final Publisher<? extends ConsumableEvent<Integer>> maxConcurrent;

TestConnectionFilter(final FilterableStreamingHttpConnection delegate,
Queue<FilterableStreamingHttpConnection> connectionQueue,
Queue<Publisher<? extends ConsumableEvent<Integer>>> maxConcurrentPubQueue) {
super(delegate);
maxConcurrent = delegate.transportEventStream(MAX_CONCURRENCY_NO_OFFLOADING).multicast(2);
connectionQueue.add(delegate);
maxConcurrentPubQueue.add(maxConcurrent);
}

@SuppressWarnings("unchecked")
@Override
public <T> Publisher<? extends T> transportEventStream(final HttpEventKey<T> eventKey) {
return eventKey == MAX_CONCURRENCY_NO_OFFLOADING ? (Publisher<? extends T>) maxConcurrent :
super.transportEventStream(eventKey);
maxConcurrentPubQueue.add(delegate.transportEventStream(MAX_CONCURRENCY_NO_OFFLOADING));
}
}

Expand Down
Loading

0 comments on commit 93531d5

Please sign in to comment.