From 9a6881d7c13c485fe1037caf18408a7a780b4d19 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Fri, 13 Dec 2024 14:42:14 -0800 Subject: [PATCH 01/15] log exception correctly --- .../kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt index 8a01b0685b60..b88d5b4dd089 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt @@ -148,7 +148,7 @@ class DefaultDestinationTaskLauncher( log.info { "Task $innerTask was cancelled." } throw e } catch (e: Exception) { - log.error { "Caught exception in task $innerTask: $e" } + log.error(e) { "Caught exception in task $innerTask" } handleException(e) } } From 7c6699ecbf6aa36ba65b777924dfda04c93b37f1 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Fri, 13 Dec 2024 14:42:34 -0800 Subject: [PATCH 02/15] start writing things --- .../iceberg/v2/IcebergV2CheckTest.kt | 4 ++-- .../iceberg/v2/IcebergV2TestUtil.kt | 12 +++++----- .../iceberg/v2/IcebergV2WriteTest.kt | 23 +++++++++++++++++-- 3 files changed, 29 insertions(+), 10 deletions(-) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2CheckTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2CheckTest.kt index 3c619ea0ea98..5ccc6136e541 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2CheckTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2CheckTest.kt @@ -6,13 +6,13 @@ package io.airbyte.integrations.destination.iceberg.v2 import io.airbyte.cdk.load.check.CheckIntegrationTest import io.airbyte.cdk.load.check.CheckTestConfig -import io.airbyte.integrations.destination.iceberg.v2.IcebergV2TestUtil.PATH +import io.airbyte.integrations.destination.iceberg.v2.IcebergV2TestUtil.MINIMAL_CONFIG_PATH import org.junit.jupiter.api.Disabled @Disabled class IcebergV2CheckTest : CheckIntegrationTest( - successConfigFilenames = listOf(CheckTestConfig(PATH)), + successConfigFilenames = listOf(CheckTestConfig(MINIMAL_CONFIG_PATH)), // TODO we maybe should add some configs that are expected to fail `check` failConfigFilenamesAndFailureReasons = mapOf(), ) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt index 8de9fa59ca90..37a865ace206 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt @@ -4,15 +4,15 @@ package io.airbyte.integrations.destination.iceberg.v2 -import java.nio.file.Files +import java.net.URI import java.nio.file.Path object IcebergV2TestUtil { // TODO this is just here as an example, we should remove it + add real configs - private val resource = - this::class.java.classLoader.getResource("iceberg_dest_v2_minimal_required_config.json") - ?: throw IllegalArgumentException("File not found in resources") - val PATH: Path = Path.of(resource.toURI()) + val MINIMAL_CONFIG_PATH: Path = Path.of(getResourceUri("iceberg_dest_v2_minimal_required_config.json")) + val GLUE_CONFIG_PATH: Path = Path.of("secrets/glue.json") - fun getConfig(configPath: String): String = Files.readString(Path.of(configPath)) + private fun getResourceUri(path: String): URI = + this::class.java.classLoader.getResource(path)?.toURI() + ?: throw IllegalArgumentException("File not found in resources") } diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt index 1f227440dfc9..3c82d0115e1d 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt @@ -6,6 +6,7 @@ package io.airbyte.integrations.destination.iceberg.v2 import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.ObjectMapper +import io.airbyte.cdk.load.test.util.DestinationCleaner import io.airbyte.cdk.load.test.util.NoopDestinationCleaner import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest @@ -15,8 +16,12 @@ import okhttp3.* import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test +import java.nio.file.Files -abstract class IcebergV2WriteTest(configContents: String) : +abstract class IcebergV2WriteTest( + configContents: String, + destinationCleaner: DestinationCleaner, +) : BasicFunctionalityIntegrationTest( configContents, IcebergV2Specification::class.java, @@ -41,10 +46,24 @@ abstract class IcebergV2WriteTest(configContents: String) : } } +class IcebergGlueWriteTest : IcebergV2WriteTest( + Files.readString(IcebergV2TestUtil.GLUE_CONFIG_PATH), + NoopDestinationCleaner, +) { + @Test + override fun testBasicWrite() { + super.testBasicWrite() + } +} + @Disabled( "This is currently disabled until we are able to make it run via airbyte-ci. It works as expected locally" ) -class IcebergNessieMinioWriteTest : IcebergV2WriteTest(getConfig()) { +class IcebergNessieMinioWriteTest : IcebergV2WriteTest( + getConfig(), + // we're writing to ephemeral testcontainers, so no need to clean up after ourselves + NoopDestinationCleaner +) { @Test @Disabled( "Expected because we seem to be mapping timestamps to long when we should be mapping them to an OffsetDateTime" From 37c106242bc3934bd2f847a9932f87f2b5d62371 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Fri, 13 Dec 2024 16:42:05 -0800 Subject: [PATCH 03/15] add table id generator --- .../iceberg/v2/IcebergV2Checker.kt | 6 +-- .../iceberg/v2/TableIdGenerator.kt | 44 +++++++++++++++++++ .../destination/iceberg/v2/io/IcebergUtil.kt | 15 ++----- .../iceberg/v2/IcebergV2DataDumper.kt | 2 +- .../iceberg/v2/io/IcebergUtilTest.kt | 24 +++++----- 5 files changed, 64 insertions(+), 27 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/TableIdGenerator.kt diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Checker.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Checker.kt index bfa977196f4e..d790c27560cd 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Checker.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Checker.kt @@ -8,7 +8,6 @@ import io.airbyte.cdk.load.check.DestinationChecker import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableCleaner import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil -import io.airbyte.integrations.destination.iceberg.v2.io.toIcebergTableIdentifier import javax.inject.Singleton import org.apache.iceberg.Schema import org.apache.iceberg.types.Types @@ -16,7 +15,8 @@ import org.apache.iceberg.types.Types @Singleton class IcebergV2Checker( private val icebergTableCleaner: IcebergTableCleaner, - private val icebergUtil: IcebergUtil + private val icebergUtil: IcebergUtil, + private val tableIdGenerator: TableIdGenerator, ) : DestinationChecker { override fun check(config: IcebergV2Configuration) { @@ -43,7 +43,7 @@ class IcebergV2Checker( icebergTableCleaner.clearTable( catalog, - testTableIdentifier.toIcebergTableIdentifier(), + tableIdGenerator.toTableIdentifier(testTableIdentifier), table.io(), table.location() ) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/TableIdGenerator.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/TableIdGenerator.kt new file mode 100644 index 000000000000..5222c7a12d41 --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/TableIdGenerator.kt @@ -0,0 +1,44 @@ +package io.airbyte.integrations.destination.iceberg.v2 + +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.command.iceberg.parquet.GlueCatalogConfiguration +import io.micronaut.context.annotation.Factory +import javax.inject.Singleton +import org.apache.iceberg.catalog.Namespace +import org.apache.iceberg.catalog.TableIdentifier + +/** + * Convert our internal stream descriptor to an Iceberg [TableIdentifier]. + * Implementations should handle catalog-specific naming restrictions. + */ +// TODO accept default namespace in config as a val here +interface TableIdGenerator { + fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier +} + +class SimpleTableIdGenerator : TableIdGenerator { + override fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier = + tableIdOf(stream.namespace!!, stream.name) +} + +/** + * AWS Glue requires lowercase database+table names. + */ +class GlueTableIdGenerator : TableIdGenerator { + override fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier = + tableIdOf(stream.namespace!!.lowercase(), stream.name.lowercase()) +} + +@Factory +class TableIdGeneratorFactory(private val icebergConfiguration: IcebergV2Configuration) { + @Singleton + fun create() = + when (icebergConfiguration.icebergCatalogConfiguration.catalogConfiguration) { + is GlueCatalogConfiguration -> GlueTableIdGenerator() + else -> SimpleTableIdGenerator() + } +} + +// iceberg namespace+name must both be nonnull. +private fun tableIdOf(namespace: String, name: String) = + TableIdentifier.of(Namespace.of(namespace), name) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt index 3771af1b36ac..e516b02ef1ca 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt @@ -20,6 +20,7 @@ import io.airbyte.integrations.destination.iceberg.v2.ACCESS_KEY_ID import io.airbyte.integrations.destination.iceberg.v2.GlueCredentialsProvider import io.airbyte.integrations.destination.iceberg.v2.IcebergV2Configuration import io.airbyte.integrations.destination.iceberg.v2.SECRET_ACCESS_KEY +import io.airbyte.integrations.destination.iceberg.v2.TableIdGenerator import io.github.oshai.kotlinlogging.KotlinLogging import jakarta.inject.Singleton import org.apache.hadoop.conf.Configuration @@ -51,19 +52,9 @@ private val logger = KotlinLogging.logger {} const val AIRBYTE_CDC_DELETE_COLUMN = "_ab_cdc_deleted_at" -/** - * Extension function for the[DestinationStream.Descriptor] class that converts the descriptor to an - * Iceberg [TableIdentifier]. - * - * @return An Iceberg [TableIdentifier] representation of the stream descriptor. - */ -fun DestinationStream.Descriptor.toIcebergTableIdentifier(): TableIdentifier { - return TableIdentifier.of(Namespace.of(this.namespace), this.name) -} - /** Collection of Iceberg related utilities. */ @Singleton -class IcebergUtil { +class IcebergUtil(private val tableIdGenerator: TableIdGenerator) { internal class InvalidFormatException(message: String) : Exception(message) private val generationIdRegex = Regex("""ab-generation-id-\d+-e""") @@ -117,7 +108,7 @@ class IcebergUtil { schema: Schema, properties: Map ): Table { - val tableIdentifier = streamDescriptor.toIcebergTableIdentifier() + val tableIdentifier = tableIdGenerator.toTableIdentifier(streamDescriptor) synchronized(tableIdentifier.namespace()) { if ( catalog is SupportsNamespaces && diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt index 5689320eedbb..5bf76c748afd 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt @@ -133,7 +133,7 @@ object IcebergV2DataDumper : DestinationDataDumper { } private fun getNessieCatalog(config: IcebergV2Configuration): NessieCatalog { - val catalogProperties = IcebergUtil().toCatalogProperties(config) + val catalogProperties = IcebergUtil(SimpleTableIdGenerator()).toCatalogProperties(config) val catalog = NessieCatalog() catalog.setConf(Configuration()) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt index 82279fbd0bef..fbd85281e0b3 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt @@ -27,6 +27,7 @@ import io.airbyte.cdk.load.message.DestinationRecord.Meta.Companion.COLUMN_NAME_ import io.airbyte.cdk.load.message.DestinationRecord.Meta.Companion.COLUMN_NAME_AB_META import io.airbyte.cdk.load.message.DestinationRecord.Meta.Companion.COLUMN_NAME_AB_RAW_ID import io.airbyte.integrations.destination.iceberg.v2.IcebergV2Configuration +import io.airbyte.integrations.destination.iceberg.v2.SimpleTableIdGenerator import io.mockk.every import io.mockk.mockk import io.mockk.verify @@ -52,10 +53,11 @@ import org.junit.jupiter.api.assertThrows internal class IcebergUtilTest { private lateinit var icebergUtil: IcebergUtil + private val tableIdGenerator = SimpleTableIdGenerator() @BeforeEach fun setup() { - icebergUtil = IcebergUtil() + icebergUtil = IcebergUtil(tableIdGenerator) } @Test @@ -86,11 +88,11 @@ internal class IcebergUtilTest { every { create() } returns mockk() } val catalog: NessieCatalog = mockk { - every { buildTable(streamDescriptor.toIcebergTableIdentifier(), any()) } returns + every { buildTable(tableIdGenerator.toTableIdentifier(streamDescriptor), any()) } returns tableBuilder every { createNamespace(any()) } returns Unit every { namespaceExists(any()) } returns false - every { tableExists(streamDescriptor.toIcebergTableIdentifier()) } returns false + every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns false } val table = icebergUtil.createTable( @@ -101,7 +103,7 @@ internal class IcebergUtilTest { ) assertNotNull(table) verify(exactly = 1) { - catalog.createNamespace(streamDescriptor.toIcebergTableIdentifier().namespace()) + catalog.createNamespace(tableIdGenerator.toTableIdentifier(streamDescriptor).namespace()) } verify(exactly = 1) { tableBuilder.create() } } @@ -119,10 +121,10 @@ internal class IcebergUtilTest { every { create() } returns mockk() } val catalog: NessieCatalog = mockk { - every { buildTable(streamDescriptor.toIcebergTableIdentifier(), any()) } returns + every { buildTable(tableIdGenerator.toTableIdentifier(streamDescriptor), any()) } returns tableBuilder every { namespaceExists(any()) } returns true - every { tableExists(streamDescriptor.toIcebergTableIdentifier()) } returns false + every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns false } val table = icebergUtil.createTable( @@ -133,7 +135,7 @@ internal class IcebergUtilTest { ) assertNotNull(table) verify(exactly = 0) { - catalog.createNamespace(streamDescriptor.toIcebergTableIdentifier().namespace()) + catalog.createNamespace(tableIdGenerator.toTableIdentifier(streamDescriptor).namespace()) } verify(exactly = 1) { tableBuilder.create() } } @@ -144,9 +146,9 @@ internal class IcebergUtilTest { val streamDescriptor = DestinationStream.Descriptor("namespace", "name") val schema = Schema() val catalog: NessieCatalog = mockk { - every { loadTable(streamDescriptor.toIcebergTableIdentifier()) } returns mockk() + every { loadTable(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns mockk() every { namespaceExists(any()) } returns true - every { tableExists(streamDescriptor.toIcebergTableIdentifier()) } returns true + every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns true } val table = icebergUtil.createTable( @@ -157,9 +159,9 @@ internal class IcebergUtilTest { ) assertNotNull(table) verify(exactly = 0) { - catalog.createNamespace(streamDescriptor.toIcebergTableIdentifier().namespace()) + catalog.createNamespace(tableIdGenerator.toTableIdentifier(streamDescriptor).namespace()) } - verify(exactly = 1) { catalog.loadTable(streamDescriptor.toIcebergTableIdentifier()) } + verify(exactly = 1) { catalog.loadTable(tableIdGenerator.toTableIdentifier(streamDescriptor)) } } @Test From bdfd7ce64e10af321235793f5b9c5568e58b5216 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 16 Dec 2024 10:03:27 -0800 Subject: [PATCH 04/15] basic cleaner --- .../cdk/load/test/util/IntegrationTest.kt | 6 ++- .../iceberg/v2/IcebergDestinationCleaner.kt | 43 +++++++++++++++++++ .../iceberg/v2/IcebergV2TestUtil.kt | 7 +++ .../iceberg/v2/IcebergV2WriteTest.kt | 26 +++++------ 4 files changed, 67 insertions(+), 15 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergDestinationCleaner.kt diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt index a8e91bf95cd6..3554e76fc79a 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt @@ -62,8 +62,7 @@ abstract class IntegrationTest( @Suppress("DEPRECATION") private val randomSuffix = RandomStringUtils.randomAlphabetic(4) private val timestampString = - LocalDateTime.ofInstant(Instant.now(), ZoneOffset.UTC) - .format(DateTimeFormatter.ofPattern("YYYYMMDD")) + LocalDateTime.ofInstant(Instant.now(), ZoneOffset.UTC).format(randomizedNamespaceDateFormatter) // stream name doesn't need to be randomized, only the namespace. val randomizedNamespace = "test$timestampString$randomSuffix" @@ -262,6 +261,9 @@ abstract class IntegrationTest( } companion object { + val randomizedNamespaceRegex = Regex("test(\\d{8})[A-Za-z]{4}") + val randomizedNamespaceDateFormatter = DateTimeFormatter.ofPattern("yyyyMMdd") + private val hasRunCleaner = AtomicBoolean(false) // Connectors are calling System.getenv rather than using micronaut-y properties, diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergDestinationCleaner.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergDestinationCleaner.kt new file mode 100644 index 000000000000..4dd84b9a3797 --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergDestinationCleaner.kt @@ -0,0 +1,43 @@ +package io.airbyte.integrations.destination.iceberg.v2 + +import io.airbyte.cdk.load.test.util.DestinationCleaner +import io.airbyte.cdk.load.test.util.IntegrationTest.Companion.randomizedNamespaceDateFormatter +import io.airbyte.cdk.load.test.util.IntegrationTest.Companion.randomizedNamespaceRegex +import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil +import java.time.LocalDate +import org.apache.iceberg.catalog.Catalog +import org.apache.iceberg.catalog.Namespace +import org.apache.iceberg.catalog.SupportsNamespaces + +class IcebergDestinationCleaner(private val catalog: Catalog) : DestinationCleaner { + constructor(configuration: IcebergV2Configuration) : this( + IcebergUtil(SimpleTableIdGenerator()).let { icebergUtil -> + val props = icebergUtil.toCatalogProperties(configuration) + icebergUtil.createCatalog(DEFAULT_CATALOG_NAME, props) + } + ) + + override fun cleanup() { + val cleanupCutoffDate = LocalDate.now().minusDays(15) + val namespaces: List = + (catalog as SupportsNamespaces) + .listNamespaces() + .filter { + val matchResult = randomizedNamespaceRegex.find(it.level(0)) + if (matchResult == null) { + false + } else { + val namespaceCreationDate = LocalDate.parse( + matchResult.groupValues[1], + randomizedNamespaceDateFormatter + ) + namespaceCreationDate.isBefore(cleanupCutoffDate) + } + } + namespaces.forEach { namespace -> + catalog.listTables(namespace) + .forEach { table -> catalog.dropTable(table, /* purge = */ true) } + catalog.dropNamespace(namespace) + } + } +} diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt index 37a865ace206..f87741015f4f 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt @@ -4,7 +4,9 @@ package io.airbyte.integrations.destination.iceberg.v2 +import io.airbyte.cdk.command.ValidatedJsonUtils import java.net.URI +import java.nio.file.Files import java.nio.file.Path object IcebergV2TestUtil { @@ -12,6 +14,11 @@ object IcebergV2TestUtil { val MINIMAL_CONFIG_PATH: Path = Path.of(getResourceUri("iceberg_dest_v2_minimal_required_config.json")) val GLUE_CONFIG_PATH: Path = Path.of("secrets/glue.json") + fun parseConfig(path: Path) = + IcebergV2ConfigurationFactory().makeWithoutExceptionHandling( + ValidatedJsonUtils.parseOne(IcebergV2Specification::class.java, Files.readString(path)) + ) + private fun getResourceUri(path: String): URI = this::class.java.classLoader.getResource(path)?.toURI() ?: throw IllegalArgumentException("File not found in resources") diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt index 3c82d0115e1d..fe9f19408484 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt @@ -11,12 +11,14 @@ import io.airbyte.cdk.load.test.util.NoopDestinationCleaner import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest import io.airbyte.cdk.load.write.StronglyTyped -import java.util.Base64 -import okhttp3.* +import okhttp3.FormBody +import okhttp3.OkHttpClient +import okhttp3.Request import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test import java.nio.file.Files +import java.util.Base64 abstract class IcebergV2WriteTest( configContents: String, @@ -26,7 +28,7 @@ abstract class IcebergV2WriteTest( configContents, IcebergV2Specification::class.java, IcebergV2DataDumper, - NoopDestinationCleaner, + destinationCleaner, NoopExpectedRecordMapper, isStreamSchemaRetroactive = true, supportsDedup = false, @@ -36,19 +38,11 @@ abstract class IcebergV2WriteTest( commitDataIncrementally = false, supportFileTransfer = false, allTypesBehavior = StronglyTyped(), - ) { - companion object { - @JvmStatic - @BeforeAll - fun setup() { - NessieTestContainers.start() - } - } -} + ) class IcebergGlueWriteTest : IcebergV2WriteTest( Files.readString(IcebergV2TestUtil.GLUE_CONFIG_PATH), - NoopDestinationCleaner, + IcebergDestinationCleaner(IcebergV2TestUtil.parseConfig(IcebergV2TestUtil.GLUE_CONFIG_PATH)), ) { @Test override fun testBasicWrite() { @@ -159,5 +153,11 @@ class IcebergNessieMinioWriteTest : IcebergV2WriteTest( } """.trimIndent() } + + @JvmStatic + @BeforeAll + fun setup() { + NessieTestContainers.start() + } } } From 77b4855b8d265bc8e6aa1e82d8176885a5821d3e Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 16 Dec 2024 10:22:42 -0800 Subject: [PATCH 05/15] get correct catalog instance --- .../iceberg/v2/IcebergV2DataDumper.kt | 21 ++++++------------- .../iceberg/v2/IcebergV2TestUtil.kt | 15 ++++++++++--- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt index 5bf76c748afd..d3fe9f0173a9 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt @@ -83,15 +83,13 @@ object IcebergV2DataDumper : DestinationDataDumper { spec: ConfigurationSpecification, stream: DestinationStream ): List { - val config = - IcebergV2ConfigurationFactory() - .makeWithoutExceptionHandling(spec as IcebergV2Specification) + val config = IcebergV2TestUtil.getConfig(spec) val pipeline = ParquetMapperPipelineFactory().create(stream) val schema = pipeline.finalSchema as ObjectType - val catalog = getNessieCatalog(config) + val catalog = IcebergV2TestUtil.getCatalog(config) val table = catalog.loadTable( - TableIdentifier.of(stream.descriptor.namespace, stream.descriptor.name) + TableIdGeneratorFactory(config).create().toTableIdentifier(stream.descriptor) ) val outputRecords = mutableListOf() @@ -121,7 +119,9 @@ object IcebergV2DataDumper : DestinationDataDumper { } } - catalog.close() + if (catalog is AutoCloseable) { + catalog.close() + } return outputRecords } @@ -131,13 +131,4 @@ object IcebergV2DataDumper : DestinationDataDumper { ): List { throw NotImplementedError("Iceberg doesn't support universal file transfer") } - - private fun getNessieCatalog(config: IcebergV2Configuration): NessieCatalog { - val catalogProperties = IcebergUtil(SimpleTableIdGenerator()).toCatalogProperties(config) - - val catalog = NessieCatalog() - catalog.setConf(Configuration()) - catalog.initialize("nessie", catalogProperties) - return catalog - } } diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt index f87741015f4f..07d00365d322 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt @@ -4,7 +4,9 @@ package io.airbyte.integrations.destination.iceberg.v2 +import io.airbyte.cdk.command.ConfigurationSpecification import io.airbyte.cdk.command.ValidatedJsonUtils +import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil import java.net.URI import java.nio.file.Files import java.nio.file.Path @@ -15,9 +17,16 @@ object IcebergV2TestUtil { val GLUE_CONFIG_PATH: Path = Path.of("secrets/glue.json") fun parseConfig(path: Path) = - IcebergV2ConfigurationFactory().makeWithoutExceptionHandling( - ValidatedJsonUtils.parseOne(IcebergV2Specification::class.java, Files.readString(path)) - ) + getConfig(ValidatedJsonUtils.parseOne(IcebergV2Specification::class.java, Files.readString(path))) + + fun getConfig(spec: ConfigurationSpecification) = + IcebergV2ConfigurationFactory().makeWithoutExceptionHandling(spec as IcebergV2Specification) + + fun getCatalog(config: IcebergV2Configuration) = + IcebergUtil(SimpleTableIdGenerator()).let { icebergUtil -> + val props = icebergUtil.toCatalogProperties(config) + icebergUtil.createCatalog(DEFAULT_CATALOG_NAME, props) + } private fun getResourceUri(path: String): URI = this::class.java.classLoader.getResource(path)?.toURI() From 5a540c83cf3e2aade99864d7dd2d3fef8591ddca Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 16 Dec 2024 10:25:47 -0800 Subject: [PATCH 06/15] coment --- .../integrations/destination/iceberg/v2/IcebergV2DataDumper.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt index d3fe9f0173a9..5183c24a62a7 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt @@ -119,6 +119,7 @@ object IcebergV2DataDumper : DestinationDataDumper { } } + // some catalogs (e.g. Nessie) have a close() method. Call it here. if (catalog is AutoCloseable) { catalog.close() } From d7d7f405f6667bd138048049c3129910d5bd1077 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 16 Dec 2024 10:40:11 -0800 Subject: [PATCH 07/15] disable tests everywhere --- .../iceberg/v2/IcebergV2WriteTest.kt | 35 ++++++++----------- 1 file changed, 15 insertions(+), 20 deletions(-) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt index fe9f19408484..5357710e1bbb 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt @@ -38,26 +38,7 @@ abstract class IcebergV2WriteTest( commitDataIncrementally = false, supportFileTransfer = false, allTypesBehavior = StronglyTyped(), - ) - -class IcebergGlueWriteTest : IcebergV2WriteTest( - Files.readString(IcebergV2TestUtil.GLUE_CONFIG_PATH), - IcebergDestinationCleaner(IcebergV2TestUtil.parseConfig(IcebergV2TestUtil.GLUE_CONFIG_PATH)), -) { - @Test - override fun testBasicWrite() { - super.testBasicWrite() - } -} - -@Disabled( - "This is currently disabled until we are able to make it run via airbyte-ci. It works as expected locally" -) -class IcebergNessieMinioWriteTest : IcebergV2WriteTest( - getConfig(), - // we're writing to ephemeral testcontainers, so no need to clean up after ourselves - NoopDestinationCleaner -) { + ) { @Test @Disabled( "Expected because we seem to be mapping timestamps to long when we should be mapping them to an OffsetDateTime" @@ -105,7 +86,21 @@ class IcebergNessieMinioWriteTest : IcebergV2WriteTest( override fun testUnions() { super.testUnions() } +} + +class IcebergGlueWriteTest : IcebergV2WriteTest( + Files.readString(IcebergV2TestUtil.GLUE_CONFIG_PATH), + IcebergDestinationCleaner(IcebergV2TestUtil.parseConfig(IcebergV2TestUtil.GLUE_CONFIG_PATH)), +) +@Disabled( + "This is currently disabled until we are able to make it run via airbyte-ci. It works as expected locally" +) +class IcebergNessieMinioWriteTest : IcebergV2WriteTest( + getConfig(), + // we're writing to ephemeral testcontainers, so no need to clean up after ourselves + NoopDestinationCleaner +) { companion object { private fun getToken(): String { val client = OkHttpClient() From 5c073f9b1becbfdf403e104b944418a112064776 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 16 Dec 2024 10:55:24 -0800 Subject: [PATCH 08/15] handle concurrent nonsense --- .../command/iceberg/parquet/IcebergCatalogSpecifications.kt | 2 +- .../integrations/destination/iceberg/v2/io/IcebergUtil.kt | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/command/iceberg/parquet/IcebergCatalogSpecifications.kt b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/command/iceberg/parquet/IcebergCatalogSpecifications.kt index 49247d0c2100..6c93e70d1e6e 100644 --- a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/command/iceberg/parquet/IcebergCatalogSpecifications.kt +++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/command/iceberg/parquet/IcebergCatalogSpecifications.kt @@ -181,7 +181,7 @@ class GlueCatalogSpecification( @get:JsonPropertyDescription( "The AWS Account ID associated with the Glue service used by the Iceberg catalog." ) - @get:JsonProperty("glue_id") + @JsonProperty("glue_id") @JsonSchemaInject(json = """{"order":1}""") val glueId: String, ) : CatalogType(catalogType) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt index e516b02ef1ca..e81def1da451 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt @@ -47,6 +47,7 @@ import org.apache.iceberg.catalog.TableIdentifier import org.apache.iceberg.data.Record import org.apache.iceberg.exceptions.AlreadyExistsException import org.projectnessie.client.NessieConfigConstants +import software.amazon.awssdk.services.glue.model.ConcurrentModificationException private val logger = KotlinLogging.logger {} @@ -126,6 +127,11 @@ class IcebergUtil(private val tableIdGenerator: TableIdGenerator) { logger.info { "Namespace '${tableIdentifier.namespace()}' was likely created by another thread during parallel operations." } + } catch (e: ConcurrentModificationException) { + // do the same for AWS Glue + logger.info { + "Namespace '${tableIdentifier.namespace()}' was likely created by another thread during parallel operations." + } } } } From a33418848a8bb99148c2932b482d407e70f4638f Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 16 Dec 2024 10:59:22 -0800 Subject: [PATCH 09/15] add secret --- .../destination-iceberg-v2/metadata.yaml | 54 +------------------ 1 file changed, 2 insertions(+), 52 deletions(-) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml b/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml index 831b9aade4e0..f3cf9932cf12 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml +++ b/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml @@ -26,59 +26,9 @@ data: - suite: unitTests - suite: integrationTests testSecrets: - - name: SECRET_DESTINATION-S3-V2-MINIMAL-REQUIRED-CONFIG - fileName: s3_dest_v2_minimal_required_config.json + - name: SECRET_DESTINATION-ICEBERG_V2_S3_GLUE_CONFIG + fileName: glue.json secretStore: type: GSM alias: airbyte-connector-testing-secret-store -# - name: SECRET_DESTINATION-S3-V2-JSONL-ROOT-LEVEL-FLATTENING -# fileName: s3_dest_v2_jsonl_root_level_flattening_config.json -# secretStore: -# type: GSM -# alias: airbyte-connector-testing-secret-store -# - name: SECRET_DESTINATION-S3-V2-JSONL-GZIP -# fileName: s3_dest_v2_jsonl_gzip_config.json -# secretStore: -# type: GSM -# alias: airbyte-connector-testing-secret-store -# - name: SECRET_DESTINATION-S3-V2-JSONL-STAGING -# fileName: s3_dest_v2_jsonl_staging_config.json -# secretStore: -# type: GSM -# alias: airbyte-connector-testing-secret-store -# - name: SECRET_DESTINATION-S3-V2-CSV -# fileName: s3_dest_v2_csv_config.json -# secretStore: -# type: GSM -# alias: airbyte-connector-testing-secret-store -# - name: SECRET_DESTINATION-S3-V2-CSV-ROOT-LEVEL-FLATTENING -# fileName: s3_dest_v2_csv_root_level_flattening_config.json -# secretStore: -# type: GSM -# alias: airbyte-connector-testing-secret-store -# - name: SECRET_DESTINATION-S3-V2-CSV-GZIP -# fileName: s3_dest_v2_csv_gzip_config.json -# secretStore: -# type: GSM -# alias: airbyte-connector-testing-secret-store -# - name: SECRET_DESTINATION-S3-V2-AVRO -# fileName: s3_dest_v2_avro_config.json -# secretStore: -# type: GSM -# alias: airbyte-connector-testing-secret-store -# - name: SECRET_DESTINATION-S3-V2-AVRO-BZIP2 -# fileName: s3_dest_v2_avro_bzip2_config.json -# secretStore: -# type: GSM -# alias: airbyte-connector-testing-secret-store -# - name: SECRET_DESTINATION-S3-V2-PARQUET -# fileName: s3_dest_v2_parquet_config.json -# secretStore: -# type: GSM -# alias: airbyte-connector-testing-secret-store -# - name: SECRET_DESTINATION-S3-V2-PARQUET-SNAPPY -# fileName: s3_dest_v2_parquet_snappy_config.json -# secretStore: -# type: GSM -# alias: airbyte-connector-testing-secret-store metadataSpecVersion: "1.0" From 50e3b2ea2d78c4ae1dda2f19e6662297ef092634 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 16 Dec 2024 11:20:34 -0800 Subject: [PATCH 10/15] minor refactor --- .../cdk/load/test/util/DestinationCleaner.kt | 4 ++++ .../cdk/load/test/util/IntegrationTest.kt | 19 ++++++++++++++++++- .../iceberg/v2/IcebergDestinationCleaner.kt | 16 +++------------- 3 files changed, 25 insertions(+), 14 deletions(-) diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/DestinationCleaner.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/DestinationCleaner.kt index 1ab6c2e20f10..84375e28ea31 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/DestinationCleaner.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/DestinationCleaner.kt @@ -8,6 +8,10 @@ fun interface DestinationCleaner { /** * Search the test destination for old test data and delete it. This should leave recent data * (e.g. from the last week) untouched, to avoid causing failures in actively-running tests. + * + * Implementers should generally list all namespaces in the destination, filter for namespace + * which match [IntegrationTest.randomizedNamespaceRegex], and then use + * [IntegrationTest.isNamespaceOld] to filter down to namespaces which can be deleted. */ fun cleanup() } diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt index 3554e76fc79a..509455c08435 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt @@ -19,6 +19,7 @@ import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.AirbyteStateMessage import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus import java.time.Instant +import java.time.LocalDate import java.time.LocalDateTime import java.time.ZoneOffset import java.time.format.DateTimeFormatter @@ -262,7 +263,23 @@ abstract class IntegrationTest( companion object { val randomizedNamespaceRegex = Regex("test(\\d{8})[A-Za-z]{4}") - val randomizedNamespaceDateFormatter = DateTimeFormatter.ofPattern("yyyyMMdd") + val randomizedNamespaceDateFormatter: DateTimeFormatter = + DateTimeFormatter.ofPattern("yyyyMMdd") + + /** + * Given a randomizedNamespace (such as `test20241216abcd`), return whether the namespace + * was created more than [retentionDays] days ago, and therefore should be deleted by a + * [DestinationCleaner]. + */ + fun isNamespaceOld(namespace: String, retentionDays: Long = 30): Boolean { + val cleanupCutoffDate = LocalDate.now().minusDays(retentionDays) + val matchResult = randomizedNamespaceRegex.find(namespace) + val namespaceCreationDate = LocalDate.parse( + matchResult!!.groupValues[1], + randomizedNamespaceDateFormatter + ) + return namespaceCreationDate.isBefore(cleanupCutoffDate) + } private val hasRunCleaner = AtomicBoolean(false) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergDestinationCleaner.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergDestinationCleaner.kt index 4dd84b9a3797..d6f98c1ede40 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergDestinationCleaner.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergDestinationCleaner.kt @@ -1,10 +1,9 @@ package io.airbyte.integrations.destination.iceberg.v2 import io.airbyte.cdk.load.test.util.DestinationCleaner -import io.airbyte.cdk.load.test.util.IntegrationTest.Companion.randomizedNamespaceDateFormatter +import io.airbyte.cdk.load.test.util.IntegrationTest.Companion.isNamespaceOld import io.airbyte.cdk.load.test.util.IntegrationTest.Companion.randomizedNamespaceRegex import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil -import java.time.LocalDate import org.apache.iceberg.catalog.Catalog import org.apache.iceberg.catalog.Namespace import org.apache.iceberg.catalog.SupportsNamespaces @@ -18,21 +17,12 @@ class IcebergDestinationCleaner(private val catalog: Catalog) : DestinationClean ) override fun cleanup() { - val cleanupCutoffDate = LocalDate.now().minusDays(15) val namespaces: List = (catalog as SupportsNamespaces) .listNamespaces() .filter { - val matchResult = randomizedNamespaceRegex.find(it.level(0)) - if (matchResult == null) { - false - } else { - val namespaceCreationDate = LocalDate.parse( - matchResult.groupValues[1], - randomizedNamespaceDateFormatter - ) - namespaceCreationDate.isBefore(cleanupCutoffDate) - } + val namespace = it.level(0) + randomizedNamespaceRegex.matches(namespace) && isNamespaceOld(namespace) } namespaces.forEach { namespace -> catalog.listTables(namespace) From 6107ab9e2bdd71308b86a32aa296495df8e1762c Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 16 Dec 2024 11:42:36 -0800 Subject: [PATCH 11/15] format --- .../cdk/load/test/util/IntegrationTest.kt | 9 +++-- .../iceberg/v2/TableIdGenerator.kt | 12 ++++--- .../destination/iceberg/v2/io/IcebergUtil.kt | 2 -- .../iceberg/v2/IcebergDestinationCleaner.kt | 23 +++++++----- .../iceberg/v2/IcebergV2DataDumper.kt | 4 --- .../iceberg/v2/IcebergV2TestUtil.kt | 7 ++-- .../iceberg/v2/IcebergV2WriteTest.kt | 26 ++++++++------ .../iceberg/v2/io/IcebergUtilTest.kt | 35 +++++++++++++------ 8 files changed, 69 insertions(+), 49 deletions(-) diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt index 509455c08435..56a7cdae05f0 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt @@ -63,7 +63,8 @@ abstract class IntegrationTest( @Suppress("DEPRECATION") private val randomSuffix = RandomStringUtils.randomAlphabetic(4) private val timestampString = - LocalDateTime.ofInstant(Instant.now(), ZoneOffset.UTC).format(randomizedNamespaceDateFormatter) + LocalDateTime.ofInstant(Instant.now(), ZoneOffset.UTC) + .format(randomizedNamespaceDateFormatter) // stream name doesn't need to be randomized, only the namespace. val randomizedNamespace = "test$timestampString$randomSuffix" @@ -274,10 +275,8 @@ abstract class IntegrationTest( fun isNamespaceOld(namespace: String, retentionDays: Long = 30): Boolean { val cleanupCutoffDate = LocalDate.now().minusDays(retentionDays) val matchResult = randomizedNamespaceRegex.find(namespace) - val namespaceCreationDate = LocalDate.parse( - matchResult!!.groupValues[1], - randomizedNamespaceDateFormatter - ) + val namespaceCreationDate = + LocalDate.parse(matchResult!!.groupValues[1], randomizedNamespaceDateFormatter) return namespaceCreationDate.isBefore(cleanupCutoffDate) } diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/TableIdGenerator.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/TableIdGenerator.kt index 5222c7a12d41..c52dfd06f44a 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/TableIdGenerator.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/TableIdGenerator.kt @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.integrations.destination.iceberg.v2 import io.airbyte.cdk.load.command.DestinationStream @@ -8,8 +12,8 @@ import org.apache.iceberg.catalog.Namespace import org.apache.iceberg.catalog.TableIdentifier /** - * Convert our internal stream descriptor to an Iceberg [TableIdentifier]. - * Implementations should handle catalog-specific naming restrictions. + * Convert our internal stream descriptor to an Iceberg [TableIdentifier]. Implementations should + * handle catalog-specific naming restrictions. */ // TODO accept default namespace in config as a val here interface TableIdGenerator { @@ -21,9 +25,7 @@ class SimpleTableIdGenerator : TableIdGenerator { tableIdOf(stream.namespace!!, stream.name) } -/** - * AWS Glue requires lowercase database+table names. - */ +/** AWS Glue requires lowercase database+table names. */ class GlueTableIdGenerator : TableIdGenerator { override fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier = tableIdOf(stream.namespace!!.lowercase(), stream.name.lowercase()) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt index e81def1da451..b760d8bac44d 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt @@ -41,9 +41,7 @@ import org.apache.iceberg.aws.AwsProperties import org.apache.iceberg.aws.s3.S3FileIO import org.apache.iceberg.aws.s3.S3FileIOProperties import org.apache.iceberg.catalog.Catalog -import org.apache.iceberg.catalog.Namespace import org.apache.iceberg.catalog.SupportsNamespaces -import org.apache.iceberg.catalog.TableIdentifier import org.apache.iceberg.data.Record import org.apache.iceberg.exceptions.AlreadyExistsException import org.projectnessie.client.NessieConfigConstants diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergDestinationCleaner.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergDestinationCleaner.kt index d6f98c1ede40..118fa0a5d825 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergDestinationCleaner.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergDestinationCleaner.kt @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.integrations.destination.iceberg.v2 import io.airbyte.cdk.load.test.util.DestinationCleaner @@ -9,7 +13,9 @@ import org.apache.iceberg.catalog.Namespace import org.apache.iceberg.catalog.SupportsNamespaces class IcebergDestinationCleaner(private val catalog: Catalog) : DestinationCleaner { - constructor(configuration: IcebergV2Configuration) : this( + constructor( + configuration: IcebergV2Configuration + ) : this( IcebergUtil(SimpleTableIdGenerator()).let { icebergUtil -> val props = icebergUtil.toCatalogProperties(configuration) icebergUtil.createCatalog(DEFAULT_CATALOG_NAME, props) @@ -18,15 +24,14 @@ class IcebergDestinationCleaner(private val catalog: Catalog) : DestinationClean override fun cleanup() { val namespaces: List = - (catalog as SupportsNamespaces) - .listNamespaces() - .filter { - val namespace = it.level(0) - randomizedNamespaceRegex.matches(namespace) && isNamespaceOld(namespace) - } + (catalog as SupportsNamespaces).listNamespaces().filter { + val namespace = it.level(0) + randomizedNamespaceRegex.matches(namespace) && isNamespaceOld(namespace) + } namespaces.forEach { namespace -> - catalog.listTables(namespace) - .forEach { table -> catalog.dropTable(table, /* purge = */ true) } + catalog.listTables(namespace).forEach { table -> + catalog.dropTable(table, /* purge = */ true) + } catalog.dropNamespace(namespace) } } diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt index 5183c24a62a7..7a755a5d4695 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt @@ -11,17 +11,13 @@ import io.airbyte.cdk.load.data.parquet.ParquetMapperPipelineFactory import io.airbyte.cdk.load.message.DestinationRecord import io.airbyte.cdk.load.test.util.DestinationDataDumper import io.airbyte.cdk.load.test.util.OutputRecord -import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange import java.math.BigDecimal import java.time.Instant import java.util.LinkedHashMap import java.util.UUID -import org.apache.hadoop.conf.Configuration -import org.apache.iceberg.catalog.TableIdentifier import org.apache.iceberg.data.IcebergGenerics import org.apache.iceberg.data.Record -import org.apache.iceberg.nessie.NessieCatalog object IcebergV2DataDumper : DestinationDataDumper { diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt index 07d00365d322..613e86fb6c1b 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt @@ -13,11 +13,14 @@ import java.nio.file.Path object IcebergV2TestUtil { // TODO this is just here as an example, we should remove it + add real configs - val MINIMAL_CONFIG_PATH: Path = Path.of(getResourceUri("iceberg_dest_v2_minimal_required_config.json")) + val MINIMAL_CONFIG_PATH: Path = + Path.of(getResourceUri("iceberg_dest_v2_minimal_required_config.json")) val GLUE_CONFIG_PATH: Path = Path.of("secrets/glue.json") fun parseConfig(path: Path) = - getConfig(ValidatedJsonUtils.parseOne(IcebergV2Specification::class.java, Files.readString(path))) + getConfig( + ValidatedJsonUtils.parseOne(IcebergV2Specification::class.java, Files.readString(path)) + ) fun getConfig(spec: ConfigurationSpecification) = IcebergV2ConfigurationFactory().makeWithoutExceptionHandling(spec as IcebergV2Specification) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt index 5357710e1bbb..f80b94b43240 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt @@ -11,14 +11,14 @@ import io.airbyte.cdk.load.test.util.NoopDestinationCleaner import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest import io.airbyte.cdk.load.write.StronglyTyped +import java.nio.file.Files +import java.util.Base64 import okhttp3.FormBody import okhttp3.OkHttpClient import okhttp3.Request import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test -import java.nio.file.Files -import java.util.Base64 abstract class IcebergV2WriteTest( configContents: String, @@ -88,19 +88,23 @@ abstract class IcebergV2WriteTest( } } -class IcebergGlueWriteTest : IcebergV2WriteTest( - Files.readString(IcebergV2TestUtil.GLUE_CONFIG_PATH), - IcebergDestinationCleaner(IcebergV2TestUtil.parseConfig(IcebergV2TestUtil.GLUE_CONFIG_PATH)), -) +class IcebergGlueWriteTest : + IcebergV2WriteTest( + Files.readString(IcebergV2TestUtil.GLUE_CONFIG_PATH), + IcebergDestinationCleaner( + IcebergV2TestUtil.parseConfig(IcebergV2TestUtil.GLUE_CONFIG_PATH) + ), + ) @Disabled( "This is currently disabled until we are able to make it run via airbyte-ci. It works as expected locally" ) -class IcebergNessieMinioWriteTest : IcebergV2WriteTest( - getConfig(), - // we're writing to ephemeral testcontainers, so no need to clean up after ourselves - NoopDestinationCleaner -) { +class IcebergNessieMinioWriteTest : + IcebergV2WriteTest( + getConfig(), + // we're writing to ephemeral testcontainers, so no need to clean up after ourselves + NoopDestinationCleaner + ) { companion object { private fun getToken(): String { val client = OkHttpClient() diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt index fbd85281e0b3..10ada5c71dec 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt @@ -88,11 +88,13 @@ internal class IcebergUtilTest { every { create() } returns mockk() } val catalog: NessieCatalog = mockk { - every { buildTable(tableIdGenerator.toTableIdentifier(streamDescriptor), any()) } returns - tableBuilder + every { + buildTable(tableIdGenerator.toTableIdentifier(streamDescriptor), any()) + } returns tableBuilder every { createNamespace(any()) } returns Unit every { namespaceExists(any()) } returns false - every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns false + every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns + false } val table = icebergUtil.createTable( @@ -103,7 +105,9 @@ internal class IcebergUtilTest { ) assertNotNull(table) verify(exactly = 1) { - catalog.createNamespace(tableIdGenerator.toTableIdentifier(streamDescriptor).namespace()) + catalog.createNamespace( + tableIdGenerator.toTableIdentifier(streamDescriptor).namespace() + ) } verify(exactly = 1) { tableBuilder.create() } } @@ -121,10 +125,12 @@ internal class IcebergUtilTest { every { create() } returns mockk() } val catalog: NessieCatalog = mockk { - every { buildTable(tableIdGenerator.toTableIdentifier(streamDescriptor), any()) } returns - tableBuilder + every { + buildTable(tableIdGenerator.toTableIdentifier(streamDescriptor), any()) + } returns tableBuilder every { namespaceExists(any()) } returns true - every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns false + every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns + false } val table = icebergUtil.createTable( @@ -135,7 +141,9 @@ internal class IcebergUtilTest { ) assertNotNull(table) verify(exactly = 0) { - catalog.createNamespace(tableIdGenerator.toTableIdentifier(streamDescriptor).namespace()) + catalog.createNamespace( + tableIdGenerator.toTableIdentifier(streamDescriptor).namespace() + ) } verify(exactly = 1) { tableBuilder.create() } } @@ -146,7 +154,8 @@ internal class IcebergUtilTest { val streamDescriptor = DestinationStream.Descriptor("namespace", "name") val schema = Schema() val catalog: NessieCatalog = mockk { - every { loadTable(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns mockk() + every { loadTable(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns + mockk() every { namespaceExists(any()) } returns true every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns true } @@ -159,9 +168,13 @@ internal class IcebergUtilTest { ) assertNotNull(table) verify(exactly = 0) { - catalog.createNamespace(tableIdGenerator.toTableIdentifier(streamDescriptor).namespace()) + catalog.createNamespace( + tableIdGenerator.toTableIdentifier(streamDescriptor).namespace() + ) + } + verify(exactly = 1) { + catalog.loadTable(tableIdGenerator.toTableIdentifier(streamDescriptor)) } - verify(exactly = 1) { catalog.loadTable(tableIdGenerator.toTableIdentifier(streamDescriptor)) } } @Test From ca1289560ec15cd340f2b307a8c4fedecd48675a Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 16 Dec 2024 12:50:04 -0800 Subject: [PATCH 12/15] remove redundant constructor --- .../iceberg/v2/IcebergDestinationCleaner.kt | 10 ---------- .../destination/iceberg/v2/IcebergV2WriteTest.kt | 4 +++- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergDestinationCleaner.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergDestinationCleaner.kt index 118fa0a5d825..4ee1c7e5f346 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergDestinationCleaner.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergDestinationCleaner.kt @@ -7,21 +7,11 @@ package io.airbyte.integrations.destination.iceberg.v2 import io.airbyte.cdk.load.test.util.DestinationCleaner import io.airbyte.cdk.load.test.util.IntegrationTest.Companion.isNamespaceOld import io.airbyte.cdk.load.test.util.IntegrationTest.Companion.randomizedNamespaceRegex -import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil import org.apache.iceberg.catalog.Catalog import org.apache.iceberg.catalog.Namespace import org.apache.iceberg.catalog.SupportsNamespaces class IcebergDestinationCleaner(private val catalog: Catalog) : DestinationCleaner { - constructor( - configuration: IcebergV2Configuration - ) : this( - IcebergUtil(SimpleTableIdGenerator()).let { icebergUtil -> - val props = icebergUtil.toCatalogProperties(configuration) - icebergUtil.createCatalog(DEFAULT_CATALOG_NAME, props) - } - ) - override fun cleanup() { val namespaces: List = (catalog as SupportsNamespaces).listNamespaces().filter { diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt index f80b94b43240..573bd5e59ce7 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt @@ -92,7 +92,9 @@ class IcebergGlueWriteTest : IcebergV2WriteTest( Files.readString(IcebergV2TestUtil.GLUE_CONFIG_PATH), IcebergDestinationCleaner( - IcebergV2TestUtil.parseConfig(IcebergV2TestUtil.GLUE_CONFIG_PATH) + IcebergV2TestUtil.getCatalog( + IcebergV2TestUtil.parseConfig(IcebergV2TestUtil.GLUE_CONFIG_PATH) + ) ), ) From 16bf80f1fc79e10158de4a27f922c434b27c4e73 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 16 Dec 2024 12:50:36 -0800 Subject: [PATCH 13/15] also add to check test --- .../destination/iceberg/v2/IcebergV2CheckTest.kt | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2CheckTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2CheckTest.kt index 5ccc6136e541..5ca8e828dd39 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2CheckTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2CheckTest.kt @@ -6,13 +6,17 @@ package io.airbyte.integrations.destination.iceberg.v2 import io.airbyte.cdk.load.check.CheckIntegrationTest import io.airbyte.cdk.load.check.CheckTestConfig +import io.airbyte.integrations.destination.iceberg.v2.IcebergV2TestUtil.GLUE_CONFIG_PATH import io.airbyte.integrations.destination.iceberg.v2.IcebergV2TestUtil.MINIMAL_CONFIG_PATH import org.junit.jupiter.api.Disabled @Disabled class IcebergV2CheckTest : CheckIntegrationTest( - successConfigFilenames = listOf(CheckTestConfig(MINIMAL_CONFIG_PATH)), + successConfigFilenames = listOf( + CheckTestConfig(MINIMAL_CONFIG_PATH), + CheckTestConfig(GLUE_CONFIG_PATH), + ), // TODO we maybe should add some configs that are expected to fail `check` failConfigFilenamesAndFailureReasons = mapOf(), ) From e4abf3551611d68d78a43bee68dd69cea5b649a1 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 16 Dec 2024 12:57:58 -0800 Subject: [PATCH 14/15] add to check test + reenable --- .../destination/iceberg/v2/IcebergV2CheckTest.kt | 11 ++++------- .../destination/iceberg/v2/IcebergV2TestUtil.kt | 8 -------- .../iceberg_dest_v2_minimal_required_config.json | 7 ------- 3 files changed, 4 insertions(+), 22 deletions(-) delete mode 100644 airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/resources/iceberg_dest_v2_minimal_required_config.json diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2CheckTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2CheckTest.kt index 5ca8e828dd39..420bab9ef5a4 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2CheckTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2CheckTest.kt @@ -7,16 +7,13 @@ package io.airbyte.integrations.destination.iceberg.v2 import io.airbyte.cdk.load.check.CheckIntegrationTest import io.airbyte.cdk.load.check.CheckTestConfig import io.airbyte.integrations.destination.iceberg.v2.IcebergV2TestUtil.GLUE_CONFIG_PATH -import io.airbyte.integrations.destination.iceberg.v2.IcebergV2TestUtil.MINIMAL_CONFIG_PATH -import org.junit.jupiter.api.Disabled -@Disabled class IcebergV2CheckTest : CheckIntegrationTest( - successConfigFilenames = listOf( - CheckTestConfig(MINIMAL_CONFIG_PATH), - CheckTestConfig(GLUE_CONFIG_PATH), - ), + successConfigFilenames = + listOf( + CheckTestConfig(GLUE_CONFIG_PATH), + ), // TODO we maybe should add some configs that are expected to fail `check` failConfigFilenamesAndFailureReasons = mapOf(), ) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt index 613e86fb6c1b..2da1841dbf0c 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt @@ -7,14 +7,10 @@ package io.airbyte.integrations.destination.iceberg.v2 import io.airbyte.cdk.command.ConfigurationSpecification import io.airbyte.cdk.command.ValidatedJsonUtils import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil -import java.net.URI import java.nio.file.Files import java.nio.file.Path object IcebergV2TestUtil { - // TODO this is just here as an example, we should remove it + add real configs - val MINIMAL_CONFIG_PATH: Path = - Path.of(getResourceUri("iceberg_dest_v2_minimal_required_config.json")) val GLUE_CONFIG_PATH: Path = Path.of("secrets/glue.json") fun parseConfig(path: Path) = @@ -30,8 +26,4 @@ object IcebergV2TestUtil { val props = icebergUtil.toCatalogProperties(config) icebergUtil.createCatalog(DEFAULT_CATALOG_NAME, props) } - - private fun getResourceUri(path: String): URI = - this::class.java.classLoader.getResource(path)?.toURI() - ?: throw IllegalArgumentException("File not found in resources") } diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/resources/iceberg_dest_v2_minimal_required_config.json b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/resources/iceberg_dest_v2_minimal_required_config.json deleted file mode 100644 index eac8923528c5..000000000000 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/resources/iceberg_dest_v2_minimal_required_config.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "s3_bucket_name": "bucket", - "s3_bucket_region": "us-east-1", - "server_uri": "http://localhost:19120/api/v1", - "warehouse_location": "s3://demobucket/", - "main_branch_name": "main" -} From 6204548839d63980e6814dd65306cecbf7c53676 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 16 Dec 2024 14:18:47 -0800 Subject: [PATCH 15/15] version bump --- .../connectors/destination-iceberg-v2/metadata.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml b/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml index f3cf9932cf12..0a8bb55624cd 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml +++ b/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: file connectorType: destination definitionId: 37a928c1-2d5c-431a-a97d-ae236bd1ea0c - dockerImageTag: 0.1.14 + dockerImageTag: 0.1.15 dockerRepository: airbyte/destination-iceberg-v2 githubIssueLabel: destination-iceberg-v2 icon: icon.svg