Skip to content

Commit

Permalink
Feature/gdal datasources (#315)
Browse files Browse the repository at this point in the history
* Added custom grid

* Fixed missing IndexID

* Changed index system parameter type

* Fixed snapshot version in python

* Passing IndexSystem instead of IndexSystemName

* Updated python path loader

* Fixed CustomIndexSystem polyfill for empty polygon

* Fixed mosaic context h3

* Fixed mosaic unite tests for custom grid

* Added K-ring and K-loop for custom grid

* Fixed cell center offset for custom grid

* Fixed lon lat order in custom grid

* Fixed grid offset for cell geometries

* Fixed KNN tests for custom grid

* Fixed buffer radius custom grid

* Reduced KNN iterations to speed up tests

* Fixed python test

* Fixed python import

* Changed custom grid conf structure

* Fixed unit tests for custom grid

* Refactored IndexSystemFactory

* Fixed unit test for KNN

* Added custom grid for python

* Fixed R bindings for custom grid

* Updated docs for custom grid

* Updated docs for custom grid

* Fix KNN tests.

* Fixed index Y splits

* Fixed index KNN resolution

* Fixed double build for feature branch

* Fixed grid root cell size calculation

* Fixed Ring Neighbours tests

* Change Raster to Grid reader to use Centroid of raster pixel if pixel area >= cell area.

* Fix variable naming issue.

* Add grid_distance expression.
Improve metadata extraction for rasters.
Improve raster subdataset extraction.

* Fix broken tests.

* Fix band_id being dropped in raster to grid reader.

* Fix broken k interpolation.

* Improve code coverage.

* Fix merge conflicts.

* Merge main and fix conflicts.

* Merge custom grid and fix conflicts.
Fix grid distance functions.
Fix grid distance tests.

* Fix Kloop broken test.

* Defaulted pixel reference to centroid

---------

Co-authored-by: Erni Durdevic <[email protected]>
  • Loading branch information
Milos Colic and edurdevic authored Mar 20, 2023
1 parent 74a55e9 commit 2e23279
Show file tree
Hide file tree
Showing 20 changed files with 299 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -412,21 +412,6 @@ object BNGIndexSystem extends IndexSystem(StringType) with Serializable {
}
}

private def encode(eLetter: Int, nLetter: Int, eBin: Int, nBin: Int, quadrant: Int, nPositions: Int, resolution: Int): Long = {
val idPlaceholder = math.pow(10, 5 + 2 * nPositions - 2) // 1(##)(##)(#...#)(#...#)(#)
val eLetterShift = math.pow(10, 3 + 2 * nPositions - 2) // (##)(##)(#...#)(#...#)(#)
val nLetterShift = math.pow(10, 1 + 2 * nPositions - 2) // (##)(#...#)(#...#)(#)
val eShift = math.pow(10, nPositions) // (#...#)(#...#)(#)
val nShift = 10
val id =
if (resolution == -1) {
(idPlaceholder + eLetter * eLetterShift) / 100 + quadrant
} else {
idPlaceholder + eLetter * eLetterShift + nLetter * nLetterShift + eBin * eShift + nBin * nShift + quadrant
}
id.toLong
}

/**
* Constructs a geometry representing the index tile corresponding to
* provided index id.
Expand Down Expand Up @@ -526,4 +511,33 @@ object BNGIndexSystem extends IndexSystem(StringType) with Serializable {

override def getResolutionStr(resolution: Int): String = resolutionMap.find(_._2 == resolution).map(_._1).getOrElse("")

override def distance(cellId: Long, cellId2: Long): Long = {
val digits1 = indexDigits(cellId)
val digits2 = indexDigits(cellId2)
val resolution1 = getResolution(digits1)
val resolution2 = getResolution(digits2)
val edgeSize = getEdgeSize(math.min(resolution1, resolution2))
val x1 = getX(digits1, edgeSize)
val x2 = getX(digits2, edgeSize)
val y1 = getY(digits1, edgeSize)
val y2 = getY(digits2, edgeSize)
// Manhattan distance with edge size precision
math.abs((x1 - x2) / edgeSize) + math.abs((y1 - y2) / edgeSize)
}

private def encode(eLetter: Int, nLetter: Int, eBin: Int, nBin: Int, quadrant: Int, nPositions: Int, resolution: Int): Long = {
val idPlaceholder = math.pow(10, 5 + 2 * nPositions - 2) // 1(##)(##)(#...#)(#...#)(#)
val eLetterShift = math.pow(10, 3 + 2 * nPositions - 2) // (##)(##)(#...#)(#...#)(#)
val nLetterShift = math.pow(10, 1 + 2 * nPositions - 2) // (##)(#...#)(#...#)(#)
val eShift = math.pow(10, nPositions) // (#...#)(#...#)(#)
val nShift = 10
val id =
if (resolution == -1) {
(idPlaceholder + eLetter * eLetterShift) / 100 + quadrant
} else {
idPlaceholder + eLetter * eLetterShift + nLetter * nLetterShift + eBin * eShift + nBin * nShift + quadrant
}
id.toLong
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ case class CustomIndexSystem(conf: GridConf) extends IndexSystem(LongType) with
* @return
* An instance of [[MosaicGeometry]] corresponding to index.
*/
//noinspection DuplicatedCode
// noinspection DuplicatedCode
override def indexToGeometry(index: Long, geometryAPI: GeometryAPI): MosaicGeometry = {

val cellNumber = getCellPosition(index)
Expand Down Expand Up @@ -287,6 +287,20 @@ case class CustomIndexSystem(conf: GridConf) extends IndexSystem(LongType) with
conf.rootCellCountY * Math.pow(conf.cellSplits, resolution).toLong
}

override def distance(cellId: Long, cellId2: Long): Long = {
val resolution1 = getCellResolution(cellId)
val resolution2 = getCellResolution(cellId2)
val edgeSizeX = getCellWidth(resolution1)
val edgeSizeY = getCellHeight(resolution1)
val x1 = getCellCenterX(getCellPositionX(cellId, resolution1), resolution1)
val x2 = getCellCenterX(getCellPositionX(cellId2, resolution2), resolution2)
val y1 = getCellCenterY(getCellPositionY(cellId, resolution1), resolution1)
val y2 = getCellCenterY(getCellPositionY(cellId2, resolution2), resolution2)
// Manhattan distance with edge size precision
val distance = math.abs((x1 - x2) / edgeSizeX) + math.abs((y1 - y2) / edgeSizeY)
distance.toLong
}

private def getCellCenterX(cellPositionX: Long, resolution: Int) = {
val cellWidth = getCellWidth(resolution)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ object H3IndexSystem extends IndexSystem(LongType) with Serializable {
override def kLoop(index: Long, n: Int): Seq[Long] = {
// HexRing crashes in case of pentagons.
// Ensure a KRing fallback in said case.
require(index >= 0L)
Try(
h3.hexRing(index, n).asScala.map(_.toLong)
).getOrElse(
Expand Down Expand Up @@ -215,4 +216,6 @@ object H3IndexSystem extends IndexSystem(LongType) with Serializable {
h3.geoToH3(geo.lat, geo.lng, h3.h3GetResolution(id))
}

override def distance(cellId: Long, cellId2: Long): Long = h3.h3Distance(cellId, cellId2).toLong

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import org.apache.spark.unsafe.types.UTF8String
*/
abstract class IndexSystem(var cellIdType: DataType) extends Serializable {

def distance(cellId: Long, cellId2: Long): Long

def getCellIdDataType: DataType = cellIdType

def setCellIdDataType(dataType: DataType): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import org.locationtech.proj4j.CRSFactory
import java.io.File
import java.nio.file.{Files, Paths}
import java.nio.file.StandardCopyOption.REPLACE_EXISTING
import java.util.Locale
import scala.collection.JavaConverters.dictionaryAsScalaMapConverter
import scala.util.Try

Expand All @@ -19,21 +20,37 @@ case class MosaicRasterGDAL(raster: Dataset, path: String, memSize: Long) extend

val crsFactory: CRSFactory = new CRSFactory

override def metadata: Map[String, String] =
Option(raster.GetMetadata_Dict)
.map(_.asScala.toMap.asInstanceOf[Map[String, String]])
override def metadata: Map[String, String] = {
Option(raster.GetMetadataDomainList())
.map(_.toArray)
.map(domain =>
domain
.map(domainName =>
Option(raster.GetMetadata_Dict(domainName.toString))
.map(_.asScala.toMap.asInstanceOf[Map[String, String]])
.getOrElse(Map.empty[String, String])
)
.reduceOption(_ ++ _).getOrElse(Map.empty[String, String])
)
.getOrElse(Map.empty[String, String])

override def subdatasets: Map[String, String] =
Option(raster.GetMetadata_List("SUBDATASETS"))
.map(
_.toArray
.map(_.toString)
.grouped(2)
.map({ case Array(p, d) => (p.split("=").last, d.split("=").last) })
.toMap
)
}

override def subdatasets: Map[String, String] = {
val subdatasetsMap = Option(raster.GetMetadata_Dict("SUBDATASETS"))
.map(_.asScala.toMap.asInstanceOf[Map[String, String]])
.getOrElse(Map.empty[String, String])
val keys = subdatasetsMap.keySet
keys.flatMap(key =>
if (key.toUpperCase(Locale.ROOT).contains("NAME")) {
val path = subdatasetsMap(key)
Seq(
key -> path.split(":").last,
path.split(":").last -> path
)
} else Seq(key -> subdatasetsMap(key))
).toMap
}

override def SRID: Int = {
Try(crsFactory.readEpsgFromParameters(proj4String))
Expand Down Expand Up @@ -82,7 +99,10 @@ case class MosaicRasterGDAL(raster: Dataset, path: String, memSize: Long) extend

def spatialRef: SpatialReference = raster.GetSpatialRef()

override def cleanUp(): Unit = {/** Nothing to clean up = NOOP */}
override def cleanUp(): Unit = {

/** Nothing to clean up = NOOP */
}

override def transformBands[T](f: MosaicRasterBand => T): Seq[T] = for (i <- 1 to numBands) yield f(getBand(i))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,16 @@ class RasterAsGridReader(sparkSession: SparkSession) extends MosaicDataFrameRead
col("raster")
)
.select(
explode(col("grid_measures")).alias("grid_measures"),
col("raster")
col("raster"),
col("band_id"),
explode(col("grid_measures")).alias("grid_measures")
)
.select(
col("band_id"),
col("grid_measures").getItem("cellID").alias("cell_id"),
col("grid_measures").getItem("measure").alias("measure")
)
.groupBy("cell_id")
.groupBy("band_id", "cell_id")
.agg(avg("measure").alias("measure"))

kRingResample(loadedDf, config)
Expand Down Expand Up @@ -170,8 +172,8 @@ class RasterAsGridReader(sparkSession: SparkSession) extends MosaicDataFrameRead
rasterDf
.withColumn("origin_cell_id", col("cell_id"))
.withColumn("cell_id", explode(grid_cellkring(col("origin_cell_id"), k)))
.withColumn("weight", lit(k + 1) - expr("h3_distance(origin_cell_id, cell_id)"))
.groupBy("cell_id")
.withColumn("weight", lit(k + 1) - grid_distance(col("origin_cell_id"), col("cell_id)")))
.groupBy("band_id", "cell_id")
.agg(weighted_sum("measure", "weight"))
} else {
rasterDf
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.databricks.labs.mosaic.expressions.index

import com.databricks.labs.mosaic.core.geometry.api.GeometryAPI
import com.databricks.labs.mosaic.core.index.IndexSystem
import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression, ExpressionInfo, NullIntolerant}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types._

case class GridDistance(cellId: Expression, cellId2: Expression, indexSystem: IndexSystem, geometryAPIName: String)
extends BinaryExpression
with NullIntolerant
with CodegenFallback {

val geometryAPI: GeometryAPI = GeometryAPI(geometryAPIName)

override def right: Expression = cellId2

override def left: Expression = cellId

/** Expression output DataType. */
override def dataType: DataType = LongType

override def toString: String = s"grid_distance($cellId, $cellId2)"

/** Overridden to ensure [[Expression.sql]] is properly formatted. */
override def prettyName: String = "grid_distance"

/**
* Generates a set of indices corresponding to kring call over the input
* cell id.
*
* @param input1
* Any instance containing the cell id.
* @param input2
* Any instance containing the k.
* @return
* A set of indices.
*/
// noinspection DuplicatedCode
override def nullSafeEval(input1: Any, input2: Any): Any = {
val cellId = indexSystem.formatCellId(input1, LongType).asInstanceOf[Long]
val cellId2 = indexSystem.formatCellId(input2, LongType).asInstanceOf[Long]
indexSystem.distance(cellId, cellId2)
}

override def makeCopy(newArgs: Array[AnyRef]): Expression = {
val asArray = newArgs.take(2).map(_.asInstanceOf[Expression])
val res = GridDistance(asArray(0), asArray(1), indexSystem, geometryAPIName)
res.copyTagsFrom(this)
res
}

override protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): Expression =
makeCopy(Array[AnyRef](newLeft, newRight))

}

object GridDistance {

/** Entry to use in the function registry. */
def registryExpressionInfo(db: Option[String]): ExpressionInfo =
new ExpressionInfo(
classOf[GridDistance].getCanonicalName,
db.orNull,
"grid_distance",
"_FUNC_(cellId, cellId2) - Returns k distance for given cells.",
"",
"""
| Examples:
| > SELECT _FUNC_(a, b);
| 10
| """.stripMargin,
"",
"collection_funcs",
"1.0",
"",
"built-in"
)

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.databricks.labs.mosaic.expressions.raster.base

import com.databricks.labs.mosaic.core.geometry.api.GeometryAPI
import com.databricks.labs.mosaic.core.index.{IndexSystem, IndexSystemFactory}
import com.databricks.labs.mosaic.core.raster.{MosaicRaster, MosaicRasterBand}
import com.databricks.labs.mosaic.expressions.raster.RasterToGridType
Expand Down Expand Up @@ -39,6 +40,7 @@ abstract class RasterToGridExpression[T <: Expression: ClassTag, P](

/** The index system to be used. */
val indexSystem: IndexSystem = IndexSystemFactory.getIndexSystem(expressionConfig.getIndexSystem)
val geometryAPI: GeometryAPI = GeometryAPI(expressionConfig.getGeometryAPI)

/**
* It projects the pixels to the grid and groups by the results so that the
Expand Down Expand Up @@ -80,8 +82,11 @@ abstract class RasterToGridExpression[T <: Expression: ClassTag, P](
def valuesCombiner(values: Seq[Double]): P

private def pixelTransformer(gt: Seq[Double], resolution: Int)(x: Int, y: Int, value: Double): (Long, Double) = {
val xGeo = gt(0) + x * gt(1) + y * gt(2)
val yGeo = gt(3) + x * gt(4) + y * gt(5)
val offset = 0.5 // This centers the point to the pixel centroid
val xOffset = offset + x
val yOffset = offset + y
val xGeo = gt(0) + xOffset * gt(1) + yOffset * gt(2)
val yGeo = gt(3) + xOffset * gt(4) + yOffset * gt(5)
val cellID = indexSystem.pointToIndex(xGeo, yGeo, resolution)
(cellID, value)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class MosaicContext(indexSystem: IndexSystem, geometryAPI: GeometryAPI, rasterAP
aliasFunction(registry, "grid_longlatascellid", dbName, "h3_longlatash3", None)
aliasFunction(registry, "grid_polyfill", dbName, "h3_polyfillash3", None)
aliasFunction(registry, "grid_boundaryaswkb", dbName, "h3_boundaryaswkb", None)
aliasFunction(registry, "grid_distance", dbName, "h3_distance", None)
}

def aliasFunction(
Expand Down Expand Up @@ -336,6 +337,12 @@ class MosaicContext(indexSystem: IndexSystem, geometryAPI: GeometryAPI, rasterAP
IndexGeometry.registryExpressionInfo(database),
(exprs: Seq[Expression]) => IndexGeometry(exprs(0), Literal("WKB"), indexSystem, geometryAPI.name)
)

registry.registerFunction(
FunctionIdentifier("grid_distance", database),
GridDistance.registryExpressionInfo(database),
(exprs: Seq[Expression]) => GridDistance(exprs(0), exprs(1), indexSystem, geometryAPI.name)
)
}

registry.registerFunction(
Expand Down Expand Up @@ -624,6 +631,7 @@ class MosaicContext(indexSystem: IndexSystem, geometryAPI: GeometryAPI, rasterAP
/** IndexSystem Specific */

/** IndexSystem and GeometryAPI Specific methods */
def grid_distance(cell1: Column, cell2: Column): Column = ColumnAdapter(GridDistance(cell1.expr, cell2.expr, indexSystem, geometryAPI.name))
def grid_tessellateexplode(geom: Column, resolution: Column): Column = grid_tessellateexplode(geom, resolution, lit(true))
def grid_tessellateexplode(geom: Column, resolution: Int): Column = grid_tessellateexplode(geom, lit(resolution), lit(true))
def grid_tessellateexplode(geom: Column, resolution: Int, keepCoreGeometries: Boolean): Column =
Expand Down
Loading

0 comments on commit 2e23279

Please sign in to comment.