Commit

remove spark lineage plugin from engine
iodone committed Oct 25, 2022
1 parent 8b3c1c6 commit 011abfc
Showing 8 changed files with 156 additions and 56 deletions.
32 changes: 29 additions & 3 deletions docs/extensions/engines/spark/lineage.md
@@ -152,23 +152,49 @@ With the `kyuubi-spark-lineage_*.jar` and its transitive dependencies available

## Configure

### Settings for Spark Listener Extensions
Two outputs of the lineage are supported:
1. Logging the lineage information to the event logger.
2. Using `planOnly` mode: submit the SQL and get the lineage information back as the execution result.

We can choose to use either one, or both.

### Logging Lineage Event

#### Settings for Spark Listener Extensions

Add `org.apache.kyuubi.plugin.lineage.SparkOperationLineageQueryExecutionListener` to the spark configuration `spark.sql.queryExecutionListeners`.

```properties
spark.sql.queryExecutionListeners=org.apache.kyuubi.plugin.lineage.SparkOperationLineageQueryExecutionListener
```

### Settings for Lineage Logger and Path
#### Settings for Lineage Logger and Path

#### Lineage Logger Path
This is where all engine operation lineage events go when the built-in JSON logger is used.
First, set `kyuubi.engine.event.loggers` to `JSON`.
All operation lineage events will then be written to the unified event JSON logger path, which is configured with
`kyuubi.engine.event.json.log.path`. The lineage events can be found in the `operation_lineage` directory under that path.
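
For example, a minimal engine configuration could look like the following. The path shown is only an illustrative value, not a required default; use any location writable by the engine.

```properties
kyuubi.engine.event.loggers=JSON
# illustrative path -- lineage events land under <path>/operation_lineage
kyuubi.engine.event.json.log.path=file:///tmp/kyuubi/events
```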

### The lineage `planOnly` mode

#### Settings for Lineage Spark Extensions
Add `org.apache.kyuubi.plugin.lineage.LineageSparkExtension` to the spark configuration `spark.sql.extensions`.

```properties
spark.sql.extensions=org.apache.kyuubi.plugin.lineage.LineageSparkExtension
```

#### Setting the `planOnly` mode with `lineage`

```sql
SET kyuubi.operation.plan.only.mode=lineage
```
With this set, executing a SQL statement returns its lineage information instead of the real execution result.
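
As a sketch of what to expect (mirroring the test case added in this commit), querying a simple table in `lineage` mode returns the lineage as a JSON string instead of the table rows:

```sql
CREATE TABLE IF NOT EXISTS t0(a int) USING parquet;
SET kyuubi.operation.plan.only.mode=lineage;
SELECT * FROM t0;
-- returns one row containing the lineage JSON:
-- {"inputTables":["default.t0"],"outputTables":[],"columnLineage":[{"column":"a","originalColumns":["default.t0.a"]}]}
```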

@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.plugin.lineage

import java.sql.SQLException

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

import org.apache.kyuubi.config.KyuubiConf.OPERATION_PLAN_ONLY_MODE
import org.apache.kyuubi.plugin.lineage.helper.LineageParser

case class LineageParserRule(session: SparkSession) extends Rule[LogicalPlan] {

override def apply(plan: LogicalPlan): LogicalPlan = {
session.conf.getOption(OPERATION_PLAN_ONLY_MODE.key).foreach { mode =>
if (mode == "lineage") {
val lineage = LineageParserHelper(session).parseToJson(plan)
plan.conf.setConfString("current.sql.lineage", lineage)
}
}
plan
}
}

case class LineageParserHelper(spark: SparkSession) extends LineageParser {

def sparkSession: SparkSession = spark

def parseToJson(plan: LogicalPlan): String = {
try {
val lineage = parse(plan)
val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
mapper.writeValueAsString(lineage)
} catch {
case e: Exception =>
throw new SQLException(s"Extract columns lineage failed.", e)
}
}
}
@@ -15,22 +15,14 @@
* limitations under the License.
*/

package org.apache.kyuubi.engine.spark.operation
package org.apache.kyuubi.plugin.lineage

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.SparkSessionExtensions

import org.apache.kyuubi.plugin.lineage.helper.LineageParser
class LineageSparkExtension extends (SparkSessionExtensions => Unit) {

case class LineageParserWrapper(spark: SparkSession) extends LineageParser {

def sparkSession: SparkSession = spark

def parseToJson(plan: LogicalPlan): String = {
val lineage = parse(plan)
val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
mapper.writeValueAsString(lineage)
override def apply(extensions: SparkSessionExtensions): Unit = {
extensions.injectOptimizerRule(LineageParserRule)
}

}
7 changes: 0 additions & 7 deletions externals/kyuubi-spark-sql-engine/pom.xml
@@ -51,13 +51,6 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-spark-lineage_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<optional>true</optional>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-core</artifactId>
@@ -156,11 +156,11 @@ class PlanOnlyStatement(
val optimized = spark.sessionState.optimizer.execute(analyzed)
try {
if (!SparkUtilsHelper.classesArePresent(
"org.apache.kyuubi.plugin.lineage.helper.LineageParser")) {
throw new Exception("'org.apache.kyuubi.plugin.lineage.helper.LineageParser' not found," +
"org.apache.kyuubi.plugin.lineage.LineageSparkExtension")) {
throw new Exception("'org.apache.kyuubi.plugin.lineage.LineageSparkExtension' not found," +
" need to install kyuubi-spark-lineage plugin before using the 'lineage' mode")
}
LineageParserWrapper(spark).parseToJson(optimized)
optimized.conf.getConfString("current.sql.lineage", null)
} catch {
case e: Throwable =>
throw KyuubiSQLException(s"Extract columns lineage failed: ${e.getMessage}", e)
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.spark.operation

import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
import org.apache.kyuubi.operation.{HiveJDBCTestHelper, NoneMode}

class PlanOnlyOperationSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {

override protected def jdbcUrl: String = getJdbcUrl
override def withKyuubiConf: Map[String, String] = Map(
"spark.sql.extensions" ->
"org.apache.kyuubi.plugin.lineage.LineageSparkExtension")

test("kyuubi #3444: Plan only mode with lineage mode") {

val ddl = "create table if not exists t0(a int) using parquet"
val dql = "select * from t0"
withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name))() {
withJdbcStatement("t0") { statement =>
statement.execute(ddl)
statement.execute("SET kyuubi.operation.plan.only.mode=lineage")
val lineageParserClassName = "org.apache.kyuubi.plugin.lineage.LineageSparkExtension"
try {
val resultSet = statement.executeQuery(dql)
assert(resultSet.next())
val actualResult =
"""
|{"inputTables":["default.t0"],"outputTables":[],
|"columnLineage":[{"column":"a","originalColumns":["default.t0.a"]}]}
|""".stripMargin.split("\n").mkString("")
assert(resultSet.getString(1) == actualResult)
} catch {
case e: Throwable =>
assert(e.getMessage.contains(s"'$lineageParserClassName' not found"))
} finally {
statement.execute("SET kyuubi.operation.plan.only.mode=none")
}
}
}
}

}
@@ -509,34 +509,6 @@ trait SparkQueryTests extends HiveJDBCTestHelper {
}
}

test("kyuubi #3444: Plan only mode with lineage mode") {

val ddl = "create table if not exists t0(a int) using parquet"
val dql = "select * from t0"
withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name))() {
withJdbcStatement("t0") { statement =>
statement.execute(ddl)
statement.execute("SET kyuubi.operation.plan.only.mode=lineage")
val lineageParserClassName = "org.apache.kyuubi.plugin.lineage.helper.LineageParser"
try {
val resultSet = statement.executeQuery(dql)
assert(resultSet.next())
val actualResult =
"""
|{"inputTables":["default.t0"],"outputTables":[],
|"columnLineage":[{"column":"a","originalColumns":["default.t0.a"]}]}
|""".stripMargin.split("\n").mkString("")
assert(resultSet.getString(1) == actualResult)
} catch {
case e: Throwable =>
assert(e.getMessage.contains(s"'$lineageParserClassName' not found"))
} finally {
statement.execute("SET kyuubi.operation.plan.only.mode=none")
}
}
}
}

test("execute simple scala code") {
withJdbcStatement() { statement =>
statement.execute("SET kyuubi.operation.language=scala")
@@ -209,7 +209,7 @@ class PlanOnlyOperationSuite extends WithKyuubiServer with HiveJDBCTestHelper {
withJdbcStatement("t0") { statement =>
statement.execute(ddl)
statement.execute("SET kyuubi.operation.plan.only.mode=lineage")
val lineageParserClassName = "org.apache.kyuubi.plugin.lineage.helper.LineageParser"
val lineageParserClassName = "org.apache.kyuubi.plugin.lineage.LineageSparkExtension"
try {
val resultSet = statement.executeQuery(dql)
assert(resultSet.next())
