Skip to content

Commit

Permalink
move insert statement to base class
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Jan 3, 2024
1 parent 2e6e931 commit b4ffdba
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,15 @@
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
import io.airbyte.integrations.base.destination.typing_deduping.BaseSqlGeneratorIntegrationTest;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import org.jooq.DSLContext;
import org.jooq.DataType;
import org.jooq.Field;
import org.jooq.InsertValuesStepN;
import org.jooq.Name;
import org.jooq.Record;
import org.jooq.SQLDialect;
import org.jooq.conf.ParamType;
import org.jooq.impl.DSL;
Expand Down Expand Up @@ -52,11 +57,43 @@ private DSLContext getDslContext() {
return DSL.using(getSqlDialect());
}

protected abstract void insertRecords(final Name tableName,
final List<String> columnNames,
final List<JsonNode> records,
final String... jsonColumns)
throws Exception;
/**
* Many destinations require special handling to create JSON values. For example,
* redshift requires you to invoke JSON_PARSE('{...}'), and postgres requires you to
* CAST('{...}' AS JSONB). This method allows subclasses to implement that logic.
*/
protected abstract Field<?> toJsonValue(String valueAsString);

private void insertRecords(final Name tableName, final List<String> columnNames, final List<JsonNode> records, final String... columnsToParseJson)
throws SQLException {
InsertValuesStepN<Record> insert = getDslContext().insertInto(
DSL.table(tableName),
columnNames.stream().map(DSL::field).toList());
for (final JsonNode record : records) {
insert = insert.values(
columnNames.stream()
.map(fieldName -> {
// Convert this field to a string. Pretty naive implementation.
final JsonNode column = record.get(fieldName);
final String columnAsString;
if (column == null) {
columnAsString = null;
} else if (column.isTextual()) {
columnAsString = column.asText();
} else {
columnAsString = column.toString();
}

if (Arrays.asList(columnsToParseJson).contains(fieldName)) {
return toJsonValue(columnAsString);
} else {
return DSL.val(columnAsString);
}
})
.toList());
}
getDatabase().execute(insert.getSQL(ParamType.INLINED));
}

@Override
protected void createNamespace(final String namespace) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import javax.sql.DataSource;
import org.jooq.DSLContext;
import org.jooq.DataType;
import org.jooq.Field;
import org.jooq.InsertValuesStepN;
import org.jooq.Name;
import org.jooq.Record;
Expand Down Expand Up @@ -173,44 +174,8 @@ protected SQLDialect getSqlDialect() {
return SQLDialect.POSTGRES;
}

/**
* Insert arbitrary records into an arbitrary table.
*
* @param columnsToParseJson Columns that must be wrapped in JSON_PARSE, because we're inserting
* them into a SUPER column. Naively inserting a string results a SUPER value containing a
* json string, rather than a json object.
*/
protected void insertRecords(final Name tableName, final List<String> columnNames, final List<JsonNode> records, final String... columnsToParseJson)
throws SQLException {
InsertValuesStepN<Record> insert = DSL.insertInto(
DSL.table(tableName),
columnNames.stream().map(DSL::field).toList());
for (final JsonNode record : records) {
insert = insert.values(
columnNames.stream()
.map(fieldName -> {
// Convert this field to a string. Pretty naive implementation.
final JsonNode column = record.get(fieldName);
final String columnAsString;
if (column == null) {
columnAsString = null;
} else if (column.isTextual()) {
columnAsString = column.asText();
} else {
columnAsString = column.toString();
}

if (Arrays.asList(columnsToParseJson).contains(fieldName)) {
// TODO this is redshift-specific. If we try and genericize this class, we need to handle this
// specifically
return DSL.function("JSON_PARSE", String.class, DSL.inline(escapeStringLiteral(columnAsString)));
} else {
return DSL.inline(escapeStringLiteral(columnAsString));
}
})
.toList());
}
database.execute(insert.getSQL());
protected Field<?> toJsonValue(final String valueAsString) {
return DSL.function("JSON_PARSE", String.class, DSL.val(valueAsString));
}

@Override
Expand Down Expand Up @@ -244,14 +209,4 @@ public void testCreateTableIncremental() throws Exception {
// TODO assert on table clustering, etc.
}

private static String escapeStringLiteral(final String str) {
if (str == null) {
return null;
} else {
// jooq handles most things
// but we need to manually escape backslashes for some reason
return str.replace("\\", "\\\\");
}
}

}

0 comments on commit b4ffdba

Please sign in to comment.