Skip to content

Commit

Permalink
Nullify failed column conversion and mark with meta change in record (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
rodireich authored Dec 4, 2024
1 parent cad4c1d commit c7f0eb7
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package io.airbyte.cdk.read
import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.TransientErrorException
import io.airbyte.cdk.command.OpaqueStateValue
import io.airbyte.cdk.discover.Field
import java.time.Instant
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong
Expand Down Expand Up @@ -36,8 +37,8 @@ sealed class JdbcPartitionReader<P : JdbcPartition<*>>(
return PartitionReader.TryAcquireResourcesStatus.READY_TO_RUN
}

fun out(record: ObjectNode) {
streamRecordConsumer.accept(record, changes = null)
fun out(record: ObjectNode, changes: Map<Field, FieldValueChange>?) {
streamRecordConsumer.accept(record, changes)
}

override fun releaseResources() {
Expand Down Expand Up @@ -80,7 +81,7 @@ class JdbcNonResumablePartitionReader<P : JdbcPartition<*>>(
)
.use { result: SelectQuerier.Result ->
for (record in result) {
out(record)
out(record, result.changes)
numRecords.incrementAndGet()
}
}
Expand Down Expand Up @@ -127,7 +128,7 @@ class JdbcResumablePartitionReader<P : JdbcSplittablePartition<*>>(
)
.use { result: SelectQuerier.Result ->
for (record in result) {
out(record)
out(record, result.changes)
lastRecord.set(record)
// Check activity periodically to handle timeout.
if (numRecords.incrementAndGet() % fetchSize == 0L) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.airbyte.cdk.read

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.discover.Field
import io.airbyte.cdk.jdbc.JdbcConnectionFactory
import io.airbyte.cdk.jdbc.JdbcFieldType
import io.airbyte.cdk.util.Jsons
Expand All @@ -27,7 +28,9 @@ interface SelectQuerier {
val fetchSize: Int? = null,
)

interface Result : Iterator<ObjectNode>, AutoCloseable
interface Result : Iterator<ObjectNode>, AutoCloseable {
val changes: Map<Field, FieldValueChange>?
}
}

/** Default implementation of [SelectQuerier]. */
Expand All @@ -50,6 +53,9 @@ class JdbcSelectQuerier(
var stmt: PreparedStatement? = null
var rs: ResultSet? = null
val reusable: ObjectNode? = Jsons.objectNode().takeIf { parameters.reuseResultObject }
val metaChanges: MutableMap<Field, FieldValueChange> = mutableMapOf()
override val changes: Map<Field, FieldValueChange>?
get() = metaChanges

init {
log.info { "Querying ${q.sql}" }
Expand Down Expand Up @@ -94,6 +100,7 @@ class JdbcSelectQuerier(
}

override fun next(): ObjectNode {
metaChanges.clear()
// Ensure that the current row in the ResultSet hasn't been read yet; advance if
// necessary.
if (!hasNext()) throw NoSuchElementException()
Expand All @@ -103,7 +110,15 @@ class JdbcSelectQuerier(
for (column in q.columns) {
log.debug { "Getting value #$colIdx for $column." }
val jdbcFieldType: JdbcFieldType<*> = column.type as JdbcFieldType<*>
record.set<JsonNode>(column.id, jdbcFieldType.get(rs!!, colIdx))
try {
record.set<JsonNode>(column.id, jdbcFieldType.get(rs!!, colIdx))
} catch (e: Exception) {
record.set<JsonNode>(column.id, Jsons.nullNode())
log.info {
"Failed to serialize column: ${column.id}, of type ${column.type}, with error ${e.message}"
}
metaChanges.set(column, FieldValueChange.RETRIEVAL_FAILURE_TOTAL)
}
colIdx++
}
// Flag that the current row has been read before returning.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ object TestFixtures {
override fun hasNext(): Boolean = wrapped.hasNext()
override fun next(): ObjectNode = wrapped.next()
override fun close() {}
override val changes: Map<Field, FieldValueChange>?
get() = null
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.9.0-rc.24
dockerImageTag: 3.9.0-rc.25
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import io.micronaut.context.annotation.Primary
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.format.DateTimeFormatterBuilder
import java.time.format.DateTimeParseException
import java.time.temporal.ChronoField
import java.util.*
import java.util.concurrent.ConcurrentHashMap
Expand Down Expand Up @@ -322,8 +321,12 @@ class MysqlJdbcPartitionFactory(
Jsons.valueToTree(stateValue?.toDouble())
}
LeafAirbyteSchemaType.BINARY -> {
val ba = Base64.getDecoder().decode(stateValue!!)
Jsons.valueToTree<BinaryNode>(ba)
try {
val ba = Base64.getDecoder().decode(stateValue!!)
Jsons.valueToTree<BinaryNode>(ba)
} catch (_: RuntimeException) {
Jsons.valueToTree<JsonNode>(stateValue)
}
}
LeafAirbyteSchemaType.TIMESTAMP_WITHOUT_TIMEZONE -> {
val timestampInStatePattern = "yyyy-MM-dd'T'HH:mm:ss"
Expand All @@ -340,9 +343,9 @@ class MysqlJdbcPartitionFactory(
LocalDateTime.parse(stateValue, formatter)
.format(LocalDateTimeCodec.formatter)
)
} catch (_: DateTimeParseException) {
} catch (_: RuntimeException) {
// Resolve to use the new format.
Jsons.valueToTree(stateValue)
Jsons.valueToTree<JsonNode>(stateValue)
}
}
LeafAirbyteSchemaType.TIMESTAMP_WITH_TIMEZONE -> {
Expand All @@ -356,12 +359,12 @@ class MysqlJdbcPartitionFactory(
.atOffset(java.time.ZoneOffset.UTC)
.format(OffsetDateTimeCodec.formatter)
)
} catch (_: DateTimeParseException) {
} catch (_: RuntimeException) {
// Resolve to use the new format.
Jsons.valueToTree(stateValue)
Jsons.valueToTree<JsonNode>(stateValue)
}
}
else -> Jsons.valueToTree(stateValue)
else -> Jsons.valueToTree<JsonNode>(stateValue)
}
else ->
throw IllegalStateException(
Expand Down

0 comments on commit c7f0eb7

Please sign in to comment.