diff --git a/.github/workflows/cibuild.yml b/.github/workflows/cibuild.yml index 09150d0d868..120220de7f1 100644 --- a/.github/workflows/cibuild.yml +++ b/.github/workflows/cibuild.yml @@ -117,12 +117,13 @@ jobs: - uses: actions/setup-node@v4 with: - node-version: 18 + node-version: 20 - name: Build GTFS GraphQL API documentation run: | - npm install -g @magidoc/cli@4.1.4 - magidoc generate + npm install -g @magidoc/cli@6.0.0 + cat src/main/resources/org/opentripplanner/apis/gtfs/schema.graphqls + magidoc generate --stacktrace - name: Deploy compiled HTML to Github pages if: github.event_name == 'push' && (github.ref == 'refs/heads/dev-2.x' || github.ref == 'refs/heads/master') diff --git a/.github/workflows/performance-test.yml b/.github/workflows/performance-test.yml index 60b3e69610d..bfddea1b408 100644 --- a/.github/workflows/performance-test.yml +++ b/.github/workflows/performance-test.yml @@ -109,7 +109,7 @@ jobs: - name: Archive Flight Recorder instrumentation file if: matrix.profile == 'core' || github.ref == 'refs/heads/dev-2.x' - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: ${{ matrix.location }}-flight-recorder path: ${{ matrix.location}}-speed-test.jfr diff --git a/docs/Changelog.md b/docs/Changelog.md index d0db11c2f2c..8cdfb231eb3 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -34,6 +34,7 @@ based on merged pull requests. Search GitHub issues and pull requests for smalle - Fix debug client after breaking change in dependency graphql-request [#5899](https://github.com/opentripplanner/OpenTripPlanner/pull/5899) - Remove TravelTime API [#5890](https://github.com/opentripplanner/OpenTripPlanner/pull/5890) - Improve cancellation of large response in TransModel API [#5908](https://github.com/opentripplanner/OpenTripPlanner/pull/5908) +- Refactor SIRI-ET updaters [#5904](https://github.com/opentripplanner/OpenTripPlanner/pull/5904) [](AUTOMATIC_CHANGELOG_PLACEHOLDER_DO_NOT_REMOVE) ## 2.5.0 (2024-03-13) diff --git a/magidoc.mjs b/magidoc.mjs index 4fea5e4e127..4d9e8c98a7f 100644 --- a/magidoc.mjs +++ b/magidoc.mjs @@ -36,7 +36,15 @@ To learn how to deactivate it, read the queryGenerationFactories: { 'Polyline': '<>', 'GeoJson': '<>', - 'OffsetDateTime': '2024-02-05T18:04:23+01:00' + 'OffsetDateTime': '2024-02-05T18:04:23+01:00', + 'Duration': 'PT10M', + 'CoordinateValue': 19.24, + 'Reluctance': 3.1, + 'Speed': 3.4, + 'Cost': 100, + 'Ratio': 0.25, + 'Locale': 'en' + }, } }, diff --git a/src/ext-test/java/org/opentripplanner/ext/fares/impl/OrcaFareServiceTest.java b/src/ext-test/java/org/opentripplanner/ext/fares/impl/OrcaFareServiceTest.java index 7493c33bbb2..e7a2610b42f 100644 --- a/src/ext-test/java/org/opentripplanner/ext/fares/impl/OrcaFareServiceTest.java +++ b/src/ext-test/java/org/opentripplanner/ext/fares/impl/OrcaFareServiceTest.java @@ -134,7 +134,7 @@ private static void assertLegFareEquals( void calculateFareForSingleAgency() { List rides = List.of(getLeg(COMM_TRANS_AGENCY_ID, "400", 0)); calculateFare(rides, regular, DEFAULT_TEST_RIDE_PRICE); - calculateFare(rides, FareType.senior, DEFAULT_TEST_RIDE_PRICE); + calculateFare(rides, FareType.senior, TWO_DOLLARS); calculateFare(rides, FareType.youth, ZERO_USD); calculateFare(rides, FareType.electronicSpecial, TWO_DOLLARS); calculateFare(rides, FareType.electronicRegular, DEFAULT_TEST_RIDE_PRICE); @@ -154,18 +154,14 @@ void calculateFareWithNoFreeTransfer() { getLeg(COMM_TRANS_AGENCY_ID, 2) ); calculateFare(rides, regular, DEFAULT_TEST_RIDE_PRICE.times(3)); - calculateFare(rides, FareType.senior, DEFAULT_TEST_RIDE_PRICE.times(3)); + calculateFare(rides, FareType.senior, DEFAULT_TEST_RIDE_PRICE.plus(usDollars(2.25f))); calculateFare(rides, FareType.youth, Money.ZERO_USD); calculateFare( rides, FareType.electronicSpecial, DEFAULT_TEST_RIDE_PRICE.plus(usDollars(1.25f)) ); - calculateFare( - rides, - FareType.electronicRegular, - DEFAULT_TEST_RIDE_PRICE.plus(DEFAULT_TEST_RIDE_PRICE) - ); + calculateFare(rides, FareType.electronicRegular, DEFAULT_TEST_RIDE_PRICE.times(2)); calculateFare(rides, FareType.electronicSenior, DEFAULT_TEST_RIDE_PRICE.plus(usDollars(1.25f))); calculateFare(rides, FareType.electronicYouth, Money.ZERO_USD); } @@ -200,7 +196,7 @@ void calculateFareThatExceedsTwoHourFreeTransferWindow() { ); calculateFare(rides, regular, DEFAULT_TEST_RIDE_PRICE.times(2)); - calculateFare(rides, FareType.senior, DEFAULT_TEST_RIDE_PRICE.times(2)); + calculateFare(rides, FareType.senior, TWO_DOLLARS); calculateFare(rides, FareType.youth, ZERO_USD); calculateFare(rides, FareType.electronicSpecial, TWO_DOLLARS); calculateFare(rides, FareType.electronicRegular, DEFAULT_TEST_RIDE_PRICE.times(2)); @@ -227,7 +223,7 @@ void calculateFareThatIncludesNoFreeTransfers() { calculateFare( rides, FareType.senior, - DEFAULT_TEST_RIDE_PRICE.times(2).plus(usDollars(.50f)).plus(HALF_FERRY_FARE) + ONE_DOLLAR.plus(ONE_DOLLAR).plus(HALF_FERRY_FARE).plus(usDollars(0.5f)) ); calculateFare(rides, FareType.youth, Money.ZERO_USD); // We don't get any fares for the skagit transit leg below here because they don't accept ORCA (electronic) @@ -263,7 +259,7 @@ void calculateFareThatExceedsTwoHourFreeTransferWindowTwice() { getLeg(KITSAP_TRANSIT_AGENCY_ID, 270) ); calculateFare(rides, regular, DEFAULT_TEST_RIDE_PRICE.times(3)); - calculateFare(rides, FareType.senior, DEFAULT_TEST_RIDE_PRICE.times(3)); + calculateFare(rides, FareType.senior, usDollars(3)); calculateFare(rides, FareType.youth, Money.ZERO_USD); calculateFare(rides, FareType.electronicSpecial, usDollars(3)); calculateFare(rides, FareType.electronicRegular, DEFAULT_TEST_RIDE_PRICE.times(3)); @@ -286,7 +282,7 @@ void calculateFareThatStartsWithACashFare() { getLeg(KITSAP_TRANSIT_AGENCY_ID, 149) ); calculateFare(rides, regular, DEFAULT_TEST_RIDE_PRICE.times(2)); - calculateFare(rides, FareType.senior, DEFAULT_TEST_RIDE_PRICE.times(2)); + calculateFare(rides, FareType.senior, DEFAULT_TEST_RIDE_PRICE.plus(ONE_DOLLAR)); calculateFare(rides, FareType.youth, Money.ZERO_USD); calculateFare(rides, FareType.electronicSpecial, DEFAULT_TEST_RIDE_PRICE.plus(ONE_DOLLAR)); calculateFare( @@ -305,7 +301,7 @@ void calculateFareThatStartsWithACashFare() { void calculateFareForKitsapFastFerry() { List rides = List.of(getLeg(KITSAP_TRANSIT_AGENCY_ID, 0, 4, "404", "east")); calculateFare(rides, regular, TWO_DOLLARS); - calculateFare(rides, FareType.senior, TWO_DOLLARS); + calculateFare(rides, FareType.senior, ONE_DOLLAR); calculateFare(rides, FareType.youth, Money.ZERO_USD); calculateFare(rides, FareType.electronicSpecial, ONE_DOLLAR); calculateFare(rides, FareType.electronicRegular, TWO_DOLLARS); @@ -314,7 +310,7 @@ void calculateFareForKitsapFastFerry() { rides = List.of(getLeg(KITSAP_TRANSIT_AGENCY_ID, 0, 4, "404", "west")); calculateFare(rides, regular, usDollars(10f)); - calculateFare(rides, FareType.senior, usDollars(10f)); + calculateFare(rides, FareType.senior, usDollars(5f)); calculateFare(rides, FareType.youth, Money.ZERO_USD); calculateFare(rides, FareType.electronicSpecial, usDollars(5f)); calculateFare(rides, FareType.electronicRegular, usDollars(10f)); @@ -349,7 +345,7 @@ void calculateFareForSTRail() { getLeg(SOUND_TRANSIT_AGENCY_ID, "S Line", 100, "King Street Station", "Auburn Station") ); calculateFare(rides, regular, DEFAULT_TEST_RIDE_PRICE.times(2)); - calculateFare(rides, FareType.senior, DEFAULT_TEST_RIDE_PRICE.times(2)); + calculateFare(rides, FareType.senior, TWO_DOLLARS); calculateFare(rides, FareType.youth, Money.ZERO_USD); calculateFare(rides, FareType.electronicSpecial, ORCA_SPECIAL_FARE); calculateFare(rides, FareType.electronicRegular, DEFAULT_TEST_RIDE_PRICE); @@ -364,7 +360,7 @@ void calculateFareForSTRail() { void calculateWaterTaxiFares() { List rides = List.of(getLeg(KC_METRO_AGENCY_ID, "973", 1)); calculateFare(rides, regular, WEST_SEATTLE_WATER_TAXI_CASH_FARE); - calculateFare(rides, FareType.senior, WEST_SEATTLE_WATER_TAXI_CASH_FARE); + calculateFare(rides, FareType.senior, usDollars(2.50f)); calculateFare(rides, FareType.youth, Money.ZERO_USD); calculateFare(rides, FareType.electronicSpecial, usDollars(3.75f)); calculateFare(rides, FareType.electronicRegular, usDollars(5f)); @@ -374,7 +370,7 @@ void calculateWaterTaxiFares() { rides = List.of(getLeg(KC_METRO_AGENCY_ID, "975", 1)); calculateFare(rides, regular, VASHON_WATER_TAXI_CASH_FARE); - calculateFare(rides, FareType.senior, VASHON_WATER_TAXI_CASH_FARE); + calculateFare(rides, FareType.senior, usDollars(3f)); calculateFare(rides, FareType.youth, Money.ZERO_USD); calculateFare(rides, FareType.electronicSpecial, usDollars(4.50f)); calculateFare(rides, FareType.electronicRegular, usDollars(5.75f)); @@ -395,8 +391,7 @@ void calculateSoundTransitBusFares() { getLeg(KC_METRO_AGENCY_ID, "550", 240) ); calculateFare(rides, regular, usDollars(9.75f)); - // Sound Transit does not accept senior fares in cash - calculateFare(rides, FareType.senior, usDollars(9.75f)); + calculateFare(rides, FareType.senior, usDollars(3.00f)); calculateFare(rides, FareType.youth, Money.ZERO_USD); calculateFare(rides, FareType.electronicSpecial, usDollars(3f)); calculateFare(rides, FareType.electronicRegular, usDollars(9.75f)); @@ -410,7 +405,7 @@ void calculateSoundTransitBusFares() { getLeg(PIERCE_COUNTY_TRANSIT_AGENCY_ID, "501", 60) ); calculateFare(rides, regular, DEFAULT_TEST_RIDE_PRICE.times(2)); - calculateFare(rides, FareType.senior, DEFAULT_TEST_RIDE_PRICE.times(2)); + calculateFare(rides, FareType.senior, TWO_DOLLARS); calculateFare(rides, FareType.youth, Money.ZERO_USD); calculateFare(rides, FareType.electronicSpecial, usDollars(1f)); calculateFare(rides, FareType.electronicRegular, DEFAULT_TEST_RIDE_PRICE); @@ -430,7 +425,7 @@ void calculateCashFreeTransferKCMetroAndKitsap() { getLeg(KITSAP_TRANSIT_AGENCY_ID, 132) ); calculateFare(rides, regular, DEFAULT_TEST_RIDE_PRICE.times(4)); - calculateFare(rides, FareType.senior, DEFAULT_TEST_RIDE_PRICE.times(4)); + calculateFare(rides, FareType.senior, usDollars(4.25f)); calculateFare(rides, FareType.youth, Money.ZERO_USD); calculateFare(rides, FareType.electronicSpecial, usDollars(1.25f)); calculateFare(rides, FareType.electronicRegular, DEFAULT_TEST_RIDE_PRICE.times(2)); @@ -442,12 +437,12 @@ void calculateCashFreeTransferKCMetroAndKitsap() { void calculateTransferExtension() { List rides = List.of( getLeg(KITSAP_TRANSIT_AGENCY_ID, 0, 4, "Kitsap Fast Ferry", "east"), // 2.00 - getLeg(KC_METRO_AGENCY_ID, 100), // Default ride price, extends transfer + getLeg(KC_METRO_AGENCY_ID, 100), // Default ride price, extends transfer for regular fare getLeg(KITSAP_TRANSIT_AGENCY_ID, 150, 4, "Kitsap Fast Ferry", "west") // 10.00 ); var regularFare = usDollars(2.00f).plus(DEFAULT_TEST_RIDE_PRICE).plus(usDollars(10f)); calculateFare(rides, regular, regularFare); - calculateFare(rides, FareType.senior, regularFare); + calculateFare(rides, FareType.senior, usDollars(7f)); calculateFare(rides, FareType.youth, Money.ZERO_USD); calculateFare(rides, FareType.electronicSpecial, usDollars(6f)); calculateFare(rides, FareType.electronicRegular, usDollars(10f)); // transfer extended on second leg diff --git a/src/ext/java/org/opentripplanner/ext/fares/impl/OrcaFareService.java b/src/ext/java/org/opentripplanner/ext/fares/impl/OrcaFareService.java index 8f961b0b01b..c3c1873ad6d 100644 --- a/src/ext/java/org/opentripplanner/ext/fares/impl/OrcaFareService.java +++ b/src/ext/java/org/opentripplanner/ext/fares/impl/OrcaFareService.java @@ -352,10 +352,10 @@ private Optional getSeniorFare( var regularFare = getRegularFare(fareType, rideType, defaultFare, leg); // Many agencies only provide senior discount if using ORCA return switch (rideType) { - case COMM_TRANS_LOCAL_SWIFT -> usesOrca(fareType) ? optionalUSD(1.25f) : regularFare; - case COMM_TRANS_COMMUTER_EXPRESS -> usesOrca(fareType) ? optionalUSD(2f) : regularFare; + case COMM_TRANS_LOCAL_SWIFT -> optionalUSD(1.25f); + case COMM_TRANS_COMMUTER_EXPRESS -> optionalUSD(2f); case SKAGIT_TRANSIT, WHATCOM_LOCAL, SKAGIT_LOCAL -> optionalUSD(0.5f); - case EVERETT_TRANSIT -> usesOrca(fareType) ? optionalUSD(0.5f) : regularFare; + case EVERETT_TRANSIT -> optionalUSD(0.5f); case KITSAP_TRANSIT_FAST_FERRY_EASTBOUND, SOUND_TRANSIT, SOUND_TRANSIT_BUS, @@ -365,14 +365,10 @@ private Optional getSeniorFare( KC_METRO, PIERCE_COUNTY_TRANSIT, SEATTLE_STREET_CAR, - KITSAP_TRANSIT -> fareType.equals(FareType.electronicSenior) - ? optionalUSD(1f) - : regularFare; - case KC_WATER_TAXI_VASHON_ISLAND -> usesOrca(fareType) ? optionalUSD(3f) : regularFare; - case KC_WATER_TAXI_WEST_SEATTLE -> usesOrca(fareType) ? optionalUSD(2.5f) : regularFare; - case KITSAP_TRANSIT_FAST_FERRY_WESTBOUND -> fareType.equals(FareType.electronicSenior) - ? optionalUSD(5f) - : regularFare; + KITSAP_TRANSIT -> optionalUSD(1f); + case KC_WATER_TAXI_VASHON_ISLAND -> optionalUSD(3f); + case KC_WATER_TAXI_WEST_SEATTLE -> optionalUSD(2.5f); + case KITSAP_TRANSIT_FAST_FERRY_WESTBOUND -> optionalUSD(5f); // Discount specific to Skagit transit and not Orca. case WASHINGTON_STATE_FERRIES -> Optional.of( getWashingtonStateFerriesFare(route.getLongName(), fareType, defaultFare) diff --git a/src/ext/java/org/opentripplanner/ext/siri/updater/AsyncEstimatedTimetableProcessor.java b/src/ext/java/org/opentripplanner/ext/siri/updater/AsyncEstimatedTimetableProcessor.java new file mode 100644 index 00000000000..81643c476ab --- /dev/null +++ b/src/ext/java/org/opentripplanner/ext/siri/updater/AsyncEstimatedTimetableProcessor.java @@ -0,0 +1,45 @@ +package org.opentripplanner.ext.siri.updater; + +import java.util.concurrent.Future; +import java.util.function.Consumer; +import org.opentripplanner.updater.spi.UpdateResult; +import org.opentripplanner.updater.spi.WriteToGraphCallback; +import org.opentripplanner.updater.trip.UpdateIncrementality; +import uk.org.siri.siri20.ServiceDelivery; + +/** + * Apply asynchronously estimated timetable updates in the graph-writer thread and forward the + * result to an update result consumer. + */ +public class AsyncEstimatedTimetableProcessor { + + private final EstimatedTimetableHandler estimatedTimetableHandler; + private final WriteToGraphCallback saveResultOnGraph; + private final Consumer updateResultConsumer; + + public AsyncEstimatedTimetableProcessor( + EstimatedTimetableHandler estimatedTimetableHandler, + WriteToGraphCallback saveResultOnGraph, + Consumer updateResultConsumer + ) { + this.estimatedTimetableHandler = estimatedTimetableHandler; + this.saveResultOnGraph = saveResultOnGraph; + this.updateResultConsumer = updateResultConsumer; + } + + /** + * Apply the estimated timetables to the transit model. + * This method is non-blocking and applies the changes asynchronously. + * @return a future indicating when the changes are applied. + */ + public Future processSiriData(ServiceDelivery serviceDelivery) { + return saveResultOnGraph.execute((graph, transitModel) -> + updateResultConsumer.accept( + estimatedTimetableHandler.applyUpdate( + serviceDelivery.getEstimatedTimetableDeliveries(), + UpdateIncrementality.DIFFERENTIAL + ) + ) + ); + } +} diff --git a/src/ext/java/org/opentripplanner/ext/siri/updater/AsyncEstimatedTimetableSource.java b/src/ext/java/org/opentripplanner/ext/siri/updater/AsyncEstimatedTimetableSource.java new file mode 100644 index 00000000000..9a70c9d5615 --- /dev/null +++ b/src/ext/java/org/opentripplanner/ext/siri/updater/AsyncEstimatedTimetableSource.java @@ -0,0 +1,28 @@ +package org.opentripplanner.ext.siri.updater; + +import java.util.concurrent.Future; +import java.util.function.Function; +import uk.org.siri.siri20.ServiceDelivery; + +/** + * A source of estimated timetables produced by an asynchronous (push) SIRI-ET feed. + */ +public interface AsyncEstimatedTimetableSource { + /** + * Start reading from the SIRI-ET feed and forward the estimated timetables to a consumer for + * further processing. + *
Starting the source includes all the necessary steps to set up the network + * communication with the SIRI-ET feed as well as the (optional) processing of the message + * backlog, that is the recent history of SIRI-ET messages produced by this feed and made + * available by a message cache. + * + * @param serviceDeliveryConsumer apply asynchronously the updates to the transit model. Return a + * future indicating when the updates are applied. + */ + void start(Function> serviceDeliveryConsumer); + + /** + * Return true if the message backlog is processed and the source is ready to listen to the feed. + */ + boolean isPrimed(); +} diff --git a/src/ext/java/org/opentripplanner/ext/siri/updater/EstimatedTimetableHandler.java b/src/ext/java/org/opentripplanner/ext/siri/updater/EstimatedTimetableHandler.java new file mode 100644 index 00000000000..961d6a10282 --- /dev/null +++ b/src/ext/java/org/opentripplanner/ext/siri/updater/EstimatedTimetableHandler.java @@ -0,0 +1,69 @@ +package org.opentripplanner.ext.siri.updater; + +import java.util.List; +import org.opentripplanner.ext.siri.EntityResolver; +import org.opentripplanner.ext.siri.SiriFuzzyTripMatcher; +import org.opentripplanner.ext.siri.SiriTimetableSnapshotSource; +import org.opentripplanner.transit.service.TransitService; +import org.opentripplanner.updater.spi.UpdateResult; +import org.opentripplanner.updater.trip.UpdateIncrementality; +import uk.org.siri.siri20.EstimatedTimetableDeliveryStructure; + +/** + * A consumer of estimated timetables that applies the real-time updates to the transit model. + */ +public class EstimatedTimetableHandler { + + private final SiriTimetableSnapshotSource snapshotSource; + private final SiriFuzzyTripMatcher fuzzyTripMatcher; + private final EntityResolver entityResolver; + /** + * The ID for the static feed to which these real time updates are applied + */ + private final String feedId; + + public EstimatedTimetableHandler( + SiriTimetableSnapshotSource snapshotSource, + boolean fuzzyMatching, + TransitService transitService, + String feedId + ) { + this( + snapshotSource, + fuzzyMatching ? SiriFuzzyTripMatcher.of(transitService) : null, + transitService, + feedId + ); + } + + /** + * Constructor for tests only. + */ + public EstimatedTimetableHandler( + SiriTimetableSnapshotSource snapshotSource, + SiriFuzzyTripMatcher siriFuzzyTripMatcher, + TransitService transitService, + String feedId + ) { + this.snapshotSource = snapshotSource; + this.fuzzyTripMatcher = siriFuzzyTripMatcher; + this.entityResolver = new EntityResolver(transitService, feedId); + this.feedId = feedId; + } + + /** + * Apply the update to the transit model. + */ + public UpdateResult applyUpdate( + List estimatedTimetableDeliveries, + UpdateIncrementality updateMode + ) { + return snapshotSource.applyEstimatedTimetable( + fuzzyTripMatcher, + entityResolver, + feedId, + updateMode, + estimatedTimetableDeliveries + ); + } +} diff --git a/src/ext/java/org/opentripplanner/ext/siri/updater/SiriETUpdater.java b/src/ext/java/org/opentripplanner/ext/siri/updater/SiriETUpdater.java index 1f95468565f..c811d3ee5d8 100644 --- a/src/ext/java/org/opentripplanner/ext/siri/updater/SiriETUpdater.java +++ b/src/ext/java/org/opentripplanner/ext/siri/updater/SiriETUpdater.java @@ -2,12 +2,9 @@ import java.util.List; import java.util.function.Consumer; -import org.opentripplanner.ext.siri.EntityResolver; -import org.opentripplanner.ext.siri.SiriFuzzyTripMatcher; import org.opentripplanner.ext.siri.SiriTimetableSnapshotSource; import org.opentripplanner.transit.service.DefaultTransitService; import org.opentripplanner.transit.service.TransitModel; -import org.opentripplanner.transit.service.TransitService; import org.opentripplanner.updater.spi.PollingGraphUpdater; import org.opentripplanner.updater.spi.ResultLogger; import org.opentripplanner.updater.spi.UpdateResult; @@ -38,21 +35,14 @@ public class SiriETUpdater extends PollingGraphUpdater { */ protected WriteToGraphCallback saveResultOnGraph; - /** - * The place where we'll record the incoming real-time timetables to make them available to the - * router in a thread safe way. - */ - private final SiriTimetableSnapshotSource snapshotSource; - - private final SiriFuzzyTripMatcher fuzzyTripMatcher; - private final EntityResolver entityResolver; + private final EstimatedTimetableHandler estimatedTimetableHandler; private final Consumer recordMetrics; public SiriETUpdater( SiriETUpdaterParameters config, TransitModel transitModel, - SiriTimetableSnapshotSource timetableSnapshot + SiriTimetableSnapshotSource timetableSnapshotSource ) { super(config); // Create update streamer from preferences @@ -60,19 +50,22 @@ public SiriETUpdater( this.updateSource = new SiriETHttpTripUpdateSource(config.sourceParameters()); - this.snapshotSource = timetableSnapshot; - this.blockReadinessUntilInitialized = config.blockReadinessUntilInitialized(); - TransitService transitService = new DefaultTransitService(transitModel); - this.entityResolver = new EntityResolver(transitService, feedId); - this.fuzzyTripMatcher = - config.fuzzyTripMatching() ? SiriFuzzyTripMatcher.of(transitService) : null; LOG.info( "Creating stop time updater (SIRI ET) running every {} seconds : {}", pollingPeriod(), updateSource ); + + estimatedTimetableHandler = + new EstimatedTimetableHandler( + timetableSnapshotSource, + config.fuzzyTripMatching(), + new DefaultTransitService(transitModel), + feedId + ); + recordMetrics = TripUpdateMetrics.streaming(config); } @@ -100,13 +93,7 @@ public void runPolling() { List etds = serviceDelivery.getEstimatedTimetableDeliveries(); if (etds != null) { saveResultOnGraph.execute((graph, transitModel) -> { - var result = snapshotSource.applyEstimatedTimetable( - fuzzyTripMatcher, - entityResolver, - feedId, - incrementality, - etds - ); + var result = estimatedTimetableHandler.applyUpdate(etds, incrementality); ResultLogger.logUpdateResult(feedId, "siri-et", result); recordMetrics.accept(result); if (markPrimed) { diff --git a/src/ext/java/org/opentripplanner/ext/siri/updater/SiriETGooglePubsubUpdater.java b/src/ext/java/org/opentripplanner/ext/siri/updater/google/GooglePubsubEstimatedTimetableSource.java similarity index 64% rename from src/ext/java/org/opentripplanner/ext/siri/updater/SiriETGooglePubsubUpdater.java rename to src/ext/java/org/opentripplanner/ext/siri/updater/google/GooglePubsubEstimatedTimetableSource.java index 7ca23ad4fc7..88b04feb9c5 100644 --- a/src/ext/java/org/opentripplanner/ext/siri/updater/SiriETGooglePubsubUpdater.java +++ b/src/ext/java/org/opentripplanner/ext/siri/updater/google/GooglePubsubEstimatedTimetableSource.java @@ -1,4 +1,4 @@ -package org.opentripplanner.ext.siri.updater; +package org.opentripplanner.ext.siri.updater.google; import com.google.api.gax.rpc.NotFoundException; import com.google.cloud.pubsub.v1.AckReplyConsumer; @@ -17,42 +17,36 @@ import java.net.URI; import java.time.Duration; import java.time.Instant; -import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; +import java.util.function.Function; import org.entur.protobuf.mapper.SiriMapper; -import org.opentripplanner.ext.siri.EntityResolver; -import org.opentripplanner.ext.siri.SiriFuzzyTripMatcher; -import org.opentripplanner.ext.siri.SiriTimetableSnapshotSource; +import org.opentripplanner.ext.siri.updater.AsyncEstimatedTimetableSource; import org.opentripplanner.framework.application.ApplicationShutdownSupport; import org.opentripplanner.framework.io.OtpHttpClientFactory; import org.opentripplanner.framework.retry.OtpRetry; import org.opentripplanner.framework.retry.OtpRetryBuilder; import org.opentripplanner.framework.text.FileSizeToTextConverter; import org.opentripplanner.framework.time.DurationUtils; -import org.opentripplanner.transit.service.DefaultTransitService; -import org.opentripplanner.transit.service.TransitModel; -import org.opentripplanner.transit.service.TransitService; -import org.opentripplanner.updater.spi.GraphUpdater; -import org.opentripplanner.updater.spi.UpdateResult; -import org.opentripplanner.updater.spi.WriteToGraphCallback; -import org.opentripplanner.updater.trip.UpdateIncrementality; -import org.opentripplanner.updater.trip.metrics.TripUpdateMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import uk.org.siri.siri20.EstimatedTimetableDeliveryStructure; +import uk.org.siri.siri20.ServiceDelivery; import uk.org.siri.siri20.Siri; import uk.org.siri.www.siri.SiriType; /** - * This class starts a Google PubSub subscription + * A source of estimated timetables that reads SIRI-ET messages from a Google PubSub subscription. + *

+ * This class starts a Google PubSub subscription *

* NOTE: - Path to Google credentials (.json-file) MUST exist in environment-variable * "GOOGLE_APPLICATION_CREDENTIALS" as described here: - * ServiceAccount need access to + * ServiceAccount need access + * to * create subscription ("editor") *

*

@@ -69,9 +63,11 @@ * "dataInitializationUrl": "http://server/realtime/protobuf/et" // Optional URL used to initialize OTP with all existing data * */ -public class SiriETGooglePubsubUpdater implements GraphUpdater { +public class GooglePubsubEstimatedTimetableSource implements AsyncEstimatedTimetableSource { - private static final Logger LOG = LoggerFactory.getLogger(SiriETGooglePubsubUpdater.class); + private static final Logger LOG = LoggerFactory.getLogger( + GooglePubsubEstimatedTimetableSource.class + ); private static final AtomicLong MESSAGE_COUNTER = new AtomicLong(0); private static final AtomicLong UPDATE_COUNTER = new AtomicLong(0); @@ -83,80 +79,56 @@ public class SiriETGooglePubsubUpdater implements GraphUpdater { private static final int RETRY_BACKOFF = 2; /** - * The URL used to fetch all initial updates + * The URL used to fetch all initial updates. + * The URL responds to HTTP GET and returns all initial data in protobuf-format. It will be + * called once to initialize real-time-data. + * All subsequent updates will be received from Google Cloud Pubsub. */ private final URI dataInitializationUrl; + /** - * The ID for the static feed to which these TripUpdates are applied - */ - private final String feedId; - /** - * The number of seconds to wait before reconnecting after a failed connection. + * The time to wait before reconnecting after a failed connection. */ - private final java.time.Duration reconnectPeriod; + private final Duration reconnectPeriod; /** * For larger deployments it sometimes takes more than the default 30 seconds to fetch data, if so * this parameter can be increased. */ - private final java.time.Duration initialGetDataTimeout; + private final Duration initialGetDataTimeout; private final String subscriptionName; private final ProjectTopicName topic; private final Subscriber subscriber; private final PushConfig pushConfig; - private final String configRef; - private final SiriTimetableSnapshotSource snapshotSource; - private final SiriFuzzyTripMatcher fuzzyTripMatcher; private final Instant startTime = Instant.now(); - private final Consumer recordMetrics; - private final EntityResolver entityResolver; private final OtpRetry retry; - /** - * Parent update manager. Is used to execute graph writer runnables. - */ - private WriteToGraphCallback saveResultOnGraph; - + private Function> serviceDeliveryConsumer; private volatile boolean primed; - public SiriETGooglePubsubUpdater( - SiriETGooglePubsubUpdaterParameters config, - TransitModel transitModel, - SiriTimetableSnapshotSource timetableSnapshot + public GooglePubsubEstimatedTimetableSource( + String dataInitializationUrl, + Duration reconnectPeriod, + Duration initialGetDataTimeout, + String subscriptionProjectName, + String topicProjectName, + String topicName ) { - this.configRef = config.configRef(); - - // URL that responds to HTTP GET which returns all initial data in protobuf-format. Will be - // called once to initialize real-time-data. All updates will be received from Google Cloud - // Pubsub - this.dataInitializationUrl = URI.create(config.dataInitializationUrl()); - this.feedId = config.feedId(); - this.reconnectPeriod = config.reconnectPeriod(); - this.initialGetDataTimeout = config.initialGetDataTimeout(); - this.snapshotSource = timetableSnapshot; - - // set subscriber - String subscriptionId = buildSubscriptionId(); - String subscriptionProjectName = config.subscriptionProjectName(); - String topicProjectName = config.topicProjectName(); - - String topicName = config.topicName(); + // + this.dataInitializationUrl = URI.create(dataInitializationUrl); + this.reconnectPeriod = reconnectPeriod; + this.initialGetDataTimeout = initialGetDataTimeout; + String subscriptionId = buildSubscriptionId(); subscriptionName = ProjectSubscriptionName.of(subscriptionProjectName, subscriptionId).toString(); subscriber = Subscriber.newBuilder(subscriptionName, new EstimatedTimetableMessageReceiver()).build(); this.topic = ProjectTopicName.of(topicProjectName, topicName); this.pushConfig = PushConfig.getDefaultInstance(); - TransitService transitService = new DefaultTransitService(transitModel); - this.entityResolver = new EntityResolver(transitService, feedId); - this.fuzzyTripMatcher = - config.fuzzyTripMatching() ? SiriFuzzyTripMatcher.of(transitService) : null; - recordMetrics = TripUpdateMetrics.streaming(config); - addShutdownHook(); retry = new OtpRetryBuilder() .withName("SIRI-ET Google PubSub Updater setup") @@ -164,15 +136,20 @@ public SiriETGooglePubsubUpdater( .withInitialRetryInterval(RETRY_INITIAL_DELAY) .withBackoffMultiplier(RETRY_BACKOFF) .build(); - } - @Override - public void setup(WriteToGraphCallback writeToGraphCallback) { - this.saveResultOnGraph = writeToGraphCallback; + addShutdownHook(); } + /** + * Create a PubSub subscription, read the backlog of messages and start listening to the + * subscription. + * Enter an infinite loop waiting for messages. An interruption sent at server + * shutdown will cause the loop to stop. + */ @Override - public void run() { + public void start(Function> serviceDeliveryConsumer) { + this.serviceDeliveryConsumer = serviceDeliveryConsumer; + try { LOG.info("Creating subscription {}", subscriptionName); retry.execute(this::createSubscription); @@ -184,6 +161,7 @@ public void run() { while (true) { try { subscriber.startAsync().awaitRunning(); + primed = true; subscriber.awaitTerminated(); } catch (IllegalStateException e) { subscriber.stopAsync(); @@ -198,25 +176,7 @@ public void run() { @Override public boolean isPrimed() { - return this.primed; - } - - @Override - public String getConfigRef() { - return configRef; - } - - private void addShutdownHook() { - ApplicationShutdownSupport.addShutdownHook( - "siri-et-google-pubsub-shutdown", - () -> { - if (subscriber != null) { - LOG.info("Stopping SIRI-ET PubSub subscriber '{}'.", subscriptionName); - subscriber.stopAsync(); - } - deleteSubscription(); - } - ); + return primed; } /** @@ -284,10 +244,24 @@ private void deleteSubscription() { } } - private String getTimeSinceStartupString() { - return DurationUtils.durationToStr(Duration.between(startTime, Instant.now())); + /** + * Decode the protobuf-encoded message payload into an optional SIRI ServiceDelivery. + */ + private Optional serviceDelivery(ByteString data) { + SiriType siriType; + try { + siriType = SiriType.parseFrom(data); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + Siri siri = SiriMapper.mapToJaxb(siriType); + return Optional.ofNullable(siri.getServiceDelivery()); } + /** + * Fetch the backlog of messages and apply the changes to the transit model. + * Block until the backlog is applied. + */ private void initializeData() { if (dataInitializationUrl != null) { LOG.info("Fetching initial data from {}", dataInitializationUrl); @@ -299,8 +273,19 @@ private void initializeData() { (t2 - t1), FileSizeToTextConverter.fileSizeToString(value.size()) ); - processSiriData(value); - primed = true; + serviceDelivery(value) + .map(serviceDeliveryConsumer) + .ifPresent(future -> { + try { + future.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + }); + LOG.info( "Pubsub updater initialized after {} ms: [messages: {}, updates: {}, total size: {}, time since startup: {}]", (System.currentTimeMillis() - t2), @@ -312,6 +297,9 @@ private void initializeData() { } } + /** + * Fetch the backlog of messages over HTTP from the configured data initialization URL. + */ private ByteString fetchInitialData() { try (OtpHttpClientFactory otpHttpClientFactory = new OtpHttpClientFactory()) { var otpHttpClient = otpHttpClientFactory.create(LOG); @@ -324,82 +312,72 @@ private ByteString fetchInitialData() { } } - private void processSiriData(ByteString data) { - Siri siri; - try { - SIZE_COUNTER.addAndGet(data.size()); - final SiriType siriType = SiriType.parseFrom(data); - siri = SiriMapper.mapToJaxb(siriType); - } catch (InvalidProtocolBufferException e) { - throw new RuntimeException(e); - } - - if (siri.getServiceDelivery() != null) { - // Handle trip updates via graph writer runnable - List estimatedTimetableDeliveries = siri - .getServiceDelivery() - .getEstimatedTimetableDeliveries(); - - int numberOfUpdatedTrips = 0; - try { - numberOfUpdatedTrips = - estimatedTimetableDeliveries - .get(0) - .getEstimatedJourneyVersionFrames() - .get(0) - .getEstimatedVehicleJourneies() - .size(); - } catch (Exception e) { - //ignore - } - long numberOfUpdates = UPDATE_COUNTER.addAndGet(numberOfUpdatedTrips); - long numberOfMessages = MESSAGE_COUNTER.incrementAndGet(); - - if (numberOfMessages % 1000 == 0) { - LOG.info( - "Pubsub stats: [messages: {}, updates: {}, total size: {}, current delay {} ms, time since startup: {}]", - numberOfMessages, - numberOfUpdates, - FileSizeToTextConverter.fileSizeToString(SIZE_COUNTER.get()), - Duration - .between(siri.getServiceDelivery().getResponseTimestamp().toInstant(), Instant.now()) - .toMillis(), - getTimeSinceStartupString() - ); - } - - var f = saveResultOnGraph.execute((graph, transitModel) -> { - var results = snapshotSource.applyEstimatedTimetable( - fuzzyTripMatcher, - entityResolver, - feedId, - UpdateIncrementality.DIFFERENTIAL, - estimatedTimetableDeliveries - ); - - recordMetrics.accept(results); - }); - - if (!isPrimed()) { - try { - f.get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } catch (ExecutionException e) { - throw new RuntimeException(e); + /** + * Shut down the PubSub subscriber at server shutdown. + */ + private void addShutdownHook() { + ApplicationShutdownSupport.addShutdownHook( + "siri-et-google-pubsub-shutdown", + () -> { + if (subscriber != null) { + LOG.info("Stopping SIRI-ET PubSub subscriber '{}'.", subscriptionName); + subscriber.stopAsync(); } + deleteSubscription(); } - } + ); } + private String getTimeSinceStartupString() { + return DurationUtils.durationToStr(Duration.between(startTime, Instant.now())); + } + + /** + * Message receiver callback that consumes messages from the PubSub subscription. + */ class EstimatedTimetableMessageReceiver implements MessageReceiver { @Override public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { - processSiriData(message.getData()); + Optional serviceDelivery = serviceDelivery(message.getData()); + serviceDelivery.ifPresent(sd -> { + logPubsubMessage(sd); + serviceDeliveryConsumer.apply(sd); + }); + // Ack only after all work for the message is complete. consumer.ack(); } } + + private void logPubsubMessage(ServiceDelivery serviceDelivery) { + int numberOfUpdatedTrips = 0; + try { + numberOfUpdatedTrips = + serviceDelivery + .getEstimatedTimetableDeliveries() + .getFirst() + .getEstimatedJourneyVersionFrames() + .getFirst() + .getEstimatedVehicleJourneies() + .size(); + } catch (Exception e) { + //ignore + } + long numberOfUpdates = UPDATE_COUNTER.addAndGet(numberOfUpdatedTrips); + long numberOfMessages = MESSAGE_COUNTER.incrementAndGet(); + + if (numberOfMessages % 1000 == 0) { + LOG.info( + "Pubsub stats: [messages: {}, updates: {}, total size: {}, current delay {} ms, time since startup: {}]", + numberOfMessages, + numberOfUpdates, + FileSizeToTextConverter.fileSizeToString(SIZE_COUNTER.get()), + Duration + .between(serviceDelivery.getResponseTimestamp().toInstant(), Instant.now()) + .toMillis(), + getTimeSinceStartupString() + ); + } + } } diff --git a/src/ext/java/org/opentripplanner/ext/siri/updater/google/SiriETGooglePubsubUpdater.java b/src/ext/java/org/opentripplanner/ext/siri/updater/google/SiriETGooglePubsubUpdater.java new file mode 100644 index 00000000000..58c01815230 --- /dev/null +++ b/src/ext/java/org/opentripplanner/ext/siri/updater/google/SiriETGooglePubsubUpdater.java @@ -0,0 +1,80 @@ +package org.opentripplanner.ext.siri.updater.google; + +import java.util.function.Consumer; +import org.opentripplanner.ext.siri.SiriTimetableSnapshotSource; +import org.opentripplanner.ext.siri.updater.AsyncEstimatedTimetableProcessor; +import org.opentripplanner.ext.siri.updater.AsyncEstimatedTimetableSource; +import org.opentripplanner.ext.siri.updater.EstimatedTimetableHandler; +import org.opentripplanner.transit.service.DefaultTransitService; +import org.opentripplanner.transit.service.TransitModel; +import org.opentripplanner.updater.spi.GraphUpdater; +import org.opentripplanner.updater.spi.UpdateResult; +import org.opentripplanner.updater.spi.WriteToGraphCallback; +import org.opentripplanner.updater.trip.metrics.TripUpdateMetrics; + +/** + * Graph updater that processes a SIRI-ET feed based on a Google Pubsub subscription. This class + * configures a {@link GooglePubsubEstimatedTimetableSource} and an {@link EstimatedTimetableHandler} + * and delegates the update process to {@link AsyncEstimatedTimetableProcessor} + */ +public class SiriETGooglePubsubUpdater implements GraphUpdater { + + private final String configRef; + private final AsyncEstimatedTimetableSource asyncEstimatedTimetableSource; + private final EstimatedTimetableHandler estimatedTimetableHandler; + private final Consumer updateResultConsumer; + private WriteToGraphCallback saveResultOnGraph; + + public SiriETGooglePubsubUpdater( + SiriETGooglePubsubUpdaterParameters config, + TransitModel transitModel, + SiriTimetableSnapshotSource timetableSnapshotSource + ) { + configRef = config.configRef(); + + asyncEstimatedTimetableSource = + new GooglePubsubEstimatedTimetableSource( + config.dataInitializationUrl(), + config.reconnectPeriod(), + config.initialGetDataTimeout(), + config.subscriptionProjectName(), + config.topicProjectName(), + config.topicName() + ); + + estimatedTimetableHandler = + new EstimatedTimetableHandler( + timetableSnapshotSource, + config.fuzzyTripMatching(), + new DefaultTransitService(transitModel), + config.feedId() + ); + + updateResultConsumer = TripUpdateMetrics.streaming(config); + } + + @Override + public void setup(WriteToGraphCallback writeToGraphCallback) { + this.saveResultOnGraph = writeToGraphCallback; + } + + @Override + public void run() { + AsyncEstimatedTimetableProcessor asyncEstimatedTimetableProcessor = new AsyncEstimatedTimetableProcessor( + estimatedTimetableHandler, + saveResultOnGraph, + updateResultConsumer + ); + asyncEstimatedTimetableSource.start(asyncEstimatedTimetableProcessor::processSiriData); + } + + @Override + public boolean isPrimed() { + return asyncEstimatedTimetableSource.isPrimed(); + } + + @Override + public String getConfigRef() { + return configRef; + } +} diff --git a/src/ext/java/org/opentripplanner/ext/siri/updater/SiriETGooglePubsubUpdaterParameters.java b/src/ext/java/org/opentripplanner/ext/siri/updater/google/SiriETGooglePubsubUpdaterParameters.java similarity index 97% rename from src/ext/java/org/opentripplanner/ext/siri/updater/SiriETGooglePubsubUpdaterParameters.java rename to src/ext/java/org/opentripplanner/ext/siri/updater/google/SiriETGooglePubsubUpdaterParameters.java index acb75509913..24ff12f6bd4 100644 --- a/src/ext/java/org/opentripplanner/ext/siri/updater/SiriETGooglePubsubUpdaterParameters.java +++ b/src/ext/java/org/opentripplanner/ext/siri/updater/google/SiriETGooglePubsubUpdaterParameters.java @@ -1,4 +1,4 @@ -package org.opentripplanner.ext.siri.updater; +package org.opentripplanner.ext.siri.updater.google; import java.time.Duration; import java.util.Objects; diff --git a/src/main/java/org/opentripplanner/standalone/config/routerconfig/UpdatersConfig.java b/src/main/java/org/opentripplanner/standalone/config/routerconfig/UpdatersConfig.java index 08371660b8a..b60b389f788 100644 --- a/src/main/java/org/opentripplanner/standalone/config/routerconfig/UpdatersConfig.java +++ b/src/main/java/org/opentripplanner/standalone/config/routerconfig/UpdatersConfig.java @@ -21,11 +21,11 @@ import java.util.List; import java.util.function.BiFunction; import javax.annotation.Nullable; -import org.opentripplanner.ext.siri.updater.SiriETGooglePubsubUpdaterParameters; import org.opentripplanner.ext.siri.updater.SiriETUpdaterParameters; import org.opentripplanner.ext.siri.updater.SiriSXUpdaterParameters; import org.opentripplanner.ext.siri.updater.azure.SiriAzureETUpdaterParameters; import org.opentripplanner.ext.siri.updater.azure.SiriAzureSXUpdaterParameters; +import org.opentripplanner.ext.siri.updater.google.SiriETGooglePubsubUpdaterParameters; import org.opentripplanner.ext.vehiclerentalservicedirectory.VehicleRentalServiceDirectoryFetcher; import org.opentripplanner.ext.vehiclerentalservicedirectory.api.VehicleRentalServiceDirectoryFetcherParameters; import org.opentripplanner.standalone.config.framework.json.NodeAdapter; diff --git a/src/main/java/org/opentripplanner/standalone/config/routerconfig/updaters/SiriETGooglePubsubUpdaterConfig.java b/src/main/java/org/opentripplanner/standalone/config/routerconfig/updaters/SiriETGooglePubsubUpdaterConfig.java index 1d7c8249224..45221785109 100644 --- a/src/main/java/org/opentripplanner/standalone/config/routerconfig/updaters/SiriETGooglePubsubUpdaterConfig.java +++ b/src/main/java/org/opentripplanner/standalone/config/routerconfig/updaters/SiriETGooglePubsubUpdaterConfig.java @@ -1,10 +1,10 @@ package org.opentripplanner.standalone.config.routerconfig.updaters; -import static org.opentripplanner.ext.siri.updater.SiriETGooglePubsubUpdaterParameters.INITIAL_GET_DATA_TIMEOUT; -import static org.opentripplanner.ext.siri.updater.SiriETGooglePubsubUpdaterParameters.RECONNECT_PERIOD; +import static org.opentripplanner.ext.siri.updater.google.SiriETGooglePubsubUpdaterParameters.INITIAL_GET_DATA_TIMEOUT; +import static org.opentripplanner.ext.siri.updater.google.SiriETGooglePubsubUpdaterParameters.RECONNECT_PERIOD; import static org.opentripplanner.standalone.config.framework.json.OtpVersion.NA; -import org.opentripplanner.ext.siri.updater.SiriETGooglePubsubUpdaterParameters; +import org.opentripplanner.ext.siri.updater.google.SiriETGooglePubsubUpdaterParameters; import org.opentripplanner.standalone.config.framework.json.NodeAdapter; public class SiriETGooglePubsubUpdaterConfig { diff --git a/src/main/java/org/opentripplanner/updater/UpdatersParameters.java b/src/main/java/org/opentripplanner/updater/UpdatersParameters.java index a955e757100..5f27f9dbd45 100644 --- a/src/main/java/org/opentripplanner/updater/UpdatersParameters.java +++ b/src/main/java/org/opentripplanner/updater/UpdatersParameters.java @@ -1,11 +1,11 @@ package org.opentripplanner.updater; import java.util.List; -import org.opentripplanner.ext.siri.updater.SiriETGooglePubsubUpdaterParameters; import org.opentripplanner.ext.siri.updater.SiriETUpdaterParameters; import org.opentripplanner.ext.siri.updater.SiriSXUpdaterParameters; import org.opentripplanner.ext.siri.updater.azure.SiriAzureETUpdaterParameters; import org.opentripplanner.ext.siri.updater.azure.SiriAzureSXUpdaterParameters; +import org.opentripplanner.ext.siri.updater.google.SiriETGooglePubsubUpdaterParameters; import org.opentripplanner.ext.vehiclerentalservicedirectory.api.VehicleRentalServiceDirectoryFetcherParameters; import org.opentripplanner.updater.alert.GtfsRealtimeAlertsUpdaterParameters; import org.opentripplanner.updater.trip.MqttGtfsRealtimeUpdaterParameters; diff --git a/src/main/java/org/opentripplanner/updater/configure/UpdaterConfigurator.java b/src/main/java/org/opentripplanner/updater/configure/UpdaterConfigurator.java index 3b6f7f52279..1e9b730ee3a 100644 --- a/src/main/java/org/opentripplanner/updater/configure/UpdaterConfigurator.java +++ b/src/main/java/org/opentripplanner/updater/configure/UpdaterConfigurator.java @@ -3,11 +3,11 @@ import java.util.ArrayList; import java.util.List; import org.opentripplanner.ext.siri.SiriTimetableSnapshotSource; -import org.opentripplanner.ext.siri.updater.SiriETGooglePubsubUpdater; import org.opentripplanner.ext.siri.updater.SiriETUpdater; import org.opentripplanner.ext.siri.updater.SiriSXUpdater; import org.opentripplanner.ext.siri.updater.azure.SiriAzureETUpdater; import org.opentripplanner.ext.siri.updater.azure.SiriAzureSXUpdater; +import org.opentripplanner.ext.siri.updater.google.SiriETGooglePubsubUpdater; import org.opentripplanner.ext.vehiclerentalservicedirectory.VehicleRentalServiceDirectoryFetcher; import org.opentripplanner.ext.vehiclerentalservicedirectory.api.VehicleRentalServiceDirectoryFetcherParameters; import org.opentripplanner.framework.io.OtpHttpClientFactory; diff --git a/src/test/java/org/opentripplanner/updater/trip/RealtimeTestEnvironment.java b/src/test/java/org/opentripplanner/updater/trip/RealtimeTestEnvironment.java index c2fb31e2dfb..d557aa1319b 100644 --- a/src/test/java/org/opentripplanner/updater/trip/RealtimeTestEnvironment.java +++ b/src/test/java/org/opentripplanner/updater/trip/RealtimeTestEnvironment.java @@ -12,9 +12,9 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import org.opentripplanner.DateTimeHelper; -import org.opentripplanner.ext.siri.EntityResolver; import org.opentripplanner.ext.siri.SiriFuzzyTripMatcher; import org.opentripplanner.ext.siri.SiriTimetableSnapshotSource; +import org.opentripplanner.ext.siri.updater.EstimatedTimetableHandler; import org.opentripplanner.graph_builder.issue.api.DataImportIssueStore; import org.opentripplanner.model.StopTime; import org.opentripplanner.model.TimetableSnapshot; @@ -167,8 +167,13 @@ public String getFeedId() { return TransitModelForTest.FEED_ID; } - public EntityResolver getEntityResolver() { - return new EntityResolver(getTransitService(), getFeedId()); + private EstimatedTimetableHandler getEstimatedTimetableHandler(boolean fuzzyMatching) { + return new EstimatedTimetableHandler( + siriSource, + fuzzyMatching ? new SiriFuzzyTripMatcher(getTransitService()) : null, + getTransitService(), + getFeedId() + ); } public TripPattern getPatternForTrip(FeedScopedId tripId) { @@ -242,18 +247,11 @@ public String getScheduledTimetable(FeedScopedId tripId) { public UpdateResult applyEstimatedTimetableWithFuzzyMatcher( List updates ) { - SiriFuzzyTripMatcher siriFuzzyTripMatcher = new SiriFuzzyTripMatcher(getTransitService()); - return applyEstimatedTimetable(updates, siriFuzzyTripMatcher); + return applyEstimatedTimetable(updates, true); } public UpdateResult applyEstimatedTimetable(List updates) { - return siriSource.applyEstimatedTimetable( - null, - getEntityResolver(), - getFeedId(), - DIFFERENTIAL, - updates - ); + return applyEstimatedTimetable(updates, false); } // GTFS-RT updates @@ -287,16 +285,10 @@ public UpdateResult applyTripUpdates( private UpdateResult applyEstimatedTimetable( List updates, - SiriFuzzyTripMatcher siriFuzzyTripMatcher + boolean fuzzyMatching ) { Objects.requireNonNull(siriSource, "Test environment is configured for GTFS-RT only"); - return siriSource.applyEstimatedTimetable( - siriFuzzyTripMatcher, - getEntityResolver(), - getFeedId(), - DIFFERENTIAL, - updates - ); + return getEstimatedTimetableHandler(fuzzyMatching).applyUpdate(updates, DIFFERENTIAL); } private Trip createTrip(String id, Route route, List stops) {