diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncFlush.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncFlush.java deleted file mode 100644 index 142a307aa64f..000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncFlush.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.bigquery; - -import com.google.cloud.bigquery.TableId; -import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns; -import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction; -import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage; -import io.airbyte.cdk.integrations.destination.record_buffer.FileBuffer; -import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer; -import io.airbyte.cdk.integrations.destination.s3.csv.CsvSerializedBuffer; -import io.airbyte.cdk.integrations.destination.s3.csv.StagingDatabaseCsvSheetGenerator; -import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; -import io.airbyte.integrations.base.destination.typing_deduping.StreamId; -import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.v0.StreamDescriptor; -import java.util.Map; -import java.util.stream.Stream; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.io.FileUtils; - -/** - * Async flushing logic. Flushing async prevents backpressure and is the superior flushing strategy. - */ -@Slf4j -class BigQueryAsyncFlush implements DestinationFlushFunction { - - private final Map streamConfigMap; - private final BigQueryGcsOperations stagingOperations; - private final ConfiguredAirbyteCatalog catalog; - - public BigQueryAsyncFlush( - final Map streamConfigMap, - final BigQueryGcsOperations stagingOperations, - final ConfiguredAirbyteCatalog catalog) { - this.streamConfigMap = streamConfigMap; - this.stagingOperations = stagingOperations; - this.catalog = catalog; - } - - @Override - public void flush(final StreamDescriptor decs, final Stream stream) throws Exception { - final SerializableBuffer writer; - try { - writer = new CsvSerializedBuffer( - new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX), - new StagingDatabaseCsvSheetGenerator(DestinationColumns.V2_WITHOUT_META), - true); - - stream.forEach(record -> { - try { - writer.accept(record.getSerialized(), Jsons.serialize(record.getRecord().getMeta()), record.getRecord().getEmittedAt()); - } catch (final Exception e) { - throw new RuntimeException(e); - } - }); - } catch (final Exception e) { - throw new RuntimeException(e); - } - - writer.flush(); - log.info("Flushing CSV buffer for stream {} ({}) to staging", decs.getName(), FileUtils.byteCountToDisplaySize(writer.getByteCount())); - if (!streamConfigMap.containsKey(decs)) { - throw new IllegalArgumentException( - String.format("Message contained record from a stream that was not in the catalog. 
\ncatalog: %s", Jsons.serialize(catalog))); - } - - final StreamId streamId = streamConfigMap.get(decs).getId(); - try { - final String stagedFileName = stagingOperations.uploadRecordsToStage(streamId.getRawNamespace(), streamId.getOriginalName(), writer); - - stagingOperations.copyIntoTableFromStage( - streamId.getRawNamespace(), - streamId.getOriginalName(), - TableId.of(streamId.getRawNamespace(), streamId.getRawName()), - BigQueryRecordFormatter.SCHEMA_V2, - stagedFileName); - } catch (final Exception e) { - log.error("Failed to flush and commit buffer data into destination's raw table", e); - throw new RuntimeException("Failed to upload buffer to stage and commit to destination", e); - } - - writer.close(); - } - - @Override - public long getOptimalBatchSizeBytes() { - // Chosen arbitrarily (mostly to match legacy behavior). We have no reason to believe a larger - // number would be worse. - // This was previously set to 25MB, which ran into rate-limiting issues: - // https://cloud.google.com/bigquery/quotas#standard_tables - // > Your project can make up to 1,500 table modifications per table per day - return 200 * 1024 * 1024; - } - - @Override - public long getQueueFlushThresholdBytes() { - return 200 * 1024 * 1024; - } - -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncStandardFlush.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncStandardFlush.java deleted file mode 100644 index 9cb5ba5792ba..000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncStandardFlush.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
- */ - -package io.airbyte.integrations.destination.bigquery; - -import com.google.common.util.concurrent.RateLimiter; -import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction; -import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage; -import io.airbyte.integrations.destination.bigquery.uploader.BigQueryDirectUploader; -import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; -import io.airbyte.protocol.models.v0.StreamDescriptor; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Supplier; -import java.util.stream.Stream; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public class BigQueryAsyncStandardFlush implements DestinationFlushFunction { - - // TODO remove this once the async framework supports rate-limiting/backpressuring - private static final RateLimiter rateLimiter = RateLimiter.create(0.07); - private final Supplier> uploaderMap; - - public BigQueryAsyncStandardFlush(final Supplier> uploaderMap) { - this.uploaderMap = uploaderMap; - } - - @Override - public void flush(final StreamDescriptor decs, final Stream stream) throws Exception { - rateLimiter.acquire(); - final ConcurrentMap uploaderMapSupplied = uploaderMap.get(); - final AtomicInteger recordCount = new AtomicInteger(); - stream.forEach(aibyteMessage -> { - try { - final AirbyteStreamNameNamespacePair sd = new AirbyteStreamNameNamespacePair(aibyteMessage.getRecord().getStream(), - aibyteMessage.getRecord().getNamespace()); - uploaderMapSupplied.get(sd).upload(aibyteMessage); - recordCount.getAndIncrement(); - } catch (final Exception e) { - log.error("An error happened while trying to flush a record to big query", e); - throw e; - } - }); - uploaderMapSupplied.values().forEach(test -> test.closeAfterPush()); - } - - @Override - public long getOptimalBatchSizeBytes() { - // todo(ryankfu): this should be per-destination specific. currently this is for Snowflake. - // The size chosen is currently for improving the performance of low memory connectors. With 1 Gi of - // resource the connector will usually at most fill up around 150 MB in a single queue. By lowering - // the batch size, the AsyncFlusher will flush in smaller batches which allows for memory to be - // freed earlier similar to a sliding window effect - return Double.valueOf(Runtime.getRuntime().maxMemory() * 0.2).longValue(); - } - -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java deleted file mode 100644 index 9b2bfd087bf0..000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
- */ - -package io.airbyte.integrations.destination.bigquery; - -import com.google.cloud.bigquery.BigQuery; -import com.google.cloud.bigquery.BigQueryException; -import com.google.cloud.bigquery.FormatOptions; -import com.google.cloud.bigquery.Job; -import com.google.cloud.bigquery.JobInfo; -import com.google.cloud.bigquery.JobInfo.WriteDisposition; -import com.google.cloud.bigquery.LoadJobConfiguration; -import com.google.cloud.bigquery.Schema; -import com.google.cloud.bigquery.TableId; -import io.airbyte.cdk.integrations.destination.StandardNameTransformer; -import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig; -import io.airbyte.cdk.integrations.destination.gcs.GcsStorageOperations; -import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer; -import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil; -import io.airbyte.commons.exceptions.ConfigErrorException; -import java.util.HashSet; -import java.util.Set; -import java.util.UUID; -import org.joda.time.DateTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class BigQueryGcsOperations { - - private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryGcsOperations.class); - - private final BigQuery bigQuery; - private final StandardNameTransformer gcsNameTransformer; - private final GcsDestinationConfig gcsConfig; - private final GcsStorageOperations gcsStorageOperations; - - private final String datasetLocation; - private final UUID randomStagingId; - private final DateTime syncDatetime; - private final boolean keepStagingFiles; - private final Set existingSchemas = new HashSet<>(); - - public BigQueryGcsOperations(final BigQuery bigQuery, - final StandardNameTransformer gcsNameTransformer, - final GcsDestinationConfig gcsConfig, - final GcsStorageOperations gcsStorageOperations, - final String datasetLocation, // TODO: Is this information same as GcsConfig.bucketRegion? 
- final UUID randomStagingId, - final DateTime syncDatetime, - final boolean keepStagingFiles) { - this.bigQuery = bigQuery; - this.gcsNameTransformer = gcsNameTransformer; - this.gcsConfig = gcsConfig; - this.gcsStorageOperations = gcsStorageOperations; - this.datasetLocation = datasetLocation; - this.randomStagingId = randomStagingId; - this.syncDatetime = syncDatetime; - this.keepStagingFiles = keepStagingFiles; - } - - /** - * @return {@code /_} - */ - private String getStagingRootPath(final String datasetId, final String stream) { - return gcsNameTransformer.applyDefaultCase(String.format("%s/%s_%s", - gcsConfig.getBucketPath(), - gcsNameTransformer.convertStreamName(datasetId), - gcsNameTransformer.convertStreamName(stream))); - } - - /** - * @return {@code /_//////} - */ - public String getStagingFullPath(final String datasetId, final String stream) { - return gcsNameTransformer.applyDefaultCase(String.format("%s/%s/%02d/%02d/%02d/%s/", - getStagingRootPath(datasetId, stream), - syncDatetime.year().get(), - syncDatetime.monthOfYear().get(), - syncDatetime.dayOfMonth().get(), - syncDatetime.hourOfDay().get(), - randomStagingId)); - } - - public void createSchemaIfNotExists(final String datasetId) { - if (!existingSchemas.contains(datasetId)) { - LOGGER.info("Creating dataset {}", datasetId); - try { - BigQueryUtils.getOrCreateDataset(bigQuery, datasetId, datasetLocation); - } catch (final BigQueryException e) { - if (ConnectorExceptionUtil.HTTP_AUTHENTICATION_ERROR_CODES.contains(e.getCode())) { - throw new ConfigErrorException(e.getMessage(), e); - } else { - throw e; - } - } - existingSchemas.add(datasetId); - } - } - - public void createTableIfNotExists(final TableId tableId, final Schema tableSchema) { - LOGGER.info("Creating target table {}", tableId); - BigQueryUtils.createPartitionedTableIfNotExists(bigQuery, tableId, tableSchema); - } - - public void createStageIfNotExists(final String datasetId, final String stream) { - final String objectPath = getStagingFullPath(datasetId, stream); - LOGGER.info("Creating staging path for stream {} (dataset {}): {}", stream, datasetId, objectPath); - gcsStorageOperations.createBucketIfNotExists(); - } - - public String uploadRecordsToStage(final String datasetId, final String stream, final SerializableBuffer writer) { - final String objectPath = getStagingFullPath(datasetId, stream); - LOGGER.info("Uploading records to staging for stream {} (dataset {}): {}", stream, datasetId, objectPath); - return gcsStorageOperations.uploadRecordsToBucket(writer, datasetId, objectPath); - } - - /** - * Similar to COPY INTO within - * {@link io.airbyte.cdk.integrations.destination.staging.StagingOperations} which loads the data - * stored in the stage area into a target table in the destination - * - * Reference - * https://googleapis.dev/java/google-cloud-clients/latest/index.html?com/google/cloud/bigquery/package-summary.html - */ - public void copyIntoTableFromStage(final String datasetId, - final String stream, - final TableId tableId, - final Schema tableSchema, - final String stagedFileName) { - LOGGER.info("Uploading records from staging files to target table {} (dataset {}): {}", - tableId, datasetId, stagedFileName); - - final String fullFilePath = String.format("gs://%s/%s%s", gcsConfig.getBucketName(), getStagingFullPath(datasetId, stream), stagedFileName); - LOGGER.info("Uploading staged file: {}", fullFilePath); - final LoadJobConfiguration configuration = LoadJobConfiguration.builder(tableId, fullFilePath) - 
.setFormatOptions(FormatOptions.csv()) - .setSchema(tableSchema) - .setWriteDisposition(WriteDisposition.WRITE_APPEND) - .setJobTimeoutMs(600000L) // 10 min - .build(); - - final Job loadJob = this.bigQuery.create(JobInfo.of(configuration)); - LOGGER.info("[{}] Created a new job to upload record(s) to target table {} (dataset {}): {}", loadJob.getJobId(), - tableId, datasetId, loadJob); - - try { - BigQueryUtils.waitForJobFinish(loadJob); - LOGGER.info("[{}] Target table {} (dataset {}) is successfully appended with staging files", loadJob.getJobId(), - tableId, datasetId); - } catch (final BigQueryException | InterruptedException e) { - throw new RuntimeException( - String.format("[%s] Failed to upload staging files to destination table %s (%s)", loadJob.getJobId(), - tableId, datasetId), - e); - } - } - - public void dropTableIfExists(final String datasetId, final TableId tableId) { - LOGGER.info("Deleting target table {} (dataset {})", tableId, datasetId); - bigQuery.delete(tableId); - } - - public void dropStageIfExists(final String datasetId, final String stream) { - if (keepStagingFiles) { - return; - } - - final String stagingDatasetPath = getStagingRootPath(datasetId, stream); - LOGGER.info("Cleaning up staging path for stream {} (dataset {}): {}", stream, datasetId, stagingDatasetPath); - gcsStorageOperations.dropBucketObject(stagingDatasetPath); - } - - /** - * "Truncates" table, this is a workaround to the issue with TRUNCATE TABLE in BigQuery where the - * table's partition filter must be turned off to truncate. Since deleting a table is a free - * operation this option re-uses functions that already exist - * - *
- * See: https://cloud.google.com/bigquery/pricing#free
- * - * @param datasetId equivalent to schema name - * @param tableId table name - * @param schema schema of the table to be deleted/created - */ - public void truncateTableIfExists(final String datasetId, - final TableId tableId, - final Schema schema) { - LOGGER.info("Truncating target table {} (dataset {})", tableId, datasetId); - dropTableIfExists(datasetId, tableId); - createTableIfNotExists(tableId, schema); - } - -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordStandardConsumer.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordStandardConsumer.java deleted file mode 100644 index 86de7c49ae31..000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordStandardConsumer.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.bigquery; - -import com.fasterxml.jackson.databind.JsonNode; -import com.google.cloud.bigquery.BigQuery; -import com.google.cloud.bigquery.TableId; -import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer; -import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer; -import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager; -import io.airbyte.cdk.integrations.destination.async.state.FlushFailure; -import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction; -import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction; -import io.airbyte.integrations.base.destination.typing_deduping.DefaultTyperDeduper; -import io.airbyte.integrations.base.destination.typing_deduping.NoOpTyperDeduperWithV1V2Migrations; -import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog; -import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; -import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper; -import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; -import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler; -import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator; -import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV1V2Migrator; -import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV2TableMigrator; -import io.airbyte.integrations.destination.bigquery.uploader.BigQueryDirectUploader; -import io.airbyte.integrations.destination.bigquery.uploader.BigQueryUploaderFactory; -import io.airbyte.integrations.destination.bigquery.uploader.config.UploaderConfig; -import io.airbyte.protocol.models.v0.AirbyteMessage; -import io.airbyte.protocol.models.v0.AirbyteStream; -import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; -import io.airbyte.protocol.models.v0.DestinationSyncMode; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; -import java.util.function.Consumer; -import java.util.function.Supplier; -import 
lombok.extern.slf4j.Slf4j; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Slf4j -@SuppressWarnings("try") -public class BigQueryRecordStandardConsumer extends AsyncStreamConsumer { - - private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryRecordStandardConsumer.class); - - public BigQueryRecordStandardConsumer(Consumer outputRecordCollector, - OnStartFunction onStart, - OnCloseFunction onClose, - ConfiguredAirbyteCatalog catalog, - String defaultNamespace, - Supplier> uploaderMap) { - super(outputRecordCollector, - onStart, - onClose, - new BigQueryAsyncStandardFlush(uploaderMap), - catalog, - new BufferManager((long) (Runtime.getRuntime().maxMemory() * 0.5)), - Optional.ofNullable(defaultNamespace), - new FlushFailure(), - Executors.newFixedThreadPool(2)); - } - - public static SerializedAirbyteMessageConsumer createStandardConsumer(final BigQuery bigquery, - final JsonNode config, - final ConfiguredAirbyteCatalog catalog, - final ParsedCatalog parsedCatalog, - final Consumer outputRecordCollector, - final BigQuerySqlGenerator sqlGenerator, - final BigQueryDestinationHandler destinationHandler, - final boolean disableTypeDedupe) - throws Exception { - // Code related to initializing standard insert consumer isolated in this class file. - final TyperDeduper typerDeduper = - buildTyperDeduper(sqlGenerator, parsedCatalog, destinationHandler, bigquery, disableTypeDedupe); - return getStandardRecordConsumer(bigquery, config, catalog, parsedCatalog, outputRecordCollector, typerDeduper); - - } - - private static SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuery bigquery, - final JsonNode config, - final ConfiguredAirbyteCatalog catalog, - final ParsedCatalog parsedCatalog, - final Consumer outputRecordCollector, - final TyperDeduper typerDeduper) - throws Exception { - final Supplier> writeConfigs = getUploaderMap( - bigquery, - config, - catalog, - parsedCatalog); - - final String bqNamespace = BigQueryUtils.getDatasetId(config); - - return new BigQueryRecordStandardConsumer( - outputRecordCollector, - () -> { - typerDeduper.prepareSchemasAndRunMigrations(); - - // Set up our raw tables - writeConfigs.get().forEach((streamId, uploader) -> { - final StreamConfig stream = parsedCatalog.getStream(streamId); - if (stream.getDestinationSyncMode() == DestinationSyncMode.OVERWRITE) { - // For streams in overwrite mode, truncate the raw table. - // non-1s1t syncs actually overwrite the raw table at the end of the sync, so we only do this in - // 1s1t mode. 
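One detail worth spelling out for the overwrite handling just below: the google-cloud-bigquery client's BigQuery#delete(TableId) returns a boolean rather than throwing when the table is absent, so the "Raw table {} not found, continuing with creation" log is the expected first-sync path, not an error. The subsequent BigQueryUtils.createPartitionedTableIfNotExists call then recreates the raw table with SCHEMA_V2 regardless of whether anything was actually deleted.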
- final TableId rawTableId = TableId.of(stream.getId().getRawNamespace(), stream.getId().getRawName()); - LOGGER.info("Deleting Raw table {}", rawTableId); - if (!bigquery.delete(rawTableId)) { - LOGGER.info("Raw table {} not found, continuing with creation", rawTableId); - } - LOGGER.info("Creating table {}", rawTableId); - BigQueryUtils.createPartitionedTableIfNotExists(bigquery, rawTableId, BigQueryRecordFormatter.SCHEMA_V2); - } else { - uploader.createRawTable(); - } - }); - - typerDeduper.prepareFinalTables(); - }, - (hasFailed, streamSyncSummaries) -> { - try { - Thread.sleep(30 * 1000); - typerDeduper.typeAndDedupe(streamSyncSummaries); - typerDeduper.commitFinalTables(); - typerDeduper.cleanup(); - } catch (final Exception e) { - throw new RuntimeException(e); - } - }, - catalog, - bqNamespace, - writeConfigs); - } - - protected static Supplier> getUploaderMap( - final BigQuery bigquery, - final JsonNode config, - final ConfiguredAirbyteCatalog catalog, - final ParsedCatalog parsedCatalog) - throws IOException { - return () -> { - final ConcurrentMap uploaderMap = new ConcurrentHashMap<>(); - for (final ConfiguredAirbyteStream configStream : catalog.getStreams()) { - final AirbyteStream stream = configStream.getStream(); - final StreamConfig parsedStream; - - final String targetTableName; - - parsedStream = parsedCatalog.getStream(stream.getNamespace(), stream.getName()); - targetTableName = parsedStream.getId().getRawName(); - - final UploaderConfig uploaderConfig = UploaderConfig - .builder() - .bigQuery(bigquery) - .parsedStream(parsedStream) - .bigQueryClientChunkSize(BigQueryUtils.getBigQueryClientChunkSize(config)) - .datasetLocation(BigQueryUtils.getDatasetLocation(config)) - .formatter(new BigQueryRecordFormatter(new BigQuerySQLNameTransformer())) - .targetTableName(targetTableName) - .build(); - - try { - putStreamIntoUploaderMap(stream, uploaderConfig, uploaderMap); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - return uploaderMap; - }; - } - - protected static void putStreamIntoUploaderMap(final AirbyteStream stream, - final UploaderConfig uploaderConfig, - final Map uploaderMap) - throws IOException { - uploaderMap.put( - AirbyteStreamNameNamespacePair.fromAirbyteStream(stream), - BigQueryUploaderFactory.getUploader(uploaderConfig)); - } - - private static TyperDeduper buildTyperDeduper(final BigQuerySqlGenerator sqlGenerator, - final ParsedCatalog parsedCatalog, - final BigQueryDestinationHandler destinationHandler, - final BigQuery bigquery, - final boolean disableTypeDedupe) { - final BigQueryV1V2Migrator migrator = new BigQueryV1V2Migrator(bigquery, new BigQuerySQLNameTransformer()); - final BigQueryV2TableMigrator v2RawTableMigrator = new BigQueryV2TableMigrator(bigquery); - - if (disableTypeDedupe) { - return new NoOpTyperDeduperWithV1V2Migrations<>( - sqlGenerator, destinationHandler, parsedCatalog, migrator, v2RawTableMigrator, List.of()); - } - - return new DefaultTyperDeduper<>( - sqlGenerator, - destinationHandler, - parsedCatalog, - migrator, - v2RawTableMigrator, - List.of()); - } - -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java deleted file mode 100644 index 72f6362d61c0..000000000000 --- 
a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.bigquery; - -import static io.airbyte.cdk.integrations.base.JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE; - -import com.google.cloud.bigquery.TableId; -import com.google.common.base.Functions; -import com.google.common.base.Preconditions; -import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer; -import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer; -import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager; -import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction; -import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction; -import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction; -import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog; -import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; -import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper; -import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; -import io.airbyte.protocol.models.v0.AirbyteMessage; -import io.airbyte.protocol.models.v0.AirbyteStream; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.v0.DestinationSyncMode; -import io.airbyte.protocol.models.v0.StreamDescriptor; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.function.Consumer; -import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class mimics the same functionality as - * {@link io.airbyte.cdk.integrations.destination.staging.StagingConsumerFactory} which likely - * should be placed into a commons package to be utilized across all ConsumerFactories - */ -public class BigQueryStagingConsumerFactory { - - private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryStagingConsumerFactory.class); - - public SerializedAirbyteMessageConsumer createAsync( - final ConfiguredAirbyteCatalog catalog, - final Consumer outputRecordCollector, - final BigQueryGcsOperations bigQueryGcsOperations, - final TyperDeduper typerDeduper, - final ParsedCatalog parsedCatalog, - final String defaultNamespace) { - final Map streamConfigMap = createWriteConfigs( - catalog, - parsedCatalog); - - final DestinationFlushFunction flusher = new BigQueryAsyncFlush(streamConfigMap, bigQueryGcsOperations, catalog); - return new AsyncStreamConsumer( - outputRecordCollector, - onStartFunction(bigQueryGcsOperations, parsedCatalog.getStreams(), typerDeduper), - onCloseFunction(bigQueryGcsOperations, parsedCatalog.getStreams(), typerDeduper), - flusher, - catalog, - new BufferManager(getBigQueryBufferMemoryLimit()), - Optional.ofNullable(defaultNamespace)); - } - - /** - * Out BigQuery's uploader threads use a fair amount of memory. We believe this is largely due to - * the sdk client we use. - * - * @return number of bytes to make available for message buffering. 
- */ - private long getBigQueryBufferMemoryLimit() { - return (long) (Runtime.getRuntime().maxMemory() * 0.4); - } - - private Map createWriteConfigs(final ConfiguredAirbyteCatalog catalog, - final ParsedCatalog parsedCatalog) { - return catalog.getStreams().stream() - .map(configuredStream -> { - Preconditions.checkNotNull(configuredStream.getDestinationSyncMode(), "Undefined destination sync mode"); - - final AirbyteStream stream = configuredStream.getStream(); - return parsedCatalog.getStream(stream.getNamespace(), stream.getName()); - }) - .collect(Collectors.toMap( - c -> new StreamDescriptor().withName(c.getId().getOriginalName()).withNamespace(c.getId().getOriginalNamespace()), - Functions.identity())); - } - - /** - * @param bigQueryGcsOperations collection of Google Cloud Storage Operations - * @param streamConfigs configuration settings used to describe how to write data and where it - * exists - */ - private OnStartFunction onStartFunction(final BigQueryGcsOperations bigQueryGcsOperations, - final List streamConfigs, - final TyperDeduper typerDeduper) { - return () -> { - LOGGER.info("Preparing airbyte_raw tables in destination started for {} streams", streamConfigs.size()); - typerDeduper.prepareSchemasAndRunMigrations(); - - for (final StreamConfig streamConfig : streamConfigs) { - final var tableId = TableId.of(streamConfig.getId().getRawNamespace(), streamConfig.getId().getRawName()); - LOGGER.info("Preparing staging are in destination for schema: {}, stream: {}, target table: {}, stage: {}", - BigQueryRecordFormatter.SCHEMA_V2, streamConfig.getId().getOriginalName(), - tableId, streamConfig.getId().getOriginalName()); - // In Destinations V2, we will always use the 'airbyte_internal' schema/originalNamespace for raw - // tables - final String rawDatasetId = DEFAULT_AIRBYTE_INTERNAL_NAMESPACE; - // Regardless, ensure the schema the customer wants to write to exists - bigQueryGcsOperations.createSchemaIfNotExists(streamConfig.getId().getRawNamespace()); - // Schema used for raw and airbyte internal tables - bigQueryGcsOperations.createSchemaIfNotExists(rawDatasetId); - // Customer's destination schema - // With checkpointing, we will be creating the target table earlier in the setup such that - // the data can be immediately loaded from the staging area - bigQueryGcsOperations.createTableIfNotExists(tableId, BigQueryRecordFormatter.SCHEMA_V2); - bigQueryGcsOperations.createStageIfNotExists(rawDatasetId, streamConfig.getId().getOriginalName()); - // When OVERWRITE mode, truncate the destination's raw table prior to syncing data - if (streamConfig.getDestinationSyncMode() == DestinationSyncMode.OVERWRITE) { - // TODO: this might need special handling during the migration - bigQueryGcsOperations.truncateTableIfExists(rawDatasetId, tableId, BigQueryRecordFormatter.SCHEMA_V2); - } - } - - typerDeduper.prepareFinalTables(); - LOGGER.info("Preparing tables in destination completed."); - }; - } - - /** - * Tear down process, will attempt to clean out any staging area - * - * @param bigQueryGcsOperations collection of staging operations - * @param streamConfigs configuration settings used to describe how to write data and where it - * exists - */ - private OnCloseFunction onCloseFunction(final BigQueryGcsOperations bigQueryGcsOperations, - final List streamConfigs, - final TyperDeduper typerDeduper) { - return (hasFailed, streamSyncSummaries) -> { - /* - * Previously the hasFailed value was used to commit any remaining staged files into destination, - * however, with the changes to 
checkpointing this will no longer be necessary since despite partial - * successes, we'll be committing the target table (aka airbyte_raw) table throughout the sync - */ - typerDeduper.typeAndDedupe(streamSyncSummaries); - LOGGER.info("Cleaning up destination started for {} streams", streamConfigs.size()); - for (final StreamConfig streamConfig : streamConfigs) { - bigQueryGcsOperations.dropStageIfExists(streamConfig.getId().getRawNamespace(), streamConfig.getId().getOriginalName()); - } - typerDeduper.commitFinalTables(); - typerDeduper.cleanup(); - LOGGER.info("Cleaning up destination completed."); - }; - } - -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.java index ccb5d513b048..7627c609ec93 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.java @@ -16,8 +16,6 @@ import java.util.HashMap; import java.util.UUID; import java.util.concurrent.TimeUnit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * The class formats incoming JsonSchema and AirbyteRecord in order to be inline with a @@ -30,7 +28,6 @@ public class BigQueryRecordFormatter { Field.of(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, StandardSQLTypeName.TIMESTAMP), Field.of(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, StandardSQLTypeName.TIMESTAMP), Field.of(JavaBaseConstants.COLUMN_NAME_DATA, StandardSQLTypeName.STRING)); - private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryRecordFormatter.class); protected final StandardNameTransformer namingResolver; diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java index 056a620fd89f..71e7ee5604f6 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java @@ -60,7 +60,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -// TODO this stuff almost definitely exists somewhere else in our codebase. 
public class BigQueryDestinationHandler implements DestinationHandler { private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestinationHandler.class); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryDirectUploader.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryDirectUploader.java deleted file mode 100644 index 2142a8edd6e6..000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryDirectUploader.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.bigquery.uploader; - -import static io.airbyte.integrations.destination.bigquery.BigQueryUtils.printHeapMemoryConsumption; - -import com.google.cloud.bigquery.BigQuery; -import com.google.cloud.bigquery.StandardTableDefinition; -import com.google.cloud.bigquery.Table; -import com.google.cloud.bigquery.TableId; -import com.google.cloud.bigquery.TableInfo; -import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage; -import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; -import io.airbyte.integrations.destination.bigquery.writer.BigQueryTableWriter; -import java.io.IOException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class BigQueryDirectUploader { - - private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDirectUploader.class); - - protected final TableId table; - protected final BigQueryTableWriter writer; - protected final BigQuery bigQuery; - protected final BigQueryRecordFormatter recordFormatter; - - BigQueryDirectUploader(final TableId table, - final BigQueryTableWriter writer, - final BigQuery bigQuery, - final BigQueryRecordFormatter recordFormatter) { - this.table = table; - this.writer = writer; - this.bigQuery = bigQuery; - this.recordFormatter = recordFormatter; - } - - public void upload(final PartialAirbyteMessage airbyteMessage) { - try { - writer.write(recordFormatter.formatRecord(airbyteMessage)); - } catch (final IOException | RuntimeException e) { - LOGGER.error("Got an error while writing message: {}", e.getMessage(), e); - LOGGER.error(String.format( - "Failed to process a message for job: %s", - writer.toString())); - printHeapMemoryConsumption(); - throw new RuntimeException(e); - } - } - - public void closeAfterPush() { - try { - this.writer.close(); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - - public void createRawTable() { - // Ensure that this table exists. 
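A note on this createRawTable logic: BigQuery#getTable(TableId) returns null when the table does not exist (it does not throw), which is what the null check below relies on to decide between creating the raw table and merely logging that it was found.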
- final Table rawTable = bigQuery.getTable(table); - if (rawTable == null) { - LOGGER.info("Creating raw table {}.", table); - bigQuery.create(TableInfo.newBuilder(table, StandardTableDefinition.of(BigQueryRecordFormatter.SCHEMA_V2)).build()); - } else { - LOGGER.info("Found raw table {}.", rawTable.getTableId()); - } - } - - @Override - public String toString() { - return "BigQueryDirectUploader{" + - "table=" + table.getTable() + - ", writer=" + writer.getClass() + - ", recordFormatter=" + recordFormatter.getClass() + - '}'; - } - -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java deleted file mode 100644 index 4089a17b7b02..000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.bigquery.uploader; - -import static io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter.*; - -import com.google.cloud.bigquery.BigQuery; -import com.google.cloud.bigquery.BigQueryException; -import com.google.cloud.bigquery.FormatOptions; -import com.google.cloud.bigquery.JobId; -import com.google.cloud.bigquery.JobInfo; -import com.google.cloud.bigquery.TableDataWriteChannel; -import com.google.cloud.bigquery.TableId; -import com.google.cloud.bigquery.WriteChannelConfiguration; -import io.airbyte.commons.exceptions.ConfigErrorException; -import io.airbyte.integrations.destination.bigquery.BigQueryUtils; -import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; -import io.airbyte.integrations.destination.bigquery.uploader.config.UploaderConfig; -import io.airbyte.integrations.destination.bigquery.writer.BigQueryTableWriter; -import java.io.IOException; -import java.util.HashSet; -import java.util.Set; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class BigQueryUploaderFactory { - - private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryUploaderFactory.class); - - private static final int HTTP_STATUS_CODE_FORBIDDEN = 403; - private static final int HTTP_STATUS_CODE_NOT_FOUND = 404; - - private static final String CONFIG_ERROR_MSG = """ - Failed to write to destination schema. - - 1. Make sure you have all required permissions for writing to the schema. - - 2. Make sure that the actual destination schema's location corresponds to location provided - in connector's config. - - 3. Try to change the "Destination schema" from "Mirror Source Structure" (if it's set) tp the - "Destination Default" option. 
- - More details: - """; - - public static BigQueryDirectUploader getUploader(final UploaderConfig uploaderConfig) - throws IOException { - final String dataset = uploaderConfig.getParsedStream().getId().getRawNamespace(); - final String datasetLocation = uploaderConfig.getDatasetLocation(); - final Set existingDatasets = new HashSet<>(); - - final BigQueryRecordFormatter recordFormatter = uploaderConfig.getFormatter(); - - final TableId targetTable = TableId.of(dataset, uploaderConfig.getTargetTableName()); - - BigQueryUtils.createSchemaAndTableIfNeeded( - uploaderConfig.getBigQuery(), - existingDatasets, - dataset, - datasetLocation); - - return getBigQueryDirectUploader( - uploaderConfig.getBigQueryClientChunkSize(), - targetTable, - uploaderConfig.getBigQuery(), - datasetLocation, - recordFormatter); - } - - private static BigQueryDirectUploader getBigQueryDirectUploader(final Integer bigQueryClientChunkSize, - final TableId targetTable, - final BigQuery bigQuery, - final String datasetLocation, - final BigQueryRecordFormatter formatter) { - // https://cloud.google.com/bigquery/docs/loading-data-local#loading_data_from_a_local_data_source - LOGGER.info("Will write raw data to {} with schema {}", targetTable, SCHEMA_V2); - final WriteChannelConfiguration writeChannelConfiguration = - WriteChannelConfiguration.newBuilder(targetTable) - .setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED) - .setSchema(SCHEMA_V2) - .setFormatOptions(FormatOptions.json()) - .build(); // new-line delimited json. - - final JobId job = JobId.newBuilder() - .setRandomJob() - .setLocation(datasetLocation) - .setProject(bigQuery.getOptions().getProjectId()) - .build(); - - final TableDataWriteChannel writer; - - try { - writer = bigQuery.writer(job, writeChannelConfiguration); - } catch (final BigQueryException e) { - if (e.getCode() == HTTP_STATUS_CODE_FORBIDDEN || e.getCode() == HTTP_STATUS_CODE_NOT_FOUND) { - throw new ConfigErrorException(CONFIG_ERROR_MSG + e); - } else { - throw new BigQueryException(e.getCode(), e.getMessage()); - } - } - - // this this optional value. If not set - use default client's value (15MiG) - if (bigQueryClientChunkSize != null) { - writer.setChunkSize(bigQueryClientChunkSize); - } - - return new BigQueryDirectUploader( - targetTable, - new BigQueryTableWriter(writer), - bigQuery, - formatter); - } - -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/config/UploaderConfig.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/config/UploaderConfig.java deleted file mode 100644 index d94b2145f0a7..000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/config/UploaderConfig.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
- */ - -package io.airbyte.integrations.destination.bigquery.uploader.config; - -import com.google.cloud.bigquery.BigQuery; -import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; -import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; -import lombok.Builder; -import lombok.Getter; - -@Builder -@Getter -public class UploaderConfig { - - private Integer bigQueryClientChunkSize; - private String datasetLocation; - private StreamConfig parsedStream; - private String targetTableName; - private BigQuery bigQuery; - private BigQueryRecordFormatter formatter; - -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/writer/BigQueryTableWriter.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/writer/BigQueryTableWriter.java deleted file mode 100644 index 50c2608ae171..000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/writer/BigQueryTableWriter.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.bigquery.writer; - -import com.google.cloud.bigquery.Job; -import com.google.cloud.bigquery.TableDataWriteChannel; -import com.google.common.base.Charsets; -import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler; -import java.io.IOException; -import java.nio.ByteBuffer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public record BigQueryTableWriter(TableDataWriteChannel writeChannel) { - - private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryTableWriter.class); - - public void write(final String formattedData) throws IOException { - writeChannel.write(ByteBuffer.wrap((formattedData + "\n").getBytes(Charsets.UTF_8))); - } - - public void close() throws IOException { - this.writeChannel.close(); - try { - final Job job = writeChannel.getJob(); - if (job != null && job.getStatus().getError() != null) { - AirbyteExceptionHandler.addStringForDeinterpolation(job.getEtag()); - throw new RuntimeException("Fail to complete a load job in big query, Job id: " + writeChannel.getJob().getJobId() + - ", with error: " + writeChannel.getJob().getStatus().getError()); - } - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - -}