
Commit

Add example
tustvold committed Sep 27, 2023
1 parent 9ce9323 commit 9c07441
Showing 1 changed file with 92 additions and 7 deletions.
parquet/src/arrow/arrow_writer/mod.rs
@@ -248,7 +248,7 @@ impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
     }
 }
 
-/// A list of [`Bytes`] comprising a single column chunk
+/// A single column chunk produced by [`ArrowColumnWriter`]
 #[derive(Default)]
 pub struct ArrowColumnChunk {
     length: usize,
@@ -352,13 +352,88 @@ impl PageWriter for ArrowPageWriter {
 /// A leaf column that can be encoded by [`ArrowColumnWriter`]
 pub struct ArrowLeafColumn(ArrayLevels);
 
-/// Computes the [`ArrowLeafColumn`] for a given potentially nested [`ArrayRef`]
+/// Computes the [`ArrowLeafColumn`] for a potentially nested [`ArrayRef`]
 pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
     let levels = calculate_array_levels(array, field)?;
     Ok(levels.into_iter().map(ArrowLeafColumn).collect())
 }
 
 /// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
+///
+/// Note: This is a low-level interface for applications that require fine-grained control
+/// of encoding; see [`ArrowWriter`] for a higher-level interface
+///
+/// ```
+/// // The arrow schema
+/// # use std::sync::Arc;
+/// # use arrow_array::*;
+/// # use arrow_schema::*;
+/// # use parquet::arrow::arrow_to_parquet_schema;
+/// # use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, get_column_writers};
+/// # use parquet::file::properties::WriterProperties;
+/// # use parquet::file::writer::SerializedFileWriter;
+/// #
+/// let schema = Arc::new(Schema::new(vec![
+///     Field::new("i32", DataType::Int32, false),
+///     Field::new("f32", DataType::Float32, false),
+/// ]));
+///
+/// // Compute the parquet schema
+/// let parquet_schema = arrow_to_parquet_schema(schema.as_ref()).unwrap();
+/// let props = Arc::new(WriterProperties::default());
+///
+/// // Create writers for each of the leaf columns
+/// let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
+///
+/// // Spawn a worker thread for each column
+/// // This is for demonstration purposes; a thread pool, e.g. rayon or tokio, would be better
+/// let mut workers: Vec<_> = col_writers
+///     .into_iter()
+///     .map(|mut col_writer| {
+///         let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
+///         let handle = std::thread::spawn(move || {
+///             for col in recv {
+///                 col_writer.write(&col)?;
+///             }
+///             col_writer.close()
+///         });
+///         (handle, send)
+///     })
+///     .collect();
+///
+/// // Create parquet writer
+/// let root_schema = parquet_schema.root_schema_ptr();
+/// let mut out = Vec::with_capacity(1024); // This could be a File
+/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone()).unwrap();
+///
+/// // Start row group
+/// let mut row_group = writer.next_row_group().unwrap();
+///
+/// // Columns to encode
+/// let to_write = vec![
+///     Arc::new(Int32Array::from_iter_values([1, 2, 3])) as _,
+///     Arc::new(Float32Array::from_iter_values([1., 45., -1.])) as _,
+/// ];
+///
+/// // Spawn work to encode columns
+/// let mut worker_iter = workers.iter_mut();
+/// for (a, f) in to_write.iter().zip(&schema.fields) {
+///     for c in compute_leaves(f, a).unwrap() {
+///         worker_iter.next().unwrap().1.send(c).unwrap();
+///     }
+/// }
+///
+/// // Finish up parallel column encoding
+/// for (handle, send) in workers {
+///     drop(send); // Drop send side to signal termination
+///     let (chunk, result) = handle.join().unwrap().unwrap();
+///     row_group.append_column(&chunk, result).unwrap();
+/// }
+/// row_group.close().unwrap();
+///
+/// let metadata = writer.close().unwrap();
+/// assert_eq!(metadata.num_rows, 3);
+/// ```
 pub struct ArrowColumnWriter {
     writer: ArrowColumnWriterImpl,
     chunk: SharedColumnChunk,
@@ -415,11 +490,7 @@ impl ArrowRowGroupWriter {
         props: &WriterPropertiesPtr,
         arrow: &SchemaRef,
     ) -> Result<Self> {
-        let mut writers = Vec::with_capacity(arrow.fields.len());
-        let mut leaves = parquet.columns().iter();
-        for field in &arrow.fields {
-            get_arrow_column_writer(field.data_type(), props, &mut leaves, &mut writers)?;
-        }
+        let writers = get_column_writers(parquet, props, arrow)?;
         Ok(Self {
             writers,
             schema: arrow.clone(),
@@ -446,6 +517,20 @@
     }
 }
 
+/// Returns the [`ArrowColumnWriter`]s for a given schema
+pub fn get_column_writers(
+    parquet: &SchemaDescriptor,
+    props: &WriterPropertiesPtr,
+    arrow: &SchemaRef,
+) -> Result<Vec<ArrowColumnWriter>> {
+    let mut writers = Vec::with_capacity(arrow.fields.len());
+    let mut leaves = parquet.columns().iter();
+    for field in &arrow.fields {
+        get_arrow_column_writer(field.data_type(), props, &mut leaves, &mut writers)?;
+    }
+    Ok(writers)
+}
+
 /// Gets the [`ArrowColumnWriter`] for the given `data_type`
 fn get_arrow_column_writer(
     data_type: &ArrowDataType,
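For context, below is a minimal single-threaded sketch of how the `get_column_writers` API introduced by this commit fits together. It is not part of the commit; it assumes the signatures shown in the diff above (`compute_leaves`, `ArrowColumnWriter::write`/`close` returning a chunk plus its close result, and `SerializedRowGroupWriter::append_column`), and it encodes the columns inline rather than on worker threads:

```rust
// A sketch only, under the assumptions stated above; it mirrors the doc
// example from this commit, minus the worker threads.
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::arrow_to_parquet_schema;
use parquet::arrow::arrow_writer::{compute_leaves, get_column_writers};
use parquet::errors::Result;
use parquet::file::properties::WriterProperties;
use parquet::file::writer::SerializedFileWriter;

fn main() -> Result<()> {
    // A flat schema, so each field maps to exactly one leaf column
    let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
    let parquet_schema = arrow_to_parquet_schema(schema.as_ref())?;
    let props = Arc::new(WriterProperties::default());

    // One ArrowColumnWriter per leaf column
    let mut col_writers = get_column_writers(&parquet_schema, &props, &schema)?;

    // Encode the data; for a nested schema, compute_leaves may yield
    // several leaves per field, each destined for its own writer
    let array = Arc::new(Int32Array::from_iter_values([1, 2, 3])) as ArrayRef;
    for (col_writer, field) in col_writers.iter_mut().zip(&schema.fields) {
        for leaf in compute_leaves(field, &array)? {
            col_writer.write(&leaf)?;
        }
    }

    // Assemble the file from the encoded column chunks
    let mut out = Vec::new(); // could be a File
    let root_schema = parquet_schema.root_schema_ptr();
    let mut writer = SerializedFileWriter::new(&mut out, root_schema, props)?;
    let mut row_group = writer.next_row_group()?;
    for col_writer in col_writers {
        let (chunk, result) = col_writer.close()?;
        row_group.append_column(&chunk, result)?;
    }
    row_group.close()?;
    writer.close()?;
    Ok(())
}
```

Because the encoding happens entirely in `ArrowColumnWriter` before the file writer is involved, the same flow parallelizes naturally, as the doc example's one-thread-per-column version shows.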
