diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index 43d62561e8eba..ada91ed5b439a 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -23,6 +23,7 @@ import java.lang.invoke.SerializedLambda import scala.collection.mutable.{Map, Set, Stack} import scala.language.existentials +import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification} import org.apache.xbean.asm6.{ClassReader, ClassVisitor, MethodVisitor, Type} import org.apache.xbean.asm6.Opcodes._ @@ -35,18 +36,36 @@ import org.apache.spark.internal.Logging private[spark] object ClosureCleaner extends Logging { private val isScala2_11 = scala.util.Properties.versionString.contains("2.11") + val classBytesCache: Cache[String, Array[Byte]] = CacheBuilder.newBuilder + .maximumSize(10000) + .removalListener(new RemovalListener[String, Array[Byte]]() { + override def onRemoval( + notification: RemovalNotification[String, Array[Byte]]): Unit = { + logInfo("Remove class : " + notification.getKey) + } + }) + .build + .asInstanceOf[Cache[String, Array[Byte]]] + // Get an ASM class reader for a given class from the JAR that loaded it private[util] def getClassReader(cls: Class[_]): ClassReader = { // Copy data over, before delegating to ClassReader - else we can run out of open file handles. val className = cls.getName.replaceFirst("^.*\\.", "") + ".class" - val resourceStream = cls.getResourceAsStream(className) - if (resourceStream == null) { - null + val bytes = classBytesCache.getIfPresent(className) + if (bytes != null) { + new ClassReader(new ByteArrayInputStream(bytes.clone())) } else { - val baos = new ByteArrayOutputStream(128) - Utils.copyStream(resourceStream, baos, true) - new ClassReader(new ByteArrayInputStream(baos.toByteArray)) + val resourceStream = cls.getResourceAsStream(className) + if (resourceStream == null) { + null + } else { + val baos = new ByteArrayOutputStream(128) + Utils.copyStream(resourceStream, baos, true) + val array = baos.toByteArray + classBytesCache.put(className, array) + new ClassReader(new ByteArrayInputStream(array.clone())) + } } }