Skip to content

Commit

Permalink
Remove geometry value
Browse files Browse the repository at this point in the history
  • Loading branch information
bchapuis committed May 22, 2024
1 parent e37c3a8 commit 707d622
Show file tree
Hide file tree
Showing 12 changed files with 270 additions and 348 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.baremaps.geoparquet;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.net.URI;
Expand All @@ -40,30 +39,28 @@
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.schema.MessageType;
import org.locationtech.jts.io.WKBReader;


/**
* This reader is based on the parquet example code located at: org.apache.parquet.example.data.*.
*/
public class GeoParquetReader {

private final URI uri;

private Configuration configuration;

private WKBReader wkbReader = new WKBReader();

private Map<FileStatus, GeoParquetFileInfo> metadata = new LinkedHashMap<>();

private long rowCount;
private Map<FileStatus, FileInfo> metadata = new LinkedHashMap<>();

public GeoParquetReader(URI uri) {
this.uri = uri;
this.initialize();
}

public void initialize() {
this.rowCount = 0;
this.configuration = getConfiguration();

try {
this.configuration = getConfiguration();

// List all the files that match the glob pattern
Path globPath = new Path(uri.getPath());
URI rootUri = getRootUri(uri);
Expand All @@ -90,18 +87,12 @@ public void initialize() {
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.readValue(json, GeoParquetMetadata.class);

// Increment the total number of rows
this.rowCount += rowCount;

// Get the geometry columns of the Parquet file
Set<String> geometryColumns = geoParquetMetadata.getColumns().keySet();

// Store the metadata of the Parquet file
this.metadata.put(fileStatus, new GeoParquetFileInfo(rowCount, parquetMetadata,
geoParquetMetadata, geometryColumns));
this.metadata.put(
fileStatus,
new FileInfo(rowCount, parquetMetadata, geoParquetMetadata));
}
}

} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -122,19 +113,6 @@ private static Configuration getConfiguration() {
return configuration;
}

private static int getSrid(GeoParquetMetadata geoParquetMetadata, String name) {
JsonNode crsId = geoParquetMetadata.getColumns().get(name).getCrs().get("id");
int srid = switch (crsId.get("authority").asText()) {
case "OGC" -> switch (crsId.get("code").asText()) {
case "CRS84" -> 4326;
default -> 0;
};
case "EPSG" -> crsId.get("code").asInt();
default -> 0;
};
return srid;
}

private static URI getRootUri(URI uri) throws URISyntaxException {
String path = uri.getPath();
int index = path.indexOf("*");
Expand All @@ -153,9 +131,10 @@ private static URI getRootUri(URI uri) throws URISyntaxException {

private class GroupIterator implements Iterator<GeoParquetGroup> {

private Iterator<Map.Entry<FileStatus, GeoParquetFileInfo>> fileIterator;
private Iterator<Map.Entry<FileStatus, FileInfo>> fileIterator;

private Map.Entry<FileStatus, FileInfo> currentFileStatus;

private Map.Entry<FileStatus, GeoParquetFileInfo> currentFileStatus;
private Iterator<PageReadStore> pageReadStoreIterator;

private PageReadStore currentPageReadStore;
Expand All @@ -169,7 +148,7 @@ public GroupIterator() throws IOException {
this.currentFileStatus = fileIterator.next();
this.pageReadStoreIterator = new PageReadStoreIterator(currentFileStatus);
this.currentPageReadStore = pageReadStoreIterator.next();
this.simpleGroupIterator = new FeatureGroupIterator(
this.simpleGroupIterator = new GeoParquetGroupIterator(
currentFileStatus.getValue(),
currentPageReadStore);
this.currentGeoParquetGroup = simpleGroupIterator.next();
Expand All @@ -181,7 +160,7 @@ public boolean hasNext() {
return true;
} else if (pageReadStoreIterator.hasNext()) {
currentPageReadStore = pageReadStoreIterator.next();
simpleGroupIterator = new FeatureGroupIterator(
simpleGroupIterator = new GeoParquetGroupIterator(
currentFileStatus.getValue(),
currentPageReadStore);
return hasNext();
Expand Down Expand Up @@ -209,15 +188,12 @@ private class PageReadStoreIterator implements Iterator<PageReadStore> {

private final ParquetFileReader parquetFileReader;

private final MessageType messageType;

private PageReadStore next;

public PageReadStoreIterator(Map.Entry<FileStatus, GeoParquetFileInfo> fileInfo)
public PageReadStoreIterator(Map.Entry<FileStatus, FileInfo> fileInfo)
throws IOException {
this.parquetFileReader = ParquetFileReader
.open(HadoopInputFile.fromPath(fileInfo.getKey().getPath(), configuration));
this.messageType = this.parquetFileReader.getFooter().getFileMetaData().getSchema();
try {
next = parquetFileReader.readNextRowGroup();
} catch (IOException e) {
Expand Down Expand Up @@ -258,20 +234,20 @@ public PageReadStore next() {
}
}

private static class FeatureGroupIterator implements Iterator<GeoParquetGroup> {
private static class GeoParquetGroupIterator implements Iterator<GeoParquetGroup> {
private final long rowCount;
private final RecordReader<GeoParquetGroup> recordReader;

private long i = 0;

private FeatureGroupIterator(GeoParquetFileInfo geoParquetFileInfo,
private GeoParquetGroupIterator(FileInfo fileInfo,
PageReadStore pageReadStore) {
this.rowCount = pageReadStore.getRowCount();

MessageType schema = geoParquetFileInfo.getParquetMetadata().getFileMetaData().getSchema();
MessageType schema = fileInfo.getParquetMetadata().getFileMetaData().getSchema();
this.recordReader = new ColumnIOFactory()
.getColumnIO(schema)
.getRecordReader(pageReadStore, new GeoParquetMaterializer(geoParquetFileInfo));
.getRecordReader(pageReadStore, new GeoParquetMaterializer(schema));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,23 @@
package org.apache.baremaps.geoparquet.data;

import com.google.common.base.Objects;
import java.util.Set;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

public final class GeoParquetFileInfo {
public final class FileInfo {

private final long rowCount;

private final ParquetMetadata parquetMetadata;

private final GeoParquetMetadata geoParquetMetadata;
private final Set<String> geometryColumns;

public GeoParquetFileInfo(
public FileInfo(
long rowCount,
ParquetMetadata parquetMetadata,
GeoParquetMetadata geoParquetMetadata,
Set<String> geometryColumns) {
GeoParquetMetadata geoParquetMetadata) {
this.rowCount = rowCount;
this.parquetMetadata = parquetMetadata;
this.geoParquetMetadata = geoParquetMetadata;
this.geometryColumns = geometryColumns;
}

public long getRowCount() {
Expand All @@ -51,19 +49,6 @@ public GeoParquetMetadata getGeoParquetMetadata() {
return geoParquetMetadata;
}

public Set<String> getGeometryColumns() {
return geometryColumns;
}

public boolean isGeometryColumn(String column) {
return geometryColumns.contains(column);
}

public boolean isGeometryColumn(int column) {
return isGeometryColumn(
parquetMetadata.getFileMetaData().getSchema().getColumns().get(column).getPath()[0]);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -72,15 +57,14 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
GeoParquetFileInfo that = (GeoParquetFileInfo) o;
FileInfo that = (FileInfo) o;
return rowCount == that.rowCount
&& Objects.equal(parquetMetadata, that.parquetMetadata)
&& Objects.equal(geoParquetMetadata, that.geoParquetMetadata)
&& Objects.equal(geometryColumns, that.geometryColumns);
&& Objects.equal(geoParquetMetadata, that.geoParquetMetadata);
}

@Override
public int hashCode() {
return Objects.hashCode(rowCount, parquetMetadata, geoParquetMetadata, geometryColumns);
return Objects.hashCode(rowCount, parquetMetadata, geoParquetMetadata);
}
}
Loading

0 comments on commit 707d622

Please sign in to comment.