From 77a3eb7fce9205956a70f059634645760961d885 Mon Sep 17 00:00:00 2001 From: EJ Song Date: Fri, 28 May 2021 10:16:16 -0700 Subject: [PATCH] Introduce whyNot API --- .../com/microsoft/hyperspace/Hyperspace.scala | 13 +- .../hyperspace/index/IndexLogEntry.scala | 13 ++ .../hyperspace/index/IndexLogEntryTags.scala | 9 +- .../index/rules/HyperspaceRule.scala | 12 ++ .../hyperspace/index/rules/IndexFilter.scala | 9 +- .../index/rules/WhyNotAnalyzer.scala | 182 ++++++++++++++++++ .../rules/disabled/FilterIndexRule.scala | 2 + .../index/rules/disabled/JoinIndexRule.scala | 3 + .../ScoreBasedIndexPlanOptimizerTest.scala | 2 + .../rules/CandidateIndexCollectorTest.scala | 2 +- .../rules/FilterIndexRule_disabledTest.scala | 4 +- .../rules/JoinIndexRule_disabledTest.scala | 2 +- 12 files changed, 244 insertions(+), 9 deletions(-) create mode 100644 src/main/scala/com/microsoft/hyperspace/index/rules/WhyNotAnalyzer.scala diff --git a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala index cf680421c..ebe24c541 100644 --- a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala +++ b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession} import com.microsoft.hyperspace.index._ import com.microsoft.hyperspace.index.IndexConstants.{OPTIMIZE_MODE_QUICK, REFRESH_MODE_FULL} import com.microsoft.hyperspace.index.plananalysis.PlanAnalyzer -import com.microsoft.hyperspace.index.rules.ApplyHyperspace +import com.microsoft.hyperspace.index.rules.{ApplyHyperspace, WhyNotAnalyzer} import com.microsoft.hyperspace.index.sources.FileBasedSourceProviderManager class Hyperspace(spark: SparkSession) { @@ -171,6 +171,17 @@ class Hyperspace(spark: SparkSession) { indexManager.index(indexName) } + def whyNot(df: DataFrame, indexName: String = "")( + implicit redirectFunc: String => Unit = print): Unit = { + withHyperspaceRuleDisabled { + if (indexName.nonEmpty) { + redirectFunc(WhyNotAnalyzer.whyNotIndexString(spark, df, indexName)) + } else { + redirectFunc(WhyNotAnalyzer.whyNotIndexesString(spark, df)) + } + } + } + private def withHyperspaceRuleDisabled(f: => Unit): Unit = { try { ApplyHyperspace.disableForIndexMaintenance.set(true) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index 6e0badba7..1fa838b31 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -583,10 +583,23 @@ case class IndexLogEntry( tags.get((plan, tag)).map(_.asInstanceOf[T]) } + def getTagValueForAllPlan[T](tag: IndexLogEntryTag[T]): Seq[(LogicalPlan, T)] = { + tags.filter(entry => entry._1._2.equals(tag)).toSeq.map { case (k, v) => + (k._1, v.asInstanceOf[T]) + } + } + def unsetTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Unit = { tags.remove((plan, tag)) } + def unsetTagValueForAllPlan[T](tag: IndexLogEntryTag[T]): Unit = { + val plansWithTag = tags.keys.filter(_._2.name.equals(tag.name)).map(_._1) + plansWithTag.foreach { plan => + tags.remove((plan, tag)) + } + } + def withCachedTag[T](plan: LogicalPlan, tag: IndexLogEntryTag[T])(f: => T): T = { getTagValue(plan, tag) match { case Some(v) => v diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala index 61a752887..05b366bf6 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala @@ -58,7 +58,12 @@ object IndexLogEntryTags { val FILTER_REASONS: IndexLogEntryTag[Seq[String]] = IndexLogEntryTag[Seq[String]]("filterReasons") + // APPLIED_INDEX_RULES stores rule's names applied to plan + val APPLIED_INDEX_RULES: IndexLogEntryTag[Seq[String]] = + IndexLogEntryTag[Seq[String]]("appliedIndexRules") + // FILTER_REASONS_ENABLED indicates whether whyNotAPI is enabled or not. - val FILTER_REASONS_ENABLED: IndexLogEntryTag[Boolean] = - IndexLogEntryTag[Boolean]("filterReasonsEnabled") + // If it's enabled, FILTER_REASONS and APPLIED_INDEX_RULES info will be tagged. + val INDEX_PLAN_ANALYSIS_ENABLED: IndexLogEntryTag[Boolean] = + IndexLogEntryTag[Boolean]("indexPlanAnalysisEnabled") } diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/HyperspaceRule.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/HyperspaceRule.scala index a45a13066..8703a075b 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/HyperspaceRule.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/HyperspaceRule.scala @@ -19,6 +19,7 @@ package com.microsoft.hyperspace.index.rules import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import com.microsoft.hyperspace.ActiveSparkSession +import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags} import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap} /** @@ -75,6 +76,17 @@ trait HyperspaceRule extends ActiveSparkSession { (plan, 0) } } + + protected def setAppliedIndexTag(plan: LogicalPlan, index: IndexLogEntry): Unit = { + if (index.getTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED).getOrElse(false)) { + val prevRules = + index.getTagValue(plan, IndexLogEntryTags.APPLIED_INDEX_RULES).getOrElse(Nil) + index.setTagValue( + plan, + IndexLogEntryTags.APPLIED_INDEX_RULES, + prevRules :+ s"[${index.name},${getClass.getSimpleName.split("\\$").last}]") + } + } } /** diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/IndexFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/IndexFilter.scala index d480f25ac..40da36b29 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/IndexFilter.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/IndexFilter.scala @@ -44,11 +44,16 @@ trait IndexFilter extends ActiveSparkSession { index: IndexLogEntry, reasonString: => String): Unit = { if (!condition && index - .getTagValue(IndexLogEntryTags.FILTER_REASONS_ENABLED) + .getTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED) .getOrElse(false)) { + val reasonStringWithInfo = + s"[${index.name},${getClass.getSimpleName.split("\\$").last}] $reasonString" val prevReason = index.getTagValue(plan, IndexLogEntryTags.FILTER_REASONS).getOrElse(Nil) - index.setTagValue(plan, IndexLogEntryTags.FILTER_REASONS, prevReason :+ reasonString) + index.setTagValue( + plan, + IndexLogEntryTags.FILTER_REASONS, + prevReason :+ reasonStringWithInfo) } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/WhyNotAnalyzer.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/WhyNotAnalyzer.scala new file mode 100644 index 000000000..41c45d84b --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/WhyNotAnalyzer.scala @@ -0,0 +1,182 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.rules + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +import com.microsoft.hyperspace.{Hyperspace, HyperspaceException} +import com.microsoft.hyperspace.actions.Constants +import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags} + +object WhyNotAnalyzer extends Logging { + + private def cleanupAnalysisTags(indexes: Seq[IndexLogEntry]): Unit = { + indexes.foreach { index => + index.unsetTagValueForAllPlan(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED) + index.unsetTagValueForAllPlan(IndexLogEntryTags.FILTER_REASONS) + index.unsetTagValueForAllPlan(IndexLogEntryTags.APPLIED_INDEX_RULES) + } + } + + private def prepareTagsForAnalysis(indexes: Seq[IndexLogEntry]): Unit = { + indexes.foreach { index => + index.setTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED, true) + + // Clean up previous reason tags. + index.unsetTagValueForAllPlan(IndexLogEntryTags.FILTER_REASONS) + index.unsetTagValueForAllPlan(IndexLogEntryTags.APPLIED_INDEX_RULES) + } + } + + private def generateWhyNotString( + planWithoutHyperspace: LogicalPlan, + planWithHyperspace: LogicalPlan, + reasonStrings: Seq[(LogicalPlan, Seq[String])], + applyStrings: Seq[(LogicalPlan, Seq[String])]): String = { + val stringBuilder = new StringBuilder + val originalPlanString = planWithoutHyperspace.numberedTreeString.split('\n') + + stringBuilder.append("\nPlan with Hyperspace\n") + stringBuilder.append("============================================\n") + stringBuilder.append(planWithHyperspace) + stringBuilder.append("\n\n") + stringBuilder.append("Plan without Hyperspace\n") + stringBuilder.append("============================================\n") + stringBuilder.append(originalPlanString.mkString("\n")) + stringBuilder.append("\n\n") + + stringBuilder.append("********************************************\n") + stringBuilder.append(s"WhyNot Analysis\n") + + reasonStrings.groupBy(_._1).foreach { + case (plan, stringsForPlan) => + val planStrLines = plan.toString.split('\n') + val firstLineStr = originalPlanString.find(str => str.endsWith(planStrLines.head)) + stringBuilder.append("============================================\n") + stringBuilder.append("Candidate plan:\n") + // Start with numbered plan string + stringBuilder.append(firstLineStr.get) + stringBuilder.append("\n") + val numberSpaces = " " * (firstLineStr.get.length - planStrLines.head.length) + stringBuilder.append(numberSpaces) + stringBuilder.append(planStrLines.tail.toSeq.mkString(s"\n$numberSpaces")) + stringBuilder.append("\n\nFailure reasons:\n") + stringsForPlan.foreach { + case (_, strings) => + stringBuilder.append("- ") + stringBuilder.append(strings.mkString("\n- ")) + stringBuilder.append("\n") + } + } + + stringBuilder.append("\n") + if (applyStrings.nonEmpty) { + stringBuilder.append("********************************************\n") + stringBuilder.append("Applicable indexes\n") + applyStrings.groupBy(_._1).foreach { + case (plan, stringsForPlan) => + val planStrLines = plan.toString.split('\n') + val firstLineStr = originalPlanString.find(str => str.endsWith(planStrLines.head)) + stringBuilder.append("============================================\n") + stringBuilder.append("Source plan:\n") + // Start with numbered plan string + stringBuilder.append(firstLineStr.get) + stringBuilder.append("\n") + val numberSpaces = " " * (firstLineStr.get.length - planStrLines.head.length) + stringBuilder.append(numberSpaces) + stringBuilder.append(planStrLines.tail.toSeq.mkString(s"\n$numberSpaces")) + stringBuilder.append("\n\nApplicable indexes:\n") + stringsForPlan.foreach { + case (_, strings) => + stringBuilder.append("- ") + stringBuilder.append(strings.mkString("\n- ")) + stringBuilder.append("\n") + } + } + } + stringBuilder.toString + } + + def whyNotIndexString(spark: SparkSession, df: DataFrame, indexName: String): String = { + val (planWithHyperspace, reasonStrings, applyStrings) = whyNotIndex(spark, df, indexName) + generateWhyNotString( + df.queryExecution.optimizedPlan, + planWithHyperspace, + reasonStrings, + applyStrings) + } + + def whyNotIndexesString(spark: SparkSession, df: DataFrame): String = { + val (planWithHyperspace, reasonStrings, applyStrings) = whyNotIndexes(spark, df) + generateWhyNotString( + df.queryExecution.optimizedPlan, + planWithHyperspace, + reasonStrings, + applyStrings) + } + + def whyNotIndex(spark: SparkSession, df: DataFrame, indexName: String) + : (LogicalPlan, Seq[(LogicalPlan, Seq[String])], Seq[(LogicalPlan, Seq[String])]) = { + val plan = df.queryExecution.optimizedPlan + + val indexManager = Hyperspace.getContext(spark).indexCollectionManager + val allIndexes = + indexManager.getIndexes().filter(!_.state.equals(Constants.States.DOESNOTEXIST)) + val targetIndex = allIndexes.find(index => index.name.equals(indexName)) + + if (targetIndex.isEmpty) { + throw HyperspaceException(s"Index with name $indexName could not be found.") + } else if (!targetIndex.get.state.equals(Constants.States.ACTIVE)) { + throw HyperspaceException( + s"Index with name $indexName is not ACTIVE state: ${targetIndex.get.state}") + } + + val allActiveIndexes = allIndexes.filter(_.state.equals(Constants.States.ACTIVE)) + applyHyperspaceForAnalysis(plan, allActiveIndexes) + } + + def whyNotIndexes(spark: SparkSession, df: DataFrame) + : (LogicalPlan, Seq[(LogicalPlan, Seq[String])], Seq[(LogicalPlan, Seq[String])]) = { + val plan = df.queryExecution.optimizedPlan + + val indexManager = Hyperspace.getContext(spark).indexCollectionManager + val allActiveIndexes = indexManager.getIndexes(Seq(Constants.States.ACTIVE)) + + if (allActiveIndexes.isEmpty) { + throw HyperspaceException(s"No available ACTIVE indexes") + } + applyHyperspaceForAnalysis(plan, allActiveIndexes) + } + + def applyHyperspaceForAnalysis(plan: LogicalPlan, indexes: Seq[IndexLogEntry]) + : (LogicalPlan, Seq[(LogicalPlan, Seq[String])], Seq[(LogicalPlan, Seq[String])]) = { + try { + prepareTagsForAnalysis(indexes) + val candidateIndexes = CandidateIndexCollector.apply(plan, indexes) + val transformedPlan = new ScoreBasedIndexPlanOptimizer().apply(plan, candidateIndexes) + ( + transformedPlan, + indexes.flatMap(_.getTagValueForAllPlan(IndexLogEntryTags.FILTER_REASONS)), + indexes.flatMap(_.getTagValueForAllPlan(IndexLogEntryTags.APPLIED_INDEX_RULES))) + } finally { + cleanupAnalysisTags(indexes) + } + } + +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/disabled/FilterIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/disabled/FilterIndexRule.scala index 5da0a03a0..9a9a70728 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/disabled/FilterIndexRule.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/disabled/FilterIndexRule.scala @@ -153,6 +153,8 @@ object FilterIndexRule extends HyperspaceRule { return plan } + setAppliedIndexTag(plan, indexes.head._2) + // As FilterIndexRule is not intended to support bucketed scan, we set // useBucketUnionForAppended as false. If it's true, Hybrid Scan can cause // unnecessary shuffle for appended data to apply BucketUnion for merging data. diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/disabled/JoinIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/disabled/JoinIndexRule.scala index 9a057e5cb..5e7407e78 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/disabled/JoinIndexRule.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/disabled/JoinIndexRule.scala @@ -638,6 +638,9 @@ object JoinIndexRule extends HyperspaceRule with HyperspaceEventLogging { val lIndex = indexes(leftRelation.get.plan) val rIndex = indexes(rightRelation.get.plan) + setAppliedIndexTag(plan, lIndex) + setAppliedIndexTag(plan, rIndex) + val updatedPlan = join .copy( diff --git a/src/test/scala/com/microsoft/hyperspace/index/ScoreBasedIndexPlanOptimizerTest.scala b/src/test/scala/com/microsoft/hyperspace/index/ScoreBasedIndexPlanOptimizerTest.scala index c61e605ae..888b738a2 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/ScoreBasedIndexPlanOptimizerTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/ScoreBasedIndexPlanOptimizerTest.scala @@ -108,6 +108,8 @@ class ScoreBasedIndexPlanOptimizerTest extends QueryTest with HyperspaceSuite { assert(rightChildScore == 50) assert(!rightChildPlan.equals(plan.children.last)) + hyperspace.whyNot(query(leftDf, rightDf)()) + verifyIndexUsage( query(leftDf, rightDf), getIndexFilesPath(leftDfFilterIndexConfig.indexName) ++ diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/CandidateIndexCollectorTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/CandidateIndexCollectorTest.scala index 17d23f2a9..5543b8c97 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/CandidateIndexCollectorTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/CandidateIndexCollectorTest.scala @@ -330,7 +330,7 @@ class CandidateIndexCollectorTest extends HyperspaceRuleSuite with SQLHelper { } val allIndexes = indexList.map(indexName => latestIndexLogEntry(systemPath, indexName)) - allIndexes.foreach(_.setTagValue(IndexLogEntryTags.FILTER_REASONS_ENABLED, true)) + allIndexes.foreach(_.setTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED, true)) val plan1 = spark.read.parquet(dataPath).select("id", "name").queryExecution.optimizedPlan diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule_disabledTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule_disabledTest.scala index ce2eddfe7..ec875c13e 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule_disabledTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule_disabledTest.scala @@ -116,7 +116,7 @@ class FilterIndexRule_disabledTest extends HyperspaceRuleSuite { val originalPlan = Project(Seq(c2, c3, c4), filterNode) // c4 is not covered by index val allIndexes = IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE)) - allIndexes.foreach(_.setTagValue(IndexLogEntryTags.FILTER_REASONS_ENABLED, true)) + allIndexes.foreach(_.setTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED, true)) val (transformedPlan, score) = applyFilterIndexRuleHelper(originalPlan, allIndexes) assert(transformedPlan.equals(originalPlan), "Plan should not transform.") allIndexes.foreach { index => @@ -154,7 +154,7 @@ class FilterIndexRule_disabledTest extends HyperspaceRuleSuite { val originalPlan = Filter(filterCondition, scanNode) val allIndexes = IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE)) - allIndexes.foreach(_.setTagValue(IndexLogEntryTags.FILTER_REASONS_ENABLED, true)) + allIndexes.foreach(_.setTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED, true)) val (transformedPlan, score) = applyFilterIndexRuleHelper(originalPlan, allIndexes) assert(!transformedPlan.equals(originalPlan), "No plan transformation.") verifyTransformedPlanWithIndex(transformedPlan, indexName2) diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule_disabledTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule_disabledTest.scala index 7c4e93f3e..82ee5b4bb 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule_disabledTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule_disabledTest.scala @@ -109,7 +109,7 @@ class JoinIndexRule_disabledTest extends HyperspaceRuleSuite with SQLHelper { val indexes = if (allIndexes.isEmpty) { IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE)) } else { - allIndexes.foreach(_.setTagValue(IndexLogEntryTags.FILTER_REASONS_ENABLED, true)) + allIndexes.foreach(_.setTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED, true)) allIndexes } val candidateIndexes = CandidateIndexCollector(plan, indexes)