Skip to content

Commit

Permalink
Create a simple overture example
Browse files Browse the repository at this point in the history
  • Loading branch information
Drabble committed Jun 2, 2024
1 parent 6cf69a2 commit 61b615c
Show file tree
Hide file tree
Showing 18 changed files with 346 additions and 50 deletions.
11 changes: 11 additions & 0 deletions .run/overture-serve.run.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<!-- IntelliJ IDEA run configuration: serves the overture example's tiles and style. -->
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="overture-serve" type="Application" factoryName="Application">
<!-- Entry point of the Baremaps command-line interface. -->
<option name="MAIN_CLASS_NAME" value="org.apache.baremaps.cli.Baremaps" />
<module name="baremaps-cli" />
<!-- Runs `map serve` with the tileset and style of the overture example. -->
<option name="PROGRAM_PARAMETERS" value="map serve --tileset tileset.json --style style.json" />
<!-- Relative paths in the parameters above are resolved against this directory. -->
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/examples/overture" />
<method v="2">
<!-- Build the project before launching. -->
<option name="Make" enabled="true" />
</method>
</configuration>
</component>
11 changes: 11 additions & 0 deletions .run/overture-workflow.run.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<!-- IntelliJ IDEA run configuration: executes the overture example's import workflow. -->
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="overture-workflow" type="Application" factoryName="Application">
<!-- Entry point of the Baremaps command-line interface. -->
<option name="MAIN_CLASS_NAME" value="org.apache.baremaps.cli.Baremaps" />
<module name="baremaps-cli" />
<!-- Runs `workflow execute` with the workflow definition of the overture example. -->
<option name="PROGRAM_PARAMETERS" value="workflow execute --file workflow.json" />
<!-- Relative paths in the parameters above are resolved against this directory. -->
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/examples/overture" />
<method v="2">
<!-- Build the project before launching. -->
<option name="Make" enabled="true" />
</method>
</configuration>
</component>
8 changes: 8 additions & 0 deletions baremaps-cli/src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Root logger: log at INFO level and above, routed to the "stdout" appender.
log4j.rootLogger=INFO, stdout

# "stdout" appender: direct log messages to the console (System.out).
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
# Pattern: timestamp, left-padded level, logger's simple name, line number, message, newline.
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,24 @@
public class GeoParquetDataStore implements DataStore {

private final URI uri;
private final String tableName;

public GeoParquetDataStore(URI uri) {
public GeoParquetDataStore(URI uri, String tableName) {
this.uri = uri;
this.tableName = tableName;
}

@Override
public List<String> list() throws DataStoreException {
return List.of(uri.toString());
return List.of(tableName);
}

@Override
public DataTable get(String name) throws DataStoreException {
if (!uri.toString().equals(name)) {
if (!tableName.equals(name)) {
throw new DataStoreException("Table not found");
}
return new GeoParquetDataTable(uri);
return new GeoParquetDataTable(uri, tableName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@ public class GeoParquetDataTable implements DataTable {

private final URI path;

private final String name;

private DataSchema schema;

private GeoParquetReader reader;

public GeoParquetDataTable(URI path) {
public GeoParquetDataTable(URI path, String name) {
this.path = path;
this.name = name;
}

private GeoParquetReader reader() {
Expand Down Expand Up @@ -75,7 +78,7 @@ public Stream<DataRow> stream() {
public Stream<DataRow> parallelStream() {
try {
return reader().read().map(group -> new DataRowImpl(
GeoParquetTypeConversion.asSchema(path.toString(), group.getSchema()),
GeoParquetTypeConversion.asSchema(name, group.getSchema()),
GeoParquetTypeConversion.asRowValues(group)));
} catch (IOException | URISyntaxException e) {
throw new GeoParquetException("Fail to read() the reader", e);
Expand All @@ -98,12 +101,20 @@ public DataSchema schema() {
if (schema == null) {
try {
Schema schema = reader().getGeoParquetSchema();
this.schema = GeoParquetTypeConversion.asSchema(path.toString(), schema);
this.schema = GeoParquetTypeConversion.asSchema(name, schema);
return this.schema;
} catch (URISyntaxException e) {
throw new GeoParquetException("Fail toe get the schema.", e);
throw new GeoParquetException("Failed to get the schema.", e);
}
}
return schema;
}

public int srid(String column) {
try {
return reader().getGeoParquetMetadata().getSrid(column);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ private static List<DataColumn> asDataColumns(Schema field) {

private static DataColumn asDataColumn(Field field) {
Cardinality cardinality = switch (field.cardinality()) {
case REQUIRED -> Cardinality.REQUIRED;
case REQUIRED -> Cardinality.OPTIONAL;
case OPTIONAL -> Cardinality.OPTIONAL;
case REPEATED -> Cardinality.REPEATED;
case REPEATED -> Cardinality.OPTIONAL;
};
return switch (field.type()) {
case BINARY -> new DataColumnFixed(field.name(), cardinality, Type.BINARY);
Expand All @@ -71,6 +71,10 @@ public static List<Object> asRowValues(GeoParquetGroup group) {
List<Field> fields = schema.fields();
for (int i = 0; i < fields.size(); i++) {
Field field = fields.get(i);
if(group.getValues(i).isEmpty()){
values.add(null);
continue;
}
switch (field.type()) {
case BINARY -> values.add(group.getBinaryValue(i).getBytes());
case BOOLEAN -> values.add(group.getBooleanValue(i));
Expand All @@ -93,6 +97,10 @@ public static Map<String, Object> asNested(GeoParquetGroup group) {
List<Field> fields = schema.fields();
for (int i = 0; i < fields.size(); i++) {
Field field = fields.get(i);
if(group.getValues(i).isEmpty()){
nested.put(field.name(), null);
continue;
}
nested.put(field.name(), switch (field.type()) {
case BINARY -> group.getBinaryValue(i).getBytes();
case BOOLEAN -> group.getBooleanValue(i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
@Type(value = ImportDaylightFeatures.class, name = "ImportDaylightFeatures"),
@Type(value = ImportDaylightTranslations.class, name = "ImportDaylightTranslations"),
@Type(value = ImportGeoPackage.class, name = "ImportGeoPackage"),
@Type(value = ImportGeoParquet.class, name = "ImportGeoParquet"),
@Type(value = ImportOsmOsc.class, name = "ImportOsmOsc"),
@Type(value = ImportOsmPbf.class, name = "ImportOsmPbf"),
@Type(value = ImportShapefile.class, name = "ImportShapefile"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.baremaps.workflow.tasks;

import org.apache.baremaps.data.storage.DataTableGeometryMapper;
import org.apache.baremaps.data.storage.DataTableMapper;
import org.apache.baremaps.openstreetmap.function.ProjectionTransformer;
import org.apache.baremaps.storage.geoparquet.GeoParquetDataStore;
import org.apache.baremaps.storage.geoparquet.GeoParquetDataTable;
import org.apache.baremaps.storage.postgres.PostgresDataStore;
import org.apache.baremaps.workflow.Task;
import org.apache.baremaps.workflow.WorkflowContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.util.StringJoiner;

/**
 * Import a GeoParquet into a database.
 */
public class ImportGeoParquet implements Task {

  private static final Logger logger = LoggerFactory.getLogger(ImportGeoParquet.class);

  // URI of the GeoParquet file (may be a local path or an s3a:// location).
  private URI uri;
  // Name under which the GeoParquet data is exposed and imported.
  private String tableName;
  // Database reference resolved to a DataSource by the WorkflowContext.
  private Object database;
  // Target SRID the geometries are reprojected to before import.
  private Integer databaseSrid;

  /**
   * Constructs a {@code ImportGeoParquet}.
   */
  public ImportGeoParquet() {

  }

  /**
   * Constructs an {@code ImportGeoParquet}.
   *
   * @param uri the GeoParquet uri
   * @param tableName the name of the table to import
   * @param database the database
   * @param databaseSrid the target SRID
   */
  public ImportGeoParquet(URI uri, String tableName, Object database, Integer databaseSrid) {
    this.uri = uri;
    this.tableName = tableName;
    this.database = database;
    this.databaseSrid = databaseSrid;
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void execute(WorkflowContext context) throws Exception {
    var geoParquetDataStore = new GeoParquetDataStore(uri, tableName);
    var dataSource = context.getDataSource(database);
    var postgresDataStore = new PostgresDataStore(dataSource);
    for (var name : geoParquetDataStore.list()) {
      var geoParquetTable = (GeoParquetDataTable) geoParquetDataStore.get(name);
      // TODO : How can we apply a different SRID for each column based on the geometry
      // Reproject from the source SRID of the "geometry" column to the target database SRID.
      var projectionTransformer =
          new ProjectionTransformer(geoParquetTable.srid("geometry"), databaseSrid);
      var rowTransformer =
          new DataTableGeometryMapper(geoParquetTable, projectionTransformer);
      // Reuse the table fetched above rather than performing a second store lookup.
      var transformedDataTable =
          new DataTableMapper(geoParquetTable, rowTransformer);
      postgresDataStore.add(transformedDataTable);
    }
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public String toString() {
    return new StringJoiner(", ", ImportGeoParquet.class.getSimpleName() + "[", "]")
        .add("uri=" + uri)
        // tableName was previously omitted from the string representation.
        .add("tableName=" + tableName)
        .add("database=" + database)
        .add("databaseSrid=" + databaseSrid)
        .toString();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.schema.MessageType;
Expand Down Expand Up @@ -95,13 +98,27 @@ private synchronized Map<FileStatus, FileInfo> files() throws URISyntaxException
try {
if (files == null) {
files = new HashMap<>();
Path globPath = new Path(uri.getPath());
URI rootUri = getRootUri(uri);
FileSystem fileSystem = FileSystem.get(rootUri, configuration);

// Iterate over all the files in the path
for (FileStatus file : fileSystem.globStatus(globPath)) {
files.put(file, buildFileInfo(file));
FileSystem fs = FileSystem.get(uri, configuration);
FileStatus[] fileStatuses = fs.globStatus(new Path(uri));

for (FileStatus fileStatus : fileStatuses) {
Path filePath = fileStatus.getPath();
ParquetFileReader reader = ParquetFileReader.open(configuration, filePath);
Long recordCount = reader.getRecordCount();
MessageType messageType = reader.getFileMetaData().getSchema();
Map<String, String> keyValueMetadata = reader.getFileMetaData().getKeyValueMetaData();
GeoParquetMetadata geoParquetMetadata = null;
GeoParquetGroup.Schema geoParquetSchema = null;
if (keyValueMetadata.containsKey("geo")) {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
geoParquetMetadata =
objectMapper.readValue(keyValueMetadata.get("geo"), GeoParquetMetadata.class);
geoParquetSchema =
GeoParquetGroupFactory.createGeoParquetSchema(messageType, geoParquetMetadata);
}
files.put(fileStatus, new FileInfo(fileStatus, recordCount, keyValueMetadata, messageType,
geoParquetMetadata, geoParquetSchema));
}

// Verify that the files all have the same schema
Expand All @@ -110,12 +127,12 @@ private synchronized Map<FileStatus, FileInfo> files() throws URISyntaxException
if (commonMessageType == null) {
commonMessageType = entry.messageType;
} else if (!commonMessageType.equals(entry.messageType)) {
throw new GeoParquetException("The files do not have the same schema");
throw new RuntimeException("The files do not have the same schema");
}
}
}
} catch (IOException e) {
throw new GeoParquetException("IOException while attempting to list files.", e);
throw new RuntimeException("IOException while attempting to list files.", e);
}
return files;
}
Expand Down Expand Up @@ -254,31 +271,11 @@ public int characteristics() {
}

private static Configuration createConfiguration() {
Configuration configuration = new Configuration();
configuration.set("fs.s3a.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider");
configuration.setBoolean("fs.s3a.path.style.access", true);
return configuration;
}

private static URI getRootUri(URI uri) throws URISyntaxException {
// TODO:
// This is a quick and dirty way to get the root uri of the path.
// We take everything before the first wildcard in the path.
// This is not a perfect solution, and we should probably look for a better way to do this.
String path = uri.getPath();
int index = path.indexOf("*");
if (index != -1) {
path = path.substring(0, path.lastIndexOf("/", index) + 1);
}
return new URI(
uri.getScheme(),
uri.getUserInfo(),
uri.getHost(),
uri.getPort(),
path,
null,
null);
Configuration conf = new Configuration();
conf.set("fs.s3a.endpoint", "s3.us-west-2.amazonaws.com");
conf.set("fs.s3a.aws.credentials.provider", AnonymousAWSCredentialsProvider.class.getName());
conf.set("fs.s3a.impl", S3AFileSystem.class.getName());
conf.set("fs.s3a.path.style.access", "true");
return conf;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public interface GeoParquetGroup {
*/
GeoParquetGroup createGroup(int fieldIndex);

List<Primitive> getValues(int fieldIndex);

Binary getBinaryValue(int fieldIndex);

List<Binary> getBinaryValues(int fieldIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ private void appendToString(StringBuilder builder, String indent) {
}
}

private List<Primitive> getValues(int fieldIndex) {
public List<Primitive> getValues(int fieldIndex) {
return (List<Primitive>) data[fieldIndex];
}

Expand Down Expand Up @@ -435,10 +435,10 @@ public Envelope getEnvelopeValue(int fieldIndex) {
@Override
public List<Envelope> getEnvelopeValues(int fieldIndex) {
return getGroupValues(fieldIndex).stream().map(group -> {
var xMin = group.getDoubleValue(0);
var yMin = group.getDoubleValue(1);
var xMax = group.getDoubleValue(2);
var yMax = group.getDoubleValue(3);
var xMin = group.getFloatValue(0);
var yMin = group.getFloatValue(1);
var xMax = group.getFloatValue(2);
var yMax = group.getFloatValue(3);
return new Envelope(xMin, xMax, yMin, yMax);
}).toList();
}
Expand Down
Loading

0 comments on commit 61b615c

Please sign in to comment.