From 6aaa98c1d758f0e00df9c588424a8099d1a259c9 Mon Sep 17 00:00:00 2001 From: nameexhaustion Date: Mon, 23 Dec 2024 20:32:24 +1100 Subject: [PATCH] fix: Fix using `new_columns` in `scan_csv` with compressed file (#20412) --- crates/polars-lazy/src/scan/csv.rs | 23 ++++++++++------------- py-polars/tests/unit/io/test_csv.py | 20 ++++++++++++++++++++ 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/crates/polars-lazy/src/scan/csv.rs b/crates/polars-lazy/src/scan/csv.rs index c284232f93ef..05f33bd0f257 100644 --- a/crates/polars-lazy/src/scan/csv.rs +++ b/crates/polars-lazy/src/scan/csv.rs @@ -5,10 +5,11 @@ use polars_io::cloud::CloudOptions; use polars_io::csv::read::{ infer_file_schema, CommentPrefix, CsvEncoding, CsvParseOptions, CsvReadOptions, NullValues, }; -use polars_io::mmap::ReaderBytes; use polars_io::path_utils::expand_paths; +use polars_io::utils::compression::maybe_decompress_bytes; use polars_io::utils::get_reader_bytes; use polars_io::RowIndex; +use polars_utils::mmap::MemSlice; use crate::prelude::*; @@ -242,14 +243,17 @@ impl LazyCsvReader { { let mut n_threads = self.read_options.n_threads; - let mut infer_schema = |reader_bytes: ReaderBytes| { + let mut infer_schema = |bytes: MemSlice| { let skip_rows = self.read_options.skip_rows; let skip_lines = self.read_options.skip_lines; let parse_options = self.read_options.get_parse_options(); + let mut owned = vec![]; + let bytes = maybe_decompress_bytes(bytes.as_ref(), &mut owned)?; + PolarsResult::Ok( infer_file_schema( - &reader_bytes, + &get_reader_bytes(&mut std::io::Cursor::new(bytes))?, &parse_options, self.read_options.infer_schema_length, self.read_options.has_header, @@ -275,28 +279,21 @@ impl LazyCsvReader { polars_bail!(ComputeError: "no paths specified for this reader"); }; - let mut file = polars_utils::open_file(path)?; - infer_schema(get_reader_bytes(&mut file).expect("could not mmap file"))? + infer_schema(MemSlice::from_file(&polars_utils::open_file(path)?)?)? }, ScanSources::Files(files) => { let Some(file) = files.first() else { polars_bail!(ComputeError: "no buffers specified for this reader"); }; - infer_schema( - get_reader_bytes(&mut std::io::BufReader::new(file)) - .expect("could not mmap file"), - )? + infer_schema(MemSlice::from_file(file)?)? }, ScanSources::Buffers(buffers) => { let Some(buffer) = buffers.first() else { polars_bail!(ComputeError: "no buffers specified for this reader"); }; - infer_schema( - get_reader_bytes(&mut std::io::Cursor::new(buffer)) - .expect("could not mmap file"), - )? + infer_schema(buffer.clone())? }, }; diff --git a/py-polars/tests/unit/io/test_csv.py b/py-polars/tests/unit/io/test_csv.py index 5d7552b9a2b8..43b05892d65d 100644 --- a/py-polars/tests/unit/io/test_csv.py +++ b/py-polars/tests/unit/io/test_csv.py @@ -2428,3 +2428,23 @@ def test_csv_invalid_quoted_comment_line() -> None: assert pl.read_csv( b'#"Comment\nColA\tColB\n1\t2', separator="\t", comment_prefix="#" ).to_dict(as_series=False) == {"ColA": [1], "ColB": [2]} + + +def test_csv_compressed_new_columns_19916() -> None: + n_rows = 100 + + df = pl.DataFrame( + { + "a": range(n_rows), + "b": range(n_rows), + "c": range(n_rows), + "d": range(n_rows), + "e": range(n_rows), + "f": range(n_rows), + } + ) + + b = zstandard.compress(df.write_csv(include_header=False).encode()) + + q = pl.scan_csv(b, has_header=False, new_columns=["a", "b", "c", "d", "e", "f"]) + assert_frame_equal(q.collect(), df)