Skip to content

Commit

Permalink
Provide blockhound integrations (#4493)
Browse files Browse the repository at this point in the history
Motivation:

Armeria users often use `BlockHound` to determine whether there are any
blocking calls in their applications.
Providing a basic `BlockHound` integration can help de-duplicate some
code and debugging efforts.

I propose the following:

- Our CI checks for blocking calls in our tests to keep our `BlockHound`
integration up-to-date with changes.
- We provide a **lenient** list of allowed blocking calls that may be
commonly used even though not related directly to Armeria
- Some JDK implementations internally use locks (`ThreadPoolExecutor`).
As long as these locks are verified to be short-living, we can add it to
our integration.
- Although somewhat subjective, if short-living locks are held by
Armeria integrations (not necessarily directly related to Armeria) I
think it's reasonable to add it to our list.
- HdrHistogram is used internally by Micrometer. Micrometer itself
doesn't expect metrics to be recorded from event loops, so it may not be
reasonable to expect `Micrometer` or `HdrHistogram` to provide
`Blockhound` integrations. Since Armeria uses `Micrometer` to record
metrics from event loops, I think it is reasonable that we add
`HdrHistogram` related calls to our `Blockhound` integration.
- The integration is public so that users can opt-out of some
integrations if they choose to do so.
- We can possibly provide more customization points if users feel some
rules are too lenient.
- I don't consider the current listing as complete/finalized, and expect
that users can suggest additional rules in the future.

Also note that there are some blocking calls within the codebase, but
this PR attempts to just introduce blockhound rather than remove such
calls.

Modifications:

- Modify the build action so that
  - The blockhound flag is set for java 19
- Upload blockhound logs which contains a list of blocked threads and
their stacktraces
- Change the artifact upload name. Currently, because the name can be
duplicated for java19 (mac, windows, self-hosted), only one of these is
downloadable
- Provide `BlockHound` integrations for `core`, `brave`, `grpc`,
`retrofit2`, `scala`, `sangria` modules
- Provide an internal testing `BlockHound` integration which allows
test-specific blocking calls
- This integration also adds a callback which logs blocking calls to a
separate file
- If the `blockhound` gradle property flag is specified, run tests with
`BlockHound` enabled
- Otherwise, `BlockHound.install` is not invoked and not enabled in the
code-base
- Introduce `ReentrantShortLock` which Armeria's `BlockHound` whitelists
and change all usages of `ReentrantLock`
- `BlockingFastThreadLocalThread` is now used for non-event loops since
netty whitelists `FastThreadLocalThread` with
`permitBlockingCalls==true`
  - #4493 (comment)
  - Note that this means that armeria requires at least Netty 4.1.85 now
- `ShutdownSupport::shutdown` is now called from `startStopExecutor`.
Although this method returns a CF, the call can actually be blocking
- `START_STOP_EXECUTOR` is now a non-eventloop daemon thread instead of
using the `GlobalEventExecutor`
  - This is because `GlobalEventExecutor` is an event loop.
- Introduce `BlockingUtils#blockingRun` in case users want to run
blocking calls from tests
- Add the `kotlinx-coroutines-debug` dependency so that Coroutines
`BlockHound` integration is loaded
-
https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-debug/kotlinx.coroutines.debug/-coroutines-block-hound-integration/
- Allow users to specify the `ThreadFactory` when creating a
`EventLoopExtension` or `EventLoopGroupExtension`
- This can be useful when users would like to create an event loop which
allows blocking calls
- `SamlService`, `SamlDecorator` now runs some calls from the blocking
executor
- `ZookeeperEndpointGroup` now uses a non-event-loop thread factory

Result:

- Users can now use `BlockHound` integrations provided by armeria
  • Loading branch information
jrhee17 authored May 25, 2023
1 parent 274929f commit d853651
Show file tree
Hide file tree
Showing 74 changed files with 719 additions and 132 deletions.
15 changes: 12 additions & 3 deletions .github/workflows/actions_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ jobs:
- java: 19
on: self-hosted
snapshot: true
# blockhound makes the build run about 10 minutes slower
blockhound: true

steps:
- uses: actions/checkout@v2
Expand Down Expand Up @@ -86,6 +88,7 @@ jobs:
${{ (matrix.on == 'self-hosted') && '--max-workers=8' || '--max-workers=2' }} --parallel \
${{ matrix.coverage && '-Pcoverage' || '' }} \
${{ matrix.leak && '-Pleak' || '' }} \
${{ matrix.blockhound && '-Pblockhound' || '' }} \
-PnoLint \
-PflakyTests=false \
-PbuildJdkVersion=${{ env.BUILD_JDK_VERSION }} \
Expand Down Expand Up @@ -133,17 +136,23 @@ jobs:
if: ${{ matrix.coverage }}
uses: codecov/codecov-action@v1

- name: Fail the run if any threads were blocked
if: ${{ matrix.blockhound }}
run: "if [[ -z `find . -name 'blockhound.log' -size +0` ]]; then exit 0; else exit 1; fi"
shell: bash

- name: Collect the test reports
if: failure()
run: find . '(' -name 'java_pid*.hprof' -or -name 'hs_err_*.log' -or -path '*/build/reports/tests' -or -path '*/build/test-results' -or -path '*/javadoc.options' ')' -exec tar rf "reports-JVM-${{ matrix.java }}.tar" {} ';'
run: |
find . '(' -name 'java_pid*.hprof' -or -name 'hs_err_*.log' -or -path '*/build/reports/tests' -or -path '*/build/test-results' -or -path '*/javadoc.options' -or -name 'blockhound.log' ')' -exec tar rf "reports-JVM-${{ matrix.on }}-${{ matrix.java }}.tar" {} ';'
shell: bash

- name: Upload the artifacts
if: failure()
uses: actions/upload-artifact@v2
with:
name: reports-JVM-${{ matrix.java }}
path: reports-JVM-${{ matrix.java }}.tar
name: reports-JVM-${{ matrix.on }}-${{ matrix.java }}
path: reports-JVM-${{ matrix.on }}-${{ matrix.java }}.tar
retention-days: 3

lint:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2022 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.common.brave;

import com.linecorp.armeria.common.annotation.UnstableApi;

import reactor.blockhound.BlockHound.Builder;
import reactor.blockhound.integration.BlockHoundIntegration;

/**
* A {@link BlockHoundIntegration} for the brave module.
*/
@UnstableApi
public final class BraveBlockHoundIntegration implements BlockHoundIntegration {

@Override
public void applyTo(Builder builder) {
builder.allowBlockingCallsInside("zipkin2.reporter.AsyncReporter$BoundedAsyncReporter", "report");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.linecorp.armeria.common.brave.BraveBlockHoundIntegration
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@
import com.linecorp.armeria.common.brave.HelloService.AsyncIface;
import com.linecorp.armeria.common.brave.RequestContextCurrentTraceContext;
import com.linecorp.armeria.common.thrift.ThriftFuture;
import com.linecorp.armeria.common.util.Exceptions;
import com.linecorp.armeria.common.util.ThreadFactories;
import com.linecorp.armeria.internal.testing.BlockingUtils;
import com.linecorp.armeria.server.AbstractHttpService;
import com.linecorp.armeria.server.HttpService;
import com.linecorp.armeria.server.ServerBuilder;
Expand Down Expand Up @@ -569,17 +569,13 @@ private static class SpanHandlerImpl extends SpanHandler {

@Override
public boolean end(TraceContext context, MutableSpan span, Cause cause) {
return spans.add(span);
return BlockingUtils.blockingRun(() -> spans.add(span));
}

MutableSpan[] take(int numSpans) {
final List<MutableSpan> taken = new ArrayList<>();
while (taken.size() < numSpans) {
try {
taken.add(spans.poll(30, TimeUnit.SECONDS));
} catch (InterruptedException e) {
return Exceptions.throwUnsafely(e);
}
BlockingUtils.blockingRun(() -> taken.add(spans.poll(30, TimeUnit.SECONDS)));
}

// Reverse the collected spans to sort the spans by request time.
Expand Down
15 changes: 15 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,17 @@ allprojects {
systemProperty "java.security.manager", "allow"
}

// required by blockhound for jvm 13+. See https://github.com/reactor/BlockHound/issues/33.
if (rootProject.hasProperty('blockhound') && project.ext.testJavaVersion >= 13) {
jvmArgs "-XX:+AllowRedefinitionToAddDeleteMethods"
}

// Use verbose exception/response reporting for easier debugging.
systemProperty 'com.linecorp.armeria.verboseExceptions', 'true'
systemProperty 'com.linecorp.armeria.verboseResponses', 'true'

systemProperty 'com.linecorp.armeria.blockhound.reportFile', "${project.buildDir}/blockhound.log"

// Pass special system property to tell our tests that we are measuring coverage.
if (project.hasFlags('coverage')) {
systemProperty 'com.linecorp.armeria.testing.coverage', 'true'
Expand Down Expand Up @@ -165,6 +172,9 @@ configure(projectsWithFlags('java')) {
// Reflections
implementation libs.reflections

// Blockhound
optionalImplementation libs.blockhound

// Test-time dependencies
testImplementation libs.guava.testlib
testImplementation libs.junit4
Expand All @@ -183,6 +193,11 @@ configure(projectsWithFlags('java')) {
testImplementation libs.apache.httpclient5
testImplementation libs.hamcrest
testImplementation libs.hamcrest.library
testRuntimeOnly libs.kotlin.coroutines.debug

if (rootProject.hasProperty('blockhound')) {
testRuntimeOnly libs.blockhound.junit.platform
}
}

// Configure the default DuplicatesStrategy for such as:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import com.google.common.annotations.VisibleForTesting;

import com.linecorp.armeria.internal.common.util.ReentrantShortLock;

import io.netty.channel.EventLoop;

abstract class AbstractEventLoopState {
Expand All @@ -35,7 +37,7 @@ static AbstractEventLoopState of(List<EventLoop> eventLoops, int maxNumEventLoop
return new HeapBasedEventLoopState(eventLoops, maxNumEventLoops, scheduler);
}

private final ReentrantLock lock = new ReentrantLock();
private final ReentrantLock lock = new ReentrantShortLock();
private final List<EventLoop> eventLoops;
private final DefaultEventLoopScheduler scheduler;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.util.ReleasableHolder;
import com.linecorp.armeria.internal.common.util.ReentrantShortLock;

import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
Expand All @@ -58,7 +59,7 @@ final class DefaultEventLoopScheduler implements EventLoopScheduler {

static final int DEFAULT_MAX_NUM_EVENT_LOOPS = 1;

private final ReentrantLock lock = new ReentrantLock();
private final ReentrantLock lock = new ReentrantShortLock();

private final List<EventLoop> eventLoops;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.linecorp.armeria.common.Cookies;
import com.linecorp.armeria.common.Scheme;
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.internal.common.util.ReentrantShortLock;

import io.netty.util.NetUtil;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
Expand All @@ -60,7 +61,7 @@ final class DefaultCookieJar implements CookieJar {
this.cookiePolicy = cookiePolicy;
store = new Object2LongOpenHashMap<>();
filter = new HashMap<>();
lock = new ReentrantLock();
lock = new ReentrantShortLock();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import com.google.common.base.MoreObjects;
import com.google.common.base.MoreObjects.ToStringHelper;
Expand All @@ -45,6 +44,7 @@
import com.linecorp.armeria.common.util.AsyncCloseableSupport;
import com.linecorp.armeria.common.util.EventLoopCheckingFuture;
import com.linecorp.armeria.common.util.ListenableAsyncCloseable;
import com.linecorp.armeria.internal.common.util.ReentrantShortLock;

/**
* A dynamic {@link EndpointGroup}. The list of {@link Endpoint}s can be updated dynamically.
Expand All @@ -66,7 +66,7 @@ public static DynamicEndpointGroupBuilder builder() {
private final EndpointSelectionStrategy selectionStrategy;
private final AtomicReference<EndpointSelector> selector = new AtomicReference<>();
private volatile List<Endpoint> endpoints = UNINITIALIZED_ENDPOINTS;
private final Lock endpointsLock = new ReentrantLock();
private final Lock endpointsLock = new ReentrantShortLock();

private final CompletableFuture<List<Endpoint>> initialEndpointsFuture = new InitialEndpointsFuture();
private final AsyncCloseableSupport closeable = AsyncCloseableSupport.of(this::closeAsync);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.google.common.base.MoreObjects;

import com.linecorp.armeria.client.endpoint.FileWatcherRunnable.FileWatchEvent;
import com.linecorp.armeria.internal.common.util.ReentrantShortLock;

/**
* A registry which wraps a {@link WatchService} and allows paths to be registered.
Expand Down Expand Up @@ -133,7 +134,7 @@ void close() throws IOException {

private final Map<FileSystem, FileSystemWatchContext> fileSystemWatchServiceMap =
new HashMap<>();
private final ReentrantLock lock = new ReentrantLock();
private final ReentrantLock lock = new ReentrantShortLock();

/**
* Registers a {@code filePath} and {@code callback} to the {@link WatchService}. When the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@
import java.util.function.Supplier;

import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.internal.common.util.ReentrantShortLock;

/**
* A restartable thread utility class.
*/
final class RestartableThread {

private final ReentrantLock lock = new ReentrantLock();
private final ReentrantLock lock = new ReentrantShortLock();

@Nullable
private Thread thread;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.internal.common.util.ReentrantShortLock;

/**
* This selector selects an {@link Endpoint} using random and the weight of the {@link Endpoint}. If there are
Expand All @@ -37,7 +38,7 @@
*/
final class WeightedRandomDistributionEndpointSelector {

private final ReentrantLock lock = new ReentrantLock();
private final ReentrantLock lock = new ReentrantShortLock();
private final List<Entry> allEntries;
@GuardedBy("lock")
private final List<Entry> currentEntries;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.util.AsyncCloseable;
import com.linecorp.armeria.common.util.EventLoopCheckingFuture;
import com.linecorp.armeria.internal.common.util.ReentrantShortLock;

import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.Future;
Expand All @@ -54,7 +55,7 @@ final class DefaultHealthCheckerContext
private final Endpoint endpoint;
private final SessionProtocol protocol;
private final ClientOptions clientOptions;
private final ReentrantLock lock = new ReentrantLock();
private final ReentrantLock lock = new ReentrantShortLock();

/**
* Keeps the {@link Future}s which were scheduled via this {@link ScheduledExecutorService}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.metric.MeterIdPrefix;
import com.linecorp.armeria.common.util.AsyncCloseable;
import com.linecorp.armeria.internal.common.util.ReentrantShortLock;

import io.micrometer.core.instrument.binder.MeterBinder;

Expand Down Expand Up @@ -110,7 +111,7 @@ public static HealthCheckedEndpointGroupBuilder builder(EndpointGroup delegate,
@VisibleForTesting
final HealthCheckStrategy healthCheckStrategy;

private final ReentrantLock lock = new ReentrantLock();
private final ReentrantLock lock = new ReentrantShortLock();
@GuardedBy("lock")
private final Deque<HealthCheckContextGroup> contextGroupChain = new ArrayDeque<>(4);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import com.linecorp.armeria.common.util.AsyncCloseable;
import com.linecorp.armeria.common.util.AsyncCloseableSupport;
import com.linecorp.armeria.common.util.TimeoutMode;
import com.linecorp.armeria.internal.common.util.ReentrantShortLock;
import com.linecorp.armeria.unsafe.PooledObjects;

import io.netty.util.AsciiString;
Expand All @@ -60,7 +61,7 @@ final class HttpHealthChecker implements AsyncCloseable {

private static final AsciiString ARMERIA_LPHC = HttpHeaderNames.of("armeria-lphc");

private final ReentrantLock lock = new ReentrantLock();
private final ReentrantLock lock = new ReentrantShortLock();
private final HealthCheckerContext ctx;
private final WebClient webClient;
private final String authority;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.common;

import java.util.ResourceBundle;

import com.linecorp.armeria.common.annotation.UnstableApi;

import reactor.blockhound.BlockHound.Builder;
import reactor.blockhound.integration.BlockHoundIntegration;

/**
* A {@link BlockHoundIntegration} for the core module.
*/
@UnstableApi
public final class CoreBlockHoundIntegration implements BlockHoundIntegration {
@Override
public void applyTo(Builder builder) {
// short locks
builder.allowBlockingCallsInside("com.linecorp.armeria.client.HttpClientFactory",
"pool");
builder.allowBlockingCallsInside("com.linecorp.armeria.internal.common.util.ReentrantShortLock",
"lock");

// Thread.yield can be eventually called when PooledObjects.copyAndClose is called
builder.allowBlockingCallsInside("io.netty.util.internal.ReferenceCountUpdater", "release");

// hdr histogram holds locks
builder.allowBlockingCallsInside("org.HdrHistogram.ConcurrentHistogram", "getCountAtIndex");
builder.allowBlockingCallsInside("org.HdrHistogram.WriterReaderPhaser", "flipPhase");

// StreamMessageInputStream internally uses a blocking queue
// ThreadPoolExecutor.execute internally uses a blocking queue
builder.allowBlockingCallsInside("java.util.concurrent.LinkedBlockingQueue", "offer");

// a single blocking call is incurred for the first invocation, but the result is cached.
builder.allowBlockingCallsInside("com.linecorp.armeria.internal.client.PublicSuffix",
"get");
builder.allowBlockingCallsInside("java.util.ServiceLoader$LazyClassPathLookupIterator",
"parse");
builder.allowBlockingCallsInside(ResourceBundle.class.getName(), "getBundle");
builder.allowBlockingCallsInside("io.netty.handler.codec.compression.Brotli", "<clinit>");

// a lock is held temporarily when adding workers
builder.allowBlockingCallsInside("java.util.concurrent.ThreadPoolExecutor", "addWorker");

// prometheus exporting holds a lock temporarily
builder.allowBlockingCallsInside(
"com.linecorp.armeria.server.metric.PrometheusExpositionService", "doGet");

// Thread.yield can be called
builder.allowBlockingCallsInside(
"java.util.concurrent.FutureTask", "handlePossibleCancellationInterrupt");
}
}
Loading

0 comments on commit d853651

Please sign in to comment.