Skip to content

Commit

Permalink
propagate safe_cast option to subclass
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Jan 3, 2024
1 parent b4ffdba commit 4d045ca
Showing 1 changed file with 13 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,12 @@ public DataType<?> getTimestampWithTimeZoneType() {
protected abstract SQLDialect getDialect();

/**
* @param columns from the schema to be extracted from _airbyte_data column. Use the destination
* specific syntax to extract data
* @param columns from the schema to be extracted from _airbyte_data column. Use the destination
* specific syntax to extract data
* @param useExpensiveSaferCasting
* @return a list of jooq fields for the final table insert statement.
*/
protected abstract List<Field<?>> extractRawDataFields(final LinkedHashMap<ColumnId, AirbyteType> columns);
protected abstract List<Field<?>> extractRawDataFields(final LinkedHashMap<ColumnId, AirbyteType> columns, boolean useExpensiveSaferCasting);

/**
*
Expand Down Expand Up @@ -224,11 +225,13 @@ LinkedHashMap<String, DataType<?>> getFinalTableMetaColumns(final boolean includ
* @return
*/
@VisibleForTesting
List<Field<?>> buildRawTableSelectFields(final LinkedHashMap<ColumnId, AirbyteType> columns, final Map<String, DataType<?>> metaColumns) {
List<Field<?>> buildRawTableSelectFields(final LinkedHashMap<ColumnId, AirbyteType> columns,
final Map<String, DataType<?>> metaColumns,
final boolean useExpensiveSaferCasting) {
final List<Field<?>> fields =
metaColumns.entrySet().stream().map(metaColumn -> field(quotedName(metaColumn.getKey()), metaColumn.getValue())).collect(toList());
// Use originalName with non-sanitized characters when extracting data from _airbyte_data
final List<Field<?>> dataFields = extractRawDataFields(columns);
final List<Field<?>> dataFields = extractRawDataFields(columns, useExpensiveSaferCasting);
dataFields.addAll(fields);
return dataFields;
}
Expand Down Expand Up @@ -327,10 +330,11 @@ SelectConditionStep<Record> selectFromRawTable(final String schemaName,
final String tableName,
final LinkedHashMap<ColumnId, AirbyteType> columns,
final Map<String, DataType<?>> metaColumns,
final Condition condition) {
final Condition condition,
final boolean useExpensiveSaferCasting) {
final DSLContext dsl = getDslContext();
return dsl
.select(buildRawTableSelectFields(columns, metaColumns))
.select(buildRawTableSelectFields(columns, metaColumns, useExpensiveSaferCasting))
.select(buildAirbyteMetaColumn(columns))
.from(table(quotedName(schemaName, tableName)))
.where(condition);
Expand Down Expand Up @@ -363,7 +367,8 @@ private String insertAndDeleteTransaction(final StreamConfig streamConfig,
getFinalTableMetaColumns(false),
rawTableCondition(streamConfig.destinationSyncMode(),
streamConfig.columns().containsKey(cdcDeletedAtColumn),
minRawTimestamp)));
minRawTimestamp),
useExpensiveSaferCasting));
final List<Field<?>> finalTableFields = buildFinalTableFields(streamConfig.columns(), getFinalTableMetaColumns(true));
final Field<Integer> rowNumber = getRowNumber(streamConfig.primaryKey(), streamConfig.cursor());
final CommonTableExpression<Record> filteredRows = name(NUMBERED_ROWS_CTE_ALIAS).as(
Expand Down

0 comments on commit 4d045ca

Please sign in to comment.