Skip to content

Commit

Permalink
[KYUUBI #5377][FOLLOWUP] Always try to cleanup session result path
Browse files Browse the repository at this point in the history
  • Loading branch information
turboFei committed Jan 17, 2024
1 parent e9e2d18 commit 895caec
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,9 @@ import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_SUBMIT_TIME_KEY, KYUUBI_ENGINE_URL}
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.engineId
import org.apache.kyuubi.engine.spark.SparkSQLEngine.{countDownLatch, currentEngine}
import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore, SparkEventHandlerRegister}
import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
import org.apache.kyuubi.engine.spark.session.{SparkSessionImpl, SparkSQLSessionManager}
import org.apache.kyuubi.events.EventBus
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.RetryPolicies
Expand All @@ -60,7 +59,8 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin

@volatile private var lifetimeTerminatingChecker: Option[ScheduledExecutorService] = None
@volatile private var stopEngineExec: Option[ThreadPoolExecutor] = None
@volatile private var engineSavePath: Option[String] = None
private lazy val engineSavePath =
backendService.sessionManager.asInstanceOf[SparkSQLSessionManager].getEngineResultSavePath()

override def initialize(conf: KyuubiConf): Unit = {
val listener = new SparkSQLEngineListener(this)
Expand Down Expand Up @@ -92,9 +92,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
}

if (backendService.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE)) {
val savePath = backendService.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)
engineSavePath = Some(s"$savePath/$engineId")
val path = new Path(engineSavePath.get)
val path = new Path(engineSavePath)
val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
fs.mkdirs(path)
fs.deleteOnExit(path)
Expand All @@ -114,9 +112,14 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
exec,
Duration(60, TimeUnit.SECONDS))
})
engineSavePath.foreach { p =>
val path = new Path(p)
path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
try {
val path = new Path(engineSavePath)
val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
if (fs.exists(path)) {
fs.delete(path, true)
}
} catch {
case e: Throwable => error(s"Error cleaning engine result save path: $engineSavePath", e)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import org.apache.spark.sql.kyuubi.SparkDatasetHelper._
import org.apache.spark.sql.types._

import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_DIR, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
import org.apache.kyuubi.engine.spark.session.{SparkSessionImpl, SparkSQLSessionManager}
import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchIterator, IterableFetchIterator, OperationHandle, OperationState}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
Expand Down Expand Up @@ -172,14 +172,16 @@ class ExecuteStatement(
})
} else {
val resultSaveEnabled = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE, spark)
lazy val resultSaveThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark)
val resultSaveThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark)
if (hasResultSet && resultSaveEnabled && shouldSaveResultToFs(
resultMaxRows,
resultSaveThreshold,
result)) {
val sessionId = session.handle.identifier.toString
val savePath = session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)
saveFileName = Some(s"$savePath/$engineId/$sessionId/$statementId")
saveFileName =
Some(
session.sessionManager.asInstanceOf[SparkSQLSessionManager].getOperationResultSavePath(
session.handle,
handle))
// Rename all col name to avoid duplicate columns
val colName = range(0, result.schema.size).map(x => "col" + x)

Expand All @@ -192,7 +194,7 @@ class ExecuteStatement(
result.toDF(colName: _*).write
.option("compression", codec).format("orc").save(saveFileName.get)
}
info(s"Save result to $saveFileName")
info(s"Save result to ${saveFileName.get}")
fetchOrcStatement = Some(new FetchOrcStatement(spark))
return fetchOrcStatement.get.getIterator(saveFileName.get, resultSchema)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ import org.apache.spark.api.python.KyuubiPythonGatewayServer
import org.apache.spark.sql.SparkSession

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.ShareLevel._
import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, SparkSQLEngine}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.engineId
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.{engineId, getSessionConf}
import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
import org.apache.kyuubi.operation.OperationHandle
import org.apache.kyuubi.session._
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.util.ThreadUtils
Expand Down Expand Up @@ -181,22 +183,45 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
} catch {
case e: KyuubiSQLException =>
warn(s"Error closing session ${sessionHandle}", e)
} finally {
if (getSessionConf(KyuubiConf.OPERATION_RESULT_SAVE_TO_FILE, spark)) {
val sessionSavePath = getSessionResultSavePath(sessionHandle)
try {
val path = new Path(sessionSavePath)
val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
if (fs.exists(path)) {
fs.delete(path, true)
info(s"Deleted session result path: $sessionSavePath")
}
} catch {
case e: Throwable => error(s"Error cleaning session result path: $sessionSavePath", e)
}
}
}
if (shareLevel == ShareLevel.CONNECTION) {
info("Session stopped due to shared level is Connection.")
stopSession()
}
if (conf.get(OPERATION_RESULT_SAVE_TO_FILE)) {
val path = new Path(s"${conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)}/" +
s"$engineId/${sessionHandle.identifier}")
path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
info(s"Delete session result file $path")
}
}

private def stopSession(): Unit = {
SparkSQLEngine.currentEngine.foreach(_.stop())
}

override protected def isServer: Boolean = false

private[spark] def getEngineResultSavePath(): String = {
Seq(conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR), engineId).mkString(Path.SEPARATOR)
}

private def getSessionResultSavePath(sessionHandle: SessionHandle): String = {
Seq(getEngineResultSavePath(), sessionHandle.identifier.toString).mkString(Path.SEPARATOR)
}

private[spark] def getOperationResultSavePath(
sessionHandle: SessionHandle,
opHandle: OperationHandle): String = {
Seq(getSessionResultSavePath(sessionHandle), opHandle.identifier.toString).mkString(
Path.SEPARATOR)
}
}

0 comments on commit 895caec

Please sign in to comment.