diff --git a/.classpath b/.classpath
index c7944e3b..d2663e59 100644
--- a/.classpath
+++ b/.classpath
@@ -10,6 +10,7 @@
+
diff --git a/.gitignore b/.gitignore
index 256bd8e0..87d4da98 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,3 +17,5 @@ test-index.db
.idea/
out/
data/
+.project
+.settings
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
new file mode 100644
index 00000000..925f9d4e
--- /dev/null
+++ b/.gitlab-ci.yml
@@ -0,0 +1,47 @@
+image: ${DEVOPS_REGISTRY}usgs/centos:8
+
+stages:
+ - build
+
+.check_code:
+ after_script:
+ # runs before cache is saved (copying from .travis.yml)
+ - rm -f "${CI_PROJECT_DIR}/.gradle/caches/modules-2/modules-2.lock"
+ - rm -fr "${CI_PROJECT_DIR}/.gradle/caches/*/plugin-resolution/"
+ artifacts:
+ paths:
+ - ProductClient.jar
+ - ProductClient.zip
+ reports:
+ cobertura: build/reports/cobertura/cobertura.xml
+ junit: build/test-results/test/TEST-*.xml
+ cache:
+ paths:
+ - .gradle/caches
+ - .gradle/wrapper
+ image: ${DEVOPS_REGISTRY}usgs/java:11-jdk
+ script:
+ # run gradle
+ - export GRADLE_USER_HOME="${CI_PROJECT_DIR}/.gradle"
+ - ./gradlew build createZip
+ # set up artifact paths
+ - cp build/libs/ProductClient.jar ProductClient.jar
+ - cp build/distributions/ProductClient.zip ProductClient.zip
+ stage: build
+ tags:
+ - development
+
+Java 8:
+ extends:
+ - .check_code
+ image: ${DEVOPS_REGISTRY}usgs/java:8-jdk
+
+Java 11:
+ extends:
+ - .check_code
+ image: ${DEVOPS_REGISTRY}usgs/java:11-jdk
+
+Java Latest:
+ extends:
+ - .check_code
+ image: ${DEVOPS_REGISTRY}usgs/java:latest-jdk
diff --git a/.project b/.project
deleted file mode 100644
index 5b921ea6..00000000
--- a/.project
+++ /dev/null
@@ -1,23 +0,0 @@
-
-
- pdl
-
-
-
-
-
- org.eclipse.jdt.core.javabuilder
-
-
-
-
- org.eclipse.buildship.core.gradleprojectbuilder
-
-
-
-
-
- org.eclipse.jdt.core.javanature
- org.eclipse.buildship.core.gradleprojectnature
-
-
diff --git a/.settings/org.eclipse.buildship.core.prefs b/.settings/org.eclipse.buildship.core.prefs
deleted file mode 100644
index e8895216..00000000
--- a/.settings/org.eclipse.buildship.core.prefs
+++ /dev/null
@@ -1,2 +0,0 @@
-connection.project.dir=
-eclipse.preferences.version=1
diff --git a/.settings/org.eclipse.jdt.core.prefs b/.settings/org.eclipse.jdt.core.prefs
deleted file mode 100644
index 35068d95..00000000
--- a/.settings/org.eclipse.jdt.core.prefs
+++ /dev/null
@@ -1,4 +0,0 @@
-eclipse.preferences.version=1
-org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
-org.eclipse.jdt.core.compiler.compliance=1.8
-org.eclipse.jdt.core.compiler.source=1.8
diff --git a/build.gradle b/build.gradle
index 71027b97..2798adb3 100644
--- a/build.gradle
+++ b/build.gradle
@@ -8,9 +8,9 @@ plugins {
id "java"
id "jacoco" // code coverage
id "pmd" // static code analysis
- id "com.gradle.build-scan" version "2.0.2" // post build results online
- id "org.ajoberstar.grgit" version "2.3.0" // git repo information
- id "org.ajoberstar.git-publish" version "1.0.1" // publish gh-pages branch
+ id "org.ajoberstar.grgit" version "4.1.0" // git repo information
+ id "org.ajoberstar.git-publish" version "3.0.0" // publish gh-pages branch
+ id "com.kageiit.jacobo" version "2.1.0"
}
sourceCompatibility = 1.8
@@ -27,30 +27,33 @@ configurations {
dependencies {
codacy "com.codacy:codacy-coverage-reporter:4.0+"
+ // for explanation of dependency types, see
+ // https://docs.gradle.org/current/userguide/building_java_projects.html
+
// javax.json
- compile "javax.json:javax.json-api:1.1"
- compile "org.glassfish:javax.json:1.1"
+ implementation "javax.json:javax.json-api:1.1"
+ runtimeOnly "org.glassfish:javax.json:1.1"
// javax.xml (for MessageUtils)
- compile "javax.activation:activation:1.1.1"
- compile "javax.xml.bind:jaxb-api:2.3+"
- compile "org.glassfish.jaxb:jaxb-runtime:2.3+"
+ runtimeOnly "javax.activation:activation:1.1.1"
+ implementation "javax.xml.bind:jaxb-api:2.3+"
+ runtimeOnly "org.glassfish.jaxb:jaxb-runtime:2.3+"
// javax.websocket (for notifications)
- compile "javax.websocket:javax.websocket-all:1.1"
- compile "org.glassfish.tyrus.bundles:tyrus-standalone-client:1.17"
+ implementation "javax.websocket:javax.websocket-all:1.1"
+ implementation "org.glassfish.tyrus.bundles:tyrus-standalone-client:1.17"
// database drivers
- compile "mysql:mysql-connector-java:5.1.47"
- compile "org.xerial:sqlite-jdbc:3.23.1"
+ runtimeOnly "mysql:mysql-connector-java:5.1.47"
+ runtimeOnly "org.xerial:sqlite-jdbc:3.23.1"
// ssh keys
- compile "ch.ethz.ganymed:ganymed-ssh2:262"
+ implementation "ch.ethz.ganymed:ganymed-ssh2:262"
// nats
implementation "io.nats:java-nats-streaming:2.1.6"
- compile files("lib/MessageUtils.jar")
- compile files("lib/QWFileOutClient.jar")
+ implementation files("lib/MessageUtils.jar")
+ implementation files("lib/QWFileOutClient.jar")
// XMLVerifier uses this, but is excluded for now
// compile files("lib/cap-library-r11.jar")
- testImplementation("junit:junit:4.13")
+ testImplementation("junit:junit:4.13.1")
testImplementation("org.junit.jupiter:junit-jupiter-api:5.7.0")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.7.0")
testRuntimeOnly("org.junit.vintage:junit-vintage-engine:5.7.0")
@@ -64,13 +67,14 @@ sourceSets {
// show compile warnings
tasks.withType(JavaCompile) {
+ options.encoding = 'UTF-8';
options.setDeprecation(true);
options.setWarnings(true);
}
// coverage reports
jacoco {
- toolVersion "0.8.2"
+ toolVersion "0.8.6"
}
jacocoTestReport {
reports {
@@ -81,7 +85,7 @@ jacocoTestReport {
check.dependsOn jacocoTestReport
pmd {
- toolVersion = "6.17.0"
+ toolVersion = "6.29.0"
ruleSets = [
file("pmd-ruleset.xml")
]
@@ -128,14 +132,16 @@ javadoc {
// create jar file
jar {
- baseName = "ProductClient"
+ archiveBaseName = "ProductClient"
+ duplicatesStrategy = DuplicatesStrategy.EXCLUDE
+
manifest {
attributes "Git-Commit": grgit.head().id
attributes "Main-Class": "gov.usgs.earthquake.distribution.Bootstrap"
}
from {
// classes and dependencies
- configurations.runtime.collect { it.isDirectory() ? it : zipTree(it) }
+ configurations.runtimeClasspath.collect { it.isDirectory() ? it : zipTree(it) }
}
from (".") {
// resources
@@ -148,7 +154,7 @@ jar {
// create zip bundle
task createZip(type: Zip, dependsOn: jar) {
- archiveName "ProductClient.zip"
+ archiveFileName = "ProductClient.zip"
// init scripts and README
from "etc/examples/default"
// example listeners
@@ -186,11 +192,15 @@ gitPublish {
// Tasks for TravisCI
-// run with "gradle build --scan" to post build output online
-buildScan {
- termsOfServiceUrl = "https://gradle.com/terms-of-service"
- termsOfServiceAgree = "yes"
-}
+// convert jacoco to cobertura
+import com.kageiit.jacobo.JacoboTask
+tasks.create("jacobo", JacoboTask, {
+ it.jacocoReport = file("${project.buildDir}/reports/jacoco/test/jacocoTestReport.xml")
+ it.coberturaReport = file("${project.buildDir}/reports/cobertura/cobertura.xml")
+ it.srcDirs = sourceSets.main.java.srcDirs
+}).dependsOn(jacocoTestReport)
+check.dependsOn jacobo
+
// .travis.yml uses this to upload coverage
task sendCoverageToCodacy(type: JavaExec, dependsOn: jacocoTestReport) {
diff --git a/build.xml b/build.xml
deleted file mode 100755
index 59917ff9..00000000
--- a/build.xml
+++ /dev/null
@@ -1,240 +0,0 @@
-
-
-
-
- Product distribution includes tools to build, send, and receive products.
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- -->
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/code.json b/code.json
index 693aaefc..146c0756 100644
--- a/code.json
+++ b/code.json
@@ -3,7 +3,7 @@
"name": "Product Distribution Layer",
"organization": "U.S. Geological Survey",
"description": "Distribution system used for derived earthquake information",
- "version": "v2.7.0",
+ "version": "v2.7.1",
"status": "Production",
"permissions": {
"usageType": "openSource",
@@ -27,7 +27,7 @@
"email": "jmfee@usgs.gov"
},
"date": {
- "metadataLastUpdated": "2020-12-03"
+ "metadataLastUpdated": "2020-12-15"
}
}
]
diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar
index 0d4a9516..5c2d1cf0 100644
Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index f4d7b2bf..4d9ca164 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-5.4.1-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-6.7.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
diff --git a/gradlew b/gradlew
index cccdd3d5..b0d6d0ab 100755
--- a/gradlew
+++ b/gradlew
@@ -1,5 +1,21 @@
#!/usr/bin/env sh
+#
+# Copyright 2015 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
##############################################################################
##
## Gradle start up script for UN*X
@@ -28,7 +44,7 @@ APP_NAME="Gradle"
APP_BASE_NAME=`basename "$0"`
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
-DEFAULT_JVM_OPTS=""
+DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"
diff --git a/gradlew.bat b/gradlew.bat
index e95643d6..15e1ee37 100644
--- a/gradlew.bat
+++ b/gradlew.bat
@@ -1,3 +1,19 @@
+@rem
+@rem Copyright 2015 the original author or authors.
+@rem
+@rem Licensed under the Apache License, Version 2.0 (the "License");
+@rem you may not use this file except in compliance with the License.
+@rem You may obtain a copy of the License at
+@rem
+@rem http://www.apache.org/licenses/LICENSE-2.0
+@rem
+@rem Unless required by applicable law or agreed to in writing, software
+@rem distributed under the License is distributed on an "AS IS" BASIS,
+@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+@rem See the License for the specific language governing permissions and
+@rem limitations under the License.
+@rem
+
@if "%DEBUG%" == "" @echo off
@rem ##########################################################################
@rem
@@ -14,7 +30,7 @@ set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
-set DEFAULT_JVM_OPTS=
+set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"
@rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome
diff --git a/settings.gradle b/settings.gradle
index 1f74fe7c..cfadee41 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -2,9 +2,21 @@
* This file was generated by the Gradle 'init' task.
*
* The settings file is used to specify which projects to include in your build.
- *
+ *
* Detailed information about configuring a multi-project build in Gradle can be found
* in the user guide at https://docs.gradle.org/4.9/userguide/multi_project_builds.html
*/
-rootProject.name = 'pdl'
+plugins {
+ id "com.gradle.enterprise" version "3.5"
+}
+
+gradleEnterprise {
+ // run with "gradle build --scan" to post build output online
+ buildScan {
+ termsOfServiceUrl = "https://gradle.com/terms-of-service"
+ termsOfServiceAgree = "yes"
+ }
+}
+
+rootProject.name = 'pdl'
\ No newline at end of file
diff --git a/src/main/java/gov/usgs/earthquake/aws/AwsProductReceiver.java b/src/main/java/gov/usgs/earthquake/aws/AwsProductReceiver.java
index 38fb1029..f056c593 100644
--- a/src/main/java/gov/usgs/earthquake/aws/AwsProductReceiver.java
+++ b/src/main/java/gov/usgs/earthquake/aws/AwsProductReceiver.java
@@ -56,9 +56,17 @@ public class AwsProductReceiver extends DefaultNotificationReceiver implements W
private Session session;
/* µs timestamp of last message that has been processed */
- private Instant createdAfter = null;
- private JsonNotification lastBroadcast = null;
- private boolean processBroadcast = false;
+ protected Instant createdAfter = null;
+ protected JsonNotification lastBroadcast = null;
+ protected Long lastBroadcastId = null;
+ protected boolean processBroadcast = false;
+
+ /** Used to coordinate sending products_created_after message. */
+ protected boolean sendProductsCreatedAfterFlag = false;
+ protected boolean sendProductsCreatedAfterRunning = false;
+ protected Object sendProductsCreatedAfterSync = new Object();
+ protected Thread sendProductsCreatedAfterThread;
+
@Override
public void configure(Config config) throws Exception {
@@ -165,12 +173,34 @@ synchronized public void onMessage(String message) throws IOException {
protected void onBroadcast(final JsonObject json) throws Exception {
final JsonNotification notification = new JsonNotification(
json.getJsonObject("notification"));
- lastBroadcast = notification;
LOGGER.info("[" + getName() + "] onBroadcast(" + notification.getProductId() + ")");
- if (!processBroadcast) {
- return;
+
+ Long broadcastId = json.getJsonObject("notification").getJsonNumber("id").longValue();
+ if (lastBroadcastId != null && broadcastId != (lastBroadcastId + 1)) {
+ // sanity check, broadcast ids are expected to increment
+ // if incoming broadcast is not lastBroadcastId + 1, may have missed a broadcast
+ LOGGER.warning(
+ "[" + getName() + "] broadcast ids out of sequence"
+ + " (got " + broadcastId + ", expected " + (lastBroadcastId + 1) + ")");
+
+ if (processBroadcast) {
+ // not in catch up mode, switch back
+ LOGGER.info("[" + getName() + "] switching to catch up mode");
+ processBroadcast = false;
+ sendProductsCreatedAfter();
+ }
+ }
+
+ // track last broadcast for catch up process (as long as newer)
+ if (lastBroadcastId == null || broadcastId > lastBroadcastId) {
+ lastBroadcastId = broadcastId;
+ lastBroadcast = notification;
+ }
+
+ // process message if not in catch up mode
+ if (processBroadcast) {
+ onJsonNotification(notification);
}
- onJsonNotification(notification);
}
/**
@@ -236,6 +266,81 @@ protected void onProductsCreatedAfter(final JsonObject json) throws Exception {
}
}
+ /**
+ * Request background thread to send "products_created_after" message.
+ *
+ * @throws IOException
+ */
+ protected void sendProductsCreatedAfter() throws IOException {
+ synchronized(sendProductsCreatedAfterSync) {
+ // set flag that we want to send products created after
+ sendProductsCreatedAfterFlag = true;
+ // wake up background thread that sends message
+ sendProductsCreatedAfterSync.notifyAll();
+ }
+ }
+
+ /**
+ * Start background thread that sends "products_created_after" messages.
+ *
+ * @return started thread.
+ */
+ protected void startSendProductsCreatedAfterThread() {
+ if (sendProductsCreatedAfterThread != null) {
+ throw new IllegalStateException("sendProductsCreatedThread already exists");
+ }
+ sendProductsCreatedAfterFlag = false;
+ sendProductsCreatedAfterRunning = true;
+ sendProductsCreatedAfterThread = new Thread(() -> {
+ while (sendProductsCreatedAfterRunning) {
+ try {
+ synchronized (sendProductsCreatedAfterSync) {
+ if (!sendProductsCreatedAfterFlag) {
+ // wait until ready to send
+ sendProductsCreatedAfterSync.wait();
+ continue;
+ }
+ }
+
+ // ready to send, try to keep queue size manageable
+ throttleQueues();
+
+ synchronized (sendProductsCreatedAfterSync) {
+ // now send actual products created after message
+ try {
+ _sendProductsCreatedAfter();
+ // message sent, reset flag
+ sendProductsCreatedAfterFlag = false;
+ } catch (IOException e) {
+ LOGGER.log(
+ Level.WARNING,
+ "[" + getName() + "] Exception sending products_created_after",
+ e);
+ }
+ }
+ } catch (InterruptedException ie) {
+ // interrupted usually means shutting down thread
+ }
+ }
+ });
+ sendProductsCreatedAfterThread.start();
+ }
+
+ protected void stopProductsCreatedAfterThread() {
+ try {
+ sendProductsCreatedAfterRunning = false;
+ sendProductsCreatedAfterThread.interrupt();
+ sendProductsCreatedAfterThread.join();
+ } catch (Exception e) {
+ LOGGER.log(
+ Level.WARNING,
+ "[" + getName() + "] exception stopping sendProductsCreatedAfterThread",
+ e);
+ } finally {
+ sendProductsCreatedAfterThread = null;
+ }
+ }
+
/**
* Send an "action"="products_created_after" request, which is part of the
* catch up process.
@@ -244,7 +349,7 @@ protected void onProductsCreatedAfter(final JsonObject json) throws Exception {
* then one "action"="products_created_after" message to indicate the request
* is complete.
*/
- protected void sendProductsCreatedAfter() throws IOException {
+ protected void _sendProductsCreatedAfter() throws IOException {
// set default for created after
if (this.createdAfter == null) {
this.createdAfter = Instant.now().minusSeconds(7 * 86400);
@@ -299,6 +404,9 @@ public void startup() throws Exception{
createdAfter = Instant.parse(json.getString(CREATED_AFTER_PROPERTY));
}
+ // start background thread for products_create_after messages
+ startSendProductsCreatedAfterThread();
+
//open websocket
client = new WebSocketClient(uri, this, attempts, timeout, true);
}
@@ -309,8 +417,11 @@ public void startup() throws Exception{
*/
@Override
public void shutdown() throws Exception{
+ stopProductsCreatedAfterThread();
//close socket
- client.shutdown();
+ try {
+ client.shutdown();
+ } catch (Exception e) {}
super.shutdown();
}
diff --git a/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationReceiver.java b/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationReceiver.java
index addf139a..c207ddb0 100644
--- a/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationReceiver.java
+++ b/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationReceiver.java
@@ -685,6 +685,12 @@ public Map getQueueStatus() {
return null;
}
+ public void throttleQueues() throws InterruptedException {
+ if (notifier instanceof ExecutorListenerNotifier) {
+ ((ExecutorListenerNotifier) notifier).throttleQueues();
+ }
+ }
+
public Long getReceiverCleanupInterval() {
return receiverCleanupInterval;
}
@@ -701,6 +707,14 @@ public void setConnectTimeout(int connectTimeout) {
this.connectTimeout = connectTimeout;
}
+ public ListenerNotifier getNotifier() {
+ return this.notifier;
+ }
+
+ public void setNotifier(final ListenerNotifier notifier) {
+ this.notifier = notifier;
+ }
+
public int getReadTimeout() {
return readTimeout;
}
diff --git a/src/main/java/gov/usgs/earthquake/distribution/ExecutorListenerNotifier.java b/src/main/java/gov/usgs/earthquake/distribution/ExecutorListenerNotifier.java
index 245db2c5..5508dfb8 100644
--- a/src/main/java/gov/usgs/earthquake/distribution/ExecutorListenerNotifier.java
+++ b/src/main/java/gov/usgs/earthquake/distribution/ExecutorListenerNotifier.java
@@ -48,6 +48,15 @@ public class ExecutorListenerNotifier extends DefaultConfigurable implements
*/
protected Timer retryTimer = new Timer();
+ /** When queue size reaches this level, start throttling */
+ protected int throttleStartThreshold = 50000;
+
+ /** When queue size reaches this level, stop throttling */
+ protected int throttleStopThreshold = 25000;
+
+ /** When throttling, wait this many milliseconds between queue size checks. */
+ protected long throttleWaitInterval = 5000L;
+
public ExecutorListenerNotifier(final DefaultNotificationReceiver receiver) {
this.receiver = receiver;
}
@@ -242,6 +251,9 @@ public void startup() throws Exception {
// still valid
this.notifyListeners(event, gracefulListeners);
}
+
+ // try to keep queue size managable during restart
+ throttleQueues();
}
LOGGER.info("All notifications queued");
@@ -279,6 +291,58 @@ public Map getStatus() {
return status;
}
+ /**
+ * Check queue status and return length of longest queue.
+ *
+ * @return length of longest queue, or null if no queue lengths.
+ */
+ public Integer getMaxQueueSize() {
+ final Map status = getStatus();
+ Integer maxQueueSize = null;
+ for (Integer queueSize : status.values()) {
+ if (queueSize != null && (maxQueueSize == null || queueSize > maxQueueSize)) {
+ maxQueueSize = queueSize;
+ }
+ }
+ return maxQueueSize;
+ }
+
+ /**
+ * If longest queue has more than 50k notifications,
+ * wait until longest queue has 25k notifications before returning.
+ *
+ * @throws InterruptedException
+ */
+ public void throttleQueues() throws InterruptedException {
+ // try to keep queue size managable during restart
+ int maxSize = throttleStartThreshold;
+ // track whether any throttles occurred
+ boolean throttled = false;
+
+ while (true) {
+ final Integer size = getMaxQueueSize();
+ if (size == null || size <= maxSize) {
+ // within limit
+ if (throttled) {
+ LOGGER.info("[" + getName() + "] done throttling (size = " + size + ")");
+ }
+ break;
+ }
+
+ throttled = true;
+ LOGGER.info("[" + getName() + "]"
+ + " queueing throttled until below "
+ + throttleStopThreshold
+ + " (size = " + size + ")");
+ // too many messages queued
+ // set maxSize to stop threshold
+ maxSize = throttleStopThreshold;
+ // wait for listener to do some processing
+ // 5s is a little low, but don't want to wait too long
+ Thread.sleep(throttleWaitInterval);
+ }
+ }
+
/**
* NOTE: messing with the executors map is not a good idea.
*
@@ -288,4 +352,13 @@ public Map getExecutors() {
return notificationListeners;
}
+ public int getThrottleStartThreshold() { return this.throttleStartThreshold; }
+ public void setThrottleStartThreshold(final int n) { this.throttleStartThreshold = n; }
+
+ public int getThrottleStopThreshold() { return this.throttleStopThreshold; }
+ public void setThrottleStopThreshold(final int n) { this.throttleStopThreshold = n; }
+
+ public long getThrottleWaitInterval() { return this.throttleWaitInterval; }
+ public void setThrottleWaitInterval(final long ms) { this.throttleWaitInterval = ms; }
+
}
diff --git a/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java b/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java
index 3d8fec86..d487d8fb 100644
--- a/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java
+++ b/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java
@@ -64,7 +64,7 @@ public class ProductClient extends DefaultConfigurable implements
ProductClientMBean, Bootstrappable {
/** The "release" version number. */
- public static final String RELEASE_VERSION = "Version 2.7.0 2020-12-03";
+ public static final String RELEASE_VERSION = "Version 2.7.1 2020-12-15";
/** Property name used on products for current RELEASE_VERSION. */
public static final String PDL_CLIENT_VERSION_PROPERTY = "pdl-client-version";
diff --git a/src/main/java/gov/usgs/earthquake/indexer/JDBCProductIndex.java b/src/main/java/gov/usgs/earthquake/indexer/JDBCProductIndex.java
index 92e79424..ee0b37c9 100644
--- a/src/main/java/gov/usgs/earthquake/indexer/JDBCProductIndex.java
+++ b/src/main/java/gov/usgs/earthquake/indexer/JDBCProductIndex.java
@@ -1270,7 +1270,7 @@ protected BigDecimal normalizeLongitude(BigDecimal lon) {
return null;
}
- return new BigDecimal(normalizeLongitude(lon.doubleValue()));
+ return BigDecimal.valueOf(normalizeLongitude(lon.doubleValue()));
}
/**
diff --git a/src/main/java/gov/usgs/earthquake/product/io/JsonProduct.java b/src/main/java/gov/usgs/earthquake/product/io/JsonProduct.java
index c30e8741..782f4f3e 100644
--- a/src/main/java/gov/usgs/earthquake/product/io/JsonProduct.java
+++ b/src/main/java/gov/usgs/earthquake/product/io/JsonProduct.java
@@ -54,7 +54,11 @@ public JsonObject getJsonObject(final Product product) throws Exception {
json.add("id", getIdJson(id));
json.add("links", getLinksJson(product.getLinks()));
json.add("properties", getPropertiesJson(product.getProperties()));
- json.add("signature", product.getSignature());
+ if (product.getSignature() == null) {
+ json.addNull("signature");
+ } else {
+ json.add("signature", product.getSignature());
+ }
json.add("signatureVersion", product.getSignatureVersion().toString());
json.add("status", product.getStatus());
json.add("type", "Feature");
@@ -70,7 +74,11 @@ public Product getProduct(final JsonObject json) throws Exception {
product.setLinks(getLinks(json.getJsonArray("links")));
product.setProperties(getProperties(json.getJsonObject("properties")));
product.setStatus(json.getString("status"));
- product.setSignature(json.getString("signature"));
+ try {
+ product.setSignature(json.getString("signature"));
+ } catch (Exception e) {
+ product.setSignature(null);
+ }
product.setSignatureVersion(Version.fromString(json.getString("signatureVersion")));
product.setTrackerURL(new URL("data:,"));
return product;
diff --git a/src/test/java/gov/usgs/earthquake/aws/AwsProductReceiverTest.java b/src/test/java/gov/usgs/earthquake/aws/AwsProductReceiverTest.java
new file mode 100644
index 00000000..084d6d1b
--- /dev/null
+++ b/src/test/java/gov/usgs/earthquake/aws/AwsProductReceiverTest.java
@@ -0,0 +1,194 @@
+package gov.usgs.earthquake.aws;
+
+import java.time.Instant;
+
+import javax.json.Json;
+import javax.json.JsonObject;
+
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+import gov.usgs.earthquake.distribution.Notification;
+import gov.usgs.earthquake.product.Product;
+import gov.usgs.earthquake.product.ProductId;
+import gov.usgs.earthquake.product.io.JsonProduct;
+
+public class AwsProductReceiverTest {
+
+ TestAwsProductReceiver receiver;
+
+ @BeforeEach
+ public void before() throws Exception {
+ receiver = new TestAwsProductReceiver();
+ receiver.startSendProductsCreatedAfterThread();
+ }
+
+ @AfterEach
+ public void after() throws Exception {
+ receiver.stopProductsCreatedAfterThread();
+ receiver = null;
+ }
+
+ @Test
+ public void testSwitchToBroadcast() throws Exception {
+ TestSession testSession = new TestSession();
+ // connect
+ receiver.onOpen(testSession);
+
+ // receive broadcast
+ Instant created = Instant.now();
+ receiver.onMessage(getNotification("broadcast", 10, created).toString());
+ Assert.assertFalse("not in broadcast mode yet", receiver.isProcessBroadcast());
+ Assert.assertNull("didn't process broadcast", receiver.lastJsonNotification);
+ Assert.assertEquals("saved broadcast id",
+ Long.valueOf(10L), receiver.getLastBroadcastId());
+
+ // receive response to products_created_after
+ receiver.onMessage(getNotification("product", 10, created).toString());
+ Assert.assertNotNull(
+ "processed product during catch up",
+ receiver.lastJsonNotification);
+
+ // receive end of products_created_after response
+ receiver.onMessage(getProductsCreatedAfter(created, 1).toString());
+ Assert.assertTrue("switched to broadcast mode", receiver.isProcessBroadcast());
+ }
+
+
+ @Test
+ public void testBroadcastOutOfOrder() throws Exception {
+ TestSession testSession = new TestSession();
+ // connect
+ receiver.onOpen(testSession);
+ // enable broadcast mode
+ receiver.setProcessBroadcast(true);
+ receiver.setLastBroadcastId(10L);
+
+ // receive broadcast in order
+ receiver.onMessage(getNotification("broadcast", 11, Instant.now()).toString());
+ Assert.assertTrue("still in broadcast mode", receiver.isProcessBroadcast());
+ Assert.assertNotNull("processed broadcast", receiver.lastJsonNotification);
+ Assert.assertEquals("saved broadcast id",
+ Long.valueOf(11L), receiver.getLastBroadcastId());
+
+ // clear any previous products created after message
+ testSession.testBasicRemote.lastSendText = null;
+ receiver.lastJsonNotification = null;
+
+ // receive broadcast out of order
+ receiver.onMessage(getNotification("broadcast", 13, Instant.now()).toString());
+ Assert.assertFalse("no longer in broadcast mode", receiver.isProcessBroadcast());
+ Assert.assertNull("did not broadcast", receiver.lastJsonNotification);
+ Assert.assertEquals("still saved broadcast id",
+ Long.valueOf(13L), receiver.getLastBroadcastId());
+ String sent = testSession.waitForBasicSendText(100L);
+ Assert.assertTrue(
+ "sent products_created_after",
+ sent.contains("\"action\":\"products_created_after\""));
+ }
+
+ @Test
+ public void testStartCatchUpWhenConnected() throws Exception {
+ // set flags being tested to opposite values
+ receiver.setProcessBroadcast(true);
+ TestSession testSession = new TestSession();
+
+ // call onOpen to simulate connection
+ receiver.onOpen(testSession);
+
+ // processBroadcast disabled and created after request sent
+ Assert.assertFalse("not in process broadcast mode", receiver.isProcessBroadcast());
+
+ String sent = testSession.waitForBasicSendText(1000L);
+ Assert.assertTrue(
+ "sent products_created_after",
+ sent.contains("\"action\":\"products_created_after\""));
+ }
+
+ @Test
+ public void testThrottleQueue() throws Exception {
+ TestSession testSession = new TestSession();
+ TestListenerNotifier testNotifier = new TestListenerNotifier(receiver);
+ receiver.setNotifier(testNotifier);
+ // simulate queue that needs to be throttled
+ testNotifier.queueSize = 5001;
+ testNotifier.setThrottleStartThreshold(5000);
+ testNotifier.setThrottleStopThreshold(2500);
+ testNotifier.setThrottleWaitInterval(100L);
+
+ // call onOpen to simulate connection
+ receiver.onOpen(testSession);
+ Assert.assertNull(
+ "throttling should prevent message",
+ testSession.waitForBasicSendText(100L));
+
+ // now simulate queue that is done throttling
+ testNotifier.queueSize = 2499;
+ String sent = testSession.waitForBasicSendText(500L);
+ Assert.assertTrue(
+ "sent products_created_after",
+ sent.contains("\"action\":\"products_created_after\""));
+ }
+
+ static JsonObject getNotification(final String action, final long id, final Instant created) throws Exception {
+ Product product = new Product(new ProductId("source", "type", "code"));
+ return Json.createObjectBuilder()
+ .add("action", action)
+ .add("notification",
+ Json.createObjectBuilder()
+ .add("id", id)
+ .add("created", created.toString())
+ .add("product", new JsonProduct().getJsonObject(product)))
+ .build();
+ }
+
+ static JsonObject getProductsCreatedAfter(final Instant createdAfter, final int count) {
+ return Json.createObjectBuilder()
+ .add("action", "products_created_after")
+ .add("created_after", createdAfter.toString())
+ .add("count", count)
+ .build();
+ }
+
+ /**
+ * Stub socket connections to test message handling behavior.
+ */
+ static class TestAwsProductReceiver extends AwsProductReceiver {
+ public JsonNotification lastJsonNotification;
+ public boolean onJsonNotificationCalled = false;
+
+ @Override
+ protected void onJsonNotification(JsonNotification notification) throws Exception {
+ onJsonNotificationCalled = true;
+ lastJsonNotification = notification;
+ super.onJsonNotification(notification);
+ }
+
+ @Override
+ public void receiveNotification(Notification notification) throws Exception {
+ // skip actual processing
+ }
+
+ @Override
+ public void writeTrackingData() {
+ // skip tracking
+ }
+
+ // getter/setter to control state for testing
+
+ public Instant getCreatedAfter() { return this.createdAfter; }
+ public void setCreatedAfter(final Instant c) { this.createdAfter = c; }
+
+ public JsonNotification getLastBroadcast() { return this.lastBroadcast; }
+ public void setLastBroadcast(final JsonNotification j) { this.lastBroadcast = j; }
+
+ public Long getLastBroadcastId() { return this.lastBroadcastId; }
+ public void setLastBroadcastId(final Long l) { this.lastBroadcastId = l; }
+
+ public boolean isProcessBroadcast() { return this.processBroadcast; }
+ public void setProcessBroadcast(final boolean b) { this.processBroadcast = b; }
+ }
+
+}
diff --git a/src/test/java/gov/usgs/earthquake/aws/TestBasicRemote.java b/src/test/java/gov/usgs/earthquake/aws/TestBasicRemote.java
new file mode 100644
index 00000000..cb51ba27
--- /dev/null
+++ b/src/test/java/gov/usgs/earthquake/aws/TestBasicRemote.java
@@ -0,0 +1,97 @@
+package gov.usgs.earthquake.aws;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Writer;
+import java.nio.ByteBuffer;
+import java.util.logging.Logger;
+
+import javax.websocket.EncodeException;
+import javax.websocket.RemoteEndpoint.Basic;
+
+/**
+ * TestBasicRemote captures data sent via sendText calls.
+ */
+public class TestBasicRemote implements Basic {
+
+ private static final Logger LOGGER = Logger.getLogger(TestBasicRemote.class.getName());
+
+ public String lastSendText = null;
+ private Object sendTextSync = new Object();
+
+ @Override
+ public void sendText(String text) throws IOException {
+ LOGGER.info("sendText called with " + text);
+ synchronized(sendTextSync) {
+ lastSendText = text;
+ sendTextSync.notifyAll();
+ }
+ }
+
+ public String waitForSendText(final long timeoutMillis) throws InterruptedException {
+ synchronized(sendTextSync) {
+ if (lastSendText == null) {
+ sendTextSync.wait(timeoutMillis);
+ }
+ return lastSendText;
+ }
+ };
+
+ // other methods in interface are not used at this time
+
+ @Override
+ public void setBatchingAllowed(boolean allowed) throws IOException {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public boolean getBatchingAllowed() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public void flushBatch() throws IOException {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public void sendPing(ByteBuffer applicationData) throws IOException, IllegalArgumentException {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public void sendPong(ByteBuffer applicationData) throws IOException, IllegalArgumentException {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public void sendBinary(ByteBuffer data) throws IOException {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public void sendText(String partialMessage, boolean isLast) throws IOException {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public void sendBinary(ByteBuffer partialByte, boolean isLast) throws IOException {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public OutputStream getSendStream() throws IOException {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public Writer getSendWriter() throws IOException {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public void sendObject(Object data) throws IOException, EncodeException {
+ throw new RuntimeException("Not implemented");
+ }
+
+}
diff --git a/src/test/java/gov/usgs/earthquake/aws/TestListenerNotifier.java b/src/test/java/gov/usgs/earthquake/aws/TestListenerNotifier.java
new file mode 100644
index 00000000..a428912c
--- /dev/null
+++ b/src/test/java/gov/usgs/earthquake/aws/TestListenerNotifier.java
@@ -0,0 +1,23 @@
+package gov.usgs.earthquake.aws;
+
+import gov.usgs.earthquake.distribution.DefaultNotificationReceiver;
+import gov.usgs.earthquake.distribution.ExecutorListenerNotifier;
+
+public class TestListenerNotifier extends ExecutorListenerNotifier {
+
+ // override queueSize for testing
+ public Integer queueSize = null;
+
+ public TestListenerNotifier(DefaultNotificationReceiver receiver) {
+ super(receiver);
+ }
+
+ @Override
+ public Integer getMaxQueueSize() {
+ if (queueSize != null) {
+ return queueSize;
+ }
+ return super.getMaxQueueSize();
+ }
+
+}
diff --git a/src/test/java/gov/usgs/earthquake/aws/TestSession.java b/src/test/java/gov/usgs/earthquake/aws/TestSession.java
new file mode 100644
index 00000000..1121e65c
--- /dev/null
+++ b/src/test/java/gov/usgs/earthquake/aws/TestSession.java
@@ -0,0 +1,179 @@
+package gov.usgs.earthquake.aws;
+
+import java.io.IOException;
+import java.net.URI;
+import java.security.Principal;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.websocket.CloseReason;
+import javax.websocket.Extension;
+import javax.websocket.MessageHandler;
+import javax.websocket.MessageHandler.Partial;
+import javax.websocket.MessageHandler.Whole;
+import javax.websocket.RemoteEndpoint.Async;
+import javax.websocket.RemoteEndpoint.Basic;
+import javax.websocket.Session;
+import javax.websocket.WebSocketContainer;
+
+
+/**
+ * Test session provides testing basic remote to capture sendText calls.
+ */
+public class TestSession implements Session {
+
+ TestBasicRemote testBasicRemote = new TestBasicRemote();
+
+ @Override
+ public Basic getBasicRemote() {
+ return testBasicRemote;
+ }
+
+ @Override
+ public String getId() {
+ return "test session";
+ }
+
+ public String waitForBasicSendText(final long timeoutMillis) throws InterruptedException {
+ return testBasicRemote.waitForSendText(timeoutMillis);
+ }
+
+ // other interface methods not implemented at this time.
+
+ @Override
+ public WebSocketContainer getContainer() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public void addMessageHandler(MessageHandler handler) throws IllegalStateException {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public void addMessageHandler(Class clazz, Whole handler) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public void addMessageHandler(Class clazz, Partial handler) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public Set getMessageHandlers() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public void removeMessageHandler(MessageHandler handler) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public String getProtocolVersion() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public String getNegotiatedSubprotocol() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public List getNegotiatedExtensions() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public boolean isSecure() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public boolean isOpen() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public long getMaxIdleTimeout() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public void setMaxIdleTimeout(long milliseconds) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public void setMaxBinaryMessageBufferSize(int length) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public int getMaxBinaryMessageBufferSize() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public void setMaxTextMessageBufferSize(int length) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public int getMaxTextMessageBufferSize() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public Async getAsyncRemote() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public void close() throws IOException {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public void close(CloseReason closeReason) throws IOException {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public URI getRequestURI() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public Map> getRequestParameterMap() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public String getQueryString() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public Map getPathParameters() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public Map getUserProperties() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public Principal getUserPrincipal() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public Set getOpenSessions() {
+ throw new RuntimeException("Not implemented");
+ }
+
+}
diff --git a/src/test/java/gov/usgs/earthquake/distribution/ListenerBackupTest.java b/src/test/java/gov/usgs/earthquake/distribution/ListenerBackupTest.java
index e1b94529..e327d0f6 100644
--- a/src/test/java/gov/usgs/earthquake/distribution/ListenerBackupTest.java
+++ b/src/test/java/gov/usgs/earthquake/distribution/ListenerBackupTest.java
@@ -1,7 +1,10 @@
package gov.usgs.earthquake.distribution;
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.InputStream;
+import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
@@ -95,6 +98,13 @@ public void teardown() throws Exception {
FileUtils.deleteTree(listener2Storage);
}
+ @Test
+ public void testListenerBackup() throws Exception {
+ for (File f : TEST_FILES) {
+ sendProduct(readProduct(f));
+ }
+ }
+
public void sendProduct(final Product product) throws Exception {
long timeStart = new Date().getTime();
@@ -117,42 +127,30 @@ public void sendProduct(final Product product) throws Exception {
// the queues block until here
start.countDown();
-
// this blocks until both have completed.
finish.await();
+
Assert.assertNotNull("listener1 product is not null",
listener1.getProduct());
Assert.assertNotNull("listener2 product is not null",
listener2.getProduct());
long timeEnd = new Date().getTime();
- System.err.println((timeEnd - timeStart) + "ms sending "
- + product.getId());
+ System.err.println((timeEnd - timeStart) + "ms sending " + product.getId());
}
public Product readProduct(final File file) throws Exception {
long start = new Date().getTime();
- InputStream in = StreamUtils.getInputStream(file);
- try {
- return ObjectProductHandler.getProduct(IOUtil
- .autoDetectProductSource(in));
+ try (
+ final InputStream in = StreamUtils.getInputStream(file);
+ ) {
+ return ObjectProductHandler.getProduct(IOUtil.autoDetectProductSource(in));
} finally {
- try {
- in.close();
- } catch (Exception ignore) {
- }
long end = new Date().getTime();
System.err.println((end - start) + " ms reading " + file.getName());
}
}
- @Test
- public void testListenerBackup() throws Exception {
- for (File f : TEST_FILES) {
- sendProduct(readProduct(f));
- }
- }
-
private class ProductServer implements SocketListenerInterface {
private Product product;
@@ -166,25 +164,50 @@ public void setProduct(Product product) {
}
@Override
- public void onSocket(Socket socket) {
+ public void onSocket(final Socket socket) {
long start = new Date().getTime();
- try {
- OutputStream out = socket.getOutputStream();
+ try (
+ final BufferedReader in = new BufferedReader(
+ new InputStreamReader(socket.getInputStream()));
+ final OutputStream out = socket.getOutputStream();
+ ) {
+ // read request from socket
+ String line;
+ while ((line = in.readLine()) != null) {
+ if ("".equals(line)) {
+ break;
+ }
+ }
+
+ // convert product to bytes first
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final ObjectProductSource source = new ObjectProductSource(this.product);
+ source.streamTo(new BinaryProductHandler(baos));
+ final byte[] productBytes = baos.toByteArray();
+ long encoded = new Date().getTime();
+ System.err.println((encoded - start) + "ms encoding product "
+ + this.product.getId().toString());
// this is an http response
- out.write("HTTP/1.0 200 OK\r\n".getBytes());
- out.write("Content-type: application/octet-stream\r\n"
- .getBytes());
- out.write("\r\n".getBytes());
-
- // send the product
- ObjectProductSource source = new ObjectProductSource(
- this.product);
- source.streamTo(new BinaryProductHandler(
- new StreamUtils.UnclosableOutputStream(out)));
-
- socket.shutdownOutput();
- socket.close();
+ out.write(
+ String.join("\r\n",
+ "HTTP/1.0 200 OK",
+ "Connection: close",
+ "Content-Length: " + productBytes.length,
+ "Content-Type: application/octet-stream",
+ "", "").getBytes());
+
+ // then transfer
+ out.write(productBytes);
+ out.flush();
+
+ // close socket before try w/resources ends
+ try {
+ socket.shutdownOutput();
+ } catch (Exception e) { e.printStackTrace(); }
+ try {
+ socket.close();
+ } catch (Exception e) { e.printStackTrace(); }
} catch (Exception e) {
e.printStackTrace();
} finally {
@@ -237,13 +260,10 @@ public Product getProduct() {
public void onNotification(final NotificationEvent notification)
throws Exception {
start.await();
- System.err.println("retrieving product "
- + Thread.currentThread().getName());
- // Thread.sleep(600);
+ System.err.println("retrieving product " + Thread.currentThread().getName());
// just retrieve the product
retrieveProduct(notification.getNotification().getProductId());
- System.err.println("retrieved product "
- + Thread.currentThread().getName());
+ System.err.println("retrieved product " + Thread.currentThread().getName());
finish.countDown();
}
diff --git a/src/test/java/gov/usgs/earthquake/distribution/ReplicationStorageListenerTest.java b/src/test/java/gov/usgs/earthquake/distribution/ReplicationStorageListenerTest.java
index 9658b107..5b89c3f0 100644
--- a/src/test/java/gov/usgs/earthquake/distribution/ReplicationStorageListenerTest.java
+++ b/src/test/java/gov/usgs/earthquake/distribution/ReplicationStorageListenerTest.java
@@ -12,8 +12,10 @@
import org.junit.Assume;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
+@Ignore
public class ReplicationStorageListenerTest {
private static final String LOCAL_STORAGE_SECTION = "localStorage";
diff --git a/src/test/java/gov/usgs/earthquake/distribution/SocketProductSenderTest.java b/src/test/java/gov/usgs/earthquake/distribution/SocketProductSenderTest.java
index fd58e133..478a72e0 100644
--- a/src/test/java/gov/usgs/earthquake/distribution/SocketProductSenderTest.java
+++ b/src/test/java/gov/usgs/earthquake/distribution/SocketProductSenderTest.java
@@ -21,22 +21,22 @@ public class SocketProductSenderTest {
@Test
public void testSendXmlWithDeflate() throws Exception {
- testSendProduct(/* binaryFormat */false, /* enableDeflate */true);
+ testSendProduct(/* binaryFormat */false, /* enableDeflate */true, 1984);
}
@Test
public void testSendBinaryWithDeflate() throws Exception {
- testSendProduct(/* binaryFormat */true, /* enableDeflate */true);
+ testSendProduct(/* binaryFormat */true, /* enableDeflate */true, 1985);
}
@Test
public void testSendXmlNoDeflate() throws Exception {
- testSendProduct(/* binaryFormat */false, /* enableDeflate */false);
+ testSendProduct(/* binaryFormat */false, /* enableDeflate */false, 1986);
}
@Test
public void testSendBinaryNoDeflate() throws Exception {
- testSendProduct(/* binaryFormat */true, /* enableDeflate */false);
+ testSendProduct(/* binaryFormat */true, /* enableDeflate */false, 1987);
}
// ------------------------------------------------------------------------
@@ -45,7 +45,7 @@ public void testSendBinaryNoDeflate() throws Exception {
/**
* This is the testing utility method called by individual tests.
- *
+ *
* @param binaryFormat
* whether to use the binary format during test.
* @param enableDeflate
@@ -53,13 +53,13 @@ public void testSendBinaryNoDeflate() throws Exception {
* @throws Exception
*/
protected void testSendProduct(final boolean binaryFormat,
- final boolean enableDeflate) throws Exception {
- TestSocketAcceptor server = new TestSocketAcceptor(this);
+ final boolean enableDeflate, final int port) throws Exception {
+ TestSocketAcceptor server = new TestSocketAcceptor(this, port);
server.start();
SocketProductSender sender = new SocketProductSender();
sender.setHost("localhost");
- sender.setPort(1984);
+ sender.setPort(port);
sender.setBinaryFormat(binaryFormat);
sender.setEnableDeflate(enableDeflate);
sender.startup();
@@ -105,8 +105,8 @@ private class TestSocketAcceptor extends Thread {
private ServerSocket sock = null;
private SocketProductSenderTest cb = null;
- public TestSocketAcceptor(SocketProductSenderTest cb) throws Exception {
- sock = new ServerSocket(1984);
+ public TestSocketAcceptor(SocketProductSenderTest cb, final int port) throws Exception {
+ sock = new ServerSocket(port);
this.cb = cb;
}