From b64a0d5c38a4624087186db09ab8adfca75a9cd3 Mon Sep 17 00:00:00 2001 From: Gireesh Sreepathi Date: Fri, 10 May 2024 17:19:36 -0700 Subject: [PATCH] bigquery-refactor-new-intf --- .../bigquery/BigQueryGcsOperations.java | 2 +- .../migrators/BigQueryDestinationState.kt | 18 +++++++ .../operation/BigQueryStorageOperations.kt | 51 +++++++++++++++++++ .../BigQueryStreamOperationFactory.kt | 18 +++++++ .../operation/BigQueryStreamOperations.kt | 22 ++++++++ .../bigquery/writer/BigQueryFlushFunction.kt | 20 ++++++++ 6 files changed, 130 insertions(+), 1 deletion(-) create mode 100644 airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/migrators/BigQueryDestinationState.kt create mode 100644 airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryStorageOperations.kt create mode 100644 airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryStreamOperationFactory.kt create mode 100644 airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryStreamOperations.kt create mode 100644 airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/writer/BigQueryFlushFunction.kt diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java index c230f2bb79cb..e97ccd6585ef 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java @@ -46,7 +46,7 @@ public BigQueryGcsOperations(final BigQuery bigQuery, final StandardNameTransformer gcsNameTransformer, final GcsDestinationConfig gcsConfig, final GcsStorageOperations gcsStorageOperations, - final String datasetLocation, + final String datasetLocation, // TODO: Is this information same as GcsConfig.bucketRegion? final UUID randomStagingId, final DateTime syncDatetime, final boolean keepStagingFiles) { diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/migrators/BigQueryDestinationState.kt b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/migrators/BigQueryDestinationState.kt new file mode 100644 index 000000000000..5517707f3542 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/migrators/BigQueryDestinationState.kt @@ -0,0 +1,18 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.bigquery.migrators + +import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState + +data class BigQueryDestinationState(private val needsSoftReset: Boolean) : MinimumDestinationState { + override fun needsSoftReset(): Boolean { + return needsSoftReset + } + + @Suppress("UNCHECKED_CAST") + override fun withSoftReset(needsSoftReset: Boolean): T { + return copy(needsSoftReset = needsSoftReset) as T + } +} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryStorageOperations.kt b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryStorageOperations.kt new file mode 100644 index 000000000000..60f92e84b360 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryStorageOperations.kt @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.bigquery.operation + +import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer +import io.airbyte.integrations.base.destination.operation.StorageOperations +import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig +import io.airbyte.integrations.base.destination.typing_deduping.StreamId +import io.airbyte.protocol.models.v0.DestinationSyncMode +import java.time.Instant +import java.util.* + +class BigQueryStorageOperations : StorageOperations { + override fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode) { + TODO("Not yet implemented") + } + + override fun cleanupStage(streamId: StreamId) { + TODO("Not yet implemented") + } + + override fun writeToStage(streamId: StreamId, buffer: SerializableBuffer) { + TODO("Not yet implemented") + } + + override fun createFinalSchema(streamId: StreamId) { + TODO("Not yet implemented") + } + + override fun createFinalTable(streamConfig: StreamConfig, suffix: String, replace: Boolean) { + TODO("Not yet implemented") + } + + override fun softResetFinalTable(streamConfig: StreamConfig) { + TODO("Not yet implemented") + } + + override fun overwriteFinalTable(streamConfig: StreamConfig, tmpTableSuffix: String) { + TODO("Not yet implemented") + } + + override fun typeAndDedupe( + streamConfig: StreamConfig, + maxProcessedTimestamp: Optional, + finalTableSuffix: String + ) { + TODO("Not yet implemented") + } +} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryStreamOperationFactory.kt b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryStreamOperationFactory.kt new file mode 100644 index 000000000000..9db8ad88fa92 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryStreamOperationFactory.kt @@ -0,0 +1,18 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.bigquery.operation + +import io.airbyte.integrations.base.destination.operation.StreamOperation +import io.airbyte.integrations.base.destination.operation.StreamOperationsFactory +import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus +import io.airbyte.integrations.destination.bigquery.migrators.BigQueryDestinationState + +class BigQueryStreamOperationFactory : StreamOperationsFactory { + override fun createInstance( + destinationInitialStatus: DestinationInitialStatus + ): StreamOperation { + TODO("Not yet implemented") + } +} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryStreamOperations.kt b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryStreamOperations.kt new file mode 100644 index 000000000000..1f48205872bf --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryStreamOperations.kt @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.bigquery.operation + +import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage +import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation +import io.airbyte.integrations.base.destination.operation.StorageOperations +import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus +import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig +import io.airbyte.integrations.destination.bigquery.migrators.BigQueryDestinationState +import java.util.stream.Stream + +class BigQueryStreamOperations( + storageOperations: StorageOperations, + destinationInitialStatus: DestinationInitialStatus +) : AbstractStreamOperation(storageOperations, destinationInitialStatus) { + override fun writeRecords(streamConfig: StreamConfig, stream: Stream) { + TODO("Not yet implemented") + } +} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/writer/BigQueryFlushFunction.kt b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/writer/BigQueryFlushFunction.kt new file mode 100644 index 000000000000..9a5fe814a2b3 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/writer/BigQueryFlushFunction.kt @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.bigquery.writer + +import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction +import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage +import io.airbyte.cdk.integrations.destination.operation.SyncOperation +import io.airbyte.protocol.models.v0.StreamDescriptor +import java.util.stream.Stream + +class BigQueryFlushFunction( + override val optimalBatchSizeBytes: Long, + private val syncOperation: SyncOperation +) : DestinationFlushFunction { + override fun flush(streamDescriptor: StreamDescriptor, stream: Stream) { + syncOperation.flushStream(streamDescriptor, stream) + } +}