From ba0f9b558cfe669879bb90ff0444f79eae84d348 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Dominik=20D=C4=99bowczyk?=
Date: Thu, 22 Aug 2024 15:16:19 +0200
Subject: [PATCH] Add OpenLineage reporting support for Spark connector

---
 pom.xml                                       | 14 +-
 spark/hbase-spark/pom.xml                     | 35 +++
 ...ark.extension.OpenLineageExtensionProvider |  1 +
 .../hadoop/hbase/spark/DefaultSource.scala    | 33 ++-
 .../spark/SparkHBaseLineageProvider.scala     | 30 +++
 .../hadoop/hbase/spark/OpenLineageSuite.scala | 208 ++++++++++++++++++
 6 files changed, 318 insertions(+), 3 deletions(-)
 create mode 100644 spark/hbase-spark/src/main/resources/META-INF/services/io.openlineage.spark.extension.OpenLineageExtensionProvider
 create mode 100644 spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/SparkHBaseLineageProvider.scala
 create mode 100644 spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/OpenLineageSuite.scala

diff --git a/pom.xml b/pom.xml
index 8038cfaa..43336ec2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -173,6 +173,7 @@
     1.4.11
     1.0.0
     0.8.8
+    1.20.5
@@ -243,6 +244,17 @@
       audience-annotations
       ${audience-annotations.version}
+
+      io.openlineage
+      spark-extension-interfaces
+      ${openlineage.version}
+
+
+      io.openlineage
+      spark-extension-entrypoint
+      1.0.0
+      provided
+
       org.apache.hbase
       hbase-annotations
@@ -387,7 +399,7 @@
         org.apache.maven.plugins
         maven-shade-plugin
-        3.2.1
+        3.4.1
         org.apache.maven.plugins
diff --git a/spark/hbase-spark/pom.xml b/spark/hbase-spark/pom.xml
index 10a9fd2b..9ba40c52 100644
--- a/spark/hbase-spark/pom.xml
+++ b/spark/hbase-spark/pom.xml
@@ -182,6 +182,21 @@
       spark-catalyst_${scala.binary.version}
       ${spark.version}
+
+      io.openlineage
+      spark-extension-interfaces
+
+
+      io.openlineage
+      spark-extension-entrypoint
+      provided
+
+
+      io.openlineage
+      openlineage-spark_2.12
+      ${openlineage.version}
+      test
+
@@ -265,6 +280,26 @@
+
+        org.apache.maven.plugins
+        maven-shade-plugin
+
+
+
+            shade
+
+          package
+
+
+
+              io.openlineage.spark.shade
+              org.apache.hbase.thirdparty.io.openlineage.spark.shade
+
+
+
+
+
+
diff --git a/spark/hbase-spark/src/main/resources/META-INF/services/io.openlineage.spark.extension.OpenLineageExtensionProvider b/spark/hbase-spark/src/main/resources/META-INF/services/io.openlineage.spark.extension.OpenLineageExtensionProvider
new file mode 100644
index 00000000..639ed7e3
--- /dev/null
+++ b/spark/hbase-spark/src/main/resources/META-INF/services/io.openlineage.spark.extension.OpenLineageExtensionProvider
@@ -0,0 +1 @@
+org.apache.hadoop.hbase.spark.SparkHBaseLineageProvider
\ No newline at end of file
diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
index 4ad945a3..92fe3df7 100644
--- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
+++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hbase.spark
 
+import io.openlineage.spark.shade.client.OpenLineage
+import io.openlineage.spark.shade.client.utils.DatasetIdentifier
+import io.openlineage.spark.shade.extension.v1.{LineageRelation, LineageRelationProvider}
 import java.util
 import java.util.concurrent.ConcurrentLinkedQueue
 import org.apache.hadoop.hbase.CellUtil
@@ -53,7 +56,11 @@ import scala.collection.mutable
  * Through the HBase Bytes object commands.
  */
 @InterfaceAudience.Private
-class DefaultSource extends RelationProvider with CreatableRelationProvider with Logging {
+class DefaultSource
+    extends RelationProvider
+    with CreatableRelationProvider
+    with Logging
+    with LineageRelationProvider {
 
   /**
    * Is given input from SparkSQL to construct a BaseRelation
@@ -78,6 +85,19 @@
     relation.insert(data, false)
     relation
   }
+
+  def getLineageDatasetIdentifier(
+      sparkListenerEventName: String,
+      openLineage: OpenLineage,
+      sqlContext: Any,
+      parameters: Any): DatasetIdentifier = {
+    val params = parameters.asInstanceOf[Map[String, String]]
+    val hbaseContext = LatestHBaseContextCache.latest
+    val catalog = HBaseTableCatalog(params)
+    val name = s"${catalog.namespace}.${catalog.name}"
+    val namespace = s"hbase://${hbaseContext.config.get("hbase.zookeeper.quorum")}"
+    new DatasetIdentifier(name, namespace)
+  }
 }
 
 /**
@@ -93,7 +113,8 @@ case class HBaseRelation(
   extends BaseRelation
   with PrunedFilteredScan
   with InsertableRelation
-  with Logging {
+  with Logging
+  with LineageRelation {
   val timestamp = parameters.get(HBaseSparkConf.TIMESTAMP).map(_.toLong)
   val minTimestamp = parameters.get(HBaseSparkConf.TIMERANGE_START).map(_.toLong)
   val maxTimestamp = parameters.get(HBaseSparkConf.TIMERANGE_END).map(_.toLong)
@@ -611,6 +632,14 @@
         new PassThroughLogicExpression
     }
   }
+
+  def getLineageDatasetIdentifier(
+      sparkListenerEventName: String,
+      openLineage: OpenLineage): DatasetIdentifier = {
+    val namespace = s"hbase://${this.hbaseConf.get("hbase.zookeeper.quorum")}"
+    val name = s"${this.catalog.namespace}.${this.catalog.name}"
+    new DatasetIdentifier(name, namespace)
+  }
 }
 
 /**
diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/SparkHBaseLineageProvider.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/SparkHBaseLineageProvider.scala
new file mode 100644
index 00000000..4dba0d4e
--- /dev/null
+++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/SparkHBaseLineageProvider.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.hbase.spark + +import io.openlineage.spark.extension.OpenLineageExtensionProvider +import io.openlineage.spark.shade.extension.v1.lifecycle.plan.SparkOpenLineageExtensionVisitor + +class SparkHBaseLineageProvider extends OpenLineageExtensionProvider { + + def shadedPackage(): String = + "org.apache.hbase.thirdparty.io.openlineage.spark.shade" + + override def getVisitorClassName: String = + classOf[SparkOpenLineageExtensionVisitor].getCanonicalName +} diff --git a/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/OpenLineageSuite.scala b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/OpenLineageSuite.scala new file mode 100644 index 00000000..ec6f6b11 --- /dev/null +++ b/spark/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/OpenLineageSuite.scala @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.spark + +import io.openlineage.spark.agent.OpenLineageSparkListener +import java.io.File +import org.apache.hadoop.hbase.{HBaseTestingUtility, TableName} +import org.apache.hadoop.hbase.client.{ConnectionFactory, Put} +import org.apache.hadoop.hbase.spark.datasources.{HBaseSparkConf, HBaseTableCatalog} +import org.apache.hadoop.hbase.util.Bytes +import org.apache.spark.SparkConf +import org.apache.spark.sql.{SparkSession, SQLContext} +import org.json4s._ +import org.json4s.jackson.JsonMethods._ +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} +import org.scalatest.Matchers.convertToAnyShouldWrapper +import org.scalatest.concurrent.Eventually +import scala.collection.mutable.ArrayBuffer +import scala.io.Source + +class OpenLineageSuite + extends FunSuite + with Eventually + with BeforeAndAfterEach + with BeforeAndAfterAll + with Logging { + @transient var sc: SparkSession = null + var TEST_UTIL: HBaseTestingUtility = new HBaseTestingUtility + + val t1TableName = "t1" + val t2TableName = "t2" + val columnFamily = "c" + var sqlContext: SQLContext = null + + val timestamp = 1234567890000L + val lineageFile = File.createTempFile(s"openlineage_test_${System.nanoTime()}", ".log") + + override def beforeAll() { + + TEST_UTIL.startMiniCluster + + logInfo(" - minicluster started") + try + TEST_UTIL.deleteTable(TableName.valueOf(t1TableName)) + catch { + case e: Exception => logInfo(" - no table " + t1TableName + " found") + } + try + TEST_UTIL.deleteTable(TableName.valueOf(t2TableName)) + catch { + case e: Exception => logInfo(" - no table " + t2TableName + " found") + } + + logInfo(" - creating table " + t1TableName) + TEST_UTIL.createTable(TableName.valueOf(t1TableName), Bytes.toBytes(columnFamily)) + logInfo(" - created table") + logInfo(" - creating table " + t2TableName) + 
+    TEST_UTIL.createTable(TableName.valueOf(t2TableName), Bytes.toBytes(columnFamily))
+    logInfo(" - created table")
+
+    val sparkConf = new SparkConf
+    sparkConf.set(HBaseSparkConf.QUERY_CACHEBLOCKS, "true")
+    sparkConf.set(HBaseSparkConf.QUERY_BATCHSIZE, "100")
+    sparkConf.set(HBaseSparkConf.QUERY_CACHEDROWS, "100")
+    sparkConf.set("spark.extraListeners", classOf[OpenLineageSparkListener].getCanonicalName)
+    sparkConf.set("spark.openlineage.transport.type", "file")
+    sparkConf.set("spark.openlineage.transport.location", lineageFile.getAbsolutePath)
+
+    sc = SparkSession
+      .builder()
+      .master("local")
+      .appName("openlineage-test")
+      .config(sparkConf)
+      .getOrCreate();
+    val connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration)
+    try {
+      val t1Table = connection.getTable(TableName.valueOf(t1TableName))
+
+      try {
+        var put = new Put(Bytes.toBytes("get1"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("1"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(1))
+        t1Table.put(put)
+        put = new Put(Bytes.toBytes("get2"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("4"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(4))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("FOO"))
+        t1Table.put(put)
+        put = new Put(Bytes.toBytes("get3"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
+        t1Table.put(put)
+        put = new Put(Bytes.toBytes("get4"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo4"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("10"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(10))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("BAR"))
+        t1Table.put(put)
+        put = new Put(Bytes.toBytes("get5"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo5"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
+        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
+        t1Table.put(put)
+      } finally {
+        t1Table.close()
+      }
+    } finally {
+      connection.close()
+    }
+
+    new HBaseContext(sc.sparkContext, TEST_UTIL.getConfiguration)
+  }
+
+  override def afterAll() {
+    TEST_UTIL.deleteTable(TableName.valueOf(t1TableName))
+    logInfo("shuting down minicluster")
+    TEST_UTIL.shutdownMiniCluster()
+
+    sc.stop()
+  }
+
+  override def beforeEach(): Unit = {
+    DefaultSourceStaticUtils.lastFiveExecutionRules.clear()
+  }
+
+  test("Test rowKey point only rowKey query") {
+    val hbaseTable1Catalog =
+      s"""{
+         |"table":{"namespace":"default", "name":"t1"},
+         |"rowkey":"key",
+         |"columns":{
+         |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
+         |"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
+         |"B_FIELD":{"cf":"c", "col":"b", "type":"string"}
+         |}
+         |}""".stripMargin
+
+    val hbaseTable2Catalog =
+      s"""{
+         |"table":{"namespace":"default", "name":"t2"},
+         |"rowkey":"key",
+         |"columns":{
+         |"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
"type":"string"}, + |"OUTPUT_A_FIELD":{"cf":"c", "col":"a", "type":"string"}, + |"OUTPUT_B_FIELD":{"cf":"c", "col":"b", "type":"string"} + |} + |}""".stripMargin + + val results = sc.read + .options(Map(HBaseTableCatalog.tableCatalog -> hbaseTable1Catalog)) + .format("org.apache.hadoop.hbase.spark") + .load() + + results.createOrReplaceTempView("tempview"); + + val outputDf = + sc.sql("SELECT KEY_FIELD, A_FIELD AS OUTPUT_A_FIELD, B_FIELD AS OUTPUT_B_FIELD FROM tempview") + + outputDf.write + .format("org.apache.hadoop.hbase.spark") + .options(Map(HBaseTableCatalog.tableCatalog -> hbaseTable2Catalog)) + .save() + + val events = eventually { + val eventLog = parseEventLog(lineageFile); eventLog.size shouldBe 1; eventLog + } + + val json = events.head + assert(((json \\ "inputs")(0) \ "name") == JString("default.t1")) + assert(((json \\ "inputs")(0) \ "namespace") == JString("hbase://127.0.0.1")) + assert(((json \\ "outputs")(0) \ "name") == JString("default.t2")) + assert(((json \\ "outputs")(0) \ "namespace") == JString("hbase://127.0.0.1")) + } + + def parseEventLog(file: File): List[JValue] = { + val source = Source.fromFile(file) + val eventlist = ArrayBuffer.empty[JValue] + for (line <- source.getLines()) { + val event = parse(line) + for { + JObject(child) <- event + JField("inputs", JArray(inputs)) <- child + JField("outputs", JArray(outputs)) <- child + JField("eventType", JString(eventType)) <- child + if outputs.nonEmpty && inputs.nonEmpty && eventType == "COMPLETE" + } yield eventlist += event + } + eventlist.toList + } +}