Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate ProcessedBsm #11

Draft
wants to merge 28 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
4f239e0
Mark repartion classes for removal
iyourshaw Dec 24, 2024
3079678
Mark classes to update
iyourshaw Dec 24, 2024
c33d495
Remove repartition topology
iyourshaw Dec 24, 2024
d45b67c
Converting BsmEventProcessor, BsmEvent, BsmTimestampExtractor
iyourshaw Dec 26, 2024
7385b7f
Use ProcessedBsm in analytics classes
iyourshaw Dec 26, 2024
59edf25
Fix ConflictMonitorProperties
iyourshaw Dec 27, 2024
56e39cd
Fixing tests
iyourshaw Dec 27, 2024
04fa849
Update unit tests
iyourshaw Dec 27, 2024
8735460
Fix unit test
iyourshaw Dec 31, 2024
bc6d11d
Update jpo-utils
iyourshaw Jan 8, 2025
6ddd726
Use ProcessedBsm feature
iyourshaw Jan 8, 2025
89ca335
Update unit tests
iyourshaw Jan 8, 2025
931452d
Update MessageIngestTopology
iyourshaw Jan 8, 2025
44290ae
Remove unused code
iyourshaw Jan 8, 2025
d407e56
Updating test-message-sender
iyourshaw Jan 8, 2025
16b8c17
Merge from develop
iyourshaw Jan 8, 2025
6f3061a
Update agg code
iyourshaw Jan 8, 2025
d7c683d
Update jpo-utils. Use specific image versions in docker compose.
iyourshaw Jan 9, 2025
eb33f25
Temporarily change maven repo
iyourshaw Jan 9, 2025
4c37561
Merge from develop
iyourshaw Jan 9, 2025
2bf8d33
Add ability to send ProcessdBsms to script runner
iyourshaw Jan 9, 2025
ef90bd8
Test message sender: create/send processed bsms
iyourshaw Jan 10, 2025
ca8e694
Fix generating ProcessedBsm scripts
iyourshaw Jan 10, 2025
978bdba
ProcessedBsm test script
iyourshaw Jan 10, 2025
c06cc23
Update script runner
iyourshaw Jan 10, 2025
8fba46f
Aggregation debug flag off by default
iyourshaw Jan 10, 2025
127ecfc
Maven settings back to normal
iyourshaw Feb 11, 2025
006f213
Resolve conflicts
iyourshaw Feb 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docker-compose-ode.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ services:
- all
- cm_full
- ode_geojsonconverter
image: usdotjpoode/geojsonconverter:latest
image: usdotjpoode/geojsonconverter:2.0.0
restart: ${RESTART_POLICY}
deploy:
resources:
Expand Down Expand Up @@ -36,7 +36,7 @@ services:
- all
- cm_full
- ode_geojsonconverter
image: usdotjpoode/jpo-ode:latest
image: usdotjpoode/jpo-ode:4.0.0
restart: ${RESTART_POLICY}
deploy:
resources:
Expand Down Expand Up @@ -90,7 +90,7 @@ services:
- all
- cm_full
- ode_geojsonconverter
image: usdotjpoode/asn1_codec:latest
image: usdotjpoode/asn1_codec:3.0.0
restart: ${RESTART_POLICY}
deploy:
resources:
Expand Down
1,476 changes: 1,476 additions & 0 deletions jpo-conflictmonitor/scripts/IntegrationTestScripts/ProcessedBsms.csv

Large diffs are not rendered by default.

Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.notification.NotificationParameters;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.message_ingest.MessageIngestAlgorithmFactory;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.message_ingest.MessageIngestParameters;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.repartition.RepartitionAlgorithmFactory;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.repartition.RepartitionParameters;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.map_message_count_progression.MapMessageCountProgressionAlgorithmFactory;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.map_message_count_progression.MapMessageCountProgressionParameters;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.spat_message_count_progression.SpatMessageCountProgressionAlgorithmFactory;
Expand Down Expand Up @@ -208,10 +206,6 @@ public class ConflictMonitorProperties implements EnvironmentAware {
private String connectionOfTravelAssessmentAlgorithm;
private ConnectionOfTravelAssessmentParameters connectionOfTravelAssessmentAlgorithmParameters;

private RepartitionAlgorithmFactory repartitionAlgorithmFactory;
private String repartitionAlgorithm;
private RepartitionParameters repartitionAlgorithmParameters;

private MapMessageCountProgressionAlgorithmFactory mapMessageCountProgressionAlgorithmFactory;
private String mapMessageCountProgressionAlgorithm;
private MapMessageCountProgressionParameters mapMessageCountProgressionAlgorithmParameters;
Expand Down Expand Up @@ -257,8 +251,7 @@ public class ConflictMonitorProperties implements EnvironmentAware {
private MessageIngestAlgorithmFactory messageIngestAlgorithmFactory;
private MessageIngestParameters messageIngestParameters;



@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
public void setAggregationParameters(AggregationParameters aggregationParameters) {
this.aggregationParameters = aggregationParameters;
Expand Down Expand Up @@ -375,11 +368,13 @@ public void setSpatValidationParameters(SpatValidationParameters spatBroadcastRa
this.spatValidationParameters = spatBroadcastRateParameters;
}

@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
public void setMapValidationAlgorithmFactory(MapValidationAlgorithmFactory factory) {
this.mapValidationAlgorithmFactory = factory;
}

@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
public void setSpatValidationAlgorithmFactory(SpatValidationStreamsAlgorithmFactory factory) {
this.spatValidationAlgorithmFactory = factory;
Expand All @@ -402,7 +397,7 @@ public void setMapTimestampDeltaParameters(MapTimestampDeltaParameters mapTimest
this.mapTimestampDeltaAlgorithm = mapTimestampDeltaParameters.getAlgorithm();
}


@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
public void setMapTimestampDeltaAlgorithmFactory(MapTimestampDeltaAlgorithmFactory factory) {
this.mapTimestampDeltaAlgorithmFactory = factory;
Expand All @@ -415,14 +410,15 @@ public void setSpatTimestampDeltaParameters(SpatTimestampDeltaParameters spatTim
this.spatTimestampDeltaAlgorithm = spatTimestampDeltaParameters.getAlgorithm();
}

@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
public void setSpatTimestampDeltaAlgorithmFactory(SpatTimestampDeltaAlgorithmFactory factory) {
this.spatTimestampDeltaAlgorithmFactory = factory;
}




@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
public void setLaneDirectionOfTravelAlgorithmFactory(
LaneDirectionOfTravelAlgorithmFactory laneDirectionOfTravelAlgorithmFactory) {
Expand All @@ -444,7 +440,7 @@ public void setLaneDirectionOfTravelParameters(LaneDirectionOfTravelParameters l
}



@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
public void setConnectionOfTravelAlgorithmFactory(
ConnectionOfTravelAlgorithmFactory connectionOfTravelAlgorithmFactory) {
Expand All @@ -466,7 +462,7 @@ public void setConnectionOfTravelParameters(ConnectionOfTravelParameters connect
}



@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
public void setSignalStateVehicleCrossesAlgorithmFactory(
StopLinePassageAlgorithmFactory signalStateVehicleCrossesAlgorithmFactory) {
Expand All @@ -489,7 +485,7 @@ public void setSignalStateVehicleCrossesParameters(
}



@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
public void setSignalStateVehicleStopsAlgorithmFactory(
StopLineStopAlgorithmFactory signalStateVehicleStopsAlgorithmFactory) {
Expand All @@ -511,7 +507,7 @@ public void setSignalStateVehicleStopsParameters(StopLineStopParameters signalSt
}



@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
public void setMapSpatMessageAssessmentAlgorithmFactory(
MapSpatMessageAssessmentAlgorithmFactory mapSpatMessageAssessmentAlgorithmFactory) {
Expand All @@ -533,7 +529,7 @@ public void setMapSpatMessageAssessmentParameters(




@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
public void setSpatTimeChangeDetailsAlgorithmFactory(
SpatTimeChangeDetailsAlgorithmFactory spatTimeChangeDetailsAlgorithmFactory) {
Expand Down Expand Up @@ -561,6 +557,7 @@ public void setSpatTimeChangeDetailsParameters(SpatTimeChangeDetailsParameters s
this.spatTimeChangeDetailsParameters = spatTimeChangeDetailsParameters;
}

@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
public void setSpatTransitionAlgorithmFactory(EventStateProgressionAlgorithmFactory spatTransitionAlgorithmFactory) {
this.spatTransitionAlgorithmFactory = spatTransitionAlgorithmFactory;
Expand All @@ -576,7 +573,7 @@ public void setSpatTransitionParameters(EventStateProgressionParameters spatTran
this.spatTransitionParameters = spatTransitionParameters;
}


@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
public void setMapTimeChangeDetailsAlgorithmFactory(
MapTimeChangeDetailsAlgorithmFactory mapTimeChangeDetailsAlgorithmFactory) {
Expand All @@ -599,7 +596,7 @@ public void setMapTimeChangeDetailsParameters(MapTimeChangeDetailsParameters map




@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
public void setSignalStateEventAssessmentAlgorithmFactory(
StopLinePassageAssessmentAlgorithmFactory signalStateEventAssessmentAlgorithmFactory) {
Expand All @@ -621,6 +618,7 @@ public void setSignalStateEventAssessmentAlgorithmParameters(
this.signalStateEventAssessmentAlgorithmParameters = signalStateEventAssessmentAlgorithmParameters;
}

@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
public void setStopLineStopAssessmentAlgorithmFactory(
StopLineStopAssessmentAlgorithmFactory stopLineStopAssessmentAlgorithmFactory) {
Expand All @@ -643,7 +641,7 @@ public void setStopLineStopAssessmentAlgorithmParameters(
}



@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
public void setLaneDirectionOfTravelAssessmentAlgorithmFactory(
LaneDirectionOfTravelAssessmentAlgorithmFactory laneDirectionfOfTravelAssessmentAlgorithmFactory) {
Expand All @@ -666,7 +664,7 @@ public void setLaneDirectionOfTravelAssessmentAlgorithmParameters(
}



@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
public void setConnectionOfTravelAssessmentAlgorithmFactory(
ConnectionOfTravelAssessmentAlgorithmFactory connectionOfTravelAssessmentAlgorithmFactory) {
Expand All @@ -688,28 +686,7 @@ public void setConnectionOfTravelAssessmentAlgorithmParameters(
this.connectionOfTravelAssessmentAlgorithmParameters = connectionOfTravelAssessmentAlgorithmParameters;
}



@Autowired
public void setRepartitionAlgorithmFactory(RepartitionAlgorithmFactory repartitionAlgorithmFactory) {
this.repartitionAlgorithmFactory = repartitionAlgorithmFactory;
}



@Value("${repartition.algorithm}")
public void setRepartitionAlgorithm(String repartitionAlgorithm) {
this.repartitionAlgorithm = repartitionAlgorithm;
}



@Autowired
public void setRepartitionAlgorithmParameters(RepartitionParameters repartitionAlgorithmParameters) {
this.repartitionAlgorithmParameters = repartitionAlgorithmParameters;
}


@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
public void setMapMessageCountProgressionAlgorithmFactory(MapMessageCountProgressionAlgorithmFactory mapMessageCountProgressionAlgorithmFactory) {
this.mapMessageCountProgressionAlgorithmFactory = mapMessageCountProgressionAlgorithmFactory;
Expand All @@ -726,7 +703,7 @@ public void setMapMessageCountProgressionAlgorithmParameters(MapMessageCountProg
}



@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
public void setSpatMessageCountProgressionAlgorithmFactory(SpatMessageCountProgressionAlgorithmFactory spatMessageCountProgressionAlgorithmFactory) {
this.spatMessageCountProgressionAlgorithmFactory = spatMessageCountProgressionAlgorithmFactory;
Expand All @@ -742,7 +719,7 @@ public void setSpatMessageCountProgressionAlgorithmParameters(SpatMessageCountPr
this.spatMessageCountProgressionAlgorithmParameters = spatMessageCountProgressionAlgorithmParameters;
}


@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
public void setBsmMessageCountProgressionAlgorithmFactory(BsmMessageCountProgressionAlgorithmFactory bsmMessageCountProgressionAlgorithmFactory) {
this.bsmMessageCountProgressionAlgorithmFactory = bsmMessageCountProgressionAlgorithmFactory;
Expand All @@ -763,9 +740,9 @@ public NotificationAlgorithmFactory getNotificationAlgorithmFactory() {
return notificationAlgorithmFactory;
}





@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
public void setNotificationAlgorithmFactory(NotificationAlgorithmFactory notificationAlgorithmFactory) {
this.notificationAlgorithmFactory = notificationAlgorithmFactory;
Expand Down Expand Up @@ -795,6 +772,7 @@ public void setEventParameters(EventParameters eventParameters) {
this.eventAlgorithm = eventParameters.getAlgorithm();
}

@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
public void setEventAlgorithmFactory(EventAlgorithmFactory factory) {
this.eventAlgorithmFactory = factory;
Expand All @@ -804,7 +782,7 @@ public Boolean getConfluentCloudStatus() {
return confluentCloudEnabled;
}


@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
public void setBsmEventAlgorithmFactory(BsmEventAlgorithmFactory bsmEventAlgorithmFactory) {
this.bsmEventAlgorithmFactory = bsmEventAlgorithmFactory;
Expand All @@ -815,6 +793,7 @@ public void setBsmEventParameters(BsmEventParameters bsmEventParameters) {
this.bsmEventParameters = bsmEventParameters;
}

@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
public void setMessageIngestAlgorithmFactory(MessageIngestAlgorithmFactory messageIngestAlgorithmFactory) {
this.messageIngestAlgorithmFactory = messageIngestAlgorithmFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.springframework.web.client.RestTemplate;
import org.springframework.web.servlet.support.ServletUriComponentsBuilder;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
Expand All @@ -49,9 +48,10 @@
import us.dot.its.jpo.conflictmonitor.monitor.utils.BsmUtils;
import us.dot.its.jpo.geojsonconverter.partitioner.RsuIntersectionKey;
import us.dot.its.jpo.geojsonconverter.pojos.geojson.LineString;
import us.dot.its.jpo.geojsonconverter.pojos.geojson.Point;
import us.dot.its.jpo.geojsonconverter.pojos.geojson.bsm.ProcessedBsm;
import us.dot.its.jpo.geojsonconverter.pojos.geojson.map.ProcessedMap;
import us.dot.its.jpo.geojsonconverter.pojos.spat.ProcessedSpat;
import us.dot.its.jpo.ode.model.OdeBsmData;

import javax.ws.rs.Produces;

Expand Down Expand Up @@ -360,22 +360,22 @@ private void addLinks(Map<String, String> map, String... paths) {
Instant startTime = key.window().startTime();
Instant endTime = key.window().endTime();
BsmIntersectionIdKey theKey= key.key();
OdeBsmData value = kvp.value;
ProcessedBsm<Point> value = kvp.value;
// Integer intersectionId = value.();
String vehicleId = BsmUtils.getVehicleId(value);
TreeMap<String, TreeMap<String, OdeBsmData>> bsms = null;
TreeMap<String, TreeMap<String, ProcessedBsm<Point>>> bsms = null;
if (intersectionMap.containsKey(vehicleId)) {
bsms = intersectionMap.get(vehicleId);
} else {
bsms = new TreeMap<String, TreeMap<String, OdeBsmData>>();
bsms = new TreeMap<>();
intersectionMap.put(vehicleId, bsms);
}
String window = String.format("%s / %s", formatter.format(startTime.atZone(ZoneOffset.UTC)), formatter.format(endTime.atZone(ZoneOffset.UTC)));
TreeMap<String, OdeBsmData> bsmList = null;
TreeMap<String, ProcessedBsm<Point>> bsmList = null;
if (bsms.containsKey(window)) {
bsmList = bsms.get(window);
} else {
bsmList = new TreeMap<String, OdeBsmData>();
bsmList = new TreeMap<>();
bsms.put(window, bsmList);
}
bsmList.put(theKey.toString(), value);
Expand Down Expand Up @@ -458,7 +458,7 @@ private String baseUrl() {
}

public class IntersectionSpatMap extends TreeMap<Integer, TreeMap<String, TreeMap<String, ProcessedSpat>>> {}
public class IntersectionBsm extends TreeMap<String, TreeMap<String, TreeMap<String, OdeBsmData>>> {}
public class IntersectionBsm extends TreeMap<String, TreeMap<String, TreeMap<String, ProcessedBsm<Point>>>> {}


public record TopologyInfoLinks(String detailsUrl, String simpleGraphUrl) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,6 @@
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.notification.NotificationAlgorithm;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.notification.NotificationAlgorithmFactory;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.notification.NotificationParameters;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.repartition.RepartitionAlgorithm;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.repartition.RepartitionAlgorithmFactory;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.repartition.RepartitionParameters;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.event_state_progression.EventStateProgressionAlgorithm;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.stop_line_passage.StopLinePassageAlgorithm;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.stop_line_passage.StopLinePassageAlgorithmFactory;
Expand Down Expand Up @@ -153,24 +150,6 @@ public MonitorServiceController(final ConflictMonitorProperties conflictMonitorP
configTopology.start();


final String repartition = "repartition";
final RepartitionAlgorithmFactory repartitionAlgoFactory = conflictMonitorProps.getRepartitionAlgorithmFactory();
final String repAlgo = conflictMonitorProps.getRepartitionAlgorithm();
final RepartitionAlgorithm repartitionAlgo = repartitionAlgoFactory.getAlgorithm(repAlgo);
final RepartitionParameters repartitionParams = conflictMonitorProps.getRepartitionAlgorithmParameters();
configTopology.registerConfigListeners(repartitionParams);
if (repartitionAlgo instanceof StreamsTopology) {
final var streamsAlgo = (StreamsTopology)repartitionAlgo;
streamsAlgo.setStreamsProperties(conflictMonitorProps.createStreamProperties(repartition));
streamsAlgo.registerStateListener(new StateChangeHandler(kafkaTemplate, repartition, stateChangeTopic, healthTopic));
streamsAlgo.registerUncaughtExceptionHandler(new StreamsExceptionHandler(kafkaTemplate, repartition, healthTopic));
algoMap.put(repartition, streamsAlgo);
}
repartitionAlgo.setParameters(repartitionParams);
Runtime.getRuntime().addShutdownHook(new Thread(repartitionAlgo::stop));
repartitionAlgo.start();


final String notification = "notification";
final NotificationAlgorithmFactory notificationAlgoFactory = conflictMonitorProps.getNotificationAlgorithmFactory();
final String notAlgo = conflictMonitorProps.getNotificationAlgorithm();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package us.dot.its.jpo.conflictmonitor.monitor.algorithms.aggregation.bsm_message_count_progression;

import lombok.*;
import us.dot.its.jpo.conflictmonitor.monitor.models.bsm.BsmRsuIdKey;
import us.dot.its.jpo.geojsonconverter.partitioner.RsuIdKey;

import us.dot.its.jpo.geojsonconverter.partitioner.RsuLogKey;

@EqualsAndHashCode(callSuper = true)
@Data
@Generated
public class BsmMessageCountProgressionAggregationKey extends BsmRsuIdKey {
public class BsmMessageCountProgressionAggregationKey extends RsuLogKey {
private String dataFrame;
private String change;
}
Loading
Loading