Skip to content

Commit

Permalink
Bulk load CDK: nondocker runner throws exception if destination fails (
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao authored Oct 17, 2024
1 parent 55a7522 commit 871db55
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ sealed class AirbyteConnectorRunner(
if (!isTest) {
// Required by the platform, otherwise syncs may hang.
exitProcess(exitCode)
} else if (exitCode != 0) {
// Otherwise, propagate failure to test callers.
throw ConnectorUncleanExitException(exitCode)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk

class ConnectorUncleanExitException(val exitCode: Int) :
Exception("Destination process exited uncleanly: $exitCode")
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.test.util.destination_process

import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage
import io.airbyte.protocol.models.v0.AirbyteTraceMessage

class DestinationUncleanExitException(
exitCode: Int,
traceMessages: List<AirbyteErrorTraceMessage>
) :
Exception(
"""
Destination process exited uncleanly: $exitCode
Trace messages:
""".trimIndent()
// explicit concat because otherwise trimIndent behaves badly
+ traceMessages
) {
companion object {
// generic type erasure strikes again >.>
// this can't just be a second constructor, because both constructors
// would have signature `traceMessages: List`.
// so we have to pull this into a companion object function.
fun of(exitCode: Int, traceMessages: List<AirbyteTraceMessage>) =
DestinationUncleanExitException(
exitCode,
traceMessages.filter { it.type == AirbyteTraceMessage.Type.ERROR }.map { it.error }
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ import com.google.common.collect.Lists
import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.output.BufferingOutputConsumer
import io.airbyte.cdk.util.Jsons
import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage
import io.airbyte.protocol.models.v0.AirbyteLogMessage
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteTraceMessage
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Requires
Expand Down Expand Up @@ -207,32 +205,12 @@ class DockerizedDestination(
stderrDrained.join()
val exitCode = process.exitValue()
if (exitCode != 0) {
// Hey look, it's possible to extract the error from a failed destination process!
// because "destination exit code 1" is the least-helpful error message.
val filteredTraces =
destinationOutput
.traces()
.filter { it.type == AirbyteTraceMessage.Type.ERROR }
.map { it.error }
throw DestinationUncleanExitException(exitCode, filteredTraces)
throw DestinationUncleanExitException.of(exitCode, destinationOutput.traces())
}
}
}
}

class DestinationUncleanExitException(
exitCode: Int,
traceMessages: List<AirbyteErrorTraceMessage>
) :
Exception(
"""
Destination process exited uncleanly: $exitCode
Trace messages:
""".trimIndent()
// explicit concat because otherwise trimIndent behaves badly
+ traceMessages
)

@Singleton
@Requires(env = [DOCKERIZED_TEST_ENV])
class DockerizedDestinationFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.cdk.load.test.util.destination_process

import io.airbyte.cdk.ConnectorUncleanExitException
import io.airbyte.cdk.command.CliRunnable
import io.airbyte.cdk.command.CliRunner
import io.airbyte.cdk.command.ConfigurationSpecification
Expand Down Expand Up @@ -53,7 +54,11 @@ class NonDockerizedDestination(
}

override suspend fun run() {
destination.run()
try {
destination.run()
} catch (e: ConnectorUncleanExitException) {
throw DestinationUncleanExitException.of(e.exitCode, destination.results.traces())
}
}

override fun sendMessage(message: AirbyteMessage) {
Expand Down

0 comments on commit 871db55

Please sign in to comment.