Introduce whyNot API
sezruby committed Jun 1, 2021
1 parent 0011d0d commit c3fd62f
Showing 12 changed files with 275 additions and 37 deletions.
13 changes: 12 additions & 1 deletion src/main/scala/com/microsoft/hyperspace/Hyperspace.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.IndexConstants.{OPTIMIZE_MODE_QUICK, REFRESH_MODE_FULL}
import com.microsoft.hyperspace.index.plananalysis.PlanAnalyzer
- import com.microsoft.hyperspace.index.rules.ApplyHyperspace
+ import com.microsoft.hyperspace.index.rules.{ApplyHyperspace, WhyNotAnalyzer}
import com.microsoft.hyperspace.index.sources.FileBasedSourceProviderManager

class Hyperspace(spark: SparkSession) {
@@ -171,6 +171,17 @@ class Hyperspace(spark: SparkSession) {
indexManager.index(indexName)
}

def whyNot(df: DataFrame, indexName: String = "")(
implicit redirectFunc: String => Unit = print): Unit = {
withHyperspaceRuleDisabled {
if (indexName.nonEmpty) {
redirectFunc(WhyNotAnalyzer.whyNotIndexString(spark, df, indexName))
} else {
redirectFunc(WhyNotAnalyzer.whyNotIndexesString(spark, df))
}
}
}
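
A quick usage sketch of the new API (the index name "myIndex", the parquet path, and logger are placeholders, not part of this commit; redirectFunc defaults to print and can also be passed explicitly):

// Sketch only: "myIndex", the parquet path, and logger are hypothetical.
val hs = new Hyperspace(spark)
val df = spark.read.parquet("/data/source").filter("id > 100").select("id", "name")

// Analyze all ACTIVE indexes against df's optimized plan.
hs.whyNot(df)

// Analyze a single index by name.
hs.whyNot(df, "myIndex")

// Redirect the report, e.g. to a logger, by passing the implicit explicitly.
hs.whyNot(df)(str => logger.info(str))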

private def withHyperspaceRuleDisabled(f: => Unit): Unit = {
try {
ApplyHyperspace.disableForIndexMaintenance.set(true)
13 changes: 13 additions & 0 deletions src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
@@ -583,10 +583,23 @@ case class IndexLogEntry(
tags.get((plan, tag)).map(_.asInstanceOf[T])
}

def getTagValueForAllPlan[T](tag: IndexLogEntryTag[T]): Seq[(LogicalPlan, T)] = {
tags.filter(entry => entry._1._2.equals(tag)).toSeq.map { case (k, v) =>
(k._1, v.asInstanceOf[T])
}
}

def unsetTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Unit = {
tags.remove((plan, tag))
}

def unsetTagValueForAllPlan[T](tag: IndexLogEntryTag[T]): Unit = {
val plansWithTag = tags.keys.filter(_._2.name.equals(tag.name)).map(_._1)
plansWithTag.foreach { plan =>
tags.remove((plan, tag))
}
}
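
A minimal sketch of how the new tag helpers compose (entry, planA, and planB are assumed values, illustrative only):

// Illustrative only: entry is an IndexLogEntry; planA/planB are LogicalPlans.
entry.setTagValue(planA, IndexLogEntryTags.FILTER_REASONS, Seq("reason A"))
entry.setTagValue(planB, IndexLogEntryTags.FILTER_REASONS, Seq("reason B"))

// Returns the value for every plan the tag was set on:
// Seq((planA, Seq("reason A")), (planB, Seq("reason B")))
entry.getTagValueForAllPlan(IndexLogEntryTags.FILTER_REASONS)

// Remove the tag for one plan, or for all plans at once.
entry.unsetTagValue(planA, IndexLogEntryTags.FILTER_REASONS)
entry.unsetTagValueForAllPlan(IndexLogEntryTags.FILTER_REASONS)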

def withCachedTag[T](plan: LogicalPlan, tag: IndexLogEntryTag[T])(f: => T): T = {
getTagValue(plan, tag) match {
case Some(v) => v
src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala
@@ -58,7 +58,12 @@ object IndexLogEntryTags {
val FILTER_REASONS: IndexLogEntryTag[Seq[String]] =
IndexLogEntryTag[Seq[String]]("filterReasons")

// APPLICABLE_INDEX_RULES stores the names of rules that can apply the index to the plan.
val APPLICABLE_INDEX_RULES: IndexLogEntryTag[Seq[String]] =
IndexLogEntryTag[Seq[String]]("applicableIndexRules")

- // FILTER_REASONS_ENABLED indicates whether the whyNot API is enabled or not.
- val FILTER_REASONS_ENABLED: IndexLogEntryTag[Boolean] =
- IndexLogEntryTag[Boolean]("filterReasonsEnabled")
+ // INDEX_PLAN_ANALYSIS_ENABLED indicates whether the whyNot API is enabled or not.
+ // If it's enabled, FILTER_REASONS and APPLICABLE_INDEX_RULES info will be tagged.
+ val INDEX_PLAN_ANALYSIS_ENABLED: IndexLogEntryTag[Boolean] =
+ IndexLogEntryTag[Boolean]("indexPlanAnalysisEnabled")
}
src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala
@@ -165,6 +165,8 @@ object FilterIndexRule extends HyperspaceRule {
return plan
}

setApplicableIndexTag(plan, indexes.head._2)

// As FilterIndexRule is not intended to support bucketed scan, we set
// useBucketUnionForAppended to false. If it were true, Hybrid Scan could cause
// an unnecessary shuffle of appended data to apply BucketUnion for merging data.
src/main/scala/com/microsoft/hyperspace/index/rules/HyperspaceRule.scala
@@ -19,6 +19,7 @@ package com.microsoft.hyperspace.index.rules
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

import com.microsoft.hyperspace.ActiveSparkSession
import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags}
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap}

/**
@@ -75,6 +76,17 @@ trait HyperspaceRule extends ActiveSparkSession {
(plan, 0)
}
}

protected def setApplicableIndexTag(plan: LogicalPlan, index: IndexLogEntry): Unit = {
if (index.getTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED).getOrElse(false)) {
val prevRules =
index.getTagValue(plan, IndexLogEntryTags.APPLICABLE_INDEX_RULES).getOrElse(Nil)
index.setTagValue(
plan,
IndexLogEntryTags.APPLICABLE_INDEX_RULES,
prevRules :+ s"[${index.name},${getClass.getSimpleName.split("\\$").last}]")
}
}
}
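
When plan analysis is enabled, every rule that accepts the index appends an entry of the form [<index name>,<rule name>], for example [myIndex,FilterIndexRule] with a hypothetical index name; the whyNot report later lists these entries under "Applicable indexes".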

/**
src/main/scala/com/microsoft/hyperspace/index/rules/IndexFilter.scala
@@ -44,11 +44,16 @@ trait IndexFilter extends ActiveSparkSession {
index: IndexLogEntry,
reasonString: => String): Unit = {
if (!condition && index
- .getTagValue(IndexLogEntryTags.FILTER_REASONS_ENABLED)
+ .getTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED)
.getOrElse(false)) {
+ val reasonStringWithInfo =
+ s"[${index.name},${getClass.getSimpleName.split("\\$").last}] $reasonString"
val prevReason =
index.getTagValue(plan, IndexLogEntryTags.FILTER_REASONS).getOrElse(Nil)
- index.setTagValue(plan, IndexLogEntryTags.FILTER_REASONS, prevReason :+ reasonString)
+ index.setTagValue(
+ plan,
+ IndexLogEntryTags.FILTER_REASONS,
+ prevReason :+ reasonStringWithInfo)
}
}
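
With this change, each recorded filter reason carries its origin, so a tagged reason now reads like "[myIndex,FilterColumnFilter] Index does not contain required columns." (index name hypothetical; the reason text comes from the calling filter).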

src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala
@@ -637,6 +637,9 @@ object JoinIndexRule extends HyperspaceRule with HyperspaceEventLogging {
val lIndex = indexes(leftRelation.get.plan)
val rIndex = indexes(rightRelation.get.plan)

setApplicableIndexTag(plan, lIndex)
setApplicableIndexTag(plan, rIndex)

val updatedPlan =
join
.copy(
src/main/scala/com/microsoft/hyperspace/index/rules/WhyNotAnalyzer.scala (new file)
@@ -0,0 +1,182 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.rules

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags}

object WhyNotAnalyzer extends Logging {

private def cleanupAnalysisTags(indexes: Seq[IndexLogEntry]): Unit = {
indexes.foreach { index =>
index.unsetTagValueForAllPlan(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED)
index.unsetTagValueForAllPlan(IndexLogEntryTags.FILTER_REASONS)
index.unsetTagValueForAllPlan(IndexLogEntryTags.APPLICABLE_INDEX_RULES)
}
}

private def prepareTagsForAnalysis(indexes: Seq[IndexLogEntry]): Unit = {
indexes.foreach { index =>
index.setTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED, true)

// Clean up previous reason tags.
index.unsetTagValueForAllPlan(IndexLogEntryTags.FILTER_REASONS)
index.unsetTagValueForAllPlan(IndexLogEntryTags.APPLICABLE_INDEX_RULES)
}
}

private def generateWhyNotString(
planWithoutHyperspace: LogicalPlan,
planWithHyperspace: LogicalPlan,
reasonStrings: Seq[(LogicalPlan, Seq[String])],
applyStrings: Seq[(LogicalPlan, Seq[String])]): String = {
val stringBuilder = new StringBuilder
val originalPlanString = planWithoutHyperspace.numberedTreeString.split('\n')

stringBuilder.append("\nPlan with Hyperspace\n")
stringBuilder.append("============================================\n")
stringBuilder.append(planWithHyperspace)
stringBuilder.append("\n\n")
stringBuilder.append("Plan without Hyperspace\n")
stringBuilder.append("============================================\n")
stringBuilder.append(originalPlanString.mkString("\n"))
stringBuilder.append("\n\n")

stringBuilder.append("********************************************\n")
stringBuilder.append(s"WhyNot Analysis\n")

reasonStrings.groupBy(_._1).foreach {
case (plan, stringsForPlan) =>
val planStrLines = plan.toString.split('\n')
val firstLineStr = originalPlanString.find(str => str.endsWith(planStrLines.head))
stringBuilder.append("============================================\n")
stringBuilder.append("Candidate plan:\n")
// Start with numbered plan string
stringBuilder.append(firstLineStr.get)
stringBuilder.append("\n")
val numberSpaces = " " * (firstLineStr.get.length - planStrLines.head.length)
stringBuilder.append(numberSpaces)
stringBuilder.append(planStrLines.tail.toSeq.mkString(s"\n$numberSpaces"))
stringBuilder.append("\n\nFailure reasons:\n")
stringsForPlan.foreach {
case (_, strings) =>
stringBuilder.append("- ")
stringBuilder.append(strings.mkString("\n- "))
stringBuilder.append("\n")
}
}

stringBuilder.append("\n")
if (applyStrings.nonEmpty) {
stringBuilder.append("********************************************\n")
stringBuilder.append("Applicable indexes\n")
applyStrings.groupBy(_._1).foreach {
case (plan, stringsForPlan) =>
val planStrLines = plan.toString.split('\n')
val firstLineStr = originalPlanString.find(str => str.endsWith(planStrLines.head))
stringBuilder.append("============================================\n")
stringBuilder.append("Source plan:\n")
// Start with numbered plan string
stringBuilder.append(firstLineStr.get)
stringBuilder.append("\n")
val numberSpaces = " " * (firstLineStr.get.length - planStrLines.head.length)
stringBuilder.append(numberSpaces)
stringBuilder.append(planStrLines.tail.toSeq.mkString(s"\n$numberSpaces"))
stringBuilder.append("\n\nApplicable indexes:\n")
stringsForPlan.foreach {
case (_, strings) =>
stringBuilder.append("- ")
stringBuilder.append(strings.mkString("\n- "))
stringBuilder.append("\n")
}
}
}
stringBuilder.toString
}

def whyNotIndexString(spark: SparkSession, df: DataFrame, indexName: String): String = {
val (planWithHyperspace, reasonStrings, applyStrings) = whyNotIndex(spark, df, indexName)
generateWhyNotString(
df.queryExecution.optimizedPlan,
planWithHyperspace,
reasonStrings,
applyStrings)
}

def whyNotIndexesString(spark: SparkSession, df: DataFrame): String = {
val (planWithHyperspace, reasonStrings, applyStrings) = whyNotIndexes(spark, df)
generateWhyNotString(
df.queryExecution.optimizedPlan,
planWithHyperspace,
reasonStrings,
applyStrings)
}

def whyNotIndex(spark: SparkSession, df: DataFrame, indexName: String)
: (LogicalPlan, Seq[(LogicalPlan, Seq[String])], Seq[(LogicalPlan, Seq[String])]) = {
val plan = df.queryExecution.optimizedPlan

val indexManager = Hyperspace.getContext(spark).indexCollectionManager
val allIndexes =
indexManager.getIndexes().filter(!_.state.equals(Constants.States.DOESNOTEXIST))
val targetIndex = allIndexes.find(index => index.name.equals(indexName))

if (targetIndex.isEmpty) {
throw HyperspaceException(s"Index with name $indexName could not be found.")
} else if (!targetIndex.get.state.equals(Constants.States.ACTIVE)) {
throw HyperspaceException(
s"Index with name $indexName is not ACTIVE state: ${targetIndex.get.state}")
}

val allActiveIndexes = allIndexes.filter(_.state.equals(Constants.States.ACTIVE))
applyHyperspaceForAnalysis(plan, allActiveIndexes)
}

def whyNotIndexes(spark: SparkSession, df: DataFrame)
: (LogicalPlan, Seq[(LogicalPlan, Seq[String])], Seq[(LogicalPlan, Seq[String])]) = {
val plan = df.queryExecution.optimizedPlan

val indexManager = Hyperspace.getContext(spark).indexCollectionManager
val allActiveIndexes = indexManager.getIndexes(Seq(Constants.States.ACTIVE))

if (allActiveIndexes.isEmpty) {
throw HyperspaceException(s"No available ACTIVE indexes")
}
applyHyperspaceForAnalysis(plan, allActiveIndexes)
}

def applyHyperspaceForAnalysis(plan: LogicalPlan, indexes: Seq[IndexLogEntry])
: (LogicalPlan, Seq[(LogicalPlan, Seq[String])], Seq[(LogicalPlan, Seq[String])]) = {
try {
prepareTagsForAnalysis(indexes)
val candidateIndexes = CandidateIndexCollector.apply(plan, indexes)
val transformedPlan = new ScoreBasedIndexPlanOptimizer().apply(plan, candidateIndexes)
(
transformedPlan,
indexes.flatMap(_.getTagValueForAllPlan(IndexLogEntryTags.FILTER_REASONS)),
indexes.flatMap(_.getTagValueForAllPlan(IndexLogEntryTags.APPLICABLE_INDEX_RULES)))
} finally {
cleanupAnalysisTags(indexes)
}
}

}
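
For reference, a report assembled by generateWhyNotString has roughly the following shape (the plan lines, index name, and reason text are placeholders, not real output):

Plan with Hyperspace
============================================
<optimized plan after Hyperspace rules>

Plan without Hyperspace
============================================
00 Project [id, name]
01 +- Filter (id > 100)
02    +- Relation[id,name] parquet

********************************************
WhyNot Analysis
============================================
Candidate plan:
02    +- Relation[id,name] parquet

Failure reasons:
- [myIndex,FilterColumnFilter] <reason>

********************************************
Applicable indexes
============================================
Source plan:
02    +- Relation[id,name] parquet

Applicable indexes:
- [myIndex,FilterIndexRule]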
ScoreBasedIndexPlanOptimizerTest.scala
@@ -108,6 +108,8 @@ class ScoreBasedIndexPlanOptimizerTest extends QueryTest with HyperspaceSuite {
assert(rightChildScore == 50)
assert(!rightChildPlan.equals(plan.children.last))

hyperspace.whyNot(query(leftDf, rightDf)())

verifyIndexUsage(
query(leftDf, rightDf),
getIndexFilesPath(leftDfFilterIndexConfig.indexName) ++
CandidateIndexCollectorTest.scala
@@ -330,7 +330,7 @@ class CandidateIndexCollectorTest extends HyperspaceRuleSuite with SQLHelper {
}

val allIndexes = indexList.map(indexName => latestIndexLogEntry(systemPath, indexName))
- allIndexes.foreach(_.setTagValue(IndexLogEntryTags.FILTER_REASONS_ENABLED, true))
+ allIndexes.foreach(_.setTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED, true))

val plan1 =
spark.read.parquet(dataPath).select("id", "name").queryExecution.optimizedPlan
FilterIndexRuleTest.scala
@@ -116,7 +116,7 @@ class FilterIndexRuleTest extends HyperspaceRuleSuite {

val originalPlan = Project(Seq(c2, c3, c4), filterNode) // c4 is not covered by index
val allIndexes = IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE))
- allIndexes.foreach(_.setTagValue(IndexLogEntryTags.FILTER_REASONS_ENABLED, true))
+ allIndexes.foreach(_.setTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED, true))
val (transformedPlan, score) = applyFilterIndexRuleHelper(originalPlan, allIndexes)
assert(transformedPlan.equals(originalPlan), "Plan should not transform.")
allIndexes.foreach { index =>
@@ -126,8 +126,9 @@ class FilterIndexRuleTest extends HyperspaceRuleSuite {
assert(msg.isDefined)
assert(
msg.get.exists(
_.equals("Index does not contain required columns. Required columns: " +
"[c3,c2,c3,c4], Indexed & included columns: [c3,c2,c1]")))
_.equals(
s"[$indexName1,FilterColumnFilter] Index does not contain required columns. " +
"Required columns: [c3,c2,c3,c4], Indexed & included columns: [c3,c2,c1]")))
case `indexName2` | `indexName3` =>
assert(msg.isDefined)
assert(
@@ -154,7 +155,7 @@ class FilterIndexRuleTest extends HyperspaceRuleSuite {
val originalPlan = Filter(filterCondition, scanNode)

val allIndexes = IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE))
- allIndexes.foreach(_.setTagValue(IndexLogEntryTags.FILTER_REASONS_ENABLED, true))
+ allIndexes.foreach(_.setTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED, true))
val (transformedPlan, score) = applyFilterIndexRuleHelper(originalPlan, allIndexes)
assert(!transformedPlan.equals(originalPlan), "No plan transformation.")
verifyTransformedPlanWithIndex(transformedPlan, indexName2)