Skip to content
This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Introduce whyNot API #449

Merged
merged 6 commits into from
Aug 3, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions src/main/scala/com/microsoft/hyperspace/Hyperspace.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}

import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.IndexConstants.{OPTIMIZE_MODE_QUICK, REFRESH_MODE_FULL}
import com.microsoft.hyperspace.index.plananalysis.PlanAnalyzer
import com.microsoft.hyperspace.index.plananalysis.{CandidateIndexAnalyzer, PlanAnalyzer}
import com.microsoft.hyperspace.index.rules.ApplyHyperspace
import com.microsoft.hyperspace.index.sources.FileBasedSourceProviderManager

Expand Down Expand Up @@ -150,7 +150,7 @@ class Hyperspace(spark: SparkSession) {
}

/**
* Explains how indexes will be applied to the given dataframe.
* Explain how indexes will be applied to the given dataframe.
*
* @param df dataFrame.
* @param redirectFunc optional function to redirect output of explain.
Expand All @@ -171,6 +171,25 @@ class Hyperspace(spark: SparkSession) {
indexManager.index(indexName)
}

sezruby marked this conversation as resolved.
Show resolved Hide resolved
/**
* Explain why indexes are not applied to the given dataframe.
*
* @param df Dataframe
* @param indexName Optional index name to filter out the output
* @param extended If true, print more verbose messages.
* @param redirectFunc Optional function to redirect output
*/
def whyNot(df: DataFrame, indexName: String = "", extended: Boolean = false)(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: indexName: Option[String] would make a better interface.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this way, users need to add "Some(" . . which does not look intuitive

hyperspace.whyNot(query(leftDf, rightDf)(), Some("leftDfJoinIndex"), extended = true)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. By the way, how about supporting multiple index names? indexNames: Seq[String] = Nil

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could add additional APIs later on demand - like returning DF instead of printing

implicit redirectFunc: String => Unit = print): Unit = {
withHyperspaceRuleDisabled {
if (indexName.nonEmpty) {
redirectFunc(CandidateIndexAnalyzer.whyNotIndexString(spark, df, indexName, extended))
} else {
redirectFunc(CandidateIndexAnalyzer.whyNotIndexesString(spark, df, extended))
}
}
}

private def withHyperspaceRuleDisabled(f: => Unit): Unit = {
try {
ApplyHyperspace.disableForIndexMaintenance.set(true)
Expand Down
13 changes: 13 additions & 0 deletions src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -527,10 +527,23 @@ case class IndexLogEntry(
tags.get((plan, tag)).map(_.asInstanceOf[T])
}

def getTagValuesForAllPlan[T](tag: IndexLogEntryTag[T]): Seq[(LogicalPlan, T)] = {
tags.filter(entry => entry._1._2.equals(tag)).toSeq.map { case (k, v) =>
clee704 marked this conversation as resolved.
Show resolved Hide resolved
(k._1, v.asInstanceOf[T])
}
}

def unsetTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Unit = {
tags.remove((plan, tag))
}

def unsetTagValueForAllPlan[T](tag: IndexLogEntryTag[T]): Unit = {
val plansWithTag = tags.keys.filter(_._2.name.equals(tag.name)).map(_._1)
plansWithTag.foreach { plan =>
tags.remove((plan, tag))
}
}

def withCachedTag[T](plan: LogicalPlan, tag: IndexLogEntryTag[T])(f: => T): T = {
getTagValue(plan, tag) match {
case Some(v) => v
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package com.microsoft.hyperspace.index

import org.apache.spark.sql.execution.datasources.InMemoryFileIndex

import com.microsoft.hyperspace.index.plananalysis.FilterReason

object IndexLogEntryTags {
// HYBRIDSCAN_REQUIRED indicates if Hybrid Scan is required for the index or not.
val HYBRIDSCAN_REQUIRED: IndexLogEntryTag[Boolean] =
Expand Down Expand Up @@ -55,10 +57,15 @@ object IndexLogEntryTags {
IndexLogEntryTag[InMemoryFileIndex]("inMemoryFileIndexHybridScanAppended")

// FILTER_REASONS stores reason strings for disqualification.
val FILTER_REASONS: IndexLogEntryTag[Seq[String]] =
IndexLogEntryTag[Seq[String]]("filterReasons")
val FILTER_REASONS: IndexLogEntryTag[Seq[FilterReason]] =
IndexLogEntryTag[Seq[FilterReason]]("filterReasons")

// APPLIED_INDEX_RULES stores rule's names can apply the index to the plan.
val APPLICABLE_INDEX_RULES: IndexLogEntryTag[Seq[String]] =
IndexLogEntryTag[Seq[String]]("applicableIndexRules")

// FILTER_REASONS_ENABLED indicates whether whyNotAPI is enabled or not.
val FILTER_REASONS_ENABLED: IndexLogEntryTag[Boolean] =
IndexLogEntryTag[Boolean]("filterReasonsEnabled")
// If it's enabled, FILTER_REASONS and APPLIED_INDEX_RULES info will be tagged.
val INDEX_PLAN_ANALYSIS_ENABLED: IndexLogEntryTag[Boolean] =
IndexLogEntryTag[Boolean]("indexPlanAnalysisEnabled")
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPla

import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
import com.microsoft.hyperspace.index.IndexLogEntryTags
import com.microsoft.hyperspace.index.plananalysis.FilterReasons
import com.microsoft.hyperspace.index.rules.{HyperspaceRule, IndexRankFilter, QueryPlanIndexFilter}
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap}
import com.microsoft.hyperspace.index.sources.FileBasedRelation
Expand Down Expand Up @@ -96,17 +97,19 @@ object FilterColumnFilter extends QueryPlanIndexFilter {
withFilterReasonTag(
plan,
index,
"The first indexed column should be in filter condition columns.") {
FilterReasons.NoFirstIndexedColCond(
index.derivedDataset.indexedColumns.head,
filterColumnNames.mkString(","))) {
ResolverUtils
.resolve(spark, index.derivedDataset.indexedColumns.head, filterColumnNames)
.isDefined
} &&
withFilterReasonTag(
plan,
index,
"Index does not contain required columns. Required columns: " +
s"[${(filterColumnNames ++ projectColumnNames).mkString(",")}], Indexed & " +
s"included columns: [${(index.derivedDataset.referencedColumns).mkString(",")}]") {
FilterReasons.MissingRequiredCol(
(filterColumnNames ++ projectColumnNames).toSet.mkString(","),
index.derivedDataset.referencedColumns.mkString(","))) {
ResolverUtils
.resolve(
spark,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags}
import com.microsoft.hyperspace.index.covering.JoinAttributeFilter.extractConditions
import com.microsoft.hyperspace.index.plananalysis.FilterReasons
import com.microsoft.hyperspace.index.rules.{HyperspaceRule, IndexRankFilter, QueryPlanIndexFilter}
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap}
import com.microsoft.hyperspace.index.sources.FileBasedRelation
Expand Down Expand Up @@ -65,17 +66,22 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter {
val joinConditionCond = withFilterReasonTag(
plan,
leftAndRightIndexes,
"Join condition is not eligible. Equi-Joins in simple CNF form are supported. " +
"Literal is not supported.") {
FilterReasons.NotEligibleJoin("Non equi-join or has literal")) {
isJoinConditionSupported(condition)
}

val leftPlanLinearCond =
withFilterReasonTag(plan, leftAndRightIndexes, "Left child is not a linear plan.") {
withFilterReasonTag(
plan,
leftAndRightIndexes,
FilterReasons.NotEligibleJoin("Non linear left child plan")) {
isPlanLinear(l)
}
val rightPlanLinearCond =
withFilterReasonTag(plan, leftAndRightIndexes, "Right child is not a linear plan.") {
withFilterReasonTag(
plan,
leftAndRightIndexes,
FilterReasons.NotEligibleJoin("Non linear right child plan")) {
isPlanLinear(r)
}

Expand All @@ -96,7 +102,7 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter {
setFilterReasonTag(
plan,
candidateIndexes.values.flatten.toSeq,
"Not eligible Join - no join condition.")
FilterReasons.NotEligibleJoin("No join condition"))
Map.empty
case _ =>
Map.empty
Expand Down Expand Up @@ -164,10 +170,7 @@ object JoinAttributeFilter extends QueryPlanIndexFilter {
if (withFilterReasonTag(
plan,
candidateIndexes.head._2 ++ candidateIndexes.last._2,
"Each join condition column should come from " +
"relations directly and attributes from left plan must exclusively have " +
"one-to-one mapping with attributes from right plan. " +
"E.g. join(A = B and A = D) is not eligible.") {
FilterReasons.NotEligibleJoin("incompatible left and right join columns")) {
ensureAttributeRequirements(
JoinIndexRule.leftRelation.get,
JoinIndexRule.rightRelation.get,
Expand Down Expand Up @@ -329,39 +332,35 @@ object JoinColumnFilter extends QueryPlanIndexFilter {
val lRequiredAllCols = resolve(spark, allRequiredCols(l), lBaseAttrs).get
val rRequiredAllCols = resolve(spark, allRequiredCols(r), rBaseAttrs).get

// Make sure required indexed columns are subset of all required columns.
assert(
resolve(spark, lRequiredIndexedCols, lRequiredAllCols).isDefined &&
resolve(spark, rRequiredIndexedCols, rRequiredAllCols).isDefined)

val lIndexes =
getUsableIndexes(
plan,
candidateIndexes.getOrElse(leftRelation.plan, Nil),
lRequiredIndexedCols,
lRequiredAllCols,
"left")
val rIndexes =
getUsableIndexes(
plan,
candidateIndexes.getOrElse(rightRelation.plan, Nil),
rRequiredIndexedCols,
rRequiredAllCols,
"right")

if (withFilterReasonTag(
plan,
candidateIndexes.head._2 ++ candidateIndexes.last._2,
"Invalid query plan.") {
// Make sure required indexed columns are subset of all required columns.
resolve(spark, lRequiredIndexedCols, lRequiredAllCols).isDefined &&
resolve(spark, rRequiredIndexedCols, rRequiredAllCols).isDefined
}) {
val lIndexes =
getUsableIndexes(
plan,
candidateIndexes.getOrElse(leftRelation.plan, Nil),
lRequiredIndexedCols,
lRequiredAllCols)
val rIndexes =
getUsableIndexes(
plan,
candidateIndexes.getOrElse(rightRelation.plan, Nil),
rRequiredIndexedCols,
rRequiredAllCols)

if (withFilterReasonTag(
plan,
candidateIndexes.head._2 ++ candidateIndexes.last._2,
"No available indexes for left subplan.")(lIndexes.nonEmpty) &&
withFilterReasonTag(
plan,
candidateIndexes.head._2 ++ candidateIndexes.last._2,
"No available indexes for right subplan.")(rIndexes.nonEmpty)) {
Map(leftRelation.plan -> lIndexes, rightRelation.plan -> rIndexes)
} else {
Map.empty
}
FilterReasons.NoAvailJoinIndexPair("left"))(lIndexes.nonEmpty) &&
withFilterReasonTag(
plan,
candidateIndexes.head._2 ++ candidateIndexes.last._2,
FilterReasons.NoAvailJoinIndexPair("right"))(rIndexes.nonEmpty)) {
Map(leftRelation.plan -> lIndexes, rightRelation.plan -> rIndexes)
} else {
Map.empty
}
Expand Down Expand Up @@ -421,25 +420,28 @@ object JoinColumnFilter extends QueryPlanIndexFilter {
plan: LogicalPlan,
indexes: Seq[IndexLogEntry],
requiredIndexCols: Seq[String],
allRequiredCols: Seq[String]): Seq[IndexLogEntry] = {
allRequiredCols: Seq[String],
leftOrRight: String): Seq[IndexLogEntry] = {
indexes.filter { idx =>
val allCols = idx.derivedDataset.referencedColumns
// All required index columns should match one-to-one with all indexed columns and
// vice-versa. All required columns must be present in the available index columns.
withFilterReasonTag(
plan,
idx,
s"All join condition column should be the indexed columns. " +
s"Join columns: [${requiredIndexCols
.mkString(",")}], Indexed columns: [${idx.indexedColumns.mkString(",")}]") {
FilterReasons.NotAllJoinColIndexed(
leftOrRight,
requiredIndexCols.mkString(","),
idx.indexedColumns.mkString(","))) {
requiredIndexCols.toSet.equals(idx.indexedColumns.toSet)
} &&
withFilterReasonTag(
plan,
idx,
s"Index does not contain all required columns. " +
s"Required columns: [${allRequiredCols.mkString(",")}], " +
s"Index columns: [${(idx.derivedDataset.referencedColumns).mkString(",")}]") {
FilterReasons.MissingIndexedCol(
leftOrRight,
allRequiredCols.mkString(","),
idx.indexedColumns.mkString(","))) {
allRequiredCols.forall(allCols.contains)
}
}
Expand Down Expand Up @@ -523,7 +525,7 @@ object JoinRankFilter extends IndexRankFilter {
setFilterReasonTag(
plan,
indexes.head._2 ++ indexes.last._2,
"No compatible left and right index pair.")
FilterReasons.NoCompatibleJoinIndexPair())
Map.empty
}
}
Expand Down
Loading