Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve workflow package #802

Merged
merged 21 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ public class ImportOsm implements Callable<Integer> {
public Integer call() throws Exception {
new org.apache.baremaps.workflow.tasks.ImportOsmPbf(
file.toAbsolutePath(),
null,
true,
database,
srid,
true).execute(new WorkflowContext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,13 @@ public class UpdateOsm implements Callable<Integer> {
description = "The projection used by the database.")
private int srid = 3857;

@Option(names = {"--replication-url"}, paramLabel = "REPLICATION_URL",
description = "The replication url of the OpenStreetMap server.")
private String replicationUrl = "https://planet.osm.org/replication/hour";

@Override
public Integer call() throws Exception {
new UpdateOsmDatabase(database, srid)
new UpdateOsmDatabase(database, srid, replicationUrl)
.execute(new WorkflowContext());
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,14 @@ public class Export implements Callable<Integer> {
@Option(names = {"--tiles"}, paramLabel = "TILES", description = "The tiles to export.")
private URI tiles;

@Option(names = {"--batch-array-size"}, paramLabel = "BATCH_ARRAY_SIZE",
description = "The size of the batch array.")
private int batchArraySize = 1;

@Option(names = {"--batch-array-index"}, paramLabel = "READER",
description = "The index of the batch in the array.")
private int batchArrayIndex = 0;

@Option(names = {"--format"}, paramLabel = "FORMAT",
description = "The format of the repository.")
private ExportVectorTiles.Format format = ExportVectorTiles.Format.file;

@Override
public Integer call() throws Exception {
new ExportVectorTiles(tileset.toAbsolutePath(),
repository.toAbsolutePath(), batchArraySize, batchArrayIndex, format)
repository.toAbsolutePath(), format)
.execute(new WorkflowContext());
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public Integer call() throws Exception {
var configReader = new ConfigReader();
var workflow = mapper.readValue(configReader.read(file.toAbsolutePath()), Workflow.class);
try (var executor = new WorkflowExecutor(workflow)) {
executor.execute().get();
executor.execute();
}
logger.info("Finished executing the workflow {}", file);
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,16 @@ public static Geometry deserialize(byte[] wkb) {
/**
* Creates a coordinate transform with the provided SRIDs.
*
* @param sourceSRID the source SRID
* @param targetSRID the target SRID
* @param sourceSrid the source SRID
* @param targetSrid the target SRID
* @return the coordinate transform
*/
public static CoordinateTransform coordinateTransform(Integer sourceSRID, Integer targetSRID) {
public static CoordinateTransform coordinateTransform(Integer sourceSrid, Integer targetSrid) {
CRSFactory crsFactory = new CRSFactory();
CoordinateReferenceSystem sourceCRS =
crsFactory.createFromName(String.format("EPSG:%d", sourceSRID));
crsFactory.createFromName(String.format("EPSG:%d", sourceSrid));
CoordinateReferenceSystem targetCRS =
crsFactory.createFromName(String.format("EPSG:%d", targetSRID));
crsFactory.createFromName(String.format("EPSG:%d", targetSrid));
CoordinateTransformFactory coordinateTransformFactory = new CoordinateTransformFactory();
return coordinateTransformFactory.createTransform(sourceCRS, targetCRS);
}
Expand Down
40 changes: 20 additions & 20 deletions baremaps-core/src/main/java/org/apache/baremaps/workflow/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
Expand All @@ -32,26 +33,25 @@
@JsonSerialize
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
@JsonTypeInfo(use = Id.NAME, property = "type")
@JsonSubTypes({@JsonSubTypes.Type(value = DownloadUrl.class, name = "DownloadUrl"),
@JsonSubTypes.Type(value = ExecuteCommand.class, name = "ExecuteCommand"),
@JsonSubTypes.Type(value = ExecuteSql.class, name = "ExecuteSql"),
@JsonSubTypes.Type(value = ExecuteSqlScript.class, name = "ExecuteSqlScript"),
@JsonSubTypes.Type(value = ExportVectorTiles.class, name = "ExportVectorTiles"),
@JsonSubTypes.Type(value = ImportGeoPackage.class, name = "ImportGeoPackage"),
@JsonSubTypes.Type(value = ImportOsmPbf.class, name = "ImportOsmPbf"),
@JsonSubTypes.Type(value = ImportOsmOsc.class, name = "ImportOsmOsc"),
@JsonSubTypes.Type(value = ImportShapefile.class, name = "ImportShapefile"),
@JsonSubTypes.Type(value = LogMessage.class, name = "LogMessage"),
@JsonSubTypes.Type(value = UnzipFile.class, name = "UnzipFile"),
@JsonSubTypes.Type(value = UngzipFile.class, name = "UngzipFile"),
@JsonSubTypes.Type(value = DecompressBZip2.class, name = "DecompressBZip2"),
@JsonSubTypes.Type(value = DecompressFile.class, name = "DecompressFile"),
@JsonSubTypes.Type(value = UpdateOsmDatabase.class, name = "UpdateOsmDatabase"),
@JsonSubTypes.Type(value = CreateGeonamesIndex.class, name = "CreateGeonamesIndex"),
@JsonSubTypes.Type(value = CreateIplocIndex.class, name = "CreateIplocIndex"),
@JsonSubTypes.Type(value = ImportDaylightTranslations.class,
name = "ImportDaylightTranslations"),
@JsonSubTypes.Type(value = ImportDaylightFeatures.class, name = "ImportDaylightFeatures")
@JsonSubTypes({
@Type(value = CleanContextCache.class, name = "CleanContextCache"),
@Type(value = CleanContextData.class, name = "CleanContextData"),
@Type(value = CreateGeonamesIndex.class, name = "CreateGeonamesIndex"),
@Type(value = CreateIplocIndex.class, name = "CreateIplocIndex"),
@Type(value = DecompressFile.class, name = "DecompressFile"),
@Type(value = DownloadUrl.class, name = "DownloadUrl"),
@Type(value = ExecuteCommand.class, name = "ExecuteCommand"),
@Type(value = ExecuteSql.class, name = "ExecuteSql"),
@Type(value = ExecuteSqlScript.class, name = "ExecuteSqlScript"),
@Type(value = ExportVectorTiles.class, name = "ExportVectorTiles"),
@Type(value = ImportDaylightFeatures.class, name = "ImportDaylightFeatures"),
@Type(value = ImportDaylightTranslations.class, name = "ImportDaylightTranslations"),
@Type(value = ImportGeoPackage.class, name = "ImportGeoPackage"),
@Type(value = ImportOsmOsc.class, name = "ImportOsmOsc"),
@Type(value = ImportOsmPbf.class, name = "ImportOsmPbf"),
@Type(value = ImportShapefile.class, name = "ImportShapefile"),
@Type(value = LogMessage.class, name = "LogMessage"),
@Type(value = UpdateOsmDatabase.class, name = "UpdateOsmDatabase"),
})
public interface Task {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,40 @@



import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.sql.DataSource;
import org.apache.baremaps.database.collection.*;
import org.apache.baremaps.database.memory.MemoryMappedDirectory;
import org.apache.baremaps.database.type.*;
import org.apache.baremaps.database.type.geometry.LonLatDataType;
import org.apache.baremaps.utils.FileUtils;
import org.apache.baremaps.utils.PostgresUtils;
import org.locationtech.jts.geom.Coordinate;

/**
* A context that is passed to the tasks of a workflow and used to share data between tasks.
*/
public class WorkflowContext {

private final Path dataDir;

private final Path cacheDir;

public WorkflowContext() {
this(Paths.get("./data"), Paths.get("./cache"));
}

public WorkflowContext(Path dataDir, Path cacheDir) {
this.dataDir = dataDir;
this.cacheDir = cacheDir;
}

private Map<Object, DataSource> dataSources = new ConcurrentHashMap<>() {};

/**
Expand All @@ -41,4 +65,45 @@
return dataSources.computeIfAbsent(database, PostgresUtils::createDataSourceFromObject);
}

public DataMap<Long, Coordinate> getCoordinateMap(Path path) throws IOException {
  // Pick the map implementation from the input file size: small files fit a
  // monotonic map, anything above 1 GiB gets a memory-aligned map instead.
  long oneGiB = 1 << 30;
  if (Files.size(path) <= oneGiB) {
    return getMonotonicDataMap("coordinates", new LonLatDataType());
  }
  return getMemoryAlignedDataMap("coordinates", new LonLatDataType());
}

public DataMap<Long, List<Long>> getReferenceMap(Path path) throws IOException {

Check notice

Code scanning / CodeQL

Useless parameter Note

The parameter 'path' is never used.
return getMonotonicDataMap("references", new LongListDataType());
}

/**
 * Returns a memory-aligned map backed by a memory-mapped directory in the cache.
 *
 * @param <T> the type of the values
 * @param name the name of the cache subdirectory backing the map
 * @param dataType the fixed-size data type of the values
 * @return the memory-aligned map
 * @throws IOException if the cache directory cannot be created
 */
public <T> DataMap<Long, T> getMemoryAlignedDataMap(String name, FixedSizeDataType<T> dataType)
    throws IOException {
  // Renamed from 'coordinateDir': the directory is generic, not coordinate-specific.
  var mapDir = Files.createDirectories(cacheDir.resolve(name));
  return new MemoryAlignedDataMap<>(
      dataType,
      new MemoryMappedDirectory(mapDir));
}

/**
 * Returns a monotonic map whose keys and values are memory-mapped under the cache directory.
 *
 * @param <T> the type of the values
 * @param name the name of the cache subdirectory backing the map
 * @param dataType the data type of the values
 * @return the monotonic map
 * @throws IOException if the cache directories cannot be created
 */
public <T> DataMap<Long, T> getMonotonicDataMap(String name, DataType<T> dataType)
    throws IOException {
  var baseDir = Files.createDirectories(cacheDir.resolve(name));
  var keysDir = Files.createDirectories(baseDir.resolve("keys"));
  var valuesDir = Files.createDirectories(baseDir.resolve("values"));
  // Keys are fixed-size (long, long) pairs held in a memory-aligned list.
  var keys = new MemoryAlignedDataList<>(
      new PairDataType<>(new LongDataType(), new LongDataType()),
      new MemoryMappedDirectory(keysDir));
  // Values are appended to a growing buffer.
  var values = new AppendOnlyBuffer<>(dataType, new MemoryMappedDirectory(valuesDir));
  return new MonotonicDataMap<>(keys, values);
}

/**
 * Recursively deletes the cache directory of this context.
 *
 * @throws IOException if the directory cannot be deleted
 */
public void cleanCache() throws IOException {
  FileUtils.deleteRecursively(cacheDir);
}

/**
 * Recursively deletes the data directory of this context.
 *
 * @throws IOException if the directory cannot be deleted
 */
public void cleanData() throws IOException {
  FileUtils.deleteRecursively(dataDir);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,24 @@ public WorkflowExecutor(Workflow workflow, ExecutorService executorService) {
/**
* Executes the workflow.
*/
public CompletableFuture<Void> execute() {
/**
 * Executes the workflow synchronously, blocking until every end step completes.
 *
 * <p>Any exception raised by a step is caught and logged rather than propagated,
 * so callers always return normally — failures are only visible in the log.
 */
public void execute() {
  try {
    // Block until the whole step graph has run, then report per-step timings.
    executeAsync().join();
    logStepMeasures();
  } catch (Exception e) {
    // Deliberate broad catch: workflow failures are reported via the log only.
    logger.error("Error while executing the workflow", e);
  }
}

/**
 * Executes the workflow asynchronously.
 *
 * @return a future that completes when all the end steps of the workflow complete
 */
public CompletableFuture<Void> executeAsync() {
  // Create futures for each end step; intermediate steps are reached
  // transitively through the futures of their successors.
  var endSteps = graph.nodes().stream()
      .filter(this::isEndStep)
      .map(this::getStep)
      .toArray(CompletableFuture[]::new);

  // Completes when all the end steps complete.
  return CompletableFuture.allOf(endSteps);
}

/**
Expand All @@ -125,8 +132,8 @@ public CompletableFuture<Void> execute() {
* @param step the step id
* @return the future step
*/
private CompletableFuture<Void> getFutureStep(String step) {
return futures.computeIfAbsent(step, this::createFutureStep);
private CompletableFuture<Void> getStep(String step) {
  // Memoize so each step's future is created once and shared by all dependents.
  return futures.computeIfAbsent(step, this::createStep);
}

/**
Expand All @@ -135,10 +142,10 @@ private CompletableFuture<Void> getFutureStep(String step) {
* @param stepId the step id
* @return the future step
*/
private CompletableFuture<Void> createFutureStep(String stepId) {
private CompletableFuture<Void> createStep(String stepId) {
// Initialize the future step with the previous future step
// as it depends on its completion.
var future = getPreviousFutureStep(stepId);
var future = getPreviousStep(stepId);

// Time the execution of the tasks
var measures = new ArrayList<TaskMeasure>();
Expand Down Expand Up @@ -182,7 +189,7 @@ private CompletableFuture<Void> createFutureStep(String stepId) {
* @param stepId the step id
* @return the future step
*/
private CompletableFuture<Void> getPreviousFutureStep(String stepId) {
private CompletableFuture<Void> getPreviousStep(String stepId) {
var predecessors = graph.predecessors(stepId).stream().toList();

// If the step has no predecessor,
Expand All @@ -194,13 +201,13 @@ private CompletableFuture<Void> getPreviousFutureStep(String stepId) {
// If the step has one predecessor,
// return the future step associated to it.
if (predecessors.size() == 1) {
return getFutureStep(predecessors.get(0));
return getStep(predecessors.get(0));
}

// If the step has multiple predecessors,
// return a future step that completes when all the predecessors complete.
var futurePredecessors = predecessors.stream()
.map(this::getFutureStep)
.map(this::getStep)
.toArray(CompletableFuture[]::new);
return CompletableFuture.allOf(futurePredecessors);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,24 @@

package org.apache.baremaps.workflow.tasks;



import java.nio.file.Files;
import org.apache.baremaps.testing.TestFiles;
import org.apache.baremaps.utils.FileUtils;
import org.apache.baremaps.workflow.Task;
import org.apache.baremaps.workflow.WorkflowContext;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

class UngzipFileTest {
/**
* Clean the context cache.
*/
public class CleanContextCache implements Task {

/**
* Constructs an {@code CleanContextCache}.
*/
public CleanContextCache() {}

@Test
@Tag("integration")
void run() throws Exception {
var gzip = TestFiles.resolve("ripe/sample.txt.gz");
var directory = Files.createTempDirectory("tmp_");
var task = new UngzipFile(gzip, directory);
task.execute(new WorkflowContext());
FileUtils.deleteRecursively(directory);
/**
* {@inheritDoc}
*/
@Override
public void execute(WorkflowContext context) throws Exception {
context.cleanCache();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,21 @@

package org.apache.baremaps.workflow.tasks;



import java.nio.file.Files;
import org.apache.baremaps.testing.TestFiles;
import org.apache.baremaps.utils.FileUtils;
import org.apache.baremaps.workflow.Task;
import org.apache.baremaps.workflow.WorkflowContext;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

class UnzipFileTest {
/**
* Clean the context data.
*/
public class CleanContextData implements Task {

/**
* Constructs an {@code CleanContextData}.
*/
public CleanContextData() {}

@Test
@Tag("integration")
void execute() throws Exception {
var zip = TestFiles.resolve("monaco-shapefile.zip");
var directory = Files.createTempDirectory("tmp_");
var task = new UnzipFile(zip, directory);
task.execute(new WorkflowContext());
FileUtils.deleteRecursively(directory);
@Override
public void execute(WorkflowContext context) throws Exception {
context.cleanData();
}
}
Loading