From c7f0eb7004d4a3d99b188b78049267b42f50e41b Mon Sep 17 00:00:00 2001
From: Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com>
Date: Wed, 4 Dec 2024 14:55:00 -0800
Subject: [PATCH] Nullify failed column conversion and mark with meta change in
record (#48791)
---
.../airbyte/cdk/read/JdbcPartitionReader.kt | 9 +++++----
.../io/airbyte/cdk/read/SelectQuerier.kt | 19 +++++++++++++++++--
.../io/airbyte/cdk/read/TestFixtures.kt | 2 ++
.../connectors/source-mysql/metadata.yaml | 2 +-
.../source/mysql/MysqlJdbcPartitionFactory.kt | 19 +++++++++++--------
5 files changed, 36 insertions(+), 15 deletions(-)
diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/read/JdbcPartitionReader.kt b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/read/JdbcPartitionReader.kt
index e5e05a1e2569..832c208bb41e 100644
--- a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/read/JdbcPartitionReader.kt
+++ b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/read/JdbcPartitionReader.kt
@@ -4,6 +4,7 @@ package io.airbyte.cdk.read
import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.TransientErrorException
import io.airbyte.cdk.command.OpaqueStateValue
+import io.airbyte.cdk.discover.Field
import java.time.Instant
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong
@@ -36,8 +37,8 @@ sealed class JdbcPartitionReader
>(
return PartitionReader.TryAcquireResourcesStatus.READY_TO_RUN
}
- fun out(record: ObjectNode) {
- streamRecordConsumer.accept(record, changes = null)
+ fun out(record: ObjectNode, changes: Map?) {
+ streamRecordConsumer.accept(record, changes)
}
override fun releaseResources() {
@@ -80,7 +81,7 @@ class JdbcNonResumablePartitionReader>(
)
.use { result: SelectQuerier.Result ->
for (record in result) {
- out(record)
+ out(record, result.changes)
numRecords.incrementAndGet()
}
}
@@ -127,7 +128,7 @@ class JdbcResumablePartitionReader
>(
)
.use { result: SelectQuerier.Result ->
for (record in result) {
- out(record)
+ out(record, result.changes)
lastRecord.set(record)
// Check activity periodically to handle timeout.
if (numRecords.incrementAndGet() % fetchSize == 0L) {
diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/read/SelectQuerier.kt b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/read/SelectQuerier.kt
index caa44bf22f47..d412ae54ba83 100644
--- a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/read/SelectQuerier.kt
+++ b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/read/SelectQuerier.kt
@@ -3,6 +3,7 @@ package io.airbyte.cdk.read
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode
+import io.airbyte.cdk.discover.Field
import io.airbyte.cdk.jdbc.JdbcConnectionFactory
import io.airbyte.cdk.jdbc.JdbcFieldType
import io.airbyte.cdk.util.Jsons
@@ -27,7 +28,9 @@ interface SelectQuerier {
val fetchSize: Int? = null,
)
- interface Result : Iterator, AutoCloseable
+ interface Result : Iterator, AutoCloseable {
+ val changes: Map?
+ }
}
/** Default implementation of [SelectQuerier]. */
@@ -50,6 +53,9 @@ class JdbcSelectQuerier(
var stmt: PreparedStatement? = null
var rs: ResultSet? = null
val reusable: ObjectNode? = Jsons.objectNode().takeIf { parameters.reuseResultObject }
+ val metaChanges: MutableMap = mutableMapOf()
+ override val changes: Map?
+ get() = metaChanges
init {
log.info { "Querying ${q.sql}" }
@@ -94,6 +100,7 @@ class JdbcSelectQuerier(
}
override fun next(): ObjectNode {
+ metaChanges.clear()
// Ensure that the current row in the ResultSet hasn't been read yet; advance if
// necessary.
if (!hasNext()) throw NoSuchElementException()
@@ -103,7 +110,15 @@ class JdbcSelectQuerier(
for (column in q.columns) {
log.debug { "Getting value #$colIdx for $column." }
val jdbcFieldType: JdbcFieldType<*> = column.type as JdbcFieldType<*>
- record.set(column.id, jdbcFieldType.get(rs!!, colIdx))
+ try {
+ record.set(column.id, jdbcFieldType.get(rs!!, colIdx))
+ } catch (e: Exception) {
+ record.set(column.id, Jsons.nullNode())
+ log.info {
+ "Failed to serialize column: ${column.id}, of type ${column.type}, with error ${e.message}"
+ }
+ metaChanges.set(column, FieldValueChange.RETRIEVAL_FAILURE_TOTAL)
+ }
colIdx++
}
// Flag that the current row has been read before returning.
diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/read/TestFixtures.kt b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/read/TestFixtures.kt
index fb9833b9a1fe..cb57c9777389 100644
--- a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/read/TestFixtures.kt
+++ b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/read/TestFixtures.kt
@@ -162,6 +162,8 @@ object TestFixtures {
override fun hasNext(): Boolean = wrapped.hasNext()
override fun next(): ObjectNode = wrapped.next()
override fun close() {}
+ override val changes: Map?
+ get() = null
}
}
}
diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml
index 966183d4be50..6e5b3307e20c 100644
--- a/airbyte-integrations/connectors/source-mysql/metadata.yaml
+++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml
@@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
- dockerImageTag: 3.9.0-rc.24
+ dockerImageTag: 3.9.0-rc.25
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactory.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactory.kt
index 85cedec8dfea..63016589baa7 100644
--- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactory.kt
+++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlJdbcPartitionFactory.kt
@@ -29,7 +29,6 @@ import io.micronaut.context.annotation.Primary
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.format.DateTimeFormatterBuilder
-import java.time.format.DateTimeParseException
import java.time.temporal.ChronoField
import java.util.*
import java.util.concurrent.ConcurrentHashMap
@@ -322,8 +321,12 @@ class MysqlJdbcPartitionFactory(
Jsons.valueToTree(stateValue?.toDouble())
}
LeafAirbyteSchemaType.BINARY -> {
- val ba = Base64.getDecoder().decode(stateValue!!)
- Jsons.valueToTree(ba)
+ try {
+ val ba = Base64.getDecoder().decode(stateValue!!)
+ Jsons.valueToTree(ba)
+ } catch (_: RuntimeException) {
+ Jsons.valueToTree(stateValue)
+ }
}
LeafAirbyteSchemaType.TIMESTAMP_WITHOUT_TIMEZONE -> {
val timestampInStatePattern = "yyyy-MM-dd'T'HH:mm:ss"
@@ -340,9 +343,9 @@ class MysqlJdbcPartitionFactory(
LocalDateTime.parse(stateValue, formatter)
.format(LocalDateTimeCodec.formatter)
)
- } catch (_: DateTimeParseException) {
+ } catch (_: RuntimeException) {
// Resolve to use the new format.
- Jsons.valueToTree(stateValue)
+ Jsons.valueToTree(stateValue)
}
}
LeafAirbyteSchemaType.TIMESTAMP_WITH_TIMEZONE -> {
@@ -356,12 +359,12 @@ class MysqlJdbcPartitionFactory(
.atOffset(java.time.ZoneOffset.UTC)
.format(OffsetDateTimeCodec.formatter)
)
- } catch (_: DateTimeParseException) {
+ } catch (_: RuntimeException) {
// Resolve to use the new format.
- Jsons.valueToTree(stateValue)
+ Jsons.valueToTree(stateValue)
}
}
- else -> Jsons.valueToTree(stateValue)
+ else -> Jsons.valueToTree(stateValue)
}
else ->
throw IllegalStateException(