From 07d8e877b4cdffbbb8fbce0226fd49982f2bd0f5 Mon Sep 17 00:00:00 2001 From: Henrik Abrahamsson Date: Wed, 20 Mar 2024 11:31:09 +0100 Subject: [PATCH 1/6] SiriAzureUpdaters: refactor initialization logic - Remove duplicate retry logic - Make sure that primed status is set correctly --- .../azure/AbstractAzureSiriUpdater.java | 36 +++++- .../updater/azure/SiriAzureETUpdater.java | 120 +++++++----------- .../SiriAzureInitializationException.java | 8 ++ .../updater/azure/SiriAzureSXUpdater.java | 113 +++++++---------- 4 files changed, 129 insertions(+), 148 deletions(-) create mode 100644 src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureInitializationException.java diff --git a/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java b/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java index 3a600a755b1..f24999eb521 100644 --- a/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java +++ b/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java @@ -13,15 +13,18 @@ import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import com.google.common.base.Preconditions; import com.google.common.io.CharStreams; +import jakarta.xml.bind.JAXBException; import java.io.InputStreamReader; import java.net.URI; import java.net.URISyntaxException; import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.Objects; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import javax.xml.stream.XMLStreamException; import org.opentripplanner.ext.siri.EntityResolver; import org.opentripplanner.ext.siri.SiriFuzzyTripMatcher; import org.opentripplanner.framework.application.ApplicationShutdownSupport; @@ -32,8 +35,10 @@ import org.opentripplanner.updater.spi.GraphUpdater; import org.opentripplanner.updater.spi.HttpHeaders; import org.opentripplanner.updater.spi.WriteToGraphCallback; +import org.rutebanken.siri20.util.SiriXml; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import uk.org.siri.siri20.ServiceDelivery; public abstract class AbstractAzureSiriUpdater implements GraphUpdater { @@ -157,6 +162,7 @@ public void run() { .topicName(topicName) .subscriptionName(subscriptionName) .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE) + .disableAutoComplete() // Receive and delete does not need autocomplete .prefetchCount(prefetchCount) .processError(errorConsumer) .processMessage(messageConsumer) @@ -170,6 +176,8 @@ public void run() { prefetchCount ); + setPrimed(); + ApplicationShutdownSupport.addShutdownHook( "azure-siri-updater-shutdown", () -> { @@ -186,8 +194,8 @@ public boolean isPrimed() { return this.isPrimed; } - public void setPrimed(boolean primed) { - isPrimed = primed; + private void setPrimed() { + isPrimed = true; } @Override @@ -195,11 +203,16 @@ public String getConfigRef() { return this.configRef; } - protected String fetchInitialData(URI uri) { + /** + * Returns None for empty result + */ + protected Optional fetchInitialSiriData(URI uri) { // Maybe put this in the config? HttpHeaders rh = HttpHeaders.of().acceptApplicationXML().build(); String initialData; + try (OtpHttpClient otpHttpClient = new OtpHttpClient()) { + var t1 = System.currentTimeMillis(); initialData = otpHttpClient.getAndMap( uri, @@ -207,8 +220,21 @@ protected String fetchInitialData(URI uri) { rh.asMap(), is -> CharStreams.toString(new InputStreamReader(is)) ); + final long t2 = System.currentTimeMillis(); + + LOG.info( + "Fetching initial data - finished after {} ms, got {} bytes", + (t2 - t1), + initialData.length() + ); + } + + try { + var serviceDelivery = SiriXml.parseXml(initialData).getServiceDelivery(); + return Optional.ofNullable(serviceDelivery); + } catch (JAXBException | XMLStreamException e) { + throw new SiriAzureInitializationException("Could not parse history message", e); } - return initialData; } SiriFuzzyTripMatcher fuzzyTripMatcher() { @@ -232,7 +258,7 @@ private void initializeData() { initializeData(dataInitializationUrl, messageConsumer); break; } catch (Exception e) { - sleepPeriod = sleepPeriod * 2; + sleepPeriod = Math.min(sleepPeriod * 2, 60 * 1000); LOG.warn( "Caught exception while initializing data will retry after {} ms - attempt {}. ({})", diff --git a/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureETUpdater.java b/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureETUpdater.java index 4b9a86618a5..76d75cd5576 100644 --- a/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureETUpdater.java +++ b/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureETUpdater.java @@ -5,19 +5,17 @@ import jakarta.xml.bind.JAXBException; import java.net.URI; import java.net.URISyntaxException; -import java.time.Duration; -import java.time.Instant; import java.time.LocalDate; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import javax.xml.stream.XMLStreamException; import org.apache.hc.core5.net.URIBuilder; import org.opentripplanner.ext.siri.SiriTimetableSnapshotSource; -import org.opentripplanner.framework.time.DurationUtils; import org.opentripplanner.transit.service.TransitModel; import org.opentripplanner.updater.spi.ResultLogger; import org.opentripplanner.updater.spi.UpdateResult; @@ -26,6 +24,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import uk.org.siri.siri20.EstimatedTimetableDeliveryStructure; +import uk.org.siri.siri20.ServiceDelivery; public class SiriAzureETUpdater extends AbstractAzureSiriUpdater { @@ -36,8 +35,6 @@ public class SiriAzureETUpdater extends AbstractAzureSiriUpdater { private final LocalDate fromDateTime; private final SiriTimetableSnapshotSource snapshotSource; - private Instant startTime; - private final Consumer recordMetrics; public SiriAzureETUpdater( @@ -60,7 +57,14 @@ protected void messageConsumer(ServiceBusReceivedMessageContext messageContext) LOG.debug("Total SIRI-ET messages received={}", MESSAGE_COUNTER.get()); } - processMessage(message.getBody().toString(), message.getMessageId()); + try { + var updates = parseSiriEt(message.getBody().toString(), message.getMessageId()); + if (!updates.isEmpty()) { + processMessage(updates); + } + } catch (JAXBException | XMLStreamException e) { + LOG.error(e.getLocalizedMessage(), e); + } } @Override @@ -80,89 +84,53 @@ protected void initializeData(String url, Consumer updates = getUpdates(message, id); + private Future processMessage(List updates) { + return super.saveResultOnGraph.execute((graph, transitModel) -> { + var result = snapshotSource.applyEstimatedTimetable( + fuzzyTripMatcher(), + entityResolver(), + feedId, + false, + updates + ); + ResultLogger.logUpdateResultErrors(feedId, "siri-et", result); + recordMetrics.accept(result); + }); + } - if (updates.isEmpty()) { - return; - } + private void processHistory(ServiceDelivery siri) { + var updates = siri.getEstimatedTimetableDeliveries(); - super.saveResultOnGraph.execute((graph, transitModel) -> { - var result = snapshotSource.applyEstimatedTimetable( - fuzzyTripMatcher(), - entityResolver(), - feedId, - false, - updates - ); - ResultLogger.logUpdateResultErrors(feedId, "siri-et", result); - recordMetrics.accept(result); - }); - } catch (JAXBException | XMLStreamException e) { - LOG.error(e.getLocalizedMessage(), e); + if (updates == null || updates.isEmpty()) { + LOG.info("Did not receive any ET messages from history endpoint"); + return; } - } - private void processHistory(String message, String id) { try { - List updates = getUpdates(message, id); - - if (updates.isEmpty()) { - LOG.info("Did not receive any ET messages from history endpoint"); - return; - } - - var f = super.saveResultOnGraph.execute((graph, transitModel) -> { - try { - long t1 = System.currentTimeMillis(); - var result = snapshotSource.applyEstimatedTimetable( - fuzzyTripMatcher(), - entityResolver(), - feedId, - false, - updates - ); - ResultLogger.logUpdateResultErrors(feedId, "siri-et", result); - recordMetrics.accept(result); - - setPrimed(true); - LOG.info( - "Azure ET updater initialized after {} ms: [time since startup: {}]", - (System.currentTimeMillis() - t1), - DurationUtils.durationToStr(Duration.between(startTime, Instant.now())) - ); - } catch (Exception e) { - LOG.error("Could not process ET history", e); - } - }); + long t1 = System.currentTimeMillis(); + var f = processMessage(updates); f.get(); - } catch (JAXBException | XMLStreamException | ExecutionException | InterruptedException e) { - LOG.error(e.getLocalizedMessage(), e); + LOG.info("Azure ET updater initialized after {} ms", (System.currentTimeMillis() - t1)); + } catch (ExecutionException | InterruptedException e) { + throw new SiriAzureInitializationException("Error applying history", e); } } - private List getUpdates(String message, String id) + private List parseSiriEt(String siriXmlMessage, String id) throws JAXBException, XMLStreamException { - var siri = SiriXml.parseXml(message); + var siri = SiriXml.parseXml(siriXmlMessage); if ( siri.getServiceDelivery() == null || siri.getServiceDelivery().getEstimatedTimetableDeliveries() == null || @@ -171,7 +139,7 @@ private List getUpdates(String message, Str if (siri.getHeartbeatNotification() != null) { LOG.debug("Received SIRI heartbeat message"); } else { - LOG.warn("Empty Siri message {}: {}", id, message); + LOG.info("Empty Siri message {}: {}", id, siriXmlMessage); } return new ArrayList<>(); } diff --git a/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureInitializationException.java b/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureInitializationException.java new file mode 100644 index 00000000000..ced913404b6 --- /dev/null +++ b/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureInitializationException.java @@ -0,0 +1,8 @@ +package org.opentripplanner.ext.siri.updater.azure; + +public class SiriAzureInitializationException extends RuntimeException { + + public SiriAzureInitializationException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureSXUpdater.java b/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureSXUpdater.java index 8d33035b971..1766b0ba1a3 100644 --- a/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureSXUpdater.java +++ b/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureSXUpdater.java @@ -6,16 +6,16 @@ import java.net.URI; import java.net.URISyntaxException; import java.time.Duration; -import java.time.Instant; import java.time.LocalDate; import java.time.format.DateTimeFormatter; +import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import javax.xml.stream.XMLStreamException; import org.apache.hc.core5.net.URIBuilder; import org.opentripplanner.ext.siri.SiriAlertsUpdateHandler; -import org.opentripplanner.framework.time.DurationUtils; import org.opentripplanner.routing.impl.TransitAlertServiceImpl; import org.opentripplanner.routing.services.TransitAlertService; import org.opentripplanner.transit.service.TransitModel; @@ -23,7 +23,7 @@ import org.rutebanken.siri20.util.SiriXml; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import uk.org.siri.siri20.Siri; +import uk.org.siri.siri20.ServiceDelivery; public class SiriAzureSXUpdater extends AbstractAzureSiriUpdater implements TransitAlertProvider { @@ -34,7 +34,6 @@ public class SiriAzureSXUpdater extends AbstractAzureSiriUpdater implements Tran private static final transient AtomicLong messageCounter = new AtomicLong(0); private final LocalDate fromDateTime; private final LocalDate toDateTime; - private Instant startTime; public SiriAzureSXUpdater(SiriAzureSXUpdaterParameters config, TransitModel transitModel) { super(config, transitModel); @@ -63,7 +62,16 @@ protected void messageConsumer(ServiceBusReceivedMessageContext messageContext) ); messageCounter.incrementAndGet(); - processMessage(message.getBody().toString(), message.getMessageId()); + + try { + var siriSx = parseSiriSx(message.getBody().toString(), message.getMessageId()); + if (siriSx.isEmpty()) { + return; + } + processMessage(siriSx.get()); + } catch (JAXBException | XMLStreamException e) { + LOG.error(e.getLocalizedMessage(), e); + } } @Override @@ -75,36 +83,30 @@ protected void errorConsumer(ServiceBusErrorContext errorContext) { protected void initializeData(String url, Consumer consumer) throws URISyntaxException { if (url == null) { - LOG.info("No history url set up for Siri Azure Sx Updater"); + LOG.info("No history url set up for Siri Azure SX Updater"); return; } - while (!isPrimed()) { - startTime = Instant.now(); - URI uri = new URIBuilder(url) - .addParameter("publishFromDateTime", fromDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE)) - .addParameter("publishToDateTime", toDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE)) - .build(); + URI uri = new URIBuilder(url) + .addParameter("publishFromDateTime", fromDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE)) + .addParameter("publishToDateTime", toDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE)) + .build(); - LOG.info("Fetching initial Siri SX data from {}, timeout is {}ms", uri, timeout); + LOG.info("Fetching initial Siri SX data from {}, timeout is {}ms", uri, timeout); + var siri = fetchInitialSiriData(uri); - final long t1 = System.currentTimeMillis(); - String string = fetchInitialData(uri); - final long t2 = System.currentTimeMillis(); - - LOG.info( - "Fetching initial data - finished after {} ms, got {} bytes", - (t2 - t1), - string.length() - ); - - // This is fine since runnables are scheduled after each other - processHistory(string, "SX-INITIAL-1"); + if (siri.isEmpty()) { + LOG.info("Got empty SX response from history endpoint"); + return; } + + // This is fine since runnables are scheduled after each other + processHistory(siri.get()); } - private Siri getSiri(String message, String id) throws XMLStreamException, JAXBException { - var siri = SiriXml.parseXml(message); + private Optional parseSiriSx(String xmlMessage, String id) + throws XMLStreamException, JAXBException { + var siri = SiriXml.parseXml(xmlMessage); if ( siri.getServiceDelivery() == null || siri.getServiceDelivery().getSituationExchangeDeliveries() == null || @@ -113,55 +115,32 @@ private Siri getSiri(String message, String id) throws XMLStreamException, JAXBE if (siri.getHeartbeatNotification() != null) { LOG.debug("Received SIRI heartbeat message"); } else { - LOG.warn("Empty Siri message for messageId {}", id); - LOG.debug(message); + LOG.info("Empty Siri message for messageId {}", id); } - return null; + return Optional.empty(); } - return siri; + return Optional.of(siri.getServiceDelivery()); } - private void processMessage(String message, String id) { - try { - Siri siri = getSiri(message, id); - if (siri == null) { - return; - } + private Future processMessage(ServiceDelivery siriSx) { + return super.saveResultOnGraph.execute((graph, transitModel) -> updateHandler.update(siriSx)); + } - super.saveResultOnGraph.execute((graph, transitModel) -> - updateHandler.update(siri.getServiceDelivery()) - ); - } catch (JAXBException | XMLStreamException e) { - LOG.error(e.getLocalizedMessage(), e); + private void processHistory(ServiceDelivery siri) { + var sx = siri.getSituationExchangeDeliveries(); + + if (sx == null || sx.isEmpty()) { + LOG.info("Did not receive any SX messages from history endpoint"); + return; } - } - private void processHistory(String message, String id) { try { - Siri siri = getSiri(message, id); - if (siri == null) { - LOG.info("Did not receive any SX messages from history endpoint."); - return; - } - - var f = super.saveResultOnGraph.execute((graph, transitModel) -> { - try { - long t1 = System.currentTimeMillis(); - updateHandler.update(siri.getServiceDelivery()); - - LOG.info( - "Azure SX updater initialized after {} ms: [time since startup: {}]", - (System.currentTimeMillis() - t1), - DurationUtils.durationToStr(Duration.between(startTime, Instant.now())) - ); - setPrimed(true); - } catch (Exception e) { - LOG.error("Could not process SX history", e); - } - }); + var t1 = System.currentTimeMillis(); + var f = processMessage(siri); f.get(); - } catch (JAXBException | XMLStreamException | ExecutionException | InterruptedException e) { - LOG.error(e.getLocalizedMessage(), e); + LOG.info("Azure SX updater initialized. Took {} ms", (System.currentTimeMillis() - t1)); + } catch (ExecutionException | InterruptedException e) { + throw new SiriAzureInitializationException("Error applying SX history", e); } } From d93adc75a518431415c9d5d15ddbce185f6920ae Mon Sep 17 00:00:00 2001 From: Henrik Abrahamsson Date: Wed, 20 Mar 2024 14:15:27 +0100 Subject: [PATCH 2/6] Handle 204 No Response status in SiriAzure updaters --- .../updater/azure/AbstractAzureSiriUpdater.java | 8 ++++++++ .../framework/io/OtpHttpClient.java | 4 +--- .../io/OtpHttpClientStatusCodeException.java | 15 +++++++++++++++ 3 files changed, 24 insertions(+), 3 deletions(-) create mode 100644 src/main/java/org/opentripplanner/framework/io/OtpHttpClientStatusCodeException.java diff --git a/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java b/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java index f24999eb521..f2ea647465c 100644 --- a/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java +++ b/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java @@ -29,6 +29,7 @@ import org.opentripplanner.ext.siri.SiriFuzzyTripMatcher; import org.opentripplanner.framework.application.ApplicationShutdownSupport; import org.opentripplanner.framework.io.OtpHttpClient; +import org.opentripplanner.framework.io.OtpHttpClientStatusCodeException; import org.opentripplanner.transit.service.DefaultTransitService; import org.opentripplanner.transit.service.TransitModel; import org.opentripplanner.transit.service.TransitService; @@ -227,6 +228,13 @@ protected Optional fetchInitialSiriData(URI uri) { (t2 - t1), initialData.length() ); + } catch (OtpHttpClientStatusCodeException e) { + // 204 No Response status is ok. + if (e.getStatusCode() == 204) { + LOG.info("Got status 204 No Response"); + return Optional.empty(); + } + throw e; } try { diff --git a/src/main/java/org/opentripplanner/framework/io/OtpHttpClient.java b/src/main/java/org/opentripplanner/framework/io/OtpHttpClient.java index c922cfcc4be..33ef72b121e 100644 --- a/src/main/java/org/opentripplanner/framework/io/OtpHttpClient.java +++ b/src/main/java/org/opentripplanner/framework/io/OtpHttpClient.java @@ -320,9 +320,7 @@ public T executeAndMap( headers, response -> { if (isFailedRequest(response)) { - throw new OtpHttpClientException( - "HTTP request failed with status code " + response.getCode() - ); + throw new OtpHttpClientStatusCodeException(response.getCode()); } if (response.getEntity() == null || response.getEntity().getContent() == null) { throw new OtpHttpClientException("HTTP request failed: empty response"); diff --git a/src/main/java/org/opentripplanner/framework/io/OtpHttpClientStatusCodeException.java b/src/main/java/org/opentripplanner/framework/io/OtpHttpClientStatusCodeException.java new file mode 100644 index 00000000000..e001f23b05e --- /dev/null +++ b/src/main/java/org/opentripplanner/framework/io/OtpHttpClientStatusCodeException.java @@ -0,0 +1,15 @@ +package org.opentripplanner.framework.io; + +public class OtpHttpClientStatusCodeException extends OtpHttpClientException { + + private final int statusCode; + + public OtpHttpClientStatusCodeException(int statusCode) { + super("HTTP request failed with status code " + statusCode); + this.statusCode = statusCode; + } + + public int getStatusCode() { + return statusCode; + } +} From 332d390e5b8aa8c3e262e7dd8660fbaa83284dcf Mon Sep 17 00:00:00 2001 From: Henrik Abrahamsson <127481124+habrahamsson-skanetrafiken@users.noreply.github.com> Date: Wed, 20 Mar 2024 16:16:20 +0100 Subject: [PATCH 3/6] Apply logging suggestions from code review Co-authored-by: Johan Torin --- .../ext/siri/updater/azure/AbstractAzureSiriUpdater.java | 6 +++--- .../ext/siri/updater/azure/SiriAzureETUpdater.java | 4 ++-- .../ext/siri/updater/azure/SiriAzureSXUpdater.java | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java b/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java index f2ea647465c..3301d625854 100644 --- a/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java +++ b/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java @@ -224,14 +224,14 @@ protected Optional fetchInitialSiriData(URI uri) { final long t2 = System.currentTimeMillis(); LOG.info( - "Fetching initial data - finished after {} ms, got {} bytes", + "Fetching initial data, received {} bytes in {} ms.", (t2 - t1), initialData.length() ); } catch (OtpHttpClientStatusCodeException e) { - // 204 No Response status is ok. + // 204 No Content status is ok. if (e.getStatusCode() == 204) { - LOG.info("Got status 204 No Response"); + LOG.info("Got status 204 'No Content', handling gracefully."); return Optional.empty(); } throw e; diff --git a/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureETUpdater.java b/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureETUpdater.java index 76d75cd5576..699a89ddb4b 100644 --- a/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureETUpdater.java +++ b/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureETUpdater.java @@ -84,7 +84,7 @@ protected void initializeData(String url, Consumer Date: Fri, 22 Mar 2024 13:30:59 +0100 Subject: [PATCH 4/6] Add 204 optional handling to OtpHttpClient --- .../azure/AbstractAzureSiriUpdater.java | 42 +++++---------- .../framework/io/OtpHttpClient.java | 54 +++++++++++++++---- .../io/OtpHttpClientStatusCodeException.java | 15 ------ 3 files changed, 55 insertions(+), 56 deletions(-) delete mode 100644 src/main/java/org/opentripplanner/framework/io/OtpHttpClientStatusCodeException.java diff --git a/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java b/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java index 3301d625854..bf5f8a6113a 100644 --- a/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java +++ b/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java @@ -12,9 +12,6 @@ import com.azure.messaging.servicebus.administration.models.CreateSubscriptionOptions; import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import com.google.common.base.Preconditions; -import com.google.common.io.CharStreams; -import jakarta.xml.bind.JAXBException; -import java.io.InputStreamReader; import java.net.URI; import java.net.URISyntaxException; import java.time.Duration; @@ -24,12 +21,11 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -import javax.xml.stream.XMLStreamException; +import org.apache.hc.client5.http.classic.methods.HttpGet; import org.opentripplanner.ext.siri.EntityResolver; import org.opentripplanner.ext.siri.SiriFuzzyTripMatcher; import org.opentripplanner.framework.application.ApplicationShutdownSupport; import org.opentripplanner.framework.io.OtpHttpClient; -import org.opentripplanner.framework.io.OtpHttpClientStatusCodeException; import org.opentripplanner.transit.service.DefaultTransitService; import org.opentripplanner.transit.service.TransitModel; import org.opentripplanner.transit.service.TransitService; @@ -208,40 +204,26 @@ public String getConfigRef() { * Returns None for empty result */ protected Optional fetchInitialSiriData(URI uri) { - // Maybe put this in the config? - HttpHeaders rh = HttpHeaders.of().acceptApplicationXML().build(); - String initialData; + var headers = HttpHeaders.of().acceptApplicationXML().build().asMap(); try (OtpHttpClient otpHttpClient = new OtpHttpClient()) { var t1 = System.currentTimeMillis(); - initialData = - otpHttpClient.getAndMap( - uri, - Duration.ofMillis(timeout), - rh.asMap(), - is -> CharStreams.toString(new InputStreamReader(is)) - ); - final long t2 = System.currentTimeMillis(); - - LOG.info( - "Fetching initial data, received {} bytes in {} ms.", - (t2 - t1), - initialData.length() + var siriOptional = otpHttpClient.executeAndMapOptional( + new HttpGet(uri), + Duration.ofMillis(timeout), + headers, + SiriXml::parseXml ); - } catch (OtpHttpClientStatusCodeException e) { - // 204 No Content status is ok. - if (e.getStatusCode() == 204) { + var t2 = System.currentTimeMillis(); + LOG.info("Fetched initial data in {} ms", (t2 - t1)); + + if (siriOptional.isEmpty()) { LOG.info("Got status 204 'No Content', handling gracefully."); return Optional.empty(); } - throw e; - } - try { - var serviceDelivery = SiriXml.parseXml(initialData).getServiceDelivery(); + var serviceDelivery = siriOptional.get().getServiceDelivery(); return Optional.ofNullable(serviceDelivery); - } catch (JAXBException | XMLStreamException e) { - throw new SiriAzureInitializationException("Could not parse history message", e); } } diff --git a/src/main/java/org/opentripplanner/framework/io/OtpHttpClient.java b/src/main/java/org/opentripplanner/framework/io/OtpHttpClient.java index 33ef72b121e..1ac8891553c 100644 --- a/src/main/java/org/opentripplanner/framework/io/OtpHttpClient.java +++ b/src/main/java/org/opentripplanner/framework/io/OtpHttpClient.java @@ -13,6 +13,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import org.apache.hc.client5.http.classic.methods.HttpGet; import org.apache.hc.client5.http.classic.methods.HttpPost; import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase; @@ -24,6 +25,7 @@ import org.apache.hc.client5.http.impl.classic.HttpClients; import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager; import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder; +import org.apache.hc.core5.http.ClassicHttpResponse; import org.apache.hc.core5.http.ContentType; import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HttpEntity; @@ -313,23 +315,34 @@ public T executeAndMap( Duration timeout, Map headers, ResponseMapper contentMapper + ) { + return executeAndMapWithResponseHandler( + httpRequest, + timeout, + headers, + response -> mapResponse(response, contentMapper) + ); + } + + /** + * Executes an HTTP request and returns the body mapped according to the provided content mapper. + * Returns empty result on http status 204 "No Content" + */ + public Optional executeAndMapOptional( + HttpUriRequestBase httpRequest, + Duration timeout, + Map headers, + ResponseMapper contentMapper ) { return executeAndMapWithResponseHandler( httpRequest, timeout, headers, response -> { - if (isFailedRequest(response)) { - throw new OtpHttpClientStatusCodeException(response.getCode()); - } - if (response.getEntity() == null || response.getEntity().getContent() == null) { - throw new OtpHttpClientException("HTTP request failed: empty response"); - } - try (InputStream is = response.getEntity().getContent()) { - return contentMapper.apply(is); - } catch (Exception e) { - throw new OtpHttpClientException(e); + if (response.getCode() == 204) { + return Optional.empty(); } + return Optional.of(mapResponse(response, contentMapper)); } ); } @@ -401,6 +414,25 @@ protected T executeAndMapWithResponseHandler( } } + private T mapResponse(ClassicHttpResponse response, ResponseMapper contentMapper) { + if (isFailedRequest(response)) { + throw new OtpHttpClientException( + "HTTP request failed with status code " + response.getCode() + ); + } + if (response.getEntity() == null) { + throw new OtpHttpClientException("HTTP request failed: empty response"); + } + try (InputStream is = response.getEntity().getContent()) { + if (is == null) { + throw new OtpHttpClientException("HTTP request failed: empty response"); + } + return contentMapper.apply(is); + } catch (Exception e) { + throw new OtpHttpClientException(e); + } + } + private T sendAndMap( HttpUriRequestBase request, URI uri, @@ -443,7 +475,7 @@ private static RequestConfig requestConfig(Duration timeout) { * Returns true if the HTTP status code is not 200. */ private static boolean isFailedRequest(HttpResponse response) { - return response.getCode() != 200; + return response.getCode() < 200 || response.getCode() >= 300; } /** diff --git a/src/main/java/org/opentripplanner/framework/io/OtpHttpClientStatusCodeException.java b/src/main/java/org/opentripplanner/framework/io/OtpHttpClientStatusCodeException.java deleted file mode 100644 index e001f23b05e..00000000000 --- a/src/main/java/org/opentripplanner/framework/io/OtpHttpClientStatusCodeException.java +++ /dev/null @@ -1,15 +0,0 @@ -package org.opentripplanner.framework.io; - -public class OtpHttpClientStatusCodeException extends OtpHttpClientException { - - private final int statusCode; - - public OtpHttpClientStatusCodeException(int statusCode) { - super("HTTP request failed with status code " + statusCode); - this.statusCode = statusCode; - } - - public int getStatusCode() { - return statusCode; - } -} From e88ca44b76e5d3b22ad61e0eab411bbeee75926a Mon Sep 17 00:00:00 2001 From: Henrik Abrahamsson Date: Fri, 22 Mar 2024 18:01:41 +0100 Subject: [PATCH 5/6] Minor cleanup --- .../ext/siri/updater/azure/AbstractAzureSiriUpdater.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java b/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java index bf5f8a6113a..ba1ae880d34 100644 --- a/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java +++ b/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java @@ -218,12 +218,10 @@ protected Optional fetchInitialSiriData(URI uri) { LOG.info("Fetched initial data in {} ms", (t2 - t1)); if (siriOptional.isEmpty()) { - LOG.info("Got status 204 'No Content', handling gracefully."); - return Optional.empty(); + LOG.info("Got status 204 'No Content'."); } - var serviceDelivery = siriOptional.get().getServiceDelivery(); - return Optional.ofNullable(serviceDelivery); + return siriOptional.flatMap(siri -> Optional.ofNullable(siri.getServiceDelivery())); } } From ee00762b9c16ec7bcd4465be7828d4a8ee9e2472 Mon Sep 17 00:00:00 2001 From: Johan Torin Date: Mon, 25 Mar 2024 11:11:47 +0100 Subject: [PATCH 6/6] Update src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java Co-authored-by: Leonard Ehrenfried --- .../ext/siri/updater/azure/AbstractAzureSiriUpdater.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java b/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java index ba1ae880d34..1915105960e 100644 --- a/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java +++ b/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java @@ -221,7 +221,7 @@ protected Optional fetchInitialSiriData(URI uri) { LOG.info("Got status 204 'No Content'."); } - return siriOptional.flatMap(siri -> Optional.ofNullable(siri.getServiceDelivery())); + return siriOptional.map(siri -> siri.getServiceDelivery()); } }