From 4f214744806ab19701a81a6366aba71c92b18432 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Sun, 16 Jun 2024 12:46:25 +0800 Subject: [PATCH] PARQUET-2471: Add support for geometry logical type --- parquet-column/pom.xml | 5 + .../column/statistics/GeometryStatistics.java | 100 ++++++++++++ .../statistics/geometry/BoundingBox.java | 148 ++++++++++++++++++ .../column/statistics/geometry/Covering.java | 79 ++++++++++ .../statistics/geometry/EnvelopeCovering.java | 77 +++++++++ .../statistics/geometry/GeometryTypes.java | 120 ++++++++++++++ .../parquet/schema/LogicalTypeAnnotation.java | 120 ++++++++++++++ .../parquet/schema/PrimitiveStringifier.java | 19 +++ .../apache/parquet/schema/PrimitiveType.java | 7 + .../java/org/apache/parquet/schema/Types.java | 9 ++ .../converter/ParquetMetadataConverter.java | 82 ++++++++++ .../statistics/TestGeometryTypeRoundTrip.java | 111 +++++++++++++ pom.xml | 3 +- 13 files changed, 879 insertions(+), 1 deletion(-) create mode 100644 parquet-column/src/main/java/org/apache/parquet/column/statistics/GeometryStatistics.java create mode 100644 parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/BoundingBox.java create mode 100644 parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/Covering.java create mode 100644 parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/EnvelopeCovering.java create mode 100644 parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/GeometryTypes.java create mode 100644 parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestGeometryTypeRoundTrip.java diff --git a/parquet-column/pom.xml b/parquet-column/pom.xml index aeca0a2eab..7d24f311ca 100644 --- a/parquet-column/pom.xml +++ b/parquet-column/pom.xml @@ -75,6 +75,11 @@ slf4j-api ${slf4j.version} + + org.locationtech.jts + jts-core + ${jts.version} + com.carrotsearch diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/GeometryStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/GeometryStatistics.java new file mode 100644 index 0000000000..5977cb2f1f --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/GeometryStatistics.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.statistics; + +import org.apache.parquet.Preconditions; +import org.apache.parquet.column.statistics.geometry.BoundingBox; +import org.apache.parquet.column.statistics.geometry.Covering; +import org.apache.parquet.column.statistics.geometry.GeometryTypes; +import org.apache.parquet.io.api.Binary; +import org.locationtech.jts.geom.Geometry; +import org.locationtech.jts.io.ParseException; +import org.locationtech.jts.io.WKBReader; + +public class GeometryStatistics { + + private final BoundingBox boundingBox; + private final Covering covering; + private final GeometryTypes geometryTypes; + private final WKBReader reader = new WKBReader(); + + public GeometryStatistics(BoundingBox boundingBox, Covering covering, GeometryTypes geometryTypes) { + this.boundingBox = boundingBox; + this.covering = covering; + this.geometryTypes = geometryTypes; + } + + public BoundingBox getBoundingBox() { + return boundingBox; + } + + public Covering getCovering() { + return covering; + } + + public GeometryTypes getGeometryTypes() { + return geometryTypes; + } + + public void update(Binary value) { + if (value == null) { + return; + } + try { + Geometry geom = reader.read(value.getBytes()); + update(geom); + } catch (ParseException e) { + abort(); + } + } + + public void update(Geometry geom) { + boundingBox.update(geom); + covering.update(geom); + geometryTypes.update(geom); + } + + public void merge(GeometryStatistics other) { + Preconditions.checkArgument(other != null, "Cannot merge with null GeometryStatistics"); + boundingBox.merge(other.boundingBox); + covering.merge(other.covering); + geometryTypes.merge(other.geometryTypes); + } + + public void reset() { + boundingBox.reset(); + covering.reset(); + geometryTypes.reset(); + } + + public void abort() { + boundingBox.abort(); + covering.abort(); + geometryTypes.abort(); + } + + @Override + public String toString() { + return "GeometryStatistics{" + + "boundingBox=" + boundingBox + + ", covering=" + covering + + ", geometryTypes=" + geometryTypes + + '}'; + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/BoundingBox.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/BoundingBox.java new file mode 100644 index 0000000000..98930cf56f --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/BoundingBox.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.statistics.geometry; + +import org.apache.parquet.Preconditions; +import org.locationtech.jts.geom.Coordinate; +import org.locationtech.jts.geom.Geometry; + +public class BoundingBox { + + private double xMin = Double.MAX_VALUE; + private double xMax = Double.MIN_VALUE; + private double yMin = Double.MAX_VALUE; + private double yMax = Double.MIN_VALUE; + private double zMin = Double.MAX_VALUE; + private double zMax = Double.MIN_VALUE; + private double mMin = Double.MAX_VALUE; + private double mMax = Double.MIN_VALUE; + + public BoundingBox(double xMin, double xMax, double yMin, double yMax, double zMin, double zMax, double mMin, + double mMax) { + this.xMin = xMin; + this.xMax = xMax; + this.yMin = yMin; + this.yMax = yMax; + this.zMin = zMin; + this.zMax = zMax; + this.mMin = mMin; + this.mMax = mMax; + } + + public double getXMin() { + return xMin; + } + + public double getXMax() { + return xMax; + } + + public double getYMin() { + return yMin; + } + + public double getYMax() { + return yMax; + } + + public double getZMin() { + return zMin; + } + + public double getZMax() { + return zMax; + } + + public double getMMin() { + return mMin; + } + + public double getMMax() { + return mMax; + } + + public void update(Geometry geom) { + if (geom == null || geom.isEmpty()) { + return; + } + Coordinate[] coordinates = geom.getCoordinates(); + for (Coordinate coordinate : coordinates) { + update(coordinate.getX(), coordinate.getY(), coordinate.getZ(), coordinate.getM()); + } + } + + public void update(double x, double y, double z, double m) { + xMin = Math.min(xMin, x); + xMax = Math.max(xMax, x); + yMin = Math.min(yMin, y); + yMax = Math.max(yMax, y); + zMin = Math.min(zMin, z); + zMax = Math.max(zMax, z); + mMin = Math.min(mMin, m); + mMax = Math.max(mMax, m); + } + + public void merge(BoundingBox other) { + Preconditions.checkArgument(other != null, "Cannot merge with null bounding box"); + xMin = Math.min(xMin, other.xMin); + xMax = Math.max(xMax, other.xMax); + yMin = Math.min(yMin, other.yMin); + yMax = Math.max(yMax, other.yMax); + zMin = Math.min(zMin, other.zMin); + zMax = Math.max(zMax, other.zMax); + mMin = Math.min(mMin, other.mMin); + mMax = Math.max(mMax, other.mMax); + } + + public void reset() { + xMin = Double.MAX_VALUE; + xMax = Double.MIN_VALUE; + yMin = Double.MAX_VALUE; + yMax = Double.MIN_VALUE; + zMin = Double.MAX_VALUE; + zMax = Double.MIN_VALUE; + mMin = Double.MAX_VALUE; + mMax = Double.MIN_VALUE; + } + + public void abort() { + xMin = Double.NaN; + xMax = Double.NaN; + yMin = Double.NaN; + yMax = Double.NaN; + zMin = Double.NaN; + zMax = Double.NaN; + mMin = Double.NaN; + mMax = Double.NaN; + } + + @Override + public String toString() { + return "BoundingBox{" + + "xMin=" + xMin + + ", xMax=" + xMax + + ", yMin=" + yMin + + ", yMax=" + yMax + + ", zMin=" + zMin + + ", zMax=" + zMax + + ", mMin=" + mMin + + ", mMax=" + mMax + + '}'; + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/Covering.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/Covering.java new file mode 100644 index 0000000000..e978f30545 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/Covering.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.statistics.geometry; + +import java.nio.ByteBuffer; +import org.apache.parquet.Preconditions; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.locationtech.jts.geom.Geometry; +import org.locationtech.jts.io.ParseException; +import org.locationtech.jts.io.WKBReader; +import org.locationtech.jts.io.WKBWriter; + +public class Covering { + + protected final LogicalTypeAnnotation.Edges edges; + protected ByteBuffer geometry; + + public Covering(ByteBuffer geometry, LogicalTypeAnnotation.Edges edges) { + Preconditions.checkArgument(geometry != null, "Geometry cannot be null"); + Preconditions.checkArgument(edges != null, "Edges cannot be null"); + this.geometry = geometry; + this.edges = edges; + } + + public ByteBuffer getGeometry() { + return geometry; + } + + public LogicalTypeAnnotation.Edges getEdges() { + return edges; + } + + public void update(Geometry geom) { + geometry = ByteBuffer.wrap(new WKBWriter().write(geom)); + } + + public void merge(Covering other) { + throw new UnsupportedOperationException("Merge is not supported for " + this.getClass().getSimpleName()); + } + + public void reset() { + throw new UnsupportedOperationException("Reset is not supported for " + this.getClass().getSimpleName()); + } + + public void abort() { + throw new UnsupportedOperationException("Abort is not supported for " + this.getClass().getSimpleName()); + } + + @Override + public String toString() { + String geomText; + try { + geomText = new WKBReader().read(geometry.array()).toText(); + } catch (ParseException e) { + geomText = "Invalid Geometry"; + } + + return "Covering{" + + "geometry=" + geomText + + ", edges=" + edges + + '}'; + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/EnvelopeCovering.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/EnvelopeCovering.java new file mode 100644 index 0000000000..65e6850228 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/EnvelopeCovering.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.statistics.geometry; + +import java.nio.ByteBuffer; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.locationtech.jts.geom.Geometry; +import org.locationtech.jts.io.ParseException; +import org.locationtech.jts.io.WKBReader; +import org.locationtech.jts.io.WKBWriter; + +public class EnvelopeCovering extends Covering { + + private final static ByteBuffer EMPTY = ByteBuffer.wrap(new byte[0]); + private final WKBReader reader = new WKBReader(); + private final WKBWriter writer = new WKBWriter(); + + public EnvelopeCovering() { + super(EMPTY, LogicalTypeAnnotation.Edges.PLANAR); + } + + @Override + public void update(Geometry geom) { + if (geometry == null) { + return; + } + try { + if (geometry != EMPTY) { + Geometry envelope = reader.read(geometry.array()); + geometry = ByteBuffer.wrap(writer.write(envelope.union(geom).getEnvelope())); + } else { + geometry = ByteBuffer.wrap(writer.write(geom.getEnvelope())); + } + } catch (ParseException e) { + geometry = null; + } + } + + @Override + public void merge(Covering other) { + if (other instanceof EnvelopeCovering) { + try { + update(reader.read(other.geometry.array())); + } catch (ParseException e) { + geometry = null; + } + } else { + throw new UnsupportedOperationException("Cannot merge " + this.getClass() + " with " + other.getClass().getSimpleName()); + } + } + + @Override + public void reset() { + geometry = EMPTY; + } + + @Override + public void abort() { + geometry = null; + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/GeometryTypes.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/GeometryTypes.java new file mode 100644 index 0000000000..03905cbf59 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/GeometryTypes.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.statistics.geometry; + +import java.util.HashSet; +import java.util.Set; +import org.apache.parquet.Preconditions; +import org.locationtech.jts.geom.Coordinate; +import org.locationtech.jts.geom.Geometry; + +public class GeometryTypes { + + private static final int UNKNOWN_TYPE_ID = -1; + private Set types = new HashSet<>(); + private boolean valid = true; + + public GeometryTypes(Set types) { + this.types = types; + } + + public Set getTypes() { + return types; + } + + public void update(Geometry geometry) { + if (!valid) { + return; + } + int code = getGeometryTypeCode(geometry); + if (code != UNKNOWN_TYPE_ID) { + types.add(code); + } else { + valid = false; + types.clear(); + } + } + + public void merge(GeometryTypes other) { + Preconditions.checkArgument(other != null, "Cannot merge with null GeometryTypes"); + if (!valid) { + return; + } + if (!other.valid) { + valid = false; + types.clear(); + return; + } + types.addAll(other.types); + } + + public void reset() { + types.clear(); + valid = true; + } + + public void abort() { + valid = false; + types.clear(); + } + + @Override + public String toString() { + // TODO: Print the geometry types as strings + return "GeometryTypes{" + + "types=" + types + + '}'; + } + + private int getGeometryTypeId(Geometry geometry) { + switch (geometry.getGeometryType()) { + case Geometry.TYPENAME_POINT: + return 1; + case Geometry.TYPENAME_LINESTRING: + return 2; + case Geometry.TYPENAME_POLYGON: + return 3; + case Geometry.TYPENAME_MULTIPOINT: + return 4; + case Geometry.TYPENAME_MULTILINESTRING: + return 5; + case Geometry.TYPENAME_MULTIPOLYGON: + return 6; + case Geometry.TYPENAME_GEOMETRYCOLLECTION: + return 7; + default: + return UNKNOWN_TYPE_ID; + } + } + + private int getGeometryTypeCode(Geometry geometry) { + int typeId = getGeometryTypeId(geometry); + if (typeId == UNKNOWN_TYPE_ID) { + return UNKNOWN_TYPE_ID; + } + Coordinate coordinate = geometry.getCoordinate(); + if (!Double.isNaN(coordinate.getZ())) { + typeId += 1000; + } + if (!Double.isNaN(coordinate.getM())) { + typeId += 2000; + } + return typeId; + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java b/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java index 05629dd388..0c9032dd33 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java @@ -33,6 +33,7 @@ import static org.apache.parquet.schema.PrimitiveStringifier.TIME_STRINGIFIER; import static org.apache.parquet.schema.PrimitiveStringifier.TIME_UTC_STRINGIFIER; +import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -146,6 +147,19 @@ protected LogicalTypeAnnotation fromString(List params) { protected LogicalTypeAnnotation fromString(List params) { return float16Type(); } + }, + GEOMETRY { + @Override + protected LogicalTypeAnnotation fromString(List params) { + if (params.size() < 2) { + throw new RuntimeException("Expecting at least 2 parameters for geometry logical type, got " + params.size()); + } + GeometryEncoding encoding = GeometryEncoding.valueOf(params.get(0)); + Edges edges = Edges.valueOf(params.get(1)); + String crs = params.size() > 2 ? params.get(2) : null; + ByteBuffer metadata = params.size() > 3 ? ByteBuffer.wrap(params.get(3).getBytes()) : null; + return geometryType(encoding, edges, crs, metadata); + } }; protected abstract LogicalTypeAnnotation fromString(List params); @@ -316,6 +330,10 @@ public static Float16LogicalTypeAnnotation float16Type() { return Float16LogicalTypeAnnotation.INSTANCE; } + public static GeometryLogicalTypeAnnotation geometryType(GeometryEncoding encoding, Edges edges, String crs, ByteBuffer metadata) { + return new GeometryLogicalTypeAnnotation(encoding, edges, crs, metadata); + } + public static class StringLogicalTypeAnnotation extends LogicalTypeAnnotation { private static final StringLogicalTypeAnnotation INSTANCE = new StringLogicalTypeAnnotation(); @@ -1091,6 +1109,104 @@ public int hashCode() { } } + public enum GeometryEncoding { + WKB + } + + public enum Edges { + PLANAR, + SPHERICAL + } + + public static class GeometryLogicalTypeAnnotation extends LogicalTypeAnnotation { + private final GeometryEncoding encoding; + private final Edges edges; + private final String crs; + private final ByteBuffer metadata; + + private GeometryLogicalTypeAnnotation(GeometryEncoding encoding, Edges edges, String crs, ByteBuffer metadata) { + Preconditions.checkArgument(encoding != null, "Geometry encoding is required"); + Preconditions.checkArgument(edges != null, "Geometry edges is required"); + this.encoding = encoding; + this.edges = edges; + this.crs = crs; + this.metadata = metadata; + } + + @Override + @Deprecated + public OriginalType toOriginalType() { + return null; + } + + @Override + public Optional accept(LogicalTypeAnnotationVisitor logicalTypeAnnotationVisitor) { + return logicalTypeAnnotationVisitor.visit(this); + } + + @Override + LogicalTypeToken getType() { + return LogicalTypeToken.GEOMETRY; + } + + @Override + protected String typeParametersAsString() { + StringBuilder sb = new StringBuilder(); + sb.append("("); + sb.append(encoding); + sb.append(","); + sb.append(edges); + if (crs != null && !crs.isEmpty()) { + sb.append(","); + sb.append(crs); + } + if (metadata != null) { + sb.append(","); + sb.append(metadata); + } + sb.append(")"); + return sb.toString(); + } + + public GeometryEncoding getEncoding() { + return encoding; + } + + public Edges getEdges() { + return edges; + } + + public String getCrs() { + return crs; + } + + public ByteBuffer getMetadata() { + return metadata; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof GeometryLogicalTypeAnnotation)) { + return false; + } + GeometryLogicalTypeAnnotation other = (GeometryLogicalTypeAnnotation) obj; + return (encoding == other.encoding) && (edges == other.edges) && crs.equals(other.crs); + } + + @Override + public int hashCode() { + return Objects.hash(encoding, crs, edges); + } + + @Override + PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) { + if (encoding == GeometryEncoding.WKB) { + return PrimitiveStringifier.WKB_STRINGIFIER; + } + return super.valueStringifier(primitiveType); + } + } + /** * Implement this interface to visit a logical type annotation in the schema. * The default implementation for each logical type specific visitor method is empty. @@ -1162,5 +1278,9 @@ default Optional visit(MapKeyValueTypeAnnotation mapKeyValueLogicalType) { default Optional visit(Float16LogicalTypeAnnotation float16LogicalType) { return empty(); } + + default Optional visit(GeometryLogicalTypeAnnotation geometryLogicalType) { + return empty(); + } } } diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveStringifier.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveStringifier.java index c46e94367f..bb5c8a9474 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveStringifier.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveStringifier.java @@ -35,6 +35,9 @@ import java.util.concurrent.TimeUnit; import javax.naming.OperationNotSupportedException; import org.apache.parquet.io.api.Binary; +import org.locationtech.jts.geom.Geometry; +import org.locationtech.jts.io.ParseException; +import org.locationtech.jts.io.WKBReader; /** * Class that provides string representations for the primitive values. These string values are to be used for @@ -449,4 +452,20 @@ String stringifyNotNull(Binary value) { return Float16.toFloatString(value); } }; + + static final PrimitiveStringifier WKB_STRINGIFIER = new BinaryStringifierBase("WKB_STRINGIFIER") { + + @Override + String stringifyNotNull(Binary value) { + + Geometry geometry; + try { + WKBReader reader = new WKBReader(); + geometry = reader.read(value.getBytesUnsafe()); + return geometry.toText(); + } catch (ParseException e) { + return BINARY_INVALID; + } + } + }; } diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java index e74d7cde02..bb11a2f1c8 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java @@ -271,6 +271,13 @@ public Optional visit( LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) { return of(PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR); } + + @Override + public Optional visit( + LogicalTypeAnnotation.GeometryLogicalTypeAnnotation geometryLogicalType) { + // TODO: implement a custom comparator for geometries + return of(PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR); + } }) .orElseThrow(() -> new ShouldNeverHappenException( "No comparator logic implemented for BINARY logical type: " + logicalType)); diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java index 5bc2f89f47..45985c7a31 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java @@ -571,6 +571,15 @@ public Optional visit( return checkBinaryPrimitiveType(enumLogicalType); } + @Override + public Optional visit( + LogicalTypeAnnotation.GeometryLogicalTypeAnnotation geometryLogicalType) { + if (geometryLogicalType.getEncoding() != LogicalTypeAnnotation.GeometryEncoding.WKB) { + throw new RuntimeException("Only WKB geometry encoding is supported for now"); + } + return checkBinaryPrimitiveType(geometryLogicalType); + } + private Optional checkFixedPrimitiveType( int l, LogicalTypeAnnotation logicalTypeAnnotation) { Preconditions.checkState( diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index e752b4ceea..e3c62a931b 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -78,12 +79,15 @@ import org.apache.parquet.format.DateType; import org.apache.parquet.format.DecimalType; import org.apache.parquet.format.DictionaryPageHeader; +import org.apache.parquet.format.Edges; import org.apache.parquet.format.Encoding; import org.apache.parquet.format.EncryptionWithColumnKey; import org.apache.parquet.format.EnumType; import org.apache.parquet.format.FieldRepetitionType; import org.apache.parquet.format.FileMetaData; import org.apache.parquet.format.Float16Type; +import org.apache.parquet.format.GeometryEncoding; +import org.apache.parquet.format.GeometryType; import org.apache.parquet.format.IntType; import org.apache.parquet.format.JsonType; import org.apache.parquet.format.KeyValue; @@ -346,6 +350,27 @@ static org.apache.parquet.format.TimeUnit convertUnit(LogicalTypeAnnotation.Time } } + static org.apache.parquet.format.GeometryEncoding convertGeometryEncoding( + LogicalTypeAnnotation.GeometryEncoding encoding) { + switch (encoding) { + case WKB: + return org.apache.parquet.format.GeometryEncoding.WKB; + default: + throw new RuntimeException("Unknown geometry encoding " + encoding); + } + } + + static org.apache.parquet.format.Edges convertEdges(LogicalTypeAnnotation.Edges edges) { + switch (edges) { + case PLANAR: + return org.apache.parquet.format.Edges.PLANAR; + case SPHERICAL: + return org.apache.parquet.format.Edges.SPHERICAL; + default: + throw new RuntimeException("Unknown edges " + edges); + } + } + private static class ConvertedTypeConverterVisitor implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor { @Override @@ -519,6 +544,24 @@ public Optional visit(LogicalTypeAnnotation.Float16LogicalTypeAnnot public Optional visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation intervalLogicalType) { return of(LogicalType.UNKNOWN(new NullType())); } + + @Override + public Optional visit(LogicalTypeAnnotation.GeometryLogicalTypeAnnotation geometryLogicalType) { + GeometryType geometryType = new GeometryType(); + if (geometryLogicalType.getEncoding() != null) { + geometryType.setEncoding(convertGeometryEncoding(geometryLogicalType.getEncoding())); + } + if (geometryLogicalType.getCrs() != null) { + geometryType.setCrs(geometryLogicalType.getCrs()); + } + if (geometryLogicalType.getEdges() != null) { + geometryType.setEdges(convertEdges(geometryLogicalType.getEdges())); + } + if (geometryLogicalType.getMetadata() != null) { + geometryType.setMetadata(geometryLogicalType.getMetadata()); + } + return of(LogicalType.GEOMETRY(geometryType)); + } } private void addRowGroup( @@ -1030,6 +1073,12 @@ public Optional visit( LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) { return of(SortOrder.SIGNED); } + + @Override + public Optional visit( + LogicalTypeAnnotation.GeometryLogicalTypeAnnotation geometryLogicalType) { + return of(SortOrder.UNKNOWN); + } }) .orElse(defaultSortOrder(primitive.getPrimitiveTypeName())); } @@ -1173,6 +1222,13 @@ LogicalTypeAnnotation getLogicalTypeAnnotation(LogicalType type) { return LogicalTypeAnnotation.uuidType(); case FLOAT16: return LogicalTypeAnnotation.float16Type(); + case GEOMETRY: + GeometryType geometry = type.getGEOMETRY(); + return LogicalTypeAnnotation.geometryType( + convertGeometryEncoding(geometry.getEncoding()), + convertEdges(geometry.getEdges()), + geometry.getCrs(), + geometry.getMetadata() != null ? ByteBuffer.wrap(geometry.getMetadata()) : null); default: throw new RuntimeException("Unknown logical type " + type); } @@ -1191,6 +1247,32 @@ private LogicalTypeAnnotation.TimeUnit convertTimeUnit(TimeUnit unit) { } } + private LogicalTypeAnnotation.GeometryEncoding convertGeometryEncoding(GeometryEncoding encoding) { + if (encoding == null) { + return null; + } + switch (encoding) { + case WKB: + return LogicalTypeAnnotation.GeometryEncoding.WKB; + default: + throw new RuntimeException("Unknown geometry encoding " + encoding); + } + } + + private LogicalTypeAnnotation.Edges convertEdges(Edges edge) { + if (edge == null) { + return null; + } + switch (edge) { + case PLANAR: + return LogicalTypeAnnotation.Edges.PLANAR; + case SPHERICAL: + return LogicalTypeAnnotation.Edges.SPHERICAL; + default: + throw new RuntimeException("Unknown geometry edge " + edge); + } + } + private static void addKeyValue(FileMetaData fileMetaData, String key, String value) { KeyValue keyValue = new KeyValue(key); keyValue.value = value; diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestGeometryTypeRoundTrip.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestGeometryTypeRoundTrip.java new file mode 100644 index 0000000000..a5b1ed6d4d --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestGeometryTypeRoundTrip.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.statistics; + +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.Preconditions; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.GroupFactory; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.io.LocalInputFile; +import org.apache.parquet.io.LocalOutputFile; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.LogicalTypeAnnotation.Edges; +import org.apache.parquet.schema.LogicalTypeAnnotation.GeometryEncoding; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Types; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.locationtech.jts.geom.Coordinate; +import org.locationtech.jts.geom.Geometry; +import org.locationtech.jts.geom.GeometryFactory; +import org.locationtech.jts.io.WKBWriter; +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Collections; + +import static org.apache.parquet.schema.LogicalTypeAnnotation.geometryType; +import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; +import static org.junit.Assert.assertEquals; + +public class TestGeometryTypeRoundTrip { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + private Path newTempPath() throws IOException { + File file = temp.newFile(); + Preconditions.checkArgument(file.delete(), "Could not remove temp file"); + return file.toPath(); + } + + @Test + public void testBasicReadWriteGeometryValue() throws IOException { + GeometryFactory geomFactory = new GeometryFactory(); + WKBWriter wkbWriter = new WKBWriter(); + Binary[] points = { + Binary.fromConstantByteArray(wkbWriter.write(geomFactory.createPoint(new Coordinate(1.0, 1.0)))), + Binary.fromConstantByteArray(wkbWriter.write(geomFactory.createPoint(new Coordinate(2.0, 2.0)))) + }; + + MessageType schema = Types.buildMessage() + .required(BINARY) + .as(geometryType(GeometryEncoding.WKB, Edges.PLANAR, "EPSG:4326",null)) + .named("col_geom") + .named("msg"); + + Configuration conf = new Configuration(); + GroupWriteSupport.setSchema(schema, conf); + GroupFactory factory = new SimpleGroupFactory(schema); + Path path = newTempPath(); + try (ParquetWriter writer = ExampleParquetWriter.builder(new LocalOutputFile(path)) + .withConf(conf) + .withDictionaryEncoding(false) + .build()) { + for (Binary value : points) { + writer.write(factory.newGroup().append("col_geom", value)); + } + } + + try (ParquetFileReader reader = + ParquetFileReader.open(new LocalInputFile(path))) { + Assert.assertEquals(2, reader.getRecordCount()); + + System.out.println("Footer"); + System.out.println(reader.getFooter().toString()); + + ColumnChunkMetaData columnChunkMetaData = reader.getRowGroups().get(0).getColumns().get(0); + System.out.println("Statistics"); + System.out.println(columnChunkMetaData.getStatistics().toString()); + } + } + +} diff --git a/pom.xml b/pom.xml index f7c29f6a72..cd797708b1 100644 --- a/pom.xml +++ b/pom.xml @@ -75,7 +75,7 @@ 2.30.0 shaded.parquet 3.3.6 - 2.10.0 + 2.11.0-SNAPSHOT 1.13.1 thrift ${thrift.executable} @@ -98,6 +98,7 @@ 2.0.9 0.16 1.6.0 + 1.19.0 2.3