Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Destination BigQuery: Adapt to newer interface for Sync operations" #38588

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.35.7'
cdkVersionRequired = '0.35.0'
features = [
'db-destinations',
'datastore-bigquery',
Expand All @@ -22,7 +22,7 @@ java {
}

application {
mainClass = 'io.airbyte.integrations.destination.bigquery.BigQueryDestinationKt'
mainClass = 'io.airbyte.integrations.destination.bigquery.BigQueryDestination'
applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0',
'-XX:NativeMemoryTracking=detail', '-XX:+UnlockDiagnosticVMOptions',
'-XX:GCLockerRetryAllocationCount=100',
Expand All @@ -37,5 +37,6 @@ application {
}

dependencies {
implementation 'com.codepoetics:protonpack:1.13'
implementation 'org.apache.commons:commons-text:1.10.0'
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerImageTag: 2.5.0
dockerImageTag: 2.4.21
dockerRepository: airbyte/destination-bigquery
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
githubIssueLabel: destination-bigquery
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.bigquery;

import com.google.cloud.bigquery.TableId;
import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns;
import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction;
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage;
import io.airbyte.cdk.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer;
import io.airbyte.cdk.integrations.destination.s3.csv.CsvSerializedBuffer;
import io.airbyte.cdk.integrations.destination.s3.csv.StagingDatabaseCsvSheetGenerator;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.util.Map;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;

/**
* Async flushing logic. Flushing async prevents backpressure and is the superior flushing strategy.
*/
@Slf4j
/**
 * Async flushing logic. Flushing async prevents backpressure and is the superior flushing strategy.
 *
 * <p>Serializes a stream of partial Airbyte messages into a gzipped CSV buffer, uploads the buffer
 * to the GCS staging area, and copies the staged file into the stream's raw BigQuery table.
 */
@Slf4j
class BigQueryAsyncFlush implements DestinationFlushFunction {

  private final Map<StreamDescriptor, StreamConfig> streamConfigMap;
  private final BigQueryGcsOperations stagingOperations;
  private final ConfiguredAirbyteCatalog catalog;

  public BigQueryAsyncFlush(
                            final Map<StreamDescriptor, StreamConfig> streamConfigMap,
                            final BigQueryGcsOperations stagingOperations,
                            final ConfiguredAirbyteCatalog catalog) {
    this.streamConfigMap = streamConfigMap;
    this.stagingOperations = stagingOperations;
    this.catalog = catalog;
  }

  /**
   * Serializes {@code stream} into a CSV buffer and stages + copies it into the raw table for
   * {@code decs}.
   *
   * @param decs descriptor of the stream being flushed; must be present in the catalog
   * @param stream records to flush
   * @throws IllegalArgumentException if {@code decs} is not part of the configured catalog
   * @throws RuntimeException if serialization or the staging upload/copy fails
   */
  @Override
  public void flush(final StreamDescriptor decs, final Stream<PartialAirbyteMessage> stream) throws Exception {
    final SerializableBuffer writer = new CsvSerializedBuffer(
        new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX),
        new StagingDatabaseCsvSheetGenerator(DestinationColumns.V2_WITHOUT_META),
        true);

    // try/finally guarantees the buffer (and its backing temp file) is released even when the
    // upload or copy fails — previously the buffer was only closed on the success path.
    try {
      stream.forEach(record -> {
        try {
          writer.accept(record.getSerialized(), Jsons.serialize(record.getRecord().getMeta()), record.getRecord().getEmittedAt());
        } catch (final Exception e) {
          throw new RuntimeException(e);
        }
      });

      writer.flush();
      log.info("Flushing CSV buffer for stream {} ({}) to staging", decs.getName(), FileUtils.byteCountToDisplaySize(writer.getByteCount()));
      if (!streamConfigMap.containsKey(decs)) {
        throw new IllegalArgumentException(
            String.format("Message contained record from a stream that was not in the catalog. \ncatalog: %s", Jsons.serialize(catalog)));
      }

      final StreamId streamId = streamConfigMap.get(decs).getId();
      try {
        final String stagedFileName = stagingOperations.uploadRecordsToStage(streamId.getRawNamespace(), streamId.getOriginalName(), writer);

        stagingOperations.copyIntoTableFromStage(
            streamId.getRawNamespace(),
            streamId.getOriginalName(),
            TableId.of(streamId.getRawNamespace(), streamId.getRawName()),
            BigQueryRecordFormatter.SCHEMA_V2,
            stagedFileName);
      } catch (final Exception e) {
        log.error("Failed to flush and commit buffer data into destination's raw table", e);
        throw new RuntimeException("Failed to upload buffer to stage and commit to destination", e);
      }
    } finally {
      writer.close();
    }
  }

  @Override
  public long getOptimalBatchSizeBytes() {
    // Chosen arbitrarily (mostly to match legacy behavior). We have no reason to believe a larger
    // number would be worse.
    // This was previously set to 25MB, which ran into rate-limiting issues:
    // https://cloud.google.com/bigquery/quotas#standard_tables
    // > Your project can make up to 1,500 table modifications per table per day
    return 200 * 1024 * 1024;
  }

  @Override
  public long getQueueFlushThresholdBytes() {
    return 200 * 1024 * 1024;
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.bigquery;

import com.google.common.util.concurrent.RateLimiter;
import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction;
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage;
import io.airbyte.integrations.destination.bigquery.uploader.BigQueryDirectUploader;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;

@Slf4j
/**
 * Flush function for the "standard inserts" loading method: pushes each record through its
 * stream's {@link BigQueryDirectUploader} and closes all uploaders when the batch is done.
 */
@Slf4j
public class BigQueryAsyncStandardFlush implements DestinationFlushFunction {

  // TODO remove this once the async framework supports rate-limiting/backpressuring
  private static final RateLimiter rateLimiter = RateLimiter.create(0.07);
  private final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> uploaderMap;

  public BigQueryAsyncStandardFlush(final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> uploaderMap) {
    this.uploaderMap = uploaderMap;
  }

  /**
   * Uploads every record in {@code stream} via the matching per-stream uploader.
   *
   * @param decs descriptor of the stream being flushed (unused; routing is per-record)
   * @param stream records to flush
   * @throws IllegalStateException if a record belongs to a stream with no registered uploader
   */
  @Override
  public void flush(final StreamDescriptor decs, final Stream<PartialAirbyteMessage> stream) throws Exception {
    rateLimiter.acquire();
    final ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader> uploaderMapSupplied = uploaderMap.get();
    final AtomicInteger recordCount = new AtomicInteger();
    stream.forEach(airbyteMessage -> {
      try {
        final AirbyteStreamNameNamespacePair sd = new AirbyteStreamNameNamespacePair(airbyteMessage.getRecord().getStream(),
            airbyteMessage.getRecord().getNamespace());
        final BigQueryDirectUploader uploader = uploaderMapSupplied.get(sd);
        // Fail with context instead of an opaque NPE when a record references an unknown stream.
        if (uploader == null) {
          throw new IllegalStateException("No uploader found for stream " + sd);
        }
        uploader.upload(airbyteMessage);
        recordCount.getAndIncrement();
      } catch (final Exception e) {
        log.error("An error happened while trying to flush a record to big query", e);
        throw e;
      }
    });
    uploaderMapSupplied.values().forEach(BigQueryDirectUploader::closeAfterPush);
  }

  @Override
  public long getOptimalBatchSizeBytes() {
    // todo(ryankfu): this should be per-destination specific. currently this is for Snowflake.
    // The size chosen is currently for improving the performance of low memory connectors. With 1 Gi of
    // resource the connector will usually at most fill up around 150 MB in a single queue. By lowering
    // the batch size, the AsyncFlusher will flush in smaller batches which allows for memory to be
    // freed earlier similar to a sliding window effect
    return Double.valueOf(Runtime.getRuntime().maxMemory() * 0.2).longValue();
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.bigquery;

/**
 * Configuration-key and value constants shared across the BigQuery destination connector
 * (spec parsing, GCS staging options, and test fixtures).
 */
public final class BigQueryConsts {

  /** Number of bytes in one mebibyte; used for buffer/chunk sizing. */
  public static final int MiB = 1024 * 1024;
  public static final String CONFIG_DATASET_ID = "dataset_id";
  public static final String CONFIG_PROJECT_ID = "project_id";
  public static final String CONFIG_DATASET_LOCATION = "dataset_location";
  public static final String CONFIG_CREDS = "credentials_json";
  public static final String BIG_QUERY_CLIENT_CHUNK_SIZE = "big_query_client_buffer_size_mb";

  public static final String LOADING_METHOD = "loading_method";
  public static final String METHOD = "method";
  public static final String GCS_STAGING = "GCS Staging";
  public static final String GCS_BUCKET_NAME = "gcs_bucket_name";
  public static final String GCS_BUCKET_PATH = "gcs_bucket_path";
  public static final String GCS_BUCKET_REGION = "gcs_bucket_region";
  public static final String CREDENTIAL = "credential";
  public static final String FORMAT = "format";
  public static final String KEEP_GCS_FILES = "keep_files_in_gcs-bucket";
  public static final String KEEP_GCS_FILES_VAL = "Keep all tmp files in GCS";

  public static final String DISABLE_TYPE_DEDUPE = "disable_type_dedupe";

  public static final String NAMESPACE_PREFIX = "n";

  // tests
  public static final String BIGQUERY_BASIC_CONFIG = "basic_bigquery_config";
  public static final String GCS_CONFIG = "gcs_config";

  public static final String CREDENTIAL_TYPE = "credential_type";
  public static final String HMAC_KEY_ACCESS_ID = "hmac_key_access_id";
  public static final String HMAC_KEY_ACCESS_SECRET = "hmac_key_secret";

  // Utility class: prevent instantiation.
  private BigQueryConsts() {}

}
Loading
Loading