diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt index 509455c08435..56a7cdae05f0 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt @@ -63,7 +63,8 @@ abstract class IntegrationTest( @Suppress("DEPRECATION") private val randomSuffix = RandomStringUtils.randomAlphabetic(4) private val timestampString = - LocalDateTime.ofInstant(Instant.now(), ZoneOffset.UTC).format(randomizedNamespaceDateFormatter) + LocalDateTime.ofInstant(Instant.now(), ZoneOffset.UTC) + .format(randomizedNamespaceDateFormatter) // stream name doesn't need to be randomized, only the namespace. val randomizedNamespace = "test$timestampString$randomSuffix" @@ -274,10 +275,8 @@ abstract class IntegrationTest( fun isNamespaceOld(namespace: String, retentionDays: Long = 30): Boolean { val cleanupCutoffDate = LocalDate.now().minusDays(retentionDays) val matchResult = randomizedNamespaceRegex.find(namespace) - val namespaceCreationDate = LocalDate.parse( - matchResult!!.groupValues[1], - randomizedNamespaceDateFormatter - ) + val namespaceCreationDate = + LocalDate.parse(matchResult!!.groupValues[1], randomizedNamespaceDateFormatter) return namespaceCreationDate.isBefore(cleanupCutoffDate) } diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/TableIdGenerator.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/TableIdGenerator.kt index 5222c7a12d41..c52dfd06f44a 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/TableIdGenerator.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/TableIdGenerator.kt @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.integrations.destination.iceberg.v2 import io.airbyte.cdk.load.command.DestinationStream @@ -8,8 +12,8 @@ import org.apache.iceberg.catalog.Namespace import org.apache.iceberg.catalog.TableIdentifier /** - * Convert our internal stream descriptor to an Iceberg [TableIdentifier]. - * Implementations should handle catalog-specific naming restrictions. + * Convert our internal stream descriptor to an Iceberg [TableIdentifier]. Implementations should + * handle catalog-specific naming restrictions. */ // TODO accept default namespace in config as a val here interface TableIdGenerator { @@ -21,9 +25,7 @@ class SimpleTableIdGenerator : TableIdGenerator { tableIdOf(stream.namespace!!, stream.name) } -/** - * AWS Glue requires lowercase database+table names. - */ +/** AWS Glue requires lowercase database+table names. */ class GlueTableIdGenerator : TableIdGenerator { override fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier = tableIdOf(stream.namespace!!.lowercase(), stream.name.lowercase()) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt index e81def1da451..b760d8bac44d 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt @@ -41,9 +41,7 @@ import org.apache.iceberg.aws.AwsProperties import org.apache.iceberg.aws.s3.S3FileIO import org.apache.iceberg.aws.s3.S3FileIOProperties import org.apache.iceberg.catalog.Catalog -import org.apache.iceberg.catalog.Namespace import org.apache.iceberg.catalog.SupportsNamespaces -import org.apache.iceberg.catalog.TableIdentifier import org.apache.iceberg.data.Record import org.apache.iceberg.exceptions.AlreadyExistsException import org.projectnessie.client.NessieConfigConstants diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergDestinationCleaner.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergDestinationCleaner.kt index d6f98c1ede40..118fa0a5d825 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergDestinationCleaner.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergDestinationCleaner.kt @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.integrations.destination.iceberg.v2 import io.airbyte.cdk.load.test.util.DestinationCleaner @@ -9,7 +13,9 @@ import org.apache.iceberg.catalog.Namespace import org.apache.iceberg.catalog.SupportsNamespaces class IcebergDestinationCleaner(private val catalog: Catalog) : DestinationCleaner { - constructor(configuration: IcebergV2Configuration) : this( + constructor( + configuration: IcebergV2Configuration + ) : this( IcebergUtil(SimpleTableIdGenerator()).let { icebergUtil -> val props = icebergUtil.toCatalogProperties(configuration) icebergUtil.createCatalog(DEFAULT_CATALOG_NAME, props) @@ -18,15 +24,14 @@ class IcebergDestinationCleaner(private val catalog: Catalog) : DestinationClean override fun cleanup() { val namespaces: List = - (catalog as SupportsNamespaces) - .listNamespaces() - .filter { - val namespace = it.level(0) - randomizedNamespaceRegex.matches(namespace) && isNamespaceOld(namespace) - } + (catalog as SupportsNamespaces).listNamespaces().filter { + val namespace = it.level(0) + randomizedNamespaceRegex.matches(namespace) && isNamespaceOld(namespace) + } namespaces.forEach { namespace -> - catalog.listTables(namespace) - .forEach { table -> catalog.dropTable(table, /* purge = */ true) } + catalog.listTables(namespace).forEach { table -> + catalog.dropTable(table, /* purge = */ true) + } catalog.dropNamespace(namespace) } } diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt index 5183c24a62a7..7a755a5d4695 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt @@ -11,17 +11,13 @@ import io.airbyte.cdk.load.data.parquet.ParquetMapperPipelineFactory import io.airbyte.cdk.load.message.DestinationRecord import io.airbyte.cdk.load.test.util.DestinationDataDumper import io.airbyte.cdk.load.test.util.OutputRecord -import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange import java.math.BigDecimal import java.time.Instant import java.util.LinkedHashMap import java.util.UUID -import org.apache.hadoop.conf.Configuration -import org.apache.iceberg.catalog.TableIdentifier import org.apache.iceberg.data.IcebergGenerics import org.apache.iceberg.data.Record -import org.apache.iceberg.nessie.NessieCatalog object IcebergV2DataDumper : DestinationDataDumper { diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt index 07d00365d322..613e86fb6c1b 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt @@ -13,11 +13,14 @@ import java.nio.file.Path object IcebergV2TestUtil { // TODO this is just here as an example, we should remove it + add real configs - val MINIMAL_CONFIG_PATH: Path = Path.of(getResourceUri("iceberg_dest_v2_minimal_required_config.json")) + val MINIMAL_CONFIG_PATH: Path = + Path.of(getResourceUri("iceberg_dest_v2_minimal_required_config.json")) val GLUE_CONFIG_PATH: Path = Path.of("secrets/glue.json") fun parseConfig(path: Path) = - getConfig(ValidatedJsonUtils.parseOne(IcebergV2Specification::class.java, Files.readString(path))) + getConfig( + ValidatedJsonUtils.parseOne(IcebergV2Specification::class.java, Files.readString(path)) + ) fun getConfig(spec: ConfigurationSpecification) = IcebergV2ConfigurationFactory().makeWithoutExceptionHandling(spec as IcebergV2Specification) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt index 5357710e1bbb..f80b94b43240 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt @@ -11,14 +11,14 @@ import io.airbyte.cdk.load.test.util.NoopDestinationCleaner import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest import io.airbyte.cdk.load.write.StronglyTyped +import java.nio.file.Files +import java.util.Base64 import okhttp3.FormBody import okhttp3.OkHttpClient import okhttp3.Request import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test -import java.nio.file.Files -import java.util.Base64 abstract class IcebergV2WriteTest( configContents: String, @@ -88,19 +88,23 @@ abstract class IcebergV2WriteTest( } } -class IcebergGlueWriteTest : IcebergV2WriteTest( - Files.readString(IcebergV2TestUtil.GLUE_CONFIG_PATH), - IcebergDestinationCleaner(IcebergV2TestUtil.parseConfig(IcebergV2TestUtil.GLUE_CONFIG_PATH)), -) +class IcebergGlueWriteTest : + IcebergV2WriteTest( + Files.readString(IcebergV2TestUtil.GLUE_CONFIG_PATH), + IcebergDestinationCleaner( + IcebergV2TestUtil.parseConfig(IcebergV2TestUtil.GLUE_CONFIG_PATH) + ), + ) @Disabled( "This is currently disabled until we are able to make it run via airbyte-ci. It works as expected locally" ) -class IcebergNessieMinioWriteTest : IcebergV2WriteTest( - getConfig(), - // we're writing to ephemeral testcontainers, so no need to clean up after ourselves - NoopDestinationCleaner -) { +class IcebergNessieMinioWriteTest : + IcebergV2WriteTest( + getConfig(), + // we're writing to ephemeral testcontainers, so no need to clean up after ourselves + NoopDestinationCleaner + ) { companion object { private fun getToken(): String { val client = OkHttpClient() diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt index fbd85281e0b3..10ada5c71dec 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt @@ -88,11 +88,13 @@ internal class IcebergUtilTest { every { create() } returns mockk() } val catalog: NessieCatalog = mockk { - every { buildTable(tableIdGenerator.toTableIdentifier(streamDescriptor), any()) } returns - tableBuilder + every { + buildTable(tableIdGenerator.toTableIdentifier(streamDescriptor), any()) + } returns tableBuilder every { createNamespace(any()) } returns Unit every { namespaceExists(any()) } returns false - every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns false + every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns + false } val table = icebergUtil.createTable( @@ -103,7 +105,9 @@ internal class IcebergUtilTest { ) assertNotNull(table) verify(exactly = 1) { - catalog.createNamespace(tableIdGenerator.toTableIdentifier(streamDescriptor).namespace()) + catalog.createNamespace( + tableIdGenerator.toTableIdentifier(streamDescriptor).namespace() + ) } verify(exactly = 1) { tableBuilder.create() } } @@ -121,10 +125,12 @@ internal class IcebergUtilTest { every { create() } returns mockk() } val catalog: NessieCatalog = mockk { - every { buildTable(tableIdGenerator.toTableIdentifier(streamDescriptor), any()) } returns - tableBuilder + every { + buildTable(tableIdGenerator.toTableIdentifier(streamDescriptor), any()) + } returns tableBuilder every { namespaceExists(any()) } returns true - every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns false + every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns + false } val table = icebergUtil.createTable( @@ -135,7 +141,9 @@ internal class IcebergUtilTest { ) assertNotNull(table) verify(exactly = 0) { - catalog.createNamespace(tableIdGenerator.toTableIdentifier(streamDescriptor).namespace()) + catalog.createNamespace( + tableIdGenerator.toTableIdentifier(streamDescriptor).namespace() + ) } verify(exactly = 1) { tableBuilder.create() } } @@ -146,7 +154,8 @@ internal class IcebergUtilTest { val streamDescriptor = DestinationStream.Descriptor("namespace", "name") val schema = Schema() val catalog: NessieCatalog = mockk { - every { loadTable(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns mockk() + every { loadTable(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns + mockk() every { namespaceExists(any()) } returns true every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns true } @@ -159,9 +168,13 @@ internal class IcebergUtilTest { ) assertNotNull(table) verify(exactly = 0) { - catalog.createNamespace(tableIdGenerator.toTableIdentifier(streamDescriptor).namespace()) + catalog.createNamespace( + tableIdGenerator.toTableIdentifier(streamDescriptor).namespace() + ) + } + verify(exactly = 1) { + catalog.loadTable(tableIdGenerator.toTableIdentifier(streamDescriptor)) } - verify(exactly = 1) { catalog.loadTable(tableIdGenerator.toTableIdentifier(streamDescriptor)) } } @Test