Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Reader struct, condense into Cursor #48

Merged
merged 2 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions benches/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use std::fs::File;

use criterion::{criterion_group, criterion_main, Criterion};
use datafusion_orc::{ArrowReader, ArrowStreamReader, Cursor, Reader};
use datafusion_orc::{ArrowReader, ArrowStreamReader, Cursor};
use futures_util::TryStreamExt;

fn basic_path(path: &str) -> String {
Expand All @@ -44,8 +44,7 @@ async fn async_read_all() {
let file_path = basic_path(file);
let f = tokio::fs::File::open(file_path).await.unwrap();

let reader = Reader::new_async(f).await.unwrap();
let cursor = Cursor::root(reader).unwrap();
let cursor = Cursor::root_async(f).await.unwrap();

ArrowStreamReader::new(cursor, None)
.try_collect::<Vec<_>>()
Expand All @@ -58,8 +57,7 @@ fn sync_read_all() {
let file_path = basic_path(file);
let f = File::open(file_path).unwrap();

let reader = Reader::new(f).unwrap();
let cursor = Cursor::root(reader).unwrap();
let cursor = Cursor::root(f).unwrap();

ArrowReader::new(cursor, None)
.collect::<Result<Vec<_>, _>>()
Expand Down
167 changes: 109 additions & 58 deletions src/arrow_reader.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pub mod column;

use std::collections::HashMap;
use std::io::{Read, Seek};
use std::io::Read;
use std::sync::Arc;

use arrow::array::{
Expand All @@ -20,6 +20,7 @@ use arrow::datatypes::{Field, TimeUnit};
use arrow::error::ArrowError;
use arrow::record_batch::{RecordBatch, RecordBatchReader};
use bytes::Bytes;
use prost::Message;
use snafu::{OptionExt, ResultExt};

use self::column::list::{new_list_iter, ListDecoder};
Expand All @@ -36,15 +37,16 @@ use crate::arrow_reader::column::struct_column::StructDecoder;
use crate::arrow_reader::column::timestamp::new_timestamp_iter;
use crate::arrow_reader::column::NullableIterator;
use crate::builder::BoxedArrayBuilder;
use crate::error::{self, InvalidColumnSnafu, Result};
use crate::error::{self, InvalidColumnSnafu, IoSnafu, Result};
use crate::proto::stream::Kind;
use crate::proto::StripeFooter;
use crate::reader::decompress::{Compression, Decompressor};
use crate::reader::Reader;
use crate::reader::metadata::{read_metadata, read_metadata_async, FileMetadata};
use crate::reader::{AsyncChunkReader, ChunkReader};
use crate::schema::{DataType, RootDataType};
use crate::stripe::StripeMetadata;

pub struct ArrowReader<R: Read> {
pub struct ArrowReader<R: ChunkReader> {
cursor: Cursor<R>,
schema_ref: SchemaRef,
current_stripe: Option<Box<dyn Iterator<Item = Result<RecordBatch>>>>,
Expand All @@ -53,7 +55,7 @@ pub struct ArrowReader<R: Read> {

pub const DEFAULT_BATCH_SIZE: usize = 8192;

impl<R: Read> ArrowReader<R> {
impl<R: ChunkReader> ArrowReader<R> {
pub fn new(cursor: Cursor<R>, batch_size: Option<usize>) -> Self {
let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE);
let schema = Arc::new(create_arrow_schema(&cursor));
Expand All @@ -66,11 +68,11 @@ impl<R: Read> ArrowReader<R> {
}

pub fn total_row_count(&self) -> u64 {
self.cursor.reader.metadata().number_of_rows()
self.cursor.file_metadata.number_of_rows()
}
}

impl<R: Read + Seek> ArrowReader<R> {
impl<R: ChunkReader> ArrowReader<R> {
fn try_advance_stripe(&mut self) -> Option<std::result::Result<RecordBatch, ArrowError>> {
match self
.cursor
Expand All @@ -96,22 +98,21 @@ impl<R: Read + Seek> ArrowReader<R> {

pub fn create_arrow_schema<R>(cursor: &Cursor<R>) -> Schema {
let metadata = cursor
.reader
.metadata()
.file_metadata
.user_custom_metadata()
.iter()
.map(|(key, value)| (key.clone(), String::from_utf8_lossy(value).to_string()))
.collect::<HashMap<_, _>>();
cursor.root_data_type.create_arrow_schema(&metadata)
cursor.projected_data_type.create_arrow_schema(&metadata)
}

impl<R: Read + Seek> RecordBatchReader for ArrowReader<R> {
impl<R: ChunkReader> RecordBatchReader for ArrowReader<R> {
fn schema(&self) -> SchemaRef {
self.schema_ref.clone()
}
}

impl<R: Read + Seek> Iterator for ArrowReader<R> {
impl<R: ChunkReader> Iterator for ArrowReader<R> {
type Item = std::result::Result<RecordBatch, ArrowError>;

fn next(&mut self) -> Option<Self::Item> {
Expand Down Expand Up @@ -806,45 +807,78 @@ impl NaiveStripeDecoder {
}

pub struct Cursor<R> {
pub(crate) reader: Reader<R>,
pub(crate) root_data_type: RootDataType,
pub(crate) reader: R,
pub(crate) file_metadata: Arc<FileMetadata>,
pub(crate) projected_data_type: RootDataType,
pub(crate) stripe_offset: usize,
}

impl<R> Cursor<R> {
pub fn new<T: AsRef<str>>(r: Reader<R>, fields: &[T]) -> Result<Self> {
let projected_data_type = r.metadata().root_data_type().project(fields);
impl<R: ChunkReader> Cursor<R> {
pub fn new<T: AsRef<str>>(mut reader: R, fields: &[T]) -> Result<Self> {
let file_metadata = Arc::new(read_metadata(&mut reader)?);
let projected_data_type = file_metadata.root_data_type().project(fields);
Ok(Self {
reader: r,
root_data_type: projected_data_type,
reader,
file_metadata,
projected_data_type,
stripe_offset: 0,
})
}

pub fn root(r: Reader<R>) -> Result<Self> {
let data_type = r.metadata().root_data_type().clone();
pub fn root(mut reader: R) -> Result<Self> {
let file_metadata = Arc::new(read_metadata(&mut reader)?);
let data_type = file_metadata.root_data_type().clone();
Ok(Self {
reader: r,
root_data_type: data_type,
reader,
file_metadata,
projected_data_type: data_type,
stripe_offset: 0,
})
}
}

impl<R: Read + Seek> Iterator for Cursor<R> {
impl<R: AsyncChunkReader> Cursor<R> {
pub async fn new_async<T: AsRef<str>>(mut reader: R, fields: &[T]) -> Result<Self> {
let file_metadata = Arc::new(read_metadata_async(&mut reader).await?);
let projected_data_type = file_metadata.root_data_type().project(fields);
Ok(Self {
reader,
file_metadata,
projected_data_type,
stripe_offset: 0,
})
}

pub async fn root_async(mut reader: R) -> Result<Self> {
let file_metadata = Arc::new(read_metadata_async(&mut reader).await?);
let data_type = file_metadata.root_data_type().clone();
Ok(Self {
reader,
file_metadata,
projected_data_type: data_type,
stripe_offset: 0,
})
}
}
WenyXu marked this conversation as resolved.
Show resolved Hide resolved

impl<R: ChunkReader> Iterator for Cursor<R> {
type Item = Result<Stripe>;

fn next(&mut self) -> Option<Self::Item> {
if let Some(info) = self.reader.stripe(self.stripe_offset).cloned() {
if let Some(info) = self
.file_metadata
.stripe_metadatas()
.get(self.stripe_offset)
.cloned()
{
let stripe = Stripe::new(
&mut self.reader,
self.root_data_type.clone(),
&self.file_metadata,
&self.projected_data_type.clone(),
self.stripe_offset,
&info,
);

self.stripe_offset += 1;

Some(stripe)
} else {
None
Expand All @@ -861,41 +895,23 @@ pub struct Stripe {
pub(crate) stream_map: Arc<StreamMap>,
}

#[derive(Debug)]
pub struct StreamMap {
pub inner: HashMap<(u32, Kind), Bytes>,
pub compression: Option<Compression>,
}

impl StreamMap {
pub fn get(&self, column: &Column, kind: Kind) -> Result<Decompressor> {
self.get_opt(column, kind).context(InvalidColumnSnafu {
name: column.name(),
})
}

pub fn get_opt(&self, column: &Column, kind: Kind) -> Option<Decompressor> {
let column_id = column.column_id();

self.inner
.get(&(column_id, kind))
.cloned()
.map(|data| Decompressor::new(data, self.compression, vec![]))
}
}

impl Stripe {
pub fn new<R: Read + Seek>(
r: &mut Reader<R>,
root_data_type: RootDataType,
pub fn new<R: ChunkReader>(
reader: &mut R,
file_metadata: &Arc<FileMetadata>,
projected_data_type: &RootDataType,
stripe: usize,
info: &StripeMetadata,
) -> Result<Self> {
let footer = Arc::new(r.stripe_footer(stripe).clone());
let compression = file_metadata.compression();

let footer = reader
.get_bytes(info.footer_offset(), info.footer_length())
.context(IoSnafu)?;
let footer = Arc::new(deserialize_stripe_footer(&footer, compression)?);

let compression = r.metadata().compression();
//TODO(weny): add tz
let columns = root_data_type
let columns = projected_data_type
.children()
.iter()
.map(|(name, data_type)| Column::new(name, data_type, &footer, info.number_of_rows()))
Expand All @@ -907,7 +923,7 @@ impl Stripe {
let length = stream.length();
let column_id = stream.column();
let kind = stream.kind();
let data = Column::read_stream(r, stream_offset, length as usize)?;
let data = Column::read_stream(reader, stream_offset, length)?;

// TODO(weny): filter out unused streams.
stream_map.insert((column_id, kind), data);
Expand All @@ -934,3 +950,38 @@ impl Stripe {
self.stripe_offset
}
}

#[derive(Debug)]
pub struct StreamMap {
pub inner: HashMap<(u32, Kind), Bytes>,
pub compression: Option<Compression>,
}

impl StreamMap {
pub fn get(&self, column: &Column, kind: Kind) -> Result<Decompressor> {
self.get_opt(column, kind).context(InvalidColumnSnafu {
name: column.name(),
})
}

pub fn get_opt(&self, column: &Column, kind: Kind) -> Option<Decompressor> {
let column_id = column.column_id();

self.inner
.get(&(column_id, kind))
.cloned()
.map(|data| Decompressor::new(data, self.compression, vec![]))
}
}

pub(crate) fn deserialize_stripe_footer(
bytes: &[u8],
compression: Option<Compression>,
) -> Result<StripeFooter> {
let mut buffer = vec![];
// TODO: refactor to not need Bytes::copy_from_slice
Decompressor::new(Bytes::copy_from_slice(bytes), compression, vec![])
.read_to_end(&mut buffer)
.context(error::IoSnafu)?;
StripeFooter::decode(buffer.as_slice()).context(error::DecodeProtoSnafu)
}
40 changes: 8 additions & 32 deletions src/arrow_reader/column.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;

use arrow::datatypes::Field;
use bytes::Bytes;
use snafu::ResultExt;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

use crate::error::{self, Result};
use crate::error::{IoSnafu, Result};
use crate::proto::{ColumnEncoding, StripeFooter};
use crate::reader::Reader;
use crate::reader::{AsyncChunkReader, ChunkReader};
use crate::schema::DataType;

pub mod binary;
Expand Down Expand Up @@ -45,24 +43,6 @@ impl From<&Column> for Field {
}
}

macro_rules! impl_read_stream {
($reader:ident,$start:ident,$length:ident $($_await:tt)*) => {{
$reader
.inner
.seek(SeekFrom::Start($start))$($_await)*
.context(error::IoSnafu)?;

let mut scratch = vec![0; $length];

$reader
.inner
.read_exact(&mut scratch)$($_await)*
.context(error::IoSnafu)?;

Ok(Bytes::from(scratch))
}};
}

impl Column {
pub fn new(
name: &str,
Expand Down Expand Up @@ -172,20 +152,16 @@ impl Column {
}
}

pub fn read_stream<R: Read + Seek>(
reader: &mut Reader<R>,
start: u64,
length: usize,
) -> Result<Bytes> {
impl_read_stream!(reader, start, length)
pub fn read_stream<R: ChunkReader>(reader: &mut R, start: u64, length: u64) -> Result<Bytes> {
reader.get_bytes(start, length).context(IoSnafu)
}

pub async fn read_stream_async<R: AsyncRead + AsyncSeek + Unpin + Send>(
reader: &mut Reader<R>,
pub async fn read_stream_async<R: AsyncChunkReader>(
reader: &mut R,
start: u64,
length: usize,
length: u64,
) -> Result<Bytes> {
impl_read_stream!(reader, start, length.await)
reader.get_bytes(start, length).await.context(IoSnafu)
}
}

Expand Down
Loading
Loading