Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Jan 26, 2024
1 parent 90f445a commit a22e06b
Show file tree
Hide file tree
Showing 31 changed files with 218 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import org.jooq.DSLContext;
import org.jooq.DataType;
import org.jooq.Field;
import org.jooq.InsertOnDuplicateStep;
import org.jooq.InsertReturningStep;
import org.jooq.InsertValuesStepN;
import org.jooq.Name;
import org.jooq.Record;
Expand Down Expand Up @@ -369,7 +371,7 @@ private Sql insertAndDeleteTransaction(final StreamConfig streamConfig,
select(asterisk(), rowNumber).from(rawTableRowsWithCast));

// Used for append-dedupe mode.
final String insertStmtWithDedupe =
final String insertStmtWithDedupe = mutateInsertStatement(
insertIntoFinalTable(finalSchema, finalTable, streamConfig.columns(), getFinalTableMetaColumns(true))
.select(with(rawTableRowsWithCast)
.with(filteredRows)
Expand All @@ -378,15 +380,16 @@ private Sql insertAndDeleteTransaction(final StreamConfig streamConfig,
.where(field(name(ROW_NUMBER_COLUMN_NAME), Integer.class).eq(1)) // Can refer by CTE.field but no use since we don't strongly type
// them.
)
.getSQL(ParamType.INLINED);
).getSQL(ParamType.INLINED);

// Used for append and overwrite modes.
final String insertStmt =
final String insertStmt = mutateInsertStatement(
insertIntoFinalTable(finalSchema, finalTable, streamConfig.columns(), getFinalTableMetaColumns(true))
.select(with(rawTableRowsWithCast)
.select(finalTableFields)
.from(rawTableRowsWithCast))
.getSQL(ParamType.INLINED);
.from(rawTableRowsWithCast)
)
).getSQL(ParamType.INLINED);
final String deleteStmt = deleteFromFinalTable(finalSchema, finalTable, streamConfig.primaryKey(), streamConfig.cursor());
final String deleteCdcDeletesStmt =
streamConfig.columns().containsKey(cdcDeletedAtColumn) ? deleteFromFinalTableCdcDeletes(finalSchema, finalTable) : "";
Expand Down Expand Up @@ -512,4 +515,8 @@ protected Field<Timestamp> currentTimestamp() {
return DSL.currentTimestamp();
}

protected InsertReturningStep<Record> mutateInsertStatement(final InsertOnDuplicateStep<Record> insert) {
return insert;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_META;
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_DATA;
import static org.jooq.impl.DSL.case_;
import static org.jooq.impl.DSL.cast;
import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.function;
import static org.jooq.impl.DSL.name;
import static org.jooq.impl.DSL.quotedName;
import static org.jooq.impl.DSL.trueCondition;
import static org.jooq.impl.DSL.val;

Expand All @@ -14,29 +16,38 @@
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType;
import io.airbyte.integrations.base.destination.typing_deduping.Array;
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
import io.airbyte.integrations.base.destination.typing_deduping.Sql;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.Struct;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang3.NotImplementedException;
import org.jooq.CaseConditionStep;
import org.jooq.Condition;
import org.jooq.DataType;
import org.jooq.Field;
import org.jooq.InsertOnDuplicateStep;
import org.jooq.InsertReturningStep;
import org.jooq.Param;
import org.jooq.Record;
import org.jooq.SQLDialect;
import org.jooq.impl.DefaultDataType;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;

public class MysqlSqlGenerator extends JdbcSqlGenerator {

public static final DefaultDataType<Object> JSON_TYPE = new DefaultDataType<>(null, Object.class, "json");

public MysqlSqlGenerator(final NamingConventionTransformer namingResolver) {
super(namingResolver);
}

private DataType<?> getJsonType() {
return new DefaultDataType<>(null, String.class, "json");
return JSON_TYPE;
}

@Override
Expand Down Expand Up @@ -79,21 +90,66 @@ protected SQLDialect getDialect() {
}

@Override
protected List<Field<?>> extractRawDataFields(final LinkedHashMap<ColumnId, AirbyteType> columns) {
protected List<Field<?>> extractRawDataFields(final LinkedHashMap<ColumnId, AirbyteType> columns, final boolean useExpensiveSaferCasting) {
return columns
.entrySet()
.stream()
.map(column -> castedField(
// TODO escape jsonpath
function("JSON_EXTRACT", getJsonType(), field(name(COLUMN_NAME_DATA)), val("$." + column.getKey().originalName())),
column.getValue(),
column.getKey().name()))
.map(column -> {
final String jsonExtractFunction;
final AirbyteType type = column.getValue();
final boolean isStruct = type instanceof Struct;
final boolean isArray = type instanceof Array;
if (type == AirbyteProtocolType.UNKNOWN || isStruct || isArray) {
// UKKNOWN should use json_extract to retain the exact json value
jsonExtractFunction = "JSON_EXTRACT";
} else {
// And primitive types should just use json_value, to (a) strip quotes from strings, and
// (b) cast json null to sql null.
jsonExtractFunction = "JSON_VALUE";
}

final Field<?> extractedValue = function(jsonExtractFunction, getJsonType(), field(name(COLUMN_NAME_DATA)), jsonPath(column.getKey()));
if (isStruct) {
return case_()
.when(
extractedValue.isNull()
.or(function("JSON_TYPE", String.class, extractedValue).ne("OBJECT")),
val((Object) null)
).else_(extractedValue)
.as(quotedName(column.getKey().name()));
} else if (isArray) {
return case_()
.when(
extractedValue.isNull()
.or(function("JSON_TYPE", String.class, extractedValue).ne("ARRAY")),
val((Object) null)
).else_(extractedValue)
.as(quotedName(column.getKey().name()));
} else {
final Field<?> castedValue = castedField(extractedValue, type, column.getKey().name());
if (!(type instanceof final AirbyteProtocolType primitive)) {
return castedValue;
}
return switch (primitive) {
// These types are just casting to strings, so we need to use regex to validate their format
case TIME_WITH_TIMEZONE -> case_()
.when(castedValue.notLikeRegex("^[0-9]{2}:[0-9]{2}:[0-9]{2}([.][0-9]+)?([-+][0-9]{2}:[0-9]{2}|Z)$"), val((Object) null))
.else_(castedValue)
.as(quotedName(column.getKey().name()));
case TIMESTAMP_WITH_TIMEZONE -> case_()
.when(castedValue.notLikeRegex("^[0-9]+-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}([.][0-9]+)?([-+][0-9]{2}:[0-9]{2}|Z)$"), val((Object) null))
.else_(castedValue)
.as(quotedName(column.getKey().name()));
default -> castedValue;
};
}
})
.collect(Collectors.toList());
}

@Override
protected Field<?> buildAirbyteMetaColumn(final LinkedHashMap<ColumnId, AirbyteType> columns) {
// TODO
// TODO Intentionally unimplemented for initial DV2 release
return cast(val("{}"), getJsonType()).as(COLUMN_NAME_AB_META);
}

Expand Down Expand Up @@ -123,4 +179,16 @@ public boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, fina
protected String beginTransaction() {
return "START TRANSACTION";
}

@Override
protected InsertReturningStep<Record> mutateInsertStatement(final InsertOnDuplicateStep<Record> insert) {
// this turns the insert into an `INSERT IGNORE ...`
// We're actually using this to ignore CAST() errors, rather than duplicate key errors.
return insert.onDuplicateKeyIgnore();
}

private static Param<String> jsonPath(final ColumnId column) {
// TODO escape jsonpath
return val("$." + column.originalName());
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package io.airbyte.integrations.destination.mysql.typing_deduping;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.cdk.db.jdbc.AbstractJdbcCompatibleSourceOperations;
import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition;
Expand All @@ -10,11 +13,13 @@
import io.airbyte.integrations.destination.mysql.MySQLDestination;
import io.airbyte.integrations.destination.mysql.MySQLDestinationAcceptanceTest;
import io.airbyte.integrations.destination.mysql.MySQLNameTransformer;
import java.util.List;
import io.airbyte.protocol.models.JsonSchemaType;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.sql.DataSource;
import org.jooq.DataType;
import org.jooq.Field;
import org.jooq.Name;
import org.jooq.SQLDialect;
import org.jooq.impl.DSL;
import org.jooq.impl.DefaultDataType;
Expand All @@ -30,7 +35,7 @@ public class MysqlSqlGeneratorIntegrationTest extends JdbcSqlGeneratorIntegratio
private static JdbcDatabase database;

@BeforeAll
public static void setupMysql() {
public static void setupMysql() throws Exception {
testContainer = new MySQLContainer<>("mysql:8.0");
testContainer.start();
MySQLDestinationAcceptanceTest.configureTestContainer(testContainer);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "old_cursor": 1, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 0, "_ab_cdc_deleted_at": null, "name" :"Alice", "address": {"city": "San Francisco", "state": "CA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 1, "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Keep the Alice record with more recent updated_at
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00.000000Z", "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00.000000Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie"}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2001-01-01T00:00:00.000000Z", "name": "Someone completely different"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00.000000Z", "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00.000000Z", "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00.000000Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}
// Invalid columns are nulled out (i.e. SQL null, not JSON null)
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}}
// Invalid data is still allowed in the raw table.
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2001-01-01T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Someone completely different"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00.000000Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}
// Charlie wasn't reemitted with updated_at, so it still has a null cursor
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "name": "Charlie"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 0, "_ab_cdc_deleted_at": null, "name" :"Alice", "address": {"city": "San Francisco", "state": "CA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 1, "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00.000000Z", "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00.000000Z", "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00.000000Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie"}

{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00.000000Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00.000000Z", "name": "Bob", "address": {"city": "New York", "state": "NY"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00.000000Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00.000000Z"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00.000000Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00.000000Z", "name": "Bob", "address": {"city": "New York", "state": "NY"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00.000000Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00.000000Z"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00.000000Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}
// Delete Bob, keep Charlie
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie"}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2001-01-02T00:00:00.000000Z", "name": "Someone completely different v2"}
Loading

0 comments on commit a22e06b

Please sign in to comment.