From c3fd62f9ecccbd728e2b68e419027b9a07a354bf Mon Sep 17 00:00:00 2001 From: EJ Song Date: Fri, 28 May 2021 10:16:16 -0700 Subject: [PATCH] Introduce whyNot API --- .../com/microsoft/hyperspace/Hyperspace.scala | 13 +- .../hyperspace/index/IndexLogEntry.scala | 13 ++ .../hyperspace/index/IndexLogEntryTags.scala | 9 +- .../index/rules/FilterIndexRule.scala | 2 + .../index/rules/HyperspaceRule.scala | 12 ++ .../hyperspace/index/rules/IndexFilter.scala | 9 +- .../index/rules/JoinIndexRule.scala | 3 + .../index/rules/WhyNotAnalyzer.scala | 182 ++++++++++++++++++ .../ScoreBasedIndexPlanOptimizerTest.scala | 2 + .../rules/CandidateIndexCollectorTest.scala | 2 +- .../index/rules/FilterIndexRuleTest.scala | 9 +- .../index/rules/JoinIndexRuleTest.scala | 56 +++--- 12 files changed, 275 insertions(+), 37 deletions(-) create mode 100644 src/main/scala/com/microsoft/hyperspace/index/rules/WhyNotAnalyzer.scala diff --git a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala index cf680421c..ebe24c541 100644 --- a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala +++ b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession} import com.microsoft.hyperspace.index._ import com.microsoft.hyperspace.index.IndexConstants.{OPTIMIZE_MODE_QUICK, REFRESH_MODE_FULL} import com.microsoft.hyperspace.index.plananalysis.PlanAnalyzer -import com.microsoft.hyperspace.index.rules.ApplyHyperspace +import com.microsoft.hyperspace.index.rules.{ApplyHyperspace, WhyNotAnalyzer} import com.microsoft.hyperspace.index.sources.FileBasedSourceProviderManager class Hyperspace(spark: SparkSession) { @@ -171,6 +171,17 @@ class Hyperspace(spark: SparkSession) { indexManager.index(indexName) } + def whyNot(df: DataFrame, indexName: String = "")( + implicit redirectFunc: String => Unit = print): Unit = { + withHyperspaceRuleDisabled { + if (indexName.nonEmpty) { + 
redirectFunc(WhyNotAnalyzer.whyNotIndexString(spark, df, indexName)) + } else { + redirectFunc(WhyNotAnalyzer.whyNotIndexesString(spark, df)) + } + } + } + private def withHyperspaceRuleDisabled(f: => Unit): Unit = { try { ApplyHyperspace.disableForIndexMaintenance.set(true) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index 6e0badba7..1fa838b31 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -583,10 +583,23 @@ case class IndexLogEntry( tags.get((plan, tag)).map(_.asInstanceOf[T]) } + def getTagValueForAllPlan[T](tag: IndexLogEntryTag[T]): Seq[(LogicalPlan, T)] = { + tags.filter(entry => entry._1._2.equals(tag)).toSeq.map { case (k, v) => + (k._1, v.asInstanceOf[T]) + } + } + def unsetTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Unit = { tags.remove((plan, tag)) } + def unsetTagValueForAllPlan[T](tag: IndexLogEntryTag[T]): Unit = { + val plansWithTag = tags.keys.filter(_._2.name.equals(tag.name)).map(_._1) + plansWithTag.foreach { plan => + tags.remove((plan, tag)) + } + } + def withCachedTag[T](plan: LogicalPlan, tag: IndexLogEntryTag[T])(f: => T): T = { getTagValue(plan, tag) match { case Some(v) => v diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala index 61a752887..8908b4dd9 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala @@ -58,7 +58,12 @@ object IndexLogEntryTags { val FILTER_REASONS: IndexLogEntryTag[Seq[String]] = IndexLogEntryTag[Seq[String]]("filterReasons") + // APPLICABLE_INDEX_RULES stores the names of the rules that can apply the index to the plan.
+ val APPLICABLE_INDEX_RULES: IndexLogEntryTag[Seq[String]] = + IndexLogEntryTag[Seq[String]]("applicableIndexRules") + // FILTER_REASONS_ENABLED indicates whether whyNotAPI is enabled or not. - val FILTER_REASONS_ENABLED: IndexLogEntryTag[Boolean] = - IndexLogEntryTag[Boolean]("filterReasonsEnabled") + // If it's enabled, FILTER_REASONS and APPLICABLE_INDEX_RULES info will be tagged. + val INDEX_PLAN_ANALYSIS_ENABLED: IndexLogEntryTag[Boolean] = + IndexLogEntryTag[Boolean]("indexPlanAnalysisEnabled") } diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala index 5f3c360bd..c8b82dc07 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala @@ -165,6 +165,8 @@ object FilterIndexRule extends HyperspaceRule { return plan } + setApplicableIndexTag(plan, indexes.head._2) + // As FilterIndexRule is not intended to support bucketed scan, we set // useBucketUnionForAppended as false. If it's true, Hybrid Scan can cause // unnecessary shuffle for appended data to apply BucketUnion for merging data.
diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/HyperspaceRule.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/HyperspaceRule.scala index a45a13066..fde404e08 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/HyperspaceRule.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/HyperspaceRule.scala @@ -19,6 +19,7 @@ package com.microsoft.hyperspace.index.rules import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import com.microsoft.hyperspace.ActiveSparkSession +import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags} import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap} /** @@ -75,6 +76,17 @@ trait HyperspaceRule extends ActiveSparkSession { (plan, 0) } } + + protected def setApplicableIndexTag(plan: LogicalPlan, index: IndexLogEntry): Unit = { + if (index.getTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED).getOrElse(false)) { + val prevRules = + index.getTagValue(plan, IndexLogEntryTags.APPLICABLE_INDEX_RULES).getOrElse(Nil) + index.setTagValue( + plan, + IndexLogEntryTags.APPLICABLE_INDEX_RULES, + prevRules :+ s"[${index.name},${getClass.getSimpleName.split("\\$").last}]") + } + } } /** diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/IndexFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/IndexFilter.scala index d480f25ac..40da36b29 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/IndexFilter.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/IndexFilter.scala @@ -44,11 +44,16 @@ trait IndexFilter extends ActiveSparkSession { index: IndexLogEntry, reasonString: => String): Unit = { if (!condition && index - .getTagValue(IndexLogEntryTags.FILTER_REASONS_ENABLED) + .getTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED) .getOrElse(false)) { + val reasonStringWithInfo = + s"[${index.name},${getClass.getSimpleName.split("\\$").last}] $reasonString" val prevReason = 
index.getTagValue(plan, IndexLogEntryTags.FILTER_REASONS).getOrElse(Nil) - index.setTagValue(plan, IndexLogEntryTags.FILTER_REASONS, prevReason :+ reasonString) + index.setTagValue( + plan, + IndexLogEntryTags.FILTER_REASONS, + prevReason :+ reasonStringWithInfo) } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala index 61c6860cd..870f8e031 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala @@ -637,6 +637,9 @@ object JoinIndexRule extends HyperspaceRule with HyperspaceEventLogging { val lIndex = indexes(leftRelation.get.plan) val rIndex = indexes(rightRelation.get.plan) + setApplicableIndexTag(plan, lIndex) + setApplicableIndexTag(plan, rIndex) + val updatedPlan = join .copy( diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/WhyNotAnalyzer.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/WhyNotAnalyzer.scala new file mode 100644 index 000000000..2d1ece6c7 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/WhyNotAnalyzer.scala @@ -0,0 +1,182 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.microsoft.hyperspace.index.rules + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +import com.microsoft.hyperspace.{Hyperspace, HyperspaceException} +import com.microsoft.hyperspace.actions.Constants +import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags} + +object WhyNotAnalyzer extends Logging { + + private def cleanupAnalysisTags(indexes: Seq[IndexLogEntry]): Unit = { + indexes.foreach { index => + index.unsetTagValueForAllPlan(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED) + index.unsetTagValueForAllPlan(IndexLogEntryTags.FILTER_REASONS) + index.unsetTagValueForAllPlan(IndexLogEntryTags.APPLICABLE_INDEX_RULES) + } + } + + private def prepareTagsForAnalysis(indexes: Seq[IndexLogEntry]): Unit = { + indexes.foreach { index => + index.setTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED, true) + + // Clean up previous reason tags. 
+ index.unsetTagValueForAllPlan(IndexLogEntryTags.FILTER_REASONS) + index.unsetTagValueForAllPlan(IndexLogEntryTags.APPLICABLE_INDEX_RULES) + } + } + + private def generateWhyNotString( + planWithoutHyperspace: LogicalPlan, + planWithHyperspace: LogicalPlan, + reasonStrings: Seq[(LogicalPlan, Seq[String])], + applyStrings: Seq[(LogicalPlan, Seq[String])]): String = { + val stringBuilder = new StringBuilder + val originalPlanString = planWithoutHyperspace.numberedTreeString.split('\n') + + stringBuilder.append("\nPlan with Hyperspace\n") + stringBuilder.append("============================================\n") + stringBuilder.append(planWithHyperspace) + stringBuilder.append("\n\n") + stringBuilder.append("Plan without Hyperspace\n") + stringBuilder.append("============================================\n") + stringBuilder.append(originalPlanString.mkString("\n")) + stringBuilder.append("\n\n") + + stringBuilder.append("********************************************\n") + stringBuilder.append(s"WhyNot Analysis\n") + + reasonStrings.groupBy(_._1).foreach { + case (plan, stringsForPlan) => + val planStrLines = plan.toString.split('\n') + val firstLineStr = originalPlanString.find(str => str.endsWith(planStrLines.head)) + stringBuilder.append("============================================\n") + stringBuilder.append("Candidate plan:\n") + // Start with numbered plan string + stringBuilder.append(firstLineStr.get) + stringBuilder.append("\n") + val numberSpaces = " " * (firstLineStr.get.length - planStrLines.head.length) + stringBuilder.append(numberSpaces) + stringBuilder.append(planStrLines.tail.toSeq.mkString(s"\n$numberSpaces")) + stringBuilder.append("\n\nFailure reasons:\n") + stringsForPlan.foreach { + case (_, strings) => + stringBuilder.append("- ") + stringBuilder.append(strings.mkString("\n- ")) + stringBuilder.append("\n") + } + } + + stringBuilder.append("\n") + if (applyStrings.nonEmpty) { + stringBuilder.append("********************************************\n") + 
stringBuilder.append("Applicable indexes\n") + applyStrings.groupBy(_._1).foreach { + case (plan, stringsForPlan) => + val planStrLines = plan.toString.split('\n') + val firstLineStr = originalPlanString.find(str => str.endsWith(planStrLines.head)) + stringBuilder.append("============================================\n") + stringBuilder.append("Source plan:\n") + // Start with numbered plan string + stringBuilder.append(firstLineStr.get) + stringBuilder.append("\n") + val numberSpaces = " " * (firstLineStr.get.length - planStrLines.head.length) + stringBuilder.append(numberSpaces) + stringBuilder.append(planStrLines.tail.toSeq.mkString(s"\n$numberSpaces")) + stringBuilder.append("\n\nApplicable indexes:\n") + stringsForPlan.foreach { + case (_, strings) => + stringBuilder.append("- ") + stringBuilder.append(strings.mkString("\n- ")) + stringBuilder.append("\n") + } + } + } + stringBuilder.toString + } + + def whyNotIndexString(spark: SparkSession, df: DataFrame, indexName: String): String = { + val (planWithHyperspace, reasonStrings, applyStrings) = whyNotIndex(spark, df, indexName) + generateWhyNotString( + df.queryExecution.optimizedPlan, + planWithHyperspace, + reasonStrings, + applyStrings) + } + + def whyNotIndexesString(spark: SparkSession, df: DataFrame): String = { + val (planWithHyperspace, reasonStrings, applyStrings) = whyNotIndexes(spark, df) + generateWhyNotString( + df.queryExecution.optimizedPlan, + planWithHyperspace, + reasonStrings, + applyStrings) + } + + def whyNotIndex(spark: SparkSession, df: DataFrame, indexName: String) + : (LogicalPlan, Seq[(LogicalPlan, Seq[String])], Seq[(LogicalPlan, Seq[String])]) = { + val plan = df.queryExecution.optimizedPlan + + val indexManager = Hyperspace.getContext(spark).indexCollectionManager + val allIndexes = + indexManager.getIndexes().filter(!_.state.equals(Constants.States.DOESNOTEXIST)) + val targetIndex = allIndexes.find(index => index.name.equals(indexName)) + + if (targetIndex.isEmpty) { + throw 
HyperspaceException(s"Index with name $indexName could not be found.") + } else if (!targetIndex.get.state.equals(Constants.States.ACTIVE)) { + throw HyperspaceException( + s"Index with name $indexName is not ACTIVE state: ${targetIndex.get.state}") + } + + val allActiveIndexes = allIndexes.filter(_.state.equals(Constants.States.ACTIVE)) + applyHyperspaceForAnalysis(plan, allActiveIndexes) + } + + def whyNotIndexes(spark: SparkSession, df: DataFrame) + : (LogicalPlan, Seq[(LogicalPlan, Seq[String])], Seq[(LogicalPlan, Seq[String])]) = { + val plan = df.queryExecution.optimizedPlan + + val indexManager = Hyperspace.getContext(spark).indexCollectionManager + val allActiveIndexes = indexManager.getIndexes(Seq(Constants.States.ACTIVE)) + + if (allActiveIndexes.isEmpty) { + throw HyperspaceException(s"No available ACTIVE indexes") + } + applyHyperspaceForAnalysis(plan, allActiveIndexes) + } + + def applyHyperspaceForAnalysis(plan: LogicalPlan, indexes: Seq[IndexLogEntry]) + : (LogicalPlan, Seq[(LogicalPlan, Seq[String])], Seq[(LogicalPlan, Seq[String])]) = { + try { + prepareTagsForAnalysis(indexes) + val candidateIndexes = CandidateIndexCollector.apply(plan, indexes) + val transformedPlan = new ScoreBasedIndexPlanOptimizer().apply(plan, candidateIndexes) + ( + transformedPlan, + indexes.flatMap(_.getTagValueForAllPlan(IndexLogEntryTags.FILTER_REASONS)), + indexes.flatMap(_.getTagValueForAllPlan(IndexLogEntryTags.APPLICABLE_INDEX_RULES))) + } finally { + cleanupAnalysisTags(indexes) + } + } + +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/ScoreBasedIndexPlanOptimizerTest.scala b/src/test/scala/com/microsoft/hyperspace/index/ScoreBasedIndexPlanOptimizerTest.scala index 62f2b2628..2a5137369 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/ScoreBasedIndexPlanOptimizerTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/ScoreBasedIndexPlanOptimizerTest.scala @@ -108,6 +108,8 @@ class ScoreBasedIndexPlanOptimizerTest extends QueryTest with 
HyperspaceSuite { assert(rightChildScore == 50) assert(!rightChildPlan.equals(plan.children.last)) + hyperspace.whyNot(query(leftDf, rightDf)()) + verifyIndexUsage( query(leftDf, rightDf), getIndexFilesPath(leftDfFilterIndexConfig.indexName) ++ diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/CandidateIndexCollectorTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/CandidateIndexCollectorTest.scala index 17d23f2a9..5543b8c97 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/CandidateIndexCollectorTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/CandidateIndexCollectorTest.scala @@ -330,7 +330,7 @@ class CandidateIndexCollectorTest extends HyperspaceRuleSuite with SQLHelper { } val allIndexes = indexList.map(indexName => latestIndexLogEntry(systemPath, indexName)) - allIndexes.foreach(_.setTagValue(IndexLogEntryTags.FILTER_REASONS_ENABLED, true)) + allIndexes.foreach(_.setTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED, true)) val plan1 = spark.read.parquet(dataPath).select("id", "name").queryExecution.optimizedPlan diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/FilterIndexRuleTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/FilterIndexRuleTest.scala index b2b996adc..602537363 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/FilterIndexRuleTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/FilterIndexRuleTest.scala @@ -116,7 +116,7 @@ class FilterIndexRuleTest extends HyperspaceRuleSuite { val originalPlan = Project(Seq(c2, c3, c4), filterNode) // c4 is not covered by index val allIndexes = IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE)) - allIndexes.foreach(_.setTagValue(IndexLogEntryTags.FILTER_REASONS_ENABLED, true)) + allIndexes.foreach(_.setTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED, true)) val (transformedPlan, score) = applyFilterIndexRuleHelper(originalPlan, allIndexes) 
assert(transformedPlan.equals(originalPlan), "Plan should not transform.") allIndexes.foreach { index => @@ -126,8 +126,9 @@ class FilterIndexRuleTest extends HyperspaceRuleSuite { assert(msg.isDefined) assert( msg.get.exists( - _.equals("Index does not contain required columns. Required columns: " + - "[c3,c2,c3,c4], Indexed & included columns: [c3,c2,c1]"))) + _.equals( + s"[$indexName1,FilterColumnFilter] Index does not contain required columns. " + + "Required columns: [c3,c2,c3,c4], Indexed & included columns: [c3,c2,c1]"))) case `indexName2` | `indexName3` => assert(msg.isDefined) assert( @@ -154,7 +155,7 @@ class FilterIndexRuleTest extends HyperspaceRuleSuite { val originalPlan = Filter(filterCondition, scanNode) val allIndexes = IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE)) - allIndexes.foreach(_.setTagValue(IndexLogEntryTags.FILTER_REASONS_ENABLED, true)) + allIndexes.foreach(_.setTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED, true)) val (transformedPlan, score) = applyFilterIndexRuleHelper(originalPlan, allIndexes) assert(!transformedPlan.equals(originalPlan), "No plan transformation.") verifyTransformedPlanWithIndex(transformedPlan, indexName2) diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRuleTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRuleTest.scala index daf3da836..e3f4662fe 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRuleTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRuleTest.scala @@ -109,7 +109,7 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper { val indexes = if (allIndexes.isEmpty) { IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE)) } else { - allIndexes.foreach(_.setTagValue(IndexLogEntryTags.FILTER_REASONS_ENABLED, true)) + allIndexes.foreach(_.setTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED, true)) allIndexes } val candidateIndexes = 
CandidateIndexCollector(plan, indexes) @@ -229,32 +229,34 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper { index.name match { case "t1i1" => assert( - msg.get.toSet.equals( - Set( - "All join condition column should be the indexed columns. " + - "Join columns: [t1c2], Indexed columns: [t1c1]", - "No available indexes for right subplan."))) + msg.get.toSet.equals(Set( + "[t1i1,JoinColumnFilter] All join condition column should be the indexed columns. " + + "Join columns: [t1c2], Indexed columns: [t1c1]", + "[t1i1,JoinColumnFilter] No available indexes for right subplan.")), + msg.get) case "t1i2" => assert( msg.get.toSet.equals(Set( - "All join condition column should be the indexed columns. " + + "[t1i2,JoinColumnFilter] All join condition column should be the indexed columns. " + "Join columns: [t1c2], Indexed columns: [t1c1,t1c2]", - "No available indexes for right subplan."))) + "[t1i2,JoinColumnFilter] No available indexes for right subplan.")), + msg.get) case "t1i3" => - assert(msg.get.toSet.equals(Set("No available indexes for right subplan."))) + assert( + msg.get.toSet + .equals(Set("[t1i3,JoinColumnFilter] No available indexes for right subplan."))) case "t2i1" => assert( - msg.get.toSet.equals( - Set( - "All join condition column should be the indexed columns. " + - "Join columns: [t2c2], Indexed columns: [t2c1]", - "No available indexes for right subplan."))) + msg.get.toSet.equals(Set( + "[t2i1,JoinColumnFilter] All join condition column should be the indexed columns. " + + "Join columns: [t2c2], Indexed columns: [t2c1]", + "[t2i1,JoinColumnFilter] No available indexes for right subplan."))) case "t2i2" => assert( msg.get.toSet.equals(Set( - "All join condition column should be the indexed columns. " + + "[t2i2,JoinColumnFilter] All join condition column should be the indexed columns. 
" + "Join columns: [t2c2], Indexed columns: [t2c1,t2c2]", - "No available indexes for right subplan."))) + "[t2i2,JoinColumnFilter] No available indexes for right subplan."))) } } } @@ -281,18 +283,18 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper { index.name match { case "t1i1" => assert( - msg.get.toSet.equals( - Set( - "Index does not contain all required columns. " + - "Required columns: [t1c1,t1c4], Index columns: [t1c1,t1c3]", - "No available indexes for left subplan."))) + msg.get.toSet.equals(Set( + "[t1i1,JoinColumnFilter] Index does not contain all required columns. " + + "Required columns: [t1c1,t1c4], Index columns: [t1c1,t1c3]", + "[t1i1,JoinColumnFilter] No available indexes for left subplan.")), + msg.get) case "t2i1" => assert( msg.get.toSet.equals( Set( - "Index does not contain all required columns. " + + "[t2i1,JoinColumnFilter] Index does not contain all required columns. " + "Required columns: [t2c1,t2c4], Index columns: [t2c1,t2c3]", - "No available indexes for left subplan."))) + "[t2i1,JoinColumnFilter] No available indexes for left subplan."))) case _ => } } @@ -438,10 +440,10 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper { val msg = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) assert(msg.isDefined) assert( - msg.get.toSet.equals( - Set("Each join condition column should come from relations directly and attributes " + - "from left plan must exclusively have one-to-one mapping with attributes from " + - "right plan. E.g. join(A = B and A = D) is not eligible."))) + msg.get.size == 1 && msg.get.head.contains( + "Each join condition column should come from relations " + + "directly and attributes from left plan must exclusively have one-to-one mapping " + + "with attributes from right plan. E.g. join(A = B and A = D) is not eligible.")) } } {