Skip to content

Commit

Permalink
Merge branch 'master' into alex/destination_amazon_sqs
Browse files Browse the repository at this point in the history
  • Loading branch information
girarda authored May 15, 2024
2 parents d33f761 + 464a89c commit fac8909
Show file tree
Hide file tree
Showing 353 changed files with 4,372 additions and 474 deletions.
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.35.2
version=0.35.4
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.google.common.collect.AbstractIterator
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteStateStats
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import io.airbyte.protocol.models.v0.SyncMode
import java.time.Duration
import java.time.Instant
import java.time.OffsetDateTime
Expand Down Expand Up @@ -41,9 +42,11 @@ open class SourceStateIterator<T>(
) {
val stateMessage =
sourceStateMessageProducer.generateStateMessageAtCheckpoint(stream)
stateMessage!!.withSourceStats(
AirbyteStateStats().withRecordCount(recordCount.toDouble())
)
if (shouldAttachCountWithState()) {
stateMessage!!.withSourceStats(
AirbyteStateStats().withRecordCount(recordCount.toDouble())
)
}

recordCount = 0L
lastCheckpoint = Instant.now()
Expand All @@ -64,9 +67,11 @@ open class SourceStateIterator<T>(
hasEmittedFinalState = true
val finalStateMessageForStream =
sourceStateMessageProducer.createFinalStateMessage(stream)
finalStateMessageForStream!!.withSourceStats(
AirbyteStateStats().withRecordCount(recordCount.toDouble())
)
if (shouldAttachCountWithState()) {
finalStateMessageForStream!!.withSourceStats(
AirbyteStateStats().withRecordCount(recordCount.toDouble())
)
}
recordCount = 0L
return AirbyteMessage()
.withType(AirbyteMessage.Type.STATE)
Expand All @@ -76,6 +81,14 @@ open class SourceStateIterator<T>(
}
}

/**
* We are disabling counts for FULL_REFRESH streams cause there is are issues with it. We should
* re-enable it once we do the work for project Counts: Emit Counts in Full Refresh
*/
private fun shouldAttachCountWithState(): Boolean {
return stream?.syncMode != SyncMode.FULL_REFRESH
}

// This method is used to check if we should emit a state message. If the record count is set to
// 0,
// we should not emit a state message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ internal class CursorStateMessageProducerTest {
NAMESPACE,
Field.of(UUID_FIELD_NAME, JsonSchemaType.STRING)
)
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(List.of(UUID_FIELD_NAME))

private val EMPTY_STATE_MESSAGE = createEmptyStateMessage(0.0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -665,10 +665,6 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
modelsSchema(),
)
} else {
assertExpectedStateMessageCountMatches(
stateMessages1,
MODEL_RECORDS.size.toLong() + MODEL_RECORDS_2.size.toLong(),
)
assertExpectedRecords(
Streams.concat(MODEL_RECORDS_2.stream(), MODEL_RECORDS.stream())
.collect(Collectors.toSet()),
Expand All @@ -690,7 +686,6 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
val recordMessages2 = extractRecordMessages(actualRecords2)
val stateMessages2 = extractStateMessages(actualRecords2)

assertExpectedStateMessageCountMatches(stateMessages2, 7)
assertExpectedRecords(
Streams.concat(MODEL_RECORDS_2.stream(), Stream.of(puntoRecord))
.collect(Collectors.toSet()),
Expand Down Expand Up @@ -1134,7 +1129,6 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
val recordsFromFirstBatch = extractRecordMessages(dataFromFirstBatch)
val stateAfterFirstBatch = extractStateMessages(dataFromFirstBatch)
assertExpectedStateMessagesForFullRefresh(stateAfterFirstBatch)
assertExpectedStateMessageCountMatches(stateAfterFirstBatch, MODEL_RECORDS.size.toLong())

val stateMessageEmittedAfterFirstSyncCompletion =
stateAfterFirstBatch[stateAfterFirstBatch.size - 1]
Expand Down Expand Up @@ -1244,10 +1238,6 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {

Assertions.assertEquals(12, recordsFromFirstBatch.size)

assertExpectedStateMessageCountMatches(
stateAfterFirstBatch,
MODEL_RECORDS.size.toLong() + MODEL_RECORDS_2.size.toLong(),
)
stateAfterFirstBatch.map { state -> assertStateDoNotHaveDuplicateStreams(state) }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
# Copyright (c) {{currentYear}} Airbyte, Inc., all rights reserved.
#

import logging

from typing import Any, Iterable, Mapping

from airbyte_cdk import AirbyteLogger
from airbyte_cdk.destinations import Destination
from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, Status

Expand Down Expand Up @@ -33,7 +33,7 @@ class Destination{{properCase name}}(Destination):

pass

def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
"""
Tests if the input configuration can be used to successfully connect to the destination with the needed permissions
e.g: if a provided API token or password can be used to connect and write to the destination.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,13 @@ data:
tags:
- language:python
- cdk:python
metadataSpecVersion: '1.0'
connectorTestSuitesOptions:
- suite: unitTests
- suite: integrationTests
testSecrets:
- name: SECRET_DESTINATION-AMAZON-SQS
fileName: config.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
metadataSpecVersion: "1.0"
16 changes: 16 additions & 0 deletions airbyte-integrations/connectors/destination-astra/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,20 @@ data:
tags:
- language:python
- cdk:python
connectorTestSuitesOptions:
- suite: unitTests
- suite: integrationTests
testSecrets:
- name: SECRET_DESTINATION-ASTRA__CREDS
fileName: config.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
- suite: acceptanceTests
testSecrets:
- name: SECRET_DESTINATION-ASTRA__CREDS
fileName: config.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
metadataSpecVersion: "1.0"
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,13 @@ data:
sl: 100
ql: 100
supportLevel: community
connectorTestSuitesOptions:
- suite: unitTests
- suite: integrationTests
testSecrets:
- name: SECRET_DESTINATION-AWS-DATALAKE_CREDS
fileName: config.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
metadataSpecVersion: "1.0"
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,23 @@ data:
sl: 100
ql: 100
supportLevel: community
connectorTestSuitesOptions:
- suite: unitTests
- suite: integrationTests
testSecrets:
- name: SECRET_DESTINATION-AZURE-BLOB-STORAGE-GCS-STAGING__CREDS
fileName: config_gcs_staging.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
- name: SECRET_DESTINATION-AZURE-BLOB-STORAGE-S3-STAGING__CREDS
fileName: config_s3_staging.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
- name: SECRET_DESTINATION-AZURE-BLOB-STORAGE__CREDS
fileName: config.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
metadataSpecVersion: "1.0"
115 changes: 100 additions & 15 deletions airbyte-integrations/connectors/destination-bigquery/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,7 @@ data:
releases:
breakingChanges:
2.0.0:
message:
"**Do not upgrade until you have run a test upgrade as outlined [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#testing-destinations-v2-for-a-single-connection)**.
This version introduces [Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2),
which provides better error handling, incremental delivery of data for large
syncs, and improved final table structures. To review the breaking changes,
and how to upgrade, see [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#quick-start-to-upgrading).
These changes will likely require updates to downstream dbt / SQL models,
which we walk through [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#updating-downstream-transformations).
Selecting `Upgrade` will upgrade **all** connections using this destination
at their next sync. You can manually sync existing connections prior to
the next scheduled sync to start the upgrade early.
"
message: "**Do not upgrade until you have run a test upgrade as outlined [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#testing-destinations-v2-for-a-single-connection)**.\nThis version introduces [Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2), which provides better error handling, incremental delivery of data for large syncs, and improved final table structures. To review the breaking changes, and how to upgrade, see [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#quick-start-to-upgrading). These changes will likely require updates to downstream dbt / SQL models, which we walk through [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#updating-downstream-transformations).\nSelecting `Upgrade` will upgrade **all** connections using this destination at their next sync. You can manually sync existing connections prior to the next scheduled sync to start the upgrade early.\n"
upgradeDeadline: "2023-11-07"
resourceRequirements:
jobSpecific:
Expand All @@ -47,4 +33,103 @@ data:
supportsDbt: true
tags:
- language:java
connectorTestSuitesOptions:
- suite: unitTests
- suite: integrationTests
testSecrets:
- name: SECRET_DESTINATION-BIGQUERY_CREDENTIALS_1S1T_DISABLETD_GCS_RAW_OVERRIDE
fileName: credentials-1s1t-disabletd-gcs-raw-override.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
- name: SECRET_DESTINATION-BIGQUERY_CREDENTIALS_1S1T_DISABLETD_STANDARD_OVERRIDE
fileName: credentials-1s1t-disabletd-standard-raw-override.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
- name: SECRET_DESTINATION-BIGQUERY_CREDENTIALS_1S1T_GCS
fileName: credentials-1s1t-gcs.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
- name: SECRET_DESTINATION-BIGQUERY_CREDENTIALS_1S1T_GCS_RAW_OVERRIDE
fileName: credentials-1s1t-gcs-raw-override.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
- name: SECRET_DESTINATION-BIGQUERY_CREDENTIALS_1S1T_STANDARD
fileName: credentials-1s1t-standard.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
- name: SECRET_DESTINATION-BIGQUERY_CREDENTIALS_1S1T_STANDARD_RAW_OVERRIDE
fileName: credentials-1s1t-standard-raw-override.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
- name: SECRET_DESTINATION-BIGQUERY_CREDENTIALS_BAD_PROJECT_CREDS
fileName: credentials-badproject.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
- name: SECRET_DESTINATION-BIGQUERY_CREDENTIALS_GCS_STAGING
fileName: credentials-gcs-staging.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
- name: SECRET_DESTINATION-BIGQUERY_CREDENTIALS_IMPERSONATE_FAIL_CREDS
fileName: credentials-impersonate-fail.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
- name: SECRET_DESTINATION-BIGQUERY_CREDENTIALS_IMPERSONATE__CREDS
fileName: credentials-impersonate.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
- name: SECRET_DESTINATION-BIGQUERY_CREDENTIALS_NO_GCS_CREATE_ROLE
fileName: copy_gcs_no_create_roles_config.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
- name: SECRET_DESTINATION-BIGQUERY_CREDENTIALS_NO_GCS_WRITE_ROLE
fileName: copy_gcs_no_write_roles_config.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
- name: SECRET_DESTINATION-BIGQUERY_CREDENTIALS_NO_PUBLIC_SCHEMA_EDIT_ROLE
fileName: credentials-no-edit-public-schema-role.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
- name: SECRET_DESTINATION-BIGQUERY_CREDENTIALS_STANDARD
fileName: credentials-standard.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
- name: SECRET_DESTINATION-BIGQUERY_CREDENTIALS_WITH_MISSED_DATASET_CREATION_ROLE__CREDS
fileName: credentials-with-missed-dataset-creation-role.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
- name: SECRET_DESTINATION-BIGQUERY_CREDENTIALS__CREDS
fileName: credentials.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
- name: SECRET_DESTINATION-BIGQUERY_CREDENTIALS__CREDS_OAUTH
fileName: credentials_oauth.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
- name: SECRET_DESTINATION-BIGQUERY_STANDARD-NO-DATASET-CREATION__CREDS
fileName: credentials-standard-no-dataset-creation.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
- name: SECRET_DESTINATION-BIGQUERY_STANDARD_NON_BILLABLE_PROJECT__CREDS
fileName: credentials-standard-non-billable-project.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
metadataSpecVersion: "1.0"
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@ data:
tags:
- language:python
- cdk:python
connectorTestSuitesOptions:
- suite: unitTests
- suite: integrationTests
metadataSpecVersion: "1.0"
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,7 @@ data:
supportsDbt: false
tags:
- language:java
connectorTestSuitesOptions:
- suite: unitTests
- suite: integrationTests
metadataSpecVersion: "1.0"
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,7 @@ data:
sl: 100
ql: 200
supportLevel: community
connectorTestSuitesOptions:
- suite: unitTests
- suite: integrationTests
metadataSpecVersion: "1.0"
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@ data:
sl: 100
ql: 100
supportLevel: community
connectorTestSuitesOptions:
- suite: unitTests
- suite: integrationTests
metadataSpecVersion: "1.0"
3 changes: 3 additions & 0 deletions airbyte-integrations/connectors/destination-csv/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,7 @@ data:
sl: 100
ql: 100
supportLevel: community
connectorTestSuitesOptions:
- suite: unitTests
- suite: integrationTests
metadataSpecVersion: "1.0"
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,13 @@ data:
sl: 100
ql: 100
supportLevel: community
connectorTestSuitesOptions:
- suite: unitTests
- suite: integrationTests
testSecrets:
- name: SECRET_DESTINATION-CUMULIO_CREDENTIALS__CREDS
fileName: config.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
metadataSpecVersion: "1.0"
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,13 @@ data:
sl: 100
ql: 100
supportLevel: community
connectorTestSuitesOptions:
- suite: unitTests
- suite: integrationTests
testSecrets:
- name: SECRET_DESTINATION_DATABEND_CLOUD_CREDS
fileName: config.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
metadataSpecVersion: "1.0"
Loading

0 comments on commit fac8909

Please sign in to comment.