Skip to content

Commit

Permalink
feat: create known arrow schemas form kernel types
Browse files Browse the repository at this point in the history
  • Loading branch information
roeap committed Nov 11, 2023
1 parent 76471e1 commit 6a99a5e
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 312 deletions.
18 changes: 12 additions & 6 deletions crates/deltalake-core/src/kernel/actions/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ pub mod checkpoint;
pub mod schemas;
pub mod snapshot;

const MAP_KEYS_NAME: &str = "keys";
const MAP_VALUES_NAME: &str = "values";

impl TryFrom<&StructType> for ArrowSchema {
type Error = ArrowError;

Expand Down Expand Up @@ -68,9 +71,9 @@ impl TryFrom<&MapType> for ArrowField {
"entries",
ArrowDataType::Struct(
vec![
ArrowField::new("key", ArrowDataType::try_from(a.key_type())?, false),
ArrowField::new(MAP_KEYS_NAME, ArrowDataType::try_from(a.key_type())?, false),
ArrowField::new(
"value",
MAP_VALUES_NAME,
ArrowDataType::try_from(a.value_type())?,
a.value_contains_null(),
),
Expand Down Expand Up @@ -127,7 +130,10 @@ impl TryFrom<&DataType> for ArrowDataType {
}
PrimitiveType::Timestamp => {
// Issue: https://github.com/delta-io/delta/issues/643
Ok(ArrowDataType::Timestamp(TimeUnit::Microsecond, None))
Ok(ArrowDataType::Timestamp(
TimeUnit::Microsecond,
Some("UTC".into()),
))
}
}
}
Expand All @@ -147,12 +153,12 @@ impl TryFrom<&DataType> for ArrowDataType {
ArrowDataType::Struct(
vec![
ArrowField::new(
"keys",
MAP_KEYS_NAME,
<ArrowDataType as TryFrom<&DataType>>::try_from(m.key_type())?,
false,
),
ArrowField::new(
"values",
MAP_VALUES_NAME,
<ArrowDataType as TryFrom<&DataType>>::try_from(m.value_type())?,
m.value_contains_null(),
),
Expand Down Expand Up @@ -784,7 +790,7 @@ mod tests {
let timestamp_field = DataType::Primitive(PrimitiveType::Timestamp);
assert_eq!(
<ArrowDataType as TryFrom<&DataType>>::try_from(&timestamp_field).unwrap(),
ArrowDataType::Timestamp(TimeUnit::Microsecond, None)
ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()))
);
}

Expand Down
17 changes: 12 additions & 5 deletions crates/deltalake-core/src/kernel/actions/arrow/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::str::FromStr;

use arrow_array::{
BooleanArray, Int32Array, Int64Array, ListArray, MapArray, RecordBatch, StringArray,
StructArray,
StructArray, TimestampMicrosecondArray,
};
use arrow_json::ReaderBuilder;
use arrow_schema::SchemaRef as ArrowSchemaRef;
Expand Down Expand Up @@ -247,7 +247,8 @@ fn parse_action_protocol(arr: &StructArray) -> DeltaResult<Box<dyn Iterator<Item
fn parse_actions_add(arr: &StructArray) -> DeltaResult<Box<dyn Iterator<Item = Action> + '_>> {
let paths = cast_struct_column::<StringArray>(arr, "path")?;
let sizes = cast_struct_column::<Int64Array>(arr, "size")?;
let modification_times = cast_struct_column::<Int64Array>(arr, "modificationTime")?;
let modification_times =
cast_struct_column::<TimestampMicrosecondArray>(arr, "modificationTime")?;
let data_changes = cast_struct_column::<BooleanArray>(arr, "dataChange")?;
let partition_values = cast_struct_column::<MapArray>(arr, "partitionValues")?
.iter()
Expand Down Expand Up @@ -348,7 +349,7 @@ fn parse_actions_remove(arr: &StructArray) -> DeltaResult<Box<dyn Iterator<Item
let data_changes = cast_struct_column::<BooleanArray>(arr, "dataChange")?;

let deletion_timestamps =
if let Ok(ts) = cast_struct_column::<Int64Array>(arr, "deletionTimestamp") {
if let Ok(ts) = cast_struct_column::<TimestampMicrosecondArray>(arr, "deletionTimestamp") {
Either::Left(ts.into_iter())
} else {
Either::Right(std::iter::repeat(None).take(data_changes.len()))
Expand Down Expand Up @@ -515,8 +516,13 @@ fn cast_struct_column<T: 'static>(arr: &StructArray, name: impl AsRef<str>) -> D
}

fn struct_array_to_map(arr: &StructArray) -> DeltaResult<HashMap<String, Option<String>>> {
let keys = cast_struct_column::<StringArray>(arr, "key")?;
let values = cast_struct_column::<StringArray>(arr, "value")?;
if arr.fields().len() != 2 {
return Err(Error::UnexpectedColumnType(
"Error parsing map: expected struct array with 2 fields".into(),
));
}
let keys = cast_struct_column::<StringArray>(arr, arr.fields()[0].name())?;
let values = cast_struct_column::<StringArray>(arr, arr.fields()[1].name())?;
Ok(keys
.into_iter()
.zip(values)
Expand Down Expand Up @@ -558,6 +564,7 @@ mod tests {
});
assert_eq!(action[0], expected)
}

#[test]
fn test_parse_metadata() {
let batch = action_batch();
Expand Down
Loading

0 comments on commit 6a99a5e

Please sign in to comment.