From 9b491fd343d3661a2591a8328f6b9daf04d6088e Mon Sep 17 00:00:00 2001 From: nameexhaustion Date: Thu, 5 Dec 2024 18:32:05 +1100 Subject: [PATCH] perf: Reduce memory copy when scanning from Python objects (#20142) --- .../src/executors/scan/ipc.rs | 2 +- .../polars-plan/src/plans/ir/scan_sources.rs | 10 ++-- .../src/plans/optimizer/count_star.rs | 4 +- crates/polars-python/src/conversion/mod.rs | 3 +- crates/polars-python/src/dataframe/io.rs | 2 +- crates/polars-python/src/file.rs | 52 ++++++++----------- crates/polars-utils/src/mmap.rs | 17 ++++-- 7 files changed, 49 insertions(+), 41 deletions(-) diff --git a/crates/polars-mem-engine/src/executors/scan/ipc.rs b/crates/polars-mem-engine/src/executors/scan/ipc.rs index 78b31f268756..3fcb4b66e263 100644 --- a/crates/polars-mem-engine/src/executors/scan/ipc.rs +++ b/crates/polars-mem-engine/src/executors/scan/ipc.rs @@ -84,7 +84,7 @@ impl IpcExec { MemSlice::from_file(&file)? }, ScanSourceRef::File(file) => MemSlice::from_file(file)?, - ScanSourceRef::Buffer(buff) => MemSlice::from_bytes(buff.clone()), + ScanSourceRef::Buffer(buff) => buff.clone(), }; IpcReader::new(std::io::Cursor::new(memslice)) diff --git a/crates/polars-plan/src/plans/ir/scan_sources.rs b/crates/polars-plan/src/plans/ir/scan_sources.rs index d0a060d6e48d..4111bc18d29b 100644 --- a/crates/polars-plan/src/plans/ir/scan_sources.rs +++ b/crates/polars-plan/src/plans/ir/scan_sources.rs @@ -25,7 +25,7 @@ pub enum ScanSources { #[cfg_attr(feature = "serde", serde(skip))] Files(Arc<[File]>), #[cfg_attr(feature = "serde", serde(skip))] - Buffers(Arc<[bytes::Bytes]>), + Buffers(Arc<[MemSlice]>), } impl Debug for ScanSources { @@ -43,7 +43,7 @@ impl Debug for ScanSources { pub enum ScanSourceRef<'a> { Path(&'a Path), File(&'a File), - Buffer(&'a bytes::Bytes), + Buffer(&'a MemSlice), } /// An iterator for [`ScanSources`] @@ -263,7 +263,7 @@ impl ScanSourceRef<'_> { MemSlice::from_file(&file) }, ScanSourceRef::File(file) => MemSlice::from_file(file), - ScanSourceRef::Buffer(buff) => Ok(MemSlice::from_bytes((*buff).clone())), + ScanSourceRef::Buffer(buff) => Ok((*buff).clone()), } } @@ -289,7 +289,7 @@ impl ScanSourceRef<'_> { MemSlice::from_file(&file) }, Self::File(file) => MemSlice::from_file(file), - Self::Buffer(buff) => Ok(MemSlice::from_bytes((*buff).clone())), + Self::Buffer(buff) => Ok((*buff).clone()), } } @@ -306,7 +306,7 @@ impl ScanSourceRef<'_> { .await }, Self::File(file) => Ok(DynByteSource::from(MemSlice::from_file(file)?)), - Self::Buffer(buff) => Ok(DynByteSource::from(MemSlice::from_bytes((*buff).clone()))), + Self::Buffer(buff) => Ok(DynByteSource::from((*buff).clone())), } } } diff --git a/crates/polars-plan/src/plans/optimizer/count_star.rs b/crates/polars-plan/src/plans/optimizer/count_star.rs index 1f20c83f6a87..e0643028e0fe 100644 --- a/crates/polars-plan/src/plans/optimizer/count_star.rs +++ b/crates/polars-plan/src/plans/optimizer/count_star.rs @@ -1,5 +1,7 @@ use std::path::PathBuf; +use polars_utils::mmap::MemSlice; + use super::*; pub(super) struct CountStar; @@ -68,7 +70,7 @@ fn visit_logical_plan_for_scan_paths( IR::Union { inputs, .. } => { enum MutableSources { Paths(Vec), - Buffers(Vec), + Buffers(Vec), } let mut scan_type: Option = None; diff --git a/crates/polars-python/src/conversion/mod.rs b/crates/polars-python/src/conversion/mod.rs index c99e7fcfecf2..ed28df05746d 100644 --- a/crates/polars-python/src/conversion/mod.rs +++ b/crates/polars-python/src/conversion/mod.rs @@ -21,6 +21,7 @@ use polars_lazy::prelude::*; #[cfg(feature = "parquet")] use polars_parquet::write::StatisticsOptions; use polars_plan::plans::ScanSources; +use polars_utils::mmap::MemSlice; use polars_utils::pl_str::PlSmallStr; use polars_utils::total_ord::{TotalEq, TotalHash}; use pyo3::basic::CompareOp; @@ -542,7 +543,7 @@ impl<'py> FromPyObject<'py> for Wrap { enum MutableSources { Paths(Vec), Files(Vec), - Buffers(Vec), + Buffers(Vec), } let num_items = list.len(); diff --git a/crates/polars-python/src/dataframe/io.rs b/crates/polars-python/src/dataframe/io.rs index bd1015f3ff62..37ba89418a28 100644 --- a/crates/polars-python/src/dataframe/io.rs +++ b/crates/polars-python/src/dataframe/io.rs @@ -158,7 +158,7 @@ impl PyDataFrame { let result = match get_either_file(py_f, false)? { Py(f) => { - let buf = f.as_buffer(); + let buf = std::io::Cursor::new(f.to_memslice()); py.allow_threads(move || { ParquetReader::new(buf) .with_projection(projection) diff --git a/crates/polars-python/src/file.rs b/crates/polars-python/src/file.rs index 45a61e777e28..ffec35a34f52 100644 --- a/crates/polars-python/src/file.rs +++ b/crates/polars-python/src/file.rs @@ -7,10 +7,12 @@ use std::io::{Cursor, ErrorKind, Read, Seek, SeekFrom, Write}; #[cfg(target_family = "unix")] use std::os::fd::{FromRawFd, RawFd}; use std::path::PathBuf; +use std::sync::Arc; use polars::io::mmap::MmapBytesReader; use polars_error::polars_err; use polars_io::cloud::CloudOptions; +use polars_utils::mmap::MemSlice; use pyo3::exceptions::PyTypeError; use pyo3::prelude::*; use pyo3::types::{PyBytes, PyString, PyStringMethods}; @@ -39,38 +41,28 @@ impl PyFileLikeObject { PyFileLikeObject { inner: object } } - pub fn as_bytes(&self) -> bytes::Bytes { - self.as_file_buffer().into_inner().into() - } - - pub fn as_buffer(&self) -> std::io::Cursor> { - let data = self.as_file_buffer().into_inner(); - std::io::Cursor::new(data) - } - - pub fn as_file_buffer(&self) -> Cursor> { - let buf = Python::with_gil(|py| { + pub fn to_memslice(&self) -> MemSlice { + Python::with_gil(|py| { let bytes = self .inner .call_method_bound(py, "read", (), None) .expect("no read method found"); - if let Ok(bytes) = bytes.downcast_bound::(py) { - return bytes.as_bytes().to_vec(); + if let Ok(b) = bytes.downcast_bound::(py) { + return MemSlice::from_arc(b.as_bytes(), Arc::new(bytes.clone_ref(py))); } - if let Ok(bytes) = bytes.downcast_bound::(py) { - return bytes - .to_cow() - .expect("PyString is not valid UTF-8") - .into_owned() - .into_bytes(); + if let Ok(b) = bytes.downcast_bound::(py) { + return match b.to_cow().expect("PyString is not valid UTF-8") { + Cow::Borrowed(v) => { + MemSlice::from_arc(v.as_bytes(), Arc::new(bytes.clone_ref(py))) + }, + Cow::Owned(v) => MemSlice::from_vec(v.into_bytes()), + }; } panic!("Expecting to be able to downcast into bytes from read result."); - }); - - Cursor::new(buf) + }) } /// Validates that the underlying @@ -212,7 +204,7 @@ impl EitherRustPythonFile { fn into_scan_source_input(self) -> PythonScanSourceInput { match self { - EitherRustPythonFile::Py(f) => PythonScanSourceInput::Buffer(f.as_bytes()), + EitherRustPythonFile::Py(f) => PythonScanSourceInput::Buffer(f.to_memslice()), EitherRustPythonFile::Rust(f) => PythonScanSourceInput::File(f), } } @@ -226,7 +218,7 @@ impl EitherRustPythonFile { } pub enum PythonScanSourceInput { - Buffer(bytes::Bytes), + Buffer(MemSlice), Path(PathBuf), File(File), } @@ -328,13 +320,15 @@ pub fn get_python_scan_source_input( write: bool, ) -> PyResult { Python::with_gil(|py| { - let py_f = py_f.into_bound(py); + let py_f_0 = py_f; + let py_f = py_f_0.clone_ref(py).into_bound(py); // If the pyobject is a `bytes` class - if let Ok(bytes) = py_f.downcast::() { - return Ok(PythonScanSourceInput::Buffer( - bytes::Bytes::copy_from_slice(bytes.as_bytes()), - )); + if let Ok(b) = py_f.downcast::() { + return Ok(PythonScanSourceInput::Buffer(MemSlice::from_arc( + b.as_bytes(), + Arc::new(py_f_0), + ))); } if let Ok(s) = py_f.extract::>() { diff --git a/crates/polars-utils/src/mmap.rs b/crates/polars-utils/src/mmap.rs index 52cd7f04b0a7..bdac6e095a94 100644 --- a/crates/polars-utils/src/mmap.rs +++ b/crates/polars-utils/src/mmap.rs @@ -35,8 +35,8 @@ mod private { #[derive(Clone, Debug)] #[allow(unused)] enum MemSliceInner { - Bytes(bytes::Bytes), - Mmap(Arc), + Bytes(bytes::Bytes), // Separate because it does atomic refcounting internally + Arc(Arc), } impl Deref for MemSlice { @@ -97,7 +97,18 @@ mod private { slice: unsafe { std::mem::transmute::<&[u8], &'static [u8]>(mmap.as_ref().as_ref()) }, - inner: MemSliceInner::Mmap(mmap), + inner: MemSliceInner::Arc(mmap), + } + } + + #[inline] + pub fn from_arc(slice: &[u8], arc: Arc) -> Self + where + T: std::fmt::Debug + Send + Sync + 'static, + { + Self { + slice: unsafe { std::mem::transmute::<&[u8], &'static [u8]>(slice) }, + inner: MemSliceInner::Arc(arc), } }