Skip to content

Commit

Permalink
Use Publisher#replay operator (#2700)
Browse files Browse the repository at this point in the history
Motivation:
Publisher#replay was recently added, and there are use cases
that can be simplified/improved by using this new operator.
  • Loading branch information
Scottmitch authored Oct 4, 2023
1 parent b576131 commit d8e55aa
Show file tree
Hide file tree
Showing 12 changed files with 65 additions and 277 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ default Single<C> newConnection(@Nullable ContextMap context) {
/**
* A {@link Publisher} of events provided by this {@link LoadBalancer}. This maybe used to broadcast internal state
* of this {@link LoadBalancer} to provide hints/visibility for external usage.
* <p>
* Note the {@link Publisher} maybe subscribed to multiple times. It is recommended that implementations use
* operators such as {@link Publisher#replay(int)} (or similar) to support this use case.
* @return A {@link Publisher} of events provided by this {@link LoadBalancer}.
*/
Publisher<Object> eventStream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public DefaultHealthService() {
*/
public DefaultHealthService(Predicate<String> watchAllowed) {
this.watchAllowed = requireNonNull(watchAllowed);
serviceToStatusMap.put(OVERALL_SERVICE_NAME, new HealthValue(SERVING));
serviceToStatusMap.put(OVERALL_SERVICE_NAME, HealthValue.newInstance(SERVING));
}

@Override
Expand All @@ -85,7 +85,7 @@ public Single<HealthCheckResponse> check(final GrpcServiceContext ctx, final Hea
return Single.failed(new GrpcStatusException(
new GrpcStatus(NOT_FOUND, "unknown service: " + request.getService())));
}
return Single.succeeded(health.last);
return health.publisher.takeAtMost(1).firstOrError();
}

@Override
Expand All @@ -103,13 +103,13 @@ public Publisher<HealthCheckResponse> watch(final GrpcServiceContext ctx, final
return Publisher.from(newBuilder().setStatus(NOT_SERVING).build());
}
healthValue = serviceToStatusMap.computeIfAbsent(request.getService(),
__ -> new HealthValue(SERVICE_UNKNOWN));
__ -> HealthValue.newInstance(SERVICE_UNKNOWN));
} finally {
lock.unlock();
}
}

return Publisher.from(healthValue.last).concat(healthValue.publisher);
return healthValue.publisher;
}

/**
Expand All @@ -130,7 +130,7 @@ public boolean setStatus(String service, ServingStatus status) {
return false;
}
resp = newBuilder().setStatus(status).build();
healthValue = serviceToStatusMap.computeIfAbsent(service, __ -> new HealthValue(resp));
healthValue = serviceToStatusMap.computeIfAbsent(service, __ -> new HealthValue());
} finally {
lock.unlock();
}
Expand Down Expand Up @@ -181,24 +181,29 @@ public boolean terminate() {
private static final class HealthValue {
private final Processor<HealthCheckResponse, HealthCheckResponse> processor;
private final Publisher<HealthCheckResponse> publisher;
private volatile HealthCheckResponse last;

private HealthValue(final HealthCheckResponse initialState) {
HealthValue() {
this.processor = newPublisherProcessorDropHeadOnOverflow(4);
this.publisher = fromSource(processor)
// Allow multiple subscribers to Subscribe to the resulting Publisher.
.multicast(1, false);
this.last = initialState;
// Allow multiple subscribers to Subscribe to the resulting Publisher, use a history of 1
// so each new subscriber gets the latest state.
.replay(1);
// Maintain a Subscriber so signals are always delivered to replay and new Subscribers get the latest
// signal.
publisher.ignoreElements().subscribe();
}

private HealthValue(final ServingStatus status) {
this(newBuilder().setStatus(status).build());
static HealthValue newInstance(final HealthCheckResponse initialState) {
HealthValue value = new HealthValue();
value.next(initialState);
return value;
}

static HealthValue newInstance(final ServingStatus status) {
return newInstance(newBuilder().setStatus(status).build());
}

void next(HealthCheckResponse response) {
// Set the status here instead of in an operator because we need the status to be updated regardless if
// anyone is consuming the status.
last = response;
processor.onNext(response);
}

Expand All @@ -208,7 +213,12 @@ void next(HealthCheckResponse response) {
* @param status The last status to set.
*/
void completeMultipleTerminalSafe(ServingStatus status) {
next(newBuilder().setStatus(status).build());
try {
next(newBuilder().setStatus(status).build());
} catch (Throwable cause) {
processor.onError(cause);
return;
}
processor.onComplete();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,12 @@ abstract class AbstractStreamingHttpConnection<CC extends NettyConnectionContext
this.connection = requireNonNull(conn);
this.connectionContext = new DefaultNettyHttpConnectionContext(conn);
this.reqRespFactory = requireNonNull(reqRespFactory);
// This Publisher currently provides replay() semantics in that all sources support multiple subscribers and
// from(..)/succeeded(..) will provide the same state to every subscriber. If these semantics change the
// replay() operator should be used to preserve these semantics.
maxConcurrencySetting = from(new IgnoreConsumedEvent<>(maxPipelinedRequests))
.concat(connection.onClosing())
.concat(succeeded(ZERO_MAX_CONCURRENCY_EVENT))
.multicast(1); // Allows multiple Subscribers to consume the event stream.
.concat(succeeded(ZERO_MAX_CONCURRENCY_EVENT));
this.headersFactory = headersFactory;
this.allowDropTrailersReadFromTransport = allowDropTrailersReadFromTransport;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,10 @@ private static final class DefaultH2ClientParentConnection extends AbstractH2Par
maxConcurrencyProcessor.onNext(DEFAULT_H2_MAX_CONCURRENCY_EVENT);
bs = new Http2StreamChannelBootstrap(connection.channel());
maxConcurrencyPublisher = fromSource(maxConcurrencyProcessor)
.multicast(1); // Allows multiple Subscribers to consume the event stream.
.replay(1); // Allow multiple Subscribers to consume, new Subscribers get last signal.
// Maintain a Subscriber so signals are always delivered to replay and new Subscribers get the latest
// signal.
maxConcurrencyPublisher.ignoreElements().subscribe();
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.servicetalk.client.api.LoadBalancerReadyEvent;
import io.servicetalk.client.api.NoAvailableHostException;
import io.servicetalk.client.api.ServiceDiscoverer;
import io.servicetalk.concurrent.api.AsyncCloseable;
import io.servicetalk.concurrent.api.AsyncContext;
import io.servicetalk.concurrent.api.BiIntFunction;
import io.servicetalk.concurrent.api.Completable;
Expand Down Expand Up @@ -54,15 +53,12 @@
import java.util.function.UnaryOperator;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.AsyncCloseables.emptyAsyncCloseable;
import static io.servicetalk.concurrent.api.AsyncCloseables.toAsyncCloseable;
import static io.servicetalk.concurrent.api.Completable.completed;
import static io.servicetalk.concurrent.api.Completable.failed;
import static io.servicetalk.concurrent.api.RetryStrategies.retryWithConstantBackoffDeltaJitter;
import static io.servicetalk.concurrent.api.RetryStrategies.retryWithConstantBackoffFullJitter;
import static io.servicetalk.concurrent.api.RetryStrategies.retryWithExponentialBackoffDeltaJitter;
import static io.servicetalk.concurrent.api.RetryStrategies.retryWithExponentialBackoffFullJitter;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.http.api.HeaderUtils.DEFAULT_HEADER_FILTER;
import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY;
import static io.servicetalk.http.api.HttpHeaderNames.EXPECT;
Expand Down Expand Up @@ -129,15 +125,10 @@ public HttpExecutionStrategy requiredOffloads() {
}

final class ContextAwareRetryingHttpClientFilter extends StreamingHttpClientFilter {

@Nullable
private Completable sdStatus;

@Nullable
private AsyncCloseable closeAsync;

@Nullable
private LoadBalancerReadySubscriber loadBalancerReadySubscriber;
private Publisher<Object> lbEventStream;

/**
* Create a new instance.
Expand All @@ -150,21 +141,8 @@ private ContextAwareRetryingHttpClientFilter(final FilterableStreamingHttpClient

void inject(@Nullable final Publisher<Object> lbEventStream,
@Nullable final Completable sdStatus) {
assert lbEventStream != null;
assert sdStatus != null;
this.sdStatus = ignoreSdErrors ? null : sdStatus;

if (waitForLb) {
loadBalancerReadySubscriber = new LoadBalancerReadySubscriber();
closeAsync = toAsyncCloseable(__ -> {
loadBalancerReadySubscriber.cancel();
return completed();
});
toSource(lbEventStream).subscribe(loadBalancerReadySubscriber);
} else {
loadBalancerReadySubscriber = null;
closeAsync = emptyAsyncCloseable();
}
this.sdStatus = ignoreSdErrors ? null : requireNonNull(sdStatus);
this.lbEventStream = waitForLb ? requireNonNull(lbEventStream) : null;
}

private final class OuterRetryStrategy implements BiIntFunction<Throwable, Completable> {
Expand All @@ -187,9 +165,17 @@ public Completable apply(final int count, final Throwable t) {
return failed(t);
}

if (loadBalancerReadySubscriber != null && t instanceof NoAvailableHostException) {
if (lbEventStream != null && t instanceof NoAvailableHostException) {
++lbNotReadyCount;
final Completable onHostsAvailable = loadBalancerReadySubscriber.onHostsAvailable();
final Completable onHostsAvailable = lbEventStream
.onCompleteError(() -> new IllegalStateException("Subscriber listening for " +
LoadBalancerReadyEvent.class.getSimpleName() +
" completed unexpectedly"))
.takeWhile(lbEvent ->
// Don't complete until we get a LoadBalancerReadyEvent that is ready.
!(lbEvent instanceof LoadBalancerReadyEvent &&
((LoadBalancerReadyEvent) lbEvent).isReady()))
.ignoreElements();
return sdStatus == null ? onHostsAvailable : onHostsAvailable.ambWith(sdStatus);
}

Expand Down Expand Up @@ -264,22 +250,6 @@ protected Single<StreamingHttpResponse> request(final StreamingHttpRequester del
// duplicatedRequest isn't used below because retryWhen must be applied outside the defer operator for (2).
return single.retryWhen(retryStrategy(request, executionContext()));
}

@Override
public Completable closeAsync() {
if (closeAsync != null) {
closeAsync.closeAsync();
}
return super.closeAsync();
}

@Override
public Completable closeAsyncGracefully() {
if (closeAsync != null) {
closeAsync.closeAsyncGracefully();
}
return super.closeAsyncGracefully();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.netty.RetryingHttpRequesterFilter.BackOffPolicy;
import io.servicetalk.http.netty.StreamObserverTest.MulticastTransportEventsStreamingHttpConnectionFilter;
import io.servicetalk.transport.api.HostAndPort;
import io.servicetalk.transport.netty.internal.ExecutionContextExtension;

Expand Down Expand Up @@ -135,9 +134,9 @@ protected void channelRead0(final ChannelHandlerContext ctx, final Http2HeadersF
});
}
}
}, parentPipeline -> {
serverParentChannel.set(parentPipeline.channel());
}, h2Builder -> {
},
parentPipeline -> serverParentChannel.set(parentPipeline.channel()),
h2Builder -> {
h2Builder.initialSettings().maxConcurrentStreams(maxConcurrentStreams);
return h2Builder;
});
Expand All @@ -162,7 +161,6 @@ void noMaxActiveStreamsViolatedErrorAfterCancel() throws Exception {
assert serverAddress != null;
try (HttpClient client = newClientBuilder(serverAddress, CLIENT_CTX, HTTP_2)
.appendClientFilter(disableAutoRetries()) // All exceptions should be propagated
.appendConnectionFilter(MulticastTransportEventsStreamingHttpConnectionFilter.INSTANCE)
.appendConnectionFilter(connection -> new StreamingHttpConnectionFilter(connection) {
@Override
public Single<StreamingHttpResponse> request(StreamingHttpRequest request) {
Expand Down Expand Up @@ -252,7 +250,6 @@ private void noMaxActiveStreamsViolatedErrorWhenLimitChanges(boolean increase,
assert serverAddress != null;
try (HttpClient client = newClientBuilder(serverAddress, CLIENT_CTX, HTTP_2)
.executionStrategy(strategy)
.appendConnectionFilter(MulticastTransportEventsStreamingHttpConnectionFilter.INSTANCE)
// Don't allow more than 1 connection:
.appendConnectionFactoryFilter(LimitingConnectionFactoryFilter.withMax(1))
// Retry all ConnectionLimitReachedException(s), don't retry RetryableException(s):
Expand Down Expand Up @@ -324,7 +321,6 @@ void maxActiveStreamsOutsideIntRange() throws Exception {
assert serverAddress != null;
assertThat(MAX_UNSIGNED_INT, is(greaterThan((long) Integer.MAX_VALUE)));
try (HttpClient client = newClientBuilder(serverAddress, CLIENT_CTX, HTTP_2)
.appendConnectionFilter(MulticastTransportEventsStreamingHttpConnectionFilter.INSTANCE)
.protocols(HTTP_2.config)
.build()) {

Expand Down
Loading

0 comments on commit d8e55aa

Please sign in to comment.