Skip to content

Commit

Permalink
Add RecordReader trait and proc macro to implement it for a struct (a…
Browse files Browse the repository at this point in the history
…pache#4773)

* add and implement RecordReader trait for rust structs

* Fix typo in comment

* run cargo fmt

* partially solve issues raised in review

* remove references

* change interface to use vectors

* change interface to use vectors in  as well

* update comments

* remove initialisation requirement

* prevent conflicts with existing default implementation

* update documentation

* run cargo fmt

* change writer back to slice

* change 'Handle' back to 'Derive' for RecordWriter macro in readme

---------

Co-authored-by: joseph rance <[email protected]>
  • Loading branch information
Joseph-Rance and joseph rance authored Oct 30, 2023
1 parent 890823b commit d9aaa43
Show file tree
Hide file tree
Showing 7 changed files with 553 additions and 30 deletions.
2 changes: 2 additions & 0 deletions parquet/src/record/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
mod api;
pub mod reader;
mod record_reader;
mod record_writer;
mod triplet;

pub use self::{
api::{
Field, List, ListAccessor, Map, MapAccessor, Row, RowAccessor, RowColumnIter, RowFormatter,
},
record_reader::RecordReader,
record_writer::RecordWriter,
};
30 changes: 30 additions & 0 deletions parquet/src/record/record_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use super::super::errors::ParquetError;
use super::super::file::reader::RowGroupReader;

/// A type that can be filled with records read from a parquet row group.
///
/// The type parameter `T` is used to work around the rust orphan rule
/// when implementing on types such as `Vec<T>`.
pub trait RecordReader<T> {
/// Reads up to `num_records` records from `row_group_reader` into `self`.
///
/// Returns a `ParquetError` if the records could not be read.
fn read_from_row_group(
&mut self,
row_group_reader: &mut dyn RowGroupReader,
num_records: usize,
) -> Result<(), ParquetError>;
}
4 changes: 4 additions & 0 deletions parquet/src/record/record_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ use crate::schema::types::TypePtr;
use super::super::errors::ParquetError;
use super::super::file::writer::SerializedRowGroupWriter;

/// `write_to_row_group` writes from `self` into `row_group_writer`
/// `schema` builds the schema used by `row_group_writer`
/// The type parameter `T` is used to work around the rust orphan rule
/// when implementing on types such as `&[T]`.
pub trait RecordWriter<T> {
fn write_to_row_group<W: std::io::Write + Send>(
&self,
Expand Down
51 changes: 45 additions & 6 deletions parquet_derive/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

# Parquet Derive

A crate for deriving `RecordWriter` for arbitrary, _simple_ structs. This does not generate writers for arbitrarily nested
structures. It only works for primitives and a few generic structures and
various levels of reference. Please see features checklist for what is currently
A crate for deriving `RecordWriter` and `RecordReader` for arbitrary, _simple_ structs. This does not
generate readers or writers for arbitrarily nested structures. It only works for primitives and a few
generic structures and various levels of reference. Please see features checklist for what is currently
supported.

Derive also has some support for the chrono time library. You must enable the `chrono` feature to get this support.
Expand Down Expand Up @@ -77,16 +77,55 @@ writer.close_row_group(row_group).unwrap();
writer.close().unwrap();
```

Example usage of deriving a `RecordReader` for your struct:

```rust
use parquet::file::{serialized_reader::SerializedFileReader, reader::FileReader};
use parquet_derive::ParquetRecordReader;

#[derive(ParquetRecordReader)]
struct ACompleteRecord {
pub a_bool: bool,
pub a_string: String,
pub i16: i16,
pub i32: i32,
pub u64: u64,
pub isize: isize,
pub float: f32,
pub double: f64,
pub now: chrono::NaiveDateTime,
pub byte_vec: Vec<u8>,
}

// Initialize your parquet file
let reader = SerializedFileReader::new(file).unwrap();
let mut row_group = reader.get_row_group(0).unwrap();

// create your records vector to read into
let mut chunks: Vec<ACompleteRecord> = Vec::new();

// The derived `RecordReader` takes over here
chunks.read_from_row_group(&mut *row_group, 1).unwrap();
```

## Features

- [x] Support writing `String`, `&str`, `bool`, `i32`, `f32`, `f64`, `Vec<u8>`
- [ ] Support writing dictionaries
- [x] Support writing logical types like timestamp
- [x] Derive definition_levels for `Option`
- [ ] Derive definition levels for nested structures
- [x] Derive definition_levels for `Option` for writing
- [ ] Derive definition levels for nested structures for writing
- [ ] Derive writing tuple struct
- [ ] Derive writing `tuple` container types

- [x] Support reading `String`, `&str`, `bool`, `i32`, `f32`, `f64`, `Vec<u8>`
- [ ] Support reading/writing dictionaries
- [x] Support reading/writing logical types like timestamp
- [ ] Handle definition_levels for `Option` for reading
- [ ] Handle definition levels for nested structures for reading
- [ ] Derive reading/writing tuple struct
- [ ] Derive reading/writing `tuple` container types

## Requirements

- Same as `parquet-rs`
Expand All @@ -103,4 +142,4 @@ To compile and view in the browser, run `cargo doc --no-deps --open`.

## License

Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0.
Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0.
88 changes: 87 additions & 1 deletion parquet_derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ mod parquet_field;
/// use parquet::file::writer::SerializedFileWriter;
///
/// use std::sync::Arc;
//
///
/// #[derive(ParquetRecordWriter)]
/// struct ACompleteRecord<'a> {
/// pub a_bool: bool,
Expand Down Expand Up @@ -137,3 +137,89 @@ pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::Toke
}
}).into()
}

/// Derive flat, simple RecordReader implementations. Works by parsing
/// a struct tagged with `#[derive(ParquetRecordReader)]` and emitting
/// the correct reading code for each field of the struct. Column readers
/// are generated in the order they are defined.
///
/// It is up to the programmer to keep the order of the struct
/// fields lined up with the schema.
///
/// The implementation is generated for `Vec<YourStruct>`. Calling
/// `read_from_row_group` first appends `num_records` records whose fields
/// are all `Default::default()` (so every field type must implement
/// `Default`), and then runs one generated reader snippet per field against
/// the column reader at the same index as the field.
///
/// # Panics
///
/// Macro expansion panics when the derive is applied to an enum or a union.
///
/// Example:
///
/// ```ignore
/// use parquet::file::{serialized_reader::SerializedFileReader, reader::FileReader};
/// use parquet_derive::{ParquetRecordReader};
///
/// #[derive(ParquetRecordReader)]
/// struct ACompleteRecord {
///     pub a_bool: bool,
///     pub a_string: String,
/// }
///
/// pub fn read_some_records() -> Vec<ACompleteRecord> {
///     let mut samples: Vec<ACompleteRecord> = Vec::new();
///
///     let reader = SerializedFileReader::new(file).unwrap();
///     let mut row_group = reader.get_row_group(0).unwrap();
///     samples.read_from_row_group(&mut *row_group, 1).unwrap();
///     samples
/// }
/// ```
///
#[proc_macro_derive(ParquetRecordReader)]
pub fn parquet_record_reader(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
    let input: DeriveInput = parse_macro_input!(input as DeriveInput);
    let fields = match input.data {
        Data::Struct(DataStruct { fields, .. }) => fields,
        Data::Enum(_) => unimplemented!("Enum currently is not supported"),
        Data::Union(_) => unimplemented!("Union currently is not supported"),
    };

    let field_infos: Vec<_> = fields.iter().map(parquet_field::Field::from).collect();
    let field_names: Vec<_> = fields.iter().map(|f| f.ident.clone()).collect();
    let reader_snippets: Vec<proc_macro2::TokenStream> =
        field_infos.iter().map(|x| x.reader_snippet()).collect();
    // Column `n` of the row group is read into the `n`-th field of the struct.
    let column_indices: Vec<_> = (0..reader_snippets.len()).collect();

    let derived_for = input.ident;
    let generics = input.generics;
    // Split the generics so that bounds (`<T: Bound>`) only appear on the
    // `impl` header, the bare parameters (`<T>`) are used in type position,
    // and any `where` clause is carried over. Interpolating the raw generics
    // everywhere would emit invalid types such as `Foo<T: Bound>`.
    let (impl_generics, ty_generics, where_clause) = generics.split_for_impl();

    (quote! {

        impl #impl_generics ::parquet::record::RecordReader<#derived_for #ty_generics>
            for Vec<#derived_for #ty_generics> #where_clause
        {
            fn read_from_row_group(
                &mut self,
                row_group_reader: &mut dyn ::parquet::file::reader::RowGroupReader,
                num_records: usize,
            ) -> Result<(), ::parquet::errors::ParquetError> {
                use ::parquet::column::reader::ColumnReader;

                let mut row_group_reader = row_group_reader;

                // Append placeholder records; the reader snippets below
                // overwrite their fields with the decoded column values.
                for _ in 0..num_records {
                    self.push(#derived_for {
                        #(
                            #field_names: Default::default()
                        ),*
                    })
                }

                // NOTE(review): the snippets come from `reader_snippet()`, which
                // is defined elsewhere. If they index `records` from the front,
                // calling this on a non-empty vector would fill the wrong
                // elements -- confirm against `parquet_field`.
                let records = self; // Used by all the reader snippets to be more clear

                #(
                    {
                        if let Ok(mut column_reader) = row_group_reader.get_column_reader(#column_indices) {
                            #reader_snippets
                        } else {
                            return Err(::parquet::errors::ParquetError::General("Failed to get next column".into()))
                        }
                    }
                );*

                Ok(())
            }
        }
    }).into()
}
Loading

0 comments on commit d9aaa43

Please sign in to comment.