Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination Iceberg: integration test for glue #49467

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ class DefaultDestinationTaskLauncher(
log.info { "Task $innerTask was cancelled." }
throw e
} catch (e: Exception) {
log.error { "Caught exception in task $innerTask: $e" }
log.error(e) { "Caught exception in task $innerTask" }
handleException(e)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ fun interface DestinationCleaner {
/**
* Search the test destination for old test data and delete it. This should leave recent data
* (e.g. from the last week) untouched, to avoid causing failures in actively-running tests.
*
     * Implementers should generally list all namespaces in the destination, filter for namespaces
     * that match [IntegrationTest.randomizedNamespaceRegex], and then use
* [IntegrationTest.isNamespaceOld] to filter down to namespaces which can be deleted.
*/
fun cleanup()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteStateMessage
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus
import java.time.Instant
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter
Expand Down Expand Up @@ -63,7 +64,7 @@ abstract class IntegrationTest(
@Suppress("DEPRECATION") private val randomSuffix = RandomStringUtils.randomAlphabetic(4)
private val timestampString =
LocalDateTime.ofInstant(Instant.now(), ZoneOffset.UTC)
.format(DateTimeFormatter.ofPattern("YYYYMMDD"))
.format(randomizedNamespaceDateFormatter)
// stream name doesn't need to be randomized, only the namespace.
val randomizedNamespace = "test$timestampString$randomSuffix"

Expand Down Expand Up @@ -262,6 +263,23 @@ abstract class IntegrationTest(
}

companion object {
        // Matches namespaces produced by this class: "test" + an 8-digit date + 4 random
        // alphabetic characters (e.g. `test20241216abcd`). Group 1 captures the date digits.
        val randomizedNamespaceRegex = Regex("test(\\d{8})[A-Za-z]{4}")
        // Formatter for the date portion embedded in randomized namespaces (yyyyMMdd).
        val randomizedNamespaceDateFormatter: DateTimeFormatter =
            DateTimeFormatter.ofPattern("yyyyMMdd")

/**
* Given a randomizedNamespace (such as `test20241216abcd`), return whether the namespace
* was created more than [retentionDays] days ago, and therefore should be deleted by a
* [DestinationCleaner].
*/
fun isNamespaceOld(namespace: String, retentionDays: Long = 30): Boolean {
val cleanupCutoffDate = LocalDate.now().minusDays(retentionDays)
val matchResult = randomizedNamespaceRegex.find(namespace)
val namespaceCreationDate =
LocalDate.parse(matchResult!!.groupValues[1], randomizedNamespaceDateFormatter)
return namespaceCreationDate.isBefore(cleanupCutoffDate)
}

private val hasRunCleaner = AtomicBoolean(false)

// Connectors are calling System.getenv rather than using micronaut-y properties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class GlueCatalogSpecification(
@get:JsonPropertyDescription(
"The AWS Account ID associated with the Glue service used by the Iceberg catalog."
)
@get:JsonProperty("glue_id")
@JsonProperty("glue_id")
@JsonSchemaInject(json = """{"order":1}""")
val glueId: String,
) : CatalogType(catalogType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: 37a928c1-2d5c-431a-a97d-ae236bd1ea0c
dockerImageTag: 0.1.12
dockerImageTag: 0.1.13
dockerRepository: airbyte/destination-iceberg-v2
githubIssueLabel: destination-iceberg-v2
icon: s3.svg
Expand All @@ -26,59 +26,9 @@ data:
- suite: unitTests
- suite: integrationTests
testSecrets:
- name: SECRET_DESTINATION-S3-V2-MINIMAL-REQUIRED-CONFIG
fileName: s3_dest_v2_minimal_required_config.json
- name: SECRET_DESTINATION-ICEBERG_V2_S3_GLUE_CONFIG
fileName: glue.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
# - name: SECRET_DESTINATION-S3-V2-JSONL-ROOT-LEVEL-FLATTENING
# fileName: s3_dest_v2_jsonl_root_level_flattening_config.json
# secretStore:
# type: GSM
# alias: airbyte-connector-testing-secret-store
# - name: SECRET_DESTINATION-S3-V2-JSONL-GZIP
# fileName: s3_dest_v2_jsonl_gzip_config.json
# secretStore:
# type: GSM
# alias: airbyte-connector-testing-secret-store
# - name: SECRET_DESTINATION-S3-V2-JSONL-STAGING
# fileName: s3_dest_v2_jsonl_staging_config.json
# secretStore:
# type: GSM
# alias: airbyte-connector-testing-secret-store
# - name: SECRET_DESTINATION-S3-V2-CSV
# fileName: s3_dest_v2_csv_config.json
# secretStore:
# type: GSM
# alias: airbyte-connector-testing-secret-store
# - name: SECRET_DESTINATION-S3-V2-CSV-ROOT-LEVEL-FLATTENING
# fileName: s3_dest_v2_csv_root_level_flattening_config.json
# secretStore:
# type: GSM
# alias: airbyte-connector-testing-secret-store
# - name: SECRET_DESTINATION-S3-V2-CSV-GZIP
# fileName: s3_dest_v2_csv_gzip_config.json
# secretStore:
# type: GSM
# alias: airbyte-connector-testing-secret-store
# - name: SECRET_DESTINATION-S3-V2-AVRO
# fileName: s3_dest_v2_avro_config.json
# secretStore:
# type: GSM
# alias: airbyte-connector-testing-secret-store
# - name: SECRET_DESTINATION-S3-V2-AVRO-BZIP2
# fileName: s3_dest_v2_avro_bzip2_config.json
# secretStore:
# type: GSM
# alias: airbyte-connector-testing-secret-store
# - name: SECRET_DESTINATION-S3-V2-PARQUET
# fileName: s3_dest_v2_parquet_config.json
# secretStore:
# type: GSM
# alias: airbyte-connector-testing-secret-store
# - name: SECRET_DESTINATION-S3-V2-PARQUET-SNAPPY
# fileName: s3_dest_v2_parquet_snappy_config.json
# secretStore:
# type: GSM
# alias: airbyte-connector-testing-secret-store
metadataSpecVersion: "1.0"
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ import io.airbyte.cdk.load.check.DestinationChecker
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableCleaner
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil
import io.airbyte.integrations.destination.iceberg.v2.io.toIcebergTableIdentifier
import javax.inject.Singleton
import org.apache.iceberg.Schema
import org.apache.iceberg.types.Types

@Singleton
class IcebergV2Checker(
private val icebergTableCleaner: IcebergTableCleaner,
private val icebergUtil: IcebergUtil
private val icebergUtil: IcebergUtil,
private val tableIdGenerator: TableIdGenerator,
) : DestinationChecker<IcebergV2Configuration> {

override fun check(config: IcebergV2Configuration) {
Expand All @@ -43,7 +43,7 @@ class IcebergV2Checker(

icebergTableCleaner.clearTable(
catalog,
testTableIdentifier.toIcebergTableIdentifier(),
tableIdGenerator.toTableIdentifier(testTableIdentifier),
table.io(),
table.location()
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.iceberg.v2

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.command.iceberg.parquet.GlueCatalogConfiguration
import io.micronaut.context.annotation.Factory
import javax.inject.Singleton
import org.apache.iceberg.catalog.Namespace
import org.apache.iceberg.catalog.TableIdentifier

/**
 * Convert our internal stream descriptor to an Iceberg [TableIdentifier]. Implementations should
 * handle catalog-specific naming restrictions.
 */
// TODO accept default namespace in config as a val here
// `fun interface`: single abstract method, so implementations may also be supplied as lambdas
// (backward-compatible with existing class implementations).
fun interface TableIdGenerator {
    fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier
}

/** Default generator: uses the stream's namespace and name verbatim. */
class SimpleTableIdGenerator : TableIdGenerator {
    override fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier {
        // Iceberg namespaces must be non-null; fail with a descriptive message rather than
        // a bare NPE from `!!`.
        val namespace =
            checkNotNull(stream.namespace) { "Stream ${stream.name} has no namespace" }
        return tableIdOf(namespace, stream.name)
    }
}

/** AWS Glue requires lowercase database+table names. */
class GlueTableIdGenerator : TableIdGenerator {
    override fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier {
        // Iceberg namespaces must be non-null; fail with a descriptive message rather than
        // a bare NPE from `!!`.
        val namespace =
            checkNotNull(stream.namespace) { "Stream ${stream.name} has no namespace" }
        return tableIdOf(namespace.lowercase(), stream.name.lowercase())
    }
}

/** Micronaut factory selecting the [TableIdGenerator] implementation from the catalog type. */
@Factory
class TableIdGeneratorFactory(private val icebergConfiguration: IcebergV2Configuration) {
    // Explicit return type so the bean is exposed as TableIdGenerator, not a concrete subtype.
    @Singleton
    fun create(): TableIdGenerator =
        when (icebergConfiguration.icebergCatalogConfiguration.catalogConfiguration) {
            // Glue imposes lowercase naming; every other catalog takes names verbatim.
            is GlueCatalogConfiguration -> GlueTableIdGenerator()
            else -> SimpleTableIdGenerator()
        }
}

// Iceberg requires both namespace and name to be non-null, hence the non-null parameters here.
private fun tableIdOf(namespace: String, name: String): TableIdentifier {
    return TableIdentifier.of(Namespace.of(namespace), name)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import io.airbyte.integrations.destination.iceberg.v2.ACCESS_KEY_ID
import io.airbyte.integrations.destination.iceberg.v2.GlueCredentialsProvider
import io.airbyte.integrations.destination.iceberg.v2.IcebergV2Configuration
import io.airbyte.integrations.destination.iceberg.v2.SECRET_ACCESS_KEY
import io.airbyte.integrations.destination.iceberg.v2.TableIdGenerator
import io.github.oshai.kotlinlogging.KotlinLogging
import jakarta.inject.Singleton
import org.apache.hadoop.conf.Configuration
Expand All @@ -40,30 +41,19 @@ import org.apache.iceberg.aws.AwsProperties
import org.apache.iceberg.aws.s3.S3FileIO
import org.apache.iceberg.aws.s3.S3FileIOProperties
import org.apache.iceberg.catalog.Catalog
import org.apache.iceberg.catalog.Namespace
import org.apache.iceberg.catalog.SupportsNamespaces
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.data.Record
import org.apache.iceberg.exceptions.AlreadyExistsException
import org.projectnessie.client.NessieConfigConstants
import software.amazon.awssdk.services.glue.model.ConcurrentModificationException

private val logger = KotlinLogging.logger {}

const val AIRBYTE_CDC_DELETE_COLUMN = "_ab_cdc_deleted_at"

/**
* Extension function for the[DestinationStream.Descriptor] class that converts the descriptor to an
* Iceberg [TableIdentifier].
*
* @return An Iceberg [TableIdentifier] representation of the stream descriptor.
*/
fun DestinationStream.Descriptor.toIcebergTableIdentifier(): TableIdentifier {
return TableIdentifier.of(Namespace.of(this.namespace), this.name)
}

/** Collection of Iceberg related utilities. */
@Singleton
class IcebergUtil {
class IcebergUtil(private val tableIdGenerator: TableIdGenerator) {
internal class InvalidFormatException(message: String) : Exception(message)

private val generationIdRegex = Regex("""ab-generation-id-\d+-e""")
Expand Down Expand Up @@ -117,7 +107,7 @@ class IcebergUtil {
schema: Schema,
properties: Map<String, String>
): Table {
val tableIdentifier = streamDescriptor.toIcebergTableIdentifier()
val tableIdentifier = tableIdGenerator.toTableIdentifier(streamDescriptor)
synchronized(tableIdentifier.namespace()) {
if (
catalog is SupportsNamespaces &&
Expand All @@ -135,6 +125,11 @@ class IcebergUtil {
logger.info {
"Namespace '${tableIdentifier.namespace()}' was likely created by another thread during parallel operations."
}
} catch (e: ConcurrentModificationException) {
                // AWS Glue signals a pre-existing namespace with ConcurrentModificationException;
                // treat it the same as AlreadyExistsException above.
logger.info {
"Namespace '${tableIdentifier.namespace()}' was likely created by another thread during parallel operations."
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.iceberg.v2

import io.airbyte.cdk.load.test.util.DestinationCleaner
import io.airbyte.cdk.load.test.util.IntegrationTest.Companion.isNamespaceOld
import io.airbyte.cdk.load.test.util.IntegrationTest.Companion.randomizedNamespaceRegex
import org.apache.iceberg.catalog.Catalog
import org.apache.iceberg.catalog.Namespace
import org.apache.iceberg.catalog.SupportsNamespaces

/**
 * [DestinationCleaner] for Iceberg catalogs: locates randomized test namespaces that are old
 * enough to delete, drops each of their tables (with purge), then drops the namespace itself.
 */
class IcebergDestinationCleaner(private val catalog: Catalog) : DestinationCleaner {
    override fun cleanup() {
        val namespaceCatalog = catalog as SupportsNamespaces
        val staleNamespaces =
            namespaceCatalog.listNamespaces().filter { ns ->
                val name = ns.level(0)
                randomizedNamespaceRegex.matches(name) && isNamespaceOld(name)
            }
        for (ns in staleNamespaces) {
            // purge = true asks the catalog to delete the table's data files as well as
            // its metadata.
            catalog.listTables(ns).forEach { table -> catalog.dropTable(table, /* purge = */ true) }
            namespaceCatalog.dropNamespace(ns)
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ package io.airbyte.integrations.destination.iceberg.v2

import io.airbyte.cdk.load.check.CheckIntegrationTest
import io.airbyte.cdk.load.check.CheckTestConfig
import io.airbyte.integrations.destination.iceberg.v2.IcebergV2TestUtil.PATH
import org.junit.jupiter.api.Disabled
import io.airbyte.integrations.destination.iceberg.v2.IcebergV2TestUtil.GLUE_CONFIG_PATH

@Disabled
class IcebergV2CheckTest :
CheckIntegrationTest<IcebergV2Specification>(
successConfigFilenames = listOf(CheckTestConfig(PATH)),
successConfigFilenames =
listOf(
CheckTestConfig(GLUE_CONFIG_PATH),
),
// TODO we maybe should add some configs that are expected to fail `check`
failConfigFilenamesAndFailureReasons = mapOf(),
)
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,13 @@ import io.airbyte.cdk.load.data.parquet.ParquetMapperPipelineFactory
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.test.util.DestinationDataDumper
import io.airbyte.cdk.load.test.util.OutputRecord
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
import java.math.BigDecimal
import java.time.Instant
import java.util.LinkedHashMap
import java.util.UUID
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.data.IcebergGenerics
import org.apache.iceberg.data.Record
import org.apache.iceberg.nessie.NessieCatalog

object IcebergV2DataDumper : DestinationDataDumper {

Expand Down Expand Up @@ -83,15 +79,13 @@ object IcebergV2DataDumper : DestinationDataDumper {
spec: ConfigurationSpecification,
stream: DestinationStream
): List<OutputRecord> {
val config =
IcebergV2ConfigurationFactory()
.makeWithoutExceptionHandling(spec as IcebergV2Specification)
val config = IcebergV2TestUtil.getConfig(spec)
val pipeline = ParquetMapperPipelineFactory().create(stream)
val schema = pipeline.finalSchema as ObjectType
val catalog = getNessieCatalog(config)
val catalog = IcebergV2TestUtil.getCatalog(config)
val table =
catalog.loadTable(
TableIdentifier.of(stream.descriptor.namespace, stream.descriptor.name)
TableIdGeneratorFactory(config).create().toTableIdentifier(stream.descriptor)
)

val outputRecords = mutableListOf<OutputRecord>()
Expand Down Expand Up @@ -121,7 +115,10 @@ object IcebergV2DataDumper : DestinationDataDumper {
}
}

catalog.close()
// some catalogs (e.g. Nessie) have a close() method. Call it here.
if (catalog is AutoCloseable) {
catalog.close()
}
return outputRecords
}

Expand All @@ -131,13 +128,4 @@ object IcebergV2DataDumper : DestinationDataDumper {
): List<String> {
throw NotImplementedError("Iceberg doesn't support universal file transfer")
}

private fun getNessieCatalog(config: IcebergV2Configuration): NessieCatalog {
val catalogProperties = IcebergUtil().toCatalogProperties(config)

val catalog = NessieCatalog()
catalog.setConf(Configuration())
catalog.initialize("nessie", catalogProperties)
return catalog
}
}
Loading
Loading