Skip to content
This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Commit

Permalink
Fix FilterReason class
Browse files Browse the repository at this point in the history
  • Loading branch information
sezruby committed Jul 14, 2021
1 parent 9ec1ca2 commit 28cbb3d
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,8 @@ object FilterColumnFilter extends QueryPlanIndexFilter {
index,
FilterReasons.apply(
FilterReasonCode.NO_FIRST_INDEXED_COL_COND,
Seq(
("firstIndexedCol", index.derivedDataset.indexedColumns.head),
("filterColumns", filterColumnNames.mkString(", "))))) {
("firstIndexedCol", index.derivedDataset.indexedColumns.head),
("filterColumns", filterColumnNames.mkString(", ")))) {
ResolverUtils
.resolve(spark, index.derivedDataset.indexedColumns.head, filterColumnNames)
.isDefined
Expand All @@ -111,9 +110,8 @@ object FilterColumnFilter extends QueryPlanIndexFilter {
index,
FilterReasons.apply(
FilterReasonCode.MISSING_REQUIRED_COL,
Seq(
("requiredCols", (filterColumnNames ++ projectColumnNames).toSet.mkString(",")),
("indexCols", index.derivedDataset.referencedColumns.mkString(","))))) {
("requiredCols", (filterColumnNames ++ projectColumnNames).toSet.mkString(",")),
("indexCols", index.derivedDataset.referencedColumns.mkString(",")))) {
ResolverUtils
.resolve(
spark,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter {
leftAndRightIndexes,
FilterReasons.apply(
FilterReasonCode.NOT_ELIGIBLE_JOIN,
Seq(("reason", "Non equi-join or has literal")))) {
("reason", "Non equi-join or has literal"))) {
isJoinConditionSupported(condition)
}

Expand All @@ -78,7 +78,7 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter {
leftAndRightIndexes,
FilterReasons.apply(
FilterReasonCode.NOT_ELIGIBLE_JOIN,
Seq(("reason", "Non linear left child plan")))) {
("reason", "Non linear left child plan"))) {
isPlanLinear(l)
}
val rightPlanLinearCond =
Expand All @@ -87,7 +87,7 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter {
leftAndRightIndexes,
FilterReasons.apply(
FilterReasonCode.NOT_ELIGIBLE_JOIN,
Seq(("reason", "Non linear right child plan")))) {
("reason", "Non linear right child plan"))) {
isPlanLinear(r)
}

Expand All @@ -110,7 +110,7 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter {
candidateIndexes.values.flatten.toSeq,
FilterReasons.apply(
FilterReasonCode.NOT_ELIGIBLE_JOIN,
Seq(("reason", "No join condition"))))
("reason", "No join condition")))
Map.empty
case _ =>
Map.empty
Expand Down Expand Up @@ -180,7 +180,7 @@ object JoinAttributeFilter extends QueryPlanIndexFilter {
candidateIndexes.head._2 ++ candidateIndexes.last._2,
FilterReasons.apply(
FilterReasonCode.NOT_ELIGIBLE_JOIN,
Seq(("reason", "incompatible left and right join columns")))) {
("reason", "incompatible left and right join columns"))) {
ensureAttributeRequirements(
JoinIndexRule.leftRelation.get,
JoinIndexRule.rightRelation.get,
Expand Down Expand Up @@ -365,15 +365,13 @@ object JoinColumnFilter extends QueryPlanIndexFilter {
if (withFilterReasonTag(
plan,
candidateIndexes.head._2 ++ candidateIndexes.last._2,
FilterReasons.apply(
FilterReasonCode.NO_AVAIL_JOIN_INDEX_PAIR,
Seq(("child", "left"))))(lIndexes.nonEmpty) &&
FilterReasons.apply(FilterReasonCode.NO_AVAIL_JOIN_INDEX_PAIR, ("child", "left")))(
lIndexes.nonEmpty) &&
withFilterReasonTag(
plan,
candidateIndexes.head._2 ++ candidateIndexes.last._2,
FilterReasons.apply(
FilterReasonCode.NO_AVAIL_JOIN_INDEX_PAIR,
Seq(("child", "right"))))(rIndexes.nonEmpty)) {
FilterReasons.apply(FilterReasonCode.NO_AVAIL_JOIN_INDEX_PAIR, ("child", "right")))(
rIndexes.nonEmpty)) {
Map(leftRelation.plan -> lIndexes, rightRelation.plan -> rIndexes)
} else {
Map.empty
Expand Down Expand Up @@ -432,21 +430,19 @@ object JoinColumnFilter extends QueryPlanIndexFilter {
idx,
FilterReasons.apply(
FilterReasonCode.NOT_ALL_JOIN_COL_INDEXED,
Seq(
("child", leftOrRight),
("joinCols", requiredIndexCols.mkString(", ")),
("indexedCols", idx.indexedColumns.mkString(", "))))) {
("child", leftOrRight),
("joinCols", requiredIndexCols.mkString(", ")),
("indexedCols", idx.indexedColumns.mkString(", ")))) {
requiredIndexCols.toSet.equals(idx.indexedColumns.toSet)
} &&
withFilterReasonTag(
plan,
idx,
FilterReasons.apply(
FilterReasonCode.MISSING_INDEXED_COL,
Seq(
("child", leftOrRight),
("requiredIndexedCols", allRequiredCols.mkString(", ")),
("IndexedCols", idx.indexedColumns.mkString(", "))))) {
("child", leftOrRight),
("requiredIndexedCols", allRequiredCols.mkString(", ")),
("IndexedCols", idx.indexedColumns.mkString(", ")))) {
allRequiredCols.forall(allCols.contains)
}
}
Expand Down Expand Up @@ -530,7 +526,7 @@ object JoinRankFilter extends IndexRankFilter {
setFilterReasonTag(
plan,
indexes.head._2 ++ indexes.last._2,
FilterReasons.apply(FilterReasonCode.NO_COMPATIBLE_JOIN_INDEX_PAIR, Seq.empty))
FilterReasons.apply(FilterReasonCode.NO_COMPATIBLE_JOIN_INDEX_PAIR))
Map.empty
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ object CandidateIndexAnalyzer extends Logging {
stringBuilder.append(newLine)
stringBuilder.append(planWithHyperspace.toString)
stringBuilder.append(newLine)
stringBuilder.append(newLine)

def printIndexNames(indexNames: Seq[String]): Unit = {
indexNames.foreach { idxName =>
Expand Down Expand Up @@ -207,7 +206,7 @@ object CandidateIndexAnalyzer extends Logging {
subPlanLocStr,
index.name,
index.derivedDataset.kindAbbr,
reason.code,
reason.codeStr,
reason.argStr,
reason.verboseStr)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,100 +16,176 @@

package com.microsoft.hyperspace.index.plananalysis

import com.microsoft.hyperspace.index.plananalysis.FilterReasonCode._

class FilterReason(
val code: String,
argStrings: => Seq[(String, String)],
verboseString: => String) {

def argStr: String = {
trait FilterReason {
val args: Seq[(String, String)]
def codeStr: String
final def argStr: String = {
// key1=[value1], key2=[value2]
argStrings.map(kv => s"${kv._1}=[${kv._2}]").mkString(",")
args.map(kv => s"${kv._1}=[${kv._2}]").mkString(",")
}
def verboseStr: String
}

def verboseStr: String = {
verboseString
}
trait FilterReasonNoArg extends FilterReason {
final override val args: Seq[(String, String)] = Seq.empty
}

object FilterReasonCode extends Enumeration {
type FilterReasonCode = Value

// Common
val COL_SCHEMA_MISMATCH = Value
val SOURCE_DATA_CHANGED = Value
val NO_DELETE_SUPPORT = Value
val NO_COMMON_FILES = Value
val TOO_MUCH_APPENDED = Value
val TOO_MUCH_DELETED = Value
val ANOTHER_INDEX_APPLIED = Value

// CoveringIndex - FilterIndexRule
val NO_FIRST_INDEXED_COL_COND = Value
val MISSING_REQUIRED_COL = Value

// CoveringIndex - JoinIndexRule
val NOT_ELIGIBLE_JOIN = Value
val NO_AVAIL_JOIN_INDEX_PAIR = Value
val NO_COMPATIBLE_JOIN_INDEX_PAIR = Value
val NOT_ALL_JOIN_COL_INDEXED = Value
val MISSING_INDEXED_COL = Value
}

object FilterReasons {
def apply(code: FilterReasonCode, argStrings: => Seq[(String, String)]): FilterReason = {
import com.microsoft.hyperspace.index.plananalysis.FilterReasonCode._
def apply(code: FilterReasonCode, args: (String, String)*): FilterReason = {
code match {
case COL_SCHEMA_MISMATCH =>
new FilterReason(
code.toString,
argStrings,
"Column Schema does not match. Source data columns: [" + argStrings(0)._2 +
"], Index columns: [" + argStrings(1)._2)
case SOURCE_DATA_CHANGE =>
new FilterReason(code.toString, argStrings, "Index signature does not match.")
ColSchemaMismatch(args)
case SOURCE_DATA_CHANGED =>
SourceDataChanged()
case NO_DELETE_SUPPORT =>
new FilterReason(code.toString, argStrings, "Index doesn't support deleted files.")
NoDeleteSupport()
case NO_COMMON_FILES =>
new FilterReason(code.toString, argStrings, "No common files.")
NoCommonFiles()
case TOO_MUCH_APPENDED =>
new FilterReason(
code.toString,
argStrings,
s"Appended bytes ratio (${argStrings(0)._2}) is larger than " +
s"threshold config ${argStrings(1)._2}). ")
TooMuchAppended(args)
case TOO_MUCH_DELETED =>
new FilterReason(
code.toString,
argStrings,
s"Deleted bytes ratio (${argStrings(0)._2}) is larger than " +
s"threshold config ${argStrings(1)._2}). ")
case NO_FIRST_INDEXED_COL_COND =>
new FilterReason(
code.toString,
argStrings,
"The first indexed column should be used in filter conditions. " +
s"The first indexed column: ${argStrings(0)._2}, " +
s"Columns in filter condition: [${argStrings(1)._2}]")
TooMuchDeleted(args)
case MISSING_REQUIRED_COL =>
new FilterReason(
code.toString,
argStrings,
s"Index does not contain required column. Required columns: [${argStrings(0)._2}], " +
s"Index columns: [${argStrings(1)._2}]")
MissingRequiredCol(args)
case NO_FIRST_INDEXED_COL_COND =>
NoFirstIndexedColCond(args)
case NOT_ELIGIBLE_JOIN =>
new FilterReason(
code.toString,
argStrings,
s"Join condition is not eligible. Reason: ${argStrings(0)._2}")
NotEligibleJoin(args)
case NO_AVAIL_JOIN_INDEX_PAIR =>
new FilterReason(
code.toString,
argStrings,
s"No available indexes for ${argStrings(0)._2} subplan. " +
"Both left and right index are required for Join query")
case NOT_ALL_JOIN_COL_INDEXED =>
new FilterReason(
code.toString,
argStrings,
s"All join condition column should be the indexed columns. " +
s"Join columns: [${argStrings(0)._2}], Indexed columns: [${argStrings(1)._2}]")
NoAvailJoinIndexPair(args)
case MISSING_INDEXED_COL =>
new FilterReason(
code.toString,
argStrings,
s"Index does not contain required columns for ${argStrings(0)._2} subplan. " +
s"Required indexed columns: [${argStrings(1)._2}], " +
s"Indexed columns: [${argStrings(2)._2}]")
MissingIndexedCol(args)
case NOT_ALL_JOIN_COL_INDEXED =>
NotAllJoinColIndexed(args)
case NO_COMPATIBLE_JOIN_INDEX_PAIR =>
new FilterReason(
code.toString,
argStrings,
"No compatible left and right index pair."
)
NoCompatibleJoinIndexPair()
case ANOTHER_INDEX_APPLIED =>
new FilterReason(
code.toString,
argStrings,
s"Another candidate index is applied: ${argStrings(0)._2}"
)
AnotherIndexApplied(args)
}
}

case class ColSchemaMismatch(override val args: Seq[(String, String)])
extends FilterReason {
override final val codeStr: String = "COL_SCHEMA_MISMATCH"
override def verboseStr: String = {
"Column Schema does not match. Source data columns: [" + args(0)._2 +
"], Index columns: [" + args(1)._2
}
}

case class SourceDataChanged() extends FilterReasonNoArg {
override final val codeStr: String = "SOURCE_DATA_CHANGED"
override def verboseStr: String = "Index signature does not match."
}

case class NoDeleteSupport() extends FilterReasonNoArg {
override def codeStr: String = "NO_DELETE_SUPPORT"
override def verboseStr: String = "Index doesn't support deleted files."
}

case class NoCommonFiles() extends FilterReasonNoArg {
override def codeStr: String = "NO_COMMON_FILES"
override def verboseStr: String = "No common files."
}

case class TooMuchAppended(override val args: Seq[(String, String)])
extends FilterReason {
override def codeStr: String = "TOO_MUCH_APPENDED"
override def verboseStr: String =
s"Appended bytes ratio (${args(0)._2}) is larger than " +
s"threshold config ${args(1)._2}). "
}

case class TooMuchDeleted(override val args: Seq[(String, String)]) extends FilterReason {
override def codeStr: String = "TOO_MUCH_DELETED"
override def verboseStr: String =
s"Deleted bytes ratio (${args(0)._2}) is larger than " +
s"threshold config ${args(1)._2}). "
}

case class MissingRequiredCol(override val args: Seq[(String, String)])
extends FilterReason {
override def codeStr: String = "MISSING_REQUIRED_COL"
override def verboseStr: String =
s"Index does not contain required column. Required columns: [${args(0)._2}], " +
s"Index columns: [${args(1)._2}]"
}

case class NoFirstIndexedColCond(override val args: Seq[(String, String)])
extends FilterReason {
override def codeStr: String = "NO_FIRST_INDEXED_COL_COND"
override def verboseStr: String =
"The first indexed column should be used in filter conditions. " +
s"The first indexed column: ${args(0)._2}, " +
s"Columns in filter condition: [${args(1)._2}]"
}

case class NotEligibleJoin(override val args: Seq[(String, String)])
extends FilterReason {
override def codeStr: String = "NOT_ELIGIBLE_JOIN"
override def verboseStr: String =
s"Join condition is not eligible. Reason: ${args(0)._2}"
}

case class NoAvailJoinIndexPair(override val args: Seq[(String, String)])
extends FilterReason {
override def codeStr: String = "NO_AVAIL_JOIN_INDEX_PAIR"
override def verboseStr: String =
s"No available indexes for ${args(0)._2} subplan. " +
"Both left and right index are required for Join query"
}

case class MissingIndexedCol(override val args: Seq[(String, String)])
extends FilterReason {
override def codeStr: String = "MISSING_INDEXED_COL"
override def verboseStr: String =
s"Index does not contain required columns for ${args(0)._2} subplan. " +
s"Required indexed columns: [${args(1)._2}], " +
s"Indexed columns: [${args(2)._2}]"
}

case class NotAllJoinColIndexed(override val args: Seq[(String, String)])
extends FilterReason {
override def codeStr: String = "NOT_ALL_JOIN_COL_INDEXED"
override def verboseStr: String =
s"All join condition column should be the indexed columns. " +
s"Join columns: [${args(0)._2}], Indexed columns: [${args(1)._2}]"
}

case class NoCompatibleJoinIndexPair() extends FilterReasonNoArg {
override def codeStr: String = "NO_COMPATIBLE_JOIN_INDEX_PAIR"
override def verboseStr: String = "No compatible left and right index pair."
}

case class AnotherIndexApplied(override val args: Seq[(String, String)])
extends FilterReason {
override def codeStr: String = "ANOTHER_INDEX_APPLIED"
override def verboseStr: String =
s"Another candidate index is applied: ${args(0)._2}"
}
}
Loading

0 comments on commit 28cbb3d

Please sign in to comment.