Skip to content

Commit

Permalink
format
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Dec 16, 2024
1 parent 4e67d18 commit b30724b
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ abstract class IntegrationTest(

@Suppress("DEPRECATION") private val randomSuffix = RandomStringUtils.randomAlphabetic(4)
private val timestampString =
LocalDateTime.ofInstant(Instant.now(), ZoneOffset.UTC).format(randomizedNamespaceDateFormatter)
LocalDateTime.ofInstant(Instant.now(), ZoneOffset.UTC)
.format(randomizedNamespaceDateFormatter)
// stream name doesn't need to be randomized, only the namespace.
val randomizedNamespace = "test$timestampString$randomSuffix"

Expand Down Expand Up @@ -274,10 +275,8 @@ abstract class IntegrationTest(
fun isNamespaceOld(namespace: String, retentionDays: Long = 30): Boolean {
val cleanupCutoffDate = LocalDate.now().minusDays(retentionDays)
val matchResult = randomizedNamespaceRegex.find(namespace)
val namespaceCreationDate = LocalDate.parse(
matchResult!!.groupValues[1],
randomizedNamespaceDateFormatter
)
val namespaceCreationDate =
LocalDate.parse(matchResult!!.groupValues[1], randomizedNamespaceDateFormatter)
return namespaceCreationDate.isBefore(cleanupCutoffDate)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.iceberg.v2

import io.airbyte.cdk.load.command.DestinationStream
Expand All @@ -8,8 +12,8 @@ import org.apache.iceberg.catalog.Namespace
import org.apache.iceberg.catalog.TableIdentifier

/**
* Convert our internal stream descriptor to an Iceberg [TableIdentifier].
* Implementations should handle catalog-specific naming restrictions.
* Convert our internal stream descriptor to an Iceberg [TableIdentifier]. Implementations should
* handle catalog-specific naming restrictions.
*/
// TODO accept default namespace in config as a val here
interface TableIdGenerator {
Expand All @@ -21,9 +25,7 @@ class SimpleTableIdGenerator : TableIdGenerator {
tableIdOf(stream.namespace!!, stream.name)
}

/**
* AWS Glue requires lowercase database+table names.
*/
/** AWS Glue requires lowercase database+table names. */
class GlueTableIdGenerator : TableIdGenerator {
override fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier =
tableIdOf(stream.namespace!!.lowercase(), stream.name.lowercase())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ import org.apache.iceberg.aws.AwsProperties
import org.apache.iceberg.aws.s3.S3FileIO
import org.apache.iceberg.aws.s3.S3FileIOProperties
import org.apache.iceberg.catalog.Catalog
import org.apache.iceberg.catalog.Namespace
import org.apache.iceberg.catalog.SupportsNamespaces
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.data.Record
import org.apache.iceberg.exceptions.AlreadyExistsException
import org.projectnessie.client.NessieConfigConstants
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.iceberg.v2

import io.airbyte.cdk.load.test.util.DestinationCleaner
Expand All @@ -9,7 +13,9 @@ import org.apache.iceberg.catalog.Namespace
import org.apache.iceberg.catalog.SupportsNamespaces

class IcebergDestinationCleaner(private val catalog: Catalog) : DestinationCleaner {
constructor(configuration: IcebergV2Configuration) : this(
constructor(
configuration: IcebergV2Configuration
) : this(
IcebergUtil(SimpleTableIdGenerator()).let { icebergUtil ->
val props = icebergUtil.toCatalogProperties(configuration)
icebergUtil.createCatalog(DEFAULT_CATALOG_NAME, props)
Expand All @@ -18,15 +24,14 @@ class IcebergDestinationCleaner(private val catalog: Catalog) : DestinationClean

override fun cleanup() {
val namespaces: List<Namespace> =
(catalog as SupportsNamespaces)
.listNamespaces()
.filter {
val namespace = it.level(0)
randomizedNamespaceRegex.matches(namespace) && isNamespaceOld(namespace)
}
(catalog as SupportsNamespaces).listNamespaces().filter {
val namespace = it.level(0)
randomizedNamespaceRegex.matches(namespace) && isNamespaceOld(namespace)
}
namespaces.forEach { namespace ->
catalog.listTables(namespace)
.forEach { table -> catalog.dropTable(table, /* purge = */ true) }
catalog.listTables(namespace).forEach { table ->
catalog.dropTable(table, /* purge = */ true)
}
catalog.dropNamespace(namespace)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,13 @@ import io.airbyte.cdk.load.data.parquet.ParquetMapperPipelineFactory
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.test.util.DestinationDataDumper
import io.airbyte.cdk.load.test.util.OutputRecord
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
import java.math.BigDecimal
import java.time.Instant
import java.util.LinkedHashMap
import java.util.UUID
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.data.IcebergGenerics
import org.apache.iceberg.data.Record
import org.apache.iceberg.nessie.NessieCatalog

object IcebergV2DataDumper : DestinationDataDumper {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@ import java.nio.file.Path

object IcebergV2TestUtil {
// TODO this is just here as an example, we should remove it + add real configs
val MINIMAL_CONFIG_PATH: Path = Path.of(getResourceUri("iceberg_dest_v2_minimal_required_config.json"))
val MINIMAL_CONFIG_PATH: Path =
Path.of(getResourceUri("iceberg_dest_v2_minimal_required_config.json"))
val GLUE_CONFIG_PATH: Path = Path.of("secrets/glue.json")

fun parseConfig(path: Path) =
getConfig(ValidatedJsonUtils.parseOne(IcebergV2Specification::class.java, Files.readString(path)))
getConfig(
ValidatedJsonUtils.parseOne(IcebergV2Specification::class.java, Files.readString(path))
)

fun getConfig(spec: ConfigurationSpecification) =
IcebergV2ConfigurationFactory().makeWithoutExceptionHandling(spec as IcebergV2Specification)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper
import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest
import io.airbyte.cdk.load.write.StronglyTyped
import java.nio.file.Files
import java.util.Base64
import okhttp3.FormBody
import okhttp3.OkHttpClient
import okhttp3.Request
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import java.nio.file.Files
import java.util.Base64

abstract class IcebergV2WriteTest(
configContents: String,
Expand Down Expand Up @@ -88,19 +88,23 @@ abstract class IcebergV2WriteTest(
}
}

class IcebergGlueWriteTest : IcebergV2WriteTest(
Files.readString(IcebergV2TestUtil.GLUE_CONFIG_PATH),
IcebergDestinationCleaner(IcebergV2TestUtil.parseConfig(IcebergV2TestUtil.GLUE_CONFIG_PATH)),
)
class IcebergGlueWriteTest :
IcebergV2WriteTest(
Files.readString(IcebergV2TestUtil.GLUE_CONFIG_PATH),
IcebergDestinationCleaner(
IcebergV2TestUtil.parseConfig(IcebergV2TestUtil.GLUE_CONFIG_PATH)
),
)

@Disabled(
"This is currently disabled until we are able to make it run via airbyte-ci. It works as expected locally"
)
class IcebergNessieMinioWriteTest : IcebergV2WriteTest(
getConfig(),
// we're writing to ephemeral testcontainers, so no need to clean up after ourselves
NoopDestinationCleaner
) {
class IcebergNessieMinioWriteTest :
IcebergV2WriteTest(
getConfig(),
// we're writing to ephemeral testcontainers, so no need to clean up after ourselves
NoopDestinationCleaner
) {
companion object {
private fun getToken(): String {
val client = OkHttpClient()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,13 @@ internal class IcebergUtilTest {
every { create() } returns mockk()
}
val catalog: NessieCatalog = mockk {
every { buildTable(tableIdGenerator.toTableIdentifier(streamDescriptor), any()) } returns
tableBuilder
every {
buildTable(tableIdGenerator.toTableIdentifier(streamDescriptor), any())
} returns tableBuilder
every { createNamespace(any()) } returns Unit
every { namespaceExists(any()) } returns false
every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns false
every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns
false
}
val table =
icebergUtil.createTable(
Expand All @@ -103,7 +105,9 @@ internal class IcebergUtilTest {
)
assertNotNull(table)
verify(exactly = 1) {
catalog.createNamespace(tableIdGenerator.toTableIdentifier(streamDescriptor).namespace())
catalog.createNamespace(
tableIdGenerator.toTableIdentifier(streamDescriptor).namespace()
)
}
verify(exactly = 1) { tableBuilder.create() }
}
Expand All @@ -121,10 +125,12 @@ internal class IcebergUtilTest {
every { create() } returns mockk()
}
val catalog: NessieCatalog = mockk {
every { buildTable(tableIdGenerator.toTableIdentifier(streamDescriptor), any()) } returns
tableBuilder
every {
buildTable(tableIdGenerator.toTableIdentifier(streamDescriptor), any())
} returns tableBuilder
every { namespaceExists(any()) } returns true
every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns false
every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns
false
}
val table =
icebergUtil.createTable(
Expand All @@ -135,7 +141,9 @@ internal class IcebergUtilTest {
)
assertNotNull(table)
verify(exactly = 0) {
catalog.createNamespace(tableIdGenerator.toTableIdentifier(streamDescriptor).namespace())
catalog.createNamespace(
tableIdGenerator.toTableIdentifier(streamDescriptor).namespace()
)
}
verify(exactly = 1) { tableBuilder.create() }
}
Expand All @@ -146,7 +154,8 @@ internal class IcebergUtilTest {
val streamDescriptor = DestinationStream.Descriptor("namespace", "name")
val schema = Schema()
val catalog: NessieCatalog = mockk {
every { loadTable(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns mockk()
every { loadTable(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns
mockk()
every { namespaceExists(any()) } returns true
every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns true
}
Expand All @@ -159,9 +168,13 @@ internal class IcebergUtilTest {
)
assertNotNull(table)
verify(exactly = 0) {
catalog.createNamespace(tableIdGenerator.toTableIdentifier(streamDescriptor).namespace())
catalog.createNamespace(
tableIdGenerator.toTableIdentifier(streamDescriptor).namespace()
)
}
verify(exactly = 1) {
catalog.loadTable(tableIdGenerator.toTableIdentifier(streamDescriptor))
}
verify(exactly = 1) { catalog.loadTable(tableIdGenerator.toTableIdentifier(streamDescriptor)) }
}

@Test
Expand Down

0 comments on commit b30724b

Please sign in to comment.