From 63bc1c76b536f346a55912fdaeeeebc418418323 Mon Sep 17 00:00:00 2001 From: EJ Song Date: Mon, 12 Jul 2021 09:58:46 -0700 Subject: [PATCH] Introduce whyNot API --- .../com/microsoft/hyperspace/Hyperspace.scala | 13 +- .../hyperspace/index/IndexLogEntry.scala | 13 + .../hyperspace/index/IndexLogEntryTags.scala | 15 +- .../index/covering/FilterIndexRule.scala | 15 +- .../index/covering/JoinIndexRule.scala | 127 ++++--- .../plananalysis/CandidateIndexAnalyzer.scala | 342 ++++++++++++++++++ .../index/plananalysis/FilterReason.scala | 115 ++++++ .../index/plananalysis/FilterReasonCode.scala | 40 ++ .../index/plananalysis/PlanAnalyzer.scala | 4 + .../index/rules/ColumnSchemaFilter.scala | 9 +- .../index/rules/FileSignatureFilter.scala | 37 +- .../index/rules/HyperspaceRule.scala | 13 + .../hyperspace/index/rules/IndexFilter.scala | 32 +- .../index/rules/IndexRankFilter.scala | 5 +- .../index/covering/FilterIndexRuleTest.scala | 29 +- .../index/covering/JoinIndexRuleTest.scala | 79 ++-- .../index/plananalysis/ExplainTest.scala | 2 +- .../rules/CandidateIndexCollectorTest.scala | 49 +-- .../ScoreBasedIndexPlanOptimizerTest.scala | 4 + 19 files changed, 774 insertions(+), 169 deletions(-) create mode 100644 src/main/scala/com/microsoft/hyperspace/index/plananalysis/CandidateIndexAnalyzer.scala create mode 100644 src/main/scala/com/microsoft/hyperspace/index/plananalysis/FilterReason.scala create mode 100644 src/main/scala/com/microsoft/hyperspace/index/plananalysis/FilterReasonCode.scala diff --git a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala index 94f2317d0..fe1c55377 100644 --- a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala +++ b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala @@ -20,7 +20,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession} import com.microsoft.hyperspace.index._ import com.microsoft.hyperspace.index.IndexConstants.{OPTIMIZE_MODE_QUICK, REFRESH_MODE_FULL} 
-import com.microsoft.hyperspace.index.plananalysis.PlanAnalyzer +import com.microsoft.hyperspace.index.plananalysis.{CandidateIndexAnalyzer, PlanAnalyzer} import com.microsoft.hyperspace.index.rules.ApplyHyperspace import com.microsoft.hyperspace.index.sources.FileBasedSourceProviderManager @@ -171,6 +171,17 @@ class Hyperspace(spark: SparkSession) { indexManager.index(indexName) } + def whyNot(df: DataFrame, indexName: String = "", extended: Boolean = false)( + implicit redirectFunc: String => Unit = print): Unit = { + withHyperspaceRuleDisabled { + if (indexName.nonEmpty) { + redirectFunc(CandidateIndexAnalyzer.whyNotIndexString(spark, df, indexName, extended)) + } else { + redirectFunc(CandidateIndexAnalyzer.whyNotIndexesString(spark, df, extended)) + } + } + } + private def withHyperspaceRuleDisabled(f: => Unit): Unit = { try { ApplyHyperspace.disableForIndexMaintenance.set(true) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index 84ea9ca65..c4781dde8 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -527,10 +527,23 @@ case class IndexLogEntry( tags.get((plan, tag)).map(_.asInstanceOf[T]) } + def getTagValueForAllPlan[T](tag: IndexLogEntryTag[T]): Seq[(LogicalPlan, T)] = { + tags.filter(entry => entry._1._2.equals(tag)).toSeq.map { case (k, v) => + (k._1, v.asInstanceOf[T]) + } + } + def unsetTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Unit = { tags.remove((plan, tag)) } + def unsetTagValueForAllPlan[T](tag: IndexLogEntryTag[T]): Unit = { + val plansWithTag = tags.keys.filter(_._2.name.equals(tag.name)).map(_._1) + plansWithTag.foreach { plan => + tags.remove((plan, tag)) + } + } + def withCachedTag[T](plan: LogicalPlan, tag: IndexLogEntryTag[T])(f: => T): T = { getTagValue(plan, tag) match { case Some(v) => v diff --git 
a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala index 4184696d3..ec9e071bb 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala @@ -18,6 +18,8 @@ package com.microsoft.hyperspace.index import org.apache.spark.sql.execution.datasources.InMemoryFileIndex +import com.microsoft.hyperspace.index.plananalysis.FilterReason + object IndexLogEntryTags { // HYBRIDSCAN_REQUIRED indicates if Hybrid Scan is required for the index or not. val HYBRIDSCAN_REQUIRED: IndexLogEntryTag[Boolean] = @@ -55,10 +57,15 @@ object IndexLogEntryTags { IndexLogEntryTag[InMemoryFileIndex]("inMemoryFileIndexHybridScanAppended") // FILTER_REASONS stores reason strings for disqualification. - val FILTER_REASONS: IndexLogEntryTag[Seq[String]] = - IndexLogEntryTag[Seq[String]]("filterReasons") + val FILTER_REASONS: IndexLogEntryTag[Seq[FilterReason]] = + IndexLogEntryTag[Seq[FilterReason]]("filterReasons") + + // APPLIED_INDEX_RULES stores rule's names can apply the index to the plan. + val APPLICABLE_INDEX_RULES: IndexLogEntryTag[Seq[String]] = + IndexLogEntryTag[Seq[String]]("applicableIndexRules") // FILTER_REASONS_ENABLED indicates whether whyNotAPI is enabled or not. - val FILTER_REASONS_ENABLED: IndexLogEntryTag[Boolean] = - IndexLogEntryTag[Boolean]("filterReasonsEnabled") + // If it's enabled, FILTER_REASONS and APPLIED_INDEX_RULES info will be tagged. 
+ val INDEX_PLAN_ANALYSIS_ENABLED: IndexLogEntryTag[Boolean] = + IndexLogEntryTag[Boolean]("indexPlanAnalysisEnabled") } diff --git a/src/main/scala/com/microsoft/hyperspace/index/covering/FilterIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/covering/FilterIndexRule.scala index 8dea6c00e..71a03f335 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/covering/FilterIndexRule.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/covering/FilterIndexRule.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPla import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace} import com.microsoft.hyperspace.index.IndexLogEntryTags +import com.microsoft.hyperspace.index.plananalysis.{FilterReasonCode, FilterReasons} import com.microsoft.hyperspace.index.rules.{HyperspaceRule, IndexRankFilter, QueryPlanIndexFilter} import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap} import com.microsoft.hyperspace.index.sources.FileBasedRelation @@ -96,7 +97,11 @@ object FilterColumnFilter extends QueryPlanIndexFilter { withFilterReasonTag( plan, index, - "The first indexed column should be in filter condition columns.") { + FilterReasons.apply( + FilterReasonCode.NO_FIRST_INDEXED_COL_COND, + Seq( + ("firstIndexedCol", index.derivedDataset.indexedColumns.head), + ("filterColumns", filterColumnNames.mkString(", "))))) { ResolverUtils .resolve(spark, index.derivedDataset.indexedColumns.head, filterColumnNames) .isDefined @@ -104,9 +109,11 @@ object FilterColumnFilter extends QueryPlanIndexFilter { withFilterReasonTag( plan, index, - "Index does not contain required columns. 
Required columns: " + - s"[${(filterColumnNames ++ projectColumnNames).mkString(",")}], Indexed & " + - s"included columns: [${(index.derivedDataset.referencedColumns).mkString(",")}]") { + FilterReasons.apply( + FilterReasonCode.MISSING_REQUIRED_COL, + Seq( + ("requiredCols", (filterColumnNames ++ projectColumnNames).toSet.mkString(",")), + ("indexCols", index.derivedDataset.referencedColumns.mkString(","))))) { ResolverUtils .resolve( spark, diff --git a/src/main/scala/com/microsoft/hyperspace/index/covering/JoinIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/covering/JoinIndexRule.scala index 058afa230..ceb265ff8 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/covering/JoinIndexRule.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/covering/JoinIndexRule.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} import com.microsoft.hyperspace.Hyperspace import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags} import com.microsoft.hyperspace.index.covering.JoinAttributeFilter.extractConditions +import com.microsoft.hyperspace.index.plananalysis.{FilterReasonCode, FilterReasons} import com.microsoft.hyperspace.index.rules.{HyperspaceRule, IndexRankFilter, QueryPlanIndexFilter} import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap} import com.microsoft.hyperspace.index.sources.FileBasedRelation @@ -65,17 +66,28 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter { val joinConditionCond = withFilterReasonTag( plan, leftAndRightIndexes, - "Join condition is not eligible. Equi-Joins in simple CNF form are supported. 
" + - "Literal is not supported.") { + FilterReasons.apply( + FilterReasonCode.NOT_ELIGIBLE_JOIN, + Seq(("reason", "Non equi-join or has literal")))) { isJoinConditionSupported(condition) } val leftPlanLinearCond = - withFilterReasonTag(plan, leftAndRightIndexes, "Left child is not a linear plan.") { + withFilterReasonTag( + plan, + leftAndRightIndexes, + FilterReasons.apply( + FilterReasonCode.NOT_ELIGIBLE_JOIN, + Seq(("reason", "Non linear left child plan")))) { isPlanLinear(l) } val rightPlanLinearCond = - withFilterReasonTag(plan, leftAndRightIndexes, "Right child is not a linear plan.") { + withFilterReasonTag( + plan, + leftAndRightIndexes, + FilterReasons.apply( + FilterReasonCode.NOT_ELIGIBLE_JOIN, + Seq(("reason", "Non linear right child plan")))) { isPlanLinear(r) } @@ -96,7 +108,9 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter { setFilterReasonTag( plan, candidateIndexes.values.flatten.toSeq, - "Not eligible Join - no join condition.") + FilterReasons.apply( + FilterReasonCode.NOT_ELIGIBLE_JOIN, + Seq(("reason", "No join condition")))) Map.empty case _ => Map.empty @@ -164,10 +178,9 @@ object JoinAttributeFilter extends QueryPlanIndexFilter { if (withFilterReasonTag( plan, candidateIndexes.head._2 ++ candidateIndexes.last._2, - "Each join condition column should come from " + - "relations directly and attributes from left plan must exclusively have " + - "one-to-one mapping with attributes from right plan. " + - "E.g. 
join(A = B and A = D) is not eligible.") { + FilterReasons.apply( + FilterReasonCode.NOT_ELIGIBLE_JOIN, + Seq(("reason", "incompatible left and right join columns")))) { ensureAttributeRequirements( JoinIndexRule.leftRelation.get, JoinIndexRule.rightRelation.get, @@ -329,39 +342,39 @@ object JoinColumnFilter extends QueryPlanIndexFilter { val lRequiredAllCols = resolve(spark, allRequiredCols(l), lBaseAttrs).get val rRequiredAllCols = resolve(spark, allRequiredCols(r), rBaseAttrs).get + // Make sure required indexed columns are subset of all required columns. + assert( + resolve(spark, lRequiredIndexedCols, lRequiredAllCols).isDefined && + resolve(spark, rRequiredIndexedCols, rRequiredAllCols).isDefined) + + val lIndexes = + getUsableIndexes( + plan, + candidateIndexes.getOrElse(leftRelation.plan, Nil), + lRequiredIndexedCols, + lRequiredAllCols, + "left") + val rIndexes = + getUsableIndexes( + plan, + candidateIndexes.getOrElse(rightRelation.plan, Nil), + rRequiredIndexedCols, + rRequiredAllCols, + "right") + if (withFilterReasonTag( plan, candidateIndexes.head._2 ++ candidateIndexes.last._2, - "Invalid query plan.") { - // Make sure required indexed columns are subset of all required columns. 
- resolve(spark, lRequiredIndexedCols, lRequiredAllCols).isDefined && - resolve(spark, rRequiredIndexedCols, rRequiredAllCols).isDefined - }) { - val lIndexes = - getUsableIndexes( - plan, - candidateIndexes.getOrElse(leftRelation.plan, Nil), - lRequiredIndexedCols, - lRequiredAllCols) - val rIndexes = - getUsableIndexes( - plan, - candidateIndexes.getOrElse(rightRelation.plan, Nil), - rRequiredIndexedCols, - rRequiredAllCols) - - if (withFilterReasonTag( - plan, - candidateIndexes.head._2 ++ candidateIndexes.last._2, - "No available indexes for left subplan.")(lIndexes.nonEmpty) && - withFilterReasonTag( - plan, - candidateIndexes.head._2 ++ candidateIndexes.last._2, - "No available indexes for right subplan.")(rIndexes.nonEmpty)) { - Map(leftRelation.plan -> lIndexes, rightRelation.plan -> rIndexes) - } else { - Map.empty - } + FilterReasons.apply( + FilterReasonCode.NO_AVAIL_JOIN_INDEX_PAIR, + Seq(("child", "left"))))(lIndexes.nonEmpty) && + withFilterReasonTag( + plan, + candidateIndexes.head._2 ++ candidateIndexes.last._2, + FilterReasons.apply( + FilterReasonCode.NO_AVAIL_JOIN_INDEX_PAIR, + Seq(("child", "right"))))(rIndexes.nonEmpty)) { + Map(leftRelation.plan -> lIndexes, rightRelation.plan -> rIndexes) } else { Map.empty } @@ -404,24 +417,12 @@ object JoinColumnFilter extends QueryPlanIndexFilter { }.toMap } - /** - * Get usable indexes which satisfy indexed and included column requirements. - * - * Pre-requisite: the indexed and included columns required must be already resolved with their - * corresponding base relation columns at this point. - * - * @param plan Query plan - * @param indexes All available indexes for the logical plan - * @param requiredIndexCols required indexed columns resolved with their base relation column. - * @param allRequiredCols required included columns resolved with their base relation column. 
- * @return Indexes which satisfy the indexed and covering column requirements from the logical - * plan and join condition - */ private def getUsableIndexes( plan: LogicalPlan, indexes: Seq[IndexLogEntry], requiredIndexCols: Seq[String], - allRequiredCols: Seq[String]): Seq[IndexLogEntry] = { + allRequiredCols: Seq[String], + leftOrRight: String): Seq[IndexLogEntry] = { indexes.filter { idx => val allCols = idx.derivedDataset.referencedColumns // All required index columns should match one-to-one with all indexed columns and @@ -429,17 +430,23 @@ object JoinColumnFilter extends QueryPlanIndexFilter { withFilterReasonTag( plan, idx, - s"All join condition column should be the indexed columns. " + - s"Join columns: [${requiredIndexCols - .mkString(",")}], Indexed columns: [${idx.indexedColumns.mkString(",")}]") { + FilterReasons.apply( + FilterReasonCode.NOT_ALL_JOIN_COL_INDEXED, + Seq( + ("child", leftOrRight), + ("joinCols", requiredIndexCols.mkString(", ")), + ("indexedCols", idx.indexedColumns.mkString(", "))))) { requiredIndexCols.toSet.equals(idx.indexedColumns.toSet) } && withFilterReasonTag( plan, idx, - s"Index does not contain all required columns. 
" + - s"Required columns: [${allRequiredCols.mkString(",")}], " + - s"Index columns: [${(idx.derivedDataset.referencedColumns).mkString(",")}]") { + FilterReasons.apply( + FilterReasonCode.MISSING_INDEXED_COL, + Seq( + ("child", leftOrRight), + ("requiredIndexedCols", allRequiredCols.mkString(", ")), + ("IndexedCols", idx.indexedColumns.mkString(", "))))) { allRequiredCols.forall(allCols.contains) } } @@ -523,7 +530,7 @@ object JoinRankFilter extends IndexRankFilter { setFilterReasonTag( plan, indexes.head._2 ++ indexes.last._2, - "No compatible left and right index pair.") + FilterReasons.apply(FilterReasonCode.NO_COMPATIBLE_JOIN_INDEX_PAIR, Seq.empty)) Map.empty } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/plananalysis/CandidateIndexAnalyzer.scala b/src/main/scala/com/microsoft/hyperspace/index/plananalysis/CandidateIndexAnalyzer.scala new file mode 100644 index 000000000..ce36e771d --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/plananalysis/CandidateIndexAnalyzer.scala @@ -0,0 +1,342 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.microsoft.hyperspace.index.plananalysis + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.hyperspace.utils.DataFrameUtils + +import com.microsoft.hyperspace.{Hyperspace, HyperspaceException} +import com.microsoft.hyperspace.actions.Constants +import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags} +import com.microsoft.hyperspace.index.rules.{CandidateIndexCollector, ScoreBasedIndexPlanOptimizer} + +object CandidateIndexAnalyzer extends Logging { + def whyNotIndexString( + spark: SparkSession, + df: DataFrame, + indexName: String, + extended: Boolean): String = { + val (planWithHyperspace, filterReasons, applicableIndexes) = + collectAnalysisResult(spark, df, indexName) + generateWhyNotString( + spark, + df.queryExecution.optimizedPlan, + planWithHyperspace, + filterReasons, + applicableIndexes, + extended) + } + + def whyNotIndexesString(spark: SparkSession, df: DataFrame, extended: Boolean): String = { + val (planWithHyperspace, filterReasons, applicableIndexes) = + collectAnalysisResult(spark, df) + generateWhyNotString( + spark, + df.queryExecution.optimizedPlan, + planWithHyperspace, + filterReasons, + applicableIndexes, + extended) + } + + def applicableIndexInfoString(spark: SparkSession, df: DataFrame): String = { + val (_, applicableIndexes) = collectApplicableIndexInfo(spark, df) + generateApplicableIndexInfoString(spark, df.queryExecution.optimizedPlan, applicableIndexes) + } + + private def cleanupAnalysisTags(indexes: Seq[IndexLogEntry]): Unit = { + indexes.foreach { index => + index.unsetTagValueForAllPlan(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED) + index.unsetTagValueForAllPlan(IndexLogEntryTags.FILTER_REASONS) + index.unsetTagValueForAllPlan(IndexLogEntryTags.APPLICABLE_INDEX_RULES) + } + } + + private def prepareTagsForAnalysis(indexes: Seq[IndexLogEntry]): Unit = { + 
indexes.foreach { index => + index.setTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED, true) + + // Clean up previous reason tags. + index.unsetTagValueForAllPlan(IndexLogEntryTags.FILTER_REASONS) + index.unsetTagValueForAllPlan(IndexLogEntryTags.APPLICABLE_INDEX_RULES) + } + } + + private def getSubPlanLoc( + originalPlanStrings: Seq[String], + plan: LogicalPlan, + numberSpaces: Int): String = { + val planStrLines = plan.toString.split('\n') + + val firstLine = planStrLines.head + val firstLineStr = originalPlanStrings.find { s => + s.endsWith(firstLine) && + s.substring(numberSpaces, s.length - firstLine.length).distinct.forall(" +-:".contains(_)) + } + val numberRegex = raw"([0-9]+)".r + val startLineNumber = if (firstLineStr.isDefined) { + numberRegex.findFirstIn(firstLineStr.get).get.toInt + } else { + -1 + } + s"${plan.nodeName} @ $startLineNumber" + } + + private def generateApplicableIndexInfoString( + spark: SparkSession, + planWithoutHyperspace: LogicalPlan, + applicableIndexes: Seq[(IndexLogEntry, Seq[(LogicalPlan, Seq[String])])]): String = { + val stringBuilder = new StringBuilder + val originalPlanString = planWithoutHyperspace.numberedTreeString.split('\n') + if (applicableIndexes.isEmpty) { + return "No applicable indexes. 
Try hyperspace.whyNot()" + } + val newLine = System.lineSeparator() + stringBuilder.append("Plan without Hyperspace:") + stringBuilder.append(newLine) + stringBuilder.append(newLine) + stringBuilder.append(originalPlanString.mkString(newLine)) + stringBuilder.append(newLine) + stringBuilder.append(newLine) + + // to Dataframe + // sub plan line number, index name, rule name + val numberSpaces = originalPlanString.length.toString.length + 1 + val res = applicableIndexes + .flatMap { + case (index: IndexLogEntry, applicableRules: Seq[(LogicalPlan, Seq[String])]) => + applicableRules.flatMap { + case (plan, ruleNames) => + val subPlanLocStr = getSubPlanLoc(originalPlanString, plan, numberSpaces) + ruleNames.map { ruleName => + (subPlanLocStr, index.name, index.derivedDataset.kindAbbr, ruleName) + } + } + } + .sortBy(r => (r._1, r._3)) + .distinct + + import spark.implicits._ + val df = res.toDF("SubPlan", "IndexName", "IndexType", "RuleName") + + // '\n' is used in `showString` as a delimiter for newlines. 
+    df.showString(res.size, truncate = 0).split('\n').foreach { l =>
+      stringBuilder.append(l)
+      stringBuilder.append(newLine)
+    }
+    stringBuilder.toString
+  }
+
+  /**
+   * Build the whyNot report: the plan with Hyperspace applied, a summary of applied /
+   * applicable / non-applicable index names, the numbered original plan and a table of
+   * filter reasons.
+   *
+   * @param spark Spark session, used to render the reason table.
+   * @param planWithoutHyperspace Original optimized plan; its numbered tree string is printed
+   *                              and used to locate sub plans.
+   * @param planWithHyperspace Plan returned by the index optimizer.
+   * @param filterReasons Per index, the filter reasons tagged for each sub plan.
+   * @param applicableIndexes Per index, the applicable rule names tagged for each sub plan.
+   * @param extended If true, keep the verbose message column and COL_SCHEMA_MISMATCH rows.
+   * @return The report string.
+   */
+  private def generateWhyNotString(
+      spark: SparkSession,
+      planWithoutHyperspace: LogicalPlan,
+      planWithHyperspace: LogicalPlan,
+      filterReasons: Seq[(IndexLogEntry, Seq[(LogicalPlan, Seq[FilterReason])])],
+      applicableIndexes: Seq[(IndexLogEntry, Seq[(LogicalPlan, Seq[String])])],
+      extended: Boolean = false): String = {
+    val stringBuilder = new StringBuilder
+    val originalPlanString = planWithoutHyperspace.numberedTreeString.split('\n')
+    val newLine = System.lineSeparator()
+    stringBuilder.append(newLine)
+    stringBuilder.append("=============================================================")
+    stringBuilder.append(newLine)
+    stringBuilder.append("Plan with Hyperspace & Summary:")
+    stringBuilder.append(newLine)
+    stringBuilder.append("=============================================================")
+    stringBuilder.append(newLine)
+    stringBuilder.append(planWithHyperspace.toString)
+    stringBuilder.append(newLine)
+    stringBuilder.append(newLine)
+
+    // Appends one "- name" line per index, or a placeholder line if there is none.
+    def printIndexNames(indexNames: Seq[String]): Unit = {
+      indexNames.foreach { idxName =>
+        stringBuilder.append(s"- $idxName")
+        stringBuilder.append(newLine)
+      }
+      if (indexNames.isEmpty) {
+        stringBuilder.append("- No such index found.")
+      }
+      stringBuilder.append(newLine)
+    }
+
+    stringBuilder.append("Applied indexes:")
+    stringBuilder.append(newLine)
+    // Applied index names are recovered from the string form of the transformed plan.
+    val indexNameRegex = "Hyperspace\\(Type: .+, Name: (.+), LogVersion: [0-9]+\\)".r
+    val appliedIndexNames = indexNameRegex
+      .findAllIn(planWithHyperspace.toString)
+      .matchData
+      .map { matchStr =>
+        matchStr.group(1)
+      }
+      .toSeq
+    printIndexNames(appliedIndexNames)
+
+    stringBuilder.append("Applicable indexes, but not applied due to priority:")
+    stringBuilder.append(newLine)
+    // applicableIndexes carries an entry for every analyzed index, including indexes for
+    // which no rule was tagged; only indexes with at least one tagged rule are applicable.
+    val applicableButNotAppliedIndexNames = applicableIndexes
+      .collect {
+        case (index, rulesForPlans) if rulesForPlans.exists(_._2.nonEmpty) => index.name
+      }
+      .distinct
+      .filterNot(appliedIndexNames.contains(_))
+    printIndexNames(applicableButNotAppliedIndexNames)
+
+    // Convert reasons to Dataframe rows
+    // (sub plan location string, index name, index type, reason code, arg strings, verbose string)
+    val numberSpaces = originalPlanString.length.toString.length + 1
+    val res = filterReasons
+      .flatMap {
+        case (index: IndexLogEntry, reasonsForIndex: Seq[(LogicalPlan, Seq[FilterReason])]) =>
+          reasonsForIndex.flatMap {
+            case (plan, reasons) =>
+              val subPlanLocStr = getSubPlanLoc(originalPlanString, plan, numberSpaces)
+              reasons.map { reason =>
+                (
+                  subPlanLocStr,
+                  index.name,
+                  index.derivedDataset.kindAbbr,
+                  reason.code,
+                  reason.argStr,
+                  reason.verboseStr)
+              }
+          }
+      }
+      .sortBy(r => (r._1, r._3))
+      .distinct
+
+    import spark.implicits._
+    val df = res.toDF("SubPlan", "IndexName", "IndexType", "Reason", "Message", "VerboseMessage")
+
+    val outputDf = if (extended) {
+      df
+    } else {
+      df.drop("VerboseMessage")
+        .filter(!$"Reason".like("COL_SCHEMA_MISMATCH"))
+    }
+
+    // Rows are classified by reason code (4th field). The 5th field is the "key=[value]"
+    // argument string, so it cannot be compared against a filter name.
+    // NOTE(review): the codes below are assumed to be the ones tagged by FileSignatureFilter
+    // - confirm against that filter.
+    val outdatedReasonCodes = Set(
+      FilterReasonCode.SOURCE_DATA_CHANGE,
+      FilterReasonCode.NO_DELETE_SUPPORT,
+      FilterReasonCode.NO_COMMON_FILES,
+      FilterReasonCode.TOO_MUCH_APPENDED,
+      FilterReasonCode.TOO_MUCH_DELETED).map(_.toString)
+    val schemaMismatchCode = FilterReasonCode.COL_SCHEMA_MISMATCH.toString
+
+    stringBuilder.append("Non-applicable indexes - index is outdated:")
+    stringBuilder.append(newLine)
+    val indexNamesForOutdated = res
+      .filter(row => outdatedReasonCodes.contains(row._4))
+      .map(_._2)
+      .distinct
+      .filterNot(appliedIndexNames.contains(_))
+      .filterNot(applicableButNotAppliedIndexNames.contains(_))
+    printIndexNames(indexNamesForOutdated)
+
+    stringBuilder.append(newLine)
+    stringBuilder.append("Non-applicable indexes - no applicable query plan:")
+    stringBuilder.append(newLine)
+    val indexNamesForNoApplicablePlan = res
+      .filterNot(row => row._4.equals(schemaMismatchCode) || outdatedReasonCodes.contains(row._4))
+      .map(_._2)
+      .distinct
+      .filterNot(appliedIndexNames.contains(_))
+      .filterNot(applicableButNotAppliedIndexNames.contains(_))
+    printIndexNames(indexNamesForNoApplicablePlan)
+    stringBuilder.append(newLine)
+
+    stringBuilder.append("For more information, please visit: ")
+    stringBuilder.append("https://microsoft.github.io/hyperspace/docs/why-not-result-analysis")
+
+    stringBuilder.append(newLine)
+    stringBuilder.append(newLine)
+    stringBuilder.append("=============================================================")
+    stringBuilder.append(newLine)
+    stringBuilder.append("Plan without Hyperspace & WhyNot reasons:")
+    stringBuilder.append(newLine)
+    stringBuilder.append("=============================================================")
+    stringBuilder.append(newLine)
+    stringBuilder.append(originalPlanString.mkString(newLine))
+    stringBuilder.append(newLine)
+    stringBuilder.append(newLine)
+
+    // '\n' is used in `showString` as a delimiter for newlines.
+    outputDf.showString(res.size, truncate = 0).split('\n').foreach { l =>
+      stringBuilder.append(l)
+      stringBuilder.append(newLine)
+    }
+    stringBuilder.toString
+  }
+
+  /**
+   * Run the analysis over all ACTIVE indexes and keep only the transformed plan and the
+   * applicable rule info.
+   */
+  private def collectApplicableIndexInfo(
+      spark: SparkSession,
+      df: DataFrame): (LogicalPlan, Seq[(IndexLogEntry, Seq[(LogicalPlan, Seq[String])])]) = {
+    val (transformedPlan, _, applicableIndexInfo) = collectAnalysisResult(spark, df)
+    (transformedPlan, applicableIndexInfo)
+  }
+
+  /**
+   * Analyze the optimized plan of df against all ACTIVE indexes.
+   *
+   * Throws HyperspaceException if there is no ACTIVE index.
+   */
+  private def collectAnalysisResult(spark: SparkSession, df: DataFrame): (
+      LogicalPlan,
+      Seq[(IndexLogEntry, Seq[(LogicalPlan, Seq[FilterReason])])],
+      Seq[(IndexLogEntry, Seq[(LogicalPlan, Seq[String])])]) = {
+    val plan = df.queryExecution.optimizedPlan
+
+    val indexManager = Hyperspace.getContext(spark).indexCollectionManager
+    val allActiveIndexes = indexManager.getIndexes(Seq(Constants.States.ACTIVE))
+
+    if (allActiveIndexes.isEmpty) {
+      throw HyperspaceException(s"No available ACTIVE indexes")
+    }
+    applyHyperspaceForAnalysis(plan, allActiveIndexes)
+  }
+
+  /**
+   * Analyze the optimized plan of df against all ACTIVE indexes, reporting filter reasons
+   * only for the index named indexName.
+   *
+   * Throws HyperspaceException if the named index is not found or is not ACTIVE.
+   */
+  private def collectAnalysisResult(spark: SparkSession, df: DataFrame, indexName: String): (
+      LogicalPlan,
+      Seq[(IndexLogEntry, Seq[(LogicalPlan, Seq[FilterReason])])],
+      Seq[(IndexLogEntry, Seq[(LogicalPlan, Seq[String])])]) = {
+    val plan = df.queryExecution.optimizedPlan
+
+    val indexManager = Hyperspace.getContext(spark).indexCollectionManager
+    val allIndexes =
+      indexManager.getIndexes().filter(!_.state.equals(Constants.States.DOESNOTEXIST))
+    val targetIndex = allIndexes.find(index => index.name.equals(indexName))
+
+    if (targetIndex.isEmpty) {
+      throw HyperspaceException(s"Index with name $indexName could not be found.")
+    } else if (!targetIndex.get.state.equals(Constants.States.ACTIVE)) {
+      throw HyperspaceException(
+        s"Index with name $indexName is not ACTIVE state: ${targetIndex.get.state}")
+    }
+
+    // All ACTIVE indexes take part in the optimization; indexName only narrows the reasons.
+    val allActiveIndexes = allIndexes.filter(_.state.equals(Constants.States.ACTIVE))
+    applyHyperspaceForAnalysis(plan, allActiveIndexes, Some(indexName))
+  }
+
+  /**
+   * Tag the indexes for analysis, run candidate collection and the score based optimizer,
+   * and read back the FILTER_REASONS and APPLICABLE_INDEX_RULES tags.
+   *
+   * The analysis tags are removed in the finally block, whether or not the run succeeds.
+   */
+  private def applyHyperspaceForAnalysis(
+      plan: LogicalPlan,
+      indexes: Seq[IndexLogEntry],
+      indexName: Option[String] = None): (
+      LogicalPlan,
+      Seq[(IndexLogEntry, Seq[(LogicalPlan, Seq[FilterReason])])],
+      Seq[(IndexLogEntry, Seq[(LogicalPlan, Seq[String])])]) = {
+    try {
+      prepareTagsForAnalysis(indexes)
+      val candidateIndexes = CandidateIndexCollector.apply(plan, indexes)
+      val transformedPlan = new ScoreBasedIndexPlanOptimizer().apply(plan, candidateIndexes)
+      (
+        transformedPlan,
+        indexes
+          .filter(i => indexName.isEmpty || indexName.get.equals(i.name))
+          .map(i => (i, i.getTagValueForAllPlan(IndexLogEntryTags.FILTER_REASONS))),
+        indexes
+          .map(i => (i, i.getTagValueForAllPlan(IndexLogEntryTags.APPLICABLE_INDEX_RULES))))
+    } finally {
+      cleanupAnalysisTags(indexes)
+    }
+  }
+
+}
diff --git a/src/main/scala/com/microsoft/hyperspace/index/plananalysis/FilterReason.scala b/src/main/scala/com/microsoft/hyperspace/index/plananalysis/FilterReason.scala
new file mode 100644
index 000000000..f723464d3
--- /dev/null
+++ b/src/main/scala/com/microsoft/hyperspace/index/plananalysis/FilterReason.scala
@@ -0,0 +1,115 @@
+/*
+ * Copyright (2020) The Hyperspace Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.microsoft.hyperspace.index.plananalysis
+
+import com.microsoft.hyperspace.index.plananalysis.FilterReasonCode._
+
+/**
+ * A reason code with its arguments and a verbose message.
+ *
+ * argStrings and verboseString are by-name parameters: they are evaluated only when argStr
+ * or verboseStr is called, and again on every call.
+ *
+ * @param code String form of the reason code.
+ * @param argStrings (key, value) pairs describing the reason.
+ * @param verboseString Human readable message.
+ */
+class FilterReason(
+    val code: String,
+    argStrings: => Seq[(String, String)],
+    verboseString: => String) {
+
+  /** Arguments rendered as key1=[value1],key2=[value2] (comma separated, no space). */
+  def argStr: String = {
+    argStrings.map(kv => s"${kv._1}=[${kv._2}]").mkString(",")
+  }
+
+  /** The verbose message. */
+  def verboseStr: String = {
+    verboseString
+  }
+}
+
+object FilterReasons {
+
+  /**
+   * Create the FilterReason for the given code.
+   *
+   * The verbose message reads argStrings by position, so the order of the pairs passed by
+   * the caller must match the positions used for that code below.
+   *
+   * @param code Reason code.
+   * @param argStrings (key, value) pairs for the code; evaluated lazily.
+   * @return FilterReason carrying the code's string form, the arguments and the message.
+   */
+  def apply(code: FilterReasonCode, argStrings: => Seq[(String, String)]): FilterReason = {
+    code match {
+      case COL_SCHEMA_MISMATCH =>
+        new FilterReason(
+          code.toString,
+          argStrings,
+          "Column Schema does not match. Source data columns: [" + argStrings(0)._2 +
+            "], Index columns: [" + argStrings(1)._2 + "]")
+      case SOURCE_DATA_CHANGE =>
+        new FilterReason(code.toString, argStrings, "Index signature does not match.")
+      case NO_DELETE_SUPPORT =>
+        new FilterReason(code.toString, argStrings, "Index doesn't support deleted files.")
+      case NO_COMMON_FILES =>
+        new FilterReason(code.toString, argStrings, "No common files.")
+      case TOO_MUCH_APPENDED =>
+        new FilterReason(
+          code.toString,
+          argStrings,
+          s"Appended bytes ratio (${argStrings(0)._2}) is larger than " +
+            s"threshold config (${argStrings(1)._2}). ")
+      case TOO_MUCH_DELETED =>
+        new FilterReason(
+          code.toString,
+          argStrings,
+          s"Deleted bytes ratio (${argStrings(0)._2}) is larger than " +
+            s"threshold config (${argStrings(1)._2}). ")
+      case NO_FIRST_INDEXED_COL_COND =>
+        new FilterReason(
+          code.toString,
+          argStrings,
+          "The first indexed column should be used in filter conditions. " +
+            s"The first indexed column: ${argStrings(0)._2}, " +
+            s"Columns in filter condition: [${argStrings(1)._2}]")
+      case MISSING_REQUIRED_COL =>
+        new FilterReason(
+          code.toString,
+          argStrings,
+          s"Index does not contain required column. Required columns: [${argStrings(0)._2}], " +
+            s"Index columns: [${argStrings(1)._2}]")
+      case NOT_ELIGIBLE_JOIN =>
+        new FilterReason(
+          code.toString,
+          argStrings,
+          s"Join condition is not eligible. Reason: ${argStrings(0)._2}")
+      case NO_AVAIL_JOIN_INDEX_PAIR =>
+        new FilterReason(
+          code.toString,
+          argStrings,
+          s"No available indexes for ${argStrings(0)._2} subplan. " +
+            "Both left and right index are required for Join query")
+      case NOT_ALL_JOIN_COL_INDEXED =>
+        // Arguments are (child, joinCols, indexedCols); position 0 is the child side.
+        new FilterReason(
+          code.toString,
+          argStrings,
+          s"All join condition column should be the indexed columns. " +
+            s"Join columns: [${argStrings(1)._2}], Indexed columns: [${argStrings(2)._2}]")
+      case MISSING_INDEXED_COL =>
+        new FilterReason(
+          code.toString,
+          argStrings,
+          s"Index does not contain required columns for ${argStrings(0)._2} subplan. " +
+            s"Required indexed columns: [${argStrings(1)._2}], " +
+            s"Indexed columns: [${argStrings(2)._2}]")
+      case NO_COMPATIBLE_JOIN_INDEX_PAIR =>
+        new FilterReason(
+          code.toString,
+          argStrings,
+          "No compatible left and right index pair."
+        )
+      case ANOTHER_INDEX_APPLIED =>
+        new FilterReason(
+          code.toString,
+          argStrings,
+          s"Another candidate index is applied: ${argStrings(0)._2}"
+        )
+
+    }
+  }
+}
diff --git a/src/main/scala/com/microsoft/hyperspace/index/plananalysis/FilterReasonCode.scala b/src/main/scala/com/microsoft/hyperspace/index/plananalysis/FilterReasonCode.scala
new file mode 100644
index 000000000..7d1a437c7
--- /dev/null
+++ b/src/main/scala/com/microsoft/hyperspace/index/plananalysis/FilterReasonCode.scala
@@ -0,0 +1,40 @@
+/*
+ * Copyright (2020) The Hyperspace Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + + package com.microsoft.hyperspace.index.plananalysis + + object FilterReasonCode extends Enumeration { + type FilterReasonCode = Value + // Each name must equal its identifier: FilterReason is built with code.toString. + val COL_SCHEMA_MISMATCH = Value("COL_SCHEMA_MISMATCH") + val SOURCE_DATA_CHANGE = Value("SOURCE_DATA_CHANGE") + val NO_DELETE_SUPPORT = Value("NO_DELETE_SUPPORT") + val NO_COMMON_FILES = Value("NO_COMMON_FILES") + val TOO_MUCH_APPENDED = Value("TOO_MUCH_APPENDED") + val TOO_MUCH_DELETED = Value("TOO_MUCH_DELETED") + val ANOTHER_INDEX_APPLIED = Value("ANOTHER_INDEX_APPLIED") + + // CoveringIndex - FilterIndexRule + val NO_FIRST_INDEXED_COL_COND = Value("NO_FIRST_INDEXED_COL_COND") + val MISSING_REQUIRED_COL = Value("MISSING_REQUIRED_COL") + + // CoveringIndex - JoinIndexRule + val NOT_ELIGIBLE_JOIN = Value("NOT_ELIGIBLE_JOIN") + val NO_AVAIL_JOIN_INDEX_PAIR = Value("NO_AVAIL_JOIN_INDEX_PAIR") + val NO_COMPATIBLE_JOIN_INDEX_PAIR = Value("NO_COMPATIBLE_JOIN_INDEX_PAIR") + val NOT_ALL_JOIN_COL_INDEXED = Value("NOT_ALL_JOIN_COL_INDEXED") + val MISSING_INDEXED_COL = Value("MISSING_INDEXED_COL") +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/plananalysis/PlanAnalyzer.scala b/src/main/scala/com/microsoft/hyperspace/index/plananalysis/PlanAnalyzer.scala index 3fbe2b132..a9336bf96 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/plananalysis/PlanAnalyzer.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/plananalysis/PlanAnalyzer.scala @@ -125,6 +125,10 @@ object PlanAnalyzer { outputStream, spark) outputStream.writeLine() + + buildHeader(outputStream, "Applicable indexes:") + outputStream.write(CandidateIndexAnalyzer.applicableIndexInfoString(spark, df)) + outputStream.writeLine() } outputStream.withTag() diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/ColumnSchemaFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/ColumnSchemaFilter.scala index b18ae27f6..ec2427587 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/ColumnSchemaFilter.scala +++ 
b/src/main/scala/com/microsoft/hyperspace/index/rules/ColumnSchemaFilter.scala @@ -19,6 +19,7 @@ package com.microsoft.hyperspace.index.rules import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import com.microsoft.hyperspace.index.IndexLogEntry +import com.microsoft.hyperspace.index.plananalysis.{FilterReasonCode, FilterReasons} import com.microsoft.hyperspace.util.ResolverUtils /** @@ -32,9 +33,11 @@ object ColumnSchemaFilter extends SourcePlanIndexFilter { withFilterReasonTag( plan, index, - "Column Schema does not match. " + - s"Relation columns: [${relationColumnNames.mkString(", ")}], " + - s"Index columns: [${(index.derivedDataset.referencedColumns).mkString(", ")}]") { + FilterReasons.apply( + FilterReasonCode.COL_SCHEMA_MISMATCH, + Seq( + ("sourceColumns", relationColumnNames.mkString(", ")), + ("indexColumns", index.derivedDataset.referencedColumns.mkString(", "))))) { ResolverUtils .resolve(spark, index.derivedDataset.referencedColumns, relationColumnNames) .isDefined diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/FileSignatureFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/FileSignatureFilter.scala index df9733d6f..6eff030c1 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/FileSignatureFilter.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/FileSignatureFilter.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import com.microsoft.hyperspace.Hyperspace import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags, LogicalPlanSignatureProvider} import com.microsoft.hyperspace.index.IndexLogEntryTags.{HYBRIDSCAN_RELATED_CONFIGS, IS_HYBRIDSCAN_CANDIDATE} +import com.microsoft.hyperspace.index.plananalysis.{FilterReasonCode, FilterReasons} import com.microsoft.hyperspace.index.sources.FileBasedRelation import com.microsoft.hyperspace.util.HyperspaceConf @@ -62,7 +63,7 @@ object FileSignatureFilter extends SourcePlanIndexFilter { 
withFilterReasonTag( plan, index, - s"Index signature does not match. Try Hybrid Scan or refreshIndex.") { + FilterReasons.apply(FilterReasonCode.SOURCE_DATA_CHANGE, Seq.empty)) { signatureValid(relation, index, signatureMap) } } @@ -133,22 +134,40 @@ object FileSignatureFilter extends SourcePlanIndexFilter { // Tag to original index log entry to check the reason string with the given log entry. lazy val hasLineageColumnCond = - withFilterReasonTag(relation.plan, index, "Index doesn't support deleted files.")( - entry.derivedDataset.canHandleDeletedFiles) + withFilterReasonTag( + relation.plan, + index, + FilterReasons.apply(FilterReasonCode.NO_DELETE_SUPPORT, Seq.empty)) { + entry.derivedDataset.canHandleDeletedFiles + } lazy val hasCommonFilesCond = - withFilterReasonTag(relation.plan, index, "No common files.")(commonCnt > 0) + withFilterReasonTag( + relation.plan, + index, + FilterReasons.apply(FilterReasonCode.NO_COMMON_FILES, Seq.empty)) { + commonCnt > 0 + } + + val hybridScanAppendThreshold = HyperspaceConf.hybridScanAppendedRatioThreshold(spark) + val hybridScanDeleteThreshold = HyperspaceConf.hybridScanDeletedRatioThreshold(spark) lazy val appendThresholdCond = withFilterReasonTag( relation.plan, index, - s"Appended bytes ratio ($appendedBytesRatio) is larger than " + - s"threshold config ${HyperspaceConf.hybridScanAppendedRatioThreshold(spark)}") { - appendedBytesRatio < HyperspaceConf.hybridScanAppendedRatioThreshold(spark) + FilterReasons.apply( + FilterReasonCode.TOO_MUCH_APPENDED, + Seq( + ("appendedRatio", appendedBytesRatio.toString), + ("hybridScanAppendThreshold", hybridScanAppendThreshold.toString)))) { + appendedBytesRatio < hybridScanAppendThreshold } lazy val deleteThresholdCond = withFilterReasonTag( relation.plan, index, - s"Deleted bytes ratio ($deletedBytesRatio) is larger than " + - s"threshold config ${HyperspaceConf.hybridScanDeletedRatioThreshold(spark)}") { + FilterReasons.apply( + FilterReasonCode.TOO_MUCH_DELETED, + Seq( + 
("deletedRatio", deletedBytesRatio.toString), + ("hybridScanDeleteThreshold", hybridScanDeleteThreshold.toString)))) { - deletedBytesRatio < HyperspaceConf.hybridScanDeletedRatioThreshold(spark) + deletedBytesRatio < hybridScanDeleteThreshold } diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/HyperspaceRule.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/HyperspaceRule.scala index 5fbcfaab6..ab017cf8f 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/HyperspaceRule.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/HyperspaceRule.scala @@ -19,6 +19,7 @@ package com.microsoft.hyperspace.index.rules import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import com.microsoft.hyperspace.ActiveSparkSession +import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags} import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap} /** @@ -70,9 +71,28 @@ trait HyperspaceRule extends ActiveSparkSession { if (applicableIndexes.nonEmpty) { val selectedIndexes = indexRanker(plan, applicableIndexes) + selectedIndexes.values.toSeq.distinct.foreach(setApplicableIndexTag(plan, _)) (applyIndex(plan, selectedIndexes), score(plan, selectedIndexes)) } else { (plan, 0) } } + + /** + * Append this rule's name to the APPLICABLE_INDEX_RULES tag of the given index for the plan, + * if the INDEX_PLAN_ANALYSIS_ENABLED tag is set to true on the index. + * + * @param plan Query plan to tag + * @param index Index to tag + */ + protected def setApplicableIndexTag(plan: LogicalPlan, index: IndexLogEntry): Unit = { + if (index.getTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED).getOrElse(false)) { + val prevRules = + index.getTagValue(plan, IndexLogEntryTags.APPLICABLE_INDEX_RULES).getOrElse(Nil) + index.setTagValue( + plan, + IndexLogEntryTags.APPLICABLE_INDEX_RULES, + prevRules :+ getClass.getSimpleName.split("\\$").last) + } + } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/IndexFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/IndexFilter.scala index 89a674b5c..2772cf4c2 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/IndexFilter.scala +++ 
b/src/main/scala/com/microsoft/hyperspace/index/rules/IndexFilter.scala @@ -20,6 +20,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import com.microsoft.hyperspace.ActiveSparkSession import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags} +import com.microsoft.hyperspace.index.plananalysis.FilterReason trait IndexFilter extends ActiveSparkSession { @@ -30,19 +31,22 @@ trait IndexFilter extends ActiveSparkSession { * @param condition Flag to append reason string * @param plan Query plan to tag * @param index Index to tag - * @param reasonString Informational message in case condition is false. + * @param filterReason Informational reason in case condition is false. */ protected def setFilterReasonTag( condition: Boolean, plan: LogicalPlan, index: IndexLogEntry, - reasonString: => String): Unit = { + filterReason: => FilterReason): Unit = { if (!condition && index - .getTagValue(IndexLogEntryTags.FILTER_REASONS_ENABLED) - .getOrElse(false)) { + .getTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED) + .getOrElse(false)) { val prevReason = index.getTagValue(plan, IndexLogEntryTags.FILTER_REASONS).getOrElse(Nil) - index.setTagValue(plan, IndexLogEntryTags.FILTER_REASONS, prevReason :+ reasonString) + index.setTagValue( + plan, + IndexLogEntryTags.FILTER_REASONS, + prevReason :+ filterReason) } } @@ -50,18 +54,18 @@ trait IndexFilter extends ActiveSparkSession { * Append the reason string to FILTER_REASONS tag for the given index * if the result of the function is false and FILTER_REASONS tag is set to the index. * - * @param reasonString Informational message in case condition is false. * @param plan Query plan to tag * @param index Index to tag * @param f Function for a condition + * @param filterReason Informational reason in case condition is false. 
* @return Result of the given function */ protected def withFilterReasonTag( plan: LogicalPlan, index: IndexLogEntry, - reasonString: => String)(f: => Boolean): Boolean = { + filterReason: => FilterReason)(f: => Boolean): Boolean = { val ret = f - setFilterReasonTag(ret, plan, index, reasonString) + setFilterReasonTag(ret, plan, index, filterReason) ret } @@ -71,17 +75,17 @@ trait IndexFilter extends ActiveSparkSession { * * @param plan Query plan to tag * @param indexes Indexes to tag - * @param reasonString Informational message in case condition is false. + * @param filterReason Informational reason in case condition is false. * @param f Function for a condition * @return Result of the given function */ protected def withFilterReasonTag( plan: LogicalPlan, indexes: Seq[IndexLogEntry], - reasonString: => String)(f: => Boolean): Boolean = { + filterReason: => FilterReason)(f: => Boolean): Boolean = { val ret = f indexes.foreach { index => - setFilterReasonTag(ret, plan, index, reasonString) + setFilterReasonTag(ret, plan, index, filterReason) } ret } @@ -90,17 +94,17 @@ trait IndexFilter extends ActiveSparkSession { * Append the reason string to FILTER_REASONS tag for the given list of indexes - * if FILTER_REASONS_ENABLED tag is set to the indexes. + * if INDEX_PLAN_ANALYSIS_ENABLED tag is set to the indexes. * - * @param reasonString Informational message in case condition is false. * @param plan Query plan to tag * @param indexes Indexes to tag + * @param filterReason Informational reason in case condition is false. 
* @return Result of the given function */ protected def setFilterReasonTag( plan: LogicalPlan, indexes: Seq[IndexLogEntry], - reasonString: => String): Unit = { + filterReason: => FilterReason): Unit = { indexes.foreach { index => - setFilterReasonTag(false, plan, index, reasonString) + setFilterReasonTag(false, plan, index, filterReason) } } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/IndexRankFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/IndexRankFilter.scala index b759b6a46..8e58c0584 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/IndexRankFilter.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/IndexRankFilter.scala @@ -19,6 +19,7 @@ package com.microsoft.hyperspace.index.rules import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import com.microsoft.hyperspace.index.IndexLogEntry +import com.microsoft.hyperspace.index.plananalysis.{FilterReasonCode, FilterReasons} import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap} /** @@ -51,7 +52,9 @@ trait IndexRankFilter extends IndexFilter { selectedIndex.name.equals(index.name), plan, index, - s"Another candidate index is applied: ${selectedIndex.name}") + FilterReasons.apply( + FilterReasonCode.ANOTHER_INDEX_APPLIED, + Seq(("appliedIndex", selectedIndex.name)))) } } } diff --git a/src/test/scala/com/microsoft/hyperspace/index/covering/FilterIndexRuleTest.scala b/src/test/scala/com/microsoft/hyperspace/index/covering/FilterIndexRuleTest.scala index cb59ec1bc..dfbc30e53 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/covering/FilterIndexRuleTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/covering/FilterIndexRuleTest.scala @@ -117,22 +117,23 @@ class FilterIndexRuleTest extends HyperspaceRuleSuite { val originalPlan = Project(Seq(c2, c3, c4), filterNode) // c4 is not covered by index val allIndexes = 
IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE)) - allIndexes.foreach(_.setTagValue(IndexLogEntryTags.FILTER_REASONS_ENABLED, true)) + allIndexes.foreach(_.setTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED, true)) val (transformedPlan, score) = applyFilterIndexRuleHelper(originalPlan, allIndexes) assert(transformedPlan.equals(originalPlan), "Plan should not transform.") allIndexes.foreach { index => - val msg = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) + val reasons = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) index.name match { case `indexName1` => - assert(msg.isDefined) assert( - msg.get.exists( - _.equals("Index does not contain required columns. Required columns: " + - "[c3,c2,c3,c4], Indexed & included columns: [c3,c2,c1]"))) + msg.exists( + _.equals( + s"[$indexName1,FilterColumnFilter] Index does not contain required columns. " + + "Required columns: [c3,c2,c4], Indexed & included columns: [c3,c2,c1]"))) case `indexName2` | `indexName3` => - assert(msg.isDefined) assert( - msg.get.exists( + msg.exists( _.contains("The first indexed column should be in filter condition columns."))) } @@ -156,23 +157,23 @@ class FilterIndexRuleTest extends HyperspaceRuleSuite { val originalPlan = Filter(filterCondition, scanNode) val allIndexes = IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE)) - allIndexes.foreach(_.setTagValue(IndexLogEntryTags.FILTER_REASONS_ENABLED, true)) + allIndexes.foreach(_.setTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED, true)) val (transformedPlan, score) = applyFilterIndexRuleHelper(originalPlan, allIndexes) assert(!transformedPlan.equals(originalPlan), "No plan transformation.") verifyTransformedPlanWithIndex(transformedPlan, indexName2) allIndexes.foreach { index => - val msg = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) + val reasons = 
index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) + // The applied index has no FILTER_REASONS tag; treat a missing tag as no reasons. + val msg = reasons.getOrElse(Nil).map(_.verboseStr) index.name match { case `indexName1` => - assert(msg.isDefined) assert( - msg.get.exists( + msg.exists( _.contains("The first indexed column should be in filter condition columns."))) case `indexName2` => assert(msg.isEmpty) case `indexName3` => - assert(msg.isDefined) - assert(msg.get.exists(_.contains(s"Another candidate index is applied: $indexName2"))) + assert(msg.exists(_.contains(s"Another candidate index is applied: $indexName2"))) } } } diff --git a/src/test/scala/com/microsoft/hyperspace/index/covering/JoinIndexRuleTest.scala b/src/test/scala/com/microsoft/hyperspace/index/covering/JoinIndexRuleTest.scala index a8bfb400b..762d32a5a 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/covering/JoinIndexRuleTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/covering/JoinIndexRuleTest.scala @@ -110,7 +110,7 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper { val indexes = if (allIndexes.isEmpty) { IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE)) } else { - allIndexes.foreach(_.setTagValue(IndexLogEntryTags.FILTER_REASONS_ENABLED, true)) + allIndexes.foreach(_.setTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED, true)) allIndexes } val candidateIndexes = CandidateIndexCollector(plan, indexes) @@ -161,9 +161,10 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper { val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan, allIndexes) assert(updatedPlan.equals(originalPlan)) allIndexes.foreach { index => - val msg = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) - assert(msg.isDefined) - assert(msg.get.exists(_.contains("Not eligible Join - no join condition."))) + val reasons = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) + 
assert(msg.exists(_.contains("Not eligible Join - no join condition."))) } } @@ -174,10 +175,11 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper { val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan, allIndexes) assert(updatedPlan.equals(originalPlan)) allIndexes.foreach { index => - val msg = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) - assert(msg.isDefined) + val reasons = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) assert( - msg.get.exists(_.contains("Join condition is not eligible. Equi-Joins in simple CNF"))) + msg.exists(_.contains("Join condition is not eligible. Equi-Joins in simple CNF"))) } } @@ -188,10 +190,11 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper { val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan, allIndexes) assert(updatedPlan.equals(originalPlan)) allIndexes.foreach { index => - val msg = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) - assert(msg.isDefined) + val reasons = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) assert( - msg.get.exists(_.contains("Join condition is not eligible. Equi-Joins in simple CNF"))) + msg.exists(_.contains("Join condition is not eligible. Equi-Joins in simple CNF"))) } } @@ -202,10 +205,11 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper { val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan, allIndexes) assert(updatedPlan.equals(originalPlan)) allIndexes.foreach { index => - val msg = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) - assert(msg.isDefined) + val reasons = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) assert( - msg.get.exists(_.contains("Join condition is not eligible. 
Equi-Joins in simple CNF"))) + msg.exists(_.contains("Join condition is not eligible. Equi-Joins in simple CNF"))) } } @@ -230,26 +234,28 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper { index.name match { case "t1i1" => assert( - msg.get.toSet.equals( - Set( - "All join condition column should be the indexed columns. " + - "Join columns: [t1c2], Indexed columns: [t1c1]", - "No available indexes for right subplan."))) + msg.get.toSet.equals(Set( + "All join condition column should be the indexed columns. " + + "Join columns: [t1c2], Indexed columns: [t1c1]", + "No available indexes for right subplan.")), + msg.get) case "t1i2" => assert( msg.get.toSet.equals(Set( "All join condition column should be the indexed columns. " + "Join columns: [t1c2], Indexed columns: [t1c1,t1c2]", - "No available indexes for right subplan."))) + "No available indexes for right subplan.")), + msg.get) case "t1i3" => - assert(msg.get.toSet.equals(Set("No available indexes for right subplan."))) + assert( + msg.get.toSet + .equals(Set("No available indexes for right subplan."))) case "t2i1" => assert( - msg.get.toSet.equals( - Set( - "All join condition column should be the indexed columns. " + - "Join columns: [t2c2], Indexed columns: [t2c1]", - "No available indexes for right subplan."))) + msg.get.toSet.equals(Set( + "All join condition column should be the indexed columns. " + + "Join columns: [t2c2], Indexed columns: [t2c1]", + "No available indexes for right subplan."))) case "t2i2" => assert( msg.get.toSet.equals(Set( @@ -283,11 +289,11 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper { index.name match { case "t1i1" => assert( - msg.get.toSet.equals( - Set( - "Index does not contain all required columns. " + - "Required columns: [t1c1,t1c4], Index columns: [t1c1,t1c3]", - "No available indexes for left subplan."))) + msg.get.toSet.equals(Set( + "Index does not contain all required columns. 
" + + "Required columns: [t1c1,t1c4], Index columns: [t1c1,t1c3]", + "No available indexes for left subplan.")), + msg.get) case "t2i1" => assert( msg.get.toSet.equals( @@ -437,13 +443,14 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper { val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan, allIndexes) assert(updatedPlan.equals(originalPlan)) allIndexes.foreach { index => - val msg = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) - assert(msg.isDefined) + val reasons = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) assert( - msg.get.toSet.equals( - Set("Each join condition column should come from relations directly and attributes " + - "from left plan must exclusively have one-to-one mapping with attributes from " + - "right plan. E.g. join(A = B and A = D) is not eligible."))) + msg.size == 1 && msg.head.contains( + "Each join condition column should come from relations " + + "directly and attributes from left plan must exclusively have one-to-one mapping " + + "with attributes from right plan. E.g. 
join(A = B and A = D) is not eligible.")) } } { diff --git a/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala b/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala index 6168b368c..c6bc3461c 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala @@ -181,7 +181,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { def filterQuery(query: DataFrame): DataFrame = { query.filter("Col2 == 2").select("Col1") } - verifyExplainOutput(df, expectedOutput.toString, verbose = false) { filterQuery } + verifyExplainOutput(df, expectedOutput, verbose = false) { filterQuery } } private def getIndexRootPath(indexName: String): Path = diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/CandidateIndexCollectorTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/CandidateIndexCollectorTest.scala index 8f2762748..36e65dab4 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/CandidateIndexCollectorTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/CandidateIndexCollectorTest.scala @@ -330,7 +330,7 @@ class CandidateIndexCollectorTest extends HyperspaceRuleSuite with SQLHelper { } val allIndexes = indexList.map(indexName => latestIndexLogEntry(systemPath, indexName)) - allIndexes.foreach(_.setTagValue(IndexLogEntryTags.FILTER_REASONS_ENABLED, true)) + allIndexes.foreach(_.setTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED, true)) val plan1 = spark.read.parquet(dataPath).select("id", "name").queryExecution.optimizedPlan @@ -341,16 +341,17 @@ class CandidateIndexCollectorTest extends HyperspaceRuleSuite with SQLHelper { assert(filtered.toSet.equals(Set("index_ok", "index_noLineage"))) allIndexes.foreach { entry => - val msg = entry.getTagValue(plan1, IndexLogEntryTags.FILTER_REASONS) + val reasons = entry.getTagValue(plan1, 
IndexLogEntryTags.FILTER_REASONS) if (filtered.contains(entry.name)) { - assert(msg.isEmpty) + assert(reasons.isEmpty) } else { - assert(msg.isDefined) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) entry.name match { case "index_differentColumn" => - assert(msg.get.exists(_.contains("Column Schema does not match"))) + assert(msg.exists(_.contains("Column Schema does not match"))) case "index_noCommonFiles" => - assert(msg.get.exists(_.contains("Index signature does not match"))) + assert(msg.exists(_.contains("Index signature does not match"))) } } @@ -367,15 +368,16 @@ class CandidateIndexCollectorTest extends HyperspaceRuleSuite with SQLHelper { assert(indexes.isEmpty) allIndexes.foreach { entry => - val msg = entry.getTagValue(plan2, IndexLogEntryTags.FILTER_REASONS) - assert(msg.isDefined) + val reasons = entry.getTagValue(plan2, IndexLogEntryTags.FILTER_REASONS) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) entry.name match { case "index_differentColumn" => - assert(msg.get.exists(_.contains("Column Schema does not match"))) + assert(msg.exists(_.contains("Column Schema does not match"))) case "index_noCommonFiles" => - assert(msg.get.exists(_.contains("No common files."))) + assert(msg.exists(_.contains("No common files."))) case _ => - assert(msg.get.exists(_.contains("Appended bytes ratio"))) + assert(msg.exists(_.contains("Appended bytes ratio"))) } // Unset for next test. 
@@ -390,16 +392,17 @@ class CandidateIndexCollectorTest extends HyperspaceRuleSuite with SQLHelper { assert(filtered.toSet.equals(Set("index_ok", "index_noLineage"))) allIndexes.foreach { entry => - val msg = entry.getTagValue(plan2, IndexLogEntryTags.FILTER_REASONS) + val reasons = entry.getTagValue(plan2, IndexLogEntryTags.FILTER_REASONS) if (filtered.contains(entry.name)) { - assert(msg.isEmpty) + assert(reasons.isEmpty) } else { - assert(msg.isDefined) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) entry.name match { case "index_differentColumn" => - assert(msg.get.exists(_.contains("Column Schema does not match"))) + assert(msg.exists(_.contains("Column Schema does not match"))) case "index_noCommonFiles" => - assert(msg.get.exists(_.contains("No common files."))) + assert(msg.exists(_.contains("No common files."))) } } @@ -421,17 +424,19 @@ class CandidateIndexCollectorTest extends HyperspaceRuleSuite with SQLHelper { assert(indexes.isEmpty) allIndexes.foreach { entry => - val msg = entry.getTagValue(plan3, IndexLogEntryTags.FILTER_REASONS) - assert(msg.isDefined) + val reasons = entry.getTagValue(plan3, IndexLogEntryTags.FILTER_REASONS) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) + entry.name match { case "index_differentColumn" => - assert(msg.get.exists(_.contains("Column Schema does not match"))) + assert(msg.exists(_.contains("Column Schema does not match"))) case "index_noCommonFiles" => - assert(msg.get.exists(_.contains("No common files."))) + assert(msg.exists(_.contains("No common files."))) case "index_noLineage" => - assert(msg.get.exists(_.contains("Index doesn't support deleted files."))) + assert(msg.exists(_.contains("Index doesn't support deleted files."))) case "index_ok" => - assert(msg.get.exists(_.contains("Deleted bytes ratio"))) + assert(msg.exists(_.contains("Deleted bytes ratio"))) } entry.unsetTagValue(plan3, IndexLogEntryTags.FILTER_REASONS) diff --git 
a/src/test/scala/com/microsoft/hyperspace/index/rules/ScoreBasedIndexPlanOptimizerTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/ScoreBasedIndexPlanOptimizerTest.scala index 3742e8643..8f5cdc3d6 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/ScoreBasedIndexPlanOptimizerTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/ScoreBasedIndexPlanOptimizerTest.scala @@ -108,6 +108,10 @@ class ScoreBasedIndexPlanOptimizerTest extends QueryTest with HyperspaceSuite { assert(rightChildScore == 50) assert(!rightChildPlan.equals(plan.children.last)) + hyperspace.whyNot(query(leftDf, rightDf)()) + hyperspace.whyNot(query(leftDf, rightDf)(), "leftDfJoinIndex", extended = true) + hyperspace.explain(query(leftDf, rightDf)(), verbose = true) + verifyIndexUsage( query(leftDf, rightDf), getIndexFilesPath(leftDfFilterIndexConfig.indexName) ++