Skip to content

Commit

Permalink
fix string column
Browse files Browse the repository at this point in the history
  • Loading branch information
dovahcrow committed Feb 25, 2021
1 parent b439e6c commit eceb156
Show file tree
Hide file tree
Showing 15 changed files with 341 additions and 52 deletions.
11 changes: 11 additions & 0 deletions .cargo/config
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[target.x86_64-apple-darwin]
rustflags = [
"-C", "link-arg=-undefined",
"-C", "link-arg=dynamic_lookup",
]

[target.aarch64-apple-darwin]
rustflags = [
"-C", "link-arg=-undefined",
"-C", "link-arg=dynamic_lookup",
]
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ flamegraph.svg
perf.data
.env
.venv
.pytest_cache
.pytest_cache
data.txt
16 changes: 15 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 12 additions & 2 deletions connector-agent-python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,25 @@ futures = "0.3"
itertools = "0.10"
log = "0.4"
ndarray = "0.14"
numpy = {git = "https://github.com/dovahcrow/rust-numpy"}
pyo3 = {version = "0.13", features = ["extension-module"]}
numpy = "0.13"
pyo3 = {version = "0.13", default-features = false, features = ["macros"]}
pyo3-built = "0.4"
thiserror = "1"
tokio = {version = "1", features = ["rt-multi-thread", "io-util"]}

[build-dependencies]
built = {version = "0.4", features = ["chrono"]}

[dev-dependencies]
criterion = "0.3"
criterion-macro = "0.3"
rayon = "1"

[lib]
crate-type = ["cdylib"]
name = "connector_agent_python"

[features]
default = ["extension"]
extension = ["pyo3/extension-module"]
tests = ["pyo3/auto-initialize"]
136 changes: 136 additions & 0 deletions connector-agent-python/benches/bench_string.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
#![feature(custom_test_frameworks)]
#![test_runner(criterion::runner)]

use anyhow::Error;
use criterion::{black_box, Criterion};
use criterion_macro::criterion;
use ndarray::{ArrayViewMut1, Axis};
use numpy::PyArray1;
use pyo3::{
types::{IntoPyDict, PyString},
PyObject, PyResult, Python, ToPyObject,
};
use rayon::prelude::*;
use std::fs::read_to_string;

// Criterion entry point: measures two ways of filling a NumPy object array
// with Python strings built from the lines of `data.txt`.
#[criterion(Criterion::default())]
fn benchmark(c: &mut Criterion) {
    let contents = read_to_string("data.txt").unwrap();
    let mut strings: Vec<_> = contents.split('\n').collect();
    // Double the workload four times, i.e. 16x the original number of lines.
    for _ in 0..4 {
        let snapshot = strings.clone();
        strings.extend(snapshot);
    }

    // Restrict rayon's global pool to one worker before any parallel call runs.
    let pool_config = rayon::ThreadPoolBuilder::new().num_threads(1);
    pool_config.build_global().unwrap();

    c.bench_function("single_thread_batch_alloc", |bencher| {
        bencher.iter(|| {
            Python::with_gil(|py| {
                let input = black_box(&strings);
                single_thread_batch_alloc(py, input).unwrap()
            })
        })
    });

    c.bench_function("single_thread", |bencher| {
        bencher.iter(|| {
            Python::with_gil(|py| {
                let input = black_box(&strings);
                single_thread(py, input).unwrap()
            })
        })
    });
}

/// Baseline strategy: allocate one Python string per input and store it in
/// the matching slot of a freshly created object array, all while the caller
/// holds the GIL.
///
/// Panics if the array cannot be created; otherwise always returns `Ok(())`.
fn single_thread(py: Python, strings: &[&str]) -> Result<(), Error> {
    let mut column = create_numpy_object_array(py, strings.len()).unwrap();

    // Slot `idx` receives the Python string built from input `idx`.
    strings.iter().enumerate().for_each(|(idx, text)| {
        column[idx] = PyString::new(py, text).into();
    });

    Ok(())
}

/// Two-phase strategy: for every chunk of inputs, first allocate all Python
/// string objects while holding the GIL, then copy the string bytes into the
/// already-allocated objects after the GIL guard has been dropped.
///
/// `strings` is split into chunks of at most `chunksize` items and the object
/// array is split into views of the same sizes, so each chunk owns a disjoint
/// slice of the array.
///
/// Panics if the array cannot be created, if a slot does not hold a Python
/// `str`, or if `pystring::write` sees a length mismatch. Otherwise always
/// returns `Ok(())`.
///
/// NOTE(review): `pystring::new` allocates strings with a maximum code point
/// of 127 and sizes them by the UTF-8 byte length, so this path looks correct
/// only for ASCII input — confirm that `data.txt` is ASCII.
fn single_thread_batch_alloc(py: Python, strings: &[&str]) -> Result<(), Error> {
// Number of strings (and array slots) handled per chunk.
let chunksize = 30000;
let mut view = create_numpy_object_array(py, strings.len()).unwrap();

let mut views = vec![];

// Peel `chunksize`-sized views off the front of the array until nothing is
// left; the last view may be shorter.
while view.len() != 0 {
let size = view.len().min(chunksize);
let (a, b) = view.split_at(Axis(0), size);
views.push(a);
view = b;
}

// Release the caller's GIL so the worker closure below can take it itself.
py.allow_threads(|| {
strings
.par_chunks(chunksize)
// Pair every input chunk with its array view; `zip_eq` requires both sides
// to have the same number of items.
.zip_eq(views)
.for_each(|(s, mut v)| {
// Phase 1 (GIL held): allocate one string object per input, sized by the
// input's byte length, and store it in the array slot.
Python::with_gil(|py| {
for i in 0..s.len() {
v[i] = pystring::new(py, s[i].len()).to_object(py);
}
});

// Phase 2: the `with_gil` closure above has returned, so this token is
// created without a GIL guard in scope.
// NOTE(review): `as_ref`/`downcast` and the raw buffer writes therefore run
// without the GIL — this appears to be the point of the benchmark; confirm
// it is acceptable for the objects involved.
let py = unsafe { Python::assume_gil_acquired() };
unsafe {
for i in 0..s.len() {
// Copy the input bytes directly into the string object's character buffer.
pystring::write(v[i].as_ref(py).downcast::<PyString>().unwrap(), &s[i])
}
}
});
});

Ok(())
}

/// Builds a one-dimensional array of `num` elements, each set to `pd.NA`, by
/// evaluating `np.full(num, pd.NA)` in the interpreter, and returns a mutable
/// view over its elements.
///
/// # Errors
///
/// Returns the Python error if importing `pandas` or `numpy` fails, if the
/// expression raises, or if the result is not a `PyArray1<PyObject>`.
fn create_numpy_object_array<'a>(py: Python<'a>, num: usize) -> PyResult<ArrayViewMut1<PyObject>> {
    // Import order matches the order the names are bound in the namespace.
    let pandas = py.import("pandas")?;
    let numpy = py.import("numpy")?;
    let namespace = [("pd", pandas), ("np", numpy)].into_py_dict(py);

    let expression = format!("np.full({}, pd.NA)", num);
    let evaluated = py.eval(&expression, None, Some(namespace))?;

    let object_array = evaluated.downcast::<PyArray1<PyObject>>()?;
    let elements = unsafe { object_array.as_array_mut() };
    Ok(elements)
}

// Helpers that allocate a Python `str` of a given length first and fill in
// its bytes later by writing straight into the object's memory.
//
// NOTE(review): everything here assumes the CPython in-memory layout of a
// compact ASCII string (header struct immediately followed by the character
// bytes) — confirm against the CPython versions this is built for.
mod pystring {
use pyo3::{ffi, types::PyString, Py, Python};

/// Allocates a new Python string object with room for `len` characters and
/// returns an owned handle to it.
///
/// The maximum code point passed to `PyUnicode_New` is 127, so the object is
/// created as an ASCII string. The characters are expected to be filled in
/// afterwards with [`write`].
pub fn new(py: Python, len: usize) -> Py<PyString> {
let objptr = unsafe {
ffi::PyUnicode_New(len as ffi::Py_ssize_t, /*0x10FFFF*/ 127)
};

// Hand ownership of the new object to pyo3, then convert to an owned `Py`.
let s: &PyString = unsafe { py.from_owned_ptr(objptr) };
s.into()
}

/// Copies the bytes of `val` into the character buffer of `pystring`.
///
/// The buffer is taken to start directly after the `PyASCIIObject` header
/// and to be `length` bytes long, as recorded in that header.
///
/// # Panics
///
/// Panics if `val.len()` differs from the string's recorded length.
///
/// # Safety
///
/// The caller must ensure `pystring` really has the layout described by
/// `PyASCIIObject` with its characters stored inline after the header, and
/// that nothing else reads or writes the object during the copy.
/// NOTE(review): `val` presumably has to be pure ASCII because `new`
/// allocates with a maximum code point of 127 — confirm.
pub unsafe fn write(pystring: &PyString, val: &str) {
let ascii = PyASCIIObject::from_ref(pystring);
let buf = std::slice::from_raw_parts_mut(
// `offset(1)` steps over one whole header struct to the first data byte.
(ascii as *mut PyASCIIObject).offset(1) as *mut u8,
ascii.length as usize,
);
buf.copy_from_slice(val.as_bytes());
}

/// `#[repr(C)]` mirror of the header that precedes the character data of a
/// Python string object.
///
/// NOTE(review): field order and sizes are assumed to match CPython's
/// `PyASCIIObject`; `opaque` presumably stands in for the state bitfield and
/// `wstr` for the wide-character pointer — verify against the C headers.
#[repr(C)]
pub struct PyASCIIObject {
obj: ffi::PyObject,
length: ffi::Py_ssize_t,
hash: ffi::Py_hash_t,
opaque: u32,
wstr: *mut u8,
}

impl PyASCIIObject {
// pub unsafe fn from_owned<'a>(obj: Py<PyString>) -> &'a mut Self {
// let ascii: &mut PyASCIIObject = std::mem::transmute(obj);
// ascii
// }

/// Reinterprets a shared `&PyString` as a mutable reference to the header.
///
/// # Safety
///
/// This turns a shared reference into a mutable one via `transmute`; the
/// caller must guarantee exclusive access to the object and that its
/// memory really matches `PyASCIIObject`.
pub unsafe fn from_ref(obj: &PyString) -> &mut Self {
// The lint is silenced on purpose: the `&T` -> `&mut T` transmute is what
// this helper exists to do.
#[allow(mutable_transmutes)]
let ascii: &mut PyASCIIObject = std::mem::transmute(obj);
ascii
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ def test_write_pandas(postgres_url: str) -> None:

schema = ["uint64", "UInt64", "string", "float64", "boolean"]
df = write_pandas(postgres_url, queries, schema)

print(df)
print(df["2"].dtype)
print(type(df["2"][0]))
expected = pd.DataFrame(
index=range(6),
data={
Expand All @@ -34,4 +36,8 @@ def test_write_pandas(postgres_url: str) -> None:
},
)

for i in range(6):
print(df["2"][i], df["2"][i] == expected["2"][i])

print(hash(df["2"][0]), hash(expected["2"][0]))
assert_frame_equal(df, expected, check_names=True)
9 changes: 9 additions & 0 deletions connector-agent-python/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ pub enum ConnectorAgentPythonError {
#[error("Unknown pandas data type: {0}.")]
UnknownPandasType(String),

#[error("Python: {0}.")]
PythonError(String),

#[error(transparent)]
ConnectorAgentError(#[from] connector_agent::ConnectorAgentError),

Expand All @@ -24,3 +27,9 @@ impl From<ConnectorAgentPythonError> for PyErr {
PyRuntimeError::new_err(format!("{}", e))
}
}

/// Converts a Python exception into `ConnectorAgentPythonError::PythonError`,
/// keeping only its rendered `Display` text.
impl From<PyErr> for ConnectorAgentPythonError {
    fn from(e: PyErr) -> ConnectorAgentPythonError {
        let message = e.to_string();
        ConnectorAgentPythonError::PythonError(message)
    }
}
6 changes: 4 additions & 2 deletions connector-agent-python/src/pandas/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod pandas_columns;
mod pystring;
mod writers;

use crate::errors::{ConnectorAgentPythonError, Result};
Expand All @@ -23,11 +24,12 @@ pub fn write_pandas<'a>(
let mut writer = PandasWriter::new(py);
let sb = PostgresDataSourceBuilder::new(conn);

// ! Do not unlock GIL. Object columns might need to allocate a python object while writing.
// ! They carried the assumption that GIL is already acquired and use unsafe Python::assume_gil_acquired.
// TODO: unlock the GIL for these two lines
let dispatcher = Dispatcher::new(sb, &mut writer, queries, &schema);
dispatcher.run_checked()?;

writer.write_string_columns()?;

writer.result().ok_or(anyhow!("writer not run"))?
}

Expand Down
4 changes: 2 additions & 2 deletions connector-agent-python/src/pandas/pandas_columns/boolean.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{check_numpy_dtype, HasPandasColumn, PandasColumn, PandasColumnObject};
use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject};
use ndarray::{ArrayViewMut1, ArrayViewMut2, Axis, Ix2};
use numpy::{PyArray, PyArray1};
use pyo3::{FromPyObject, PyAny, PyResult};
Expand All @@ -12,7 +12,7 @@ pub enum BooleanBlock<'a> {
impl<'a> FromPyObject<'a> for BooleanBlock<'a> {
fn extract(ob: &'a PyAny) -> PyResult<Self> {
if let Ok(array) = ob.downcast::<PyArray<bool, Ix2>>() {
check_numpy_dtype(ob, "bool")?;
check_dtype(ob, "bool")?;
let data = unsafe { array.as_array_mut() };
Ok(BooleanBlock::NumPy(data))
} else {
Expand Down
4 changes: 2 additions & 2 deletions connector-agent-python/src/pandas/pandas_columns/float64.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{check_numpy_dtype, HasPandasColumn, PandasColumn, PandasColumnObject};
use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject};
use ndarray::{ArrayViewMut1, ArrayViewMut2, Axis, Ix2};
use numpy::PyArray;
use pyo3::{FromPyObject, PyAny, PyResult};
Expand All @@ -11,7 +11,7 @@ pub struct Float64Block<'a> {

impl<'a> FromPyObject<'a> for Float64Block<'a> {
fn extract(ob: &'a PyAny) -> PyResult<Self> {
check_numpy_dtype(ob, "float64")?;
check_dtype(ob, "float64")?;
let array = ob.downcast::<PyArray<f64, Ix2>>()?;
let data = unsafe { array.as_array_mut() };
Ok(Float64Block { data })
Expand Down
2 changes: 1 addition & 1 deletion connector-agent-python/src/pandas/pandas_columns/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub trait HasPandasColumn: Sized {
type PandasColumn<'a>: PandasColumn<Self>;
}

fn check_numpy_dtype(ob: &PyAny, expected_dtype: &str) -> PyResult<()> {
pub fn check_dtype(ob: &PyAny, expected_dtype: &str) -> PyResult<()> {
let dtype = ob.getattr("dtype")?.str()?;
let dtype = dtype.to_str()?;
if dtype != expected_dtype {
Expand Down
41 changes: 11 additions & 30 deletions connector-agent-python/src/pandas/pandas_columns/string.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,18 @@
use super::{check_numpy_dtype, HasPandasColumn, PandasColumn, PandasColumnObject};
use super::{HasPandasColumn, PandasColumn, PandasColumnObject};
use ndarray::{ArrayViewMut1, Axis};
use numpy::PyArray1;
use pyo3::{types::PyString, FromPyObject, PyAny, PyObject, PyResult, Python};
use std::any::TypeId;

// Pandas squeezes na and string object into a single array
// Defer string writing to the end: we are not able to allocate string objects
// at this stage because Python requires the GIL to be held.
pub struct StringColumn<'a> {
data: ArrayViewMut1<'a, PyObject>,
data: ArrayViewMut1<'a, Option<String>>,
}

impl<'a> FromPyObject<'a> for StringColumn<'a> {
fn extract(ob: &'a PyAny) -> PyResult<Self> {
check_numpy_dtype(ob, "string")?;
let data = ob.getattr("_ndarray")?;
check_numpy_dtype(data, "object")?;

Ok(StringColumn {
data: unsafe {
data.downcast::<PyArray1<PyObject>>()
.unwrap()
.as_array_mut()
},
})
impl<'a> StringColumn<'a> {
pub fn new(buf: &'a mut [Option<String>]) -> Self {
StringColumn {
data: ArrayViewMut1::from(buf),
}
}
}

Expand All @@ -39,23 +30,13 @@ impl<'a> PandasColumnObject for StringColumn<'a> {

impl<'a> PandasColumn<String> for StringColumn<'a> {
fn write(&mut self, i: usize, val: String) {
let py = unsafe { Python::assume_gil_acquired() };
let val = PyString::new(py, &val).into();

self.data[i] = val;
self.data[i] = Some(val);
}
}

impl<'a> PandasColumn<Option<String>> for StringColumn<'a> {
fn write(&mut self, i: usize, val: Option<String>) {
match val {
Some(s) => {
let py = unsafe { Python::assume_gil_acquired() };
let val = PyString::new(py, &s).into();
self.data[i] = val;
}
None => {}
}
self.data[i] = val;
}
}

Expand Down
Loading

0 comments on commit eceb156

Please sign in to comment.