Skip to content
This repository has been archived by the owner on Mar 3, 2025. It is now read-only.

Commit

Permalink
Refactor SIRI-ET updaters
Browse files Browse the repository at this point in the history
  • Loading branch information
vpaturet committed Jun 12, 2024
1 parent cbc6b20 commit a7f4e7b
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,24 @@
import java.util.concurrent.ExecutionException;
import uk.org.siri.siri20.ServiceDelivery;

public class SiriETAsyncUpdater {
/**
* Process a SIRI-ET feed by combining an asynchronous source of estimated timetables
* {@link AsyncEstimatedTimetableSource} with a consumer of estimated timetables
* {@link EstimatedTimetableHandler}
*/
public class AsyncEstimatedTimetableProcessor {

private final AsyncEstimatedTimetableSource siriMessageSource;
private final EstimatedTimetableUpdater estimatedTimetableUpdater;
private final EstimatedTimetableHandler estimatedTimetableHandler;

private volatile boolean primed;

public SiriETAsyncUpdater(
public AsyncEstimatedTimetableProcessor(
AsyncEstimatedTimetableSource siriMessageSource,
EstimatedTimetableUpdater estimatedTimetableUpdater
EstimatedTimetableHandler estimatedTimetableHandler
) {
this.siriMessageSource = siriMessageSource;
this.estimatedTimetableUpdater = estimatedTimetableUpdater;
this.estimatedTimetableHandler = estimatedTimetableHandler;
}

public void run() {
Expand All @@ -27,7 +32,7 @@ public boolean isPrimed() {
}

private void processSiriData(ServiceDelivery serviceDelivery) {
var f = estimatedTimetableUpdater.applyUpdate(
var f = estimatedTimetableHandler.applyUpdate(
serviceDelivery.getEstimatedTimetableDeliveries(),
false
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,20 @@
import java.util.function.Consumer;
import uk.org.siri.siri20.ServiceDelivery;

/**
* A source of estimated timetables produced by an asynchronous (push) SIRI-ET feed.
*/
public interface AsyncEstimatedTimetableSource {
/**
* Start reading from the SIRI-ET feed and forward the estimated timetables to a consumer for
* further processing.
* <br>Starting the source includes all the necessary steps to set up the network
* communication with the SIRI-ET feed as well as the (optional) processing of the message
* backlog, that is the recent history of SIRI-ET messages produced by this feed and made
* available by a message cache.
*
* @param serviceDeliveryConsumer a consumer of estimated timetable responsible for applying the
* update to the transit model.
*/
void start(Consumer<ServiceDelivery> serviceDeliveryConsumer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
import org.opentripplanner.updater.spi.WriteToGraphCallback;
import uk.org.siri.siri20.EstimatedTimetableDeliveryStructure;

public class EstimatedTimetableUpdater {
/**
* A consumer of estimated timetables that applies the real-time updates to the transit model.
*/
public class EstimatedTimetableHandler {

/**
* Parent update manager. Is used to execute graph writer runnables.
Expand All @@ -28,7 +31,7 @@ public class EstimatedTimetableUpdater {
*/
private final String feedId;

public EstimatedTimetableUpdater(
public EstimatedTimetableHandler(
WriteToGraphCallback saveResultOnGraph,
SiriTimetableSnapshotSource snapshotSource,
boolean fuzzyMatching,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,31 @@
import uk.org.siri.siri20.Siri;
import uk.org.siri.www.siri.SiriType;

/**
* A source of estimated timetables that reads SIRI-ET messages from a Google PubSub subscription.
* <p>
* This class starts a Google PubSub subscription
* <p>
* NOTE: - Path to Google credentials (.json-file) MUST exist in environment-variable
* "GOOGLE_APPLICATION_CREDENTIALS" as described here:
* <a href="https://cloud.google.com/docs/authentication/getting-started">ServiceAccount need access
* to
* create subscription ("editor")</a>
* <p>
* <p>
* <p>
* Startup-flow: 1. Create subscription to topic. Subscription will receive all updates after
* creation. 2. Fetch current data to initialize state. 3. Flag updater as initialized 3. Start
* receiving updates from Pubsub-subscription
*
*
* <pre>
* "type": "google-pubsub-siri-et-updater",
* "projectName":"project-1234", // Google Cloud project name
* "topicName": "protobuf.estimated_timetables", // Google Cloud Pubsub topic
* "dataInitializationUrl": "http://server/realtime/protobuf/et" // Optional URL used to initialize OTP with all existing data
* </pre>
*/
public class GooglePubsubEstimatedTimetableSource implements AsyncEstimatedTimetableSource {

private static final Logger LOG = LoggerFactory.getLogger(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,15 @@
import org.opentripplanner.updater.trip.metrics.TripUpdateMetrics;

/**
* This class starts a Google PubSub subscription
* <p>
* NOTE: - Path to Google credentials (.json-file) MUST exist in environment-variable
* "GOOGLE_APPLICATION_CREDENTIALS" as described here:
* <a href="https://cloud.google.com/docs/authentication/getting-started">ServiceAccount need access
* to
* create subscription ("editor")</a>
* <p>
* <p>
* <p>
* Startup-flow: 1. Create subscription to topic. Subscription will receive all updates after
* creation. 2. Fetch current data to initialize state. 3. Flag updater as initialized 3. Start
* receiving updates from Pubsub-subscription
*
*
* <pre>
* "type": "google-pubsub-siri-et-updater",
* "projectName":"project-1234", // Google Cloud project name
* "topicName": "protobuf.estimated_timetables", // Google Cloud Pubsub topic
* "dataInitializationUrl": "http://server/realtime/protobuf/et" // Optional URL used to initialize OTP with all existing data
* </pre>
* Graph updater that processes a SIRI-ET feed based on a Google Pubsub subscription. This class
* configures a {@link GooglePubsubEstimatedTimetableSource} and an {@link EstimatedTimetableHandler}
* and delegates the update process to {@link AsyncEstimatedTimetableProcessor}
*/
public class SiriETGooglePubsubUpdater implements GraphUpdater {

private final String configRef;

private final SiriETAsyncUpdater siriETAsyncUpdater;
private final AsyncEstimatedTimetableProcessor asyncEstimatedTimetableProcessor;
/**
* Parent update manager. Is used to execute graph writer runnables.
*/
Expand All @@ -58,7 +40,7 @@ public SiriETGooglePubsubUpdater(
config.topicName()
);

EstimatedTimetableUpdater estimatedTimetableUpdater = new EstimatedTimetableUpdater(
EstimatedTimetableHandler estimatedTimetableHandler = new EstimatedTimetableHandler(
this::writeToCallBack,
timetableSnapshot,
config.fuzzyTripMatching(),
Expand All @@ -67,8 +49,8 @@ public SiriETGooglePubsubUpdater(
config.feedId()
);

this.siriETAsyncUpdater =
new SiriETAsyncUpdater(asyncSiriMessageSource, estimatedTimetableUpdater);
this.asyncEstimatedTimetableProcessor =
new AsyncEstimatedTimetableProcessor(asyncSiriMessageSource, estimatedTimetableHandler);
}

private Future<?> writeToCallBack(GraphWriterRunnable graphWriterRunnable) {
Expand All @@ -81,13 +63,13 @@ public void setup(WriteToGraphCallback writeToGraphCallback) {
}

@Override
public void run() throws Exception {
siriETAsyncUpdater.run();
public void run() {
asyncEstimatedTimetableProcessor.run();
}

@Override
public boolean isPrimed() {
return siriETAsyncUpdater.isPrimed();
return asyncEstimatedTimetableProcessor.isPrimed();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class SiriETUpdater extends PollingGraphUpdater {
*/
protected WriteToGraphCallback saveResultOnGraph;

private final EstimatedTimetableUpdater estimatedTimetableUpdater;
private final EstimatedTimetableHandler estimatedTimetableHandler;

public SiriETUpdater(
SiriETUpdaterParameters config,
Expand All @@ -58,8 +58,8 @@ public SiriETUpdater(
updateSource
);

estimatedTimetableUpdater =
new EstimatedTimetableUpdater(
estimatedTimetableHandler =
new EstimatedTimetableHandler(
this::writeToCallBack,
timetableSnapshot,
config.fuzzyTripMatching(),
Expand Down Expand Up @@ -103,7 +103,7 @@ public void runPolling() {
final boolean markPrimed = !moreData;
List<EstimatedTimetableDeliveryStructure> etds = serviceDelivery.getEstimatedTimetableDeliveries();
if (etds != null) {
estimatedTimetableUpdater.applyUpdate(
estimatedTimetableHandler.applyUpdate(
etds,
fullDataset,
() -> {
Expand Down

0 comments on commit a7f4e7b

Please sign in to comment.