diff --git a/airbyte-integrations/connectors/source-mssql/metadata.yaml b/airbyte-integrations/connectors/source-mssql/metadata.yaml index 4044481c816a..1f42d9ddff87 100644 --- a/airbyte-integrations/connectors/source-mssql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mssql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1 - dockerImageTag: 3.4.0 + dockerImageTag: 3.4.1 dockerRepository: airbyte/source-mssql documentationUrl: https://docs.airbyte.com/integrations/sources/mssql githubIssueLabel: source-mssql diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcHelper.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcHelper.java index c9c38928beb2..c330ea5267bb 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcHelper.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcHelper.java @@ -157,6 +157,7 @@ static Properties getDebeziumProperties(final JdbcDatabase database, final Confi props.setProperty("provide.transaction.metadata", "false"); props.setProperty("converters", "mssql_converter"); + props.setProperty("mssql_converter.type", MssqlDebeziumConverter.class.getName()); // If new stream(s) are added after a previously successful sync, diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlDebeziumConverter.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlDebeziumConverter.java index 0e57af398ec9..69ffcf2c0499 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlDebeziumConverter.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlDebeziumConverter.java @@ -13,7 +13,6 @@ import io.debezium.spi.converter.CustomConverter; import io.debezium.spi.converter.RelationalColumn; import java.math.BigDecimal; -import java.nio.charset.Charset; import java.sql.Timestamp; import java.time.LocalDateTime; import java.time.OffsetDateTime; @@ -22,6 +21,7 @@ import java.util.Properties; import java.util.Set; import microsoft.sql.DateTimeOffset; +import org.apache.commons.codec.binary.Base64; import org.apache.kafka.connect.data.SchemaBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -197,7 +197,7 @@ private void registerBinary(final RelationalColumn field, } if (input instanceof byte[]) { - return new String((byte[]) input, Charset.defaultCharset()); + return Base64.encodeBase64String((byte[]) input); } LOGGER.warn("Uncovered binary class type '{}'. Use default converter", diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSourceOperations.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSourceOperations.java index 7fb984c6d7d4..4c46601f7fcf 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSourceOperations.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSourceOperations.java @@ -18,7 +18,6 @@ import com.microsoft.sqlserver.jdbc.SQLServerResultSetMetaData; import io.airbyte.cdk.db.jdbc.JdbcSourceOperations; import io.airbyte.protocol.models.JsonSchemaType; -import java.nio.charset.Charset; import java.sql.JDBCType; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -29,6 +28,7 @@ import java.time.format.DateTimeFormatter; import java.time.format.DateTimeParseException; import microsoft.sql.DateTimeOffset; +import org.apache.commons.codec.binary.Base64; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -128,7 +128,7 @@ protected void putBinary(final ObjectNode node, final int index) throws SQLException { final byte[] bytes = resultSet.getBytes(index); - final String value = new String(bytes, Charset.defaultCharset()); + final String value = Base64.encodeBase64String(bytes); node.put(columnName, value); } diff --git a/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/AbstractMssqlSourceDatatypeTest.java b/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/AbstractMssqlSourceDatatypeTest.java index f229caaad68f..32c42ebea52c 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/AbstractMssqlSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/AbstractMssqlSourceDatatypeTest.java @@ -257,7 +257,7 @@ protected void initTests() { .sourceType("binary") .airbyteType(JsonSchemaType.STRING_BASE_64) .addInsertValues("CAST( 'A' AS BINARY(1))", "null") - .addExpectedValues("A", null) + .addExpectedValues("QQ==", null) .createTablePatternSql(CREATE_TABLE_SQL) .build()); @@ -267,7 +267,7 @@ protected void initTests() { .fullSourceDataType("varbinary(3)") .airbyteType(JsonSchemaType.STRING_BASE_64) .addInsertValues("CAST( 'ABC' AS VARBINARY)", "null") - .addExpectedValues("ABC", null) + .addExpectedValues("QUJD", null) .createTablePatternSql(CREATE_TABLE_SQL) .build()); diff --git a/docs/integrations/sources/mssql.md b/docs/integrations/sources/mssql.md index 578484b2d0b2..1e1059f3150b 100644 --- a/docs/integrations/sources/mssql.md +++ b/docs/integrations/sources/mssql.md @@ -342,6 +342,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura | Version | Date | Pull Request | Subject | |:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.4.1 | 2024-01-02 | [33755](https://github.com/airbytehq/airbyte/pull/33755) | Encode binary to base64 format | | 3.4.0 | 2023-12-19 | [33481](https://github.com/airbytehq/airbyte/pull/33481) | Remove LEGACY state flag | | 3.3.2 | 2023-12-14 | [33505](https://github.com/airbytehq/airbyte/pull/33225) | Using the released CDK. | | 3.3.1 | 2023-12-12 | [33225](https://github.com/airbytehq/airbyte/pull/33225) | extracting MsSql specific files out of the CDK. |