Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4.1.5: Adds support for async on @ExecuteOn methods #9566

Merged
merged 1 commit into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions docs/src/main/asciidoc/mp/threading.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ yet this process is still underway and some legacy libraries may never be fully

Helidon MP supports a new `@ExecuteOn` annotation to give developers full control on how to run
tasks. This annotation can be applied to any CDI bean method to control the type of thread in
which invocations of that method shall execute on.
which invocations of that method shall execute on. If such a method returns `CompletionStage`
or `CompletableFuture`, it is assumed to be asynchronous and shall execute in a new thread
but without blocking the caller's thread.

include::{rootdir}/includes/dependencies.adoc[]

Expand Down Expand Up @@ -129,10 +131,20 @@ but that is not a requirement in CDI.
include::{sourcedir}/mp/ExecuteOnSnippets.java[tag=snippet_2, indent=0]
----

3. Finally, it is also possible to explicitly execute a method in a
virtual thread, blocking the caller thread until the method execution is complete.
3. It is also possible to explicitly execute a method in a
virtual thread, blocking the caller's thread until the method execution is complete.
+
[source,java]
----
include::{sourcedir}/mp/ExecuteOnSnippets.java[tag=snippet_3, indent=0]
----

4. Finally, a method can be executed in another thread but without blocking
the caller's thread. This behavior is triggered automatically when the bean method returns
`CompletionStage` or `CompletableFuture`.
+
[source,java]
----
include::{sourcedir}/mp/ExecuteOnSnippets.java[tag=snippet_4, indent=0]
----

13 changes: 13 additions & 0 deletions docs/src/main/java/io/helidon/docs/mp/ExecuteOnSnippets.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import io.helidon.microprofile.cdi.ExecuteOn;
import jakarta.enterprise.inject.Produces;
Expand Down Expand Up @@ -69,4 +71,15 @@ void someTask() {
}
}
// end::snippet_3[]

// tag::snippet_4[]
public class MyVirtualBeanAsync {

@ExecuteOn(ThreadType.VIRTUAL)
CompletionStage<String> someTask() {
// run task on virtual thread without blocking caller
return CompletableFuture.completedFuture("DONE");
}
}
// end::snippet_4[]
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import jakarta.interceptor.InterceptorBinding;

/**
* Annotates a CDI bean method that shall be executed on a new thread.
* Annotates a CDI bean method that shall be executed on a new thread. If the method returns
* {@link java.util.concurrent.CompletableFuture} or {@link java.util.concurrent.CompletionStage},
* it is assumed to be asynchronous.
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
Expand Down Expand Up @@ -65,15 +67,15 @@ enum ThreadType {
ThreadType value() default ThreadType.PLATFORM;

/**
* Waiting timeout.
* Waiting timeout, used when the method is synchronous.
*
* @return waiting timeout
*/
@Nonbinding
long timeout() default 10000L;

/**
* Waiting time unit.
* Waiting time unit, used when the method is synchronous.
*
* @return waiting time unit
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
import java.lang.reflect.Method;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import io.helidon.common.LazyValue;

Expand All @@ -32,6 +35,7 @@
import jakarta.enterprise.inject.spi.Bean;
import jakarta.enterprise.inject.spi.BeanManager;
import jakarta.enterprise.inject.spi.BeforeBeanDiscovery;
import jakarta.enterprise.inject.spi.DeploymentException;
import jakarta.enterprise.inject.spi.Extension;
import jakarta.enterprise.inject.spi.ProcessManagedBean;
import jakarta.enterprise.inject.spi.ProcessSyntheticBean;
Expand All @@ -41,7 +45,13 @@
*/
public class ExecuteOnExtension implements Extension {

enum MethodType {
BLOCKING,
NON_BLOCKING
};

private final LazyValue<Map<Method, AnnotatedMethod<?>>> methodMap = LazyValue.create(ConcurrentHashMap::new);
private final LazyValue<Map<Method, MethodType>> methodType = LazyValue.create(ConcurrentHashMap::new);

void registerMethods(BeanManager bm, @Observes ProcessSyntheticBean<?> event) {
registerMethods(bm.createAnnotatedType(event.getBean().getBeanClass()));
Expand All @@ -54,7 +64,9 @@ void registerMethods(@Observes ProcessManagedBean<?> event) {
private void registerMethods(AnnotatedType<?> type) {
for (AnnotatedMethod<?> annotatedMethod : type.getMethods()) {
if (annotatedMethod.isAnnotationPresent(ExecuteOn.class)) {
methodMap.get().put(annotatedMethod.getJavaMember(), annotatedMethod);
Method method = annotatedMethod.getJavaMember();
methodMap.get().put(method, annotatedMethod);
methodType.get().put(method, findMethodType(method));
}
}
}
Expand All @@ -63,6 +75,19 @@ void validateAnnotations(BeanManager bm, @Observes @Initialized(ApplicationScope
methodMap.get().forEach((method, annotatedMethod) -> validateExecutor(bm, annotatedMethod));
}


private static MethodType findMethodType(Method method) {
Class<?> returnType = method.getReturnType();
if (CompletionStage.class.isAssignableFrom(returnType)
|| CompletableFuture.class.isAssignableFrom(returnType)) {
return MethodType.NON_BLOCKING;
}
if (Future.class.equals(returnType)) {
throw new DeploymentException("Future is not supported as return type of ExecuteOn method");
}
return MethodType.BLOCKING;
}

private static void validateExecutor(BeanManager bm, AnnotatedMethod<?> method) {
ExecuteOn executeOn = method.getAnnotation(ExecuteOn.class);
if (executeOn.value() == ExecuteOn.ThreadType.EXECUTOR) {
Expand All @@ -85,12 +110,17 @@ ExecuteOn getAnnotation(Method method) {
throw new IllegalArgumentException("Unable to map method " + method);
}

MethodType getMethodType(Method method) {
return methodType.get().get(method);
}

void registerInterceptors(@Observes BeforeBeanDiscovery discovery, BeanManager bm) {
discovery.addAnnotatedType(bm.createAnnotatedType(ExecuteOnInterceptor.class),
ExecuteOnInterceptor.class.getName());
}

void clearMethodMap() {
methodMap.get().clear();
methodType.get().clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@

package io.helidon.microprofile.cdi;

import java.lang.reflect.Method;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

import io.helidon.common.LazyValue;
import io.helidon.common.configurable.ThreadPoolSupplier;
Expand Down Expand Up @@ -47,56 +53,111 @@ class ExecuteOnInterceptor {

private static final LazyValue<ExecutorService> PLATFORM_EXECUTOR_SERVICE
= LazyValue.create(() -> {
Config mpConfig = ConfigProvider.getConfig();
io.helidon.config.Config config = MpConfig.toHelidonConfig(mpConfig);
return ThreadPoolSupplier.builder()
.threadNamePrefix(EXECUTE_ON)
.config(config.get(RUN_ON_PLATFORM_THREAD))
.virtualThreads(false) // overrides to platform threads
.build()
.get();
});
Config mpConfig = ConfigProvider.getConfig();
io.helidon.config.Config config = MpConfig.toHelidonConfig(mpConfig);
return ThreadPoolSupplier.builder()
.threadNamePrefix(EXECUTE_ON)
.config(config.get(RUN_ON_PLATFORM_THREAD))
.virtualThreads(false) // overrides to platform threads
.build()
.get();
});

private static final LazyValue<ExecutorService> VIRTUAL_EXECUTOR_SERVICE
= LazyValue.create(() -> {
Config mpConfig = ConfigProvider.getConfig();
io.helidon.config.Config config = MpConfig.toHelidonConfig(mpConfig);
String threadNamePrefix = config.get(RUN_ON_VIRTUAL_THREAD)
.get("thread-name-prefix")
.asString()
.asOptional()
.orElse(EXECUTE_ON);
return ThreadPoolSupplier.builder()
.threadNamePrefix(threadNamePrefix)
.virtualThreads(true)
.build()
.get();
});
Config mpConfig = ConfigProvider.getConfig();
io.helidon.config.Config config = MpConfig.toHelidonConfig(mpConfig);
String threadNamePrefix = config.get(RUN_ON_VIRTUAL_THREAD)
.get("thread-name-prefix")
.asString()
.asOptional()
.orElse(EXECUTE_ON);
return ThreadPoolSupplier.builder()
.threadNamePrefix(threadNamePrefix)
.virtualThreads(true)
.build()
.get();
});

@Inject
private ExecuteOnExtension extension;

/**
* Intercepts a call to bean method annotated by {@code @OnNewThread}.
* Intercepts a call to bean method annotated by {@link io.helidon.microprofile.cdi.ExecuteOn}.
*
* @param context Invocation context.
* @return Whatever the intercepted method returns.
* @throws Throwable If a problem occurs.
*/
@AroundInvoke
@SuppressWarnings("unchecked")
public Object executeOn(InvocationContext context) throws Throwable {
ExecuteOn executeOn = extension.getAnnotation(context.getMethod());
return switch (executeOn.value()) {
case PLATFORM -> PLATFORM_EXECUTOR_SERVICE.get()
.submit(context::proceed)
.get(executeOn.timeout(), executeOn.unit());
case VIRTUAL -> VIRTUAL_EXECUTOR_SERVICE.get()
.submit(context::proceed)
.get(executeOn.timeout(), executeOn.unit());
case EXECUTOR -> findExecutor(executeOn.executorName())
.submit(context::proceed)
.get(executeOn.timeout(), executeOn.unit());
Method method = context.getMethod();
ExecuteOn executeOn = extension.getAnnotation(method);

// find executor service to use
ExecutorService executorService = switch (executeOn.value()) {
case PLATFORM -> PLATFORM_EXECUTOR_SERVICE.get();
case VIRTUAL -> VIRTUAL_EXECUTOR_SERVICE.get();
case EXECUTOR -> findExecutor(executeOn.executorName());
};

switch (extension.getMethodType(method)) {
case BLOCKING:
// block until call completes
return executorService.submit(context::proceed).get(executeOn.timeout(), executeOn.unit());
case NON_BLOCKING:
// execute call asynchronously
CompletableFuture<?> supplyFuture = CompletableFuture.supplyAsync(
() -> {
try {
return context.proceed();
} catch (Exception e) {
throw new RuntimeException(e);
}
}, executorService);

// return new, cancellable completable future
AtomicBoolean mayInterrupt = new AtomicBoolean(false);
CompletableFuture<Object> resultFuture = new CompletableFuture<>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
mayInterrupt.set(mayInterruptIfRunning);
return super.cancel(mayInterruptIfRunning);
}
};

// link completion of supplyFuture with resultFuture
supplyFuture.whenComplete((result, throwable) -> {
if (throwable == null) {
// result must be CompletionStage or CompletableFuture
CompletableFuture<Object> cfResult = !(result instanceof CompletableFuture<?>)
? ((CompletionStage<Object>) result).toCompletableFuture()
: (CompletableFuture<Object>) result;
cfResult.whenComplete((r, t) -> {
if (t == null) {
resultFuture.complete(r);
} else {
resultFuture.completeExceptionally(unwrapThrowable(t));
}
});
} else {
resultFuture.completeExceptionally(unwrapThrowable(throwable));
}
});

// if resultFuture is cancelled, then cancel supplyFuture
resultFuture.exceptionally(t -> {
if (t instanceof CancellationException) {
supplyFuture.cancel(mayInterrupt.get());
}
return null;
});

return resultFuture;
default:
throw new IllegalStateException("Unrecognized ExecuteOn method type");
}
}

/**
Expand All @@ -108,4 +169,14 @@ public Object executeOn(InvocationContext context) throws Throwable {
private static ExecutorService findExecutor(String executorName) {
return CDI.current().select(ExecutorService.class, NamedLiteral.of(executorName)).get();
}

/**
* Extract underlying throwable.
*
* @param t the throwable
* @return the wrapped throwable
*/
private static Throwable unwrapThrowable(Throwable t) {
return t instanceof ExecutionException ? t.getCause() : t;
}
}
Loading