Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into xiaohan/testmove
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaohansong committed Oct 28, 2024
2 parents 70a31e7 + 423d745 commit 78c303a
Show file tree
Hide file tree
Showing 798 changed files with 50,134 additions and 14,509 deletions.
25 changes: 21 additions & 4 deletions .github/workflows/connectors_up_to_date.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ on:
inputs:
connectors-options:
description: "Options to pass to the 'airbyte-ci connectors' command group."
default: "--concurrency=10 --language=python --language=low-code --language=manifest-only"
default: '--concurrency=10 --language=python --language=low-code --language=manifest-only --metadata-query="''-rc.'' not in data.dockerImageTag" --metadata-query="''source-declarative-manifest'' not in data.dockerRepository"'
auto-merge:
description: "Whether to auto-merge the PRs created by the action."
default: "false"
Expand All @@ -25,8 +25,9 @@ jobs:
steps:
- name: Checkout Airbyte
uses: actions/checkout@v4
- name: Run airbyte-ci connectors up-to-date
id: airbyte-ci-connectors-up-to-date
- name: Run airbyte-ci connectors up-to-date [WORKFLOW]
if: github.event_name == 'workflow_dispatch'
id: airbyte-ci-connectors-up-to-date-workflow-dispatch
uses: ./.github/actions/run-airbyte-ci
with:
context: "master"
Expand All @@ -39,4 +40,20 @@ jobs:
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
subcommand: "connectors ${{ github.event.inputs.connectors-options || '--concurrency=10 --language=python --language=low-code --support-level=community --support-level=certified' }} up-to-date --ignore-connector=source-declarative-manifest --create-prs ${{ github.event.inputs.auto-merge == 'false' && '' || '--auto-merge' }}"
subcommand: "connectors ${{ github.event.inputs.connectors-options }} up-to-date --create-prs ${{ github.event.inputs.auto-merge == 'false' && '' || '--auto-merge' }}"
- name: Run airbyte-ci connectors up-to-date [SCHEDULE]
if: github.event_name == 'schedule'
id: airbyte-ci-connectors-up-to-date-schedule
uses: ./.github/actions/run-airbyte-ci
with:
context: "master"
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_3 }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
gcs_credentials: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
github_token: ${{ secrets.GH_PAT_MAINTENANCE_OSS }}
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
subcommand: 'connectors --concurrency=10 --language=python --language=low-code --support-level=community --support-level=certified --metadata-query="\"source-declarative-manifest\" not in data.dockerRepository" --metadata-query="\"-rc.\" not in data.dockerImageTag" up-to-date --ignore-connector=source-declarative-manifest --create-prs --auto-merge'
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
package io.airbyte.cdk.discover

import io.airbyte.cdk.data.AirbyteSchemaType
import io.airbyte.cdk.data.IntCodec
import io.airbyte.cdk.data.JsonDecoder
import io.airbyte.cdk.data.JsonEncoder
import io.airbyte.cdk.data.JsonStringCodec
import io.airbyte.cdk.data.LeafAirbyteSchemaType
import io.airbyte.cdk.data.LongCodec
import io.airbyte.cdk.data.OffsetDateTimeCodec
import java.time.OffsetDateTime

Expand Down Expand Up @@ -63,7 +63,6 @@ interface MetaField : FieldOrMetaField {
enum class CommonMetaField(
override val type: FieldType,
) : MetaField {
CDC_LSN(CdcStringMetaFieldType),
CDC_UPDATED_AT(CdcOffsetDateTimeMetaFieldType),
CDC_DELETED_AT(CdcOffsetDateTimeMetaFieldType),
;
Expand All @@ -80,8 +79,8 @@ data object CdcStringMetaFieldType : LosslessFieldType {

data object CdcIntegerMetaFieldType : LosslessFieldType {
override val airbyteSchemaType: AirbyteSchemaType = LeafAirbyteSchemaType.INTEGER
override val jsonEncoder: JsonEncoder<Int> = IntCodec
override val jsonDecoder: JsonDecoder<Int> = IntCodec
override val jsonEncoder: JsonEncoder<Long> = LongCodec
override val jsonDecoder: JsonDecoder<Long> = LongCodec
}

data object CdcOffsetDateTimeMetaFieldType : LosslessFieldType {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.discover

import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.command.OpaqueStateValue
import io.airbyte.cdk.read.Stream
import io.airbyte.protocol.models.v0.AirbyteStream
import java.time.OffsetDateTime

/** [MetaField] schema definition and utilities, to be implemented by each source connector. */
interface MetaFieldDecorator {

    /** The [MetaField] serving as the global cursor, or null when there is none. */
    val globalCursor: MetaField?

    /**
     * Every [MetaField] present in records of [Global] streams.
     *
     * When [globalCursor] is not null, it must be a member of this set.
     *
     * The set is empty when not applicable.
     */
    val globalMetaFields: Set<MetaField>

    /**
     * Convenience function for [AirbyteStreamFactory].
     *
     * Adds one JSON schema property per entry of [globalMetaFields] to the `properties` object
     * of [airbyteStream]'s JSON schema. When [globalCursor] is set, it is also declared as the
     * stream's default, source-defined cursor.
     */
    fun decorateAirbyteStream(airbyteStream: AirbyteStream) {
        val schemaProperties = airbyteStream.jsonSchema["properties"] as ObjectNode
        globalMetaFields.forEach { metaField ->
            schemaProperties.set<ObjectNode>(
                metaField.id,
                metaField.type.airbyteSchemaType.asJsonSchema(),
            )
        }
        // Without a global cursor there is nothing further to declare on the stream.
        val cursorId: String = globalCursor?.id ?: return
        airbyteStream.defaultCursorField = listOf(cursorId)
        airbyteStream.sourceDefinedCursor = true
    }

    /**
     * Sets all [MetaField] values in [recordData] for global [Stream] feeds.
     *
     * Records of a given stream may be emitted by both a [Stream] feed and a [Global] feed, and
     * both must share the same schema. Records emitted by a [Stream] therefore need their
     * [MetaField]s set to suitable values, even though that [Feed] has no awareness of the
     * [Global] state.
     *
     * This method is called at most once per [Stream].
     *
     * @param timestamp same value as emitted_at.
     * @param globalStateValue current state of the [Global] feed, if applicable.
     * @param stream the [Stream] whose record data is being decorated.
     * @param recordData the record data object to modify in place.
     */
    fun decorateRecordData(
        timestamp: OffsetDateTime,
        globalStateValue: OpaqueStateValue?,
        stream: Stream,
        recordData: ObjectNode,
    )
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,4 @@ interface MetadataQuerier : AutoCloseable {
/** An implementation might open a connection to build a [MetadataQuerier] instance. */
fun session(config: T): MetadataQuerier
}

fun commonCursorOrNull(cursorColumnID: String): FieldOrMetaField?
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ data class Global(
*/
data class Stream(
val id: StreamIdentifier,
val fields: List<Field>,
val schema: Set<FieldOrMetaField>,
val configuredSyncMode: ConfiguredSyncMode,
val configuredPrimaryKey: List<Field>?,
val configuredCursor: FieldOrMetaField?,
Expand All @@ -43,6 +43,9 @@ data class Stream(

override val label: String
get() = id.toString()

val fields: List<Field>
get() = schema.filterIsInstance<Field>()
}

/** List of [Stream]s this [Feed] emits records for. */
Expand Down
Loading

0 comments on commit 78c303a

Please sign in to comment.