Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify timetable snapshot handling for GTFS-RT and Siri #5853

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,9 @@
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import org.opentripplanner.framework.time.CountdownTimer;
import org.opentripplanner.model.Timetable;
import org.opentripplanner.model.TimetableSnapshot;
import org.opentripplanner.model.TimetableSnapshotProvider;
import org.opentripplanner.routing.algorithm.raptoradapter.transit.mappers.TransitLayerUpdater;
import org.opentripplanner.transit.model.framework.DataValidationException;
import org.opentripplanner.transit.model.framework.Result;
import org.opentripplanner.transit.model.network.TripPattern;
Expand All @@ -32,6 +28,7 @@
import org.opentripplanner.updater.spi.UpdateError;
import org.opentripplanner.updater.spi.UpdateResult;
import org.opentripplanner.updater.spi.UpdateSuccess;
import org.opentripplanner.updater.trip.AbstractTimetableSnapshotSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.org.siri.siri20.EstimatedTimetableDeliveryStructure;
Expand All @@ -42,20 +39,10 @@
* necessary to provide planning threads a consistent constant view of a graph with real-time data at
* a specific point in time.
*/
public class SiriTimetableSnapshotSource implements TimetableSnapshotProvider {
public class SiriTimetableSnapshotSource extends AbstractTimetableSnapshotSource {

private static final Logger LOG = LoggerFactory.getLogger(SiriTimetableSnapshotSource.class);

/**
* The working copy of the timetable snapshot. Should not be visible to routing threads. Should
* only be modified by a thread that holds a lock on {@link #bufferLock}. All public methods that
* might modify this buffer will correctly acquire the lock.
*/
private final TimetableSnapshot buffer = new TimetableSnapshot();
/**
* Lock to indicate that buffer is in use
*/
private final ReentrantLock bufferLock = new ReentrantLock(true);
/**
* Use a id generator to generate TripPattern ids for new TripPatterns created by RealTime
* updates.
Expand All @@ -69,66 +56,22 @@ public class SiriTimetableSnapshotSource implements TimetableSnapshotProvider {
private final TransitModel transitModel;

private final TransitService transitService;
private final TransitLayerUpdater transitLayerUpdater;

/**
* If a timetable snapshot is requested less than this number of milliseconds after the previous
* snapshot, just return the same one. Throttles the potentially resource-consuming task of
* duplicating a TripPattern -> Timetable map and indexing the new Timetables.
*/
protected CountdownTimer snapshotFrequencyThrottle;

/**
* The last committed snapshot that was handed off to a routing thread. This snapshot may be given
* to more than one routing thread if the maximum snapshot frequency is exceeded.
*/
private volatile TimetableSnapshot snapshot = null;

/** Should expired real-time data be purged from the graph. */
private final boolean purgeExpiredData;

protected LocalDate lastPurgeDate = null;

public SiriTimetableSnapshotSource(
TimetableSnapshotSourceParameters parameters,
TransitModel transitModel
) {
super(
transitModel.getTransitLayerUpdater(),
parameters,
() -> LocalDate.now(transitModel.getTimeZone())
);
this.transitModel = transitModel;
this.transitService = new DefaultTransitService(transitModel);
this.transitLayerUpdater = transitModel.getTransitLayerUpdater();
this.snapshotFrequencyThrottle = new CountdownTimer(parameters.maxSnapshotFrequency());
this.purgeExpiredData = parameters.purgeExpiredData();
this.tripPatternCache =
new SiriTripPatternCache(tripPatternIdGenerator, transitService::getPatternForTrip);

transitModel.initTimetableSnapshotProvider(this);

// Force commit so that snapshot initializes
commitTimetableSnapshot(true);
}

/**
* @return an up-to-date snapshot mapping TripPatterns to Timetables. This snapshot and the
* timetable objects it references are guaranteed to never change, so the requesting thread is
* provided a consistent view of all TripTimes. The routing thread need only release its reference
* to the snapshot to release resources.
*/
public TimetableSnapshot getTimetableSnapshot() {
TimetableSnapshot snapshotToReturn;

// Try to get a lock on the buffer
if (bufferLock.tryLock()) {
// Make a new snapshot if necessary
try {
commitTimetableSnapshot(false);
return snapshot;
} finally {
bufferLock.unlock();
}
}
// No lock could be obtained because there is either a snapshot commit busy or updates
// are applied at this moment, just return the current snapshot
return snapshot;
}

/**
Expand All @@ -152,12 +95,9 @@ public UpdateResult applyEstimatedTimetable(
return UpdateResult.empty();
}

// Acquire lock on buffer
bufferLock.lock();

List<Result<UpdateSuccess, UpdateError>> results = new ArrayList<>();

try {
withLock(() -> {
if (fullDataset) {
// Remove all updates from the buffer
buffer.clear(feedId);
Expand All @@ -175,19 +115,9 @@ public UpdateResult applyEstimatedTimetable(

LOG.debug("message contains {} trip updates", updates.size());

// Make a snapshot after each message in anticipation of incoming requests
// Purge data if necessary (and force new snapshot if anything was purged)
// Make sure that the public (locking) getTimetableSnapshot function is not called.
if (purgeExpiredData) {
final boolean modified = purgeExpiredData();
commitTimetableSnapshot(modified);
} else {
commitTimetableSnapshot(false);
}
} finally {
// Always release lock
bufferLock.unlock();
}
purgeAndCommit();
});

return UpdateResult.ofResults(results);
}

Expand Down Expand Up @@ -249,31 +179,13 @@ private boolean shouldAddNewTrip(
return entityResolver.resolveTrip(vehicleJourney) == null;
}

private void commitTimetableSnapshot(final boolean force) {
if (force || snapshotFrequencyThrottle.timeIsUp()) {
if (force || buffer.isDirty()) {
LOG.debug("Committing {}", buffer);
snapshot = buffer.commit(transitLayerUpdater, force);

// We only reset the timer when the snapshot is updated. This will cause the first
// update to be committed after a silent period. This should not have any effect in
// a busy updater. It is however useful when manually testing the updater.
snapshotFrequencyThrottle.restart();
} else {
LOG.debug("Buffer was unchanged, keeping old snapshot.");
}
} else {
LOG.debug("Snapshot frequency exceeded. Reusing snapshot {}", snapshot);
}
}

/**
* Get the latest timetable for TripPattern for a given service date.
* <p>
* Snapshot timetable is used as source if initialised, trip patterns scheduled timetable if not.
*/
private Timetable getCurrentTimetable(TripPattern tripPattern, LocalDate serviceDate) {
TimetableSnapshot timetableSnapshot = snapshot;
TimetableSnapshot timetableSnapshot = getTimetableSnapshot();
if (timetableSnapshot != null) {
return timetableSnapshot.resolve(tripPattern, serviceDate);
}
Expand Down Expand Up @@ -429,19 +341,4 @@ private boolean removePreviousRealtimeUpdate(final Trip trip, final LocalDate se

return success;
}

private boolean purgeExpiredData() {
final LocalDate today = LocalDate.now(transitModel.getTimeZone());
final LocalDate previously = today.minusDays(2); // Just to be safe...

if (lastPurgeDate != null && lastPurgeDate.compareTo(previously) > 0) {
return false;
}

LOG.debug("purging expired real-time data");

lastPurgeDate = previously;

return buffer.purgeExpiredData(previously);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,13 @@ public void setPatternsForStop(SetMultimap<StopLocation, TripPattern> patternsFo
this.patternsForStop = patternsForStop;
}

/**
* Does this snapshot contain any realtime data or is it completely empty?
*/
public boolean isEmpty() {
return dirtyTimetables.isEmpty() && timetables.isEmpty() && realtimeAddedTripPattern.isEmpty();
}

/**
* Clear timetable for all patterns matching the provided feed id.
*
Expand Down
Loading
Loading