Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configurable alert to print log statement warning of missing TIMs deposits #106

Open
wants to merge 15 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
"redhat.java",
"redhat.vscode-commons",
"ms-vscode.cpptools",
"ms-vscode.cmake-tools"
"ms-vscode.cmake-tools",
"ms-vscode.makefile-tools",
"Oracle.oracle-java"
]
}
},
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ Copy the following files from `jpo-ode` directory into your DOCKER_SHARED_VOLUME
- Copy jpo-ode/aem.properties to ${DOCKER_SHARED_VOLUME}/aem.properties
- Copy jpo-utils/sample.env to jpo-utils/.env
- Fill in the variables as described in the [README](jpo-utils/README.md)
- If you want to see log-based alerts notifying you if no TIMs were ingested in a specific period of time, you will want to update your `.env` file to set `ODE_TIM_INGEST_MONITORING_ENABLED=true` and `ODE_TIM_INGEST_MONITORING_INTERVAL=<seconds between ingest count checks>`. See [TimIngestWatcher](jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/traveler/TimIngestWatcher.java) to see the log-based monitoring provided.

**Make:**

Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ services:
DATA_SIGNING_ENABLED_RSU: ${DATA_SIGNING_ENABLED_RSU}
DATA_SIGNING_ENABLED_SDW: ${DATA_SIGNING_ENABLED_SDW}
DEFAULT_SNMP_PROTOCOL: ${DEFAULT_SNMP_PROTOCOL}
ODE_TIM_INGEST_MONITORING_ENABLED: ${ODE_TIM_INGEST_MONITORING_ENABLED}
ODE_TIM_INGEST_MONITORING_INTERVAL: ${ODE_TIM_INGEST_MONITORING_INTERVAL}
depends_on:
kafka:
condition: service_healthy
Expand Down
Empty file modified jpo-ode-core/mvnw
100644 → 100755
Empty file.
Empty file modified jpo-ode-core/mvnw.cmd
100644 → 100755
Empty file.
Empty file modified jpo-ode-svcs/mvnw
100644 → 100755
Empty file.
Empty file modified jpo-ode-svcs/mvnw.cmd
100644 → 100755
Empty file.
Empty file modified jpo-ode-svcs/run.bat
100644 → 100755
Empty file.
Empty file modified jpo-ode-svcs/run.sh
100644 → 100755
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package us.dot.its.jpo.ode;

public class ConfigEnvironmentVariables {
public static final String ODE_TIM_INGEST_MONITORING_ENABLED = "ODE_TIM_INGEST_MONITORING_ENABLED";
public static final String ODE_TIM_INGEST_MONITORING_INTERVAL = "ODE_TIM_INGEST_MONITORING_INTERVAL"; // in seconds
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.text.SimpleDateFormat;
import java.time.format.DateTimeParseException;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import org.json.JSONObject;
import org.slf4j.Logger;
Expand All @@ -34,6 +36,7 @@

import com.fasterxml.jackson.databind.node.ObjectNode;

import us.dot.its.jpo.ode.ConfigEnvironmentVariables;
import us.dot.its.jpo.ode.OdeProperties;
import us.dot.its.jpo.ode.context.AppContext;
import us.dot.its.jpo.ode.model.OdeMsgMetadata.GeneratedBy;
Expand Down Expand Up @@ -62,6 +65,7 @@
public class TimDepositController {

private static final Logger logger = LoggerFactory.getLogger(TimDepositController.class);
private static final TimIngestTracker INGEST_MONITOR = TimIngestTracker.getInstance();

private static final String ERRSTR = "error";
private static final String WARNING = "warning";
Expand Down Expand Up @@ -105,6 +109,18 @@ public TimDepositController(OdeProperties odeProperties) {
? Boolean.parseBoolean(System.getenv("DATA_SIGNING_ENABLED_SDW"))
: true;

// start the TIM ingest monitoring service if enabled
Boolean timIngestMonitoringEnabled = Boolean.valueOf(odeProperties.getProperty(ConfigEnvironmentVariables.ODE_TIM_INGEST_MONITORING_ENABLED));
if (timIngestMonitoringEnabled) {
logger.info("TIM ingest monitoring enabled.");

ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
Long monitoringInterval = Long.valueOf(odeProperties.getProperty(ConfigEnvironmentVariables.ODE_TIM_INGEST_MONITORING_INTERVAL));
mcook42 marked this conversation as resolved.
Show resolved Hide resolved

scheduledExecutorService.scheduleAtFixedRate(new TimIngestWatcher(monitoringInterval), monitoringInterval, monitoringInterval, java.util.concurrent.TimeUnit.SECONDS);
} else {
logger.info("TIM ingest monitoring disabled.");
}
}

/**
Expand Down Expand Up @@ -261,6 +277,8 @@ public synchronized ResponseEntity<String> depositTim(String jsonString, Request
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(JsonUtils.jsonKeyValue(ERRSTR, errMsg));
}

INGEST_MONITOR.incrementTotalMessagesReceived();

return ResponseEntity.status(HttpStatus.OK).body(JsonUtils.jsonKeyValue(SUCCESS, "true"));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package us.dot.its.jpo.ode.traveler;

public class TimIngestTracker {

private long totalMessagesReceived;

private TimIngestTracker() {
totalMessagesReceived = 0;
}

public static TimIngestTracker getInstance() {
return TimIngestMonitorHolder.INSTANCE;
}

private static class TimIngestMonitorHolder {
private static final TimIngestTracker INSTANCE = new TimIngestTracker();
}

public long getTotalMessagesReceived() {
return totalMessagesReceived;
}

public void incrementTotalMessagesReceived() {
totalMessagesReceived++;
}

public void resetTotalMessagesReceived() {
totalMessagesReceived = 0;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package us.dot.its.jpo.ode.traveler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TimIngestWatcher implements Runnable {

private static final Logger logger = LoggerFactory.getLogger(TimIngestWatcher.class.getName());
private final long interval;

public TimIngestWatcher(long interval) {
this.interval = interval;
}

@Override
public void run() {
TimIngestTracker tracker = TimIngestTracker.getInstance();
long ingested = tracker.getTotalMessagesReceived();

if (ingested == 0) {
logger.warn("ODE has not received TIM deposits in {} seconds.", interval);
} else {
logger.debug("ODE has received {} TIM deposits in the last {} seconds.", ingested, interval);
}

// After checking the number of TIMs ingested in the last interval, reset the counter
tracker.resetTotalMessagesReceived();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
import us.dot.its.jpo.ode.OdeProperties;
import us.dot.its.jpo.ode.udp.bsm.BsmReceiver;
import us.dot.its.jpo.ode.udp.generic.GenericReceiver;
import us.dot.its.jpo.ode.udp.tim.TimReceiver;
import us.dot.its.jpo.ode.udp.ssm.SsmReceiver;
import us.dot.its.jpo.ode.udp.srm.SrmReceiver;
import us.dot.its.jpo.ode.udp.spat.SpatReceiver;
import us.dot.its.jpo.ode.udp.map.MapReceiver;
import us.dot.its.jpo.ode.udp.psm.PsmReceiver;
import us.dot.its.jpo.ode.udp.spat.SpatReceiver;
import us.dot.its.jpo.ode.udp.srm.SrmReceiver;
import us.dot.its.jpo.ode.udp.ssm.SsmReceiver;
import us.dot.its.jpo.ode.udp.tim.TimReceiver;

/**
* Centralized UDP service dispatcher.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
package us.dot.its.jpo.ode.udp.tim;

import java.net.DatagramPacket;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import us.dot.its.jpo.ode.coder.StringPublisher;
import us.dot.its.jpo.ode.OdeProperties;
import us.dot.its.jpo.ode.coder.StringPublisher;
import us.dot.its.jpo.ode.udp.AbstractUdpReceiverPublisher;
import us.dot.its.jpo.ode.udp.UdpHexDecoder;

public class TimReceiver extends AbstractUdpReceiverPublisher {
private static Logger logger = LoggerFactory.getLogger(TimReceiver.class);

private StringPublisher timPublisher;

@Autowired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
******************************************************************************/
package us.dot.its.jpo.ode.traveler;

import static org.junit.Assert.assertEquals;

import org.apache.commons.io.IOUtils;
import static org.junit.Assert.assertEquals;
import org.junit.jupiter.api.Test;
import org.springframework.http.ResponseEntity;

Expand All @@ -38,6 +37,7 @@
import us.dot.its.jpo.ode.util.XmlUtils.XmlUtilsException;
import us.dot.its.jpo.ode.wrapper.MessageProducer;


public class TimDepositControllerTest {

@Tested
Expand Down Expand Up @@ -167,6 +167,15 @@ public void testDepositingTimWithExtraProperties(@Capturing TimTransmogrifier ca
assertEquals("{\"success\":\"true\"}", actualResponse.getBody());
}

@Test
public void testSuccessfulTimIngestIsTracked(@Capturing TimTransmogrifier capturingTimTransmogrifier, @Capturing XmlUtils capturingXmlUtils) {
String timToSubmit = "{\"request\":{\"rsus\":[],\"snmp\":{},\"randomProp1\":true,\"randomProp2\":\"hello world\"},\"tim\":{\"msgCnt\":\"13\",\"timeStamp\":\"2017-03-13T01:07:11-05:00\",\"randomProp3\":123,\"randomProp4\":{\"nestedProp1\":\"foo\",\"nestedProp2\":\"bar\"}}}";
long priorIngestCount = TimIngestTracker.getInstance().getTotalMessagesReceived();
ResponseEntity<String> actualResponse = testTimDepositController.postTim(timToSubmit);
assertEquals("{\"success\":\"true\"}", actualResponse.getBody());
assertEquals(priorIngestCount + 1, TimIngestTracker.getInstance().getTotalMessagesReceived());
}

@Test
public void testSuccessfulRsuMessageReturnsSuccessMessagePost(@Capturing TimTransmogrifier capturingTimTransmogrifier, @Capturing XmlUtils capturingXmlUtils) {
String timToSubmit = "{\"request\": {\"rsus\": [{\"latitude\": 30.123456, \"longitude\": -100.12345, \"rsuId\": 123, \"route\": \"myroute\", \"milepost\": 10, \"rsuTarget\": \"172.0.0.1\", \"rsuRetries\": 3, \"rsuTimeout\": 5000, \"rsuIndex\": 7, \"rsuUsername\": \"myusername\", \"rsuPassword\": \"mypassword\"}], \"snmp\": {\"rsuid\": \"83\", \"msgid\": 31, \"mode\": 1, \"channel\": 183, \"interval\": 2000, \"deliverystart\": \"2024-05-13T14:30:00Z\", \"deliverystop\": \"2024-05-13T22:30:00Z\", \"enable\": 1, \"status\": 4}}, \"tim\": {\"msgCnt\": \"1\", \"timeStamp\": \"2024-05-10T19:01:22Z\", \"packetID\": \"123451234512345123\", \"urlB\": \"null\", \"dataframes\": [{\"startDateTime\": \"2024-05-13T20:30:05.014Z\", \"durationTime\": \"30\", \"sspTimRights\": \"1\", \"frameType\": \"advisory\", \"msgId\": {\"roadSignID\": {\"mutcdCode\": \"warning\", \"viewAngle\": \"1111111111111111\", \"position\": {\"latitude\": 30.123456, \"longitude\": -100.12345}}}, \"priority\": \"5\", \"sspLocationRights\": \"1\", \"regions\": [{\"name\": \"I_myroute_RSU_172.0.0.1\", \"anchorPosition\": {\"latitude\": 30.123456, \"longitude\": -100.12345}, \"laneWidth\": \"50\", \"directionality\": \"3\", \"closedPath\": \"false\", \"description\": \"path\", \"path\": {\"scale\": 0, \"nodes\": [{\"delta\": \"node-LL\", \"nodeLat\": 0.0, \"nodeLong\": 0.0}, {\"delta\": \"node-LL\", \"nodeLat\": 0.0, \"nodeLong\": 0.0}], \"type\": \"ll\"}, \"direction\": \"0000000000010000\"}], \"sspMsgTypes\": \"1\", \"sspMsgContent\": \"1\", \"content\": \"workZone\", \"items\": [\"771\"], \"url\": \"null\"}]}}";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package us.dot.its.jpo.ode.traveler;

import static org.junit.Assert.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.junit.jupiter.api.Test;

public class TimIngestTrackerTest {

@Test
public void testCanIncrementTotalMessagesReceived() {
TimIngestTracker testTimIngestTracker = TimIngestTracker.getInstance();
long priorCount = testTimIngestTracker.getTotalMessagesReceived();
testTimIngestTracker.incrementTotalMessagesReceived();
assertTrue(testTimIngestTracker.getTotalMessagesReceived() > priorCount);
}

@Test
public void testCanResetTotalMessagesReceived() {
TimIngestTracker testTimIngestTracker = TimIngestTracker.getInstance();
testTimIngestTracker.incrementTotalMessagesReceived();
assertTrue(testTimIngestTracker.getTotalMessagesReceived()> 0);
testTimIngestTracker.resetTotalMessagesReceived();
assertEquals(0, testTimIngestTracker.getTotalMessagesReceived());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Click nbfs://nbhost/SystemFileSystem/Templates/Licenses/license-default.txt to change this license
* Click nbfs://nbhost/SystemFileSystem/Templates/UnitTests/JUnit5TestClass.java to edit this template
*/

package us.dot.its.jpo.ode.traveler;

import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.api.Test;

public class TimIngestWatcherTest {

@Test
public void testRun() {
TimIngestWatcher watcher = new TimIngestWatcher(0);
watcher.run();

// we can't easily test that the run method wrote the correct log message, but we can test that it reset the total messages received after running
TimIngestTracker testTimIngestTracker = TimIngestTracker.getInstance();
assertEquals(0, testTimIngestTracker.getTotalMessagesReceived());
}

}
6 changes: 5 additions & 1 deletion sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,8 @@ AEM_LOG_TO_FILE=false
AEM_LOG_LEVEL=INFO
ADM_LOG_TO_FILE=false
ADM_LOG_TO_CONSOLE=true
ADM_LOG_LEVEL=INFO
ADM_LOG_LEVEL=INFO

# ODE Monitoring
ODE_TIM_INGEST_MONITORING_ENABLED=false
mcook42 marked this conversation as resolved.
Show resolved Hide resolved
TIM_INGEST_MONITORING_INTERVAL=
Loading