diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index b7cd3a8853d4..386fe1df3b1b 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -174,6 +174,7 @@ corresponds to that version. | Version | Date | Pull Request | Subject | |:-----------|:-----------|:-------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.44.18 | 2024-08-22 | [\#44505](https://github.com/airbytehq/airbyte/pull/44505) | Improve handling of incoming debezium change events | | 0.44.17 | 2024-08-27 | [\#44832](https://github.com/airbytehq/airbyte/pull/44832) | Fix issues where some error messages with upper cases do not get matched by the error translation framework. | | 0.44.16 | 2024-08-22 | [\#44505](https://github.com/airbytehq/airbyte/pull/44505) | Destinations: add sqlgenerator testing for mixed-case stream name | | 0.44.15 | ?????????? | [\#?????](https://github.com/airbytehq/airbyte/pull/?????) | ????? | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index a34469e9e2d8..400a5cdc6b37 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.44.17 +version=0.44.18 diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/ChangeEventWithMetadata.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/ChangeEventWithMetadata.kt index 8e0a8985e2ff..2be17de6728e 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/ChangeEventWithMetadata.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/ChangeEventWithMetadata.kt @@ -6,29 +6,30 @@ package io.airbyte.cdk.integrations.debezium.internals import com.fasterxml.jackson.databind.JsonNode import io.airbyte.commons.json.Jsons import io.debezium.engine.ChangeEvent +import io.github.oshai.kotlinlogging.KotlinLogging -class ChangeEventWithMetadata(private val event: ChangeEvent) { - private val eventKeyAsJson: JsonNode = Jsons.deserialize(event.key()) - private val eventValueAsJson: JsonNode = Jsons.deserialize(event.value()) - private val snapshotMetadata: SnapshotMetadata? = - SnapshotMetadata.Companion.fromString(eventValueAsJson["source"]["snapshot"].asText()) - - fun event(): ChangeEvent { - return event - } +private val LOGGER = KotlinLogging.logger {} - fun eventKeyAsJson(): JsonNode { - return eventKeyAsJson - } +class ChangeEventWithMetadata(private val event: ChangeEvent) { + val eventKeyAsJson: JsonNode? + get() = + event + .key() + ?.let { Jsons.deserialize(it) } + .also { it ?: LOGGER.warn { "Event key is null $event" } } + val eventValueAsJson: JsonNode? + get() = + event + .value() + ?.let { Jsons.deserialize(it) } + .also { it ?: LOGGER.warn { "Event value is null $event" } } - fun eventValueAsJson(): JsonNode { - return eventValueAsJson - } + val snapshotMetadata: SnapshotMetadata? + get() { + val metadataKey = eventValueAsJson?.get("source")?.get("snapshot")?.asText() + return metadataKey?.let { SnapshotMetadata.fromString(metadataKey) } + } val isSnapshotEvent: Boolean - get() = SnapshotMetadata.Companion.isSnapshotEventMetadata(snapshotMetadata) - - fun snapshotMetadata(): SnapshotMetadata? { - return snapshotMetadata - } + get() = SnapshotMetadata.isSnapshotEventMetadata(snapshotMetadata) } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIterator.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIterator.kt index c2e4169e269b..e035a8474532 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIterator.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIterator.kt @@ -181,7 +181,7 @@ class DebeziumRecordIterator( hasSnapshotFinished = !changeEventWithMetadata.isSnapshotEvent if (isEventLogged) { - val source: JsonNode? = changeEventWithMetadata.eventValueAsJson()["source"] + val source: JsonNode? = changeEventWithMetadata.eventValueAsJson?.get("source") LOGGER.info { "CDC events queue poll(): " + "returned a change event with \"source\": $source." @@ -340,8 +340,8 @@ class DebeziumRecordIterator( * snapshots) t: truncate, m: message */ fun isEventTypeHandled(event: ChangeEventWithMetadata): Boolean { - event.eventValueAsJson()["op"]?.asText()?.let { - return it in listOf("c", "u", "d", "r", "t") + event.eventValueAsJson?.get("op")?.asText()?.let { + return it in listOf("c", "u", "d", "t") } ?: return false } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/RelationalDbDebeziumEventConverter.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/RelationalDbDebeziumEventConverter.kt index 5d4e5fb94f5b..1599b5140ed7 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/RelationalDbDebeziumEventConverter.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/RelationalDbDebeziumEventConverter.kt @@ -14,11 +14,11 @@ class RelationalDbDebeziumEventConverter( private val emittedAt: Instant ) : DebeziumEventConverter { override fun toAirbyteMessage(event: ChangeEventWithMetadata): AirbyteMessage { - val debeziumEvent = event.eventValueAsJson() - val before: JsonNode? = debeziumEvent.get(DebeziumEventConverter.Companion.BEFORE_EVENT) - val after: JsonNode? = debeziumEvent.get(DebeziumEventConverter.Companion.AFTER_EVENT) + val debeziumEvent = event.eventValueAsJson + val before: JsonNode? = debeziumEvent?.get(DebeziumEventConverter.Companion.BEFORE_EVENT) + val after: JsonNode? = debeziumEvent?.get(DebeziumEventConverter.Companion.AFTER_EVENT) val source: JsonNode = - checkNotNull(debeziumEvent.get(DebeziumEventConverter.Companion.SOURCE_EVENT)) { + checkNotNull(debeziumEvent?.get(DebeziumEventConverter.Companion.SOURCE_EVENT)) { "ChangeEvent contains no source record $debeziumEvent" } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIteratorTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIteratorTest.kt index d99b69c6c123..bb2db2662c49 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIteratorTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIteratorTest.kt @@ -80,7 +80,7 @@ class DebeziumRecordIteratorTest { "c, true", "u, true", "d, true", - "r, true", + "r, false", "t, true", "m, false", "badVal, false", diff --git a/airbyte-integrations/connectors/source-mssql/build.gradle b/airbyte-integrations/connectors/source-mssql/build.gradle index e5f328120575..cdd9713e8457 100644 --- a/airbyte-integrations/connectors/source-mssql/build.gradle +++ b/airbyte-integrations/connectors/source-mssql/build.gradle @@ -3,7 +3,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.44.17' + cdkVersionRequired = '0.44.18' features = ['db-sources'] useLocalCdk = false } @@ -25,7 +25,7 @@ application { dependencies { implementation 'com.microsoft.sqlserver:mssql-jdbc:12.6.1.jre11' - implementation 'io.debezium:debezium-embedded:2.6.1.Final' + implementation 'io.debezium:debezium-embedded:2.7.1.Final' implementation 'io.debezium:debezium-connector-sqlserver:2.6.2.Final' implementation 'org.codehaus.plexus:plexus-utils:3.4.2' diff --git a/airbyte-integrations/connectors/source-mssql/metadata.yaml b/airbyte-integrations/connectors/source-mssql/metadata.yaml index e9b2a7fdabc9..102daf2d4e52 100644 --- a/airbyte-integrations/connectors/source-mssql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mssql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1 - dockerImageTag: 4.1.9 + dockerImageTag: 4.1.10 dockerRepository: airbyte/source-mssql documentationUrl: https://docs.airbyte.com/integrations/sources/mssql githubIssueLabel: source-mssql diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcHelper.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcHelper.java index 074ae32a66ee..4401b0814d20 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcHelper.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcHelper.java @@ -83,7 +83,7 @@ public static Properties getDebeziumProperties(final JdbcDatabase database, fina } else { // If not in snapshot mode, initial will make sure that a snapshot is taken if the transaction log // is rotated out. This will also end up read streaming changes from the transaction_log. - props.setProperty("snapshot.mode", "initial"); + props.setProperty("snapshot.mode", "when_needed"); } props.setProperty("snapshot.isolation.mode", "read_committed"); diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java index fda25c5bad37..5b3de1915d4a 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java @@ -35,11 +35,11 @@ public MssqlCdcTargetPosition(final Lsn targetLsn) { public boolean reachedTargetPosition(final ChangeEventWithMetadata changeEventWithMetadata) { if (changeEventWithMetadata.isSnapshotEvent()) { return false; - } else if (SnapshotMetadata.LAST == changeEventWithMetadata.snapshotMetadata()) { + } else if (SnapshotMetadata.LAST == changeEventWithMetadata.getSnapshotMetadata()) { LOGGER.info("Signalling close because Snapshot is complete"); return true; } else { - final Lsn recordLsn = extractLsn(changeEventWithMetadata.eventValueAsJson()); + final Lsn recordLsn = extractLsn(changeEventWithMetadata.getEventValueAsJson()); final boolean isEventLSNAfter = targetLsn.compareTo(recordLsn) <= 0; if (isEventLSNAfter) { LOGGER.info("Signalling close because record's LSN : " + recordLsn + " is after target LSN : " + targetLsn); @@ -122,7 +122,7 @@ public boolean isEventAheadOffset(Map offset, ChangeEventWithMet if (offset == null || offset.size() != 1) { return false; } - final Lsn eventLsn = extractLsn(event.eventValueAsJson()); + final Lsn eventLsn = extractLsn(event.getEventValueAsJson()); final Lsn offsetLsn = offsetToLsn(offset); return eventLsn.compareTo(offsetLsn) > 0; } diff --git a/docs/integrations/sources/mssql.md b/docs/integrations/sources/mssql.md index 617fedca7a03..07297888099d 100644 --- a/docs/integrations/sources/mssql.md +++ b/docs/integrations/sources/mssql.md @@ -422,19 +422,20 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura | Version | Date | Pull Request | Subject | |:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| -| 4.1.9 | 2024-08-27 | [44841](https://github.com/airbytehq/airbyte/pull/44841) | Adopt latest CDK. | -| 4.1.8 | 2024-08-08 | [43410](https://github.com/airbytehq/airbyte/pull/43410) | Adopt latest CDK. | -| 4.1.7 | 2024-08-06 | [42869](https://github.com/airbytehq/airbyte/pull/42869) | Adopt latest CDK. | -| 4.1.6 | 2024-07-30 | [42550](https://github.com/airbytehq/airbyte/pull/42550) | Correctly report stream states. | -| 4.1.5 | 2024-07-29 | [42852](https://github.com/airbytehq/airbyte/pull/42852) | Bump CDK version to latest to use new bug fixes on error translation. | -| 4.1.4 | 2024-07-23 | [42421](https://github.com/airbytehq/airbyte/pull/42421) | Remove final transient error emitter iterators. | -| 4.1.3 | |2024-07-22 | [42411](https://github.com/airbytehq/airbyte/pull/42411) | Hide the "initial load timeout in hours" field by default in UI | -| 4.1.2 | 2024-07-22 | [42024](https://github.com/airbytehq/airbyte/pull/42024) | Fix a NPE bug on resuming from a failed attempt. | -| 4.1.1 | 2024-07-19 | [42122](https://github.com/airbytehq/airbyte/pull/42122) | Improve wass error message + logging. | -| 4.1.0 | 2024-07-17 | [42078](https://github.com/airbytehq/airbyte/pull/42078) | WASS analytics + bug fixes. | -| 4.0.36 | 2024-07-17 | [41648](https://github.com/airbytehq/airbyte/pull/41648) | Implement WASS. | -| 4.0.35 | 2024-07-05 | [40570](https://github.com/airbytehq/airbyte/pull/40570) | Bump debezium-connector-sqlserver from 2.6.1.Final to 2.6.2.Final. | -| 4.0.34 | 2024-07-01 | [40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz hearbeat. | +| 4.1.10 | 2024-08-27 | [44759](https://github.com/airbytehq/airbyte/pull/44759) | Improve null safety in parsing debezium change events. | +| 4.1.9 | 2024-08-27 | [44841](https://github.com/airbytehq/airbyte/pull/44841) | Adopt latest CDK. | +| 4.1.8 | 2024-08-08 | [43410](https://github.com/airbytehq/airbyte/pull/43410) | Adopt latest CDK. | +| 4.1.7 | 2024-08-06 | [42869](https://github.com/airbytehq/airbyte/pull/42869) | Adopt latest CDK. | +| 4.1.6 | 2024-07-30 | [42550](https://github.com/airbytehq/airbyte/pull/42550) | Correctly report stream states. | +| 4.1.5 | 2024-07-29 | [42852](https://github.com/airbytehq/airbyte/pull/42852) | Bump CDK version to latest to use new bug fixes on error translation. | +| 4.1.4 | 2024-07-23 | [42421](https://github.com/airbytehq/airbyte/pull/42421) | Remove final transient error emitter iterators. | +| 4.1.3 | | 2024-07-22 | [42411](https://github.com/airbytehq/airbyte/pull/42411) | Hide the "initial load timeout in hours" field by default in UI | +| 4.1.2 | 2024-07-22 | [42024](https://github.com/airbytehq/airbyte/pull/42024) | Fix a NPE bug on resuming from a failed attempt. | +| 4.1.1 | 2024-07-19 | [42122](https://github.com/airbytehq/airbyte/pull/42122) | Improve wass error message + logging. | +| 4.1.0 | 2024-07-17 | [42078](https://github.com/airbytehq/airbyte/pull/42078) | WASS analytics + bug fixes. | +| 4.0.36 | 2024-07-17 | [41648](https://github.com/airbytehq/airbyte/pull/41648) | Implement WASS. | +| 4.0.35 | 2024-07-05 | [40570](https://github.com/airbytehq/airbyte/pull/40570) | Bump debezium-connector-sqlserver from 2.6.1.Final to 2.6.2.Final. | +| 4.0.34 | 2024-07-01 | [40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz hearbeat. | | 4.0.33 | 2024-06-30 | [40638](https://github.com/airbytehq/airbyte/pull/40638) | Fix a bug that could slow down an initial load of a large table using a different clustered index from the primary key. | | 4.0.32 | 2024-06-26 | [40558](https://github.com/airbytehq/airbyte/pull/40558) | Handle DatetimeOffset correctly. | | 4.0.31 | 2024-06-14 | [39419](https://github.com/airbytehq/airbyte/pull/39419) | Handle DatetimeOffset correctly. | diff --git a/docs/integrations/sources/mssql/mssql-troubleshooting.md b/docs/integrations/sources/mssql/mssql-troubleshooting.md new file mode 100644 index 000000000000..46ad18571d0f --- /dev/null +++ b/docs/integrations/sources/mssql/mssql-troubleshooting.md @@ -0,0 +1,30 @@ +# Troubleshooting Microsoft SQL Server (MSSQL) Sources + +## Connector Limitations + +### Adding columns to existing tables with CDC + +When working with source SQL Server (MSSQL) in CDC mode, Making alteration to a table such as `ALTER TABLE ADD ` will not automatically be reflected in the CDC stream. +The easiest way of making CDC match the new structure of a table. You can disable and re-enable CDC on the table. This will create a new capture instance for the table with the new structure: +1. Disabling CDC on the table: +```sql +EXEC sys.sp_cdc_disable_table + @source_schema = N'', + @source_name = N'
', + @capture_instance = N'' +``` +2. Enabling CDC on the table: +```sql +EXEC sys.sp_cdc_enable_table + @source_schema = N'', + @source_name = N'
', + @role_name = NULL +``` +Note: You may want to set a `@role_name` or any other arguments similarly to how they were set when CDC was enabled in the first place. + +You can validate which columns are being captured by running the following query: +```sql +EXEC sys.sp_cdc_get_captured_columns + @capture_instance = N''; +``` + diff --git a/docusaurus/sidebars.js b/docusaurus/sidebars.js index 6d4236dd4326..3c31364c0ae6 100644 --- a/docusaurus/sidebars.js +++ b/docusaurus/sidebars.js @@ -69,6 +69,7 @@ function getSourceConnectors() { "readme", "postgres", "mongodb-v2", + "mssql", "mysql", ]); } @@ -147,6 +148,22 @@ const sourceMysql = { ], }; +const sourceMssql = { + type: "category", + label: "MS SQL Server (MSSQL)", + link: { + type: "doc", + id: "integrations/sources/mssql", + }, + items: [ + { + type: "doc", + label: "Troubleshooting", + id: "integrations/sources/mssql/mssql-troubleshooting", + }, + ], +}; + const destinationS3 = { type: "category", label: "S3", @@ -156,15 +173,15 @@ const destinationS3 = { }, items: [ { - type: "doc", - label: "Migration Guide", - id: "integrations/destinations/s3-migrations", + type: "doc", + label: "Migration Guide", + id: "integrations/destinations/s3-migrations", }, { type: "doc", label: "Troubleshooting", id: "integrations/destinations/s3/s3-troubleshooting", - } + }, ], }; @@ -350,6 +367,7 @@ const connectorCatalog = { sourcePostgres, sourceMongoDB, sourceMysql, + sourceMssql, ...getSourceConnectors(), ].sort((itemA, itemB) => itemA.label.localeCompare(itemB.label)), }, @@ -573,9 +591,7 @@ module.exports = { type: "doc", id: "operator-guides/upgrading-airbyte", }, - items: [ - "managing-airbyte/connector-updates" - ], + items: ["managing-airbyte/connector-updates"], }, { type: "category",