Commit

fixed parallel segfault, some other issue remains

wangxiaoying committed May 8, 2024
1 parent 240716f commit 4fd05d5

Showing 6 changed files with 335 additions and 130 deletions.
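The shape of the diff points at the cause of the segfault: `PandasDestination` had been caching the GIL token as a `py: Python<'py>` field, which ties the whole struct to the thread that acquired the GIL even when the write path fans out across rayon workers. The commit removes the field and threads `py` through as an explicit argument instead, so the destination itself stays GIL-free and the dispatcher can release the GIL around the parallel section. A minimal sketch of that pattern (illustrative names, not the crate's real types; assumes pyo3 0.21's `Bound` API, which the `empty_bound`/`zeros_bound` calls below suggest):

```rust
use pyo3::prelude::*;
use pyo3::types::PyList;

// Illustrative stand-in for PandasDestination: no `Python<'py>` field,
// so the struct can be moved into code that runs without the GIL.
struct Dest {
    nrow: usize,
}

impl Dest {
    fn new() -> Self {
        Dest { nrow: 0 }
    }

    // The GIL token is borrowed per call instead of being stored.
    fn result<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
        let list = PyList::empty_bound(py);
        list.append(self.nrow)?;
        Ok(list.into_any())
    }
}
```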
3 changes: 2 additions & 1 deletion connectorx-python/Cargo.toml
@@ -42,6 +42,7 @@ tokio-util = "0.6"
 url = "2"
 urlencoding = "2.1"
 uuid = "0.8"
+rayon = "1"

 [build-dependencies]
 built = {version = "0.5", features = ["chrono"]}
@@ -56,7 +57,7 @@ rayon = "1"

 [lib]
 crate-type = ["cdylib", "rlib"]
-name = "connectorx_python"
+name = "connectorx"

 [features]
 branch = ["connectorx/branch"]
83 changes: 48 additions & 35 deletions connectorx-python/src/pandas/destination.rs
@@ -41,7 +41,6 @@ impl PandasBlockInfo {
 }

 pub struct PandasDestination<'py> {
-    py: Python<'py>,
     nrow: usize,
     schema: Vec<PandasTypeSystem>,
     names: Vec<String>,
@@ -50,9 +49,8 @@ pub struct PandasDestination<'py> {
 }

 impl<'py> PandasDestination<'py> {
-    pub fn new(py: Python<'py>) -> Self {
+    pub fn new() -> Self {
         PandasDestination {
-            py,
             nrow: 0,
             schema: vec![],
             names: vec![],
@@ -61,7 +59,7 @@ impl<'py> PandasDestination<'py> {
         }
     }

-    pub fn result(self) -> Result<Bound<'py, PyAny>> {
+    pub fn result(self, py: Python<'py>) -> Result<Bound<'py, PyAny>> {
         #[throws(ConnectorXPythonError)]
         fn to_list<T: IntoPy<PyObject>>(py: Python<'_>, arr: Vec<T>) -> Bound<PyList> {
             let list = PyList::empty_bound(py);
@@ -70,26 +68,27 @@ impl<'py> PandasDestination<'py> {
             }
             list
         }
-        let block_infos = to_list(self.py, self.block_infos)?;
-        let names = to_list(self.py, self.names)?;
-        let block_datas = to_list(self.py, self.block_datas)?;
+        let block_infos = to_list(py, self.block_infos)?;
+        let names = to_list(py, self.names)?;
+        let block_datas = to_list(py, self.block_datas)?;
         let result = [
             ("data", block_datas),
             ("headers", names),
             ("block_infos", block_infos),
         ]
-        .into_py_dict_bound(self.py);
+        .into_py_dict_bound(py);
         Ok(result.into_any())
     }

     #[throws(ConnectorXPythonError)]
     fn allocate_array<T: numpy::Element + 'py>(
         &mut self,
+        py: Python<'py>,
         dt: PandasBlockType,
         placement: Vec<usize>,
     ) {
         // has to use `zeros` instead of `new` for String type initialization
-        let data = PyArray2::<T>::zeros_bound(self.py, [placement.len(), self.nrow], false);
+        let data = PyArray2::<T>::zeros_bound(py, [placement.len(), self.nrow], false);
         let block_info = PandasBlockInfo {
             dt,
             cids: placement,
@@ -102,6 +101,7 @@ impl<'py> PandasDestination<'py> {
     #[throws(ConnectorXPythonError)]
     fn allocate_masked_array<T: numpy::Element + 'py>(
         &mut self,
+        py: Python<'py>,
         dt: PandasBlockType,
         placement: Vec<usize>,
     ) {
@@ -110,28 +110,18 @@ impl<'py> PandasDestination<'py> {
                 dt,
                 cids: vec![pos],
             };
-            let data = PyArray1::<T>::zeros_bound(self.py, self.nrow, false);
-            let mask = PyArray1::<bool>::zeros_bound(self.py, self.nrow, false);
-            let obj = PyTuple::new_bound(self.py, vec![data.as_any(), mask.as_any()]);
+            let data = PyArray1::<T>::zeros_bound(py, self.nrow, false);
+            let mask = PyArray1::<bool>::zeros_bound(py, self.nrow, false);
+            let obj = PyTuple::new_bound(py, vec![data.as_any(), mask.as_any()]);
             self.block_datas.push(obj.into_any());
             self.block_infos.push(block_info);
         }
     }
-}

-impl<'py> Destination for PandasDestination<'py> {
-    const DATA_ORDERS: &'static [DataOrder] = &[DataOrder::RowMajor];
-    type TypeSystem = PandasTypeSystem;
-    type Partition<'b> = PandasPartitionDestination<'b> where 'py: 'b;
-    type Error = ConnectorXPythonError;
-
-    fn needs_count(&self) -> bool {
-        true
-    }
-
     #[throws(ConnectorXPythonError)]
-    fn allocate<S: AsRef<str>>(
+    pub fn allocate_py<S: AsRef<str>>(
         &mut self,
+        py: Python<'py>,
         nrows: usize,
         names: &[S],
         schema: &[PandasTypeSystem],
@@ -154,41 +144,64 @@ impl<'py> PandasDestination<'py> {
         for (dt, placement) in block_indices {
             match dt {
                 PandasBlockType::Boolean(true) => {
-                    self.allocate_masked_array::<bool>(dt, placement)?;
+                    self.allocate_masked_array::<bool>(py, dt, placement)?;
                 }
                 PandasBlockType::Boolean(false) => {
-                    self.allocate_array::<bool>(dt, placement)?;
+                    self.allocate_array::<bool>(py, dt, placement)?;
                 }
                 PandasBlockType::Int64(true) => {
-                    self.allocate_masked_array::<i64>(dt, placement)?;
+                    self.allocate_masked_array::<i64>(py, dt, placement)?;
                 }
                 PandasBlockType::Int64(false) => {
-                    self.allocate_array::<i64>(dt, placement)?;
+                    self.allocate_array::<i64>(py, dt, placement)?;
                 }
                 PandasBlockType::Float64 => {
-                    self.allocate_array::<f64>(dt, placement)?;
+                    self.allocate_array::<f64>(py, dt, placement)?;
                 }
                 PandasBlockType::BooleanArray => {
-                    self.allocate_array::<super::pandas_columns::PyList>(dt, placement)?;
+                    self.allocate_array::<super::pandas_columns::PyList>(py, dt, placement)?;
                 }
                 PandasBlockType::Float64Array => {
-                    self.allocate_array::<super::pandas_columns::PyList>(dt, placement)?;
+                    self.allocate_array::<super::pandas_columns::PyList>(py, dt, placement)?;
                 }
                 PandasBlockType::Int64Array => {
-                    self.allocate_array::<super::pandas_columns::PyList>(dt, placement)?;
+                    self.allocate_array::<super::pandas_columns::PyList>(py, dt, placement)?;
                 }
                 PandasBlockType::String => {
-                    self.allocate_array::<PyString>(dt, placement)?;
+                    self.allocate_array::<PyString>(py, dt, placement)?;
                 }
                 PandasBlockType::DateTime => {
-                    self.allocate_array::<i64>(dt, placement)?;
+                    self.allocate_array::<i64>(py, dt, placement)?;
                 }
                 PandasBlockType::Bytes => {
-                    self.allocate_array::<PyBytes>(dt, placement)?;
+                    self.allocate_array::<PyBytes>(py, dt, placement)?;
                 }
             };
         }
     }
+}
+
+impl<'py> Destination for PandasDestination<'py> {
+    const DATA_ORDERS: &'static [DataOrder] = &[DataOrder::RowMajor];
+    type TypeSystem = PandasTypeSystem;
+    type Partition<'b> = PandasPartitionDestination<'b> where 'py: 'b;
+    type Error = ConnectorXPythonError;
+
+    fn needs_count(&self) -> bool {
+        true
+    }
+
+    #[allow(unreachable_code)]
+    #[throws(ConnectorXPythonError)]
+    fn allocate<S: AsRef<str>>(
+        &mut self,
+        _nrows: usize,
+        _names: &[S],
+        _schema: &[PandasTypeSystem],
+        _data_order: DataOrder,
+    ) {
+        unimplemented!("not implemented for python destination!");
+    }

     #[throws(ConnectorXPythonError)]
     fn partition(&mut self, counts: usize) -> Vec<Self::Partition<'_>> {
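Note that the trait method `Destination::allocate` is now a stub that panics: the trait signature has no way to receive a `Python<'py>` token, so allocation must go through the new inherent `allocate_py` while the GIL is held. A hypothetical call site, assuming the signatures in the hunk above (the empty schema is only there to keep the sketch self-contained):

```rust
use super::{destination::PandasDestination, typesystem::PandasTypeSystem};
use connectorx::prelude::DataOrder;
use pyo3::prelude::*;

// Acquire the GIL, allocate the numpy blocks through `allocate_py`,
// and drop the token again when the closure returns.
fn allocate_empty() -> PyResult<()> {
    Python::with_gil(|py| {
        let mut dst = PandasDestination::new();
        let names: Vec<String> = vec![];
        let schema: Vec<PandasTypeSystem> = vec![];
        dst.allocate_py(py, 0, &names, &schema, DataOrder::RowMajor)
            .expect("allocation failed");
        Ok(())
    })
}
```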
198 changes: 198 additions & 0 deletions connectorx-python/src/pandas/dispatcher.rs
@@ -0,0 +1,198 @@
use super::{destination::PandasDestination, typesystem::PandasTypeSystem};
use crate::errors::ConnectorXPythonError;
use connectorx::errors::Result as CXResult;
use connectorx::prelude::*;
use itertools::Itertools;
use log::debug;
use pyo3::prelude::*;
use rayon::prelude::*;
use std::marker::PhantomData;

pub struct PandasDispatcher<'py, S, TP> {
    src: S,
    dst: PandasDestination<'py>,
    queries: Vec<CXQuery<String>>,
    origin_query: Option<String>,
    _phantom: PhantomData<TP>,
}

impl<'py, S, TP> PandasDispatcher<'py, S, TP>
where
    S: Source,
    TP: Transport<TSS = S::TypeSystem, TSD = PandasTypeSystem, S = S, D = PandasDestination<'py>>,
    <TP as connectorx::typesystem::Transport>::Error: From<ConnectorXPythonError>,
{
    /// Create a new dispatcher by providing a source, a destination and the queries.
    pub fn new<Q>(
        src: S,
        dst: PandasDestination<'py>,
        queries: &[Q],
        origin_query: Option<String>,
    ) -> Self
    where
        for<'a> &'a Q: Into<CXQuery>,
    {
        Self {
            src,
            dst,
            queries: queries.iter().map(Into::into).collect(),
            origin_query,
            _phantom: PhantomData,
        }
    }

    /// Start the data loading process.
    pub fn run(mut self, py: Python<'py>) -> Result<Bound<'py, PyAny>, TP::Error> {
        debug!("Run dispatcher");

        debug!("Prepare");
        let dorder = coordinate(S::DATA_ORDERS, PandasDestination::DATA_ORDERS)?;
        self.src.set_data_order(dorder)?;
        self.src.set_queries(self.queries.as_slice());
        self.src.set_origin_query(self.origin_query);

        debug!("Fetching metadata");
        self.src.fetch_metadata()?;
        let src_schema = self.src.schema();
        let dst_schema = src_schema
            .iter()
            .map(|&s| TP::convert_typesystem(s))
            .collect::<CXResult<Vec<_>>>()?;
        let names = self.src.names();

        let mut total_rows = if self.dst.needs_count() {
            // return None if cannot derive total count
debug!("Try get row rounts for entire result");
            self.src.result_rows()?
        } else {
            debug!("Do not need counts in advance");
            Some(0)
        };
        let mut src_partitions: Vec<S::Partition> = self.src.partition()?;
        if self.dst.needs_count() && total_rows.is_none() {
            debug!("Manually count rows of each partitioned query and sum up");
            // run queries
            src_partitions
                .par_iter_mut()
                .try_for_each(|partition| -> Result<(), S::Error> { partition.result_rows() })?;

            // get the number of rows of each partition from the source
            let part_rows: Vec<usize> = src_partitions
                .iter()
                .map(|partition| partition.nrows())
                .collect();
            total_rows = Some(part_rows.iter().sum());
        }
        let total_rows = total_rows.ok_or_else(ConnectorXError::CountError)?;

        debug!(
            "Allocate destination memory: {}x{}",
            total_rows,
            src_schema.len()
        );
        self.dst
            .allocate_py(py, total_rows, &names, &dst_schema, dorder)?;

        debug!("Create destination partition");
        let dst_partitions = self.dst.partition(self.queries.len())?;

        #[cfg(all(not(feature = "branch"), not(feature = "fptr")))]
        compile_error!("branch or fptr, pick one");

        #[cfg(feature = "branch")]
        let schemas: Vec<_> = src_schema
            .iter()
            .zip_eq(&dst_schema)
            .map(|(&src_ty, &dst_ty)| (src_ty, dst_ty))
            .collect();

        debug!("Start writing");

        // release GIL
        py.allow_threads(move || -> Result<(), TP::Error> {
            // parse and write
            dst_partitions
                .into_par_iter()
                .zip_eq(src_partitions)
                .enumerate()
                .try_for_each(|(i, (mut dst, mut src))| -> Result<(), TP::Error> {
                    #[cfg(feature = "fptr")]
                    let f: Vec<_> = src_schema
                        .iter()
                        .zip_eq(&dst_schema)
                        .map(|(&src_ty, &dst_ty)| TP::processor(src_ty, dst_ty))
                        .collect::<CXResult<Vec<_>>>()?;

                    let mut parser = src.parser()?;

                    match dorder {
                        DataOrder::RowMajor => loop {
                            let (n, is_last) = parser.fetch_next()?;
                            dst.aquire_row(n)?;
                            for _ in 0..n {
                                #[allow(clippy::needless_range_loop)]
                                for col in 0..dst.ncols() {
                                    #[cfg(feature = "fptr")]
                                    f[col](&mut parser, &mut dst)?;

                                    #[cfg(feature = "branch")]
                                    {
                                        let (s1, s2) = schemas[col];
                                        TP::process(s1, s2, &mut parser, &mut dst)?;
                                    }
                                }
                            }
                            if is_last {
                                break;
                            }
                        },
                        DataOrder::ColumnMajor => loop {
                            let (n, is_last) = parser.fetch_next()?;
                            dst.aquire_row(n)?;
                            #[allow(clippy::needless_range_loop)]
                            for col in 0..dst.ncols() {
                                for _ in 0..n {
                                    #[cfg(feature = "fptr")]
                                    f[col](&mut parser, &mut dst)?;
                                    #[cfg(feature = "branch")]
                                    {
                                        let (s1, s2) = schemas[col];
                                        TP::process(s1, s2, &mut parser, &mut dst)?;
                                    }
                                }
                            }
                            if is_last {
                                break;
                            }
                        },
                    }

                    debug!("Finalize partition {}", i);
                    dst.finalize()?;
                    debug!("Partition {} finished", i);
                    Ok(())
                })?;
            Ok(())
        })?;
        debug!("Writing finished");

        Ok(self.dst.result(py).unwrap())
    }

    /// Only fetch the metadata (header) of the destination.
    pub fn get_meta(&mut self) -> Result<(), TP::Error> {
        let dorder = coordinate(S::DATA_ORDERS, PandasDestination::DATA_ORDERS)?;
        self.src.set_data_order(dorder)?;
        self.src.set_queries(self.queries.as_slice());
        self.src.set_origin_query(self.origin_query.clone());
        self.src.fetch_metadata()?;
        let src_schema = self.src.schema();
        let dst_schema = src_schema
            .iter()
            .map(|&s| TP::convert_typesystem(s))
            .collect::<CXResult<Vec<_>>>()?;
        let names = self.src.names();
        self.dst.allocate(0, &names, &dst_schema, dorder)?;
        Ok(())
    }
}
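The ordering in `run` is what makes the parallel write safe: all Python-side buffers are allocated up front under the GIL via `allocate_py`, only then does `py.allow_threads` release the GIL so the rayon workers can fill their disjoint partitions, and `result(py)` re-borrows the token after the parallel section has joined. A stripped-down sketch of that pattern, with plain `Vec<u64>` standing in for the pre-allocated numpy blocks:

```rust
use pyo3::prelude::*;
use rayon::prelude::*;

// `partitions` plays the role of the destination's pre-allocated,
// non-overlapping buffers. Inside `allow_threads` no `py` is reachable,
// so the workers cannot touch the interpreter while the GIL is released.
fn parallel_fill(py: Python<'_>, mut partitions: Vec<Vec<u64>>) -> Vec<Vec<u64>> {
    py.allow_threads(move || {
        partitions.par_iter_mut().enumerate().for_each(|(i, part)| {
            for (j, slot) in part.iter_mut().enumerate() {
                *slot = (i as u64) * 1_000 + j as u64; // pure-Rust work only
            }
        });
        partitions
    })
}
```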