
Commit

Merge branch 'master' into btkcodedev/coinApiCompatable
btkcodedev authored May 19, 2024
2 parents 8f529c7 + d793c1d commit cc0012f
Showing 566 changed files with 21,384 additions and 11,461 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.59.1
current_version = 0.60.0
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-[a-z]+)?
25 changes: 23 additions & 2 deletions .github/workflows/metadata_service_deploy_orchestrator_dagger.yml
@@ -2,6 +2,10 @@ name: Connector Ops CI - Metadata Service Deploy Orchestrator

on:
workflow_dispatch:
inputs:
deployment_target:
description: "The deployment target for the metadata orchestrator (prod or dev)"
default: "dev"
push:
branches:
- master
@@ -14,8 +18,9 @@ jobs:
steps:
- name: Checkout Airbyte
uses: actions/checkout@v2
- name: Deploy the metadata orchestrator
id: metadata-orchestrator-deploy-orchestrator-pipeline
- name: Deploy the metadata orchestrator [On merge to master]
id: metadata-orchestrator-deploy-orchestrator-pipeline-prod
if: github.event_name == 'push'
uses: ./.github/actions/run-airbyte-ci
with:
subcommand: "metadata deploy orchestrator"
@@ -27,3 +32,19 @@ jobs:
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
env:
DAGSTER_CLOUD_METADATA_API_TOKEN: ${{ secrets.DAGSTER_CLOUD_METADATA_API_TOKEN }}
DAGSTER_CLOUD_DEPLOYMENT: "prod"
- name: Deploy the metadata orchestrator [On workflow]
id: metadata-orchestrator-deploy-orchestrator-pipeline-branch
if: github.event_name == 'workflow_dispatch'
uses: ./.github/actions/run-airbyte-ci
with:
subcommand: "metadata deploy orchestrator"
context: "manual"
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
github_token: ${{ secrets.GITHUB_TOKEN }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
env:
DAGSTER_CLOUD_METADATA_API_TOKEN: ${{ secrets.DAGSTER_CLOUD_METADATA_API_TOKEN }}
DAGSTER_CLOUD_DEPLOYMENT: ${{ inputs.deployment_target }}
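
Net effect of this workflow change: a push to master always deploys the metadata orchestrator to the prod Dagster Cloud deployment, while a manual workflow_dispatch run deploys to whichever `deployment_target` the caller selects (defaulting to dev).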
15 changes: 14 additions & 1 deletion .github/workflows/regression_tests.yml
@@ -25,6 +25,10 @@ on:
required: true
streams:
description: Streams to include in regression tests
use_local_cdk:
description: Use the local CDK when building the target connector
default: "false"
type: boolean

jobs:
regression_tests:
@@ -73,6 +77,15 @@ jobs:
echo "STREAM_PARAMS=--connector_regression_tests.selected-streams=$STREAMS" >> $GITHUB_ENV
fi
- name: Setup Local CDK Flag
if: github.event_name == 'workflow_dispatch'
run: |
if ${{ github.event.inputs.use_local_cdk }}; then
echo "USE_LOCAL_CDK_FLAG=--use-local-cdk" >> $GITHUB_ENV
else
echo "USE_LOCAL_CDK_FLAG=" >> $GITHUB_ENV
fi
- name: Run Regression Tests [WORKFLOW DISPATCH]
if: github.event_name == 'workflow_dispatch' # TODO: consider using the matrix strategy (https://docs.github.com/en/actions/using-jobs/using-a-matrix-for-your-jobs). See https://github.com/airbytehq/airbyte/pull/37659#discussion_r1583380234 for details.
uses: ./.github/actions/run-airbyte-ci
@@ -89,4 +102,4 @@ jobs:
github_token: ${{ secrets.GH_PAT_MAINTENANCE_OSS }}
s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
subcommand: connectors --name ${{ github.event.inputs.connector_name }} test --only-step connector_regression_tests --connector_regression_tests.connection-id=${{ github.event.inputs.connection_id }} --connector_regression_tests.pr-url=${{ github.event.inputs.pr_url }} ${{ env.STREAM_PARAMS }}
subcommand: connectors ${{ env.USE_LOCAL_CDK_FLAG }} --name ${{ github.event.inputs.connector_name }} test --only-step connector_regression_tests --connector_regression_tests.connection-id=${{ github.event.inputs.connection_id }} --connector_regression_tests.pr-url=${{ github.event.inputs.pr_url }} ${{ env.STREAM_PARAMS }}
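
A note on the "Setup Local CDK Flag" step above: `${{ github.event.inputs.use_local_cdk }}` is expanded by the workflow templater before the shell runs, so the `if` executes the literal shell builtin `true` or `false`, and `USE_LOCAL_CDK_FLAG` ends up as either `--use-local-cdk` or an empty string when spliced into the `connectors` subcommand.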
5 changes: 3 additions & 2 deletions airbyte-cdk/java/airbyte-cdk/README.md
@@ -173,11 +173,12 @@ corresponds to that version.
### Java CDK

| Version | Date | Pull Request | Subject |
|:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.35.6 | 2024-05-17 | [\#38107](https://github.com/airbytehq/airbyte/pull/38107) | New interfaces for Destination connectors to plug into AsyncStreamConsumer |
| 0.35.5 | 2024-05-17 | [\#38204](https://github.com/airbytehq/airbyte/pull/38204) | add assume-role authentication to s3 |
| 0.35.2 | 2024-05-13 | [\#38104](https://github.com/airbytehq/airbyte/pull/38104) | Handle transient error messages |
| 0.35.0 | 2024-05-13 | [\#38127](https://github.com/airbytehq/airbyte/pull/38127) | Destinations: Populate generation/sync ID on StreamConfig |
| 0.34.4 | 2024-05-10 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | make sure the exceptionHandler always terminates |
| 0.34.3 | 2024-05-10 | [\#38095](https://github.com/airbytehq/airbyte/pull/38095) | Minor changes for databricks connector |
| 0.34.1 | 2024-05-07 | [\#38030](https://github.com/airbytehq/airbyte/pull/38030) | Add support for transient errors |
| 0.34.0 | 2024-05-01 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | Destinations: Remove incremental T+D |
JavaBaseConstants.kt
@@ -57,8 +57,8 @@ object JavaBaseConstants {

const val DEFAULT_AIRBYTE_INTERNAL_NAMESPACE: String = "airbyte_internal"
enum class DestinationColumns(val rawColumns: List<String>) {
V2_WITH_META(JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES),
V2_WITHOUT_META(JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META),
LEGACY(JavaBaseConstants.LEGACY_RAW_TABLE_COLUMNS)
V2_WITH_META(V2_RAW_TABLE_COLUMN_NAMES),
V2_WITHOUT_META(V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META),
LEGACY(LEGACY_RAW_TABLE_COLUMNS)
}
}
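
Inside the companion object, the `JavaBaseConstants.` qualifier is redundant, so the enum entries can reference the constants directly; behavior is unchanged.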
StreamSyncSummary.kt
@@ -11,7 +11,7 @@ import java.util.*
* destinations framework; new implementations should always provide this information). If this
* value is empty, consumers should assume that the sync wrote nonzero records for this stream.
*/
class StreamSyncSummary(val recordsWritten: Optional<Long>) {
data class StreamSyncSummary(val recordsWritten: Optional<Long>) {

companion object {
@JvmField val DEFAULT: StreamSyncSummary = StreamSyncSummary(Optional.empty())
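
Converting `StreamSyncSummary` to a `data class` gives it structural `equals`/`hashCode`/`toString`. A minimal illustration of what this buys (hypothetical usage, not part of the diff):

```kotlin
import io.airbyte.cdk.integrations.destination.StreamSyncSummary
import java.util.Optional

fun main() {
    // With `data class`, two summaries carrying the same recordsWritten value
    // now compare equal; previously comparison fell back to identity equality.
    val a = StreamSyncSummary(Optional.of(42L))
    val b = StreamSyncSummary(Optional.of(42L))
    check(a == b)
}
```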
SyncOperation.kt
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.destination.operation

import io.airbyte.cdk.integrations.destination.StreamSyncSummary
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.protocol.models.v0.StreamDescriptor
import java.util.stream.Stream

/**
 * Destination connector sync operations. Any initialization required for the connector should be
 * done as part of instantiation/init blocks.
 */
interface SyncOperation {

/**
 * This function is a shim for
 * [io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction]. After control
 * returns from this method, the data should be assumed committed to durable storage, and any State
 * message acknowledgements may be sent back.
 */
fun flushStream(descriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>)

/**
 * Finalize streams, which could involve typing and deduping or any other required housekeeping
 * tasks.
 */
fun finalizeStreams(streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>)
}
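
To make the contract concrete, here is a minimal sketch of an implementation; the class and its logging behavior are illustrative assumptions, not part of this PR:

```kotlin
import io.airbyte.cdk.integrations.destination.StreamSyncSummary
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.cdk.integrations.destination.operation.SyncOperation
import io.airbyte.protocol.models.v0.StreamDescriptor
import java.util.stream.Stream

// Hypothetical implementation sketch: a real destination would persist data
// durably inside flushStream, since state acknowledgements are sent back
// once that call returns.
class LoggingSyncOperation : SyncOperation {
    override fun flushStream(descriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>) {
        // Drain the batch; pretend each record is written to durable storage here.
        stream.forEach { message -> println("persist ${descriptor.name}: $message") }
    }

    override fun finalizeStreams(streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>) {
        // Per-stream housekeeping, e.g. typing/deduping in a real connector.
        streamSyncSummaries.forEach { (descriptor, summary) ->
            println("finalize ${descriptor.name}: ${summary.recordsWritten.orElse(0L)} records")
        }
    }
}
```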
version.properties
@@ -1 +1 @@
version=0.35.2
version=0.35.6
DestinationAcceptanceTest.kt
@@ -367,10 +367,14 @@ abstract class DestinationAcceptanceTest {
workspaceRoot.toString(),
localRoot.toString(),
"host",
emptyMap()
getConnectorEnv()
)
}

open fun getConnectorEnv(): Map<String, String> {
return emptyMap()
}

@AfterEach
@Throws(Exception::class)
fun tearDownInternal() {
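
The new `getConnectorEnv()` hook lets subclasses inject environment variables into the connector container under test. A hypothetical override (the variable name is an assumption, not from this diff):

```kotlin
// Hypothetical acceptance-test subclass: overrides the new hook so the
// connector container receives an extra environment variable during tests.
class ExampleDestinationAcceptanceTest : DestinationAcceptanceTest() {
    override fun getConnectorEnv(): Map<String, String> =
        mapOf("EXAMPLE_FEATURE_FLAG" to "true") // assumed variable name

    // ...remaining abstract members of DestinationAcceptanceTest elided
}
```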
SourceStateIterator.kt
@@ -7,6 +7,7 @@ import com.google.common.collect.AbstractIterator
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteStateStats
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import io.airbyte.protocol.models.v0.SyncMode
import java.time.Duration
import java.time.Instant
import java.time.OffsetDateTime
@@ -41,9 +42,11 @@ open class SourceStateIterator<T>(
) {
val stateMessage =
sourceStateMessageProducer.generateStateMessageAtCheckpoint(stream)
stateMessage!!.withSourceStats(
AirbyteStateStats().withRecordCount(recordCount.toDouble())
)
if (shouldAttachCountWithState()) {
stateMessage!!.withSourceStats(
AirbyteStateStats().withRecordCount(recordCount.toDouble())
)
}

recordCount = 0L
lastCheckpoint = Instant.now()
@@ -64,9 +67,11 @@
hasEmittedFinalState = true
val finalStateMessageForStream =
sourceStateMessageProducer.createFinalStateMessage(stream)
finalStateMessageForStream!!.withSourceStats(
AirbyteStateStats().withRecordCount(recordCount.toDouble())
)
if (shouldAttachCountWithState()) {
finalStateMessageForStream!!.withSourceStats(
AirbyteStateStats().withRecordCount(recordCount.toDouble())
)
}
recordCount = 0L
return AirbyteMessage()
.withType(AirbyteMessage.Type.STATE)
@@ -76,6 +81,14 @@
}
}

/**
 * We are disabling counts for FULL_REFRESH streams because there are issues with them. We should
 * re-enable them once we complete the work for project Counts: Emit Counts in Full Refresh.
 */
private fun shouldAttachCountWithState(): Boolean {
return stream?.syncMode != SyncMode.FULL_REFRESH
}

// This method is used to check if we should emit a state message. If the record
// count is set to 0, we should not emit a state message.
CursorStateMessageProducerTest.kt
@@ -446,6 +446,7 @@ internal class CursorStateMessageProducerTest {
NAMESPACE,
Field.of(UUID_FIELD_NAME, JsonSchemaType.STRING)
)
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(List.of(UUID_FIELD_NAME))

private val EMPTY_STATE_MESSAGE = createEmptyStateMessage(0.0)
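
The explicit `SyncMode.INCREMENTAL` matters because of the `SourceStateIterator` change above: record counts are now attached to state messages only for non-full-refresh streams.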
CdcSourceTest.kt
@@ -665,10 +665,6 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
modelsSchema(),
)
} else {
assertExpectedStateMessageCountMatches(
stateMessages1,
MODEL_RECORDS.size.toLong() + MODEL_RECORDS_2.size.toLong(),
)
assertExpectedRecords(
Streams.concat(MODEL_RECORDS_2.stream(), MODEL_RECORDS.stream())
.collect(Collectors.toSet()),
@@ -690,7 +686,6 @@
val recordMessages2 = extractRecordMessages(actualRecords2)
val stateMessages2 = extractStateMessages(actualRecords2)

assertExpectedStateMessageCountMatches(stateMessages2, 7)
assertExpectedRecords(
Streams.concat(MODEL_RECORDS_2.stream(), Stream.of(puntoRecord))
.collect(Collectors.toSet()),
@@ -1134,7 +1129,6 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
val recordsFromFirstBatch = extractRecordMessages(dataFromFirstBatch)
val stateAfterFirstBatch = extractStateMessages(dataFromFirstBatch)
assertExpectedStateMessagesForFullRefresh(stateAfterFirstBatch)
assertExpectedStateMessageCountMatches(stateAfterFirstBatch, MODEL_RECORDS.size.toLong())

val stateMessageEmittedAfterFirstSyncCompletion =
stateAfterFirstBatch[stateAfterFirstBatch.size - 1]
@@ -1244,10 +1238,6 @@

Assertions.assertEquals(12, recordsFromFirstBatch.size)

assertExpectedStateMessageCountMatches(
stateAfterFirstBatch,
MODEL_RECORDS.size.toLong() + MODEL_RECORDS_2.size.toLong(),
)
stateAfterFirstBatch.map { state -> assertStateDoNotHaveDuplicateStreams(state) }
}

1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/s3-destinations/build.gradle
@@ -21,6 +21,7 @@ dependencies {

// Re-export dependencies for gcs-destinations.
api 'com.amazonaws:aws-java-sdk-s3:1.12.647'
api 'com.amazonaws:aws-java-sdk-sts:1.12.647'
api ('com.github.airbytehq:json-avro-converter:1.1.0') { exclude group: 'ch.qos.logback', module: 'logback-classic'}
api 'com.github.alexmojaki:s3-stream-upload:2.2.4'
api 'org.apache.avro:avro:1.11.3'
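
The new `aws-java-sdk-sts` dependency backs the assume-role authentication added in #38204 (per the changelog above). For orientation, a sketch of assuming a role with the v1 SDK's STS credentials provider; the role ARN and session name are placeholders, and the CDK's actual wiring may differ:

```kotlin
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder

// Builds a credentials provider that assumes the given role via STS and
// refreshes the temporary session credentials automatically.
fun buildAssumeRoleCredentials(): STSAssumeRoleSessionCredentialsProvider {
    val stsClient = AWSSecurityTokenServiceClientBuilder.defaultClient()
    return STSAssumeRoleSessionCredentialsProvider
        .Builder("arn:aws:iam::123456789012:role/example-role", "airbyte-s3-session")
        .withStsClient(stsClient)
        .build()
}
```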
BaseS3Destination.kt
@@ -22,13 +22,15 @@ import org.slf4j.LoggerFactory

abstract class BaseS3Destination
protected constructor(
protected val configFactory: S3DestinationConfigFactory = S3DestinationConfigFactory()
protected val configFactory: S3DestinationConfigFactory = S3DestinationConfigFactory(),
protected val environment: Map<String, String> = System.getenv()
) : BaseConnector(), Destination {
private val nameTransformer: NamingConventionTransformer = S3NameTransformer()

override fun check(config: JsonNode): AirbyteConnectionStatus? {
try {
val destinationConfig = configFactory.getS3DestinationConfig(config, storageProvider())
val destinationConfig =
configFactory.getS3DestinationConfig(config, storageProvider(), environment)
val s3Client = destinationConfig.getS3Client()

S3BaseChecks.testIAMUserHasListObjectPermission(s3Client, destinationConfig.bucketName)
@@ -60,7 +62,7 @@
catalog: ConfiguredAirbyteCatalog,
outputRecordCollector: Consumer<AirbyteMessage>
): AirbyteMessageConsumer? {
val s3Config = configFactory.getS3DestinationConfig(config, storageProvider())
val s3Config = configFactory.getS3DestinationConfig(config, storageProvider(), environment)
return S3ConsumerFactory()
.create(
outputRecordCollector,
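
Threading an `environment` map through `BaseS3Destination` (defaulting to `System.getenv()`) makes environment-dependent settings injectable in tests. A hypothetical concrete subclass for illustration; the `StorageProvider.AWS_S3` constant and the variable name are assumptions:

```kotlin
import io.airbyte.cdk.integrations.destination.s3.*

// Hypothetical test double: passes a fake environment instead of the process
// env so credential-related settings can be controlled from a test.
class TestS3Destination(environment: Map<String, String>) :
    BaseS3Destination(S3DestinationConfigFactory(), environment) {
    override fun storageProvider(): StorageProvider = StorageProvider.AWS_S3 // assumed constant
}

val destination = TestS3Destination(mapOf("EXAMPLE_ROLE_EXTERNAL_ID" to "test-id"))
```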