Skip to content
This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into whynot4
Browse files Browse the repository at this point in the history
  • Loading branch information
sezruby committed Aug 2, 2021
2 parents 4155cad + 467a891 commit 1257950
Show file tree
Hide file tree
Showing 11 changed files with 263 additions and 124 deletions.
10 changes: 0 additions & 10 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,6 @@ jobs:
sparkVersionUnderBar: "3_0"
scalaVersion: "2.12.8"

- job: Build_Spark3_1_2_12_WIN
displayName: 'Build sources and run unit tests for Spark 3.1 / Scala 2.12 on Windows'
pool:
vmImage: 'windows-2019'
steps:
- template: 'ci/windows_test.yml'
parameters:
sparkVersionUnderBar: "3_1"
scalaVersion: "2.12.8"

- job: PythonTest
displayName: 'Run Python tests'
pool:
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,10 @@ import org.apache.spark.sql.types.{LongType, StructType}
import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.plans.logical.{BucketUnion, IndexHadoopFsRelation}
import com.microsoft.hyperspace.index.sources.FileBasedRelation
import com.microsoft.hyperspace.index.rules.RuleUtils
import com.microsoft.hyperspace.util.HyperspaceConf

object RuleUtils {

/**
* Check if an index was applied the given relation or not.
* This can be determined by an identifier in [[FileBasedRelation]]'s options.
*
* @param relation FileBasedRelation to check if an index is already applied.
* @return true if an index is applied to the given relation. Otherwise false.
*/
def isIndexApplied(relation: FileBasedRelation): Boolean = {
relation.options.exists(_.equals(IndexConstants.INDEX_RELATION_IDENTIFIER))
}
object CoveringIndexRuleUtils {

/**
* Transform the current plan to utilize the given index.
Expand All @@ -70,7 +59,7 @@ object RuleUtils {
useBucketSpec: Boolean,
useBucketUnionForAppended: Boolean): LogicalPlan = {
// Check pre-requisite.
val relation = getRelation(spark, plan)
val relation = RuleUtils.getRelation(spark, plan)
assert(relation.isDefined)

// If there is no change in source data files, the index can be applied by
Expand All @@ -93,23 +82,6 @@ object RuleUtils {
transformed
}

/**
* Extract the relation node if the given logical plan is linear.
*
* @param plan Logical plan to extract a relation node from.
* @return If the plan is linear and the relation node is supported, the [[FileBasedRelation]]
* object that wraps the relation node. Otherwise None.
*/
def getRelation(spark: SparkSession, plan: LogicalPlan): Option[FileBasedRelation] = {
val provider = Hyperspace.getContext(spark).sourceProviderManager
val leaves = plan.collectLeaves()
if (leaves.size == 1 && provider.isSupportedRelation(leaves.head)) {
Some(provider.getRelation(leaves.head))
} else {
None
}
}

/**
* Transform the current plan to utilize index.
* The transformed plan reads data from indexes instead of the source relations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@ package com.microsoft.hyperspace.index.covering

import org.apache.spark.sql.catalyst.analysis.CleanupAliases
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}

import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
import com.microsoft.hyperspace.index.IndexLogEntryTags
import com.microsoft.hyperspace.index.plananalysis.FilterReasons
import com.microsoft.hyperspace.index.rules.{HyperspaceRule, IndexRankFilter, QueryPlanIndexFilter}
import com.microsoft.hyperspace.index.rules._
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap}
import com.microsoft.hyperspace.index.sources.FileBasedRelation
import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils}

/**
Expand Down Expand Up @@ -142,25 +140,14 @@ object FilterRankFilter extends IndexRankFilter {
}
}

object ExtractRelation extends ActiveSparkSession {
def unapply(plan: LeafNode): Option[FileBasedRelation] = {
val provider = Hyperspace.getContext(spark).sourceProviderManager
if (provider.isSupportedRelation(plan)) {
Some(provider.getRelation(plan))
} else {
None
}
}
}

/**
* FilterIndexRule looks for opportunities in a logical plan to replace
* a relation with an available hash partitioned index according to columns in
* filter predicate.
*/
object FilterIndexRule extends HyperspaceRule {
override val filtersOnQueryPlan: Seq[QueryPlanIndexFilter] =
CoveringIndexFilter :: FilterPlanNodeFilter :: FilterColumnFilter :: Nil
IndexTypeFilter[CoveringIndex]() :: FilterPlanNodeFilter :: FilterColumnFilter :: Nil

override val indexRanker: IndexRankFilter = FilterRankFilter

Expand All @@ -172,7 +159,7 @@ object FilterIndexRule extends HyperspaceRule {
// As FilterIndexRule is not intended to support bucketed scan, we set
// useBucketUnionForAppended as false. If it's true, Hybrid Scan can cause
// unnecessary shuffle for appended data to apply BucketUnion for merging data.
RuleUtils.transformPlanToUseIndex(
CoveringIndexRuleUtils.transformPlanToUseIndex(
spark,
indexes.head._2,
plan,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags}
import com.microsoft.hyperspace.index.covering.JoinAttributeFilter.extractConditions
import com.microsoft.hyperspace.index.plananalysis.FilterReasons
import com.microsoft.hyperspace.index.rules.{HyperspaceRule, IndexRankFilter, QueryPlanIndexFilter}
import com.microsoft.hyperspace.index.rules.{HyperspaceRule, IndexRankFilter, IndexTypeFilter, QueryPlanIndexFilter, RuleUtils}
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap}
import com.microsoft.hyperspace.index.sources.FileBasedRelation
import com.microsoft.hyperspace.shim.JoinWithoutHint
Expand Down Expand Up @@ -620,7 +620,11 @@ object JoinRankFilter extends IndexRankFilter {
object JoinIndexRule extends HyperspaceRule with HyperspaceEventLogging {

override val filtersOnQueryPlan: Seq[QueryPlanIndexFilter] =
CoveringIndexFilter :: JoinPlanNodeFilter :: JoinAttributeFilter :: JoinColumnFilter :: Nil
IndexTypeFilter[CoveringIndex]() ::
JoinPlanNodeFilter ::
JoinAttributeFilter ::
JoinColumnFilter ::
Nil

override val indexRanker: IndexRankFilter = JoinRankFilter

Expand All @@ -643,13 +647,13 @@ object JoinIndexRule extends HyperspaceRule with HyperspaceEventLogging {
val updatedPlan =
join
.copy(
left = RuleUtils.transformPlanToUseIndex(
left = CoveringIndexRuleUtils.transformPlanToUseIndex(
spark,
lIndex,
l,
useBucketSpec = true,
useBucketUnionForAppended = true),
right = RuleUtils.transformPlanToUseIndex(
right = CoveringIndexRuleUtils.transformPlanToUseIndex(
spark,
rIndex,
r,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.rules

import org.apache.spark.sql.catalyst.plans.logical.LeafNode

import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
import com.microsoft.hyperspace.index.sources.FileBasedRelation

object ExtractRelation extends ActiveSparkSession {
def unapply(plan: LeafNode): Option[FileBasedRelation] = {
val provider = Hyperspace.getContext(spark).sourceProviderManager
if (provider.isSupportedRelation(plan)) {
Some(provider.getRelation(plan))
} else {
None
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.rules

import scala.reflect.ClassTag

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

import com.microsoft.hyperspace.index.Index
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.PlanToIndexesMap

object IndexTypeFilter {

/**
* Returns a [[QueryPlanIndexFilter]] that filters out indexes which are not T.
*/
def apply[T <: Index: ClassTag](): QueryPlanIndexFilter =
new QueryPlanIndexFilter {
override def apply(
plan: LogicalPlan,
candidateIndexes: PlanToIndexesMap): PlanToIndexesMap = {
candidateIndexes
.map {
case (plan, indexes) =>
plan -> indexes.filter {
_.derivedDataset match {
case _: T => true
case _ => false
}
}
}
.filter { case (_, indexes) => indexes.nonEmpty }
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.rules

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index.IndexConstants
import com.microsoft.hyperspace.index.sources.FileBasedRelation

object RuleUtils {

/**
* Check if an index was applied the given relation or not.
* This can be determined by an identifier in [[FileBasedRelation]]'s options.
*
* @param relation FileBasedRelation to check if an index is already applied.
* @return true if an index is applied to the given relation. Otherwise false.
*/
def isIndexApplied(relation: FileBasedRelation): Boolean = {
relation.options.exists(_.equals(IndexConstants.INDEX_RELATION_IDENTIFIER))
}

/**
* Extract the relation node if the given logical plan is linear.
*
* @param plan Logical plan to extract a relation node from.
* @return If the plan is linear and the relation node is supported, the [[FileBasedRelation]]
* object that wraps the relation node. Otherwise None.
*/
def getRelation(spark: SparkSession, plan: LogicalPlan): Option[FileBasedRelation] = {
val provider = Hyperspace.getContext(spark).sourceProviderManager
val leaves = plan.collectLeaves()
if (leaves.size == 1 && provider.isSupportedRelation(leaves.head)) {
Some(provider.getRelation(leaves.head))
} else {
None
}
}
}
Loading

0 comments on commit 1257950

Please sign in to comment.