Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate Asn1EncodedDataRouter to use Spring Kafka #138

Merged
merged 162 commits into from
Feb 20, 2025

Conversation

mcook42
Copy link

@mcook42 mcook42 commented Dec 24, 2024

PR Details

Description

  • Migrated Asn1EncodedDataRouter to use Spring Kafka instead of homegrown Kafka implementation
  • Changed from swallowing all exceptions to explicitly handling the non-critical exceptions (where we can reasonably continue to process the data) and throwing specific exceptions where we cannot continue processing (where missing data would cause unexpected downstream issues)
  • Removed duplicated logging wherever EventLogger was used
  • Moved OdeTimJsonTopology to a Spring Configuration class to enable dependency injection for testing and Asn1DecodedDataRouter listener Bean initialization
  • Extracted interactions with the security services module into SecurityServicesClient and defined the request and response models based on the raw JSON manipulation done prior
  • Deleted Asn1CommandManager and moved logic to the Asn1EncodedDataRouter for clearer code and fewer classes to maintain
  • Moved class creation out of Asn1EncodedDataRouter into the relevant Beans to enable easier testing with Spring Dependency Injection
  • Replaced outdated (and buggy) usage of the Date class with usages of LocalDateTime (only in Asn1EncodedDataRouter) to allow for explicit usage of UTC timezone instead of relying on the system settings (they aren't consistent between local and CI, so it's reasonable to suspect there is inconsistency between hosting environments)

NOTE: The interactions with the security service rely on the changes contained within: CDOT-CV/jpo-security-svcs#16. The jpo-security-svcs submodule points to the CDOT-CV dev branch in this PR

Related Issue

No related USDOT issue

Motivation and Context

Implementing Spring Kafka gives us better lifecycle management of producers and consumers, more reusable producer/consumer code, easier testability, and a more robust production-ready Kafka library. This is part of a more significant effort to replace our hand-rolled Kafka implementation with Spring Kafka. The previous changesets related to this effort are:

How Has This Been Tested?

  • Before making any code changes, I refactored the Asn1EncodedDataRouter class to check the output to the different topics is as expected.
  • Refactored Asn1EncodedDataRouter class to use Spring Kafka and confirmed the tests continued to behave as expected
  • I sent the data from the udpsender_[msgType].py scripts found under scripts/tests through a live local system started up with make rebuild. I confirmed there were no errors in the logs. I also confirmed that all expected messages were produced to and consumed from the correct queues by using the kafka-ui container available at localhost:8001 (on my local machine, of course).

For the live testing, I set the following environment variables on my system to enable signing and confirm all pieces played well together

DATA_SIGNING_ENABLED_RSU=true
DATA_SIGNING_ENABLED_SDW=true

To test the signing flow, I produced the test XML to the topic.Asn1EncoderOutput topic via the kafka-ui.

Snippet from the locals during local testing:

2025-01-06 21:57:56 [Asn1EncodedDataRouter-0-C-1] DEBUG Asn1EncodedDataRouter - ServiceRequest: {"ode":{"verb":"POST","version":"3"},"sdw":{"recordId":"6B573067","serviceRegion":{"nwCorner":{"latitude":"38.98721843900006","longitude":"-104.76767069499999"},"seCorner":{"latitude":"38.96666515900006","longitude":"-104.74048299899994"}},"ttl":"oneday"}} 
2025-01-06 21:57:56 [Asn1EncodedDataRouter-0-C-1] DEBUG Asn1EncodedDataRouter - Mapped to object ServiceRequest: {"ode":{"version":3,"verb":"POST"},"sdw":{"serviceRegion":{"nwCorner":{"latitude":38.98721843900006,"longitude":-104.76767069499999},"seCorner":{"latitude":38.96666515900006,"longitude":-104.74048299899994}},"ttl":"oneday","recordId":"6B573067"}} 
2025-01-06 21:57:56 [Asn1EncodedDataRouter-0-C-1] INFO  Asn1EncodedDataRouter - Processing unsigned message. 
2025-01-06 21:57:56 [Asn1EncodedDataRouter-0-C-1] DEBUG Asn1EncodedDataRouter - Signing encoded TIM message... 
2025-01-06 21:57:56 [Asn1EncodedDataRouter-0-C-1] INFO  SecurityServicesClient - Sending data to security services module at http://172.21.103.243:8090/sign with validity override 17820000 to be signed 
2025-01-06 21:57:56 [Asn1EncodedDataRouter-0-C-1] DEBUG SecurityServicesClient - Data to be signed: <SignatureRequestModel(message=AB+AhHARTzcDsCfPIHEyjhIPd12bAwHCZw83QWa8MCf/+T9AvmKBKaAH+Tk34c9ayY36cGpfbQmrdmwbN4AAAAAJnDzdBZrwwITiMCAAhEAzYRQHQBczzcLWID/aF1Y5g1jtJ1V8aFb7qsHDoPjZwiHdhYpg4K2llhJFZ/KxggAQAgCe7rs2AA==, sigValidityOverride=17820000),[Content-Type:"application/json"]> 
2025-01-06 21:58:00 [Asn1EncodedDataRouter-0-C-1] DEBUG SecurityServicesClient - Security services module response: <200 OK OK,SignatureResultModel(messageSigned=AB+AhHARTzcDsCfPIHEyjhIPd12bAwHCZw83QWa8MCf/+T9AvmKBKaAH+Tk34c9ayY36cGpfbQmrdmwbN4AAAAAJnDzdBZrwwITiMCAAhEAzYRQHQBczzcLWID/aF1Y5g1jtJ1V8aFb7qsHDoPjZwiHdhYpg4K2llhJFZ/KxggAQAgCe7rs2AA==c26d50a4, messageExpiry=17820000),[Content-Type:"application/json", Transfer-Encoding:"chunked", Date:"Mon, 06 Jan 2025 21:58:00 GMT", Keep-Alive:"timeout=60", Connection:"keep-alive"]> 
2025-01-06 21:58:00 [Asn1EncodedDataRouter-0-C-1] DEBUG Asn1EncodedDataRouter - Encoded message - phase 1: 001F808470114F3703B027CF2071328E120F775D9B0301C2670F374166BC3027FFF93F40BE628129A007F93937E1CF5AC98DFA706A5F6D09AB766C1B3780000000099C3CDD059AF0C084E2302000844033611407401733CDC2D6203FDA1756398358ED27557C6856FBAAC1C3A0F8D9C221DD858A60E0ADA596124567F2B182001002009EEEBB3600736E9DE746B8 
2025-01-06 21:58:00 [Asn1EncodedDataRouter-0-C-1] DEBUG Asn1EncodedDataRouter - No RSUs or SNMP provided. Not sending to RSUs. 
2025-01-06 21:58:00 [kafka-producer-network-thread | producer-5] DEBUG LoggingProducerListener - Successfully produced to topic topic.OdeTIMCertExpirationTimeJson with key null and value {"packetID":"03B027CF2071328E12","startDateTime":"2024-03-08T16:37:05.414Z","requiredExpirationDate":"2024-03-08T21:34:05.414Z","expirationDate":"1970-07-26T06:00:00.000Z"}  
2025-01-06 21:58:00 [Asn1EncodedDataRouter-0-C-1] DEBUG Asn1EncodedDataRouter - TIM not found in k-table. Skipping deposit to TMC-filtered topic. 
2025-01-06 21:58:00 [Asn1EncodedDataRouter-0-C-1] DEBUG Asn1EncodedDataRouter - Publishing message for round 2 encoding 
2025-01-06 21:58:00 [Asn1EncodedDataRouter-0-C-1] DEBUG Asn1EncodedDataRouter - Fully crafted ASD to be encoded: <OdeAsn1Data><metadata><payloadType>us.dot.its.jpo.ode.model.OdeAsdPayload</payloadType><serialId><streamId>05ce8f1c-9c30-4ba1-afb3-5cac3d681ce9</streamId><bundleSize>1</bundleSize><bundleId>0</bundleId><recordId>0</recordId><serialNumber>0</serialNumber></serialId><odeReceivedAt>2025-01-06T21:58:00.453Z</odeReceivedAt><schemaVersion>8</schemaVersion><maxDurationTime>0</maxDurationTime><sanitized>false</sanitized><request><ode><version>3</version><verb>POST</verb></ode><sdw><serviceRegion><nwCorner><latitude>38.98721843900006</latitude><longitude>-104.76767069499999</longitude></nwCorner><seCorner><latitude>38.96666515900006</latitude><longitude>-104.74048299899994</longitude></seCorner></serviceRegion><ttl>oneday</ttl><recordId>6B573067</recordId></sdw></request><encodings><encodings><elementName>AdvisorySituationData</elementName><elementType>AdvisorySituationData</elementType><encodingRule>UPER</encodingRule></encodings></encodings></metadata><payload><dataType>us.dot.its.jpo.ode.plugin.j2735.DdsAdvisorySituationData</dataType><data><AdvisorySituationData><dialogID>156</dialogID><seqID>5</seqID><groupID>00000000</groupID><requestID>0CDD17A4</requestID><recordID>6B573067</recordID><timeToLive>2</timeToLive><serviceRegion><nwCorner><lat>389872184</lat><long>-1047676707</long></nwCorner><seCorner><lat>389666652</lat><long>-1047404830</long></seCorner></serviceRegion><asdmDetails><asdmID>0CDD17A4</asdmID><asdmType>2</asdmType><distType>02</distType><startTime><year>0</year><month>0</month><day>0</day><hour>31</hour><minute>60</minute></startTime><stopTime><year>0</year><month>0</month><day>0</day><hour>31</hour><minute>60</minute></stopTime><advisoryMessage>001F808470114F3703B027CF2071328E120F775D9B0301C2670F374166BC3027FFF93F40BE628129A007F93937E1CF5AC98DFA706A5F6D09AB766C1B3780000000099C3CDD059AF0C084E2302000844033611407401733CDC2D6203FDA1756398358ED27557C6856FBAAC1C3A0F8D9C221DD858A60E0ADA596124567F2B182001002009EEEBB3600736E9DE746B8</advisoryMessage></asdmDetails></AdvisorySituationData></data></payload></OdeAsn1Data> 
2025-01-06 21:58:00 [kafka-producer-network-thread | producer-5] DEBUG LoggingProducerListener - Successfully produced to topic topic.Asn1EncoderInput with key null and value <OdeAsn1Data><metadata><payloadType>us.dot.its.jpo.ode.model.OdeAsdPayload</payloadType><serialId><streamId>05ce8f1c-9c30-4ba1-afb3-5cac3d681ce9</streamId><bundleSize>1</bundleSize><bundleId>0</bundleId><recordId>0</recordId><serialNumber>0</serialNumber></serialId><odeReceivedAt>2025-01-06T21:58:00.453Z</odeReceivedAt><schemaVersion>8</schemaVersion><maxDurationTime>0</maxDurationTime><sanitized>false</sanitized><request><ode><version>3</version><verb>POST</verb></ode><sdw><serviceRegion><nwCorner><latitude>38.98721843900006</latitude><longitude>-104.76767069499999</longitude></nwCorner><seCorner><latitude>38.96666515900006</latitude><longitude>-104.74048299899994</longitude></seCorner></serviceRegion><ttl>oneday</ttl><recordId>6B573067</recordId></sdw></request><encodings><encodings><elementName>AdvisorySituationData</elementName><elementType>AdvisorySituationData</elementType><encodingRule>UPER</encodingRule></encodings></encodings></metadata><payload><dataType>us.dot.its.jpo.ode.plugin.j2735.DdsAdvisorySituationData</dataType><data><AdvisorySituationData><dialogID>156</dialogID><seqID>5</seqID><groupID>00000000</groupID><requestID>0CDD17A4</requestID><recordID>6B573067</recordID><timeToLive>2</timeToLive><serviceRegion><nwCorner><lat>389872184</lat><long>-1047676707</long></nwCorner><seCorner><lat>389666652</lat><long>-1047404830</long></seCorner></serviceRegion><asdmDetails><asdmID>0CDD17A4</asdmID><asdmType>2</asdmType><distType>02</distType><startTime><year>0</year><month>0</month><day>0</day><hour>31</hour><minute>60</minute></startTime><stopTime><year>0</year><month>0</month><day>0</day><hour>31</hour><minute>60</minute></stopTime><advisoryMessage>001F808470114F3703B027CF2071328E120F775D9B0301C2670F374166BC3027FFF93F40BE628129A007F93937E1CF5AC98DFA706A5F6D09AB766C1B3780000000099C3CDD059AF0C084E2302000844033611407401733CDC2D6203FDA1756398358ED27557C6856FBAAC1C3A0F8D9C221DD858A60E0ADA596124567F2B182001002009EEEBB3600736E9DE746B8</advisoryMessage></asdmDetails></AdvisorySituationData></data></payload></OdeAsn1Data>  
2025-01-06 21:58:00 [Asn1EncodedDataRouter-0-C-1] DEBUG Asn1EncodedDataRouter - ServiceRequest: {"ode":{"verb":"POST","version":"3"},"sdw":{"recordId":"6B573067","serviceRegion":{"nwCorner":{"latitude":"38.98721843900006","longitude":"-104.76767069499999"},"seCorner":{"latitude":"38.96666515900006","longitude":"-104.74048299899994"}},"ttl":"oneday"}} 
2025-01-06 21:58:00 [Asn1EncodedDataRouter-0-C-1] DEBUG Asn1EncodedDataRouter - Mapped to object ServiceRequest: {"ode":{"version":3,"verb":"POST"},"sdw":{"serviceRegion":{"nwCorner":{"latitude":38.98721843900006,"longitude":-104.76767069499999},"seCorner":{"latitude":38.96666515900006,"longitude":-104.74048299899994}},"ttl":"oneday","recordId":"6B573067"}} 
2025-01-06 21:58:00 [kafka-producer-network-thread | producer-5] DEBUG LoggingProducerListener - Successfully produced to topic topic.SDWDepositorInput with key null and value {"encodedMsg":"C44000000000CDD17A46B57306742670F19C166BC56E099BD80B859B761C3866E8BD2202000007FC000007FC11C003F0108E0229E6E07604F9E40E2651C241EEEBB36060384CE1E6E82CD78604FFFF27E817CC50253400FF2726FC39EB5931BF4E0D4BEDA1356ECD8366F00000000133879BA0B35E18109C4604001088066C2280E802E679B85AC407FB42EAC7306B1DA4EAAF8D0ADF755838741F1B38443BB0B14C1C15B4B2C248ACFE56304002004013DDD766C00E6DD3BCE8D700"}  

Types of changes

  • Defect fix (non-breaking change that fixes an issue)
  • New feature (non-breaking change that adds functionality)
  • Breaking change (fix or feature that cause existing functionality to change)

Checklist:

  • I have added any new packages to the sonar-scanner.properties file
  • My change requires a change to the documentation.
  • I have updated the documentation accordingly.
  • I have read the CONTRIBUTING document.
    ODE Contributing Guide
  • I have added tests to cover my changes.
  • All new and existing tests passed.

Helpful Documentation

Generated by sending tim to deposit tim endpoint and grabbing value produced to the `topic.Asn1EncoderOutput` topic via kafka-ui
…TimUnsecured_depositsToSdxTopicAndTimTmcFiltered
…tImpl

These can be used to wrap the calls to security services and allow easier mocking of responses. It also allows us to better shield ourselves from API contract changes. users of the client don't need to be aware of changes to the external API as all interactions can be encapsulated within the client
This moves the calls to security services from Asn1CommandManager to SecurityServicesClientImpl which allows for more testability, flexibility (mocking client calls), and modularity
The responsibility to submit data to topics is already contained within Asn1EncodedDataRouter, and Asn1CommandManager is not responsible for any kafka interactions otherwise, so the responsibility was moved to Asn1EncodedDataRouter
Updated method signatures and instance usage to specify ResponseEvent with Address as the generic type. This change improves type safety and aligns with best practices for clearer code readability and maintainability.
Renamed methods and variables for clarity in message encoding workflows. Updated references in test cases and adjusted test resources to align with naming changes. This improves code readability and better reflects the encoding stages.
Copy link
Member

@dmccoystephenson dmccoystephenson left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good to me & the unit tests pass! Great job on this refactor!

@mcook42 mcook42 mentioned this pull request Feb 4, 2025
9 tasks
Copy link

@mwodahl mwodahl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good to me!

}

/**
* Sign the encoded TIM message and write to Kafka with an expiration time.
* Constructs an XML representation of an Advisory Situation Data (ASD) message containing a
* signed Traveler Information Message (TIM). Processes the provided service request and signed

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: the ASD doesn't always have a signed TIM. We should update documentation to reflect it's just a TIM, and ensure our code paths are capable of delivering unsigned

Copy link
Author

@mcook42 mcook42 Feb 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated the javadoc comment to remove the "signed" portion 3a07370. I also read through the execution path and can confirm that the documentation was inaccurate & the code supports both signed and unsigned TIMs.

}
private String packageSignedTimIntoAsd(ServiceRequest request, String signedMsg) throws JsonProcessingException, ParseException {
SDW sdw = request.getSdw();
SNMP snmp = request.getSnmp();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: We're only packing into ASD if we're depositing into the SDX, so we should ignore the SNMP portion here and just take the SDW.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated: efd5f3d

if (dataSigningEnabledSDW && request.getSdw() != null) {
hexEncodedTim = signTIMAndProduceToExpireTopic(hexEncodedTim, consumedObj);
}
// no SDW in metadata (SNMP deposit only) -> sign MF -> send to RSU

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: this comment is a little misleading, we certainly could have SDW in metadata here and frequently do.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good callout. I've removed the comment in favor of "self-documenting" code 20a7b0a


hexEncodedTim = mfObj.getString(BYTES);
JSONObject deposit = new JSONObject();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question(non-blocking): would it be worth making a POJO for this object that gets sent to the SdwDepositor and on to the SDX?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it was simple enough to implement so I defined one in 7c03cce

@mcook42 mcook42 requested a review from payneBrandon February 12, 2025 23:29
Copy link
Collaborator

@drewjj drewjj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything passes tests and works for the most part. The one thing that seems to no longer be occurring is that TIM messages are not being written to the topic.OdeTimJsonTMCFiltered topic. This seems to occur no matter how you configure the jpo-ode to sign TIMs. This seems incorrect. Am I missing something?

Copy link
Collaborator

@drewjj drewjj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re-tested with the security module running and everything is working correctly. My mistake on not running the sec module! Having both RSU and SDW set to unsigned still results in this behavior but this is apparently how the existing functionality works too.

Everything looks to be good to go.

…h from Asn1EncodedDataRouter"

This reverts commit a00ab06.
…sitTim when dataSigningEnabledSDW is false

Eliminated processing logic and tests related to unsigned TIM messages, as well as the conditional ASD element addition when dataSigningEnabledSDW is false. This streamlines the codebase by ensuring all TIM messages follow the secured flow, improving maintainability and alignment with production recommendations.
Added/replaced logic to handle RSU XML-to-JSON array normalization in `processRsusIfPresent`. Refactored metadata handling in `processUnsignedMessage` and enhanced test cases to include RSU-related input/output scenarios. This ensures proper handling of RSU data while improving test robustness.
Copy link

@Michael7371 Michael7371 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks really good! I like the updates here. One other question I had is that I had to update the jpo-security-svcs module reference in the .gitmodules file. Should this PR modify that reference so it is in line with what the current ODE expects?

Passed Checks:

  1. tests pass
  2. log files process properly
  3. All services start up through docker-compose
  4. REST created TIMs for SDW deposit functions properly
  5. REST created TIMs for SNMP deposit functions properly

mcook42 and others added 2 commits February 20, 2025 14:45
@mcook42 mcook42 merged commit 6e36757 into dev Feb 20, 2025
4 checks passed
@mcook42 mcook42 deleted the mcook42/spring-kafka/asn1-encoded-router branch February 20, 2025 21:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants