From 72d415112771f488721509cbfca932138d30cf00 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Thu, 9 Jan 2025 15:28:40 -0700 Subject: [PATCH 01/10] Wip of leak detection working as intended --- .../java/io/servicetalk/grpc/LeakRepro.java | 99 ++++++++++++++ .../src/test/proto/servicetalkloeak.proto | 12 ++ ...tpMessageDiscardWatchdogServiceFilter.java | 2 +- .../servicetalk/http/netty/LeakDetection.java | 129 ++++++++++++++++++ .../netty/SpliceFlatStreamToMetaSingle.java | 5 +- 5 files changed, 244 insertions(+), 3 deletions(-) create mode 100644 servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/LeakRepro.java create mode 100644 servicetalk-grpc-netty/src/test/proto/servicetalkloeak.proto create mode 100644 servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/LeakDetection.java diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/LeakRepro.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/LeakRepro.java new file mode 100644 index 0000000000..d238f6e8ae --- /dev/null +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/LeakRepro.java @@ -0,0 +1,99 @@ +package io.servicetalk.grpc; + +import com.apple.servicetalkleak.Message; +import com.apple.servicetalkleak.ServiceTalkLeak; +import io.netty.buffer.ByteBufUtil; +import io.servicetalk.concurrent.api.Publisher; +import io.servicetalk.concurrent.api.Single; +import io.servicetalk.grpc.api.GrpcServiceContext; +import io.servicetalk.grpc.api.GrpcStatusCode; +import io.servicetalk.grpc.api.GrpcStatusException; +import io.servicetalk.grpc.netty.GrpcClients; +import io.servicetalk.grpc.netty.GrpcServers; +import io.servicetalk.http.netty.HttpProtocolConfigs; +import io.servicetalk.http.netty.SpliceFlatStreamToMetaSingle; +import io.servicetalk.logging.api.LogLevel; +import io.servicetalk.transport.api.HostAndPort; +import io.servicetalk.transport.api.IoExecutor; +import io.servicetalk.transport.netty.internal.NettyIoExecutors; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static io.servicetalk.concurrent.api.internal.BlockingUtils.blockingInvocation; +import static org.junit.jupiter.api.Assertions.assertFalse; + +public class LeakRepro { + + private static final Logger LOGGER = LoggerFactory.getLogger(LeakRepro.class); + + static boolean leakDetected = false; + + static { + System.setProperty("io.netty.leakDetection.level", "paranoid"); + ByteBufUtil.setLeakListener((type, records) -> { + leakDetected = true; + LOGGER.error("ByteBuf leak detected!"); + }); + } + + IoExecutor serverExecutor = NettyIoExecutors.createIoExecutor(1, "server"); + IoExecutor clientExecutor = NettyIoExecutors.createIoExecutor(1, "client"); + + @SuppressWarnings("resource") + @Test + public void testLeak() throws Exception { + GrpcServers.forPort(8888) + .initializeHttp(b -> b + .ioExecutor(serverExecutor) + .executor(serverExecutor)) + .listenAndAwait(new ServiceTalkLeak.ServiceTalkLeakService() { + @Override + public Publisher rpc(GrpcServiceContext ctx, Publisher request) { + Publisher response = splice(request) + .flatMapPublisher(pair -> { + LOGGER.info("Initial message: " + pair.head); + return Publisher.failed(new GrpcStatusException(GrpcStatusCode.INVALID_ARGUMENT.status())); + }); + return response; + } + }); + + ServiceTalkLeak.ServiceTalkLeakClient client = GrpcClients.forAddress(HostAndPort.of("127.0.0.1", 8888)) + .initializeHttp(b -> b + .protocols(HttpProtocolConfigs.h2().enableFrameLogging("CLIENT", LogLevel.INFO, () -> true).build()) + .ioExecutor(clientExecutor) + .executor(clientExecutor)) + .build(new ServiceTalkLeak.ClientFactory()); + + for (int i = 0; i < 10; i++) { + LOGGER.info("Iteration {}", i); + blockingInvocation( + client.rpc( + Publisher.from( + Message.newBuilder().setValue("first message").build(), + Message.newBuilder().setValue("second message (which leaks)").build())) + .ignoreElements() + .onErrorComplete()); + + System.gc(); + System.runFinalization(); + } + + assertFalse(leakDetected); + } + + private static Single splice(Publisher request) { + return request.liftSyncToSingle(new SpliceFlatStreamToMetaSingle<>(Pair::new)); + } + + private static final class Pair { + final Message head; + final Publisher stream; + + public Pair(Message head, Publisher stream) { + this.head = head; + this.stream = stream; + } + } +} \ No newline at end of file diff --git a/servicetalk-grpc-netty/src/test/proto/servicetalkloeak.proto b/servicetalk-grpc-netty/src/test/proto/servicetalkloeak.proto new file mode 100644 index 0000000000..e17934d3d7 --- /dev/null +++ b/servicetalk-grpc-netty/src/test/proto/servicetalkloeak.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "com.apple.servicetalkleak"; + +message Message { + string value = 1; +} + +service ServiceTalkLeak { + rpc Rpc(stream Message) returns (stream Message); +} diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java index 875acf0291..4c2fc8d4d5 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java @@ -74,7 +74,7 @@ public Single handle(final HttpServiceContext ctx, final StreamingHttpRequest request, final StreamingHttpResponseFactory responseFactory) { return delegate() - .handle(ctx, request, responseFactory) + .handle(ctx, request.transformMessageBody(LeakDetection::instrument), responseFactory) .map(response -> { // always write the buffer publisher into the request context. When a downstream subscriber // arrives, mark the message as subscribed explicitly (having a message present and no diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/LeakDetection.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/LeakDetection.java new file mode 100644 index 0000000000..ca508cc155 --- /dev/null +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/LeakDetection.java @@ -0,0 +1,129 @@ +package io.servicetalk.http.netty; + +import io.servicetalk.concurrent.Cancellable; +import io.servicetalk.concurrent.PublisherSource.Subscriber; +import io.servicetalk.concurrent.PublisherSource.Subscription; +import io.servicetalk.concurrent.api.Publisher; +import io.servicetalk.concurrent.api.SourceAdapters; +import io.servicetalk.concurrent.internal.CancelImmediatelySubscriber; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +final class LeakDetection { + + private static final Logger LOGGER = LoggerFactory.getLogger(LeakDetection.class); + private LeakDetection() { + // no instances. + } + + static Publisher instrument(Publisher publisher) { + FinishedToken token = new FinishedToken(publisher); + return publisher.liftSync(subscriber -> new InstrumentedSubscriber<>(subscriber, token)); + } + private static final class InstrumentedSubscriber implements Subscriber { + + private final Subscriber delegate; + private final FinishedToken token; + + public InstrumentedSubscriber(Subscriber delegate, FinishedToken token) { + this.delegate = delegate; + this.token = token; + } + + @Override + public void onSubscribe(Subscription subscription) { + token.subscribed(subscription); + delegate.onSubscribe(new Subscription() { + @Override + public void request(long n) { + subscription.request(n); + } + + @Override + public void cancel() { + token.doComplete(); + subscription.cancel(); + } + }); + } + + @Override + public void onNext(@Nullable T t) { + delegate.onNext(t); + } + + @Override + public void onError(Throwable t) { + token.doComplete(); + delegate.onError(t); + } + + @Override + public void onComplete() { + token.doComplete(); + delegate.onComplete(); + } + + + } + + private static final class FinishedToken { + + private static final AtomicReferenceFieldUpdater UPDATER = + AtomicReferenceFieldUpdater.newUpdater(FinishedToken.class, Object.class,"state"); + private static final String COMPLETE = "complete"; + + volatile Object state; + + public FinishedToken(Publisher parent) { + this.state = parent; + } + + void doComplete() { + UPDATER.set(this, COMPLETE); + } + + private boolean checkComplete() { + Object previous = UPDATER.getAndSet(this, COMPLETE); + if (previous != COMPLETE) { + // This means something leaked. + if (previous instanceof Publisher) { + // never subscribed to. + SourceAdapters.toSource((Publisher) previous).subscribe(CancelImmediatelySubscriber.INSTANCE); + } else { + assert previous instanceof Cancellable; + Cancellable cancellable = (Cancellable) previous; + cancellable.cancel(); + } + return true; + } else { + return false; + } + } + + void subscribed(Subscription subscription) { + while (true) { + Object old = UPDATER.get(this); + if (old == COMPLETE || old instanceof Subscription) { + // TODO: What to do here? + LOGGER.debug("Publisher subscribed to multiple times."); + return; + } else if (UPDATER.compareAndSet(this, old, subscription)) { + return; + } + } + } + + // TODO: move this to a phantom reference approach. + @Override + protected void finalize() throws Throwable { + super.finalize(); + if (checkComplete()) { + LOGGER.warn("LEAK detected."); + } + } + } +} diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java index 3c0025a2e3..0e29512777 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java @@ -52,7 +52,8 @@ * @param type of meta-data in front of the stream of {@link Payload}, eg. {@link HttpResponseMetaData} * @param type of payload inside the {@link Data}, eg. {@link Buffer} */ -final class SpliceFlatStreamToMetaSingle implements PublisherToSingleOperator { +// TODO: revert: this shouldn't be public. +public final class SpliceFlatStreamToMetaSingle implements PublisherToSingleOperator { private static final Logger LOGGER = LoggerFactory.getLogger(SpliceFlatStreamToMetaSingle.class); private final BiFunction, Data> packer; @@ -64,7 +65,7 @@ final class SpliceFlatStreamToMetaSingle implements Pub * @param packer function to pack the {@link Publisher}<{@link Payload}> and {@link MetaData} into a * {@link Data} */ - SpliceFlatStreamToMetaSingle(BiFunction, Data> packer) { + public SpliceFlatStreamToMetaSingle(BiFunction, Data> packer) { this.packer = requireNonNull(packer); } From 47d1fcfbb0f037af38ac6a00f824b88fb2638acf Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Fri, 10 Jan 2025 12:27:50 -0700 Subject: [PATCH 02/10] Cleanup and weak ref based removal --- .../java/io/servicetalk/grpc/LeakRepro.java | 1 + ...ttpMessageDiscardWatchdogClientFilter.java | 23 +- ...tpMessageDiscardWatchdogServiceFilter.java | 34 ++- .../servicetalk/http/netty/LeakDetection.java | 129 ------------ .../http/netty/WatchdogLeakDetector.java | 196 ++++++++++++++++++ 5 files changed, 237 insertions(+), 146 deletions(-) delete mode 100644 servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/LeakDetection.java create mode 100644 servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/LeakRepro.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/LeakRepro.java index d238f6e8ae..15bb971707 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/LeakRepro.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/LeakRepro.java @@ -30,6 +30,7 @@ public class LeakRepro { static boolean leakDetected = false; static { + System.setProperty("io.servicetalk.http.netty.leakdetection", "strict"); System.setProperty("io.netty.leakDetection.level", "paranoid"); ByteBufUtil.setLeakListener((type, records) -> { leakDetected = true; diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java index c0bc77797f..2e38eace0f 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java @@ -36,6 +36,8 @@ import java.util.concurrent.atomic.AtomicReference; import static io.servicetalk.http.netty.HttpMessageDiscardWatchdogServiceFilter.generifyAtomicReference; +import static io.servicetalk.http.netty.WatchdogLeakDetector.REQUEST_LEAK_MESSAGE; +import static io.servicetalk.http.netty.WatchdogLeakDetector.RESPONSE_LEAK_MESSAGE; /** * Filter which tracks message bodies and warns if they are not discarded properly. @@ -67,6 +69,17 @@ public StreamingHttpConnectionFilter create(final FilterableStreamingHttpConnect return new StreamingHttpConnectionFilter(connection) { @Override public Single request(final StreamingHttpRequest request) { + return WatchdogLeakDetector.strictDetection() ? requestStrict(request) : requestSimple(request); + } + + private Single requestStrict(final StreamingHttpRequest request) { + return delegate().request(request.transformMessageBody(publisher -> + WatchdogLeakDetector.instrument(publisher, REQUEST_LEAK_MESSAGE))) + .map(response -> response.transformMessageBody(publisher -> + WatchdogLeakDetector.instrument(publisher, RESPONSE_LEAK_MESSAGE))); + } + + private Single requestSimple(final StreamingHttpRequest request) { return delegate().request(request).map(response -> { // always write the buffer publisher into the request context. When a downstream subscriber // arrives, mark the message as subscribed explicitly (having a message present and no @@ -78,10 +91,7 @@ public Single request(final StreamingHttpRequest request) // If a previous message exists, the Single got resubscribed to // (i.e. during a retry) and so previous message body needs to be cleaned up by the // user. - LOGGER.warn("Discovered un-drained HTTP response message body which has " + - "been dropped by user code - this is a strong indication of a bug " + - "in a user-defined filter. Response payload (message) body must " + - "be fully consumed before retrying."); + LOGGER.warn(RESPONSE_LEAK_MESSAGE); } return response.transformMessageBody(msgPublisher -> msgPublisher.beforeSubscriber(() -> { @@ -112,10 +122,7 @@ protected Single request(final StreamingHttpRequester del if (maybePublisher != null && maybePublisher.getAndSet(null) != null) { // No-one subscribed to the message (or there is none), so if there is a message // tell the user to clean it up. - LOGGER.warn("Discovered un-drained HTTP response message body which has " + - "been dropped by user code - this is a strong indication of a bug " + - "in a user-defined filter. Response payload (message) body must " + - "be fully consumed before discarding.", cause); + LOGGER.warn(RESPONSE_LEAK_MESSAGE, cause); } }); } diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java index 4c2fc8d4d5..c54389215c 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java @@ -39,6 +39,9 @@ import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; +import static io.servicetalk.http.netty.WatchdogLeakDetector.REQUEST_LEAK_MESSAGE; +import static io.servicetalk.http.netty.WatchdogLeakDetector.RESPONSE_LEAK_MESSAGE; + /** * Filter which tracks message bodies and warns if they are not discarded properly. */ @@ -46,6 +49,7 @@ final class HttpMessageDiscardWatchdogServiceFilter implements StreamingHttpServ private static final Logger LOGGER = LoggerFactory.getLogger(HttpMessageDiscardWatchdogServiceFilter.class); + /** * Instance of {@link HttpMessageDiscardWatchdogServiceFilter}. */ @@ -69,12 +73,30 @@ private HttpMessageDiscardWatchdogServiceFilter() { public StreamingHttpServiceFilter create(final StreamingHttpService service) { return new StreamingHttpServiceFilter(service) { + @Override public Single handle(final HttpServiceContext ctx, + final StreamingHttpRequest request, + final StreamingHttpResponseFactory responseFactory) { + return WatchdogLeakDetector.strictDetection() ? + handleStrict(ctx, request, responseFactory) : handleSimple(ctx, request, responseFactory); + } + + private Single handleStrict(final HttpServiceContext ctx, + final StreamingHttpRequest request, + final StreamingHttpResponseFactory responseFactory) { + return delegate() + .handle(ctx, request.transformMessageBody(publisher -> + WatchdogLeakDetector.instrument(publisher, REQUEST_LEAK_MESSAGE)), responseFactory) + .map(response -> response.transformMessageBody(publisher -> + WatchdogLeakDetector.instrument(publisher, RESPONSE_LEAK_MESSAGE))); + } + + private Single handleSimple(final HttpServiceContext ctx, final StreamingHttpRequest request, final StreamingHttpResponseFactory responseFactory) { return delegate() - .handle(ctx, request.transformMessageBody(LeakDetection::instrument), responseFactory) + .handle(ctx, request, responseFactory) .map(response -> { // always write the buffer publisher into the request context. When a downstream subscriber // arrives, mark the message as subscribed explicitly (having a message present and no @@ -86,10 +108,7 @@ public Single handle(final HttpServiceContext ctx, // If a previous message exists, the Single got resubscribed to // (i.e. during a retry) and so previous message body needs to be cleaned up by the // user. - LOGGER.warn("Discovered un-drained HTTP response message body which has " + - "been dropped by user code - this is a strong indication of a bug " + - "in a user-defined filter. Responses (or their message body) must " + - "be fully consumed before retrying."); + LOGGER.warn(RESPONSE_LEAK_MESSAGE); } return response.transformMessageBody(msgPublisher -> msgPublisher.beforeSubscriber(() -> { @@ -173,10 +192,7 @@ public void onExchangeFinally() { if (maybePublisher != null && maybePublisher.get() != null) { // No-one subscribed to the message (or there is none), so if there is a message // tell the user to clean it up. - LOGGER.warn("Discovered un-drained HTTP response message body which has " + - "been dropped by user code - this is a strong indication of a bug " + - "in a user-defined filter. Responses (or their message body) must " + - "be fully consumed before discarding."); + LOGGER.warn(RESPONSE_LEAK_MESSAGE); } } } diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/LeakDetection.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/LeakDetection.java deleted file mode 100644 index ca508cc155..0000000000 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/LeakDetection.java +++ /dev/null @@ -1,129 +0,0 @@ -package io.servicetalk.http.netty; - -import io.servicetalk.concurrent.Cancellable; -import io.servicetalk.concurrent.PublisherSource.Subscriber; -import io.servicetalk.concurrent.PublisherSource.Subscription; -import io.servicetalk.concurrent.api.Publisher; -import io.servicetalk.concurrent.api.SourceAdapters; -import io.servicetalk.concurrent.internal.CancelImmediatelySubscriber; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - -final class LeakDetection { - - private static final Logger LOGGER = LoggerFactory.getLogger(LeakDetection.class); - private LeakDetection() { - // no instances. - } - - static Publisher instrument(Publisher publisher) { - FinishedToken token = new FinishedToken(publisher); - return publisher.liftSync(subscriber -> new InstrumentedSubscriber<>(subscriber, token)); - } - private static final class InstrumentedSubscriber implements Subscriber { - - private final Subscriber delegate; - private final FinishedToken token; - - public InstrumentedSubscriber(Subscriber delegate, FinishedToken token) { - this.delegate = delegate; - this.token = token; - } - - @Override - public void onSubscribe(Subscription subscription) { - token.subscribed(subscription); - delegate.onSubscribe(new Subscription() { - @Override - public void request(long n) { - subscription.request(n); - } - - @Override - public void cancel() { - token.doComplete(); - subscription.cancel(); - } - }); - } - - @Override - public void onNext(@Nullable T t) { - delegate.onNext(t); - } - - @Override - public void onError(Throwable t) { - token.doComplete(); - delegate.onError(t); - } - - @Override - public void onComplete() { - token.doComplete(); - delegate.onComplete(); - } - - - } - - private static final class FinishedToken { - - private static final AtomicReferenceFieldUpdater UPDATER = - AtomicReferenceFieldUpdater.newUpdater(FinishedToken.class, Object.class,"state"); - private static final String COMPLETE = "complete"; - - volatile Object state; - - public FinishedToken(Publisher parent) { - this.state = parent; - } - - void doComplete() { - UPDATER.set(this, COMPLETE); - } - - private boolean checkComplete() { - Object previous = UPDATER.getAndSet(this, COMPLETE); - if (previous != COMPLETE) { - // This means something leaked. - if (previous instanceof Publisher) { - // never subscribed to. - SourceAdapters.toSource((Publisher) previous).subscribe(CancelImmediatelySubscriber.INSTANCE); - } else { - assert previous instanceof Cancellable; - Cancellable cancellable = (Cancellable) previous; - cancellable.cancel(); - } - return true; - } else { - return false; - } - } - - void subscribed(Subscription subscription) { - while (true) { - Object old = UPDATER.get(this); - if (old == COMPLETE || old instanceof Subscription) { - // TODO: What to do here? - LOGGER.debug("Publisher subscribed to multiple times."); - return; - } else if (UPDATER.compareAndSet(this, old, subscription)) { - return; - } - } - } - - // TODO: move this to a phantom reference approach. - @Override - protected void finalize() throws Throwable { - super.finalize(); - if (checkComplete()) { - LOGGER.warn("LEAK detected."); - } - } - } -} diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java new file mode 100644 index 0000000000..4ab0e70feb --- /dev/null +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java @@ -0,0 +1,196 @@ +package io.servicetalk.http.netty; + +import io.servicetalk.concurrent.Cancellable; +import io.servicetalk.concurrent.PublisherSource.Subscriber; +import io.servicetalk.concurrent.PublisherSource.Subscription; +import io.servicetalk.concurrent.api.Executors; +import io.servicetalk.concurrent.api.Publisher; +import io.servicetalk.concurrent.api.SourceAdapters; +import io.servicetalk.concurrent.internal.CancelImmediatelySubscriber; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.lang.ref.Reference; +import java.lang.ref.ReferenceQueue; +import java.lang.ref.WeakReference; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +final class WatchdogLeakDetector { + + static final String REQUEST_LEAK_MESSAGE = + "Discovered un-drained HTTP request message body which has " + + "been dropped by user code - this is a strong indication of a bug " + + "in a user-defined filter. Requests (or their message body) must " + + "be fully consumed before retrying."; + + static final String RESPONSE_LEAK_MESSAGE = + "Discovered un-drained HTTP response message body which has " + + "been dropped by user code - this is a strong indication of a bug " + + "in a user-defined filter. Responses (or their message body) must " + + "be fully consumed before retrying."; + + private static final Logger LOGGER = LoggerFactory.getLogger(WatchdogLeakDetector.class); + + private static final WatchdogLeakDetector INSTANCE = new WatchdogLeakDetector(); + + private static final String PROPERTY_NAME = "io.servicetalk.http.netty.leakdetection"; + + private static final String STRICT_MODE = "strict"; + + private static final boolean STRICT_DETECTION; + + static { + String prop = System.getProperty(PROPERTY_NAME); + STRICT_DETECTION = prop != null && prop.equalsIgnoreCase(STRICT_MODE); + } + + private final ReferenceQueue refQueue = new ReferenceQueue<>(); + private final Map, CleanupState> allRefs = new ConcurrentHashMap<>(); + + private WatchdogLeakDetector() { + // Singleton. + } + + static Publisher instrument(Publisher publisher, String message) { + return INSTANCE.instrument0(publisher, message); + } + + static boolean strictDetection() { + return STRICT_DETECTION; + } + + private Publisher instrument0(Publisher publisher, String message) { + maybeCleanRefs(); + CleanupState cleanupState = new CleanupState(publisher, message); + Publisher result = publisher.liftSync(subscriber -> new InstrumentedSubscriber<>(subscriber, cleanupState)); + Reference ref = new WeakReference<>(result, refQueue); + allRefs.put(ref, cleanupState); + return result; + } + + private void maybeCleanRefs() { + final Reference testRef = refQueue.poll(); + if (testRef != null) { + // There are references to be cleaned but don't do it on this thread. + // TODO: what executor should we really use? + Executors.global().submit(() -> { + Reference ref = testRef; + do { + ref.clear(); + CleanupState cleanupState = allRefs.remove(ref); + if (cleanupState != null) { + cleanupState.check(); + } + } while ((ref = refQueue.poll()) != null); + }); + } + + + } + + private static final class InstrumentedSubscriber implements Subscriber { + + private final Subscriber delegate; + private final CleanupState cleanupToken; + + public InstrumentedSubscriber(Subscriber delegate, CleanupState cleanupToken) { + this.delegate = delegate; + this.cleanupToken = cleanupToken; + } + + @Override + public void onSubscribe(Subscription subscription) { + cleanupToken.subscribed(subscription); + delegate.onSubscribe(new Subscription() { + @Override + public void request(long n) { + subscription.request(n); + } + + @Override + public void cancel() { + cleanupToken.doComplete(); + subscription.cancel(); + } + }); + } + + @Override + public void onNext(@Nullable T t) { + delegate.onNext(t); + } + + @Override + public void onError(Throwable t) { + cleanupToken.doComplete(); + delegate.onError(t); + } + + @Override + public void onComplete() { + cleanupToken.doComplete(); + delegate.onComplete(); + } + + + } + + private static final class CleanupState { + + private static final AtomicReferenceFieldUpdater UPDATER = + AtomicReferenceFieldUpdater.newUpdater(CleanupState.class, Object.class,"state"); + private static final String COMPLETE = "complete"; + + private final String message; + volatile Object state; + + public CleanupState(Publisher parent, String message) { + this.message = message; + this.state = parent; + } + + void doComplete() { + UPDATER.set(this, COMPLETE); + } + + private boolean checkComplete() { + Object previous = UPDATER.getAndSet(this, COMPLETE); + if (previous != COMPLETE) { + // This means something leaked. + if (previous instanceof Publisher) { + // never subscribed to. + SourceAdapters.toSource((Publisher) previous).subscribe(CancelImmediatelySubscriber.INSTANCE); + } else { + assert previous instanceof Cancellable; + Cancellable cancellable = (Cancellable) previous; + cancellable.cancel(); + } + return true; + } else { + return false; + } + } + + void subscribed(Subscription subscription) { + while (true) { + Object old = UPDATER.get(this); + if (old == COMPLETE || old instanceof Subscription) { + // TODO: What to do here? + LOGGER.debug("Publisher subscribed to multiple times."); + return; + } else if (UPDATER.compareAndSet(this, old, subscription)) { + return; + } + } + } + + void check() { + if (checkComplete()) { + LOGGER.warn(message); + } + } + } +} From ed72a8cbd39ac7313b933d07ee0b05dd55de4fba Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Tue, 14 Jan 2025 12:16:19 -0700 Subject: [PATCH 03/10] cleanup --- .../gradle/spotbugs/test-exclusions.xml | 8 +- .../java/io/servicetalk/grpc/LeakRepro.java | 100 ------------------ .../netty/GcWatchdogLeakDetectorTest.java | 96 +++++++++++++++++ .../src/test/proto/servicetalkleak.proto | 13 +++ .../src/test/proto/servicetalkloeak.proto | 12 --- ...ttpMessageDiscardWatchdogClientFilter.java | 4 +- ...tpMessageDiscardWatchdogServiceFilter.java | 4 +- .../http/netty/WatchdogLeakDetector.java | 34 ++++-- 8 files changed, 141 insertions(+), 130 deletions(-) delete mode 100644 servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/LeakRepro.java create mode 100644 servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GcWatchdogLeakDetectorTest.java create mode 100644 servicetalk-grpc-netty/src/test/proto/servicetalkleak.proto delete mode 100644 servicetalk-grpc-netty/src/test/proto/servicetalkloeak.proto diff --git a/servicetalk-grpc-netty/gradle/spotbugs/test-exclusions.xml b/servicetalk-grpc-netty/gradle/spotbugs/test-exclusions.xml index a396334e69..c18d61b816 100644 --- a/servicetalk-grpc-netty/gradle/spotbugs/test-exclusions.xml +++ b/servicetalk-grpc-netty/gradle/spotbugs/test-exclusions.xml @@ -19,16 +19,18 @@ - + - + - + + + diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/LeakRepro.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/LeakRepro.java deleted file mode 100644 index 15bb971707..0000000000 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/LeakRepro.java +++ /dev/null @@ -1,100 +0,0 @@ -package io.servicetalk.grpc; - -import com.apple.servicetalkleak.Message; -import com.apple.servicetalkleak.ServiceTalkLeak; -import io.netty.buffer.ByteBufUtil; -import io.servicetalk.concurrent.api.Publisher; -import io.servicetalk.concurrent.api.Single; -import io.servicetalk.grpc.api.GrpcServiceContext; -import io.servicetalk.grpc.api.GrpcStatusCode; -import io.servicetalk.grpc.api.GrpcStatusException; -import io.servicetalk.grpc.netty.GrpcClients; -import io.servicetalk.grpc.netty.GrpcServers; -import io.servicetalk.http.netty.HttpProtocolConfigs; -import io.servicetalk.http.netty.SpliceFlatStreamToMetaSingle; -import io.servicetalk.logging.api.LogLevel; -import io.servicetalk.transport.api.HostAndPort; -import io.servicetalk.transport.api.IoExecutor; -import io.servicetalk.transport.netty.internal.NettyIoExecutors; -import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static io.servicetalk.concurrent.api.internal.BlockingUtils.blockingInvocation; -import static org.junit.jupiter.api.Assertions.assertFalse; - -public class LeakRepro { - - private static final Logger LOGGER = LoggerFactory.getLogger(LeakRepro.class); - - static boolean leakDetected = false; - - static { - System.setProperty("io.servicetalk.http.netty.leakdetection", "strict"); - System.setProperty("io.netty.leakDetection.level", "paranoid"); - ByteBufUtil.setLeakListener((type, records) -> { - leakDetected = true; - LOGGER.error("ByteBuf leak detected!"); - }); - } - - IoExecutor serverExecutor = NettyIoExecutors.createIoExecutor(1, "server"); - IoExecutor clientExecutor = NettyIoExecutors.createIoExecutor(1, "client"); - - @SuppressWarnings("resource") - @Test - public void testLeak() throws Exception { - GrpcServers.forPort(8888) - .initializeHttp(b -> b - .ioExecutor(serverExecutor) - .executor(serverExecutor)) - .listenAndAwait(new ServiceTalkLeak.ServiceTalkLeakService() { - @Override - public Publisher rpc(GrpcServiceContext ctx, Publisher request) { - Publisher response = splice(request) - .flatMapPublisher(pair -> { - LOGGER.info("Initial message: " + pair.head); - return Publisher.failed(new GrpcStatusException(GrpcStatusCode.INVALID_ARGUMENT.status())); - }); - return response; - } - }); - - ServiceTalkLeak.ServiceTalkLeakClient client = GrpcClients.forAddress(HostAndPort.of("127.0.0.1", 8888)) - .initializeHttp(b -> b - .protocols(HttpProtocolConfigs.h2().enableFrameLogging("CLIENT", LogLevel.INFO, () -> true).build()) - .ioExecutor(clientExecutor) - .executor(clientExecutor)) - .build(new ServiceTalkLeak.ClientFactory()); - - for (int i = 0; i < 10; i++) { - LOGGER.info("Iteration {}", i); - blockingInvocation( - client.rpc( - Publisher.from( - Message.newBuilder().setValue("first message").build(), - Message.newBuilder().setValue("second message (which leaks)").build())) - .ignoreElements() - .onErrorComplete()); - - System.gc(); - System.runFinalization(); - } - - assertFalse(leakDetected); - } - - private static Single splice(Publisher request) { - return request.liftSyncToSingle(new SpliceFlatStreamToMetaSingle<>(Pair::new)); - } - - private static final class Pair { - final Message head; - final Publisher stream; - - public Pair(Message head, Publisher stream) { - this.head = head; - this.stream = stream; - } - } -} \ No newline at end of file diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GcWatchdogLeakDetectorTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GcWatchdogLeakDetectorTest.java new file mode 100644 index 0000000000..e320a43049 --- /dev/null +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GcWatchdogLeakDetectorTest.java @@ -0,0 +1,96 @@ +/* + * Copyright © 2024 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.grpc.netty; + +import io.servicetalk.concurrent.api.Publisher; +import io.servicetalk.concurrent.api.Single; +import io.servicetalk.grpc.api.GrpcServiceContext; +import io.servicetalk.grpc.api.GrpcStatusCode; +import io.servicetalk.grpc.api.GrpcStatusException; +import io.servicetalk.http.netty.SpliceFlatStreamToMetaSingle; +import io.servicetalk.leak.LeakMessage; +import io.servicetalk.leak.Leaker; +import io.servicetalk.transport.api.HostAndPort; + +import io.netty.buffer.ByteBufUtil; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static io.servicetalk.concurrent.api.internal.BlockingUtils.blockingInvocation; +import static org.junit.jupiter.api.Assertions.assertFalse; + +final class GcWatchdogLeakDetectorTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(GcWatchdogLeakDetectorTest.class); + + private static boolean leakDetected; + + static { + System.setProperty("io.servicetalk.http.netty.leakdetection", "strict"); + System.setProperty("io.netty.leakDetection.level", "paranoid"); + ByteBufUtil.setLeakListener((type, records) -> { + leakDetected = true; + LOGGER.error("ByteBuf leak detected!"); + }); + } + + @Test + void testLeak() throws Exception { + GrpcServers.forPort(8888) + .listenAndAwait(new Leaker.LeakerService() { + @Override + public Publisher rpc(GrpcServiceContext ctx, Publisher request) { + Publisher response = splice(request) + .flatMapPublisher(pair -> Publisher.failed( + new GrpcStatusException(GrpcStatusCode.INVALID_ARGUMENT.status()))); + return response; + } + }); + + Leaker.LeakerClient client = GrpcClients.forAddress(HostAndPort.of("localhost", 8888)) + .build(new Leaker.ClientFactory()); + + for (int i = 0; i < 10; i++) { + blockingInvocation( + client.rpc( + Publisher.from( + LeakMessage.newBuilder().setValue("first LeakMessage").build(), + LeakMessage.newBuilder().setValue("second LeakMessage (which leaks)").build())) + .ignoreElements() + .onErrorComplete()); + + System.gc(); + System.runFinalization(); + } + + assertFalse(leakDetected); + } + + private static Single splice(Publisher request) { + return request.liftSyncToSingle(new SpliceFlatStreamToMetaSingle<>(Pair::new)); + } + + private static final class Pair { + final LeakMessage head; + final Publisher stream; + + Pair(LeakMessage head, Publisher stream) { + this.head = head; + this.stream = stream; + } + } +} diff --git a/servicetalk-grpc-netty/src/test/proto/servicetalkleak.proto b/servicetalk-grpc-netty/src/test/proto/servicetalkleak.proto new file mode 100644 index 0000000000..c6c3e39b4a --- /dev/null +++ b/servicetalk-grpc-netty/src/test/proto/servicetalkleak.proto @@ -0,0 +1,13 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_outer_classname = "ServiceTalkLeak"; +option java_package = "io.servicetalk.leak"; + +message LeakMessage { + string value = 1; +} + +service Leaker { + rpc Rpc(stream LeakMessage) returns (stream LeakMessage); +} diff --git a/servicetalk-grpc-netty/src/test/proto/servicetalkloeak.proto b/servicetalk-grpc-netty/src/test/proto/servicetalkloeak.proto deleted file mode 100644 index e17934d3d7..0000000000 --- a/servicetalk-grpc-netty/src/test/proto/servicetalkloeak.proto +++ /dev/null @@ -1,12 +0,0 @@ -syntax = "proto3"; - -option java_multiple_files = true; -option java_package = "com.apple.servicetalkleak"; - -message Message { - string value = 1; -} - -service ServiceTalkLeak { - rpc Rpc(stream Message) returns (stream Message); -} diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java index 2e38eace0f..1f4903f547 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java @@ -74,9 +74,9 @@ public Single request(final StreamingHttpRequest request) private Single requestStrict(final StreamingHttpRequest request) { return delegate().request(request.transformMessageBody(publisher -> - WatchdogLeakDetector.instrument(publisher, REQUEST_LEAK_MESSAGE))) + WatchdogLeakDetector.gcLeakDetection(publisher, REQUEST_LEAK_MESSAGE))) .map(response -> response.transformMessageBody(publisher -> - WatchdogLeakDetector.instrument(publisher, RESPONSE_LEAK_MESSAGE))); + WatchdogLeakDetector.gcLeakDetection(publisher, RESPONSE_LEAK_MESSAGE))); } private Single requestSimple(final StreamingHttpRequest request) { diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java index c54389215c..5a737c33c3 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java @@ -87,9 +87,9 @@ private Single handleStrict(final HttpServiceContext ctx, final StreamingHttpResponseFactory responseFactory) { return delegate() .handle(ctx, request.transformMessageBody(publisher -> - WatchdogLeakDetector.instrument(publisher, REQUEST_LEAK_MESSAGE)), responseFactory) + WatchdogLeakDetector.gcLeakDetection(publisher, REQUEST_LEAK_MESSAGE)), responseFactory) .map(response -> response.transformMessageBody(publisher -> - WatchdogLeakDetector.instrument(publisher, RESPONSE_LEAK_MESSAGE))); + WatchdogLeakDetector.gcLeakDetection(publisher, RESPONSE_LEAK_MESSAGE))); } private Single handleSimple(final HttpServiceContext ctx, diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java index 4ab0e70feb..2d82db55cc 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java @@ -1,3 +1,18 @@ +/* + * Copyright © 2024 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.servicetalk.http.netty; import io.servicetalk.concurrent.Cancellable; @@ -7,16 +22,17 @@ import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.SourceAdapters; import io.servicetalk.concurrent.internal.CancelImmediatelySubscriber; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; import java.lang.ref.Reference; import java.lang.ref.ReferenceQueue; import java.lang.ref.WeakReference; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import javax.annotation.Nullable; final class WatchdogLeakDetector { @@ -54,15 +70,15 @@ private WatchdogLeakDetector() { // Singleton. } - static Publisher instrument(Publisher publisher, String message) { - return INSTANCE.instrument0(publisher, message); + static Publisher gcLeakDetection(Publisher publisher, String message) { + return INSTANCE.gcLeakDetection0(publisher, message); } static boolean strictDetection() { return STRICT_DETECTION; } - private Publisher instrument0(Publisher publisher, String message) { + private Publisher gcLeakDetection0(Publisher publisher, String message) { maybeCleanRefs(); CleanupState cleanupState = new CleanupState(publisher, message); Publisher result = publisher.liftSync(subscriber -> new InstrumentedSubscriber<>(subscriber, cleanupState)); @@ -87,8 +103,6 @@ private void maybeCleanRefs() { } while ((ref = refQueue.poll()) != null); }); } - - } private static final class InstrumentedSubscriber implements Subscriber { @@ -96,7 +110,7 @@ private static final class InstrumentedSubscriber implements Subscriber { private final Subscriber delegate; private final CleanupState cleanupToken; - public InstrumentedSubscriber(Subscriber delegate, CleanupState cleanupToken) { + InstrumentedSubscriber(Subscriber delegate, CleanupState cleanupToken) { this.delegate = delegate; this.cleanupToken = cleanupToken; } @@ -134,20 +148,18 @@ public void onComplete() { cleanupToken.doComplete(); delegate.onComplete(); } - - } private static final class CleanupState { private static final AtomicReferenceFieldUpdater UPDATER = - AtomicReferenceFieldUpdater.newUpdater(CleanupState.class, Object.class,"state"); + AtomicReferenceFieldUpdater.newUpdater(CleanupState.class, Object.class, "state"); private static final String COMPLETE = "complete"; private final String message; volatile Object state; - public CleanupState(Publisher parent, String message) { + CleanupState(Publisher parent, String message) { this.message = message; this.state = parent; } From 65d143b34c4bdc2f35ceea29c8c50d7c2336ea7f Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Tue, 14 Jan 2025 15:33:45 -0700 Subject: [PATCH 04/10] Filter refactoring --- .../netty/GcWatchdogLeakDetectorTest.java | 96 ---------- ...ttpMessageDiscardWatchdogClientFilter.java | 145 +++++++++------ ...tpMessageDiscardWatchdogServiceFilter.java | 167 ++++++++++-------- .../http/netty/WatchdogLeakDetector.java | 6 + .../http/netty/WatchdogLeakDetectorTest.java | 135 ++++++++++++++ 5 files changed, 333 insertions(+), 216 deletions(-) delete mode 100644 servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GcWatchdogLeakDetectorTest.java create mode 100644 servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/WatchdogLeakDetectorTest.java diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GcWatchdogLeakDetectorTest.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GcWatchdogLeakDetectorTest.java deleted file mode 100644 index e320a43049..0000000000 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/GcWatchdogLeakDetectorTest.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright © 2024 Apple Inc. and the ServiceTalk project authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.servicetalk.grpc.netty; - -import io.servicetalk.concurrent.api.Publisher; -import io.servicetalk.concurrent.api.Single; -import io.servicetalk.grpc.api.GrpcServiceContext; -import io.servicetalk.grpc.api.GrpcStatusCode; -import io.servicetalk.grpc.api.GrpcStatusException; -import io.servicetalk.http.netty.SpliceFlatStreamToMetaSingle; -import io.servicetalk.leak.LeakMessage; -import io.servicetalk.leak.Leaker; -import io.servicetalk.transport.api.HostAndPort; - -import io.netty.buffer.ByteBufUtil; -import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static io.servicetalk.concurrent.api.internal.BlockingUtils.blockingInvocation; -import static org.junit.jupiter.api.Assertions.assertFalse; - -final class GcWatchdogLeakDetectorTest { - - private static final Logger LOGGER = LoggerFactory.getLogger(GcWatchdogLeakDetectorTest.class); - - private static boolean leakDetected; - - static { - System.setProperty("io.servicetalk.http.netty.leakdetection", "strict"); - System.setProperty("io.netty.leakDetection.level", "paranoid"); - ByteBufUtil.setLeakListener((type, records) -> { - leakDetected = true; - LOGGER.error("ByteBuf leak detected!"); - }); - } - - @Test - void testLeak() throws Exception { - GrpcServers.forPort(8888) - .listenAndAwait(new Leaker.LeakerService() { - @Override - public Publisher rpc(GrpcServiceContext ctx, Publisher request) { - Publisher response = splice(request) - .flatMapPublisher(pair -> Publisher.failed( - new GrpcStatusException(GrpcStatusCode.INVALID_ARGUMENT.status()))); - return response; - } - }); - - Leaker.LeakerClient client = GrpcClients.forAddress(HostAndPort.of("localhost", 8888)) - .build(new Leaker.ClientFactory()); - - for (int i = 0; i < 10; i++) { - blockingInvocation( - client.rpc( - Publisher.from( - LeakMessage.newBuilder().setValue("first LeakMessage").build(), - LeakMessage.newBuilder().setValue("second LeakMessage (which leaks)").build())) - .ignoreElements() - .onErrorComplete()); - - System.gc(); - System.runFinalization(); - } - - assertFalse(leakDetected); - } - - private static Single splice(Publisher request) { - return request.liftSyncToSingle(new SpliceFlatStreamToMetaSingle<>(Pair::new)); - } - - private static final class Pair { - final LeakMessage head; - final Publisher stream; - - Pair(LeakMessage head, Publisher stream) { - this.head = head; - this.stream = stream; - } - } -} diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java index 1f4903f547..198e847fc1 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java @@ -35,77 +35,42 @@ import java.util.concurrent.atomic.AtomicReference; -import static io.servicetalk.http.netty.HttpMessageDiscardWatchdogServiceFilter.generifyAtomicReference; import static io.servicetalk.http.netty.WatchdogLeakDetector.REQUEST_LEAK_MESSAGE; import static io.servicetalk.http.netty.WatchdogLeakDetector.RESPONSE_LEAK_MESSAGE; /** * Filter which tracks message bodies and warns if they are not discarded properly. */ -final class HttpMessageDiscardWatchdogClientFilter implements StreamingHttpConnectionFilterFactory { +final class HttpMessageDiscardWatchdogClientFilter { private static final ContextMap.Key>> MESSAGE_PUBLISHER_KEY = ContextMap.Key .newKey(HttpMessageDiscardWatchdogClientFilter.class.getName() + ".messagePublisher", - generifyAtomicReference()); + WatchdogLeakDetector.generifyAtomicReference()); private static final Logger LOGGER = LoggerFactory.getLogger(HttpMessageDiscardWatchdogClientFilter.class); /** * Instance of {@link HttpMessageDiscardWatchdogClientFilter}. */ - static final HttpMessageDiscardWatchdogClientFilter INSTANCE = new HttpMessageDiscardWatchdogClientFilter(); + static final StreamingHttpConnectionFilterFactory INSTANCE; /** * Instance of {@link StreamingHttpClientFilterFactory} with the cleaner implementation. */ - static final StreamingHttpClientFilterFactory CLIENT_CLEANER = new CleanerStreamingHttpClientFilterFactory(); - - private HttpMessageDiscardWatchdogClientFilter() { - // Singleton - } - - @Override - public StreamingHttpConnectionFilter create(final FilterableStreamingHttpConnection connection) { - return new StreamingHttpConnectionFilter(connection) { - @Override - public Single request(final StreamingHttpRequest request) { - return WatchdogLeakDetector.strictDetection() ? requestStrict(request) : requestSimple(request); - } - - private Single requestStrict(final StreamingHttpRequest request) { - return delegate().request(request.transformMessageBody(publisher -> - WatchdogLeakDetector.gcLeakDetection(publisher, REQUEST_LEAK_MESSAGE))) - .map(response -> response.transformMessageBody(publisher -> - WatchdogLeakDetector.gcLeakDetection(publisher, RESPONSE_LEAK_MESSAGE))); - } - - private Single requestSimple(final StreamingHttpRequest request) { - return delegate().request(request).map(response -> { - // always write the buffer publisher into the request context. When a downstream subscriber - // arrives, mark the message as subscribed explicitly (having a message present and no - // subscription is an indicator that it must be freed later on). - final AtomicReference> reference = request.context() - .computeIfAbsent(MESSAGE_PUBLISHER_KEY, key -> new AtomicReference<>()); - assert reference != null; - if (reference.getAndSet(response.messageBody()) != null) { - // If a previous message exists, the Single got resubscribed to - // (i.e. during a retry) and so previous message body needs to be cleaned up by the - // user. - LOGGER.warn(RESPONSE_LEAK_MESSAGE); - } - - return response.transformMessageBody(msgPublisher -> msgPublisher.beforeSubscriber(() -> { - reference.set(null); - return HttpMessageDiscardWatchdogServiceFilter.NoopSubscriber.INSTANCE; - })); - }); - } - }; + static final StreamingHttpClientFilterFactory CLIENT_CLEANER; + + static { + if (WatchdogLeakDetector.strictDetection()) { + INSTANCE = new GcHttpMessageDiscardWatchdogClientFilter(); + CLIENT_CLEANER = new NoopCleaner(); + } else { + INSTANCE = new ContextHttpMessageDiscardWatchdogClientFilter(); + CLIENT_CLEANER = new CleanerStreamingHttpClientFilterFactory(); + } } - @Override - public HttpExecutionStrategy requiredOffloads() { - return HttpExecutionStrategies.offloadNone(); + private HttpMessageDiscardWatchdogClientFilter() { + // No instances } private static final class CleanerStreamingHttpClientFilterFactory implements StreamingHttpClientFilterFactory { @@ -134,4 +99,84 @@ public HttpExecutionStrategy requiredOffloads() { return HttpExecutionStrategies.offloadNone(); } } + + private static final class ContextHttpMessageDiscardWatchdogClientFilter implements StreamingHttpConnectionFilterFactory { + + @Override + public StreamingHttpConnectionFilter create(final FilterableStreamingHttpConnection connection) { + return new StreamingHttpConnectionFilter(connection) { + @Override + public Single request(final StreamingHttpRequest request) { + return WatchdogLeakDetector.strictDetection() ? requestStrict(request) : requestSimple(request); + } + + private Single requestStrict(final StreamingHttpRequest request) { + return delegate().request(request.transformMessageBody(publisher -> + WatchdogLeakDetector.gcLeakDetection(publisher, REQUEST_LEAK_MESSAGE))) + .map(response -> response.transformMessageBody(publisher -> + WatchdogLeakDetector.gcLeakDetection(publisher, RESPONSE_LEAK_MESSAGE))); + } + + private Single requestSimple(final StreamingHttpRequest request) { + return delegate().request(request).map(response -> { + // always write the buffer publisher into the request context. When a downstream subscriber + // arrives, mark the message as subscribed explicitly (having a message present and no + // subscription is an indicator that it must be freed later on). + final AtomicReference> reference = request.context() + .computeIfAbsent(MESSAGE_PUBLISHER_KEY, key -> new AtomicReference<>()); + assert reference != null; + if (reference.getAndSet(response.messageBody()) != null) { + // If a previous message exists, the Single got resubscribed to + // (i.e. during a retry) and so previous message body needs to be cleaned up by the + // user. + LOGGER.warn(RESPONSE_LEAK_MESSAGE); + } + + return response.transformMessageBody(msgPublisher -> msgPublisher.beforeSubscriber(() -> { + reference.set(null); + return HttpMessageDiscardWatchdogServiceFilter.NoopSubscriber.INSTANCE; + })); + }); + } + }; + } + + @Override + public HttpExecutionStrategy requiredOffloads() { + return HttpExecutionStrategies.offloadNone(); + } + } + + private static final class GcHttpMessageDiscardWatchdogClientFilter implements StreamingHttpConnectionFilterFactory { + + @Override + public StreamingHttpConnectionFilter create(FilterableStreamingHttpConnection connection) { + return new StreamingHttpConnectionFilter(connection) { + @Override + public Single request(final StreamingHttpRequest request) { + return delegate().request(request.transformMessageBody(publisher -> + WatchdogLeakDetector.gcLeakDetection(publisher, REQUEST_LEAK_MESSAGE))) + .map(response -> response.transformMessageBody(publisher -> + WatchdogLeakDetector.gcLeakDetection(publisher, RESPONSE_LEAK_MESSAGE))); + } + }; + } + + @Override + public HttpExecutionStrategy requiredOffloads() { + return HttpExecutionStrategies.offloadNone(); + } + } + + private static final class NoopCleaner implements StreamingHttpClientFilterFactory { + @Override + public StreamingHttpClientFilter create(FilterableStreamingHttpClient client) { + return new StreamingHttpClientFilter(client) {}; + } + + @Override + public HttpExecutionStrategy requiredOffloads() { + return HttpExecutionStrategies.offloadNone(); + } + } } diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java index 5a737c33c3..839bd6563e 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java @@ -39,95 +39,43 @@ import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; +import static io.servicetalk.http.api.HttpExecutionStrategies.offloadNone; import static io.servicetalk.http.netty.WatchdogLeakDetector.REQUEST_LEAK_MESSAGE; import static io.servicetalk.http.netty.WatchdogLeakDetector.RESPONSE_LEAK_MESSAGE; /** * Filter which tracks message bodies and warns if they are not discarded properly. */ -final class HttpMessageDiscardWatchdogServiceFilter implements StreamingHttpServiceFilterFactory { +final class HttpMessageDiscardWatchdogServiceFilter { private static final Logger LOGGER = LoggerFactory.getLogger(HttpMessageDiscardWatchdogServiceFilter.class); + private static final ContextMap.Key>> MESSAGE_PUBLISHER_KEY = ContextMap.Key + .newKey(HttpMessageDiscardWatchdogServiceFilter.class.getName() + ".messagePublisher", + WatchdogLeakDetector.generifyAtomicReference()); /** * Instance of {@link HttpMessageDiscardWatchdogServiceFilter}. */ - static final StreamingHttpServiceFilterFactory INSTANCE = new HttpMessageDiscardWatchdogServiceFilter(); + static final StreamingHttpServiceFilterFactory INSTANCE; /** * Instance of {@link HttpLifecycleObserverServiceFilter} with the cleaner implementation. */ - static final StreamingHttpServiceFilterFactory CLEANER = - new HttpLifecycleObserverServiceFilter(new CleanerHttpLifecycleObserver()); - - private static final ContextMap.Key>> MESSAGE_PUBLISHER_KEY = ContextMap.Key - .newKey(HttpMessageDiscardWatchdogServiceFilter.class.getName() + ".messagePublisher", - generifyAtomicReference()); - - private HttpMessageDiscardWatchdogServiceFilter() { - // Singleton - } - - @Override - public StreamingHttpServiceFilter create(final StreamingHttpService service) { - - return new StreamingHttpServiceFilter(service) { - - @Override - public Single handle(final HttpServiceContext ctx, - final StreamingHttpRequest request, - final StreamingHttpResponseFactory responseFactory) { - return WatchdogLeakDetector.strictDetection() ? - handleStrict(ctx, request, responseFactory) : handleSimple(ctx, request, responseFactory); - } - - private Single handleStrict(final HttpServiceContext ctx, - final StreamingHttpRequest request, - final StreamingHttpResponseFactory responseFactory) { - return delegate() - .handle(ctx, request.transformMessageBody(publisher -> - WatchdogLeakDetector.gcLeakDetection(publisher, REQUEST_LEAK_MESSAGE)), responseFactory) - .map(response -> response.transformMessageBody(publisher -> - WatchdogLeakDetector.gcLeakDetection(publisher, RESPONSE_LEAK_MESSAGE))); - } - - private Single handleSimple(final HttpServiceContext ctx, - final StreamingHttpRequest request, - final StreamingHttpResponseFactory responseFactory) { - return delegate() - .handle(ctx, request, responseFactory) - .map(response -> { - // always write the buffer publisher into the request context. When a downstream subscriber - // arrives, mark the message as subscribed explicitly (having a message present and no - // subscription is an indicator that it must be freed later on). - final AtomicReference> reference = request.context() - .computeIfAbsent(MESSAGE_PUBLISHER_KEY, key -> new AtomicReference<>()); - assert reference != null; - if (reference.getAndSet(response.messageBody()) != null) { - // If a previous message exists, the Single got resubscribed to - // (i.e. during a retry) and so previous message body needs to be cleaned up by the - // user. - LOGGER.warn(RESPONSE_LEAK_MESSAGE); - } - - return response.transformMessageBody(msgPublisher -> msgPublisher.beforeSubscriber(() -> { - reference.set(null); - return NoopSubscriber.INSTANCE; - })); - }); - } - }; - } - - @Override - public HttpExecutionStrategy requiredOffloads() { - return HttpExecutionStrategies.offloadNone(); + static final StreamingHttpServiceFilterFactory CLEANER; + + static { + if (WatchdogLeakDetector.strictDetection()) { + INSTANCE = new GcHttpMessageWatchdogServiceFilter(); + CLEANER = new NoopFilterFactory(); + } else { + INSTANCE = new ContextHttpMessageDiscardWatchdogServiceFilter(); + CLEANER = new HttpLifecycleObserverServiceFilter(new CleanerHttpLifecycleObserver()); + } } - @SuppressWarnings("unchecked") - static Class generifyAtomicReference() { - return (Class) AtomicReference.class; + private HttpMessageDiscardWatchdogServiceFilter() { + // no instances } static final class NoopSubscriber implements PublisherSource.Subscriber { @@ -211,4 +159,83 @@ public void onResponseCancel() { }; } } + + private static final class GcHttpMessageWatchdogServiceFilter implements StreamingHttpServiceFilterFactory { + @Override + public StreamingHttpServiceFilter create(StreamingHttpService service) { + return new StreamingHttpServiceFilter(service) { + @Override + public Single handle(HttpServiceContext ctx, StreamingHttpRequest request, + StreamingHttpResponseFactory responseFactory) { + return delegate() + .handle(ctx, request.transformMessageBody(publisher -> + WatchdogLeakDetector.gcLeakDetection(publisher, REQUEST_LEAK_MESSAGE)), responseFactory) + .map(response -> response.transformMessageBody(publisher -> + WatchdogLeakDetector.gcLeakDetection(publisher, RESPONSE_LEAK_MESSAGE))); + } + }; + } + + @Override + public HttpExecutionStrategy requiredOffloads() { + return HttpExecutionStrategies.offloadNone(); + } + } + + private static final class ContextHttpMessageDiscardWatchdogServiceFilter implements StreamingHttpServiceFilterFactory { + @Override + public HttpExecutionStrategy requiredOffloads() { + return HttpExecutionStrategies.offloadNone(); + } + + @Override + public StreamingHttpServiceFilter create(final StreamingHttpService service) { + + return new StreamingHttpServiceFilter(service) { + + @Override + public Single handle(final HttpServiceContext ctx, + final StreamingHttpRequest request, + final StreamingHttpResponseFactory responseFactory) { + return delegate() + .handle(ctx, request, responseFactory) + .map(response -> { + // always write the buffer publisher into the request context. When a downstream subscriber + // arrives, mark the message as subscribed explicitly (having a message present and no + // subscription is an indicator that it must be freed later on). + final AtomicReference> reference = request.context() + .computeIfAbsent(MESSAGE_PUBLISHER_KEY, key -> new AtomicReference<>()); + assert reference != null; + if (reference.getAndSet(response.messageBody()) != null) { + // If a previous message exists, the Single got resubscribed to + // (i.e. during a retry) and so previous message body needs to be cleaned up by the + // user. + LOGGER.warn(RESPONSE_LEAK_MESSAGE); + } + + return response.transformMessageBody(msgPublisher -> msgPublisher.beforeSubscriber(() -> { + reference.set(null); + return NoopSubscriber.INSTANCE; + })); + }); + } + }; + } + } + + private static final class NoopFilterFactory implements StreamingHttpServiceFilterFactory { + + private NoopFilterFactory() { + // singleton + } + @Override + public StreamingHttpServiceFilter create(StreamingHttpService service) { + return new StreamingHttpServiceFilter(service); + } + + @Override + public HttpExecutionStrategy requiredOffloads() { + return offloadNone(); + } + } } diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java index 2d82db55cc..07801b1469 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java @@ -31,6 +31,7 @@ import java.lang.ref.WeakReference; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.annotation.Nullable; @@ -78,6 +79,11 @@ static boolean strictDetection() { return STRICT_DETECTION; } + @SuppressWarnings("unchecked") + static Class generifyAtomicReference() { + return (Class) AtomicReference.class; + } + private Publisher gcLeakDetection0(Publisher publisher, String message) { maybeCleanRefs(); CleanupState cleanupState = new CleanupState(publisher, message); diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/WatchdogLeakDetectorTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/WatchdogLeakDetectorTest.java new file mode 100644 index 0000000000..b48217f05f --- /dev/null +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/WatchdogLeakDetectorTest.java @@ -0,0 +1,135 @@ +/* + * Copyright © 2024 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.http.netty; + +import io.netty.buffer.ByteBufUtil; +import io.servicetalk.buffer.api.Buffer; +import io.servicetalk.buffer.netty.BufferAllocators; +import io.servicetalk.concurrent.PublisherSource; +import io.servicetalk.concurrent.api.Publisher; +import io.servicetalk.concurrent.api.Single; +import io.servicetalk.concurrent.api.SourceAdapters; +import io.servicetalk.http.api.HttpClient; +import io.servicetalk.http.api.HttpProtocolConfig; +import io.servicetalk.http.api.HttpResponse; +import io.servicetalk.http.api.HttpResponseStatus; +import io.servicetalk.http.api.HttpServerContext; +import io.servicetalk.http.api.StreamingHttpClient; +import io.servicetalk.http.api.StreamingHttpResponse; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.net.InetSocketAddress; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertFalse; + +final class WatchdogLeakDetectorTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(WatchdogLeakDetectorTest.class); + private static final int ITERATIONS = 10; + + private static boolean leakDetected; + + static { + System.setProperty("io.servicetalk.http.netty.leakdetection", "strict"); + System.setProperty("io.netty.leakDetection.level", "paranoid"); + ByteBufUtil.setLeakListener((type, records) -> { + leakDetected = true; + LOGGER.error("ByteBuf leak detected!"); + }); + } + + @Test + void orphanedServiceRequestPublisher() throws Exception { + // TODO: this fails with HTTP/1.1 as we hang on the `client.request(..).toFuture().get()` call. + HttpProtocolConfig config = HttpProtocolConfigs.h2Default(); + try (HttpServerContext serverContext = HttpServers.forPort(0) + .protocols(config) + .listenStreamingAndAwait((ctx, request, responseFactory) -> { + abandon(request.messageBody()); + return Single.succeeded(responseFactory.ok()); + })) { + + try (HttpClient client = HttpClients.forSingleAddress("localhost", + ((InetSocketAddress) serverContext.listenAddress()).getPort()).protocols(config).build()) { + for (int i = 0; i < ITERATIONS && !leakDetected; i++) { + HttpResponse response = client.request(client.post("/foo") + .payloadBody(payload())).toFuture().get(); + assertThat(response.status(), equalTo(HttpResponseStatus.OK)); + + System.gc(); + System.runFinalization(); + } + } + } + assertFalse(leakDetected); + } + + @Test + void orphanClientResponsePublisher() throws Exception { + // TODO: this succeeds with or without strict detection. + HttpProtocolConfig config = HttpProtocolConfigs.h2Default(); + try (HttpServerContext serverContext = HttpServers.forPort(0) + .protocols(config) + .listenAndAwait((ctx, request, responseFactory) -> + Single.succeeded(responseFactory.ok().payloadBody(payload())))) { + try (StreamingHttpClient client = HttpClients.forSingleAddress("localhost", + ((InetSocketAddress) serverContext.listenAddress()).getPort()). + protocols(config).build().asStreamingClient()) { + for (int i = 0; i < ITERATIONS && !leakDetected; i++) { + StreamingHttpResponse response = client.request(client.get("/foo")).toFuture().get(); + assertThat(response.status(), equalTo(HttpResponseStatus.OK)); + abandon(response.messageBody()); + response = null; + + System.gc(); + System.runFinalization(); + } + } + } + assertFalse(leakDetected); + } + + private static Buffer payload() { + return BufferAllocators.DEFAULT_ALLOCATOR.fromAscii("Hello, world!"); + } + + + private static void abandon(Publisher messageBody) { + SourceAdapters.toSource(messageBody).subscribe(new PublisherSource.Subscriber() { + @Override + public void onSubscribe(PublisherSource.Subscription subscription) { + } + + @Override + public void onNext(@Nullable Object o) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + }); + } +} From df3b01f2898c9b8cac2c7eec2305d29ab9de5986 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Tue, 14 Jan 2025 15:49:13 -0700 Subject: [PATCH 05/10] Whitespace --- .../java/io/servicetalk/http/netty/WatchdogLeakDetectorTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/WatchdogLeakDetectorTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/WatchdogLeakDetectorTest.java index b48217f05f..4c9aa3b15d 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/WatchdogLeakDetectorTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/WatchdogLeakDetectorTest.java @@ -67,7 +67,6 @@ void orphanedServiceRequestPublisher() throws Exception { abandon(request.messageBody()); return Single.succeeded(responseFactory.ok()); })) { - try (HttpClient client = HttpClients.forSingleAddress("localhost", ((InetSocketAddress) serverContext.listenAddress()).getPort()).protocols(config).build()) { for (int i = 0; i < ITERATIONS && !leakDetected; i++) { From 51fa178093e5501b636014710be7260cb1db4e52 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Tue, 14 Jan 2025 16:17:25 -0700 Subject: [PATCH 06/10] Some cleanup --- .../servicetalk/http/netty/WatchdogLeakDetectorTest.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/WatchdogLeakDetectorTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/WatchdogLeakDetectorTest.java index 4c9aa3b15d..641bd06518 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/WatchdogLeakDetectorTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/WatchdogLeakDetectorTest.java @@ -15,7 +15,6 @@ */ package io.servicetalk.http.netty; -import io.netty.buffer.ByteBufUtil; import io.servicetalk.buffer.api.Buffer; import io.servicetalk.buffer.netty.BufferAllocators; import io.servicetalk.concurrent.PublisherSource; @@ -29,13 +28,14 @@ import io.servicetalk.http.api.HttpServerContext; import io.servicetalk.http.api.StreamingHttpClient; import io.servicetalk.http.api.StreamingHttpResponse; + +import io.netty.buffer.ByteBufUtil; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; - import java.net.InetSocketAddress; +import javax.annotation.Nullable; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; @@ -111,7 +111,6 @@ private static Buffer payload() { return BufferAllocators.DEFAULT_ALLOCATOR.fromAscii("Hello, world!"); } - private static void abandon(Publisher messageBody) { SourceAdapters.toSource(messageBody).subscribe(new PublisherSource.Subscriber() { @Override From aa9763d36b952ded40723fd89c75df89234eb23b Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Tue, 14 Jan 2025 16:29:16 -0700 Subject: [PATCH 07/10] Even more cleanup --- .../gradle/spotbugs/test-exclusions.xml | 8 +++--- .../src/test/proto/servicetalkleak.proto | 13 ---------- .../gradle/spotbugs/test-exclusions.xml | 4 +++ ...ttpMessageDiscardWatchdogClientFilter.java | 14 ++++++----- ...tpMessageDiscardWatchdogServiceFilter.java | 25 +++++++++++-------- .../netty/SpliceFlatStreamToMetaSingle.java | 5 ++-- 6 files changed, 32 insertions(+), 37 deletions(-) delete mode 100644 servicetalk-grpc-netty/src/test/proto/servicetalkleak.proto diff --git a/servicetalk-grpc-netty/gradle/spotbugs/test-exclusions.xml b/servicetalk-grpc-netty/gradle/spotbugs/test-exclusions.xml index c18d61b816..a396334e69 100644 --- a/servicetalk-grpc-netty/gradle/spotbugs/test-exclusions.xml +++ b/servicetalk-grpc-netty/gradle/spotbugs/test-exclusions.xml @@ -19,18 +19,16 @@ - + - + + - - - diff --git a/servicetalk-grpc-netty/src/test/proto/servicetalkleak.proto b/servicetalk-grpc-netty/src/test/proto/servicetalkleak.proto deleted file mode 100644 index c6c3e39b4a..0000000000 --- a/servicetalk-grpc-netty/src/test/proto/servicetalkleak.proto +++ /dev/null @@ -1,13 +0,0 @@ -syntax = "proto3"; - -option java_multiple_files = true; -option java_outer_classname = "ServiceTalkLeak"; -option java_package = "io.servicetalk.leak"; - -message LeakMessage { - string value = 1; -} - -service Leaker { - rpc Rpc(stream LeakMessage) returns (stream LeakMessage); -} diff --git a/servicetalk-http-netty/gradle/spotbugs/test-exclusions.xml b/servicetalk-http-netty/gradle/spotbugs/test-exclusions.xml index e3a8d42e02..cf24eaa958 100644 --- a/servicetalk-http-netty/gradle/spotbugs/test-exclusions.xml +++ b/servicetalk-http-netty/gradle/spotbugs/test-exclusions.xml @@ -79,4 +79,8 @@ + + + + diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java index 9fdf4a83db..059c15c777 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogClientFilter.java @@ -101,7 +101,8 @@ public HttpExecutionStrategy requiredOffloads() { } } - private static final class ContextHttpMessageDiscardWatchdogClientFilter implements StreamingHttpConnectionFilterFactory { + private static final class ContextHttpMessageDiscardWatchdogClientFilter + implements StreamingHttpConnectionFilterFactory { @Override public StreamingHttpConnectionFilter create(final FilterableStreamingHttpConnection connection) { @@ -140,27 +141,28 @@ public HttpExecutionStrategy requiredOffloads() { } } - private static final class GcHttpMessageDiscardWatchdogClientFilter implements StreamingHttpConnectionFilterFactory { + private static final class GcHttpMessageDiscardWatchdogClientFilter + implements StreamingHttpConnectionFilterFactory { @Override public StreamingHttpConnectionFilter create(FilterableStreamingHttpConnection connection) { return new StreamingHttpConnectionFilter(connection) { @Override - public Single request(final StreamingHttpRequest request) { + public Single request(final StreamingHttpRequest request) { return delegate().request(request.transformMessageBody(publisher -> WatchdogLeakDetector.gcLeakDetection(publisher, this::onRequestLeak))) .map(response -> response.transformMessageBody(publisher -> WatchdogLeakDetector.gcLeakDetection(publisher, this::onResponseLeak))); } - void onRequestLeak() { + private void onRequestLeak() { LOGGER.warn("Discovered un-drained HTTP request message body which has " + "been dropped by user code - this is a strong indication of a bug " + "in a user-defined filter. The request payload (message) body must " + "be fully consumed. connectionInfo={}", connectionContext()); } - void onResponseLeak() { + private void onResponseLeak() { LOGGER.warn("Discovered un-drained HTTP response message body which has " + "been dropped by user code - this is a strong indication of a bug " + "in a user-defined filter. Response payload (message) body must " + @@ -178,7 +180,7 @@ public HttpExecutionStrategy requiredOffloads() { private static final class NoopCleaner implements StreamingHttpClientFilterFactory { @Override public StreamingHttpClientFilter create(FilterableStreamingHttpClient client) { - return new StreamingHttpClientFilter(client) {}; + return new StreamingHttpClientFilter(client) { }; } @Override diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java index 4640c61470..c2f70567c2 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java @@ -179,9 +179,11 @@ public Single handle(HttpServiceContext ctx, StreamingHtt StreamingHttpResponseFactory responseFactory) { return delegate() .handle(ctx, request.transformMessageBody(publisher -> - WatchdogLeakDetector.gcLeakDetection(publisher, () -> LOGGER.error(REQUEST_LEAK_MESSAGE))), responseFactory) + WatchdogLeakDetector.gcLeakDetection(publisher, + () -> LOGGER.error(REQUEST_LEAK_MESSAGE))), responseFactory) .map(response -> response.transformMessageBody(publisher -> - WatchdogLeakDetector.gcLeakDetection(publisher, () -> LOGGER.warn(RESPONSE_LEAK_MESSAGE)))); + WatchdogLeakDetector.gcLeakDetection(publisher, + () -> LOGGER.warn(RESPONSE_LEAK_MESSAGE)))); } }; } @@ -192,7 +194,8 @@ public HttpExecutionStrategy requiredOffloads() { } } - private static final class ContextHttpMessageDiscardWatchdogServiceFilter implements StreamingHttpServiceFilterFactory { + private static final class ContextHttpMessageDiscardWatchdogServiceFilter + implements StreamingHttpServiceFilterFactory { @Override public HttpExecutionStrategy requiredOffloads() { return HttpExecutionStrategies.offloadNone(); @@ -210,20 +213,21 @@ public Single handle(final HttpServiceContext ctx, return delegate() .handle(ctx, request, responseFactory) .map(response -> { - // always write the buffer publisher into the request context. When a downstream subscriber - // arrives, mark the message as subscribed explicitly (having a message present and no - // subscription is an indicator that it must be freed later on). + // always write the buffer publisher into the request context. When a downstream + // subscriber arrives, mark the message as subscribed explicitly (having a message + // present and no subscription is an indicator that it must be freed later on). final AtomicReference> reference = request.context() .computeIfAbsent(MESSAGE_PUBLISHER_KEY, key -> new AtomicReference<>()); assert reference != null; if (reference.getAndSet(response.messageBody()) != null) { - // If a previous message exists, the Single got resubscribed to - // (i.e. during a retry) and so previous message body needs to be cleaned up by the - // user. + // If a previous message exists, the Single got resubscribed + // to (i.e. during a retry) and so previous message body needs to be cleaned up by + // the user. LOGGER.warn(RESPONSE_LEAK_MESSAGE); } - return response.transformMessageBody(msgPublisher -> msgPublisher.beforeSubscriber(() -> { + return response.transformMessageBody(msgPublisher -> + msgPublisher.beforeSubscriber(() -> { reference.set(null); return NoopSubscriber.INSTANCE; })); @@ -238,6 +242,7 @@ private static final class NoopFilterFactory implements StreamingHttpServiceFilt private NoopFilterFactory() { // singleton } + @Override public StreamingHttpServiceFilter create(StreamingHttpService service) { return new StreamingHttpServiceFilter(service); diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java index 0e29512777..3c0025a2e3 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/SpliceFlatStreamToMetaSingle.java @@ -52,8 +52,7 @@ * @param type of meta-data in front of the stream of {@link Payload}, eg. {@link HttpResponseMetaData} * @param type of payload inside the {@link Data}, eg. {@link Buffer} */ -// TODO: revert: this shouldn't be public. -public final class SpliceFlatStreamToMetaSingle implements PublisherToSingleOperator { +final class SpliceFlatStreamToMetaSingle implements PublisherToSingleOperator { private static final Logger LOGGER = LoggerFactory.getLogger(SpliceFlatStreamToMetaSingle.class); private final BiFunction, Data> packer; @@ -65,7 +64,7 @@ public final class SpliceFlatStreamToMetaSingle impleme * @param packer function to pack the {@link Publisher}<{@link Payload}> and {@link MetaData} into a * {@link Data} */ - public SpliceFlatStreamToMetaSingle(BiFunction, Data> packer) { + SpliceFlatStreamToMetaSingle(BiFunction, Data> packer) { this.packer = requireNonNull(packer); } From 6ffa2a1a35077cb3381640b88d077d5d1161e96e Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Tue, 14 Jan 2025 17:18:20 -0700 Subject: [PATCH 08/10] Fix copyright --- .../java/io/servicetalk/http/netty/WatchdogLeakDetector.java | 2 +- .../io/servicetalk/http/netty/WatchdogLeakDetectorTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java index cc56cac8d8..ac9d31f5a7 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java @@ -1,5 +1,5 @@ /* - * Copyright © 2024 Apple Inc. and the ServiceTalk project authors + * Copyright © 2025 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/WatchdogLeakDetectorTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/WatchdogLeakDetectorTest.java index 641bd06518..8f4bf6f3dc 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/WatchdogLeakDetectorTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/WatchdogLeakDetectorTest.java @@ -1,5 +1,5 @@ /* - * Copyright © 2024 Apple Inc. and the ServiceTalk project authors + * Copyright © 2025 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From 86a29352fe9e1e62edc4968caeee6cd5b504254d Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Fri, 17 Jan 2025 14:07:20 -0700 Subject: [PATCH 09/10] A bit of cleanup, but there is work to do --- .../http/netty/WatchdogLeakDetector.java | 42 ++++++++++++------- 1 file changed, 27 insertions(+), 15 deletions(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java index ac9d31f5a7..207b890ee0 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java @@ -18,6 +18,7 @@ import io.servicetalk.concurrent.Cancellable; import io.servicetalk.concurrent.PublisherSource.Subscriber; import io.servicetalk.concurrent.PublisherSource.Subscription; +import io.servicetalk.concurrent.api.Executor; import io.servicetalk.concurrent.api.Executors; import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.SourceAdapters; @@ -31,15 +32,19 @@ import java.lang.ref.WeakReference; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.annotation.Nullable; final class WatchdogLeakDetector { + private static final AtomicIntegerFieldUpdater STATE_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(WatchdogLeakDetector.class, "state"); + private static final Logger LOGGER = LoggerFactory.getLogger(WatchdogLeakDetector.class); - private static final WatchdogLeakDetector INSTANCE = new WatchdogLeakDetector(); + private static final WatchdogLeakDetector INSTANCE = new WatchdogLeakDetector(Executors.global()); private static final String PROPERTY_NAME = "io.servicetalk.http.netty.leakdetection"; @@ -52,11 +57,13 @@ final class WatchdogLeakDetector { STRICT_DETECTION = prop != null && prop.equalsIgnoreCase(STRICT_MODE); } + private final Executor executor; private final ReferenceQueue refQueue = new ReferenceQueue<>(); private final Map, CleanupState> allRefs = new ConcurrentHashMap<>(); + private volatile int state; - private WatchdogLeakDetector() { - // Singleton. + private WatchdogLeakDetector(Executor executor) { + this.executor = executor; } static Publisher gcLeakDetection(Publisher publisher, Runnable onLeak) { @@ -76,25 +83,29 @@ private Publisher gcLeakDetection0(Publisher publisher, Runnable onLea maybeCleanRefs(); CleanupState cleanupState = new CleanupState(publisher, onLeak); Publisher result = publisher.liftSync(subscriber -> new InstrumentedSubscriber<>(subscriber, cleanupState)); - Reference ref = new WeakReference<>(result, refQueue); + Reference ref = new WeakReference<>(cleanupState, refQueue); allRefs.put(ref, cleanupState); return result; } private void maybeCleanRefs() { final Reference testRef = refQueue.poll(); - if (testRef != null) { + if (testRef != null && STATE_UPDATER.compareAndSet(this, 0, 1)) { // There are references to be cleaned but don't do it on this thread. // TODO: what executor should we really use? - Executors.global().submit(() -> { + executor.submit(() -> { Reference ref = testRef; - do { - ref.clear(); - CleanupState cleanupState = allRefs.remove(ref); - if (cleanupState != null) { - cleanupState.check(); - } - } while ((ref = refQueue.poll()) != null); + try { + do { + ref.clear(); + CleanupState cleanupState = allRefs.remove(ref); + if (cleanupState != null) { + cleanupState.check(); + } + } while ((ref = refQueue.poll()) != null); + } finally { + STATE_UPDATER.set(this, 0); + } }); } } @@ -112,7 +123,7 @@ private static final class InstrumentedSubscriber implements Subscriber { @Override public void onSubscribe(Subscription subscription) { cleanupToken.subscribed(subscription); - delegate.onSubscribe(new Subscription() { + Subscription nextSubscription = new Subscription() { @Override public void request(long n) { subscription.request(n); @@ -123,7 +134,8 @@ public void cancel() { cleanupToken.doComplete(); subscription.cancel(); } - }); + }; + delegate.onSubscribe(nextSubscription); } @Override From 1f02eacd68ab137680eff202267fef99f3ea63d0 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Fri, 17 Jan 2025 14:45:18 -0700 Subject: [PATCH 10/10] Revert ref --- .../java/io/servicetalk/http/netty/WatchdogLeakDetector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java index 207b890ee0..8da57f3ba9 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/WatchdogLeakDetector.java @@ -83,7 +83,7 @@ private Publisher gcLeakDetection0(Publisher publisher, Runnable onLea maybeCleanRefs(); CleanupState cleanupState = new CleanupState(publisher, onLeak); Publisher result = publisher.liftSync(subscriber -> new InstrumentedSubscriber<>(subscriber, cleanupState)); - Reference ref = new WeakReference<>(cleanupState, refQueue); + Reference ref = new WeakReference<>(result, refQueue); allRefs.put(ref, cleanupState); return result; }