Commit b0d4537

Merge branch 'master' into btkcodedev/intercomMigrateManifestOnly
btkcodedev authored Dec 12, 2024
2 parents 8bebea4 + e6a56d3 commit b0d4537
Showing 534 changed files with 13,223 additions and 13,240 deletions.
64 changes: 31 additions & 33 deletions .github/workflows/connectors_up_to_date.yml
@@ -13,45 +13,43 @@ on:
description: "Options to pass to the 'airbyte-ci connectors' command group."
default: "--language=python --language=low-code --language=manifest-only"
jobs:
scheduled_connectors_up_to_date:
name: Connectors up-to-date [SCHEDULED TRIGGER]
runs-on: connector-nightly-xlarge
if: github.event_name == 'schedule'
strategy:
matrix:
connector_language:
- python
# TODO: re-add low_code and manifest-only once new SDM image is deemed stable
# - low_code
# - manifest-only
support_level:
- certified
- community
permissions:
pull-requests: write
generate_matrix:
name: Generate matrix
runs-on: ubuntu-latest
outputs:
generated_matrix: ${{ steps.generate_matrix.outputs.generated_matrix }}
steps:
- name: Checkout Airbyte
uses: actions/checkout@v4
- name: Run airbyte-ci connectors up-to-date [WORKFLOW]
id: airbyte-ci-connectors-up-to-date-workflow-dispatch
- name: Run airbyte-ci connectors list [SCHEDULED TRIGGER]
if: github.event_name == 'schedule'
id: airbyte-ci-connectors-list-scheduled
uses: ./.github/actions/run-airbyte-ci
with:
context: "master"
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_3 }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
gcs_credentials: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
github_token: ${{ secrets.GH_PAT_MAINTENANCE_OSS }}
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
subcommand: 'connectors --concurrency=10 --language=${{ matrix.connector_language}} --support-level=${{ matrix.support_level}} --metadata-query="\"source-declarative-manifest\" not in data.dockerRepository" --metadata-query="\"-rc.\" not in data.dockerImageTag" up-to-date --create-prs --auto-merge'
subcommand: 'connectors --language=python --language=low-code --language=manifest-only --metadata-query="\"-rc.\" not in data.dockerImageTag and \"source-declarative-manifest\" not in data.dockerRepository" list --output=selected_connectors.json'
- name: Run airbyte-ci connectors list [MANUAL TRIGGER]
if: github.event_name == 'workflow_dispatch'
id: airbyte-ci-connectors-list-workflow-dispatch
uses: ./.github/actions/run-airbyte-ci
with:
context: "master"
subcommand: 'connectors ${{ github.event.inputs.connectors-options }} --metadata-query="\"-rc.\" not in data.dockerImageTag and \"source-declarative-manifest\" not in data.dockerRepository" list --output=selected_connectors.json'
# We generate a dynamic matrix from the list of selected connectors.
# A matrix is required in this situation because the number of connectors is large and running them all in a single job would exceed the maximum job time limit of 6 hours.
- name: Generate matrix - 100 connectors per job
id: generate_matrix
run: |
matrix=$(jq -c -r '{include: [.[] | "--name=" + .] | to_entries | group_by(.key / 100 | floor) | map(map(.value) | {"connector_names": join(" ")})}' selected_connectors.json)
echo "::set-output name=generated_matrix::$matrix"
workflow_dispatch_connectors_up_to_date:
name: Connectors up-to-date [MANUAL TRIGGER]
runs-on: connector-nightly-xlarge
if: github.event_name == 'workflow_dispatch'
run_connectors_up_to_date:
needs: generate_matrix
name: Connectors up-to-date
runs-on: connector-up-to-date-small
continue-on-error: true
strategy:
matrix: ${{fromJson(needs.generate_matrix.outputs.generated_matrix)}}
permissions:
pull-requests: write
steps:
@@ -71,4 +69,4 @@ jobs:
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
subcommand: 'connectors --concurrency=10 ${{ github.event.inputs.connectors-options }} --metadata-query="\"source-declarative-manifest\" not in data.dockerRepository" --metadata-query="\"-rc.\" not in data.dockerImageTag" up-to-date --create-prs --auto-merge'
subcommand: "connectors --concurrency=10 ${{ matrix.connector_names}} up-to-date --create-prs --auto-merge"
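
For reference, the jq command in the "Generate matrix" step prefixes each selected connector with --name=, groups the names into batches of 100, and emits one matrix entry per batch so that no single job exceeds the runner time limit. A rough Kotlin sketch of the same grouping (illustrative only; the function and the sample connector names are hypothetical, not part of this commit):

// Illustrative sketch: group connector names into batches of 100 and emit one
// matrix entry per batch, as the jq expression in the workflow does.
fun generateMatrix(connectors: List<String>): Map<String, Any> {
    val include = connectors
        .map { "--name=$it" }      // each connector becomes a --name= argument
        .chunked(100)              // at most 100 connectors per job
        .map { mapOf("connector_names" to it.joinToString(" ")) }
    return mapOf("include" to include)
}

fun main() {
    // Hypothetical connector names, for illustration only.
    println(generateMatrix(listOf("source-faker", "source-pokeapi")))
    // {include=[{connector_names=--name=source-faker --name=source-pokeapi}]}
}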
@@ -199,7 +199,14 @@ class FeedReader(
}
var checkpoint: PartitionReadCheckpoint
try {
withTimeout(root.timeout.toKotlinDuration()) { partitionReader.run() }
if (partitionReader is UnlimitedTimePartitionReader) {
partitionReader.run()
} else {
log.info {
"Running partition reader with ${root.timeout.toKotlinDuration()} timeout"
}
withTimeout(root.timeout.toKotlinDuration()) { partitionReader.run() }
}
log.info {
"completed reading partition $partitionReaderID " +
"for '${feed.label}' in round $partitionsCreatorID"
@@ -154,3 +154,6 @@ data class PartitionReadCheckpoint(
val opaqueStateValue: OpaqueStateValue,
val numRecords: Long,
)

/** A [PartitionReader] with no time limit for its execution. */
interface UnlimitedTimePartitionReader : PartitionReader
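
The FeedReader change above, together with this new marker interface, skips the coroutine timeout for readers that declare themselves unlimited. A minimal sketch of that dispatch, assuming simplified stand-ins for the CDK types and an arbitrary timeout value:

import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import kotlin.time.Duration.Companion.minutes

// Simplified stand-ins for the CDK interfaces, for illustration only.
interface PartitionReader { suspend fun run() }
interface UnlimitedTimePartitionReader : PartitionReader

suspend fun runPartitionReader(reader: PartitionReader) {
    if (reader is UnlimitedTimePartitionReader) {
        reader.run()                               // no time limit for unlimited readers
    } else {
        withTimeout(15.minutes) { reader.run() }   // bounded readers keep the timeout (value assumed)
    }
}

fun main() = runBlocking {
    runPartitionReader(object : UnlimitedTimePartitionReader {
        override suspend fun run() = println("ran without a timeout")
    })
}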
@@ -28,13 +28,17 @@ class MockDestinationWriter : DestinationWriter {
}

class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
data class LocalBatch(val records: List<DestinationRecord>) : Batch {
abstract class MockBatch : Batch {
override val groupId: String? = null
}

data class LocalBatch(val records: List<DestinationRecord>) : MockBatch() {
override val state = Batch.State.LOCAL
}
data class LocalFileBatch(val file: DestinationFile) : Batch {
data class LocalFileBatch(val file: DestinationFile) : MockBatch() {
override val state = Batch.State.LOCAL
}
data class PersistedBatch(val records: List<DestinationRecord>) : Batch {
data class PersistedBatch(val records: List<DestinationRecord>) : MockBatch() {
override val state = Batch.State.PERSISTED
}

@@ -20,12 +20,17 @@ import com.google.common.collect.TreeRangeSet
* the associated ranges have been persisted remotely, and that platform checkpoint messages can be
* emitted.
*
* [State.SPILLED] is used internally to indicate that records have been spooled to disk for
* [State.LOCAL] is used internally to indicate that records have been spooled to disk for
* processing and should not be used by implementors.
*
* When a stream has been read to End-of-stream, and all ranges between 0 and End-of-stream are
* [State.COMPLETE], then all records are considered to have been processed.
*
* A [Batch] may contain an optional `groupId`. If provided, the most advanced state provided for
* any batch will apply to all batches with the same `groupId`. This is useful for a case where each
* batch represents part of a larger work unit that is only completed when all parts are processed.
* (We used most advanced instead of latest to avoid race conditions.)
*
* The intended usage for implementors is to implement the provided interfaces in case classes that
* contain the necessary metadata for processing, using them in @
* [io.airbyte.cdk.write.StreamLoader.processBatch] to route to the appropriate handler(s).
@@ -45,6 +50,8 @@ import com.google.common.collect.TreeRangeSet
* ```
*/
interface Batch {
val groupId: String?

enum class State {
LOCAL,
PERSISTED,
@@ -62,7 +69,10 @@ interface Batch {
}

/** Simple batch: use if you need no other metadata for processing. */
data class SimpleBatch(override val state: Batch.State) : Batch
data class SimpleBatch(
override val state: Batch.State,
override val groupId: String? = null,
) : Batch

/**
* Internally-used wrapper for tracking the association between a batch and the range of records it
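
To illustrate the groupId semantics documented above — the most advanced state seen for any batch in a group applies to every batch in that group — here is a small, self-contained sketch; the enum values and helper are stand-ins, not the CDK's actual types:

// Stand-in types, for illustration only.
enum class BatchState { LOCAL, PERSISTED, COMPLETE }
data class BatchStub(val state: BatchState, val groupId: String?)

// The effective state of a grouped batch is the most advanced state in its group.
fun effectiveState(batch: BatchStub, allBatches: List<BatchStub>): BatchState =
    if (batch.groupId == null) batch.state
    else allBatches.filter { it.groupId == batch.groupId }.maxOf { it.state }

fun main() {
    val batches = listOf(
        BatchStub(BatchState.LOCAL, groupId = "file-1"),
        BatchStub(BatchState.PERSISTED, groupId = "file-1"),
    )
    // Both parts of "file-1" are treated as PERSISTED once either part reaches it.
    println(effectiveState(batches[0], batches)) // PERSISTED
}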
@@ -449,7 +449,7 @@ class DestinationMessageFactory(
if (fileTransferEnabled) {
DestinationFileStreamComplete(
stream.descriptor,
message.trace.emittedAt.toLong()
message.trace.emittedAt?.toLong() ?: 0L
)
} else {
DestinationRecordStreamComplete(
Expand All @@ -461,12 +461,12 @@ class DestinationMessageFactory(
if (fileTransferEnabled) {
DestinationFileStreamIncomplete(
stream.descriptor,
message.trace.emittedAt.toLong()
message.trace.emittedAt?.toLong() ?: 0L
)
} else {
DestinationRecordStreamIncomplete(
stream.descriptor,
message.trace.emittedAt.toLong()
message.trace.emittedAt?.toLong() ?: 0L
)
}
else -> Undefined
@@ -96,6 +96,9 @@ class DefaultStreamManager(
) : StreamManager {
private val streamResult = CompletableDeferred<StreamResult>()

data class CachedRanges(val state: Batch.State, val ranges: RangeSet<Long>)
private val cachedRangesById = ConcurrentHashMap<String, CachedRanges>()

private val log = KotlinLogging.logger {}

private val recordCount = AtomicLong(0)
@@ -144,15 +147,30 @@
rangesState[batch.batch.state]
?: throw IllegalArgumentException("Invalid batch state: ${batch.batch.state}")

// If the batch is part of a group, update all ranges associated with its groupId
// to the most advanced state. Otherwise, just use the ranges provided.
val cachedRangesMaybe = batch.batch.groupId?.let { cachedRangesById[batch.batch.groupId] }

log.info {
"Updating state for stream ${stream.descriptor} with batch $batch using cached ranges $cachedRangesMaybe"
}

val stateToSet =
cachedRangesMaybe?.state?.let { maxOf(it, batch.batch.state) } ?: batch.batch.state
val rangesToUpdate = TreeRangeSet.create(batch.ranges)
cachedRangesMaybe?.ranges?.also { rangesToUpdate.addAll(it) }

log.info { "Marking ranges for stream ${stream.descriptor} $rangesToUpdate as $stateToSet" }

// Force the ranges to overlap at their endpoints, in order to work around
// the behavior of `.encloses`, which otherwise would not consider adjacent ranges as
// contiguous.
// This ensures that a state message received at eg, index 10 (after messages 0..9 have
// been received), will pass `{'[0..5]','[6..9]'}.encloses('[0..10)')`.
val expanded =
batch.ranges.asRanges().map { it.span(Range.singleton(it.upperEndpoint() + 1)) }
rangesToUpdate.asRanges().map { it.span(Range.singleton(it.upperEndpoint() + 1)) }

when (batch.batch.state) {
when (stateToSet) {
Batch.State.PERSISTED -> {
rangesState[Batch.State.PERSISTED]?.addAll(expanded)
}
Expand All @@ -167,6 +185,10 @@ class DefaultStreamManager(
log.info {
"Updated ranges for ${stream.descriptor}[${batch.batch.state}]: $expanded. PERSISTED is also updated on COMPLETE."
}

batch.batch.groupId?.also {
cachedRangesById[it] = CachedRanges(stateToSet, rangesToUpdate)
}
}

/** True if all records in `[0, index)` have reached the given state. */
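
The endpoint-expansion comment above can be checked directly with Guava's range types: adjacent closed ranges such as [0..5] and [6..9] are not coalesced, so the set does not enclose [0..10) until each range is widened by one. A small standalone sketch, not the manager's actual code:

import com.google.common.collect.Range
import com.google.common.collect.TreeRangeSet

fun main() {
    val persisted = TreeRangeSet.create<Long>()
    persisted.add(Range.closed(0L, 5L))
    persisted.add(Range.closed(6L, 9L))
    // Adjacent closed ranges stay separate, so no single member encloses [0..10).
    println(persisted.encloses(Range.closedOpen(0L, 10L))) // false

    val expanded = TreeRangeSet.create<Long>()
    persisted.asRanges().forEach { r ->
        // Widen each range to include upperEndpoint + 1, mirroring the expansion above.
        expanded.add(r.span(Range.singleton(r.upperEndpoint() + 1)))
    }
    // [0..6] and [6..10] coalesce into [0..10], which encloses [0..10).
    println(expanded.encloses(Range.closedOpen(0L, 10L))) // true
}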
@@ -13,7 +13,6 @@ import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.withTimeoutOrNull

sealed interface SyncResult

@@ -29,7 +28,10 @@ interface SyncManager {
/** Get the manager for the given stream. Throws an exception if the stream is not found. */
fun getStreamManager(stream: DestinationStream.Descriptor): StreamManager

fun registerStartedStreamLoader(streamLoader: StreamLoader)
fun registerStartedStreamLoader(
streamDescriptor: DestinationStream.Descriptor,
streamLoaderResult: Result<StreamLoader>
)
suspend fun getOrAwaitStreamLoader(stream: DestinationStream.Descriptor): StreamLoader
suspend fun getStreamLoaderOrNull(stream: DestinationStream.Descriptor): StreamLoader?

@@ -56,32 +58,33 @@ class DefaultSyncManager(
) : SyncManager {
private val syncResult = CompletableDeferred<SyncResult>()
private val streamLoaders =
ConcurrentHashMap<DestinationStream.Descriptor, CompletableDeferred<StreamLoader>>()
ConcurrentHashMap<DestinationStream.Descriptor, CompletableDeferred<Result<StreamLoader>>>()
private val inputConsumed = CompletableDeferred<Boolean>()
private val checkpointsProcessed = CompletableDeferred<Boolean>()

override fun getStreamManager(stream: DestinationStream.Descriptor): StreamManager {
return streamManagers[stream] ?: throw IllegalArgumentException("Stream not found: $stream")
}

override fun registerStartedStreamLoader(streamLoader: StreamLoader) {
override fun registerStartedStreamLoader(
streamDescriptor: DestinationStream.Descriptor,
streamLoaderResult: Result<StreamLoader>
) {
streamLoaders
.getOrPut(streamLoader.stream.descriptor) { CompletableDeferred() }
.complete(streamLoader)
.getOrPut(streamDescriptor) { CompletableDeferred() }
.complete(streamLoaderResult)
}

override suspend fun getOrAwaitStreamLoader(
stream: DestinationStream.Descriptor
): StreamLoader {
return streamLoaders.getOrPut(stream) { CompletableDeferred() }.await()
return streamLoaders.getOrPut(stream) { CompletableDeferred() }.await().getOrThrow()
}

override suspend fun getStreamLoaderOrNull(
stream: DestinationStream.Descriptor
): StreamLoader? {
val completable = streamLoaders[stream]
// `.isCompleted` does not work as expected here.
return completable?.let { withTimeoutOrNull(1000L) { it.await() } }
return streamLoaders[stream]?.await()?.getOrNull()
}

override suspend fun awaitAllStreamsCompletedSuccessfully(): Boolean {
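
The SyncManager change above swaps the deferred StreamLoader for a deferred Result<StreamLoader>, so a failed start is recorded once and rethrown for callers of getOrAwaitStreamLoader, while getStreamLoaderOrNull simply returns null. A generic sketch of that handshake (the registry class and its names are illustrative, not the CDK's):

import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.runBlocking

// Illustrative registry, not the CDK's SyncManager.
class LoaderRegistry<K : Any, V : Any> {
    private val loaders = ConcurrentHashMap<K, CompletableDeferred<Result<V>>>()

    fun register(key: K, result: Result<V>) {
        loaders.getOrPut(key) { CompletableDeferred() }.complete(result)
    }

    // Rethrows the recorded failure, like getOrAwaitStreamLoader.
    suspend fun getOrAwait(key: K): V =
        loaders.getOrPut(key) { CompletableDeferred() }.await().getOrThrow()

    // Swallows the failure and returns null, like getStreamLoaderOrNull.
    suspend fun getOrNull(key: K): V? = loaders[key]?.await()?.getOrNull()
}

fun main() = runBlocking {
    val registry = LoaderRegistry<String, String>()
    registry.register("users", Result.success("users-loader"))
    registry.register("orders", Result.failure(IllegalStateException("start failed")))
    println(registry.getOrAwait("users")) // users-loader
    println(registry.getOrNull("orders")) // null
}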
(Diff truncated; the remaining changed files are not shown.)
