
Commit

Merge branch 'master' into btkcodedev/outreachManifestonly
btkcodedev committed Dec 17, 2024
2 parents 48ce2c3 + b7e25c9 commit b99113e
Showing 1,275 changed files with 38,242 additions and 28,184 deletions.
8 changes: 7 additions & 1 deletion .github/actions/run-airbyte-ci/action.yml
@@ -195,11 +195,17 @@ runs:
shell: bash
run: tar cvzf ./dagger_engine_logs.tgz ./dagger_engine_logs

- name: Hash subcommand
id: hash-subcommand
shell: bash
if: always()
run: echo "::set-output name=subcommand_hash::$(echo ${{ inputs.subcommand }} | sha256sum | cut -d ' ' -f 1)"

- name: Upload logs to GitHub
id: upload-dagger-engine-logs
if: always()
uses: actions/upload-artifact@v4
with:
name: ${{ github.job }}_dagger_engine_logs.tgz
name: ${{ github.job }}_${{ steps.hash-subcommand.outputs.subcommand_hash }}_dagger_engine_logs.tgz
path: ./dagger_engine_logs.tgz
retention-days: 7
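The new `hash-subcommand` step fingerprints the `airbyte-ci` subcommand with SHA-256 and adds the digest to the uploaded artifact name, presumably so that multiple invocations of this composite action within the same job no longer collide on `${{ github.job }}_dagger_engine_logs.tgz`. For illustration only, a minimal Kotlin sketch of the same fingerprinting idea (the shell step hashes the echoed subcommand, which includes a trailing newline, so exact digests differ; the subcommand value below is hypothetical):

```kotlin
import java.security.MessageDigest

// Hex-encoded SHA-256 of a string, mirroring the `sha256sum | cut -d ' ' -f 1`
// pipeline in the new workflow step.
fun subcommandHash(subcommand: String): String =
    MessageDigest.getInstance("SHA-256")
        .digest(subcommand.toByteArray(Charsets.UTF_8))
        .joinToString("") { "%02x".format(it) }

fun main() {
    // Illustrative subcommand; the real value comes from the action's input.
    val hash = subcommandHash("connectors --name=source-faker test")
    println("my_job_${hash}_dagger_engine_logs.tgz")
}
```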
64 changes: 31 additions & 33 deletions .github/workflows/connectors_up_to_date.yml
@@ -13,45 +13,43 @@ on:
description: "Options to pass to the 'airbyte-ci connectors' command group."
default: "--language=python --language=low-code --language=manifest-only"
jobs:
scheduled_connectors_up_to_date:
name: Connectors up-to-date [SCHEDULED TRIGGER]
runs-on: connector-nightly-xlarge
if: github.event_name == 'schedule'
strategy:
matrix:
connector_language:
- python
# TODO: re-add low_code and manifest-only once new SDM image is deemed stable
# - low_code
# - manifest-only
support_level:
- certified
- community
permissions:
pull-requests: write
generate_matrix:
name: Generate matrix
runs-on: ubuntu-latest
outputs:
generated_matrix: ${{ steps.generate_matrix.outputs.generated_matrix }}
steps:
- name: Checkout Airbyte
uses: actions/checkout@v4
- name: Run airbyte-ci connectors up-to-date [WORKFLOW]
id: airbyte-ci-connectors-up-to-date-workflow-dispatch
- name: Run airbyte-ci connectors list [SCHEDULED TRIGGER]
if: github.event_name == 'schedule'
id: airbyte-ci-connectors-list-scheduled
uses: ./.github/actions/run-airbyte-ci
with:
context: "master"
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_3 }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
gcs_credentials: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
github_token: ${{ secrets.GH_PAT_MAINTENANCE_OSS }}
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
subcommand: 'connectors --concurrency=10 --language=${{ matrix.connector_language}} --support-level=${{ matrix.support_level}} --metadata-query="\"source-declarative-manifest\" not in data.dockerRepository" --metadata-query="\"-rc.\" not in data.dockerImageTag" up-to-date --create-prs --auto-merge'
subcommand: 'connectors --language=python --language=low-code --language=manifest-only --metadata-query="\"-rc.\" not in data.dockerImageTag and \"source-declarative-manifest\" not in data.dockerRepository" list --output=selected_connectors.json'
- name: Run airbyte-ci connectors list [MANUAL TRIGGER]
if: github.event_name == 'workflow_dispatch'
id: airbyte-ci-connectors-list-workflow-dispatch
uses: ./.github/actions/run-airbyte-ci
with:
context: "master"
subcommand: 'connectors ${{ github.event.inputs.connectors-options }} --metadata-query="\"-rc.\" not in data.dockerImageTag and \"source-declarative-manifest\" not in data.dockerRepository" list --output=selected_connectors.json'
# We generate a dynamic matrix from the list of selected connectors.
# A matrix is required in this situation because the number of connectors is large and running them all in a single job would exceed the maximum job time limit of 6 hours.
- name: Generate matrix - 100 connectors per job
id: generate_matrix
run: |
matrix=$(jq -c -r '{include: [.[] | "--name=" + .] | to_entries | group_by(.key / 100 | floor) | map(map(.value) | {"connector_names": join(" ")})}' selected_connectors.json)
echo "::set-output name=generated_matrix::$matrix"
workflow_dispatch_connectors_up_to_date:
name: Connectors up-to-date [MANUAL TRIGGER]
runs-on: connector-nightly-xlarge
if: github.event_name == 'workflow_dispatch'
run_connectors_up_to_date:
needs: generate_matrix
name: Connectors up-to-date
runs-on: connector-up-to-date-medium
continue-on-error: true
strategy:
matrix: ${{fromJson(needs.generate_matrix.outputs.generated_matrix)}}
permissions:
pull-requests: write
steps:
@@ -71,4 +69,4 @@ jobs:
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
subcommand: 'connectors --concurrency=10 ${{ github.event.inputs.connectors-options }} --metadata-query="\"source-declarative-manifest\" not in data.dockerRepository" --metadata-query="\"-rc.\" not in data.dockerImageTag" up-to-date --create-prs --auto-merge'
subcommand: "connectors --concurrency=10 ${{ matrix.connector_names}} up-to-date --create-prs --auto-merge"
@@ -199,7 +199,14 @@ class FeedReader(
}
var checkpoint: PartitionReadCheckpoint
try {
withTimeout(root.timeout.toKotlinDuration()) { partitionReader.run() }
if (partitionReader is UnlimitedTimePartitionReader) {
partitionReader.run()
} else {
log.info {
"Running partition reader with ${root.timeout.toKotlinDuration()} timeout"
}
withTimeout(root.timeout.toKotlinDuration()) { partitionReader.run() }
}
log.info {
"completed reading partition $partitionReaderID " +
"for '${feed.label}' in round $partitionsCreatorID"
@@ -154,3 +154,6 @@ data class PartitionReadCheckpoint(
val opaqueStateValue: OpaqueStateValue,
val numRecords: Long,
)

/** A [PartitionReader] with no time limit for its execution. */
interface UnlimitedTimePartitionReader : PartitionReader
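Taken together, these two hunks let a partition reader opt out of the per-partition timeout: `FeedReader` only wraps `partitionReader.run()` in `withTimeout` when the reader is not an `UnlimitedTimePartitionReader`, and the new marker interface is declared alongside `PartitionReadCheckpoint`. Below is a minimal, self-contained sketch of the same marker-interface dispatch, using simplified stand-in types rather than the CDK's actual `PartitionReader` contract:

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout

// Simplified stand-ins for PartitionReader / UnlimitedTimePartitionReader:
// readers implementing the marker interface run without a time limit,
// everything else is bounded by withTimeout.
interface Reader { suspend fun run() }
interface UnlimitedTimeReader : Reader

suspend fun runPartition(reader: Reader, timeout: Duration) {
    if (reader is UnlimitedTimeReader) {
        reader.run() // no time limit
    } else {
        withTimeout(timeout) { reader.run() }
    }
}

fun main() = runBlocking {
    val bounded = object : Reader {
        override suspend fun run() = println("bounded partition read")
    }
    val unbounded = object : UnlimitedTimeReader {
        override suspend fun run() = println("unbounded partition read")
    }
    runPartition(bounded, 15.minutes)
    runPartition(unbounded, 15.minutes)
}
```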
5 changes: 5 additions & 0 deletions airbyte-cdk/bulk/core/load/build.gradle
@@ -40,6 +40,11 @@ def integrationTestTask = tasks.register('integrationTest', Test) {
systemProperties = project.test.systemProperties
maxParallelForks = project.test.maxParallelForks
maxHeapSize = project.test.maxHeapSize

testLogging() {
events 'skipped', 'started', 'passed', 'failed'
exceptionFormat 'full'
}
}

// These tests are lightweight enough to run on every PR.
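The added `testLogging` block makes the `integrationTest` task report skipped/started/passed/failed events with full exception stack traces. For reference only, a roughly equivalent snippet in the Gradle Kotlin DSL (the build file in this diff uses the Groovy DSL; the task registration here is just to make the snippet self-contained):

```kotlin
// Kotlin DSL equivalent of the Groovy testLogging block added above.
tasks.register<Test>("integrationTest") {
    testLogging {
        // Log these test lifecycle events to the console.
        events("skipped", "started", "passed", "failed")
        // Print full stack traces for failures.
        exceptionFormat = org.gradle.api.tasks.testing.logging.TestExceptionFormat.FULL
    }
}
```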
@@ -27,6 +27,7 @@ class MockBasicFunctionalityIntegrationTest :
preserveUndeclaredFields = true,
commitDataIncrementally = false,
allTypesBehavior = Untyped,
supportFileTransfer = false,
) {
@Test
override fun testBasicWrite() {
@@ -4,6 +4,7 @@

package io.airbyte.cdk.load.mock_integration_test

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.load.command.Append
import io.airbyte.cdk.load.command.Dedupe
import io.airbyte.cdk.load.command.DestinationStream
@@ -12,13 +13,15 @@ import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.load.message.DestinationFile
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.SimpleBatch
import io.airbyte.cdk.load.state.StreamIncompleteResult
import io.airbyte.cdk.load.state.StreamProcessingFailed
import io.airbyte.cdk.load.test.util.OutputRecord
import io.airbyte.cdk.load.write.DestinationWriter
import io.airbyte.cdk.load.write.StreamLoader
import io.github.oshai.kotlinlogging.KotlinLogging
import java.time.Instant
import java.util.UUID
import javax.inject.Singleton
import kotlinx.coroutines.delay

@Singleton
class MockDestinationWriter : DestinationWriter {
@@ -27,18 +30,22 @@ class MockDestinationWriter : DestinationWriter {
}
}

@SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION", justification = "Kotlin async continuation")
class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
data class LocalBatch(val records: List<DestinationRecord>) : Batch {
override val state = Batch.State.LOCAL
private val log = KotlinLogging.logger {}

abstract class MockBatch : Batch {
override val groupId: String? = null
}
data class LocalFileBatch(val file: DestinationFile) : Batch {

data class LocalBatch(val records: List<DestinationRecord>) : MockBatch() {
override val state = Batch.State.LOCAL
}
data class PersistedBatch(val records: List<DestinationRecord>) : Batch {
override val state = Batch.State.PERSISTED
data class LocalFileBatch(val file: DestinationFile) : MockBatch() {
override val state = Batch.State.LOCAL
}

override suspend fun close(streamFailure: StreamIncompleteResult?) {
override suspend fun close(streamFailure: StreamProcessingFailed?) {
if (streamFailure == null) {
when (val importType = stream.importType) {
is Append -> {
@@ -78,6 +85,7 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
override suspend fun processBatch(batch: Batch): Batch {
return when (batch) {
is LocalBatch -> {
log.info { "Persisting ${batch.records.size} records for ${stream.descriptor}" }
batch.records.forEach {
val filename = getFilename(it.stream, staging = true)
val record =
@@ -95,9 +103,14 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
// blind insert into the staging area. We'll dedupe on commit.
MockDestinationBackend.insert(filename, record)
}
PersistedBatch(batch.records)
// HACK: This destination is too fast and causes a race
// condition between consuming and flushing state messages
// that causes the test to fail. This would not be an issue
// in a real sync, because we would always either get more
// data or an end-of-stream that would force a final flush.
delay(100L)
SimpleBatch(state = Batch.State.COMPLETE)
}
is PersistedBatch -> SimpleBatch(state = Batch.State.COMPLETE)
else -> throw IllegalStateException("Unexpected batch type: $batch")
}
}
@@ -37,6 +37,8 @@ data class DestinationCatalog(val streams: List<DestinationStream> = emptyList()

fun asProtocolObject(): ConfiguredAirbyteCatalog =
ConfiguredAirbyteCatalog().withStreams(streams.map { it.asProtocolObject() })

fun size(): Int = streams.size
}

interface DestinationCatalogFactory {
@@ -84,6 +84,10 @@ abstract class DestinationConfiguration : Configuration {
*/
open val gracefulCancellationTimeoutMs: Long = 60 * 1000L // 1 minute

open val numProcessRecordsWorkers: Int = 2
open val numProcessBatchWorkers: Int = 5
open val batchQueueDepth: Int = 10

/**
* Micronaut factory which glues [ConfigurationSpecificationSupplier] and
* [DestinationConfigurationFactory] together to produce a [DestinationConfiguration] singleton.
@@ -4,16 +4,25 @@

package io.airbyte.cdk.load.config

import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.load.message.BatchEnvelope
import io.airbyte.cdk.load.message.MultiProducerChannel
import io.airbyte.cdk.load.state.ReservationManager
import io.airbyte.cdk.load.task.implementor.FileAggregateMessage
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Value
import jakarta.inject.Named
import jakarta.inject.Singleton
import kotlin.math.min
import kotlinx.coroutines.channels.Channel

/** Factory for instantiating beans necessary for the sync process. */
@Factory
class SyncBeanFactory {
private val log = KotlinLogging.logger {}

@Singleton
@Named("memoryManager")
fun memoryManager(
@@ -31,4 +40,43 @@ class SyncBeanFactory {
): ReservationManager {
return ReservationManager(availableBytes)
}

/**
* The queue that sits between the aggregation (SpillToDiskTask) and load steps
* (ProcessRecordsTask).
*
* Since we are buffering on disk, we must consider the available disk space in our depth
* configuration.
*/
@Singleton
@Named("fileAggregateQueue")
fun fileAggregateQueue(
@Value("\${airbyte.resources.disk.bytes}") availableBytes: Long,
config: DestinationConfiguration,
catalog: DestinationCatalog
): MultiProducerChannel<FileAggregateMessage> {
val streamCount = catalog.size()
// total batches by disk capacity
val maxBatchesThatFitOnDisk = (availableBytes / config.recordBatchSizeBytes).toInt()
// account for batches in flight processing by the workers
val maxBatchesMinusUploadOverhead =
maxBatchesThatFitOnDisk - config.numProcessRecordsWorkers
// ideally we'd allow enough headroom to smooth out rate differences between consumer /
// producer streams
val idealDepth = 4 * config.numProcessRecordsWorkers
// take the smaller of the two—this should be the idealDepth except in corner cases
val capacity = min(maxBatchesMinusUploadOverhead, idealDepth)
log.info { "Creating file aggregate queue with limit $capacity" }
val channel = Channel<FileAggregateMessage>(capacity)
return MultiProducerChannel(streamCount.toLong(), channel)
}

@Singleton
@Named("batchQueue")
fun batchQueue(
config: DestinationConfiguration,
): MultiProducerChannel<BatchEnvelope<*>> {
val channel = Channel<BatchEnvelope<*>>(config.batchQueueDepth)
return MultiProducerChannel(config.numProcessRecordsWorkers.toLong(), channel)
}
}
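The new `fileAggregateQueue` derives its capacity from two bounds: how many `recordBatchSizeBytes`-sized batches fit on the available disk (minus the batches the record-processing workers may already hold in flight), and an ideal depth of four batches per worker; the smaller value wins. A worked Kotlin example with hypothetical inputs (none of these byte values are Airbyte defaults):

```kotlin
import kotlin.math.min

// Worked example of the capacity computation in fileAggregateQueue().
fun fileAggregateQueueCapacity(
    availableDiskBytes: Long,
    recordBatchSizeBytes: Long,
    numProcessRecordsWorkers: Int,
): Int {
    val maxBatchesThatFitOnDisk = (availableDiskBytes / recordBatchSizeBytes).toInt()
    val maxBatchesMinusUploadOverhead = maxBatchesThatFitOnDisk - numProcessRecordsWorkers
    val idealDepth = 4 * numProcessRecordsWorkers
    return min(maxBatchesMinusUploadOverhead, idealDepth)
}

fun main() {
    // Hypothetical values: 5 GiB of scratch disk, 200 MiB batches, 2 workers.
    // 25 batches fit on disk, 23 after in-flight overhead, ideal depth is 8,
    // so the queue capacity is 8.
    println(fileAggregateQueueCapacity(5L * 1024 * 1024 * 1024, 200L * 1024 * 1024, 2))
}
```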
@@ -19,11 +19,11 @@ import java.time.temporal.ChronoUnit
*/
class TimeStringToInteger : AirbyteValueIdentityMapper() {
companion object {
private val DATE_TIME_FORMATTER: DateTimeFormatter =
val DATE_TIME_FORMATTER: DateTimeFormatter =
DateTimeFormatter.ofPattern(
"[yyyy][yy]['-']['/']['.'][' '][MMM][MM][M]['-']['/']['.'][' '][dd][d][[' '][G]][[' ']['T']HH:mm[':'ss[.][SSSSSS][SSSSS][SSSS][SSS][' '][z][zzz][Z][O][x][XXX][XX][X][[' '][G]]]]"
)
private val TIME_FORMATTER: DateTimeFormatter =
val TIME_FORMATTER: DateTimeFormatter =
DateTimeFormatter.ofPattern(
"HH:mm[':'ss[.][SSSSSS][SSSSS][SSSS][SSS][' '][z][zzz][Z][O][x][XXX][XX][X]]"
)
