Skip to content

Commit

Permalink
Only require compatible batch schema in ArrowWriter (#4027)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold authored Apr 6, 2023
1 parent c26bb81 commit 9bf9984
Showing 1 changed file with 51 additions and 1 deletion.
52 changes: 51 additions & 1 deletion parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,10 @@ impl<W: Write> ArrowWriter<W> {
/// and drop any fully written `RecordBatch`
pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
// validate batch schema against writer's supplied schema
if self.arrow_schema != batch.schema() {
let batch_schema = batch.schema();
if !(Arc::ptr_eq(&self.arrow_schema, &batch_schema)
|| self.arrow_schema.contains(&batch_schema))
{
return Err(ParquetError::ArrowError(
"Record batch schema does not match writer schema".to_string(),
));
Expand Down Expand Up @@ -2358,4 +2361,51 @@ mod tests {
let actual = pretty_format_batches(&batches).unwrap().to_string();
assert_eq!(actual, expected);
}

#[test]
fn test_arrow_writer_metadata() {
let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
let file_schema = batch_schema.clone().with_metadata(
vec![("foo".to_string(), "bar".to_string())]
.into_iter()
.collect(),
);

let batch = RecordBatch::try_new(
Arc::new(batch_schema),
vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
)
.unwrap();

let mut buf = Vec::with_capacity(1024);
let mut writer =
ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();
}

#[test]
fn test_arrow_writer_nullable() {
let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
let file_schema = Arc::new(file_schema);

let batch = RecordBatch::try_new(
Arc::new(batch_schema),
vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
)
.unwrap();

let mut buf = Vec::with_capacity(1024);
let mut writer =
ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();

let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
let back = read.next().unwrap().unwrap();
assert_eq!(back.schema(), file_schema);
assert_ne!(back.schema(), batch.schema());
assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
}
}

0 comments on commit 9bf9984

Please sign in to comment.