Skip to content

Commit

Permalink
[DB sources] : Add plumbing for adding transient errors (#38030)
Browse files Browse the repository at this point in the history
  • Loading branch information
akashkulk authored May 9, 2024
1 parent 49bb246 commit d26bd10
Show file tree
Hide file tree
Showing 13 changed files with 77 additions and 23 deletions.
3 changes: 2 additions & 1 deletion airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ corresponds to that version.
### Java CDK

| Version | Date | Pull Request | Subject |
|:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
|:--------|:-----------| :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.34.1 | 2024-05-07 | [\#38030](https://github.com/airbytehq/airbyte/pull/38030) | Add support for transient errors |
| 0.34.0 | 2024-05-01 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | Destinations: Remove incremental T+D |
| 0.33.2 | 2024-05-03 | [\#37824](https://github.com/airbytehq/airbyte/pull/37824) | improve source acceptance tests |
| 0.33.1 | 2024-05-03 | [\#37824](https://github.com/airbytehq/airbyte/pull/37824) | Add a unit test for cursor based sync |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package io.airbyte.cdk.integrations.base

import com.fasterxml.jackson.databind.JsonNode
import com.google.common.annotations.VisibleForTesting
import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil
import java.util.*
import java.util.regex.Pattern
import javax.validation.constraints.NotNull
Expand All @@ -28,13 +27,6 @@ class AirbyteExceptionHandler : Thread.UncaughtExceptionHandler {
// from the spec:
// https://docs.google.com/document/d/1ctrj3Yh_GjtQ93aND-WH3ocqGxsmxyC3jfiarrF6NY0/edit#
LOGGER.error(logMessage, throwable)

val rootThrowable = ConnectorExceptionUtil.getRootConfigError(Exception(throwable))

if (ConnectorExceptionUtil.isConfigError(rootThrowable)) {
terminate()
}

// Attempt to deinterpolate the error message before emitting a trace message
val mangledMessage: String?
// If any exception in the chain is of a deinterpolatable type, find it and deinterpolate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ object AirbyteTraceMessageUtility {

@JvmStatic
fun emitTransientErrorTrace(e: Throwable, displayMessage: String?) {
emitErrorTrace(e, displayMessage, AirbyteErrorTraceMessage.FailureType.SYSTEM_ERROR)
emitErrorTrace(e, displayMessage, AirbyteErrorTraceMessage.FailureType.TRANSIENT_ERROR)
}

fun emitCustomErrorTrace(displayMessage: String?, internalMessage: String?) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,14 +220,11 @@ internal constructor(
// exist, we
// just return the original exception.
ApmTraceUtils.addExceptionToTrace(e)
val rootThrowable = ConnectorExceptionUtil.getRootConfigError(e)
val displayMessage = ConnectorExceptionUtil.getDisplayMessage(rootThrowable)
val rootConfigErrorThrowable = ConnectorExceptionUtil.getRootConfigError(e)
val rootTransientErrorThrowable = ConnectorExceptionUtil.getRootTransientError(e)
// If the source connector throws a config error, a trace message with the relevant
// message should
// be surfaced.
if (ConnectorExceptionUtil.isConfigError(rootThrowable)) {
AirbyteTraceMessageUtility.emitConfigErrorTrace(e, displayMessage)
}
if (parsed.command == Command.CHECK) {
// Currently, special handling is required for the CHECK case since the user display
// information in
Expand All @@ -240,11 +237,30 @@ internal constructor(
.withConnectionStatus(
AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage(displayMessage)
.withMessage(
ConnectorExceptionUtil.getDisplayMessage(
rootConfigErrorThrowable
)
)
)
)
return
}

if (ConnectorExceptionUtil.isConfigError(rootConfigErrorThrowable)) {
AirbyteTraceMessageUtility.emitConfigErrorTrace(
e,
ConnectorExceptionUtil.getDisplayMessage(rootConfigErrorThrowable),
)
// On receiving a config error, the container should be immediately shut down.
} else if (ConnectorExceptionUtil.isTransientError(rootTransientErrorThrowable)) {
AirbyteTraceMessageUtility.emitTransientErrorTrace(
e,
ConnectorExceptionUtil.getDisplayMessage(rootTransientErrorThrowable)
)
// On receiving a transient error, the container should be immediately shut down.
System.exit(1)
}
throw e
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.google.common.collect.ImmutableList
import io.airbyte.cdk.integrations.base.errors.messages.ErrorMessage
import io.airbyte.commons.exceptions.ConfigErrorException
import io.airbyte.commons.exceptions.ConnectionErrorException
import io.airbyte.commons.exceptions.TransientErrorException
import io.airbyte.commons.functional.Either
import java.sql.SQLException
import java.sql.SQLSyntaxErrorException
Expand All @@ -30,13 +31,18 @@ object ConnectorExceptionUtil {
fun isConfigError(e: Throwable?): Boolean {
return isConfigErrorException(e) ||
isConnectionError(e) ||
isRecoveryConnectionException(e) ||
isUnknownColumnInFieldListException(e)
}

fun isTransientError(e: Throwable?): Boolean {
return isTransientErrorException(e) || isRecoveryConnectionException(e)
}

fun getDisplayMessage(e: Throwable?): String? {
return if (e is ConfigErrorException) {
e.displayMessage
} else if (e is TransientErrorException) {
e.message
} else if (e is ConnectionErrorException) {
ErrorMessage.getErrorMessage(e.stateCode, e.errorCode, e.exceptionMessage, e)
} else if (isRecoveryConnectionException(e)) {
Expand Down Expand Up @@ -67,6 +73,22 @@ object ConnectorExceptionUtil {
return e
}

/**
* Returns the first instance of an exception associated with a configuration error (if it
* exists). Otherwise, the original exception is returned.
*/
fun getRootTransientError(e: Exception?): Throwable? {
var current: Throwable? = e
while (current != null) {
if (isTransientError(current)) {
return current
} else {
current = current.cause
}
}
return e
}

/**
* Log all the exceptions, and rethrow the first. This is useful for e.g. running multiple
* futures and waiting for them to complete/fail. Rather than combining them into a single
Expand Down Expand Up @@ -103,6 +125,10 @@ object ConnectorExceptionUtil {
return eithers.stream().map { obj: Either<out T, Result> -> obj.right!! }.toList()
}

private fun isTransientErrorException(e: Throwable?): Boolean {
return e is TransientErrorException
}

private fun isConfigErrorException(e: Throwable?): Boolean {
return e is ConfigErrorException
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.34.0
version=0.34.1
2 changes: 1 addition & 1 deletion airbyte-cdk/java/airbyte-cdk/dependencies/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ dependencies {
api 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
api 'com.google.guava:guava:33.0.0-jre'
api 'commons-io:commons-io:2.15.1'
api ('io.airbyte.airbyte-protocol:protocol-models:0.7.0') { exclude group: 'com.google.api-client', module: 'google-api-client' }
api ('io.airbyte.airbyte-protocol:protocol-models:0.9.0') { exclude group: 'com.google.api-client', module: 'google-api-client' }
api 'javax.annotation:javax.annotation-api:1.3.2'
api 'org.apache.commons:commons-compress:1.25.0'
api 'org.apache.commons:commons-lang3:3.14.0'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.exceptions

/**
* An exception that indicates a transient error was encountered. This exception is caught and emits
* an AirbyteTraceMessage.
*/
class TransientErrorException : RuntimeException {

constructor(displayMessage: String) : super(displayMessage)

constructor(displayMessage: String, exception: Throwable?) : super(displayMessage, exception)
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.31.5'
cdkVersionRequired = '0.34.1'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.3.32
dockerImageTag: 3.3.33
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.junit.jupiter.api.Test;

@Order(1)
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH")
public class CdcPostgresSourceTest extends CdcSourceTest<PostgresSource, PostgresTestDatabase> {

protected BaseImage postgresImage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH")
class PostgresJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest<PostgresSource, PostgresTestDatabase> {

private static final String DATABASE = "new_db";
Expand Down
5 changes: 3 additions & 2 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,9 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp

## Changelog

| Version | Date | Pull Request | Subject |
|---------|------------|----------------------------------------------------------|-----------------------------------------------------------|
| Version | Date | Pull Request | Subject |
| ------- | ---------- | -------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | --- |
| 3.3.33 | 2024-05-07 | [38030](https://github.com/airbytehq/airbyte/pull/38030) | Mark PG hot standby error as transient. |
| 3.3.32 | 2024-04-30 | [37758](https://github.com/airbytehq/airbyte/pull/37758) | Correct previous release to disable debezium retries |
| 3.3.31 | 2024-04-30 | [37754](https://github.com/airbytehq/airbyte/pull/37754) | Add CDC logs |
| 3.3.30 | 2024-04-30 | [37726](https://github.com/airbytehq/airbyte/pull/37726) | Remove debezium retries |
Expand Down

0 comments on commit d26bd10

Please sign in to comment.