Commit

Destination Snowflake: Storage ops to support refreshes (#39473)
gisripa authored and xiaohansong committed Jul 2, 2024
1 parent 1e1b8e5 commit 0ac4d27
Showing 23 changed files with 507 additions and 135 deletions.
9 changes: 5 additions & 4 deletions airbyte-cdk/java/airbyte-cdk/README.md
@@ -174,18 +174,18 @@ corresponds to that version.
 
 | Version | Date | Pull Request | Subject |
 |:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| 0.40.8 | 2024-06-25 | [\#40499](https://github.com/airbytehq/airbyte/pull/40499) | Make JdbcDatabase SQL statement logging optional; add generation_id support to JdbcSqlGenerator |
-| 0.38.3 | 2024-06-25 | [\#40499](https://github.com/airbytehq/airbyte/pull/40499) | (backport) Make JdbcDatabase SQL statement logging optional; add generation_id support to JdbcSqlGenerator |
-| 0.40.7 | 2024-07-01 | [40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz heartbeat. |
+| 0.40.9 | 2024-07-01 | [\#39473](https://github.com/airbytehq/airbyte/pull/39473) | minor changes around error logging and testing |
+| 0.40.8 | 2024-07-01 | [\#40499](https://github.com/airbytehq/airbyte/pull/40499) | Make JdbcDatabase SQL statement logging optional; add generation_id support to JdbcSqlGenerator |
+| 0.40.7 | 2024-07-01 | [\#40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz heartbeat. |
 | ~~0.40.6~~ | | | (this version does not exist) |
 | 0.40.5 | 2024-06-26 | [\#40517](https://github.com/airbytehq/airbyte/pull/40517) | JdbcDatabase.executeWithinTransaction allows disabling SQL statement logging |
-| 0.35.16 | 2024-06-25 | [\#40517](https://github.com/airbytehq/airbyte/pull/40517) | (backport) JdbcDatabase.executeWithinTransaction allows disabling SQL statement logging |
 | 0.40.4 | 2024-06-18 | [\#40254](https://github.com/airbytehq/airbyte/pull/40254) | Destinations: Do not throw on unrecognized airbyte message type (ignore message instead) |
 | 0.40.3 | 2024-06-18 | [\#39526](https://github.com/airbytehq/airbyte/pull/39526) | Destinations: INCOMPLETE stream status is a TRANSIENT error rather than SYSTEM |
 | 0.40.2 | 2024-06-18 | [\#39552](https://github.com/airbytehq/airbyte/pull/39552) | Destinations: Throw error if the ConfiguredCatalog has no streams |
 | 0.40.1 | 2024-06-14 | [\#39349](https://github.com/airbytehq/airbyte/pull/39349) | Source stats for full refresh streams |
 | 0.40.0 | 2024-06-17 | [\#38622](https://github.com/airbytehq/airbyte/pull/38622) | Destinations: Implement refreshes logic in AbstractStreamOperation |
 | 0.39.0 | 2024-06-17 | [\#38067](https://github.com/airbytehq/airbyte/pull/38067) | Destinations: Breaking changes for refreshes (fail on INCOMPLETE stream status; ignore OVERWRITE sync mode) |
+| 0.38.3 | 2024-06-25 | [\#40499](https://github.com/airbytehq/airbyte/pull/40499) | (backport) Make JdbcDatabase SQL statement logging optional; add generation_id support to JdbcSqlGenerator |
 | 0.38.2 | 2024-06-14 | [\#39460](https://github.com/airbytehq/airbyte/pull/39460) | Bump postgres JDBC driver version |
 | 0.38.1 | 2024-06-13 | [\#39445](https://github.com/airbytehq/airbyte/pull/39445) | Sources: More CDK changes to handle big initial snapshots. |
 | 0.38.0 | 2024-06-11 | [\#39405](https://github.com/airbytehq/airbyte/pull/39405) | Sources: Debezium properties manager interface changed to accept a list of streams to scope to |
@@ -198,6 +198,7 @@ corresponds to that version.
 | 0.36.4 | 2024-05-31 | [\#38824](https://github.com/airbytehq/airbyte/pull/38824) | Param marked as non-null to nullable in JdbcDestinationHandler for NPE fix |
 | 0.36.2 | 2024-05-29 | [\#38538](https://github.com/airbytehq/airbyte/pull/38357) | Exit connector when encountering a config error. |
 | 0.36.0 | 2024-05-29 | [\#38358](https://github.com/airbytehq/airbyte/pull/38358) | Plumb generation_id / sync_id to destinations code |
+| 0.35.16 | 2024-06-25 | [\#40517](https://github.com/airbytehq/airbyte/pull/40517) | (backport) JdbcDatabase.executeWithinTransaction allows disabling SQL statement logging |
 | 0.35.15 | 2024-05-31 | [\#38824](https://github.com/airbytehq/airbyte/pull/38824) | Param marked as non-null to nullable in JdbcDestinationHandler for NPE fix |
 | 0.35.14 | 2024-05-28 | [\#38738](https://github.com/airbytehq/airbyte/pull/38738) | make ThreadCreationInfo cast as nullable |
 | 0.35.13 | 2024-05-28 | [\#38632](https://github.com/airbytehq/airbyte/pull/38632) | minor changes to allow conversion of snowflake tests to kotlin |
@@ -213,6 +213,7 @@ internal constructor(
                 }
             }
         } catch (e: Exception) {
+            LOGGER.error(e) { "caught exception!" }
             // Many of the exceptions thrown are nested inside layers of RuntimeExceptions. An
             // attempt is made
             // to
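The single added line records a full stack trace at the moment of failure, before the surrounding code unwraps nested RuntimeExceptions in search of a root cause. Below is a minimal sketch of the pattern, assuming the kotlin-logging API (io.github.oshai.kotlinlogging) used elsewhere in the CDK; `logAndRethrow` is a hypothetical helper, not CDK code:

```kotlin
import io.github.oshai.kotlinlogging.KotlinLogging

private val LOGGER = KotlinLogging.logger {}

// Hypothetical helper illustrating the logging pattern added above.
fun <T> logAndRethrow(block: () -> T): T =
    try {
        block()
    } catch (e: Exception) {
        // error(e) { ... } logs the throwable with its stack trace; the
        // message lambda is evaluated lazily, only if ERROR is enabled.
        LOGGER.error(e) { "caught exception!" }
        throw e
    }
```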
@@ -1 +1 @@
-version=0.40.8
+version=0.40.9
@@ -135,7 +135,7 @@ constructor(
 
     @VisibleForTesting
     fun toStreamConfig(stream: ConfiguredAirbyteStream): StreamConfig {
-        if (stream.generationId == null) {
+        if (stream.generationId == null || stream.minimumGenerationId == null) {
            throw ConfigErrorException(
                "You must upgrade your platform version to use this connector version. Either downgrade your connector or upgrade platform to 0.63.0"
            )
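The guard now rejects a catalog missing either generation field, since refreshes need both: `generationId` stamps the records a sync writes, and `minimumGenerationId` tells the destination which generations must survive. A sketch of the check with simplified, hypothetical stand-in types (the real `ConfiguredAirbyteStream` and `StreamConfig` carry many more fields):

```kotlin
// Hypothetical, simplified stand-ins for the protocol and CDK types.
data class ConfiguredStream(val generationId: Long?, val minimumGenerationId: Long?)

class ConfigErrorException(message: String) : RuntimeException(message)

fun validateGenerationIds(stream: ConfiguredStream) {
    // Platforms older than 0.63.0 do not populate these fields, so a null
    // here means the platform cannot drive the refresh bookkeeping.
    if (stream.generationId == null || stream.minimumGenerationId == null) {
        throw ConfigErrorException(
            "You must upgrade your platform version to use this connector version. " +
                "Either downgrade your connector or upgrade platform to 0.63.0"
        )
    }
}
```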
@@ -40,8 +40,6 @@ import org.junit.jupiter.api.Assumptions.assumeTrue
 import org.junit.jupiter.api.function.Executable
 import org.junit.jupiter.api.parallel.Execution
 import org.junit.jupiter.api.parallel.ExecutionMode
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.EnumSource
 
 private val LOGGER = KotlinLogging.logger {}
 /**
@@ -230,15 +228,11 @@ abstract class BaseTypingDedupingTest {
     * Starting with an empty destination, execute a full refresh overwrite sync. Verify that the
     * records are written to the destination table. Then run a second sync, and verify that the
     * records are overwritten.
-    *
-    * Parameterized on destination sync mode. After the refreshes project, APPEND and OVERWRITE
-    * behave identically.
     */
-    @ParameterizedTest
-    @EnumSource(DestinationSyncMode::class, names = ["APPEND", "OVERWRITE"])
+    @Test
     @Throws(Exception::class)
-    fun truncateRefresh() {
-        val catalog =
+    open fun truncateRefresh() {
+        val catalog1 =
             io.airbyte.protocol.models.v0
                 .ConfiguredAirbyteCatalog()
                 .withStreams(
@@ -247,8 +241,8 @@ abstract class BaseTypingDedupingTest {
                         .withSyncId(42)
                         .withGenerationId(43)
                         .withMinimumGenerationId(43)
-                        .withDestinationSyncMode(DestinationSyncMode.APPEND)
                         .withSyncMode(SyncMode.FULL_REFRESH)
+                        .withDestinationSyncMode(DestinationSyncMode.OVERWRITE)
                         .withStream(
                             AirbyteStream()
                                 .withNamespace(streamNamespace)
@@ -261,36 +255,54 @@ abstract class BaseTypingDedupingTest {
         // First sync
         val messages1 = readMessages("dat/sync1_messages.jsonl")
 
-        runSync(catalog, messages1)
+        runSync(catalog1, messages1)
 
         val expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl")
         val expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl")
         verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison())
 
         // Second sync
         val messages2 = readMessages("dat/sync2_messages.jsonl")
+        val catalog2 =
+            io.airbyte.protocol.models.v0
+                .ConfiguredAirbyteCatalog()
+                .withStreams(
+                    java.util.List.of(
+                        ConfiguredAirbyteStream()
+                            .withSyncId(42)
+                            .withGenerationId(44)
+                            .withMinimumGenerationId(44)
+                            .withDestinationSyncMode(DestinationSyncMode.OVERWRITE)
+                            .withSyncMode(SyncMode.FULL_REFRESH)
+                            .withStream(
+                                AirbyteStream()
+                                    .withNamespace(streamNamespace)
+                                    .withName(streamName)
+                                    .withJsonSchema(SCHEMA)
+                            )
+                    )
+                )
 
-        runSync(catalog, messages2)
+        runSync(catalog2, messages2)
 
         val expectedRawRecords2 =
-            readRecords("dat/sync2_expectedrecords_fullrefresh_overwrite_raw.jsonl")
+            readRecords("dat/sync2_expectedrecords_fullrefresh_overwrite_with_new_gen_id_raw.jsonl")
         val expectedFinalRecords2 =
-            readRecords("dat/sync2_expectedrecords_fullrefresh_overwrite_final.jsonl")
+            readRecords(
+                "dat/sync2_expectedrecords_fullrefresh_overwrite_with_new_gen_id_final.jsonl"
+            )
         verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison())
     }
 
    /**
     * Starting with an empty destination, execute a full refresh append sync. Verify that the
     * records are written to the destination table. Then run a second sync, and verify that the old
     * and new records are all present.
-    *
-    * Similar to [truncateRefresh], this is parameterized on sync mode.
     */
-    @ParameterizedTest
-    @EnumSource(DestinationSyncMode::class, names = ["APPEND", "OVERWRITE"])
+    @Test
     @Throws(Exception::class)
-    fun mergeRefresh() {
-        val catalog =
+    open fun mergeRefresh() {
+        val catalog1 =
             io.airbyte.protocol.models.v0
                 .ConfiguredAirbyteCatalog()
                 .withStreams(
@@ -299,8 +311,8 @@ abstract class BaseTypingDedupingTest {
                         .withSyncId(42)
                         .withGenerationId(43)
                         .withMinimumGenerationId(0)
-                        .withSyncMode(SyncMode.FULL_REFRESH)
                         .withDestinationSyncMode(DestinationSyncMode.APPEND)
+                        .withSyncMode(SyncMode.FULL_REFRESH)
                         .withStream(
                             AirbyteStream()
                                 .withNamespace(streamNamespace)
@@ -313,20 +325,39 @@ abstract class BaseTypingDedupingTest {
         // First sync
         val messages1 = readMessages("dat/sync1_messages.jsonl")
 
-        runSync(catalog, messages1)
+        runSync(catalog1, messages1)
 
         val expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl")
         val expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl")
         verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison())
 
         // Second sync
         val messages2 = readMessages("dat/sync2_messages.jsonl")
+        val catalog2 =
+            io.airbyte.protocol.models.v0
+                .ConfiguredAirbyteCatalog()
+                .withStreams(
+                    java.util.List.of(
+                        ConfiguredAirbyteStream()
+                            .withSyncId(42)
+                            .withGenerationId(44)
+                            .withMinimumGenerationId(0)
+                            .withDestinationSyncMode(DestinationSyncMode.APPEND)
+                            .withSyncMode(SyncMode.FULL_REFRESH)
+                            .withStream(
+                                AirbyteStream()
+                                    .withNamespace(streamNamespace)
+                                    .withName(streamName)
+                                    .withJsonSchema(SCHEMA)
+                            )
+                    )
+                )
 
-        runSync(catalog, messages2)
+        runSync(catalog2, messages2)
 
-        val expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_raw.jsonl")
+        val expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_with_new_gen_id_raw.jsonl")
         val expectedFinalRecords2 =
-            readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl")
+            readRecords("dat/sync2_expectedrecords_fullrefresh_append_with_new_gen_id_final.jsonl")
         verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison())
     }
 
@@ -1024,6 +1055,7 @@ abstract class BaseTypingDedupingTest {
         disableFinalTableComparison: Boolean
     ) {
         val actualRawRecords = dumpRawTableRecords(streamNamespace, streamName)
+
        if (disableFinalTableComparison) {
            DIFFER!!.diffRawTableRecords(expectedRawRecords, actualRawRecords)
        } else {
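Both tests move from a parameterized sync mode to a single `@Test`, and each now builds a second catalog whose `generationId` advances from 43 to 44, with fresh `*_with_new_gen_id_*` fixtures to match. The only knob separating the two refresh styles is `minimumGenerationId`. A sketch distilling that difference, using the protocol classes from the diff (the stream name, namespace, and sync id values are placeholders):

```kotlin
import io.airbyte.protocol.models.v0.AirbyteStream
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.airbyte.protocol.models.v0.SyncMode

// Sketch: build a full-refresh catalog for one stream. generationId advances
// on every sync; minimumGenerationId decides what the destination may keep.
fun refreshCatalog(generationId: Long, truncate: Boolean): ConfiguredAirbyteCatalog =
    ConfiguredAirbyteCatalog()
        .withStreams(
            listOf(
                ConfiguredAirbyteStream()
                    .withSyncId(42)
                    .withGenerationId(generationId)
                    // Truncate refresh: only the current generation survives.
                    // Merge refresh: 0 means all prior generations are kept.
                    .withMinimumGenerationId(if (truncate) generationId else 0L)
                    .withDestinationSyncMode(
                        if (truncate) DestinationSyncMode.OVERWRITE
                        else DestinationSyncMode.APPEND
                    )
                    .withSyncMode(SyncMode.FULL_REFRESH)
                    .withStream(
                        AirbyteStream().withNamespace("test_ns").withName("users")
                    )
            )
        )
```

`refreshCatalog(44, truncate = true)` mirrors `catalog2` in `truncateRefresh`; `refreshCatalog(44, truncate = false)` mirrors `catalog2` in `mergeRefresh`.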
@@ -3,7 +3,7 @@ plugins {
 }
 
 airbyteJavaConnector {
-    cdkVersionRequired = '0.37.1'
+    cdkVersionRequired = '0.40.9'
     features = ['db-destinations', 's3-destinations', 'typing-deduping']
     useLocalCdk = false
 }
@@ -5,7 +5,7 @@ data:
   connectorSubtype: database
   connectorType: destination
   definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
-  dockerImageTag: 3.10.1
+  dockerImageTag: 3.11.0
   dockerRepository: airbyte/destination-snowflake
   documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
   githubIssueLabel: destination-snowflake
@@ -34,6 +34,7 @@ data:
         memory_request: 2Gi
   supportLevel: certified
   supportsDbt: true
+  supportsRefreshes: true
   tags:
     - language:java
   connectorTestSuitesOptions:
@@ -122,6 +122,12 @@ constructor(
                     hasUnprocessedRecords = true,
                     maxProcessedTimestamp = Optional.empty()
                 ),
+            initialTempRawTableStatus =
+                InitialRawTableStatus(
+                    rawTableExists = false,
+                    hasUnprocessedRecords = true,
+                    maxProcessedTimestamp = Optional.empty()
+                ),
             isSchemaMismatch = true,
             isFinalTableEmpty = true,
             destinationState =
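The fixture grows an `initialTempRawTableStatus` next to the existing raw-table status: with refreshes, a truncate sync stages the new generation in a temporary raw table and swaps it in at commit time, so initial-state gathering has to report on both tables. A sketch of the shape the constructor calls imply (field types are assumptions; the real CDK class may differ):

```kotlin
import java.time.Instant
import java.util.Optional

// Hypothetical mirror of the status record used in the fixture above.
data class InitialRawTableStatus(
    val rawTableExists: Boolean,
    val hasUnprocessedRecords: Boolean,
    val maxProcessedTimestamp: Optional<Instant>,
)

// The fixture models a stream with neither table created yet: both the live
// and temp raw-table statuses start out identical until a sync writes data.
val emptyStatus =
    InitialRawTableStatus(
        rawTableExists = false,
        hasUnprocessedRecords = true,
        maxProcessedTimestamp = Optional.empty(),
    )
```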
@@ -122,6 +122,14 @@ class SnowflakeAbMetaAndGenIdMigration(private val database: JdbcDatabase) :
                 state.destinationState.copy(isAirbyteMetaPresentInRaw = true),
                 true
             )
+        } else if (!state.isFinalTablePresent) {
+            log.info {
+                "skipping migration of generation_id for table ${stream.id.finalNamespace}.${stream.id.finalName} because final table doesn't exist"
+            }
+        } else {
+            log.info {
+                "skipping migration of generation_id for table ${stream.id.finalNamespace}.${stream.id.finalName} because schemas match"
+            }
         }
 
         // Final table is untouched, so we don't need to fetch the initial status
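The migration previously went silent when it had nothing to do; now each skip path logs why. The implied decision tree, sketched below; the predicate guarding the migrate branch is only partially visible in this hunk, so treat the first condition here as an assumption:

```kotlin
// Illustrative outline of the branch logic; not the connector's actual code.
fun generationIdMigrationAction(
    finalTablePresent: Boolean,
    schemasMatch: Boolean,
): String =
    when {
        !finalTablePresent -> "skip: final table doesn't exist yet"
        schemasMatch -> "skip: schemas match, generation_id already migrated"
        else -> "migrate: add airbyte_meta / generation_id to the final table"
    }
```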
@@ -169,11 +169,12 @@ class SnowflakeStagingClient(private val database: JdbcDatabase) {
         stageName: String,
         stagingPath: String,
         stagedFiles: List<String>,
-        streamId: StreamId
+        streamId: StreamId,
+        suffix: String = ""
     ) {
         try {
             val queryId = UUID.randomUUID()
-            val query = getCopyQuery(stageName, stagingPath, stagedFiles, streamId)
+            val query = getCopyQuery(stageName, stagingPath, stagedFiles, streamId, suffix)
             log.info { "query $queryId, $query" }
             // queryJsons is intentionally used here to get the error message in case of failure
             // instead of execute
@@ -252,12 +253,13 @@ class SnowflakeStagingClient(private val database: JdbcDatabase) {
         stageName: String,
         stagingPath: String,
         stagedFiles: List<String>,
-        streamId: StreamId
+        streamId: StreamId,
+        suffix: String
     ): String {
         return String.format(
             COPY_QUERY_1S1T + generateFilesList(stagedFiles) + ";",
             streamId.rawNamespace,
-            streamId.rawName,
+            streamId.rawName + suffix,
             stageName,
             stagingPath
         )
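Threading `suffix` through `copyIntoTableFromStage` and `getCopyQuery` (defaulting to `""`) lets a truncate refresh load staged files into a suffixed temporary raw table, while every existing call site keeps its old behavior. A usage sketch; the imports, the `TMP_SUFFIX` value, and the surrounding orchestration are assumptions, not code from this PR:

```kotlin
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
import io.airbyte.integrations.destination.snowflake.operation.SnowflakeStagingClient

// Assumed suffix constant; the CDK defines the real one used by refreshes.
const val TMP_SUFFIX = "_airbyte_tmp"

fun copyToRawTable(
    client: SnowflakeStagingClient,
    stageName: String,
    stagingPath: String,
    stagedFiles: List<String>,
    streamId: StreamId,
    truncateRefresh: Boolean,
) {
    if (truncateRefresh) {
        // The new generation lands in "<rawName><suffix>" and is swapped
        // into place when the sync commits.
        client.copyIntoTableFromStage(stageName, stagingPath, stagedFiles, streamId, TMP_SUFFIX)
    } else {
        // Default suffix "" targets the live raw table, as before.
        client.copyIntoTableFromStage(stageName, stagingPath, stagedFiles, streamId)
    }
}
```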
(The remaining changed files of the 23 are not rendered in this view.)
