Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #183 - Support Parsing of MultiPolygon in WKT Format #195

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ scalaVersion := "2.11.8"

crossScalaVersions := Seq("2.11.8")

sparkVersion := "2.2.0"
sparkVersion := "2.2.1"

scalacOptions += "-optimize"

Expand Down
134 changes: 97 additions & 37 deletions src/main/scala/magellan/WKTParser.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,73 +23,126 @@ import scala.collection.mutable.ListBuffer

object WKTParser {

def whitespace: P[String] = P(" ") map {_.toString}
def whitespace: P[String] = P(" ") map {
_.toString
}

val posInt: P[String] = P(CharIn('0'to'9').rep(1).!)
val posInt: P[String] = P(CharIn('0' to '9').rep(1).!)

val negInt: P[String] = P("-" ~ posInt) map {"-" + _}
val negInt: P[String] = P("-" ~ posInt) map {
"-" + _
}

val int: P[String] = P(posInt | negInt)

val float: P[String] = P(int ~ P(".") ~ posInt) map { case (x , y) => (x + "." + y)}
val float: P[String] = P(int ~ P(".") ~ posInt) map { case (x, y) => (x + "." + y) }

val number = P(float | int) map {_.toDouble}
val number = P(float | int) map {
_.toDouble
}

def point0: P[String] = P("""POINT""") map {_.toString}
def multi0: P[String] = P("""MULTI""") map {
_.toString
}

def empty0: P[String] = P("""EMPTY""") map {_.toString}
def point0: P[String] = P("""POINT""") map {
_.toString
}

def comma: P[String] = P(",") map {_.toString}
def empty0: P[String] = P("""EMPTY""") map {
_.toString
}

def leftBrace: P[String] = P("(") map {_.toString}
def comma: P[String] = P(",") map {
_.toString
}

def rightBrace: P[String] = P(")") map {_.toString}
def leftBrace: P[String] = P("(") map {
_.toString
}

def coords: P[Point] = P(number ~ whitespace ~ number) map {
def rightBrace: P[String] = P(")") map {
_.toString
}

def coords: P[Point] = P(number ~ whitespace ~ number) map {
case (x, _, y) => Point(x, y)
}

def ring: P[Array[Point]] = P(leftBrace ~ coords.rep(1, (comma ~ whitespace | comma)) ~ rightBrace) map {
case (_, x ,_) => x.toArray
case (_, x, _) => x.toArray
}

def pointCoords: P[Point] = P(leftBrace ~ coords ~ rightBrace) map {
case (_, x, _) => x
}

def point: P[Point] = P(point0 ~ whitespace.? ~ pointCoords) map {
case (_, _, p) => p
}

def point: P[Point] = P(point0 ~ whitespace.? ~ leftBrace ~ coords ~ rightBrace) map {
case (_ , _, _, p, _) => p
def multipoint: P[Array[Point]] = P(multi0 ~ point0 ~ whitespace.? ~ leftBrace ~ (pointCoords.rep(1, (comma ~ whitespace | comma)) | coords.rep(1, (comma ~ whitespace | comma))) ~ rightBrace) map {
case (_, _, _, _, p, _) => p.toArray
}

def pointEmpty: P[Shape] = P(point0 ~ whitespace ~ empty0) map {_ => NullShape}
def pointEmpty: P[Shape] = P(point0 ~ whitespace ~ empty0) map { _ => NullShape }

def linestring0: P[String] = P("""LINESTRING""") map {_.toString}
def linestring0: P[String] = P("""LINESTRING""") map {
_.toString
}

def linestring: P[PolyLine] = P(linestring0 ~ whitespace.? ~ ring) map {
case (_ , _, x) => PolyLine(Array(0), x)
case (_, _, x) => PolyLine(Array(0), x)
}

def polygon0: P[String] = P("""POLYGON""") map {_.toString}
def multilinestring: P[Array[PolyLine]] = P(multi0 ~ linestring0 ~ whitespace.? ~ leftBrace ~ ring.rep(1, (comma ~ whitespace | comma)) ~ rightBrace) map {
case (_, _, _, _, p, _) => p.map(points => PolyLine(Array(0), points)).toArray
}

def polygonWithoutHoles: P[Polygon] =
P(polygon0 ~ whitespace.? ~ P("((") ~ coords.rep(1, (comma ~ whitespace | comma)) ~ P("))")) map {
case (_ , _, x ) => Polygon(Array(0), x.toArray)
def polygon0: P[String] = P("""POLYGON""") map {
_.toString
}

def polygonWithHoles: P[Polygon] =
P(polygon0 ~ whitespace.? ~ P("(") ~ ring.rep(1, (comma ~ whitespace | comma)) ~ P(")")) map {
case (_ , _, x) =>
val indices = ListBuffer[Int]()
val points = ListBuffer[Point]()
var prev = 0
var i = 0
val numRings = x.size
while (i < numRings) {
indices.+= (prev)
prev += x(i).length
points.++=(x(i))
i += 1
}
Polygon(indices.toArray, points.toArray)
def polygonWithoutHoles: P[Polygon] = polygon


def polygonWithHoles: P[Polygon] = polygon

def polygon: P[Polygon] =
P(polygon0 ~ whitespace.? ~ polygonCoords) map {
case (_, _, x) => x
}

def polygonCoords: P[Polygon] =
P(P("(") ~ ring.rep(1, (comma ~ whitespace | comma)) ~ P(")")) map {
case (x) =>
val indices = ListBuffer[Int]()
val points = ListBuffer[Point]()
var prev = 0
var i = 0
val numRings = x.size
while (i < numRings) {
indices.+=(prev)
prev += x(i).length
points.++=(x(i))
i += 1
}
Polygon(indices.toArray, points.toArray)
}


def multipolygon: P[Array[Polygon]] = P(multi0 ~ polygon0 ~ whitespace.? ~ leftBrace ~ polygonCoords.rep(1, (comma ~ whitespace | comma)) ~ rightBrace) map {
case (_, _, _, _, p, _) => p.toArray
}

def expr: P[Shape] = P(point | pointEmpty | linestring | polygon ~ End)

def singleShapeArray: P[Array[Shape]] = P(point | pointEmpty | linestring | polygon) map {
case (p) => Array(p)
}

def expr: P[Shape] = P(point | pointEmpty | linestring | polygonWithoutHoles | polygonWithHoles ~ End)

def exprArray: P[Array[_ <: Shape]] = P(singleShapeArray | multipoint | multilinestring | multipolygon ~ End)

def parseAll(text: String): Shape = {
expr.parse(text) match {
Expand All @@ -98,4 +151,11 @@ object WKTParser {
}
}

def parseAllArray(text: String): Array[_ <: Shape] = {
exprArray.parse(text) match {
case Success(value, _) => value
case Failure(parser, index, stack) => throw new RuntimeException(stack.toString)
}
}

}
4 changes: 3 additions & 1 deletion src/main/scala/magellan/dsl/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,13 @@ package object dsl {

def withinRange(origin: Point, radius: Double): Column = Column(WithinCircleRange(c.expr, origin, radius))
}

implicit def point(x: Column, y: Column) = Column(PointConverter(x.expr, y.expr))

implicit def wkt(x: Column) = Column(WKT(x.expr))

implicit def wktArray(x: Column) = Column(WKTArray(x.expr))

implicit class DslDataset[T](c: Dataset[T]) {
def df: Dataset[T] = c

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.expressions

import magellan.catalyst.MagellanExpression
import magellan.index.{ZOrderCurve, ZOrderCurveIndexer}
import magellan.{Point, Relate, Shape}
import magellan.{Point, Relate, Shape, WKTParser}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.util.GenericArrayData
Expand Down Expand Up @@ -156,6 +156,97 @@ case class WKT(override val child: Expression)

}

/**
* Extracts shapes from WKT Text.
*
* @param child
*/
case class WKTArray(override val child: Expression)
extends UnaryExpression with MagellanExpression {

private val pointUDT = new PointUDT()
private val lineUDT = new LineUDT()
private val polyLineUDT = new PolyLineUDT()
private val polygonUDT = new PolygonUDT()

override protected def nullSafeEval(input: Any): Any = {

val text = input.asInstanceOf[UTF8String]
val shapes = WKTParser.parseAllArray(text.toString)
new GenericArrayData(
shapes.map(s => {
val (udt: UserDefinedType[Shape], indexVar) = s.getType() match {
case 1 => (pointUDT, 0)
case 2 => (lineUDT, 1)
case 3 => (polyLineUDT, 1)
case 5 => (polygonUDT, 2)
}

val row = new GenericInternalRow(3)
row.update(indexVar, udt.serialize(s))
row
})
)
}

override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val serializersVar = ctx.freshName("serializers")

ctx.addMutableState(classOf[java.util.HashMap[Integer, UserDefinedType[Shape]]].getName, s"$serializersVar",
s"$serializersVar = new java.util.HashMap<Integer, org.apache.spark.sql.types.UserDefinedType<magellan.Shape>>() ;" +
s"$serializersVar.put(1, new org.apache.spark.sql.types.PointUDT());" +
s"$serializersVar.put(2, new org.apache.spark.sql.types.LineUDT());" +
s"$serializersVar.put(3, new org.apache.spark.sql.types.PolyLineUDT());" +
s"$serializersVar.put(5, new org.apache.spark.sql.types.PolygonUDT());" +
"")

val childTypeVar = ctx.freshName("childType")
val childShapeVar = ctx.freshName("childShape")
val serializerVar = ctx.freshName("serializer")

val indexVar = ctx.freshName("index")
val resultVar = ctx.freshName("result")

val j = ctx.freshName("j")
val n = ctx.freshName("n")
val values = ctx.freshName("values")
val arrayClass = classOf[GenericArrayData].getName
val internalRowClass = classOf[InternalRow].getName
val genericInternalRowClass = classOf[GenericInternalRow].getName

nullSafeCodeGen(ctx, ev, (c1) => {
s"" +
s"String text = ${c1}.toString();\n" +
s"magellan.Shape[] $childShapeVar = (magellan.Shape[]) " +
s"magellan.WKTParser.parseAllArray(text);\n" +
s"final int $n = $childShapeVar.length;\n" +
s"final $internalRowClass[] $values = new $internalRowClass[$n];\n" +
s"for (int $j = 0; $j < $n; $j++) {\n" +
s"Integer $childTypeVar = $childShapeVar[$j].getType();\n" +
s"org.apache.spark.sql.types.UserDefinedType<magellan.Shape> $serializerVar =" +
s" (org.apache.spark.sql.types.UserDefinedType<magellan.Shape>) $serializersVar.get($childTypeVar);\n" +
s"Integer $indexVar = -1; \n" +
s"if ($childTypeVar == 1) {$indexVar = 0;}\n" +
s"else if ($childTypeVar == 2 || $childTypeVar == 3) {$indexVar = 1;} \n" +
s"else {$indexVar = 2;} \n" +
s"$genericInternalRowClass $resultVar = new $genericInternalRowClass(3);\n" +
s"$resultVar.update($indexVar, $serializerVar.serialize($childShapeVar[$j])); \n" +
s"$values[$j] = $resultVar;\n" +
s"}\n" +
s"${ev.value} = new $arrayClass($values); \n"
})
}

override def dataType: DataType = {
ArrayType(
StructType(List(StructField("point", new PointUDT(), true),
StructField("polyline", new PolyLineUDT(), true),
StructField("polygon", new PolygonUDT(), true))
))
}

}

/**
* Geohash Indexes a given shape expression to a specified precision.
*
Expand Down
Loading