From fc815397c40265eb5a4875fff674623ac4c8c90d Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Mon, 30 Nov 2020 17:37:23 -0700 Subject: [PATCH 01/18] Use connect/read timeout in aws package, lower default connect timeout --- .../java/gov/usgs/earthquake/aws/AwsProductSender.java | 10 ++++++++++ src/main/java/gov/usgs/util/StreamUtils.java | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/main/java/gov/usgs/earthquake/aws/AwsProductSender.java b/src/main/java/gov/usgs/earthquake/aws/AwsProductSender.java index 89c3ace4..4db4e834 100644 --- a/src/main/java/gov/usgs/earthquake/aws/AwsProductSender.java +++ b/src/main/java/gov/usgs/earthquake/aws/AwsProductSender.java @@ -47,6 +47,12 @@ public class AwsProductSender extends DefaultConfigurable implements ProductSend // whether to sign products protected boolean signProducts = false; + // 5s seems excessive, but be cautious for now + protected int connectTimeout = 5000; + // this corresponds to server-side timeout + // read timeout applies once getInputStream().read() is called + protected int readTimeout = 30000; + public AwsProductSender() {} @Override @@ -186,6 +192,8 @@ protected HttpResponse postProductJson(final URL url, final JsonObject product) // send as attribute, for extensibility final JsonObject json = Json.createObjectBuilder().add("product", product).build(); HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setConnectTimeout(connectTimeout); + connection.setReadTimeout(readTimeout); connection.setDoOutput(true); connection.setRequestMethod("POST"); connection.setRequestProperty("Content-Type", "application/json"); @@ -249,6 +257,8 @@ protected HttpResponse uploadContent(final String path, final Content content, f final long start = new Date().getTime(); final HttpURLConnection connection = (HttpURLConnection) signedUrl.openConnection(); connection.setDoOutput(true); + connection.setConnectTimeout(connectTimeout); + connection.setReadTimeout(readTimeout); // these values are part of signed url and are required connection.setRequestMethod("PUT"); connection.addRequestProperty("Content-Length", content.getLength().toString()); diff --git a/src/main/java/gov/usgs/util/StreamUtils.java b/src/main/java/gov/usgs/util/StreamUtils.java index 6f81b1b6..88112f22 100644 --- a/src/main/java/gov/usgs/util/StreamUtils.java +++ b/src/main/java/gov/usgs/util/StreamUtils.java @@ -31,7 +31,7 @@ public class StreamUtils { public static final int DEFAULT_BUFFER_SIZE = 4096; /** Default connect timeout for url connections. */ - public static final int DEFAULT_URL_CONNECT_TIMEOUT = 15000; + public static final int DEFAULT_URL_CONNECT_TIMEOUT = 5000; /** Default read timeout for url connections. */ public static final int DEFAULT_URL_READ_TIMEOUT = 15000; From caadf937a087121c57d22729dcca32f0a12c514b Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Mon, 30 Nov 2020 17:54:23 -0700 Subject: [PATCH 02/18] Make date configurable, update default Json expiration --- .../gov/usgs/earthquake/aws/AwsProductReceiver.java | 2 ++ .../gov/usgs/earthquake/aws/JsonNotification.java | 11 ++++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/src/main/java/gov/usgs/earthquake/aws/AwsProductReceiver.java b/src/main/java/gov/usgs/earthquake/aws/AwsProductReceiver.java index 38fb1029..1d2d94ca 100644 --- a/src/main/java/gov/usgs/earthquake/aws/AwsProductReceiver.java +++ b/src/main/java/gov/usgs/earthquake/aws/AwsProductReceiver.java @@ -48,6 +48,8 @@ public class AwsProductReceiver extends DefaultNotificationReceiver implements W private String trackingFileName; private int attempts; private long timeout; + // default expiration for notifications + private long expirationAge = 7 * 86400 * 1000; private TrackingIndex trackingIndex; private WebSocketClient client; diff --git a/src/main/java/gov/usgs/earthquake/aws/JsonNotification.java b/src/main/java/gov/usgs/earthquake/aws/JsonNotification.java index 7bfbfaac..3d712457 100644 --- a/src/main/java/gov/usgs/earthquake/aws/JsonNotification.java +++ b/src/main/java/gov/usgs/earthquake/aws/JsonNotification.java @@ -54,10 +54,18 @@ public class JsonNotification extends URLNotification { * Create a JsonNotification from an existing Product. */ JsonNotification(final Instant created, final Product product) throws Exception { + this(created, product, new Date(Instant.now().plusSeconds(7 * 86400).toEpochMilli())); + } + + /** + * Create a JsonNotification with an expiration date. + */ + JsonNotification(final Instant created, final Product product, final Date expiration) + throws Exception { super( product.getId(), // expiration date - new Date(created.plusSeconds(30 * 86400).toEpochMilli()), + expiration, // no tracker EMPTY_URL, // store product as data url @@ -68,4 +76,5 @@ public class JsonNotification extends URLNotification { this.created = created; this.product = product; } + } From 9cdba89a096cfc59cc654e0f2a19c57ae0f1b767 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Mon, 30 Nov 2020 17:58:07 -0700 Subject: [PATCH 03/18] Update version to 2.6.1 --- code.json | 4 ++-- .../java/gov/usgs/earthquake/distribution/ProductClient.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/code.json b/code.json index fa9a4b84..d6841545 100644 --- a/code.json +++ b/code.json @@ -3,7 +3,7 @@ "name": "Product Distribution Layer", "organization": "U.S. Geological Survey", "description": "Distribution system used for derived earthquake information", - "version": "v2.6.0", + "version": "v2.6.1", "status": "Production", "permissions": { "usageType": "openSource", @@ -27,7 +27,7 @@ "email": "jmfee@usgs.gov" }, "date": { - "metadataLastUpdated": "2020-11-20" + "metadataLastUpdated": "2020-11-30" } } ] diff --git a/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java b/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java index eeaee71f..d754f733 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java +++ b/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java @@ -64,7 +64,7 @@ public class ProductClient extends DefaultConfigurable implements ProductClientMBean, Bootstrappable { /** The "release" version number. */ - public static final String RELEASE_VERSION = "Version 2.6.0 2020-11-20"; + public static final String RELEASE_VERSION = "Version 2.6.1 2020-11-30"; /** Property name used on products for current RELEASE_VERSION. */ public static final String PDL_CLIENT_VERSION_PROPERTY = "pdl-client-version"; From 6048e0fa10d296521d9f0eb66bc514b89ab8ac71 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Mon, 30 Nov 2020 21:33:14 -0700 Subject: [PATCH 04/18] Clean up timers, new future executor for listeners --- .../earthquake/aws/AwsProductReceiver.java | 2 - .../usgs/earthquake/distribution/Command.java | 53 ++--- .../DefaultNotificationReceiver.java | 5 +- .../ExecutorListenerNotifier.java | 6 +- .../distribution/FutureListenerNotifier.java | 82 ++++++++ .../indexer/ExternalIndexerListener.java | 18 +- .../gov/usgs/earthquake/indexer/Indexer.java | 17 +- .../java/gov/usgs/util/DirectoryPoller.java | 18 +- src/main/java/gov/usgs/util/ExecutorTask.java | 54 ++--- .../gov/usgs/util/FutureExecutorTask.java | 187 ++++++++++++++++++ .../java/gov/usgs/util/TimeoutProcess.java | 29 +-- 11 files changed, 385 insertions(+), 86 deletions(-) create mode 100644 src/main/java/gov/usgs/earthquake/distribution/FutureListenerNotifier.java create mode 100644 src/main/java/gov/usgs/util/FutureExecutorTask.java diff --git a/src/main/java/gov/usgs/earthquake/aws/AwsProductReceiver.java b/src/main/java/gov/usgs/earthquake/aws/AwsProductReceiver.java index 1d2d94ca..38fb1029 100644 --- a/src/main/java/gov/usgs/earthquake/aws/AwsProductReceiver.java +++ b/src/main/java/gov/usgs/earthquake/aws/AwsProductReceiver.java @@ -48,8 +48,6 @@ public class AwsProductReceiver extends DefaultNotificationReceiver implements W private String trackingFileName; private int attempts; private long timeout; - // default expiration for notifications - private long expirationAge = 7 * 86400 * 1000; private TrackingIndex trackingIndex; private WebSocketClient client; diff --git a/src/main/java/gov/usgs/earthquake/distribution/Command.java b/src/main/java/gov/usgs/earthquake/distribution/Command.java index a7e0ef42..72c52491 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/Command.java +++ b/src/main/java/gov/usgs/earthquake/distribution/Command.java @@ -38,36 +38,43 @@ public void execute() throws CommandTimeout, IOException, final Process process = Runtime.getRuntime().exec(commandArray, envp, dir); - Timer timer = new Timer(); + final Timer timer; if (timeout > 0) { + timer = new Timer(); timer.schedule(new TimerTask() { public void run() { process.destroy(); } }, timeout); + } else { + timer = null; } - OutputStream processStdin = process.getOutputStream(); - if (stdin != null) { - StreamUtils.transferStream(stdin, processStdin); - } - StreamUtils.closeStream(processStdin); - - outputTransfer = new StreamTransferThread(process.getInputStream(), - stdout); - outputTransfer.start(); - errorTransfer = new StreamTransferThread(process.getErrorStream(), - stderr); - errorTransfer.start(); - - // now wait for process to complete - exitCode = process.waitFor(); - if (exitCode == 143) { - throw new CommandTimeout(); + try { + OutputStream processStdin = process.getOutputStream(); + if (stdin != null) { + StreamUtils.transferStream(stdin, processStdin); + } + StreamUtils.closeStream(processStdin); + + outputTransfer = new StreamTransferThread(process.getInputStream(), + stdout); + outputTransfer.start(); + errorTransfer = new StreamTransferThread(process.getErrorStream(), + stderr); + errorTransfer.start(); + + // now wait for process to complete + exitCode = process.waitFor(); + if (exitCode == 143) { + throw new CommandTimeout(); + } + } finally { + // cancel destruction of process, if it hasn't already run + if (timer != null) { + timer.cancel(); + } } - - // cancel destruction of process, if it hasn't already run - timer.cancel(); } finally { try { outputTransfer.interrupt(); @@ -136,12 +143,12 @@ public byte[] getStderr() { /** * Split a command string into a command array. - * + * * This version uses a StringTokenizer to split arguments. Quoted arguments * are supported (single or double), with quotes removed before passing to * runtime. Double quoting arguments will preserve quotes when passing to * runtime. - * + * * @param command * command to run. * @return Array of arguments suitable for passing to diff --git a/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationReceiver.java b/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationReceiver.java index e0717251..addf139a 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationReceiver.java +++ b/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationReceiver.java @@ -93,6 +93,7 @@ public class DefaultNotificationReceiver extends DefaultConfigurable implements public static final String LISTENER_NOTIFIER_PROPERTY = "listenerNotifier"; public static final String EXECUTOR_LISTENER_NOTIFIER = "executor"; + public static final String FUTURE_LISTENER_NOTIFIER = "future"; public static final String ROUNDROBIN_LISTENER_NOTIFIER = "roundrobin"; /** The notification index where received notifications are stored. */ @@ -119,7 +120,7 @@ public class DefaultNotificationReceiver extends DefaultConfigurable implements private ObjectLock retrieveLocks = new ObjectLock(); public DefaultNotificationReceiver() { - notifier = new ExecutorListenerNotifier(this); + notifier = new FutureListenerNotifier(this); } /** @@ -565,6 +566,8 @@ public void configure(Config config) throws Exception { notifier = new ExecutorListenerNotifier(this); LOGGER.config("[" + getName() + "] using executor listener notifier"); + } else if (notifierType.equals(FUTURE_LISTENER_NOTIFIER)) { + notifier = new FutureListenerNotifier(this); } else if (notifierType.equals(ROUNDROBIN_LISTENER_NOTIFIER)) { notifier = new RoundRobinListenerNotifier(this); LOGGER.config("[" + getName() diff --git a/src/main/java/gov/usgs/earthquake/distribution/ExecutorListenerNotifier.java b/src/main/java/gov/usgs/earthquake/distribution/ExecutorListenerNotifier.java index 00cf9655..5387e70f 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/ExecutorListenerNotifier.java +++ b/src/main/java/gov/usgs/earthquake/distribution/ExecutorListenerNotifier.java @@ -32,19 +32,19 @@ public class ExecutorListenerNotifier extends DefaultConfigurable implements * Notification listeners registered to receive notifications, and an * ExecutorService that delivers Notifications to each in a separate thread. */ - private Map notificationListeners = new HashMap(); + protected Map notificationListeners = new HashMap(); /** * Make sure listener will accept notification before queueing it for * processing. */ - private boolean acceptBeforeQueuing = true; + protected boolean acceptBeforeQueuing = true; /** * Timer used to retry tasks when they fail and listeners have configured * retryDelay. */ - private Timer retryTimer = new Timer(); + protected Timer retryTimer = new Timer(); public ExecutorListenerNotifier(final DefaultNotificationReceiver receiver) { this.receiver = receiver; diff --git a/src/main/java/gov/usgs/earthquake/distribution/FutureListenerNotifier.java b/src/main/java/gov/usgs/earthquake/distribution/FutureListenerNotifier.java new file mode 100644 index 00000000..21a48033 --- /dev/null +++ b/src/main/java/gov/usgs/earthquake/distribution/FutureListenerNotifier.java @@ -0,0 +1,82 @@ +package gov.usgs.earthquake.distribution; + +import gov.usgs.earthquake.product.AbstractListener; +import gov.usgs.util.FutureExecutorTask; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.logging.Logger; + +/** + * FutureListenerNotifier is similar to ExecutorListenerNotifier, but uses + * Futures with an ExecutorService to implement timeouts instead of Timers. + * + * backgroundService is an unbounded executor, but will execute only as many + * threads are allowed by listener executors since listener executors submit + * tasks to the backgroundService and wait on the future. + * + * This ends up being more efficient because the threads where jobs execute are + * cached, instead of a new Timer thread created for each task. + */ +public class FutureListenerNotifier extends ExecutorListenerNotifier { + + private static final Logger LOGGER = Logger + .getLogger(FutureListenerNotifier.class.getName()); + + /** Service where tasks execute using futures for timeouts. */ + private ExecutorService backgroundService; + + public FutureListenerNotifier(final DefaultNotificationReceiver receiver) { + super(receiver); + } + + @Override + protected void queueNotification(final NotificationListener listener, + final NotificationEvent event) { + if (acceptBeforeQueuing + && listener instanceof DefaultNotificationListener) { + DefaultNotificationListener defaultListener = (DefaultNotificationListener) listener; + if (!defaultListener.accept(event.getNotification().getProductId())) { + return; + } + } + + // determine retry delay + long retryDelay = 0L; + if (listener instanceof AbstractListener) { + retryDelay = ((AbstractListener) listener).getRetryDelay(); + } + + ExecutorService listenerExecutor = notificationListeners.get(listener); + FutureExecutorTask listenerTask = new FutureExecutorTask( + backgroundService, listenerExecutor, listener.getMaxTries(), + listener.getTimeout(), new NotificationListenerCallable( + listener, event), retryTimer, retryDelay); + listenerExecutor.submit(listenerTask); + + // log how many notifications are pending + if (listenerExecutor instanceof ThreadPoolExecutor) { + BlockingQueue pending = ((ThreadPoolExecutor) listenerExecutor) + .getQueue(); + LOGGER.fine("[" + event.getNotificationReceiver().getName() + + "] listener (" + listener.getName() + ") has " + + pending.size() + " queued notifications"); + } + } + + @Override + public void shutdown() throws Exception { + super.shutdown(); + backgroundService.shutdown(); + backgroundService = null; + } + + @Override + public void startup() throws Exception { + backgroundService = Executors.newCachedThreadPool(); + super.startup(); + } + +} diff --git a/src/main/java/gov/usgs/earthquake/indexer/ExternalIndexerListener.java b/src/main/java/gov/usgs/earthquake/indexer/ExternalIndexerListener.java index d7246f59..97dd719d 100644 --- a/src/main/java/gov/usgs/earthquake/indexer/ExternalIndexerListener.java +++ b/src/main/java/gov/usgs/earthquake/indexer/ExternalIndexerListener.java @@ -232,8 +232,9 @@ public void runProductCommand(final String command, final Product product) throw // Close the output stream StreamUtils.closeStream(process.getOutputStream()); - Timer commandTimer = new Timer(); + final Timer commandTimer; if (this.getTimeout() > 0) { + commandTimer = new Timer(); // Schedule process destruction for commandTimeout // milliseconds in the future commandTimer.schedule(new TimerTask() { @@ -244,12 +245,19 @@ public void run() { process.destroy(); } }, this.getTimeout()); + } else { + commandTimer = null; } - // Wait for process to complete - process.waitFor(); - // Cancel the timer if it was not triggered - commandTimer.cancel(); + try { + // Wait for process to complete + process.waitFor(); + } finally { + if (commandTimer != null) { + // Cancel the timer if it was not triggered + commandTimer.cancel(); + } + } LOGGER.info("[" + getName() + "] command '" + command + "' exited with status '" + process.exitValue() + "'"); if (process.exitValue() != 0) { diff --git a/src/main/java/gov/usgs/earthquake/indexer/Indexer.java b/src/main/java/gov/usgs/earthquake/indexer/Indexer.java index 7067e405..488d041f 100644 --- a/src/main/java/gov/usgs/earthquake/indexer/Indexer.java +++ b/src/main/java/gov/usgs/earthquake/indexer/Indexer.java @@ -15,7 +15,7 @@ import gov.usgs.earthquake.util.CompareUtil; import gov.usgs.util.Config; import gov.usgs.util.Configurable; -import gov.usgs.util.ExecutorTask; +import gov.usgs.util.FutureExecutorTask; import gov.usgs.util.StringUtils; import java.io.File; @@ -141,6 +141,12 @@ public class Indexer extends DefaultNotificationListener { /** Task for archive policy thread. */ private TimerTask archiveTask = null; + /** + * Service used by FutureExecutorTask for execution. + * See distribution.FutureListenerNotifier for more details. + */ + private ExecutorService backgroundService; + /** Whether to (false) or not (true) to run archive policies. */ private boolean disableArchive = false; @@ -325,6 +331,9 @@ public void removeListener(final IndexerListener toRemove) { // Shutdown executor thread listenerExecutor.shutdown(); } + + backgroundService.shutdown(); + backgroundService = null; } /** @@ -382,8 +391,8 @@ protected synchronized void notifyListeners(final IndexerEvent event) { while (it.hasNext()) { final IndexerListener listener = it.next(); ExecutorService listenerExecutor = listeners.get(listener); - ExecutorTask listenerTask = new ExecutorTask( - listenerExecutor, listener.getMaxTries(), + FutureExecutorTask listenerTask = new FutureExecutorTask( + backgroundService, listenerExecutor, listener.getMaxTries(), listener.getTimeout(), new IndexerListenerCallable(listener, event)); listenerExecutor.submit(listenerTask); @@ -1784,6 +1793,8 @@ public synchronized void startup() throws Exception { // -- Start up our own specific processes -- // + backgroundService = Executors.newCachedThreadPool(); + // -- Start dependent processes -- // // ExecutorServices tied to known listeners. Iterator iter = listeners.keySet().iterator(); diff --git a/src/main/java/gov/usgs/util/DirectoryPoller.java b/src/main/java/gov/usgs/util/DirectoryPoller.java index fb196bd7..0dea28f4 100644 --- a/src/main/java/gov/usgs/util/DirectoryPoller.java +++ b/src/main/java/gov/usgs/util/DirectoryPoller.java @@ -16,14 +16,14 @@ /** * Monitor a directory for files, notifying FileListenerInterfaces. - * + * * Implementers of the FileListenerInterface should process files before * returning, because these files may move or disappear. */ public class DirectoryPoller { /** Timer schedules polling frequency. */ - private Timer timer = new Timer(); + private Timer timer; /** Directory to watch. */ private final File pollDirectory; @@ -36,7 +36,7 @@ public class DirectoryPoller { /** * Create a DirectoryPoller. - * + * * @param pollDirectory * directory that is polled for new files. * @param storageDirectory @@ -73,11 +73,11 @@ public void removeFileListener(final FileListenerInterface listener) { /** * Start polling in a background thread. - * + * * Any previously scheduled polling is stopped before starting at this * frequency. This schedules using fixed-delay (time between complete polls) * as opposed to fixed-rate (how often to start polling). - * + * * @param frequencyInMilliseconds * how often to poll. */ @@ -104,9 +104,9 @@ public void stop() { /** * The Polling Task. Notifies all listeners then either deletes or moves the * file to storage. - * + * * @author jmfee - * + * */ protected class PollTask extends TimerTask { public void run() { @@ -123,7 +123,7 @@ public void run() { /** * Notify all listeners that files exist and need to be processed. - * + * * @param file */ public void notifyListeners(final File file) { @@ -142,7 +142,7 @@ public void notifyListeners(final File file) { * Move a file from polldir to storage directory. Attempts to move file into * storage directory. The file is not moved if no storage directory was * specified, or if the file no longer exists. - * + * * @param file * file to move. */ diff --git a/src/main/java/gov/usgs/util/ExecutorTask.java b/src/main/java/gov/usgs/util/ExecutorTask.java index 503e8a90..c6501fda 100644 --- a/src/main/java/gov/usgs/util/ExecutorTask.java +++ b/src/main/java/gov/usgs/util/ExecutorTask.java @@ -1,6 +1,6 @@ /* * ExecutorTask - * + * * $Id$ * $URL$ */ @@ -21,13 +21,13 @@ /** * A wrapper for Runnable or Callable objects for use with an ExecutorService. - * + * * Can be used to schedule interrupt based timeouts, multiple attempts, and * Future style exception tracking for Runnable or Callable objects. - * + * * @param return type for callable. */ -public final class ExecutorTask implements Future, Runnable { +public class ExecutorTask implements Future, Runnable { /** Logging object. */ private static final Logger LOGGER = Logger.getLogger(ExecutorTask.class @@ -43,49 +43,49 @@ public final class ExecutorTask implements Future, Runnable { public static final long DEFAULT_TIMEOUT = 0L; /** ExecutorService used to execute this task. */ - private ExecutorService service; + protected ExecutorService service; /** The callable to be called. */ - private Callable callable; + protected Callable callable; /** Timeout for task. */ - private long timeout = DEFAULT_TIMEOUT; + protected long timeout = DEFAULT_TIMEOUT; /** Number of tries to execute this task. */ - private int maxTries = DEFAULT_NUM_TRIES; + protected int maxTries = DEFAULT_NUM_TRIES; /** Number of milliseconds to wait before trying again. */ - private long retryDelay = DEFAULT_RETRY_DELAY; + protected long retryDelay = DEFAULT_RETRY_DELAY; /** Timer used to schedule retries, when they have a non-zero delay. */ - private Timer retryTimer; + protected Timer retryTimer; /** The future from the executor service. */ - private T result; + protected T result; /** List of exceptions thrown, up to maxTries in length. */ ArrayList exceptions; /** Whether this task is complete. */ - private Boolean done = false; + protected Boolean done = false; /** Whether this task has been canceled. */ - private Boolean cancelled = false; + protected Boolean cancelled = false; /** Number of tries used. */ - private int numTries = 0; + protected int numTries = 0; /** The thread where this is running, used to interrupt. */ - private Thread runThread = null; + protected Thread runThread = null; /** Name for this task. */ - private String name = null; + protected String name = null; - private final Object syncObject = new Object(); + protected final Object syncObject = new Object(); /** * Construct a new ExecutorTask - * + * * @param service * ExecutorService that this task will be submitted to. * @param maxTries @@ -106,7 +106,7 @@ public ExecutorTask(ExecutorService service, int maxTries, long timeout, /** * Wraps a runnable and result using the CallableRunnable class. - * + * * @see java.util.concurrent.Executors#callable(Runnable, Object) */ public ExecutorTask(ExecutorService service, int maxTries, long timeout, @@ -116,7 +116,7 @@ public ExecutorTask(ExecutorService service, int maxTries, long timeout, /** * Construct a new ExecutorTask - * + * * @param service * ExecutorService that this task will be submitted to. * @param maxTries @@ -223,7 +223,7 @@ public void run() { * Called when task is completed, either successfully, or unsuccessfully and * has no more tries */ - private void done() { + protected void done() { // done running, either successfully or because out of tries done = true; // notify anyone waiting for task to complete @@ -308,7 +308,7 @@ public T get(long timeout, TimeUnit unit) throws InterruptedException, /** * Number of tries used. - * + * * @return actual number of attempts. */ public int getNumTries() { @@ -317,7 +317,7 @@ public int getNumTries() { /** * Maximum number of tries before giving up. - * + * * @return maximum number of attempts. */ public int getMaxTries() { @@ -326,7 +326,7 @@ public int getMaxTries() { /** * Any exceptions thrown, during any execution attempt. - * + * * @return array of thrown exceptions. should contain no more than numTries * exceptions. */ @@ -336,7 +336,7 @@ public ArrayList getExceptions() { /** * The callable object that is/was called. - * + * * @return The callable object for this task. If this task was created using * a runnable, this was created using Executors.callable(Runnable). */ @@ -384,7 +384,7 @@ public void setRetryTimer(Timer retryTimer) { /** * Submit an ExecutorTask to an ExecutorService. - * + * * Used to defer resubmission of a task after it fails, but scheduling its * resubmission using a timer. */ @@ -395,7 +395,7 @@ private class SubmitTaskToExecutor extends TimerTask { /** * Construct a new SubmitTaskToExecutor instance. - * + * * @param task * the task to resubmit. */ diff --git a/src/main/java/gov/usgs/util/FutureExecutorTask.java b/src/main/java/gov/usgs/util/FutureExecutorTask.java new file mode 100644 index 00000000..9b23df42 --- /dev/null +++ b/src/main/java/gov/usgs/util/FutureExecutorTask.java @@ -0,0 +1,187 @@ +package gov.usgs.util; + +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * FutureExecutorTask overrides how timeouts are handled to use a + * separate executor service with Futures. + */ +public class FutureExecutorTask extends ExecutorTask { + + /** Logging object. */ + private static final Logger LOGGER = Logger.getLogger(FutureExecutorTask.class + .getName()); + + /** Default number of milliseconds to wait before a retry. */ + public static final long DEFAULT_RETRY_DELAY = 0L; + + /** Default number of tries to run this task. */ + public static final int DEFAULT_NUM_TRIES = 1; + + /** Default timeout for this task. */ + public static final long DEFAULT_TIMEOUT = 0L; + + /** ExecutorService used to execute callable. */ + protected ExecutorService backgroundService; + + /** + * Construct a new ExecutorTask + * + * @param service + * ExecutorService that this task will be submitted to. + * @param maxTries + * maximum number of tries callable can throw an exception or + * timeout before giving up. < 1 means never run. + * @param timeout + * number of milliseconds to allow callable to run before it is + * interrupted. <= 0 means never timeout. + * @param callable + * the callable to call. To work well, the callable should handle + * interrupts gracefully. + * @see InterruptedException + */ + public FutureExecutorTask(ExecutorService backgroundService, ExecutorService service, + int maxTries, long timeout, Callable callable) { + super(service, maxTries, timeout, callable, null, DEFAULT_RETRY_DELAY); + this.backgroundService = backgroundService; + } + + /** + * Wraps a runnable and result using the CallableRunnable class. + * + * @see java.util.concurrent.Executors#callable(Runnable, Object) + */ + public FutureExecutorTask(ExecutorService backgroundService, ExecutorService service, + int maxTries, long timeout, Runnable runnable, T result) { + super(service, maxTries, timeout, Executors.callable(runnable, result)); + this.backgroundService = backgroundService; + } + + /** + * Construct a new FutureExecutorTask + * + * @param service + * ExecutorService that this task will be submitted to. + * @param maxTries + * maximum number of tries callable can throw an exception or + * timeout before giving up. < 1 means never run. + * @param timeout + * number of milliseconds to allow callable to run before it is + * interrupted. <= 0 means never timeout. + * @param callable + * the callable to call. To work well, the callable should handle + * interrupts gracefully. + * @param retryTimer + * a timer used to schedule retries when retryDelay is non-zero. + * @param retryDelay + * the number of milliseconds to wait before retrying after an + * exception. + * @see InterruptedException + */ + public FutureExecutorTask(ExecutorService backgroundService, ExecutorService service, + int maxTries, long timeout, Callable callable, Timer retryTimer, + long retryDelay) { + super(service, maxTries, timeout, callable, retryTimer, retryDelay); + this.backgroundService = backgroundService; + } + + /** + * Run calls the callable, scheduling timeout interruption, catching + * exceptions, and potentially resubmitting to the executor service. + */ + @Override + public void run() { + try { + if (done || cancelled || numTries >= maxTries) { + // already done, cancelled, or out of attempts + return; + } + + // otherwise, + ++numTries; + + // signal that we are running + runThread = Thread.currentThread(); + + // use future to manage timeout + Future future = backgroundService.submit(this.callable); + try { + if (timeout > 0) { + result = future.get(timeout, TimeUnit.MILLISECONDS); + } else { + result = future.get(); + } + } catch (InterruptedException e) { + // interrupt the future too + future.cancel(true); + } + + // signal that we are done running + runThread = null; + + // computed without exceptions, done + done(); + } catch (Exception e) { + LOGGER.log(Level.INFO, "Exception executing task", e); + // signal that we are not running + runThread = null; + + // track this exception + exceptions.add(e); + + // try to resubmit + if (!cancelled && numTries < maxTries) { + LOGGER.info("Resubmitting task to executor " + numTries + "/" + + maxTries + " attempts"); + SubmitTaskToExecutor retryTask = new SubmitTaskToExecutor(this); + if (retryDelay <= 0L || retryTimer == null) { + retryTask.run(); + } else { + retryTimer.schedule(retryTask, retryDelay); + } + } else { + // cancelled or out of tries, done + done(); + } + } + } + + /** + * Submit a FutureExecutorTask to an ExecutorService. + * + * Used to defer resubmission of a task after it fails, but scheduling its + * resubmission using a timer. + */ + private class SubmitTaskToExecutor extends TimerTask { + + /** The task to resubmit. */ + private FutureExecutorTask task; + + /** + * Construct a new SubmitTaskToExecutor instance. + * + * @param task + * the task to resubmit. + */ + public SubmitTaskToExecutor(final FutureExecutorTask task) { + this.task = task; + } + + /** + * Submits the task to the executor. + */ + public void run() { + service.submit(task); + } + + } + +} diff --git a/src/main/java/gov/usgs/util/TimeoutProcess.java b/src/main/java/gov/usgs/util/TimeoutProcess.java index 87d009ef..2dd2b39b 100644 --- a/src/main/java/gov/usgs/util/TimeoutProcess.java +++ b/src/main/java/gov/usgs/util/TimeoutProcess.java @@ -1,6 +1,6 @@ /* * TimeoutProcess - * + * * $Id$ * $URL$ */ @@ -13,10 +13,10 @@ /** * TimeoutProcess wraps a Process object. - * + * * It is most commonly used with TimeoutProcessBuilder, which configures the * process timeout (and sets the timed out state once the timeout is reached). - * + * * @see java.lang.Process * @see TimeoutProcessBuilder * @see ProcessTimeoutException @@ -37,7 +37,7 @@ public class TimeoutProcess { /** * Construct a new TimeoutProcess. - * + * * @param process * the wrapped process. */ @@ -72,22 +72,25 @@ public OutputStream getOutputStream() { /** * Wait for the process to complete, either normally or because its timeout * was reached. - * + * * @return exitStatus. * @throws InterruptedException * @throws ProcessTimeoutException * if the process timed out before exiting. */ public int waitFor() throws InterruptedException, IOException, ProcessTimeoutException { - int status = process.waitFor(); - - if (timeoutElapsed()) { - throw new ProcessTimeoutException("The process has timed out."); - } + int status = -1; + try { + status = process.waitFor(); - if (timer != null) { - // the timer hasn't destroyed this process already, cancel it. - timer.cancel(); + if (timeoutElapsed()) { + throw new ProcessTimeoutException("The process has timed out."); + } + } finally { + if (timer != null) { + // the timer hasn't destroyed this process already, cancel it. + timer.cancel(); + } } try { From 4e6b66d07830eed8880a5eb463d5dd7b968738bb Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Mon, 30 Nov 2020 22:10:10 -0700 Subject: [PATCH 05/18] Update future exception handling to interrupt task --- src/main/java/gov/usgs/util/FutureExecutorTask.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/gov/usgs/util/FutureExecutorTask.java b/src/main/java/gov/usgs/util/FutureExecutorTask.java index 9b23df42..08f0acce 100644 --- a/src/main/java/gov/usgs/util/FutureExecutorTask.java +++ b/src/main/java/gov/usgs/util/FutureExecutorTask.java @@ -119,8 +119,8 @@ public void run() { } else { result = future.get(); } - } catch (InterruptedException e) { - // interrupt the future too + } finally { + // cancel whether successful (noop) or exception (interrupt callable) future.cancel(true); } From 458cb65bd1922dd776788e0218007d23645312a7 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Mon, 30 Nov 2020 23:19:07 -0700 Subject: [PATCH 06/18] Update Exception handling from future --- src/main/java/gov/usgs/util/FutureExecutorTask.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/main/java/gov/usgs/util/FutureExecutorTask.java b/src/main/java/gov/usgs/util/FutureExecutorTask.java index 08f0acce..8ba6d113 100644 --- a/src/main/java/gov/usgs/util/FutureExecutorTask.java +++ b/src/main/java/gov/usgs/util/FutureExecutorTask.java @@ -3,6 +3,7 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -99,6 +100,7 @@ public FutureExecutorTask(ExecutorService backgroundService, ExecutorService ser */ @Override public void run() { + Future future = null; try { if (done || cancelled || numTries >= maxTries) { // already done, cancelled, or out of attempts @@ -112,7 +114,7 @@ public void run() { runThread = Thread.currentThread(); // use future to manage timeout - Future future = backgroundService.submit(this.callable); + future = backgroundService.submit(this.callable); try { if (timeout > 0) { result = future.get(timeout, TimeUnit.MILLISECONDS); @@ -130,6 +132,13 @@ public void run() { // computed without exceptions, done done(); } catch (Exception e) { + if (e instanceof ExecutionException) { + // unpack cause + Throwable cause = e.getCause(); + if (cause != null && cause instanceof Exception) { + e = (Exception) e.getCause(); + } + } LOGGER.log(Level.INFO, "Exception executing task", e); // signal that we are not running runThread = null; From 828bd5d8c3e92d5fbb5162933a06d91677a59b25 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Tue, 1 Dec 2020 09:32:21 -0700 Subject: [PATCH 07/18] Try to speed up queueing process --- .../earthquake/aws/JsonNotificationIndex.java | 52 +++++++++++++++++++ .../ExecutorListenerNotifier.java | 30 +++++++++-- .../NotificationListenerCallable.java | 3 +- 3 files changed, 81 insertions(+), 4 deletions(-) diff --git a/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java b/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java index b8a30a5d..2bfd5afa 100644 --- a/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java +++ b/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java @@ -539,6 +539,58 @@ public synchronized List findNotifications(ProductId id) throws Ex return new ArrayList(); } + /** + * This method is used to find notifications present in this index + * but not present in another JsonNotificationIndex table in the same + * database. + * + * This is used to optimize the queuing process at startup and returns + * DefaultNotifications. The receiver process will look up the actual + * notification object during processing. + * + * @param otherTable + * name of table in same database. + * @return + * list of notifications found in this indexes table, but not found in the + * other table. + * @throws Exception + */ + public synchronized List getMissingNotifications( + final String otherTable) throws Exception { + // this is used to requeue a notification index. + // run query in a way that returns list of default notifications, + // (by returning empty created, data, and url) + // since full details are not needed during requeue + final String sql = "SELECT DISTINCT" + + " '' as created, t.expires, t.source, t.type, t.code, t.updateTime" + + ", '' as url, null as data" + + " FROM " + this.table + " t" + + " WHERE NOT EXISTS (" + + "SELECT * FROM " + otherTable + + " WHERE source=t.source AND type=t.type" + + " AND code=t.code AND updatetime=t.updateTime" + + ")"; + // prepare statement + beginTransaction(); + try (final PreparedStatement statement = getConnection().prepareStatement(sql)) { + try { + // execute and commit if successful + final List notifications = getNotifications(statement); + commitTransaction(); + return notifications; + } catch (SQLException e) { + LOGGER.log(Level.WARNING, "Exception finding notifications", e); + try { + // otherwise roll back + rollbackTransaction(); + } catch (SQLException e2) { + // ignore + } + } + } + return new ArrayList(); + } + /** * Parse notifications from a statement ready to be executed. */ diff --git a/src/main/java/gov/usgs/earthquake/distribution/ExecutorListenerNotifier.java b/src/main/java/gov/usgs/earthquake/distribution/ExecutorListenerNotifier.java index 5387e70f..245db2c5 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/ExecutorListenerNotifier.java +++ b/src/main/java/gov/usgs/earthquake/distribution/ExecutorListenerNotifier.java @@ -1,5 +1,6 @@ package gov.usgs.earthquake.distribution; +import gov.usgs.earthquake.aws.JsonNotificationIndex; import gov.usgs.earthquake.product.AbstractListener; import gov.usgs.util.DefaultConfigurable; import gov.usgs.util.ExecutorTask; @@ -16,6 +17,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; +import java.util.logging.Level; import java.util.logging.Logger; public class ExecutorListenerNotifier extends DefaultConfigurable implements @@ -204,9 +206,31 @@ public void startup() throws Exception { LOGGER.info("[" + receiver.getName() + "] requeueing notification index '" + index.getName() + "'"); // find all existing notifications - Iterator allNotifications = index.findNotifications( - (List) null, (List) null, (List) null) - .iterator(); + Iterator allNotifications = null; + + // for json index, push intersection into database if only one listener + if (index instanceof JsonNotificationIndex && gracefulListeners.size() == 1) { + NotificationIndex listenerIndex = + ((DefaultNotificationListener) gracefulListeners.get(0)) + .getNotificationIndex(); + if (listenerIndex instanceof JsonNotificationIndex) { + // get intersection + try { + allNotifications = + ((JsonNotificationIndex) index).getMissingNotifications( + ((JsonNotificationIndex) listenerIndex).getTable()).iterator(); + } catch (Exception e) { + LOGGER.log(Level.INFO, "Exception loading intersection, continuing", e); + } + } + } + + if (allNotifications == null) { + // fallback to previous behavior + allNotifications = index.findNotifications( + (List) null, (List) null, (List) null) + .iterator(); + } LOGGER.info("Done finding existing notifications"); // queue them for processing in case they were previous missed diff --git a/src/main/java/gov/usgs/earthquake/distribution/NotificationListenerCallable.java b/src/main/java/gov/usgs/earthquake/distribution/NotificationListenerCallable.java index 0847a58b..cfd97189 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/NotificationListenerCallable.java +++ b/src/main/java/gov/usgs/earthquake/distribution/NotificationListenerCallable.java @@ -17,7 +17,7 @@ public class NotificationListenerCallable implements Callable { /** * Create an ExecutorListenerNotifierCallable. - * + * * @param listener * the listener to notify * @param event @@ -34,6 +34,7 @@ public Void call() throws Exception { listener.onNotification(event); return null; } catch (Exception e) { + e.printStackTrace(); LOGGER.log(Level.WARNING, "[" + event.getNotificationReceiver().getName() + "] listener (" + listener.getName() From 21cc99925687295cb0bc36a32ad015bf926ce4d1 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Tue, 1 Dec 2020 13:22:35 -0700 Subject: [PATCH 08/18] Add retry for 503 slow down, url content check to uploads --- .../usgs/earthquake/aws/AwsProductSender.java | 28 +++++++++++++++++-- .../usgs/earthquake/aws/HttpException.java | 4 +-- .../NotificationListenerCallable.java | 1 - 3 files changed, 28 insertions(+), 5 deletions(-) diff --git a/src/main/java/gov/usgs/earthquake/aws/AwsProductSender.java b/src/main/java/gov/usgs/earthquake/aws/AwsProductSender.java index 4db4e834..f7300eba 100644 --- a/src/main/java/gov/usgs/earthquake/aws/AwsProductSender.java +++ b/src/main/java/gov/usgs/earthquake/aws/AwsProductSender.java @@ -122,7 +122,25 @@ public void sendProduct(final Product product) throws Exception { + " (" + (afterGetUploadUrls - start) + " ms) "); // upload contents - uploadContents(product, uploadProduct); + try { + uploadContents(product, uploadProduct); + } catch (HttpException e) { + HttpURLConnection connection = e.response.connection; + // check for S3 "503 Slow Down" error + if ( + 503 == connection.getResponseCode() + && "Slow Down".equals(connection.getResponseMessage()) + ) { + LOGGER.fine("[" + getName() + "] 503 slow down exception, trying again"); + // try again after random back off (1-5 s) + Thread.sleep(1000 + Math.round(4000 * Math.random())); + uploadContents(product, uploadProduct); + } else { + // otherwise propagate exception as usual + throw e; + } + } + afterUploadContent = new Date().getTime(); LOGGER.fine("[" + getName() + "] upload contents " + id.toString() + " (" + (afterUploadContent - afterGetUploadUrls) + " ms) "); @@ -316,12 +334,18 @@ protected Map uploadContents( .forEach( path -> { try { + Content uploadContent = uploadProduct.getContents().get(path); + if (!(uploadContent instanceof URLContent)) { + throw new IllegalStateException( + "Expected URLContent for " + product.getId().toString() + + " path '" + path + "' but got " + uploadContent); + } uploadResults.put( path, uploadContent( path, product.getContents().get(path), - ((URLContent) uploadProduct.getContents().get(path)).getURL())); + ((URLContent) uploadContent).getURL())); } catch (Exception e) { uploadExceptions.put(path, e); } diff --git a/src/main/java/gov/usgs/earthquake/aws/HttpException.java b/src/main/java/gov/usgs/earthquake/aws/HttpException.java index bcd8de82..2d7e6202 100644 --- a/src/main/java/gov/usgs/earthquake/aws/HttpException.java +++ b/src/main/java/gov/usgs/earthquake/aws/HttpException.java @@ -10,8 +10,8 @@ class HttpException extends Exception { public final HttpResponse response; - public HttpException(final HttpResponse response, final String cause) { - super(cause); + public HttpException(final HttpResponse response, final String message) { + super(message); this.response = response; } diff --git a/src/main/java/gov/usgs/earthquake/distribution/NotificationListenerCallable.java b/src/main/java/gov/usgs/earthquake/distribution/NotificationListenerCallable.java index cfd97189..244af3b9 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/NotificationListenerCallable.java +++ b/src/main/java/gov/usgs/earthquake/distribution/NotificationListenerCallable.java @@ -34,7 +34,6 @@ public Void call() throws Exception { listener.onNotification(event); return null; } catch (Exception e) { - e.printStackTrace(); LOGGER.log(Level.WARNING, "[" + event.getNotificationReceiver().getName() + "] listener (" + listener.getName() From c1c29434aa6611a45ef108858b7fcbfcac8a06ee Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Tue, 1 Dec 2020 14:41:39 -0700 Subject: [PATCH 09/18] Update done method to setDone, to avoid confusion with done member variable --- src/main/java/gov/usgs/util/ExecutorTask.java | 8 ++++---- src/main/java/gov/usgs/util/FutureExecutorTask.java | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/java/gov/usgs/util/ExecutorTask.java b/src/main/java/gov/usgs/util/ExecutorTask.java index c6501fda..7a03dd5f 100644 --- a/src/main/java/gov/usgs/util/ExecutorTask.java +++ b/src/main/java/gov/usgs/util/ExecutorTask.java @@ -187,7 +187,7 @@ public void run() { runThread = null; // computed without exceptions, done - done(); + setDone(); // } } catch (Exception e) { LOGGER.log(Level.INFO, "Exception executing task", e); @@ -210,7 +210,7 @@ public void run() { } } else { // cancelled or out of tries, done - done(); + setDone(); } // } } finally { @@ -223,7 +223,7 @@ public void run() { * Called when task is completed, either successfully, or unsuccessfully and * has no more tries */ - protected void done() { + protected void setDone() { // done running, either successfully or because out of tries done = true; // notify anyone waiting for task to complete @@ -247,7 +247,7 @@ public boolean cancel(boolean mayInterruptIfRunning) { // thread may still be running, if it doesn't handle interrupts well, // but Future interface says we are done - done(); + setDone(); return cancelled; } diff --git a/src/main/java/gov/usgs/util/FutureExecutorTask.java b/src/main/java/gov/usgs/util/FutureExecutorTask.java index 8ba6d113..17dceda4 100644 --- a/src/main/java/gov/usgs/util/FutureExecutorTask.java +++ b/src/main/java/gov/usgs/util/FutureExecutorTask.java @@ -130,7 +130,7 @@ public void run() { runThread = null; // computed without exceptions, done - done(); + setDone(); } catch (Exception e) { if (e instanceof ExecutionException) { // unpack cause @@ -158,7 +158,7 @@ public void run() { } } else { // cancelled or out of tries, done - done(); + setDone(); } } } From 077feb537c163bdb994ee227416b875ed6795306 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Tue, 1 Dec 2020 14:52:21 -0700 Subject: [PATCH 10/18] Bump version to 2.7.0 --- code.json | 4 ++-- .../java/gov/usgs/earthquake/distribution/ProductClient.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/code.json b/code.json index d6841545..41d7f67b 100644 --- a/code.json +++ b/code.json @@ -3,7 +3,7 @@ "name": "Product Distribution Layer", "organization": "U.S. Geological Survey", "description": "Distribution system used for derived earthquake information", - "version": "v2.6.1", + "version": "v2.7.0", "status": "Production", "permissions": { "usageType": "openSource", @@ -27,7 +27,7 @@ "email": "jmfee@usgs.gov" }, "date": { - "metadataLastUpdated": "2020-11-30" + "metadataLastUpdated": "2020-12-01" } } ] diff --git a/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java b/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java index d754f733..5f5fa019 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java +++ b/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java @@ -64,7 +64,7 @@ public class ProductClient extends DefaultConfigurable implements ProductClientMBean, Bootstrappable { /** The "release" version number. */ - public static final String RELEASE_VERSION = "Version 2.6.1 2020-11-30"; + public static final String RELEASE_VERSION = "Version 2.7.0 2020-12-01"; /** Property name used on products for current RELEASE_VERSION. */ public static final String PDL_CLIENT_VERSION_PROPERTY = "pdl-client-version"; From b507ee1eeed9df91e9060b7bc5da4c8fc32ff7b1 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Wed, 2 Dec 2020 16:37:08 -0700 Subject: [PATCH 11/18] Add retries for AWS calls --- .../usgs/earthquake/aws/AwsProductSender.java | 39 +++++++++++++++++-- 1 file changed, 36 insertions(+), 3 deletions(-) diff --git a/src/main/java/gov/usgs/earthquake/aws/AwsProductSender.java b/src/main/java/gov/usgs/earthquake/aws/AwsProductSender.java index f7300eba..a491209d 100644 --- a/src/main/java/gov/usgs/earthquake/aws/AwsProductSender.java +++ b/src/main/java/gov/usgs/earthquake/aws/AwsProductSender.java @@ -116,7 +116,24 @@ public void sendProduct(final Product product) throws Exception { ) { LOGGER.fine("Getting upload urls for " + json.toString()); // get upload urls, response is product with signed content urls for upload - Product uploadProduct = getUploadUrls(json); + Product uploadProduct; + try { + uploadProduct = getUploadUrls(json); + } catch (HttpException e) { + HttpURLConnection connection = e.response.connection; + // check for server error + if (connection.getResponseCode() >= 500) { + LOGGER.log(Level.FINE, + "[" + getName() + "] get upload urls exception, trying again", e); + // try again after random back off (1-5 s) + Thread.sleep(1000 + Math.round(4000 * Math.random())); + uploadProduct = getUploadUrls(json); + } else { + // otherwise propagate exception as usual + throw e; + } + } + final long afterGetUploadUrls = new Date().getTime(); LOGGER.fine("[" + getName() + "] get upload urls " + id.toString() + " (" + (afterGetUploadUrls - start) + " ms) "); @@ -148,8 +165,24 @@ public void sendProduct(final Product product) throws Exception { afterUploadContent = new Date().getTime(); } - // send product - sendProduct(json); + try { + // send product + sendProduct(json); + } catch (HttpException e) { + HttpURLConnection connection = e.response.connection; + // check for server error + if (connection.getResponseCode() >= 500) { + LOGGER.log(Level.FINE, + "[" + getName() + "] send product exception, trying again", e); + // try again after random back off (1-5 s) + Thread.sleep(1000 + Math.round(4000 * Math.random())); + sendProduct(json); + } else { + // otherwise propagate exception as usual + throw e; + } + } + final long afterSendProduct = new Date().getTime(); LOGGER.fine("[" + getName() + "] send product " + id.toString() + " (" + (afterSendProduct - afterUploadContent) + " ms) "); From 8623487c9dcfd0c6d2b4ec0a7a44eeb2f54a3f0a Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Wed, 2 Dec 2020 16:37:28 -0700 Subject: [PATCH 12/18] Move synchronization so modules can run in parallel --- .../gov/usgs/earthquake/indexer/Indexer.java | 39 ++++++++++++------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/src/main/java/gov/usgs/earthquake/indexer/Indexer.java b/src/main/java/gov/usgs/earthquake/indexer/Indexer.java index 488d041f..36db7cc4 100644 --- a/src/main/java/gov/usgs/earthquake/indexer/Indexer.java +++ b/src/main/java/gov/usgs/earthquake/indexer/Indexer.java @@ -276,7 +276,7 @@ public void removeModule(final IndexerModule toRemove) { * the product to summarize. * @return module best suited to summarize product. */ - protected synchronized IndexerModule getModule(final Product product) { + protected IndexerModule getModule(final Product product) { // mit is the module fetched off the iterator // m is the module to return IndexerModule mit = null, m = null; @@ -479,7 +479,7 @@ public boolean accept(final ProductId id) { * if an exception occurs. */ @Override - public synchronized void onProduct(final Product product) throws Exception { + public void onProduct(final Product product) throws Exception { onProduct(product, false); } @@ -494,8 +494,7 @@ public synchronized void onProduct(final Product product) throws Exception { * (true), or skip (false). * @throws Exception */ - public synchronized void onProduct(final Product product, - final boolean force) throws Exception { + public void onProduct(final Product product, final boolean force) throws Exception { ProductId id = product.getId(); // The notification to be sent when we are finished with this product @@ -548,6 +547,22 @@ public synchronized void onProduct(final Product product, // -- Step 3: Add product summary to the product index // -------------------------------------------------------------------// + try { + productSummary = indexProduct(productSummary, notification); + } finally { + final Date endIndex = new Date(); + LOGGER.fine("[" + getName() + "] indexer processed product id=" + + id.toString() + " in " + + (endIndex.getTime() - beginStore.getTime()) + " ms"); + } + } + + /** + * Add product summary to product index. + */ + protected synchronized ProductSummary indexProduct( + ProductSummary productSummary, + IndexerEvent notification) throws Exception { LOGGER.finest("[" + getName() + "] beginning index transaction"); // Start the product index transaction, only proceed if able productIndex.beginTransaction(); @@ -725,28 +740,24 @@ public synchronized void onProduct(final Product product, // send heartbeat info HeartbeatListener.sendHeartbeatMessage(getName(), - "indexed product", id.toString()); + "indexed product", productSummary.getId().toString()); + + // return summary after added to index + return productSummary; } catch (Exception e) { - LOGGER.log(Level.FINE, "[" + getName() - + "] rolling back transaction", e); + LOGGER.log(Level.FINE, "[" + getName() + "] rolling back transaction", e); // just rollback since it wasn't successful productIndex.rollbackTransaction(); // send heartbeat info HeartbeatListener.sendHeartbeatMessage(getName(), - "index exception", id.toString()); + "index exception", productSummary.getId().toString()); // send heartbeat info HeartbeatListener.sendHeartbeatMessage(getName(), "index exception class", e.getClass().getName()); throw e; - } finally { - final Date endIndex = new Date(); - LOGGER.fine("[" + getName() + "] indexer processed product id=" - + id.toString() + " in " + - (endIndex.getTime() - beginStore.getTime()) + " ms"); } - } /** From 41eb931287cf133b5a408810cbe21d8131690d60 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Wed, 2 Dec 2020 16:37:59 -0700 Subject: [PATCH 13/18] Update JDBCProductIndex to use fewer queries --- .../earthquake/indexer/JDBCProductIndex.java | 1722 +++++------------ 1 file changed, 470 insertions(+), 1252 deletions(-) diff --git a/src/main/java/gov/usgs/earthquake/indexer/JDBCProductIndex.java b/src/main/java/gov/usgs/earthquake/indexer/JDBCProductIndex.java index 3ac0f6a0..1d2ad0fd 100644 --- a/src/main/java/gov/usgs/earthquake/indexer/JDBCProductIndex.java +++ b/src/main/java/gov/usgs/earthquake/indexer/JDBCProductIndex.java @@ -3,36 +3,33 @@ */ package gov.usgs.earthquake.indexer; -import gov.usgs.earthquake.product.InvalidProductException; -import gov.usgs.earthquake.product.Product; import gov.usgs.earthquake.product.ProductId; import gov.usgs.earthquake.util.JDBCConnection; import gov.usgs.util.Config; import gov.usgs.util.JDBCUtils; import gov.usgs.util.StreamUtils; +import gov.usgs.util.StringUtils; import java.io.File; import java.math.BigDecimal; -import java.net.MalformedURLException; import java.net.URI; -import java.net.URISyntaxException; import java.net.URL; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.Statement; import java.sql.Types; import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Collectors; /** * JDBC Implementation of {@link ProductIndex}. @@ -81,29 +78,29 @@ public class JDBCProductIndex extends JDBCConnection implements ProductIndex { private static final String JDBC_CONNECTION_PREFIX = "jdbc:sqlite:"; /** Variables to store the event and product column names */ - private static final String EVENT_TABLE = "event"; + // private static final String EVENT_TABLE = "event"; private static final String EVENT_TABLE_ALIAS = "e"; - private static final String EVENT_INDEX_ID = "id"; - private static final String EVENT_CREATED = "created"; - private static final String EVENT_UPDATED = "updated"; - private static final String EVENT_SOURCE = "source"; - private static final String EVENT_SOURCE_CODE = "sourceCode"; + // private static final String EVENT_INDEX_ID = "id"; + // private static final String EVENT_CREATED = "created"; + // private static final String EVENT_UPDATED = "updated"; + // private static final String EVENT_SOURCE = "source"; + // private static final String EVENT_SOURCE_CODE = "sourceCode"; private static final String EVENT_TIME = "eventTime"; private static final String EVENT_LATITUDE = "latitude"; private static final String EVENT_LONGITUDE = "longitude"; private static final String EVENT_DEPTH = "depth"; private static final String EVENT_MAGNITUDE = "magnitude"; - private static final String EVENT_STATUS = "status"; + // private static final String EVENT_STATUS = "status"; private static final String EVENT_STATUS_UPDATE = "UPDATE"; private static final String EVENT_STATUS_DELETE = "DELETE"; private static final String SUMMARY_TABLE = "productSummary"; private static final String SUMMARY_TABLE_ALIAS = "p"; - private static final String SUMMARY_CREATED = "created"; + // private static final String SUMMARY_CREATED = "created"; public static final String SUMMARY_PRODUCT_INDEX_ID = "id"; private static final String SUMMARY_PRODUCT_ID = "productId"; - private static final String SUMMARY_EVENT_ID = "eventId"; + // private static final String SUMMARY_EVENT_ID = "eventId"; private static final String SUMMARY_TYPE = "type"; private static final String SUMMARY_SOURCE = "source"; private static final String SUMMARY_CODE = "code"; @@ -119,128 +116,14 @@ public class JDBCProductIndex extends JDBCConnection implements ProductIndex { private static final String SUMMARY_STATUS = "status"; private static final String SUMMARY_TRACKER_URL = "trackerURL"; private static final String SUMMARY_PREFERRED = "preferred"; - private static final String SUMMARY_PROPERTY_TABLE = "productSummaryProperty"; - private static final String SUMMARY_PROPERTY_ID = "productSummaryIndexId"; - private static final String SUMMARY_PROPERTY_NAME = "name"; - private static final String SUMMARY_PROPERTY_VALUE = "value"; - private static final String SUMMARY_LINK_TABLE = "productSummaryLink"; - private static final String SUMMARY_LINK_ID = "productSummaryIndexId"; - private static final String SUMMARY_LINK_RELATION = "relation"; - private static final String SUMMARY_LINK_URL = "url"; - - /** Query used to insert events */ - private static final String INSERT_EVENT_QUERY = String.format( - "INSERT INTO %s (%s) VALUES (?)", EVENT_TABLE, EVENT_CREATED); - - /** Query used to update preferred event properties. */ - private static final String UPDATE_EVENT_QUERY = String - .format("UPDATE %s SET %s=?, %s=?, %s=?, %s=?, %s=?, %s=?, %s=?, %s=?, %s=? WHERE %s=?", - EVENT_TABLE, EVENT_UPDATED, EVENT_SOURCE, - EVENT_SOURCE_CODE, EVENT_TIME, EVENT_LATITUDE, - EVENT_LONGITUDE, EVENT_DEPTH, EVENT_MAGNITUDE, - EVENT_STATUS, EVENT_INDEX_ID); - - /** Query used to update preferred event properties. */ - private static final String UPDATE_DELETED_EVENT_QUERY = String.format( - "UPDATE %s SET %s=? WHERE %s=?", EVENT_TABLE, EVENT_STATUS, - EVENT_INDEX_ID); - - /** Query used to insert product summaries */ - private static final String INSERT_SUMMARY_QUERY = String - .format("INSERT INTO %s ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s ) " - + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )", - SUMMARY_TABLE, SUMMARY_CREATED, SUMMARY_PRODUCT_ID, - SUMMARY_TYPE, SUMMARY_SOURCE, SUMMARY_CODE, - SUMMARY_UPDATE_TIME, SUMMARY_EVENT_SOURCE, - SUMMARY_EVENT_SOURCE_CODE, SUMMARY_EVENT_TIME, - SUMMARY_EVENT_LATITUDE, SUMMARY_EVENT_LONGITUDE, - SUMMARY_EVENT_DEPTH, SUMMARY_EVENT_MAGNITUDE, - SUMMARY_VERSION, SUMMARY_STATUS, SUMMARY_TRACKER_URL, - SUMMARY_PREFERRED); - - /** Query used to store the property */ - private static final String ADD_PROPERTY_QUERY = String.format( - "INSERT INTO %s ( %s, %s, %s ) " + "VALUES (?, ?, ?)", - SUMMARY_PROPERTY_TABLE, SUMMARY_PROPERTY_ID, SUMMARY_PROPERTY_NAME, - SUMMARY_PROPERTY_VALUE); - - /** Query used to store the link */ - private static final String ADD_LINK_QUERY = String.format( - "INSERT INTO %s ( %s, %s, %s ) " + "VALUES (?, ?, ?)", - SUMMARY_LINK_TABLE, SUMMARY_LINK_ID, SUMMARY_LINK_RELATION, - SUMMARY_LINK_URL); - - /** Query used to store the relation between products and events */ - private static final String ADD_ASSOCIATION_QUERY = String.format( - "UPDATE %s SET %s=? WHERE %s=? AND %s=? AND %s=?", SUMMARY_TABLE, - SUMMARY_EVENT_ID, SUMMARY_SOURCE, SUMMARY_TYPE, SUMMARY_CODE); - - /** Query to delete events */ - private static final String DELETE_EVENT_QUERY = String.format( - "DELETE FROM %s WHERE id=?", EVENT_TABLE); - - /** Query to delete products */ - private static final String DELETE_SUMMARY_QUERY = String.format( - "DELETE FROM %s WHERE id=?", SUMMARY_TABLE); - - /** Query to delete properties */ - private static final String DELETE_PROPERTIES_QUERY = String.format( - "DELETE FROM %s WHERE %s=?", SUMMARY_PROPERTY_TABLE, - SUMMARY_PROPERTY_ID); - - /** Query to delete links */ - private static final String DELETE_LINKS_QUERY = String.format( - "DELETE FROM %s WHERE %s=?", SUMMARY_LINK_TABLE, SUMMARY_LINK_ID); - - /** Query to remove the association between a product and an event */ - private static final String REMOVE_ASSOCIATION_QUERY = String.format( - "UPDATE %s SET %s=? WHERE %s=? AND %s=? AND %s=?", SUMMARY_TABLE, - SUMMARY_EVENT_ID, SUMMARY_SOURCE, SUMMARY_TYPE, SUMMARY_CODE); - - /** Query to get a summary using its id */ - private static final String GET_SUMMARY_BY_PRODUCT_INDEX_ID = String - .format("SELECT %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s FROM %s WHERE %s = ?", - SUMMARY_PRODUCT_ID, SUMMARY_TYPE, SUMMARY_SOURCE, - SUMMARY_CODE, SUMMARY_UPDATE_TIME, SUMMARY_EVENT_SOURCE, - SUMMARY_EVENT_SOURCE_CODE, SUMMARY_EVENT_TIME, - SUMMARY_EVENT_LATITUDE, SUMMARY_EVENT_LONGITUDE, - SUMMARY_EVENT_DEPTH, SUMMARY_EVENT_MAGNITUDE, - SUMMARY_VERSION, SUMMARY_STATUS, SUMMARY_TRACKER_URL, - SUMMARY_PREFERRED, SUMMARY_TABLE, SUMMARY_PRODUCT_INDEX_ID); - - /** Query to get product ids that share an event id */ - private static final String GET_SUMMARIES_BY_EVENT_INDEX_ID = String - .format("SELECT %s FROM %s WHERE %s = ?", SUMMARY_PRODUCT_INDEX_ID, - SUMMARY_TABLE, SUMMARY_EVENT_ID); - - /** Query to get all the links for a product */ - private static final String GET_LINKS_BY_PRODUCT_INDEX_ID = String.format( - "SELECT %s, %s FROM %s WHERE %s = ?", SUMMARY_LINK_RELATION, - SUMMARY_LINK_URL, SUMMARY_LINK_TABLE, SUMMARY_LINK_ID); - - /** Query to get all the properties for a product */ - private static final String GET_PROPS_BY_PRODUCT_INDEX_ID = String - .format("SELECT %s, %s FROM %s WHERE %s = ?", - SUMMARY_PROPERTY_NAME, SUMMARY_PROPERTY_VALUE, - SUMMARY_PROPERTY_TABLE, SUMMARY_PROPERTY_ID); - - /** Create some prepared statements */ - private PreparedStatement insertEvent; - private PreparedStatement updateEvent; - private PreparedStatement updateDeletedEvent; - private PreparedStatement insertSummary; - private PreparedStatement insertProperty; - private PreparedStatement insertLink; - private PreparedStatement addAssociation; - private PreparedStatement deleteEvent; - private PreparedStatement deleteSummary; - private PreparedStatement deleteProperties; - private PreparedStatement deleteLinks; - private PreparedStatement removeAssociation; - private PreparedStatement getSummary; - private PreparedStatement getSummaries; - private PreparedStatement getProductLinks; - private PreparedStatement getProductProperties; + // private static final String SUMMARY_PROPERTY_TABLE = "productSummaryProperty"; + // private static final String SUMMARY_PROPERTY_ID = "productSummaryIndexId"; + // private static final String SUMMARY_PROPERTY_NAME = "name"; + // private static final String SUMMARY_PROPERTY_VALUE = "value"; + // private static final String SUMMARY_LINK_TABLE = "productSummaryLink"; + // private static final String SUMMARY_LINK_ID = "productSummaryIndexId"; + // private static final String SUMMARY_LINK_RELATION = "relation"; + // private static final String SUMMARY_LINK_URL = "url"; private String driver; private String url; @@ -288,54 +171,6 @@ public void configure(Config config) throws Exception { } } - /** - * Connect to the database and set up some prepared statements - */ - @Override - public synchronized void startup() throws Exception { - // initialize connection - super.startup(); - Connection connection = getConnection(); - - // Prepare statements for interacting with the database - try { - insertEvent = connection.prepareStatement(INSERT_EVENT_QUERY, - new String[] { EVENT_INDEX_ID }); - } catch (SQLException e) { - // sqlite doesn't support RETURN_GENERATED_KEYS, but appears to - // return generated keys anyways - insertEvent = connection.prepareStatement(INSERT_EVENT_QUERY); - } - updateEvent = connection.prepareStatement(UPDATE_EVENT_QUERY); - updateDeletedEvent = connection - .prepareStatement(UPDATE_DELETED_EVENT_QUERY); - try { - insertSummary = connection.prepareStatement(INSERT_SUMMARY_QUERY, - new String[] { SUMMARY_PRODUCT_INDEX_ID }); - } catch (SQLException e) { - // sqlite doesn't support RETURN_GENERATED_KEYS, but appears to - // return generated keys anyways - insertSummary = connection.prepareStatement(INSERT_SUMMARY_QUERY); - } - insertProperty = connection.prepareStatement(ADD_PROPERTY_QUERY); - insertLink = connection.prepareStatement(ADD_LINK_QUERY); - deleteEvent = connection.prepareStatement(DELETE_EVENT_QUERY); - deleteSummary = connection.prepareStatement(DELETE_SUMMARY_QUERY); - deleteProperties = connection.prepareStatement(DELETE_PROPERTIES_QUERY); - deleteLinks = connection.prepareStatement(DELETE_LINKS_QUERY); - removeAssociation = connection - .prepareStatement(REMOVE_ASSOCIATION_QUERY); - addAssociation = connection.prepareStatement(ADD_ASSOCIATION_QUERY); - getSummary = connection - .prepareStatement(GET_SUMMARY_BY_PRODUCT_INDEX_ID); - getSummaries = connection - .prepareStatement(GET_SUMMARIES_BY_EVENT_INDEX_ID); - getProductLinks = connection - .prepareStatement(GET_LINKS_BY_PRODUCT_INDEX_ID); - getProductProperties = connection - .prepareStatement(GET_PROPS_BY_PRODUCT_INDEX_ID); - } - /** * Return a connection to the database. * @@ -371,163 +206,6 @@ public Connection connect() throws Exception { return JDBCUtils.getConnection(driver, url); } - /** - * Close the database connection and each of the prepared statements. Before - * closing each resource, this method checks if it is already closed. - */ - @Override - public synchronized void shutdown() throws Exception { - // Close each of the prepared statements, then close the connection. - // Make sure exceptions don't prevent closing of any statements. - - if (insertEvent != null) { - try { - insertEvent.close(); - } catch (Exception e) { - // ignore - } - insertEvent = null; - } - - if (updateEvent != null) { - try { - updateEvent.close(); - } catch (Exception e) { - // ignore - } - updateEvent = null; - } - - if (updateDeletedEvent != null) { - try { - updateDeletedEvent.close(); - } catch (Exception e) { - // ignore - } - updateDeletedEvent = null; - } - - if (insertSummary != null) { - try { - insertSummary.close(); - } catch (Exception e) { - // ignore - } - insertSummary = null; - } - - if (insertProperty != null) { - try { - insertProperty.close(); - } catch (Exception e) { - // ignore - } - insertProperty = null; - } - - if (insertLink != null) { - try { - insertLink.close(); - } catch (Exception e) { - // ignore - } - insertLink = null; - } - - if (deleteEvent != null) { - try { - deleteEvent.close(); - } catch (Exception e) { - // ignore - } - deleteEvent = null; - } - - if (deleteSummary != null) { - try { - deleteSummary.close(); - } catch (Exception e) { - // ignore - } - deleteSummary = null; - } - - if (deleteProperties != null) { - try { - deleteProperties.close(); - } catch (Exception e) { - // ignore - } - deleteProperties = null; - } - - if (deleteLinks != null) { - try { - deleteLinks.close(); - } catch (Exception e) { - // ignore - } - deleteLinks = null; - } - - if (removeAssociation != null) { - try { - removeAssociation.close(); - } catch (Exception e) { - // ignore - } - removeAssociation = null; - } - - if (addAssociation != null) { - try { - addAssociation.close(); - } catch (Exception e) { - // ignore - } - addAssociation = null; - } - - if (getSummary != null) { - try { - getSummary.close(); - } catch (Exception e) { - // ignore - } - getSummary = null; - } - - if (getSummaries != null) { - try { - getSummaries.close(); - } catch (Exception e) { - // ignore - } - getSummaries = null; - } - - if (getProductLinks != null) { - try { - getProductLinks.close(); - } catch (Exception e) { - // ignore - } - getProductLinks = null; - } - - if (getProductProperties != null) { - try { - getProductProperties.close(); - } catch (Exception e) { - // ignore - } - getProductProperties = null; - } - - // disconnect - super.shutdown(); - } - /** * Return all events from the database that meet the parameters specified in * the ProductIndexQuery object. @@ -539,18 +217,53 @@ public synchronized void shutdown() throws Exception { @Override public synchronized List getEvents(ProductIndexQuery query) throws Exception { - List events = new LinkedList(); + // map of events (index id => event), so products can be added incrementally + final Map events = new HashMap<>(); + // all products for loading details + ArrayList products = new ArrayList<>(); + + // Build up our clause list like always + // These clauses may only match certain products within events, + // and are used to find a list of event ids + List clauses = buildProductClauses(query); - // Get a list of event indexIds from the database that match this query - List eventIndexIds = getEventIndexIds(query); + // Build the SQL Query from our ProductIndexQuery object + String sql = "SELECT DISTINCT e.id" + + " FROM event e, productSummary p" + + " WHERE e.id=p.eventId"; + // Add all appropriate where clauses + for (final String clause : clauses) { + sql = sql + " AND " + clause; + } - Iterator iter = eventIndexIds.iterator(); - while (iter.hasNext()) { - Long eventIndexId = iter.next(); - events.add(getEvent(eventIndexId)); + // Now use query that finds event ids as sub-select and load all products + sql = "SELECT DISTINCT * FROM productSummary ps WHERE eventId IN (" + sql + ")"; + // load event products + try ( + final PreparedStatement statement = getConnection().prepareStatement(sql); + final ResultSet results = statement.executeQuery(); + ) { + while (results.next()) { + // eventid not part of product summary object, + // so need to do this as products are parsed... + final Long id = results.getLong("eventId"); + Event event = events.get(id); + if (event == null) { + // create event to hold products + event = new Event(); + event.setIndexId(id); + events.put(id, event); + } + final ProductSummary productSummary = parseProductSummary(results); + event.addProduct(productSummary); + products.add(productSummary); + } } - return events; + // load product details + loadProductSummaries(products); + + return events.values().stream().collect(Collectors.toList()); } /** @@ -563,40 +276,35 @@ public synchronized List getEvents(ProductIndexQuery query) @Override public synchronized Event addEvent(Event event) throws Exception { Event e = null; - ResultSet keys = null; - try { + final String sql = "INSERT INTO event (created) VALUES (?)"; + try ( + final PreparedStatement insertEvent = + getConnection().prepareStatement(sql, new String[] {"id"}); + ) { // Add the values to the prepared statement - JDBCUtils.setParameter(insertEvent, 1, new Date().getTime(), - Types.BIGINT); + JDBCUtils.setParameter(insertEvent, 1, new Date().getTime(), Types.BIGINT); // Execute the prepared statement int rows = insertEvent.executeUpdate(); if (rows == 1) { - keys = insertEvent.getGeneratedKeys(); long id = 0; - while (keys.next()) { - id = keys.getLong(1); + try (final ResultSet keys = insertEvent.getGeneratedKeys()) { + while (keys.next()) { + id = keys.getLong(1); + } + e = new Event(event); + e.setIndexId(id); } - e = new Event(event); - e.setIndexId(id); - LOGGER.finest("Added event id=" + id); } else { LOGGER.log(Level.WARNING, "[" + getName() + "] Exception when adding new event to database"); - throw new Exception(); - - } - } finally { - try { - keys.close(); - } catch (Exception e2) { + throw new Exception("Error adding new event to database"); } } - LOGGER.log(Level.FINEST, "[" + getName() - + "] Added event to Product Index"); + LOGGER.log(Level.FINEST, "[" + getName() + "] Added event to Product Index"); return e; } @@ -613,36 +321,33 @@ public synchronized List removeEvent(Event event) throws Exception { Long id = event.getIndexId(); - // If there is no index id on the event, we can assume its // not in the database if (id == null) { return null; } - // A list of all the productIds that got deleted - ArrayList productIds = new ArrayList(); + // remove event products + final List productIds = removeProductSummaries(event.getProductList()); - // We need to remove all the products associated with this event - List summaries = event.getProductList(); - Iterator summaryIter = summaries.iterator(); - while (summaryIter.hasNext()) { - ProductId productId = removeProductSummary(summaryIter.next()); - productIds.add(productId); - } + // and now remove event + final String sql = "DELETE FROM event WHERE id=?"; + try ( + final PreparedStatement deleteEvent = getConnection().prepareStatement(sql); + ) { + JDBCUtils.setParameter(deleteEvent, 1, id, Types.BIGINT); + int rows = deleteEvent.executeUpdate(); + // If we didn't delete a row, or we deleted more than 1 row, throw an + // exception + if (rows != 1) { + LOGGER.log(Level.WARNING, "[" + getName() + + "] Exception when deleting an event from the database"); + throw new Exception("Error deleting event from database"); + } - JDBCUtils.setParameter(deleteEvent, 1, id, Types.BIGINT); - int rows = deleteEvent.executeUpdate(); - // If we didn't delete a row, or we deleted more than 1 row, throw an - // exception - if (rows != 1) { - LOGGER.log(Level.WARNING, "[" + getName() - + "] Exception when deleting an event from the database"); - throw new Exception(); + LOGGER.finest("[" + getName() + "] Removed event id=" + id); } - LOGGER.finest("[" + getName() + "] Removed event id=" + id); - return productIds; } @@ -663,36 +368,26 @@ public synchronized List getUnassociatedProducts( "getUnassociatedProducts does not support SEARCH_EVENT_PREFERRED"); } - ArrayList products = new ArrayList(); + final ArrayList products = new ArrayList(); - List clauseList = buildProductClauses(query); + final List clauseList = buildProductClauses(query); // Add the unassociated quantifier to the clause list clauseList.add("eventId IS NULL"); - String query_text = buildProductQuery(clauseList); - - Statement statement = null; - ResultSet results = null; - try { - // Great. We have the query built up, so lets run it - statement = verifyConnection().createStatement(); - results = statement.executeQuery(query_text); + final String sql = buildProductQuery(clauseList); - // Now lets build an Event object from each row in the result set + try ( + final PreparedStatement statement = getConnection().prepareStatement(sql); + final ResultSet results = statement.executeQuery(); + ) { + // Now lets build product objects from each row in the result set while (results.next()) { - ProductSummary p = parseSummaryResult(results); - products.add(p); - } - } finally { - try { - results.close(); - } catch (Exception e) { - } - try { - statement.close(); - } catch (Exception e) { + products.add(parseProductSummary(results)); } } + // load properties and links + loadProductSummaries(products); + return products; } @@ -709,19 +404,24 @@ public synchronized List getUnassociatedProducts( @Override public synchronized List getProducts(ProductIndexQuery query) throws Exception { - if (query.getEventSearchType() == ProductIndexQuery.SEARCH_EVENT_PREFERRED) { - throw new IllegalArgumentException( - "getUnassociatedProducts does not support SEARCH_EVENT_PREFERRED"); + final List clauseList = buildProductClauses(query); + final String sql = buildProductQuery(clauseList); + + final List products = new LinkedList(); + try ( + final PreparedStatement statement = getConnection().prepareStatement(sql); + final ResultSet results = statement.executeQuery(); + ) { + // Now lets build product objects from each row in the result set + while (results.next()) { + products.add(parseProductSummary(results)); + } } - List summaries = new LinkedList(); + // load properties and links + loadProductSummaries(products); - Iterator summaryIndexIds = getSummaryIndexIds(query).iterator(); - while (summaryIndexIds.hasNext()) { - summaries.add(getSummary(summaryIndexIds.next())); - } - - return summaries; + return products; } /** @@ -737,89 +437,91 @@ public synchronized List getProducts(ProductIndexQuery query) public synchronized ProductSummary addProductSummary(ProductSummary summary) throws Exception { // Add values to the prepared statement + long productId = 0; + final ProductId sid = summary.getId(); + + final String sql = "INSERT INTO productSummary" + + "(created, productId, type, source, code" + + ", updateTime, eventSource, eventSourceCode, eventTime" + + ", eventLatitude, eventLongitude, eventDepth, eventMagnitude" + + ", version, status, trackerURL, preferred" + + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + try ( + final PreparedStatement insertSummary = + getConnection().prepareStatement(sql, new String[] {"id"}); + ) { + // Set the created timestamp + JDBCUtils.setParameter(insertSummary, 1, new Date().getTime(), + Types.BIGINT); - // Set the created timestamp - JDBCUtils.setParameter(insertSummary, 1, new Date().getTime(), - Types.BIGINT); + if (sid != null) { + JDBCUtils.setParameter(insertSummary, 2, sid.toString(), + Types.VARCHAR); + JDBCUtils.setParameter(insertSummary, 3, sid.getType(), + Types.VARCHAR); + JDBCUtils.setParameter(insertSummary, 4, sid.getSource(), + Types.VARCHAR); + JDBCUtils.setParameter(insertSummary, 5, sid.getCode(), + Types.VARCHAR); + JDBCUtils.setParameter(insertSummary, 6, + (sid.getUpdateTime() != null) ? sid.getUpdateTime() + .getTime() : null, Types.BIGINT); + } else { + // Summary product id is null. Set all these parameter to null + JDBCUtils.setParameter(insertSummary, 2, null, Types.VARCHAR); + JDBCUtils.setParameter(insertSummary, 3, null, Types.VARCHAR); + JDBCUtils.setParameter(insertSummary, 4, null, Types.VARCHAR); + JDBCUtils.setParameter(insertSummary, 5, null, Types.VARCHAR); + JDBCUtils.setParameter(insertSummary, 6, null, Types.BIGINT); + } - ProductId sid = summary.getId(); - if (sid != null) { - JDBCUtils.setParameter(insertSummary, 2, sid.toString(), + JDBCUtils.setParameter(insertSummary, 7, summary.getEventSource(), Types.VARCHAR); - JDBCUtils.setParameter(insertSummary, 3, sid.getType(), + JDBCUtils.setParameter(insertSummary, 8, summary.getEventSourceCode(), Types.VARCHAR); - JDBCUtils.setParameter(insertSummary, 4, sid.getSource(), + + Date eventTime = summary.getEventTime(); + JDBCUtils.setParameter(insertSummary, 9, + (eventTime != null) ? eventTime.getTime() : null, Types.BIGINT); + + JDBCUtils + .setParameter(insertSummary, 10, + (summary.getEventLatitude() != null) ? summary + .getEventLatitude().doubleValue() : null, + Types.DECIMAL); + JDBCUtils + .setParameter( + insertSummary, + 11, + (summary.getEventLongitude() != null) ? normalizeLongitude(summary + .getEventLongitude().doubleValue()) : null, + Types.DECIMAL); + JDBCUtils.setParameter(insertSummary, 12, + (summary.getEventDepth() != null) ? summary.getEventDepth() + .doubleValue() : null, Types.DECIMAL); + JDBCUtils.setParameter(insertSummary, 13, + (summary.getEventMagnitude() != null) ? summary + .getEventMagnitude().doubleValue() : null, + Types.DECIMAL); + JDBCUtils.setParameter(insertSummary, 14, summary.getVersion(), Types.VARCHAR); - JDBCUtils.setParameter(insertSummary, 5, sid.getCode(), + JDBCUtils.setParameter(insertSummary, 15, summary.getStatus(), Types.VARCHAR); - JDBCUtils.setParameter(insertSummary, 6, - (sid.getUpdateTime() != null) ? sid.getUpdateTime() - .getTime() : null, Types.BIGINT); - } else { - // Summary product id is null. Set all these parameter to null - JDBCUtils.setParameter(insertSummary, 2, null, Types.VARCHAR); - JDBCUtils.setParameter(insertSummary, 3, null, Types.VARCHAR); - JDBCUtils.setParameter(insertSummary, 4, null, Types.VARCHAR); - JDBCUtils.setParameter(insertSummary, 5, null, Types.VARCHAR); - JDBCUtils.setParameter(insertSummary, 6, null, Types.BIGINT); - } + JDBCUtils.setParameter(insertSummary, 16, + (summary.getTrackerURL() != null) ? summary.getTrackerURL() + .toString() : null, Types.VARCHAR); + JDBCUtils.setParameter(insertSummary, 17, summary.getPreferredWeight(), + Types.BIGINT); - JDBCUtils.setParameter(insertSummary, 7, summary.getEventSource(), - Types.VARCHAR); - JDBCUtils.setParameter(insertSummary, 8, summary.getEventSourceCode(), - Types.VARCHAR); - - Date eventTime = summary.getEventTime(); - JDBCUtils.setParameter(insertSummary, 9, - (eventTime != null) ? eventTime.getTime() : null, Types.BIGINT); - - JDBCUtils - .setParameter(insertSummary, 10, - (summary.getEventLatitude() != null) ? summary - .getEventLatitude().doubleValue() : null, - Types.DECIMAL); - JDBCUtils - .setParameter( - insertSummary, - 11, - (summary.getEventLongitude() != null) ? normalizeLongitude(summary - .getEventLongitude().doubleValue()) : null, - Types.DECIMAL); - JDBCUtils.setParameter(insertSummary, 12, - (summary.getEventDepth() != null) ? summary.getEventDepth() - .doubleValue() : null, Types.DECIMAL); - JDBCUtils.setParameter(insertSummary, 13, - (summary.getEventMagnitude() != null) ? summary - .getEventMagnitude().doubleValue() : null, - Types.DECIMAL); - JDBCUtils.setParameter(insertSummary, 14, summary.getVersion(), - Types.VARCHAR); - JDBCUtils.setParameter(insertSummary, 15, summary.getStatus(), - Types.VARCHAR); - JDBCUtils.setParameter(insertSummary, 16, - (summary.getTrackerURL() != null) ? summary.getTrackerURL() - .toString() : null, Types.VARCHAR); - JDBCUtils.setParameter(insertSummary, 17, summary.getPreferredWeight(), - Types.BIGINT); - - // Execute the prepared statement - insertSummary.executeUpdate(); - - ResultSet keys = null; - long productId = 0; + // Execute the prepared statement + insertSummary.executeUpdate(); - try { - keys = insertSummary.getGeneratedKeys(); - while (keys.next()) { - productId = keys.getLong(1); - } - } finally { - try { - keys.close(); - } catch (Exception e) { + try (final ResultSet keys = insertSummary.getGeneratedKeys()) { + while (keys.next()) { + productId = keys.getLong(1); + } } } - // Now that the summary is stored, lets try to store the properties addProductProperties(productId, summary.getProperties()); // And try to store the links @@ -845,25 +547,8 @@ public synchronized ProductSummary addProductSummary(ProductSummary summary) @Override public synchronized ProductId removeProductSummary(ProductSummary summary) throws Exception { - if (summary.getIndexId() != 0 && summary.getIndexId() != null) { - // First remove all the properties and links - long id = summary.getIndexId(); - removeProductProperties(id); - removeProductLinks(id); - - JDBCUtils.setParameter(deleteSummary, 1, id, Types.BIGINT); - deleteSummary.executeUpdate(); - - LOGGER.finest("[" + getName() + "] Removed productSummary id=" + id); - - // Return the id of the product deleted - return summary.getId(); - } else { - LOGGER.log(Level.WARNING, "[" + getName() - + "] Could not delete product summary. Index id not found"); - throw new Exception("[" + getName() - + "] Could not delete summary. Index id not found."); - } + List removed = removeProductSummaries(Arrays.asList(summary)); + return removed.get(0); } /** @@ -886,18 +571,22 @@ public synchronized Event addAssociation(Event event, ProductSummary summary) + "] Cannot add association between event or summary that are not already in index."); } - ProductId sid = summary.getId(); + final ProductId sid = summary.getId(); + final String sql = "UPDATE productSummary" + + " SET eventId=? WHERE source=? AND type=? AND code=?"; + try ( + final PreparedStatement addAssociation = getConnection().prepareStatement(sql); + ) { + JDBCUtils.setParameter(addAssociation, 1, event.getIndexId(), Types.BIGINT); + // these will target EVERY version of the given product + JDBCUtils.setParameter(addAssociation, 2, sid.getSource(), Types.VARCHAR); + JDBCUtils.setParameter(addAssociation, 3, sid.getType(), Types.VARCHAR); + JDBCUtils.setParameter(addAssociation, 4, sid.getCode(), Types.VARCHAR); - JDBCUtils.setParameter(addAssociation, 1, event.getIndexId(), - Types.BIGINT); - // these will target EVERY version of the given product - JDBCUtils.setParameter(addAssociation, 2, sid.getSource(), - Types.VARCHAR); - JDBCUtils.setParameter(addAssociation, 3, sid.getType(), Types.VARCHAR); - JDBCUtils.setParameter(addAssociation, 4, sid.getCode(), Types.VARCHAR); + addAssociation.executeUpdate(); + } - addAssociation.executeUpdate(); - Event e = new Event(event); + final Event e = new Event(event); e.addProduct(summary); LOGGER.log( Level.FINER, @@ -938,24 +627,29 @@ public synchronized Event removeAssociation(Event event, return event; } - ProductId sid = summary.getId(); - - // Now run the query - JDBCUtils.setParameter(removeAssociation, 1, null, Types.BIGINT); - // these will target EVERY version of the given product - JDBCUtils.setParameter(removeAssociation, 2, summary.getId() - .getSource(), Types.VARCHAR); - JDBCUtils.setParameter(removeAssociation, 3, summary.getId().getType(), - Types.VARCHAR); - JDBCUtils.setParameter(removeAssociation, 4, summary.getId().getCode(), - Types.VARCHAR); - - int rows = removeAssociation.executeUpdate(); - // Throw an exception if we didn't update any - if (rows < 1) { - LOGGER.log(Level.INFO, "[" + getName() - + "] Failed to remove an association in the Product Index"); - throw new Exception("Failed to remove association"); + final ProductId sid = summary.getId(); + final String sql = "UPDATE productSummary" + + " SET eventId=? WHERE source=? AND type=? AND code=?"; + try ( + final PreparedStatement removeAssociation = getConnection().prepareStatement(sql); + ) { + // Now run the query + JDBCUtils.setParameter(removeAssociation, 1, null, Types.BIGINT); + // these will target EVERY version of the given product + JDBCUtils.setParameter(removeAssociation, 2, summary.getId() + .getSource(), Types.VARCHAR); + JDBCUtils.setParameter(removeAssociation, 3, summary.getId().getType(), + Types.VARCHAR); + JDBCUtils.setParameter(removeAssociation, 4, summary.getId().getCode(), + Types.VARCHAR); + + int rows = removeAssociation.executeUpdate(); + // Throw an exception if we didn't update any + if (rows < 1) { + LOGGER.log(Level.INFO, "[" + getName() + + "] Failed to remove an association in the Product Index"); + throw new Exception("Failed to remove association"); + } } LOGGER.finer("[" + getName() + "] Removed associations event id=" @@ -996,430 +690,6 @@ public synchronized Event removeAssociation(Event event, // Protected Methods // ____________________________________ - /** - * Query the database to get the event with the given event index id - * - * @param eventIndexId - * @return Event object - * @throws SQLException - * @throws InvalidProductException - */ - protected synchronized Event getEvent(Long eventIndexId) - throws SQLException, InvalidProductException { - // Create an event object with its eventIndexId set - Event event = new Event(eventIndexId); - - // Find a list of summary index ids whose summaries are associated to - // the given eventIndexId - Iterator summaryIndexIds = getSummaryIndexIds(eventIndexId) - .iterator(); - - while (summaryIndexIds.hasNext()) { - // Create the product summary for each returned summary index id and - // add the created summary to the event - Long summaryIndexId = summaryIndexIds.next(); - event.addProduct(getSummary(summaryIndexId)); - } - - // Return our results. There may or may not be any products - return event; - } - - /** - * Query the database to get a list of event index ids that have products - * matching the given ProductIndexQuery. - * - * @param query - * @return List of index ids - * @throws Exception - */ - protected synchronized List getEventIndexIds(ProductIndexQuery query) - throws Exception { - // Object to return - List eventIndexIds = new LinkedList(); - - if (query == null) { - // a null query shouldn't match ALL events - return eventIndexIds; - } - - // Build up our clause list like always - List clauses = buildProductClauses(query); - - // Build the SQL Query from our ProductIndexQuery object - StringBuilder sql = new StringBuilder(); - sql.append("SELECT DISTINCT e."); - sql.append(EVENT_INDEX_ID); - sql.append(" FROM "); - sql.append(SUMMARY_TABLE).append(" p,"); - sql.append(EVENT_TABLE).append(" e"); - sql.append(" WHERE "); - // this join is effectively the same as SUMMARY_EVENT_ID IS NOT NULL - sql.append("e.").append(EVENT_INDEX_ID).append("=p.") - .append(SUMMARY_EVENT_ID); - - // Add all appropriate where clauses - Iterator clauseIter = clauses.iterator(); - while (clauseIter.hasNext()) { - sql.append(" AND "); - sql.append(clauseIter.next()); - } - - // Query the database. - Statement statement = null; - ResultSet results = null; - - try { - statement = verifyConnection().createStatement(); - results = statement.executeQuery(sql.toString()); - - // Loop over our results and add each eventIndexId to the list - while (results.next()) { - // EVENT_INDEX_ID - eventIndexIds.add(Long.valueOf(results.getLong(1))); - } - } finally { - try { - results.close(); - } catch (Exception e) { - } - try { - statement.close(); - } catch (Exception e) { - } - } - - // Return our result. Note this is never null but may be empty. - return eventIndexIds; - } - - /** - * Use the index id to get a ProductSummary from the database. - * - * @param summaryIndexId - * @return ProductSummary pulled from the database - * @throws SQLException - * @throws InvalidProductException - */ - protected synchronized ProductSummary getSummary(Long summaryIndexId) - throws SQLException, InvalidProductException { - ProductSummary summary = new ProductSummary(); - summary.setIndexId(summaryIndexId); - - // ------------------------------------------------------------------- - // -- Add basic summary information - // ------------------------------------------------------------------- - ResultSet results = null; - - try { - // Query the index for raw information - getSummary.setLong(1, summaryIndexId); - results = getSummary.executeQuery(); - - // Order of results is (taken from getSummary SQL) - // 1) SUMMARY_PRODUCT_ID, - // 2) SUMMARY_TYPE, - // 3) SUMMARY_SOURCE, - // 4) SUMMARY_CODE, - // 5) SUMMARY_UPDATE_TIME, - // 6) SUMMARY_EVENT_SOURCE, - // 7) SUMMARY_EVENT_SOURCE_CODE, - // 8) SUMMARY_EVENT_TIME, - // 9) SUMMARY_EVENT_LATITUDE, - // 10) SUMMARY_EVENT_LONGITUDE, - // 11) SUMMARY_EVENT_DEPTH, - // 12) SUMMARY_EVENT_MAGNITUDE, - // 13) SUMMARY_VERSION, - // 14) SUMMARY_STATUS, - // 15) SUMMARY_TRACKER_URL, - // 16) SUMMARY_PREFERRED - - // Parse the raw information and set the summary parameters - if (results.next()) { - try { - // SUMMARY_PRODUCT_ID - summary.setId(ProductId.parse(results.getString(1))); - } catch (NullPointerException npx) { - // Product ID not allowed to be null - // Remove from index? - LOGGER.log( - Level.WARNING, - "[" - + getName() - + "] Failed to get summary. Product ID was null, summary index id=" - + summaryIndexId, npx); - throw new InvalidProductException("Product ID was null", - npx); - } - - // Set some simple types. Null values are fine. - try { - // SUMMARY_EVENT_SOURCE - summary.setEventSource(results.getString(6)); - } catch (Exception e) { - // ignore - } - if (results.wasNull()) { - summary.setEventSource(null); - } - - try { - // SUMMARY_EVENT_SOURCE_CODE - summary.setEventSourceCode(results.getString(7)); - } catch (Exception e) { - // ignore - } - if (results.wasNull()) { - summary.setEventSourceCode(null); - } - - try { - // SUMMARY_EVENT_TIME - summary.setEventTime(new Date(results.getLong(8))); - } catch (Exception e) { - // ignore - } - if (results.wasNull()) { - summary.setEventTime(null); - } - - try { - // SUMMARY_EVENT_LATITUDE - summary.setEventLatitude(new BigDecimal(results - .getDouble(9))); - } catch (Exception e) { - // ignore - } - if (results.wasNull()) { - summary.setEventLatitude(null); - } - - try { - // SUMMARY_EVENT_LONGITUDE - summary.setEventLongitude(new BigDecimal(results - .getDouble(10))); - } catch (Exception e) { - // ignore - } - if (results.wasNull()) { - summary.setEventLongitude(null); - } - - try { - // SUMMARY_EVENT_DEPTH - summary.setEventDepth(new BigDecimal(results.getDouble(11))); - } catch (Exception e) { - // ignore - } - if (results.wasNull()) { - summary.setEventDepth(null); - } - - try { - // SUMMARY_EVENT_MAGNITUDE - summary.setEventMagnitude(new BigDecimal(results - .getDouble(12))); - } catch (Exception e) { - // ignore - } - if (results.wasNull()) { - summary.setEventMagnitude(null); - } - - // Set some more simple values - try { - // SUMMARY_VERSION - summary.setVersion(results.getString(13)); - } catch (Exception e) { - // ignore - } - if (results.wasNull()) { - summary.setVersion(null); - } - - try { - // SUMMARY_STATUS - summary.setStatus(results.getString(14)); - } catch (Exception e) { - // ignore - } - if (results.wasNull()) { - summary.setStatus(null); - } - - try { - // SUMMARY_TRACKER_URL - summary.setTrackerURL(new URL(results.getString(15))); - } catch (MalformedURLException mux) { - - // Tracker URL is not allowed to be null - // Log a message? - // Remove this product from the index? - - // Throw a more informative exception - LOGGER.log(Level.INFO, "[" + getName() - + "] Bad TrackerURL value", mux); - throw new InvalidProductException("[" + getName() - + "] Bad TrackerURL value", mux); - } - - // This will default to 0 if not set in index db - // SUMMARY_PREFERRED - summary.setPreferredWeight(results.getLong(16)); - } - } finally { - // must close result set to keep from blocking transaction - try { - results.close(); - } catch (Exception e) { - } - } - - // Add summary link information - summary.setLinks(getSummaryLinks(summaryIndexId)); - - // Add summary property information - Map properties = getSummaryProperties(summaryIndexId); - summary.setProperties(properties); - - // set numeric attributes based on string values to preserve original precision - if (properties.containsKey(Product.DEPTH_PROPERTY)) { - summary.setEventDepth(new BigDecimal( - properties.get(Product.DEPTH_PROPERTY))); - } - if (properties.containsKey(Product.LATITUDE_PROPERTY)) { - summary.setEventLatitude(new BigDecimal( - properties.get(Product.LATITUDE_PROPERTY))); - } - if (properties.containsKey(Product.LONGITUDE_PROPERTY)) { - summary.setEventLongitude(new BigDecimal( - properties.get(Product.LONGITUDE_PROPERTY))); - } - if (properties.containsKey(Product.MAGNITUDE_PROPERTY)) { - summary.setEventMagnitude(new BigDecimal( - properties.get(Product.MAGNITUDE_PROPERTY))); - } - - // Return our generated result. Note this is never null. - return summary; - } - - /** - * Use the event index id to get a list of all of the product summary ids - * associated with that event - * - * @param eventIndexId - * @return List of product index ids - * @throws SQLException - */ - protected synchronized List getSummaryIndexIds(Long eventIndexId) - throws SQLException { - // Create a list object to return - List summaryIndexIds = new LinkedList(); - - ResultSet results = null; - - try { - // Query database for a list of product summary index ids - getSummaries.setLong(1, eventIndexId.longValue()); - results = getSummaries.executeQuery(); - - // Add each product summary index id to our list - while (results.next()) { - // SUMMARY_PRODUCT_INDEX_ID - summaryIndexIds.add(Long.valueOf(results.getLong(1))); - } - } finally { - // must close result set to keep from blocking transaction - try { - results.close(); - } catch (Exception e) { - } - } - - // Return our results. Note this is never null but may be empty. - return summaryIndexIds; - } - - /** - * Query the database for a list of product summary index ids for summaries - * that match the given query. - * - * @param query - * @return List of product index ids - * @throws SQLException - */ - protected synchronized List getSummaryIndexIds(ProductIndexQuery query) - throws SQLException { - // Object to return - List summaryIndexIds = new LinkedList(); - - // Build up our clause list like always - List clauses = buildProductClauses(query); - - // Build the SQL Query from our ProductIndexQuery object - StringBuilder sql = new StringBuilder(); - sql.append("SELECT DISTINCT "); - sql.append(SUMMARY_PRODUCT_INDEX_ID); - sql.append(" FROM "); - sql.append(SUMMARY_TABLE).append(" ").append(SUMMARY_TABLE_ALIAS); - sql.append(" WHERE "); - sql.append(SUMMARY_TABLE_ALIAS).append(".") - .append(SUMMARY_PRODUCT_INDEX_ID); - sql.append(" IS NOT NULL"); - - // Add all appropriate where clauses - Iterator clauseIter = clauses.iterator(); - while (clauseIter.hasNext()) { - sql.append(" AND "); - sql.append(clauseIter.next()); - } - - String orderBy = query.getOrderBy(); - if (orderBy != null) { - sql.append(" ORDER BY " + orderBy); - } - - Integer limit = query.getLimit(); - if (limit != null) { - sql.append(" LIMIT " + limit); - } - - Statement statement = null; - ResultSet results = null; - try { - LOGGER.finest("[" + getName() + "] running query \n" - + sql.toString()); - // Query the database. - statement = verifyConnection().createStatement(); - results = statement.executeQuery(sql.toString()); - - // Loop over our results and add each eventIndexId to the list - while (results.next()) { - // SUMMARY_PRODUCT_INDEX_ID - summaryIndexIds.add(Long.valueOf(results.getLong(1))); - } - - LOGGER.finest("[" + getName() + "] query complete"); - } catch (Exception e) { - LOGGER.log(Level.WARNING, "[" + getName() - + "] exception querying index", e); - } finally { - // must close result set to keep from blocking transaction - try { - results.close(); - } catch (Exception e) { - } - try { - statement.close(); - } catch (Exception e) { - } - } - - // Return our result. Note this is never null but may be empty. - return summaryIndexIds; - } - /** * Build a list of all the pieces of the WHERE clause relevant to the * productSummary table. If the query doesn't set any properties, this @@ -1713,21 +983,88 @@ protected String buildProductQuery(List clauseList) { } /** - * Parse the next item in the result set into a ProductSummary object + * Populate links and properties for provided product summaries. + * + * @param summaries + * @throws Exception + */ + protected void loadProductSummaries(final List summaries) + throws Exception { + if (summaries.size() == 0) { + // nothing to load + return; + } + + // index by id + final Map summaryMap = new HashMap<>(); + for (final ProductSummary summary : summaries) { + summaryMap.put(summary.getIndexId(), summary); + } + + // load all links in one query + final String linkSql = "SELECT productSummaryIndexId as id, relation, url" + + " FROM productSummaryLink" + + " WHERE productSummaryIndexId IN (" + + StringUtils.join( + summaryMap.keySet().stream().collect(Collectors.toList()), + ",") + + ")"; + try ( + final PreparedStatement statement = verifyConnection().prepareStatement(linkSql); + final ResultSet results = statement.executeQuery(); + ) { + while (results.next()) { + Long id = results.getLong("id"); + String relation = results.getString("relation"); + String uri = results.getString("url"); + // add properties to existing objects + summaryMap.get(id).addLink(relation, new URI(uri)); + } + } + + // load all properties in one query + final String propertySql = "SELECT productSummaryIndexId as id, name, value" + + " FROM productSummaryProperty" + + " WHERE productSummaryIndexId IN (" + + StringUtils.join( + summaryMap.keySet().stream().collect(Collectors.toList()), + ",") + + ")"; + try ( + final PreparedStatement statement = + verifyConnection().prepareStatement(propertySql); + final ResultSet results = statement.executeQuery(); + ) { + while (results.next()) { + Long id = results.getLong("id"); + String name = results.getString("name"); + String value = results.getString("value"); + // add properties to existing objects + summaryMap.get(id).getProperties().put(name, value); + } + } + } + + /** + * Parse ProductSummary without loading links or properties. * * @param results - * @return ProductSummary object with attributes filled from database + * @return ProductSummary object without links or properties. * @throws Exception */ - protected ProductSummary parseSummaryResult(ResultSet results) + protected ProductSummary parseProductSummary(ResultSet results) throws Exception { ProductSummary p = new ProductSummary(); - p.setIndexId(results.getLong("id")); + p.setIndexId(results.getLong(SUMMARY_PRODUCT_INDEX_ID)); ProductId pid = ProductId.parse(results.getString(SUMMARY_PRODUCT_ID)); p.setId(pid); p.setEventSource(results.getString(SUMMARY_EVENT_SOURCE)); p.setEventSourceCode(results.getString(SUMMARY_EVENT_SOURCE_CODE)); - p.setEventTime(new Date(results.getLong(SUMMARY_EVENT_TIME))); + try { + p.setEventTime(new Date(results.getLong(SUMMARY_EVENT_TIME))); + } catch (Exception e) { + p.setEventTime(null); + } // getDouble() returns 0 if the value was actually NULL. In this case, // we are going to set the value to null @@ -1761,95 +1098,53 @@ protected ProductSummary parseSummaryResult(ResultSet results) results.getString(SUMMARY_TRACKER_URL)) : null); p.setPreferredWeight(results.getLong(SUMMARY_PREFERRED)); - // Set product links - Long indexId = p.getIndexId(); - ResultSet links = null; - try { - JDBCUtils.setParameter(getProductLinks, 1, indexId, Types.BIGINT); - links = getProductLinks.executeQuery(); - while (links.next()) { - p.addLink(links.getString(SUMMARY_LINK_RELATION), - new URI(links.getString(SUMMARY_LINK_URL))); - } - } finally { - try { - links.close(); // Free this result set - } catch (Exception e) { - } - } - - ResultSet props = null; - try { - // Set product properties - JDBCUtils.setParameter(getProductProperties, 1, indexId, - Types.BIGINT); - props = getProductProperties.executeQuery(); - Map properties = p.getProperties(); - while (props.next()) { - properties.put(props.getString(SUMMARY_PROPERTY_NAME), - props.getString(SUMMARY_PROPERTY_VALUE)); - } - p.setProperties(properties); - } finally { - try { - props.close(); - } catch (Exception e) { - } - } - return p; } - /** - * Look in the database for all the properties associated with the given - * product summary. - * - * @param summaryIndexId - * @return Map of property name to property value - * @throws SQLException - * @throws InvalidProductException - */ - protected synchronized Map getSummaryProperties( - Long summaryIndexId) throws SQLException, InvalidProductException { - // Create our object to populate and return - Map properties = new HashMap(); - - ResultSet results = null; - try { - getProductProperties.setLong(1, summaryIndexId.longValue()); - results = getProductProperties.executeQuery(); - while (results.next()) { - // SUMMARY_PROPERTY_NAME - String name = results.getString(1); - // SUMMARY_PROPERTY_VALUE - String value = results.getString(2); - - if (name == null || value == null) { - - // Both name and value are required - // Log something? - // Remove link from product index db? - InvalidProductException ipx = new InvalidProductException( - "Bad Product Property"); - ipx.fillInStackTrace(); - LOGGER.log(Level.INFO, "[" + getName() - + "] Bad Product Property", ipx); - throw ipx; - } - - // Add this link back to the map of links - properties.put(name, value); + public synchronized List removeProductSummaries( + final List summaries) throws Exception { + // index by id + final ArrayList ids = new ArrayList<>(); + // index by id + final Map summaryMap = new HashMap<>(); + for (final ProductSummary summary : summaries) { + if (summary.getIndexId() == null) { + LOGGER.log(Level.WARNING, "[" + getName() + + "] Could not delete product summary. Index id not found"); + throw new Exception("[" + getName() + + "] Could not delete summary. Index id not found."); } - } finally { - // must close result set to keep from blocking transaction - try { - results.close(); - } catch (Exception e) { + summaryMap.put(summary.getIndexId(), summary); + ids.add(summary.getId()); + } + + if (summaries.size() == 0) { + return ids; + } + + // remove all products in one query + // on delete cascade wasn't always set... + final String[] sqls = { + "DELETE FROM productSummaryLink WHERE productSummaryIndexId IN", + "DELETE FROM productSummaryProperty WHERE productSummaryIndexId IN", + "DELETE FROM productSummary WHERE id IN", + }; + final String idsIn =" (" + + StringUtils.join( + summaryMap.keySet().stream().collect(Collectors.toList()), + ",") + + ")"; + for (final String sql : sqls) { + try ( + final PreparedStatement statement = + verifyConnection().prepareStatement(sql + idsIn); + ) { + int rows = statement.executeUpdate(); + LOGGER.log(Level.FINER, "[" + getName() + "] removed " + rows + " rows"); } } - // Return our mapping of generated properties. Note this is never null - // but may be empty. - return properties; + + return ids; } /** @@ -1860,108 +1155,28 @@ protected synchronized Map getSummaryProperties( * @param properties * @throws SQLException */ - protected synchronized void addProductProperties(long productId, - Map properties) throws SQLException { + protected synchronized void addProductProperties(final long productId, + final Map properties) throws SQLException { // Loop through the properties list and add them all to the database - Set keys = properties.keySet(); - for (String key : keys) { - JDBCUtils.setParameter(insertProperty, 1, productId, Types.BIGINT); - JDBCUtils.setParameter(insertProperty, 2, key, Types.VARCHAR); - JDBCUtils.setParameter(insertProperty, 3, properties.get(key), - Types.VARCHAR); - - insertProperty.executeUpdate(); - if (LOGGER.isLoggable(Level.FINEST)) { - LOGGER.log(Level.FINEST, "[" + getName() + "] Added property " - + key + ":" + properties.get(key) + " for product " - + productId); - } - } - } - - /** - * Delete the given properties from the index - * - * @param productId - */ - protected synchronized void removeProductProperties(long productId) - throws Exception { - JDBCUtils.setParameter(deleteProperties, 1, productId, Types.BIGINT); - - deleteProperties.executeUpdate(); - } - - /** - * Look in the database for all the links associated with the given product - * summary. - * - * @param summaryIndexId - * @return Map of link relation (link type) to URL - * @throws SQLException - * @throws InvalidProductException - */ - protected synchronized Map> getSummaryLinks( - Long summaryIndexId) throws SQLException, InvalidProductException { - // Create our object to populate and return - Map> links = new HashMap>(); - - ResultSet results = null; - try { - getProductLinks.setLong(1, summaryIndexId.longValue()); - results = getProductLinks.executeQuery(); - - while (results.next()) { - // SUMMARY_LINK_RELATION - String relation = results.getString(1); - // SUMMARY_LINK_URL - String uriStr = results.getString(2); - - if (relation == null || uriStr == null) { - - // Both relation and uri are required - // Log something? - // Remove link from product index db? - - InvalidProductException ipx = new InvalidProductException( - "[" + getName() + "] Bad Product Link"); - ipx.fillInStackTrace(); - LOGGER.log(Level.INFO, "[" + getName() - + "] Bad Product link", ipx); - throw ipx; - } - List l = links.get(relation); - - // Case when no links for this relation yet - if (l == null) { - l = new LinkedList(); - } - - try { - l.add(new URI(uriStr)); - } catch (URISyntaxException usx) { - - // Link URI String in DB was malformed. - // Log something? - // Remove from index? - LOGGER.log(Level.INFO, "[" + getName() - + "] Bad Product Link", usx); - throw new InvalidProductException("[" + getName() - + "] Bad Product Link", usx); + final String sql = "INSERT INTO productSummaryProperty" + + " (productSummaryIndexId, name, value) VALUES (?, ?, ?)"; + try ( + final PreparedStatement insertProperty = getConnection().prepareStatement(sql); + ) { + for (String key : properties.keySet()) { + JDBCUtils.setParameter(insertProperty, 1, productId, Types.BIGINT); + JDBCUtils.setParameter(insertProperty, 2, key, Types.VARCHAR); + JDBCUtils.setParameter(insertProperty, 3, properties.get(key), + Types.VARCHAR); + insertProperty.addBatch(); + if (LOGGER.isLoggable(Level.FINEST)) { + LOGGER.log(Level.FINEST, "[" + getName() + "] Added property " + + key + ":" + properties.get(key) + " for product " + + productId); } - - // Add this link back to the map of links - links.put(relation, l); - } - } finally { - // must close result set to keep from blocking transaction - try { - results.close(); - } catch (Exception e) { } + insertProperty.executeBatch(); } - // Return our mapping of generated links. Note this is never null but - // may be empty. - return links; } /** @@ -1976,35 +1191,26 @@ protected synchronized Map> getSummaryLinks( protected synchronized void addProductLinks(long productId, Map> links) throws SQLException { // Loop through the properties list and add them all to the database - Set keys = links.keySet(); - for (String key : keys) { - List uris = links.get(key); - for (URI uri : uris) { - JDBCUtils.setParameter(insertLink, 1, productId, Types.BIGINT); - JDBCUtils.setParameter(insertLink, 2, key, Types.VARCHAR); - JDBCUtils.setParameter(insertLink, 3, uri.toString(), - Types.VARCHAR); - - insertLink.executeUpdate(); - LOGGER.log(Level.FINEST, "[" + getName() + "] Added link " - + key + ":" + uri.toString() + " for product " - + productId); + final String sql = "INSERT INTO productSummaryLink" + + " (productSummaryIndexId, relation, url) VALUES (?, ?, ?)"; + try ( + final PreparedStatement insertLink = getConnection().prepareStatement(sql); + ) { + for (final String relation : links.keySet()) { + for (final URI uri : links.get(relation)) { + JDBCUtils.setParameter(insertLink, 1, productId, Types.BIGINT); + JDBCUtils.setParameter(insertLink, 2, relation, Types.VARCHAR); + JDBCUtils.setParameter(insertLink, 3, uri.toString(), Types.VARCHAR); + insertLink.addBatch(); + LOGGER.log(Level.FINEST, "[" + getName() + "] Added link " + + relation + ":" + uri.toString() + " for product " + + productId); + } } + insertLink.executeBatch(); } } - /** - * Delete the given links from the index - * - * @param productId - */ - protected synchronized void removeProductLinks(long productId) - throws Exception { - JDBCUtils.setParameter(deleteLinks, 1, productId, Types.BIGINT); - - deleteLinks.executeUpdate(); - } - /** * Convert the given longitude to be between -180 and 180. If the given * value is already in the range, this method just returns the value. @@ -2061,89 +1267,101 @@ protected BigDecimal normalizeLongitude(BigDecimal lon) { public synchronized void eventsUpdated(List events) throws Exception { Long indexId = null; - Iterator iter = events.iterator(); - while (iter.hasNext()) { - Event updated = iter.next(); - - LOGGER.finer("[" + getName() + "] Updating event indexid=" - + updated.getIndexId()); - updated.log(LOGGER); + final String deletedSql = "UPDATE event SET status=? WHERE id=?"; + final String updatedSql = "UPDATE event" + + " SET updated=?, source=?, sourceCode=?, eventTime=?" + + " , latitude=?, longitude=?, depth=?, magnitude=?, status=?" + + " WHERE id=?"; - try { - indexId = updated.getIndexId(); - - if (updated.isDeleted()) { - // only update status if event deleted, leave other - // parameters intact - JDBCUtils.setParameter(updateDeletedEvent, 1, - EVENT_STATUS_DELETE, Types.VARCHAR); - JDBCUtils.setParameter(updateDeletedEvent, 2, indexId, - Types.BIGINT); - - updateDeletedEvent.executeUpdate(); - } else { - EventSummary summary = updated.getEventSummary(); - - // otherwise update event parameters - JDBCUtils.setParameter(updateEvent, 1, - new Date().getTime(), Types.BIGINT); - JDBCUtils.setParameter(updateEvent, 2, summary.getSource(), - Types.VARCHAR); - JDBCUtils.setParameter(updateEvent, 3, - summary.getSourceCode(), Types.VARCHAR); - - Long eventTime = null; - if (summary.getTime() != null) { - eventTime = summary.getTime().getTime(); - } - JDBCUtils.setParameter(updateEvent, 4, eventTime, - Types.BIGINT); + try ( + final PreparedStatement updateDeletedEvent = + getConnection().prepareStatement(deletedSql); + final PreparedStatement updateEvent = + getConnection().prepareStatement(updatedSql); + ) { - Double latitude = null; - if (summary.getLatitude() != null) { - latitude = summary.getLatitude().doubleValue(); - } - JDBCUtils.setParameter(updateEvent, 5, latitude, - Types.DOUBLE); - - Double longitude = null; - if (summary.getLongitude() != null) { - longitude = summary.getLongitude().doubleValue(); - } - JDBCUtils.setParameter(updateEvent, 6, longitude, - Types.DOUBLE); + Iterator iter = events.iterator(); + while (iter.hasNext()) { + Event updated = iter.next(); - // these may be null, handle carefully - Double depth = null; - if (summary.getDepth() != null) { - depth = summary.getDepth().doubleValue(); - } - JDBCUtils.setParameter(updateEvent, 7, depth, Types.DOUBLE); + indexId = updated.getIndexId(); + LOGGER.finer("[" + getName() + "] Updating event indexid=" + indexId); + updated.log(LOGGER); - Double magnitude = null; - if (summary.getMagnitude() != null) { - magnitude = summary.getMagnitude().doubleValue(); + try { + if (updated.isDeleted()) { + // only update status if event deleted, leave other + // parameters intact + JDBCUtils.setParameter(updateDeletedEvent, 1, + EVENT_STATUS_DELETE, Types.VARCHAR); + JDBCUtils.setParameter(updateDeletedEvent, 2, indexId, + Types.BIGINT); + + updateDeletedEvent.executeUpdate(); + } else { + EventSummary summary = updated.getEventSummary(); + + // otherwise update event parameters + JDBCUtils.setParameter(updateEvent, 1, + new Date().getTime(), Types.BIGINT); + JDBCUtils.setParameter(updateEvent, 2, summary.getSource(), + Types.VARCHAR); + JDBCUtils.setParameter(updateEvent, 3, + summary.getSourceCode(), Types.VARCHAR); + + Long eventTime = null; + if (summary.getTime() != null) { + eventTime = summary.getTime().getTime(); + } + JDBCUtils.setParameter(updateEvent, 4, eventTime, + Types.BIGINT); + + Double latitude = null; + if (summary.getLatitude() != null) { + latitude = summary.getLatitude().doubleValue(); + } + JDBCUtils.setParameter(updateEvent, 5, latitude, + Types.DOUBLE); + + Double longitude = null; + if (summary.getLongitude() != null) { + longitude = summary.getLongitude().doubleValue(); + } + JDBCUtils.setParameter(updateEvent, 6, longitude, + Types.DOUBLE); + + // these may be null, handle carefully + Double depth = null; + if (summary.getDepth() != null) { + depth = summary.getDepth().doubleValue(); + } + JDBCUtils.setParameter(updateEvent, 7, depth, Types.DOUBLE); + + Double magnitude = null; + if (summary.getMagnitude() != null) { + magnitude = summary.getMagnitude().doubleValue(); + } + JDBCUtils.setParameter(updateEvent, 8, magnitude, + Types.DOUBLE); + + JDBCUtils.setParameter(updateEvent, 9, EVENT_STATUS_UPDATE, + Types.VARCHAR); + + JDBCUtils.setParameter(updateEvent, 10, indexId, + Types.BIGINT); + + updateEvent.executeUpdate(); } - JDBCUtils.setParameter(updateEvent, 8, magnitude, - Types.DOUBLE); - JDBCUtils.setParameter(updateEvent, 9, EVENT_STATUS_UPDATE, - Types.VARCHAR); - - JDBCUtils.setParameter(updateEvent, 10, indexId, - Types.BIGINT); - - updateEvent.executeUpdate(); + LOGGER.log(Level.FINEST, "[" + getName() + + "] Updated event properties in Product Index"); + } catch (Exception e) { + LOGGER.log(Level.WARNING, "[" + getName() + + "] Error updating event properties, eventid=" + + indexId, e); + // trigger a rollback + throw e; } - - LOGGER.log(Level.FINEST, "[" + getName() - + "] Updated event properties in Product Index"); - } catch (Exception e) { - LOGGER.log(Level.WARNING, "[" + getName() - + "] Error updating event properties, eventid=" - + indexId, e); - // trigger a rollback - throw e; } } } From ecabcbbc7bdc9ec5ea9f639e8bd424047c0aa1af Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Wed, 2 Dec 2020 23:35:28 -0700 Subject: [PATCH 14/18] Add Indexer onBeforeProcessNotification, optimize hasProductBeenIndexed --- .../gov/usgs/earthquake/indexer/Indexer.java | 44 ++++++++++++++----- .../earthquake/indexer/JDBCProductIndex.java | 21 ++++++++- 2 files changed, 52 insertions(+), 13 deletions(-) diff --git a/src/main/java/gov/usgs/earthquake/indexer/Indexer.java b/src/main/java/gov/usgs/earthquake/indexer/Indexer.java index 36db7cc4..ab925805 100644 --- a/src/main/java/gov/usgs/earthquake/indexer/Indexer.java +++ b/src/main/java/gov/usgs/earthquake/indexer/Indexer.java @@ -7,6 +7,7 @@ import gov.usgs.earthquake.distribution.DefaultNotificationListener; import gov.usgs.earthquake.distribution.FileProductStorage; import gov.usgs.earthquake.distribution.HeartbeatListener; +import gov.usgs.earthquake.distribution.Notification; import gov.usgs.earthquake.distribution.ProductAlreadyInStorageException; import gov.usgs.earthquake.distribution.ProductStorage; import gov.usgs.earthquake.geoserve.ANSSRegionsFactory; @@ -419,8 +420,15 @@ protected boolean hasProductBeenIndexed(final ProductId id) { alreadyProcessedQuery.setMinProductUpdateTime(id.getUpdateTime()); alreadyProcessedQuery.setMaxProductUpdateTime(id.getUpdateTime()); - List existingSummary = productIndex - .getProducts(alreadyProcessedQuery); + final List existingSummary; + if (productIndex instanceof JDBCProductIndex) { + // only checking if exists, use one query instead of multiple. + existingSummary = ((JDBCProductIndex) productIndex) + .getProducts(alreadyProcessedQuery, false); + } else { + existingSummary = productIndex.getProducts(alreadyProcessedQuery); + } + if (existingSummary.size() > 0 && existingSummary.get(0).getId().equals(id)) { // it is in the product index @@ -463,6 +471,20 @@ public boolean accept(final ProductId id) { return superAccept; } + /** + * Check whether to skip products that have already been indexed. + */ + @Override + protected boolean onBeforeProcessNotification(Notification notification) throws Exception { + // try to short-circuit duplicates + if (!isProcessDuplicates() && hasProductBeenIndexed(notification.getProductId())) { + LOGGER.fine("[" + getName() + " notification already indexed, skipping"); + return false; + } + // otherwise, use default behavior + return super.onBeforeProcessNotification(notification); + } + /** * This method receives a product from Product Distribution and adds it to * the index. @@ -497,10 +519,6 @@ public void onProduct(final Product product) throws Exception { public void onProduct(final Product product, final boolean force) throws Exception { ProductId id = product.getId(); - // The notification to be sent when we are finished with this product - IndexerEvent notification = new IndexerEvent(this); - notification.setIndex(getProductIndex()); - // -------------------------------------------------------------------// // -- Step 1: Store product // -------------------------------------------------------------------// @@ -541,14 +559,13 @@ public void onProduct(final Product product, final boolean force) throws Excepti // Use this module to summarize the product ProductSummary productSummary = module.getProductSummary(product); - notification.setSummary(productSummary); // -------------------------------------------------------------------// // -- Step 3: Add product summary to the product index // -------------------------------------------------------------------// try { - productSummary = indexProduct(productSummary, notification); + productSummary = indexProduct(productSummary); } finally { final Date endIndex = new Date(); LOGGER.fine("[" + getName() + "] indexer processed product id=" @@ -561,9 +578,14 @@ public void onProduct(final Product product, final boolean force) throws Excepti * Add product summary to product index. */ protected synchronized ProductSummary indexProduct( - ProductSummary productSummary, - IndexerEvent notification) throws Exception { + ProductSummary productSummary) throws Exception { LOGGER.finest("[" + getName() + "] beginning index transaction"); + + // The notification to be sent when we are finished with this product + IndexerEvent notification = new IndexerEvent(this); + notification.setIndex(getProductIndex()); + notification.setSummary(productSummary); + // Start the product index transaction, only proceed if able productIndex.beginTransaction(); @@ -571,10 +593,10 @@ protected synchronized ProductSummary indexProduct( LOGGER.finer("[" + getName() + "] finding previous version"); // Check index for previous version of this product ProductSummary prevSummary = getPrevProductVersion(productSummary); - boolean redundantProduct = isRedundantProduct(prevSummary, productSummary); LOGGER.finer("[" + getName() + "] finding previous event"); Event prevEvent = null; + boolean redundantProduct = isRedundantProduct(prevSummary, productSummary); if (!redundantProduct) { // Skip association queries and use existing product association // performed in next branch (should be associated already if diff --git a/src/main/java/gov/usgs/earthquake/indexer/JDBCProductIndex.java b/src/main/java/gov/usgs/earthquake/indexer/JDBCProductIndex.java index 1d2ad0fd..92e79424 100644 --- a/src/main/java/gov/usgs/earthquake/indexer/JDBCProductIndex.java +++ b/src/main/java/gov/usgs/earthquake/indexer/JDBCProductIndex.java @@ -404,6 +404,21 @@ public synchronized List getUnassociatedProducts( @Override public synchronized List getProducts(ProductIndexQuery query) throws Exception { + // load full product summaries by default + return getProducts(query, true); + } + + /** + * Load product summaries. + * + * @param query + * product query + * @param loadDetails + * whether to call {@link #loadProductSummaries(List)}, + * which loads links and properties with additional queries. + */ + public synchronized List getProducts(ProductIndexQuery query, final boolean loadDetails) + throws Exception { final List clauseList = buildProductClauses(query); final String sql = buildProductQuery(clauseList); @@ -418,8 +433,10 @@ public synchronized List getProducts(ProductIndexQuery query) } } - // load properties and links - loadProductSummaries(products); + if (loadDetails) { + // load properties and links + loadProductSummaries(products); + } return products; } From de4e4ff73cd9652f9780631f43d20e9cb220d996 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Wed, 2 Dec 2020 23:59:26 -0700 Subject: [PATCH 15/18] Update Indexer onBeforeProcessNotification logging --- src/main/java/gov/usgs/earthquake/indexer/Indexer.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/gov/usgs/earthquake/indexer/Indexer.java b/src/main/java/gov/usgs/earthquake/indexer/Indexer.java index ab925805..a374c614 100644 --- a/src/main/java/gov/usgs/earthquake/indexer/Indexer.java +++ b/src/main/java/gov/usgs/earthquake/indexer/Indexer.java @@ -478,7 +478,9 @@ public boolean accept(final ProductId id) { protected boolean onBeforeProcessNotification(Notification notification) throws Exception { // try to short-circuit duplicates if (!isProcessDuplicates() && hasProductBeenIndexed(notification.getProductId())) { - LOGGER.fine("[" + getName() + " notification already indexed, skipping"); + LOGGER.finer( + "[" + getName() + "] notification already indexed, skipping " + + notification.getProductId().toString()); return false; } // otherwise, use default behavior From 750b049d539a3d9a5b292298b4860330cdc76e48 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Thu, 3 Dec 2020 00:28:05 -0700 Subject: [PATCH 16/18] Measure and log time for indexer to enter sync block --- .../gov/usgs/earthquake/indexer/Indexer.java | 31 +++++++++++++------ 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/src/main/java/gov/usgs/earthquake/indexer/Indexer.java b/src/main/java/gov/usgs/earthquake/indexer/Indexer.java index a374c614..c99a7679 100644 --- a/src/main/java/gov/usgs/earthquake/indexer/Indexer.java +++ b/src/main/java/gov/usgs/earthquake/indexer/Indexer.java @@ -142,6 +142,9 @@ public class Indexer extends DefaultNotificationListener { /** Task for archive policy thread. */ private TimerTask archiveTask = null; + /** Synchronization object for indexing. */ + private final Object indexProductSync = new Object(); + /** * Service used by FutureExecutorTask for execution. * See distribution.FutureListenerNotifier for more details. @@ -524,7 +527,7 @@ public void onProduct(final Product product, final boolean force) throws Excepti // -------------------------------------------------------------------// // -- Step 1: Store product // -------------------------------------------------------------------// - final Date beginStore = new Date(); + final long beginStore = new Date().getTime(); try { LOGGER.finest("[" + getName() + "] storing product id=" + id.toString()); @@ -545,10 +548,10 @@ public void onProduct(final Product product, final boolean force) throws Excepti return; } } - final Date endStore = new Date(); + final long endStore = new Date().getTime(); LOGGER.fine("[" + getName() + "] indexer downloaded product id=" + id.toString() + " in " + - (endStore.getTime() - beginStore.getTime()) + " ms"); + (endStore - beginStore) + " ms"); // -------------------------------------------------------------------// // -- Step 2: Use product module to summarize product @@ -566,13 +569,21 @@ public void onProduct(final Product product, final boolean force) throws Excepti // -- Step 3: Add product summary to the product index // -------------------------------------------------------------------// - try { - productSummary = indexProduct(productSummary); - } finally { - final Date endIndex = new Date(); - LOGGER.fine("[" + getName() + "] indexer processed product id=" - + id.toString() + " in " + - (endIndex.getTime() - beginStore.getTime()) + " ms"); + + // measure time waiting to enter synchronized block + final long beforeEnterSync = new Date().getTime(); + synchronized (indexProductSync) { + final long afterEnterSync = new Date().getTime(); + + try { + productSummary = indexProduct(productSummary); + } finally { + final long endIndex = new Date().getTime(); + LOGGER.fine("[" + getName() + "] indexer processed product id=" + + id.toString() + " in " + + (endIndex - beginStore) + " ms" + + " (" + (afterEnterSync - beforeEnterSync) + " ms sync delay)"); + } } } From 048064f9aebef9d3115579b8a9b11cb78e56fac0 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Thu, 3 Dec 2020 09:47:55 -0700 Subject: [PATCH 17/18] Use existing cause variable --- src/main/java/gov/usgs/util/FutureExecutorTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/gov/usgs/util/FutureExecutorTask.java b/src/main/java/gov/usgs/util/FutureExecutorTask.java index 17dceda4..c4342f03 100644 --- a/src/main/java/gov/usgs/util/FutureExecutorTask.java +++ b/src/main/java/gov/usgs/util/FutureExecutorTask.java @@ -136,7 +136,7 @@ public void run() { // unpack cause Throwable cause = e.getCause(); if (cause != null && cause instanceof Exception) { - e = (Exception) e.getCause(); + e = (Exception) cause; } } LOGGER.log(Level.INFO, "Exception executing task", e); From 09987db6521a7f108a4a9476e95957b9b80fa151 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Thu, 3 Dec 2020 10:28:05 -0700 Subject: [PATCH 18/18] Update release date --- code.json | 2 +- .../java/gov/usgs/earthquake/distribution/ProductClient.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/code.json b/code.json index 41d7f67b..693aaefc 100644 --- a/code.json +++ b/code.json @@ -27,7 +27,7 @@ "email": "jmfee@usgs.gov" }, "date": { - "metadataLastUpdated": "2020-12-01" + "metadataLastUpdated": "2020-12-03" } } ] diff --git a/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java b/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java index 5f5fa019..3d8fec86 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java +++ b/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java @@ -64,7 +64,7 @@ public class ProductClient extends DefaultConfigurable implements ProductClientMBean, Bootstrappable { /** The "release" version number. */ - public static final String RELEASE_VERSION = "Version 2.7.0 2020-12-01"; + public static final String RELEASE_VERSION = "Version 2.7.0 2020-12-03"; /** Property name used on products for current RELEASE_VERSION. */ public static final String PDL_CLIENT_VERSION_PROPERTY = "pdl-client-version";