From c92941cd9034a41a9d1e98b63dc77d193aac0194 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Thu, 5 Oct 2023 09:45:24 -0600 Subject: [PATCH 1/9] ReplayStrategies.LazyTimeLimitedReplayAccumulator trims and accumulate (#2725) Motivation: The lazy accumulator doesn't eagerly evict items so it can be prone to poor behaviors under certain patterns. One of those patterns is having a large queue limit with a short timeout and infrequent re-subscribes. In this case a producer can fill the queue to the brim despite many elements being expired. Modifications: Trim stale entries when accumulating. Result: This removes a pathological case and leaves the only remaining pathological case being one where we get a large burst of messages, none of which have expired, and then we have no more new elements or subscribers for long durations which can put unnecessary pressure on the GC. --- .../concurrent/api/ReplayStrategies.java | 28 +++++++++++-------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/ReplayStrategies.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/ReplayStrategies.java index b2bea802cc..1cde1ffeee 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/ReplayStrategies.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/ReplayStrategies.java @@ -22,7 +22,6 @@ import java.time.Duration; import java.util.ArrayDeque; import java.util.Deque; -import java.util.Iterator; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -158,23 +157,30 @@ private static final class LazyTimeLimitedReplayAccumulator implements Replay @Override public void accumulate(@Nullable final T t) { + final long nanoTime = executor.currentTime(NANOSECONDS); + trimExpired(nanoTime); if (items.size() >= maxItems) { items.poll(); } - items.add(new TimeStampSignal<>(executor.currentTime(NANOSECONDS), t)); + items.add(new TimeStampSignal<>(nanoTime, t)); } @Override public void deliverAccumulation(final Consumer consumer) { - final Iterator> itr = items.iterator(); - final long nanoTime = executor.currentTime(NANOSECONDS); - while (itr.hasNext()) { - final TimeStampSignal next = itr.next(); - if (nanoTime - next.timeStamp >= ttlNanos) { - itr.remove(); - } else { - consumer.accept(next.signal); - } + if (items.isEmpty()) { + return; + } + trimExpired(executor.currentTime(NANOSECONDS)); + for (TimeStampSignal next : items) { + consumer.accept(next.signal); + } + } + + private void trimExpired(long nanoTime) { + // Entry time stamps are monotonically increasing, so we only need to trim until the first non-stale entry. + TimeStampSignal next; + while ((next = items.peek()) != null && nanoTime - next.timeStamp >= ttlNanos) { + items.poll(); } } } From 4fb2ac1ddb841364b14dc9f724e1789dc8079820 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Fri, 6 Oct 2023 15:05:17 -0700 Subject: [PATCH 2/9] ConnectionObserver: provide ConnectionInfo on transport handshake (#2726) Motivation: If a connection fails with an exception before one of the "established" methods is invoked, users don't have access to meaningful information, like `SslConfig`, SocketOptions, channelId (for correlation with wire logs), local address, etc. Modifications: - Add `ConnectionObserver.onTransportHandshakeComplete(ConnectionInfo)` callback that gives users a ST view of the netty's `Channel`; - Deprecate pre-existing `ConnectionObserver.onTransportHandshakeComplete()`; Results: 1. Users can collect more information about a connection if it's failed before "established". 2. Users can get Channel's ID for reporting `ProxyConnectObserver` events and security handshake failures. --- .../netty/AlpnLBHttpConnectionFactory.java | 3 +- .../netty/DeferredServerChannelBinder.java | 7 +- .../http/netty/H2LBHttpConnectionFactory.java | 2 +- .../H2ServerParentConnectionContext.java | 4 +- .../http/netty/NettyHttpServer.java | 4 +- .../ProxyConnectLBHttpConnectionFactory.java | 2 +- .../netty/StreamingConnectionFactory.java | 2 +- .../http/netty/FlushStrategyOnServerTest.java | 2 +- .../http/netty/HttpRequestEncoderTest.java | 4 +- ...HttpTransportObserverAsyncContextTest.java | 2 +- .../http/netty/HttpTransportObserverTest.java | 4 +- .../http/netty/HttpsProxyTest.java | 4 +- .../netty/SecurityHandshakeObserverTest.java | 2 +- .../netty/ServerRespondsOnClosingTest.java | 2 +- .../netty/SslCertificateCompressionTest.java | 3 +- .../gradle/spotbugs/main-exclusions.xml | 10 ++ .../internal/AbstractReadOnlyTcpConfig.java | 11 +- .../tcp/netty/internal/AbstractTcpConfig.java | 2 +- .../internal/TcpClientChannelInitializer.java | 42 +++++- .../tcp/netty/internal/TcpConnectionInfo.java | 93 +++++++++++++ .../tcp/netty/internal/TcpServerBinder.java | 48 +------ .../internal/TcpServerChannelInitializer.java | 34 ++++- .../SecureTcpTransportObserverErrorsTest.java | 2 + .../SecureTcpTransportObserverTest.java | 4 +- .../TcpTransportObserverErrorsTest.java | 4 +- .../internal/TcpTransportObserverTest.java | 4 +- .../tcp/netty/internal/TcpClient.java | 2 +- .../tcp/netty/internal/TcpServer.java | 2 +- .../transport/api/BiTransportObserver.java | 6 +- .../api/CatchAllTransportObserver.java | 4 +- .../transport/api/ConnectionObserver.java | 31 ++++- .../transport/api/NoopTransportObserver.java | 2 +- .../gradle/spotbugs/main-exclusions.xml | 7 + .../ConnectionObserverInitializer.java | 130 +++++++++++++++--- .../netty/internal/NoopTransportObserver.java | 2 +- 35 files changed, 371 insertions(+), 116 deletions(-) create mode 100644 servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpConnectionInfo.java diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AlpnLBHttpConnectionFactory.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AlpnLBHttpConnectionFactory.java index 303d03b31e..e0814039bb 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AlpnLBHttpConnectionFactory.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AlpnLBHttpConnectionFactory.java @@ -69,7 +69,8 @@ Single newFilterableConnection( private Single createConnection( final Channel channel, final ConnectionObserver connectionObserver, final ReadOnlyTcpClientConfig tcpConfig) { - return new AlpnChannelSingle(channel, new TcpClientChannelInitializer(tcpConfig, connectionObserver), + return new AlpnChannelSingle(channel, + new TcpClientChannelInitializer(tcpConfig, connectionObserver, executionContext, false), ctx -> { /* SslHandler will automatically start handshake on channelActive */ }).flatMap(protocol -> { switch (protocol) { case HTTP_1_1: diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DeferredServerChannelBinder.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DeferredServerChannelBinder.java index 4a04b91974..0c913c6e67 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DeferredServerChannelBinder.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DeferredServerChannelBinder.java @@ -1,5 +1,5 @@ /* - * Copyright © 2019-2021 Apple Inc. and the ServiceTalk project authors + * Copyright © 2019-2023 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -91,7 +91,8 @@ private static Single alpnInitChannel(final SocketAddres final StreamingHttpService service, final boolean drainRequestPayloadBody, final ConnectionObserver observer) { - return new AlpnChannelSingle(channel, new TcpServerChannelInitializer(config.tcpConfig(), observer), + return new AlpnChannelSingle(channel, + new TcpServerChannelInitializer(config.tcpConfig(), observer, httpExecutionContext), // Force a read to get the SSL handshake started. We initialize pipeline before // SslHandshakeCompletionEvent will complete, therefore, no data will be propagated before we finish // initialization. @@ -117,7 +118,7 @@ private static Single sniInitChannel(final SocketAddress final boolean drainRequestPayloadBody, final ConnectionObserver observer) { return new SniCompleteChannelSingle(channel, - new TcpServerChannelInitializer(config.tcpConfig(), observer)).flatMap(sniEvt -> { + new TcpServerChannelInitializer(config.tcpConfig(), observer, httpExecutionContext)).flatMap(sniEvt -> { Throwable failureCause = sniEvt.cause(); if (failureCause != null) { return failed(failureCause); diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2LBHttpConnectionFactory.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2LBHttpConnectionFactory.java index 7417e6952d..c81bb908a5 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2LBHttpConnectionFactory.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2LBHttpConnectionFactory.java @@ -55,7 +55,7 @@ Single newFilterableConnection( (channel, connectionObserver) -> H2ClientParentConnectionContext.initChannel(channel, executionContext, config.h2Config(), reqRespFactoryFunc.apply(HTTP_2_0), tcpConfig.flushStrategy(), tcpConfig.idleTimeoutMs(), tcpConfig.sslConfig(), - new TcpClientChannelInitializer(tcpConfig, connectionObserver).andThen( + new TcpClientChannelInitializer(tcpConfig, connectionObserver, executionContext, false).andThen( new H2ClientParentChannelInitializer(config.h2Config())), connectionObserver, config.allowDropTrailersReadFromTransport()), observer); } diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java index b29d45a266..2e6551500a 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java @@ -1,5 +1,5 @@ /* - * Copyright © 2019-2021 Apple Inc. and the ServiceTalk project authors + * Copyright © 2019-2023 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -100,7 +100,7 @@ static Single bind(final HttpExecutionContext executionContex final ReadOnlyTcpServerConfig tcpServerConfig = config.tcpConfig(); return TcpServerBinder.bind(listenAddress, tcpServerConfig, executionContext, connectionAcceptor, (channel, connectionObserver) -> initChannel(listenAddress, channel, executionContext, config, - new TcpServerChannelInitializer(tcpServerConfig, connectionObserver), service, + new TcpServerChannelInitializer(tcpServerConfig, connectionObserver, executionContext), service, drainRequestPayloadBody, connectionObserver), serverConnection -> { /* nothing to do as h2 uses auto read on the parent channel */ }, earlyConnectionAcceptor, lateConnectionAcceptor) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyHttpServer.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyHttpServer.java index 295a1710dc..a7df428bae 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyHttpServer.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyHttpServer.java @@ -1,5 +1,5 @@ /* - * Copyright © 2018-2021 Apple Inc. and the ServiceTalk project authors + * Copyright © 2018-2023 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -129,7 +129,7 @@ static Single bind(final HttpExecutionContext executionContex final ReadOnlyTcpServerConfig tcpServerConfig = config.tcpConfig(); return TcpServerBinder.bind(address, tcpServerConfig, executionContext, connectionAcceptor, (channel, connectionObserver) -> initChannel(channel, executionContext, config, - new TcpServerChannelInitializer(tcpServerConfig, connectionObserver), service, + new TcpServerChannelInitializer(tcpServerConfig, connectionObserver, executionContext), service, drainRequestPayloadBody, connectionObserver), serverConnection -> serverConnection.process(true), earlyConnectionAcceptor, lateConnectionAcceptor) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/ProxyConnectLBHttpConnectionFactory.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/ProxyConnectLBHttpConnectionFactory.java index e262ec8287..95bade909b 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/ProxyConnectLBHttpConnectionFactory.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/ProxyConnectLBHttpConnectionFactory.java @@ -99,7 +99,7 @@ private Single createConnection( // Disable half-closure to simplify ProxyConnectHandler implementation channelConfig.setOption(ALLOW_HALF_CLOSURE, FALSE); return new ProxyConnectChannelSingle(channel, - new TcpClientChannelInitializer(config.tcpConfig(), observer, config.hasProxy()) + new TcpClientChannelInitializer(config.tcpConfig(), observer, executionContext, true) .andThen(new HttpClientChannelInitializer( getByteBufAllocator(executionContext.bufferAllocator()), h1Config, closeHandler)), observer, h1Config.headersFactory(), connectAddress) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/StreamingConnectionFactory.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/StreamingConnectionFactory.java index 8cfe85ff80..d5cb736670 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/StreamingConnectionFactory.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/StreamingConnectionFactory.java @@ -59,7 +59,7 @@ static Single> build // We disable auto read so we can handle stuff in the ConnectionFilter before we accept any content. return TcpConnector.connect(null, resolvedAddress, tcpConfig, false, executionContext, (channel, connectionObserver) -> createConnection(channel, executionContext, h1Config, tcpConfig, - new TcpClientChannelInitializer(tcpConfig, connectionObserver, false), + new TcpClientChannelInitializer(tcpConfig, connectionObserver, executionContext, false), connectionObserver), observer); } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/FlushStrategyOnServerTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/FlushStrategyOnServerTest.java index b559f17a63..0ee77d0773 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/FlushStrategyOnServerTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/FlushStrategyOnServerTest.java @@ -121,7 +121,7 @@ private void setUp(final Param param) { (channel, observer) -> { channel.config().setAutoRead(true); return initChannel(channel, httpExecutionContext, config, - new TcpServerChannelInitializer(tcpReadOnly, observer) + new TcpServerChannelInitializer(tcpReadOnly, observer, httpExecutionContext) .andThen(channel1 -> channel1.pipeline().addLast(interceptor)), service, true, observer); }, diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpRequestEncoderTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpRequestEncoderTest.java index 6bc409f520..dd8032de73 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpRequestEncoderTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpRequestEncoderTest.java @@ -442,7 +442,7 @@ void protocolPayloadEndOutboundShouldNotTriggerOnFailedFlush() throws Exception SEC.executor(), SEC.ioExecutor(), forPipelinedRequestResponse(false, channel.config()), defaultFlushStrategy(), 0L, null, - new TcpServerChannelInitializer(sConfig, observer).andThen( + new TcpServerChannelInitializer(sConfig, observer, SEC).andThen( channel2 -> { serverChannelRef.compareAndSet(null, channel2); serverChannelLatch.countDown(); @@ -461,7 +461,7 @@ void protocolPayloadEndOutboundShouldNotTriggerOnFailedFlush() throws Exception closeHandler, defaultFlushStrategy(), 0L, cConfig.tcpConfig().sslConfig(), new TcpClientChannelInitializer(cConfig.tcpConfig(), - connectionObserver) + connectionObserver, CEC, false) .andThen(new HttpClientChannelInitializer( getByteBufAllocator(CEC.bufferAllocator()), cConfig.h1Config(), closeHandler)) diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpTransportObserverAsyncContextTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpTransportObserverAsyncContextTest.java index 66a027da32..3addeb3270 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpTransportObserverAsyncContextTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpTransportObserverAsyncContextTest.java @@ -195,7 +195,7 @@ public void onFlush() { } @Override - public void onTransportHandshakeComplete() { + public void onTransportHandshakeComplete(final ConnectionInfo info) { // AsyncContext is unknown at this point because this event is triggered by network } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpTransportObserverTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpTransportObserverTest.java index 405bb54dd8..0536593b28 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpTransportObserverTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpTransportObserverTest.java @@ -170,8 +170,8 @@ void connectionEstablished(HttpProtocol httpProtocol) throws Exception { verify(clientTransportObserver).onNewConnection(any(), any()); verify(serverTransportObserver, await()).onNewConnection(any(), any()); - verify(clientConnectionObserver).onTransportHandshakeComplete(); - verify(serverConnectionObserver, await()).onTransportHandshakeComplete(); + verify(clientConnectionObserver).onTransportHandshakeComplete(any()); + verify(serverConnectionObserver, await()).onTransportHandshakeComplete(any()); if (protocol == HTTP_1) { verify(clientConnectionObserver).connectionEstablished(any(ConnectionInfo.class)); verify(serverConnectionObserver, await()).connectionEstablished(any(ConnectionInfo.class)); diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpsProxyTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpsProxyTest.java index 534e7595fe..54ec2e3282 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpsProxyTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpsProxyTest.java @@ -378,14 +378,14 @@ private void assertTargetAddress() { private void verifyProxyConnectFailed(Throwable cause) { order.verify(transportObserver).onNewConnection(any(), any()); - order.verify(connectionObserver).onTransportHandshakeComplete(); + order.verify(connectionObserver).onTransportHandshakeComplete(any()); order.verify(connectionObserver).onProxyConnect(any()); order.verify(proxyConnectObserver).proxyConnectFailed(cause); } private void verifyProxyConnectComplete() { order.verify(transportObserver).onNewConnection(any(), any()); - order.verify(connectionObserver).onTransportHandshakeComplete(); + order.verify(connectionObserver).onTransportHandshakeComplete(any()); order.verify(connectionObserver).onProxyConnect(any()); order.verify(proxyConnectObserver).proxyConnectComplete(any()); } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/SecurityHandshakeObserverTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/SecurityHandshakeObserverTest.java index 272134cc75..a809d2d740 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/SecurityHandshakeObserverTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/SecurityHandshakeObserverTest.java @@ -181,7 +181,7 @@ private static void verifyObservers(InOrder order, TransportObserver transportOb ConnectionObserver connectionObserver, SecurityHandshakeObserver securityHandshakeObserver, HttpProtocol expectedProtocol, boolean failHandshake) { order.verify(transportObserver).onNewConnection(any(), any()); - order.verify(connectionObserver).onTransportHandshakeComplete(); + order.verify(connectionObserver).onTransportHandshakeComplete(any()); order.verify(connectionObserver).onSecurityHandshake(); if (failHandshake) { ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Throwable.class); diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ServerRespondsOnClosingTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ServerRespondsOnClosingTest.java index e4c623ea78..c04a81faf4 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ServerRespondsOnClosingTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ServerRespondsOnClosingTest.java @@ -87,7 +87,7 @@ class ServerRespondsOnClosingTest { return fromSource(responseProcessor); }; serverConnection = initChannel(channel, httpExecutionContext, config, new TcpServerChannelInitializer( - config.tcpConfig(), connectionObserver), + config.tcpConfig(), connectionObserver, httpExecutionContext), toStreamingHttpService(offloadNone(), service), true, connectionObserver).toFuture().get(); } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/SslCertificateCompressionTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/SslCertificateCompressionTest.java index df9ac60bb9..5da1f4da7b 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/SslCertificateCompressionTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/SslCertificateCompressionTest.java @@ -139,6 +139,7 @@ static class SslBytesReadTransportObserver implements TransportObserver { @Override public ConnectionObserver onNewConnection(@Nullable final Object localAddress, final Object remoteAddress) { return new ConnectionObserver() { + @Override public void onDataRead(final int size) { if (inHandshake) { @@ -174,7 +175,7 @@ public void onFlush() { } @Override - public void onTransportHandshakeComplete() { + public void onTransportHandshakeComplete(final ConnectionInfo info) { } @Override diff --git a/servicetalk-tcp-netty-internal/gradle/spotbugs/main-exclusions.xml b/servicetalk-tcp-netty-internal/gradle/spotbugs/main-exclusions.xml index a2134f331e..900a33a5ff 100644 --- a/servicetalk-tcp-netty-internal/gradle/spotbugs/main-exclusions.xml +++ b/servicetalk-tcp-netty-internal/gradle/spotbugs/main-exclusions.xml @@ -54,4 +54,14 @@ + + + + + + + + + + diff --git a/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/AbstractReadOnlyTcpConfig.java b/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/AbstractReadOnlyTcpConfig.java index cc9b6d44e9..0bff3355fe 100644 --- a/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/AbstractReadOnlyTcpConfig.java +++ b/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/AbstractReadOnlyTcpConfig.java @@ -17,6 +17,7 @@ import io.servicetalk.logging.api.UserDataLoggerConfig; import io.servicetalk.transport.api.ServiceTalkSocketOptions; +import io.servicetalk.transport.api.SslConfig; import io.servicetalk.transport.netty.internal.FlushStrategy; import io.netty.channel.ChannelOption; @@ -34,7 +35,7 @@ * * @param type of security configuration */ -abstract class AbstractReadOnlyTcpConfig { +abstract class AbstractReadOnlyTcpConfig { @SuppressWarnings("rawtypes") private final Map options; private final long idleTimeoutMs; @@ -106,4 +107,12 @@ public final UserDataLoggerConfig wireLoggerConfig() { */ @Nullable public abstract SslContext sslContext(); + + /** + * Get the {@link SslConfig}. + * + * @return the {@link SslConfig}, or {@code null} if SSL/TLS is not configured. + */ + @Nullable + public abstract SecurityConfig sslConfig(); } diff --git a/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/AbstractTcpConfig.java b/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/AbstractTcpConfig.java index 96cc1c9cc7..155f7ada6d 100644 --- a/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/AbstractTcpConfig.java +++ b/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/AbstractTcpConfig.java @@ -40,7 +40,7 @@ * * @param type of {@link SslConfig}. */ -abstract class AbstractTcpConfig { +abstract class AbstractTcpConfig { @Nullable @SuppressWarnings("rawtypes") diff --git a/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpClientChannelInitializer.java b/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpClientChannelInitializer.java index 6c3816a66b..030076f34b 100644 --- a/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpClientChannelInitializer.java +++ b/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpClientChannelInitializer.java @@ -1,5 +1,5 @@ /* - * Copyright © 2018-2020 Apple Inc. and the ServiceTalk project authors + * Copyright © 2018-2023 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,7 +16,9 @@ package io.servicetalk.tcp.netty.internal; import io.servicetalk.transport.api.ClientSslConfig; +import io.servicetalk.transport.api.ConnectionInfo; import io.servicetalk.transport.api.ConnectionObserver; +import io.servicetalk.transport.api.ExecutionContext; import io.servicetalk.transport.netty.internal.ChannelInitializer; import io.servicetalk.transport.netty.internal.ConnectionObserverInitializer; import io.servicetalk.transport.netty.internal.DeferSslHandler; @@ -29,6 +31,7 @@ import io.netty.handler.ssl.SslHandler; import static io.servicetalk.tcp.netty.internal.TcpServerChannelInitializer.initWireLogger; +import static io.servicetalk.transport.netty.internal.ExecutionContextUtils.channelExecutionContext; /** * {@link ChannelInitializer} for TCP client. @@ -42,7 +45,10 @@ public class TcpClientChannelInitializer implements ChannelInitializer { // F * * @param config to use for initialization. * @param observer {@link ConnectionObserver} to report network events. + * @deprecated Use + * {@link #TcpClientChannelInitializer(ReadOnlyTcpClientConfig, ConnectionObserver, ExecutionContext, boolean)} */ + @Deprecated // FIXME: 0.43 - remove deprecated ctor public TcpClientChannelInitializer(final ReadOnlyTcpClientConfig config, final ConnectionObserver observer) { this(config, observer, false); @@ -54,25 +60,49 @@ public TcpClientChannelInitializer(final ReadOnlyTcpClientConfig config, * @param config to use for initialization. * @param observer {@link ConnectionObserver} to report network events. * @param deferSslHandler {@code true} to wrap the {@link SslHandler} in a {@link DeferSslHandler}. + * @deprecated Use + * {@link #TcpClientChannelInitializer(ReadOnlyTcpClientConfig, ConnectionObserver, ExecutionContext, boolean)} */ + @Deprecated // FIXME: 0.43 - remove deprecated ctor + @SuppressWarnings("DataFlowIssue") public TcpClientChannelInitializer(final ReadOnlyTcpClientConfig config, final ConnectionObserver observer, final boolean deferSslHandler) { + this(config, observer, null, deferSslHandler); + } + + /** + * Creates a {@link ChannelInitializer} for the {@code config}. + * + * @param config to use for initialization. + * @param observer {@link ConnectionObserver} to report network events. + * @param executionContext {@link ExecutionContext} to use for {@link ConnectionInfo} reporting. + * @param deferSslHandler {@code true} to wrap the {@link SslHandler} in a {@link DeferSslHandler}. + */ + @SuppressWarnings("ConstantValue") + public TcpClientChannelInitializer(final ReadOnlyTcpClientConfig config, + final ConnectionObserver observer, + final ExecutionContext executionContext, + final boolean deferSslHandler) { ChannelInitializer delegate = ChannelInitializer.defaultInitializer(); - final SslContext sslContext = config.sslContext(); + final ClientSslConfig sslConfig = config.sslConfig(); if (observer != NoopConnectionObserver.INSTANCE) { delegate = delegate.andThen(new ConnectionObserverInitializer(observer, - sslContext != null && !deferSslHandler, true)); + channel -> new TcpConnectionInfo(channel, + // ExecutionContext can be null if users used deprecated ctor + executionContext == null ? null : channelExecutionContext(channel, executionContext), + sslConfig, config.idleTimeoutMs()), + sslConfig != null && !deferSslHandler, true)); } if (config.idleTimeoutMs() > 0L) { delegate = delegate.andThen(new IdleTimeoutInitializer(config.idleTimeoutMs())); } - if (sslContext != null) { - ClientSslConfig sslConfig = config.sslConfig(); - assert sslConfig != null; + if (sslConfig != null) { + final SslContext sslContext = config.sslContext(); + assert sslContext != null; delegate = delegate.andThen(new SslClientChannelInitializer(sslContext, sslConfig, deferSslHandler)); } diff --git a/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpConnectionInfo.java b/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpConnectionInfo.java new file mode 100644 index 0000000000..a23b8d98dd --- /dev/null +++ b/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpConnectionInfo.java @@ -0,0 +1,93 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.tcp.netty.internal; + +import io.servicetalk.transport.api.ConnectionInfo; +import io.servicetalk.transport.api.ExecutionContext; +import io.servicetalk.transport.api.SslConfig; + +import io.netty.channel.Channel; + +import java.net.SocketAddress; +import java.net.SocketOption; +import javax.annotation.Nullable; +import javax.net.ssl.SSLSession; + +import static io.servicetalk.transport.netty.internal.SocketOptionUtils.getOption; + +final class TcpConnectionInfo implements ConnectionInfo { + + private static final Protocol TCP_PROTOCOL = () -> "TCP"; + + private final Channel channel; + private final ExecutionContext executionContext; + @Nullable + private final SslConfig sslConfig; + private final long idleTimeoutMs; + + TcpConnectionInfo(final Channel channel, + final ExecutionContext executionContext, + @Nullable final SslConfig sslConfig, + final long idleTimeoutMs) { + this.channel = channel; + this.executionContext = executionContext; + this.sslConfig = sslConfig; + this.idleTimeoutMs = idleTimeoutMs; + } + + @Override + public SocketAddress localAddress() { + return channel.localAddress(); + } + + @Override + public SocketAddress remoteAddress() { + return channel.remoteAddress(); + } + + @Override + public ExecutionContext executionContext() { + return executionContext; + } + + @Nullable + @Override + public SslConfig sslConfig() { + return sslConfig; + } + + @Nullable + @Override + public SSLSession sslSession() { + return null; + } + + @Nullable + @Override + public T socketOption(final SocketOption option) { + return getOption(option, channel.config(), idleTimeoutMs); + } + + @Override + public Protocol protocol() { + return TCP_PROTOCOL; + } + + @Override + public String toString() { + return channel.toString(); + } +} diff --git a/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpServerBinder.java b/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpServerBinder.java index d4c46f1e69..ead1cf277d 100644 --- a/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpServerBinder.java +++ b/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpServerBinder.java @@ -21,14 +21,12 @@ import io.servicetalk.concurrent.api.internal.SubscribableSingle; import io.servicetalk.transport.api.ConnectionAcceptor; import io.servicetalk.transport.api.ConnectionContext; -import io.servicetalk.transport.api.ConnectionInfo; import io.servicetalk.transport.api.ConnectionObserver; import io.servicetalk.transport.api.EarlyConnectionAcceptor; import io.servicetalk.transport.api.ExecutionContext; import io.servicetalk.transport.api.IoThreadFactory; import io.servicetalk.transport.api.LateConnectionAcceptor; import io.servicetalk.transport.api.ServerContext; -import io.servicetalk.transport.api.SslConfig; import io.servicetalk.transport.netty.internal.BuilderUtils; import io.servicetalk.transport.netty.internal.ChannelSet; import io.servicetalk.transport.netty.internal.EventLoopAwareNettyIoExecutor; @@ -49,12 +47,10 @@ import org.slf4j.LoggerFactory; import java.net.SocketAddress; -import java.net.SocketOption; import java.util.Map; import java.util.function.BiFunction; import java.util.function.Consumer; import javax.annotation.Nullable; -import javax.net.ssl.SSLSession; import static io.servicetalk.concurrent.api.Executors.immediate; import static io.servicetalk.concurrent.api.Single.defer; @@ -64,7 +60,6 @@ import static io.servicetalk.transport.netty.internal.CopyByteBufHandlerChannelInitializer.POOLED_ALLOCATOR; import static io.servicetalk.transport.netty.internal.EventLoopAwareNettyIoExecutors.toEventLoopAwareNettyIoExecutor; import static io.servicetalk.transport.netty.internal.ExecutionContextUtils.channelExecutionContext; -import static io.servicetalk.transport.netty.internal.SocketOptionUtils.getOption; import static java.util.Objects.requireNonNull; /** @@ -229,51 +224,12 @@ private static Single wrapConnectionAcceptors if (earlyConnectionAcceptor != null) { final ExecutionContext channelExecutionContext = channelExecutionContext(channel, executionContext); - ConnectionInfo info = new ConnectionInfo() { - @Override - public SocketAddress localAddress() { - return channel.localAddress(); - } - - @Override - public SocketAddress remoteAddress() { - return channel.remoteAddress(); - } - - @Override - public ExecutionContext executionContext() { - return channelExecutionContext; - } - - @Nullable - @Override - public SslConfig sslConfig() { - return config.sslConfig(); - } - - @Nullable - @Override - public SSLSession sslSession() { - return null; - } - - @Nullable - @Override - public T socketOption(final SocketOption option) { - return getOption(option, channel.config(), config.idleTimeoutMs()); - } - - @Override - public Protocol protocol() { - return () -> "TCP"; - } - }; - final EarlyConnectionAcceptorHandler acceptorHandler = new EarlyConnectionAcceptorHandler(); channel.pipeline().addLast(acceptorHandler); // Defer is required to isolate the context between the user accept callback and the rest of the connection - Completable earlyCompletable = Completable.defer(() -> earlyConnectionAcceptor.accept(info)); + Completable earlyCompletable = Completable.defer(() -> earlyConnectionAcceptor.accept(new TcpConnectionInfo( + channel, channelExecutionContext, config.sslConfig(), config.idleTimeoutMs()))); if (earlyConnectionAcceptor.requiredOffloads().isConnectOffloaded()) { earlyCompletable = earlyCompletable.subscribeOn(offloadExecutor); diff --git a/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpServerChannelInitializer.java b/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpServerChannelInitializer.java index e23858f32c..93c66a9b5c 100644 --- a/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpServerChannelInitializer.java +++ b/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpServerChannelInitializer.java @@ -1,5 +1,5 @@ /* - * Copyright © 2018-2020 Apple Inc. and the ServiceTalk project authors + * Copyright © 2018-2023 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,7 +16,9 @@ package io.servicetalk.tcp.netty.internal; import io.servicetalk.logging.api.UserDataLoggerConfig; +import io.servicetalk.transport.api.ConnectionInfo; import io.servicetalk.transport.api.ConnectionObserver; +import io.servicetalk.transport.api.ExecutionContext; import io.servicetalk.transport.netty.internal.ChannelInitializer; import io.servicetalk.transport.netty.internal.ConnectionObserverInitializer; import io.servicetalk.transport.netty.internal.IdleTimeoutInitializer; @@ -29,6 +31,8 @@ import javax.annotation.Nullable; +import static io.servicetalk.transport.netty.internal.ExecutionContextUtils.channelExecutionContext; + /** * {@link ChannelInitializer} for TCP. */ @@ -41,14 +45,36 @@ public class TcpServerChannelInitializer implements ChannelInitializer { // F * * @param config to use for initialization. * @param observer {@link ConnectionObserver} to report network events. + * @deprecated Use + * {@link #TcpServerChannelInitializer(ReadOnlyTcpServerConfig, ConnectionObserver, ExecutionContext)} */ + @Deprecated + @SuppressWarnings("DataFlowIssue") public TcpServerChannelInitializer(final ReadOnlyTcpServerConfig config, final ConnectionObserver observer) { + this(config, observer, null); + } + + /** + * Creates a {@link ChannelInitializer} for the {@code config}. + * + * @param config to use for initialization. + * @param observer {@link ConnectionObserver} to report network events. + * @param executionContext {@link ExecutionContext} to use for {@link ConnectionInfo} reporting. + */ + @SuppressWarnings("ConstantValue") + public TcpServerChannelInitializer(final ReadOnlyTcpServerConfig config, + final ConnectionObserver observer, + final ExecutionContext executionContext) { ChannelInitializer delegate = ChannelInitializer.defaultInitializer(); if (observer != NoopConnectionObserver.INSTANCE) { - delegate = delegate.andThen( - new ConnectionObserverInitializer(observer, config.sslContext() != null, false)); + delegate = delegate.andThen(new ConnectionObserverInitializer(observer, + channel -> new TcpConnectionInfo(channel, + // ExecutionContext can be null if users used deprecated ctor + executionContext == null ? null : channelExecutionContext(channel, executionContext), + config.sslConfig(), config.idleTimeoutMs()), + config.sslConfig() != null, false)); } if (config.idleTimeoutMs() > 0L) { @@ -56,10 +82,12 @@ public TcpServerChannelInitializer(final ReadOnlyTcpServerConfig config, } if (config.sniMapping() != null) { + assert config.sslConfig() != null; assert config.sslContext() != null; delegate = delegate.andThen(new SniServerChannelInitializer(config.sniMapping(), config.sniMaxClientHelloLength(), config.sniClientHelloTimeout().toMillis())); } else if (config.sslContext() != null) { + assert config.sslConfig() != null; delegate = delegate.andThen(new SslServerChannelInitializer(config.sslContext())); } diff --git a/servicetalk-tcp-netty-internal/src/test/java/io/servicetalk/tcp/netty/internal/SecureTcpTransportObserverErrorsTest.java b/servicetalk-tcp-netty-internal/src/test/java/io/servicetalk/tcp/netty/internal/SecureTcpTransportObserverErrorsTest.java index 4fbe45ed7a..29227a864a 100644 --- a/servicetalk-tcp-netty-internal/src/test/java/io/servicetalk/tcp/netty/internal/SecureTcpTransportObserverErrorsTest.java +++ b/servicetalk-tcp-netty-internal/src/test/java/io/servicetalk/tcp/netty/internal/SecureTcpTransportObserverErrorsTest.java @@ -173,7 +173,9 @@ void testSslErrors(ErrorReason errorReason, clientConnected.countDown(); }); verify(clientTransportObserver, await()).onNewConnection(any(), any()); + verify(clientConnectionObserver, await()).onTransportHandshakeComplete(any()); verify(serverTransportObserver, await()).onNewConnection(any(), any()); + verify(serverConnectionObserver, await()).onTransportHandshakeComplete(any()); switch (errorReason) { case SECURE_CLIENT_TO_PLAIN_SERVER: verify(clientConnectionObserver, await()).onSecurityHandshake(); diff --git a/servicetalk-tcp-netty-internal/src/test/java/io/servicetalk/tcp/netty/internal/SecureTcpTransportObserverTest.java b/servicetalk-tcp-netty-internal/src/test/java/io/servicetalk/tcp/netty/internal/SecureTcpTransportObserverTest.java index 757780ded1..00460a2662 100644 --- a/servicetalk-tcp-netty-internal/src/test/java/io/servicetalk/tcp/netty/internal/SecureTcpTransportObserverTest.java +++ b/servicetalk-tcp-netty-internal/src/test/java/io/servicetalk/tcp/netty/internal/SecureTcpTransportObserverTest.java @@ -76,9 +76,9 @@ void testConnectionObserverEvents(SslProvider clientProvider, SslProvider server verify(clientTransportObserver).onNewConnection(any(), any()); verify(serverTransportObserver, await()).onNewConnection(any(), any()); - verify(clientConnectionObserver).onTransportHandshakeComplete(); + verify(clientConnectionObserver).onTransportHandshakeComplete(any()); verify(clientConnectionObserver).connectionEstablished(any(ConnectionInfo.class)); - verify(serverConnectionObserver, await()).onTransportHandshakeComplete(); + verify(serverConnectionObserver, await()).onTransportHandshakeComplete(any()); verify(serverConnectionObserver, await()).connectionEstablished(any(ConnectionInfo.class)); // handshake starts diff --git a/servicetalk-tcp-netty-internal/src/test/java/io/servicetalk/tcp/netty/internal/TcpTransportObserverErrorsTest.java b/servicetalk-tcp-netty-internal/src/test/java/io/servicetalk/tcp/netty/internal/TcpTransportObserverErrorsTest.java index 49a8cb8015..b8e54df663 100644 --- a/servicetalk-tcp-netty-internal/src/test/java/io/servicetalk/tcp/netty/internal/TcpTransportObserverErrorsTest.java +++ b/servicetalk-tcp-netty-internal/src/test/java/io/servicetalk/tcp/netty/internal/TcpTransportObserverErrorsTest.java @@ -142,9 +142,9 @@ void testConnectionClosed(ErrorSource errorSource) throws Exception { if (errorSource != ErrorSource.CONNECTION_REFUSED) { assertThat(connection, is(notNullValue())); verify(serverTransportObserver, await()).onNewConnection(any(), any()); - verify(clientConnectionObserver).onTransportHandshakeComplete(); + verify(clientConnectionObserver).onTransportHandshakeComplete(any()); verify(clientConnectionObserver).connectionEstablished(any(ConnectionInfo.class)); - verify(serverConnectionObserver, await()).onTransportHandshakeComplete(); + verify(serverConnectionObserver, await()).onTransportHandshakeComplete(any()); verify(serverConnectionObserver, await()).connectionEstablished(any(ConnectionInfo.class)); } else { assertThat(connection, is(nullValue())); diff --git a/servicetalk-tcp-netty-internal/src/test/java/io/servicetalk/tcp/netty/internal/TcpTransportObserverTest.java b/servicetalk-tcp-netty-internal/src/test/java/io/servicetalk/tcp/netty/internal/TcpTransportObserverTest.java index b5aedf1460..c9d8637203 100644 --- a/servicetalk-tcp-netty-internal/src/test/java/io/servicetalk/tcp/netty/internal/TcpTransportObserverTest.java +++ b/servicetalk-tcp-netty-internal/src/test/java/io/servicetalk/tcp/netty/internal/TcpTransportObserverTest.java @@ -48,9 +48,9 @@ void testConnectionObserverEvents() throws Exception { NettyConnection connection = client.connectBlocking(CLIENT_CTX, serverAddress); verify(clientTransportObserver).onNewConnection(any(), any()); verify(serverTransportObserver, await()).onNewConnection(any(), any()); - verify(clientConnectionObserver).onTransportHandshakeComplete(); + verify(clientConnectionObserver).onTransportHandshakeComplete(any()); verify(clientConnectionObserver).connectionEstablished(any(ConnectionInfo.class)); - verify(serverConnectionObserver, await()).onTransportHandshakeComplete(); + verify(serverConnectionObserver, await()).onTransportHandshakeComplete(any()); verify(serverConnectionObserver, await()).connectionEstablished(any(ConnectionInfo.class)); Buffer content = connection.executionContext().bufferAllocator().fromAscii("Hello"); diff --git a/servicetalk-tcp-netty-internal/src/testFixtures/java/io/servicetalk/tcp/netty/internal/TcpClient.java b/servicetalk-tcp-netty-internal/src/testFixtures/java/io/servicetalk/tcp/netty/internal/TcpClient.java index b176f5c73a..4d28675c2e 100644 --- a/servicetalk-tcp-netty-internal/src/testFixtures/java/io/servicetalk/tcp/netty/internal/TcpClient.java +++ b/servicetalk-tcp-netty-internal/src/testFixtures/java/io/servicetalk/tcp/netty/internal/TcpClient.java @@ -97,7 +97,7 @@ public Single> connect(ExecutionContext execu executionContext.bufferAllocator(), executionContext.executor(), executionContext.ioExecutor(), UNSUPPORTED_PROTOCOL_CLOSE_HANDLER, config.flushStrategy(), config.idleTimeoutMs(), config.sslConfig(), - new TcpClientChannelInitializer(config, connectionObserver).andThen( + new TcpClientChannelInitializer(config, connectionObserver, executionContext, false).andThen( channel2 -> channel2.pipeline().addLast(BufferHandler.INSTANCE)), executionContext.executionStrategy(), TCP, connectionObserver, true, __ -> false), observer); diff --git a/servicetalk-tcp-netty-internal/src/testFixtures/java/io/servicetalk/tcp/netty/internal/TcpServer.java b/servicetalk-tcp-netty-internal/src/testFixtures/java/io/servicetalk/tcp/netty/internal/TcpServer.java index 6cd941f8e0..c8517d5533 100644 --- a/servicetalk-tcp-netty-internal/src/testFixtures/java/io/servicetalk/tcp/netty/internal/TcpServer.java +++ b/servicetalk-tcp-netty-internal/src/testFixtures/java/io/servicetalk/tcp/netty/internal/TcpServer.java @@ -87,7 +87,7 @@ public ServerContext bind(ExecutionContext executionContext, int port, executionContext.bufferAllocator(), executionContext.executor(), executionContext.ioExecutor(), UNSUPPORTED_PROTOCOL_CLOSE_HANDLER, config.flushStrategy(), config.idleTimeoutMs(), config.sslConfig(), - new TcpServerChannelInitializer(config, connectionObserver) + new TcpServerChannelInitializer(config, connectionObserver, executionContext) .andThen(getChannelInitializer(service, executionContext)), executionStrategy, TCP, connectionObserver, false, __ -> false), serverConnection -> service.apply(serverConnection) diff --git a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/BiTransportObserver.java b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/BiTransportObserver.java index bd2ad62a9a..ef30151a8b 100644 --- a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/BiTransportObserver.java +++ b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/BiTransportObserver.java @@ -73,9 +73,9 @@ public void onFlush() { } @Override - public void onTransportHandshakeComplete() { - first.onTransportHandshakeComplete(); - second.onTransportHandshakeComplete(); + public void onTransportHandshakeComplete(final ConnectionInfo info) { + first.onTransportHandshakeComplete(info); + second.onTransportHandshakeComplete(info); } @Override diff --git a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/CatchAllTransportObserver.java b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/CatchAllTransportObserver.java index aa235348fd..65e2d1de9d 100644 --- a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/CatchAllTransportObserver.java +++ b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/CatchAllTransportObserver.java @@ -85,8 +85,8 @@ public void onFlush() { } @Override - public void onTransportHandshakeComplete() { - safeReport(observer::onTransportHandshakeComplete, observer, "flush"); + public void onTransportHandshakeComplete(final ConnectionInfo info) { + safeReport(() -> observer.onTransportHandshakeComplete(info), observer, "transport handshake complete"); } @Override diff --git a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ConnectionObserver.java b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ConnectionObserver.java index 56d7bd2827..1b2c5c0353 100644 --- a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ConnectionObserver.java +++ b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ConnectionObserver.java @@ -1,5 +1,5 @@ /* - * Copyright © 2020 Apple Inc. and the ServiceTalk project authors + * Copyright © 2020-2023 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -52,13 +52,34 @@ public interface ConnectionObserver { *

* Transport protocols that require a handshake in order to connect. Example: * TCP "three-way handshake". + * + * @deprecated Use {@link #onTransportHandshakeComplete(ConnectionInfo)} */ - void onTransportHandshakeComplete(); + @Deprecated + default void onTransportHandshakeComplete() { + } + + /** + * Callback when a transport handshake completes. + *

+ * Transport protocols that require a handshake in order to connect. Example: + * TCP "three-way handshake". + * + * @param info {@link ConnectionInfo} for the connection after transport handshake completes. Note that the + * {@link ConnectionInfo#sslSession()} will always return {@code null} since it is called before the + * {@link ConnectionObserver#onSecurityHandshake() security handshake} is performed (and as a result no SSL session + * has been established). Also, {@link ConnectionInfo#protocol()} will return L4 (transport) protocol. + * Finalized {@link ConnectionInfo} will be available via {@link #connectionEstablished(ConnectionInfo)} or + * {@link #multiplexedConnectionEstablished(ConnectionInfo)} callbacks. + */ + default void onTransportHandshakeComplete(ConnectionInfo info) { + onTransportHandshakeComplete(); + } /** * Callback when a proxy connect is initiated. *

- * For a typical connection, this callback is invoked after {@link #onTransportHandshakeComplete()}. + * For a typical connection, this callback is invoked after {@link #onTransportHandshakeComplete(ConnectionInfo)}. * * @param connectMsg a message sent to a proxy in request to establish a connection to the target server * @return a new {@link ProxyConnectObserver} that provides visibility into proxy connect events. @@ -70,8 +91,8 @@ default ProxyConnectObserver onProxyConnect(Object connectMsg) { // FIXME: 0. /** * Callback when a security handshake is initiated. *

- * For a typical connection, this callback is invoked after {@link #onTransportHandshakeComplete()}. There are may - * be exceptions: + * For a typical connection, this callback is invoked after {@link #onTransportHandshakeComplete(ConnectionInfo)}. + * There are may be exceptions: *

    *
  1. For a TCP connection, when {@link ServiceTalkSocketOptions#TCP_FASTOPEN_CONNECT} option is configured and * the Fast Open feature is supported by the OS, this callback may be invoked earlier. Note, even if the Fast diff --git a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/NoopTransportObserver.java b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/NoopTransportObserver.java index fce93727e0..9ed6c3f242 100644 --- a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/NoopTransportObserver.java +++ b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/NoopTransportObserver.java @@ -60,7 +60,7 @@ public void onFlush() { } @Override - public void onTransportHandshakeComplete() { + public void onTransportHandshakeComplete(final ConnectionInfo info) { } @Override diff --git a/servicetalk-transport-netty-internal/gradle/spotbugs/main-exclusions.xml b/servicetalk-transport-netty-internal/gradle/spotbugs/main-exclusions.xml index 10b5cd9b5b..2aba0f5b49 100644 --- a/servicetalk-transport-netty-internal/gradle/spotbugs/main-exclusions.xml +++ b/servicetalk-transport-netty-internal/gradle/spotbugs/main-exclusions.xml @@ -97,4 +97,11 @@ + + + + + + + diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/ConnectionObserverInitializer.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/ConnectionObserverInitializer.java index 0eb93fdc1f..31a2bbdffd 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/ConnectionObserverInitializer.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/ConnectionObserverInitializer.java @@ -1,5 +1,5 @@ /* - * Copyright © 2020 Apple Inc. and the ServiceTalk project authors + * Copyright © 2020-2023 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,8 +15,11 @@ */ package io.servicetalk.transport.netty.internal; +import io.servicetalk.transport.api.ConnectionInfo; import io.servicetalk.transport.api.ConnectionObserver; import io.servicetalk.transport.api.ConnectionObserver.SecurityHandshakeObserver; +import io.servicetalk.transport.api.ExecutionContext; +import io.servicetalk.transport.api.SslConfig; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufHolder; @@ -28,10 +31,15 @@ import io.netty.channel.epoll.Epoll; import io.netty.channel.kqueue.KQueue; +import java.net.SocketAddress; +import java.net.SocketOption; +import java.util.function.Function; import javax.annotation.Nullable; +import javax.net.ssl.SSLSession; import static io.netty.channel.ChannelOption.TCP_FASTOPEN_CONNECT; import static io.servicetalk.transport.netty.internal.ChannelCloseUtils.channelError; +import static io.servicetalk.transport.netty.internal.SocketOptionUtils.getOption; import static java.util.Objects.requireNonNull; /** @@ -40,20 +48,41 @@ public final class ConnectionObserverInitializer implements ChannelInitializer { private final ConnectionObserver observer; - private final boolean secure; + private final Function connectionInfoFactory; + private final boolean handshakeOnActive; private final boolean client; /** * Creates a new instance. * * @param observer {@link ConnectionObserver} to report network events. - * @param secure {@code true} if the observed connection is secure + * @param handshakeOnActive {@code true} if the observed connection is secure * @param client {@code true} if this initializer is used on the client-side + * @deprecated Use {@link #ConnectionObserverInitializer(ConnectionObserver, Function, boolean, boolean)} instead */ - public ConnectionObserverInitializer(final ConnectionObserver observer, final boolean secure, + @Deprecated // FIXME: 0.43 - remove deprecated ctor + public ConnectionObserverInitializer(final ConnectionObserver observer, + final boolean handshakeOnActive, + final boolean client) { + this(observer, PartialConnectionInfo::new, handshakeOnActive, client); + } + + /** + * Creates a new instance. + * + * @param observer {@link ConnectionObserver} to report network events + * @param connectionInfoFactory {@link Function} that creates {@link ConnectionInfo} from the provided + * {@link Channel} to report {@link ConnectionObserver#onTransportHandshakeComplete(ConnectionInfo)} + * @param handshakeOnActive {@code true} if the observed connection is secure + * @param client {@code true} if this initializer is used on the client-side + */ + public ConnectionObserverInitializer(final ConnectionObserver observer, + final Function connectionInfoFactory, + final boolean handshakeOnActive, final boolean client) { this.observer = requireNonNull(observer); - this.secure = secure; + this.connectionInfoFactory = requireNonNull(connectionInfoFactory); + this.handshakeOnActive = handshakeOnActive; this.client = client; } @@ -67,25 +96,31 @@ public void init(final Channel channel) { observer.connectionClosed(t); } }); - channel.pipeline().addLast(new ConnectionObserverHandler(observer, secure, isFastOpen(channel))); + channel.pipeline().addLast( + new ConnectionObserverHandler(observer, connectionInfoFactory, handshakeOnActive, isFastOpen(channel))); } private boolean isFastOpen(final Channel channel) { - return client && secure && Boolean.TRUE.equals(channel.config().getOption(TCP_FASTOPEN_CONNECT)) && + return client && handshakeOnActive && Boolean.TRUE.equals(channel.config().getOption(TCP_FASTOPEN_CONNECT)) && (Epoll.isTcpFastOpenClientSideAvailable() || KQueue.isTcpFastOpenClientSideAvailable()); } static final class ConnectionObserverHandler extends ChannelDuplexHandler { private final ConnectionObserver observer; - private final boolean secure; + private final Function connectionInfoFactory; + private final boolean handshakeOnActive; private boolean tcpHandshakeComplete; @Nullable private SecurityHandshakeObserver handshakeObserver; - ConnectionObserverHandler(final ConnectionObserver observer, final boolean secure, final boolean fastOpen) { + ConnectionObserverHandler(final ConnectionObserver observer, + final Function connectionInfoFactory, + final boolean handshakeOnActive, + final boolean fastOpen) { this.observer = observer; - this.secure = secure; + this.connectionInfoFactory = connectionInfoFactory; + this.handshakeOnActive = handshakeOnActive; if (fastOpen) { reportSecurityHandshakeStarting(); } @@ -93,9 +128,10 @@ static final class ConnectionObserverHandler extends ChannelDuplexHandler { @Override public void handlerAdded(final ChannelHandlerContext ctx) { - if (ctx.channel().isActive()) { - reportTcpHandshakeComplete(); - if (secure) { + final Channel channel = ctx.channel(); + if (channel.isActive()) { + reportTcpHandshakeComplete(channel); + if (handshakeOnActive) { reportSecurityHandshakeStarting(); } } @@ -103,17 +139,17 @@ public void handlerAdded(final ChannelHandlerContext ctx) { @Override public void channelActive(final ChannelHandlerContext ctx) { - reportTcpHandshakeComplete(); - if (secure) { + reportTcpHandshakeComplete(ctx.channel()); + if (handshakeOnActive) { reportSecurityHandshakeStarting(); } ctx.fireChannelActive(); } - private void reportTcpHandshakeComplete() { + private void reportTcpHandshakeComplete(final Channel channel) { if (!tcpHandshakeComplete) { tcpHandshakeComplete = true; - observer.onTransportHandshakeComplete(); + observer.onTransportHandshakeComplete(connectionInfoFactory.apply(channel)); } } @@ -160,4 +196,64 @@ public void channelWritabilityChanged(final ChannelHandlerContext ctx) { ctx.fireChannelWritabilityChanged(); } } + + /** + * Implementation of {@link ConnectionInfo} that will be used only if users use our deprecated internal API. + * It's not used for regular users or if users of internal API migrate to recommended constructors. + */ + // FIXME: 0.43 - remove this class after deprecated public constructors removed + private static final class PartialConnectionInfo implements ConnectionInfo { + + private static final Protocol TCP_PROTOCOL = () -> "TCP"; + + private final Channel channel; + + PartialConnectionInfo(final Channel channel) { + this.channel = channel; + } + + @Override + public SocketAddress localAddress() { + return channel.localAddress(); + } + + @Override + public SocketAddress remoteAddress() { + return channel.remoteAddress(); + } + + @Override + @SuppressWarnings("DataFlowIssue") + public ExecutionContext executionContext() { + return null; + } + + @Nullable + @Override + public SslConfig sslConfig() { + return null; + } + + @Nullable + @Override + public SSLSession sslSession() { + return null; + } + + @Nullable + @Override + public T socketOption(final SocketOption option) { + return getOption(option, channel.config(), 0L); + } + + @Override + public Protocol protocol() { + return TCP_PROTOCOL; + } + + @Override + public String toString() { + return channel.toString(); + } + } } diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NoopTransportObserver.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NoopTransportObserver.java index cb6b4828e0..e0cf91f583 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NoopTransportObserver.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NoopTransportObserver.java @@ -69,7 +69,7 @@ public void onFlush() { } @Override - public void onTransportHandshakeComplete() { + public void onTransportHandshakeComplete(final ConnectionInfo info) { } @Override From ed5993195987698b8db7b5009d7c01829b5300a3 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Tue, 10 Oct 2023 11:29:27 -0700 Subject: [PATCH 3/9] Update Netty 4.1.99 -> 4.1.100 (#2727) --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index f3b7fba7bf..c30b12c487 100644 --- a/gradle.properties +++ b/gradle.properties @@ -30,7 +30,7 @@ issueManagementUrl=https://github.com/apple/servicetalk/issues ciManagementUrl=https://github.com/apple/servicetalk/actions # dependency versions -nettyVersion=4.1.99.Final +nettyVersion=4.1.100.Final nettyIoUringVersion=0.0.23.Final jsr305Version=3.0.2 From 10b73741fbe19fe1fa0880b33ebcdff2955e1e05 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Wed, 11 Oct 2023 08:46:00 -0700 Subject: [PATCH 4/9] Restore API/ABI compatibility with version 0.42.37 (#2729) Motivation: `japicmp.sh` shows the following warnings: ``` Comparing binary compatibility of servicetalk-http-netty-0.42.38-SNAPSHOT.jar against servicetalk-http-netty-0.42.37.jar ***! MODIFIED CLASS: PUBLIC NON_FINAL (<- FINAL) io.servicetalk.http.netty.ProxyResponseException (Serializable removed) === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 ---! REMOVED SUPERCLASS: java.io.IOException ---! REMOVED METHOD: PUBLIC(-) java.lang.String toString() Comparing binary compatibility of servicetalk-transport-netty-internal-0.42.38-SNAPSHOT.jar against servicetalk-transport-netty-internal-0.42.37.jar ***! MODIFIED CLASS: PUBLIC STATIC FINAL io.servicetalk.transport.netty.internal.NoopTransportObserver$NoopConnectionObserver (not serializable) === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 === UNCHANGED SUPERCLASS: java.lang.Object (<- java.lang.Object) ---! REMOVED METHOD: PUBLIC(-) void onTransportHandshakeComplete() ``` Modifications: - Restore `ProxyResponseException.toString()`; - Restore `NoopTransportObserver.onTransportHandshakeComplete()`; Result: Less warnings produced by `japicmp.sh` script. --- .../io/servicetalk/http/netty/ProxyResponseException.java | 5 +++++ .../io/servicetalk/transport/api/ConnectionObserver.java | 2 +- .../transport/netty/internal/NoopTransportObserver.java | 5 +++++ 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/ProxyResponseException.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/ProxyResponseException.java index 72146423e0..dae94398d1 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/ProxyResponseException.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/ProxyResponseException.java @@ -41,4 +41,9 @@ public class ProxyResponseException extends ProxyConnectResponseException implem public HttpResponseStatus status() { return response().status(); } + + @Override + public String toString() { + return super.toString(); + } } diff --git a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ConnectionObserver.java b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ConnectionObserver.java index 1b2c5c0353..4e23a70b72 100644 --- a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ConnectionObserver.java +++ b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ConnectionObserver.java @@ -56,7 +56,7 @@ public interface ConnectionObserver { * @deprecated Use {@link #onTransportHandshakeComplete(ConnectionInfo)} */ @Deprecated - default void onTransportHandshakeComplete() { + default void onTransportHandshakeComplete() { // FIXME: 0.43 - remove deprecated method } /** diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NoopTransportObserver.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NoopTransportObserver.java index e0cf91f583..3a7a5c465e 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NoopTransportObserver.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NoopTransportObserver.java @@ -68,6 +68,11 @@ public void onDataWrite(final int size) { public void onFlush() { } + @Override + @SuppressWarnings("deprecation") + public void onTransportHandshakeComplete() { + } + @Override public void onTransportHandshakeComplete(final ConnectionInfo info) { } From ec3c64a414f3444f8391bef60f16386f781c2e2b Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Wed, 11 Oct 2023 08:46:14 -0700 Subject: [PATCH 5/9] Introduce tmp properties for Netty `decoderEnforceMaxRstFramesPerWindow` (#2728) Motivation: Netty 4.1.100 introduced a new feature for HTTP/2 to protect from DDoS vector (see https://github.com/netty/netty/security/advisories/GHSA-xpw8-rcwv-8f8p) This is unknown if users of libraries that work on top of Netty may be impacted by false positives or not. For the next major release we should either remove these properties or promote them to public API. Modifications: - Temporarily introduce the following system properties to control underlying netty-codec-http2: `-Dio.servicetalk.http.netty.http2.decoderEnforceMaxRstFramesPerWindow.maxConsecutiveEmptyFrames=200` `-Dio.servicetalk.http.netty.http2.decoderEnforceMaxRstFramesPerWindow.secondsPerWindow=30` Result: If necessary, users can control new netty options via system properties. --- .../OptimizedHttp2FrameCodecBuilder.java | 79 ++++++++++++++++++- 1 file changed, 78 insertions(+), 1 deletion(-) diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/OptimizedHttp2FrameCodecBuilder.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/OptimizedHttp2FrameCodecBuilder.java index 7c6786dcbf..1e3785d075 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/OptimizedHttp2FrameCodecBuilder.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/OptimizedHttp2FrameCodecBuilder.java @@ -39,10 +39,26 @@ final class OptimizedHttp2FrameCodecBuilder extends Http2FrameCodecBuilder { private static final Logger LOGGER = LoggerFactory.getLogger(OptimizedHttp2FrameCodecBuilder.class); + // FIXME: 0.43 - reconsider system properties for netty-codec-http2 + // These properties are introduced temporarily in case users need to disable or re-configure default values set by + // Netty. For the next major release we should either remove these properties or promote them to public API. + private static final String MAX_CONSECUTIVE_EMPTY_FRAMES_PROPERTY_NAME = + "io.servicetalk.http.netty.http2.decoderEnforceMaxRstFramesPerWindow.maxConsecutiveEmptyFrames"; + private static final String SECONDS_PER_WINDOW_PROPERTY_NAME = + "io.servicetalk.http.netty.http2.decoderEnforceMaxRstFramesPerWindow.secondsPerWindow"; + + private static final int MAX_CONSECUTIVE_EMPTY_FRAMES; + private static final int SECONDS_PER_WINDOW; + @Nullable private static final MethodHandle FLUSH_PREFACE; + @Nullable + private static final MethodHandle DECODER_ENFORCE_MAX_RST_FRAMES_PER_WINDOW; + static { + final Http2FrameCodecBuilder builder = Http2FrameCodecBuilder.forServer(); + MethodHandle flushPreface; try { // Find a new method that exists only in Netty starting from 4.1.78.Final: @@ -50,7 +66,7 @@ final class OptimizedHttp2FrameCodecBuilder extends Http2FrameCodecBuilder { .findVirtual(Http2FrameCodecBuilder.class, "flushPreface", methodType(Http2FrameCodecBuilder.class, boolean.class)); // Verify the method is working as expected: - disableFlushPreface(flushPreface, Http2FrameCodecBuilder.forClient()); + disableFlushPreface(flushPreface, builder); } catch (Throwable cause) { LOGGER.debug("Http2FrameCodecBuilder#flushPreface(boolean) is available only starting from " + "Netty 4.1.78.Final. Detected Netty version: {}", @@ -58,6 +74,26 @@ final class OptimizedHttp2FrameCodecBuilder extends Http2FrameCodecBuilder { flushPreface = null; } FLUSH_PREFACE = flushPreface; + + // Default values are taken from Netty's AbstractHttp2ConnectionHandlerBuilder + MAX_CONSECUTIVE_EMPTY_FRAMES = parseProperty(MAX_CONSECUTIVE_EMPTY_FRAMES_PROPERTY_NAME, 200); + SECONDS_PER_WINDOW = parseProperty(SECONDS_PER_WINDOW_PROPERTY_NAME, 30); + + MethodHandle decoderEnforceMaxRstFramesPerWindow; + try { + // Find a new method that exists only in Netty starting from 4.1.100.Final: + decoderEnforceMaxRstFramesPerWindow = MethodHandles.publicLookup() + .findVirtual(Http2FrameCodecBuilder.class, "decoderEnforceMaxRstFramesPerWindow", + methodType(Http2FrameCodecBuilder.class, int.class, int.class)); + // Verify the method is working as expected: + decoderEnforceMaxRstFramesPerWindow(decoderEnforceMaxRstFramesPerWindow, builder); + } catch (Throwable cause) { + LOGGER.debug("Http2FrameCodecBuilder#decoderEnforceMaxRstFramesPerWindow(int, int) is available only " + + "starting from Netty 4.1.100.Final. Detected Netty version: {}", + Http2FrameCodecBuilder.class.getPackage().getImplementationVersion(), cause); + decoderEnforceMaxRstFramesPerWindow = null; + } + DECODER_ENFORCE_MAX_RST_FRAMES_PER_WINDOW = decoderEnforceMaxRstFramesPerWindow; } private final boolean server; @@ -74,6 +110,7 @@ final class OptimizedHttp2FrameCodecBuilder extends Http2FrameCodecBuilder { this.server = server; this.flowControlQuantum = flowControlQuantum; disableFlushPreface(FLUSH_PREFACE, this); + decoderEnforceMaxRstFramesPerWindow(DECODER_ENFORCE_MAX_RST_FRAMES_PER_WINDOW, this); } @Override @@ -115,4 +152,44 @@ private static Http2FrameCodecBuilder disableFlushPreface(@Nullable final Method return builderInstance; } } + + // To avoid a strict dependency on Netty 4.1.100.Final in the classpath, we use {@link MethodHandle} to check if + // the new method is available or not. + private static Http2FrameCodecBuilder decoderEnforceMaxRstFramesPerWindow( + @Nullable final MethodHandle methodHandle, final Http2FrameCodecBuilder builderInstance) { + if (methodHandle == null) { + return builderInstance; + } + try { + // invokeExact requires return type cast to match the type signature + return (Http2FrameCodecBuilder) methodHandle.invokeExact(builderInstance, + MAX_CONSECUTIVE_EMPTY_FRAMES, SECONDS_PER_WINDOW); + } catch (Throwable t) { + throwException(t); + return builderInstance; + } + } + + private static int parseProperty(final String name, final int defaultValue) { + final String value = System.getProperty(name); + final int intValue; + if (value == null || value.isEmpty()) { + intValue = defaultValue; + } else { + try { + intValue = Integer.parseInt(value); + if (intValue < 0) { + LOGGER.error("Found invalid value -D{}={} (expected >= 0), using fallback value={}", + name, value, defaultValue); + return defaultValue; + } + } catch (NumberFormatException e) { + LOGGER.error("Could not parse -D{}={} (expected int >= 0), using fallback value={}", + name, value, defaultValue, e); + return defaultValue; + } + } + LOGGER.debug("-D{}={}", name, intValue); + return intValue; + } } From 59b6544250c3db7a772f92b397bd14e3bc7c773b Mon Sep 17 00:00:00 2001 From: Thomas Kountis Date: Wed, 11 Oct 2023 13:51:19 -0700 Subject: [PATCH 6/9] Release 0.42.38 --- docs/antora.yml | 2 +- docs/generation/site-remote.yml | 20 +++++++++---------- docs/modules/ROOT/nav.adoc | 2 +- gradle.properties | 2 +- servicetalk-client-api/docs/antora.yml | 2 +- .../docs/modules/ROOT/nav.adoc | 2 +- servicetalk-concurrent-api/docs/antora.yml | 2 +- .../docs/modules/ROOT/nav.adoc | 2 +- .../docs/antora.yml | 2 +- .../docs/modules/ROOT/nav.adoc | 2 +- .../docs/antora.yml | 2 +- .../docs/modules/ROOT/nav.adoc | 2 +- servicetalk-examples/docs/antora.yml | 2 +- .../docs/modules/ROOT/nav.adoc | 2 +- servicetalk-grpc-api/docs/antora.yml | 2 +- .../docs/modules/ROOT/nav.adoc | 2 +- servicetalk-http-api/docs/antora.yml | 2 +- .../docs/modules/ROOT/nav.adoc | 2 +- .../docs/antora.yml | 2 +- .../docs/modules/ROOT/nav.adoc | 2 +- .../docs/antora.yml | 2 +- .../docs/modules/ROOT/nav.adoc | 2 +- servicetalk-loadbalancer/docs/antora.yml | 2 +- .../docs/modules/ROOT/nav.adoc | 2 +- 24 files changed, 33 insertions(+), 33 deletions(-) diff --git a/docs/antora.yml b/docs/antora.yml index 00a61d3a1b..894b4930d6 100644 --- a/docs/antora.yml +++ b/docs/antora.yml @@ -16,6 +16,6 @@ name: servicetalk title: ServiceTalk -version: SNAPSHOT +version: '0.42' nav: - modules/ROOT/nav.adoc diff --git a/docs/generation/site-remote.yml b/docs/generation/site-remote.yml index 561238baac..63dba8cb75 100644 --- a/docs/generation/site-remote.yml +++ b/docs/generation/site-remote.yml @@ -30,43 +30,43 @@ content: sources: - url: https://github.com/apple/servicetalk.git branches: main - tags: [0.19.0, 0.20.0, 0.21.0, 0.22.0, 0.23.0, 0.24.0, 0.25.0, 0.26.0, 0.27.0, 0.28.0, 0.29.0, 0.30.0, 0.31.0, 0.32.0, 0.33.0, 0.34.0, 0.35.0, 0.36.0, 0.37.0, 0.38.0, 0.39.0, 0.40.0, 0.41.14, 0.42.37] + tags: [0.19.0, 0.20.0, 0.21.0, 0.22.0, 0.23.0, 0.24.0, 0.25.0, 0.26.0, 0.27.0, 0.28.0, 0.29.0, 0.30.0, 0.31.0, 0.32.0, 0.33.0, 0.34.0, 0.35.0, 0.36.0, 0.37.0, 0.38.0, 0.39.0, 0.40.0, 0.41.14, 0.42.38] start_path: docs - url: https://github.com/apple/servicetalk.git branches: main - tags: [0.19.0, 0.20.0, 0.21.0, 0.22.0, 0.23.0, 0.24.0, 0.25.0, 0.26.0, 0.27.0, 0.28.0, 0.29.0, 0.30.0, 0.31.0, 0.32.0, 0.33.0, 0.34.0, 0.35.0, 0.36.0, 0.37.0, 0.38.0, 0.39.0, 0.40.0, 0.41.14, 0.42.37] + tags: [0.19.0, 0.20.0, 0.21.0, 0.22.0, 0.23.0, 0.24.0, 0.25.0, 0.26.0, 0.27.0, 0.28.0, 0.29.0, 0.30.0, 0.31.0, 0.32.0, 0.33.0, 0.34.0, 0.35.0, 0.36.0, 0.37.0, 0.38.0, 0.39.0, 0.40.0, 0.41.14, 0.42.38] start_path: servicetalk-examples/docs - url: https://github.com/apple/servicetalk.git branches: main - tags: [0.19.0, 0.20.0, 0.21.0, 0.22.0, 0.23.0, 0.24.0, 0.25.0, 0.26.0, 0.27.0, 0.28.0, 0.29.0, 0.30.0, 0.31.0, 0.32.0, 0.33.0, 0.34.0, 0.35.0, 0.36.0, 0.37.0, 0.38.0, 0.39.0, 0.40.0, 0.41.14, 0.42.37] + tags: [0.19.0, 0.20.0, 0.21.0, 0.22.0, 0.23.0, 0.24.0, 0.25.0, 0.26.0, 0.27.0, 0.28.0, 0.29.0, 0.30.0, 0.31.0, 0.32.0, 0.33.0, 0.34.0, 0.35.0, 0.36.0, 0.37.0, 0.38.0, 0.39.0, 0.40.0, 0.41.14, 0.42.38] start_path: servicetalk-http-api/docs - url: https://github.com/apple/servicetalk.git branches: main - tags: [0.19.0, 0.20.0, 0.21.0, 0.22.0, 0.23.0, 0.24.0, 0.25.0, 0.26.0, 0.27.0, 0.28.0, 0.29.0, 0.30.0, 0.31.0, 0.32.0, 0.33.0, 0.34.0, 0.35.0, 0.36.0, 0.37.0, 0.38.0, 0.39.0, 0.40.0, 0.41.14, 0.42.37] + tags: [0.19.0, 0.20.0, 0.21.0, 0.22.0, 0.23.0, 0.24.0, 0.25.0, 0.26.0, 0.27.0, 0.28.0, 0.29.0, 0.30.0, 0.31.0, 0.32.0, 0.33.0, 0.34.0, 0.35.0, 0.36.0, 0.37.0, 0.38.0, 0.39.0, 0.40.0, 0.41.14, 0.42.38] start_path: servicetalk-http-router-jersey/docs - url: https://github.com/apple/servicetalk.git branches: main - tags: [0.19.0, 0.20.0, 0.21.0, 0.22.0, 0.23.0, 0.24.0, 0.25.0, 0.26.0, 0.27.0, 0.28.0, 0.29.0, 0.30.0, 0.31.0, 0.32.0, 0.33.0, 0.34.0, 0.35.0, 0.36.0, 0.37.0, 0.38.0, 0.39.0, 0.40.0, 0.41.14, 0.42.37] + tags: [0.19.0, 0.20.0, 0.21.0, 0.22.0, 0.23.0, 0.24.0, 0.25.0, 0.26.0, 0.27.0, 0.28.0, 0.29.0, 0.30.0, 0.31.0, 0.32.0, 0.33.0, 0.34.0, 0.35.0, 0.36.0, 0.37.0, 0.38.0, 0.39.0, 0.40.0, 0.41.14, 0.42.38] start_path: servicetalk-http-security-jersey/docs - url: https://github.com/apple/servicetalk.git branches: main - tags: [0.19.0, 0.20.0, 0.21.0, 0.22.0, 0.23.0, 0.24.0, 0.25.0, 0.26.0, 0.27.0, 0.28.0, 0.29.0, 0.30.0, 0.31.0, 0.32.0, 0.33.0, 0.34.0, 0.35.0, 0.36.0, 0.37.0, 0.38.0, 0.39.0, 0.40.0, 0.41.14, 0.42.37] + tags: [0.19.0, 0.20.0, 0.21.0, 0.22.0, 0.23.0, 0.24.0, 0.25.0, 0.26.0, 0.27.0, 0.28.0, 0.29.0, 0.30.0, 0.31.0, 0.32.0, 0.33.0, 0.34.0, 0.35.0, 0.36.0, 0.37.0, 0.38.0, 0.39.0, 0.40.0, 0.41.14, 0.42.38] start_path: servicetalk-concurrent-api/docs - url: https://github.com/apple/servicetalk.git branches: main - tags: [0.19.0, 0.20.0, 0.21.0, 0.22.0, 0.23.0, 0.24.0, 0.25.0, 0.26.0, 0.27.0, 0.28.0, 0.29.0, 0.30.0, 0.31.0, 0.32.0, 0.33.0, 0.34.0, 0.35.0, 0.36.0, 0.37.0, 0.38.0, 0.39.0, 0.40.0, 0.41.14, 0.42.37] + tags: [0.19.0, 0.20.0, 0.21.0, 0.22.0, 0.23.0, 0.24.0, 0.25.0, 0.26.0, 0.27.0, 0.28.0, 0.29.0, 0.30.0, 0.31.0, 0.32.0, 0.33.0, 0.34.0, 0.35.0, 0.36.0, 0.37.0, 0.38.0, 0.39.0, 0.40.0, 0.41.14, 0.42.38] start_path: servicetalk-data-jackson-jersey/docs - url: https://github.com/apple/servicetalk.git branches: main - tags: [0.19.0, 0.20.0, 0.21.0, 0.22.0, 0.23.0, 0.24.0, 0.25.0, 0.26.0, 0.27.0, 0.28.0, 0.29.0, 0.30.0, 0.31.0, 0.32.0, 0.33.0, 0.34.0, 0.35.0, 0.36.0, 0.37.0, 0.38.0, 0.39.0, 0.40.0, 0.41.14, 0.42.37] + tags: [0.19.0, 0.20.0, 0.21.0, 0.22.0, 0.23.0, 0.24.0, 0.25.0, 0.26.0, 0.27.0, 0.28.0, 0.29.0, 0.30.0, 0.31.0, 0.32.0, 0.33.0, 0.34.0, 0.35.0, 0.36.0, 0.37.0, 0.38.0, 0.39.0, 0.40.0, 0.41.14, 0.42.38] start_path: servicetalk-grpc-api/docs - url: https://github.com/apple/servicetalk.git branches: main - tags: [0.20.0, 0.21.0, 0.22.0, 0.23.0, 0.24.0, 0.25.0, 0.26.0, 0.27.0, 0.28.0, 0.29.0, 0.30.0, 0.31.0, 0.32.0, 0.33.0, 0.34.0, 0.35.0, 0.36.0, 0.37.0, 0.38.0, 0.39.0, 0.40.0, 0.41.14, 0.42.37] + tags: [0.20.0, 0.21.0, 0.22.0, 0.23.0, 0.24.0, 0.25.0, 0.26.0, 0.27.0, 0.28.0, 0.29.0, 0.30.0, 0.31.0, 0.32.0, 0.33.0, 0.34.0, 0.35.0, 0.36.0, 0.37.0, 0.38.0, 0.39.0, 0.40.0, 0.41.14, 0.42.38] start_path: servicetalk-loadbalancer/docs - url: https://github.com/apple/servicetalk.git branches: main - tags: [0.25.0, 0.26.0, 0.27.0, 0.28.0, 0.29.0, 0.30.0, 0.31.0, 0.32.0, 0.33.0, 0.34.0, 0.35.0, 0.36.0, 0.37.0, 0.38.0, 0.39.0, 0.40.0, 0.41.14, 0.42.37] + tags: [0.25.0, 0.26.0, 0.27.0, 0.28.0, 0.29.0, 0.30.0, 0.31.0, 0.32.0, 0.33.0, 0.34.0, 0.35.0, 0.36.0, 0.37.0, 0.38.0, 0.39.0, 0.40.0, 0.41.14, 0.42.38] start_path: servicetalk-client-api/docs asciidoc: attributes: diff --git a/docs/modules/ROOT/nav.adoc b/docs/modules/ROOT/nav.adoc index 0afba89ac4..bb8528f4e8 100644 --- a/docs/modules/ROOT/nav.adoc +++ b/docs/modules/ROOT/nav.adoc @@ -1,5 +1,5 @@ ifndef::page-version[] -:page-version: SNAPSHOT +:page-version: 0.42 endif::[] include::{page-version}@servicetalk::partial$nav-versioned.adoc[] diff --git a/gradle.properties b/gradle.properties index c30b12c487..948a2b5b06 100644 --- a/gradle.properties +++ b/gradle.properties @@ -23,7 +23,7 @@ org.gradle.jvmargs=-Xms2g -Xmx4g -dsa -da -ea:io.servicetalk... -XX:+HeapDumpOnO # project metadata used for publications group=io.servicetalk -version=0.42.38-SNAPSHOT +version=0.42.38 scmHost=github.com scmPath=apple/servicetalk issueManagementUrl=https://github.com/apple/servicetalk/issues diff --git a/servicetalk-client-api/docs/antora.yml b/servicetalk-client-api/docs/antora.yml index 6522f97515..efc15ff236 100644 --- a/servicetalk-client-api/docs/antora.yml +++ b/servicetalk-client-api/docs/antora.yml @@ -16,6 +16,6 @@ name: servicetalk-client-api title: Client API -version: SNAPSHOT +version: '0.42' nav: - modules/ROOT/nav.adoc diff --git a/servicetalk-client-api/docs/modules/ROOT/nav.adoc b/servicetalk-client-api/docs/modules/ROOT/nav.adoc index 0afba89ac4..bb8528f4e8 100644 --- a/servicetalk-client-api/docs/modules/ROOT/nav.adoc +++ b/servicetalk-client-api/docs/modules/ROOT/nav.adoc @@ -1,5 +1,5 @@ ifndef::page-version[] -:page-version: SNAPSHOT +:page-version: 0.42 endif::[] include::{page-version}@servicetalk::partial$nav-versioned.adoc[] diff --git a/servicetalk-concurrent-api/docs/antora.yml b/servicetalk-concurrent-api/docs/antora.yml index 71dcd1cef2..b28d71dc68 100644 --- a/servicetalk-concurrent-api/docs/antora.yml +++ b/servicetalk-concurrent-api/docs/antora.yml @@ -16,6 +16,6 @@ name: servicetalk-concurrent-api title: Concurrent API -version: SNAPSHOT +version: '0.42' nav: - modules/ROOT/nav.adoc diff --git a/servicetalk-concurrent-api/docs/modules/ROOT/nav.adoc b/servicetalk-concurrent-api/docs/modules/ROOT/nav.adoc index 0afba89ac4..bb8528f4e8 100644 --- a/servicetalk-concurrent-api/docs/modules/ROOT/nav.adoc +++ b/servicetalk-concurrent-api/docs/modules/ROOT/nav.adoc @@ -1,5 +1,5 @@ ifndef::page-version[] -:page-version: SNAPSHOT +:page-version: 0.42 endif::[] include::{page-version}@servicetalk::partial$nav-versioned.adoc[] diff --git a/servicetalk-data-jackson-jersey/docs/antora.yml b/servicetalk-data-jackson-jersey/docs/antora.yml index 9734c0b9ba..a3d24c2463 100644 --- a/servicetalk-data-jackson-jersey/docs/antora.yml +++ b/servicetalk-data-jackson-jersey/docs/antora.yml @@ -16,6 +16,6 @@ name: servicetalk-data-jackson-jersey title: JSON (Jackson) -version: SNAPSHOT +version: '0.42' nav: - modules/ROOT/nav.adoc diff --git a/servicetalk-data-jackson-jersey/docs/modules/ROOT/nav.adoc b/servicetalk-data-jackson-jersey/docs/modules/ROOT/nav.adoc index 0afba89ac4..bb8528f4e8 100644 --- a/servicetalk-data-jackson-jersey/docs/modules/ROOT/nav.adoc +++ b/servicetalk-data-jackson-jersey/docs/modules/ROOT/nav.adoc @@ -1,5 +1,5 @@ ifndef::page-version[] -:page-version: SNAPSHOT +:page-version: 0.42 endif::[] include::{page-version}@servicetalk::partial$nav-versioned.adoc[] diff --git a/servicetalk-data-protobuf-jersey/docs/antora.yml b/servicetalk-data-protobuf-jersey/docs/antora.yml index ca4b011d91..f8b901792c 100644 --- a/servicetalk-data-protobuf-jersey/docs/antora.yml +++ b/servicetalk-data-protobuf-jersey/docs/antora.yml @@ -16,6 +16,6 @@ name: servicetalk-data-protobuf-jersey title: Protobuf -version: SNAPSHOT +version: '0.42' nav: - modules/ROOT/nav.adoc diff --git a/servicetalk-data-protobuf-jersey/docs/modules/ROOT/nav.adoc b/servicetalk-data-protobuf-jersey/docs/modules/ROOT/nav.adoc index 0afba89ac4..bb8528f4e8 100644 --- a/servicetalk-data-protobuf-jersey/docs/modules/ROOT/nav.adoc +++ b/servicetalk-data-protobuf-jersey/docs/modules/ROOT/nav.adoc @@ -1,5 +1,5 @@ ifndef::page-version[] -:page-version: SNAPSHOT +:page-version: 0.42 endif::[] include::{page-version}@servicetalk::partial$nav-versioned.adoc[] diff --git a/servicetalk-examples/docs/antora.yml b/servicetalk-examples/docs/antora.yml index 9371528ccc..6cdf50dbed 100644 --- a/servicetalk-examples/docs/antora.yml +++ b/servicetalk-examples/docs/antora.yml @@ -16,6 +16,6 @@ name: servicetalk-examples title: Examples -version: SNAPSHOT +version: '0.42' nav: - modules/ROOT/nav.adoc diff --git a/servicetalk-examples/docs/modules/ROOT/nav.adoc b/servicetalk-examples/docs/modules/ROOT/nav.adoc index 0afba89ac4..bb8528f4e8 100644 --- a/servicetalk-examples/docs/modules/ROOT/nav.adoc +++ b/servicetalk-examples/docs/modules/ROOT/nav.adoc @@ -1,5 +1,5 @@ ifndef::page-version[] -:page-version: SNAPSHOT +:page-version: 0.42 endif::[] include::{page-version}@servicetalk::partial$nav-versioned.adoc[] diff --git a/servicetalk-grpc-api/docs/antora.yml b/servicetalk-grpc-api/docs/antora.yml index c96cd2d20f..aff1ab7c9d 100644 --- a/servicetalk-grpc-api/docs/antora.yml +++ b/servicetalk-grpc-api/docs/antora.yml @@ -16,6 +16,6 @@ name: servicetalk-grpc-api title: gRPC -version: SNAPSHOT +version: '0.42' nav: - modules/ROOT/nav.adoc diff --git a/servicetalk-grpc-api/docs/modules/ROOT/nav.adoc b/servicetalk-grpc-api/docs/modules/ROOT/nav.adoc index 0afba89ac4..bb8528f4e8 100644 --- a/servicetalk-grpc-api/docs/modules/ROOT/nav.adoc +++ b/servicetalk-grpc-api/docs/modules/ROOT/nav.adoc @@ -1,5 +1,5 @@ ifndef::page-version[] -:page-version: SNAPSHOT +:page-version: 0.42 endif::[] include::{page-version}@servicetalk::partial$nav-versioned.adoc[] diff --git a/servicetalk-http-api/docs/antora.yml b/servicetalk-http-api/docs/antora.yml index 96121d6cf9..a2b075c3b9 100644 --- a/servicetalk-http-api/docs/antora.yml +++ b/servicetalk-http-api/docs/antora.yml @@ -16,6 +16,6 @@ name: servicetalk-http-api title: HTTP -version: SNAPSHOT +version: '0.42' nav: - modules/ROOT/nav.adoc diff --git a/servicetalk-http-api/docs/modules/ROOT/nav.adoc b/servicetalk-http-api/docs/modules/ROOT/nav.adoc index 0afba89ac4..bb8528f4e8 100644 --- a/servicetalk-http-api/docs/modules/ROOT/nav.adoc +++ b/servicetalk-http-api/docs/modules/ROOT/nav.adoc @@ -1,5 +1,5 @@ ifndef::page-version[] -:page-version: SNAPSHOT +:page-version: 0.42 endif::[] include::{page-version}@servicetalk::partial$nav-versioned.adoc[] diff --git a/servicetalk-http-router-jersey/docs/antora.yml b/servicetalk-http-router-jersey/docs/antora.yml index c4c118acdb..f92e95c6bc 100644 --- a/servicetalk-http-router-jersey/docs/antora.yml +++ b/servicetalk-http-router-jersey/docs/antora.yml @@ -16,6 +16,6 @@ name: servicetalk-http-router-jersey title: JAX-RS Router (Jersey) -version: SNAPSHOT +version: '0.42' nav: - modules/ROOT/nav.adoc diff --git a/servicetalk-http-router-jersey/docs/modules/ROOT/nav.adoc b/servicetalk-http-router-jersey/docs/modules/ROOT/nav.adoc index 0afba89ac4..bb8528f4e8 100644 --- a/servicetalk-http-router-jersey/docs/modules/ROOT/nav.adoc +++ b/servicetalk-http-router-jersey/docs/modules/ROOT/nav.adoc @@ -1,5 +1,5 @@ ifndef::page-version[] -:page-version: SNAPSHOT +:page-version: 0.42 endif::[] include::{page-version}@servicetalk::partial$nav-versioned.adoc[] diff --git a/servicetalk-http-security-jersey/docs/antora.yml b/servicetalk-http-security-jersey/docs/antora.yml index 8574785341..1cffbf8112 100644 --- a/servicetalk-http-security-jersey/docs/antora.yml +++ b/servicetalk-http-security-jersey/docs/antora.yml @@ -16,6 +16,6 @@ name: servicetalk-http-security-jersey title: Security -version: SNAPSHOT +version: '0.42' nav: - modules/ROOT/nav.adoc diff --git a/servicetalk-http-security-jersey/docs/modules/ROOT/nav.adoc b/servicetalk-http-security-jersey/docs/modules/ROOT/nav.adoc index 0afba89ac4..bb8528f4e8 100644 --- a/servicetalk-http-security-jersey/docs/modules/ROOT/nav.adoc +++ b/servicetalk-http-security-jersey/docs/modules/ROOT/nav.adoc @@ -1,5 +1,5 @@ ifndef::page-version[] -:page-version: SNAPSHOT +:page-version: 0.42 endif::[] include::{page-version}@servicetalk::partial$nav-versioned.adoc[] diff --git a/servicetalk-loadbalancer/docs/antora.yml b/servicetalk-loadbalancer/docs/antora.yml index 3e34638348..e0e2c6c216 100644 --- a/servicetalk-loadbalancer/docs/antora.yml +++ b/servicetalk-loadbalancer/docs/antora.yml @@ -16,6 +16,6 @@ name: servicetalk-loadbalancer title: Load balancing -version: SNAPSHOT +version: '0.42' nav: - modules/ROOT/nav.adoc diff --git a/servicetalk-loadbalancer/docs/modules/ROOT/nav.adoc b/servicetalk-loadbalancer/docs/modules/ROOT/nav.adoc index 0afba89ac4..bb8528f4e8 100644 --- a/servicetalk-loadbalancer/docs/modules/ROOT/nav.adoc +++ b/servicetalk-loadbalancer/docs/modules/ROOT/nav.adoc @@ -1,5 +1,5 @@ ifndef::page-version[] -:page-version: SNAPSHOT +:page-version: 0.42 endif::[] include::{page-version}@servicetalk::partial$nav-versioned.adoc[] From 8b8d1c551499d46b8ae1123efc41bdcfbe2779e5 Mon Sep 17 00:00:00 2001 From: Thomas Kountis Date: Wed, 11 Oct 2023 13:51:20 -0700 Subject: [PATCH 7/9] Preparing for 0.42.39-SNAPSHOT development --- docs/antora.yml | 2 +- docs/modules/ROOT/nav.adoc | 2 +- gradle.properties | 2 +- servicetalk-client-api/docs/antora.yml | 2 +- servicetalk-client-api/docs/modules/ROOT/nav.adoc | 2 +- servicetalk-concurrent-api/docs/antora.yml | 2 +- servicetalk-concurrent-api/docs/modules/ROOT/nav.adoc | 2 +- servicetalk-data-jackson-jersey/docs/antora.yml | 2 +- servicetalk-data-jackson-jersey/docs/modules/ROOT/nav.adoc | 2 +- servicetalk-data-protobuf-jersey/docs/antora.yml | 2 +- servicetalk-data-protobuf-jersey/docs/modules/ROOT/nav.adoc | 2 +- servicetalk-examples/docs/antora.yml | 2 +- servicetalk-examples/docs/modules/ROOT/nav.adoc | 2 +- servicetalk-examples/grpc/helloworld/pom.xml | 2 +- servicetalk-grpc-api/docs/antora.yml | 2 +- servicetalk-grpc-api/docs/modules/ROOT/nav.adoc | 2 +- servicetalk-http-api/docs/antora.yml | 2 +- servicetalk-http-api/docs/modules/ROOT/nav.adoc | 2 +- servicetalk-http-router-jersey/docs/antora.yml | 2 +- servicetalk-http-router-jersey/docs/modules/ROOT/nav.adoc | 2 +- servicetalk-http-security-jersey/docs/antora.yml | 2 +- servicetalk-http-security-jersey/docs/modules/ROOT/nav.adoc | 2 +- servicetalk-loadbalancer/docs/antora.yml | 2 +- servicetalk-loadbalancer/docs/modules/ROOT/nav.adoc | 2 +- 24 files changed, 24 insertions(+), 24 deletions(-) diff --git a/docs/antora.yml b/docs/antora.yml index 894b4930d6..00a61d3a1b 100644 --- a/docs/antora.yml +++ b/docs/antora.yml @@ -16,6 +16,6 @@ name: servicetalk title: ServiceTalk -version: '0.42' +version: SNAPSHOT nav: - modules/ROOT/nav.adoc diff --git a/docs/modules/ROOT/nav.adoc b/docs/modules/ROOT/nav.adoc index bb8528f4e8..0afba89ac4 100644 --- a/docs/modules/ROOT/nav.adoc +++ b/docs/modules/ROOT/nav.adoc @@ -1,5 +1,5 @@ ifndef::page-version[] -:page-version: 0.42 +:page-version: SNAPSHOT endif::[] include::{page-version}@servicetalk::partial$nav-versioned.adoc[] diff --git a/gradle.properties b/gradle.properties index 948a2b5b06..68835894e2 100644 --- a/gradle.properties +++ b/gradle.properties @@ -23,7 +23,7 @@ org.gradle.jvmargs=-Xms2g -Xmx4g -dsa -da -ea:io.servicetalk... -XX:+HeapDumpOnO # project metadata used for publications group=io.servicetalk -version=0.42.38 +version=0.42.39-SNAPSHOT scmHost=github.com scmPath=apple/servicetalk issueManagementUrl=https://github.com/apple/servicetalk/issues diff --git a/servicetalk-client-api/docs/antora.yml b/servicetalk-client-api/docs/antora.yml index efc15ff236..6522f97515 100644 --- a/servicetalk-client-api/docs/antora.yml +++ b/servicetalk-client-api/docs/antora.yml @@ -16,6 +16,6 @@ name: servicetalk-client-api title: Client API -version: '0.42' +version: SNAPSHOT nav: - modules/ROOT/nav.adoc diff --git a/servicetalk-client-api/docs/modules/ROOT/nav.adoc b/servicetalk-client-api/docs/modules/ROOT/nav.adoc index bb8528f4e8..0afba89ac4 100644 --- a/servicetalk-client-api/docs/modules/ROOT/nav.adoc +++ b/servicetalk-client-api/docs/modules/ROOT/nav.adoc @@ -1,5 +1,5 @@ ifndef::page-version[] -:page-version: 0.42 +:page-version: SNAPSHOT endif::[] include::{page-version}@servicetalk::partial$nav-versioned.adoc[] diff --git a/servicetalk-concurrent-api/docs/antora.yml b/servicetalk-concurrent-api/docs/antora.yml index b28d71dc68..71dcd1cef2 100644 --- a/servicetalk-concurrent-api/docs/antora.yml +++ b/servicetalk-concurrent-api/docs/antora.yml @@ -16,6 +16,6 @@ name: servicetalk-concurrent-api title: Concurrent API -version: '0.42' +version: SNAPSHOT nav: - modules/ROOT/nav.adoc diff --git a/servicetalk-concurrent-api/docs/modules/ROOT/nav.adoc b/servicetalk-concurrent-api/docs/modules/ROOT/nav.adoc index bb8528f4e8..0afba89ac4 100644 --- a/servicetalk-concurrent-api/docs/modules/ROOT/nav.adoc +++ b/servicetalk-concurrent-api/docs/modules/ROOT/nav.adoc @@ -1,5 +1,5 @@ ifndef::page-version[] -:page-version: 0.42 +:page-version: SNAPSHOT endif::[] include::{page-version}@servicetalk::partial$nav-versioned.adoc[] diff --git a/servicetalk-data-jackson-jersey/docs/antora.yml b/servicetalk-data-jackson-jersey/docs/antora.yml index a3d24c2463..9734c0b9ba 100644 --- a/servicetalk-data-jackson-jersey/docs/antora.yml +++ b/servicetalk-data-jackson-jersey/docs/antora.yml @@ -16,6 +16,6 @@ name: servicetalk-data-jackson-jersey title: JSON (Jackson) -version: '0.42' +version: SNAPSHOT nav: - modules/ROOT/nav.adoc diff --git a/servicetalk-data-jackson-jersey/docs/modules/ROOT/nav.adoc b/servicetalk-data-jackson-jersey/docs/modules/ROOT/nav.adoc index bb8528f4e8..0afba89ac4 100644 --- a/servicetalk-data-jackson-jersey/docs/modules/ROOT/nav.adoc +++ b/servicetalk-data-jackson-jersey/docs/modules/ROOT/nav.adoc @@ -1,5 +1,5 @@ ifndef::page-version[] -:page-version: 0.42 +:page-version: SNAPSHOT endif::[] include::{page-version}@servicetalk::partial$nav-versioned.adoc[] diff --git a/servicetalk-data-protobuf-jersey/docs/antora.yml b/servicetalk-data-protobuf-jersey/docs/antora.yml index f8b901792c..ca4b011d91 100644 --- a/servicetalk-data-protobuf-jersey/docs/antora.yml +++ b/servicetalk-data-protobuf-jersey/docs/antora.yml @@ -16,6 +16,6 @@ name: servicetalk-data-protobuf-jersey title: Protobuf -version: '0.42' +version: SNAPSHOT nav: - modules/ROOT/nav.adoc diff --git a/servicetalk-data-protobuf-jersey/docs/modules/ROOT/nav.adoc b/servicetalk-data-protobuf-jersey/docs/modules/ROOT/nav.adoc index bb8528f4e8..0afba89ac4 100644 --- a/servicetalk-data-protobuf-jersey/docs/modules/ROOT/nav.adoc +++ b/servicetalk-data-protobuf-jersey/docs/modules/ROOT/nav.adoc @@ -1,5 +1,5 @@ ifndef::page-version[] -:page-version: 0.42 +:page-version: SNAPSHOT endif::[] include::{page-version}@servicetalk::partial$nav-versioned.adoc[] diff --git a/servicetalk-examples/docs/antora.yml b/servicetalk-examples/docs/antora.yml index 6cdf50dbed..9371528ccc 100644 --- a/servicetalk-examples/docs/antora.yml +++ b/servicetalk-examples/docs/antora.yml @@ -16,6 +16,6 @@ name: servicetalk-examples title: Examples -version: '0.42' +version: SNAPSHOT nav: - modules/ROOT/nav.adoc diff --git a/servicetalk-examples/docs/modules/ROOT/nav.adoc b/servicetalk-examples/docs/modules/ROOT/nav.adoc index bb8528f4e8..0afba89ac4 100644 --- a/servicetalk-examples/docs/modules/ROOT/nav.adoc +++ b/servicetalk-examples/docs/modules/ROOT/nav.adoc @@ -1,5 +1,5 @@ ifndef::page-version[] -:page-version: 0.42 +:page-version: SNAPSHOT endif::[] include::{page-version}@servicetalk::partial$nav-versioned.adoc[] diff --git a/servicetalk-examples/grpc/helloworld/pom.xml b/servicetalk-examples/grpc/helloworld/pom.xml index 7b4a6defd3..71854f76cc 100644 --- a/servicetalk-examples/grpc/helloworld/pom.xml +++ b/servicetalk-examples/grpc/helloworld/pom.xml @@ -10,7 +10,7 @@ - 0.42.37 + 0.42.38 0.6.1 3.19.2 1.6.0 diff --git a/servicetalk-grpc-api/docs/antora.yml b/servicetalk-grpc-api/docs/antora.yml index aff1ab7c9d..c96cd2d20f 100644 --- a/servicetalk-grpc-api/docs/antora.yml +++ b/servicetalk-grpc-api/docs/antora.yml @@ -16,6 +16,6 @@ name: servicetalk-grpc-api title: gRPC -version: '0.42' +version: SNAPSHOT nav: - modules/ROOT/nav.adoc diff --git a/servicetalk-grpc-api/docs/modules/ROOT/nav.adoc b/servicetalk-grpc-api/docs/modules/ROOT/nav.adoc index bb8528f4e8..0afba89ac4 100644 --- a/servicetalk-grpc-api/docs/modules/ROOT/nav.adoc +++ b/servicetalk-grpc-api/docs/modules/ROOT/nav.adoc @@ -1,5 +1,5 @@ ifndef::page-version[] -:page-version: 0.42 +:page-version: SNAPSHOT endif::[] include::{page-version}@servicetalk::partial$nav-versioned.adoc[] diff --git a/servicetalk-http-api/docs/antora.yml b/servicetalk-http-api/docs/antora.yml index a2b075c3b9..96121d6cf9 100644 --- a/servicetalk-http-api/docs/antora.yml +++ b/servicetalk-http-api/docs/antora.yml @@ -16,6 +16,6 @@ name: servicetalk-http-api title: HTTP -version: '0.42' +version: SNAPSHOT nav: - modules/ROOT/nav.adoc diff --git a/servicetalk-http-api/docs/modules/ROOT/nav.adoc b/servicetalk-http-api/docs/modules/ROOT/nav.adoc index bb8528f4e8..0afba89ac4 100644 --- a/servicetalk-http-api/docs/modules/ROOT/nav.adoc +++ b/servicetalk-http-api/docs/modules/ROOT/nav.adoc @@ -1,5 +1,5 @@ ifndef::page-version[] -:page-version: 0.42 +:page-version: SNAPSHOT endif::[] include::{page-version}@servicetalk::partial$nav-versioned.adoc[] diff --git a/servicetalk-http-router-jersey/docs/antora.yml b/servicetalk-http-router-jersey/docs/antora.yml index f92e95c6bc..c4c118acdb 100644 --- a/servicetalk-http-router-jersey/docs/antora.yml +++ b/servicetalk-http-router-jersey/docs/antora.yml @@ -16,6 +16,6 @@ name: servicetalk-http-router-jersey title: JAX-RS Router (Jersey) -version: '0.42' +version: SNAPSHOT nav: - modules/ROOT/nav.adoc diff --git a/servicetalk-http-router-jersey/docs/modules/ROOT/nav.adoc b/servicetalk-http-router-jersey/docs/modules/ROOT/nav.adoc index bb8528f4e8..0afba89ac4 100644 --- a/servicetalk-http-router-jersey/docs/modules/ROOT/nav.adoc +++ b/servicetalk-http-router-jersey/docs/modules/ROOT/nav.adoc @@ -1,5 +1,5 @@ ifndef::page-version[] -:page-version: 0.42 +:page-version: SNAPSHOT endif::[] include::{page-version}@servicetalk::partial$nav-versioned.adoc[] diff --git a/servicetalk-http-security-jersey/docs/antora.yml b/servicetalk-http-security-jersey/docs/antora.yml index 1cffbf8112..8574785341 100644 --- a/servicetalk-http-security-jersey/docs/antora.yml +++ b/servicetalk-http-security-jersey/docs/antora.yml @@ -16,6 +16,6 @@ name: servicetalk-http-security-jersey title: Security -version: '0.42' +version: SNAPSHOT nav: - modules/ROOT/nav.adoc diff --git a/servicetalk-http-security-jersey/docs/modules/ROOT/nav.adoc b/servicetalk-http-security-jersey/docs/modules/ROOT/nav.adoc index bb8528f4e8..0afba89ac4 100644 --- a/servicetalk-http-security-jersey/docs/modules/ROOT/nav.adoc +++ b/servicetalk-http-security-jersey/docs/modules/ROOT/nav.adoc @@ -1,5 +1,5 @@ ifndef::page-version[] -:page-version: 0.42 +:page-version: SNAPSHOT endif::[] include::{page-version}@servicetalk::partial$nav-versioned.adoc[] diff --git a/servicetalk-loadbalancer/docs/antora.yml b/servicetalk-loadbalancer/docs/antora.yml index e0e2c6c216..3e34638348 100644 --- a/servicetalk-loadbalancer/docs/antora.yml +++ b/servicetalk-loadbalancer/docs/antora.yml @@ -16,6 +16,6 @@ name: servicetalk-loadbalancer title: Load balancing -version: '0.42' +version: SNAPSHOT nav: - modules/ROOT/nav.adoc diff --git a/servicetalk-loadbalancer/docs/modules/ROOT/nav.adoc b/servicetalk-loadbalancer/docs/modules/ROOT/nav.adoc index bb8528f4e8..0afba89ac4 100644 --- a/servicetalk-loadbalancer/docs/modules/ROOT/nav.adoc +++ b/servicetalk-loadbalancer/docs/modules/ROOT/nav.adoc @@ -1,5 +1,5 @@ ifndef::page-version[] -:page-version: 0.42 +:page-version: SNAPSHOT endif::[] include::{page-version}@servicetalk::partial$nav-versioned.adoc[] From 53ae43837fd31999dbba5d063f220d8ed039cc67 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Fri, 13 Oct 2023 08:28:16 -0700 Subject: [PATCH 8/9] Update Jackson 2.14.3 -> 2.15.3 (#2730) --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index 68835894e2..3eb46df929 100644 --- a/gradle.properties +++ b/gradle.properties @@ -49,7 +49,7 @@ jerseyVersion=2.37 reactiveStreamsVersion=1.0.4 jcToolsVersion=4.0.1 # backward compatible with jackson 2.9+, we do not depend on any new features from later versions. -jacksonVersion=2.14.3 +jacksonVersion=2.15.2 openTracingVersion=0.33.0 zipkinReporterVersion=2.16.4 From f00a486f3f4e010b64e8654915f9a1985bcd2a57 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Mon, 16 Oct 2023 17:04:08 -0700 Subject: [PATCH 9/9] Extract inner types from RoundRobinLoadBalancer (#2724) Motivation: The RoundRobinLoadBalancer contains a bunch of code that is doing a good deal more than load balancing, namely it's doing pooling, circuit breaking, and other things. This both makes the implementation hard to read and reason about and makes the inner pieces difficult to reuse for future load balancer implementation. Modifications: - Extract the types into their own files. - Hide a bit of the internal state to start to define abstraction boundaries. Note that this is pretty minimal so far, just accessors etc, so to keep this PR mechanical in nature. --- .../servicetalk/loadbalancer/Exceptions.java | 80 +++ .../loadbalancer/HealthCheckConfig.java | 40 ++ .../io/servicetalk/loadbalancer/Host.java | 491 ++++++++++++++++ .../loadbalancer/RoundRobinLoadBalancer.java | 536 +----------------- .../RoundRobinLoadBalancerFactory.java | 1 - 5 files changed, 618 insertions(+), 530 deletions(-) create mode 100644 servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/Exceptions.java create mode 100644 servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/HealthCheckConfig.java create mode 100644 servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/Host.java diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/Exceptions.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/Exceptions.java new file mode 100644 index 0000000000..a89e9e5791 --- /dev/null +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/Exceptions.java @@ -0,0 +1,80 @@ +/* + * Copyright © 2021-2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.loadbalancer; + +import io.servicetalk.client.api.ConnectionRejectedException; +import io.servicetalk.client.api.NoActiveHostException; +import io.servicetalk.client.api.NoAvailableHostException; +import io.servicetalk.concurrent.internal.ThrowableUtils; + +final class Exceptions { + + static final class StacklessNoAvailableHostException extends NoAvailableHostException { + private static final long serialVersionUID = 5942960040738091793L; + + private StacklessNoAvailableHostException(final String message) { + super(message); + } + + @Override + public Throwable fillInStackTrace() { + return this; + } + + static StacklessNoAvailableHostException newInstance(String message, Class clazz, String method) { + return ThrowableUtils.unknownStackTrace(new StacklessNoAvailableHostException(message), clazz, method); + } + } + + static final class StacklessNoActiveHostException extends NoActiveHostException { + + private static final long serialVersionUID = 7500474499335155869L; + + private StacklessNoActiveHostException(final String message) { + super(message); + } + + @Override + public Throwable fillInStackTrace() { + return this; + } + + static StacklessNoActiveHostException newInstance(String message, Class clazz, String method) { + return ThrowableUtils.unknownStackTrace(new StacklessNoActiveHostException(message), clazz, method); + } + } + + static final class StacklessConnectionRejectedException extends ConnectionRejectedException { + private static final long serialVersionUID = -4940708893680455819L; + + private StacklessConnectionRejectedException(final String message) { + super(message); + } + + @Override + public Throwable fillInStackTrace() { + return this; + } + + static StacklessConnectionRejectedException newInstance(String message, Class clazz, String method) { + return ThrowableUtils.unknownStackTrace(new StacklessConnectionRejectedException(message), clazz, method); + } + } + + private Exceptions() { + // no instances + } +} diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/HealthCheckConfig.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/HealthCheckConfig.java new file mode 100644 index 0000000000..8fad1f4198 --- /dev/null +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/HealthCheckConfig.java @@ -0,0 +1,40 @@ +/* + * Copyright © 2021-2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.loadbalancer; + +import io.servicetalk.concurrent.api.Executor; + +import java.time.Duration; + +final class HealthCheckConfig { + final Executor executor; + final Duration healthCheckInterval; + final Duration jitter; + final int failedThreshold; + final long healthCheckResubscribeLowerBound; + final long healthCheckResubscribeUpperBound; + + HealthCheckConfig(final Executor executor, final Duration healthCheckInterval, final Duration healthCheckJitter, + final int failedThreshold, final long healthCheckResubscribeLowerBound, + final long healthCheckResubscribeUpperBound) { + this.executor = executor; + this.healthCheckInterval = healthCheckInterval; + this.failedThreshold = failedThreshold; + this.jitter = healthCheckJitter; + this.healthCheckResubscribeLowerBound = healthCheckResubscribeLowerBound; + this.healthCheckResubscribeUpperBound = healthCheckResubscribeUpperBound; + } +} diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/Host.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/Host.java new file mode 100644 index 0000000000..f4961c2eb4 --- /dev/null +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/Host.java @@ -0,0 +1,491 @@ +/* + * Copyright © 2021-2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.loadbalancer; + +import io.servicetalk.client.api.ConnectionFactory; +import io.servicetalk.client.api.ConnectionLimitReachedException; +import io.servicetalk.client.api.LoadBalancedConnection; +import io.servicetalk.concurrent.api.AsyncCloseable; +import io.servicetalk.concurrent.api.AsyncContext; +import io.servicetalk.concurrent.api.Completable; +import io.servicetalk.concurrent.api.ListenableAsyncCloseable; +import io.servicetalk.concurrent.internal.DelayedCancellable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.AbstractMap; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Function; +import java.util.stream.Stream; +import javax.annotation.Nullable; + +import static io.servicetalk.concurrent.api.AsyncCloseables.toAsyncCloseable; +import static io.servicetalk.concurrent.api.Completable.completed; +import static io.servicetalk.concurrent.api.Publisher.from; +import static io.servicetalk.concurrent.api.RetryStrategies.retryWithConstantBackoffDeltaJitter; +import static io.servicetalk.concurrent.internal.FlowControlUtils.addWithOverflowProtection; +import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater; +import static java.util.stream.Collectors.toList; + +final class Host implements ListenableAsyncCloseable { + + private static final Object[] EMPTY_ARRAY = new Object[0]; + private static final Logger LOGGER = LoggerFactory.getLogger(Host.class); + + private enum State { + // The enum is not exhaustive, as other states have dynamic properties. + // For clarity, the other state classes are listed as comments: + // ACTIVE - see ActiveState + // UNHEALTHY - see HealthCheck + EXPIRED, + CLOSED + } + + private static final ActiveState STATE_ACTIVE_NO_FAILURES = new ActiveState(); + private static final ConnState ACTIVE_EMPTY_CONN_STATE = new ConnState(EMPTY_ARRAY, STATE_ACTIVE_NO_FAILURES); + private static final ConnState CLOSED_CONN_STATE = new ConnState(EMPTY_ARRAY, State.CLOSED); + + @SuppressWarnings("rawtypes") + private static final AtomicReferenceFieldUpdater connStateUpdater = + newUpdater(Host.class, ConnState.class, "connState"); + + private final String lbDescription; + final Addr address; + @Nullable + private final HealthCheckConfig healthCheckConfig; + private final ListenableAsyncCloseable closeable; + private volatile ConnState connState = ACTIVE_EMPTY_CONN_STATE; + + Host(String lbDescription, Addr address, @Nullable HealthCheckConfig healthCheckConfig) { + this.lbDescription = lbDescription; + this.address = address; + this.healthCheckConfig = healthCheckConfig; + this.closeable = toAsyncCloseable(graceful -> + graceful ? doClose(AsyncCloseable::closeAsyncGracefully) : doClose(AsyncCloseable::closeAsync)); + } + + boolean markActiveIfNotClosed() { + final Object oldState = connStateUpdater.getAndUpdate(this, oldConnState -> { + if (oldConnState.state == State.EXPIRED) { + return new ConnState(oldConnState.connections, STATE_ACTIVE_NO_FAILURES); + } + // If oldConnState.state == State.ACTIVE this could mean either a duplicate event, + // or a repeated CAS operation. We could issue a warning, but as we don't know, we don't log anything. + // UNHEALTHY state cannot transition to ACTIVE without passing the health check. + return oldConnState; + }).state; + return oldState != State.CLOSED; + } + + void markClosed() { + final ConnState oldState = closeConnState(); + final Object[] toRemove = oldState.connections; + cancelIfHealthCheck(oldState); + LOGGER.debug("{}: closing {} connection(s) gracefully to the closed address: {}.", + lbDescription, toRemove.length, address); + for (Object conn : toRemove) { + @SuppressWarnings("unchecked") + final C cConn = (C) conn; + cConn.closeAsyncGracefully().subscribe(); + } + } + + private ConnState closeConnState() { + for (;;) { + // We need to keep the oldState.connections around even if we are closed because the user may do + // closeGracefully with a timeout, which fails, and then force close. If we discard connections when + // closeGracefully is started we may leak connections. + final ConnState oldState = connState; + if (oldState.state == State.CLOSED || connStateUpdater.compareAndSet(this, oldState, + new ConnState(oldState.connections, State.CLOSED))) { + return oldState; + } + } + } + + void markExpired() { + for (;;) { + ConnState oldState = connStateUpdater.get(this); + if (oldState.state == State.EXPIRED || oldState.state == State.CLOSED) { + break; + } + Object nextState = oldState.connections.length == 0 ? State.CLOSED : State.EXPIRED; + + if (connStateUpdater.compareAndSet(this, oldState, + new ConnState(oldState.connections, nextState))) { + cancelIfHealthCheck(oldState); + if (nextState == State.CLOSED) { + // Trigger the callback to remove the host from usedHosts array. + this.closeAsync().subscribe(); + } + break; + } + } + } + + void markHealthy(final HealthCheck originalHealthCheckState) { + // Marking healthy is called when we need to recover from an unexpected error. + // However, it is possible that in the meantime, the host entered an EXPIRED state, then ACTIVE, then failed + // to open connections and entered the UNHEALTHY state before the original thread continues execution here. + // In such case, the flipped state is not the same as the one that just succeeded to open a connection. + // In an unlikely scenario that the following connection attempts fail indefinitely, a health check task + // would leak and would not be cancelled. Therefore, we cancel it here and allow failures to trigger a new + // health check. + ConnState oldState = connStateUpdater.getAndUpdate(this, previous -> { + if (Host.isUnhealthy(previous)) { + return new ConnState(previous.connections, STATE_ACTIVE_NO_FAILURES); + } + return previous; + }); + if (oldState.state != originalHealthCheckState) { + cancelIfHealthCheck(oldState); + } + } + + void markUnhealthy(final Throwable cause, final ConnectionFactory connectionFactory) { + assert healthCheckConfig != null; + for (;;) { + ConnState previous = connStateUpdater.get(this); + + if (!Host.isActive(previous) || previous.connections.length > 0 + || cause instanceof ConnectionLimitReachedException) { + LOGGER.debug("{}: failed to open a new connection to the host on address {}. {}.", + lbDescription, address, previous, cause); + break; + } + + ActiveState previousState = (ActiveState) previous.state; + if (previousState.failedConnections + 1 < healthCheckConfig.failedThreshold) { + final ActiveState nextState = previousState.forNextFailedConnection(); + if (connStateUpdater.compareAndSet(this, previous, + new ConnState(previous.connections, nextState))) { + LOGGER.debug("{}: failed to open a new connection to the host on address {}" + + " {} time(s) ({} consecutive failures will trigger health-checking).", + lbDescription, address, nextState.failedConnections, + healthCheckConfig.failedThreshold, cause); + break; + } + // another thread won the race, try again + continue; + } + + final HealthCheck healthCheck = new HealthCheck<>(connectionFactory, this, cause); + final ConnState nextState = new ConnState(previous.connections, healthCheck); + if (connStateUpdater.compareAndSet(this, previous, nextState)) { + LOGGER.info("{}: failed to open a new connection to the host on address {} " + + "{} time(s) in a row. Error counting threshold reached, marking this host as " + + "UNHEALTHY for the selection algorithm and triggering background health-checking.", + lbDescription, address, healthCheckConfig.failedThreshold, cause); + healthCheck.schedule(cause); + break; + } + } + } + + boolean isActiveAndHealthy() { + return isActive(connState); + } + + boolean isUnhealthy() { + return isUnhealthy(connState); + } + + private static boolean isActive(final ConnState connState) { + return ActiveState.class.equals(connState.state.getClass()); + } + + private static boolean isUnhealthy(ConnState connState) { + return HealthCheck.class.equals(connState.state.getClass()); + } + + boolean addConnection(final C connection, final @Nullable HealthCheck currentHealthCheck) { + int addAttempt = 0; + for (;;) { + final ConnState previous = connStateUpdater.get(this); + if (previous.state == State.CLOSED) { + return false; + } + ++addAttempt; + + final Object[] existing = previous.connections; + // Brute force iteration to avoid duplicates. If connections grow larger and faster lookup is required + // we can keep a Set for faster lookups (at the cost of more memory) as well as array. + for (final Object o : existing) { + if (o.equals(connection)) { + return true; + } + } + Object[] newList = Arrays.copyOf(existing, existing.length + 1); + newList[existing.length] = connection; + + // If we were able to add a new connection to the list, we should mark the host as ACTIVE again and + // reset its failures counter. + final Object newState = Host.isActive(previous) || Host.isUnhealthy(previous) ? + STATE_ACTIVE_NO_FAILURES : previous.state; + + if (connStateUpdater.compareAndSet(this, + previous, new ConnState(newList, newState))) { + // It could happen that the Host turned into UNHEALTHY state either concurrently with adding a new + // connection or with passing a previous health-check (if SD turned it into ACTIVE state). In both + // cases we have to cancel the "previous" ongoing health check. See "markHealthy" for more context. + if (Host.isUnhealthy(previous) && + (currentHealthCheck == null || previous.state != currentHealthCheck)) { + assert newState == STATE_ACTIVE_NO_FAILURES; + cancelIfHealthCheck(previous); + } + break; + } + } + + LOGGER.trace("{}: added a new connection {} to {} after {} attempt(s).", + lbDescription, connection, this, addAttempt); + // Instrument the new connection so we prune it on close + connection.onClose().beforeFinally(() -> { + int removeAttempt = 0; + for (;;) { + final ConnState currentConnState = this.connState; + if (currentConnState.state == State.CLOSED) { + break; + } + assert currentConnState.connections.length > 0; + ++removeAttempt; + int i = 0; + final Object[] connections = currentConnState.connections; + // Search for the connection in the list. + for (; i < connections.length; ++i) { + if (connections[i].equals(connection)) { + break; + } + } + if (i == connections.length) { + // Connection was already removed, nothing to do. + break; + } else if (connections.length == 1) { + assert !Host.isUnhealthy(currentConnState) : "Cannot be UNHEALTHY with #connections > 0"; + if (Host.isActive(currentConnState)) { + if (connStateUpdater.compareAndSet(this, currentConnState, + new ConnState(EMPTY_ARRAY, currentConnState.state))) { + break; + } + } else if (currentConnState.state == State.EXPIRED + // We're closing the last connection, close the Host. + // Closing the host will trigger the Host's onClose method, which will remove the host + // from used hosts list. If a race condition appears and a new connection was added + // in the meantime, that would mean the host is available again and the CAS operation + // will allow for determining that. It will prevent closing the Host and will only + // remove the connection (previously considered as the last one) from the array + // in the next iteration. + && connStateUpdater.compareAndSet(this, currentConnState, CLOSED_CONN_STATE)) { + this.closeAsync().subscribe(); + break; + } + } else { + Object[] newList = new Object[connections.length - 1]; + System.arraycopy(connections, 0, newList, 0, i); + System.arraycopy(connections, i + 1, newList, i, newList.length - i); + if (connStateUpdater.compareAndSet(this, + currentConnState, new ConnState(newList, currentConnState.state))) { + break; + } + } + } + LOGGER.trace("{}: removed connection {} from {} after {} attempt(s).", + lbDescription, connection, this, removeAttempt); + }).onErrorComplete(t -> { + // Use onErrorComplete instead of whenOnError to avoid double logging of an error inside subscribe(): + // SimpleCompletableSubscriber. + LOGGER.error("{}: unexpected error while processing connection.onClose() for {}.", + lbDescription, connection, t); + return true; + }).subscribe(); + return true; + } + + // Used for testing only + @SuppressWarnings("unchecked") + Map.Entry> asEntry() { + return new AbstractMap.SimpleImmutableEntry<>(address, + Stream.of(connState.connections).map(conn -> (C) conn).collect(toList())); + } + + Object[] connections() { + return connState.connections; + } + + @Override + public Completable closeAsync() { + return closeable.closeAsync(); + } + + @Override + public Completable closeAsyncGracefully() { + return closeable.closeAsyncGracefully(); + } + + @Override + public Completable onClose() { + return closeable.onClose(); + } + + @Override + public Completable onClosing() { + return closeable.onClosing(); + } + + @SuppressWarnings("unchecked") + private Completable doClose(final Function closeFunction) { + return Completable.defer(() -> { + final ConnState oldState = closeConnState(); + cancelIfHealthCheck(oldState); + final Object[] connections = oldState.connections; + return (connections.length == 0 ? completed() : + from(connections).flatMapCompletableDelayError(conn -> closeFunction.apply((C) conn))) + .shareContextOnSubscribe(); + }); + } + + private void cancelIfHealthCheck(ConnState connState) { + if (Host.isUnhealthy(connState)) { + @SuppressWarnings("unchecked") + HealthCheck healthCheck = (HealthCheck) connState.state; + LOGGER.debug("{}: health check cancelled for {}.", lbDescription, healthCheck.host); + healthCheck.cancel(); + } + } + + @Override + public String toString() { + final ConnState connState = this.connState; + return "Host{" + + "lbDescription=" + lbDescription + + ", address=" + address + + ", state=" + connState.state + + ", #connections=" + connState.connections.length + + '}'; + } + + private static final class ActiveState { + private final int failedConnections; + + ActiveState() { + this(0); + } + + private ActiveState(int failedConnections) { + this.failedConnections = failedConnections; + } + + ActiveState forNextFailedConnection() { + return new ActiveState(addWithOverflowProtection(this.failedConnections, 1)); + } + + @Override + public String toString() { + return "ACTIVE(failedConnections=" + failedConnections + ')'; + } + } + + private static final class HealthCheck + extends DelayedCancellable { + private final ConnectionFactory connectionFactory; + private final Host host; + private final Throwable lastError; + + private HealthCheck(final ConnectionFactory connectionFactory, + final Host host, final Throwable lastError) { + this.connectionFactory = connectionFactory; + this.host = host; + this.lastError = lastError; + } + + public void schedule(final Throwable originalCause) { + assert host.healthCheckConfig != null; + delayedCancellable( + // Use retry strategy to utilize jitter. + retryWithConstantBackoffDeltaJitter(cause -> true, + host.healthCheckConfig.healthCheckInterval, + host.healthCheckConfig.jitter, + host.healthCheckConfig.executor) + .apply(0, originalCause) + // Remove any state from async context + .beforeOnSubscribe(__ -> AsyncContext.clear()) + .concat(connectionFactory.newConnection(host.address, null, null) + // There is no risk for StackOverflowError because result of each connection + // attempt will be invoked on IoExecutor as a new task. + .retryWhen(retryWithConstantBackoffDeltaJitter( + cause -> { + LOGGER.debug("{}: health check failed for {}.", + host.lbDescription, host, cause); + return true; + }, + host.healthCheckConfig.healthCheckInterval, + host.healthCheckConfig.jitter, + host.healthCheckConfig.executor))) + .flatMapCompletable(newCnx -> { + if (host.addConnection(newCnx, this)) { + LOGGER.info("{}: health check passed for {}, marked this " + + "host as ACTIVE for the selection algorithm.", + host.lbDescription, host); + return completed(); + } else { + // This happens only if the host is closed, no need to mark as healthy. + assert host.connState.state == State.CLOSED; + LOGGER.debug("{}: health check passed for {}, but the " + + "host rejected a new connection {}. Closing it now.", + host.lbDescription, host, newCnx); + return newCnx.closeAsync(); + } + }) + // Use onErrorComplete instead of whenOnError to avoid double logging of an error inside + // subscribe(): SimpleCompletableSubscriber. + .onErrorComplete(t -> { + LOGGER.error("{}: health check terminated with " + + "an unexpected error for {}. Marking this host as ACTIVE as a fallback " + + "to allow connection attempts.", host.lbDescription, host, t); + host.markHealthy(this); + return true; + }) + .subscribe()); + } + + @Override + public String toString() { + return "UNHEALTHY(" + lastError + ')'; + } + } + + private static final class ConnState { + final Object[] connections; + final Object state; + + ConnState(final Object[] connections, final Object state) { + this.connections = connections; + this.state = state; + } + + @Override + public String toString() { + return "ConnState{" + + "state=" + state + + ", #connections=" + connections.length + + '}'; + } + } +} diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java index 4a1034dc43..35c3a1a52f 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java @@ -16,36 +16,27 @@ package io.servicetalk.loadbalancer; import io.servicetalk.client.api.ConnectionFactory; -import io.servicetalk.client.api.ConnectionLimitReachedException; -import io.servicetalk.client.api.ConnectionRejectedException; import io.servicetalk.client.api.LoadBalancedConnection; import io.servicetalk.client.api.LoadBalancer; -import io.servicetalk.client.api.NoActiveHostException; -import io.servicetalk.client.api.NoAvailableHostException; import io.servicetalk.client.api.ServiceDiscovererEvent; import io.servicetalk.concurrent.PublisherSource.Processor; import io.servicetalk.concurrent.PublisherSource.Subscriber; import io.servicetalk.concurrent.PublisherSource.Subscription; -import io.servicetalk.concurrent.api.AsyncCloseable; -import io.servicetalk.concurrent.api.AsyncContext; import io.servicetalk.concurrent.api.Completable; import io.servicetalk.concurrent.api.CompositeCloseable; -import io.servicetalk.concurrent.api.Executor; import io.servicetalk.concurrent.api.ListenableAsyncCloseable; import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.Single; -import io.servicetalk.concurrent.internal.DelayedCancellable; import io.servicetalk.concurrent.internal.SequentialCancellable; -import io.servicetalk.concurrent.internal.ThrowableUtils; import io.servicetalk.context.api.ContextMap; +import io.servicetalk.loadbalancer.Exceptions.StacklessConnectionRejectedException; +import io.servicetalk.loadbalancer.Exceptions.StacklessNoActiveHostException; +import io.servicetalk.loadbalancer.Exceptions.StacklessNoAvailableHostException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.time.Duration; -import java.util.AbstractMap.SimpleImmutableEntry; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Comparator; import java.util.Iterator; @@ -58,7 +49,6 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Consumer; -import java.util.function.Function; import java.util.function.Predicate; import java.util.function.UnaryOperator; import java.util.stream.Stream; @@ -71,16 +61,12 @@ import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.UNAVAILABLE; import static io.servicetalk.concurrent.api.AsyncCloseables.newCompositeCloseable; import static io.servicetalk.concurrent.api.AsyncCloseables.toAsyncCloseable; -import static io.servicetalk.concurrent.api.Completable.completed; import static io.servicetalk.concurrent.api.Processors.newPublisherProcessorDropHeadOnOverflow; -import static io.servicetalk.concurrent.api.Publisher.from; -import static io.servicetalk.concurrent.api.RetryStrategies.retryWithConstantBackoffDeltaJitter; import static io.servicetalk.concurrent.api.Single.defer; import static io.servicetalk.concurrent.api.Single.failed; import static io.servicetalk.concurrent.api.Single.succeeded; import static io.servicetalk.concurrent.api.SourceAdapters.fromSource; import static io.servicetalk.concurrent.api.SourceAdapters.toSource; -import static io.servicetalk.concurrent.internal.FlowControlUtils.addWithOverflowProtection; import static java.lang.Integer.toHexString; import static java.lang.Math.min; import static java.lang.System.identityHashCode; @@ -88,7 +74,6 @@ import static java.util.Collections.singletonList; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.NANOSECONDS; -import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater; import static java.util.stream.Collectors.toList; /** @@ -101,7 +86,6 @@ final class RoundRobinLoadBalancer { private static final Logger LOGGER = LoggerFactory.getLogger(RoundRobinLoadBalancer.class); - private static final Object[] EMPTY_ARRAY = new Object[0]; @SuppressWarnings("rawtypes") private static final AtomicReferenceFieldUpdater usedHostsUpdater = @@ -233,7 +217,7 @@ private static boolean allUn final List> usedHosts) { boolean allUnhealthy = !usedHosts.isEmpty(); for (Host host : usedHosts) { - if (!Host.isUnhealthy(host.connState)) { + if (!host.isUnhealthy()) { allUnhealthy = false; break; } @@ -382,6 +366,7 @@ private List> markHostAsExpired( } private Host createHost(ResolvedAddress addr) { + // All hosts will share the healthcheck config of the parent RR loadbalancer. Host host = new Host<>(RoundRobinLoadBalancer.this.toString(), addr, healthCheckConfig); host.onClose().afterFinally(() -> usedHostsUpdater.updateAndGet(RoundRobinLoadBalancer.this, previousHosts -> { @@ -515,7 +500,7 @@ private Single selectConnection0(final Predicate selector, @Nullable final if (!forceNewConnectionAndReserve) { // Try first to see if an existing connection can be used - final Object[] connections = host.connState.connections; + final Object[] connections = host.connections(); // Exhaust the linear search space first: final int linearAttempts = min(connections.length, linearSearchSpace); for (int j = 0; j < linearAttempts; ++j) { @@ -568,7 +553,7 @@ private Single selectConnection0(final Predicate selector, @Nullable final // This LB implementation does not automatically provide TransportObserver. Therefore, we pass "null" here. // Users can apply a ConnectionFactoryFilter if they need to override this "null" value with TransportObserver. Single establishConnection = connectionFactory.newConnection(host.address, context, null); - if (host.healthCheckConfig != null) { + if (healthCheckConfig != null) { // Schedule health check before returning establishConnection = establishConnection.beforeOnError(t -> host.markUnhealthy(t, connectionFactory)); } @@ -636,513 +621,6 @@ List>> usedAddresses() { return usedHosts.stream().map(Host::asEntry).collect(toList()); } - static final class HealthCheckConfig { - private final Executor executor; - private final Duration healthCheckInterval; - private final Duration jitter; - private final int failedThreshold; - private final long healthCheckResubscribeLowerBound; - private final long healthCheckResubscribeUpperBound; - - HealthCheckConfig(final Executor executor, final Duration healthCheckInterval, final Duration healthCheckJitter, - final int failedThreshold, final long healthCheckResubscribeLowerBound, - final long healthCheckResubscribeUpperBound) { - this.executor = executor; - this.healthCheckInterval = healthCheckInterval; - this.failedThreshold = failedThreshold; - this.jitter = healthCheckJitter; - this.healthCheckResubscribeLowerBound = healthCheckResubscribeLowerBound; - this.healthCheckResubscribeUpperBound = healthCheckResubscribeUpperBound; - } - } - - private static final class Host implements ListenableAsyncCloseable { - - private enum State { - // The enum is not exhaustive, as other states have dynamic properties. - // For clarity, the other state classes are listed as comments: - // ACTIVE - see ActiveState - // UNHEALTHY - see HealthCheck - EXPIRED, - CLOSED - } - - private static final ActiveState STATE_ACTIVE_NO_FAILURES = new ActiveState(); - private static final ConnState ACTIVE_EMPTY_CONN_STATE = new ConnState(EMPTY_ARRAY, STATE_ACTIVE_NO_FAILURES); - private static final ConnState CLOSED_CONN_STATE = new ConnState(EMPTY_ARRAY, State.CLOSED); - - @SuppressWarnings("rawtypes") - private static final AtomicReferenceFieldUpdater connStateUpdater = - newUpdater(Host.class, ConnState.class, "connState"); - - private final String lbDescription; - final Addr address; - @Nullable - private final HealthCheckConfig healthCheckConfig; - private final ListenableAsyncCloseable closeable; - private volatile ConnState connState = ACTIVE_EMPTY_CONN_STATE; - - Host(String lbDescription, Addr address, @Nullable HealthCheckConfig healthCheckConfig) { - this.lbDescription = lbDescription; - this.address = address; - this.healthCheckConfig = healthCheckConfig; - this.closeable = toAsyncCloseable(graceful -> - graceful ? doClose(AsyncCloseable::closeAsyncGracefully) : doClose(AsyncCloseable::closeAsync)); - } - - boolean markActiveIfNotClosed() { - final Object oldState = connStateUpdater.getAndUpdate(this, oldConnState -> { - if (oldConnState.state == State.EXPIRED) { - return new ConnState(oldConnState.connections, STATE_ACTIVE_NO_FAILURES); - } - // If oldConnState.state == State.ACTIVE this could mean either a duplicate event, - // or a repeated CAS operation. We could issue a warning, but as we don't know, we don't log anything. - // UNHEALTHY state cannot transition to ACTIVE without passing the health check. - return oldConnState; - }).state; - return oldState != State.CLOSED; - } - - void markClosed() { - final ConnState oldState = closeConnState(); - final Object[] toRemove = oldState.connections; - cancelIfHealthCheck(oldState); - LOGGER.debug("{}: closing {} connection(s) gracefully to the closed address: {}.", - lbDescription, toRemove.length, address); - for (Object conn : toRemove) { - @SuppressWarnings("unchecked") - final C cConn = (C) conn; - cConn.closeAsyncGracefully().subscribe(); - } - } - - private ConnState closeConnState() { - for (;;) { - // We need to keep the oldState.connections around even if we are closed because the user may do - // closeGracefully with a timeout, which fails, and then force close. If we discard connections when - // closeGracefully is started we may leak connections. - final ConnState oldState = connState; - if (oldState.state == State.CLOSED || connStateUpdater.compareAndSet(this, oldState, - new ConnState(oldState.connections, State.CLOSED))) { - return oldState; - } - } - } - - void markExpired() { - for (;;) { - ConnState oldState = connStateUpdater.get(this); - if (oldState.state == State.EXPIRED || oldState.state == State.CLOSED) { - break; - } - Object nextState = oldState.connections.length == 0 ? State.CLOSED : State.EXPIRED; - - if (connStateUpdater.compareAndSet(this, oldState, - new ConnState(oldState.connections, nextState))) { - cancelIfHealthCheck(oldState); - if (nextState == State.CLOSED) { - // Trigger the callback to remove the host from usedHosts array. - this.closeAsync().subscribe(); - } - break; - } - } - } - - void markHealthy(final HealthCheck originalHealthCheckState) { - // Marking healthy is called when we need to recover from an unexpected error. - // However, it is possible that in the meantime, the host entered an EXPIRED state, then ACTIVE, then failed - // to open connections and entered the UNHEALTHY state before the original thread continues execution here. - // In such case, the flipped state is not the same as the one that just succeeded to open a connection. - // In an unlikely scenario that the following connection attempts fail indefinitely, a health check task - // would leak and would not be cancelled. Therefore, we cancel it here and allow failures to trigger a new - // health check. - ConnState oldState = connStateUpdater.getAndUpdate(this, previous -> { - if (Host.isUnhealthy(previous)) { - return new ConnState(previous.connections, STATE_ACTIVE_NO_FAILURES); - } - return previous; - }); - if (oldState.state != originalHealthCheckState) { - cancelIfHealthCheck(oldState); - } - } - - void markUnhealthy(final Throwable cause, final ConnectionFactory connectionFactory) { - assert healthCheckConfig != null; - for (;;) { - ConnState previous = connStateUpdater.get(this); - - if (!Host.isActive(previous) || previous.connections.length > 0 - || cause instanceof ConnectionLimitReachedException) { - LOGGER.debug("{}: failed to open a new connection to the host on address {}. {}.", - lbDescription, address, previous, cause); - break; - } - - ActiveState previousState = (ActiveState) previous.state; - if (previousState.failedConnections + 1 < healthCheckConfig.failedThreshold) { - final ActiveState nextState = previousState.forNextFailedConnection(); - if (connStateUpdater.compareAndSet(this, previous, - new ConnState(previous.connections, nextState))) { - LOGGER.debug("{}: failed to open a new connection to the host on address {}" + - " {} time(s) ({} consecutive failures will trigger health-checking).", - lbDescription, address, nextState.failedConnections, - healthCheckConfig.failedThreshold, cause); - break; - } - // another thread won the race, try again - continue; - } - - final HealthCheck healthCheck = new HealthCheck<>(connectionFactory, this, cause); - final ConnState nextState = new ConnState(previous.connections, healthCheck); - if (connStateUpdater.compareAndSet(this, previous, nextState)) { - LOGGER.info("{}: failed to open a new connection to the host on address {} " + - "{} time(s) in a row. Error counting threshold reached, marking this host as " + - "UNHEALTHY for the selection algorithm and triggering background health-checking.", - lbDescription, address, healthCheckConfig.failedThreshold, cause); - healthCheck.schedule(cause); - break; - } - } - } - - boolean isActiveAndHealthy() { - return isActive(connState); - } - - static boolean isActive(final ConnState connState) { - return ActiveState.class.equals(connState.state.getClass()); - } - - static boolean isUnhealthy(final ConnState connState) { - return HealthCheck.class.equals(connState.state.getClass()); - } - - boolean addConnection(final C connection, final @Nullable HealthCheck currentHealthCheck) { - int addAttempt = 0; - for (;;) { - final ConnState previous = connStateUpdater.get(this); - if (previous.state == State.CLOSED) { - return false; - } - ++addAttempt; - - final Object[] existing = previous.connections; - // Brute force iteration to avoid duplicates. If connections grow larger and faster lookup is required - // we can keep a Set for faster lookups (at the cost of more memory) as well as array. - for (final Object o : existing) { - if (o.equals(connection)) { - return true; - } - } - Object[] newList = Arrays.copyOf(existing, existing.length + 1); - newList[existing.length] = connection; - - // If we were able to add a new connection to the list, we should mark the host as ACTIVE again and - // reset its failures counter. - final Object newState = Host.isActive(previous) || Host.isUnhealthy(previous) ? - STATE_ACTIVE_NO_FAILURES : previous.state; - - if (connStateUpdater.compareAndSet(this, - previous, new ConnState(newList, newState))) { - // It could happen that the Host turned into UNHEALTHY state either concurrently with adding a new - // connection or with passing a previous health-check (if SD turned it into ACTIVE state). In both - // cases we have to cancel the "previous" ongoing health check. See "markHealthy" for more context. - if (Host.isUnhealthy(previous) && - (currentHealthCheck == null || previous.state != currentHealthCheck)) { - assert newState == STATE_ACTIVE_NO_FAILURES; - cancelIfHealthCheck(previous); - } - break; - } - } - - LOGGER.trace("{}: added a new connection {} to {} after {} attempt(s).", - lbDescription, connection, this, addAttempt); - // Instrument the new connection so we prune it on close - connection.onClose().beforeFinally(() -> { - int removeAttempt = 0; - for (;;) { - final ConnState currentConnState = this.connState; - if (currentConnState.state == State.CLOSED) { - break; - } - assert currentConnState.connections.length > 0; - ++removeAttempt; - int i = 0; - final Object[] connections = currentConnState.connections; - // Search for the connection in the list. - for (; i < connections.length; ++i) { - if (connections[i].equals(connection)) { - break; - } - } - if (i == connections.length) { - // Connection was already removed, nothing to do. - break; - } else if (connections.length == 1) { - assert !Host.isUnhealthy(currentConnState) : "Cannot be UNHEALTHY with #connections > 0"; - if (Host.isActive(currentConnState)) { - if (connStateUpdater.compareAndSet(this, currentConnState, - new ConnState(EMPTY_ARRAY, currentConnState.state))) { - break; - } - } else if (currentConnState.state == State.EXPIRED - // We're closing the last connection, close the Host. - // Closing the host will trigger the Host's onClose method, which will remove the host - // from used hosts list. If a race condition appears and a new connection was added - // in the meantime, that would mean the host is available again and the CAS operation - // will allow for determining that. It will prevent closing the Host and will only - // remove the connection (previously considered as the last one) from the array - // in the next iteration. - && connStateUpdater.compareAndSet(this, currentConnState, CLOSED_CONN_STATE)) { - this.closeAsync().subscribe(); - break; - } - } else { - Object[] newList = new Object[connections.length - 1]; - System.arraycopy(connections, 0, newList, 0, i); - System.arraycopy(connections, i + 1, newList, i, newList.length - i); - if (connStateUpdater.compareAndSet(this, - currentConnState, new ConnState(newList, currentConnState.state))) { - break; - } - } - } - LOGGER.trace("{}: removed connection {} from {} after {} attempt(s).", - lbDescription, connection, this, removeAttempt); - }).onErrorComplete(t -> { - // Use onErrorComplete instead of whenOnError to avoid double logging of an error inside subscribe(): - // SimpleCompletableSubscriber. - LOGGER.error("{}: unexpected error while processing connection.onClose() for {}.", - lbDescription, connection, t); - return true; - }).subscribe(); - return true; - } - - // Used for testing only - @SuppressWarnings("unchecked") - Entry> asEntry() { - return new SimpleImmutableEntry<>(address, - Stream.of(connState.connections).map(conn -> (C) conn).collect(toList())); - } - - @Override - public Completable closeAsync() { - return closeable.closeAsync(); - } - - @Override - public Completable closeAsyncGracefully() { - return closeable.closeAsyncGracefully(); - } - - @Override - public Completable onClose() { - return closeable.onClose(); - } - - @Override - public Completable onClosing() { - return closeable.onClosing(); - } - - @SuppressWarnings("unchecked") - private Completable doClose(final Function closeFunction) { - return Completable.defer(() -> { - final ConnState oldState = closeConnState(); - cancelIfHealthCheck(oldState); - final Object[] connections = oldState.connections; - return (connections.length == 0 ? completed() : - from(connections).flatMapCompletableDelayError(conn -> closeFunction.apply((C) conn))) - .shareContextOnSubscribe(); - }); - } - - private void cancelIfHealthCheck(ConnState connState) { - if (Host.isUnhealthy(connState)) { - @SuppressWarnings("unchecked") - HealthCheck healthCheck = (HealthCheck) connState.state; - LOGGER.debug("{}: health check cancelled for {}.", lbDescription, healthCheck.host); - healthCheck.cancel(); - } - } - - @Override - public String toString() { - final ConnState connState = this.connState; - return "Host{" + - "lbDescription=" + lbDescription + - ", address=" + address + - ", state=" + connState.state + - ", #connections=" + connState.connections.length + - '}'; - } - - private static final class ActiveState { - private final int failedConnections; - - ActiveState() { - this(0); - } - - private ActiveState(int failedConnections) { - this.failedConnections = failedConnections; - } - - ActiveState forNextFailedConnection() { - return new ActiveState(addWithOverflowProtection(this.failedConnections, 1)); - } - - @Override - public String toString() { - return "ACTIVE(failedConnections=" + failedConnections + ')'; - } - } - - private static final class HealthCheck - extends DelayedCancellable { - private final ConnectionFactory connectionFactory; - private final Host host; - private final Throwable lastError; - - private HealthCheck(final ConnectionFactory connectionFactory, - final Host host, final Throwable lastError) { - this.connectionFactory = connectionFactory; - this.host = host; - this.lastError = lastError; - } - - public void schedule(final Throwable originalCause) { - assert host.healthCheckConfig != null; - delayedCancellable( - // Use retry strategy to utilize jitter. - retryWithConstantBackoffDeltaJitter(cause -> true, - host.healthCheckConfig.healthCheckInterval, - host.healthCheckConfig.jitter, - host.healthCheckConfig.executor) - .apply(0, originalCause) - // Remove any state from async context - .beforeOnSubscribe(__ -> AsyncContext.clear()) - .concat(connectionFactory.newConnection(host.address, null, null) - // There is no risk for StackOverflowError because result of each connection - // attempt will be invoked on IoExecutor as a new task. - .retryWhen(retryWithConstantBackoffDeltaJitter( - cause -> { - LOGGER.debug("{}: health check failed for {}.", - host.lbDescription, host, cause); - return true; - }, - host.healthCheckConfig.healthCheckInterval, - host.healthCheckConfig.jitter, - host.healthCheckConfig.executor))) - .flatMapCompletable(newCnx -> { - if (host.addConnection(newCnx, this)) { - LOGGER.info("{}: health check passed for {}, marked this " + - "host as ACTIVE for the selection algorithm.", - host.lbDescription, host); - return completed(); - } else { - // This happens only if the host is closed, no need to mark as healthy. - assert host.connState.state == State.CLOSED; - LOGGER.debug("{}: health check passed for {}, but the " + - "host rejected a new connection {}. Closing it now.", - host.lbDescription, host, newCnx); - return newCnx.closeAsync(); - } - }) - // Use onErrorComplete instead of whenOnError to avoid double logging of an error inside - // subscribe(): SimpleCompletableSubscriber. - .onErrorComplete(t -> { - LOGGER.error("{}: health check terminated with " + - "an unexpected error for {}. Marking this host as ACTIVE as a fallback " + - "to allow connection attempts.", host.lbDescription, host, t); - host.markHealthy(this); - return true; - }) - .subscribe()); - } - - @Override - public String toString() { - return "UNHEALTHY(" + lastError + ')'; - } - } - - private static final class ConnState { - final Object[] connections; - final Object state; - - ConnState(final Object[] connections, final Object state) { - this.connections = connections; - this.state = state; - } - - @Override - public String toString() { - return "ConnState{" + - "state=" + state + - ", #connections=" + connections.length + - '}'; - } - } - } - - private static final class StacklessNoAvailableHostException extends NoAvailableHostException { - private static final long serialVersionUID = 5942960040738091793L; - - private StacklessNoAvailableHostException(final String message) { - super(message); - } - - @Override - public Throwable fillInStackTrace() { - return this; - } - - public static StacklessNoAvailableHostException newInstance(String message, Class clazz, String method) { - return ThrowableUtils.unknownStackTrace(new StacklessNoAvailableHostException(message), clazz, method); - } - } - - private static final class StacklessNoActiveHostException extends NoActiveHostException { - - private static final long serialVersionUID = 7500474499335155869L; - - private StacklessNoActiveHostException(final String message) { - super(message); - } - - @Override - public Throwable fillInStackTrace() { - return this; - } - - public static StacklessNoActiveHostException newInstance(String message, Class clazz, String method) { - return ThrowableUtils.unknownStackTrace(new StacklessNoActiveHostException(message), clazz, method); - } - } - - private static final class StacklessConnectionRejectedException extends ConnectionRejectedException { - private static final long serialVersionUID = -4940708893680455819L; - - private StacklessConnectionRejectedException(final String message) { - super(message); - } - - @Override - public Throwable fillInStackTrace() { - return this; - } - - public static StacklessConnectionRejectedException newInstance(String message, Class clazz, String method) { - return ThrowableUtils.unknownStackTrace(new StacklessConnectionRejectedException(message), clazz, method); - } - } - private static boolean isClosedList(List list) { return list.getClass().equals(ClosedList.class); } diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java index 9b4362b6f1..e47b9a3464 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java @@ -24,7 +24,6 @@ import io.servicetalk.concurrent.api.Executor; import io.servicetalk.concurrent.api.Executors; import io.servicetalk.concurrent.api.Publisher; -import io.servicetalk.loadbalancer.RoundRobinLoadBalancer.HealthCheckConfig; import io.servicetalk.transport.api.ExecutionStrategy; import java.time.Duration;