Skip to content

Commit

Permalink
Split functionality from the Sync class out into smaller classes
Browse files Browse the repository at this point in the history
Also corrected default Directory URL
  • Loading branch information
DavidCroftDKFZ committed Sep 11, 2024
1 parent 28c16e8 commit 5001166
Show file tree
Hide file tree
Showing 11 changed files with 416 additions and 324 deletions.
28 changes: 14 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,20 @@ We recommend using Docker for running Directory sync.

First, you will need to set up the environment variables for this:

| Variable | Purpose |Default if not specified |
|:-----------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------|:---------------------------------|
| DS_DIRECTORY_URL | Base URL of the Directory |https://directory.bbmri-eric.eu |
| DS_DIRECTORY_USER_NAME | User name for logging in to Directory | |
| DS_DIRECTORY_USER_PASS | Password for logging in to Directory | |
| DS_DIRECTORY_DEFAULT_COLLECTION_ID | ID of collection to be used if not in samples | |
| DS_DIRECTORY_MIN_DONORS | Minimum number of donors per star model hypercube |10 |
| DS_DIRECTORY_MAX_FACTS | Max number of star model hypercubes to be generated | |
| DS_DIRECTORY_ALLOW_STAR_MODEL | Set to 'True' to send star model info to Directory |False |
| DS_DIRECTORY_MOCK | Set to 'True' mock a Directory. In this mode, directory-sync will not contact the Directory. All Directory-related methods will simply return plausible fake values. |False |
| DS_FHIR_STORE_URL | URL for FHIR store |http://bridgehead-bbmri-blaze:8080|
| DS_TIMER_CRON | Execution interval for Directory sync, cron format | |
| DS_RETRY_MAX | Maximum number of retries when sync fails |10 |
| DS_RETRY_INTERVAL | Interval between retries (seconds) |20 seconds |
| Variable | Purpose | Default if not specified |
|:-----------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------|:---------------------------------------|
| DS_DIRECTORY_URL | Base URL of the Directory | https://directory-backend.molgenis.net |
| DS_DIRECTORY_USER_NAME | User name for logging in to Directory | |
| DS_DIRECTORY_USER_PASS | Password for logging in to Directory | |
| DS_DIRECTORY_DEFAULT_COLLECTION_ID | ID of collection to be used if not in samples | |
| DS_DIRECTORY_MIN_DONORS | Minimum number of donors per star model hypercube | 10 |
| DS_DIRECTORY_MAX_FACTS | Max number of star model hypercubes to be generated | |
| DS_DIRECTORY_ALLOW_STAR_MODEL | Set to 'True' to send star model info to Directory | False |
| DS_DIRECTORY_MOCK | Set to 'True' to mock a Directory. In this mode, directory-sync will not contact the Directory. All Directory-related methods will simply return plausible fake values. | False |
| DS_FHIR_STORE_URL | URL for FHIR store | http://bridgehead-bbmri-blaze:8080 |
| DS_TIMER_CRON | Execution interval for Directory sync, cron format | |
| DS_RETRY_MAX | Maximum number of retries when sync fails | 10 |
| DS_RETRY_INTERVAL | Interval between retries (seconds) | 20 seconds |

DS_DIRECTORY_USER_NAME and DS_DIRECTORY_USER_PASS are mandatory. If you do not specify these,
Directory sync will not run. Contact [Directory admin]([email protected]) to get login credentials.
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
</parent>
<groupId>de.samply</groupId>
<artifactId>directory_sync_service</artifactId>
<version>1.4.4</version>
<version>1.4.5</version>
<name>directory_sync_service</name>
<description>Directory sync</description>
<url>https://github.com/samply/directory_sync_service</url>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,9 +364,12 @@ public List<String> getDiagnosisAvailable() {
*
* @param correctedDiagnoses A map containing diagnosis corrections, where the keys
* represent the original diagnoses and the values represent
* the corrected diagnoses.
* the corrected diagnoses. Return without applying corrections
* if this parameter is null.
*/
public void applyDiagnosisCorrections(Map<String, String> correctedDiagnoses) {
if (correctedDiagnoses == null)
return;
for (Entity entity: getEntities()) {
List<String> directoryDiagnoses = entity.getDiagnosisAvailable().stream()
.filter(diagnosis -> diagnosis != null && correctedDiagnoses.containsKey(diagnosis) && correctedDiagnoses.get(diagnosis) != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.function.Function;
import java.util.HashSet;

import de.samply.directory_sync_service.model.StarModelData;
import org.hl7.fhir.instance.model.api.IBaseBundle;
import org.hl7.fhir.instance.model.api.IBaseOperationOutcome;
import org.hl7.fhir.instance.model.api.IBaseResource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,12 @@ public int getFactCount() {
* <p>
* Note: This method directly modifies the factTables in-place.
*
* @param diagnoses Maps FHIR diagnoses onto Directory diagnoses.
* @param diagnoses Maps FHIR diagnoses onto Directory diagnoses. If null, no corrections are applied.
* @throws NullPointerException if any fact in factTables is null.
*/
public void applyDiagnosisCorrections(Map<String,String> diagnoses) {
if (diagnoses == null)
return;
for (Map<String, String> fact: factTables) {
if (!fact.containsKey("disease"))
continue;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package de.samply.directory_sync_service.sync;

import de.samply.directory_sync_service.Util;
import de.samply.directory_sync_service.directory.DirectoryApi;
import de.samply.directory_sync_service.directory.model.Biobank;
import de.samply.directory_sync_service.fhir.FhirApi;
import de.samply.directory_sync_service.model.BbmriEricId;
import org.hl7.fhir.r4.model.OperationOutcome;
import org.hl7.fhir.r4.model.Organization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Optional;
import java.util.function.Function;

/**
* Updates the biobanks in the local FHIR store with metadata from the Directory.
*/
public class BiobanksUpdater {
    private static final Logger logger = LoggerFactory.getLogger(BiobanksUpdater.class);

    /**
     * Copies the name of the Directory biobank in a tuple onto the tuple's FHIR biobank
     * and returns the same tuple.
     */
    public static final Function<BiobankTuple, BiobankTuple> UPDATE_BIOBANK_NAME = t -> {
        t.fhirBiobank.setName(t.dirBiobank.getName());
        return t;
    };

    /**
     * Updates all biobanks from the FHIR server with information from the Directory.
     * <p>
     * Every biobank is attempted, even if an earlier one fails, so that a single
     * problematic biobank does not prevent the others from being updated.
     *
     * @param fhirApi      API used to list the biobanks and to write updated biobanks back.
     * @param directoryApi API used to fetch the corresponding biobanks from the Directory.
     * @return {@code true} if the biobank list could be retrieved and every biobank was
     *         processed without a problem; {@code false} otherwise.
     */
    public static boolean updateBiobanksInFhirStore(FhirApi fhirApi, DirectoryApi directoryApi) {
        // Retrieve the list of all biobanks; a null list signals a retrieval failure.
        List<Organization> organizations = fhirApi.listAllBiobanks();
        if (organizations == null) {
            logger.warn("error retrieving the biobanks");
            return false;
        }

        // Process each biobank, remembering whether any of them failed.
        boolean succeeded = true;
        for (Organization organization : organizations) {
            if (!updateBiobankInFhirStore(fhirApi, directoryApi, organization)) {
                logger.warn("updateBiobankOnFhirServerIfNecessary: problem updating: {}",
                        organization.getIdElement().getValue());
                succeeded = false;
            }
        }

        return succeeded;
    }

    /**
     * Takes a biobank from FHIR and updates it with current information from the Directory.
     *
     * @param fhirApi      API used to write the updated biobank back to the FHIR store.
     * @param directoryApi API used to fetch the matching biobank from the Directory.
     * @param fhirBiobank  the biobank to update.
     * @return {@code true} if the biobank was updated or needed no update; {@code false} if
     *         the BBMRI-ERIC identifier is missing, the Directory biobank could not be
     *         fetched, or the FHIR store reported an error for the update.
     */
    private static boolean updateBiobankInFhirStore(FhirApi fhirApi, DirectoryApi directoryApi, Organization fhirBiobank) {
        logger.info("updateBiobankOnFhirServerIfNecessary: entered");

        // Retrieve the biobank's BBMRI-ERIC identifier from the FHIR organization
        Optional<BbmriEricId> bbmriEricIdOpt = FhirApi.bbmriEricId(fhirBiobank);
        logger.info("updateBiobankOnFhirServerIfNecessary: bbmriEricIdOpt: {}", bbmriEricIdOpt);

        // Without an identifier the biobank cannot be looked up in the Directory
        if (!bbmriEricIdOpt.isPresent()) {
            logger.warn("updateBiobankOnFhirServerIfNecessary: Missing BBMRI-ERIC identifier");
            return false;
        }
        BbmriEricId bbmriEricId = bbmriEricIdOpt.get();
        logger.info("updateBiobankOnFhirServerIfNecessary: bbmriEricId: {}", bbmriEricId);

        // Fetch the corresponding biobank from the Directory API; null signals a failure
        Biobank directoryBiobank = directoryApi.fetchBiobank(bbmriEricId);
        logger.info("updateBiobankOnFhirServerIfNecessary: directoryBiobank: {}", directoryBiobank);
        if (directoryBiobank == null) {
            logger.warn("updateBiobankOnFhirServerIfNecessary: Failed to fetch biobank from Directory API");
            return false;
        }

        // Create a BiobankTuple containing the FHIR biobank and the Directory biobank
        logger.info("updateBiobankOnFhirServerIfNecessary: Create a BiobankTuple containing the FHIR biobank and the Directory biobank");
        BiobankTuple biobankTuple = new BiobankTuple(fhirBiobank, directoryBiobank);

        // Update the biobank name if necessary
        logger.info("updateBiobankOnFhirServerIfNecessary: Update the biobank name if necessary");
        BiobankTuple updatedBiobankTuple = UPDATE_BIOBANK_NAME.apply(biobankTuple);

        // If nothing changed there is nothing to write back; this counts as success
        logger.info("updateBiobankOnFhirServerIfNecessary: Check if any changes have been made; if not, return a no-update necessary outcome");
        if (!updatedBiobankTuple.hasChanged()) {
            logger.info("updateBiobankOnFhirServerIfNecessary: No update necessary");
            return true;
        }

        // Update the biobank resource on the FHIR server
        logger.info("updateBiobankOnFhirServerIfNecessary: Update the biobank resource on the FHIR server if changes were made");
        OperationOutcome updateOutcome = fhirApi.updateResource(updatedBiobankTuple.fhirBiobank);

        // A non-empty error message means the update failed; log it so the cause is visible
        String errorMessage = Util.getErrorMessageFromOperationOutcome(updateOutcome);
        if (!errorMessage.isEmpty()) {
            logger.warn("updateBiobankOnFhirServerIfNecessary: Problem during FHIR store update: {}", errorMessage);
            return false;
        }

        logger.info("updateBiobankOnFhirServerIfNecessary: done!");

        return true;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package de.samply.directory_sync_service.sync;

import de.samply.directory_sync_service.Util;
import de.samply.directory_sync_service.converter.FhirCollectionToDirectoryCollectionPutConverter;
import de.samply.directory_sync_service.directory.DirectoryApi;
import de.samply.directory_sync_service.directory.MergeDirectoryCollectionGetToDirectoryCollectionPut;
import de.samply.directory_sync_service.directory.model.DirectoryCollectionGet;
import de.samply.directory_sync_service.directory.model.DirectoryCollectionPut;
import de.samply.directory_sync_service.fhir.FhirApi;
import de.samply.directory_sync_service.fhir.model.FhirCollection;
import de.samply.directory_sync_service.model.BbmriEricId;
import org.hl7.fhir.r4.model.OperationOutcome;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;

/**
* Update collections in the Directory with data from the local FHIR store.
*/
public class CollectionUpdater {
    private static final Logger logger = LoggerFactory.getLogger(CollectionUpdater.class);

    /**
     * Take information from the FHIR store and send aggregated updates to the Directory.
     * <p>
     * This is a multi step process:
     * <ol>
     * <li>Fetch a list of collection objects from the FHIR store.</li>
     * <li>Convert the FHIR collection objects into a Directory collection PUT DTO.</li>
     * <li>Using the country code and collection IDs of the PUT DTO, fetch the
     *     corresponding GET collections from the Directory.</li>
     * <li>Merge data from the Directory GET collections into the Directory PUT
     *     collections.</li>
     * <li>Apply the diagnosis corrections to the PUT collections.</li>
     * <li>Push the new information back to the Directory.</li>
     * </ol>
     * The process is aborted at the first step that fails. Any exception thrown along
     * the way is caught, logged and reported as a failure.
     *
     * @param fhirApi             API used to fetch the collections from the FHIR store.
     * @param directoryApi        API used to fetch collections from, and push updates to, the Directory.
     * @param correctedDiagnoses  Diagnosis corrections, passed on to
     *                            {@code DirectoryCollectionPut.applyDiagnosisCorrections}.
     * @param defaultCollectionId The default collection ID to use for fetching collections from
     *                            the FHIR store. If it cannot be converted to a {@link BbmriEricId},
     *                            {@code null} is passed to the FHIR API instead.
     * @return {@code true} if every step succeeded; {@code false} if any step failed or an
     *         exception was thrown.
     */
    public static boolean sendUpdatesToDirectory(FhirApi fhirApi, DirectoryApi directoryApi, Map<String, String> correctedDiagnoses, String defaultCollectionId) {
        try {
            BbmriEricId defaultBbmriEricCollectionId = BbmriEricId
                    .valueOf(defaultCollectionId)
                    .orElse(null);

            // Step 1: collections from the FHIR store
            List<FhirCollection> fhirCollections = fhirApi.fetchFhirCollections(defaultBbmriEricCollectionId);
            if (fhirCollections == null) {
                logger.warn("Problem getting collections from FHIR store");
                return false;
            }
            logger.info("__________ sendUpdatesToDirectory: FHIR collection count): {}", fhirCollections.size());

            // Step 2: convert to the Directory PUT representation
            DirectoryCollectionPut directoryCollectionPut = FhirCollectionToDirectoryCollectionPutConverter.convert(fhirCollections);
            if (directoryCollectionPut == null) {
                logger.warn("Problem converting FHIR attributes to Directory attributes");
                return false;
            }
            logger.info("__________ sendUpdatesToDirectory: 1 directoryCollectionPut.getCollectionIds().size()): {}", directoryCollectionPut.getCollectionIds().size());

            // Step 3: fetch the matching collections from the Directory
            List<String> collectionIds = directoryCollectionPut.getCollectionIds();
            String countryCode = directoryCollectionPut.getCountryCode();
            directoryApi.relogin();
            DirectoryCollectionGet directoryCollectionGet = directoryApi.fetchCollectionGetOutcomes(countryCode, collectionIds);
            if (directoryCollectionGet == null) {
                logger.warn("Problem getting collections from Directory");
                return false;
            }
            logger.info("__________ sendUpdatesToDirectory: 1 directoryCollectionGet.getItems().size()): {}", directoryCollectionGet.getItems().size());

            // Step 4: merge the Directory GET data into the PUT data
            if (!MergeDirectoryCollectionGetToDirectoryCollectionPut.merge(directoryCollectionGet, directoryCollectionPut)) {
                logger.warn("Problem merging Directory GET attributes to Directory PUT attributes");
                return false;
            }
            logger.info("__________ sendUpdatesToDirectory: 2 directoryCollectionGet.getItems().size()): {}", directoryCollectionGet.getItems().size());

            // Step 5: apply corrections to ICD 10 diagnoses, to make them compatible with
            // the Directory.
            directoryCollectionPut.applyDiagnosisCorrections(correctedDiagnoses);
            logger.info("__________ sendUpdatesToDirectory: 2 directoryCollectionPut.getCollectionIds().size()): {}", directoryCollectionPut.getCollectionIds().size());

            // Step 6: push the updated collections to the Directory
            directoryApi.relogin();
            OperationOutcome updateOutcome = directoryApi.updateEntities(directoryCollectionPut);

            // A non-empty error message means the update failed; log it so the cause is visible
            String errorMessage = Util.getErrorMessageFromOperationOutcome(updateOutcome);
            if (!errorMessage.isEmpty()) {
                logger.warn("sendUpdatesToDirectory: Problem during collection update: {}", errorMessage);
                return false;
            }

            return true;
        } catch (Exception e) {
            logger.warn("sendUpdatesToDirectory - unexpected error: " + Util.traceFromException(e));
            return false;
        }
    }
}
Loading

0 comments on commit 5001166

Please sign in to comment.