diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index a845b4472e7..d53ad29835f 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -415,7 +415,7 @@ kyuubi.operation.interrupt.on.cancel|true|When true, all running tasks will be i
kyuubi.operation.language|SQL|Choose a programing language for the following inputs
- SQL: (Default) Run all following statements as SQL queries.
- SCALA: Run all following input a scala codes
|string|1.5.0
kyuubi.operation.log.dir.root|server_operation_logs|Root directory for query operation log at server-side.|string|1.4.0
kyuubi.operation.plan.only.excludes|ResetCommand,SetCommand,SetNamespaceCommand,UseStatement,SetCatalogAndNamespace|Comma-separated list of query plan names, in the form of simple class names, i.e, for `set abc=xyz`, the value will be `SetCommand`. For those auxiliary plans, such as `switch databases`, `set properties`, or `create temporary view` e.t.c, which are used for setup evaluating environments for analyzing actual queries, we can use this config to exclude them and let them take effect. See also kyuubi.operation.plan.only.mode.|seq|1.5.0
-kyuubi.operation.plan.only.mode|none|Configures the statement performed mode, The value can be 'parse', 'analyze', 'optimize', 'optimize_with_stats', 'physical', 'execution', or 'none', when it is 'none', indicate to the statement will be fully executed, otherwise only way without executing the query. different engines currently support different modes, the Spark engine supports all modes, and the Flink engine supports 'parse', 'physical', and 'execution', other engines do not support planOnly currently.|string|1.4.0
+kyuubi.operation.plan.only.mode|none|Configures the mode in which statements are performed. The value can be 'parse', 'analyze', 'optimize', 'optimize_with_stats', 'physical', 'execution', 'lineage' or 'none'. When it is 'none', the statement will be fully executed; otherwise, it is only planned without being executed. Different engines currently support different modes: the Spark engine supports all modes, and the Flink engine supports 'parse', 'physical', and 'execution'; other engines do not support planOnly currently.|string|1.4.0
kyuubi.operation.plan.only.output.style|plain|Configures the planOnly output style, The value can be 'plain' and 'json', default value is 'plain', this configuration supports only the output styles of the Spark engine|string|1.7.0
kyuubi.operation.progress.enabled|false|Whether to enable the operation progress. When true, the operation progress will be returned in `GetOperationStatus`.|boolean|1.6.0
kyuubi.operation.query.timeout|<undefined>|Timeout for query executions at server-side, take affect with client-side timeout(`java.sql.Statement.setQueryTimeout`) together, a running query will be cancelled automatically if timeout. It's off by default, which means only client-side take fully control whether the query should timeout or not. If set, client-side timeout capped at this point. To cancel the queries right away without waiting task to finish, consider enabling kyuubi.operation.interrupt.on.cancel together.|duration|1.2.0
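
For reference, a minimal usage sketch of the new 'lineage' mode over JDBC. The object name and JDBC URL are placeholders, the snippet assumes a Hive/Kyuubi JDBC driver on the classpath and a running Kyuubi server, and the expected JSON shape mirrors the tests added later in this patch:

    import java.sql.DriverManager

    object LineagePlanOnlyExample extends App {
      // Placeholder endpoint; point it at your Kyuubi server.
      val conn = DriverManager.getConnection("jdbc:hive2://localhost:10009/default")
      val stmt = conn.createStatement()
      try {
        stmt.execute("create table if not exists t0(a int) using parquet")
        // Switch the current session into lineage plan-only mode.
        stmt.execute("SET kyuubi.operation.plan.only.mode=lineage")
        val rs = stmt.executeQuery("select * from t0")
        while (rs.next()) {
          // Expected shape (single column, single row), e.g.:
          // {"inputTables":["default.t0"],"outputTables":[],"columnLineage":[{"column":"a","originalColumns":["default.t0.a"]}]}
          println(rs.getString(1))
        }
      } finally {
        stmt.execute("SET kyuubi.operation.plan.only.mode=none")
        conn.close()
      }
    }
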
diff --git a/externals/kyuubi-spark-sql-engine/pom.xml b/externals/kyuubi-spark-sql-engine/pom.xml
index 55709cd79a1..4cc287ea8a7 100644
--- a/externals/kyuubi-spark-sql-engine/pom.xml
+++ b/externals/kyuubi-spark-sql-engine/pom.xml
@@ -51,6 +51,13 @@
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kyuubi</groupId>
+            <artifactId>kyuubi-spark-lineage_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <dependency>
             <groupId>io.grpc</groupId>
             <artifactId>grpc-core</artifactId>
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
index 486e6bb63bf..493ea796861 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
@@ -17,17 +17,21 @@
package org.apache.kyuubi.engine.spark.operation
-import org.apache.spark.sql.Row
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.spark.kyuubi.SparkUtilsHelper
+import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf.{OPERATION_PLAN_ONLY_EXCLUDES, OPERATION_PLAN_ONLY_OUT_STYLE}
-import org.apache.kyuubi.operation.{AnalyzeMode, ArrayFetchIterator, ExecutionMode, IterableFetchIterator, JsonStyle, OptimizeMode, OptimizeWithStatsMode, ParseMode, PhysicalMode, PlainStyle, PlanOnlyMode, PlanOnlyStyle, UnknownMode, UnknownStyle}
+import org.apache.kyuubi.operation._
import org.apache.kyuubi.operation.PlanOnlyMode.{notSupportedModeError, unknownModeError}
import org.apache.kyuubi.operation.PlanOnlyStyle.{notSupportedStyleError, unknownStyleError}
import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.plugin.lineage.helper.LineageParser
import org.apache.kyuubi.session.Session
/**
@@ -112,6 +116,9 @@ class PlanOnlyStatement(
case ExecutionMode =>
val executed = spark.sql(statement).queryExecution.executedPlan
iter = new IterableFetchIterator(Seq(Row(executed.toString())))
+ case LineageMode =>
+ val result = parseLineage(spark, plan)
+ iter = new IterableFetchIterator(Seq(Row(result)))
case UnknownMode => throw unknownModeError(mode)
case _ => throw notSupportedModeError(mode, "Spark SQL")
}
@@ -136,10 +143,33 @@ class PlanOnlyStatement(
case ExecutionMode =>
val executed = spark.sql(statement).queryExecution.executedPlan
iter = new IterableFetchIterator(Seq(Row(executed.toJSON)))
+ case LineageMode =>
+ val result = parseLineage(spark, plan)
+ iter = new IterableFetchIterator(Seq(Row(result)))
case UnknownMode => throw unknownModeError(mode)
case _ =>
throw KyuubiSQLException(s"The operation mode $mode" +
" doesn't support in Spark SQL engine.")
}
}
+
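+  /**
+   * Analyze and optimize the plan, then, if the optional kyuubi-spark-lineage plugin is on the
+   * classpath, parse the column lineage with the plugin's LineageParser and render it as JSON.
+   */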
+ private def parseLineage(spark: SparkSession, plan: LogicalPlan): String = {
+ val analyzed = spark.sessionState.analyzer.execute(plan)
+ spark.sessionState.analyzer.checkAnalysis(analyzed)
+ val optimized = spark.sessionState.optimizer.execute(analyzed)
+ try {
+ if (!SparkUtilsHelper.classesArePresent(
+ "org.apache.kyuubi.plugin.lineage.helper.LineageParser")) {
+          throw new Exception("'org.apache.kyuubi.plugin.lineage.helper.LineageParser' not found," +
+            " please install the kyuubi-spark-lineage plugin before using the 'lineage' mode")
+ }
+ val lineageParser = new LineageParser { def sparkSession = spark }
+ val lineage = lineageParser.parse(optimized)
+ val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
+ mapper.writeValueAsString(lineage)
+ } catch {
+ case e: Throwable =>
+ throw KyuubiSQLException(s"Extract columns lineage failed: ${e.getMessage}", e)
+ }
+ }
}
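
As a side note on the JSON rendering in parseLineage above, here is a self-contained sketch of how Jackson's DefaultScalaModule serializes a lineage-shaped result. The case classes below are hypothetical stand-ins for the plugin's actual lineage types, used only to illustrate the output format:

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    // Hypothetical stand-ins for the kyuubi-spark-lineage result types.
    case class ColumnLineage(column: String, originalColumns: Set[String])
    case class Lineage(
        inputTables: List[String],
        outputTables: List[String],
        columnLineage: List[ColumnLineage])

    object LineageJsonSketch extends App {
      val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
      val lineage = Lineage(List("default.t0"), Nil, List(ColumnLineage("a", Set("default.t0.a"))))
      // Prints JSON along the lines of:
      // {"inputTables":["default.t0"],"outputTables":[],"columnLineage":[{"column":"a","originalColumns":["default.t0.a"]}]}
      println(mapper.writeValueAsString(lineage))
    }
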
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkUtilsHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkUtilsHelper.scala
index e2f51e648c0..106be3fc789 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkUtilsHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkUtilsHelper.scala
@@ -43,4 +43,13 @@ object SparkUtilsHelper extends Logging {
def getLocalDir(conf: SparkConf): String = {
Utils.getLocalDir(conf)
}
+
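+  /**
+   * Returns true if the given class can be loaded via Spark's Utils.classForName, e.g. to
+   * detect whether an optional plugin such as kyuubi-spark-lineage is on the engine classpath.
+   */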
+ def classesArePresent(className: String): Boolean = {
+ try {
+ Utils.classForName(className)
+ true
+ } catch {
+ case _: ClassNotFoundException | _: NoClassDefFoundError => false
+ }
+ }
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index d6ec9f86a1d..c4ae9cb0c42 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1648,7 +1648,7 @@ object KyuubiConf {
val OPERATION_PLAN_ONLY_MODE: ConfigEntry[String] =
buildConf("kyuubi.operation.plan.only.mode")
.doc("Configures the statement performed mode, The value can be 'parse', 'analyze', " +
- "'optimize', 'optimize_with_stats', 'physical', 'execution', or 'none', " +
+ "'optimize', 'optimize_with_stats', 'physical', 'execution', 'lineage' or 'none', " +
"when it is 'none', indicate to the statement will be fully executed, otherwise " +
"only way without executing the query. different engines currently support different " +
"modes, the Spark engine supports all modes, and the Flink engine supports 'parse', " +
@@ -1665,10 +1665,11 @@ object KyuubiConf {
"OPTIMIZE_WITH_STATS",
"PHYSICAL",
"EXECUTION",
+ "LINEAGE",
"NONE").contains(mode),
"Invalid value for 'kyuubi.operation.plan.only.mode'. Valid values are" +
"'parse', 'analyze', 'optimize', 'optimize_with_stats', 'physical', 'execution' and " +
- "'none'.")
+ "'lineage', 'none'.")
.createWithDefault(NoneMode.name)
val OPERATION_PLAN_ONLY_OUT_STYLE: ConfigEntry[String] =
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/PlanOnlyMode.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/PlanOnlyMode.scala
index 3e170f05fc0..0407dab6266 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/PlanOnlyMode.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/PlanOnlyMode.scala
@@ -41,6 +41,8 @@ case object PhysicalMode extends PlanOnlyMode { val name = "physical" }
case object ExecutionMode extends PlanOnlyMode { val name = "execution" }
+case object LineageMode extends PlanOnlyMode { val name = "lineage" }
+
case object NoneMode extends PlanOnlyMode { val name = "none" }
case object UnknownMode extends PlanOnlyMode {
@@ -64,6 +66,7 @@ object PlanOnlyMode {
case OptimizeWithStatsMode.name => OptimizeWithStatsMode
case PhysicalMode.name => PhysicalMode
case ExecutionMode.name => ExecutionMode
+ case LineageMode.name => LineageMode
case NoneMode.name => NoneMode
case other => UnknownMode.mode(other)
}
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala
index 72694848526..bd5b7bf93f4 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala
@@ -499,6 +499,31 @@ trait SparkQueryTests extends HiveJDBCTestHelper {
}
}
+ test("KYUUBI #3444: Plan only operation for lineage") {
+ val ddl = "create table if not exists t0(a int) using parquet"
+ val dql = "select * from t0"
+ withJdbcStatement("t0") { statement =>
+ statement.execute(ddl)
+ statement.execute("SET kyuubi.operation.plan.only.mode=lineage")
+ val lineageParserClassName = "org.apache.kyuubi.plugin.lineage.helper.LineageParser"
+ try {
+ val resultSet = statement.executeQuery(dql)
+ assert(resultSet.next())
+        val expectedResult =
+          """
+            |{"inputTables":["default.t0"],"outputTables":[],
+            |"columnLineage":[{"column":"a","originalColumns":["default.t0.a"]}]}
+            |""".stripMargin.split("\n").mkString("")
+        assert(resultSet.getString(1) == expectedResult)
+ } catch {
+ case e: Throwable =>
+ assert(e.getMessage.contains(s"'$lineageParserClassName' not found"))
+ } finally {
+ statement.executeQuery("SET kyuubi.operation.plan.only.mode=none")
+ }
+ }
+ }
+
test("execute simple scala code") {
withJdbcStatement() { statement =>
statement.execute("SET kyuubi.operation.language=scala")
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala
index 6a37e823db5..b4e52d853bf 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala
@@ -201,6 +201,34 @@ class PlanOnlyOperationSuite extends WithKyuubiServer with HiveJDBCTestHelper {
}
}
+ test("kyuubi #3444: Plan only mode with lineage mode") {
+
+ val ddl = "create table tt(a int) using parquet"
+ val dql = "select * from tt"
+ withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name))() {
+ withJdbcStatement("tt") { statement =>
+ statement.execute(ddl)
+ statement.execute("SET kyuubi.operation.plan.only.mode=lineage")
+ val lineageParserClassName = "org.apache.kyuubi.plugin.lineage.helper.LineageParser"
+ try {
+ val resultSet = statement.executeQuery(dql)
+ assert(resultSet.next())
+          val expectedResult =
+            """
+              |{"inputTables":["default.tt"],"outputTables":[],
+              |"columnLineage":[{"column":"a","originalColumns":["default.tt.a"]}]}
+              |""".stripMargin.split("\n").mkString("")
+          assert(resultSet.getString(1) == expectedResult)
+ } catch {
+ case e: Throwable =>
+ assert(e.getMessage.contains(s"'$lineageParserClassName' not found"))
+ } finally {
+ statement.executeQuery("SET kyuubi.operation.plan.only.mode=none")
+ }
+ }
+ }
+ }
+
private def getOperationPlanWithStatement(statement: Statement): String = {
val resultSet = statement.executeQuery("select 1 where true")
assert(resultSet.next())