Skip to content
This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Commit

Permalink
Refactor hashing and fingerprint functions
Browse files Browse the repository at this point in the history
Fixes #271.

With the newly added fingerprinting classes, it is now easy to make a
fingerprint from various types of data.

FingerprintBuilder can be used to build a fingerprint from data.
FileBasedRelation.signature now takes it as an argument, so the
implementations don't have to do the hashing themselves. They can just
use the provided builder.

For unordered combining, one can use bitwise XOR to combine multiple
fingerprints. This way, the order becomes irrelevant.
  • Loading branch information
Chungmin Lee committed Apr 1, 2021
1 parent c900cf5 commit b6d584d
Show file tree
Hide file tree
Showing 28 changed files with 479 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,44 +19,39 @@ package com.microsoft.hyperspace.index
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}

import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.util.HashingUtils
import com.microsoft.hyperspace.util.fingerprint.{Fingerprint, FingerprintBuilder, FingerprintBuilderFactory}

/**
* [[FileBasedSignatureProvider]] provides the logical plan signature based on files in the
* relation. File metadata, e.g. size, modification time and path, of each file in the
* relation will be used to generate the signature.
*
* Note that while the order of files in a single relation does not affect the signature,
* the order of relations in the plan does affect the signature calculation.
*
* If the given logical plan does not have any supported relations, no signature is provided.
*/
class FileBasedSignatureProvider extends LogicalPlanSignatureProvider {
class FileBasedSignatureProvider(fbf: FingerprintBuilderFactory)
extends LogicalPlanSignatureProvider {

/**
* Generate the signature of logical plan.
*
* @param logicalPlan logical plan of data frame.
* @return signature, if the logical plan has supported relations; Otherwise None.
*/
def signature(logicalPlan: LogicalPlan): Option[String] = {
fingerprintVisitor(logicalPlan).map(HashingUtils.md5Hex)
}

/**
* Visit logical plan and collect info needed for fingerprint.
*
* @param logicalPlan logical plan of data frame.
* @return fingerprint, if the logical plan has supported relations; Otherwise None.
*/
private def fingerprintVisitor(logicalPlan: LogicalPlan): Option[String] = {
def signature(logicalPlan: LogicalPlan): Option[Fingerprint] = {
val provider = Hyperspace.getContext.sourceProviderManager
var fingerprint = ""
val fb: FingerprintBuilder = fbf.create
var updated = false
logicalPlan.foreachUp {
case l: LeafNode if provider.isSupportedRelation(l) =>
fingerprint ++= provider.getRelation(l).signature
provider.getRelation(l).signature(fb).foreach { f =>
fb.add(f)
updated = true
}
case _ =>
}

fingerprint match {
case "" => None
case _ => Some(fingerprint)
}
if (updated) Some(fb.build()) else None
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ import org.apache.spark.sql.types.{DataType, StructType}

import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.util.{PathUtils, SchemaUtils}
import com.microsoft.hyperspace.util.PathUtils
import com.microsoft.hyperspace.util.fingerprint.Fingerprint

// IndexLogEntry-specific fingerprint to be temporarily used where fingerprint is not defined.
case class NoOpFingerprint() {
Expand Down Expand Up @@ -361,7 +362,7 @@ object CoveringIndex {
}

// IndexLogEntry-specific Signature that stores the signature provider and value.
case class Signature(provider: String, value: String)
case class Signature(provider: String, value: Fingerprint)

// IndexLogEntry-specific LogicalPlanFingerprint to store fingerprint of logical plan.
case class LogicalPlanFingerprint(properties: LogicalPlanFingerprint.Properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.microsoft.hyperspace.index

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

import com.microsoft.hyperspace.util.HashingUtils
import com.microsoft.hyperspace.util.fingerprint.{Fingerprint, FingerprintBuilderFactory}

/**
* [[IndexSignatureProvider]] provides signature for a logical plan based on:
Expand All @@ -30,9 +30,9 @@ import com.microsoft.hyperspace.util.HashingUtils
* If the plan does not comply with [[FileBasedSignatureProvider]] or [[PlanSignatureProvider]]
* requirements for signature computation, then no signature will be provided for the plan.
*/
class IndexSignatureProvider extends LogicalPlanSignatureProvider {
private val fileBasedSignatureProvider = new FileBasedSignatureProvider
private val planSignatureProvider = new PlanSignatureProvider
class IndexSignatureProvider(fbf: FingerprintBuilderFactory) extends LogicalPlanSignatureProvider {
private val fileBasedSignatureProvider = new FileBasedSignatureProvider(fbf)
private val planSignatureProvider = new PlanSignatureProvider(fbf)

/**
* Generate the signature of logical plan.
Expand All @@ -41,10 +41,10 @@ class IndexSignatureProvider extends LogicalPlanSignatureProvider {
* @return signature, if both [[FileBasedSignatureProvider]] and [[PlanSignatureProvider]]
* can generate signature for the logical plan; Otherwise None.
*/
def signature(logicalPlan: LogicalPlan): Option[String] = {
def signature(logicalPlan: LogicalPlan): Option[Fingerprint] = {
fileBasedSignatureProvider.signature(logicalPlan).flatMap { f =>
planSignatureProvider.signature(logicalPlan).map { p =>
HashingUtils.md5Hex(f + p)
fbf.create.add(f).add(p).build()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@ import scala.util.{Success, Try}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.util.hyperspace.Utils

import com.microsoft.hyperspace.util.fingerprint.{Fingerprint, FingerprintBuilderFactory, MD5FingerprintBuilderFactory}

/**
* This trait contains the interface that provides the signature of logical plan.
*
* The implementation must have a constructor taking [[FingerprintBuilderFactory]] as an argument.
*/
trait LogicalPlanSignatureProvider {

Expand All @@ -36,15 +40,17 @@ trait LogicalPlanSignatureProvider {
* @param logicalPlan logical plan.
* @return signature if it can be computed w.r.t signature provider assumptions; Otherwise None.
*/
def signature(logicalPlan: LogicalPlan): Option[String]
def signature(logicalPlan: LogicalPlan): Option[Fingerprint]
}

/**
* Factory object for LogicalPlanSignatureProvider.
*/
object LogicalPlanSignatureProvider {
private val fbf: FingerprintBuilderFactory = new MD5FingerprintBuilderFactory

// Creates a default signature provider.
def create(): LogicalPlanSignatureProvider = new IndexSignatureProvider
def create(): LogicalPlanSignatureProvider = new IndexSignatureProvider(fbf)

/**
* Creates a parameterized signature provider.
Expand All @@ -53,7 +59,9 @@ object LogicalPlanSignatureProvider {
* @return signature provider.
*/
def create(name: String): LogicalPlanSignatureProvider = {
Try(Utils.classForName(name).newInstance) match {
Try(Utils.classForName(name)
.getConstructor(classOf[FingerprintBuilderFactory])
.newInstance(fbf)) match {
case Success(provider: LogicalPlanSignatureProvider) => provider
case _ =>
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,24 @@ package com.microsoft.hyperspace.index

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

import com.microsoft.hyperspace.util.HashingUtils
import com.microsoft.hyperspace.util.fingerprint.{Fingerprint, FingerprintBuilder, FingerprintBuilderFactory}

/**
* [[PlanSignatureProvider]] provides signature for a logical plan based on
* the type of operators in it.
* A plan needs to have at least one operator so its signature can be generated.
*/
class PlanSignatureProvider extends LogicalPlanSignatureProvider {
class PlanSignatureProvider(fbf: FingerprintBuilderFactory) extends LogicalPlanSignatureProvider {

/**
* Generate the signature of logical plan.
*
* @param logicalPlan logical plan.
* @return signature if there is at least one operator in the plan; Otherwise None.
*/
def signature(logicalPlan: LogicalPlan): Option[String] = {
var signature = ""
logicalPlan.foreachUp(p => signature = HashingUtils.md5Hex(signature + p.nodeName))
signature match {
case "" => None
case _ => Some(signature)
}
def signature(logicalPlan: LogicalPlan): Option[Fingerprint] = {
val fb: FingerprintBuilder = fbf.create
logicalPlan.foreachUp(node => fb.add(node.nodeName))
Some(fb.build())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import com.microsoft.hyperspace.index.IndexLogEntryTags.{HYBRIDSCAN_RELATED_CONF
import com.microsoft.hyperspace.index.plans.logical.{BucketUnion, IndexHadoopFsRelation}
import com.microsoft.hyperspace.index.sources.FileBasedRelation
import com.microsoft.hyperspace.util.HyperspaceConf
import com.microsoft.hyperspace.util.fingerprint.Fingerprint

object RuleUtils {

Expand All @@ -54,7 +55,7 @@ object RuleUtils {
indexes: Seq[IndexLogEntry],
relation: FileBasedRelation): Seq[IndexLogEntry] = {
// Map of a signature provider to a signature generated for the given plan.
val signatureMap = mutable.Map[String, Option[String]]()
val signatureMap = mutable.Map[String, Option[Fingerprint]]()

def signatureValid(entry: IndexLogEntry): Boolean = {
entry.withCachedTag(relation.plan, IndexLogEntryTags.SIGNATURE_MATCHED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.index.{Content, FileIdTracker, Hdfs, Relation}
import com.microsoft.hyperspace.index.IndexConstants.GLOBBING_PATTERN_KEY
import com.microsoft.hyperspace.index.sources.FileBasedRelation
import com.microsoft.hyperspace.util.HashingUtils
import com.microsoft.hyperspace.util.fingerprint.{Fingerprint, FingerprintBuilder}

/**
* Implementation for file-based relation used by [[DefaultFileBasedSource]]
Expand All @@ -42,13 +42,12 @@ class DefaultFileBasedRelation(spark: SparkSession, override val plan: LogicalRe
/**
* Computes the signature of the current relation.
*/
override def signature: String = plan.relation match {
override def signature(fb: FingerprintBuilder): Option[Fingerprint] = plan.relation match {
case HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _) =>
val result = filesFromIndex(location).sortBy(_.getPath.toString).foldLeft("") {
(acc: String, f: FileStatus) =>
HashingUtils.md5Hex(acc + fingerprint(f))
}
result
val initialFingerprint = fb.build()
var fingerprint = initialFingerprint
filesFromIndex(location).foreach(fingerprint ^= createFingerprint(_, fb))
Some(fingerprint).filter(_ != initialFingerprint)
}

/**
Expand Down Expand Up @@ -179,9 +178,11 @@ class DefaultFileBasedRelation(spark: SparkSession, override val plan: LogicalRe
}
}

private def fingerprint(fileStatus: FileStatus): String = {
fileStatus.getLen.toString + fileStatus.getModificationTime.toString +
fileStatus.getPath.toString
private def createFingerprint(fileStatus: FileStatus, fb: FingerprintBuilder): Fingerprint = {
fb.add(fileStatus.getLen)
.add(fileStatus.getModificationTime)
.add(fileStatus.getPath.toString)
.build()
}

private def filesFromIndex(index: PartitioningAwareFileIndex): Seq[FileStatus] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ import org.apache.spark.sql.delta.files.TahoeLogFileIndex
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}

import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index.{Content, FileIdTracker, FileInfo, Hdfs, IndexLogEntry, Relation}
import com.microsoft.hyperspace.index.{Content, FileIdTracker, Hdfs, IndexLogEntry, Relation}
import com.microsoft.hyperspace.index.sources.default.DefaultFileBasedRelation
import com.microsoft.hyperspace.util.PathUtils
import com.microsoft.hyperspace.util.fingerprint.{Fingerprint, FingerprintBuilder}

/**
* Implementation for file-based relation used by [[DeltaLakeFileBasedSource]]
Expand All @@ -36,9 +37,9 @@ class DeltaLakeRelation(spark: SparkSession, override val plan: LogicalRelation)
/**
* Computes the signature of the current relation.
*/
override def signature: String = plan.relation match {
override def signature(fb: FingerprintBuilder): Option[Fingerprint] = plan.relation match {
case HadoopFsRelation(location: TahoeLogFileIndex, _, _, _, _, _) =>
location.tableVersion + location.path.toString
Some(fb.add(location.tableVersion).add(location.path.toString).build())
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package com.microsoft.hyperspace.index.sources.iceberg

import java.util.Locale

import collection.JavaConverters._
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.iceberg.{FileScanTask, Schema, Table}
Expand All @@ -28,15 +26,16 @@ import org.apache.iceberg.spark.SparkSchemaUtil
import org.apache.iceberg.spark.source.IcebergSource
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex}
import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.types.StructType

import com.microsoft.hyperspace.index.{Content, FileIdTracker, FileInfo, Hdfs, IndexConstants, Relation}
import com.microsoft.hyperspace.index.{Content, FileIdTracker, Hdfs, IndexConstants, Relation}
import com.microsoft.hyperspace.index.sources.FileBasedRelation
import com.microsoft.hyperspace.util.PathUtils
import com.microsoft.hyperspace.util.fingerprint.{Fingerprint, FingerprintBuilder}

/**
* Implementation for file-based relation used by [[IcebergFileBasedSource]]
Expand All @@ -47,11 +46,11 @@ class IcebergRelation(spark: SparkSession, override val plan: DataSourceV2Relati
/**
* Computes the signature of the current relation.
*/
override def signature: String = plan.source match {
override def signature(fb: FingerprintBuilder): Option[Fingerprint] = plan.source match {
case _: IcebergSource =>
val table = loadIcebergTable
val snapshotId = plan.options.getOrElse("snapshot-id", table.currentSnapshot().snapshotId())
snapshotId + table.location()
Some(fb.add(snapshotId.toString).add(table.location).build())
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation,
import org.apache.spark.sql.types.StructType

import com.microsoft.hyperspace.index.{FileIdTracker, FileInfo, IndexConstants, IndexLogEntry, Relation}
import com.microsoft.hyperspace.util.fingerprint.{Fingerprint, FingerprintBuilder}

/**
* ::Experimental::
Expand Down Expand Up @@ -57,8 +58,11 @@ trait FileBasedRelation extends SourceRelation {
*
* This API is used when the signature of source needs to be computed, e.g., creating an index,
* computing query plan's signature, etc.
*
* If it is not possible to compute the signature (e.g. there are no files left),
* the implementation might return None.
*/
def signature: String
def signature(fb: FingerprintBuilder): Option[Fingerprint]

/**
* FileStatus list for all source files that the current relation references to.
Expand Down
35 changes: 0 additions & 35 deletions src/main/scala/com/microsoft/hyperspace/util/HashingUtils.scala

This file was deleted.

Loading

0 comments on commit b6d584d

Please sign in to comment.