Skip to content

Commit

Permalink
Merge branch 'dev' into trip-sharing
Browse files Browse the repository at this point in the history
  • Loading branch information
binh-dam-ibigroup committed Dec 3, 2024
2 parents a7b2082 + c904f1a commit 604abb3
Show file tree
Hide file tree
Showing 27 changed files with 1,163 additions and 253 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ The special E2E client settings should be defined in `env.yml`:
| TRIP_INSTRUCTION_UPCOMING_RADIUS | integer | Optional | 10 | The radius in meters under which an upcoming instruction is given. |
| TRIP_SURVEY_ID | string | Optional | abcdef123y | The ID of a survey (on the platform of your choice) for trip-related feedback. |
| TRIP_SURVEY_SUBDOMAIN | string | Optional | abcabc12a | The subdomain of a website where the trip-related surveys are administered. |
| TRIP_SURVEY_API_TOKEN | string | Optional | abcdef123y | The token for the survey API for downloading responses. |
| TWILIO_ACCOUNT_SID | string | Optional | your-account-sid | Twilio settings available at: https://twilio.com/user/account |
| TRUSTED_COMPANION_CONFIRMATION_PAGE_URL | string | Optional | https://otp-server.example.com/trusted/confirmation | URL to the trusted companion confirmation page. This page should support handling an error URL parameter. |
| TWILIO_AUTH_TOKEN | string | Optional | your-auth-token | Twilio settings available at: https://twilio.com/user/account |
Expand Down
2 changes: 2 additions & 0 deletions configurations/default/env.yml.tmp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ TRIP_INSTRUCTION_UPCOMING_RADIUS: 10
# Survey ID and domain that is offered after users complete certain trips.
TRIP_SURVEY_ID: abcdef123y
TRIP_SURVEY_SUBDOMAIN: abcabc12a
# Survey API credentials to download responses
TRIP_SURVEY_API_TOKEN: token-12345c

US_RIDE_GWINNETT_BUS_OPERATOR_NOTIFIER_API_URL: https://bus.notifier.example.com
US_RIDE_GWINNETT_BUS_OPERATOR_NOTIFIER_API_KEY: your-key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opentripplanner.middleware.persistence.Persistence;
import org.opentripplanner.middleware.tripmonitor.jobs.MonitorAllTripsJob;
import org.opentripplanner.middleware.triptracker.TripSurveySenderJob;
import org.opentripplanner.middleware.triptracker.TripSurveyUploadJob;
import org.opentripplanner.middleware.utils.ConfigUtils;
import org.opentripplanner.middleware.utils.HttpUtils;
import org.opentripplanner.middleware.utils.Scheduler;
Expand Down Expand Up @@ -54,7 +55,7 @@ public class OtpMiddlewareMain {
public static final String API_PREFIX = "/api/";
public static boolean inTestEnvironment = false;

public static void main(String[] args) throws IOException, InterruptedException {
public static void main(String[] args) throws IOException {
// Load configuration.
ConfigUtils.loadConfig(args);

Expand All @@ -77,8 +78,9 @@ public static void main(String[] args) throws IOException, InterruptedException
// Schedule trip history uploads.
ConnectedDataManager.scheduleTripHistoryUploadJob();

// Schedule recurring Monitor All Trips Job.
// Schedule recurring jobs.
// TODO: Determine whether this should go in some other process.

MonitorAllTripsJob monitorAllTripsJob = new MonitorAllTripsJob();
Scheduler.scheduleJob(
monitorAllTripsJob,
Expand All @@ -87,19 +89,28 @@ public static void main(String[] args) throws IOException, InterruptedException
TimeUnit.MINUTES
);

// Schedule recurring job for post-trip surveys, once every half-hour to catch recently completed trips.
// TODO: Determine whether this should go in some other process.
TripSurveySenderJob tripSurveySenderJob = new TripSurveySenderJob();
Scheduler.scheduleJob(
tripSurveySenderJob,
0,
30,
TimeUnit.MINUTES
);

if (TripSurveyUploadJob.isConfigured()) {
LOG.info("Scheduling trip survey upload every day");
TripSurveyUploadJob tripSurveyUploadJob = new TripSurveyUploadJob();
Scheduler.scheduleJob(
tripSurveyUploadJob,
0,
1,
TimeUnit.DAYS
);
}
}
}

private static void initializeHttpEndpoints() throws IOException, InterruptedException {
private static void initializeHttpEndpoints() throws IOException {
// Must start spark explicitly to use spark-swagger.
// https://github.com/manusant/spark-swagger#endpoints-binding
Service spark = Service.ignite().port(Service.SPARK_DEFAULT_PORT);
Expand All @@ -120,7 +131,7 @@ private static void initializeHttpEndpoints() throws IOException, InterruptedExc
new CDPFilesController(API_PREFIX),
new OtpRequestProcessor("/otp", OtpVersion.OTP2),
new OtpRequestProcessor("/otp2", OtpVersion.OTP2)
// TODO Add other models.
// Add other endpoints as needed.
))
// Spark-swagger auto-generates a swagger document at localhost:4567/doc.yaml.
// (That path is not configurable.)
Expand All @@ -138,7 +149,7 @@ private static void initializeHttpEndpoints() throws IOException, InterruptedExc
return Files.readString(publicDocPath);
});

/**
/*
* End point to receive project errors as soon as they are processed by Bugsnag. Information on Bugsnag's
* webhook can be found here: https://docs.bugsnag.com/product/integrations/data-forwarding/webhook/
*
Expand All @@ -153,7 +164,7 @@ private static void initializeHttpEndpoints() throws IOException, InterruptedExc
return "";
});

/**
/*
* End point to handle redirecting to the correct registration page from Auth0 as described here:
*
* https://auth0.com/docs/auth0-email-services/customize-email-templates#dynamic-redirect-to-urls
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.bson.conversions.Bson;
import org.opentripplanner.middleware.bugsnag.BugsnagReporter;
import org.opentripplanner.middleware.controllers.api.OtpRequestProcessor;
import org.opentripplanner.middleware.models.IntervalUpload;
import org.opentripplanner.middleware.models.TripHistoryUpload;
import org.opentripplanner.middleware.models.TripRequest;
import org.opentripplanner.middleware.models.TripSummary;
Expand All @@ -19,17 +20,14 @@
import org.opentripplanner.middleware.utils.DateTimeUtils;
import org.opentripplanner.middleware.utils.FileUtils;
import org.opentripplanner.middleware.utils.JsonUtils;
import org.opentripplanner.middleware.utils.S3Utils;
import org.opentripplanner.middleware.utils.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.time.DayOfWeek;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -133,7 +131,9 @@ public static void removeUsersTripHistory(String userId) {
}
// Get all hourly windows that have already been earmarked for uploading.
Set<LocalDateTime> incompleteUploadHours = new HashSet<>();
getIncompleteUploads().forEach(tripHistoryUpload -> incompleteUploadHours.add(tripHistoryUpload.uploadHour));
IntervalUpload
.getIncompleteUploads(Persistence.tripHistoryUploads)
.forEach(tripHistoryUpload -> incompleteUploadHours.add(tripHistoryUpload.uploadHour));
// Save all new hourly windows for uploading.
Set<LocalDateTime> newHourlyWindows = Sets.difference(userTripHourlyWindows, incompleteUploadHours);
TripHistoryUpload first = TripHistoryUpload.getFirst();
Expand Down Expand Up @@ -383,26 +383,25 @@ public static int compileAndUploadTripHistory(

// Not null because ReportedEntities only contains entries that correspond to persistenceMap.
TypedPersistence<?> typedPersistence = ReportedEntities.persistenceMap.get(entityName);
String filePrefix = getFilePrefix(reportingInterval, periodStart, entityName);
String tempFileFolder = FileUtils.getTempDirectory().getAbsolutePath();

String zipFileName = String.join(".", filePrefix, ZIP_FILE_EXTENSION);
String tempZipFile = String.join(File.separator, tempFileFolder, zipFileName);
String coreFileName = entityName;
boolean anonymize = isAnonymizedInterval(reportingMode);
boolean isTripRequest = "TripRequest".equals(entityName);
if (isTripRequest && anonymize) {
// Anonymized trip requests are stored under a special file name.
coreFileName = ANON_TRIP_FILE_NAME;
}

String jsonFileName = String.join(".", filePrefix, JSON_FILE_EXTENSION);
String tempDataFile = String.join(File.separator, tempFileFolder, jsonFileName);
IntervalUploadFiles uploadFiles = new IntervalUploadFiles(
getFilePrefix(reportingInterval, periodStart, coreFileName),
JSON_FILE_EXTENSION,
isTest
);

try {
try (uploadFiles) {
String tempDataFile = uploadFiles.getTempDataFile();
int recordsWritten = Integer.MIN_VALUE;

if ("TripRequest".equals(entityName)) {
// Anonymized trip requests are stored under a special file name.
boolean anonymize = isAnonymizedInterval(reportingMode);
if (anonymize) {
tempDataFile = tempDataFile.replace("TripRequest.json", ANON_TRIP_JSON_FILE_NAME);
tempZipFile = tempZipFile.replace("TripRequest.zip", ANON_TRIP_ZIP_FILE_NAME);
}

if (isTripRequest) {
// TripRequests must be processed separately because they must be combined, one per batchId.
// Note: Anonymized trips include TripRequest and TripSummary in the same entity.
recordsWritten = streamTripsToFile(tempDataFile, periodStart, reportingInterval, anonymize);
Expand All @@ -423,43 +422,19 @@ public static int compileAndUploadTripHistory(

if (recordsWritten > 0 || "true".equals(CONNECTED_DATA_PLATFORM_UPLOAD_BLANK_FILES)) {
// Upload the file if records were written or config setting requires uploading blank files.
FileUtils.addSingleFileToZip(tempDataFile, tempZipFile);
S3Utils.putObject(
CONNECTED_DATA_PLATFORM_S3_BUCKET_NAME,
String.format(
"%s/%s",
getUploadFolderName(
CONNECTED_DATA_PLATFORM_S3_FOLDER_NAME,
CONNECTED_DATA_PLATFORM_FOLDER_GROUPING,
periodStart.toLocalDate()
),
zipFileName
),
new File(tempZipFile)
);
uploadFiles.compressAndUpload(getUploadFolderName(
CONNECTED_DATA_PLATFORM_S3_FOLDER_NAME,
CONNECTED_DATA_PLATFORM_FOLDER_GROUPING,
periodStart.toLocalDate()
));
}
allRecordsWritten += recordsWritten;
} catch (Exception e) {
} catch (IOException e) {
BugsnagReporter.reportErrorToBugsnag(
String.format("Failed to process trip data for (%s)", periodStart),
String.format("Failed to write trip data for (%s)", periodStart),
e
);
return Integer.MIN_VALUE;
} finally {
// Delete the temporary files. This is done here in case the S3 upload fails.
try {
LOG.error("Deleting CDP zip file {} as an error occurred while processing the data it was supposed to contain.", tempZipFile);
FileUtils.deleteFile(tempDataFile);
if (!isTest) {
FileUtils.deleteFile(tempZipFile);
} else {
LOG.warn("In test mode, temp zip file {} not deleted. This is expected to be deleted by the calling test.",
tempZipFile
);
}
} catch (IOException e) {
LOG.error("Failed to delete temp files", e);
}
}
}

Expand All @@ -471,37 +446,25 @@ public static boolean isAnonymizedInterval(String reportingMode) {
return reportingModes.contains("interval") && reportingModes.contains("anonymized");
}

/**
* Get all incomplete trip history uploads.
*/
public static List<TripHistoryUpload> getIncompleteUploads() {
FindIterable<TripHistoryUpload> tripHistoryUploads = Persistence.tripHistoryUploads.getFiltered(
Filters.ne("status", TripHistoryUploadStatus.COMPLETED.getValue())
);
return tripHistoryUploads.into(new ArrayList<>());
public static String getDailyFileName(LocalDateTime date, String fileNameSuffix) {
return formatFileName(date, "yyyy-MM-dd", fileNameSuffix);
}

public static String getHourlyFileName(LocalDateTime date, String fileNameSuffix) {
final String DEFAULT_DATE_FORMAT_PATTERN = "yyyy-MM-dd-HH";
return String.format(
"%s-%s",
getStringFromDate(date, DEFAULT_DATE_FORMAT_PATTERN),
fileNameSuffix
);
return formatFileName(date, "yyyy-MM-dd-HH", fileNameSuffix);
}

/**
* Produce file name without path or extension.
*/
public static String getFilePrefix(ReportingInterval reportingInterval, LocalDateTime date, String entityName) {
final String DEFAULT_DATE_FORMAT_PATTERN = isReportingDaily(reportingInterval)
? "yyyy-MM-dd"
: "yyyy-MM-dd-HH";
return String.format(
"%s-%s",
getStringFromDate(date, DEFAULT_DATE_FORMAT_PATTERN),
entityName
);
return isReportingDaily(reportingInterval)
? getDailyFileName(date, entityName)
: getHourlyFileName(date, entityName);
}

public static String formatFileName(LocalDateTime date, String datePattern, String suffix) {
return String.format("%s-%s", getStringFromDate(date, datePattern), suffix);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package org.opentripplanner.middleware.connecteddataplatform;

import org.opentripplanner.middleware.bugsnag.BugsnagReporter;
import org.opentripplanner.middleware.utils.FileUtils;
import org.opentripplanner.middleware.utils.S3Exception;
import org.opentripplanner.middleware.utils.S3Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;

import static org.opentripplanner.middleware.connecteddataplatform.ConnectedDataManager.CONNECTED_DATA_PLATFORM_S3_BUCKET_NAME;
import static org.opentripplanner.middleware.connecteddataplatform.ConnectedDataManager.ZIP_FILE_EXTENSION;

/**
* Helper class for upload job file handling.
* When used in try-with-resource blocks, temp files will be automatically deleted.
*/
public class IntervalUploadFiles implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(IntervalUploadFiles.class);

private final boolean isTest;

private final String zipFileName;

private final String tempZipFile;

private final String tempDataFile;

public IntervalUploadFiles(String filePrefix, String coreExtension, boolean isTest) {
this.isTest = isTest;

zipFileName = String.join(".", filePrefix, ZIP_FILE_EXTENSION);
String tempFileFolder = FileUtils.getTempDirectory().getAbsolutePath();
tempZipFile = String.join(File.separator, tempFileFolder, zipFileName);
tempDataFile = String.join(File.separator, tempFileFolder, String.join(".", filePrefix, coreExtension));
}

public String getTempDataFile() {
return tempDataFile;
}

/**
* Compress and upload the data file.
*/
public void compressAndUpload(String folder) throws IOException {
FileUtils.addSingleFileToZip(tempDataFile, tempZipFile);
try {
S3Utils.putObject(
CONNECTED_DATA_PLATFORM_S3_BUCKET_NAME,
String.format("%s/%s", folder, zipFileName),
new File(tempZipFile)
);
} catch (S3Exception e) {
String message = String.format("Error uploading (%s) to S3", zipFileName);
LOG.error(message);
BugsnagReporter.reportErrorToBugsnag(message, e);
}
}

@Override
public void close() throws IOException {
// Delete the temporary files here, to cover S3 upload success or failure.
try {
LOG.info("Deleting zip file {}.", tempZipFile);
FileUtils.deleteFile(tempDataFile);
if (!isTest) {
FileUtils.deleteFile(tempZipFile);
} else {
LOG.warn("In test mode, temp zip file {} not deleted. This is expected to be deleted by the calling test.",
tempZipFile
);
}
} catch (IOException e) {
LOG.error("Failed to delete temp files", e);
throw e;
}
}
}
Loading

0 comments on commit 604abb3

Please sign in to comment.