Skip to content

Commit

Permalink
Fix file related DAT tests (#48727)
Browse files Browse the repository at this point in the history
  • Loading branch information
benmoriceau authored Dec 10, 2024
1 parent 8d9f521 commit cf61e54
Show file tree
Hide file tree
Showing 14 changed files with 26 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ class DestinationMessageFactory(
if (fileTransferEnabled) {
DestinationFileStreamComplete(
stream.descriptor,
message.trace.emittedAt.toLong()
message.trace.emittedAt?.toLong() ?: 0L
)
} else {
DestinationRecordStreamComplete(
Expand All @@ -461,12 +461,12 @@ class DestinationMessageFactory(
if (fileTransferEnabled) {
DestinationFileStreamIncomplete(
stream.descriptor,
message.trace.emittedAt.toLong()
message.trace.emittedAt?.toLong() ?: 0L
)
} else {
DestinationRecordStreamIncomplete(
stream.descriptor,
message.trace.emittedAt.toLong()
message.trace.emittedAt?.toLong() ?: 0L
)
}
else -> Undefined
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,13 @@ class ObjectStorageStreamLoader<T : RemoteObject<*>, U : OutputStream>(
if (pathFactory.supportsStaging) {
throw IllegalStateException("Staging is not supported for files")
}
val fileUrl = file.fileMessage.fileUrl ?: ""
if (!File(fileUrl).exists()) {
log.error { "File does not exist: $fileUrl" }
throw IllegalStateException("File does not exist: $fileUrl")
}
val key =
Path.of(pathFactory.getFinalDirectory(stream).toString(), file.fileMessage.fileUrl!!)
Path.of(pathFactory.getFinalDirectory(stream), "${file.fileMessage.fileRelativePath}")
.toString()

val state = destinationStateManager.getState(stream)
Expand All @@ -137,13 +142,12 @@ class ObjectStorageStreamLoader<T : RemoteObject<*>, U : OutputStream>(
isStaging = false
)

val localFile = createFile(file.fileMessage.fileUrl!!)

val metadata = ObjectStorageDestinationState.metadataFor(stream)
val obj =
client.streamingUpload(key, metadata, streamProcessor = compressor) { outputStream ->
File(file.fileMessage.fileUrl!!).inputStream().use { it.copyTo(outputStream) }
File(fileUrl).inputStream().use { it.copyTo(outputStream) }
}
val localFile = createFile(fileUrl)
localFile.delete()
return RemoteObject(remoteObject = obj, partNumber = 0)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import io.mockk.spyk
import io.mockk.verify
import java.io.ByteArrayOutputStream
import java.io.File
import java.nio.file.Files
import java.nio.file.Path
import kotlin.test.assertEquals
import kotlinx.coroutines.test.runTest
Expand Down Expand Up @@ -52,12 +53,14 @@ class ObjectStorageStreamLoaderTest {

@Test
fun `test processFile`() = runTest {
val fileUrl = "fileUrl"
val fileUrl = "/tmp/fileUrl"
Files.deleteIfExists(Path.of(fileUrl))
Files.createFile(Path.of(fileUrl))
val stagingDirectory = "stagingDirectory"
val generationId = 12L
val destinationFile = mockk<DestinationFile>()
every { destinationFile.fileMessage } returns
DestinationFile.AirbyteRecordMessageFile(fileUrl = fileUrl)
DestinationFile.AirbyteRecordMessageFile(fileUrl = fileUrl, fileRelativePath = fileUrl)
every { pathFactory.getFinalDirectory(any()) } returns stagingDirectory
every { stream.generationId } returns generationId
val mockedStateStorage: ObjectStorageDestinationState = mockk(relaxed = true)
Expand All @@ -82,5 +85,6 @@ class ObjectStorageStreamLoaderTest {
(result as ObjectStorageStreamLoader.RemoteObject<*>).remoteObject
)
verify { mockedFile.delete() }
Files.deleteIfExists(Path.of(fileUrl))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,10 @@ abstract class BaseDestinationAcceptanceTest(
catalog: ConfiguredAirbyteCatalog,
runNormalization: Boolean,
imageName: String,
additionalEnvs: Map<String, String> = mapOf()
): List<AirbyteMessage> {
val destinationConfig = getDestinationConfig(config, catalog)
return runSync(messages, runNormalization, imageName, destinationConfig)
return runSync(messages, runNormalization, imageName, destinationConfig, additionalEnvs)
}

@Throws(Exception::class)
Expand All @@ -185,13 +186,14 @@ abstract class BaseDestinationAcceptanceTest(
runNormalization: Boolean,
imageName: String,
destinationConfig: WorkerDestinationConfig,
additionalEnvs: Map<String, String> = mapOf()
): List<AirbyteMessage> {
val destination = getDestination(imageName)

destination.start(
destinationConfig,
jobRoot,
inDestinationNormalizationFlags(runNormalization)
additionalEnvs + inDestinationNormalizationFlags(runNormalization)
)
messages.forEach(
Consumer { message: AirbyteMessage ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1944,10 +1944,11 @@ abstract class DestinationAcceptanceTest(
catalog: ConfiguredAirbyteCatalog,
runNormalization: Boolean,
imageName: String,
additionalEnvs: Map<String, String>,
): List<AirbyteMessage> {
val destinationConfig = getDestinationConfig(config, catalog)
val destinationOutput =
super.runSync(messages, runNormalization, imageName, destinationConfig)
super.runSync(messages, runNormalization, imageName, destinationConfig, additionalEnvs)

if (!runNormalization || (supportsInDestinationNormalization())) {
return destinationOutput
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -801,13 +801,11 @@ protected constructor(
catalog = catalog,
runNormalization = false,
imageName = imageName,
additionalEnvs = mapOf("USE_FILE_TRANSFER" to "true"),
)
fail("sync should have failed. Instead got output $destinationOutput")
} catch (e: TestHarnessException) {
assertContains(
e.outputMessages!![0].trace.error.internalMessage,
S3DestinationFlushFunction.FILE_RECORD_ERROR_MESSAGE
)
assertContains(e.outputMessages!![0].trace.error.internalMessage, "File does not exist")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.integrations.destination.s3.S3BaseAvroDestinationAcceptanceTest
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test

class S3V2AvroDestinationAcceptanceTest : S3BaseAvroDestinationAcceptanceTest() {
override val imageName: String = "airbyte/destination-s3-v2:dev"
Expand All @@ -23,10 +21,4 @@ class S3V2AvroDestinationAcceptanceTest : S3BaseAvroDestinationAcceptanceTest()

override val baseConfigJson: JsonNode
get() = S3V2DestinationTestUtils.baseConfigJsonFilePath

@Test
@Disabled("Pending FILE TRANSFER S3V2")
override fun testFakeFileTransfer() {
super.testFakeFileTransfer()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ class S3V2CsvAssumeRoleDestinationAcceptanceTest : S3BaseCsvDestinationAcceptanc
}

@Test
@Disabled("Pending FILE TRANSFER S3V2")
override fun testFakeFileTransfer() {
super.testFakeFileTransfer()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ package io.airbyte.integrations.destination.s3
import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.integrations.destination.s3.S3BaseCsvDestinationAcceptanceTest
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test

class S3V2CsvDestinationAcceptanceTest : S3BaseCsvDestinationAcceptanceTest() {
override val imageName: String = "airbyte/destination-s3-v2:dev"
Expand All @@ -17,10 +15,4 @@ class S3V2CsvDestinationAcceptanceTest : S3BaseCsvDestinationAcceptanceTest() {

override val baseConfigJson: JsonNode
get() = S3V2DestinationTestUtils.baseConfigJsonFilePath

@Test
@Disabled("Pending FILE TRANSFER S3V2")
override fun testFakeFileTransfer() {
super.testFakeFileTransfer()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ package io.airbyte.integrations.destination.s3
import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.integrations.destination.s3.S3BaseCsvGzipDestinationAcceptanceTest
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test

class S3V2CsvGzipDestinationAcceptanceTest : S3BaseCsvGzipDestinationAcceptanceTest() {
override val imageName: String = "airbyte/destination-s3-v2:dev"
Expand All @@ -17,10 +15,4 @@ class S3V2CsvGzipDestinationAcceptanceTest : S3BaseCsvGzipDestinationAcceptanceT

override val baseConfigJson: JsonNode
get() = S3V2DestinationTestUtils.baseConfigJsonFilePath

@Test
@Disabled("Pending FILE TRANSFER S3V2")
override fun testFakeFileTransfer() {
super.testFakeFileTransfer()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import com.google.common.collect.ImmutableMap
import io.airbyte.cdk.integrations.destination.async.model.AirbyteRecordMessageFile
import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.S3BaseDestinationAcceptanceTest
import io.airbyte.cdk.integrations.destination.s3.S3ConsumerFactory
import io.airbyte.cdk.integrations.destination.s3.S3StorageOperations
import io.airbyte.cdk.integrations.destination.s3.constant.S3Constants
import io.airbyte.cdk.integrations.destination.s3.util.Flattening
Expand All @@ -29,12 +28,10 @@ import kotlin.io.path.writeText
import kotlin.random.Random
import kotlin.test.*
import org.apache.commons.lang3.RandomStringUtils
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test

private val LOGGER = KotlinLogging.logger {}

@Disabled("Pending FILE TRANSFER S3V2")
class S3V2FileTransferDestinationTest : S3BaseDestinationAcceptanceTest() {
override val imageName: String = "airbyte/destination-s3-v2:dev"
override val supportsFileTransfer = true
Expand Down Expand Up @@ -167,7 +164,7 @@ class S3V2FileTransferDestinationTest : S3BaseDestinationAcceptanceTest() {
} catch (e: TestHarnessException) {
assertContains(
e.outputMessages!![0].trace.error.internalMessage,
S3ConsumerFactory.MISSING_FILE_FIELD_IN_FILE_TRANSFER_ERROR_MESSAGE
"Failed to convert AirbyteMessage"
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ package io.airbyte.integrations.destination.s3
import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.integrations.destination.s3.S3BaseJsonlDestinationAcceptanceTest
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test

class S3V2JsonlDestinationAcceptanceTest : S3BaseJsonlDestinationAcceptanceTest() {
override val imageName: String = "airbyte/destination-s3-v2:dev"
Expand All @@ -17,10 +15,4 @@ class S3V2JsonlDestinationAcceptanceTest : S3BaseJsonlDestinationAcceptanceTest(

override val baseConfigJson: JsonNode
get() = S3V2DestinationTestUtils.baseConfigJsonFilePath

@Test
@Disabled("Pending FILE TRANSFER S3V2")
override fun testFakeFileTransfer() {
super.testFakeFileTransfer()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ package io.airbyte.integrations.destination.s3
import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.integrations.destination.s3.S3BaseJsonlGzipDestinationAcceptanceTest
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test

class S3V2JsonlGzipDestinationAcceptanceTest : S3BaseJsonlGzipDestinationAcceptanceTest() {
override val imageName: String = "airbyte/destination-s3-v2:dev"
Expand All @@ -17,10 +15,4 @@ class S3V2JsonlGzipDestinationAcceptanceTest : S3BaseJsonlGzipDestinationAccepta

override val baseConfigJson: JsonNode
get() = S3V2DestinationTestUtils.baseConfigJsonFilePath

@Test
@Disabled("Pending FILE TRANSFER S3V2")
override fun testFakeFileTransfer() {
super.testFakeFileTransfer()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import io.airbyte.protocol.models.v0.AirbyteCatalog
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.CatalogHelpers
import java.util.concurrent.TimeUnit
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Timeout

Expand Down Expand Up @@ -74,10 +73,4 @@ class S3V2ParquetDestinationAcceptanceTest : S3BaseParquetDestinationAcceptanceT

runSyncAndVerifyStateOutput(config, messages, configuredCatalog, false)
}

@Test
@Disabled("Pending FILE TRANSFER S3V2")
override fun testFakeFileTransfer() {
super.testFakeFileTransfer()
}
}

0 comments on commit cf61e54

Please sign in to comment.