Skip to content

Commit

Permalink
#57 fix KAP unit test: don't delete shuffle data immediately
Browse files Browse the repository at this point in the history
  • Loading branch information
zheniantoushipashi committed Oct 18, 2019
1 parent cc48029 commit 0566b51
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 35 deletions.
32 changes: 0 additions & 32 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {

// Single daemon thread used to trigger periodic JVM garbage collections so
// that weak references to unused objects get enqueued for cleanup.
private val periodicGCService: ScheduledExecutorService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("context-cleaner-periodic-gc")

// Maps a SQL execution id to the set of shuffle ids registered during that
// execution (populated in registerShuffleForCleanup, read in cleanupShuffle).
// NOTE(review): the map is concurrent but the HashSet values are mutable and
// mutated without synchronization — confirm a single-writer assumption holds.
var queryShuffle = new ConcurrentHashMap[String, HashSet[Int]]()
/**
* How often to trigger a garbage collection in this JVM.
*
Expand Down Expand Up @@ -173,17 +171,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {

/** Register a ShuffleDependency for cleanup when it is garbage collected.
 *
 * Also records the shuffle id under the current SQL execution id (local
 * property "spark.sql.execution.id") in `queryShuffle`, so all shuffles of a
 * query can later be cleaned eagerly via `cleanupShuffle(executionId)`.
 */
def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]): Unit = {
// NOTE(review): SparkContext.getActive.get throws if there is no active
// context — presumably callers guarantee one exists here; confirm.
Option(SparkContext.getActive.get.getLocalProperty("spark.sql.execution.id")) match {
case Some(executionId) =>
// Append the shuffle id to this execution's set, creating it on first use.
// NOTE(review): containsKey-then-put is not atomic; two concurrent
// registrations for a new executionId could lose an id — consider
// computeIfAbsent if concurrent registration is possible.
if (queryShuffle.containsKey(executionId)) {
queryShuffle.get(executionId).add(shuffleDependency.shuffleId)
} else {
queryShuffle.put(executionId, new HashSet[Int]()+=(shuffleDependency.shuffleId))
}
logDebug(s"add shuffle id ${shuffleDependency.shuffleId}" +
s" for executionId: $executionId")
case _ =>
// Not running inside a SQL execution: nothing to record.
}
// Always register for the normal GC-driven cleanup path as well.
registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId))
}

Expand Down Expand Up @@ -219,25 +206,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}
}

/** Eagerly clean every shuffle recorded for the given SQL execution id.
 *
 * Each recorded shuffle id is handed to `cleanupExecutorPool` as its own
 * task; `doCleanupShuffle` honors the `blockOnShuffleCleanupTasks` setting.
 * Does nothing when the execution id was never registered.
 */
def cleanupShuffle(executionId: String): Unit = {
logInfo(s"Cleaning shuffle for executionId: $executionId")
if (queryShuffle.containsKey(executionId)) {
queryShuffle.get(executionId).foreach { shuffleId =>
logDebug(s"Cleaning shuffleId: $shuffleId for executionId: $executionId")
cleanupExecutorPool.submit(new Runnable {
override def run(): Unit = {
try {
doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
} catch {
// Interrupts are expected while the cleaner is shutting down;
// any other failure is logged and the remaining tasks proceed.
case ie: InterruptedException if stopped => // ignore
case e: Exception => logError("Error in cleaning shuffle", e)
}
}
})
}
// NOTE(review): the entry is never removed from queryShuffle here, so the
// map grows for the lifetime of the cleaner — confirm whether removal of
// the executionId key after scheduling was intended.
}
}

private def runtCleanTask(ref: CleanupTaskWeakReference) = {
cleanupExecutorPool.submit(new Runnable {
override def run(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,6 @@ object SQLExecution extends Logging{
executionIdToQueryExecution.remove(executionId)
sc.setLocalProperty(EXECUTION_ID_KEY, oldExecutionId)
SparkEnv.get.broadcastManager.cleanBroadCast(executionId.toString)
logDebug(s"start clean shuffle data for executionId: $executionId")
sc.cleaner.get.cleanupShuffle(executionId.toString)
logDebug(s"finish clean shuffle data for executionId: $executionId")
}
}

Expand Down

0 comments on commit 0566b51

Please sign in to comment.