mod_0405: Add mrs spark codes
yhcast0 committed Apr 5, 2024
1 parent 5e44eef commit c864311
Showing 10 changed files with 58 additions and 16 deletions.
16 changes: 16 additions & 0 deletions pom.xml
@@ -2551,6 +2551,22 @@
<artifactId>arpack</artifactId>
<version>${netlib.ludovic.dev.version}</version>
</dependency>
+ <dependency>
+   <groupId>io.kyligence.mrs</groupId>
+   <artifactId>mrs-spark-extractor</artifactId>
+   <version>1.0-SNAPSHOT</version>
+   <scope>provided</scope>
+   <exclusions>
+     <exclusion>
+       <groupId>org.apache.spark</groupId>
+       <artifactId>spark-sql_2.12</artifactId>
+     </exclusion>
+     <exclusion>
+       <groupId>org.apache.spark</groupId>
+       <artifactId>spark-catalyst_2.12</artifactId>
+     </exclusion>
+   </exclusions>
+ </dependency>
</dependencies>
</dependencyManagement>

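The root pom only pins the version here; the child modules below declare the dependency without one. The provided scope keeps mrs-spark-extractor off the runtime classpath of assembled Spark artifacts, and the two exclusions stop the extractor's own spark-sql_2.12 and spark-catalyst_2.12 from shadowing the classes built in this repository. A quick classpath sanity check (a sketch for a Spark REPL; TreeNode is just a representative catalyst class):

    // Verify spark-catalyst classes resolve from this build, not from a
    // transitive copy pulled in by mrs-spark-extractor.
    val src = Class.forName("org.apache.spark.sql.catalyst.trees.TreeNode")
      .getProtectionDomain.getCodeSource
    println(s"spark-catalyst loaded from: ${Option(src).map(_.getLocation)}")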
4 changes: 4 additions & 0 deletions sql/catalyst/pom.xml
@@ -131,6 +131,10 @@
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
</dependency>
+ <dependency>
+   <groupId>io.kyligence.mrs</groupId>
+   <artifactId>mrs-spark-extractor</artifactId>
+ </dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -70,7 +70,8 @@ class SessionCatalog(
functionResourceLoader: FunctionResourceLoader,
functionExpressionBuilder: FunctionExpressionBuilder,
cacheSize: Int = SQLConf.get.tableRelationCacheSize,
- cacheTTL: Long = SQLConf.get.metadataCacheTTL) extends SQLConfHelper with Logging {
+ cacheTTL: Long = SQLConf.get.metadataCacheTTL) extends SQLConfHelper
+   with ExtendedProperties with Logging {
import SessionCatalog._
import CatalogTypes.TablePartitionSpec

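ExtendedProperties is not defined anywhere in this diff; it ships in the mrs-spark-extractor jar added above. As a purely hypothetical sketch, it is plausibly a small mixin that lets callers attach extra metadata to the catalog:

    // Hypothetical sketch only; the real trait lives in mrs-spark-extractor
    // and its actual API is not shown in this commit.
    trait ExtendedProperties {
      private val props = new java.util.concurrent.ConcurrentHashMap[String, String]()
      def setExtendedProperty(key: String, value: String): Unit = props.put(key, value)
      def getExtendedProperty(key: String): Option[String] = Option(props.get(key))
    }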
4 changes: 4 additions & 0 deletions sql/core/pom.xml
@@ -128,6 +128,10 @@
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-asm9-shaded</artifactId>
</dependency>
+ <dependency>
+   <groupId>io.kyligence.mrs</groupId>
+   <artifactId>mrs-spark-extractor</artifactId>
+ </dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -91,6 +91,8 @@ class SparkSession private(
// The call site where this SparkSession was constructed.
private val creationSite: CallSite = Utils.getCallSite()

+ private[spark] val sessionStateInitialized: AtomicBoolean = new AtomicBoolean(false)
+
/**
* Constructor used in Pyspark. Contains explicit application of Spark Session Extensions
* which otherwise only occurs during getOrCreate. We cannot add this to the default constructor
@@ -110,7 +112,9 @@

// If there is no active SparkSession, uses the default SQL conf. Otherwise, use the session's.
SQLConf.setSQLConfGetter(() => {
- SparkSession.getActiveSession.filterNot(_.sparkContext.isStopped).map(_.sessionState.conf)
+ SparkSession.getActiveSession.filterNot(_.sparkContext.isStopped)
+   .filter(_.sessionStateInitialized.get())
+   .map(_.sessionState.conf)
.getOrElse(SQLConf.getFallbackConf)
})

@@ -157,6 +161,7 @@
val state = SparkSession.instantiateSessionState(
SparkSession.sessionStateClassName(sharedState.conf),
self)
+ sessionStateInitialized.set(true)
state
}
}
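The new flag closes an initialization-order hole: the SQLConf getter registered above can run on any thread, and reading _.sessionState.conf while the lazy sessionState is still being built would re-enter its initializer. With the guard, readers get the fallback conf until instantiateSessionState has completed. A minimal self-contained sketch of the pattern, with hypothetical names:

    import java.util.concurrent.atomic.AtomicBoolean

    // Readers fall back to a default until the lazily built state is ready.
    class LazyState {
      val ready = new AtomicBoolean(false)
      lazy val state: Map[String, String] = {
        val built = Map("spark.sql.shuffle.partitions" -> "200") // expensive build step
        ready.set(true)
        built
      }
      def confOrDefault(key: String, default: String): String =
        if (ready.get()) state.getOrElse(key, default) else default
    }

    val s = new LazyState
    assert(s.confOrDefault("spark.sql.shuffle.partitions", "8") == "8") // state not built yet
    s.state // force initialization
    assert(s.confOrDefault("spark.sql.shuffle.partitions", "8") == "200")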
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -19,10 +19,8 @@ package org.apache.spark.sql.internal

import java.io.File
import java.net.URI
-
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-
import org.apache.spark.annotation.Unstable
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry, TableFunctionRegistry}
@@ -94,6 +92,10 @@
// when connecting to ThriftServer.
lazy val streamingQueryManager: StreamingQueryManager = streamingQueryManagerBuilder()

+ var preExecutionRules: Seq[Rule[SparkPlan]] = _
+
+ val sessionStateListenerManager: SessionStateListenerManager = new SessionStateListenerManager()
+
def catalogManager: CatalogManager = analyzer.catalogManager

def newHadoopConf(): Configuration = SessionState.newHadoopConf(
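preExecutionRules starts out null (= _), so it is presumably injected later by the extractor before any physical plan runs, and SessionStateListenerManager again comes from the mrs-spark-extractor jar rather than this diff. A hypothetical sketch of such a listener registry:

    // Hypothetical sketch; the real SessionStateListenerManager ships in
    // mrs-spark-extractor and is not part of this commit.
    trait SessionStateListener {
      def onSessionStateCreated(): Unit
    }

    class SessionStateListenerManager {
      private val listeners = scala.collection.mutable.ArrayBuffer.empty[SessionStateListener]
      def register(listener: SessionStateListener): Unit = synchronized { listeners += listener }
      def fireCreated(): Unit = synchronized { listeners.foreach(_.onSessionStateCreated()) }
    }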
4 changes: 4 additions & 0 deletions sql/hive/pom.xml
@@ -40,6 +40,10 @@
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+   <groupId>io.kyligence.mrs</groupId>
+   <artifactId>mrs-spark-extractor</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -427,7 +427,7 @@ private[spark] object HiveUtils extends Logging {
hadoopConf = hadoopConf,
execJars = jars.toSeq,
config = configurations,
- isolationOn = !isCliSessionState(),
+ isolationOn = false,
barrierPrefixes = hiveMetastoreBarrierPrefixes,
sharedPrefixes = hiveMetastoreSharedPrefixes)
} else if (hiveMetastoreJars == "maven") {
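Stock Spark disables classloader isolation for the builtin metastore jars only inside the spark-sql CLI; this fork forces it off unconditionally, so the Hive client is always created on the shared application classloader, which is what allows the IsolatedClientLoader change below to resolve a replacement client class from the context classloader. For reference, roughly what the dropped isCliSessionState() test checked (a simplified sketch; stock Spark also walks the superclass chain):

    import org.apache.hadoop.hive.ql.session.SessionState

    // Simplified sketch: isolation used to stay on unless the thread-local Hive
    // SessionState belonged to the spark-sql CLI (a CliSessionState).
    def isCliSessionState(): Boolean = {
      val state = SessionState.get
      state != null &&
        state.getClass.getName == "org.apache.hadoop.hive.cli.CliSessionState"
    }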
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -162,7 +162,9 @@ private[hive] class HiveClientImpl(

private def newState(): SessionState = {
val hiveConf = newHiveConf(sparkConf, hadoopConf, extraConfig, Some(initClassLoader))
- val state = new SessionState(hiveConf)
+ val user = Utils.getCurrentUserName()
+ logInfo(s"newState user: ${user}")
+ val state = new SessionState(hiveConf, user)
if (clientLoader.cachedHive != null) {
Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
}
@@ -270,7 +272,7 @@
false
}

- private def client: Hive = {
+ def client: Hive = {
if (clientLoader.cachedHive != null) {
clientLoader.cachedHive.asInstanceOf[Hive]
} else {
@@ -1273,8 +1275,8 @@ private[hive] object HiveClientImpl extends Logging {
// 3: we set all entries in config to this hiveConf.
val confMap = (hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue) ++
sparkConf.getAll.toMap ++ extraConfig).toMap
- confMap.foreach { case (k, v) => hiveConf.set(k, v) }
- SQLConf.get.redactOptions(confMap).foreach { case (k, v) =>
+ confMap.foreach { case (k, v) =>
+   hiveConf.set(k, v)
logDebug(s"Applying Hadoop/Hive/Spark and extra properties to Hive Conf:$k=$v")
}
// Disable CBO because we removed the Calcite dependency.
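Two behavior changes to note: Hive's SessionState is now constructed with an explicit user via its (HiveConf, String) constructor, and the property loop no longer routes through SQLConf.get.redactOptions, so sensitive values are logged verbatim at debug level. The user comes from Spark's Utils.getCurrentUserName(), which in stock Spark resolves roughly like this sketch:

    import org.apache.hadoop.security.UserGroupInformation

    // Sketch of stock Spark's Utils.getCurrentUserName(): prefer the SPARK_USER
    // environment variable, else the Hadoop UGI short user name.
    def currentUserName(): String =
      Option(System.getenv("SPARK_USER"))
        .getOrElse(UserGroupInformation.getCurrentUser.getShortUserName)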
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -18,18 +18,15 @@
package org.apache.spark.sql.hive.client

import java.io.File
- import java.lang.reflect.InvocationTargetException
+ import java.lang.reflect.{InvocationTargetException}
import java.net.{URL, URLClassLoader}
import java.util
-
import scala.util.Try
-
import org.apache.commons.io.{FileUtils, IOUtils}
import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.shims.ShimLoader
-
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkSubmitUtils
import org.apache.spark.internal.Logging
@@ -295,8 +292,11 @@
private[hive] def createClient(): HiveClient = synchronized {
val warehouseDir = Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname))
if (!isolationOn) {
- return new HiveClientImpl(version, warehouseDir, sparkConf, hadoopConf, config,
-   baseClassLoader, this)
+ return Thread.currentThread().getContextClassLoader
+   .loadClass(getHiveClientImplClassName())
+   .getConstructors.head
+   .newInstance(version, warehouseDir, sparkConf, hadoopConf, config, baseClassLoader, this)
+   .asInstanceOf[HiveClient]
}
// Pre-reflective instantiation setup.
logDebug("Initializing the logger to avoid disaster...")
@@ -305,7 +305,7 @@

try {
classLoader
- .loadClass(classOf[HiveClientImpl].getName)
+ .loadClass(getHiveClientImplClassName())
.getConstructors.head
.newInstance(version, warehouseDir, sparkConf, hadoopConf, config, classLoader, this)
.asInstanceOf[HiveClient]
@@ -328,4 +328,8 @@
* IsolatedClientLoader).
*/
private[hive] var cachedHive: Any = null
+
+ private[hive] def getHiveClientImplClassName(): String = {
+   sparkConf.get("spark.sql.hive.implementation", classOf[HiveClientImpl].getName)
+ }
}
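createClient now resolves the client class by name on both the isolated and non-isolated paths, and the new spark.sql.hive.implementation conf (defaulting to HiveClientImpl) makes the implementation pluggable. Any substitute must expose a single public constructor taking the same seven arguments, because instantiation goes through getConstructors.head. A stripped-down sketch of the reflective pattern, using java.lang.Object as a stand-in since it has exactly one public constructor:

    // The real call passes HiveClientImpl's seven constructor arguments; this
    // sketch only demonstrates the load-then-instantiate mechanics.
    val className = "java.lang.Object" // stands in for the configured client class
    val instance = Thread.currentThread().getContextClassLoader
      .loadClass(className)
      .getConstructors.head
      .newInstance()
    println(s"instantiated: ${instance.getClass.getName}")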
