Data Skipping Index Part 1: Refactoring (#481)
Chungmin Lee authored Jul 29, 2021
1 parent 12e61c6 commit 467a891
Showing 10 changed files with 263 additions and 114 deletions.

This file was deleted.

@@ -29,21 +29,10 @@ import org.apache.spark.sql.types.{LongType, StructType}
import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.plans.logical.{BucketUnion, IndexHadoopFsRelation}
import com.microsoft.hyperspace.index.sources.FileBasedRelation
import com.microsoft.hyperspace.index.rules.RuleUtils
import com.microsoft.hyperspace.util.HyperspaceConf

object RuleUtils {

/**
* Check whether an index was applied to the given relation.
* This can be determined by an identifier in [[FileBasedRelation]]'s options.
*
* @param relation FileBasedRelation to check if an index is already applied.
* @return true if an index is applied to the given relation. Otherwise false.
*/
def isIndexApplied(relation: FileBasedRelation): Boolean = {
relation.options.exists(_.equals(IndexConstants.INDEX_RELATION_IDENTIFIER))
}
object CoveringIndexRuleUtils {

/**
* Transform the current plan to utilize the given index.
@@ -70,7 +59,7 @@ object RuleUtils {
useBucketSpec: Boolean,
useBucketUnionForAppended: Boolean): LogicalPlan = {
// Check pre-requisite.
val relation = getRelation(spark, plan)
val relation = RuleUtils.getRelation(spark, plan)
assert(relation.isDefined)

// If there is no change in source data files, the index can be applied by
@@ -93,23 +82,6 @@
transformed
}

/**
* Extract the relation node if the given logical plan is linear.
*
* @param plan Logical plan to extract a relation node from.
* @return If the plan is linear and the relation node is supported, the [[FileBasedRelation]]
* object that wraps the relation node. Otherwise None.
*/
def getRelation(spark: SparkSession, plan: LogicalPlan): Option[FileBasedRelation] = {
val provider = Hyperspace.getContext(spark).sourceProviderManager
val leaves = plan.collectLeaves()
if (leaves.size == 1 && provider.isSupportedRelation(leaves.head)) {
Some(provider.getRelation(leaves.head))
} else {
None
}
}

/**
* Transform the current plan to utilize index.
* The transformed plan reads data from indexes instead of the source relations.
@@ -18,13 +18,11 @@ package com.microsoft.hyperspace.index.covering

import org.apache.spark.sql.catalyst.analysis.CleanupAliases
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}

import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
import com.microsoft.hyperspace.index.IndexLogEntryTags
import com.microsoft.hyperspace.index.rules.{HyperspaceRule, IndexRankFilter, QueryPlanIndexFilter}
import com.microsoft.hyperspace.index.rules._
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap}
import com.microsoft.hyperspace.index.sources.FileBasedRelation
import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils}

/**
@@ -139,25 +137,14 @@ object FilterRankFilter extends IndexRankFilter {
}
}

object ExtractRelation extends ActiveSparkSession {
def unapply(plan: LeafNode): Option[FileBasedRelation] = {
val provider = Hyperspace.getContext(spark).sourceProviderManager
if (provider.isSupportedRelation(plan)) {
Some(provider.getRelation(plan))
} else {
None
}
}
}

/**
* FilterIndexRule looks for opportunities in a logical plan to replace
* a relation with an available hash-partitioned index, based on the columns
* in the filter predicate.
*/
object FilterIndexRule extends HyperspaceRule {
override val filtersOnQueryPlan: Seq[QueryPlanIndexFilter] =
CoveringIndexFilter :: FilterPlanNodeFilter :: FilterColumnFilter :: Nil
IndexTypeFilter[CoveringIndex]() :: FilterPlanNodeFilter :: FilterColumnFilter :: Nil

override val indexRanker: IndexRankFilter = FilterRankFilter

@@ -169,7 +156,7 @@ object FilterIndexRule extends HyperspaceRule {
// As FilterIndexRule is not intended to support bucketed scan, we set
// useBucketUnionForAppended as false. If it's true, Hybrid Scan can cause
// unnecessary shuffle for appended data to apply BucketUnion for merging data.
RuleUtils.transformPlanToUseIndex(
CoveringIndexRuleUtils.transformPlanToUseIndex(
spark,
indexes.head._2,
plan,
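
For context, a hedged sketch of the user-visible behavior FilterIndexRule implements, written against the standard public Hyperspace API; the path, index name, and columns are hypothetical, and this code is not part of the diff.

// Sketch only: with a covering index on the filter column, FilterIndexRule
// can rewrite the scan to read index files instead of the source parquet.
import org.apache.spark.sql.SparkSession

import com.microsoft.hyperspace._
import com.microsoft.hyperspace.index.IndexConfig

object FilterIndexExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").getOrCreate()
    val df = spark.read.parquet("/tmp/records") // hypothetical path

    // Index on the filter column "id", covering the projected column "name".
    new Hyperspace(spark).createIndex(df, IndexConfig("idx", Seq("id"), Seq("name")))

    // Enables Hyperspace's optimizer rules, FilterIndexRule among them.
    spark.enableHyperspace()
    df.filter(df("id") === 1).select("name").show()
  }
}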
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags}
import com.microsoft.hyperspace.index.covering.JoinAttributeFilter.extractConditions
import com.microsoft.hyperspace.index.rules.{HyperspaceRule, IndexRankFilter, QueryPlanIndexFilter}
import com.microsoft.hyperspace.index.rules.{HyperspaceRule, IndexRankFilter, IndexTypeFilter, QueryPlanIndexFilter, RuleUtils}
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap}
import com.microsoft.hyperspace.index.sources.FileBasedRelation
import com.microsoft.hyperspace.shim.JoinWithoutHint
@@ -618,7 +618,11 @@ object JoinRankFilter extends IndexRankFilter {
object JoinIndexRule extends HyperspaceRule with HyperspaceEventLogging {

override val filtersOnQueryPlan: Seq[QueryPlanIndexFilter] =
CoveringIndexFilter :: JoinPlanNodeFilter :: JoinAttributeFilter :: JoinColumnFilter :: Nil
IndexTypeFilter[CoveringIndex]() ::
JoinPlanNodeFilter ::
JoinAttributeFilter ::
JoinColumnFilter ::
Nil

override val indexRanker: IndexRankFilter = JoinRankFilter

@@ -641,13 +645,13 @@ object JoinIndexRule extends HyperspaceRule with HyperspaceEventLogging {
val updatedPlan =
join
.copy(
left = RuleUtils.transformPlanToUseIndex(
left = CoveringIndexRuleUtils.transformPlanToUseIndex(
spark,
lIndex,
l,
useBucketSpec = true,
useBucketUnionForAppended = true),
right = RuleUtils.transformPlanToUseIndex(
right = CoveringIndexRuleUtils.transformPlanToUseIndex(
spark,
rIndex,
r,
@@ -0,0 +1,33 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.rules

import org.apache.spark.sql.catalyst.plans.logical.LeafNode

import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
import com.microsoft.hyperspace.index.sources.FileBasedRelation

object ExtractRelation extends ActiveSparkSession {
def unapply(plan: LeafNode): Option[FileBasedRelation] = {
val provider = Hyperspace.getContext(spark).sourceProviderManager
if (provider.isSupportedRelation(plan)) {
Some(provider.getRelation(plan))
} else {
None
}
}
}
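
A minimal sketch of the extractor's call pattern, mirroring its use in FilterPlanNodeFilter above; FindFilteredRelation is a hypothetical helper, not part of this commit, and it assumes the RuleUtils.isIndexApplied method added elsewhere in this change.

package com.microsoft.hyperspace.index.rules

import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}

import com.microsoft.hyperspace.index.sources.FileBasedRelation

object FindFilteredRelation {
  // ExtractRelation matches only leaf nodes backed by a supported source;
  // the isIndexApplied guard skips relations that already read from an index.
  def apply(plan: LogicalPlan): Option[FileBasedRelation] = plan match {
    case Filter(_, ExtractRelation(relation)) if !RuleUtils.isIndexApplied(relation) =>
      Some(relation)
    case _ => None
  }
}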
@@ -0,0 +1,49 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.rules

import scala.reflect.ClassTag

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

import com.microsoft.hyperspace.index.Index
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.PlanToIndexesMap

object IndexTypeFilter {

/**
* Returns a [[QueryPlanIndexFilter]] that filters out indexes that are not of type T.
*/
def apply[T <: Index: ClassTag](): QueryPlanIndexFilter =
new QueryPlanIndexFilter {
override def apply(
plan: LogicalPlan,
candidateIndexes: PlanToIndexesMap): PlanToIndexesMap = {
candidateIndexes
.map {
case (plan, indexes) =>
plan -> indexes.filter {
_.derivedDataset match {
case _: T => true
case _ => false
}
}
}
.filter { case (_, indexes) => indexes.nonEmpty }
}
}
}
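
A hedged sketch of invoking the new filter directly; KeepCoveringIndexes is a hypothetical helper, and it assumes CoveringIndex lives in the covering package, as the files above suggest. The ClassTag bound is what lets the `case _: T` type test survive erasure.

package com.microsoft.hyperspace.index.rules

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

import com.microsoft.hyperspace.index.covering.CoveringIndex
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.PlanToIndexesMap

object KeepCoveringIndexes {
  // Keeps only candidate indexes whose derivedDataset is a CoveringIndex
  // and drops plans that are left with no candidates at all.
  def apply(plan: LogicalPlan, candidates: PlanToIndexesMap): PlanToIndexesMap =
    IndexTypeFilter[CoveringIndex]()(plan, candidates)
}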
@@ -0,0 +1,55 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.rules

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index.IndexConstants
import com.microsoft.hyperspace.index.sources.FileBasedRelation

object RuleUtils {

/**
* Check whether an index was applied to the given relation.
* This can be determined by an identifier in [[FileBasedRelation]]'s options.
*
* @param relation FileBasedRelation to check if an index is already applied.
* @return true if an index is applied to the given relation. Otherwise false.
*/
def isIndexApplied(relation: FileBasedRelation): Boolean = {
relation.options.exists(_.equals(IndexConstants.INDEX_RELATION_IDENTIFIER))
}

/**
* Extract the relation node if the given logical plan is linear.
*
* @param plan Logical plan to extract a relation node from.
* @return If the plan is linear and the relation node is supported, the [[FileBasedRelation]]
* object that wraps the relation node. Otherwise None.
*/
def getRelation(spark: SparkSession, plan: LogicalPlan): Option[FileBasedRelation] = {
val provider = Hyperspace.getContext(spark).sourceProviderManager
val leaves = plan.collectLeaves()
if (leaves.size == 1 && provider.isSupportedRelation(leaves.head)) {
Some(provider.getRelation(leaves.head))
} else {
None
}
}
}
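
A small sketch combining the two moved methods; FindIndexableRelation is a hypothetical helper, not part of this commit.

package com.microsoft.hyperspace.index.rules

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

import com.microsoft.hyperspace.index.sources.FileBasedRelation

object FindIndexableRelation {
  // getRelation succeeds only for a linear plan over a single supported
  // leaf; filterNot(isIndexApplied) then drops relations that already
  // read from an index, so rules do not rewrite the same plan twice.
  def apply(spark: SparkSession, plan: LogicalPlan): Option[FileBasedRelation] =
    RuleUtils.getRelation(spark, plan).filterNot(RuleUtils.isIndexApplied)
}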
@@ -19,15 +19,15 @@ package com.microsoft.hyperspace.index.covering
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, IsNotNull}
import org.apache.spark.sql.catalyst.plans.{JoinType, SQLHelper}
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation, NoopCache}
import org.apache.spark.sql.types.{IntegerType, StringType}

import com.microsoft.hyperspace.index.HyperspaceRuleSuite
import com.microsoft.hyperspace.shim.{JoinWithoutHint, RepartitionByExpressionWithOptionalNumPartitions}
import com.microsoft.hyperspace.shim.RepartitionByExpressionWithOptionalNumPartitions

class RuleUtilsTest extends HyperspaceRuleSuite with SQLHelper {
class CoveringIndexRuleUtilsTest extends HyperspaceRuleSuite with SQLHelper {
override val indexLocationDirName = "ruleUtilsTest"

val t1c1 = AttributeReference("t1c1", IntegerType)()
@@ -85,20 +85,6 @@ class RuleUtilsTest extends HyperspaceRuleSuite with SQLHelper {
createIndexLogEntry("t2i2", Seq(t2c1, t2c2), Seq(t2c3), t2ProjectNode)
}

test("Verify get logical relation for single logical relation node plan.") {
validateLogicalRelation(t1ScanNode, t1ScanNode)
}

test("Verify get logical relation for multi-node linear plan.") {
validateLogicalRelation(t1ProjectNode, t1ScanNode)
}

test("Verify get logical relation for non-linear plan.") {
val joinNode = JoinWithoutHint(t1ProjectNode, t2ProjectNode, JoinType("inner"), None)
val r = RuleUtils.getRelation(spark, Project(Seq(t1c3, t2c3), joinNode))
assert(r.isEmpty)
}

test("Verify the location of injected shuffle for Hybrid Scan.") {
withTempPath { tempPath =>
val dataPath = tempPath.getAbsolutePath
@@ -112,7 +98,7 @@
val df = spark.read.parquet(dataPath)
val query = df.filter(df("id") >= 3).select("id", "name")
val bucketSpec = BucketSpec(100, Seq("id"), Seq())
val shuffled = RuleUtils.transformPlanToShuffleUsingBucketSpec(
val shuffled = CoveringIndexRuleUtils.transformPlanToShuffleUsingBucketSpec(
bucketSpec,
query.queryExecution.optimizedPlan)

@@ -146,7 +132,7 @@
val bucketSpec2 = BucketSpec(100, Seq("age"), Seq())
val query2 = df.filter(df("id") <= 3).select("id", "name")
val shuffled2 =
RuleUtils.transformPlanToShuffleUsingBucketSpec(
CoveringIndexRuleUtils.transformPlanToShuffleUsingBucketSpec(
bucketSpec2,
query2.queryExecution.optimizedPlan)
assert(shuffled2.collect {
@@ -163,10 +149,4 @@
}.length == 1)
}
}

private def validateLogicalRelation(plan: LogicalPlan, expected: LogicalRelation): Unit = {
val r = RuleUtils.getRelation(spark, plan)
assert(r.isDefined)
assert(r.get.plan.equals(expected))
}
}