Enable Tools to Define Execution Model #1023

Merged
merged 6 commits into from
Nov 6, 2024

Conversation

cescoffier
Collaborator

@cescoffier cescoffier commented Oct 30, 2024

This commit introduces support for tools to define their execution model with annotations such as @Blocking, @NonBlocking, and @RunOnVirtualThread, or to rely on the method signature to specify the execution model.

Execution constraints are enforced based on the caller thread; for instance, blocking calls are disallowed on the event loop.

For reactive AiService methods (streaming), detection happens at build time to determine whether execution can stay on the event loop or needs to be switched to a worker thread. The detection checks the execution model of each tool method the AI service method can invoke.
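
To make the annotation-or-signature rule concrete, here is a minimal sketch using only the JDK. The three annotations are hypothetical stand-ins declared locally so the example compiles on its own; the precedence order and the signature fallback (a CompletionStage return type means non-blocking, anything else means blocking) are assumptions for illustration, not the extension's confirmed logic.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical stand-ins for the real annotations, declared here only to keep the sketch self-contained.
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) @interface Blocking {}
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) @interface NonBlocking {}
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) @interface RunOnVirtualThread {}

enum ExecutionModel { BLOCKING, NON_BLOCKING, VIRTUAL_THREAD }

final class ExecutionModelResolver {
    private ExecutionModelResolver() {}

    // An explicit annotation wins; otherwise fall back to the method signature.
    // The precedence order below is an assumption made for this sketch.
    static ExecutionModel resolve(Method method) {
        if (method.isAnnotationPresent(RunOnVirtualThread.class)) {
            return ExecutionModel.VIRTUAL_THREAD;
        }
        if (method.isAnnotationPresent(Blocking.class)) {
            return ExecutionModel.BLOCKING;
        }
        if (method.isAnnotationPresent(NonBlocking.class)) {
            return ExecutionModel.NON_BLOCKING;
        }
        // Assumed signature rule: an asynchronous return type means non-blocking.
        return CompletionStage.class.isAssignableFrom(method.getReturnType())
                ? ExecutionModel.NON_BLOCKING
                : ExecutionModel.BLOCKING;
    }

    // Convenience lookup by name, for the sample class below.
    static ExecutionModel resolve(Class<?> owner, String methodName) {
        for (Method m : owner.getDeclaredMethods()) {
            if (m.getName().equals(methodName)) {
                return resolve(m);
            }
        }
        throw new IllegalArgumentException("No method named " + methodName);
    }
}

// Hypothetical tool class covering each case.
final class SampleTools {
    @Blocking int sum(int a, int b) { return a + b; }
    @NonBlocking int negate(int a) { return -a; }
    @RunOnVirtualThread String lookup(String key) { return key; }
    int plain(int a) { return a; }
    CompletionStage<Integer> async(int a) { return CompletableFuture.completedFuture(a); }
}
```

Resolving the model once per tool method lets a caller decide up front whether a streaming AI service method may stay on the event loop, which is the kind of check the description places at build time.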

  • Write documentation
  • Update sample to show blocking tools when streaming is used
  • Make sure the duplicated context is propagated

@cescoffier cescoffier requested a review from a team as a code owner October 30, 2024 18:28
@cescoffier cescoffier requested a review from geoand October 30, 2024 18:33
@andreadimaio
Collaborator

andreadimaio commented Oct 30, 2024

I was curious to test these changes in my workspace, but when I use the @RunOnVirtualThread annotation on a @Tool method, I encounter an exception during application startup. However, this issue doesn't occur when I run your tests in the core module, where "virtual thread" methods work fine. It is possible that this is an issue on my side 🤔

Moving on, I would like to ask about a piece of code used within the StreamingChatLanguageModel in watsonx.ai. This code can be removed once this PR is merged, right?

@geoand
Collaborator

geoand commented Oct 31, 2024

I was curious to test these changes in my workspace, but when I use the @RunOnVirtualThread annotation on a @Tool method, I encounter an exception during application startup. However, this issue doesn't occur when I run your tests in the core module, where "virtual thread" methods work fine. It is possible that this is an issue on my side 🤔

Can you perhaps add a test into this PR that surfaces the problem?

Collaborator

@geoand geoand left a comment

Awesome stuff!

I only added a few minor comments, but we should also see what is causing the problem that @andreadimaio mentions

@cescoffier
Collaborator Author

It should have been a draft - sorry.

@cescoffier cescoffier marked this pull request as draft October 31, 2024 07:24
@andreadimaio
Collaborator

andreadimaio commented Oct 31, 2024

Can you perhaps add a test into this PR that surfaces the problem?

Yes, I could create something in the integration-tests folder. Let me know if you'd like me to proceed, or if you'd prefer to add the test later.

@geoand
Collaborator

geoand commented Oct 31, 2024

Up to @cescoffier

@cescoffier
Collaborator Author

@andreadimaio would be great!

I know of a few cases that should never have worked but did, and that are now rejected (calling an imperative method from the event loop without the @NonBlocking annotation). Still, most of these cases are detected (in the case of streaming), and a switch to a worker thread is done automatically. Because of this, I don't think the emitOn you added is required anymore. But I would need to see the full stack trace.

Now there is still a bit of work to do, as I "forgot" to implement context propagation. It should not take long, but my next coding session will be next week :-(

@andreadimaio
Collaborator

Here you can find the test cescoffier#1.
When I run the test, I get the exception:

The @Blocking, @NonBlocking and @RunOnVirtualThread annotations may only be used on "entrypoint" methods (methods invoked by various frameworks in Quarkus)
Using the @Blocking, @NonBlocking and @RunOnVirtualThread annotations on methods that can only be invoked by application code is invalid
        at io.quarkus.deployment.execannotations.ExecutionModelAnnotationsProcessor.check(ExecutionModelAnnotationsProcessor.java:55)
        at java.base/java.lang.invoke.MethodHandle.invokeWithArguments(MethodHandle.java:733)
        at io.quarkus.deployment.ExtensionLoader$3.execute(ExtensionLoader.java:856)
        at io.quarkus.builder.BuildContext.run(BuildContext.java:256)
        at org.jboss.threads.ContextHandler$1.runWith(ContextHandler.java:18)
        at org.jboss.threads.EnhancedQueueExecutor$Task.doRunWith(EnhancedQueueExecutor.java:2516)
        at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2495)
        at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1521)
        at java.base/java.lang.Thread.run(Thread.java:1583)
        at org.jboss.threads.JBossThread.run(JBossThread.java:483)

If I remove the annotations: @Blocking, @NonBlocking and @RunOnVirtualThread, the tests are green.

@cescoffier
Collaborator Author

cescoffier commented Oct 31, 2024 via email

@cescoffier
Collaborator Author

cescoffier commented Oct 31, 2024 via email

@cescoffier
Collaborator Author

@andreadimaio I think I fixed it.

@andreadimaio
Collaborator

@andreadimaio I think I fixed it.

Yes, it works!

@andreadimaio
Collaborator

About this code? I think it can be removed and managed with annotations.

@andreadimaio
Collaborator

Uhm... if I remove these lines from the WatsonxChatModel class:

if (tools != null) {
   // Today Langchain4j doesn't allow to use the async operation with tools.
   // One idea might be to give to the developer the possibility to use the VirtualThread.
   mutiny.emitOn(Infrastructure.getDefaultWorkerPool());
}

And I try to call a tool method annotated with @Blocking:

@ToolBox(Calculator.class)
public Multi<String> message(@UserMessage String message);

@ApplicationScoped
public class Calculator {

    @Inject
    EmbeddingModel model;

    @Tool
    @Blocking
    public int sum(int a, int b) {
        model.embed("test");
        return a + b;
    }
}

I am getting java.lang.IllegalStateException: Cannot execute blocking tools on event loop thread.
Looking at the code, the TokenStream is executed on a worker pool, but the exception is raised anyway.

@andreadimaio
Collaborator

andreadimaio commented Oct 31, 2024

QuarkusToolExecutor #58

case BLOCKING:
    if (io.vertx.core.Context.isOnEventLoopThread()) {
        throw new IllegalStateException("Cannot execute blocking tools on event loop thread");
    }

I think the problem at this point is the Multi created in the WatsonxChatModel.
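
The guard quoted above can be reproduced in isolation. The sketch below is JDK-only: `onEventLoop()` is a hypothetical stand-in for `io.vertx.core.Context.isOnEventLoopThread()` that simply inspects the thread name, and `ToolMode`, `ToolGuard`, and `invokeOnThread` are illustrative names, not part of the extension.

```java
import java.util.function.Supplier;

enum ToolMode { BLOCKING, NON_BLOCKING }

final class ToolGuard {
    private ToolGuard() {}

    // Hypothetical stand-in for io.vertx.core.Context.isOnEventLoopThread():
    // this sketch treats any thread whose name starts with the prefix as an event loop.
    static boolean onEventLoop() {
        return Thread.currentThread().getName().startsWith("vert.x-eventloop-thread");
    }

    // Reject blocking tools when the caller is an event-loop thread; otherwise run the tool inline.
    static <T> T invoke(ToolMode mode, Supplier<T> tool) {
        if (mode == ToolMode.BLOCKING && onEventLoop()) {
            throw new IllegalStateException("Cannot execute blocking tools on event loop thread");
        }
        return tool.get();
    }

    // Runs the guarded call on a freshly named thread and reports the outcome as a string.
    static String invokeOnThread(String threadName, ToolMode mode, Supplier<Integer> tool) {
        String[] outcome = new String[1];
        Thread thread = new Thread(() -> {
            try {
                outcome[0] = "ok:" + invoke(mode, tool);
            } catch (IllegalStateException e) {
                outcome[0] = "rejected";
            }
        }, threadName);
        thread.start();
        try {
            thread.join(); // join() makes the write to outcome[0] visible to the caller
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return outcome[0];
    }
}
```

The point of the sketch is that the check looks at the thread that actually invokes the tool, so it does not matter where the stream was started if the invocation itself arrives on an event-loop thread.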

@cescoffier
Collaborator Author

cescoffier commented Oct 31, 2024 via email

@andreadimaio
Collaborator

Yes, if the switch is the line Infrastructure.getDefaultWorkerPool().execute(tokenStream::start); then it is executed, but the exception is raised anyway.

@cescoffier
Collaborator Author

Interesting, can I reproduce it with your test?

@cescoffier
Collaborator Author

Ok, I know why it happens - I need to think about the proper way to fix it.

@andreadimaio
Collaborator

andreadimaio commented Oct 31, 2024

Interesting, can I reproduce it with your test?

Not yet, but I can try to add it.

Ok, I know why it happens - I need to think about the proper way to fix it.

What's happening?

@cescoffier
Collaborator Author

cescoffier commented Oct 31, 2024

Basically, each item is emitted on the event loop. In my tests, I simplified this a bit too much by sending all the items synchronously on subscription (on the event loop).

The current switch only happens at subscription time. So it works for my tests... but only in my tests.
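
The difference between switching at subscription time and switching at emission time can be shown with plain executors. This is a sketch under stated assumptions: `EmissionThreadDemo`, its `subscribe` helper, and the thread names are hypothetical, and the source stands in for a streaming model that always delivers items from its own event-loop thread. In Mutiny terms the two variants are loosely analogous to `runSubscriptionOn` and `emitOn`; the sketch does not claim to show the fix that was eventually applied.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class EmissionThreadDemo {
    private EmissionThreadDemo() {}

    // A source that always delivers items from the event-loop thread, whoever subscribes.
    static void subscribe(ExecutorService eventLoop, List<String> items,
                          Consumer<String> onItem, Runnable onComplete) {
        eventLoop.execute(() -> {
            items.forEach(onItem);
            onComplete.run();
        });
    }

    // Returns the name of the thread that handled each item.
    static List<String> run(boolean switchOnEachItem) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop"));
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        Consumer<String> record = item -> seen.add(Thread.currentThread().getName());
        try {
            // In both variants the subscription itself is performed from the worker thread.
            worker.execute(() -> {
                if (switchOnEachItem) {
                    // Emission-time switch: every item hops to the worker before it is handled.
                    subscribe(eventLoop, List.of("a", "b", "c"),
                            item -> worker.execute(() -> record.accept(item)),
                            () -> worker.execute(done::countDown));
                } else {
                    // Subscription-time switch only: items are still handled on the event loop.
                    subscribe(eventLoop, List.of("a", "b", "c"), record, done::countDown);
                }
            });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for completion");
            }
            return List.copyOf(seen);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            eventLoop.shutdownNow();
            worker.shutdownNow();
        }
    }
}
```

With `run(false)` every item is handled on the `event-loop` thread even though the subscription came from the worker, which mirrors the failure described above; with `run(true)` every item is handled on the `worker` thread.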

@geoand
Collaborator

geoand commented Nov 5, 2024

@cescoffier I'd like to include this in the next release, but if it still needs plenty of work, we can have it in a later one

@cescoffier
Collaborator Author

I have a plan to fix the remaining issue.

When do you want to cut the release?

@geoand
Collaborator

geoand commented Nov 5, 2024

This week was the plan, but it's not set in stone

@cescoffier
Collaborator Author

I just got 1 free hour this afternoon, that should be enough

@geoand
Collaborator

geoand commented Nov 5, 2024

💪

@cescoffier cescoffier force-pushed the tools-execution-model branch from 56e1d7c to b33c558 Compare November 5, 2024 15:40
@cescoffier cescoffier marked this pull request as ready for review November 5, 2024 15:40
@cescoffier
Collaborator Author

@geoand how do we modify samples? I need to bump the version of quarkus langchain4j to demonstrate the feature. If I switch to 999-SNAPSHOT, would it break the world?

@cescoffier
Collaborator Author

(@geoand well... let's try)

@geoand
Collaborator

geoand commented Nov 6, 2024

It seems like some of the tests fail

@cescoffier
Collaborator Author

@geoand @andreadimaio Seems like Watsonx is doing something that now blocks the event loop - I'm having a look.

@cescoffier
Collaborator Author

Hopefully, it should be better now. The issue was that the initial call was not on a Vert.x context (neither event loop nor worker), which broke the switch (as I had nowhere to switch to).

@geoand
Collaborator

geoand commented Nov 6, 2024

All green!

@andreadimaio
Collaborator

I could do some testing in my local environment if you think it would be useful. But I can do that in a bit.

@cescoffier cescoffier force-pushed the tools-execution-model branch from 5f1eea1 to bc7d6c0 Compare November 6, 2024 08:55
@andreadimaio
Collaborator

It works!! 🚀
@cescoffier I have opened a PR to merge some changes in watsonx, I have removed:

if (tools != null) {
   // Today Langchain4j doesn't allow to use the async operation with tools.
   // One idea might be to give to the developer the possibility to use the VirtualThread.
   mutiny.emitOn(Infrastructure.getDefaultWorkerPool());
}

If you don't want to add this change here, I can open another PR.

Removed emitOn on default worker pool for watsonx.ai
@cescoffier cescoffier merged commit b9d826a into quarkiverse:main Nov 6, 2024
60 checks passed
@cescoffier cescoffier deleted the tools-execution-model branch November 18, 2024 12:58

3 participants