Skip to content

Commit 31464bc

Browse files
authored
Merge pull request #1 from EricFecteau/arrow_rs_polars
Update arrow-rs with .polars
2 parents 31a89c4 + b8d2df2 commit 31464bc

File tree

6 files changed

+428
-54
lines changed

6 files changed

+428
-54
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,5 @@ lto = true
1010
[workspace.dependencies]
1111
arrow = {version = "46", features = ["prettyprint", "ffi"]}
1212
arrow2 = {version = "0.17", default-features = false}
13+
polars = {version = "0.45", features=["dtype-u8", "dtype-u16", "lazy"]}
14+
polars-arrow = {version = "0.45"}

connectorx/Cargo.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ chrono = "0.4"
2424

2525
arrow = {workspace = true, optional = true}
2626
arrow2 = {workspace = true, default-features = false, optional = true}
27+
polars = {workspace = true, optional = true, features=["dtype-u8", "dtype-u16", "lazy"]}
28+
polars-arrow = {workspace = true, optional = true}
2729
bb8 = {version = "0.7", optional = true}
2830
bb8-tiberius = {version = "0.8", optional = true}
2931
csv = {version = "1", optional = true}
@@ -36,8 +38,6 @@ ndarray = {version = "0.15", optional = true}
3638
num-traits = {version = "0.2", optional = true}
3739
openssl = {version = "0.10", optional = true, features = ["vendored"]}
3840
oracle = {version = "0.5", optional = true}
39-
polars = {version = "0.45", optional = true, features=["dtype-u8", "dtype-u16", "lazy"]}
40-
polars-arrow = {version = "0.45", optional = true}
4141
postgres = {version = "0.19", features = ["with-chrono-0_4", "with-uuid-0_8", "with-serde_json-1"], optional = true}
4242
postgres-native-tls = {version = "0.5", optional = true}
4343
postgres-openssl = {version = "0.5", optional = true}
@@ -72,11 +72,12 @@ iai = "0.1"
7272
pprof = {version = "0.5", features = ["flamegraph"]}
7373

7474
[features]
75-
all = ["src_sqlite", "src_postgres", "src_mysql", "src_mssql", "src_oracle", "src_bigquery", "src_csv", "src_dummy", "src_trino", "dst_arrow", "dst_arrow2", "federation", "fed_exec"]
75+
all = ["src_sqlite", "src_postgres", "src_mysql", "src_mssql", "src_oracle", "src_bigquery", "src_csv", "src_dummy", "src_trino", "dst_arrow", "dst_arrow2", "dst_arrow", "federation", "fed_exec", "dst_polars"]
7676
branch = []
7777
default = ["fptr"]
7878
dst_arrow = ["arrow"]
7979
dst_arrow2 = ["polars", "polars-arrow", "arrow2"]
80+
dst_polars = ["dst_arrow", "polars", "polars-arrow"]
8081
fptr = []
8182
src_bigquery = ["gcp-bigquery-client", "tokio"]
8283
src_csv = ["csv", "regex"]

connectorx/src/destinations/arrow/errors.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ pub enum ArrowDestinationError {
1010
#[error(transparent)]
1111
ConnectorXError(#[from] crate::errors::ConnectorXError),
1212

13+
#[cfg(feature = "dst_polars")]
14+
#[error(transparent)]
15+
PolarsError(#[from] polars::error::PolarsError),
16+
1317
/// Any other errors that are too trivial to be put here explicitly.
1418
#[error(transparent)]
1519
Other(#[from] anyhow::Error),

connectorx/src/destinations/arrow/mod.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,15 @@ use std::{
2222
sync::{Arc, Mutex},
2323
};
2424

25+
#[cfg(feature = "dst_polars")]
26+
use {
27+
arrow::ffi::to_ffi,
28+
polars::prelude::{concat, DataFrame, IntoLazy, PlSmallStr, Series, UnionArgs},
29+
polars_arrow::ffi::{import_array_from_c, import_field_from_c},
30+
std::iter::FromIterator,
31+
std::mem::transmute,
32+
};
33+
2534
type Builder = Box<dyn Any + Send>;
2635
type Builders = Vec<Builder>;
2736

@@ -125,6 +134,63 @@ impl ArrowDestination {
125134
.map_err(|e| anyhow!("mutex poisoned {}", e))?
126135
}
127136

137+
#[cfg(feature = "dst_polars")]
138+
#[throws(ArrowDestinationError)]
139+
pub fn polars(self) -> DataFrame {
140+
// Convert to arrow first
141+
let rbs = self.arrow()?;
142+
143+
// Ready LazyFrame vector for the chunks
144+
let mut lf_vec = vec![];
145+
146+
for chunk in rbs.into_iter() {
147+
// Column vector
148+
let mut columns = Vec::with_capacity(chunk.num_columns());
149+
150+
// Arrow stores data by columns, therefore need to be Zero-copied by column
151+
for (i, col) in chunk.columns().iter().enumerate() {
152+
// Convert to ArrayData (arrow-rs)
153+
let array = col.to_data();
154+
155+
// Convert to ffi with arrow-rs
156+
let (out_array, out_schema) = to_ffi(&array).unwrap();
157+
158+
// Import field from ffi with polars
159+
let field = unsafe {
160+
import_field_from_c(transmute::<
161+
&arrow::ffi::FFI_ArrowSchema,
162+
&polars_arrow::ffi::ArrowSchema,
163+
>(&out_schema))
164+
}
165+
.unwrap();
166+
167+
// Import data from ffi with polars
168+
let data = unsafe {
169+
import_array_from_c(
170+
transmute::<arrow::ffi::FFI_ArrowArray, polars_arrow::ffi::ArrowArray>(
171+
out_array,
172+
),
173+
field.dtype().clone(),
174+
)
175+
}
176+
.unwrap();
177+
178+
// Create Polars series from arrow column
179+
columns.push(Series::from_arrow(
180+
PlSmallStr::from(chunk.schema().field(i).name()),
181+
data,
182+
)?);
183+
}
184+
185+
// Create DataFrame from the columns
186+
lf_vec.push(DataFrame::from_iter(columns).lazy());
187+
}
188+
189+
// Concat the chunks
190+
let union_args = UnionArgs::default();
191+
concat(lf_vec, union_args)?.collect()?
192+
}
193+
128194
#[throws(ArrowDestinationError)]
129195
pub fn record_batch(&mut self) -> Option<RecordBatch> {
130196
let mut guard = self

0 commit comments

Comments
 (0)