Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid hasNext=true on the last incremental payload #588

Merged
merged 4 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import graphql.normalized.incremental.NormalizedDeferredExecution
* the user.
*/
class NadelIncrementalResultAccumulator(
private val operation: ExecutableNormalizedOperation,
operation: ExecutableNormalizedOperation,
) {
data class DeferAccumulatorKey(
val incrementalPayloadPath: List<Any>,
Expand Down Expand Up @@ -115,6 +115,14 @@ class NadelIncrementalResultAccumulator(
}

if (readyAccumulators.isEmpty()) {
if(!hasNext) {
// We have to return hasNext=false to indicate to clients that there's no more data coming.
// Note: the spec allows an empty payload which only contains hastNext=false to be returned.
return DelayedIncrementalPartialResultImpl.newIncrementalExecutionResult()
.incrementalItems(emptyList())
.hasNext(false)
.build()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this belongs in NadelIncrementalResultSupport

This class just accumulates data. This is some incremental result logic.

}
return null
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ package graphql.nadel.engine

import graphql.incremental.DelayedIncrementalPartialResult
import graphql.nadel.engine.NadelIncrementalResultSupport.OutstandingJobCounter.OutstandingJobHandle
import graphql.nadel.engine.util.copy
import graphql.nadel.util.getLogger
import graphql.normalized.ExecutableNormalizedOperation
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
Expand Down Expand Up @@ -145,21 +143,6 @@ class NadelIncrementalResultSupport internal constructor(
initialCompletionLock.complete(Unit)
}

fun close() {
coroutineScope.cancel()
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HUH, this needs to be called. I think it's a bug that it's not.

i.e. we should cancel the jobs launched on the defer scope if the incremental result is cancelled e.g. on request closed early.

We don't have to address that in this PR.

I suspect we can edit resultFlow to something like

    fun resultFlow(): Flow<DelayedIncrementalPartialResult> {
        return resultFlow
            .onCompletion {
                close()
            }
    }

Where onCompletion is invoked on cancellation.


private fun quickCopy(
subject: DelayedIncrementalPartialResult,
hasNext: Boolean,
): DelayedIncrementalPartialResult {
return if (subject.hasNext() == hasNext) {
subject
} else {
subject.copy(hasNext = hasNext)
}
}

/**
* Launches a job and increments the outstanding job handle.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,42 @@ class NadelIncrementalResultAccumulatorTest {
)

// Then
val result = accumulator.getIncrementalPartialResult(false)
val result = accumulator.getIncrementalPartialResult(true)
assertTrue(result == null)
}
}
}

@Test
fun `yields empty payload if hasNext=false and accumulator is empty`() {
val accumulator = makeAccumulator(
query = """
query {
issue(id: "1") {
... @defer {
id
assignee {
name
}
}
}
}
""".trimIndent(),
)

// When
accumulator.accumulate(
DelayedIncrementalPartialResultImpl.newIncrementalExecutionResult()
.incrementalItems(emptyList())
.build(),
)

// Then
val result = accumulator.getIncrementalPartialResult(false)
assertTrue(result!!.incremental!!.isEmpty())
assertFalse(result.hasNext())
}

@Test
fun `yields list elements that are complete`() {
val accumulator = makeAccumulator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,9 @@ abstract class NadelIntegrationTest(
// Note: there exists a IncrementalExecutionResult.getIncremental but that is part of the initial result
assertTrue(result is IncrementalExecutionResult)

// Fuck why delayed & incremental?? Shouldn't incremental == delayed? Why is there an optional synchronous incremental??
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, should've deleted that

// Note: the spec allows for a list of "incremental" results which is sent as part of the initial
// result (so not delivered in a delayed fashion). This var represents the incremental results that were
// sent in a delayed fashion.
val actualDelayedResponses = incrementalResults!!

// Should only have one element that says hasNext=false, and it should be the last one
Expand Down
Loading