diff --git a/connectorx-python/examples/tpch.rs b/connectorx-python/examples/tpch.rs
index 69c9afa2d..a38d07a92 100644
--- a/connectorx-python/examples/tpch.rs
+++ b/connectorx-python/examples/tpch.rs
@@ -1,4 +1,4 @@
-use connectorx_python::read_sql::{read_sql, PartitionQuery};
+use connectorx_python::cx_read_sql::{read_sql, PyPartitionQuery};
 use pyo3::Python;
 use std::env;
 
@@ -17,12 +17,19 @@ pub fn run(nq: usize, conn: &str) {
             "pandas",
             None,
             None,
-            Some(PartitionQuery::new(QUERY, "L_ORDERKEY", None, None, nq)),
+            Some(PyPartitionQuery {
+                query: QUERY.to_string(),
+                column: "L_ORDERKEY".to_string(),
+                min: None,
+                max: None,
+                num: nq,
+            }),
         )
         .unwrap();
     });
 }
 
+#[allow(dead_code)]
 fn main() {
     run(1, "POSTGRES_URL");
 }
diff --git a/connectorx-python/src/arrow.rs b/connectorx-python/src/arrow.rs
index 6521a2fac..0a2be69c0 100644
--- a/connectorx-python/src/arrow.rs
+++ b/connectorx-python/src/arrow.rs
@@ -10,17 +10,17 @@ use std::convert::TryFrom;
 use std::sync::Arc;
 
 #[throws(ConnectorXPythonError)]
-pub fn write_arrow<'a>(
-    py: Python<'a>,
+pub fn write_arrow<'py>(
+    py: Python<'py>,
     source_conn: &SourceConn,
     origin_query: Option<String>,
     queries: &[CXQuery<String>],
-) -> &'a PyAny {
+) -> Bound<'py, PyAny> {
     let destination = get_arrow(source_conn, origin_query, queries)?;
     let rbs = destination.arrow()?;
     let ptrs = to_ptrs(rbs);
     let obj: PyObject = ptrs.into_py(py);
-    obj.into_ref(py)
+    obj.into_bound(py)
 }
 
 pub fn to_ptrs(rbs: Vec<RecordBatch>) -> (Vec<String>, Vec<Vec<(uintptr_t, uintptr_t)>>) {
diff --git a/connectorx-python/src/arrow2.rs b/connectorx-python/src/arrow2.rs
index 573c7c81b..5be62ac80 100644
--- a/connectorx-python/src/arrow2.rs
+++ b/connectorx-python/src/arrow2.rs
@@ -14,17 +14,17 @@ use pyo3::{PyAny, Python};
 use std::sync::Arc;
 
 #[throws(ConnectorXPythonError)]
-pub fn write_arrow<'a>(
-    py: Python<'a>,
+pub fn write_arrow<'py>(
+    py: Python<'py>,
     source_conn: &SourceConn,
     origin_query: Option<String>,
     queries: &[CXQuery<String>],
-) -> &'a PyAny {
+) -> Bound<'py, PyAny> {
     let destination = get_arrow2(source_conn, origin_query, queries)?;
     let (rbs, schema) = destination.arrow()?;
     let ptrs = to_ptrs(rbs, schema);
     let obj: PyObject = ptrs.into_py(py);
-    obj.into_ref(py)
+    obj.into_bound(py)
 }
 
 fn to_ptrs(
diff --git a/connectorx-python/src/cx_read_sql.rs b/connectorx-python/src/cx_read_sql.rs
index 6e73449eb..c6416ceec 100644
--- a/connectorx-python/src/cx_read_sql.rs
+++ b/connectorx-python/src/cx_read_sql.rs
@@ -12,11 +12,11 @@ use crate::errors::ConnectorXPythonError;
 #[derive(FromPyObject)]
 #[pyo3(from_item_all)]
 pub struct PyPartitionQuery {
-    query: String,
-    column: String,
-    min: Option<i64>,
-    max: Option<i64>,
-    num: usize,
+    pub query: String,
+    pub column: String,
+    pub min: Option<i64>,
+    pub max: Option<i64>,
+    pub num: usize,
 }
 
 impl Into<PartitionQuery> for PyPartitionQuery {
@@ -31,14 +31,14 @@
     }
 }
 
-pub fn read_sql<'a>(
-    py: Python<'a>,
+pub fn read_sql<'py>(
+    py: Python<'py>,
     conn: &str,
     return_type: &str,
     protocol: Option<&str>,
     queries: Option<Vec<String>>,
     partition_query: Option<PyPartitionQuery>,
-) -> PyResult<&'a PyAny> {
+) -> PyResult<Bound<'py, PyAny>> {
     let source_conn = parse_source(conn, protocol).map_err(|e| ConnectorXPythonError::from(e))?;
     let (queries, origin_query) = match (queries, partition_query) {
         (Some(queries), None) => (queries.into_iter().map(CXQuery::Naked).collect(), None),
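Note: `PyPartitionQuery`'s fields become `pub` so callers such as `examples/tpch.rs` can build the struct literally, while `#[derive(FromPyObject)]` plus `#[pyo3(from_item_all)]` keeps extracting it from a Python dict. A minimal sketch of that extraction behavior, assuming pyo3 0.21 — the `Partition` struct and `demo` function below are illustrative, not part of this patch:

```rust
use pyo3::prelude::*;
use pyo3::types::IntoPyDict;

// `from_item_all` reads every field via `get_item`, so a Python caller can
// pass a plain dict such as {"query": ..., "column": ..., "num": 4}.
#[derive(FromPyObject)]
#[pyo3(from_item_all)]
struct Partition {
    query: String,
    column: String,
    num: usize,
}

fn demo() -> PyResult<()> {
    Python::with_gil(|py| {
        let d = [("query", "SELECT 1"), ("column", "id")].into_py_dict_bound(py);
        d.set_item("num", 4)?;
        // Each struct field is looked up as a dict item during extraction.
        let p: Partition = d.extract()?;
        assert_eq!(p.num, 4);
        Ok(())
    })
}
```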
diff --git a/connectorx-python/src/lib.rs b/connectorx-python/src/lib.rs
index fd5d97bc0..90a9356c3 100644
--- a/connectorx-python/src/lib.rs
+++ b/connectorx-python/src/lib.rs
@@ -26,7 +26,7 @@ static START: Once = Once::new();
 // }
 
 #[pymodule]
-fn connectorx(_: Python, m: &PyModule) -> PyResult<()> {
+fn connectorx(_: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
     START.call_once(|| {
         let _ = env_logger::try_init();
     });
@@ -40,14 +40,14 @@ fn connectorx(_: Python, m: &PyModule) -> PyResult<()> {
 }
 
 #[pyfunction]
-pub fn read_sql<'a>(
-    py: Python<'a>,
+pub fn read_sql<'py>(
+    py: Python<'py>,
     conn: &str,
     return_type: &str,
     protocol: Option<&str>,
     queries: Option<Vec<String>>,
     partition_query: Option<cx_read_sql::PyPartitionQuery>,
-) -> PyResult<&'a PyAny> {
+) -> PyResult<Bound<'py, PyAny>> {
     cx_read_sql::read_sql(py, conn, return_type, protocol, queries, partition_query)
 }
 
@@ -64,11 +64,11 @@ pub fn partition_sql(
 }
 
 #[pyfunction]
-pub fn read_sql2<'a>(
-    py: Python<'a>,
+pub fn read_sql2<'py>(
+    py: Python<'py>,
     sql: &str,
     db_map: HashMap<String, String>,
-) -> PyResult<&'a PyAny> {
+) -> PyResult<Bound<'py, PyAny>> {
     let rbs = run(
         sql.to_string(),
         db_map,
@@ -81,16 +81,16 @@
     .map_err(|e| PyRuntimeError::new_err(format!("{}", e)))?;
     let ptrs = arrow::to_ptrs(rbs);
     let obj: PyObject = ptrs.into_py(py);
-    Ok(obj.into_ref(py))
+    Ok(obj.into_bound(py))
 }
 
 #[pyfunction]
-pub fn get_meta<'a>(
-    py: Python<'a>,
+pub fn get_meta<'py>(
+    py: Python<'py>,
     conn: &str,
     query: String,
     protocol: Option<&str>,
-) -> PyResult<&'a PyAny> {
+) -> PyResult<Bound<'py, PyAny>> {
     pandas::get_meta::get_meta(py, conn, protocol.unwrap_or("binary"), query)
         .map_err(|e| From::from(e))
 }
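Note: under pyo3 0.21's Bound API, `#[pymodule]` init functions receive `&Bound<'_, PyModule>` instead of the GIL-Ref `&PyModule`, and `#[pyfunction]`s can return `Bound<'py, PyAny>` directly. A minimal sketch of a module in the new style — the `my_ext`/`ping` names are made up for illustration:

```rust
use pyo3::prelude::*;

#[pyfunction]
fn ping() -> &'static str {
    "pong"
}

#[pymodule]
fn my_ext(_py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
    // Registration goes through the Bound module handle rather than &PyModule.
    m.add_function(wrap_pyfunction!(ping, m)?)?;
    Ok(())
}
```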
diff --git a/connectorx-python/src/pandas/destination.rs b/connectorx-python/src/pandas/destination.rs
index 82426f3e9..e4d6a9e88 100644
--- a/connectorx-python/src/pandas/destination.rs
+++ b/connectorx-python/src/pandas/destination.rs
@@ -13,9 +13,8 @@ use fehler::{throw, throws};
 use itertools::Itertools;
 use numpy::{PyArray1, PyArray2};
 use pyo3::{
-    prelude::{pyclass, pymethods, PyResult},
+    prelude::*,
     types::{IntoPyDict, PyList, PyTuple},
-    FromPyObject, IntoPy, PyAny, PyObject, Python,
 };
 use std::{
     collections::HashMap,
@@ -46,12 +45,12 @@ pub struct PandasDestination<'py> {
     nrow: usize,
     schema: Vec<PandasBlockType>,
     names: Vec<String>,
-    block_datas: Vec<&'py PyAny>, // either 2d array for normal blocks, or two 1d arrays for extension blocks
+    block_datas: Vec<Bound<'py, PyAny>>, // either 2d array for normal blocks, or two 1d arrays for extension blocks
     block_infos: Vec<PandasBlockInfo>,
 }
 
-impl<'a> PandasDestination<'a> {
-    pub fn new(py: Python<'a>) -> Self {
+impl<'py> PandasDestination<'py> {
+    pub fn new(py: Python<'py>) -> Self {
         PandasDestination {
             py,
             nrow: 0,
@@ -62,10 +61,10 @@
     }
 
-    pub fn result(self) -> Result<&'a PyAny> {
+    pub fn result(self) -> Result<Bound<'py, PyAny>> {
         #[throws(ConnectorXPythonError)]
-        fn to_list<T: IntoPy<PyObject>>(py: Python<'_>, arr: Vec<T>) -> &'_ PyList {
-            let list = PyList::empty(py);
+        fn to_list<T: IntoPy<PyObject>>(py: Python<'_>, arr: Vec<T>) -> Bound<'_, PyList> {
+            let list = PyList::empty_bound(py);
             for e in arr {
                 list.append(e.into_py(py))?;
             }
            list
@@ -79,29 +78,29 @@
             ("headers", names),
             ("block_infos", block_infos),
         ]
-        .into_py_dict(self.py);
-        Ok(result)
+        .into_py_dict_bound(self.py);
+        Ok(result.into_any())
     }
 
     #[throws(ConnectorXPythonError)]
     fn allocate_array<T: numpy::Element>(
         &mut self,
         dt: PandasBlockType,
         placement: Vec<usize>,
     ) {
         // has to use `zeros` instead of `new` for String type initialization
-        let data = PyArray2::<T>::zeros(self.py, [placement.len(), self.nrow], false);
+        let data = PyArray2::<T>::zeros_bound(self.py, [placement.len(), self.nrow], false);
         let block_info = PandasBlockInfo {
             dt,
             cids: placement,
         };
-        self.block_datas.push(data.into());
+        self.block_datas.push(data.into_any());
         self.block_infos.push(block_info);
     }
 
     #[throws(ConnectorXPythonError)]
     fn allocate_masked_array<T: numpy::Element>(
         &mut self,
         dt: PandasBlockType,
         placement: Vec<usize>,
     ) {
@@ -111,19 +110,19 @@ impl<'a> PandasDestination<'a> {
                 dt,
                 cids: vec![pos],
             };
-            let data = PyArray1::<T>::zeros(self.py, self.nrow, false);
-            let mask = PyArray1::<bool>::zeros(self.py, self.nrow, false);
-            let obj = PyTuple::new(self.py, vec![data.as_ref(), mask.as_ref()]);
-            self.block_datas.push(obj.into());
+            let data = PyArray1::<T>::zeros_bound(self.py, self.nrow, false);
+            let mask = PyArray1::<bool>::zeros_bound(self.py, self.nrow, false);
+            let obj = PyTuple::new_bound(self.py, vec![data.as_any(), mask.as_any()]);
+            self.block_datas.push(obj.into_any());
             self.block_infos.push(block_info);
         }
     }
 }
 
-impl<'a> Destination for PandasDestination<'a> {
+impl<'py> Destination for PandasDestination<'py> {
     const DATA_ORDERS: &'static [DataOrder] = &[DataOrder::RowMajor];
     type TypeSystem = PandasTypeSystem;
-    type Partition<'b> = PandasPartitionDestination<'b> where 'a: 'b;
+    type Partition<'b> = PandasPartitionDestination<'b> where 'py: 'b;
     type Error = ConnectorXPythonError;
 
     fn needs_count(&self) -> bool {
@@ -197,10 +196,10 @@
             (0..self.schema.len()).map(|_| Vec::new()).collect();
 
         for (idx, block) in self.block_infos.iter().enumerate() {
-            let buf = self.block_datas[idx];
+            let buf = &self.block_datas[idx];
             match block.dt {
                 PandasBlockType::Boolean(_) => {
-                    let bblock = BooleanBlock::extract(buf)?;
+                    let bblock = buf.extract::<BooleanBlock>()?;
 
                    let bcols = bblock.split()?;
                    for (&cid, bcol) in block.cids.iter().zip_eq(bcols) {
@@ -212,7 +211,7 @@
                    }
                }
                PandasBlockType::Float64 => {
-                    let fblock = Float64Block::extract(buf)?;
+                    let fblock = buf.extract::<Float64Block>()?;
                    let fcols = fblock.split()?;
                    for (&cid, fcol) in block.cids.iter().zip_eq(fcols) {
                        partitioned_columns[cid] = fcol
@@ -223,7 +222,7 @@
                    }
                }
                PandasBlockType::BooleanArray => {
-                    let bblock = ArrayBlock::<bool>::extract(buf)?;
+                    let bblock = buf.extract::<ArrayBlock<bool>>()?;
                    let bcols = bblock.split()?;
                    for (&cid, bcol) in block.cids.iter().zip_eq(bcols) {
                        partitioned_columns[cid] = bcol
@@ -234,7 +233,7 @@
                    }
                }
                PandasBlockType::Float64Array => {
-                    let fblock = ArrayBlock::<f64>::extract(buf)?;
+                    let fblock = buf.extract::<ArrayBlock<f64>>()?;
                    let fcols = fblock.split()?;
                    for (&cid, fcol) in block.cids.iter().zip_eq(fcols) {
                        partitioned_columns[cid] = fcol
@@ -245,7 +244,7 @@
                    }
                }
                PandasBlockType::Int64Array => {
-                    let fblock = ArrayBlock::<i64>::extract(buf)?;
+                    let fblock = buf.extract::<ArrayBlock<i64>>()?;
                    let fcols = fblock.split()?;
                    for (&cid, fcol) in block.cids.iter().zip_eq(fcols) {
                        partitioned_columns[cid] = fcol
@@ -256,7 +255,7 @@
                    }
                }
                PandasBlockType::Int64(_) => {
-                    let ublock = Int64Block::extract(buf)?;
+                    let ublock = buf.extract::<Int64Block>()?;
                    let ucols = ublock.split()?;
                    for (&cid, ucol) in block.cids.iter().zip_eq(ucols) {
                        partitioned_columns[cid] = ucol
@@ -267,7 +266,7 @@
                    }
                }
                PandasBlockType::String => {
-                    let sblock = StringBlock::extract(buf)?;
+                    let sblock = buf.extract::<StringBlock>()?;
                    let scols = sblock.split()?;
                    for (&cid, scol) in block.cids.iter().zip_eq(scols) {
                        partitioned_columns[cid] = scol
@@ -278,7 +277,7 @@
                    }
                }
                PandasBlockType::Bytes => {
-                    let bblock = BytesBlock::extract(buf)?;
+                    let bblock = buf.extract::<BytesBlock>()?;
                    let bcols = bblock.split()?;
                    for (&cid, bcol) in block.cids.iter().zip_eq(bcols) {
                        partitioned_columns[cid] = bcol
@@ -289,7 +288,7 @@
                    }
                }
                PandasBlockType::DateTime => {
-                    let dblock = DateTimeBlock::extract(buf)?;
+                    let dblock = buf.extract::<DateTimeBlock>()?;
                    let dcols = dblock.split()?;
                    for (&cid, dcol) in block.cids.iter().zip_eq(dcols) {
                        partitioned_columns[cid] = dcol
@@ -328,18 +327,18 @@
         self.schema.as_ref()
     }
 }
-pub struct PandasPartitionDestination<'a> {
-    columns: Vec<Box<dyn PandasColumnObject + 'a>>,
-    schema: &'a [PandasTypeSystem],
+pub struct PandasPartitionDestination<'py> {
+    columns: Vec<Box<dyn PandasColumnObject + 'py>>,
+    schema: &'py [PandasTypeSystem],
     seq: usize,
     glob_row: Arc<AtomicUsize>,
     cur_row: usize,
 }
 
-impl<'a> PandasPartitionDestination<'a> {
+impl<'py> PandasPartitionDestination<'py> {
     fn new(
-        columns: Vec<Box<dyn PandasColumnObject + 'a>>,
-        schema: &'a [PandasTypeSystem],
+        columns: Vec<Box<dyn PandasColumnObject + 'py>>,
+        schema: &'py [PandasTypeSystem],
         glob_row: Arc<AtomicUsize>,
     ) -> Self {
         Self {
@@ -361,7 +360,7 @@
     }
 }
 
-impl<'a> DestinationPartition<'a> for PandasPartitionDestination<'a> {
+impl<'py> DestinationPartition<'py> for PandasPartitionDestination<'py> {
     type TypeSystem = PandasTypeSystem;
     type Error = ConnectorXPythonError;
 
@@ -387,7 +386,7 @@
     }
 }
 
-impl<'a, T> Consume<T> for PandasPartitionDestination<'a>
+impl<'py, T> Consume<T> for PandasPartitionDestination<'py>
 where
     T: HasPandasColumn + TypeAssoc<PandasTypeSystem> + std::fmt::Debug,
 {
@@ -400,7 +399,7 @@
         // How do we check type id for borrowed types?
         // assert!(self.columns[col].typecheck(TypeId::of::<T>()));
 
-        let (column, _): (&mut T::PandasColumn<'a>, *const ()) =
+        let (column, _): (&mut T::PandasColumn<'py>, *const ()) =
             unsafe { transmute(&*self.columns[col]) };
         column.write(value, row)
     }
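Note: `block_datas` now holds owned `Bound<'py, PyAny>` handles instead of GIL-Ref `&'py PyAny` borrows, so the typed block wrappers (`BooleanBlock`, `Float64Block`, ...) are recovered with `buf.extract::<T>()` rather than `T::extract(buf)`. A minimal sketch of that erase-then-extract round trip, assuming pyo3 0.21 and standing in a `PyList` for the numpy-backed block types:

```rust
use pyo3::prelude::*;
use pyo3::types::PyList;

fn roundtrip(py: Python<'_>) -> PyResult<()> {
    // Type-erase an owned handle, the way allocate_array stores arrays in block_datas.
    let erased: Bound<'_, PyAny> = PyList::new_bound(py, [1.0f64, 2.0, 3.0]).into_any();
    // Later, recover a typed view from the erased handle, as the per-block match does.
    let typed: Bound<'_, PyList> = erased.extract()?;
    assert_eq!(typed.len(), 3);
    Ok(())
}
```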
diff --git a/connectorx-python/src/pandas/get_meta.rs b/connectorx-python/src/pandas/get_meta.rs
index 7ee648e7d..f134c36c1 100644
--- a/connectorx-python/src/pandas/get_meta.rs
+++ b/connectorx-python/src/pandas/get_meta.rs
@@ -31,7 +31,12 @@ use std::convert::TryFrom;
 use std::sync::Arc;
 
 #[throws(ConnectorXPythonError)]
-pub fn get_meta<'a>(py: Python<'a>, conn: &str, protocol: &str, query: String) -> &'a PyAny {
+pub fn get_meta<'py>(
+    py: Python<'py>,
+    conn: &str,
+    protocol: &str,
+    query: String,
+) -> Bound<'py, PyAny> {
     let source_conn = SourceConn::try_from(conn)?;
     let mut destination = PandasDestination::new(py);
     let queries = &[CXQuery::Naked(query)];
diff --git a/connectorx-python/src/pandas/mod.rs b/connectorx-python/src/pandas/mod.rs
index 117280866..d5509877e 100644
--- a/connectorx-python/src/pandas/mod.rs
+++ b/connectorx-python/src/pandas/mod.rs
@@ -29,16 +29,16 @@ use fehler::throws;
 use log::debug;
 use postgres::NoTls;
 use postgres_openssl::MakeTlsConnector;
-use pyo3::{PyAny, Python};
+use pyo3::prelude::*;
 use std::sync::Arc;
 
 #[throws(ConnectorXPythonError)]
-pub fn write_pandas<'a>(
-    py: Python<'a>,
+pub fn write_pandas<'py>(
+    py: Python<'py>,
     source_conn: &SourceConn,
     origin_query: Option<String>,
     queries: &[CXQuery<String>],
-) -> &'a PyAny {
+) -> Bound<'py, PyAny> {
     let mut destination = PandasDestination::new(py);
     let protocol = source_conn.proto.as_str();
     debug!("Protocol: {}", protocol);
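Note: with every entry point returning `Bound<'py, PyAny>`, results are scoped to the `Python<'py>` token rather than to an implicit GIL-Ref pool. A hypothetical caller (not in this patch) that needs a result to outlive the GIL scope would unbind it:

```rust
use pyo3::prelude::*;

fn keep_result() -> PyResult<Py<PyAny>> {
    Python::with_gil(|py| {
        // Stand-in for a Bound<'py, PyAny> produced by write_pandas/get_meta.
        let bound: Bound<'_, PyAny> = vec![1i64, 2, 3].into_py(py).into_bound(py);
        // `unbind` detaches the 'py lifetime so the handle can leave the closure.
        Ok(bound.unbind())
    })
}
```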
diff --git a/connectorx-python/src/pandas/pandas_columns/datetime.rs b/connectorx-python/src/pandas/pandas_columns/datetime.rs
index 1c927d9de..1a0fd5ee1 100644
--- a/connectorx-python/src/pandas/pandas_columns/datetime.rs
+++ b/connectorx-python/src/pandas/pandas_columns/datetime.rs
@@ -65,7 +65,11 @@ impl PandasColumnObject for DateTimeColumn {
 impl PandasColumn<DateTime<Utc>> for DateTimeColumn {
     #[throws(ConnectorXPythonError)]
     fn write(&mut self, val: DateTime<Utc>, row: usize) {
-        unsafe { *self.data.add(row) = val.timestamp_nanos() };
+        unsafe {
+            *self.data.add(row) = val
+                .timestamp_nanos_opt()
+                .unwrap_or_else(|| panic!("out of range DateTime"))
+        };
     }
 }
 
@@ -74,7 +78,12 @@ impl PandasColumn<Option<DateTime<Utc>>> for DateTimeColumn {
     fn write(&mut self, val: Option<DateTime<Utc>>, row: usize) {
         // numpy use i64::MIN as NaT
         unsafe {
-            *self.data.add(row) = val.map(|t| t.timestamp_nanos()).unwrap_or(i64::MIN);
+            *self.data.add(row) = val
+                .map(|t| {
+                    t.timestamp_nanos_opt()
+                        .unwrap_or_else(|| panic!("out of range DateTime"))
+                })
+                .unwrap_or(i64::MIN);
         };
     }
 }
diff --git a/connectorx-python/src/pandas/pystring.rs b/connectorx-python/src/pandas/pystring.rs
index 5382ff2a8..e821e81b6 100644
--- a/connectorx-python/src/pandas/pystring.rs
+++ b/connectorx-python/src/pandas/pystring.rs
@@ -50,17 +50,7 @@ impl StringInfo {
     pub fn pystring(&self, py: Python) -> PyString {
         let objptr = unsafe {
             match self {
-                StringInfo::ASCII(len) => {
-                    // let size = std::mem::size_of::<ffi::PyASCIIObject>() + (len + 1) * 1;
-                    // println!(
-                    //     "len: {}, {}, size: {}",
-                    //     len,
-                    //     (*len as ffi::Py_ssize_t),
-                    //     size
-                    // );
-                    // let tmp = ffi::PyObject_Malloc(size);
-                    ffi::PyUnicode_New(*len as ffi::Py_ssize_t, 0x7F)
-                }
+                StringInfo::ASCII(len) => ffi::PyUnicode_New(*len as ffi::Py_ssize_t, 0x7F),
                 StringInfo::UCS1(len) => ffi::PyUnicode_New(*len as ffi::Py_ssize_t, 0xFF),
                 StringInfo::UCS2(len) => ffi::PyUnicode_New(*len as ffi::Py_ssize_t, 0xFFFF),
                 StringInfo::UCS4(len) => ffi::PyUnicode_New(*len as ffi::Py_ssize_t, 0x10FFFF),
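Note: the datetime.rs hunks track chrono's (0.4.31+) deprecation of `timestamp_nanos`: nanosecond timestamps only fit in an `i64` between roughly the years 1677 and 2262, so `timestamp_nanos_opt` returns an `Option<i64>`. A small sketch of the guard this patch adopts — the `to_nanos` helper is illustrative only:

```rust
use chrono::{DateTime, Utc};

// Mirrors the patched write path: map the out-of-range case to an explicit panic
// instead of relying on the deprecated, panicking timestamp_nanos.
fn to_nanos(val: DateTime<Utc>) -> i64 {
    val.timestamp_nanos_opt()
        .unwrap_or_else(|| panic!("out of range DateTime"))
}

fn main() {
    let now = Utc::now();
    assert_eq!(to_nanos(now), now.timestamp_nanos_opt().unwrap());
}
```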