Skip to content

Commit

Permalink
[source-postgres] : Remove legacy bad values handling code (#37445)
Browse files Browse the repository at this point in the history
  • Loading branch information
akashkulk authored Apr 22, 2024
1 parent 01381ae commit 9b9ec1c
Show file tree
Hide file tree
Showing 8 changed files with 267 additions and 240 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,30 @@ object DataTypeUtils {
}
}

@JvmStatic
fun <T> throwExceptionIfInvalid(valueProducer: DataTypeSupplier<T>): T? {
return throwExceptionIfInvalid(valueProducer, Function { _: T? -> true })
}

@JvmStatic
fun <T> throwExceptionIfInvalid(
valueProducer: DataTypeSupplier<T>,
isValidFn: Function<T?, Boolean>
): T? {
// Some edge case values (e.g: Infinity, NaN) have no java or JSON equivalent, and will
// throw an
// exception when parsed. We want to parse those
// values as null.
// This method reduces error handling boilerplate.
try {
val value = valueProducer.apply()
return if (isValidFn.apply(value)) value
else throw SQLException("Given value is not valid.")
} catch (e: SQLException) {
return null
}
}

@JvmStatic
fun toISO8601StringWithMicroseconds(instant: Instant): String {
val dateWithMilliseconds = dateFormatMillisPattern.format(Date.from(instant))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
// convert to java types that will convert into reasonable json.
copyToJsonField(queryContext, i, jsonNode)
} catch (e: java.lang.Exception) {
jsonNode.putNull(columnName)
LOGGER.info(
"Failed to serialize column: {}, of type {}, with error {}",
columnName,
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.30.6
version=0.30.7
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.29.13'
cdkVersionRequired = '0.30.7'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.3.26
dockerImageTag: 3.3.27
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ private void putBigDecimalArray(final ObjectNode node, final String columnName,
final ArrayNode arrayNode = Jsons.arrayNode();
final ResultSet arrayResultSet = resultSet.getArray(colIndex).getResultSet();
while (arrayResultSet.next()) {
final BigDecimal bigDecimal = DataTypeUtils.returnNullIfInvalid(() -> arrayResultSet.getBigDecimal(2));
final BigDecimal bigDecimal = DataTypeUtils.throwExceptionIfInvalid(() -> arrayResultSet.getBigDecimal(2));
if (bigDecimal != null) {
arrayNode.add(bigDecimal);
} else {
Expand All @@ -361,7 +361,7 @@ private void putBigIntArray(final ObjectNode node, final String columnName, fina
final ArrayNode arrayNode = Jsons.arrayNode();
final ResultSet arrayResultSet = resultSet.getArray(colIndex).getResultSet();
while (arrayResultSet.next()) {
final long value = DataTypeUtils.returnNullIfInvalid(() -> arrayResultSet.getLong(2));
final long value = DataTypeUtils.throwExceptionIfInvalid(() -> arrayResultSet.getLong(2));
arrayNode.add(value);
}
node.set(columnName, arrayNode);
Expand All @@ -371,7 +371,7 @@ private void putDoubleArray(final ObjectNode node, final String columnName, fina
final ArrayNode arrayNode = Jsons.arrayNode();
final ResultSet arrayResultSet = resultSet.getArray(colIndex).getResultSet();
while (arrayResultSet.next()) {
arrayNode.add(DataTypeUtils.returnNullIfInvalid(() -> arrayResultSet.getDouble(2), Double::isFinite));
arrayNode.add(DataTypeUtils.throwExceptionIfInvalid(() -> arrayResultSet.getDouble(2), Double::isFinite));
}
node.set(columnName, arrayNode);
}
Expand All @@ -381,7 +381,8 @@ private void putMoneyArray(final ObjectNode node, final String columnName, final
final ResultSet arrayResultSet = resultSet.getArray(colIndex).getResultSet();
while (arrayResultSet.next()) {
final String moneyValue = parseMoneyValue(arrayResultSet.getString(2));
arrayNode.add(DataTypeUtils.returnNullIfInvalid(() -> DataTypeUtils.returnNullIfInvalid(() -> Double.valueOf(moneyValue), Double::isFinite)));
arrayNode.add(
DataTypeUtils.throwExceptionIfInvalid(() -> DataTypeUtils.throwExceptionIfInvalid(() -> Double.valueOf(moneyValue), Double::isFinite)));
}
node.set(columnName, arrayNode);
}
Expand Down Expand Up @@ -612,7 +613,7 @@ protected <T extends PGobject> void putObject(final ObjectNode node,

@Override
protected void putBigDecimal(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) {
final BigDecimal bigDecimal = DataTypeUtils.returnNullIfInvalid(() -> resultSet.getBigDecimal(index));
final BigDecimal bigDecimal = DataTypeUtils.throwExceptionIfInvalid(() -> resultSet.getBigDecimal(index));
if (bigDecimal != null) {
node.put(columnName, bigDecimal);
} else {
Expand All @@ -633,7 +634,7 @@ protected void putDouble(final ObjectNode node, final String columnName, final R

private void putMoney(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
final String moneyValue = parseMoneyValue(resultSet.getString(index));
node.put(columnName, DataTypeUtils.returnNullIfInvalid(() -> Double.valueOf(moneyValue), Double::isFinite));
node.put(columnName, DataTypeUtils.throwExceptionIfInvalid(() -> Double.valueOf(moneyValue), Double::isFinite));
}

private void putHstoreAsJson(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ protected List<AirbyteMessage> runRead(final ConfiguredAirbyteCatalog configured
@Override
protected void postSetup() throws Exception {
final Database database = setupDatabase();
for (final TestDataHolder test : getTestDataHolders()) {
for (final TestDataHolder test : testDataHolders) {
database.query(ctx -> {
ctx.fetch(test.getCreateSqlQuery());
return null;
Expand All @@ -56,7 +56,7 @@ protected void postSetup() throws Exception {
if (stateAfterFirstSync == null) {
throw new RuntimeException("stateAfterFirstSync should not be null");
}
for (final TestDataHolder test : getTestDataHolders()) {
for (final TestDataHolder test : testDataHolders) {
database.query(ctx -> {
test.getInsertSqlQueries().forEach(ctx::fetch);
return null;
Expand Down
Loading

0 comments on commit 9b9ec1c

Please sign in to comment.