diff --git a/baremaps-core/src/main/java/org/apache/baremaps/openstreetmap/state/StateReader.java b/baremaps-core/src/main/java/org/apache/baremaps/openstreetmap/state/StateReader.java index 9d8e22940..5be7e154c 100644 --- a/baremaps-core/src/main/java/org/apache/baremaps/openstreetmap/state/StateReader.java +++ b/baremaps-core/src/main/java/org/apache/baremaps/openstreetmap/state/StateReader.java @@ -23,22 +23,40 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URL; import java.nio.charset.StandardCharsets; import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import org.apache.baremaps.openstreetmap.model.State; public class StateReader { + private final String replicationUrl; + + private final boolean balancedSearch; + + public StateReader() { + this("https://planet.osm.org/replication/hour", true); + } + + public StateReader(String replicationUrl, boolean balancedSearch) { + this.replicationUrl = replicationUrl; + this.balancedSearch = balancedSearch; + } + /** * Parse an OSM state file. * * @param input the OpenStreetMap state file * @return the state */ - public State state(InputStream input) throws IOException { + public State readState(InputStream input) throws IOException { InputStreamReader reader = new InputStreamReader(input, StandardCharsets.UTF_8); Map map = new HashMap<>(); for (String line : CharStreams.readLines(reader)) { @@ -52,4 +70,119 @@ public State state(InputStream input) throws IOException { LocalDateTime timestamp = LocalDateTime.parse(map.get("timestamp").replace("\\", ""), format); return new State(sequenceNumber, timestamp); } + + public Optional getStateFromTimestamp(LocalDateTime timestamp) { + var upper = getState(Optional.empty()); + if (upper.isEmpty()) { + return Optional.empty(); + } + if (timestamp.isAfter(upper.get().getTimestamp()) || upper.get().getSequenceNumber() <= 0) { + return upper; + } + var lower = Optional.empty(); + var lowerId = Optional.of(0L); + while (lower.isEmpty()) { + lower = getState(lowerId); + if (lower.isPresent() && lower.get().getTimestamp().isAfter(timestamp)) { + if (lower.get().getSequenceNumber() == 0 + || lower.get().getSequenceNumber() + 1 >= upper.get().getSequenceNumber()) { + return lower; + } + upper = lower; + lower = Optional.empty(); + lowerId = Optional.of(0L); + } + if (lower.isEmpty()) { + var newId = (lowerId.get() + upper.get().getSequenceNumber()) / 2; + if (newId <= lowerId.get()) { + return upper; + } + lowerId = Optional.of(newId); + } + } + long baseSplitId; + while (true) { + if (balancedSearch) { + baseSplitId = ((lower.get().getSequenceNumber() + upper.get().getSequenceNumber()) / 2); + } else { + var tsInt = upper.get().getTimestamp().toEpochSecond(ZoneOffset.UTC) + - lower.get().getTimestamp().toEpochSecond(ZoneOffset.UTC); + var seqInt = upper.get().getSequenceNumber() - lower.get().getSequenceNumber(); + var goal = timestamp.getSecond() - lower.get().getTimestamp().getSecond(); + baseSplitId = lower.get().getSequenceNumber() + (long) Math.ceil(goal * seqInt / tsInt); + if (baseSplitId >= upper.get().getSequenceNumber()) { + baseSplitId = upper.get().getSequenceNumber() - 1; + } + } + var split = getState(Optional.of(baseSplitId)); + if (split.isEmpty()) { + var splitId = baseSplitId - 1; + while (split.isEmpty() && splitId > lower.get().getSequenceNumber()) { + split = getState(Optional.of(splitId)); + splitId--; + } + } + if (split.isEmpty()) { + var splitId = baseSplitId + 1; + while (split.isEmpty() && splitId < upper.get().getSequenceNumber()) { + split = getState(Optional.of(splitId)); + splitId++; + } + } + if (split.isEmpty()) { + return lower; + } + if (split.get().getTimestamp().isBefore(timestamp)) { + lower = split; + } else { + upper = split; + } + if (lower.get().getSequenceNumber() + 1 >= upper.get().getSequenceNumber()) { + return lower; + } + } + } + + public Optional getState(Optional sequenceNumber) { + for (int i = 0; i < 3; i++) { + try (var inputStream = getStateUrl(sequenceNumber).openStream()) { + var state = new StateReader().readState(inputStream); + return Optional.of(state); + } catch (Exception e) { + e.printStackTrace(); + } + } + return Optional.empty(); + } + + public URL getStateUrl(Optional sequenceNumber) throws MalformedURLException { + if (sequenceNumber.isPresent()) { + + var s = String.format("%09d", sequenceNumber.get()); + var uri = + String.format("%s/%s/%s/%s.%s", replicationUrl, s.substring(0, 3), s.substring(3, 6), + s.substring(6, 9), "state.txt"); + return URI.create(uri).toURL(); + } else { + return new URL(replicationUrl + "/state.txt"); + } + } + + public static URL resolve(String replicationUrl, Long sequenceNumber, String extension) + throws MalformedURLException { + var s = String.format("%09d", sequenceNumber); + var uri = String.format("%s/%s/%s/%s.%s", replicationUrl, s.substring(0, 3), s.substring(3, 6), + s.substring(6, 9), extension); + return URI.create(uri).toURL(); + } + + public static void main(String... args) throws MalformedURLException { + var reader = new StateReader(); + var state = reader.getStateFromTimestamp(LocalDateTime.now().minusDays(10)); + System.out.println(state.get().getSequenceNumber()); + System.out + .println(resolve(reader.replicationUrl, state.get().getSequenceNumber(), "state.txt")); + System.out.println(resolve(reader.replicationUrl, state.get().getSequenceNumber(), "osc.gz")); + } + } diff --git a/baremaps-core/src/main/java/org/apache/baremaps/workflow/tasks/UpdateOsmDatabase.java b/baremaps-core/src/main/java/org/apache/baremaps/workflow/tasks/UpdateOsmDatabase.java index df08d1af1..ef5839059 100644 --- a/baremaps-core/src/main/java/org/apache/baremaps/workflow/tasks/UpdateOsmDatabase.java +++ b/baremaps-core/src/main/java/org/apache/baremaps/workflow/tasks/UpdateOsmDatabase.java @@ -20,9 +20,6 @@ import static org.apache.baremaps.stream.ConsumerUtils.consumeThenReturn; import java.io.BufferedInputStream; -import java.net.MalformedURLException; -import java.net.URI; -import java.net.URL; import java.util.List; import java.util.zip.GZIPInputStream; import org.apache.baremaps.database.collection.DataMap; @@ -86,39 +83,45 @@ public static void execute(DataMap coordinateMap, String replicationUrl) throws Exception { var header = headerRepository.selectLatest(); - var sequenceNumber = header.getReplicationSequenceNumber() + 1; + // If the replicationUrl is not provided, use the one from the latest header. if (replicationUrl == null) { replicationUrl = header.getReplicationUrl(); } + var stateReader = new StateReader("https://planet.osm.org/replication/hour", true); + var sequenceNumber = header.getReplicationSequenceNumber(); + + // If the replicationTimestamp is not provided, guess it from the replication timestamp. + if (sequenceNumber <= 0) { + var replicationTimestamp = header.getReplicationTimestamp(); + var state = stateReader.getStateFromTimestamp(replicationTimestamp); + if (state.isPresent()) { + sequenceNumber = state.get().getSequenceNumber(); + } + } + + var nextSequenceNumber = sequenceNumber + 1; + var changeUrl = StateReader.resolve(replicationUrl, nextSequenceNumber, "osc.gz"); + logger.info("Updating the database with the changeset: {}", changeUrl); + var createGeometry = new EntityGeometryBuilder(coordinateMap, referenceMap); var reprojectGeometry = new EntityProjectionTransformer(4326, databaseSrid); var prepareGeometries = new ChangeEntitiesHandler(createGeometry.andThen(reprojectGeometry)); var prepareChange = consumeThenReturn(prepareGeometries); var importChange = new PutChangeImporter(nodeRepository, wayRepository, relationRepository); - var changeUrl = resolve(replicationUrl, sequenceNumber, "osc.gz"); - logger.info("Updating the database with the changeset: {}", changeUrl); - try (var changeInputStream = new GZIPInputStream(new BufferedInputStream(changeUrl.openStream()))) { new XmlChangeReader().stream(changeInputStream).map(prepareChange).forEach(importChange); } - var stateUrl = resolve(replicationUrl, sequenceNumber, "state.txt"); + var stateUrl = StateReader.resolve(replicationUrl, nextSequenceNumber, "state.txt"); try (var stateInputStream = new BufferedInputStream(stateUrl.openStream())) { - var state = new StateReader().state(stateInputStream); + var state = new StateReader().readState(stateInputStream); headerRepository.put(new Header(state.getSequenceNumber(), state.getTimestamp(), header.getReplicationUrl(), header.getSource(), header.getWritingProgram())); } } - public static URL resolve(String replicationUrl, Long sequenceNumber, String extension) - throws MalformedURLException { - var s = String.format("%09d", sequenceNumber); - var uri = String.format("%s/%s/%s/%s.%s", replicationUrl, s.substring(0, 3), s.substring(3, 6), - s.substring(6, 9), extension); - return URI.create(uri).toURL(); - } } diff --git a/baremaps-core/src/test/java/org/apache/baremaps/openstreetmap/OpenStreetMapTest.java b/baremaps-core/src/test/java/org/apache/baremaps/openstreetmap/OpenStreetMapTest.java index ddb90f0ce..c8d57544d 100644 --- a/baremaps-core/src/test/java/org/apache/baremaps/openstreetmap/OpenStreetMapTest.java +++ b/baremaps-core/src/test/java/org/apache/baremaps/openstreetmap/OpenStreetMapTest.java @@ -123,7 +123,7 @@ void relationsOsmPbf() throws IOException { @Test void monacoStateTxt() throws URISyntaxException, IOException { try (InputStream inputStream = Files.newInputStream(MONACO_STATE_TXT)) { - State state = new StateReader().state(inputStream); + State state = new StateReader().readState(inputStream); assertEquals(2788, state.getSequenceNumber()); assertEquals(LocalDateTime.parse("2020-11-10T21:42:03"), state.getTimestamp()); }