Destination Snowflake: Storage ops to support refreshes #39473

Merged 1 commit on Jul 1, 2024
9 changes: 5 additions & 4 deletions airbyte-cdk/java/airbyte-cdk/README.md
@@ -174,18 +174,18 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
- | 0.40.8 | 2024-06-25 | [\#40499](https://github.com/airbytehq/airbyte/pull/40499) | Make JdbcDatabase SQL statement logging optional; add generation_id support to JdbcSqlGenerator |
- | 0.38.3 | 2024-06-25 | [\#40499](https://github.com/airbytehq/airbyte/pull/40499) | (backport) Make JdbcDatabase SQL statement logging optional; add generation_id support to JdbcSqlGenerator |
- | 0.40.7 | 2024-07-01 | [40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz heartbeat. |
+ | 0.40.9 | 2024-07-01 | [\#39473](https://github.com/airbytehq/airbyte/pull/39473) | Minor changes around error logging and testing |
+ | 0.40.8 | 2024-07-01 | [\#40499](https://github.com/airbytehq/airbyte/pull/40499) | Make JdbcDatabase SQL statement logging optional; add generation_id support to JdbcSqlGenerator |
+ | 0.40.7 | 2024-07-01 | [\#40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz heartbeat. |
| ~~0.40.6~~ | | | (this version does not exist) |
| 0.40.5 | 2024-06-26 | [\#40517](https://github.com/airbytehq/airbyte/pull/40517) | JdbcDatabase.executeWithinTransaction allows disabling SQL statement logging |
- | 0.35.16 | 2024-06-25 | [\#40517](https://github.com/airbytehq/airbyte/pull/40517) | (backport) JdbcDatabase.executeWithinTransaction allows disabling SQL statement logging |
| 0.40.4 | 2024-06-18 | [\#40254](https://github.com/airbytehq/airbyte/pull/40254) | Destinations: Do not throw on unrecognized airbyte message type (ignore message instead) |
| 0.40.3 | 2024-06-18 | [\#39526](https://github.com/airbytehq/airbyte/pull/39526) | Destinations: INCOMPLETE stream status is a TRANSIENT error rather than SYSTEM |
| 0.40.2 | 2024-06-18 | [\#39552](https://github.com/airbytehq/airbyte/pull/39552) | Destinations: Throw error if the ConfiguredCatalog has no streams |
| 0.40.1 | 2024-06-14 | [\#39349](https://github.com/airbytehq/airbyte/pull/39349) | Source stats for full refresh streams |
| 0.40.0 | 2024-06-17 | [\#38622](https://github.com/airbytehq/airbyte/pull/38622) | Destinations: Implement refreshes logic in AbstractStreamOperation |
| 0.39.0 | 2024-06-17 | [\#38067](https://github.com/airbytehq/airbyte/pull/38067) | Destinations: Breaking changes for refreshes (fail on INCOMPLETE stream status; ignore OVERWRITE sync mode) |
+ | 0.38.3 | 2024-06-25 | [\#40499](https://github.com/airbytehq/airbyte/pull/40499) | (backport) Make JdbcDatabase SQL statement logging optional; add generation_id support to JdbcSqlGenerator |
| 0.38.2 | 2024-06-14 | [\#39460](https://github.com/airbytehq/airbyte/pull/39460) | Bump postgres JDBC driver version |
| 0.38.1 | 2024-06-13 | [\#39445](https://github.com/airbytehq/airbyte/pull/39445) | Sources: More CDK changes to handle big initial snapshots. |
| 0.38.0 | 2024-06-11 | [\#39405](https://github.com/airbytehq/airbyte/pull/39405) | Sources: Debezium properties manager interface changed to accept a list of streams to scope to |
@@ -198,6 +198,7 @@ corresponds to that version.
| 0.36.4 | 2024-05-31 | [\#38824](https://github.com/airbytehq/airbyte/pull/38824) | Param marked as non-null to nullable in JdbcDestinationHandler for NPE fix |
| 0.36.2 | 2024-05-29 | [\#38538](https://github.com/airbytehq/airbyte/pull/38357) | Exit connector when encountering a config error. |
| 0.36.0 | 2024-05-29 | [\#38358](https://github.com/airbytehq/airbyte/pull/38358) | Plumb generation_id / sync_id to destinations code |
+ | 0.35.16 | 2024-06-25 | [\#40517](https://github.com/airbytehq/airbyte/pull/40517) | (backport) JdbcDatabase.executeWithinTransaction allows disabling SQL statement logging |
| 0.35.15 | 2024-05-31 | [\#38824](https://github.com/airbytehq/airbyte/pull/38824) | Param marked as non-null to nullable in JdbcDestinationHandler for NPE fix |
| 0.35.14 | 2024-05-28 | [\#38738](https://github.com/airbytehq/airbyte/pull/38738) | make ThreadCreationInfo cast as nullable |
| 0.35.13 | 2024-05-28 | [\#38632](https://github.com/airbytehq/airbyte/pull/38632) | minor changes to allow conversion of snowflake tests to kotlin |
@@ -213,6 +213,7 @@ internal constructor(
}
}
} catch (e: Exception) {
+ LOGGER.error(e) { "caught exception!" }
// Many of the exceptions thrown are nested inside layers of RuntimeExceptions. An attempt is made to
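The truncated comment above describes unwrapping nested RuntimeExceptions to reach a root cause worth reporting. A minimal Kotlin sketch of that idea; the actual unwrapping helper sits below this hunk, so this version is illustrative only:

    // Walk the cause chain until it ends or cycles back on itself.
    fun rootCause(e: Throwable): Throwable {
        var cause: Throwable = e
        while (cause.cause != null && cause.cause !== cause) {
            cause = cause.cause!!
        }
        return cause
    }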
@@ -1 +1 @@
- version=0.40.8
+ version=0.40.9
@@ -135,7 +135,7 @@ constructor(

@VisibleForTesting
fun toStreamConfig(stream: ConfiguredAirbyteStream): StreamConfig {
- if (stream.generationId == null) {
+ if (stream.generationId == null || stream.minimumGenerationId == null) {
throw ConfigErrorException(
"You must upgrade your platform version to use this connector version. Either downgrade your connector or upgrade platform to 0.63.0"
)
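The two generation fields this guard requires drive the refresh behavior exercised by the tests below: a minimumGenerationId equal to the generationId requests a truncate refresh (prior generations are discarded), while a minimumGenerationId of 0 requests a merge refresh (prior data is kept). A rough sketch of that distinction; the helper name is invented here, and the real dispatch lives in AbstractStreamOperation per the CDK changelog:

    // Hypothetical helper, for illustration only; semantics mirror the tests below.
    fun isTruncateRefresh(generationId: Long, minimumGenerationId: Long): Boolean =
        // 0 keeps all prior generations (merge refresh); a minimum equal to the
        // current generation discards older data (truncate refresh).
        minimumGenerationId != 0L && minimumGenerationId == generationId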
@@ -40,8 +40,6 @@ import org.junit.jupiter.api.Assumptions.assumeTrue
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.parallel.Execution
import org.junit.jupiter.api.parallel.ExecutionMode
- import org.junit.jupiter.params.ParameterizedTest
- import org.junit.jupiter.params.provider.EnumSource

private val LOGGER = KotlinLogging.logger {}
/**
@@ -230,15 +228,11 @@ abstract class BaseTypingDedupingTest {
* Starting with an empty destination, execute a full refresh overwrite sync. Verify that the
* records are written to the destination table. Then run a second sync, and verify that the
* records are overwritten.
- *
- * Parameterized on destination sync mode. After the refreshes project, APPEND and OVERWRITE
- * behave identically.
*/
- @ParameterizedTest
- @EnumSource(DestinationSyncMode::class, names = ["APPEND", "OVERWRITE"])
@Throws(Exception::class)
- fun truncateRefresh() {
- val catalog =
+ @Test
+ open fun truncateRefresh() {
+ val catalog1 =
io.airbyte.protocol.models.v0
.ConfiguredAirbyteCatalog()
.withStreams(
@@ -247,8 +241,8 @@ abstract class BaseTypingDedupingTest {
.withSyncId(42)
.withGenerationId(43)
.withMinimumGenerationId(43)
- .withDestinationSyncMode(DestinationSyncMode.APPEND)
.withSyncMode(SyncMode.FULL_REFRESH)
+ .withDestinationSyncMode(DestinationSyncMode.OVERWRITE)
.withStream(
AirbyteStream()
.withNamespace(streamNamespace)
@@ -261,36 +255,54 @@ abstract class BaseTypingDedupingTest {
// First sync
val messages1 = readMessages("dat/sync1_messages.jsonl")

- runSync(catalog, messages1)
+ runSync(catalog1, messages1)

val expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl")
val expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl")
verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison())

// Second sync
val messages2 = readMessages("dat/sync2_messages.jsonl")
+ val catalog2 =
+ io.airbyte.protocol.models.v0
+ .ConfiguredAirbyteCatalog()
+ .withStreams(
+ java.util.List.of(
+ ConfiguredAirbyteStream()
+ .withSyncId(42)
+ .withGenerationId(44)
+ .withMinimumGenerationId(44)
+ .withDestinationSyncMode(DestinationSyncMode.OVERWRITE)
+ .withSyncMode(SyncMode.FULL_REFRESH)
+ .withStream(
+ AirbyteStream()
+ .withNamespace(streamNamespace)
+ .withName(streamName)
+ .withJsonSchema(SCHEMA)
+ )
+ )
+ )

- runSync(catalog, messages2)
+ runSync(catalog2, messages2)

val expectedRawRecords2 =
- readRecords("dat/sync2_expectedrecords_fullrefresh_overwrite_raw.jsonl")
+ readRecords("dat/sync2_expectedrecords_fullrefresh_overwrite_with_new_gen_id_raw.jsonl")
val expectedFinalRecords2 =
- readRecords("dat/sync2_expectedrecords_fullrefresh_overwrite_final.jsonl")
+ readRecords(
+ "dat/sync2_expectedrecords_fullrefresh_overwrite_with_new_gen_id_final.jsonl"
+ )
verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison())
}
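// Note: the renamed *_with_new_gen_id_* fixtures reflect that expected records
// now carry the second sync's generation (44 here). Assuming the raw-table
// column naming used by Destinations V2 (an assumption, since the fixture files
// themselves are not shown in this diff), one expected raw line would look roughly like:
//   {"_airbyte_raw_id": "...", "_airbyte_extracted_at": "...", "_airbyte_generation_id": 44, "_airbyte_data": {...}}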

/**
* Starting with an empty destination, execute a full refresh append sync. Verify that the
* records are written to the destination table. Then run a second sync, and verify that the old
* and new records are all present.
- *
- * Similar to [truncateRefresh], this is parameterized on sync mode.
*/
- @ParameterizedTest
- @EnumSource(DestinationSyncMode::class, names = ["APPEND", "OVERWRITE"])
@Throws(Exception::class)
- fun mergeRefresh() {
- val catalog =
+ @Test
+ open fun mergeRefresh() {
+ val catalog1 =
io.airbyte.protocol.models.v0
.ConfiguredAirbyteCatalog()
.withStreams(
@@ -299,8 +311,8 @@ abstract class BaseTypingDedupingTest {
.withSyncId(42)
.withGenerationId(43)
.withMinimumGenerationId(0)
- .withSyncMode(SyncMode.FULL_REFRESH)
.withDestinationSyncMode(DestinationSyncMode.APPEND)
+ .withSyncMode(SyncMode.FULL_REFRESH)
.withStream(
AirbyteStream()
.withNamespace(streamNamespace)
@@ -313,20 +325,39 @@ abstract class BaseTypingDedupingTest {
// First sync
val messages1 = readMessages("dat/sync1_messages.jsonl")

- runSync(catalog, messages1)
+ runSync(catalog1, messages1)

val expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl")
val expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl")
verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison())

// Second sync
val messages2 = readMessages("dat/sync2_messages.jsonl")
+ val catalog2 =
+ io.airbyte.protocol.models.v0
+ .ConfiguredAirbyteCatalog()
+ .withStreams(
+ java.util.List.of(
+ ConfiguredAirbyteStream()
+ .withSyncId(42)
+ .withGenerationId(44)
+ .withMinimumGenerationId(0)
+ .withDestinationSyncMode(DestinationSyncMode.APPEND)
+ .withSyncMode(SyncMode.FULL_REFRESH)
+ .withStream(
+ AirbyteStream()
+ .withNamespace(streamNamespace)
+ .withName(streamName)
+ .withJsonSchema(SCHEMA)
+ )
+ )
+ )

- runSync(catalog, messages2)
+ runSync(catalog2, messages2)

val expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_raw.jsonl")
val expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_with_new_gen_id_raw.jsonl")
val expectedFinalRecords2 =
readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl")
readRecords("dat/sync2_expectedrecords_fullrefresh_append_with_new_gen_id_final.jsonl")
verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison())
}

@@ -1024,6 +1055,7 @@ abstract class BaseTypingDedupingTest {
disableFinalTableComparison: Boolean
) {
val actualRawRecords = dumpRawTableRecords(streamNamespace, streamName)

if (disableFinalTableComparison) {
DIFFER!!.diffRawTableRecords(expectedRawRecords, actualRawRecords)
} else {
@@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
- cdkVersionRequired = '0.37.1'
+ cdkVersionRequired = '0.40.9'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
}
@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
- dockerImageTag: 3.10.1
+ dockerImageTag: 3.11.0
dockerRepository: airbyte/destination-snowflake
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
githubIssueLabel: destination-snowflake
@@ -34,6 +34,7 @@ data:
memory_request: 2Gi
supportLevel: certified
supportsDbt: true
+ supportsRefreshes: true
tags:
- language:java
connectorTestSuitesOptions:
@@ -122,6 +122,12 @@ constructor(
hasUnprocessedRecords = true,
maxProcessedTimestamp = Optional.empty()
),
+ initialTempRawTableStatus =
+ InitialRawTableStatus(
+ rawTableExists = false,
+ hasUnprocessedRecords = true,
+ maxProcessedTimestamp = Optional.empty()
+ ),
isSchemaMismatch = true,
isFinalTableEmpty = true,
destinationState =
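The new initialTempRawTableStatus mirrors the raw-table status above but describes the temporary raw table that refreshes stage data into. A self-contained sketch of the shape this fixture constructs, assuming the data class matches the three fields visible in the hunk:

    import java.time.Instant
    import java.util.Optional

    // Assumed shape; field names are taken from the fixture above.
    data class InitialRawTableStatus(
        val rawTableExists: Boolean,
        val hasUnprocessedRecords: Boolean,
        val maxProcessedTimestamp: Optional<Instant>,
    )

    // A fresh destination reports the same empty status for both tables.
    val emptyTempStatus = InitialRawTableStatus(
        rawTableExists = false,
        hasUnprocessedRecords = true,
        maxProcessedTimestamp = Optional.empty(),
    )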
@@ -122,6 +122,14 @@ class SnowflakeAbMetaAndGenIdMigration(private val database: JdbcDatabase) :
state.destinationState.copy(isAirbyteMetaPresentInRaw = true),
true
)
+ } else if (!state.isFinalTablePresent) {
+ log.info {
+ "skipping migration of generation_id for table ${stream.id.finalNamespace}.${stream.id.finalName} because final table doesn't exist"
+ }
+ } else {
+ log.info {
+ "skipping migration of generation_id for table ${stream.id.finalNamespace}.${stream.id.finalName} because schemas match"
+ }
}

// Final table is untouched, so we don't need to fetch the initial status
@@ -169,11 +169,12 @@ class SnowflakeStagingClient(private val database: JdbcDatabase) {
stageName: String,
stagingPath: String,
stagedFiles: List<String>,
- streamId: StreamId
+ streamId: StreamId,
+ suffix: String = ""
) {
try {
val queryId = UUID.randomUUID()
- val query = getCopyQuery(stageName, stagingPath, stagedFiles, streamId)
+ val query = getCopyQuery(stageName, stagingPath, stagedFiles, streamId, suffix)
log.info { "query $queryId, $query" }
// queryJsons is intentionally used here to get the error message in case of failure
// instead of execute
@@ -252,12 +253,13 @@ class SnowflakeStagingClient(private val database: JdbcDatabase) {
stageName: String,
stagingPath: String,
stagedFiles: List<String>,
- streamId: StreamId
+ streamId: StreamId,
+ suffix: String
): String {
return String.format(
COPY_QUERY_1S1T + generateFilesList(stagedFiles) + ";",
streamId.rawNamespace,
- streamId.rawName,
+ streamId.rawName + suffix,
stageName,
stagingPath
)
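With the suffix threaded through both methods, a truncate refresh can COPY into a suffixed temporary raw table and presumably swap it in once the new generation commits, which is the "storage ops" piece named in the PR title. A hypothetical call, with the stage, path, file, and suffix values invented for illustration (the COPY_QUERY_1S1T template itself is outside this hunk):

    // All literal values below are made up for the example.
    val copySql = stagingClient.getCopyQuery(
        stageName = "airbyte_stage",
        stagingPath = "2024/07/01/",
        stagedFiles = listOf("records.csv.gz"),
        streamId = streamId,
        suffix = "_airbyte_tmp",
    )
    // Expected shape: COPY INTO "<rawNamespace>"."<rawName>_airbyte_tmp" FROM @stage/path ...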