Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upgrade jni and tokio #74

Merged
merged 7 commits into from
Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public CompletableFuture<ArrowReader> collect(BufferAllocator allocator) {
}

private boolean containsError(String errString) {
return errString != null && !"".equals(errString);
return errString != null && !errString.isEmpty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public CompletableFuture<DataFrame> sql(String sql) {
getPointer(),
sql,
(errMessage, dataframeId) -> {
if (null != errMessage && !errMessage.equals("")) {
if (null != errMessage && !errMessage.isEmpty()) {
future.completeExceptionally(new RuntimeException(errMessage));
} else {
DefaultDataFrame frame = new DefaultDataFrame(DefaultSessionContext.this, dataframeId);
Expand Down Expand Up @@ -65,7 +65,7 @@ public CompletableFuture<Void> registerParquet(String name, Path path) {
}

private void voidCallback(CompletableFuture<Void> future, String errMessage) {
if (null != errMessage && !errMessage.equals("")) {
if (null != errMessage && !errMessage.isEmpty()) {
future.completeExceptionally(new RuntimeException(errMessage));
} else {
future.complete(null);
Expand Down
4 changes: 2 additions & 2 deletions datafusion-jni/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ license = "Apache-2.0"
edition = "2021"

[dependencies]
jni = "^0.19.0"
tokio = "^1.18.0"
jni = "^0.21.0"
tokio = "^1.32.0"
arrow = "^22.0"
datafusion = "^12.0"

Expand Down
71 changes: 40 additions & 31 deletions datafusion-jni/src/context.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use datafusion::execution::context::SessionContext;
use datafusion::prelude::{CsvReadOptions, ParquetReadOptions};
use jni::objects::{JClass, JObject, JString, JValue};
use jni::objects::{JClass, JObject, JString};
use jni::sys::jlong;
use jni::JNIEnv;
use tokio::runtime::Runtime;

#[no_mangle]
pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_registerCsv(
env: JNIEnv,
mut env: JNIEnv,
_class: JClass,
runtime: jlong,
pointer: jlong,
Expand All @@ -17,35 +17,38 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_re
) {
let runtime = unsafe { &mut *(runtime as *mut Runtime) };
let name: String = env
.get_string(name)
.get_string(&name)
.expect("Couldn't get name as string!")
.into();
let path: String = env
.get_string(path)
.get_string(&path)
.expect("Couldn't get name as string!")
.into();
let context = unsafe { &mut *(pointer as *mut SessionContext) };
runtime.block_on(async {
let register_result = context
.register_csv(&name, &path, CsvReadOptions::new())
.await;
let err_message: JValue = match register_result {
Ok(_) => JValue::Void,
Err(err) => {
let err_message = env
.new_string(err.to_string())
.expect("Couldn't create java string!");
err_message.into()
}
let err_message = match register_result {
Ok(_) => "".to_string(),
Err(err) => err.to_string(),
};
env.call_method(callback, "accept", "(Ljava/lang/Object;)V", &[err_message])
.expect("failed to callback method");
let err_message = env
.new_string(err_message)
.expect("Couldn't create java string!");
env.call_method(
callback,
"accept",
"(Ljava/lang/Object;)V",
&[(&err_message).into()],
)
.expect("failed to callback method");
});
}

#[no_mangle]
pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_registerParquet(
env: JNIEnv,
mut env: JNIEnv,
_class: JClass,
runtime: jlong,
pointer: jlong,
Expand All @@ -55,35 +58,38 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_re
) {
let runtime = unsafe { &mut *(runtime as *mut Runtime) };
let name: String = env
.get_string(name)
.get_string(&name)
.expect("Couldn't get name as string!")
.into();
let path: String = env
.get_string(path)
.get_string(&path)
.expect("Couldn't get path as string!")
.into();
let context = unsafe { &mut *(pointer as *mut SessionContext) };
runtime.block_on(async {
let register_result = context
.register_parquet(&name, &path, ParquetReadOptions::default())
.await;
let err_message: JValue = match register_result {
Ok(_) => JValue::Void,
Err(err) => {
let err_message = env
.new_string(err.to_string())
.expect("Couldn't create java string!");
err_message.into()
}
let err_message = match register_result {
Ok(_) => "".to_string(),
Err(err) => err.to_string(),
};
env.call_method(callback, "accept", "(Ljava/lang/Object;)V", &[err_message])
.expect("failed to callback method");
let err_message = env
.new_string(err_message)
.expect("Couldn't create java string!");
env.call_method(
callback,
"accept",
"(Ljava/lang/Object;)V",
&[(&err_message).into()],
)
.expect("failed to callback method");
});
}

#[no_mangle]
pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_querySql(
env: JNIEnv,
mut env: JNIEnv,
_class: JClass,
runtime: jlong,
pointer: jlong,
Expand All @@ -92,20 +98,23 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_qu
) {
let runtime = unsafe { &mut *(runtime as *mut Runtime) };
let sql: String = env
.get_string(sql)
.get_string(&sql)
.expect("Couldn't get sql as string!")
.into();
let context = unsafe { &mut *(pointer as *mut SessionContext) };
runtime.block_on(async {
let query_result = context.sql(&sql).await;
match query_result {
Ok(v) => {
let empty_str = env
.new_string("".to_string())
.expect("Couldn't create java string!");
let dataframe = Box::into_raw(Box::new(v)) as jlong;
env.call_method(
callback,
"callback",
"(Ljava/lang/String;J)V",
&[JValue::Void, dataframe.into()],
&[(&empty_str).into(), dataframe.into()],
)
}
Err(err) => {
Expand All @@ -117,7 +126,7 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_qu
callback,
"callback",
"(Ljava/lang/String;J)V",
&[err_message.into(), dataframe.into()],
&[(&err_message).into(), dataframe.into()],
)
}
}
Expand Down
86 changes: 39 additions & 47 deletions datafusion-jni/src/dataframe.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use arrow::ipc::writer::FileWriter;
use datafusion::dataframe::DataFrame;
use datafusion::prelude::SessionContext;
use jni::objects::{JClass, JObject, JString, JValue};
use jni::objects::{JClass, JObject, JString};
use jni::sys::jlong;
use jni::JNIEnv;
use std::convert::Into;
Expand All @@ -12,7 +12,7 @@ use tokio::runtime::Runtime;

#[no_mangle]
pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_collectDataframe(
env: JNIEnv,
mut env: JNIEnv,
_class: JClass,
runtime: jlong,
dataframe: jlong,
Expand All @@ -39,21 +39,21 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_collectDatafr
.new_string("".to_string())
.expect("Couldn't create java string!");
let ba = env
.byte_array_from_slice(&buff.get_ref())
.byte_array_from_slice(buff.get_ref())
.expect("cannot create empty byte array");
env.call_method(
callback,
"accept",
"(Ljava/lang/Object;Ljava/lang/Object;)V",
&[err_message.into(), ba.into()],
&[(&err_message).into(), (&ba).into()],
)
.expect("failed to call method");
});
}

#[no_mangle]
pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_showDataframe(
env: JNIEnv,
mut env: JNIEnv,
_class: JClass,
runtime: jlong,
dataframe: jlong,
Expand All @@ -63,28 +63,26 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_showDataframe
let dataframe = unsafe { &mut *(dataframe as *mut Arc<DataFrame>) };
runtime.block_on(async {
let r = dataframe.show().await;
let err_message: JValue = match r {
Ok(_) => JValue::Void,
Err(err) => {
let err_message = env
.new_string(err.to_string())
.expect("Couldn't create java string!");
err_message.into()
}
let err_message = match r {
Ok(_) => "".to_string(),
Err(err) => err.to_string(),
};
let err_message = env
.new_string(err_message)
.expect("Couldn't create java string!");
env.call_method(
callback,
"accept",
"(Ljava/lang/Object;)V",
&[err_message.into()],
&[(&err_message).into()],
)
.expect("failed to call method");
});
}

#[no_mangle]
pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_writeParquet(
env: JNIEnv,
mut env: JNIEnv,
_class: JClass,
runtime: jlong,
dataframe: jlong,
Expand All @@ -94,33 +92,31 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_writeParquet(
let runtime = unsafe { &mut *(runtime as *mut Runtime) };
let dataframe = unsafe { &mut *(dataframe as *mut Arc<DataFrame>) };
let path: String = env
.get_string(path)
.get_string(&path)
.expect("Couldn't get path as string!")
.into();
runtime.block_on(async {
let r = dataframe.write_parquet(&path, None).await;
let err_message: JValue = match r {
Ok(_) => JValue::Void,
Err(err) => {
let err_message = env
.new_string(err.to_string())
.expect("Couldn't create java string!");
err_message.into()
}
let err_message = match r {
Ok(_) => "".to_string(),
Err(err) => err.to_string(),
};
let err_message = env
.new_string(err_message)
.expect("Couldn't create java string!");
env.call_method(
callback,
"accept",
"(Ljava/lang/Object;)V",
&[err_message.into()],
&[(&err_message).into()],
)
.expect("failed to call method");
});
}

#[no_mangle]
pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_writeCsv(
env: JNIEnv,
mut env: JNIEnv,
_class: JClass,
runtime: jlong,
dataframe: jlong,
Expand All @@ -130,33 +126,31 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_writeCsv(
let runtime = unsafe { &mut *(runtime as *mut Runtime) };
let dataframe = unsafe { &mut *(dataframe as *mut Arc<DataFrame>) };
let path: String = env
.get_string(path)
.get_string(&path)
.expect("Couldn't get path as string!")
.into();
runtime.block_on(async {
let r = dataframe.write_csv(&path).await;
let err_message: JValue = match r {
Ok(_) => JValue::Void,
Err(err) => {
let err_message = env
.new_string(err.to_string())
.expect("Couldn't create java string!");
err_message.into()
}
let err_message = match r {
Ok(_) => "".to_string(),
Err(err) => err.to_string(),
};
let err_message = env
.new_string(err_message)
.expect("Couldn't create java string!");
env.call_method(
callback,
"accept",
"(Ljava/lang/Object;)V",
&[err_message.into()],
&[(&err_message).into()],
)
.expect("failed to call method");
});
}

#[no_mangle]
pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_registerTable(
env: JNIEnv,
mut env: JNIEnv,
_class: JClass,
runtime: jlong,
dataframe: jlong,
Expand All @@ -168,25 +162,23 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_registerTable
let dataframe = unsafe { &mut *(dataframe as *mut Arc<DataFrame>) };
let context = unsafe { &mut *(session as *mut SessionContext) };
let name: String = env
.get_string(name)
.get_string(&name)
.expect("Couldn't get name as string!")
.into();
runtime.block_on(async {
let r = context.register_table(name.as_str(), dataframe.clone());
let err_message: JValue = match r {
Ok(_) => JValue::Void,
Err(err) => {
let err_message = env
.new_string(err.to_string())
.expect("Couldn't create java string!");
err_message.into()
}
let err_message = match r {
Ok(_) => "".to_string(),
Err(err) => err.to_string(),
};
let err_message = env
.new_string(err_message)
.expect("Couldn't create java string!");
env.call_method(
callback,
"accept",
"(Ljava/lang/Object;)V",
&[err_message.into()],
&[(&err_message).into()],
)
.expect("failed to call method");
});
Expand Down
Loading