COROUTINE_SUSPENDED response when calling coroutine in a suspending controller function which returns Flow #11131

Open
vekonypeter opened this issue Aug 29, 2024 · 9 comments
Labels
info: workaround available A workaround is available for the issue

Comments

@vekonypeter

vekonypeter commented Aug 29, 2024

Expected Behavior

I have a class annotated with @Controller, and my method returns Flow<...> as its response. Inside the method I have to call another suspending function, e.g. to fetch data from the database or to call another API, all of which work via coroutines for me. Because of this, my controller method is also suspending.

When a client calls this method it should get the data sent via the Flow.

Actual Behaviour

The status code is 200, but the response body is the following:

"COROUTINE_SUSPENDED"

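For context, `COROUTINE_SUSPENDED` is the marker value that a compiled Kotlin `suspend` function hands back to its caller when it suspends instead of completing. The stdlib-only sketch below shows that marker and its string form. `neverResumes` and `rawResult` are hypothetical names introduced for illustration, and whether Micronaut ends up serialising the marker in exactly this way is an assumption that this report does not confirm.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.startCoroutineUninterceptedOrReturn
import kotlin.coroutines.suspendCoroutine

// Hypothetical suspend function that suspends and is never resumed.
suspend fun neverResumes(): String = suspendCoroutine { /* never resumed */ }

// Starts the suspend function the way a low-level caller would and returns
// whatever that first call hands back.
fun rawResult(): Any? {
    val block: suspend () -> String = { neverResumes() }
    val completion = Continuation<String>(EmptyCoroutineContext) { /* result ignored */ }
    return block.startCoroutineUninterceptedOrReturn(completion)
}

fun main() {
    val result = rawResult()
    println(result === COROUTINE_SUSPENDED) // true: the call suspended
    println(result.toString())              // COROUTINE_SUSPENDED
}
```

A caller that treats this raw return value as the final result, rather than waiting for the continuation to be resumed, would see the marker instead of the Flow.
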
Steps To Reproduce

/**
 * This returns "COROUTINE_SUSPENDED" as body
 */
@Get("foo1")
suspend fun foo1(): Flow<Char> {
    // get data from db or from other API
    val res = callDbOrOtherApi().toList()

    // want to return a flow that needs the result as an input
    return doSomeOtherStuff(res)
}

fun callDbOrOtherApi(): Flow<String> = flow {
    delay(1000) // other party needs some time to think

    // then it starts to stream the data
    listOf("val1", "val2")
        .forEach { emit(it) }
}

fun doSomeOtherStuff(input: List<String>): Flow<Char> =
    input.joinToString()
        .toList()
        .asFlow()

Environment Information

  • OS: MacOS 12.7.4 (21H1123)
  • Architecture: ARM64
  • Kotlin version: 1.9.23
  • JVM: OpenJDK Temurin-17.0.5+8 (build 17.0.5+8)

Example Application

No response

Version

4.4.2

@vekonypeter
Author

Workaround:

    @Get("foo2")
    fun foo2(): Flow<Char> {

        return flow {
            // get data from db or from other API
            emit(callDbOrOtherApi().toList())
        }.flatMapConcat { res ->

            // want to return a flow that needs the result as an input
            doSomeOtherStuff(res)
        }
    }

@dstepanov
Contributor

Please create a sample app that reproduces the problem. Make sure you have added the Micronaut Kotlin dependency.

@yawkat
Member

yawkat commented Aug 30, 2024

Combining Flow and suspend seems a bit weird to me.

@dstepanov
Contributor

I have added a sample project with Flow, and it passes: #11135
Please modify it to reproduce your problem.

@vekonypeter
Author

vekonypeter commented Aug 30, 2024

@dstepanov sure, here is an example application based on yours: https://github.com/vekonypeter/miconaut-core-issue-11131/tree/main

I modified your example, because as written it certainly works. The problem appears when a suspending function returns a Flow but also performs some other suspending coroutine call before returning it. See the example here: https://github.com/vekonypeter/miconaut-core-issue-11131/blob/main/src/main/kotlin/com/example/HelloController.kt#L14-L18

Test result here is:

Expected :Hello World
Actual   :"COROUTINE_SUSPENDED"

org.opentest4j.AssertionFailedError: expected: <Hello World> but was: <"COROUTINE_SUSPENDED">
	at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
	at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
	at org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
	at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
	at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)
	at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1145)
	at com.example.HelloControllerTest.testHelloWorld1(HelloControllerTest.kt:21)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at io.micronaut.test.extensions.junit5.MicronautJunit5Extension$2.proceed(MicronautJunit5Extension.java:142)
	at io.micronaut.test.extensions.AbstractMicronautExtension.interceptEach(AbstractMicronautExtension.java:162)
	at io.micronaut.test.extensions.AbstractMicronautExtension.interceptTest(AbstractMicronautExtension.java:119)
	at io.micronaut.test.extensions.junit5.MicronautJunit5Extension.interceptTestMethod(MicronautJunit5Extension.java:129)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)

Dependencies seem to be fine, micronaut-kotlin-runtime is added.

Is this a very exotic use-case? For us, it seems pretty common, because all of our database interactions and calls towards other APIs are done using suspending functions.

@dstepanov
Contributor

I see, but as Jonas wrote, it doesn’t make sense to have a suspended Flow.

@yawkat
Member

yawkat commented Aug 30, 2024

@vekonypeter the reason it's weird is that you're combining two reactive programming styles (coroutines and flows). In our framework view it becomes a Publisher<Publisher<String>>. You can "unwrap" this in one of the following ways:

Index: src/main/kotlin/com/example/HelloController.kt
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/main/kotlin/com/example/HelloController.kt b/src/main/kotlin/com/example/HelloController.kt
--- a/src/main/kotlin/com/example/HelloController.kt	(revision abe1e617c596a15913e542f4832d531535516e00)
+++ b/src/main/kotlin/com/example/HelloController.kt	(date 1725020074789)
@@ -6,15 +6,16 @@
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.flow.Flow
 import kotlinx.coroutines.flow.asFlow
+import kotlinx.coroutines.flow.emitAll
 import kotlinx.coroutines.flow.flow
 
 @Controller("/hello")
 class HelloController {
 
     @Get(value = "/world1")
-    suspend fun world1(): Flow<String> {
+    fun world1(): Flow<String> = flow {
         delay(1000)
-        return listOf("Hello World").asFlow()
+        emitAll(listOf("Hello World").asFlow())
     }
 
     @Get(value = "/world2", produces = [MediaType.TEXT_PLAIN])
Index: src/main/kotlin/com/example/HelloController.kt
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/main/kotlin/com/example/HelloController.kt b/src/main/kotlin/com/example/HelloController.kt
--- a/src/main/kotlin/com/example/HelloController.kt	(revision abe1e617c596a15913e542f4832d531535516e00)
+++ b/src/main/kotlin/com/example/HelloController.kt	(date 1725020169453)
@@ -7,14 +7,15 @@
 import kotlinx.coroutines.flow.Flow
 import kotlinx.coroutines.flow.asFlow
 import kotlinx.coroutines.flow.flow
+import kotlinx.coroutines.flow.single
 
 @Controller("/hello")
 class HelloController {
 
     @Get(value = "/world1")
-    suspend fun world1(): Flow<String> {
+    suspend fun world1(): String {
         delay(1000)
-        return listOf("Hello World").asFlow()
+        return listOf("Hello World").asFlow().single()
     }
 
     @Get(value = "/world2", produces = [MediaType.TEXT_PLAIN])
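
A note on the second variant: `single()` only suits a flow with exactly one element. It throws `NoSuchElementException` for an empty flow and `IllegalArgumentException` when the flow emits more than one element. The sketch below contrasts it with `toList()`, which collects any number of elements. `source`, `allValues`, and `onlyValueOrNull` are hypothetical names used only for illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.single
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a source that emits several values.
fun source(): Flow<String> = flowOf("val1", "val2")

// Collects everything, so it works for any number of elements.
suspend fun allValues(): List<String> = source().toList()

// Returns the only element, or null when single() rejects the flow.
suspend fun onlyValueOrNull(): String? =
    try {
        source().single()
    } catch (e: IllegalArgumentException) {
        null // the flow emitted more than one element
    } catch (e: NoSuchElementException) {
        null // the flow was empty
    }

fun main() = runBlocking {
    println(allValues())       // [val1, val2]
    println(onlyValueOrNull()) // null, because source() emits two elements
}
```

For a multi-element result, returning a `List` from the suspend function keeps a single asynchronous stage at the cost of buffering the whole result before responding.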

@vekonypeter
Author

I don't fully understand what you mean by "combining two reactive programming styles (coroutines and flows)". Flows are an integral part of Kotlin coroutines. Maybe the example is just too simple, but imagine the following use-case:

  • I have a controller method
  • first, I need to fetch some data from a remote API. I'm using Ktor HTTP client for that which uses coroutines, therefore returns data through suspending functions.
  • then I go to my database and want to read data from it using the result of the previous API call. The database call returns a Flow of entities and I want to return that from the controller method.

in code:

suspend fun test(): Flow<Any> {
    val res = httpClientMethodCall() // this is a suspending function therefore test() must be also suspending
    return readSomeStuffFromDb(res)
}

I don't think that this is something super weird, pretty usual when you have everything implemented with coroutines and suspending functions. But this will return the "COROUTINE_SUSPENDED" response for sure.

I can make it work somehow like this and probably in a lot of other ways too, but it feels odd:

fun test(): Flow<Any> = flow {
    emit(httpClientMethodCall())
}.flatMapConcat { res ->
    // flatMapConcat flattens the inner Flow; map would emit the Flow object itself
    readSomeStuffFromDb(res)
}
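
Another way to write the same use case, following the `emitAll` approach from the first diff above, is to move the suspending call inside the flow builder. This is a minimal sketch: the bodies of `httpClientMethodCall` and `readSomeStuffFromDb` are hypothetical stand-ins, since the real ones are not shown in this thread.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the Ktor client call.
suspend fun httpClientMethodCall(): String {
    delay(10)
    return "key"
}

// Hypothetical stand-in for the database query.
fun readSomeStuffFromDb(key: String): Flow<Any> = flowOf("$key-1", "$key-2")

// Not a suspend function: the suspending call runs inside the flow builder,
// so it executes only when the returned flow is collected.
fun test(): Flow<Any> = flow {
    val res = httpClientMethodCall()
    emitAll(readSomeStuffFromDb(res))
}

fun main() = runBlocking {
    println(test().toList()) // [key-1, key-2]
}
```

The body reads almost like the original suspending version; the only changes are the `flow { ... }` wrapper and `emitAll` in place of `return`.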

@yawkat
Member

yawkat commented Aug 30, 2024

Both a suspend method and a Flow represent the same thing: A result that will be produced asynchronously in the future (in reactive streams terms, both are a flow). Flow has some added features like multiple items and multiple consumers, but it is conceptually the same thing.

When you have a suspend method that returns a Flow, the framework first has to wait for the suspend method to complete, and then also for the flow to complete. This form of double flow is not supported.
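
The two stages can be made visible in plain Kotlin, without any framework. In this sketch, `suspendThenFlow` has the shape from the report and `flowOnly` is the single-stage alternative; both names and the `log` parameter are hypothetical and exist only to record when the preliminary work runs.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Shape from the report: the caller awaits the call, then collects the result.
suspend fun suspendThenFlow(log: MutableList<String>): Flow<String> {
    delay(10)
    log += "prelude"
    return flowOf("item")
}

// Single-stage shape: nothing runs until the flow is collected.
fun flowOnly(log: MutableList<String>): Flow<String> = flow {
    delay(10)
    log += "prelude"
    emit("item")
}

fun main() = runBlocking {
    val a = mutableListOf<String>()
    val first = suspendThenFlow(a) // stage 1: wait for the suspend call
    println(a)                     // [prelude], before any collection
    println(first.toList())        // stage 2: wait for the flow

    val b = mutableListOf<String>()
    val second = flowOnly(b)
    println(b)                     // [], nothing has run yet
    println(second.toList())       // the only stage
    println(b)                     // [prelude]
}
```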

FWIW, this is not exactly idiomatic in general Kotlin code either, because it can be annoying to work with even without Micronaut involved. This SO answer describes it well: https://stackoverflow.com/a/76031024

@yawkat yawkat added the info: workaround available A workaround is available for the issue label Sep 2, 2024