From 707d6222309a41ee562302dbc772949440ca84a0 Mon Sep 17 00:00:00 2001 From: Bertil Chapuis Date: Wed, 22 May 2024 15:51:04 +0200 Subject: [PATCH] Remove geometry value --- .../baremaps/geoparquet/GeoParquetReader.java | 64 ++--- ...{GeoParquetFileInfo.java => FileInfo.java} | 32 +-- .../geoparquet/data/GeoParquetGroup.java | 220 ++++++++++-------- .../data/GeoParquetGroupConverter.java | 7 +- .../data/GeoParquetGroupFactory.java | 9 +- .../data/GeoParquetMaterializer.java | 12 +- .../geoparquet/data/GeoParquetMetadata.java | 18 ++ .../geoparquet/data/GeometryValue.java | 67 ------ .../geoparquet/data/GroupValueSource.java | 88 ------- .../hadoop/GeoParquetInputFormat.java | 35 +++ .../hadoop/GeoParquetReadSupport.java | 49 ++++ .../geoparquet/GeoParquetReaderTest.java | 17 +- 12 files changed, 270 insertions(+), 348 deletions(-) rename baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/{GeoParquetFileInfo.java => FileInfo.java} (71%) delete mode 100644 baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeometryValue.java delete mode 100644 baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GroupValueSource.java create mode 100644 baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetInputFormat.java create mode 100644 baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetReadSupport.java diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java index c5099c2a2..b1b35f4c8 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java @@ -18,7 +18,6 @@ package org.apache.baremaps.geoparquet; import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.net.URI; @@ -40,19 +39,18 @@ import org.apache.parquet.io.ColumnIOFactory; import org.apache.parquet.io.RecordReader; import org.apache.parquet.schema.MessageType; -import org.locationtech.jts.io.WKBReader; + +/** + * This reader is based on the parquet example code located at: org.apache.parquet.example.data.*. + */ public class GeoParquetReader { private final URI uri; private Configuration configuration; - private WKBReader wkbReader = new WKBReader(); - - private Map metadata = new LinkedHashMap<>(); - - private long rowCount; + private Map metadata = new LinkedHashMap<>(); public GeoParquetReader(URI uri) { this.uri = uri; @@ -60,10 +58,9 @@ public GeoParquetReader(URI uri) { } public void initialize() { - this.rowCount = 0; - this.configuration = getConfiguration(); - try { + this.configuration = getConfiguration(); + // List all the files that match the glob pattern Path globPath = new Path(uri.getPath()); URI rootUri = getRootUri(uri); @@ -90,18 +87,12 @@ public void initialize() { .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) .readValue(json, GeoParquetMetadata.class); - // Increment the total number of rows - this.rowCount += rowCount; - - // Get the geometry columns of the Parquet file - Set geometryColumns = geoParquetMetadata.getColumns().keySet(); - // Store the metadata of the Parquet file - this.metadata.put(fileStatus, new GeoParquetFileInfo(rowCount, parquetMetadata, - geoParquetMetadata, geometryColumns)); + this.metadata.put( + fileStatus, + new FileInfo(rowCount, parquetMetadata, geoParquetMetadata)); } } - } catch (Exception e) { throw new RuntimeException(e); } @@ -122,19 +113,6 @@ private static Configuration getConfiguration() { return configuration; } - private static int getSrid(GeoParquetMetadata geoParquetMetadata, String name) { - JsonNode crsId = geoParquetMetadata.getColumns().get(name).getCrs().get("id"); - int srid = switch (crsId.get("authority").asText()) { - case "OGC" -> switch (crsId.get("code").asText()) { - case "CRS84" -> 4326; - default -> 0; - }; - case "EPSG" -> crsId.get("code").asInt(); - default -> 0; - }; - return srid; - } - private static URI getRootUri(URI uri) throws URISyntaxException { String path = uri.getPath(); int index = path.indexOf("*"); @@ -153,9 +131,10 @@ private static URI getRootUri(URI uri) throws URISyntaxException { private class GroupIterator implements Iterator { - private Iterator> fileIterator; + private Iterator> fileIterator; + + private Map.Entry currentFileStatus; - private Map.Entry currentFileStatus; private Iterator pageReadStoreIterator; private PageReadStore currentPageReadStore; @@ -169,7 +148,7 @@ public GroupIterator() throws IOException { this.currentFileStatus = fileIterator.next(); this.pageReadStoreIterator = new PageReadStoreIterator(currentFileStatus); this.currentPageReadStore = pageReadStoreIterator.next(); - this.simpleGroupIterator = new FeatureGroupIterator( + this.simpleGroupIterator = new GeoParquetGroupIterator( currentFileStatus.getValue(), currentPageReadStore); this.currentGeoParquetGroup = simpleGroupIterator.next(); @@ -181,7 +160,7 @@ public boolean hasNext() { return true; } else if (pageReadStoreIterator.hasNext()) { currentPageReadStore = pageReadStoreIterator.next(); - simpleGroupIterator = new FeatureGroupIterator( + simpleGroupIterator = new GeoParquetGroupIterator( currentFileStatus.getValue(), currentPageReadStore); return hasNext(); @@ -209,15 +188,12 @@ private class PageReadStoreIterator implements Iterator { private final ParquetFileReader parquetFileReader; - private final MessageType messageType; - private PageReadStore next; - public PageReadStoreIterator(Map.Entry fileInfo) + public PageReadStoreIterator(Map.Entry fileInfo) throws IOException { this.parquetFileReader = ParquetFileReader .open(HadoopInputFile.fromPath(fileInfo.getKey().getPath(), configuration)); - this.messageType = this.parquetFileReader.getFooter().getFileMetaData().getSchema(); try { next = parquetFileReader.readNextRowGroup(); } catch (IOException e) { @@ -258,20 +234,20 @@ public PageReadStore next() { } } - private static class FeatureGroupIterator implements Iterator { + private static class GeoParquetGroupIterator implements Iterator { private final long rowCount; private final RecordReader recordReader; private long i = 0; - private FeatureGroupIterator(GeoParquetFileInfo geoParquetFileInfo, + private GeoParquetGroupIterator(FileInfo fileInfo, PageReadStore pageReadStore) { this.rowCount = pageReadStore.getRowCount(); - MessageType schema = geoParquetFileInfo.getParquetMetadata().getFileMetaData().getSchema(); + MessageType schema = fileInfo.getParquetMetadata().getFileMetaData().getSchema(); this.recordReader = new ColumnIOFactory() .getColumnIO(schema) - .getRecordReader(pageReadStore, new GeoParquetMaterializer(geoParquetFileInfo)); + .getRecordReader(pageReadStore, new GeoParquetMaterializer(schema)); } @Override diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetFileInfo.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/FileInfo.java similarity index 71% rename from baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetFileInfo.java rename to baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/FileInfo.java index 811c6186b..bd7ccaee6 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetFileInfo.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/FileInfo.java @@ -18,25 +18,23 @@ package org.apache.baremaps.geoparquet.data; import com.google.common.base.Objects; -import java.util.Set; import org.apache.parquet.hadoop.metadata.ParquetMetadata; -public final class GeoParquetFileInfo { +public final class FileInfo { private final long rowCount; + private final ParquetMetadata parquetMetadata; + private final GeoParquetMetadata geoParquetMetadata; - private final Set geometryColumns; - public GeoParquetFileInfo( + public FileInfo( long rowCount, ParquetMetadata parquetMetadata, - GeoParquetMetadata geoParquetMetadata, - Set geometryColumns) { + GeoParquetMetadata geoParquetMetadata) { this.rowCount = rowCount; this.parquetMetadata = parquetMetadata; this.geoParquetMetadata = geoParquetMetadata; - this.geometryColumns = geometryColumns; } public long getRowCount() { @@ -51,19 +49,6 @@ public GeoParquetMetadata getGeoParquetMetadata() { return geoParquetMetadata; } - public Set getGeometryColumns() { - return geometryColumns; - } - - public boolean isGeometryColumn(String column) { - return geometryColumns.contains(column); - } - - public boolean isGeometryColumn(int column) { - return isGeometryColumn( - parquetMetadata.getFileMetaData().getSchema().getColumns().get(column).getPath()[0]); - } - @Override public boolean equals(Object o) { if (this == o) { @@ -72,15 +57,14 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - GeoParquetFileInfo that = (GeoParquetFileInfo) o; + FileInfo that = (FileInfo) o; return rowCount == that.rowCount && Objects.equal(parquetMetadata, that.parquetMetadata) - && Objects.equal(geoParquetMetadata, that.geoParquetMetadata) - && Objects.equal(geometryColumns, that.geometryColumns); + && Objects.equal(geoParquetMetadata, that.geoParquetMetadata); } @Override public int hashCode() { - return Objects.hashCode(rowCount, parquetMetadata, geoParquetMetadata, geometryColumns); + return Objects.hashCode(rowCount, parquetMetadata, geoParquetMetadata); } } diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroup.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroup.java index fdf2dddc2..0e5bf3275 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroup.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroup.java @@ -23,91 +23,64 @@ import org.apache.parquet.io.api.RecordConsumer; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.Type; +import org.locationtech.jts.geom.Geometry; +import org.locationtech.jts.io.ParseException; +import org.locationtech.jts.io.WKBReader; +import org.locationtech.jts.io.WKBWriter; public class GeoParquetGroup { - private final GeoParquetFileInfo fileInfo; - private final GroupType schema; + private final GroupType groupType; + private final List[] data; @SuppressWarnings("unchecked") - public GeoParquetGroup(GeoParquetFileInfo fileInfo, GroupType schema) { - this.fileInfo = fileInfo; - this.schema = schema; - this.data = new List[schema.getFields().size()]; - for (int i = 0; i < schema.getFieldCount(); i++) { + public GeoParquetGroup(GroupType groupType) { + this.groupType = groupType; + this.data = new List[groupType.getFields().size()]; + for (int i = 0; i < groupType.getFieldCount(); i++) { this.data[i] = new ArrayList<>(); } } - @Override - public String toString() { - return toString(""); - } - - private void appendToString(StringBuilder builder, String indent) { - int i = 0; - for (Type field : schema.getFields()) { - String name = field.getName(); - List values = data[i]; - ++i; - if (values != null && !values.isEmpty()) { - for (Object value : values) { - builder.append(indent).append(name); - if (value == null) { - builder.append(": NULL\n"); - } else if (value instanceof GeoParquetGroup) { - builder.append('\n'); - ((GeoParquetGroup) value).appendToString(builder, indent + " "); - } else { - builder.append(": ").append(value.toString()).append('\n'); - } - } - } - } - } - - public String toString(String indent) { - StringBuilder builder = new StringBuilder(); - appendToString(builder, indent); - return builder.toString(); + public GroupType getGroupType() { + return groupType; } public GeoParquetGroup addGroup(int fieldIndex) { - GeoParquetGroup g = new GeoParquetGroup(fileInfo, schema.getType(fieldIndex).asGroupType()); + GeoParquetGroup g = new GeoParquetGroup(groupType.getType(fieldIndex).asGroupType()); add(fieldIndex, g); return g; } + public GeoParquetGroup addGroup(String field) { + return addGroup(getGroupType().getFieldIndex(field)); + } + public GeoParquetGroup getGroup(int fieldIndex, int index) { return (GeoParquetGroup) getValue(fieldIndex, index); } + public GeoParquetGroup getGroup(String field, int index) { + return getGroup(getGroupType().getFieldIndex(field), index); + } + private Object getValue(int fieldIndex, int index) { List list; try { list = data[fieldIndex]; } catch (IndexOutOfBoundsException e) { - throw new RuntimeException("not found " + fieldIndex + "(" + schema.getFieldName(fieldIndex) - + ") in group:\n" + this); + throw new RuntimeException( + "not found " + fieldIndex + "(" + groupType.getFieldName(fieldIndex) + + ") in group:\n" + this); } try { return list.get(index); } catch (IndexOutOfBoundsException e) { - throw new RuntimeException("not found " + fieldIndex + "(" + schema.getFieldName(fieldIndex) - + ") element number " + index + " in group:\n" + this); - } - } - - private void add(int fieldIndex, Primitive value) { - Type type = schema.getType(fieldIndex); - List list = data[fieldIndex]; - if (!type.isRepetition(Type.Repetition.REPEATED) - && !list.isEmpty()) { - throw new IllegalStateException("field " + fieldIndex + " (" + type.getName() - + ") can not have more than one value: " + list); + throw new RuntimeException( + "not found " + fieldIndex + "(" + groupType.getFieldName(fieldIndex) + + ") element number " + index + " in group:\n" + this); } - list.add(value); } public int getFieldRepetitionCount(int fieldIndex) { @@ -155,6 +128,26 @@ public Binary getInt96(int fieldIndex, int index) { return ((Int96Value) getValue(fieldIndex, index)).getInt96(); } + public Geometry getGeometry(int fieldIndex, int index) { + byte[] bytes = ((BinaryValue) getValue(fieldIndex, index)).getBinary().getBytes(); + try { + return new WKBReader().read(bytes); + } catch (ParseException e) { + throw new RuntimeException(e); + } + } + + private void add(int fieldIndex, Primitive value) { + Type type = groupType.getType(fieldIndex); + List list = data[fieldIndex]; + if (!type.isRepetition(Type.Repetition.REPEATED) + && !list.isEmpty()) { + throw new IllegalStateException("field " + fieldIndex + " (" + type.getName() + + ") can not have more than one value: " + list); + } + list.add(value); + } + public void add(int fieldIndex, int value) { add(fieldIndex, new IntegerValue(value)); } @@ -176,22 +169,17 @@ public void add(int fieldIndex, boolean value) { } public void add(int fieldIndex, Binary value) { - switch (getType().getType(fieldIndex).asPrimitiveType().getPrimitiveTypeName()) { + switch (getGroupType().getType(fieldIndex).asPrimitiveType().getPrimitiveTypeName()) { case BINARY: case FIXED_LEN_BYTE_ARRAY: - String fieldName = schema.getFieldName(fieldIndex); - if (fileInfo.getGeometryColumns().contains(fieldName)) { - add(fieldIndex, new GeometryValue(value)); - } else { - add(fieldIndex, new BinaryValue(value)); - } + add(fieldIndex, new BinaryValue(value)); break; case INT96: add(fieldIndex, new Int96Value(value)); break; default: throw new UnsupportedOperationException( - getType().asPrimitiveType().getName() + " not supported for Binary"); + getGroupType().asPrimitiveType().getName() + " not supported for Binary"); } } @@ -207,20 +195,54 @@ public void add(int fieldIndex, GeoParquetGroup value) { data[fieldIndex].add(value); } - public GroupType getType() { - return schema; + public void add(int fieldIndex, Geometry geometry) { + byte[] bytes = new WKBWriter().write(geometry); + add(fieldIndex, Binary.fromConstantByteArray(bytes)); } - public void writeValue(int field, int index, RecordConsumer recordConsumer) { - ((Primitive) getValue(field, index)).writeValue(recordConsumer); + public void add(String field, int value) { + add(getGroupType().getFieldIndex(field), value); } - public GeoParquetGroup addGroup(String field) { - return addGroup(getType().getFieldIndex(field)); + public void add(String field, long value) { + add(getGroupType().getFieldIndex(field), value); } - public GeoParquetGroup getGroup(String field, int index) { - return getGroup(getType().getFieldIndex(field), index); + public void add(String field, float value) { + add(getGroupType().getFieldIndex(field), value); + } + + public void add(String field, double value) { + add(getGroupType().getFieldIndex(field), value); + } + + public void add(String field, String value) { + add(getGroupType().getFieldIndex(field), value); + } + + public void add(String field, NanoTime value) { + add(getGroupType().getFieldIndex(field), value); + } + + public void add(String field, boolean value) { + add(getGroupType().getFieldIndex(field), value); + } + + public void add(String field, Binary value) { + add(getGroupType().getFieldIndex(field), value); + } + + public void add(String field, GeoParquetGroup value) { + add(getGroupType().getFieldIndex(field), value); + } + + public void add(String field, Geometry geometry) { + byte[] bytes = new WKBWriter().write(geometry); + add(getGroupType().getFieldIndex(field), Binary.fromConstantByteArray(bytes)); + } + + public void writeValue(int field, int index, RecordConsumer recordConsumer) { + ((Primitive) getValue(field, index)).writeValue(recordConsumer); } public GeoParquetGroup append(String fieldName, int value) { @@ -263,40 +285,42 @@ public GeoParquetGroup append(String fieldName, Binary value) { return this; } - public void add(String field, int value) { - add(getType().getFieldIndex(field), value); - } - - public void add(String field, long value) { - add(getType().getFieldIndex(field), value); - } - - public void add(String field, float value) { - add(getType().getFieldIndex(field), value); - } - - public void add(String field, double value) { - add(getType().getFieldIndex(field), value); - } - - public void add(String field, String value) { - add(getType().getFieldIndex(field), value); - } - - public void add(String field, NanoTime value) { - add(getType().getFieldIndex(field), value); + public GeoParquetGroup append(String fieldName, Geometry geometry) { + add(fieldName, geometry); + return this; } - public void add(String field, boolean value) { - add(getType().getFieldIndex(field), value); + @Override + public String toString() { + return toString(""); } - public void add(String field, Binary value) { - add(getType().getFieldIndex(field), value); + private void appendToString(StringBuilder builder, String indent) { + int i = 0; + for (Type field : groupType.getFields()) { + String name = field.getName(); + List values = data[i]; + ++i; + if (values != null && !values.isEmpty()) { + for (Object value : values) { + builder.append(indent).append(name); + if (value == null) { + builder.append(": NULL\n"); + } else if (value instanceof GeoParquetGroup) { + builder.append('\n'); + ((GeoParquetGroup) value).appendToString(builder, indent + " "); + } else { + builder.append(": ").append(value.toString()).append('\n'); + } + } + } + } } - public void add(String field, GeoParquetGroup value) { - add(getType().getFieldIndex(field), value); + public String toString(String indent) { + StringBuilder builder = new StringBuilder(); + appendToString(builder, indent); + return builder.toString(); } } diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupConverter.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupConverter.java index f7f49c873..c14107e3c 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupConverter.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupConverter.java @@ -24,27 +24,24 @@ class GeoParquetGroupConverter extends GroupConverter { - private final GeoParquetFileInfo fileInfo; private final GeoParquetGroupConverter parent; private final int index; protected GeoParquetGroup current; private Converter[] converters; - GeoParquetGroupConverter(GeoParquetFileInfo fileInfo, GeoParquetGroupConverter parent, int index, + GeoParquetGroupConverter(GeoParquetGroupConverter parent, int index, GroupType schema) { - this.fileInfo = fileInfo; this.parent = parent; this.index = index; converters = new Converter[schema.getFieldCount()]; for (int i = 0; i < converters.length; i++) { - final String name = schema.getName(); final Type type = schema.getType(i); if (type.isPrimitive()) { converters[i] = new GeoParquetPrimitiveConverter(this, i); } else { - converters[i] = new GeoParquetGroupConverter(fileInfo, this, i, type.asGroupType()); + converters[i] = new GeoParquetGroupConverter(this, i, type.asGroupType()); } } diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupFactory.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupFactory.java index 774abc8e3..6c7d5a4f1 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupFactory.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupFactory.java @@ -21,17 +21,14 @@ public class GeoParquetGroupFactory { - private final GeoParquetFileInfo fileInfo; - private final MessageType schema; - public GeoParquetGroupFactory(GeoParquetFileInfo fileInfo, MessageType schema) { - this.fileInfo = fileInfo; + public GeoParquetGroupFactory(MessageType schema) { this.schema = schema; } - public GeoParquetGroup newFeatureGroup() { - return new GeoParquetGroup(fileInfo, schema); + public GeoParquetGroup newGroup() { + return new GeoParquetGroup(schema); } } diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMaterializer.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMaterializer.java index 0d26f7582..1847ff6f1 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMaterializer.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMaterializer.java @@ -27,17 +27,13 @@ public class GeoParquetMaterializer extends RecordMaterializer private GeoParquetGroupConverter root; - public GeoParquetMaterializer(GeoParquetFileInfo fileInfo) { - MessageType schema = fileInfo.getParquetMetadata().getFileMetaData().getSchema(); - this.geoParquetGroupFactory = new GeoParquetGroupFactory(fileInfo, schema); - this.root = new GeoParquetGroupConverter(fileInfo, null, 0, schema) { + public GeoParquetMaterializer(MessageType schema) { + this.geoParquetGroupFactory = new GeoParquetGroupFactory(schema); + this.root = new GeoParquetGroupConverter(null, 0, schema) { @Override public void start() { - this.current = geoParquetGroupFactory.newFeatureGroup(); + this.current = geoParquetGroupFactory.newGroup(); } - - @Override - public void end() {} }; } diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMetadata.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMetadata.java index 64f6a2ce0..eccbedc6a 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMetadata.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMetadata.java @@ -18,6 +18,7 @@ package org.apache.baremaps.geoparquet.data; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Objects; import java.util.Map; @@ -58,6 +59,23 @@ public void setColumns(Map columns) { this.columns = columns; } + public int getSrid(String column) { + JsonNode crsId = getColumns().get(column).getCrs().get("id"); + int srid = switch (crsId.get("authority").asText()) { + case "OGC" -> switch (crsId.get("code").asText()) { + case "CRS84" -> 4326; + default -> 0; + }; + case "EPSG" -> crsId.get("code").asInt(); + default -> 0; + }; + return srid; + } + + public boolean isGeometryColumn(String column) { + return columns.containsKey(column); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeometryValue.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeometryValue.java deleted file mode 100644 index 9757307a2..000000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeometryValue.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.baremaps.geoparquet.data; - -import org.apache.parquet.io.api.Binary; -import org.apache.parquet.io.api.RecordConsumer; -import org.locationtech.jts.geom.Geometry; -import org.locationtech.jts.io.ParseException; -import org.locationtech.jts.io.WKBReader; -import org.locationtech.jts.io.WKBWriter; - - -public class GeometryValue extends Primitive { - - private final Binary binary; - - public GeometryValue(Binary binary) { - this.binary = binary; - } - - public GeometryValue(Geometry geometry) { - this.binary = Binary.fromConstantByteArray(new WKBWriter().write(geometry)); - } - - @Override - public Geometry getGeometry() { - try { - return new WKBReader().read(binary.getBytes()); - } catch (ParseException e) { - throw new RuntimeException(e); - } - } - - @Override - public String getString() { - try { - return new WKBReader().read(binary.getBytes()).toString(); - } catch (ParseException e) { - throw new RuntimeException(e); - } - } - - @Override - public void writeValue(RecordConsumer recordConsumer) { - recordConsumer.addBinary(binary); - } - - @Override - public String toString() { - return getString(); - } -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GroupValueSource.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GroupValueSource.java deleted file mode 100644 index f029cd1fe..000000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GroupValueSource.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.baremaps.geoparquet.data; - -import org.apache.parquet.io.api.Binary; -import org.apache.parquet.schema.GroupType; - -abstract public class GroupValueSource { - - public int getFieldRepetitionCount(String field) { - return getFieldRepetitionCount(getType().getFieldIndex(field)); - } - - public GroupValueSource getGroup(String field, int index) { - return getGroup(getType().getFieldIndex(field), index); - } - - public String getString(String field, int index) { - return getString(getType().getFieldIndex(field), index); - } - - public int getInteger(String field, int index) { - return getInteger(getType().getFieldIndex(field), index); - } - - public long getLong(String field, int index) { - return getLong(getType().getFieldIndex(field), index); - } - - public double getDouble(String field, int index) { - return getDouble(getType().getFieldIndex(field), index); - } - - public float getFloat(String field, int index) { - return getFloat(getType().getFieldIndex(field), index); - } - - public boolean getBoolean(String field, int index) { - return getBoolean(getType().getFieldIndex(field), index); - } - - public Binary getBinary(String field, int index) { - return getBinary(getType().getFieldIndex(field), index); - } - - public Binary getInt96(String field, int index) { - return getInt96(getType().getFieldIndex(field), index); - } - - abstract public int getFieldRepetitionCount(int fieldIndex); - - abstract public GroupValueSource getGroup(int fieldIndex, int index); - - abstract public String getString(int fieldIndex, int index); - - abstract public int getInteger(int fieldIndex, int index); - - abstract public long getLong(int fieldIndex, int index); - - abstract public double getDouble(int fieldIndex, int index); - - abstract public float getFloat(int fieldIndex, int index); - - abstract public boolean getBoolean(int fieldIndex, int index); - - abstract public Binary getBinary(int fieldIndex, int index); - - abstract public Binary getInt96(int fieldIndex, int index); - - abstract public String getValueToString(int fieldIndex, int index); - - abstract public GroupType getType(); -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetInputFormat.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetInputFormat.java new file mode 100644 index 000000000..2e4e67497 --- /dev/null +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetInputFormat.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.baremaps.geoparquet.hadoop; + +import org.apache.baremaps.geoparquet.data.GeoParquetGroup; +import org.apache.parquet.hadoop.ParquetInputFormat; + +/** + * Example input format to read Parquet files + * + * This Input format uses a rather inefficient data model but works independently of higher level + * abstractions. + */ +public class GeoParquetInputFormat extends ParquetInputFormat { + + public GeoParquetInputFormat() { + super(GeoParquetReadSupport.class); + } + +} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetReadSupport.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetReadSupport.java new file mode 100644 index 000000000..a875f427b --- /dev/null +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetReadSupport.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.baremaps.geoparquet.hadoop; + +import java.util.Map; +import org.apache.baremaps.geoparquet.data.GeoParquetGroup; +import org.apache.baremaps.geoparquet.data.GeoParquetMaterializer; +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.io.api.RecordMaterializer; +import org.apache.parquet.schema.MessageType; + +public class GeoParquetReadSupport extends ReadSupport { + + @Override + public ReadContext init( + Configuration configuration, + Map keyValueMetaData, + MessageType fileSchema) { + String partialSchemaString = configuration.get(ReadSupport.PARQUET_READ_SCHEMA); + MessageType requestedProjection = getSchemaForRead(fileSchema, partialSchemaString); + return new ReadContext(requestedProjection); + } + + @Override + public RecordMaterializer prepareForRead( + Configuration configuration, + Map keyValueMetaData, + MessageType fileSchema, + ReadContext readContext) { + return new GeoParquetMaterializer(readContext.getRequestedSchema()); + } + +} diff --git a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java index c109dd535..78e46f926 100644 --- a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java +++ b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java @@ -19,22 +19,23 @@ import java.io.IOException; import java.net.URI; -import java.util.List; -import org.apache.baremaps.geoparquet.data.GeoParquetGroup; +import java.net.URISyntaxException; +import org.apache.baremaps.geoparquet.hadoop.GeoParquetReadSupport; import org.apache.baremaps.testing.TestFiles; import org.junit.jupiter.api.Test; class GeoParquetReaderTest { @Test - void read() throws IOException { + void read() throws IOException, URISyntaxException { + // URI geoParquet = new + // URI("s3a://overturemaps-us-west-2/release/2024-03-12-alpha.0/theme=admins/type=locality_area/*.parquet"); URI geoParquet = TestFiles.GEOPARQUET.toUri(); System.out.println(geoParquet); GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet); - List geoParquetList = geoParquetReader.read().toList(); - for (GeoParquetGroup geoParquetGroup : geoParquetList) { - System.out.println(geoParquetGroup); - } + geoParquetReader.read().forEach(group -> { + System.out.println("--------------------"); + System.out.println(group); + }); } - }