diff --git a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala index fe1c55377..e14b67768 100644 --- a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala +++ b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala @@ -150,7 +150,7 @@ class Hyperspace(spark: SparkSession) { } /** - * Explains how indexes will be applied to the given dataframe. + * Explain how indexes will be applied to the given dataframe. * * @param df dataFrame. * @param redirectFunc optional function to redirect output of explain. @@ -171,10 +171,18 @@ class Hyperspace(spark: SparkSession) { indexManager.index(indexName) } + /** + * Explain why indexes are not applied to the given dataframe. + * + * @param df Dataframe + * @param indexName Optional index name; if non-empty, only this index is explained. + * @param extended If true, print more verbose messages. + * @param redirectFunc Optional function to redirect output + */ def whyNot(df: DataFrame, indexName: String = "", extended: Boolean = false)( implicit redirectFunc: String => Unit = print): Unit = { withHyperspaceRuleDisabled { if (indexName.nonEmpty) { redirectFunc(CandidateIndexAnalyzer.whyNotIndexString(spark, df, indexName, extended)) } else { redirectFunc(CandidateIndexAnalyzer.whyNotIndexesString(spark, df, extended)) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index c4781dde8..5257fa530 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -527,7 +527,7 @@ case class IndexLogEntry( tags.get((plan, tag)).map(_.asInstanceOf[T]) } - def getTagValueForAllPlan[T](tag: IndexLogEntryTag[T]): Seq[(LogicalPlan, T)] = { + def getTagValuesForAllPlan[T](tag: IndexLogEntryTag[T]): Seq[(LogicalPlan, T)] = { tags.filter(entry => entry._1._2.equals(tag)).toSeq.map { 
case (k, v) => (k._1, v.asInstanceOf[T]) } diff --git a/src/main/scala/com/microsoft/hyperspace/index/covering/FilterIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/covering/FilterIndexRule.scala index ca7e81dc0..24ec60f79 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/covering/FilterIndexRule.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/covering/FilterIndexRule.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPla import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace} import com.microsoft.hyperspace.index.IndexLogEntryTags -import com.microsoft.hyperspace.index.plananalysis.{FilterReasonCode, FilterReasons} +import com.microsoft.hyperspace.index.plananalysis.FilterReasons import com.microsoft.hyperspace.index.rules.{HyperspaceRule, IndexRankFilter, QueryPlanIndexFilter} import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap} import com.microsoft.hyperspace.index.sources.FileBasedRelation @@ -97,10 +97,9 @@ object FilterColumnFilter extends QueryPlanIndexFilter { withFilterReasonTag( plan, index, - FilterReasons.apply( - FilterReasonCode.NO_FIRST_INDEXED_COL_COND, - ("firstIndexedCol", index.derivedDataset.indexedColumns.head), - ("filterColumns", filterColumnNames.mkString(", ")))) { + FilterReasons.NoFirstIndexedColCond( + index.derivedDataset.indexedColumns.head, + filterColumnNames.mkString(", "))) { ResolverUtils .resolve(spark, index.derivedDataset.indexedColumns.head, filterColumnNames) .isDefined @@ -108,10 +107,9 @@ object FilterColumnFilter extends QueryPlanIndexFilter { withFilterReasonTag( plan, index, - FilterReasons.apply( - FilterReasonCode.MISSING_REQUIRED_COL, - ("requiredCols", (filterColumnNames ++ projectColumnNames).toSet.mkString(",")), - ("indexCols", index.derivedDataset.referencedColumns.mkString(",")))) { + FilterReasons.MissingRequiredCol( + (filterColumnNames ++ 
projectColumnNames).toSet.mkString(","), + index.derivedDataset.referencedColumns.mkString(","))) { ResolverUtils .resolve( spark, diff --git a/src/main/scala/com/microsoft/hyperspace/index/covering/JoinIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/covering/JoinIndexRule.scala index 616e50df0..8f2441263 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/covering/JoinIndexRule.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/covering/JoinIndexRule.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} import com.microsoft.hyperspace.Hyperspace import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags} import com.microsoft.hyperspace.index.covering.JoinAttributeFilter.extractConditions -import com.microsoft.hyperspace.index.plananalysis.{FilterReasonCode, FilterReasons} +import com.microsoft.hyperspace.index.plananalysis.FilterReasons import com.microsoft.hyperspace.index.rules.{HyperspaceRule, IndexRankFilter, QueryPlanIndexFilter} import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap} import com.microsoft.hyperspace.index.sources.FileBasedRelation @@ -66,9 +66,7 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter { val joinConditionCond = withFilterReasonTag( plan, leftAndRightIndexes, - FilterReasons.apply( - FilterReasonCode.NOT_ELIGIBLE_JOIN, - ("reason", "Non equi-join or has literal"))) { + FilterReasons.NotEligibleJoin("Non equi-join or has literal")) { isJoinConditionSupported(condition) } @@ -76,18 +74,14 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter { withFilterReasonTag( plan, leftAndRightIndexes, - FilterReasons.apply( - FilterReasonCode.NOT_ELIGIBLE_JOIN, - ("reason", "Non linear left child plan"))) { + FilterReasons.NotEligibleJoin("Non linear left child plan")) { isPlanLinear(l) } val rightPlanLinearCond = withFilterReasonTag( plan, leftAndRightIndexes, - FilterReasons.apply( - 
FilterReasonCode.NOT_ELIGIBLE_JOIN, - ("reason", "Non linear right child plan"))) { + FilterReasons.NotEligibleJoin("Non linear right child plan")) { isPlanLinear(r) } @@ -108,9 +102,7 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter { setFilterReasonTag( plan, candidateIndexes.values.flatten.toSeq, - FilterReasons.apply( - FilterReasonCode.NOT_ELIGIBLE_JOIN, - ("reason", "No join condition"))) + FilterReasons.NotEligibleJoin("No join condition")) Map.empty case _ => Map.empty @@ -178,9 +170,7 @@ object JoinAttributeFilter extends QueryPlanIndexFilter { if (withFilterReasonTag( plan, candidateIndexes.head._2 ++ candidateIndexes.last._2, - FilterReasons.apply( - FilterReasonCode.NOT_ELIGIBLE_JOIN, - ("reason", "incompatible left and right join columns"))) { + FilterReasons.NotEligibleJoin("incompatible left and right join columns")) { ensureAttributeRequirements( JoinIndexRule.leftRelation.get, JoinIndexRule.rightRelation.get, @@ -365,13 +355,11 @@ object JoinColumnFilter extends QueryPlanIndexFilter { if (withFilterReasonTag( plan, candidateIndexes.head._2 ++ candidateIndexes.last._2, - FilterReasons.apply(FilterReasonCode.NO_AVAIL_JOIN_INDEX_PAIR, ("child", "left")))( - lIndexes.nonEmpty) && + FilterReasons.NoAvailJoinIndexPair("left"))(lIndexes.nonEmpty) && withFilterReasonTag( plan, candidateIndexes.head._2 ++ candidateIndexes.last._2, - FilterReasons.apply(FilterReasonCode.NO_AVAIL_JOIN_INDEX_PAIR, ("child", "right")))( - rIndexes.nonEmpty)) { + FilterReasons.NoAvailJoinIndexPair("right"))(rIndexes.nonEmpty)) { Map(leftRelation.plan -> lIndexes, rightRelation.plan -> rIndexes) } else { Map.empty @@ -415,6 +403,19 @@ object JoinColumnFilter extends QueryPlanIndexFilter { }.toMap } + /** + * Get usable indexes which satisfy indexed and included column requirements. + * + * Pre-requisite: the indexed and included columns required must be already resolved with their + * corresponding base relation columns at this point. 
+ * + * @param plan Query plan + * @param indexes All available indexes for the logical plan + * @param requiredIndexCols required indexed columns resolved with their base relation column. + * @param allRequiredCols required included columns resolved with their base relation column. + * @return Indexes which satisfy the indexed and covering column requirements from the logical + * plan and join condition + */ private def getUsableIndexes( plan: LogicalPlan, indexes: Seq[IndexLogEntry], @@ -428,21 +429,19 @@ object JoinColumnFilter extends QueryPlanIndexFilter { withFilterReasonTag( plan, idx, - FilterReasons.apply( - FilterReasonCode.NOT_ALL_JOIN_COL_INDEXED, - ("child", leftOrRight), - ("joinCols", requiredIndexCols.mkString(", ")), - ("indexedCols", idx.indexedColumns.mkString(", ")))) { + FilterReasons.NotAllJoinColIndexed( + leftOrRight, + requiredIndexCols.mkString(", "), + idx.indexedColumns.mkString(", "))) { requiredIndexCols.toSet.equals(idx.indexedColumns.toSet) } && withFilterReasonTag( plan, idx, - FilterReasons.apply( - FilterReasonCode.MISSING_INDEXED_COL, - ("child", leftOrRight), - ("requiredIndexedCols", allRequiredCols.mkString(", ")), - ("IndexedCols", idx.indexedColumns.mkString(", ")))) { + FilterReasons.MissingIndexedCol( + leftOrRight, + allRequiredCols.mkString(", "), + idx.indexedColumns.mkString(", "))) { allRequiredCols.forall(allCols.contains) } } @@ -526,7 +525,7 @@ object JoinRankFilter extends IndexRankFilter { setFilterReasonTag( plan, indexes.head._2 ++ indexes.last._2, - FilterReasons.apply(FilterReasonCode.NO_COMPATIBLE_JOIN_INDEX_PAIR)) + FilterReasons.NoCompatibleJoinIndexPair()) Map.empty } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/plananalysis/CandidateIndexAnalyzer.scala b/src/main/scala/com/microsoft/hyperspace/index/plananalysis/CandidateIndexAnalyzer.scala index 901f97304..ae4b8a65b 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/plananalysis/CandidateIndexAnalyzer.scala +++ 
b/src/main/scala/com/microsoft/hyperspace/index/plananalysis/CandidateIndexAnalyzer.scala @@ -104,16 +104,6 @@ object CandidateIndexAnalyzer extends Logging { applicableIndexes: Seq[(IndexLogEntry, Seq[(LogicalPlan, Seq[String])])]): String = { val stringBuilder = new StringBuilder val originalPlanString = planWithoutHyperspace.numberedTreeString.split('\n') - if (applicableIndexes.isEmpty) { - return "No applicable indexes. Try hyperspace.whyNot()" - } - val newLine = System.lineSeparator() - stringBuilder.append("Plan without Hyperspace:") - stringBuilder.append(newLine) - stringBuilder.append(newLine) - stringBuilder.append(originalPlanString.mkString(newLine)) - stringBuilder.append(newLine) - stringBuilder.append(newLine) // to Dataframe // sub plan line number, index name, rule name @@ -132,6 +122,18 @@ object CandidateIndexAnalyzer extends Logging { .sortBy(r => (r._1, r._3)) .distinct + if (res.isEmpty) { + return "No applicable indexes. Try hyperspace.whyNot()" + } + val newLine = System.lineSeparator() + stringBuilder.append("Plan without Hyperspace:") + stringBuilder.append(newLine) + stringBuilder.append(newLine) + stringBuilder.append(originalPlanString.mkString(newLine)) + stringBuilder.append(newLine) + stringBuilder.append(newLine) + + import spark.implicits._ val df = res.toDF("SubPlan", "IndexName", "IndexType", "RuleName") @@ -330,9 +332,9 @@ object CandidateIndexAnalyzer extends Logging { transformedPlan, indexes .filter(i => indexName.isEmpty || indexName.get.equals(i.name)) - .map(i => (i, i.getTagValueForAllPlan(IndexLogEntryTags.FILTER_REASONS))), + .map(i => (i, i.getTagValuesForAllPlan(IndexLogEntryTags.FILTER_REASONS))), indexes - .map(i => (i, i.getTagValueForAllPlan(IndexLogEntryTags.APPLICABLE_INDEX_RULES)))) + .map(i => (i, i.getTagValuesForAllPlan(IndexLogEntryTags.APPLICABLE_INDEX_RULES)))) } finally { cleanupAnalysisTags(indexes) } diff --git a/src/main/scala/com/microsoft/hyperspace/index/plananalysis/FilterReason.scala 
b/src/main/scala/com/microsoft/hyperspace/index/plananalysis/FilterReason.scala index 5922654f4..d0229e995 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/plananalysis/FilterReason.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/plananalysis/FilterReason.scala @@ -21,7 +21,7 @@ trait FilterReason { def codeStr: String final def argStr: String = { // key1=[value1], key2=[value2] - args.map(kv => s"${kv._1}=[${kv._2}]").mkString(",") + args.map(kv => s"${kv._1}=[${kv._2}]").mkString(", ") } def verboseStr: String } @@ -30,71 +30,14 @@ trait FilterReasonNoArg extends FilterReason { final override val args: Seq[(String, String)] = Seq.empty } -object FilterReasonCode extends Enumeration { - type FilterReasonCode = Value - - // Common - val COL_SCHEMA_MISMATCH = Value - val SOURCE_DATA_CHANGED = Value - val NO_DELETE_SUPPORT = Value - val NO_COMMON_FILES = Value - val TOO_MUCH_APPENDED = Value - val TOO_MUCH_DELETED = Value - val ANOTHER_INDEX_APPLIED = Value - - // CoveringIndex - FilterIndexRule - val NO_FIRST_INDEXED_COL_COND = Value - val MISSING_REQUIRED_COL = Value - - // CoveringIndex - JoinIndexRule - val NOT_ELIGIBLE_JOIN = Value - val NO_AVAIL_JOIN_INDEX_PAIR = Value - val NO_COMPATIBLE_JOIN_INDEX_PAIR = Value - val NOT_ALL_JOIN_COL_INDEXED = Value - val MISSING_INDEXED_COL = Value -} - object FilterReasons { - import com.microsoft.hyperspace.index.plananalysis.FilterReasonCode._ - def apply(code: FilterReasonCode, args: (String, String)*): FilterReason = { - code match { - case COL_SCHEMA_MISMATCH => - ColSchemaMismatch(args) - case SOURCE_DATA_CHANGED => - SourceDataChanged() - case NO_DELETE_SUPPORT => - NoDeleteSupport() - case NO_COMMON_FILES => - NoCommonFiles() - case TOO_MUCH_APPENDED => - TooMuchAppended(args) - case TOO_MUCH_DELETED => - TooMuchDeleted(args) - case MISSING_REQUIRED_COL => - MissingRequiredCol(args) - case NO_FIRST_INDEXED_COL_COND => - NoFirstIndexedColCond(args) - case NOT_ELIGIBLE_JOIN => - 
NotEligibleJoin(args) - case NO_AVAIL_JOIN_INDEX_PAIR => - NoAvailJoinIndexPair(args) - case MISSING_INDEXED_COL => - MissingIndexedCol(args) - case NOT_ALL_JOIN_COL_INDEXED => - NotAllJoinColIndexed(args) - case NO_COMPATIBLE_JOIN_INDEX_PAIR => - NoCompatibleJoinIndexPair() - case ANOTHER_INDEX_APPLIED => - AnotherIndexApplied(args) - } - } - case class ColSchemaMismatch(override val args: Seq[(String, String)]) - extends FilterReason { + case class ColSchemaMismatch(sourceColumns: String, indexColumns: String) extends FilterReason { override final val codeStr: String = "COL_SCHEMA_MISMATCH" + override val args = Seq("sourceColumns" -> sourceColumns, "indexColumns" -> indexColumns) override def verboseStr: String = { - "Column Schema does not match. Source data columns: [" + args(0)._2 + - "], Index columns: [" + args(1)._2 + s"Column Schema does not match. Source data columns: [$sourceColumns], " + + s"Index columns: [$indexColumns]" } } @@ -113,79 +56,96 @@ object FilterReasons { override def verboseStr: String = "No common files." } - case class TooMuchAppended(override val args: Seq[(String, String)]) + case class TooMuchAppended(appendedRatio: String, hybridScanAppendThreshold: String) extends FilterReason { - override def codeStr: String = "TOO_MUCH_APPENDED" + override final def codeStr: String = "TOO_MUCH_APPENDED" + override val args = Seq( + "appendedRatio" -> appendedRatio, + "hybridScanAppendThreshold" -> hybridScanAppendThreshold) override def verboseStr: String = - s"Appended bytes ratio (${args(0)._2}) is larger than " + - s"threshold config ${args(1)._2}). " + s"Appended bytes ratio ($appendedRatio) is larger than " + + s"threshold config $hybridScanAppendThreshold). 
" } - case class TooMuchDeleted(override val args: Seq[(String, String)]) extends FilterReason { - override def codeStr: String = "TOO_MUCH_DELETED" + case class TooMuchDeleted(deletedRatio: String, hybridScanDeleteThreshold: String) + extends FilterReason { + override final def codeStr: String = "TOO_MUCH_DELETED" + override val args = Seq( + "deletedRatio" -> deletedRatio, + "hybridScanDeleteThreshold" -> hybridScanDeleteThreshold) override def verboseStr: String = - s"Deleted bytes ratio (${args(0)._2}) is larger than " + - s"threshold config ${args(1)._2}). " + s"Deleted bytes ratio ($deletedRatio) is larger than " + + s"threshold config $hybridScanDeleteThreshold). " } - case class MissingRequiredCol(override val args: Seq[(String, String)]) - extends FilterReason { - override def codeStr: String = "MISSING_REQUIRED_COL" + case class MissingRequiredCol(requiredCols: String, indexCols: String) extends FilterReason { + override final def codeStr: String = "MISSING_REQUIRED_COL" + override val args = Seq("requiredCols" -> requiredCols, "indexCols" -> indexCols) override def verboseStr: String = - s"Index does not contain required column. Required columns: [${args(0)._2}], " + - s"Index columns: [${args(1)._2}]" + s"Index does not contain required columns. Required columns: [$requiredCols], " + + s"Index columns: [$indexCols]" } - case class NoFirstIndexedColCond(override val args: Seq[(String, String)]) + case class NoFirstIndexedColCond(firstIndexedCol: String, filterCols: String) extends FilterReason { - override def codeStr: String = "NO_FIRST_INDEXED_COL_COND" + override final def codeStr: String = "NO_FIRST_INDEXED_COL_COND" + override val args = Seq("firstIndexedCol" -> firstIndexedCol, "filterCols" -> filterCols) override def verboseStr: String = "The first indexed column should be used in filter conditions. 
" + - s"The first indexed column: ${args(0)._2}, " + - s"Columns in filter condition: [${args(1)._2}]" + s"The first indexed column: $firstIndexedCol, " + + s"Columns in filter condition: [$filterCols]" } - case class NotEligibleJoin(override val args: Seq[(String, String)]) - extends FilterReason { - override def codeStr: String = "NOT_ELIGIBLE_JOIN" + case class NotEligibleJoin(reason: String) extends FilterReason { + override final def codeStr: String = "NOT_ELIGIBLE_JOIN" + override val args = Seq("reason" -> reason) override def verboseStr: String = - s"Join condition is not eligible. Reason: ${args(0)._2}" + s"Join condition is not eligible. Reason: $reason" } - case class NoAvailJoinIndexPair(override val args: Seq[(String, String)]) - extends FilterReason { + case class NoAvailJoinIndexPair(leftOrRight: String) extends FilterReason { override def codeStr: String = "NO_AVAIL_JOIN_INDEX_PAIR" + override val args = Seq("child" -> leftOrRight) override def verboseStr: String = - s"No available indexes for ${args(0)._2} subplan. " + - "Both left and right index are required for Join query" + s"No available indexes for $leftOrRight subplan. " + + "Both left and right indexes are required for Join query." } - case class MissingIndexedCol(override val args: Seq[(String, String)]) + case class MissingIndexedCol( + leftOrRight: String, + requiredIndexedCols: String, + indexedCols: String) extends FilterReason { - override def codeStr: String = "MISSING_INDEXED_COL" + override final def codeStr: String = "MISSING_INDEXED_COL" + override val args = Seq( + "child" -> leftOrRight, + "requiredIndexedCols" -> requiredIndexedCols, + "IndexedCols" -> indexedCols) override def verboseStr: String = - s"Index does not contain required columns for ${args(0)._2} subplan. " + - s"Required indexed columns: [${args(1)._2}], " + - s"Indexed columns: [${args(2)._2}]" + s"Index does not contain required columns for $leftOrRight subplan. 
" + + s"Required indexed columns: [$requiredIndexedCols], " + + s"Indexed columns: [$indexedCols]" } - case class NotAllJoinColIndexed(override val args: Seq[(String, String)]) + case class NotAllJoinColIndexed(leftOrRight: String, joinCols: String, indexedCols: String) extends FilterReason { - override def codeStr: String = "NOT_ALL_JOIN_COL_INDEXED" + override final def codeStr: String = "NOT_ALL_JOIN_COL_INDEXED" + override val args = + Seq("child" -> leftOrRight, "joinCols" -> joinCols, "indexedCols" -> indexedCols) override def verboseStr: String = - s"All join condition column should be the indexed columns. " + - s"Join columns: [${args(0)._2}], Indexed columns: [${args(1)._2}]" + s"All join condition column and indexed column should be the same. " + + s"Join columns: [$joinCols], Indexed columns for $leftOrRight subplan: [$indexedCols]" } case class NoCompatibleJoinIndexPair() extends FilterReasonNoArg { - override def codeStr: String = "NO_COMPATIBLE_JOIN_INDEX_PAIR" + override final def codeStr: String = "NO_COMPATIBLE_JOIN_INDEX_PAIR" override def verboseStr: String = "No compatible left and right index pair." 
} - case class AnotherIndexApplied(override val args: Seq[(String, String)]) - extends FilterReason { - override def codeStr: String = "ANOTHER_INDEX_APPLIED" + case class AnotherIndexApplied(appliedIndex: String) extends FilterReason { + override final def codeStr: String = "ANOTHER_INDEX_APPLIED" + override val args = Seq("appliedIndex" -> appliedIndex) override def verboseStr: String = - s"Another candidate index is applied: ${args(0)._2}" + s"Another candidate index is applied: $appliedIndex" } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/ColumnSchemaFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/ColumnSchemaFilter.scala index beff7da54..cecb98476 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/ColumnSchemaFilter.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/ColumnSchemaFilter.scala @@ -19,7 +19,7 @@ package com.microsoft.hyperspace.index.rules import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import com.microsoft.hyperspace.index.IndexLogEntry -import com.microsoft.hyperspace.index.plananalysis.{FilterReasonCode, FilterReasons} +import com.microsoft.hyperspace.index.plananalysis.FilterReasons import com.microsoft.hyperspace.util.ResolverUtils /** @@ -33,10 +33,9 @@ object ColumnSchemaFilter extends SourcePlanIndexFilter { withFilterReasonTag( plan, index, - FilterReasons.apply( - FilterReasonCode.COL_SCHEMA_MISMATCH, - ("sourceColumns", relationColumnNames.mkString(", ")), - ("indexColumns", index.derivedDataset.referencedColumns.mkString(", ")))) { + FilterReasons.ColSchemaMismatch( + relationColumnNames.mkString(", "), + index.derivedDataset.referencedColumns.mkString(", "))) { ResolverUtils .resolve(spark, index.derivedDataset.referencedColumns, relationColumnNames) .isDefined diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/FileSignatureFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/FileSignatureFilter.scala index 
c828675c3..4846ef732 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/FileSignatureFilter.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/FileSignatureFilter.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import com.microsoft.hyperspace.Hyperspace import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags, LogicalPlanSignatureProvider} import com.microsoft.hyperspace.index.IndexLogEntryTags.{HYBRIDSCAN_RELATED_CONFIGS, IS_HYBRIDSCAN_CANDIDATE} -import com.microsoft.hyperspace.index.plananalysis.{FilterReasonCode, FilterReasons} +import com.microsoft.hyperspace.index.plananalysis.FilterReasons import com.microsoft.hyperspace.index.sources.FileBasedRelation import com.microsoft.hyperspace.util.HyperspaceConf @@ -60,10 +60,7 @@ object FileSignatureFilter extends SourcePlanIndexFilter { // Map of a signature provider to a signature generated for the given plan. val signatureMap = mutable.Map[String, Option[String]]() indexes.filter { index => - withFilterReasonTag( - plan, - index, - FilterReasons.apply(FilterReasonCode.SOURCE_DATA_CHANGED)) { + withFilterReasonTag(plan, index, FilterReasons.SourceDataChanged()) { signatureValid(relation, index, signatureMap) } } @@ -134,17 +131,11 @@ object FileSignatureFilter extends SourcePlanIndexFilter { // Tag to original index log entry to check the reason string with the given log entry. 
lazy val hasLineageColumnCond = - withFilterReasonTag( - relation.plan, - index, - FilterReasons.apply(FilterReasonCode.NO_DELETE_SUPPORT)) { + withFilterReasonTag(relation.plan, index, FilterReasons.NoDeleteSupport()) { entry.derivedDataset.canHandleDeletedFiles } lazy val hasCommonFilesCond = - withFilterReasonTag( - relation.plan, - index, - FilterReasons.apply(FilterReasonCode.NO_COMMON_FILES)) { + withFilterReasonTag(relation.plan, index, FilterReasons.NoCommonFiles()) { commonCnt > 0 } @@ -153,19 +144,17 @@ object FileSignatureFilter extends SourcePlanIndexFilter { lazy val appendThresholdCond = withFilterReasonTag( relation.plan, index, - FilterReasons.apply( - FilterReasonCode.TOO_MUCH_APPENDED, - ("appendedRatio", appendedBytesRatio.toString), - ("hybridScanAppendThreshold", hybridScanAppendThreshold.toString))) { + FilterReasons.TooMuchAppended( + appendedBytesRatio.toString, + hybridScanAppendThreshold.toString)) { appendedBytesRatio < hybridScanAppendThreshold } lazy val deleteThresholdCond = withFilterReasonTag( relation.plan, index, - FilterReasons.apply( - FilterReasonCode.TOO_MUCH_DELETED, - ("deletedRatio", deletedBytesRatio.toString), - ("hybridScanDeleteThreshold", hybridScanDeleteThreshold.toString))) { + FilterReasons.TooMuchDeleted( + deletedBytesRatio.toString, + hybridScanDeleteThreshold.toString)) { deletedBytesRatio < HyperspaceConf.hybridScanDeletedRatioThreshold(spark) } diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/IndexRankFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/IndexRankFilter.scala index 7b73b64f7..cfea96bd5 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/IndexRankFilter.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/IndexRankFilter.scala @@ -19,7 +19,7 @@ package com.microsoft.hyperspace.index.rules import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import com.microsoft.hyperspace.index.IndexLogEntry -import 
com.microsoft.hyperspace.index.plananalysis.{FilterReasonCode, FilterReasons} +import com.microsoft.hyperspace.index.plananalysis.FilterReasons import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap} /** @@ -52,9 +52,7 @@ trait IndexRankFilter extends IndexFilter { selectedIndex.name.equals(index.name), plan, index, - FilterReasons.apply( - FilterReasonCode.ANOTHER_INDEX_APPLIED, - ("appliedIndex", selectedIndex.name))) + FilterReasons.AnotherIndexApplied(selectedIndex.name)) } } } diff --git a/src/test/resources/expected/spark-2.4/selfJoin.txt b/src/test/resources/expected/spark-2.4/selfJoin.txt index 0333e01be..79a05ecec 100644 --- a/src/test/resources/expected/spark-2.4/selfJoin.txt +++ b/src/test/resources/expected/spark-2.4/selfJoin.txt @@ -44,3 +44,22 @@ Physical operator stats: | SortMergeJoin| 1| 1| 0| +----------------------------------------------------------+-------------------+------------------+----------+ +============================================================= +Applicable indexes: +============================================================= +Plan without Hyperspace: + +00 Join Inner, (Col1# = Col1#) +01 :- Filter isnotnull(Col1#) +02 : +- Relation[Col1#,Col2#] parquet +03 +- Filter isnotnull(Col1#) +04 +- Relation[Col1#,Col2#] parquet + ++----------+---------+---------+---------------+ +|SubPlan |IndexName|IndexType|RuleName | ++----------+---------+---------+---------------+ +|Filter @ 1|joinIndex|CI |FilterIndexRule| +|Filter @ 3|joinIndex|CI |FilterIndexRule| +|Join @ 0 |joinIndex|CI |JoinIndexRule | ++----------+---------+---------+---------------+ + diff --git a/src/test/resources/expected/spark-2.4/subquery.txt b/src/test/resources/expected/spark-2.4/subquery.txt index 32716c697..775247a4b 100644 --- a/src/test/resources/expected/spark-2.4/subquery.txt +++ b/src/test/resources/expected/spark-2.4/subquery.txt @@ -45,3 +45,7 @@ Physical operator stats: |WholeStageCodegen| 1| 1| 0| 
+-----------------+-------------------+------------------+----------+ +============================================================= +Applicable indexes: +============================================================= +No applicable indexes. Try hyperspace.whyNot() diff --git a/src/test/resources/expected/spark-3.0/selfJoin.txt b/src/test/resources/expected/spark-3.0/selfJoin.txt index 2832623b6..68b21c0d1 100644 --- a/src/test/resources/expected/spark-3.0/selfJoin.txt +++ b/src/test/resources/expected/spark-3.0/selfJoin.txt @@ -52,3 +52,22 @@ Physical operator stats: | WholeStageCodegen (2)| 1| 1| 0| +----------------------------------------------------------+-------------------+------------------+----------+ +============================================================= +Applicable indexes: +============================================================= +Plan without Hyperspace: + +00 Join Inner, (Col1# = Col1#) +01 :- Filter isnotnull(Col1#) +02 : +- Relation[Col1#,Col2#] parquet +03 +- Filter isnotnull(Col1#) +04 +- Relation[Col1#,Col2#] parquet + ++----------+---------+---------+---------------+ +|SubPlan |IndexName|IndexType|RuleName | ++----------+---------+---------+---------------+ +|Filter @ 1|joinIndex|CI |FilterIndexRule| +|Filter @ 3|joinIndex|CI |FilterIndexRule| +|Join @ 0 |joinIndex|CI |JoinIndexRule | ++----------+---------+---------+---------------+ + diff --git a/src/test/resources/expected/spark-3.0/subquery.txt b/src/test/resources/expected/spark-3.0/subquery.txt index c306eec78..4aea12e39 100644 --- a/src/test/resources/expected/spark-3.0/subquery.txt +++ b/src/test/resources/expected/spark-3.0/subquery.txt @@ -43,3 +43,7 @@ Physical operator stats: |WholeStageCodegen (1)| 1| 1| 0| +---------------------+-------------------+------------------+----------+ +============================================================= +Applicable indexes: +============================================================= +No applicable indexes. 
Try hyperspace.whyNot() diff --git a/src/test/resources/expected/spark-3.1/selfJoin.txt b/src/test/resources/expected/spark-3.1/selfJoin.txt index e7770d082..34f9ca42e 100644 --- a/src/test/resources/expected/spark-3.1/selfJoin.txt +++ b/src/test/resources/expected/spark-3.1/selfJoin.txt @@ -48,3 +48,22 @@ Physical operator stats: | WholeStageCodegen (2)| 1| 1| 0| +----------------------------------------------------------+-------------------+------------------+----------+ +============================================================= +Applicable indexes: +============================================================= +Plan without Hyperspace: + +00 Join Inner, (Col1# = Col1#) +01 :- Filter isnotnull(Col1#) +02 : +- Relation[Col1#,Col2#] parquet +03 +- Filter isnotnull(Col1#) +04 +- Relation[Col1#,Col2#] parquet + ++----------+---------+---------+---------------+ +|SubPlan |IndexName|IndexType|RuleName | ++----------+---------+---------+---------------+ +|Filter @ 1|joinIndex|CI |FilterIndexRule| +|Filter @ 3|joinIndex|CI |FilterIndexRule| +|Join @ 0 |joinIndex|CI |JoinIndexRule | ++----------+---------+---------+---------------+ + diff --git a/src/test/resources/expected/spark-3.1/subquery.txt b/src/test/resources/expected/spark-3.1/subquery.txt index e4be7a355..c83b78a6d 100644 --- a/src/test/resources/expected/spark-3.1/subquery.txt +++ b/src/test/resources/expected/spark-3.1/subquery.txt @@ -40,3 +40,7 @@ Physical operator stats: |WholeStageCodegen (1)| 1| 1| 0| +---------------------+-------------------+------------------+----------+ +============================================================= +Applicable indexes: +============================================================= +No applicable indexes. 
Try hyperspace.whyNot() diff --git a/src/test/scala/com/microsoft/hyperspace/index/covering/FilterIndexRuleTest.scala b/src/test/scala/com/microsoft/hyperspace/index/covering/FilterIndexRuleTest.scala index dfbc30e53..6631b7b02 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/covering/FilterIndexRuleTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/covering/FilterIndexRuleTest.scala @@ -129,12 +129,12 @@ class FilterIndexRuleTest extends HyperspaceRuleSuite { assert( msg.exists( _.equals( - s"[$indexName1,FilterColumnFilter] Index does not contain required columns. " + - "Required columns: [c3,c2,c4], Indexed & included columns: [c3,c2,c1]"))) + s"Index does not contain required columns. " + + "Required columns: [c3,c2,c4], Index columns: [c3,c2,c1]"))) case `indexName2` | `indexName3` => assert( msg.exists( - _.contains("The first indexed column should be in filter condition columns."))) + _.contains("The first indexed column should be used in filter conditions."))) } } @@ -163,16 +163,16 @@ class FilterIndexRuleTest extends HyperspaceRuleSuite { verifyTransformedPlanWithIndex(transformedPlan, indexName2) allIndexes.foreach { index => val reasons = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) - assert(reasons.isDefined) - val msg = reasons.get.map(_.verboseStr) index.name match { case `indexName1` => + val msg = reasons.get.map(_.verboseStr) assert( msg.exists( - _.contains("The first indexed column should be in filter condition columns."))) + _.contains("The first indexed column should be used in filter conditions."))) case `indexName2` => - assert(msg.isEmpty) + assert(reasons.isEmpty) case `indexName3` => + val msg = reasons.get.map(_.verboseStr) assert(msg.exists(_.contains(s"Another candidate index is applied: $indexName2"))) } } diff --git a/src/test/scala/com/microsoft/hyperspace/index/covering/JoinIndexRuleTest.scala b/src/test/scala/com/microsoft/hyperspace/index/covering/JoinIndexRuleTest.scala index 
762d32a5a..3c52eb6db 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/covering/JoinIndexRuleTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/covering/JoinIndexRuleTest.scala @@ -164,7 +164,7 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper { val reasons = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) assert(reasons.isDefined) val msg = reasons.get.map(_.verboseStr) - assert(msg.exists(_.contains("Not eligible Join - no join condition."))) + assert(msg.exists(_.contains("Join condition is not eligible. Reason: No join condition"))) } } @@ -179,7 +179,8 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper { assert(reasons.isDefined) val msg = reasons.get.map(_.verboseStr) assert( - msg.exists(_.contains("Join condition is not eligible. Equi-Joins in simple CNF"))) + msg.exists( + _.contains("Join condition is not eligible. Reason: Non equi-join or has literal"))) } } @@ -194,7 +195,8 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper { assert(reasons.isDefined) val msg = reasons.get.map(_.verboseStr) assert( - msg.exists(_.contains("Join condition is not eligible. Equi-Joins in simple CNF"))) + msg.exists( + _.contains("Join condition is not eligible. Reason: Non equi-join or has literal"))) } } @@ -209,7 +211,8 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper { assert(reasons.isDefined) val msg = reasons.get.map(_.verboseStr) assert( - msg.exists(_.contains("Join condition is not eligible. Equi-Joins in simple CNF"))) + msg.exists( + _.contains("Join condition is not eligible. 
Reason: Non equi-join or has literal"))) } } @@ -228,40 +231,53 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper { assert(updatedPlan.equals(originalPlan)) allIndexes.foreach { index => - val msg = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) - assert(msg.isDefined) - + val reasons = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) index.name match { case "t1i1" => assert( - msg.get.toSet.equals(Set( - "All join condition column should be the indexed columns. " + - "Join columns: [t1c2], Indexed columns: [t1c1]", - "No available indexes for right subplan.")), - msg.get) + msg.toSet.equals( + Set( + "All join condition column and indexed column should be the same. " + + "Join columns: [t1c2], Indexed columns for left subplan: [t1c1]", + "No available indexes for right subplan. " + + "Both left and right indexes are required for Join query.")), + msg) case "t1i2" => assert( - msg.get.toSet.equals(Set( - "All join condition column should be the indexed columns. " + - "Join columns: [t1c2], Indexed columns: [t1c1,t1c2]", - "No available indexes for right subplan.")), - msg.get) + msg.toSet.equals( + Set( + "All join condition column and indexed column should be the same. " + + "Join columns: [t1c2], Indexed columns for left subplan: [t1c1, t1c2]", + "No available indexes for right subplan. " + + "Both left and right indexes are required for Join query.")), + msg) case "t1i3" => assert( - msg.get.toSet - .equals(Set("No available indexes for right subplan."))) + msg.toSet + .equals( + Set("No available indexes for right subplan. " + + "Both left and right indexes are required for Join query.")), + msg) case "t2i1" => assert( - msg.get.toSet.equals(Set( - "All join condition column should be the indexed columns. 
" + - "Join columns: [t2c2], Indexed columns: [t2c1]", - "No available indexes for right subplan."))) + msg.toSet.equals( + Set( + "All join condition column and indexed column should be the same. " + + "Join columns: [t2c2], Indexed columns for right subplan: [t2c1]", + "No available indexes for right subplan. " + + "Both left and right indexes are required for Join query.")), + msg) case "t2i2" => assert( - msg.get.toSet.equals(Set( - "All join condition column should be the indexed columns. " + - "Join columns: [t2c2], Indexed columns: [t2c1,t2c2]", - "No available indexes for right subplan."))) + msg.toSet.equals( + Set( + "All join condition column and indexed column should be the same. " + + "Join columns: [t2c2], Indexed columns for right subplan: [t2c1, t2c2]", + "No available indexes for right subplan. " + + "Both left and right indexes are required for Join query.")), + msg) } } } @@ -283,24 +299,29 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper { assert(updatedPlan.equals(originalPlan)) allIndexes.foreach { index => - val msg = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) - assert(msg.isDefined) + val reasons = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) index.name match { case "t1i1" => assert( - msg.get.toSet.equals(Set( - "Index does not contain all required columns. " + - "Required columns: [t1c1,t1c4], Index columns: [t1c1,t1c3]", - "No available indexes for left subplan.")), - msg.get) + msg.toSet.equals( + Set( + "Index does not contain required columns for left subplan. " + + "Required indexed columns: [t1c1, t1c4], Indexed columns: [t1c1]", + "No available indexes for left subplan. " + + "Both left and right indexes are required for Join query.")), + msg) case "t2i1" => assert( - msg.get.toSet.equals( + msg.toSet.equals( Set( - "Index does not contain all required columns. 
" + - "Required columns: [t2c1,t2c4], Index columns: [t2c1,t2c3]", - "No available indexes for left subplan."))) + "Index does not contain required columns for right subplan. " + + "Required indexed columns: [t2c1, t2c4], Indexed columns: [t2c1]", + "No available indexes for left subplan. " + + "Both left and right indexes are required for Join query.")), + msg) case _ => } } @@ -448,9 +469,8 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper { val msg = reasons.get.map(_.verboseStr) assert( msg.size == 1 && msg.head.contains( - "Each join condition column should come from relations " + - "directly and attributes from left plan must exclusively have one-to-one mapping " + - "with attributes from right plan. E.g. join(A = B and A = D) is not eligible.")) + "Join condition is not eligible. Reason: incompatible left and right join columns"), + msg) } } {