From 7a1c57addf965f743482bed8195d26483f013ec7 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Fri, 18 Dec 2020 09:38:10 -0700 Subject: [PATCH 01/11] Update how database synchronization is handled to provide better transaction isolation --- .../earthquake/aws/JsonNotificationIndex.java | 22 +- .../earthquake/aws/JsonProductStorage.java | 6 +- .../usgs/earthquake/aws/TrackingIndex.java | 6 +- .../distribution/JDBCNotificationIndex.java | 214 ++++--- .../gov/usgs/earthquake/indexer/Indexer.java | 535 +++++++++--------- .../earthquake/indexer/JDBCProductIndex.java | 60 +- .../usgs/earthquake/indexer/ProductIndex.java | 41 +- .../usgs/earthquake/util/JDBCConnection.java | 50 +- 8 files changed, 497 insertions(+), 437 deletions(-) diff --git a/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java b/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java index 2bfd5afa..ab5d2ee7 100644 --- a/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java +++ b/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java @@ -19,7 +19,6 @@ import javax.json.Json; - import gov.usgs.earthquake.distribution.DefaultNotification; import gov.usgs.earthquake.distribution.Notification; import gov.usgs.earthquake.distribution.NotificationIndex; @@ -215,7 +214,7 @@ public void createSchema() throws Exception { * TrackerURLs are ignored. */ @Override - public synchronized void addNotification(Notification notification) + public void addNotification(Notification notification) throws Exception { // all notifications Instant expires = notification.getExpirationDate().toInstant(); @@ -276,7 +275,7 @@ public synchronized void addNotification(Notification notification) * Tracker URLs are ignored. */ @Override - public synchronized void removeNotification(Notification notification) throws Exception { + public void removeNotification(Notification notification) throws Exception { // all notifications Instant expires = notification.getExpirationDate().toInstant(); ProductId id = notification.getProductId(); @@ -339,7 +338,7 @@ public synchronized void removeNotification(Notification notification) throws Ex * @return list with matching notifications, empty if not found. */ @Override - public synchronized List findNotifications( + public List findNotifications( String source, String type, String code) throws Exception { final ArrayList where = new ArrayList(); final ArrayList values = new ArrayList(); @@ -398,7 +397,7 @@ public synchronized List findNotifications( * @return list with matching notifications, empty if not found. */ @Override - public synchronized List findNotifications( + public List findNotifications( List sources, List types, List codes) throws Exception { final ArrayList where = new ArrayList(); @@ -474,7 +473,7 @@ public synchronized List findNotifications( * @return list with matching notifications, empty if not found. */ @Override - public synchronized List findExpiredNotifications() throws Exception { + public List findExpiredNotifications() throws Exception { final String sql = "SELECT * FROM " + this.table + " WHERE expires <= ?"; // prepare statement beginTransaction(); @@ -498,7 +497,6 @@ public synchronized List findExpiredNotifications() throws Excepti } } return new ArrayList(); - } /** @@ -509,7 +507,7 @@ public synchronized List findExpiredNotifications() throws Excepti * @return list with matching notifications, empty if not found. */ @Override - public synchronized List findNotifications(ProductId id) throws Exception { + public List findNotifications(ProductId id) throws Exception { final String sql = "SELECT * FROM " + this.table + " WHERE source=? AND type=? AND code=? AND updatetime=?"; // prepare statement @@ -555,8 +553,8 @@ public synchronized List findNotifications(ProductId id) throws Ex * other table. * @throws Exception */ - public synchronized List getMissingNotifications( - final String otherTable) throws Exception { + public List getMissingNotifications( final String otherTable) + throws Exception { // this is used to requeue a notification index. // run query in a way that returns list of default notifications, // (by returning empty created, data, and url) @@ -593,8 +591,10 @@ public synchronized List getMissingNotifications( /** * Parse notifications from a statement ready to be executed. + * + * Should be called in a transaction. */ - protected synchronized List getNotifications(PreparedStatement ps) + protected List getNotifications(PreparedStatement ps) throws Exception { final List n = new ArrayList(); try (final ResultSet rs = ps.executeQuery()) { diff --git a/src/main/java/gov/usgs/earthquake/aws/JsonProductStorage.java b/src/main/java/gov/usgs/earthquake/aws/JsonProductStorage.java index 18f011a8..5d2353db 100644 --- a/src/main/java/gov/usgs/earthquake/aws/JsonProductStorage.java +++ b/src/main/java/gov/usgs/earthquake/aws/JsonProductStorage.java @@ -205,7 +205,7 @@ public boolean hasProduct(ProductId id) throws Exception { * @return product if found, otherwise null. */ @Override - public synchronized Product getProduct(ProductId id) throws Exception { + public Product getProduct(ProductId id) throws Exception { Product product = null; final String sql = "SELECT * FROM " + this.table + " WHERE source=? AND type=? AND code=? AND updatetime=?"; @@ -253,7 +253,7 @@ public synchronized Product getProduct(ProductId id) throws Exception { * if product already in storage. */ @Override - public synchronized ProductId storeProduct(Product product) throws Exception { + public ProductId storeProduct(Product product) throws Exception { // prepare statement beginTransaction(); try ( @@ -323,7 +323,7 @@ public ProductId storeProductSource(ProductSource input) throws Exception { * Remove product from storage. */ @Override - public synchronized void removeProduct(ProductId id) throws Exception { + public void removeProduct(ProductId id) throws Exception { // prepare statement final String sql = "DELETE FROM " + this.table + " WHERE source=? AND type=? AND code=? AND updatetime=?"; diff --git a/src/main/java/gov/usgs/earthquake/aws/TrackingIndex.java b/src/main/java/gov/usgs/earthquake/aws/TrackingIndex.java index 73945b9b..8c34dc60 100644 --- a/src/main/java/gov/usgs/earthquake/aws/TrackingIndex.java +++ b/src/main/java/gov/usgs/earthquake/aws/TrackingIndex.java @@ -175,7 +175,7 @@ public void createSchema() throws Exception { * name of tracking data. * @return null if data not found. */ - public synchronized JsonObject getTrackingData(final String name) throws Exception { + public JsonObject getTrackingData(final String name) throws Exception { JsonObject data = null; final String sql = "SELECT * FROM " + this.table + " WHERE name=?"; @@ -210,7 +210,7 @@ public synchronized JsonObject getTrackingData(final String name) throws Excepti * name of tracking data. * @throws Exception */ - public synchronized void removeTrackingData(final String name) throws Exception { + public void removeTrackingData(final String name) throws Exception { final String sql = "DELETE FROM " + this.table + " WHERE name=?"; // create schema beginTransaction(); @@ -234,7 +234,7 @@ public synchronized void removeTrackingData(final String name) throws Exception * data to store. * @throws Exception */ - public synchronized void setTrackingData(final String name, final JsonObject data) throws Exception { + public void setTrackingData(final String name, final JsonObject data) throws Exception { final String update = "UPDATE " + this.table + " SET data=? WHERE name=?"; // usually updated, try update first beginTransaction(); diff --git a/src/main/java/gov/usgs/earthquake/distribution/JDBCNotificationIndex.java b/src/main/java/gov/usgs/earthquake/distribution/JDBCNotificationIndex.java index 337513e1..e6833d8e 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/JDBCNotificationIndex.java +++ b/src/main/java/gov/usgs/earthquake/distribution/JDBCNotificationIndex.java @@ -390,7 +390,7 @@ public void startup() throws Exception { * * @see gov.usgs.util.Configurable */ - public synchronized void shutdown() throws Exception { + public void shutdown() throws Exception { // Close the DML statements try { _dml_addNotification.close(); @@ -513,11 +513,8 @@ public synchronized void shutdown() throws Exception { * if an error occurs while storing the notification. * @see gov.usgs.earthquake.distribution.NotificationIndex */ - public synchronized void addNotification(Notification notification) + public void addNotification(Notification notification) throws Exception { - // verify connection - this.verifyConnection(); - // Read the product id from the notification ProductId productId = notification.getProductId(); @@ -534,45 +531,35 @@ public synchronized void addNotification(Notification notification) ? notification.getTrackerURL().toString() : ""; - // Set the values we parsed above - _dml_addNotification.setString(1, productId.getSource()); - _dml_addNotification.setString(2, productId.getType()); - _dml_addNotification.setString(3, productId.getCode()); - _dml_addNotification.setDate(4, updateDate); - _dml_addNotification.setDate(5, expirationDate); - _dml_addNotification.setString(6, trackerUrl); - - // If this is a URL notification, set the product URL value as well - if (notification instanceof URLNotification) { - String productUrl = ((URLNotification) notification) - .getProductURL().toString(); - _dml_addNotification.setString(7, productUrl); - } else { - _dml_addNotification.setString(7, ""); - } - - // already verified above - Connection conn = getConnection(); + beginTransaction(); try { - // Begin a transaction - conn.setAutoCommit(false); + // Set the values we parsed above + _dml_addNotification.setString(1, productId.getSource()); + _dml_addNotification.setString(2, productId.getType()); + _dml_addNotification.setString(3, productId.getCode()); + _dml_addNotification.setDate(4, updateDate); + _dml_addNotification.setDate(5, expirationDate); + _dml_addNotification.setString(6, trackerUrl); + + // If this is a URL notification, set the product URL value as well + if (notification instanceof URLNotification) { + String productUrl = ((URLNotification) notification) + .getProductURL().toString(); + _dml_addNotification.setString(7, productUrl); + } else { + _dml_addNotification.setString(7, ""); + } + // Execute the query _dml_addNotification.executeUpdate(); - // Commit the changes - conn.setAutoCommit(true); + commitTransaction(); } catch (SQLException sqx) { // Undo any changes that may be in an unknown state. Ignore // exceptions that occur in this call since we're already throwing // an exception - try { - conn.rollback(); - } catch (SQLException ex) { - } - + rollbackTransaction(); // Re-throw this exception throw sqx; - } finally { - conn.setAutoCommit(true); } } @@ -587,10 +574,8 @@ public synchronized void addNotification(Notification notification) * if an error occurs while removing the notification. * @see gov.usgs.earthquake.distribution.NotificationIndex */ - public synchronized void removeNotification(Notification notification) + public void removeNotification(Notification notification) throws Exception { - // verify connection - this.verifyConnection(); // Read the product id from the notification ProductId productId = notification.getProductId(); @@ -605,45 +590,39 @@ public synchronized void removeNotification(Notification notification) ? notification.getTrackerURL().toString() : ""; - // Set the values we parsed above - _dml_removeNotification.setString(1, productId.getSource()); - _dml_removeNotification.setString(2, productId.getType()); - _dml_removeNotification.setString(3, productId.getCode()); - _dml_removeNotification.setDate(4, updateDate); - _dml_removeNotification.setDate(5, expirationDate); - _dml_removeNotification.setString(6, trackerUrl); - - // If this is a URL notification, set the product URL value as well - if (notification instanceof URLNotification) { - String productUrl = ((URLNotification) notification) - .getProductURL().toString(); - _dml_removeNotification.setString(7, productUrl); - } else { - // _dml_removeNotification.setNull(7, java.sql.Types.VARCHAR); - _dml_removeNotification.setString(7, ""); - } - - // already verified above - Connection conn = getConnection(); + // start transaction + beginTransaction(); try { - // Begin a transaction - conn.setAutoCommit(false); + // Set the values we parsed above + _dml_removeNotification.setString(1, productId.getSource()); + _dml_removeNotification.setString(2, productId.getType()); + _dml_removeNotification.setString(3, productId.getCode()); + _dml_removeNotification.setDate(4, updateDate); + _dml_removeNotification.setDate(5, expirationDate); + _dml_removeNotification.setString(6, trackerUrl); + + // If this is a URL notification, set the product URL value as well + if (notification instanceof URLNotification) { + String productUrl = ((URLNotification) notification) + .getProductURL().toString(); + _dml_removeNotification.setString(7, productUrl); + } else { + // _dml_removeNotification.setNull(7, java.sql.Types.VARCHAR); + _dml_removeNotification.setString(7, ""); + } + // Execute the query _dml_removeNotification.executeUpdate(); - // Commit the changes - conn.setAutoCommit(true); + + commitTransaction(); } catch (SQLException sqx) { // Undo any changes that may be in an unknown state. Ignore // exceptions that occur in this call since we're already throwing // an exception - try { - conn.rollback(); - } catch (SQLException ex) { - } + rollbackTransaction(); + // Re-throw this exception throw sqx; - } finally { - conn.setAutoCommit(true); } } @@ -659,22 +638,27 @@ public synchronized void removeNotification(Notification notification) * if an error occurs while searching the index. * @see gov.usgs.earthquake.distribution.NotificationIndex */ - public synchronized List findNotifications(ProductId id) - throws Exception { - // verify connection - this.verifyConnection(); - + public List findNotifications(ProductId id) throws Exception { String source = id.getSource(); String type = id.getType(); String code = id.getCode(); java.sql.Date update = new java.sql.Date(id.getUpdateTime().getTime()); - _query_findNotificationsById.setString(1, source); - _query_findNotificationsById.setString(2, type); - _query_findNotificationsById.setString(3, code); - _query_findNotificationsById.setDate(4, update); - - return getNotifications(_query_findNotificationsById); + // start transaction + this.beginTransaction(); + try { + _query_findNotificationsById.setString(1, source); + _query_findNotificationsById.setString(2, type); + _query_findNotificationsById.setString(3, code); + _query_findNotificationsById.setDate(4, update); + + List n = getNotifications(_query_findNotificationsById); + commitTransaction(); + return n; + } catch (Exception e) { + rollbackTransaction(); + throw e; + } } /** @@ -698,20 +682,25 @@ public synchronized List findNotifications(ProductId id) * if an error occurs while searching the index. * @see gov.usgs.earthquake.distribution.NotificationIndex */ - public synchronized List findNotifications(String source, + public List findNotifications(String source, String type, String code) throws Exception { - // verify connection - this.verifyConnection(); - source = (source == null) ? "%" : source.toUpperCase(); type = (type == null) ? "%" : type.toUpperCase(); code = (code == null) ? "%" : code.toUpperCase(); - _query_findNotificationsByData.setString(1, source); - _query_findNotificationsByData.setString(2, type); - _query_findNotificationsByData.setString(3, code); + beginTransaction(); + try { + _query_findNotificationsByData.setString(1, source); + _query_findNotificationsByData.setString(2, type); + _query_findNotificationsByData.setString(3, code); - return getNotifications(_query_findNotificationsByData); + final List n = getNotifications(_query_findNotificationsByData); + commitTransaction(); + return n; + } catch (Exception e) { + rollbackTransaction(); + throw e; + } } /** @@ -739,19 +728,13 @@ public synchronized List findNotifications(String source, * @throws Exception * if an error occurs while searching the index. */ - public synchronized List findNotifications( + public List findNotifications( List sources, List types, List codes) throws Exception { - // verify connection - this.verifyConnection(); - List n = null; - Connection conn = getConnection(); + beginTransaction(); try { - // begin a transaction - conn.setAutoCommit(false); - // Create a temporary lookup table _dml_createTmpTable.executeUpdate(); @@ -789,11 +772,10 @@ public synchronized List findNotifications( // is this a problem? reading with uncommitted writes? PreparedStatement ps = getCorrectStatement(sources, types, codes); n = getNotifications(ps); - } finally { - conn.rollback(); - // todo: this looks funky, but it's re-enabling autoCommit, which is - // needed for selects to not block other transactions - conn.setAutoCommit(true); + commitTransaction(); + } catch (Exception e) { + rollbackTransaction(); + throw e; } return n; @@ -809,18 +791,22 @@ public synchronized List findNotifications( * if an error occurs while searching the index. * @see gov.usgs.earthquake.distribution.NotificationIndex */ - public synchronized List findExpiredNotifications() - throws Exception { - // verify connection - this.verifyConnection(); - + public List findExpiredNotifications() throws Exception { // Create a new calendar object set to current date/time java.sql.Date curDate = new java.sql.Date((new Date()).getTime()); - // Bind the expiration date parameter and run the query - _query_findExpiredNotifications.setDate(1, curDate); + beginTransaction(); + try { + // Bind the expiration date parameter and run the query + _query_findExpiredNotifications.setDate(1, curDate); - return getNotifications(_query_findExpiredNotifications); + List n = getNotifications(_query_findExpiredNotifications); + commitTransaction(); + return n; + } catch (Exception e) { + rollbackTransaction(); + throw e; + } } /** @@ -842,13 +828,11 @@ public synchronized List findExpiredNotifications() * @throws Exception * If a SQLException occurs. */ - protected synchronized List getNotifications(PreparedStatement ps) + protected List getNotifications(PreparedStatement ps) throws Exception { List n = new ArrayList(); - ResultSet rs = null; - try { - rs = ps.executeQuery(); + try (final ResultSet rs = ps.executeQuery()) { while (rs.next()) { n.add(parseNotification(rs.getString(PRODUCT_SOURCE_COLUMN), rs.getString(PRODUCT_TYPE_COLUMN), @@ -858,12 +842,6 @@ protected synchronized List getNotifications(PreparedStatement ps) rs.getString(TRACKER_URL_COLUMN), rs.getString(PRODUCT_URL_COLUMN))); } - } finally { - try { - rs.close(); - } catch (Exception e) { - //ignore - } } return n; diff --git a/src/main/java/gov/usgs/earthquake/indexer/Indexer.java b/src/main/java/gov/usgs/earthquake/indexer/Indexer.java index c99a7679..f154ba04 100644 --- a/src/main/java/gov/usgs/earthquake/indexer/Indexer.java +++ b/src/main/java/gov/usgs/earthquake/indexer/Indexer.java @@ -142,9 +142,6 @@ public class Indexer extends DefaultNotificationListener { /** Task for archive policy thread. */ private TimerTask archiveTask = null; - /** Synchronization object for indexing. */ - private final Object indexProductSync = new Object(); - /** * Service used by FutureExecutorTask for execution. * See distribution.FutureListenerNotifier for more details. @@ -354,7 +351,7 @@ public void removeListener(final IndexerListener toRemove) { * IndexerEvent has a specific "type" to clarify the type of * event that occurred. */ - protected synchronized void notifyListeners(final IndexerEvent event) { + protected void notifyListeners(final IndexerEvent event) { StringBuffer buf = new StringBuffer(); Iterator changes = event.getIndexerChanges().iterator(); while (changes.hasNext()) { @@ -391,9 +388,7 @@ protected synchronized void notifyListeners(final IndexerEvent event) { + "event."); } - Iterator it = listeners.keySet().iterator(); - while (it.hasNext()) { - final IndexerListener listener = it.next(); + for (final IndexerListener listener : listeners.keySet()) { ExecutorService listenerExecutor = listeners.get(listener); FutureExecutorTask listenerTask = new FutureExecutorTask( backgroundService, listenerExecutor, listener.getMaxTries(), @@ -409,41 +404,21 @@ protected synchronized void notifyListeners(final IndexerEvent event) { * @param id * @return true if product has already been indexed. */ - protected boolean hasProductBeenIndexed(final ProductId id) { + protected boolean hasProductBeenIndexed(final ProductId id) throws Exception { + boolean hasProduct = false; + // use transaction to avoid stepping on #indexProduct() + productIndex.beginTransaction(); try { - ProductIndexQuery alreadyProcessedQuery = new ProductIndexQuery(); - alreadyProcessedQuery - .setResultType(ProductIndexQuery.RESULT_TYPE_ALL); - - // alreadyProcessedQuery.getProductIds().add(id); - // use existing indexes - alreadyProcessedQuery.setProductSource(id.getSource()); - alreadyProcessedQuery.setProductType(id.getType()); - alreadyProcessedQuery.setProductCode(id.getCode()); - alreadyProcessedQuery.setMinProductUpdateTime(id.getUpdateTime()); - alreadyProcessedQuery.setMaxProductUpdateTime(id.getUpdateTime()); - - final List existingSummary; - if (productIndex instanceof JDBCProductIndex) { - // only checking if exists, use one query instead of multiple. - existingSummary = ((JDBCProductIndex) productIndex) - .getProducts(alreadyProcessedQuery, false); - } else { - existingSummary = productIndex.getProducts(alreadyProcessedQuery); - } - - if (existingSummary.size() > 0 && - existingSummary.get(0).getId().equals(id)) { - // it is in the product index - return true; - } + hasProduct = productIndex.hasProduct(id); + productIndex.commitTransaction(); } catch (Exception wtf) { LOGGER.log(Level.WARNING, "[" + getName() + "] exception checking if product already indexed", wtf); + productIndex.rollbackTransaction(); } // default is it hasn't been processed - return false; + return hasProduct; } /** @@ -557,42 +532,35 @@ public void onProduct(final Product product, final boolean force) throws Excepti // -- Step 2: Use product module to summarize product // -------------------------------------------------------------------// - LOGGER.finer("[" + getName() + "] summarizing product id=" - + id.toString()); - // Find best available indexer module - IndexerModule module = getModule(product); - - // Use this module to summarize the product - ProductSummary productSummary = module.getProductSummary(product); + LOGGER.finer("[" + getName() + "] summarizing product id=" + id.toString()); + final ProductSummary productSummary = summarizeProduct(product); // -------------------------------------------------------------------// // -- Step 3: Add product summary to the product index // -------------------------------------------------------------------// + LOGGER.finer("[" + getName() + "] indexing product id=" + id.toString()); + indexProduct(productSummary); + } - // measure time waiting to enter synchronized block - final long beforeEnterSync = new Date().getTime(); - synchronized (indexProductSync) { - final long afterEnterSync = new Date().getTime(); + /** + * Use modules to summarize product. + */ + protected ProductSummary summarizeProduct(final Product product) throws Exception { + // Find best available indexer module + IndexerModule module = getModule(product); - try { - productSummary = indexProduct(productSummary); - } finally { - final long endIndex = new Date().getTime(); - LOGGER.fine("[" + getName() + "] indexer processed product id=" - + id.toString() + " in " + - (endIndex - beginStore) + " ms" - + " (" + (afterEnterSync - beforeEnterSync) + " ms sync delay)"); - } - } + // Use this module to summarize the product + ProductSummary productSummary = module.getProductSummary(product); + + return productSummary; } /** * Add product summary to product index. */ - protected synchronized ProductSummary indexProduct( + protected ProductSummary indexProduct( ProductSummary productSummary) throws Exception { - LOGGER.finest("[" + getName() + "] beginning index transaction"); // The notification to be sent when we are finished with this product IndexerEvent notification = new IndexerEvent(this); @@ -600,7 +568,11 @@ protected synchronized ProductSummary indexProduct( notification.setSummary(productSummary); // Start the product index transaction, only proceed if able + final long beforeBegin = new Date().getTime(); productIndex.beginTransaction(); + final long afterBegin = new Date().getTime(); + LOGGER.finer("[" + getName() + "] beginning index transaction" + + " (" + (afterBegin - beforeBegin) + " ms waiting for lock)"); try { LOGGER.finer("[" + getName() + "] finding previous version"); @@ -761,24 +733,6 @@ protected synchronized ProductSummary indexProduct( // Commit our changes to the index (after updating summary attrs) productIndex.commitTransaction(); - try { - LOGGER.fine("[" + getName() + "] notifying listeners"); - // ---------------------------------------------------------// - // -- Step 5: Notify listeners with our indexer event - // ---------------------------------------------------------// - notifyListeners(notification); - } catch (Exception e) { - // this doesn't affect success of index transaction... - LOGGER.log(Level.WARNING, "[" + getName() - + "] exception while notifying listeners", e); - } - - // send heartbeat info - HeartbeatListener.sendHeartbeatMessage(getName(), - "indexed product", productSummary.getId().toString()); - - // return summary after added to index - return productSummary; } catch (Exception e) { LOGGER.log(Level.FINE, "[" + getName() + "] rolling back transaction", e); // just rollback since it wasn't successful @@ -792,7 +746,34 @@ protected synchronized ProductSummary indexProduct( "index exception class", e.getClass().getName()); throw e; + } finally { + final long afterIndex = new Date().getTime(); + LOGGER.fine("[" + getName() + "] index transaction total" + + " (" + (afterIndex - afterBegin) + " ms)"); + } + + // ---------------------------------------------------------// + // -- Step 5: Notify listeners with our indexer event + // ---------------------------------------------------------// + + // this doesn't affect transaction, run outside + try { + LOGGER.fine("[" + getName() + "] notifying listeners"); + notifyListeners(notification); + } catch (Exception e) { + // this doesn't affect success of index transaction... + LOGGER.log( + Level.WARNING, + "[" + getName() + "] exception notifying listeners", + e); } + + // send heartbeat info + HeartbeatListener.sendHeartbeatMessage(getName(), + "indexed product", productSummary.getId().toString()); + + // return summary after added to index + return productSummary; } /** @@ -1044,6 +1025,8 @@ protected ProductId getTrumpedProductId(final ProductSummary trumpSummary) { /** * Get a product summary object using its product id. * + * Should be called within productIndex transaction. + * * @param id * id to find. * @return matching product summary or null. @@ -1063,6 +1046,8 @@ protected ProductSummary getProductSummaryById(final ProductId id) /** * Update a product summary weight * + * Should be called within productIndex transaction. + * * @param event * the event. * @param summary @@ -1099,6 +1084,8 @@ protected Event setSummaryWeight(Event event, ProductSummary summary, /** * Resummarize a product within an event. * + * Should be called within productIndex transaction. + * * @param event * the event. * @param summary @@ -1134,6 +1121,8 @@ protected Event resummarizeProduct(final Event event, /** * Check for event splits (and split them if needed). * + * Should be called within productIndex transaction. + * * @param summary * the summary the indexer is currently processing. * @param originalEvent @@ -1143,7 +1132,7 @@ protected Event resummarizeProduct(final Event event, * @return List of changes made during this method. * @throws Exception */ - protected synchronized List checkForEventSplits( + protected List checkForEventSplits( final ProductSummary summary, final Event originalEvent, final Event updatedEvent) throws Exception { List changes = new ArrayList(); @@ -1169,11 +1158,19 @@ protected synchronized List checkForEventSplits( // see how sub events associate compared to this event (since the // original event is the one that should be "UPDATED") Event originalSubEvent = subEvents.remove(originalEventId); + if (originalSubEvent == null) { + // wtf + // this should always exist because its ID was returned by getEventId, + // should should have at least one product. Log: + LOGGER.fine("[" + getName() + "] originalSubEvent is null" + + ", originalEventId=" + originalEventId); + for (final String id: subEvents.keySet()) { + final Event subEvent = subEvents.get(id); + subEvent.log(LOGGER); + } + } - Iterator subEventsIter = new ArrayList( - subEvents.keySet()).iterator(); - while (subEventsIter.hasNext()) { - String nextEventId = subEventsIter.next(); + for (final String nextEventId : subEvents.keySet()) { Event nextEvent = subEvents.get(nextEventId); if (!originalSubEvent.isAssociated(nextEvent, associator)) { @@ -1244,6 +1241,8 @@ protected synchronized List checkForEventSplits( * Removes the leaf event (and all its products) from the root event. This * method modifies the runtime objects as well as updating the index DB. * + * Should be called within productIndex transaction. + * * @param root * The root event from which all leaf products will be removed * @param leaf @@ -1252,7 +1251,7 @@ protected synchronized List checkForEventSplits( * indexId property of leaf is updated to its new value. * @throws Exception */ - protected synchronized Event splitEvents(final Event root, final Event leaf) + protected Event splitEvents(final Event root, final Event leaf) throws Exception { Event updated = root; Iterator leafProducts = leaf.getProductList() @@ -1283,13 +1282,15 @@ protected synchronized Event splitEvents(final Event root, final Event leaf) * products are duplicates. This method modifies the runtime objects as well * as the index DB. The child event is then deleted. * + * Should be called within productIndex transaction. + * * @param target * The target event into which the child is merged. * @param child * The child event to be merged into the target. * @throws Exception */ - protected synchronized Event mergeEvents(final Event target, + protected Event mergeEvents(final Event target, final Event child) throws Exception { Iterator childProducts = child.getProductList() .iterator(); @@ -1313,6 +1314,8 @@ protected synchronized Event mergeEvents(final Event target, * Check and merge any nearby events or previously unassociated products * that now associate. * + * Should be called within productIndex transaction. + * * @param summary * the summary currently being processed by the indexer. * @param originalEvent @@ -1322,7 +1325,7 @@ protected synchronized Event mergeEvents(final Event target, * @return list of any merge type changes. * @throws Exception */ - protected synchronized List checkForEventMerges( + protected List checkForEventMerges( final ProductSummary summary, final Event originalEvent, final Event updatedEvent) throws Exception { List changes = new ArrayList(); @@ -1465,7 +1468,16 @@ protected synchronized List checkForEventMerges( return changes; } - protected synchronized ProductSummary getPrevProductVersion( + /** + * Find previous version of product. + * + * Should be called within productIndex transaction. + * + * @param summary + * @return + * @throws Exception + */ + protected ProductSummary getPrevProductVersion( ProductSummary summary) throws Exception { ProductSummary prevSummary = null; List candidateSummaries = null; @@ -1500,6 +1512,8 @@ protected synchronized ProductSummary getPrevProductVersion( * {@link #checkForEventMerges(ProductSummary, Event, Event)} and are * ignored during this method. * + * Should be called within productIndex transaction. + * * @see Associator#getSearchRequest(ProductSummary) * @see Associator#chooseEvent(List, ProductSummary) * @@ -1508,7 +1522,7 @@ protected synchronized ProductSummary getPrevProductVersion( * found. * @throws Exception */ - protected synchronized Event getPrevEvent(ProductSummary summary) + protected Event getPrevEvent(ProductSummary summary) throws Exception { return getPrevEvent(summary, false); } @@ -1516,13 +1530,15 @@ protected synchronized Event getPrevEvent(ProductSummary summary) /** * Find an existing event that summary should associate with. * + * Should be called within productIndex transaction. + * * @param summary the previous event. * @param associating whether associating (vs archiving). * @return previous event, or null if none found. * @throws Exception */ - protected synchronized Event getPrevEvent(ProductSummary summary, - boolean associating) throws Exception { + protected Event getPrevEvent(ProductSummary summary, boolean associating) + throws Exception { Event prevEvent = null; List candidateEvents = null; @@ -1548,43 +1564,11 @@ protected synchronized Event getPrevEvent(ProductSummary summary, return prevEvent; } - /* - * protected IndexerEvent createIndexerEvent(ProductSummary prevSummary, - * Event prevEvent, ProductSummary summary, Event event) { IndexerType type - * = null; IndexerEvent indexerEvent = new IndexerEvent(this); - * - * // ---------------------------------- // Determine the type if - * IndexerEvent // ---------------------------------- - * - * if (summary.getStatus() == Product.STATUS_DELETE) { type = - * IndexerEvent.PRODUCT_DELETED; if (event != null) { // Since we have an - * event, this is now an EVENT_UPDATED type type = - * IndexerEvent.EVENT_UPDATED; - * - * // Check if all products on event are deleted. if - * (event.getProductList().size() == 0) { type = IndexerEvent.EVENT_DELETED; - * } } } else { // Product was not a "DELETE" status. Must be an added or - * updated. if (prevEvent == null && event != null) { type = - * IndexerEvent.EVENT_ADDED; } else if (prevEvent != null && event != null) - * { type = IndexerEvent.EVENT_UPDATED; } else if (prevSummary == null && - * summary != null) { type = IndexerEvent.PRODUCT_ADDED; } else if - * (prevSummary != null && summary != null) { type = - * IndexerEvent.PRODUCT_UPDATED; } - * - * if (summary == null) { // Not sure how this happens. - * LOGGER.warning("Trying to notify of a null summary."); } } - * - * // Set parameters indexerEvent.setEventType(type); - * indexerEvent.setOldEvent(prevEvent); indexerEvent.setSummary(summary); - * indexerEvent.setEvent(event); - * - * return indexerEvent; } - */ /** * Loads parent, specific, and dependent configurations; in that order. */ @Override - public synchronized void configure(Config config) throws Exception { + public void configure(Config config) throws Exception { // -- Load parent configurations -- // super.configure(config); @@ -1767,9 +1751,10 @@ public synchronized void configure(Config config) throws Exception { * executor services (from listeners) are shutdown in sequence. */ @Override - public synchronized void shutdown() throws Exception { + public void shutdown() throws Exception { // -- Shut down dependent processes -- // try { + // waits for current transaction, unless transaction thread is interrupted productIndex.shutdown(); } catch (Exception e) { LOGGER.log(Level.WARNING, "[" + getName() @@ -1833,7 +1818,7 @@ public synchronized void shutdown() throws Exception { * that order. */ @Override - public synchronized void startup() throws Exception { + public void startup() throws Exception { // -- Call parent startup method -- // super.startup(); @@ -1912,138 +1897,177 @@ public void run() { * * @see #archivePolicies */ - public synchronized int[] purgeExpiredProducts() throws Exception { + public int[] purgeExpiredProducts() throws Exception { int[] counts = { 0, 0 }; - ProductIndexQuery query = null; - ArchivePolicy policy = null; if (isDisableArchive()) { LOGGER.info("Archiving disabled"); return counts; } - for (int i = 0; i < archivePolicies.size(); i++) { - policy = archivePolicies.get(i); - query = policy.getIndexQuery(); + for (final ArchivePolicy policy : archivePolicies) { + int countIndex = 0; + String policyName = policy.getName(); + String policyType = "event"; + if (policy instanceof ProductArchivePolicy) { + countIndex = 1; + policyType = "product"; + } + LOGGER.fine("[" + getName() + "]" + + " running " + policyType + " archive policy (" + policyName + ")"); + try { + int count = runArchivePolicy(policy); + counts[countIndex] += count; + LOGGER.finer("[" + getName() + "]" + + " archive policy (" + policyName + ")" + + " removed " + count + " " + policyType + "s"); + } catch (Exception e) { + LOGGER.log( + Level.WARNING, + "[" + getName() + "] exception" + + " running " + policyType + " archive policy (" + policyName + ")", + e); + } - if (!(policy instanceof ProductArchivePolicy)) { - // -- Purge expired events for this policy -- // - LOGGER.fine("[" + getName() - + "] running event archive policy (" + policy.getName() - + ")"); - try { - // Get a list of those events - List expiredEvents = productIndex.getEvents(query); - - // Loop over list of expired events and remove each one - Iterator eventIter = expiredEvents.iterator(); - while (eventIter.hasNext()) { - Event event = eventIter.next(); - - LOGGER.info("[" + getName() + "] archiving event " - + event.getEventId()); - event.log(LOGGER); - - productIndex.beginTransaction(); - try { - removeEvent(event); - - // Notify of the event archived - IndexerEvent notification = new IndexerEvent(this); - notification.setSummary(null); - notification.addIndexerChange(new IndexerChange( - IndexerChange.EVENT_ARCHIVED, event, null)); - notifyListeners(notification); - - ++counts[0]; - productIndex.commitTransaction(); - } catch (Exception e) { - LOGGER.log(Level.WARNING, "[" + getName() - + "] exception archiving event " - + event.getEventId() + ", rolling back", e); - productIndex.rollbackTransaction(); - } - } - } catch (Exception e) { - LOGGER.log(Level.WARNING, "[" + getName() - + "] exception running event archive policy (" - + policy.getName() + ") ", e); + } + + return counts; + } + + /** + * Run an archive policy. + * + * Calls {@link #runProductArchivePolicy(ProductArchivePolicy)} instead for + * product archive policies. + * + * Otherwise, runs multiple productIndex transactions to incrementally + * remove events matching policy.. + * + * @param policy + * @return + * @throws Exception + */ + protected int runArchivePolicy(final ArchivePolicy policy) throws Exception { + if (policy instanceof ProductArchivePolicy) { + return runProductArchivePolicy((ProductArchivePolicy) policy); + } + + int count = 0; + final ProductIndexQuery query = policy.getIndexQuery(); + + // loop over query with limit for smaller transactions + query.setLimit(100); + while (true) { + List expiredEvents; + productIndex.beginTransaction(); + try { + // Get a list of those events + expiredEvents = productIndex.getEvents(query); + + // Remove events + for (final Event event : expiredEvents) { + LOGGER.info("[" + getName() + "] archiving event " + event.getEventId()); + event.log(LOGGER); + + removeEvent(event); + count += 1; } - } - if (policy instanceof ProductArchivePolicy) { - ProductArchivePolicy productPolicy = (ProductArchivePolicy) policy; + productIndex.commitTransaction(); + if (expiredEvents.size() == 0) { + // no more events to remove + break; + } + } catch (Exception e) { + // something went wrong, rollback and stop + productIndex.rollbackTransaction(); + throw e; + } - // -- Purge expired products for this policy -- // - LOGGER.fine("[" + getName() - + "] running product archive policy (" - + policy.getName() + ")"); + // notify listeners if removed successfully; outside transaction + for (final Event event : expiredEvents) { + IndexerEvent notification = new IndexerEvent(this); + notification.setSummary(null); + notification.addIndexerChange(new IndexerChange( + IndexerChange.EVENT_ARCHIVED, event, null)); + notifyListeners(notification); + } + } + return count; + } - try { - // Get a list of those products - List expiredProducts; + /** + * Run a product archive policy. + * + * Runs multiple productIndex transactions to incrementally + * remove products matching policy.. + * + * @param policy + * @return + * @throws Exception + */ + protected int runProductArchivePolicy(final ProductArchivePolicy policy) + throws Exception { + int count = 0; + final ProductIndexQuery query = policy.getIndexQuery(); + + // loop over query with limit for smaller transactions + query.setLimit(500); + while (true) { + List expiredProducts; + productIndex.beginTransaction(); + try { + // Get a list of those products + if (policy.isOnlyUnassociated()) { + expiredProducts = productIndex.getUnassociatedProducts(query); + } else { + expiredProducts = productIndex.getProducts(query); + } - if (productPolicy.isOnlyUnassociated()) { - expiredProducts = productIndex - .getUnassociatedProducts(query); - } else { - expiredProducts = productIndex.getProducts(query); - } + for (final ProductSummary product : expiredProducts) { + LOGGER.info("[" + getName() + "]" + + " archiving product " + product.getId().toString()); + removeSummary(product); + count += 1; + } - // Loop over list of expired products and remove each one - Iterator productIter = expiredProducts - .iterator(); - while (productIter.hasNext()) { - ProductSummary product = productIter.next(); - - LOGGER.info("[" + getName() + "] archiving product " - + product.getId().toString()); - productIndex.beginTransaction(); - try { - removeSummary(product); - - // Notify of the product archived - IndexerEvent notification = new IndexerEvent(this); - notification.setSummary(product); - notification.addIndexerChange(new IndexerChange( - IndexerChange.PRODUCT_ARCHIVED, null, null)); - notifyListeners(notification); - - ++counts[1]; - productIndex.commitTransaction(); - } catch (Exception e) { - LOGGER.log(Level.WARNING, "[" + getName() - + "] exception archiving event " - + product.getId().toString() + ", rolling back", e); - productIndex.rollbackTransaction(); - } - } - } catch (Exception e) { - LOGGER.log(Level.WARNING, "[" + getName() - + "] exception running product archive policy (" - + policy.getName() + ")", e); + productIndex.commitTransaction(); + if (expiredProducts.size() == 0) { + // no more products to remove + break; } + } catch (Exception e) { + // something went wrong, rollback and stop + productIndex.rollbackTransaction(); + throw e; + } + // notify listeners if removed successfully; outside transaction + for (final ProductSummary product : expiredProducts) { + IndexerEvent notification = new IndexerEvent(this); + notification.setSummary(product); + notification.addIndexerChange(new IndexerChange( + IndexerChange.PRODUCT_ARCHIVED, null, null)); + notifyListeners(notification); } } - return counts; + return count; } /** * Removes the given event from the Indexer ProductIndex and ProductStorage. * + * Should be called within productIndex transaction. + * * @param event * @throws Exception * If errors occur while removing the event */ - protected synchronized void removeEvent(Event event) throws Exception { + protected void removeEvent(Event event) throws Exception { // Removing an "event" from storage is really just removing all its // associated products - List summaries = event.getAllProductList(); - Iterator summaryIter = summaries.iterator(); - while (summaryIter.hasNext()) { - ProductSummary summary = summaryIter.next(); + for (final ProductSummary summary : event.getAllProductList()) { // Remove product from storage productStorage.removeProduct(summary.getId()); // Remove product summary from index @@ -2055,14 +2079,15 @@ protected synchronized void removeEvent(Event event) throws Exception { } /** - * Removes the given summary from the Indexer ProductIndex and - * ProductStorage. + * Removes the given summary from the Indexer ProductIndex and ProductStorage. + * + * Should be called within productIndex transaction. * * @param summary * @throws Exception * If errors occur while removing the summary */ - protected synchronized void removeSummary(ProductSummary summary) + protected void removeSummary(ProductSummary summary) throws Exception { Event event = getPrevEvent(summary); @@ -2104,6 +2129,8 @@ protected synchronized void removeSummary(ProductSummary summary) * latitude and longitude, and (time) time, in order to have the minimum * properties required to create a new event. * + * Should be called within productIndex transaction. + * * @param summary * The product summary serving as the basis for the new event. * @return The event that is created, added and associated or null if the @@ -2114,7 +2141,7 @@ protected synchronized void removeSummary(ProductSummary summary) * happen if this method is called before the summary is added * to the ProductIndex. */ - private synchronized Event createEvent(ProductSummary summary) + private Event createEvent(ProductSummary summary) throws Exception { if (Event.productHasOriginProperties(summary)) { Event event = productIndex.addEvent(new Event()); @@ -2127,60 +2154,48 @@ private synchronized Event createEvent(ProductSummary summary) /** * Search for products in this index. * + * Should be called within productIndex transaction. + * * @param request * the search request. * @return the search response. * @throws Exception */ - public synchronized SearchResponse search(SearchRequest request) + public SearchResponse search(SearchRequest request) throws Exception { SearchResponse response = new SearchResponse(); // Execute each query - Iterator iter = request.getQueries().iterator(); - while (iter.hasNext()) { - SearchQuery query = iter.next(); + for (final SearchQuery search : request.getQueries()) { + final ProductIndexQuery query = search.getProductIndexQuery(); - if (query instanceof EventsSummaryQuery) { + if (search instanceof EventDetailQuery) { + List events = productIndex.getEvents(query); + ((EventDetailQuery) search).setResult(events); + } else if (search instanceof EventsSummaryQuery) { List eventSummaries = new LinkedList(); - Iterator events = productIndex.getEvents( - query.getProductIndexQuery()).iterator(); + List events = productIndex.getEvents(query); // convert events to event summaries - while (events.hasNext()) { - Event event = events.next(); + for (final Event event : events) { eventSummaries.add(event.getEventSummary()); } - ((EventsSummaryQuery) query).setResult(eventSummaries); - } - - else if (query instanceof EventDetailQuery) { - List events = productIndex.getEvents(query - .getProductIndexQuery()); - ((EventDetailQuery) query).setResult(events); - } - - else if (query instanceof ProductsSummaryQuery) { - List products = productIndex.getProducts(query - .getProductIndexQuery()); - ((ProductsSummaryQuery) query).setResult(products); - } - - else if (query instanceof ProductDetailQuery) { + ((EventsSummaryQuery) search).setResult(eventSummaries); + } else if (search instanceof ProductDetailQuery) { List products = new LinkedList(); - Iterator ids = query.getProductIndexQuery() - .getProductIds().iterator(); // fetch products from storage - while (ids.hasNext()) { - ProductId id = ids.next(); + for (final ProductId id : query.getProductIds()) { Product product = productStorage.getProduct(id); if (product != null) { products.add(product); } } - ((ProductDetailQuery) query).setResult(products); + ((ProductDetailQuery) search).setResult(products); + } else if (search instanceof ProductsSummaryQuery) { + List products = productIndex.getProducts(query); + ((ProductsSummaryQuery) search).setResult(products); } - response.addResult(query); + response.addResult(search); } return response; diff --git a/src/main/java/gov/usgs/earthquake/indexer/JDBCProductIndex.java b/src/main/java/gov/usgs/earthquake/indexer/JDBCProductIndex.java index ee0b37c9..b0c34a3d 100644 --- a/src/main/java/gov/usgs/earthquake/indexer/JDBCProductIndex.java +++ b/src/main/java/gov/usgs/earthquake/indexer/JDBCProductIndex.java @@ -215,7 +215,7 @@ public Connection connect() throws Exception { * @return List of Event objects */ @Override - public synchronized List getEvents(ProductIndexQuery query) + public List getEvents(ProductIndexQuery query) throws Exception { // map of events (index id => event), so products can be added incrementally final Map events = new HashMap<>(); @@ -274,7 +274,7 @@ public synchronized List getEvents(ProductIndexQuery query) * @return Event object with eventId set to the database id */ @Override - public synchronized Event addEvent(Event event) throws Exception { + public Event addEvent(Event event) throws Exception { Event e = null; final String sql = "INSERT INTO event (created) VALUES (?)"; @@ -317,7 +317,7 @@ public synchronized Event addEvent(Event event) throws Exception { * method call */ @Override - public synchronized List removeEvent(Event event) + public List removeEvent(Event event) throws Exception { Long id = event.getIndexId(); @@ -361,7 +361,7 @@ public synchronized List removeEvent(Event event) * when query event search type is SEARCH_EVENT_PREFERRED. */ @Override - public synchronized List getUnassociatedProducts( + public List getUnassociatedProducts( ProductIndexQuery query) throws Exception { if (query.getEventSearchType() == ProductIndexQuery.SEARCH_EVENT_PREFERRED) { throw new IllegalArgumentException( @@ -402,7 +402,7 @@ public synchronized List getUnassociatedProducts( * when query event search type is SEARCH_EVENT_PREFERRED. */ @Override - public synchronized List getProducts(ProductIndexQuery query) + public List getProducts(ProductIndexQuery query) throws Exception { // load full product summaries by default return getProducts(query, true); @@ -417,7 +417,7 @@ public synchronized List getProducts(ProductIndexQuery query) * whether to call {@link #loadProductSummaries(List)}, * which loads links and properties with additional queries. */ - public synchronized List getProducts(ProductIndexQuery query, final boolean loadDetails) + public List getProducts(ProductIndexQuery query, final boolean loadDetails) throws Exception { final List clauseList = buildProductClauses(query); final String sql = buildProductQuery(clauseList); @@ -441,6 +441,32 @@ public synchronized List getProducts(ProductIndexQuery query, fi return products; } + /** + * Check whether product summary is in index. + * + * @param id + * product to search. + */ + public boolean hasProduct(final ProductId id) throws Exception { + final String sql = "SELECT id FROM productSummary" + + " WHERE source=? AND type=? AND code=? AND updateTime=?"; + try ( + final PreparedStatement statement = getConnection().prepareStatement(sql); + ) { + statement.setString(1, id.getSource()); + statement.setString(2, id.getType()); + statement.setString(3, id.getCode()); + statement.setLong(4, id.getUpdateTime().getTime()); + + try ( + final ResultSet results = statement.executeQuery(); + ) { + // return true if there is a matching row, false otherwise + return results.next(); + } + } + } + /** * Add a product summary to the database * @@ -451,7 +477,7 @@ public synchronized List getProducts(ProductIndexQuery query, fi * @throws Exception */ @Override - public synchronized ProductSummary addProductSummary(ProductSummary summary) + public ProductSummary addProductSummary(ProductSummary summary) throws Exception { // Add values to the prepared statement long productId = 0; @@ -562,7 +588,7 @@ public synchronized ProductSummary addProductSummary(ProductSummary summary) * ProductSummary object to delete */ @Override - public synchronized ProductId removeProductSummary(ProductSummary summary) + public ProductId removeProductSummary(ProductSummary summary) throws Exception { List removed = removeProductSummaries(Arrays.asList(summary)); return removed.get(0); @@ -578,7 +604,7 @@ public synchronized ProductId removeProductSummary(ProductSummary summary) * @return Copy of event with summary added to the event's products list */ @Override - public synchronized Event addAssociation(Event event, ProductSummary summary) + public Event addAssociation(Event event, ProductSummary summary) throws Exception { if (event.getIndexId() == null || summary.getIndexId() == null) { @@ -626,7 +652,7 @@ public synchronized Event addAssociation(Event event, ProductSummary summary) * @param summary */ @Override - public synchronized Event removeAssociation(Event event, + public Event removeAssociation(Event event, ProductSummary summary) throws Exception { // Deleting the association is really just removing the foreign key @@ -1027,7 +1053,7 @@ protected void loadProductSummaries(final List summaries) ",") + ")"; try ( - final PreparedStatement statement = verifyConnection().prepareStatement(linkSql); + final PreparedStatement statement = getConnection().prepareStatement(linkSql); final ResultSet results = statement.executeQuery(); ) { while (results.next()) { @@ -1049,7 +1075,7 @@ protected void loadProductSummaries(final List summaries) + ")"; try ( final PreparedStatement statement = - verifyConnection().prepareStatement(propertySql); + getConnection().prepareStatement(propertySql); final ResultSet results = statement.executeQuery(); ) { while (results.next()) { @@ -1118,7 +1144,7 @@ protected ProductSummary parseProductSummary(ResultSet results) return p; } - public synchronized List removeProductSummaries( + public List removeProductSummaries( final List summaries) throws Exception { // index by id final ArrayList ids = new ArrayList<>(); @@ -1154,7 +1180,7 @@ public synchronized List removeProductSummaries( for (final String sql : sqls) { try ( final PreparedStatement statement = - verifyConnection().prepareStatement(sql + idsIn); + getConnection().prepareStatement(sql + idsIn); ) { int rows = statement.executeUpdate(); LOGGER.log(Level.FINER, "[" + getName() + "] removed " + rows + " rows"); @@ -1172,7 +1198,7 @@ public synchronized List removeProductSummaries( * @param properties * @throws SQLException */ - protected synchronized void addProductProperties(final long productId, + protected void addProductProperties(final long productId, final Map properties) throws SQLException { // Loop through the properties list and add them all to the database final String sql = "INSERT INTO productSummaryProperty" @@ -1205,7 +1231,7 @@ protected synchronized void addProductProperties(final long productId, * Map of relations to URIs * @throws SQLException */ - protected synchronized void addProductLinks(long productId, + protected void addProductLinks(long productId, Map> links) throws SQLException { // Loop through the properties list and add them all to the database final String sql = "INSERT INTO productSummaryLink" @@ -1281,7 +1307,7 @@ protected BigDecimal normalizeLongitude(BigDecimal lon) { * the events that have been updated. */ @Override - public synchronized void eventsUpdated(List events) throws Exception { + public void eventsUpdated(List events) throws Exception { Long indexId = null; final String deletedSql = "UPDATE event SET status=? WHERE id=?"; diff --git a/src/main/java/gov/usgs/earthquake/indexer/ProductIndex.java b/src/main/java/gov/usgs/earthquake/indexer/ProductIndex.java index 51723e64..50fc0637 100644 --- a/src/main/java/gov/usgs/earthquake/indexer/ProductIndex.java +++ b/src/main/java/gov/usgs/earthquake/indexer/ProductIndex.java @@ -10,10 +10,10 @@ /** * An index of products. - * + * * The Indexer uses a ProductIndex to store received Products, and associate * them together into Events. - * + * * The transaction methods are used when one product results in several changes * to the database. For instance, add a ProductSummary then add an association * to an event. @@ -22,7 +22,7 @@ public interface ProductIndex extends Configurable { /** * If the index supports transactions, begin a transaction. - * + * * @throws Exception */ public void beginTransaction() throws Exception; @@ -30,7 +30,7 @@ public interface ProductIndex extends Configurable { /** * If the index supports transactions, and beginTransaction was previously * called, commit the pending transaction. - * + * * @throws Exception */ public void commitTransaction() throws Exception; @@ -38,14 +38,14 @@ public interface ProductIndex extends Configurable { /** * If the index supports transactions, and beginTransaction was previously * called, rollback the pending transaction. - * + * * @throws Exception */ public void rollbackTransaction() throws Exception; /** * Get events in this index. - * + * * @param query * a description of which events to retrieve. * @return a list of matching events. @@ -55,7 +55,7 @@ public interface ProductIndex extends Configurable { /** * Get products in this index. - * + * * @param query * a description of which products to retrieve. * @return a list of matching products. @@ -64,9 +64,18 @@ public interface ProductIndex extends Configurable { public List getProducts(ProductIndexQuery query) throws Exception; + /** + * Check whether product is in index. + * + * @param id + * @return + * @throws Exception + */ + public boolean hasProduct(ProductId id) throws Exception; + /** * Get products in this index that aren't associated to any event. - * + * * @param query * a description of which products to retrieve. * @return a list of unassociated products @@ -77,7 +86,7 @@ public List getUnassociatedProducts(ProductIndexQuery query) /** * Add an event to the index. - * + * * @param event * the event to add. * @return Copy of event with the eventId attribute set to the id in the @@ -88,7 +97,7 @@ public List getUnassociatedProducts(ProductIndexQuery query) /** * Remove an event from the index. - * + * * @param event * the event to remove. * @return Copy of the event removed @@ -99,7 +108,7 @@ public List getUnassociatedProducts(ProductIndexQuery query) /** * Add a product summary to the index. - * + * * @param summary * the summary to add. * @return Copy of the product summary object with the indexId set to the @@ -111,7 +120,7 @@ public ProductSummary addProductSummary(final ProductSummary summary) /** * Remove a product summary from the index. - * + * * @param summary * the summary to remove. * @return id of removed summary. @@ -122,7 +131,7 @@ public ProductId removeProductSummary(final ProductSummary summary) /** * Associate an Event and ProductSummary that are already in the index. - * + * * @param event * the event. * @param summary @@ -135,7 +144,7 @@ public Event addAssociation(final Event event, final ProductSummary summary) /** * Remove an association between and Event and ProductSummary. - * + * * @param event * the event. * @param summary @@ -149,10 +158,10 @@ public Event removeAssociation(final Event event, /** * An opportunity for the ProductIndex to update summary information it may * or may not store for efficient event searches. - * + * * This method is called by the indexer after it has finished updating * events during onProduct. - * + * * @param events * events that may have new preferred attributes. * @throws Exception diff --git a/src/main/java/gov/usgs/earthquake/util/JDBCConnection.java b/src/main/java/gov/usgs/earthquake/util/JDBCConnection.java index 011baf16..5d14e239 100644 --- a/src/main/java/gov/usgs/earthquake/util/JDBCConnection.java +++ b/src/main/java/gov/usgs/earthquake/util/JDBCConnection.java @@ -5,7 +5,7 @@ import java.sql.Connection; import java.sql.ResultSet; import java.sql.Statement; - +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; @@ -27,6 +27,9 @@ public abstract class JDBCConnection extends DefaultConfigurable { /** Connection object. */ private Connection connection; + /** Lock prevents statements from mixing during transaction; avoids "synchronized". */ + private final ReentrantLock transactionLock = new ReentrantLock(true); + /** * Create a new JDBCConnection object. */ @@ -64,6 +67,7 @@ public void startup() throws Exception { */ @Override public void shutdown() throws Exception { + transactionLock.lock(); try { if (connection != null) { connection.close(); @@ -73,30 +77,53 @@ public void shutdown() throws Exception { e.printStackTrace(); } finally { connection = null; + transactionLock.unlock(); } } /** * Open a transaction on the database connection + * + * A lock is used to ensure no other transactions can begin until either + * {@link #commitTransaction()} or {@link #rollbackTransaction()} are called. */ - public synchronized void beginTransaction() throws Exception { - Connection conn = this.verifyConnection(); - conn.setAutoCommit(false); + public void beginTransaction() throws Exception { + // enter transaction, but allow thread to be interrupted + transactionLock.lockInterruptibly(); + try { + Connection conn = this.verifyConnection(); + conn.setAutoCommit(false); + } catch (Exception e) { + // if exception occurs, unlock and throw + transactionLock.unlock(); + throw e; + } + // otherwise, remain locked until commit/rollback are called } /** * Finalize the transaction by committing all the changes and closing the * transaction. */ - public synchronized void commitTransaction() throws Exception { - getConnection().setAutoCommit(true); + public void commitTransaction() throws Exception { + try { + getConnection().setAutoCommit(true); + } finally { + // if an exception occurred, somethings weird but still unlock + transactionLock.unlock(); + } } /** * Undo all of the changes made during the current transaction */ - public synchronized void rollbackTransaction() throws Exception { - getConnection().rollback(); + public void rollbackTransaction() throws Exception { + try { + getConnection().rollback(); + } finally { + // if an exception occurred, somethings weird but still unlock + transactionLock.unlock(); + } } /** @@ -113,11 +140,16 @@ public Connection getConnection() { * this doesn't succeed, reinitializes the database connection by calling * shutdown() then startup(). * + * NOTE: this method modifies the stored connection, and should not be called + * by multiple threads. + * + * This is normally called by {@link #beginTransaction()} which uses a lock. + * * @return Valid connection object. * @throws Exception * if unable to (re)connect. */ - public synchronized Connection verifyConnection() throws Exception { + public Connection verifyConnection() throws Exception { try { // usually throws an exception when connection is closed if (connection.isClosed()) { From a3b4d3d726da38e9c1cf968d22606770900d3a61 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Fri, 18 Dec 2020 09:53:02 -0700 Subject: [PATCH 02/11] Try unfair lock to improve performance --- src/main/java/gov/usgs/earthquake/util/JDBCConnection.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/gov/usgs/earthquake/util/JDBCConnection.java b/src/main/java/gov/usgs/earthquake/util/JDBCConnection.java index 5d14e239..361c9bea 100644 --- a/src/main/java/gov/usgs/earthquake/util/JDBCConnection.java +++ b/src/main/java/gov/usgs/earthquake/util/JDBCConnection.java @@ -28,7 +28,7 @@ public abstract class JDBCConnection extends DefaultConfigurable { private Connection connection; /** Lock prevents statements from mixing during transaction; avoids "synchronized". */ - private final ReentrantLock transactionLock = new ReentrantLock(true); + private final ReentrantLock transactionLock = new ReentrantLock(); /** * Create a new JDBCConnection object. From f30da5240c5971057ce39450e90c3302a3b7ffbd Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Fri, 18 Dec 2020 09:53:48 -0700 Subject: [PATCH 03/11] Hold index transaction from (unused) search socket --- .../earthquake/indexer/SearchServerSocket.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/main/java/gov/usgs/earthquake/indexer/SearchServerSocket.java b/src/main/java/gov/usgs/earthquake/indexer/SearchServerSocket.java index a0da59e0..c7a083dc 100644 --- a/src/main/java/gov/usgs/earthquake/indexer/SearchServerSocket.java +++ b/src/main/java/gov/usgs/earthquake/indexer/SearchServerSocket.java @@ -68,17 +68,24 @@ public SearchServerSocket() { /** * Method to perform search. - * + * * Calls Indexer.search(SearchRequest). Simplifies testing. - * + * * @param request * the search to execute. * @return the search response. * @throws Exception */ - protected SearchResponse search(final SearchRequest request) - throws Exception { - return indexer.search(request); + protected SearchResponse search(final SearchRequest request) throws Exception { + indexer.getProductIndex().beginTransaction(); + try { + SearchResponse response = indexer.search(request); + indexer.getProductIndex().commitTransaction(); + return response; + } catch (Exception e) { + indexer.getProductIndex().rollbackTransaction(); + throw e; + } } /** From 587fa0c9289ab6585de7f112ec54eef34ef7fafd Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Fri, 18 Dec 2020 09:56:14 -0700 Subject: [PATCH 04/11] Improve queue and task logging --- .../DefaultNotificationReceiver.java | 1 + .../ExecutorListenerNotifier.java | 24 ++++++++++++------- .../NotificationListenerCallable.java | 7 ++++++ .../gov/usgs/util/FutureExecutorTask.java | 12 +++++++++- 4 files changed, 34 insertions(+), 10 deletions(-) diff --git a/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationReceiver.java b/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationReceiver.java index c207ddb0..b227a0ae 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationReceiver.java +++ b/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationReceiver.java @@ -576,6 +576,7 @@ public void configure(Config config) throws Exception { throw new ConfigurationException("Unknown notifier type " + notifierType); } + notifier.setName(getName() + ".notifier"); } } diff --git a/src/main/java/gov/usgs/earthquake/distribution/ExecutorListenerNotifier.java b/src/main/java/gov/usgs/earthquake/distribution/ExecutorListenerNotifier.java index 5508dfb8..37b3e3c2 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/ExecutorListenerNotifier.java +++ b/src/main/java/gov/usgs/earthquake/distribution/ExecutorListenerNotifier.java @@ -215,7 +215,7 @@ public void startup() throws Exception { LOGGER.info("[" + receiver.getName() + "] requeueing notification index '" + index.getName() + "'"); // find all existing notifications - Iterator allNotifications = null; + List allNotifications = null; // for json index, push intersection into database if only one listener if (index instanceof JsonNotificationIndex && gracefulListeners.size() == 1) { @@ -227,7 +227,7 @@ public void startup() throws Exception { try { allNotifications = ((JsonNotificationIndex) index).getMissingNotifications( - ((JsonNotificationIndex) listenerIndex).getTable()).iterator(); + ((JsonNotificationIndex) listenerIndex).getTable()); } catch (Exception e) { LOGGER.log(Level.INFO, "Exception loading intersection, continuing", e); } @@ -237,23 +237,23 @@ public void startup() throws Exception { if (allNotifications == null) { // fallback to previous behavior allNotifications = index.findNotifications( - (List) null, (List) null, (List) null) - .iterator(); + (List) null, (List) null, (List) null); } LOGGER.info("Done finding existing notifications"); // queue them for processing in case they were previous missed Date now = new Date(); - while (allNotifications.hasNext()) { - NotificationEvent event = new NotificationEvent(receiver, - allNotifications.next()); + int count = 0; + for (final Notification n : allNotifications) { + NotificationEvent event = new NotificationEvent(receiver, n); if (event.getNotification().getExpirationDate().after(now)) { // still valid this.notifyListeners(event, gracefulListeners); } + count += 1; // try to keep queue size managable during restart - throttleQueues(); + throttleQueues(allNotifications.size() - count); } LOGGER.info("All notifications queued"); @@ -314,6 +314,10 @@ public Integer getMaxQueueSize() { * @throws InterruptedException */ public void throttleQueues() throws InterruptedException { + throttleQueues(null); + } + + public void throttleQueues(Integer remaining) throws InterruptedException { // try to keep queue size managable during restart int maxSize = throttleStartThreshold; // track whether any throttles occurred @@ -333,7 +337,9 @@ public void throttleQueues() throws InterruptedException { LOGGER.info("[" + getName() + "]" + " queueing throttled until below " + throttleStopThreshold - + " (size = " + size + ")"); + + " (size = " + size + ", remaining=" + + (remaining == null ? "?" : remaining) + + ")"); // too many messages queued // set maxSize to stop threshold maxSize = throttleStopThreshold; diff --git a/src/main/java/gov/usgs/earthquake/distribution/NotificationListenerCallable.java b/src/main/java/gov/usgs/earthquake/distribution/NotificationListenerCallable.java index 244af3b9..c5eff57e 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/NotificationListenerCallable.java +++ b/src/main/java/gov/usgs/earthquake/distribution/NotificationListenerCallable.java @@ -51,4 +51,11 @@ public Void call() throws Exception { } } + /** + * Displayed in logs when timeout occurs. + */ + public String toString() { + return this.listener.getName() + + ": " + event.getNotification().getProductId().toString(); + } } diff --git a/src/main/java/gov/usgs/util/FutureExecutorTask.java b/src/main/java/gov/usgs/util/FutureExecutorTask.java index c4342f03..871713ab 100644 --- a/src/main/java/gov/usgs/util/FutureExecutorTask.java +++ b/src/main/java/gov/usgs/util/FutureExecutorTask.java @@ -8,6 +8,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.logging.Level; import java.util.logging.Logger; @@ -139,7 +140,16 @@ public void run() { e = (Exception) cause; } } - LOGGER.log(Level.INFO, "Exception executing task", e); + if (e instanceof InterruptedException) { + LOGGER.info("Interrupted executing " + this.callable.toString()); + } else if (e instanceof TimeoutException) { + LOGGER.info("Timeout executing " + this.callable.toString()); + } else { + LOGGER.log( + Level.INFO, + "Exception executing task " + this.callable.toString(), + e); + } // signal that we are not running runThread = null; From 129da012647a307fa9b21e9f294709ce6d9cc2ae Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Fri, 18 Dec 2020 12:53:42 -0700 Subject: [PATCH 05/11] Move driver/url/connect to JDBCConnection, add default network timeout of 30s --- .../earthquake/aws/JsonNotificationIndex.java | 38 +++------- .../earthquake/aws/JsonProductStorage.java | 38 +++------- .../usgs/earthquake/aws/TrackingIndex.java | 38 +++------- .../earthquake/indexer/JDBCProductIndex.java | 41 +++------- .../usgs/earthquake/util/JDBCConnection.java | 75 ++++++++++++++++++- 5 files changed, 112 insertions(+), 118 deletions(-) diff --git a/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java b/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java index ab5d2ee7..4f4b64b7 100644 --- a/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java +++ b/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java @@ -67,12 +67,8 @@ public class JsonNotificationIndex public static final String DEFAULT_URL = "jdbc:sqlite:json_notification_index.db"; - /** JDBC driver classname. */ - private String driver; /** Database table name. */ private String table; - /** JDBC database connect url. */ - private String url; /** * Construct a JsonNotification using defaults. @@ -93,40 +89,26 @@ public JsonNotificationIndex(final String driver, final String url) { */ public JsonNotificationIndex( final String driver, final String url, final String table) { - this.driver = driver; + super(driver, url); this.table = table; - this.url = url; } - public String getDriver() { return this.driver; } public String getTable() { return this.table; } - public String getUrl() { return this.url; } - public void setDriver(final String driver) { this.driver = driver; } public void setTable(final String table) { this.table = table; } - public void setUrl(final String url) { this.url = url; } @Override public void configure(final Config config) throws Exception { - driver = config.getProperty("driver", DEFAULT_DRIVER); - LOGGER.config("[" + getName() + "] driver=" + driver); - table = config.getProperty("table", DEFAULT_TABLE); - LOGGER.config("[" + getName() + "] table=" + table); - url = config.getProperty("url", DEFAULT_URL); + super.configure(config); + if (getDriver() == null) { setDriver(DEFAULT_DRIVER); } + if (getUrl() == null) { setUrl(DEFAULT_URL); } + + setTable(config.getProperty("table", DEFAULT_TABLE)); + LOGGER.config("[" + getName() + "] driver=" + getDriver()); + LOGGER.config("[" + getName() + "] networkTimeout=" + getNetworkTimeout()); + LOGGER.config("[" + getName() + "] table=" + getTable()); // do not log url, it may contain user/pass } - /** - * Connect to database. - * - * Implements abstract JDBCConnection method. - */ - @Override - protected Connection connect() throws Exception { - // load driver if needed - Class.forName(driver); - return DriverManager.getConnection(url); - } - /** * After normal startup, check whether schema exists and attempt to create. */ @@ -177,7 +159,7 @@ public void createSchema() throws Exception { try (final Statement statement = getConnection().createStatement()) { String autoIncrement = ""; String engine = ""; - if (driver.contains("mysql")) { + if (getDriver().contains("mysql")) { autoIncrement = " AUTO_INCREMENT"; engine = " ENGINE=innodb CHARSET=utf8"; } diff --git a/src/main/java/gov/usgs/earthquake/aws/JsonProductStorage.java b/src/main/java/gov/usgs/earthquake/aws/JsonProductStorage.java index 5d2353db..323b4f43 100644 --- a/src/main/java/gov/usgs/earthquake/aws/JsonProductStorage.java +++ b/src/main/java/gov/usgs/earthquake/aws/JsonProductStorage.java @@ -56,12 +56,8 @@ public class JsonProductStorage extends JDBCConnection implements ProductStorage public static final String DEFAULT_TABLE = "product"; public static final String DEFAULT_URL = "jdbc:sqlite:json_product_index.db"; - /** JDBC driver classname. */ - private String driver; /** Database table name. */ private String table; - /** JDBC database connect url. */ - private String url; /** * Create a JsonProductStorage using defaults. @@ -82,38 +78,24 @@ public JsonProductStorage(final String driver, final String url) { */ public JsonProductStorage( final String driver, final String url, final String table) { - this.driver = driver; + super(driver, url); this.table = table; - this.url = url; } - public String getDriver() { return this.driver; } public String getTable() { return this.table; } - public String getUrl() { return this.url; } - public void setDriver(final String driver) { this.driver = driver; } public void setTable(final String table) { this.table = table; } - public void setUrl(final String url) { this.url = url; } @Override public void configure(final Config config) throws Exception { - driver = config.getProperty("driver", DEFAULT_DRIVER); - LOGGER.config("[" + getName() + "] driver=" + driver); - table = config.getProperty("table", DEFAULT_TABLE); - LOGGER.config("[" + getName() + "] table=" + table); - url = config.getProperty("url", DEFAULT_URL); - // do not log url, it may contain user/pass - } + super.configure(config); + if (getDriver() == null) { setDriver(DEFAULT_DRIVER); } + if (getUrl() == null) { setUrl(DEFAULT_URL); } - /** - * Connect to database. - * - * Implements abstract JDBCConnection method. - */ - @Override - protected Connection connect() throws Exception { - // load driver if needed - Class.forName(driver); - return DriverManager.getConnection(url); + setTable(config.getProperty("table", DEFAULT_TABLE)); + LOGGER.config("[" + getName() + "] driver=" + getDriver()); + LOGGER.config("[" + getName() + "] networkTimeout=" + getNetworkTimeout()); + LOGGER.config("[" + getName() + "] table=" + getTable()); + // do not log url, it may contain user/pass } /** @@ -166,7 +148,7 @@ public void createSchema() throws Exception { try (final Statement statement = getConnection().createStatement()) { String autoIncrement = ""; String engine = ""; - if (driver.contains("mysql")) { + if (getDriver().contains("mysql")) { autoIncrement = " AUTO_INCREMENT"; engine = " ENGINE=innodb CHARSET=utf8"; } diff --git a/src/main/java/gov/usgs/earthquake/aws/TrackingIndex.java b/src/main/java/gov/usgs/earthquake/aws/TrackingIndex.java index 8c34dc60..6d5e389a 100644 --- a/src/main/java/gov/usgs/earthquake/aws/TrackingIndex.java +++ b/src/main/java/gov/usgs/earthquake/aws/TrackingIndex.java @@ -40,12 +40,8 @@ public class TrackingIndex extends JDBCConnection { public static final String DEFAULT_TABLE = "tracking"; public static final String DEFAULT_URL = "jdbc:sqlite:json_tracking_index.db"; - /** JDBC driver classname. */ - private String driver; /** Database table name. */ private String table; - /** JDBC database connect url. */ - private String url; /** * Construct a TrackingIndex using defaults. @@ -66,40 +62,26 @@ public TrackingIndex(final String driver, final String url) { */ public TrackingIndex( final String driver, final String url, final String table) { - this.driver = driver; + super(driver, url); this.table = table; - this.url = url; } - public String getDriver() { return this.driver; } public String getTable() { return this.table; } - public String getUrl() { return this.url; } - public void setDriver(final String driver) { this.driver = driver; } public void setTable(final String table) { this.table = table; } - public void setUrl(final String url) { this.url = url; } @Override public void configure(final Config config) throws Exception { - driver = config.getProperty("driver", DEFAULT_DRIVER); - LOGGER.config("[" + getName() + "] driver=" + driver); - table = config.getProperty("table", DEFAULT_TABLE); - LOGGER.config("[" + getName() + "] table=" + table); - url = config.getProperty("url", DEFAULT_URL); + super.configure(config); + if (getDriver() == null) { setDriver(DEFAULT_DRIVER); } + if (getUrl() == null) { setUrl(DEFAULT_URL); } + + setTable(config.getProperty("table", DEFAULT_TABLE)); + LOGGER.config("[" + getName() + "] driver=" + getDriver()); + LOGGER.config("[" + getName() + "] networkTimeout=" + getNetworkTimeout()); + LOGGER.config("[" + getName() + "] table=" + getTable()); // do not log url, it may contain user/pass } - /** - * Connect to database. - * - * Implements abstract JDBCConnection method. - */ - @Override - protected Connection connect() throws Exception { - // load driver if needed - Class.forName(driver); - return DriverManager.getConnection(url); - } - /** * After normal startup, check whether schema exists and attempt to create. */ @@ -149,7 +131,7 @@ public void createSchema() throws Exception { beginTransaction(); try (final Statement statement = getConnection().createStatement()) { String autoIncrement = ""; - if (driver.contains("mysql")) { + if (getDriver().contains("mysql")) { autoIncrement = "AUTO_INCREMENT"; } statement.executeUpdate( diff --git a/src/main/java/gov/usgs/earthquake/indexer/JDBCProductIndex.java b/src/main/java/gov/usgs/earthquake/indexer/JDBCProductIndex.java index b0c34a3d..6df5f9c0 100644 --- a/src/main/java/gov/usgs/earthquake/indexer/JDBCProductIndex.java +++ b/src/main/java/gov/usgs/earthquake/indexer/JDBCProductIndex.java @@ -56,18 +56,6 @@ public class JDBCProductIndex extends JDBCConnection implements ProductIndex { */ public static final String JDBC_DEFAULT_FILE = "productIndex.db"; - /** - * Constant used to specify what the driver property should be called in the - * config file - */ - private static final String JDBC_DRIVER_PROPERTY = "driver"; - - /** - * Constant used to specify the url property should be called in the config - * file. - */ - private static final String JDBC_URL_PROPERTY = "url"; - /** * Constant used to specify what the index file property should be called in * to config file @@ -125,8 +113,6 @@ public class JDBCProductIndex extends JDBCConnection implements ProductIndex { // private static final String SUMMARY_LINK_RELATION = "relation"; // private static final String SUMMARY_LINK_URL = "url"; - private String driver; - private String url; private String index_file; /** @@ -136,13 +122,12 @@ public class JDBCProductIndex extends JDBCConnection implements ProductIndex { */ public JDBCProductIndex() throws Exception { // Default index file, so calling configure() isn't required - index_file = JDBC_DEFAULT_FILE; - driver = JDBC_DEFAULT_DRIVER; + this(JDBC_DEFAULT_FILE); } public JDBCProductIndex(final String sqliteFileName) throws Exception { index_file = sqliteFileName; - driver = JDBC_DEFAULT_DRIVER; + setDriver(JDBC_DEFAULT_DRIVER); } // ____________________________________ @@ -157,15 +142,10 @@ public JDBCProductIndex(final String sqliteFileName) throws Exception { */ @Override public void configure(Config config) throws Exception { + super.configure(config); + if (getDriver() == null) { setDriver(JDBC_DEFAULT_DRIVER); } - driver = config.getProperty(JDBC_DRIVER_PROPERTY); - index_file = config.getProperty(JDBC_FILE_PROPERTY); - url = config.getProperty(JDBC_URL_PROPERTY); - - if (driver == null || "".equals(driver)) { - driver = JDBC_DEFAULT_DRIVER; - } - + index_file = config.getProperty(JDBC_FILE_PROPERTY, JDBC_DEFAULT_FILE); if (index_file == null || "".equals(index_file)) { index_file = JDBC_DEFAULT_FILE; } @@ -179,9 +159,8 @@ public void configure(Config config) throws Exception { */ @Override public Connection connect() throws Exception { - // If they are using the sqlite driver, we need to try to create the - // file - if (driver.equals(JDBCUtils.SQLITE_DRIVER_CLASSNAME)) { + // If they are using the sqlite driver, we need to try to create the file + if (getUrl() == null && getDriver().equals(JDBCUtils.SQLITE_DRIVER_CLASSNAME)) { // Make sure file exists or copy it out of the JAR File indexFile = new File(index_file); if (!indexFile.exists()) { @@ -198,12 +177,10 @@ public Connection connect() throws Exception { } indexFile = null; - // Build the JDBC url - url = JDBC_CONNECTION_PREFIX + index_file; - driver = JDBCUtils.SQLITE_DRIVER_CLASSNAME; + setUrl(JDBC_CONNECTION_PREFIX + index_file); } - return JDBCUtils.getConnection(driver, url); + return super.connect(); } /** diff --git a/src/main/java/gov/usgs/earthquake/util/JDBCConnection.java b/src/main/java/gov/usgs/earthquake/util/JDBCConnection.java index 361c9bea..faa7ef50 100644 --- a/src/main/java/gov/usgs/earthquake/util/JDBCConnection.java +++ b/src/main/java/gov/usgs/earthquake/util/JDBCConnection.java @@ -1,10 +1,14 @@ package gov.usgs.earthquake.util; +import gov.usgs.util.Config; import gov.usgs.util.DefaultConfigurable; import java.sql.Connection; +import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; @@ -19,14 +23,31 @@ * * @author jmfee */ -public abstract class JDBCConnection extends DefaultConfigurable { +public class JDBCConnection extends DefaultConfigurable implements AutoCloseable { + + static { + // set default database connect/login timeout in seconds + DriverManager.setLoginTimeout(10); + } private static final Logger LOGGER = Logger.getLogger(JDBCConnection.class .getName()); + /** shared executor for network timeouts */ + private static final Executor TIMEOUT_EXECUTOR = Executors.newCachedThreadPool(); + /** Connection object. */ private Connection connection; + /** JDBC driver class. */ + private String driver; + + /** JDBC network timeout. */ + private int networkTimeout = 30000; + + /** JDBC connect url. */ + private String url; + /** Lock prevents statements from mixing during transaction; avoids "synchronized". */ private final ReentrantLock transactionLock = new ReentrantLock(); @@ -37,6 +58,38 @@ public JDBCConnection() { this.connection = null; } + public JDBCConnection(final String driver, final String url) { + this.driver = driver; + this.url = url; + } + + /** + * Implement autocloseable. + * + * Calls {@link #shutdown()}. + * + * @throws Exception + */ + @Override + public void close() throws Exception { + shutdown(); + } + + /** + * Implement Configurable + */ + @Override + public void configure(final Config config) throws Exception { + setDriver(config.getProperty("driver")); + + String timeout = config.getProperty("networkTimeout"); + if (timeout != null) { + setNetworkTimeout(Integer.parseInt(timeout)); + } + + setUrl(config.getProperty("url")); + } + /** * Connect to the database. * @@ -46,7 +99,15 @@ public JDBCConnection() { * @throws Exception * if unable to connect. */ - protected abstract Connection connect() throws Exception; + protected Connection connect() throws Exception { + // load driver if needed + Class.forName(driver); + final Connection conn = DriverManager.getConnection(url); + if (networkTimeout > 0) { + conn.setNetworkTimeout(TIMEOUT_EXECUTOR, networkTimeout); + } + return conn; + } /** * Initialize the database connection. @@ -204,4 +265,14 @@ public Connection verifyConnection() throws Exception { return this.connection; } + public String getDriver() { return this.driver; } + public void setDriver(final String driver) { this.driver = driver; } + + public int getNetworkTimeout() { return this.networkTimeout; } + /** NOTE: this does not affect existing connections. */ + public void setNetworkTimeout(final int timeout) { this.networkTimeout = timeout; } + + public String getUrl() { return this.url; } + public void setUrl(final String url) { this.url = url; } + } From 93228f4cf824b41324d6e2f750a9136110b87e68 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Fri, 18 Dec 2020 12:54:46 -0700 Subject: [PATCH 06/11] Configure separate read index when possible --- .../gov/usgs/earthquake/indexer/Indexer.java | 38 ++++++++++++++++--- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/src/main/java/gov/usgs/earthquake/indexer/Indexer.java b/src/main/java/gov/usgs/earthquake/indexer/Indexer.java index f154ba04..f96378c9 100644 --- a/src/main/java/gov/usgs/earthquake/indexer/Indexer.java +++ b/src/main/java/gov/usgs/earthquake/indexer/Indexer.java @@ -127,6 +127,9 @@ public class Indexer extends DefaultNotificationListener { /** Index of stored products, and how they are related. */ private ProductIndex productIndex; + /** Index used by {@link #hasProductBeenIndexed(ProductId)}. */ + private ProductIndex readProductIndex; + /** Modules provide product specific functionality. */ private List modules = new LinkedList(); @@ -406,15 +409,16 @@ protected void notifyListeners(final IndexerEvent event) { */ protected boolean hasProductBeenIndexed(final ProductId id) throws Exception { boolean hasProduct = false; - // use transaction to avoid stepping on #indexProduct() - productIndex.beginTransaction(); + // readProductIndex may be the same as productIndex, + // or for mysql uses a separate connection + readProductIndex.beginTransaction(); try { - hasProduct = productIndex.hasProduct(id); - productIndex.commitTransaction(); + hasProduct = readProductIndex.hasProduct(id); + readProductIndex.commitTransaction(); } catch (Exception wtf) { LOGGER.log(Level.WARNING, "[" + getName() + "] exception checking if product already indexed", wtf); - productIndex.rollbackTransaction(); + readProductIndex.rollbackTransaction(); } // default is it hasn't been processed @@ -1760,6 +1764,14 @@ public void shutdown() throws Exception { LOGGER.log(Level.WARNING, "[" + getName() + "] exception shutting down product index", e); } + if (readProductIndex != productIndex) { + try { + readProductIndex.shutdown(); + } catch (Exception e) { + LOGGER.log(Level.WARNING, "[" + getName() + + "] exception shutting down read product index", e); + } + } productStorage.shutdown(); // ExecutorServices tied to known listeners. @@ -1853,6 +1865,22 @@ public void startup() throws Exception { productStorage.startup(); productIndex.startup(); + // when using mysql index, use separate connection for hasProductBeenIndexed + if (productIndex instanceof JDBCProductIndex) { + JDBCProductIndex jdbcProductIndex = (JDBCProductIndex) productIndex; + if (jdbcProductIndex.getDriver().contains("mysql")) { + JDBCProductIndex index = new JDBCProductIndex(); + index.setDriver(jdbcProductIndex.getDriver()); + index.setUrl(jdbcProductIndex.getUrl()); + readProductIndex = index; + readProductIndex.startup(); + } + } + if (readProductIndex == null) { + // otherwise share index + readProductIndex = productIndex; + } + // Cleanup thread to purge old products if (archivePolicies.size() > 0) { // Instantiate a timer object From 2332cd71e1ff00b5ff8c81158a901de0b10e631b Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Fri, 18 Dec 2020 12:55:05 -0700 Subject: [PATCH 07/11] Update test to junit5 classes --- .../distribution/JDBCNotificationIndexTest.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/test/java/gov/usgs/earthquake/distribution/JDBCNotificationIndexTest.java b/src/test/java/gov/usgs/earthquake/distribution/JDBCNotificationIndexTest.java index 636027f9..1f5cad1d 100644 --- a/src/test/java/gov/usgs/earthquake/distribution/JDBCNotificationIndexTest.java +++ b/src/test/java/gov/usgs/earthquake/distribution/JDBCNotificationIndexTest.java @@ -15,10 +15,10 @@ import java.util.Iterator; import java.util.List; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; public class JDBCNotificationIndexTest { @@ -121,7 +121,7 @@ TRACKER_URL, new URL( * Creates a dummy notification index with sample notifications etc. This is * used for the testing environment and is run before the tests themselves. */ - @Before + @BeforeEach public void setupEnvironment() { try { String t = System.getProperty("user.dir"); // CWD-ish @@ -200,7 +200,7 @@ public void setupEnvironment() { /** * Cleans up the dummy environment after running all tests. */ - @After + @AfterEach public void cleanupEnvironment() { try { @@ -267,8 +267,8 @@ public void testRemoveNotifications() throws Exception { public void testRemoveExpiredNotifications() throws Exception { List found = index.findExpiredNotifications(); - for (int i = 0; i < EXPIRED_NOTIFICATIONS.size(); ++i) { - Assert.assertTrue(contains(found, EXPIRED_NOTIFICATIONS.get(i))); + for (final Notification expired : EXPIRED_NOTIFICATIONS) { + Assert.assertTrue(contains(found, expired)); } } From 3d6b4785bc07e59f00e5389aafe1f546e5df7aa7 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Fri, 18 Dec 2020 13:32:03 -0700 Subject: [PATCH 08/11] Remove unused imports, fix warnings --- .../java/gov/usgs/earthquake/aws/JsonNotificationIndex.java | 2 -- src/main/java/gov/usgs/earthquake/aws/JsonProductStorage.java | 2 -- src/main/java/gov/usgs/earthquake/aws/TrackingIndex.java | 2 -- src/test/java/gov/usgs/earthquake/indexer/ExtentIndexTest.java | 3 ++- 4 files changed, 2 insertions(+), 7 deletions(-) diff --git a/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java b/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java index 4f4b64b7..776dafbd 100644 --- a/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java +++ b/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java @@ -2,8 +2,6 @@ import java.io.ByteArrayInputStream; import java.net.URL; -import java.sql.Connection; -import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; diff --git a/src/main/java/gov/usgs/earthquake/aws/JsonProductStorage.java b/src/main/java/gov/usgs/earthquake/aws/JsonProductStorage.java index 323b4f43..345dba70 100644 --- a/src/main/java/gov/usgs/earthquake/aws/JsonProductStorage.java +++ b/src/main/java/gov/usgs/earthquake/aws/JsonProductStorage.java @@ -1,8 +1,6 @@ package gov.usgs.earthquake.aws; import java.io.ByteArrayInputStream; -import java.sql.Connection; -import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; diff --git a/src/main/java/gov/usgs/earthquake/aws/TrackingIndex.java b/src/main/java/gov/usgs/earthquake/aws/TrackingIndex.java index 6d5e389a..fcf6ad1a 100644 --- a/src/main/java/gov/usgs/earthquake/aws/TrackingIndex.java +++ b/src/main/java/gov/usgs/earthquake/aws/TrackingIndex.java @@ -1,8 +1,6 @@ package gov.usgs.earthquake.aws; import java.io.ByteArrayInputStream; -import java.sql.Connection; -import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.Statement; diff --git a/src/test/java/gov/usgs/earthquake/indexer/ExtentIndexTest.java b/src/test/java/gov/usgs/earthquake/indexer/ExtentIndexTest.java index a42a28a7..b98a09ca 100644 --- a/src/test/java/gov/usgs/earthquake/indexer/ExtentIndexTest.java +++ b/src/test/java/gov/usgs/earthquake/indexer/ExtentIndexTest.java @@ -17,7 +17,7 @@ public class ExtentIndexTest { @Test - public void addExistsTest() throws Exception{ + public void addExistsTest() throws Exception { ExtentIndex index = new ExtentIndex(); index.configure(new Config()); index.startup(); @@ -42,5 +42,6 @@ public void addExistsTest() throws Exception{ stmnt.executeUpdate(); index.shutdown(); + index.close(); } } \ No newline at end of file From e070ffca7740c81a03005f8eb3de5da5b5555f60 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Fri, 18 Dec 2020 13:46:12 -0700 Subject: [PATCH 09/11] Change associateUsingCurrentProducts configure default to match code default (true) --- src/main/java/gov/usgs/earthquake/indexer/Indexer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/gov/usgs/earthquake/indexer/Indexer.java b/src/main/java/gov/usgs/earthquake/indexer/Indexer.java index f96378c9..ca00c395 100644 --- a/src/main/java/gov/usgs/earthquake/indexer/Indexer.java +++ b/src/main/java/gov/usgs/earthquake/indexer/Indexer.java @@ -76,7 +76,7 @@ public class Indexer extends DefaultNotificationListener { public static final String ASSOCIATE_USING_CURRENT_PRODUCTS_PROPERTY = "associateUsingCurrentProducts"; - public static final String DEFAULT_ASSOCIATE_USING_CURRENT_PRODUCTS = "false"; + public static final String DEFAULT_ASSOCIATE_USING_CURRENT_PRODUCTS = "true"; /** Property name to configure a custom storage. */ public static final String STORAGE_CONFIG_PROPERTY = "storage"; From 906a5fdd5bb114a27982b2e5dccf9600d22d5417 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Fri, 18 Dec 2020 13:55:28 -0700 Subject: [PATCH 10/11] Adjust aws receiver logging --- .../gov/usgs/earthquake/aws/AwsProductReceiver.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/java/gov/usgs/earthquake/aws/AwsProductReceiver.java b/src/main/java/gov/usgs/earthquake/aws/AwsProductReceiver.java index f056c593..259f0223 100644 --- a/src/main/java/gov/usgs/earthquake/aws/AwsProductReceiver.java +++ b/src/main/java/gov/usgs/earthquake/aws/AwsProductReceiver.java @@ -116,7 +116,7 @@ public void onOpen(Session session) throws IOException { this.session = session; // start catch up process try { - LOGGER.info("[" + getName() + "] Starting catch up"); + LOGGER.fine("[" + getName() + "] Starting catch up"); sendProductsCreatedAfter(); } catch (Exception e) { LOGGER.log( @@ -173,19 +173,19 @@ synchronized public void onMessage(String message) throws IOException { protected void onBroadcast(final JsonObject json) throws Exception { final JsonNotification notification = new JsonNotification( json.getJsonObject("notification")); - LOGGER.info("[" + getName() + "] onBroadcast(" + notification.getProductId() + ")"); + LOGGER.fine("[" + getName() + "] onBroadcast(" + notification.getProductId() + ")"); Long broadcastId = json.getJsonObject("notification").getJsonNumber("id").longValue(); if (lastBroadcastId != null && broadcastId != (lastBroadcastId + 1)) { // sanity check, broadcast ids are expected to increment // if incoming broadcast is not lastBroadcastId + 1, may have missed a broadcast - LOGGER.warning( + LOGGER.finer( "[" + getName() + "] broadcast ids out of sequence" + " (got " + broadcastId + ", expected " + (lastBroadcastId + 1) + ")"); if (processBroadcast) { // not in catch up mode, switch back - LOGGER.info("[" + getName() + "] switching to catch up mode"); + LOGGER.fine("[" + getName() + "] switching to catch up mode"); processBroadcast = false; sendProductsCreatedAfter(); } @@ -228,7 +228,7 @@ protected void onJsonNotification(final JsonNotification notification) throws Ex protected void onProduct(final JsonObject json) throws Exception { final JsonNotification notification = new JsonNotification( json.getJsonObject("notification")); - LOGGER.info("[" + getName() + "] onProduct(" + notification.getProductId() + ")"); + LOGGER.fine("[" + getName() + "] onProduct(" + notification.getProductId() + ")"); onJsonNotification(notification); } From b71882af1c7774fef813c37d0dbb863d002d0408 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Fri, 18 Dec 2020 15:05:06 -0700 Subject: [PATCH 11/11] Update verison to 2.7.2 --- code.json | 4 ++-- .../java/gov/usgs/earthquake/distribution/ProductClient.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/code.json b/code.json index 146c0756..f8c094a9 100644 --- a/code.json +++ b/code.json @@ -3,7 +3,7 @@ "name": "Product Distribution Layer", "organization": "U.S. Geological Survey", "description": "Distribution system used for derived earthquake information", - "version": "v2.7.1", + "version": "v2.7.2", "status": "Production", "permissions": { "usageType": "openSource", @@ -27,7 +27,7 @@ "email": "jmfee@usgs.gov" }, "date": { - "metadataLastUpdated": "2020-12-15" + "metadataLastUpdated": "2020-12-18" } } ] diff --git a/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java b/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java index d487d8fb..f81bb1c8 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java +++ b/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java @@ -64,7 +64,7 @@ public class ProductClient extends DefaultConfigurable implements ProductClientMBean, Bootstrappable { /** The "release" version number. */ - public static final String RELEASE_VERSION = "Version 2.7.1 2020-12-15"; + public static final String RELEASE_VERSION = "Version 2.7.2 2020-12-18"; /** Property name used on products for current RELEASE_VERSION. */ public static final String PDL_CLIENT_VERSION_PROPERTY = "pdl-client-version";