diff --git a/airbyte-integrations/connectors/destination-bigquery/build.gradle b/airbyte-integrations/connectors/destination-bigquery/build.gradle index c23be7d3cc4b0..6e5847e5ceb3d 100644 --- a/airbyte-integrations/connectors/destination-bigquery/build.gradle +++ b/airbyte-integrations/connectors/destination-bigquery/build.gradle @@ -3,7 +3,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.35.7' + cdkVersionRequired = '0.35.0' features = [ 'db-destinations', 'datastore-bigquery', @@ -22,7 +22,7 @@ java { } application { - mainClass = 'io.airbyte.integrations.destination.bigquery.BigQueryDestinationKt' + mainClass = 'io.airbyte.integrations.destination.bigquery.BigQueryDestination' applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0', '-XX:NativeMemoryTracking=detail', '-XX:+UnlockDiagnosticVMOptions', '-XX:GCLockerRetryAllocationCount=100', @@ -37,5 +37,6 @@ application { } dependencies { + implementation 'com.codepoetics:protonpack:1.13' implementation 'org.apache.commons:commons-text:1.10.0' } diff --git a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml index 9ce4d0628bc24..215097e05c2e2 100644 --- a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml +++ b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133 - dockerImageTag: 2.5.0 + dockerImageTag: 2.4.21 dockerRepository: airbyte/destination-bigquery documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery githubIssueLabel: destination-bigquery diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncFlush.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncFlush.java new file mode 100644 index 0000000000000..142a307aa64f7 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncFlush.java @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.destination.bigquery; + +import com.google.cloud.bigquery.TableId; +import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns; +import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction; +import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage; +import io.airbyte.cdk.integrations.destination.record_buffer.FileBuffer; +import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer; +import io.airbyte.cdk.integrations.destination.s3.csv.CsvSerializedBuffer; +import io.airbyte.cdk.integrations.destination.s3.csv.StagingDatabaseCsvSheetGenerator; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; +import io.airbyte.integrations.base.destination.typing_deduping.StreamId; +import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; +import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.v0.StreamDescriptor; +import java.util.Map; +import java.util.stream.Stream; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.FileUtils; + +/** + * Async flushing logic. Flushing async prevents backpressure and is the superior flushing strategy. + */ +@Slf4j +class BigQueryAsyncFlush implements DestinationFlushFunction { + + private final Map streamConfigMap; + private final BigQueryGcsOperations stagingOperations; + private final ConfiguredAirbyteCatalog catalog; + + public BigQueryAsyncFlush( + final Map streamConfigMap, + final BigQueryGcsOperations stagingOperations, + final ConfiguredAirbyteCatalog catalog) { + this.streamConfigMap = streamConfigMap; + this.stagingOperations = stagingOperations; + this.catalog = catalog; + } + + @Override + public void flush(final StreamDescriptor decs, final Stream stream) throws Exception { + final SerializableBuffer writer; + try { + writer = new CsvSerializedBuffer( + new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX), + new StagingDatabaseCsvSheetGenerator(DestinationColumns.V2_WITHOUT_META), + true); + + stream.forEach(record -> { + try { + writer.accept(record.getSerialized(), Jsons.serialize(record.getRecord().getMeta()), record.getRecord().getEmittedAt()); + } catch (final Exception e) { + throw new RuntimeException(e); + } + }); + } catch (final Exception e) { + throw new RuntimeException(e); + } + + writer.flush(); + log.info("Flushing CSV buffer for stream {} ({}) to staging", decs.getName(), FileUtils.byteCountToDisplaySize(writer.getByteCount())); + if (!streamConfigMap.containsKey(decs)) { + throw new IllegalArgumentException( + String.format("Message contained record from a stream that was not in the catalog. 
\ncatalog: %s", Jsons.serialize(catalog))); + } + + final StreamId streamId = streamConfigMap.get(decs).getId(); + try { + final String stagedFileName = stagingOperations.uploadRecordsToStage(streamId.getRawNamespace(), streamId.getOriginalName(), writer); + + stagingOperations.copyIntoTableFromStage( + streamId.getRawNamespace(), + streamId.getOriginalName(), + TableId.of(streamId.getRawNamespace(), streamId.getRawName()), + BigQueryRecordFormatter.SCHEMA_V2, + stagedFileName); + } catch (final Exception e) { + log.error("Failed to flush and commit buffer data into destination's raw table", e); + throw new RuntimeException("Failed to upload buffer to stage and commit to destination", e); + } + + writer.close(); + } + + @Override + public long getOptimalBatchSizeBytes() { + // Chosen arbitrarily (mostly to match legacy behavior). We have no reason to believe a larger + // number would be worse. + // This was previously set to 25MB, which ran into rate-limiting issues: + // https://cloud.google.com/bigquery/quotas#standard_tables + // > Your project can make up to 1,500 table modifications per table per day + return 200 * 1024 * 1024; + } + + @Override + public long getQueueFlushThresholdBytes() { + return 200 * 1024 * 1024; + } + +} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncStandardFlush.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncStandardFlush.java new file mode 100644 index 0000000000000..9cb5ba5792ba9 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncStandardFlush.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.destination.bigquery; + +import com.google.common.util.concurrent.RateLimiter; +import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction; +import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage; +import io.airbyte.integrations.destination.bigquery.uploader.BigQueryDirectUploader; +import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.v0.StreamDescriptor; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import java.util.stream.Stream; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class BigQueryAsyncStandardFlush implements DestinationFlushFunction { + + // TODO remove this once the async framework supports rate-limiting/backpressuring + private static final RateLimiter rateLimiter = RateLimiter.create(0.07); + private final Supplier> uploaderMap; + + public BigQueryAsyncStandardFlush(final Supplier> uploaderMap) { + this.uploaderMap = uploaderMap; + } + + @Override + public void flush(final StreamDescriptor decs, final Stream stream) throws Exception { + rateLimiter.acquire(); + final ConcurrentMap uploaderMapSupplied = uploaderMap.get(); + final AtomicInteger recordCount = new AtomicInteger(); + stream.forEach(aibyteMessage -> { + try { + final AirbyteStreamNameNamespacePair sd = new AirbyteStreamNameNamespacePair(aibyteMessage.getRecord().getStream(), + aibyteMessage.getRecord().getNamespace()); + uploaderMapSupplied.get(sd).upload(aibyteMessage); + recordCount.getAndIncrement(); + } catch (final Exception e) { + log.error("An error happened while trying to flush a record to big query", e); + throw e; + } + }); + uploaderMapSupplied.values().forEach(test -> test.closeAfterPush()); + } + + @Override + public long getOptimalBatchSizeBytes() { + // todo(ryankfu): this should be per-destination specific. currently this is for Snowflake. + // The size chosen is currently for improving the performance of low memory connectors. With 1 Gi of + // resource the connector will usually at most fill up around 150 MB in a single queue. By lowering + // the batch size, the AsyncFlusher will flush in smaller batches which allows for memory to be + // freed earlier similar to a sliding window effect + return Double.valueOf(Runtime.getRuntime().maxMemory() * 0.2).longValue(); + } + +} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryConsts.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryConsts.java new file mode 100644 index 0000000000000..8cc29dd511fa5 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryConsts.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.destination.bigquery; + +public class BigQueryConsts { + + public static final int MiB = 1024 * 1024; + public static final String CONFIG_DATASET_ID = "dataset_id"; + public static final String CONFIG_PROJECT_ID = "project_id"; + public static final String CONFIG_DATASET_LOCATION = "dataset_location"; + public static final String CONFIG_CREDS = "credentials_json"; + public static final String BIG_QUERY_CLIENT_CHUNK_SIZE = "big_query_client_buffer_size_mb"; + + public static final String LOADING_METHOD = "loading_method"; + public static final String METHOD = "method"; + public static final String GCS_STAGING = "GCS Staging"; + public static final String GCS_BUCKET_NAME = "gcs_bucket_name"; + public static final String GCS_BUCKET_PATH = "gcs_bucket_path"; + public static final String GCS_BUCKET_REGION = "gcs_bucket_region"; + public static final String CREDENTIAL = "credential"; + public static final String FORMAT = "format"; + public static final String KEEP_GCS_FILES = "keep_files_in_gcs-bucket"; + public static final String KEEP_GCS_FILES_VAL = "Keep all tmp files in GCS"; + + public static final String DISABLE_TYPE_DEDUPE = "disable_type_dedupe"; + + public static final String NAMESPACE_PREFIX = "n"; + + // tests + public static final String BIGQUERY_BASIC_CONFIG = "basic_bigquery_config"; + public static final String GCS_CONFIG = "gcs_config"; + + public static final String CREDENTIAL_TYPE = "credential_type"; + public static final String HMAC_KEY_ACCESS_ID = "hmac_key_access_id"; + public static final String HMAC_KEY_ACCESS_SECRET = "hmac_key_secret"; + +} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java new file mode 100644 index 0000000000000..1dd41d447fbb4 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -0,0 +1,473 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.destination.bigquery; + +import com.codepoetics.protonpack.StreamUtils; +import com.fasterxml.jackson.databind.JsonNode; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryException; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.Dataset; +import com.google.cloud.bigquery.Job; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; +import com.google.common.base.Charsets; +import io.airbyte.cdk.integrations.BaseConnector; +import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler; +import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer; +import io.airbyte.cdk.integrations.base.Destination; +import io.airbyte.cdk.integrations.base.IntegrationRunner; +import io.airbyte.cdk.integrations.base.JavaBaseConstants; +import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer; +import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag; +import io.airbyte.cdk.integrations.destination.StandardNameTransformer; +import io.airbyte.cdk.integrations.destination.gcs.BaseGcsDestination; +import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig; +import io.airbyte.cdk.integrations.destination.gcs.GcsNameTransformer; +import io.airbyte.cdk.integrations.destination.gcs.GcsStorageOperations; +import io.airbyte.commons.exceptions.ConfigErrorException; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser; +import io.airbyte.integrations.base.destination.typing_deduping.DefaultTyperDeduper; +import io.airbyte.integrations.base.destination.typing_deduping.NoOpTyperDeduperWithV1V2Migrations; +import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog; +import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; +import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper; +import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; +import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler; +import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator; +import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV1V2Migrator; +import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV2TableMigrator; +import io.airbyte.integrations.destination.bigquery.uploader.BigQueryDirectUploader; +import io.airbyte.integrations.destination.bigquery.uploader.BigQueryUploaderFactory; +import io.airbyte.integrations.destination.bigquery.uploader.UploaderType; +import io.airbyte.integrations.destination.bigquery.uploader.config.UploaderConfig; +import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; +import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status; +import io.airbyte.protocol.models.v0.AirbyteMessage; +import io.airbyte.protocol.models.v0.AirbyteStream; +import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.v0.DestinationSyncMode; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; 
+import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Consumer; +import java.util.function.Supplier; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.jetbrains.annotations.NotNull; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BigQueryDestination extends BaseConnector implements Destination { + + private static final String RAW_DATA_DATASET = "raw_data_dataset"; + + private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestination.class); + private static final List REQUIRED_PERMISSIONS = List.of( + "storage.multipartUploads.abort", + "storage.multipartUploads.create", + "storage.objects.create", + "storage.objects.delete", + "storage.objects.get", + "storage.objects.list"); + protected final BigQuerySQLNameTransformer namingResolver; + + public BigQueryDestination() { + namingResolver = new BigQuerySQLNameTransformer(); + } + + @Override + public AirbyteConnectionStatus check(final JsonNode config) { + try { + final String datasetId = BigQueryUtils.getDatasetId(config); + final String datasetLocation = BigQueryUtils.getDatasetLocation(config); + final BigQuery bigquery = getBigQuery(config); + final UploadingMethod uploadingMethod = BigQueryUtils.getLoadingMethod(config); + + BigQueryUtils.checkHasCreateAndDeleteDatasetRole(bigquery, datasetId, datasetLocation); + + final Dataset dataset = BigQueryUtils.getOrCreateDataset(bigquery, datasetId, datasetLocation); + if (!dataset.getLocation().equals(datasetLocation)) { + throw new ConfigErrorException("Actual dataset location doesn't match to location from config"); + } + final QueryJobConfiguration queryConfig = QueryJobConfiguration + .newBuilder(String.format("SELECT * FROM `%s.INFORMATION_SCHEMA.TABLES` LIMIT 1;", datasetId)) + .setUseLegacySql(false) + .build(); + + if (UploadingMethod.GCS.equals(uploadingMethod)) { + final AirbyteConnectionStatus status = checkGcsPermission(config); + if (!status.getStatus().equals(Status.SUCCEEDED)) { + return status; + } + } + + final ImmutablePair result = BigQueryUtils.executeQuery(bigquery, queryConfig); + if (result.getLeft() != null) { + return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); + } else { + throw new ConfigErrorException(result.getRight()); + } + } catch (final Exception e) { + LOGGER.error("Check failed.", e); + throw new ConfigErrorException(e.getMessage() != null ? e.getMessage() : e.toString()); + } + } + + /** + * This method does two checks: 1) permissions related to the bucket, and 2) the ability to create + * and delete an actual file. The latter is important because even if the service account may have + * the proper permissions, the HMAC keys can only be verified by running the actual GCS check. 
+ */ + private AirbyteConnectionStatus checkGcsPermission(final JsonNode config) { + final JsonNode loadingMethod = config.get(BigQueryConsts.LOADING_METHOD); + final String bucketName = loadingMethod.get(BigQueryConsts.GCS_BUCKET_NAME).asText(); + final List missingPermissions = new ArrayList<>(); + + try { + final GoogleCredentials credentials = getServiceAccountCredentials(config); + final Storage storage = StorageOptions.newBuilder() + .setProjectId(config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText()) + .setCredentials(credentials) + .setHeaderProvider(BigQueryUtils.getHeaderProvider()) + .build().getService(); + final List permissionsCheckStatusList = storage.testIamPermissions(bucketName, REQUIRED_PERMISSIONS); + + missingPermissions.addAll(StreamUtils + .zipWithIndex(permissionsCheckStatusList.stream()) + .filter(i -> !i.getValue()) + .map(i -> REQUIRED_PERMISSIONS.get(Math.toIntExact(i.getIndex()))) + .toList()); + + final BaseGcsDestination gcsDestination = new BaseGcsDestination() {}; + final JsonNode gcsJsonNodeConfig = BigQueryUtils.getGcsJsonNodeConfig(config); + return gcsDestination.check(gcsJsonNodeConfig); + } catch (final Exception e) { + final StringBuilder message = new StringBuilder("Cannot access the GCS bucket."); + if (!missingPermissions.isEmpty()) { + message.append(" The following permissions are missing on the service account: ") + .append(String.join(", ", missingPermissions)) + .append("."); + } + message.append(" Please make sure the service account can access the bucket path, and the HMAC keys are correct."); + + LOGGER.error(message.toString(), e); + throw new ConfigErrorException("Could not access the GCS bucket with the provided configuration.\n", e); + } + } + + public static BigQuery getBigQuery(final JsonNode config) { + final String projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(); + + try { + final BigQueryOptions.Builder bigQueryBuilder = BigQueryOptions.newBuilder(); + final GoogleCredentials credentials = getServiceAccountCredentials(config); + return bigQueryBuilder + .setProjectId(projectId) + .setCredentials(credentials) + .setHeaderProvider(BigQueryUtils.getHeaderProvider()) + .build() + .getService(); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + + public static GoogleCredentials getServiceAccountCredentials(final JsonNode config) throws IOException { + final JsonNode serviceAccountKey = config.get(BigQueryConsts.CONFIG_CREDS); + // Follows this order of resolution: + // https://cloud.google.com/java/docs/reference/google-auth-library/latest/com.google.auth.oauth2.GoogleCredentials#com_google_auth_oauth2_GoogleCredentials_getApplicationDefault + if (serviceAccountKey == null) { + LOGGER.info("No service account key json is provided. It is required if you are using Airbyte cloud."); + LOGGER.info("Using the default service account credential from environment."); + return GoogleCredentials.getApplicationDefault(); + } + + // The JSON credential can either be a raw JSON object, or a serialized JSON object. + final String credentialsString = serviceAccountKey.isObject() + ? Jsons.serialize(serviceAccountKey) + : serviceAccountKey.asText(); + return GoogleCredentials.fromStream( + new ByteArrayInputStream(credentialsString.getBytes(Charsets.UTF_8))); + } + + /** + * Returns a {@link AirbyteMessageConsumer} based on whether the uploading mode is STANDARD INSERTS + * or using STAGING + * + * @param config - integration-specific configuration object as json. e.g. 
{ "username": "airbyte", + * "password": "super secure" } + * @param catalog - schema of the incoming messages. + */ + @Override + public AirbyteMessageConsumer getConsumer(final JsonNode config, + final ConfiguredAirbyteCatalog catalog, + final Consumer outputRecordCollector) { + throw new UnsupportedOperationException("Should use getSerializedMessageConsumer"); + } + + @Override + @SuppressWarnings("deprecation") + public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final @NotNull JsonNode config, + final @NotNull ConfiguredAirbyteCatalog catalog, + final @NotNull Consumer outputRecordCollector) + throws Exception { + final UploadingMethod uploadingMethod = BigQueryUtils.getLoadingMethod(config); + final String defaultNamespace = BigQueryUtils.getDatasetId(config); + setDefaultStreamNamespace(catalog, defaultNamespace); + final boolean disableTypeDedupe = BigQueryUtils.getDisableTypeDedupFlag(config); + final String datasetLocation = BigQueryUtils.getDatasetLocation(config); + final BigQuerySqlGenerator sqlGenerator = new BigQuerySqlGenerator(config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(), datasetLocation); + final Optional rawNamespaceOverride = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_DATA_DATASET); + final ParsedCatalog parsedCatalog = parseCatalog(sqlGenerator, defaultNamespace, + rawNamespaceOverride.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE), catalog); + final BigQuery bigquery = getBigQuery(config); + final TyperDeduper typerDeduper = + buildTyperDeduper(sqlGenerator, parsedCatalog, bigquery, datasetLocation, disableTypeDedupe); + + AirbyteExceptionHandler.addAllStringsInConfigForDeinterpolation(config); + final JsonNode serviceAccountKey = config.get(BigQueryConsts.CONFIG_CREDS); + if (serviceAccountKey != null) { + // If the service account key is a non-null string, we will try to + // deserialize it. Otherwise, we will let the Google library find it in + // the environment during the client initialization. + if (serviceAccountKey.isTextual()) { + // There are cases where we fail to deserialize the service account key. In these cases, we + // shouldn't do anything. + // Google's creds library is more lenient with JSON-parsing than Jackson, and I'd rather just let it + // go. + Jsons.tryDeserialize(serviceAccountKey.asText()) + .ifPresent(AirbyteExceptionHandler::addAllStringsInConfigForDeinterpolation); + } else { + AirbyteExceptionHandler.addAllStringsInConfigForDeinterpolation(serviceAccountKey); + } + } + + if (uploadingMethod == UploadingMethod.STANDARD) { + LOGGER.warn("The \"standard\" upload mode is not performant, and is not recommended for production. 
" + + "Please use the GCS upload mode if you are syncing a large amount of data."); + return getStandardRecordConsumer(bigquery, config, catalog, parsedCatalog, outputRecordCollector, typerDeduper); + } + + final StandardNameTransformer gcsNameTransformer = new GcsNameTransformer(); + final GcsDestinationConfig gcsConfig = BigQueryUtils.getGcsCsvDestinationConfig(config); + final UUID stagingId = UUID.randomUUID(); + final DateTime syncDatetime = DateTime.now(DateTimeZone.UTC); + final boolean keepStagingFiles = BigQueryUtils.isKeepFilesInGcs(config); + final GcsStorageOperations gcsOperations = new GcsStorageOperations(gcsNameTransformer, gcsConfig.getS3Client(), gcsConfig); + final BigQueryGcsOperations bigQueryGcsOperations = new BigQueryGcsOperations( + bigquery, + gcsNameTransformer, + gcsConfig, + gcsOperations, + datasetLocation, + stagingId, + syncDatetime, + keepStagingFiles); + + return new BigQueryStagingConsumerFactory().createAsync( + catalog, + outputRecordCollector, + bigQueryGcsOperations, + typerDeduper, + parsedCatalog, + BigQueryUtils.getDatasetId(config)); + } + + protected Supplier> getUploaderMap( + final BigQuery bigquery, + final JsonNode config, + final ConfiguredAirbyteCatalog catalog, + final ParsedCatalog parsedCatalog) + throws IOException { + return () -> { + final ConcurrentMap uploaderMap = new ConcurrentHashMap<>(); + for (final ConfiguredAirbyteStream configStream : catalog.getStreams()) { + final AirbyteStream stream = configStream.getStream(); + final StreamConfig parsedStream; + + final String targetTableName; + + parsedStream = parsedCatalog.getStream(stream.getNamespace(), stream.getName()); + targetTableName = parsedStream.getId().getRawName(); + + final UploaderConfig uploaderConfig = UploaderConfig + .builder() + .bigQuery(bigquery) + .configStream(configStream) + .parsedStream(parsedStream) + .config(config) + .formatterMap(getFormatterMap()) + .targetTableName(targetTableName) + // This refers to whether this is BQ denormalized or not + .isDefaultAirbyteTmpSchema(isDefaultAirbyteTmpTableSchema()) + .build(); + + try { + putStreamIntoUploaderMap(stream, uploaderConfig, uploaderMap); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + return uploaderMap; + }; + } + + protected void putStreamIntoUploaderMap(final AirbyteStream stream, + final UploaderConfig uploaderConfig, + final Map uploaderMap) + throws IOException { + uploaderMap.put( + AirbyteStreamNameNamespacePair.fromAirbyteStream(stream), + BigQueryUploaderFactory.getUploader(uploaderConfig)); + } + + /** + * BigQuery might have different structure of the Temporary table. If this method returns TRUE, + * temporary table will have only three common Airbyte attributes. In case of FALSE, temporary table + * structure will be in line with Airbyte message JsonSchema. 
+ * + * @return use default AirbyteSchema or build using JsonSchema + */ + protected boolean isDefaultAirbyteTmpTableSchema() { + return true; + } + + protected Map getFormatterMap() { + return Map.of( + UploaderType.STANDARD, new BigQueryRecordFormatter(namingResolver), + UploaderType.CSV, new BigQueryRecordFormatter(namingResolver)); + } + + private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuery bigquery, + final JsonNode config, + final ConfiguredAirbyteCatalog catalog, + final ParsedCatalog parsedCatalog, + final Consumer outputRecordCollector, + final TyperDeduper typerDeduper) + throws Exception { + final Supplier> writeConfigs = getUploaderMap( + bigquery, + config, + catalog, + parsedCatalog); + + final String bqNamespace = BigQueryUtils.getDatasetId(config); + + return new BigQueryRecordStandardConsumer( + outputRecordCollector, + () -> { + typerDeduper.prepareSchemasAndRunMigrations(); + + // Set up our raw tables + writeConfigs.get().forEach((streamId, uploader) -> { + final StreamConfig stream = parsedCatalog.getStream(streamId); + if (stream.getDestinationSyncMode() == DestinationSyncMode.OVERWRITE) { + // For streams in overwrite mode, truncate the raw table. + // non-1s1t syncs actually overwrite the raw table at the end of the sync, so we only do this in + // 1s1t mode. + final TableId rawTableId = TableId.of(stream.getId().getRawNamespace(), stream.getId().getRawName()); + LOGGER.info("Deleting Raw table {}", rawTableId); + if (!bigquery.delete(rawTableId)) { + LOGGER.info("Raw table {} not found, continuing with creation", rawTableId); + } + LOGGER.info("Creating table {}", rawTableId); + BigQueryUtils.createPartitionedTableIfNotExists(bigquery, rawTableId, BigQueryRecordFormatter.SCHEMA_V2); + } else { + uploader.createRawTable(); + } + }); + + typerDeduper.prepareFinalTables(); + }, + (hasFailed, streamSyncSummaries) -> { + try { + Thread.sleep(30 * 1000); + typerDeduper.typeAndDedupe(streamSyncSummaries); + typerDeduper.commitFinalTables(); + typerDeduper.cleanup(); + } catch (final Exception e) { + throw new RuntimeException(e); + } + }, + catalog, + bqNamespace, + writeConfigs); + } + + private void setDefaultStreamNamespace(final ConfiguredAirbyteCatalog catalog, final String namespace) { + // Set the default originalNamespace on streams with null originalNamespace. This means we don't + // need to repeat this + // logic in the rest of the connector. + // (record messages still need to handle null namespaces though, which currently happens in e.g. + // AsyncStreamConsumer#accept) + // This probably should be shared logic amongst destinations eventually. 
+ for (final ConfiguredAirbyteStream stream : catalog.getStreams()) { + if (StringUtils.isEmpty(stream.getStream().getNamespace())) { + stream.getStream().withNamespace(namespace); + } + } + } + + private ParsedCatalog parseCatalog(final BigQuerySqlGenerator sqlGenerator, + final String defaultNamespace, + final String rawNamespaceOverride, + final ConfiguredAirbyteCatalog catalog) { + final CatalogParser catalogParser = new CatalogParser(sqlGenerator, rawNamespaceOverride); + + return catalogParser.parseCatalog(catalog); + } + + private TyperDeduper buildTyperDeduper(final BigQuerySqlGenerator sqlGenerator, + final ParsedCatalog parsedCatalog, + final BigQuery bigquery, + final String datasetLocation, + final boolean disableTypeDedupe) { + final BigQueryV1V2Migrator migrator = new BigQueryV1V2Migrator(bigquery, namingResolver); + final BigQueryV2TableMigrator v2RawTableMigrator = new BigQueryV2TableMigrator(bigquery); + final BigQueryDestinationHandler destinationHandler = new BigQueryDestinationHandler( + bigquery, + datasetLocation); + + if (disableTypeDedupe) { + return new NoOpTyperDeduperWithV1V2Migrations<>( + sqlGenerator, destinationHandler, parsedCatalog, migrator, v2RawTableMigrator, List.of()); + } + + return new DefaultTyperDeduper<>( + sqlGenerator, + destinationHandler, + parsedCatalog, + migrator, + v2RawTableMigrator, + List.of()); + } + + @Override + public boolean isV2Destination() { + return true; + } + + public static void main(final String[] args) throws Exception { + AirbyteExceptionHandler.addThrowableForDeinterpolation(BigQueryException.class); + final Destination destination = new BigQueryDestination(); + new IntegrationRunner(destination).run(args); + } + +} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java new file mode 100644 index 0000000000000..c230f2bb79cbc --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java @@ -0,0 +1,206 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.destination.bigquery; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryException; +import com.google.cloud.bigquery.FormatOptions; +import com.google.cloud.bigquery.Job; +import com.google.cloud.bigquery.JobInfo; +import com.google.cloud.bigquery.JobInfo.WriteDisposition; +import com.google.cloud.bigquery.LoadJobConfiguration; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.TableId; +import io.airbyte.cdk.integrations.destination.StandardNameTransformer; +import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig; +import io.airbyte.cdk.integrations.destination.gcs.GcsStorageOperations; +import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer; +import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil; +import io.airbyte.commons.exceptions.ConfigErrorException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BigQueryGcsOperations { + + private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryGcsOperations.class); + + private final BigQuery bigQuery; + private final StandardNameTransformer gcsNameTransformer; + private final GcsDestinationConfig gcsConfig; + private final GcsStorageOperations gcsStorageOperations; + + private final String datasetLocation; + private final UUID randomStagingId; + private final DateTime syncDatetime; + private final boolean keepStagingFiles; + private final Set existingSchemas = new HashSet<>(); + + public BigQueryGcsOperations(final BigQuery bigQuery, + final StandardNameTransformer gcsNameTransformer, + final GcsDestinationConfig gcsConfig, + final GcsStorageOperations gcsStorageOperations, + final String datasetLocation, + final UUID randomStagingId, + final DateTime syncDatetime, + final boolean keepStagingFiles) { + this.bigQuery = bigQuery; + this.gcsNameTransformer = gcsNameTransformer; + this.gcsConfig = gcsConfig; + this.gcsStorageOperations = gcsStorageOperations; + this.datasetLocation = datasetLocation; + this.randomStagingId = randomStagingId; + this.syncDatetime = syncDatetime; + this.keepStagingFiles = keepStagingFiles; + } + + /** + * @return {@code /_} + */ + private String getStagingRootPath(final String datasetId, final String stream) { + return gcsNameTransformer.applyDefaultCase(String.format("%s/%s_%s", + gcsConfig.getBucketPath(), + gcsNameTransformer.convertStreamName(datasetId), + gcsNameTransformer.convertStreamName(stream))); + } + + /** + * @return {@code /_//////} + */ + public String getStagingFullPath(final String datasetId, final String stream) { + return gcsNameTransformer.applyDefaultCase(String.format("%s/%s/%02d/%02d/%02d/%s/", + getStagingRootPath(datasetId, stream), + syncDatetime.year().get(), + syncDatetime.monthOfYear().get(), + syncDatetime.dayOfMonth().get(), + syncDatetime.hourOfDay().get(), + randomStagingId)); + } + + public void createSchemaIfNotExists(final String datasetId) { + if (!existingSchemas.contains(datasetId)) { + LOGGER.info("Creating dataset {}", datasetId); + try { + BigQueryUtils.getOrCreateDataset(bigQuery, datasetId, datasetLocation); + } catch (final BigQueryException e) { + if (ConnectorExceptionUtil.HTTP_AUTHENTICATION_ERROR_CODES.contains(e.getCode())) { + throw new ConfigErrorException(e.getMessage(), e); + } else { + throw e; + } + } + existingSchemas.add(datasetId); + } + } + + 
public void createTableIfNotExists(final TableId tableId, final Schema tableSchema) { + LOGGER.info("Creating target table {}", tableId); + BigQueryUtils.createPartitionedTableIfNotExists(bigQuery, tableId, tableSchema); + } + + public void createStageIfNotExists(final String datasetId, final String stream) { + final String objectPath = getStagingFullPath(datasetId, stream); + LOGGER.info("Creating staging path for stream {} (dataset {}): {}", stream, datasetId, objectPath); + gcsStorageOperations.createBucketIfNotExists(); + } + + public String uploadRecordsToStage(final String datasetId, final String stream, final SerializableBuffer writer) { + final String objectPath = getStagingFullPath(datasetId, stream); + LOGGER.info("Uploading records to staging for stream {} (dataset {}): {}", stream, datasetId, objectPath); + return gcsStorageOperations.uploadRecordsToBucket(writer, datasetId, objectPath); + } + + /** + * Similar to COPY INTO within + * {@link io.airbyte.cdk.integrations.destination.staging.StagingOperations} which loads the data + * stored in the stage area into a target table in the destination + * + * Reference + * https://googleapis.dev/java/google-cloud-clients/latest/index.html?com/google/cloud/bigquery/package-summary.html + */ + public void copyIntoTableFromStage(final String datasetId, + final String stream, + final TableId tableId, + final Schema tableSchema, + final String stagedFileName) { + LOGGER.info("Uploading records from staging files to target table {} (dataset {}): {}", + tableId, datasetId, stagedFileName); + + final String fullFilePath = String.format("gs://%s/%s%s", gcsConfig.getBucketName(), getStagingFullPath(datasetId, stream), stagedFileName); + LOGGER.info("Uploading staged file: {}", fullFilePath); + final LoadJobConfiguration configuration = LoadJobConfiguration.builder(tableId, fullFilePath) + .setFormatOptions(FormatOptions.csv()) + .setSchema(tableSchema) + .setWriteDisposition(WriteDisposition.WRITE_APPEND) + .setJobTimeoutMs(600000L) // 10 min + .build(); + + final Job loadJob = this.bigQuery.create(JobInfo.of(configuration)); + LOGGER.info("[{}] Created a new job to upload record(s) to target table {} (dataset {}): {}", loadJob.getJobId(), + tableId, datasetId, loadJob); + + try { + BigQueryUtils.waitForJobFinish(loadJob); + LOGGER.info("[{}] Target table {} (dataset {}) is successfully appended with staging files", loadJob.getJobId(), + tableId, datasetId); + } catch (final BigQueryException | InterruptedException e) { + throw new RuntimeException( + String.format("[%s] Failed to upload staging files to destination table %s (%s)", loadJob.getJobId(), + tableId, datasetId), + e); + } + } + + @Deprecated + public void cleanUpStage(final String datasetId, final String stream, final List stagedFiles) { + if (keepStagingFiles) { + return; + } + + LOGGER.info("Deleting staging files for stream {} (dataset {}): {}", stream, datasetId, stagedFiles); + gcsStorageOperations.cleanUpBucketObject(getStagingRootPath(datasetId, stream), stagedFiles); + } + + public void dropTableIfExists(final String datasetId, final TableId tableId) { + LOGGER.info("Deleting target table {} (dataset {})", tableId, datasetId); + bigQuery.delete(tableId); + } + + public void dropStageIfExists(final String datasetId, final String stream) { + if (keepStagingFiles) { + return; + } + + final String stagingDatasetPath = getStagingRootPath(datasetId, stream); + LOGGER.info("Cleaning up staging path for stream {} (dataset {}): {}", stream, datasetId, stagingDatasetPath); + 
gcsStorageOperations.dropBucketObject(stagingDatasetPath); + } + + /** + * "Truncates" table, this is a workaround to the issue with TRUNCATE TABLE in BigQuery where the + * table's partition filter must be turned off to truncate. Since deleting a table is a free + * operation this option re-uses functions that already exist + * + *

+ * See: https://cloud.google.com/bigquery/pricing#free + *

+ * + * @param datasetId equivalent to schema name + * @param tableId table name + * @param schema schema of the table to be deleted/created + */ + public void truncateTableIfExists(final String datasetId, + final TableId tableId, + final Schema schema) { + LOGGER.info("Truncating target table {} (dataset {})", tableId, datasetId); + dropTableIfExists(datasetId, tableId); + createTableIfNotExists(tableId, schema); + } + +} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordStandardConsumer.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordStandardConsumer.java new file mode 100644 index 0000000000000..baa28e56f3900 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordStandardConsumer.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.bigquery; + +import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer; +import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager; +import io.airbyte.cdk.integrations.destination.async.state.FlushFailure; +import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction; +import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction; +import io.airbyte.integrations.destination.bigquery.uploader.BigQueryDirectUploader; +import io.airbyte.protocol.models.v0.AirbyteMessage; +import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; +import java.util.Optional; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.function.Consumer; +import java.util.function.Supplier; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@SuppressWarnings("try") +public class BigQueryRecordStandardConsumer extends AsyncStreamConsumer { + + public BigQueryRecordStandardConsumer(Consumer outputRecordCollector, + OnStartFunction onStart, + OnCloseFunction onClose, + ConfiguredAirbyteCatalog catalog, + String defaultNamespace, + Supplier> uploaderMap) { + super(outputRecordCollector, + onStart, + onClose, + new BigQueryAsyncStandardFlush(uploaderMap), + catalog, + new BufferManager((long) (Runtime.getRuntime().maxMemory() * 0.5)), + Optional.ofNullable(defaultNamespace), + new FlushFailure(), + Executors.newFixedThreadPool(2)); + } + +} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java new file mode 100644 index 0000000000000..72f6362d61c0c --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java @@ -0,0 +1,158 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.destination.bigquery; + +import static io.airbyte.cdk.integrations.base.JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE; + +import com.google.cloud.bigquery.TableId; +import com.google.common.base.Functions; +import com.google.common.base.Preconditions; +import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer; +import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer; +import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager; +import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction; +import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction; +import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction; +import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog; +import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; +import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper; +import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; +import io.airbyte.protocol.models.v0.AirbyteMessage; +import io.airbyte.protocol.models.v0.AirbyteStream; +import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.v0.DestinationSyncMode; +import io.airbyte.protocol.models.v0.StreamDescriptor; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class mimics the same functionality as + * {@link io.airbyte.cdk.integrations.destination.staging.StagingConsumerFactory} which likely + * should be placed into a commons package to be utilized across all ConsumerFactories + */ +public class BigQueryStagingConsumerFactory { + + private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryStagingConsumerFactory.class); + + public SerializedAirbyteMessageConsumer createAsync( + final ConfiguredAirbyteCatalog catalog, + final Consumer outputRecordCollector, + final BigQueryGcsOperations bigQueryGcsOperations, + final TyperDeduper typerDeduper, + final ParsedCatalog parsedCatalog, + final String defaultNamespace) { + final Map streamConfigMap = createWriteConfigs( + catalog, + parsedCatalog); + + final DestinationFlushFunction flusher = new BigQueryAsyncFlush(streamConfigMap, bigQueryGcsOperations, catalog); + return new AsyncStreamConsumer( + outputRecordCollector, + onStartFunction(bigQueryGcsOperations, parsedCatalog.getStreams(), typerDeduper), + onCloseFunction(bigQueryGcsOperations, parsedCatalog.getStreams(), typerDeduper), + flusher, + catalog, + new BufferManager(getBigQueryBufferMemoryLimit()), + Optional.ofNullable(defaultNamespace)); + } + + /** + * Out BigQuery's uploader threads use a fair amount of memory. We believe this is largely due to + * the sdk client we use. + * + * @return number of bytes to make available for message buffering. 
+ */ + private long getBigQueryBufferMemoryLimit() { + return (long) (Runtime.getRuntime().maxMemory() * 0.4); + } + + private Map createWriteConfigs(final ConfiguredAirbyteCatalog catalog, + final ParsedCatalog parsedCatalog) { + return catalog.getStreams().stream() + .map(configuredStream -> { + Preconditions.checkNotNull(configuredStream.getDestinationSyncMode(), "Undefined destination sync mode"); + + final AirbyteStream stream = configuredStream.getStream(); + return parsedCatalog.getStream(stream.getNamespace(), stream.getName()); + }) + .collect(Collectors.toMap( + c -> new StreamDescriptor().withName(c.getId().getOriginalName()).withNamespace(c.getId().getOriginalNamespace()), + Functions.identity())); + } + + /** + * @param bigQueryGcsOperations collection of Google Cloud Storage Operations + * @param streamConfigs configuration settings used to describe how to write data and where it + * exists + */ + private OnStartFunction onStartFunction(final BigQueryGcsOperations bigQueryGcsOperations, + final List streamConfigs, + final TyperDeduper typerDeduper) { + return () -> { + LOGGER.info("Preparing airbyte_raw tables in destination started for {} streams", streamConfigs.size()); + typerDeduper.prepareSchemasAndRunMigrations(); + + for (final StreamConfig streamConfig : streamConfigs) { + final var tableId = TableId.of(streamConfig.getId().getRawNamespace(), streamConfig.getId().getRawName()); + LOGGER.info("Preparing staging are in destination for schema: {}, stream: {}, target table: {}, stage: {}", + BigQueryRecordFormatter.SCHEMA_V2, streamConfig.getId().getOriginalName(), + tableId, streamConfig.getId().getOriginalName()); + // In Destinations V2, we will always use the 'airbyte_internal' schema/originalNamespace for raw + // tables + final String rawDatasetId = DEFAULT_AIRBYTE_INTERNAL_NAMESPACE; + // Regardless, ensure the schema the customer wants to write to exists + bigQueryGcsOperations.createSchemaIfNotExists(streamConfig.getId().getRawNamespace()); + // Schema used for raw and airbyte internal tables + bigQueryGcsOperations.createSchemaIfNotExists(rawDatasetId); + // Customer's destination schema + // With checkpointing, we will be creating the target table earlier in the setup such that + // the data can be immediately loaded from the staging area + bigQueryGcsOperations.createTableIfNotExists(tableId, BigQueryRecordFormatter.SCHEMA_V2); + bigQueryGcsOperations.createStageIfNotExists(rawDatasetId, streamConfig.getId().getOriginalName()); + // When OVERWRITE mode, truncate the destination's raw table prior to syncing data + if (streamConfig.getDestinationSyncMode() == DestinationSyncMode.OVERWRITE) { + // TODO: this might need special handling during the migration + bigQueryGcsOperations.truncateTableIfExists(rawDatasetId, tableId, BigQueryRecordFormatter.SCHEMA_V2); + } + } + + typerDeduper.prepareFinalTables(); + LOGGER.info("Preparing tables in destination completed."); + }; + } + + /** + * Tear down process, will attempt to clean out any staging area + * + * @param bigQueryGcsOperations collection of staging operations + * @param streamConfigs configuration settings used to describe how to write data and where it + * exists + */ + private OnCloseFunction onCloseFunction(final BigQueryGcsOperations bigQueryGcsOperations, + final List streamConfigs, + final TyperDeduper typerDeduper) { + return (hasFailed, streamSyncSummaries) -> { + /* + * Previously the hasFailed value was used to commit any remaining staged files into destination, + * however, with the changes to 
checkpointing this will no longer be necessary since despite partial + * successes, we'll be committing the target table (aka airbyte_raw) table throughout the sync + */ + typerDeduper.typeAndDedupe(streamSyncSummaries); + LOGGER.info("Cleaning up destination started for {} streams", streamConfigs.size()); + for (final StreamConfig streamConfig : streamConfigs) { + bigQueryGcsOperations.dropStageIfExists(streamConfig.getId().getRawNamespace(), streamConfig.getId().getOriginalName()); + } + typerDeduper.commitFinalTables(); + typerDeduper.cleanup(); + LOGGER.info("Cleaning up destination completed."); + }; + } + +} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java index be378b795891e..36c10dbc00026 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java @@ -4,9 +4,10 @@ package io.airbyte.integrations.destination.bigquery; +import static io.airbyte.integrations.destination.bigquery.helpers.LoggerHelper.getJobErrorMessage; + import com.fasterxml.jackson.databind.JsonNode; import com.google.api.gax.rpc.HeaderProvider; -import com.google.cloud.RetryOption; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryError; import com.google.cloud.bigquery.BigQueryException; @@ -39,14 +40,12 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.UUID; -import java.util.stream.Collectors; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.logging.log4j.util.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.threeten.bp.Duration; public class BigQueryUtils { @@ -86,6 +85,16 @@ static Job waitForQuery(final Job queryJob) { } } + public static void createSchemaAndTableIfNeeded(final BigQuery bigquery, + final Set existingSchemas, + final String schemaName, + final String datasetLocation) { + if (!existingSchemas.contains(schemaName)) { + getOrCreateDataset(bigquery, schemaName, datasetLocation); + existingSchemas.add(schemaName); + } + } + public static Dataset getOrCreateDataset(final BigQuery bigquery, final String datasetId, final String datasetLocation) { Dataset dataset = bigquery.getDataset(datasetId); if (dataset == null || !dataset.exists()) { @@ -176,7 +185,7 @@ public static Table createTable(final BigQuery bigquery, final String datasetNam * @return Table BigQuery table object to be referenced for deleting, otherwise empty meaning table * was not successfully created */ - public static void createPartitionedTableIfNotExists(final BigQuery bigquery, final TableId tableId, final Schema schema) { + static void createPartitionedTableIfNotExists(final BigQuery bigquery, final TableId tableId, final Schema schema) { try { final var chunkingColumn = JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT; final TimePartitioning partitioning = TimePartitioning.newBuilder(TimePartitioning.Type.DAY) @@ -205,8 +214,7 @@ public static void createPartitionedTableIfNotExists(final BigQuery bigquery, fi } } catch (final BigQueryException e) { - LOGGER.error("Partitioned table was not created: 
{}", tableId, e); - throw e; + LOGGER.error("Partitioned table was not created: " + tableId, e); } } @@ -308,39 +316,15 @@ public static void waitForJobFinish(final Job job) throws InterruptedException { if (job != null) { AirbyteExceptionHandler.addStringForDeinterpolation(job.getEtag()); try { - LOGGER.info("Waiting for Job {} to finish. Status: {}", job.getJobId(), job.getStatus()); - // Default totalTimeout is 12 Hours, 30 minutes seems reasonable - final Job completedJob = job.waitFor(RetryOption.totalTimeout(Duration.ofMinutes(30))); - if (completedJob == null) { - // job no longer exists - LOGGER.warn("Job {} No longer exists", job.getJobId()); - } else if (completedJob.getStatus().getError() != null) { - // job failed, handle error - LOGGER.error("Job {} failed with errors {}", completedJob.getJobId(), completedJob.getStatus().getError().toString()); - throw new RuntimeException( - "Fail to complete a load job in big query, Job id: " + completedJob.getJobId() + - ", with error: " + completedJob.getStatus().getError()); - } else { - // job completed successfully - LOGGER.info("Job {} completed successfully, job info {}", completedJob.getJobId(), completedJob); - } + LOGGER.info("Waiting for job finish {}. Status: {}", job, job.getStatus()); + job.waitFor(); + LOGGER.info("Job finish {} with status {}", job, job.getStatus()); } catch (final BigQueryException e) { final String errorMessage = getJobErrorMessage(e.getErrors(), job); LOGGER.error(errorMessage); throw new BigQueryException(e.getCode(), errorMessage, e); } - } else { - LOGGER.warn("Received null value for Job, nothing to waitFor"); - } - } - - private static String getJobErrorMessage(List errors, Job job) { - if (errors == null || errors.isEmpty()) { - return StringUtils.EMPTY; - } - return String.format("An error occurred during execution of job: %s, \n For more details see Big Query Error collection: %s:", job, - errors.stream().map(BigQueryError::toString).collect(Collectors.joining(",\n "))); } public static HeaderProvider getHeaderProvider() { diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.java index 1372e810d6e8a..ccb5d513b0482 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.java @@ -9,12 +9,15 @@ import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.StandardSQLTypeName; import io.airbyte.cdk.integrations.base.JavaBaseConstants; +import io.airbyte.cdk.integrations.destination.StandardNameTransformer; import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage; import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteRecordMessage; import io.airbyte.commons.json.Jsons; import java.util.HashMap; import java.util.UUID; import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The class formats incoming JsonSchema and AirbyteRecord in order to be inline with a @@ -27,8 +30,13 @@ public class BigQueryRecordFormatter { Field.of(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, 
StandardSQLTypeName.TIMESTAMP), Field.of(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, StandardSQLTypeName.TIMESTAMP), Field.of(JavaBaseConstants.COLUMN_NAME_DATA, StandardSQLTypeName.STRING)); + private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryRecordFormatter.class); - public BigQueryRecordFormatter() {} + protected final StandardNameTransformer namingResolver; + + public BigQueryRecordFormatter(final StandardNameTransformer namingResolver) { + this.namingResolver = namingResolver; + } public String formatRecord(PartialAirbyteMessage recordMessage) { // Map.of has a @NonNull requirement, so creating a new Hash map diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/helpers/LoggerHelper.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/helpers/LoggerHelper.java new file mode 100644 index 0000000000000..4e0682865dd1a --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/helpers/LoggerHelper.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.bigquery.helpers; + +import com.google.cloud.bigquery.BigQueryError; +import com.google.cloud.bigquery.Job; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LoggerHelper { + + private static final Logger LOGGER = LoggerFactory.getLogger(LoggerHelper.class); + + private LoggerHelper() {} + + public static void printHeapMemoryConsumption() { + final int mb = 1024 * 1024; + final MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean(); + final long xmx = memoryBean.getHeapMemoryUsage().getMax() / mb; + final long xms = memoryBean.getHeapMemoryUsage().getInit() / mb; + LOGGER.info("Initial Memory (xms) mb = {}", xms); + LOGGER.info("Max Memory (xmx) : mb = {}", xmx); + } + + public static String getJobErrorMessage(List errors, Job job) { + if (errors == null || errors.isEmpty()) { + return StringUtils.EMPTY; + + } + return String.format("An error occurred during execution of job: %s, \n For more details see Big Query Error collection: %s:", job, + errors.stream().map(BigQueryError::toString).collect(Collectors.joining(",\n "))); + } + +} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java index 46ba1c8c886cd..39f283316c944 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java @@ -33,8 +33,6 @@ import com.google.common.collect.Streams; import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler; import io.airbyte.cdk.integrations.base.JavaBaseConstants; -import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil; -import 
io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.integrations.base.destination.typing_deduping.AlterTableReport; import io.airbyte.integrations.base.destination.typing_deduping.ColumnId; import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler; @@ -44,8 +42,8 @@ import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; import io.airbyte.integrations.base.destination.typing_deduping.TableNotMigratedException; -import io.airbyte.integrations.destination.bigquery.BigQueryUtils; -import io.airbyte.integrations.destination.bigquery.migrators.BigQueryDestinationState; +import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState; +import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState.Impl; import java.math.BigInteger; import java.util.ArrayList; import java.util.Collection; @@ -60,11 +58,11 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.text.StringSubstitutor; -import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class BigQueryDestinationHandler implements DestinationHandler { +// TODO this stuff almost definitely exists somewhere else in our codebase. +public class BigQueryDestinationHandler implements DestinationHandler { private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestinationHandler.class); @@ -100,7 +98,7 @@ SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND) FROM ${raw_table} WHERE _airbyte_loaded_at IS NULL """)) - .build()).iterateAll().iterator().next().getFirst(); + .build()).iterateAll().iterator().next().get(0); // If this value is null, then there are no records with null loaded_at. // If it's not null, then we can return immediately - we've found some unprocessed records and their // timestamp. @@ -114,7 +112,7 @@ SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND) SELECT MAX(_airbyte_extracted_at) FROM ${raw_table} """)) - .build()).iterateAll().iterator().next().getFirst(); + .build()).iterateAll().iterator().next().get(0); // We know (from the previous query) that all records have been processed by T+D already. // So we just need to get the timestamp of the most recent record. if (loadedRecordTimestamp.isNull()) { @@ -194,8 +192,8 @@ public void execute(final Sql sql) throws InterruptedException { } @Override - public List> gatherInitialState(List streamConfigs) throws Exception { - final List> initialStates = new ArrayList<>(); + public List> gatherInitialState(List streamConfigs) throws Exception { + final List> initialStates = new ArrayList<>(); for (final StreamConfig streamConfig : streamConfigs) { final StreamId id = streamConfig.getId(); final Optional finalTable = findExistingTable(id); @@ -207,13 +205,13 @@ public List> gatherInitialSta finalTable.isPresent() && !existingSchemaMatchesStreamConfig(streamConfig, finalTable.get()), finalTable.isEmpty() || isFinalTableEmpty(id), // Return a default state blob since we don't actually track state. - new BigQueryDestinationState(false))); + new MinimumDestinationState.Impl(false))); } return initialStates; } @Override - public void commitDestinationStates(Map destinationStates) throws Exception { + public void commitDestinationStates(Map destinationStates) throws Exception { // Intentionally do nothing. 
Bigquery doesn't actually support destination states. } @@ -323,22 +321,4 @@ private static Set getPks(final StreamConfig stream) { : Collections.emptySet(); } - @Override - public void createNamespaces(@NotNull Set schemas) { - schemas.forEach(this::createDataset); - } - - private void createDataset(final String dataset) { - LOGGER.info("Creating dataset if not present {}", dataset); - try { - BigQueryUtils.getOrCreateDataset(bq, dataset, datasetLocation); - } catch (BigQueryException e) { - if (ConnectorExceptionUtil.HTTP_AUTHENTICATION_ERROR_CODES.contains(e.getCode())) { - throw new ConfigErrorException(e.getMessage(), e); - } else { - throw e; - } - } - } - } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java index 028e67e40e766..bf82c1f6243fc 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java @@ -6,7 +6,7 @@ import static io.airbyte.integrations.base.destination.typing_deduping.Sql.separately; import static io.airbyte.integrations.base.destination.typing_deduping.Sql.transactionally; -import static io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.SOFT_RESET_SUFFIX; +import static io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeTransaction.SOFT_RESET_SUFFIX; import static java.util.stream.Collectors.joining; import com.google.cloud.bigquery.StandardSQLTypeName; @@ -183,12 +183,13 @@ THEN JSON_QUERY(`_airbyte_data`, '$."${column_name}"') // the SQLGenerator? public static StandardSQLTypeName toDialectType(final AirbyteProtocolType airbyteProtocolType) { return switch (airbyteProtocolType) { - case STRING, TIME_WITH_TIMEZONE -> StandardSQLTypeName.STRING; + case STRING -> StandardSQLTypeName.STRING; case NUMBER -> StandardSQLTypeName.NUMERIC; case INTEGER -> StandardSQLTypeName.INT64; case BOOLEAN -> StandardSQLTypeName.BOOL; case TIMESTAMP_WITH_TIMEZONE -> StandardSQLTypeName.TIMESTAMP; case TIMESTAMP_WITHOUT_TIMEZONE -> StandardSQLTypeName.DATETIME; + case TIME_WITH_TIMEZONE -> StandardSQLTypeName.STRING; case TIME_WITHOUT_TIMEZONE -> StandardSQLTypeName.TIME; case DATE -> StandardSQLTypeName.DATE; case UNKNOWN -> StandardSQLTypeName.JSON; diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryV2TableMigrator.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryV2TableMigrator.java new file mode 100644 index 0000000000000..61bd4602042b1 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryV2TableMigrator.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.destination.bigquery.typing_deduping; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.FieldList; +import com.google.cloud.bigquery.LegacySQLTypeName; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.Table; +import com.google.cloud.bigquery.TableId; +import io.airbyte.cdk.integrations.base.JavaBaseConstants; +import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; +import io.airbyte.integrations.base.destination.typing_deduping.V2TableMigrator; +import java.util.Map; +import org.apache.commons.text.StringSubstitutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BigQueryV2TableMigrator implements V2TableMigrator { + + private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryV2TableMigrator.class); + + private final BigQuery bq; + + public BigQueryV2TableMigrator(final BigQuery bq) { + this.bq = bq; + } + + @Override + public void migrateIfNecessary(final StreamConfig streamConfig) throws InterruptedException { + final Table rawTable = bq.getTable(TableId.of(streamConfig.getId().getRawNamespace(), streamConfig.getId().getRawName())); + if (rawTable != null && rawTable.exists()) { + final Schema existingRawSchema = rawTable.getDefinition().getSchema(); + final FieldList fields = existingRawSchema.getFields(); + if (fields.stream().noneMatch(f -> JavaBaseConstants.COLUMN_NAME_DATA.equals(f.getName()))) { + throw new IllegalStateException( + "Table does not have a column named _airbyte_data. We are likely colliding with a completely different table."); + } + final Field dataColumn = fields.get(JavaBaseConstants.COLUMN_NAME_DATA); + if (dataColumn.getType() == LegacySQLTypeName.JSON) { + LOGGER.info("Raw table has _airbyte_data of type JSON. Migrating to STRING."); + final String tmpRawTableId = BigQuerySqlGenerator.QUOTE + streamConfig.getId().getRawNamespace() + BigQuerySqlGenerator.QUOTE + "." + + BigQuerySqlGenerator.QUOTE + streamConfig.getId().getRawName() + "_airbyte_tmp" + BigQuerySqlGenerator.QUOTE; + bq.query(QueryJobConfiguration.of( + new StringSubstitutor(Map.of( + "raw_table", streamConfig.getId().rawTableId(BigQuerySqlGenerator.QUOTE), + "tmp_raw_table", tmpRawTableId, + "real_raw_table", BigQuerySqlGenerator.QUOTE + streamConfig.getId().getRawName() + BigQuerySqlGenerator.QUOTE)).replace( + // In full refresh / append mode, standard inserts is creating a non-partitioned raw table. + // (possibly also in overwrite mode?). + // We can't just CREATE OR REPLACE the table because bigquery will complain that we're trying to + // change the partitioning scheme. + // Do an explicit CREATE tmp + DROP + RENAME, similar to how we overwrite the final tables in + // OVERWRITE mode. 
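+ // (Descriptive note on the statement below: the CREATE rebuilds the raw table partitioned by DATE(_airbyte_extracted_at) and clustered on _airbyte_extracted_at, converting _airbyte_data to STRING via to_json_string() while keeping the other v2 raw columns unchanged, then swaps it in with DROP + RENAME.)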
+ """ + CREATE TABLE ${tmp_raw_table} + PARTITION BY DATE(_airbyte_extracted_at) + CLUSTER BY _airbyte_extracted_at + AS ( + SELECT + _airbyte_raw_id, + _airbyte_extracted_at, + _airbyte_loaded_at, + to_json_string(_airbyte_data) as _airbyte_data + FROM ${raw_table} + ); + DROP TABLE IF EXISTS ${raw_table}; + ALTER TABLE ${tmp_raw_table} RENAME TO ${real_raw_table}; + """))); + LOGGER.info("Completed Data column Migration for stream {}", streamConfig.getId().getRawName()); + } else { + LOGGER.info("No Data column Migration Required for stream {}", streamConfig.getId().getRawName()); + } + } + } + +} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryDirectUploader.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryDirectUploader.java new file mode 100644 index 0000000000000..796510e5ed84b --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryDirectUploader.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.bigquery.uploader; + +import static io.airbyte.integrations.destination.bigquery.helpers.LoggerHelper.printHeapMemoryConsumption; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.Table; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage; +import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; +import io.airbyte.integrations.destination.bigquery.writer.BigQueryTableWriter; +import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BigQueryDirectUploader { + + private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDirectUploader.class); + + protected final TableId table; + protected final BigQueryTableWriter writer; + protected final BigQuery bigQuery; + protected final BigQueryRecordFormatter recordFormatter; + + BigQueryDirectUploader(final TableId table, + final BigQueryTableWriter writer, + final BigQuery bigQuery, + final BigQueryRecordFormatter recordFormatter) { + this.table = table; + this.writer = writer; + this.bigQuery = bigQuery; + this.recordFormatter = recordFormatter; + } + + public void upload(final PartialAirbyteMessage airbyteMessage) { + try { + writer.write(recordFormatter.formatRecord(airbyteMessage)); + } catch (final IOException | RuntimeException e) { + LOGGER.error("Got an error while writing message: {}", e.getMessage(), e); + LOGGER.error(String.format( + "Failed to process a message for job: %s", + writer.toString())); + printHeapMemoryConsumption(); + throw new RuntimeException(e); + } + } + + public void closeAfterPush() { + try { + this.writer.close(); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + + public void createRawTable() { + // Ensure that this table exists. 
+ final Table rawTable = bigQuery.getTable(table); + if (rawTable == null) { + LOGGER.info("Creating raw table {}.", table); + bigQuery.create(TableInfo.newBuilder(table, StandardTableDefinition.of(BigQueryRecordFormatter.SCHEMA_V2)).build()); + } else { + LOGGER.info("Found raw table {}.", rawTable.getTableId()); + } + } + + @Override + public String toString() { + return "BigQueryDirectUploader{" + + "table=" + table.getTable() + + ", writer=" + writer.getClass() + + ", recordFormatter=" + recordFormatter.getClass() + + '}'; + } + +} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java new file mode 100644 index 0000000000000..2db3a903089df --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.bigquery.uploader; + +import static io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter.*; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryException; +import com.google.cloud.bigquery.FormatOptions; +import com.google.cloud.bigquery.JobId; +import com.google.cloud.bigquery.JobInfo; +import com.google.cloud.bigquery.TableDataWriteChannel; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.WriteChannelConfiguration; +import io.airbyte.commons.exceptions.ConfigErrorException; +import io.airbyte.integrations.destination.bigquery.BigQueryUtils; +import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; +import io.airbyte.integrations.destination.bigquery.uploader.config.UploaderConfig; +import io.airbyte.integrations.destination.bigquery.writer.BigQueryTableWriter; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BigQueryUploaderFactory { + + private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryUploaderFactory.class); + + private static final int HTTP_STATUS_CODE_FORBIDDEN = 403; + private static final int HTTP_STATUS_CODE_NOT_FOUND = 404; + + private static final String CONFIG_ERROR_MSG = """ + Failed to write to destination schema. + + 1. Make sure you have all required permissions for writing to the schema. + + 2. Make sure that the actual destination schema's location corresponds to location provided + in connector's config. + + 3. Try to change the "Destination schema" from "Mirror Source Structure" (if it's set) tp the + "Destination Default" option. 
+ + More details: + """; + + public static BigQueryDirectUploader getUploader(final UploaderConfig uploaderConfig) + throws IOException { + final String dataset = uploaderConfig.getParsedStream().getId().getRawNamespace(); + final String datasetLocation = BigQueryUtils.getDatasetLocation(uploaderConfig.getConfig()); + final Set existingDatasets = new HashSet<>(); + + final BigQueryRecordFormatter recordFormatter = uploaderConfig.getFormatter(); + + final TableId targetTable = TableId.of(dataset, uploaderConfig.getTargetTableName()); + + BigQueryUtils.createSchemaAndTableIfNeeded( + uploaderConfig.getBigQuery(), + existingDatasets, + dataset, + datasetLocation); + + return getBigQueryDirectUploader( + uploaderConfig.getConfig(), + targetTable, + uploaderConfig.getBigQuery(), + datasetLocation, + recordFormatter); + } + + private static BigQueryDirectUploader getBigQueryDirectUploader( + final JsonNode config, + final TableId targetTable, + final BigQuery bigQuery, + final String datasetLocation, + final BigQueryRecordFormatter formatter) { + // https://cloud.google.com/bigquery/docs/loading-data-local#loading_data_from_a_local_data_source + LOGGER.info("Will write raw data to {} with schema {}", targetTable, SCHEMA_V2); + final WriteChannelConfiguration writeChannelConfiguration = + WriteChannelConfiguration.newBuilder(targetTable) + .setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED) + .setSchema(SCHEMA_V2) + .setFormatOptions(FormatOptions.json()) + .build(); // new-line delimited json. + + final JobId job = JobId.newBuilder() + .setRandomJob() + .setLocation(datasetLocation) + .setProject(bigQuery.getOptions().getProjectId()) + .build(); + + final TableDataWriteChannel writer; + + try { + writer = bigQuery.writer(job, writeChannelConfiguration); + } catch (final BigQueryException e) { + if (e.getCode() == HTTP_STATUS_CODE_FORBIDDEN || e.getCode() == HTTP_STATUS_CODE_NOT_FOUND) { + throw new ConfigErrorException(CONFIG_ERROR_MSG + e); + } else { + throw new BigQueryException(e.getCode(), e.getMessage()); + } + } + + // this this optional value. If not set - use default client's value (15MiG) + final Integer bigQueryClientChunkSizeFomConfig = + BigQueryUtils.getBigQueryClientChunkSize(config); + if (bigQueryClientChunkSizeFomConfig != null) { + writer.setChunkSize(bigQueryClientChunkSizeFomConfig); + } + + return new BigQueryDirectUploader( + targetTable, + new BigQueryTableWriter(writer), + bigQuery, + formatter); + } + +} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/UploaderType.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/UploaderType.java new file mode 100644 index 0000000000000..80333f1b68397 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/UploaderType.java @@ -0,0 +1,11 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.destination.bigquery.uploader; + +public enum UploaderType { + STANDARD, + AVRO, + CSV +} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/config/UploaderConfig.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/config/UploaderConfig.java new file mode 100644 index 0000000000000..ad7cbebbff7cd --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/config/UploaderConfig.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.bigquery.uploader.config; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.cloud.bigquery.BigQuery; +import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; +import io.airbyte.integrations.destination.bigquery.BigQueryUtils; +import io.airbyte.integrations.destination.bigquery.UploadingMethod; +import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; +import io.airbyte.integrations.destination.bigquery.uploader.UploaderType; +import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; +import java.util.Map; +import lombok.Builder; +import lombok.Getter; + +@Builder +@Getter +public class UploaderConfig { + + private JsonNode config; + /** + * Taken directly from the {@link ConfiguredAirbyteStream}, except if the originalNamespace was + * null, we set it to the destination default originalNamespace. + */ + private ConfiguredAirbyteStream configStream; + /** + * Parsed directly from {@link #configStream}. + */ + private StreamConfig parsedStream; + private String targetTableName; + private BigQuery bigQuery; + private Map formatterMap; + private boolean isDefaultAirbyteTmpSchema; + + public boolean isGcsUploadingMode() { + return BigQueryUtils.getLoadingMethod(config) == UploadingMethod.GCS; + } + + public UploaderType getUploaderType() { + return (isGcsUploadingMode() ? UploaderType.CSV : UploaderType.STANDARD); + } + + public BigQueryRecordFormatter getFormatter() { + return formatterMap.get(getUploaderType()); + } + +} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/writer/BigQueryTableWriter.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/writer/BigQueryTableWriter.java new file mode 100644 index 0000000000000..50c2608ae1714 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/writer/BigQueryTableWriter.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.destination.bigquery.writer; + +import com.google.cloud.bigquery.Job; +import com.google.cloud.bigquery.TableDataWriteChannel; +import com.google.common.base.Charsets; +import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public record BigQueryTableWriter(TableDataWriteChannel writeChannel) { + + private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryTableWriter.class); + + public void write(final String formattedData) throws IOException { + writeChannel.write(ByteBuffer.wrap((formattedData + "\n").getBytes(Charsets.UTF_8))); + } + + public void close() throws IOException { + this.writeChannel.close(); + try { + final Job job = writeChannel.getJob(); + if (job != null && job.getStatus().getError() != null) { + AirbyteExceptionHandler.addStringForDeinterpolation(job.getEtag()); + throw new RuntimeException("Fail to complete a load job in big query, Job id: " + writeChannel.getJob().getJobId() + + ", with error: " + writeChannel.getJob().getStatus().getError()); + } + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + +} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryConsts.kt b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryConsts.kt deleted file mode 100644 index 4fc7aafe64106..0000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryConsts.kt +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ -package io.airbyte.integrations.destination.bigquery - -object BigQueryConsts { - const val MiB: Int = 1024 * 1024 - const val CONFIG_DATASET_ID: String = "dataset_id" - const val CONFIG_PROJECT_ID: String = "project_id" - const val CONFIG_DATASET_LOCATION: String = "dataset_location" - const val CONFIG_CREDS: String = "credentials_json" - const val BIG_QUERY_CLIENT_CHUNK_SIZE: String = "big_query_client_buffer_size_mb" - - const val LOADING_METHOD: String = "loading_method" - const val METHOD: String = "method" - const val GCS_STAGING: String = "GCS Staging" - const val GCS_BUCKET_NAME: String = "gcs_bucket_name" - const val GCS_BUCKET_PATH: String = "gcs_bucket_path" - const val GCS_BUCKET_REGION: String = "gcs_bucket_region" - const val CREDENTIAL: String = "credential" - const val FORMAT: String = "format" - const val KEEP_GCS_FILES: String = "keep_files_in_gcs-bucket" - const val KEEP_GCS_FILES_VAL: String = "Keep all tmp files in GCS" - const val DISABLE_TYPE_DEDUPE: String = "disable_type_dedupe" - const val RAW_DATA_DATASET = "raw_data_dataset" - const val NAMESPACE_PREFIX: String = "n" -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryConsumerFactory.kt b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryConsumerFactory.kt deleted file mode 100644 index 29bfa325f9cf7..0000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryConsumerFactory.kt +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
- */ - -package io.airbyte.integrations.destination.bigquery - -import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer -import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager -import io.airbyte.cdk.integrations.destination.operation.SyncOperation -import io.airbyte.integrations.base.destination.operation.DefaultFlush -import io.airbyte.protocol.models.v0.AirbyteMessage -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog -import java.util.* -import java.util.function.Consumer - -object BigQueryConsumerFactory { - - fun createStagingConsumer( - outputRecordCollector: Consumer, - syncOperation: SyncOperation, - catalog: ConfiguredAirbyteCatalog, - defaultNamespace: String - ): AsyncStreamConsumer { - // values here are resurrected from some old code. - // TODO: Find why max memory ratio is 0.4 capped - return AsyncStreamConsumer( - outputRecordCollector = outputRecordCollector, - onStart = {}, - onClose = { _, streamSyncSummaries -> - syncOperation.finalizeStreams(streamSyncSummaries) - }, - onFlush = DefaultFlush(200 * 1024 * 1024, syncOperation), - catalog = catalog, - bufferManager = - BufferManager( - (Runtime.getRuntime().maxMemory() * 0.4).toLong(), - ), - defaultNamespace = Optional.of(defaultNamespace), - ) - } - - fun createDirectUploadConsumer( - outputRecordCollector: Consumer, - syncOperation: SyncOperation, - catalog: ConfiguredAirbyteCatalog, - defaultNamespace: String - ): AsyncStreamConsumer { - - // TODO: Why is Standard consumer operating at memory ratio of 0.5 - // and Max 2 threads and some weird 20% max memory as the default flush size. - return AsyncStreamConsumer( - outputRecordCollector = outputRecordCollector, - onStart = {}, - onClose = { _, streamSyncSummaries -> - syncOperation.finalizeStreams(streamSyncSummaries) - }, - onFlush = - DefaultFlush((Runtime.getRuntime().maxMemory() * 0.2).toLong(), syncOperation), - catalog = catalog, - bufferManager = - BufferManager( - (Runtime.getRuntime().maxMemory() * 0.5).toLong(), - ), - defaultNamespace = Optional.of(defaultNamespace), - ) - } -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryDestination.kt b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryDestination.kt deleted file mode 100644 index 91e520d9e8483..0000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryDestination.kt +++ /dev/null @@ -1,397 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
- */ -package io.airbyte.integrations.destination.bigquery - -import com.fasterxml.jackson.databind.JsonNode -import com.google.auth.oauth2.GoogleCredentials -import com.google.cloud.bigquery.BigQuery -import com.google.cloud.bigquery.BigQueryException -import com.google.cloud.bigquery.BigQueryOptions -import com.google.cloud.bigquery.QueryJobConfiguration -import com.google.cloud.storage.Storage -import com.google.cloud.storage.StorageOptions -import com.google.common.base.Charsets -import io.airbyte.cdk.integrations.BaseConnector -import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler.Companion.addAllStringsInConfigForDeinterpolation -import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler.Companion.addThrowableForDeinterpolation -import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer -import io.airbyte.cdk.integrations.base.Destination -import io.airbyte.cdk.integrations.base.IntegrationRunner -import io.airbyte.cdk.integrations.base.JavaBaseConstants -import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns.* -import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer -import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag.getRawNamespaceOverride -import io.airbyte.cdk.integrations.destination.gcs.BaseGcsDestination -import io.airbyte.cdk.integrations.destination.gcs.GcsNameTransformer -import io.airbyte.cdk.integrations.destination.gcs.GcsStorageOperations -import io.airbyte.cdk.integrations.destination.operation.SyncOperation -import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat -import io.airbyte.cdk.integrations.destination.staging.operation.StagingStreamOperations -import io.airbyte.commons.exceptions.ConfigErrorException -import io.airbyte.commons.json.Jsons.serialize -import io.airbyte.commons.json.Jsons.tryDeserialize -import io.airbyte.integrations.base.destination.operation.DefaultSyncOperation -import io.airbyte.integrations.base.destination.operation.StandardStreamOperation -import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser -import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus -import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog -import io.airbyte.integrations.destination.bigquery.BigQueryConsts as bqConstants -import io.airbyte.integrations.destination.bigquery.BigQueryConsumerFactory.createDirectUploadConsumer -import io.airbyte.integrations.destination.bigquery.BigQueryConsumerFactory.createStagingConsumer -import io.airbyte.integrations.destination.bigquery.BigQueryUtils.* -import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter -import io.airbyte.integrations.destination.bigquery.migrators.BigQueryDV2Migration -import io.airbyte.integrations.destination.bigquery.migrators.BigQueryDestinationState -import io.airbyte.integrations.destination.bigquery.operation.BigQueryDirectLoadingStorageOperation -import io.airbyte.integrations.destination.bigquery.operation.BigQueryGcsStorageOperation -import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler -import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator -import io.airbyte.protocol.models.v0.AirbyteConnectionStatus -import io.airbyte.protocol.models.v0.AirbyteMessage -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog -import io.github.oshai.kotlinlogging.KotlinLogging -import java.io.ByteArrayInputStream -import java.io.IOException -import 
java.util.* -import java.util.function.Consumer -import org.apache.commons.lang3.StringUtils - -private val log = KotlinLogging.logger {} - -class BigQueryDestination : BaseConnector(), Destination { - - override fun check(config: JsonNode): AirbyteConnectionStatus? { - try { - val datasetId = getDatasetId(config) - val datasetLocation = getDatasetLocation(config) - val bigquery = getBigQuery(config) - val uploadingMethod = getLoadingMethod(config) - - checkHasCreateAndDeleteDatasetRole(bigquery, datasetId, datasetLocation) - - val dataset = getOrCreateDataset(bigquery, datasetId, datasetLocation) - if (dataset.location != datasetLocation) { - throw ConfigErrorException( - "Actual dataset location doesn't match to location from config", - ) - } - val queryConfig = - QueryJobConfiguration.newBuilder( - String.format( - "SELECT * FROM `%s.INFORMATION_SCHEMA.TABLES` LIMIT 1;", - datasetId, - ), - ) - .setUseLegacySql(false) - .build() - - if (UploadingMethod.GCS == uploadingMethod) { - val status = checkGcsPermission(config) - if (status!!.status != AirbyteConnectionStatus.Status.SUCCEEDED) { - return status - } - } - - val result = executeQuery(bigquery, queryConfig) - if (result.getLeft() != null) { - return AirbyteConnectionStatus() - .withStatus(AirbyteConnectionStatus.Status.SUCCEEDED) - } else { - throw ConfigErrorException(result.getRight()) - } - } catch (e: Exception) { - log.error(e) { "Check failed." } - throw ConfigErrorException((if (e.message != null) e.message else e.toString())!!) - } - } - - /** - * This method does two checks: 1) permissions related to the bucket, and 2) the ability to - * create and delete an actual file. The latter is important because even if the service account - * may have the proper permissions, the HMAC keys can only be verified by running the actual GCS - * check. - */ - private fun checkGcsPermission(config: JsonNode): AirbyteConnectionStatus? 
{ - val loadingMethod = config[bqConstants.LOADING_METHOD] - val bucketName = loadingMethod[bqConstants.GCS_BUCKET_NAME].asText() - val missingPermissions: MutableList = ArrayList() - - try { - val credentials = getServiceAccountCredentials(config) - val storage: Storage = - StorageOptions.newBuilder() - .setProjectId(config[bqConstants.CONFIG_PROJECT_ID].asText()) - .setCredentials(credentials) - .setHeaderProvider(getHeaderProvider()) - .build() - .service - val permissionsCheckStatusList: List = - storage.testIamPermissions(bucketName, REQUIRED_PERMISSIONS) - - // testIamPermissions returns a list of booleans - // in the same order of the presented permissions list - missingPermissions.addAll( - permissionsCheckStatusList - .asSequence() - .withIndex() - .filter { !it.value } - .map { REQUIRED_PERMISSIONS[it.index] } - .toList(), - ) - - val gcsDestination: BaseGcsDestination = object : BaseGcsDestination() {} - val gcsJsonNodeConfig = getGcsJsonNodeConfig(config) - return gcsDestination.check(gcsJsonNodeConfig) - } catch (e: Exception) { - val message = StringBuilder("Cannot access the GCS bucket.") - if (!missingPermissions.isEmpty()) { - message - .append(" The following permissions are missing on the service account: ") - .append(java.lang.String.join(", ", missingPermissions)) - .append(".") - } - message.append( - " Please make sure the service account can access the bucket path, and the HMAC keys are correct.", - ) - - log.error(e) { message.toString() } - throw ConfigErrorException( - "Could not access the GCS bucket with the provided configuration.\n", - e, - ) - } - } - - /** - * Returns a [AirbyteMessageConsumer] based on whether the uploading mode is STANDARD INSERTS or - * using STAGING - * - * @param config - * - integration-specific configuration object as json. e.g. { "username": "airbyte", - * "password": "super secure" } - * @param catalog - * - schema of the incoming messages. - */ - override fun getConsumer( - config: JsonNode, - catalog: ConfiguredAirbyteCatalog, - outputRecordCollector: Consumer - ): AirbyteMessageConsumer? { - throw UnsupportedOperationException("Should use getSerializedMessageConsumer") - } - - @Throws(Exception::class) - override fun getSerializedMessageConsumer( - config: JsonNode, - catalog: ConfiguredAirbyteCatalog, - outputRecordCollector: Consumer - ): SerializedAirbyteMessageConsumer { - val uploadingMethod = getLoadingMethod(config) - val defaultNamespace = getDatasetId(config) - setDefaultStreamNamespace(catalog, defaultNamespace) - val disableTypeDedupe = getDisableTypeDedupFlag(config) - val datasetLocation = getDatasetLocation(config) - val projectId = config[bqConstants.CONFIG_PROJECT_ID].asText() - val bigquery = getBigQuery(config) - val rawNamespaceOverride = getRawNamespaceOverride(bqConstants.RAW_DATA_DATASET) - - addAllStringsInConfigForDeinterpolation(config) - val serviceAccountKey = config[bqConstants.CONFIG_CREDS] - if (serviceAccountKey != null) { - // If the service account key is a non-null string, we will try to - // deserialize it. Otherwise, we will let the Google library find it in - // the environment during the client initialization. - if (serviceAccountKey.isTextual) { - // There are cases where we fail to deserialize the service account key. In these - // cases, we - // shouldn't do anything. - // Google's creds library is more lenient with JSON-parsing than Jackson, and I'd - // rather just let it - // go. 
- tryDeserialize(serviceAccountKey.asText()).ifPresent { obj: JsonNode -> - addAllStringsInConfigForDeinterpolation(obj) - } - } else { - addAllStringsInConfigForDeinterpolation(serviceAccountKey) - } - } - - val sqlGenerator = BigQuerySqlGenerator(projectId, datasetLocation) - val parsedCatalog = - parseCatalog( - sqlGenerator, - rawNamespaceOverride.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE), - catalog, - ) - val destinationHandler = BigQueryDestinationHandler(bigquery, datasetLocation) - - val migrations = listOf(BigQueryDV2Migration(sqlGenerator, bigquery)) - - if (uploadingMethod == UploadingMethod.STANDARD) { - val bigQueryClientChunkSize = getBigQueryClientChunkSize(config) - val bigQueryLoadingStorageOperation = - BigQueryDirectLoadingStorageOperation( - bigquery, - bigQueryClientChunkSize, - BigQueryRecordFormatter(), - sqlGenerator, - destinationHandler, - datasetLocation, - ) - val syncOperation = - DefaultSyncOperation( - parsedCatalog, - destinationHandler, - defaultNamespace, - { initialStatus: DestinationInitialStatus, disableTD - -> - StandardStreamOperation( - bigQueryLoadingStorageOperation, - initialStatus, - disableTD - ) - }, - migrations, - disableTypeDedupe, - ) - return createDirectUploadConsumer( - outputRecordCollector, - syncOperation, - catalog, - defaultNamespace, - ) - } - - val gcsNameTransformer = GcsNameTransformer() - val gcsConfig = getGcsCsvDestinationConfig(config) - val keepStagingFiles = isKeepFilesInGcs(config) - val gcsOperations = - GcsStorageOperations(gcsNameTransformer, gcsConfig.getS3Client(), gcsConfig) - - val bigQueryGcsStorageOperations = - BigQueryGcsStorageOperation( - gcsOperations, - gcsConfig, - gcsNameTransformer, - keepStagingFiles, - bigquery, - sqlGenerator, - destinationHandler, - ) - val syncOperation: SyncOperation = - DefaultSyncOperation( - parsedCatalog, - destinationHandler, - defaultNamespace, - { initialStatus: DestinationInitialStatus, disableTD -> - StagingStreamOperations( - bigQueryGcsStorageOperations, - initialStatus, - FileUploadFormat.CSV, - V2_WITHOUT_META, - disableTD - ) - }, - migrations, - disableTypeDedupe, - ) - return createStagingConsumer( - outputRecordCollector, - syncOperation, - catalog, - defaultNamespace, - ) - } - - private fun setDefaultStreamNamespace(catalog: ConfiguredAirbyteCatalog, namespace: String) { - // Set the default originalNamespace on streams with null originalNamespace. This means we - // don't - // need to repeat this - // logic in the rest of the connector. - // (record messages still need to handle null namespaces though, which currently happens in - // e.g. - // AsyncStreamConsumer#accept) - // This probably should be shared logic amongst destinations eventually. 
- for (stream in catalog.streams) { - if (StringUtils.isEmpty(stream.stream.namespace)) { - stream.stream.withNamespace(namespace) - } - } - } - - private fun parseCatalog( - sqlGenerator: BigQuerySqlGenerator, - rawNamespaceOverride: String, - catalog: ConfiguredAirbyteCatalog - ): ParsedCatalog { - val catalogParser = CatalogParser(sqlGenerator, rawNamespaceOverride) - - return catalogParser.parseCatalog(catalog) - } - - override val isV2Destination: Boolean - get() = true - - companion object { - - private val REQUIRED_PERMISSIONS = - listOf( - "storage.multipartUploads.abort", - "storage.multipartUploads.create", - "storage.objects.create", - "storage.objects.delete", - "storage.objects.get", - "storage.objects.list", - ) - - @JvmStatic - fun getBigQuery(config: JsonNode): BigQuery { - val projectId = config[bqConstants.CONFIG_PROJECT_ID].asText() - - try { - val bigQueryBuilder = BigQueryOptions.newBuilder() - val credentials = getServiceAccountCredentials(config) - return bigQueryBuilder - .setProjectId(projectId) - .setCredentials(credentials) - .setHeaderProvider(getHeaderProvider()) - .build() - .service - } catch (e: IOException) { - throw RuntimeException(e) - } - } - - @JvmStatic - @Throws(IOException::class) - fun getServiceAccountCredentials(config: JsonNode): GoogleCredentials { - val serviceAccountKey = config[bqConstants.CONFIG_CREDS] - // Follows this order of resolution: - // https://cloud.google.com/java/docs/reference/google-auth-library/latest/com.google.auth.oauth2.GoogleCredentials#com_google_auth_oauth2_GoogleCredentials_getApplicationDefault - if (serviceAccountKey == null) { - log.info { - "No service account key json is provided. It is required if you are using Airbyte cloud." - } - log.info { "Using the default service account credential from environment." } - return GoogleCredentials.getApplicationDefault() - } - - // The JSON credential can either be a raw JSON object, or a serialized JSON object. - val credentialsString = - if (serviceAccountKey.isObject) serialize(serviceAccountKey) - else serviceAccountKey.asText() - return GoogleCredentials.fromStream( - ByteArrayInputStream(credentialsString.toByteArray(Charsets.UTF_8)), - ) - } - } -} - -fun main(args: Array) { - addThrowableForDeinterpolation(BigQueryException::class.java) - val destination: Destination = BigQueryDestination() - log.info { "Starting Destination : ${destination.javaClass}" } - IntegrationRunner(destination).run(args) - log.info { "Completed Destination : ${destination.javaClass}" } -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/migrators/BigQueryDV2Migration.kt b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/migrators/BigQueryDV2Migration.kt deleted file mode 100644 index 6f23c33f5672d..0000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/migrators/BigQueryDV2Migration.kt +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
- */ - -package io.airbyte.integrations.destination.bigquery.migrators - -import com.google.cloud.bigquery.BigQuery -import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler -import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus -import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig -import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration -import io.airbyte.integrations.destination.bigquery.BigQuerySQLNameTransformer -import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator -import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV1V2Migrator -import io.github.oshai.kotlinlogging.KotlinLogging -import kotlin.math.log - -class BigQueryDV2Migration(private val sqlGenerator: BigQuerySqlGenerator, bigQuery: BigQuery) : - Migration { - private val log = KotlinLogging.logger {} - private val legacyV1V2migrator = BigQueryV1V2Migrator(bigQuery, BigQuerySQLNameTransformer()) - override fun migrateIfNecessary( - destinationHandler: DestinationHandler, - stream: StreamConfig, - state: DestinationInitialStatus - ): Migration.MigrationResult { - log.info { "Initializing DV2 Migration check" } - legacyV1V2migrator.migrateIfNecessary(sqlGenerator, destinationHandler, stream) - // Invalidate state because rawTableExists could be false but we don't use it yet for - // anything ? - return Migration.MigrationResult(BigQueryDestinationState(false), true) - } -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/migrators/BigQueryDestinationState.kt b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/migrators/BigQueryDestinationState.kt deleted file mode 100644 index 5517707f35421..0000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/migrators/BigQueryDestinationState.kt +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright (c) 2024 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.bigquery.migrators - -import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState - -data class BigQueryDestinationState(private val needsSoftReset: Boolean) : MinimumDestinationState { - override fun needsSoftReset(): Boolean { - return needsSoftReset - } - - @Suppress("UNCHECKED_CAST") - override fun withSoftReset(needsSoftReset: Boolean): T { - return copy(needsSoftReset = needsSoftReset) as T - } -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryDirectLoadingStorageOperation.kt b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryDirectLoadingStorageOperation.kt deleted file mode 100644 index 0274dfcb76cf0..0000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryDirectLoadingStorageOperation.kt +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
- */ - -package io.airbyte.integrations.destination.bigquery.operation - -import com.google.cloud.bigquery.BigQuery -import com.google.cloud.bigquery.BigQueryException -import com.google.cloud.bigquery.FormatOptions -import com.google.cloud.bigquery.JobId -import com.google.cloud.bigquery.JobInfo -import com.google.cloud.bigquery.TableDataWriteChannel -import com.google.cloud.bigquery.TableId -import com.google.cloud.bigquery.WriteChannelConfiguration -import com.google.common.util.concurrent.RateLimiter -import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage -import io.airbyte.commons.exceptions.ConfigErrorException -import io.airbyte.integrations.base.destination.typing_deduping.StreamId -import io.airbyte.integrations.destination.bigquery.BigQueryUtils -import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter -import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter.SCHEMA_V2 -import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler -import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator -import io.github.oshai.kotlinlogging.KotlinLogging -import java.nio.ByteBuffer -import java.nio.charset.StandardCharsets -import java.util.stream.Stream - -private val log = KotlinLogging.logger {} - -class BigQueryDirectLoadingStorageOperation( - bigquery: BigQuery, - private val bigQueryClientChunkSize: Int?, - private val bigQueryRecordFormatter: BigQueryRecordFormatter, - sqlGenerator: BigQuerySqlGenerator, - destinationHandler: BigQueryDestinationHandler, - datasetLocation: String -) : - BigQueryStorageOperation>( - bigquery, - sqlGenerator, - destinationHandler, - datasetLocation, - ) { - private val rateLimiter: RateLimiter = RateLimiter.create(0.07) - companion object { - private const val HTTP_STATUS_CODE_FORBIDDEN = 403 - private const val HTTP_STATUS_CODE_NOT_FOUND = 404 - - private val CONFIG_ERROR_MSG = - """ - |Failed to write to destination schema. - | 1. Make sure you have all required permissions for writing to the schema. - | 2. Make sure that the actual destination schema's location corresponds to location provided in connector's config. - | 3. Try to change the "Destination schema" from "Mirror Source Structure" (if it's set) tp the "Destination Default" option. - |More details: - |""".trimMargin() - } - override fun writeToStage(streamId: StreamId, data: Stream) { - // TODO: why do we need ratelimiter, and using unstable API from Google's guava - rateLimiter.acquire() - val tableId = TableId.of(streamId.rawNamespace, streamId.rawName) - log.info { "Writing data to table $tableId with schema $SCHEMA_V2" } - val writeChannel = initWriteChannel(tableId) - writeChannel.use { - data.forEach { record -> - val byteArray = - "${bigQueryRecordFormatter.formatRecord(record)} ${System.lineSeparator()}".toByteArray( - StandardCharsets.UTF_8, - ) - it.write(ByteBuffer.wrap(byteArray)) - } - } - log.info { "Writing to channel completed for $tableId" } - val job = writeChannel.job - BigQueryUtils.waitForJobFinish(job) - } - - private fun initWriteChannel(tableId: TableId): TableDataWriteChannel { - val writeChannelConfiguration = - WriteChannelConfiguration.newBuilder(tableId) - .setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED) - .setSchema(SCHEMA_V2) - .setFormatOptions(FormatOptions.json()) - .build() // new-line delimited json. 
- - val job = - JobId.newBuilder() - .setRandomJob() - .setLocation(datasetLocation) - .setProject(bigquery.options.projectId) - .build() - - val writer: TableDataWriteChannel - - try { - writer = bigquery.writer(job, writeChannelConfiguration) - } catch (e: BigQueryException) { - if (e.code == HTTP_STATUS_CODE_FORBIDDEN || e.code == HTTP_STATUS_CODE_NOT_FOUND) { - throw ConfigErrorException(CONFIG_ERROR_MSG + e) - } else { - throw BigQueryException(e.code, e.message) - } - } - - // this this optional value. If not set - use default client's value (15MiG) - if (bigQueryClientChunkSize != null) { - writer.setChunkSize(bigQueryClientChunkSize) - } - return writer - } -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryGcsStorageOperation.kt b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryGcsStorageOperation.kt deleted file mode 100644 index 9ce45c6323668..0000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryGcsStorageOperation.kt +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Copyright (c) 2024 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.bigquery.operation - -import com.google.cloud.bigquery.BigQuery -import com.google.cloud.bigquery.BigQueryException -import com.google.cloud.bigquery.FormatOptions -import com.google.cloud.bigquery.Job -import com.google.cloud.bigquery.JobInfo -import com.google.cloud.bigquery.LoadJobConfiguration -import com.google.cloud.bigquery.TableId -import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig -import io.airbyte.cdk.integrations.destination.gcs.GcsNameTransformer -import io.airbyte.cdk.integrations.destination.gcs.GcsStorageOperations -import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer -import io.airbyte.integrations.base.destination.typing_deduping.StreamId -import io.airbyte.integrations.destination.bigquery.BigQueryUtils -import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter -import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler -import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator -import io.airbyte.protocol.models.v0.DestinationSyncMode -import io.github.oshai.kotlinlogging.KotlinLogging -import java.util.* -import org.joda.time.DateTime -import org.joda.time.DateTimeZone - -private val log = KotlinLogging.logger {} - -class BigQueryGcsStorageOperation( - private val gcsStorageOperations: GcsStorageOperations, - private val gcsConfig: GcsDestinationConfig, - private val gcsNameTransformer: GcsNameTransformer, - private val keepStagingFiles: Boolean, - bigquery: BigQuery, - sqlGenerator: BigQuerySqlGenerator, - destinationHandler: BigQueryDestinationHandler -) : - BigQueryStorageOperation( - bigquery, - sqlGenerator, - destinationHandler, - datasetLocation = gcsConfig.bucketRegion!! 
- ) { - private val connectionId = UUID.randomUUID() - private val syncDateTime = DateTime.now(DateTimeZone.UTC) - override fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode) { - super.prepareStage(streamId, destinationSyncMode) - // prepare staging bucket - log.info { "Creating bucket ${gcsConfig.bucketName}" } - gcsStorageOperations.createBucketIfNotExists() - } - - override fun cleanupStage(streamId: StreamId) { - if (keepStagingFiles) return - - val stagingRootPath = stagingRootPath(streamId) - log.info { "Cleaning up staging path at $stagingRootPath" } - gcsStorageOperations.dropBucketObject(stagingRootPath) - } - - override fun writeToStage(streamId: StreamId, data: SerializableBuffer) { - val stagedFileName: String = uploadRecordsToStage(streamId, data) - copyIntoTableFromStage(streamId, stagedFileName) - } - - private fun uploadRecordsToStage(streamId: StreamId, buffer: SerializableBuffer): String { - val objectPath: String = stagingFullPath(streamId) - log.info { - "Uploading records to for ${streamId.rawNamespace}.${streamId.rawName} to path $objectPath" - } - return gcsStorageOperations.uploadRecordsToBucket(buffer, streamId.rawNamespace, objectPath) - } - - private fun copyIntoTableFromStage(streamId: StreamId, stagedFileName: String) { - val tableId = TableId.of(streamId.rawNamespace, streamId.rawName) - val stagingPath = stagingFullPath(streamId) - val fullFilePath = "gs://${gcsConfig.bucketName}/$stagingPath$stagedFileName" - log.info { "Uploading records from file $fullFilePath to target Table $tableId" } - val configuration = - LoadJobConfiguration.builder(tableId, fullFilePath) - .setFormatOptions(FormatOptions.csv()) - .setSchema(BigQueryRecordFormatter.SCHEMA_V2) - .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND) - .setJobTimeoutMs(600000L) // 10 min - .build() - - val loadJob: Job = this.bigquery.create(JobInfo.of(configuration)) - log.info { - "[${loadJob.jobId}] Created a new job to upload record(s) to target table $tableId: $loadJob" - } - try { - BigQueryUtils.waitForJobFinish(loadJob) - log.info { - "[${loadJob.jobId}] Target table $tableId is successfully appended with staging files" - } - } catch (e: BigQueryException) { - throw RuntimeException( - String.format( - "[%s] Failed to upload staging files to destination table %s", - loadJob.jobId, - tableId - ), - e - ) - } catch (e: InterruptedException) { - throw RuntimeException( - String.format( - "[%s] Failed to upload staging files to destination table %s", - loadJob.jobId, - tableId - ), - e - ) - } - } - - private fun stagingFullPath(streamId: StreamId): String { - return gcsNameTransformer.applyDefaultCase( - String.format( - "%s/%s/%02d/%02d/%02d/%s/", - stagingRootPath(streamId), - syncDateTime.year().get(), - syncDateTime.monthOfYear().get(), - syncDateTime.dayOfMonth().get(), - syncDateTime.hourOfDay().get(), - connectionId - ) - ) - } - - private fun stagingRootPath(streamId: StreamId): String { - return gcsNameTransformer.applyDefaultCase( - String.format( - "%s/%s_%s", - gcsConfig.bucketPath, - gcsNameTransformer.convertStreamName(streamId.rawNamespace), - gcsNameTransformer.convertStreamName(streamId.rawName) - ) - ) - } -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryStorageOperation.kt b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryStorageOperation.kt deleted file mode 100644 
index 85f79885856e0..0000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryStorageOperation.kt +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Copyright (c) 2024 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.bigquery.operation - -import com.google.cloud.bigquery.BigQuery -import com.google.cloud.bigquery.TableId -import io.airbyte.integrations.base.destination.operation.StorageOperation -import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig -import io.airbyte.integrations.base.destination.typing_deduping.StreamId -import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil -import io.airbyte.integrations.destination.bigquery.BigQueryUtils -import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter -import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler -import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator -import io.airbyte.protocol.models.v0.DestinationSyncMode -import io.github.oshai.kotlinlogging.KotlinLogging -import java.time.Instant -import java.util.* -import java.util.concurrent.ConcurrentHashMap - -private val log = KotlinLogging.logger {} - -abstract class BigQueryStorageOperation<Data>( - protected val bigquery: BigQuery, - private val sqlGenerator: BigQuerySqlGenerator, - private val destinationHandler: BigQueryDestinationHandler, - protected val datasetLocation: String -) : StorageOperation<Data> { - private val existingSchemas = ConcurrentHashMap.newKeySet<String>() - override fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode) { - // Prepare staging table. For overwrite, it does drop-create so we can skip explicit create. - if (destinationSyncMode == DestinationSyncMode.OVERWRITE) { - truncateStagingTable(streamId) - } else { - createStagingTable(streamId) - } - } - - private fun createStagingTable(streamId: StreamId) { - val tableId = TableId.of(streamId.rawNamespace, streamId.rawName) - BigQueryUtils.createPartitionedTableIfNotExists( - bigquery, - tableId, - BigQueryRecordFormatter.SCHEMA_V2 - ) - } - - private fun dropStagingTable(streamId: StreamId) { - val tableId = TableId.of(streamId.rawNamespace, streamId.rawName) - bigquery.delete(tableId) - } - - /** - * "Truncates" the table. This is a workaround for the issue with TRUNCATE TABLE in BigQuery, where - * the table's partition filter must be turned off to truncate.
Since deleting a table is a free - * operation, this option re-uses functions that already exist. - */ - private fun truncateStagingTable(streamId: StreamId) { - val tableId = TableId.of(streamId.rawNamespace, streamId.rawName) - log.info { "Truncating raw table $tableId" } - dropStagingTable(streamId) - createStagingTable(streamId) - } - - override fun cleanupStage(streamId: StreamId) { - log.info { "Nothing to cleanup in stage for Streaming inserts" } - } - - abstract override fun writeToStage(streamId: StreamId, data: Data) - - override fun createFinalTable(streamConfig: StreamConfig, suffix: String, replace: Boolean) { - destinationHandler.execute(sqlGenerator.createTable(streamConfig, suffix, replace)) - } - - override fun softResetFinalTable(streamConfig: StreamConfig) { - TyperDeduperUtil.executeSoftReset( - sqlGenerator = sqlGenerator, - destinationHandler = destinationHandler, - streamConfig, - ) - } - - override fun overwriteFinalTable(streamConfig: StreamConfig, tmpTableSuffix: String) { - if (tmpTableSuffix.isNotBlank()) { - log.info { - "Overwriting table ${streamConfig.id.finalTableId(BigQuerySqlGenerator.QUOTE)} with ${ - streamConfig.id.finalTableId( - BigQuerySqlGenerator.QUOTE, - tmpTableSuffix, - ) - }" - } - destinationHandler.execute( - sqlGenerator.overwriteFinalTable(streamConfig.id, tmpTableSuffix) - ) - } - } - - override fun typeAndDedupe( - streamConfig: StreamConfig, - maxProcessedTimestamp: Optional<Instant>, - finalTableSuffix: String - ) { - TyperDeduperUtil.executeTypeAndDedupe( - sqlGenerator = sqlGenerator, - destinationHandler = destinationHandler, - streamConfig, - maxProcessedTimestamp, - finalTableSuffix, - ) - } -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java index 9e0b2eebad84a..e522b901bd9cf 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java @@ -135,6 +135,12 @@ class BigQueryDestinationTest { private AmazonS3 s3Client; + /* + * TODO: Migrate all BigQuery Destination configs (GCS, Denormalized, Normalized) to no longer use + * #partitionIfUnpartitioned, then recombine Base Provider.
The reason for breaking this method into + * a base class is that #testWritePartitionOverUnpartitioned is no longer used only in GCS + * Staging. + */ private Stream<Arguments> successTestConfigProviderBase() { return Stream.of( Arguments.of("config"), diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java index 7574a7d8146ff..79b43682f276c 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java @@ -91,8 +91,12 @@ protected SqlGenerator getSqlGenerator() { return new BigQuerySqlGenerator(getConfig().get(BigQueryConsts.CONFIG_PROJECT_ID).asText(), null); } + /** + * Run a sync using 1.9.0 (which is the highest version that still creates v2 raw tables with JSON + * _airbyte_data). Then run a sync using our current version. + */ @Test - public void testV1V2Migration() throws Exception { + public void testRawTableJsonToStringMigration() throws Exception { final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( new ConfiguredAirbyteStream() .withSyncMode(SyncMode.FULL_REFRESH) @@ -105,19 +109,23 @@ public void testV1V2Migration() throws Exception { // First sync final List<AirbyteMessage> messages1 = readMessages("dat/sync1_messages.jsonl"); - runSync(catalog, messages1, "airbyte/destination-bigquery:1.10.2", config -> { + runSync(catalog, messages1, "airbyte/destination-bigquery:1.9.0", config -> { // Defensive to avoid weird behaviors or test failures if the original config is being altered by // another thread, thanks Jackson for a mutable JsonNode JsonNode copiedConfig = Jsons.clone(config); if (config instanceof ObjectNode) { - // Opt out of T+D to run old V1 sync - ((ObjectNode) copiedConfig).put("use_1s1t_format", false); + // Add the opt-in T+D flag for the older version. This flag is removed in newer versions of the spec. + ((ObjectNode) copiedConfig).put("use_1s1t_format", true); } return copiedConfig; }); - // The record differ code is already adapted to V2 columns format, use the post V2 sync - // to verify that append mode preserved all the raw records and final records. + // 1.9.0 is known-good, but we might as well check that we're in good shape before continuing.
+ // If this starts erroring out because we added more test records and 1.9.0 had a latent bug, + // just delete these three lines :P + final List<JsonNode> expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl"); + final List<JsonNode> expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl"); + verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison()); // Second sync final List<AirbyteMessage> messages2 = readMessages("dat/sync2_messages.jsonl"); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java index 81c5fed371de0..c4c40c3cb1437 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java @@ -33,9 +33,9 @@ import io.airbyte.integrations.base.destination.typing_deduping.Sql; import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; +import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState; import io.airbyte.integrations.destination.bigquery.BigQueryConsts; import io.airbyte.integrations.destination.bigquery.BigQueryDestination; -import io.airbyte.integrations.destination.bigquery.migrators.BigQueryDestinationState; import io.airbyte.protocol.models.v0.DestinationSyncMode; import java.nio.file.Files; import java.nio.file.Path; @@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory; @Execution(ExecutionMode.CONCURRENT) -public class BigQuerySqlGeneratorIntegrationTest extends BaseSqlGeneratorIntegrationTest<BigQueryDestinationState> { +public class BigQuerySqlGeneratorIntegrationTest extends BaseSqlGeneratorIntegrationTest<MinimumDestinationState.Impl> { private static final Logger LOGGER = LoggerFactory.getLogger(BigQuerySqlGeneratorIntegrationTest.class); diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index 3bab2ebd0c7fe..55cd9701c663a 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -220,7 +220,7 @@ tutorials: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 2.5.0 | 2024-05-17 | [38132](https://github.com/airbytehq/airbyte/pull/38132) | Major rewrite of existing code, Adapting to CDK changes introduced in [38107](https://github.com/airbytehq/airbyte/pull/38107) | +| 2.4.21 | 2024-05-22 | [38588](https://github.com/airbytehq/airbyte/pull/38588) | Revert #38132 | | 2.4.20 | 2024-05-13 | [38131](https://github.com/airbytehq/airbyte/pull/38131) | Cleanup `BigQueryWriteConfig` and reuse `StreamConfig`; Adapt to `StreamConfig` signature changes | | 2.4.19 | 2024-05-10 | [38125](https://github.com/airbytehq/airbyte/pull/38125) | adopt latest CDK code | | 2.4.18 | 2024-05-10 |
[38111](https://github.com/airbytehq/airbyte/pull/38111) | No functional changes, deleting unused code |
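For reference, the GCS staging flow removed in this revert ultimately comes down to the standard google-cloud-bigquery load-job API: upload a gzipped CSV object to GCS, then run a load job that appends it to the raw table. The Kotlin sketch below only illustrates that pattern; the bucket, dataset, table, and object names are placeholders, application-default credentials are assumed, and the connector's additional concerns (buffered gzip CSV writing, pinning the raw-table schema to BigQueryRecordFormatter.SCHEMA_V2, staging cleanup) are omitted.

import com.google.cloud.bigquery.BigQueryOptions
import com.google.cloud.bigquery.FormatOptions
import com.google.cloud.bigquery.JobInfo
import com.google.cloud.bigquery.LoadJobConfiguration
import com.google.cloud.bigquery.TableId

fun main() {
    // Placeholder identifiers; substitute a real dataset, raw table, and staged CSV object.
    val bigquery = BigQueryOptions.getDefaultInstance().service
    val tableId = TableId.of("my_dataset", "my_raw_table")
    val gcsUri = "gs://my-staging-bucket/staging/2024/05/22/10/some-connection-id/records.csv.gz"

    // Same load-job pattern as the deleted copyIntoTableFromStage: CSV from GCS, appended to the target table.
    val configuration =
        LoadJobConfiguration.builder(tableId, gcsUri)
            .setFormatOptions(FormatOptions.csv())
            .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
            .setJobTimeoutMs(600_000L) // 10 min, mirroring the connector's timeout
            .build()

    val loadJob = bigquery.create(JobInfo.of(configuration))
    val completed = loadJob.waitFor() // block until the load job finishes

    when {
        completed == null -> error("Load job no longer exists")
        completed.status.error != null -> error("Load job failed: ${completed.status.error}")
        else -> println("Appended $gcsUri into $tableId")
    }
}

Because the staged object path embeds the sync date, hour, and connection id (as in the deleted stagingFullPath), each sync lands in a distinct prefix, and cleanupStage can drop the whole stream-level staging root in a single call.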