Skip to content

Commit

Permalink
Implement a geoparquet writer (#899)
Browse files Browse the repository at this point in the history
  • Loading branch information
bchapuis authored Nov 5, 2024
1 parent 39a8abf commit f2fb634
Show file tree
Hide file tree
Showing 10 changed files with 429 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,16 @@ public void setup() throws IOException {
@SuppressWarnings({"squid:S1481", "squid:S2201"})
@Benchmark
public void read() {
GeoParquetReader reader = new GeoParquetReader(directory.toUri());
var path = new org.apache.hadoop.fs.Path(directory.toUri());
GeoParquetReader reader = new GeoParquetReader(path);
reader.read().count();
}

@SuppressWarnings({"squid:S1481", "squid:S2201"})
@Benchmark
public void readParallel() {
GeoParquetReader reader = new GeoParquetReader(directory.toUri());
var path = new org.apache.hadoop.fs.Path(directory.toUri());
GeoParquetReader reader = new GeoParquetReader(path);
reader.readParallel().count();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,17 @@ public void setup() throws IOException {
@SuppressWarnings({"squid:S1481", "squid:S2201"})
@Benchmark
public void read() {
GeoParquetReader reader =
new GeoParquetReader(Path.of("baremaps-benchmarking/data/small/*.parquet").toUri());
var path = new org.apache.hadoop.fs.Path("baremaps-benchmarking/data/small/*.parquet");
GeoParquetReader reader = new GeoParquetReader(path);
reader.read().count();
}

@SuppressWarnings({"squid:S1481", "squid:S2201"})
@Benchmark
public void readParallel() {
var path = new org.apache.hadoop.fs.Path("baremaps-benchmarking/data/small/*.parquet");
GeoParquetReader reader =
new GeoParquetReader(Path.of("baremaps-benchmarking/data/small/*.parquet").toUri());
new GeoParquetReader(path);
reader.readParallel().count();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.baremaps.data.storage.*;
import org.apache.baremaps.geoparquet.GeoParquetException;
import org.apache.baremaps.geoparquet.GeoParquetReader;
import org.apache.hadoop.fs.Path;

public class GeoParquetDataTable implements DataTable {

Expand All @@ -35,7 +36,7 @@ public class GeoParquetDataTable implements DataTable {

public GeoParquetDataTable(URI path) {
this.path = path;
this.reader = new GeoParquetReader(path);
this.reader = new GeoParquetReader(new Path(path));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public int getFieldRepetitionCount(int fieldIndex) {
}
}

private Object getValue(int fieldIndex, int index) {
Object getValue(int fieldIndex, int index) {
Object value = data[fieldIndex];
if (value instanceof List<?>list) {
return list.get(index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.net.URI;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -53,31 +52,31 @@ public class GeoParquetReader {
/**
* Constructs a new {@code GeoParquetReader}.
*
* @param uri the URI to read from
* @param path the path to read from
*/
public GeoParquetReader(URI uri) {
this(uri, null, new Configuration());
public GeoParquetReader(Path path) {
this(path, null, new Configuration());
}

/**
* Constructs a new {@code GeoParquetReader}.
*
* @param uri the URI to read from
* @param path the path to read from
* @param envelope the envelope to filter records
*/
public GeoParquetReader(URI uri, Envelope envelope) {
this(uri, envelope, new Configuration());
public GeoParquetReader(Path path, Envelope envelope) {
this(path, envelope, new Configuration());
}

/**
* Constructs a new {@code GeoParquetReader}.
*
* @param uri the URI to read from
* @param path the path to read from
* @param configuration the configuration
*/
public GeoParquetReader(URI uri, Envelope envelope, Configuration configuration) {
public GeoParquetReader(Path path, Envelope envelope, Configuration configuration) {
this.configuration = configuration;
this.files = initializeFiles(uri, configuration);
this.files = initializeFiles(path, configuration);
this.envelope = envelope;
}

Expand Down Expand Up @@ -168,11 +167,10 @@ private FileInfo getFileInfo(FileStatus fileStatus) {
}
}

private static List<FileStatus> initializeFiles(URI uri, Configuration configuration) {
private static List<FileStatus> initializeFiles(Path path, Configuration configuration) {
try {
Path globPath = new Path(uri.getPath());
FileSystem fileSystem = FileSystem.get(uri, configuration);
FileStatus[] fileStatuses = fileSystem.globStatus(globPath);
FileSystem fileSystem = FileSystem.get(path.toUri(), configuration);
FileStatus[] fileStatuses = fileSystem.globStatus(path);
if (fileStatuses == null) {
throw new GeoParquetException("No files found at the specified URI.");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.baremaps.geoparquet;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.*;

/**
 * A {@link WriteSupport} implementation that writes {@link GeoParquetGroup} instances to a Parquet
 * file and embeds the serialized GeoParquet metadata in the file footer under the {@code "geo"}
 * key, as required by the GeoParquet specification.
 */
public class GeoParquetWriteSupport extends WriteSupport<GeoParquetGroup> {

  private final MessageType schema;
  private final GeoParquetMetadata metadata;
  private final ObjectMapper objectMapper = new ObjectMapper();
  private RecordConsumer recordConsumer;

  /**
   * Constructs a new GeoParquetWriteSupport.
   *
   * @param schema the Parquet schema of the records to write
   * @param metadata the GeoParquet metadata to embed in the file footer
   */
  public GeoParquetWriteSupport(MessageType schema, GeoParquetMetadata metadata) {
    this.schema = schema;
    this.metadata = metadata;
  }

  /**
   * {@inheritDoc}
   *
   * <p>Registers the serialized GeoParquet metadata under the {@code "geo"} footer key.
   */
  @Override
  public WriteContext init(Configuration configuration) {
    Map<String, String> extraMetadata = new HashMap<>();
    String geoMetadataJson = serializeMetadata(metadata);
    extraMetadata.put("geo", geoMetadataJson);
    return new WriteContext(schema, extraMetadata);
  }

  /** {@inheritDoc} */
  @Override
  public void prepareForWrite(RecordConsumer recordConsumer) {
    this.recordConsumer = recordConsumer;
  }

  /** {@inheritDoc} */
  @Override
  public void write(GeoParquetGroup group) {
    recordConsumer.startMessage();
    writeGroup(group, schema, true);
    recordConsumer.endMessage();
  }

  /**
   * Recursively writes a group and its nested fields to the record consumer.
   *
   * @param group the group holding the values to write
   * @param groupType the Parquet type describing the group's fields
   * @param isRoot whether this group is the message root (the root must not emit
   *        startGroup/endGroup markers)
   */
  private void writeGroup(GeoParquetGroup group, GroupType groupType, boolean isRoot) {
    if (!isRoot) {
      recordConsumer.startGroup();
    }
    for (int i = 0; i < groupType.getFieldCount(); i++) {
      Type fieldType = groupType.getType(i);
      String fieldName = fieldType.getName();
      int repetitionCount = group.getFieldRepetitionCount(i);
      if (repetitionCount == 0) {
        continue; // Skip if no values are present
      }
      for (int j = 0; j < repetitionCount; j++) {
        recordConsumer.startField(fieldName, i);
        if (fieldType.isPrimitive()) {
          // NOTE(review): a null value here produces an empty startField/endField pair;
          // writePrimitive deliberately writes nothing for null — confirm the consumer
          // tolerates this for optional fields.
          Object value = group.getValue(i, j);
          writePrimitive(value, fieldType.asPrimitiveType());
        } else {
          GeoParquetGroup childGroup = group.getGroup(i, j);
          writeGroup(childGroup, fieldType.asGroupType(), false);
        }
        recordConsumer.endField(fieldName, i);
      }
    }
    if (!isRoot) {
      recordConsumer.endGroup();
    }
  }

  /**
   * Writes a single primitive value to the record consumer, dispatching on the Parquet primitive
   * type name.
   *
   * @param value the value to write, possibly {@code null}
   * @param primitiveType the Parquet primitive type of the value
   * @throws GeoParquetException if the primitive type is not supported
   */
  private void writePrimitive(Object value, PrimitiveType primitiveType) {
    if (value == null) {
      // The Parquet format does not support writing null values directly.
      // If the field is optional and the value is null, we simply do not write it.
      return;
    }
    switch (primitiveType.getPrimitiveTypeName()) {
      case INT32:
        recordConsumer.addInteger((Integer) value);
        break;
      case INT64:
        recordConsumer.addLong((Long) value);
        break;
      case FLOAT:
        recordConsumer.addFloat((Float) value);
        break;
      case DOUBLE:
        recordConsumer.addDouble((Double) value);
        break;
      case BOOLEAN:
        recordConsumer.addBoolean((Boolean) value);
        break;
      case BINARY, FIXED_LEN_BYTE_ARRAY:
        recordConsumer.addBinary((Binary) value);
        break;
      default:
        throw new GeoParquetException(
            "Unsupported type: " + primitiveType.getPrimitiveTypeName());
    }
  }

  /**
   * Serializes the GeoParquet metadata to its JSON representation.
   *
   * @param metadata the metadata to serialize
   * @return the JSON string
   * @throws GeoParquetException if serialization fails
   */
  private String serializeMetadata(GeoParquetMetadata metadata) {
    try {
      return objectMapper.writeValueAsString(metadata);
    } catch (JsonProcessingException e) {
      throw new GeoParquetException("Failed to serialize GeoParquet metadata", e);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.baremaps.geoparquet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.schema.MessageType;

/**
 * A writer for GeoParquet files that writes GeoParquetGroup instances to a Parquet file.
 *
 * <p>Obtain an instance through {@link #builder(Path)}; both a message type and GeoParquet
 * metadata must be supplied before building.
 */
public class GeoParquetWriter {

  private GeoParquetWriter() {
    // Prevent instantiation
  }

  /**
   * Creates a builder for writing a GeoParquet file at the given location.
   *
   * @param file the destination path of the Parquet file
   * @return a new builder
   */
  public static Builder builder(Path file) {
    return new Builder(file);
  }

  /**
   * A builder for {@link ParquetWriter} instances that produce GeoParquet files.
   */
  public static class Builder
      extends ParquetWriter.Builder<GeoParquetGroup, GeoParquetWriter.Builder> {

    private MessageType type = null;

    private GeoParquetMetadata metadata = null;

    private Builder(Path file) {
      super(file);
    }

    /**
     * Replace the message type with the specified one.
     *
     * @param type the message type
     * @return the builder
     */
    public GeoParquetWriter.Builder withType(MessageType type) {
      this.type = type;
      return this;
    }

    /**
     * Replace the metadata with the specified one.
     *
     * @param metadata the metadata
     * @return the builder
     */
    public GeoParquetWriter.Builder withGeoParquetMetadata(GeoParquetMetadata metadata) {
      this.metadata = metadata;
      return this;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected WriteSupport<GeoParquetGroup> getWriteSupport(Configuration conf) {
      // We don't need access to the hadoop configuration for now
      return getWriteSupport((ParquetConfiguration) null);
    }

    /**
     * {@inheritDoc}
     *
     * @throws IllegalStateException if the message type or metadata has not been set
     */
    @Override
    protected WriteSupport<GeoParquetGroup> getWriteSupport(ParquetConfiguration conf) {
      // Fail fast with a clear message: a null schema would fail deep inside Parquet,
      // and a null metadata would be serialized as the JSON literal "null" in the footer.
      if (type == null) {
        throw new IllegalStateException(
            "A message type must be provided with withType(MessageType)");
      }
      if (metadata == null) {
        throw new IllegalStateException(
            "GeoParquet metadata must be provided with withGeoParquetMetadata(GeoParquetMetadata)");
      }
      return new GeoParquetWriteSupport(type, metadata);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected GeoParquetWriter.Builder self() {
      return this;
    }
  }
}
Loading

0 comments on commit f2fb634

Please sign in to comment.