Skip to content

Commit

Permalink
chore: remove unstable features
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Nov 3, 2023
1 parent aa29e38 commit 8f76cbc
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 39 deletions.
5 changes: 4 additions & 1 deletion src/arrow_reader/column/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ use crate::reader::decode::variable_length::Values;
use crate::reader::decompress::Decompressor;

pub fn new_binary_iterator(column: &Column) -> error::Result<NullableIterator<Vec<u8>>> {
let null_mask = new_present_iter(column)?.try_collect::<Vec<_>>()?;
let null_mask = new_present_iter(column)?
.collect::<Vec<_>>()
.into_iter()
.collect::<error::Result<Vec<_>>>()?;

let values = column
.stream(Kind::Data)
Expand Down
5 changes: 4 additions & 1 deletion src/arrow_reader/column/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use crate::proto::stream::Kind;
use crate::reader::decode::boolean_rle::BooleanIter;

pub fn new_boolean_iter(column: &Column) -> Result<NullableIterator<bool>> {
let present = new_present_iter(column)?.try_collect::<Vec<_>>()?;
let present = new_present_iter(column)?
.collect::<Vec<_>>()
.into_iter()
.collect::<Result<Vec<_>>>()?;
let rows: usize = present.iter().filter(|&p| *p).count();

let iter = column
Expand Down
5 changes: 4 additions & 1 deletion src/arrow_reader/column/date.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ impl Iterator for DateIterator {
}

pub fn new_date_iter(column: &Column) -> Result<NullableIterator<NaiveDate>> {
let present = new_present_iter(column)?.try_collect::<Vec<_>>()?;
let present = new_present_iter(column)?
.collect::<Vec<_>>()
.into_iter()
.collect::<Result<Vec<_>>>()?;

let data = column
.stream(Kind::Data)
Expand Down
4 changes: 3 additions & 1 deletion src/arrow_reader/column/float.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ macro_rules! impl_float_iter {
($tp:ident) => {
paste::item! {
pub fn [<new_ $tp _iter>] (column: &Column) -> Result<NullableIterator<$tp>> {
let present = new_present_iter(column)?.try_collect::<Vec<_>>()?;
let present = new_present_iter(column)?.collect::<Vec<_>>()
.into_iter()
.collect::<Result<Vec<_>>>()?;
let rows: usize = present.iter().filter(|&p| *p).count();
let iter = column
.stream(Kind::Data)
Expand Down
5 changes: 4 additions & 1 deletion src/arrow_reader/column/int.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use crate::proto::stream::Kind;
use crate::reader::decode::rle_v2::RleReaderV2;

pub fn new_i64_iter(column: &Column) -> Result<NullableIterator<i64>> {
let present = new_present_iter(column)?.try_collect::<Vec<_>>()?;
let present = new_present_iter(column)?
.collect::<Vec<_>>()
.into_iter()
.collect::<Result<Vec<_>>>()?;

let iter = column
.stream(Kind::Data)
Expand Down
17 changes: 13 additions & 4 deletions src/arrow_reader/column/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ impl Iterator for DirectStringIterator {
}

pub fn new_direct_string_iter(column: &Column) -> Result<NullableIterator<String>> {
let present = new_present_iter(column)?.try_collect::<Vec<_>>()?;
let present = new_present_iter(column)?
.collect::<Vec<_>>()
.into_iter()
.collect::<Result<Vec<_>>>()?;

let values = column
.stream(Kind::Data)
Expand All @@ -57,7 +60,10 @@ pub fn new_direct_string_iter(column: &Column) -> Result<NullableIterator<String
}

pub fn new_arrow_dict_string_decoder(column: &Column) -> Result<(NullableIterator<i64>, ArrayRef)> {
let present = new_present_iter(column)?.try_collect::<Vec<_>>()?;
let present = new_present_iter(column)?
.collect::<Vec<_>>()
.into_iter()
.collect::<Result<Vec<_>>>()?;

// DictionaryData
let values = column
Expand All @@ -71,9 +77,12 @@ pub fn new_arrow_dict_string_decoder(column: &Column) -> Result<(NullableIterato
.transpose()?
.map(|reader| Box::new(RleReaderV2::try_new(reader, false, true)))
.context(error::InvalidColumnSnafu { name: &column.name })?;
let mut iter = DirectStringIterator { values, lengths };
let iter = DirectStringIterator { values, lengths };

let values = iter.try_collect::<Vec<_>>()?;
let values = iter
.collect::<Vec<_>>()
.into_iter()
.collect::<Result<Vec<_>>>()?;

let indexes = column
.stream(Kind::Data)
Expand Down
5 changes: 4 additions & 1 deletion src/arrow_reader/column/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ impl Iterator for TimestampIterator {
}

pub fn new_timestamp_iter(column: &Column) -> Result<NullableIterator<NaiveDateTime>> {
let present = new_present_iter(column)?.try_collect::<Vec<_>>()?;
let present = new_present_iter(column)?
.collect::<Vec<_>>()
.into_iter()
.collect::<Result<Vec<_>>>()?;

let data = column
.stream(Kind::Data)
Expand Down
5 changes: 0 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
#![feature(trait_upcasting)]
#![allow(incomplete_features)]
#![feature(iterator_try_collect)]
#![feature(iter_next_chunk)]

pub mod arrow_reader;
pub mod async_arrow_reader;
pub mod error;
Expand Down
94 changes: 70 additions & 24 deletions src/reader/decode/rle_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,17 +143,25 @@ mod test {
let expected = [1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 0, 1, 1, 1, 1];

let cursor = Cursor::new(data);
let mut reader = UnsignedRleReaderV2::try_new(cursor, false);
let a = reader.try_collect::<Vec<_>>().unwrap();
let reader = UnsignedRleReaderV2::try_new(cursor, false);
let a = reader
.collect::<Vec<_>>()
.into_iter()
.collect::<Result<Vec<_>>>()
.unwrap();
assert_eq!(a, expected);

// direct
let data = [0x5eu8, 0x03, 0x5c, 0xa1, 0xab, 0x1e, 0xde, 0xad, 0xbe, 0xef];
let expected = [23713, 43806, 57005, 48879];

let cursor = Cursor::new(data);
let mut reader = UnsignedRleReaderV2::try_new(cursor, false);
let a = reader.try_collect::<Vec<_>>().unwrap();
let reader = UnsignedRleReaderV2::try_new(cursor, false);
let a = reader
.collect::<Vec<_>>()
.into_iter()
.collect::<Result<Vec<_>>>()
.unwrap();
assert_eq!(a, expected);

// patched base
Expand All @@ -167,32 +175,48 @@ mod test {
];

let cursor = Cursor::new(data);
let mut reader = UnsignedRleReaderV2::try_new(cursor, false);
let a = reader.try_collect::<Vec<_>>().unwrap();
let reader = UnsignedRleReaderV2::try_new(cursor, false);
let a = reader
.collect::<Vec<_>>()
.into_iter()
.collect::<Result<Vec<_>>>()
.unwrap();
assert_eq!(a, expected);

let data = [196u8, 9, 2, 2, 74, 40, 166];
let expected = [2u64, 3, 5, 7, 11, 13, 17, 19, 23, 29];

let cursor = Cursor::new(data);
let mut reader = UnsignedRleReaderV2::try_new(cursor, false);
let a = reader.try_collect::<Vec<_>>().unwrap();
let reader = UnsignedRleReaderV2::try_new(cursor, false);
let a = reader
.collect::<Vec<_>>()
.into_iter()
.collect::<Result<Vec<_>>>()
.unwrap();
assert_eq!(a, expected);

let data = [0xc6u8, 0x09, 0x02, 0x02, 0x22, 0x42, 0x42, 0x46];
let expected = [2u64, 3, 5, 7, 11, 13, 17, 19, 23, 29];

let cursor = Cursor::new(data);
let mut reader = UnsignedRleReaderV2::try_new(cursor, false);
let a = reader.try_collect::<Vec<_>>().unwrap();
let reader = UnsignedRleReaderV2::try_new(cursor, false);
let a = reader
.collect::<Vec<_>>()
.into_iter()
.collect::<Result<Vec<_>>>()
.unwrap();
assert_eq!(a, expected);

let data = [7u8, 1];
let expected = [1u64, 1, 1, 1, 1, 1, 1, 1, 1, 1];

let cursor = Cursor::new(data);
let mut reader = UnsignedRleReaderV2::try_new(cursor, false);
let a = reader.try_collect::<Vec<_>>().unwrap();
let reader = UnsignedRleReaderV2::try_new(cursor, false);
let a = reader
.collect::<Vec<_>>()
.into_iter()
.collect::<Result<Vec<_>>>()
.unwrap();
assert_eq!(a, expected);
}

Expand All @@ -202,8 +226,12 @@ mod test {
let data: [u8; 3] = [0x0a, 0x27, 0x10];

let cursor = Cursor::new(data);
let mut reader = UnsignedRleReaderV2::try_new(cursor, false);
let a = reader.try_collect::<Vec<_>>().unwrap();
let reader = UnsignedRleReaderV2::try_new(cursor, false);
let a = reader
.collect::<Vec<_>>()
.into_iter()
.collect::<Result<Vec<_>>>()
.unwrap();

assert_eq!(a, vec![10000, 10000, 10000, 10000, 10000]);
}
Expand All @@ -214,8 +242,12 @@ mod test {
let data: [u8; 10] = [0x5e, 0x03, 0x5c, 0xa1, 0xab, 0x1e, 0xde, 0xad, 0xbe, 0xef];

let cursor = Cursor::new(data);
let mut reader = UnsignedRleReaderV2::try_new(cursor, false);
let a = reader.try_collect::<Vec<_>>().unwrap();
let reader = UnsignedRleReaderV2::try_new(cursor, false);
let a = reader
.collect::<Vec<_>>()
.into_iter()
.collect::<Result<Vec<_>>>()
.unwrap();

assert_eq!(a, vec![23713, 43806, 57005, 48879]);
}
Expand All @@ -226,8 +258,12 @@ mod test {
let data = [110u8, 3, 0, 185, 66, 1, 86, 60, 1, 189, 90, 1, 125, 222];

let cursor = Cursor::new(data);
let mut reader = RleReaderV2::try_new(cursor, true, false);
let a = reader.try_collect::<Vec<_>>().unwrap();
let reader = RleReaderV2::try_new(cursor, true, false);
let a = reader
.collect::<Vec<_>>()
.into_iter()
.collect::<Result<Vec<_>>>()
.unwrap();

assert_eq!(a, vec![23713, 43806, 57005, 48879]);
}
Expand All @@ -241,8 +277,12 @@ mod test {
let data: [u8; 8] = [0xc6, 0x09, 0x02, 0x02, 0x22, 0x42, 0x42, 0x46];

let cursor = Cursor::new(data);
let mut reader = UnsignedRleReaderV2::try_new(cursor, false);
let a = reader.try_collect::<Vec<_>>().unwrap();
let reader = UnsignedRleReaderV2::try_new(cursor, false);
let a = reader
.collect::<Vec<_>>()
.into_iter()
.collect::<Result<Vec<_>>>()
.unwrap();

assert_eq!(a, vec![2, 3, 5, 7, 11, 13, 17, 19, 23, 29]);
}
Expand All @@ -259,9 +299,11 @@ mod test {
];

let cursor = Cursor::new(data);
let mut reader = RleReaderV2::try_new(cursor, false, false);
let reader = RleReaderV2::try_new(cursor, false, false);
let a = reader
.try_collect::<Vec<_>>()
.collect::<Vec<_>>()
.into_iter()
.collect::<Result<Vec<_>>>()
.unwrap()
.into_iter()
.map(|v| v as u64)
Expand Down Expand Up @@ -301,8 +343,12 @@ mod test {
];

let cursor = Cursor::new(data);
let mut reader = RleReaderV2::try_new(cursor, false, false);
let a = reader.try_collect::<Vec<_>>().unwrap();
let reader = RleReaderV2::try_new(cursor, false, false);
let a = reader
.collect::<Vec<_>>()
.into_iter()
.collect::<Result<Vec<_>>>()
.unwrap();

assert_eq!(a.len(), expected.len());

Expand Down

0 comments on commit 8f76cbc

Please sign in to comment.