Skip to content

Commit

Permalink
Handle Delta StructType, ArrayType and MapType (#16884)
Browse files Browse the repository at this point in the history
Handle the following Delta complex types:
a. StructType as JSON
b. ArrayType as Java list
c. MapType as Java map

Generate and add a new Delta table complex-types-table that contains the above complex types for testing.

Update the tests to include a parameterized test with complex-types-table, with the expectations defined in ComplexTypesDeltaTable.java.
  • Loading branch information
abhishekrb19 authored Aug 13, 2024
1 parent c6da2f3 commit acadc2d
Show file tree
Hide file tree
Showing 20 changed files with 235 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

package org.apache.druid.delta.input;

import io.delta.kernel.data.ArrayValue;
import io.delta.kernel.data.MapValue;
import io.delta.kernel.internal.util.VectorUtils;
import io.delta.kernel.types.ArrayType;
import io.delta.kernel.types.BinaryType;
import io.delta.kernel.types.BooleanType;
import io.delta.kernel.types.ByteType;
Expand All @@ -29,6 +33,7 @@
import io.delta.kernel.types.FloatType;
import io.delta.kernel.types.IntegerType;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.MapType;
import io.delta.kernel.types.ShortType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructField;
Expand Down Expand Up @@ -197,6 +202,15 @@ private static Object getValue(DataType dataType, io.delta.kernel.data.Row dataR
return String.valueOf(charArray);
} else if (dataType instanceof DecimalType) {
return dataRow.getDecimal(columnOrdinal).longValue();
} else if (dataType instanceof StructType) {
final io.delta.kernel.data.Row structRow = dataRow.getStruct(columnOrdinal);
return RowSerde.convertRowToJsonObject(structRow);
} else if (dataType instanceof ArrayType) {
final ArrayValue arrayRow = dataRow.getArray(columnOrdinal);
return VectorUtils.toJavaList(arrayRow);
} else if (dataType instanceof MapType) {
final MapValue map = dataRow.getMap(columnOrdinal);
return VectorUtils.toJavaMap(map);
} else {
throw InvalidInput.exception(
"Unsupported data type[%s] for fieldName[%s].",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.delta.input;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.segment.AutoTypeColumnSchema;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Test fixture describing the sample Delta Lake table that contains complex column types (an array, a struct,
 * a nested struct and a map). Refer to extensions-contrib/druid-deltalake-extensions/src/test/resources/README.md
 * to generate the sample complex types Delta Lake table used in the unit tests.
 */
public class ComplexTypesDeltaTable
{
  /**
   * The Delta table path used by unit tests.
   */
  public static final String DELTA_TABLE_PATH = "src/test/resources/complex-types-table";

  /**
   * The list of dimensions in the Delta table {@link #DELTA_TABLE_PATH}.
   */
  public static final List<String> DIMENSIONS =
      ImmutableList.of("id", "array_info", "struct_info", "nested_struct_info", "map_info");

  /**
   * Number of rows expected from the log file {@code _delta_log/00000000000000000000.json}.
   */
  private static final int SPLIT_0_NUM_ROWS = 5;

  /**
   * The expected set of rows from the log file {@code _delta_log/00000000000000000000.json} located under
   * {@link #DELTA_TABLE_PATH}.
   */
  private static final List<Map<String, Object>> SPLIT_0_EXPECTED_ROWS = buildSplit0ExpectedRows();

  /**
   * Mapping of log file identifier to the list of expected rows in that file.
   */
  public static final Map<Integer, List<Map<String, Object>>> SPLIT_TO_EXPECTED_ROWS = new HashMap<>(
      ImmutableMap.of(0, SPLIT_0_EXPECTED_ROWS)
  );

  /**
   * Complete set of expected rows across all entries of {@link #SPLIT_TO_EXPECTED_ROWS} for
   * {@link #DELTA_TABLE_PATH}.
   */
  public static final List<Map<String, Object>> EXPECTED_ROWS =
      SPLIT_TO_EXPECTED_ROWS.values()
                            .stream()
                            .flatMap(List::stream)
                            .collect(Collectors.toList());

  /**
   * The Druid schema used for ingestion of {@link #DELTA_TABLE_PATH}.
   */
  public static final InputRowSchema FULL_SCHEMA = new InputRowSchema(
      new TimestampSpec("na", "posix", DateTimes.of("2024-01-01")),
      new DimensionsSpec(
          ImmutableList.of(
              new AutoTypeColumnSchema("id", null),
              new AutoTypeColumnSchema("array_info", null),
              new AutoTypeColumnSchema("struct_info", null),
              new AutoTypeColumnSchema("nested_struct_info", null),
              new AutoTypeColumnSchema("map_info", null)
          )
      ),
      ColumnsFilter.all()
  );

  /**
   * Builds the {@link #SPLIT_0_NUM_ROWS} expected rows. For the row at index {@code idx}:
   * <ul>
   * <li>{@code id} is {@code idx} as a long</li>
   * <li>{@code array_info} is the integer list {@code [idx, idx + 1, idx + 2, idx + 3]}</li>
   * <li>{@code struct_info} holds the long id and the id rendered as a string</li>
   * <li>{@code nested_struct_info} additionally nests an integer {@code idx} and a double {@code idx + 1.0}</li>
   * <li>{@code map_info} maps both {@code key1} and {@code key2} to the float {@code idx + 1.0}</li>
   * </ul>
   *
   * @return a mutable list of immutable row maps, ordered by {@code id}
   */
  private static List<Map<String, Object>> buildSplit0ExpectedRows()
  {
    final List<Map<String, Object>> rows = new ArrayList<>();
    for (int idx = 0; idx < SPLIT_0_NUM_ROWS; idx++) {
      final long id = idx;
      final String name = String.valueOf(idx);
      final double nestedDouble = idx + 1.0;
      final float mapValue = idx + 1.0f;

      final Map<String, Object> row = ImmutableMap.of(
          "id", id,
          "array_info", ImmutableList.of(idx, idx + 1, idx + 2, idx + 3),
          "struct_info", ImmutableMap.of("id", id, "name", name),
          "nested_struct_info", ImmutableMap.of(
              "id", id,
              "name", name,
              "nested", ImmutableMap.of("nested_int", idx, "nested_double", nestedDouble)
          ),
          "map_info", ImmutableMap.of("key1", mapValue, "key2", mapValue)
      );
      rows.add(row);
    }
    return rows;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public static Collection<Object[]> data()
{
Object[][] data = new Object[][]{
{NonPartitionedDeltaTable.DELTA_TABLE_PATH, NonPartitionedDeltaTable.FULL_SCHEMA, NonPartitionedDeltaTable.DIMENSIONS, NonPartitionedDeltaTable.EXPECTED_ROWS},
{PartitionedDeltaTable.DELTA_TABLE_PATH, PartitionedDeltaTable.FULL_SCHEMA, PartitionedDeltaTable.DIMENSIONS, PartitionedDeltaTable.EXPECTED_ROWS}
{PartitionedDeltaTable.DELTA_TABLE_PATH, PartitionedDeltaTable.FULL_SCHEMA, PartitionedDeltaTable.DIMENSIONS, PartitionedDeltaTable.EXPECTED_ROWS},
{ComplexTypesDeltaTable.DELTA_TABLE_PATH, ComplexTypesDeltaTable.FULL_SCHEMA, ComplexTypesDeltaTable.DIMENSIONS, ComplexTypesDeltaTable.EXPECTED_ROWS}
};
return Arrays.asList(data);
}
Expand Down Expand Up @@ -116,7 +117,7 @@ public void testDeltaInputRow(
}
}
}
Assert.assertEquals(NonPartitionedDeltaTable.EXPECTED_ROWS.size(), totalRecordCount);
Assert.assertEquals(expectedRows.size(), totalRecordCount);
}

@MethodSource("data")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ public static Object[][] data()
PartitionedDeltaTable.DELTA_TABLE_PATH,
PartitionedDeltaTable.FULL_SCHEMA,
PartitionedDeltaTable.EXPECTED_ROWS
},
{
ComplexTypesDeltaTable.DELTA_TABLE_PATH,
ComplexTypesDeltaTable.FULL_SCHEMA,
ComplexTypesDeltaTable.EXPECTED_ROWS
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,14 @@ python3 create_delta_table.py --save_path=employee-delta-table-partitioned-name

The resulting Delta table is checked in to the repo. The expected rows to be used in tests are updated in
`PartitionedDeltaTable.java` accordingly.

### Complex types table `complex-types-table`:

The test data in `resources/complex-types-table` contains 5 Delta records generated with 1 snapshot.
The table was generated by running the following commands:
```shell
python3 create_delta_table.py --save_path=complex-types-table --num_records=5 --gen_complex_types=True
```

The resulting Delta table is checked in to the repo. The expected rows to be used in tests are updated in
`ComplexTypesDeltaTable.java` accordingly.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{"commitInfo":{"timestamp":1723511561738,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"6","numOutputRows":"5","numOutputBytes":"17937"},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.1.0","txnId":"b9eae5f4-d55b-4c38-b365-8228ec09248e"}}
{"metaData":{"id":"ce998219-9bde-4831-b78c-14b11f919fbe","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"array_info\",\"type\":{\"type\":\"array\",\"elementType\":\"integer\",\"containsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"struct_info\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"nested_struct_info\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"nested\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"nested_int\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"nested_double\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"map_info\",\"type\":{\"type\":\"map\",\"keyType\":\"string\",\"valueType\":\"float\",\"valueContainsNull\":true},\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1723511559184}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"add":{"path":"part-00001-01efecb8-5771-4e91-834e-2a1cb6601eb8-c000.snappy.parquet","partitionValues":{},"size":3288,"modificationTime":1723511561689,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":0,\"struct_info\":{\"id\":0,\"name\":\"0\"},\"nested_struct_info\":{\"id\":0,\"name\":\"0\",\"nested\":{\"nested_int\":0,\"nested_double\":1.0}}},\"maxValues\":{\"id\":0,\"struct_info\":{\"id\":0,\"name\":\"0\"},\"nested_struct_info\":{\"id\":0,\"name\":\"0\",\"nested\":{\"nested_int\":0,\"nested_double\":1.0}}},\"nullCount\":{\"id\":0,\"array_info\":0,\"struct_info\":{\"id\":0,\"name\":0},\"nested_struct_info\":{\"id\":0,\"name\":0,\"nested\":{\"nested_int\":0,\"nested_double\":0}},\"map_info\":0}}"}}
{"add":{"path":"part-00003-383f5a97-c624-4ef3-82a4-f3f273308e53-c000.snappy.parquet","partitionValues":{},"size":3291,"modificationTime":1723511561689,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":1,\"struct_info\":{\"id\":1,\"name\":\"1\"},\"nested_struct_info\":{\"id\":1,\"name\":\"1\",\"nested\":{\"nested_int\":1,\"nested_double\":2.0}}},\"maxValues\":{\"id\":1,\"struct_info\":{\"id\":1,\"name\":\"1\"},\"nested_struct_info\":{\"id\":1,\"name\":\"1\",\"nested\":{\"nested_int\":1,\"nested_double\":2.0}}},\"nullCount\":{\"id\":0,\"array_info\":0,\"struct_info\":{\"id\":0,\"name\":0},\"nested_struct_info\":{\"id\":0,\"name\":0,\"nested\":{\"nested_int\":0,\"nested_double\":0}},\"map_info\":0}}"}}
{"add":{"path":"part-00005-febee455-5e89-404a-bb38-f627c47eb20b-c000.snappy.parquet","partitionValues":{},"size":3289,"modificationTime":1723511561689,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2,\"struct_info\":{\"id\":2,\"name\":\"2\"},\"nested_struct_info\":{\"id\":2,\"name\":\"2\",\"nested\":{\"nested_int\":2,\"nested_double\":3.0}}},\"maxValues\":{\"id\":2,\"struct_info\":{\"id\":2,\"name\":\"2\"},\"nested_struct_info\":{\"id\":2,\"name\":\"2\",\"nested\":{\"nested_int\":2,\"nested_double\":3.0}}},\"nullCount\":{\"id\":0,\"array_info\":0,\"struct_info\":{\"id\":0,\"name\":0},\"nested_struct_info\":{\"id\":0,\"name\":0,\"nested\":{\"nested_int\":0,\"nested_double\":0}},\"map_info\":0}}"}}
{"add":{"path":"part-00007-07d88387-16f9-4141-bc77-0106e7f28f7a-c000.snappy.parquet","partitionValues":{},"size":3290,"modificationTime":1723511561689,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":3,\"struct_info\":{\"id\":3,\"name\":\"3\"},\"nested_struct_info\":{\"id\":3,\"name\":\"3\",\"nested\":{\"nested_int\":3,\"nested_double\":4.0}}},\"maxValues\":{\"id\":3,\"struct_info\":{\"id\":3,\"name\":\"3\"},\"nested_struct_info\":{\"id\":3,\"name\":\"3\",\"nested\":{\"nested_int\":3,\"nested_double\":4.0}}},\"nullCount\":{\"id\":0,\"array_info\":0,\"struct_info\":{\"id\":0,\"name\":0},\"nested_struct_info\":{\"id\":0,\"name\":0,\"nested\":{\"nested_int\":0,\"nested_double\":0}},\"map_info\":0}}"}}
{"add":{"path":"part-00009-73760316-7ace-43fe-b605-506c942cd969-c000.snappy.parquet","partitionValues":{},"size":3291,"modificationTime":1723511561689,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":4,\"struct_info\":{\"id\":4,\"name\":\"4\"},\"nested_struct_info\":{\"id\":4,\"name\":\"4\",\"nested\":{\"nested_int\":4,\"nested_double\":5.0}}},\"maxValues\":{\"id\":4,\"struct_info\":{\"id\":4,\"name\":\"4\"},\"nested_struct_info\":{\"id\":4,\"name\":\"4\",\"nested\":{\"nested_int\":4,\"nested_double\":5.0}}},\"nullCount\":{\"id\":0,\"array_info\":0,\"struct_info\":{\"id\":0,\"name\":0},\"nested_struct_info\":{\"id\":0,\"name\":0,\"nested\":{\"nested_int\":0,\"nested_double\":0}},\"map_info\":0}}"}}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os

import argparse
from delta import *
import pyspark
from pyspark.sql.types import StructType, StructField, ShortType, StringType, TimestampType, LongType, IntegerType, DoubleType, FloatType, DateType, BooleanType
from pyspark.sql.types import MapType, StructType, StructField, ShortType, StringType, TimestampType, LongType, IntegerType, DoubleType, FloatType, DateType, BooleanType, ArrayType
from datetime import datetime, timedelta
import random

Expand All @@ -39,6 +37,55 @@ def config_spark_with_delta_lake():
return spark


def create_dataset_with_complex_types(num_records):
    """
    Build a mock dataset whose rows carry complex-typed columns: an integer array, a flat struct,
    a struct that nests another struct, and a string-to-float map.

    Parameters:
    - num_records (int): Number of records to generate; record ids run from 0 to num_records - 1.

    Returns:
    - Tuple: (data, schema)
      - data: List of tuples, one per record, with values in schema field order.
      - schema: The StructType describing the fields of each record.

    Example:
    ```python
    data, schema = create_dataset_with_complex_types(10)
    ```
    """
    # Innermost struct carried inside "nested_struct_info".
    nested_type = StructType([
        StructField("nested_int", IntegerType(), False),
        StructField("nested_double", DoubleType(), True),
    ])

    struct_info_type = StructType([
        StructField("id", LongType(), False),
        StructField("name", StringType(), True)
    ])

    nested_struct_info_type = StructType([
        StructField("id", LongType(), False),
        StructField("name", StringType(), True),
        StructField("nested", nested_type)
    ])

    schema = StructType([
        StructField("id", LongType(), False),
        StructField("array_info", ArrayType(IntegerType(), True), True),
        StructField("struct_info", struct_info_type),
        StructField("nested_struct_info", nested_struct_info_type),
        StructField("map_info", MapType(StringType(), FloatType()))
    ])

    # Each tuple lines up positionally with the schema fields above.
    data = [
        (
            idx,
            (idx, idx + 1, idx + 2, idx + 3),
            (idx, f"{idx}"),
            (idx, f"{idx}", (idx, idx + 1.0)),
            {"key1": idx + 1.0, "key2": idx + 1.0}
        )
        for idx in range(num_records)
    ]
    return data, schema


def create_dataset(num_records):
"""
Generate a mock employee dataset with different datatypes for testing purposes.
Expand Down Expand Up @@ -94,6 +141,9 @@ def main():
parser = argparse.ArgumentParser(description="Script to write a Delta Lake table.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)

parser.add_argument("--gen_complex_types", type=bool, default=False, help="Generate a Delta table with records"
" containing complex types like structs,"
" maps and arrays.")
parser.add_argument('--save_path', default=None, required=True, help="Save path for Delta table")
parser.add_argument('--save_mode', choices=('append', 'overwrite'), default="append",
help="Specify write mode (append/overwrite)")
Expand All @@ -103,14 +153,19 @@ def main():

args = parser.parse_args()

is_gen_complex_types = args.gen_complex_types
save_mode = args.save_mode
save_path = args.save_path
num_records = args.num_records
partitioned_by = args.partitioned_by

spark = config_spark_with_delta_lake()

data, schema = create_dataset(num_records=num_records)
if is_gen_complex_types:
data, schema = create_dataset_with_complex_types(num_records=num_records)
else:
data, schema = create_dataset(num_records=num_records)

df = spark.createDataFrame(data, schema=schema)
if not partitioned_by:
df.write.format("delta").mode(save_mode).save(save_path)
Expand Down

0 comments on commit acadc2d

Please sign in to comment.