Skip to content

Commit

Permalink
Merge pull request #146 from overture-stack/rc/3.0.0-fe99363
Browse files Browse the repository at this point in the history
release 3.0.0
#116 fix junit 5 version to avoid conflicts in class path
#118 update analysis centric index fields, add data_type, rename files filed to file, remove data_path and metadata_path from repositories field
#119 kafka stream listener configurable status (PUBLISHED, SUPPRESSED, wtv)
#120 add support for cram/crai index files
#121 change specimen type and sample type to list
#127 update mappings fields
#129 rename format to file_type
#131 remove last_modified from file centric mapping
#132 rename file-id to file-object_id
#135 fix index analysis response message
#136 add maven formatter
#139 add ability to remove document by analysis id from analysis_centric
#143 add dynamic info indexed field support to donor, Specimen, sample, and file
update test document to cover that for file centric document
#145 fix NPE for analysis centric index
  • Loading branch information
blabadi authored Jun 10, 2020
2 parents 388ac6c + fe99363 commit 046ab3a
Show file tree
Hide file tree
Showing 135 changed files with 6,466 additions and 5,696 deletions.
2 changes: 1 addition & 1 deletion .mvn/maven.config
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
-Drevision=2.0.0
-Drevision=3.0.0
-Dsha1=
-Dchangelist=-SNAPSHOT
6 changes: 3 additions & 3 deletions docs/features.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ As an example, you can see in the `repositories` section of the following docume
"access": "controlled",
"study": "ABCD-CA",
"analysis": {
"id": "EGAZ",
"type": "sequencingRead",
"state": "PUBLISHED",
"analysis_id": "EGAZ",
"analysis_type": "sequencingRead",
"analysis_state": "PUBLISHED",
"study": "ABCD-CA",
"experiment": {
"analysisId": "EGAZ",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
@SpringBootApplication
@Import({RootConfiguration.class})
public class Maestro {
public static void main(String[] args) {
SpringApplication.run(Maestro.class, args);
}
public static void main(String[] args) {
SpringApplication.run(Maestro.class, args);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,10 @@
@ToString
@AllArgsConstructor
class IndexAnalysisMessage {
@NonNull
private String analysisId;
@NonNull
private String studyId;
@NonNull
private String repositoryCode;
@NonNull private String analysisId;
@NonNull private String studyId;
@NonNull private String repositoryCode;

/** if callers set this flag it will do a remove instead of add.*/
private Boolean removeAnalysis = false;
/** if callers set this flag it will do a remove instead of add. */
private Boolean removeAnalysis = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@
@ToString
@AllArgsConstructor
class IndexMessage {
private String analysisId;
private String studyId;
@NonNull
private String repositoryCode;
/** if callers set this flag it will do a remove instead of add.*/
private Boolean removeAnalysis = false;
private String analysisId;
private String studyId;
@NonNull private String repositoryCode;
/** if callers set this flag it will do a remove instead of add. */
private Boolean removeAnalysis = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,45 +19,46 @@

import bio.overture.maestro.domain.api.message.IndexResult;
import io.vavr.Tuple2;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.function.Supplier;

@Slf4j
public class IndexMessagesHelper {
public static <T> void handleIndexRepository(Supplier<Mono<Tuple2<T, IndexResult>>> resultSupplier) {
val result = resultSupplier.get().blockOptional();
val tuple = result.orElseThrow(() -> new RuntimeException("failed to obtain result"));
if (!tuple._2().isSuccessful()) {
log.error("failed to process message : {} successfully", tuple._1());
throw new RuntimeException("failed to process the message");
}
public static <T> void handleIndexRepository(
Supplier<Mono<Tuple2<T, IndexResult>>> resultSupplier) {
val result = resultSupplier.get().blockOptional();
val tuple = result.orElseThrow(() -> new RuntimeException("failed to obtain result"));
if (!tuple._2().isSuccessful()) {
log.error("failed to process message : {} successfully", tuple._1());
throw new RuntimeException("failed to process the message");
}
}

public static <T> void handleIndexResult(Supplier<Flux<Tuple2<T, IndexResult>>> resultSupplier) {
/*
* Why Blocking?
*
* - this is a stream consumer, it's supposed to process one message at a time
* the value of reactive processing diminishes since the queue provides a buffering level,
* without blocking it will async process the messages and if one fails we can
* async add it to a DLQ in the subscriber, However, I opted to use blocking because of the next point.
*
* - spring reactive cloud stream is deprecated in favor of spring cloud functions that support
* stream processing: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/2.2.0.RELEASE/spring-cloud-stream.html#spring_cloud_function
* so I don't want to use a deprecated library, and if needed we can switch to cloud function in future
* https://stackoverflow.com/questions/53438208/spring-cloud-stream-reactive-how-to-do-the-error-handling-in-case-of-reactive
*/
val result = resultSupplier.get().collectList().blockOptional();
val tupleList = result.orElseThrow(() -> new RuntimeException("failed to obtain result"));
tupleList.forEach(tuple -> {
if (!tuple._2().isSuccessful()) {
log.error("failed to process message : {} successfully", tuple._1());
throw new RuntimeException("failed to process the message");
}
public static <T> void handleIndexResult(Supplier<Flux<Tuple2<T, IndexResult>>> resultSupplier) {
/*
* Why Blocking?
*
* - this is a stream consumer, it's supposed to process one message at a time
* the value of reactive processing diminishes since the queue provides a buffering level,
* without blocking it will async process the messages and if one fails we can
* async add it to a DLQ in the subscriber, However, I opted to use blocking because of the next point.
*
* - spring reactive cloud stream is deprecated in favor of spring cloud functions that support
* stream processing: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/2.2.0.RELEASE/spring-cloud-stream.html#spring_cloud_function
* so I don't want to use a deprecated library, and if needed we can switch to cloud function in future
* https://stackoverflow.com/questions/53438208/spring-cloud-stream-reactive-how-to-do-the-error-handling-in-case-of-reactive
*/
val result = resultSupplier.get().collectList().blockOptional();
val tupleList = result.orElseThrow(() -> new RuntimeException("failed to obtain result"));
tupleList.forEach(
tuple -> {
if (!tuple._2().isSuccessful()) {
log.error("failed to process message : {} successfully", tuple._1());
throw new RuntimeException("failed to process the message");
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,5 @@
@ToString
@AllArgsConstructor
class IndexRepositoryMessage {
@NonNull
private String repositoryCode;
@NonNull private String repositoryCode;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
@ToString
@AllArgsConstructor
class IndexStudyMessage {
@NonNull
private String studyId;
@NonNull
private String repositoryCode;
@NonNull private String studyId;
@NonNull private String repositoryCode;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package bio.overture.maestro.app.infra.adapter.inbound.messaging;

import static bio.overture.maestro.app.infra.adapter.inbound.messaging.IndexMessagesHelper.handleIndexRepository;
import static bio.overture.maestro.app.infra.adapter.inbound.messaging.IndexMessagesHelper.handleIndexResult;

import bio.overture.maestro.domain.api.Indexer;
import bio.overture.maestro.domain.api.exception.FailureData;
import bio.overture.maestro.domain.api.message.*;
Expand All @@ -32,111 +35,118 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import static bio.overture.maestro.app.infra.adapter.inbound.messaging.IndexMessagesHelper.handleIndexRepository;
import static bio.overture.maestro.app.infra.adapter.inbound.messaging.IndexMessagesHelper.handleIndexResult;

@Slf4j
@EnableBinding(Sink.class)
public class IndexingMessagesStreamListener {

private final Indexer indexer;

public IndexingMessagesStreamListener(@NonNull Indexer indexer) {
this.indexer = indexer;
}

@StreamListener(Sink.INPUT)
public void handleAnalysisMessage(@Payload IndexMessage indexMessage) {
if (isAnalysisReq(indexMessage)) {
val indexAnalysisMessage = new IndexAnalysisMessage(indexMessage.getAnalysisId(),
indexMessage.getStudyId(),
indexMessage.getRepositoryCode(),
indexMessage.getRemoveAnalysis());
handleIndexResult(() -> this.indexOrRemoveAnalysis(indexAnalysisMessage));
} else if (isStudyMsg(indexMessage)) {
val indexStudyMessage = new IndexStudyMessage(indexMessage.getStudyId(), indexMessage.getRepositoryCode());
handleIndexResult(() -> this.indexStudy(indexStudyMessage));
} else if (isRepoMsg(indexMessage)) {
val indexRepositoryMessage = new IndexRepositoryMessage(indexMessage.getRepositoryCode());
handleIndexRepository(() -> this.indexRepository(indexRepositoryMessage));
} else {
throw new IllegalArgumentException("invalid message format");
}
}

private boolean isAnalysisReq(IndexMessage indexMessage) {
return !StringUtils.isEmpty(indexMessage.getAnalysisId())
&& !StringUtils.isEmpty(indexMessage.getStudyId())
&& !StringUtils.isEmpty(indexMessage.getRepositoryCode());
}

private boolean isStudyMsg(IndexMessage indexMessage) {
return StringUtils.isEmpty(indexMessage.getAnalysisId())
&& !StringUtils.isEmpty(indexMessage.getStudyId())
&& !StringUtils.isEmpty(indexMessage.getRepositoryCode());
}

private boolean isRepoMsg(IndexMessage indexMessage) {
return StringUtils.isEmpty(indexMessage.getAnalysisId())
&& StringUtils.isEmpty(indexMessage.getStudyId())
&& !StringUtils.isEmpty(indexMessage.getRepositoryCode());
private final Indexer indexer;

public IndexingMessagesStreamListener(@NonNull Indexer indexer) {
this.indexer = indexer;
}

@StreamListener(Sink.INPUT)
public void handleAnalysisMessage(@Payload IndexMessage indexMessage) {
if (isAnalysisReq(indexMessage)) {
val indexAnalysisMessage =
new IndexAnalysisMessage(
indexMessage.getAnalysisId(),
indexMessage.getStudyId(),
indexMessage.getRepositoryCode(),
indexMessage.getRemoveAnalysis());
handleIndexResult(() -> this.indexOrRemoveAnalysis(indexAnalysisMessage));
} else if (isStudyMsg(indexMessage)) {
val indexStudyMessage =
new IndexStudyMessage(indexMessage.getStudyId(), indexMessage.getRepositoryCode());
handleIndexResult(() -> this.indexStudy(indexStudyMessage));
} else if (isRepoMsg(indexMessage)) {
val indexRepositoryMessage = new IndexRepositoryMessage(indexMessage.getRepositoryCode());
handleIndexRepository(() -> this.indexRepository(indexRepositoryMessage));
} else {
throw new IllegalArgumentException("invalid message format");
}

private Flux<Tuple2<IndexAnalysisMessage, IndexResult>> indexOrRemoveAnalysis(IndexAnalysisMessage msg) {
if (msg.getRemoveAnalysis()) {
return Flux.from(removeAnalysis(msg));
} else {
return indexAnalysis(msg);
}
}

private boolean isAnalysisReq(IndexMessage indexMessage) {
return !StringUtils.isEmpty(indexMessage.getAnalysisId())
&& !StringUtils.isEmpty(indexMessage.getStudyId())
&& !StringUtils.isEmpty(indexMessage.getRepositoryCode());
}

private boolean isStudyMsg(IndexMessage indexMessage) {
return StringUtils.isEmpty(indexMessage.getAnalysisId())
&& !StringUtils.isEmpty(indexMessage.getStudyId())
&& !StringUtils.isEmpty(indexMessage.getRepositoryCode());
}

private boolean isRepoMsg(IndexMessage indexMessage) {
return StringUtils.isEmpty(indexMessage.getAnalysisId())
&& StringUtils.isEmpty(indexMessage.getStudyId())
&& !StringUtils.isEmpty(indexMessage.getRepositoryCode());
}

private Flux<Tuple2<IndexAnalysisMessage, IndexResult>> indexOrRemoveAnalysis(
IndexAnalysisMessage msg) {
if (msg.getRemoveAnalysis()) {
return Flux.from(removeAnalysis(msg));
} else {
return indexAnalysis(msg);
}

private Mono<Tuple2<IndexAnalysisMessage, IndexResult>> removeAnalysis(IndexAnalysisMessage msg) {
return indexer.removeAnalysis(RemoveAnalysisCommand.builder()
.analysisIdentifier(AnalysisIdentifier.builder()
.studyId(msg.getStudyId())
.analysisId(msg.getAnalysisId())
.repositoryCode(msg.getRepositoryCode())
.build())
}

private Flux<Tuple2<IndexAnalysisMessage, IndexResult>> removeAnalysis(IndexAnalysisMessage msg) {
return indexer
.removeAnalysis(
RemoveAnalysisCommand.builder()
.analysisIdentifier(
AnalysisIdentifier.builder()
.studyId(msg.getStudyId())
.analysisId(msg.getAnalysisId())
.repositoryCode(msg.getRepositoryCode())
.build())
.build())
.map(out -> new Tuple2<>(msg, out))
.onErrorResume((e) -> catchUnhandledErrors(msg, e));
}
.map(out -> new Tuple2<>(msg, out))
.onErrorResume((e) -> catchUnhandledErrors(msg, e));
}

private Flux<Tuple2<IndexAnalysisMessage, IndexResult>> indexAnalysis(IndexAnalysisMessage msg) {
return indexer
.indexAnalysis(
IndexAnalysisCommand.builder()
.analysisIdentifier(
AnalysisIdentifier.builder()
.studyId(msg.getStudyId())
.analysisId(msg.getAnalysisId())
.repositoryCode(msg.getRepositoryCode())
.build())
.build())
.map(out -> new Tuple2<>(msg, out))
.onErrorResume((e) -> catchUnhandledErrors(msg, e));
}

private Flux<Tuple2<IndexAnalysisMessage, IndexResult>> indexAnalysis(IndexAnalysisMessage msg) {
return indexer.indexAnalysis(IndexAnalysisCommand.builder()
.analysisIdentifier(AnalysisIdentifier.builder()
private Flux<Tuple2<IndexStudyMessage, IndexResult>> indexStudy(IndexStudyMessage msg) {
return indexer
.indexStudy(
IndexStudyCommand.builder()
.studyId(msg.getStudyId())
.analysisId(msg.getAnalysisId())
.repositoryCode(msg.getRepositoryCode())
.build()
).build())
.build())
.map(out -> new Tuple2<>(msg, out));
}

private Mono<Tuple2<IndexRepositoryMessage, IndexResult>> indexRepository(
IndexRepositoryMessage msg) {
return indexer
.indexRepository(
IndexStudyRepositoryCommand.builder().repositoryCode(msg.getRepositoryCode()).build())
.map(out -> new Tuple2<>(msg, out))
.onErrorResume((e) -> catchUnhandledErrors(msg, e));
}

private Flux<Tuple2<IndexStudyMessage, IndexResult>> indexStudy(IndexStudyMessage msg) {
return indexer.indexStudy(IndexStudyCommand.builder()
.studyId(msg.getStudyId())
.repositoryCode(msg.getRepositoryCode())
.build())
.map(out -> new Tuple2<>(msg, out));
}

private Mono<Tuple2<IndexRepositoryMessage, IndexResult>> indexRepository(IndexRepositoryMessage msg) {
return indexer.indexRepository(IndexStudyRepositoryCommand.builder()
.repositoryCode(msg.getRepositoryCode())
.build())
.map(out -> new Tuple2<>(msg, out))
.onErrorResume((e) -> catchUnhandledErrors(msg, e));
}

private <T> Mono<Tuple2<T, IndexResult>> catchUnhandledErrors(T msg, Throwable e) {
log.error("failed processing message: {} ", msg, e);
val indexResult = IndexResult.builder()
.successful(false)
.failureData(FailureData.builder().build())
.build();
return Mono.just(new Tuple2<>(msg, indexResult));
}

}

private <T> Mono<Tuple2<T, IndexResult>> catchUnhandledErrors(T msg, Throwable e) {
log.error("failed processing message: {} ", msg, e);
val indexResult =
IndexResult.builder().successful(false).failureData(FailureData.builder().build()).build();
return Mono.just(new Tuple2<>(msg, indexResult));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@
import bio.overture.maestro.app.infra.adapter.inbound.messaging.song.SongAnalysisStreamListener;
import org.springframework.context.annotation.Import;


@Import({
IndexingMessagesStreamListener.class,
SongAnalysisStreamListener.class,
IndexingMessagesStreamListener.class,
SongAnalysisStreamListener.class,
})
public class MessagingConfig { }
public class MessagingConfig {}
Loading

0 comments on commit 046ab3a

Please sign in to comment.