From 871db55443d9265a06ffdde5ad5a33ff9c6c1f3e Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 17 Oct 2024 15:04:17 -0700 Subject: [PATCH] Bulk load CDK: nondocker runner throws exception if destination fails (#46921) --- .../io/airbyte/cdk/AirbyteConnectorRunner.kt | 3 ++ .../cdk/ConnectorUncleanExitException.kt | 8 +++++ .../DestinationUncleanExitException.kt | 33 +++++++++++++++++++ .../DockerizedDestination.kt | 24 +------------- .../NonDockerizedDestination.kt | 7 +++- 5 files changed, 51 insertions(+), 24 deletions(-) create mode 100644 airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/ConnectorUncleanExitException.kt create mode 100644 airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DestinationUncleanExitException.kt diff --git a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/AirbyteConnectorRunner.kt b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/AirbyteConnectorRunner.kt index 41b8bf1f1bdf..214d0821e2d4 100644 --- a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/AirbyteConnectorRunner.kt +++ b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/AirbyteConnectorRunner.kt @@ -105,6 +105,9 @@ sealed class AirbyteConnectorRunner( if (!isTest) { // Required by the platform, otherwise syncs may hang. exitProcess(exitCode) + } else if (exitCode != 0) { + // Otherwise, propagate failure to test callers. + throw ConnectorUncleanExitException(exitCode) } } } diff --git a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/ConnectorUncleanExitException.kt b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/ConnectorUncleanExitException.kt new file mode 100644 index 000000000000..19c108b8966d --- /dev/null +++ b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/ConnectorUncleanExitException.kt @@ -0,0 +1,8 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk + +class ConnectorUncleanExitException(val exitCode: Int) : + Exception("Destination process exited uncleanly: $exitCode") diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DestinationUncleanExitException.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DestinationUncleanExitException.kt new file mode 100644 index 000000000000..f07c7cba70b6 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DestinationUncleanExitException.kt @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.test.util.destination_process + +import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage +import io.airbyte.protocol.models.v0.AirbyteTraceMessage + +class DestinationUncleanExitException( + exitCode: Int, + traceMessages: List +) : + Exception( + """ + Destination process exited uncleanly: $exitCode + Trace messages: + """.trimIndent() + // explicit concat because otherwise trimIndent behaves badly + + traceMessages + ) { + companion object { + // generic type erasure strikes again >.> + // this can't just be a second constructor, because both constructors + // would have signature `traceMessages: List`. + // so we have to pull this into a companion object function. + fun of(exitCode: Int, traceMessages: List) = + DestinationUncleanExitException( + exitCode, + traceMessages.filter { it.type == AirbyteTraceMessage.Type.ERROR }.map { it.error } + ) + } +} diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DockerizedDestination.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DockerizedDestination.kt index 576e4ec8ed5c..dd310cccafc8 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DockerizedDestination.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DockerizedDestination.kt @@ -8,10 +8,8 @@ import com.google.common.collect.Lists import io.airbyte.cdk.command.ConfigurationSpecification import io.airbyte.cdk.output.BufferingOutputConsumer import io.airbyte.cdk.util.Jsons -import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage import io.airbyte.protocol.models.v0.AirbyteLogMessage import io.airbyte.protocol.models.v0.AirbyteMessage -import io.airbyte.protocol.models.v0.AirbyteTraceMessage import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Requires @@ -207,32 +205,12 @@ class DockerizedDestination( stderrDrained.join() val exitCode = process.exitValue() if (exitCode != 0) { - // Hey look, it's possible to extract the error from a failed destination process! - // because "destination exit code 1" is the least-helpful error message. - val filteredTraces = - destinationOutput - .traces() - .filter { it.type == AirbyteTraceMessage.Type.ERROR } - .map { it.error } - throw DestinationUncleanExitException(exitCode, filteredTraces) + throw DestinationUncleanExitException.of(exitCode, destinationOutput.traces()) } } } } -class DestinationUncleanExitException( - exitCode: Int, - traceMessages: List -) : - Exception( - """ - Destination process exited uncleanly: $exitCode - Trace messages: - """.trimIndent() - // explicit concat because otherwise trimIndent behaves badly - + traceMessages - ) - @Singleton @Requires(env = [DOCKERIZED_TEST_ENV]) class DockerizedDestinationFactory( diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/NonDockerizedDestination.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/NonDockerizedDestination.kt index bd669e863df8..60ebaaa03270 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/NonDockerizedDestination.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/NonDockerizedDestination.kt @@ -4,6 +4,7 @@ package io.airbyte.cdk.load.test.util.destination_process +import io.airbyte.cdk.ConnectorUncleanExitException import io.airbyte.cdk.command.CliRunnable import io.airbyte.cdk.command.CliRunner import io.airbyte.cdk.command.ConfigurationSpecification @@ -53,7 +54,11 @@ class NonDockerizedDestination( } override suspend fun run() { - destination.run() + try { + destination.run() + } catch (e: ConnectorUncleanExitException) { + throw DestinationUncleanExitException.of(e.exitCode, destination.results.traces()) + } } override fun sendMessage(message: AirbyteMessage) {