From 0566b51335f1982646670ea873564e62f675e6ba Mon Sep 17 00:00:00 2001
From: zheniantoushipashi
Date: Thu, 17 Oct 2019 19:05:00 +0800
Subject: [PATCH] #57 fix KAP ut Don't delete shuffle data immediately

---
 .../org/apache/spark/ContextCleaner.scala   | 32 -------------------
 .../spark/sql/execution/SQLExecution.scala  |  3 --
 2 files changed, 35 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 64b60f13159e4..f5e747356334a 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -77,8 +77,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   private val periodicGCService: ScheduledExecutorService =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("context-cleaner-periodic-gc")
-
-  var queryShuffle = new ConcurrentHashMap[String, HashSet[Int]]()
 
   /**
    * How often to trigger a garbage collection in this JVM.
    *
@@ -173,17 +171,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
 
   /** Register a ShuffleDependency for cleanup when it is garbage collected. */
   def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]): Unit = {
-    Option(SparkContext.getActive.get.getLocalProperty("spark.sql.execution.id")) match {
-      case Some(executionId) =>
-        if (queryShuffle.containsKey(executionId)) {
-          queryShuffle.get(executionId).add(shuffleDependency.shuffleId)
-        } else {
-          queryShuffle.put(executionId, new HashSet[Int]()+=(shuffleDependency.shuffleId))
-        }
-        logDebug(s"add shuffle id ${shuffleDependency.shuffleId}" +
-          s" for executionId: $executionId")
-      case _ =>
-    }
     registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId))
   }
 
@@ -219,25 +206,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     }
   }
 
-  def cleanupShuffle(executionId: String): Unit = {
-    logInfo(s"Cleaning shuffle for executionId: $executionId")
-    if (queryShuffle.containsKey(executionId)) {
-      queryShuffle.get(executionId).foreach { shuffleId =>
-        logDebug(s"Cleaning shuffleId: $shuffleId for executionId: $executionId")
-        cleanupExecutorPool.submit(new Runnable {
-          override def run(): Unit = {
-            try {
-              doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
-            } catch {
-              case ie: InterruptedException if stopped => // ignore
-              case e: Exception => logError("Error in cleaning shuffle", e)
-            }
-          }
-        })
-      }
-    }
-  }
-
   private def runtCleanTask(ref: CleanupTaskWeakReference) = {
     cleanupExecutorPool.submit(new Runnable {
       override def run(): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 41c821137806b..399b155e17f0b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -94,9 +94,6 @@ object SQLExecution extends Logging{
       executionIdToQueryExecution.remove(executionId)
       sc.setLocalProperty(EXECUTION_ID_KEY, oldExecutionId)
       SparkEnv.get.broadcastManager.cleanBroadCast(executionId.toString)
-      logDebug(s"start clean shuffle data for executionId: $executionId")
-      sc.cleaner.get.cleanupShuffle(executionId.toString)
-      logDebug(s"finish clean shuffle data for executionId: $executionId")
     }
   }
 
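Note on what the patch leaves behind: with the eager per-executionId cleanup removed,
shuffle data is reclaimed only through ContextCleaner's weak-reference path --
registerShuffleForCleanup (kept above) now does nothing but hand the dependency to
registerForCleanup, and the shuffle files are deleted once the ShuffleDependency is
garbage collected. The Scala sketch below is a minimal, self-contained illustration of
that pattern; ToyCleaner, cleanOnce, and the println placeholder are assumed names for
illustration only, not Spark's actual ContextCleaner API.

    import java.lang.ref.{ReferenceQueue, WeakReference}
    import java.util.Collections
    import java.util.concurrent.ConcurrentHashMap

    // Cleanup commands, mirroring the CleanShuffle task the patch keeps.
    sealed trait CleanupTask
    case class CleanShuffle(shuffleId: Int) extends CleanupTask

    // Weak reference that remembers which cleanup task to run once its
    // referent (e.g. a ShuffleDependency) has been garbage collected.
    class CleanupTaskWeakReference(
        val task: CleanupTask,
        referent: AnyRef,
        queue: ReferenceQueue[AnyRef])
      extends WeakReference[AnyRef](referent, queue)

    // Toy stand-in for ContextCleaner: illustrates the pattern, not the real class.
    class ToyCleaner {
      private val referenceQueue = new ReferenceQueue[AnyRef]
      // Keep the weak references themselves strongly reachable until processed.
      private val referenceBuffer =
        Collections.newSetFromMap[CleanupTaskWeakReference](
          new ConcurrentHashMap[CleanupTaskWeakReference, java.lang.Boolean])

      def registerShuffleForCleanup(dep: AnyRef, shuffleId: Int): Unit =
        referenceBuffer.add(
          new CleanupTaskWeakReference(CleanShuffle(shuffleId), dep, referenceQueue))

      // One polling step of the cleaning loop: a reference appears on the queue
      // only after the JVM has collected the dependency, so deleting shuffle
      // files here cannot race with a query that still holds the dependency.
      def cleanOnce(timeoutMs: Long = 100): Unit =
        Option(referenceQueue.remove(timeoutMs))
          .map(_.asInstanceOf[CleanupTaskWeakReference])
          .foreach { ref =>
            referenceBuffer.remove(ref)
            ref.task match {
              case CleanShuffle(id) =>
                println(s"would delete shuffle files for shuffleId=$id")
            }
          }
    }

The trade-off is latency: shuffle files linger until a GC actually collects the
dependency (which is what ContextCleaner's periodicGCService compensates for), but no
consumer still holding the dependency can observe its files disappearing -- presumably
the failure mode behind the immediate deletion that the KAP unit tests hit.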