
Commit

kyuubi-3444
iodone committed Oct 8, 2022
1 parent 612a82e commit f66202d
Showing 8 changed files with 108 additions and 5 deletions.
2 changes: 1 addition & 1 deletion docs/deployment/settings.md
@@ -415,7 +415,7 @@ kyuubi.operation.interrupt.on.cancel|true|When true, all running tasks will be i
kyuubi.operation.language|SQL|Choose a programing language for the following inputs <ul><li>SQL: (Default) Run all following statements as SQL queries.</li> <li>SCALA: Run all following input a scala codes</li></ul>|string|1.5.0
kyuubi.operation.log.dir.root|server_operation_logs|Root directory for query operation log at server-side.|string|1.4.0
kyuubi.operation.plan.only.excludes|ResetCommand,SetCommand,SetNamespaceCommand,UseStatement,SetCatalogAndNamespace|Comma-separated list of query plan names, in the form of simple class names, i.e, for `set abc=xyz`, the value will be `SetCommand`. For those auxiliary plans, such as `switch databases`, `set properties`, or `create temporary view` e.t.c, which are used for setup evaluating environments for analyzing actual queries, we can use this config to exclude them and let them take effect. See also kyuubi.operation.plan.only.mode.|seq|1.5.0
kyuubi.operation.plan.only.mode|none|Configures the statement performed mode, The value can be 'parse', 'analyze', 'optimize', 'optimize_with_stats', 'physical', 'execution', or 'none', when it is 'none', indicate to the statement will be fully executed, otherwise only way without executing the query. different engines currently support different modes, the Spark engine supports all modes, and the Flink engine supports 'parse', 'physical', and 'execution', other engines do not support planOnly currently.|string|1.4.0
kyuubi.operation.plan.only.mode|none|Configures the statement performed mode, The value can be 'parse', 'analyze', 'optimize', 'optimize_with_stats', 'physical', 'execution', 'lineage' or 'none', when it is 'none', indicate to the statement will be fully executed, otherwise only way without executing the query. different engines currently support different modes, the Spark engine supports all modes, and the Flink engine supports 'parse', 'physical', and 'execution', other engines do not support planOnly currently.|string|1.4.0
kyuubi.operation.plan.only.output.style|plain|Configures the planOnly output style, The value can be 'plain' and 'json', default value is 'plain', this configuration supports only the output styles of the Spark engine|string|1.7.0
kyuubi.operation.progress.enabled|false|Whether to enable the operation progress. When true, the operation progress will be returned in `GetOperationStatus`.|boolean|1.6.0
kyuubi.operation.query.timeout|&lt;undefined&gt;|Timeout for query executions at server-side, take affect with client-side timeout(`java.sql.Statement.setQueryTimeout`) together, a running query will be cancelled automatically if timeout. It's off by default, which means only client-side take fully control whether the query should timeout or not. If set, client-side timeout capped at this point. To cancel the queries right away without waiting task to finish, consider enabling kyuubi.operation.interrupt.on.cancel together.|duration|1.2.0
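For context, a minimal client-side sketch of how the new 'lineage' value is used. The connection details are illustrative only: a Kyuubi server on the default 10009 port, the kyuubi-spark-lineage plugin on the engine classpath, and the Hive JDBC driver on the client classpath.

import java.sql.DriverManager

val conn = DriverManager.getConnection("jdbc:hive2://localhost:10009/default", "user", "")
val stmt = conn.createStatement()
// Switch the session into plan-only lineage mode; subsequent queries are analyzed and optimized but not executed.
stmt.execute("SET kyuubi.operation.plan.only.mode=lineage")
val rs = stmt.executeQuery("SELECT a FROM t0")
rs.next()
println(rs.getString(1)) // a JSON string such as {"inputTables":["default.t0"],...}
// Restore normal execution for the rest of the session.
stmt.execute("SET kyuubi.operation.plan.only.mode=none")
conn.close()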
7 changes: 7 additions & 0 deletions externals/kyuubi-spark-sql-engine/pom.xml
@@ -51,6 +51,13 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-spark-lineage_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-core</artifactId>
@@ -17,17 +17,21 @@

package org.apache.kyuubi.engine.spark.operation

import org.apache.spark.sql.Row
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.kyuubi.SparkUtilsHelper
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf.{OPERATION_PLAN_ONLY_EXCLUDES, OPERATION_PLAN_ONLY_OUT_STYLE}
import org.apache.kyuubi.operation.{AnalyzeMode, ArrayFetchIterator, ExecutionMode, IterableFetchIterator, JsonStyle, OptimizeMode, OptimizeWithStatsMode, ParseMode, PhysicalMode, PlainStyle, PlanOnlyMode, PlanOnlyStyle, UnknownMode, UnknownStyle}
import org.apache.kyuubi.operation._
import org.apache.kyuubi.operation.PlanOnlyMode.{notSupportedModeError, unknownModeError}
import org.apache.kyuubi.operation.PlanOnlyStyle.{notSupportedStyleError, unknownStyleError}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.plugin.lineage.helper.LineageParser
import org.apache.kyuubi.session.Session

/**
@@ -112,6 +116,9 @@ class PlanOnlyStatement(
case ExecutionMode =>
val executed = spark.sql(statement).queryExecution.executedPlan
iter = new IterableFetchIterator(Seq(Row(executed.toString())))
case LineageMode =>
val result = parseLineage(spark, plan)
iter = new IterableFetchIterator(Seq(Row(result)))
case UnknownMode => throw unknownModeError(mode)
case _ => throw notSupportedModeError(mode, "Spark SQL")
}
@@ -136,10 +143,33 @@
case ExecutionMode =>
val executed = spark.sql(statement).queryExecution.executedPlan
iter = new IterableFetchIterator(Seq(Row(executed.toJSON)))
case LineageMode =>
val result = parseLineage(spark, plan)
iter = new IterableFetchIterator(Seq(Row(result)))
case UnknownMode => throw unknownModeError(mode)
case _ =>
throw KyuubiSQLException(s"The operation mode $mode" +
" doesn't support in Spark SQL engine.")
}
}

private def parseLineage(spark: SparkSession, plan: LogicalPlan): String = {
val analyzed = spark.sessionState.analyzer.execute(plan)
spark.sessionState.analyzer.checkAnalysis(analyzed)
val optimized = spark.sessionState.optimizer.execute(analyzed)
try {
if (!SparkUtilsHelper.classesArePresent(
"org.apache.kyuubi.plugin.lineage.helper.LineageParser")) {
throw new Exception("'org.apache.kyuubi.plugin.lineage.helper.LineageParser' not found," +
" need to install kyuubi-spark-lineage plugin before using the 'lineage' mode")
}
val lineageParser = new LineageParser { def sparkSession = spark }
val lineage = lineageParser.parse(optimized)
val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
mapper.writeValueAsString(lineage)
} catch {
case e: Throwable =>
throw KyuubiSQLException(s"Extract columns lineage failed: ${e.getMessage}", e)
}
}
}
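To see why parseLineage registers DefaultScalaModule: without it, Jackson cannot serialize Scala case classes and collections into the JSON shape the tests below expect. A standalone sketch with hypothetical stand-in types (the real lineage result type lives in the kyuubi-spark-lineage plugin):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Stand-ins for the plugin's result type, for illustration only.
case class ColumnLineage(column: String, originalColumns: Seq[String])
case class Lineage(inputTables: Seq[String], outputTables: Seq[String], columnLineage: Seq[ColumnLineage])

val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
val example = Lineage(Seq("default.t0"), Nil, Seq(ColumnLineage("a", Seq("default.t0.a"))))
// Produces JSON equivalent to {"inputTables":["default.t0"],"outputTables":[],"columnLineage":[...]}
println(mapper.writeValueAsString(example))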
@@ -43,4 +43,13 @@ object SparkUtilsHelper extends Logging {
def getLocalDir(conf: SparkConf): String = {
Utils.getLocalDir(conf)
}

def classesArePresent(className: String): Boolean = {
try {
Utils.classForName(className)
true
} catch {
case _: ClassNotFoundException | _: NoClassDefFoundError => false
}
}
}
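The helper above is what lets the engine treat the lineage plugin as optional: it turns a missing class into a Boolean instead of letting a ClassNotFoundException or NoClassDefFoundError escape to the caller. A quick illustration (class names are only examples):

SparkUtilsHelper.classesArePresent("org.apache.spark.sql.SparkSession") // true on any Spark classpath
SparkUtilsHelper.classesArePresent("com.example.DoesNotExist")          // false, no exception is thrown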
@@ -1648,7 +1648,7 @@ object KyuubiConf {
val OPERATION_PLAN_ONLY_MODE: ConfigEntry[String] =
buildConf("kyuubi.operation.plan.only.mode")
.doc("Configures the statement performed mode, The value can be 'parse', 'analyze', " +
"'optimize', 'optimize_with_stats', 'physical', 'execution', or 'none', " +
"'optimize', 'optimize_with_stats', 'physical', 'execution', 'lineage' or 'none', " +
"when it is 'none', indicate to the statement will be fully executed, otherwise " +
"only way without executing the query. different engines currently support different " +
"modes, the Spark engine supports all modes, and the Flink engine supports 'parse', " +
@@ -1665,10 +1665,11 @@
"OPTIMIZE_WITH_STATS",
"PHYSICAL",
"EXECUTION",
"LINEAGE",
"NONE").contains(mode),
"Invalid value for 'kyuubi.operation.plan.only.mode'. Valid values are" +
"'parse', 'analyze', 'optimize', 'optimize_with_stats', 'physical', 'execution' and " +
"'none'.")
"'lineage', 'none'.")
.createWithDefault(NoneMode.name)

val OPERATION_PLAN_ONLY_OUT_STYLE: ConfigEntry[String] =
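A small sketch of exercising the updated OPERATION_PLAN_ONLY_MODE entry programmatically, assuming KyuubiConf's get/set helpers used throughout the code base (the call site is illustrative, not part of this change):

val conf = KyuubiConf(false)
conf.set(KyuubiConf.OPERATION_PLAN_ONLY_MODE, LineageMode.name) // "lineage" is now an accepted value
conf.get(KyuubiConf.OPERATION_PLAN_ONLY_MODE)                   // read back; resolved to the lineage plan-only mode at operation time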
@@ -41,6 +41,8 @@ case object PhysicalMode extends PlanOnlyMode { val name = "physical" }

case object ExecutionMode extends PlanOnlyMode { val name = "execution" }

case object LineageMode extends PlanOnlyMode { val name = "lineage" }

case object NoneMode extends PlanOnlyMode { val name = "none" }

case object UnknownMode extends PlanOnlyMode {
@@ -64,6 +66,7 @@ object PlanOnlyMode {
case OptimizeWithStatsMode.name => OptimizeWithStatsMode
case PhysicalMode.name => PhysicalMode
case ExecutionMode.name => ExecutionMode
case LineageMode.name => LineageMode
case NoneMode.name => NoneMode
case other => UnknownMode.mode(other)
}
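As a small illustration of how code can branch on the parsed mode once 'lineage' resolves to LineageMode, a sketch using only the case objects defined above (the describe helper itself is hypothetical):

def describe(mode: PlanOnlyMode): String = mode match {
  case LineageMode   => "return column-level lineage as JSON without executing the query"
  case ExecutionMode => "return the final executed plan"
  case NoneMode      => "execute the statement normally"
  case other         => s"plan-only mode '${other.name}'"
}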
@@ -499,6 +499,31 @@ trait SparkQueryTests extends HiveJDBCTestHelper {
}
}

test("KYUUBI #3444: Plan only operation for lineage") {
val ddl = "create table if not exists t0(a int) using parquet"
val dql = "select * from t0"
withJdbcStatement("t0") { statement =>
statement.execute(ddl)
statement.execute("SET kyuubi.operation.plan.only.mode=lineage")
val lineageParserClassName = "org.apache.kyuubi.plugin.lineage.helper.LineageParser"
try {
val resultSet = statement.executeQuery(dql)
assert(resultSet.next())
val actualResult =
"""
|{"inputTables":["default.t0"],"outputTables":[],
|"columnLineage":[{"column":"a","originalColumns":["default.t0.a"]}]}
|""".stripMargin.split("\n").mkString("")
assert(resultSet.getString(1) == actualResult)
} catch {
case e: Throwable =>
assert(e.getMessage.contains(s"'$lineageParserClassName' not found"))
} finally {
statement.executeQuery("SET kyuubi.operation.plan.only.mode=none")
}
}
}

test("execute simple scala code") {
withJdbcStatement() { statement =>
statement.execute("SET kyuubi.operation.language=scala")
@@ -201,6 +201,34 @@ class PlanOnlyOperationSuite extends WithKyuubiServer with HiveJDBCTestHelper {
}
}

test("kyuubi #3444: Plan only mode with lineage mode") {

val ddl = "create table tt(a int) using parquet"
val dql = "select * from tt"
withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name))() {
withJdbcStatement("tt") { statement =>
statement.execute(ddl)
statement.execute("SET kyuubi.operation.plan.only.mode=lineage")
val lineageParserClassName = "org.apache.kyuubi.plugin.lineage.helper.LineageParser"
try {
val resultSet = statement.executeQuery(dql)
assert(resultSet.next())
val actualResult =
"""
|{"inputTables":["default.t0"],"outputTables":[],
|"columnLineage":[{"column":"a","originalColumns":["default.t0.a"]}]}
|""".stripMargin.split("\n").mkString("")
assert(resultSet.getString(1) == actualResult)
} catch {
case e: Throwable =>
assert(e.getMessage.contains(s"'$lineageParserClassName' not found"))
} finally {
statement.executeQuery("SET kyuubi.operation.plan.only.mode=none")
}
}
}
}

private def getOperationPlanWithStatement(statement: Statement): String = {
val resultSet = statement.executeQuery("select 1 where true")
assert(resultSet.next())
