Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only require compatible batch schema in ArrowWriter #4027

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 51 additions & 1 deletion parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,10 @@ impl<W: Write> ArrowWriter<W> {
/// and drop any fully written `RecordBatch`
pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
// validate batch schema against writer's supplied schema
if self.arrow_schema != batch.schema() {
let batch_schema = batch.schema();
if !(Arc::ptr_eq(&self.arrow_schema, &batch_schema)
|| self.arrow_schema.contains(&batch_schema))
{
return Err(ParquetError::ArrowError(
"Record batch schema does not match writer schema".to_string(),
));
Expand Down Expand Up @@ -2358,4 +2361,51 @@ mod tests {
let actual = pretty_format_batches(&batches).unwrap().to_string();
assert_eq!(actual, expected);
}

#[test]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I verified that these tests fail without the code change:


---- arrow::arrow_writer::tests::test_arrow_writer_metadata stdout ----
thread 'arrow::arrow_writer::tests::test_arrow_writer_metadata' panicked at 'called `Result::unwrap()` on an `Err` value: ArrowError("Record batch schema does not match writer schema")', parquet/src/arrow/arrow_writer/mod.rs:2380:30
stack backtrace:
   0: rust_begin_unwind
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/std/src/panicking.rs:575:5
   1: core::panicking::panic_fmt
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/core/src/panicking.rs:64:14
   2: core::result::unwrap_failed
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/core/src/result.rs:1790:5
   3: core::result::Result<T,E>::unwrap
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/core/src/result.rs:1112:23
   4: parquet::arrow::arrow_writer::tests::test_arrow_writer_metadata
             at ./src/arrow/arrow_writer/mod.rs:2380:9
   5: parquet::arrow::arrow_writer::tests::test_arrow_writer_metadata::{{closure}}
             at ./src/arrow/arrow_writer/mod.rs:2363:37
   6: core::ops::function::FnOnce::call_once
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/core/src/ops/function.rs:250:5
   7: core::ops::function::FnOnce::call_once
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/core/src/ops/function.rs:250:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

---- arrow::arrow_writer::tests::test_arrow_writer_nullable stdout ----
thread 'arrow::arrow_writer::tests::test_arrow_writer_nullable' panicked at 'called `Result::unwrap()` on an `Err` value: ArrowError("Record batch schema does not match writer schema")', parquet/src/arrow/arrow_writer/mod.rs:2399:30
stack backtrace:
   0: rust_begin_unwind
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/std/src/panicking.rs:575:5
   1: core::panicking::panic_fmt
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/core/src/panicking.rs:64:14
   2: core::result::unwrap_failed
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/core/src/result.rs:1790:5
   3: core::result::Result<T,E>::unwrap
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/core/src/result.rs:1112:23
   4: parquet::arrow::arrow_writer::tests::test_arrow_writer_nullable
             at ./src/arrow/arrow_writer/mod.rs:2399:9
   5: parquet::arrow::arrow_writer::tests::test_arrow_writer_nullable::{{closure}}
             at ./src/arrow/arrow_writer/mod.rs:2385:37
   6: core::ops::function::FnOnce::call_once
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/core/src/ops/function.rs:250:5
   7: core::ops::function::FnOnce::call_once
             at /rustc/2c8cc343237b8f7d5a3c3703e3a87f2eb2c54a74/library/core/src/ops/function.rs:250:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

fn test_arrow_writer_metadata() {
let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
let file_schema = batch_schema.clone().with_metadata(
vec![("foo".to_string(), "bar".to_string())]
.into_iter()
.collect(),
);

let batch = RecordBatch::try_new(
Arc::new(batch_schema),
vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
)
.unwrap();

let mut buf = Vec::with_capacity(1024);
let mut writer =
ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();
}

#[test]
fn test_arrow_writer_nullable() {
let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
let file_schema = Arc::new(file_schema);

let batch = RecordBatch::try_new(
Arc::new(batch_schema),
vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
)
.unwrap();

let mut buf = Vec::with_capacity(1024);
let mut writer =
ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();

let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
let back = read.next().unwrap().unwrap();
assert_eq!(back.schema(), file_schema);
assert_ne!(back.schema(), batch.schema());
assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
}
}