Skip to content

Commit

Permalink
Compute the sequence number for planet osm
Browse files Browse the repository at this point in the history
  • Loading branch information
bchapuis committed Nov 15, 2023
1 parent 04ef184 commit 62519af
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,40 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.baremaps.openstreetmap.model.State;

public class StateReader {

private final String replicationUrl;

private final boolean balancedSearch;

public StateReader() {
this("https://planet.osm.org/replication/hour", true);
}

public StateReader(String replicationUrl, boolean balancedSearch) {
this.replicationUrl = replicationUrl;
this.balancedSearch = balancedSearch;
}

/**
* Parse an OSM state file.
*
* @param input the OpenStreetMap state file
* @return the state
*/
public State state(InputStream input) throws IOException {
public State readState(InputStream input) throws IOException {
InputStreamReader reader = new InputStreamReader(input, StandardCharsets.UTF_8);
Map<String, String> map = new HashMap<>();
for (String line : CharStreams.readLines(reader)) {
Expand All @@ -52,4 +70,119 @@ public State state(InputStream input) throws IOException {
LocalDateTime timestamp = LocalDateTime.parse(map.get("timestamp").replace("\\", ""), format);
return new State(sequenceNumber, timestamp);
}

public Optional<State> getStateFromTimestamp(LocalDateTime timestamp) {
var upper = getState(Optional.empty());
if (upper.isEmpty()) {
return Optional.empty();
}
if (timestamp.isAfter(upper.get().getTimestamp()) || upper.get().getSequenceNumber() <= 0) {
return upper;
}
var lower = Optional.<State>empty();
var lowerId = Optional.of(0L);
while (lower.isEmpty()) {
lower = getState(lowerId);
if (lower.isPresent() && lower.get().getTimestamp().isAfter(timestamp)) {
if (lower.get().getSequenceNumber() == 0
|| lower.get().getSequenceNumber() + 1 >= upper.get().getSequenceNumber()) {
return lower;
}
upper = lower;
lower = Optional.empty();
lowerId = Optional.of(0L);
}
if (lower.isEmpty()) {
var newId = (lowerId.get() + upper.get().getSequenceNumber()) / 2;
if (newId <= lowerId.get()) {
return upper;
}
lowerId = Optional.of(newId);
}
}
long baseSplitId;
while (true) {
if (balancedSearch) {
baseSplitId = ((lower.get().getSequenceNumber() + upper.get().getSequenceNumber()) / 2);
} else {
var tsInt = upper.get().getTimestamp().toEpochSecond(ZoneOffset.UTC)
- lower.get().getTimestamp().toEpochSecond(ZoneOffset.UTC);
var seqInt = upper.get().getSequenceNumber() - lower.get().getSequenceNumber();
var goal = timestamp.getSecond() - lower.get().getTimestamp().getSecond();
baseSplitId = lower.get().getSequenceNumber() + (long) Math.ceil(goal * seqInt / tsInt);
if (baseSplitId >= upper.get().getSequenceNumber()) {
baseSplitId = upper.get().getSequenceNumber() - 1;
}
}
var split = getState(Optional.of(baseSplitId));
if (split.isEmpty()) {
var splitId = baseSplitId - 1;
while (split.isEmpty() && splitId > lower.get().getSequenceNumber()) {
split = getState(Optional.of(splitId));
splitId--;
}
}
if (split.isEmpty()) {
var splitId = baseSplitId + 1;
while (split.isEmpty() && splitId < upper.get().getSequenceNumber()) {
split = getState(Optional.of(splitId));
splitId++;
}
}
if (split.isEmpty()) {
return lower;
}
if (split.get().getTimestamp().isBefore(timestamp)) {
lower = split;
} else {
upper = split;
}
if (lower.get().getSequenceNumber() + 1 >= upper.get().getSequenceNumber()) {
return lower;
}
}
}

public Optional<State> getState(Optional<Long> sequenceNumber) {
for (int i = 0; i < 3; i++) {
try (var inputStream = getStateUrl(sequenceNumber).openStream()) {
var state = new StateReader().readState(inputStream);
return Optional.of(state);
} catch (Exception e) {
e.printStackTrace();
}
}
return Optional.empty();
}

public URL getStateUrl(Optional<Long> sequenceNumber) throws MalformedURLException {
if (sequenceNumber.isPresent()) {

var s = String.format("%09d", sequenceNumber.get());
var uri =
String.format("%s/%s/%s/%s.%s", replicationUrl, s.substring(0, 3), s.substring(3, 6),
s.substring(6, 9), "state.txt");
return URI.create(uri).toURL();
} else {
return new URL(replicationUrl + "/state.txt");
}
}

public static URL resolve(String replicationUrl, Long sequenceNumber, String extension)
throws MalformedURLException {
var s = String.format("%09d", sequenceNumber);
var uri = String.format("%s/%s/%s/%s.%s", replicationUrl, s.substring(0, 3), s.substring(3, 6),
s.substring(6, 9), extension);
return URI.create(uri).toURL();
}

public static void main(String... args) throws MalformedURLException {
var reader = new StateReader();
var state = reader.getStateFromTimestamp(LocalDateTime.now().minusDays(10));
System.out.println(state.get().getSequenceNumber());
System.out
.println(resolve(reader.replicationUrl, state.get().getSequenceNumber(), "state.txt"));
System.out.println(resolve(reader.replicationUrl, state.get().getSequenceNumber(), "osc.gz"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@
import static org.apache.baremaps.stream.ConsumerUtils.consumeThenReturn;

import java.io.BufferedInputStream;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.List;
import java.util.zip.GZIPInputStream;
import org.apache.baremaps.database.collection.DataMap;
Expand Down Expand Up @@ -86,39 +83,45 @@ public static void execute(DataMap<Long, Coordinate> coordinateMap,
String replicationUrl) throws Exception {

var header = headerRepository.selectLatest();
var sequenceNumber = header.getReplicationSequenceNumber() + 1;

// If the replicationUrl is not provided, use the one from the latest header.
if (replicationUrl == null) {
replicationUrl = header.getReplicationUrl();
}

var stateReader = new StateReader("https://planet.osm.org/replication/hour", true);
var sequenceNumber = header.getReplicationSequenceNumber();

// If the replicationTimestamp is not provided, guess it from the replication timestamp.
if (sequenceNumber <= 0) {
var replicationTimestamp = header.getReplicationTimestamp();
var state = stateReader.getStateFromTimestamp(replicationTimestamp);
if (state.isPresent()) {
sequenceNumber = state.get().getSequenceNumber();
}
}

var nextSequenceNumber = sequenceNumber + 1;
var changeUrl = StateReader.resolve(replicationUrl, nextSequenceNumber, "osc.gz");
logger.info("Updating the database with the changeset: {}", changeUrl);

var createGeometry = new EntityGeometryBuilder(coordinateMap, referenceMap);
var reprojectGeometry = new EntityProjectionTransformer(4326, databaseSrid);
var prepareGeometries = new ChangeEntitiesHandler(createGeometry.andThen(reprojectGeometry));
var prepareChange = consumeThenReturn(prepareGeometries);
var importChange = new PutChangeImporter(nodeRepository, wayRepository, relationRepository);

var changeUrl = resolve(replicationUrl, sequenceNumber, "osc.gz");
logger.info("Updating the database with the changeset: {}", changeUrl);

try (var changeInputStream =
new GZIPInputStream(new BufferedInputStream(changeUrl.openStream()))) {
new XmlChangeReader().stream(changeInputStream).map(prepareChange).forEach(importChange);
}

var stateUrl = resolve(replicationUrl, sequenceNumber, "state.txt");
var stateUrl = StateReader.resolve(replicationUrl, nextSequenceNumber, "state.txt");
try (var stateInputStream = new BufferedInputStream(stateUrl.openStream())) {
var state = new StateReader().state(stateInputStream);
var state = new StateReader().readState(stateInputStream);
headerRepository.put(new Header(state.getSequenceNumber(), state.getTimestamp(),
header.getReplicationUrl(), header.getSource(), header.getWritingProgram()));
}
}

public static URL resolve(String replicationUrl, Long sequenceNumber, String extension)
throws MalformedURLException {
var s = String.format("%09d", sequenceNumber);
var uri = String.format("%s/%s/%s/%s.%s", replicationUrl, s.substring(0, 3), s.substring(3, 6),
s.substring(6, 9), extension);
return URI.create(uri).toURL();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ void relationsOsmPbf() throws IOException {
@Test
void monacoStateTxt() throws URISyntaxException, IOException {
try (InputStream inputStream = Files.newInputStream(MONACO_STATE_TXT)) {
State state = new StateReader().state(inputStream);
State state = new StateReader().readState(inputStream);
assertEquals(2788, state.getSequenceNumber());
assertEquals(LocalDateTime.parse("2020-11-10T21:42:03"), state.getTimestamp());
}
Expand Down

0 comments on commit 62519af

Please sign in to comment.