Skip to content

Commit

Permalink
DV2: Better errors in the UI (#30491)
Browse files Browse the repository at this point in the history
Co-authored-by: edgao <[email protected]>
  • Loading branch information
edgao and edgao authored Sep 18, 2023
1 parent 26fe788 commit 8805edc
Show file tree
Hide file tree
Showing 14 changed files with 601 additions and 528 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package io.airbyte.integrations.destination.record_buffer;

import io.airbyte.commons.string.Strings;
import io.airbyte.integrations.util.ConnectorExceptionUtil;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
Expand Down Expand Up @@ -60,7 +60,6 @@ public SerializedBufferingStrategy(final BufferCreateFunction onCreateBuffer,
* @param stream stream associated with record
* @param message {@link AirbyteMessage} to buffer
* @return Optional which contains a {@link BufferFlushType} if a flush occurred, otherwise empty
* @throws Exception
*/
@Override
public Optional<BufferFlushType> addRecord(final AirbyteStreamNameNamespacePair stream, final AirbyteMessage message) throws Exception {
Expand Down Expand Up @@ -163,9 +162,8 @@ public void close() throws Exception {
LOGGER.error("Exception while closing stream buffer", e);
}
}
if (!exceptionsThrown.isEmpty()) {
throw new RuntimeException(String.format("Exceptions thrown while closing buffers: %s", Strings.join(exceptionsThrown, "\n")));
}

ConnectorExceptionUtil.logAllAndThrowFirst("Exceptions thrown while closing buffers: ", exceptionsThrown);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,29 @@

package io.airbyte.integrations.util;

import static java.util.stream.Collectors.joining;

import com.google.common.collect.ImmutableList;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.exceptions.ConnectionErrorException;
import io.airbyte.integrations.base.errors.messages.ErrorMessage;
import java.sql.SQLException;
import java.sql.SQLSyntaxErrorException;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.function.Predicate;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Utility class defining methods for handling configuration exceptions in connectors.
*/
public class ConnectorExceptionUtil {

private static final Logger LOGGER = LoggerFactory.getLogger(ConnectorExceptionUtil.class);

public static final String COMMON_EXCEPTION_MESSAGE_TEMPLATE = "Could not connect with provided configuration. Error: %s";
static final String RECOVERY_CONNECTION_ERROR_MESSAGE =
"We're having issues syncing from a Postgres replica that is configured as a hot standby server. " +
Expand All @@ -36,8 +44,7 @@ public static boolean isConfigError(final Throwable e) {
public static String getDisplayMessage(final Throwable e) {
if (e instanceof ConfigErrorException) {
return ((ConfigErrorException) e).getDisplayMessage();
} else if (e instanceof ConnectionErrorException) {
final ConnectionErrorException connEx = (ConnectionErrorException) e;
} else if (e instanceof final ConnectionErrorException connEx) {
return ErrorMessage.getErrorMessage(connEx.getStateCode(), connEx.getErrorCode(), connEx.getExceptionMessage(), connEx);
} else if (isRecoveryConnectionExceptionPredicate().test(e)) {
return RECOVERY_CONNECTION_ERROR_MESSAGE;
Expand All @@ -64,6 +71,23 @@ public static Throwable getRootConfigError(final Exception e) {
return e;
}

/**
 * Writes the stack traces of all supplied throwables to the error log as one entry, then rethrows
 * the first throwable in the collection's iteration order. When the collection is empty, nothing
 * is logged and the method returns normally.
 * <p>
 * Typical use is collecting the failures of several independent tasks (e.g. multiple futures) and
 * reporting them once all tasks have finished. A single combined "mega-exception" works poorly in
 * the UI, so only the first failure is propagated while the rest remain available in the logs. In
 * most cases the failures look very similar anyway, and the result resembles a for-loop over the
 * tasks that breaks on the first exception.
 *
 * @param initialMessage text placed at the start of the log entry, directly before the stack traces
 * @param throwables the collected failures; may be empty
 * @param <T> common supertype of the collected throwables
 * @throws T the first element of {@code throwables}, if there is one
 */
public static <T extends Throwable> void logAllAndThrowFirst(final String initialMessage, final Collection<? extends T> throwables) throws T {
  if (throwables.isEmpty()) {
    return;
  }

  // Render every failure up front so that all of them end up in one log entry.
  final List<String> renderedTraces = throwables.stream()
      .map(ExceptionUtils::getStackTrace)
      .toList();
  final String combinedTraces = String.join("\n", renderedTraces);
  LOGGER.error(initialMessage + combinedTraces + "\nRethrowing first exception.");

  final T firstFailure = throwables.iterator().next();
  throw firstFailure;
}

/**
 * Supplies the predicate used to recognise configuration errors.
 *
 * @return a predicate that accepts a throwable exactly when it is a {@link ConfigErrorException}
 */
private static Predicate<Throwable> getConfigErrorPredicate() {
  return ConfigErrorException.class::isInstance;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

package io.airbyte.integrations.base.destination.typing_deduping;

import io.airbyte.integrations.util.ConnectorExceptionUtil;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;

public class FutureUtils {

Expand All @@ -24,17 +24,18 @@ public static int countOfTypingDedupingThreads(final int defaultThreads) {
.orElse(defaultThreads);
}

/**
* Log all exceptions from a list of futures, and rethrow the first exception if there is one. This
* mimics the behavior of running the futures in serial, where the first failure would stop
* execution and be the exception that is thrown.
public static void reduceExceptions(final Collection<CompletableFuture<Optional<Exception>>> potentialExceptions, final String initialMessage)
throws Exception {
final var exceptionMessages = potentialExceptions.stream()
final List<Exception> exceptions = potentialExceptions.stream()
.map(CompletableFuture::join)
.filter(Optional::isPresent)
.map(Optional::get)
.map(Exception::getMessage)
.collect(Collectors.joining("\n"));
if (StringUtils.isNotBlank(exceptionMessages)) {
throw new Exception(initialMessage + exceptionMessages);
}
.toList();
ConnectorExceptionUtil.logAllAndThrowFirst(initialMessage, exceptions);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ ENV AIRBYTE_NORMALIZATION_INTEGRATION bigquery

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=2.0.9
LABEL io.airbyte.version=2.0.10
LABEL io.airbyte.name=airbyte/destination-bigquery
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerImageTag: 2.0.9
dockerImageTag: 2.0.10
dockerRepository: airbyte/destination-bigquery
githubIssueLabel: destination-bigquery
icon: bigquery.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.TableId;
import io.airbyte.commons.string.Strings;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
Expand All @@ -15,6 +14,7 @@
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
import io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryRecordFormatter;
import io.airbyte.integrations.destination.bigquery.uploader.AbstractBigQueryUploader;
import io.airbyte.integrations.util.ConnectorExceptionUtil;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
Expand Down Expand Up @@ -132,9 +132,8 @@ public void close(final boolean hasFailed) throws Exception {
});
typerDeduper.commitFinalTables();
typerDeduper.cleanup();
if (!exceptionsThrown.isEmpty()) {
throw new RuntimeException(String.format("Exceptions thrown while closing consumer: %s", Strings.join(exceptionsThrown, "\n")));
}

ConnectorExceptionUtil.logAllAndThrowFirst("Exceptions thrown while closing consumer: ", exceptionsThrown);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.integrations.destination.bigquery.typing_deduping;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobConfiguration;
import com.google.cloud.bigquery.JobId;
Expand All @@ -15,12 +16,14 @@
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.common.collect.Streams;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import java.math.BigInteger;
import java.util.Comparator;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -32,19 +35,19 @@ public class BigQueryDestinationHandler implements DestinationHandler<TableDefin
private final BigQuery bq;
private final String datasetLocation;

public BigQueryDestinationHandler(final BigQuery bq, String datasetLocation) {
public BigQueryDestinationHandler(final BigQuery bq, final String datasetLocation) {
this.bq = bq;
this.datasetLocation = datasetLocation;
}

@Override
public Optional<TableDefinition> findExistingTable(StreamId id) {
public Optional<TableDefinition> findExistingTable(final StreamId id) {
final Table table = bq.getTable(id.finalNamespace(), id.finalName());
return Optional.ofNullable(table).map(Table::getDefinition);
}

@Override
public boolean isFinalTableEmpty(StreamId id) {
public boolean isFinalTableEmpty(final StreamId id) {
return BigInteger.ZERO.equals(bq.getTable(TableId.of(id.finalNamespace(), id.finalName())).getNumRows());
}

Expand All @@ -67,10 +70,13 @@ public void execute(final String sql) throws InterruptedException {
job = job.reload();
}
if (job.getStatus().getError() != null) {
throw new RuntimeException(job.getStatus().getError().toString());
throw new BigQueryException(Streams.concat(
Stream.of(job.getStatus().getError()),
job.getStatus().getExecutionErrors().stream()
).toList());
}

JobStatistics.QueryStatistics statistics = job.getStatistics();
final JobStatistics.QueryStatistics statistics = job.getStatistics();
LOGGER.info("Root-level job {} completed in {} ms; processed {} bytes; billed for {} bytes",
queryId,
statistics.getEndTime() - statistics.getStartTime(),
Expand All @@ -83,9 +89,9 @@ public void execute(final String sql) throws InterruptedException {
bq.listJobs(BigQuery.JobListOption.parentJobId(job.getJobId().getJob())).streamAll()
.sorted(Comparator.comparing(childJob -> childJob.getStatistics().getEndTime()))
.forEach(childJob -> {
JobConfiguration configuration = childJob.getConfiguration();
if (configuration instanceof QueryJobConfiguration qc) {
JobStatistics.QueryStatistics childQueryStats = childJob.getStatistics();
final JobConfiguration configuration = childJob.getConfiguration();
if (configuration instanceof final QueryJobConfiguration qc) {
final JobStatistics.QueryStatistics childQueryStats = childJob.getStatistics();
String truncatedQuery = qc.getQuery()
.replaceAll("\n", " ")
.replaceAll(" +", " ")
Expand All @@ -101,7 +107,7 @@ public void execute(final String sql) throws InterruptedException {
} else {
// other job types are extract/copy/load
// we're probably not using them, but handle just in case?
JobStatistics childJobStats = childJob.getStatistics();
final JobStatistics childJobStats = childJob.getStatistics();
LOGGER.info("Non-query child job ({}) completed in {} ms",
configuration.getType(),
childJobStats.getEndTime() - childJobStats.getStartTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1

ENV ENABLE_SENTRY true

LABEL io.airbyte.version=3.1.6
LABEL io.airbyte.version=3.1.7
LABEL io.airbyte.name=airbyte/destination-snowflake
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerImageTag: 3.1.6
dockerImageTag: 3.1.7
dockerRepository: airbyte/destination-snowflake
githubIssueLabel: destination-snowflake
icon: snowflake.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
import java.util.LinkedHashMap;
import java.util.Optional;
import java.util.UUID;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SnowflakeDestinationHandler implements DestinationHandler<SnowflakeTableDefinition> {

private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeDestinationHandler.class);
public static final String EXCEPTION_COMMON_PREFIX = "JavaScript execution error: Uncaught Execution of multiple statements failed on statement";

private final String databaseName;
private final JdbcDatabase database;
Expand Down Expand Up @@ -79,7 +81,20 @@ public void execute(final String sql) throws Exception {
LOGGER.info("Executing sql {}: {}", queryId, sql);
final long startTime = System.currentTimeMillis();

database.execute(sql);
try {
database.execute(sql);
} catch (final SnowflakeSQLException e) {
LOGGER.error("Sql {} failed", queryId, e);
// Snowflake SQL exceptions by default may not be super helpful, so we try to extract the relevant part of the message.
final String trimmedMessage;
if (e.getMessage().startsWith(EXCEPTION_COMMON_PREFIX)) {
// The first line is a pretty generic message, so just remove it
trimmedMessage = e.getMessage().substring(e.getMessage().indexOf("\n") + 1);
} else {
trimmedMessage = e.getMessage();
}
throw new RuntimeException(trimmedMessage, e);
}

LOGGER.info("Sql {} completed in {} ms", queryId, System.currentTimeMillis() - startTime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ public void incrementalDedupInvalidPrimaryKey() throws Exception {

final String sql = generator.updateTable(incrementalDedupStream, "");
final Exception exception = assertThrows(
SnowflakeSQLException.class,
RuntimeException.class,
() -> destinationHandler.execute(sql));

assertTrue(exception.getMessage().contains("_AB_MISSING_PRIMARY_KEY"));
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ Now that you have set up the BigQuery destination connector, check out the follo

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.0.10 | 2023-09-15 | [\#30491](https://github.com/airbytehq/airbyte/pull/30491) | Improve error message display |
| 2.0.9 | 2023-09-14 | [\#30439](https://github.com/airbytehq/airbyte/pull/30439) | Fix a transient error |
| 2.0.8 | 2023-09-12 | [\#30364](https://github.com/airbytehq/airbyte/pull/30364) | Add log message |
| 2.0.7 | 2023-08-29 | [\#29878](https://github.com/airbytehq/airbyte/pull/29878) | Internal code changes |
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/snowflake.md
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ Otherwise, make sure to grant the role the required permissions in the desired n

| Version | Date | Pull Request | Subject |
| :-------------- | :--------- | :--------------------------------------------------------- | :-------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 3.1.7 | 2023-09-15 | [\#30491](https://github.com/airbytehq/airbyte/pull/30491) | Improve error message display |
| 3.1.6 | 2023-09-14 | [\#30439](https://github.com/airbytehq/airbyte/pull/30439) | Fix a transient error |
| 3.1.5 | 2023-09-13 | [\#30416](https://github.com/airbytehq/airbyte/pull/30416) | Support `${` in stream name/namespace, and in column names |
| 3.1.4 | 2023-09-12 | [\#30364](https://github.com/airbytehq/airbyte/pull/30364) | Add log message |
Expand Down

0 comments on commit 8805edc

Please sign in to comment.