+
+
+
+
+
+
+
Type: boolean
+
+
+
+
+
+
+
diff --git a/gencode/java/udmi/schema/PubberOptions.java b/gencode/java/udmi/schema/PubberOptions.java
index d00bad7a03..f560b3169d 100644
--- a/gencode/java/udmi/schema/PubberOptions.java
+++ b/gencode/java/udmi/schema/PubberOptions.java
@@ -24,6 +24,7 @@
"barfConfig",
"messageTrace",
"extraPoint",
+ "configStateDelay",
"missingPoint",
"extraField",
"emptyMissing",
@@ -52,6 +53,8 @@ public class PubberOptions {
public Boolean messageTrace;
@JsonProperty("extraPoint")
public String extraPoint;
+ @JsonProperty("configStateDelay")
+ public Boolean configStateDelay;
@JsonProperty("missingPoint")
public String missingPoint;
@JsonProperty("extraField")
@@ -98,6 +101,7 @@ public int hashCode() {
result = ((result* 31)+((this.missingPoint == null)? 0 :this.missingPoint.hashCode()));
result = ((result* 31)+((this.noConfigAck == null)? 0 :this.noConfigAck.hashCode()));
result = ((result* 31)+((this.extraPoint == null)? 0 :this.extraPoint.hashCode()));
+ result = ((result* 31)+((this.configStateDelay == null)? 0 :this.configStateDelay.hashCode()));
return result;
}
@@ -110,7 +114,7 @@ public boolean equals(Object other) {
return false;
}
PubberOptions rhs = ((PubberOptions) other);
- return (((((((((((((((((this.noPersist == rhs.noPersist)||((this.noPersist!= null)&&this.noPersist.equals(rhs.noPersist)))&&((this.smokeCheck == rhs.smokeCheck)||((this.smokeCheck!= null)&&this.smokeCheck.equals(rhs.smokeCheck))))&&((this.redirectRegistry == rhs.redirectRegistry)||((this.redirectRegistry!= null)&&this.redirectRegistry.equals(rhs.redirectRegistry))))&&((this.noPointState == rhs.noPointState)||((this.noPointState!= null)&&this.noPointState.equals(rhs.noPointState))))&&((this.disableWriteback == rhs.disableWriteback)||((this.disableWriteback!= null)&&this.disableWriteback.equals(rhs.disableWriteback))))&&((this.noHardware == rhs.noHardware)||((this.noHardware!= null)&&this.noHardware.equals(rhs.noHardware))))&&((this.barfConfig == rhs.barfConfig)||((this.barfConfig!= null)&&this.barfConfig.equals(rhs.barfConfig))))&&((this.extraField == rhs.extraField)||((this.extraField!= null)&&this.extraField.equals(rhs.extraField))))&&((this.messageTrace == rhs.messageTrace)||((this.messageTrace!= null)&&this.messageTrace.equals(rhs.messageTrace))))&&((this.emptyMissing == rhs.emptyMissing)||((this.emptyMissing!= null)&&this.emptyMissing.equals(rhs.emptyMissing))))&&((this.noWriteback == rhs.noWriteback)||((this.noWriteback!= null)&&this.noWriteback.equals(rhs.noWriteback))))&&((this.fixedSampleRate == rhs.fixedSampleRate)||((this.fixedSampleRate!= null)&&this.fixedSampleRate.equals(rhs.fixedSampleRate))))&&((this.noLastStart == rhs.noLastStart)||((this.noLastStart!= null)&&this.noLastStart.equals(rhs.noLastStart))))&&((this.missingPoint == rhs.missingPoint)||((this.missingPoint!= null)&&this.missingPoint.equals(rhs.missingPoint))))&&((this.noConfigAck == rhs.noConfigAck)||((this.noConfigAck!= null)&&this.noConfigAck.equals(rhs.noConfigAck))))&&((this.extraPoint == rhs.extraPoint)||((this.extraPoint!= null)&&this.extraPoint.equals(rhs.extraPoint))));
+ return ((((((((((((((((((this.noPersist == rhs.noPersist)||((this.noPersist!= null)&&this.noPersist.equals(rhs.noPersist)))&&((this.smokeCheck == rhs.smokeCheck)||((this.smokeCheck!= null)&&this.smokeCheck.equals(rhs.smokeCheck))))&&((this.redirectRegistry == rhs.redirectRegistry)||((this.redirectRegistry!= null)&&this.redirectRegistry.equals(rhs.redirectRegistry))))&&((this.noPointState == rhs.noPointState)||((this.noPointState!= null)&&this.noPointState.equals(rhs.noPointState))))&&((this.disableWriteback == rhs.disableWriteback)||((this.disableWriteback!= null)&&this.disableWriteback.equals(rhs.disableWriteback))))&&((this.noHardware == rhs.noHardware)||((this.noHardware!= null)&&this.noHardware.equals(rhs.noHardware))))&&((this.barfConfig == rhs.barfConfig)||((this.barfConfig!= null)&&this.barfConfig.equals(rhs.barfConfig))))&&((this.extraField == rhs.extraField)||((this.extraField!= null)&&this.extraField.equals(rhs.extraField))))&&((this.messageTrace == rhs.messageTrace)||((this.messageTrace!= null)&&this.messageTrace.equals(rhs.messageTrace))))&&((this.emptyMissing == rhs.emptyMissing)||((this.emptyMissing!= null)&&this.emptyMissing.equals(rhs.emptyMissing))))&&((this.noWriteback == rhs.noWriteback)||((this.noWriteback!= null)&&this.noWriteback.equals(rhs.noWriteback))))&&((this.fixedSampleRate == rhs.fixedSampleRate)||((this.fixedSampleRate!= null)&&this.fixedSampleRate.equals(rhs.fixedSampleRate))))&&((this.noLastStart == rhs.noLastStart)||((this.noLastStart!= null)&&this.noLastStart.equals(rhs.noLastStart))))&&((this.missingPoint == rhs.missingPoint)||((this.missingPoint!= null)&&this.missingPoint.equals(rhs.missingPoint))))&&((this.noConfigAck == rhs.noConfigAck)||((this.noConfigAck!= null)&&this.noConfigAck.equals(rhs.noConfigAck))))&&((this.extraPoint == rhs.extraPoint)||((this.extraPoint!= null)&&this.extraPoint.equals(rhs.extraPoint))))&&((this.configStateDelay == rhs.configStateDelay)||((this.configStateDelay!= null)&&this.configStateDelay.equals(rhs.configStateDelay))));
}
}
diff --git a/gencode/python/udmi/schema/options_pubber.py b/gencode/python/udmi/schema/options_pubber.py
index a9214b542c..8633ac8ad6 100644
--- a/gencode/python/udmi/schema/options_pubber.py
+++ b/gencode/python/udmi/schema/options_pubber.py
@@ -13,6 +13,7 @@ def __init__(self):
self.barfConfig = None
self.messageTrace = None
self.extraPoint = None
+ self.configStateDelay = None
self.missingPoint = None
self.extraField = None
self.emptyMissing = None
@@ -35,6 +36,7 @@ def from_dict(source):
result.barfConfig = source.get('barfConfig')
result.messageTrace = source.get('messageTrace')
result.extraPoint = source.get('extraPoint')
+ result.configStateDelay = source.get('configStateDelay')
result.missingPoint = source.get('missingPoint')
result.extraField = source.get('extraField')
result.emptyMissing = source.get('emptyMissing')
@@ -79,6 +81,8 @@ def to_dict(self):
result['messageTrace'] = self.messageTrace # 5
if self.extraPoint:
result['extraPoint'] = self.extraPoint # 5
+ if self.configStateDelay:
+ result['configStateDelay'] = self.configStateDelay # 5
if self.missingPoint:
result['missingPoint'] = self.missingPoint # 5
if self.extraField:
diff --git a/pubber/src/main/java/daq/pubber/Pubber.java b/pubber/src/main/java/daq/pubber/Pubber.java
index fb9b8f1e84..c2e83c0f10 100644
--- a/pubber/src/main/java/daq/pubber/Pubber.java
+++ b/pubber/src/main/java/daq/pubber/Pubber.java
@@ -2,6 +2,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
import static com.google.udmi.util.GeneralUtils.deepCopy;
import static com.google.udmi.util.GeneralUtils.fromJsonFile;
import static com.google.udmi.util.GeneralUtils.fromJsonString;
@@ -171,6 +172,7 @@ public class Pubber {
private static final long BYTES_PER_MEGABYTE = 1024 * 1024;
private static final String CORRUPT_STATE_MESSAGE = "!&*@(!*&@!";
private static final long INJECT_MESSAGE_DELAY_MS = 2000; // Delay to make sure testing is stable.
+ private static final int FORCED_STATE_TIME_MS = 10000;
final State deviceState = new State();
private final File outDir;
private final ScheduledExecutorService executor = new CatchingScheduledThreadPoolExecutor(1);
@@ -936,13 +938,32 @@ private void publisherHandler(String type, String phase, Throwable cause) {
publishLogMessage(report);
// TODO: Replace this with a heap so only the highest-priority status is reported.
deviceState.system.status = shouldLogLevel(report.level) ? report : null;
- publishAsynchronousState();
+ publishConfigStateUpdate();
if (cause != null && configLatch.getCount() > 0) {
configLatch.countDown();
warn("Released startup latch because reported error");
}
}
+ /**
+ * Issue a state update in response to a received config message. This will optionally
+ * add a synthetic delay in so that testing infrastructure can test that related sequence
+ * tests handle this case appropriately.
+ */
+ private void publishConfigStateUpdate() {
+ if (TRUE.equals(configuration.options.configStateDelay)) {
+ delayNextStateUpdate();
+ }
+ publishAsynchronousState();
+ }
+
+ private void delayNextStateUpdate() {
+ // Calculate a synthetic last state time that factors in the optional delay.
+ long syntheticType = System.currentTimeMillis() - STATE_THROTTLE_MS + FORCED_STATE_TIME_MS;
+ // And use the synthetic time iff it's later than the actual last state time.
+ lastStateTimeMs = Math.max(lastStateTimeMs, syntheticType);
+ }
+
private boolean shouldLogLevel(int level) {
Integer minLoglevel = deviceConfig.system == null ? null : deviceConfig.system.min_loglevel;
return level >= (minLoglevel == null ? Level.INFO.value() : minLoglevel);
@@ -986,7 +1007,7 @@ private void configHandler(Config config) {
} catch (Exception e) {
publisherConfigLog("apply", e);
}
- publishAsynchronousState();
+ publishConfigStateUpdate();
}
private void processConfigUpdate(Config config) {
@@ -1494,7 +1515,8 @@ private void publishLogMessage(Entry report) {
private void publishAsynchronousState() {
if (stateLock.tryAcquire()) {
try {
- long delay = lastStateTimeMs + STATE_THROTTLE_MS - System.currentTimeMillis();
+ long soonestAllowedStateUpdate = lastStateTimeMs + STATE_THROTTLE_MS;
+ long delay = soonestAllowedStateUpdate - System.currentTimeMillis();
debug(String.format("State update defer %dms", delay));
if (delay > 0) {
markStateDirty(delay);
@@ -1545,7 +1567,6 @@ private void publishStateMessage(Object stateToSend) {
warn(String.format("State update delay %dms", delay));
safeSleep(delay);
}
-
lastStateTimeMs = System.currentTimeMillis();
CountDownLatch latch = new CountDownLatch(1);
publishDeviceMessage(stateToSend, () -> {
diff --git a/schema/options_pubber.json b/schema/options_pubber.json
index 09ad91819c..6fe9871f08 100644
--- a/schema/options_pubber.json
+++ b/schema/options_pubber.json
@@ -28,6 +28,9 @@
"extraPoint": {
"type": "string"
},
+ "configStateDelay": {
+ "type": "boolean"
+ },
"missingPoint": {
"type": "string"
},
diff --git a/validator/src/main/java/com/google/daq/mqtt/sequencer/SequenceBase.java b/validator/src/main/java/com/google/daq/mqtt/sequencer/SequenceBase.java
index 7994a1c006..ffbfdf393c 100644
--- a/validator/src/main/java/com/google/daq/mqtt/sequencer/SequenceBase.java
+++ b/validator/src/main/java/com/google/daq/mqtt/sequencer/SequenceBase.java
@@ -104,6 +104,7 @@ public class SequenceBase {
private static final int FUNCTIONS_VERSION_BETA = Validator.REQUIRED_FUNCTION_VER;
private static final int FUNCTIONS_VERSION_ALPHA = 6; // Version required for alpha execution.
+ private static final long CONFIG_BARRIER_MS = 1000;
static {
// Sanity check to make sure ALPHA version is increased if forced by increased BETA.
@@ -723,6 +724,8 @@ private void assertConfigIsNotPending() {
protected void updateConfig(String reason) {
assertConfigIsNotPending();
+ // Add a forced sleep to make sure second-quantized timestamps are unique.
+ safeSleep(CONFIG_BARRIER_MS);
updateConfig(SubFolder.SYSTEM, augmentConfig(deviceConfig.system));
updateConfig(SubFolder.POINTSET, deviceConfig.pointset);
updateConfig(SubFolder.GATEWAY, deviceConfig.gateway);
@@ -1346,6 +1349,12 @@ protected void checkThatHasInterestingSystemStatus(boolean isInteresting) {
check.accept("interesting system status", this::hasInterestingSystemStatus);
}
+ protected void untilHasInterestingSystemStatus(boolean isInteresting) {
+ BiConsumer
> until =
+ isInteresting ? this::untilTrue : this::untilFalse;
+ until.accept("interesting system status", this::hasInterestingSystemStatus);
+ }
+
/**
* Add a summary of a test, with a simple description of what it's testing.
*/
diff --git a/validator/src/main/java/com/google/daq/mqtt/sequencer/sequences/ConfigSequences.java b/validator/src/main/java/com/google/daq/mqtt/sequencer/sequences/ConfigSequences.java
index f89e5ffd4a..83dbfb8b67 100644
--- a/validator/src/main/java/com/google/daq/mqtt/sequencer/sequences/ConfigSequences.java
+++ b/validator/src/main/java/com/google/daq/mqtt/sequencer/sequences/ConfigSequences.java
@@ -91,7 +91,7 @@ public void broken_config() {
setExtraField("break_json");
untilLogged(SYSTEM_CONFIG_RECEIVE, SYSTEM_CONFIG_RECEIVE_LEVEL);
- checkThatHasInterestingSystemStatus(true);
+ untilHasInterestingSystemStatus(true);
Entry stateStatus = deviceState.system.status;
info("Error message: " + stateStatus.message);
debug("Error detail: " + stateStatus.detail);
@@ -106,7 +106,11 @@ public void broken_config() {
untilLogged(SYSTEM_CONFIG_PARSE, Level.ERROR);
checkNotLogged(SYSTEM_CONFIG_APPLY, SYSTEM_CONFIG_APPLY_LEVEL);
+ // Will restore min_loglevel to the default of INFO.
resetConfig(); // clears extra_field
+ untilLogged(SYSTEM_CONFIG_RECEIVE, SYSTEM_CONFIG_RECEIVE_LEVEL);
+ untilLogged(SYSTEM_CONFIG_APPLY, SYSTEM_CONFIG_APPLY_LEVEL);
+
deviceConfig.system.min_loglevel = Level.DEBUG.value();
checkThatHasInterestingSystemStatus(false);
untilTrue("last_config updated",
@@ -114,6 +118,7 @@ public void broken_config() {
);
assertTrue("system operational", deviceState.system.operation.operational);
untilLogged(SYSTEM_CONFIG_APPLY, SYSTEM_CONFIG_APPLY_LEVEL);
+ // These should not be logged since the level was at INFO until the new config is applied.
checkNotLogged(SYSTEM_CONFIG_RECEIVE, SYSTEM_CONFIG_RECEIVE_LEVEL);
checkNotLogged(SYSTEM_CONFIG_PARSE, SYSTEM_CONFIG_PARSE_LEVEL);
}