Commit 8465b76

Merge branch 'master' into alex/test_iterable

girarda committed May 17, 2024
2 parents c67f331 + 9e2b057
Showing 1,021 changed files with 52,820 additions and 23,775 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.59.0
current_version = 0.60.0
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-[a-z]+)?
1 change: 1 addition & 0 deletions .github/CODEOWNERS
@@ -4,6 +4,7 @@
/airbyte-integrations/connectors/destination-milvus @airbytehq/ai-language-models
/airbyte-integrations/connectors/destination-qdrant @airbytehq/ai-language-models
/airbyte-integrations/connectors/destination-chroma @airbytehq/ai-language-models
/airbyte-integrations/connectors/destination-snowflake-cortex @airbytehq/ai-language-models
/airbyte-cdk/python/airbyte_cdk/destinations/vector_db_based @airbytehq/ai-language-models

# CI/CD
32 changes: 32 additions & 0 deletions .github/workflows/auto_merge.yml
@@ -0,0 +1,32 @@
name: Auto merge connector PRs Cron

on:
  schedule:
    # 0AM UTC is 2AM CEST, 3AM EEST, 5PM PDT.
    - cron: "0 0 * * *"
  workflow_dispatch:
jobs:
  run_auto_merge:
    name: Run auto-merge
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.10"
      - name: Install and configure Poetry
        uses: snok/install-poetry@v1
      - name: Run auto merge
        shell: bash
        working-directory: airbyte-ci/connectors/auto_merge
        env:
          # We need a custom Github Token as some API endpoints
          # are not available from GHA auto generated tokens
          # like the one to list branch protection rules...
          GITHUB_TOKEN: ${{ secrets.AUTO_MERGE_GITHUB_TOKEN }}
          AUTO_MERGE_PRODUCTION: ${{ vars.ENABLE_CONNECTOR_AUTO_MERGE }}
        run: |
          poetry install
          poetry run auto-merge
25 changes: 23 additions & 2 deletions .github/workflows/metadata_service_deploy_orchestrator_dagger.yml
@@ -2,6 +2,10 @@ name: Connector Ops CI - Metadata Service Deploy Orchestrator

on:
  workflow_dispatch:
    inputs:
      deployment_target:
        description: "The deployment target for the metadata orchestrator (prod or dev)"
        default: "dev"
  push:
    branches:
      - master
Expand All @@ -14,8 +18,9 @@ jobs:
    steps:
      - name: Checkout Airbyte
        uses: actions/checkout@v2
      - name: Deploy the metadata orchestrator
        id: metadata-orchestrator-deploy-orchestrator-pipeline
      - name: Deploy the metadata orchestrator [On merge to master]
        id: metadata-orchestrator-deploy-orchestrator-pipeline-prod
        if: github.event_name == 'push'
        uses: ./.github/actions/run-airbyte-ci
        with:
          subcommand: "metadata deploy orchestrator"
Expand All @@ -27,3 +32,19 @@ jobs:
          gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
        env:
          DAGSTER_CLOUD_METADATA_API_TOKEN: ${{ secrets.DAGSTER_CLOUD_METADATA_API_TOKEN }}
          DAGSTER_CLOUD_DEPLOYMENT: "prod"
      - name: Deploy the metadata orchestrator [On workflow]
        id: metadata-orchestrator-deploy-orchestrator-pipeline-branch
        if: github.event_name == 'workflow_dispatch'
        uses: ./.github/actions/run-airbyte-ci
        with:
          subcommand: "metadata deploy orchestrator"
          context: "manual"
          dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
          github_token: ${{ secrets.GITHUB_TOKEN }}
          docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
          docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
          gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
        env:
          DAGSTER_CLOUD_METADATA_API_TOKEN: ${{ secrets.DAGSTER_CLOUD_METADATA_API_TOKEN }}
          DAGSTER_CLOUD_DEPLOYMENT: ${{ inputs.deployment_target }}
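Note: with this change, pushes to master still deploy the orchestrator to the prod Dagster Cloud deployment, while manual workflow_dispatch runs deploy to whichever target the new deployment_target input selects (defaulting to dev).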
4 changes: 3 additions & 1 deletion .github/workflows/publish-cdk-command-manually.yml
@@ -224,7 +224,9 @@ jobs:
        uses: ./.github/actions/run-airbyte-ci
        with:
          context: "master" # TODO: figure out why changing this yells with `The ci_gcs_credentials was not set on this PipelineContext.`
          dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
          # Disable the dagger_cloud_token to disable remote cache access.
          # See https://github.com/airbytehq/airbyte-internal-issues/issues/6439#issuecomment-2109503985 for context
          #dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
          docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
          docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
          gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
33 changes: 29 additions & 4 deletions .github/workflows/regression_tests.yml
@@ -15,14 +15,20 @@ on:
  workflow_dispatch:
    inputs:
      connector_name:
        description: "Connector name (e.g. source-faker)"
        description: Connector name (e.g. source-faker)
        required: true
      connection_id:
        description: "ID of the connection to test"
        description: ID of the connection to test; use "auto" to let the connection retriever choose a connection
        required: true
      pr_url:
        description: "URL of the PR containing the code change"
        description: URL of the PR containing the code change
        required: true
      streams:
        description: Streams to include in regression tests
      use_local_cdk:
        description: Use the local CDK when building the target connector
        default: "false"
        type: boolean

jobs:
  regression_tests:
@@ -61,6 +67,25 @@ jobs:
        id: fetch_last_commit_id_wd
        run: echo "commit_id=$(git rev-parse origin/${{ steps.extract_branch.outputs.branch }})" >> $GITHUB_OUTPUT

      - name: Setup Stream Parameters
        if: github.event_name == 'workflow_dispatch'
        run: |
          if [ -z "${{ github.event.inputs.streams }}" ]; then
            echo "STREAM_PARAMS=" >> $GITHUB_ENV
          else
            STREAMS=$(echo "${{ github.event.inputs.streams }}" | sed 's/,/ --connector_regression_tests.selected-streams=/g')
            echo "STREAM_PARAMS=--connector_regression_tests.selected-streams=$STREAMS" >> $GITHUB_ENV
          fi
      - name: Setup Local CDK Flag
        if: github.event_name == 'workflow_dispatch'
        run: |
          if ${{ github.event.inputs.use_local_cdk }}; then
            echo "USE_LOCAL_CDK_FLAG=--use-local-cdk" >> $GITHUB_ENV
          else
            echo "USE_LOCAL_CDK_FLAG=" >> $GITHUB_ENV
          fi
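For example, a streams input of users,posts expands to STREAM_PARAMS=--connector_regression_tests.selected-streams=users --connector_regression_tests.selected-streams=posts, which the updated subcommand below passes through to the regression test step.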
      - name: Run Regression Tests [WORKFLOW DISPATCH]
        if: github.event_name == 'workflow_dispatch' # TODO: consider using the matrix strategy (https://docs.github.com/en/actions/using-jobs/using-a-matrix-for-your-jobs). See https://github.com/airbytehq/airbyte/pull/37659#discussion_r1583380234 for details.
        uses: ./.github/actions/run-airbyte-ci
Expand All @@ -77,4 +102,4 @@ jobs:
          github_token: ${{ secrets.GH_PAT_MAINTENANCE_OSS }}
          s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
          s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
          subcommand: "connectors --name ${{ github.event.inputs.connector_name }} test --only-step connector_regression_tests --connector_regression_tests.connection-id=${{ github.event.inputs.connection_id }} --connector_regression_tests.pr-url=${{ github.event.inputs.pr_url }}"
          subcommand: connectors ${{ env.USE_LOCAL_CDK_FLAG }} --name ${{ github.event.inputs.connector_name }} test --only-step connector_regression_tests --connector_regression_tests.connection-id=${{ github.event.inputs.connection_id }} --connector_regression_tests.pr-url=${{ github.event.inputs.pr_url }} ${{ env.STREAM_PARAMS }}
4 changes: 1 addition & 3 deletions .gitignore
@@ -17,6 +17,7 @@ static_checker_reports/
acceptance_tests_logs/
airbyte_ci_logs/
live_tests_debug_reports/
dagger_engine_logs*

# Secrets
secrets
Expand All @@ -26,9 +27,6 @@ updated_configurations
# Connector debug configs
airbyte-integrations/connectors/**/src/test/resources/debug_resources

# Test logs
acceptance_tests_logs

# Python
*.egg-info
__pycache__
8 changes: 7 additions & 1 deletion airbyte-cdk/java/airbyte-cdk/README.md
@@ -173,7 +173,13 @@ corresponds to that version.
### Java CDK

| Version | Date | Pull Request | Subject |
|:--------|:-----------| :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.35.6 | 2024-05-17 | [\#38107](https://github.com/airbytehq/airbyte/pull/38107) | New interfaces for Destination connectors to plug into AsyncStreamConsumer |
| 0.35.5 | 2024-05-17 | [\#38204](https://github.com/airbytehq/airbyte/pull/38204) | add assume-role authentication to s3 |
| 0.35.2 | 2024-05-13 | [\#38104](https://github.com/airbytehq/airbyte/pull/38104) | Handle transient error messages |
| 0.35.0 | 2024-05-13 | [\#38127](https://github.com/airbytehq/airbyte/pull/38127) | Destinations: Populate generation/sync ID on StreamConfig |
| 0.34.4 | 2024-05-10 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | make sure the exceptionHandler always terminates |
| 0.34.3 | 2024-05-10 | [\#38095](https://github.com/airbytehq/airbyte/pull/38095) | Minor changes for databricks connector |
| 0.34.1 | 2024-05-07 | [\#38030](https://github.com/airbytehq/airbyte/pull/38030) | Add support for transient errors |
| 0.34.0 | 2024-05-01 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | Destinations: Remove incremental T+D |
| 0.33.2 | 2024-05-03 | [\#37824](https://github.com/airbytehq/airbyte/pull/37824) | improve source acceptance tests |
AirbyteExceptionHandler.kt
@@ -26,51 +26,58 @@ class AirbyteExceptionHandler : Thread.UncaughtExceptionHandler {
// the sync."
// from the spec:
// https://docs.google.com/document/d/1ctrj3Yh_GjtQ93aND-WH3ocqGxsmxyC3jfiarrF6NY0/edit#
LOGGER.error(logMessage, throwable)
// Attempt to deinterpolate the error message before emitting a trace message
val mangledMessage: String?
// If any exception in the chain is of a deinterpolatable type, find it and deinterpolate
// its
// message.
// This assumes that any wrapping exceptions are just noise (e.g. runtime exception).
val deinterpolatableException =
ExceptionUtils.getThrowableList(throwable)
.stream()
.filter { t: Throwable ->
THROWABLES_TO_DEINTERPOLATE.stream().anyMatch {
deinterpolatableClass: Class<out Throwable> ->
deinterpolatableClass.isAssignableFrom(t.javaClass)
try {
LOGGER.error(logMessage, throwable)
// Attempt to deinterpolate the error message before emitting a trace message
val mangledMessage: String?
// If any exception in the chain is of a deinterpolatable type, find it and
// deinterpolate
// its
// message.
// This assumes that any wrapping exceptions are just noise (e.g. runtime exception).
val deinterpolatableException =
ExceptionUtils.getThrowableList(throwable)
.stream()
.filter { t: Throwable ->
THROWABLES_TO_DEINTERPOLATE.stream().anyMatch {
deinterpolatableClass: Class<out Throwable> ->
deinterpolatableClass.isAssignableFrom(t.javaClass)
}
}
}
.findFirst()
val messageWasMangled: Boolean
if (deinterpolatableException.isPresent) {
val originalMessage = deinterpolatableException.get().message
mangledMessage =
STRINGS_TO_DEINTERPOLATE
.stream() // Sort the strings longest to shortest, in case any target string is
// a substring of another
// e.g. "airbyte_internal" should be swapped out before "airbyte"
.sorted(Comparator.comparing { obj: String -> obj.length }.reversed())
.reduce(originalMessage) { message: String?, targetString: String? ->
deinterpolate(message, targetString)
}
messageWasMangled = mangledMessage != originalMessage
} else {
mangledMessage = throwable.message
messageWasMangled = false
}
.findFirst()
val messageWasMangled: Boolean
if (deinterpolatableException.isPresent) {
val originalMessage = deinterpolatableException.get().message
mangledMessage =
STRINGS_TO_DEINTERPOLATE
.stream() // Sort the strings longest to shortest, in case any target string
// is
// a substring of another
// e.g. "airbyte_internal" should be swapped out before "airbyte"
.sorted(Comparator.comparing { obj: String -> obj.length }.reversed())
.reduce(originalMessage) { message: String?, targetString: String? ->
deinterpolate(message, targetString)
}
messageWasMangled = mangledMessage != originalMessage
} else {
mangledMessage = throwable.message
messageWasMangled = false
}

if (!messageWasMangled) {
// If we did not modify the message (either not a deinterpolatable class, or we tried to
// deinterpolate but made no changes) then emit our default trace message
AirbyteTraceMessageUtility.emitSystemErrorTrace(throwable, logMessage)
} else {
// If we did modify the message, then emit a custom trace message
AirbyteTraceMessageUtility.emitCustomErrorTrace(throwable.message, mangledMessage)
if (!messageWasMangled) {
// If we did not modify the message (either not a deinterpolatable class, or we
// tried to
// deinterpolate but made no changes) then emit our default trace message
AirbyteTraceMessageUtility.emitSystemErrorTrace(throwable, logMessage)
} else {
// If we did modify the message, then emit a custom trace message
AirbyteTraceMessageUtility.emitCustomErrorTrace(throwable.message, mangledMessage)
}
} catch (t: Throwable) {
LOGGER.error("exception in the exception handler", t)
} finally {
terminate()
}

terminate()
}
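The change above wraps the existing deinterpolation and trace-emission logic in a try/catch/finally so that terminate() always runs even if the handler itself throws, matching the 0.34.4 changelog entry above ("make sure the exceptionHandler always terminates").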

// by doing this in a separate method we can mock it to avoid closing the jvm and therefore test
JavaBaseConstants.kt
@@ -57,8 +57,8 @@ object JavaBaseConstants {

    const val DEFAULT_AIRBYTE_INTERNAL_NAMESPACE: String = "airbyte_internal"
    enum class DestinationColumns(val rawColumns: List<String>) {
        V2_WITH_META(JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES),
        V2_WITHOUT_META(JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META),
        LEGACY(JavaBaseConstants.LEGACY_RAW_TABLE_COLUMNS)
        V2_WITH_META(V2_RAW_TABLE_COLUMN_NAMES),
        V2_WITHOUT_META(V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META),
        LEGACY(LEGACY_RAW_TABLE_COLUMNS)
    }
}
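This is a pure cleanup: inside the JavaBaseConstants object the enclosing-object qualifier is redundant, so the enum constructor arguments can reference the constants directly. Behavior is unchanged.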
StreamSyncSummary.kt
@@ -11,7 +11,7 @@ import java.util.*
 * destinations framework; new implementations should always provide this information). If this
 * value is empty, consumers should assume that the sync wrote nonzero records for this stream.
 */
class StreamSyncSummary(val recordsWritten: Optional<Long>) {
data class StreamSyncSummary(val recordsWritten: Optional<Long>) {

    companion object {
        @JvmField val DEFAULT: StreamSyncSummary = StreamSyncSummary(Optional.empty())
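Promoting StreamSyncSummary to a data class gives it compiler-generated equals/hashCode/toString, so two summaries with the same recordsWritten now compare equal by value rather than by identity.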
SyncOperation.kt (new file)
@@ -0,0 +1,31 @@
/*
 * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.cdk.integrations.destination.operation

import io.airbyte.cdk.integrations.destination.StreamSyncSummary
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.protocol.models.v0.StreamDescriptor
import java.util.stream.Stream

/**
 * Destination Connector sync operations Any initialization required for the connector should be
 * done as part of instantiation/init blocks
 */
interface SyncOperation {

    /**
     * This function is a shim for
     * [io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction] After the
     * method control is returned, it should be assumed that the data is committed to a durable
     * storage and send back any State message acknowledgements.
     */
    fun flushStream(descriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>)

    /**
     * Finalize streams which could involve typing deduping or any other housekeeping tasks
     * required.
     */
    fun finalizeStreams(streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>)
}
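To make the new interface concrete, here is a minimal, hypothetical implementation sketch. It is not part of this commit; the LoggingSyncOperation name and its println-based behavior are illustrative only, and a real destination would persist records durably in flushStream:

// Hypothetical sketch, not part of this commit.
package io.airbyte.cdk.integrations.destination.operation

import io.airbyte.cdk.integrations.destination.StreamSyncSummary
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.protocol.models.v0.StreamDescriptor
import java.util.stream.Stream

class LoggingSyncOperation : SyncOperation {
    override fun flushStream(descriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>) {
        // Per the interface contract, records must be committed to durable storage
        // before this method returns, since returning signals that the corresponding
        // state messages may be acknowledged. This sketch only counts and logs them.
        val count = stream.count()
        println("Flushed $count records for stream ${descriptor.namespace}.${descriptor.name}")
    }

    override fun finalizeStreams(streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>) {
        // Typing/deduping or other per-stream housekeeping would go here.
        streamSyncSummaries.forEach { (descriptor, summary) ->
            println("Finalized ${descriptor.name}: recordsWritten=${summary.recordsWritten}")
        }
    }
}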
(Diffs for the remaining changed files are not shown.)