From e7fb67e498ebb1d7b60711b15eb27b8e87b80fff Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Fri, 1 Sep 2023 14:56:38 +0800 Subject: [PATCH 1/7] upgrade jni and tokio --- .../arrow/datafusion/DefaultDataFrame.java | 2 +- .../datafusion/DefaultSessionContext.java | 4 +- datafusion-jni/Cargo.toml | 4 +- datafusion-jni/src/context.rs | 84 ++++++++------ datafusion-jni/src/dataframe.rs | 108 ++++++++---------- 5 files changed, 101 insertions(+), 101 deletions(-) diff --git a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultDataFrame.java b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultDataFrame.java index 394668f..05c284e 100644 --- a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultDataFrame.java +++ b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultDataFrame.java @@ -42,7 +42,7 @@ public CompletableFuture collect(BufferAllocator allocator) { } private boolean containsError(String errString) { - return errString != null && !"".equals(errString); + return errString != null && !errString.isEmpty(); } @Override diff --git a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultSessionContext.java b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultSessionContext.java index 233ad27..fe4a33f 100644 --- a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultSessionContext.java +++ b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultSessionContext.java @@ -28,7 +28,7 @@ public CompletableFuture sql(String sql) { getPointer(), sql, (errMessage, dataframeId) -> { - if (null != errMessage && !errMessage.equals("")) { + if (null != errMessage && !errMessage.isEmpty()) { future.completeExceptionally(new RuntimeException(errMessage)); } else { DefaultDataFrame frame = new DefaultDataFrame(DefaultSessionContext.this, dataframeId); @@ -65,7 +65,7 @@ public CompletableFuture registerParquet(String name, Path path) { } private void voidCallback(CompletableFuture future, String errMessage) { - if (null != errMessage && !errMessage.equals("")) { + if (null != errMessage && !errMessage.isEmpty()) { future.completeExceptionally(new RuntimeException(errMessage)); } else { future.complete(null); diff --git a/datafusion-jni/Cargo.toml b/datafusion-jni/Cargo.toml index b7f9be4..cf445f9 100644 --- a/datafusion-jni/Cargo.toml +++ b/datafusion-jni/Cargo.toml @@ -10,8 +10,8 @@ license = "Apache-2.0" edition = "2021" [dependencies] -jni = "^0.19.0" -tokio = "^1.18.0" +jni = "^0.21.0" +tokio = "^1.32.0" arrow = "^22.0" datafusion = "^12.0" diff --git a/datafusion-jni/src/context.rs b/datafusion-jni/src/context.rs index a58ed87..07d614b 100644 --- a/datafusion-jni/src/context.rs +++ b/datafusion-jni/src/context.rs @@ -6,14 +6,16 @@ use jni::JNIEnv; use tokio::runtime::Runtime; #[no_mangle] -pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_registerCsv( - env: JNIEnv, - _class: JClass, +pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_registerCsv< + 'local, +>( + mut env: JNIEnv<'local>, + _class: &JClass<'local>, runtime: jlong, pointer: jlong, - name: JString, - path: JString, - callback: JObject, + name: &JString<'local>, + path: &JString<'local>, + callback: &JObject<'local>, ) { let runtime = unsafe { &mut *(runtime as *mut Runtime) }; let name: String = env @@ -29,29 +31,32 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_re let register_result = context .register_csv(&name, &path, CsvReadOptions::new()) .await; - let err_message: JValue = match register_result { - Ok(_) => JValue::Void, - Err(err) => { - let err_message = env - .new_string(err.to_string()) - .expect("Couldn't create java string!"); - err_message.into() - } + let err_message = match register_result { + Ok(_) => "".to_string(), + Err(err) => err.to_string(), }; - env.call_method(callback, "accept", "(Ljava/lang/Object;)V", &[err_message]) - .expect("failed to callback method"); + let err_message = env + .new_string(err_message) + .expect("Couldn't create java string!"); + env.call_method( + callback, + "accept", + "(Ljava/lang/Object;)V", + &[(&err_message).into()], + ) + .expect("failed to callback method"); }); } #[no_mangle] pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_registerParquet( - env: JNIEnv, - _class: JClass, + mut env: JNIEnv, + _class: &JClass, runtime: jlong, pointer: jlong, - name: JString, - path: JString, - callback: JObject, + name: &JString, + path: &JString, + callback: &JObject, ) { let runtime = unsafe { &mut *(runtime as *mut Runtime) }; let name: String = env @@ -67,28 +72,31 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_re let register_result = context .register_parquet(&name, &path, ParquetReadOptions::default()) .await; - let err_message: JValue = match register_result { - Ok(_) => JValue::Void, - Err(err) => { - let err_message = env - .new_string(err.to_string()) - .expect("Couldn't create java string!"); - err_message.into() - } + let err_message = match register_result { + Ok(_) => "".to_string(), + Err(err) => err.to_string(), }; - env.call_method(callback, "accept", "(Ljava/lang/Object;)V", &[err_message]) - .expect("failed to callback method"); + let err_message = env + .new_string(err_message) + .expect("Couldn't create java string!"); + env.call_method( + callback, + "accept", + "(Ljava/lang/Object;)V", + &[(&err_message).into()], + ) + .expect("failed to callback method"); }); } #[no_mangle] pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_querySql( - env: JNIEnv, - _class: JClass, + mut env: JNIEnv, + _class: &JClass, runtime: jlong, pointer: jlong, - sql: JString, - callback: JObject, + sql: &JString, + callback: &JObject, ) { let runtime = unsafe { &mut *(runtime as *mut Runtime) }; let sql: String = env @@ -117,7 +125,7 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_qu callback, "callback", "(Ljava/lang/String;J)V", - &[err_message.into(), dataframe.into()], + &[(&err_message).into(), dataframe.into()], ) } } @@ -127,7 +135,7 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_qu #[no_mangle] pub extern "system" fn Java_org_apache_arrow_datafusion_SessionContexts_destroySessionContext( _env: JNIEnv, - _class: JClass, + _class: &JClass, pointer: jlong, ) { let _ = unsafe { Box::from_raw(pointer as *mut SessionContext) }; @@ -136,7 +144,7 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_SessionContexts_destroyS #[no_mangle] pub extern "system" fn Java_org_apache_arrow_datafusion_SessionContexts_createSessionContext( _env: JNIEnv, - _class: JClass, + _class: &JClass, ) -> jlong { let context = SessionContext::new(); Box::into_raw(Box::new(context)) as jlong diff --git a/datafusion-jni/src/dataframe.rs b/datafusion-jni/src/dataframe.rs index 3a3d8fd..00fadb7 100644 --- a/datafusion-jni/src/dataframe.rs +++ b/datafusion-jni/src/dataframe.rs @@ -1,7 +1,7 @@ use arrow::ipc::writer::FileWriter; use datafusion::dataframe::DataFrame; use datafusion::prelude::SessionContext; -use jni::objects::{JClass, JObject, JString, JValue}; +use jni::objects::{JClass, JObject, JString}; use jni::sys::jlong; use jni::JNIEnv; use std::convert::Into; @@ -12,11 +12,11 @@ use tokio::runtime::Runtime; #[no_mangle] pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_collectDataframe( - env: JNIEnv, - _class: JClass, + mut env: JNIEnv, + _class: &JClass, runtime: jlong, dataframe: jlong, - callback: JObject, + callback: &JObject, ) { let runtime = unsafe { &mut *(runtime as *mut Runtime) }; let dataframe = unsafe { &mut *(dataframe as *mut Arc) }; @@ -39,13 +39,13 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_collectDatafr .new_string("".to_string()) .expect("Couldn't create java string!"); let ba = env - .byte_array_from_slice(&buff.get_ref()) + .byte_array_from_slice(buff.get_ref()) .expect("cannot create empty byte array"); env.call_method( callback, "accept", "(Ljava/lang/Object;Ljava/lang/Object;)V", - &[err_message.into(), ba.into()], + &[(&err_message).into(), (&ba).into()], ) .expect("failed to call method"); }); @@ -53,30 +53,28 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_collectDatafr #[no_mangle] pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_showDataframe( - env: JNIEnv, - _class: JClass, + mut env: JNIEnv, + _class: &JClass, runtime: jlong, dataframe: jlong, - callback: JObject, + callback: &JObject, ) { let runtime = unsafe { &mut *(runtime as *mut Runtime) }; let dataframe = unsafe { &mut *(dataframe as *mut Arc) }; runtime.block_on(async { let r = dataframe.show().await; - let err_message: JValue = match r { - Ok(_) => JValue::Void, - Err(err) => { - let err_message = env - .new_string(err.to_string()) - .expect("Couldn't create java string!"); - err_message.into() - } + let err_message = match r { + Ok(_) => "".to_string(), + Err(err) => err.to_string(), }; + let err_message = env + .new_string(err_message) + .expect("Couldn't create java string!"); env.call_method( callback, "accept", "(Ljava/lang/Object;)V", - &[err_message.into()], + &[(&err_message).into()], ) .expect("failed to call method"); }); @@ -84,12 +82,12 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_showDataframe #[no_mangle] pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_writeParquet( - env: JNIEnv, - _class: JClass, + mut env: JNIEnv, + _class: &JClass, runtime: jlong, dataframe: jlong, - path: JString, - callback: JObject, + path: &JString, + callback: &JObject, ) { let runtime = unsafe { &mut *(runtime as *mut Runtime) }; let dataframe = unsafe { &mut *(dataframe as *mut Arc) }; @@ -99,20 +97,18 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_writeParquet( .into(); runtime.block_on(async { let r = dataframe.write_parquet(&path, None).await; - let err_message: JValue = match r { - Ok(_) => JValue::Void, - Err(err) => { - let err_message = env - .new_string(err.to_string()) - .expect("Couldn't create java string!"); - err_message.into() - } + let err_message = match r { + Ok(_) => "".to_string(), + Err(err) => err.to_string(), }; + let err_message = env + .new_string(err_message) + .expect("Couldn't create java string!"); env.call_method( callback, "accept", "(Ljava/lang/Object;)V", - &[err_message.into()], + &[(&err_message).into()], ) .expect("failed to call method"); }); @@ -120,12 +116,12 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_writeParquet( #[no_mangle] pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_writeCsv( - env: JNIEnv, - _class: JClass, + mut env: JNIEnv, + _class: &JClass, runtime: jlong, dataframe: jlong, - path: JString, - callback: JObject, + path: &JString, + callback: &JObject, ) { let runtime = unsafe { &mut *(runtime as *mut Runtime) }; let dataframe = unsafe { &mut *(dataframe as *mut Arc) }; @@ -135,20 +131,18 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_writeCsv( .into(); runtime.block_on(async { let r = dataframe.write_csv(&path).await; - let err_message: JValue = match r { - Ok(_) => JValue::Void, - Err(err) => { - let err_message = env - .new_string(err.to_string()) - .expect("Couldn't create java string!"); - err_message.into() - } + let err_message = match r { + Ok(_) => "".to_string(), + Err(err) => err.to_string(), }; + let err_message = env + .new_string(err_message) + .expect("Couldn't create java string!"); env.call_method( callback, "accept", "(Ljava/lang/Object;)V", - &[err_message.into()], + &[(&err_message).into()], ) .expect("failed to call method"); }); @@ -156,13 +150,13 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_writeCsv( #[no_mangle] pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_registerTable( - env: JNIEnv, - _class: JClass, + mut env: JNIEnv, + _class: &JClass, runtime: jlong, dataframe: jlong, session: jlong, - name: JString, - callback: JObject, + name: &JString, + callback: &JObject, ) { let runtime = unsafe { &mut *(runtime as *mut Runtime) }; let dataframe = unsafe { &mut *(dataframe as *mut Arc) }; @@ -173,20 +167,18 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_registerTable .into(); runtime.block_on(async { let r = context.register_table(name.as_str(), dataframe.clone()); - let err_message: JValue = match r { - Ok(_) => JValue::Void, - Err(err) => { - let err_message = env - .new_string(err.to_string()) - .expect("Couldn't create java string!"); - err_message.into() - } + let err_message = match r { + Ok(_) => "".to_string(), + Err(err) => err.to_string(), }; + let err_message = env + .new_string(err_message) + .expect("Couldn't create java string!"); env.call_method( callback, "accept", "(Ljava/lang/Object;)V", - &[err_message.into()], + &[(&err_message).into()], ) .expect("failed to call method"); }); @@ -195,7 +187,7 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_registerTable #[no_mangle] pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_destroyDataFrame( _env: JNIEnv, - _class: JClass, + _class: &JClass, pointer: jlong, ) { let _ = unsafe { Box::from_raw(pointer as *mut Arc) }; From 1f0692abb571609ce12fb27a8d012f550c45427d Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Fri, 1 Sep 2023 15:58:13 +0800 Subject: [PATCH 2/7] use no reference --- datafusion-jni/src/context.rs | 42 ++++++++++++++++----------------- datafusion-jni/src/dataframe.rs | 34 +++++++++++++------------- 2 files changed, 37 insertions(+), 39 deletions(-) diff --git a/datafusion-jni/src/context.rs b/datafusion-jni/src/context.rs index 07d614b..7bd2ced 100644 --- a/datafusion-jni/src/context.rs +++ b/datafusion-jni/src/context.rs @@ -6,24 +6,22 @@ use jni::JNIEnv; use tokio::runtime::Runtime; #[no_mangle] -pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_registerCsv< - 'local, ->( - mut env: JNIEnv<'local>, - _class: &JClass<'local>, +pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_registerCsv( + mut env: JNIEnv, + _class: JClass, runtime: jlong, pointer: jlong, - name: &JString<'local>, - path: &JString<'local>, - callback: &JObject<'local>, + name: JString, + path: JString, + callback: JObject, ) { let runtime = unsafe { &mut *(runtime as *mut Runtime) }; let name: String = env - .get_string(name) + .get_string(&name) .expect("Couldn't get name as string!") .into(); let path: String = env - .get_string(path) + .get_string(&path) .expect("Couldn't get name as string!") .into(); let context = unsafe { &mut *(pointer as *mut SessionContext) }; @@ -51,20 +49,20 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_re #[no_mangle] pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_registerParquet( mut env: JNIEnv, - _class: &JClass, + _class: JClass, runtime: jlong, pointer: jlong, - name: &JString, - path: &JString, - callback: &JObject, + name: JString, + path: JString, + callback: JObject, ) { let runtime = unsafe { &mut *(runtime as *mut Runtime) }; let name: String = env - .get_string(name) + .get_string(&name) .expect("Couldn't get name as string!") .into(); let path: String = env - .get_string(path) + .get_string(&path) .expect("Couldn't get path as string!") .into(); let context = unsafe { &mut *(pointer as *mut SessionContext) }; @@ -92,15 +90,15 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_re #[no_mangle] pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_querySql( mut env: JNIEnv, - _class: &JClass, + _class: JClass, runtime: jlong, pointer: jlong, - sql: &JString, - callback: &JObject, + sql: JString, + callback: JObject, ) { let runtime = unsafe { &mut *(runtime as *mut Runtime) }; let sql: String = env - .get_string(sql) + .get_string(&sql) .expect("Couldn't get sql as string!") .into(); let context = unsafe { &mut *(pointer as *mut SessionContext) }; @@ -135,7 +133,7 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_qu #[no_mangle] pub extern "system" fn Java_org_apache_arrow_datafusion_SessionContexts_destroySessionContext( _env: JNIEnv, - _class: &JClass, + _class: JClass, pointer: jlong, ) { let _ = unsafe { Box::from_raw(pointer as *mut SessionContext) }; @@ -144,7 +142,7 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_SessionContexts_destroyS #[no_mangle] pub extern "system" fn Java_org_apache_arrow_datafusion_SessionContexts_createSessionContext( _env: JNIEnv, - _class: &JClass, + _class: JClass, ) -> jlong { let context = SessionContext::new(); Box::into_raw(Box::new(context)) as jlong diff --git a/datafusion-jni/src/dataframe.rs b/datafusion-jni/src/dataframe.rs index 00fadb7..5a4aa7c 100644 --- a/datafusion-jni/src/dataframe.rs +++ b/datafusion-jni/src/dataframe.rs @@ -13,10 +13,10 @@ use tokio::runtime::Runtime; #[no_mangle] pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_collectDataframe( mut env: JNIEnv, - _class: &JClass, + _class: JClass, runtime: jlong, dataframe: jlong, - callback: &JObject, + callback: JObject, ) { let runtime = unsafe { &mut *(runtime as *mut Runtime) }; let dataframe = unsafe { &mut *(dataframe as *mut Arc) }; @@ -54,10 +54,10 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_collectDatafr #[no_mangle] pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_showDataframe( mut env: JNIEnv, - _class: &JClass, + _class: JClass, runtime: jlong, dataframe: jlong, - callback: &JObject, + callback: JObject, ) { let runtime = unsafe { &mut *(runtime as *mut Runtime) }; let dataframe = unsafe { &mut *(dataframe as *mut Arc) }; @@ -83,16 +83,16 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_showDataframe #[no_mangle] pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_writeParquet( mut env: JNIEnv, - _class: &JClass, + _class: JClass, runtime: jlong, dataframe: jlong, - path: &JString, - callback: &JObject, + path: JString, + callback: JObject, ) { let runtime = unsafe { &mut *(runtime as *mut Runtime) }; let dataframe = unsafe { &mut *(dataframe as *mut Arc) }; let path: String = env - .get_string(path) + .get_string(&path) .expect("Couldn't get path as string!") .into(); runtime.block_on(async { @@ -117,16 +117,16 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_writeParquet( #[no_mangle] pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_writeCsv( mut env: JNIEnv, - _class: &JClass, + _class: JClass, runtime: jlong, dataframe: jlong, - path: &JString, - callback: &JObject, + path: JString, + callback: JObject, ) { let runtime = unsafe { &mut *(runtime as *mut Runtime) }; let dataframe = unsafe { &mut *(dataframe as *mut Arc) }; let path: String = env - .get_string(path) + .get_string(&path) .expect("Couldn't get path as string!") .into(); runtime.block_on(async { @@ -151,18 +151,18 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_writeCsv( #[no_mangle] pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_registerTable( mut env: JNIEnv, - _class: &JClass, + _class: JClass, runtime: jlong, dataframe: jlong, session: jlong, - name: &JString, - callback: &JObject, + name: JString, + callback: JObject, ) { let runtime = unsafe { &mut *(runtime as *mut Runtime) }; let dataframe = unsafe { &mut *(dataframe as *mut Arc) }; let context = unsafe { &mut *(session as *mut SessionContext) }; let name: String = env - .get_string(name) + .get_string(&name) .expect("Couldn't get name as string!") .into(); runtime.block_on(async { @@ -187,7 +187,7 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_registerTable #[no_mangle] pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_destroyDataFrame( _env: JNIEnv, - _class: &JClass, + _class: JClass, pointer: jlong, ) { let _ = unsafe { Box::from_raw(pointer as *mut Arc) }; From 1533346e7c59133b4470c49c068fe91cbad827bc Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Fri, 1 Sep 2023 16:06:29 +0800 Subject: [PATCH 3/7] fix gradle build --- datafusion-java/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion-java/build.gradle b/datafusion-java/build.gradle index 892daaf..0425fc4 100644 --- a/datafusion-java/build.gradle +++ b/datafusion-java/build.gradle @@ -30,7 +30,7 @@ java { withSourcesJar() compileJava { - options.compilerArgs += ["-h", "${buildDir}/target/headers"] + options.compilerArgs += ["-h", "${layout.buildDirectory}/target/headers"] } } From 562fd5b9ebab2dcbb690801bbf78fea854ff0729 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Fri, 1 Sep 2023 18:04:41 +0800 Subject: [PATCH 4/7] write rs --- .../datafusion/examples/ExampleMain.java | 11 +- datafusion-java/build.gradle | 2 +- .../apache/arrow/datafusion/DataFrame.java | 9 +- .../apache/arrow/datafusion/DataFrames.java | 3 +- .../arrow/datafusion/DefaultDataFrame.java | 22 +-- .../datafusion/DefaultSessionContext.java | 157 ++++++++++-------- .../datafusion/DefaultTableProvider.java | 12 ++ .../arrow/datafusion/SessionContext.java | 12 ++ .../arrow/datafusion/TableProvider.java | 4 + .../arrow/datafusion/TableProviders.java | 8 + datafusion-jni/Cargo.toml | 4 +- datafusion-jni/src/context.rs | 28 ++++ datafusion-jni/src/dataframe.rs | 62 ++----- datafusion-jni/src/lib.rs | 1 + datafusion-jni/src/table_provider.rs | 14 ++ 15 files changed, 197 insertions(+), 152 deletions(-) create mode 100644 datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultTableProvider.java create mode 100644 datafusion-java/src/main/java/org/apache/arrow/datafusion/TableProvider.java create mode 100644 datafusion-java/src/main/java/org/apache/arrow/datafusion/TableProviders.java create mode 100644 datafusion-jni/src/table_provider.rs diff --git a/datafusion-examples/src/main/java/org/apache/arrow/datafusion/examples/ExampleMain.java b/datafusion-examples/src/main/java/org/apache/arrow/datafusion/examples/ExampleMain.java index d568405..b2f2e1a 100644 --- a/datafusion-examples/src/main/java/org/apache/arrow/datafusion/examples/ExampleMain.java +++ b/datafusion-examples/src/main/java/org/apache/arrow/datafusion/examples/ExampleMain.java @@ -55,7 +55,16 @@ public static void main(String[] args) throws Exception { context .sql("select * from test_parquet limit 3") - .thenComposeAsync(df -> df.registerTable(context, "test_parquet_limited")) + .thenAccept( + df -> { + try { + boolean previouslyRegistered = + context.registerTable("test_parquet_limited", df.intoView()).isPresent(); + assert !previouslyRegistered; + } catch (Exception e) { + throw new RuntimeException(e); + } + }) .join(); context.sql("select * from test_parquet_limited").thenComposeAsync(DataFrame::show).join(); diff --git a/datafusion-java/build.gradle b/datafusion-java/build.gradle index 0425fc4..0041a92 100644 --- a/datafusion-java/build.gradle +++ b/datafusion-java/build.gradle @@ -30,7 +30,7 @@ java { withSourcesJar() compileJava { - options.compilerArgs += ["-h", "${layout.buildDirectory}/target/headers"] + options.compilerArgs += ["-h", "${layout.buildDirectory.asFile.get()}/target/headers"] } } diff --git a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DataFrame.java b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DataFrame.java index 28c3b4d..f2d9750 100644 --- a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DataFrame.java +++ b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DataFrame.java @@ -45,11 +45,8 @@ public interface DataFrame extends NativeProxy { CompletableFuture writeCsv(Path path); /** - * Register this dataframe as a temporary table. - * - * @param context SessionContext to register table to - * @param name name of the tmp table - * @return null + * Converts this DataFrame into a TableProvider that can be registered as a table view using + * {@link SessionContext#registerParquet(String, Path)} */ - CompletableFuture registerTable(SessionContext context, String name); + TableProvider intoView(); } diff --git a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DataFrames.java b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DataFrames.java index 611f382..3791fb1 100644 --- a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DataFrames.java +++ b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DataFrames.java @@ -20,6 +20,5 @@ static native void writeParquet( static native void writeCsv(long runtime, long dataframe, String path, Consumer callback); - static native void registerTable( - long runtime, long dataframe, long context, String name, Consumer callback); + static native long intoView(long dataframe); } diff --git a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultDataFrame.java b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultDataFrame.java index 05c284e..ae67d9d 100644 --- a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultDataFrame.java +++ b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultDataFrame.java @@ -104,25 +104,11 @@ public CompletableFuture writeCsv(Path path) { return future; } - public CompletableFuture registerTable(SessionContext ctx, String name) { - Runtime runtime = context.getRuntime(); - long runtimePointer = runtime.getPointer(); + @Override + public TableProvider intoView() { long dataframe = getPointer(); - long contextPointer = ctx.getPointer(); - CompletableFuture future = new CompletableFuture<>(); - DataFrames.registerTable( - runtimePointer, - dataframe, - contextPointer, - name, - (String errString) -> { - if (containsError(errString)) { - future.completeExceptionally(new RuntimeException(errString)); - } else { - future.complete(null); - } - }); - return future; + long tableProviderPointer = DataFrames.intoView(dataframe); + return new DefaultTableProvider(tableProviderPointer); } @Override diff --git a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultSessionContext.java b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultSessionContext.java index fe4a33f..a13e19a 100644 --- a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultSessionContext.java +++ b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultSessionContext.java @@ -1,92 +1,107 @@ package org.apache.arrow.datafusion; import java.nio.file.Path; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; class DefaultSessionContext extends AbstractProxy implements SessionContext { - private static final Logger logger = LoggerFactory.getLogger(DefaultSessionContext.class); + private static final Logger logger = LoggerFactory.getLogger(DefaultSessionContext.class); + + static native void querySql( + long runtime, long context, String sql, ObjectResultCallback callback); - static native void querySql( - long runtime, long context, String sql, ObjectResultCallback callback); + static native void registerCsv( + long runtime, long context, String name, String path, Consumer callback); - static native void registerCsv( - long runtime, long context, String name, String path, Consumer callback); + static native void registerParquet( + long runtime, long context, String name, String path, Consumer callback); - static native void registerParquet( - long runtime, long context, String name, String path, Consumer callback); + static native long registerTable(long context, String name, long tableProvider) + throws Exception; + + @Override + public CompletableFuture sql(String sql) { + long runtime = getRuntime().getPointer(); + CompletableFuture future = new CompletableFuture<>(); + querySql( + runtime, + getPointer(), + sql, + (errMessage, dataframeId) -> { + if (null != errMessage && !errMessage.isEmpty()) { + future.completeExceptionally(new RuntimeException(errMessage)); + } else { + DefaultDataFrame frame = new DefaultDataFrame(DefaultSessionContext.this, dataframeId); + future.complete(frame); + } + }); + return future; + } - @Override - public CompletableFuture sql(String sql) { - long runtime = getRuntime().getPointer(); - CompletableFuture future = new CompletableFuture<>(); - querySql( - runtime, - getPointer(), - sql, - (errMessage, dataframeId) -> { - if (null != errMessage && !errMessage.isEmpty()) { + @Override + public CompletableFuture registerCsv(String name, Path path) { + long runtime = getRuntime().getPointer(); + CompletableFuture future = new CompletableFuture<>(); + registerCsv( + runtime, + getPointer(), + name, + path.toAbsolutePath().toString(), + (errMessage) -> voidCallback(future, errMessage)); + return future; + } + + @Override + public CompletableFuture registerParquet(String name, Path path) { + long runtime = getRuntime().getPointer(); + CompletableFuture future = new CompletableFuture<>(); + registerParquet( + runtime, + getPointer(), + name, + path.toAbsolutePath().toString(), + (errMessage) -> voidCallback(future, errMessage)); + return future; + } + + @Override + public Optional registerTable(String name, TableProvider tableProvider) + throws Exception { + long previouslyRegistered = registerTable(getPointer(), name, tableProvider.getPointer()); + if (previouslyRegistered == 0) { + return Optional.empty(); + } + return Optional.of(new DefaultTableProvider(previouslyRegistered)); + } + + private void voidCallback(CompletableFuture future, String errMessage) { + if (null != errMessage && !errMessage.isEmpty()) { future.completeExceptionally(new RuntimeException(errMessage)); - } else { - DefaultDataFrame frame = new DefaultDataFrame(DefaultSessionContext.this, dataframeId); - future.complete(frame); - } - }); - return future; - } - - @Override - public CompletableFuture registerCsv(String name, Path path) { - long runtime = getRuntime().getPointer(); - CompletableFuture future = new CompletableFuture<>(); - registerCsv( - runtime, - getPointer(), - name, - path.toAbsolutePath().toString(), - (errMessage) -> voidCallback(future, errMessage)); - return future; - } - - @Override - public CompletableFuture registerParquet(String name, Path path) { - long runtime = getRuntime().getPointer(); - CompletableFuture future = new CompletableFuture<>(); - registerParquet( - runtime, - getPointer(), - name, - path.toAbsolutePath().toString(), - (errMessage) -> voidCallback(future, errMessage)); - return future; - } - - private void voidCallback(CompletableFuture future, String errMessage) { - if (null != errMessage && !errMessage.isEmpty()) { - future.completeExceptionally(new RuntimeException(errMessage)); - } else { - future.complete(null); + } else { + future.complete(null); + } } - } - @Override - public Runtime getRuntime() { - return runtime; - } + @Override + public Runtime getRuntime() { + return runtime; + } - private final TokioRuntime runtime; + private final TokioRuntime runtime; - DefaultSessionContext(long pointer) { - super(pointer); - this.runtime = TokioRuntime.create(); - registerChild(runtime); - } + DefaultSessionContext(long pointer) { + super(pointer); + this.runtime = TokioRuntime.create(); + registerChild(runtime); + } - @Override - void doClose(long pointer) throws Exception { - SessionContexts.destroySessionContext(pointer); - } + @Override + void doClose(long pointer) throws Exception { + SessionContexts.destroySessionContext(pointer); + } } diff --git a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultTableProvider.java b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultTableProvider.java new file mode 100644 index 0000000..a07af64 --- /dev/null +++ b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultTableProvider.java @@ -0,0 +1,12 @@ +package org.apache.arrow.datafusion; + +class DefaultTableProvider extends AbstractProxy implements TableProvider { + DefaultTableProvider(long pointer) { + super(pointer); + } + + @Override + void doClose(long pointer) throws Exception { + TableProviders.destroyTableProvider(pointer); + } +} diff --git a/datafusion-java/src/main/java/org/apache/arrow/datafusion/SessionContext.java b/datafusion-java/src/main/java/org/apache/arrow/datafusion/SessionContext.java index 3dcd462..e13d8d6 100644 --- a/datafusion-java/src/main/java/org/apache/arrow/datafusion/SessionContext.java +++ b/datafusion-java/src/main/java/org/apache/arrow/datafusion/SessionContext.java @@ -1,6 +1,7 @@ package org.apache.arrow.datafusion; import java.nio.file.Path; +import java.util.Optional; import java.util.concurrent.CompletableFuture; /** A session context holds resources and is the entrance for obtaining {@link DataFrame} */ @@ -32,6 +33,17 @@ public interface SessionContext extends AutoCloseable, NativeProxy { */ CompletableFuture registerParquet(String name, Path path); + /** + * Registers a TableProvider as a table that can be referenced from SQL statements executed + * against this context. + * + * @param name table reference + * @param tableProvider table provider + * @return the TableProvider previously registered for this reference, if any + * @throws Exception when exception happened + */ + Optional registerTable(String name, TableProvider tableProvider) throws Exception; + /** * Get the runtime associated with this context * diff --git a/datafusion-java/src/main/java/org/apache/arrow/datafusion/TableProvider.java b/datafusion-java/src/main/java/org/apache/arrow/datafusion/TableProvider.java new file mode 100644 index 0000000..acdd5c8 --- /dev/null +++ b/datafusion-java/src/main/java/org/apache/arrow/datafusion/TableProvider.java @@ -0,0 +1,4 @@ +package org.apache.arrow.datafusion; + +/** vague interface that maps to {@code Arc}. */ +public interface TableProvider extends NativeProxy {} diff --git a/datafusion-java/src/main/java/org/apache/arrow/datafusion/TableProviders.java b/datafusion-java/src/main/java/org/apache/arrow/datafusion/TableProviders.java new file mode 100644 index 0000000..63750c8 --- /dev/null +++ b/datafusion-java/src/main/java/org/apache/arrow/datafusion/TableProviders.java @@ -0,0 +1,8 @@ +package org.apache.arrow.datafusion; + +class TableProviders { + + private TableProviders() {} + + static native void destroyTableProvider(long pointer); +} diff --git a/datafusion-jni/Cargo.toml b/datafusion-jni/Cargo.toml index cf445f9..1615ee1 100644 --- a/datafusion-jni/Cargo.toml +++ b/datafusion-jni/Cargo.toml @@ -12,8 +12,8 @@ edition = "2021" [dependencies] jni = "^0.21.0" tokio = "^1.32.0" -arrow = "^22.0" -datafusion = "^12.0" +arrow = "^36.0" +datafusion = "^22.0" [lib] crate_type = ["cdylib"] diff --git a/datafusion-jni/src/context.rs b/datafusion-jni/src/context.rs index 7bd2ced..868c1ff 100644 --- a/datafusion-jni/src/context.rs +++ b/datafusion-jni/src/context.rs @@ -1,8 +1,10 @@ +use datafusion::datasource::TableProvider; use datafusion::execution::context::SessionContext; use datafusion::prelude::{CsvReadOptions, ParquetReadOptions}; use jni::objects::{JClass, JObject, JString, JValue}; use jni::sys::jlong; use jni::JNIEnv; +use std::sync::Arc; use tokio::runtime::Runtime; #[no_mangle] @@ -46,6 +48,32 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_re }); } +#[no_mangle] +pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_registerTable( + mut env: JNIEnv, + _class: JClass, + pointer: jlong, + name: JString, + table_provider: jlong, +) -> jlong { + let name: String = env + .get_string(&name) + .expect("Couldn't get name as string!") + .into(); + let context = unsafe { &mut *(pointer as *mut SessionContext) }; + let table_provider = unsafe { &*(table_provider as *const Arc) }; + let result = context.register_table(&name, table_provider.clone()); + match result { + Ok(Some(v)) => Box::into_raw(Box::new(v)) as jlong, + Ok(None) => 0, + Err(err) => { + env.throw_new("java/lang/Exception", err.to_string()) + .unwrap(); + 0 + } + } +} + #[no_mangle] pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_registerParquet( mut env: JNIEnv, diff --git a/datafusion-jni/src/dataframe.rs b/datafusion-jni/src/dataframe.rs index 5a4aa7c..d75ec55 100644 --- a/datafusion-jni/src/dataframe.rs +++ b/datafusion-jni/src/dataframe.rs @@ -1,13 +1,12 @@ use arrow::ipc::writer::FileWriter; use datafusion::dataframe::DataFrame; -use datafusion::prelude::SessionContext; +use datafusion::error::DataFusionError; use jni::objects::{JClass, JObject, JString}; use jni::sys::jlong; use jni::JNIEnv; use std::convert::Into; use std::io::BufWriter; use std::io::Cursor; -use std::sync::Arc; use tokio::runtime::Runtime; #[no_mangle] @@ -19,10 +18,11 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_collectDatafr callback: JObject, ) { let runtime = unsafe { &mut *(runtime as *mut Runtime) }; - let dataframe = unsafe { &mut *(dataframe as *mut Arc) }; + let dataframe = unsafe { &mut *(dataframe as *mut DataFrame) }; let schema = dataframe.schema().into(); runtime.block_on(async { let batches = dataframe + .clone() .collect() .await .expect("failed to collect dataframe"); @@ -60,9 +60,9 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_showDataframe callback: JObject, ) { let runtime = unsafe { &mut *(runtime as *mut Runtime) }; - let dataframe = unsafe { &mut *(dataframe as *mut Arc) }; + let dataframe = unsafe { & *(dataframe as *const DataFrame) }; runtime.block_on(async { - let r = dataframe.show().await; + let r = dataframe.clone().show().await; let err_message = match r { Ok(_) => "".to_string(), Err(err) => err.to_string(), @@ -90,13 +90,13 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_writeParquet( callback: JObject, ) { let runtime = unsafe { &mut *(runtime as *mut Runtime) }; - let dataframe = unsafe { &mut *(dataframe as *mut Arc) }; + let dataframe = unsafe { &*(dataframe as *const DataFrame) }; let path: String = env .get_string(&path) .expect("Couldn't get path as string!") .into(); runtime.block_on(async { - let r = dataframe.write_parquet(&path, None).await; + let r = dataframe.clone().write_parquet(&path, None).await; let err_message = match r { Ok(_) => "".to_string(), Err(err) => err.to_string(), @@ -124,55 +124,15 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_writeCsv( callback: JObject, ) { let runtime = unsafe { &mut *(runtime as *mut Runtime) }; - let dataframe = unsafe { &mut *(dataframe as *mut Arc) }; + let dataframe = unsafe { &*(dataframe as *const DataFrame) }; let path: String = env .get_string(&path) .expect("Couldn't get path as string!") .into(); runtime.block_on(async { - let r = dataframe.write_csv(&path).await; - let err_message = match r { - Ok(_) => "".to_string(), - Err(err) => err.to_string(), - }; + dataframe.clone().write_csv(&path).await; let err_message = env - .new_string(err_message) - .expect("Couldn't create java string!"); - env.call_method( - callback, - "accept", - "(Ljava/lang/Object;)V", - &[(&err_message).into()], - ) - .expect("failed to call method"); - }); -} - -#[no_mangle] -pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_registerTable( - mut env: JNIEnv, - _class: JClass, - runtime: jlong, - dataframe: jlong, - session: jlong, - name: JString, - callback: JObject, -) { - let runtime = unsafe { &mut *(runtime as *mut Runtime) }; - let dataframe = unsafe { &mut *(dataframe as *mut Arc) }; - let context = unsafe { &mut *(session as *mut SessionContext) }; - let name: String = env - .get_string(&name) - .expect("Couldn't get name as string!") - .into(); - runtime.block_on(async { - let r = context.register_table(name.as_str(), dataframe.clone()); - let err_message = match r { - Ok(_) => "".to_string(), - Err(err) => err.to_string(), - }; - let err_message = env - .new_string(err_message) + .new_string("".to_string()) .expect("Couldn't create java string!"); env.call_method( callback, @@ -190,5 +150,5 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_destroyDataFr _class: JClass, pointer: jlong, ) { - let _ = unsafe { Box::from_raw(pointer as *mut Arc) }; + let _ = unsafe { Box::from_raw(pointer as *mut DataFrame) }; } diff --git a/datafusion-jni/src/lib.rs b/datafusion-jni/src/lib.rs index ac30a3f..dbf3e92 100644 --- a/datafusion-jni/src/lib.rs +++ b/datafusion-jni/src/lib.rs @@ -1,3 +1,4 @@ mod context; mod dataframe; mod runtime; +mod table_provider; diff --git a/datafusion-jni/src/table_provider.rs b/datafusion-jni/src/table_provider.rs new file mode 100644 index 0000000..0db384d --- /dev/null +++ b/datafusion-jni/src/table_provider.rs @@ -0,0 +1,14 @@ +use datafusion::datasource::TableProvider; +use jni::objects::JClass; +use jni::sys::jlong; +use jni::JNIEnv; +use std::sync::Arc; + +#[no_mangle] +pub extern "system" fn Java_org_apache_arrow_datafusion_TableProviders_destroyTableProvider( + _env: JNIEnv, + _class: JClass, + pointer: jlong, +) { + let _ = unsafe { Box::from_raw(pointer as *mut Arc) }; +} From 8a23c93bf0cbd41ccd2691b51fd2649f0b00615a Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Fri, 1 Sep 2023 18:06:21 +0800 Subject: [PATCH 5/7] Revert "write rs" This reverts commit 562fd5b9ebab2dcbb690801bbf78fea854ff0729. --- .../datafusion/examples/ExampleMain.java | 11 +- datafusion-java/build.gradle | 2 +- .../apache/arrow/datafusion/DataFrame.java | 9 +- .../apache/arrow/datafusion/DataFrames.java | 3 +- .../arrow/datafusion/DefaultDataFrame.java | 22 ++- .../datafusion/DefaultSessionContext.java | 157 ++++++++---------- .../datafusion/DefaultTableProvider.java | 12 -- .../arrow/datafusion/SessionContext.java | 12 -- .../arrow/datafusion/TableProvider.java | 4 - .../arrow/datafusion/TableProviders.java | 8 - datafusion-jni/Cargo.toml | 4 +- datafusion-jni/src/context.rs | 28 ---- datafusion-jni/src/dataframe.rs | 62 +++++-- datafusion-jni/src/lib.rs | 1 - datafusion-jni/src/table_provider.rs | 14 -- 15 files changed, 152 insertions(+), 197 deletions(-) delete mode 100644 datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultTableProvider.java delete mode 100644 datafusion-java/src/main/java/org/apache/arrow/datafusion/TableProvider.java delete mode 100644 datafusion-java/src/main/java/org/apache/arrow/datafusion/TableProviders.java delete mode 100644 datafusion-jni/src/table_provider.rs diff --git a/datafusion-examples/src/main/java/org/apache/arrow/datafusion/examples/ExampleMain.java b/datafusion-examples/src/main/java/org/apache/arrow/datafusion/examples/ExampleMain.java index b2f2e1a..d568405 100644 --- a/datafusion-examples/src/main/java/org/apache/arrow/datafusion/examples/ExampleMain.java +++ b/datafusion-examples/src/main/java/org/apache/arrow/datafusion/examples/ExampleMain.java @@ -55,16 +55,7 @@ public static void main(String[] args) throws Exception { context .sql("select * from test_parquet limit 3") - .thenAccept( - df -> { - try { - boolean previouslyRegistered = - context.registerTable("test_parquet_limited", df.intoView()).isPresent(); - assert !previouslyRegistered; - } catch (Exception e) { - throw new RuntimeException(e); - } - }) + .thenComposeAsync(df -> df.registerTable(context, "test_parquet_limited")) .join(); context.sql("select * from test_parquet_limited").thenComposeAsync(DataFrame::show).join(); diff --git a/datafusion-java/build.gradle b/datafusion-java/build.gradle index 0041a92..0425fc4 100644 --- a/datafusion-java/build.gradle +++ b/datafusion-java/build.gradle @@ -30,7 +30,7 @@ java { withSourcesJar() compileJava { - options.compilerArgs += ["-h", "${layout.buildDirectory.asFile.get()}/target/headers"] + options.compilerArgs += ["-h", "${layout.buildDirectory}/target/headers"] } } diff --git a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DataFrame.java b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DataFrame.java index f2d9750..28c3b4d 100644 --- a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DataFrame.java +++ b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DataFrame.java @@ -45,8 +45,11 @@ public interface DataFrame extends NativeProxy { CompletableFuture writeCsv(Path path); /** - * Converts this DataFrame into a TableProvider that can be registered as a table view using - * {@link SessionContext#registerParquet(String, Path)} + * Register this dataframe as a temporary table. + * + * @param context SessionContext to register table to + * @param name name of the tmp table + * @return null */ - TableProvider intoView(); + CompletableFuture registerTable(SessionContext context, String name); } diff --git a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DataFrames.java b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DataFrames.java index 3791fb1..611f382 100644 --- a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DataFrames.java +++ b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DataFrames.java @@ -20,5 +20,6 @@ static native void writeParquet( static native void writeCsv(long runtime, long dataframe, String path, Consumer callback); - static native long intoView(long dataframe); + static native void registerTable( + long runtime, long dataframe, long context, String name, Consumer callback); } diff --git a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultDataFrame.java b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultDataFrame.java index ae67d9d..05c284e 100644 --- a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultDataFrame.java +++ b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultDataFrame.java @@ -104,11 +104,25 @@ public CompletableFuture writeCsv(Path path) { return future; } - @Override - public TableProvider intoView() { + public CompletableFuture registerTable(SessionContext ctx, String name) { + Runtime runtime = context.getRuntime(); + long runtimePointer = runtime.getPointer(); long dataframe = getPointer(); - long tableProviderPointer = DataFrames.intoView(dataframe); - return new DefaultTableProvider(tableProviderPointer); + long contextPointer = ctx.getPointer(); + CompletableFuture future = new CompletableFuture<>(); + DataFrames.registerTable( + runtimePointer, + dataframe, + contextPointer, + name, + (String errString) -> { + if (containsError(errString)) { + future.completeExceptionally(new RuntimeException(errString)); + } else { + future.complete(null); + } + }); + return future; } @Override diff --git a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultSessionContext.java b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultSessionContext.java index a13e19a..fe4a33f 100644 --- a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultSessionContext.java +++ b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultSessionContext.java @@ -1,107 +1,92 @@ package org.apache.arrow.datafusion; import java.nio.file.Path; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; class DefaultSessionContext extends AbstractProxy implements SessionContext { - private static final Logger logger = LoggerFactory.getLogger(DefaultSessionContext.class); - - static native void querySql( - long runtime, long context, String sql, ObjectResultCallback callback); + private static final Logger logger = LoggerFactory.getLogger(DefaultSessionContext.class); - static native void registerCsv( - long runtime, long context, String name, String path, Consumer callback); + static native void querySql( + long runtime, long context, String sql, ObjectResultCallback callback); - static native void registerParquet( - long runtime, long context, String name, String path, Consumer callback); + static native void registerCsv( + long runtime, long context, String name, String path, Consumer callback); - static native long registerTable(long context, String name, long tableProvider) - throws Exception; - - @Override - public CompletableFuture sql(String sql) { - long runtime = getRuntime().getPointer(); - CompletableFuture future = new CompletableFuture<>(); - querySql( - runtime, - getPointer(), - sql, - (errMessage, dataframeId) -> { - if (null != errMessage && !errMessage.isEmpty()) { - future.completeExceptionally(new RuntimeException(errMessage)); - } else { - DefaultDataFrame frame = new DefaultDataFrame(DefaultSessionContext.this, dataframeId); - future.complete(frame); - } - }); - return future; - } + static native void registerParquet( + long runtime, long context, String name, String path, Consumer callback); - @Override - public CompletableFuture registerCsv(String name, Path path) { - long runtime = getRuntime().getPointer(); - CompletableFuture future = new CompletableFuture<>(); - registerCsv( - runtime, - getPointer(), - name, - path.toAbsolutePath().toString(), - (errMessage) -> voidCallback(future, errMessage)); - return future; - } - - @Override - public CompletableFuture registerParquet(String name, Path path) { - long runtime = getRuntime().getPointer(); - CompletableFuture future = new CompletableFuture<>(); - registerParquet( - runtime, - getPointer(), - name, - path.toAbsolutePath().toString(), - (errMessage) -> voidCallback(future, errMessage)); - return future; - } - - @Override - public Optional registerTable(String name, TableProvider tableProvider) - throws Exception { - long previouslyRegistered = registerTable(getPointer(), name, tableProvider.getPointer()); - if (previouslyRegistered == 0) { - return Optional.empty(); - } - return Optional.of(new DefaultTableProvider(previouslyRegistered)); - } - - private void voidCallback(CompletableFuture future, String errMessage) { - if (null != errMessage && !errMessage.isEmpty()) { + @Override + public CompletableFuture sql(String sql) { + long runtime = getRuntime().getPointer(); + CompletableFuture future = new CompletableFuture<>(); + querySql( + runtime, + getPointer(), + sql, + (errMessage, dataframeId) -> { + if (null != errMessage && !errMessage.isEmpty()) { future.completeExceptionally(new RuntimeException(errMessage)); - } else { - future.complete(null); - } + } else { + DefaultDataFrame frame = new DefaultDataFrame(DefaultSessionContext.this, dataframeId); + future.complete(frame); + } + }); + return future; + } + + @Override + public CompletableFuture registerCsv(String name, Path path) { + long runtime = getRuntime().getPointer(); + CompletableFuture future = new CompletableFuture<>(); + registerCsv( + runtime, + getPointer(), + name, + path.toAbsolutePath().toString(), + (errMessage) -> voidCallback(future, errMessage)); + return future; + } + + @Override + public CompletableFuture registerParquet(String name, Path path) { + long runtime = getRuntime().getPointer(); + CompletableFuture future = new CompletableFuture<>(); + registerParquet( + runtime, + getPointer(), + name, + path.toAbsolutePath().toString(), + (errMessage) -> voidCallback(future, errMessage)); + return future; + } + + private void voidCallback(CompletableFuture future, String errMessage) { + if (null != errMessage && !errMessage.isEmpty()) { + future.completeExceptionally(new RuntimeException(errMessage)); + } else { + future.complete(null); } + } - @Override - public Runtime getRuntime() { - return runtime; - } + @Override + public Runtime getRuntime() { + return runtime; + } - private final TokioRuntime runtime; + private final TokioRuntime runtime; - DefaultSessionContext(long pointer) { - super(pointer); - this.runtime = TokioRuntime.create(); - registerChild(runtime); - } + DefaultSessionContext(long pointer) { + super(pointer); + this.runtime = TokioRuntime.create(); + registerChild(runtime); + } - @Override - void doClose(long pointer) throws Exception { - SessionContexts.destroySessionContext(pointer); - } + @Override + void doClose(long pointer) throws Exception { + SessionContexts.destroySessionContext(pointer); + } } diff --git a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultTableProvider.java b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultTableProvider.java deleted file mode 100644 index a07af64..0000000 --- a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultTableProvider.java +++ /dev/null @@ -1,12 +0,0 @@ -package org.apache.arrow.datafusion; - -class DefaultTableProvider extends AbstractProxy implements TableProvider { - DefaultTableProvider(long pointer) { - super(pointer); - } - - @Override - void doClose(long pointer) throws Exception { - TableProviders.destroyTableProvider(pointer); - } -} diff --git a/datafusion-java/src/main/java/org/apache/arrow/datafusion/SessionContext.java b/datafusion-java/src/main/java/org/apache/arrow/datafusion/SessionContext.java index e13d8d6..3dcd462 100644 --- a/datafusion-java/src/main/java/org/apache/arrow/datafusion/SessionContext.java +++ b/datafusion-java/src/main/java/org/apache/arrow/datafusion/SessionContext.java @@ -1,7 +1,6 @@ package org.apache.arrow.datafusion; import java.nio.file.Path; -import java.util.Optional; import java.util.concurrent.CompletableFuture; /** A session context holds resources and is the entrance for obtaining {@link DataFrame} */ @@ -33,17 +32,6 @@ public interface SessionContext extends AutoCloseable, NativeProxy { */ CompletableFuture registerParquet(String name, Path path); - /** - * Registers a TableProvider as a table that can be referenced from SQL statements executed - * against this context. - * - * @param name table reference - * @param tableProvider table provider - * @return the TableProvider previously registered for this reference, if any - * @throws Exception when exception happened - */ - Optional registerTable(String name, TableProvider tableProvider) throws Exception; - /** * Get the runtime associated with this context * diff --git a/datafusion-java/src/main/java/org/apache/arrow/datafusion/TableProvider.java b/datafusion-java/src/main/java/org/apache/arrow/datafusion/TableProvider.java deleted file mode 100644 index acdd5c8..0000000 --- a/datafusion-java/src/main/java/org/apache/arrow/datafusion/TableProvider.java +++ /dev/null @@ -1,4 +0,0 @@ -package org.apache.arrow.datafusion; - -/** vague interface that maps to {@code Arc}. */ -public interface TableProvider extends NativeProxy {} diff --git a/datafusion-java/src/main/java/org/apache/arrow/datafusion/TableProviders.java b/datafusion-java/src/main/java/org/apache/arrow/datafusion/TableProviders.java deleted file mode 100644 index 63750c8..0000000 --- a/datafusion-java/src/main/java/org/apache/arrow/datafusion/TableProviders.java +++ /dev/null @@ -1,8 +0,0 @@ -package org.apache.arrow.datafusion; - -class TableProviders { - - private TableProviders() {} - - static native void destroyTableProvider(long pointer); -} diff --git a/datafusion-jni/Cargo.toml b/datafusion-jni/Cargo.toml index 1615ee1..cf445f9 100644 --- a/datafusion-jni/Cargo.toml +++ b/datafusion-jni/Cargo.toml @@ -12,8 +12,8 @@ edition = "2021" [dependencies] jni = "^0.21.0" tokio = "^1.32.0" -arrow = "^36.0" -datafusion = "^22.0" +arrow = "^22.0" +datafusion = "^12.0" [lib] crate_type = ["cdylib"] diff --git a/datafusion-jni/src/context.rs b/datafusion-jni/src/context.rs index 868c1ff..7bd2ced 100644 --- a/datafusion-jni/src/context.rs +++ b/datafusion-jni/src/context.rs @@ -1,10 +1,8 @@ -use datafusion::datasource::TableProvider; use datafusion::execution::context::SessionContext; use datafusion::prelude::{CsvReadOptions, ParquetReadOptions}; use jni::objects::{JClass, JObject, JString, JValue}; use jni::sys::jlong; use jni::JNIEnv; -use std::sync::Arc; use tokio::runtime::Runtime; #[no_mangle] @@ -48,32 +46,6 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_re }); } -#[no_mangle] -pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_registerTable( - mut env: JNIEnv, - _class: JClass, - pointer: jlong, - name: JString, - table_provider: jlong, -) -> jlong { - let name: String = env - .get_string(&name) - .expect("Couldn't get name as string!") - .into(); - let context = unsafe { &mut *(pointer as *mut SessionContext) }; - let table_provider = unsafe { &*(table_provider as *const Arc) }; - let result = context.register_table(&name, table_provider.clone()); - match result { - Ok(Some(v)) => Box::into_raw(Box::new(v)) as jlong, - Ok(None) => 0, - Err(err) => { - env.throw_new("java/lang/Exception", err.to_string()) - .unwrap(); - 0 - } - } -} - #[no_mangle] pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_registerParquet( mut env: JNIEnv, diff --git a/datafusion-jni/src/dataframe.rs b/datafusion-jni/src/dataframe.rs index d75ec55..5a4aa7c 100644 --- a/datafusion-jni/src/dataframe.rs +++ b/datafusion-jni/src/dataframe.rs @@ -1,12 +1,13 @@ use arrow::ipc::writer::FileWriter; use datafusion::dataframe::DataFrame; -use datafusion::error::DataFusionError; +use datafusion::prelude::SessionContext; use jni::objects::{JClass, JObject, JString}; use jni::sys::jlong; use jni::JNIEnv; use std::convert::Into; use std::io::BufWriter; use std::io::Cursor; +use std::sync::Arc; use tokio::runtime::Runtime; #[no_mangle] @@ -18,11 +19,10 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_collectDatafr callback: JObject, ) { let runtime = unsafe { &mut *(runtime as *mut Runtime) }; - let dataframe = unsafe { &mut *(dataframe as *mut DataFrame) }; + let dataframe = unsafe { &mut *(dataframe as *mut Arc) }; let schema = dataframe.schema().into(); runtime.block_on(async { let batches = dataframe - .clone() .collect() .await .expect("failed to collect dataframe"); @@ -60,9 +60,9 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_showDataframe callback: JObject, ) { let runtime = unsafe { &mut *(runtime as *mut Runtime) }; - let dataframe = unsafe { & *(dataframe as *const DataFrame) }; + let dataframe = unsafe { &mut *(dataframe as *mut Arc) }; runtime.block_on(async { - let r = dataframe.clone().show().await; + let r = dataframe.show().await; let err_message = match r { Ok(_) => "".to_string(), Err(err) => err.to_string(), @@ -90,13 +90,13 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_writeParquet( callback: JObject, ) { let runtime = unsafe { &mut *(runtime as *mut Runtime) }; - let dataframe = unsafe { &*(dataframe as *const DataFrame) }; + let dataframe = unsafe { &mut *(dataframe as *mut Arc) }; let path: String = env .get_string(&path) .expect("Couldn't get path as string!") .into(); runtime.block_on(async { - let r = dataframe.clone().write_parquet(&path, None).await; + let r = dataframe.write_parquet(&path, None).await; let err_message = match r { Ok(_) => "".to_string(), Err(err) => err.to_string(), @@ -124,15 +124,55 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_writeCsv( callback: JObject, ) { let runtime = unsafe { &mut *(runtime as *mut Runtime) }; - let dataframe = unsafe { &*(dataframe as *const DataFrame) }; + let dataframe = unsafe { &mut *(dataframe as *mut Arc) }; let path: String = env .get_string(&path) .expect("Couldn't get path as string!") .into(); runtime.block_on(async { - dataframe.clone().write_csv(&path).await; + let r = dataframe.write_csv(&path).await; + let err_message = match r { + Ok(_) => "".to_string(), + Err(err) => err.to_string(), + }; let err_message = env - .new_string("".to_string()) + .new_string(err_message) + .expect("Couldn't create java string!"); + env.call_method( + callback, + "accept", + "(Ljava/lang/Object;)V", + &[(&err_message).into()], + ) + .expect("failed to call method"); + }); +} + +#[no_mangle] +pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_registerTable( + mut env: JNIEnv, + _class: JClass, + runtime: jlong, + dataframe: jlong, + session: jlong, + name: JString, + callback: JObject, +) { + let runtime = unsafe { &mut *(runtime as *mut Runtime) }; + let dataframe = unsafe { &mut *(dataframe as *mut Arc) }; + let context = unsafe { &mut *(session as *mut SessionContext) }; + let name: String = env + .get_string(&name) + .expect("Couldn't get name as string!") + .into(); + runtime.block_on(async { + let r = context.register_table(name.as_str(), dataframe.clone()); + let err_message = match r { + Ok(_) => "".to_string(), + Err(err) => err.to_string(), + }; + let err_message = env + .new_string(err_message) .expect("Couldn't create java string!"); env.call_method( callback, @@ -150,5 +190,5 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DataFrames_destroyDataFr _class: JClass, pointer: jlong, ) { - let _ = unsafe { Box::from_raw(pointer as *mut DataFrame) }; + let _ = unsafe { Box::from_raw(pointer as *mut Arc) }; } diff --git a/datafusion-jni/src/lib.rs b/datafusion-jni/src/lib.rs index dbf3e92..ac30a3f 100644 --- a/datafusion-jni/src/lib.rs +++ b/datafusion-jni/src/lib.rs @@ -1,4 +1,3 @@ mod context; mod dataframe; mod runtime; -mod table_provider; diff --git a/datafusion-jni/src/table_provider.rs b/datafusion-jni/src/table_provider.rs deleted file mode 100644 index 0db384d..0000000 --- a/datafusion-jni/src/table_provider.rs +++ /dev/null @@ -1,14 +0,0 @@ -use datafusion::datasource::TableProvider; -use jni::objects::JClass; -use jni::sys::jlong; -use jni::JNIEnv; -use std::sync::Arc; - -#[no_mangle] -pub extern "system" fn Java_org_apache_arrow_datafusion_TableProviders_destroyTableProvider( - _env: JNIEnv, - _class: JClass, - pointer: jlong, -) { - let _ = unsafe { Box::from_raw(pointer as *mut Arc) }; -} From 722a961de2fdce833821193e29addc159b114ce9 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Fri, 1 Sep 2023 18:06:58 +0800 Subject: [PATCH 6/7] revert --- datafusion-java/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion-java/build.gradle b/datafusion-java/build.gradle index 0425fc4..892daaf 100644 --- a/datafusion-java/build.gradle +++ b/datafusion-java/build.gradle @@ -30,7 +30,7 @@ java { withSourcesJar() compileJava { - options.compilerArgs += ["-h", "${layout.buildDirectory}/target/headers"] + options.compilerArgs += ["-h", "${buildDir}/target/headers"] } } From c790cd823dca817edfd6e729a8326b8992869995 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Fri, 1 Sep 2023 21:44:32 +0800 Subject: [PATCH 7/7] fix void --- datafusion-jni/src/context.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/datafusion-jni/src/context.rs b/datafusion-jni/src/context.rs index 7bd2ced..c9f5105 100644 --- a/datafusion-jni/src/context.rs +++ b/datafusion-jni/src/context.rs @@ -1,6 +1,6 @@ use datafusion::execution::context::SessionContext; use datafusion::prelude::{CsvReadOptions, ParquetReadOptions}; -use jni::objects::{JClass, JObject, JString, JValue}; +use jni::objects::{JClass, JObject, JString}; use jni::sys::jlong; use jni::JNIEnv; use tokio::runtime::Runtime; @@ -106,12 +106,15 @@ pub extern "system" fn Java_org_apache_arrow_datafusion_DefaultSessionContext_qu let query_result = context.sql(&sql).await; match query_result { Ok(v) => { + let empty_str = env + .new_string("".to_string()) + .expect("Couldn't create java string!"); let dataframe = Box::into_raw(Box::new(v)) as jlong; env.call_method( callback, "callback", "(Ljava/lang/String;J)V", - &[JValue::Void, dataframe.into()], + &[(&empty_str).into(), dataframe.into()], ) } Err(err) => {