-
Notifications
You must be signed in to change notification settings - Fork 921
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix bug related with context propagation (CoroutineServerInterceptor) #4894
Conversation
override fun <I : Any, O : Any> asyncInterceptCall( | ||
call: ServerCall<I, O>, | ||
headers: Metadata, | ||
next: ServerCallHandler<I, O> | ||
): CompletableFuture<ServerCall.Listener<I>> { | ||
check(call is AbstractServerCall) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that this assertion is unneeded.
...kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt
Show resolved
Hide resolved
|
||
companion object { | ||
@Suppress("UNCHECKED_CAST") | ||
internal val COROUTINE_CONTEXT_KEY: Context.Key<CoroutineContext> = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I used reflection... There is no other way.
This problem seems to be caused by A workaround for the problem would be to execute armeria/grpc/src/main/java/com/linecorp/armeria/server/grpc/GrpcServiceBuilder.java Lines 964 to 968 in c888c86
What do you think of this approach instead of using reflection which would be a final solution if we have no other choices? |
If it only propagates the Armeria Context, your proposed fix is fine. However, your suggestion does not propagate the Coroutine Context.
my test code diff from this PR:diff --git a/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt b/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt
index f8d5bcc6d..4e340e506 100644
--- a/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt
+++ b/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt
@@ -17,6 +17,8 @@
package com.linecorp.armeria.server.grpc.kotlin
import com.linecorp.armeria.common.annotation.UnstableApi
+import com.linecorp.armeria.internal.common.kotlin.ArmeriaRequestCoroutineContext
+import com.linecorp.armeria.internal.server.grpc.AbstractServerCall
import com.linecorp.armeria.server.grpc.AsyncServerInterceptor
import io.grpc.Context
import io.grpc.Metadata
@@ -26,6 +28,8 @@ import io.grpc.ServerInterceptor
import io.grpc.kotlin.CoroutineContextServerInterceptor
import io.grpc.kotlin.GrpcContextElement
import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.GlobalScope
+import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.future.future
import java.util.concurrent.CompletableFuture
import kotlin.coroutines.CoroutineContext
@@ -62,14 +66,24 @@ interface CoroutineServerInterceptor : AsyncServerInterceptor {
headers: Metadata,
next: ServerCallHandler<I, O>
): CompletableFuture<ServerCall.Listener<I>> {
- // COROUTINE_CONTEXT_KEY.get():
- // It is necessary to propagate the CoroutineContext set by the previous CoroutineContextServerInterceptor.
- // (The ArmeriaRequestCoroutineContext is also propagated by CoroutineContextServerInterceptor)
- // GrpcContextElement.current():
- // In gRPC-kotlin, the Coroutine Context is propagated using the gRPC Context.
- return CoroutineScope(COROUTINE_CONTEXT_KEY.get() + GrpcContextElement.current()).future {
+ check(call is AbstractServerCall) {
+ throw IllegalArgumentException(
+ "Cannot use ${AsyncServerInterceptor::class.java.name} with a non-Armeria gRPC server"
+ )
+ }
+ val executor = call.blockingExecutor() ?: call.eventLoop()
+
+ return GlobalScope.future(executor.asCoroutineDispatcher() + ArmeriaRequestCoroutineContext(call.ctx())) {
suspendedInterceptCall(call, headers, next)
}
+// // COROUTINE_CONTEXT_KEY.get():
+// // It is necessary to propagate the CoroutineContext set by the previous CoroutineContextServerInterceptor.
+// // (The ArmeriaRequestCoroutineContext is also propagated by CoroutineContextServerInterceptor)
+// // GrpcContextElement.current():
+// // In gRPC-kotlin, the Coroutine Context is propagated using the gRPC Context.
+// return CoroutineScope(COROUTINE_CONTEXT_KEY.get() + GrpcContextElement.current()).future {
+// suspendedInterceptCall(call, headers, next)
+// }
}
/**
diff --git a/grpc/src/main/java/com/linecorp/armeria/server/grpc/GrpcServiceBuilder.java b/grpc/src/main/java/com/linecorp/armeria/server/grpc/GrpcServiceBuilder.java
index 38b66d976..01943b8e2 100644
--- a/grpc/src/main/java/com/linecorp/armeria/server/grpc/GrpcServiceBuilder.java
+++ b/grpc/src/main/java/com/linecorp/armeria/server/grpc/GrpcServiceBuilder.java
@@ -948,6 +948,9 @@ public final class GrpcServiceBuilder {
private ImmutableList.Builder<ServerInterceptor> interceptors() {
if (interceptors == null) {
interceptors = ImmutableList.builder();
+ final ServerInterceptor coroutineContextInterceptor =
+ new ArmeriaCoroutineContextInterceptor(useBlockingTaskExecutor);
+ interceptors().add(coroutineContextInterceptor);
}
return interceptors;
}
@@ -961,11 +964,11 @@ public final class GrpcServiceBuilder {
*/
public GrpcService build() {
final HandlerRegistry handlerRegistry;
- if (USE_COROUTINE_CONTEXT_INTERCEPTOR) {
- final ServerInterceptor coroutineContextInterceptor =
- new ArmeriaCoroutineContextInterceptor(useBlockingTaskExecutor);
- interceptors().add(coroutineContextInterceptor);
- }
+// if (USE_COROUTINE_CONTEXT_INTERCEPTOR) {
+// final ServerInterceptor coroutineContextInterceptor =
+// new ArmeriaCoroutineContextInterceptor(useBlockingTaskExecutor);
+// interceptors().add(coroutineContextInterceptor);
+// }
if (!enableUnframedRequests && unframedGrpcErrorHandler != null) {
throw new IllegalStateException(
"'unframedGrpcErrorHandler' can only be set if unframed requests are enabled");
We use reflection, but grpc-kotlin is rarely changed, and even if it is changed, I think that unit test is enough to cover it. |
I see. I'm okay with reflection. But we need to consider two points in the current changes.
I will leave some workarounds on the code review commnets. |
...kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt
Show resolved
Hide resolved
// (The ArmeriaRequestCoroutineContext is also propagated by CoroutineContextServerInterceptor) | ||
// GrpcContextElement.current(): | ||
// In gRPC-kotlin, the Coroutine Context is propagated using the gRPC Context. | ||
return CoroutineScope(COROUTINE_CONTEXT_KEY.get() + GrpcContextElement.current()).future { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the previous plain AsyncServerInterceptor
changes the current thread, we need to inject ArmeriaRequestCoroutineContext(call.ctx()
again. What do you think of reviving the old code and combine them with the new one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think even if thread switched, the coroutine will do the context propagation, so there is no problem.
Actually this test is pass.
https://github.com/be-hase/armeria/blob/e4aad71345c39853945fd97b2ea1f85feaf2fb3e/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt#L277-L286
I think it is a good idea to inject ArmeriaRequestCoroutineContext(call.ctx()) just in case.
Hmmm, which would you prefer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you mind testing the following scenario?
private class MyAsyncInterceptor : AsyncServerInterceptor {
override fun <I : Any, O : Any> asyncInterceptCall(
call: ServerCall<I, O>,
headers: Metadata,
next: ServerCallHandler<I, O>
): CompletableFuture<ServerCall.Listener<I>> {
return CompletableFuture.supplyAsync({
next.startCall(call, headers)
}, Executors.newSingleThreadExecutor())
}
}
GrpcService.builder()
.exceptionMapping(statusFunction)
// applying order is MyAsyncInterceptor -> coroutineNameInterceptor -> authInterceptor -> threadLocalInterceptor
.intercept(threadLocalInterceptor, authInterceptor, coroutineNameInterceptor, MyAsyncInterceptor())
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, my reply is late. I was on vacation.
Oh, I see what you mean by what you intend.
Indeed, I was able to confirm that test falls down when there is such an AsyncInterceptor.
However, test will still fail in this case.
What do you think of reviving the old code and combining them with the new one?
my test code diff from this PR:
diff --git a/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt b/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt
index f8d5bcc6d..d44a0fc73 100644
--- a/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt
+++ b/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt
@@ -17,6 +17,8 @@
package com.linecorp.armeria.server.grpc.kotlin
import com.linecorp.armeria.common.annotation.UnstableApi
+import com.linecorp.armeria.internal.common.kotlin.ArmeriaRequestCoroutineContext
+import com.linecorp.armeria.internal.server.grpc.AbstractServerCall
import com.linecorp.armeria.server.grpc.AsyncServerInterceptor
import io.grpc.Context
import io.grpc.Metadata
@@ -26,6 +28,7 @@ import io.grpc.ServerInterceptor
import io.grpc.kotlin.CoroutineContextServerInterceptor
import io.grpc.kotlin.GrpcContextElement
import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.future.future
import java.util.concurrent.CompletableFuture
import kotlin.coroutines.CoroutineContext
@@ -62,12 +65,19 @@ interface CoroutineServerInterceptor : AsyncServerInterceptor {
headers: Metadata,
next: ServerCallHandler<I, O>
): CompletableFuture<ServerCall.Listener<I>> {
+ check(call is AbstractServerCall) {
+ throw IllegalArgumentException(
+ "Cannot use ${AsyncServerInterceptor::class.java.name} with a non-Armeria gRPC server"
+ )
+ }
+ val executor = call.blockingExecutor() ?: call.eventLoop()
+
// COROUTINE_CONTEXT_KEY.get():
// It is necessary to propagate the CoroutineContext set by the previous CoroutineContextServerInterceptor.
// (The ArmeriaRequestCoroutineContext is also propagated by CoroutineContextServerInterceptor)
// GrpcContextElement.current():
// In gRPC-kotlin, the Coroutine Context is propagated using the gRPC Context.
- return CoroutineScope(COROUTINE_CONTEXT_KEY.get() + GrpcContextElement.current()).future {
+ return CoroutineScope(executor.asCoroutineDispatcher() + ArmeriaRequestCoroutineContext(call.ctx()) + COROUTINE_CONTEXT_KEY.get() + GrpcContextElement.current()).future {
suspendedInterceptCall(call, headers, next)
}
}
diff --git a/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt b/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt
index a05514ae4..f62fd9282 100644
--- a/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt
+++ b/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt
@@ -34,6 +34,7 @@ import com.linecorp.armeria.internal.testing.AnticipatedException
import com.linecorp.armeria.server.ServerBuilder
import com.linecorp.armeria.server.ServiceRequestContext
import com.linecorp.armeria.server.auth.Authorizer
+import com.linecorp.armeria.server.grpc.AsyncServerInterceptor
import com.linecorp.armeria.server.grpc.GrpcService
import com.linecorp.armeria.testing.junit5.server.ServerExtension
import io.grpc.Context
@@ -232,7 +233,12 @@ internal class CoroutineServerInterceptorTest {
GrpcService.builder()
.exceptionMapping(statusFunction)
// applying order is coroutineNameInterceptor -> authInterceptor -> threadLocalInterceptor
- .intercept(threadLocalInterceptor, authInterceptor, coroutineNameInterceptor)
+ .intercept(
+ threadLocalInterceptor,
+ authInterceptor,
+ coroutineNameInterceptor,
+ MyAsyncInterceptor()
+ )
.addService(TestService())
.build()
)
@@ -242,7 +248,12 @@ internal class CoroutineServerInterceptorTest {
.addService(TestService())
.exceptionMapping(statusFunction)
// applying order is coroutineNameInterceptor -> authInterceptor -> threadLocalInterceptor
- .intercept(threadLocalInterceptor, authInterceptor, coroutineNameInterceptor)
+ .intercept(
+ threadLocalInterceptor,
+ authInterceptor,
+ coroutineNameInterceptor,
+ MyAsyncInterceptor()
+ )
.useBlockingTaskExecutor(true)
.build()
)
@@ -322,6 +333,22 @@ internal class CoroutineServerInterceptorTest {
}
}
+ private class MyAsyncInterceptor : AsyncServerInterceptor {
+ override fun <I : Any, O : Any> asyncInterceptCall(
+ call: ServerCall<I, O>,
+ headers: Metadata,
+ next: ServerCallHandler<I, O>
+ ): CompletableFuture<ServerCall.Listener<I>> {
+ return CompletableFuture.supplyAsync({
+ next.startCall(call, headers)
+ }, EXECUTOR)
+ }
+
+ companion object {
+ private val EXECUTOR = Executors.newSingleThreadExecutor()
+ }
+ }
+
private class TestService : TestServiceGrpcKt.TestServiceCoroutineImplBase() {
override suspend fun unaryCall(request: SimpleRequest): SimpleResponse {
assertContextPropagation()
As I wrote in the issue, it is necessary to propagate the grpc context correctly, so, for example, the following modification will cause the test to PASS.
private class MyAsyncInterceptor : AsyncServerInterceptor {
override fun <I : Any, O : Any> asyncInterceptCall(
call: ServerCall<I, O>,
headers: Metadata,
next: ServerCallHandler<I, O>
): CompletableFuture<ServerCall.Listener<I>> {
val ctx = Context.current()
return CompletableFuture.supplyAsync({
val prev = ctx.attach()
try {
next.startCall(call, headers)
} finally {
ctx.detach(prev)
}
}, EXECUTOR)
}
companion object {
private val EXECUTOR = Executors.newSingleThreadExecutor()
}
}
By the way, this is NG.
private class MyAsyncInterceptor : AsyncServerInterceptor {
override fun <I : Any, O : Any> asyncInterceptCall(
call: ServerCall<I, O>,
headers: Metadata,
next: ServerCallHandler<I, O>
): CompletableFuture<ServerCall.Listener<I>> {
val ctx = ServiceRequestContext.current()
return CompletableFuture.supplyAsync({
ctx.push().use {
next.startCall(call, headers)
}
}, EXECUTOR)
}
companion object {
private val EXECUTOR = Executors.newSingleThreadExecutor()
}
}
I think AsyncServerInterceptor
also needs to be modified.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test failure seems fixed if we add ArmeriaCoroutineContextInterceptor
as the first interceptor. #4894 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks, updated: 2f18c5f
At the current commit, test goes pass.
But.... . if I change the order of MyAuthInterceptor, the test doesn't pass again...
diff --git a/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt b/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt
index c8ca61e49..b0dcabca8 100644
--- a/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt
+++ b/grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt
@@ -234,10 +234,10 @@ internal class CoroutineServerInterceptorTest {
.exceptionMapping(statusFunction)
// applying order is MyAsyncInterceptor -> coroutineNameInterceptor -> authInterceptor -> threadLocalInterceptor
.intercept(
+ MyAsyncInterceptor(),
threadLocalInterceptor,
authInterceptor,
coroutineNameInterceptor,
- MyAsyncInterceptor(),
)
.addService(TestService())
.build()
@@ -249,10 +249,10 @@ internal class CoroutineServerInterceptorTest {
.exceptionMapping(statusFunction)
// applying order is MyAsyncInterceptor -> coroutineNameInterceptor -> authInterceptor -> threadLocalInterceptor
.intercept(
+ MyAsyncInterceptor(),
threadLocalInterceptor,
authInterceptor,
coroutineNameInterceptor,
- MyAsyncInterceptor(),
)
.useBlockingTaskExecutor(true)
.build()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val ctx = Context.current() return CompletableFuture.supplyAsync({ val prev = ctx.attach() try { next.startCall(call, headers) } finally { ctx.detach(prev) } }, EXECUTOR)
After some experiments, I figured out that these manual attach
and detach
seem the only way to propagate the gRPC's context correctly when the intercepting thread is changed.
I am okay with this PR if we can change ArmeriaCoroutineContextInterceptor
to get ServiceRequestContext
without relying on the current thread. @be-hase What do you think?
diff --git grpc/src/main/java/com/linecorp/armeria/server/grpc/ArmeriaCoroutineContextInterceptor.java grpc/src/main/java/com/linecorp/armeria/server/grpc/ArmeriaCoroutineContextInterceptor.java
index 223afad48..05add2d4c 100644
--- grpc/src/main/java/com/linecorp/armeria/server/grpc/ArmeriaCoroutineContextInterceptor.java
+++ grpc/src/main/java/com/linecorp/armeria/server/grpc/ArmeriaCoroutineContextInterceptor.java
@@ -16,8 +16,11 @@
package com.linecorp.armeria.server.grpc;
+import static com.google.common.base.Preconditions.checkState;
+
import java.util.concurrent.ScheduledExecutorService;
+import com.linecorp.armeria.internal.server.grpc.AbstractServerCall;
import com.linecorp.armeria.server.ServiceRequestContext;
import io.grpc.Metadata;
@@ -36,7 +39,10 @@ final class ArmeriaCoroutineContextInterceptor extends CoroutineContextServerInt
@Override
public CoroutineContext coroutineContext(ServerCall<?, ?> serverCall, Metadata metadata) {
- final ServiceRequestContext ctx = ServiceRequestContext.current();
+ checkState(serverCall instanceof AbstractServerCall,
+ "Cannot use %s with a non-Armeria gRPC server",
+ ArmeriaCoroutineContextInterceptor.class.getName());
+ final ServiceRequestContext ctx = ((AbstractServerCall<?, ?>) serverCall).ctx();
final ArmeriaRequestCoroutineContext coroutineContext = new ArmeriaRequestCoroutineContext(ctx);
final ScheduledExecutorService executor;
if (useBlockingTaskExecutor) {
diff --git grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt
index c8ca61e49..f30c64ae3 100644
--- grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt
+++ grpc-kotlin/src/test/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptorTest.kt
@@ -19,6 +19,7 @@ package com.linecorp.armeria.server.grpc.kotlin
import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.google.protobuf.ByteString
import com.linecorp.armeria.client.grpc.GrpcClients
+import com.linecorp.armeria.client.logging.LoggingClient
import com.linecorp.armeria.common.RequestContext
import com.linecorp.armeria.common.auth.AuthToken
import com.linecorp.armeria.common.grpc.GrpcStatusFunction
@@ -234,6 +235,7 @@ internal class CoroutineServerInterceptorTest {
.exceptionMapping(statusFunction)
// applying order is MyAsyncInterceptor -> coroutineNameInterceptor -> authInterceptor -> threadLocalInterceptor
.intercept(
+ MyAsyncInterceptor(),
threadLocalInterceptor,
authInterceptor,
coroutineNameInterceptor,
@@ -249,6 +251,7 @@ internal class CoroutineServerInterceptorTest {
.exceptionMapping(statusFunction)
// applying order is MyAsyncInterceptor -> coroutineNameInterceptor -> authInterceptor -> threadLocalInterceptor
.intercept(
+ MyAsyncInterceptor(),
threadLocalInterceptor,
authInterceptor,
coroutineNameInterceptor,
@@ -339,8 +342,9 @@ internal class CoroutineServerInterceptorTest {
headers: Metadata,
next: ServerCallHandler<I, O>
): CompletableFuture<ServerCall.Listener<I>> {
+ val current = Context.current()
return CompletableFuture.supplyAsync({
- next.startCall(call, headers)
+ current.call { next.startCall(call, headers) }
}, EXECUTOR)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Applied the patch to e2af251
(#4894)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the delay in getting back to you. Thanks for catching up.
After some experiments, I figured out that these manual attach and detach seem the only way to propagate the gRPC's context correctly when the intercepting thread is changed.
Yes, I agree with you.
I generally agree with your fix, but I prefer to focus only on propagating the grpc context (so the armeria context is propagated along with it) and write it this way.
However, this is a matter of personal preference and can be ignored.
diff --git a/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt b/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt
index 5af2461cd..4e2496d67 100644
--- a/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt
+++ b/grpc-kotlin/src/main/kotlin/com/linecorp/armeria/server/grpc/kotlin/CoroutineServerInterceptor.kt
@@ -17,8 +17,6 @@
package com.linecorp.armeria.server.grpc.kotlin
import com.linecorp.armeria.common.annotation.UnstableApi
-import com.linecorp.armeria.internal.common.kotlin.ArmeriaRequestCoroutineContext
-import com.linecorp.armeria.internal.server.grpc.AbstractServerCall
import com.linecorp.armeria.server.grpc.AsyncServerInterceptor
import io.grpc.Context
import io.grpc.Metadata
@@ -28,7 +26,6 @@ import io.grpc.ServerInterceptor
import io.grpc.kotlin.CoroutineContextServerInterceptor
import io.grpc.kotlin.GrpcContextElement
import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.future.future
import java.util.concurrent.CompletableFuture
import kotlin.coroutines.CoroutineContext
@@ -65,21 +62,13 @@ interface CoroutineServerInterceptor : AsyncServerInterceptor {
headers: Metadata,
next: ServerCallHandler<I, O>
): CompletableFuture<ServerCall.Listener<I>> {
- check(call is AbstractServerCall) {
- throw IllegalArgumentException(
- "Cannot use ${AsyncServerInterceptor::class.java.name} with a non-Armeria gRPC server"
- )
- }
- val executor = call.blockingExecutor() ?: call.eventLoop()
-
// COROUTINE_CONTEXT_KEY.get():
// It is necessary to propagate the CoroutineContext set by the previous CoroutineContextServerInterceptor.
// (The ArmeriaRequestCoroutineContext is also propagated by CoroutineContextServerInterceptor)
// GrpcContextElement.current():
// In gRPC-kotlin, the Coroutine Context is propagated using the gRPC Context.
return CoroutineScope(
- executor.asCoroutineDispatcher() + ArmeriaRequestCoroutineContext(call.ctx()) +
- COROUTINE_CONTEXT_KEY.get() + GrpcContextElement.current()
+ COROUTINE_CONTEXT_KEY.get() + GrpcContextElement.current()
).future {
suspendedInterceptCall(call, headers, next)
}
diff --git a/grpc/src/main/java/com/linecorp/armeria/server/grpc/GrpcServiceBuilder.java b/grpc/src/main/java/com/linecorp/armeria/server/grpc/GrpcServiceBuilder.java
index 5500506b5..6b61f89a2 100644
--- a/grpc/src/main/java/com/linecorp/armeria/server/grpc/GrpcServiceBuilder.java
+++ b/grpc/src/main/java/com/linecorp/armeria/server/grpc/GrpcServiceBuilder.java
@@ -948,11 +948,6 @@ public final class GrpcServiceBuilder {
private ImmutableList.Builder<ServerInterceptor> interceptors() {
if (interceptors == null) {
interceptors = ImmutableList.builder();
- if (USE_COROUTINE_CONTEXT_INTERCEPTOR) {
- final ServerInterceptor coroutineContextInterceptor =
- new ArmeriaCoroutineContextInterceptor(useBlockingTaskExecutor);
- interceptors.add(coroutineContextInterceptor);
- }
}
return interceptors;
}
@@ -966,6 +961,11 @@ public final class GrpcServiceBuilder {
*/
public GrpcService build() {
final HandlerRegistry handlerRegistry;
+ if (USE_COROUTINE_CONTEXT_INTERCEPTOR) {
+ final ServerInterceptor coroutineContextInterceptor =
+ new ArmeriaCoroutineContextInterceptor(useBlockingTaskExecutor);
+ interceptors().add(coroutineContextInterceptor);
+ }
if (!enableUnframedRequests && unframedGrpcErrorHandler != null) {
throw new IllegalStateException(
"'unframedGrpcErrorHandler' can only be set if unframed requests are enabled");
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It makes sense. Updated!
This PR is ready to review. PTAL. 🙇♂️ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, @be-hase! 👍 It was a pleasant experience building this PR together. ❤️
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, @be-hase! 🙇
Hey guys, thanks for the review of the difficult context propagation. I also enjoyed it. :) |
Motivation:
A great feature for Coroutines users has been introduced.
#4724
However, Armeria's RequestContext is not propagated when using CoroutineServerInterceptor.
Modifications:
CoroutineServerInterceptor
so that the context is propagated correctlyResult:
CoroutineServerInterceptor
#4889