Parquet: clear metadata and project fields of ParquetRecordBatchStream::schema #5135
Conversation
Whilst I agree with the spirit of this, stripping this metadata would be a major breaking change that would definitely break IOx and possibly some other workloads. Perhaps we could just document this potential disparity?
No problem, updated the docstring
```rust
/// Note that unlike its synchronous counterpart [`ParquetRecordBatchReader`], the [`SchemaRef`]
/// returned here will contain the original metadata, whereas [`ParquetRecordBatchReader`]
/// strips this metadata.
```
Are you sure about this? They use the same logic; the difference is that the returned RecordBatches lack the metadata.
Yes, see how the schema is created for ParquetRecordBatchReader
here:

```rust
schema: Arc::new(Schema::new(levels.fields.clone())),
```
And here:
In arrow-rs/parquet/src/arrow/arrow_reader/mod.rs (lines 622 to 625 at 093a10e):

```rust
let schema = match array_reader.get_data_type() {
    ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
    _ => unreachable!("Struct array reader's data type is not struct!"),
};
```
They only account for the fields, ignoring the metadata.
OK, sorry for going back and forth on this; I hadn't quite grasped what the issue here was.
I think this demonstrates the issue: in particular, the schema returned by ParquetRecordBatchStream is incorrect; it should return the projected schema with the metadata removed.
I'll revert back to the initial change, and also account for projection, which I wasn't aware it didn't handle either.
I think if you determine the schema from the ParquetField on ReaderFactory (which should be a DataType::Struct), you might be able to get both in one.
Re-implemented the metadata stripping for
```diff
@@ -385,13 +385,28 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
             offset: self.offset,
         };

+        // Ensure schema of ParquetRecordBatchStream respects projection, and does
+        // not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches)
+        let projected_fields = match reader.fields.as_deref().map(|pf| &pf.arrow_type) {
```
This isn't quite correct in the case of a projection of a nested schema; it's possible this isn't actually determined until the ArrayReader is constructed...
Although https://docs.rs/parquet/latest/parquet/arrow/fn.parquet_to_arrow_field_levels.html takes a ProjectionMask and should apply it already?
I was a bit worried about this, as I couldn't find a straightforward way in which the schema is constructed from ParquetField + ProjectionMask; it does indeed seem to happen during ArrayReader construction.
Edit: I wasn't aware of https://docs.rs/parquet/latest/parquet/arrow/fn.parquet_to_arrow_field_levels.html, will check it out 👍
I'll have a quick play, stand by
I've reworked it to make use of the function introduced by #5149. Hopefully I've used it correctly here.
Which issue does this PR close?
Closes #4023
Rationale for this change
What changes are included in this PR?
Ensure the metadata hashmap of the Schema returned from
ParquetRecordBatchStream::schema
is empty, so that it exactly matches the schema of the RecordBatches it outputs.
Are there any user-facing changes?