Skip to content

Commit

Permalink
cleanup (#1200)
Browse files Browse the repository at this point in the history
Signed-off-by: Angelica Ochoa <[email protected]>
  • Loading branch information
ao508 authored Jun 24, 2024
1 parent 9af4591 commit de5f044
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public Cohort saveCohort(Cohort cohort, Set<String> sampleIds) throws Exception
cohortCompleteRepository.save(cohort);
Set<String> unknownSamples = new HashSet<>(); // tracks unknown samples in smile
// create cohort-sample relationships
LOG.info("Adding cohort-sample edges in database for " + sampleIds.size() + " samples...");
for (String sampleId : sampleIds) {
// confirm sample exists by primary id and then link to cohort
SmileSample sample = sampleService.getSampleByInputId(sampleId);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.mskcc.smile.service.impl;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.nats.client.Message;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -35,6 +36,7 @@
import org.mskcc.smile.service.SmileSampleService;
import org.mskcc.smile.service.TempoMessageHandlingService;
import org.mskcc.smile.service.TempoService;
import org.mskcc.smile.service.util.NatsMsgUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.dao.IncorrectResultSizeDataAccessException;
Expand Down Expand Up @@ -488,10 +490,16 @@ private void setupBamCompleteHandler(Gateway gateway,
@Override
public void onMessage(Message msg, Object message) {
try {
String bamCompleteJson = mapper.convertValue(
new String(msg.getData(), StandardCharsets.UTF_8),
String.class);
Map<String, String> bamCompleteMap = mapper.readValue(bamCompleteJson, Map.class);
LOG.info("Received message on topic: " + TEMPO_WES_BAM_COMPLETE_TOPIC);
String bamCompleteJson = NatsMsgUtil.extractNatsJsonString(msg);
if (bamCompleteJson == null) {
LOG.error("Exception occurred during processing of NATS message data");
return;
}
System.out.println("Extracted message contents =\n\n" + bamCompleteJson + "\n\n");
Map<String, String> bamCompleteMap =
(Map<String, String>) NatsMsgUtil.convertObjectFromString(
bamCompleteJson, new TypeReference<Map<String, String>>() {});

// resolve sample id, bam complete object
String sampleId = ObjectUtils.firstNonNull(bamCompleteMap.get("primaryId"),
Expand All @@ -516,10 +524,16 @@ private void setupQcCompleteHandler(Gateway gateway,
@Override
public void onMessage(Message msg, Object message) {
try {
String qcCompleteJson = mapper.convertValue(
new String(msg.getData(), StandardCharsets.UTF_8),
String.class);
Map<String, String> qcCompleteMap = mapper.readValue(qcCompleteJson, Map.class);
LOG.info("Received message on topic: " + TEMPO_WES_QC_COMPLETE_TOPIC);
String qcCompleteJson = NatsMsgUtil.extractNatsJsonString(msg);
if (qcCompleteJson == null) {
LOG.error("Exception occurred during processing of NATS message data");
return;
}
System.out.println("Extracted message contents =\n\n" + qcCompleteJson + "\n\n");
Map<String, String> qcCompleteMap =
(Map<String, String>) NatsMsgUtil.convertObjectFromString(
qcCompleteJson, new TypeReference<Map<String, String>>() {});

// resolve sample id, qc complete object
String sampleId = ObjectUtils.firstNonNull(qcCompleteMap.get("primaryId"),
Expand All @@ -545,10 +559,16 @@ private void setupMafCompleteHandler(Gateway gateway,
@Override
public void onMessage(Message msg, Object message) {
try {
String mafCompleteJson = mapper.convertValue(
new String(msg.getData(), StandardCharsets.UTF_8),
String.class);
Map<String, String> mafCompleteMap = mapper.readValue(mafCompleteJson, Map.class);
LOG.info("Received message on topic: " + TEMPO_WES_MAF_COMPLETE_TOPIC);
String mafCompleteJson = NatsMsgUtil.extractNatsJsonString(msg);
if (mafCompleteJson == null) {
LOG.error("Exception occurred during processing of NATS message data");
return;
}
System.out.println("Extracted message contents =\n\n" + mafCompleteJson + "\n\n");
Map<String, String> mafCompleteMap =
(Map<String, String>) NatsMsgUtil.convertObjectFromString(
mafCompleteJson, new TypeReference<Map<String, String>>() {});

// resolve sample id, normal ids, and maf complete object
String sampleId = ObjectUtils.firstNonNull(mafCompleteMap.get("primaryId"),
Expand Down Expand Up @@ -576,11 +596,17 @@ private void setupCohortCompleteHandler(Gateway gateway,
@Override
public void onMessage(Message msg, Object message) {
try {
String cohortCompleteJson = mapper.convertValue(
new String(msg.getData(), StandardCharsets.UTF_8),
String.class);
CohortCompleteJson cohortCompleteData = mapper.readValue(cohortCompleteJson,
CohortCompleteJson.class);
LOG.info("Received message on topic: " + TEMPO_WES_COHORT_COMPLETE_TOPIC);
String cohortCompleteJson = NatsMsgUtil.extractNatsJsonString(msg);
if (cohortCompleteJson == null) {
LOG.error("Exception occurred during processing of NATS message data");
return;
}
System.out.println("Extracted message contents =\n\n" + cohortCompleteJson + "\n\n");
CohortCompleteJson cohortCompleteData =
(CohortCompleteJson) NatsMsgUtil.convertObjectFromString(
cohortCompleteJson, new TypeReference<CohortCompleteJson>() {});

tempoMessageHandlingService.cohortCompleteHandler(cohortCompleteData);
} catch (Exception e) {
LOG.error("Exception occurred during processing of Cohort Complete event: "
Expand All @@ -596,11 +622,17 @@ private void setupSampleBillingHandler(Gateway gateway,
@Override
public void onMessage(Message msg, Object message) {
try {
String billingJson = mapper.convertValue(
new String(msg.getData(), StandardCharsets.UTF_8),
String.class);
SampleBillingJson billing = mapper.readValue(billingJson,
SampleBillingJson.class);
LOG.info("Received message on topic: " + TEMPO_SAMPLE_BILLING_TOPIC);
String billingJson = NatsMsgUtil.extractNatsJsonString(msg);
if (billingJson == null) {
LOG.error("Exception occurred during processing of NATS message data");
return;
}
System.out.println("Extracted message contents =\n\n" + billingJson + "\n\n");
SampleBillingJson billing =
(SampleBillingJson) NatsMsgUtil.convertObjectFromString(
billingJson, new TypeReference<SampleBillingJson>() {});

tempoMessageHandlingService.sampleBillingHandler(billing);
} catch (Exception e) {
LOG.error("Exception occurred during processing of Cohort Complete event: "
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package org.mskcc.smile.service.util;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.MismatchedInputException;
import io.nats.client.Message;
import java.nio.charset.StandardCharsets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
*
* @author ochoaa
*/
public class NatsMsgUtil {
private static final ObjectMapper mapper = new ObjectMapper();
private static final Log LOG = LogFactory.getLog(NatsMsgUtil.class);

/**
* Extracts string from NATS message data contents.
* @param msg
* @return String
* @throws JsonProcessingException
*/
public static String extractNatsJsonString(Message msg)
throws JsonProcessingException {
byte[] msgData = msg.getData();
try {
String jsonString = mapper.readValue(
new String(msgData, StandardCharsets.UTF_8),
String.class);
return jsonString;
} catch (MismatchedInputException e) {
LOG.warn("Failed to deserialize with mapper.readValue() - "
+ "attempting mapper.convertValue()");
}

try {
String jsonString = mapper.convertValue(
new String(msgData, StandardCharsets.UTF_8),
String.class);
return jsonString;
} catch (Exception e) {
LOG.warn("Failed to deserialize with mapper.convertValue(). "
+ "Cannot deserialize message");
e.printStackTrace();
}

LOG.error("Could not deserialize message from contents: " + msg.toString());
LOG.error(msg);
return null;
}

/**
* Converts string to given TypeReference.
* @param input
* @param valueTypeReference
* @return Object
* @throws JsonProcessingException
*/
public static Object convertObjectFromString(String input, TypeReference valueTypeReference)
throws JsonProcessingException {
try {
return mapper.convertValue(input, valueTypeReference);
} catch (IllegalArgumentException e) {
return mapper.readValue(input, valueTypeReference);
}
}
}

0 comments on commit de5f044

Please sign in to comment.