Skip to content

Commit

Permalink
List defer hydrations (#535)
Browse files Browse the repository at this point in the history
* Implement result path tracking

* Execute query without defer directive and assert result is same

* Fixes and 2d array test

* Clean up code and tests

* Add some slight doco

* Add test for forwarding errors

* Set path for deferred hydration errors

* Delete comment

* Clean up code

* Improvements + refactoring after benchmarking

* Update run config

* Fix the damn tests

* More naming fixes

* Fix dumb merge conflict

* revert some useless changes

* Add back comment

* Better arg names

* Add doco

* Update doco

* Fix build

* Add NadelResultPath value type so it doesn't feed into List params

* Cleanup code
  • Loading branch information
gnawf authored Aug 5, 2024
1 parent fe6be8d commit eac8762
Show file tree
Hide file tree
Showing 32 changed files with 3,010 additions and 247 deletions.
10 changes: 10 additions & 0 deletions .run/Update Test Snapshots.run.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Update Test Snapshots" type="JetRunConfigurationType">
<option name="MAIN_CLASS_NAME" value="graphql.nadel.tests.next.UpdateTestSnapshotsKt" />
<module name="nadel.test.test" />
<shortenClasspath name="NONE" />
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
</component>
3 changes: 2 additions & 1 deletion lib/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ dependencies {
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.10.1")

testImplementation("org.slf4j:slf4j-simple:$slf4jVersion")
testImplementation("com.fasterxml.jackson.core:jackson-databind:2.15.3")
testImplementation("com.fasterxml.jackson.core:jackson-databind:2.17.0")
testImplementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.17.0")
testImplementation("org.openjdk.jmh:jmh-core:1.37")
testImplementation("org.openjdk.jmh:jmh-generator-annprocess:1.37")

Expand Down
7 changes: 7 additions & 0 deletions lib/src/main/java/graphql/nadel/NextgenEngine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ import graphql.nadel.instrumentation.parameters.NadelInstrumentationTimingParame
import graphql.nadel.instrumentation.parameters.NadelInstrumentationTimingParameters.RootStep
import graphql.nadel.instrumentation.parameters.child
import graphql.nadel.schema.NadelDirectives.namespacedDirectiveDefinition
import graphql.nadel.result.NadelResultMerger
import graphql.nadel.result.NadelResultTracker
import graphql.nadel.util.OperationNameUtil
import graphql.normalized.ExecutableNormalizedField
import graphql.normalized.ExecutableNormalizedOperationFactory.createExecutableNormalizedOperationWithRawVariables
Expand Down Expand Up @@ -160,6 +162,7 @@ internal class NextgenEngine(
}

val incrementalResultSupport = NadelIncrementalResultSupport()
val resultTracker = NadelResultTracker()
val executionContext = NadelExecutionContext(
executionInput,
query,
Expand All @@ -168,6 +171,7 @@ internal class NextgenEngine(
instrumentationState,
timer,
incrementalResultSupport,
resultTracker,
)

val beginExecuteContext = instrumentation.beginExecute(
Expand Down Expand Up @@ -214,6 +218,9 @@ internal class NextgenEngine(
beginExecuteContext?.onCompleted(result, null)
incrementalResultSupport.onInitialResultComplete()

// todo: maybe pass in the incremental version that's built below into here
resultTracker.complete(result)

return if (incrementalResultSupport.hasDeferredResults()) {
IncrementalExecutionResultImpl.Builder()
.from(result)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import graphql.nadel.ServiceExecutionHydrationDetails
import graphql.nadel.engine.instrumentation.NadelInstrumentationTimer
import graphql.nadel.hooks.CreateServiceContextParams
import graphql.nadel.hooks.NadelExecutionHooks
import graphql.nadel.result.NadelResultTracker
import graphql.normalized.ExecutableNormalizedOperation
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
Expand All @@ -21,6 +22,7 @@ data class NadelExecutionContext internal constructor(
val instrumentationState: InstrumentationState?,
internal val timer: NadelInstrumentationTimer,
internal val incrementalResultSupport: NadelIncrementalResultSupport,
internal val resultTracker: NadelResultTracker,
internal val hydrationDetails: ServiceExecutionHydrationDetails? = null,
) {
private val serviceContexts = ConcurrentHashMap<String, CompletableFuture<Any?>>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,23 @@ import graphql.nadel.engine.transform.NadelTransformUtil.makeTypeNameField
import graphql.nadel.engine.transform.artificial.NadelAliasHelper
import graphql.nadel.engine.transform.getInstructionsForNode
import graphql.nadel.engine.transform.hydration.NadelHydrationTransform.State
import graphql.nadel.engine.transform.hydration.NadelHydrationUtil.getInstructionsToAddErrors
import graphql.nadel.engine.transform.query.NadelQueryPath
import graphql.nadel.engine.transform.query.NadelQueryTransformer
import graphql.nadel.engine.transform.result.NadelResultInstruction
import graphql.nadel.engine.transform.result.NadelResultKey
import graphql.nadel.engine.transform.result.json.JsonNode
import graphql.nadel.engine.transform.result.json.JsonNodeExtractor
import graphql.nadel.engine.transform.result.json.JsonNodes
import graphql.nadel.engine.util.JsonMap
import graphql.nadel.engine.util.emptyOrSingle
import graphql.nadel.engine.util.getFieldDefinitionSequence
import graphql.nadel.engine.util.isList
import graphql.nadel.engine.util.queryPath
import graphql.nadel.engine.util.toBuilder
import graphql.nadel.engine.util.toGraphQLError
import graphql.nadel.engine.util.unwrapNonNull
import graphql.nadel.hooks.NadelExecutionHooks
import graphql.nadel.result.NadelResultPath
import graphql.nadel.result.NadelResultPathSegment
import graphql.normalized.ExecutableNormalizedField
import graphql.schema.FieldCoordinates
import kotlinx.coroutines.async
Expand Down Expand Up @@ -175,7 +177,7 @@ internal class NadelHydrationTransform(
): List<NadelResultInstruction> {
return coroutineScope {
parentNodes
.map {
.mapNotNull {
prepareHydration(
parentNode = it,
state = state,
Expand All @@ -190,7 +192,25 @@ internal class NadelHydrationTransform(
}
}
.awaitAll()
.flatten()
.flatMap { hydration ->
val setData = sequenceOf(
NadelResultInstruction.Set(
subject = hydration.parentNode,
newValue = hydration.newValue,
field = overallField,
),
)
val addErrors = hydration.errors
.asSequence()
.map { error ->
toGraphQLError(error)
}
.map {
NadelResultInstruction.AddError(it)
}

setData + addErrors
}
}
}

Expand All @@ -203,60 +223,60 @@ internal class NadelHydrationTransform(
) {
// Prepare the hydrations before we go async
// We need to do this because if we run it async below, we cannot guarantee that our artificial fields have not yet been removed
val hydrations = parentNodes.map {
prepareHydration(
parentNode = it,
state = state,
executionBlueprint = executionBlueprint,
fieldToHydrate = overallField,
executionContext = executionContext,
)
}
val preparedHydrations = parentNodes
.mapNotNull {
prepareHydration(
parentNode = it,
state = state,
executionBlueprint = executionBlueprint,
fieldToHydrate = overallField,
executionContext = executionContext,
)
}

// This isn't really right… but we start with this
val label = overallField.deferredExecutions.firstNotNullOfOrNull { it.label }

executionContext.incrementalResultSupport.defer {
val instructionSequence = hydrations
val hydrations = preparedHydrations
.map {
async {
it.hydrate()
}
}
.awaitAll()
.asSequence()
.flatten()

val results = instructionSequence
.filterIsInstance<NadelResultInstruction.Set>()
.emptyOrSingle()

DelayedIncrementalPartialResultImpl.Builder()
.incrementalItems(
listOf(
DeferPayload.Builder()
.label(label)
.data(
mapOf(
overallField.resultKey to results?.newValue?.value,
),
)
.path(
overallField.parent?.listOfResultKeys?.let {
@Suppress("USELESS_CAST") // It's not useless because Java (yay)
it as List<Any>
} ?: emptyList()
)
.errors(
instructionSequence
.filterIsInstance<NadelResultInstruction.AddError>()
.map {
it.error
}
.toList(),
)
.build(),
),
hydrations
.map { hydration -> // Hydration of one parent node
val data = hydration.newValue

val parentPath = executionContext.resultTracker.getResultPath(
overallField.queryPath.dropLast(1),
hydration.parentNode,
)!!
val path = parentPath + overallField.resultKey

DeferPayload.newDeferredItem()
.label(label)
.data(
mapOf(
overallField.resultKey to data?.value,
),
)
.path(parentPath.toRawPath())
.errors(
hydration.errors
.map {
toGraphQLError(
raw = it,
path = path.toRawPath(),
)
},
)
.build()
}
)
.build()
}
Expand All @@ -268,7 +288,7 @@ internal class NadelHydrationTransform(
executionBlueprint: NadelOverallExecutionBlueprint,
fieldToHydrate: ExecutableNormalizedField, // Field asking for hydration from the overall query
executionContext: NadelExecutionContext,
): NadelPreparedHydration {
): NadelPreparedHydration? {
val instructions = state.instructionsByObjectTypeNames.getInstructionsForNode(
executionBlueprint = executionBlueprint,
service = state.hydratedFieldService,
Expand All @@ -278,19 +298,15 @@ internal class NadelHydrationTransform(

// Do nothing if there is no hydration instruction associated with this result
if (instructions.isEmpty()) {
return NadelPreparedHydration {
emptyList()
}
return null
}

val instruction = getHydrationFieldInstruction(state, instructions, executionContext.hooks, parentNode)
?: return NadelPreparedHydration {
listOf(
NadelResultInstruction.Set(
subject = parentNode,
key = NadelResultKey(state.hydratedField.resultKey),
newValue = null,
),
NadelHydrationResult(
parentNode = parentNode,
newValue = null,
errors = emptyList(),
)
}

Expand Down Expand Up @@ -343,33 +359,26 @@ internal class NadelHydrationTransform(
).emptyOrSingle()
}

val errors = result?.let(::getInstructionsToAddErrors) ?: emptyList()

listOf(
NadelResultInstruction.Set(
subject = parentNode,
key = NadelResultKey(fieldToHydrate.resultKey),
newValue = JsonNode(data?.value),
),
) + errors
NadelHydrationResult(
parentNode = parentNode,
newValue = JsonNode(data?.value),
errors = result?.errors ?: emptyList(),
)
}
is NadelHydrationStrategy.ManyToOne -> {
val data = actorQueryResults.map { result ->
JsonNodeExtractor.getNodesAt(
data = result.data,
queryPath = instruction.queryPathToActorField,
).emptyOrSingle()?.value
}

val addErrors = getInstructionsToAddErrors(actorQueryResults)
val data = actorQueryResults
.map { result ->
JsonNodeExtractor.getNodesAt(
data = result.data,
queryPath = instruction.queryPathToActorField,
).emptyOrSingle()?.value
}

listOf(
NadelResultInstruction.Set(
subject = parentNode,
key = NadelResultKey(fieldToHydrate.resultKey),
newValue = JsonNode(data),
),
) + addErrors
NadelHydrationResult(
parentNode = parentNode,
newValue = JsonNode(data),
errors = actorQueryResults.flatMap { it.errors },
)
}
}
}
Expand Down Expand Up @@ -409,12 +418,7 @@ internal class NadelHydrationTransform(
return false
}

return if (executionContext.hints.deferSupport() && overallField.deferredExecutions.isNotEmpty()) {
// We currently don't support defer if the hydration is inside a List
return !areAnyParentFieldsOutputtingLists(overallField, executionBlueprint)
} else {
false
}
return executionContext.hints.deferSupport() && overallField.deferredExecutions.isNotEmpty()
}

private fun areAnyParentFieldsOutputtingLists(
Expand Down Expand Up @@ -466,5 +470,11 @@ internal class NadelHydrationTransform(
* So we "prepare" a hydration to ensure we have the value of the artificial field before it gets removed.
*/
private fun interface NadelPreparedHydration {
suspend fun hydrate(): List<NadelResultInstruction>
suspend fun hydrate(): NadelHydrationResult
}

private data class NadelHydrationResult(
val parentNode: JsonNode,
val newValue: JsonNode?,
val errors: List<JsonMap>,
)
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,35 @@ internal object NadelHydrationUtil {
@JvmName("getInstructionsToAddErrors_2")
fun getInstructionsToAddErrors(
results: List<NadelResolvedObjectBatch>,
): List<NadelResultInstruction> {
): List<NadelResultInstruction.AddError> {
return results
.asSequence()
.map(NadelResolvedObjectBatch::result)
.flatMap(::sequenceOfInstructionsToAddErrors)
.flatMap(::getInstructionsToAddErrorsSequence)
.toList()
}

fun getInstructionsToAddErrors(
results: List<ServiceExecutionResult>,
): List<NadelResultInstruction> {
): List<NadelResultInstruction.AddError> {
return results
.asSequence()
.flatMap(::sequenceOfInstructionsToAddErrors)
.flatMap(::getInstructionsToAddErrorsSequence)
.toList()
}

fun getInstructionsToAddErrors(
result: ServiceExecutionResult,
): List<NadelResultInstruction> {
return sequenceOfInstructionsToAddErrors(result).toList()
): List<NadelResultInstruction.AddError> {
return getInstructionsToAddErrorsSequence(result).toList()
}

/**
* Do not expose sequences as those
*/
private fun sequenceOfInstructionsToAddErrors(
private fun getInstructionsToAddErrorsSequence(
result: ServiceExecutionResult,
): Sequence<NadelResultInstruction> {
): Sequence<NadelResultInstruction.AddError> {
return result.errors
.asSequence()
.map(::toGraphQLError)
Expand Down
Loading

0 comments on commit eac8762

Please sign in to comment.