Merge branch 'master' into christo/sdm-unit-test-poc
ChristoGrab authored Dec 11, 2024
2 parents a9e8371 + 61c5777 commit 7980b3e
Showing 462 changed files with 10,297 additions and 12,928 deletions.
64 changes: 31 additions & 33 deletions .github/workflows/connectors_up_to_date.yml
@@ -13,45 +13,43 @@ on:
description: "Options to pass to the 'airbyte-ci connectors' command group."
default: "--language=python --language=low-code --language=manifest-only"
jobs:
scheduled_connectors_up_to_date:
name: Connectors up-to-date [SCHEDULED TRIGGER]
runs-on: connector-nightly-xlarge
if: github.event_name == 'schedule'
strategy:
matrix:
connector_language:
- python
# TODO: re-add low_code and manifest-only once new SDM image is deemed stable
# - low_code
# - manifest-only
support_level:
- certified
- community
permissions:
pull-requests: write
generate_matrix:
name: Generate matrix
runs-on: ubuntu-latest
outputs:
generated_matrix: ${{ steps.generate_matrix.outputs.generated_matrix }}
steps:
- name: Checkout Airbyte
uses: actions/checkout@v4
- name: Run airbyte-ci connectors up-to-date [WORKFLOW]
id: airbyte-ci-connectors-up-to-date-workflow-dispatch
- name: Run airbyte-ci connectors list [SCHEDULED TRIGGER]
if: github.event_name == 'schedule'
id: airbyte-ci-connectors-list-scheduled
uses: ./.github/actions/run-airbyte-ci
with:
context: "master"
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_3 }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
gcs_credentials: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
github_token: ${{ secrets.GH_PAT_MAINTENANCE_OSS }}
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
subcommand: 'connectors --concurrency=10 --language=${{ matrix.connector_language}} --support-level=${{ matrix.support_level}} --metadata-query="\"source-declarative-manifest\" not in data.dockerRepository" --metadata-query="\"-rc.\" not in data.dockerImageTag" up-to-date --create-prs --auto-merge'
subcommand: 'connectors --language=python --language=low-code --language=manifest-only --metadata-query="\"-rc.\" not in data.dockerImageTag and \"source-declarative-manifest\" not in data.dockerRepository" list --output=selected_connectors.json'
- name: Run airbyte-ci connectors list [MANUAL TRIGGER]
if: github.event_name == 'workflow_dispatch'
id: airbyte-ci-connectors-list-workflow-dispatch
uses: ./.github/actions/run-airbyte-ci
with:
context: "master"
subcommand: 'connectors ${{ github.event.inputs.connectors-options }} --metadata-query="\"-rc.\" not in data.dockerImageTag and \"source-declarative-manifest\" not in data.dockerRepository" list --output=selected_connectors.json'
# We generate a dynamic matrix from the list of selected connectors.
# A matrix is required in this situation because the number of connectors is large and running them all in a single job would exceed the maximum job time limit of 6 hours.
- name: Generate matrix - 100 connectors per job
id: generate_matrix
run: |
matrix=$(jq -c -r '{include: [.[] | "--name=" + .] | to_entries | group_by(.key / 100 | floor) | map(map(.value) | {"connector_names": join(" ")})}' selected_connectors.json)
echo "::set-output name=generated_matrix::$matrix"
workflow_dispatch_connectors_up_to_date:
name: Connectors up-to-date [MANUAL TRIGGER]
runs-on: connector-nightly-xlarge
if: github.event_name == 'workflow_dispatch'
run_connectors_up_to_date:
needs: generate_matrix
name: Connectors up-to-date
runs-on: connector-up-to-date-small
continue-on-error: true
strategy:
matrix: ${{fromJson(needs.generate_matrix.outputs.generated_matrix)}}
permissions:
pull-requests: write
steps:
@@ -71,4 +69,4 @@ jobs:
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
subcommand: 'connectors --concurrency=10 ${{ github.event.inputs.connectors-options }} --metadata-query="\"source-declarative-manifest\" not in data.dockerRepository" --metadata-query="\"-rc.\" not in data.dockerImageTag" up-to-date --create-prs --auto-merge'
subcommand: "connectors --concurrency=10 ${{ matrix.connector_names}} up-to-date --create-prs --auto-merge"
@@ -199,7 +199,14 @@ class FeedReader(
}
var checkpoint: PartitionReadCheckpoint
try {
withTimeout(root.timeout.toKotlinDuration()) { partitionReader.run() }
if (partitionReader is UnlimitedTimePartitionReader) {
partitionReader.run()
} else {
log.info {
"Running partition reader with ${root.timeout.toKotlinDuration()} timeout"
}
withTimeout(root.timeout.toKotlinDuration()) { partitionReader.run() }
}
log.info {
"completed reading partition $partitionReaderID " +
"for '${feed.label}' in round $partitionsCreatorID"
@@ -154,3 +154,6 @@ data class PartitionReadCheckpoint(
val opaqueStateValue: OpaqueStateValue,
val numRecords: Long,
)

/** A [PartitionReader] with no time limit for its execution. */
interface UnlimitedTimePartitionReader : PartitionReader
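
A self-contained Kotlin sketch of how this marker interface drives the timeout dispatch in FeedReader above; Reader, UnlimitedTimeReader, and the 30-second deadline are illustrative stand-ins, not the CDK's actual types or configuration:

import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import kotlin.time.Duration.Companion.seconds

// Stand-in for PartitionReader: anything with a suspending run().
interface Reader { suspend fun run() }

// Marker interface: implementors opt out of the execution deadline,
// mirroring UnlimitedTimePartitionReader.
interface UnlimitedTimeReader : Reader

suspend fun runWithPolicy(reader: Reader) {
    if (reader is UnlimitedTimeReader) {
        reader.run() // long-running reader: no deadline enforced
    } else {
        withTimeout(30.seconds) { reader.run() } // bounded reader: cancelled past the deadline
    }
}

fun main() = runBlocking {
    // Typed as UnlimitedTimeReader, so runWithPolicy enforces no deadline.
    runWithPolicy(object : UnlimitedTimeReader {
        override suspend fun run() = delay(100)
    })
}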
@@ -13,7 +13,6 @@ import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.withTimeoutOrNull

sealed interface SyncResult

@@ -29,7 +28,10 @@ interface SyncManager {
/** Get the manager for the given stream. Throws an exception if the stream is not found. */
fun getStreamManager(stream: DestinationStream.Descriptor): StreamManager

fun registerStartedStreamLoader(streamLoader: StreamLoader)
fun registerStartedStreamLoader(
streamDescriptor: DestinationStream.Descriptor,
streamLoaderResult: Result<StreamLoader>
)
suspend fun getOrAwaitStreamLoader(stream: DestinationStream.Descriptor): StreamLoader
suspend fun getStreamLoaderOrNull(stream: DestinationStream.Descriptor): StreamLoader?

@@ -56,32 +58,33 @@ class DefaultSyncManager(
) : SyncManager {
private val syncResult = CompletableDeferred<SyncResult>()
private val streamLoaders =
ConcurrentHashMap<DestinationStream.Descriptor, CompletableDeferred<StreamLoader>>()
ConcurrentHashMap<DestinationStream.Descriptor, CompletableDeferred<Result<StreamLoader>>>()
private val inputConsumed = CompletableDeferred<Boolean>()
private val checkpointsProcessed = CompletableDeferred<Boolean>()

override fun getStreamManager(stream: DestinationStream.Descriptor): StreamManager {
return streamManagers[stream] ?: throw IllegalArgumentException("Stream not found: $stream")
}

override fun registerStartedStreamLoader(streamLoader: StreamLoader) {
override fun registerStartedStreamLoader(
streamDescriptor: DestinationStream.Descriptor,
streamLoaderResult: Result<StreamLoader>
) {
streamLoaders
.getOrPut(streamLoader.stream.descriptor) { CompletableDeferred() }
.complete(streamLoader)
.getOrPut(streamDescriptor) { CompletableDeferred() }
.complete(streamLoaderResult)
}

override suspend fun getOrAwaitStreamLoader(
stream: DestinationStream.Descriptor
): StreamLoader {
return streamLoaders.getOrPut(stream) { CompletableDeferred() }.await()
return streamLoaders.getOrPut(stream) { CompletableDeferred() }.await().getOrThrow()
}

override suspend fun getStreamLoaderOrNull(
stream: DestinationStream.Descriptor
): StreamLoader? {
val completable = streamLoaders[stream]
// `.isCompleted` does not work as expected here.
return completable?.let { withTimeoutOrNull(1000L) { it.await() } }
return streamLoaders[stream]?.await()?.getOrNull()
}

override suspend fun awaitAllStreamsCompletedSuccessfully(): Boolean {
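
The Result-based registration decouples publishing a stream loader from its having started successfully: a startup failure is completed into the deferred and rethrown to whoever awaits it, which also removes the need for the old withTimeoutOrNull workaround. A self-contained Kotlin sketch of the shape, using hypothetical stand-in types (Descriptor, Loader, MiniSyncManager) rather than the CDK's:

import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.runBlocking

// Stand-ins for the CDK types; only the Result-passing shape matters here.
data class Descriptor(val namespace: String?, val name: String)

class Loader

class MiniSyncManager {
    private val loaders =
        ConcurrentHashMap<Descriptor, CompletableDeferred<Result<Loader>>>()

    // Registration is keyed by descriptor and accepts success or failure,
    // so it no longer requires a successfully started Loader instance.
    fun registerStartedLoader(descriptor: Descriptor, result: Result<Loader>) {
        loaders.getOrPut(descriptor) { CompletableDeferred() }.complete(result)
    }

    // Awaiting rethrows the original startup exception via getOrThrow().
    suspend fun getOrAwaitLoader(descriptor: Descriptor): Loader =
        loaders.getOrPut(descriptor) { CompletableDeferred() }.await().getOrThrow()
}

fun main() = runBlocking {
    val manager = MiniSyncManager()
    val descriptor = Descriptor(null, "users")
    manager.registerStartedLoader(
        descriptor,
        runCatching { throw IllegalStateException("loader failed to start") }
    )
    // The awaiting side observes the failure instead of hanging:
    println(runCatching { manager.getOrAwaitLoader(descriptor) })
}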


