Skip to content

Commit

Permalink
tests/integration/: Add support for decoding binaries
Browse files Browse the repository at this point in the history
  • Loading branch information
progval committed Mar 13, 2024
1 parent 8e254a6 commit e00a10c
Showing 1 changed file with 80 additions and 34 deletions.
114 changes: 80 additions & 34 deletions tests/integration/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,26 @@

/// Tests against `.orc` and `.jsn.gz` in the official test suite (`orc/examples/`)
use std::fs::File;
use std::io::Read;
use std::io::{Cursor, Read};
use std::sync::Arc;

use pretty_assertions::assert_eq;

use arrow::array::StructArray;
use arrow::array::{AsArray, ListArray, PrimitiveArray};
use arrow::datatypes::{DataType, Field, Schema, UInt8Type};
use arrow::record_batch::RecordBatch;
use arrow_json::{writer::LineDelimited, WriterBuilder};
use datafusion_orc::arrow_reader::ArrowReaderBuilder;

fn normalize_json(json: &[u8]) -> String {
json.split(|c| *c == b'\n')
.filter(|line| !line.is_empty())
.map(|line| serde_json::from_slice::<serde_json::Value>(line).expect("Could not parse line"))
.map(|v| serde_json::to_string_pretty(&v).expect("Could not re-serialize line"))
.collect::<Vec<_>>()
.join("\n")
}

/// Checks parsing a `.orc` file produces the expected result in the `.jsn.gz` path
fn test_expected_file(name: &str) {
let dir = env!("CARGO_MANIFEST_DIR");
Expand All @@ -18,46 +30,32 @@ fn test_expected_file(name: &str) {
let f = File::open(orc_path).expect("Could not open .orc");
let builder = ArrowReaderBuilder::try_new(f).unwrap();
let orc_reader = builder.build();
let total_row_count = orc_reader.total_row_count();

// Read .orc into JSON objects
let mut json_writer = WriterBuilder::new()
.with_explicit_nulls(true)
.build::<_, LineDelimited>(Cursor::new(Vec::new()));
let batches: Vec<RecordBatch> = orc_reader.collect::<Result<Vec<_>, _>>().unwrap();
let objects: Vec<serde_json::Value> = batches
batches
.into_iter()
.map(|batch| -> StructArray { batch.into() })
.flat_map(|array| {
arrow_json::writer::array_to_json_array(&array)
.expect("Could not serialize convert row from .orc to JSON value")
})
.collect();
.map(binaries_to_uint8_list)
.for_each(|batch| {
json_writer
.write(&batch)
.expect("Could not serialize row from .orc to JSON string")
});
let lines = normalize_json(
&json_writer.into_inner().into_inner(),
);

// Read expected JSON objects
let mut expected_json = String::new();
flate2::read::GzDecoder::new(&File::open(jsn_gz_path).expect("Could not open .jsn.gz"))
.read_to_string(&mut expected_json)
.expect("Could not read .jsn.gz");

let objects_count = objects.len();

// Reencode the input to normalize it
let expected_lines = expected_json
.split('\n')
.filter(|line| !line.is_empty())
.map(|line| {
serde_json::from_str::<serde_json::Value>(line)
.expect("Could not parse line in .jsn.gz")
})
.map(|v| {
serde_json::to_string_pretty(&v).expect("Could not re-serialize line from .jsn.gz")
})
.collect::<Vec<_>>()
.join("\n");

let lines = objects
.into_iter()
.map(|v| serde_json::to_string_pretty(&v).expect("Could not serialize row from .orc"))
.collect::<Vec<_>>()
.join("\n");
// Reencode the input
let expected_lines = normalize_json(expected_json.as_bytes());

if lines.len() < 1000 {
assert_eq!(lines, expected_lines);
Expand All @@ -67,8 +65,57 @@ fn test_expected_file(name: &str) {
assert_eq!(lines[0..1000], expected_lines[0..1000]);
assert!(lines == expected_lines);
}
}

assert_eq!(total_row_count, objects_count as u64);
/// Converts binary columns into lists of UInt8 so they can be JSON-encoded
fn binaries_to_uint8_list(batch: RecordBatch) -> RecordBatch {
RecordBatch::try_new(
Arc::new(Schema::new(
batch
.schema()
.fields
.into_iter()
.map(|field| {
Field::new(
field.name(),
match field.data_type() {
DataType::Binary => DataType::List(Arc::new(Field::new(
"value",
DataType::UInt8,
false,
))),
DataType::LargeBinary => DataType::LargeList(Arc::new(Field::new(
"value",
DataType::UInt8,
false,
))),
data_type => data_type.clone(),
},
field.is_nullable(),
)
})
.collect::<Vec<_>>(),
)),
batch
.columns()
.iter()
.map(|array| match array.as_binary_opt() {
Some(array) => {
let (offsets, values, nulls) = array.clone().into_parts();
ListArray::try_new(
Arc::new(Field::new("value", DataType::UInt8, false)),
offsets,
Arc::new(PrimitiveArray::<UInt8Type>::new(values.into(), None)),
nulls,
)
.map(Arc::new)
.expect("Could not create ListArray")
}
None => array.clone(),
})
.collect(),
)
.expect("Could not rebuild RecordBatch")
}

#[test]
Expand Down Expand Up @@ -121,7 +168,6 @@ fn testSnappy() {
test_expected_file("TestOrcFile.testSnappy");
}
#[test]
#[ignore] // TODO: arrow_json does not support binaries
fn testStringAndBinaryStatistics() {
test_expected_file("TestOrcFile.testStringAndBinaryStatistics");
}
Expand Down Expand Up @@ -152,7 +198,7 @@ fn testLzo() {
test_expected_file("TestVectorOrcFile.testLzo");
}
#[test]
#[ignore] // TODO: Differs on representation of some Decimals
#[ignore] // TODO: not yet implemented
fn decimal() {
test_expected_file("decimal");
}
Expand Down

0 comments on commit e00a10c

Please sign in to comment.