Skip to content

Commit

Permalink
fix: change map nullable value to false (#1620)
Browse files Browse the repository at this point in the history
# Description

This value was `true`, but Arrow defines it as always `false`:
https://github.com/apache/arrow-rs/blob/master/arrow-schema/src/field.rs#L230.

This is also described in apache/arrow-rs#1697.

This also renames the map entries struct from `key_value` to `entries` to
remain consistent with
https://github.com/apache/arrow-rs/blob/878217b9e330b4f1ed13e798a214ea11fbeb2bbb/arrow-schema/src/datatype.rs#L247-L250

The description of the main changes of your pull request

# Related Issue(s)

- closes #1619 

# Documentation

<!---
Share links to useful documentation
--->

---------

Co-authored-by: Will Jones <[email protected]>
  • Loading branch information
cmackenzie1 and wjones127 authored Sep 11, 2023
1 parent 6e00c75 commit 78c4aab
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 19 deletions.
82 changes: 63 additions & 19 deletions rust/src/delta_arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,17 @@ impl TryFrom<&schema::SchemaTypeMap> for ArrowField {
type Error = ArrowError;

fn try_from(a: &schema::SchemaTypeMap) -> Result<Self, ArrowError> {
Ok(ArrowField::new(
Ok(ArrowField::new_map(
"map",
"entries",
ArrowDataType::Struct(
vec![
ArrowField::new("key", ArrowDataType::try_from(a.get_key_type())?, false),
ArrowField::new(
"value",
ArrowDataType::try_from(a.get_value_type())?,
a.get_value_contains_null(),
),
]
.into(),
ArrowField::new("key", ArrowDataType::try_from(a.get_key_type())?, false),
ArrowField::new(
"value",
ArrowDataType::try_from(a.get_value_type())?,
a.get_value_contains_null(),
),
false, // always non-null
false,
false,
))
}
}
Expand Down Expand Up @@ -167,7 +164,7 @@ impl TryFrom<&schema::SchemaDataType> for ArrowDataType {
]
.into(),
),
true,
false,
)),
false,
)),
Expand Down Expand Up @@ -305,7 +302,7 @@ macro_rules! arrow_map {
stringify!($fieldname),
ArrowDataType::Map(
Arc::new(ArrowField::new(
"key_value",
"entries",
ArrowDataType::Struct(
vec![
ArrowField::new("key", ArrowDataType::Utf8, false),
Expand All @@ -325,11 +322,11 @@ macro_rules! arrow_map {
stringify!($fieldname),
ArrowDataType::Map(
Arc::new(ArrowField::new(
"key_value",
"entries",
ArrowDataType::Struct(
vec![
ArrowField::new("key", ArrowDataType::Utf8, false),
ArrowField::new("value", ArrowDataType::Utf8, true),
ArrowField::new("value", ArrowDataType::Utf8, false),
]
.into(),
),
Expand Down Expand Up @@ -637,6 +634,13 @@ fn null_count_schema_for_fields(dest: &mut Vec<ArrowField>, f: &ArrowField) {

#[cfg(test)]
mod tests {
use arrow::array::ArrayData;
use arrow::datatypes::DataType;
use arrow_array::Array;
use arrow_array::{make_array, ArrayRef, MapArray, StringArray, StructArray};
use arrow_buffer::{Buffer, ToByteSlice};
use arrow_schema::Field;

use super::*;
use std::collections::HashMap;
use std::sync::Arc;
Expand Down Expand Up @@ -840,15 +844,15 @@ mod tests {
fn test_delta_from_arrow_map_type() {
let arrow_map = ArrowDataType::Map(
Arc::new(ArrowField::new(
"key_value",
"entries",
ArrowDataType::Struct(
vec![
ArrowField::new("key", ArrowDataType::Int8, false),
ArrowField::new("value", ArrowDataType::Binary, true),
]
.into(),
),
true,
false,
)),
false,
);
Expand Down Expand Up @@ -877,7 +881,47 @@ mod tests {
let entry_offsets = vec![0u32, 1, 1, 4, 5, 5];
let num_rows = keys.len();

let map_array = arrow::array::MapArray::new_from_strings(
// Copied the function `new_from_strings` with the patched code from https://github.com/apache/arrow-rs/pull/4808
// This should be reverted back to [`MapArray::new_from_strings`] once arrow is upgraded in this project.
fn new_from_strings<'a>(
keys: impl Iterator<Item = &'a str>,
values: &dyn Array,
entry_offsets: &[u32],
) -> Result<MapArray, ArrowError> {
let entry_offsets_buffer = Buffer::from(entry_offsets.to_byte_slice());
let keys_data = StringArray::from_iter_values(keys);

let keys_field = Arc::new(Field::new("keys", DataType::Utf8, false));
let values_field = Arc::new(Field::new(
"values",
values.data_type().clone(),
values.null_count() > 0,
));

let entry_struct = StructArray::from(vec![
(keys_field, Arc::new(keys_data) as ArrayRef),
(values_field, make_array(values.to_data())),
]);

let map_data_type = DataType::Map(
Arc::new(Field::new(
"entries",
entry_struct.data_type().clone(),
false,
)),
false,
);

let map_data = ArrayData::builder(map_data_type)
.len(entry_offsets.len() - 1)
.add_buffer(entry_offsets_buffer)
.add_child_data(entry_struct.into_data())
.build()?;

Ok(MapArray::from(map_data))
}

let map_array = new_from_strings(
keys.into_iter(),
&arrow::array::BinaryArray::from(values),
entry_offsets.as_slice(),
Expand Down
51 changes: 51 additions & 0 deletions rust/tests/integration_datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ use std::error::Error;
mod common;

mod local {
use deltalake::{writer::JsonWriter, SchemaTypeMap};

use super::*;
#[tokio::test]
#[serial]
Expand Down Expand Up @@ -933,6 +935,55 @@ mod local {

Ok(())
}

#[tokio::test]
async fn test_issue_1619_parquet_panic_using_map_type() -> Result<()> {
let _ = tokio::fs::remove_dir_all("./tests/data/issue-1619").await;
let fields: Vec<SchemaField> = vec![SchemaField::new(
"metadata".to_string(),
SchemaDataType::map(SchemaTypeMap::new(
Box::new(SchemaDataType::primitive("string".to_string())),
Box::new(SchemaDataType::primitive("string".to_string())),
true,
)),
true,
HashMap::new(),
)];
let schema = deltalake::Schema::new(fields);
let table = deltalake::DeltaTableBuilder::from_uri("./tests/data/issue-1619").build()?;
let _ = DeltaOps::from(table)
.create()
.with_columns(schema.get_fields().to_owned())
.await?;

let mut table = deltalake::open_table("./tests/data/issue-1619").await?;

let mut writer = JsonWriter::for_table(&table).unwrap();
let _ = writer
.write(vec![
serde_json::json!({"metadata": {"hello": "world", "something": null}}),
])
.await
.unwrap();
writer.flush_and_commit(&mut table).await.unwrap();

let ctx = SessionContext::new();
ctx.register_table("t", Arc::new(table))?;

let batches = ctx.sql(r#"SELECT * FROM t"#).await?.collect().await?;

let expected = vec![
"+-----------------------------+",
"| metadata |",
"+-----------------------------+",
"| {hello: world, something: } |", // unclear why it doesn't say `null` for something...
"+-----------------------------+",
];

assert_batches_sorted_eq!(&expected, &batches);

Ok(())
}
}

#[cfg(any(feature = "s3", feature = "s3-native-tls"))]
Expand Down

0 comments on commit 78c4aab

Please sign in to comment.