Introduce whyNot API
sezruby committed Jul 12, 2021
1 parent 53fb758 commit 63bc1c7
Showing 19 changed files with 774 additions and 169 deletions.
13 changes: 12 additions & 1 deletion src/main/scala/com/microsoft/hyperspace/Hyperspace.scala
@@ -20,7 +20,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}

import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.IndexConstants.{OPTIMIZE_MODE_QUICK, REFRESH_MODE_FULL}
import com.microsoft.hyperspace.index.plananalysis.PlanAnalyzer
import com.microsoft.hyperspace.index.plananalysis.{CandidateIndexAnalyzer, PlanAnalyzer}
import com.microsoft.hyperspace.index.rules.ApplyHyperspace
import com.microsoft.hyperspace.index.sources.FileBasedSourceProviderManager

@@ -171,6 +171,17 @@ class Hyperspace(spark: SparkSession) {
indexManager.index(indexName)
}

def whyNot(df: DataFrame, indexName: String = "", extended: Boolean = false)(
implicit redirectFunc: String => Unit = print): Unit = {
withHyperspaceRuleDisabled {
if (indexName.nonEmpty) {
redirectFunc(CandidateIndexAnalyzer.whyNotIndexString(spark, df, indexName, extended))
} else {
redirectFunc(CandidateIndexAnalyzer.whyNotIndexesString(spark, df, extended))
}
}
}

private def withHyperspaceRuleDisabled(f: => Unit): Unit = {
try {
ApplyHyperspace.disableForIndexMaintenance.set(true)
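For orientation, a hedged usage sketch of the API added above (shell-style Scala; the data path, query, and index name below are placeholders, not from this commit):

import org.apache.spark.sql.SparkSession
import com.microsoft.hyperspace.Hyperspace

// Placeholder setup: a Spark session, a Hyperspace instance, and a query DataFrame.
val spark = SparkSession.builder().appName("whyNotExample").getOrCreate()
val hs = new Hyperspace(spark)
val df = spark.read.parquet("/data/events").filter("day = '2021-07-12'").select("userId")

// Ask why a specific index was (or was not) applied to this plan, with extended detail.
hs.whyNot(df, "myIndex", extended = true)

// Report on all candidate indexes, redirecting the output instead of printing it.
hs.whyNot(df)(report => Console.err.println(report))
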
13 changes: 13 additions & 0 deletions src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
@@ -527,10 +527,23 @@ case class IndexLogEntry(
tags.get((plan, tag)).map(_.asInstanceOf[T])
}

def getTagValueForAllPlan[T](tag: IndexLogEntryTag[T]): Seq[(LogicalPlan, T)] = {
tags.filter(entry => entry._1._2.equals(tag)).toSeq.map { case (k, v) =>
(k._1, v.asInstanceOf[T])
}
}

def unsetTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Unit = {
tags.remove((plan, tag))
}

def unsetTagValueForAllPlan[T](tag: IndexLogEntryTag[T]): Unit = {
val plansWithTag = tags.keys.filter(_._2.name.equals(tag.name)).map(_._1)
plansWithTag.foreach { plan =>
tags.remove((plan, tag))
}
}

def withCachedTag[T](plan: LogicalPlan, tag: IndexLogEntryTag[T])(f: => T): T = {
getTagValue(plan, tag) match {
case Some(v) => v
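A hedged sketch of how the helpers above might be used together with the FILTER_REASONS tag defined in IndexLogEntryTags; the helper function and its surrounding context are assumptions, not part of this commit:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags}
import com.microsoft.hyperspace.index.plananalysis.FilterReason

// Collect the filter reasons recorded on every plan node during analysis,
// then clear the tag so a subsequent analysis starts from a clean slate.
def collectAndClearReasons(entry: IndexLogEntry): Seq[(LogicalPlan, Seq[FilterReason])] = {
  val reasons = entry.getTagValueForAllPlan(IndexLogEntryTags.FILTER_REASONS)
  entry.unsetTagValueForAllPlan(IndexLogEntryTags.FILTER_REASONS)
  reasons
}
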
@@ -18,6 +18,8 @@ package com.microsoft.hyperspace.index

import org.apache.spark.sql.execution.datasources.InMemoryFileIndex

import com.microsoft.hyperspace.index.plananalysis.FilterReason

object IndexLogEntryTags {
// HYBRIDSCAN_REQUIRED indicates if Hybrid Scan is required for the index or not.
val HYBRIDSCAN_REQUIRED: IndexLogEntryTag[Boolean] =
@@ -55,10 +57,15 @@ object IndexLogEntryTags {
IndexLogEntryTag[InMemoryFileIndex]("inMemoryFileIndexHybridScanAppended")

// FILTER_REASONS stores reason strings for disqualification.
val FILTER_REASONS: IndexLogEntryTag[Seq[String]] =
IndexLogEntryTag[Seq[String]]("filterReasons")
val FILTER_REASONS: IndexLogEntryTag[Seq[FilterReason]] =
IndexLogEntryTag[Seq[FilterReason]]("filterReasons")

// APPLICABLE_INDEX_RULES stores the names of the rules that can apply the index to the plan.
val APPLICABLE_INDEX_RULES: IndexLogEntryTag[Seq[String]] =
IndexLogEntryTag[Seq[String]]("applicableIndexRules")

// FILTER_REASONS_ENABLED indicates whether whyNotAPI is enabled or not.
val FILTER_REASONS_ENABLED: IndexLogEntryTag[Boolean] =
IndexLogEntryTag[Boolean]("filterReasonsEnabled")
// INDEX_PLAN_ANALYSIS_ENABLED indicates whether plan analysis is enabled.
// If it's enabled, FILTER_REASONS and APPLICABLE_INDEX_RULES info will be tagged.
val INDEX_PLAN_ANALYSIS_ENABLED: IndexLogEntryTag[Boolean] =
IndexLogEntryTag[Boolean]("indexPlanAnalysisEnabled")
}
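A hedged sketch of how these tags could be driven and read back, assuming IndexLogEntry exposes a setTagValue counterpart to the getTagValue shown earlier; the function below is illustrative only:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags}
import com.microsoft.hyperspace.index.plananalysis.FilterReason

// Enable plan analysis for an index entry on a given plan node, then (after the
// Hyperspace rules have run against the plan) read back which rules could apply
// the index and why other rules rejected it.
def analysisResults(entry: IndexLogEntry, plan: LogicalPlan): (Seq[String], Seq[FilterReason]) = {
  entry.setTagValue(plan, IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED, true)
  // ... apply the Hyperspace rules to the plan here ...
  val applicableRules =
    entry.getTagValue(plan, IndexLogEntryTags.APPLICABLE_INDEX_RULES).getOrElse(Nil)
  val reasons = entry.getTagValue(plan, IndexLogEntryTags.FILTER_REASONS).getOrElse(Nil)
  (applicableRules, reasons)
}
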
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPla

import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
import com.microsoft.hyperspace.index.IndexLogEntryTags
import com.microsoft.hyperspace.index.plananalysis.{FilterReasonCode, FilterReasons}
import com.microsoft.hyperspace.index.rules.{HyperspaceRule, IndexRankFilter, QueryPlanIndexFilter}
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap}
import com.microsoft.hyperspace.index.sources.FileBasedRelation
@@ -96,17 +97,23 @@ object FilterColumnFilter extends QueryPlanIndexFilter {
withFilterReasonTag(
plan,
index,
"The first indexed column should be in filter condition columns.") {
FilterReasons.apply(
FilterReasonCode.NO_FIRST_INDEXED_COL_COND,
Seq(
("firstIndexedCol", index.derivedDataset.indexedColumns.head),
("filterColumns", filterColumnNames.mkString(", "))))) {
ResolverUtils
.resolve(spark, index.derivedDataset.indexedColumns.head, filterColumnNames)
.isDefined
} &&
withFilterReasonTag(
plan,
index,
"Index does not contain required columns. Required columns: " +
s"[${(filterColumnNames ++ projectColumnNames).mkString(",")}], Indexed & " +
s"included columns: [${(index.derivedDataset.referencedColumns).mkString(",")}]") {
FilterReasons.apply(
FilterReasonCode.MISSING_REQUIRED_COL,
Seq(
("requiredCols", (filterColumnNames ++ projectColumnNames).toSet.mkString(",")),
("indexCols", index.derivedDataset.referencedColumns.mkString(","))))) {
ResolverUtils
.resolve(
spark,
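FilterReasonCode and FilterReasons are introduced elsewhere in this commit and are not shown in this excerpt; the call sites above suggest a factory that pairs a reason code with (argument name, value) tuples. A minimal sketch of that shape, with every name and field assumed:

// Assumed shape only: a reason code plus (argName -> argValue) pairs that render
// into a human-readable message for the whyNot report.
object FilterReasonCode extends Enumeration {
  val NO_FIRST_INDEXED_COL_COND, MISSING_REQUIRED_COL, NOT_ELIGIBLE_JOIN,
  NO_AVAIL_JOIN_INDEX_PAIR, NOT_ALL_JOIN_COL_INDEXED, MISSING_INDEXED_COL,
  NO_COMPATIBLE_JOIN_INDEX_PAIR = Value
}

case class FilterReason(code: String, args: Seq[(String, String)]) {
  def verboseString: String =
    s"[$code] " + args.map { case (k, v) => s"$k=$v" }.mkString(", ")
}

object FilterReasons {
  def apply(code: FilterReasonCode.Value, args: Seq[(String, String)]): FilterReason =
    FilterReason(code.toString, args)
}
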
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags}
import com.microsoft.hyperspace.index.covering.JoinAttributeFilter.extractConditions
import com.microsoft.hyperspace.index.plananalysis.{FilterReasonCode, FilterReasons}
import com.microsoft.hyperspace.index.rules.{HyperspaceRule, IndexRankFilter, QueryPlanIndexFilter}
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap}
import com.microsoft.hyperspace.index.sources.FileBasedRelation
@@ -65,17 +66,28 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter {
val joinConditionCond = withFilterReasonTag(
plan,
leftAndRightIndexes,
"Join condition is not eligible. Equi-Joins in simple CNF form are supported. " +
"Literal is not supported.") {
FilterReasons.apply(
FilterReasonCode.NOT_ELIGIBLE_JOIN,
Seq(("reason", "Non equi-join or has literal")))) {
isJoinConditionSupported(condition)
}

val leftPlanLinearCond =
withFilterReasonTag(plan, leftAndRightIndexes, "Left child is not a linear plan.") {
withFilterReasonTag(
plan,
leftAndRightIndexes,
FilterReasons.apply(
FilterReasonCode.NOT_ELIGIBLE_JOIN,
Seq(("reason", "Non linear left child plan")))) {
isPlanLinear(l)
}
val rightPlanLinearCond =
withFilterReasonTag(plan, leftAndRightIndexes, "Right child is not a linear plan.") {
withFilterReasonTag(
plan,
leftAndRightIndexes,
FilterReasons.apply(
FilterReasonCode.NOT_ELIGIBLE_JOIN,
Seq(("reason", "Non linear right child plan")))) {
isPlanLinear(r)
}

@@ -96,7 +108,9 @@
setFilterReasonTag(
plan,
candidateIndexes.values.flatten.toSeq,
"Not eligible Join - no join condition.")
FilterReasons.apply(
FilterReasonCode.NOT_ELIGIBLE_JOIN,
Seq(("reason", "No join condition"))))
Map.empty
case _ =>
Map.empty
@@ -164,10 +178,9 @@ object JoinAttributeFilter extends QueryPlanIndexFilter {
if (withFilterReasonTag(
plan,
candidateIndexes.head._2 ++ candidateIndexes.last._2,
"Each join condition column should come from " +
"relations directly and attributes from left plan must exclusively have " +
"one-to-one mapping with attributes from right plan. " +
"E.g. join(A = B and A = D) is not eligible.") {
FilterReasons.apply(
FilterReasonCode.NOT_ELIGIBLE_JOIN,
Seq(("reason", "incompatible left and right join columns")))) {
ensureAttributeRequirements(
JoinIndexRule.leftRelation.get,
JoinIndexRule.rightRelation.get,
@@ -329,39 +342,39 @@ object JoinColumnFilter extends QueryPlanIndexFilter {
val lRequiredAllCols = resolve(spark, allRequiredCols(l), lBaseAttrs).get
val rRequiredAllCols = resolve(spark, allRequiredCols(r), rBaseAttrs).get

// Make sure required indexed columns are subset of all required columns.
assert(
resolve(spark, lRequiredIndexedCols, lRequiredAllCols).isDefined &&
resolve(spark, rRequiredIndexedCols, rRequiredAllCols).isDefined)

val lIndexes =
getUsableIndexes(
plan,
candidateIndexes.getOrElse(leftRelation.plan, Nil),
lRequiredIndexedCols,
lRequiredAllCols,
"left")
val rIndexes =
getUsableIndexes(
plan,
candidateIndexes.getOrElse(rightRelation.plan, Nil),
rRequiredIndexedCols,
rRequiredAllCols,
"right")

if (withFilterReasonTag(
plan,
candidateIndexes.head._2 ++ candidateIndexes.last._2,
"Invalid query plan.") {
// Make sure required indexed columns are a subset of all required columns.
resolve(spark, lRequiredIndexedCols, lRequiredAllCols).isDefined &&
resolve(spark, rRequiredIndexedCols, rRequiredAllCols).isDefined
}) {
val lIndexes =
getUsableIndexes(
plan,
candidateIndexes.getOrElse(leftRelation.plan, Nil),
lRequiredIndexedCols,
lRequiredAllCols)
val rIndexes =
getUsableIndexes(
plan,
candidateIndexes.getOrElse(rightRelation.plan, Nil),
rRequiredIndexedCols,
rRequiredAllCols)

if (withFilterReasonTag(
plan,
candidateIndexes.head._2 ++ candidateIndexes.last._2,
"No available indexes for left subplan.")(lIndexes.nonEmpty) &&
withFilterReasonTag(
plan,
candidateIndexes.head._2 ++ candidateIndexes.last._2,
"No available indexes for right subplan.")(rIndexes.nonEmpty)) {
Map(leftRelation.plan -> lIndexes, rightRelation.plan -> rIndexes)
} else {
Map.empty
}
FilterReasons.apply(
FilterReasonCode.NO_AVAIL_JOIN_INDEX_PAIR,
Seq(("child", "left"))))(lIndexes.nonEmpty) &&
withFilterReasonTag(
plan,
candidateIndexes.head._2 ++ candidateIndexes.last._2,
FilterReasons.apply(
FilterReasonCode.NO_AVAIL_JOIN_INDEX_PAIR,
Seq(("child", "right"))))(rIndexes.nonEmpty)) {
Map(leftRelation.plan -> lIndexes, rightRelation.plan -> rIndexes)
} else {
Map.empty
}
@@ -404,42 +417,36 @@
}.toMap
}

/**
* Get usable indexes which satisfy indexed and included column requirements.
*
* Pre-requisite: the indexed and included columns required must be already resolved with their
* corresponding base relation columns at this point.
*
* @param plan Query plan
* @param indexes All available indexes for the logical plan
* @param requiredIndexCols required indexed columns resolved with their base relation column.
* @param allRequiredCols required included columns resolved with their base relation column.
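* @param leftOrRight "left" or "right", identifying which join child the candidate indexes
*                    belong to; used when building filter reason messages.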
* @return Indexes which satisfy the indexed and covering column requirements from the logical
* plan and join condition
*/
private def getUsableIndexes(
plan: LogicalPlan,
indexes: Seq[IndexLogEntry],
requiredIndexCols: Seq[String],
allRequiredCols: Seq[String]): Seq[IndexLogEntry] = {
allRequiredCols: Seq[String],
leftOrRight: String): Seq[IndexLogEntry] = {
indexes.filter { idx =>
val allCols = idx.derivedDataset.referencedColumns
// All required index columns should match one-to-one with all indexed columns and
// vice-versa. All required columns must be present in the available index columns.
withFilterReasonTag(
plan,
idx,
s"All join condition column should be the indexed columns. " +
s"Join columns: [${requiredIndexCols
.mkString(",")}], Indexed columns: [${idx.indexedColumns.mkString(",")}]") {
FilterReasons.apply(
FilterReasonCode.NOT_ALL_JOIN_COL_INDEXED,
Seq(
("child", leftOrRight),
("joinCols", requiredIndexCols.mkString(", ")),
("indexedCols", idx.indexedColumns.mkString(", "))))) {
requiredIndexCols.toSet.equals(idx.indexedColumns.toSet)
} &&
withFilterReasonTag(
plan,
idx,
s"Index does not contain all required columns. " +
s"Required columns: [${allRequiredCols.mkString(",")}], " +
s"Index columns: [${(idx.derivedDataset.referencedColumns).mkString(",")}]") {
FilterReasons.apply(
FilterReasonCode.MISSING_INDEXED_COL,
Seq(
("child", leftOrRight),
("requiredIndexedCols", allRequiredCols.mkString(", ")),
("IndexedCols", idx.indexedColumns.mkString(", "))))) {
allRequiredCols.forall(allCols.contains)
}
}
@@ -523,7 +530,7 @@ object JoinRankFilter extends IndexRankFilter {
setFilterReasonTag(
plan,
indexes.head._2 ++ indexes.last._2,
"No compatible left and right index pair.")
FilterReasons.apply(FilterReasonCode.NO_COMPATIBLE_JOIN_INDEX_PAIR, Seq.empty))
Map.empty
}
}
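
The withFilterReasonTag and setFilterReasonTag helpers used throughout the filters above are defined outside this excerpt; a hedged sketch of the pattern they appear to implement, with the signature and body assumed rather than taken from the commit (it also assumes a setTagValue counterpart to getTagValue):

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags}
import com.microsoft.hyperspace.index.plananalysis.FilterReason

// Assumed shape: evaluate a condition and, when it fails and plan analysis is enabled,
// record the (lazily built) reason on the index entry for the given plan node.
def withFilterReasonTag(plan: LogicalPlan, index: IndexLogEntry, reason: => FilterReason)(
    condition: => Boolean): Boolean = {
  val passed = condition
  val analysisEnabled =
    index.getTagValue(plan, IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED).getOrElse(false)
  if (!passed && analysisEnabled) {
    val existing = index.getTagValue(plan, IndexLogEntryTags.FILTER_REASONS).getOrElse(Nil)
    index.setTagValue(plan, IndexLogEntryTags.FILTER_REASONS, existing :+ reason)
  }
  passed
}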
