-
Notifications
You must be signed in to change notification settings - Fork 839
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add IPC StreamDecoder #5531
Add IPC StreamDecoder #5531
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,10 +19,12 @@ | |
//! in `testing/arrow-ipc-stream/integration/...` | ||
|
||
use arrow::error::ArrowError; | ||
use arrow::ipc::reader::{FileReader, StreamReader}; | ||
use arrow::ipc::reader::{FileReader, StreamDecoder, StreamReader}; | ||
use arrow::util::test_util::arrow_test_data; | ||
use arrow_buffer::Buffer; | ||
use arrow_integration_testing::read_gzip_json; | ||
use std::fs::File; | ||
use std::io::Read; | ||
|
||
#[test] | ||
fn read_0_1_4() { | ||
|
@@ -182,18 +184,45 @@ fn verify_arrow_stream(testdata: &str, version: &str, path: &str) { | |
let filename = format!("{testdata}/arrow-ipc-stream/integration/{version}/{path}.stream"); | ||
println!("Verifying {filename}"); | ||
|
||
// read expected JSON output | ||
let arrow_json = read_gzip_json(version, path); | ||
|
||
// Compare contents to the expected output format in JSON | ||
{ | ||
println!(" verifying content"); | ||
let file = File::open(&filename).unwrap(); | ||
let mut reader = StreamReader::try_new(file, None).unwrap(); | ||
|
||
// read expected JSON output | ||
let arrow_json = read_gzip_json(version, path); | ||
assert!(arrow_json.equals_reader(&mut reader).unwrap()); | ||
// the next batch must be empty | ||
assert!(reader.next().is_none()); | ||
// the stream must indicate that it's finished | ||
assert!(reader.is_finished()); | ||
} | ||
|
||
// Test stream decoder | ||
let expected = arrow_json.get_record_batches().unwrap(); | ||
for chunk_sizes in [1, 2, 8, 123] { | ||
let mut decoder = StreamDecoder::new(); | ||
let stream = chunked_file(&filename, chunk_sizes); | ||
let mut actual = Vec::with_capacity(expected.len()); | ||
for mut x in stream { | ||
while !x.is_empty() { | ||
if let Some(x) = decoder.decode(&mut x).unwrap() { | ||
actual.push(x); | ||
} | ||
} | ||
} | ||
decoder.finish().unwrap(); | ||
assert_eq!(expected, actual); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we also add tests for error conditions? At the very least I think we should have a test for truncated stream which I think would be the most likely error in practice. It would be nice to have a test for things like missing dictionaries, but I think that the value of those tests is lower |
||
} | ||
} | ||
|
||
fn chunked_file(filename: &str, chunk_size: u64) -> impl Iterator<Item = Buffer> { | ||
let mut file = File::open(filename).unwrap(); | ||
std::iter::from_fn(move || { | ||
let mut buf = vec![]; | ||
let read = (&mut file).take(chunk_size).read_to_end(&mut buf).unwrap(); | ||
(read != 0).then(|| Buffer::from_vec(buf)) | ||
}) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,12 +17,17 @@ | |
|
||
//! Utilities for converting between IPC types and native Arrow types | ||
|
||
use arrow_buffer::Buffer; | ||
use arrow_schema::*; | ||
use flatbuffers::{FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, WIPOffset}; | ||
use flatbuffers::{ | ||
FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, Verifiable, Verifier, | ||
VerifierOptions, WIPOffset, | ||
}; | ||
use std::collections::HashMap; | ||
use std::fmt::{Debug, Formatter}; | ||
use std::sync::Arc; | ||
|
||
use crate::{size_prefixed_root_as_message, KeyValue, CONTINUATION_MARKER}; | ||
use crate::{size_prefixed_root_as_message, KeyValue, Message, CONTINUATION_MARKER}; | ||
use DataType::*; | ||
|
||
/// Serialize a schema in IPC format | ||
|
@@ -806,6 +811,45 @@ pub(crate) fn get_fb_dictionary<'a>( | |
builder.finish() | ||
} | ||
|
||
/// An owned container for a validated [`Message`] | ||
/// | ||
/// Safely decoding a flatbuffer requires validating the various embedded offsets, | ||
/// see [`Verifier`]. This is a potentially expensive operation, and it is therefore desirable | ||
/// to only do this once. [`crate::root_as_message`] performs this validation on construction, | ||
/// however, it returns a [`Message`] borrowing the provided byte slice. This prevents | ||
/// storing this [`Message`] in the same data structure that owns the buffer, as this | ||
/// would require self-referential borrows. | ||
/// | ||
/// [`MessageBuffer`] solves this problem by providing a safe API for a [`Message`] | ||
/// without a lifetime bound. | ||
#[derive(Clone)] | ||
pub struct MessageBuffer(Buffer); | ||
|
||
impl Debug for MessageBuffer { | ||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { | ||
self.as_ref().fmt(f) | ||
} | ||
} | ||
|
||
impl MessageBuffer { | ||
/// Try to create a [`MessageBuffer`] from the provided [`Buffer`] | ||
pub fn try_new(buf: Buffer) -> Result<Self, ArrowError> { | ||
let opts = VerifierOptions::default(); | ||
let mut v = Verifier::new(&opts, &buf); | ||
<ForwardsUOffset<Message>>::run_verifier(&mut v, 0).map_err(|err| { | ||
ArrowError::ParseError(format!("Unable to get root as message: {err:?}")) | ||
})?; | ||
Ok(Self(buf)) | ||
} | ||
|
||
/// Return the [`Message`] | ||
#[inline] | ||
pub fn as_ref(&self) -> Message<'_> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it would be more standard to call this function There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unfortunately this isn't possible because of the lifetime on |
||
// SAFETY: Run verifier on construction | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was worried that this is a not a safe API as since this is a public struct and it could be created directly let buf = MessageBuffer(my_unsafe_buffer);
// cast to message without checks being called:
let my unsafe_message = buf.as_ref(); However, I convinced myself it was actually safe as long as |
||
unsafe { crate::root_as_message_unchecked(&self.0) } | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use super::*; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✅ this is a very nice test