From a38035f2a5470f5c56c66f06b94a3954d3e90812 Mon Sep 17 00:00:00 2001 From: Felipe Reis Date: Fri, 6 Sep 2024 10:30:44 +1000 Subject: [PATCH 1/4] Avoid hasNext=true on the last incremental payload --- .../engine/NadelIncrementalResultAccumulator.kt | 10 +++++++++- .../engine/NadelIncrementalResultSupport.kt | 17 ----------------- .../nadel/tests/next/NadelIntegrationTest.kt | 4 +++- 3 files changed, 12 insertions(+), 19 deletions(-) diff --git a/lib/src/main/java/graphql/nadel/engine/NadelIncrementalResultAccumulator.kt b/lib/src/main/java/graphql/nadel/engine/NadelIncrementalResultAccumulator.kt index 7b72ce31c..252a1e60d 100644 --- a/lib/src/main/java/graphql/nadel/engine/NadelIncrementalResultAccumulator.kt +++ b/lib/src/main/java/graphql/nadel/engine/NadelIncrementalResultAccumulator.kt @@ -40,7 +40,7 @@ import graphql.normalized.incremental.NormalizedDeferredExecution * the user. */ class NadelIncrementalResultAccumulator( - private val operation: ExecutableNormalizedOperation, + operation: ExecutableNormalizedOperation, ) { data class DeferAccumulatorKey( val incrementalPayloadPath: List, @@ -115,6 +115,14 @@ class NadelIncrementalResultAccumulator( } if (readyAccumulators.isEmpty()) { + if(!hasNext) { + // We have to return hasNext=false to indicate to clients that there's no more data coming. + // Note: the spec allows an empty payload which only contains hastNext=false to be returned. + return DelayedIncrementalPartialResultImpl.newIncrementalExecutionResult() + .incrementalItems(emptyList()) + .hasNext(false) + .build() + } return null } diff --git a/lib/src/main/java/graphql/nadel/engine/NadelIncrementalResultSupport.kt b/lib/src/main/java/graphql/nadel/engine/NadelIncrementalResultSupport.kt index 0843073b9..a847f5866 100644 --- a/lib/src/main/java/graphql/nadel/engine/NadelIncrementalResultSupport.kt +++ b/lib/src/main/java/graphql/nadel/engine/NadelIncrementalResultSupport.kt @@ -2,7 +2,6 @@ package graphql.nadel.engine import graphql.incremental.DelayedIncrementalPartialResult import graphql.nadel.engine.NadelIncrementalResultSupport.OutstandingJobCounter.OutstandingJobHandle -import graphql.nadel.engine.util.copy import graphql.nadel.util.getLogger import graphql.normalized.ExecutableNormalizedOperation import kotlinx.coroutines.CompletableDeferred @@ -10,7 +9,6 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow @@ -145,21 +143,6 @@ class NadelIncrementalResultSupport internal constructor( initialCompletionLock.complete(Unit) } - fun close() { - coroutineScope.cancel() - } - - private fun quickCopy( - subject: DelayedIncrementalPartialResult, - hasNext: Boolean, - ): DelayedIncrementalPartialResult { - return if (subject.hasNext() == hasNext) { - subject - } else { - subject.copy(hasNext = hasNext) - } - } - /** * Launches a job and increments the outstanding job handle. * diff --git a/test/src/test/kotlin/graphql/nadel/tests/next/NadelIntegrationTest.kt b/test/src/test/kotlin/graphql/nadel/tests/next/NadelIntegrationTest.kt index 2396cbd75..24d245e63 100644 --- a/test/src/test/kotlin/graphql/nadel/tests/next/NadelIntegrationTest.kt +++ b/test/src/test/kotlin/graphql/nadel/tests/next/NadelIntegrationTest.kt @@ -379,7 +379,9 @@ abstract class NadelIntegrationTest( // Note: there exists a IncrementalExecutionResult.getIncremental but that is part of the initial result assertTrue(result is IncrementalExecutionResult) - // Fuck why delayed & incremental?? Shouldn't incremental == delayed? Why is there an optional synchronous incremental?? + // Note: the spec allows for a list of "incremental" results which is sent as part of the initial + // result (so not delivered in a delayed fashion). This var represents the incremental results that were + // sent in a delayed fashion. val actualDelayedResponses = incrementalResults!! // Should only have one element that says hasNext=false, and it should be the last one From 9e91db56d645275b85c4a24b04794fdaa44cdd73 Mon Sep 17 00:00:00 2001 From: Felipe Reis Date: Fri, 6 Sep 2024 10:44:13 +1000 Subject: [PATCH 2/4] Adjust test case --- .../nadel/engine/NadelIncrementalResultAccumulatorTest.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/src/test/kotlin/graphql/nadel/engine/NadelIncrementalResultAccumulatorTest.kt b/lib/src/test/kotlin/graphql/nadel/engine/NadelIncrementalResultAccumulatorTest.kt index ffc166506..455d4da1e 100644 --- a/lib/src/test/kotlin/graphql/nadel/engine/NadelIncrementalResultAccumulatorTest.kt +++ b/lib/src/test/kotlin/graphql/nadel/engine/NadelIncrementalResultAccumulatorTest.kt @@ -174,7 +174,7 @@ class NadelIncrementalResultAccumulatorTest { ) // Then - val result = accumulator.getIncrementalPartialResult(false) + val result = accumulator.getIncrementalPartialResult(true) assertTrue(result == null) } } From 29beff52d3d570a99f659c1a9e1906e18ae41670 Mon Sep 17 00:00:00 2001 From: Felipe Reis Date: Fri, 6 Sep 2024 10:50:30 +1000 Subject: [PATCH 3/4] Add new test case --- .../NadelIncrementalResultAccumulatorTest.kt | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/lib/src/test/kotlin/graphql/nadel/engine/NadelIncrementalResultAccumulatorTest.kt b/lib/src/test/kotlin/graphql/nadel/engine/NadelIncrementalResultAccumulatorTest.kt index 455d4da1e..2dd8b4c69 100644 --- a/lib/src/test/kotlin/graphql/nadel/engine/NadelIncrementalResultAccumulatorTest.kt +++ b/lib/src/test/kotlin/graphql/nadel/engine/NadelIncrementalResultAccumulatorTest.kt @@ -180,6 +180,36 @@ class NadelIncrementalResultAccumulatorTest { } } + @Test + fun `yields empty payload if hasNext=false and accumulator is empty`() { + val accumulator = makeAccumulator( + query = """ + query { + issue(id: "1") { + ... @defer { + id + assignee { + name + } + } + } + } + """.trimIndent(), + ) + + // When + accumulator.accumulate( + DelayedIncrementalPartialResultImpl.newIncrementalExecutionResult() + .incrementalItems(emptyList()) + .build(), + ) + + // Then + val result = accumulator.getIncrementalPartialResult(false) + assertTrue(result!!.incremental!!.isEmpty()) + assertFalse(result.hasNext()) + } + @Test fun `yields list elements that are complete`() { val accumulator = makeAccumulator( From 3f0a60790d885e456e63bad203de4ed9441661a7 Mon Sep 17 00:00:00 2001 From: Felipe Reis Date: Fri, 6 Sep 2024 12:44:00 +1000 Subject: [PATCH 4/4] Adjustments after review. Thanks @gnawf --- .../NadelIncrementalResultAccumulator.kt | 8 ----- .../engine/NadelIncrementalResultSupport.kt | 24 +++++++++++++- .../hydration/NadelHydrationTransform.kt | 4 +++ .../NadelIncrementalResultAccumulatorTest.kt | 32 +------------------ .../NadelIncrementalResultSupportTest.kt | 6 +++- 5 files changed, 33 insertions(+), 41 deletions(-) diff --git a/lib/src/main/java/graphql/nadel/engine/NadelIncrementalResultAccumulator.kt b/lib/src/main/java/graphql/nadel/engine/NadelIncrementalResultAccumulator.kt index 252a1e60d..5eec863ac 100644 --- a/lib/src/main/java/graphql/nadel/engine/NadelIncrementalResultAccumulator.kt +++ b/lib/src/main/java/graphql/nadel/engine/NadelIncrementalResultAccumulator.kt @@ -115,14 +115,6 @@ class NadelIncrementalResultAccumulator( } if (readyAccumulators.isEmpty()) { - if(!hasNext) { - // We have to return hasNext=false to indicate to clients that there's no more data coming. - // Note: the spec allows an empty payload which only contains hastNext=false to be returned. - return DelayedIncrementalPartialResultImpl.newIncrementalExecutionResult() - .incrementalItems(emptyList()) - .hasNext(false) - .build() - } return null } diff --git a/lib/src/main/java/graphql/nadel/engine/NadelIncrementalResultSupport.kt b/lib/src/main/java/graphql/nadel/engine/NadelIncrementalResultSupport.kt index a847f5866..0de454cd6 100644 --- a/lib/src/main/java/graphql/nadel/engine/NadelIncrementalResultSupport.kt +++ b/lib/src/main/java/graphql/nadel/engine/NadelIncrementalResultSupport.kt @@ -1,6 +1,7 @@ package graphql.nadel.engine import graphql.incremental.DelayedIncrementalPartialResult +import graphql.incremental.DelayedIncrementalPartialResultImpl import graphql.nadel.engine.NadelIncrementalResultSupport.OutstandingJobCounter.OutstandingJobHandle import graphql.nadel.util.getLogger import graphql.normalized.ExecutableNormalizedOperation @@ -9,10 +10,12 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.consumeAsFlow +import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock @@ -87,6 +90,8 @@ class NadelIncrementalResultSupport internal constructor( val next = accumulator.getIncrementalPartialResult(hasNext) if (next != null) { delayedResultsChannel.send(next) + } else if (!hasNext) { + delayedResultsChannel.send(emptyLastResult()) } } } @@ -116,6 +121,8 @@ class NadelIncrementalResultSupport internal constructor( val next = accumulator.getIncrementalPartialResult(hasNext) if (next != null) { delayedResultsChannel.send(next) + } else if (!hasNext) { + delayedResultsChannel.send(emptyLastResult()) } } } @@ -132,7 +139,9 @@ class NadelIncrementalResultSupport internal constructor( * There should never be more than one consumer. If you need multiple, you can wrap the [Flow] object. */ fun resultFlow(): Flow { - return resultFlow + return resultFlow.onCompletion { + close() + } } fun onInitialResultComplete() { @@ -143,6 +152,19 @@ class NadelIncrementalResultSupport internal constructor( initialCompletionLock.complete(Unit) } + private fun close() { + coroutineScope.cancel() + } + + // We have to return hasNext=false to indicate to clients that there's no more data coming. + // Note: the spec allows an empty payload which only contains hastNext=false to be returned. + private fun emptyLastResult(): DelayedIncrementalPartialResult { + return DelayedIncrementalPartialResultImpl.newIncrementalExecutionResult() + .incrementalItems(emptyList()) + .hasNext(false) + .build() + } + /** * Launches a job and increments the outstanding job handle. * diff --git a/lib/src/main/java/graphql/nadel/engine/transform/hydration/NadelHydrationTransform.kt b/lib/src/main/java/graphql/nadel/engine/transform/hydration/NadelHydrationTransform.kt index 99cb7d2f8..fb1e627de 100644 --- a/lib/src/main/java/graphql/nadel/engine/transform/hydration/NadelHydrationTransform.kt +++ b/lib/src/main/java/graphql/nadel/engine/transform/hydration/NadelHydrationTransform.kt @@ -239,6 +239,10 @@ internal class NadelHydrationTransform( ) } + if(preparedHydrations.isEmpty()) { + return + } + // This isn't really right… but we start with this val label = overallField.deferredExecutions.firstNotNullOfOrNull { it.label } diff --git a/lib/src/test/kotlin/graphql/nadel/engine/NadelIncrementalResultAccumulatorTest.kt b/lib/src/test/kotlin/graphql/nadel/engine/NadelIncrementalResultAccumulatorTest.kt index 2dd8b4c69..ffc166506 100644 --- a/lib/src/test/kotlin/graphql/nadel/engine/NadelIncrementalResultAccumulatorTest.kt +++ b/lib/src/test/kotlin/graphql/nadel/engine/NadelIncrementalResultAccumulatorTest.kt @@ -174,42 +174,12 @@ class NadelIncrementalResultAccumulatorTest { ) // Then - val result = accumulator.getIncrementalPartialResult(true) + val result = accumulator.getIncrementalPartialResult(false) assertTrue(result == null) } } } - @Test - fun `yields empty payload if hasNext=false and accumulator is empty`() { - val accumulator = makeAccumulator( - query = """ - query { - issue(id: "1") { - ... @defer { - id - assignee { - name - } - } - } - } - """.trimIndent(), - ) - - // When - accumulator.accumulate( - DelayedIncrementalPartialResultImpl.newIncrementalExecutionResult() - .incrementalItems(emptyList()) - .build(), - ) - - // Then - val result = accumulator.getIncrementalPartialResult(false) - assertTrue(result!!.incremental!!.isEmpty()) - assertFalse(result.hasNext()) - } - @Test fun `yields list elements that are complete`() { val accumulator = makeAccumulator( diff --git a/lib/src/test/kotlin/graphql/nadel/engine/NadelIncrementalResultSupportTest.kt b/lib/src/test/kotlin/graphql/nadel/engine/NadelIncrementalResultSupportTest.kt index ec1375cbb..2029c58f6 100644 --- a/lib/src/test/kotlin/graphql/nadel/engine/NadelIncrementalResultSupportTest.kt +++ b/lib/src/test/kotlin/graphql/nadel/engine/NadelIncrementalResultSupportTest.kt @@ -373,7 +373,11 @@ class NadelIncrementalResultSupportTest { // Then subject.onInitialResultComplete() - assertTrue(channel.toList().isEmpty()) + val elements = channel.toList() + + assertTrue(elements.size == 1) + assertFalse(elements[0].hasNext()) + assertTrue(elements[0].incremental!!.isEmpty()) verifyOrder { accumulator.accumulate(any())