[KYUUBI #5377] Spark engine query result save to file
### _Why are the changes needed?_

close #5377

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request

### _Was this patch authored or co-authored using generative AI tooling?_

NO

Closes #5591 from lsm1/branch-kyuubi-5377.

Closes #5377

9d1a18c [senmiaoliu] ignore empty file
3c70a1e [LSM] fix doc
73d3c3a [senmiaoliu] fix style and add some comment
80e1f0d [senmiaoliu] Close orc fetchOrcStatement and remove result save file when ExecuteStatement close
42634a1 [senmiaoliu] fix style
979125d [senmiaoliu] fix style
1dc07a5 [senmiaoliu] spark engine save into hdfs file

Lead-authored-by: senmiaoliu <[email protected]>
Co-authored-by: LSM <[email protected]>
Signed-off-by: Fu Chen <[email protected]>
lsm1 authored and cfmcgrady committed Dec 13, 2023
1 parent 1b36ee5 commit 4c029f9
Showing 7 changed files with 271 additions and 1 deletion.
3 changes: 3 additions & 0 deletions docs/configuration/settings.md
@@ -395,6 +395,9 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.operation.result.arrow.timestampAsString | false | When true, arrow-based rowsets will convert columns of type timestamp to strings for transmission. | boolean | 1.7.0 |
| kyuubi.operation.result.format | thrift | Specify the result format, available configs are: <ul> <li>THRIFT: the result will convert to TRow at the engine driver side. </li> <li>ARROW: the result will be encoded as Arrow at the executor side before collecting by the driver, and deserialized at the client side. note that it only takes effect for kyuubi-hive-jdbc clients now.</li></ul> | string | 1.7.0 |
| kyuubi.operation.result.max.rows | 0 | Max rows of Spark query results. Rows exceeding the limit would be ignored. Set this value to 0 to disable the max rows limit. | int | 1.6.0 |
| kyuubi.operation.result.saveToFile.dir | /tmp/kyuubi/tmp_kyuubi_result | The directory where Spark query results are saved. It should be publicly accessible to every engine. Results are saved in ORC format, and the directory structure is `/OPERATION_RESULT_SAVE_TO_FILE_DIR/engineId/sessionId/statementId`. Each query's result files are deleted when the query finishes. | string | 1.9.0 |
| kyuubi.operation.result.saveToFile.enabled | false | The switch for saving Spark query results to file. | boolean | 1.9.0 |
| kyuubi.operation.result.saveToFile.minSize | 209715200 | The minimum estimated result size for saving Spark query results to file, 200 MB by default. Spark's `EstimationUtils#getSizePerRow` is used to estimate the output size of the execution plan. | long | 1.9.0 |
| kyuubi.operation.scheduler.pool | &lt;undefined&gt; | The scheduler pool of job. Note that, this config should be used after changing Spark config spark.scheduler.mode=FAIR. | string | 1.1.1 |
| kyuubi.operation.spark.listener.enabled | true | When set to true, Spark engine registers an SQLOperationListener before executing the statement, logging a few summary statistics when each stage completes. | boolean | 1.6.0 |
| kyuubi.operation.status.polling.timeout | PT5S | Timeout(ms) for long polling asynchronous running sql query's status | duration | 1.0.0 |
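
For reference, a minimal way to enable the feature in `$KYUUBI_HOME/conf/kyuubi-defaults.conf`, using only the keys and defaults documented in the table above (the directory value is just the documented default, not a recommendation):

```properties
# Spill large Spark query results to ORC files instead of collecting them on the driver
kyuubi.operation.result.saveToFile.enabled=true
# Shared directory accessible to every engine; results land under
# <dir>/<engineId>/<sessionId>/<statementId>
kyuubi.operation.result.saveToFile.dir=/tmp/kyuubi/tmp_kyuubi_result
# Only results whose estimated size reaches this many bytes (200 MB) are written to file
kyuubi.operation.result.saveToFile.minSize=209715200
```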
@@ -26,6 +26,7 @@ import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

import com.google.common.annotations.VisibleForTesting
import org.apache.hadoop.fs.Path
import org.apache.spark.{ui, SparkConf}
import org.apache.spark.kyuubi.{SparkContextHelper, SparkSQLEngineEventListener, SparkSQLEngineListener}
import org.apache.spark.kyuubi.SparkUtilsHelper.getLocalDir
@@ -37,6 +38,7 @@ import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_SUBMIT_TIME_KEY, KYUUBI_ENGINE_URL}
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.engineId
import org.apache.kyuubi.engine.spark.SparkSQLEngine.{countDownLatch, currentEngine}
import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore, SparkEventHandlerRegister}
import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
@@ -58,6 +60,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin

@volatile private var lifetimeTerminatingChecker: Option[ScheduledExecutorService] = None
@volatile private var stopEngineExec: Option[ThreadPoolExecutor] = None
@volatile private var engineSavePath: Option[String] = None

override def initialize(conf: KyuubiConf): Unit = {
val listener = new SparkSQLEngineListener(this)
@@ -87,6 +90,15 @@
maxInitTimeout > 0) {
startFastFailChecker(maxInitTimeout)
}

if (backendService.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE)) {
val savePath = backendService.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)
engineSavePath = Some(s"$savePath/$engineId")
val path = new Path(engineSavePath.get)
val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
fs.mkdirs(path)
fs.deleteOnExit(path)
}
}

override def stop(): Unit = if (shutdown.compareAndSet(false, true)) {
@@ -102,6 +114,10 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
exec,
Duration(60, TimeUnit.SECONDS))
})
engineSavePath.foreach { p =>
val path = new Path(p)
path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
}
}

def gracefulStop(): Unit = if (gracefulStopDeregistered.compareAndSet(false, true)) {
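
The engine-level save path created in `initialize` above is the root of a per-query layout; each level is removed by the component that owns it (engine stop, session close, operation close). An illustrative layout using the default directory:

```
/tmp/kyuubi/tmp_kyuubi_result/        # kyuubi.operation.result.saveToFile.dir
└── <engineId>/                       # created in SparkSQLEngine.initialize, deleted in stop()
    └── <sessionId>/                  # deleted when the session is closed
        └── <statementId>/            # ORC result files, deleted when the operation is closed
```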
@@ -19,14 +19,16 @@ package org.apache.kyuubi.engine.spark.operation

import java.util.concurrent.RejectedExecutionException

import scala.Array._
import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.kyuubi.SparkDatasetHelper._
import org.apache.spark.sql.types._

import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.config.KyuubiConf.OPERATION_RESULT_MAX_ROWS
import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_DIR, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchIterator, IterableFetchIterator, OperationHandle, OperationState}
@@ -46,6 +48,8 @@ class ExecuteStatement(
override def getOperationLog: Option[OperationLog] = Option(operationLog)
override protected def supportProgress: Boolean = true

private var fetchOrcStatement: Option[FetchOrcStatement] = None
private var saveFileName: Option[String] = None
override protected def resultSchema: StructType = {
if (result == null || result.schema.isEmpty) {
new StructType().add("Result", "string")
@@ -64,6 +68,15 @@
OperationLog.removeCurrentOperationLog()
}

override def close(): Unit = {
super.close()
fetchOrcStatement.foreach(_.close())
saveFileName.foreach { p =>
val path = new Path(p)
path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
}
}

protected def incrementalCollectResult(resultDF: DataFrame): Iterator[Any] = {
resultDF.toLocalIterator().asScala
}
@@ -158,6 +171,29 @@
override def iterator: Iterator[Any] = incrementalCollectResult(resultDF)
})
} else {
val resultSaveEnabled = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE, spark)
lazy val resultSaveThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark)
if (hasResultSet && resultSaveEnabled && shouldSaveResultToFs(
resultMaxRows,
resultSaveThreshold,
result)) {
val sessionId = session.handle.identifier.toString
val savePath = session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)
saveFileName = Some(s"$savePath/$engineId/$sessionId/$statementId")
// Rename all col name to avoid duplicate columns
val colName = range(0, result.schema.size).map(x => "col" + x)
// df.write will introduce an extra shuffle for the outermost limit, and hurt performance
if (resultMaxRows > 0) {
result.toDF(colName: _*).limit(resultMaxRows).write
.option("compression", "zstd").format("orc").save(saveFileName.get)
} else {
result.toDF(colName: _*).write
.option("compression", "zstd").format("orc").save(saveFileName.get)
}
info(s"Save result to $saveFileName")
fetchOrcStatement = Some(new FetchOrcStatement(spark))
return fetchOrcStatement.get.getIterator(saveFileName.get, resultSchema)
}
val internalArray = if (resultMaxRows <= 0) {
info("Execute in full collect mode")
fullCollectResult(resultDF)
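
The save-to-file branch above is gated by `shouldSaveResultToFs`, which comes from `SparkDatasetHelper` and is not part of this diff. A rough, hypothetical sketch of such a check — assuming it combines Spark's `EstimationUtils#getSizePerRow` (referenced in the settings doc above) with the plan's row-count statistics; the actual implementation may differ:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils

// Hypothetical sketch only: decide whether an execution result is large enough to
// spill to ORC files. The real shouldSaveResultToFs lives in SparkDatasetHelper.
def shouldSaveResultToFs(resultMaxRows: Int, minSize: Long, result: DataFrame): Boolean = {
  val plan = result.queryExecution.optimizedPlan
  // Estimated bytes per row, derived from the plan's output attribute types.
  val sizePerRow = EstimationUtils.getSizePerRow(plan.output)
  // Row count from plan statistics when available; cap it by the max-rows limit if one is set.
  val statsRows = plan.stats.rowCount.map(_.toLong).getOrElse(0L)
  val rows = if (resultMaxRows > 0) math.min(statsRows, resultMaxRows.toLong) else statsRows
  sizePerRow.toLong * rows >= minSize
}
```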
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.spark.operation

import scala.Array._
import scala.collection.mutable.ListBuffer

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{LocatedFileStatus, Path}
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.orc.mapred.OrcStruct
import org.apache.orc.mapreduce.OrcInputFormat
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.datasources.RecordReaderIterator
import org.apache.spark.sql.execution.datasources.orc.OrcDeserializer
import org.apache.spark.sql.types.StructType

import org.apache.kyuubi.KyuubiException
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION
import org.apache.kyuubi.operation.{FetchIterator, IterableFetchIterator}
import org.apache.kyuubi.util.reflect.DynConstructors

class FetchOrcStatement(spark: SparkSession) {

var orcIter: OrcFileIterator = _
def getIterator(path: String, orcSchema: StructType): FetchIterator[Row] = {
val conf = spark.sparkContext.hadoopConfiguration
val savePath = new Path(path)
val fsIterator = savePath.getFileSystem(conf).listFiles(savePath, false)
val list = new ListBuffer[LocatedFileStatus]
while (fsIterator.hasNext) {
val file = fsIterator.next()
if (file.getPath.getName.endsWith(".orc") && file.getLen > 0) {
list += file
}
}
val toRowConverter: InternalRow => Row = {
CatalystTypeConverters.createToScalaConverter(orcSchema)
.asInstanceOf[InternalRow => Row]
}
val colId = range(0, orcSchema.size)
val fullSchema = orcSchema.map(f =>
AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
val deserializer = getOrcDeserializer(orcSchema, colId)
orcIter = new OrcFileIterator(list)
val iterRow = orcIter.map(value =>
unsafeProjection(deserializer.deserialize(value)))
.map(value => toRowConverter(value))
new IterableFetchIterator[Row](iterRow.toIterable)
}

def close(): Unit = {
orcIter.close()
}

private def getOrcDeserializer(orcSchema: StructType, colId: Array[Int]): OrcDeserializer = {
try {
if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") {
// SPARK-34535 changed the constructor signature of OrcDeserializer
DynConstructors.builder()
.impl(classOf[OrcDeserializer], classOf[StructType], classOf[Array[Int]])
.build[OrcDeserializer]()
.newInstance(
orcSchema,
colId)
} else {
DynConstructors.builder()
.impl(
classOf[OrcDeserializer],
classOf[StructType],
classOf[StructType],
classOf[Array[Int]])
.build[OrcDeserializer]()
.newInstance(
new StructType,
orcSchema,
colId)
}
} catch {
case e: Throwable =>
throw new KyuubiException("Failed to create OrcDeserializer", e)
}
}
}

class OrcFileIterator(fileList: ListBuffer[LocatedFileStatus]) extends Iterator[OrcStruct] {

private val iters = fileList.map(x => getOrcFileIterator(x))

var idx = 0

override def hasNext: Boolean = {
val hasNext = iters(idx).hasNext
if (!hasNext) {
iters(idx).close()
idx += 1
// skip empty file
while (idx < iters.size) {
if (iters(idx).hasNext) {
return true
} else {
iters(idx).close()
idx = idx + 1
}
}
}
hasNext
}

override def next(): OrcStruct = {
iters(idx).next()
}

def close(): Unit = {
iters.foreach(_.close())
}

private def getOrcFileIterator(file: LocatedFileStatus): RecordReaderIterator[OrcStruct] = {
val orcRecordReader = {
val split =
new FileSplit(file.getPath, 0, file.getLen, Array.empty[String])
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val hadoopAttemptContext =
new TaskAttemptContextImpl(new Configuration(), attemptId)
val oif = new OrcInputFormat[OrcStruct]
oif.createRecordReader(split, hadoopAttemptContext)
}
new RecordReaderIterator[OrcStruct](orcRecordReader)
}
}
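
To show how the new class is meant to be consumed (mirroring what `ExecuteStatement` does in this patch), here is a hypothetical, self-contained usage sketch; the path and schema are placeholders, not values produced by the engine:

```scala
import org.apache.kyuubi.engine.spark.operation.FetchOrcStatement
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructType}

object FetchOrcStatementExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    // Placeholder path; the engine writes to <dir>/<engineId>/<sessionId>/<statementId>
    val savedPath = "/tmp/kyuubi/tmp_kyuubi_result/engine/session/statement"
    // Column types must match the saved result; names are positional, so any names work
    val schema = new StructType().add("col0", LongType).add("col1", StringType)

    val fetcher = new FetchOrcStatement(spark)
    val rows = fetcher.getIterator(savedPath, schema) // FetchIterator[Row], usable as a plain Iterator
    try {
      rows.foreach(row => println(row))
    } finally {
      fetcher.close() // releases the underlying ORC record readers
      spark.stop()
    }
  }
}
```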
@@ -19,6 +19,7 @@ package org.apache.kyuubi.engine.spark.session

import java.util.concurrent.{ScheduledExecutorService, TimeUnit}

import org.apache.hadoop.fs.Path
import org.apache.spark.api.python.KyuubiPythonGatewayServer
import org.apache.spark.sql.SparkSession

@@ -28,6 +29,7 @@ import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.ShareLevel._
import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, SparkSQLEngine}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.engineId
import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
import org.apache.kyuubi.session._
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion
@@ -184,6 +186,12 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
info("Session stopped due to shared level is Connection.")
stopSession()
}
if (conf.get(OPERATION_RESULT_SAVE_TO_FILE)) {
val path = new Path(s"${conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)}/" +
s"$engineId/${sessionHandle.identifier}")
path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
info(s"Delete session result file $path")
}
}

private def stopSession(): Unit = {