Skip to content

Commit

Permalink
Merge pull request #22 from WenyXu/feat/support-tiny-int
Browse files Browse the repository at this point in the history
feat: support to read tinyint
  • Loading branch information
WenyXu authored Nov 5, 2023
2 parents dd93878 + 493a753 commit e1af701
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 22 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Read [Apache ORC](https://orc.apache.org/) in Rust.
| Float, Double || | f32, f64 | Float32, Float64 |
| String, Char, and VarChar || | string | Utf8 |
| Boolean || | bool | Boolean |
| TinyInt | | | | |
| TinyInt | | | i8 | Int8 |
| Binary || | Vec\<u8\> | Binary |
| Decimal || | | |
| Date || | chrono::NaiveDate | Date32 |
Expand Down
9 changes: 8 additions & 1 deletion src/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use arrow::record_batch::{RecordBatch, RecordBatchReader};
use chrono::{Datelike, NaiveDate, NaiveDateTime};
use snafu::{OptionExt, ResultExt};

use self::column::tinyint::new_i8_iter;
use self::column::Column;
use crate::arrow_reader::column::binary::new_binary_iterator;
use crate::arrow_reader::column::boolean::new_boolean_iter;
Expand Down Expand Up @@ -117,6 +118,7 @@ pub enum Decoder {
Int64(NullableIterator<i64>),
Int32(NullableIterator<i64>),
Int16(NullableIterator<i64>),
Int8(NullableIterator<i8>),
Boolean(NullableIterator<bool>),
Float32(NullableIterator<f32>),
Float64(NullableIterator<f64>),
Expand Down Expand Up @@ -207,6 +209,7 @@ macro_rules! impl_decode_next_batch_cast {
impl_decode_next_batch_cast!(i64, Int64Type);
impl_decode_next_batch_cast!(i32, Int32Type);
impl_decode_next_batch_cast!(i16, Int16Type);
impl_decode_next_batch!(i8);
impl_decode_next_batch!(f32);
impl_decode_next_batch!(f64);

Expand Down Expand Up @@ -238,6 +241,10 @@ impl NaiveStripeDecoder {
Some(array) => fields.push(array),
None => break,
},
Decoder::Int8(decoder) => match decode_next_batch_i8(decoder, chunk)? {
Some(array) => fields.push(array),
None => break,
},
Decoder::Float32(decoder) => match decode_next_batch_f32(decoder, chunk)? {
Some(array) => fields.push(array),
None => break,
Expand Down Expand Up @@ -328,7 +335,7 @@ impl NaiveStripeDecoder {
for col in &stripe.columns {
let decoder = match col.kind() {
crate::proto::r#type::Kind::Boolean => Decoder::Boolean(new_boolean_iter(col)?),
crate::proto::r#type::Kind::Byte => todo!(),
crate::proto::r#type::Kind::Byte => Decoder::Int8(new_i8_iter(col)?),
crate::proto::r#type::Kind::Short => Decoder::Int16(new_i64_iter(col)?),
crate::proto::r#type::Kind::Int => Decoder::Int32(new_i64_iter(col)?),
crate::proto::r#type::Kind::Long => Decoder::Int64(new_i64_iter(col)?),
Expand Down
1 change: 1 addition & 0 deletions src/arrow_reader/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod int;
pub mod present;
pub mod string;
pub mod timestamp;
pub mod tinyint;

#[derive(Debug)]
pub struct Column {
Expand Down
26 changes: 26 additions & 0 deletions src/arrow_reader/column/tinyint.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use snafu::OptionExt;

use crate::arrow_reader::column::present::new_present_iter;
use crate::arrow_reader::column::{Column, NullableIterator};
use crate::error::{InvalidColumnSnafu, Result};
use crate::proto::stream::Kind;
use crate::reader::decode::byte_rle::ByteRleIter;

pub fn new_i8_iter(column: &Column) -> Result<NullableIterator<i8>> {
let present = new_present_iter(column)?.collect::<Result<Vec<_>>>()?;
let rows: usize = present.iter().filter(|&p| *p).count();

let iter = column
.stream(Kind::Data)
.transpose()?
.map(|reader| {
Box::new(ByteRleIter::new(reader, rows).map(|value| value.map(|value| value as i8)))
as _
})
.context(InvalidColumnSnafu { name: &column.name })?;

Ok(NullableIterator {
present: Box::new(present.into_iter()),
iter,
})
}
1 change: 1 addition & 0 deletions src/reader/decode.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod boolean_rle;
pub mod byte_rle;
pub mod float;
pub mod rle_v2;
mod util;
Expand Down
119 changes: 119 additions & 0 deletions src/reader/decode/byte_rle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
use crate::error::Result;
use std::io::Read;

use super::util::read_u8;

const MAX_LITERAL_SIZE: usize = 128;
const MIN_REPEAT_SIZE: usize = 3;

pub struct ByteRleIter<R: Read> {
reader: R,
literals: [u8; MAX_LITERAL_SIZE],
num_literals: usize,
used: usize,
repeat: bool,
remaining: usize,
}

impl<R: Read> ByteRleIter<R> {
pub fn new(reader: R, length: usize) -> Self {
Self {
reader,
literals: [0u8; MAX_LITERAL_SIZE],
num_literals: 0,
used: 0,
repeat: false,
remaining: length,
}
}

pub fn into_inner(self) -> R {
self.reader
}

fn read_byte(&mut self) -> Result<u8> {
read_u8(&mut self.reader)
}

fn read_values(&mut self) -> Result<()> {
let control = self.read_byte()?;
self.used = 0;
if control < 0x80 {
self.repeat = true;
self.num_literals = control as usize + MIN_REPEAT_SIZE;
let val = self.read_byte()?;
self.literals[0] = val;
} else {
self.repeat = false;
self.num_literals = 0x100 - control as usize;
for i in 0..self.num_literals {
let result = self.read_byte()?;
self.literals[i] = result;
}
}
Ok(())
}
}

impl<R: Read> Iterator for ByteRleIter<R> {
type Item = Result<u8>;

fn next(&mut self) -> Option<Self::Item> {
if self.remaining == 0 {
return None;
}
if self.used == self.num_literals {
match self.read_values() {
Ok(_) => {}
Err(err) => return Some(Err(err)),
}
}

let result = if self.repeat {
self.literals[0]
} else {
self.literals[self.used]
};
self.used += 1;
self.remaining -= 1;
Some(Ok(result))
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn reader_test() {
let data = [0x61u8, 0x00];

let data = &mut data.as_ref();

let iter = ByteRleIter::new(data, 100)
.collect::<Result<Vec<_>>>()
.unwrap();

assert_eq!(iter, vec![0; 100]);

let data = [0x01, 0x01];

let data = &mut data.as_ref();

let iter = ByteRleIter::new(data, 4)
.collect::<Result<Vec<_>>>()
.unwrap();

assert_eq!(iter, vec![1; 4]);

let data = [0xfe, 0x44, 0x45];

let data = &mut data.as_ref();

let iter = ByteRleIter::new(data, 2)
.collect::<Result<Vec<_>>>()
.unwrap();

assert_eq!(iter, vec![0x44, 0x45]);
}
}
Binary file modified tests/basic/data/test.orc
Binary file not shown.
5 changes: 4 additions & 1 deletion tests/basic/data/write.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
"utf8_increase": ["a", "bb", "ccc", "dddd", "eeeee"],
"utf8_decrease": ["eeeee", "dddd", "ccc", "bb", "a"],
"timestamp_simple": [datetime.datetime(2023, 4, 1, 20, 15, 30, 2000), datetime.datetime.fromtimestamp(int('1629617204525777000')/1000000000), datetime.datetime(2023, 1, 1), datetime.datetime(2023, 2, 1), datetime.datetime(2023, 3, 1)],
"date_simple": [datetime.date(2023, 4, 1), datetime.date(2023, 3, 1), datetime.date(2023, 1, 1), datetime.date(2023, 2, 1), datetime.date(2023, 3, 1)]
"date_simple": [datetime.date(2023, 4, 1), datetime.date(2023, 3, 1), datetime.date(2023, 1, 1), datetime.date(2023, 2, 1), datetime.date(2023, 3, 1)],
"tinyint_simple": [-1, None, 1, 127, -127]
}

def infer_schema(data):
Expand All @@ -48,6 +49,8 @@ def infer_schema(data):
dt = "double"
if key.startswith("bigint"):
dt = "bigint"
if key.startswith("tinyint"):
dt = "tinyint"
schema += key + ":" + dt + ","

schema = schema[:-1] + ">"
Expand Down
37 changes: 18 additions & 19 deletions tests/basic/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,15 +229,15 @@ pub fn basic_test_0() {
let reader = new_arrow_reader_root(&path);
let batch = reader.collect::<Result<Vec<_>, _>>().unwrap();

let expected = r#"+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+
| a | b | str_direct | d | e | f | int_short_repeated | int_neg_short_repeated | int_delta | int_neg_delta | int_direct | int_neg_direct | bigint_direct | bigint_neg_direct | bigint_other | utf8_increase | utf8_decrease | timestamp_simple | date_simple |
+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+
| 1.0 | true | a | a | ddd | aaaaa | 5 | -5 | 1 | 5 | 1 | -1 | 1 | -1 | 5 | a | eeeee | 2023-04-01T20:15:30.002 | 2023-04-01 |
| 2.0 | false | cccccc | bb | cc | bbbbb | 5 | -5 | 2 | 4 | 6 | -6 | 6 | -6 | -5 | bb | dddd | 2021-08-22T07:26:44.525777 | 2023-03-01 |
| | | | | | | | | | | | | | | 1 | ccc | ccc | 2023-01-01T00:00:00 | 2023-01-01 |
| 4.0 | true | ddd | ccc | bb | ccccc | 5 | -5 | 4 | 2 | 3 | -3 | 3 | -3 | 5 | dddd | bb | 2023-02-01T00:00:00 | 2023-02-01 |
| 5.0 | false | ee | ddd | a | ddddd | 5 | -5 | 5 | 1 | 2 | -2 | 2 | -2 | 5 | eeeee | a | 2023-03-01T00:00:00 | 2023-03-01 |
+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+"#;
let expected = r#"+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+----------------+
| a | b | str_direct | d | e | f | int_short_repeated | int_neg_short_repeated | int_delta | int_neg_delta | int_direct | int_neg_direct | bigint_direct | bigint_neg_direct | bigint_other | utf8_increase | utf8_decrease | timestamp_simple | date_simple | tinyint_simple |
+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+----------------+
| 1.0 | true | a | a | ddd | aaaaa | 5 | -5 | 1 | 5 | 1 | -1 | 1 | -1 | 5 | a | eeeee | 2023-04-01T20:15:30.002 | 2023-04-01 | -1 |
| 2.0 | false | cccccc | bb | cc | bbbbb | 5 | -5 | 2 | 4 | 6 | -6 | 6 | -6 | -5 | bb | dddd | 2021-08-22T07:26:44.525777 | 2023-03-01 | |
| | | | | | | | | | | | | | | 1 | ccc | ccc | 2023-01-01T00:00:00 | 2023-01-01 | 1 |
| 4.0 | true | ddd | ccc | bb | ccccc | 5 | -5 | 4 | 2 | 3 | -3 | 3 | -3 | 5 | dddd | bb | 2023-02-01T00:00:00 | 2023-02-01 | 127 |
| 5.0 | false | ee | ddd | a | ddddd | 5 | -5 | 5 | 1 | 2 | -2 | 2 | -2 | 5 | eeeee | a | 2023-03-01T00:00:00 | 2023-03-01 | -127 |
+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+----------------+"#;
assert_eq!(
expected,
pretty::pretty_format_batches(&batch).unwrap().to_string()
Expand All @@ -250,16 +250,15 @@ pub async fn async_basic_test_0() {
let reader = new_arrow_stream_reader_root(&path).await;
let batch = reader.try_collect::<Vec<_>>().await.unwrap();

let expected = r#"+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+
| a | b | str_direct | d | e | f | int_short_repeated | int_neg_short_repeated | int_delta | int_neg_delta | int_direct | int_neg_direct | bigint_direct | bigint_neg_direct | bigint_other | utf8_increase | utf8_decrease | timestamp_simple | date_simple |
+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+
| 1.0 | true | a | a | ddd | aaaaa | 5 | -5 | 1 | 5 | 1 | -1 | 1 | -1 | 5 | a | eeeee | 2023-04-01T20:15:30.002 | 2023-04-01 |
| 2.0 | false | cccccc | bb | cc | bbbbb | 5 | -5 | 2 | 4 | 6 | -6 | 6 | -6 | -5 | bb | dddd | 2021-08-22T07:26:44.525777 | 2023-03-01 |
| | | | | | | | | | | | | | | 1 | ccc | ccc | 2023-01-01T00:00:00 | 2023-01-01 |
| 4.0 | true | ddd | ccc | bb | ccccc | 5 | -5 | 4 | 2 | 3 | -3 | 3 | -3 | 5 | dddd | bb | 2023-02-01T00:00:00 | 2023-02-01 |
| 5.0 | false | ee | ddd | a | ddddd | 5 | -5 | 5 | 1 | 2 | -2 | 2 | -2 | 5 | eeeee | a | 2023-03-01T00:00:00 | 2023-03-01 |
+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+"#;

let expected = r#"+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+----------------+
| a | b | str_direct | d | e | f | int_short_repeated | int_neg_short_repeated | int_delta | int_neg_delta | int_direct | int_neg_direct | bigint_direct | bigint_neg_direct | bigint_other | utf8_increase | utf8_decrease | timestamp_simple | date_simple | tinyint_simple |
+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+----------------+
| 1.0 | true | a | a | ddd | aaaaa | 5 | -5 | 1 | 5 | 1 | -1 | 1 | -1 | 5 | a | eeeee | 2023-04-01T20:15:30.002 | 2023-04-01 | -1 |
| 2.0 | false | cccccc | bb | cc | bbbbb | 5 | -5 | 2 | 4 | 6 | -6 | 6 | -6 | -5 | bb | dddd | 2021-08-22T07:26:44.525777 | 2023-03-01 | |
| | | | | | | | | | | | | | | 1 | ccc | ccc | 2023-01-01T00:00:00 | 2023-01-01 | 1 |
| 4.0 | true | ddd | ccc | bb | ccccc | 5 | -5 | 4 | 2 | 3 | -3 | 3 | -3 | 5 | dddd | bb | 2023-02-01T00:00:00 | 2023-02-01 | 127 |
| 5.0 | false | ee | ddd | a | ddddd | 5 | -5 | 5 | 1 | 2 | -2 | 2 | -2 | 5 | eeeee | a | 2023-03-01T00:00:00 | 2023-03-01 | -127 |
+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+----------------+"#;
assert_eq!(
expected,
pretty::pretty_format_batches(&batch).unwrap().to_string()
Expand Down

0 comments on commit e1af701

Please sign in to comment.