diff --git a/parquet-testing b/parquet-testing index 4cb3cff24c96..1ba34478f535 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit 4cb3cff24c965fb329cdae763eabce47395a68a0 +Subproject commit 1ba34478f535c89382263c42c675a9af4f57f2dd diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 78d0fd6da8a9..a30bf168619f 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -1436,6 +1436,46 @@ mod tests { assert_eq!(row_count, 300); } + #[test] + fn test_read_incorrect_map_schema_file() { + let testdata = arrow::util::test_util::parquet_test_data(); + // see https://github.com/apache/parquet-testing/pull/47 + let path = format!("{testdata}/incorrect_map_schema.parquet"); + let file = File::open(path).unwrap(); + let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap(); + + let batch = record_reader.next().unwrap().unwrap(); + assert_eq!(batch.num_rows(), 1); + + let expected_schema = Schema::new(Fields::from(vec![Field::new( + "my_map", + ArrowDataType::Map( + Arc::new(Field::new( + "key_value", + ArrowDataType::Struct(Fields::from(vec![ + Field::new("key", ArrowDataType::Utf8, false), + Field::new("value", ArrowDataType::Utf8, true), + ])), + false, + )), + false, + ), + true, + )])); + assert_eq!(batch.schema().as_ref(), &expected_schema); + + assert_eq!(batch.num_rows(), 1); + assert_eq!(batch.column(0).null_count(), 0); + assert_eq!( + batch.column(0).as_map().keys().as_ref(), + &StringArray::from(vec!["parent", "name"]) + ); + assert_eq!( + batch.column(0).as_map().values().as_ref(), + &StringArray::from(vec!["another", "report"]) + ); + } + /// Parameters for single_column_reader_test #[derive(Clone)] struct TestOptions { diff --git a/parquet/src/arrow/schema/complex.rs b/parquet/src/arrow/schema/complex.rs index 9f85b2c284c6..3107a282e0db 100644 --- a/parquet/src/arrow/schema/complex.rs +++ b/parquet/src/arrow/schema/complex.rs @@ -286,8 +286,16 @@ impl Visitor { let map_key = &map_key_value.get_fields()[0]; let map_value = &map_key_value.get_fields()[1]; - if map_key.get_basic_info().repetition() != Repetition::REQUIRED { - return Err(arrow_err!("Map keys must be required")); + match map_key.get_basic_info().repetition() { + Repetition::REPEATED => { + return Err(arrow_err!("Map keys cannot be repeated")); + } + Repetition::REQUIRED | Repetition::OPTIONAL => { + // Relaxed check for having repetition REQUIRED as there exists + // parquet writers and files that do not conform to this standard. + // This allows us to consume a broader range of existing files even + // if they are out of spec. + } } if map_value.get_basic_info().repetition() == Repetition::REPEATED { @@ -346,7 +354,11 @@ impl Visitor { // Need both columns to be projected match (maybe_key, maybe_value) { (Some(key), Some(value)) => { - let key_field = Arc::new(convert_field(map_key, &key, arrow_key)); + let key_field = Arc::new( + convert_field(map_key, &key, arrow_key) + // The key is always non-nullable (#5630) + .with_nullable(false), + ); let value_field = Arc::new(convert_field(map_value, &value, arrow_value)); let field_metadata = match arrow_map { Some(field) => field.metadata().clone(), diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index e50016fd5b10..37f368b203ac 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -1018,6 +1018,12 @@ mod tests { OPTIONAL int32 value; } } + REQUIRED group my_map4 (MAP) { + REPEATED group map { + OPTIONAL binary key (UTF8); + REQUIRED int32 value; + } + } } "; @@ -1075,6 +1081,24 @@ mod tests { )); } + // // Map (non-compliant nullable key) + // group my_map (MAP_KEY_VALUE) { + // repeated group map { + // optional binary key (UTF8); + // required int32 value; + // } + // } + { + arrow_fields.push(Field::new_map( + "my_map4", + "map", + Field::new("key", DataType::Utf8, false), // The key is always non-nullable (#5630) + Field::new("value", DataType::Int32, false), + false, + false, + )); + } + let parquet_group_type = parse_message_type(message_type).unwrap(); let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));