Skip to content

Commit

Permalink
refactor: Fix parquet file metadata is dropped after first DSL->IR co…
Browse files Browse the repository at this point in the history
…nversion (#18789)
  • Loading branch information
nameexhaustion authored Sep 17, 2024
1 parent 5d81d4a commit 25f84e4
Show file tree
Hide file tree
Showing 17 changed files with 269 additions and 251 deletions.
4 changes: 4 additions & 0 deletions crates/polars-io/src/parquet/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ impl<R: MmapBytesReader> ParquetReader<R> {
self
}

pub fn set_metadata(&mut self, metadata: FileMetadataRef) {
self.metadata = Some(metadata);
}

pub fn get_metadata(&mut self) -> PolarsResult<&FileMetadataRef> {
if self.metadata.is_none() {
self.metadata = Some(Arc::new(read::read_metadata(&mut self.reader)?));
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/scan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ impl LazyFileListReader for LazyCsvReader {
/// Get the final [LazyFrame].
fn finish(self) -> PolarsResult<LazyFrame> {
let mut lf: LazyFrame = DslBuilder::scan_csv(
self.sources.to_dsl(false),
self.sources,
self.read_options,
self.cache,
self.cloud_options,
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/scan/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl LazyFileListReader for LazyIpcReader {
let options = IpcScanOptions {};

let mut lf: LazyFrame = DslBuilder::scan_ipc(
self.sources.to_dsl(false),
self.sources,
options,
args.n_rows,
args.cache,
Expand Down
7 changes: 4 additions & 3 deletions crates/polars-lazy/src/scan/ndjson.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::num::NonZeroUsize;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, RwLock};
use std::sync::Arc;

use polars_core::prelude::*;
use polars_io::cloud::CloudOptions;
Expand Down Expand Up @@ -155,10 +155,11 @@ impl LazyFileListReader for LazyJsonLineReader {
};

Ok(LazyFrame::from(DslPlan::Scan {
sources: Arc::new(Mutex::new(self.sources.to_dsl(false))),
file_info: Arc::new(RwLock::new(None)),
sources: self.sources,
file_info: None,
file_options,
scan_type,
cached_ir: Default::default(),
}))
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/scan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl LazyFileListReader for LazyParquetReader {
let row_index = self.args.row_index;

let mut lf: LazyFrame = DslBuilder::scan_parquet(
self.sources.to_dsl(false),
self.sources,
self.args.n_rows,
self.args.cache,
self.args.parallel,
Expand Down
24 changes: 21 additions & 3 deletions crates/polars-mem-engine/src/executors/scan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,16 @@ impl ParquetExec {
.into_par_iter()
.map(|&i| {
let memslice = self.sources.at(i).to_memslice()?;
ParquetReader::new(std::io::Cursor::new(memslice)).num_rows()

let mut reader = ParquetReader::new(std::io::Cursor::new(memslice));

if i == 0 {
if let Some(md) = self.metadata.clone() {
reader.set_metadata(md)
}
}

reader.num_rows()
})
.collect::<PolarsResult<Vec<_>>>()?;

Expand Down Expand Up @@ -151,7 +160,15 @@ impl ParquetExec {

let memslice = source.to_memslice()?;

let mut reader = ParquetReader::new(std::io::Cursor::new(memslice))
let mut reader = ParquetReader::new(std::io::Cursor::new(memslice));

if i == 0 {
if let Some(md) = self.metadata.clone() {
reader.set_metadata(md)
}
}

let mut reader = reader
.read_parallel(parallel)
.set_low_memory(self.options.low_memory)
.use_statistics(self.options.use_statistics)
Expand Down Expand Up @@ -266,14 +283,15 @@ impl ParquetExec {
let mut iter = stream::iter((0..paths.len()).rev().map(|i| {
let paths = paths.clone();
let cloud_options = cloud_options.clone();
let first_metadata = first_metadata.clone();

pl_async::get_runtime().spawn(async move {
PolarsResult::Ok((
i,
ParquetAsyncReader::from_uri(
paths[i].to_str().unwrap(),
cloud_options.as_ref().as_ref(),
None,
first_metadata.filter(|_| i == 0),
)
.await?
.num_rows()
Expand Down
21 changes: 14 additions & 7 deletions crates/polars-pipe/src/executors/sources/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub struct ParquetSource {
file_options: FileScanOptions,
#[allow(dead_code)]
cloud_options: Option<CloudOptions>,
metadata: Option<FileMetadataRef>,
first_metadata: Option<FileMetadataRef>,
file_info: FileInfo,
hive_parts: Option<Arc<Vec<HivePartitions>>>,
verbose: bool,
Expand All @@ -61,7 +61,6 @@ impl ParquetSource {
}

fn init_next_reader_sync(&mut self) -> PolarsResult<()> {
self.metadata = None;
self.init_reader_sync()
}

Expand Down Expand Up @@ -133,7 +132,16 @@ impl ParquetSource {

let batched_reader = {
let file = std::fs::File::open(path).unwrap();
let mut reader = ParquetReader::new(file)

let mut reader = ParquetReader::new(file);

if index == 0 {
if let Some(md) = self.first_metadata.clone() {
reader.set_metadata(md);
}
}

let mut reader = reader
.with_projection(projection)
.check_schema(
self.file_info
Expand Down Expand Up @@ -191,7 +199,7 @@ impl ParquetSource {
async fn init_reader_async(&self, index: usize) -> PolarsResult<BatchedParquetReader> {
use std::sync::atomic::Ordering;

let metadata = self.metadata.clone();
let metadata = self.first_metadata.clone().filter(|_| index == 0);
let predicate = self.predicate.clone();
let cloud_options = self.cloud_options.clone();
let (path, options, file_options, projection, chunk_size, hive_partitions) =
Expand Down Expand Up @@ -252,7 +260,7 @@ impl ParquetSource {
sources: ScanSources,
options: ParquetOptions,
cloud_options: Option<CloudOptions>,
metadata: Option<FileMetadataRef>,
first_metadata: Option<FileMetadataRef>,
file_options: FileScanOptions,
file_info: FileInfo,
hive_parts: Option<Arc<Vec<HivePartitions>>>,
Expand Down Expand Up @@ -282,7 +290,7 @@ impl ParquetSource {
iter,
sources,
cloud_options,
metadata,
first_metadata,
file_info,
hive_parts,
verbose,
Expand All @@ -293,7 +301,6 @@ impl ParquetSource {
// Already start downloading when we deal with cloud urls.
if run_async {
source.init_next_reader()?;
source.metadata = None;
}
Ok(source)
}
Expand Down
3 changes: 1 addition & 2 deletions crates/polars-plan/src/client/check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ pub(super) fn assert_cloud_eligible(dsl: &DslPlan) -> PolarsResult<()> {
DslPlan::Scan {
sources, scan_type, ..
} => {
let sources_lock = sources.lock().unwrap();
match &sources_lock.sources {
match sources {
ScanSources::Paths(paths) => {
if paths.iter().any(|p| !is_cloud_url(p)) {
return ineligible_error("contains scan of local file system");
Expand Down
31 changes: 16 additions & 15 deletions crates/polars-plan/src/plans/builder_dsl.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::{Arc, Mutex, RwLock};
use std::sync::Arc;

use polars_core::prelude::*;
#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))]
Expand Down Expand Up @@ -57,11 +57,8 @@ impl DslBuilder {
};

Ok(DslPlan::Scan {
sources: Arc::new(Mutex::new(DslScanSources {
sources: ScanSources::Buffers(Arc::default()),
is_expanded: true,
})),
file_info: Arc::new(RwLock::new(Some(file_info))),
sources: ScanSources::Buffers(Arc::default()),
file_info: Some(file_info),
file_options,
scan_type: FileScan::Anonymous {
function,
Expand All @@ -70,14 +67,15 @@ impl DslBuilder {
skip_rows,
}),
},
cached_ir: Default::default(),
}
.into())
}

#[cfg(feature = "parquet")]
#[allow(clippy::too_many_arguments)]
pub fn scan_parquet(
sources: DslScanSources,
sources: ScanSources,
n_rows: Option<usize>,
cache: bool,
parallel: polars_io::parquet::read::ParallelStrategy,
Expand All @@ -102,8 +100,8 @@ impl DslBuilder {
include_file_paths,
};
Ok(DslPlan::Scan {
sources: Arc::new(Mutex::new(sources)),
file_info: Arc::new(RwLock::new(None)),
sources,
file_info: None,
file_options: options,
scan_type: FileScan::Parquet {
options: ParquetOptions {
Expand All @@ -114,14 +112,15 @@ impl DslBuilder {
cloud_options,
metadata: None,
},
cached_ir: Default::default(),
}
.into())
}

#[cfg(feature = "ipc")]
#[allow(clippy::too_many_arguments)]
pub fn scan_ipc(
sources: DslScanSources,
sources: ScanSources,
options: IpcScanOptions,
n_rows: Option<usize>,
cache: bool,
Expand All @@ -132,8 +131,8 @@ impl DslBuilder {
include_file_paths: Option<PlSmallStr>,
) -> PolarsResult<Self> {
Ok(DslPlan::Scan {
sources: Arc::new(Mutex::new(sources)),
file_info: Arc::new(RwLock::new(None)),
sources,
file_info: None,
file_options: FileScanOptions {
with_columns: None,
cache,
Expand All @@ -150,14 +149,15 @@ impl DslBuilder {
cloud_options,
metadata: None,
},
cached_ir: Default::default(),
}
.into())
}

#[allow(clippy::too_many_arguments)]
#[cfg(feature = "csv")]
pub fn scan_csv(
sources: DslScanSources,
sources: ScanSources,
read_options: CsvReadOptions,
cache: bool,
cloud_options: Option<CloudOptions>,
Expand All @@ -183,13 +183,14 @@ impl DslBuilder {
include_file_paths,
};
Ok(DslPlan::Scan {
sources: Arc::new(Mutex::new(sources)),
file_info: Arc::new(RwLock::new(None)),
sources,
file_info: None,
file_options: options,
scan_type: FileScan::Csv {
options: read_options,
cloud_options,
},
cached_ir: Default::default(),
}
.into())
}
Expand Down
Loading

0 comments on commit 25f84e4

Please sign in to comment.