From d85365147af33b30f76566e13d86144d8e788179 Mon Sep 17 00:00:00 2001 From: jrhee17 Date: Thu, 25 May 2023 20:45:52 +0900 Subject: [PATCH] Provide blockhound integrations (#4493) Motivation: Armeria users often use `BlockHound` to determine whether there are any blocking calls in their applications. Providing a basic `BlockHound` integration can help de-duplicate some code and debugging efforts. I propose the following: - Our CI checks for blocking calls in our tests to keep our `BlockHound` integration up-to-date with changes. - We provide a **lenient** list of allowed blocking calls that may be commonly used even though not related directly to Armeria - Some JDK implementations internally use locks (`ThreadPoolExecutor`). As long as these locks are verified to be short-living, we can add it to our integration. - Although somewhat subjective, if short-living locks are held by Armeria integrations (not necessarily directly related to Armeria) I think it's reasonable to add it to our list. - HdrHistogram is used internally by Micrometer. Micrometer itself doesn't expect metrics to be recorded from event loops, so it may not be reasonable to expect `Micrometer` or `HdrHistogram` to provide `Blockhound` integrations. Since Armeria uses `Micrometer` to record metrics from event loops, I think it is reasonable that we add `HdrHistogram` related calls to our `Blockhound` integration. - The integration is public so that users can opt-out of some integrations if they choose to do so. - We can possibly provide more customization points if users feel some rules are too lenient. - I don't consider the current listing as complete/finalized, and expect that users can suggest additional rules in the future. Also note that there are some blocking calls within the codebase, but this PR attempts to just introduce blockhound rather than remove such calls. Modifications: - Modify the build action so that - The blockhound flag is set for java 19 - Upload blockhound logs which contains a list of blocked threads and their stacktraces - Change the artifact upload name. Currently, because the name can be duplicated for java19 (mac, windows, self-hosted), only one of these is downloadable - Provide `BlockHound` integrations for `core`, `brave`, `grpc`, `retrofit2`, `scala`, `sangria` modules - Provide an internal testing `BlockHound` integration which allows test-specific blocking calls - This integration also adds a callback which logs blocking calls to a separate file - If the `blockhound` gradle property flag is specified, run tests with `BlockHound` enabled - Otherwise, `BlockHound.install` is not invoked and not enabled in the code-base - Introduce `ReentrantShortLock` which Armeria's `BlockHound` whitelists and change all usages of `ReentrantLock` - `BlockingFastThreadLocalThread` is now used for non-event loops since netty whitelists `FastThreadLocalThread` with `permitBlockingCalls==true` - https://github.com/line/armeria/pull/4493#issuecomment-1309880934 - Note that this means that armeria requires at least Netty 4.1.85 now - `ShutdownSupport::shutdown` is now called from `startStopExecutor`. Although this method returns a CF, the call can actually be blocking - `START_STOP_EXECUTOR` is now a non-eventloop daemon thread instead of using the `GlobalEventExecutor` - This is because `GlobalEventExecutor` is an event loop. - Introduce `BlockingUtils#blockingRun` in case users want to run blocking calls from tests - Add the `kotlinx-coroutines-debug` dependency so that Coroutines `BlockHound` integration is loaded - https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-debug/kotlinx.coroutines.debug/-coroutines-block-hound-integration/ - Allow users to specify the `ThreadFactory` when creating a `EventLoopExtension` or `EventLoopGroupExtension` - This can be useful when users would like to create an event loop which allows blocking calls - `SamlService`, `SamlDecorator` now runs some calls from the blocking executor - `ZookeeperEndpointGroup` now uses a non-event-loop thread factory Result: - Users can now use `BlockHound` integrations provided by armeria --- .github/workflows/actions_build.yml | 15 +++- .../brave/BraveBlockHoundIntegration.java | 34 ++++++++ ...ockhound.integration.BlockHoundIntegration | 1 + .../it/brave/BraveIntegrationTest.java | 10 +-- build.gradle | 15 ++++ .../client/AbstractEventLoopState.java | 4 +- .../client/DefaultEventLoopScheduler.java | 3 +- .../client/cookie/DefaultCookieJar.java | 3 +- .../client/endpoint/DynamicEndpointGroup.java | 4 +- .../client/endpoint/FileWatcherRegistry.java | 3 +- .../client/endpoint/RestartableThread.java | 3 +- ...tedRandomDistributionEndpointSelector.java | 3 +- .../DefaultHealthCheckerContext.java | 3 +- .../HealthCheckedEndpointGroup.java | 3 +- .../healthcheck/HttpHealthChecker.java | 3 +- .../common/CoreBlockHoundIntegration.java | 69 +++++++++++++++ .../common/util/AbstractListenable.java | 3 +- .../armeria/common/util/AbstractOption.java | 3 +- .../common/util/CompositeException.java | 3 +- .../util/NonEventLoopThreadFactory.java | 13 ++- .../armeria/common/util/ShutdownHooks.java | 3 +- .../armeria/common/util/StartStopSupport.java | 3 +- .../armeria/internal/common/JacksonUtil.java | 3 +- .../common/ReflectiveDependencyInjector.java | 3 +- .../common/metric/CaffeineMetricSupport.java | 3 +- .../util/MinifiedBouncyCastleProvider.java | 2 +- .../common/util/ReentrantShortLock.java | 35 ++++++++ .../AnnotatedBeanFactoryRegistry.java | 3 +- .../com/linecorp/armeria/server/Server.java | 16 ++-- .../armeria/server/ServerBuilder.java | 11 ++- .../healthcheck/HealthCheckService.java | 3 +- ...ockhound.integration.BlockHoundIntegration | 1 + .../armeria/client/Http1HeaderNamingTest.java | 45 +++++++--- .../HttpClientMaxConcurrentStreamTest.java | 10 +-- .../client/HttpClientPipeliningTest.java | 3 +- .../client/endpoint/SelectionTimeoutTest.java | 3 +- .../proxy/ProxyClientIntegrationTest.java | 15 +++- .../common/metric/EventLoopMetricsTest.java | 13 ++- .../util/EventLoopCheckingFutureTest.java | 3 +- .../common/util/StartStopSupportTest.java | 3 +- .../common/util/ThreadFactoryTest.java | 4 +- .../common/stream/FixedStreamMessageTest.java | 8 +- .../server/GracefulShutdownSupportTest.java | 2 +- .../Http1ServerEarlyDisconnectionTest.java | 7 +- .../linecorp/armeria/server/ServerTest.java | 11 +-- dependencies.toml | 12 +++ .../server/graphql/DefaultGraphqlService.java | 2 + .../grpc/GrpcBlockHoundIntegration.java | 35 ++++++++ ...ockhound.integration.BlockHoundIntegration | 1 + .../RequestContextStorageCustomizingTest.java | 4 +- .../MultipartCollectIntegrationTest.java | 8 +- .../common/TraceRequestContextLeakTest.java | 20 +++-- .../common/AbstractEventLoopGroupRule.java | 6 +- .../junit4/common/EventLoopGroupRule.java | 16 +++- .../testing/junit4/common/EventLoopRule.java | 15 +++- .../testing/EventLoopGroupRuleDelegate.java | 12 +-- .../AbstractEventLoopGroupExtension.java | 6 +- .../junit5/common/EventLoopExtension.java | 15 +++- .../common/EventLoopGroupExtension.java | 16 +++- .../RetrofitBlockHoundIntegration.java | 34 ++++++++ ...ockhound.integration.BlockHoundIntegration | 1 + .../armeria/server/saml/SamlDecorator.java | 5 +- .../armeria/server/saml/SamlService.java | 5 +- ...ockhound.integration.BlockHoundIntegration | 1 + .../SangriaBlockHoundIntegration.scala | 33 +++++++ ...ockhound.integration.BlockHoundIntegration | 1 + .../scala/ScalaBlockHoundIntegration.scala | 32 +++++++ .../checkstyle/checkstyle-suppressions.xml | 6 +- settings/checkstyle/checkstyle.xml | 6 ++ .../internal/testing/BlockingUtils.java | 41 +++++++++ .../InternalTestingBlockHoundIntegration.java | 86 +++++++++++++++++++ ...ockhound.integration.BlockHoundIntegration | 1 + .../thrift/ThriftOverHttpClientTest.java | 15 ++-- .../zookeeper/ZooKeeperEndpointGroup.java | 5 +- 74 files changed, 719 insertions(+), 132 deletions(-) create mode 100644 brave/src/main/java/com/linecorp/armeria/common/brave/BraveBlockHoundIntegration.java create mode 100644 brave/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration create mode 100644 core/src/main/java/com/linecorp/armeria/common/CoreBlockHoundIntegration.java create mode 100644 core/src/main/java/com/linecorp/armeria/internal/common/util/ReentrantShortLock.java create mode 100644 core/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration create mode 100644 grpc/src/main/java/com/linecorp/armeria/common/grpc/GrpcBlockHoundIntegration.java create mode 100644 grpc/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration create mode 100644 retrofit2/src/main/java/com/linecorp/armeria/client/retrofit2/RetrofitBlockHoundIntegration.java create mode 100644 retrofit2/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration create mode 100644 sangria/sangria_2.13/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration create mode 100644 sangria/sangria_2.13/src/main/scala/com/linecorp/armeria/server/sangria/SangriaBlockHoundIntegration.scala create mode 100644 scala/scala_2.13/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration create mode 100644 scala/scala_2.13/src/main/scala/com/linecorp/armeria/scala/ScalaBlockHoundIntegration.scala create mode 100644 testing-internal/src/main/java/com/linecorp/armeria/internal/testing/BlockingUtils.java create mode 100644 testing-internal/src/main/java/com/linecorp/armeria/internal/testing/InternalTestingBlockHoundIntegration.java create mode 100644 testing-internal/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration diff --git a/.github/workflows/actions_build.yml b/.github/workflows/actions_build.yml index e37c8e9a3db..5df8b5633a0 100644 --- a/.github/workflows/actions_build.yml +++ b/.github/workflows/actions_build.yml @@ -45,6 +45,8 @@ jobs: - java: 19 on: self-hosted snapshot: true + # blockhound makes the build run about 10 minutes slower + blockhound: true steps: - uses: actions/checkout@v2 @@ -86,6 +88,7 @@ jobs: ${{ (matrix.on == 'self-hosted') && '--max-workers=8' || '--max-workers=2' }} --parallel \ ${{ matrix.coverage && '-Pcoverage' || '' }} \ ${{ matrix.leak && '-Pleak' || '' }} \ + ${{ matrix.blockhound && '-Pblockhound' || '' }} \ -PnoLint \ -PflakyTests=false \ -PbuildJdkVersion=${{ env.BUILD_JDK_VERSION }} \ @@ -133,17 +136,23 @@ jobs: if: ${{ matrix.coverage }} uses: codecov/codecov-action@v1 + - name: Fail the run if any threads were blocked + if: ${{ matrix.blockhound }} + run: "if [[ -z `find . -name 'blockhound.log' -size +0` ]]; then exit 0; else exit 1; fi" + shell: bash + - name: Collect the test reports if: failure() - run: find . '(' -name 'java_pid*.hprof' -or -name 'hs_err_*.log' -or -path '*/build/reports/tests' -or -path '*/build/test-results' -or -path '*/javadoc.options' ')' -exec tar rf "reports-JVM-${{ matrix.java }}.tar" {} ';' + run: | + find . '(' -name 'java_pid*.hprof' -or -name 'hs_err_*.log' -or -path '*/build/reports/tests' -or -path '*/build/test-results' -or -path '*/javadoc.options' -or -name 'blockhound.log' ')' -exec tar rf "reports-JVM-${{ matrix.on }}-${{ matrix.java }}.tar" {} ';' shell: bash - name: Upload the artifacts if: failure() uses: actions/upload-artifact@v2 with: - name: reports-JVM-${{ matrix.java }} - path: reports-JVM-${{ matrix.java }}.tar + name: reports-JVM-${{ matrix.on }}-${{ matrix.java }} + path: reports-JVM-${{ matrix.on }}-${{ matrix.java }}.tar retention-days: 3 lint: diff --git a/brave/src/main/java/com/linecorp/armeria/common/brave/BraveBlockHoundIntegration.java b/brave/src/main/java/com/linecorp/armeria/common/brave/BraveBlockHoundIntegration.java new file mode 100644 index 00000000000..fbd586d73ae --- /dev/null +++ b/brave/src/main/java/com/linecorp/armeria/common/brave/BraveBlockHoundIntegration.java @@ -0,0 +1,34 @@ +/* + * Copyright 2022 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.common.brave; + +import com.linecorp.armeria.common.annotation.UnstableApi; + +import reactor.blockhound.BlockHound.Builder; +import reactor.blockhound.integration.BlockHoundIntegration; + +/** + * A {@link BlockHoundIntegration} for the brave module. + */ +@UnstableApi +public final class BraveBlockHoundIntegration implements BlockHoundIntegration { + + @Override + public void applyTo(Builder builder) { + builder.allowBlockingCallsInside("zipkin2.reporter.AsyncReporter$BoundedAsyncReporter", "report"); + } +} diff --git a/brave/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration b/brave/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration new file mode 100644 index 00000000000..5cd2f11865f --- /dev/null +++ b/brave/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration @@ -0,0 +1 @@ +com.linecorp.armeria.common.brave.BraveBlockHoundIntegration diff --git a/brave/src/test/java/com/linecorp/armeria/it/brave/BraveIntegrationTest.java b/brave/src/test/java/com/linecorp/armeria/it/brave/BraveIntegrationTest.java index 4b20920f658..d9183a61867 100644 --- a/brave/src/test/java/com/linecorp/armeria/it/brave/BraveIntegrationTest.java +++ b/brave/src/test/java/com/linecorp/armeria/it/brave/BraveIntegrationTest.java @@ -65,8 +65,8 @@ import com.linecorp.armeria.common.brave.HelloService.AsyncIface; import com.linecorp.armeria.common.brave.RequestContextCurrentTraceContext; import com.linecorp.armeria.common.thrift.ThriftFuture; -import com.linecorp.armeria.common.util.Exceptions; import com.linecorp.armeria.common.util.ThreadFactories; +import com.linecorp.armeria.internal.testing.BlockingUtils; import com.linecorp.armeria.server.AbstractHttpService; import com.linecorp.armeria.server.HttpService; import com.linecorp.armeria.server.ServerBuilder; @@ -569,17 +569,13 @@ private static class SpanHandlerImpl extends SpanHandler { @Override public boolean end(TraceContext context, MutableSpan span, Cause cause) { - return spans.add(span); + return BlockingUtils.blockingRun(() -> spans.add(span)); } MutableSpan[] take(int numSpans) { final List taken = new ArrayList<>(); while (taken.size() < numSpans) { - try { - taken.add(spans.poll(30, TimeUnit.SECONDS)); - } catch (InterruptedException e) { - return Exceptions.throwUnsafely(e); - } + BlockingUtils.blockingRun(() -> taken.add(spans.poll(30, TimeUnit.SECONDS))); } // Reverse the collected spans to sort the spans by request time. diff --git a/build.gradle b/build.gradle index 3dc92ece434..d4cd5ca61ba 100644 --- a/build.gradle +++ b/build.gradle @@ -78,10 +78,17 @@ allprojects { systemProperty "java.security.manager", "allow" } + // required by blockhound for jvm 13+. See https://github.com/reactor/BlockHound/issues/33. + if (rootProject.hasProperty('blockhound') && project.ext.testJavaVersion >= 13) { + jvmArgs "-XX:+AllowRedefinitionToAddDeleteMethods" + } + // Use verbose exception/response reporting for easier debugging. systemProperty 'com.linecorp.armeria.verboseExceptions', 'true' systemProperty 'com.linecorp.armeria.verboseResponses', 'true' + systemProperty 'com.linecorp.armeria.blockhound.reportFile', "${project.buildDir}/blockhound.log" + // Pass special system property to tell our tests that we are measuring coverage. if (project.hasFlags('coverage')) { systemProperty 'com.linecorp.armeria.testing.coverage', 'true' @@ -165,6 +172,9 @@ configure(projectsWithFlags('java')) { // Reflections implementation libs.reflections + // Blockhound + optionalImplementation libs.blockhound + // Test-time dependencies testImplementation libs.guava.testlib testImplementation libs.junit4 @@ -183,6 +193,11 @@ configure(projectsWithFlags('java')) { testImplementation libs.apache.httpclient5 testImplementation libs.hamcrest testImplementation libs.hamcrest.library + testRuntimeOnly libs.kotlin.coroutines.debug + + if (rootProject.hasProperty('blockhound')) { + testRuntimeOnly libs.blockhound.junit.platform + } } // Configure the default DuplicatesStrategy for such as: diff --git a/core/src/main/java/com/linecorp/armeria/client/AbstractEventLoopState.java b/core/src/main/java/com/linecorp/armeria/client/AbstractEventLoopState.java index 4799a2fd96a..ac0b3c5e75a 100644 --- a/core/src/main/java/com/linecorp/armeria/client/AbstractEventLoopState.java +++ b/core/src/main/java/com/linecorp/armeria/client/AbstractEventLoopState.java @@ -21,6 +21,8 @@ import com.google.common.annotations.VisibleForTesting; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; + import io.netty.channel.EventLoop; abstract class AbstractEventLoopState { @@ -35,7 +37,7 @@ static AbstractEventLoopState of(List eventLoops, int maxNumEventLoop return new HeapBasedEventLoopState(eventLoops, maxNumEventLoops, scheduler); } - private final ReentrantLock lock = new ReentrantLock(); + private final ReentrantLock lock = new ReentrantShortLock(); private final List eventLoops; private final DefaultEventLoopScheduler scheduler; diff --git a/core/src/main/java/com/linecorp/armeria/client/DefaultEventLoopScheduler.java b/core/src/main/java/com/linecorp/armeria/client/DefaultEventLoopScheduler.java index 913f2cedc8f..97c2340ee2d 100644 --- a/core/src/main/java/com/linecorp/armeria/client/DefaultEventLoopScheduler.java +++ b/core/src/main/java/com/linecorp/armeria/client/DefaultEventLoopScheduler.java @@ -41,6 +41,7 @@ import com.linecorp.armeria.common.SessionProtocol; import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.util.ReleasableHolder; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; @@ -58,7 +59,7 @@ final class DefaultEventLoopScheduler implements EventLoopScheduler { static final int DEFAULT_MAX_NUM_EVENT_LOOPS = 1; - private final ReentrantLock lock = new ReentrantLock(); + private final ReentrantLock lock = new ReentrantShortLock(); private final List eventLoops; diff --git a/core/src/main/java/com/linecorp/armeria/client/cookie/DefaultCookieJar.java b/core/src/main/java/com/linecorp/armeria/client/cookie/DefaultCookieJar.java index d907966c022..81880e0dc9a 100644 --- a/core/src/main/java/com/linecorp/armeria/client/cookie/DefaultCookieJar.java +++ b/core/src/main/java/com/linecorp/armeria/client/cookie/DefaultCookieJar.java @@ -34,6 +34,7 @@ import com.linecorp.armeria.common.Cookies; import com.linecorp.armeria.common.Scheme; import com.linecorp.armeria.common.SessionProtocol; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; import io.netty.util.NetUtil; import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; @@ -60,7 +61,7 @@ final class DefaultCookieJar implements CookieJar { this.cookiePolicy = cookiePolicy; store = new Object2LongOpenHashMap<>(); filter = new HashMap<>(); - lock = new ReentrantLock(); + lock = new ReentrantShortLock(); } @Override diff --git a/core/src/main/java/com/linecorp/armeria/client/endpoint/DynamicEndpointGroup.java b/core/src/main/java/com/linecorp/armeria/client/endpoint/DynamicEndpointGroup.java index 07ddf6e3399..91765b810f8 100644 --- a/core/src/main/java/com/linecorp/armeria/client/endpoint/DynamicEndpointGroup.java +++ b/core/src/main/java/com/linecorp/armeria/client/endpoint/DynamicEndpointGroup.java @@ -30,7 +30,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects.ToStringHelper; @@ -45,6 +44,7 @@ import com.linecorp.armeria.common.util.AsyncCloseableSupport; import com.linecorp.armeria.common.util.EventLoopCheckingFuture; import com.linecorp.armeria.common.util.ListenableAsyncCloseable; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; /** * A dynamic {@link EndpointGroup}. The list of {@link Endpoint}s can be updated dynamically. @@ -66,7 +66,7 @@ public static DynamicEndpointGroupBuilder builder() { private final EndpointSelectionStrategy selectionStrategy; private final AtomicReference selector = new AtomicReference<>(); private volatile List endpoints = UNINITIALIZED_ENDPOINTS; - private final Lock endpointsLock = new ReentrantLock(); + private final Lock endpointsLock = new ReentrantShortLock(); private final CompletableFuture> initialEndpointsFuture = new InitialEndpointsFuture(); private final AsyncCloseableSupport closeable = AsyncCloseableSupport.of(this::closeAsync); diff --git a/core/src/main/java/com/linecorp/armeria/client/endpoint/FileWatcherRegistry.java b/core/src/main/java/com/linecorp/armeria/client/endpoint/FileWatcherRegistry.java index 541bb5c0db7..525e4c4e59b 100644 --- a/core/src/main/java/com/linecorp/armeria/client/endpoint/FileWatcherRegistry.java +++ b/core/src/main/java/com/linecorp/armeria/client/endpoint/FileWatcherRegistry.java @@ -38,6 +38,7 @@ import com.google.common.base.MoreObjects; import com.linecorp.armeria.client.endpoint.FileWatcherRunnable.FileWatchEvent; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; /** * A registry which wraps a {@link WatchService} and allows paths to be registered. @@ -133,7 +134,7 @@ void close() throws IOException { private final Map fileSystemWatchServiceMap = new HashMap<>(); - private final ReentrantLock lock = new ReentrantLock(); + private final ReentrantLock lock = new ReentrantShortLock(); /** * Registers a {@code filePath} and {@code callback} to the {@link WatchService}. When the diff --git a/core/src/main/java/com/linecorp/armeria/client/endpoint/RestartableThread.java b/core/src/main/java/com/linecorp/armeria/client/endpoint/RestartableThread.java index 62987faecde..8c3ba52104a 100644 --- a/core/src/main/java/com/linecorp/armeria/client/endpoint/RestartableThread.java +++ b/core/src/main/java/com/linecorp/armeria/client/endpoint/RestartableThread.java @@ -22,13 +22,14 @@ import java.util.function.Supplier; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; /** * A restartable thread utility class. */ final class RestartableThread { - private final ReentrantLock lock = new ReentrantLock(); + private final ReentrantLock lock = new ReentrantShortLock(); @Nullable private Thread thread; diff --git a/core/src/main/java/com/linecorp/armeria/client/endpoint/WeightedRandomDistributionEndpointSelector.java b/core/src/main/java/com/linecorp/armeria/client/endpoint/WeightedRandomDistributionEndpointSelector.java index 26b304392e0..f5313d4ed95 100644 --- a/core/src/main/java/com/linecorp/armeria/client/endpoint/WeightedRandomDistributionEndpointSelector.java +++ b/core/src/main/java/com/linecorp/armeria/client/endpoint/WeightedRandomDistributionEndpointSelector.java @@ -27,6 +27,7 @@ import com.linecorp.armeria.client.Endpoint; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; /** * This selector selects an {@link Endpoint} using random and the weight of the {@link Endpoint}. If there are @@ -37,7 +38,7 @@ */ final class WeightedRandomDistributionEndpointSelector { - private final ReentrantLock lock = new ReentrantLock(); + private final ReentrantLock lock = new ReentrantShortLock(); private final List allEntries; @GuardedBy("lock") private final List currentEntries; diff --git a/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/DefaultHealthCheckerContext.java b/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/DefaultHealthCheckerContext.java index c2351782d49..25e07ca8f2c 100644 --- a/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/DefaultHealthCheckerContext.java +++ b/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/DefaultHealthCheckerContext.java @@ -43,6 +43,7 @@ import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.util.AsyncCloseable; import com.linecorp.armeria.common.util.EventLoopCheckingFuture; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.Future; @@ -54,7 +55,7 @@ final class DefaultHealthCheckerContext private final Endpoint endpoint; private final SessionProtocol protocol; private final ClientOptions clientOptions; - private final ReentrantLock lock = new ReentrantLock(); + private final ReentrantLock lock = new ReentrantShortLock(); /** * Keeps the {@link Future}s which were scheduled via this {@link ScheduledExecutorService}. diff --git a/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroup.java b/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroup.java index 2d1493423fb..7bba57f55d5 100644 --- a/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroup.java +++ b/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroup.java @@ -49,6 +49,7 @@ import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.metric.MeterIdPrefix; import com.linecorp.armeria.common.util.AsyncCloseable; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; import io.micrometer.core.instrument.binder.MeterBinder; @@ -110,7 +111,7 @@ public static HealthCheckedEndpointGroupBuilder builder(EndpointGroup delegate, @VisibleForTesting final HealthCheckStrategy healthCheckStrategy; - private final ReentrantLock lock = new ReentrantLock(); + private final ReentrantLock lock = new ReentrantShortLock(); @GuardedBy("lock") private final Deque contextGroupChain = new ArrayDeque<>(4); diff --git a/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/HttpHealthChecker.java b/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/HttpHealthChecker.java index b374b3734c7..f46f24608b2 100644 --- a/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/HttpHealthChecker.java +++ b/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/HttpHealthChecker.java @@ -49,6 +49,7 @@ import com.linecorp.armeria.common.util.AsyncCloseable; import com.linecorp.armeria.common.util.AsyncCloseableSupport; import com.linecorp.armeria.common.util.TimeoutMode; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; import com.linecorp.armeria.unsafe.PooledObjects; import io.netty.util.AsciiString; @@ -60,7 +61,7 @@ final class HttpHealthChecker implements AsyncCloseable { private static final AsciiString ARMERIA_LPHC = HttpHeaderNames.of("armeria-lphc"); - private final ReentrantLock lock = new ReentrantLock(); + private final ReentrantLock lock = new ReentrantShortLock(); private final HealthCheckerContext ctx; private final WebClient webClient; private final String authority; diff --git a/core/src/main/java/com/linecorp/armeria/common/CoreBlockHoundIntegration.java b/core/src/main/java/com/linecorp/armeria/common/CoreBlockHoundIntegration.java new file mode 100644 index 00000000000..3faf112ebac --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/common/CoreBlockHoundIntegration.java @@ -0,0 +1,69 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.common; + +import java.util.ResourceBundle; + +import com.linecorp.armeria.common.annotation.UnstableApi; + +import reactor.blockhound.BlockHound.Builder; +import reactor.blockhound.integration.BlockHoundIntegration; + +/** + * A {@link BlockHoundIntegration} for the core module. + */ +@UnstableApi +public final class CoreBlockHoundIntegration implements BlockHoundIntegration { + @Override + public void applyTo(Builder builder) { + // short locks + builder.allowBlockingCallsInside("com.linecorp.armeria.client.HttpClientFactory", + "pool"); + builder.allowBlockingCallsInside("com.linecorp.armeria.internal.common.util.ReentrantShortLock", + "lock"); + + // Thread.yield can be eventually called when PooledObjects.copyAndClose is called + builder.allowBlockingCallsInside("io.netty.util.internal.ReferenceCountUpdater", "release"); + + // hdr histogram holds locks + builder.allowBlockingCallsInside("org.HdrHistogram.ConcurrentHistogram", "getCountAtIndex"); + builder.allowBlockingCallsInside("org.HdrHistogram.WriterReaderPhaser", "flipPhase"); + + // StreamMessageInputStream internally uses a blocking queue + // ThreadPoolExecutor.execute internally uses a blocking queue + builder.allowBlockingCallsInside("java.util.concurrent.LinkedBlockingQueue", "offer"); + + // a single blocking call is incurred for the first invocation, but the result is cached. + builder.allowBlockingCallsInside("com.linecorp.armeria.internal.client.PublicSuffix", + "get"); + builder.allowBlockingCallsInside("java.util.ServiceLoader$LazyClassPathLookupIterator", + "parse"); + builder.allowBlockingCallsInside(ResourceBundle.class.getName(), "getBundle"); + builder.allowBlockingCallsInside("io.netty.handler.codec.compression.Brotli", ""); + + // a lock is held temporarily when adding workers + builder.allowBlockingCallsInside("java.util.concurrent.ThreadPoolExecutor", "addWorker"); + + // prometheus exporting holds a lock temporarily + builder.allowBlockingCallsInside( + "com.linecorp.armeria.server.metric.PrometheusExpositionService", "doGet"); + + // Thread.yield can be called + builder.allowBlockingCallsInside( + "java.util.concurrent.FutureTask", "handlePossibleCancellationInterrupt"); + } +} diff --git a/core/src/main/java/com/linecorp/armeria/common/util/AbstractListenable.java b/core/src/main/java/com/linecorp/armeria/common/util/AbstractListenable.java index fe52d45fdf9..636be2063ca 100644 --- a/core/src/main/java/com/linecorp/armeria/common/util/AbstractListenable.java +++ b/core/src/main/java/com/linecorp/armeria/common/util/AbstractListenable.java @@ -25,6 +25,7 @@ import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.internal.common.util.IdentityHashStrategy; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; import it.unimi.dsi.fastutil.objects.ObjectLinkedOpenCustomHashSet; @@ -40,7 +41,7 @@ public abstract class AbstractListenable implements Listenable { private final Set> updateListeners = new ObjectLinkedOpenCustomHashSet<>(IdentityHashStrategy.of()); - private final ReentrantLock reentrantLock = new ReentrantLock(); + private final ReentrantLock reentrantLock = new ReentrantShortLock(); /** * Notify the new value changes to the listeners added via {@link #addListener(Consumer)}. diff --git a/core/src/main/java/com/linecorp/armeria/common/util/AbstractOption.java b/core/src/main/java/com/linecorp/armeria/common/util/AbstractOption.java index 0f59e3df7a3..9111b600f83 100644 --- a/core/src/main/java/com/linecorp/armeria/common/util/AbstractOption.java +++ b/core/src/main/java/com/linecorp/armeria/common/util/AbstractOption.java @@ -31,6 +31,7 @@ import com.google.common.collect.ImmutableSet; import com.linecorp.armeria.client.ClientOption; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; /** * A configuration option. @@ -236,7 +237,7 @@ private static final class Pool { private final Class type; private final BiMap> options; - private final ReentrantLock reentrantLock = new ReentrantLock(); + private final ReentrantLock reentrantLock = new ReentrantShortLock(); Pool(Class type) { this.type = type; diff --git a/core/src/main/java/com/linecorp/armeria/common/util/CompositeException.java b/core/src/main/java/com/linecorp/armeria/common/util/CompositeException.java index b72e164f6f8..f030df86d8f 100644 --- a/core/src/main/java/com/linecorp/armeria/common/util/CompositeException.java +++ b/core/src/main/java/com/linecorp/armeria/common/util/CompositeException.java @@ -47,6 +47,7 @@ import com.linecorp.armeria.common.Flags; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; /** * Represents an exception that is a composite of one or more other exceptions. A {@code CompositeException} @@ -81,7 +82,7 @@ public final class CompositeException extends RuntimeException { @Nullable private Throwable cause; - private final ReentrantLock reentrantLock = new ReentrantLock(); + private final ReentrantLock reentrantLock = new ReentrantShortLock(); /** * Constructs a CompositeException with the given array of Throwables as the diff --git a/core/src/main/java/com/linecorp/armeria/common/util/NonEventLoopThreadFactory.java b/core/src/main/java/com/linecorp/armeria/common/util/NonEventLoopThreadFactory.java index a662c9b0767..b342967431d 100644 --- a/core/src/main/java/com/linecorp/armeria/common/util/NonEventLoopThreadFactory.java +++ b/core/src/main/java/com/linecorp/armeria/common/util/NonEventLoopThreadFactory.java @@ -35,6 +35,17 @@ final class NonEventLoopThreadFactory extends AbstractThreadFactory { @Override Thread newThread(@Nullable ThreadGroup threadGroup, Runnable r, String name) { - return new FastThreadLocalThread(threadGroup, r, name); + return new BlockingFastThreadLocalThread(threadGroup, r, name); + } + + private static class BlockingFastThreadLocalThread extends FastThreadLocalThread { + BlockingFastThreadLocalThread(@Nullable ThreadGroup threadGroup, Runnable r, String name) { + super(threadGroup, r, name); + } + + @Override + public boolean permitBlockingCalls() { + return true; + } } } diff --git a/core/src/main/java/com/linecorp/armeria/common/util/ShutdownHooks.java b/core/src/main/java/com/linecorp/armeria/common/util/ShutdownHooks.java index 902b7bc3e53..d7c95b0c1ee 100644 --- a/core/src/main/java/com/linecorp/armeria/common/util/ShutdownHooks.java +++ b/core/src/main/java/com/linecorp/armeria/common/util/ShutdownHooks.java @@ -33,6 +33,7 @@ import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.annotation.UnstableApi; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; /** * A utility class for adding a task with an {@link AutoCloseable} on shutdown. @@ -46,7 +47,7 @@ public final class ShutdownHooks { private static final Map> autoCloseableOnShutdownTasks = new LinkedHashMap<>(); - private static final ReentrantLock reentrantLock = new ReentrantLock(); + private static final ReentrantLock reentrantLock = new ReentrantShortLock(); private static final ThreadFactory THREAD_FACTORY = ThreadFactories .builder("armeria-shutdown-hook") diff --git a/core/src/main/java/com/linecorp/armeria/common/util/StartStopSupport.java b/core/src/main/java/com/linecorp/armeria/common/util/StartStopSupport.java index 51399fabe16..065d2d21273 100644 --- a/core/src/main/java/com/linecorp/armeria/common/util/StartStopSupport.java +++ b/core/src/main/java/com/linecorp/armeria/common/util/StartStopSupport.java @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; /** * Provides asynchronous start-stop life cycle support. @@ -63,7 +64,7 @@ enum State { */ private UnmodifiableFuture future = completedFuture(null); - private final ReentrantLock reentrantLock = new ReentrantLock(); + private final ReentrantLock reentrantLock = new ReentrantShortLock(); /** * Creates a new instance. diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/JacksonUtil.java b/core/src/main/java/com/linecorp/armeria/internal/common/JacksonUtil.java index eb59f6ab85f..fda9434ad15 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/common/JacksonUtil.java +++ b/core/src/main/java/com/linecorp/armeria/internal/common/JacksonUtil.java @@ -38,7 +38,8 @@ public final class JacksonUtil { static { final List providers = - ImmutableList.copyOf(ServiceLoader.load(JacksonObjectMapperProvider.class)); + ImmutableList.copyOf(ServiceLoader.load(JacksonObjectMapperProvider.class, + JacksonObjectMapperProvider.class.getClassLoader())); if (!providers.isEmpty()) { // Use a custom ObjectMapper provided via SPI. provider = providers.get(0); diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/ReflectiveDependencyInjector.java b/core/src/main/java/com/linecorp/armeria/internal/common/ReflectiveDependencyInjector.java index 539194ed92c..939405d82b5 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/common/ReflectiveDependencyInjector.java +++ b/core/src/main/java/com/linecorp/armeria/internal/common/ReflectiveDependencyInjector.java @@ -31,6 +31,7 @@ import com.linecorp.armeria.common.DependencyInjector; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; public final class ReflectiveDependencyInjector implements DependencyInjector { @@ -57,7 +58,7 @@ public static T create(Class type, @Nullable Map, Obje return instance; } - private final ReentrantLock lock = new ReentrantLock(); + private final ReentrantLock lock = new ReentrantShortLock(); private final Map, Object> instances = new HashMap<>(); diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/metric/CaffeineMetricSupport.java b/core/src/main/java/com/linecorp/armeria/internal/common/metric/CaffeineMetricSupport.java index a16e2a0a3e1..50f01ed077f 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/common/metric/CaffeineMetricSupport.java +++ b/core/src/main/java/com/linecorp/armeria/internal/common/metric/CaffeineMetricSupport.java @@ -43,6 +43,7 @@ import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.metric.MeterIdPrefix; import com.linecorp.armeria.common.util.Ticker; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; import io.micrometer.core.instrument.MeterRegistry; @@ -86,7 +87,7 @@ private static final class CaffeineMetrics { private final MeterRegistry parent; private final MeterIdPrefix idPrefix; - private final ReentrantLock lock = new ReentrantLock(); + private final ReentrantLock lock = new ReentrantShortLock(); @GuardedBy("lock") private final List cacheRefs = new ArrayList<>(2); private final AtomicBoolean hasLoadingCache = new AtomicBoolean(); diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/util/MinifiedBouncyCastleProvider.java b/core/src/main/java/com/linecorp/armeria/internal/common/util/MinifiedBouncyCastleProvider.java index 666cc8af085..d5df712ec92 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/common/util/MinifiedBouncyCastleProvider.java +++ b/core/src/main/java/com/linecorp/armeria/internal/common/util/MinifiedBouncyCastleProvider.java @@ -47,7 +47,7 @@ public final class MinifiedBouncyCastleProvider extends Provider implements Conf private static final String PROVIDER_NAME = "ArmeriaBC"; - private static final ReentrantLock lock = new ReentrantLock(); + private static final ReentrantLock lock = new ReentrantShortLock(); private static final Map keyInfoConverters = new ConcurrentHashMap<>(); diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/util/ReentrantShortLock.java b/core/src/main/java/com/linecorp/armeria/internal/common/util/ReentrantShortLock.java new file mode 100644 index 00000000000..28bd8f13de1 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/internal/common/util/ReentrantShortLock.java @@ -0,0 +1,35 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.internal.common.util; + +import java.util.concurrent.locks.ReentrantLock; + +import com.linecorp.armeria.common.CoreBlockHoundIntegration; + +/** + * A short lock which is whitelisted by {@link CoreBlockHoundIntegration}. + * This lock may be preferred over {@link ReentrantLock} when it is known that the + * lock won't block the event loop over long periods of time. + */ +public final class ReentrantShortLock extends ReentrantLock { + private static final long serialVersionUID = 8999619612996643502L; + + @Override + public void lock() { + super.lock(); + } +} diff --git a/core/src/main/java/com/linecorp/armeria/internal/server/annotation/AnnotatedBeanFactoryRegistry.java b/core/src/main/java/com/linecorp/armeria/internal/server/annotation/AnnotatedBeanFactoryRegistry.java index f260d605132..7e9508a6b83 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/server/annotation/AnnotatedBeanFactoryRegistry.java +++ b/core/src/main/java/com/linecorp/armeria/internal/server/annotation/AnnotatedBeanFactoryRegistry.java @@ -49,6 +49,7 @@ import com.linecorp.armeria.common.DependencyInjector; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; import com.linecorp.armeria.internal.server.annotation.AnnotatedValueResolver.NoAnnotatedParameterException; import com.linecorp.armeria.internal.server.annotation.AnnotatedValueResolver.RequestObjectResolver; import com.linecorp.armeria.server.annotation.RequestConverter; @@ -63,7 +64,7 @@ final class AnnotatedBeanFactoryRegistry { private static final Logger logger = LoggerFactory.getLogger(AnnotatedBeanFactoryRegistry.class); - private static final ReentrantLock lock = new ReentrantLock(); + private static final ReentrantLock lock = new ReentrantShortLock(); private static final ClassValue factories = new ClassValue() { diff --git a/core/src/main/java/com/linecorp/armeria/server/Server.java b/core/src/main/java/com/linecorp/armeria/server/Server.java index 3d00a0b8f12..e943e188bdb 100644 --- a/core/src/main/java/com/linecorp/armeria/server/Server.java +++ b/core/src/main/java/com/linecorp/armeria/server/Server.java @@ -77,6 +77,7 @@ import com.linecorp.armeria.common.util.Version; import com.linecorp.armeria.internal.common.RequestTargetCache; import com.linecorp.armeria.internal.common.util.ChannelUtil; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; import com.linecorp.armeria.server.websocket.WebSocketService; import io.micrometer.core.instrument.Gauge; @@ -115,7 +116,7 @@ public static ServerBuilder builder() { private final UpdatableServerConfig config; private final StartStopSupport startStop; private final Set serverChannels = new NonBlockingHashSet<>(); - private final ReentrantLock lock = new ReentrantLock(); + private final ReentrantLock lock = new ReentrantShortLock(); @GuardedBy("lock") private final Map activePorts = new LinkedHashMap<>(); private final ConnectionLimitingHandler connectionLimitingHandler; @@ -700,11 +701,14 @@ private void finishDoStop(CompletableFuture future) { builder.addAll(serviceConfig.shutdownSupports()); } - CompletableFutures.successfulAsList(builder.build() - .stream() - .map(ShutdownSupport::shutdown) - .collect(toImmutableList()), cause -> null) - .thenRunAsync(() -> future.complete(null), config.startStopExecutor()); + CompletableFuture.runAsync(() -> { + // ShutdownSupport may be blocking so run the entire block inside the startStopExecutor + CompletableFutures.successfulAsList(builder.build() + .stream() + .map(ShutdownSupport::shutdown) + .collect(toImmutableList()), cause -> null) + .thenRunAsync(() -> future.complete(null), config.startStopExecutor()); + }, config.startStopExecutor()); } @Override diff --git a/core/src/main/java/com/linecorp/armeria/server/ServerBuilder.java b/core/src/main/java/com/linecorp/armeria/server/ServerBuilder.java index da99275e6f3..63f73597d42 100644 --- a/core/src/main/java/com/linecorp/armeria/server/ServerBuilder.java +++ b/core/src/main/java/com/linecorp/armeria/server/ServerBuilder.java @@ -47,6 +47,8 @@ import java.util.Map.Entry; import java.util.Optional; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -84,6 +86,7 @@ import com.linecorp.armeria.common.util.BlockingTaskExecutor; import com.linecorp.armeria.common.util.EventLoopGroups; import com.linecorp.armeria.common.util.SystemInfo; +import com.linecorp.armeria.common.util.ThreadFactories; import com.linecorp.armeria.internal.common.BuiltInDependencyInjector; import com.linecorp.armeria.internal.common.ReflectiveDependencyInjector; import com.linecorp.armeria.internal.common.RequestContextUtil; @@ -102,7 +105,6 @@ import io.netty.handler.ssl.SslContextBuilder; import io.netty.util.Mapping; import io.netty.util.NetUtil; -import io.netty.util.concurrent.GlobalEventExecutor; import it.unimi.dsi.fastutil.objects.Object2ObjectArrayMap; /** @@ -166,6 +168,8 @@ public final class ServerBuilder implements TlsSetters { @VisibleForTesting static final long MIN_PING_INTERVAL_MILLIS = 1000L; private static final long MIN_MAX_CONNECTION_AGE_MILLIS = 1_000L; + private static final ExecutorService START_STOP_EXECUTOR = Executors.newSingleThreadExecutor( + ThreadFactories.newThreadFactory("startstop-support", true)); static { RequestContextUtil.init(); @@ -180,7 +184,7 @@ public final class ServerBuilder implements TlsSetters { private EventLoopGroup workerGroup = CommonPools.workerGroup(); private boolean shutdownWorkerGroupOnStop; - private Executor startStopExecutor = GlobalEventExecutor.INSTANCE; + private Executor startStopExecutor = START_STOP_EXECUTOR; private final Map, Object> channelOptions = new Object2ObjectArrayMap<>(); private final Map, Object> childChannelOptions = new Object2ObjectArrayMap<>(); private int maxNumConnections = Flags.maxNumConnections(); @@ -503,8 +507,7 @@ public ServerBuilder workerGroup(int numThreads) { /** * Sets the {@link Executor} which will invoke the callbacks of {@link Server#start()}, - * {@link Server#stop()} and {@link ServerListener}. If not set, {@link GlobalEventExecutor} will be used - * by default. + * {@link Server#stop()} and {@link ServerListener}. */ public ServerBuilder startStopExecutor(Executor startStopExecutor) { this.startStopExecutor = requireNonNull(startStopExecutor, "startStopExecutor"); diff --git a/core/src/main/java/com/linecorp/armeria/server/healthcheck/HealthCheckService.java b/core/src/main/java/com/linecorp/armeria/server/healthcheck/HealthCheckService.java index a21986587d6..90ff64773a7 100644 --- a/core/src/main/java/com/linecorp/armeria/server/healthcheck/HealthCheckService.java +++ b/core/src/main/java/com/linecorp/armeria/server/healthcheck/HealthCheckService.java @@ -43,6 +43,7 @@ import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.util.TimeoutMode; import com.linecorp.armeria.internal.common.ArmeriaHttpUtil; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; import com.linecorp.armeria.server.HttpService; import com.linecorp.armeria.server.HttpStatusException; import com.linecorp.armeria.server.RequestTimeoutException; @@ -137,7 +138,7 @@ public static HealthCheckServiceBuilder builder() { private final long maxLongPollingTimeoutMillis; private final double longPollingTimeoutJitterRate; private final long pingIntervalMillis; - private final ReentrantLock lock = new ReentrantLock(); + private final ReentrantLock lock = new ReentrantShortLock(); @Nullable private final Consumer healthCheckerListener; @Nullable diff --git a/core/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration b/core/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration new file mode 100644 index 00000000000..d6e8ba5a1c9 --- /dev/null +++ b/core/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration @@ -0,0 +1 @@ +com.linecorp.armeria.common.CoreBlockHoundIntegration diff --git a/core/src/test/java/com/linecorp/armeria/client/Http1HeaderNamingTest.java b/core/src/test/java/com/linecorp/armeria/client/Http1HeaderNamingTest.java index aaee7c13497..e6607821505 100644 --- a/core/src/test/java/com/linecorp/armeria/client/Http1HeaderNamingTest.java +++ b/core/src/test/java/com/linecorp/armeria/client/Http1HeaderNamingTest.java @@ -28,6 +28,7 @@ import java.net.Socket; import java.util.concurrent.CompletableFuture; +import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; @@ -39,11 +40,35 @@ import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.HttpStatus; import com.linecorp.armeria.common.ResponseHeaders; -import com.linecorp.armeria.server.Server; import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.testing.junit5.server.ServerExtension; class Http1HeaderNamingTest { + @RegisterExtension + static ServerExtension traditionalHeaderNameServer = new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) throws Exception { + sb.service("/", (ctx, req) -> HttpResponse + .of(ResponseHeaders.of(HttpStatus.OK, + HttpHeaderNames.AUTHORIZATION, "Bearer foo", + HttpHeaderNames.X_FORWARDED_FOR, "bar"))) + .http1HeaderNaming(Http1HeaderNaming.traditional()); + } + }; + + @RegisterExtension + static ServerExtension server = new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) throws Exception { + sb.service("/", (ctx, req) -> HttpResponse + .of(ResponseHeaders.of(HttpStatus.OK, + HttpHeaderNames.AUTHORIZATION, "Bearer foo", + HttpHeaderNames.X_FORWARDED_FOR, "bar"))) + .http1HeaderNaming(Http1HeaderNaming.ofDefault()); + } + }; + @CsvSource({ "true", "false" }) @ParameterizedTest void clientTraditionalHeaderNaming(boolean useHeaderNaming) throws IOException { @@ -112,20 +137,12 @@ void clientTraditionalHeaderNaming(boolean useHeaderNaming) throws IOException { @CsvSource({ "true", "false" }) @ParameterizedTest void serverTraditionalHeaderNaming(boolean useHeaderNaming) throws IOException { - final ServerBuilder serverBuilder = Server - .builder() - .service("/", (ctx, req) -> HttpResponse - .of(ResponseHeaders.of(HttpStatus.OK, - HttpHeaderNames.AUTHORIZATION, "Bearer foo", - HttpHeaderNames.X_FORWARDED_FOR, "bar"))); - if (useHeaderNaming) { - serverBuilder.http1HeaderNaming(Http1HeaderNaming.traditional()); - } - final Server server = serverBuilder.build(); - server.start().join(); - try (Socket socket = new Socket()) { - socket.connect(server.activePort().localAddress()); + if (useHeaderNaming) { + socket.connect(traditionalHeaderNameServer.httpSocketAddress()); + } else { + socket.connect(server.httpSocketAddress()); + } final PrintWriter outWriter = new PrintWriter(socket.getOutputStream(), false); outWriter.print("GET / HTTP/1.1\r\n"); diff --git a/core/src/test/java/com/linecorp/armeria/client/HttpClientMaxConcurrentStreamTest.java b/core/src/test/java/com/linecorp/armeria/client/HttpClientMaxConcurrentStreamTest.java index f6f1bb79291..97cfd6d3344 100644 --- a/core/src/test/java/com/linecorp/armeria/client/HttpClientMaxConcurrentStreamTest.java +++ b/core/src/test/java/com/linecorp/armeria/client/HttpClientMaxConcurrentStreamTest.java @@ -43,6 +43,7 @@ import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.logging.ClientConnectionTimings; import com.linecorp.armeria.common.logging.RequestLogProperty; +import com.linecorp.armeria.internal.testing.BlockingUtils; import com.linecorp.armeria.server.ServerBuilder; import com.linecorp.armeria.testing.junit5.server.ServerExtension; @@ -316,13 +317,8 @@ void ensureCorrectPendingAcquisitionDurationBehavior() throws Exception { .decorator(connectionTimingsAccumulatingDecorator(connectionTimings)) .build(); final int sleepMillis = 300; - connectionPoolListener = newConnectionPoolListener(() -> { - try { - Thread.sleep(sleepMillis); - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } - }, () -> {}); + connectionPoolListener = newConnectionPoolListener( + () -> BlockingUtils.blockingRun(() -> Thread.sleep(sleepMillis)), () -> {}); final int numConnections = MAX_NUM_CONNECTIONS; final int numRequests = MAX_CONCURRENT_STREAMS * numConnections; diff --git a/core/src/test/java/com/linecorp/armeria/client/HttpClientPipeliningTest.java b/core/src/test/java/com/linecorp/armeria/client/HttpClientPipeliningTest.java index 08d5957e891..e73bfc7936d 100644 --- a/core/src/test/java/com/linecorp/armeria/client/HttpClientPipeliningTest.java +++ b/core/src/test/java/com/linecorp/armeria/client/HttpClientPipeliningTest.java @@ -34,6 +34,7 @@ import com.linecorp.armeria.common.HttpStatus; import com.linecorp.armeria.common.MediaType; import com.linecorp.armeria.common.util.EventLoopGroups; +import com.linecorp.armeria.internal.testing.BlockingUtils; import com.linecorp.armeria.server.AbstractHttpService; import com.linecorp.armeria.server.ServerBuilder; import com.linecorp.armeria.server.ServiceRequestContext; @@ -72,7 +73,7 @@ protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req) throws lock.unlock(); } - semaphore.acquireUninterruptibly(); + BlockingUtils.blockingRun(() -> semaphore.acquireUninterruptibly()); try { return HttpResponse.of(HttpStatus.OK, MediaType.PLAIN_TEXT_UTF_8, String.valueOf(ctx.remoteAddress())); diff --git a/core/src/test/java/com/linecorp/armeria/client/endpoint/SelectionTimeoutTest.java b/core/src/test/java/com/linecorp/armeria/client/endpoint/SelectionTimeoutTest.java index d6edbc06922..5a3395cb5a6 100644 --- a/core/src/test/java/com/linecorp/armeria/client/endpoint/SelectionTimeoutTest.java +++ b/core/src/test/java/com/linecorp/armeria/client/endpoint/SelectionTimeoutTest.java @@ -45,6 +45,7 @@ import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.util.UnmodifiableFuture; import com.linecorp.armeria.internal.client.ClientPendingThrowableUtil; +import com.linecorp.armeria.internal.testing.BlockingUtils; class SelectionTimeoutTest { @@ -308,7 +309,7 @@ void select_shouldRespectResponseTimeout() { try (MockEndpointGroup endpointGroup = new MockEndpointGroup(5000)) { final CompletableFuture result = endpointGroup.select(ctx, CommonPools.blockingTaskExecutor()); - assertThat(result.join()).isNull(); + assertThat(BlockingUtils.blockingRun(result::join)).isNull(); assertThat(stopwatch.elapsed()) .isGreaterThanOrEqualTo(Duration.ofSeconds(2)); } diff --git a/core/src/test/java/com/linecorp/armeria/client/proxy/ProxyClientIntegrationTest.java b/core/src/test/java/com/linecorp/armeria/client/proxy/ProxyClientIntegrationTest.java index 27022dea844..fae5cfd77fa 100644 --- a/core/src/test/java/com/linecorp/armeria/client/proxy/ProxyClientIntegrationTest.java +++ b/core/src/test/java/com/linecorp/armeria/client/proxy/ProxyClientIntegrationTest.java @@ -40,6 +40,7 @@ import java.util.function.Consumer; import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; @@ -60,6 +61,8 @@ import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.HttpStatus; import com.linecorp.armeria.common.SessionProtocol; +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.internal.testing.BlockingUtils; import com.linecorp.armeria.internal.testing.NettyServerExtension; import com.linecorp.armeria.internal.testing.SimpleChannelHandlerFactory; import com.linecorp.armeria.server.ServerBuilder; @@ -100,6 +103,9 @@ class ProxyClientIntegrationTest { private static SimpleChannelHandlerFactory channelHandlerFactory; + @Nullable + private static SslContext sslContext; + @RegisterExtension @Order(0) static final SelfSignedCertificateExtension ssc = new SelfSignedCertificateExtension(); @@ -146,6 +152,7 @@ protected void configure(Channel ch) throws Exception { static NettyServerExtension httpsProxyServer = new NettyServerExtension() { @Override protected void configure(Channel ch) throws Exception { + assert sslContext != null; final SslContext sslContext = SslContextBuilder .forServer(ssc.privateKey(), ssc.certificate()).build(); ch.pipeline().addLast(sslContext.newHandler(ch.alloc())); @@ -177,6 +184,12 @@ protected void configure(Channel ch) throws Exception { } }; + @BeforeAll + static void beforeAll() throws Exception { + sslContext = SslContextBuilder + .forServer(ssc.privateKey(), ssc.certificate()).build(); + } + @BeforeEach void beforeEach() { numSuccessfulProxyRequests = 0; @@ -738,7 +751,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc // first writing to the channel occurs after ProxySuccessEvent is triggered. // If the first writing happens before ProxySuccessEvent is triggered, // the client would get WriteTimeoutException that makes the test fail. - Thread.sleep(Flags.defaultWriteTimeoutMillis()); + BlockingUtils.blockingRun(() -> Thread.sleep(Flags.defaultWriteTimeoutMillis())); } super.userEventTriggered(ctx, evt); } diff --git a/core/src/test/java/com/linecorp/armeria/common/metric/EventLoopMetricsTest.java b/core/src/test/java/com/linecorp/armeria/common/metric/EventLoopMetricsTest.java index 5eac1b2a71c..9773b00aef0 100644 --- a/core/src/test/java/com/linecorp/armeria/common/metric/EventLoopMetricsTest.java +++ b/core/src/test/java/com/linecorp/armeria/common/metric/EventLoopMetricsTest.java @@ -24,14 +24,21 @@ import org.hamcrest.Matchers; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.linecorp.armeria.common.util.ThreadFactories; +import com.linecorp.armeria.testing.junit5.common.EventLoopGroupExtension; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; -import io.netty.channel.DefaultEventLoopGroup; import io.netty.channel.EventLoopGroup; class EventLoopMetricsTest { + @RegisterExtension + static EventLoopGroupExtension eventLoopGroup = + new EventLoopGroupExtension(2, ThreadFactories.newThreadFactory("block-me", false)); + private class BlockMe extends CountDownLatch implements Runnable { AtomicInteger run = new AtomicInteger(); @@ -61,7 +68,7 @@ void test() { final BlockMe task = new BlockMe(); - final EventLoopGroup workers = new DefaultEventLoopGroup(2); + final EventLoopGroup workers = eventLoopGroup.get(); // Block both executors workers.submit(task); workers.submit(task); @@ -89,7 +96,5 @@ void test() { MoreMeters.measureAll(registry)) .containsEntry("foo.event.loop.workers#value", 2.0) .containsEntry("foo.event.loop.pending.tasks#value", 0.0)); - - workers.shutdownGracefully(); } } diff --git a/core/src/test/java/com/linecorp/armeria/common/util/EventLoopCheckingFutureTest.java b/core/src/test/java/com/linecorp/armeria/common/util/EventLoopCheckingFutureTest.java index 9f67e20cb14..7e8b34969c6 100644 --- a/core/src/test/java/com/linecorp/armeria/common/util/EventLoopCheckingFutureTest.java +++ b/core/src/test/java/com/linecorp/armeria/common/util/EventLoopCheckingFutureTest.java @@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory; import com.linecorp.armeria.common.CommonPools; +import com.linecorp.armeria.internal.testing.BlockingUtils; import com.linecorp.armeria.testing.junit5.common.EventLoopExtension; import ch.qos.logback.classic.Level; @@ -117,7 +118,7 @@ void getTimeoutOffEventLoop() throws Exception { private void testBlockingOperationOnEventLoop(EventLoopCheckingFutureTask task) { final EventLoopCheckingFuture future = new EventLoopCheckingFuture<>(); - eventLoop.get().submit(() -> task.run(future)); + eventLoop.get().submit(() -> BlockingUtils.blockingRun(() -> task.run(future))); try { await().untilAsserted(() -> { verify(appender, atLeast(0)).doAppend(eventCaptor.capture()); diff --git a/core/src/test/java/com/linecorp/armeria/common/util/StartStopSupportTest.java b/core/src/test/java/com/linecorp/armeria/common/util/StartStopSupportTest.java index cb3c0f8e167..46d6ebe71f5 100644 --- a/core/src/test/java/com/linecorp/armeria/common/util/StartStopSupportTest.java +++ b/core/src/test/java/com/linecorp/armeria/common/util/StartStopSupportTest.java @@ -63,7 +63,8 @@ public class StartStopSupportTest { private static final String THREAD_NAME_PREFIX = StartStopSupportTest.class.getSimpleName(); @ClassRule - public static final EventLoopRule rule = new EventLoopRule(THREAD_NAME_PREFIX); + public static final EventLoopRule rule = new EventLoopRule( + ThreadFactories.newThreadFactory(THREAD_NAME_PREFIX, false)); @Rule public TestRule globalTimeout = new DisableOnDebug(new Timeout(10, TimeUnit.SECONDS)); diff --git a/core/src/test/java/com/linecorp/armeria/common/util/ThreadFactoryTest.java b/core/src/test/java/com/linecorp/armeria/common/util/ThreadFactoryTest.java index eae443084ed..de5263179a3 100644 --- a/core/src/test/java/com/linecorp/armeria/common/util/ThreadFactoryTest.java +++ b/core/src/test/java/com/linecorp/armeria/common/util/ThreadFactoryTest.java @@ -68,7 +68,7 @@ void testNonEventLoopThreadFactory() { .build() .newThread(() -> {}); - assertThat(nonEventLoopThread.getClass()).isSameAs(FastThreadLocalThread.class); + assertThat(nonEventLoopThread).isInstanceOf(FastThreadLocalThread.class); assertThat(nonEventLoopThread.getName()).startsWith("normal-thread"); assertThat(nonEventLoopThread.getPriority()).isEqualTo(Thread.NORM_PRIORITY); assertThat(nonEventLoopThread.isDaemon()).isFalse(); @@ -82,7 +82,7 @@ void testNonEventLoopThreadFactory() { .build() .newThread(() -> {}); - assertThat(nonEventLoopCustomThread.getClass()).isSameAs(FastThreadLocalThread.class); + assertThat(nonEventLoopCustomThread).isInstanceOf(FastThreadLocalThread.class); assertThat(nonEventLoopCustomThread.getName()).startsWith("custom-thread"); assertThat(nonEventLoopCustomThread.getPriority()).isEqualTo(Thread.MAX_PRIORITY); assertThat(nonEventLoopCustomThread.isDaemon()).isTrue(); diff --git a/core/src/test/java/com/linecorp/armeria/internal/common/stream/FixedStreamMessageTest.java b/core/src/test/java/com/linecorp/armeria/internal/common/stream/FixedStreamMessageTest.java index a39ad345ac6..607abfb4505 100644 --- a/core/src/test/java/com/linecorp/armeria/internal/common/stream/FixedStreamMessageTest.java +++ b/core/src/test/java/com/linecorp/armeria/internal/common/stream/FixedStreamMessageTest.java @@ -44,6 +44,7 @@ import com.linecorp.armeria.common.stream.StreamMessage; import com.linecorp.armeria.internal.testing.AnticipatedException; +import com.linecorp.armeria.internal.testing.BlockingUtils; import com.linecorp.armeria.testing.junit5.common.EventLoopExtension; import io.netty.util.concurrent.EventExecutor; @@ -94,12 +95,7 @@ void raceBetweenSubscriptionAndAbort(StreamMessage stream) { @Override public void onSubscribe(Subscription s) { - try { - // Wait for `abort()` to be called. - latch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + BlockingUtils.blockingRun(() -> latch.await()); } @Override diff --git a/core/src/test/java/com/linecorp/armeria/server/GracefulShutdownSupportTest.java b/core/src/test/java/com/linecorp/armeria/server/GracefulShutdownSupportTest.java index 1479e001bb7..57b66007d2b 100644 --- a/core/src/test/java/com/linecorp/armeria/server/GracefulShutdownSupportTest.java +++ b/core/src/test/java/com/linecorp/armeria/server/GracefulShutdownSupportTest.java @@ -52,7 +52,7 @@ class GracefulShutdownSupportTest { void setUp() { executor = new ThreadPoolExecutor( 0, 1, 1, TimeUnit.SECONDS, new LinkedTransferQueue<>(), - ThreadFactories.newEventLoopThreadFactory("graceful-shutdown-test", true)); + ThreadFactories.newThreadFactory("graceful-shutdown-test", true)); support = GracefulShutdownSupport.create(Duration.ofNanos(QUIET_PERIOD_NANOS), executor, ticker); } diff --git a/core/src/test/java/com/linecorp/armeria/server/Http1ServerEarlyDisconnectionTest.java b/core/src/test/java/com/linecorp/armeria/server/Http1ServerEarlyDisconnectionTest.java index debaaf1b91d..dcb260efc6b 100644 --- a/core/src/test/java/com/linecorp/armeria/server/Http1ServerEarlyDisconnectionTest.java +++ b/core/src/test/java/com/linecorp/armeria/server/Http1ServerEarlyDisconnectionTest.java @@ -35,6 +35,7 @@ import com.linecorp.armeria.common.SessionProtocol; import com.linecorp.armeria.common.SplitHttpResponse; import com.linecorp.armeria.common.logging.RequestLog; +import com.linecorp.armeria.internal.testing.BlockingUtils; import com.linecorp.armeria.internal.testing.FlakyTest; import com.linecorp.armeria.testing.junit5.server.ServerExtension; @@ -101,8 +102,10 @@ public void onNext(HttpData httpData) { received += httpData.length(); if (received >= contentLength) { // All data is received, so it should be safe to close the connection. - clientFactory.close(); - latch.countDown(); + BlockingUtils.blockingRun(() -> { + clientFactory.close(); + latch.countDown(); + }); } } diff --git a/core/src/test/java/com/linecorp/armeria/server/ServerTest.java b/core/src/test/java/com/linecorp/armeria/server/ServerTest.java index 5a49e4cdf7d..d965591ee67 100644 --- a/core/src/test/java/com/linecorp/armeria/server/ServerTest.java +++ b/core/src/test/java/com/linecorp/armeria/server/ServerTest.java @@ -76,6 +76,7 @@ import com.linecorp.armeria.common.util.TimeoutMode; import com.linecorp.armeria.internal.common.metric.MicrometerUtil; import com.linecorp.armeria.internal.testing.AnticipatedException; +import com.linecorp.armeria.internal.testing.BlockingUtils; import com.linecorp.armeria.server.logging.AccessLogWriter; import com.linecorp.armeria.server.logging.LoggingService; import com.linecorp.armeria.testing.junit4.server.ServerRule; @@ -112,12 +113,8 @@ protected void configure(ServerBuilder sb) throws Exception { final HttpService delayedResponseOnIoThread = new EchoService() { @Override protected HttpResponse echo(AggregatedHttpRequest aReq) { - try { - Thread.sleep(processDelayMillis); - return super.echo(aReq); - } catch (InterruptedException e) { - return HttpResponse.ofFailure(e); - } + BlockingUtils.blockingRun(() -> Thread.sleep(processDelayMillis)); + return super.echo(aReq); } }.decorate(LoggingService.newDecorator()); @@ -416,7 +413,7 @@ void defaultStartStopExecutor() { threads.add(server.stop().thenApply(unused -> Thread.currentThread()).join()); threads.add(server.start().thenApply(unused -> Thread.currentThread()).join()); - threads.forEach(t -> assertThat(t.getName()).startsWith("globalEventExecutor")); + threads.forEach(t -> assertThat(t.getName()).startsWith("startstop-support")); } @Test diff --git a/dependencies.toml b/dependencies.toml index 6095c6efb69..600d6de3f4f 100644 --- a/dependencies.toml +++ b/dependencies.toml @@ -6,6 +6,7 @@ apache-httpclient5 = "5.2.1" apache-httpclient4 = "4.5.14" assertj = "3.24.2" awaitility = "4.2.0" +blockhound = "1.0.8.RELEASE" bouncycastle = "1.70" brave = "5.15.0" brotli4j = "1.11.0" @@ -180,6 +181,14 @@ version.ref = "assertj" module = "org.awaitility:awaitility" version.ref = "awaitility" +[libraries.blockhound-junit-platform] +module="io.projectreactor.tools:blockhound-junit-platform" +version.ref = "blockhound" + +[libraries.blockhound] +module="io.projectreactor.tools:blockhound" +version.ref = "blockhound" + [libraries.bouncycastle-bcpkix] module = "org.bouncycastle:bcpkix-jdk15on" version.ref = "bouncycastle" @@ -685,6 +694,9 @@ version.ref = "kotlin-coroutine" [libraries.kotlin-coroutines-test] module = "org.jetbrains.kotlinx:kotlinx-coroutines-test" version.ref = "kotlin-coroutine" +[libraries.kotlin-coroutines-debug] +module = "org.jetbrains.kotlinx:kotlinx-coroutines-debug" +version.ref = "kotlin-coroutine" # Don't upgrade Logback 1.4.0 which requires Java 11 # TODO(ikhoon): Upgrade Logback to 1.3.0 when Spring Boot 2 supports it. diff --git a/graphql/src/main/java/com/linecorp/armeria/server/graphql/DefaultGraphqlService.java b/graphql/src/main/java/com/linecorp/armeria/server/graphql/DefaultGraphqlService.java index 443398f17fa..c174c94ba1b 100644 --- a/graphql/src/main/java/com/linecorp/armeria/server/graphql/DefaultGraphqlService.java +++ b/graphql/src/main/java/com/linecorp/armeria/server/graphql/DefaultGraphqlService.java @@ -40,6 +40,7 @@ import graphql.ExecutionInput; import graphql.ExecutionResult; import graphql.GraphQL; +import graphql.execution.ExecutionId; final class DefaultGraphqlService extends AbstractGraphqlService implements GraphqlService { @@ -88,6 +89,7 @@ protected HttpResponse executeGraphql(ServiceRequestContext ctx, GraphqlRequest final ExecutionInput executionInput = builder.context(ctx) + .executionId(ExecutionId.from(ctx.id().text())) .graphQLContext(GraphqlServiceContexts.graphqlContext(ctx)) .dataLoaderRegistry(dataLoaderRegistry) .build(); diff --git a/grpc/src/main/java/com/linecorp/armeria/common/grpc/GrpcBlockHoundIntegration.java b/grpc/src/main/java/com/linecorp/armeria/common/grpc/GrpcBlockHoundIntegration.java new file mode 100644 index 00000000000..26dafcede6f --- /dev/null +++ b/grpc/src/main/java/com/linecorp/armeria/common/grpc/GrpcBlockHoundIntegration.java @@ -0,0 +1,35 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.common.grpc; + +import com.linecorp.armeria.common.annotation.UnstableApi; + +import reactor.blockhound.BlockHound.Builder; +import reactor.blockhound.integration.BlockHoundIntegration; + +/** + * A {@link BlockHoundIntegration} for the gRPC module. + */ +@UnstableApi +public final class GrpcBlockHoundIntegration implements BlockHoundIntegration { + + @Override + public void applyTo(Builder builder) { + // ClientCalls.QueuingListener internally uses a blocking queue + builder.allowBlockingCallsInside("java.util.concurrent.ArrayBlockingQueue", "add"); + } +} diff --git a/grpc/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration b/grpc/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration new file mode 100644 index 00000000000..fd12d963d6d --- /dev/null +++ b/grpc/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration @@ -0,0 +1 @@ +com.linecorp.armeria.common.grpc.GrpcBlockHoundIntegration diff --git a/it/context-storage/src/test/java/com/linecorp/armeria/common/RequestContextStorageCustomizingTest.java b/it/context-storage/src/test/java/com/linecorp/armeria/common/RequestContextStorageCustomizingTest.java index d2e344b5ce5..1bcf94c302b 100644 --- a/it/context-storage/src/test/java/com/linecorp/armeria/common/RequestContextStorageCustomizingTest.java +++ b/it/context-storage/src/test/java/com/linecorp/armeria/common/RequestContextStorageCustomizingTest.java @@ -27,6 +27,7 @@ import com.linecorp.armeria.common.CustomRequestContextStorageProvider.CustomRequestContextStorage; import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.util.SafeCloseable; +import com.linecorp.armeria.common.util.ThreadFactories; import com.linecorp.armeria.server.ServiceRequestContext; import com.linecorp.armeria.testing.junit5.common.EventLoopExtension; @@ -35,7 +36,8 @@ class RequestContextStorageCustomizingTest { @RegisterExtension - static final EventLoopExtension eventLoopExtension = new EventLoopExtension(); + static final EventLoopExtension eventLoopExtension = new EventLoopExtension( + ThreadFactories.newThreadFactory("armeria-testing-eventloop", false)); @Test void requestContextStorageDoesNotAffectOtherThread() throws InterruptedException { diff --git a/it/multipart/src/test/java/com/linecorp/armeria/common/multipart/MultipartCollectIntegrationTest.java b/it/multipart/src/test/java/com/linecorp/armeria/common/multipart/MultipartCollectIntegrationTest.java index 029172b22bf..bc6696255fc 100644 --- a/it/multipart/src/test/java/com/linecorp/armeria/common/multipart/MultipartCollectIntegrationTest.java +++ b/it/multipart/src/test/java/com/linecorp/armeria/common/multipart/MultipartCollectIntegrationTest.java @@ -77,7 +77,7 @@ protected void configure(ServerBuilder sb) throws Exception { }) .thenApply(aggregated -> aggregated.stream().collect( Collectors.toMap(Entry::getKey, Entry::getValue))) - .thenApply(aggregated -> { + .thenApplyAsync(aggregated -> { final StringBuilder responseStringBuilder = new StringBuilder(); responseStringBuilder.append("param1/") .append(aggregated.get("param1")).append('\n'); @@ -110,7 +110,7 @@ protected void configure(ServerBuilder sb) throws Exception { } catch (IOException e) { throw new UncheckedIOException(e); } - }))) + }, ctx.blockingTaskExecutor()))) .service("/multipart/large-file", (ctx, req) -> HttpResponse.from( Multipart.from(req).collect(bodyPart -> { final Path path = tempDir.resolve(bodyPart.name()); @@ -120,7 +120,7 @@ protected void configure(ServerBuilder sb) throws Exception { }) .thenApply(aggregated -> aggregated.stream().collect( Collectors.toMap(Entry::getKey, Entry::getValue))) - .thenApply(aggregated -> { + .thenApplyAsync(aggregated -> { final StringBuilder responseStringBuilder = new StringBuilder(); try { final HashCode file1Hash = @@ -140,7 +140,7 @@ protected void configure(ServerBuilder sb) throws Exception { throw new RuntimeException(e); } return HttpResponse.of(responseStringBuilder.toString()); - }))) + }, ctx.blockingTaskExecutor()))) .requestTimeout(Duration.ZERO) .maxRequestLength(0); } diff --git a/it/trace-context-leak/src/test/java/com/linecorp/armeria/internal/common/TraceRequestContextLeakTest.java b/it/trace-context-leak/src/test/java/com/linecorp/armeria/internal/common/TraceRequestContextLeakTest.java index a6342750f94..7ba506cbe3b 100644 --- a/it/trace-context-leak/src/test/java/com/linecorp/armeria/internal/common/TraceRequestContextLeakTest.java +++ b/it/trace-context-leak/src/test/java/com/linecorp/armeria/internal/common/TraceRequestContextLeakTest.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -36,6 +37,7 @@ import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.RequestContext; import com.linecorp.armeria.common.util.SafeCloseable; +import com.linecorp.armeria.common.util.ThreadFactories; import com.linecorp.armeria.server.ServiceRequestContext; import com.linecorp.armeria.testing.junit5.common.EventLoopExtension; import com.linecorp.armeria.testing.junit5.common.EventLoopGroupExtension; @@ -46,10 +48,12 @@ class TraceRequestContextLeakTest { @RegisterExtension - static final EventLoopExtension eventLoopExtension = new EventLoopExtension(); + static final EventLoopExtension eventLoopExtension = + new EventLoopExtension(ThreadFactories.newThreadFactory("trace-test", false)); @RegisterExtension - static final EventLoopGroupExtension eventLoopGroupExtension = new EventLoopGroupExtension(2); + static final EventLoopGroupExtension eventLoopGroupExtension = + new EventLoopGroupExtension(2, ThreadFactories.newThreadFactory("trace-test-group", false)); @Test void singleThreadContextNotLeak() throws InterruptedException { @@ -83,7 +87,7 @@ void singleThreadContextNotLeak() throws InterruptedException { } }); - latch.await(); + assertThat(latch.await(1, TimeUnit.MINUTES)).isTrue(); assertThat(isThrown).isFalse(); } @@ -114,7 +118,7 @@ void singleThreadContextLeak() throws InterruptedException { await().untilTrue(isThrown); assertThat(exception.get()) - .hasMessageContaining("singleThreadContextLeak$2(TraceRequestContextLeakTest.java:101)"); + .hasMessageContaining("the callback was called from unexpected thread"); } } @@ -154,7 +158,7 @@ void multiThreadContextLeakNotInterfereOthersEventLoop() throws InterruptedExcep } }); - latch.await(); + assertThat(latch.await(1, TimeUnit.MINUTES)).isTrue(); assertThat(isThrown).isFalse(); } } @@ -206,7 +210,7 @@ void multiThreadContextLeak() throws InterruptedException { await().untilTrue(isThrown); assertThat(exception.get()) - .hasMessageContaining("multiThreadContextLeak$7(TraceRequestContextLeakTest.java:180)"); + .hasMessageContaining("Trying to call object wrapped with context"); } } @@ -235,7 +239,7 @@ void multipleRequestContextPushBeforeLeak() { @Test @SuppressWarnings("MustBeClosedChecker") - void connerCase() { + void cornerCase() { final AtomicReference exception = new AtomicReference<>(); try (DeferredClose deferredClose = new DeferredClose()) { @@ -253,7 +257,7 @@ void connerCase() { } } assertThat(exception.get()) - .hasMessageContaining("connerCase(TraceRequestContextLeakTest.java:245)"); + .hasMessageContaining("is not the same as the context in the storage"); } private static ServiceRequestContext newCtx(String path) { diff --git a/junit4/src/main/java/com/linecorp/armeria/testing/junit4/common/AbstractEventLoopGroupRule.java b/junit4/src/main/java/com/linecorp/armeria/testing/junit4/common/AbstractEventLoopGroupRule.java index 3335c69bafb..f2e1a9cbd8e 100644 --- a/junit4/src/main/java/com/linecorp/armeria/testing/junit4/common/AbstractEventLoopGroupRule.java +++ b/junit4/src/main/java/com/linecorp/armeria/testing/junit4/common/AbstractEventLoopGroupRule.java @@ -15,6 +15,8 @@ */ package com.linecorp.armeria.testing.junit4.common; +import java.util.concurrent.ThreadFactory; + import org.junit.rules.ExternalResource; import org.junit.rules.TestRule; @@ -25,8 +27,8 @@ abstract class AbstractEventLoopGroupRule extends ExternalResource { private final EventLoopGroupRuleDelegate delegate; - AbstractEventLoopGroupRule(int numThreads, String threadNamePrefix, boolean useDaemonThreads) { - delegate = new EventLoopGroupRuleDelegate(numThreads, threadNamePrefix, useDaemonThreads); + AbstractEventLoopGroupRule(int numThreads, ThreadFactory threadFactory) { + delegate = new EventLoopGroupRuleDelegate(numThreads, threadFactory); } EventLoopGroup group() { diff --git a/junit4/src/main/java/com/linecorp/armeria/testing/junit4/common/EventLoopGroupRule.java b/junit4/src/main/java/com/linecorp/armeria/testing/junit4/common/EventLoopGroupRule.java index 32b7a4fc6e5..636d31d740b 100644 --- a/junit4/src/main/java/com/linecorp/armeria/testing/junit4/common/EventLoopGroupRule.java +++ b/junit4/src/main/java/com/linecorp/armeria/testing/junit4/common/EventLoopGroupRule.java @@ -15,8 +15,12 @@ */ package com.linecorp.armeria.testing.junit4.common; +import java.util.concurrent.ThreadFactory; + import org.junit.rules.TestRule; +import com.linecorp.armeria.common.util.ThreadFactories; + import io.netty.channel.EventLoopGroup; /** @@ -78,7 +82,17 @@ public EventLoopGroupRule(int numThreads, String threadNamePrefix) { * @param useDaemonThreads whether to create daemon threads or not */ public EventLoopGroupRule(int numThreads, String threadNamePrefix, boolean useDaemonThreads) { - super(numThreads, threadNamePrefix, useDaemonThreads); + this(numThreads, ThreadFactories.newEventLoopThreadFactory(threadNamePrefix, useDaemonThreads)); + } + + /** + * Creates a new {@link TestRule} that provides an {@link EventLoopGroup}. + * + * @param numThreads the number of event loop threads + * @param threadFactory the factory used to create threads. + */ + public EventLoopGroupRule(int numThreads, ThreadFactory threadFactory) { + super(numThreads, threadFactory); } /** diff --git a/junit4/src/main/java/com/linecorp/armeria/testing/junit4/common/EventLoopRule.java b/junit4/src/main/java/com/linecorp/armeria/testing/junit4/common/EventLoopRule.java index 701bf181f33..88f0d3bdc11 100644 --- a/junit4/src/main/java/com/linecorp/armeria/testing/junit4/common/EventLoopRule.java +++ b/junit4/src/main/java/com/linecorp/armeria/testing/junit4/common/EventLoopRule.java @@ -15,8 +15,12 @@ */ package com.linecorp.armeria.testing.junit4.common; +import java.util.concurrent.ThreadFactory; + import org.junit.rules.TestRule; +import com.linecorp.armeria.common.util.ThreadFactories; + import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; @@ -71,7 +75,16 @@ public EventLoopRule(String threadNamePrefix) { * @param useDaemonThread whether to create a daemon thread or not */ public EventLoopRule(String threadNamePrefix, boolean useDaemonThread) { - super(1, threadNamePrefix, useDaemonThread); + super(1, ThreadFactories.newEventLoopThreadFactory(threadNamePrefix, useDaemonThread)); + } + + /** + * Creates a new {@link TestRule} that provides an {@link EventLoop}. + * + * @param threadFactory the factory used to create threads. + */ + public EventLoopRule(ThreadFactory threadFactory) { + super(1, threadFactory); } /** diff --git a/junit5/src/main/java/com/linecorp/armeria/internal/testing/EventLoopGroupRuleDelegate.java b/junit5/src/main/java/com/linecorp/armeria/internal/testing/EventLoopGroupRuleDelegate.java index a2ea5ba90c1..6e981465fec 100644 --- a/junit5/src/main/java/com/linecorp/armeria/internal/testing/EventLoopGroupRuleDelegate.java +++ b/junit5/src/main/java/com/linecorp/armeria/internal/testing/EventLoopGroupRuleDelegate.java @@ -16,6 +16,8 @@ package com.linecorp.armeria.internal.testing; +import java.util.concurrent.ThreadFactory; + import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.util.EventLoopGroups; @@ -27,16 +29,14 @@ public final class EventLoopGroupRuleDelegate { private final int numThreads; - private final String threadNamePrefix; - private final boolean useDaemonThreads; + private final ThreadFactory threadFactory; @Nullable private volatile EventLoopGroup group; - public EventLoopGroupRuleDelegate(int numThreads, String threadNamePrefix, boolean useDaemonThreads) { + public EventLoopGroupRuleDelegate(int numThreads, ThreadFactory threadFactory) { this.numThreads = numThreads; - this.threadNamePrefix = threadNamePrefix; - this.useDaemonThreads = useDaemonThreads; + this.threadFactory = threadFactory; } public EventLoopGroup group() { @@ -48,7 +48,7 @@ public EventLoopGroup group() { } public void before() throws Throwable { - group = EventLoopGroups.newEventLoopGroup(numThreads, threadNamePrefix, useDaemonThreads); + group = EventLoopGroups.newEventLoopGroup(numThreads, threadFactory); } public void after() { diff --git a/junit5/src/main/java/com/linecorp/armeria/testing/junit5/common/AbstractEventLoopGroupExtension.java b/junit5/src/main/java/com/linecorp/armeria/testing/junit5/common/AbstractEventLoopGroupExtension.java index 27908da11b4..b2a7c7203b6 100644 --- a/junit5/src/main/java/com/linecorp/armeria/testing/junit5/common/AbstractEventLoopGroupExtension.java +++ b/junit5/src/main/java/com/linecorp/armeria/testing/junit5/common/AbstractEventLoopGroupExtension.java @@ -16,6 +16,8 @@ package com.linecorp.armeria.testing.junit5.common; +import java.util.concurrent.ThreadFactory; + import org.junit.jupiter.api.extension.Extension; import org.junit.jupiter.api.extension.ExtensionContext; @@ -26,8 +28,8 @@ abstract class AbstractEventLoopGroupExtension extends AbstractAllOrEachExtension { private final EventLoopGroupRuleDelegate delegate; - AbstractEventLoopGroupExtension(int numThreads, String threadNamePrefix, boolean useDaemonThreads) { - delegate = new EventLoopGroupRuleDelegate(numThreads, threadNamePrefix, useDaemonThreads); + AbstractEventLoopGroupExtension(int numThreads, ThreadFactory threadFactory) { + delegate = new EventLoopGroupRuleDelegate(numThreads, threadFactory); } EventLoopGroup group() { diff --git a/junit5/src/main/java/com/linecorp/armeria/testing/junit5/common/EventLoopExtension.java b/junit5/src/main/java/com/linecorp/armeria/testing/junit5/common/EventLoopExtension.java index 42d642e56ab..b17832ef880 100644 --- a/junit5/src/main/java/com/linecorp/armeria/testing/junit5/common/EventLoopExtension.java +++ b/junit5/src/main/java/com/linecorp/armeria/testing/junit5/common/EventLoopExtension.java @@ -16,8 +16,12 @@ package com.linecorp.armeria.testing.junit5.common; +import java.util.concurrent.ThreadFactory; + import org.junit.jupiter.api.extension.Extension; +import com.linecorp.armeria.common.util.ThreadFactories; + import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; @@ -72,7 +76,16 @@ public EventLoopExtension(String threadNamePrefix) { * @param useDaemonThread whether to create a daemon thread or not */ public EventLoopExtension(String threadNamePrefix, boolean useDaemonThread) { - super(1, threadNamePrefix, useDaemonThread); + this(ThreadFactories.newEventLoopThreadFactory(threadNamePrefix, useDaemonThread)); + } + + /** + * Creates a new {@link Extension} that provides an {@link EventLoop}. + * + * @param threadFactory the factory used to create threads. + */ + public EventLoopExtension(ThreadFactory threadFactory) { + super(1, threadFactory); } /** diff --git a/junit5/src/main/java/com/linecorp/armeria/testing/junit5/common/EventLoopGroupExtension.java b/junit5/src/main/java/com/linecorp/armeria/testing/junit5/common/EventLoopGroupExtension.java index 9db7a3b0d4a..b692558e052 100644 --- a/junit5/src/main/java/com/linecorp/armeria/testing/junit5/common/EventLoopGroupExtension.java +++ b/junit5/src/main/java/com/linecorp/armeria/testing/junit5/common/EventLoopGroupExtension.java @@ -16,8 +16,12 @@ package com.linecorp.armeria.testing.junit5.common; +import java.util.concurrent.ThreadFactory; + import org.junit.jupiter.api.extension.Extension; +import com.linecorp.armeria.common.util.ThreadFactories; + import io.netty.channel.EventLoopGroup; /** @@ -79,7 +83,17 @@ public EventLoopGroupExtension(int numThreads, String threadNamePrefix) { * @param useDaemonThreads whether to create daemon threads or not */ public EventLoopGroupExtension(int numThreads, String threadNamePrefix, boolean useDaemonThreads) { - super(numThreads, threadNamePrefix, useDaemonThreads); + this(numThreads, ThreadFactories.newEventLoopThreadFactory(threadNamePrefix, useDaemonThreads)); + } + + /** + * Creates a new {@link Extension} that provides an {@link EventLoopGroup}. + * + * @param numThreads the number of event loop threads + * @param threadFactory the factory used to create threads. + */ + public EventLoopGroupExtension(int numThreads, ThreadFactory threadFactory) { + super(numThreads, threadFactory); } /** diff --git a/retrofit2/src/main/java/com/linecorp/armeria/client/retrofit2/RetrofitBlockHoundIntegration.java b/retrofit2/src/main/java/com/linecorp/armeria/client/retrofit2/RetrofitBlockHoundIntegration.java new file mode 100644 index 00000000000..256d92bf0b1 --- /dev/null +++ b/retrofit2/src/main/java/com/linecorp/armeria/client/retrofit2/RetrofitBlockHoundIntegration.java @@ -0,0 +1,34 @@ +/* + * Copyright 2022 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.retrofit2; + +import com.linecorp.armeria.common.annotation.UnstableApi; + +import reactor.blockhound.BlockHound.Builder; +import reactor.blockhound.integration.BlockHoundIntegration; + +/** + * A {@link BlockHoundIntegration} for the retrofit2 module. + */ +@UnstableApi +public final class RetrofitBlockHoundIntegration implements BlockHoundIntegration { + + @Override + public void applyTo(Builder builder) { + builder.allowBlockingCallsInside("com.linecorp.armeria.client.retrofit2.PipeBuffer$PipeSource", "read"); + } +} diff --git a/retrofit2/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration b/retrofit2/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration new file mode 100644 index 00000000000..b896ee6116e --- /dev/null +++ b/retrofit2/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration @@ -0,0 +1 @@ +com.linecorp.armeria.client.retrofit2.RetrofitBlockHoundIntegration diff --git a/saml/src/main/java/com/linecorp/armeria/server/saml/SamlDecorator.java b/saml/src/main/java/com/linecorp/armeria/server/saml/SamlDecorator.java index d23ab17a35b..23806dc9984 100644 --- a/saml/src/main/java/com/linecorp/armeria/server/saml/SamlDecorator.java +++ b/saml/src/main/java/com/linecorp/armeria/server/saml/SamlDecorator.java @@ -137,7 +137,7 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) throws Exc }).thenCompose(arg -> { return ssoHandler.beforeInitiatingSso(ctx, req, arg.messageContext, arg.idpConfig) .thenApply(unused -> arg); - }).thenApply(arg -> { + }).thenApplyAsync(arg -> { final SAMLBindingContext bindingContext = arg.messageContext.getSubcontext(SAMLBindingContext.class); final String relayState = bindingContext != null ? bindingContext.getRelayState() : null; @@ -153,6 +153,7 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) throws Exc signingCredential, sp.signatureAlgorithm(), relayState)); } else { + // signing can incur a blocking call final String value = toSignedBase64( arg.messageContext.getMessage(), signingCredential, @@ -166,7 +167,7 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) throws Exc } catch (SamlException e) { return fail(ctx, e); } - }).exceptionally(e -> fail(ctx, e))); + }, ctx.blockingTaskExecutor()).exceptionally(e -> fail(ctx, e))); })); } diff --git a/saml/src/main/java/com/linecorp/armeria/server/saml/SamlService.java b/saml/src/main/java/com/linecorp/armeria/server/saml/SamlService.java index 5454b1e399e..498fc351cef 100644 --- a/saml/src/main/java/com/linecorp/armeria/server/saml/SamlService.java +++ b/saml/src/main/java/com/linecorp/armeria/server/saml/SamlService.java @@ -152,7 +152,7 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) throws Exc } else { f = portConfigHolder.future().thenCompose(unused -> req.aggregate()); } - return HttpResponse.from(f.handle((aggregatedReq, cause) -> { + return HttpResponse.from(f.handleAsync((aggregatedReq, cause) -> { if (cause != null) { logger.warn("{} Failed to aggregate a SAML request.", ctx, cause); return HttpResponse.of(HttpStatus.BAD_REQUEST, MediaType.PLAIN_TEXT_UTF_8, @@ -177,8 +177,9 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) throws Exc // If there's no hostname set by a user, the default virtual hostname will be used. final String defaultHostname = firstNonNull(sp.hostname(), ctx.config().virtualHost().defaultHostname()); + // assertion, logout requests incur blocking calls return func.serve(ctx, aggregatedReq, defaultHostname, portConfig); - })); + }, ctx.blockingTaskExecutor())); } /** diff --git a/sangria/sangria_2.13/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration b/sangria/sangria_2.13/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration new file mode 100644 index 00000000000..70c990557ca --- /dev/null +++ b/sangria/sangria_2.13/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration @@ -0,0 +1 @@ +com.linecorp.armeria.server.sangria.SangriaBlockHoundIntegration diff --git a/sangria/sangria_2.13/src/main/scala/com/linecorp/armeria/server/sangria/SangriaBlockHoundIntegration.scala b/sangria/sangria_2.13/src/main/scala/com/linecorp/armeria/server/sangria/SangriaBlockHoundIntegration.scala new file mode 100644 index 00000000000..ef473bf56ac --- /dev/null +++ b/sangria/sangria_2.13/src/main/scala/com/linecorp/armeria/server/sangria/SangriaBlockHoundIntegration.scala @@ -0,0 +1,33 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.server.sangria + +import com.linecorp.armeria.common.annotation.UnstableApi +import reactor.blockhound.BlockHound +import reactor.blockhound.integration.BlockHoundIntegration + +/** + * A [[BlockHoundIntegration]] for the sangria module. + */ +@UnstableApi +final class SangriaBlockHoundIntegration extends BlockHoundIntegration { + + override def applyTo(builder: BlockHound.Builder): Unit = { + builder.allowBlockingCallsInside("sangria.parser.QueryParser$", "parse") + builder.allowBlockingCallsInside("com.thoughtworks.paranamer.CachingParanamer", "lookupParameterNames") + } +} diff --git a/scala/scala_2.13/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration b/scala/scala_2.13/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration new file mode 100644 index 00000000000..55eeeb743ba --- /dev/null +++ b/scala/scala_2.13/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration @@ -0,0 +1 @@ +com.linecorp.armeria.scala.ScalaBlockHoundIntegration diff --git a/scala/scala_2.13/src/main/scala/com/linecorp/armeria/scala/ScalaBlockHoundIntegration.scala b/scala/scala_2.13/src/main/scala/com/linecorp/armeria/scala/ScalaBlockHoundIntegration.scala new file mode 100644 index 00000000000..aa4fd5efe5f --- /dev/null +++ b/scala/scala_2.13/src/main/scala/com/linecorp/armeria/scala/ScalaBlockHoundIntegration.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.scala + +import com.linecorp.armeria.common.annotation.UnstableApi +import reactor.blockhound.BlockHound +import reactor.blockhound.integration.BlockHoundIntegration + +/** + * A [[BlockHoundIntegration]] for the scala module. + */ +@UnstableApi +final class ScalaBlockHoundIntegration extends BlockHoundIntegration { + + override def applyTo(builder: BlockHound.Builder): Unit = { + builder.allowBlockingCallsInside("com.thoughtworks.paranamer.CachingParanamer", "lookupParameterNames") + } +} diff --git a/settings/checkstyle/checkstyle-suppressions.xml b/settings/checkstyle/checkstyle-suppressions.xml index 69321c2c4d4..2b32f251f0d 100644 --- a/settings/checkstyle/checkstyle-suppressions.xml +++ b/settings/checkstyle/checkstyle-suppressions.xml @@ -16,7 +16,9 @@ - - + + + + diff --git a/settings/checkstyle/checkstyle.xml b/settings/checkstyle/checkstyle.xml index d7e7ee3b22e..5295650660c 100644 --- a/settings/checkstyle/checkstyle.xml +++ b/settings/checkstyle/checkstyle.xml @@ -125,6 +125,12 @@ + + + + + + diff --git a/testing-internal/src/main/java/com/linecorp/armeria/internal/testing/BlockingUtils.java b/testing-internal/src/main/java/com/linecorp/armeria/internal/testing/BlockingUtils.java new file mode 100644 index 00000000000..5d879ab5df4 --- /dev/null +++ b/testing-internal/src/main/java/com/linecorp/armeria/internal/testing/BlockingUtils.java @@ -0,0 +1,41 @@ +/* + * Copyright 2022 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.internal.testing; + +import org.junit.jupiter.api.function.Executable; +import org.junit.jupiter.api.function.ThrowingSupplier; + +public final class BlockingUtils { + + public static void blockingRun(Executable runnable) { + try { + runnable.execute(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + + public static T blockingRun(ThrowingSupplier supplier) { + try { + return supplier.get(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + + private BlockingUtils() {} +} diff --git a/testing-internal/src/main/java/com/linecorp/armeria/internal/testing/InternalTestingBlockHoundIntegration.java b/testing-internal/src/main/java/com/linecorp/armeria/internal/testing/InternalTestingBlockHoundIntegration.java new file mode 100644 index 00000000000..9a4d4a16720 --- /dev/null +++ b/testing-internal/src/main/java/com/linecorp/armeria/internal/testing/InternalTestingBlockHoundIntegration.java @@ -0,0 +1,86 @@ +/* + * Copyright 2022 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.internal.testing; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintStream; + +import reactor.blockhound.BlockHound.Builder; +import reactor.blockhound.BlockingMethod; +import reactor.blockhound.integration.BlockHoundIntegration; + +public final class InternalTestingBlockHoundIntegration implements BlockHoundIntegration { + + private static final OutputStream NULL = new OutputStream() { + @Override + public void write(int b) throws IOException { + } + }; + + static final PrintStream ps; + + static { + final String path = System.getProperties().getProperty("com.linecorp.armeria.blockhound.reportFile"); + if (path == null) { + ps = new PrintStream(NULL); + } else { + final File file = new File(path); + try { + ps = new PrintStream(file); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + Runtime.getRuntime().addShutdownHook(new Thread(ps::close)); + } + + @Override + public void applyTo(Builder builder) { + + // tests are allowed to block event loops + builder.allowBlockingCallsInside("com.linecorp.armeria.internal.testing.BlockingUtils", + "sleep"); + builder.allowBlockingCallsInside("com.linecorp.armeria.internal.testing.BlockingUtils", + "join"); + builder.allowBlockingCallsInside("com.linecorp.armeria.internal.testing.BlockingUtils", + "acquireUninterruptibly"); + builder.allowBlockingCallsInside("com.linecorp.armeria.internal.testing.BlockingUtils", + "await"); + builder.allowBlockingCallsInside("com.linecorp.armeria.internal.testing.BlockingUtils", + "blockingRun"); + builder.allowBlockingCallsInside("org.assertj.core.api.Assertions", "assertThat"); + builder.allowBlockingCallsInside("net.javacrumbs.jsonunit.fluent.JsonFluentAssert", + "assertThatJson"); + builder.allowBlockingCallsInside("com.linecorp.armeria.testing.server.ServiceRequestContextCaptor$2", + "serve"); + + builder.allowBlockingCallsInside( + "com.linecorp.armeria.internal.testing.InternalTestingBlockHoundIntegration", + "writeBlockingMethod"); + + // prints the exception which makes it easier to debug issues + builder.blockingMethodCallback(this::writeBlockingMethod); + } + + void writeBlockingMethod(BlockingMethod m) { + ps.println(Thread.currentThread()); + new Exception(m.toString()).printStackTrace(ps); + ps.flush(); + } +} diff --git a/testing-internal/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration b/testing-internal/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration new file mode 100644 index 00000000000..c0224b4bab3 --- /dev/null +++ b/testing-internal/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration @@ -0,0 +1 @@ +com.linecorp.armeria.internal.testing.InternalTestingBlockHoundIntegration diff --git a/thrift/thrift0.13/src/test/java/com/linecorp/armeria/client/thrift/ThriftOverHttpClientTest.java b/thrift/thrift0.13/src/test/java/com/linecorp/armeria/client/thrift/ThriftOverHttpClientTest.java index 5df012c5109..375f784424a 100644 --- a/thrift/thrift0.13/src/test/java/com/linecorp/armeria/client/thrift/ThriftOverHttpClientTest.java +++ b/thrift/thrift0.13/src/test/java/com/linecorp/armeria/client/thrift/ThriftOverHttpClientTest.java @@ -62,6 +62,7 @@ import com.linecorp.armeria.client.Endpoint; import com.linecorp.armeria.client.InvalidResponseHeadersException; import com.linecorp.armeria.client.logging.LoggingRpcClient; +import com.linecorp.armeria.common.CommonPools; import com.linecorp.armeria.common.HttpHeaderNames; import com.linecorp.armeria.common.HttpHeaders; import com.linecorp.armeria.common.HttpResponse; @@ -79,6 +80,7 @@ import com.linecorp.armeria.common.thrift.ThriftReply; import com.linecorp.armeria.common.thrift.ThriftSerializationFormats; import com.linecorp.armeria.common.util.Exceptions; +import com.linecorp.armeria.internal.testing.BlockingUtils; import com.linecorp.armeria.server.HttpService; import com.linecorp.armeria.server.ServerBuilder; import com.linecorp.armeria.server.ServiceRequestContext; @@ -257,7 +259,7 @@ static void init() throws Exception { final ClientDecorationBuilder decoBuilder = ClientDecoration.builder(); decoBuilder.addRpc((delegate, ctx, req) -> { if (recordMessageLogs) { - ctx.log().whenComplete().thenAccept(requestLogs::add); + ctx.log().whenComplete().thenAcceptAsync(requestLogs::add, CommonPools.blockingTaskExecutor()); } return delegate.execute(ctx, req); }); @@ -318,12 +320,14 @@ void testHelloServiceAsync( client.hello("kukuman" + num, new AsyncMethodCallback() { @Override public void onComplete(String response) { - assertThat(resultQueue.add(new AbstractMap.SimpleEntry<>(num, response))).isTrue(); + BlockingUtils.blockingRun(() -> assertThat(resultQueue.add( + new AbstractMap.SimpleEntry<>(num, response))).isTrue()); } @Override public void onError(Exception exception) { - assertThat(resultQueue.add(new AbstractMap.SimpleEntry<>(num, exception))).isTrue(); + BlockingUtils.blockingRun(() -> assertThat(resultQueue.add( + new AbstractMap.SimpleEntry<>(num, exception))).isTrue()); } }); } @@ -831,12 +835,13 @@ private static class RequestQueuingCallback implements AsyncMethodCallback { @Override public void onComplete(Object response) { - assertThat(resQueue.add(response == null ? "null" : response)).isTrue(); + BlockingUtils.blockingRun( + () -> assertThat(resQueue.add(response == null ? "null" : response)).isTrue()); } @Override public void onError(Exception exception) { - assertThat(resQueue.add(exception)).isTrue(); + BlockingUtils.blockingRun(() -> assertThat(resQueue.add(exception)).isTrue()); } } } diff --git a/zookeeper3/src/main/java/com/linecorp/armeria/client/zookeeper/ZooKeeperEndpointGroup.java b/zookeeper3/src/main/java/com/linecorp/armeria/client/zookeeper/ZooKeeperEndpointGroup.java index ec9acc8342e..7de3b4dad1c 100644 --- a/zookeeper3/src/main/java/com/linecorp/armeria/client/zookeeper/ZooKeeperEndpointGroup.java +++ b/zookeeper3/src/main/java/com/linecorp/armeria/client/zookeeper/ZooKeeperEndpointGroup.java @@ -34,10 +34,9 @@ import com.linecorp.armeria.client.endpoint.EndpointGroup; import com.linecorp.armeria.client.endpoint.EndpointSelectionStrategy; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.common.util.ThreadFactories; import com.linecorp.armeria.server.zookeeper.ZooKeeperUpdatingListener; -import io.netty.util.concurrent.DefaultThreadFactory; - /** * A ZooKeeper-based {@link EndpointGroup} implementation. This {@link EndpointGroup} retrieves the list of * {@link Endpoint}s from a ZooKeeper and updates it when the children of the znode changes. @@ -49,7 +48,7 @@ public final class ZooKeeperEndpointGroup extends DynamicEndpointGroup { private static final Logger logger = LoggerFactory.getLogger(ZooKeeperEndpointGroup.class); private static final ThreadFactory closeCuratorFrameworkThreadFactory = - new DefaultThreadFactory("armeria-close-CuratorFramework"); + ThreadFactories.newThreadFactory("armeria-close-CuratorFramework", false); /** * Returns a new {@link ZooKeeperEndpointGroup} that retrieves the {@link Endpoint} list from