Commit 7f80227

Merge branch 'master' into xiaohan/postgres-rfr

xiaohansong authored May 7, 2024
2 parents 46672ad + 18c9ebc
Showing 95 changed files with 322 additions and 780 deletions.
33 changes: 17 additions & 16 deletions airbyte-cdk/java/airbyte-cdk/README.md
@@ -2,22 +2,22 @@

This page will walk through the process of developing with the Java CDK.

- [Developing with the Java CDK](#developing-with-the-java-cdk)
- [Intro to the Java CDK](#intro-to-the-java-cdk)
- [What is included in the Java CDK?](#what-is-included-in-the-java-cdk)
- [How is the CDK published?](#how-is-the-cdk-published)
- [Using the Java CDK](#using-the-java-cdk)
- [Building the CDK](#building-the-cdk)
- [Bumping the CDK version](#bumping-the-cdk-version)
- [Publishing the CDK](#publishing-the-cdk)
- [Developing Connectors with the Java CDK](#developing-connectors-with-the-java-cdk)
- [Referencing the CDK from Java connectors](#referencing-the-cdk-from-java-connectors)
- [Developing a connector alongside the CDK](#developing-a-connector-alongside-the-cdk)
- [Publishing the CDK and switching to a pinned CDK reference](#publishing-the-cdk-and-switching-to-a-pinned-cdk-reference)
- [Troubleshooting CDK Dependency Caches](#troubleshooting-cdk-dependency-caches)
- [Developing a connector against a pinned CDK version](#developing-a-connector-against-a-pinned-cdk-version)
- [Changelog](#changelog)
- [Java CDK](#java-cdk)
* [Developing with the Java CDK](#developing-with-the-java-cdk)
* [Intro to the Java CDK](#intro-to-the-java-cdk)
* [What is included in the Java CDK?](#what-is-included-in-the-java-cdk)
* [How is the CDK published?](#how-is-the-cdk-published)
* [Using the Java CDK](#using-the-java-cdk)
* [Building the CDK](#building-the-cdk)
* [Bumping the CDK version](#bumping-the-cdk-version)
* [Publishing the CDK](#publishing-the-cdk)
* [Developing Connectors with the Java CDK](#developing-connectors-with-the-java-cdk)
* [Referencing the CDK from Java connectors](#referencing-the-cdk-from-java-connectors)
* [Developing a connector alongside the CDK](#developing-a-connector-alongside-the-cdk)
* [Publishing the CDK and switching to a pinned CDK reference](#publishing-the-cdk-and-switching-to-a-pinned-cdk-reference)
* [Troubleshooting CDK Dependency Caches](#troubleshooting-cdk-dependency-caches)
* [Developing a connector against a pinned CDK version](#developing-a-connector-against-a-pinned-cdk-version)
* [Changelog](#changelog)
* [Java CDK](#java-cdk)

## Intro to the Java CDK

@@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.34.0 | 2024-05-01 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | Destinations: Remove incremental T+D |
| 0.33.2 | 2024-05-03 | [\#37824](https://github.com/airbytehq/airbyte/pull/37824) | improve source acceptance tests |
| 0.33.1 | 2024-05-03 | [\#37824](https://github.com/airbytehq/airbyte/pull/37824) | Add a unit test for cursor based sync |
| 0.33.0 | 2024-05-03 | [\#36935](https://github.com/airbytehq/airbyte/pull/36935) | Destinations: Enable non-safe-casting DV2 tests |
@@ -1 +1 @@
version=0.33.2
version=0.34.0
@@ -8,9 +8,7 @@ import io.airbyte.cdk.integrations.destination.StreamSyncSummary
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction
import io.airbyte.cdk.integrations.destination.jdbc.WriteConfig
import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeOperationValve
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair
import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.airbyte.protocol.models.v0.StreamDescriptor
import io.github.oshai.kotlinlogging.KotlinLogging
@@ -104,34 +102,16 @@ object GeneralStagingFunctions {
tableName: String?,
schemaName: String?,
stagingOperations: StagingOperations,
streamNamespace: String?,
streamName: String?,
typerDeduperValve: TypeAndDedupeOperationValve,
typerDeduper: TyperDeduper
) {
try {
val rawTableInsertLock =
typerDeduper.getRawTableInsertLock(streamNamespace!!, streamName!!)
rawTableInsertLock.lock()
try {
stagingOperations.copyIntoTableFromStage(
database,
stageName,
stagingPath,
stagedFiles,
tableName,
schemaName
)
} finally {
rawTableInsertLock.unlock()
}

val streamId = AirbyteStreamNameNamespacePair(streamName, streamNamespace)
typerDeduperValve.addStreamIfAbsent(streamId)
if (typerDeduperValve.readyToTypeAndDedupe(streamId)) {
typerDeduper.typeAndDedupe(streamId.namespace, streamId.name, false)
typerDeduperValve.updateTimeAndIncreaseInterval(streamId)
}
stagingOperations.copyIntoTableFromStage(
database,
stageName,
stagingPath,
stagedFiles,
tableName,
schemaName
)
} catch (e: Exception) {
throw RuntimeException("Failed to upload data from stage $stagingPath", e)
}
@@ -10,8 +10,6 @@ import io.airbyte.cdk.integrations.destination.record_buffer.FlushBufferFunction
import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer
import io.airbyte.commons.exceptions.ConfigErrorException
import io.airbyte.commons.json.Jsons
import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeOperationValve
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.github.oshai.kotlinlogging.KotlinLogging
@@ -46,8 +44,6 @@ object SerialFlush {
stagingOperations: StagingOperations,
writeConfigs: List<WriteConfig>,
catalog: ConfiguredAirbyteCatalog,
typerDeduperValve: TypeAndDedupeOperationValve,
typerDeduper: TyperDeduper
): FlushBufferFunction {
// TODO: (ryankfu) move this block of code that executes before the lambda to
// #onStartFunction
@@ -119,10 +115,6 @@ object SerialFlush {
writeConfig.outputTableName,
schemaName,
stagingOperations,
writeConfig.namespace,
writeConfig.streamName,
typerDeduperValve,
typerDeduper
)
}
} catch (e: Exception) {
@@ -13,14 +13,13 @@ import io.airbyte.cdk.integrations.destination.jdbc.WriteConfig
import io.airbyte.cdk.integrations.destination.record_buffer.BufferCreateFunction
import io.airbyte.cdk.integrations.destination.record_buffer.SerializedBufferingStrategy
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeOperationValve
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteStream
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import java.time.Instant
import java.util.*
import java.util.UUID
import java.util.function.Consumer
import java.util.function.Function
import java.util.stream.Collectors
@@ -41,7 +40,6 @@ open class SerialStagingConsumerFactory {
config: JsonNode,
catalog: ConfiguredAirbyteCatalog,
purgeStagingData: Boolean,
typerDeduperValve: TypeAndDedupeOperationValve,
typerDeduper: TyperDeduper,
parsedCatalog: ParsedCatalog,
defaultNamespace: String?,
@@ -71,8 +69,6 @@
stagingOperations,
writeConfigs,
catalog,
typerDeduperValve,
typerDeduper
)
),
GeneralStagingFunctions.onCloseFunction(
@@ -37,8 +37,6 @@ internal class SerialStagingConsumerFactoryTest {
)
),
mock(),
mock(),
mock()
)
}

@@ -12,8 +12,6 @@ import io.airbyte.cdk.integrations.destination.record_buffer.FileBuffer
import io.airbyte.cdk.integrations.destination.s3.csv.CsvSerializedBuffer
import io.airbyte.cdk.integrations.destination.s3.csv.StagingDatabaseCsvSheetGenerator
import io.airbyte.commons.json.Jsons
import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeOperationValve
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.airbyte.protocol.models.v0.StreamDescriptor
import io.github.oshai.kotlinlogging.KotlinLogging
@@ -30,8 +28,6 @@ internal class AsyncFlush(
private val stagingOperations: StagingOperations?,
private val database: JdbcDatabase?,
private val catalog: ConfiguredAirbyteCatalog?,
private val typerDeduperValve: TypeAndDedupeOperationValve,
private val typerDeduper: TyperDeduper,
// In general, this size is chosen to improve the performance of lower memory
// connectors. With 1 Gi
// of
@@ -114,10 +110,6 @@
writeConfig.outputTableName,
schemaName,
stagingOperations,
writeConfig.namespace,
writeConfig.streamName,
typerDeduperValve,
typerDeduper
)
} catch (e: Exception) {
logger.error(e) {
@@ -18,15 +18,17 @@ import io.airbyte.cdk.integrations.destination.async.state.FlushFailure
import io.airbyte.cdk.integrations.destination.jdbc.WriteConfig
import io.airbyte.commons.exceptions.ConfigErrorException
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeOperationValve
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper
import io.airbyte.protocol.models.v0.*
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteStream
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import io.airbyte.protocol.models.v0.StreamDescriptor
import java.time.Instant
import java.util.*
import java.util.Optional
import java.util.concurrent.Executors
import java.util.function.Consumer
import java.util.function.Function
import java.util.stream.Collectors
import org.slf4j.Logger
import org.slf4j.LoggerFactory

@@ -43,7 +45,6 @@
private val config: JsonNode?,
private val catalog: ConfiguredAirbyteCatalog?,
private val purgeStagingData: Boolean,
private val typerDeduperValve: TypeAndDedupeOperationValve?,
private val typerDeduper: TyperDeduper?,
private val parsedCatalog: ParsedCatalog?,
private val defaultNamespace: String?,
@@ -65,7 +66,6 @@
var config: JsonNode? = null
var catalog: ConfiguredAirbyteCatalog? = null
var purgeStagingData: Boolean = false
var typerDeduperValve: TypeAndDedupeOperationValve? = null
var typerDeduper: TyperDeduper? = null
var parsedCatalog: ParsedCatalog? = null
var defaultNamespace: String? = null
@@ -102,7 +102,6 @@
config,
catalog,
purgeStagingData,
typerDeduperValve,
typerDeduper,
parsedCatalog,
defaultNamespace,
@@ -116,7 +115,6 @@

fun createAsync(): SerializedAirbyteMessageConsumer {
val typerDeduper = this.typerDeduper!!
val typerDeduperValve = this.typerDeduperValve!!
val stagingOperations = this.stagingOperations!!

val writeConfigs: List<WriteConfig> =
@@ -129,8 +127,6 @@
stagingOperations,
database,
catalog,
typerDeduperValve,
typerDeduper,
optimalBatchSizeBytes,
destinationColumns
)
@@ -173,7 +169,6 @@
config: JsonNode?,
catalog: ConfiguredAirbyteCatalog,
purgeStagingData: Boolean,
typerDeduperValve: TypeAndDedupeOperationValve,
typerDeduper: TyperDeduper,
parsedCatalog: ParsedCatalog?,
defaultNamespace: String?,
@@ -187,7 +182,6 @@
builder.config = config
builder.catalog = catalog
builder.purgeStagingData = purgeStagingData
builder.typerDeduperValve = typerDeduperValve
builder.typerDeduper = typerDeduper
builder.parsedCatalog = parsedCatalog
builder.defaultNamespace = defaultNamespace
@@ -220,19 +214,15 @@
streamDescToWriteConfig[streamIdentifier] = config
}
}
if (!conflictingStreams.isEmpty()) {
if (conflictingStreams.isNotEmpty()) {
var affectedStreamsAsString =
conflictingStreams.joinToString(", ") { config: WriteConfig ->
config.namespace + "." + config.streamName
}
val message =
String.format(
"You are trying to write multiple streams to the same table. Consider switching to a custom namespace format using \${SOURCE_NAMESPACE}, or moving one of them into a separate connection with a different stream prefix. Affected streams: %s",
conflictingStreams
.stream()
.map<String>(
Function<WriteConfig, String> { config: WriteConfig ->
config.namespace + "." + config.streamName
}
)
.collect(Collectors.joining(", "))
)
"You are trying to write multiple streams to the same table. Consider switching to a custom namespace format using " +
"\${SOURCE_NAMESPACE}, or moving one of them into a separate connection with a different stream prefix. " +
"Affected streams: $affectedStreamsAsString"
throw ConfigErrorException(message)
}
return streamDescToWriteConfig
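For readers skimming the hunk above: the error-message construction moves from a `java.util.stream` pipeline with `Collectors.joining` to Kotlin's `joinToString`. A minimal, self-contained sketch of the same idiom follows; `StreamConfig` is a hypothetical stand-in for `WriteConfig`, and the message text is abbreviated.

```kotlin
// Standalone sketch of the refactor above: build the "affected streams"
// message with joinToString instead of a streams/Collectors pipeline.
// StreamConfig is a simplified stand-in, not the real WriteConfig class.
data class StreamConfig(val namespace: String, val streamName: String)

fun conflictMessage(conflictingStreams: List<StreamConfig>): String {
    val affectedStreamsAsString =
        conflictingStreams.joinToString(", ") { "${it.namespace}.${it.streamName}" }
    return "You are trying to write multiple streams to the same table. " +
        "Affected streams: $affectedStreamsAsString"
}

fun main() {
    val conflicts = listOf(
        StreamConfig("public", "users"),
        StreamConfig("PUBLIC", "users"),
    )
    // Prints: ... Affected streams: public.users, PUBLIC.users
    println(conflictMessage(conflicts))
}
```

The transform lambda on `joinToString` replaces both the explicit `Function<WriteConfig, String>` adapter and the `Collectors` import removed by this hunk.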
@@ -303,7 +293,7 @@
@Suppress("deprecation") namingResolver!!.getTmpTableName(streamName)
val syncMode = stream.destinationSyncMode

val writeConfig: WriteConfig =
val writeConfig =
WriteConfig(
streamName,
abStream.namespace,