Skip to content

Commit

Permalink
bigquery-refactor-new-intf
Browse files Browse the repository at this point in the history
  • Loading branch information
gisripa committed May 13, 2024
1 parent f13bbc2 commit b64a0d5
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public BigQueryGcsOperations(final BigQuery bigQuery,
final StandardNameTransformer gcsNameTransformer,
final GcsDestinationConfig gcsConfig,
final GcsStorageOperations gcsStorageOperations,
final String datasetLocation,
final String datasetLocation, // TODO: Is this information same as GcsConfig.bucketRegion?
final UUID randomStagingId,
final DateTime syncDatetime,
final boolean keepStagingFiles) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.bigquery.migrators

import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState

data class BigQueryDestinationState(private val needsSoftReset: Boolean) : MinimumDestinationState {
override fun needsSoftReset(): Boolean {
return needsSoftReset
}

@Suppress("UNCHECKED_CAST")
override fun <T : MinimumDestinationState> withSoftReset(needsSoftReset: Boolean): T {
return copy(needsSoftReset = needsSoftReset) as T
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.bigquery.operation

import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer
import io.airbyte.integrations.base.destination.operation.StorageOperations
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
import io.airbyte.protocol.models.v0.DestinationSyncMode
import java.time.Instant
import java.util.*

class BigQueryStorageOperations : StorageOperations {
override fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode) {
TODO("Not yet implemented")
}

override fun cleanupStage(streamId: StreamId) {
TODO("Not yet implemented")
}

override fun writeToStage(streamId: StreamId, buffer: SerializableBuffer) {
TODO("Not yet implemented")
}

override fun createFinalSchema(streamId: StreamId) {
TODO("Not yet implemented")
}

override fun createFinalTable(streamConfig: StreamConfig, suffix: String, replace: Boolean) {
TODO("Not yet implemented")
}

override fun softResetFinalTable(streamConfig: StreamConfig) {
TODO("Not yet implemented")
}

override fun overwriteFinalTable(streamConfig: StreamConfig, tmpTableSuffix: String) {
TODO("Not yet implemented")
}

override fun typeAndDedupe(
streamConfig: StreamConfig,
maxProcessedTimestamp: Optional<Instant>,
finalTableSuffix: String
) {
TODO("Not yet implemented")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.bigquery.operation

import io.airbyte.integrations.base.destination.operation.StreamOperation
import io.airbyte.integrations.base.destination.operation.StreamOperationsFactory
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
import io.airbyte.integrations.destination.bigquery.migrators.BigQueryDestinationState

class BigQueryStreamOperationFactory : StreamOperationsFactory<BigQueryDestinationState> {
override fun createInstance(
destinationInitialStatus: DestinationInitialStatus<BigQueryDestinationState>
): StreamOperation<BigQueryDestinationState> {
TODO("Not yet implemented")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.bigquery.operation

import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation
import io.airbyte.integrations.base.destination.operation.StorageOperations
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.destination.bigquery.migrators.BigQueryDestinationState
import java.util.stream.Stream

class BigQueryStreamOperations(
storageOperations: StorageOperations,
destinationInitialStatus: DestinationInitialStatus<BigQueryDestinationState>
) : AbstractStreamOperation<BigQueryDestinationState>(storageOperations, destinationInitialStatus) {
override fun writeRecords(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>) {
TODO("Not yet implemented")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.bigquery.writer

import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.cdk.integrations.destination.operation.SyncOperation
import io.airbyte.protocol.models.v0.StreamDescriptor
import java.util.stream.Stream

class BigQueryFlushFunction<T>(
override val optimalBatchSizeBytes: Long,
private val syncOperation: SyncOperation
) : DestinationFlushFunction {
override fun flush(streamDescriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>) {
syncOperation.flushStream(streamDescriptor, stream)
}
}

0 comments on commit b64a0d5

Please sign in to comment.