Skip to content

Commit

Permalink
Add support for nested types, geoparquet groups, and postgres jsonb i…
Browse files Browse the repository at this point in the history
…n data table (#860)

* Add support for nested types in the DataTable

* Add a JsonbHandler that serializes Objects

* Add an EnvelopeField to the GeoParquet parser

* Save the EnvelopeField as geometry in Postgis

* Add a writeEnvelope method to the CopyWriter

* Use float values for the BBox in GeoParquet

* Create Envelope from Double and Float values

* Use the default CRS when the crs field is null in Geoparquet (#861)

---------

Co-authored-by: Antoine Drabble <[email protected]>
  • Loading branch information
bchapuis and Drabble authored Jun 3, 2024
1 parent ee7aed7 commit cd2018d
Show file tree
Hide file tree
Showing 25 changed files with 502 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.locationtech.jts.geom.Envelope;
import org.locationtech.jts.geom.Geometry;
import org.postgresql.copy.PGCopyOutputStream;
import org.postgresql.core.Oid;
Expand Down Expand Up @@ -106,6 +107,9 @@ public class CopyWriter implements AutoCloseable {
public static final GeometryValueHandler GEOMETRY_HANDLER =
new GeometryValueHandler();

public static final EnvelopeValueHandler ENVELOPE_HANDLER =
new EnvelopeValueHandler();

private final DataOutputStream data;

/**
Expand Down Expand Up @@ -397,6 +401,16 @@ public void writeGeometry(Geometry value) throws IOException {
GEOMETRY_HANDLER.handle(data, value);
}

/**
 * Writes an envelope value by delegating to the envelope handler, which encodes the envelope as
 * a geometry.
 *
 * @param value the envelope to write
 * @throws IOException if an I/O error occurs while writing to the output stream
 */
public void writeEnvelope(Envelope value) throws IOException {
ENVELOPE_HANDLER.handle(data, value);
}

/** Close the writer. */
@Override
public void close() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.baremaps.database.copy;

import static org.locationtech.jts.io.WKBConstants.wkbNDR;

import de.bytefish.pgbulkinsert.pgsql.handlers.BaseValueHandler;
import java.io.DataOutputStream;
import org.locationtech.jts.geom.Envelope;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.geom.GeometryFactory;
import org.locationtech.jts.io.WKBWriter;

/**
 * A pgbulkinsert value handler that writes a JTS {@link Envelope} to a PostgreSQL COPY stream by
 * converting it to a geometry and encoding it as WKB.
 */
public class EnvelopeValueHandler extends BaseValueHandler<Envelope> {

  private static final GeometryFactory GEOMETRY_FACTORY = new GeometryFactory();

  /**
   * Converts the envelope to a geometry and encodes it as 2D little-endian WKB with SRID.
   *
   * @param envelope the envelope to encode
   * @return the WKB bytes
   */
  private static byte[] toWkb(Envelope envelope) {
    Geometry geometry = GEOMETRY_FACTORY.toGeometry(envelope);
    return new WKBWriter(2, wkbNDR, true).write(geometry);
  }

  @Override
  protected void internalHandle(DataOutputStream buffer, Envelope value) throws Exception {
    byte[] bytes = toWkb(value);
    buffer.writeInt(bytes.length);
    buffer.write(bytes, 0, bytes.length);
  }

  @Override
  public int getLength(Envelope value) {
    // The 4-byte length prefix plus the encoded payload.
    return 4 + toWkb(value).length;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,19 @@

/**
 * A pgbulkinsert value handler that writes a JTS {@link Geometry} to a PostgreSQL COPY stream as
 * WKB.
 */
public class GeometryValueHandler extends BaseValueHandler<Geometry> {

  /**
   * Encodes the geometry as 2D little-endian WKB with SRID.
   *
   * @param geometry the geometry to encode
   * @return the WKB bytes
   */
  private static byte[] toWkb(Geometry geometry) {
    return new WKBWriter(2, wkbNDR, true).write(geometry);
  }

  @Override
  protected void internalHandle(DataOutputStream buffer, Geometry value) throws IOException {
    byte[] bytes = toWkb(value);
    buffer.writeInt(bytes.length);
    buffer.write(bytes, 0, bytes.length);
  }

  @Override
  public int getLength(Geometry geometry) {
    // The 4-byte length prefix plus the encoded payload.
    return toWkb(geometry).length + 4;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.baremaps.database.copy;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import de.bytefish.pgbulkinsert.pgsql.handlers.BaseValueHandler;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * A pgbulkinsert value handler that serializes an arbitrary object to JSON with Jackson and writes
 * it in the binary format of the PostgreSQL jsonb type: a one-byte protocol version followed by
 * the UTF-8 encoded JSON payload.
 */
public class JsonbValueHandler extends BaseValueHandler<Object> {

  private static final ObjectMapper objectMapper;

  static {
    objectMapper = new ObjectMapper();
    SimpleModule module = new SimpleModule();
    module.addSerializer(String.class, new NoQuotesStringSerializer());
    objectMapper.registerModule(module);
  }

  /**
   * Writes string values verbatim instead of quoting them as JSON strings.
   * NOTE(review): this assumes incoming strings are already valid JSON fragments; a plain string
   * would yield invalid JSON — confirm against the callers.
   */
  static class NoQuotesStringSerializer extends JsonSerializer<String> {
    @Override
    public void serialize(String value, JsonGenerator gen, SerializerProvider serializers)
        throws IOException {
      gen.writeRawValue(value);
    }
  }

  // The version byte written before the JSON payload (PostgreSQL defines version 1).
  private final int jsonbProtocolVersion;

  /** Creates a handler that uses jsonb protocol version 1. */
  public JsonbValueHandler() {
    this(1);
  }

  /**
   * Creates a handler with an explicit jsonb protocol version.
   *
   * @param jsonbProtocolVersion the version byte written before the JSON payload
   */
  public JsonbValueHandler(int jsonbProtocolVersion) {
    this.jsonbProtocolVersion = jsonbProtocolVersion;
  }

  /**
   * Serializes the object to its UTF-8 encoded JSON representation.
   *
   * @param object the object to serialize
   * @return the UTF-8 bytes of the JSON document
   * @throws RuntimeException if serialization fails
   */
  private static byte[] asJson(Object object) {
    try {
      String value = objectMapper.writeValueAsString(object);
      // StandardCharsets.UTF_8 avoids the checked UnsupportedEncodingException of getBytes(String).
      return value.getBytes(StandardCharsets.UTF_8);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  protected void internalHandle(DataOutputStream buffer, Object value) throws Exception {
    byte[] utf8Bytes = asJson(value);
    // The field length covers the version byte plus the JSON payload.
    buffer.writeInt(utf8Bytes.length + 1);
    buffer.writeByte(jsonbProtocolVersion);
    buffer.write(utf8Bytes);
  }

  @Override
  public int getLength(Object value) {
    // NOTE(review): internalHandle writes utf8Bytes.length + 1 as the field length, while this
    // returns the payload length without the version byte; the sibling geometry handlers include
    // their 4-byte prefix. Confirm which contract pgbulkinsert expects before relying on this.
    byte[] utf8Bytes = asJson(value);
    return utf8Bytes.length;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,30 @@ private List<ColumnResult> getColumns(String catalog, String schemaPattern,
var resultSet = connection.getMetaData().getColumns(catalog, schemaPattern,
tableNamePattern, columnNamePattern)) {
while (resultSet.next()) {
tableColumns.add(new ColumnResult(resultSet.getString("TABLE_CAT"),
resultSet.getString("TABLE_SCHEM"), resultSet.getString("TABLE_NAME"),
resultSet.getString("COLUMN_NAME"), resultSet.getInt("DATA_TYPE"),
resultSet.getString("TYPE_NAME"), resultSet.getInt("COLUMN_SIZE"),
resultSet.getInt("DECIMAL_DIGITS"), resultSet.getInt("NUM_PREC_RADIX"),
resultSet.getInt("NULLABLE"), resultSet.getString("REMARKS"),
resultSet.getString("COLUMN_DEF"), resultSet.getInt("SQL_DATA_TYPE"),
resultSet.getInt("SQL_DATETIME_SUB"), resultSet.getInt("CHAR_OCTET_LENGTH"),
resultSet.getInt("ORDINAL_POSITION"), resultSet.getString("IS_NULLABLE"),
resultSet.getString("SCOPE_CATALOG"), resultSet.getString("SCOPE_SCHEMA"),
resultSet.getString("SCOPE_TABLE"), resultSet.getShort("SOURCE_DATA_TYPE"),
resultSet.getString("IS_AUTOINCREMENT"), resultSet.getString("IS_GENERATEDCOLUMN")));
tableColumns.add(new ColumnResult(
resultSet.getString("TABLE_CAT"),
resultSet.getString("TABLE_SCHEM"),
resultSet.getString("TABLE_NAME"),
resultSet.getString("COLUMN_NAME"),
resultSet.getInt("DATA_TYPE"),
resultSet.getString("TYPE_NAME"),
resultSet.getInt("COLUMN_SIZE"),
resultSet.getInt("DECIMAL_DIGITS"),
resultSet.getInt("NUM_PREC_RADIX"),
resultSet.getInt("NULLABLE"),
resultSet.getString("REMARKS"),
resultSet.getString("COLUMN_DEF"),
resultSet.getInt("SQL_DATA_TYPE"),
resultSet.getInt("SQL_DATETIME_SUB"),
resultSet.getInt("CHAR_OCTET_LENGTH"),
resultSet.getInt("ORDINAL_POSITION"),
resultSet.getString("IS_NULLABLE"),
resultSet.getString("SCOPE_CATALOG"),
resultSet.getString("SCOPE_SCHEMA"),
resultSet.getString("SCOPE_TABLE"),
resultSet.getShort("SOURCE_DATA_TYPE"),
resultSet.getString("IS_AUTOINCREMENT"),
resultSet.getString("IS_GENERATEDCOLUMN")));
}
} catch (SQLException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,12 @@ public FlatGeoBufDataTable(Path file) {
this.schema = readSchema(file);
}


/**
* Reads the schema from a flatgeobuf file.
*
* @param file the path to the flatgeobuf file
* @return the schema of the table
*/
private static DataSchema readSchema(Path file) {
try (var channel = FileChannel.open(file, StandardOpenOption.READ)) {
// try to read the schema from the file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.*;
import java.util.stream.Collectors;
import org.apache.baremaps.data.storage.*;
import org.apache.baremaps.data.storage.DataColumn.Cardinality;
import org.apache.baremaps.data.storage.DataColumn.Type;
import org.wololo.flatgeobuf.ColumnMeta;
import org.wololo.flatgeobuf.GeometryConversions;
Expand Down Expand Up @@ -53,7 +54,10 @@ public class FlatGeoBufTypeConversion {
public static DataSchema asSchema(HeaderMeta headerMeta) {
var name = headerMeta.name;
var columns = headerMeta.columns.stream()
.map(column -> new DataColumnImpl(column.name, Type.fromBinding(column.getBinding())))
.map(column -> new DataColumnFixed(
column.name,
column.nullable ? Cardinality.OPTIONAL : Cardinality.REQUIRED,
Type.fromBinding(column.getBinding())))
.map(DataColumn.class::cast)
.toList();
return new DataSchemaImpl(name, columns);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import mil.nga.geopackage.features.user.FeatureResultSet;
import mil.nga.geopackage.geom.GeoPackageGeometryData;
import org.apache.baremaps.data.storage.*;
import org.apache.baremaps.data.storage.DataColumn.Cardinality;
import org.apache.baremaps.data.storage.DataColumn.Type;
import org.locationtech.jts.geom.*;

Expand All @@ -50,7 +51,9 @@ public GeoPackageDataTable(FeatureDao featureDao) {
for (FeatureColumn column : featureDao.getColumns()) {
var propertyName = column.getName();
var propertyType = classType(column);
columns.add(new DataColumnImpl(propertyName, propertyType));
var propertyCardinality = column.isNotNull() ? Cardinality.REQUIRED : Cardinality.OPTIONAL;
columns.add(new DataColumnFixed(
propertyName, propertyCardinality, propertyType));
}
schema = new DataSchemaImpl(name, columns);
geometryFactory = new GeometryFactory(new PrecisionModel(), (int) featureDao.getSrs().getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,50 @@
package org.apache.baremaps.storage.geoparquet;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.baremaps.data.storage.DataColumn;
import java.util.Map;
import org.apache.baremaps.data.storage.*;
import org.apache.baremaps.data.storage.DataColumn.Cardinality;
import org.apache.baremaps.data.storage.DataColumn.Type;
import org.apache.baremaps.data.storage.DataColumnImpl;
import org.apache.baremaps.data.storage.DataSchema;
import org.apache.baremaps.data.storage.DataSchemaImpl;
import org.apache.baremaps.geoparquet.data.GeoParquetGroup;
import org.apache.baremaps.geoparquet.data.GeoParquetGroup.Field;
import org.apache.baremaps.geoparquet.data.GeoParquetGroup.GroupField;
import org.apache.baremaps.geoparquet.data.GeoParquetGroup.Schema;

public class GeoParquetTypeConversion {

private GeoParquetTypeConversion() {}

/**
 * Converts a GeoParquet schema into a baremaps data schema.
 *
 * @param table the name of the table
 * @param schema the GeoParquet schema
 * @return the corresponding data schema
 */
public static DataSchema asSchema(String table, Schema schema) {
  return new DataSchemaImpl(table, asDataColumns(schema));
}

public static Type asSchema(GeoParquetGroup.Type type) {
return switch (type) {
case BINARY -> Type.BYTE_ARRAY;
case BOOLEAN -> Type.BOOLEAN;
case INTEGER -> Type.INTEGER;
case INT96, LONG -> Type.LONG;
case FLOAT -> Type.FLOAT;
case DOUBLE -> Type.DOUBLE;
case STRING -> Type.STRING;
case GEOMETRY -> Type.GEOMETRY;
case GROUP -> null;
/**
 * Converts every field of a GeoParquet schema into a data column.
 *
 * @param schema the GeoParquet schema
 * @return an unmodifiable list of data columns
 */
private static List<DataColumn> asDataColumns(Schema schema) {
  List<DataColumn> columns = new ArrayList<>(schema.fields().size());
  for (Field schemaField : schema.fields()) {
    columns.add(asDataColumn(schemaField));
  }
  return List.copyOf(columns);
}

/**
 * Converts a single GeoParquet field into a data column, recursing into nested groups.
 *
 * @param field the GeoParquet field
 * @return the corresponding data column
 */
private static DataColumn asDataColumn(Field field) {
  Cardinality cardinality = switch (field.cardinality()) {
    case REQUIRED -> Cardinality.REQUIRED;
    case OPTIONAL -> Cardinality.OPTIONAL;
    case REPEATED -> Cardinality.REPEATED;
  };
  // Groups become nested columns whose children are converted recursively.
  if (field.type() == GeoParquetGroup.Type.GROUP) {
    return new DataColumnNested(field.name(), cardinality,
        asDataColumns(((GroupField) field).schema()));
  }
  Type columnType = switch (field.type()) {
    case BINARY -> Type.BINARY;
    case BOOLEAN -> Type.BOOLEAN;
    case INTEGER -> Type.INTEGER;
    case INT96, LONG -> Type.LONG;
    case FLOAT -> Type.FLOAT;
    case DOUBLE -> Type.DOUBLE;
    case STRING -> Type.STRING;
    case GEOMETRY -> Type.GEOMETRY;
    case ENVELOPE -> Type.ENVELOPE;
    case GROUP -> throw new IllegalStateException("GROUP is handled above");
  };
  return new DataColumnFixed(field.name(), cardinality, columnType);
}

Expand All @@ -59,7 +71,6 @@ public static List<Object> asRowValues(GeoParquetGroup group) {
List<Field> fields = schema.fields();
for (int i = 0; i < fields.size(); i++) {
Field field = fields.get(i);
field.type();
switch (field.type()) {
case BINARY -> values.add(group.getBinaryValue(i).getBytes());
case BOOLEAN -> values.add(group.getBooleanValue(i));
Expand All @@ -69,9 +80,33 @@ public static List<Object> asRowValues(GeoParquetGroup group) {
case DOUBLE -> values.add(group.getDoubleValue(i));
case STRING -> values.add(group.getStringValue(i));
case GEOMETRY -> values.add(group.getGeometryValue(i));
case GROUP -> values.add(null); // TODO: values.add(asDataRow(group.getGroupValue(i)));
case ENVELOPE -> values.add(group.getEnvelopeValue(i));
case GROUP -> values.add(asNested(group.getGroupValue(i)));
}
}
return values;
}

/**
 * Converts a GeoParquet group into a map keyed by field name, recursing into nested groups.
 *
 * @param group the GeoParquet group
 * @return a map from field name to the field's value
 */
public static Map<String, Object> asNested(GeoParquetGroup group) {
  Map<String, Object> values = new HashMap<>();
  List<Field> fields = group.getSchema().fields();
  for (int index = 0; index < fields.size(); index++) {
    Field field = fields.get(index);
    Object value = switch (field.type()) {
      case BINARY -> group.getBinaryValue(index).getBytes();
      case BOOLEAN -> group.getBooleanValue(index);
      case INTEGER -> group.getIntegerValue(index);
      case INT96, LONG -> group.getLongValue(index);
      case FLOAT -> group.getFloatValue(index);
      case DOUBLE -> group.getDoubleValue(index);
      case STRING -> group.getStringValue(index);
      case GEOMETRY -> group.getGeometryValue(index);
      case ENVELOPE -> group.getEnvelopeValue(index);
      case GROUP -> asNested(group.getGroupValue(index));
    };
    values.put(field.name(), value);
  }
  return values;
}

}
Loading

0 comments on commit cd2018d

Please sign in to comment.