Skip to content

Commit

Permalink
interface changes
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed May 23, 2024
1 parent e2e12d8 commit 2991827
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class StagingStreamOperations<DestinationState : MinimumDestinationState>(
) {

private val log = KotlinLogging.logger {}
override fun writeRecords(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>) {
override fun writeRecordsImpl(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>) {
val writeBuffer =
StagingSerializedBufferFactory.initializeBuffer(fileUploadFormat, destinationColumns)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
private val log = KotlinLogging.logger {}

// State maintained to make decision between async calls
private val rawTableSuffix: String
private val finalTmpTableSuffix: String
private val initialRawTableStatus: InitialRawTableStatus =
destinationInitialStatus.initialRawTableStatus
Expand All @@ -36,7 +37,8 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat

init {
val stream = destinationInitialStatus.streamConfig
storageOperation.prepareStage(stream.id, stream.destinationSyncMode)
rawTableSuffix = NO_SUFFIX
storageOperation.prepareStage(stream.id, NO_SUFFIX)
if (!disableTypeDedupe) {
// Prepare final tables based on sync mode.
finalTmpTableSuffix = prepareFinalTable(destinationInitialStatus)
Expand Down Expand Up @@ -114,8 +116,16 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
return NO_SUFFIX
}

/**
 * Entry point for writing records: rewrites the stream's raw table name by appending
 * [rawTableSuffix], then hands off to [writeRecordsImpl] with the adjusted config so the
 * destination-specific writer targets the suffixed raw table.
 */
override fun writeRecords(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>) {
    // Only the raw table name changes; every other field is carried over by copy().
    val suffixedId = streamConfig.id.copy(rawName = streamConfig.id.rawName + rawTableSuffix)
    val redirectedConfig = streamConfig.copy(id = suffixedId)
    writeRecordsImpl(redirectedConfig, stream)
}

/** Writing records is destination-type specific (insert vs. staging, based on format). */
abstract override fun writeRecords(
abstract fun writeRecordsImpl(
streamConfig: StreamConfig,
stream: Stream<PartialAirbyteMessage>
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class StandardStreamOperation<DestinationState : MinimumDestinationState>(
destinationInitialStatus,
disableTypeDedupe
) {
override fun writeRecords(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>) {
override fun writeRecordsImpl(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>) {
storageOperation.writeToStage(streamConfig, stream)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package io.airbyte.integrations.base.destination.operation

import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
import io.airbyte.protocol.models.v0.DestinationSyncMode
import java.time.Instant
import java.util.Optional

Expand All @@ -16,9 +15,33 @@ interface StorageOperation<Data> {
*/

/**
* Prepare staging area which could be creating any object storage, temp tables or file storage
* Prepare staging area which could be creating any object storage, temp tables or file storage.
* Similar to [createFinalTable], accepts a [suffix] parameter, which should be used in conjunction
* with [overwriteStage].
*/
fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode)
fun prepareStage(streamId: StreamId, suffix: String, replace: Boolean = false)

/**
* Swap the "temporary" stage into the "real" stage. For example,
* `DROP TABLE airbyte_internal.foo; ALTER TABLE airbyte_internal.foo_tmp RENAME TO foo`.
*/
fun overwriteStage(streamId: StreamId, suffix: String)

/**
* Copy all records from the temporary stage into the real stage, then drop the temporary stage.
* For example `INSERT INTO airbyte_internal.foo SELECT * FROM airbyte_internal.foo_tmp; DROP
* TABLE airbyte_internal.foo_tmp`.
*/
fun transferFromTempStage(streamId: StreamId, suffix: String)

/**
* Get the generation of a single record in the stage. Not necessarily the min or max generation,
* just _any_ record.
*
* [AbstractStreamOperation] is responsible for orchestrating the stages so that the temp stage
* always contains exactly one generation.
*/
fun getStageGeneration(streamId: StreamId, suffix: String): Long

/** Delete previously staged data, using deterministic information from streamId. */
fun cleanupStage(streamId: StreamId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class AbstractStreamOperationTest {
storageOperation,
destinationInitialStatus,
) {
override fun writeRecords(
override fun writeRecordsImpl(
streamConfig: StreamConfig,
stream: Stream<PartialAirbyteMessage>
) {
Expand Down Expand Up @@ -97,7 +97,7 @@ class AbstractStreamOperationTest {
val streamOperations = TestStreamOperation(storageOperation, initialState)

verifySequence {
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
storageOperation.prepareStage(streamId, "")
storageOperation.createFinalTable(streamConfig, "", false)
}
confirmVerified(storageOperation)
Expand Down Expand Up @@ -144,7 +144,7 @@ class AbstractStreamOperationTest {
val streamOperations = TestStreamOperation(storageOperation, initialState)

verifySequence {
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
storageOperation.prepareStage(streamId, "")
storageOperation.createFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX, true)
}
confirmVerified(storageOperation)
Expand Down Expand Up @@ -189,7 +189,7 @@ class AbstractStreamOperationTest {
val streamOperations = TestStreamOperation(storageOperation, initialState)

verifySequence {
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
storageOperation.prepareStage(streamId, "")
// No table creation - we can just reuse the existing table.
}
confirmVerified(storageOperation)
Expand Down Expand Up @@ -232,7 +232,7 @@ class AbstractStreamOperationTest {
val streamOperations = TestStreamOperation(storageOperation, initialState)

verifySequence {
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
storageOperation.prepareStage(streamId, "")
storageOperation.createFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX, true)
}
confirmVerified(storageOperation)
Expand Down Expand Up @@ -276,7 +276,7 @@ class AbstractStreamOperationTest {
val streamOperations = TestStreamOperation(storageOperation, initialState)

verifySequence {
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
storageOperation.prepareStage(streamId, "")
storageOperation.createFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX, true)
}
confirmVerified(storageOperation)
Expand Down Expand Up @@ -320,7 +320,7 @@ class AbstractStreamOperationTest {
val streamOperations = TestStreamOperation(storageOperation, initialState)

verifySequence {
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
storageOperation.prepareStage(streamId, "")
storageOperation.createFinalTable(streamConfig, "", false)
}
confirmVerified(storageOperation)
Expand Down Expand Up @@ -366,7 +366,7 @@ class AbstractStreamOperationTest {
val streamOperations = TestStreamOperation(storageOperation, initialState)

verifySequence {
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
storageOperation.prepareStage(streamId, "")
storageOperation.softResetFinalTable(streamConfig)
}
confirmVerified(storageOperation)
Expand Down Expand Up @@ -410,7 +410,7 @@ class AbstractStreamOperationTest {
val streamOperations = TestStreamOperation(storageOperation, initialState)

verifySequence {
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
storageOperation.prepareStage(streamId, "")
// No soft reset - we can just reuse the existing table.
}
confirmVerified(storageOperation)
Expand Down Expand Up @@ -450,7 +450,7 @@ class AbstractStreamOperationTest {
val streamOperations = TestStreamOperation(storageOperation, initialState)

verifySequence {
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
storageOperation.prepareStage(streamId, "")
storageOperation.softResetFinalTable(streamConfig)
}
confirmVerified(storageOperation)
Expand Down Expand Up @@ -505,7 +505,7 @@ class AbstractStreamOperationTest {
val streamOperations = TestStreamOperation(storageOperation, initialState)

verifySequence {
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
storageOperation.prepareStage(streamId, "")
}
confirmVerified(storageOperation)

Expand Down

0 comments on commit 2991827

Please sign in to comment.