diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt index 8a01b0685b60..b88d5b4dd089 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt @@ -148,7 +148,7 @@ class DefaultDestinationTaskLauncher( log.info { "Task $innerTask was cancelled." } throw e } catch (e: Exception) { - log.error { "Caught exception in task $innerTask: $e" } + log.error(e) { "Caught exception in task $innerTask" } handleException(e) } } diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/DestinationCleaner.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/DestinationCleaner.kt index 1ab6c2e20f10..84375e28ea31 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/DestinationCleaner.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/DestinationCleaner.kt @@ -8,6 +8,10 @@ fun interface DestinationCleaner { /** * Search the test destination for old test data and delete it. This should leave recent data * (e.g. from the last week) untouched, to avoid causing failures in actively-running tests. + * + * Implementers should generally list all namespaces in the destination, filter for namespace + * which match [IntegrationTest.randomizedNamespaceRegex], and then use + * [IntegrationTest.isNamespaceOld] to filter down to namespaces which can be deleted. */ fun cleanup() } diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt index a8e91bf95cd6..56a7cdae05f0 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt @@ -19,6 +19,7 @@ import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.AirbyteStateMessage import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus import java.time.Instant +import java.time.LocalDate import java.time.LocalDateTime import java.time.ZoneOffset import java.time.format.DateTimeFormatter @@ -63,7 +64,7 @@ abstract class IntegrationTest( @Suppress("DEPRECATION") private val randomSuffix = RandomStringUtils.randomAlphabetic(4) private val timestampString = LocalDateTime.ofInstant(Instant.now(), ZoneOffset.UTC) - .format(DateTimeFormatter.ofPattern("YYYYMMDD")) + .format(randomizedNamespaceDateFormatter) // stream name doesn't need to be randomized, only the namespace. val randomizedNamespace = "test$timestampString$randomSuffix" @@ -262,6 +263,23 @@ abstract class IntegrationTest( } companion object { + val randomizedNamespaceRegex = Regex("test(\\d{8})[A-Za-z]{4}") + val randomizedNamespaceDateFormatter: DateTimeFormatter = + DateTimeFormatter.ofPattern("yyyyMMdd") + + /** + * Given a randomizedNamespace (such as `test20241216abcd`), return whether the namespace + * was created more than [retentionDays] days ago, and therefore should be deleted by a + * [DestinationCleaner]. + */ + fun isNamespaceOld(namespace: String, retentionDays: Long = 30): Boolean { + val cleanupCutoffDate = LocalDate.now().minusDays(retentionDays) + val matchResult = randomizedNamespaceRegex.find(namespace) + val namespaceCreationDate = + LocalDate.parse(matchResult!!.groupValues[1], randomizedNamespaceDateFormatter) + return namespaceCreationDate.isBefore(cleanupCutoffDate) + } + private val hasRunCleaner = AtomicBoolean(false) // Connectors are calling System.getenv rather than using micronaut-y properties, diff --git a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/command/iceberg/parquet/IcebergCatalogSpecifications.kt b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/command/iceberg/parquet/IcebergCatalogSpecifications.kt index 49247d0c2100..6c93e70d1e6e 100644 --- a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/command/iceberg/parquet/IcebergCatalogSpecifications.kt +++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/command/iceberg/parquet/IcebergCatalogSpecifications.kt @@ -181,7 +181,7 @@ class GlueCatalogSpecification( @get:JsonPropertyDescription( "The AWS Account ID associated with the Glue service used by the Iceberg catalog." ) - @get:JsonProperty("glue_id") + @JsonProperty("glue_id") @JsonSchemaInject(json = """{"order":1}""") val glueId: String, ) : CatalogType(catalogType) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml b/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml index 831b9aade4e0..0a8bb55624cd 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml +++ b/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: file connectorType: destination definitionId: 37a928c1-2d5c-431a-a97d-ae236bd1ea0c - dockerImageTag: 0.1.14 + dockerImageTag: 0.1.15 dockerRepository: airbyte/destination-iceberg-v2 githubIssueLabel: destination-iceberg-v2 icon: icon.svg @@ -26,59 +26,9 @@ data: - suite: unitTests - suite: integrationTests testSecrets: - - name: SECRET_DESTINATION-S3-V2-MINIMAL-REQUIRED-CONFIG - fileName: s3_dest_v2_minimal_required_config.json + - name: SECRET_DESTINATION-ICEBERG_V2_S3_GLUE_CONFIG + fileName: glue.json secretStore: type: GSM alias: airbyte-connector-testing-secret-store -# - name: SECRET_DESTINATION-S3-V2-JSONL-ROOT-LEVEL-FLATTENING -# fileName: s3_dest_v2_jsonl_root_level_flattening_config.json -# secretStore: -# type: GSM -# alias: airbyte-connector-testing-secret-store -# - name: SECRET_DESTINATION-S3-V2-JSONL-GZIP -# fileName: s3_dest_v2_jsonl_gzip_config.json -# secretStore: -# type: GSM -# alias: airbyte-connector-testing-secret-store -# - name: SECRET_DESTINATION-S3-V2-JSONL-STAGING -# fileName: s3_dest_v2_jsonl_staging_config.json -# secretStore: -# type: GSM -# alias: airbyte-connector-testing-secret-store -# - name: SECRET_DESTINATION-S3-V2-CSV -# fileName: s3_dest_v2_csv_config.json -# secretStore: -# type: GSM -# alias: airbyte-connector-testing-secret-store -# - name: SECRET_DESTINATION-S3-V2-CSV-ROOT-LEVEL-FLATTENING -# fileName: s3_dest_v2_csv_root_level_flattening_config.json -# secretStore: -# type: GSM -# alias: airbyte-connector-testing-secret-store -# - name: SECRET_DESTINATION-S3-V2-CSV-GZIP -# fileName: s3_dest_v2_csv_gzip_config.json -# secretStore: -# type: GSM -# alias: airbyte-connector-testing-secret-store -# - name: SECRET_DESTINATION-S3-V2-AVRO -# fileName: s3_dest_v2_avro_config.json -# secretStore: -# type: GSM -# alias: airbyte-connector-testing-secret-store -# - name: SECRET_DESTINATION-S3-V2-AVRO-BZIP2 -# fileName: s3_dest_v2_avro_bzip2_config.json -# secretStore: -# type: GSM -# alias: airbyte-connector-testing-secret-store -# - name: SECRET_DESTINATION-S3-V2-PARQUET -# fileName: s3_dest_v2_parquet_config.json -# secretStore: -# type: GSM -# alias: airbyte-connector-testing-secret-store -# - name: SECRET_DESTINATION-S3-V2-PARQUET-SNAPPY -# fileName: s3_dest_v2_parquet_snappy_config.json -# secretStore: -# type: GSM -# alias: airbyte-connector-testing-secret-store metadataSpecVersion: "1.0" diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Checker.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Checker.kt index bfa977196f4e..d790c27560cd 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Checker.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Checker.kt @@ -8,7 +8,6 @@ import io.airbyte.cdk.load.check.DestinationChecker import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableCleaner import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil -import io.airbyte.integrations.destination.iceberg.v2.io.toIcebergTableIdentifier import javax.inject.Singleton import org.apache.iceberg.Schema import org.apache.iceberg.types.Types @@ -16,7 +15,8 @@ import org.apache.iceberg.types.Types @Singleton class IcebergV2Checker( private val icebergTableCleaner: IcebergTableCleaner, - private val icebergUtil: IcebergUtil + private val icebergUtil: IcebergUtil, + private val tableIdGenerator: TableIdGenerator, ) : DestinationChecker { override fun check(config: IcebergV2Configuration) { @@ -43,7 +43,7 @@ class IcebergV2Checker( icebergTableCleaner.clearTable( catalog, - testTableIdentifier.toIcebergTableIdentifier(), + tableIdGenerator.toTableIdentifier(testTableIdentifier), table.io(), table.location() ) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/TableIdGenerator.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/TableIdGenerator.kt new file mode 100644 index 000000000000..c52dfd06f44a --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/TableIdGenerator.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.iceberg.v2 + +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.command.iceberg.parquet.GlueCatalogConfiguration +import io.micronaut.context.annotation.Factory +import javax.inject.Singleton +import org.apache.iceberg.catalog.Namespace +import org.apache.iceberg.catalog.TableIdentifier + +/** + * Convert our internal stream descriptor to an Iceberg [TableIdentifier]. Implementations should + * handle catalog-specific naming restrictions. + */ +// TODO accept default namespace in config as a val here +interface TableIdGenerator { + fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier +} + +class SimpleTableIdGenerator : TableIdGenerator { + override fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier = + tableIdOf(stream.namespace!!, stream.name) +} + +/** AWS Glue requires lowercase database+table names. */ +class GlueTableIdGenerator : TableIdGenerator { + override fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier = + tableIdOf(stream.namespace!!.lowercase(), stream.name.lowercase()) +} + +@Factory +class TableIdGeneratorFactory(private val icebergConfiguration: IcebergV2Configuration) { + @Singleton + fun create() = + when (icebergConfiguration.icebergCatalogConfiguration.catalogConfiguration) { + is GlueCatalogConfiguration -> GlueTableIdGenerator() + else -> SimpleTableIdGenerator() + } +} + +// iceberg namespace+name must both be nonnull. +private fun tableIdOf(namespace: String, name: String) = + TableIdentifier.of(Namespace.of(namespace), name) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt index 3771af1b36ac..b760d8bac44d 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt @@ -20,6 +20,7 @@ import io.airbyte.integrations.destination.iceberg.v2.ACCESS_KEY_ID import io.airbyte.integrations.destination.iceberg.v2.GlueCredentialsProvider import io.airbyte.integrations.destination.iceberg.v2.IcebergV2Configuration import io.airbyte.integrations.destination.iceberg.v2.SECRET_ACCESS_KEY +import io.airbyte.integrations.destination.iceberg.v2.TableIdGenerator import io.github.oshai.kotlinlogging.KotlinLogging import jakarta.inject.Singleton import org.apache.hadoop.conf.Configuration @@ -40,30 +41,19 @@ import org.apache.iceberg.aws.AwsProperties import org.apache.iceberg.aws.s3.S3FileIO import org.apache.iceberg.aws.s3.S3FileIOProperties import org.apache.iceberg.catalog.Catalog -import org.apache.iceberg.catalog.Namespace import org.apache.iceberg.catalog.SupportsNamespaces -import org.apache.iceberg.catalog.TableIdentifier import org.apache.iceberg.data.Record import org.apache.iceberg.exceptions.AlreadyExistsException import org.projectnessie.client.NessieConfigConstants +import software.amazon.awssdk.services.glue.model.ConcurrentModificationException private val logger = KotlinLogging.logger {} const val AIRBYTE_CDC_DELETE_COLUMN = "_ab_cdc_deleted_at" -/** - * Extension function for the[DestinationStream.Descriptor] class that converts the descriptor to an - * Iceberg [TableIdentifier]. - * - * @return An Iceberg [TableIdentifier] representation of the stream descriptor. - */ -fun DestinationStream.Descriptor.toIcebergTableIdentifier(): TableIdentifier { - return TableIdentifier.of(Namespace.of(this.namespace), this.name) -} - /** Collection of Iceberg related utilities. */ @Singleton -class IcebergUtil { +class IcebergUtil(private val tableIdGenerator: TableIdGenerator) { internal class InvalidFormatException(message: String) : Exception(message) private val generationIdRegex = Regex("""ab-generation-id-\d+-e""") @@ -117,7 +107,7 @@ class IcebergUtil { schema: Schema, properties: Map ): Table { - val tableIdentifier = streamDescriptor.toIcebergTableIdentifier() + val tableIdentifier = tableIdGenerator.toTableIdentifier(streamDescriptor) synchronized(tableIdentifier.namespace()) { if ( catalog is SupportsNamespaces && @@ -135,6 +125,11 @@ class IcebergUtil { logger.info { "Namespace '${tableIdentifier.namespace()}' was likely created by another thread during parallel operations." } + } catch (e: ConcurrentModificationException) { + // do the same for AWS Glue + logger.info { + "Namespace '${tableIdentifier.namespace()}' was likely created by another thread during parallel operations." + } } } } diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergDestinationCleaner.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergDestinationCleaner.kt new file mode 100644 index 000000000000..447abd82fcba --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergDestinationCleaner.kt @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.iceberg.v2 + +import io.airbyte.cdk.load.test.util.DestinationCleaner +import io.airbyte.cdk.load.test.util.IntegrationTest.Companion.isNamespaceOld +import io.airbyte.cdk.load.test.util.IntegrationTest.Companion.randomizedNamespaceRegex +import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableCleaner +import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil +import org.apache.iceberg.catalog.Catalog +import org.apache.iceberg.catalog.Namespace +import org.apache.iceberg.catalog.SupportsNamespaces + +class IcebergDestinationCleaner(private val catalog: Catalog) : DestinationCleaner { + override fun cleanup() { + val namespaces: List = + (catalog as SupportsNamespaces).listNamespaces().filter { + val namespace = it.level(0) + randomizedNamespaceRegex.matches(namespace) && isNamespaceOld(namespace) + } + + // we're passing explicit TableIdentifier to clearTable, so just use SimpleTableIdGenerator + val tableCleaner = IcebergTableCleaner(IcebergUtil(SimpleTableIdGenerator())) + + namespaces.forEach { namespace -> + catalog.listTables(namespace).forEach { tableId -> + val table = catalog.loadTable(tableId) + tableCleaner.clearTable(catalog, tableId, table.io(), table.location()) + } + catalog.dropNamespace(namespace) + } + } +} diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2CheckTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2CheckTest.kt index 3c619ea0ea98..420bab9ef5a4 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2CheckTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2CheckTest.kt @@ -6,13 +6,14 @@ package io.airbyte.integrations.destination.iceberg.v2 import io.airbyte.cdk.load.check.CheckIntegrationTest import io.airbyte.cdk.load.check.CheckTestConfig -import io.airbyte.integrations.destination.iceberg.v2.IcebergV2TestUtil.PATH -import org.junit.jupiter.api.Disabled +import io.airbyte.integrations.destination.iceberg.v2.IcebergV2TestUtil.GLUE_CONFIG_PATH -@Disabled class IcebergV2CheckTest : CheckIntegrationTest( - successConfigFilenames = listOf(CheckTestConfig(PATH)), + successConfigFilenames = + listOf( + CheckTestConfig(GLUE_CONFIG_PATH), + ), // TODO we maybe should add some configs that are expected to fail `check` failConfigFilenamesAndFailureReasons = mapOf(), ) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt index 5689320eedbb..7a755a5d4695 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt @@ -11,17 +11,13 @@ import io.airbyte.cdk.load.data.parquet.ParquetMapperPipelineFactory import io.airbyte.cdk.load.message.DestinationRecord import io.airbyte.cdk.load.test.util.DestinationDataDumper import io.airbyte.cdk.load.test.util.OutputRecord -import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange import java.math.BigDecimal import java.time.Instant import java.util.LinkedHashMap import java.util.UUID -import org.apache.hadoop.conf.Configuration -import org.apache.iceberg.catalog.TableIdentifier import org.apache.iceberg.data.IcebergGenerics import org.apache.iceberg.data.Record -import org.apache.iceberg.nessie.NessieCatalog object IcebergV2DataDumper : DestinationDataDumper { @@ -83,15 +79,13 @@ object IcebergV2DataDumper : DestinationDataDumper { spec: ConfigurationSpecification, stream: DestinationStream ): List { - val config = - IcebergV2ConfigurationFactory() - .makeWithoutExceptionHandling(spec as IcebergV2Specification) + val config = IcebergV2TestUtil.getConfig(spec) val pipeline = ParquetMapperPipelineFactory().create(stream) val schema = pipeline.finalSchema as ObjectType - val catalog = getNessieCatalog(config) + val catalog = IcebergV2TestUtil.getCatalog(config) val table = catalog.loadTable( - TableIdentifier.of(stream.descriptor.namespace, stream.descriptor.name) + TableIdGeneratorFactory(config).create().toTableIdentifier(stream.descriptor) ) val outputRecords = mutableListOf() @@ -121,7 +115,10 @@ object IcebergV2DataDumper : DestinationDataDumper { } } - catalog.close() + // some catalogs (e.g. Nessie) have a close() method. Call it here. + if (catalog is AutoCloseable) { + catalog.close() + } return outputRecords } @@ -131,13 +128,4 @@ object IcebergV2DataDumper : DestinationDataDumper { ): List { throw NotImplementedError("Iceberg doesn't support universal file transfer") } - - private fun getNessieCatalog(config: IcebergV2Configuration): NessieCatalog { - val catalogProperties = IcebergUtil().toCatalogProperties(config) - - val catalog = NessieCatalog() - catalog.setConf(Configuration()) - catalog.initialize("nessie", catalogProperties) - return catalog - } } diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt index 8de9fa59ca90..2da1841dbf0c 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt @@ -4,15 +4,26 @@ package io.airbyte.integrations.destination.iceberg.v2 +import io.airbyte.cdk.command.ConfigurationSpecification +import io.airbyte.cdk.command.ValidatedJsonUtils +import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil import java.nio.file.Files import java.nio.file.Path object IcebergV2TestUtil { - // TODO this is just here as an example, we should remove it + add real configs - private val resource = - this::class.java.classLoader.getResource("iceberg_dest_v2_minimal_required_config.json") - ?: throw IllegalArgumentException("File not found in resources") - val PATH: Path = Path.of(resource.toURI()) + val GLUE_CONFIG_PATH: Path = Path.of("secrets/glue.json") - fun getConfig(configPath: String): String = Files.readString(Path.of(configPath)) + fun parseConfig(path: Path) = + getConfig( + ValidatedJsonUtils.parseOne(IcebergV2Specification::class.java, Files.readString(path)) + ) + + fun getConfig(spec: ConfigurationSpecification) = + IcebergV2ConfigurationFactory().makeWithoutExceptionHandling(spec as IcebergV2Specification) + + fun getCatalog(config: IcebergV2Configuration) = + IcebergUtil(SimpleTableIdGenerator()).let { icebergUtil -> + val props = icebergUtil.toCatalogProperties(config) + icebergUtil.createCatalog(DEFAULT_CATALOG_NAME, props) + } } diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt index 1f227440dfc9..573bd5e59ce7 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt @@ -6,22 +6,29 @@ package io.airbyte.integrations.destination.iceberg.v2 import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.ObjectMapper +import io.airbyte.cdk.load.test.util.DestinationCleaner import io.airbyte.cdk.load.test.util.NoopDestinationCleaner import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest import io.airbyte.cdk.load.write.StronglyTyped +import java.nio.file.Files import java.util.Base64 -import okhttp3.* +import okhttp3.FormBody +import okhttp3.OkHttpClient +import okhttp3.Request import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test -abstract class IcebergV2WriteTest(configContents: String) : +abstract class IcebergV2WriteTest( + configContents: String, + destinationCleaner: DestinationCleaner, +) : BasicFunctionalityIntegrationTest( configContents, IcebergV2Specification::class.java, IcebergV2DataDumper, - NoopDestinationCleaner, + destinationCleaner, NoopExpectedRecordMapper, isStreamSchemaRetroactive = true, supportsDedup = false, @@ -32,19 +39,6 @@ abstract class IcebergV2WriteTest(configContents: String) : supportFileTransfer = false, allTypesBehavior = StronglyTyped(), ) { - companion object { - @JvmStatic - @BeforeAll - fun setup() { - NessieTestContainers.start() - } - } -} - -@Disabled( - "This is currently disabled until we are able to make it run via airbyte-ci. It works as expected locally" -) -class IcebergNessieMinioWriteTest : IcebergV2WriteTest(getConfig()) { @Test @Disabled( "Expected because we seem to be mapping timestamps to long when we should be mapping them to an OffsetDateTime" @@ -92,7 +86,27 @@ class IcebergNessieMinioWriteTest : IcebergV2WriteTest(getConfig()) { override fun testUnions() { super.testUnions() } +} +class IcebergGlueWriteTest : + IcebergV2WriteTest( + Files.readString(IcebergV2TestUtil.GLUE_CONFIG_PATH), + IcebergDestinationCleaner( + IcebergV2TestUtil.getCatalog( + IcebergV2TestUtil.parseConfig(IcebergV2TestUtil.GLUE_CONFIG_PATH) + ) + ), + ) + +@Disabled( + "This is currently disabled until we are able to make it run via airbyte-ci. It works as expected locally" +) +class IcebergNessieMinioWriteTest : + IcebergV2WriteTest( + getConfig(), + // we're writing to ephemeral testcontainers, so no need to clean up after ourselves + NoopDestinationCleaner + ) { companion object { private fun getToken(): String { val client = OkHttpClient() @@ -140,5 +154,11 @@ class IcebergNessieMinioWriteTest : IcebergV2WriteTest(getConfig()) { } """.trimIndent() } + + @JvmStatic + @BeforeAll + fun setup() { + NessieTestContainers.start() + } } } diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/resources/iceberg_dest_v2_minimal_required_config.json b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/resources/iceberg_dest_v2_minimal_required_config.json deleted file mode 100644 index eac8923528c5..000000000000 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/resources/iceberg_dest_v2_minimal_required_config.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "s3_bucket_name": "bucket", - "s3_bucket_region": "us-east-1", - "server_uri": "http://localhost:19120/api/v1", - "warehouse_location": "s3://demobucket/", - "main_branch_name": "main" -} diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt index 82279fbd0bef..10ada5c71dec 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt @@ -27,6 +27,7 @@ import io.airbyte.cdk.load.message.DestinationRecord.Meta.Companion.COLUMN_NAME_ import io.airbyte.cdk.load.message.DestinationRecord.Meta.Companion.COLUMN_NAME_AB_META import io.airbyte.cdk.load.message.DestinationRecord.Meta.Companion.COLUMN_NAME_AB_RAW_ID import io.airbyte.integrations.destination.iceberg.v2.IcebergV2Configuration +import io.airbyte.integrations.destination.iceberg.v2.SimpleTableIdGenerator import io.mockk.every import io.mockk.mockk import io.mockk.verify @@ -52,10 +53,11 @@ import org.junit.jupiter.api.assertThrows internal class IcebergUtilTest { private lateinit var icebergUtil: IcebergUtil + private val tableIdGenerator = SimpleTableIdGenerator() @BeforeEach fun setup() { - icebergUtil = IcebergUtil() + icebergUtil = IcebergUtil(tableIdGenerator) } @Test @@ -86,11 +88,13 @@ internal class IcebergUtilTest { every { create() } returns mockk() } val catalog: NessieCatalog = mockk { - every { buildTable(streamDescriptor.toIcebergTableIdentifier(), any()) } returns - tableBuilder + every { + buildTable(tableIdGenerator.toTableIdentifier(streamDescriptor), any()) + } returns tableBuilder every { createNamespace(any()) } returns Unit every { namespaceExists(any()) } returns false - every { tableExists(streamDescriptor.toIcebergTableIdentifier()) } returns false + every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns + false } val table = icebergUtil.createTable( @@ -101,7 +105,9 @@ internal class IcebergUtilTest { ) assertNotNull(table) verify(exactly = 1) { - catalog.createNamespace(streamDescriptor.toIcebergTableIdentifier().namespace()) + catalog.createNamespace( + tableIdGenerator.toTableIdentifier(streamDescriptor).namespace() + ) } verify(exactly = 1) { tableBuilder.create() } } @@ -119,10 +125,12 @@ internal class IcebergUtilTest { every { create() } returns mockk() } val catalog: NessieCatalog = mockk { - every { buildTable(streamDescriptor.toIcebergTableIdentifier(), any()) } returns - tableBuilder + every { + buildTable(tableIdGenerator.toTableIdentifier(streamDescriptor), any()) + } returns tableBuilder every { namespaceExists(any()) } returns true - every { tableExists(streamDescriptor.toIcebergTableIdentifier()) } returns false + every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns + false } val table = icebergUtil.createTable( @@ -133,7 +141,9 @@ internal class IcebergUtilTest { ) assertNotNull(table) verify(exactly = 0) { - catalog.createNamespace(streamDescriptor.toIcebergTableIdentifier().namespace()) + catalog.createNamespace( + tableIdGenerator.toTableIdentifier(streamDescriptor).namespace() + ) } verify(exactly = 1) { tableBuilder.create() } } @@ -144,9 +154,10 @@ internal class IcebergUtilTest { val streamDescriptor = DestinationStream.Descriptor("namespace", "name") val schema = Schema() val catalog: NessieCatalog = mockk { - every { loadTable(streamDescriptor.toIcebergTableIdentifier()) } returns mockk() + every { loadTable(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns + mockk() every { namespaceExists(any()) } returns true - every { tableExists(streamDescriptor.toIcebergTableIdentifier()) } returns true + every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns true } val table = icebergUtil.createTable( @@ -157,9 +168,13 @@ internal class IcebergUtilTest { ) assertNotNull(table) verify(exactly = 0) { - catalog.createNamespace(streamDescriptor.toIcebergTableIdentifier().namespace()) + catalog.createNamespace( + tableIdGenerator.toTableIdentifier(streamDescriptor).namespace() + ) + } + verify(exactly = 1) { + catalog.loadTable(tableIdGenerator.toTableIdentifier(streamDescriptor)) } - verify(exactly = 1) { catalog.loadTable(streamDescriptor.toIcebergTableIdentifier()) } } @Test