diff --git a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala index 94f2317d0..a6c883f88 100644 --- a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala +++ b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala @@ -20,7 +20,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession} import com.microsoft.hyperspace.index._ import com.microsoft.hyperspace.index.IndexConstants.{OPTIMIZE_MODE_QUICK, REFRESH_MODE_FULL} -import com.microsoft.hyperspace.index.plananalysis.PlanAnalyzer +import com.microsoft.hyperspace.index.plananalysis.{CandidateIndexAnalyzer, PlanAnalyzer} import com.microsoft.hyperspace.index.rules.ApplyHyperspace import com.microsoft.hyperspace.index.sources.FileBasedSourceProviderManager @@ -150,7 +150,7 @@ class Hyperspace(spark: SparkSession) { } /** - * Explains how indexes will be applied to the given dataframe. + * Explain how indexes will be applied to the given dataframe. * * @param df dataFrame. * @param redirectFunc optional function to redirect output of explain. @@ -171,6 +171,25 @@ class Hyperspace(spark: SparkSession) { indexManager.index(indexName) } + /** + * Explain why indexes are not applied to the given dataframe. + * + * @param df DataFrame. + * @param indexName Optional index name to restrict the output to. + * @param extended If true, print more verbose messages. + * @param redirectFunc Optional function to redirect output. + */ + def whyNot(df: DataFrame, indexName: String = "", extended: Boolean = false)( + implicit redirectFunc: String => Unit = print): Unit = { + withHyperspaceRuleDisabled { + if (indexName.nonEmpty) { + redirectFunc(CandidateIndexAnalyzer.whyNotIndexString(spark, df, indexName, extended)) + } else { + redirectFunc(CandidateIndexAnalyzer.whyNotIndexesString(spark, df, extended)) + } + } + } + private def withHyperspaceRuleDisabled(f: => Unit): Unit = { try { ApplyHyperspace.disableForIndexMaintenance.set(true) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index 84ea9ca65..5257fa530 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -527,10 +527,23 @@ case class IndexLogEntry( tags.get((plan, tag)).map(_.asInstanceOf[T]) } + def getTagValuesForAllPlan[T](tag: IndexLogEntryTag[T]): Seq[(LogicalPlan, T)] = { + tags.filter(entry => entry._1._2.equals(tag)).toSeq.map { case (k, v) => + (k._1, v.asInstanceOf[T]) + } + } + def unsetTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Unit = { tags.remove((plan, tag)) } + def unsetTagValueForAllPlan[T](tag: IndexLogEntryTag[T]): Unit = { + val plansWithTag = tags.keys.filter(_._2.name.equals(tag.name)).map(_._1) + plansWithTag.foreach { plan => + tags.remove((plan, tag)) + } + } + def withCachedTag[T](plan: LogicalPlan, tag: IndexLogEntryTag[T])(f: => T): T = { getTagValue(plan, tag) match { case Some(v) => v diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala index 4184696d3..ec9e071bb 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala @@ -18,6 +18,8 @@ package com.microsoft.hyperspace.index import org.apache.spark.sql.execution.datasources.InMemoryFileIndex +import 
com.microsoft.hyperspace.index.plananalysis.FilterReason + object IndexLogEntryTags { // HYBRIDSCAN_REQUIRED indicates if Hybrid Scan is required for the index or not. val HYBRIDSCAN_REQUIRED: IndexLogEntryTag[Boolean] = @@ -55,10 +57,15 @@ object IndexLogEntryTags { IndexLogEntryTag[InMemoryFileIndex]("inMemoryFileIndexHybridScanAppended") // FILTER_REASONS stores reason strings for disqualification. - val FILTER_REASONS: IndexLogEntryTag[Seq[String]] = - IndexLogEntryTag[Seq[String]]("filterReasons") + val FILTER_REASONS: IndexLogEntryTag[Seq[FilterReason]] = + IndexLogEntryTag[Seq[FilterReason]]("filterReasons") + + // APPLICABLE_INDEX_RULES stores the names of rules that can apply the index to the plan. + val APPLICABLE_INDEX_RULES: IndexLogEntryTag[Seq[String]] = + IndexLogEntryTag[Seq[String]]("applicableIndexRules") // INDEX_PLAN_ANALYSIS_ENABLED indicates whether the whyNot API is enabled or not. - val FILTER_REASONS_ENABLED: IndexLogEntryTag[Boolean] = - IndexLogEntryTag[Boolean]("filterReasonsEnabled") + // If it's enabled, FILTER_REASONS and APPLICABLE_INDEX_RULES info will be tagged. + val INDEX_PLAN_ANALYSIS_ENABLED: IndexLogEntryTag[Boolean] = + IndexLogEntryTag[Boolean]("indexPlanAnalysisEnabled") } diff --git a/src/main/scala/com/microsoft/hyperspace/index/covering/FilterIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/covering/FilterIndexRule.scala index 4fc56f8ef..c1827f0d9 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/covering/FilterIndexRule.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/covering/FilterIndexRule.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} import com.microsoft.hyperspace.index.IndexLogEntryTags +import com.microsoft.hyperspace.index.plananalysis.FilterReasons import com.microsoft.hyperspace.index.rules._ import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap} import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils} @@ -94,7 +95,9 @@ object FilterColumnFilter extends QueryPlanIndexFilter { withFilterReasonTag( plan, index, - "The first indexed column should be in filter condition columns.") { + FilterReasons.NoFirstIndexedColCond( + index.derivedDataset.indexedColumns.head, + filterColumnNames.mkString(","))) { ResolverUtils .resolve(spark, index.derivedDataset.indexedColumns.head, filterColumnNames) .isDefined @@ -102,9 +105,9 @@ object FilterColumnFilter extends QueryPlanIndexFilter { withFilterReasonTag( plan, index, - "Index does not contain required columns. 
Required columns: " + - s"[${(filterColumnNames ++ projectColumnNames).mkString(",")}], Indexed & " + - s"included columns: [${(index.derivedDataset.referencedColumns).mkString(",")}]") { + FilterReasons.MissingRequiredCol( + (filterColumnNames ++ projectColumnNames).toSet.mkString(","), + index.derivedDataset.referencedColumns.mkString(","))) { ResolverUtils .resolve( spark, diff --git a/src/main/scala/com/microsoft/hyperspace/index/covering/JoinIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/covering/JoinIndexRule.scala index 6004a8b66..6051e57c8 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/covering/JoinIndexRule.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/covering/JoinIndexRule.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} import com.microsoft.hyperspace.Hyperspace import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags} import com.microsoft.hyperspace.index.covering.JoinAttributeFilter.extractConditions +import com.microsoft.hyperspace.index.plananalysis.FilterReasons import com.microsoft.hyperspace.index.rules.{HyperspaceRule, IndexRankFilter, IndexTypeFilter, QueryPlanIndexFilter, RuleUtils} import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap} import com.microsoft.hyperspace.index.sources.FileBasedRelation @@ -65,17 +66,22 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter { val joinConditionCond = withFilterReasonTag( plan, leftAndRightIndexes, - "Join condition is not eligible. Equi-Joins in simple CNF form are supported. " + - "Literal is not supported.") { + FilterReasons.NotEligibleJoin("Non equi-join or has literal")) { isJoinConditionSupported(condition) } val leftPlanLinearCond = - withFilterReasonTag(plan, leftAndRightIndexes, "Left child is not a linear plan.") { + withFilterReasonTag( + plan, + leftAndRightIndexes, + FilterReasons.NotEligibleJoin("Non linear left child plan")) { isPlanLinear(l) } val rightPlanLinearCond = - withFilterReasonTag(plan, leftAndRightIndexes, "Right child is not a linear plan.") { + withFilterReasonTag( + plan, + leftAndRightIndexes, + FilterReasons.NotEligibleJoin("Non linear right child plan")) { isPlanLinear(r) } @@ -96,7 +102,7 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter { setFilterReasonTag( plan, candidateIndexes.values.flatten.toSeq, - "Not eligible Join - no join condition.") + FilterReasons.NotEligibleJoin("No join condition")) Map.empty case _ => Map.empty @@ -164,10 +170,7 @@ object JoinAttributeFilter extends QueryPlanIndexFilter { if (withFilterReasonTag( plan, candidateIndexes.head._2 ++ candidateIndexes.last._2, - "Each join condition column should come from " + - "relations directly and attributes from left plan must exclusively have " + - "one-to-one mapping with attributes from right plan. " + - "E.g. join(A = B and A = D) is not eligible.") { + FilterReasons.NotEligibleJoin("incompatible left and right join columns")) { ensureAttributeRequirements( JoinIndexRule.leftRelation.get, JoinIndexRule.rightRelation.get, @@ -329,39 +332,35 @@ object JoinColumnFilter extends QueryPlanIndexFilter { val lRequiredAllCols = resolve(spark, allRequiredCols(l), lBaseAttrs).get val rRequiredAllCols = resolve(spark, allRequiredCols(r), rBaseAttrs).get + // Make sure required indexed columns are subset of all required columns. 
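+ // (This check was previously a filter condition tagged "Invalid query plan."; the rewrite asserts it instead.)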
+ assert( + resolve(spark, lRequiredIndexedCols, lRequiredAllCols).isDefined && + resolve(spark, rRequiredIndexedCols, rRequiredAllCols).isDefined) + + val lIndexes = + getUsableIndexes( + plan, + candidateIndexes.getOrElse(leftRelation.plan, Nil), + lRequiredIndexedCols, + lRequiredAllCols, + "left") + val rIndexes = + getUsableIndexes( + plan, + candidateIndexes.getOrElse(rightRelation.plan, Nil), + rRequiredIndexedCols, + rRequiredAllCols, + "right") + if (withFilterReasonTag( plan, candidateIndexes.head._2 ++ candidateIndexes.last._2, - "Invalid query plan.") { - // Make sure required indexed columns are subset of all required columns. - resolve(spark, lRequiredIndexedCols, lRequiredAllCols).isDefined && - resolve(spark, rRequiredIndexedCols, rRequiredAllCols).isDefined - }) { - val lIndexes = - getUsableIndexes( - plan, - candidateIndexes.getOrElse(leftRelation.plan, Nil), - lRequiredIndexedCols, - lRequiredAllCols) - val rIndexes = - getUsableIndexes( - plan, - candidateIndexes.getOrElse(rightRelation.plan, Nil), - rRequiredIndexedCols, - rRequiredAllCols) - - if (withFilterReasonTag( - plan, - candidateIndexes.head._2 ++ candidateIndexes.last._2, - "No available indexes for left subplan.")(lIndexes.nonEmpty) && - withFilterReasonTag( - plan, - candidateIndexes.head._2 ++ candidateIndexes.last._2, - "No available indexes for right subplan.")(rIndexes.nonEmpty)) { - Map(leftRelation.plan -> lIndexes, rightRelation.plan -> rIndexes) - } else { - Map.empty - } + FilterReasons.NoAvailJoinIndexPair("left"))(lIndexes.nonEmpty) && + withFilterReasonTag( + plan, + candidateIndexes.head._2 ++ candidateIndexes.last._2, + FilterReasons.NoAvailJoinIndexPair("right"))(rIndexes.nonEmpty)) { + Map(leftRelation.plan -> lIndexes, rightRelation.plan -> rIndexes) } else { Map.empty } @@ -421,7 +420,8 @@ object JoinColumnFilter extends QueryPlanIndexFilter { plan: LogicalPlan, indexes: Seq[IndexLogEntry], requiredIndexCols: Seq[String], - allRequiredCols: Seq[String]): Seq[IndexLogEntry] = { + allRequiredCols: Seq[String], + leftOrRight: String): Seq[IndexLogEntry] = { indexes.filter { idx => val allCols = idx.derivedDataset.referencedColumns // All required index columns should match one-to-one with all indexed columns and @@ -429,17 +429,19 @@ object JoinColumnFilter extends QueryPlanIndexFilter { withFilterReasonTag( plan, idx, - s"All join condition column should be the indexed columns. " + - s"Join columns: [${requiredIndexCols - .mkString(",")}], Indexed columns: [${idx.indexedColumns.mkString(",")}]") { + FilterReasons.NotAllJoinColIndexed( + leftOrRight, + requiredIndexCols.mkString(","), + idx.indexedColumns.mkString(","))) { requiredIndexCols.toSet.equals(idx.indexedColumns.toSet) } && withFilterReasonTag( plan, idx, - s"Index does not contain all required columns. 
" + - s"Required columns: [${allRequiredCols.mkString(",")}], " + - s"Index columns: [${(idx.derivedDataset.referencedColumns).mkString(",")}]") { + FilterReasons.MissingIndexedCol( + leftOrRight, + allRequiredCols.mkString(","), + idx.indexedColumns.mkString(","))) { allRequiredCols.forall(allCols.contains) } } @@ -523,7 +525,7 @@ object JoinRankFilter extends IndexRankFilter { setFilterReasonTag( plan, indexes.head._2 ++ indexes.last._2, - "No compatible left and right index pair.") + FilterReasons.NoCompatibleJoinIndexPair()) Map.empty } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/plananalysis/CandidateIndexAnalyzer.scala b/src/main/scala/com/microsoft/hyperspace/index/plananalysis/CandidateIndexAnalyzer.scala new file mode 100644 index 000000000..13b8f7fb8 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/plananalysis/CandidateIndexAnalyzer.scala @@ -0,0 +1,346 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.plananalysis + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.hyperspace.utils.DataFrameUtils + +import com.microsoft.hyperspace.{Hyperspace, HyperspaceException} +import com.microsoft.hyperspace.actions.Constants +import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags} +import com.microsoft.hyperspace.index.rules.{CandidateIndexCollector, ScoreBasedIndexPlanOptimizer} + +object CandidateIndexAnalyzer extends Logging { + def whyNotIndexString( + spark: SparkSession, + df: DataFrame, + indexName: String, + extended: Boolean): String = { + val (planWithHyperspace, filterReasons, applicableIndexes) = + collectAnalysisResult(spark, df, indexName) + generateWhyNotString( + spark, + df.queryExecution.optimizedPlan, + planWithHyperspace, + filterReasons, + applicableIndexes, + extended) + } + + def whyNotIndexesString(spark: SparkSession, df: DataFrame, extended: Boolean): String = { + val (planWithHyperspace, filterReasons, applicableIndexes) = + collectAnalysisResult(spark, df) + generateWhyNotString( + spark, + df.queryExecution.optimizedPlan, + planWithHyperspace, + filterReasons, + applicableIndexes, + extended) + } + + def applicableIndexInfoString(spark: SparkSession, df: DataFrame): String = { + val (_, applicableIndexes) = collectApplicableIndexInfo(spark, df) + generateApplicableIndexInfoString(spark, df.queryExecution.optimizedPlan, applicableIndexes) + } + + private def cleanupAnalysisTags(indexes: Seq[IndexLogEntry]): Unit = { + indexes.foreach { index => + index.unsetTagValueForAllPlan(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED) + index.unsetTagValueForAllPlan(IndexLogEntryTags.FILTER_REASONS) + index.unsetTagValueForAllPlan(IndexLogEntryTags.APPLICABLE_INDEX_RULES) + } + } + + private def prepareTagsForAnalysis(indexes: Seq[IndexLogEntry]): Unit = { + indexes.foreach { 
index => + index.setTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED, true) + + // Clean up previous reason tags. + index.unsetTagValueForAllPlan(IndexLogEntryTags.FILTER_REASONS) + index.unsetTagValueForAllPlan(IndexLogEntryTags.APPLICABLE_INDEX_RULES) + } + } + + private def getSubPlanLoc( + originalPlanStrings: Seq[String], + plan: LogicalPlan, + numberSpaces: Int): String = { + val planStrLines = plan.toString.split('\n') + + val firstLine = planStrLines.head + val firstLineStr = originalPlanStrings.find { s => + s.endsWith(firstLine) && + s.substring(numberSpaces, s.length - firstLine.length).distinct.forall(" +-:".contains(_)) + } + val numberRegex = raw"([0-9]+)".r + val startLineNumber = if (firstLineStr.isDefined) { + numberRegex.findFirstIn(firstLineStr.get).get.toInt + } else { + -1 + } + s"${plan.nodeName} @$startLineNumber" + } + + private def generateApplicableIndexInfoString( + spark: SparkSession, + planWithoutHyperspace: LogicalPlan, + applicableIndexes: Seq[(IndexLogEntry, Seq[(LogicalPlan, Seq[String])])]): String = { + val stringBuilder = new StringBuilder + val originalPlanString = planWithoutHyperspace.numberedTreeString.split('\n') + + // Convert to DataFrame rows: + // (sub plan location, index name, index type, rule name). + val numberSpaces = originalPlanString.length.toString.length + 1 + val res = applicableIndexes + .flatMap { + case (index: IndexLogEntry, applicableRules: Seq[(LogicalPlan, Seq[String])]) => + applicableRules.flatMap { + case (plan, ruleNames) => + val subPlanLocStr = getSubPlanLoc(originalPlanString, plan, numberSpaces) + ruleNames.map { ruleName => + (subPlanLocStr, index.name, index.derivedDataset.kindAbbr, ruleName) + } + } + } + .sortBy(r => (r._1, r._2, r._3, r._4)) + .distinct + + if (res.isEmpty) { + return "No applicable indexes. Try hyperspace.whyNot()" + } + val newLine = System.lineSeparator() + stringBuilder.append("Plan without Hyperspace:") + stringBuilder.append(newLine) + stringBuilder.append(newLine) + stringBuilder.append(originalPlanString.mkString(newLine)) + stringBuilder.append(newLine) + stringBuilder.append(newLine) + + + import spark.implicits._ + val df = res.toDF("SubPlan", "IndexName", "IndexType", "RuleName") + + // '\n' is used in `showString` as a delimiter for newlines. 
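+ // `res.size` prints every row (`show` would cap at 20 by default) and `truncate = 0` keeps cell values untruncated.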
+ df.showString(res.size, truncate = 0).split('\n').foreach { l => + stringBuilder.append(l) + stringBuilder.append(newLine) + } + stringBuilder.toString + } + + private def generateWhyNotString( + spark: SparkSession, + planWithoutHyperspace: LogicalPlan, + planWithHyperspace: LogicalPlan, + filterReasons: Seq[(IndexLogEntry, Seq[(LogicalPlan, Seq[FilterReason])])], + applicableIndexes: Seq[(IndexLogEntry, Seq[(LogicalPlan, Seq[String])])], + extended: Boolean = false): String = { + val stringBuilder = new StringBuilder + val originalPlanString = planWithoutHyperspace.numberedTreeString.split('\n') + val newLine = System.lineSeparator() + stringBuilder.append(newLine) + stringBuilder.append("=============================================================") + stringBuilder.append(newLine) + stringBuilder.append("Plan with Hyperspace & Summary:") + stringBuilder.append(newLine) + stringBuilder.append("=============================================================") + stringBuilder.append(newLine) + stringBuilder.append(planWithHyperspace.toString) + stringBuilder.append(newLine) + + def printIndexNames(indexNames: Seq[String]): Unit = { + indexNames.foreach { idxName => + stringBuilder.append(s"- $idxName") + stringBuilder.append(newLine) + } + if (indexNames.isEmpty) { + stringBuilder.append("- No such index found.") + } + stringBuilder.append(newLine) + } + + stringBuilder.append("Applied indexes:") + stringBuilder.append(newLine) + val indexNameRegex = "Hyperspace\\(Type: .+, Name: (.+), LogVersion: [0-9]+\\)".r + val appliedIndexNames = indexNameRegex + .findAllIn(planWithHyperspace.toString) + .matchData + .map { matchStr => + matchStr.group(1) + } + .toSeq + .distinct + .sorted + printIndexNames(appliedIndexNames) + + stringBuilder.append("Applicable indexes, but not applied due to priority:") + stringBuilder.append(newLine) + val applicableButNotAppliedIndexNames = + applicableIndexes.map(_._1.name).distinct.sorted.filterNot(appliedIndexNames.contains(_)) + printIndexNames(applicableButNotAppliedIndexNames) + + // Convert reasons to DataFrame rows. 
+ // (sub plan location string, index name, index type, reason code, arg strings, verbose string) + val numberSpaces = originalPlanString.length.toString.length + 1 + val res = filterReasons + .flatMap { + case (index: IndexLogEntry, reasonsForIndex: Seq[(LogicalPlan, Seq[FilterReason])]) => + reasonsForIndex.flatMap { + case (plan, reasons) => + val subPlanLocStr = getSubPlanLoc(originalPlanString, plan, numberSpaces) + reasons.map { reason => + ( + subPlanLocStr, + index.name, + index.derivedDataset.kindAbbr, + reason.codeStr, + reason.argStr, + reason.verboseStr) + } + } + } + .sortBy(r => (r._1, r._2, r._3, r._4, r._5)) + .distinct + + import spark.implicits._ + val df = res.toDF("SubPlan", "IndexName", "IndexType", "Reason", "Message", "VerboseMessage") + + val outputDf = if (extended) { + df + } else { + df.drop("VerboseMessage") + .filter(!$"Reason".like("COL_SCHEMA_MISMATCH")) + } + + stringBuilder.append("Non-applicable indexes - index is outdated:") + stringBuilder.append(newLine) + val indexNamesForOutdated = res + .filter(row => row._4.equals("SOURCE_DATA_CHANGED")) + .map(_._2) + .distinct + .sorted + .filterNot(appliedIndexNames.contains(_)) + .filterNot(applicableButNotAppliedIndexNames.contains(_)) + printIndexNames(indexNamesForOutdated) + + stringBuilder.append(newLine) + stringBuilder.append("Non-applicable indexes - no applicable query plan:") + stringBuilder.append(newLine) + val indexNamesForNoApplicablePlan = res + .filterNot(row => + row._4.equals("COL_SCHEMA_MISMATCH") || row._4.equals("SOURCE_DATA_CHANGED")) + .map(_._2) + .distinct + .sorted + .filterNot(appliedIndexNames.contains(_)) + .filterNot(applicableButNotAppliedIndexNames.contains(_)) + printIndexNames(indexNamesForNoApplicablePlan) + stringBuilder.append(newLine) + + stringBuilder.append("For more information, please visit: ") + stringBuilder.append("https://microsoft.github.io/hyperspace/docs/why-not-result-analysis") + + stringBuilder.append(newLine) + stringBuilder.append(newLine) + stringBuilder.append("=============================================================") + stringBuilder.append(newLine) + stringBuilder.append("Plan without Hyperspace & WhyNot reasons:") + stringBuilder.append(newLine) + stringBuilder.append("=============================================================") + stringBuilder.append(newLine) + stringBuilder.append(originalPlanString.mkString(newLine)) + stringBuilder.append(newLine) + stringBuilder.append(newLine) + + // '\n' is used in `showString` as a delimiter for newlines. 
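+ // As above, print every reason row untruncated so messages are not cut off.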
+ outputDf.showString(res.size, truncate = 0).split('\n').foreach { l => + stringBuilder.append(l) + stringBuilder.append(newLine) + } + stringBuilder.toString + } + + private def collectApplicableIndexInfo( + spark: SparkSession, + df: DataFrame): (LogicalPlan, Seq[(IndexLogEntry, Seq[(LogicalPlan, Seq[String])])]) = { + val (transformedPlan, _, applicableIndexInfo) = collectAnalysisResult(spark, df) + (transformedPlan, applicableIndexInfo) + } + + private def collectAnalysisResult(spark: SparkSession, df: DataFrame): ( + LogicalPlan, + Seq[(IndexLogEntry, Seq[(LogicalPlan, Seq[FilterReason])])], + Seq[(IndexLogEntry, Seq[(LogicalPlan, Seq[String])])]) = { + val plan = df.queryExecution.optimizedPlan + + val indexManager = Hyperspace.getContext(spark).indexCollectionManager + val allActiveIndexes = indexManager.getIndexes(Seq(Constants.States.ACTIVE)) + + if (allActiveIndexes.isEmpty) { + throw HyperspaceException("No available ACTIVE indexes.") + } + applyHyperspaceForAnalysis(plan, allActiveIndexes) + } + + private def collectAnalysisResult(spark: SparkSession, df: DataFrame, indexName: String): ( + LogicalPlan, + Seq[(IndexLogEntry, Seq[(LogicalPlan, Seq[FilterReason])])], + Seq[(IndexLogEntry, Seq[(LogicalPlan, Seq[String])])]) = { + val plan = df.queryExecution.optimizedPlan + + val indexManager = Hyperspace.getContext(spark).indexCollectionManager + val allIndexes = + indexManager.getIndexes().filter(!_.state.equals(Constants.States.DOESNOTEXIST)) + val targetIndex = allIndexes.find(index => index.name.equals(indexName)) + + if (targetIndex.isEmpty) { + throw HyperspaceException(s"Index with name $indexName could not be found.") + } else if (!targetIndex.get.state.equals(Constants.States.ACTIVE)) { + throw HyperspaceException( + s"Index with name $indexName is not in ACTIVE state: ${targetIndex.get.state}") + } + + val allActiveIndexes = allIndexes.filter(_.state.equals(Constants.States.ACTIVE)) + applyHyperspaceForAnalysis(plan, allActiveIndexes, Some(indexName)) + } + + private def applyHyperspaceForAnalysis( + plan: LogicalPlan, + indexes: Seq[IndexLogEntry], + indexName: Option[String] = None): ( + LogicalPlan, + Seq[(IndexLogEntry, Seq[(LogicalPlan, Seq[FilterReason])])], + Seq[(IndexLogEntry, Seq[(LogicalPlan, Seq[String])])]) = { + try { + prepareTagsForAnalysis(indexes) + val candidateIndexes = CandidateIndexCollector.apply(plan, indexes) + val transformedPlan = new ScoreBasedIndexPlanOptimizer().apply(plan, candidateIndexes) + ( + transformedPlan, + indexes + .filter(i => indexName.isEmpty || indexName.get.equals(i.name)) + .map(i => (i, i.getTagValuesForAllPlan(IndexLogEntryTags.FILTER_REASONS))), + indexes + .map(i => (i, i.getTagValuesForAllPlan(IndexLogEntryTags.APPLICABLE_INDEX_RULES)))) + } finally { + cleanupAnalysisTags(indexes) + } + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/plananalysis/FilterReason.scala b/src/main/scala/com/microsoft/hyperspace/index/plananalysis/FilterReason.scala new file mode 100644 index 000000000..88dbfd1df --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/plananalysis/FilterReason.scala @@ -0,0 +1,151 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.plananalysis + +trait FilterReason { + val args: Seq[(String, String)] + def codeStr: String + final def argStr: String = { + // key1=[value1], key2=[value2] + args.map(kv => s"${kv._1}=[${kv._2}]").mkString(", ") + } + def verboseStr: String +} + +trait FilterReasonNoArg extends FilterReason { + final override val args: Seq[(String, String)] = Seq.empty +} + +object FilterReasons { + + case class ColSchemaMismatch(sourceColumns: String, indexColumns: String) extends FilterReason { + override final val codeStr: String = "COL_SCHEMA_MISMATCH" + override val args = Seq("sourceColumns" -> sourceColumns, "indexColumns" -> indexColumns) + override def verboseStr: String = { + s"Column schema does not match. Source data columns: [$sourceColumns], " + + s"Index columns: [$indexColumns]" + } + } + + case class SourceDataChanged() extends FilterReasonNoArg { + override final val codeStr: String = "SOURCE_DATA_CHANGED" + override def verboseStr: String = "Index signature does not match." + } + + case class NoDeleteSupport() extends FilterReasonNoArg { + override def codeStr: String = "NO_DELETE_SUPPORT" + override def verboseStr: String = "Index doesn't support deleted files." + } + + case class NoCommonFiles() extends FilterReasonNoArg { + override def codeStr: String = "NO_COMMON_FILES" + override def verboseStr: String = "No common files." + } + + case class TooMuchAppended(appendedRatio: String, hybridScanAppendThreshold: String) + extends FilterReason { + override final def codeStr: String = "TOO_MUCH_APPENDED" + override val args = Seq( + "appendedRatio" -> appendedRatio, + "hybridScanAppendThreshold" -> hybridScanAppendThreshold) + override def verboseStr: String = + s"Appended bytes ratio ($appendedRatio) is larger than " + + s"threshold config $hybridScanAppendThreshold." + } + + case class TooMuchDeleted(deletedRatio: String, hybridScanDeleteThreshold: String) + extends FilterReason { + override final def codeStr: String = "TOO_MUCH_DELETED" + override val args = Seq( + "deletedRatio" -> deletedRatio, + "hybridScanDeleteThreshold" -> hybridScanDeleteThreshold) + override def verboseStr: String = + s"Deleted bytes ratio ($deletedRatio) is larger than " + + s"threshold config $hybridScanDeleteThreshold." + } + + case class MissingRequiredCol(requiredCols: String, indexCols: String) extends FilterReason { + override final def codeStr: String = "MISSING_REQUIRED_COL" + override val args = Seq("requiredCols" -> requiredCols, "indexCols" -> indexCols) + override def verboseStr: String = + s"Index does not contain required columns. Required columns: [$requiredCols], " + + s"Index columns: [$indexCols]" + } + + case class NoFirstIndexedColCond(firstIndexedCol: String, filterCols: String) + extends FilterReason { + override final def codeStr: String = "NO_FIRST_INDEXED_COL_COND" + override val args = Seq("firstIndexedCol" -> firstIndexedCol, "filterCols" -> filterCols) + override def verboseStr: String = + "The first indexed column should be used in filter conditions. 
" + + s"The first indexed column: $firstIndexedCol, " + + s"Columns in filter condition: [$filterCols]" + } + + case class NotEligibleJoin(reason: String) extends FilterReason { + override final def codeStr: String = "NOT_ELIGIBLE_JOIN" + override val args = Seq("reason" -> reason) + override def verboseStr: String = + s"Join condition is not eligible. Reason: $reason" + } + + case class NoAvailJoinIndexPair(leftOrRight: String) extends FilterReason { + override def codeStr: String = "NO_AVAIL_JOIN_INDEX_PAIR" + override val args = Seq("child" -> leftOrRight) + override def verboseStr: String = + s"No available indexes for $leftOrRight subplan. " + + "Both left and right indexes are required for Join query." + } + + case class MissingIndexedCol( + leftOrRight: String, + requiredIndexedCols: String, + indexedCols: String) + extends FilterReason { + override final def codeStr: String = "MISSING_INDEXED_COL" + override val args = Seq( + "child" -> leftOrRight, + "requiredIndexedCols" -> requiredIndexedCols, + "indexedCols" -> indexedCols) + override def verboseStr: String = + s"Index does not contain required columns for $leftOrRight subplan. " + + s"Required indexed columns: [$requiredIndexedCols], " + + s"Indexed columns: [$indexedCols]" + } + + case class NotAllJoinColIndexed(leftOrRight: String, joinCols: String, indexedCols: String) + extends FilterReason { + override final def codeStr: String = "NOT_ALL_JOIN_COL_INDEXED" + override val args = + Seq("child" -> leftOrRight, "joinCols" -> joinCols, "indexedCols" -> indexedCols) + override def verboseStr: String = + s"All join condition column and indexed column should be the same. " + + s"Join columns: [$joinCols], Indexed columns for $leftOrRight subplan: [$indexedCols]" + } + + case class NoCompatibleJoinIndexPair() extends FilterReasonNoArg { + override final def codeStr: String = "NO_COMPATIBLE_JOIN_INDEX_PAIR" + override def verboseStr: String = "No compatible left and right index pair." 
+ } + + case class AnotherIndexApplied(appliedIndex: String) extends FilterReason { + override final def codeStr: String = "ANOTHER_INDEX_APPLIED" + override val args = Seq("appliedIndex" -> appliedIndex) + override def verboseStr: String = + s"Another candidate index is applied: $appliedIndex" + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/plananalysis/PlanAnalyzer.scala b/src/main/scala/com/microsoft/hyperspace/index/plananalysis/PlanAnalyzer.scala index 3fbe2b132..a9336bf96 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/plananalysis/PlanAnalyzer.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/plananalysis/PlanAnalyzer.scala @@ -125,6 +125,10 @@ object PlanAnalyzer { outputStream, spark) outputStream.writeLine() + + buildHeader(outputStream, "Applicable indexes:") + outputStream.write(CandidateIndexAnalyzer.applicableIndexInfoString(spark, df)) + outputStream.writeLine() } outputStream.withTag() diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/ColumnSchemaFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/ColumnSchemaFilter.scala index b18ae27f6..dfd8d25b1 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/ColumnSchemaFilter.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/ColumnSchemaFilter.scala @@ -19,6 +19,7 @@ package com.microsoft.hyperspace.index.rules import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import com.microsoft.hyperspace.index.IndexLogEntry +import com.microsoft.hyperspace.index.plananalysis.FilterReasons import com.microsoft.hyperspace.util.ResolverUtils /** @@ -32,9 +33,9 @@ object ColumnSchemaFilter extends SourcePlanIndexFilter { withFilterReasonTag( plan, index, - "Column Schema does not match. " + - s"Relation columns: [${relationColumnNames.mkString(", ")}], " + - s"Index columns: [${(index.derivedDataset.referencedColumns).mkString(", ")}]") { + FilterReasons.ColSchemaMismatch( + relationColumnNames.mkString(","), + index.derivedDataset.referencedColumns.mkString(","))) { ResolverUtils .resolve(spark, index.derivedDataset.referencedColumns, relationColumnNames) .isDefined diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/FileSignatureFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/FileSignatureFilter.scala index df9733d6f..4846ef732 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/FileSignatureFilter.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/FileSignatureFilter.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import com.microsoft.hyperspace.Hyperspace import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags, LogicalPlanSignatureProvider} import com.microsoft.hyperspace.index.IndexLogEntryTags.{HYBRIDSCAN_RELATED_CONFIGS, IS_HYBRIDSCAN_CANDIDATE} +import com.microsoft.hyperspace.index.plananalysis.FilterReasons import com.microsoft.hyperspace.index.sources.FileBasedRelation import com.microsoft.hyperspace.util.HyperspaceConf @@ -59,10 +60,7 @@ object FileSignatureFilter extends SourcePlanIndexFilter { // Map of a signature provider to a signature generated for the given plan. val signatureMap = mutable.Map[String, Option[String]]() indexes.filter { index => - withFilterReasonTag( - plan, - index, - s"Index signature does not match. 
Try Hybrid Scan or refreshIndex.") { + withFilterReasonTag(plan, index, FilterReasons.SourceDataChanged()) { signatureValid(relation, index, signatureMap) } } @@ -133,22 +131,30 @@ // Tag to original index log entry to check the reason string with the given log entry. lazy val hasLineageColumnCond = - withFilterReasonTag(relation.plan, index, "Index doesn't support deleted files.")( - entry.derivedDataset.canHandleDeletedFiles) + withFilterReasonTag(relation.plan, index, FilterReasons.NoDeleteSupport()) { + entry.derivedDataset.canHandleDeletedFiles + } lazy val hasCommonFilesCond = - withFilterReasonTag(relation.plan, index, "No common files.")(commonCnt > 0) + withFilterReasonTag(relation.plan, index, FilterReasons.NoCommonFiles()) { + commonCnt > 0 + } + + val hybridScanAppendThreshold = HyperspaceConf.hybridScanAppendedRatioThreshold(spark) + val hybridScanDeleteThreshold = HyperspaceConf.hybridScanDeletedRatioThreshold(spark) lazy val appendThresholdCond = withFilterReasonTag( relation.plan, index, - s"Appended bytes ratio ($appendedBytesRatio) is larger than " + - s"threshold config ${HyperspaceConf.hybridScanAppendedRatioThreshold(spark)}") { - appendedBytesRatio < HyperspaceConf.hybridScanAppendedRatioThreshold(spark) + FilterReasons.TooMuchAppended( + appendedBytesRatio.toString, + hybridScanAppendThreshold.toString)) { + appendedBytesRatio < hybridScanAppendThreshold } lazy val deleteThresholdCond = withFilterReasonTag( relation.plan, index, - s"Deleted bytes ratio ($deletedBytesRatio) is larger than " + - s"threshold config ${HyperspaceConf.hybridScanDeletedRatioThreshold(spark)}") { + FilterReasons.TooMuchDeleted( + deletedBytesRatio.toString, + hybridScanDeleteThreshold.toString)) { + deletedBytesRatio < hybridScanDeleteThreshold } diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/HyperspaceRule.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/HyperspaceRule.scala index 5fbcfaab6..ab017cf8f 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/HyperspaceRule.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/HyperspaceRule.scala @@ -19,6 +19,7 @@ package com.microsoft.hyperspace.index.rules import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import com.microsoft.hyperspace.ActiveSparkSession +import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags} import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap} /** @@ -70,9 +71,21 @@ trait HyperspaceRule extends ActiveSparkSession { if (applicableIndexes.nonEmpty) { val selectedIndexes = indexRanker(plan, applicableIndexes) + selectedIndexes.values.toSeq.distinct.foreach(setApplicableIndexTag(plan, _)) (applyIndex(plan, selectedIndexes), score(plan, selectedIndexes)) } else { (plan, 0) } } + + protected def setApplicableIndexTag(plan: LogicalPlan, index: IndexLogEntry): Unit = { + if (index.getTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED).getOrElse(false)) { + val prevRules = + index.getTagValue(plan, IndexLogEntryTags.APPLICABLE_INDEX_RULES).getOrElse(Nil) + index.setTagValue( + plan, + IndexLogEntryTags.APPLICABLE_INDEX_RULES, + prevRules :+ getClass.getSimpleName.split("\\$").last) + } + } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/IndexFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/IndexFilter.scala index 89a674b5c..2772cf4c2 100--- 
a/src/main/scala/com/microsoft/hyperspace/index/rules/IndexFilter.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/IndexFilter.scala @@ -20,6 +20,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import com.microsoft.hyperspace.ActiveSparkSession import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags} +import com.microsoft.hyperspace.index.plananalysis.FilterReason trait IndexFilter extends ActiveSparkSession { @@ -30,19 +31,22 @@ trait IndexFilter extends ActiveSparkSession { * @param condition Flag to append reason string * @param plan Query plan to tag * @param index Index to tag - * @param reasonString Informational message in case condition is false. + * @param filterReason Informational reason in case condition is false. */ protected def setFilterReasonTag( condition: Boolean, plan: LogicalPlan, index: IndexLogEntry, - reasonString: => String): Unit = { + filterReason: => FilterReason): Unit = { if (!condition && index - .getTagValue(IndexLogEntryTags.FILTER_REASONS_ENABLED) - .getOrElse(false)) { + .getTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED) + .getOrElse(false)) { val prevReason = index.getTagValue(plan, IndexLogEntryTags.FILTER_REASONS).getOrElse(Nil) - index.setTagValue(plan, IndexLogEntryTags.FILTER_REASONS, prevReason :+ reasonString) + index.setTagValue( + plan, + IndexLogEntryTags.FILTER_REASONS, + prevReason :+ filterReason) } } @@ -50,18 +54,18 @@ trait IndexFilter extends ActiveSparkSession { * Append the reason string to FILTER_REASONS tag for the given index * if the result of the function is false and FILTER_REASONS tag is set to the index. * - * @param reasonString Informational message in case condition is false. * @param plan Query plan to tag * @param index Index to tag * @param f Function for a condition + * @param filterReason Informational reason in case condition is false. * @return Result of the given function */ protected def withFilterReasonTag( plan: LogicalPlan, index: IndexLogEntry, - reasonString: => String)(f: => Boolean): Boolean = { + filterReason: => FilterReason)(f: => Boolean): Boolean = { val ret = f - setFilterReasonTag(ret, plan, index, reasonString) + setFilterReasonTag(ret, plan, index, filterReason) ret } @@ -71,17 +75,17 @@ trait IndexFilter extends ActiveSparkSession { * * @param plan Query plan to tag * @param indexes Indexes to tag - * @param reasonString Informational message in case condition is false. + * @param filterReason Informational reason in case condition is false. * @param f Function for a condition * @return Result of the given function */ protected def withFilterReasonTag( plan: LogicalPlan, indexes: Seq[IndexLogEntry], - reasonString: => String)(f: => Boolean): Boolean = { + filterReason: => FilterReason)(f: => Boolean): Boolean = { val ret = f indexes.foreach { index => - setFilterReasonTag(ret, plan, index, reasonString) + setFilterReasonTag(ret, plan, index, filterReason) } ret } @@ -90,17 +94,17 @@ trait IndexFilter extends ActiveSparkSession { * Append the reason string to FILTER_REASONS tag for the given list of indexes * if FILTER_REASONS_ENABLED tag is set to the indexes. * - * @param reasonString Informational message in case condition is false. * @param plan Query plan to tag * @param indexes Indexes to tag + * @param filterReason Informational reason in case condition is false. 
* @return Result of the given function */ protected def setFilterReasonTag( plan: LogicalPlan, indexes: Seq[IndexLogEntry], - reasonString: => String): Unit = { + filterReason: => FilterReason): Unit = { indexes.foreach { index => - setFilterReasonTag(false, plan, index, reasonString) + setFilterReasonTag(false, plan, index, filterReason) } } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/IndexRankFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/IndexRankFilter.scala index b759b6a46..cfea96bd5 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/IndexRankFilter.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/IndexRankFilter.scala @@ -19,6 +19,7 @@ package com.microsoft.hyperspace.index.rules import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import com.microsoft.hyperspace.index.IndexLogEntry +import com.microsoft.hyperspace.index.plananalysis.FilterReasons import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap} /** @@ -51,7 +52,7 @@ trait IndexRankFilter extends IndexFilter { selectedIndex.name.equals(index.name), plan, index, - s"Another candidate index is applied: ${selectedIndex.name}") + FilterReasons.AnotherIndexApplied(selectedIndex.name)) } } } diff --git a/src/test/resources/expected/spark-2.4/selfJoin.txt b/src/test/resources/expected/spark-2.4/selfJoin.txt index 0333e01be..ad26b676d 100644 --- a/src/test/resources/expected/spark-2.4/selfJoin.txt +++ b/src/test/resources/expected/spark-2.4/selfJoin.txt @@ -44,3 +44,22 @@ Physical operator stats: | SortMergeJoin| 1| 1| 0| +----------------------------------------------------------+-------------------+------------------+----------+ +============================================================= +Applicable indexes: +============================================================= +Plan without Hyperspace: + +00 Join Inner, (Col1# = Col1#) +01 :- Filter isnotnull(Col1#) +02 : +- Relation[Col1#,Col2#] parquet +03 +- Filter isnotnull(Col1#) +04 +- Relation[Col1#,Col2#] parquet + ++---------+---------+---------+---------------+ +|SubPlan |IndexName|IndexType|RuleName | ++---------+---------+---------+---------------+ +|Filter @1|joinIndex|CI |FilterIndexRule| +|Filter @3|joinIndex|CI |FilterIndexRule| +|Join @0 |joinIndex|CI |JoinIndexRule | ++---------+---------+---------+---------------+ + diff --git a/src/test/resources/expected/spark-2.4/selfJoin_Iceberg.txt b/src/test/resources/expected/spark-2.4/selfJoin_Iceberg.txt index 4b14b81ef..8939d2e1c 100644 --- a/src/test/resources/expected/spark-2.4/selfJoin_Iceberg.txt +++ b/src/test/resources/expected/spark-2.4/selfJoin_Iceberg.txt @@ -43,3 +43,22 @@ Physical operator stats: | WholeStageCodegen| 3| 3| 0| +----------------------------------------------------------+-------------------+------------------+----------+ +============================================================= +Applicable indexes: +============================================================= +Plan without Hyperspace: + +00 Join Inner, (Col1# = Col1#) +01 :- Filter isnotnull(Col1#) +02 : +- RelationV2 iceberg[Col1#, Col2#] (Options: $icebergOptions) +03 +- Filter isnotnull(Col1#) +04 +- RelationV2 iceberg[Col1#, Col2#] (Options: $icebergOptions) + ++---------+---------+---------+---------------+ +|SubPlan |IndexName|IndexType|RuleName | ++---------+---------+---------+---------------+ +|Filter @1|joinIndex|CI |FilterIndexRule| +|Filter @3|joinIndex|CI |FilterIndexRule| +|Join @0 |joinIndex|CI 
|JoinIndexRule | ++---------+---------+---------+---------------+ + diff --git a/src/test/resources/expected/spark-2.4/subquery.txt b/src/test/resources/expected/spark-2.4/subquery.txt index 32716c697..775247a4b 100644 --- a/src/test/resources/expected/spark-2.4/subquery.txt +++ b/src/test/resources/expected/spark-2.4/subquery.txt @@ -45,3 +45,7 @@ Physical operator stats: |WholeStageCodegen| 1| 1| 0| +-----------------+-------------------+------------------+----------+ +============================================================= +Applicable indexes: +============================================================= +No applicable indexes. Try hyperspace.whyNot() diff --git a/src/test/resources/expected/spark-2.4/whyNot_allIndex.txt b/src/test/resources/expected/spark-2.4/whyNot_allIndex.txt new file mode 100644 index 000000000..67b2e840e --- /dev/null +++ b/src/test/resources/expected/spark-2.4/whyNot_allIndex.txt @@ -0,0 +1,63 @@ + +============================================================= +Plan with Hyperspace & Summary: +============================================================= +Join Inner, (c3# = c3#) +:- Project [c4#, c3#] +: +- Filter ((isnotnull(c4#) && (c4# = 2)) && isnotnull(c3#)) +: +- Relation[c3#,c4#] Hyperspace(Type: CI, Name: leftDfFilterIndex, LogVersion: 1) ++- Project [c5#, c3#] + +- Filter ((isnotnull(c5#) && (c5# = 3000)) && isnotnull(c3#)) + +- Relation[c3#,c5#] Hyperspace(Type: CI, Name: rightDfFilterIndex, LogVersion: 1) + +Applied indexes: +- leftDfFilterIndex +- rightDfFilterIndex + +Applicable indexes, but not applied due to priority: +- leftDfJoinIndex +- rightDfJoinIndex + +Non-applicable indexes - index is outdated: +- No such index found. + +Non-applicable indexes - no applicable query plan: +- No such index found. 
+ +For more information, please visit: https://microsoft.github.io/hyperspace/docs/why-not-result-analysis + +============================================================= +Plan without Hyperspace & WhyNot reasons: +============================================================= +00 Join Inner, (c3# = c3#) +01 :- Project [c4#, c3#] +02 : +- Filter ((isnotnull(c4#) && (c4# = 2)) && isnotnull(c3#)) +03 : +- Relation[c1#,c2#,c3#,c4#,c5#] parquet +04 +- Project [c5#, c3#] +05 +- Filter ((isnotnull(c5#) && (c5# = 3000)) && isnotnull(c3#)) +06 +- Relation[c1#,c2#,c3#,c4#,c5#] parquet + ++----------+------------------+---------+-------------------------+------------------------------------------------------------+ +|SubPlan |IndexName |IndexType|Reason |Message | ++----------+------------------+---------+-------------------------+------------------------------------------------------------+ +|Filter @2 |leftDfFilterIndex |CI |MISSING_REQUIRED_COL |requiredCols=[c3,c4,c5,c2,c1], indexCols=[c4,c3] | +|Filter @2 |leftDfJoinIndex |CI |MISSING_REQUIRED_COL |requiredCols=[c3,c4,c5,c2,c1], indexCols=[c3,c4] | +|Filter @2 |rightDfFilterIndex|CI |NO_FIRST_INDEXED_COL_COND|firstIndexedCol=[c5], filterCols=[c4,c3] | +|Filter @2 |rightDfJoinIndex |CI |MISSING_REQUIRED_COL |requiredCols=[c3,c4,c5,c2,c1], indexCols=[c3,c5] | +|Filter @5 |leftDfFilterIndex |CI |NO_FIRST_INDEXED_COL_COND|firstIndexedCol=[c4], filterCols=[c5,c3] | +|Filter @5 |leftDfJoinIndex |CI |MISSING_REQUIRED_COL |requiredCols=[c3,c4,c5,c2,c1], indexCols=[c3,c4] | +|Filter @5 |rightDfFilterIndex|CI |MISSING_REQUIRED_COL |requiredCols=[c3,c4,c5,c2,c1], indexCols=[c5,c3] | +|Filter @5 |rightDfJoinIndex |CI |MISSING_REQUIRED_COL |requiredCols=[c3,c4,c5,c2,c1], indexCols=[c3,c5] | +|Join @0 |leftDfFilterIndex |CI |NOT_ALL_JOIN_COL_INDEXED |child=[left], joinCols=[c3], indexedCols=[c4] | +|Join @0 |leftDfFilterIndex |CI |NOT_ALL_JOIN_COL_INDEXED |child=[right], joinCols=[c3], indexedCols=[c4] | +|Join @0 |leftDfJoinIndex |CI |MISSING_INDEXED_COL |child=[right], requiredIndexedCols=[c5,c3], indexedCols=[c3]| +|Join @0 |rightDfFilterIndex|CI |NOT_ALL_JOIN_COL_INDEXED |child=[left], joinCols=[c3], indexedCols=[c5] | +|Join @0 |rightDfFilterIndex|CI |NOT_ALL_JOIN_COL_INDEXED |child=[right], joinCols=[c3], indexedCols=[c5] | +|Join @0 |rightDfJoinIndex |CI |MISSING_INDEXED_COL |child=[left], requiredIndexedCols=[c4,c3], indexedCols=[c3] | +|Project @1|leftDfJoinIndex |CI |ANOTHER_INDEX_APPLIED |appliedIndex=[leftDfFilterIndex] | +|Project @1|rightDfFilterIndex|CI |NO_FIRST_INDEXED_COL_COND|firstIndexedCol=[c5], filterCols=[c4,c3] | +|Project @1|rightDfJoinIndex |CI |MISSING_REQUIRED_COL |requiredCols=[c4,c3], indexCols=[c3,c5] | +|Project @4|leftDfFilterIndex |CI |NO_FIRST_INDEXED_COL_COND|firstIndexedCol=[c4], filterCols=[c5,c3] | +|Project @4|leftDfJoinIndex |CI |MISSING_REQUIRED_COL |requiredCols=[c5,c3], indexCols=[c3,c4] | +|Project @4|rightDfJoinIndex |CI |ANOTHER_INDEX_APPLIED |appliedIndex=[rightDfFilterIndex] | ++----------+------------------+---------+-------------------------+------------------------------------------------------------+ diff --git a/src/test/resources/expected/spark-2.4/whyNot_indexName.txt b/src/test/resources/expected/spark-2.4/whyNot_indexName.txt new file mode 100644 index 000000000..2c6832076 --- /dev/null +++ b/src/test/resources/expected/spark-2.4/whyNot_indexName.txt @@ -0,0 +1,48 @@ + +============================================================= +Plan with Hyperspace & Summary: 
+============================================================= +Join Inner, (c3# = c3#) +:- Project [c4#, c3#] +: +- Filter ((isnotnull(c4#) && (c4# = 2)) && isnotnull(c3#)) +: +- Relation[c3#,c4#] Hyperspace(Type: CI, Name: leftDfFilterIndex, LogVersion: 1) ++- Project [c5#, c3#] + +- Filter ((isnotnull(c5#) && (c5# = 3000)) && isnotnull(c3#)) + +- Relation[c3#,c5#] Hyperspace(Type: CI, Name: rightDfFilterIndex, LogVersion: 1) + +Applied indexes: +- leftDfFilterIndex +- rightDfFilterIndex + +Applicable indexes, but not applied due to priority: +- leftDfJoinIndex +- rightDfJoinIndex + +Non-applicable indexes - index is outdated: +- No such index found. + +Non-applicable indexes - no applicable query plan: +- No such index found. + +For more information, please visit: https://microsoft.github.io/hyperspace/docs/why-not-result-analysis + +============================================================= +Plan without Hyperspace & WhyNot reasons: +============================================================= +00 Join Inner, (c3# = c3#) +01 :- Project [c4#, c3#] +02 : +- Filter ((isnotnull(c4#) && (c4# = 2)) && isnotnull(c3#)) +03 : +- Relation[c1#,c2#,c3#,c4#,c5#] parquet +04 +- Project [c5#, c3#] +05 +- Filter ((isnotnull(c5#) && (c5# = 3000)) && isnotnull(c3#)) +06 +- Relation[c1#,c2#,c3#,c4#,c5#] parquet + ++----------+---------------+---------+---------------------+------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------+ +|SubPlan |IndexName |IndexType|Reason |Message |VerboseMessage | ++----------+---------------+---------+---------------------+------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------+ +|Filter @2 |leftDfJoinIndex|CI |MISSING_REQUIRED_COL |requiredCols=[c3,c4,c5,c2,c1], indexCols=[c3,c4] |Index does not contain required columns. Required columns: [c3,c4,c5,c2,c1], Index columns: [c3,c4] | +|Filter @5 |leftDfJoinIndex|CI |MISSING_REQUIRED_COL |requiredCols=[c3,c4,c5,c2,c1], indexCols=[c3,c4] |Index does not contain required columns. Required columns: [c3,c4,c5,c2,c1], Index columns: [c3,c4] | +|Join @0 |leftDfJoinIndex|CI |MISSING_INDEXED_COL |child=[right], requiredIndexedCols=[c5,c3], indexedCols=[c3]|Index does not contain required columns for right subplan. Required indexed columns: [c5,c3], Indexed columns: [c3]| +|Project @1|leftDfJoinIndex|CI |ANOTHER_INDEX_APPLIED|appliedIndex=[leftDfFilterIndex] |Another candidate index is applied: leftDfFilterIndex | +|Project @4|leftDfJoinIndex|CI |MISSING_REQUIRED_COL |requiredCols=[c5,c3], indexCols=[c3,c4] |Index does not contain required columns. 
Required columns: [c5,c3], Index columns: [c3,c4] | ++----------+---------------+---------+---------------------+------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------+ diff --git a/src/test/resources/expected/spark-3.0/selfJoin.txt b/src/test/resources/expected/spark-3.0/selfJoin.txt index 2832623b6..319d014f7 100644 --- a/src/test/resources/expected/spark-3.0/selfJoin.txt +++ b/src/test/resources/expected/spark-3.0/selfJoin.txt @@ -52,3 +52,22 @@ Physical operator stats: | WholeStageCodegen (2)| 1| 1| 0| +----------------------------------------------------------+-------------------+------------------+----------+ +============================================================= +Applicable indexes: +============================================================= +Plan without Hyperspace: + +00 Join Inner, (Col1# = Col1#) +01 :- Filter isnotnull(Col1#) +02 : +- Relation[Col1#,Col2#] parquet +03 +- Filter isnotnull(Col1#) +04 +- Relation[Col1#,Col2#] parquet + ++---------+---------+---------+---------------+ +|SubPlan |IndexName|IndexType|RuleName | ++---------+---------+---------+---------------+ +|Filter @1|joinIndex|CI |FilterIndexRule| +|Filter @3|joinIndex|CI |FilterIndexRule| +|Join @0 |joinIndex|CI |JoinIndexRule | ++---------+---------+---------+---------------+ + diff --git a/src/test/resources/expected/spark-3.0/selfJoin_Iceberg.txt b/src/test/resources/expected/spark-3.0/selfJoin_Iceberg.txt index 0b72df8ef..bab01a0f9 100644 --- a/src/test/resources/expected/spark-3.0/selfJoin_Iceberg.txt +++ b/src/test/resources/expected/spark-3.0/selfJoin_Iceberg.txt @@ -48,3 +48,22 @@ Physical operator stats: | WholeStageCodegen (3)| 1| 1| 0| +----------------------------------------------------------+-------------------+------------------+----------+ +============================================================= +Applicable indexes: +============================================================= +Plan without Hyperspace: + +00 Join Inner, (Col1# = Col1#) +01 :- Filter isnotnull(Col1#) +02 : +- RelationV2[Col1#, Col2#] $icebergPath +03 +- Filter isnotnull(Col1#) +04 +- RelationV2[Col1#, Col2#] $icebergPath + ++---------+---------+---------+---------------+ +|SubPlan |IndexName|IndexType|RuleName | ++---------+---------+---------+---------------+ +|Filter @1|joinIndex|CI |FilterIndexRule| +|Filter @3|joinIndex|CI |FilterIndexRule| +|Join @0 |joinIndex|CI |JoinIndexRule | ++---------+---------+---------+---------------+ + diff --git a/src/test/resources/expected/spark-3.0/subquery.txt b/src/test/resources/expected/spark-3.0/subquery.txt index c306eec78..4aea12e39 100644 --- a/src/test/resources/expected/spark-3.0/subquery.txt +++ b/src/test/resources/expected/spark-3.0/subquery.txt @@ -43,3 +43,7 @@ Physical operator stats: |WholeStageCodegen (1)| 1| 1| 0| +---------------------+-------------------+------------------+----------+ +============================================================= +Applicable indexes: +============================================================= +No applicable indexes. 
Try hyperspace.whyNot() diff --git a/src/test/resources/expected/spark-3.0/whyNot_allIndex.txt b/src/test/resources/expected/spark-3.0/whyNot_allIndex.txt new file mode 100644 index 000000000..1926c9d7c --- /dev/null +++ b/src/test/resources/expected/spark-3.0/whyNot_allIndex.txt @@ -0,0 +1,63 @@ + +============================================================= +Plan with Hyperspace & Summary: +============================================================= +Join Inner, (c3# = c3#) +:- Project [c4#, c3#] +: +- Filter ((isnotnull(c4#) AND (c4# = 2)) AND isnotnull(c3#)) +: +- Relation[c3#,c4#] Hyperspace(Type: CI, Name: leftDfFilterIndex, LogVersion: 1) ++- Project [c5#, c3#] + +- Filter ((isnotnull(c5#) AND (c5# = 3000)) AND isnotnull(c3#)) + +- Relation[c3#,c5#] Hyperspace(Type: CI, Name: rightDfFilterIndex, LogVersion: 1) + +Applied indexes: +- leftDfFilterIndex +- rightDfFilterIndex + +Applicable indexes, but not applied due to priority: +- leftDfJoinIndex +- rightDfJoinIndex + +Non-applicable indexes - index is outdated: +- No such index found. + +Non-applicable indexes - no applicable query plan: +- No such index found. + +For more information, please visit: https://microsoft.github.io/hyperspace/docs/why-not-result-analysis + +============================================================= +Plan without Hyperspace & WhyNot reasons: +============================================================= +00 Join Inner, (c3# = c3#) +01 :- Project [c4#, c3#] +02 : +- Filter ((isnotnull(c4#) AND (c4# = 2)) AND isnotnull(c3#)) +03 : +- Relation[c1#,c2#,c3#,c4#,c5#] parquet +04 +- Project [c5#, c3#] +05 +- Filter ((isnotnull(c5#) AND (c5# = 3000)) AND isnotnull(c3#)) +06 +- Relation[c1#,c2#,c3#,c4#,c5#] parquet + ++----------+------------------+---------+-------------------------+------------------------------------------------------------+ +|SubPlan |IndexName |IndexType|Reason |Message | ++----------+------------------+---------+-------------------------+------------------------------------------------------------+ +|Filter @2 |leftDfFilterIndex |CI |MISSING_REQUIRED_COL |requiredCols=[c3,c4,c5,c2,c1], indexCols=[c4,c3] | +|Filter @2 |leftDfJoinIndex |CI |MISSING_REQUIRED_COL |requiredCols=[c3,c4,c5,c2,c1], indexCols=[c3,c4] | +|Filter @2 |rightDfFilterIndex|CI |NO_FIRST_INDEXED_COL_COND|firstIndexedCol=[c5], filterCols=[c4,c3] | +|Filter @2 |rightDfJoinIndex |CI |MISSING_REQUIRED_COL |requiredCols=[c3,c4,c5,c2,c1], indexCols=[c3,c5] | +|Filter @5 |leftDfFilterIndex |CI |NO_FIRST_INDEXED_COL_COND|firstIndexedCol=[c4], filterCols=[c5,c3] | +|Filter @5 |leftDfJoinIndex |CI |MISSING_REQUIRED_COL |requiredCols=[c3,c4,c5,c2,c1], indexCols=[c3,c4] | +|Filter @5 |rightDfFilterIndex|CI |MISSING_REQUIRED_COL |requiredCols=[c3,c4,c5,c2,c1], indexCols=[c5,c3] | +|Filter @5 |rightDfJoinIndex |CI |MISSING_REQUIRED_COL |requiredCols=[c3,c4,c5,c2,c1], indexCols=[c3,c5] | +|Join @0 |leftDfFilterIndex |CI |NOT_ALL_JOIN_COL_INDEXED |child=[left], joinCols=[c3], indexedCols=[c4] | +|Join @0 |leftDfFilterIndex |CI |NOT_ALL_JOIN_COL_INDEXED |child=[right], joinCols=[c3], indexedCols=[c4] | +|Join @0 |leftDfJoinIndex |CI |MISSING_INDEXED_COL |child=[right], requiredIndexedCols=[c5,c3], indexedCols=[c3]| +|Join @0 |rightDfFilterIndex|CI |NOT_ALL_JOIN_COL_INDEXED |child=[left], joinCols=[c3], indexedCols=[c5] | +|Join @0 |rightDfFilterIndex|CI |NOT_ALL_JOIN_COL_INDEXED |child=[right], joinCols=[c3], indexedCols=[c5] | +|Join @0 |rightDfJoinIndex |CI |MISSING_INDEXED_COL |child=[left], requiredIndexedCols=[c4,c3], 
indexedCols=[c3] | +|Project @1|leftDfJoinIndex |CI |ANOTHER_INDEX_APPLIED |appliedIndex=[leftDfFilterIndex] | +|Project @1|rightDfFilterIndex|CI |NO_FIRST_INDEXED_COL_COND|firstIndexedCol=[c5], filterCols=[c4,c3] | +|Project @1|rightDfJoinIndex |CI |MISSING_REQUIRED_COL |requiredCols=[c4,c3], indexCols=[c3,c5] | +|Project @4|leftDfFilterIndex |CI |NO_FIRST_INDEXED_COL_COND|firstIndexedCol=[c4], filterCols=[c5,c3] | +|Project @4|leftDfJoinIndex |CI |MISSING_REQUIRED_COL |requiredCols=[c5,c3], indexCols=[c3,c4] | +|Project @4|rightDfJoinIndex |CI |ANOTHER_INDEX_APPLIED |appliedIndex=[rightDfFilterIndex] | ++----------+------------------+---------+-------------------------+------------------------------------------------------------+ diff --git a/src/test/resources/expected/spark-3.0/whyNot_indexName.txt b/src/test/resources/expected/spark-3.0/whyNot_indexName.txt new file mode 100644 index 000000000..325a43fa5 --- /dev/null +++ b/src/test/resources/expected/spark-3.0/whyNot_indexName.txt @@ -0,0 +1,48 @@ + +============================================================= +Plan with Hyperspace & Summary: +============================================================= +Join Inner, (c3# = c3#) +:- Project [c4#, c3#] +: +- Filter ((isnotnull(c4#) AND (c4# = 2)) AND isnotnull(c3#)) +: +- Relation[c3#,c4#] Hyperspace(Type: CI, Name: leftDfFilterIndex, LogVersion: 1) ++- Project [c5#, c3#] + +- Filter ((isnotnull(c5#) AND (c5# = 3000)) AND isnotnull(c3#)) + +- Relation[c3#,c5#] Hyperspace(Type: CI, Name: rightDfFilterIndex, LogVersion: 1) + +Applied indexes: +- leftDfFilterIndex +- rightDfFilterIndex + +Applicable indexes, but not applied due to priority: +- leftDfJoinIndex +- rightDfJoinIndex + +Non-applicable indexes - index is outdated: +- No such index found. + +Non-applicable indexes - no applicable query plan: +- No such index found. + +For more information, please visit: https://microsoft.github.io/hyperspace/docs/why-not-result-analysis + +============================================================= +Plan without Hyperspace & WhyNot reasons: +============================================================= +00 Join Inner, (c3# = c3#) +01 :- Project [c4#, c3#] +02 : +- Filter ((isnotnull(c4#) AND (c4# = 2)) AND isnotnull(c3#)) +03 : +- Relation[c1#,c2#,c3#,c4#,c5#] parquet +04 +- Project [c5#, c3#] +05 +- Filter ((isnotnull(c5#) AND (c5# = 3000)) AND isnotnull(c3#)) +06 +- Relation[c1#,c2#,c3#,c4#,c5#] parquet + ++----------+---------------+---------+---------------------+------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------+ +|SubPlan |IndexName |IndexType|Reason |Message |VerboseMessage | ++----------+---------------+---------+---------------------+------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------+ +|Filter @2 |leftDfJoinIndex|CI |MISSING_REQUIRED_COL |requiredCols=[c3,c4,c5,c2,c1], indexCols=[c3,c4] |Index does not contain required columns. Required columns: [c3,c4,c5,c2,c1], Index columns: [c3,c4] | +|Filter @5 |leftDfJoinIndex|CI |MISSING_REQUIRED_COL |requiredCols=[c3,c4,c5,c2,c1], indexCols=[c3,c4] |Index does not contain required columns. 
Required columns: [c3,c4,c5,c2,c1], Index columns: [c3,c4] | +|Join @0 |leftDfJoinIndex|CI |MISSING_INDEXED_COL |child=[right], requiredIndexedCols=[c5,c3], indexedCols=[c3]|Index does not contain required columns for right subplan. Required indexed columns: [c5,c3], Indexed columns: [c3]| +|Project @1|leftDfJoinIndex|CI |ANOTHER_INDEX_APPLIED|appliedIndex=[leftDfFilterIndex] |Another candidate index is applied: leftDfFilterIndex | +|Project @4|leftDfJoinIndex|CI |MISSING_REQUIRED_COL |requiredCols=[c5,c3], indexCols=[c3,c4] |Index does not contain required columns. Required columns: [c5,c3], Index columns: [c3,c4] | ++----------+---------------+---------+---------------------+------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------+ diff --git a/src/test/resources/expected/spark-3.1/selfJoin.txt b/src/test/resources/expected/spark-3.1/selfJoin.txt index e7770d082..fe95b14eb 100644 --- a/src/test/resources/expected/spark-3.1/selfJoin.txt +++ b/src/test/resources/expected/spark-3.1/selfJoin.txt @@ -48,3 +48,22 @@ Physical operator stats: | WholeStageCodegen (2)| 1| 1| 0| +----------------------------------------------------------+-------------------+------------------+----------+ +============================================================= +Applicable indexes: +============================================================= +Plan without Hyperspace: + +00 Join Inner, (Col1# = Col1#) +01 :- Filter isnotnull(Col1#) +02 : +- Relation[Col1#,Col2#] parquet +03 +- Filter isnotnull(Col1#) +04 +- Relation[Col1#,Col2#] parquet + ++---------+---------+---------+---------------+ +|SubPlan |IndexName|IndexType|RuleName | ++---------+---------+---------+---------------+ +|Filter @1|joinIndex|CI |FilterIndexRule| +|Filter @3|joinIndex|CI |FilterIndexRule| +|Join @0 |joinIndex|CI |JoinIndexRule | ++---------+---------+---------+---------------+ + diff --git a/src/test/resources/expected/spark-3.1/selfJoin_Iceberg.txt b/src/test/resources/expected/spark-3.1/selfJoin_Iceberg.txt index 0660999ed..771ec23a0 100644 --- a/src/test/resources/expected/spark-3.1/selfJoin_Iceberg.txt +++ b/src/test/resources/expected/spark-3.1/selfJoin_Iceberg.txt @@ -43,3 +43,22 @@ Physical operator stats: | WholeStageCodegen (3)| 1| 1| 0| +----------------------------------------------------------+-------------------+------------------+----------+ +============================================================= +Applicable indexes: +============================================================= +Plan without Hyperspace: + +00 Join Inner, (Col1# = Col1#) +01 :- Filter isnotnull(Col1#) +02 : +- RelationV2[Col1#, Col2#] $icebergPath +03 +- Filter isnotnull(Col1#) +04 +- RelationV2[Col1#, Col2#] $icebergPath + ++---------+---------+---------+---------------+ +|SubPlan |IndexName|IndexType|RuleName | ++---------+---------+---------+---------------+ +|Filter @1|joinIndex|CI |FilterIndexRule| +|Filter @3|joinIndex|CI |FilterIndexRule| +|Join @0 |joinIndex|CI |JoinIndexRule | ++---------+---------+---------+---------------+ + diff --git a/src/test/resources/expected/spark-3.1/subquery.txt b/src/test/resources/expected/spark-3.1/subquery.txt index e4be7a355..c83b78a6d 100644 --- a/src/test/resources/expected/spark-3.1/subquery.txt +++ b/src/test/resources/expected/spark-3.1/subquery.txt @@ -40,3 +40,7 @@ Physical operator stats: |WholeStageCodegen (1)| 1| 1| 0| 
+---------------------+-------------------+------------------+----------+ +============================================================= +Applicable indexes: +============================================================= +No applicable indexes. Try hyperspace.whyNot() diff --git a/src/test/resources/expected/spark-3.1/whyNot_allIndex.txt b/src/test/resources/expected/spark-3.1/whyNot_allIndex.txt new file mode 100644 index 000000000..1926c9d7c --- /dev/null +++ b/src/test/resources/expected/spark-3.1/whyNot_allIndex.txt @@ -0,0 +1,63 @@ + +============================================================= +Plan with Hyperspace & Summary: +============================================================= +Join Inner, (c3# = c3#) +:- Project [c4#, c3#] +: +- Filter ((isnotnull(c4#) AND (c4# = 2)) AND isnotnull(c3#)) +: +- Relation[c3#,c4#] Hyperspace(Type: CI, Name: leftDfFilterIndex, LogVersion: 1) ++- Project [c5#, c3#] + +- Filter ((isnotnull(c5#) AND (c5# = 3000)) AND isnotnull(c3#)) + +- Relation[c3#,c5#] Hyperspace(Type: CI, Name: rightDfFilterIndex, LogVersion: 1) + +Applied indexes: +- leftDfFilterIndex +- rightDfFilterIndex + +Applicable indexes, but not applied due to priority: +- leftDfJoinIndex +- rightDfJoinIndex + +Non-applicable indexes - index is outdated: +- No such index found. + +Non-applicable indexes - no applicable query plan: +- No such index found. + +For more information, please visit: https://microsoft.github.io/hyperspace/docs/why-not-result-analysis + +============================================================= +Plan without Hyperspace & WhyNot reasons: +============================================================= +00 Join Inner, (c3# = c3#) +01 :- Project [c4#, c3#] +02 : +- Filter ((isnotnull(c4#) AND (c4# = 2)) AND isnotnull(c3#)) +03 : +- Relation[c1#,c2#,c3#,c4#,c5#] parquet +04 +- Project [c5#, c3#] +05 +- Filter ((isnotnull(c5#) AND (c5# = 3000)) AND isnotnull(c3#)) +06 +- Relation[c1#,c2#,c3#,c4#,c5#] parquet + ++----------+------------------+---------+-------------------------+------------------------------------------------------------+ +|SubPlan |IndexName |IndexType|Reason |Message | ++----------+------------------+---------+-------------------------+------------------------------------------------------------+ +|Filter @2 |leftDfFilterIndex |CI |MISSING_REQUIRED_COL |requiredCols=[c3,c4,c5,c2,c1], indexCols=[c4,c3] | +|Filter @2 |leftDfJoinIndex |CI |MISSING_REQUIRED_COL |requiredCols=[c3,c4,c5,c2,c1], indexCols=[c3,c4] | +|Filter @2 |rightDfFilterIndex|CI |NO_FIRST_INDEXED_COL_COND|firstIndexedCol=[c5], filterCols=[c4,c3] | +|Filter @2 |rightDfJoinIndex |CI |MISSING_REQUIRED_COL |requiredCols=[c3,c4,c5,c2,c1], indexCols=[c3,c5] | +|Filter @5 |leftDfFilterIndex |CI |NO_FIRST_INDEXED_COL_COND|firstIndexedCol=[c4], filterCols=[c5,c3] | +|Filter @5 |leftDfJoinIndex |CI |MISSING_REQUIRED_COL |requiredCols=[c3,c4,c5,c2,c1], indexCols=[c3,c4] | +|Filter @5 |rightDfFilterIndex|CI |MISSING_REQUIRED_COL |requiredCols=[c3,c4,c5,c2,c1], indexCols=[c5,c3] | +|Filter @5 |rightDfJoinIndex |CI |MISSING_REQUIRED_COL |requiredCols=[c3,c4,c5,c2,c1], indexCols=[c3,c5] | +|Join @0 |leftDfFilterIndex |CI |NOT_ALL_JOIN_COL_INDEXED |child=[left], joinCols=[c3], indexedCols=[c4] | +|Join @0 |leftDfFilterIndex |CI |NOT_ALL_JOIN_COL_INDEXED |child=[right], joinCols=[c3], indexedCols=[c4] | +|Join @0 |leftDfJoinIndex |CI |MISSING_INDEXED_COL |child=[right], requiredIndexedCols=[c5,c3], indexedCols=[c3]| +|Join @0 |rightDfFilterIndex|CI |NOT_ALL_JOIN_COL_INDEXED |child=[left], 
joinCols=[c3], indexedCols=[c5] | +|Join @0 |rightDfFilterIndex|CI |NOT_ALL_JOIN_COL_INDEXED |child=[right], joinCols=[c3], indexedCols=[c5] | +|Join @0 |rightDfJoinIndex |CI |MISSING_INDEXED_COL |child=[left], requiredIndexedCols=[c4,c3], indexedCols=[c3] | +|Project @1|leftDfJoinIndex |CI |ANOTHER_INDEX_APPLIED |appliedIndex=[leftDfFilterIndex] | +|Project @1|rightDfFilterIndex|CI |NO_FIRST_INDEXED_COL_COND|firstIndexedCol=[c5], filterCols=[c4,c3] | +|Project @1|rightDfJoinIndex |CI |MISSING_REQUIRED_COL |requiredCols=[c4,c3], indexCols=[c3,c5] | +|Project @4|leftDfFilterIndex |CI |NO_FIRST_INDEXED_COL_COND|firstIndexedCol=[c4], filterCols=[c5,c3] | +|Project @4|leftDfJoinIndex |CI |MISSING_REQUIRED_COL |requiredCols=[c5,c3], indexCols=[c3,c4] | +|Project @4|rightDfJoinIndex |CI |ANOTHER_INDEX_APPLIED |appliedIndex=[rightDfFilterIndex] | ++----------+------------------+---------+-------------------------+------------------------------------------------------------+ diff --git a/src/test/resources/expected/spark-3.1/whyNot_indexName.txt b/src/test/resources/expected/spark-3.1/whyNot_indexName.txt new file mode 100644 index 000000000..325a43fa5 --- /dev/null +++ b/src/test/resources/expected/spark-3.1/whyNot_indexName.txt @@ -0,0 +1,48 @@ + +============================================================= +Plan with Hyperspace & Summary: +============================================================= +Join Inner, (c3# = c3#) +:- Project [c4#, c3#] +: +- Filter ((isnotnull(c4#) AND (c4# = 2)) AND isnotnull(c3#)) +: +- Relation[c3#,c4#] Hyperspace(Type: CI, Name: leftDfFilterIndex, LogVersion: 1) ++- Project [c5#, c3#] + +- Filter ((isnotnull(c5#) AND (c5# = 3000)) AND isnotnull(c3#)) + +- Relation[c3#,c5#] Hyperspace(Type: CI, Name: rightDfFilterIndex, LogVersion: 1) + +Applied indexes: +- leftDfFilterIndex +- rightDfFilterIndex + +Applicable indexes, but not applied due to priority: +- leftDfJoinIndex +- rightDfJoinIndex + +Non-applicable indexes - index is outdated: +- No such index found. + +Non-applicable indexes - no applicable query plan: +- No such index found. + +For more information, please visit: https://microsoft.github.io/hyperspace/docs/why-not-result-analysis + +============================================================= +Plan without Hyperspace & WhyNot reasons: +============================================================= +00 Join Inner, (c3# = c3#) +01 :- Project [c4#, c3#] +02 : +- Filter ((isnotnull(c4#) AND (c4# = 2)) AND isnotnull(c3#)) +03 : +- Relation[c1#,c2#,c3#,c4#,c5#] parquet +04 +- Project [c5#, c3#] +05 +- Filter ((isnotnull(c5#) AND (c5# = 3000)) AND isnotnull(c3#)) +06 +- Relation[c1#,c2#,c3#,c4#,c5#] parquet + ++----------+---------------+---------+---------------------+------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------+ +|SubPlan |IndexName |IndexType|Reason |Message |VerboseMessage | ++----------+---------------+---------+---------------------+------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------+ +|Filter @2 |leftDfJoinIndex|CI |MISSING_REQUIRED_COL |requiredCols=[c3,c4,c5,c2,c1], indexCols=[c3,c4] |Index does not contain required columns. 
Required columns: [c3,c4,c5,c2,c1], Index columns: [c3,c4] | +|Filter @5 |leftDfJoinIndex|CI |MISSING_REQUIRED_COL |requiredCols=[c3,c4,c5,c2,c1], indexCols=[c3,c4] |Index does not contain required columns. Required columns: [c3,c4,c5,c2,c1], Index columns: [c3,c4] | +|Join @0 |leftDfJoinIndex|CI |MISSING_INDEXED_COL |child=[right], requiredIndexedCols=[c5,c3], indexedCols=[c3]|Index does not contain required columns for right subplan. Required indexed columns: [c5,c3], Indexed columns: [c3]| +|Project @1|leftDfJoinIndex|CI |ANOTHER_INDEX_APPLIED|appliedIndex=[leftDfFilterIndex] |Another candidate index is applied: leftDfFilterIndex | +|Project @4|leftDfJoinIndex|CI |MISSING_REQUIRED_COL |requiredCols=[c5,c3], indexCols=[c3,c4] |Index does not contain required columns. Required columns: [c5,c3], Index columns: [c3,c4] | ++----------+---------------+---------+---------------------+------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------+ diff --git a/src/test/scala/com/microsoft/hyperspace/index/covering/FilterIndexRuleTest.scala b/src/test/scala/com/microsoft/hyperspace/index/covering/FilterIndexRuleTest.scala index cb59ec1bc..6631b7b02 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/covering/FilterIndexRuleTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/covering/FilterIndexRuleTest.scala @@ -117,23 +117,24 @@ class FilterIndexRuleTest extends HyperspaceRuleSuite { val originalPlan = Project(Seq(c2, c3, c4), filterNode) // c4 is not covered by index val allIndexes = IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE)) - allIndexes.foreach(_.setTagValue(IndexLogEntryTags.FILTER_REASONS_ENABLED, true)) + allIndexes.foreach(_.setTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED, true)) val (transformedPlan, score) = applyFilterIndexRuleHelper(originalPlan, allIndexes) assert(transformedPlan.equals(originalPlan), "Plan should not transform.") allIndexes.foreach { index => - val msg = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) + val reasons = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) index.name match { case `indexName1` => - assert(msg.isDefined) assert( - msg.get.exists( - _.equals("Index does not contain required columns. Required columns: " + - "[c3,c2,c3,c4], Indexed & included columns: [c3,c2,c1]"))) + msg.exists( + _.equals( + s"Index does not contain required columns. 
" + + "Required columns: [c3,c2,c4], Index columns: [c3,c2,c1]"))) case `indexName2` | `indexName3` => - assert(msg.isDefined) assert( - msg.get.exists( - _.contains("The first indexed column should be in filter condition columns."))) + msg.exists( + _.contains("The first indexed column should be used in filter conditions."))) } } @@ -156,23 +157,23 @@ class FilterIndexRuleTest extends HyperspaceRuleSuite { val originalPlan = Filter(filterCondition, scanNode) val allIndexes = IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE)) - allIndexes.foreach(_.setTagValue(IndexLogEntryTags.FILTER_REASONS_ENABLED, true)) + allIndexes.foreach(_.setTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED, true)) val (transformedPlan, score) = applyFilterIndexRuleHelper(originalPlan, allIndexes) assert(!transformedPlan.equals(originalPlan), "No plan transformation.") verifyTransformedPlanWithIndex(transformedPlan, indexName2) allIndexes.foreach { index => - val msg = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) + val reasons = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) index.name match { case `indexName1` => - assert(msg.isDefined) + val msg = reasons.get.map(_.verboseStr) assert( - msg.get.exists( - _.contains("The first indexed column should be in filter condition columns."))) + msg.exists( + _.contains("The first indexed column should be used in filter conditions."))) case `indexName2` => - assert(msg.isEmpty) + assert(reasons.isEmpty) case `indexName3` => - assert(msg.isDefined) - assert(msg.get.exists(_.contains(s"Another candidate index is applied: $indexName2"))) + val msg = reasons.get.map(_.verboseStr) + assert(msg.exists(_.contains(s"Another candidate index is applied: $indexName2"))) } } } diff --git a/src/test/scala/com/microsoft/hyperspace/index/covering/JoinIndexRuleTest.scala b/src/test/scala/com/microsoft/hyperspace/index/covering/JoinIndexRuleTest.scala index a8bfb400b..14e567ba9 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/covering/JoinIndexRuleTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/covering/JoinIndexRuleTest.scala @@ -110,7 +110,7 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper { val indexes = if (allIndexes.isEmpty) { IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE)) } else { - allIndexes.foreach(_.setTagValue(IndexLogEntryTags.FILTER_REASONS_ENABLED, true)) + allIndexes.foreach(_.setTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED, true)) allIndexes } val candidateIndexes = CandidateIndexCollector(plan, indexes) @@ -161,9 +161,10 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper { val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan, allIndexes) assert(updatedPlan.equals(originalPlan)) allIndexes.foreach { index => - val msg = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) - assert(msg.isDefined) - assert(msg.get.exists(_.contains("Not eligible Join - no join condition."))) + val reasons = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) + assert(msg.exists(_.contains("Join condition is not eligible. 
Reason: No join condition"))) } } @@ -174,10 +175,12 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper { val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan, allIndexes) assert(updatedPlan.equals(originalPlan)) allIndexes.foreach { index => - val msg = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) - assert(msg.isDefined) + val reasons = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) assert( - msg.get.exists(_.contains("Join condition is not eligible. Equi-Joins in simple CNF"))) + msg.exists( + _.contains("Join condition is not eligible. Reason: Non equi-join or has literal"))) } } @@ -188,10 +191,12 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper { val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan, allIndexes) assert(updatedPlan.equals(originalPlan)) allIndexes.foreach { index => - val msg = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) - assert(msg.isDefined) + val reasons = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) assert( - msg.get.exists(_.contains("Join condition is not eligible. Equi-Joins in simple CNF"))) + msg.exists( + _.contains("Join condition is not eligible. Reason: Non equi-join or has literal"))) } } @@ -202,10 +207,12 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper { val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan, allIndexes) assert(updatedPlan.equals(originalPlan)) allIndexes.foreach { index => - val msg = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) - assert(msg.isDefined) + val reasons = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) assert( - msg.get.exists(_.contains("Join condition is not eligible. Equi-Joins in simple CNF"))) + msg.exists( + _.contains("Join condition is not eligible. Reason: Non equi-join or has literal"))) } } @@ -224,38 +231,53 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper { assert(updatedPlan.equals(originalPlan)) allIndexes.foreach { index => - val msg = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) - assert(msg.isDefined) - + val reasons = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) index.name match { case "t1i1" => assert( - msg.get.toSet.equals( + msg.toSet.equals( Set( - "All join condition column should be the indexed columns. " + - "Join columns: [t1c2], Indexed columns: [t1c1]", - "No available indexes for right subplan."))) + "All join condition column and indexed column should be the same. " + + "Join columns: [t1c2], Indexed columns for left subplan: [t1c1]", + "No available indexes for right subplan. " + + "Both left and right indexes are required for Join query.")), + msg) case "t1i2" => assert( - msg.get.toSet.equals(Set( - "All join condition column should be the indexed columns. " + - "Join columns: [t1c2], Indexed columns: [t1c1,t1c2]", - "No available indexes for right subplan."))) + msg.toSet.equals( + Set( + "All join condition column and indexed column should be the same. " + + "Join columns: [t1c2], Indexed columns for left subplan: [t1c1,t1c2]", + "No available indexes for right subplan. 
" + + "Both left and right indexes are required for Join query.")), + msg) case "t1i3" => - assert(msg.get.toSet.equals(Set("No available indexes for right subplan."))) + assert( + msg.toSet + .equals( + Set("No available indexes for right subplan. " + + "Both left and right indexes are required for Join query.")), + msg) case "t2i1" => assert( - msg.get.toSet.equals( + msg.toSet.equals( Set( - "All join condition column should be the indexed columns. " + - "Join columns: [t2c2], Indexed columns: [t2c1]", - "No available indexes for right subplan."))) + "All join condition column and indexed column should be the same. " + + "Join columns: [t2c2], Indexed columns for right subplan: [t2c1]", + "No available indexes for right subplan. " + + "Both left and right indexes are required for Join query.")), + msg) case "t2i2" => assert( - msg.get.toSet.equals(Set( - "All join condition column should be the indexed columns. " + - "Join columns: [t2c2], Indexed columns: [t2c1,t2c2]", - "No available indexes for right subplan."))) + msg.toSet.equals( + Set( + "All join condition column and indexed column should be the same. " + + "Join columns: [t2c2], Indexed columns for right subplan: [t2c1,t2c2]", + "No available indexes for right subplan. " + + "Both left and right indexes are required for Join query.")), + msg) } } } @@ -277,24 +299,29 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper { assert(updatedPlan.equals(originalPlan)) allIndexes.foreach { index => - val msg = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) - assert(msg.isDefined) + val reasons = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) index.name match { case "t1i1" => assert( - msg.get.toSet.equals( + msg.toSet.equals( Set( - "Index does not contain all required columns. " + - "Required columns: [t1c1,t1c4], Index columns: [t1c1,t1c3]", - "No available indexes for left subplan."))) + "Index does not contain required columns for left subplan. " + + "Required indexed columns: [t1c1,t1c4], Indexed columns: [t1c1]", + "No available indexes for left subplan. " + + "Both left and right indexes are required for Join query.")), + msg) case "t2i1" => assert( - msg.get.toSet.equals( + msg.toSet.equals( Set( - "Index does not contain all required columns. " + - "Required columns: [t2c1,t2c4], Index columns: [t2c1,t2c3]", - "No available indexes for left subplan."))) + "Index does not contain required columns for right subplan. " + + "Required indexed columns: [t2c1,t2c4], Indexed columns: [t2c1]", + "No available indexes for left subplan. " + + "Both left and right indexes are required for Join query.")), + msg) case _ => } } @@ -437,13 +464,13 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper { val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan, allIndexes) assert(updatedPlan.equals(originalPlan)) allIndexes.foreach { index => - val msg = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) - assert(msg.isDefined) + val reasons = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) assert( - msg.get.toSet.equals( - Set("Each join condition column should come from relations directly and attributes " + - "from left plan must exclusively have one-to-one mapping with attributes from " + - "right plan. E.g. 
join(A = B and A = D) is not eligible."))) + msg.size == 1 && msg.head.contains( + "Join condition is not eligible. Reason: incompatible left and right join columns"), + msg) } } { diff --git a/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala b/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala index 6168b368c..c6bc3461c 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala @@ -181,7 +181,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { def filterQuery(query: DataFrame): DataFrame = { query.filter("Col2 == 2").select("Col1") } - verifyExplainOutput(df, expectedOutput.toString, verbose = false) { filterQuery } + verifyExplainOutput(df, expectedOutput, verbose = false) { filterQuery } } private def getIndexRootPath(indexName: String): Path = diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/CandidateIndexCollectorTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/CandidateIndexCollectorTest.scala index 07c7e5833..dd79d8316 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/CandidateIndexCollectorTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/CandidateIndexCollectorTest.scala @@ -329,7 +329,7 @@ class CandidateIndexCollectorTest extends HyperspaceRuleSuite with SQLHelper { } val allIndexes = indexList.map(indexName => latestIndexLogEntry(systemPath, indexName)) - allIndexes.foreach(_.setTagValue(IndexLogEntryTags.FILTER_REASONS_ENABLED, true)) + allIndexes.foreach(_.setTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED, true)) val plan1 = spark.read.parquet(dataPath).select("id", "name").queryExecution.optimizedPlan @@ -340,16 +340,17 @@ class CandidateIndexCollectorTest extends HyperspaceRuleSuite with SQLHelper { assert(filtered.toSet.equals(Set("index_ok", "index_noLineage"))) allIndexes.foreach { entry => - val msg = entry.getTagValue(plan1, IndexLogEntryTags.FILTER_REASONS) + val reasons = entry.getTagValue(plan1, IndexLogEntryTags.FILTER_REASONS) if (filtered.contains(entry.name)) { - assert(msg.isEmpty) + assert(reasons.isEmpty) } else { - assert(msg.isDefined) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) entry.name match { case "index_differentColumn" => - assert(msg.get.exists(_.contains("Column Schema does not match"))) + assert(msg.exists(_.contains("Column Schema does not match"))) case "index_noCommonFiles" => - assert(msg.get.exists(_.contains("Index signature does not match"))) + assert(msg.exists(_.contains("Index signature does not match"))) } } @@ -366,15 +367,16 @@ class CandidateIndexCollectorTest extends HyperspaceRuleSuite with SQLHelper { assert(indexes.isEmpty) allIndexes.foreach { entry => - val msg = entry.getTagValue(plan2, IndexLogEntryTags.FILTER_REASONS) - assert(msg.isDefined) + val reasons = entry.getTagValue(plan2, IndexLogEntryTags.FILTER_REASONS) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) entry.name match { case "index_differentColumn" => - assert(msg.get.exists(_.contains("Column Schema does not match"))) + assert(msg.exists(_.contains("Column Schema does not match"))) case "index_noCommonFiles" => - assert(msg.get.exists(_.contains("No common files."))) + assert(msg.exists(_.contains("No common files."))) case _ => - assert(msg.get.exists(_.contains("Appended bytes ratio"))) + assert(msg.exists(_.contains("Appended bytes ratio"))) } // Unset for 
next test. @@ -389,16 +391,17 @@ class CandidateIndexCollectorTest extends HyperspaceRuleSuite with SQLHelper { assert(filtered.toSet.equals(Set("index_ok", "index_noLineage"))) allIndexes.foreach { entry => - val msg = entry.getTagValue(plan2, IndexLogEntryTags.FILTER_REASONS) + val reasons = entry.getTagValue(plan2, IndexLogEntryTags.FILTER_REASONS) if (filtered.contains(entry.name)) { - assert(msg.isEmpty) + assert(reasons.isEmpty) } else { - assert(msg.isDefined) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) entry.name match { case "index_differentColumn" => - assert(msg.get.exists(_.contains("Column Schema does not match"))) + assert(msg.exists(_.contains("Column Schema does not match"))) case "index_noCommonFiles" => - assert(msg.get.exists(_.contains("No common files."))) + assert(msg.exists(_.contains("No common files."))) } } @@ -420,17 +423,19 @@ class CandidateIndexCollectorTest extends HyperspaceRuleSuite with SQLHelper { assert(indexes.isEmpty) allIndexes.foreach { entry => - val msg = entry.getTagValue(plan3, IndexLogEntryTags.FILTER_REASONS) - assert(msg.isDefined) + val reasons = entry.getTagValue(plan3, IndexLogEntryTags.FILTER_REASONS) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) + entry.name match { case "index_differentColumn" => - assert(msg.get.exists(_.contains("Column Schema does not match"))) + assert(msg.exists(_.contains("Column Schema does not match"))) case "index_noCommonFiles" => - assert(msg.get.exists(_.contains("No common files."))) + assert(msg.exists(_.contains("No common files."))) case "index_noLineage" => - assert(msg.get.exists(_.contains("Index doesn't support deleted files."))) + assert(msg.exists(_.contains("Index doesn't support deleted files."))) case "index_ok" => - assert(msg.get.exists(_.contains("Deleted bytes ratio"))) + assert(msg.exists(_.contains("Deleted bytes ratio"))) } entry.unsetTagValue(plan3, IndexLogEntryTags.FILTER_REASONS) diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/ScoreBasedIndexPlanOptimizerTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/ScoreBasedIndexPlanOptimizerTest.scala index 3742e8643..87a0055b0 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/ScoreBasedIndexPlanOptimizerTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/ScoreBasedIndexPlanOptimizerTest.scala @@ -112,6 +112,26 @@ class ScoreBasedIndexPlanOptimizerTest extends QueryTest with HyperspaceSuite { query(leftDf, rightDf), getIndexFilesPath(leftDfFilterIndexConfig.indexName) ++ getIndexFilesPath(rightDfFilterIndexConfig.indexName)) + + def normalize(str: String): String = { + // Expression ids are removed before comparison since they can be different. + str.replaceAll("""#(\d+)|subquery(\d+)""", "#") + } + + // Verify whyNot result. + hyperspace.whyNot(query(leftDf, rightDf)()) { o => + val expectedOutput = getExpectedResult("whyNot_allIndex.txt") + .replace(System.lineSeparator(), "\n") + val actual = normalize(o.replace(System.lineSeparator(), "\n")) + assert(actual.equals(expectedOutput), actual) + } + + hyperspace.whyNot(query(leftDf, rightDf)(), "leftDfJoinIndex", extended = true) { o => + val expectedOutput = getExpectedResult("whyNot_indexName.txt") + .replace(System.lineSeparator(), "\n") + val actual = normalize(o.replace(System.lineSeparator(), "\n")) + assert(actual.equals(expectedOutput), actual) + } } } }
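
For reference, a minimal sketch of driving the new whyNot API end to end, mirroring its usage in the ScoreBasedIndexPlanOptimizerTest hunk above. The SparkSession `spark`, the data path, and the index name are illustrative assumptions, not part of this patch:

    import com.microsoft.hyperspace.Hyperspace

    val hyperspace = new Hyperspace(spark)
    val query = spark.read.parquet("/tmp/sample").filter("c4 == 2").select("c4", "c3")

    // Report WhyNot reasons for every candidate index; output goes to print
    // unless a redirect function is supplied.
    hyperspace.whyNot(query)

    // Limit the report to a single index, include verbose messages, and capture
    // the report string instead of printing it (the block is the redirect function,
    // as in the test above).
    val report = new StringBuilder
    hyperspace.whyNot(query, "leftDfFilterIndex", extended = true) { out =>
      report.append(out)
    }

The rule tests above inspect the underlying analysis state directly rather than going through whyNot; a sketch of that pattern, assuming an IndexLogEntry `index` and the optimized LogicalPlan `plan` being analyzed:

    import com.microsoft.hyperspace.index.IndexLogEntryTags

    // Enable plan-analysis tagging, apply the candidate collection / index rules
    // to `plan`, then read the FilterReason entries tagged for that plan.
    index.setTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED, true)
    // ... run CandidateIndexCollector / FilterIndexRule / JoinIndexRule over `plan` ...
    val reasons = index.getTagValue(plan, IndexLogEntryTags.FILTER_REASONS)
    reasons.foreach(_.foreach(r => println(r.verboseStr)))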