From c864311cc0cf0918b1d66a1e223bbcf5054b79ea Mon Sep 17 00:00:00 2001
From: Yinghao Lin
Date: Fri, 5 Apr 2024 22:15:41 +0800
Subject: [PATCH] mod_0405: Add MRS Spark code

---
 pom.xml                                            | 16 ++++++++++++++++
 sql/catalyst/pom.xml                               |  4 ++++
 .../sql/catalyst/catalog/SessionCatalog.scala      |  3 ++-
 sql/core/pom.xml                                   |  4 ++++
 .../org/apache/spark/sql/SparkSession.scala        |  7 ++++++-
 .../spark/sql/internal/SessionState.scala          |  6 ++++--
 sql/hive/pom.xml                                   |  4 ++++
 .../org/apache/spark/sql/hive/HiveUtils.scala      |  2 +-
 .../spark/sql/hive/client/HiveClientImpl.scala     | 10 ++++++----
 .../sql/hive/client/IsolatedClientLoader.scala     | 18 +++++++++++-------
 10 files changed, 58 insertions(+), 16 deletions(-)

diff --git a/pom.xml b/pom.xml
index 79a3fa6f149aa..b511a8f6dd550 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2551,6 +2551,22 @@
         <artifactId>arpack</artifactId>
         <version>${netlib.ludovic.dev.version}</version>
       </dependency>
+      <dependency>
+        <groupId>io.kyligence.mrs</groupId>
+        <artifactId>mrs-spark-extractor</artifactId>
+        <version>1.0-SNAPSHOT</version>
+        <scope>provided</scope>
+        <exclusions>
+          <exclusion>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.12</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-catalyst_2.12</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
     </dependencies>
   </dependencyManagement>
diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml
index c846741b9cf0c..1ff4b1ea151f3 100644
--- a/sql/catalyst/pom.xml
+++ b/sql/catalyst/pom.xml
@@ -131,6 +131,10 @@
       <groupId>org.apache.arrow</groupId>
       <artifactId>arrow-memory-netty</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.kyligence.mrs</groupId>
+      <artifactId>mrs-spark-extractor</artifactId>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 11c8d9fcacc62..2024111d7ec12 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -70,7 +70,8 @@ class SessionCatalog(
     functionResourceLoader: FunctionResourceLoader,
     functionExpressionBuilder: FunctionExpressionBuilder,
     cacheSize: Int = SQLConf.get.tableRelationCacheSize,
-    cacheTTL: Long = SQLConf.get.metadataCacheTTL) extends SQLConfHelper with Logging {
+    cacheTTL: Long = SQLConf.get.metadataCacheTTL) extends SQLConfHelper
+  with ExtendedProperties with Logging {
 
   import SessionCatalog._
   import CatalogTypes.TablePartitionSpec
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index 02e05569c234e..8982d3725e221 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -128,6 +128,10 @@
       <groupId>org.apache.xbean</groupId>
       <artifactId>xbean-asm9-shaded</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.kyligence.mrs</groupId>
+      <artifactId>mrs-spark-extractor</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.scalacheck</groupId>
       <artifactId>scalacheck_${scala.binary.version}</artifactId>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 4391f1747d5b6..25330edf83977 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -91,6 +91,8 @@ class SparkSession private(
   // The call site where this SparkSession was constructed.
   private val creationSite: CallSite = Utils.getCallSite()
 
+  private[spark] val sessionStateInitialized: AtomicBoolean = new AtomicBoolean(false)
+
   /**
    * Constructor used in Pyspark. Contains explicit application of Spark Session Extensions
    * which otherwise only occurs during getOrCreate. We cannot add this to the default constructor
@@ -110,7 +112,9 @@ class SparkSession private(
   // If there is no active SparkSession, uses the default SQL conf. Otherwise, use the session's.
   SQLConf.setSQLConfGetter(() => {
-    SparkSession.getActiveSession.filterNot(_.sparkContext.isStopped).map(_.sessionState.conf)
+    SparkSession.getActiveSession.filterNot(_.sparkContext.isStopped)
+      .filter(_.sessionStateInitialized.get())
+      .map(_.sessionState.conf)
       .getOrElse(SQLConf.getFallbackConf)
   })
@@ -157,6 +161,7 @@ class SparkSession private(
         val state = SparkSession.instantiateSessionState(
           SparkSession.sessionStateClassName(sharedState.conf),
           self)
+        sessionStateInitialized.set(true)
         state
       }
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index cdf764a7317dd..7b4d14220c5ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -19,10 +19,8 @@ package org.apache.spark.sql.internal
 
 import java.io.File
 import java.net.URI
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-
 import org.apache.spark.annotation.Unstable
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry, TableFunctionRegistry}
@@ -94,6 +92,10 @@ private[sql] class SessionState(
   // when connecting to ThriftServer.
   lazy val streamingQueryManager: StreamingQueryManager = streamingQueryManagerBuilder()
 
+  var preExecutionRules: Seq[Rule[SparkPlan]] = _
+
+  val sessionStateListenerManager: SessionStateListenerManager = new SessionStateListenerManager()
+
   def catalogManager: CatalogManager = analyzer.catalogManager
 
   def newHadoopConf(): Configuration = SessionState.newHadoopConf(
diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index b87c8d3f0baa4..77e21939213f2 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -40,6 +40,10 @@
       <artifactId>spark-core_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>io.kyligence.mrs</groupId>
+      <artifactId>mrs-spark-extractor</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 93a38e524ebdc..40a26f2ef8b02 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -427,7 +427,7 @@ private[spark] object HiveUtils extends Logging {
         hadoopConf = hadoopConf,
         execJars = jars.toSeq,
         config = configurations,
-        isolationOn = !isCliSessionState(),
+        isolationOn = false,
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
         sharedPrefixes = hiveMetastoreSharedPrefixes)
     } else if (hiveMetastoreJars == "maven") {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 12e54be70e5fb..022fcc7fa86cb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -162,7 +162,9 @@ private[hive] class HiveClientImpl(
 
   private def newState(): SessionState = {
     val hiveConf = newHiveConf(sparkConf, hadoopConf, extraConfig, Some(initClassLoader))
-    val state = new SessionState(hiveConf)
+    val user = Utils.getCurrentUserName()
+    logInfo(s"newState user: ${user}")
+    val state = new SessionState(hiveConf, user)
     if (clientLoader.cachedHive != null) {
       Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
     }
@@ -270,7 +272,7 @@ private[hive] class HiveClientImpl(
       false
     }
 
-  private def client: Hive = {
+  def client: Hive = {
     if (clientLoader.cachedHive != null) {
       clientLoader.cachedHive.asInstanceOf[Hive]
     } else {
@@ -1273,8 +1275,8 @@ private[hive] object HiveClientImpl extends Logging {
     // 3: we set all entries in config to this hiveConf.
     val confMap = (hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue) ++
       sparkConf.getAll.toMap ++ extraConfig).toMap
-    confMap.foreach { case (k, v) => hiveConf.set(k, v) }
-    SQLConf.get.redactOptions(confMap).foreach { case (k, v) =>
+    confMap.foreach { case (k, v) =>
+      hiveConf.set(k, v)
       logDebug(s"Applying Hadoop/Hive/Spark and extra properties to Hive Conf:$k=$v")
     }
     // Disable CBO because we removed the Calcite dependency.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 9aa6a09fd57af..330a54db2ddb3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -18,18 +18,15 @@
 package org.apache.spark.sql.hive.client
 
 import java.io.File
-import java.lang.reflect.InvocationTargetException
+import java.lang.reflect.{InvocationTargetException}
 import java.net.{URL, URLClassLoader}
 import java.util
-
 import scala.util.Try
-
 import org.apache.commons.io.{FileUtils, IOUtils}
 import org.apache.commons.lang3.{JavaVersion, SystemUtils}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.shims.ShimLoader
-
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
@@ -295,8 +292,11 @@ private[hive] class IsolatedClientLoader(
   private[hive] def createClient(): HiveClient = synchronized {
     val warehouseDir = Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname))
     if (!isolationOn) {
-      return new HiveClientImpl(version, warehouseDir, sparkConf, hadoopConf, config,
-        baseClassLoader, this)
+      return Thread.currentThread().getContextClassLoader
+        .loadClass(getHiveClientImplClassName())
+        .getConstructors.head
+        .newInstance(version, warehouseDir, sparkConf, hadoopConf, config, baseClassLoader, this)
+        .asInstanceOf[HiveClient]
     }
     // Pre-reflective instantiation setup.
     logDebug("Initializing the logger to avoid disaster...")
@@ -305,7 +305,7 @@ private[hive] class IsolatedClientLoader(
 
     try {
       classLoader
-        .loadClass(classOf[HiveClientImpl].getName)
+        .loadClass(getHiveClientImplClassName())
        .getConstructors.head
        .newInstance(version, warehouseDir, sparkConf, hadoopConf, config, classLoader, this)
        .asInstanceOf[HiveClient]
@@ -328,4 +328,8 @@ private[hive] class IsolatedClientLoader(
    * IsolatedClientLoader).
    */
   private[hive] var cachedHive: Any = null
+
+  private[hive] def getHiveClientImplClassName(): String = {
+    sparkConf.get("spark.sql.hive.implementation", classOf[HiveClientImpl].getName)
+  }
 }
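
A note on the SparkSession change: it closes a re-entrancy hole. The global SQLConf getter used to call _.sessionState.conf unconditionally, and since sessionState is a lazy val, any SQLConf.get issued while that val was still mid-construction would recurse into the half-built state. With the new flag, the getter serves SQLConf.getFallbackConf until instantiateSessionState has returned. A minimal probe of the intended behavior, assuming a local build with this patch applied (SQLConf lives in an internal package, so treat this as test-scope code rather than public API):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.internal.SQLConf

    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    // getOrCreate() forces sessionState, which flips sessionStateInitialized,
    // so from here on SQLConf.get resolves to the session's own conf object
    // rather than SQLConf.getFallbackConf.
    assert(SQLConf.get eq spark.sessionState.conf)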
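
On the Hive side, isolationOn is now forced to false, and both paths in createClient() load whatever class spark.sql.hive.implementation names instead of hard-coding HiveClientImpl, so a jar on the driver classpath (here, mrs-spark-extractor) can swap in its own metastore client. The contract is implicit: getConstructors.head.newInstance(...) is invoked with the same seven arguments as HiveClientImpl's constructor, so a replacement must keep that exact constructor shape. A sketch of a conforming client: LoggingHiveClientImpl is hypothetical, and it has to live under org.apache.spark.sql.hive.client because HiveClientImpl and the types in its constructor are private[hive]:

    package org.apache.spark.sql.hive.client

    import java.lang.{Iterable => JIterable}
    import java.util.{Map => JMap}

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.catalyst.catalog.CatalogTable

    // Hypothetical drop-in client: same constructor shape as HiveClientImpl,
    // selected at runtime via spark.sql.hive.implementation.
    private[hive] class LoggingHiveClientImpl(
        version: HiveVersion,
        warehouseDir: Option[String],
        sparkConf: SparkConf,
        hadoopConf: JIterable[JMap.Entry[String, String]],
        extraConfig: Map[String, String],
        initClassLoader: ClassLoader,
        clientLoader: IsolatedClientLoader)
      extends HiveClientImpl(version, warehouseDir, sparkConf, hadoopConf,
        extraConfig, initClassLoader, clientLoader) {

      // Example override point: observe every metastore table lookup, then delegate.
      override def getTableOption(dbName: String, tableName: String): Option[CatalogTable] = {
        logInfo(s"LoggingHiveClientImpl: resolving $dbName.$tableName")
        super.getTableOption(dbName, tableName)
      }
    }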
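
Selecting the substitute is then purely configuration; nothing changes at the call sites. The class name below is again the hypothetical one from the sketch above:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .config("spark.sql.hive.implementation",
        "org.apache.spark.sql.hive.client.LoggingHiveClientImpl")
      .enableHiveSupport()
      .getOrCreate()

One caveat worth noting: getHiveClientImplClassName() reads the key via sparkConf.get(key, default), i.e. from the SparkConf the loader was built with, so the value must be in place before the shared Hive client is first instantiated; setting it on a live session via spark.conf.set has no effect.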