Skip to content

Commit

Permalink
start implementing refreshes orchestration
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed May 23, 2024
1 parent 2991827 commit 8a3d661
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableS
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus
import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.Optional
import java.util.stream.Stream
Expand All @@ -24,6 +23,7 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
private val log = KotlinLogging.logger {}

// State maintained to make decision between async calls
private val isTruncateSync: Boolean
private val rawTableSuffix: String
private val finalTmpTableSuffix: String
private val initialRawTableStatus: InitialRawTableStatus =
Expand All @@ -37,8 +37,50 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat

init {
val stream = destinationInitialStatus.streamConfig
rawTableSuffix = NO_SUFFIX
storageOperation.prepareStage(stream.id, NO_SUFFIX)

isTruncateSync = when (stream.minimumGenerationId) {
0L -> false
stream.generationId -> true
else -> {
// This is technically already handled in CatalogParser.
throw IllegalArgumentException("Hybrid refreshes are not yet supported.")
}
}

if (isTruncateSync) {
if (initialRawTableStatus.tempRawTableExists) {
val tempStageGeneration =
storageOperation.getStageGeneration(stream.id, TMP_TABLE_SUFFIX)
if (tempStageGeneration != null && tempStageGeneration != stream.generationId) {
// The temp stage is from the wrong generation. Nuke it.
storageOperation.prepareStage(
stream.id,
TMP_TABLE_SUFFIX,
replace = true,
)
}
// (if the existing temp stage is from the correct generation, then we're resuming
// a truncate refresh, and should keep the previous temp stage).
} else {
// We're initiating a new truncate refresh. Create a new temp stage.
storageOperation.prepareStage(
stream.id,
TMP_TABLE_SUFFIX,
)
}
rawTableSuffix = TMP_TABLE_SUFFIX
} else {
if (initialRawTableStatus.tempRawTableExists) {
// There was a previous truncate refresh attempt, which failed, and left some
// records behind.
// Retrieve those records and put them in the real stage.
storageOperation.transferFromTempStage(stream.id, TMP_TABLE_SUFFIX)
// TODO refetch initial table status? or set initialRawTableStatus.hasUnprocessedRecords=true
}
rawTableSuffix = NO_SUFFIX
storageOperation.prepareStage(stream.id, NO_SUFFIX)
}

if (!disableTypeDedupe) {
// Prepare final tables based on sync mode.
finalTmpTableSuffix = prepareFinalTable(destinationInitialStatus)
Expand All @@ -51,7 +93,7 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat

companion object {
private const val NO_SUFFIX = ""
private const val TMP_OVERWRITE_TABLE_SUFFIX = "_airbyte_tmp"
private const val TMP_TABLE_SUFFIX = "_airbyte_tmp"
}

private fun prepareFinalTable(
Expand All @@ -70,27 +112,22 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
log.info { "Final Table exists for stream ${stream.id.finalName}" }
// The table already exists. Decide whether we're writing to it directly, or
// using a tmp table.
when (stream.destinationSyncMode) {
// For overwrite, it's wasteful to do T+D, so we don't do soft-reset in prepare.
// Instead,
// we do type-dedupe on a suffixed table and do a swap in finalizeTable when we have to
// for schema mismatches.
DestinationSyncMode.OVERWRITE -> return prepareFinalTableForOverwrite(initialStatus)
DestinationSyncMode.APPEND,
DestinationSyncMode.APPEND_DEDUP -> {
if (
initialStatus.isSchemaMismatch ||
initialStatus.destinationState.needsSoftReset()
) {
// We're loading data directly into the existing table.
// Make sure it has the right schema.
// Also, if a raw table migration wants us to do a soft reset, do that
// here.
log.info { "Executing soft-reset on final table of stream $stream" }
storageOperation.softResetFinalTable(stream)
}
return NO_SUFFIX
if (isTruncateSync) {
// Truncate refresh. Use a temp final table.
return prepareFinalTableForOverwrite(initialStatus)
} else {
if (
initialStatus.isSchemaMismatch ||
initialStatus.destinationState.needsSoftReset()
) {
// We're loading data directly into the existing table.
// Make sure it has the right schema.
// Also, if a raw table migration wants us to do a soft reset, do that
// here.
log.info { "Executing soft-reset on final table of stream $stream" }
storageOperation.softResetFinalTable(stream)
}
return NO_SUFFIX
}
}

Expand All @@ -100,14 +137,13 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
val stream = initialStatus.streamConfig
if (!initialStatus.isFinalTableEmpty || initialStatus.isSchemaMismatch) {
// overwrite an existing tmp table if needed.
storageOperation.createFinalTable(stream, TMP_OVERWRITE_TABLE_SUFFIX, true)
storageOperation.createFinalTable(stream, TMP_TABLE_SUFFIX, true)
log.info {
"Using temp final table for table ${stream.id.finalName}, this will be overwritten at end of sync"
}
// We want to overwrite an existing table. Write into a tmp table.
// We'll overwrite the table at the
// end of the sync.
return TMP_OVERWRITE_TABLE_SUFFIX
// We'll overwrite the table at the end of the sync.
return TMP_TABLE_SUFFIX
}

log.info {
Expand All @@ -133,6 +169,16 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
override fun finalizeTable(streamConfig: StreamConfig, syncSummary: StreamSyncSummary) {
// Delete the staging directory. The implementation decides whether there is anything to
// clean up, so this may be a no-op.
storageOperation.cleanupStage(streamConfig.id)

// Overwrite the raw table before doing anything else.
// This ensures that if T+D fails, we can easily retain the records on the next sync.
// It also means we don't need to run T+D using the temp raw table,
// which is possible (`typeAndDedupe(streamConfig.id.copy(rawName = streamConfig.id.rawName + suffix))`)
// but annoying and confusing.
if (isTruncateSync) {
storageOperation.overwriteStage(streamConfig.id, rawTableSuffix)
}

if (disableTypeDedupe) {
log.info {
"Typing and deduping disabled, skipping final table finalization. " +
Expand All @@ -141,50 +187,36 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
return
}

// Legacy logic that if recordsWritten or not tracked then it could be non-zero
val isNotOverwriteSync = streamConfig.destinationSyncMode != DestinationSyncMode.OVERWRITE
// Non-overwrite syncs should T+D regardless of status,
// so the user sees progress after every attempt.
// But overwrite syncs should only run T+D if the stream was successful
// (since we're T+Ding into a temp final table anyway).
val streamStatusRequiresTd =
isNotOverwriteSync || syncSummary.terminalStatus == AirbyteStreamStatus.COMPLETE
val shouldRunTypingDeduping: Boolean =
if (streamStatusRequiresTd) {
// Legacy logic that if recordsWritten or not tracked then it could be non-zero.
// But for OVERWRITE syncs, we don't need to look at old records.
val hasRecordsNeedingTd =
syncSummary.recordsWritten > 0 ||
(isNotOverwriteSync && initialRawTableStatus.hasUnprocessedRecords)
hasRecordsNeedingTd
} else {
false
}
val shouldRunTypingDeduping =
// Normal syncs should T+D regardless of status, so the user sees progress after
// every attempt. And we should T+D records from this sync, _or_ a previous sync.
(!isTruncateSync && (syncSummary.recordsWritten > 0 || initialRawTableStatus.hasUnprocessedRecords)) ||
// But truncate syncs should only T+D if the sync was successful, since we're T+Ding
// into a temp final table anyway. And we only need to check if _this_ sync emitted
// records, since we've nuked the old raw data.
(isTruncateSync && syncSummary.terminalStatus == AirbyteStreamStatus.COMPLETE && syncSummary.recordsWritten > 0)
if (!shouldRunTypingDeduping) {
log.info {
"Skipping typing and deduping for stream ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} " +
"because it had no records during this sync and no unprocessed records from a previous sync."
}
} else {
// In overwrite mode, we want to read all the raw records. Typically, this is equivalent
// to filtering on timestamp, but might as well be explicit.
val timestampFilter =
if (isNotOverwriteSync) {
initialRawTableStatus.maxProcessedTimestamp
} else {
Optional.empty()
}
storageOperation.typeAndDedupe(streamConfig, timestampFilter, finalTmpTableSuffix)
return
}

if (
streamConfig.destinationSyncMode == DestinationSyncMode.OVERWRITE &&
finalTmpTableSuffix.isNotBlank()
// We should only overwrite the final table if the stream was successful.
// This prevents data downtime if the stream didn't emit all the data.
&&
syncSummary.terminalStatus == AirbyteStreamStatus.COMPLETE
) {
// In truncate mode, we want to read all the raw records. Typically, this is equivalent
// to filtering on timestamp, but might as well be explicit.
val timestampFilter =
if (!isTruncateSync) {
initialRawTableStatus.maxProcessedTimestamp
} else {
Optional.empty()
}
storageOperation.typeAndDedupe(streamConfig, timestampFilter, finalTmpTableSuffix)

// For truncate syncs, the `shouldRunTypingDeduping` check means we'll only ever reach this
// point if stream status was COMPLETE, so we don't need to include
// `&& syncSummary.terminalStatus == AirbyteStreamStatus.COMPLETE` in this clause.
if (isTruncateSync && finalTmpTableSuffix.isNotBlank()) {
storageOperation.overwriteFinalTable(streamConfig, finalTmpTableSuffix)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ interface StorageOperation<Data> {
*
* [AbstractStreamOperation] is responsible for orchestrating the stages so that the temp stage
* always contains exactly one generation.
*
* @return The generation ID of a record in the stage, or `null` if the stage is empty.
*/
fun getStageGeneration(streamId: StreamId, suffix: String): Long
fun getStageGeneration(streamId: StreamId, suffix: String): Long?

/** Delete previously staged data, using deterministic information from streamId. */
fun cleanupStage(streamId: StreamId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import java.util.*

/**
 * Raw-table status for a stream, consulted by `AbstractStreamOperation` when it prepares the
 * stages for a sync and when it decides whether to run typing and deduping (T+D).
 *
 * @property rawTableExists presumably whether the stream's raw table already exists in the
 * destination. NOTE(review): not read by the code visible alongside this class; confirm
 * against the code that populates it.
 * @property tempRawTableExists whether a temp raw table exists for the stream. During
 * initialization, a truncate sync uses it to choose between keeping/replacing the existing temp
 * stage and creating a new one; a non-truncate sync uses it to decide whether records must be
 * transferred from the temp stage into the real stage.
 * @property hasUnprocessedRecords whether there are raw records from a previous sync that
 * still need T+D. Non-truncate syncs run T+D when this is true, even if the current sync
 * wrote no records.
 * @property maxProcessedTimestamp passed to `typeAndDedupe` as the timestamp filter for
 * non-truncate syncs; truncate syncs pass `Optional.empty()` instead.
 */
data class InitialRawTableStatus(
val rawTableExists: Boolean,
val tempRawTableExists: Boolean,
val hasUnprocessedRecords: Boolean,
val maxProcessedTimestamp: Optional<Instant>
)
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class DefaultSyncOperationTest {
initialRawTableStatus =
InitialRawTableStatus(
rawTableExists = true,
tempRawTableExists = false,
hasUnprocessedRecords = false,
maxProcessedTimestamp = Optional.empty(),
),
Expand Down Expand Up @@ -172,6 +173,7 @@ class DefaultSyncOperationTest {
initialRawTableStatus =
InitialRawTableStatus(
rawTableExists = true,
tempRawTableExists = false,
hasUnprocessedRecords = false,
maxProcessedTimestamp = Optional.empty(),
),
Expand Down
Loading

0 comments on commit 8a3d661

Please sign in to comment.