From cb1bfbe5424d2d4892c90aca0c50e07288a8bb2d Mon Sep 17 00:00:00 2001 From: Stephane Geneix <147216312+stephane-airbyte@users.noreply.github.com> Date: Wed, 23 Oct 2024 16:25:04 -0700 Subject: [PATCH] simple split of DestinationAcceptanceTest (#46689) just a split of `DestinationAcceptanceTest` so we can have file-based tests and record-based tests. The change was made in such a way that all existing subclass of `DestinationAcceptanceTest` will still work as-is. This Pr was simply split to isolate those changes from functional changes --- airbyte-cdk/java/airbyte-cdk/README.md | 1 + .../src/main/resources/version.properties | 2 +- .../BaseDestinationAcceptanceTest.kt | 285 ++++++++++++++++++ .../destination/DestinationAcceptanceTest.kt | 233 +------------- 4 files changed, 296 insertions(+), 225 deletions(-) create mode 100644 airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/BaseDestinationAcceptanceTest.kt diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 6a70eb75520a..48f390a35cc4 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -174,6 +174,7 @@ corresponds to that version. | Version | Date | Pull Request | Subject | |:-----------|:-----------|:------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.47.3 | 2024-10-23 | [\#46689](https://github.com/airbytehq/airbyte/pull/46689) | Split DestinationAcceptanceTest| | 0.47.2 | 2024-10-21 | [\#47216](https://github.com/airbytehq/airbyte/pull/47216) | improve java compatibiilty| | 0.47.1 | 2024-09-27 | [\#45397](https://github.com/airbytehq/airbyte/pull/45397) | Allow logical replication from Postgres 16 read-replicas| | 0.47.0 | 2024-09-26 | [\#42030](https://github.com/airbytehq/airbyte/pull/42030) | minor refactor | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 14859cbf1468..5312981f0d4d 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.47.2 +version=0.47.3 diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/BaseDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/BaseDestinationAcceptanceTest.kt new file mode 100644 index 000000000000..6fdd1d4836bf --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/BaseDestinationAcceptanceTest.kt @@ -0,0 +1,285 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.integrations.standardtest.destination + +import com.fasterxml.jackson.databind.JsonNode +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings +import io.airbyte.commons.features.EnvVariableFeatureFlags +import io.airbyte.commons.features.FeatureFlags +import io.airbyte.commons.features.FeatureFlagsWrapper +import io.airbyte.commons.lang.Exceptions +import io.airbyte.configoss.WorkerDestinationConfig +import io.airbyte.protocol.models.v0.AirbyteMessage +import io.airbyte.protocol.models.v0.AirbyteStateStats +import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog +import io.airbyte.workers.helper.ConnectorConfigUpdater +import io.airbyte.workers.internal.AirbyteDestination +import io.airbyte.workers.internal.DefaultAirbyteDestination +import io.airbyte.workers.process.AirbyteIntegrationLauncher +import io.airbyte.workers.process.DockerProcessFactory +import io.airbyte.workers.process.ProcessFactory +import io.github.oshai.kotlinlogging.KotlinLogging +import java.nio.file.Files +import java.nio.file.Path +import java.util.* +import java.util.function.Consumer +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.BeforeEach +import org.mockito.Mockito + +private val LOGGER = KotlinLogging.logger {} + +@SuppressFBWarnings("NP_NONNULL_RETURN_VIOLATION") +abstract class BaseDestinationAcceptanceTest( + // If false, ignore counts and only verify the final state message. + protected val verifyIndividualStateAndCounts: Boolean = false, +) { + protected lateinit var processFactory: ProcessFactory + private set + protected lateinit var jobRoot: Path + private set + protected var localRoot: Path? = null + private set + protected lateinit var testEnv: DestinationAcceptanceTest.TestDestinationEnv + private set + protected var fileTransferMountSource: Path? = null + private set + protected open val supportsFileTransfer: Boolean = false + protected var testSchemas: HashSet = HashSet() + protected lateinit var mConnectorConfigUpdater: ConnectorConfigUpdater + private set + protected open val isCloudTest: Boolean = true + protected val featureFlags: FeatureFlags = + if (isCloudTest) { + FeatureFlagsWrapper.overridingDeploymentMode(EnvVariableFeatureFlags(), "CLOUD") + } else { + FeatureFlagsWrapper.overridingDeploymentMode(EnvVariableFeatureFlags(), "OSS") + } + protected abstract val imageName: String + /** + * Name of the docker image that the tests will run against. + * + * @return docker image name + */ + get + + /** + * Configuration specific to the integration. Will be passed to integration where appropriate in + * each test. Should be valid. + * + * @return integration-specific configuration + */ + @Throws(Exception::class) protected abstract fun getConfig(): JsonNode + + protected open fun supportsInDestinationNormalization(): Boolean { + return false + } + + protected fun inDestinationNormalizationFlags(shouldNormalize: Boolean): Map { + if (shouldNormalize && supportsInDestinationNormalization()) { + return java.util.Map.of("NORMALIZATION_TECHNIQUE", "LEGACY") + } + return emptyMap() + } + + protected fun getDestinationConfig( + config: JsonNode, + catalog: ConfiguredAirbyteCatalog, + ): WorkerDestinationConfig { + return WorkerDestinationConfig() + .withConnectionId(UUID.randomUUID()) + .withCatalog( + DestinationAcceptanceTest.convertProtocolObject( + catalog, + io.airbyte.protocol.models.ConfiguredAirbyteCatalog::class.java + ) + ) + .withDestinationConnectionConfiguration(config) + } + + protected fun runSyncAndVerifyStateOutput( + config: JsonNode, + messages: List, + catalog: ConfiguredAirbyteCatalog, + runNormalization: Boolean, + ) { + runSyncAndVerifyStateOutput( + config, + messages, + catalog, + runNormalization, + imageName, + verifyIndividualStateAndCounts + ) + } + + @Throws(Exception::class) + protected fun runSyncAndVerifyStateOutput( + config: JsonNode, + messages: List, + catalog: ConfiguredAirbyteCatalog, + runNormalization: Boolean, + imageName: String, + verifyIndividualStateAndCounts: Boolean + ) { + val destinationOutput = runSync(config, messages, catalog, runNormalization, imageName) + + var expected = messages.filter { it.type == AirbyteMessage.Type.STATE } + var actual = destinationOutput.filter { it.type == AirbyteMessage.Type.STATE } + + if (verifyIndividualStateAndCounts) { + /* Collect the counts and add them to each expected state message */ + val stateToCount = mutableMapOf() + messages.fold(0) { acc, message -> + if (message.type == AirbyteMessage.Type.STATE) { + stateToCount[message.state.global.sharedState] = acc + 0 + } else { + acc + 1 + } + } + + expected.forEach { message -> + val clone = message.state + clone.destinationStats = + AirbyteStateStats() + .withRecordCount(stateToCount[clone.global.sharedState]!!.toDouble()) + message.state = clone + } + } else { + /* Null the stats and collect only the final messages */ + val finalActual = + actual.lastOrNull() + ?: throw IllegalArgumentException( + "All message sets used for testing should include a state record" + ) + val clone = finalActual.state + clone.destinationStats = null + finalActual.state = clone + + expected = listOf(expected.last()) + actual = listOf(finalActual) + } + + Assertions.assertEquals(expected, actual) + } + + @Throws(Exception::class) + open protected fun runSync( + config: JsonNode, + messages: List, + catalog: ConfiguredAirbyteCatalog, + runNormalization: Boolean, + imageName: String, + ): List { + val destinationConfig = getDestinationConfig(config, catalog) + return runSync(messages, runNormalization, imageName, destinationConfig) + } + + @Throws(Exception::class) + protected fun runSync( + messages: List, + runNormalization: Boolean, + imageName: String, + destinationConfig: WorkerDestinationConfig, + ): List { + val destination = getDestination(imageName) + + destination.start( + destinationConfig, + jobRoot, + inDestinationNormalizationFlags(runNormalization) + ) + messages.forEach( + Consumer { message: AirbyteMessage -> + Exceptions.toRuntime { + destination.accept( + DestinationAcceptanceTest.convertProtocolObject( + message, + io.airbyte.protocol.models.AirbyteMessage::class.java + ) + ) + } + } + ) + destination.notifyEndOfInput() + + val destinationOutput: MutableList = ArrayList() + while (!destination.isFinished()) { + destination.attemptRead().ifPresent { + destinationOutput.add( + DestinationAcceptanceTest.convertProtocolObject(it, AirbyteMessage::class.java) + ) + } + } + + destination.close() + + return destinationOutput + } + + protected fun getDestination(imageName: String): AirbyteDestination { + return DefaultAirbyteDestination( + integrationLauncher = + AirbyteIntegrationLauncher( + DestinationAcceptanceTest.JOB_ID, + DestinationAcceptanceTest.JOB_ATTEMPT, + imageName, + processFactory, + null, + null, + false, + featureFlags + ) + ) + } + + @BeforeEach + @Throws(Exception::class) + open fun setUpInternal() { + val testDir = Path.of("/tmp/airbyte_tests/") + Files.createDirectories(testDir) + val workspaceRoot = Files.createTempDirectory(testDir, "test") + jobRoot = Files.createDirectories(Path.of(workspaceRoot.toString(), "job")) + localRoot = Files.createTempDirectory(testDir, "output") + LOGGER.info { "${"jobRoot: {}"} $jobRoot" } + LOGGER.info { "${"localRoot: {}"} $localRoot" } + testEnv = DestinationAcceptanceTest.TestDestinationEnv(localRoot) + mConnectorConfigUpdater = Mockito.mock(ConnectorConfigUpdater::class.java) + testSchemas = HashSet() + setup(testEnv, testSchemas) + fileTransferMountSource = + if (supportsFileTransfer) Files.createTempDirectory(testDir, "file_transfer") else null + + processFactory = + DockerProcessFactory( + workspaceRoot, + workspaceRoot.toString(), + localRoot.toString(), + "host", + getConnectorEnv() + ) + } + + /** + * Function that performs any setup of external resources required for the test. e.g. + * instantiate a postgres database. This function will be called before EACH test. + * + * @param testEnv + * - information about the test environment. + * @param TEST_SCHEMAS + * @throws Exception + * - can throw any exception, test framework will handle. + */ + @Throws(Exception::class) + protected abstract fun setup( + testEnv: DestinationAcceptanceTest.TestDestinationEnv, + TEST_SCHEMAS: HashSet + ) + + open fun getConnectorEnv(): Map { + return emptyMap() + } +} diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt index d3dfc0d5c794..581da32655b8 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt @@ -16,12 +16,8 @@ import io.airbyte.cdk.integrations.standardtest.destination.argproviders.DataTyp import io.airbyte.cdk.integrations.standardtest.destination.argproviders.util.ArgumentProviderUtil import io.airbyte.cdk.integrations.standardtest.destination.comparator.BasicTestDataComparator import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator -import io.airbyte.commons.features.EnvVariableFeatureFlags -import io.airbyte.commons.features.FeatureFlags -import io.airbyte.commons.features.FeatureFlagsWrapper import io.airbyte.commons.jackson.MoreMappers import io.airbyte.commons.json.Jsons -import io.airbyte.commons.lang.Exceptions import io.airbyte.commons.resources.MoreResources import io.airbyte.commons.util.MoreIterators import io.airbyte.configoss.JobGetSpecConfig @@ -38,7 +34,6 @@ import io.airbyte.protocol.models.v0.AirbyteMessage.Type import io.airbyte.protocol.models.v0.AirbyteRecordMessage import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange import io.airbyte.protocol.models.v0.AirbyteStateMessage -import io.airbyte.protocol.models.v0.AirbyteStateStats import io.airbyte.protocol.models.v0.AirbyteStream import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage import io.airbyte.protocol.models.v0.AirbyteTraceMessage @@ -52,15 +47,12 @@ import io.airbyte.workers.exception.TestHarnessException import io.airbyte.workers.general.DbtTransformationRunner import io.airbyte.workers.general.DefaultCheckConnectionTestHarness import io.airbyte.workers.general.DefaultGetSpecTestHarness -import io.airbyte.workers.helper.ConnectorConfigUpdater import io.airbyte.workers.helper.EntrypointEnvChecker import io.airbyte.workers.internal.AirbyteDestination import io.airbyte.workers.internal.DefaultAirbyteDestination import io.airbyte.workers.normalization.DefaultNormalizationRunner import io.airbyte.workers.normalization.NormalizationRunner import io.airbyte.workers.process.AirbyteIntegrationLauncher -import io.airbyte.workers.process.DockerProcessFactory -import io.airbyte.workers.process.ProcessFactory import io.github.oshai.kotlinlogging.KotlinLogging import java.io.IOException import java.io.UncheckedIOException @@ -85,61 +77,24 @@ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.Arguments import org.junit.jupiter.params.provider.ArgumentsProvider import org.junit.jupiter.params.provider.ArgumentsSource -import org.mockito.Mockito private val LOGGER = KotlinLogging.logger {} abstract class DestinationAcceptanceTest( // If false, ignore counts and only verify the final state message. - private val verifyIndividualStateAndCounts: Boolean = false, + verifyIndividualStateAndCounts: Boolean = false, private val useV2Fields: Boolean = false, private val supportsChangeCapture: Boolean = false, private val expectNumericTimestamps: Boolean = false, private val expectSchemalessObjectsCoercedToStrings: Boolean = false, private val expectUnionsPromotedToDisjointRecords: Boolean = false -) { - protected var testSchemas: HashSet = HashSet() - - private lateinit var testEnv: TestDestinationEnv - private set - protected open val isCloudTest: Boolean = true - protected val featureFlags: FeatureFlags = - if (isCloudTest) { - FeatureFlagsWrapper.overridingDeploymentMode(EnvVariableFeatureFlags(), "CLOUD") - } else { - FeatureFlagsWrapper.overridingDeploymentMode(EnvVariableFeatureFlags(), "OSS") - } - - private lateinit var jobRoot: Path - private lateinit var processFactory: ProcessFactory - private lateinit var mConnectorConfigUpdater: ConnectorConfigUpdater - - protected var localRoot: Path? = null +) : BaseDestinationAcceptanceTest(verifyIndividualStateAndCounts = verifyIndividualStateAndCounts) { open protected var _testDataComparator: TestDataComparator = getTestDataComparator() protected open fun getTestDataComparator(): TestDataComparator { return BasicTestDataComparator { @Suppress("deprecation") this.resolveIdentifier(it) } } - protected abstract val imageName: String - /** - * Name of the docker image that the tests will run against. - * - * @return docker image name - */ - get - - protected open fun supportsInDestinationNormalization(): Boolean { - return false - } - - protected fun inDestinationNormalizationFlags(shouldNormalize: Boolean): Map { - if (shouldNormalize && supportsInDestinationNormalization()) { - return java.util.Map.of("NORMALIZATION_TECHNIQUE", "LEGACY") - } - return emptyMap() - } - private val imageNameWithoutTag: String get() = if (imageName.contains(":")) @@ -166,14 +121,6 @@ abstract class DestinationAcceptanceTest( return normalizationRepository.asText() + ":" + NORMALIZATION_VERSION } - /** - * Configuration specific to the integration. Will be passed to integration where appropriate in - * each test. Should be valid. - * - * @return integration-specific configuration - */ - @Throws(Exception::class) protected abstract fun getConfig(): JsonNode - /** * Configuration specific to the integration. Will be passed to integration where appropriate in * tests that test behavior when configuration is invalid. e.g incorrect password. Should be @@ -389,19 +336,6 @@ abstract class DestinationAcceptanceTest( throw IllegalStateException("Not implemented") } - /** - * Function that performs any setup of external resources required for the test. e.g. - * instantiate a postgres database. This function will be called before EACH test. - * - * @param testEnv - * - information about the test environment. - * @param TEST_SCHEMAS - * @throws Exception - * - can throw any exception, test framework will handle. - */ - @Throws(Exception::class) - protected abstract fun setup(testEnv: TestDestinationEnv, TEST_SCHEMAS: HashSet) - /** * Function that performs any clean up of external resources required for the test. e.g. delete * a postgres database. This function will be called after EACH test. It MUST remove all data in @@ -422,35 +356,6 @@ abstract class DestinationAcceptanceTest( return listOf(identifier) } - @BeforeEach - @Throws(Exception::class) - fun setUpInternal() { - val testDir = Path.of("/tmp/airbyte_tests/") - Files.createDirectories(testDir) - val workspaceRoot = Files.createTempDirectory(testDir, "test") - jobRoot = Files.createDirectories(Path.of(workspaceRoot.toString(), "job")) - localRoot = Files.createTempDirectory(testDir, "output") - LOGGER.info { "${"jobRoot: {}"} $jobRoot" } - LOGGER.info { "${"localRoot: {}"} $localRoot" } - testEnv = TestDestinationEnv(localRoot) - mConnectorConfigUpdater = Mockito.mock(ConnectorConfigUpdater::class.java) - testSchemas = HashSet() - setup(testEnv, testSchemas) - - processFactory = - DockerProcessFactory( - workspaceRoot, - workspaceRoot.toString(), - localRoot.toString(), - "host", - getConnectorEnv() - ) - } - - open fun getConnectorEnv(): Map { - return emptyMap() - } - @AfterEach @Throws(Exception::class) fun tearDownInternal() { @@ -2032,137 +1937,17 @@ abstract class DestinationAcceptanceTest( ) } - private fun getDestination(imageName: String): AirbyteDestination { - return DefaultAirbyteDestination( - integrationLauncher = - AirbyteIntegrationLauncher( - JOB_ID, - JOB_ATTEMPT, - imageName, - processFactory, - null, - null, - false, - featureFlags - ) - ) - } - - protected fun runSyncAndVerifyStateOutput( - config: JsonNode, - messages: List, - catalog: ConfiguredAirbyteCatalog, - runNormalization: Boolean, - ) { - runSyncAndVerifyStateOutput( - config, - messages, - catalog, - runNormalization, - imageName, - verifyIndividualStateAndCounts - ) - } - - @Throws(Exception::class) - protected fun runSyncAndVerifyStateOutput( - config: JsonNode, - messages: List, - catalog: ConfiguredAirbyteCatalog, - runNormalization: Boolean, - imageName: String, - verifyIndividualStateAndCounts: Boolean - ) { - val destinationOutput = runSync(config, messages, catalog, runNormalization, imageName) - - var expected = messages.filter { it.type == Type.STATE } - var actual = destinationOutput.filter { it.type == Type.STATE } - - if (verifyIndividualStateAndCounts) { - /* Collect the counts and add them to each expected state message */ - val stateToCount = mutableMapOf() - messages.fold(0) { acc, message -> - if (message.type == Type.STATE) { - stateToCount[message.state.global.sharedState] = acc - 0 - } else { - acc + 1 - } - } - - expected.forEach { message -> - val clone = message.state - clone.destinationStats = - AirbyteStateStats() - .withRecordCount(stateToCount[clone.global.sharedState]!!.toDouble()) - message.state = clone - } - } else { - /* Null the stats and collect only the final messages */ - val finalActual = - actual.lastOrNull() - ?: throw IllegalArgumentException( - "All message sets used for testing should include a state record" - ) - val clone = finalActual.state - clone.destinationStats = null - finalActual.state = clone - - expected = listOf(expected.last()) - actual = listOf(finalActual) - } - - Assertions.assertEquals(expected, actual) - } - @Throws(Exception::class) - private fun runSync( + override fun runSync( config: JsonNode, messages: List, catalog: ConfiguredAirbyteCatalog, runNormalization: Boolean, imageName: String, ): List { - val destinationConfig = - WorkerDestinationConfig() - .withConnectionId(UUID.randomUUID()) - .withCatalog( - convertProtocolObject( - catalog, - io.airbyte.protocol.models.ConfiguredAirbyteCatalog::class.java - ) - ) - .withDestinationConnectionConfiguration(config) - - val destination = getDestination(imageName) - - destination.start( - destinationConfig, - jobRoot, - inDestinationNormalizationFlags(runNormalization) - ) - messages.forEach( - Consumer { message: AirbyteMessage -> - Exceptions.toRuntime { - destination.accept( - convertProtocolObject( - message, - io.airbyte.protocol.models.AirbyteMessage::class.java - ) - ) - } - } - ) - destination.notifyEndOfInput() - - val destinationOutput: MutableList = ArrayList() - while (!destination.isFinished()) { - destination.attemptRead().ifPresent { - destinationOutput.add(convertProtocolObject(it, AirbyteMessage::class.java)) - } - } - - destination.close() + val destinationConfig = getDestinationConfig(config, catalog) + val destinationOutput = + super.runSync(messages, runNormalization, imageName, destinationConfig) if (!runNormalization || (supportsInDestinationNormalization())) { return destinationOutput @@ -2690,8 +2475,8 @@ abstract class DestinationAcceptanceTest( private val RANDOM = Random() private const val NORMALIZATION_VERSION = "dev" - private const val JOB_ID = "0" - private const val JOB_ATTEMPT = 0 + public const val JOB_ID = "0" + public const val JOB_ATTEMPT = 0 private const val DUMMY_CATALOG_NAME = "DummyCatalog" @@ -2828,7 +2613,7 @@ abstract class DestinationAcceptanceTest( return airbyteMessages } - private fun convertProtocolObject(v1: V1, klass: Class): V0 { + fun convertProtocolObject(v1: V1, klass: Class): V0 { return Jsons.`object`(Jsons.jsonNode(v1), klass)!! } }