Skip to content

Commit

Permalink
Merge pull request #132 from jmfee-usgs/timeout-updates
Browse files Browse the repository at this point in the history
Timeout updates
  • Loading branch information
emartinez-usgs committed Dec 3, 2020
2 parents aaa2986 + 09987db commit 6090679
Show file tree
Hide file tree
Showing 19 changed files with 1,122 additions and 1,379 deletions.
4 changes: 2 additions & 2 deletions code.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"name": "Product Distribution Layer",
"organization": "U.S. Geological Survey",
"description": "Distribution system used for derived earthquake information",
"version": "v2.6.0",
"version": "v2.7.0",
"status": "Production",
"permissions": {
"usageType": "openSource",
Expand All @@ -27,7 +27,7 @@
"email": "[email protected]"
},
"date": {
"metadataLastUpdated": "2020-11-20"
"metadataLastUpdated": "2020-12-03"
}
}
]
77 changes: 72 additions & 5 deletions src/main/java/gov/usgs/earthquake/aws/AwsProductSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ public class AwsProductSender extends DefaultConfigurable implements ProductSend
// whether to sign products
protected boolean signProducts = false;

// 5s seems excessive, but be cautious for now
protected int connectTimeout = 5000;
// this corresponds to server-side timeout
// read timeout applies once getInputStream().read() is called
protected int readTimeout = 30000;

public AwsProductSender() {}

@Override
Expand Down Expand Up @@ -110,22 +116,73 @@ public void sendProduct(final Product product) throws Exception {
) {
LOGGER.fine("Getting upload urls for " + json.toString());
// get upload urls, response is product with signed content urls for upload
Product uploadProduct = getUploadUrls(json);
Product uploadProduct;
try {
uploadProduct = getUploadUrls(json);
} catch (HttpException e) {
HttpURLConnection connection = e.response.connection;
// check for server error
if (connection.getResponseCode() >= 500) {
LOGGER.log(Level.FINE,
"[" + getName() + "] get upload urls exception, trying again", e);
// try again after random back off (1-5 s)
Thread.sleep(1000 + Math.round(4000 * Math.random()));
uploadProduct = getUploadUrls(json);
} else {
// otherwise propagate exception as usual
throw e;
}
}

final long afterGetUploadUrls = new Date().getTime();
LOGGER.fine("[" + getName() + "] get upload urls " + id.toString()
+ " (" + (afterGetUploadUrls - start) + " ms) ");

// upload contents
uploadContents(product, uploadProduct);
try {
uploadContents(product, uploadProduct);
} catch (HttpException e) {
HttpURLConnection connection = e.response.connection;
// check for S3 "503 Slow Down" error
if (
503 == connection.getResponseCode()
&& "Slow Down".equals(connection.getResponseMessage())
) {
LOGGER.fine("[" + getName() + "] 503 slow down exception, trying again");
// try again after random back off (1-5 s)
Thread.sleep(1000 + Math.round(4000 * Math.random()));
uploadContents(product, uploadProduct);
} else {
// otherwise propagate exception as usual
throw e;
}
}

afterUploadContent = new Date().getTime();
LOGGER.fine("[" + getName() + "] upload contents " + id.toString()
+ " (" + (afterUploadContent - afterGetUploadUrls) + " ms) ");
} else {
afterUploadContent = new Date().getTime();
}

// send product
sendProduct(json);
try {
// send product
sendProduct(json);
} catch (HttpException e) {
HttpURLConnection connection = e.response.connection;
// check for server error
if (connection.getResponseCode() >= 500) {
LOGGER.log(Level.FINE,
"[" + getName() + "] send product exception, trying again", e);
// try again after random back off (1-5 s)
Thread.sleep(1000 + Math.round(4000 * Math.random()));
sendProduct(json);
} else {
// otherwise propagate exception as usual
throw e;
}
}

final long afterSendProduct = new Date().getTime();
LOGGER.fine("[" + getName() + "] send product " + id.toString()
+ " (" + (afterSendProduct - afterUploadContent) + " ms) ");
Expand Down Expand Up @@ -186,6 +243,8 @@ protected HttpResponse postProductJson(final URL url, final JsonObject product)
// send as attribute, for extensibility
final JsonObject json = Json.createObjectBuilder().add("product", product).build();
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setConnectTimeout(connectTimeout);
connection.setReadTimeout(readTimeout);
connection.setDoOutput(true);
connection.setRequestMethod("POST");
connection.setRequestProperty("Content-Type", "application/json");
Expand Down Expand Up @@ -249,6 +308,8 @@ protected HttpResponse uploadContent(final String path, final Content content, f
final long start = new Date().getTime();
final HttpURLConnection connection = (HttpURLConnection) signedUrl.openConnection();
connection.setDoOutput(true);
connection.setConnectTimeout(connectTimeout);
connection.setReadTimeout(readTimeout);
// these values are part of signed url and are required
connection.setRequestMethod("PUT");
connection.addRequestProperty("Content-Length", content.getLength().toString());
Expand Down Expand Up @@ -306,12 +367,18 @@ protected Map<String, HttpResponse> uploadContents(
.forEach(
path -> {
try {
Content uploadContent = uploadProduct.getContents().get(path);
if (!(uploadContent instanceof URLContent)) {
throw new IllegalStateException(
"Expected URLContent for " + product.getId().toString()
+ " path '" + path + "' but got " + uploadContent);
}
uploadResults.put(
path,
uploadContent(
path,
product.getContents().get(path),
((URLContent) uploadProduct.getContents().get(path)).getURL()));
((URLContent) uploadContent).getURL()));
} catch (Exception e) {
uploadExceptions.put(path, e);
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/gov/usgs/earthquake/aws/HttpException.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ class HttpException extends Exception {

public final HttpResponse response;

public HttpException(final HttpResponse response, final String cause) {
super(cause);
public HttpException(final HttpResponse response, final String message) {
super(message);
this.response = response;
}

Expand Down
11 changes: 10 additions & 1 deletion src/main/java/gov/usgs/earthquake/aws/JsonNotification.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,18 @@ public class JsonNotification extends URLNotification {
* Create a JsonNotification from an existing Product.
*/
JsonNotification(final Instant created, final Product product) throws Exception {
this(created, product, new Date(Instant.now().plusSeconds(7 * 86400).toEpochMilli()));
}

/**
* Create a JsonNotification with an expiration date.
*/
JsonNotification(final Instant created, final Product product, final Date expiration)
throws Exception {
super(
product.getId(),
// expiration date
new Date(created.plusSeconds(30 * 86400).toEpochMilli()),
expiration,
// no tracker
EMPTY_URL,
// store product as data url
Expand All @@ -68,4 +76,5 @@ public class JsonNotification extends URLNotification {
this.created = created;
this.product = product;
}

}
52 changes: 52 additions & 0 deletions src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,58 @@ public synchronized List<Notification> findNotifications(ProductId id) throws Ex
return new ArrayList<Notification>();
}

/**
* This method is used to find notifications present in this index
* but not present in another JsonNotificationIndex table in the same
* database.
*
* This is used to optimize the queuing process at startup and returns
* DefaultNotifications. The receiver process will look up the actual
* notification object during processing.
*
* @param otherTable
* name of table in same database.
* @return
* list of notifications found in this indexes table, but not found in the
* other table.
* @throws Exception
*/
public synchronized List<Notification> getMissingNotifications(
final String otherTable) throws Exception {
// this is used to requeue a notification index.
// run query in a way that returns list of default notifications,
// (by returning empty created, data, and url)
// since full details are not needed during requeue
final String sql = "SELECT DISTINCT"
+ " '' as created, t.expires, t.source, t.type, t.code, t.updateTime"
+ ", '' as url, null as data"
+ " FROM " + this.table + " t"
+ " WHERE NOT EXISTS ("
+ "SELECT * FROM " + otherTable
+ " WHERE source=t.source AND type=t.type"
+ " AND code=t.code AND updatetime=t.updateTime"
+ ")";
// prepare statement
beginTransaction();
try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
try {
// execute and commit if successful
final List<Notification> notifications = getNotifications(statement);
commitTransaction();
return notifications;
} catch (SQLException e) {
LOGGER.log(Level.WARNING, "Exception finding notifications", e);
try {
// otherwise roll back
rollbackTransaction();
} catch (SQLException e2) {
// ignore
}
}
}
return new ArrayList<Notification>();
}

/**
* Parse notifications from a statement ready to be executed.
*/
Expand Down
53 changes: 30 additions & 23 deletions src/main/java/gov/usgs/earthquake/distribution/Command.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,36 +38,43 @@ public void execute() throws CommandTimeout, IOException,
final Process process = Runtime.getRuntime().exec(commandArray,
envp, dir);

Timer timer = new Timer();
final Timer timer;
if (timeout > 0) {
timer = new Timer();
timer.schedule(new TimerTask() {
public void run() {
process.destroy();
}
}, timeout);
} else {
timer = null;
}

OutputStream processStdin = process.getOutputStream();
if (stdin != null) {
StreamUtils.transferStream(stdin, processStdin);
}
StreamUtils.closeStream(processStdin);

outputTransfer = new StreamTransferThread(process.getInputStream(),
stdout);
outputTransfer.start();
errorTransfer = new StreamTransferThread(process.getErrorStream(),
stderr);
errorTransfer.start();

// now wait for process to complete
exitCode = process.waitFor();
if (exitCode == 143) {
throw new CommandTimeout();
try {
OutputStream processStdin = process.getOutputStream();
if (stdin != null) {
StreamUtils.transferStream(stdin, processStdin);
}
StreamUtils.closeStream(processStdin);

outputTransfer = new StreamTransferThread(process.getInputStream(),
stdout);
outputTransfer.start();
errorTransfer = new StreamTransferThread(process.getErrorStream(),
stderr);
errorTransfer.start();

// now wait for process to complete
exitCode = process.waitFor();
if (exitCode == 143) {
throw new CommandTimeout();
}
} finally {
// cancel destruction of process, if it hasn't already run
if (timer != null) {
timer.cancel();
}
}

// cancel destruction of process, if it hasn't already run
timer.cancel();
} finally {
try {
outputTransfer.interrupt();
Expand Down Expand Up @@ -136,12 +143,12 @@ public byte[] getStderr() {

/**
* Split a command string into a command array.
*
*
* This version uses a StringTokenizer to split arguments. Quoted arguments
* are supported (single or double), with quotes removed before passing to
* runtime. Double quoting arguments will preserve quotes when passing to
* runtime.
*
*
* @param command
* command to run.
* @return Array of arguments suitable for passing to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public class DefaultNotificationReceiver extends DefaultConfigurable implements

public static final String LISTENER_NOTIFIER_PROPERTY = "listenerNotifier";
public static final String EXECUTOR_LISTENER_NOTIFIER = "executor";
public static final String FUTURE_LISTENER_NOTIFIER = "future";
public static final String ROUNDROBIN_LISTENER_NOTIFIER = "roundrobin";

/** The notification index where received notifications are stored. */
Expand All @@ -119,7 +120,7 @@ public class DefaultNotificationReceiver extends DefaultConfigurable implements
private ObjectLock<ProductId> retrieveLocks = new ObjectLock<ProductId>();

public DefaultNotificationReceiver() {
notifier = new ExecutorListenerNotifier(this);
notifier = new FutureListenerNotifier(this);
}

/**
Expand Down Expand Up @@ -565,6 +566,8 @@ public void configure(Config config) throws Exception {
notifier = new ExecutorListenerNotifier(this);
LOGGER.config("[" + getName()
+ "] using executor listener notifier");
} else if (notifierType.equals(FUTURE_LISTENER_NOTIFIER)) {
notifier = new FutureListenerNotifier(this);
} else if (notifierType.equals(ROUNDROBIN_LISTENER_NOTIFIER)) {
notifier = new RoundRobinListenerNotifier(this);
LOGGER.config("[" + getName()
Expand Down
Loading

0 comments on commit 6090679

Please sign in to comment.