From 67e93296bec6425bc04cb60dfb3dddb539e751af Mon Sep 17 00:00:00 2001 From: mariofusco Date: Mon, 3 Jun 2024 17:22:07 +0200 Subject: [PATCH] Replace read/write lock in JarResource to avoid virtual threads pinning --- .../bootstrap/runner/JarFileReference.java | 178 ++++++++++++++++++ .../quarkus/bootstrap/runner/JarResource.java | 141 +++++--------- .../bootstrap/runner/RunnerClassLoader.java | 92 +++++---- .../runner/VirtualThreadSupport.java | 52 +++++ 4 files changed, 335 insertions(+), 128 deletions(-) create mode 100644 independent-projects/bootstrap/runner/src/main/java/io/quarkus/bootstrap/runner/JarFileReference.java create mode 100644 independent-projects/bootstrap/runner/src/main/java/io/quarkus/bootstrap/runner/VirtualThreadSupport.java diff --git a/independent-projects/bootstrap/runner/src/main/java/io/quarkus/bootstrap/runner/JarFileReference.java b/independent-projects/bootstrap/runner/src/main/java/io/quarkus/bootstrap/runner/JarFileReference.java new file mode 100644 index 00000000000000..cd066b5f4c6cdb --- /dev/null +++ b/independent-projects/bootstrap/runner/src/main/java/io/quarkus/bootstrap/runner/JarFileReference.java @@ -0,0 +1,178 @@ +package io.quarkus.bootstrap.runner; + +import static io.quarkus.bootstrap.runner.VirtualThreadSupport.isVirtualThread; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.jar.JarFile; + +import io.smallrye.common.io.jar.JarFiles; + +public class JarFileReference { + // Guarded by an atomic reader counter that emulate the behaviour of a read/write lock. + // To enable virtual threads compatibility and avoid pinning it is not possible to use an explicit read/write lock + // because the jarFile access may happen inside a native call (for example triggered by the RunnerClassLoader) + // and then it is necessary to avoid blocking on it. + private final JarFile jarFile; + + // The referenceCounter - 1 represents the number of effective readers (#aqcuire - #release), while the first + // reference is used to determine if a close has been required. + private final AtomicInteger referenceCounter; + + private JarFileReference(JarFile jarFile, boolean acquired) { + this.jarFile = jarFile; + // The JarFileReference can be created as already acquired and in this case the referenceCounter starts from 2 + this.referenceCounter = new AtomicInteger(acquired ? 2 : 1); + } + + /** + * Increase the readers counter of the jarFile. + * + * @return true if the acquiring succeeded: it's now safe to access and use the inner jarFile. + * false if the jar reference is going to be closed and then no longer usable. + */ + private boolean acquire() { + while (true) { + int count = referenceCounter.get(); + if (count == 0) { + return false; + } + if (referenceCounter.compareAndSet(count, count + 1)) { + return true; + } + } + } + + /** + * Decrease the readers counter of the jarFile. + * If the counter drops to 0 and a release has been requested also closes the jarFile. + * + * @return true if the release also closes the underlying jarFile. + */ + private boolean release(JarResource jarResource) { + while (true) { + int count = referenceCounter.get(); + if (count <= 0) { + throw new IllegalStateException( + "The reference counter cannot be negative, found: " + (referenceCounter.get() - 1)); + } + if (referenceCounter.compareAndSet(count, count - 1)) { + if (count == 1) { + try { + jarFile.close(); + } catch (IOException e) { + // ignore + } finally { + jarResource.jarFileReference.set(null); + } + return true; + } + return false; + } + } + } + + /** + * Ask to close this reference. + * If there are no readers currently accessing the jarFile also close it, otherwise defer the closing when the last reader + * will leave. + */ + void close(JarResource jarResource) { + release(jarResource); + } + + @FunctionalInterface + interface JarFileConsumer { + T apply(JarFile jarFile, Path jarPath, String resource); + } + + static T withJarFile(JarResource jarResource, String resource, JarFileConsumer fileConsumer) { + + // Happy path: the jar reference already exists and it's ready to be used + final var localJarFileRefFuture = jarResource.jarFileReference.get(); + if (localJarFileRefFuture != null && localJarFileRefFuture.isDone()) { + JarFileReference jarFileReference = localJarFileRefFuture.join(); + if (jarFileReference.acquire()) { + return consumeSharedJarFile(jarFileReference, jarResource, resource, fileConsumer); + } + } + + // There's no valid jar reference, so load a new one + + // Platform threads can load the jarfile asynchronously and eventually blocking till not ready + // to avoid loading the same jarfile multiple times in parallel + if (!isVirtualThread()) { + // It's ok to eventually block on a join() here since we're sure this is used only by platform thread + return consumeSharedJarFile(asyncLoadAcquiredJarFile(jarResource).join(), jarResource, resource, fileConsumer); + } + + // Virtual threads needs to load the jarfile synchronously to avoid blocking. This means that eventually + // multiple threads could load the same jarfile in parallel and this duplication has to be reconciled + final var newJarFileRef = syncLoadAcquiredJarFile(jarResource); + if (jarResource.jarFileReference.compareAndSet(localJarFileRefFuture, newJarFileRef) || + jarResource.jarFileReference.compareAndSet(null, newJarFileRef)) { + // The new file reference has been successfully published and can be used by the current and other threads + // The join() cannot be blocking here since the CompletableFuture has been created already completed + return consumeSharedJarFile(newJarFileRef.join(), jarResource, resource, fileConsumer); + } + + // The newly created file reference hasn't been published, so it can be used exclusively by the current virtual thread + return consumeUnsharedJarFile(newJarFileRef, jarResource, resource, fileConsumer); + } + + private static T consumeSharedJarFile(JarFileReference jarFileReference, + JarResource jarResource, String resource, JarFileConsumer fileConsumer) { + try { + return fileConsumer.apply(jarFileReference.jarFile, jarResource.jarPath, resource); + } finally { + jarFileReference.release(jarResource); + } + } + + private static T consumeUnsharedJarFile(CompletableFuture jarFileReferenceFuture, + JarResource jarResource, String resource, JarFileConsumer fileConsumer) { + JarFileReference jarFileReference = jarFileReferenceFuture.join(); + try { + return fileConsumer.apply(jarFileReference.jarFile, jarResource.jarPath, resource); + } finally { + boolean closed = jarFileReference.release(jarResource); + assert !closed; + // Check one last time if the file reference can be published and reused by other threads, otherwise close it + if (!jarResource.jarFileReference.compareAndSet(null, jarFileReferenceFuture)) { + closed = jarFileReference.release(jarResource); + assert closed; + } + } + } + + private static CompletableFuture syncLoadAcquiredJarFile(JarResource jarResource) { + try { + return CompletableFuture.completedFuture(new JarFileReference(JarFiles.create(jarResource.jarPath.toFile()), true)); + } catch (IOException e) { + throw new RuntimeException("Failed to open " + jarResource.jarPath, e); + } + } + + private static CompletableFuture asyncLoadAcquiredJarFile(JarResource jarResource) { + CompletableFuture newJarFileRef; + do { + final var jarFileRefFuture = getJarFileReferenceFuture(jarResource); + newJarFileRef = jarFileRefFuture == null ? jarResource.jarFileReference.get() : jarFileRefFuture; + } while (!newJarFileRef.join().acquire()); + return newJarFileRef; + } + + private static CompletableFuture getJarFileReferenceFuture(JarResource jarResource) { + CompletableFuture future = new CompletableFuture(); + if (jarResource.jarFileReference.compareAndSet(null, future)) { + try { + future.complete(new JarFileReference(JarFiles.create(jarResource.jarPath.toFile()), false)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return jarResource.jarFileReference.get(); + } +} diff --git a/independent-projects/bootstrap/runner/src/main/java/io/quarkus/bootstrap/runner/JarResource.java b/independent-projects/bootstrap/runner/src/main/java/io/quarkus/bootstrap/runner/JarResource.java index 8b4da096a891d1..60f337e2f37551 100644 --- a/independent-projects/bootstrap/runner/src/main/java/io/quarkus/bootstrap/runner/JarResource.java +++ b/independent-projects/bootstrap/runner/src/main/java/io/quarkus/bootstrap/runner/JarResource.java @@ -11,44 +11,28 @@ import java.security.ProtectionDomain; import java.security.cert.Certificate; import java.util.Objects; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; import java.util.jar.JarEntry; import java.util.jar.JarFile; import java.util.zip.ZipEntry; -import java.util.zip.ZipFile; import io.smallrye.common.io.jar.JarEntries; -import io.smallrye.common.io.jar.JarFiles; /** * A jar resource */ public class JarResource implements ClassLoadingResource { - private final ManifestInfo manifestInfo; - private final Path jarPath; - - private final Lock readLock; - private final Lock writeLock; - private volatile ProtectionDomain protectionDomain; + private final ManifestInfo manifestInfo; - //Guarded by the read/write lock; open/close operations on the JarFile require the exclusive lock, - //while using an existing open reference can use the shared lock. - //If a lock is acquired, and as long as it's owned, we ensure that the zipFile reference - //points to an open JarFile instance, and read operations are valid. - //To close the jar, the exclusive lock must be owned, and reference will be set to null before releasing it. - //Likewise, opening a JarFile requires the exclusive lock. - private volatile JarFile zipFile; + final Path jarPath; + final AtomicReference> jarFileReference = new AtomicReference<>(); public JarResource(ManifestInfo manifestInfo, Path jarPath) { this.manifestInfo = manifestInfo; this.jarPath = jarPath; - final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - this.readLock = readWriteLock.readLock(); - this.writeLock = readWriteLock.writeLock(); } @Override @@ -69,38 +53,48 @@ public void init() { @Override public byte[] getResourceData(String resource) { - final ZipFile zipFile = readLockAcquireAndGetJarReference(); - try { - ZipEntry entry = zipFile.getEntry(resource); + return JarFileReference.withJarFile(this, resource, JarResourceDataProvider.INSTANCE); + } + + private static class JarResourceDataProvider implements JarFileReference.JarFileConsumer { + private static final JarResourceDataProvider INSTANCE = new JarResourceDataProvider(); + + @Override + public byte[] apply(JarFile jarFile, Path path, String res) { + ZipEntry entry = jarFile.getEntry(res); if (entry == null) { return null; } - try (InputStream is = zipFile.getInputStream(entry)) { + try (InputStream is = jarFile.getInputStream(entry)) { byte[] data = new byte[(int) entry.getSize()]; int pos = 0; int rem = data.length; while (rem > 0) { int read = is.read(data, pos, rem); if (read == -1) { - throw new RuntimeException("Failed to read all data for " + resource); + throw new RuntimeException("Failed to read all data for " + res); } pos += read; rem -= read; } return data; } catch (IOException e) { - throw new RuntimeException("Failed to read zip entry " + resource, e); + throw new RuntimeException("Failed to read zip entry " + res, e); } - } finally { - readLock.unlock(); } } @Override public URL getResourceURL(String resource) { - final JarFile jarFile = readLockAcquireAndGetJarReference(); - try { - JarEntry entry = jarFile.getJarEntry(resource); + return JarFileReference.withJarFile(this, resource, JarResourceURLProvider.INSTANCE); + } + + private static class JarResourceURLProvider implements JarFileReference.JarFileConsumer { + private static final JarResourceURLProvider INSTANCE = new JarResourceURLProvider(); + + @Override + public URL apply(JarFile jarFile, Path path, String res) { + JarEntry entry = jarFile.getJarEntry(res); if (entry == null) { return null; } @@ -110,15 +104,7 @@ public URL getResourceURL(String resource) { if (realName.endsWith("/")) { realName = realName.substring(0, realName.length() - 1); } - final URI jarUri = jarPath.toUri(); - // first create a URI which includes both the jar file path and the relative resource name - // and then invoke a toURL on it. The URI reconstruction allows for any encoding to be done - // for the "path" which includes the "realName" - var ssp = new StringBuilder(jarUri.getPath().length() + realName.length() + 2); - ssp.append(jarUri.getPath()); - ssp.append("!/"); - ssp.append(realName); - final URL resUrl = new URI(jarUri.getScheme(), ssp.toString(), null).toURL(); + final URL resUrl = getUrl(path, realName); // wrap it up into a "jar" protocol URL //horrible hack to deal with '?' characters in the URL //seems to be the only way, the URI constructor just does not let you handle them in a sane way @@ -136,8 +122,18 @@ public URL getResourceURL(String resource) { } catch (MalformedURLException | URISyntaxException e) { throw new RuntimeException(e); } - } finally { - readLock.unlock(); + } + + private static URL getUrl(Path jarPath, String realName) throws MalformedURLException, URISyntaxException { + final URI jarUri = jarPath.toUri(); + // first create a URI which includes both the jar file path and the relative resource name + // and then invoke a toURL on it. The URI reconstruction allows for any encoding to be done + // for the "path" which includes the "realName" + var ssp = new StringBuilder(jarUri.getPath().length() + realName.length() + 2); + ssp.append(jarUri.getPath()); + ssp.append("!/"); + ssp.append(realName); + return new URI(jarUri.getScheme(), ssp.toString(), null).toURL(); } } @@ -151,60 +147,15 @@ public ProtectionDomain getProtectionDomain() { return protectionDomain; } - private JarFile readLockAcquireAndGetJarReference() { - while (true) { - readLock.lock(); - final JarFile zipFileLocal = this.zipFile; - if (zipFileLocal != null) { - //Expected fast path: returns a reference to the open JarFile while owning the readLock - return zipFileLocal; - } else { - //This Lock implementation doesn't allow upgrading a readLock to a writeLock, so release it - //as we're going to need the WriteLock. - readLock.unlock(); - //trigger the JarFile being (re)opened. - ensureJarFileIsOpen(); - //Now since we no longer own any lock, we need to try again to obtain the readLock - //and check for the reference still being valid. - //This exposes us to a race with closing the just-opened JarFile; - //however this should be extremely rare, so we can trust we won't loop much; - //A counter doesn't seem necessary, as in fact we know that methods close() - //and resetInternalCaches() are invoked each at most once, which limits the amount - //of loops here in practice. - } - } - } - - private void ensureJarFileIsOpen() { - writeLock.lock(); - try { - if (this.zipFile == null) { - try { - this.zipFile = JarFiles.create(jarPath.toFile()); - } catch (IOException e) { - throw new RuntimeException("Failed to open " + jarPath, e); - } - } - } finally { - writeLock.unlock(); - } - } - @Override public void close() { - writeLock.lock(); - try { - final JarFile zipFileLocal = this.zipFile; - if (zipFileLocal != null) { - try { - this.zipFile = null; - zipFileLocal.close(); - } catch (Throwable e) { - //ignore - } - } - } finally { - writeLock.unlock(); + var futureRef = jarFileReference.get(); + if (futureRef != null) { + // The jarfile has been already used and it's going to be removed from the cache, + // so the future must be already completed + var ref = futureRef.getNow(null); + assert (ref != null); + ref.close(this); } } diff --git a/independent-projects/bootstrap/runner/src/main/java/io/quarkus/bootstrap/runner/RunnerClassLoader.java b/independent-projects/bootstrap/runner/src/main/java/io/quarkus/bootstrap/runner/RunnerClassLoader.java index 7917d17b851f0d..ae5f3d6ba3e4ce 100644 --- a/independent-projects/bootstrap/runner/src/main/java/io/quarkus/bootstrap/runner/RunnerClassLoader.java +++ b/independent-projects/bootstrap/runner/src/main/java/io/quarkus/bootstrap/runner/RunnerClassLoader.java @@ -28,6 +28,10 @@ */ public final class RunnerClassLoader extends ClassLoader { + static { + registerAsParallelCapable(); + } + /** * A map of resources by dir name. Root dir/default package is represented by the empty string */ @@ -103,18 +107,55 @@ public Class loadClass(String name, boolean resolve) throws ClassNotFoundExce continue; } definePackage(packageName, resources); - try { - return defineClass(name, data, 0, data.length, resource.getProtectionDomain()); - } catch (LinkageError e) { - loaded = findLoadedClass(name); - if (loaded != null) { - return loaded; + return defineClass(name, data, resource); + } + } + return getParent().loadClass(name); + } + + private void definePackage(String pkgName, ClassLoadingResource[] resources) { + if ((pkgName != null) && getDefinedPackage(pkgName) == null) { + for (ClassLoadingResource classPathElement : resources) { + ManifestInfo mf = classPathElement.getManifestInfo(); + if (mf != null) { + try { + definePackage(pkgName, mf.getSpecTitle(), + mf.getSpecVersion(), + mf.getSpecVendor(), + mf.getImplTitle(), + mf.getImplVersion(), + mf.getImplVendor(), null); + } catch (IllegalArgumentException e) { + var loaded = getDefinedPackage(pkgName); + if (loaded == null) { + throw e; + } } + return; + } + } + try { + definePackage(pkgName, null, null, null, null, null, null, null); + } catch (IllegalArgumentException e) { + var loaded = getDefinedPackage(pkgName); + if (loaded == null) { throw e; } } } - return getParent().loadClass(name); + } + + private Class defineClass(String name, byte[] data, ClassLoadingResource resource) { + Class loaded; + try { + return defineClass(name, data, 0, data.length, resource.getProtectionDomain()); + } catch (LinkageError e) { + loaded = findLoadedClass(name); + if (loaded != null) { + return loaded; + } + throw e; + } } private void accessingResource(final ClassLoadingResource resource) { @@ -131,21 +172,28 @@ private void accessingResource(final ClassLoadingResource resource) { //it's already on the head of the cache: nothing to be done. return; } + for (int i = 1; i < currentlyBufferedResources.length; i++) { final ClassLoadingResource currentI = currentlyBufferedResources[i]; if (currentI == resource || currentI == null) { //it was already cached, or we found an empty slot: bubble it up by one position to give it a boost - final ClassLoadingResource previous = currentlyBufferedResources[i - 1]; - currentlyBufferedResources[i - 1] = resource; - currentlyBufferedResources[i] = previous; + for (int j = i; j > 0; j--) { + currentlyBufferedResources[j] = currentlyBufferedResources[j - 1]; + } + currentlyBufferedResources[0] = resource; return; } } + // else, we drop one element from the cache, // and inserting the latest resource on the tail: toEvict = currentlyBufferedResources[currentlyBufferedResources.length - 1]; - currentlyBufferedResources[currentlyBufferedResources.length - 1] = resource; + for (int j = currentlyBufferedResources.length - 1; j > 0; j--) { + currentlyBufferedResources[j] = currentlyBufferedResources[j - 1]; + } + currentlyBufferedResources[0] = resource; } + // Finally, release the cache for the dropped element: toEvict.resetInternalCaches(); } @@ -221,28 +269,6 @@ protected Enumeration findResources(String name) { return Collections.enumeration(urls); } - private void definePackage(String pkgName, ClassLoadingResource[] resources) { - if ((pkgName != null) && getPackage(pkgName) == null) { - synchronized (getClassLoadingLock(pkgName)) { - if (getPackage(pkgName) == null) { - for (ClassLoadingResource classPathElement : resources) { - ManifestInfo mf = classPathElement.getManifestInfo(); - if (mf != null) { - definePackage(pkgName, mf.getSpecTitle(), - mf.getSpecVersion(), - mf.getSpecVendor(), - mf.getImplTitle(), - mf.getImplVersion(), - mf.getImplVendor(), null); - return; - } - } - definePackage(pkgName, null, null, null, null, null, null, null); - } - } - } - } - private String getPackageNameFromClassName(String className) { final int index = className.lastIndexOf('.'); if (index == -1) { diff --git a/independent-projects/bootstrap/runner/src/main/java/io/quarkus/bootstrap/runner/VirtualThreadSupport.java b/independent-projects/bootstrap/runner/src/main/java/io/quarkus/bootstrap/runner/VirtualThreadSupport.java new file mode 100644 index 00000000000000..5d6f03a51a3ab2 --- /dev/null +++ b/independent-projects/bootstrap/runner/src/main/java/io/quarkus/bootstrap/runner/VirtualThreadSupport.java @@ -0,0 +1,52 @@ +package io.quarkus.bootstrap.runner; + +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; + +public class VirtualThreadSupport { + + private static final int MAJOR_JAVA_VERSION = majorVersionFromJavaSpecificationVersion(); + + private static final MethodHandle virtualMh = MAJOR_JAVA_VERSION >= 21 ? findVirtualMH() : null; + + private static MethodHandle findVirtualMH() { + try { + return MethodHandles.publicLookup().findVirtual(Thread.class, "isVirtual", + MethodType.methodType(boolean.class)); + } catch (Exception e) { + return null; + } + } + + static boolean isVirtualThread() { + if (virtualMh == null) { + return false; + } + try { + return (boolean) virtualMh.invokeExact(Thread.currentThread()); + } catch (Throwable t) { + return false; + } + } + + static int majorVersionFromJavaSpecificationVersion() { + return majorVersion(System.getProperty("java.specification.version", "17")); + } + + static int majorVersion(String javaSpecVersion) { + String[] components = javaSpecVersion.split("\\."); + int[] version = new int[components.length]; + + for (int i = 0; i < components.length; ++i) { + version[i] = Integer.parseInt(components[i]); + } + + if (version[0] == 1) { + assert version[1] >= 6; + return version[1]; + } else { + return version[0]; + } + } +}