diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteType.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteType.kt
index 7d0201c84c3f..932382b89db3 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteType.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteType.kt
@@ -22,6 +22,7 @@ class JsonSchemaToAirbyteType {
                 when (schema.get("type").asText()) {
                     "string" -> fromString(schema)
                     "boolean" -> BooleanType
+                    "int",
                     "integer" -> IntegerType
                     "number" -> fromNumber(schema)
                     "array" -> fromArray(schema)
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessageDeserializer.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessageDeserializer.kt
index a25526654803..afd5e8978090 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessageDeserializer.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessageDeserializer.kt
@@ -25,7 +25,28 @@ class DefaultDestinationMessageDeserializer(private val messageFactory: Destinat
             val airbyteMessage = serialized.deserializeToClass(AirbyteMessage::class.java)
             return messageFactory.fromAirbyteMessage(airbyteMessage, serialized)
         } catch (t: Throwable) {
-            throw RuntimeException("Failed to deserialize AirbyteMessage")
+            /**
+             * We don't want to expose client data, but we'd like to get as much info as we can
+             * about these malformed messages.
+             */
+            val type =
+                if (serialized.contains("RECORD")) {
+                    "record"
+                } else if (serialized.contains("STATE")) {
+                    "state"
+                } else if (serialized.contains("TRACE")) {
+                    if (serialized.contains("STATUS", ignoreCase = true)) {
+                        "status"
+                    } else {
+                        "trace"
+                    }
+                } else {
+                    "unknown"
+                }
+
+            throw RuntimeException(
+                "Failed to deserialize airbyte message (type=$type; length=${serialized.length}; reason=${t.javaClass})"
+            )
         }
     }
 }
diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteSchemaTypeTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteSchemaTypeTest.kt
index 4591a6258e57..59e8ccd33bd8 100644
--- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteSchemaTypeTest.kt
+++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteSchemaTypeTest.kt
@@ -52,6 +52,14 @@ class JsonSchemaToAirbyteSchemaTypeTest {
         Assertions.assertTrue(airbyteType is IntegerType)
     }
 
+    /** Note: this is nonstandard, but some sources apparently use it. */
+    @Test
+    fun testInt() {
+        val integerType = ofType("int")
+        val airbyteType = JsonSchemaToAirbyteType().convert(integerType)
+        Assertions.assertTrue(airbyteType is IntegerType)
+    }
+
     @Test
     fun testNumber() {
         val numberType = ofType("number")
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactory.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactory.kt
index c1c89e55799d..3dbbc18911f8 100644
--- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactory.kt
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactory.kt
@@ -223,6 +223,9 @@ class ObjectStoragePathFactory(
                 FileVariable("date", """\d{4}_\d{2}_\d{2}""") {
                     DATE_FORMATTER.format(it.syncTime)
                 },
+                FileVariable("date:yyyy_MM", """\d{4}_\d{2}""") {
+                    DATE_FORMATTER.format(it.syncTime).substring(0, 7)
+                },
                 FileVariable("timestamp", """\d+""") {
                     // NOTE: We use a constant time for the path but wall time for the files
                     it.currentTimeProvider.currentTimeMillis().toString()
@@ -381,7 +384,7 @@ class ObjectStoragePathFactory(
         val replacedForFile =
             buildPattern(
                 filePatternResolved,
-                """\{(\w+)}""",
+                """\{([\w\:]+)}""",
                 pathVariableToPattern,
                 variableToIndex
             )
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactoryTest.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactoryTest.kt
index b4a138bcd9ec..a7739bd3e06f 100644
--- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactoryTest.kt
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactoryTest.kt
@@ -60,7 +60,8 @@ class ObjectStoragePathFactoryTest {
                 stagingPrefix = "staging/prefix",
                 pathSuffixPattern =
                     "\${NAMESPACE}/\${STREAM_NAME}/\${YEAR}/\${MONTH}/\${DAY}/\${HOUR}/\${MINUTE}/\${SECOND}/\${MILLISECOND}/\${EPOCH}/",
-                fileNamePattern = "{date}-{timestamp}-{part_number}-{sync_id}{format_extension}",
+                fileNamePattern =
+                    "{date}-{date:yyyy_MM}-{timestamp}-{part_number}-{sync_id}{format_extension}",
                 usesStagingDirectory = true
             )
     }
@@ -125,7 +126,7 @@ class ObjectStoragePathFactoryTest {
         val stream1 = MockDestinationCatalogFactory.stream1
         val (namespace, name) = stream1.descriptor
         val prefixOnly = "prefix/$namespace/$name/2020/01/02/03/04/05/0678/$syncTime/"
-        val fileName = "2020_01_02-$wallTime-173-42.jsonl.gz"
+        val fileName = "2020_01_02-2020_01-$wallTime-173-42.jsonl.gz"
         Assertions.assertEquals(
             "staging/$prefixOnly",
             pathFactory.getStagingDirectory(stream1).toString(),
@@ -153,7 +154,7 @@ class ObjectStoragePathFactoryTest {
         val stream1 = MockDestinationCatalogFactory.stream1
         val (namespace, name) = stream1.descriptor
         val expectedToMatch =
-            "prefix/$namespace/$name/2020/01/02/03/04/05/0678/$syncTime/2020_01_02-1577934245678-173-42.jsonl.gz"
+            "prefix/$namespace/$name/2020/01/02/03/04/05/0678/$syncTime/2020_01_02-2020_01-1577934245678-173-42.jsonl.gz"
         val match = pathFactory.getPathMatcher(stream1).match(expectedToMatch)
         Assertions.assertTrue(match != null)
         Assertions.assertTrue(match?.partNumber == 173L)
@@ -170,7 +171,7 @@ class ObjectStoragePathFactoryTest {
         val emptyNamespaceStream =
             stream1.copy(descriptor = stream1.descriptor.copy(namespace = null))
         val expectedToMatch =
-            "prefix/$name/2020/01/02/03/04/05/0678/$epochMilli/2020_01_02-1577934245678-173-42.jsonl.gz"
+            "prefix/$name/2020/01/02/03/04/05/0678/$epochMilli/2020_01_02-2020_01-1577934245678-173-42.jsonl.gz"
         val match = pathFactory.getPathMatcher(emptyNamespaceStream).match(expectedToMatch)
         Assertions.assertTrue(match != null)
         Assertions.assertTrue(match?.partNumber == 173L)
@@ -231,7 +232,7 @@ class ObjectStoragePathFactoryTest {
         val stream1 = MockDestinationCatalogFactory.stream1
         val (namespace, name) = stream1.descriptor
         val prefixOnly = "prefix/$namespace/$name/2020/01/02/03/04/05/0678/$syncTime/"
-        val fileName = "2020_01_02-$wallTime-173-42.jsonl.gz"
+        val fileName = "2020_01_02-2020_01-$wallTime-173-42.jsonl.gz"
         Assertions.assertEquals(
             prefixOnly,
             pathFactory.getFinalDirectory(stream1),
@@ -270,7 +271,7 @@ class ObjectStoragePathFactoryTest {
         val stream1 = MockDestinationCatalogFactory.stream1
         val (namespace, name) = stream1.descriptor
         val prefixOnly = "prefix/$namespace/$name/2020/01/02/03/04/05/0678/${syncTime}_"
-        val fileName = "2020_01_02-$wallTime-173-42.jsonl.gz"
+        val fileName = "2020_01_02-2020_01-$wallTime-173-42.jsonl.gz"
         Assertions.assertEquals(
             prefixOnly,
             pathFactory.getFinalDirectory(stream1),