-
Notifications
You must be signed in to change notification settings - Fork 23
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Avoid hasNext=true on the last incremental payload #588
Avoid hasNext=true on the last incremental payload #588
Conversation
return DelayedIncrementalPartialResultImpl.newIncrementalExecutionResult() | ||
.incrementalItems(emptyList()) | ||
.hasNext(false) | ||
.build() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this belongs in NadelIncrementalResultSupport
This class just accumulates data. This is some incremental result logic.
@@ -145,21 +143,6 @@ class NadelIncrementalResultSupport internal constructor( | |||
initialCompletionLock.complete(Unit) | |||
} | |||
|
|||
fun close() { | |||
coroutineScope.cancel() | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
HUH, this needs to be called. I think it's a bug that it's not.
i.e. we should cancel the jobs launched on the defer scope if the incremental result is cancelled e.g. on request closed early.
We don't have to address that in this PR.
I suspect we can edit resultFlow
to something like
fun resultFlow(): Flow<DelayedIncrementalPartialResult> {
return resultFlow
.onCompletion {
close()
}
}
Where onCompletion
is invoked on cancellation.
@@ -379,7 +379,9 @@ abstract class NadelIntegrationTest( | |||
// Note: there exists a IncrementalExecutionResult.getIncremental but that is part of the initial result | |||
assertTrue(result is IncrementalExecutionResult) | |||
|
|||
// Fuck why delayed & incremental?? Shouldn't incremental == delayed? Why is there an optional synchronous incremental?? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, should've deleted that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, thanks.
subject.copy(hasNext = hasNext) | ||
} | ||
// We have to return hasNext=false to indicate to clients that there's no more data coming. | ||
// Note: the spec allows an empty payload which only contains hastNext=false to be returned. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Javadoc to make comment sticky to function
The computation of
hasNext
is somewhat flaky, as we can see in the occasional failure ofHydrationDeferGroupingTest
.The failure occurs because sometimes the last deferred payload contains
hasNext=true
.This is possible since not all results from an outstanding job will be converted to a deferred payload and sent to the results channel.
So we might have something like this
The last (and only) event that was actually emitted has hasNext=true, so clients will hang waiting for a completion event forever.
Note: this PR doesn't fix the flaky test (yet)