Skip to content

Commit

Permalink
Refactor schema/type handling (#45)
Browse files Browse the repository at this point in the history
* Refactor schema/type handling

* Removed unused dependency
  • Loading branch information
Jefffrey authored Nov 19, 2023
1 parent cae1014 commit a86dda2
Show file tree
Hide file tree
Showing 9 changed files with 613 additions and 474 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ fallible-streaming-iterator = { version = "0.1" }
flate2 = "1"
futures = { version = "0.3", default-features = false, features = ["std"] }
futures-util = "0.3"
lazy_static = "1.4"
lz4_flex = "0.11"
lzokay-native = "0.1"
paste = "1.0"
Expand Down
91 changes: 39 additions & 52 deletions src/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ use crate::error::{self, InvalidColumnSnafu, Result};
use crate::proto::stream::Kind;
use crate::proto::StripeFooter;
use crate::reader::decompress::{Compression, Decompressor};
use crate::reader::schema::{create_field, TypeDescription};
use crate::reader::Reader;
use crate::schema::{DataType, RootDataType};
use crate::stripe::StripeMetadata;

pub struct ArrowReader<R: Read> {
Expand Down Expand Up @@ -102,14 +102,7 @@ pub fn create_arrow_schema<R>(cursor: &Cursor<R>) -> Schema {
.iter()
.map(|(key, value)| (key.clone(), String::from_utf8_lossy(value).to_string()))
.collect::<HashMap<_, _>>();

let fields = cursor
.columns
.iter()
.map(|(name, typ)| Arc::new(create_field((name, typ))))
.collect::<Vec<_>>();

Schema::new_with_metadata(fields, metadata)
cursor.root_data_type.create_arrow_schema(&metadata)
}

impl<R: Read + Seek> RecordBatchReader for ArrowReader<R> {
Expand Down Expand Up @@ -726,28 +719,26 @@ pub trait BatchDecoder: Send {
}

pub fn reader_factory(col: &Column, stripe: &Stripe) -> Result<Decoder> {
let reader = match col.kind() {
crate::proto::r#type::Kind::Boolean => Decoder::Boolean(new_boolean_iter(col, stripe)?),
crate::proto::r#type::Kind::Byte => Decoder::Int8(new_i8_iter(col, stripe)?),
crate::proto::r#type::Kind::Short => Decoder::Int16(new_i64_iter(col, stripe)?),
crate::proto::r#type::Kind::Int => Decoder::Int32(new_i64_iter(col, stripe)?),
crate::proto::r#type::Kind::Long => Decoder::Int64(new_i64_iter(col, stripe)?),
crate::proto::r#type::Kind::Float => Decoder::Float32(new_f32_iter(col, stripe)?),
crate::proto::r#type::Kind::Double => Decoder::Float64(new_f64_iter(col, stripe)?),
crate::proto::r#type::Kind::String => Decoder::String(StringDecoder::new(col, stripe)?),
crate::proto::r#type::Kind::Binary => Decoder::Binary(new_binary_iterator(col, stripe)?),
crate::proto::r#type::Kind::Timestamp => {
Decoder::Timestamp(new_timestamp_iter(col, stripe)?)
}
crate::proto::r#type::Kind::List => Decoder::List(new_list_iter(col, stripe)?),
crate::proto::r#type::Kind::Map => Decoder::Map(new_map_iter(col, stripe)?),
crate::proto::r#type::Kind::Struct => Decoder::Struct(new_struct_iter(col, stripe)?),
crate::proto::r#type::Kind::Union => todo!(),
crate::proto::r#type::Kind::Decimal => todo!(),
crate::proto::r#type::Kind::Date => Decoder::Date(new_i64_iter(col, stripe)?),
crate::proto::r#type::Kind::Varchar => Decoder::String(StringDecoder::new(col, stripe)?),
crate::proto::r#type::Kind::Char => Decoder::String(StringDecoder::new(col, stripe)?),
crate::proto::r#type::Kind::TimestampInstant => todo!(),
let reader = match col.data_type() {
DataType::Boolean { .. } => Decoder::Boolean(new_boolean_iter(col, stripe)?),
DataType::Byte { .. } => Decoder::Int8(new_i8_iter(col, stripe)?),
DataType::Short { .. } => Decoder::Int16(new_i64_iter(col, stripe)?),
DataType::Int { .. } => Decoder::Int32(new_i64_iter(col, stripe)?),
DataType::Long { .. } => Decoder::Int64(new_i64_iter(col, stripe)?),
DataType::Float { .. } => Decoder::Float32(new_f32_iter(col, stripe)?),
DataType::Double { .. } => Decoder::Float64(new_f64_iter(col, stripe)?),
DataType::String { .. } => Decoder::String(StringDecoder::new(col, stripe)?),
DataType::Binary { .. } => Decoder::Binary(new_binary_iterator(col, stripe)?),
DataType::Timestamp { .. } => Decoder::Timestamp(new_timestamp_iter(col, stripe)?),
DataType::List { .. } => Decoder::List(new_list_iter(col, stripe)?),
DataType::Map { .. } => Decoder::Map(new_map_iter(col, stripe)?),
DataType::Struct { .. } => Decoder::Struct(new_struct_iter(col, stripe)?),
DataType::Union { .. } => todo!(),
DataType::Decimal { .. } => todo!(),
DataType::Date { .. } => Decoder::Date(new_i64_iter(col, stripe)?),
DataType::Varchar { .. } => Decoder::String(StringDecoder::new(col, stripe)?),
DataType::Char { .. } => Decoder::String(StringDecoder::new(col, stripe)?),
DataType::TimestampWithLocalTimezone { .. } => todo!(),
};

Ok(reader)
Expand Down Expand Up @@ -816,35 +807,25 @@ impl NaiveStripeDecoder {

pub struct Cursor<R> {
pub(crate) reader: Reader<R>,
pub(crate) columns: Arc<Vec<(String, Arc<TypeDescription>)>>,
pub(crate) root_data_type: RootDataType,
pub(crate) stripe_offset: usize,
}

impl<R> Cursor<R> {
pub fn new<T: AsRef<str>>(r: Reader<R>, fields: &[T]) -> Result<Self> {
let mut columns = Vec::with_capacity(fields.len());
for name in fields {
let field = r
.metadata()
.type_description()
.field(name.as_ref())
.context(error::FieldNotFoundSnafu {
name: name.as_ref(),
})?;
columns.push((name.as_ref().to_string(), field));
}
let projected_data_type = r.metadata().root_data_type().project(fields);
Ok(Self {
reader: r,
columns: Arc::new(columns),
root_data_type: projected_data_type,
stripe_offset: 0,
})
}

pub fn root(r: Reader<R>) -> Result<Self> {
let columns = r.metadata().type_description().children();
let data_type = r.metadata().root_data_type().clone();
Ok(Self {
reader: r,
columns: Arc::new(columns),
root_data_type: data_type,
stripe_offset: 0,
})
}
Expand All @@ -855,7 +836,12 @@ impl<R: Read + Seek> Iterator for Cursor<R> {

fn next(&mut self) -> Option<Self::Item> {
if let Some(info) = self.reader.stripe(self.stripe_offset).cloned() {
let stripe = Stripe::new(&mut self.reader, &self.columns, self.stripe_offset, &info);
let stripe = Stripe::new(
&mut self.reader,
self.root_data_type.clone(),
self.stripe_offset,
&info,
);

self.stripe_offset += 1;

Expand Down Expand Up @@ -901,18 +887,19 @@ impl StreamMap {
impl Stripe {
pub fn new<R: Read + Seek>(
r: &mut Reader<R>,
column_defs: &[(String, Arc<TypeDescription>)],
root_data_type: RootDataType,
stripe: usize,
info: &StripeMetadata,
) -> Result<Self> {
let footer = Arc::new(r.stripe_footer(stripe).clone());

let compression = r.metadata().compression();
//TODO(weny): add tz
let mut columns = Vec::with_capacity(column_defs.len());
for (name, typ) in column_defs.iter() {
columns.push(Column::new(name, typ, &footer, info.number_of_rows()));
}
let columns = root_data_type
.children()
.iter()
.map(|(name, data_type)| Column::new(name, data_type, &footer, info.number_of_rows()))
.collect();

let mut stream_map = HashMap::new();
let mut stream_offset = info.offset();
Expand Down
Loading

0 comments on commit a86dda2

Please sign in to comment.