Skip to content

Commit

Permalink
[DB-sources] : Improve heartbeat logic (#40516)
Browse files Browse the repository at this point in the history
  • Loading branch information
akashkulk authored Jul 1, 2024
1 parent b66d528 commit 3a3e058
Show file tree
Hide file tree
Showing 20 changed files with 79 additions and 74 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.40.7 | 2024-07-01 | [40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz hearbeat. |
| 0.40.5 | 2024-06-26 | [\#40517](https://github.com/airbytehq/airbyte/pull/40517) | JdbcDatabase.executeWithinTransaction allows disabling SQL statement logging |
| 0.35.16 | 2024-06-25 | [\#40517](https://github.com/airbytehq/airbyte/pull/40517) | (backport) JdbcDatabase.executeWithinTransaction allows disabling SQL statement logging |
| 0.40.4 | 2024-06-18 | [\#40254](https://github.com/airbytehq/airbyte/pull/40254) | Destinations: Do not throw on unrecognized airbyte message type (ignore message instead) |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ object DbAnalyticsUtils {
const val CDC_CURSOR_INVALID_KEY: String = "db-sources-cdc-cursor-invalid"
const val DATA_TYPES_SERIALIZATION_ERROR_KEY = "db-sources-data-serialization-error"
const val CDC_SNAPSHOT_FORCE_SHUTDOWN_KEY = "db-sources-snapshot-force-shutdown"
const val DEBEZIUM_CLOSE_REASON_KEY = "db-sources-debezium-close-reason"

@JvmStatic
fun cdcCursorInvalidMessage(): AirbyteAnalyticsTraceMessage {
Expand All @@ -33,4 +34,9 @@ object DbAnalyticsUtils {
.withType(CDC_SNAPSHOT_FORCE_SHUTDOWN_KEY)
.withValue("1")
}

@JvmStatic
fun debeziumCloseReasonMessage(reason: String): AirbyteAnalyticsTraceMessage {
return AirbyteAnalyticsTraceMessage().withType(DEBEZIUM_CLOSE_REASON_KEY).withValue(reason)
}
}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.40.5
version=0.40.7
Original file line number Diff line number Diff line change
Expand Up @@ -80,20 +80,21 @@ class AirbyteDebeziumHandler<T>(
cdcSavedInfoFetcher.savedOffset,
if (addDbNameToOffsetState)
Optional.ofNullable<String>(config[JdbcUtils.DATABASE_KEY].asText())
else Optional.empty<String>()
else Optional.empty<String>(),
)
val schemaHistoryManager: Optional<AirbyteSchemaHistoryStorage> =
if (trackSchemaHistory)
Optional.of<AirbyteSchemaHistoryStorage>(
AirbyteSchemaHistoryStorage.Companion.initializeDBHistory(
cdcSavedInfoFetcher.savedSchemaHistory,
cdcStateHandler.compressSchemaHistoryForState()
)
cdcStateHandler.compressSchemaHistoryForState(),
),
)
else Optional.empty<AirbyteSchemaHistoryStorage>()
val publisher = DebeziumRecordPublisher(debeziumPropertiesManager)
val queue: CapacityReportingBlockingQueue<ChangeEvent<String?, String?>> =
CapacityReportingBlockingQueue(queueSize)

publisher.start(queue, offsetManager, schemaHistoryManager)
// handle state machine around pub/sub logic.
val eventIterator: AutoCloseableIterator<ChangeEventWithMetadata> =
Expand All @@ -102,13 +103,14 @@ class AirbyteDebeziumHandler<T>(
targetPosition,
{ publisher.hasClosed() },
DebeziumShutdownProcedure(queue, { publisher.close() }, { publisher.hasClosed() }),
firstRecordWaitTime
firstRecordWaitTime,
config
)

val syncCheckpointDuration =
if (config.has(DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION_PROPERTY))
Duration.ofSeconds(
config[DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION_PROPERTY].asLong()
config[DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION_PROPERTY].asLong(),
)
else DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION
val syncCheckpointRecords =
Expand All @@ -122,7 +124,7 @@ class AirbyteDebeziumHandler<T>(
targetPosition,
eventConverter,
offsetManager,
schemaHistoryManager
schemaHistoryManager,
)

// Usually sourceStateIterator requires airbyteStream as input. For DBZ iterator, stream is
Expand All @@ -133,7 +135,7 @@ class AirbyteDebeziumHandler<T>(
eventIterator,
null,
messageProducer,
StateEmitFrequency(syncCheckpointRecords, syncCheckpointDuration)
StateEmitFrequency(syncCheckpointRecords, syncCheckpointDuration),
)
return AutoCloseableIterators.fromIterator(iterator)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
*/
package io.airbyte.cdk.integrations.debezium.internals

import com.fasterxml.jackson.databind.JsonNode
import com.google.common.annotations.VisibleForTesting
import com.google.common.collect.AbstractIterator
import io.airbyte.cdk.db.DbAnalyticsUtils.debeziumCloseReasonMessage
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility
import io.airbyte.cdk.integrations.debezium.CdcTargetPosition
import io.airbyte.commons.lang.MoreBooleans
import io.airbyte.commons.util.AutoCloseableIterator
Expand Down Expand Up @@ -36,6 +39,7 @@ class DebeziumRecordIterator<T>(
private val publisherStatusSupplier: Supplier<Boolean>,
private val debeziumShutdownProcedure: DebeziumShutdownProcedure<ChangeEvent<String?, String?>>,
private val firstRecordWaitTime: Duration,
private val config: JsonNode
) : AbstractIterator<ChangeEventWithMetadata>(), AutoCloseableIterator<ChangeEventWithMetadata> {
private val heartbeatEventSourceField: MutableMap<Class<out ChangeEvent<*, *>?>, Field?> =
HashMap(1)
Expand Down Expand Up @@ -82,7 +86,8 @@ class DebeziumRecordIterator<T>(
String.format(
"No records were returned by Debezium in the timeout seconds %s, closing the engine and iterator",
waitTime.seconds
)
),
DebeziumCloseReason.TIMEOUT
)
}
LOGGER.info { "no record found. polling again." }
Expand All @@ -101,12 +106,16 @@ class DebeziumRecordIterator<T>(
// too long
if (targetPosition.reachedTargetPosition(heartbeatPos)) {
requestClose(
"Closing: Heartbeat indicates sync is done by reaching the target position"
"Closing: Heartbeat indicates sync is done by reaching the target position",
DebeziumCloseReason.HEARTBEAT_REACHED_TARGET_POSITION
)
} else if (
heartbeatPos == this.lastHeartbeatPosition && heartbeatPosNotChanging()
) {
requestClose("Closing: Heartbeat indicates sync is not progressing")
requestClose(
"Closing: Heartbeat indicates sync is not progressing",
DebeziumCloseReason.HEARTBEAT_NOT_PROGRESSING
)
}

if (heartbeatPos != lastHeartbeatPosition) {
Expand All @@ -122,7 +131,10 @@ class DebeziumRecordIterator<T>(
// if the last record matches the target file position, it is time to tell the producer
// to shutdown.
if (targetPosition.reachedTargetPosition(changeEventWithMetadata)) {
requestClose("Closing: Change event reached target position")
requestClose(
"Closing: Change event reached target position",
DebeziumCloseReason.CHANGE_EVENT_REACHED_TARGET_POSITION
)
}
this.tsLastHeartbeat = null
this.receivedFirstRecord = true
Expand Down Expand Up @@ -175,7 +187,7 @@ class DebeziumRecordIterator<T>(
*/
@Throws(Exception::class)
override fun close() {
requestClose("Closing: Iterator closing")
requestClose("Closing: Iterator closing", DebeziumCloseReason.ITERATOR_CLOSE)
}

private fun isHeartbeatEvent(event: ChangeEvent<String?, String?>): Boolean {
Expand All @@ -185,23 +197,25 @@ class DebeziumRecordIterator<T>(
}

private fun heartbeatPosNotChanging(): Boolean {
if (this.tsLastHeartbeat == null) {
// Closing debezium due to heartbeat position not changing only exists as an escape hatch
// for
// testing setups. In production, we rely on the platform heartbeats to kill the sync
if (!isTest() || this.tsLastHeartbeat == null) {
return false
}
val timeElapsedSinceLastHeartbeatTs =
Duration.between(this.tsLastHeartbeat, LocalDateTime.now())
LOGGER.info {
"Time since last hb_pos change ${timeElapsedSinceLastHeartbeatTs.toSeconds()}s"
}
// wait time for no change in heartbeat position is half of initial waitTime
return timeElapsedSinceLastHeartbeatTs.compareTo(firstRecordWaitTime.dividedBy(2)) > 0
}

private fun requestClose(closeLogMessage: String) {
private fun requestClose(closeLogMessage: String, closeReason: DebeziumCloseReason) {
if (signalledDebeziumEngineShutdown) {
return
}
LOGGER.info { closeLogMessage }
AirbyteTraceMessageUtility.emitAnalyticsTrace(
debeziumCloseReasonMessage(closeReason.toString())
)
debeziumShutdownProcedure.initiateShutdownProcedure()
signalledDebeziumEngineShutdown = true
}
Expand All @@ -212,6 +226,10 @@ class DebeziumRecordIterator<T>(
}
}

private fun isTest(): Boolean {
return config.has("is_test") && config["is_test"].asBoolean()
}

/**
* [DebeziumRecordIterator.heartbeatEventSourceField] acts as a cache so that we avoid using
* reflection to setAccessible for each event
Expand Down Expand Up @@ -246,5 +264,13 @@ class DebeziumRecordIterator<T>(
}
}

enum class DebeziumCloseReason() {
TIMEOUT,
ITERATOR_CLOSE,
HEARTBEAT_REACHED_TARGET_POSITION,
CHANGE_EVENT_REACHED_TARGET_POSITION,
HEARTBEAT_NOT_PROGRESSING
}

companion object {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
*/
package io.airbyte.cdk.integrations.debezium.internals

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import io.airbyte.cdk.integrations.debezium.CdcTargetPosition
import io.debezium.engine.ChangeEvent
import java.time.Duration
Expand Down Expand Up @@ -34,6 +36,7 @@ class DebeziumRecordIteratorTest {
{ false },
mock(),
Duration.ZERO,
getTestConfig(), // Heartbeats should not be ignored for tests.
)
val lsn =
debeziumRecordIterator.getHeartbeatPosition(
Expand All @@ -44,7 +47,7 @@ class DebeziumRecordIteratorTest {
Collections.singletonMap("lsn", 358824993496L),
null,
null,
null
null,
)

override fun key(): String? {
Expand All @@ -62,9 +65,15 @@ class DebeziumRecordIteratorTest {
fun sourceRecord(): SourceRecord {
return sourceRecord
}
}
},
)

Assertions.assertEquals(lsn, 358824993496L)
}

fun getTestConfig(): JsonNode {
val mapper: ObjectMapper = ObjectMapper()
val testConfig = "{\"is_test\": true}"
return mapper.readTree(testConfig)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.38.1'
cdkVersionRequired = '0.40.7'
features = ['db-sources', 'datastore-mongo']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerImageTag: 1.4.1
dockerImageTag: 1.4.2
dockerRepository: airbyte/source-mongodb-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
githubIssueLabel: source-mongodb-v2
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.40.1'
cdkVersionRequired = '0.40.7'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 4.0.33
dockerImageTag: 4.0.34
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@
import io.airbyte.cdk.integrations.base.Source;
import io.airbyte.cdk.integrations.base.adaptive.AdaptiveSourceRunner;
import io.airbyte.cdk.integrations.base.ssh.SshWrappedSource;
import io.airbyte.cdk.integrations.debezium.AirbyteDebeziumHandler;
import io.airbyte.cdk.integrations.debezium.CdcStateHandler;
import io.airbyte.cdk.integrations.debezium.CdcTargetPosition;
import io.airbyte.cdk.integrations.debezium.internals.*;
import io.airbyte.cdk.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.cdk.integrations.source.relationaldb.InitialLoadHandler;
Expand All @@ -49,7 +46,6 @@
import io.airbyte.commons.stream.AirbyteStreamStatusHolder;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.integrations.source.mssql.cursor_based.MssqlCursorBasedStateManager;
import io.airbyte.integrations.source.mssql.initialsync.MssqlInitialLoadHandler;
import io.airbyte.integrations.source.mssql.initialsync.MssqlInitialLoadStateManager;
Expand All @@ -60,8 +56,6 @@
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.v0.*;
import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType;
import io.debezium.connector.sqlserver.Lsn;
import io.debezium.engine.ChangeEvent;
import java.io.IOException;
import java.net.URI;
import java.security.KeyStoreException;
Expand All @@ -71,7 +65,6 @@
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -456,42 +449,6 @@ protected void assertSqlServerAgentRunning(final JdbcDatabase database) throws S
return super.getIncrementalIterators(database, catalog, tableNameToTable, stateManager, emittedAt);
}

public AutoCloseableIterator<AirbyteMessage> getDebeziumSnapshotIterators(
final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final CdcTargetPosition<Lsn> targetPosition,
final Duration firstRecordWaitTime,
final Duration subsequentRecordWaitTime,
final MssqlCdcConnectorMetadataInjector cdcMetadataInjector,
final Properties properties,
final CdcStateHandler cdcStateHandler,
final Instant emittedAt) {

LOGGER.info("Running snapshot for " + catalog.getStreams().size() + " new tables");
final var queue = new LinkedBlockingQueue<ChangeEvent<String, String>>(AirbyteDebeziumHandler.QUEUE_CAPACITY);

final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeDummyStateForSnapshotPurpose();
final var emptyHistory = new AirbyteSchemaHistoryStorage.SchemaHistory<Optional<JsonNode>>(Optional.empty(), false);
final var schemaHistoryManager = AirbyteSchemaHistoryStorage.initializeDBHistory(emptyHistory, cdcStateHandler.compressSchemaHistoryForState());
final var propertiesManager = new RelationalDbDebeziumPropertiesManager(properties, config, catalog, Collections.emptyList());
final DebeziumRecordPublisher tableSnapshotPublisher = new DebeziumRecordPublisher(propertiesManager);
tableSnapshotPublisher.start(queue, offsetManager, Optional.of(schemaHistoryManager));

final AutoCloseableIterator<ChangeEventWithMetadata> eventIterator = new DebeziumRecordIterator<>(
queue,
targetPosition,
tableSnapshotPublisher::hasClosed,
new DebeziumShutdownProcedure<>(queue, tableSnapshotPublisher::close, tableSnapshotPublisher::hasClosed),
firstRecordWaitTime);

final var eventConverter = new RelationalDbDebeziumEventConverter(cdcMetadataInjector, emittedAt);
return AutoCloseableIterators.concatWithEagerClose(
AutoCloseableIterators.transform(eventIterator, eventConverter::toAirbyteMessage),
AutoCloseableIterators.fromIterator(
MoreIterators.singletonIteratorFromSupplier(
cdcStateHandler::saveStateAfterCompletionOfSnapshotOfNewStreams)));
}

@Override
protected int getStateEmissionFrequency() {
return INTERMEDIATE_STATE_EMISSION_FREQUENCY;
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.40.1'
cdkVersionRequired = '0.40.7'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.4.11
dockerImageTag: 3.4.12
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.40.1'
cdkVersionRequired = '0.40.7'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
}
Expand Down
Loading

0 comments on commit 3a3e058

Please sign in to comment.