From 4c1752b5a60c9051a686c2e2f9a6af871105d4fe Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 18 Mar 2024 18:10:31 +0800 Subject: [PATCH] feat(bindings/java): explicit async runtime (#4376) * feat: explicit async runtime Signed-off-by: tison * add executor param everywhere Signed-off-by: tison * pipe Signed-off-by: tison * fixup Signed-off-by: tison * add test Signed-off-by: tison * license header Signed-off-by: tison * tidy Signed-off-by: tison * docs and errors Signed-off-by: tison --------- Signed-off-by: tison --- bindings/java/.gitignore | 2 + bindings/java/src/blocking_operator.rs | 22 +-- bindings/java/src/executor.rs | 152 +++++++++++++++ bindings/java/src/lib.rs | 63 +----- .../org/apache/opendal/AsyncExecutor.java | 40 ++++ .../java/org/apache/opendal/Operator.java | 85 ++++---- bindings/java/src/operator.rs | 183 ++++++++++++------ .../opendal/test/AsyncExecutorTest.java | 49 +++++ bindings/java/src/utility.rs | 2 +- 9 files changed, 430 insertions(+), 168 deletions(-) create mode 100644 bindings/java/src/executor.rs create mode 100644 bindings/java/src/main/java/org/apache/opendal/AsyncExecutor.java create mode 100644 bindings/java/src/test/java/org/apache/opendal/test/AsyncExecutorTest.java diff --git a/bindings/java/.gitignore b/bindings/java/.gitignore index 73d5f145287b..07a2ebae0457 100644 --- a/bindings/java/.gitignore +++ b/bindings/java/.gitignore @@ -1,2 +1,4 @@ .mvn/wrapper/maven-wrapper.jar Cargo.lock + +*.log diff --git a/bindings/java/src/blocking_operator.rs b/bindings/java/src/blocking_operator.rs index f2d77302c0c9..47b1abd53eb8 100644 --- a/bindings/java/src/blocking_operator.rs +++ b/bindings/java/src/blocking_operator.rs @@ -34,7 +34,7 @@ use crate::Result; /// # Safety /// -/// This function should not be called before the Operator are ready. +/// This function should not be called before the Operator is ready. #[no_mangle] pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_disposeInternal( _: JNIEnv, @@ -46,7 +46,7 @@ pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_disposeIn /// # Safety /// -/// This function should not be called before the Operator are ready. +/// This function should not be called before the Operator is ready. #[no_mangle] pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_duplicate( _: JNIEnv, @@ -59,7 +59,7 @@ pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_duplicate /// # Safety /// -/// This function should not be called before the Operator are ready. +/// This function should not be called before the Operator is ready. #[no_mangle] pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_read( mut env: JNIEnv, @@ -82,7 +82,7 @@ fn intern_read(env: &mut JNIEnv, op: &mut BlockingOperator, path: JString) -> Re /// # Safety /// -/// This function should not be called before the Operator are ready. +/// This function should not be called before the Operator is ready. #[no_mangle] pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_write( mut env: JNIEnv, @@ -109,7 +109,7 @@ fn intern_write( /// # Safety /// -/// This function should not be called before the Operator are ready. +/// This function should not be called before the Operator is ready. #[no_mangle] pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_stat( mut env: JNIEnv, @@ -131,7 +131,7 @@ fn intern_stat(env: &mut JNIEnv, op: &mut BlockingOperator, path: JString) -> Re /// # Safety /// -/// This function should not be called before the Operator are ready. +/// This function should not be called before the Operator is ready. #[no_mangle] pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_delete( mut env: JNIEnv, @@ -151,7 +151,7 @@ fn intern_delete(env: &mut JNIEnv, op: &mut BlockingOperator, path: JString) -> /// # Safety /// -/// This function should not be called before the Operator are ready. +/// This function should not be called before the Operator is ready. #[no_mangle] pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_createDir( mut env: JNIEnv, @@ -171,7 +171,7 @@ fn intern_create_dir(env: &mut JNIEnv, op: &mut BlockingOperator, path: JString) /// # Safety /// -/// This function should not be called before the Operator are ready. +/// This function should not be called before the Operator is ready. #[no_mangle] pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_copy( mut env: JNIEnv, @@ -199,7 +199,7 @@ fn intern_copy( /// # Safety /// -/// This function should not be called before the Operator are ready. +/// This function should not be called before the Operator is ready. #[no_mangle] pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_rename( mut env: JNIEnv, @@ -227,7 +227,7 @@ fn intern_rename( /// # Safety /// -/// This function should not be called before the Operator are ready. +/// This function should not be called before the Operator is ready. #[no_mangle] pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_removeAll( mut env: JNIEnv, @@ -248,7 +248,7 @@ fn intern_remove_all(env: &mut JNIEnv, op: &mut BlockingOperator, path: JString) /// # Safety /// -/// This function should not be called before the Operator are ready. +/// This function should not be called before the Operator is ready. #[no_mangle] pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_list( mut env: JNIEnv, diff --git a/bindings/java/src/executor.rs b/bindings/java/src/executor.rs new file mode 100644 index 000000000000..506e86b2335d --- /dev/null +++ b/bindings/java/src/executor.rs @@ -0,0 +1,152 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::cell::RefCell; +use std::ffi::c_void; +use std::future::Future; + +use jni::objects::{JClass, JObject}; +use jni::sys::jlong; +use jni::{JNIEnv, JavaVM}; +use once_cell::sync::OnceCell; +use tokio::task::JoinHandle; + +use crate::Result; + +static mut RUNTIME: OnceCell = OnceCell::new(); +thread_local! { + static ENV: RefCell> = RefCell::new(None); +} + +/// # Safety +/// +/// This function could be only called by java vm when unload this lib. +#[no_mangle] +pub unsafe extern "system" fn JNI_OnUnload(_: JavaVM, _: *mut c_void) { + let _ = RUNTIME.take(); +} + +/// # Safety +/// +/// This function could be only called when the lib is loaded and within an executor thread. +pub(crate) unsafe fn get_current_env<'local>() -> JNIEnv<'local> { + let env = ENV + .with(|cell| *cell.borrow_mut()) + .expect("env must be available"); + JNIEnv::from_raw(env).expect("env must be valid") +} + +pub enum Executor { + Tokio(tokio::runtime::Runtime), +} + +impl Executor { + pub fn enter_with(&self, f: F) -> R + where + F: FnOnce() -> R, + { + match self { + Executor::Tokio(e) => { + let _guard = e.enter(); + f() + } + } + } + + pub fn spawn(&self, future: F) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + match self { + Executor::Tokio(e) => e.spawn(future), + } + } +} + +#[no_mangle] +pub extern "system" fn Java_org_apache_opendal_AsyncExecutor_makeTokioExecutor( + mut env: JNIEnv, + _: JClass, + cores: usize, +) -> jlong { + make_tokio_executor(&mut env, cores) + .map(|executor| Box::into_raw(Box::new(executor)) as jlong) + .unwrap_or_else(|e| { + e.throw(&mut env); + 0 + }) +} + +/// # Safety +/// +/// This function should not be called before the AsyncExecutor is ready. +#[no_mangle] +pub unsafe extern "system" fn Java_org_apache_opendal_AsyncExecutor_disposeInternal( + _: JNIEnv, + _: JObject, + executor: *mut Executor, +) { + drop(Box::from_raw(executor)); +} + +pub(crate) fn make_tokio_executor(env: &mut JNIEnv, cores: usize) -> Result { + let vm = env.get_java_vm().expect("JavaVM must be available"); + let executor = tokio::runtime::Builder::new_multi_thread() + .worker_threads(cores) + .on_thread_start(move || { + ENV.with(|cell| { + let env = vm + .attach_current_thread_as_daemon() + .expect("attach thread must succeed"); + *cell.borrow_mut() = Some(env.get_raw()); + }) + }) + .enable_all() + .build() + .map_err(|e| { + opendal::Error::new( + opendal::ErrorKind::Unexpected, + "Failed to create tokio runtime.", + ) + .set_source(e) + })?; + Ok(Executor::Tokio(executor)) +} + +/// # Safety +/// +/// This function could be only when the lib is loaded. +pub(crate) unsafe fn executor_or_default<'a>( + env: &mut JNIEnv<'a>, + executor: *const Executor, +) -> &'a Executor { + if executor.is_null() { + default_executor(env) + } else { + &*executor + } +} + +/// # Safety +/// +/// This function could be only when the lib is loaded. +unsafe fn default_executor<'a>(env: &mut JNIEnv<'a>) -> &'a Executor { + RUNTIME + .get_or_try_init(|| make_tokio_executor(env, num_cpus::get())) + .expect("default executor must be able to initialize") +} diff --git a/bindings/java/src/lib.rs b/bindings/java/src/lib.rs index d95c0debd71e..63d4c347aea0 100644 --- a/bindings/java/src/lib.rs +++ b/bindings/java/src/lib.rs @@ -15,19 +15,15 @@ // specific language governing permissions and limitations // under the License. -use std::cell::RefCell; use std::collections::HashMap; -use std::ffi::c_void; use jni::objects::JObject; use jni::objects::JValue; use jni::sys::jboolean; use jni::sys::jint; use jni::sys::jlong; -use jni::sys::JNI_VERSION_1_8; use jni::JNIEnv; -use jni::JavaVM; -use once_cell::sync::OnceCell; + use opendal::raw::PresignedRequest; use opendal::Capability; use opendal::Entry; @@ -35,72 +31,17 @@ use opendal::EntryMode; use opendal::Metadata; use opendal::Metakey; use opendal::OperatorInfo; -use tokio::runtime::Builder; -use tokio::runtime::Runtime; mod blocking_operator; mod convert; mod error; +mod executor; mod layer; mod operator; mod utility; pub(crate) type Result = std::result::Result; -static mut RUNTIME: OnceCell = OnceCell::new(); -thread_local! { - static ENV: RefCell> = RefCell::new(None); -} - -/// # Safety -/// -/// This function could be only called by java vm when load this lib. -#[no_mangle] -pub unsafe extern "system" fn JNI_OnLoad(vm: JavaVM, _: *mut c_void) -> jint { - RUNTIME - .set( - Builder::new_multi_thread() - .worker_threads(num_cpus::get()) - .on_thread_start(move || { - ENV.with(|cell| { - let env = vm.attach_current_thread_as_daemon().unwrap(); - *cell.borrow_mut() = Some(env.get_raw()); - }) - }) - .enable_all() - .build() - .unwrap(), - ) - .unwrap(); - - JNI_VERSION_1_8 -} - -/// # Safety -/// -/// This function could be only called by java vm when unload this lib. -#[no_mangle] -pub unsafe extern "system" fn JNI_OnUnload(_: JavaVM, _: *mut c_void) { - if let Some(r) = RUNTIME.take() { - r.shutdown_background() - } -} - -/// # Safety -/// -/// This function could be only when the lib is loaded and within a RUNTIME-spawned thread. -unsafe fn get_current_env<'local>() -> JNIEnv<'local> { - let env = ENV.with(|cell| *cell.borrow_mut()).unwrap(); - JNIEnv::from_raw(env).unwrap() -} - -/// # Safety -/// -/// This function could be only when the lib is loaded. -unsafe fn get_global_runtime<'local>() -> &'local Runtime { - RUNTIME.get_unchecked() -} - fn make_presigned_request<'a>(env: &mut JNIEnv<'a>, req: PresignedRequest) -> Result> { let method = env.new_string(req.method().as_str())?; let uri = env.new_string(req.uri().to_string())?; diff --git a/bindings/java/src/main/java/org/apache/opendal/AsyncExecutor.java b/bindings/java/src/main/java/org/apache/opendal/AsyncExecutor.java new file mode 100644 index 000000000000..227cb323377d --- /dev/null +++ b/bindings/java/src/main/java/org/apache/opendal/AsyncExecutor.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.opendal; + +/** + * AsyncExecutor represents an underneath OpenDAL executor that runs async tasks spawned in the Rust world. + * + *

If the executor is passed to construct operators, the executor must outlive the operators.

+ */ +public class AsyncExecutor extends NativeObject { + public static AsyncExecutor createTokioExecutor(int cores) { + return new AsyncExecutor(makeTokioExecutor(cores)); + } + + private AsyncExecutor(long nativeHandle) { + super(nativeHandle); + } + + @Override + protected native void disposeInternal(long handle); + + private static native long makeTokioExecutor(int cores); +} diff --git a/bindings/java/src/main/java/org/apache/opendal/Operator.java b/bindings/java/src/main/java/org/apache/opendal/Operator.java index cfbe6fa4bf12..a6d84b7b11fb 100644 --- a/bindings/java/src/main/java/org/apache/opendal/Operator.java +++ b/bindings/java/src/main/java/org/apache/opendal/Operator.java @@ -104,6 +104,8 @@ private static CompletableFuture take(long requestId) { public final OperatorInfo info; + private final long executorHandle; + /** * Construct an OpenDAL operator: * @@ -115,14 +117,31 @@ private static CompletableFuture take(long requestId) { * @param map a map of properties to construct the underneath operator. */ public static Operator of(String schema, Map map) { - final long nativeHandle = constructor(schema, map); + return of(schema, map, null); + } + + /** + * Construct an OpenDAL operator: + * + *

+ * You can find all possible schemes here + * and see what config options each service supports. + * + * @param schema the name of the underneath service to access data from. + * @param map a map of properties to construct the underneath operator. + * @param executor the underneath executor to run async operations; {@code null} to use a default global executor. + */ + public static Operator of(String schema, Map map, AsyncExecutor executor) { + final long executorHandle = executor != null ? executor.nativeHandle : 0; + final long nativeHandle = constructor(executorHandle, schema, map); final OperatorInfo info = makeOperatorInfo(nativeHandle); - return new Operator(nativeHandle, info); + return new Operator(nativeHandle, executorHandle, info); } - private Operator(long nativeHandle, OperatorInfo info) { + private Operator(long nativeHandle, long executorHandle, OperatorInfo info) { super(nativeHandle); this.info = info; + this.executorHandle = executorHandle; } /** @@ -136,12 +155,12 @@ private Operator(long nativeHandle, OperatorInfo info) { */ public Operator duplicate() { final long nativeHandle = duplicate(this.nativeHandle); - return new Operator(nativeHandle, this.info); + return new Operator(nativeHandle, this.executorHandle, this.info); } public Operator layer(Layer layer) { final long nativeHandle = layer.layer(this.nativeHandle); - return new Operator(nativeHandle, makeOperatorInfo(nativeHandle)); + return new Operator(nativeHandle, this.executorHandle, makeOperatorInfo(nativeHandle)); } public BlockingOperator blocking() { @@ -155,7 +174,7 @@ public CompletableFuture write(String path, String content) { } public CompletableFuture write(String path, byte[] content) { - final long requestId = write(nativeHandle, path, content); + final long requestId = write(nativeHandle, executorHandle, path, content); return AsyncRegistry.take(requestId); } @@ -164,63 +183,63 @@ public CompletableFuture append(String path, String content) { } public CompletableFuture append(String path, byte[] content) { - final long requestId = append(nativeHandle, path, content); + final long requestId = append(nativeHandle, executorHandle, path, content); return AsyncRegistry.take(requestId); } public CompletableFuture stat(String path) { - final long requestId = stat(nativeHandle, path); + final long requestId = stat(nativeHandle, executorHandle, path); return AsyncRegistry.take(requestId); } public CompletableFuture read(String path) { - final long requestId = read(nativeHandle, path); + final long requestId = read(nativeHandle, executorHandle, path); return AsyncRegistry.take(requestId); } public CompletableFuture presignRead(String path, Duration duration) { - final long requestId = presignRead(nativeHandle, path, duration.toNanos()); + final long requestId = presignRead(nativeHandle, executorHandle, path, duration.toNanos()); return AsyncRegistry.take(requestId); } public CompletableFuture presignWrite(String path, Duration duration) { - final long requestId = presignWrite(nativeHandle, path, duration.toNanos()); + final long requestId = presignWrite(nativeHandle, executorHandle, path, duration.toNanos()); return AsyncRegistry.take(requestId); } public CompletableFuture presignStat(String path, Duration duration) { - final long requestId = presignStat(nativeHandle, path, duration.toNanos()); + final long requestId = presignStat(nativeHandle, executorHandle, path, duration.toNanos()); return AsyncRegistry.take(requestId); } public CompletableFuture delete(String path) { - final long requestId = delete(nativeHandle, path); + final long requestId = delete(nativeHandle, executorHandle, path); return AsyncRegistry.take(requestId); } public CompletableFuture createDir(String path) { - final long requestId = createDir(nativeHandle, path); + final long requestId = createDir(nativeHandle, executorHandle, path); return AsyncRegistry.take(requestId); } public CompletableFuture copy(String sourcePath, String targetPath) { - final long requestId = copy(nativeHandle, sourcePath, targetPath); + final long requestId = copy(nativeHandle, executorHandle, sourcePath, targetPath); return AsyncRegistry.take(requestId); } public CompletableFuture rename(String sourcePath, String targetPath) { - final long requestId = rename(nativeHandle, sourcePath, targetPath); + final long requestId = rename(nativeHandle, executorHandle, sourcePath, targetPath); return AsyncRegistry.take(requestId); } public CompletableFuture removeAll(String path) { - final long requestId = removeAll(nativeHandle, path); + final long requestId = removeAll(nativeHandle, executorHandle, path); return AsyncRegistry.take(requestId); } public CompletableFuture> list(String path) { - final long requestid = list(nativeHandle, path); - final CompletableFuture result = AsyncRegistry.take(requestid); + final long requestId = list(nativeHandle, executorHandle, path); + final CompletableFuture result = AsyncRegistry.take(requestId); return Objects.requireNonNull(result).thenApplyAsync(Arrays::asList); } @@ -229,35 +248,35 @@ public CompletableFuture> list(String path) { private static native long duplicate(long nativeHandle); - private static native long constructor(String schema, Map map); + private static native long constructor(long executorHandle, String schema, Map map); - private static native long read(long nativeHandle, String path); + private static native long read(long nativeHandle, long executorHandle, String path); - private static native long write(long nativeHandle, String path, byte[] content); + private static native long write(long nativeHandle, long executorHandle, String path, byte[] content); - private static native long append(long nativeHandle, String path, byte[] content); + private static native long append(long nativeHandle, long executorHandle, String path, byte[] content); - private static native long delete(long nativeHandle, String path); + private static native long delete(long nativeHandle, long executorHandle, String path); - private static native long stat(long nativeHandle, String path); + private static native long stat(long nativeHandle, long executorHandle, String path); - private static native long presignRead(long nativeHandle, String path, long duration); + private static native long presignRead(long nativeHandle, long executorHandle, String path, long duration); - private static native long presignWrite(long nativeHandle, String path, long duration); + private static native long presignWrite(long nativeHandle, long executorHandle, String path, long duration); - private static native long presignStat(long nativeHandle, String path, long duration); + private static native long presignStat(long nativeHandle, long executorHandle, String path, long duration); private static native OperatorInfo makeOperatorInfo(long nativeHandle); private static native long makeBlockingOp(long nativeHandle); - private static native long createDir(long nativeHandle, String path); + private static native long createDir(long nativeHandle, long executorHandle, String path); - private static native long copy(long nativeHandle, String sourcePath, String targetPath); + private static native long copy(long nativeHandle, long executorHandle, String sourcePath, String targetPath); - private static native long rename(long nativeHandle, String sourcePath, String targetPath); + private static native long rename(long nativeHandle, long executorHandle, String sourcePath, String targetPath); - private static native long removeAll(long nativeHandle, String path); + private static native long removeAll(long nativeHandle, long executorHandle, String path); - private static native long list(long nativeHandle, String path); + private static native long list(long nativeHandle, long executorHandle, String path); } diff --git a/bindings/java/src/operator.rs b/bindings/java/src/operator.rs index 5d05f2102690..30ff8b76d9dd 100644 --- a/bindings/java/src/operator.rs +++ b/bindings/java/src/operator.rs @@ -35,8 +35,7 @@ use opendal::Scheme; use crate::convert::jmap_to_hashmap; use crate::convert::jstring_to_string; -use crate::get_current_env; -use crate::get_global_runtime; +use crate::executor::{executor_or_default, get_current_env, Executor}; use crate::make_entry; use crate::make_metadata; use crate::make_operator_info; @@ -47,29 +46,36 @@ use crate::Result; pub extern "system" fn Java_org_apache_opendal_Operator_constructor( mut env: JNIEnv, _: JClass, + executor: *const Executor, scheme: JString, map: JObject, ) -> jlong { - intern_constructor(&mut env, scheme, map).unwrap_or_else(|e| { + intern_constructor(&mut env, executor, scheme, map).unwrap_or_else(|e| { e.throw(&mut env); 0 }) } -fn intern_constructor(env: &mut JNIEnv, scheme: JString, map: JObject) -> Result { +fn intern_constructor( + env: &mut JNIEnv, + executor: *const Executor, + scheme: JString, + map: JObject, +) -> Result { let scheme = Scheme::from_str(jstring_to_string(env, &scheme)?.as_str())?; let map = jmap_to_hashmap(env, &map)?; let mut op = Operator::via_map(scheme, map)?; if !op.info().full_capability().blocking { - let _guard = unsafe { get_global_runtime() }.enter(); - op = op.layer(BlockingLayer::create()?); + let layer = + unsafe { executor_or_default(env, executor) }.enter_with(BlockingLayer::create)?; + op = op.layer(layer); } Ok(Box::into_raw(Box::new(op)) as jlong) } /// # Safety /// -/// This function should not be called before the Operator are ready. +/// This function should not be called before the Operator is ready. #[no_mangle] pub unsafe extern "system" fn Java_org_apache_opendal_Operator_duplicate( _: JNIEnv, @@ -82,7 +88,7 @@ pub unsafe extern "system" fn Java_org_apache_opendal_Operator_duplicate( /// # Safety /// -/// This function should not be called before the Operator are ready. +/// This function should not be called before the Operator is ready. #[no_mangle] pub unsafe extern "system" fn Java_org_apache_opendal_Operator_disposeInternal( _: JNIEnv, @@ -94,16 +100,17 @@ pub unsafe extern "system" fn Java_org_apache_opendal_Operator_disposeInternal( /// # Safety /// -/// This function should not be called before the Operator are ready. +/// This function should not be called before the Operator is ready. #[no_mangle] pub unsafe extern "system" fn Java_org_apache_opendal_Operator_write( mut env: JNIEnv, _: JClass, op: *mut Operator, + executor: *const Executor, path: JString, content: JByteArray, ) -> jlong { - intern_write(&mut env, op, path, content).unwrap_or_else(|e| { + intern_write(&mut env, op, executor, path, content).unwrap_or_else(|e| { e.throw(&mut env); 0 }) @@ -112,6 +119,7 @@ pub unsafe extern "system" fn Java_org_apache_opendal_Operator_write( fn intern_write( env: &mut JNIEnv, op: *mut Operator, + executor: *const Executor, path: JString, content: JByteArray, ) -> Result { @@ -121,7 +129,7 @@ fn intern_write( let path = jstring_to_string(env, &path)?; let content = env.convert_byte_array(content)?; - unsafe { get_global_runtime() }.spawn(async move { + unsafe { executor_or_default(env, executor) }.spawn(async move { let result = do_write(op, path, content).await; complete_future(id, result.map(|_| JValueOwned::Void)) }); @@ -135,16 +143,17 @@ async fn do_write(op: &mut Operator, path: String, content: Vec) -> Result<( /// # Safety /// -/// This function should not be called before the Operator are ready. +/// This function should not be called before the Operator is ready. #[no_mangle] pub unsafe extern "system" fn Java_org_apache_opendal_Operator_append( mut env: JNIEnv, _: JClass, op: *mut Operator, + executor: *const Executor, path: JString, content: JByteArray, ) -> jlong { - intern_append(&mut env, op, path, content).unwrap_or_else(|e| { + intern_append(&mut env, op, executor, path, content).unwrap_or_else(|e| { e.throw(&mut env); 0 }) @@ -153,6 +162,7 @@ pub unsafe extern "system" fn Java_org_apache_opendal_Operator_append( fn intern_append( env: &mut JNIEnv, op: *mut Operator, + executor: *const Executor, path: JString, content: JByteArray, ) -> Result { @@ -162,7 +172,7 @@ fn intern_append( let path = jstring_to_string(env, &path)?; let content = env.convert_byte_array(content)?; - unsafe { get_global_runtime() }.spawn(async move { + unsafe { executor_or_default(env, executor) }.spawn(async move { let result = do_append(op, path, content).await; complete_future(id, result.map(|_| JValueOwned::Void)) }); @@ -176,27 +186,33 @@ async fn do_append(op: &mut Operator, path: String, content: Vec) -> Result< /// # Safety /// -/// This function should not be called before the Operator are ready. +/// This function should not be called before the Operator is ready. #[no_mangle] pub unsafe extern "system" fn Java_org_apache_opendal_Operator_stat( mut env: JNIEnv, _: JClass, op: *mut Operator, + executor: *const Executor, path: JString, ) -> jlong { - intern_stat(&mut env, op, path).unwrap_or_else(|e| { + intern_stat(&mut env, op, executor, path).unwrap_or_else(|e| { e.throw(&mut env); 0 }) } -fn intern_stat(env: &mut JNIEnv, op: *mut Operator, path: JString) -> Result { +fn intern_stat( + env: &mut JNIEnv, + op: *mut Operator, + executor: *const Executor, + path: JString, +) -> Result { let op = unsafe { &mut *op }; let id = request_id(env)?; let path = jstring_to_string(env, &path)?; - unsafe { get_global_runtime() }.spawn(async move { + unsafe { executor_or_default(env, executor) }.spawn(async move { let result = do_stat(op, path).await; complete_future(id, result.map(JValueOwned::Object)) }); @@ -212,27 +228,33 @@ async fn do_stat<'local>(op: &mut Operator, path: String) -> Result jlong { - intern_read(&mut env, op, path).unwrap_or_else(|e| { + intern_read(&mut env, op, executor, path).unwrap_or_else(|e| { e.throw(&mut env); 0 }) } -fn intern_read(env: &mut JNIEnv, op: *mut Operator, path: JString) -> Result { +fn intern_read( + env: &mut JNIEnv, + op: *mut Operator, + executor: *const Executor, + path: JString, +) -> Result { let op = unsafe { &mut *op }; let id = request_id(env)?; let path = jstring_to_string(env, &path)?; - unsafe { get_global_runtime() }.spawn(async move { + unsafe { executor_or_default(env, executor) }.spawn(async move { let result = do_read(op, path).await; complete_future(id, result.map(JValueOwned::Object)) }); @@ -250,27 +272,33 @@ async fn do_read<'local>(op: &mut Operator, path: String) -> Result jlong { - intern_delete(&mut env, op, path).unwrap_or_else(|e| { + intern_delete(&mut env, op, executor, path).unwrap_or_else(|e| { e.throw(&mut env); 0 }) } -fn intern_delete(env: &mut JNIEnv, op: *mut Operator, path: JString) -> Result { +fn intern_delete( + env: &mut JNIEnv, + op: *mut Operator, + executor: *const Executor, + path: JString, +) -> Result { let op = unsafe { &mut *op }; let id = request_id(env)?; let path = jstring_to_string(env, &path)?; - unsafe { get_global_runtime() }.spawn(async move { + unsafe { executor_or_default(env, executor) }.spawn(async move { let result = do_delete(op, path).await; complete_future(id, result.map(|_| JValueOwned::Void)) }); @@ -284,7 +312,7 @@ async fn do_delete(op: &mut Operator, path: String) -> Result<()> { /// # Safety /// -/// This function should not be called before the Operator are ready. +/// This function should not be called before the Operator is ready. #[no_mangle] pub unsafe extern "system" fn Java_org_apache_opendal_Operator_makeBlockingOp( _: JNIEnv, @@ -297,7 +325,7 @@ pub unsafe extern "system" fn Java_org_apache_opendal_Operator_makeBlockingOp( /// # Safety /// -/// This function should not be called before the Operator are ready. +/// This function should not be called before the Operator is ready. #[no_mangle] pub unsafe extern "system" fn Java_org_apache_opendal_Operator_makeOperatorInfo( mut env: JNIEnv, @@ -317,27 +345,33 @@ fn intern_make_operator_info(env: &mut JNIEnv, op: *mut Operator) -> Result jlong { - intern_create_dir(&mut env, op, path).unwrap_or_else(|e| { + intern_create_dir(&mut env, op, executor, path).unwrap_or_else(|e| { e.throw(&mut env); 0 }) } -fn intern_create_dir(env: &mut JNIEnv, op: *mut Operator, path: JString) -> Result { +fn intern_create_dir( + env: &mut JNIEnv, + op: *mut Operator, + executor: *const Executor, + path: JString, +) -> Result { let op = unsafe { &mut *op }; let id = request_id(env)?; let path = jstring_to_string(env, &path)?; - unsafe { get_global_runtime() }.spawn(async move { + unsafe { executor_or_default(env, executor) }.spawn(async move { let result = do_create_dir(op, path).await; complete_future(id, result.map(|_| JValueOwned::Void)) }); @@ -351,16 +385,17 @@ async fn do_create_dir(op: &mut Operator, path: String) -> Result<()> { /// # Safety /// -/// This function should not be called before the Operator are ready. +/// This function should not be called before the Operator is ready. #[no_mangle] pub unsafe extern "system" fn Java_org_apache_opendal_Operator_copy( mut env: JNIEnv, _: JClass, op: *mut Operator, + executor: *const Executor, source_path: JString, target_path: JString, ) -> jlong { - intern_copy(&mut env, op, source_path, target_path).unwrap_or_else(|e| { + intern_copy(&mut env, op, executor, source_path, target_path).unwrap_or_else(|e| { e.throw(&mut env); 0 }) @@ -369,6 +404,7 @@ pub unsafe extern "system" fn Java_org_apache_opendal_Operator_copy( fn intern_copy( env: &mut JNIEnv, op: *mut Operator, + executor: *const Executor, source_path: JString, target_path: JString, ) -> Result { @@ -378,7 +414,7 @@ fn intern_copy( let source_path = jstring_to_string(env, &source_path)?; let target_path = jstring_to_string(env, &target_path)?; - unsafe { get_global_runtime() }.spawn(async move { + unsafe { executor_or_default(env, executor) }.spawn(async move { let result = do_copy(op, source_path, target_path).await; complete_future(id, result.map(|_| JValueOwned::Void)) }); @@ -392,16 +428,17 @@ async fn do_copy(op: &mut Operator, source_path: String, target_path: String) -> /// # Safety /// -/// This function should not be called before the Operator are ready. +/// This function should not be called before the Operator is ready. #[no_mangle] pub unsafe extern "system" fn Java_org_apache_opendal_Operator_rename( mut env: JNIEnv, _: JClass, op: *mut Operator, + executor: *const Executor, source_path: JString, target_path: JString, ) -> jlong { - intern_rename(&mut env, op, source_path, target_path).unwrap_or_else(|e| { + intern_rename(&mut env, op, executor, source_path, target_path).unwrap_or_else(|e| { e.throw(&mut env); 0 }) @@ -410,6 +447,7 @@ pub unsafe extern "system" fn Java_org_apache_opendal_Operator_rename( fn intern_rename( env: &mut JNIEnv, op: *mut Operator, + executor: *const Executor, source_path: JString, target_path: JString, ) -> Result { @@ -419,7 +457,7 @@ fn intern_rename( let source_path = jstring_to_string(env, &source_path)?; let target_path = jstring_to_string(env, &target_path)?; - unsafe { get_global_runtime() }.spawn(async move { + unsafe { executor_or_default(env, executor) }.spawn(async move { let result = do_rename(op, source_path, target_path).await; complete_future(id, result.map(|_| JValueOwned::Void)) }); @@ -433,27 +471,33 @@ async fn do_rename(op: &mut Operator, source_path: String, target_path: String) /// # Safety /// -/// This function should not be called before the Operator are ready. +/// This function should not be called before the Operator is ready. #[no_mangle] pub unsafe extern "system" fn Java_org_apache_opendal_Operator_removeAll( mut env: JNIEnv, _: JClass, op: *mut Operator, + executor: *const Executor, path: JString, ) -> jlong { - intern_remove_all(&mut env, op, path).unwrap_or_else(|e| { + intern_remove_all(&mut env, op, executor, path).unwrap_or_else(|e| { e.throw(&mut env); 0 }) } -fn intern_remove_all(env: &mut JNIEnv, op: *mut Operator, path: JString) -> Result { +fn intern_remove_all( + env: &mut JNIEnv, + op: *mut Operator, + executor: *const Executor, + path: JString, +) -> Result { let op = unsafe { &mut *op }; let id = request_id(env)?; let path = jstring_to_string(env, &path)?; - unsafe { get_global_runtime() }.spawn(async move { + unsafe { executor_or_default(env, executor) }.spawn(async move { let result = do_remove_all(op, path).await; complete_future(id, result.map(|_| JValueOwned::Void)) }); @@ -467,27 +511,33 @@ async fn do_remove_all(op: &mut Operator, path: String) -> Result<()> { /// # Safety /// -/// This function should not be called before the Operator are ready. +/// This function should not be called before the Operator is ready. #[no_mangle] pub unsafe extern "system" fn Java_org_apache_opendal_Operator_list( mut env: JNIEnv, _: JClass, op: *mut Operator, + executor: *const Executor, path: JString, ) -> jlong { - intern_list(&mut env, op, path).unwrap_or_else(|e| { + intern_list(&mut env, op, executor, path).unwrap_or_else(|e| { e.throw(&mut env); 0 }) } -fn intern_list(env: &mut JNIEnv, op: *mut Operator, path: JString) -> Result { +fn intern_list( + env: &mut JNIEnv, + op: *mut Operator, + executor: *const Executor, + path: JString, +) -> Result { let op = unsafe { &mut *op }; let id = request_id(env)?; let path = jstring_to_string(env, &path)?; - unsafe { get_global_runtime() }.spawn(async move { + unsafe { executor_or_default(env, executor) }.spawn(async move { let result = do_list(op, path).await; complete_future(id, result.map(JValueOwned::Object)) }); @@ -515,16 +565,17 @@ async fn do_list<'local>(op: &mut Operator, path: String) -> Result jlong { - intern_presign_read(&mut env, op, path, expire).unwrap_or_else(|e| { + intern_presign_read(&mut env, op, executor, path, expire).unwrap_or_else(|e| { e.throw(&mut env); 0 }) @@ -533,6 +584,7 @@ pub unsafe extern "system" fn Java_org_apache_opendal_Operator_presignRead( fn intern_presign_read( env: &mut JNIEnv, op: *mut Operator, + executor: *const Executor, path: JString, expire: jlong, ) -> Result { @@ -542,7 +594,7 @@ fn intern_presign_read( let path = jstring_to_string(env, &path)?; let expire = Duration::from_nanos(expire as u64); - unsafe { get_global_runtime() }.spawn(async move { + unsafe { executor_or_default(env, executor) }.spawn(async move { let result = do_presign_read(op, path, expire).await; let mut env = unsafe { get_current_env() }; let result = result.and_then(|req| make_presigned_request(&mut env, req)); @@ -562,16 +614,17 @@ async fn do_presign_read( /// # Safety /// -/// This function should not be called before the Operator are ready. +/// This function should not be called before the Operator is ready. #[no_mangle] pub unsafe extern "system" fn Java_org_apache_opendal_Operator_presignWrite( mut env: JNIEnv, _: JClass, op: *mut Operator, + executor: *const Executor, path: JString, expire: jlong, ) -> jlong { - intern_presign_write(&mut env, op, path, expire).unwrap_or_else(|e| { + intern_presign_write(&mut env, op, executor, path, expire).unwrap_or_else(|e| { e.throw(&mut env); 0 }) @@ -580,6 +633,7 @@ pub unsafe extern "system" fn Java_org_apache_opendal_Operator_presignWrite( fn intern_presign_write( env: &mut JNIEnv, op: *mut Operator, + executor: *const Executor, path: JString, expire: jlong, ) -> Result { @@ -589,7 +643,7 @@ fn intern_presign_write( let path = jstring_to_string(env, &path)?; let expire = Duration::from_nanos(expire as u64); - unsafe { get_global_runtime() }.spawn(async move { + unsafe { executor_or_default(env, executor) }.spawn(async move { let result = do_presign_write(op, path, expire).await; let mut env = unsafe { get_current_env() }; let result = result.and_then(|req| make_presigned_request(&mut env, req)); @@ -609,16 +663,17 @@ async fn do_presign_write( /// # Safety /// -/// This function should not be called before the Operator are ready. +/// This function should not be called before the Operator is ready. #[no_mangle] pub unsafe extern "system" fn Java_org_apache_opendal_Operator_presignStat( mut env: JNIEnv, _: JClass, op: *mut Operator, + executor: *const Executor, path: JString, expire: jlong, ) -> jlong { - intern_presign_stat(&mut env, op, path, expire).unwrap_or_else(|e| { + intern_presign_stat(&mut env, op, executor, path, expire).unwrap_or_else(|e| { e.throw(&mut env); 0 }) @@ -627,6 +682,7 @@ pub unsafe extern "system" fn Java_org_apache_opendal_Operator_presignStat( fn intern_presign_stat( env: &mut JNIEnv, op: *mut Operator, + executor: *const Executor, path: JString, expire: jlong, ) -> Result { @@ -636,7 +692,7 @@ fn intern_presign_stat( let path = jstring_to_string(env, &path)?; let expire = Duration::from_nanos(expire as u64); - unsafe { get_global_runtime() }.spawn(async move { + unsafe { executor_or_default(env, executor) }.spawn(async move { let result = do_presign_stat(op, path, expire).await; let mut env = unsafe { get_current_env() }; let result = result.and_then(|req| make_presigned_request(&mut env, req)); @@ -674,30 +730,33 @@ fn make_object<'local>( } fn complete_future(id: jlong, result: Result) { + try_complete_future(id, result).expect("complete future must succeed"); +} + +fn try_complete_future(id: jlong, result: Result) -> Result<()> { let mut env = unsafe { get_current_env() }; - let future = get_future(&mut env, id).unwrap(); + let future = get_future(&mut env, id)?; match result { Ok(result) => { - let result = make_object(&mut env, result).unwrap(); + let result = make_object(&mut env, result)?; env.call_method( future, "complete", "(Ljava/lang/Object;)Z", &[JValue::Object(&result)], - ) - .unwrap() + )? } Err(err) => { - let exception = err.to_exception(&mut env).unwrap(); + let exception = err.to_exception(&mut env)?; env.call_method( future, "completeExceptionally", "(Ljava/lang/Throwable;)Z", &[JValue::Object(&exception)], - ) - .unwrap() + )? } }; + Ok(()) } fn request_id(env: &mut JNIEnv) -> Result { diff --git a/bindings/java/src/test/java/org/apache/opendal/test/AsyncExecutorTest.java b/bindings/java/src/test/java/org/apache/opendal/test/AsyncExecutorTest.java new file mode 100644 index 000000000000..266a7c121c54 --- /dev/null +++ b/bindings/java/src/test/java/org/apache/opendal/test/AsyncExecutorTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.opendal.test; + +import static org.assertj.core.api.Assertions.assertThat; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import lombok.Cleanup; +import org.apache.opendal.AsyncExecutor; +import org.apache.opendal.Operator; +import org.junit.jupiter.api.Test; + +public class AsyncExecutorTest { + @Test + void testOperatorWithRetryLayer() { + final Map conf = new HashMap<>(); + conf.put("root", "/opendal/"); + final int cores = Runtime.getRuntime().availableProcessors(); + @Cleanup final AsyncExecutor executor = AsyncExecutor.createTokioExecutor(cores); + @Cleanup final Operator op = Operator.of("memory", conf, executor); + assertThat(op.info).isNotNull(); + + final String key = "key"; + final byte[] v0 = "v0".getBytes(StandardCharsets.UTF_8); + final byte[] v1 = "v1".getBytes(StandardCharsets.UTF_8); + op.write(key, v0).join(); + assertThat(op.read(key).join()).isEqualTo(v0); + op.write(key, v1).join(); + assertThat(op.read(key).join()).isEqualTo(v1); + } +} diff --git a/bindings/java/src/utility.rs b/bindings/java/src/utility.rs index 02f382ec80e2..1a57a384d8e8 100644 --- a/bindings/java/src/utility.rs +++ b/bindings/java/src/utility.rs @@ -27,7 +27,7 @@ use crate::Result; /// # Safety /// -/// This function should not be called before the Operator are ready. +/// This function should not be called before the Operator is ready. #[no_mangle] pub unsafe extern "system" fn Java_org_apache_opendal_OpenDAL_loadEnabledServices( mut env: JNIEnv,