Skip to content

Commit

Permalink
Add IPC StreamDecoder
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Mar 19, 2024
1 parent f41c2a4 commit 3f83468
Show file tree
Hide file tree
Showing 7 changed files with 356 additions and 15 deletions.
22 changes: 16 additions & 6 deletions arrow-buffer/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,23 +171,33 @@ impl Buffer {

/// Returns a new [Buffer] that is a slice of this buffer starting at `offset`.
/// Doing so allows the same memory region to be shared between buffers.
///
/// # Panics
///
/// Panics iff `offset` is larger than `len`.
pub fn slice(&self, offset: usize) -> Self {
let mut s = self.clone();
s.advance(offset);
s
}

/// Increases the offset of this buffer by `offset`
///
/// # Panics
///
/// Panics iff `offset` is larger than `len`.
#[inline]
pub fn advance(&mut self, offset: usize) {
assert!(
offset <= self.length,
"the offset of the new Buffer cannot exceed the existing length"
);
self.length -= offset;
// Safety:
// This cannot overflow as
// `self.offset + self.length < self.data.len()`
// `offset < self.length`
let ptr = unsafe { self.ptr.add(offset) };
Self {
data: self.data.clone(),
length: self.length - offset,
ptr,
}
self.ptr = unsafe { self.ptr.add(offset) };
}

/// Returns a new [Buffer] that is a slice of this buffer starting at `offset`,
Expand Down
35 changes: 32 additions & 3 deletions arrow-integration-testing/tests/ipc_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
//! in `testing/arrow-ipc-stream/integration/...`
use arrow::error::ArrowError;
use arrow::ipc::reader::{FileReader, StreamReader};
use arrow::ipc::reader::{FileReader, StreamDecoder, StreamReader};
use arrow::util::test_util::arrow_test_data;
use arrow_buffer::Buffer;
use arrow_integration_testing::read_gzip_json;
use std::fs::File;
use std::io::Read;

#[test]
fn read_0_1_4() {
Expand Down Expand Up @@ -182,18 +184,45 @@ fn verify_arrow_stream(testdata: &str, version: &str, path: &str) {
let filename = format!("{testdata}/arrow-ipc-stream/integration/{version}/{path}.stream");
println!("Verifying {filename}");

// read expected JSON output
let arrow_json = read_gzip_json(version, path);

// Compare contents to the expected output format in JSON
{
println!(" verifying content");
let file = File::open(&filename).unwrap();
let mut reader = StreamReader::try_new(file, None).unwrap();

// read expected JSON output
let arrow_json = read_gzip_json(version, path);
assert!(arrow_json.equals_reader(&mut reader).unwrap());
// the next batch must be empty
assert!(reader.next().is_none());
// the stream must indicate that it's finished
assert!(reader.is_finished());
}

// Test stream decoder
let expected = arrow_json.get_record_batches().unwrap();
for chunk_sizes in [1, 2, 8, 123] {
let mut decoder = StreamDecoder::new();
let stream = chunked_file(&filename, chunk_sizes);
let mut actual = Vec::with_capacity(expected.len());
for mut x in stream {
while !x.is_empty() {
if let Some(x) = decoder.decode(&mut x).unwrap() {
actual.push(x);
}
}
}
decoder.finish().unwrap();
assert_eq!(expected, actual);
}
}

fn chunked_file(filename: &str, chunk_size: u64) -> impl Iterator<Item = Buffer> {
let mut file = File::open(filename).unwrap();
std::iter::from_fn(move || {
let mut buf = vec![];
let read = (&mut file).take(chunk_size).read_to_end(&mut buf).unwrap();
(read != 0).then(|| Buffer::from_vec(buf))
})
}
48 changes: 46 additions & 2 deletions arrow-ipc/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@

//! Utilities for converting between IPC types and native Arrow types
use arrow_buffer::Buffer;
use arrow_schema::*;
use flatbuffers::{FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, WIPOffset};
use flatbuffers::{
FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, Verifiable, Verifier,
VerifierOptions, WIPOffset,
};
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use crate::{size_prefixed_root_as_message, KeyValue, CONTINUATION_MARKER};
use crate::{size_prefixed_root_as_message, KeyValue, Message, CONTINUATION_MARKER};
use DataType::*;

/// Serialize a schema in IPC format
Expand Down Expand Up @@ -806,6 +811,45 @@ pub(crate) fn get_fb_dictionary<'a>(
builder.finish()
}

/// An owned container for a validated [`Message`]
///
/// Safely decoding a flatbuffer requires validating the various embedded offsets,
/// see [`Verifier`]. This is a potentially expensive operation, and it is therefore desirable
/// to only do this once. [`crate::root_as_message`] performs this validation on construction,
/// however, it returns a [`Message`] borrowing the provided byte slice. This prevents
/// storing this [`Message`] in the same data structure that owns the buffer, as this
/// would require self-referential borrows.
///
/// [`MessageBuffer`] solves this problem by providing a safe API for a [`Message`]
/// without a lifetime bound.
#[derive(Clone)]
pub struct MessageBuffer(Buffer);

impl Debug for MessageBuffer {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
self.as_ref().fmt(f)
}
}

impl MessageBuffer {
/// Try to create a [`MessageBuffer`] from the provided [`Buffer`]
pub fn try_new(buf: Buffer) -> Result<Self, ArrowError> {
let opts = VerifierOptions::default();
let mut v = Verifier::new(&opts, &buf);
<ForwardsUOffset<Message>>::run_verifier(&mut v, 0).map_err(|err| {
ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
})?;
Ok(Self(buf))
}

/// Return the [`Message`]
#[inline]
pub fn as_ref(&self) -> Message<'_> {
// SAFETY: Run verifier on construction
unsafe { crate::root_as_message_unchecked(&self.0) }
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
4 changes: 4 additions & 0 deletions arrow-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
//! The `FileReader` and `StreamReader` have similar interfaces,
//! however the `FileReader` expects a reader that supports `Seek`ing
mod stream;

pub use stream::*;

use flatbuffers::{VectorIter, VerifierOptions};
use std::collections::HashMap;
use std::fmt;
Expand Down
Loading

0 comments on commit 3f83468

Please sign in to comment.