From ffb58a34e9b53ecdff72622ad704d567443d8168 Mon Sep 17 00:00:00 2001 From: Felipe Reis Date: Fri, 6 Sep 2024 12:54:19 +1000 Subject: [PATCH] Avoid hasNext=true on the last incremental payload (#588) * Avoid hasNext=true on the last incremental payload * Adjust test case * Add new test case * Adjustments after review. Thanks @gnawf --- .../NadelIncrementalResultAccumulator.kt | 2 +- .../engine/NadelIncrementalResultSupport.kt | 29 +++++++++++-------- .../hydration/NadelHydrationTransform.kt | 4 +++ .../NadelIncrementalResultSupportTest.kt | 6 +++- .../nadel/tests/next/NadelIntegrationTest.kt | 4 ++- 5 files changed, 30 insertions(+), 15 deletions(-) diff --git a/lib/src/main/java/graphql/nadel/engine/NadelIncrementalResultAccumulator.kt b/lib/src/main/java/graphql/nadel/engine/NadelIncrementalResultAccumulator.kt index 7b72ce31c..5eec863ac 100644 --- a/lib/src/main/java/graphql/nadel/engine/NadelIncrementalResultAccumulator.kt +++ b/lib/src/main/java/graphql/nadel/engine/NadelIncrementalResultAccumulator.kt @@ -40,7 +40,7 @@ import graphql.normalized.incremental.NormalizedDeferredExecution * the user. */ class NadelIncrementalResultAccumulator( - private val operation: ExecutableNormalizedOperation, + operation: ExecutableNormalizedOperation, ) { data class DeferAccumulatorKey( val incrementalPayloadPath: List, diff --git a/lib/src/main/java/graphql/nadel/engine/NadelIncrementalResultSupport.kt b/lib/src/main/java/graphql/nadel/engine/NadelIncrementalResultSupport.kt index 0843073b9..0de454cd6 100644 --- a/lib/src/main/java/graphql/nadel/engine/NadelIncrementalResultSupport.kt +++ b/lib/src/main/java/graphql/nadel/engine/NadelIncrementalResultSupport.kt @@ -1,8 +1,8 @@ package graphql.nadel.engine import graphql.incremental.DelayedIncrementalPartialResult +import graphql.incremental.DelayedIncrementalPartialResultImpl import graphql.nadel.engine.NadelIncrementalResultSupport.OutstandingJobCounter.OutstandingJobHandle -import graphql.nadel.engine.util.copy import graphql.nadel.util.getLogger import graphql.normalized.ExecutableNormalizedOperation import kotlinx.coroutines.CompletableDeferred @@ -15,6 +15,7 @@ import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.consumeAsFlow +import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock @@ -89,6 +90,8 @@ class NadelIncrementalResultSupport internal constructor( val next = accumulator.getIncrementalPartialResult(hasNext) if (next != null) { delayedResultsChannel.send(next) + } else if (!hasNext) { + delayedResultsChannel.send(emptyLastResult()) } } } @@ -118,6 +121,8 @@ class NadelIncrementalResultSupport internal constructor( val next = accumulator.getIncrementalPartialResult(hasNext) if (next != null) { delayedResultsChannel.send(next) + } else if (!hasNext) { + delayedResultsChannel.send(emptyLastResult()) } } } @@ -134,7 +139,9 @@ class NadelIncrementalResultSupport internal constructor( * There should never be more than one consumer. If you need multiple, you can wrap the [Flow] object. */ fun resultFlow(): Flow { - return resultFlow + return resultFlow.onCompletion { + close() + } } fun onInitialResultComplete() { @@ -145,19 +152,17 @@ class NadelIncrementalResultSupport internal constructor( initialCompletionLock.complete(Unit) } - fun close() { + private fun close() { coroutineScope.cancel() } - private fun quickCopy( - subject: DelayedIncrementalPartialResult, - hasNext: Boolean, - ): DelayedIncrementalPartialResult { - return if (subject.hasNext() == hasNext) { - subject - } else { - subject.copy(hasNext = hasNext) - } + // We have to return hasNext=false to indicate to clients that there's no more data coming. + // Note: the spec allows an empty payload which only contains hastNext=false to be returned. + private fun emptyLastResult(): DelayedIncrementalPartialResult { + return DelayedIncrementalPartialResultImpl.newIncrementalExecutionResult() + .incrementalItems(emptyList()) + .hasNext(false) + .build() } /** diff --git a/lib/src/main/java/graphql/nadel/engine/transform/hydration/NadelHydrationTransform.kt b/lib/src/main/java/graphql/nadel/engine/transform/hydration/NadelHydrationTransform.kt index 99cb7d2f8..fb1e627de 100644 --- a/lib/src/main/java/graphql/nadel/engine/transform/hydration/NadelHydrationTransform.kt +++ b/lib/src/main/java/graphql/nadel/engine/transform/hydration/NadelHydrationTransform.kt @@ -239,6 +239,10 @@ internal class NadelHydrationTransform( ) } + if(preparedHydrations.isEmpty()) { + return + } + // This isn't really right… but we start with this val label = overallField.deferredExecutions.firstNotNullOfOrNull { it.label } diff --git a/lib/src/test/kotlin/graphql/nadel/engine/NadelIncrementalResultSupportTest.kt b/lib/src/test/kotlin/graphql/nadel/engine/NadelIncrementalResultSupportTest.kt index ec1375cbb..2029c58f6 100644 --- a/lib/src/test/kotlin/graphql/nadel/engine/NadelIncrementalResultSupportTest.kt +++ b/lib/src/test/kotlin/graphql/nadel/engine/NadelIncrementalResultSupportTest.kt @@ -373,7 +373,11 @@ class NadelIncrementalResultSupportTest { // Then subject.onInitialResultComplete() - assertTrue(channel.toList().isEmpty()) + val elements = channel.toList() + + assertTrue(elements.size == 1) + assertFalse(elements[0].hasNext()) + assertTrue(elements[0].incremental!!.isEmpty()) verifyOrder { accumulator.accumulate(any()) diff --git a/test/src/test/kotlin/graphql/nadel/tests/next/NadelIntegrationTest.kt b/test/src/test/kotlin/graphql/nadel/tests/next/NadelIntegrationTest.kt index 2396cbd75..24d245e63 100644 --- a/test/src/test/kotlin/graphql/nadel/tests/next/NadelIntegrationTest.kt +++ b/test/src/test/kotlin/graphql/nadel/tests/next/NadelIntegrationTest.kt @@ -379,7 +379,9 @@ abstract class NadelIntegrationTest( // Note: there exists a IncrementalExecutionResult.getIncremental but that is part of the initial result assertTrue(result is IncrementalExecutionResult) - // Fuck why delayed & incremental?? Shouldn't incremental == delayed? Why is there an optional synchronous incremental?? + // Note: the spec allows for a list of "incremental" results which is sent as part of the initial + // result (so not delivered in a delayed fashion). This var represents the incremental results that were + // sent in a delayed fashion. val actualDelayedResponses = incrementalResults!! // Should only have one element that says hasNext=false, and it should be the last one