Skip to content

Commit

Permalink
Bulk Load CDK: Fixes for various S3V2 release issues (#48733)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt authored Nov 28, 2024
1 parent 2337025 commit 4add9ca
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class JsonSchemaToAirbyteType {
when (schema.get("type").asText()) {
"string" -> fromString(schema)
"boolean" -> BooleanType
"int",
"integer" -> IntegerType
"number" -> fromNumber(schema)
"array" -> fromArray(schema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,28 @@ class DefaultDestinationMessageDeserializer(private val messageFactory: Destinat
val airbyteMessage = serialized.deserializeToClass(AirbyteMessage::class.java)
return messageFactory.fromAirbyteMessage(airbyteMessage, serialized)
} catch (t: Throwable) {
throw RuntimeException("Failed to deserialize AirbyteMessage")
/**
* We don't want to expose client data, but we'd like to get as much info as we can
* about these malformed messages.
*/
val type =
if (serialized.contains("RECORD")) {
"record"
} else if (serialized.contains("STATE")) {
"state"
} else if (serialized.contains("TRACE")) {
if (serialized.contains("STATUS", ignoreCase = true)) {
"status"
} else {
"trace"
}
} else {
"unknown"
}

throw RuntimeException(
"Failed to deserialize airbyte message (type=$type; length=${serialized.length}; reason=${t.javaClass})"
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ class JsonSchemaToAirbyteSchemaTypeTest {
Assertions.assertTrue(airbyteType is IntegerType)
}

/**
 * Checks that a schema whose type is the nonstandard name "int" converts to [IntegerType].
 *
 * "int" is not a standard JSON-schema type name, but some sources apparently emit it.
 */
@Test
fun testInt() {
    val schema = ofType("int")
    val converted = JsonSchemaToAirbyteType().convert(schema)
    Assertions.assertTrue(converted is IntegerType)
}

@Test
fun testNumber() {
val numberType = ofType("number")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ class ObjectStoragePathFactory(
FileVariable("date", """\d{4}_\d{2}_\d{2}""") {
DATE_FORMATTER.format(it.syncTime)
},
FileVariable("date:yyyy_MM", """\d{4}_\d{2}""") {
DATE_FORMATTER.format(it.syncTime).substring(0, 7)
},
FileVariable("timestamp", """\d+""") {
// NOTE: We use a constant time for the path but wall time for the files
it.currentTimeProvider.currentTimeMillis().toString()
Expand Down Expand Up @@ -381,7 +384,7 @@ class ObjectStoragePathFactory(
val replacedForFile =
buildPattern(
filePatternResolved,
"""\{(\w+)}""",
"""\{([\w\:]+)}""",
pathVariableToPattern,
variableToIndex
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ class ObjectStoragePathFactoryTest {
stagingPrefix = "staging/prefix",
pathSuffixPattern =
"\${NAMESPACE}/\${STREAM_NAME}/\${YEAR}/\${MONTH}/\${DAY}/\${HOUR}/\${MINUTE}/\${SECOND}/\${MILLISECOND}/\${EPOCH}/",
fileNamePattern = "{date}-{timestamp}-{part_number}-{sync_id}{format_extension}",
fileNamePattern =
"{date}-{date:yyyy_MM}-{timestamp}-{part_number}-{sync_id}{format_extension}",
usesStagingDirectory = true
)
}
Expand Down Expand Up @@ -125,7 +126,7 @@ class ObjectStoragePathFactoryTest {
val stream1 = MockDestinationCatalogFactory.stream1
val (namespace, name) = stream1.descriptor
val prefixOnly = "prefix/$namespace/$name/2020/01/02/03/04/05/0678/$syncTime/"
val fileName = "2020_01_02-$wallTime-173-42.jsonl.gz"
val fileName = "2020_01_02-2020_01-$wallTime-173-42.jsonl.gz"
Assertions.assertEquals(
"staging/$prefixOnly",
pathFactory.getStagingDirectory(stream1).toString(),
Expand Down Expand Up @@ -153,7 +154,7 @@ class ObjectStoragePathFactoryTest {
val stream1 = MockDestinationCatalogFactory.stream1
val (namespace, name) = stream1.descriptor
val expectedToMatch =
"prefix/$namespace/$name/2020/01/02/03/04/05/0678/$syncTime/2020_01_02-1577934245678-173-42.jsonl.gz"
"prefix/$namespace/$name/2020/01/02/03/04/05/0678/$syncTime/2020_01_02-2020_01-1577934245678-173-42.jsonl.gz"
val match = pathFactory.getPathMatcher(stream1).match(expectedToMatch)
Assertions.assertTrue(match != null)
Assertions.assertTrue(match?.partNumber == 173L)
Expand All @@ -170,7 +171,7 @@ class ObjectStoragePathFactoryTest {
val emptyNamespaceStream =
stream1.copy(descriptor = stream1.descriptor.copy(namespace = null))
val expectedToMatch =
"prefix/$name/2020/01/02/03/04/05/0678/$epochMilli/2020_01_02-1577934245678-173-42.jsonl.gz"
"prefix/$name/2020/01/02/03/04/05/0678/$epochMilli/2020_01_02-2020_01-1577934245678-173-42.jsonl.gz"
val match = pathFactory.getPathMatcher(emptyNamespaceStream).match(expectedToMatch)
Assertions.assertTrue(match != null)
Assertions.assertTrue(match?.partNumber == 173L)
Expand Down Expand Up @@ -231,7 +232,7 @@ class ObjectStoragePathFactoryTest {
val stream1 = MockDestinationCatalogFactory.stream1
val (namespace, name) = stream1.descriptor
val prefixOnly = "prefix/$namespace/$name/2020/01/02/03/04/05/0678/$syncTime/"
val fileName = "2020_01_02-$wallTime-173-42.jsonl.gz"
val fileName = "2020_01_02-2020_01-$wallTime-173-42.jsonl.gz"
Assertions.assertEquals(
prefixOnly,
pathFactory.getFinalDirectory(stream1),
Expand Down Expand Up @@ -270,7 +271,7 @@ class ObjectStoragePathFactoryTest {
val stream1 = MockDestinationCatalogFactory.stream1
val (namespace, name) = stream1.descriptor
val prefixOnly = "prefix/$namespace/$name/2020/01/02/03/04/05/0678/${syncTime}_"
val fileName = "2020_01_02-$wallTime-173-42.jsonl.gz"
val fileName = "2020_01_02-2020_01-$wallTime-173-42.jsonl.gz"
Assertions.assertEquals(
prefixOnly,
pathFactory.getFinalDirectory(stream1),
Expand Down

0 comments on commit 4add9ca

Please sign in to comment.