From de5f044bf260290f94aa25c62b47ef7248f36e27 Mon Sep 17 00:00:00 2001 From: ao508 <15623749+ao508@users.noreply.github.com> Date: Mon, 24 Jun 2024 15:23:39 -0400 Subject: [PATCH] cleanup (#1200) Signed-off-by: Angelica Ochoa <15623749+ao508@users.noreply.github.com> --- .../impl/CohortCompleteServiceImpl.java | 1 + .../impl/TempoMessageHandlingServiceImpl.java | 76 +++++++++++++------ .../mskcc/smile/service/util/NatsMsgUtil.java | 70 +++++++++++++++++ 3 files changed, 125 insertions(+), 22 deletions(-) create mode 100644 service/src/main/java/org/mskcc/smile/service/util/NatsMsgUtil.java diff --git a/service/src/main/java/org/mskcc/smile/service/impl/CohortCompleteServiceImpl.java b/service/src/main/java/org/mskcc/smile/service/impl/CohortCompleteServiceImpl.java index 1cd84aa9..d2825ce4 100644 --- a/service/src/main/java/org/mskcc/smile/service/impl/CohortCompleteServiceImpl.java +++ b/service/src/main/java/org/mskcc/smile/service/impl/CohortCompleteServiceImpl.java @@ -46,6 +46,7 @@ public Cohort saveCohort(Cohort cohort, Set sampleIds) throws Exception cohortCompleteRepository.save(cohort); Set unknownSamples = new HashSet<>(); // tracks unknown samples in smile // create cohort-sample relationships + LOG.info("Adding cohort-sample edges in database for " + sampleIds.size() + " samples..."); for (String sampleId : sampleIds) { // confirm sample exists by primary id and then link to cohort SmileSample sample = sampleService.getSampleByInputId(sampleId); diff --git a/service/src/main/java/org/mskcc/smile/service/impl/TempoMessageHandlingServiceImpl.java b/service/src/main/java/org/mskcc/smile/service/impl/TempoMessageHandlingServiceImpl.java index ec13038c..2bbdef77 100644 --- a/service/src/main/java/org/mskcc/smile/service/impl/TempoMessageHandlingServiceImpl.java +++ b/service/src/main/java/org/mskcc/smile/service/impl/TempoMessageHandlingServiceImpl.java @@ -1,5 +1,6 @@ package org.mskcc.smile.service.impl; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import io.nats.client.Message; import java.nio.charset.StandardCharsets; @@ -35,6 +36,7 @@ import org.mskcc.smile.service.SmileSampleService; import org.mskcc.smile.service.TempoMessageHandlingService; import org.mskcc.smile.service.TempoService; +import org.mskcc.smile.service.util.NatsMsgUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.dao.IncorrectResultSizeDataAccessException; @@ -488,10 +490,16 @@ private void setupBamCompleteHandler(Gateway gateway, @Override public void onMessage(Message msg, Object message) { try { - String bamCompleteJson = mapper.convertValue( - new String(msg.getData(), StandardCharsets.UTF_8), - String.class); - Map bamCompleteMap = mapper.readValue(bamCompleteJson, Map.class); + LOG.info("Received message on topic: " + TEMPO_WES_BAM_COMPLETE_TOPIC); + String bamCompleteJson = NatsMsgUtil.extractNatsJsonString(msg); + if (bamCompleteJson == null) { + LOG.error("Exception occurred during processing of NATS message data"); + return; + } + System.out.println("Extracted message contents =\n\n" + bamCompleteJson + "\n\n"); + Map bamCompleteMap = + (Map) NatsMsgUtil.convertObjectFromString( + bamCompleteJson, new TypeReference>() {}); // resolve sample id, bam complete object String sampleId = ObjectUtils.firstNonNull(bamCompleteMap.get("primaryId"), @@ -516,10 +524,16 @@ private void setupQcCompleteHandler(Gateway gateway, @Override public void onMessage(Message msg, Object message) { try { - String qcCompleteJson = mapper.convertValue( - new String(msg.getData(), StandardCharsets.UTF_8), - String.class); - Map qcCompleteMap = mapper.readValue(qcCompleteJson, Map.class); + LOG.info("Received message on topic: " + TEMPO_WES_QC_COMPLETE_TOPIC); + String qcCompleteJson = NatsMsgUtil.extractNatsJsonString(msg); + if (qcCompleteJson == null) { + LOG.error("Exception occurred during processing of NATS message data"); + return; + } + System.out.println("Extracted message contents =\n\n" + qcCompleteJson + "\n\n"); + Map qcCompleteMap = + (Map) NatsMsgUtil.convertObjectFromString( + qcCompleteJson, new TypeReference>() {}); // resolve sample id, qc complete object String sampleId = ObjectUtils.firstNonNull(qcCompleteMap.get("primaryId"), @@ -545,10 +559,16 @@ private void setupMafCompleteHandler(Gateway gateway, @Override public void onMessage(Message msg, Object message) { try { - String mafCompleteJson = mapper.convertValue( - new String(msg.getData(), StandardCharsets.UTF_8), - String.class); - Map mafCompleteMap = mapper.readValue(mafCompleteJson, Map.class); + LOG.info("Received message on topic: " + TEMPO_WES_MAF_COMPLETE_TOPIC); + String mafCompleteJson = NatsMsgUtil.extractNatsJsonString(msg); + if (mafCompleteJson == null) { + LOG.error("Exception occurred during processing of NATS message data"); + return; + } + System.out.println("Extracted message contents =\n\n" + mafCompleteJson + "\n\n"); + Map mafCompleteMap = + (Map) NatsMsgUtil.convertObjectFromString( + mafCompleteJson, new TypeReference>() {}); // resolve sample id, normal ids, and maf complete object String sampleId = ObjectUtils.firstNonNull(mafCompleteMap.get("primaryId"), @@ -576,11 +596,17 @@ private void setupCohortCompleteHandler(Gateway gateway, @Override public void onMessage(Message msg, Object message) { try { - String cohortCompleteJson = mapper.convertValue( - new String(msg.getData(), StandardCharsets.UTF_8), - String.class); - CohortCompleteJson cohortCompleteData = mapper.readValue(cohortCompleteJson, - CohortCompleteJson.class); + LOG.info("Received message on topic: " + TEMPO_WES_COHORT_COMPLETE_TOPIC); + String cohortCompleteJson = NatsMsgUtil.extractNatsJsonString(msg); + if (cohortCompleteJson == null) { + LOG.error("Exception occurred during processing of NATS message data"); + return; + } + System.out.println("Extracted message contents =\n\n" + cohortCompleteJson + "\n\n"); + CohortCompleteJson cohortCompleteData = + (CohortCompleteJson) NatsMsgUtil.convertObjectFromString( + cohortCompleteJson, new TypeReference() {}); + tempoMessageHandlingService.cohortCompleteHandler(cohortCompleteData); } catch (Exception e) { LOG.error("Exception occurred during processing of Cohort Complete event: " @@ -596,11 +622,17 @@ private void setupSampleBillingHandler(Gateway gateway, @Override public void onMessage(Message msg, Object message) { try { - String billingJson = mapper.convertValue( - new String(msg.getData(), StandardCharsets.UTF_8), - String.class); - SampleBillingJson billing = mapper.readValue(billingJson, - SampleBillingJson.class); + LOG.info("Received message on topic: " + TEMPO_SAMPLE_BILLING_TOPIC); + String billingJson = NatsMsgUtil.extractNatsJsonString(msg); + if (billingJson == null) { + LOG.error("Exception occurred during processing of NATS message data"); + return; + } + System.out.println("Extracted message contents =\n\n" + billingJson + "\n\n"); + SampleBillingJson billing = + (SampleBillingJson) NatsMsgUtil.convertObjectFromString( + billingJson, new TypeReference() {}); + tempoMessageHandlingService.sampleBillingHandler(billing); } catch (Exception e) { LOG.error("Exception occurred during processing of Cohort Complete event: " diff --git a/service/src/main/java/org/mskcc/smile/service/util/NatsMsgUtil.java b/service/src/main/java/org/mskcc/smile/service/util/NatsMsgUtil.java new file mode 100644 index 00000000..8fcda1cc --- /dev/null +++ b/service/src/main/java/org/mskcc/smile/service/util/NatsMsgUtil.java @@ -0,0 +1,70 @@ +package org.mskcc.smile.service.util; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.exc.MismatchedInputException; +import io.nats.client.Message; +import java.nio.charset.StandardCharsets; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * + * @author ochoaa + */ +public class NatsMsgUtil { + private static final ObjectMapper mapper = new ObjectMapper(); + private static final Log LOG = LogFactory.getLog(NatsMsgUtil.class); + + /** + * Extracts string from NATS message data contents. + * @param msg + * @return String + * @throws JsonProcessingException + */ + public static String extractNatsJsonString(Message msg) + throws JsonProcessingException { + byte[] msgData = msg.getData(); + try { + String jsonString = mapper.readValue( + new String(msgData, StandardCharsets.UTF_8), + String.class); + return jsonString; + } catch (MismatchedInputException e) { + LOG.warn("Failed to deserialize with mapper.readValue() - " + + "attempting mapper.convertValue()"); + } + + try { + String jsonString = mapper.convertValue( + new String(msgData, StandardCharsets.UTF_8), + String.class); + return jsonString; + } catch (Exception e) { + LOG.warn("Failed to deserialize with mapper.convertValue(). " + + "Cannot deserialize message"); + e.printStackTrace(); + } + + LOG.error("Could not deserialize message from contents: " + msg.toString()); + LOG.error(msg); + return null; + } + + /** + * Converts string to given TypeReference. + * @param input + * @param valueTypeReference + * @return Object + * @throws JsonProcessingException + */ + public static Object convertObjectFromString(String input, TypeReference valueTypeReference) + throws JsonProcessingException { + try { + return mapper.convertValue(input, valueTypeReference); + } catch (IllegalArgumentException e) { + return mapper.readValue(input, valueTypeReference); + } + } +}