Skip to content
This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Commit

Permalink
Refactoring for an extensible Index API: Part 3
Browse files Browse the repository at this point in the history
- Move CoveringIndex specific code into the
  com.microsoft.hyperspace.index.types.covering package
- Move traits/classes/objects in the
  com.microsoft.hyperspace.index.rules package into their own files
- Add alias for CoveringIndexConfig for backward compatibility
  • Loading branch information
Chungmin Lee committed Jun 28, 2021
1 parent be09d41 commit f41e610
Show file tree
Hide file tree
Showing 42 changed files with 685 additions and 442 deletions.
2 changes: 1 addition & 1 deletion python/hyperspace/hyperspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def _getJavaIndexConfig(self, index_config):
"""
indexed_columns = self._getScalaSeqFromList(index_config.indexedColumns)
included_columns = self._getScalaSeqFromList(index_config.includedColumns)
_jindexConfig = self.jvm.com.microsoft.hyperspace.index.IndexConfig(
_jindexConfig = self.jvm.com.microsoft.hyperspace.index.types.covering.CoveringIndexConfig(
self.jvm.java.lang.String(index_config.indexName), indexed_columns, included_columns)
return _jindexConfig

Expand Down
34 changes: 34 additions & 0 deletions src/main/scala/com/microsoft/hyperspace/index/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace

import com.microsoft.hyperspace.index.types.covering.CoveringIndexConfig

package object index {

  /**
   * Type-level alias: the type name `IndexConfig` in this package denotes
   * [[CoveringIndexConfig]]. Kept for backward compatibility.
   */
  type IndexConfig = CoveringIndexConfig

  /**
   * Term-level alias: the value `IndexConfig` in this package is the
   * `CoveringIndexConfig` object itself, so its members remain reachable under
   * the old name. Kept for backward compatibility.
   */
  val IndexConfig: CoveringIndexConfig.type = CoveringIndexConfig
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,109 +16,15 @@

package com.microsoft.hyperspace.index.rules

import scala.collection.mutable

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index.IndexLogEntry
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.PlanToIndexesMap
import com.microsoft.hyperspace.telemetry.HyperspaceEventLogging

/**
 * Collect candidate indexes for each source plan.
 *
 * Every supported leaf relation found in a query plan is first paired with the
 * full list of indexes; that list is then narrowed by running each entry of
 * `sourceFilters` in order. Relations left with no index are dropped.
 */
object CandidateIndexCollector extends ActiveSparkSession {
  // Applied left to right: each filter only sees the indexes kept by the previous one.
  private val sourceFilters: Seq[SourcePlanIndexFilter] =
    ColumnSchemaFilter :: FileSignatureFilter :: Nil

  /**
   * Pair every supported leaf relation in the plan with the given indexes.
   *
   * @param plan Query plan to scan for leaf nodes
   * @param indexes Indexes initially associated with each supported leaf node
   * @return Map of supported leaf node to `indexes`
   */
  private def initializePlanToIndexes(
      plan: LogicalPlan,
      indexes: Seq[IndexLogEntry]): PlanToIndexesMap = {
    val provider = Hyperspace.getContext(spark).sourceProviderManager
    plan.collect {
      // Widen with a type ascription (not a cast) so the resulting map is keyed
      // by LogicalPlan rather than by LeafNode.
      case l: LeafNode if provider.isSupportedRelation(l) =>
        ((l: LogicalPlan), indexes)
    }.toMap
  }

  /**
   * Extract candidate indexes for each source plan in the given query plan.
   *
   * @param plan Original query plan
   * @param allIndexes All indexes
   * @return Map of source plan to candidate indexes; source plans for which no
   *         index survives the filters are not included
   */
  def apply(plan: LogicalPlan, allIndexes: Seq[IndexLogEntry]): PlanToIndexesMap = {
    initializePlanToIndexes(plan, allIndexes)
      .map {
        case (node, nodeIndexes) =>
          // Thread the index list through every source filter in order.
          val candidates = sourceFilters.foldLeft(nodeIndexes) { (remaining, filter) =>
            filter(node, remaining)
          }
          node -> candidates
      }
      .filter { case (_, candidates) => candidates.nonEmpty }
  }
}

/**
 * Apply Hyperspace indexes based on the score of each index application.
 *
 * For a given plan node every rule in `rules` is tried; the candidate whose own
 * score plus the best scores of its children is highest wins. Results are
 * memoized per input plan in `scoreMap`.
 */
class ScoreBasedIndexPlanOptimizer {
  private val rules: Seq[HyperspaceRule] = FilterIndexRule :: JoinIndexRule :: NoOpRule :: Nil

  // Map for memoization. The key is the logical plan before applying [[HyperspaceRule]]s
  // and its value is a pair of best transformed plan and its score.
  private val scoreMap: mutable.HashMap[LogicalPlan, (LogicalPlan, Int)] = mutable.HashMap()

  /**
   * Return the best transformed plan and its score for the given plan,
   * reusing a previously computed result when one exists.
   *
   * @param plan Plan to optimize
   * @param indexes Map of source plan to candidate indexes
   * @return Pair of best transformed plan and its total score
   */
  private def recApply(plan: LogicalPlan, indexes: PlanToIndexesMap): (LogicalPlan, Int) = {
    // A plain match replaces the former non-local `return` from inside a lambda.
    scoreMap.get(plan) match {
      case Some(cached) => cached
      case None =>
        val best = findBest(plan, indexes)
        scoreMap.put(plan, best)
        best
    }
  }

  /**
   * Optimize each child of `cur` and rebuild `cur` on top of the results.
   *
   * @return The rebuilt plan and the sum of the scores returned for its children
   */
  private def recChildren(cur: LogicalPlan, indexes: PlanToIndexesMap): (LogicalPlan, Int) = {
    // Accumulated inside the mapChildren callback, once per child it visits.
    var score = 0
    val resultPlan = cur.mapChildren { child =>
      val (bestChild, childScore) = recApply(child, indexes)
      score += childScore
      bestChild
    }
    (resultPlan, score)
  }

  /**
   * Try every rule on `plan` and keep the highest scoring outcome.
   * The starting point is the untouched plan with score 0, and it is only
   * replaced by a strictly higher total.
   */
  private def findBest(plan: LogicalPlan, indexes: PlanToIndexesMap): (LogicalPlan, Int) = {
    rules.foldLeft[(LogicalPlan, Int)]((plan, 0)) {
      case (bestSoFar, rule) =>
        val (transformedPlan, curScore) = rule(plan, indexes)
        // Positive curScore means the rule is applied. NoOpRule is always explored
        // so that the children are still optimized when no rule applies here.
        if (curScore > 0 || rule == NoOpRule) {
          val (childrenPlan, childrenScore) = recChildren(transformedPlan, indexes)
          val total = childrenScore + curScore
          // Update if the total score is higher than the previous optimal.
          if (bestSoFar._2 < total) (childrenPlan, total) else bestSoFar
        } else {
          bestSoFar
        }
    }
  }

  /**
   * Transform the given query plan to use selected indexes based on score.
   *
   * @param plan Original query plan
   * @param candidateIndexes Map of source plan to candidate indexes
   * @return Transformed plan using selected indexes based on score
   */
  def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): LogicalPlan = {
    recApply(plan, candidateIndexes)._1
  }
}

/**
* Transform the given plan to use Hyperspace indexes.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.rules

import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}

import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
import com.microsoft.hyperspace.index.IndexLogEntry
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.PlanToIndexesMap

/**
 * Collect candidate indexes for each source plan.
 *
 * Every supported leaf relation found in a query plan is first paired with the
 * full list of indexes; that list is then narrowed by running each entry of
 * `sourceFilters` in order. Relations left with no index are dropped.
 */
object CandidateIndexCollector extends ActiveSparkSession {
  // Applied left to right: each filter only sees the indexes kept by the previous one.
  private val sourceFilters: Seq[SourcePlanIndexFilter] =
    ColumnSchemaFilter :: FileSignatureFilter :: Nil

  /**
   * Pair every supported leaf relation in the plan with the given indexes.
   *
   * @param plan Query plan to scan for leaf nodes
   * @param indexes Indexes initially associated with each supported leaf node
   * @return Map of supported leaf node to `indexes`
   */
  private def initializePlanToIndexes(
      plan: LogicalPlan,
      indexes: Seq[IndexLogEntry]): PlanToIndexesMap = {
    val provider = Hyperspace.getContext(spark).sourceProviderManager
    plan.collect {
      // Widen with a type ascription (not a cast) so the resulting map is keyed
      // by LogicalPlan rather than by LeafNode.
      case l: LeafNode if provider.isSupportedRelation(l) =>
        ((l: LogicalPlan), indexes)
    }.toMap
  }

  /**
   * Extract candidate indexes for each source plan in the given query plan.
   *
   * @param plan Original query plan
   * @param allIndexes All indexes
   * @return Map of source plan to candidate indexes; source plans for which no
   *         index survives the filters are not included
   */
  def apply(plan: LogicalPlan, allIndexes: Seq[IndexLogEntry]): PlanToIndexesMap = {
    initializePlanToIndexes(plan, allIndexes)
      .map {
        case (node, nodeIndexes) =>
          // Thread the index list through every source filter in order.
          val candidates = sourceFilters.foldLeft(nodeIndexes) { (remaining, filter) =>
            filter(node, remaining)
          }
          node -> candidates
      }
      .filter { case (_, candidates) => candidates.nonEmpty }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.rules

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

import com.microsoft.hyperspace.index.IndexLogEntry
import com.microsoft.hyperspace.util.ResolverUtils

/**
 * Source plan filter that keeps only the indexes whose referenced columns can
 * all be resolved against the output columns of the source plan.
 */
object ColumnSchemaFilter extends SourcePlanIndexFilter {
  override def apply(plan: LogicalPlan, indexes: Seq[IndexLogEntry]): Seq[IndexLogEntry] = {
    // Names of the columns produced by the source plan.
    val relationColumnNames = plan.output.map(_.name)

    // True when every column referenced by the index resolves against the plan output.
    // The reason text is handed to withFilterReasonTag together with the check itself.
    def matchesSchema(index: IndexLogEntry): Boolean = {
      val indexColumns = index.derivedDataset.referencedColumns
      withFilterReasonTag(
        plan,
        index,
        "Column Schema does not match. " +
          s"Relation columns: [${relationColumnNames.mkString(", ")}], " +
          s"Index columns: [${indexColumns.mkString(", ")}]") {
        ResolverUtils.resolve(spark, indexColumns, relationColumnNames).isDefined
      }
    }

    indexes.filter(matchesSchema)
  }
}
Loading

0 comments on commit f41e610

Please sign in to comment.