Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Conflict for NonBlocking Interface in Reactor Integration #6037

Merged
merged 2 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ dependencies {
// JUnit Pioneer
testImplementation libs.junit.pioneer

// Reactor for registering EventLoop as a non blocking thread.
optionalImplementation libs.reactor.core

// Reactive Streams
api libs.reactivestreams
testImplementation libs.reactivestreams.tck
Expand Down Expand Up @@ -217,8 +220,6 @@ if (tasks.findByName('trimShadedJar')) {
tasks.trimShadedJar.configure {
// Keep all classes under com.linecorp.armeria, except the internal ones.
keep "class !com.linecorp.armeria.internal.shaded.**,com.linecorp.armeria.** { *; }"
// Keep the 'NonBlocking' tag interface.
keep "class reactor.core.scheduler.NonBlocking { *; }"
// Do not optimize the dependencies that access some fields via sun.misc.Unsafe or reflection only.
keep "class com.linecorp.armeria.internal.shaded.caffeine.** { *; }"
keep "class com.linecorp.armeria.internal.shaded.jctools.** { *; }"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import com.linecorp.armeria.client.endpoint.EndpointGroup;
import com.linecorp.armeria.common.FlagsProvider;
import com.linecorp.armeria.common.NonBlocking;
import com.linecorp.armeria.common.Scheme;
import com.linecorp.armeria.common.SerializationFormat;
import com.linecorp.armeria.common.SessionProtocol;
Expand All @@ -49,7 +50,6 @@
import io.micrometer.core.instrument.MeterRegistry;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import reactor.core.scheduler.NonBlocking;

/**
* Creates and manages clients.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import com.linecorp.armeria.client.proxy.Socks4ProxyConfig;
import com.linecorp.armeria.client.proxy.Socks5ProxyConfig;
import com.linecorp.armeria.common.ClosedSessionException;
import com.linecorp.armeria.common.NonBlocking;
import com.linecorp.armeria.common.SerializationFormat;
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.annotation.Nullable;
Expand Down Expand Up @@ -76,7 +77,6 @@
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import reactor.core.scheduler.NonBlocking;

final class HttpChannelPool implements AsyncCloseable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.linecorp.armeria.client.proxy.ProxyConfigSelector;
import com.linecorp.armeria.client.redirect.RedirectConfig;
import com.linecorp.armeria.common.Http1HeaderNaming;
import com.linecorp.armeria.common.NonBlocking;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.Scheme;
import com.linecorp.armeria.common.SerializationFormat;
Expand Down Expand Up @@ -71,7 +72,6 @@
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.resolver.AddressResolverGroup;
import io.netty.util.concurrent.FutureListener;
import reactor.core.scheduler.NonBlocking;

/**
* A {@link ClientFactory} that creates an HTTP client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ public final class CommonPools {
MoreMeterBinders
.eventLoopMetrics(WORKER_GROUP, new MeterIdPrefix("armeria.netty.common"))
.bindTo(Flags.meterRegistry());

try {
Class.forName("reactor.core.scheduler.Schedulers",
true, CommonPools.class.getClassLoader());
ReactorNonBlockingUtil.registerEventLoopAsNonBlocking();
} catch (ClassNotFoundException e) {
// Do nothing.
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
public final class CoreBlockHoundIntegration implements BlockHoundIntegration {
@Override
public void applyTo(Builder builder) {
builder.nonBlockingThreadPredicate(predicate -> predicate.or(NonBlocking.class::isInstance));

// short locks
builder.allowBlockingCallsInside("com.linecorp.armeria.client.HttpClientFactory",
"pool");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,18 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package reactor.core.scheduler;
package com.linecorp.armeria.common;

/**
* A dummy interface that makes Project Reactor recognize Armeria's event loop threads as non-blocking.
trustin marked this conversation as resolved.
Show resolved Hide resolved
* An interface that indicates a non-blocking thread. You can use this interface to check if the current
* thread is a non-blocking thread. For example:
* <pre>{@code
* if (Thread.currentThread() instanceof NonBlocking) {
* // Avoid blocking operations.
* closeable.closeAsync();
* } else {
* closeable.close();
* }
* }</pre>
*/
public interface NonBlocking {}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 LINE Corporation
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
Expand All @@ -13,11 +13,15 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.linecorp.armeria.common;

/**
* Provides a dummy interface that makes Project Reactor recognize Armeria's event loop threads as non-blocking.
*/
@NonNullByDefault
package reactor.core.scheduler;
import reactor.core.scheduler.Schedulers;

final class ReactorNonBlockingUtil {

static void registerEventLoopAsNonBlocking() {
Schedulers.registerNonBlockingThreadPredicate(NonBlocking.class::isInstance);
}

import com.linecorp.armeria.common.annotation.NonNullByDefault;
private ReactorNonBlockingUtil() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@
import com.google.common.collect.MapMaker;

import com.linecorp.armeria.common.Flags;
import com.linecorp.armeria.common.NonBlocking;
import com.linecorp.armeria.common.annotation.Nullable;

import reactor.core.scheduler.NonBlocking;

/**
* A {@link CompletableFuture} that warns the user if they call a method that blocks the event loop.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
*/
package com.linecorp.armeria.internal.common.util;

import com.linecorp.armeria.common.NonBlocking;
import com.linecorp.armeria.common.annotation.Nullable;

import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.FastThreadLocalThread;
import reactor.core.scheduler.NonBlocking;

/**
* An event loop thread with support for {@link TemporaryThreadLocals}, Netty {@link FastThreadLocal} and
* Project Reactor {@link NonBlocking}.
* {@link NonBlocking} interface.
*/
public final class EventLoopThread extends FastThreadLocalThread implements NonBlocking {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.linecorp.armeria.common.reactor3;

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.Test;

import com.linecorp.armeria.common.CommonPools;

import io.netty.util.concurrent.Future;
import reactor.core.scheduler.Schedulers;

final class EventLoopNonBlockingTest {

/**
* Verifies that the current thread is registered a non-blocking thread via {@code ReactorNonBlockingUtil}.
*/
@Test
void checkEventLoopNonBlocking() throws Exception {
final Future<Boolean> submit = CommonPools.workerGroup().submit(Schedulers::isInNonBlockingThread);
assertThat(submit.get()).isTrue();
}
}
Loading