Skip to content

Commit

Permalink
MSSQL and debezium improvements (#44759)
Browse files Browse the repository at this point in the history
Co-authored-by: evantahler <[email protected]>
  • Loading branch information
rodireich and evantahler authored Aug 28, 2024
1 parent 7debf35 commit 18a8e5f
Show file tree
Hide file tree
Showing 13 changed files with 105 additions and 56 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:-----------|:-----------|:-------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.44.18 | 2024-08-27 | [\#44759](https://github.com/airbytehq/airbyte/pull/44759) | Improve handling of incoming debezium change events |
| 0.44.17 | 2024-08-27 | [\#44832](https://github.com/airbytehq/airbyte/pull/44832) | Fix issues where some error messages with upper cases do not get matched by the error translation framework. |
| 0.44.16 | 2024-08-22 | [\#44505](https://github.com/airbytehq/airbyte/pull/44505) | Destinations: add sqlgenerator testing for mixed-case stream name |
| 0.44.15 | ?????????? | [\#?????](https://github.com/airbytehq/airbyte/pull/?????) | ????? |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.44.17
version=0.44.18
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,30 @@ package io.airbyte.cdk.integrations.debezium.internals
import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.commons.json.Jsons
import io.debezium.engine.ChangeEvent
import io.github.oshai.kotlinlogging.KotlinLogging

class ChangeEventWithMetadata(private val event: ChangeEvent<String?, String?>) {
private val eventKeyAsJson: JsonNode = Jsons.deserialize(event.key())
private val eventValueAsJson: JsonNode = Jsons.deserialize(event.value())
private val snapshotMetadata: SnapshotMetadata? =
SnapshotMetadata.Companion.fromString(eventValueAsJson["source"]["snapshot"].asText())

fun event(): ChangeEvent<String?, String?> {
return event
}
private val LOGGER = KotlinLogging.logger {}

fun eventKeyAsJson(): JsonNode {
return eventKeyAsJson
}
class ChangeEventWithMetadata(private val event: ChangeEvent<String?, String?>) {
val eventKeyAsJson: JsonNode?
get() =
event
.key()
?.let { Jsons.deserialize(it) }
.also { it ?: LOGGER.warn { "Event key is null $event" } }
val eventValueAsJson: JsonNode?
get() =
event
.value()
?.let { Jsons.deserialize(it) }
.also { it ?: LOGGER.warn { "Event value is null $event" } }

fun eventValueAsJson(): JsonNode {
return eventValueAsJson
}
val snapshotMetadata: SnapshotMetadata?
get() {
val metadataKey = eventValueAsJson?.get("source")?.get("snapshot")?.asText()
return metadataKey?.let { SnapshotMetadata.fromString(metadataKey) }
}

val isSnapshotEvent: Boolean
get() = SnapshotMetadata.Companion.isSnapshotEventMetadata(snapshotMetadata)

fun snapshotMetadata(): SnapshotMetadata? {
return snapshotMetadata
}
get() = SnapshotMetadata.isSnapshotEventMetadata(snapshotMetadata)
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class DebeziumRecordIterator<T>(
hasSnapshotFinished = !changeEventWithMetadata.isSnapshotEvent

if (isEventLogged) {
val source: JsonNode? = changeEventWithMetadata.eventValueAsJson()["source"]
val source: JsonNode? = changeEventWithMetadata.eventValueAsJson?.get("source")
LOGGER.info {
"CDC events queue poll(): " +
"returned a change event with \"source\": $source."
Expand Down Expand Up @@ -340,8 +340,8 @@ class DebeziumRecordIterator<T>(
* snapshots) t: truncate, m: message
*/
fun isEventTypeHandled(event: ChangeEventWithMetadata): Boolean {
event.eventValueAsJson()["op"]?.asText()?.let {
return it in listOf("c", "u", "d", "r", "t")
event.eventValueAsJson?.get("op")?.asText()?.let {
return it in listOf("c", "u", "d", "t")
}
?: return false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ class RelationalDbDebeziumEventConverter(
private val emittedAt: Instant
) : DebeziumEventConverter {
override fun toAirbyteMessage(event: ChangeEventWithMetadata): AirbyteMessage {
val debeziumEvent = event.eventValueAsJson()
val before: JsonNode? = debeziumEvent.get(DebeziumEventConverter.Companion.BEFORE_EVENT)
val after: JsonNode? = debeziumEvent.get(DebeziumEventConverter.Companion.AFTER_EVENT)
val debeziumEvent = event.eventValueAsJson
val before: JsonNode? = debeziumEvent?.get(DebeziumEventConverter.Companion.BEFORE_EVENT)
val after: JsonNode? = debeziumEvent?.get(DebeziumEventConverter.Companion.AFTER_EVENT)
val source: JsonNode =
checkNotNull(debeziumEvent.get(DebeziumEventConverter.Companion.SOURCE_EVENT)) {
checkNotNull(debeziumEvent?.get(DebeziumEventConverter.Companion.SOURCE_EVENT)) {
"ChangeEvent contains no source record $debeziumEvent"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class DebeziumRecordIteratorTest {
"c, true",
"u, true",
"d, true",
"r, true",
"r, false",
"t, true",
"m, false",
"badVal, false",
Expand Down
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-mssql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.44.17'
cdkVersionRequired = '0.44.18'
features = ['db-sources']
useLocalCdk = false
}
Expand All @@ -25,7 +25,7 @@ application {

dependencies {
implementation 'com.microsoft.sqlserver:mssql-jdbc:12.6.1.jre11'
implementation 'io.debezium:debezium-embedded:2.6.1.Final'
implementation 'io.debezium:debezium-embedded:2.7.1.Final'
implementation 'io.debezium:debezium-connector-sqlserver:2.6.2.Final'
implementation 'org.codehaus.plexus:plexus-utils:3.4.2'

Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 4.1.9
dockerImageTag: 4.1.10
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public static Properties getDebeziumProperties(final JdbcDatabase database, fina
} else {
// If not in snapshot mode, when_needed makes sure that a snapshot is taken if the transaction log
// has been rotated out. The connector then goes on to stream changes from the transaction log.
props.setProperty("snapshot.mode", "initial");
props.setProperty("snapshot.mode", "when_needed");
}

props.setProperty("snapshot.isolation.mode", "read_committed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ public MssqlCdcTargetPosition(final Lsn targetLsn) {
public boolean reachedTargetPosition(final ChangeEventWithMetadata changeEventWithMetadata) {
if (changeEventWithMetadata.isSnapshotEvent()) {
return false;
} else if (SnapshotMetadata.LAST == changeEventWithMetadata.snapshotMetadata()) {
} else if (SnapshotMetadata.LAST == changeEventWithMetadata.getSnapshotMetadata()) {
LOGGER.info("Signalling close because Snapshot is complete");
return true;
} else {
final Lsn recordLsn = extractLsn(changeEventWithMetadata.eventValueAsJson());
final Lsn recordLsn = extractLsn(changeEventWithMetadata.getEventValueAsJson());
final boolean isEventLSNAfter = targetLsn.compareTo(recordLsn) <= 0;
if (isEventLSNAfter) {
LOGGER.info("Signalling close because record's LSN : " + recordLsn + " is after target LSN : " + targetLsn);
Expand Down Expand Up @@ -122,7 +122,7 @@ public boolean isEventAheadOffset(Map<String, String> offset, ChangeEventWithMet
if (offset == null || offset.size() != 1) {
return false;
}
final Lsn eventLsn = extractLsn(event.eventValueAsJson());
final Lsn eventLsn = extractLsn(event.getEventValueAsJson());
final Lsn offsetLsn = offsetToLsn(offset);
return eventLsn.compareTo(offsetLsn) > 0;
}
Expand Down
27 changes: 14 additions & 13 deletions docs/integrations/sources/mssql.md
Original file line number Diff line number Diff line change
Expand Up @@ -422,19 +422,20 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 4.1.9 | 2024-08-27 | [44841](https://github.com/airbytehq/airbyte/pull/44841) | Adopt latest CDK. |
| 4.1.8 | 2024-08-08 | [43410](https://github.com/airbytehq/airbyte/pull/43410) | Adopt latest CDK. |
| 4.1.7 | 2024-08-06 | [42869](https://github.com/airbytehq/airbyte/pull/42869) | Adopt latest CDK. |
| 4.1.6 | 2024-07-30 | [42550](https://github.com/airbytehq/airbyte/pull/42550) | Correctly report stream states. |
| 4.1.5 | 2024-07-29 | [42852](https://github.com/airbytehq/airbyte/pull/42852) | Bump CDK version to latest to use new bug fixes on error translation. |
| 4.1.4 | 2024-07-23 | [42421](https://github.com/airbytehq/airbyte/pull/42421) | Remove final transient error emitter iterators. |
| 4.1.3 | |2024-07-22 | [42411](https://github.com/airbytehq/airbyte/pull/42411) | Hide the "initial load timeout in hours" field by default in UI |
| 4.1.2 | 2024-07-22 | [42024](https://github.com/airbytehq/airbyte/pull/42024) | Fix a NPE bug on resuming from a failed attempt. |
| 4.1.1 | 2024-07-19 | [42122](https://github.com/airbytehq/airbyte/pull/42122) | Improve wass error message + logging. |
| 4.1.0 | 2024-07-17 | [42078](https://github.com/airbytehq/airbyte/pull/42078) | WASS analytics + bug fixes. |
| 4.0.36 | 2024-07-17 | [41648](https://github.com/airbytehq/airbyte/pull/41648) | Implement WASS. |
| 4.0.35 | 2024-07-05 | [40570](https://github.com/airbytehq/airbyte/pull/40570) | Bump debezium-connector-sqlserver from 2.6.1.Final to 2.6.2.Final. |
| 4.0.34 | 2024-07-01 | [40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz hearbeat. |
| 4.1.10 | 2024-08-27 | [44759](https://github.com/airbytehq/airbyte/pull/44759) | Improve null safety in parsing debezium change events. |
| 4.1.9 | 2024-08-27 | [44841](https://github.com/airbytehq/airbyte/pull/44841) | Adopt latest CDK. |
| 4.1.8 | 2024-08-08 | [43410](https://github.com/airbytehq/airbyte/pull/43410) | Adopt latest CDK. |
| 4.1.7 | 2024-08-06 | [42869](https://github.com/airbytehq/airbyte/pull/42869) | Adopt latest CDK. |
| 4.1.6 | 2024-07-30 | [42550](https://github.com/airbytehq/airbyte/pull/42550) | Correctly report stream states. |
| 4.1.5 | 2024-07-29 | [42852](https://github.com/airbytehq/airbyte/pull/42852) | Bump CDK version to latest to use new bug fixes on error translation. |
| 4.1.4 | 2024-07-23 | [42421](https://github.com/airbytehq/airbyte/pull/42421) | Remove final transient error emitter iterators. |
| 4.1.3 | 2024-07-22 | [42411](https://github.com/airbytehq/airbyte/pull/42411) | Hide the "initial load timeout in hours" field by default in UI |
| 4.1.2 | 2024-07-22 | [42024](https://github.com/airbytehq/airbyte/pull/42024) | Fix a NPE bug on resuming from a failed attempt. |
| 4.1.1 | 2024-07-19 | [42122](https://github.com/airbytehq/airbyte/pull/42122) | Improve wass error message + logging. |
| 4.1.0 | 2024-07-17 | [42078](https://github.com/airbytehq/airbyte/pull/42078) | WASS analytics + bug fixes. |
| 4.0.36 | 2024-07-17 | [41648](https://github.com/airbytehq/airbyte/pull/41648) | Implement WASS. |
| 4.0.35 | 2024-07-05 | [40570](https://github.com/airbytehq/airbyte/pull/40570) | Bump debezium-connector-sqlserver from 2.6.1.Final to 2.6.2.Final. |
| 4.0.34 | 2024-07-01 | [40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz heartbeat. |
| 4.0.33 | 2024-06-30 | [40638](https://github.com/airbytehq/airbyte/pull/40638) | Fix a bug that could slow down an initial load of a large table using a different clustered index from the primary key. |
| 4.0.32 | 2024-06-26 | [40558](https://github.com/airbytehq/airbyte/pull/40558) | Handle DatetimeOffset correctly. |
| 4.0.31 | 2024-06-14 | [39419](https://github.com/airbytehq/airbyte/pull/39419) | Handle DatetimeOffset correctly. |
Expand Down
30 changes: 30 additions & 0 deletions docs/integrations/sources/mssql/mssql-troubleshooting.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Troubleshooting Microsoft SQL Server (MSSQL) Sources

## Connector Limitations

### Adding columns to existing tables with CDC

When working with a SQL Server (MSSQL) source in CDC mode, alterations to a table such as `ALTER TABLE <table> ADD <column>` are not automatically reflected in the CDC stream.
The easiest way to make CDC match the new structure of a table is to disable and re-enable CDC on that table. This creates a new capture instance for the table with the new structure:
1. Disabling CDC on the table:
```sql
EXEC sys.sp_cdc_disable_table
@source_schema = N'<schema>',
@source_name = N'<table>',
@capture_instance = N'<capture instance (typically schema_table)>'
```
2. Enabling CDC on the table:
```sql
EXEC sys.sp_cdc_enable_table
@source_schema = N'<schema>',
@source_name = N'<table>',
@role_name = NULL
```
Note: You may want to set a `@role_name` or any other arguments similarly to how they were set when CDC was enabled in the first place.

You can validate which columns are being captured by running the following query:
```sql
EXEC sys.sp_cdc_get_captured_columns
@capture_instance = N'<capture instance (typically schema_table)>';
```

30 changes: 23 additions & 7 deletions docusaurus/sidebars.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ function getSourceConnectors() {
"readme",
"postgres",
"mongodb-v2",
"mssql",
"mysql",
]);
}
Expand Down Expand Up @@ -147,6 +148,22 @@ const sourceMysql = {
],
};

// Sidebar category for the MS SQL Server (MSSQL) source connector.
// The category label links to the main connector page, and the
// troubleshooting guide is nested underneath it as a child doc.
const sourceMssql = {
type: "category",
label: "MS SQL Server (MSSQL)",
// Doc opened when the category label itself is clicked.
link: {
type: "doc",
id: "integrations/sources/mssql",
},
// Child pages listed under the category.
items: [
{
type: "doc",
label: "Troubleshooting",
id: "integrations/sources/mssql/mssql-troubleshooting",
},
],
};

const destinationS3 = {
type: "category",
label: "S3",
Expand All @@ -156,15 +173,15 @@ const destinationS3 = {
},
items: [
{
type: "doc",
label: "Migration Guide",
id: "integrations/destinations/s3-migrations",
type: "doc",
label: "Migration Guide",
id: "integrations/destinations/s3-migrations",
},
{
type: "doc",
label: "Troubleshooting",
id: "integrations/destinations/s3/s3-troubleshooting",
}
},
],
};

Expand Down Expand Up @@ -350,6 +367,7 @@ const connectorCatalog = {
sourcePostgres,
sourceMongoDB,
sourceMysql,
sourceMssql,
...getSourceConnectors(),
].sort((itemA, itemB) => itemA.label.localeCompare(itemB.label)),
},
Expand Down Expand Up @@ -573,9 +591,7 @@ module.exports = {
type: "doc",
id: "operator-guides/upgrading-airbyte",
},
items: [
"managing-airbyte/connector-updates"
],
items: ["managing-airbyte/connector-updates"],
},
{
type: "category",
Expand Down

0 comments on commit 18a8e5f

Please sign in to comment.