Skip to content

Commit

Permalink
#97 add class bytes cache to org.apache.spark.util.ClosureCleaner#get…
Browse files Browse the repository at this point in the history
…ClassReader
  • Loading branch information
hn5092 committed Jan 13, 2020
1 parent 89ad847 commit e68b34b
Showing 1 changed file with 25 additions and 6 deletions.
31 changes: 25 additions & 6 deletions core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.lang.invoke.SerializedLambda
import scala.collection.mutable.{Map, Set, Stack}
import scala.language.existentials

import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}
import org.apache.xbean.asm6.{ClassReader, ClassVisitor, MethodVisitor, Type}
import org.apache.xbean.asm6.Opcodes._

Expand All @@ -35,18 +36,36 @@ import org.apache.spark.internal.Logging
private[spark] object ClosureCleaner extends Logging {

private val isScala2_11 = scala.util.Properties.versionString.contains("2.11")
val classBytesCache: Cache[String, Array[Byte]] = CacheBuilder.newBuilder
.maximumSize(10000)
.removalListener(new RemovalListener[String, Array[Byte]]() {
override def onRemoval(
notification: RemovalNotification[String, Array[Byte]]): Unit = {
logInfo("Remove class : " + notification.getKey)
}
})
.build
.asInstanceOf[Cache[String, Array[Byte]]]


// Get an ASM class reader for a given class from the JAR that loaded it
private[util] def getClassReader(cls: Class[_]): ClassReader = {
// Copy data over, before delegating to ClassReader - else we can run out of open file handles.
val className = cls.getName.replaceFirst("^.*\\.", "") + ".class"
val resourceStream = cls.getResourceAsStream(className)
if (resourceStream == null) {
null
val bytes = classBytesCache.getIfPresent(className)
if (bytes != null) {
new ClassReader(new ByteArrayInputStream(bytes.clone()))
} else {
val baos = new ByteArrayOutputStream(128)
Utils.copyStream(resourceStream, baos, true)
new ClassReader(new ByteArrayInputStream(baos.toByteArray))
val resourceStream = cls.getResourceAsStream(className)
if (resourceStream == null) {
null
} else {
val baos = new ByteArrayOutputStream(128)
Utils.copyStream(resourceStream, baos, true)
val array = baos.toByteArray
classBytesCache.put(className, array)
new ClassReader(new ByteArrayInputStream(array.clone()))
}
}
}

Expand Down

0 comments on commit e68b34b

Please sign in to comment.