Skip to content

Commit

Permalink
If configuration property is set to true then sort output on Z-order …
Browse files Browse the repository at this point in the history
…value.

Signed-off-by: Malte Velin <[email protected]>
  • Loading branch information
maltevelin committed Dec 28, 2024
1 parent 59b0449 commit 3644342
Showing 1 changed file with 11 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,13 @@ trait SpaceFillingCurveClustering extends MultiDimClustering {
val conf = df.sparkSession.sessionState.conf
val numRanges = conf.getConf(DeltaSQLConf.MDC_NUM_RANGE_IDS)
val addNoise = conf.getConf(DeltaSQLConf.MDC_ADD_NOISE)
val sort = conf.getConf(DeltaSQLConf.MDC_SORT_WITHIN_PARTITIONS)

val cols = colNames.map(df(_))
val mdcCol = getClusteringExpression(cols, numRanges)
val repartitionKeyColName = s"${UUID.randomUUID().toString}-rpKey1"

var repartitionedDf = if (addNoise) {
val repartitionedDf = if (addNoise) {
val randByteColName = s"${UUID.randomUUID().toString}-rpKey2"
val randByteCol = randomizationExpressionOpt.getOrElse((rand() * 255 - 128).cast(ByteType))
df.withColumn(repartitionKeyColName, mdcCol).withColumn(randByteColName, randByteCol)
Expand All @@ -90,7 +91,15 @@ trait SpaceFillingCurveClustering extends MultiDimClustering {
.repartitionByRange(approxNumPartitions, col(repartitionKeyColName))
}

repartitionedDf.drop(repartitionKeyColName)
val optionallySortedDf = if (sort) {
repartitionedDf.sortWithinPartitions(repartitionKeyColName)
}
else {
repartitionedDf
}

optionallySortedDf.drop(repartitionKeyColName)

}
}

Expand Down

0 comments on commit 3644342

Please sign in to comment.