diff --git a/src/main/scala/com/microsoft/hyperspace/index/covering/FilterIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/covering/FilterIndexRule.scala index 71a03f335..ca7e81dc0 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/covering/FilterIndexRule.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/covering/FilterIndexRule.scala @@ -99,9 +99,8 @@ object FilterColumnFilter extends QueryPlanIndexFilter { index, FilterReasons.apply( FilterReasonCode.NO_FIRST_INDEXED_COL_COND, - Seq( - ("firstIndexedCol", index.derivedDataset.indexedColumns.head), - ("filterColumns", filterColumnNames.mkString(", "))))) { + ("firstIndexedCol", index.derivedDataset.indexedColumns.head), + ("filterColumns", filterColumnNames.mkString(", ")))) { ResolverUtils .resolve(spark, index.derivedDataset.indexedColumns.head, filterColumnNames) .isDefined @@ -111,9 +110,8 @@ object FilterColumnFilter extends QueryPlanIndexFilter { index, FilterReasons.apply( FilterReasonCode.MISSING_REQUIRED_COL, - Seq( - ("requiredCols", (filterColumnNames ++ projectColumnNames).toSet.mkString(",")), - ("indexCols", index.derivedDataset.referencedColumns.mkString(","))))) { + ("requiredCols", (filterColumnNames ++ projectColumnNames).toSet.mkString(",")), + ("indexCols", index.derivedDataset.referencedColumns.mkString(",")))) { ResolverUtils .resolve( spark, diff --git a/src/main/scala/com/microsoft/hyperspace/index/covering/JoinIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/covering/JoinIndexRule.scala index ceb265ff8..616e50df0 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/covering/JoinIndexRule.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/covering/JoinIndexRule.scala @@ -68,7 +68,7 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter { leftAndRightIndexes, FilterReasons.apply( FilterReasonCode.NOT_ELIGIBLE_JOIN, - Seq(("reason", "Non equi-join or has literal")))) { + ("reason", "Non equi-join or has literal"))) { isJoinConditionSupported(condition) } @@ -78,7 +78,7 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter { leftAndRightIndexes, FilterReasons.apply( FilterReasonCode.NOT_ELIGIBLE_JOIN, - Seq(("reason", "Non linear left child plan")))) { + ("reason", "Non linear left child plan"))) { isPlanLinear(l) } val rightPlanLinearCond = @@ -87,7 +87,7 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter { leftAndRightIndexes, FilterReasons.apply( FilterReasonCode.NOT_ELIGIBLE_JOIN, - Seq(("reason", "Non linear right child plan")))) { + ("reason", "Non linear right child plan"))) { isPlanLinear(r) } @@ -110,7 +110,7 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter { candidateIndexes.values.flatten.toSeq, FilterReasons.apply( FilterReasonCode.NOT_ELIGIBLE_JOIN, - Seq(("reason", "No join condition")))) + ("reason", "No join condition"))) Map.empty case _ => Map.empty @@ -180,7 +180,7 @@ object JoinAttributeFilter extends QueryPlanIndexFilter { candidateIndexes.head._2 ++ candidateIndexes.last._2, FilterReasons.apply( FilterReasonCode.NOT_ELIGIBLE_JOIN, - Seq(("reason", "incompatible left and right join columns")))) { + ("reason", "incompatible left and right join columns"))) { ensureAttributeRequirements( JoinIndexRule.leftRelation.get, JoinIndexRule.rightRelation.get, @@ -365,15 +365,13 @@ object JoinColumnFilter extends QueryPlanIndexFilter { if (withFilterReasonTag( plan, candidateIndexes.head._2 ++ candidateIndexes.last._2, - FilterReasons.apply( - FilterReasonCode.NO_AVAIL_JOIN_INDEX_PAIR, - Seq(("child", "left"))))(lIndexes.nonEmpty) && + FilterReasons.apply(FilterReasonCode.NO_AVAIL_JOIN_INDEX_PAIR, ("child", "left")))( + lIndexes.nonEmpty) && withFilterReasonTag( plan, candidateIndexes.head._2 ++ candidateIndexes.last._2, - FilterReasons.apply( - FilterReasonCode.NO_AVAIL_JOIN_INDEX_PAIR, - Seq(("child", "right"))))(rIndexes.nonEmpty)) { + FilterReasons.apply(FilterReasonCode.NO_AVAIL_JOIN_INDEX_PAIR, ("child", "right")))( + rIndexes.nonEmpty)) { Map(leftRelation.plan -> lIndexes, rightRelation.plan -> rIndexes) } else { Map.empty @@ -432,10 +430,9 @@ object JoinColumnFilter extends QueryPlanIndexFilter { idx, FilterReasons.apply( FilterReasonCode.NOT_ALL_JOIN_COL_INDEXED, - Seq( - ("child", leftOrRight), - ("joinCols", requiredIndexCols.mkString(", ")), - ("indexedCols", idx.indexedColumns.mkString(", "))))) { + ("child", leftOrRight), + ("joinCols", requiredIndexCols.mkString(", ")), + ("indexedCols", idx.indexedColumns.mkString(", ")))) { requiredIndexCols.toSet.equals(idx.indexedColumns.toSet) } && withFilterReasonTag( @@ -443,10 +440,9 @@ object JoinColumnFilter extends QueryPlanIndexFilter { idx, FilterReasons.apply( FilterReasonCode.MISSING_INDEXED_COL, - Seq( - ("child", leftOrRight), - ("requiredIndexedCols", allRequiredCols.mkString(", ")), - ("IndexedCols", idx.indexedColumns.mkString(", "))))) { + ("child", leftOrRight), + ("requiredIndexedCols", allRequiredCols.mkString(", ")), + ("IndexedCols", idx.indexedColumns.mkString(", ")))) { allRequiredCols.forall(allCols.contains) } } @@ -530,7 +526,7 @@ object JoinRankFilter extends IndexRankFilter { setFilterReasonTag( plan, indexes.head._2 ++ indexes.last._2, - FilterReasons.apply(FilterReasonCode.NO_COMPATIBLE_JOIN_INDEX_PAIR, Seq.empty)) + FilterReasons.apply(FilterReasonCode.NO_COMPATIBLE_JOIN_INDEX_PAIR)) Map.empty } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/plananalysis/CandidateIndexAnalyzer.scala b/src/main/scala/com/microsoft/hyperspace/index/plananalysis/CandidateIndexAnalyzer.scala index 63cb4071e..901f97304 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/plananalysis/CandidateIndexAnalyzer.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/plananalysis/CandidateIndexAnalyzer.scala @@ -162,7 +162,6 @@ object CandidateIndexAnalyzer extends Logging { stringBuilder.append(newLine) stringBuilder.append(planWithHyperspace.toString) stringBuilder.append(newLine) - stringBuilder.append(newLine) def printIndexNames(indexNames: Seq[String]): Unit = { indexNames.foreach { idxName => @@ -207,7 +206,7 @@ object CandidateIndexAnalyzer extends Logging { subPlanLocStr, index.name, index.derivedDataset.kindAbbr, - reason.code, + reason.codeStr, reason.argStr, reason.verboseStr) } diff --git a/src/main/scala/com/microsoft/hyperspace/index/plananalysis/FilterReason.scala b/src/main/scala/com/microsoft/hyperspace/index/plananalysis/FilterReason.scala index f723464d3..5922654f4 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/plananalysis/FilterReason.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/plananalysis/FilterReason.scala @@ -16,100 +16,176 @@ package com.microsoft.hyperspace.index.plananalysis -import com.microsoft.hyperspace.index.plananalysis.FilterReasonCode._ - -class FilterReason( - val code: String, - argStrings: => Seq[(String, String)], - verboseString: => String) { - - def argStr: String = { +trait FilterReason { + val args: Seq[(String, String)] + def codeStr: String + final def argStr: String = { // key1=[value1], key2=[value2] - argStrings.map(kv => s"${kv._1}=[${kv._2}]").mkString(",") + args.map(kv => s"${kv._1}=[${kv._2}]").mkString(",") } + def verboseStr: String +} - def verboseStr: String = { - verboseString - } +trait FilterReasonNoArg extends FilterReason { + final override val args: Seq[(String, String)] = Seq.empty +} + +object FilterReasonCode extends Enumeration { + type FilterReasonCode = Value + + // Common + val COL_SCHEMA_MISMATCH = Value + val SOURCE_DATA_CHANGED = Value + val NO_DELETE_SUPPORT = Value + val NO_COMMON_FILES = Value + val TOO_MUCH_APPENDED = Value + val TOO_MUCH_DELETED = Value + val ANOTHER_INDEX_APPLIED = Value + + // CoveringIndex - FilterIndexRule + val NO_FIRST_INDEXED_COL_COND = Value + val MISSING_REQUIRED_COL = Value + + // CoveringIndex - JoinIndexRule + val NOT_ELIGIBLE_JOIN = Value + val NO_AVAIL_JOIN_INDEX_PAIR = Value + val NO_COMPATIBLE_JOIN_INDEX_PAIR = Value + val NOT_ALL_JOIN_COL_INDEXED = Value + val MISSING_INDEXED_COL = Value } object FilterReasons { - def apply(code: FilterReasonCode, argStrings: => Seq[(String, String)]): FilterReason = { + import com.microsoft.hyperspace.index.plananalysis.FilterReasonCode._ + def apply(code: FilterReasonCode, args: (String, String)*): FilterReason = { code match { case COL_SCHEMA_MISMATCH => - new FilterReason( - code.toString, - argStrings, - "Column Schema does not match. Source data columns: [" + argStrings(0)._2 + - "], Index columns: [" + argStrings(1)._2) - case SOURCE_DATA_CHANGE => - new FilterReason(code.toString, argStrings, "Index signature does not match.") + ColSchemaMismatch(args) + case SOURCE_DATA_CHANGED => + SourceDataChanged() case NO_DELETE_SUPPORT => - new FilterReason(code.toString, argStrings, "Index doesn't support deleted files.") + NoDeleteSupport() case NO_COMMON_FILES => - new FilterReason(code.toString, argStrings, "No common files.") + NoCommonFiles() case TOO_MUCH_APPENDED => - new FilterReason( - code.toString, - argStrings, - s"Appended bytes ratio (${argStrings(0)._2}) is larger than " + - s"threshold config ${argStrings(1)._2}). ") + TooMuchAppended(args) case TOO_MUCH_DELETED => - new FilterReason( - code.toString, - argStrings, - s"Deleted bytes ratio (${argStrings(0)._2}) is larger than " + - s"threshold config ${argStrings(1)._2}). ") - case NO_FIRST_INDEXED_COL_COND => - new FilterReason( - code.toString, - argStrings, - "The first indexed column should be used in filter conditions. " + - s"The first indexed column: ${argStrings(0)._2}, " + - s"Columns in filter condition: [${argStrings(1)._2}]") + TooMuchDeleted(args) case MISSING_REQUIRED_COL => - new FilterReason( - code.toString, - argStrings, - s"Index does not contain required column. Required columns: [${argStrings(0)._2}], " + - s"Index columns: [${argStrings(1)._2}]") + MissingRequiredCol(args) + case NO_FIRST_INDEXED_COL_COND => + NoFirstIndexedColCond(args) case NOT_ELIGIBLE_JOIN => - new FilterReason( - code.toString, - argStrings, - s"Join condition is not eligible. Reason: ${argStrings(0)._2}") + NotEligibleJoin(args) case NO_AVAIL_JOIN_INDEX_PAIR => - new FilterReason( - code.toString, - argStrings, - s"No available indexes for ${argStrings(0)._2} subplan. " + - "Both left and right index are required for Join query") - case NOT_ALL_JOIN_COL_INDEXED => - new FilterReason( - code.toString, - argStrings, - s"All join condition column should be the indexed columns. " + - s"Join columns: [${argStrings(0)._2}], Indexed columns: [${argStrings(1)._2}]") + NoAvailJoinIndexPair(args) case MISSING_INDEXED_COL => - new FilterReason( - code.toString, - argStrings, - s"Index does not contain required columns for ${argStrings(0)._2} subplan. " + - s"Required indexed columns: [${argStrings(1)._2}], " + - s"Indexed columns: [${argStrings(2)._2}]") + MissingIndexedCol(args) + case NOT_ALL_JOIN_COL_INDEXED => + NotAllJoinColIndexed(args) case NO_COMPATIBLE_JOIN_INDEX_PAIR => - new FilterReason( - code.toString, - argStrings, - "No compatible left and right index pair." - ) + NoCompatibleJoinIndexPair() case ANOTHER_INDEX_APPLIED => - new FilterReason( - code.toString, - argStrings, - s"Another candidate index is applied: ${argStrings(0)._2}" - ) + AnotherIndexApplied(args) + } + } + case class ColSchemaMismatch(override val args: Seq[(String, String)]) + extends FilterReason { + override final val codeStr: String = "COL_SCHEMA_MISMATCH" + override def verboseStr: String = { + "Column Schema does not match. Source data columns: [" + args(0)._2 + + "], Index columns: [" + args(1)._2 } } + + case class SourceDataChanged() extends FilterReasonNoArg { + override final val codeStr: String = "SOURCE_DATA_CHANGED" + override def verboseStr: String = "Index signature does not match." + } + + case class NoDeleteSupport() extends FilterReasonNoArg { + override def codeStr: String = "NO_DELETE_SUPPORT" + override def verboseStr: String = "Index doesn't support deleted files." + } + + case class NoCommonFiles() extends FilterReasonNoArg { + override def codeStr: String = "NO_COMMON_FILES" + override def verboseStr: String = "No common files." + } + + case class TooMuchAppended(override val args: Seq[(String, String)]) + extends FilterReason { + override def codeStr: String = "TOO_MUCH_APPENDED" + override def verboseStr: String = + s"Appended bytes ratio (${args(0)._2}) is larger than " + + s"threshold config ${args(1)._2}). " + } + + case class TooMuchDeleted(override val args: Seq[(String, String)]) extends FilterReason { + override def codeStr: String = "TOO_MUCH_DELETED" + override def verboseStr: String = + s"Deleted bytes ratio (${args(0)._2}) is larger than " + + s"threshold config ${args(1)._2}). " + } + + case class MissingRequiredCol(override val args: Seq[(String, String)]) + extends FilterReason { + override def codeStr: String = "MISSING_REQUIRED_COL" + override def verboseStr: String = + s"Index does not contain required column. Required columns: [${args(0)._2}], " + + s"Index columns: [${args(1)._2}]" + } + + case class NoFirstIndexedColCond(override val args: Seq[(String, String)]) + extends FilterReason { + override def codeStr: String = "NO_FIRST_INDEXED_COL_COND" + override def verboseStr: String = + "The first indexed column should be used in filter conditions. " + + s"The first indexed column: ${args(0)._2}, " + + s"Columns in filter condition: [${args(1)._2}]" + } + + case class NotEligibleJoin(override val args: Seq[(String, String)]) + extends FilterReason { + override def codeStr: String = "NOT_ELIGIBLE_JOIN" + override def verboseStr: String = + s"Join condition is not eligible. Reason: ${args(0)._2}" + } + + case class NoAvailJoinIndexPair(override val args: Seq[(String, String)]) + extends FilterReason { + override def codeStr: String = "NO_AVAIL_JOIN_INDEX_PAIR" + override def verboseStr: String = + s"No available indexes for ${args(0)._2} subplan. " + + "Both left and right index are required for Join query" + } + + case class MissingIndexedCol(override val args: Seq[(String, String)]) + extends FilterReason { + override def codeStr: String = "MISSING_INDEXED_COL" + override def verboseStr: String = + s"Index does not contain required columns for ${args(0)._2} subplan. " + + s"Required indexed columns: [${args(1)._2}], " + + s"Indexed columns: [${args(2)._2}]" + } + + case class NotAllJoinColIndexed(override val args: Seq[(String, String)]) + extends FilterReason { + override def codeStr: String = "NOT_ALL_JOIN_COL_INDEXED" + override def verboseStr: String = + s"All join condition column should be the indexed columns. " + + s"Join columns: [${args(0)._2}], Indexed columns: [${args(1)._2}]" + } + + case class NoCompatibleJoinIndexPair() extends FilterReasonNoArg { + override def codeStr: String = "NO_COMPATIBLE_JOIN_INDEX_PAIR" + override def verboseStr: String = "No compatible left and right index pair." + } + + case class AnotherIndexApplied(override val args: Seq[(String, String)]) + extends FilterReason { + override def codeStr: String = "ANOTHER_INDEX_APPLIED" + override def verboseStr: String = + s"Another candidate index is applied: ${args(0)._2}" + } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/plananalysis/FilterReasonCode.scala b/src/main/scala/com/microsoft/hyperspace/index/plananalysis/FilterReasonCode.scala deleted file mode 100644 index 7d1a437c7..000000000 --- a/src/main/scala/com/microsoft/hyperspace/index/plananalysis/FilterReasonCode.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (2020) The Hyperspace Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.microsoft.hyperspace.index.plananalysis - -object FilterReasonCode extends Enumeration { - type FilterReasonCode = Value - - val COL_SCHEMA_MISMATCH = Value("COL_SCHEMA_MISMATCH") - val SOURCE_DATA_CHANGE = Value("SOURCE_DATA_CHANGE") - val NO_DELETE_SUPPORT = Value("NO_DELETE_SUPPORT") - val NO_COMMON_FILES = Value("NO_COMMON_FILES") - val TOO_MUCH_APPENDED = Value("TOO_MUCH_APPENDED") - val TOO_MUCH_DELETED = Value("TOO_MUCH_DELETED") - val ANOTHER_INDEX_APPLIED = Value("ANOTHER_INDEX_APPLIED") - - // CoveringIndex - FilterIndexRule - val NO_FIRST_INDEXED_COL_COND = Value("NO_FIRST_INDEXED_COL_COND") - val MISSING_REQUIRED_COL = Value("MISSING_REQUIRED_COL") - - // CoveringIndex - JoinIndexRule - val NOT_ELIGIBLE_JOIN = Value("NOT_ELIGIBLE_JOIN") - val NO_AVAIL_JOIN_INDEX_PAIR = Value("NO_AVAIL_INDEX_PAIR") - val NO_COMPATIBLE_JOIN_INDEX_PAIR = Value("NO_COMPATIBLE_INDEX_PAIR") - val NOT_ALL_JOIN_COL_INDEXED = Value("NOT_ALL_JOIN_COL_INDEXED") - val MISSING_INDEXED_COL = Value("MISSING_INDEXED_COL") -} diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/ColumnSchemaFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/ColumnSchemaFilter.scala index ec2427587..beff7da54 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/ColumnSchemaFilter.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/ColumnSchemaFilter.scala @@ -35,9 +35,8 @@ object ColumnSchemaFilter extends SourcePlanIndexFilter { index, FilterReasons.apply( FilterReasonCode.COL_SCHEMA_MISMATCH, - Seq( - ("sourceColumns", relationColumnNames.mkString(", ")), - ("indexColumns", index.derivedDataset.referencedColumns.mkString(", "))))) { + ("sourceColumns", relationColumnNames.mkString(", ")), + ("indexColumns", index.derivedDataset.referencedColumns.mkString(", ")))) { ResolverUtils .resolve(spark, index.derivedDataset.referencedColumns, relationColumnNames) .isDefined diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/FileSignatureFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/FileSignatureFilter.scala index 6eff030c1..c828675c3 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/FileSignatureFilter.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/FileSignatureFilter.scala @@ -63,7 +63,7 @@ object FileSignatureFilter extends SourcePlanIndexFilter { withFilterReasonTag( plan, index, - FilterReasons.apply(FilterReasonCode.SOURCE_DATA_CHANGE, Seq.empty)) { + FilterReasons.apply(FilterReasonCode.SOURCE_DATA_CHANGED)) { signatureValid(relation, index, signatureMap) } } @@ -137,14 +137,14 @@ object FileSignatureFilter extends SourcePlanIndexFilter { withFilterReasonTag( relation.plan, index, - FilterReasons.apply(FilterReasonCode.NO_DELETE_SUPPORT, Seq.empty)) { + FilterReasons.apply(FilterReasonCode.NO_DELETE_SUPPORT)) { entry.derivedDataset.canHandleDeletedFiles } lazy val hasCommonFilesCond = withFilterReasonTag( relation.plan, index, - FilterReasons.apply(FilterReasonCode.NO_COMMON_FILES, Seq.empty)) { + FilterReasons.apply(FilterReasonCode.NO_COMMON_FILES)) { commonCnt > 0 } @@ -155,9 +155,8 @@ object FileSignatureFilter extends SourcePlanIndexFilter { index, FilterReasons.apply( FilterReasonCode.TOO_MUCH_APPENDED, - Seq( - ("appendedRatio", appendedBytesRatio.toString), - ("hybridScanAppendThreshold", hybridScanAppendThreshold.toString)))) { + ("appendedRatio", appendedBytesRatio.toString), + ("hybridScanAppendThreshold", hybridScanAppendThreshold.toString))) { appendedBytesRatio < hybridScanAppendThreshold } lazy val deleteThresholdCond = withFilterReasonTag( @@ -165,9 +164,8 @@ object FileSignatureFilter extends SourcePlanIndexFilter { index, FilterReasons.apply( FilterReasonCode.TOO_MUCH_DELETED, - Seq( - ("deletedRatio", deletedBytesRatio.toString), - ("hybridScanDeleteThreshold", hybridScanDeleteThreshold.toString)))) { + ("deletedRatio", deletedBytesRatio.toString), + ("hybridScanDeleteThreshold", hybridScanDeleteThreshold.toString))) { deletedBytesRatio < HyperspaceConf.hybridScanDeletedRatioThreshold(spark) } diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/IndexRankFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/IndexRankFilter.scala index 8e58c0584..7b73b64f7 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/IndexRankFilter.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/IndexRankFilter.scala @@ -54,7 +54,7 @@ trait IndexRankFilter extends IndexFilter { index, FilterReasons.apply( FilterReasonCode.ANOTHER_INDEX_APPLIED, - Seq(("appliedIndex", selectedIndex.name)))) + ("appliedIndex", selectedIndex.name))) } } }