Skip to content

Commit

Permalink
Adds support for async on @ExecuteOn methods (#9551)
Browse files Browse the repository at this point in the history
* Adds support for async on @ExecuteOn methods.

Signed-off-by: Santiago Pericas-Geertsen <[email protected]>

* New test that verifies correct unwrapping of exceptions.

Signed-off-by: Santiago Pericas-Geertsen <[email protected]>

---------

Signed-off-by: Santiago Pericas-Geertsen <[email protected]>
  • Loading branch information
spericas authored Dec 5, 2024
1 parent beeac25 commit 56985e6
Show file tree
Hide file tree
Showing 6 changed files with 366 additions and 41 deletions.
18 changes: 15 additions & 3 deletions docs/src/main/asciidoc/mp/threading.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ yet this process is still underway and some legacy libraries may never be fully
Helidon MP supports a new `@ExecuteOn` annotation to give developers full control on how to run
tasks. This annotation can be applied to any CDI bean method to control the type of thread in
which invocations of that method shall execute on.
which invocations of that method shall execute on. If such a method returns `CompletionStage`
or `CompletableFuture`, it is assumed to be asynchronous and shall execute in a new thread
but without blocking the caller's thread.
include::{rootdir}/includes/dependencies.adoc[]
Expand Down Expand Up @@ -129,10 +131,20 @@ but that is not a requirement in CDI.
include::{sourcedir}/mp/ExecuteOnSnippets.java[tag=snippet_2, indent=0]
----
3. Finally, it is also possible to explicitly execute a method in a
virtual thread, blocking the caller thread until the method execution is complete.
3. It is also possible to explicitly execute a method in a
virtual thread, blocking the caller's thread until the method execution is complete.
+
[source,java]
----
include::{sourcedir}/mp/ExecuteOnSnippets.java[tag=snippet_3, indent=0]
----
4. Finally, a method can be executed in another thread but without blocking
the caller's thread. This behavior is triggered automatically when the bean method returns
`CompletionStage` or `CompletableFuture`.
+
[source,java]
----
include::{sourcedir}/mp/ExecuteOnSnippets.java[tag=snippet_4, indent=0]
----
13 changes: 13 additions & 0 deletions docs/src/main/java/io/helidon/docs/mp/ExecuteOnSnippets.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import io.helidon.microprofile.cdi.ExecuteOn;
import jakarta.enterprise.inject.Produces;
Expand Down Expand Up @@ -69,4 +71,15 @@ void someTask() {
}
}
// end::snippet_3[]

// tag::snippet_4[]
public class MyVirtualBeanAsync {

@ExecuteOn(ThreadType.VIRTUAL)
CompletionStage<String> someTask() {
// run task on virtual thread without blocking caller
return CompletableFuture.completedFuture("DONE");
}
}
// end::snippet_4[]
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import jakarta.interceptor.InterceptorBinding;

/**
* Annotates a CDI bean method that shall be executed on a new thread.
* Annotates a CDI bean method that shall be executed on a new thread. If the method returns
* {@link java.util.concurrent.CompletableFuture} or {@link java.util.concurrent.CompletionStage},
* it is assumed to be asynchronous.
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
Expand Down Expand Up @@ -65,15 +67,15 @@ enum ThreadType {
ThreadType value() default ThreadType.PLATFORM;

/**
* Waiting timeout.
* Waiting timeout, used when the method is synchronous.
*
* @return waiting timeout
*/
@Nonbinding
long timeout() default 10000L;

/**
* Waiting time unit.
* Waiting time unit, used when the method is synchronous.
*
* @return waiting time unit
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
import java.lang.reflect.Method;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import io.helidon.common.LazyValue;

Expand All @@ -32,6 +35,7 @@
import jakarta.enterprise.inject.spi.Bean;
import jakarta.enterprise.inject.spi.BeanManager;
import jakarta.enterprise.inject.spi.BeforeBeanDiscovery;
import jakarta.enterprise.inject.spi.DeploymentException;
import jakarta.enterprise.inject.spi.Extension;
import jakarta.enterprise.inject.spi.ProcessManagedBean;
import jakarta.enterprise.inject.spi.ProcessSyntheticBean;
Expand All @@ -41,7 +45,13 @@
*/
public class ExecuteOnExtension implements Extension {

enum MethodType {
BLOCKING,
NON_BLOCKING
};

private final LazyValue<Map<Method, AnnotatedMethod<?>>> methodMap = LazyValue.create(ConcurrentHashMap::new);
private final LazyValue<Map<Method, MethodType>> methodType = LazyValue.create(ConcurrentHashMap::new);

void registerMethods(BeanManager bm, @Observes ProcessSyntheticBean<?> event) {
registerMethods(bm.createAnnotatedType(event.getBean().getBeanClass()));
Expand All @@ -54,7 +64,9 @@ void registerMethods(@Observes ProcessManagedBean<?> event) {
private void registerMethods(AnnotatedType<?> type) {
for (AnnotatedMethod<?> annotatedMethod : type.getMethods()) {
if (annotatedMethod.isAnnotationPresent(ExecuteOn.class)) {
methodMap.get().put(annotatedMethod.getJavaMember(), annotatedMethod);
Method method = annotatedMethod.getJavaMember();
methodMap.get().put(method, annotatedMethod);
methodType.get().put(method, findMethodType(method));
}
}
}
Expand All @@ -63,6 +75,19 @@ void validateAnnotations(BeanManager bm, @Observes @Initialized(ApplicationScope
methodMap.get().forEach((method, annotatedMethod) -> validateExecutor(bm, annotatedMethod));
}


private static MethodType findMethodType(Method method) {
Class<?> returnType = method.getReturnType();
if (CompletionStage.class.isAssignableFrom(returnType)
|| CompletableFuture.class.isAssignableFrom(returnType)) {
return MethodType.NON_BLOCKING;
}
if (Future.class.equals(returnType)) {
throw new DeploymentException("Future is not supported as return type of ExecuteOn method");
}
return MethodType.BLOCKING;
}

private static void validateExecutor(BeanManager bm, AnnotatedMethod<?> method) {
ExecuteOn executeOn = method.getAnnotation(ExecuteOn.class);
if (executeOn.value() == ExecuteOn.ThreadType.EXECUTOR) {
Expand All @@ -85,12 +110,17 @@ ExecuteOn getAnnotation(Method method) {
throw new IllegalArgumentException("Unable to map method " + method);
}

MethodType getMethodType(Method method) {
return methodType.get().get(method);
}

void registerInterceptors(@Observes BeforeBeanDiscovery discovery, BeanManager bm) {
discovery.addAnnotatedType(bm.createAnnotatedType(ExecuteOnInterceptor.class),
ExecuteOnInterceptor.class.getName());
}

void clearMethodMap() {
methodMap.get().clear();
methodType.get().clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@

package io.helidon.microprofile.cdi;

import java.lang.reflect.Method;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

import io.helidon.common.LazyValue;
import io.helidon.common.configurable.ThreadPoolSupplier;
Expand Down Expand Up @@ -47,56 +53,111 @@ class ExecuteOnInterceptor {

private static final LazyValue<ExecutorService> PLATFORM_EXECUTOR_SERVICE
= LazyValue.create(() -> {
Config mpConfig = ConfigProvider.getConfig();
io.helidon.config.Config config = MpConfig.toHelidonConfig(mpConfig);
return ThreadPoolSupplier.builder()
.threadNamePrefix(EXECUTE_ON)
.config(config.get(RUN_ON_PLATFORM_THREAD))
.virtualThreads(false) // overrides to platform threads
.build()
.get();
});
Config mpConfig = ConfigProvider.getConfig();
io.helidon.config.Config config = MpConfig.toHelidonConfig(mpConfig);
return ThreadPoolSupplier.builder()
.threadNamePrefix(EXECUTE_ON)
.config(config.get(RUN_ON_PLATFORM_THREAD))
.virtualThreads(false) // overrides to platform threads
.build()
.get();
});

private static final LazyValue<ExecutorService> VIRTUAL_EXECUTOR_SERVICE
= LazyValue.create(() -> {
Config mpConfig = ConfigProvider.getConfig();
io.helidon.config.Config config = MpConfig.toHelidonConfig(mpConfig);
String threadNamePrefix = config.get(RUN_ON_VIRTUAL_THREAD)
.get("thread-name-prefix")
.asString()
.asOptional()
.orElse(EXECUTE_ON);
return ThreadPoolSupplier.builder()
.threadNamePrefix(threadNamePrefix)
.virtualThreads(true)
.build()
.get();
});
Config mpConfig = ConfigProvider.getConfig();
io.helidon.config.Config config = MpConfig.toHelidonConfig(mpConfig);
String threadNamePrefix = config.get(RUN_ON_VIRTUAL_THREAD)
.get("thread-name-prefix")
.asString()
.asOptional()
.orElse(EXECUTE_ON);
return ThreadPoolSupplier.builder()
.threadNamePrefix(threadNamePrefix)
.virtualThreads(true)
.build()
.get();
});

@Inject
private ExecuteOnExtension extension;

/**
* Intercepts a call to bean method annotated by {@code @OnNewThread}.
* Intercepts a call to bean method annotated by {@link io.helidon.microprofile.cdi.ExecuteOn}.
*
* @param context Invocation context.
* @return Whatever the intercepted method returns.
* @throws Throwable If a problem occurs.
*/
@AroundInvoke
@SuppressWarnings("unchecked")
public Object executeOn(InvocationContext context) throws Throwable {
ExecuteOn executeOn = extension.getAnnotation(context.getMethod());
return switch (executeOn.value()) {
case PLATFORM -> PLATFORM_EXECUTOR_SERVICE.get()
.submit(context::proceed)
.get(executeOn.timeout(), executeOn.unit());
case VIRTUAL -> VIRTUAL_EXECUTOR_SERVICE.get()
.submit(context::proceed)
.get(executeOn.timeout(), executeOn.unit());
case EXECUTOR -> findExecutor(executeOn.executorName())
.submit(context::proceed)
.get(executeOn.timeout(), executeOn.unit());
Method method = context.getMethod();
ExecuteOn executeOn = extension.getAnnotation(method);

// find executor service to use
ExecutorService executorService = switch (executeOn.value()) {
case PLATFORM -> PLATFORM_EXECUTOR_SERVICE.get();
case VIRTUAL -> VIRTUAL_EXECUTOR_SERVICE.get();
case EXECUTOR -> findExecutor(executeOn.executorName());
};

switch (extension.getMethodType(method)) {
case BLOCKING:
// block until call completes
return executorService.submit(context::proceed).get(executeOn.timeout(), executeOn.unit());
case NON_BLOCKING:
// execute call asynchronously
CompletableFuture<?> supplyFuture = CompletableFuture.supplyAsync(
() -> {
try {
return context.proceed();
} catch (Exception e) {
throw new RuntimeException(e);
}
}, executorService);

// return new, cancellable completable future
AtomicBoolean mayInterrupt = new AtomicBoolean(false);
CompletableFuture<Object> resultFuture = new CompletableFuture<>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
mayInterrupt.set(mayInterruptIfRunning);
return super.cancel(mayInterruptIfRunning);
}
};

// link completion of supplyFuture with resultFuture
supplyFuture.whenComplete((result, throwable) -> {
if (throwable == null) {
// result must be CompletionStage or CompletableFuture
CompletableFuture<Object> cfResult = !(result instanceof CompletableFuture<?>)
? ((CompletionStage<Object>) result).toCompletableFuture()
: (CompletableFuture<Object>) result;
cfResult.whenComplete((r, t) -> {
if (t == null) {
resultFuture.complete(r);
} else {
resultFuture.completeExceptionally(unwrapThrowable(t));
}
});
} else {
resultFuture.completeExceptionally(unwrapThrowable(throwable));
}
});

// if resultFuture is cancelled, then cancel supplyFuture
resultFuture.exceptionally(t -> {
if (t instanceof CancellationException) {
supplyFuture.cancel(mayInterrupt.get());
}
return null;
});

return resultFuture;
default:
throw new IllegalStateException("Unrecognized ExecuteOn method type");
}
}

/**
Expand All @@ -108,4 +169,14 @@ public Object executeOn(InvocationContext context) throws Throwable {
private static ExecutorService findExecutor(String executorName) {
return CDI.current().select(ExecutorService.class, NamedLiteral.of(executorName)).get();
}

/**
* Extract underlying throwable.
*
* @param t the throwable
* @return the wrapped throwable
*/
private static Throwable unwrapThrowable(Throwable t) {
return t instanceof ExecutionException ? t.getCause() : t;
}
}
Loading

0 comments on commit 56985e6

Please sign in to comment.