diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 674568ad2ac..323392eae17 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -359,7 +359,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
### Operation
-| Key | Default | Meaning | Type | Since |
+| Key | Default | Meaning | Type | Since |
|--------------------------------------------------|---------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------|
| kyuubi.operation.getTables.ignoreTableProperties | false | Speed up the `GetTables` operation by returning table identities only. | boolean | 1.8.0 |
| kyuubi.operation.idle.timeout | PT3H | Operation will be closed when it's not accessed for this duration of time | duration | 1.0.0 |
@@ -367,7 +367,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.operation.language | SQL | Choose a programing language for the following inputs
- SQL: (Default) Run all following statements as SQL queries.
- SCALA: Run all following input as scala codes
- PYTHON: (Experimental) Run all following input as Python codes with Spark engine
| string | 1.5.0 |
| kyuubi.operation.log.dir.root | server_operation_logs | Root directory for query operation log at server-side. | string | 1.4.0 |
| kyuubi.operation.plan.only.excludes | SetCatalogAndNamespace,UseStatement,SetNamespaceCommand,SetCommand,ResetCommand | Comma-separated list of query plan names, in the form of simple class names, i.e, for `SET abc=xyz`, the value will be `SetCommand`. For those auxiliary plans, such as `switch databases`, `set properties`, or `create temporary view` etc., which are used for setup evaluating environments for analyzing actual queries, we can use this config to exclude them and let them take effect. See also kyuubi.operation.plan.only.mode. | set | 1.5.0 |
-| kyuubi.operation.plan.only.mode | none | Configures the statement performed mode, The value can be 'parse', 'analyze', 'optimize', 'optimize_with_stats', 'physical', 'execution', or 'none', when it is 'none', indicate to the statement will be fully executed, otherwise only way without executing the query. different engines currently support different modes, the Spark engine supports all modes, and the Flink engine supports 'parse', 'physical', and 'execution', other engines do not support planOnly currently. | string | 1.4.0 |
+| kyuubi.operation.plan.only.mode | none | Configures the statement performed mode, The value can be 'parse', 'analyze', 'optimize', 'optimize_with_stats', 'physical', 'execution', 'lineage' or 'none', when it is 'none', indicate to the statement will be fully executed, otherwise only way without executing the query. different engines currently support different modes, the Spark engine supports all modes, and the Flink engine supports 'parse', 'physical', and 'execution', other engines do not support planOnly currently. | string | 1.4.0 |
| kyuubi.operation.plan.only.output.style | plain | Configures the planOnly output style. The value can be 'plain' or 'json', and the default value is 'plain'. This configuration supports only the output styles of the Spark engine | string | 1.7.0 |
| kyuubi.operation.progress.enabled | false | Whether to enable the operation progress. When true, the operation progress will be returned in `GetOperationStatus`. | boolean | 1.6.0 |
| kyuubi.operation.query.timeout | <undefined> | Timeout for query executions at server-side, take effect with client-side timeout(`java.sql.Statement.setQueryTimeout`) together, a running query will be cancelled automatically if timeout. It's off by default, which means only client-side take full control of whether the query should timeout or not. If set, client-side timeout is capped at this point. To cancel the queries right away without waiting for task to finish, consider enabling kyuubi.operation.interrupt.on.cancel together. | duration | 1.2.0 |
diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageParserProvider.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageParserProvider.scala
new file mode 100644
index 00000000000..665efef100e
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageParserProvider.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.lineage
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+import org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper
+object LineageParserProvider {
+ def parse(spark: SparkSession, plan: LogicalPlan): Lineage = {
+ SparkSQLLineageParseHelper(spark).parse(plan)
+ }
+}
diff --git a/externals/kyuubi-spark-sql-engine/pom.xml b/externals/kyuubi-spark-sql-engine/pom.xml
index 52ceb945dde..c27f85fa9f1 100644
--- a/externals/kyuubi-spark-sql-engine/pom.xml
+++ b/externals/kyuubi-spark-sql-engine/pom.xml
@@ -170,6 +170,13 @@
flexmark-all
test
+
+
+ org.apache.kyuubi
+ kyuubi-spark-lineage_${scala.binary.version}
+ ${project.version}
+ test
+
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
index 1ba3411cb7e..4f88083130a 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
@@ -17,14 +17,17 @@
package org.apache.kyuubi.engine.spark.operation
-import org.apache.spark.sql.Row
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.spark.kyuubi.SparkUtilsHelper
+import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.KyuubiSQLException
-import org.apache.kyuubi.config.KyuubiConf.{OPERATION_PLAN_ONLY_EXCLUDES, OPERATION_PLAN_ONLY_OUT_STYLE}
-import org.apache.kyuubi.operation.{AnalyzeMode, ArrayFetchIterator, ExecutionMode, IterableFetchIterator, JsonStyle, OperationHandle, OptimizeMode, OptimizeWithStatsMode, ParseMode, PhysicalMode, PlainStyle, PlanOnlyMode, PlanOnlyStyle, UnknownMode, UnknownStyle}
+import org.apache.kyuubi.config.KyuubiConf.{LINEAGE_PARSER_PLUGIN_PROVIDER, OPERATION_PLAN_ONLY_EXCLUDES, OPERATION_PLAN_ONLY_OUT_STYLE}
+import org.apache.kyuubi.operation.{AnalyzeMode, ArrayFetchIterator, ExecutionMode, IterableFetchIterator, JsonStyle, LineageMode, OperationHandle, OptimizeMode, OptimizeWithStatsMode, ParseMode, PhysicalMode, PlainStyle, PlanOnlyMode, PlanOnlyStyle, UnknownMode, UnknownStyle}
import org.apache.kyuubi.operation.PlanOnlyMode.{notSupportedModeError, unknownModeError}
import org.apache.kyuubi.operation.PlanOnlyStyle.{notSupportedStyleError, unknownStyleError}
import org.apache.kyuubi.operation.log.OperationLog
@@ -160,14 +163,19 @@ class PlanOnlyStatement(
val analyzed = spark.sessionState.analyzer.execute(plan)
spark.sessionState.analyzer.checkAnalysis(analyzed)
val optimized = spark.sessionState.optimizer.execute(analyzed)
+ val parserProviderClass = session.sessionManager.getConf.get(LINEAGE_PARSER_PLUGIN_PROVIDER)
+
try {
if (!SparkUtilsHelper.classesArePresent(
- "org.apache.kyuubi.plugin.lineage.helper.LineageParser")) {
- throw new Exception("'org.apache.kyuubi.plugin.lineage.helper.LineageParser' not found," +
+ parserProviderClass)) {
+ throw new Exception(s"'$parserProviderClass' not found," +
" need to install kyuubi-spark-lineage plugin before using the 'lineage' mode")
}
- val lineageParser = new LineageParser { def sparkSession = spark }
- val lineage = lineageParser.parse(optimized)
+
+ val lineage = Class.forName(parserProviderClass)
+ .getMethod("parse", classOf[SparkSession], classOf[LogicalPlan])
+ .invoke(null, spark, optimized)
+
val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
mapper.writeValueAsString(lineage)
} catch {
@@ -175,4 +183,5 @@ class PlanOnlyStatement(
throw KyuubiSQLException(s"Extract columns lineage failed: ${e.getMessage}", e)
}
}
+
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 995142b734a..bbbb73b9546 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -2413,6 +2413,13 @@ object KyuubiConf {
"UseStatement",
"SetCatalogAndNamespace"))
+ val LINEAGE_PARSER_PLUGIN_PROVIDER: ConfigEntry[String] =
+ buildConf("kyuubi.lineage.parser.plugin.provider")
+ .doc("The provider for the Spark lineage parser plugin.")
+ .version("1.8.0")
+ .stringConf
+ .createWithDefault("org.apache.kyuubi.plugin.lineage.LineageParserProvider")
+
object OperationLanguages extends Enumeration with Logging {
type OperationLanguage = Value
val PYTHON, SQL, SCALA, UNKNOWN = Value
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala
index d1c114e2b1e..20d3f6fad5b 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala
@@ -226,14 +226,15 @@ trait SparkQueryTests extends SparkDataTypeTests with HiveJDBCTestHelper {
withJdbcStatement("t0") { statement =>
statement.execute(ddl)
statement.execute("SET kyuubi.operation.plan.only.mode=lineage")
- val lineageParserClassName = "org.apache.kyuubi.plugin.lineage.helper.LineageParser"
+ val lineageParserClassName = "org.apache.kyuubi.plugin.lineage.LineageParserProvider"
+
try {
val resultSet = statement.executeQuery(dql)
assert(resultSet.next())
val actualResult =
"""
- |{"inputTables":["default.t0"],"outputTables":[],
- |"columnLineage":[{"column":"a","originalColumns":["default.t0.a"]}]}
+ |{"inputTables":["spark_catalog.default.t0"],"outputTables":[],
+ |"columnLineage":[{"column":"a","originalColumns":["spark_catalog.default.t0.a"]}]}
|""".stripMargin.split("\n").mkString("")
assert(resultSet.getString(1) == actualResult)
} catch {
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala
index 730908d3f4a..8773440a686 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala
@@ -209,14 +209,14 @@ class PlanOnlyOperationSuite extends WithKyuubiServer with HiveJDBCTestHelper {
withJdbcStatement("t0") { statement =>
statement.execute(ddl)
statement.execute("SET kyuubi.operation.plan.only.mode=lineage")
- val lineageParserClassName = "org.apache.kyuubi.plugin.lineage.helper.LineageParser"
+ val lineageParserClassName = "org.apache.kyuubi.plugin.lineage.LineageParserProvider"
try {
val resultSet = statement.executeQuery(dql)
assert(resultSet.next())
val actualResult =
"""
- |{"inputTables":["default.t0"],"outputTables":[],
- |"columnLineage":[{"column":"a","originalColumns":["default.t0.a"]}]}
+ |{"inputTables":["spark_catalog.default.t0"],"outputTables":[],
+ |"columnLineage":[{"column":"a","originalColumns":["spark_catalog.default.t0.a"]}]}
|""".stripMargin.split("\n").mkString("")
assert(resultSet.getString(1) == actualResult)
} catch {