feat: Support schema arg in read/scan_parquet() #19013

Merged 1 commit on Sep 30, 2024
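For context, a minimal sketch of what the new argument looks like from the Rust API side. This is not part of the diff: it assumes the `polars` crate with the `lazy` and `parquet` features, and the glob path and column names are hypothetical.

```rust
use std::sync::Arc;
use polars::prelude::*;

fn main() -> PolarsResult<()> {
    // Hypothetical schema for the scan; the column names are illustrative only.
    let schema = Arc::new(Schema::from_iter([
        ("a".into(), DataType::Int64),
        ("b".into(), DataType::String),
    ]));

    // `ScanArgsParquet` gains an optional `schema` field in this PR.
    let args = ScanArgsParquet {
        schema: Some(schema),
        ..Default::default()
    };

    let df = LazyFrame::scan_parquet("data/*.parquet", args)?.collect()?;
    println!("{df}");
    Ok(())
}
```

As the changes below show, a provided schema takes precedence over the schema inferred from the first file, and per-file dtype mismatches are reported as `SchemaMismatch` errors.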
1 change: 1 addition & 0 deletions crates/polars-io/src/parquet/read/mod.rs
@@ -42,4 +42,5 @@ pub mod _internal {
pub use super::mmap::to_deserializer;
pub use super::predicates::read_this_row_group;
pub use super::read_impl::{calc_prefilter_cost, PrefilterMaskSetting};
pub use super::utils::ensure_matching_dtypes_if_found;
}
4 changes: 3 additions & 1 deletion crates/polars-io/src/parquet/read/options.rs
@@ -1,9 +1,11 @@
use arrow::datatypes::ArrowSchemaRef;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, PartialEq, Eq, Copy, Hash)]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct ParquetOptions {
pub schema: Option<ArrowSchemaRef>,
pub parallel: ParallelStrategy,
pub low_memory: bool,
pub use_statistics: bool,
6 changes: 5 additions & 1 deletion crates/polars-io/src/parquet/read/reader.rs
@@ -15,7 +15,7 @@ pub use super::read_impl::BatchedParquetReader;
use super::read_impl::{compute_row_group_range, read_parquet, FetchRowGroupsFromMmapReader};
#[cfg(feature = "cloud")]
use super::utils::materialize_empty_df;
use super::utils::projected_arrow_schema_to_projection_indices;
use super::utils::{ensure_matching_dtypes_if_found, projected_arrow_schema_to_projection_indices};
#[cfg(feature = "cloud")]
use crate::cloud::CloudOptions;
use crate::mmap::MmapBytesReader;
@@ -90,6 +90,8 @@ impl<R: MmapBytesReader> ParquetReader<R> {
allow_missing_columns: bool,
) -> PolarsResult<Self> {
if allow_missing_columns {
// Must check the dtypes
ensure_matching_dtypes_if_found(first_schema, self.schema()?.as_ref())?;
self.schema.replace(first_schema.clone());
}

@@ -327,6 +329,8 @@ impl ParquetAsyncReader {
allow_missing_columns: bool,
) -> PolarsResult<Self> {
if allow_missing_columns {
// Must check the dtypes
ensure_matching_dtypes_if_found(first_schema, self.schema().await?.as_ref())?;
self.schema.replace(first_schema.clone());
}

34 changes: 32 additions & 2 deletions crates/polars-io/src/parquet/read/utils.rs
@@ -1,6 +1,7 @@
use std::borrow::Cow;

use polars_core::prelude::{ArrowSchema, DataFrame, DataType, Series, IDX_DTYPE};
use polars_core::schema::SchemaNamesAndDtypes;
use polars_error::{polars_bail, PolarsResult};

use crate::hive::materialize_hive_partitions;
@@ -51,11 +52,40 @@ pub(super) fn projected_arrow_schema_to_projection_indices(
let expected_dtype = DataType::from_arrow(&field.dtype, true);

if dtype.clone() != expected_dtype {
polars_bail!(SchemaMismatch: "data type mismatch for column {}: found: {}, expected: {}",
&field.name, dtype, expected_dtype
polars_bail!(SchemaMismatch: "data type mismatch for column {}: expected: {}, found: {}",
&field.name, expected_dtype, dtype
)
}
}

Ok((!is_full_ordered_projection).then_some(projection_indices))
}

/// Utility to ensure the dtype of the column in `current_schema` matches the dtype in `schema` if
/// that column exists in `schema`.
pub fn ensure_matching_dtypes_if_found(
schema: &ArrowSchema,
current_schema: &ArrowSchema,
) -> PolarsResult<()> {
current_schema
.iter_names_and_dtypes()
.try_for_each(|(name, dtype)| {
if let Some(field) = schema.get(name) {
if dtype != &field.dtype {
// Check again with timezone normalization
// TODO: Add an ArrowDtype eq wrapper?
let lhs = DataType::from_arrow(dtype, true);
let rhs = DataType::from_arrow(&field.dtype, true);

if lhs != rhs {
polars_bail!(
SchemaMismatch:
"dtypes differ for column {}: {:?} != {:?}"
, name, dtype, &field.dtype
);
}
}
}
Ok(())
})
}
3 changes: 3 additions & 0 deletions crates/polars-lazy/src/scan/parquet.rs
@@ -15,6 +15,7 @@ pub struct ScanArgsParquet {
pub cloud_options: Option<CloudOptions>,
pub hive_options: HiveOptions,
pub use_statistics: bool,
pub schema: Option<SchemaRef>,
pub low_memory: bool,
pub rechunk: bool,
pub cache: bool,
@@ -33,6 +34,7 @@ impl Default for ScanArgsParquet {
cloud_options: None,
hive_options: Default::default(),
use_statistics: true,
schema: None,
rechunk: false,
low_memory: false,
cache: true,
@@ -73,6 +75,7 @@ impl LazyFileListReader for LazyParquetReader {
self.args.low_memory,
self.args.cloud_options,
self.args.use_statistics,
self.args.schema.as_deref(),
self.args.hive_options,
self.args.glob,
self.args.include_file_paths,
12 changes: 10 additions & 2 deletions crates/polars-mem-engine/src/executors/scan/parquet.rs
@@ -62,7 +62,11 @@ impl ParquetExec {
// Modified if we have a negative slice
let mut first_source = 0;

let first_schema = self.file_info.reader_schema.clone().unwrap().unwrap_left();
let first_schema = self
.options
.schema
.clone()
.unwrap_or_else(|| self.file_info.reader_schema.clone().unwrap().unwrap_left());

let projected_arrow_schema = {
if let Some(with_columns) = self.file_options.with_columns.as_deref() {
@@ -258,7 +262,11 @@ impl ParquetExec {
eprintln!("POLARS PREFETCH_SIZE: {}", batch_size)
}

let first_schema = self.file_info.reader_schema.clone().unwrap().unwrap_left();
let first_schema = self
.options
.schema
.clone()
.unwrap_or_else(|| self.file_info.reader_schema.clone().unwrap().unwrap_left());

let projected_arrow_schema = {
if let Some(with_columns) = self.file_options.with_columns.as_deref() {
7 changes: 5 additions & 2 deletions crates/polars-pipe/src/executors/sources/parquet.rs
@@ -81,7 +81,7 @@ impl ParquetSource {
.as_paths()
.ok_or_else(|| polars_err!(nyi = "Streaming scanning of in-memory buffers"))?;
let path = &paths[index];
let options = self.options;
let options = self.options.clone();
let file_options = self.file_options.clone();

let hive_partitions = self
@@ -261,7 +261,10 @@ impl ParquetSource {
}
let run_async = paths.first().map(is_cloud_url).unwrap_or(false) || config::force_async();

let first_schema = file_info.reader_schema.clone().unwrap().unwrap_left();
let first_schema = options
.schema
.clone()
.unwrap_or_else(|| file_info.reader_schema.clone().unwrap().unwrap_left());

let projected_arrow_schema = {
if let Some(with_columns) = file_options.with_columns.as_deref() {
2 changes: 2 additions & 0 deletions crates/polars-plan/src/plans/builder_dsl.rs
@@ -85,6 +85,7 @@ impl DslBuilder {
low_memory: bool,
cloud_options: Option<CloudOptions>,
use_statistics: bool,
schema: Option<&Schema>,
hive_options: HiveOptions,
glob: bool,
include_file_paths: Option<PlSmallStr>,
@@ -108,6 +109,7 @@
file_options: options,
scan_type: FileScan::Parquet {
options: ParquetOptions {
schema: schema.map(|x| Arc::new(x.to_arrow(CompatLevel::newest()))),
parallel,
low_memory,
use_statistics,
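The builder change above is where the user-supplied Polars `Schema` is lowered to the Arrow schema stored on `ParquetOptions` and consulted by the executors. A small illustrative sketch of that lowering, not part of the diff; it assumes `Schema`, `ArrowSchema`, and `CompatLevel` are reachable through the polars-core prelude, as they are elsewhere in this changeset.

```rust
use std::sync::Arc;

use polars_core::prelude::*;

// Mirrors the builder change above: an optional user-facing `Schema` is converted
// once, up front, into the Arc'd Arrow schema that `ParquetOptions.schema` carries.
fn lower_schema(schema: Option<&Schema>) -> Option<Arc<ArrowSchema>> {
    schema.map(|s| Arc::new(s.to_arrow(CompatLevel::newest())))
}

fn main() {
    let schema = Schema::from_iter([("a".into(), DataType::Int64)]);
    let arrow = lower_schema(Some(&schema)).expect("schema was provided");
    assert_eq!(arrow.len(), 1);
}
```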
4 changes: 3 additions & 1 deletion crates/polars-python/src/lazyframe/general.rs
@@ -240,7 +240,7 @@ impl PyLazyFrame {
#[cfg(feature = "parquet")]
#[staticmethod]
#[pyo3(signature = (source, sources, n_rows, cache, parallel, rechunk, row_index,
low_memory, cloud_options, use_statistics, hive_partitioning, hive_schema, try_parse_hive_dates, retries, glob, include_file_paths, allow_missing_columns)
low_memory, cloud_options, use_statistics, hive_partitioning, schema, hive_schema, try_parse_hive_dates, retries, glob, include_file_paths, allow_missing_columns)
)]
fn new_from_parquet(
source: Option<PyObject>,
@@ -254,6 +254,7 @@
cloud_options: Option<Vec<(String, String)>>,
use_statistics: bool,
hive_partitioning: Option<bool>,
schema: Option<Wrap<Schema>>,
hive_schema: Option<Wrap<Schema>>,
try_parse_hive_dates: bool,
retries: usize,
@@ -285,6 +286,7 @@
low_memory,
cloud_options: None,
use_statistics,
schema: schema.map(|x| Arc::new(x.0)),
hive_options,
glob,
include_file_paths: include_file_paths.map(|x| x.into()),
18 changes: 2 additions & 16 deletions crates/polars-stream/src/nodes/parquet_source/init.rs
@@ -85,14 +85,7 @@ impl ParquetSourceNode {
);
}

let reader_schema = self
.file_info
.reader_schema
.as_ref()
.unwrap()
.as_ref()
.unwrap_left()
.clone();
let reader_schema = self.schema.clone().unwrap();

let (normalized_slice_oneshot_rx, metadata_rx, metadata_task_handle) =
self.init_metadata_fetcher();
@@ -361,14 +354,7 @@ impl ParquetSourceNode {
}

pub(super) fn init_projected_arrow_schema(&mut self) {
let reader_schema = self
.file_info
.reader_schema
.as_ref()
.unwrap()
.as_ref()
.unwrap_left()
.clone();
let reader_schema = self.schema.clone().unwrap();

self.projected_arrow_schema = Some(
if let Some(columns) = self.file_options.with_columns.as_deref() {
17 changes: 7 additions & 10 deletions crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs
@@ -3,6 +3,7 @@ use std::sync::Arc;
use futures::StreamExt;
use polars_error::{polars_bail, PolarsResult};
use polars_io::prelude::FileMetadata;
use polars_io::prelude::_internal::ensure_matching_dtypes_if_found;
use polars_io::utils::byte_source::{DynByteSource, MemSliceByteSource};
use polars_io::utils::slice::SplitSlicePosition;
use polars_utils::mmap::MemSlice;
@@ -106,21 +107,15 @@ impl ParquetSourceNode {
};

let first_metadata = self.first_metadata.clone();
let reader_schema_len = self
.file_info
.reader_schema
.as_ref()
.unwrap()
.as_ref()
.unwrap_left()
.len();
let first_schema = self.schema.clone().unwrap();
let has_projection = self.file_options.with_columns.is_some();
let allow_missing_columns = self.file_options.allow_missing_columns;

let process_metadata_bytes = {
move |handle: task_handles_ext::AbortOnDropHandle<
PolarsResult<(usize, Arc<DynByteSource>, MemSlice)>,
>| {
let first_schema = first_schema.clone();
let projected_arrow_schema = projected_arrow_schema.clone();
let first_metadata = first_metadata.clone();
// Run on CPU runtime - metadata deserialization is expensive, especially
@@ -138,14 +133,16 @@

let schema = polars_parquet::arrow::read::infer_schema(&metadata)?;

if !has_projection && schema.len() > reader_schema_len {
if !has_projection && schema.len() > first_schema.len() {
polars_bail!(
SchemaMismatch:
"parquet file contained extra columns and no selection was given"
)
}

if !allow_missing_columns {
if allow_missing_columns {
ensure_matching_dtypes_if_found(&first_schema, &schema)?;
} else {
ensure_schema_has_projected_fields(
&schema,
projected_arrow_schema.as_ref(),
@@ -136,8 +136,8 @@ pub(super) fn ensure_schema_has_projected_fields(
};

if dtype != expected_dtype {
polars_bail!(SchemaMismatch: "data type mismatch for column {}: found: {}, expected: {}",
&field.name, dtype, expected_dtype
polars_bail!(SchemaMismatch: "data type mismatch for column {}: expected: {}, found: {}",
&field.name, expected_dtype, dtype
)
}
}
15 changes: 15 additions & 0 deletions crates/polars-stream/src/nodes/parquet_source/mod.rs
@@ -47,6 +47,7 @@ pub struct ParquetSourceNode {
config: Config,
verbose: bool,
physical_predicate: Option<Arc<dyn PhysicalIoExpr>>,
schema: Option<Arc<ArrowSchema>>,
projected_arrow_schema: Option<Arc<ArrowSchema>>,
byte_source_builder: DynByteSourceBuilder,
memory_prefetch_func: fn(&[u8]) -> (),
@@ -112,6 +113,7 @@ impl ParquetSourceNode {
},
verbose,
physical_predicate: None,
schema: None,
projected_arrow_schema: None,
byte_source_builder,
memory_prefetch_func,
@@ -154,6 +156,19 @@ impl ComputeNode for ParquetSourceNode {
eprintln!("[ParquetSource]: {:?}", &self.config);
}

self.schema = Some(
self.options
.schema
.take()
.unwrap_or_else(|| self.file_info.reader_schema.take().unwrap().unwrap_left()),
);

{
// Ensure these are not used anymore
self.options.schema.take();
self.file_info.reader_schema.take();
}

self.init_projected_arrow_schema();
self.physical_predicate = self.predicate.clone().map(phys_expr_to_io_expr);
