Skip to content
This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Commit

Permalink
review commit
Browse files Browse the repository at this point in the history
  • Loading branch information
sezruby committed Jul 20, 2021
1 parent 28cbb3d commit e3532a2
Show file tree
Hide file tree
Showing 17 changed files with 276 additions and 234 deletions.
12 changes: 10 additions & 2 deletions src/main/scala/com/microsoft/hyperspace/Hyperspace.scala
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ class Hyperspace(spark: SparkSession) {
}

/**
* Explains how indexes will be applied to the given dataframe.
* Explain how indexes will be applied to the given dataframe.
*
* @param df dataFrame.
* @param redirectFunc optional function to redirect output of explain.
Expand All @@ -171,10 +171,18 @@ class Hyperspace(spark: SparkSession) {
indexManager.index(indexName)
}

/**
* Explain why indexes are not applied to the given dataframe.
*
* @param df DataFrame for which to explain why indexes are not applied.
* @param indexName Optional index name; if non-empty, the output is restricted to this index.
* @param extended If true, print more verbose messages.
* @param redirectFunc Optional function to redirect output
*/
def whyNot(df: DataFrame, indexName: String = "", extended: Boolean = false)(
implicit redirectFunc: String => Unit = print): Unit = {
withHyperspaceRuleDisabled {
if (indexName.nonEmpty) {
if (indexName.isEmpty) {
redirectFunc(CandidateIndexAnalyzer.whyNotIndexString(spark, df, indexName, extended))
} else {
redirectFunc(CandidateIndexAnalyzer.whyNotIndexesString(spark, df, extended))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ case class IndexLogEntry(
tags.get((plan, tag)).map(_.asInstanceOf[T])
}

def getTagValueForAllPlan[T](tag: IndexLogEntryTag[T]): Seq[(LogicalPlan, T)] = {
def getTagValuesForAllPlan[T](tag: IndexLogEntryTag[T]): Seq[(LogicalPlan, T)] = {
tags.filter(entry => entry._1._2.equals(tag)).toSeq.map { case (k, v) =>
(k._1, v.asInstanceOf[T])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPla

import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
import com.microsoft.hyperspace.index.IndexLogEntryTags
import com.microsoft.hyperspace.index.plananalysis.{FilterReasonCode, FilterReasons}
import com.microsoft.hyperspace.index.plananalysis.FilterReasons
import com.microsoft.hyperspace.index.rules.{HyperspaceRule, IndexRankFilter, QueryPlanIndexFilter}
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap}
import com.microsoft.hyperspace.index.sources.FileBasedRelation
Expand Down Expand Up @@ -97,21 +97,19 @@ object FilterColumnFilter extends QueryPlanIndexFilter {
withFilterReasonTag(
plan,
index,
FilterReasons.apply(
FilterReasonCode.NO_FIRST_INDEXED_COL_COND,
("firstIndexedCol", index.derivedDataset.indexedColumns.head),
("filterColumns", filterColumnNames.mkString(", ")))) {
FilterReasons.NoFirstIndexedColCond(
index.derivedDataset.indexedColumns.head,
filterColumnNames.mkString(", "))) {
ResolverUtils
.resolve(spark, index.derivedDataset.indexedColumns.head, filterColumnNames)
.isDefined
} &&
withFilterReasonTag(
plan,
index,
FilterReasons.apply(
FilterReasonCode.MISSING_REQUIRED_COL,
("requiredCols", (filterColumnNames ++ projectColumnNames).toSet.mkString(",")),
("indexCols", index.derivedDataset.referencedColumns.mkString(",")))) {
FilterReasons.MissingRequiredCol(
(filterColumnNames ++ projectColumnNames).toSet.mkString(","),
index.derivedDataset.referencedColumns.mkString(","))) {
ResolverUtils
.resolve(
spark,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags}
import com.microsoft.hyperspace.index.covering.JoinAttributeFilter.extractConditions
import com.microsoft.hyperspace.index.plananalysis.{FilterReasonCode, FilterReasons}
import com.microsoft.hyperspace.index.plananalysis.FilterReasons
import com.microsoft.hyperspace.index.rules.{HyperspaceRule, IndexRankFilter, QueryPlanIndexFilter}
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap}
import com.microsoft.hyperspace.index.sources.FileBasedRelation
Expand Down Expand Up @@ -66,28 +66,22 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter {
val joinConditionCond = withFilterReasonTag(
plan,
leftAndRightIndexes,
FilterReasons.apply(
FilterReasonCode.NOT_ELIGIBLE_JOIN,
("reason", "Non equi-join or has literal"))) {
FilterReasons.NotEligibleJoin("Non equi-join or has literal")) {
isJoinConditionSupported(condition)
}

val leftPlanLinearCond =
withFilterReasonTag(
plan,
leftAndRightIndexes,
FilterReasons.apply(
FilterReasonCode.NOT_ELIGIBLE_JOIN,
("reason", "Non linear left child plan"))) {
FilterReasons.NotEligibleJoin("Non linear left child plan")) {
isPlanLinear(l)
}
val rightPlanLinearCond =
withFilterReasonTag(
plan,
leftAndRightIndexes,
FilterReasons.apply(
FilterReasonCode.NOT_ELIGIBLE_JOIN,
("reason", "Non linear right child plan"))) {
FilterReasons.NotEligibleJoin("Non linear right child plan")) {
isPlanLinear(r)
}

Expand All @@ -108,9 +102,7 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter {
setFilterReasonTag(
plan,
candidateIndexes.values.flatten.toSeq,
FilterReasons.apply(
FilterReasonCode.NOT_ELIGIBLE_JOIN,
("reason", "No join condition")))
FilterReasons.NotEligibleJoin("No join condition"))
Map.empty
case _ =>
Map.empty
Expand Down Expand Up @@ -178,9 +170,7 @@ object JoinAttributeFilter extends QueryPlanIndexFilter {
if (withFilterReasonTag(
plan,
candidateIndexes.head._2 ++ candidateIndexes.last._2,
FilterReasons.apply(
FilterReasonCode.NOT_ELIGIBLE_JOIN,
("reason", "incompatible left and right join columns"))) {
FilterReasons.NotEligibleJoin("incompatible left and right join columns")) {
ensureAttributeRequirements(
JoinIndexRule.leftRelation.get,
JoinIndexRule.rightRelation.get,
Expand Down Expand Up @@ -365,13 +355,11 @@ object JoinColumnFilter extends QueryPlanIndexFilter {
if (withFilterReasonTag(
plan,
candidateIndexes.head._2 ++ candidateIndexes.last._2,
FilterReasons.apply(FilterReasonCode.NO_AVAIL_JOIN_INDEX_PAIR, ("child", "left")))(
lIndexes.nonEmpty) &&
FilterReasons.NoAvailJoinIndexPair("left"))(lIndexes.nonEmpty) &&
withFilterReasonTag(
plan,
candidateIndexes.head._2 ++ candidateIndexes.last._2,
FilterReasons.apply(FilterReasonCode.NO_AVAIL_JOIN_INDEX_PAIR, ("child", "right")))(
rIndexes.nonEmpty)) {
FilterReasons.NoAvailJoinIndexPair("right"))(rIndexes.nonEmpty)) {
Map(leftRelation.plan -> lIndexes, rightRelation.plan -> rIndexes)
} else {
Map.empty
Expand Down Expand Up @@ -415,6 +403,19 @@ object JoinColumnFilter extends QueryPlanIndexFilter {
}.toMap
}

/**
* Get usable indexes which satisfy indexed and included column requirements.
*
* Pre-requisite: the indexed and included columns required must be already resolved with their
* corresponding base relation columns at this point.
*
* @param plan Query plan
* @param indexes All available indexes for the logical plan
* @param requiredIndexCols required indexed columns resolved with their base relation column.
* @param allRequiredCols required included columns resolved with their base relation column.
* @return Indexes which satisfy the indexed and covering column requirements from the logical
* plan and join condition
*/
private def getUsableIndexes(
plan: LogicalPlan,
indexes: Seq[IndexLogEntry],
Expand All @@ -428,21 +429,19 @@ object JoinColumnFilter extends QueryPlanIndexFilter {
withFilterReasonTag(
plan,
idx,
FilterReasons.apply(
FilterReasonCode.NOT_ALL_JOIN_COL_INDEXED,
("child", leftOrRight),
("joinCols", requiredIndexCols.mkString(", ")),
("indexedCols", idx.indexedColumns.mkString(", ")))) {
FilterReasons.NotAllJoinColIndexed(
leftOrRight,
requiredIndexCols.mkString(", "),
idx.indexedColumns.mkString(", "))) {
requiredIndexCols.toSet.equals(idx.indexedColumns.toSet)
} &&
withFilterReasonTag(
plan,
idx,
FilterReasons.apply(
FilterReasonCode.MISSING_INDEXED_COL,
("child", leftOrRight),
("requiredIndexedCols", allRequiredCols.mkString(", ")),
("IndexedCols", idx.indexedColumns.mkString(", ")))) {
FilterReasons.MissingIndexedCol(
leftOrRight,
allRequiredCols.mkString(", "),
idx.indexedColumns.mkString(", "))) {
allRequiredCols.forall(allCols.contains)
}
}
Expand Down Expand Up @@ -526,7 +525,7 @@ object JoinRankFilter extends IndexRankFilter {
setFilterReasonTag(
plan,
indexes.head._2 ++ indexes.last._2,
FilterReasons.apply(FilterReasonCode.NO_COMPATIBLE_JOIN_INDEX_PAIR))
FilterReasons.NoCompatibleJoinIndexPair())
Map.empty
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,6 @@ object CandidateIndexAnalyzer extends Logging {
applicableIndexes: Seq[(IndexLogEntry, Seq[(LogicalPlan, Seq[String])])]): String = {
val stringBuilder = new StringBuilder
val originalPlanString = planWithoutHyperspace.numberedTreeString.split('\n')
if (applicableIndexes.isEmpty) {
return "No applicable indexes. Try hyperspace.whyNot()"
}
val newLine = System.lineSeparator()
stringBuilder.append("Plan without Hyperspace:")
stringBuilder.append(newLine)
stringBuilder.append(newLine)
stringBuilder.append(originalPlanString.mkString(newLine))
stringBuilder.append(newLine)
stringBuilder.append(newLine)

// Convert to a DataFrame with columns:
// sub plan line number, index name, index type, rule name
Expand All @@ -132,6 +122,18 @@ object CandidateIndexAnalyzer extends Logging {
.sortBy(r => (r._1, r._3))
.distinct

if (res.isEmpty) {
return "No applicable indexes. Try hyperspace.whyNot()"
}
val newLine = System.lineSeparator()
stringBuilder.append("Plan without Hyperspace:")
stringBuilder.append(newLine)
stringBuilder.append(newLine)
stringBuilder.append(originalPlanString.mkString(newLine))
stringBuilder.append(newLine)
stringBuilder.append(newLine)


import spark.implicits._
val df = res.toDF("SubPlan", "IndexName", "IndexType", "RuleName")

Expand Down Expand Up @@ -330,9 +332,9 @@ object CandidateIndexAnalyzer extends Logging {
transformedPlan,
indexes
.filter(i => indexName.isEmpty || indexName.get.equals(i.name))
.map(i => (i, i.getTagValueForAllPlan(IndexLogEntryTags.FILTER_REASONS))),
.map(i => (i, i.getTagValuesForAllPlan(IndexLogEntryTags.FILTER_REASONS))),
indexes
.map(i => (i, i.getTagValueForAllPlan(IndexLogEntryTags.APPLICABLE_INDEX_RULES))))
.map(i => (i, i.getTagValuesForAllPlan(IndexLogEntryTags.APPLICABLE_INDEX_RULES))))
} finally {
cleanupAnalysisTags(indexes)
}
Expand Down
Loading

0 comments on commit e3532a2

Please sign in to comment.