Skip to content

Commit

Permalink
Merge branch 'master' into akash/lsn-investigate
Browse files Browse the repository at this point in the history
  • Loading branch information
akashkulk authored Dec 11, 2023
2 parents 8323a32 + 19554f9 commit eece59f
Show file tree
Hide file tree
Showing 18 changed files with 199 additions and 166 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.50.35
current_version = 0.50.36
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-[a-z]+)?
Expand Down
7 changes: 6 additions & 1 deletion airbyte-cdk/java/airbyte-cdk/core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,12 @@ dependencies {

// testImplementation libs.junit.jupiter.api
implementation libs.hikaricp
implementation libs.bundles.debezium.bundle
implementation libs.debezium.api
implementation libs.debezium.embedded
implementation libs.debezium.sqlserver
implementation libs.debezium.mysql
implementation libs.debezium.postgres
implementation libs.debezium.mongodb

api libs.bundles.datadog
implementation 'org.apache.sshd:sshd-mina:2.8.0'
Expand Down
7 changes: 6 additions & 1 deletion airbyte-cdk/java/airbyte-cdk/db-destinations/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,12 @@ dependencies {

// testImplementation libs.junit.jupiter.api
implementation libs.hikaricp
implementation libs.bundles.debezium.bundle
implementation libs.debezium.api
implementation libs.debezium.embedded
implementation libs.debezium.sqlserver
implementation libs.debezium.mysql
implementation libs.debezium.postgres
implementation libs.debezium.mongodb

implementation libs.bundles.datadog
// implementation 'com.datadoghq:dd-trace-api'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,10 @@ protected DataType<?> toDialectType(final AirbyteType type) {

protected DataType<?> toDialectType(final AirbyteProtocolType airbyteProtocolType) {
return switch (airbyteProtocolType) {
case STRING -> SQLDataType.VARCHAR;
case NUMBER -> SQLDataType.FLOAT;
// Many destinations default to a very short length (e.g. Redshift defaults to 256)
case STRING -> SQLDataType.VARCHAR(65535);
// We default to precision=38, scale=9 across destinations (matches the DECIMAL(38, 9) below)
case NUMBER -> SQLDataType.DECIMAL(38, 9);
case INTEGER -> SQLDataType.BIGINT;
case BOOLEAN -> SQLDataType.BOOLEAN;
case TIMESTAMP_WITH_TIMEZONE -> SQLDataType.TIMESTAMPWITHTIMEZONE;
Expand Down
7 changes: 6 additions & 1 deletion airbyte-cdk/java/airbyte-cdk/db-sources/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,12 @@ dependencies {
// TODO: Change these to 'compileOnly' or 'testCompileOnly'

implementation libs.hikaricp
implementation libs.bundles.debezium.bundle
implementation libs.debezium.api
implementation libs.debezium.embedded
implementation libs.debezium.sqlserver
implementation libs.debezium.mysql
implementation libs.debezium.postgres
implementation libs.debezium.mongodb

implementation libs.bundles.datadog
// implementation 'com.datadoghq:dd-trace-api'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerImageTag: 0.7.0
dockerImageTag: 0.7.1
dockerRepository: airbyte/destination-redshift
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
githubIssueLabel: destination-redshift
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ private Field<String> jsonSerialize(final Field<?> field) {
* @param arrays
* @return
*/
Field<?> arrayConcatStmt(List<Field<?>> arrays) {
Field<?> arrayConcatStmt(final List<Field<?>> arrays) {
if (arrays.isEmpty()) {
return field("ARRAY()"); // Return an empty array if the list is empty
}
Expand All @@ -235,8 +235,8 @@ Field<?> arrayConcatStmt(List<Field<?>> arrays) {
}

// Recursive case: construct ARRAY_CONCAT function call
Field<?> lastValue = arrays.get(arrays.size() - 1);
Field<?> recursiveCall = arrayConcatStmt(arrays.subList(0, arrays.size() - 1));
final Field<?> lastValue = arrays.get(arrays.size() - 1);
final Field<?> recursiveCall = arrayConcatStmt(arrays.subList(0, arrays.size() - 1));

return function("ARRAY_CONCAT", getSuperType(), recursiveCall, lastValue);
}
Expand Down Expand Up @@ -268,7 +268,7 @@ Field<?> buildAirbyteMetaColumn(final LinkedHashMap<ColumnId, AirbyteType> colum
* @param includeMetaColumn
* @return
*/
LinkedHashMap<String, DataType<?>> getFinalTableMetaColumns(boolean includeMetaColumn) {
LinkedHashMap<String, DataType<?>> getFinalTableMetaColumns(final boolean includeMetaColumn) {
final LinkedHashMap<String, DataType<?>> metaColumns = new LinkedHashMap<>();
metaColumns.put(COLUMN_NAME_AB_RAW_ID, SQLDataType.VARCHAR(36).nullable(false));
metaColumns.put(COLUMN_NAME_AB_EXTRACTED_AT, SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false));
Expand All @@ -279,12 +279,12 @@ LinkedHashMap<String, DataType<?>> getFinalTableMetaColumns(boolean includeMetaC

@Override
public String createTable(final StreamConfig stream, final String suffix, final boolean force) {
DSLContext dsl = getDslContext();
CreateSchemaFinalStep createSchemaSql = createSchemaIfNotExists(quotedName(stream.id().finalNamespace()));
final DSLContext dsl = getDslContext();
final CreateSchemaFinalStep createSchemaSql = createSchemaIfNotExists(quotedName(stream.id().finalNamespace()));

// TODO: Use Naming transformer to sanitize these strings with redshift restrictions.
String finalTableIdentifier = stream.id().finalName() + suffix.toLowerCase();
CreateTableColumnStep createTableSql = dsl
final String finalTableIdentifier = stream.id().finalName() + suffix.toLowerCase();
final CreateTableColumnStep createTableSql = dsl
.createTable(quotedName(stream.id().finalNamespace(), finalTableIdentifier))
.columns(buildFinalTableFields(stream.columns(), getFinalTableMetaColumns(true)));
if (!force) {
Expand Down Expand Up @@ -423,7 +423,7 @@ private String mergeTransaction(final StreamConfig streamConfig,
* @param cursor
* @return
*/
Field<Integer> getRowNumber(List<ColumnId> primaryKeys, Optional<ColumnId> cursor) {
Field<Integer> getRowNumber(final List<ColumnId> primaryKeys, final Optional<ColumnId> cursor) {
final List<Field<?>> primaryKeyFields =
primaryKeys != null ? primaryKeys.stream().map(columnId -> field(quotedName(columnId.name()))).collect(Collectors.toList())
: new ArrayList<>();
Expand Down Expand Up @@ -452,7 +452,7 @@ SelectConditionStep<Record> selectFromRawTable(final String schemaName,
.where(condition);
}

Condition rawTableCondition(DestinationSyncMode syncMode, boolean isCdcDeletedAtPresent, Optional<Instant> minRawTimestamp) {
Condition rawTableCondition(final DestinationSyncMode syncMode, final boolean isCdcDeletedAtPresent, final Optional<Instant> minRawTimestamp) {
Condition condition = field(name(COLUMN_NAME_AB_LOADED_AT)).isNull();
if (syncMode == DestinationSyncMode.APPEND_DEDUP) {
if (isCdcDeletedAtPresent) {
Expand All @@ -478,7 +478,7 @@ InsertValuesStepN<Record> insertIntoFinalTable(final String schemaName,
.columns(buildFinalTableFields(columns, metaFields));
}

String deleteFromFinalTable(final String schemaName, final String tableName, List<ColumnId> primaryKeys, Optional<ColumnId> cursor) {
String deleteFromFinalTable(final String schemaName, final String tableName, final List<ColumnId> primaryKeys, final Optional<ColumnId> cursor) {
final DSLContext dsl = getDslContext();
// Unknown type doesn't play well with where .. in (select..)
final Field<Object> airbyteRawId = field(quotedName(COLUMN_NAME_AB_RAW_ID));
Expand Down Expand Up @@ -552,7 +552,7 @@ public String clearLoadedAt(final StreamId streamId) {
}

@Override
// Always returns false: failed operations are never retried by this destination.
// NOTE(review): the enclosing class is not visible in this hunk — confirm the
// interface contract for shouldRetry before relying on this description.
public boolean shouldRetry(final Exception e) {
return false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,12 @@ dependencies {
dataGeneratorImplementation libs.kotlinx.cli.jvm
dataGeneratorImplementation 'org.yaml:snakeyaml:2.2'

debeziumTestImplementation libs.bundles.debezium.bundle
debeziumTestImplementation libs.debezium.api
debeziumTestImplementation libs.debezium.embedded
debeziumTestImplementation libs.debezium.sqlserver
debeziumTestImplementation libs.debezium.mysql
debeziumTestImplementation libs.debezium.postgres
debeziumTestImplementation libs.debezium.mongodb
debeziumTestImplementation libs.bundles.slf4j
debeziumTestImplementation libs.slf4j.simple
debeziumTestImplementation libs.kotlinx.cli.jvm
Expand Down
1 change: 0 additions & 1 deletion airbyte-integrations/connectors/source-mssql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ data:
name: Microsoft SQL Server (MSSQL)
registries:
cloud:
dockerRepository: airbyte/source-mssql-strict-encrypt
enabled: true
oss:
enabled: true
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.2.2
dockerImageTag: 3.2.3
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,9 @@ public JsonNode toDatabaseConfig(final JsonNode config) {
// When using this approach MySql creates a temporary table which may have some effect on db
// performance.
jdbcUrl.append("?useCursorFetch=true");
jdbcUrl.append("&zeroDateTimeBehavior=convertToNull");
// Controls how the driver handles DATETIME values that are composed entirely of zeros (convert them to NULL):
// https://dev.mysql.com/doc/connector-j/8.1/en/connector-j-connp-props-datetime-types-processing.html#cj-conn-prop_zeroDateTimeBehavior
jdbcUrl.append("&zeroDateTimeBehavior=CONVERT_TO_NULL");
// ensure the return tinyint(1) is boolean
jdbcUrl.append("&tinyInt1isBit=true");
// ensure the return year value is a Date; see the rationale
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,14 @@ protected void initTests() {
.addExpectedValues((String) null)
.build());

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("date")
.airbyteType(JsonSchemaType.STRING_DATE)
.addInsertValues("0000-00-00")
.addExpectedValues((String) null)
.build());

for (final String fullSourceType : Set.of("datetime", "datetime not null default now()")) {
addDataTypeTestData(
TestDataHolder.builder()
Expand Down
1 change: 0 additions & 1 deletion deps.toml
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ micronaut-test = ["micronaut-test-core", "micronaut-test-junit5", "h2-database"]
micronaut-test-annotation-processor = ["micronaut-inject-java"]
slf4j = ["jul-to-slf4j", "jcl-over-slf4j", "log4j-over-slf4j"]
temporal = ["temporal-sdk", "temporal-serviceclient"]
debezium-bundle = ["debezium-api", "debezium-embedded", "debezium-sqlserver", "debezium-mysql", "debezium-postgres", "debezium-mongodb"]

[plugins]
kotlin-jvm = { id = "org.jetbrains.kotlin.jvm", version.ref = "kotlin" }
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/destinations/redshift.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.7.1 | 2023-12-11 | [33307](https://github.com/airbytehq/airbyte/pull/33307) | DV2: improve data type mapping |
| 0.7.0 | 2023-12-05 | [32326](https://github.com/airbytehq/airbyte/pull/32326) | Opt in beta for v2 destination |
| 0.6.11 | 2023-11-29 | [#32888](https://github.com/airbytehq/airbyte/pull/32888) | Use the new async framework. |
| 0.6.10 | 2023-11-06 | [#32193](https://github.com/airbytehq/airbyte/pull/32193) | Adopt java CDK version 0.4.1. |
Expand Down Expand Up @@ -219,4 +220,4 @@ Each stream will be output into its own raw table in Redshift. Each table will c
| 0.3.14 | 2021-10-08 | [\#5924](https://github.com/airbytehq/airbyte/pull/5924) | Fixed AWS S3 Staging COPY is writing records from different table in the same raw table |
| 0.3.13 | 2021-09-02 | [\#5745](https://github.com/airbytehq/airbyte/pull/5745) | Disable STATUPDATE flag when using S3 staging to speed up performance |
| 0.3.12 | 2021-07-21 | [\#3555](https://github.com/airbytehq/airbyte/pull/3555) | Enable partial checkpointing for halfway syncs |
| 0.3.11 | 2021-07-20 | [\#4874](https://github.com/airbytehq/airbyte/pull/4874) | allow `additionalProperties` in connector spec |
| 0.3.11 | 2021-07-20 | [\#4874](https://github.com/airbytehq/airbyte/pull/4874) | allow `additionalProperties` in connector spec |
Loading

0 comments on commit eece59f

Please sign in to comment.