Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMQ-9394] Tech Preview: Virtual Thread support #1172

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.activemq.ConfigurationException;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.annotation.Experimental;
import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
import org.apache.activemq.broker.jmx.AnnotatedMBean;
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
Expand Down Expand Up @@ -223,6 +224,7 @@ public class BrokerService implements Service {
private boolean monitorConnectionSplits = false;
private int taskRunnerPriority = Thread.NORM_PRIORITY;
private boolean dedicatedTaskRunner;
private boolean virtualThreadTaskRunner;
private boolean cacheTempDestinations = false;// useful for failover
private int timeBeforePurgeTempDestinations = 5000;
private final List<Runnable> shutdownHooks = new ArrayList<>();
Expand Down Expand Up @@ -1269,7 +1271,7 @@ public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws
public TaskRunnerFactory getTaskRunnerFactory() {
if (this.taskRunnerFactory == null) {
this.taskRunnerFactory = new TaskRunnerFactory("ActiveMQ BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000,
isDedicatedTaskRunner());
isDedicatedTaskRunner(), isVirtualThreadTaskRunner());
this.taskRunnerFactory.setThreadClassLoader(this.getClass().getClassLoader());
}
return this.taskRunnerFactory;
Expand All @@ -1280,9 +1282,10 @@ public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
}

public TaskRunnerFactory getPersistenceTaskRunnerFactory() {
// [AMQ-9394] TODO: Should we have a separate config flag for virtualThread for persistence task runner?
if (taskRunnerFactory == null) {
persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority,
true, 1000, isDedicatedTaskRunner());
true, 1000, isDedicatedTaskRunner(), isVirtualThreadTaskRunner());
}
return persistenceTaskRunnerFactory;
}
Expand Down Expand Up @@ -1891,6 +1894,15 @@ public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
this.dedicatedTaskRunner = dedicatedTaskRunner;
}

public boolean isVirtualThreadTaskRunner() {
return virtualThreadTaskRunner;
}

@Experimental("Tech Preview for Virtaul Thread support")
public void setVirtualThreadTaskRunner(boolean virtualThreadTaskRunner) {
this.virtualThreadTaskRunner = virtualThreadTaskRunner;
}

public boolean isCacheTempDestinations() {
return cacheTempDestinations;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,16 @@ public boolean isSlave() {
return brokerService.isSlave();
}

@Override
public boolean isDedicatedTaskRunner() {
return brokerService.isDedicatedTaskRunner();
}

@Override
public boolean isVirtualThreadTaskRunner() {
return brokerService.isVirtualThreadTaskRunner();
}

private ManagedRegionBroker safeGetBroker() {
if (broker == null) {
throw new IllegalStateException("Broker is not yet started.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,4 +335,11 @@ public interface BrokerViewMBean extends Service {

@MBeanInfo(value="The total number of times that the max number of uncommitted count has been exceeded across all destinations")
long getTotalMaxUncommittedExceededCount();

@MBeanInfo("Dedicated Task Runner enabled.")
boolean isDedicatedTaskRunner();

@MBeanInfo("Virtual Thread Task Runner enabled.")
boolean isVirtualThreadTaskRunner();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker;

import junit.framework.Test;

public class VirtualThreadTaskRunnerBrokerTest extends BrokerTest {

protected BrokerService createBroker() throws Exception {
BrokerService broker = super.createBroker();
broker.setVirtualThreadTaskRunner(true);
return broker;
}

public static Test suite() {
return suite(VirtualThreadTaskRunnerBrokerTest.class);
}

public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}

}
209 changes: 209 additions & 0 deletions activemq-client-jdk21/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>6.1.1-SNAPSHOT</version>
</parent>
<artifactId>activemq-client-jdk21</artifactId>
<packaging>bundle</packaging>
<name>ActiveMQ :: Client JDK 21</name>
<description>ActiveMQ Client implementation compiled with JDK 21 and tech preview support for Virtual Threads</description>
<properties>
<source-version>21</source-version>
<target-version>21</target-version>
<maven.javadoc.skip>true</maven.javadoc.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>jakarta.jms</groupId>
<artifactId>jakarta.jms-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>jakarta.jms</groupId>
<artifactId>jakarta.jms-api</artifactId>
<version>${jakarta-jms-api-version}</version>
</dependency>
<dependency>
<groupId>org.fusesource.hawtbuf</groupId>
<artifactId>hawtbuf</artifactId>
<version>${hawtbuf-version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.jmdns</groupId>
<artifactId>jmdns</artifactId>
<optional>true</optional>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.thoughtworks.xstream</groupId>
<artifactId>xstream</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
<scope>test</scope>
<classifier>tests</classifier>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-unit-tests</artifactId>
<scope>test</scope>
<classifier>tests</classifier>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>unpack-source</id>
<phase>initialize</phase>
<goals>
<goal>unpack</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<classifier>sources</classifier>
<type>jar</type>
<outputDirectory>${project.build.directory}/copied-sources/activemq-client</outputDirectory>
<excludes>**/META-INF/*,**/META-INF/maven/**,**/zeroconf/**</excludes>
<includes>**/**</includes>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-java-source</id>
<phase>generate-sources</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/generated-sources</outputDirectory>
<resources>
<resource>
<directory>${project.build.directory}/copied-sources/activemq-client</directory>
</resource>
</resources>
</configuration>
</execution>
<execution>
<id>copy-resources</id>
<phase>generate-sources</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/generated-resources/META-INF</outputDirectory>
<resources>
<resource>
<directory>${project.build.directory}/copied-sources/activemq-client/META-INF</directory>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${project.build.directory}/generated-sources</source>
</sources>
</configuration>
</execution>
<execution>
<id>add-resource</id>
<phase>generate-resources</phase>
<goals>
<goal>add-resource</goal>
</goals>
<configuration>
<resources>
<resource>
<directory>${project.build.directory}/generated-resources</directory>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<extensions>true</extensions>
<inherited>true</inherited>
<configuration>
<instructions>
<Import-Package>
!java.*,
!com.google.errorprone.annotations,
!com.google.errorprone.annotations.concurrent,
com.thoughtworks.xstream.*;resolution:="optional",
*
</Import-Package>
<Private-Package>
com.google.errorprone.annotations,
com.google.errorprone.annotations.concurrent
</Private-Package>
<_noee>true</_noee>
</instructions>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.thread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.activemq.annotation.Experimental;
import org.slf4j.Logger;

@Experimental("Tech Preview for Virtual Thread support")
public class VirtualThreadExecutor {

private VirtualThreadExecutor() {}

public static ExecutorService createVirtualThreadExecutorService(final String name, final AtomicLong id, final Logger LOG) {

// [AMQ-9394] NOTE: Submitted JDK feature enhancement id: 9076243 to allow AtomicLong thread id param
// https://bugs.java.com/bugdatabase/view_bug?bug_id=JDK-8320377
Thread.Builder.OfVirtual threadBuilderOfVirtual = Thread.ofVirtual()
.name(name)
.uncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread t, final Throwable e) {
LOG.error("Error in thread '{}'", t.getName(), e);
}
});

// [AMQ-9394] Work around to have global thread id increment across ThreadFactories
ThreadFactory virtualThreadFactory = threadBuilderOfVirtual.factory();
ThreadFactory atomicThreadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread tmpThread = virtualThreadFactory.newThread(r);
tmpThread.setName(tmpThread.getName() + id.incrementAndGet());
return tmpThread;
}
};

return Executors.newThreadPerTaskExecutor(atomicThreadFactory); // [AMQ-9394] Same as newVirtualThreadPerTaskExecutor
}
}
Loading