diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputRow.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputRow.java index acf909452a0b..442412dd154f 100644 --- a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputRow.java +++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputRow.java @@ -19,6 +19,10 @@ package org.apache.druid.delta.input; +import io.delta.kernel.data.ArrayValue; +import io.delta.kernel.data.MapValue; +import io.delta.kernel.internal.util.VectorUtils; +import io.delta.kernel.types.ArrayType; import io.delta.kernel.types.BinaryType; import io.delta.kernel.types.BooleanType; import io.delta.kernel.types.ByteType; @@ -29,6 +33,7 @@ import io.delta.kernel.types.FloatType; import io.delta.kernel.types.IntegerType; import io.delta.kernel.types.LongType; +import io.delta.kernel.types.MapType; import io.delta.kernel.types.ShortType; import io.delta.kernel.types.StringType; import io.delta.kernel.types.StructField; @@ -197,6 +202,15 @@ private static Object getValue(DataType dataType, io.delta.kernel.data.Row dataR return String.valueOf(charArray); } else if (dataType instanceof DecimalType) { return dataRow.getDecimal(columnOrdinal).longValue(); + } else if (dataType instanceof StructType) { + final io.delta.kernel.data.Row structRow = dataRow.getStruct(columnOrdinal); + return RowSerde.convertRowToJsonObject(structRow); + } else if (dataType instanceof ArrayType) { + final ArrayValue arrayRow = dataRow.getArray(columnOrdinal); + return VectorUtils.toJavaList(arrayRow); + } else if (dataType instanceof MapType) { + final MapValue map = dataRow.getMap(columnOrdinal); + return VectorUtils.toJavaMap(map); } else { throw InvalidInput.exception( "Unsupported data type[%s] for fieldName[%s].", diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/ComplexTypesDeltaTable.java b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/ComplexTypesDeltaTable.java new file mode 100644 index 000000000000..7fdffc03041c --- /dev/null +++ b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/ComplexTypesDeltaTable.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.delta.input; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.data.input.ColumnsFilter; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.segment.AutoTypeColumnSchema; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Refer to extensions-contrib/druid-deltalake-extensions/src/test/resources/README.md to generate the + * sample complex types Delta Lake table used in the unit tests. + * + */ +public class ComplexTypesDeltaTable +{ + /** + * The Delta table path used by unit tests. + */ + public static final String DELTA_TABLE_PATH = "src/test/resources/complex-types-table"; + + /** + * The list of dimensions in the Delta table {@link #DELTA_TABLE_PATH}. + */ + public static final List DIMENSIONS = ImmutableList.of( + "id", + "array_info", + "struct_info", + "nested_struct_info", + "map_info" + ); + + /** + * The expected set of rows from the first checkpoint file {@code {@link #DELTA_TABLE_PATH}/_delta_log/00000000000000000000.json} + */ + private static final List> SPLIT_0_EXPECTED_ROWS = new ArrayList<>( + ImmutableList.of( + ImmutableMap.of( + "id", 0L, + "array_info", ImmutableList.of(0, 1, 2, 3), + "struct_info", ImmutableMap.of("id", 0L, "name", "0"), + "nested_struct_info", ImmutableMap.of("id", 0L, "name", "0", "nested", ImmutableMap.of("nested_int", 0, "nested_double", 1.0)), + "map_info", ImmutableMap.of("key1", 1.0f, "key2", 1.0f) + ), + ImmutableMap.of( + "id", 1L, + "array_info", ImmutableList.of(1, 2, 3, 4), + "struct_info", ImmutableMap.of("id", 1L, "name", "1"), + "nested_struct_info", ImmutableMap.of("id", 1L, "name", "1", "nested", ImmutableMap.of("nested_int", 1, "nested_double", 2.0)), + "map_info", ImmutableMap.of("key1", 2.0f, "key2", 2.0f) + ), + ImmutableMap.of( + "id", 2L, + "array_info", ImmutableList.of(2, 3, 4, 5), + "struct_info", ImmutableMap.of("id", 2L, "name", "2"), + "nested_struct_info", ImmutableMap.of("id", 2L, "name", "2", "nested", ImmutableMap.of("nested_int", 2, "nested_double", 3.0)), + "map_info", ImmutableMap.of("key1", 3.0f, "key2", 3.0f) + ), + ImmutableMap.of( + "id", 3L, + "array_info", ImmutableList.of(3, 4, 5, 6), + "struct_info", ImmutableMap.of("id", 3L, "name", "3"), + "nested_struct_info", ImmutableMap.of("id", 3L, "name", "3", "nested", ImmutableMap.of("nested_int", 3, "nested_double", 4.0)), + "map_info", ImmutableMap.of("key1", 4.0f, "key2", 4.0f) + ), + ImmutableMap.of( + "id", 4L, + "array_info", ImmutableList.of(4, 5, 6, 7), + "struct_info", ImmutableMap.of("id", 4L, "name", "4"), + "nested_struct_info", ImmutableMap.of("id", 4L, "name", "4", "nested", ImmutableMap.of("nested_int", 4, "nested_double", 5.0)), + "map_info", ImmutableMap.of("key1", 5.0f, "key2", 5.0f) + ) + ) + ); + + /** + * Mapping of checkpoint file identifier to the list of expected rows in that checkpoint. + */ + public static final Map>> SPLIT_TO_EXPECTED_ROWS = new HashMap<>( + ImmutableMap.of( + 0, SPLIT_0_EXPECTED_ROWS + ) + ); + + /** + * Complete set of expected rows across all checkpoint files for {@link #DELTA_TABLE_PATH}. + */ + public static final List> EXPECTED_ROWS = SPLIT_TO_EXPECTED_ROWS.values().stream() + .flatMap(List::stream) + .collect(Collectors.toList()); + + /** + * The Druid schema used for ingestion of {@link #DELTA_TABLE_PATH}. + */ + public static final InputRowSchema FULL_SCHEMA = new InputRowSchema( + new TimestampSpec("na", "posix", DateTimes.of("2024-01-01")), + new DimensionsSpec( + ImmutableList.of( + new AutoTypeColumnSchema("id", null), + new AutoTypeColumnSchema("array_info", null), + new AutoTypeColumnSchema("struct_info", null), + new AutoTypeColumnSchema("nested_struct_info", null), + new AutoTypeColumnSchema("map_info", null) + ) + ), + ColumnsFilter.all() + ); +} diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputRowTest.java b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputRowTest.java index 4e1c2566f02e..4c1b57c434a2 100644 --- a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputRowTest.java +++ b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputRowTest.java @@ -54,7 +54,8 @@ public static Collection data() { Object[][] data = new Object[][]{ {NonPartitionedDeltaTable.DELTA_TABLE_PATH, NonPartitionedDeltaTable.FULL_SCHEMA, NonPartitionedDeltaTable.DIMENSIONS, NonPartitionedDeltaTable.EXPECTED_ROWS}, - {PartitionedDeltaTable.DELTA_TABLE_PATH, PartitionedDeltaTable.FULL_SCHEMA, PartitionedDeltaTable.DIMENSIONS, PartitionedDeltaTable.EXPECTED_ROWS} + {PartitionedDeltaTable.DELTA_TABLE_PATH, PartitionedDeltaTable.FULL_SCHEMA, PartitionedDeltaTable.DIMENSIONS, PartitionedDeltaTable.EXPECTED_ROWS}, + {ComplexTypesDeltaTable.DELTA_TABLE_PATH, ComplexTypesDeltaTable.FULL_SCHEMA, ComplexTypesDeltaTable.DIMENSIONS, ComplexTypesDeltaTable.EXPECTED_ROWS} }; return Arrays.asList(data); } @@ -116,7 +117,7 @@ public void testDeltaInputRow( } } } - Assert.assertEquals(NonPartitionedDeltaTable.EXPECTED_ROWS.size(), totalRecordCount); + Assert.assertEquals(expectedRows.size(), totalRecordCount); } @MethodSource("data") diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputSourceTest.java b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputSourceTest.java index 3fe42676498e..e6bcf9f5fc87 100644 --- a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputSourceTest.java +++ b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputSourceTest.java @@ -84,6 +84,11 @@ public static Object[][] data() PartitionedDeltaTable.DELTA_TABLE_PATH, PartitionedDeltaTable.FULL_SCHEMA, PartitionedDeltaTable.EXPECTED_ROWS + }, + { + ComplexTypesDeltaTable.DELTA_TABLE_PATH, + ComplexTypesDeltaTable.FULL_SCHEMA, + ComplexTypesDeltaTable.EXPECTED_ROWS } }; } diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/README.md b/extensions-contrib/druid-deltalake-extensions/src/test/resources/README.md index f1ac54fb8b30..f45b33ab62cf 100644 --- a/extensions-contrib/druid-deltalake-extensions/src/test/resources/README.md +++ b/extensions-contrib/druid-deltalake-extensions/src/test/resources/README.md @@ -84,3 +84,14 @@ python3 create_delta_table.py --save_path=employee-delta-table-partitioned-name The resulting Delta table is checked in to the repo. The expectated rows to be used in tests are updated in `PartitionedDeltaTable.java` accordingly. + +### Complex types table `complex-types-table`: + +The test data in `resources/complex-types-table` contains 5 Delta records generated with 1 snapshot. +The table was generated by running the following commands: +```shell +python3 create_delta_table.py --save_path=complex-types-table --num_records=5 --gen_complex_types=True +``` + +The resulting Delta table is checked in to the repo. The expectated rows to be used in tests are updated in +`ComplexTypesDeltaTable.java` accordingly. diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/.part-00000-f4353008-5e85-4a53-9b74-0cc7b853103a-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/.part-00000-f4353008-5e85-4a53-9b74-0cc7b853103a-c000.snappy.parquet.crc new file mode 100644 index 000000000000..a56f68f447ba Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/.part-00000-f4353008-5e85-4a53-9b74-0cc7b853103a-c000.snappy.parquet.crc differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/.part-00001-01efecb8-5771-4e91-834e-2a1cb6601eb8-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/.part-00001-01efecb8-5771-4e91-834e-2a1cb6601eb8-c000.snappy.parquet.crc new file mode 100644 index 000000000000..6b7e86bcf54a Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/.part-00001-01efecb8-5771-4e91-834e-2a1cb6601eb8-c000.snappy.parquet.crc differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/.part-00003-383f5a97-c624-4ef3-82a4-f3f273308e53-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/.part-00003-383f5a97-c624-4ef3-82a4-f3f273308e53-c000.snappy.parquet.crc new file mode 100644 index 000000000000..88b089e95034 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/.part-00003-383f5a97-c624-4ef3-82a4-f3f273308e53-c000.snappy.parquet.crc differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/.part-00005-febee455-5e89-404a-bb38-f627c47eb20b-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/.part-00005-febee455-5e89-404a-bb38-f627c47eb20b-c000.snappy.parquet.crc new file mode 100644 index 000000000000..7f4972520056 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/.part-00005-febee455-5e89-404a-bb38-f627c47eb20b-c000.snappy.parquet.crc differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/.part-00007-07d88387-16f9-4141-bc77-0106e7f28f7a-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/.part-00007-07d88387-16f9-4141-bc77-0106e7f28f7a-c000.snappy.parquet.crc new file mode 100644 index 000000000000..cd8fc7a087f6 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/.part-00007-07d88387-16f9-4141-bc77-0106e7f28f7a-c000.snappy.parquet.crc differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/.part-00009-73760316-7ace-43fe-b605-506c942cd969-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/.part-00009-73760316-7ace-43fe-b605-506c942cd969-c000.snappy.parquet.crc new file mode 100644 index 000000000000..038082a933b7 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/.part-00009-73760316-7ace-43fe-b605-506c942cd969-c000.snappy.parquet.crc differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/_delta_log/.00000000000000000000.json.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/_delta_log/.00000000000000000000.json.crc new file mode 100644 index 000000000000..311d2a22b04d Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/_delta_log/.00000000000000000000.json.crc differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/_delta_log/00000000000000000000.json b/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/_delta_log/00000000000000000000.json new file mode 100644 index 000000000000..84803f9483ca --- /dev/null +++ b/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/_delta_log/00000000000000000000.json @@ -0,0 +1,8 @@ +{"commitInfo":{"timestamp":1723511561738,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"6","numOutputRows":"5","numOutputBytes":"17937"},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.1.0","txnId":"b9eae5f4-d55b-4c38-b365-8228ec09248e"}} +{"metaData":{"id":"ce998219-9bde-4831-b78c-14b11f919fbe","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"array_info\",\"type\":{\"type\":\"array\",\"elementType\":\"integer\",\"containsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"struct_info\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"nested_struct_info\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"nested\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"nested_int\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"nested_double\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"map_info\",\"type\":{\"type\":\"map\",\"keyType\":\"string\",\"valueType\":\"float\",\"valueContainsNull\":true},\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1723511559184}} +{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} +{"add":{"path":"part-00001-01efecb8-5771-4e91-834e-2a1cb6601eb8-c000.snappy.parquet","partitionValues":{},"size":3288,"modificationTime":1723511561689,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":0,\"struct_info\":{\"id\":0,\"name\":\"0\"},\"nested_struct_info\":{\"id\":0,\"name\":\"0\",\"nested\":{\"nested_int\":0,\"nested_double\":1.0}}},\"maxValues\":{\"id\":0,\"struct_info\":{\"id\":0,\"name\":\"0\"},\"nested_struct_info\":{\"id\":0,\"name\":\"0\",\"nested\":{\"nested_int\":0,\"nested_double\":1.0}}},\"nullCount\":{\"id\":0,\"array_info\":0,\"struct_info\":{\"id\":0,\"name\":0},\"nested_struct_info\":{\"id\":0,\"name\":0,\"nested\":{\"nested_int\":0,\"nested_double\":0}},\"map_info\":0}}"}} +{"add":{"path":"part-00003-383f5a97-c624-4ef3-82a4-f3f273308e53-c000.snappy.parquet","partitionValues":{},"size":3291,"modificationTime":1723511561689,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":1,\"struct_info\":{\"id\":1,\"name\":\"1\"},\"nested_struct_info\":{\"id\":1,\"name\":\"1\",\"nested\":{\"nested_int\":1,\"nested_double\":2.0}}},\"maxValues\":{\"id\":1,\"struct_info\":{\"id\":1,\"name\":\"1\"},\"nested_struct_info\":{\"id\":1,\"name\":\"1\",\"nested\":{\"nested_int\":1,\"nested_double\":2.0}}},\"nullCount\":{\"id\":0,\"array_info\":0,\"struct_info\":{\"id\":0,\"name\":0},\"nested_struct_info\":{\"id\":0,\"name\":0,\"nested\":{\"nested_int\":0,\"nested_double\":0}},\"map_info\":0}}"}} +{"add":{"path":"part-00005-febee455-5e89-404a-bb38-f627c47eb20b-c000.snappy.parquet","partitionValues":{},"size":3289,"modificationTime":1723511561689,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2,\"struct_info\":{\"id\":2,\"name\":\"2\"},\"nested_struct_info\":{\"id\":2,\"name\":\"2\",\"nested\":{\"nested_int\":2,\"nested_double\":3.0}}},\"maxValues\":{\"id\":2,\"struct_info\":{\"id\":2,\"name\":\"2\"},\"nested_struct_info\":{\"id\":2,\"name\":\"2\",\"nested\":{\"nested_int\":2,\"nested_double\":3.0}}},\"nullCount\":{\"id\":0,\"array_info\":0,\"struct_info\":{\"id\":0,\"name\":0},\"nested_struct_info\":{\"id\":0,\"name\":0,\"nested\":{\"nested_int\":0,\"nested_double\":0}},\"map_info\":0}}"}} +{"add":{"path":"part-00007-07d88387-16f9-4141-bc77-0106e7f28f7a-c000.snappy.parquet","partitionValues":{},"size":3290,"modificationTime":1723511561689,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":3,\"struct_info\":{\"id\":3,\"name\":\"3\"},\"nested_struct_info\":{\"id\":3,\"name\":\"3\",\"nested\":{\"nested_int\":3,\"nested_double\":4.0}}},\"maxValues\":{\"id\":3,\"struct_info\":{\"id\":3,\"name\":\"3\"},\"nested_struct_info\":{\"id\":3,\"name\":\"3\",\"nested\":{\"nested_int\":3,\"nested_double\":4.0}}},\"nullCount\":{\"id\":0,\"array_info\":0,\"struct_info\":{\"id\":0,\"name\":0},\"nested_struct_info\":{\"id\":0,\"name\":0,\"nested\":{\"nested_int\":0,\"nested_double\":0}},\"map_info\":0}}"}} +{"add":{"path":"part-00009-73760316-7ace-43fe-b605-506c942cd969-c000.snappy.parquet","partitionValues":{},"size":3291,"modificationTime":1723511561689,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":4,\"struct_info\":{\"id\":4,\"name\":\"4\"},\"nested_struct_info\":{\"id\":4,\"name\":\"4\",\"nested\":{\"nested_int\":4,\"nested_double\":5.0}}},\"maxValues\":{\"id\":4,\"struct_info\":{\"id\":4,\"name\":\"4\"},\"nested_struct_info\":{\"id\":4,\"name\":\"4\",\"nested\":{\"nested_int\":4,\"nested_double\":5.0}}},\"nullCount\":{\"id\":0,\"array_info\":0,\"struct_info\":{\"id\":0,\"name\":0},\"nested_struct_info\":{\"id\":0,\"name\":0,\"nested\":{\"nested_int\":0,\"nested_double\":0}},\"map_info\":0}}"}} diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/part-00000-f4353008-5e85-4a53-9b74-0cc7b853103a-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/part-00000-f4353008-5e85-4a53-9b74-0cc7b853103a-c000.snappy.parquet new file mode 100644 index 000000000000..7a105c0f74ff Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/part-00000-f4353008-5e85-4a53-9b74-0cc7b853103a-c000.snappy.parquet differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/part-00001-01efecb8-5771-4e91-834e-2a1cb6601eb8-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/part-00001-01efecb8-5771-4e91-834e-2a1cb6601eb8-c000.snappy.parquet new file mode 100644 index 000000000000..bb2fb67389d2 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/part-00001-01efecb8-5771-4e91-834e-2a1cb6601eb8-c000.snappy.parquet differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/part-00003-383f5a97-c624-4ef3-82a4-f3f273308e53-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/part-00003-383f5a97-c624-4ef3-82a4-f3f273308e53-c000.snappy.parquet new file mode 100644 index 000000000000..641396cb6f7c Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/part-00003-383f5a97-c624-4ef3-82a4-f3f273308e53-c000.snappy.parquet differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/part-00005-febee455-5e89-404a-bb38-f627c47eb20b-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/part-00005-febee455-5e89-404a-bb38-f627c47eb20b-c000.snappy.parquet new file mode 100644 index 000000000000..abee0ea5d0a9 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/part-00005-febee455-5e89-404a-bb38-f627c47eb20b-c000.snappy.parquet differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/part-00007-07d88387-16f9-4141-bc77-0106e7f28f7a-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/part-00007-07d88387-16f9-4141-bc77-0106e7f28f7a-c000.snappy.parquet new file mode 100644 index 000000000000..453c3879fa35 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/part-00007-07d88387-16f9-4141-bc77-0106e7f28f7a-c000.snappy.parquet differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/part-00009-73760316-7ace-43fe-b605-506c942cd969-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/part-00009-73760316-7ace-43fe-b605-506c942cd969-c000.snappy.parquet new file mode 100644 index 000000000000..aadf1e152c87 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/complex-types-table/part-00009-73760316-7ace-43fe-b605-506c942cd969-c000.snappy.parquet differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/create_delta_table.py b/extensions-contrib/druid-deltalake-extensions/src/test/resources/create_delta_table.py index 34a649773047..a116513b01dd 100755 --- a/extensions-contrib/druid-deltalake-extensions/src/test/resources/create_delta_table.py +++ b/extensions-contrib/druid-deltalake-extensions/src/test/resources/create_delta_table.py @@ -15,12 +15,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os - import argparse from delta import * import pyspark -from pyspark.sql.types import StructType, StructField, ShortType, StringType, TimestampType, LongType, IntegerType, DoubleType, FloatType, DateType, BooleanType +from pyspark.sql.types import MapType, StructType, StructField, ShortType, StringType, TimestampType, LongType, IntegerType, DoubleType, FloatType, DateType, BooleanType, ArrayType from datetime import datetime, timedelta import random @@ -39,6 +37,55 @@ def config_spark_with_delta_lake(): return spark +def create_dataset_with_complex_types(num_records): + """ + Create a mock dataset with records containing complex types like arrays, structs and maps. + + Parameters: + - num_records (int): Number of records to generate. + + Returns: + - Tuple: A tuple containing a list of records and the corresponding schema. + - List of Records: Each record is a tuple representing a row of data. + - StructType: The schema defining the structure of the records. + + Example: + ```python + data, schema = create_dataset_with_complex_types(10) + ``` + """ + schema = StructType([ + StructField("id", LongType(), False), + StructField("array_info", ArrayType(IntegerType(), True), True), + StructField("struct_info", StructType([ + StructField("id", LongType(), False), + StructField("name", StringType(), True) + ])), + StructField("nested_struct_info", StructType([ + StructField("id", LongType(), False), + StructField("name", StringType(), True), + StructField("nested", StructType([ + StructField("nested_int", IntegerType(), False), + StructField("nested_double", DoubleType(), True), + ])) + ])), + StructField("map_info", MapType(StringType(), FloatType())) + ]) + + data = [] + + for idx in range(num_records): + record = ( + idx, + (idx, idx + 1, idx + 2, idx + 3), + (idx, f"{idx}"), + (idx, f"{idx}", (idx, idx + 1.0)), + {"key1": idx + 1.0, "key2": idx + 1.0} + ) + data.append(record) + return data, schema + + def create_dataset(num_records): """ Generate a mock employee dataset with different datatypes for testing purposes. @@ -94,6 +141,9 @@ def main(): parser = argparse.ArgumentParser(description="Script to write a Delta Lake table.", formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser.add_argument("--gen_complex_types", type=bool, default=False, help="Generate a Delta table with records" + " containing complex types like structs," + " maps and arrays.") parser.add_argument('--save_path', default=None, required=True, help="Save path for Delta table") parser.add_argument('--save_mode', choices=('append', 'overwrite'), default="append", help="Specify write mode (append/overwrite)") @@ -103,6 +153,7 @@ def main(): args = parser.parse_args() + is_gen_complex_types = args.gen_complex_types save_mode = args.save_mode save_path = args.save_path num_records = args.num_records @@ -110,7 +161,11 @@ def main(): spark = config_spark_with_delta_lake() - data, schema = create_dataset(num_records=num_records) + if is_gen_complex_types: + data, schema = create_dataset_with_complex_types(num_records=num_records) + else: + data, schema = create_dataset(num_records=num_records) + df = spark.createDataFrame(data, schema=schema) if not partitioned_by: df.write.format("delta").mode(save_mode).save(save_path)