Skip to content

Commit

Permalink
Billing message handler, persistence layer, service layer (#1134)
Browse files Browse the repository at this point in the history
Signed-off-by: Angelica Ochoa <[email protected]>
  • Loading branch information
ao508 authored Apr 3, 2024
1 parent e23938a commit d556c5e
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 3 deletions.
27 changes: 27 additions & 0 deletions model/src/main/java/org/mskcc/smile/model/tempo/Tempo.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ public class Tempo implements Serializable {
private Long id;
private String custodianInformation;
private String accessLevel;
private Boolean billed;
private String billedBy;
private String costCenter;
@Relationship(type = "HAS_EVENT", direction = Relationship.OUTGOING)
private List<BamComplete> bamCompleteEvents;
@Relationship(type = "HAS_EVENT", direction = Relationship.OUTGOING)
Expand Down Expand Up @@ -59,6 +62,30 @@ public void setAccessLevel(String accessLevel) {
this.accessLevel = accessLevel;
}

public Boolean getBilled() {
return billed;
}

public void setBilled(Boolean billed) {
this.billed = billed;
}

public String getBilledBy() {
return billedBy;
}

public void setBilledBy(String billedBy) {
this.billedBy = billedBy;
}

public String getCostCenter() {
return costCenter;
}

public void setCostCenter(String costCenter) {
this.costCenter = costCenter;
}

/**
* Returns list of bam complete events.
* @return
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package org.mskcc.smile.model.tempo.json;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.Serializable;
import org.apache.commons.lang.builder.ToStringBuilder;

/**
*
* @author ochoaa
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
public class SampleBillingJson implements Serializable {
@JsonProperty("primaryId")
private String primaryId;
@JsonProperty("billed")
private Boolean billed;
@JsonProperty("billedBy")
private String billedBy;
@JsonProperty("costCenter")
private String costCenter;

public SampleBillingJson() {}

public String getPrimaryId() {
return primaryId;
}

public void setPrimaryId(String primaryId) {
this.primaryId = primaryId;
}

public Boolean getBilled() {
return billed;
}

public void setBilled(Boolean billed) {
this.billed = billed;
}

public String getBilledBy() {
return billedBy;
}

public void setBilledBy(String billedBy) {
this.billedBy = billedBy;
}

public String getCostCenter() {
return costCenter;
}

public void setCostCenter(String costCenter) {
this.costCenter = costCenter;
}

@Override
public String toString() {
return ToStringBuilder.reflectionToString(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.mskcc.smile.model.tempo.MafComplete;
import org.mskcc.smile.model.tempo.QcComplete;
import org.mskcc.smile.model.tempo.Tempo;
import org.mskcc.smile.model.tempo.json.SampleBillingJson;
import org.springframework.data.neo4j.annotation.Query;
import org.springframework.data.neo4j.repository.Neo4jRepository;
import org.springframework.data.repository.query.Param;
Expand Down Expand Up @@ -38,8 +39,8 @@ public interface TempoRepository extends Neo4jRepository<Tempo, Long> {

@Query("MATCH (t:Tempo) WHERE ID(t) = $tempoId MATCH (t)-[:HAS_EVENT]->(bc: BamComplete) "
+ "RETURN bc ORDER BY bc.date DESC LIMIT 1")
BamComplete findLatestBamCompleteEventByTempoId(@Param("tempoId") Long tempoId);
BamComplete findLatestBamCompleteEventByTempoId(@Param("tempoId") Long tempoId);

@Query("MATCH (t:Tempo) WHERE ID(t) = $tempoId MATCH (t)-[:HAS_EVENT]->(mc: MafComplete) "
+ "RETURN mc ORDER BY mc.date DESC LIMIT 1")
MafComplete findLatestMafCompleteEventByTempoId(@Param("tempoId") Long tempoId);
Expand All @@ -65,8 +66,14 @@ Tempo mergeQcCompleteEventBySamplePrimaryId(@Param("primaryId") String primaryId
@Query("MATCH (s: Sample)-[:HAS_METADATA]->(sm: SampleMetadata {primaryId: $primaryId}) "
+ "MERGE (s)-[:HAS_TEMPO]->(t: Tempo) WITH s,t "
+ "MERGE (t)-[:HAS_EVENT]->(mc: MafComplete {date: $mcEvent.date, "
+ "normalPrimaryId: $mcEvent.normalPrimaryId, status: $mcEvent.status}) "
+ "normalPrimaryId: $mcEvent.normalPrimaryId, status: $mcEvent.status}) "
+ "WITH s,t,mc RETURN t")
Tempo mergeMafCompleteEventBySamplePrimaryId(@Param("primaryId") String primaryId,
@Param("mcEvent") MafComplete mcEvent);

@Query("MATCH (s: Sample)-[:HAS_METADATA]->(sm: SampleMetadata {primaryId: $billing.primaryId}) "
+ "MATCH (s)-[:HAS_TEMPO]->(t: Tempo) "
+ "SET t.billed = $billing.billed, t.billedBy = $billing.billedBy, "
+ "t.costCenter = $billing.costCenter")
void updateSampleBilling(@Param("billing") SampleBillingJson billing);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.mskcc.smile.model.tempo.MafComplete;
import org.mskcc.smile.model.tempo.QcComplete;
import org.mskcc.smile.model.tempo.json.CohortCompleteJson;
import org.mskcc.smile.model.tempo.json.SampleBillingJson;

/**
*
Expand All @@ -17,5 +18,6 @@ public interface TempoMessageHandlingService {
void qcCompleteHandler(Map.Entry<String, QcComplete> bcEvent) throws Exception;
void mafCompleteHandler(Map.Entry<String, MafComplete> mcEvent) throws Exception;
void cohortCompleteHandler(CohortCompleteJson ccEvent) throws Exception;
void sampleBillingHandler(SampleBillingJson billing) throws Exception;
void shutdown() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.mskcc.smile.model.tempo.MafComplete;
import org.mskcc.smile.model.tempo.QcComplete;
import org.mskcc.smile.model.tempo.Tempo;
import org.mskcc.smile.model.tempo.json.SampleBillingJson;

/**
*
Expand All @@ -18,4 +19,5 @@ public interface TempoService {
Tempo mergeQcCompleteEventBySamplePrimaryId(String primaryId, QcComplete qcComplete) throws Exception;
Tempo mergeMafCompleteEventBySamplePrimaryId(String primaryId, MafComplete mafComplete) throws Exception;
Tempo initAndSaveDefaultTempoData(String primaryId) throws Exception;
void updateSampleBilling(SampleBillingJson billing) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.mskcc.smile.model.tempo.QcComplete;
import org.mskcc.smile.model.tempo.Tempo;
import org.mskcc.smile.model.tempo.json.CohortCompleteJson;
import org.mskcc.smile.model.tempo.json.SampleBillingJson;
import org.mskcc.smile.service.CohortCompleteService;
import org.mskcc.smile.service.SmileSampleService;
import org.mskcc.smile.service.TempoMessageHandlingService;
Expand All @@ -54,6 +55,9 @@ public class TempoMessageHandlingServiceImpl implements TempoMessageHandlingServ
@Value("${tempo.wes_cohort_complete_topic}")
private String TEMPO_WES_COHORT_COMPLETE_TOPIC;

@Value("${tempo.sample_billing_topic}")
private String TEMPO_SAMPLE_BILLING_TOPIC;

@Value("${num.tempo_msg_handler_threads}")
private int NUM_TEMPO_MSG_HANDLERS;

Expand Down Expand Up @@ -81,11 +85,14 @@ public class TempoMessageHandlingServiceImpl implements TempoMessageHandlingServ
new LinkedBlockingQueue<Map.Entry<String, MafComplete>>();
private static final BlockingQueue<CohortCompleteJson> cohortCompleteQueue =
new LinkedBlockingQueue<CohortCompleteJson>();
private static final BlockingQueue<SampleBillingJson> sampleBillingQueue =
new LinkedBlockingQueue<SampleBillingJson>();

private static CountDownLatch bamCompleteHandlerShutdownLatch;
private static CountDownLatch qcCompleteHandlerShutdownLatch;
private static CountDownLatch mafCompleteHandlerShutdownLatch;
private static CountDownLatch cohortCompleteHandlerShutdownLatch;
private static CountDownLatch sampleBillingHandlerShutdownLatch;

private class BamCompleteHandler implements Runnable {
final Phaser phaser;
Expand Down Expand Up @@ -248,6 +255,43 @@ public void run() {
}
}

private class SampleBillingHandler implements Runnable {
final Phaser phaser;
boolean interrupted = false;

SampleBillingHandler(Phaser phaser) {
this.phaser = phaser;
}

@Override
public void run() {
phaser.arrive();
while (true) {
try {
SampleBillingJson billing = sampleBillingQueue.poll(100, TimeUnit.MILLISECONDS);
if (billing != null) {
// this message is coming straight from the dashboard and should therefore always
// have data for a valid sample that exists in the database
// however this check is will make extra sure that the primary id received
// in the nats message actually exists in the database before conducting
// further operations in the db
if (sampleService.sampleExistsByPrimaryId(billing.getPrimaryId())) {
LOG.info("Updating billing information for sample: " + billing.getPrimaryId());
tempoService.updateSampleBilling(billing);
} else {
LOG.error("Cannot update billing information for sample that does not exist: "
+ billing.getPrimaryId());
}
}
} catch (InterruptedException e) {
interrupted = true;
} catch (Exception e) {
LOG.error("Error during handling of sample billing data", e);
}
}
}
}

@Override
public void intialize(Gateway gateway) throws Exception {
if (!initialized) {
Expand All @@ -256,6 +300,7 @@ public void intialize(Gateway gateway) throws Exception {
setupQcCompleteHandler(messagingGateway, this);
setupMafCompleteHandler(messagingGateway, this);
setupCohortCompleteHandler(messagingGateway, this);
setupSampleBillingHandler(messagingGateway, this);
initializeMessageHandlers();
initialized = true;
} else {
Expand Down Expand Up @@ -315,6 +360,19 @@ public void cohortCompleteHandler(CohortCompleteJson cohortEvent) throws Excepti
}
}

@Override
public void sampleBillingHandler(SampleBillingJson billing) throws Exception {
if (!initialized) {
throw new IllegalStateException("Message Handling Service has not been initialized");
}
if (!shutdownInitiated) {
sampleBillingQueue.put(billing);
} else {
LOG.error("Shutdown initiated, not accepting billing event: " + billing);
throw new IllegalStateException("Shutdown initiated, not handling any more TEMPO events");
}
}

@Override
public void shutdown() throws Exception {
if (!initialized) {
Expand All @@ -325,6 +383,7 @@ public void shutdown() throws Exception {
qcCompleteHandlerShutdownLatch.await();
mafCompleteHandlerShutdownLatch.await();
cohortCompleteHandlerShutdownLatch.await();
sampleBillingHandlerShutdownLatch.await();
shutdownInitiated = true;
}

Expand Down Expand Up @@ -368,6 +427,16 @@ private void initializeMessageHandlers() throws Exception {
exec.execute(new CohortCompleteHandler(cohortCompletePhaser));
}
cohortCompletePhaser.arriveAndAwaitAdvance();

// sample billing handler
sampleBillingHandlerShutdownLatch = new CountDownLatch(NUM_TEMPO_MSG_HANDLERS);
final Phaser sampleBillingPhaser = new Phaser();
sampleBillingPhaser.register();
for (int lc = 0; lc < NUM_TEMPO_MSG_HANDLERS; lc++) {
sampleBillingPhaser.register();
exec.execute(new SampleBillingHandler(sampleBillingPhaser));
}
sampleBillingPhaser.arriveAndAwaitAdvance();
}

private void setupBamCompleteHandler(Gateway gateway,
Expand Down Expand Up @@ -463,4 +532,24 @@ public void onMessage(Message msg, Object message) {
}
});
}

private void setupSampleBillingHandler(Gateway gateway,
TempoMessageHandlingService tempoMessageHandlingService) throws Exception {
gateway.subscribe(TEMPO_SAMPLE_BILLING_TOPIC, Object.class, new MessageConsumer() {
@Override
public void onMessage(Message msg, Object message) {
try {
String billingJson = mapper.readValue(
new String(msg.getData(), StandardCharsets.UTF_8),
String.class);
SampleBillingJson billing = mapper.readValue(billingJson,
SampleBillingJson.class);
tempoMessageHandlingService.sampleBillingHandler(billing);
} catch (Exception e) {
LOG.error("Exception occurred during processing of Cohort Complete event: "
+ TEMPO_SAMPLE_BILLING_TOPIC, e);
}
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.mskcc.smile.model.tempo.MafComplete;
import org.mskcc.smile.model.tempo.QcComplete;
import org.mskcc.smile.model.tempo.Tempo;
import org.mskcc.smile.model.tempo.json.SampleBillingJson;
import org.mskcc.smile.persistence.neo4j.TempoRepository;
import org.mskcc.smile.service.SmileRequestService;
import org.mskcc.smile.service.SmileSampleService;
Expand Down Expand Up @@ -51,6 +52,7 @@ public Tempo saveTempoData(Tempo tempo) throws Exception {
}

@Override
@Transactional(rollbackFor = {Exception.class})
public Tempo getTempoDataBySampleId(SmileSample smileSample) throws Exception {
Tempo tempo = tempoRepository.findTempoBySmileSampleId(smileSample.getSmileSampleId());
if (tempo == null) {
Expand All @@ -60,6 +62,7 @@ public Tempo getTempoDataBySampleId(SmileSample smileSample) throws Exception {
}

@Override
@Transactional(rollbackFor = {Exception.class})
public Tempo getTempoDataBySamplePrimaryId(String primaryId) throws Exception {
Tempo tempo = tempoRepository.findTempoBySamplePrimaryId(primaryId);
if (tempo == null) {
Expand All @@ -69,6 +72,7 @@ public Tempo getTempoDataBySamplePrimaryId(String primaryId) throws Exception {
}

@Override
@Transactional(rollbackFor = {Exception.class})
public Tempo mergeBamCompleteEventBySamplePrimaryId(String primaryId, BamComplete bamCompleteEvent)
throws Exception {
if (getTempoDataBySamplePrimaryId(primaryId) == null) {
Expand All @@ -79,6 +83,7 @@ public Tempo mergeBamCompleteEventBySamplePrimaryId(String primaryId, BamComplet
}

@Override
@Transactional(rollbackFor = {Exception.class})
public Tempo mergeQcCompleteEventBySamplePrimaryId(String primaryId, QcComplete qcCompleteEvent)
throws Exception {
if (getTempoDataBySamplePrimaryId(primaryId) == null) {
Expand All @@ -89,6 +94,7 @@ public Tempo mergeQcCompleteEventBySamplePrimaryId(String primaryId, QcComplete
}

@Override
@Transactional(rollbackFor = {Exception.class})
public Tempo mergeMafCompleteEventBySamplePrimaryId(String primaryId, MafComplete mafCompleteEvent)
throws Exception {
if (getTempoDataBySamplePrimaryId(primaryId) == null) {
Expand All @@ -99,6 +105,7 @@ public Tempo mergeMafCompleteEventBySamplePrimaryId(String primaryId, MafComplet
}

@Override
@Transactional(rollbackFor = {Exception.class})
public Tempo initAndSaveDefaultTempoData(String primaryId) throws Exception {
SmileSample sample = sampleService.getSampleByInputId(primaryId);
Tempo tempo = new Tempo(sample);
Expand All @@ -120,4 +127,10 @@ private Tempo getDetailedTempoData(Tempo tempo) {
tempo.setMafCompleteEvents(tempoRepository.findMafCompleteEventsByTempoId(tempo.getId()));
return tempo;
}

@Override
@Transactional(rollbackFor = {Exception.class})
public void updateSampleBilling(SampleBillingJson billing) throws Exception {
tempoRepository.updateSampleBilling(billing);
}
}
1 change: 1 addition & 0 deletions src/main/resources/application.properties.EXAMPLE
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,4 @@ tempo.wes_bam_complete_topic=
tempo.wes_qc_complete_topic=
tempo.wes_maf_complete_topic=
tempo.wes_cohort_complete_topic=
tempo.sample_billing_topic=

0 comments on commit d556c5e

Please sign in to comment.