Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destinations CDK: refreshes logic #38622

Merged
merged 2 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :--------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 0.40.0 | 2024-06-17 | [\#38622](https://github.com/airbytehq/airbyte/pull/38622) | Destinations: Implement refreshes logic in AbstractStreamOperation |
| 0.39.0 | 2024-06-17 | [\#38067](https://github.com/airbytehq/airbyte/pull/38067) | Destinations: Breaking changes for refreshes (fail on INCOMPLETE stream status; ignore OVERWRITE sync mode) |
| 0.38.2 | 2024-06-14 | [\#39460](https://github.com/airbytehq/airbyte/pull/39460) | Bump postgres JDBC driver version |
| 0.38.1 | 2024-06-13 | [\#39445](https://github.com/airbytehq/airbyte/pull/39445) | Sources: More CDK changes to handle big initial snapshots. |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.39.0
version=0.40.0
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,10 @@ abstract class JdbcDestinationHandler<DestinationState>(
streamConfig,
finalTableDefinition.isPresent,
initialRawTableState,
// TODO fix this
// for now, no JDBC destinations actually do refreshes
// so this is just to make our code compile
InitialRawTableStatus(false, false, Optional.empty()),
isSchemaMismatch,
isFinalTableEmpty,
destinationState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ class StagingStreamOperations<DestinationState : MinimumDestinationState>(
) {

private val log = KotlinLogging.logger {}
override fun writeRecords(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>) {
override fun writeRecordsImpl(
streamConfig: StreamConfig,
suffix: String,
stream: Stream<PartialAirbyteMessage>
) {
val writeBuffer =
StagingSerializedBufferFactory.initializeBuffer(fileUploadFormat, destinationColumns)

Expand All @@ -51,7 +55,7 @@ class StagingStreamOperations<DestinationState : MinimumDestinationState>(
"Buffer flush complete for stream ${streamConfig.id.originalName} (${FileUtils.byteCountToDisplaySize(it.byteCount)}) to staging"
}
if (it.byteCount != 0L) {
storageOperation.writeToStage(streamConfig, writeBuffer)
storageOperation.writeToStage(streamConfig, suffix, writeBuffer)
} else {
log.info { "Skipping writing to storage since there are no bytes to write" }
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ class StandardStreamOperation<DestinationState : MinimumDestinationState>(
destinationInitialStatus,
disableTypeDedupe
) {
override fun writeRecords(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>) {
storageOperation.writeToStage(streamConfig, stream)
override fun writeRecordsImpl(
streamConfig: StreamConfig,
suffix: String,
stream: Stream<PartialAirbyteMessage>
) {
storageOperation.writeToStage(streamConfig, suffix, stream)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package io.airbyte.integrations.base.destination.operation

import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
import io.airbyte.protocol.models.v0.DestinationSyncMode
import java.time.Instant
import java.util.Optional

Expand All @@ -16,15 +15,44 @@ interface StorageOperation<Data> {
*/

/**
* Prepare staging area which cloud be creating any object storage, temp tables or file storage
* Prepare staging area which cloud be creating any object storage, temp tables or file storage.
* Similar to [createFinalTable], accepts a [suffix] parameter, which should be used in
* conjunction with [overwriteStage].
*
* @param replace If true, then replace existing resources with empty e.g. tables. If false,
* then leave existing resources untouched.
*/
fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode)
fun prepareStage(streamId: StreamId, suffix: String, replace: Boolean = false)
gisripa marked this conversation as resolved.
Show resolved Hide resolved

/**
* Swap the "temporary" stage into the "real" stage. For example, `DROP TABLE IF NOT EXISTS
* airbyte_internal.foo; ALTER TABLE airbyte_internal.foo_tmp RENAME TO foo`.
*/
fun overwriteStage(streamId: StreamId, suffix: String)

/**
* Copy all records from the temporary stage into the real stage, then drop the temporary stage.
* For example `INSERT INTO airbyte_internal.foo SELECT * FROM airbyte_internal.foo_tmp; DROP
* TABLE airbyte_internal.foo_tmp`.
*/
fun transferFromTempStage(streamId: StreamId, suffix: String)

/**
* Get the generation of a single record in the stage. Not necessarily the min or max
* generation, just _any_ record.
*
* [AbstractStreamOperation] is responsible for orchestrating the stages so that the temp stage
* always contains exactly one generation.
*
* @return The generation ID of a record in the stage, or `null` if the stage is empty.
*/
fun getStageGeneration(streamId: StreamId, suffix: String): Long?

/** Delete previously staged data, using deterministic information from streamId. */
fun cleanupStage(streamId: StreamId)

/** Write data to stage. */
fun writeToStage(streamConfig: StreamConfig, data: Data)
fun writeToStage(streamConfig: StreamConfig, suffix: String, data: Data)

/*
* ==================== Final Table Operations ================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@ package io.airbyte.integrations.base.destination.typing_deduping
data class DestinationInitialStatus<DestinationState>(
val streamConfig: StreamConfig,
val isFinalTablePresent: Boolean,
// TODO we should probably make this nullable, then delete InitialRawTableStatus.rawTableExists
val initialRawTableStatus: InitialRawTableStatus,
/**
* The state of the temp raw table, or null if there is no temp raw table at the start of the
* sync.
*/
val initialTempRawTableStatus: InitialRawTableStatus,
val isSchemaMismatch: Boolean,
val isFinalTableEmpty: Boolean,
val destinationState: DestinationState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,18 @@ import java.util.*

data class InitialRawTableStatus(
val rawTableExists: Boolean,
/**
* Whether there were any records with null `_airbyte_loaded_at`, at the time that this status
* was fetched.
*/
val hasUnprocessedRecords: Boolean,
// TODO Make maxProcessedTimestamp just `Instant?` instead of Optional
/**
* The highest timestamp such that all records in `SELECT * FROM raw_table WHERE
* _airbyte_extracted_at <= ?` have a nonnull `_airbyte_loaded_at`.
*
* Destinations MAY use this value to only run T+D on records with `_airbyte_extracted_at > ?`
* (note the strictly-greater comparison).
*/
val maxProcessedTimestamp: Optional<Instant>
)
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ data class StreamId(
return "$quote$finalNamespace$quote.$quote$finalName$suffix$quote"
}

fun rawTableId(quote: String): String {
return "$quote$rawNamespace$quote.$quote$rawName$quote"
@JvmOverloads
fun rawTableId(quote: String, suffix: String = ""): String {
return "$quote$rawNamespace$quote.$quote$rawName$suffix$quote"
}

fun finalName(quote: String): String {
Expand Down
Loading
Loading