Skip to content

Commit

Permalink
Improve workflow package (#802)
Browse files Browse the repository at this point in the history
* Replace task records with classes

* Make task argument names more uniform (source, target)

* Introduce a single decompression task

* Remove unused code and tasks

* Initialize lists in the context of the workflow

* Improve javadoc
  • Loading branch information
bchapuis authored Dec 13, 2023
1 parent 0dd52e9 commit 7b97e5d
Show file tree
Hide file tree
Showing 47 changed files with 918 additions and 644 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ public class ImportOsm implements Callable<Integer> {
public Integer call() throws Exception {
new org.apache.baremaps.workflow.tasks.ImportOsmPbf(
file.toAbsolutePath(),
null,
true,
database,
srid,
true).execute(new WorkflowContext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,13 @@ public class UpdateOsm implements Callable<Integer> {
description = "The projection used by the database.")
private int srid = 3857;

@Option(names = {"--replication-url"}, paramLabel = "REPLICATION_URL",
description = "The replication url of the OpenStreetMap server.")
private String replicationUrl = "https://planet.osm.org/replication/hour";

@Override
public Integer call() throws Exception {
new UpdateOsmDatabase(database, srid)
new UpdateOsmDatabase(database, srid, replicationUrl)
.execute(new WorkflowContext());
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,14 @@ public class Export implements Callable<Integer> {
@Option(names = {"--tiles"}, paramLabel = "TILES", description = "The tiles to export.")
private URI tiles;

@Option(names = {"--batch-array-size"}, paramLabel = "BATCH_ARRAY_SIZE",
description = "The size of the batch array.")
private int batchArraySize = 1;

@Option(names = {"--batch-array-index"}, paramLabel = "READER",
description = "The index of the batch in the array.")
private int batchArrayIndex = 0;

@Option(names = {"--format"}, paramLabel = "FORMAT",
description = "The format of the repository.")
private ExportVectorTiles.Format format = ExportVectorTiles.Format.file;

@Override
public Integer call() throws Exception {
new ExportVectorTiles(tileset.toAbsolutePath(),
repository.toAbsolutePath(), batchArraySize, batchArrayIndex, format)
repository.toAbsolutePath(), format)
.execute(new WorkflowContext());
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public Integer call() throws Exception {
var configReader = new ConfigReader();
var workflow = mapper.readValue(configReader.read(file.toAbsolutePath()), Workflow.class);
try (var executor = new WorkflowExecutor(workflow)) {
executor.execute().get();
executor.execute();
}
logger.info("Finished executing the workflow {}", file);
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,16 @@ public static Geometry deserialize(byte[] wkb) {
/**
* Creates a coordinate transform with the provided SRIDs.
*
* @param sourceSRID the source SRID
* @param targetSRID the target SRID
* @param sourceSrid the source SRID
* @param targetSrid the target SRID
* @return the coordinate transform
*/
public static CoordinateTransform coordinateTransform(Integer sourceSRID, Integer targetSRID) {
public static CoordinateTransform coordinateTransform(Integer sourceSrid, Integer targetSrid) {
CRSFactory crsFactory = new CRSFactory();
CoordinateReferenceSystem sourceCRS =
crsFactory.createFromName(String.format("EPSG:%d", sourceSRID));
crsFactory.createFromName(String.format("EPSG:%d", sourceSrid));
CoordinateReferenceSystem targetCRS =
crsFactory.createFromName(String.format("EPSG:%d", targetSRID));
crsFactory.createFromName(String.format("EPSG:%d", targetSrid));
CoordinateTransformFactory coordinateTransformFactory = new CoordinateTransformFactory();
return coordinateTransformFactory.createTransform(sourceCRS, targetCRS);
}
Expand Down
40 changes: 20 additions & 20 deletions baremaps-core/src/main/java/org/apache/baremaps/workflow/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
Expand All @@ -32,26 +33,25 @@
@JsonSerialize
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
@JsonTypeInfo(use = Id.NAME, property = "type")
@JsonSubTypes({@JsonSubTypes.Type(value = DownloadUrl.class, name = "DownloadUrl"),
@JsonSubTypes.Type(value = ExecuteCommand.class, name = "ExecuteCommand"),
@JsonSubTypes.Type(value = ExecuteSql.class, name = "ExecuteSql"),
@JsonSubTypes.Type(value = ExecuteSqlScript.class, name = "ExecuteSqlScript"),
@JsonSubTypes.Type(value = ExportVectorTiles.class, name = "ExportVectorTiles"),
@JsonSubTypes.Type(value = ImportGeoPackage.class, name = "ImportGeoPackage"),
@JsonSubTypes.Type(value = ImportOsmPbf.class, name = "ImportOsmPbf"),
@JsonSubTypes.Type(value = ImportOsmOsc.class, name = "ImportOsmOsc"),
@JsonSubTypes.Type(value = ImportShapefile.class, name = "ImportShapefile"),
@JsonSubTypes.Type(value = LogMessage.class, name = "LogMessage"),
@JsonSubTypes.Type(value = UnzipFile.class, name = "UnzipFile"),
@JsonSubTypes.Type(value = UngzipFile.class, name = "UngzipFile"),
@JsonSubTypes.Type(value = DecompressBZip2.class, name = "DecompressBZip2"),
@JsonSubTypes.Type(value = DecompressFile.class, name = "DecompressFile"),
@JsonSubTypes.Type(value = UpdateOsmDatabase.class, name = "UpdateOsmDatabase"),
@JsonSubTypes.Type(value = CreateGeonamesIndex.class, name = "CreateGeonamesIndex"),
@JsonSubTypes.Type(value = CreateIplocIndex.class, name = "CreateIplocIndex"),
@JsonSubTypes.Type(value = ImportDaylightTranslations.class,
name = "ImportDaylightTranslations"),
@JsonSubTypes.Type(value = ImportDaylightFeatures.class, name = "ImportDaylightFeatures")
@JsonSubTypes({
@Type(value = CleanContextCache.class, name = "CleanContextCache"),
@Type(value = CleanContextData.class, name = "CleanContextData"),
@Type(value = CreateGeonamesIndex.class, name = "CreateGeonamesIndex"),
@Type(value = CreateIplocIndex.class, name = "CreateIplocIndex"),
@Type(value = DecompressFile.class, name = "DecompressFile"),
@Type(value = DownloadUrl.class, name = "DownloadUrl"),
@Type(value = ExecuteCommand.class, name = "ExecuteCommand"),
@Type(value = ExecuteSql.class, name = "ExecuteSql"),
@Type(value = ExecuteSqlScript.class, name = "ExecuteSqlScript"),
@Type(value = ExportVectorTiles.class, name = "ExportVectorTiles"),
@Type(value = ImportDaylightFeatures.class, name = "ImportDaylightFeatures"),
@Type(value = ImportDaylightTranslations.class, name = "ImportDaylightTranslations"),
@Type(value = ImportGeoPackage.class, name = "ImportGeoPackage"),
@Type(value = ImportOsmOsc.class, name = "ImportOsmOsc"),
@Type(value = ImportOsmPbf.class, name = "ImportOsmPbf"),
@Type(value = ImportShapefile.class, name = "ImportShapefile"),
@Type(value = LogMessage.class, name = "LogMessage"),
@Type(value = UpdateOsmDatabase.class, name = "UpdateOsmDatabase"),
})
public interface Task {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,40 @@



import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.sql.DataSource;
import org.apache.baremaps.database.collection.*;
import org.apache.baremaps.database.memory.MemoryMappedDirectory;
import org.apache.baremaps.database.type.*;
import org.apache.baremaps.database.type.geometry.LonLatDataType;
import org.apache.baremaps.utils.FileUtils;
import org.apache.baremaps.utils.PostgresUtils;
import org.locationtech.jts.geom.Coordinate;

/**
* A context that is passed to the tasks of a workflow and used to share data between tasks.
*/
public class WorkflowContext {

private final Path dataDir;

private final Path cacheDir;

public WorkflowContext() {
this(Paths.get("./data"), Paths.get("./cache"));
}

public WorkflowContext(Path dataDir, Path cacheDir) {
this.dataDir = dataDir;
this.cacheDir = cacheDir;
}

private Map<Object, DataSource> dataSources = new ConcurrentHashMap<>() {};

/**
Expand All @@ -41,4 +65,45 @@ public DataSource getDataSource(Object database) {
return dataSources.computeIfAbsent(database, PostgresUtils::createDataSourceFromObject);
}

public DataMap<Long, Coordinate> getCoordinateMap(Path path) throws IOException {
if (Files.size(path) > 1 << 30) {
return getMemoryAlignedDataMap("coordinates", new LonLatDataType());
} else {
return getMonotonicDataMap("coordinates", new LonLatDataType());
}
}

public DataMap<Long, List<Long>> getReferenceMap(Path path) throws IOException {
return getMonotonicDataMap("references", new LongListDataType());
}

public <T> DataMap<Long, T> getMemoryAlignedDataMap(String name, FixedSizeDataType<T> dataType)
throws IOException {
var coordinateDir = Files.createDirectories(cacheDir.resolve(name));
return new MemoryAlignedDataMap<>(
dataType,
new MemoryMappedDirectory(coordinateDir));
}

public <T> DataMap<Long, T> getMonotonicDataMap(String name, DataType<T> dataType)
throws IOException {
var mapDir = Files.createDirectories(cacheDir.resolve(name));
var keysDir = Files.createDirectories(mapDir.resolve("keys"));
var valuesDir = Files.createDirectories(mapDir.resolve("values"));
return new MonotonicDataMap<>(
new MemoryAlignedDataList<>(
new PairDataType<>(new LongDataType(), new LongDataType()),
new MemoryMappedDirectory(keysDir)),
new AppendOnlyBuffer<>(
dataType,
new MemoryMappedDirectory(valuesDir)));
}

public void cleanCache() throws IOException {
FileUtils.deleteRecursively(cacheDir);
}

public void cleanData() throws IOException {
FileUtils.deleteRecursively(dataDir);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,24 @@ public WorkflowExecutor(Workflow workflow, ExecutorService executorService) {
/**
* Executes the workflow.
*/
public CompletableFuture<Void> execute() {
public void execute() {
try {
executeAsync().join();
logStepMeasures();
} catch (Exception e) {
logger.error("Error while executing the workflow", e);
}
}

public CompletableFuture<Void> executeAsync() {
// Create futures for each end step
var endSteps = graph.nodes().stream()
.filter(this::isEndStep)
.map(this::getFutureStep)
.map(this::getStep)
.toArray(CompletableFuture[]::new);

// Create a future that logs the stepMeasures when all the futures are completed
var future = CompletableFuture.allOf(endSteps).thenRun(this::logStepMeasures);

return future;
// Wait for all the end steps to complete
return CompletableFuture.allOf(endSteps);
}

/**
Expand All @@ -125,8 +132,8 @@ public CompletableFuture<Void> execute() {
* @param step the step id
* @return the future step
*/
private CompletableFuture<Void> getFutureStep(String step) {
return futures.computeIfAbsent(step, this::createFutureStep);
private CompletableFuture<Void> getStep(String step) {
return futures.computeIfAbsent(step, this::createStep);
}

/**
Expand All @@ -135,10 +142,10 @@ private CompletableFuture<Void> getFutureStep(String step) {
* @param stepId the step id
* @return the future step
*/
private CompletableFuture<Void> createFutureStep(String stepId) {
private CompletableFuture<Void> createStep(String stepId) {
// Initialize the future step with the previous future step
// as it depends on its completion.
var future = getPreviousFutureStep(stepId);
var future = getPreviousStep(stepId);

// Time the execution of the tasks
var measures = new ArrayList<TaskMeasure>();
Expand Down Expand Up @@ -182,7 +189,7 @@ private CompletableFuture<Void> createFutureStep(String stepId) {
* @param stepId the step id
* @return the future step
*/
private CompletableFuture<Void> getPreviousFutureStep(String stepId) {
private CompletableFuture<Void> getPreviousStep(String stepId) {
var predecessors = graph.predecessors(stepId).stream().toList();

// If the step has no predecessor,
Expand All @@ -194,13 +201,13 @@ private CompletableFuture<Void> getPreviousFutureStep(String stepId) {
// If the step has one predecessor,
// return the future step associated to it.
if (predecessors.size() == 1) {
return getFutureStep(predecessors.get(0));
return getStep(predecessors.get(0));
}

// If the step has multiple predecessors,
// return a future step that completes when all the predecessors complete.
var futurePredecessors = predecessors.stream()
.map(this::getFutureStep)
.map(this::getStep)
.toArray(CompletableFuture[]::new);
return CompletableFuture.allOf(futurePredecessors);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,24 @@

package org.apache.baremaps.workflow.tasks;



import java.nio.file.Files;
import org.apache.baremaps.testing.TestFiles;
import org.apache.baremaps.utils.FileUtils;
import org.apache.baremaps.workflow.Task;
import org.apache.baremaps.workflow.WorkflowContext;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

class UngzipFileTest {
/**
* Clean the context cache.
*/
public class CleanContextCache implements Task {

/**
* Constructs an {@code CleanContextCache}.
*/
public CleanContextCache() {}

@Test
@Tag("integration")
void run() throws Exception {
var gzip = TestFiles.resolve("ripe/sample.txt.gz");
var directory = Files.createTempDirectory("tmp_");
var task = new UngzipFile(gzip, directory);
task.execute(new WorkflowContext());
FileUtils.deleteRecursively(directory);
/**
* {@inheritDoc}
*/
@Override
public void execute(WorkflowContext context) throws Exception {
context.cleanCache();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,21 @@

package org.apache.baremaps.workflow.tasks;



import java.nio.file.Files;
import org.apache.baremaps.testing.TestFiles;
import org.apache.baremaps.utils.FileUtils;
import org.apache.baremaps.workflow.Task;
import org.apache.baremaps.workflow.WorkflowContext;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

class UnzipFileTest {
/**
* Clean the context data.
*/
public class CleanContextData implements Task {

/**
* Constructs an {@code CleanContextData}.
*/
public CleanContextData() {}

@Test
@Tag("integration")
void execute() throws Exception {
var zip = TestFiles.resolve("monaco-shapefile.zip");
var directory = Files.createTempDirectory("tmp_");
var task = new UnzipFile(zip, directory);
task.execute(new WorkflowContext());
FileUtils.deleteRecursively(directory);
@Override
public void execute(WorkflowContext context) throws Exception {
context.cleanData();
}
}
Loading

0 comments on commit 7b97e5d

Please sign in to comment.