diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index 3dc771e6ccf..d2e738366f8 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -38,10 +38,9 @@ import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_SUBMIT_TIME_KEY, KYUUBI_ENGINE_URL}
 import org.apache.kyuubi.engine.ShareLevel
-import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.engineId
 import org.apache.kyuubi.engine.spark.SparkSQLEngine.{countDownLatch, currentEngine}
 import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore, SparkEventHandlerRegister}
-import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
+import org.apache.kyuubi.engine.spark.session.{SparkSessionImpl, SparkSQLSessionManager}
 import org.apache.kyuubi.events.EventBus
 import org.apache.kyuubi.ha.HighAvailabilityConf._
 import org.apache.kyuubi.ha.client.RetryPolicies
@@ -60,7 +59,8 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
 
   @volatile private var lifetimeTerminatingChecker: Option[ScheduledExecutorService] = None
   @volatile private var stopEngineExec: Option[ThreadPoolExecutor] = None
-  @volatile private var engineSavePath: Option[String] = None
+  private lazy val engineSavePath =
+    backendService.sessionManager.asInstanceOf[SparkSQLSessionManager].getEngineResultSavePath()
 
   override def initialize(conf: KyuubiConf): Unit = {
     val listener = new SparkSQLEngineListener(this)
@@ -92,9 +92,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
     }
 
     if (backendService.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE)) {
-      val savePath = backendService.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)
-      engineSavePath = Some(s"$savePath/$engineId")
-      val path = new Path(engineSavePath.get)
+      val path = new Path(engineSavePath)
       val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
       fs.mkdirs(path)
       fs.deleteOnExit(path)
@@ -114,9 +112,14 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
         exec,
         Duration(60, TimeUnit.SECONDS))
     })
 
-    engineSavePath.foreach { p =>
-      val path = new Path(p)
-      path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
+    try {
+      val path = new Path(engineSavePath)
+      val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
+      if (fs.exists(path)) {
+        fs.delete(path, true)
+      }
+    } catch {
+      case e: Throwable => error(s"Error cleaning engine result save path: $engineSavePath", e)
     }
   }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 8b47e2075a0..bf68f18f064 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -28,9 +28,9 @@ import org.apache.spark.sql.kyuubi.SparkDatasetHelper._
 import org.apache.spark.sql.types._
 
 import org.apache.kyuubi.{KyuubiSQLException, Logging}
-import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_DIR, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
+import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
-import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
+import org.apache.kyuubi.engine.spark.session.{SparkSessionImpl, SparkSQLSessionManager}
 import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchIterator, IterableFetchIterator, OperationHandle, OperationState}
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
@@ -172,14 +172,16 @@ class ExecuteStatement(
       })
     } else {
       val resultSaveEnabled = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE, spark)
-      lazy val resultSaveThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark)
+      val resultSaveThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark)
       if (hasResultSet && resultSaveEnabled && shouldSaveResultToFs(
           resultMaxRows,
           resultSaveThreshold,
           result)) {
-        val sessionId = session.handle.identifier.toString
-        val savePath = session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)
-        saveFileName = Some(s"$savePath/$engineId/$sessionId/$statementId")
+        saveFileName =
+          Some(
+            session.sessionManager.asInstanceOf[SparkSQLSessionManager].getOperationResultSavePath(
+              session.handle,
+              handle))
 
         // Rename all col name to avoid duplicate columns
         val colName = range(0, result.schema.size).map(x => "col" + x)
@@ -192,7 +194,7 @@ class ExecuteStatement(
           result.toDF(colName: _*).write
             .option("compression", codec).format("orc").save(saveFileName.get)
         }
-        info(s"Save result to $saveFileName")
+        info(s"Save result to ${saveFileName.get}")
         fetchOrcStatement = Some(new FetchOrcStatement(spark))
         return fetchOrcStatement.get.getIterator(saveFileName.get, resultSchema)
       }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
index aab2d51068f..3d44fc8b608 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
@@ -24,13 +24,15 @@ import org.apache.spark.api.python.KyuubiPythonGatewayServer
 import org.apache.spark.sql.SparkSession
 
 import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
 import org.apache.kyuubi.engine.ShareLevel
 import org.apache.kyuubi.engine.ShareLevel._
 import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, SparkSQLEngine}
-import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.engineId
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.{engineId, getSessionConf}
 import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
+import org.apache.kyuubi.operation.OperationHandle
 import org.apache.kyuubi.session._
 import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion
 import org.apache.kyuubi.util.ThreadUtils
@@ -181,17 +183,25 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
     } catch {
       case e: KyuubiSQLException =>
         warn(s"Error closing session ${sessionHandle}", e)
+    } finally {
+      if (getSessionConf(KyuubiConf.OPERATION_RESULT_SAVE_TO_FILE, spark)) {
+        val sessionSavePath = getSessionResultSavePath(sessionHandle)
+        try {
+          val path = new Path(sessionSavePath)
+          val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
+          if (fs.exists(path)) {
+            fs.delete(path, true)
+            info(s"Deleted session result path: $sessionSavePath")
+          }
+        } catch {
+          case e: Throwable => error(s"Error cleaning session result path: $sessionSavePath", e)
+        }
+      }
     }
     if (shareLevel == ShareLevel.CONNECTION) {
       info("Session stopped due to shared level is Connection.")
       stopSession()
     }
-    if (conf.get(OPERATION_RESULT_SAVE_TO_FILE)) {
-      val path = new Path(s"${conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)}/" +
-        s"$engineId/${sessionHandle.identifier}")
-      path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
-      info(s"Delete session result file $path")
-    }
   }
 
   private def stopSession(): Unit = {
@@ -199,4 +209,19 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
   }
 
   override protected def isServer: Boolean = false
+
+  private[spark] def getEngineResultSavePath(): String = {
+    Seq(conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR), engineId).mkString(Path.SEPARATOR)
+  }
+
+  private def getSessionResultSavePath(sessionHandle: SessionHandle): String = {
+    Seq(getEngineResultSavePath(), sessionHandle.identifier.toString).mkString(Path.SEPARATOR)
+  }
+
+  private[spark] def getOperationResultSavePath(
+      sessionHandle: SessionHandle,
+      opHandle: OperationHandle): String = {
+    Seq(getSessionResultSavePath(sessionHandle), opHandle.identifier.toString).mkString(
+      Path.SEPARATOR)
+  }
 }
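
For reviewers skimming the patch: the three helpers added to `SparkSQLSessionManager` build a three-level save-path hierarchy (engine / session / operation), which is what lets cleanup happen at each scope: `ExecuteStatement` writes under the operation path, `closeSession` deletes the session path, and engine `stop` deletes the engine path. Below is a minimal standalone sketch of that layout, not part of the patch; the base directory, engine id, and UUID values are illustrative assumptions, and the separator is hardcoded to `"/"` (the value of Hadoop's `Path.SEPARATOR`) so the sketch has no Hadoop dependency.

```scala
import java.util.UUID

object ResultSavePathLayoutSketch {
  // Hadoop's Path.SEPARATOR is "/"; hardcoded here to keep the sketch dependency-free.
  private val Sep = "/"

  // Mirrors getEngineResultSavePath(): <OPERATION_RESULT_SAVE_TO_FILE_DIR>/<engineId>
  def enginePath(baseDir: String, engineId: String): String =
    Seq(baseDir, engineId).mkString(Sep)

  // Mirrors getSessionResultSavePath(): engine path + session identifier
  def sessionPath(baseDir: String, engineId: String, sessionId: UUID): String =
    Seq(enginePath(baseDir, engineId), sessionId.toString).mkString(Sep)

  // Mirrors getOperationResultSavePath(): session path + operation identifier
  def operationPath(baseDir: String, engineId: String, sessionId: UUID, opId: UUID): String =
    Seq(sessionPath(baseDir, engineId, sessionId), opId.toString).mkString(Sep)

  def main(args: Array[String]): Unit = {
    val baseDir = "/tmp/kyuubi_result" // hypothetical OPERATION_RESULT_SAVE_TO_FILE_DIR value
    val engineId = "spark-application-0001" // hypothetical engine id
    val sessionId = UUID.randomUUID() // stands in for SessionHandle.identifier
    val opId = UUID.randomUUID() // stands in for OperationHandle.identifier

    // e.g. /tmp/kyuubi_result/spark-application-0001/<sessionId>/<opId>
    println(operationPath(baseDir, engineId, sessionId, opId))
  }
}
```

One consequence of this layout worth noting in review: because each scope's directory is a prefix of the next, deleting a session directory recursively removes all of that session's operation results in a single `fs.delete(path, true)` call, and the engine-level delete at shutdown acts as a final safety net for anything a session close missed.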