From 2f364cfdedccd1e00855e2b9f713e318b879431a Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Fri, 3 Nov 2023 14:43:11 -0500 Subject: [PATCH] Use dedicated executor service for class analysis (#340) The common fork join pool might be used by extern libs, so spinning up a dynamically resizing dedicated executor service capped by num CPUs. --- CHANGELOG.md | 4 ++ gradle.properties | 2 +- .../parseq/lambda/ASMBasedTaskDescriptor.java | 56 +++++++++++-------- 3 files changed, 38 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 78561755..d99ffcee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +v5.1.17 +------ +* Use dedicated executor service for lambda analysis + v5.1.16 ------ * Add the support of offloading sendRequest call to an executor diff --git a/gradle.properties b/gradle.properties index 337bf753..fe103ea0 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -version=5.1.16 +version=5.1.17 group=com.linkedin.parseq org.gradle.parallel=true diff --git a/subprojects/parseq-lambda-names/src/main/java/com/linkedin/parseq/lambda/ASMBasedTaskDescriptor.java b/subprojects/parseq-lambda-names/src/main/java/com/linkedin/parseq/lambda/ASMBasedTaskDescriptor.java index ac0f3136..d6dbfc65 100644 --- a/subprojects/parseq-lambda-names/src/main/java/com/linkedin/parseq/lambda/ASMBasedTaskDescriptor.java +++ b/subprojects/parseq-lambda-names/src/main/java/com/linkedin/parseq/lambda/ASMBasedTaskDescriptor.java @@ -13,7 +13,11 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -41,9 +45,15 @@ */ public class ASMBasedTaskDescriptor implements TaskDescriptor { - private static final ConcurrentMap _names = new ConcurrentHashMap<>(); - private static final AtomicReference _latchRef = new AtomicReference(); - private static final AtomicInteger _count = new AtomicInteger(); + private static final ConcurrentMap NAMES = new ConcurrentHashMap<>(); + private static final AtomicReference LATCH_REF = new AtomicReference<>(); + private static final AtomicInteger COUNT = new AtomicInteger(); + // Dynamically allow downsizing of threads, never increase more than CPU due to analysis being CPU intensive + private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(0, + Runtime.getRuntime().availableProcessors(), + 5, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>()); public static class AnalyzerAdvice { @@ -151,7 +161,7 @@ Optional getLambdaClassDescription(String className) { return Optional.empty(); } String name = className.substring(0, slashIndex); - String description = _names.get(name); + String description = NAMES.get(name); // If we have already analyzed the class, we don't need to await // analysis on other lambdas. @@ -159,7 +169,7 @@ Optional getLambdaClassDescription(String className) { return Optional.of(description).filter(s -> !s.isEmpty()); } - CountDownLatch latch = _latchRef.get(); + CountDownLatch latch = LATCH_REF.get(); if (latch != null) { try { // We wait up to one minute - an arbitrary, sufficiently large amount of time. @@ -167,18 +177,18 @@ Optional getLambdaClassDescription(String className) { latch.await(1, TimeUnit.MINUTES); } catch (InterruptedException e) { System.err.println("ERROR: ParSeq Latch timed out suggesting serious issue in ASMBasedTaskDescriptor. " - + "Current number of class being analyzed: " + _count.get()); + + "Current number of class being analyzed: " + COUNT.get()); e.printStackTrace(); Thread.currentThread().interrupt(); } } // Try again - return Optional.ofNullable(_names.get(name)).filter(s -> !s.isEmpty()); + return Optional.ofNullable(NAMES.get(name)).filter(s -> !s.isEmpty()); } private static void add(String lambdaClassName, String description) { - _names.put(lambdaClassName, description); + NAMES.put(lambdaClassName, description); } public static class Analyzer implements ClassFileTransformer { @@ -218,12 +228,22 @@ public void run() { System.out.println("WARNING: Parseq cannot doAnalyze"); t.printStackTrace(); } - if (_count.decrementAndGet() == 0) { - CountDownLatch latch = _latchRef.getAndSet(null); + if (COUNT.decrementAndGet() == 0) { + CountDownLatch latch = LATCH_REF.getAndSet(null); latch.countDown(); } } + + public static void doAnalyze(byte[] byteCode, ClassLoader loader, Exception exception) { + ClassReader reader = new ClassReader(byteCode); + LambdaClassLocator cv = new LambdaClassLocator(Opcodes.ASM7, loader, exception); + reader.accept(cv, 0); + if (cv.isLambdaClass()) { + LambdaClassDescription lambdaClassDescription = cv.getLambdaClassDescription(); + add(lambdaClassDescription.getClassName(), lambdaClassDescription.getDescription()); + } + } } @Override @@ -236,9 +256,9 @@ public byte[] transform(ClassLoader loader, String className, Class classBein } public static void analyze(byte[] byteCode, ClassLoader loader) { - if (_count.getAndIncrement() == 0) { + if (COUNT.getAndIncrement() == 0) { CountDownLatch latch = new CountDownLatch(1); - while (!_latchRef.compareAndSet(null, latch)) { + while (!LATCH_REF.compareAndSet(null, latch)) { /* * Busy spin. If we got here it means that other thread just * decremented _count to 0 and is about to null out _latchRef. @@ -248,17 +268,7 @@ public static void analyze(byte[] byteCode, ClassLoader loader) { } } final Exception e = new Exception(); - ForkJoinPool.commonPool().execute(AnalyzerRunnable.of(byteCode, loader, e)); - } - - public static void doAnalyze(byte[] byteCode, ClassLoader loader, Exception exception) { - ClassReader reader = new ClassReader(byteCode); - LambdaClassLocator cv = new LambdaClassLocator(Opcodes.ASM7, loader, exception); - reader.accept(cv, 0); - if (cv.isLambdaClass()) { - LambdaClassDescription lambdaClassDescription = cv.getLambdaClassDescription(); - add(lambdaClassDescription.getClassName(), lambdaClassDescription.getDescription()); - } + EXECUTOR_SERVICE.submit(AnalyzerRunnable.of(byteCode, loader, e)); } } }