From 14012235c203a2256c8a208fa2c1e9b0c8e3af10 Mon Sep 17 00:00:00 2001 From: stevebuckingham Date: Fri, 5 May 2017 14:11:40 +0100 Subject: [PATCH 1/5] Update Python classes for Point, Polygon and PolyLine to work with up to date Scala classes. --- python/magellan/types.py | 525 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 514 insertions(+), 11 deletions(-) diff --git a/python/magellan/types.py b/python/magellan/types.py index ef3e3ff..e28ba30 100644 --- a/python/magellan/types.py +++ b/python/magellan/types.py @@ -20,7 +20,7 @@ from itertools import izip from pyspark import SparkContext -from pyspark.sql.types import DataType, UserDefinedType, StructField, StructType, \ +from pyspark.sql.types import DataType, UserDefinedType, Row, StructField, StructType, \ ArrayType, DoubleType, IntegerType __all__ = ['Point'] @@ -69,14 +69,16 @@ def scalaUDT(cls): """ The class name of the paired Scala UDT. """ - return "magellan.PointUDT" + return "org.apache.spark.sql.types.PointUDT" def serialize(self, obj): """ Converts the a user-type object into a SQL datum. """ if isinstance(obj, Point): - return obj + #pnt = Row(IntegerType(), DoubleType(),DoubleType(),DoubleType(),DoubleType(),DoubleType(),DoubleType() ) + #return pnt("udt",obj.x,obj.y,obj.x,obj.y,obj.x,obj.y) + return 1,obj.x, obj.y,obj.x, obj.y,obj.x, obj.y else: raise TypeError("cannot serialize %r of type %r" % (obj, type(obj))) @@ -87,9 +89,9 @@ def deserialize(self, datum): if isinstance(datum, Point): return datum else: - assert len(datum) == 2, \ - "PointUDT.deserialize given row with length %d but requires 2" % len(datum) - return Point(datum[0], datum[1]) + assert len(datum) == 7, \ + "PointUDT.deserialize given row with length %d but requires 7" % len(datum) + return Point(datum[5], datum[6]) def simpleString(self): return 'point' @@ -173,14 +175,19 @@ def scalaUDT(cls): """ The class name of the paired Scala UDT. """ - return "magellan.PolygonUDT" + return "org.apache.spark.sql.types.PolygonUDT" def serialize(self, obj): """ Converts the a user-type object into a SQL datum. """ if isinstance(obj, Polygon): - return obj + x_list = [] + y_list = [] + for p in obj.points: + x_list.append(p.x) + y_list.append(p.y) + return 5, min(x_list), min(y_list), max(x_list), max(y_list), obj.indices, x_list, y_list else: raise TypeError("cannot serialize %r of type %r" % (obj, type(obj))) @@ -190,10 +197,455 @@ def deserialize(self, datum): """ if isinstance(datum, Polygon): return datum + else: + assert len(datum) == 8, \ + "PolygonUDT.deserialize given row with length %d but requires 2" % len(datum) + return Polygon(datum[5], datum[6], datum[7], [self.pointUDT.deserialize(point) for point in zip(datum[6],datum[7])]) + + def simpleString(self): + return 'polygon' + + @classmethod + def fromJson(cls, json): + indices = json["indices"] + points = [PointUDT.fromJson(point) for point in json["points"]] + return Polygon(indices, points) + + +class Polygon(Shape): + """ + A polygon consists of one or more rings. A ring is a connected sequence of four or more points + that form a closed, non-self-intersecting loop. A polygon may contain multiple outer rings. + The order of vertices or orientation for a ring indicates which side of the ring is the interior + of the polygon. The neighborhood to the right of an observer walking along the ring + in vertex order is the neighborhood inside the polygon. + Vertices of rings defining holes in polygons are in a counterclockwise direction. + Vertices for a single, ringed polygon are, therefore, always in clockwise order. + The rings of a polygon are referred to as its parts. + >>> v = Polygon([0], [Point(1.0, 1.0), Point(1.0, -1.0), Point(1.0, 1.0)) + Point([-1.0,-1.0, 1.0, 1.0], [0], Point(1.0, 1.0), Point(1.0, -1.0), Point(1.0, 1.0)) + """ + + __UDT__ = PolygonUDT() + + def __init__(self, indices = [], points = []): + self.indices = indices + self.xcoordinates = [p.x for p in points] + self.ycoordinates = [p.y for p in points] + if points: + self.xmin = min(self.xcoordinates) + self.ymin = min(self.ycoordinates) + self.xmax = max(self.xcoordinates) + self.ymax = max(self.ycoordinates) + else: + self.xmin = None + self.ymin = None + self.xmax = None + self.ymax = None + self.boundingBox = BoundingBox(self.xmin, self.ymin, self.xmax, self.ymax) + self.size = len(points) + self.points = points + + def __str__(self): + inds = "[" + ",".join([str(i) for i in self.indices]) + "]" + pts = "[" + ",".join([str(v) for v in self.points]) + "]" + return "Polygon (" + ",".join((inds, pts)) + ")" + + def __repr__(self): + return self.__str__() + + def __reduce__(self): + return (Polygon, (self.indices, self.points)) + + @classmethod + def fromJson(cls, json): + indices = json["indices"] + points = [PointUDT.fromJson(point) for point in json["points"]] + return Polygon(indices, points) + + def jsonValue(self): + return {"type": "udt", + "pyClass": "magellan.types.PolygonUDT", + "class": "magellan.Polygon", + "sqlType": "magellan.Polygon"} + + def convert(self): + l = [] + l.extend(self.indices) + l.append(len(self.points)) + p = [] + for i,j in zip(l, l[1:]): + spoints = [(point.x, point.y) for point in self.points[i:j - 1]] + p.append(spoints) + + shell = p[0] + holes = p[1:] + return SPolygon(shell=shell, holes=holes) + + +class PolyLineUDT(UserDefinedType): + """User-defined type (UDT). + + .. note:: WARN: SpatialSDK Internal Use Only + """ + + pointUDT = PointUDT() + + @classmethod + def sqlType(cls): + """ + Underlying SQL storage type for this UDT. + """ + return PolyLine() + + @classmethod + def module(cls): + """ + The Python module of the UDT. + """ + return "magellan.types" + + @classmethod + def scalaUDT(cls): + """ + The class name of the paired Scala UDT. + """ + return "org.apache.spark.sql.types.PolyLineUDT" + + def serialize(self, obj): + """ + Converts the a user-type object into a SQL datum. + """ + if isinstance(obj, PolyLine): + x_list = [] + y_list = [] + for p in obj.points: + x_list.append(p.x) + y_list.append(p.y) + return 3, min(x_list), min(y_list), max(x_list), max(y_list), obj.indices, x_list, y_list + else: + raise TypeError("cannot serialize %r of type %r" % (obj, type(obj))) + + def deserialize(self, datum): + """ + Converts a SQL datum into a user-type object. + """ + if isinstance(datum, PolyLine): + return datum else: assert len(datum) == 2, \ + "PolyLineUDT.deserialize given row with length %d but requires 2" % len(datum) + return PolyLine(datum[0], [self.pointUDT.deserialize(point) for point in datum[1]]) + + def simpleString(self): + return 'polyline' + + @classmethod + def fromJson(cls, json): + indices = json["indices"] + points = [PointUDT.fromJson(point) for point in json["points"]] + return PolyLine(indices, points) + + +class PolyLine(Shape): + """ + A PolyLine is an ordered set of vertices that consists of one or more parts. + A part is a connected sequence of two or more points. + Parts may or may not be connected to one another. + Parts may or may not intersect one another + >>> v = PolyLine([0], [Point(1.0, 1.0), Point(1.0, -1.0), Point(1.0, 0.0)) + Point([0], Point(1.0, 1.0), Point(1.0, -1.0), Point(1.0, 0.0)) + """ + + __UDT__ = PolyLineUDT() + + def __init__(self, indices = [], points = []): + self.indices = indices + self.xcoordinates = [p.x for p in points] + self.ycoordinates = [p.y for p in points] + if points: + self.xmin = min(self.xcoordinates) + self.ymin = min(self.ycoordinates) + self.xmax = max(self.xcoordinates) + self.ymax = max(self.ycoordinates) + else: + self.xmin = None + self.ymin = None + self.xmax = None + self.ymax = None + self.boundingBox = BoundingBox(self.xmin, self.ymin, self.xmax, self.ymax) + self.size = len(points) + self.points = points + + def __str__(self): + inds = "[" + ",".join([str(i) for i in self.indices]) + "]" + pts = "[" + ",".join([str(v) for v in self.points]) + "]" + return "Polygon (" + ",".join((inds, pts)) + ")" + + def __repr__(self): + return self.__str__() + + def __reduce__(self): + return (PolyLine, (self.indices, self.points)) + + @classmethod + def fromJson(cls, json): + indices = json["indices"] + points = [PointUDT.fromJson(point) for point in json["points"]] + return PolyLine(indices, points) + + def jsonValue(self): + return {"type": "udt", + "pyClass": "magellan.types.PolyLineUDT", + "class": "magellan.PolyLine", + "sqlType": "magellan.PolyLine"} + + def convert(self): + l = [] + l.extend(self.indices) + l.append(len(self.points)) + p = [] + for i,j in zip(l, l[1:]): + spoints = [(point.x, point.y) for point in self.points[i:j - 1]] + p.append(LineString(spoints)) + return MultiLineString(p) + + +def _inbound_shape_converter(json_string): + j = json.loads(json_string) + shapeType = str(j["pyClass"]) # convert unicode to str + split = shapeType.rfind(".") + module = shapeType[:split] + shapeClass = shapeType[split+1:] + m = __import__(module, globals(), locals(), [shapeClass]) + UDT = getattr(m, shapeClass) + return UDT.fromJson(j) + +# This is used to unpickle a Row from JVM +def _create_row_inbound_converter(dataType): + return lambda *a: dataType.fromInternal(a) + +class BoundingBox(object): + + def __init__(self,xmin,ymin,xmax,ymax): + self.xmin = xmin + self.ymin = ymin + self.xmax = xmax + self.ymax = ymax + + def intersects(self, other): + if not other.xmin >= self.xmax and other.ymax >= self.min and other.ymax <= self.ymin and other.xmax <= self.xmin: + return True + else: + return False + +if __name__ == '__main__': + print Point(1,1) + print PointUDT(1,1) + +# +# Copyright 2015 Ram Sriharsha +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import json +import sys + +from itertools import izip + +from pyspark import SparkContext +from pyspark.sql.types import DataType, UserDefinedType, Row, StructField, StructType, \ + ArrayType, DoubleType, IntegerType + +__all__ = ['Point'] + +try: + from shapely.geometry import Point as SPoint + from shapely.geometry import Polygon as SPolygon + from shapely.geometry import LineString, MultiLineString + _have_shapely = True +except: + # No Shapely in environment, but that's okay + _have_shapely = False + + +class Shape(DataType): + + def convert(self): + raise NotImplementedError() + + def toShapely(self): + if _have_shapely: + return self.convert() + else: + raise TypeError("Cannot convert to Shapely type") + + +class PointUDT(UserDefinedType): + """User-defined type (UDT). + + .. note:: WARN: SpatialSDK Internal Use Only + """ + + @classmethod + def sqlType(cls): + return Point() + + @classmethod + def module(cls): + """ + The Python module of the UDT. + """ + return "magellan.types" + + @classmethod + def scalaUDT(cls): + """ + The class name of the paired Scala UDT. + """ + return "org.apache.spark.sql.types.PointUDT" + + def serialize(self, obj): + """ + Converts the a user-type object into a SQL datum. + """ + if isinstance(obj, Point): + #pnt = Row(IntegerType(), DoubleType(),DoubleType(),DoubleType(),DoubleType(),DoubleType(),DoubleType() ) + #return pnt("udt",obj.x,obj.y,obj.x,obj.y,obj.x,obj.y) + return 1,obj.x, obj.y,obj.x, obj.y,obj.x, obj.y + else: + raise TypeError("cannot serialize %r of type %r" % (obj, type(obj))) + + def deserialize(self, datum): + """ + Converts a SQL datum into a user-type object. + """ + if isinstance(datum, Point): + return datum + else: + assert len(datum) == 7, \ + "PointUDT.deserialize given row with length %d but requires 7" % len(datum) + return Point(datum[5], datum[6]) + + def simpleString(self): + return 'point' + + @classmethod + def fromJson(cls, json): + return Point(json['x'], json['y']) + + +class Point(Shape): + """ + A point is a zero dimensional shape. + The coordinates of a point can be in linear units such as feet or meters, + or they can be in angular units such as degrees or radians. + The associated spatial reference specifies the units of the coordinates. + In the case of a geographic coordinate system, the x-coordinate is the longitude + and the y-coordinate is the latitude. + + >>> v = Point(1.0, 2.0) + Point([1.0, 2.0]) + """ + + __UDT__ = PointUDT() + + def __init__(self, x = 0.0, y = 0.0): + self.x = x + self.y = y + + def __str__(self): + return "Point (" + str(self.x) + "," + str(self.y) + ")" + + def __repr__(self): + return self.__str__() + + def __unicode__(self): + return self.__str__() + + def __reduce__(self): + return (Point, (self.x, self.y)) + + def __eq__(self, other): + return isinstance(other, Point) and self.x == other.x and self.y == other.y + + @classmethod + def fromJson(cls, json): + return Point(json['x'], json['y']) + + def jsonValue(self): + return {"type": "udt", + "pyClass": "magellan.types.PointUDT", + "class": "magellan.PointUDT", + "sqlType": "magellan.Point"} + + def convert(self): + return SPoint(self.x, self.y) + + +class PolygonUDT(UserDefinedType): + """User-defined type (UDT). + + .. note:: WARN: SpatialSDK Internal Use Only + """ + pointUDT = PointUDT() + + @classmethod + def sqlType(cls): + """ + Underlying SQL storage type for this UDT. + """ + return Polygon() + + @classmethod + def module(cls): + """ + The Python module of the UDT. + """ + return "magellan.types" + + @classmethod + def scalaUDT(cls): + """ + The class name of the paired Scala UDT. + """ + return "org.apache.spark.sql.types.PolygonUDT" + + def serialize(self, obj): + """ + Converts the a user-type object into a SQL datum. + """ + if isinstance(obj, Polygon): + x_list = [] + y_list = [] + for p in obj.points: + x_list.append(p.x) + y_list.append(p.y) + return 5, min(x_list), min(y_list), max(x_list), max(y_list), obj.indices, x_list, y_list + else: + raise TypeError("cannot serialize %r of type %r" % (obj, type(obj))) + + def deserialize(self, datum): + """ + Converts a SQL datum into a user-type object. + """ + if isinstance(datum, Polygon): + return datum + else: + assert len(datum) == 8, \ "PolygonUDT.deserialize given row with length %d but requires 2" % len(datum) - return Polygon(datum[0], [self.pointUDT.deserialize(point) for point in datum[1]]) + return Polygon(datum[5], datum[6], datum[7], [self.pointUDT.deserialize(point) for point in zip(datum[6],datum[7])]) def simpleString(self): return 'polygon' @@ -223,6 +675,20 @@ class Polygon(Shape): def __init__(self, indices = [], points = []): self.indices = indices + self.xcoordinates = [p.x for p in points] + self.ycoordinates = [p.y for p in points] + if points: + self.xmin = min(self.xcoordinates) + self.ymin = min(self.ycoordinates) + self.xmax = max(self.xcoordinates) + self.ymax = max(self.ycoordinates) + else: + self.xmin = None + self.ymin = None + self.xmax = None + self.ymax = None + self.boundingBox = BoundingBox(self.xmin, self.ymin, self.xmax, self.ymax) + self.size = len(points) self.points = points def __str__(self): @@ -289,14 +755,19 @@ def scalaUDT(cls): """ The class name of the paired Scala UDT. """ - return "magellan.PolyLineUDT" + return "org.apache.spark.sql.types.PolyLineUDT" def serialize(self, obj): """ Converts the a user-type object into a SQL datum. """ if isinstance(obj, PolyLine): - return obj + x_list = [] + y_list = [] + for p in obj.points: + x_list.append(p.x) + y_list.append(p.y) + return 3, min(x_list), min(y_list), max(x_list), max(y_list), obj.indices, x_list, y_list else: raise TypeError("cannot serialize %r of type %r" % (obj, type(obj))) @@ -335,6 +806,20 @@ class PolyLine(Shape): def __init__(self, indices = [], points = []): self.indices = indices + self.xcoordinates = [p.x for p in points] + self.ycoordinates = [p.y for p in points] + if points: + self.xmin = min(self.xcoordinates) + self.ymin = min(self.ycoordinates) + self.xmax = max(self.xcoordinates) + self.ymax = max(self.ycoordinates) + else: + self.xmin = None + self.ymin = None + self.xmax = None + self.ymax = None + self.boundingBox = BoundingBox(self.xmin, self.ymin, self.xmax, self.ymax) + self.size = len(points) self.points = points def __str__(self): @@ -385,3 +870,21 @@ def _inbound_shape_converter(json_string): def _create_row_inbound_converter(dataType): return lambda *a: dataType.fromInternal(a) +class BoundingBox(object): + + def __init__(self,xmin,ymin,xmax,ymax): + self.xmin = xmin + self.ymin = ymin + self.xmax = xmax + self.ymax = ymax + + def intersects(self, other): + if not other.xmin >= self.xmax and other.ymax >= self.min and other.ymax <= self.ymin and other.xmax <= self.xmin: + return True + else: + return False + +if __name__ == '__main__': + print Point(1,1) + print PointUDT(1,1) + From b8a0f08ed95c971ed697df7ea64756256489a4fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Bu=C5=9Bkiewicz?= Date: Mon, 27 Nov 2017 16:57:44 +0100 Subject: [PATCH 2/5] Fix deserialization of Polygon type; remove duplicated code --- python/magellan/types.py | 456 +-------------------------------------- 1 file changed, 3 insertions(+), 453 deletions(-) diff --git a/python/magellan/types.py b/python/magellan/types.py index e28ba30..63b8dbe 100644 --- a/python/magellan/types.py +++ b/python/magellan/types.py @@ -17,7 +17,7 @@ import json import sys -from itertools import izip +from itertools import izip, repeat from pyspark import SparkContext from pyspark.sql.types import DataType, UserDefinedType, Row, StructField, StructType, \ @@ -200,7 +200,7 @@ def deserialize(self, datum): else: assert len(datum) == 8, \ "PolygonUDT.deserialize given row with length %d but requires 2" % len(datum) - return Polygon(datum[5], datum[6], datum[7], [self.pointUDT.deserialize(point) for point in zip(datum[6],datum[7])]) + return Polygon(datum[5], [self.pointUDT.deserialize(point) for point in zip(repeat(1), datum[6], datum[7], datum[6], datum[7], datum[6], datum[7])]) def simpleString(self): return 'polygon' @@ -437,454 +437,4 @@ def intersects(self, other): if not other.xmin >= self.xmax and other.ymax >= self.min and other.ymax <= self.ymin and other.xmax <= self.xmin: return True else: - return False - -if __name__ == '__main__': - print Point(1,1) - print PointUDT(1,1) - -# -# Copyright 2015 Ram Sriharsha -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import json -import sys - -from itertools import izip - -from pyspark import SparkContext -from pyspark.sql.types import DataType, UserDefinedType, Row, StructField, StructType, \ - ArrayType, DoubleType, IntegerType - -__all__ = ['Point'] - -try: - from shapely.geometry import Point as SPoint - from shapely.geometry import Polygon as SPolygon - from shapely.geometry import LineString, MultiLineString - _have_shapely = True -except: - # No Shapely in environment, but that's okay - _have_shapely = False - - -class Shape(DataType): - - def convert(self): - raise NotImplementedError() - - def toShapely(self): - if _have_shapely: - return self.convert() - else: - raise TypeError("Cannot convert to Shapely type") - - -class PointUDT(UserDefinedType): - """User-defined type (UDT). - - .. note:: WARN: SpatialSDK Internal Use Only - """ - - @classmethod - def sqlType(cls): - return Point() - - @classmethod - def module(cls): - """ - The Python module of the UDT. - """ - return "magellan.types" - - @classmethod - def scalaUDT(cls): - """ - The class name of the paired Scala UDT. - """ - return "org.apache.spark.sql.types.PointUDT" - - def serialize(self, obj): - """ - Converts the a user-type object into a SQL datum. - """ - if isinstance(obj, Point): - #pnt = Row(IntegerType(), DoubleType(),DoubleType(),DoubleType(),DoubleType(),DoubleType(),DoubleType() ) - #return pnt("udt",obj.x,obj.y,obj.x,obj.y,obj.x,obj.y) - return 1,obj.x, obj.y,obj.x, obj.y,obj.x, obj.y - else: - raise TypeError("cannot serialize %r of type %r" % (obj, type(obj))) - - def deserialize(self, datum): - """ - Converts a SQL datum into a user-type object. - """ - if isinstance(datum, Point): - return datum - else: - assert len(datum) == 7, \ - "PointUDT.deserialize given row with length %d but requires 7" % len(datum) - return Point(datum[5], datum[6]) - - def simpleString(self): - return 'point' - - @classmethod - def fromJson(cls, json): - return Point(json['x'], json['y']) - - -class Point(Shape): - """ - A point is a zero dimensional shape. - The coordinates of a point can be in linear units such as feet or meters, - or they can be in angular units such as degrees or radians. - The associated spatial reference specifies the units of the coordinates. - In the case of a geographic coordinate system, the x-coordinate is the longitude - and the y-coordinate is the latitude. - - >>> v = Point(1.0, 2.0) - Point([1.0, 2.0]) - """ - - __UDT__ = PointUDT() - - def __init__(self, x = 0.0, y = 0.0): - self.x = x - self.y = y - - def __str__(self): - return "Point (" + str(self.x) + "," + str(self.y) + ")" - - def __repr__(self): - return self.__str__() - - def __unicode__(self): - return self.__str__() - - def __reduce__(self): - return (Point, (self.x, self.y)) - - def __eq__(self, other): - return isinstance(other, Point) and self.x == other.x and self.y == other.y - - @classmethod - def fromJson(cls, json): - return Point(json['x'], json['y']) - - def jsonValue(self): - return {"type": "udt", - "pyClass": "magellan.types.PointUDT", - "class": "magellan.PointUDT", - "sqlType": "magellan.Point"} - - def convert(self): - return SPoint(self.x, self.y) - - -class PolygonUDT(UserDefinedType): - """User-defined type (UDT). - - .. note:: WARN: SpatialSDK Internal Use Only - """ - pointUDT = PointUDT() - - @classmethod - def sqlType(cls): - """ - Underlying SQL storage type for this UDT. - """ - return Polygon() - - @classmethod - def module(cls): - """ - The Python module of the UDT. - """ - return "magellan.types" - - @classmethod - def scalaUDT(cls): - """ - The class name of the paired Scala UDT. - """ - return "org.apache.spark.sql.types.PolygonUDT" - - def serialize(self, obj): - """ - Converts the a user-type object into a SQL datum. - """ - if isinstance(obj, Polygon): - x_list = [] - y_list = [] - for p in obj.points: - x_list.append(p.x) - y_list.append(p.y) - return 5, min(x_list), min(y_list), max(x_list), max(y_list), obj.indices, x_list, y_list - else: - raise TypeError("cannot serialize %r of type %r" % (obj, type(obj))) - - def deserialize(self, datum): - """ - Converts a SQL datum into a user-type object. - """ - if isinstance(datum, Polygon): - return datum - else: - assert len(datum) == 8, \ - "PolygonUDT.deserialize given row with length %d but requires 2" % len(datum) - return Polygon(datum[5], datum[6], datum[7], [self.pointUDT.deserialize(point) for point in zip(datum[6],datum[7])]) - - def simpleString(self): - return 'polygon' - - @classmethod - def fromJson(cls, json): - indices = json["indices"] - points = [PointUDT.fromJson(point) for point in json["points"]] - return Polygon(indices, points) - - -class Polygon(Shape): - """ - A polygon consists of one or more rings. A ring is a connected sequence of four or more points - that form a closed, non-self-intersecting loop. A polygon may contain multiple outer rings. - The order of vertices or orientation for a ring indicates which side of the ring is the interior - of the polygon. The neighborhood to the right of an observer walking along the ring - in vertex order is the neighborhood inside the polygon. - Vertices of rings defining holes in polygons are in a counterclockwise direction. - Vertices for a single, ringed polygon are, therefore, always in clockwise order. - The rings of a polygon are referred to as its parts. - >>> v = Polygon([0], [Point(1.0, 1.0), Point(1.0, -1.0), Point(1.0, 1.0)) - Point([-1.0,-1.0, 1.0, 1.0], [0], Point(1.0, 1.0), Point(1.0, -1.0), Point(1.0, 1.0)) - """ - - __UDT__ = PolygonUDT() - - def __init__(self, indices = [], points = []): - self.indices = indices - self.xcoordinates = [p.x for p in points] - self.ycoordinates = [p.y for p in points] - if points: - self.xmin = min(self.xcoordinates) - self.ymin = min(self.ycoordinates) - self.xmax = max(self.xcoordinates) - self.ymax = max(self.ycoordinates) - else: - self.xmin = None - self.ymin = None - self.xmax = None - self.ymax = None - self.boundingBox = BoundingBox(self.xmin, self.ymin, self.xmax, self.ymax) - self.size = len(points) - self.points = points - - def __str__(self): - inds = "[" + ",".join([str(i) for i in self.indices]) + "]" - pts = "[" + ",".join([str(v) for v in self.points]) + "]" - return "Polygon (" + ",".join((inds, pts)) + ")" - - def __repr__(self): - return self.__str__() - - def __reduce__(self): - return (Polygon, (self.indices, self.points)) - - @classmethod - def fromJson(cls, json): - indices = json["indices"] - points = [PointUDT.fromJson(point) for point in json["points"]] - return Polygon(indices, points) - - def jsonValue(self): - return {"type": "udt", - "pyClass": "magellan.types.PolygonUDT", - "class": "magellan.Polygon", - "sqlType": "magellan.Polygon"} - - def convert(self): - l = [] - l.extend(self.indices) - l.append(len(self.points)) - p = [] - for i,j in zip(l, l[1:]): - spoints = [(point.x, point.y) for point in self.points[i:j - 1]] - p.append(spoints) - - shell = p[0] - holes = p[1:] - return SPolygon(shell=shell, holes=holes) - - -class PolyLineUDT(UserDefinedType): - """User-defined type (UDT). - - .. note:: WARN: SpatialSDK Internal Use Only - """ - - pointUDT = PointUDT() - - @classmethod - def sqlType(cls): - """ - Underlying SQL storage type for this UDT. - """ - return PolyLine() - - @classmethod - def module(cls): - """ - The Python module of the UDT. - """ - return "magellan.types" - - @classmethod - def scalaUDT(cls): - """ - The class name of the paired Scala UDT. - """ - return "org.apache.spark.sql.types.PolyLineUDT" - - def serialize(self, obj): - """ - Converts the a user-type object into a SQL datum. - """ - if isinstance(obj, PolyLine): - x_list = [] - y_list = [] - for p in obj.points: - x_list.append(p.x) - y_list.append(p.y) - return 3, min(x_list), min(y_list), max(x_list), max(y_list), obj.indices, x_list, y_list - else: - raise TypeError("cannot serialize %r of type %r" % (obj, type(obj))) - - def deserialize(self, datum): - """ - Converts a SQL datum into a user-type object. - """ - if isinstance(datum, PolyLine): - return datum - else: - assert len(datum) == 2, \ - "PolyLineUDT.deserialize given row with length %d but requires 2" % len(datum) - return PolyLine(datum[0], [self.pointUDT.deserialize(point) for point in datum[1]]) - - def simpleString(self): - return 'polyline' - - @classmethod - def fromJson(cls, json): - indices = json["indices"] - points = [PointUDT.fromJson(point) for point in json["points"]] - return PolyLine(indices, points) - - -class PolyLine(Shape): - """ - A PolyLine is an ordered set of vertices that consists of one or more parts. - A part is a connected sequence of two or more points. - Parts may or may not be connected to one another. - Parts may or may not intersect one another - >>> v = PolyLine([0], [Point(1.0, 1.0), Point(1.0, -1.0), Point(1.0, 0.0)) - Point([0], Point(1.0, 1.0), Point(1.0, -1.0), Point(1.0, 0.0)) - """ - - __UDT__ = PolyLineUDT() - - def __init__(self, indices = [], points = []): - self.indices = indices - self.xcoordinates = [p.x for p in points] - self.ycoordinates = [p.y for p in points] - if points: - self.xmin = min(self.xcoordinates) - self.ymin = min(self.ycoordinates) - self.xmax = max(self.xcoordinates) - self.ymax = max(self.ycoordinates) - else: - self.xmin = None - self.ymin = None - self.xmax = None - self.ymax = None - self.boundingBox = BoundingBox(self.xmin, self.ymin, self.xmax, self.ymax) - self.size = len(points) - self.points = points - - def __str__(self): - inds = "[" + ",".join([str(i) for i in self.indices]) + "]" - pts = "[" + ",".join([str(v) for v in self.points]) + "]" - return "Polygon (" + ",".join((inds, pts)) + ")" - - def __repr__(self): - return self.__str__() - - def __reduce__(self): - return (PolyLine, (self.indices, self.points)) - - @classmethod - def fromJson(cls, json): - indices = json["indices"] - points = [PointUDT.fromJson(point) for point in json["points"]] - return PolyLine(indices, points) - - def jsonValue(self): - return {"type": "udt", - "pyClass": "magellan.types.PolyLineUDT", - "class": "magellan.PolyLine", - "sqlType": "magellan.PolyLine"} - - def convert(self): - l = [] - l.extend(self.indices) - l.append(len(self.points)) - p = [] - for i,j in zip(l, l[1:]): - spoints = [(point.x, point.y) for point in self.points[i:j - 1]] - p.append(LineString(spoints)) - return MultiLineString(p) - - -def _inbound_shape_converter(json_string): - j = json.loads(json_string) - shapeType = str(j["pyClass"]) # convert unicode to str - split = shapeType.rfind(".") - module = shapeType[:split] - shapeClass = shapeType[split+1:] - m = __import__(module, globals(), locals(), [shapeClass]) - UDT = getattr(m, shapeClass) - return UDT.fromJson(j) - -# This is used to unpickle a Row from JVM -def _create_row_inbound_converter(dataType): - return lambda *a: dataType.fromInternal(a) - -class BoundingBox(object): - - def __init__(self,xmin,ymin,xmax,ymax): - self.xmin = xmin - self.ymin = ymin - self.xmax = xmax - self.ymax = ymax - - def intersects(self, other): - if not other.xmin >= self.xmax and other.ymax >= self.min and other.ymax <= self.ymin and other.xmax <= self.xmin: - return True - else: - return False - -if __name__ == '__main__': - print Point(1,1) - print PointUDT(1,1) - + return False \ No newline at end of file From c265dc0fb7c9b91545c7880a5a9d36ffbc63b5bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Bu=C5=9Bkiewicz?= Date: Mon, 27 Nov 2017 17:01:42 +0100 Subject: [PATCH 3/5] Fix scala classes of within, intersect and transform operators; cast string to column --- python/magellan/column.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/python/magellan/column.py b/python/magellan/column.py index 362dfbb..4f28386 100644 --- a/python/magellan/column.py +++ b/python/magellan/column.py @@ -16,11 +16,14 @@ from pyspark import SparkContext from pyspark.sql.column import Column, _create_column_from_literal +from pyspark.sql.functions import col as _col def _bin_op(name, doc="binary operator"): """ Create a method for given binary operator """ def _(col, other): + if isinstance(other, str): + other = _col(other) jc = other._jc if isinstance(other, Column) else other jcol = col._jc sc = SparkContext._active_spark_context @@ -56,9 +59,9 @@ def _(col, other): _.__doc__ = doc return _ -within = _bin_op("magellan.catalyst.Within") -intersects = _bin_op("magellan.catalyst.Intersects") -transform = _unary_op("magellan.catalyst.Transformer") +within = _bin_op("org.apache.spark.sql.types.Within") +intersects = _bin_op("org.apache.spark.sql.types.Intersects") +transform = _unary_op("org.apache.spark.sql.types.Transformer") Column.within = within Column.intersects = intersects Column.transform = transform From 7362957132010eb8ff4528d577b1da72ec90fb92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Bu=C5=9Bkiewicz?= Date: Mon, 27 Nov 2017 17:19:25 +0100 Subject: [PATCH 4/5] Add proper setup.py --- python/setup.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/python/setup.py b/python/setup.py index e69de29..f1f4a33 100644 --- a/python/setup.py +++ b/python/setup.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +"""The setup script.""" +from setuptools import setup, find_packages + +requirements = [ + # TODO: put package requirements here +] +setup( + name='magellan', + version='1.0.5', + description="Magellan", + long_description="Magellan", + author="harsha2010", + url='https://github.com/harsha2010/magellan', + packages=[package for package in find_packages() if package.startswith('magellan')], + include_package_data=True, + install_requires=requirements, + zip_safe=False, + keywords='magellan', +) \ No newline at end of file From 7f8d75609654eed2dd6041095378ef9dae46703b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Bu=C5=9Bkiewicz?= Date: Tue, 2 Jan 2018 10:21:47 +0100 Subject: [PATCH 5/5] Add support for indexed spatial join --- python/README.md | 0 python/__init__.py | 20 --------- python/magellan/dataframe.py | 27 ++++++++++++ python/magellan/utils.py | 10 +++++ python/requirements.txt | 3 ++ python/tests/__init__.py | 0 python/tests/conftest.py | 59 ++++++++++++++++++++++++++ python/tests/magellan/test_magellan.py | 0 8 files changed, 99 insertions(+), 20 deletions(-) create mode 100644 python/README.md delete mode 100644 python/__init__.py create mode 100644 python/magellan/dataframe.py create mode 100644 python/magellan/utils.py create mode 100644 python/tests/__init__.py create mode 100644 python/tests/conftest.py create mode 100644 python/tests/magellan/test_magellan.py diff --git a/python/README.md b/python/README.md new file mode 100644 index 0000000..e69de29 diff --git a/python/__init__.py b/python/__init__.py deleted file mode 100644 index bb06d6e..0000000 --- a/python/__init__.py +++ /dev/null @@ -1,20 +0,0 @@ -# -# Copyright 2015 Ram Sriharsha -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -""" -This is the Python API for SpatialSDK. - -""" diff --git a/python/magellan/dataframe.py b/python/magellan/dataframe.py new file mode 100644 index 0000000..c0cce8d --- /dev/null +++ b/python/magellan/dataframe.py @@ -0,0 +1,27 @@ +from pyspark.sql import DataFrame +from spylon.spark.utils import SparkJVMHelpers + + +def _immutable_scala_map(jvm_helpers, dict_like): + jvm_helpers.to_scala_map(dict_like).toMap(jvm_helpers.jvm.scala.Predef.conforms()) + + +def index(df, precision): + jvm_helpers = SparkJVMHelpers(df._sc) + jdf = df._jdf + sparkSession = jdf.sparkSession() + SpatialJoinHint = df._sc._jvm.magellan.catalyst.SpatialJoinHint + Dataset = df._sc._jvm.org.apache.spark.sql.Dataset + + new_jdf = Dataset( + sparkSession, + SpatialJoinHint( + jdf.logicalPlan(), + _immutable_scala_map(jvm_helpers, {"magellan.index.precision": str(precision)}) + ), + jdf.exprEnc()) + + return DataFrame(new_jdf, df.sql_ctx) + + +DataFrame.index = index diff --git a/python/magellan/utils.py b/python/magellan/utils.py new file mode 100644 index 0000000..a2311b9 --- /dev/null +++ b/python/magellan/utils.py @@ -0,0 +1,10 @@ +from spylon.spark.utils import SparkJVMHelpers + + +def inject_rules(spark_session): + from magellan.column import * + from magellan.dataframe import * + + jvm_helpers = SparkJVMHelpers(spark_session._sc) + magellan_utils = jvm_helpers.import_scala_object('magellan.Utils') + magellan_utils.incjectRules(spark_session._jsparkSession) \ No newline at end of file diff --git a/python/requirements.txt b/python/requirements.txt index e69de29..d7d3f69 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -0,0 +1,3 @@ +spylon +pyspark +pytest \ No newline at end of file diff --git a/python/tests/__init__.py b/python/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python/tests/conftest.py b/python/tests/conftest.py new file mode 100644 index 0000000..12baded --- /dev/null +++ b/python/tests/conftest.py @@ -0,0 +1,59 @@ +from pyspark import SparkConf, SQLContext, HiveContext +from pyspark.sql import DataFrame +import pytest +from base.spark.extensions import * + +import os + +pending = pytest.mark.xfail + +ROOT = os.path.abspath(os.path.join(__file__, '../..')) + +@pytest.fixture(scope="session") +def sparkContext(): + + conf = SparkConf() \ + .setAppName('py.test') + + sc = SparkContext(conf=conf) + + # disable logging + + sc.setLogLevel("OFF") + + return sc + + +@pytest.fixture(scope="session") +def sqlContext(sparkContext): + return SQLContext(sparkContext) + + +@pytest.fixture(scope="session") +def hiveContext(sparkContext): + return HiveContext(sparkContext) + + +def dfassert(left, right, useSet=False, skipExtraColumns=False): + if not isinstance(right, DataFrame): + right = left.sql_ctx.createDataFrame(right) + + if skipExtraColumns: + columns = list(set(left.columns) & set(right.columns)) + left = left[columns] + right = right[columns] + + assert sorted(left.columns) == sorted(right.columns) + + def _orderableColumns(df): + return [col for col in df.columns if df[col].dataType.typeName() != 'array'] + + left = left[sorted(left.columns)] + right = right[sorted(right.columns)] + + converter = set if useSet else list + + orderedLeft = left.orderBy(*_orderableColumns(left)) if _orderableColumns(left) else left + orderedRight = right.orderBy(*_orderableColumns(right)) if _orderableColumns(right) else right + + assert converter(orderedLeft.collect()) == converter(orderedRight.collect()) diff --git a/python/tests/magellan/test_magellan.py b/python/tests/magellan/test_magellan.py new file mode 100644 index 0000000..e69de29