From 277d45a023771d6efe618d6308833d3f2e2905b8 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Fri, 15 Jan 2021 10:59:06 -0700 Subject: [PATCH 1/6] Remove duplicate import --- .../java/gov/usgs/earthquake/aws/AwsProductReceiverTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/java/gov/usgs/earthquake/aws/AwsProductReceiverTest.java b/src/test/java/gov/usgs/earthquake/aws/AwsProductReceiverTest.java index 2b52def7..0c2efc99 100644 --- a/src/test/java/gov/usgs/earthquake/aws/AwsProductReceiverTest.java +++ b/src/test/java/gov/usgs/earthquake/aws/AwsProductReceiverTest.java @@ -5,7 +5,6 @@ import javax.json.Json; import javax.json.JsonObject; import javax.websocket.CloseReason; -import javax.websocket.CloseReason.CloseCode; import javax.websocket.CloseReason.CloseCodes; import org.junit.Assert; From f6d1272b2460e9078979288321349643dc42710e Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Fri, 15 Jan 2021 10:59:23 -0700 Subject: [PATCH 2/6] Add AwsBatchIndexer --- .../usgs/earthquake/aws/AwsBatchIndexer.java | 182 ++++++++++++++++++ 1 file changed, 182 insertions(+) create mode 100644 src/main/java/gov/usgs/earthquake/aws/AwsBatchIndexer.java diff --git a/src/main/java/gov/usgs/earthquake/aws/AwsBatchIndexer.java b/src/main/java/gov/usgs/earthquake/aws/AwsBatchIndexer.java new file mode 100644 index 00000000..8000bbde --- /dev/null +++ b/src/main/java/gov/usgs/earthquake/aws/AwsBatchIndexer.java @@ -0,0 +1,182 @@ +package gov.usgs.earthquake.aws; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.io.StringReader; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.Date; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.json.Json; +import javax.json.JsonObject; +import javax.json.JsonReader; + +import gov.usgs.earthquake.distribution.Bootstrappable; +import gov.usgs.earthquake.indexer.Indexer; +import gov.usgs.earthquake.product.Product; +import gov.usgs.earthquake.product.ProductId; +import gov.usgs.earthquake.product.io.JsonProduct; +import gov.usgs.util.Config; +import gov.usgs.util.StreamUtils; +import gov.usgs.util.XmlUtils; + +/** + * Class to index a batch of products that have already been sent to the AWS hub. + * + * Reads a list of products to be indexed. + * Reads indexer from configuration file. + * + * For each product, fetch product information from the get_product AWS endpoint + * and call indexer.onProduct. + */ +public class AwsBatchIndexer implements Bootstrappable { + + /** Logging object. */ + private static final Logger LOGGER = Logger.getLogger(AwsBatchIndexer.class.getName()); + + /** Executor where indexing runs. */ + private ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10); + + /** Whether to force indexing. */ + private boolean force = false; + + /** AWS URL for get_product endpoint, with placeholders. */ + private String getProductUrlTemplate = "https://earthquake.usgs.gov/pdl/west" + + "/get_product?source={source}&type={type}&code={code}&updateTime={updateTime}"; + + /** Indexer to process products. */ + private Indexer indexer; + + + public static final String FORCE_REINDEX_ARGUMENT = "--force"; + public static final String GET_PRODUCT_URL_ARGUMENT = "--getProductUrl="; + public static final String INDEXER_CONFIG_NAME_ARGUMENT="--indexerConfigName="; + public static final String INDEXER_CONFIG_NAME_DEFAULT = "indexer"; + + @Override + public void run(String[] args) throws Exception { + String indexerConfigName = INDEXER_CONFIG_NAME_DEFAULT; + + // parse arguments + for (final String arg : args) { + if (arg.equals(FORCE_REINDEX_ARGUMENT)) { + force = true; + } else if (arg.startsWith(GET_PRODUCT_URL_ARGUMENT)) { + getProductUrlTemplate = arg.replace(GET_PRODUCT_URL_ARGUMENT, ""); + } else if (arg.startsWith(INDEXER_CONFIG_NAME_ARGUMENT)) { + indexerConfigName = arg.replace(INDEXER_CONFIG_NAME_ARGUMENT, ""); + } + } + + // load indexer from configuration + indexer = (Indexer) Config.getConfig().getObject(indexerConfigName); + + // read product ids from stdin + BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); + String line = null; + while ((line = br.readLine()) != null) { + if (line.equals("")) { + continue; + } + + // parse product id + ProductId id; + try { + id = ProductId.parse(line); + } catch (Exception e) { + LOGGER.warning("Error parsing product id '" + line + "'"); + continue; + } + + // queue for processing + executor.submit(() -> { + long start = new Date().getTime(); + Exception e = indexProduct(id); + long total = new Date().getTime() - start; + if (e == null) { + LOGGER.info("Indexed " + id.toString() + " in " + total + " ms"); + } else { + LOGGER.log( + Level.WARNING, + "Error indexing " + id.toString() + " in " + total + "ms", + e); + } + }); + + // keep queue size smallish + if (executor.getQueue().size() > 500) { + while (executor.getQueue().size() > 100) { + LOGGER.info("Queue size " + executor.getQueue().size()); + Thread.sleep(5000L); + } + } + + } + } + + /** + * Use getProductUrl template to generate URL. + * + * Replace "{source}", "{type}", "{code}", and "{updateTime}" placeholders. + * + * @param id + * which product. + * @return URL with placeholders replaced. + * @throws Exception + */ + public URL getProductUrl(final ProductId id) throws Exception { + String url = getProductUrlTemplate; + url = url.replace("{source}", id.getSource()); + url = url.replace("{type}", id.getType()); + url = url.replace("{code}", id.getCode()); + url = url.replace("{updateTime}", XmlUtils.formatDate(id.getUpdateTime())); + return new URL(url); + } + + /** + * Get Product from endpoint. + * + * @param id + * which product. + * @return Product object. + * @throws Exception + */ + public Product getProduct(final ProductId id) throws Exception { + final URL url = getProductUrl(id); + byte[] bytes = StreamUtils.readStream(url); + try ( + final JsonReader reader = Json.createReader(new StringReader( + new String(bytes, StandardCharsets.UTF_8))) + ) { + // parse message + final JsonObject json = reader.readObject(); + return new JsonProduct().getProduct(json); + } + } + + /** + * Index a product. + * + * @param product + * which product + * @throws Exception + */ + public Exception indexProduct(final ProductId id) { + try { + long start = new Date().getTime(); + final Product product = getProduct(id); + long afterGetProduct = new Date().getTime(); + LOGGER.fine("Loaded product " + id.toString() + " in " + + (afterGetProduct - start) + " ms"); + indexer.onProduct(product, force); + return null; + } catch (Exception e) { + return e; + } + } + +} From 5d192a66ec5152845bb7d1bed043c8c8aa527713 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Fri, 15 Jan 2021 11:59:34 -0700 Subject: [PATCH 3/6] Fix tabs to spaces --- src/main/java/gov/usgs/earthquake/aws/AwsBatchIndexer.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/gov/usgs/earthquake/aws/AwsBatchIndexer.java b/src/main/java/gov/usgs/earthquake/aws/AwsBatchIndexer.java index 8000bbde..ab740557 100644 --- a/src/main/java/gov/usgs/earthquake/aws/AwsBatchIndexer.java +++ b/src/main/java/gov/usgs/earthquake/aws/AwsBatchIndexer.java @@ -57,8 +57,8 @@ public class AwsBatchIndexer implements Bootstrappable { public static final String INDEXER_CONFIG_NAME_ARGUMENT="--indexerConfigName="; public static final String INDEXER_CONFIG_NAME_DEFAULT = "indexer"; - @Override - public void run(String[] args) throws Exception { + @Override + public void run(String[] args) throws Exception { String indexerConfigName = INDEXER_CONFIG_NAME_DEFAULT; // parse arguments @@ -116,7 +116,7 @@ public void run(String[] args) throws Exception { } } - } + } /** * Use getProductUrl template to generate URL. From c7884b05120d2f377c25255bac20659374458fd7 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Fri, 15 Jan 2021 13:31:48 -0700 Subject: [PATCH 4/6] Refactor processing out of run method, add database support --- .../usgs/earthquake/aws/AwsBatchIndexer.java | 200 +++++++++++++----- 1 file changed, 146 insertions(+), 54 deletions(-) diff --git a/src/main/java/gov/usgs/earthquake/aws/AwsBatchIndexer.java b/src/main/java/gov/usgs/earthquake/aws/AwsBatchIndexer.java index ab740557..20132aa6 100644 --- a/src/main/java/gov/usgs/earthquake/aws/AwsBatchIndexer.java +++ b/src/main/java/gov/usgs/earthquake/aws/AwsBatchIndexer.java @@ -5,6 +5,9 @@ import java.io.StringReader; import java.net.URL; import java.nio.charset.StandardCharsets; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.util.Date; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; @@ -20,6 +23,7 @@ import gov.usgs.earthquake.product.Product; import gov.usgs.earthquake.product.ProductId; import gov.usgs.earthquake.product.io.JsonProduct; +import gov.usgs.earthquake.util.JDBCConnection; import gov.usgs.util.Config; import gov.usgs.util.StreamUtils; import gov.usgs.util.XmlUtils; @@ -35,6 +39,16 @@ */ public class AwsBatchIndexer implements Bootstrappable { + public static final String FORCE_REINDEX_ARGUMENT = "--force"; + public static final String GET_PRODUCT_URL_ARGUMENT = "--getProductUrl="; + public static final String INDEXER_CONFIG_NAME_ARGUMENT="--indexerConfigName="; + public static final String INDEXER_CONFIG_NAME_DEFAULT = "indexer"; + + public static final String DATABASE_DRIVER_ARGUMENT = "--databaseDriver="; + public static final String DATABASE_URL_ARGUMENT = "--databaseUrl="; + public static final String INDEXER_DATABASE_ARGUMENT = "--indexerDatabase="; + public static final String INDEXER_DATABASE_DEFAULT = "indexer"; + /** Logging object. */ private static final Logger LOGGER = Logger.getLogger(AwsBatchIndexer.class.getName()); @@ -52,18 +66,18 @@ public class AwsBatchIndexer implements Bootstrappable { private Indexer indexer; - public static final String FORCE_REINDEX_ARGUMENT = "--force"; - public static final String GET_PRODUCT_URL_ARGUMENT = "--getProductUrl="; - public static final String INDEXER_CONFIG_NAME_ARGUMENT="--indexerConfigName="; - public static final String INDEXER_CONFIG_NAME_DEFAULT = "indexer"; - @Override public void run(String[] args) throws Exception { - String indexerConfigName = INDEXER_CONFIG_NAME_DEFAULT; - // parse arguments + String databaseDriver = "com.mysql.jdbc.Driver"; + String databaseUrl = null; + String indexerConfigName = INDEXER_CONFIG_NAME_DEFAULT; for (final String arg : args) { - if (arg.equals(FORCE_REINDEX_ARGUMENT)) { + if (arg.equals(DATABASE_DRIVER_ARGUMENT)) { + databaseDriver = arg.replace(DATABASE_DRIVER_ARGUMENT, ""); + } else if (arg.equals(DATABASE_URL_ARGUMENT)) { + databaseUrl = arg.replace(DATABASE_URL_ARGUMENT, ""); + } else if (arg.equals(FORCE_REINDEX_ARGUMENT)) { force = true; } else if (arg.startsWith(GET_PRODUCT_URL_ARGUMENT)) { getProductUrlTemplate = arg.replace(GET_PRODUCT_URL_ARGUMENT, ""); @@ -75,46 +89,12 @@ public void run(String[] args) throws Exception { // load indexer from configuration indexer = (Indexer) Config.getConfig().getObject(indexerConfigName); - // read product ids from stdin - BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); - String line = null; - while ((line = br.readLine()) != null) { - if (line.equals("")) { - continue; - } - - // parse product id - ProductId id; - try { - id = ProductId.parse(line); - } catch (Exception e) { - LOGGER.warning("Error parsing product id '" + line + "'"); - continue; - } - - // queue for processing - executor.submit(() -> { - long start = new Date().getTime(); - Exception e = indexProduct(id); - long total = new Date().getTime() - start; - if (e == null) { - LOGGER.info("Indexed " + id.toString() + " in " + total + " ms"); - } else { - LOGGER.log( - Level.WARNING, - "Error indexing " + id.toString() + " in " + total + "ms", - e); - } - }); - - // keep queue size smallish - if (executor.getQueue().size() > 500) { - while (executor.getQueue().size() > 100) { - LOGGER.info("Queue size " + executor.getQueue().size()); - Thread.sleep(5000L); - } - } - + if (databaseUrl != null) { + LOGGER.info("Reading product ids from database"); + readProductIdsFromDatabase(databaseDriver, databaseUrl); + } else { + LOGGER.info("Reading product ids from stdin"); + readProductIdsFromStdin(); } } @@ -159,24 +139,136 @@ public Product getProduct(final ProductId id) throws Exception { } /** - * Index a product. + * Fetch and Index a product. * - * @param product + * Called from executor service to process product ids. + * + * @param id * which product * @throws Exception */ - public Exception indexProduct(final ProductId id) { + public void processProductId(final ProductId id) { + long start = new Date().getTime(); try { - long start = new Date().getTime(); final Product product = getProduct(id); long afterGetProduct = new Date().getTime(); LOGGER.fine("Loaded product " + id.toString() + " in " + (afterGetProduct - start) + " ms"); + indexer.onProduct(product, force); - return null; + LOGGER.info("Indexed " + id.toString() + + " in " + (new Date().getTime() - afterGetProduct) + " ms"); } catch (Exception e) { - return e; + LOGGER.log( + Level.WARNING, + "Error indexing " + id.toString() + + " in " + (new Date().getTime() - start) + "ms", + e); } } + public void readProductIdsFromDatabase( + final String driver, + final String url) throws Exception { + try ( + final JDBCConnection jdbcConnection = new JDBCConnection() + ) { + jdbcConnection.setDriver(driver); + jdbcConnection.setUrl(url); + jdbcConnection.startup(); + + final String sql = "SELECT id, source, type, code, updatetime" + + " FROM pdl.product h" + + " WHERE id > ?" + + " AND NOT EXISTS (" + + " SELECT * FROM indexer.productSummary i" + + " WHERE h.source=i.source" + + " AND h.type=i.type" + + " AND h.code=i.code" + + " AND h.updatetime=i.updateTime" + + " )" + + " ORDER BY id" + + " LIMIT 500"; + + // start at the beginning + long lastId = -1; + while (true) { + try ( + final Connection conn = jdbcConnection.verifyConnection(); + final PreparedStatement statement = conn.prepareStatement(sql); + ) { + // load next batch of products + statement.setLong(1, lastId); + try ( + final ResultSet rs = statement.executeQuery(); + ) { + int count = 0; + while (rs.next()) { + lastId = rs.getLong("id"); + final ProductId id = new ProductId( + rs.getString("source"), + rs.getString("type"), + rs.getString("code"), + new Date(rs.getLong("updatetime"))); + submitProductId(id); + count++; + } + + // exit once all products processed + if (count == 0) { + LOGGER.info("No more rows returned, exiting"); + break; + } + } + } + } + } + } + + /** + * Read product ids (as urns) from stdin and submit to executor for processing. + * + * @throws Exception + */ + public void readProductIdsFromStdin() throws Exception { + // read product ids from stdin + BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); + String line = null; + while ((line = br.readLine()) != null) { + if (line.equals("")) { + continue; + } + // parse product id + final ProductId id; + try { + id = ProductId.parse(line); + } catch (Exception e) { + LOGGER.warning("Error parsing product id '" + line + "'"); + continue; + } + submitProductId(id); + } + } + + /** + * Submit a product id to the executor service for processing. + * + * If queue is too large (500 ids), blocks until queue is smaller (100 ids). + * + * @param id + * which product + * @throws InterruptedException + */ + public void submitProductId(final ProductId id) throws InterruptedException { + // queue for processing + executor.submit(() -> processProductId(id)); + + // keep queue size smallish + if (executor.getQueue().size() > 500) { + while (executor.getQueue().size() > 100) { + LOGGER.info("Queue size " + executor.getQueue().size()); + Thread.sleep(5000L); + } + } + } } From 890390c2349ef00689b1190afa7945391fbc6ce6 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Fri, 15 Jan 2021 16:50:04 -0700 Subject: [PATCH 5/6] Call indexer startup, parse notification response from get_product --- .../usgs/earthquake/aws/AwsBatchIndexer.java | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/src/main/java/gov/usgs/earthquake/aws/AwsBatchIndexer.java b/src/main/java/gov/usgs/earthquake/aws/AwsBatchIndexer.java index 20132aa6..27ee9e81 100644 --- a/src/main/java/gov/usgs/earthquake/aws/AwsBatchIndexer.java +++ b/src/main/java/gov/usgs/earthquake/aws/AwsBatchIndexer.java @@ -73,9 +73,9 @@ public void run(String[] args) throws Exception { String databaseUrl = null; String indexerConfigName = INDEXER_CONFIG_NAME_DEFAULT; for (final String arg : args) { - if (arg.equals(DATABASE_DRIVER_ARGUMENT)) { + if (arg.startsWith(DATABASE_DRIVER_ARGUMENT)) { databaseDriver = arg.replace(DATABASE_DRIVER_ARGUMENT, ""); - } else if (arg.equals(DATABASE_URL_ARGUMENT)) { + } else if (arg.startsWith(DATABASE_URL_ARGUMENT)) { databaseUrl = arg.replace(DATABASE_URL_ARGUMENT, ""); } else if (arg.equals(FORCE_REINDEX_ARGUMENT)) { force = true; @@ -88,13 +88,18 @@ public void run(String[] args) throws Exception { // load indexer from configuration indexer = (Indexer) Config.getConfig().getObject(indexerConfigName); + indexer.startup(); - if (databaseUrl != null) { - LOGGER.info("Reading product ids from database"); - readProductIdsFromDatabase(databaseDriver, databaseUrl); - } else { - LOGGER.info("Reading product ids from stdin"); - readProductIdsFromStdin(); + try { + if (databaseUrl != null) { + LOGGER.info("Reading product ids from database"); + readProductIdsFromDatabase(databaseDriver, databaseUrl); + } else { + LOGGER.info("Reading product ids from stdin"); + readProductIdsFromStdin(); + } + } finally { + indexer.shutdown(); } } @@ -134,7 +139,8 @@ public Product getProduct(final ProductId id) throws Exception { ) { // parse message final JsonObject json = reader.readObject(); - return new JsonProduct().getProduct(json); + final JsonNotification notification = new JsonNotification(json); + return notification.product; } } From cd802b304a7aa04a843624fad22a64ad08d68e8c Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Fri, 15 Jan 2021 16:50:18 -0700 Subject: [PATCH 6/6] Update version to 2.7.7 --- code.json | 4 ++-- .../java/gov/usgs/earthquake/distribution/ProductClient.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/code.json b/code.json index 066c7122..1f69e51b 100644 --- a/code.json +++ b/code.json @@ -3,7 +3,7 @@ "name": "Product Distribution Layer", "organization": "U.S. Geological Survey", "description": "Distribution system used for derived earthquake information", - "version": "v2.7.6", + "version": "v2.7.7", "status": "Production", "permissions": { "usageType": "openSource", @@ -27,7 +27,7 @@ "email": "jmfee@usgs.gov" }, "date": { - "metadataLastUpdated": "2021-01-05" + "metadataLastUpdated": "2021-01-15" } } ] diff --git a/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java b/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java index ac292b9a..6250243f 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java +++ b/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java @@ -64,7 +64,7 @@ public class ProductClient extends DefaultConfigurable implements ProductClientMBean, Bootstrappable { /** The "release" version number. */ - public static final String RELEASE_VERSION = "Version 2.7.6 2021-01-05"; + public static final String RELEASE_VERSION = "Version 2.7.7 2021-01-15"; /** Property name used on products for current RELEASE_VERSION. */ public static final String PDL_CLIENT_VERSION_PROPERTY = "pdl-client-version";