Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issues in SiriAzure initialization #5760

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@
import com.azure.messaging.servicebus.administration.models.CreateSubscriptionOptions;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import com.google.common.base.Preconditions;
import com.google.common.io.CharStreams;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.opentripplanner.ext.siri.EntityResolver;
import org.opentripplanner.ext.siri.SiriFuzzyTripMatcher;
import org.opentripplanner.framework.application.ApplicationShutdownSupport;
Expand All @@ -32,8 +32,10 @@
import org.opentripplanner.updater.spi.GraphUpdater;
import org.opentripplanner.updater.spi.HttpHeaders;
import org.opentripplanner.updater.spi.WriteToGraphCallback;
import org.rutebanken.siri20.util.SiriXml;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.org.siri.siri20.ServiceDelivery;

public abstract class AbstractAzureSiriUpdater implements GraphUpdater {

Expand Down Expand Up @@ -157,6 +159,7 @@ public void run() {
.topicName(topicName)
.subscriptionName(subscriptionName)
.receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
.disableAutoComplete() // Receive and delete does not need autocomplete
.prefetchCount(prefetchCount)
.processError(errorConsumer)
.processMessage(messageConsumer)
Expand All @@ -170,6 +173,8 @@ public void run() {
prefetchCount
);

setPrimed();

ApplicationShutdownSupport.addShutdownHook(
"azure-siri-updater-shutdown",
() -> {
Expand All @@ -186,29 +191,40 @@ public boolean isPrimed() {
return this.isPrimed;
}

public void setPrimed(boolean primed) {
isPrimed = primed;
private void setPrimed() {
isPrimed = true;
}

@Override
public String getConfigRef() {
return this.configRef;
}

protected String fetchInitialData(URI uri) {
// Maybe put this in the config?
HttpHeaders rh = HttpHeaders.of().acceptApplicationXML().build();
String initialData;
/**
* Returns None for empty result
*/
protected Optional<ServiceDelivery> fetchInitialSiriData(URI uri) {
var headers = HttpHeaders.of().acceptApplicationXML().build().asMap();

try (OtpHttpClient otpHttpClient = new OtpHttpClient()) {
initialData =
otpHttpClient.getAndMap(
uri,
Duration.ofMillis(timeout),
rh.asMap(),
is -> CharStreams.toString(new InputStreamReader(is))
);
var t1 = System.currentTimeMillis();
var siriOptional = otpHttpClient.executeAndMapOptional(
new HttpGet(uri),
Duration.ofMillis(timeout),
headers,
SiriXml::parseXml
);
var t2 = System.currentTimeMillis();
LOG.info("Fetched initial data in {} ms", (t2 - t1));

if (siriOptional.isEmpty()) {
LOG.info("Got status 204 'No Content', handling gracefully.");
return Optional.empty();
}

var serviceDelivery = siriOptional.get().getServiceDelivery();
return Optional.ofNullable(serviceDelivery);
jtorin marked this conversation as resolved.
Show resolved Hide resolved
}
return initialData;
}

SiriFuzzyTripMatcher fuzzyTripMatcher() {
Expand All @@ -232,7 +248,7 @@ private void initializeData() {
initializeData(dataInitializationUrl, messageConsumer);
break;
} catch (Exception e) {
sleepPeriod = sleepPeriod * 2;
sleepPeriod = Math.min(sleepPeriod * 2, 60 * 1000);

LOG.warn(
"Caught exception while initializing data will retry after {} ms - attempt {}. ({})",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,17 @@
import jakarta.xml.bind.JAXBException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import javax.xml.stream.XMLStreamException;
import org.apache.hc.core5.net.URIBuilder;
import org.opentripplanner.ext.siri.SiriTimetableSnapshotSource;
import org.opentripplanner.framework.time.DurationUtils;
import org.opentripplanner.transit.service.TransitModel;
import org.opentripplanner.updater.spi.ResultLogger;
import org.opentripplanner.updater.spi.UpdateResult;
Expand All @@ -26,6 +24,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.org.siri.siri20.EstimatedTimetableDeliveryStructure;
import uk.org.siri.siri20.ServiceDelivery;

public class SiriAzureETUpdater extends AbstractAzureSiriUpdater {

Expand All @@ -36,8 +35,6 @@ public class SiriAzureETUpdater extends AbstractAzureSiriUpdater {
private final LocalDate fromDateTime;
private final SiriTimetableSnapshotSource snapshotSource;

private Instant startTime;

private final Consumer<UpdateResult> recordMetrics;

public SiriAzureETUpdater(
Expand All @@ -60,7 +57,14 @@ protected void messageConsumer(ServiceBusReceivedMessageContext messageContext)
LOG.debug("Total SIRI-ET messages received={}", MESSAGE_COUNTER.get());
}

processMessage(message.getBody().toString(), message.getMessageId());
try {
var updates = parseSiriEt(message.getBody().toString(), message.getMessageId());
if (!updates.isEmpty()) {
processMessage(updates);
}
} catch (JAXBException | XMLStreamException e) {
LOG.error(e.getLocalizedMessage(), e);
}
}

@Override
Expand All @@ -80,89 +84,53 @@ protected void initializeData(String url, Consumer<ServiceBusReceivedMessageCont
.addParameter("fromDateTime", fromDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE))
.build();

while (!isPrimed()) {
startTime = Instant.now();
LOG.info("Fetching initial Siri ET data from {}, timeout is {}ms", uri, timeout);
final long t1 = System.currentTimeMillis();
String string = fetchInitialData(uri);
final long t2 = System.currentTimeMillis();

LOG.info(
"Fetching initial data - finished after {} ms, got {} bytes",
(t2 - t1),
string.length()
);
LOG.info("Fetching initial Siri ET data from {}, timeout is {} ms.", uri, timeout);
var siri = fetchInitialSiriData(uri);

// This is fine since runnables are scheduled after each other
processHistory(string, "ET-INITIAL-1");
if (siri.isEmpty()) {
LOG.info("Got empty ET response from history endpoint");
return;
}

// This is fine since runnables are scheduled after each other
processHistory(siri.get());
}

private void processMessage(String message, String id) {
try {
List<EstimatedTimetableDeliveryStructure> updates = getUpdates(message, id);
private Future<?> processMessage(List<EstimatedTimetableDeliveryStructure> updates) {
return super.saveResultOnGraph.execute((graph, transitModel) -> {
var result = snapshotSource.applyEstimatedTimetable(
fuzzyTripMatcher(),
entityResolver(),
feedId,
false,
updates
);
ResultLogger.logUpdateResultErrors(feedId, "siri-et", result);
recordMetrics.accept(result);
});
}

if (updates.isEmpty()) {
return;
}
private void processHistory(ServiceDelivery siri) {
var updates = siri.getEstimatedTimetableDeliveries();

super.saveResultOnGraph.execute((graph, transitModel) -> {
var result = snapshotSource.applyEstimatedTimetable(
fuzzyTripMatcher(),
entityResolver(),
feedId,
false,
updates
);
ResultLogger.logUpdateResultErrors(feedId, "siri-et", result);
recordMetrics.accept(result);
});
} catch (JAXBException | XMLStreamException e) {
LOG.error(e.getLocalizedMessage(), e);
if (updates == null || updates.isEmpty()) {
LOG.info("Did not receive any ET messages from history endpoint");
return;
}
}

private void processHistory(String message, String id) {
try {
List<EstimatedTimetableDeliveryStructure> updates = getUpdates(message, id);

if (updates.isEmpty()) {
LOG.info("Did not receive any ET messages from history endpoint");
return;
}

var f = super.saveResultOnGraph.execute((graph, transitModel) -> {
try {
long t1 = System.currentTimeMillis();
var result = snapshotSource.applyEstimatedTimetable(
fuzzyTripMatcher(),
entityResolver(),
feedId,
false,
updates
);
ResultLogger.logUpdateResultErrors(feedId, "siri-et", result);
recordMetrics.accept(result);

setPrimed(true);
LOG.info(
"Azure ET updater initialized after {} ms: [time since startup: {}]",
(System.currentTimeMillis() - t1),
DurationUtils.durationToStr(Duration.between(startTime, Instant.now()))
);
} catch (Exception e) {
LOG.error("Could not process ET history", e);
}
});
long t1 = System.currentTimeMillis();
var f = processMessage(updates);
f.get();
} catch (JAXBException | XMLStreamException | ExecutionException | InterruptedException e) {
LOG.error(e.getLocalizedMessage(), e);
LOG.info("Azure ET updater initialized in {} ms.", (System.currentTimeMillis() - t1));
} catch (ExecutionException | InterruptedException e) {
throw new SiriAzureInitializationException("Error applying history", e);
}
}

private List<EstimatedTimetableDeliveryStructure> getUpdates(String message, String id)
private List<EstimatedTimetableDeliveryStructure> parseSiriEt(String siriXmlMessage, String id)
throws JAXBException, XMLStreamException {
var siri = SiriXml.parseXml(message);
var siri = SiriXml.parseXml(siriXmlMessage);
if (
siri.getServiceDelivery() == null ||
siri.getServiceDelivery().getEstimatedTimetableDeliveries() == null ||
Expand All @@ -171,7 +139,7 @@ private List<EstimatedTimetableDeliveryStructure> getUpdates(String message, Str
if (siri.getHeartbeatNotification() != null) {
LOG.debug("Received SIRI heartbeat message");
} else {
LOG.warn("Empty Siri message {}: {}", id, message);
LOG.info("Empty Siri message {}: {}", id, siriXmlMessage);
}
return new ArrayList<>();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.opentripplanner.ext.siri.updater.azure;

public class SiriAzureInitializationException extends RuntimeException {

public SiriAzureInitializationException(String message, Throwable cause) {
super(message, cause);
}
}
Loading
Loading