Skip to content

Commit

Permalink
bulk-cdk: CDC-related fixes and tweaks (#48602)
Browse files Browse the repository at this point in the history
  • Loading branch information
postamar authored Nov 22, 2024
1 parent 9177792 commit 8aec55d
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ sealed class FeedBootstrap<T : Feed>(
* to the next. Not doing this generates a lot of garbage and the increased GC activity has a
* measurable impact on performance.
*/
private inner class EfficientStreamRecordConsumer(val stream: Stream) : StreamRecordConsumer {
private inner class EfficientStreamRecordConsumer(override val stream: Stream) :
StreamRecordConsumer {

override fun accept(recordData: ObjectNode, changes: Map<Field, FieldValueChange>?) {
if (changes.isNullOrEmpty()) {
Expand Down Expand Up @@ -214,7 +215,10 @@ sealed class FeedBootstrap<T : Feed>(
* b) field value changes and the motivating reason for these in the record metadata.
* ```
*/
fun interface StreamRecordConsumer {
interface StreamRecordConsumer {

val stream: Stream

fun accept(recordData: ObjectNode, changes: Map<Field, FieldValueChange>?)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ import io.airbyte.cdk.read.ConcurrencyResource
import io.airbyte.cdk.read.PartitionReadCheckpoint
import io.airbyte.cdk.read.PartitionReader
import io.airbyte.cdk.read.StreamRecordConsumer
import io.airbyte.cdk.util.Jsons
import io.debezium.embedded.EmbeddedEngineChangeEvent
import io.airbyte.protocol.models.v0.StreamDescriptor
import io.debezium.engine.ChangeEvent
import io.debezium.engine.DebeziumEngine
import io.debezium.engine.format.Json
Expand Down Expand Up @@ -132,68 +131,13 @@ class CdcPartitionReader<T : Comparable<T>>(
private val coroutineContext: CoroutineContext,
) : Consumer<ChangeEvent<String?, String?>> {

override fun accept(event: ChangeEvent<String?, String?>) {
numEvents.incrementAndGet()
// Get SourceRecord object if possible.
// This object is the preferred way to obtain the current position.
val sourceRecord: SourceRecord? =
(event as? EmbeddedEngineChangeEvent<*, *, *>)?.sourceRecord()
if (sourceRecord == null) numEventsWithoutSourceRecord.incrementAndGet()
// Debezium outputs a tombstone event that has a value of null. This is an artifact
// of how it interacts with kafka. We want to ignore it. More on the tombstone:
// https://debezium.io/documentation/reference/stable/transformations/event-flattening.html
val debeziumRecordValue: DebeziumRecordValue? =
event.value()?.let { DebeziumRecordValue(Jsons.readTree(it)) }
// Process records, ignoring heartbeats which are only used for completion checks.
val eventType: EventType = run {
if (debeziumRecordValue == null) return@run EventType.TOMBSTONE
if (debeziumRecordValue.isHeartbeat) return@run EventType.HEARTBEAT
val debeziumRecordKey = DebeziumRecordKey(Jsons.readTree(event.key()))
val deserializedRecord: DeserializedRecord =
readerOps.deserialize(debeziumRecordKey, debeziumRecordValue)
?: return@run EventType.RECORD_DISCARDED_BY_DESERIALIZE
val streamRecordConsumer: StreamRecordConsumer =
streamRecordConsumers[deserializedRecord.streamID]
?: return@run EventType.RECORD_DISCARDED_BY_STREAM_ID
streamRecordConsumer.accept(deserializedRecord.data, deserializedRecord.changes)
return@run EventType.RECORD_EMITTED
}
override fun accept(changeEvent: ChangeEvent<String?, String?>) {
val event = DebeziumEvent(changeEvent)
val eventType: EventType = emitRecord(event)
// Update counters.
when (eventType) {
EventType.TOMBSTONE -> numTombstones
EventType.HEARTBEAT -> numHeartbeats
EventType.RECORD_DISCARDED_BY_DESERIALIZE,
EventType.RECORD_DISCARDED_BY_STREAM_ID -> numDiscardedRecords
EventType.RECORD_EMITTED -> numEmittedRecords
}.incrementAndGet()
updateCounters(event, eventType)
// Look for reasons to close down the engine.
val closeReason: CloseReason = run {
if (input.isSynthetic && eventType != EventType.HEARTBEAT) {
// Special case where the engine started with a synthetic offset:
// don't even consider closing the engine unless handling a heartbeat event.
// For some databases, such as Oracle, Debezium actually needs to snapshot the
// schema in order to collect the database schema history and there's no point
// in interrupting it until the snapshot is done.
return
}
if (!coroutineContext.isActive) {
return@run CloseReason.TIMEOUT
}
val currentPosition: T? = position(sourceRecord) ?: position(debeziumRecordValue)
if (currentPosition == null || currentPosition < upperBound) {
return
}
// Close because the current event is past the sync upper bound.
when (eventType) {
EventType.TOMBSTONE,
EventType.HEARTBEAT ->
CloseReason.HEARTBEAT_OR_TOMBSTONE_REACHED_TARGET_POSITION
EventType.RECORD_EMITTED,
EventType.RECORD_DISCARDED_BY_DESERIALIZE,
EventType.RECORD_DISCARDED_BY_STREAM_ID ->
CloseReason.RECORD_REACHED_TARGET_POSITION
}
}
val closeReason: CloseReason = findCloseReason(event, eventType) ?: return
// At this point, if we haven't returned already, we want to close down the engine.
if (!closeReasonReference.compareAndSet(null, closeReason)) {
// An earlier event has already triggered closing down the engine, do nothing.
Expand All @@ -205,6 +149,94 @@ class CdcPartitionReader<T : Comparable<T>>(
Thread({ engine.close() }, "debezium-close").start()
}

private fun emitRecord(event: DebeziumEvent): EventType {
if (event.isTombstone) {
// Debezium outputs a tombstone event that has a value of null. This is an artifact
// of how it interacts with kafka. We want to ignore it. More on the tombstone:
// https://debezium.io/documentation/reference/stable/transformations/event-flattening.html
return EventType.TOMBSTONE
}
if (event.isHeartbeat) {
// Heartbeats are only used for their position.
return EventType.HEARTBEAT
}
if (event.key == null) {
// Sometimes, presumably due to bugs in Debezium, the key isn't valid JSON.
return EventType.KEY_JSON_INVALID
}
if (event.value == null) {
// Sometimes, presumably due to bugs in Debezium, the value isn't valid JSON.
return EventType.VALUE_JSON_INVALID
}
val streamRecordConsumer: StreamRecordConsumer =
findStreamRecordConsumer(event.key, event.value)
// Ignore events which can't be mapped to a stream.
?: return EventType.RECORD_DISCARDED_BY_STREAM_ID
val deserializedRecord: DeserializedRecord =
readerOps.deserialize(event.key, event.value, streamRecordConsumer.stream)
// Ignore events which can't be deserialized into records.
?: return EventType.RECORD_DISCARDED_BY_DESERIALIZE
// Emit the record at the end of the happy path.
streamRecordConsumer.accept(deserializedRecord.data, deserializedRecord.changes)
return EventType.RECORD_EMITTED
}

private fun findStreamRecordConsumer(
key: DebeziumRecordKey,
value: DebeziumRecordValue
): StreamRecordConsumer? {
val name: String = readerOps.findStreamName(key, value) ?: return null
val namespace: String? = readerOps.findStreamNamespace(key, value)
val desc: StreamDescriptor = StreamDescriptor().withNamespace(namespace).withName(name)
val streamID: StreamIdentifier = StreamIdentifier.from(desc)
return streamRecordConsumers[streamID]
}

private fun updateCounters(event: DebeziumEvent, eventType: EventType) {
numEvents.incrementAndGet()
if (event.sourceRecord == null) {
numEventsWithoutSourceRecord.incrementAndGet()
}
when (eventType) {
EventType.TOMBSTONE -> numTombstones
EventType.HEARTBEAT -> numHeartbeats
EventType.KEY_JSON_INVALID,
EventType.VALUE_JSON_INVALID,
EventType.RECORD_DISCARDED_BY_DESERIALIZE,
EventType.RECORD_DISCARDED_BY_STREAM_ID -> numDiscardedRecords
EventType.RECORD_EMITTED -> numEmittedRecords
}.incrementAndGet()
}

private fun findCloseReason(event: DebeziumEvent, eventType: EventType): CloseReason? {
if (input.isSynthetic && eventType != EventType.HEARTBEAT) {
// Special case where the engine started with a synthetic offset:
// don't even consider closing the engine unless handling a heartbeat event.
// For some databases, such as Oracle, Debezium actually needs to snapshot the
// schema in order to collect the database schema history and there's no point
// in interrupting it until the snapshot is done.
return null
}
if (!coroutineContext.isActive) {
return CloseReason.TIMEOUT
}
val currentPosition: T? = position(event.sourceRecord) ?: position(event.value)
if (currentPosition == null || currentPosition < upperBound) {
return null
}
// Close because the current event is past the sync upper bound.
return when (eventType) {
EventType.TOMBSTONE,
EventType.HEARTBEAT -> CloseReason.HEARTBEAT_OR_TOMBSTONE_REACHED_TARGET_POSITION
EventType.KEY_JSON_INVALID,
EventType.VALUE_JSON_INVALID,
EventType.RECORD_EMITTED,
EventType.RECORD_DISCARDED_BY_DESERIALIZE,
EventType.RECORD_DISCARDED_BY_STREAM_ID ->
CloseReason.RECORD_REACHED_TARGET_POSITION
}
}

private fun position(sourceRecord: SourceRecord?): T? {
if (sourceRecord == null) return null
val sourceRecordPosition: T? = readerOps.position(sourceRecord)
Expand All @@ -229,6 +261,8 @@ class CdcPartitionReader<T : Comparable<T>>(
private enum class EventType {
TOMBSTONE,
HEARTBEAT,
KEY_JSON_INVALID,
VALUE_JSON_INVALID,
RECORD_DISCARDED_BY_DESERIALIZE,
RECORD_DISCARDED_BY_STREAM_ID,
RECORD_EMITTED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,43 @@ package io.airbyte.cdk.read.cdc

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.NullNode
import io.airbyte.cdk.util.Jsons
import io.debezium.embedded.EmbeddedEngineChangeEvent
import io.debezium.engine.ChangeEvent
import io.debezium.relational.history.HistoryRecord
import org.apache.kafka.connect.source.SourceRecord

/** Convenience wrapper around [ChangeEvent]. */
class DebeziumEvent(event: ChangeEvent<String?, String?>) {

/** This [SourceRecord] object is the preferred way to obtain the current position. */
val sourceRecord: SourceRecord? = (event as? EmbeddedEngineChangeEvent<*, *, *>)?.sourceRecord()

val key: DebeziumRecordKey? =
event
.key()
?.let { runCatching { Jsons.readTree(it) }.getOrNull() }
?.let(::DebeziumRecordKey)

val value: DebeziumRecordValue? =
event
.value()
?.let { runCatching { Jsons.readTree(it) }.getOrNull() }
?.let(::DebeziumRecordValue)

/**
* Debezium can output a tombstone event that has a value of null. This is an artifact of how it
* interacts with kafka. We want to ignore it. More on the tombstone:
* https://debezium.io/documentation/reference/stable/transformations/event-flattening.html
*/
val isTombstone: Boolean = event.value() == null

/**
* True if this is a Debezium heartbeat event, or the equivalent thereof. In any case, such
* events are only used for their position value and for triggering timeouts.
*/
val isHeartbeat: Boolean = value?.source?.isNull == true
}

/** [DebeziumRecordKey] wraps a Debezium change data event key. */
@JvmInline
Expand All @@ -25,13 +61,6 @@ value class DebeziumRecordKey(val wrapped: JsonNode) {
@JvmInline
value class DebeziumRecordValue(val wrapped: JsonNode) {

/**
* True if this is a Debezium heartbeat event, or the equivalent thereof. In any case, such
* events are only used for their position value and for triggering timeouts.
*/
val isHeartbeat: Boolean
get() = source.isNull

/** The datum prior to this event; null for insertions. */
val before: JsonNode
get() = element("before")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package io.airbyte.cdk.read.cdc

import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.StreamIdentifier
import io.airbyte.cdk.command.OpaqueStateValue
import io.airbyte.cdk.discover.Field
import io.airbyte.cdk.read.FieldValueChange
Expand Down Expand Up @@ -35,7 +34,23 @@ interface CdcPartitionReaderDebeziumOperations<T : Comparable<T>> {
*
* Returning null means that the event should be treated like a heartbeat.
*/
fun deserialize(key: DebeziumRecordKey, value: DebeziumRecordValue): DeserializedRecord?
fun deserialize(
key: DebeziumRecordKey,
value: DebeziumRecordValue,
stream: Stream,
): DeserializedRecord?

/** Identifies the namespace of the stream that this event belongs to, if applicable. */
fun findStreamNamespace(
key: DebeziumRecordKey,
value: DebeziumRecordValue,
): String?

/** Identifies the null of the stream that this event belongs to, if applicable. */
fun findStreamName(
key: DebeziumRecordKey,
value: DebeziumRecordValue,
): String?

/** Maps a [DebeziumState] to an [OpaqueStateValue]. */
fun serialize(debeziumState: DebeziumState): OpaqueStateValue
Expand All @@ -49,7 +64,6 @@ interface CdcPartitionReaderDebeziumOperations<T : Comparable<T>> {

/** [DeserializedRecord]s are used to generate Airbyte RECORD messages. */
data class DeserializedRecord(
val streamID: StreamIdentifier,
val data: ObjectNode,
val changes: Map<Field, FieldValueChange>,
)
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import io.airbyte.cdk.discover.TestMetaFieldDecorator
import io.airbyte.cdk.output.BufferingOutputConsumer
import io.airbyte.cdk.read.ConcurrencyResource
import io.airbyte.cdk.read.ConfiguredSyncMode
import io.airbyte.cdk.read.FieldValueChange
import io.airbyte.cdk.read.Global
import io.airbyte.cdk.read.PartitionReadCheckpoint
import io.airbyte.cdk.read.PartitionReader
Expand Down Expand Up @@ -169,13 +170,20 @@ sealed class CdcPartitionReaderTest<T : Comparable<T>, C : AutoCloseable>(
val streamRecordConsumers: Map<StreamIdentifier, StreamRecordConsumer> =
mapOf(
stream.id to
StreamRecordConsumer { recordData: ObjectNode, _ ->
outputConsumer.accept(
AirbyteRecordMessage()
.withStream(stream.name)
.withNamespace(stream.namespace)
.withData(recordData)
)
object : StreamRecordConsumer {
override val stream: Stream = this@CdcPartitionReaderTest.stream

override fun accept(
recordData: ObjectNode,
changes: Map<Field, FieldValueChange>?
) {
outputConsumer.accept(
AirbyteRecordMessage()
.withStream(stream.name)
.withNamespace(stream.namespace)
.withData(recordData)
)
}
}
)
val reader =
Expand Down Expand Up @@ -245,6 +253,7 @@ sealed class CdcPartitionReaderTest<T : Comparable<T>, C : AutoCloseable>(
override fun deserialize(
key: DebeziumRecordKey,
value: DebeziumRecordValue,
stream: Stream,
): DeserializedRecord {
val id: Int = key.element("id").asInt()
val after: Int? = value.after["v"]?.asInt()
Expand All @@ -257,12 +266,17 @@ sealed class CdcPartitionReaderTest<T : Comparable<T>, C : AutoCloseable>(
Update(id, after)
}
return DeserializedRecord(
streamID = stream.id,
data = Jsons.valueToTree(record) as ObjectNode,
changes = emptyMap(),
)
}

override fun findStreamNamespace(key: DebeziumRecordKey, value: DebeziumRecordValue): String? =
stream.id.namespace

override fun findStreamName(key: DebeziumRecordKey, value: DebeziumRecordValue): String? =
stream.id.name

override fun serialize(debeziumState: DebeziumState): OpaqueStateValue =
Jsons.valueToTree(
mapOf(
Expand Down Expand Up @@ -666,7 +680,8 @@ class CdcPartitionReaderMongoTest :

override fun deserialize(
key: DebeziumRecordKey,
value: DebeziumRecordValue
value: DebeziumRecordValue,
stream: Stream,
): DeserializedRecord {
val id: Int = key.element("id").asInt()
val record: Record =
Expand All @@ -691,7 +706,6 @@ class CdcPartitionReaderMongoTest :
}
}
return DeserializedRecord(
streamID = stream.id,
data = Jsons.valueToTree(record),
changes = emptyMap(),
)
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.9.0-rc.18
dockerImageTag: 3.9.0-rc.19
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Loading

0 comments on commit 8aec55d

Please sign in to comment.