Handle primitive REPEATED field not contained in LIST annotated group #6649

Merged
merged 9 commits into from
Nov 2, 2024

Contributor

@zeevm zeevm commented Oct 29, 2024

Which issue does this PR close?

Closes #6648

Rationale for this change

What changes are included in this PR?

Are there any user-facing changes?

@github-actions github-actions bot added the parquet Changes to the parquet crate label Oct 29, 2024
Contributor

@tustvold tustvold left a comment

Can we please get a test for this

let reader = Reader::PrimitiveReader(field.clone(), Box::new(column));
if repetition == Repetition::REPEATED && path.len() == 1 {
if curr_def_level != 1 || curr_rep_level != 1 {
return Err(ParquetError::General(format!("Top level REPEATED primitve field {} should have definition and repetition levels of 1", field.name())));
Contributor

I'm not very familiar with this code, but AFAICT there's no reason we're necessarily at the top-level here

Contributor Author

@zeevm zeevm Oct 30, 2024

This should handle these cases (from the format spec):

A repeated field that is neither contained by a LIST- or MAP-annotated group nor annotated by LIST or MAP should be interpreted as a required list of required elements where the element type is the type of the field.

Implementations should use either LIST and MAP annotations or unannotated repeated fields, but not both. When using the annotations, no unannotated repeated types are allowed.

IIUC we're talking about top level fields not contained in LIST groups, this is a legacy schema for lists.
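
The quoted rule can be sketched as a small predicate. This is only an illustration under simplifying assumptions: the `Repetition` and `Annotation` enums and the `is_legacy_unannotated_list` function are hypothetical stand-ins, not the types or logic used by parquet-rs.

```rust
/// Repetition of a schema field (hypothetical stand-in for the Parquet repetition types).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Repetition {
    Required,
    Optional,
    Repeated,
}

/// Simplified annotation, keeping only what the quoted rule mentions (an assumption).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Annotation {
    None,
    List,
    Map,
}

/// Returns true when a field falls under the quoted rule: it is repeated,
/// carries no LIST or MAP annotation itself, and its parent group carries
/// none either. Such a field is read as a required list of required elements.
fn is_legacy_unannotated_list(
    field_repetition: Repetition,
    field_annotation: Annotation,
    parent_annotation: Annotation,
) -> bool {
    field_repetition == Repetition::Repeated
        && field_annotation == Annotation::None
        && parent_annotation == Annotation::None
}
```

For example, a `REPEATED INT32` directly under the message root satisfies the predicate, while the repeated inner field of a LIST-annotated group does not.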

Contributor

Right but this could be contained in a nullable group for example

Contributor

> Right but this could be contained in a nullable group for example

In fact there's a test for this in parquet-testing (which is used by the test_tree_reader_handle_repeated_fields_with_no_annotation test)...repeated_no_annotation.parquet.

created by: parquet-rs version 0.3.0 (build b45ce7cba2199f22d93269c150d8a83916c69b5e)
message user {
  REQUIRED INT32 id;
  OPTIONAL group phoneNumbers {
    REPEATED group phone {
      REQUIRED INT64 number;
      OPTIONAL BYTE_ARRAY kind (UTF8);
    }
  }
}

But parquet-rs handles this case, so perhaps it's just the top level unannotated list that needs special handling.
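
To see why nesting inside a nullable group matters for the level check discussed above, here is a minimal sketch of how maximum definition and repetition levels follow from the repetition of each field on a column's path. The `Repetition` enum and `max_levels` function are hypothetical names for illustration, not parquet-rs APIs.

```rust
/// Repetition of a schema field (hypothetical stand-in).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Repetition {
    Required,
    Optional,
    Repeated,
}

/// Computes (max definition level, max repetition level) for a column.
/// `path` lists the repetition of every field from the first field under
/// the message root down to the leaf.
fn max_levels(path: &[Repetition]) -> (i16, i16) {
    let mut def = 0;
    let mut rep = 0;
    for r in path {
        match r {
            // Required fields add nothing to either level.
            Repetition::Required => {}
            // Optional fields can be undefined, so they add a definition level.
            Repetition::Optional => def += 1,
            // Repeated fields add both a definition and a repetition level.
            Repetition::Repeated => {
                def += 1;
                rep += 1;
            }
        }
    }
    (def, rep)
}
```

With this sketch, a top-level `REPEATED INT32` yields `(1, 1)`, whereas `phoneNumbers.phone.number` in the schema above (OPTIONAL, REPEATED, REQUIRED) yields `(2, 1)`, so a check for levels of exactly 1 only matches the top-level case.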

Contributor

I just did a test of

message spark_schema {
  REPEATED group my_struct {
    REQUIRED INT32 key;
    OPTIONAL INT32 value;
  }
}

and this also worked without the patch in this PR. A second test with

message spark_schema {
    REPEATED INT32 key;
    REPEATED INT32 value;
}

fails without the patch.

test code
    #[test]
    fn test_repeated_struct_no_annotation() {
        let schema = "
            message spark_schema {
                REPEATED group my_struct {
                    REQUIRED INT32 key;
                    OPTIONAL INT32 value;
                }
            }
            ";
        let schema = Arc::new(parse_message_type(schema).unwrap());

        // Write Parquet file to buffer
        //let mut file = std::fs::File::create("/Users/seidl/test_struct.pq").unwrap();
        let mut buffer: Vec<u8> = Vec::new();
        let mut file_writer =
            SerializedFileWriter::new(&mut buffer, schema, Default::default()).unwrap();
        let mut row_group_writer = file_writer.next_row_group().unwrap();

        // Write column my_struct.key
        let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
        column_writer
            .typed::<Int32Type>()
            .write_batch(
                &[1, 2, 3, 4, 5, 6, 7, 8, 9],
                Some(&[1, 1, 1, 1, 1, 1, 1, 1, 1]),
                Some(&[0, 1, 1, 0, 1, 1, 0, 1, 1]),
            )
            .unwrap();
        column_writer.close().unwrap();

        // Write column my_struct.value
        let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
        column_writer
            .typed::<Int32Type>()
            .write_batch(
                &[42, 7, 6, 99, 100, 2, 7, 9],
                Some(&[2, 2, 2, 2, 2, 2, 2, 1, 2]),
                Some(&[0, 1, 1, 0, 1, 1, 0, 1, 1]),
            )
            .unwrap();
        column_writer.close().unwrap();

        // Finalize Parquet file
        row_group_writer.close().unwrap();
        file_writer.close().unwrap();
        assert_eq!(&buffer[0..4], b"PAR1");

        // Read Parquet file from buffer
        let file_reader = SerializedFileReader::new(Bytes::from(buffer)).unwrap();
        let rows: Vec<_> = file_reader
            .get_row_iter(None)
            .unwrap()
            .map(|row| row.unwrap())
            .collect();

        let expected_rows = vec![
            row![(
                "my_struct".to_string(),
                list![
                    group![
                        ("key".to_string(), Field::Int(1)),
                        ("value".to_string(), Field::Int(42))
                    ],
                    group![
                        ("key".to_string(), Field::Int(2)),
                        ("value".to_string(), Field::Int(7))
                    ],
                    group![
                        ("key".to_string(), Field::Int(3)),
                        ("value".to_string(), Field::Int(6))
                    ]
                ]
            )],
            row![(
                "my_struct".to_string(),
                list![
                    group![
                        ("key".to_string(), Field::Int(4)),
                        ("value".to_string(), Field::Int(99))
                    ],
                    group![
                        ("key".to_string(), Field::Int(5)),
                        ("value".to_string(), Field::Int(100))
                    ],
                    group![
                        ("key".to_string(), Field::Int(6)),
                        ("value".to_string(), Field::Int(2))
                    ]
                ]
            )],
            row![(
                "my_struct".to_string(),
                list![
                    group![
                        ("key".to_string(), Field::Int(7)),
                        ("value".to_string(), Field::Int(7))
                    ],
                    group![
                        ("key".to_string(), Field::Int(8)),
                        ("value".to_string(), Field::Null)
                    ],
                    group![
                        ("key".to_string(), Field::Int(9)),
                        ("value".to_string(), Field::Int(9))
                    ]
                ]
            )],
        ];

        assert_eq!(rows, expected_rows);
    }

    #[test]
    fn test_repeated_primitive_no_annotation() {
        let schema = "
            message spark_schema {
                REPEATED INT32 key;
                REPEATED INT32 value;
            }
            ";
        let schema = Arc::new(parse_message_type(schema).unwrap());

        // Write Parquet file to buffer
        //let mut file = std::fs::File::create("/Users/seidl/test_struct.pq").unwrap();
        let mut buffer: Vec<u8> = Vec::new();
        let mut file_writer =
            SerializedFileWriter::new(&mut buffer, schema, Default::default()).unwrap();
        let mut row_group_writer = file_writer.next_row_group().unwrap();

        // Write column key
        let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
        column_writer
            .typed::<Int32Type>()
            .write_batch(
                &[1, 2, 3, 4, 5, 6, 7, 8, 9],
                Some(&[1, 1, 1, 1, 1, 1, 1, 1, 1]),
                Some(&[0, 1, 1, 0, 1, 1, 0, 1, 1]),
            )
            .unwrap();
        column_writer.close().unwrap();

        // Write column value
        let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
        column_writer
            .typed::<Int32Type>()
            .write_batch(
                &[42, 7, 6, 99, 100, 2],
                Some(&[1, 1, 1, 1, 1, 1, 0]),
                Some(&[0, 1, 1, 0, 1, 1, 0]),
            )
            .unwrap();
        column_writer.close().unwrap();

        // Finalize Parquet file
        row_group_writer.close().unwrap();
        file_writer.close().unwrap();
        assert_eq!(&buffer[0..4], b"PAR1");

        // Read Parquet file from buffer
        let file_reader = SerializedFileReader::new(Bytes::from(buffer)).unwrap();
        let rows: Vec<_> = file_reader
            .get_row_iter(None)
            .unwrap()
            .map(|row| row.unwrap())
            .collect();

        let expected_rows = vec![
            row![
                (
                    "key".to_string(),
                    list![Field::Int(1), Field::Int(2), Field::Int(3)]
                ),
                (
                    "value".to_string(),
                    list![Field::Int(42), Field::Int(7), Field::Int(6)]
                ),
            ],
            row![
                (
                    "key".to_string(),
                    list![Field::Int(4), Field::Int(5), Field::Int(6)]
                ),
                (
                    "value".to_string(),
                    list![Field::Int(99), Field::Int(100), Field::Int(2)]
                ),
            ],
            row![
                (
                    "key".to_string(),
                    list![Field::Int(7), Field::Int(8), Field::Int(9)]
                ),
                (
                    "value".to_string(),
                    list![]
                ),
            ],
        ];

        assert_eq!(rows, expected_rows);
    }
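
As a rough illustration of what the second test expects, the sketch below groups the values of one top-level repeated primitive column into per-row lists from its definition and repetition levels. It is a simplified model that assumes a maximum definition and repetition level of 1; `assemble_lists` is a hypothetical helper, not the record reader's actual code.

```rust
/// Groups the values of a top-level REPEATED primitive column into one list per row.
/// `values` holds only the defined entries; `def_levels` and `rep_levels`
/// hold one entry per level slot.
fn assemble_lists(values: &[i32], def_levels: &[i16], rep_levels: &[i16]) -> Vec<Vec<i32>> {
    let mut rows: Vec<Vec<i32>> = Vec::new();
    let mut next_value = values.iter();
    for (&def, &rep) in def_levels.iter().zip(rep_levels) {
        // A repetition level of 0 marks the start of a new row.
        if rep == 0 || rows.is_empty() {
            rows.push(Vec::new());
        }
        // A definition level of 1 means an element is present;
        // a definition level of 0 leaves the row's list empty.
        if def == 1 {
            if let Some(&v) = next_value.next() {
                rows.last_mut().expect("a row was pushed above").push(v);
            }
        }
    }
    rows
}
```

Fed the `value` column data from the test above (six values, seven level slots), this produces three lists, the last of which is empty, matching the `list![]` in the expected rows.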

Contributor Author

Yes, your first case is explicitly accounted for in the current implementation; see the comment there.

The second case isn't implemented at the moment and requires the patch.

Contributor

@etseidl etseidl left a comment

Given some testing, I think this is a good fix for a very specific problem. Thanks!

Contributor Author

zeevm commented Oct 31, 2024

@tustvold / @etseidl How do I trigger the checks again without committing new changes? The test file I added is now merged into parquet-testing, so the new UT should pass now.

Contributor

etseidl commented Oct 31, 2024

I lack permissions. Perhaps @alamb can kick off the tests again.

Contributor Author

zeevm commented Oct 31, 2024

@tustvold / @etseidl My new test file is merged into parquet-testing and my local branch runs the UT successfully, but CI fails because the UT cannot find the file. Any ideas why that is?

Is there some step required after committing a new test file to update the test environment?

Contributor

etseidl commented Oct 31, 2024

I'll take a look in a little bit. It could be the CI is pegged to a specific parquet-testing commit.

Contributor

etseidl commented Oct 31, 2024

@zeevm I admit to not being an expert here. Looking at the error logs, CI is pulling an older commit. Pure speculation, but based on this I wonder if you need to do git submodule update --remote in your branch, create a new commit and push it. That may cause CI to pick up the recent changes.

Anyone with more git/github/workflow knowledge please chime in if I'm just embarrassing myself here 😅

Update: that seemed to work. See 916572f (after the submodule update, I staged everything under arrow-rs and committed). I still don't know if this is the right thing to do, but CI passed.

Contributor Author

zeevm commented Oct 31, 2024

@etseidl your suggestion works, and the UT now passes. Can you merge?

Contributor

etseidl commented Oct 31, 2024

> can you merge?

I'm afraid I cannot, as I'm not a committer. I know committer bandwidth is very constrained at the moment, so it may take a little while.

Contributor Author

zeevm commented Oct 31, 2024

@tustvold is this good for merge?

Contributor

@alamb alamb left a comment

Thank you @zeevm and @etseidl -- this makes sense to me

@@ -138,7 +138,17 @@ impl TreeBuilder {
 .column_descr_ptr();
 let col_reader = row_group_reader.get_column_reader(orig_index)?;
 let column = TripletIter::new(col_descr, col_reader, self.batch_size);
-Reader::PrimitiveReader(field, Box::new(column))
+let reader = Reader::PrimitiveReader(field.clone(), Box::new(column));
Contributor

I believe the changes in parquet-testing come from apache/parquet-testing#61 👍

@@ -1688,6 +1698,131 @@ mod tests {
assert_eq!(rows, expected_rows);
}

#[test]
fn test_tree_reader_handle_primitive_repeated_fields_with_no_annotation() {
Contributor

I ran these tests without the change in this PR and they failed as shown below. Thus I conclude this test covers the code change.


assertion `left == right` failed
  left: [Row { fields: [("Int32_list", Int(0)), ("String_list", Str("foo")), ("group_of_lists", Group(Row { fields: [("Int32_list_in_group", Int(0)), ("String_list_in_group", Str("foo"))] }))] }, Row { fields: [("Int32_list", Int(1)), ("String_list", Str("zero")), ("group_of_lists", Group(Row { fields: [("Int32_list_in_group", Int(1)), ("String_list_in_group", Str("zero"))] }))] }, Row { fields: [("Int32_list", Int(2)), ("String_list", Str("one")), ("group_of_lists", Group(Row { fields: [("Int32_list_in_group", Int(2)), ("String_list_in_group", Str("one"))] }))] }, Row { fields: [("Int32_list", Int(3)), ("String_list", Str("two")), ("group_of_lists", Group(Row { fields: [("Int32_list_in_group", Int(3)), ("String_list_in_group", Str("two"))] }))] }]
 right: [Row { fields: [("Int32_list", ListInternal(List { elements: [Int(0), Int(1), Int(2), Int(3)] })), ("String_list", ListInternal(List { elements: [Str("foo"), Str("zero"), Str("one"), Str("two")] })), ("group_of_lists", Group(Row { fields: [("Int32_list_in_group", ListInternal(List { elements: [Int(0), Int(1), Int(2), Int(3)] })), ("String_list_in_group", ListInternal(List { elements: [Str("foo"), Str("zero"), Str("one"), Str("two")] }))] }))] }, Row { fields: [("Int32_list", ListInternal(List { elements: [] })), ("String_list", ListInternal(List { elements: [Str("three")] })), ("group_of_lists", Group(Row { fields: [("Int32_list_in_group", ListInternal(List { elements: [] })), ("String_list_in_group", ListInternal(List { elements: [Str("three")] }))] }))] }, Row { fields: [("Int32_list", ListInternal(List { elements: [Int(4)] })), ("String_list", ListInternal(List { elements: [Str("four")] })), ("group_of_lists", Group(Row { fields: [("Int32_list_in_group", ListInternal(List { elements: [Int(4)] })), ("String_list_in_group", ListInternal(List { elements: [Str("four")] }))] }))] }, Row { fields: [("Int32_list", ListInternal(List { elements: [Int(5), Int(6), Int(7), Int(8)] })), ("String_list", ListInternal(List { elements: [Str("five"), Str("six"), Str("seven"), Str("eight")] })), ("group_of_lists", Group(Row { fields: [("Int32_list_in_group", ListInternal(List { elements: [Int(5), Int(6), Int(7), Int(8)] })), ("String_list_in_group", ListInternal(List { elements: [Str("five"), Str("six"), Str("seven"), Str("eight")] }))] }))] }]

Left:  [Row { fields: [("Int32_list", Int(0)), ("String_list", Str("foo")), ("group_of_lists", Group(Row { fields: [("Int32_list_in_group", Int(0)), ("String_list_in_group", Str("foo"))] }))] }, Row { fields: [("Int32_list", Int(1)), ("String_list", Str("ze ...

Right: [Row { fields: [("Int32_list", ListInternal(List { elements: [Int(0), Int(1), Int(2), Int(3)] })), ("String_list", ListInternal(List { elements: [Str("foo"), Str("zero"), Str("one"), Str("two")] })), ("group_of_lists", Group(Row { fields: [("Int32_li ...

Contributor Author

Correct @alamb, are you good to merge this?

Contributor

alamb commented Oct 31, 2024

Yes, I plan to merge this PR tomorrow unless there are any more comments

Contributor

alamb commented Nov 2, 2024

🚀

@alamb alamb merged commit 6859c87 into apache:master Nov 2, 2024
16 checks passed
Contributor

alamb commented Nov 2, 2024

Thanks again @zeevm and @etseidl

Member

wgtmac commented Nov 4, 2024

Sorry for the late reply. I'm not sure whether we should fix reading or totally prohibit writing repeated primitive fields without LIST annotation as a list type. This is a gray area from the spec and should be discussed prior to the fix. The file added to parquet-testing is for interoperability across parquet implementations to have the same behavior. I have opened apache/parquet-format#466 to clarify it.

Contributor

alamb commented Nov 8, 2024

> Sorry for the late reply. I'm not sure whether we should fix reading or totally prohibit writing repeated primitive fields without LIST annotation as a list type. This is a gray area from the spec and should be discussed prior to the fix. The file added to parquet-testing is for interoperability across parquet implementations to have the same behavior. I have opened apache/parquet-format#466 to clarify it.

Thank you @wgtmac -- given there are parquet files (such as what is in parquet-format) that do have this structure, I think supporting reads makes sense. If we decide it would be good to prohibit writers from writing such files, that sounds like a good follow-on to me, but it still adds value to parquet-rs to read such files from my perspective.

Labels
parquet Changes to the parquet crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Primitive REPEATED fields not contained in LIST annotated groups aren't read as lists by record reader
5 participants