HBASE-28814 Add OpenLineage reporting support for Spark connector #135

Open · wants to merge 1 commit into base: master
14 changes: 13 additions & 1 deletion pom.xml
@@ -173,6 +173,7 @@
<scoverage.version>1.4.11</scoverage.version>
<sbt-compiler.version>1.0.0</sbt-compiler.version>
<jacoco.version>0.8.8</jacoco.version>
<openlineage.version>1.20.5</openlineage.version>
</properties>
<dependencyManagement>
<dependencies>
@@ -243,6 +244,17 @@
<artifactId>audience-annotations</artifactId>
<version>${audience-annotations.version}</version>
</dependency>
<dependency>
<groupId>io.openlineage</groupId>
<artifactId>spark-extension-interfaces</artifactId>
<version>${openlineage.version}</version>
</dependency>
<dependency>
<groupId>io.openlineage</groupId>
<artifactId>spark-extension-entrypoint</artifactId>
<version>1.0.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-annotations</artifactId>
@@ -387,7 +399,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
-        <version>3.2.1</version>
+        <version>3.4.1</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
35 changes: 35 additions & 0 deletions spark/hbase-spark/pom.xml
@@ -182,6 +182,21 @@
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>io.openlineage</groupId>
<artifactId>spark-extension-interfaces</artifactId>
</dependency>
<dependency>
<groupId>io.openlineage</groupId>
<artifactId>spark-extension-entrypoint</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.openlineage</groupId>
<artifactId>openlineage-spark_2.12</artifactId>
<version>${openlineage.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -265,6 +280,26 @@
</rules>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>shade</goal>
</goals>
<phase>package</phase>
<configuration>
<relocations>
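<!-- Relocate the shaded OpenLineage classes under the hbase-thirdparty
     namespace. This shaded prefix must match the string returned by
     SparkHBaseLineageProvider.shadedPackage(), otherwise the OpenLineage
     entrypoint cannot locate the relocated visitor class. -->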
<relocation>
<pattern>io.openlineage.spark.shade</pattern>
<shadedPattern>org.apache.hbase.thirdparty.io.openlineage.spark.shade</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

@@ -0,0 +1 @@
org.apache.hadoop.hbase.spark.SparkHBaseLineageProvider
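By ServiceLoader convention this one-line resource lives under META-INF/services/, in a file named after the io.openlineage.spark.extension.OpenLineageExtensionProvider interface. A minimal sketch of how the OpenLineage entrypoint presumably discovers the provider at runtime:

import java.util.ServiceLoader
import io.openlineage.spark.extension.OpenLineageExtensionProvider

object ProviderDiscoveryDemo extends App {
  // Enumerate every provider registered under META-INF/services and print the
  // relocated package prefix plus the visitor class each one contributes.
  val providers = ServiceLoader.load(classOf[OpenLineageExtensionProvider])
  providers.forEach(p => println(s"${p.shadedPackage()} -> ${p.getVisitorClassName}"))
}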
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.spark

import io.openlineage.spark.shade.client.OpenLineage
import io.openlineage.spark.shade.client.utils.DatasetIdentifier
import io.openlineage.spark.shade.extension.v1.{LineageRelation, LineageRelationProvider}
import java.util
import java.util.concurrent.ConcurrentLinkedQueue
import org.apache.hadoop.hbase.CellUtil
@@ -53,7 +56,11 @@
* Through the HBase Bytes object commands.
*/
@InterfaceAudience.Private
-class DefaultSource extends RelationProvider with CreatableRelationProvider with Logging {
+class DefaultSource
+    extends RelationProvider
+    with CreatableRelationProvider
+    with Logging
+    with LineageRelationProvider {

/**
* Is given input from SparkSQL to construct a BaseRelation
@@ -78,6 +85,19 @@
relation.insert(data, false)
relation
}

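  /**
   * Entry point used by the OpenLineage Spark listener (through the
   * LineageRelationProvider extension interface): builds the dataset
   * identifier for this source, with the dataset name taken from the table
   * catalog ("<namespace>.<table>") and the dataset namespace derived from
   * the ZooKeeper quorum of the most recent HBaseContext.
   */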
def getLineageDatasetIdentifier(
sparkListenerEventName: String,
openLineage: OpenLineage,
sqlContext: Any,
parameters: Any): DatasetIdentifier = {
val params = parameters.asInstanceOf[Map[String, String]]
val hbaseContext = LatestHBaseContextCache.latest
val catalog = HBaseTableCatalog(params)
val name = s"${catalog.namespace}.${catalog.name}"
val namespace = s"hbase://${hbaseContext.config.get("hbase.zookeeper.quorum")}"
new DatasetIdentifier(name, namespace)
}
}

/**
@@ -93,7 +113,8 @@
extends BaseRelation
with PrunedFilteredScan
with InsertableRelation
-    with Logging {
+    with Logging
+    with LineageRelation {
val timestamp = parameters.get(HBaseSparkConf.TIMESTAMP).map(_.toLong)
val minTimestamp = parameters.get(HBaseSparkConf.TIMERANGE_START).map(_.toLong)
val maxTimestamp = parameters.get(HBaseSparkConf.TIMERANGE_END).map(_.toLong)
@@ -611,6 +632,14 @@
new PassThroughLogicExpression
}
}

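  /**
   * LineageRelation counterpart of the provider-level method above; reports
   * the same "<namespace>.<table>" name, resolving the ZooKeeper quorum from
   * this relation's own HBase configuration instead of the context cache.
   */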
def getLineageDatasetIdentifier(
sparkListenerEventName: String,
openLineage: OpenLineage): DatasetIdentifier = {
val name = s"${this.catalog.namespace}.${this.catalog.name}"
val namespace = s"hbase://${this.hbaseConf.get("hbase.zookeeper.quorum")}"
new DatasetIdentifier(name, namespace)
}
}

/**
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark

import io.openlineage.spark.extension.OpenLineageExtensionProvider
import io.openlineage.spark.shade.extension.v1.lifecycle.plan.SparkOpenLineageExtensionVisitor

class SparkHBaseLineageProvider extends OpenLineageExtensionProvider {

def shadedPackage(): String =
"org.apache.hbase.thirdparty.io.openlineage.spark.shade"

override def getVisitorClassName: String =
classOf[SparkOpenLineageExtensionVisitor].getCanonicalName
}
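For reviewers who want to watch events outside the test suite, a minimal sketch of wiring the listener up by hand (assumes the openlineage-spark agent jar is on the classpath; the console transport, a standard OpenLineage option, prints each run event to stdout):

import org.apache.spark.sql.SparkSession

object LineageDemo extends App {
  // Attach the OpenLineage listener; it finds SparkHBaseLineageProvider via
  // the ServiceLoader registration above and emits run events whose datasets
  // carry the hbase://<quorum> identifiers built by the connector.
  val spark = SparkSession
    .builder()
    .master("local[*]")
    .appName("hbase-openlineage-demo")
    .config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener")
    .config("spark.openlineage.transport.type", "console")
    .getOrCreate()
}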
@@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark

import io.openlineage.spark.agent.OpenLineageSparkListener
import java.io.File
import org.apache.hadoop.hbase.{HBaseTestingUtility, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.spark.datasources.{HBaseSparkConf, HBaseTableCatalog}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
import org.scalatest.Matchers.convertToAnyShouldWrapper
import org.scalatest.concurrent.Eventually
import scala.collection.mutable.ArrayBuffer
import scala.io.Source

class OpenLineageSuite
extends FunSuite
with Eventually
with BeforeAndAfterEach
with BeforeAndAfterAll
with Logging {
@transient var sc: SparkSession = null
var TEST_UTIL: HBaseTestingUtility = new HBaseTestingUtility

val t1TableName = "t1"
val t2TableName = "t2"
val columnFamily = "c"
var sqlContext: SQLContext = null

val timestamp = 1234567890000L
val lineageFile = File.createTempFile(s"openlineage_test_${System.nanoTime()}", ".log")

override def beforeAll() {

TEST_UTIL.startMiniCluster

logInfo(" - minicluster started")
try
TEST_UTIL.deleteTable(TableName.valueOf(t1TableName))
catch {
case e: Exception => logInfo(" - no table " + t1TableName + " found")
}
try
TEST_UTIL.deleteTable(TableName.valueOf(t2TableName))
catch {
case e: Exception => logInfo(" - no table " + t2TableName + " found")
}

logInfo(" - creating table " + t1TableName)
TEST_UTIL.createTable(TableName.valueOf(t1TableName), Bytes.toBytes(columnFamily))
logInfo(" - created table")
logInfo(" - creating table " + t2TableName)
TEST_UTIL.createTable(TableName.valueOf(t2TableName), Bytes.toBytes(columnFamily))
logInfo(" - created table")

val sparkConf = new SparkConf
sparkConf.set(HBaseSparkConf.QUERY_CACHEBLOCKS, "true")
sparkConf.set(HBaseSparkConf.QUERY_BATCHSIZE, "100")
sparkConf.set(HBaseSparkConf.QUERY_CACHEDROWS, "100")
sparkConf.set("spark.extraListeners", classOf[OpenLineageSparkListener].getCanonicalName)
sparkConf.set("spark.openlineage.transport.type", "file")
sparkConf.set("spark.openlineage.transport.location", lineageFile.getAbsolutePath)

sc = SparkSession
.builder()
.master("local")
.appName("openlineage-test")
.config(sparkConf)
      .getOrCreate()
val connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration)
try {
val t1Table = connection.getTable(TableName.valueOf(t1TableName))

try {
var put = new Put(Bytes.toBytes("get1"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("1"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(1))
t1Table.put(put)
put = new Put(Bytes.toBytes("get2"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("4"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(4))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("FOO"))
t1Table.put(put)
put = new Put(Bytes.toBytes("get3"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
t1Table.put(put)
put = new Put(Bytes.toBytes("get4"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo4"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("10"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(10))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("BAR"))
t1Table.put(put)
put = new Put(Bytes.toBytes("get5"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo5"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
t1Table.put(put)
} finally {
t1Table.close()
}
} finally {
connection.close()
}

new HBaseContext(sc.sparkContext, TEST_UTIL.getConfiguration)
}

override def afterAll() {
TEST_UTIL.deleteTable(TableName.valueOf(t1TableName))
logInfo("shuting down minicluster")
TEST_UTIL.shutdownMiniCluster()

sc.stop()
}

override def beforeEach(): Unit = {
DefaultSourceStaticUtils.lastFiveExecutionRules.clear()
}

test("Test rowKey point only rowKey query") {
val hbaseTable1Catalog =
s"""{
|"table":{"namespace":"default", "name":"t1"},
|"rowkey":"key",
|"columns":{
|"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
|"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
|"B_FIELD":{"cf":"c", "col":"b", "type":"string"}
|}
|}""".stripMargin

val hbaseTable2Catalog =
s"""{
|"table":{"namespace":"default", "name":"t2"},
|"rowkey":"key",
|"columns":{
|"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
|"OUTPUT_A_FIELD":{"cf":"c", "col":"a", "type":"string"},
|"OUTPUT_B_FIELD":{"cf":"c", "col":"b", "type":"string"}
|}
|}""".stripMargin

val results = sc.read
.options(Map(HBaseTableCatalog.tableCatalog -> hbaseTable1Catalog))
.format("org.apache.hadoop.hbase.spark")
.load()

    results.createOrReplaceTempView("tempview")

val outputDf =
sc.sql("SELECT KEY_FIELD, A_FIELD AS OUTPUT_A_FIELD, B_FIELD AS OUTPUT_B_FIELD FROM tempview")

outputDf.write
.format("org.apache.hadoop.hbase.spark")
.options(Map(HBaseTableCatalog.tableCatalog -> hbaseTable2Catalog))
.save()

    val events = eventually {
      val eventLog = parseEventLog(lineageFile)
      eventLog.size shouldBe 1
      eventLog
    }

val json = events.head
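    // Each event should describe the HBase tables as OpenLineage datasets:
    // name = "<namespace>.<table>", namespace = "hbase://<zookeeper quorum>".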
assert(((json \\ "inputs")(0) \ "name") == JString("default.t1"))
assert(((json \\ "inputs")(0) \ "namespace") == JString("hbase://127.0.0.1"))
assert(((json \\ "outputs")(0) \ "name") == JString("default.t2"))
assert(((json \\ "outputs")(0) \ "namespace") == JString("hbase://127.0.0.1"))
}

  def parseEventLog(file: File): List[JValue] = {
    val source = Source.fromFile(file)
    try {
      val eventlist = ArrayBuffer.empty[JValue]
      for (line <- source.getLines()) {
        val event = parse(line)
        // Keep only COMPLETE events that report both inputs and outputs.
        for {
          JObject(child) <- event
          JField("inputs", JArray(inputs)) <- child
          JField("outputs", JArray(outputs)) <- child
          JField("eventType", JString(eventType)) <- child
          if outputs.nonEmpty && inputs.nonEmpty && eventType == "COMPLETE"
        } yield eventlist += event
      }
      eventlist.toList
    } finally {
      source.close()
    }
  }
}