From 011abfcdf2acb1919d0a47e22123e520a3f79f58 Mon Sep 17 00:00:00 2001 From: odone Date: Tue, 25 Oct 2022 10:26:45 +0800 Subject: [PATCH] remove spark lineage plugin from engine --- docs/extensions/engines/spark/lineage.md | 32 +++++++++- .../plugin/lineage/LineageParserRule.scala | 58 ++++++++++++++++++ .../lineage/LineageSparkExtension.scala | 20 ++----- externals/kyuubi-spark-sql-engine/pom.xml | 7 --- .../spark/operation/PlanOnlyStatement.scala | 6 +- .../operation/PlanOnlyOperationSuite.scala | 59 +++++++++++++++++++ .../kyuubi/operation/SparkQueryTests.scala | 28 --------- .../operation/PlanOnlyOperationSuite.scala | 2 +- 8 files changed, 156 insertions(+), 56 deletions(-) create mode 100644 extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageParserRule.scala rename externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/LineageParserWrapper.scala => extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageSparkExtension.scala (56%) create mode 100644 externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyOperationSuite.scala diff --git a/docs/extensions/engines/spark/lineage.md b/docs/extensions/engines/spark/lineage.md index 6a00db1bf36..9255c63de48 100644 --- a/docs/extensions/engines/spark/lineage.md +++ b/docs/extensions/engines/spark/lineage.md @@ -152,7 +152,15 @@ With the `kyuubi-spark-lineage_*.jar` and its transitive dependencies available ## Configure -### Settings for Spark Listener Extensions +Two lineage outputs are supported: +1. Logging lineage information to the event logger. +2. Returning lineage information as the execution result when SQL is submitted in `planOnly` mode. + +We can choose to use either or both.
+ +### Logging Lineage Event + +#### Settings for Spark Listener Extensions Add `org.apache.kyuubi.plugin.lineage.SparkOperationLineageQueryExecutionListener` to the spark configuration `spark.sql.queryExecutionListeners`. @@ -160,15 +168,33 @@ Add `org.apache.kyuubi.plugin.lineage.SparkOperationLineageQueryExecutionListene spark.sql.queryExecutionListeners=org.apache.kyuubi.plugin.lineage.SparkOperationLineageQueryExecutionListener ``` -### Settings for Lineage Logger and Path +#### Settings for Lineage Logger and Path -#### Lineage Logger Path The location of all the engine operation lineage events go for the builtin JSON logger. We first need set `kyuubi.engine.event.loggers` to `JSON`. All operation lineage events will be written in the unified event json logger path, which be setting with `kyuubi.engine.event.json.log.path`. We can get the lineage logger from the `operation_lineage` dir in the `kyuubi.engine.event.json.log.path`. +### The lineage `planOnly` mode + +#### Settings for Lineage Spark Extensions +Add `org.apache.kyuubi.plugin.lineage.LineageSparkExtension` to the spark configuration `spark.sql.extensions`. + +```properties +spark.sql.extensions=org.apache.kyuubi.plugin.lineage.LineageSparkExtension +``` + +#### Setting the `planOnly` mode to `lineage` + +```sql +SET kyuubi.operation.plan.only.mode=lineage +``` +In this mode the statement returns the lineage information of the SQL instead of the real execution result. + + + + diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageParserRule.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageParserRule.scala new file mode 100644 index 00000000000..2b6c2f0ccf2 --- /dev/null +++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageParserRule.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.plugin.lineage + +import java.sql.SQLException + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule + +import org.apache.kyuubi.config.KyuubiConf.OPERATION_PLAN_ONLY_MODE +import org.apache.kyuubi.plugin.lineage.helper.LineageParser + +case class LineageParserRule(session: SparkSession) extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = { + session.conf.getOption(OPERATION_PLAN_ONLY_MODE.key).foreach { mode => + if (mode == "lineage") { + val lineage = LineageParserHelper(session).parseToJson(plan) + plan.conf.setConfString("current.sql.lineage", lineage) + } + } + plan + } +} + +case class LineageParserHelper(spark: SparkSession) extends LineageParser { + + def sparkSession: SparkSession = spark + + def parseToJson(plan: LogicalPlan): String = { + try { + val lineage = parse(plan) + val mapper = new ObjectMapper().registerModule(DefaultScalaModule) + mapper.writeValueAsString(lineage) + } catch { + case e: Exception => + throw new SQLException(s"Extract columns lineage failed.", e) + } + } +} diff 
--git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/LineageParserWrapper.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageSparkExtension.scala similarity index 56% rename from externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/LineageParserWrapper.scala rename to extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageSparkExtension.scala index 5cd18d25f75..a3fc52bd1fd 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/LineageParserWrapper.scala +++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageSparkExtension.scala @@ -15,22 +15,14 @@ * limitations under the License. */ -package org.apache.kyuubi.engine.spark.operation +package org.apache.kyuubi.plugin.lineage -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.scala.DefaultScalaModule -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.SparkSessionExtensions -import org.apache.kyuubi.plugin.lineage.helper.LineageParser +class LineageSparkExtension extends (SparkSessionExtensions => Unit) { -case class LineageParserWrapper(spark: SparkSession) extends LineageParser { - - def sparkSession: SparkSession = spark - - def parseToJson(plan: LogicalPlan): String = { - val lineage = parse(plan) - val mapper = new ObjectMapper().registerModule(DefaultScalaModule) - mapper.writeValueAsString(lineage) + override def apply(extensions: SparkSessionExtensions): Unit = { + extensions.injectOptimizerRule(LineageParserRule) } + } diff --git a/externals/kyuubi-spark-sql-engine/pom.xml b/externals/kyuubi-spark-sql-engine/pom.xml index c4d98e046a9..55709cd79a1 100644 --- a/externals/kyuubi-spark-sql-engine/pom.xml +++ 
b/externals/kyuubi-spark-sql-engine/pom.xml @@ -51,13 +51,6 @@ <version>${project.version}</version> </dependency> - <dependency> - <groupId>org.apache.kyuubi</groupId> - <artifactId>kyuubi-spark-lineage_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <optional>true</optional> - </dependency> - <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-core</artifactId> diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala index 7863e604d6a..b60af0d4086 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala @@ -156,11 +156,11 @@ class PlanOnlyStatement( val optimized = spark.sessionState.optimizer.execute(analyzed) try { if (!SparkUtilsHelper.classesArePresent( - "org.apache.kyuubi.plugin.lineage.helper.LineageParser")) { - throw new Exception("'org.apache.kyuubi.plugin.lineage.helper.LineageParser' not found," + + "org.apache.kyuubi.plugin.lineage.LineageSparkExtension")) { + throw new Exception("'org.apache.kyuubi.plugin.lineage.LineageSparkExtension' not found," + " need to install kyuubi-spark-lineage plugin before using the 'lineage' mode") } - LineageParserWrapper(spark).parseToJson(optimized) + optimized.conf.getConfString("current.sql.lineage", null) } catch { case e: Throwable => throw KyuubiSQLException(s"Extract columns lineage failed: ${e.getMessage}", e) diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyOperationSuite.scala new file mode 100644 index 00000000000..dfa43c86cbc --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyOperationSuite.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.spark.operation + +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.engine.spark.WithSparkSQLEngine +import org.apache.kyuubi.operation.{HiveJDBCTestHelper, NoneMode} + +class PlanOnlyOperationSuite extends WithSparkSQLEngine with HiveJDBCTestHelper { + + override protected def jdbcUrl: String = getJdbcUrl + override def withKyuubiConf: Map[String, String] = Map( + "spark.sql.extensions" -> + "org.apache.kyuubi.plugin.lineage.LineageSparkExtension") + + test("kyuubi #3444: Plan only mode with lineage mode") { + + val ddl = "create table if not exists t0(a int) using parquet" + val dql = "select * from t0" + withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name))() { + withJdbcStatement("t0") { statement => + statement.execute(ddl) + statement.execute("SET kyuubi.operation.plan.only.mode=lineage") + val lineageParserClassName = "org.apache.kyuubi.plugin.lineage.LineageSparkExtension" + try { + val resultSet = statement.executeQuery(dql) + assert(resultSet.next()) + val actualResult = + """ + |{"inputTables":["default.t0"],"outputTables":[], + |"columnLineage":[{"column":"a","originalColumns":["default.t0.a"]}]} + 
|""".stripMargin.split("\n").mkString("") + assert(resultSet.getString(1) == actualResult) + } catch { + case e: Throwable => + assert(e.getMessage.contains(s"'$lineageParserClassName' not found")) + } finally { + statement.execute("SET kyuubi.operation.plan.only.mode=none") + } + } + } + } + +} diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala index eec5d0f398c..d227f400266 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala @@ -509,34 +509,6 @@ trait SparkQueryTests extends HiveJDBCTestHelper { } } - test("kyuubi #3444: Plan only mode with lineage mode") { - - val ddl = "create table if not exists t0(a int) using parquet" - val dql = "select * from t0" - withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name))() { - withJdbcStatement("t0") { statement => - statement.execute(ddl) - statement.execute("SET kyuubi.operation.plan.only.mode=lineage") - val lineageParserClassName = "org.apache.kyuubi.plugin.lineage.helper.LineageParser" - try { - val resultSet = statement.executeQuery(dql) - assert(resultSet.next()) - val actualResult = - """ - |{"inputTables":["default.t0"],"outputTables":[], - |"columnLineage":[{"column":"a","originalColumns":["default.t0.a"]}]} - |""".stripMargin.split("\n").mkString("") - assert(resultSet.getString(1) == actualResult) - } catch { - case e: Throwable => - assert(e.getMessage.contains(s"'$lineageParserClassName' not found")) - } finally { - statement.execute("SET kyuubi.operation.plan.only.mode=none") - } - } - } - } - test("execute simple scala code") { withJdbcStatement() { statement => statement.execute("SET kyuubi.operation.language=scala") diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala index 730908d3f4a..ade1872c0c5 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala @@ -209,7 +209,7 @@ class PlanOnlyOperationSuite extends WithKyuubiServer with HiveJDBCTestHelper { withJdbcStatement("t0") { statement => statement.execute(ddl) statement.execute("SET kyuubi.operation.plan.only.mode=lineage") - val lineageParserClassName = "org.apache.kyuubi.plugin.lineage.helper.LineageParser" + val lineageParserClassName = "org.apache.kyuubi.plugin.lineage.LineageSparkExtension" try { val resultSet = statement.executeQuery(dql) assert(resultSet.next())