From 4fd05d53a80677ebf8dc5f9e7b13348d9a4dea4f Mon Sep 17 00:00:00 2001
From: Xiaoying Wang
Date: Wed, 8 May 2024 10:41:14 -0700
Subject: [PATCH] fixed parallel segfault, some other issue remains

---
 connectorx-python/Cargo.toml                |   3 +-
 connectorx-python/src/pandas/destination.rs |  83 ++++----
 connectorx-python/src/pandas/dispatcher.rs  | 198 ++++++++++++++++++
 connectorx-python/src/pandas/get_meta.rs    |   4 +-
 connectorx-python/src/pandas/mod.rs         | 138 ++++++------
 .../src/pandas/pandas_columns/string.rs     |  39 ++--
 6 files changed, 335 insertions(+), 130 deletions(-)
 create mode 100644 connectorx-python/src/pandas/dispatcher.rs

diff --git a/connectorx-python/Cargo.toml b/connectorx-python/Cargo.toml
index 36d3c67fe2..c5109b025b 100644
--- a/connectorx-python/Cargo.toml
+++ b/connectorx-python/Cargo.toml
@@ -42,6 +42,7 @@ tokio-util = "0.6"
 url = "2"
 urlencoding = "2.1"
 uuid = "0.8"
+rayon = "1"
 
 [build-dependencies]
 built = {version = "0.5", features = ["chrono"]}
@@ -56,7 +57,7 @@ rayon = "1"
 
 [lib]
 crate-type = ["cdylib", "rlib"]
-name = "connectorx_python"
+name = "connectorx"
 
 [features]
 branch = ["connectorx/branch"]
diff --git a/connectorx-python/src/pandas/destination.rs b/connectorx-python/src/pandas/destination.rs
index e4d6a9e88b..cdd41dd492 100644
--- a/connectorx-python/src/pandas/destination.rs
+++ b/connectorx-python/src/pandas/destination.rs
@@ -41,7 +41,6 @@ impl PandasBlockInfo {
 }
 
 pub struct PandasDestination<'py> {
-    py: Python<'py>,
     nrow: usize,
     schema: Vec<PandasBlockType>,
     names: Vec<String>,
@@ -50,9 +49,8 @@ impl<'py> PandasDestination<'py> {
-    pub fn new(py: Python<'py>) -> Self {
+    pub fn new() -> Self {
         PandasDestination {
-            py,
             nrow: 0,
             schema: vec![],
             names: vec![],
@@ -61,7 +59,7 @@ impl<'py> PandasDestination<'py> {
         }
     }
 
-    pub fn result(self) -> Result<Bound<'py, PyAny>> {
+    pub fn result(self, py: Python<'py>) -> Result<Bound<'py, PyAny>> {
         #[throws(ConnectorXPythonError)]
         fn to_list<T: IntoPy<PyObject>>(py: Python<'_>, arr: Vec<T>) -> Bound<'_, PyList> {
             let list = PyList::empty_bound(py);
@@ -70,26 +68,27 @@ impl<'py> PandasDestination<'py> {
             }
             list
         }
-        let block_infos = to_list(self.py, self.block_infos)?;
-        let names = to_list(self.py, self.names)?;
-        let block_datas = to_list(self.py, self.block_datas)?;
+        let block_infos = to_list(py, self.block_infos)?;
+        let names = to_list(py, self.names)?;
+        let block_datas = to_list(py, self.block_datas)?;
         let result = [
             ("data", block_datas),
             ("headers", names),
             ("block_infos", block_infos),
         ]
-        .into_py_dict_bound(self.py);
+        .into_py_dict_bound(py);
         Ok(result.into_any())
     }
 
     #[throws(ConnectorXPythonError)]
     fn allocate_array<T: numpy::Element>(
         &mut self,
+        py: Python<'py>,
         dt: PandasBlockType,
         placement: Vec<usize>,
     ) {
         // has to use `zeros` instead of `new` for String type initialization
-        let data = PyArray2::<T>::zeros_bound(self.py, [placement.len(), self.nrow], false);
+        let data = PyArray2::<T>::zeros_bound(py, [placement.len(), self.nrow], false);
         let block_info = PandasBlockInfo {
             dt,
             cids: placement,
@@ -102,6 +101,7 @@ impl<'py> PandasDestination<'py> {
     #[throws(ConnectorXPythonError)]
     fn allocate_masked_array<T: numpy::Element>(
         &mut self,
+        py: Python<'py>,
         dt: PandasBlockType,
         placement: Vec<usize>,
     ) {
@@ -110,28 +110,18 @@ impl<'py> PandasDestination<'py> {
             dt,
             cids: vec![pos],
         };
-        let data = PyArray1::<T>::zeros_bound(self.py, self.nrow, false);
-        let mask = PyArray1::<bool>::zeros_bound(self.py, self.nrow, false);
-        let obj = PyTuple::new_bound(self.py, vec![data.as_any(), mask.as_any()]);
+        let data = PyArray1::<T>::zeros_bound(py, self.nrow, false);
+        let mask = PyArray1::<bool>::zeros_bound(py, self.nrow, false);
+        let obj = PyTuple::new_bound(py, vec![data.as_any(), mask.as_any()]);
         self.block_datas.push(obj.into_any());
         self.block_infos.push(block_info);
     }
-}
-
-impl<'py> Destination for PandasDestination<'py> {
-    const DATA_ORDERS: &'static [DataOrder] = &[DataOrder::RowMajor];
-    type TypeSystem = PandasTypeSystem;
-    type Partition<'b> = PandasPartitionDestination<'b> where 'py: 'b;
-    type Error = ConnectorXPythonError;
-
-    fn needs_count(&self) -> bool {
-        true
-    }
 
     #[throws(ConnectorXPythonError)]
-    fn allocate<S: AsRef<str>>(
+    pub fn allocate_py<S: AsRef<str>>(
         &mut self,
+        py: Python<'py>,
         nrows: usize,
         names: &[S],
         schema: &[PandasTypeSystem],
@@ -154,41 +144,64 @@ impl<'py> Destination for PandasDestination<'py> {
         for (dt, placement) in block_indices {
             match dt {
                 PandasBlockType::Boolean(true) => {
-                    self.allocate_masked_array::<bool>(dt, placement)?;
+                    self.allocate_masked_array::<bool>(py, dt, placement)?;
                 }
                 PandasBlockType::Boolean(false) => {
-                    self.allocate_array::<bool>(dt, placement)?;
+                    self.allocate_array::<bool>(py, dt, placement)?;
                 }
                 PandasBlockType::Int64(true) => {
-                    self.allocate_masked_array::<i64>(dt, placement)?;
+                    self.allocate_masked_array::<i64>(py, dt, placement)?;
                 }
                 PandasBlockType::Int64(false) => {
-                    self.allocate_array::<i64>(dt, placement)?;
+                    self.allocate_array::<i64>(py, dt, placement)?;
                 }
                 PandasBlockType::Float64 => {
-                    self.allocate_array::<f64>(dt, placement)?;
+                    self.allocate_array::<f64>(py, dt, placement)?;
                 }
                 PandasBlockType::BooleanArray => {
-                    self.allocate_array::<PyList>(dt, placement)?;
+                    self.allocate_array::<PyList>(py, dt, placement)?;
                 }
                 PandasBlockType::Float64Array => {
-                    self.allocate_array::<PyList>(dt, placement)?;
+                    self.allocate_array::<PyList>(py, dt, placement)?;
                 }
                 PandasBlockType::Int64Array => {
-                    self.allocate_array::<PyList>(dt, placement)?;
+                    self.allocate_array::<PyList>(py, dt, placement)?;
                 }
                 PandasBlockType::String => {
-                    self.allocate_array::<PyString>(dt, placement)?;
+                    self.allocate_array::<PyString>(py, dt, placement)?;
                 }
                 PandasBlockType::DateTime => {
-                    self.allocate_array::<i64>(dt, placement)?;
+                    self.allocate_array::<i64>(py, dt, placement)?;
                 }
                 PandasBlockType::Bytes => {
-                    self.allocate_array::<PyBytes>(dt, placement)?;
+                    self.allocate_array::<PyBytes>(py, dt, placement)?;
                 }
             };
         }
     }
+}
+
+impl<'py> Destination for PandasDestination<'py> {
+    const DATA_ORDERS: &'static [DataOrder] = &[DataOrder::RowMajor];
+    type TypeSystem = PandasTypeSystem;
+    type Partition<'b> = PandasPartitionDestination<'b> where 'py: 'b;
+    type Error = ConnectorXPythonError;
+
+    fn needs_count(&self) -> bool {
+        true
+    }
+
+    #[allow(unreachable_code)]
+    #[throws(ConnectorXPythonError)]
+    fn allocate<S: AsRef<str>>(
+        &mut self,
+        _nrows: usize,
+        _names: &[S],
+        _schema: &[PandasTypeSystem],
+        _data_order: DataOrder,
+    ) {
+        unimplemented!("not implemented for python destination!");
+    }
 
     #[throws(ConnectorXPythonError)]
     fn partition(&mut self, counts: usize) -> Vec<Self::Partition<'_>> {
diff --git a/connectorx-python/src/pandas/dispatcher.rs b/connectorx-python/src/pandas/dispatcher.rs
new file mode 100644
index 0000000000..f733228639
--- /dev/null
+++ b/connectorx-python/src/pandas/dispatcher.rs
@@ -0,0 +1,198 @@
+use super::{destination::PandasDestination, typesystem::PandasTypeSystem};
+use crate::errors::ConnectorXPythonError;
+use connectorx::errors::Result as CXResult;
+use connectorx::prelude::*;
+use itertools::Itertools;
+use log::debug;
+use pyo3::prelude::*;
+use rayon::prelude::*;
+use std::marker::PhantomData;
+
+pub struct PandasDispatcher<'py, S, TP> {
+    src: S,
+    dst: PandasDestination<'py>,
+    queries: Vec<CXQuery<String>>,
+    origin_query: Option<String>,
+    _phantom: PhantomData<TP>,
+}
+
+impl<'py, S, TP> PandasDispatcher<'py, S, TP>
+where
+    S: Source,
+    TP: Transport<TSS = S::TypeSystem, TSD = PandasTypeSystem, S = S, D = PandasDestination<'py>>,
+    <TP as Transport>::Error: From<ConnectorXPythonError>,
+{
+    /// Create a new dispatcher by providing a source, a destination and the queries.
+    pub fn new<Q>(
+        src: S,
+        dst: PandasDestination<'py>,
+        queries: &[Q],
+        origin_query: Option<String>,
+    ) -> Self
+    where
+        for<'a> &'a Q: Into<CXQuery>,
+    {
+        Self {
+            src,
+            dst,
+            queries: queries.iter().map(Into::into).collect(),
+            origin_query,
+            _phantom: PhantomData,
+        }
+    }
+
+    /// Start the data loading process.
+    pub fn run(mut self, py: Python<'py>) -> Result<Bound<'py, PyAny>, TP::Error> {
+        debug!("Run dispatcher");
+
+        debug!("Prepare");
+        let dorder = coordinate(S::DATA_ORDERS, PandasDestination::DATA_ORDERS)?;
+        self.src.set_data_order(dorder)?;
+        self.src.set_queries(self.queries.as_slice());
+        self.src.set_origin_query(self.origin_query);
+
+        debug!("Fetching metadata");
+        self.src.fetch_metadata()?;
+        let src_schema = self.src.schema();
+        let dst_schema = src_schema
+            .iter()
+            .map(|&s| TP::convert_typesystem(s))
+            .collect::<CXResult<Vec<_>>>()?;
+        let names = self.src.names();
+
+        let mut total_rows = if self.dst.needs_count() {
+            // returns None if the total count cannot be derived
+            debug!("Try to get row counts for the entire result");
+            self.src.result_rows()?
+        } else {
+            debug!("Do not need counts in advance");
+            Some(0)
+        };
+        let mut src_partitions: Vec<S::Partition> = self.src.partition()?;
+        if self.dst.needs_count() && total_rows.is_none() {
+            debug!("Manually count rows of each partitioned query and sum up");
+            // run queries
+            src_partitions
+                .par_iter_mut()
+                .try_for_each(|partition| -> Result<(), S::Error> { partition.result_rows() })?;
+
+            // get the number of rows of each partition from the source
+            let part_rows: Vec<usize> = src_partitions
+                .iter()
+                .map(|partition| partition.nrows())
+                .collect();
+            total_rows = Some(part_rows.iter().sum());
+        }
+        let total_rows = total_rows.ok_or_else(ConnectorXError::CountError)?;
+
+        debug!(
+            "Allocate destination memory: {}x{}",
+            total_rows,
+            src_schema.len()
+        );
+        self.dst
+            .allocate_py(py, total_rows, &names, &dst_schema, dorder)?;
+
+        debug!("Create destination partition");
+        let dst_partitions = self.dst.partition(self.queries.len())?;
+
+        #[cfg(all(not(feature = "branch"), not(feature = "fptr")))]
+        compile_error!("branch or fptr, pick one");
+
+        #[cfg(feature = "branch")]
+        let schemas: Vec<_> = src_schema
+            .iter()
+            .zip_eq(&dst_schema)
+            .map(|(&src_ty, &dst_ty)| (src_ty, dst_ty))
+            .collect();
+
+        debug!("Start writing");
+
+        // release GIL
+        py.allow_threads(move || -> Result<(), TP::Error> {
+            // parse and write
+            dst_partitions
+                .into_par_iter()
+                .zip_eq(src_partitions)
+                .enumerate()
+                .try_for_each(|(i, (mut dst, mut src))| -> Result<(), TP::Error> {
+                    #[cfg(feature = "fptr")]
+                    let f: Vec<_> = src_schema
+                        .iter()
+                        .zip_eq(&dst_schema)
+                        .map(|(&src_ty, &dst_ty)| TP::processor(src_ty, dst_ty))
+                        .collect::<CXResult<Vec<_>>>()?;
+
+                    let mut parser = src.parser()?;
+
+                    match dorder {
+                        DataOrder::RowMajor => loop {
+                            let (n, is_last) = parser.fetch_next()?;
+                            dst.aquire_row(n)?;
+                            for _ in 0..n {
+                                #[allow(clippy::needless_range_loop)]
+                                for col in 0..dst.ncols() {
+                                    #[cfg(feature = "fptr")]
+                                    f[col](&mut parser, &mut dst)?;
+
+                                    #[cfg(feature = "branch")]
+                                    {
+                                        let (s1, s2) = schemas[col];
+                                        TP::process(s1, s2, &mut parser, &mut dst)?;
+                                    }
+                                }
+                            }
+                            if is_last {
+                                break;
+                            }
+                        },
+                        DataOrder::ColumnMajor => loop {
+                            let (n, is_last) = parser.fetch_next()?;
+                            dst.aquire_row(n)?;
+                            #[allow(clippy::needless_range_loop)]
+                            for col in 0..dst.ncols() {
+                                for _ in 0..n {
+                                    #[cfg(feature = "fptr")]
+                                    f[col](&mut parser, &mut dst)?;
+                                    #[cfg(feature = "branch")]
+                                    {
+                                        let (s1, s2) = schemas[col];
+                                        TP::process(s1, s2, &mut parser, &mut dst)?;
+                                    }
+                                }
+                            }
+                            if is_last {
+                                break;
+                            }
+                        },
+                    }
+
+                    debug!("Finalize partition {}", i);
+                    dst.finalize()?;
+                    debug!("Partition {} finished", i);
+                    Ok(())
+                })?;
+            Ok(())
+        })?;
+        debug!("Writing finished");
+
+        Ok(self.dst.result(py).unwrap())
+    }
+
+    /// Only fetch the metadata (header) of the destination.
+    pub fn get_meta(&mut self) -> Result<(), TP::Error> {
+        let dorder = coordinate(S::DATA_ORDERS, PandasDestination::DATA_ORDERS)?;
+        self.src.set_data_order(dorder)?;
+        self.src.set_queries(self.queries.as_slice());
+        self.src.set_origin_query(self.origin_query.clone());
+        self.src.fetch_metadata()?;
+        let src_schema = self.src.schema();
+        let dst_schema = src_schema
+            .iter()
+            .map(|&s| TP::convert_typesystem(s))
+            .collect::<CXResult<Vec<_>>>()?;
+        let names = self.src.names();
+        self.dst.allocate(0, &names, &dst_schema, dorder)?;
+        Ok(())
+    }
+}
diff --git a/connectorx-python/src/pandas/get_meta.rs b/connectorx-python/src/pandas/get_meta.rs
index f134c36c1f..bc288ad194 100644
--- a/connectorx-python/src/pandas/get_meta.rs
+++ b/connectorx-python/src/pandas/get_meta.rs
@@ -38,7 +38,7 @@ pub fn get_meta<'py>(
     query: String,
 ) -> Bound<'py, PyAny> {
     let source_conn = SourceConn::try_from(conn)?;
-    let mut destination = PandasDestination::new(py);
+    let mut destination = PandasDestination::new();
     let queries = &[CXQuery::Naked(query)];
 
     match source_conn.ty {
@@ -243,5 +243,5 @@ pub fn get_meta<'py>(
         _ => unimplemented!("{:?} not implemented!", source_conn.ty),
     }
 
-    destination.result()?
+    destination.result(py)?
 }
diff --git a/connectorx-python/src/pandas/mod.rs b/connectorx-python/src/pandas/mod.rs
index d5509877e4..15c91f7783 100644
--- a/connectorx-python/src/pandas/mod.rs
+++ b/connectorx-python/src/pandas/mod.rs
@@ -1,4 +1,5 @@
 mod destination;
+mod dispatcher;
 pub mod get_meta;
 mod pandas_columns;
 mod pystring;
@@ -6,6 +7,7 @@ mod transports;
 mod typesystem;
 
 pub use self::destination::{PandasBlockInfo, PandasDestination, PandasPartitionDestination};
+use self::dispatcher::PandasDispatcher;
 pub use self::transports::{
     BigQueryPandasTransport, MsSQLPandasTransport, MysqlPandasTransport, OraclePandasTransport,
     PostgresPandasTransport, SqlitePandasTransport, TrinoPandasTransport,
@@ -33,17 +35,16 @@ use pyo3::prelude::*;
 use std::sync::Arc;
 
 #[throws(ConnectorXPythonError)]
-pub fn write_pandas<'py>(
+pub fn write_pandas<'a, 'py: 'a>(
     py: Python<'py>,
     source_conn: &SourceConn,
     origin_query: Option<String>,
     queries: &[CXQuery<String>],
 ) -> Bound<'py, PyAny> {
-    let mut destination = PandasDestination::new(py);
+    let destination = PandasDestination::new();
     let protocol = source_conn.proto.as_str();
     debug!("Protocol: {}", protocol);
 
-    // TODO: unlock gil if possible
     match source_conn.ty {
         SourceType::Postgres => {
             let (config, tls) = rewrite_tls_args(&source_conn.conn)?;
@@ -54,26 +55,24 @@ pub fn write_pandas<'py>(
                     tls_conn,
                     queries.len(),
                 )?;
-                let dispatcher = Dispatcher::<
-                    _,
+                let dispatcher = PandasDispatcher::<
                     _,
                     PostgresPandasTransport<CSVProtocol, MakeTlsConnector>,
                 >::new(
-                    sb, &mut destination, queries, origin_query
+                    sb, destination, queries, origin_query
                 );
-                dispatcher.run()?;
+                dispatcher.run(py)?
             }
             ("csv", None) => {
                 let sb = PostgresSource::<CSVProtocol, NoTls>::new(config, NoTls, queries.len())?;
-                let dispatcher =
-                    Dispatcher::<_, _, PostgresPandasTransport<CSVProtocol, NoTls>>::new(
-                        sb,
-                        &mut destination,
-                        queries,
-                        origin_query,
-                    );
-                dispatcher.run()?;
+                let dispatcher = PandasDispatcher::<
+                    _,
+                    PostgresPandasTransport<CSVProtocol, NoTls>,
+                >::new(
+                    sb, destination, queries, origin_query
+                );
+                dispatcher.run(py)?
             }
             ("binary", Some(tls_conn)) => {
                 let sb = PostgresSource::<PgBinaryProtocol, MakeTlsConnector>::new(
@@ -82,12 +81,11 @@ pub fn write_pandas<'py>(
                     queries.len(),
                 )?;
                 let dispatcher =
-                    Dispatcher::<
-                        _,
+                    PandasDispatcher::<
                         _,
                         PostgresPandasTransport<PgBinaryProtocol, MakeTlsConnector>,
-                    >::new(sb, &mut destination, queries, origin_query);
-                dispatcher.run()?;
+                    >::new(sb, destination, queries, origin_query);
+                dispatcher.run(py)?
             }
             ("binary", None) => {
                 let sb = PostgresSource::<PgBinaryProtocol, NoTls>::new(
                     config,
                     NoTls,
                     queries.len(),
                 )?;
-                let dispatcher = Dispatcher::<
-                    _,
+                let dispatcher = PandasDispatcher::<
                     _,
                     PostgresPandasTransport<PgBinaryProtocol, NoTls>,
                 >::new(
-                    sb, &mut destination, queries, origin_query
+                    sb, destination, queries, origin_query
                 );
-                dispatcher.run()?;
+                dispatcher.run(py)?
             }
             ("cursor", Some(tls_conn)) => {
                 let sb = PostgresSource::<CursorProtocol, MakeTlsConnector>::new(
                     config,
                     tls_conn,
                     queries.len(),
                 )?;
-                let dispatcher = Dispatcher::<
-                    _,
-                    _,
-                    PostgresPandasTransport<CursorProtocol, MakeTlsConnector>,
-                >::new(
-                    sb, &mut destination, queries, origin_query
-                );
-                dispatcher.run()?;
+                let dispatcher =
+                    PandasDispatcher::<
+                        _,
+                        PostgresPandasTransport<CursorProtocol, MakeTlsConnector>,
+                    >::new(sb, destination, queries, origin_query);
+                dispatcher.run(py)?
             }
             ("cursor", None) => {
                 let sb = PostgresSource::<CursorProtocol, NoTls>::new(config, NoTls, queries.len())?;
-                let dispatcher = Dispatcher::<
-                    _,
+                let dispatcher = PandasDispatcher::<
                     _,
                     PostgresPandasTransport<CursorProtocol, NoTls>,
                 >::new(
-                    sb, &mut destination, queries, origin_query
+                    sb, destination, queries, origin_query
                 );
-                dispatcher.run()?;
+                dispatcher.run(py)?
             }
             ("simple", Some(tls_conn)) => {
                 let sb = PostgresSource::<SimpleProtocol, MakeTlsConnector>::new(
                     config,
                     tls_conn,
                     queries.len(),
                 )?;
-                let dispatcher = Dispatcher::<
-                    _,
-                    _,
-                    PostgresPandasTransport<SimpleProtocol, MakeTlsConnector>,
-                >::new(
-                    sb, &mut destination, queries, origin_query
-                );
-                dispatcher.run()?;
+                let dispatcher =
+                    PandasDispatcher::<
+                        _,
+                        PostgresPandasTransport<SimpleProtocol, MakeTlsConnector>,
+                    >::new(sb, destination, queries, origin_query);
+                dispatcher.run(py)?
             }
             ("simple", None) => {
                 let sb = PostgresSource::<SimpleProtocol, NoTls>::new(config, NoTls, queries.len())?;
-                let dispatcher = Dispatcher::<
-                    _,
+                let dispatcher = PandasDispatcher::<
                     _,
                     PostgresPandasTransport<SimpleProtocol, NoTls>,
                 >::new(
-                    sb, &mut destination, queries, origin_query
+                    sb, destination, queries, origin_query
                 );
-                dispatcher.run()?;
+                dispatcher.run(py)?
             }
             _ => unimplemented!("{} protocol not supported", protocol),
         }
     }
@@ -165,84 +156,83 @@ pub fn write_pandas<'py>(
         SourceType::SQLite => {
             // remove the first "sqlite://" manually since url.path is not correct for windows
             let path = &source_conn.conn.as_str()[9..];
             let source = SQLiteSource::new(path, queries.len())?;
-            let dispatcher = Dispatcher::<_, _, SqlitePandasTransport>::new(
+            let dispatcher = PandasDispatcher::<_, SqlitePandasTransport>::new(
                 source,
-                &mut destination,
+                destination,
                 queries,
                 origin_query,
             );
-            dispatcher.run()?;
+            dispatcher.run(py)?
         }
         SourceType::MySQL => match protocol {
            "binary" => {
                 let source = MySQLSource::<MySQLBinaryProtocol>::new(&source_conn.conn[..], queries.len())?;
-                let dispatcher = Dispatcher::<_, _, MysqlPandasTransport<MySQLBinaryProtocol>>::new(
-                    source,
-                    &mut destination,
-                    queries,
-                    origin_query,
-                );
-                dispatcher.run()?;
+                let dispatcher =
+                    PandasDispatcher::<_, MysqlPandasTransport<MySQLBinaryProtocol>>::new(
+                        source,
+                        destination,
+                        queries,
+                        origin_query,
+                    );
+                dispatcher.run(py)?
             }
             "text" => {
                 let source = MySQLSource::<TextProtocol>::new(&source_conn.conn[..], queries.len())?;
-                let dispatcher = Dispatcher::<_, _, MysqlPandasTransport<TextProtocol>>::new(
+                let dispatcher = PandasDispatcher::<_, MysqlPandasTransport<TextProtocol>>::new(
                     source,
-                    &mut destination,
+                    destination,
                     queries,
                     origin_query,
                 );
-                dispatcher.run()?;
+                dispatcher.run(py)?
             }
             _ => unimplemented!("{} protocol not supported", protocol),
         },
         SourceType::MsSQL => {
             let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
             let source = MsSQLSource::new(rt, &source_conn.conn[..], queries.len())?;
-            let dispatcher = Dispatcher::<_, _, MsSQLPandasTransport>::new(
+            let dispatcher = PandasDispatcher::<_, MsSQLPandasTransport>::new(
                 source,
-                &mut destination,
+                destination,
                 queries,
                 origin_query,
             );
-            dispatcher.run()?;
+            dispatcher.run(py)?
         }
         SourceType::Oracle => {
             let source = OracleSource::new(&source_conn.conn[..], queries.len())?;
-            let dispatcher = Dispatcher::<_, _, OraclePandasTransport>::new(
+            let dispatcher = PandasDispatcher::<_, OraclePandasTransport>::new(
                 source,
-                &mut destination,
+                destination,
                 queries,
                 origin_query,
             );
-            dispatcher.run()?;
+            dispatcher.run(py)?
         }
         SourceType::BigQuery => {
             let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
             let source = BigQuerySource::new(rt, &source_conn.conn[..])?;
-            let dispatcher = Dispatcher::<_, _, BigQueryPandasTransport>::new(
+            let dispatcher = PandasDispatcher::<_, BigQueryPandasTransport>::new(
                 source,
-                &mut destination,
+                destination,
                 queries,
                 origin_query,
             );
-            dispatcher.run()?;
+            dispatcher.run(py)?
         }
         SourceType::Trino => {
             let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
             let source = TrinoSource::new(rt, &source_conn.conn[..])?;
-            let dispatcher = Dispatcher::<_, _, TrinoPandasTransport>::new(
+            let dispatcher = PandasDispatcher::<_, TrinoPandasTransport>::new(
                 source,
-                &mut destination,
+                destination,
                 queries,
                 origin_query,
             );
-            dispatcher.run()?;
+            dispatcher.run(py)?
         }
         _ => unimplemented!("{:?} not implemented!", source_conn.ty),
     }
-
-    destination.result()?
 }
diff --git a/connectorx-python/src/pandas/pandas_columns/string.rs b/connectorx-python/src/pandas/pandas_columns/string.rs
index 5f38a00eaa..a2783bbc38 100644
--- a/connectorx-python/src/pandas/pandas_columns/string.rs
+++ b/connectorx-python/src/pandas/pandas_columns/string.rs
@@ -265,27 +265,30 @@ impl StringColumn {
                 Err(_) => return,
             }
         };
-        let py = unsafe { Python::assume_gil_acquired() };
+        // NOTE: from Python 3.12 on, we have to allocate the string with a real Python<'py> token;
+        // the previous `let py = unsafe { Python::assume_gil_acquired() };` approach leads to a segmentation fault when partitioning is enabled
         let mut string_infos = Vec::with_capacity(self.string_lengths.len());
-        let mut start = 0;
-        for (i, &len) in self.string_lengths.iter().enumerate() {
-            if len != usize::MAX {
-                let end = start + len;
-
-                unsafe {
-                    let string_info = StringInfo::detect(&self.string_buf[start..end]);
-                    *self.data.add(self.row_idx[i]) = string_info.pystring(py);
-                    string_infos.push(Some(string_info));
-                };
-
-                start = end;
-            } else {
-                string_infos.push(None);
-
-                unsafe { *self.data.add(self.row_idx[i]) = PyString::none(py) };
+        Python::with_gil(|py| {
+            let mut start = 0;
+            for (i, &len) in self.string_lengths.iter().enumerate() {
+                if len != usize::MAX {
+                    let end = start + len;
+
+                    unsafe {
+                        let string_info = StringInfo::detect(&self.string_buf[start..end]);
+                        *self.data.add(self.row_idx[i]) = string_info.pystring(py);
+                        string_infos.push(Some(string_info));
+                    };
+
+                    start = end;
+                } else {
+                    string_infos.push(None);
+
+                    unsafe { *self.data.add(self.row_idx[i]) = PyString::none(py) };
+                }
             }
-        }
+        });
 
         // unlock GIL
         std::mem::drop(guard);
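
The string.rs hunk above is the core of the segfault fix: the worker threads run while the dispatcher is inside `py.allow_threads(...)`, so they do not hold the GIL, and conjuring a token with `unsafe { Python::assume_gil_acquired() }` there is undefined behavior that crashes once partitioned queries actually run in parallel. A minimal sketch of the safe pattern, using only standard PyO3 0.21 APIs; the function name and inputs below are illustrative, not taken from the patch:

    use pyo3::prelude::*;
    use pyo3::types::PyString;

    // Called on a rayon worker thread while the caller is inside
    // `py.allow_threads(...)`; the thread does NOT hold the GIL here,
    // so `Python::assume_gil_acquired()` would be undefined behavior.
    fn materialize_strings(values: &[&str]) -> Vec<Py<PyString>> {
        // Re-acquire the GIL and get a real `Python<'py>` token scoped to this
        // closure; every Python-object allocation happens inside it.
        Python::with_gil(|py| {
            values
                .iter()
                .map(|s| PyString::new_bound(py, s).unbind())
                .collect()
        })
    }

`with_gil` blocks until the GIL is available, so the per-row work should be batched inside a single `with_gil` call rather than re-entered per string, which is exactly how the patched loop in string.rs is structured.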
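On the dispatcher side, dispatcher.rs pairs `py.allow_threads` with a rayon parallel iterator over the partitions. Reduced to its essentials, the interaction looks like the sketch below; `process_partition` is a hypothetical stand-in for the parse-and-write loop, not a connectorx function:

    use pyo3::prelude::*;
    use rayon::prelude::*;

    // Hypothetical stand-in for one partition's parse-and-write work.
    fn process_partition(id: usize) -> Result<(), String> {
        // ... fetch rows from the source and write into preallocated blocks ...
        println!("partition {} finished", id);
        Ok(())
    }

    fn run_partitions(py: Python<'_>, n_partitions: usize) -> Result<(), String> {
        // Release the GIL for the whole parallel section so the rayon workers
        // can make progress concurrently; any Python work inside a worker must
        // re-acquire the GIL with `Python::with_gil` as shown above.
        py.allow_threads(move || {
            (0..n_partitions)
                .into_par_iter()
                .try_for_each(process_partition)
        })
    }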