diff --git a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala
index afd34dc03..cf680421c 100644
--- a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala
+++ b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
 import com.microsoft.hyperspace.index._
 import com.microsoft.hyperspace.index.IndexConstants.{OPTIMIZE_MODE_QUICK, REFRESH_MODE_FULL}
 import com.microsoft.hyperspace.index.plananalysis.PlanAnalyzer
+import com.microsoft.hyperspace.index.rules.ApplyHyperspace
 import com.microsoft.hyperspace.index.sources.FileBasedSourceProviderManager
 
 class Hyperspace(spark: SparkSession) {
@@ -40,7 +41,9 @@ class Hyperspace(spark: SparkSession) {
    * @param indexConfig the configuration of index to be created.
    */
   def createIndex(df: DataFrame, indexConfig: IndexConfig): Unit = {
-    indexManager.create(df, indexConfig)
+    withHyperspaceRuleDisabled {
+      indexManager.create(df, indexConfig)
+    }
   }
 
   /**
@@ -87,7 +90,9 @@ class Hyperspace(spark: SparkSession) {
    * @param mode Refresh mode. Currently supported modes are `incremental` and `full`.
    */
   def refreshIndex(indexName: String, mode: String): Unit = {
-    indexManager.refresh(indexName, mode)
+    withHyperspaceRuleDisabled {
+      indexManager.refresh(indexName, mode)
+    }
   }
 
   /**
@@ -102,7 +107,7 @@ class Hyperspace(spark: SparkSession) {
    * @param indexName Name of the index to optimize.
    */
   def optimizeIndex(indexName: String): Unit = {
-    indexManager.optimize(indexName, OPTIMIZE_MODE_QUICK)
+    optimizeIndex(indexName, OPTIMIZE_MODE_QUICK)
   }
 
   /**
@@ -125,7 +130,9 @@ class Hyperspace(spark: SparkSession) {
    *             files, based on a threshold. "full" refers to recreation of index.
    */
   def optimizeIndex(indexName: String, mode: String): Unit = {
-    indexManager.optimize(indexName, mode)
+    withHyperspaceRuleDisabled {
+      indexManager.optimize(indexName, mode)
+    }
   }
 
   /**
@@ -163,6 +170,15 @@ class Hyperspace(spark: SparkSession) {
   def index(indexName: String): DataFrame = {
     indexManager.index(indexName)
   }
+
+  private def withHyperspaceRuleDisabled(f: => Unit): Unit = {
+    try {
+      ApplyHyperspace.disableForIndexMaintenance.set(true)
+      f
+    } finally {
+      ApplyHyperspace.disableForIndexMaintenance.set(false)
+    }
+  }
 }
 
 object Hyperspace extends ActiveSparkSession {
diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala
index 99d97139d..197e9f03c 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala
@@ -67,8 +67,8 @@ object CandidateIndexCollector extends ActiveSparkSession {
  * Apply Hyperspace indexes based on the score of each index application.
  */
 class ScoreBasedIndexPlanOptimizer {
-  // TODO: FilterIndexRule :: JoinIndexRule :: Nil
-  private val rules: Seq[HyperspaceRule] = NoOpRule :: Nil
+  private val rules
+    : Seq[HyperspaceRule] = disabled.FilterIndexRule :: disabled.JoinIndexRule :: NoOpRule :: Nil
 
   // Map for memoization. The key is the logical plan before applying [[HyperspaceRule]]s
   // and its value is a pair of best transformed plan and its score.
@@ -78,8 +78,29 @@
     // If pre-calculated value exists, return it.
     scoreMap.get(plan).foreach(res => return res)
 
-    val optResult = (plan, 0)
-    // TODO apply indexes recursively.
+    def recChildren(cur: LogicalPlan): (LogicalPlan, Int) = {
+      // Get the best plan & score for each child node.
+      var score = 0
+      val resultPlan = cur.mapChildren { child =>
+        val res = recApply(child, indexes)
+        score += res._2
+        res._1
+      }
+      (resultPlan, score)
+    }
+
+    var optResult = (plan, 0)
+    rules.foreach { rule =>
+      val (transformedPlan, curScore) = rule(plan, indexes)
+      if (curScore > 0 || rule.equals(NoOpRule)) {
+        // A positive curScore means the rule was applied.
+        val result = recChildren(transformedPlan)
+        if (optResult._2 < result._2 + curScore) {
+          // Update if the total score is higher than the previous best.
+          optResult = (result._1, result._2 + curScore)
+        }
+      }
+    }
 
     scoreMap.put(plan, optResult)
     optResult
@@ -109,7 +130,15 @@ object ApplyHyperspace
   type PlanToIndexesMap = Map[LogicalPlan, Seq[IndexLogEntry]]
   type PlanToSelectedIndexMap = Map[LogicalPlan, IndexLogEntry]
 
+  // Flag to disable the ApplyHyperspace rule during index maintenance jobs such as createIndex,
+  // refreshIndex and optimizeIndex.
+  private[hyperspace] val disableForIndexMaintenance = new ThreadLocal[Boolean]
+
   override def apply(plan: LogicalPlan): LogicalPlan = {
+    if (disableForIndexMaintenance.get) {
+      return plan
+    }
+
     val indexManager = Hyperspace
       .getContext(spark)
       .indexCollectionManager
diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/IndexFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/IndexFilter.scala
index d859165b0..d480f25ac 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/rules/IndexFilter.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/rules/IndexFilter.scala
@@ -33,7 +33,7 @@ trait IndexFilter extends ActiveSparkSession {
    * Append a given reason string to FILTER_REASONS tag of the index if the condition is false and
    * FILTER_REASONS_ENABLED tag is set to the index.
    *
-   * @param condition Flag for reason string
+   * @param condition Flag indicating whether to append the reason string
    * @param plan Query plan to tag
    * @param index Index to tag
    * @param reasonString Informational message in case condition is false.
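The recursion above works top-down: each rule is tried at the current node, the optimizer then recurses into the children of the transformed plan, and the combination with the highest summed score wins, memoized per subplan in scoreMap. NoOpRule, which returns the plan unchanged with score 0, is what keeps the traversal going into subtrees where no index applies at the current node. A minimal self-contained sketch of the same idea, using toy Plan/Rule types in place of Spark's LogicalPlan and the HyperspaceRule API (illustrative names only, not the Hyperspace implementation):

    import scala.collection.mutable

    // Toy stand-ins for LogicalPlan and HyperspaceRule.
    case class Plan(name: String, children: Seq[Plan])
    trait Rule { def apply(p: Plan): (Plan, Int) } // returns (transformed plan, score)
    object NoOp extends Rule { def apply(p: Plan): (Plan, Int) = (p, 0) }

    class Optimizer(rules: Seq[Rule]) {
      // Memoize the best (plan, score) per subplan, as scoreMap does above.
      private val memo = mutable.Map.empty[Plan, (Plan, Int)]

      def best(plan: Plan): (Plan, Int) =
        memo.getOrElseUpdate(plan, {
          var opt = (plan, 0)
          rules.foreach { rule =>
            val (transformed, score) = rule(plan)
            if (score > 0 || rule == NoOp) {
              // Recurse into the children of the transformed plan and sum their scores.
              var childScore = 0
              val withBestChildren = transformed.copy(children = transformed.children.map { c =>
                val (bestChild, s) = best(c)
                childScore += s
                bestChild
              })
              if (opt._2 < childScore + score) {
                opt = (withBestChildren, childScore + score)
              }
            }
          }
          opt
        })
    }

Note the asymmetry the guard creates: a rule that scores 0 is not explored further at this node, so NoOp's unconditional branch is the only path by which deeper subtrees still get optimized when the current node has no applicable index.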
diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/disabled/FilterIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/disabled/FilterIndexRule.scala
index d79660cb9..5da0a03a0 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/rules/disabled/FilterIndexRule.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/rules/disabled/FilterIndexRule.scala
@@ -129,7 +129,8 @@ object FilterRankFilter extends IndexRankFilter {
         || applicableIndexes.head._2.isEmpty) {
       Map.empty
     } else {
-      val selected = FilterIndexRanker.rank(spark, plan, applicableIndexes.head._2).get
+      val relation = RuleUtils.getRelation(spark, plan).get
+      val selected = FilterIndexRanker.rank(spark, relation.plan, applicableIndexes.head._2).get
       setFilterReasonTagForRank(plan, applicableIndexes.head._2, selected)
       Map(applicableIndexes.head._1 -> selected)
     }
diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/disabled/JoinIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/disabled/JoinIndexRule.scala
index c69c9451c..9a057e5cb 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/rules/disabled/JoinIndexRule.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/rules/disabled/JoinIndexRule.scala
@@ -679,7 +679,7 @@ object JoinIndexRule extends HyperspaceRule with HyperspaceEventLogging {
       .getOrElse {
         relation.allFileInfos.foldLeft(0L) { (res, f) =>
           if (index.sourceFileInfoSet.contains(f)) {
-            (res + f.size) // count, total bytes
+            res + f.size // total bytes
           } else {
             res
           }
diff --git a/src/main/scala/com/microsoft/hyperspace/package.scala b/src/main/scala/com/microsoft/hyperspace/package.scala
index 03081e7be..5a2313976 100644
--- a/src/main/scala/com/microsoft/hyperspace/package.scala
+++ b/src/main/scala/com/microsoft/hyperspace/package.scala
@@ -19,20 +19,9 @@ package com.microsoft
 import org.apache.spark.sql.SparkSession
 
 import com.microsoft.hyperspace.index.execution.BucketUnionStrategy
-import com.microsoft.hyperspace.index.rules.{FilterIndexRule, JoinIndexRule}
+import com.microsoft.hyperspace.index.rules.ApplyHyperspace
 
 package object hyperspace {
-  // The order of Hyperspace index rules does matter here, because by our current design, once an
-  // index rule is applied to a base table, no further index rules can be applied to the same
-  // table again.
-  // For instance, let's say the Join rule gets applied first, then the original data source gets
-  // replaced by its index. Now we have a new logical plan with the index folder as the "new"
-  // data source. If the Filter rule gets applied on this, no change will happen because
-  // this "new" data source doesn't have any indexes.
-  // We therefore choose to put JoinIndexRule before FilterIndexRule to give join indexes
-  // higher priority, because join indexes typically result in higher performance improvement
-  // compared to filter indexes.
-  private val hyperspaceOptimizationRuleBatch = JoinIndexRule :: FilterIndexRule :: Nil
 
   /**
    * Hyperspace-specific implicit class on SparkSession.
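With the rule batch collapsed into the single ApplyHyperspace rule, the ordering rationale in the deleted comment block becomes moot: join-versus-filter arbitration now happens by score inside ScoreBasedIndexPlanOptimizer rather than by registration order in extraOptimizations. Session-level usage is unchanged; roughly, with the extension methods this file defines (a sketch assuming a local session):

    import org.apache.spark.sql.SparkSession
    import com.microsoft.hyperspace._

    val spark = SparkSession.builder.master("local[*]").getOrCreate()

    spark.enableHyperspace() // registers ApplyHyperspace and BucketUnionStrategy
    assert(spark.isHyperspaceEnabled())

    spark.disableHyperspace() // removes exactly what enableHyperspace added
    assert(!spark.isHyperspaceEnabled())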
@@ -47,7 +36,7 @@ package object hyperspace {
     def enableHyperspace(): SparkSession = {
       disableHyperspace
       sparkSession.sessionState.experimentalMethods.extraOptimizations ++=
-        hyperspaceOptimizationRuleBatch
+        ApplyHyperspace :: Nil
       sparkSession.sessionState.experimentalMethods.extraStrategies ++=
         BucketUnionStrategy :: Nil
       sparkSession
@@ -61,7 +50,7 @@ package object hyperspace {
     def disableHyperspace(): SparkSession = {
       val experimentalMethods = sparkSession.sessionState.experimentalMethods
       experimentalMethods.extraOptimizations =
-        experimentalMethods.extraOptimizations.filterNot(hyperspaceOptimizationRuleBatch.contains)
+        experimentalMethods.extraOptimizations.filterNot(ApplyHyperspace.equals)
       experimentalMethods.extraStrategies =
         experimentalMethods.extraStrategies.filterNot(BucketUnionStrategy.equals)
       sparkSession
@@ -74,7 +63,7 @@ package object hyperspace {
      */
     def isHyperspaceEnabled(): Boolean = {
       val experimentalMethods = sparkSession.sessionState.experimentalMethods
-      hyperspaceOptimizationRuleBatch.forall(experimentalMethods.extraOptimizations.contains) &&
+      experimentalMethods.extraOptimizations.contains(ApplyHyperspace) &&
       experimentalMethods.extraStrategies.contains(BucketUnionStrategy)
     }
   }
diff --git a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala
index f60d270c1..4e6b85520 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala
@@ -25,11 +25,12 @@ import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation,
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 
 import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData, TestConfig, TestUtils}
+import com.microsoft.hyperspace.actions.Constants
 import com.microsoft.hyperspace.index.IndexConstants.{GLOBBING_PATTERN_KEY, REFRESH_MODE_INCREMENTAL, REFRESH_MODE_QUICK}
 import com.microsoft.hyperspace.index.IndexLogEntryTags._
 import com.microsoft.hyperspace.index.execution.BucketUnionStrategy
-import com.microsoft.hyperspace.index.plans.logical.IndexHadoopFsRelation
-import com.microsoft.hyperspace.index.rules.{FilterIndexRule, JoinIndexRule}
+import com.microsoft.hyperspace.index.rules.{ApplyHyperspace, CandidateIndexCollector}
+import com.microsoft.hyperspace.index.rules.disabled.JoinIndexRule
 import com.microsoft.hyperspace.util.PathUtils
 
 class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
@@ -70,7 +71,7 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
   }
 
   test("verify enableHyperspace()/disableHyperspace() plug in/out optimization rules.") {
-    val expectedOptimizationRuleBatch = Seq(JoinIndexRule, FilterIndexRule)
+    val expectedOptimizationRuleBatch = Seq(ApplyHyperspace)
     val expectedOptimizationStrategy = Seq(BucketUnionStrategy)
 
     assert(
@@ -225,10 +226,13 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
 
   test("E2E test for join query with alias columns is not supported.") {
     def verifyNoChange(f: () => DataFrame): Unit = {
-      spark.disableHyperspace()
-      val originalPlan = f().queryExecution.optimizedPlan
-      val updatedPlan = JoinIndexRule(originalPlan)
-      assert(originalPlan.equals(updatedPlan))
+      spark.enableHyperspace()
+      val plan = f().queryExecution.optimizedPlan
+      val allIndexes = IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE))
+      val candidateIndexes = CandidateIndexCollector.apply(plan, allIndexes)
+      val (updatedPlan, score) = JoinIndexRule.apply(plan, candidateIndexes)
+      assert(updatedPlan.equals(plan))
+      assert(score == 0)
     }
 
     withView("t1", "t2") {
diff --git a/src/test/scala/com/microsoft/hyperspace/index/ScoreBasedIndexPlanOptimizerTest.scala b/src/test/scala/com/microsoft/hyperspace/index/ScoreBasedIndexPlanOptimizerTest.scala
new file mode 100644
index 000000000..c61e605ae
--- /dev/null
+++ b/src/test/scala/com/microsoft/hyperspace/index/ScoreBasedIndexPlanOptimizerTest.scala
@@ -0,0 +1,174 @@
+/*
+ * Copyright (2020) The Hyperspace Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.microsoft.hyperspace.index
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.{DataFrame, QueryTest}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
+
+import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData, TestConfig}
+import com.microsoft.hyperspace.actions.Constants
+import com.microsoft.hyperspace.index.rules.CandidateIndexCollector
+import com.microsoft.hyperspace.index.rules.disabled.{FilterIndexRule, JoinIndexRule}
+
+class ScoreBasedIndexPlanOptimizerTest extends QueryTest with HyperspaceSuite {
+  private val testDir = inTempDir("scoreBasedIndexPlanOptimizerTest")
+  private val fileSystem = new Path(testDir).getFileSystem(new Configuration)
+  private var hyperspace: Hyperspace = _
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
+    hyperspace = new Hyperspace(spark)
+    fileSystem.delete(new Path(testDir), true)
+  }
+
+  before {
+    // Clear index cache so a new test does not see stale indexes from previous ones.
+    clearCache()
+  }
+
+  override def afterAll(): Unit = {
+    fileSystem.delete(new Path(testDir), true)
+    super.afterAll()
+  }
+
+  after {
+    fileSystem.delete(systemPath, true)
+    spark.disableHyperspace()
+  }
+
+  test(
+    "Verify that a filter index pair with a higher score takes precedence over " +
+      "a join index pair with a lower score.") {
+    withTempPathAsString { testPath =>
+      SampleData.save(spark, testPath, Seq("c1", "c2", "c3", "c4", "c5"))
+
+      withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") {
+        {
+          val leftDf = spark.read.parquet(testPath)
+          val leftDfJoinIndexConfig = IndexConfig("leftDfJoinIndex", Seq("c3"), Seq("c4"))
+          hyperspace.createIndex(leftDf, leftDfJoinIndexConfig)
+
+          val rightDf = spark.read.parquet(testPath)
+          val rightDfJoinIndexConfig = IndexConfig("rightDfJoinIndex", Seq("c3"), Seq("c5"))
+          hyperspace.createIndex(rightDf, rightDfJoinIndexConfig)
+
+          // Append data to the same path.
+          leftDf.write.mode("append").parquet(testPath)
+        }
+
+        val leftDf = spark.read.parquet(testPath)
+        val leftDfFilterIndexConfig = IndexConfig("leftDfFilterIndex", Seq("c4"), Seq("c3"))
+        hyperspace.createIndex(leftDf, leftDfFilterIndexConfig)
+
+        val rightDf = spark.read.parquet(testPath)
+        val rightDfFilterIndexConfig = IndexConfig("rightDfFilterIndex", Seq("c5"), Seq("c3"))
+        hyperspace.createIndex(rightDf, rightDfFilterIndexConfig)
+
+        def query(left: DataFrame, right: DataFrame): () => DataFrame = { () =>
+          left
+            .filter("c4 == 2")
+            .select("c4", "c3")
+            .join(right.filter("c5 == 3000").select("c5", "c3"), left("c3") === right("c3"))
+        }
+
+        withSQLConf(TestConfig.HybridScanEnabled: _*) {
+          // For the join index pair, the score is 70 * 0.5 (left) + 70 * 0.5 (right) = 70
+          // because of the appended data. For the filter index pair, the score is 50 + 50 = 100.
+          val plan = query(leftDf, rightDf)().queryExecution.optimizedPlan
+          val allIndexes = IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE))
+          val candidateIndexes = CandidateIndexCollector.apply(plan, allIndexes)
+          val (_, score) = JoinIndexRule.apply(plan, candidateIndexes)
+          assert(score == 70)
+
+          val (leftChildPlan, leftChildScore) =
+            FilterIndexRule.apply(plan.children.head, candidateIndexes)
+          val (rightChildPlan, rightChildScore) =
+            FilterIndexRule.apply(plan.children.last, candidateIndexes)
+          assert(leftChildScore == 50)
+          assert(!leftChildPlan.equals(plan.children.head))
+          assert(rightChildScore == 50)
+          assert(!rightChildPlan.equals(plan.children.last))
+
+          verifyIndexUsage(
+            query(leftDf, rightDf),
+            getIndexFilesPath(leftDfFilterIndexConfig.indexName) ++
+              getIndexFilesPath(rightDfFilterIndexConfig.indexName))
+        }
+      }
+    }
+  }
+
+  /**
+   * Verify that the query plan has the expected root paths.
+   *
+   * @param optimizedPlan Query plan
+   * @param expectedPaths Expected root paths
+   */
+  private def verifyQueryPlanHasExpectedRootPaths(
+      optimizedPlan: LogicalPlan,
+      expectedPaths: Seq[Path]): Unit = {
+    assert(getAllRootPaths(optimizedPlan).sortBy(_.getName) === expectedPaths.sortBy(_.getName))
+  }
+
+  /**
+   * Get all rootPaths from a query plan.
+   *
+   * @param optimizedPlan Query plan
+   * @return A sequence of [[Path]]
+   */
+  private def getAllRootPaths(optimizedPlan: LogicalPlan): Seq[Path] = {
+    optimizedPlan.collect {
+      case LogicalRelation(
+            HadoopFsRelation(location: InMemoryFileIndex, _, _, _, _, _),
+            _,
+            _,
+            _) =>
+        location.rootPaths
+    }.flatten
+  }
+
+  private def getIndexFilesPath(indexName: String, versions: Seq[Int] = Seq(0)): Seq[Path] = {
+    versions.flatMap { v =>
+      Content
+        .fromDirectory(
+          new Path(systemPath, s"$indexName/${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=$v"),
+          new FileIdTracker,
+          new Configuration)
+        .files
+    }
+  }
+
+  private def verifyIndexUsage(f: () => DataFrame, expectedRootPaths: Seq[Path]): Unit = {
+    spark.disableHyperspace()
+    val dfWithHyperspaceDisabled = f()
+    val schemaWithHyperspaceDisabled = dfWithHyperspaceDisabled.schema
+
+    spark.enableHyperspace()
+    val dfWithHyperspaceEnabled = f()
+
+    verifyQueryPlanHasExpectedRootPaths(
+      dfWithHyperspaceEnabled.queryExecution.optimizedPlan,
+      expectedRootPaths)
+
+    assert(schemaWithHyperspaceDisabled.equals(dfWithHyperspaceEnabled.schema))
+  }
+}
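A closing note on the maintenance path: withHyperspaceRuleDisabled in Hyperspace.scala wraps createIndex, refreshIndex and optimizeIndex so that the DataFrame operations they run internally are not themselves rewritten by ApplyHyperspace. A minimal sketch of that ThreadLocal loan pattern in isolation (toy names, not the Hyperspace API):

    object MaintenanceGuard {
      // Per-thread flag: other threads keep index optimization while this one runs maintenance.
      private val disabled = new ThreadLocal[Boolean] {
        override def initialValue(): Boolean = false
      }

      def isDisabled: Boolean = disabled.get

      // Run `f` with optimization disabled on this thread, restoring the flag even on failure.
      def withDisabled[T](f: => T): T =
        try {
          disabled.set(true)
          f
        } finally {
          disabled.set(false)
        }
    }

Two details differ from the diff's version: the diff relies on the first get of an unset ThreadLocal[Boolean] unboxing to false in Scala, whereas the sketch makes the default explicit via initialValue; and both reset the flag to false in finally rather than restoring its prior value, which is safe only as long as maintenance operations do not nest.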