Skip to content
This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Commit

Permalink
Enable ApplyHyperspace rule (#446)
Browse files Browse the repository at this point in the history
  • Loading branch information
sezruby authored May 28, 2021
1 parent 17cd87f commit 3a91624
Show file tree
Hide file tree
Showing 8 changed files with 246 additions and 33 deletions.
24 changes: 20 additions & 4 deletions src/main/scala/com/microsoft/hyperspace/Hyperspace.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.IndexConstants.{OPTIMIZE_MODE_QUICK, REFRESH_MODE_FULL}
import com.microsoft.hyperspace.index.plananalysis.PlanAnalyzer
import com.microsoft.hyperspace.index.rules.ApplyHyperspace
import com.microsoft.hyperspace.index.sources.FileBasedSourceProviderManager

class Hyperspace(spark: SparkSession) {
Expand All @@ -40,7 +41,9 @@ class Hyperspace(spark: SparkSession) {
* @param indexConfig the configuration of index to be created.
*/
def createIndex(df: DataFrame, indexConfig: IndexConfig): Unit = {
indexManager.create(df, indexConfig)
withHyperspaceRuleDisabled {
indexManager.create(df, indexConfig)
}
}

/**
Expand Down Expand Up @@ -87,7 +90,9 @@ class Hyperspace(spark: SparkSession) {
* @param mode Refresh mode. Currently supported modes are `incremental` and `full`.
*/
def refreshIndex(indexName: String, mode: String): Unit = {
indexManager.refresh(indexName, mode)
withHyperspaceRuleDisabled {
indexManager.refresh(indexName, mode)
}
}

/**
Expand All @@ -102,7 +107,7 @@ class Hyperspace(spark: SparkSession) {
* @param indexName Name of the index to optimize.
*/
def optimizeIndex(indexName: String): Unit = {
indexManager.optimize(indexName, OPTIMIZE_MODE_QUICK)
optimizeIndex(indexName, OPTIMIZE_MODE_QUICK)
}

/**
Expand All @@ -125,7 +130,9 @@ class Hyperspace(spark: SparkSession) {
* files, based on a threshold. "full" refers to recreation of index.
*/
def optimizeIndex(indexName: String, mode: String): Unit = {
indexManager.optimize(indexName, mode)
withHyperspaceRuleDisabled {
indexManager.optimize(indexName, mode)
}
}

/**
Expand Down Expand Up @@ -163,6 +170,15 @@ class Hyperspace(spark: SparkSession) {
def index(indexName: String): DataFrame = {
indexManager.index(indexName)
}

private def withHyperspaceRuleDisabled(f: => Unit): Unit = {
try {
ApplyHyperspace.disableForIndexMaintenance.set(true)
f
} finally {
ApplyHyperspace.disableForIndexMaintenance.set(false)
}
}
}

object Hyperspace extends ActiveSparkSession {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ object CandidateIndexCollector extends ActiveSparkSession {
* Apply Hyperspace indexes based on the score of each index application.
*/
class ScoreBasedIndexPlanOptimizer {
// TODO: FilterIndexRule :: JoinIndexRule :: Nil
private val rules: Seq[HyperspaceRule] = NoOpRule :: Nil
private val rules
: Seq[HyperspaceRule] = disabled.FilterIndexRule :: disabled.JoinIndexRule :: NoOpRule :: Nil

// Map for memoization. The key is the logical plan before applying [[HyperspaceRule]]s
// and its value is a pair of best transformed plan and its score.
Expand All @@ -78,8 +78,29 @@ class ScoreBasedIndexPlanOptimizer {
// If pre-calculated value exists, return it.
scoreMap.get(plan).foreach(res => return res)

val optResult = (plan, 0)
// TODO apply indexes recursively.
def recChildren(cur: LogicalPlan): (LogicalPlan, Int) = {
// Get the best plan & score for each child node.
var score = 0
val resultPlan = cur.mapChildren { child =>
val res = recApply(child, indexes)
score += res._2
res._1
}
(resultPlan, score)
}

var optResult = (plan, 0)
rules.foreach { rule =>
val (transformedPlan, curScore) = rule(plan, indexes)
if (curScore > 0 || rule.equals(NoOpRule)) {
// Positive curScore means the rule is applied.
val result = recChildren(transformedPlan)
if (optResult._2 < result._2 + curScore) {
// Update if the total score is higher than the previous optimal.
optResult = (result._1, result._2 + curScore)
}
}
}

scoreMap.put(plan, optResult)
optResult
Expand Down Expand Up @@ -109,7 +130,15 @@ object ApplyHyperspace
type PlanToIndexesMap = Map[LogicalPlan, Seq[IndexLogEntry]]
type PlanToSelectedIndexMap = Map[LogicalPlan, IndexLogEntry]

// Flag to disable ApplyHyperspace rule during index maintenance jobs such as createIndex,
// refreshIndex and optimizeIndex.
private[hyperspace] val disableForIndexMaintenance = new ThreadLocal[Boolean]

override def apply(plan: LogicalPlan): LogicalPlan = {
if (disableForIndexMaintenance.get) {
return plan
}

val indexManager = Hyperspace
.getContext(spark)
.indexCollectionManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ trait IndexFilter extends ActiveSparkSession {
* Append a given reason string to FILTER_REASONS tag of the index if the condition is false and
* FILTER_REASONS_ENABLED tag is set to the index.
*
* @param condition Flag for reason string
* @param condition Flag to append reason string
* @param plan Query plan to tag
* @param index Index to tag
* @param reasonString Informational message in case condition is false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ object FilterRankFilter extends IndexRankFilter {
|| applicableIndexes.head._2.isEmpty) {
Map.empty
} else {
val selected = FilterIndexRanker.rank(spark, plan, applicableIndexes.head._2).get
val relation = RuleUtils.getRelation(spark, plan).get
val selected = FilterIndexRanker.rank(spark, relation.plan, applicableIndexes.head._2).get
setFilterReasonTagForRank(plan, applicableIndexes.head._2, selected)
Map(applicableIndexes.head._1 -> selected)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ object JoinIndexRule extends HyperspaceRule with HyperspaceEventLogging {
.getOrElse {
relation.allFileInfos.foldLeft(0L) { (res, f) =>
if (index.sourceFileInfoSet.contains(f)) {
(res + f.size) // count, total bytes
res + f.size // count, total bytes
} else {
res
}
Expand Down
19 changes: 4 additions & 15 deletions src/main/scala/com/microsoft/hyperspace/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,9 @@ package com.microsoft
import org.apache.spark.sql.SparkSession

import com.microsoft.hyperspace.index.execution.BucketUnionStrategy
import com.microsoft.hyperspace.index.rules.{FilterIndexRule, JoinIndexRule}
import com.microsoft.hyperspace.index.rules.ApplyHyperspace

package object hyperspace {
// The order of Hyperspace index rules does matter here, because by our current design, once an
// index rule is applied to a base table, no further index rules can be applied to the same
// table again.
// For instance, let's say the Join rule gets applied first, then the original data source gets
// replaced by its index. Now we have a new logical plan with the index folder as the "new"
// data source. If the Filter rule gets applied on this, no change will happen because
// this "new" data source doesn't have any indexes.
// We therefore choose to put JoinIndexRule before FilterIndexRule to give join indexes
// higher priority, because join indexes typically result in higher performance improvement
// compared to filter indexes.
private val hyperspaceOptimizationRuleBatch = JoinIndexRule :: FilterIndexRule :: Nil

/**
* Hyperspace-specific implicit class on SparkSession.
Expand All @@ -47,7 +36,7 @@ package object hyperspace {
def enableHyperspace(): SparkSession = {
disableHyperspace
sparkSession.sessionState.experimentalMethods.extraOptimizations ++=
hyperspaceOptimizationRuleBatch
ApplyHyperspace :: Nil
sparkSession.sessionState.experimentalMethods.extraStrategies ++=
BucketUnionStrategy :: Nil
sparkSession
Expand All @@ -61,7 +50,7 @@ package object hyperspace {
def disableHyperspace(): SparkSession = {
val experimentalMethods = sparkSession.sessionState.experimentalMethods
experimentalMethods.extraOptimizations =
experimentalMethods.extraOptimizations.filterNot(hyperspaceOptimizationRuleBatch.contains)
experimentalMethods.extraOptimizations.filterNot(ApplyHyperspace.equals)
experimentalMethods.extraStrategies =
experimentalMethods.extraStrategies.filterNot(BucketUnionStrategy.equals)
sparkSession
Expand All @@ -74,7 +63,7 @@ package object hyperspace {
*/
def isHyperspaceEnabled(): Boolean = {
val experimentalMethods = sparkSession.sessionState.experimentalMethods
hyperspaceOptimizationRuleBatch.forall(experimentalMethods.extraOptimizations.contains) &&
experimentalMethods.extraOptimizations.contains(ApplyHyperspace) &&
experimentalMethods.extraStrategies.contains(BucketUnionStrategy)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation,
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData, TestConfig, TestUtils}
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index.IndexConstants.{GLOBBING_PATTERN_KEY, REFRESH_MODE_INCREMENTAL, REFRESH_MODE_QUICK}
import com.microsoft.hyperspace.index.IndexLogEntryTags._
import com.microsoft.hyperspace.index.execution.BucketUnionStrategy
import com.microsoft.hyperspace.index.plans.logical.IndexHadoopFsRelation
import com.microsoft.hyperspace.index.rules.{FilterIndexRule, JoinIndexRule}
import com.microsoft.hyperspace.index.rules.{ApplyHyperspace, CandidateIndexCollector}
import com.microsoft.hyperspace.index.rules.disabled.JoinIndexRule
import com.microsoft.hyperspace.util.PathUtils

class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
Expand Down Expand Up @@ -70,7 +71,7 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
}

test("verify enableHyperspace()/disableHyperspace() plug in/out optimization rules.") {
val expectedOptimizationRuleBatch = Seq(JoinIndexRule, FilterIndexRule)
val expectedOptimizationRuleBatch = Seq(ApplyHyperspace)
val expectedOptimizationStrategy = Seq(BucketUnionStrategy)

assert(
Expand Down Expand Up @@ -225,10 +226,13 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {

test("E2E test for join query with alias columns is not supported.") {
def verifyNoChange(f: () => DataFrame): Unit = {
spark.disableHyperspace()
val originalPlan = f().queryExecution.optimizedPlan
val updatedPlan = JoinIndexRule(originalPlan)
assert(originalPlan.equals(updatedPlan))
spark.enableHyperspace()
val plan = f().queryExecution.optimizedPlan
val allIndexes = IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE))
val candidateIndexes = CandidateIndexCollector.apply(plan, allIndexes)
val (updatedPlan, score) = JoinIndexRule.apply(plan, candidateIndexes)
assert(updatedPlan.equals(plan))
assert(score == 0)
}

withView("t1", "t2") {
Expand Down
Loading

0 comments on commit 3a91624

Please sign in to comment.