Skip to content

Commit

Permalink
upgrade arrow to 37 and datafusion to 23
Browse files Browse the repository at this point in the history
  • Loading branch information
houqp committed May 30, 2023
1 parent f1d4e5f commit 7df4bcc
Show file tree
Hide file tree
Showing 8 changed files with 1,087 additions and 330 deletions.
661 changes: 532 additions & 129 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion connectorx-cpp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ license = "MIT"
[dependencies]
libc = "0.2"
connectorx = {path = "../connectorx", default-features = false}
arrow = {version = "26", features = ["prettyprint", "ffi"]}
arrow = {version = "37", features = ["prettyprint", "ffi"]}

[lib]
crate-type = ["cdylib"]
Expand Down
67 changes: 11 additions & 56 deletions connectorx-cpp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ pub unsafe extern "C" fn connectorx_rewrite(

#[repr(C)]
pub struct CXArray {
array: *const FFI_ArrowArray,
schema: *const FFI_ArrowSchema,
array: FFI_ArrowArray,
schema: FFI_ArrowSchema,
}

#[repr(C)]
Expand All @@ -142,17 +142,6 @@ pub unsafe fn free_str(ptr: *const c_char) {
pub unsafe extern "C" fn free_result(res: *const CXResult) {
let header = get_vec::<_>((*res).header.ptr, (*res).header.len, (*res).header.capacity);
header.into_iter().for_each(|col| free_str(col));

let rbs = get_vec::<_>((*res).data.ptr, (*res).data.len, (*res).data.capacity);
rbs.into_iter().for_each(|rb| {
get_vec::<_>(rb.ptr, rb.len, rb.capacity)
.into_iter()
.for_each(|a| {
// Otherwise memory leak
std::sync::Arc::from_raw(a.array);
std::sync::Arc::from_raw(a.schema);
})
});
}

#[no_mangle]
Expand Down Expand Up @@ -183,13 +172,10 @@ pub unsafe extern "C" fn connectorx_scan(conn: *const c_char, query: *const c_ch
let mut cols = vec![];

for array in rb.columns() {
let data = array.data().clone();
let array = ArrowArray::try_new(data).expect("c ptr");
let (array_ptr, schema_ptr) = ArrowArray::into_raw(array);

let data = array.to_data();
let cx_array = CXArray {
array: array_ptr,
schema: schema_ptr,
schema: FFI_ArrowSchema::try_from(data.data_type()).unwrap(),
array: FFI_ArrowArray::new(&data),
};
cols.push(cx_array);
}
Expand Down Expand Up @@ -217,32 +203,6 @@ pub unsafe extern "C" fn free_iter(iter: *mut Box<dyn RecordBatchIterator>) {
let _ = Box::from_raw(iter);
}

#[no_mangle]
pub unsafe extern "C" fn free_schema(schema: *mut CXSchema) {
let res = Box::from_raw(schema);

let header = get_vec::<_>(res.headers.ptr, res.headers.len, res.headers.capacity);
header.into_iter().for_each(|col| free_str(col));

get_vec::<_>(res.types.ptr, res.types.len, res.types.capacity)
.into_iter()
.for_each(|a| {
std::sync::Arc::from_raw(a.array);
std::sync::Arc::from_raw(a.schema);
});
}

#[no_mangle]
pub unsafe extern "C" fn free_record_batch(rb: *mut CXSlice<CXArray>) {
let slice = Box::from_raw(rb);
get_vec::<_>(slice.ptr, slice.len, slice.capacity)
.into_iter()
.for_each(|a| {
std::sync::Arc::from_raw(a.array);
std::sync::Arc::from_raw(a.schema);
})
}

#[no_mangle]
pub unsafe extern "C" fn connectorx_scan_iter(
conn: *const c_char,
Expand Down Expand Up @@ -274,12 +234,10 @@ pub unsafe extern "C" fn connectorx_get_schema(
let (empty_batch, names) = arrow_iter.get_schema();
let mut cols = vec![];
for array in empty_batch.columns() {
let data = array.data().clone();
let array = ArrowArray::try_new(data).expect("c ptr");
let (array_ptr, schema_ptr) = ArrowArray::into_raw(array);
let data = array.to_data();
let cx_array = CXArray {
array: array_ptr,
schema: schema_ptr,
schema: FFI_ArrowSchema::try_from(data.data_type()).unwrap(),
array: FFI_ArrowArray::new(&data),
};
cols.push(cx_array);
}
Expand Down Expand Up @@ -317,13 +275,10 @@ pub unsafe extern "C" fn connectorx_iter_next(
let mut cols = vec![];

for array in rb.columns() {
let data = array.data().clone();
let array = ArrowArray::try_new(data).expect("c ptr");
let (array_ptr, schema_ptr) = ArrowArray::into_raw(array);

let data = array.to_data();
let cx_array = CXArray {
array: array_ptr,
schema: schema_ptr,
schema: FFI_ArrowSchema::try_from(data.data_type()).unwrap(),
array: FFI_ArrowArray::new(&data),
};
cols.push(cx_array);
}
Expand Down
Loading

0 comments on commit 7df4bcc

Please sign in to comment.