From eaf7b9073eeacbe3c113c82c541a55bedbf3eb8b Mon Sep 17 00:00:00 2001 From: awang12345 Date: Fri, 16 Aug 2024 14:12:04 +0800 Subject: [PATCH 1/3] feat: support read and write from hive datasource --- .../src/main/resources/application.conf | 41 +++++++- .../com/vesoft/nebula/algorithm/Main.scala | 6 +- .../nebula/algorithm/config/Configs.scala | 96 ++++++++++++++++--- .../nebula/algorithm/config/SparkConfig.scala | 9 ++ .../nebula/algorithm/reader/DataReader.scala | 37 +++++++ .../nebula/algorithm/reader/ReaderType.scala | 5 +- .../nebula/algorithm/writer/AlgoWriter.scala | 50 +++++++++- .../nebula/algorithm/writer/WriterType.scala | 5 +- 8 files changed, 224 insertions(+), 25 deletions(-) diff --git a/nebula-algorithm/src/main/resources/application.conf b/nebula-algorithm/src/main/resources/application.conf index e01c0fe..5fb4d85 100644 --- a/nebula-algorithm/src/main/resources/application.conf +++ b/nebula-algorithm/src/main/resources/application.conf @@ -11,14 +11,49 @@ } data: { - # data source. optional of nebula,nebula-ngql,csv,json + # data source. one of nebula,nebula-ngql,csv,json,hive source: csv - # data sink, means the algorithm result will be write into this sink. optional of nebula,csv,text + # data sink, the algorithm result will be written into this sink. one of nebula,csv,text,hive sink: csv # if your algorithm needs weight hasWeight: false } + # Hive related config + hive: { + # algo's data source from hive + read: { + #[Optional] hive metastore uri, required when spark and hive are deployed in different clusters + metaStoreUris: "thrift://hive-metastore-server-01:9083" + #spark sql + sql: "select column_1,column_2,column_3 from database_01.table_01 " + #[Optional] column of the sql result that is used as the graph source vid + srcId: "column_1" + #[Optional] column of the sql result that is used as the graph dest vid + dstId: "column_2" + #[Optional] column of the sql result that is used as the edge weight + weight: "column_3" + } + + # algo result sink into hive + write: { + #[Optional] hive metastore uri, required when spark and hive are deployed in different clusters + metaStoreUris: "thrift://hive-metastore-server-02:9083" + #save result to hive table + dbTableName: "database_02.table_02" + #[Optional] spark dataframe save mode, one of Append,Overwrite,ErrorIfExists,Ignore. Default is Append + saveMode: "Overwrite" + #[Optional] whether to create the hive table automatically. Default is true + autoCreateTable: true + #[Optional] mapping from algorithm result column names to hive table column names. Defaults to the column names of the algo result dataframe + resultTableColumnMapping: { + # Note: different algorithms produce different output fields; pagerank is used as the example here: + _id: "column_1" + pagerank: "pagerank_value" + } + } + } + # NebulaGraph related config nebula: { # algo's data source from Nebula. If data.source is nebula, then this nebula.read config can be valid.
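The hive.read block above feeds the algorithm from an arbitrary Spark SQL result: the query is run against Hive, and the columns configured as srcId, dstId and weight become the edge input. The following is only a rough sketch of that read path, not code from this patch; the metastore uri, table and column names are the sample values from this config, and the object and app names are made up for illustration. Hive support is enabled on the session builder, as SparkConfig does when data.source or data.sink is hive.

import org.apache.spark.sql.{DataFrame, SparkSession}

object HiveSourceSketch {
  def main(args: Array[String]): Unit = {
    // Hive support and the metastore uri must be applied before the session is created.
    val spark = SparkSession
      .builder()
      .appName("hive-source-sketch")
      .config("hive.metastore.uris", "thrift://hive-metastore-server-01:9083")
      .enableHiveSupport()
      .getOrCreate()

    // Execute the configured sql and keep only the columns mapped to srcId, dstId and weight.
    val edges: DataFrame = spark
      .sql("select column_1,column_2,column_3 from database_01.table_01")
      .select("column_1", "column_2", "column_3")

    edges.printSchema()
    spark.stop()
  }
}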
@@ -78,7 +113,7 @@ # the algorithm that you are going to execute,pick one from [pagerank, louvain, connectedcomponent, # labelpropagation, shortestpaths, degreestatic, kcore, stronglyconnectedcomponent, trianglecount, # betweenness, graphtriangleCount, clusteringcoefficient, bfs, hanp, closeness, jaccard, node2vec] - executeAlgo: graphtrianglecount + executeAlgo: pagerank # PageRank parameter pagerank: { diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala index 0b2fc54..974f88b 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala @@ -55,7 +55,7 @@ object Main { val algoTime = System.currentTimeMillis() // writer - saveAlgoResult(algoResult, configs) + saveAlgoResult(sparkConfig.spark, algoResult, configs) val endTime = System.currentTimeMillis() sparkConfig.spark.stop() @@ -149,8 +149,8 @@ object Main { } } - private[this] def saveAlgoResult(algoResult: DataFrame, configs: Configs): Unit = { + private[this] def saveAlgoResult(spark: SparkSession, algoResult: DataFrame, configs: Configs): Unit = { val writer = AlgoWriter.make(configs) - writer.write(algoResult, configs) + writer.write(spark, algoResult, configs) } } diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala index dc906d6..b91b35b 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala @@ -12,6 +12,7 @@ import org.apache.log4j.Logger import scala.collection.JavaConverters._ import com.typesafe.config.{Config, ConfigFactory} import com.vesoft.nebula.algorithm.config.Configs.readConfig +import com.vesoft.nebula.algorithm.config.Configs.getOrElse import scala.collection.mutable @@ -129,6 +130,46 @@ object LocalConfigEntry { } } + +object HiveConfigEntry { + def apply(config: Config): HiveConfigEntry = { + //sql to execute + val sql: String = getOrElse(config,"hive.read.sql","") + //column name of the source vertex id + val srcIdCol: String = getOrElse(config,"hive.read.srcId","") + //column name of the dest vertex id + val dstIdCol: String = getOrElse(config,"hive.read.dstId","") + //column name of the edge weight + val weightCol: String = getOrElse(config,"hive.read.weight","") + //hive metastore uris + val readMetaStoreUris: String = getOrElse(config,"hive.read.metaStoreUris","") + val readConfigEntry = HiveReadConfigEntry(sql, srcIdCol, dstIdCol, weightCol, readMetaStoreUris) + + //hive table to write, in the form db.table + val dbTableName: String = getOrElse(config,"hive.write.dbTableName","") + //save mode, see spark SaveMode + val saveMode: String = getOrElse(config,"hive.write.saveMode","") + //whether to create the hive table automatically + val autoCreateTable: Boolean = getOrElse(config,"hive.write.autoCreateTable",true) + //hive metastore uris + val writeMetaStoreUris: String = getOrElse(config,"hive.write.metaStoreUris","") + //mapping between result columns and hive table columns, e.g. map the result column _id to user_id + val resultColumnMapping = mutable.Map[String, String]() + val mappingKey = "hive.write.resultTableColumnMapping" + if (config.hasPath(mappingKey)) { + val mappingConfig = config.getObject(mappingKey) + for (subkey <- mappingConfig.unwrapped().keySet().asScala) { + val key = s"${mappingKey}.${subkey}" + val value = config.getString(key) + resultColumnMapping += subkey -> value + } + } + val writeConfigEntry = HiveWriteConfigEntry(dbTableName, saveMode, autoCreateTable, 
resultColumnMapping, writeMetaStoreUris) + + HiveConfigEntry(readConfigEntry, writeConfigEntry) + } +} + /** * SparkConfigEntry support key-value pairs for spark session. * @@ -173,6 +214,35 @@ case class LocalConfigEntry(filePath: String, } } +case class HiveConfigEntry(hiveReadConfigEntry:HiveReadConfigEntry, + hiveWriteConfigEntry:HiveWriteConfigEntry) { + override def toString: String = { + s"HiveConfigEntry: {read: $hiveReadConfigEntry, write: $hiveWriteConfigEntry}" + } +} + +case class HiveReadConfigEntry(sql: String, + srcIdCol: String = "srcId", + dstIdCol: String = "dstId", + weightCol: String, + metaStoreUris: String) { + override def toString: String = { + s"HiveReadConfigEntry: {sql: $sql, srcIdCol: $srcIdCol, dstIdCol: $dstIdCol, " + + s"weightCol:$weightCol, metaStoreUris:$metaStoreUris}" + } +} + +case class HiveWriteConfigEntry(dbTableName: String, + saveMode: String, + autoCreateTable: Boolean, + resultColumnMapping: mutable.Map[String, String], + metaStoreUris: String) { + override def toString: String = { + s"HiveWriteConfigEntry: {dbTableName: $dbTableName, saveMode=$saveMode, " + + s"autoCreateTable=$autoCreateTable, resultColumnMapping=$resultColumnMapping, metaStoreUris=$metaStoreUris}" + } +} + /** * NebulaConfigEntry * @param readConfigEntry config for nebula-spark-connector reader @@ -218,6 +288,7 @@ case class Configs(sparkConfig: SparkConfigEntry, dataSourceSinkEntry: DataSourceSinkEntry, nebulaConfig: NebulaConfigEntry, localConfigEntry: LocalConfigEntry, + hiveConfigEntry: HiveConfigEntry, algorithmConfig: AlgorithmConfigEntry) object Configs { @@ -237,10 +308,11 @@ object Configs { val dataSourceEntry = DataSourceSinkEntry(config) val localConfigEntry = LocalConfigEntry(config) val nebulaConfigEntry = NebulaConfigEntry(config) - val sparkEntry = SparkConfigEntry(config) - val algorithmEntry = AlgorithmConfigEntry(config) + val hiveConfigEntry = HiveConfigEntry(config) + val sparkEntry = SparkConfigEntry(config) + val algorithmEntry = AlgorithmConfigEntry(config) - Configs(sparkEntry, dataSourceEntry, nebulaConfigEntry, localConfigEntry, algorithmEntry) + Configs(sparkEntry, dataSourceEntry, nebulaConfigEntry, localConfigEntry, hiveConfigEntry, algorithmEntry) } /** @@ -277,15 +349,15 @@ object Configs { } /** - * Get the value from config by the path. If the path not exist, return the default value. - * - * @param config The config. - * @param path The path of the config. - * @param defaultValue The default value for the path. - * - * @return - */ - private[this] def getOrElse[T](config: Config, path: String, defaultValue: T): T = { + * Get the value from config by the path. If the path not exist, return the default value. + * + * @param config The config. + * @param path The path of the config. + * @param defaultValue The default value for the path. 
+ * + * @return + */ + def getOrElse[T](config: Config, path: String, defaultValue: T): T = { if (config.hasPath(path)) { config.getAnyRef(path).asInstanceOf[T] } else { diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/SparkConfig.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/SparkConfig.scala index 7c863be..f407a37 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/SparkConfig.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/SparkConfig.scala @@ -5,6 +5,8 @@ package com.vesoft.nebula.algorithm.config +import com.vesoft.nebula.algorithm.reader.ReaderType +import com.vesoft.nebula.algorithm.writer.WriterType import org.apache.spark.sql.SparkSession case class SparkConfig(spark: SparkSession, partitionNum: Int) @@ -20,6 +22,13 @@ object SparkConfig { sparkConfigs.foreach { case (key, value) => session.config(key, value) } + + val dataSource = configs.dataSourceSinkEntry + if (dataSource.source.equals(ReaderType.hive.stringify) + || dataSource.sink.equals(WriterType.hive.stringify)) { + session.enableHiveSupport() + } + val partitionNum = sparkConfigs.getOrElse("spark.app.partitionNum", "0") val spark = session.getOrCreate() validate(spark.version, "2.4.*") diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala index e11d868..4f61333 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala @@ -25,6 +25,7 @@ object DataReader { case ReaderType.nebulaNgql => new NebulaNgqlReader case ReaderType.nebula => new NebulaReader case ReaderType.csv => new CsvReader + case ReaderType.hive => new HiveReader } .getOrElse(throw new UnsupportedOperationException("unsupported reader")) } @@ -179,3 +180,39 @@ final class JsonReader extends DataReader { data } } +final class HiveReader extends DataReader { + + override val tpe: ReaderType = ReaderType.hive + override def read(spark: SparkSession, configs: Configs, partitionNum: Int): DataFrame = { + val readConfig = configs.hiveConfigEntry.hiveReadConfigEntry + val sql = readConfig.sql + val srcIdCol = readConfig.srcIdCol + val dstIdCol = readConfig.dstIdCol + val weightCol = readConfig.weightCol + + println(s"""hiveDataReader, srcIdCol:$srcIdCol, dstIdCol:$dstIdCol, weightCol:$weightCol""") + + if (readConfig.metaStoreUris != null && readConfig.metaStoreUris.trim.nonEmpty) { + spark.conf.set("hive.metastore.schema.verification", false) + spark.conf.set("hive.metastore.uris", readConfig.metaStoreUris) + } + + var data = spark.sql(sql) + + if (srcIdCol != null && dstIdCol != null && srcIdCol.trim.nonEmpty && dstIdCol.trim.nonEmpty) { + if (configs.dataSourceSinkEntry.hasWeight && weightCol != null && weightCol.trim.nonEmpty) { + data = data.select(srcIdCol, dstIdCol, weightCol) + } else { + data = data.select(srcIdCol, dstIdCol) + } + } + + if (partitionNum != 0) { + data = data.repartition(partitionNum) + } + + data.show(3) + + data + } +} diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/ReaderType.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/ReaderType.scala index ca1d101..12fc054 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/ReaderType.scala +++ 
b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/ReaderType.scala @@ -17,6 +17,7 @@ sealed trait ReaderType { case ReaderType.nebulaNgql => "nebula-ngql" case ReaderType.nebula => "nebula" case ReaderType.csv => "csv" + case ReaderType.hive => "hive" } } object ReaderType { @@ -24,10 +25,12 @@ object ReaderType { json.stringify -> json, nebulaNgql.stringify -> nebulaNgql, nebula.stringify -> nebula, - csv.stringify -> csv + csv.stringify -> csv, + hive.stringify -> hive ) object json extends ReaderType object nebulaNgql extends ReaderType object nebula extends ReaderType object csv extends ReaderType + object hive extends ReaderType } diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/AlgoWriter.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/AlgoWriter.scala index e4da34d..4b6e23f 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/AlgoWriter.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/AlgoWriter.scala @@ -8,11 +8,11 @@ package com.vesoft.nebula.algorithm.writer import com.vesoft.nebula.connector.connector.NebulaDataFrameWriter import com.vesoft.nebula.connector.{NebulaConnectionConfig, WriteMode, WriteNebulaVertexConfig} import com.vesoft.nebula.algorithm.config.{AlgoConstants, Configs} -import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} abstract class AlgoWriter { val tpe:WriterType - def write(data: DataFrame, configs: Configs): Unit + def write(spark: SparkSession, data: DataFrame, configs: Configs): Unit } object AlgoWriter { def make(configs: Configs): AlgoWriter = { @@ -20,6 +20,7 @@ object AlgoWriter { case WriterType.text => new TextWriter case WriterType.nebula => new NebulaWriter case WriterType.csv => new CsvWriter + case WriterType.hive => new HiveWriter }.getOrElse(throw new UnsupportedOperationException("unsupported writer")) } @@ -27,7 +28,7 @@ object AlgoWriter { final class NebulaWriter extends AlgoWriter { override val tpe: WriterType = WriterType.nebula - override def write(data: DataFrame, configs: Configs): Unit = { + override def write(spark: SparkSession, data: DataFrame, configs: Configs): Unit = { val graphAddress = configs.nebulaConfig.writeConfigEntry.graphAddress val metaAddress = configs.nebulaConfig.writeConfigEntry.metaAddress val space = configs.nebulaConfig.writeConfigEntry.space @@ -61,7 +62,7 @@ final class NebulaWriter extends AlgoWriter { final class CsvWriter extends AlgoWriter { override val tpe: WriterType = WriterType.csv - override def write(data: DataFrame, configs: Configs): Unit = { + override def write(spark: SparkSession, data: DataFrame, configs: Configs): Unit = { val resultPath = configs.localConfigEntry.resultPath data.write.option("header", true).csv(resultPath) } @@ -69,8 +70,47 @@ final class CsvWriter extends AlgoWriter { final class TextWriter extends AlgoWriter { override val tpe: WriterType = WriterType.text - override def write(data: DataFrame, configs: Configs): Unit = { + override def write(spark: SparkSession, data: DataFrame, configs: Configs): Unit = { val resultPath = configs.localConfigEntry.resultPath data.write.option("header", true).text(resultPath) } } + +final class HiveWriter extends AlgoWriter { + override val tpe: WriterType = WriterType.hive + override def write(spark: SparkSession, data: DataFrame, configs: Configs): Unit = { + val config = configs.hiveConfigEntry.hiveWriteConfigEntry + val saveMode = 
SaveMode.values().find(_.name.equalsIgnoreCase(config.saveMode)).getOrElse(SaveMode.Append) + val columnMapping = config.resultColumnMapping + + var _data = data + columnMapping.foreach { + case (from, to) => + _data = _data.withColumnRenamed(from, to) + } + + if (config.metaStoreUris != null && config.metaStoreUris.trim.nonEmpty) { + spark.conf.set("hive.metastore.schema.verification", false) + spark.conf.set("hive.metastore.uris", config.metaStoreUris) + } + + if(config.autoCreateTable){ + val createTableStatement = generateCreateTableStatement(_data, config.dbTableName) + println(s"execute create hive table statement:${createTableStatement}") + spark.sql(createTableStatement) + } + + println(s"Save to hive:${config.dbTableName}, saveMode:${saveMode}") + _data.show(3) + _data.write.mode(saveMode).insertInto(config.dbTableName) + } + + def generateCreateTableStatement(df: DataFrame, tableName: String): String = { + val columns = df.schema.fields + val columnDefinitions = columns.map { field => + s"${field.name} ${field.dataType.typeName}" + }.mkString(",\n ") + s"CREATE TABLE IF NOT EXISTS $tableName (\n $columnDefinitions\n)" + } + +} diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/WriterType.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/WriterType.scala index 84a7839..1a81497 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/WriterType.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/WriterType.scala @@ -16,15 +16,18 @@ sealed trait WriterType { case WriterType.text => "text" case WriterType.nebula => "nebula" case WriterType.csv => "csv" + case WriterType.hive => "hive" } } object WriterType { lazy val mapping: Map[String, WriterType] = Map( text.stringify -> text, nebula.stringify -> nebula, - csv.stringify -> csv + csv.stringify -> csv, + hive.stringify -> hive ) object text extends WriterType object nebula extends WriterType object csv extends WriterType + object hive extends WriterType } From f1a2708af6ef5f08042ee42dcc01ced92b0f54cc Mon Sep 17 00:00:00 2001 From: awang12345 Date: Fri, 16 Aug 2024 18:25:12 +0800 Subject: [PATCH 2/3] feat: connect to hive via metastore --- .../src/main/resources/application.conf | 6 +- .../nebula/algorithm/config/Configs.scala | 70 ++++++++++--------- .../nebula/algorithm/config/SparkConfig.scala | 20 ++++-- .../nebula/algorithm/reader/DataReader.scala | 7 -- .../nebula/algorithm/writer/AlgoWriter.scala | 5 -- 5 files changed, 54 insertions(+), 54 deletions(-) diff --git a/nebula-algorithm/src/main/resources/application.conf b/nebula-algorithm/src/main/resources/application.conf index 5fb4d85..0fec307 100644 --- a/nebula-algorithm/src/main/resources/application.conf +++ b/nebula-algorithm/src/main/resources/application.conf @@ -21,10 +21,10 @@ # Hive related config hive: { + #[Optional] hive metastore uri, required when spark and hive are deployed in different clusters. Both read and write connect to hive through this metastore + metaStoreUris: "thrift://hive-metastore-server:9083" # algo's data source from hive read: { - #[Optional] hive metastore uri, required when spark and hive are deployed in different clusters - metaStoreUris: "thrift://hive-metastore-server-01:9083" #spark sql sql: "select column_1,column_2,column_3 from database_01.table_01 " #[Optional] column of the sql result that is used as the graph source vid 
@@ -37,8 +37,6 @@ # algo result sink into hive write: { - #[Optional] hive metastore uri, required when spark and hive are deployed in different clusters - metaStoreUris: "thrift://hive-metastore-server-02:9083" #save result to hive table dbTableName: "database_02.table_02" #[Optional] spark dataframe save mode, one of Append,Overwrite,ErrorIfExists,Ignore. Default is Append diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala index b91b35b..6222bc6 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala @@ -133,27 +133,33 @@ object HiveConfigEntry { def apply(config: Config): HiveConfigEntry = { - //sql to execute - val sql: String = getOrElse(config,"hive.read.sql","") - //column name of the source vertex id - val srcIdCol: String = getOrElse(config,"hive.read.srcId","") - //column name of the dest vertex id - val dstIdCol: String = getOrElse(config,"hive.read.dstId","") - //column name of the edge weight - val weightCol: String = getOrElse(config,"hive.read.weight","") - //hive metastore uris - val readMetaStoreUris: String = getOrElse(config,"hive.read.metaStoreUris","") - val readConfigEntry = HiveReadConfigEntry(sql, srcIdCol, dstIdCol, weightCol, readMetaStoreUris) - - //hive table to write, in the form db.table - val dbTableName: String = getOrElse(config,"hive.write.dbTableName","") - //save mode, see spark SaveMode - val saveMode: String = getOrElse(config,"hive.write.saveMode","") - //whether to create the hive table automatically - val autoCreateTable: Boolean = getOrElse(config,"hive.write.autoCreateTable",true) - //hive metastore uris - val writeMetaStoreUris: String = getOrElse(config,"hive.write.metaStoreUris","") - //mapping between result columns and hive table columns, e.g. map the result column _id to user_id + //uri of hive metastore. 
eg: thrift://127.0.0.1:9083 + val hiveMetaStoreUris: String = getOrElse(config, "hive.metaStoreUris", "") + val readConfigEntry = buildReadConfig(config) + val writeConfigEntry = buildWriteConfig(config) + HiveConfigEntry(hiveMetaStoreUris, readConfigEntry, writeConfigEntry) + } + + def buildReadConfig(config: Config): HiveReadConfigEntry = { + //spark sql that provides the source data + val sql: String = getOrElse(config, "hive.read.sql", "") + //column of the sql result that is used as the source vertex id + val srcIdCol: String = getOrElse(config, "hive.read.srcId", "") + //column of the sql result that is used as the dest vertex id + val dstIdCol: String = getOrElse(config, "hive.read.dstId", "") + //column of the sql result that is used as the edge weight + val weightCol: String = getOrElse(config, "hive.read.weight", "") + HiveReadConfigEntry(sql, srcIdCol, dstIdCol, weightCol) + } + + def buildWriteConfig(config: Config): HiveWriteConfigEntry = { + //hive table that the algo result is saved to + val dbTableName: String = getOrElse(config, "hive.write.dbTableName", "") + //spark save mode + val saveMode: String = getOrElse(config, "hive.write.saveMode", "") + //whether to create the hive table automatically + val autoCreateTable: Boolean = getOrElse(config, "hive.write.autoCreateTable", true) + //mapping between algo result dataframe columns and hive table columns val resultColumnMapping = mutable.Map[String, String]() val mappingKey = "hive.write.resultTableColumnMapping" if (config.hasPath(mappingKey)) { @@ -164,10 +170,9 @@ object HiveConfigEntry { resultColumnMapping += subkey -> value } } - val writeConfigEntry = HiveWriteConfigEntry(dbTableName, saveMode, autoCreateTable, resultColumnMapping, writeMetaStoreUris) - - HiveConfigEntry(readConfigEntry, writeConfigEntry) + HiveWriteConfigEntry(dbTableName, saveMode, autoCreateTable, resultColumnMapping) } + } /** @@ -214,32 +219,31 @@ case class LocalConfigEntry(filePath: String, } } -case class HiveConfigEntry(hiveReadConfigEntry:HiveReadConfigEntry, - hiveWriteConfigEntry:HiveWriteConfigEntry) { +case class HiveConfigEntry(hiveMetaStoreUris: String, + hiveReadConfigEntry: HiveReadConfigEntry, + hiveWriteConfigEntry: HiveWriteConfigEntry) { override def toString: String = { - s"HiveConfigEntry: {read: $hiveReadConfigEntry, write: $hiveWriteConfigEntry}" + s"HiveConfigEntry: {hiveMetaStoreUris:$hiveMetaStoreUris, read: $hiveReadConfigEntry, write: $hiveWriteConfigEntry}" } } case class HiveReadConfigEntry(sql: String, srcIdCol: String = "srcId", dstIdCol: String = "dstId", - weightCol: String, - metaStoreUris: String) { + weightCol: String) { override def toString: String = { s"HiveReadConfigEntry: {sql: $sql, srcIdCol: $srcIdCol, dstIdCol: $dstIdCol, " + - s"weightCol:$weightCol, metaStoreUris:$metaStoreUris}" + s"weightCol:$weightCol}" } } case class HiveWriteConfigEntry(dbTableName: String, saveMode: String, autoCreateTable: Boolean, - resultColumnMapping: mutable.Map[String, String], - metaStoreUris: String) { + resultColumnMapping: mutable.Map[String, String]) { override def toString: String = { s"HiveWriteConfigEntry: {dbTableName: $dbTableName, saveMode=$saveMode, " + - s"autoCreateTable=$autoCreateTable, resultColumnMapping=$resultColumnMapping, metaStoreUris=$metaStoreUris}" + s"autoCreateTable=$autoCreateTable, resultColumnMapping=$resultColumnMapping}" } } diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/SparkConfig.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/SparkConfig.scala index 
f407a37..86c68b4 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/SparkConfig.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/SparkConfig.scala @@ -23,11 +23,8 @@ object SparkConfig { session.config(key, value) } - val dataSource = configs.dataSourceSinkEntry - if (dataSource.source.equals(ReaderType.hive.stringify) - || dataSource.sink.equals(WriterType.hive.stringify)) { - session.enableHiveSupport() - } + // set hive config + setHiveConfig(session, configs) val partitionNum = sparkConfigs.getOrElse("spark.app.partitionNum", "0") val spark = session.getOrCreate() @@ -35,6 +32,19 @@ object SparkConfig { SparkConfig(spark, partitionNum.toInt) } + private def setHiveConfig(session: org.apache.spark.sql.SparkSession.Builder, configs: Configs): Unit = { + val dataSource = configs.dataSourceSinkEntry + if (dataSource.source.equals(ReaderType.hive.stringify) + || dataSource.sink.equals(WriterType.hive.stringify)) { + session.enableHiveSupport() + val uris = configs.hiveConfigEntry.hiveMetaStoreUris + if (uris != null && uris.trim.nonEmpty) { + session.config("hive.metastore.schema.verification", false) + session.config("hive.metastore.uris", uris) + } + } + } + private def validate(sparkVersion: String, supportedVersions: String*): Unit = { if (sparkVersion != "UNKNOWN" && !supportedVersions.exists(sparkVersion.matches)) { throw new RuntimeException( diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala index 4f61333..c2392f5 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala @@ -190,13 +190,6 @@ final class HiveReader extends DataReader { val dstIdCol = readConfig.dstIdCol val weightCol = readConfig.weightCol - println(s"""hiveDataReader, srcIdCol:$srcIdCol, dstIdCol:$dstIdCol, weightCol:$weightCol""") - - if (readConfig.metaStoreUris != null && readConfig.metaStoreUris.trim.nonEmpty) { - spark.conf.set("hive.metastore.schema.verification", false) - spark.conf.set("hive.metastore.uris", readConfig.metaStoreUris) - } - var data = spark.sql(sql) if (srcIdCol != null && dstIdCol != null && srcIdCol.trim.nonEmpty && dstIdCol.trim.nonEmpty) { diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/AlgoWriter.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/AlgoWriter.scala index 4b6e23f..31f07e0 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/AlgoWriter.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/AlgoWriter.scala @@ -89,11 +89,6 @@ final class HiveWriter extends AlgoWriter { _data = _data.withColumnRenamed(from, to) } - if (config.metaStoreUris != null && config.metaStoreUris.trim.nonEmpty) { - spark.conf.set("hive.metastore.schema.verification", false) - spark.conf.set("hive.metastore.uris", config.metaStoreUris) - } - if(config.autoCreateTable){ val createTableStatement = generateCreateTableStatement(_data, config.dbTableName) println(s"execute create hive table statement:${createTableStatement}") From a1ff1f8dec19055da7d73c4f6f34fa1ebc65ad85 Mon Sep 17 00:00:00 2001 From: awang12345 Date: Sat, 17 Aug 2024 10:30:29 +0800 Subject: [PATCH 3/3] refactor: remove show dataFrame --- .../scala/com/vesoft/nebula/algorithm/reader/DataReader.scala | 2 -- 
.../scala/com/vesoft/nebula/algorithm/writer/AlgoWriter.scala | 2 -- 2 files changed, 4 deletions(-) diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala index c2392f5..872e834 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala @@ -204,8 +204,6 @@ final class HiveReader extends DataReader { data = data.repartition(partitionNum) } - data.show(3) - data } } diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/AlgoWriter.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/AlgoWriter.scala index 31f07e0..3fcb8ce 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/AlgoWriter.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/AlgoWriter.scala @@ -95,8 +95,6 @@ final class HiveWriter extends AlgoWriter { spark.sql(createTableStatement) } - println(s"Save to hive:${config.dbTableName}, saveMode:${saveMode}") - _data.show(3) _data.write.mode(saveMode).insertInto(config.dbTableName) }
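For reference, a rough end-to-end sketch of the Hive sink path introduced by this series. This is illustrative code, not part of the patches: the table name, metastore uri and column mapping are the sample values from application.conf, and the pagerank result is assumed to have an _id string column and a pagerank double column; the object and app names are made up.

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object HiveSinkSketch {
  def main(args: Array[String]): Unit = {
    // Hive support and the shared metastore uri are applied on the session builder,
    // as SparkConfig.setHiveConfig does when data.sink is hive.
    val spark = SparkSession
      .builder()
      .appName("hive-sink-sketch")
      .config("hive.metastore.uris", "thrift://hive-metastore-server:9083")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    // Stand-in for the algorithm result dataframe.
    var result: DataFrame = Seq(("1", 0.85), ("2", 1.32)).toDF("_id", "pagerank")

    // Apply resultTableColumnMapping: rename result columns to the hive table columns.
    val columnMapping = Map("_id" -> "column_1", "pagerank" -> "pagerank_value")
    columnMapping.foreach { case (from, to) => result = result.withColumnRenamed(from, to) }

    // Same DDL shape that HiveWriter.generateCreateTableStatement derives from the schema:
    // CREATE TABLE IF NOT EXISTS database_02.table_02 (
    //   column_1 string,
    //   pagerank_value double
    // )
    val columnDefinitions = result.schema.fields
      .map(field => s"${field.name} ${field.dataType.typeName}")
      .mkString(",\n  ")
    spark.sql(s"CREATE TABLE IF NOT EXISTS database_02.table_02 (\n  $columnDefinitions\n)")

    // Write the renamed result into the hive table with the configured save mode.
    result.write.mode(SaveMode.Overwrite).insertInto("database_02.table_02")
    spark.stop()
  }
}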