Skip to content

Commit

Permalink
Add more Javadoc and questions to realtime code
Browse files Browse the repository at this point in the history
  • Loading branch information
abyrd committed Jan 21, 2024
1 parent 54ed5f1 commit b2b45a2
Show file tree
Hide file tree
Showing 12 changed files with 140 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,26 @@
import uk.org.siri.siri20.WorkflowStatusEnumeration;

/**
* This updater applies the equivalent of GTFS Alerts, but from SIRI Situation Exchange feeds. NOTE
* this cannot handle situations where there are multiple feeds with different IDs (for now it may
* only work in single-feed regions).
* This updater applies the equivalent of GTFS Alerts, but from SIRI Situation Exchange feeds.
* TODO REALTIME: The name should be clarified, as there is no such thing as "SIRI Alerts", and it
* is referencing the internal model concept of "Alerts" which are derived from GTFS terminology.
* NOTE this cannot handle situations where there are multiple feeds with different IDs (for now it
* may only work in single-feed regions).
*/
public class SiriAlertsUpdateHandler {

private static final Logger LOG = LoggerFactory.getLogger(SiriAlertsUpdateHandler.class);
private final String feedId;
private final Set<TransitAlert> alerts = new HashSet<>();
private final TransitAlertService transitAlertService;
/** How long before the posted start of an event it should be displayed to users */

/** How long before the posted start of an event it should be displayed to users. */
private final Duration earlyStart;

/**
* This takes the parts of the SIRI SX message saying which transit entities are affected and
* maps them to multiple OTP internal model entities.
*/
private final AffectsMapper affectsMapper;

public SiriAlertsUpdateHandler(
Expand Down Expand Up @@ -120,6 +128,12 @@ public void update(ServiceDelivery delivery) {
}
}

/**
* FIXME REALTIME This does not just "handle" an alert, it builds an internal model Alert from
* an incoming SIRI situation exchange element. It is a mapper or factory.
* It may return null if all of header, description, and detail text are empty or missing in the
* SIRI message. In all other cases it will return a valid TransitAlert instance.
*/
private TransitAlert handleAlert(PtSituationElement situation) {
TransitAlertBuilder alert = createAlertWithTexts(situation);

Expand Down Expand Up @@ -196,7 +210,9 @@ private long getEpochSecond(ZonedDateTime startTime) {
}

/*
* Creates alert from PtSituation with all textual content
* Creates alert from PtSituation with all textual content.
* The feed scoped ID of this alert will be the single feed ID associated with this update handler
* and the situation number provided in the feed.
*/
private TransitAlertBuilder createAlertWithTexts(PtSituationElement situation) {
return TransitAlert
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@

/**
* Maps a {@link AffectsScopeStructure} to a list of {@link EntitySelector}s
*
* Concretely: this takes the parts of the SIRI SX (Alerts) message describing which transit
* entities are concerned by the alert, and maps them to EntitySelectors, which can match multiple
* OTP internal model entities that should be associated with the message.
*/
public class AffectsMapper {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class SiriSXUpdater extends PollingGraphUpdater implements TransitAlertPr
private final String url;
private final String originalRequestorRef;
private final TransitAlertService transitAlertService;
// TODO What is this, why does it exist as a persistent instance?
private final SiriAlertsUpdateHandler updateHandler;
private WriteToGraphCallback saveResultOnGraph;
private ZonedDateTime lastTimestamp = ZonedDateTime.now().minusWeeks(1);
Expand Down Expand Up @@ -85,6 +86,7 @@ public SiriSXUpdater(SiriSXUpdaterParameters config, TransitModel transitModel)

@Override
public void setGraphUpdaterManager(WriteToGraphCallback saveResultOnGraph) {
// TODO REALTIME this callback should have a different name, it is currently too verb-like.
this.saveResultOnGraph = saveResultOnGraph;
}

Expand All @@ -101,6 +103,9 @@ protected void runPolling() throws InterruptedException {
retry.execute(this::updateSiri);
}

/**
* This part has been factored out to allow repeated retries in case the connection fails etc.
*/
private void updateSiri() {
boolean moreData = false;
do {
Expand All @@ -112,6 +117,27 @@ private void updateSiri() {
// primitive, because the object moreData persists across iterations.
final boolean markPrimed = !moreData;
if (serviceDelivery.getSituationExchangeDeliveries() != null) {
// FIXME REALTIME This is submitting a method on a long-lived instance as a runnable.
// These runnables were intended to be small, disposable self-contained update tasks.
// See org/opentripplanner/updater/trip/PollingTripUpdater.java:90
// Clarify why that is passing in so many other references. It should only contain
// what's needed to operate on the graph. This should be illustrated in documentation
// as a little box labeled "change trip ABC123 by making stop 53 late by 2 minutes."
// Also clarify how this works without even using the supplied graph or TransitModel:
// there are multiple TransitAlertServices and they are not versioned along with the\
// Graph, they are attached to updaters.
// This is submitting a runnable to an executor, but that runnable only writes back to
// objects owned by this updater itself with no versioning. Why is this happening?
// If this is an intentional choice to live-patch a single server-wide instance of an
// alerts service/index while it's already in use by routing, we should be clear about
// this and document why it differs from the graph-writer design. Currently the code
// seems to go through the a ritual of following the threadsafe copy-on-write pattern
// without actually doing so.
// It's understandable to defer the list-of-alerts processing to another thread than this
// fetching thread, but I don't think we want that happening on the graph writer thread.
// There seems to be a misunderstanding that the tasks are submitted to get them off the
// updater thread, but the real reason is to ensure consistent transactions in graph
// writing and reading.
saveResultOnGraph.execute((graph, transitModel) -> {
updateHandler.update(serviceDelivery);
if (markPrimed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ public class OtpRetry {
private final Duration initialRetryInterval;
private final int backoffMultiplier;
private final Runnable onRetry;

/**
* A predicate to determine whether a particular exception should end the retry cycle or not.
* If the predicate returns true, retries will continue. False, and the retry cycle is broken.
*/
private final Predicate<Exception> retryableException;

OtpRetry(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
import org.opentripplanner.transit.model.framework.FeedScopedId;
import org.opentripplanner.transit.model.timetable.Direction;

/**
* This encompasses many different kinds of entity keys, all of which are simple record types, all
* grouped together as the only allowed implementations of a sealed marker interface. These key
* types represent various combinations used to look up Alerts that might be associated with a
* particular stop, or a stop on a route, or all routes of a certain type etc.
*/
public sealed interface EntityKey {
record Agency(FeedScopedId agencyId) implements EntityKey {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@
import org.opentripplanner.transit.model.framework.FeedScopedId;
import org.opentripplanner.transit.model.timetable.Direction;

/**
* Describes which elements in the internal transit data model are affected by a realtime alert.
* Note that this is specific to alerts and doesn't seem to be used by anything else.
* This is probably because alerts are unique in their ability to attach themselves to many
* different routes, stops, etc. at once, while non-alert elements tend to be associated with very
* specific single other elements.
* @see EntityKey
*/
public sealed interface EntitySelector {
EntityKey key();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,23 @@
* This class is used to combine alerts from multiple {@link TransitAlertService}s. Each
* {@link TransitAlertProvider} has its own service, and all need to be queried in order to fetch
* all alerts.
*
* Concretely: every realtime updater receiving GTFS Alerts or SIRI Situation Exchange (SX)
* messages currently maintains its own private index of alerts seperate from all other updaters.
* To make the set of all alerts from all updaters available in a single operaion and associate it
* with the graph as a whole, the various indexes are merged in such a way as to have the same
* index as each individual index.
*/
public class DelegatingTransitAlertServiceImpl implements TransitAlertService {

private final ArrayList<TransitAlertService> transitAlertServices = new ArrayList<>();

/**
* Constructor which scans over all existing GraphUpdaters associated with a TransitModel
* instance and retains references to all their TransitAlertService instances.
* This implies that these instances are expected to remain in use indefinitely (not be replaced
* with new instances or taken out of service over time).
*/
public DelegatingTransitAlertServiceImpl(TransitModel transitModel) {
if (transitModel.getUpdaterManager() != null) {
transitModel
Expand All @@ -38,7 +50,9 @@ public DelegatingTransitAlertServiceImpl(TransitModel transitModel) {

@Override
public void setAlerts(Collection<TransitAlert> alerts) {
throw new UnsupportedOperationException("Not supported");
throw new UnsupportedOperationException(
"This delegating TransitAlertService is not intended to hold any TransitAlerts of its own."
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@
import org.opentripplanner.transit.service.TransitModel;

/**
* This is the primary implementation of TransitAlertService, which actually retains its own set
* of TransitAlerts and indexes them for fast lookup by which transit entity is affected.
* The only other implementation exists just to combine several instances of this primary
* implementation into one.
* TODO REALTIME investigate why each updater has its own service instead of taking turns
* sequentially writing to a single service. Original design was for all data and indexes to be
* associated with the Graph or transit model (i.e. the object graph of instances of the transit
* model) and for updaters to submit write tasks that would patch the current version in a
* sequential way, e.g. "add these 10 alerts", "remove these 5 alerts", etc.
*
* When an alert is added with more than one transit entity, e.g. a Stop and a Trip, both conditions
* must be met for the alert to be displayed. This is the case in both the Norwegian interpretation
* of SIRI, and the GTFS-RT alerts specification.
Expand All @@ -32,6 +42,11 @@ public TransitAlertServiceImpl(TransitModel transitModel) {

@Override
public void setAlerts(Collection<TransitAlert> alerts) {
// NOTE this is being patched live by updaters while in use (being read) by other threads
// performing trip planning. The single-action assignment helps a bit, but the map can be
// swapped out while the delegating service is in the middle of multiple calls that read from it.
// The consistent approach would be to duplicate the entire service, update it copy-on write,
// and swap in the entire service after the update.
Multimap<EntityKey, TransitAlert> newAlerts = HashMultimap.create();
for (TransitAlert alert : alerts) {
for (EntitySelector entity : alert.entities()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,24 @@
import org.opentripplanner.transit.model.framework.FeedScopedId;
import org.opentripplanner.transit.model.timetable.Direction;

/**
* The TransitAlertService stores a set of alerts (passenger-facing textual information associated
* with transit entities such as stops or routes) which are currently active and should be provided
* to end users when their itineraries include the relevant stop, route, etc.
*
* Its primary purpose is to index those alerts, which may be numerous, so they can be looked up
* rapidly and attached to the various pieces of an itinerary as it's being returned to the user.
*
* Most elements in an itinerary will have no alerts attached, so those cases need to return
* quickly. For example, no alerts on board stop A, no alerts on route 1 ridden, no alerts on alight
* stop B, no alerts on route 2 ridden, yes one alert found on alight stop C.
*
* The fact that alerts are relatively sparse (at the scale of the entire transportation system)
* is central to this implementation. Adding a list of alerts to every element in the system would
* mean storing large amounts of null or empty list references. Instead, alerts are looked up in
* maps allowing them to be attached to any object with minimal space overhead, but requiring some
* careful indexing to ensure their presence or absence on each object can be determined quickly.
*/
public interface TransitAlertService {
void setAlerts(Collection<TransitAlert> alerts);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,24 @@
import java.util.Optional;
import org.opentripplanner.framework.doc.DocumentedEnum;

/**
* This converts strings appearing in configuration files into enum values.
* The values appearing in config files are case-insensitive and can use either dashes
* or underscores indiscriminately.
* Dashes are replaced with underscores, and the string is converted to upper case.
* In practice, this serves to convert from kebab-case to SCREAMING_SNAKE_CASE (which is
* conventional for Java enum values), leaving the latter unchanged if it's used in the config file.
*/
public class EnumMapper {

@SuppressWarnings("unchecked")
public static <E extends Enum<E>> Optional<E> mapToEnum(String text, Class<E> type) {
return (Optional<E>) mapToEnum2(text, type);
}

/**
* Maps an individual value from a config file into its corresponding enum value.
*/
public static Optional<? extends Enum<?>> mapToEnum2(String text, Class<? extends Enum<?>> type) {
if (text == null) {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
import org.opentripplanner.routing.api.request.framework.TimePenalty;
import org.opentripplanner.transit.model.framework.FeedScopedId;

/**
* TODO clarify whether this is building a declarative representation of the parameter, or building
* a concrete key-value pair for a parameter in a config file being read at server startup, or both.
*/
public class ParameterBuilder {

private static final Object UNDEFINED = new Object();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

import org.opentripplanner.routing.services.TransitAlertService;

/**
* Interface for things that maintain their own individual index associating TransitAlerts with the
* transit entities they affect. In practice, these are always realtime updaters handling GTFS-RT
* Alerts or Siri SX messages. This interface appears to exist only to allow merging multiple such
* services together, which appears to be a workaround for not maintaining snapshots of a single
* instance-wide index.
*/
public interface TransitAlertProvider {
TransitAlertService getTransitAlertService();
}

0 comments on commit b2b45a2

Please sign in to comment.