From f41e6108c4996900b92de96ddc1ec70f7a29c501 Mon Sep 17 00:00:00 2001 From: Chungmin Lee Date: Tue, 29 Jun 2021 05:12:50 +0900 Subject: [PATCH] Refactoring for an extensible Index API: Part 3 - Move CoveringIndex specific code into the com.microsoft.hyperspace.index.types.covering package - Move traits/classes/objects in the com.microsoft.hyperspace.index.rules package into their own files - Add alias for CoveringIndexConfig for backward compatibility --- python/hyperspace/hyperspace.py | 2 +- .../microsoft/hyperspace/index/package.scala | 34 +++ .../index/rules/ApplyHyperspace.scala | 96 +------ .../index/rules/CandidateIndexCollector.scala | 60 ++++ .../index/rules/ColumnSchemaFilter.scala | 44 +++ .../index/rules/FileSignatureFilter.scala | 186 +++++++++++++ .../index/rules/HyperspaceRule.scala | 20 -- .../hyperspace/index/rules/IndexFilter.scala | 256 +----------------- .../index/rules/IndexRankFilter.scala | 57 ++++ .../hyperspace/index/rules/NoOpRule.scala | 41 +++ .../index/rules/QueryPlanIndexFilter.scala | 36 +++ .../rules/ScoreBasedIndexPlanOptimizer.scala | 78 ++++++ .../index/rules/SourcePlanIndexFilter.scala | 36 +++ .../{ => types/covering}/CoveringIndex.scala | 3 +- .../covering/CoveringIndexConfig.scala} | 53 ++-- .../types/covering/CoveringIndexFilter.scala | 36 +++ .../covering}/FilterIndexRanker.scala | 2 +- .../covering}/FilterIndexRule.scala | 6 +- .../covering}/JoinIndexRanker.scala | 4 +- .../covering}/JoinIndexRule.scala | 8 +- .../{rules => types/covering}/RuleUtils.scala | 4 +- .../actions/RefreshActionTest.scala | 1 + .../index/E2EHyperspaceRulesTest.scala | 2 +- .../{rules => }/HyperspaceRuleSuite.scala | 4 +- .../hyperspace/index/HyperspaceSuite.scala | 11 +- .../hyperspace/index/IndexCacheTest.scala | 1 + .../index/IndexCollectionManagerTest.scala | 1 + .../hyperspace/index/IndexLogEntryTest.scala | 3 +- .../index/IndexLogManagerImplTest.scala | 1 + .../hyperspace/index/IndexManagerTest.scala | 1 + .../hyperspace/index/IndexTest.scala | 1 + .../index/RefreshIndexNestedTest.scala | 1 + .../hyperspace/index/RefreshIndexTest.scala | 1 + .../index/{rules => }/RuleTestHelper.scala | 4 +- .../rules/CandidateIndexCollectorTest.scala | 4 +- .../ScoreBasedIndexPlanOptimizerTest.scala | 6 +- .../covering}/FilterIndexRankerTest.scala | 5 +- .../covering}/FilterIndexRuleTest.scala | 3 +- .../covering}/JoinIndexRankerTest.scala | 5 +- .../covering}/JoinIndexRuleTest.scala | 3 +- .../covering}/RuleUtilsTest.scala | 6 +- .../hyperspace/util/JsonUtilsTest.scala | 1 + 42 files changed, 685 insertions(+), 442 deletions(-) create mode 100644 src/main/scala/com/microsoft/hyperspace/index/package.scala create mode 100644 src/main/scala/com/microsoft/hyperspace/index/rules/CandidateIndexCollector.scala create mode 100644 src/main/scala/com/microsoft/hyperspace/index/rules/ColumnSchemaFilter.scala create mode 100644 src/main/scala/com/microsoft/hyperspace/index/rules/FileSignatureFilter.scala create mode 100644 src/main/scala/com/microsoft/hyperspace/index/rules/IndexRankFilter.scala create mode 100644 src/main/scala/com/microsoft/hyperspace/index/rules/NoOpRule.scala create mode 100644 src/main/scala/com/microsoft/hyperspace/index/rules/QueryPlanIndexFilter.scala create mode 100644 src/main/scala/com/microsoft/hyperspace/index/rules/ScoreBasedIndexPlanOptimizer.scala create mode 100644 src/main/scala/com/microsoft/hyperspace/index/rules/SourcePlanIndexFilter.scala rename src/main/scala/com/microsoft/hyperspace/index/{ => types/covering}/CoveringIndex.scala (99%) rename src/main/scala/com/microsoft/hyperspace/index/{IndexConfig.scala => types/covering/CoveringIndexConfig.scala} (75%) create mode 100644 src/main/scala/com/microsoft/hyperspace/index/types/covering/CoveringIndexFilter.scala rename src/main/scala/com/microsoft/hyperspace/index/{rankers => types/covering}/FilterIndexRanker.scala (97%) rename src/main/scala/com/microsoft/hyperspace/index/{rules => types/covering}/FilterIndexRule.scala (97%) rename src/main/scala/com/microsoft/hyperspace/index/{rankers => types/covering}/JoinIndexRanker.scala (96%) rename src/main/scala/com/microsoft/hyperspace/index/{rules => types/covering}/JoinIndexRule.scala (98%) rename src/main/scala/com/microsoft/hyperspace/index/{rules => types/covering}/RuleUtils.scala (99%) rename src/test/scala/com/microsoft/hyperspace/index/{rules => }/HyperspaceRuleSuite.scala (97%) rename src/test/scala/com/microsoft/hyperspace/index/{rules => }/RuleTestHelper.scala (90%) rename src/test/scala/com/microsoft/hyperspace/index/{ => rules}/ScoreBasedIndexPlanOptimizerTest.scala (97%) rename src/test/scala/com/microsoft/hyperspace/index/{rankers => types/covering}/FilterIndexRankerTest.scala (95%) rename src/test/scala/com/microsoft/hyperspace/index/{rules => types/covering}/FilterIndexRuleTest.scala (98%) rename src/test/scala/com/microsoft/hyperspace/index/{rankers => types/covering}/JoinIndexRankerTest.scala (97%) rename src/test/scala/com/microsoft/hyperspace/index/{rules => types/covering}/JoinIndexRuleTest.scala (99%) rename src/test/scala/com/microsoft/hyperspace/index/{rules => types/covering}/RuleUtilsTest.scala (96%) diff --git a/python/hyperspace/hyperspace.py b/python/hyperspace/hyperspace.py index a220c41c5..547af0d81 100644 --- a/python/hyperspace/hyperspace.py +++ b/python/hyperspace/hyperspace.py @@ -29,7 +29,7 @@ def _getJavaIndexConfig(self, index_config): """ indexed_columns = self._getScalaSeqFromList(index_config.indexedColumns) included_columns = self._getScalaSeqFromList(index_config.includedColumns) - _jindexConfig = self.jvm.com.microsoft.hyperspace.index.IndexConfig( + _jindexConfig = self.jvm.com.microsoft.hyperspace.index.types.covering.CoveringIndexConfig( self.jvm.java.lang.String(index_config.indexName), indexed_columns, included_columns) return _jindexConfig diff --git a/src/main/scala/com/microsoft/hyperspace/index/package.scala b/src/main/scala/com/microsoft/hyperspace/index/package.scala new file mode 100644 index 000000000..b5b43dd66 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/package.scala @@ -0,0 +1,34 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace + +import com.microsoft.hyperspace.index.types.covering.CoveringIndexConfig + +package object index { + + /** + * IndexConfig is defined as an alias for [[CoveringIndexConfig]] for + * backward compatibility. + */ + type IndexConfig = CoveringIndexConfig + + /** + * IndexConfig is defined as an alias for [[CoveringIndexConfig]] for + * backward compatibility. + */ + val IndexConfig = CoveringIndexConfig +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala index 17c0ae674..df87202e0 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala @@ -16,109 +16,15 @@ package com.microsoft.hyperspace.index.rules -import scala.collection.mutable - import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace} import com.microsoft.hyperspace.actions.Constants import com.microsoft.hyperspace.index.IndexLogEntry -import com.microsoft.hyperspace.index.rules.ApplyHyperspace.PlanToIndexesMap import com.microsoft.hyperspace.telemetry.HyperspaceEventLogging -/** - * Collect candidate indexes for each source plan. - */ -object CandidateIndexCollector extends ActiveSparkSession { - private val sourceFilters: Seq[SourcePlanIndexFilter] = - ColumnSchemaFilter :: FileSignatureFilter :: Nil - - private def initializePlanToIndexes( - plan: LogicalPlan, - indexes: Seq[IndexLogEntry]): PlanToIndexesMap = { - val provider = Hyperspace.getContext(spark).sourceProviderManager - plan.collect { - case l: LeafNode if provider.isSupportedRelation(l) => - (l.asInstanceOf[LogicalPlan], indexes) - }.toMap - } - - /** - * Extract candidate indexes for each source plan in the given query plan. - * - * @param plan Original query plan - * @param allIndexes All indexes - * @return Map of source plan to candidate indexes - */ - def apply(plan: LogicalPlan, allIndexes: Seq[IndexLogEntry]): PlanToIndexesMap = { - val planToIndexes = initializePlanToIndexes(plan, allIndexes) - planToIndexes.flatMap { - case (node, allIndexes) => - Some( - node, - sourceFilters.foldLeft(allIndexes) { (indexes, filter) => - filter(node, indexes) - }).filter(_._2.nonEmpty) - } - } -} - -/** - * Apply Hyperspace indexes based on the score of each index application. - */ -class ScoreBasedIndexPlanOptimizer { - private val rules: Seq[HyperspaceRule] = FilterIndexRule :: JoinIndexRule :: NoOpRule :: Nil - - // Map for memoization. The key is the logical plan before applying [[HyperspaceRule]]s - // and its value is a pair of best transformed plan and its score. - private val scoreMap: mutable.HashMap[LogicalPlan, (LogicalPlan, Int)] = mutable.HashMap() - - private def recApply(plan: LogicalPlan, indexes: PlanToIndexesMap): (LogicalPlan, Int) = { - // If pre-calculated value exists, return it. - scoreMap.get(plan).foreach(res => return res) - - def recChildren(cur: LogicalPlan): (LogicalPlan, Int) = { - // Get the best plan & score for each child node. - var score = 0 - val resultPlan = cur.mapChildren { child => - val res = recApply(child, indexes) - score += res._2 - res._1 - } - (resultPlan, score) - } - - var optResult = (plan, 0) - rules.foreach { rule => - val (transformedPlan, curScore) = rule(plan, indexes) - if (curScore > 0 || rule.equals(NoOpRule)) { - // Positive curScore means the rule is applied. - val result = recChildren(transformedPlan) - if (optResult._2 < result._2 + curScore) { - // Update if the total score is higher than the previous optimal. - optResult = (result._1, result._2 + curScore) - } - } - } - - scoreMap.put(plan, optResult) - optResult - } - - /** - * Transform the given query plan to use selected indexes based on score. - * - * @param plan Original query plan - * @param candidateIndexes Map of source plan to candidate indexes - * @return Transformed plan using selected indexes based on score - */ - def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): LogicalPlan = { - recApply(plan, candidateIndexes)._1 - } -} - /** * Transform the given plan to use Hyperspace indexes. */ diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/CandidateIndexCollector.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/CandidateIndexCollector.scala new file mode 100644 index 000000000..629980abf --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/CandidateIndexCollector.scala @@ -0,0 +1,60 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.rules + +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} + +import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace} +import com.microsoft.hyperspace.index.IndexLogEntry +import com.microsoft.hyperspace.index.rules.ApplyHyperspace.PlanToIndexesMap + +/** + * Collect candidate indexes for each source plan. + */ +object CandidateIndexCollector extends ActiveSparkSession { + private val sourceFilters: Seq[SourcePlanIndexFilter] = + ColumnSchemaFilter :: FileSignatureFilter :: Nil + + private def initializePlanToIndexes( + plan: LogicalPlan, + indexes: Seq[IndexLogEntry]): PlanToIndexesMap = { + val provider = Hyperspace.getContext(spark).sourceProviderManager + plan.collect { + case l: LeafNode if provider.isSupportedRelation(l) => + (l.asInstanceOf[LogicalPlan], indexes) + }.toMap + } + + /** + * Extract candidate indexes for each source plan in the given query plan. + * + * @param plan Original query plan + * @param allIndexes All indexes + * @return Map of source plan to candidate indexes + */ + def apply(plan: LogicalPlan, allIndexes: Seq[IndexLogEntry]): PlanToIndexesMap = { + val planToIndexes = initializePlanToIndexes(plan, allIndexes) + planToIndexes.flatMap { + case (node, allIndexes) => + Some( + node, + sourceFilters.foldLeft(allIndexes) { (indexes, filter) => + filter(node, indexes) + }).filter(_._2.nonEmpty) + } + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/ColumnSchemaFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/ColumnSchemaFilter.scala new file mode 100644 index 000000000..b18ae27f6 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/ColumnSchemaFilter.scala @@ -0,0 +1,44 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.rules + +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +import com.microsoft.hyperspace.index.IndexLogEntry +import com.microsoft.hyperspace.util.ResolverUtils + +/** + * Check if the given source plan contains all index columns. + */ +object ColumnSchemaFilter extends SourcePlanIndexFilter { + override def apply(plan: LogicalPlan, indexes: Seq[IndexLogEntry]): Seq[IndexLogEntry] = { + val relationColumnNames = plan.output.map(_.name) + + indexes.filter { index => + withFilterReasonTag( + plan, + index, + "Column Schema does not match. " + + s"Relation columns: [${relationColumnNames.mkString(", ")}], " + + s"Index columns: [${(index.derivedDataset.referencedColumns).mkString(", ")}]") { + ResolverUtils + .resolve(spark, index.derivedDataset.referencedColumns, relationColumnNames) + .isDefined + } + } + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/FileSignatureFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/FileSignatureFilter.scala new file mode 100644 index 000000000..df9733d6f --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/FileSignatureFilter.scala @@ -0,0 +1,186 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.rules + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +import com.microsoft.hyperspace.Hyperspace +import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags, LogicalPlanSignatureProvider} +import com.microsoft.hyperspace.index.IndexLogEntryTags.{HYBRIDSCAN_RELATED_CONFIGS, IS_HYBRIDSCAN_CANDIDATE} +import com.microsoft.hyperspace.index.sources.FileBasedRelation +import com.microsoft.hyperspace.util.HyperspaceConf + +/** + * Check if an index can leverage source data of the given source plan. + */ +object FileSignatureFilter extends SourcePlanIndexFilter { + + /** + * Filter the given indexes by matching signatures. + * + * If Hybrid Scan is enabled, it compares the file metadata directly, and does not + * match signatures. By doing that, we could perform file-level comparison between + * index data files and the input files of the given plan. If appended files and + * deleted files are less than threshold configs, the index is not filtered out. + * Also, HYBRIDSCAN_REQUIRED tag is set as true if there is any of appended or deleted files, + * for the plan transformation function in application step. + * + * @param plan Source plan + * @param indexes Indexes + * @return Indexes which meet conditions of Filter + */ + override def apply(plan: LogicalPlan, indexes: Seq[IndexLogEntry]): Seq[IndexLogEntry] = { + val provider = Hyperspace.getContext(spark).sourceProviderManager + val hybridScanEnabled = HyperspaceConf.hybridScanEnabled(spark) + if (hybridScanEnabled) { + val relation = provider.getRelation(plan) + prepareHybridScanCandidateSelection(relation.plan, indexes) + indexes.flatMap { index => + getHybridScanCandidate(relation, index) + } + } else { + val relation = provider.getRelation(plan) + // Map of a signature provider to a signature generated for the given plan. + val signatureMap = mutable.Map[String, Option[String]]() + indexes.filter { index => + withFilterReasonTag( + plan, + index, + s"Index signature does not match. Try Hybrid Scan or refreshIndex.") { + signatureValid(relation, index, signatureMap) + } + } + } + } + + private def signatureValid( + relation: FileBasedRelation, + entry: IndexLogEntry, + signatureMap: mutable.Map[String, Option[String]]): Boolean = { + entry.withCachedTag(relation.plan, IndexLogEntryTags.SIGNATURE_MATCHED) { + val sourcePlanSignatures = entry.source.plan.properties.fingerprint.properties.signatures + assert(sourcePlanSignatures.length == 1) + val sourcePlanSignature = sourcePlanSignatures.head + + signatureMap.getOrElseUpdate( + sourcePlanSignature.provider, + LogicalPlanSignatureProvider + .create(sourcePlanSignature.provider) + .signature(relation.plan)) match { + case Some(s) => s.equals(sourcePlanSignature.value) + case None => false + } + } + } + + private def prepareHybridScanCandidateSelection( + plan: LogicalPlan, + indexes: Seq[IndexLogEntry]): Unit = { + assert(HyperspaceConf.hybridScanEnabled(spark)) + val curConfigs = Seq( + HyperspaceConf.hybridScanAppendedRatioThreshold(spark).toString, + HyperspaceConf.hybridScanDeletedRatioThreshold(spark).toString) + + indexes.foreach { index => + val taggedConfigs = index.getTagValue(plan, HYBRIDSCAN_RELATED_CONFIGS) + if (taggedConfigs.isEmpty || !taggedConfigs.get.equals(curConfigs)) { + // Need to reset cached tags as these config changes can change the result. + index.unsetTagValue(plan, IS_HYBRIDSCAN_CANDIDATE) + index.setTagValue(plan, HYBRIDSCAN_RELATED_CONFIGS, curConfigs) + } + } + } + + private def getHybridScanCandidate( + relation: FileBasedRelation, + index: IndexLogEntry): Option[IndexLogEntry] = { + // TODO: As in [[PlanSignatureProvider]], Source plan signature comparison is required to + // support arbitrary source plans at index creation. + // See https://github.com/microsoft/hyperspace/issues/158 + + val entry = relation.closestIndex(index) + + val isHybridScanCandidate = + entry.withCachedTag(relation.plan, IndexLogEntryTags.IS_HYBRIDSCAN_CANDIDATE) { + // Find the number of common files between the source relation and index source files. + // The total size of common files are collected and tagged for candidate. + val (commonCnt, commonBytes) = relation.allFileInfos.foldLeft(0L, 0L) { (res, f) => + if (entry.sourceFileInfoSet.contains(f)) { + (res._1 + 1, res._2 + f.size) // count, total bytes + } else { + res + } + } + + val appendedBytesRatio = 1 - commonBytes / relation.allFileSizeInBytes.toFloat + val deletedBytesRatio = 1 - commonBytes / entry.sourceFilesSizeInBytes.toFloat + + // Tag to original index log entry to check the reason string with the given log entry. + lazy val hasLineageColumnCond = + withFilterReasonTag(relation.plan, index, "Index doesn't support deleted files.")( + entry.derivedDataset.canHandleDeletedFiles) + lazy val hasCommonFilesCond = + withFilterReasonTag(relation.plan, index, "No common files.")(commonCnt > 0) + lazy val appendThresholdCond = withFilterReasonTag( + relation.plan, + index, + s"Appended bytes ratio ($appendedBytesRatio) is larger than " + + s"threshold config ${HyperspaceConf.hybridScanAppendedRatioThreshold(spark)}") { + appendedBytesRatio < HyperspaceConf.hybridScanAppendedRatioThreshold(spark) + } + lazy val deleteThresholdCond = withFilterReasonTag( + relation.plan, + index, + s"Deleted bytes ratio ($deletedBytesRatio) is larger than " + + s"threshold config ${HyperspaceConf.hybridScanDeletedRatioThreshold(spark)}") { + deletedBytesRatio < HyperspaceConf.hybridScanDeletedRatioThreshold(spark) + } + + // For append-only Hybrid Scan, deleted files are not allowed. + val deletedCnt = entry.sourceFileInfoSet.size - commonCnt + + val isCandidate = if (deletedCnt == 0) { + hasCommonFilesCond && appendThresholdCond + } else { + hasLineageColumnCond && hasCommonFilesCond && appendThresholdCond && deleteThresholdCond + } + + if (isCandidate) { + entry.setTagValue( + relation.plan, + IndexLogEntryTags.COMMON_SOURCE_SIZE_IN_BYTES, + commonBytes) + + // If there is no change in source dataset, the index will be applied by + // RuleUtils.transformPlanToUseIndexOnlyScan. + entry.setTagValue( + relation.plan, + IndexLogEntryTags.HYBRIDSCAN_REQUIRED, + !(commonCnt == entry.sourceFileInfoSet.size + && commonCnt == relation.allFileInfos.size)) + } + isCandidate + } + if (isHybridScanCandidate) { + Some(entry) + } else { + None + } + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/HyperspaceRule.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/HyperspaceRule.scala index a45a13066..5fbcfaab6 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/HyperspaceRule.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/HyperspaceRule.scala @@ -76,23 +76,3 @@ trait HyperspaceRule extends ActiveSparkSession { } } } - -/** - * No-op rule for traversal. - */ -object NoOpRule extends HyperspaceRule { - - object FilterAll extends QueryPlanIndexFilter { - override def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): PlanToIndexesMap = - Map.empty - } - - override val filtersOnQueryPlan = FilterAll :: Nil - - // As there's no applicable index after [[FilterAll]], indexRanker is not reachable. - override val indexRanker = null - - override def applyIndex(plan: LogicalPlan, indexes: PlanToSelectedIndexMap): LogicalPlan = plan - - override def score(plan: LogicalPlan, indexes: PlanToSelectedIndexMap): Int = 0 -} diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/IndexFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/IndexFilter.scala index 5d7115f5d..89a674b5c 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/IndexFilter.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/IndexFilter.scala @@ -16,16 +16,10 @@ package com.microsoft.hyperspace.index.rules -import scala.collection.mutable - import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace} -import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags, LogicalPlanSignatureProvider} -import com.microsoft.hyperspace.index.IndexLogEntryTags.{HYBRIDSCAN_RELATED_CONFIGS, IS_HYBRIDSCAN_CANDIDATE} -import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap} -import com.microsoft.hyperspace.index.sources.FileBasedRelation -import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils} +import com.microsoft.hyperspace.ActiveSparkSession +import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags} trait IndexFilter extends ActiveSparkSession { @@ -110,249 +104,3 @@ trait IndexFilter extends ActiveSparkSession { } } } - -/** - * IndexFilter used in CandidateIndexCollector. - */ -trait SourcePlanIndexFilter extends IndexFilter { - - /** - * Filter out indexes for the given source plan. - * - * @param plan Source plan - * @param indexes Indexes - * @return Indexes which meet conditions of Filter - */ - def apply(plan: LogicalPlan, indexes: Seq[IndexLogEntry]): Seq[IndexLogEntry] -} - -/** - * IndexFilter used in HyperspaceRule. - */ -trait QueryPlanIndexFilter extends IndexFilter { - - /** - * Filter out candidate indexes for the given query plan. - * - * @param plan Query plan - * @param candidateIndexes Map of source plan to candidate indexes - * @return Map of source plan to applicable indexes which meet conditions of Filter - */ - def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): PlanToIndexesMap -} - -/** - * IndexFilter used in ranking applicable indexes. - */ -trait IndexRankFilter extends IndexFilter { - - /** - * Rank best index for the given query plan. - * - * @param plan Query plan - * @param applicableIndexes Map of source plan to applicable indexes - * @return Map of source plan to selected index - */ - def apply(plan: LogicalPlan, applicableIndexes: PlanToIndexesMap): PlanToSelectedIndexMap - - /** - * Set FILTER_REASONS tag for unselected indexes. - * - * @param plan Plan to tag - * @param indexes Indexes to tag - * @param selectedIndex Selected index - */ - protected def setFilterReasonTagForRank( - plan: LogicalPlan, - indexes: Seq[IndexLogEntry], - selectedIndex: IndexLogEntry): Unit = { - indexes.foreach { index => - setFilterReasonTag( - selectedIndex.name.equals(index.name), - plan, - index, - s"Another candidate index is applied: ${selectedIndex.name}") - } - } -} - -/** - * Check if the given source plan contains all index columns. - */ -object ColumnSchemaFilter extends SourcePlanIndexFilter { - override def apply(plan: LogicalPlan, indexes: Seq[IndexLogEntry]): Seq[IndexLogEntry] = { - val relationColumnNames = plan.output.map(_.name) - - indexes.filter { index => - withFilterReasonTag( - plan, - index, - "Column Schema does not match. " + - s"Relation columns: [${relationColumnNames.mkString(", ")}], " + - s"Index columns: [${(index.derivedDataset.referencedColumns).mkString(", ")}]") { - ResolverUtils - .resolve(spark, index.derivedDataset.referencedColumns, relationColumnNames) - .isDefined - } - } - } -} - -/** - * Check if an index can leverage source data of the given source plan. - */ -object FileSignatureFilter extends SourcePlanIndexFilter { - - /** - * Filter the given indexes by matching signatures. - * - * If Hybrid Scan is enabled, it compares the file metadata directly, and does not - * match signatures. By doing that, we could perform file-level comparison between - * index data files and the input files of the given plan. If appended files and - * deleted files are less than threshold configs, the index is not filtered out. - * Also, HYBRIDSCAN_REQUIRED tag is set as true if there is any of appended or deleted files, - * for the plan transformation function in application step. - * - * @param plan Source plan - * @param indexes Indexes - * @return Indexes which meet conditions of Filter - */ - override def apply(plan: LogicalPlan, indexes: Seq[IndexLogEntry]): Seq[IndexLogEntry] = { - val provider = Hyperspace.getContext(spark).sourceProviderManager - val hybridScanEnabled = HyperspaceConf.hybridScanEnabled(spark) - if (hybridScanEnabled) { - val relation = provider.getRelation(plan) - prepareHybridScanCandidateSelection(relation.plan, indexes) - indexes.flatMap { index => - getHybridScanCandidate(relation, index) - } - } else { - val relation = provider.getRelation(plan) - // Map of a signature provider to a signature generated for the given plan. - val signatureMap = mutable.Map[String, Option[String]]() - indexes.filter { index => - withFilterReasonTag( - plan, - index, - s"Index signature does not match. Try Hybrid Scan or refreshIndex.") { - signatureValid(relation, index, signatureMap) - } - } - } - } - - private def signatureValid( - relation: FileBasedRelation, - entry: IndexLogEntry, - signatureMap: mutable.Map[String, Option[String]]): Boolean = { - entry.withCachedTag(relation.plan, IndexLogEntryTags.SIGNATURE_MATCHED) { - val sourcePlanSignatures = entry.source.plan.properties.fingerprint.properties.signatures - assert(sourcePlanSignatures.length == 1) - val sourcePlanSignature = sourcePlanSignatures.head - - signatureMap.getOrElseUpdate( - sourcePlanSignature.provider, - LogicalPlanSignatureProvider - .create(sourcePlanSignature.provider) - .signature(relation.plan)) match { - case Some(s) => s.equals(sourcePlanSignature.value) - case None => false - } - } - } - - private def prepareHybridScanCandidateSelection( - plan: LogicalPlan, - indexes: Seq[IndexLogEntry]): Unit = { - assert(HyperspaceConf.hybridScanEnabled(spark)) - val curConfigs = Seq( - HyperspaceConf.hybridScanAppendedRatioThreshold(spark).toString, - HyperspaceConf.hybridScanDeletedRatioThreshold(spark).toString) - - indexes.foreach { index => - val taggedConfigs = index.getTagValue(plan, HYBRIDSCAN_RELATED_CONFIGS) - if (taggedConfigs.isEmpty || !taggedConfigs.get.equals(curConfigs)) { - // Need to reset cached tags as these config changes can change the result. - index.unsetTagValue(plan, IS_HYBRIDSCAN_CANDIDATE) - index.setTagValue(plan, HYBRIDSCAN_RELATED_CONFIGS, curConfigs) - } - } - } - - private def getHybridScanCandidate( - relation: FileBasedRelation, - index: IndexLogEntry): Option[IndexLogEntry] = { - // TODO: As in [[PlanSignatureProvider]], Source plan signature comparison is required to - // support arbitrary source plans at index creation. - // See https://github.com/microsoft/hyperspace/issues/158 - - val entry = relation.closestIndex(index) - - val isHybridScanCandidate = - entry.withCachedTag(relation.plan, IndexLogEntryTags.IS_HYBRIDSCAN_CANDIDATE) { - // Find the number of common files between the source relation and index source files. - // The total size of common files are collected and tagged for candidate. - val (commonCnt, commonBytes) = relation.allFileInfos.foldLeft(0L, 0L) { (res, f) => - if (entry.sourceFileInfoSet.contains(f)) { - (res._1 + 1, res._2 + f.size) // count, total bytes - } else { - res - } - } - - val appendedBytesRatio = 1 - commonBytes / relation.allFileSizeInBytes.toFloat - val deletedBytesRatio = 1 - commonBytes / entry.sourceFilesSizeInBytes.toFloat - - // Tag to original index log entry to check the reason string with the given log entry. - lazy val hasLineageColumnCond = - withFilterReasonTag(relation.plan, index, "Index doesn't support deleted files.")( - entry.derivedDataset.canHandleDeletedFiles) - lazy val hasCommonFilesCond = - withFilterReasonTag(relation.plan, index, "No common files.")(commonCnt > 0) - lazy val appendThresholdCond = withFilterReasonTag( - relation.plan, - index, - s"Appended bytes ratio ($appendedBytesRatio) is larger than " + - s"threshold config ${HyperspaceConf.hybridScanAppendedRatioThreshold(spark)}") { - appendedBytesRatio < HyperspaceConf.hybridScanAppendedRatioThreshold(spark) - } - lazy val deleteThresholdCond = withFilterReasonTag( - relation.plan, - index, - s"Deleted bytes ratio ($deletedBytesRatio) is larger than " + - s"threshold config ${HyperspaceConf.hybridScanDeletedRatioThreshold(spark)}") { - deletedBytesRatio < HyperspaceConf.hybridScanDeletedRatioThreshold(spark) - } - - // For append-only Hybrid Scan, deleted files are not allowed. - val deletedCnt = entry.sourceFileInfoSet.size - commonCnt - - val isCandidate = if (deletedCnt == 0) { - hasCommonFilesCond && appendThresholdCond - } else { - hasLineageColumnCond && hasCommonFilesCond && appendThresholdCond && deleteThresholdCond - } - - if (isCandidate) { - entry.setTagValue( - relation.plan, - IndexLogEntryTags.COMMON_SOURCE_SIZE_IN_BYTES, - commonBytes) - - // If there is no change in source dataset, the index will be applied by - // RuleUtils.transformPlanToUseIndexOnlyScan. - entry.setTagValue( - relation.plan, - IndexLogEntryTags.HYBRIDSCAN_REQUIRED, - !(commonCnt == entry.sourceFileInfoSet.size - && commonCnt == relation.allFileInfos.size)) - } - isCandidate - } - if (isHybridScanCandidate) { - Some(entry) - } else { - None - } - } -} diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/IndexRankFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/IndexRankFilter.scala new file mode 100644 index 000000000..b759b6a46 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/IndexRankFilter.scala @@ -0,0 +1,57 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.rules + +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +import com.microsoft.hyperspace.index.IndexLogEntry +import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap} + +/** + * IndexFilter used in ranking applicable indexes. + */ +trait IndexRankFilter extends IndexFilter { + + /** + * Rank best index for the given query plan. + * + * @param plan Query plan + * @param applicableIndexes Map of source plan to applicable indexes + * @return Map of source plan to selected index + */ + def apply(plan: LogicalPlan, applicableIndexes: PlanToIndexesMap): PlanToSelectedIndexMap + + /** + * Set FILTER_REASONS tag for unselected indexes. + * + * @param plan Plan to tag + * @param indexes Indexes to tag + * @param selectedIndex Selected index + */ + protected def setFilterReasonTagForRank( + plan: LogicalPlan, + indexes: Seq[IndexLogEntry], + selectedIndex: IndexLogEntry): Unit = { + indexes.foreach { index => + setFilterReasonTag( + selectedIndex.name.equals(index.name), + plan, + index, + s"Another candidate index is applied: ${selectedIndex.name}") + } + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/NoOpRule.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/NoOpRule.scala new file mode 100644 index 000000000..7b19a7a36 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/NoOpRule.scala @@ -0,0 +1,41 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.rules + +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap} + +/** + * No-op rule for traversal. + */ +object NoOpRule extends HyperspaceRule { + + object FilterAll extends QueryPlanIndexFilter { + override def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): PlanToIndexesMap = + Map.empty + } + + override val filtersOnQueryPlan = FilterAll :: Nil + + // As there's no applicable index after [[FilterAll]], indexRanker is not reachable. + override val indexRanker = null + + override def applyIndex(plan: LogicalPlan, indexes: PlanToSelectedIndexMap): LogicalPlan = plan + + override def score(plan: LogicalPlan, indexes: PlanToSelectedIndexMap): Int = 0 +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/QueryPlanIndexFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/QueryPlanIndexFilter.scala new file mode 100644 index 000000000..fa76b08df --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/QueryPlanIndexFilter.scala @@ -0,0 +1,36 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.rules + +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +import com.microsoft.hyperspace.index.rules.ApplyHyperspace.PlanToIndexesMap + +/** + * IndexFilter used in HyperspaceRule. + */ +trait QueryPlanIndexFilter extends IndexFilter { + + /** + * Filter out candidate indexes for the given query plan. + * + * @param plan Query plan + * @param candidateIndexes Map of source plan to candidate indexes + * @return Map of source plan to applicable indexes which meet conditions of Filter + */ + def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): PlanToIndexesMap +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/ScoreBasedIndexPlanOptimizer.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/ScoreBasedIndexPlanOptimizer.scala new file mode 100644 index 000000000..84a534e4f --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/ScoreBasedIndexPlanOptimizer.scala @@ -0,0 +1,78 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.rules + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +import com.microsoft.hyperspace.index.rules.ApplyHyperspace.PlanToIndexesMap +import com.microsoft.hyperspace.index.types.covering.{FilterIndexRule, JoinIndexRule} + +/** + * Apply Hyperspace indexes based on the score of each index application. + */ +class ScoreBasedIndexPlanOptimizer { + private val rules: Seq[HyperspaceRule] = FilterIndexRule :: JoinIndexRule :: NoOpRule :: Nil + + // Map for memoization. The key is the logical plan before applying [[HyperspaceRule]]s + // and its value is a pair of best transformed plan and its score. + private val scoreMap: mutable.HashMap[LogicalPlan, (LogicalPlan, Int)] = mutable.HashMap() + + private def recApply(plan: LogicalPlan, indexes: PlanToIndexesMap): (LogicalPlan, Int) = { + // If pre-calculated value exists, return it. + scoreMap.get(plan).foreach(res => return res) + + def recChildren(cur: LogicalPlan): (LogicalPlan, Int) = { + // Get the best plan & score for each child node. + var score = 0 + val resultPlan = cur.mapChildren { child => + val res = recApply(child, indexes) + score += res._2 + res._1 + } + (resultPlan, score) + } + + var optResult = (plan, 0) + rules.foreach { rule => + val (transformedPlan, curScore) = rule(plan, indexes) + if (curScore > 0 || rule.equals(NoOpRule)) { + // Positive curScore means the rule is applied. + val result = recChildren(transformedPlan) + if (optResult._2 < result._2 + curScore) { + // Update if the total score is higher than the previous optimal. + optResult = (result._1, result._2 + curScore) + } + } + } + + scoreMap.put(plan, optResult) + optResult + } + + /** + * Transform the given query plan to use selected indexes based on score. + * + * @param plan Original query plan + * @param candidateIndexes Map of source plan to candidate indexes + * @return Transformed plan using selected indexes based on score + */ + def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): LogicalPlan = { + recApply(plan, candidateIndexes)._1 + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/SourcePlanIndexFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/SourcePlanIndexFilter.scala new file mode 100644 index 000000000..e6bc78264 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/SourcePlanIndexFilter.scala @@ -0,0 +1,36 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.rules + +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +import com.microsoft.hyperspace.index.IndexLogEntry + +/** + * IndexFilter used in CandidateIndexCollector. + */ +trait SourcePlanIndexFilter extends IndexFilter { + + /** + * Filter out indexes for the given source plan. + * + * @param plan Source plan + * @param indexes Indexes + * @return Indexes which meet conditions of Filter + */ + def apply(plan: LogicalPlan, indexes: Seq[IndexLogEntry]): Seq[IndexLogEntry] +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/CoveringIndex.scala b/src/main/scala/com/microsoft/hyperspace/index/types/covering/CoveringIndex.scala similarity index 99% rename from src/main/scala/com/microsoft/hyperspace/index/CoveringIndex.scala rename to src/main/scala/com/microsoft/hyperspace/index/types/covering/CoveringIndex.scala index fc3bf4bc2..d6295ec86 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/CoveringIndex.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/types/covering/CoveringIndex.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.microsoft.hyperspace.index +package com.microsoft.hyperspace.index.types.covering import org.apache.spark.sql.{DataFrame, SaveMode} import org.apache.spark.sql.catalyst.catalog.BucketSpec @@ -22,6 +22,7 @@ import org.apache.spark.sql.functions.{col, input_file_name} import org.apache.spark.sql.hyperspace.utils.StructTypeUtils import org.apache.spark.sql.types.StructType +import com.microsoft.hyperspace.index._ import com.microsoft.hyperspace.index.DataFrameWriterExtensions.Bucketizer import com.microsoft.hyperspace.util.ResolverUtils import com.microsoft.hyperspace.util.ResolverUtils.ResolvedColumn diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexConfig.scala b/src/main/scala/com/microsoft/hyperspace/index/types/covering/CoveringIndexConfig.scala similarity index 75% rename from src/main/scala/com/microsoft/hyperspace/index/IndexConfig.scala rename to src/main/scala/com/microsoft/hyperspace/index/types/covering/CoveringIndexConfig.scala index e89ca6a53..42a5c97be 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexConfig.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/types/covering/CoveringIndexConfig.scala @@ -14,17 +14,18 @@ * limitations under the License. */ -package com.microsoft.hyperspace.index +package com.microsoft.hyperspace.index.types.covering import java.util.Locale import org.apache.spark.sql.DataFrame import com.microsoft.hyperspace.Hyperspace +import com.microsoft.hyperspace.index._ import com.microsoft.hyperspace.util.HyperspaceConf /** - * IndexConfig specifies the configuration of a covering index. + * CoveringIndexConfig specifies the configuration of a covering index. * * Use this class to create a covering index with [[Hyperspace.createIndex()]]. * @@ -35,7 +36,7 @@ import com.microsoft.hyperspace.util.HyperspaceConf * @param indexedColumns Columns from which an index is created. * @param includedColumns Columns to be included in the index. */ -case class IndexConfig( +case class CoveringIndexConfig( override val indexName: String, indexedColumns: Seq[String], includedColumns: Seq[String] = Seq()) @@ -65,7 +66,7 @@ case class IndexConfig( override def equals(that: Any): Boolean = { that match { - case IndexConfig(thatIndexName, thatIndexedColumns, thatIncludedColumns) => + case CoveringIndexConfig(thatIndexName, thatIndexedColumns, thatIncludedColumns) => indexName.equalsIgnoreCase(thatIndexName) && lowerCaseIndexedColumns.equals(toLowerCase(thatIndexedColumns)) && lowerCaseIncludedColumnsSet.equals(toLowerCase(thatIncludedColumns).toSet) @@ -110,13 +111,13 @@ case class IndexConfig( } /** - * Defines [[IndexConfig.Builder]] and relevant helper methods for enabling builder pattern for - * [[IndexConfig]]. + * Defines [[CoveringIndexConfig.Builder]] and relevant helper methods for enabling builder pattern + * for [[CoveringIndexConfig]]. */ -object IndexConfig { +object CoveringIndexConfig { /** - * Builder for [[IndexConfig]]. + * Builder for [[CoveringIndexConfig]]. */ class Builder { @@ -125,10 +126,10 @@ object IndexConfig { private[this] var indexName: String = "" /** - * Updates index name for [[IndexConfig]]. + * Updates index name for [[CoveringIndexConfig]]. * - * @param indexName index name for the [[IndexConfig]]. - * @return an [[IndexConfig.Builder]] object with updated index name. + * @param indexName index name for the [[CoveringIndexConfig]]. + * @return an [[CoveringIndexConfig.Builder]] object with updated index name. */ def indexName(indexName: String): Builder = { if (!this.indexName.isEmpty) { @@ -144,13 +145,13 @@ object IndexConfig { } /** - * Updates column names for [[IndexConfig]]. + * Updates column names for [[CoveringIndexConfig]]. * * Note: API signature supports passing one or more argument. * - * @param indexedColumn indexed column for the [[IndexConfig]]. - * @param indexedColumns indexed columns for the [[IndexConfig]]. - * @return an [[IndexConfig.Builder]] object with updated indexed columns. + * @param indexedColumn indexed column for the [[CoveringIndexConfig]]. + * @param indexedColumns indexed columns for the [[CoveringIndexConfig]]. + * @return an [[CoveringIndexConfig.Builder]] object with updated indexed columns. */ def indexBy(indexedColumn: String, indexedColumns: String*): Builder = { if (this.indexedColumns.nonEmpty) { @@ -162,13 +163,13 @@ object IndexConfig { } /** - * Updates included columns for [[IndexConfig]]. + * Updates included columns for [[CoveringIndexConfig]]. * * Note: API signature supports passing one or more argument. * - * @param includedColumn included column for [[IndexConfig]]. - * @param includedColumns included columns for [[IndexConfig]]. - * @return an [[IndexConfig.Builder]] object with updated included columns. + * @param includedColumn included column for [[CoveringIndexConfig]]. + * @param includedColumns included columns for [[CoveringIndexConfig]]. + * @return an [[CoveringIndexConfig.Builder]] object with updated included columns. */ def include(includedColumn: String, includedColumns: String*): Builder = { if (this.includedColumns.nonEmpty) { @@ -180,20 +181,20 @@ object IndexConfig { } /** - * Creates IndexConfig from supplied index name, indexed columns and included columns - * to [[IndexConfig.Builder]]. + * Creates CoveringIndexConfig from supplied index name, indexed columns and included columns + * to [[CoveringIndexConfig.Builder]]. * - * @return an [[IndexConfig]] object. + * @return an [[CoveringIndexConfig]] object. */ - def create(): IndexConfig = { - IndexConfig(indexName, indexedColumns, includedColumns) + def create(): CoveringIndexConfig = { + CoveringIndexConfig(indexName, indexedColumns, includedColumns) } } /** - * Creates new [[IndexConfig.Builder]] for constructing an [[IndexConfig]]. + * Creates new [[CoveringIndexConfig.Builder]] for constructing an [[CoveringIndexConfig]]. * - * @return an [[IndexConfig.Builder]] object. + * @return an [[CoveringIndexConfig.Builder]] object. */ def builder(): Builder = new Builder } diff --git a/src/main/scala/com/microsoft/hyperspace/index/types/covering/CoveringIndexFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/types/covering/CoveringIndexFilter.scala new file mode 100644 index 000000000..816da8fba --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/types/covering/CoveringIndexFilter.scala @@ -0,0 +1,36 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.types.covering + +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +import com.microsoft.hyperspace.index.rules.ApplyHyperspace.PlanToIndexesMap +import com.microsoft.hyperspace.index.rules.QueryPlanIndexFilter + +/** + * Filters out indexes which are not [[CoveringIndex]]. + */ +object CoveringIndexFilter extends QueryPlanIndexFilter { + override def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): PlanToIndexesMap = { + candidateIndexes + .map { + case (plan, indexes) => + plan -> indexes.filter(_.derivedDataset.isInstanceOf[CoveringIndex]) + } + .filter { case (_, indexes) => indexes.nonEmpty } + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRanker.scala b/src/main/scala/com/microsoft/hyperspace/index/types/covering/FilterIndexRanker.scala similarity index 97% rename from src/main/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRanker.scala rename to src/main/scala/com/microsoft/hyperspace/index/types/covering/FilterIndexRanker.scala index 4c850218c..d5cf92748 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRanker.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/types/covering/FilterIndexRanker.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.microsoft.hyperspace.index.rankers +package com.microsoft.hyperspace.index.types.covering import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/types/covering/FilterIndexRule.scala similarity index 97% rename from src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala rename to src/main/scala/com/microsoft/hyperspace/index/types/covering/FilterIndexRule.scala index 98dd37a59..88633c492 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/types/covering/FilterIndexRule.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.microsoft.hyperspace.index.rules +package com.microsoft.hyperspace.index.types.covering import org.apache.spark.sql.catalyst.analysis.CleanupAliases import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPla import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace} import com.microsoft.hyperspace.index.IndexLogEntryTags -import com.microsoft.hyperspace.index.rankers.FilterIndexRanker +import com.microsoft.hyperspace.index.rules.{HyperspaceRule, IndexRankFilter, QueryPlanIndexFilter} import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap} import com.microsoft.hyperspace.index.sources.FileBasedRelation import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils} @@ -157,7 +157,7 @@ object ExtractRelation extends ActiveSparkSession { */ object FilterIndexRule extends HyperspaceRule { override val filtersOnQueryPlan: Seq[QueryPlanIndexFilter] = - FilterPlanNodeFilter :: FilterColumnFilter :: Nil + CoveringIndexFilter :: FilterPlanNodeFilter :: FilterColumnFilter :: Nil override val indexRanker: IndexRankFilter = FilterRankFilter diff --git a/src/main/scala/com/microsoft/hyperspace/index/rankers/JoinIndexRanker.scala b/src/main/scala/com/microsoft/hyperspace/index/types/covering/JoinIndexRanker.scala similarity index 96% rename from src/main/scala/com/microsoft/hyperspace/index/rankers/JoinIndexRanker.scala rename to src/main/scala/com/microsoft/hyperspace/index/types/covering/JoinIndexRanker.scala index c6897e249..1b1c775fb 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rankers/JoinIndexRanker.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/types/covering/JoinIndexRanker.scala @@ -14,12 +14,12 @@ * limitations under the License. */ -package com.microsoft.hyperspace.index.rankers +package com.microsoft.hyperspace.index.types.covering import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import com.microsoft.hyperspace.index.{CoveringIndex, IndexLogEntry, IndexLogEntryTags} +import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags} import com.microsoft.hyperspace.util.HyperspaceConf /** diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/types/covering/JoinIndexRule.scala similarity index 98% rename from src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala rename to src/main/scala/com/microsoft/hyperspace/index/types/covering/JoinIndexRule.scala index 28b82888a..fbdbe67fd 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/types/covering/JoinIndexRule.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.microsoft.hyperspace.index.rules +package com.microsoft.hyperspace.index.types.covering import scala.collection.mutable import scala.util.Try @@ -25,10 +25,10 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} import com.microsoft.hyperspace.Hyperspace import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags} -import com.microsoft.hyperspace.index.rankers.JoinIndexRanker +import com.microsoft.hyperspace.index.rules.{HyperspaceRule, IndexRankFilter, QueryPlanIndexFilter} import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap} -import com.microsoft.hyperspace.index.rules.JoinAttributeFilter.extractConditions import com.microsoft.hyperspace.index.sources.FileBasedRelation +import com.microsoft.hyperspace.index.types.covering.JoinAttributeFilter.extractConditions import com.microsoft.hyperspace.shim.JoinWithoutHint import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent} import com.microsoft.hyperspace.util.ResolverUtils.resolve @@ -618,7 +618,7 @@ object JoinRankFilter extends IndexRankFilter { object JoinIndexRule extends HyperspaceRule with HyperspaceEventLogging { override val filtersOnQueryPlan: Seq[QueryPlanIndexFilter] = - JoinPlanNodeFilter :: JoinAttributeFilter :: JoinColumnFilter :: Nil + CoveringIndexFilter :: JoinPlanNodeFilter :: JoinAttributeFilter :: JoinColumnFilter :: Nil override val indexRanker: IndexRankFilter = JoinRankFilter diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/types/covering/RuleUtils.scala similarity index 99% rename from src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala rename to src/main/scala/com/microsoft/hyperspace/index/types/covering/RuleUtils.scala index 61a80d962..073a700da 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/types/covering/RuleUtils.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.microsoft.hyperspace.index.rules +package com.microsoft.hyperspace.index.types.covering import org.apache.hadoop.fs.Path import org.apache.spark.sql.SparkSession @@ -382,7 +382,7 @@ object RuleUtils { * @param plan Plan to be shuffled. * @return Transformed plan by injecting on-the-fly shuffle with given bucket specification. */ - private[rules] def transformPlanToShuffleUsingBucketSpec( + private[covering] def transformPlanToShuffleUsingBucketSpec( bucketSpec: BucketSpec, plan: LogicalPlan): LogicalPlan = { // Extract top level plan including all required columns for shuffle in its output. diff --git a/src/test/scala/com/microsoft/hyperspace/actions/RefreshActionTest.scala b/src/test/scala/com/microsoft/hyperspace/actions/RefreshActionTest.scala index 56a770c30..062b9472f 100644 --- a/src/test/scala/com/microsoft/hyperspace/actions/RefreshActionTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/actions/RefreshActionTest.scala @@ -28,6 +28,7 @@ import com.microsoft.hyperspace.{HyperspaceException, SampleData} import com.microsoft.hyperspace.actions.Constants.States.{ACTIVE, CREATING} import com.microsoft.hyperspace.index._ import com.microsoft.hyperspace.index.sources.FileBasedSourceProviderManager +import com.microsoft.hyperspace.index.types.covering.CoveringIndex class RefreshActionTest extends HyperspaceSuite { private val sampleParquetDataLocation = inTempDir("sampleparquet") diff --git a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala index 66a0e5651..5cc4b1980 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala @@ -30,7 +30,7 @@ import com.microsoft.hyperspace.index.IndexConstants.{GLOBBING_PATTERN_KEY, REFR import com.microsoft.hyperspace.index.IndexLogEntryTags._ import com.microsoft.hyperspace.index.execution.BucketUnionStrategy import com.microsoft.hyperspace.index.rules.{ApplyHyperspace, CandidateIndexCollector} -import com.microsoft.hyperspace.index.rules.JoinIndexRule +import com.microsoft.hyperspace.index.types.covering.JoinIndexRule import com.microsoft.hyperspace.util.PathUtils class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite { diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleSuite.scala b/src/test/scala/com/microsoft/hyperspace/index/HyperspaceRuleSuite.scala similarity index 97% rename from src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleSuite.scala rename to src/test/scala/com/microsoft/hyperspace/index/HyperspaceRuleSuite.scala index 5d65c1766..3d3644e4a 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleSuite.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/HyperspaceRuleSuite.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.microsoft.hyperspace.index.rules +package com.microsoft.hyperspace.index import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} @@ -26,8 +26,8 @@ import org.apache.spark.sql.types.{StructField, StructType} import com.microsoft.hyperspace.HyperspaceException import com.microsoft.hyperspace.actions.Constants -import com.microsoft.hyperspace.index._ import com.microsoft.hyperspace.index.Hdfs.Properties +import com.microsoft.hyperspace.index.types.covering.CoveringIndex import com.microsoft.hyperspace.util.PathUtils trait HyperspaceRuleSuite extends HyperspaceSuite { diff --git a/src/test/scala/com/microsoft/hyperspace/index/HyperspaceSuite.scala b/src/test/scala/com/microsoft/hyperspace/index/HyperspaceSuite.scala index efa48e213..859887dfa 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/HyperspaceSuite.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/HyperspaceSuite.scala @@ -22,18 +22,11 @@ import java.nio.charset.StandardCharsets import org.apache.hadoop.fs.Path import org.apache.spark.SparkFunSuite import org.apache.spark.util.hyperspace.Utils -import org.scalatest.{BeforeAndAfterAllConfigMap, ConfigMap} import com.microsoft.hyperspace.{BuildInfo, Hyperspace, SparkInvolvedSuite} import com.microsoft.hyperspace.util.{FileUtils, PathUtils} -trait HyperspaceSuite - extends SparkFunSuite - with SparkInvolvedSuite - with BeforeAndAfterAllConfigMap { - - // Needed to resolve conflicts between BeforeAndAfterAll and BeforeAndAfterAllConfigMap - override val invokeBeforeAllAndAfterAllEvenIfNoTestsAreExpected = false +trait HyperspaceSuite extends SparkFunSuite with SparkInvolvedSuite { // Temporary directory lazy val tempDir: Path = new Path(Utils.createTempDir().getAbsolutePath) @@ -47,7 +40,7 @@ trait HyperspaceSuite // Each test suite that extends HyperspaceSuite should define this. lazy val systemPath: Path = PathUtils.makeAbsolute(inTempDir(indexLocationDirName)) - override def beforeAll(cm: ConfigMap): Unit = { + override def beforeAll(): Unit = { super.beforeAll() FileUtils.delete(tempDir) spark.conf.set(IndexConstants.INDEX_SYSTEM_PATH, systemPath.toUri.toString) diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexCacheTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexCacheTest.scala index c99f185de..3952d8117 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexCacheTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexCacheTest.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType} import com.microsoft.hyperspace.{Hyperspace, HyperspaceException, SampleData} import com.microsoft.hyperspace.actions.Constants +import com.microsoft.hyperspace.index.types.covering.CoveringIndex import com.microsoft.hyperspace.util.FileUtils class IndexCacheTest extends HyperspaceSuite { diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexCollectionManagerTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexCollectionManagerTest.scala index 07d8f1db2..928625658 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexCollectionManagerTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexCollectionManagerTest.scala @@ -25,6 +25,7 @@ import org.mockito.Mockito.{mock, when} import com.microsoft.hyperspace.HyperspaceException import com.microsoft.hyperspace.actions.Constants import com.microsoft.hyperspace.index.IndexConstants.{REFRESH_MODE_FULL, REFRESH_MODE_INCREMENTAL} +import com.microsoft.hyperspace.index.types.covering.CoveringIndex class IndexCollectionManagerTest extends HyperspaceSuite { private val testLogManagerFactory: IndexLogManagerFactory = new IndexLogManagerFactory { diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala index bf4acbb14..5ef089c73 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType} import com.microsoft.hyperspace.{BuildInfo, HyperspaceException, TestUtils} import com.microsoft.hyperspace.index.IndexConstants.UNKNOWN_FILE_ID +import com.microsoft.hyperspace.index.types.covering.CoveringIndex import com.microsoft.hyperspace.util.{JsonUtils, PathUtils} class IndexLogEntryTest extends HyperspaceSuite with SQLHelper { @@ -76,7 +77,7 @@ class IndexLogEntryTest extends HyperspaceSuite with SQLHelper { |{ | "name" : "indexName", | "derivedDataset" : { - | "type" : "com.microsoft.hyperspace.index.CoveringIndex", + | "type" : "com.microsoft.hyperspace.index.types.covering.CoveringIndex", | "indexedColumns" : [ "col1" ], | "includedColumns" : [ "col2", "col3" ], | "schema" : { diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexLogManagerImplTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexLogManagerImplTest.scala index 601626d88..684334303 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexLogManagerImplTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexLogManagerImplTest.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructT import com.microsoft.hyperspace.TestUtils import com.microsoft.hyperspace.index.IndexConstants.HYPERSPACE_LOG +import com.microsoft.hyperspace.index.types.covering.CoveringIndex import com.microsoft.hyperspace.util.{FileUtils, JsonUtils} class IndexLogManagerImplTest extends HyperspaceSuite { diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTest.scala index 7ca17decc..ccb0bb21a 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTest.scala @@ -28,6 +28,7 @@ import com.microsoft.hyperspace.{Hyperspace, HyperspaceException, MockEventLogge import com.microsoft.hyperspace.TestUtils.{copyWithState, getFileIdTracker, latestIndexLogEntry, logManager} import com.microsoft.hyperspace.actions.Constants import com.microsoft.hyperspace.index.IndexConstants.{GLOBBING_PATTERN_KEY, OPTIMIZE_FILE_SIZE_THRESHOLD, REFRESH_MODE_FULL, REFRESH_MODE_INCREMENTAL} +import com.microsoft.hyperspace.index.types.covering.CoveringIndex import com.microsoft.hyperspace.telemetry.OptimizeActionEvent import com.microsoft.hyperspace.util.{FileUtils, PathUtils} diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexTest.scala index b490e4a8f..a4d29262f 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexTest.scala @@ -20,6 +20,7 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import com.microsoft.hyperspace.actions.Constants +import com.microsoft.hyperspace.index.types.covering.CoveringIndex class IndexTest extends SparkFunSuite { val indexConfig1 = IndexConfig("myIndex1", Array("id"), Seq("name")) diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexNestedTest.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexNestedTest.scala index a7f721bfb..f5f9b08f0 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexNestedTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexNestedTest.scala @@ -24,6 +24,7 @@ import com.microsoft.hyperspace.{Hyperspace, HyperspaceException, MockEventLogge import com.microsoft.hyperspace.TestUtils.{getFileIdTracker, logManager} import com.microsoft.hyperspace.actions.{RefreshIncrementalAction, RefreshQuickAction} import com.microsoft.hyperspace.index.IndexConstants.REFRESH_MODE_INCREMENTAL +import com.microsoft.hyperspace.index.types.covering.CoveringIndex import com.microsoft.hyperspace.telemetry.RefreshIncrementalActionEvent import com.microsoft.hyperspace.util.{FileUtils, PathUtils} import com.microsoft.hyperspace.util.PathUtils.DataPathFilter diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTest.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTest.scala index 452500971..a06f6e844 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTest.scala @@ -24,6 +24,7 @@ import com.microsoft.hyperspace.{Hyperspace, HyperspaceException, MockEventLogge import com.microsoft.hyperspace.TestUtils.{getFileIdTracker, logManager} import com.microsoft.hyperspace.actions.{RefreshIncrementalAction, RefreshQuickAction} import com.microsoft.hyperspace.index.IndexConstants.REFRESH_MODE_INCREMENTAL +import com.microsoft.hyperspace.index.types.covering.CoveringIndex import com.microsoft.hyperspace.telemetry.RefreshIncrementalActionEvent import com.microsoft.hyperspace.util.{FileUtils, PathUtils} import com.microsoft.hyperspace.util.PathUtils.DataPathFilter diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleTestHelper.scala b/src/test/scala/com/microsoft/hyperspace/index/RuleTestHelper.scala similarity index 90% rename from src/test/scala/com/microsoft/hyperspace/index/rules/RuleTestHelper.scala rename to src/test/scala/com/microsoft/hyperspace/index/RuleTestHelper.scala index 708517d10..01d2fbd42 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleTestHelper.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/RuleTestHelper.scala @@ -14,13 +14,11 @@ * limitations under the License. */ -package com.microsoft.hyperspace.index.rules +package com.microsoft.hyperspace.index import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} -import com.microsoft.hyperspace.index.LogicalPlanSignatureProvider - object RuleTestHelper { class TestSignatureProvider extends LogicalPlanSignatureProvider { def signature(plan: LogicalPlan): Option[String] = diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/CandidateIndexCollectorTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/CandidateIndexCollectorTest.scala index d74b1d822..32abd2484 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/CandidateIndexCollectorTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/CandidateIndexCollectorTest.scala @@ -23,10 +23,10 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation, NoopCache} import org.apache.spark.sql.types.{IntegerType, StringType} -import com.microsoft.hyperspace.Hyperspace import com.microsoft.hyperspace.TestUtils.latestIndexLogEntry import com.microsoft.hyperspace.actions.Constants -import com.microsoft.hyperspace.index.{IndexCollectionManager, IndexConfig, IndexConstants, IndexLogEntryTags} +import com.microsoft.hyperspace.index.{HyperspaceRuleSuite, IndexCollectionManager, IndexConfig, IndexConstants, IndexLogEntryTags} +import com.microsoft.hyperspace.index.types.covering.RuleUtils import com.microsoft.hyperspace.util.FileUtils class CandidateIndexCollectorTest extends HyperspaceRuleSuite with SQLHelper { diff --git a/src/test/scala/com/microsoft/hyperspace/index/ScoreBasedIndexPlanOptimizerTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/ScoreBasedIndexPlanOptimizerTest.scala similarity index 97% rename from src/test/scala/com/microsoft/hyperspace/index/ScoreBasedIndexPlanOptimizerTest.scala rename to src/test/scala/com/microsoft/hyperspace/index/rules/ScoreBasedIndexPlanOptimizerTest.scala index b1c87412f..92ac10b82 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/ScoreBasedIndexPlanOptimizerTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/ScoreBasedIndexPlanOptimizerTest.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.microsoft.hyperspace.index +package com.microsoft.hyperspace.index.rules import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -24,8 +24,8 @@ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFil import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData, TestConfig} import com.microsoft.hyperspace.actions.Constants -import com.microsoft.hyperspace.index.rules.{FilterIndexRule, JoinIndexRule} -import com.microsoft.hyperspace.index.rules.CandidateIndexCollector +import com.microsoft.hyperspace.index._ +import com.microsoft.hyperspace.index.types.covering.{FilterIndexRule, JoinIndexRule} class ScoreBasedIndexPlanOptimizerTest extends QueryTest with HyperspaceSuite { private val testDir = inTempDir("scoreBasedIndexPlanOptimizerTest") diff --git a/src/test/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRankerTest.scala b/src/test/scala/com/microsoft/hyperspace/index/types/covering/FilterIndexRankerTest.scala similarity index 95% rename from src/test/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRankerTest.scala rename to src/test/scala/com/microsoft/hyperspace/index/types/covering/FilterIndexRankerTest.scala index 2e3a5c67b..851b5ac19 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRankerTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/types/covering/FilterIndexRankerTest.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.microsoft.hyperspace.index.rankers +package com.microsoft.hyperspace.index.types.covering import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -22,8 +22,7 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.types.{IntegerType, StringType} -import com.microsoft.hyperspace.index.{FileInfo, IndexConstants, IndexLogEntryTags} -import com.microsoft.hyperspace.index.rules.HyperspaceRuleSuite +import com.microsoft.hyperspace.index.{FileInfo, HyperspaceRuleSuite, IndexConstants} import com.microsoft.hyperspace.util.FileUtils class FilterIndexRankerTest extends HyperspaceRuleSuite { diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/FilterIndexRuleTest.scala b/src/test/scala/com/microsoft/hyperspace/index/types/covering/FilterIndexRuleTest.scala similarity index 98% rename from src/test/scala/com/microsoft/hyperspace/index/rules/FilterIndexRuleTest.scala rename to src/test/scala/com/microsoft/hyperspace/index/types/covering/FilterIndexRuleTest.scala index 47e2a2956..ae4b86e98 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/FilterIndexRuleTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/types/covering/FilterIndexRuleTest.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.microsoft.hyperspace.index.rules +package com.microsoft.hyperspace.index.types.covering import org.apache.hadoop.fs.Path import org.apache.spark.sql.catalyst.catalog.BucketSpec @@ -26,6 +26,7 @@ import org.apache.spark.sql.types.{IntegerType, StringType, StructType} import com.microsoft.hyperspace.actions.Constants import com.microsoft.hyperspace.index._ +import com.microsoft.hyperspace.index.rules.CandidateIndexCollector class FilterIndexRuleTest extends HyperspaceRuleSuite { override val indexLocationDirName = "joinIndexTest" diff --git a/src/test/scala/com/microsoft/hyperspace/index/rankers/JoinIndexRankerTest.scala b/src/test/scala/com/microsoft/hyperspace/index/types/covering/JoinIndexRankerTest.scala similarity index 97% rename from src/test/scala/com/microsoft/hyperspace/index/rankers/JoinIndexRankerTest.scala rename to src/test/scala/com/microsoft/hyperspace/index/types/covering/JoinIndexRankerTest.scala index f9ab06bc3..fb7eb4d94 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rankers/JoinIndexRankerTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/types/covering/JoinIndexRankerTest.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.microsoft.hyperspace.index.rankers +package com.microsoft.hyperspace.index.types.covering import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -23,8 +23,7 @@ import org.apache.spark.sql.catalyst.plans.SQLHelper import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.types.{IntegerType, StringType} -import com.microsoft.hyperspace.index.{FileInfo, IndexConstants} -import com.microsoft.hyperspace.index.rules.HyperspaceRuleSuite +import com.microsoft.hyperspace.index.{FileInfo, HyperspaceRuleSuite, IndexConstants} import com.microsoft.hyperspace.util.FileUtils class JoinIndexRankerTest extends HyperspaceRuleSuite with SQLHelper { diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRuleTest.scala b/src/test/scala/com/microsoft/hyperspace/index/types/covering/JoinIndexRuleTest.scala similarity index 99% rename from src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRuleTest.scala rename to src/test/scala/com/microsoft/hyperspace/index/types/covering/JoinIndexRuleTest.scala index 9acac9e46..ec1628744 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRuleTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/types/covering/JoinIndexRuleTest.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.microsoft.hyperspace.index.rules +package com.microsoft.hyperspace.index.types.covering import org.apache.hadoop.fs.Path import org.apache.spark.sql.Row @@ -26,6 +26,7 @@ import org.apache.spark.sql.types.{IntegerType, StringType} import com.microsoft.hyperspace.actions.Constants import com.microsoft.hyperspace.index._ +import com.microsoft.hyperspace.index.rules.CandidateIndexCollector import com.microsoft.hyperspace.shim.{JoinWithoutHint => Join} import com.microsoft.hyperspace.util.{FileUtils, SparkTestShims} diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala b/src/test/scala/com/microsoft/hyperspace/index/types/covering/RuleUtilsTest.scala similarity index 96% rename from src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala rename to src/test/scala/com/microsoft/hyperspace/index/types/covering/RuleUtilsTest.scala index 46e6d2c73..cd62e023f 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/types/covering/RuleUtilsTest.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.microsoft.hyperspace.index.rules +package com.microsoft.hyperspace.index.types.covering import org.apache.hadoop.fs.Path import org.apache.spark.sql.catalyst.catalog.BucketSpec @@ -24,10 +24,8 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation, NoopCache} import org.apache.spark.sql.types.{IntegerType, StringType} -import com.microsoft.hyperspace.actions.Constants -import com.microsoft.hyperspace.index.{IndexCollectionManager, IndexConfig, IndexConstants, IndexLogEntryTags} +import com.microsoft.hyperspace.index.HyperspaceRuleSuite import com.microsoft.hyperspace.shim.{JoinWithoutHint, RepartitionByExpressionWithOptionalNumPartitions} -import com.microsoft.hyperspace.util.{FileUtils, PathUtils} class RuleUtilsTest extends HyperspaceRuleSuite with SQLHelper { override val indexLocationDirName = "ruleUtilsTest" diff --git a/src/test/scala/com/microsoft/hyperspace/util/JsonUtilsTest.scala b/src/test/scala/com/microsoft/hyperspace/util/JsonUtilsTest.scala index 8fb0f8b69..eac656e23 100644 --- a/src/test/scala/com/microsoft/hyperspace/util/JsonUtilsTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/util/JsonUtilsTest.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructT import com.microsoft.hyperspace.actions.Constants import com.microsoft.hyperspace.index._ +import com.microsoft.hyperspace.index.types.covering.CoveringIndex class JsonUtilsTest extends SparkFunSuite { test("Test for JsonUtils.") {