Skip to content

Commit

Permalink
update most deprecated pyo3 APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
wangxiaoying committed May 6, 2024
1 parent 0078b82 commit bec5b1c
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 86 deletions.
11 changes: 9 additions & 2 deletions connectorx-python/examples/tpch.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use connectorx_python::read_sql::{read_sql, PartitionQuery};
use connectorx_python::cx_read_sql::{read_sql, PyPartitionQuery};
use pyo3::Python;
use std::env;

Expand All @@ -17,12 +17,19 @@ pub fn run(nq: usize, conn: &str) {
"pandas",
None,
None,
Some(PartitionQuery::new(QUERY, "L_ORDERKEY", None, None, nq)),
Some(PyPartitionQuery {
query: QUERY.to_string(),
column: "L_ORDERKEY".to_string(),
min: None,
max: None,
num: nq,
}),
)
.unwrap();
});
}

#[allow(dead_code)]
fn main() {
run(1, "POSTGRES_URL");
}
8 changes: 4 additions & 4 deletions connectorx-python/src/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@ use std::convert::TryFrom;
use std::sync::Arc;

#[throws(ConnectorXPythonError)]
pub fn write_arrow<'a>(
py: Python<'a>,
pub fn write_arrow<'py>(
py: Python<'py>,
source_conn: &SourceConn,
origin_query: Option<String>,
queries: &[CXQuery<String>],
) -> &'a PyAny {
) -> Bound<'py, PyAny> {
let destination = get_arrow(source_conn, origin_query, queries)?;
let rbs = destination.arrow()?;
let ptrs = to_ptrs(rbs);
let obj: PyObject = ptrs.into_py(py);
obj.into_ref(py)
obj.into_bound(py)
}

pub fn to_ptrs(rbs: Vec<RecordBatch>) -> (Vec<String>, Vec<Vec<(uintptr_t, uintptr_t)>>) {
Expand Down
8 changes: 4 additions & 4 deletions connectorx-python/src/arrow2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ use pyo3::{PyAny, Python};
use std::sync::Arc;

#[throws(ConnectorXPythonError)]
pub fn write_arrow<'a>(
py: Python<'a>,
pub fn write_arrow<'py>(
py: Python<'py>,
source_conn: &SourceConn,
origin_query: Option<String>,
queries: &[CXQuery<String>],
) -> &'a PyAny {
) -> Bound<'py, PyAny> {
let destination = get_arrow2(source_conn, origin_query, queries)?;
let (rbs, schema) = destination.arrow()?;
let ptrs = to_ptrs(rbs, schema);
let obj: PyObject = ptrs.into_py(py);
obj.into_ref(py)
obj.into_bound(py)
}

fn to_ptrs(
Expand Down
16 changes: 8 additions & 8 deletions connectorx-python/src/cx_read_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ use crate::errors::ConnectorXPythonError;
#[derive(FromPyObject)]
#[pyo3(from_item_all)]
pub struct PyPartitionQuery {
query: String,
column: String,
min: Option<i64>,
max: Option<i64>,
num: usize,
pub query: String,
pub column: String,
pub min: Option<i64>,
pub max: Option<i64>,
pub num: usize,
}

impl Into<PartitionQuery> for PyPartitionQuery {
Expand All @@ -31,14 +31,14 @@ impl Into<PartitionQuery> for PyPartitionQuery {
}
}

pub fn read_sql<'a>(
py: Python<'a>,
pub fn read_sql<'py>(
py: Python<'py>,
conn: &str,
return_type: &str,
protocol: Option<&str>,
queries: Option<Vec<String>>,
partition_query: Option<PyPartitionQuery>,
) -> PyResult<&'a PyAny> {
) -> PyResult<Bound<'py, PyAny>> {
let source_conn = parse_source(conn, protocol).map_err(|e| ConnectorXPythonError::from(e))?;
let (queries, origin_query) = match (queries, partition_query) {
(Some(queries), None) => (queries.into_iter().map(CXQuery::Naked).collect(), None),
Expand Down
22 changes: 11 additions & 11 deletions connectorx-python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ static START: Once = Once::new();
// }

#[pymodule]
fn connectorx(_: Python, m: &PyModule) -> PyResult<()> {
fn connectorx(_: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
START.call_once(|| {
let _ = env_logger::try_init();
});
Expand All @@ -40,14 +40,14 @@ fn connectorx(_: Python, m: &PyModule) -> PyResult<()> {
}

#[pyfunction]
pub fn read_sql<'a>(
py: Python<'a>,
pub fn read_sql<'py>(
py: Python<'py>,
conn: &str,
return_type: &str,
protocol: Option<&str>,
queries: Option<Vec<String>>,
partition_query: Option<cx_read_sql::PyPartitionQuery>,
) -> PyResult<&'a PyAny> {
) -> PyResult<Bound<'py, PyAny>> {
cx_read_sql::read_sql(py, conn, return_type, protocol, queries, partition_query)
}

Expand All @@ -64,11 +64,11 @@ pub fn partition_sql(
}

#[pyfunction]
pub fn read_sql2<'a>(
py: Python<'a>,
pub fn read_sql2<'py>(
py: Python<'py>,
sql: &str,
db_map: HashMap<String, String>,
) -> PyResult<&'a PyAny> {
) -> PyResult<Bound<'py, PyAny>> {
let rbs = run(
sql.to_string(),
db_map,
Expand All @@ -81,16 +81,16 @@ pub fn read_sql2<'a>(
.map_err(|e| PyRuntimeError::new_err(format!("{}", e)))?;
let ptrs = arrow::to_ptrs(rbs);
let obj: PyObject = ptrs.into_py(py);
Ok(obj.into_ref(py))
Ok(obj.into_bound(py))
}

#[pyfunction]
pub fn get_meta<'a>(
py: Python<'a>,
pub fn get_meta<'py>(
py: Python<'py>,
conn: &str,
query: String,
protocol: Option<&str>,
) -> PyResult<&'a PyAny> {
) -> PyResult<Bound<'py, PyAny>> {
pandas::get_meta::get_meta(py, conn, protocol.unwrap_or("binary"), query)
.map_err(|e| From::from(e))
}
Loading

0 comments on commit bec5b1c

Please sign in to comment.