HBPMedical/data-factory-airflow-dags
Airflow MRI preprocessing DAGs

Requirements:

  • airflow-imaging-plugins
  • mri-preprocessing-pipeline
  • data-tracking

To see this project in action, go to the demo of MIP Data Factory and follow the instructions.

Setup and configuration

Airflow setup for MRI scans pipeline:

  • Create the following pools:

    • image_preprocessing with N slots, where N is less than the number of vCPUs available on the machine
    • remote_file_copy with N slots, where N should be 1 or 2 to avoid saturating network IO
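The pools can be created from the Airflow CLI; the exact syntax depends on your Airflow version (recent releases use `airflow pools set`, Airflow 1.x used `airflow pool -s`). A sketch for an 8-vCPU machine (the slot counts are assumptions to adjust to your hardware):

```shell
# Airflow 1.x syntax; use "airflow pools set NAME SLOTS DESCRIPTION" on Airflow 2.x
airflow pool -s image_preprocessing 6 "MRI preprocessing tasks"
airflow pool -s remote_file_copy 2 "Network-bound file copies"
```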
  • In Airflow config file, add a [spm] section with the following entries:

    • SPM_DIR: path to the root folder of SPM 12.
  • In Airflow config file, add a [mipmap] section with the following entries (required only if MipMap is used for ETL):

    • DB_CONFIG_FILE: path to the configuration file used by MipMap to connect to its work database.
  • In Airflow config file, add a [data-factory] section with the following entries:

    • DATASETS: comma-separated list of datasets to process. Each dataset is configured using a [data-factory:<dataset>] section in the config file
    • EMAIL_ERRORS_TO: email address to send errors to
    • SLACK_CHANNEL: optional, Slack channel to use to send status messages
    • SLACK_CHANNEL_USER: optional, user to post as in Slack
    • SLACK_TOKEN: optional, authorisation token for Slack
    • DATA_CATALOG_SQL_ALCHEMY_CONN: connection URL to the Data catalog database tracking artifacts generated by the MRI pipelines.
    • I2B2_SQL_ALCHEMY_CONN: connection URL to the I2B2 database storing all the MRI pipelines results.
  • For each dataset, add a [data-factory:<dataset>] section, replacing <dataset> with the name of the dataset and define the following entries:

    • DATASET_LABEL: Name of the dataset
  • For each dataset, configure the [data-factory:<dataset>:reorganisation] section if you need to reorganise an input folder whose files or folders must be split into several folders (for example, one per visit):

    • INPUT_FOLDER: Folder containing the original imaging data to process. This data should already have been anonymised by a tool
    • INPUT_FOLDER_DEPTH: depth of folders to explore while scanning the original imaging data to process.
    • INPUT_CONFIG: List of flags defining how incoming imaging data are organised, values are defined below in the preprocessing section.
    • MAX_ACTIVE_RUNS: maximum number of reorganisation tasks in parallel
    • FOLDER_FILTER: regex that describes acceptable folder names. Folders that do not fully match it will be discarded.
    • PIPELINES: List of pipelines to execute. Values are
      • copy_to_local: if used, input data are first copied to a local folder to speed up processing.
      • dicom_reorganise:
        • output_folder: output folder that will contain the reorganised data.
        • output_folder_structure: description of the desired folder organisation. E.g. '#PatientID/#StudyID/#SeriesDescription/#SeriesNumber'
        • docker_image: organiser docker image.
        • docker_input_dir: docker input volume for the organiser (path inside the container).
        • docker_output_dir: docker output volume for the organiser (path inside the container).
        • allowed_field_values: list of fields with restricted set of values used to filter out unwanted images, e.g. FIELD=VALUE1,VALUE2,VALUE3 [FIELD2=VALUE1,VALUE2 ...]
      • nifti_reorganise:
        • output_folder: output folder that will contain the reorganised data.
        • output_folder_structure: description of the desired folder organisation. E.g. '#PatientID/#StudyID/#SeriesDescription/#SeriesNumber'
        • docker_image: organiser docker image.
        • docker_input_dir: docker input volume for the organiser (path inside the container).
        • docker_output_dir: docker output volume for the organiser (path inside the container).
      • trigger_preprocessing: scans the current folder and triggers preprocessing of the images in each folder discovered
      • trigger_ehr: scans the current folder and triggers importation of EHR data from each folder discovered
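The output_folder_structure template can be read as a path pattern in which each `#Field` token is replaced by the corresponding DICOM attribute. A minimal illustrative sketch, not the hierarchizer's actual implementation (the metadata dict and its field values are assumptions):

```python
import re

def expand_structure(template, metadata):
    """Replace each #Field token in the template with its metadata value."""
    return re.sub(r"#(\w+)", lambda m: str(metadata[m.group(1)]), template)

# Hypothetical DICOM metadata for one series
meta = {
    "PatientID": "P001",
    "StudyID": "S01",
    "SeriesDescription": "T1w",
    "SeriesNumber": 3,
}

path = expand_structure("#PatientID/#StudyID/#SeriesDescription/#SeriesNumber", meta)
# → "P001/S01/T1w/3"
```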
  • If trigger_preprocessing is used, configure the [data-factory:<dataset>:reorganisation:trigger_preprocessing] section:

    • DEPTH: depth of folders to explore when triggering preprocessing
  • If trigger_ehr is used, configure the [data-factory:<dataset>:reorganisation:trigger_ehr] section:

    • DEPTH: depth of folders to explore when triggering importation of EHR data
  • For each dataset, now configure the [data-factory:<dataset>:preprocessing] section:

    • INPUT_FOLDER: Folder containing the original imaging data to process. This data should already have been anonymised by a tool. Not required when the reorganisation pipelines have been used before.
    • INPUT_CONFIG: List of flags defining how incoming imaging data are organised, values are
      • boost: (optional) When enabled, all files in the same folder are assumed to share the same meta-data, making processing about twice as fast. This option is enabled by default.
      • session_id_by_patient: Rarely, a dataset may use study IDs that are unique per patient rather than across the whole study (e.g. LREN data). Enable this flag in that case; PatientID + StudyID will then be used as the session ID.
      • visit_id_in_patient_id: Rarely, a dataset may mix patient IDs and visit IDs (e.g. LREN data). Enable this flag in that case; the pipeline will try to split PatientID into VisitID and PatientID.
      • visit_id_from_path: Enable this flag to get the visit ID from the folder hierarchy instead of DICOM meta-data (e.g. can be useful for PPMI).
      • repetition_from_path: Enable this flag to get the repetition ID from the folder hierarchy instead of DICOM meta-data (e.g. can be useful for PPMI).
    • MAX_ACTIVE_RUNS: maximum number of folders containing scans to pre-process in parallel
    • MIN_FREE_SPACE: minimum free space required on the local disk, expressed as a fraction (e.g. 0.3 for 30%)
    • MISC_LIBRARY_PATH: path to the Misc&Libraries folder for SPM pipelines.
    • PIPELINES_PATH: path to the root folder containing the Matlab scripts for the pipelines
    • PROTOCOLS_DEFINITION_FILE: path to the default protocols definition file defining the protocols used on the scanner.
    • SCANNERS: List of methods describing how the preprocessing data folder is scanned for new work, values are
      • continuous: input folder is scanned frequently for new data. Sub-folders should contain a .ready file to indicate that processing can be performed on that folder.
      • daily: input folder contains a sub-folder for the year, this folder contains daily sub-folders for each day of the year (format yyyyMMdd). Those daily sub-folders in turn contain the folders for each scan to process.
      • once: input folder contains a set of sub-folders each containing a scan to process.
    • PIPELINES: List of pipelines to execute. Values are
      • copy_to_local: if used, input data are first copied to a local folder to speed up processing.
      • dicom_to_nifti: convert all DICOM files to Nifti format.
      • mpm_maps: computes the Multiparametric Maps (MPMs) and brain segmentation in different tissue maps.
      • neuro_morphometric_atlas: computes an individual Atlas based on the NeuroMorphometrics Atlas.
      • export_features: exports neuroimaging features stored in CSV files to the I2B2 database
      • catalog_to_i2b2: exports meta-data from the data catalog to the I2B2 database.
  • If copy_to_local is used, configure the [data-factory:<dataset>:preprocessing:copy_to_local] section:

    • OUTPUT_FOLDER: destination folder for the local copy
  • If dicom_to_nifti is used or required (when DICOM images are used as input), configure the [data-factory:<dataset>:preprocessing:dicom_to_nifti] section:

    • OUTPUT_FOLDER: destination folder for the Nifti images
    • BACKUP_FOLDER: backup folder for the Nifti images
    • SPM_FUNCTION: SPM function called. Defaults to 'DCM2NII_LREN'
    • PIPELINE_PATH: path to the folder containing the SPM script for this pipeline. Defaults to [data-factory:<dataset>:preprocessing]PIPELINES_PATH + '/Nifti_Conversion_Pipeline'
    • MISC_LIBRARY_PATH: path to the Misc&Libraries folder for SPM pipelines. Defaults to the MISC_LIBRARY_PATH value in the [data-factory:<dataset>:preprocessing] section.
    • PROTOCOLS_DEFINITION_FILE: path to the Protocols definition file defining the protocols used on the scanner. Defaults to the PROTOCOLS_DEFINITION_FILE value in the [data-factory:<dataset>:preprocessing] section.
    • DCM2NII_PROGRAM: path to the dcm2nii program. Defaults to [data-factory:<dataset>:preprocessing]PIPELINES_PATH + '/dcm2nii'
  • If mpm_maps is used, configure the [data-factory:<dataset>:preprocessing:mpm_maps] section:

    • OUTPUT_FOLDER: destination folder for the MPMs and brain segmentation
    • BACKUP_FOLDER: backup folder for the MPMs and brain segmentation
    • SPM_FUNCTION: SPM function called. Defaults to 'Preproc_mpm_maps'
    • PIPELINE_PATH: path to the folder containing the SPM script for this pipeline. Defaults to [data-factory:<dataset>:preprocessing]PIPELINES_PATH + '/MPMs_Pipeline'
    • MISC_LIBRARY_PATH: path to the Misc&Libraries folder for SPM pipelines. Defaults to the MISC_LIBRARY_PATH value in the [data-factory:<dataset>:preprocessing] section.
    • PROTOCOLS_DEFINITION_FILE: path to the Protocols definition file defining the protocols used on the scanner. Defaults to the PROTOCOLS_DEFINITION_FILE value in the [data-factory:<dataset>:preprocessing] section.
  • If neuro_morphometric_atlas is used, configure the [data-factory:<dataset>:preprocessing:neuro_morphometric_atlas] section:

    • OUTPUT_FOLDER: destination folder for the Atlas file, the volumes of the Morphometric Atlas structures (.txt), and the CSV file containing the volume and globals, plus the Multiparametric Maps (R2*, R1, MT, PD) for each structure defined in the Subject Atlas.
    • BACKUP_FOLDER: backup folder for the same outputs as OUTPUT_FOLDER.
    • SPM_FUNCTION: SPM function called. Defaults to 'NeuroMorphometric_pipeline'
    • PIPELINE_PATH: path to the folder containing the SPM script for this pipeline. Defaults to [data-factory:<dataset>:preprocessing]PIPELINES_PATH + '/NeuroMorphometric_Pipeline/NeuroMorphometric_tbx/label'
    • MISC_LIBRARY_PATH: path to the Misc&Libraries folder for SPM pipelines. Defaults to the MISC_LIBRARY_PATH value in the [data-factory:<dataset>:preprocessing] section.
    • PROTOCOLS_DEFINITION_FILE: path to the Protocols definition file defining the protocols used on the scanner. Defaults to the PROTOCOLS_DEFINITION_FILE value in the [data-factory:<dataset>:preprocessing] section.
    • TPM_TEMPLATE: path to the template used for the segmentation step in case the image is not segmented. Defaults to SPM_DIR + 'tpm/nwTPM_sl3.nii'
  • For each dataset, now configure the [data-factory:<dataset>:ehr] section:

    • INPUT_FOLDER: Folder containing the original EHR data to process. This data should already have been anonymised by a tool
    • INPUT_FOLDER_DEPTH: when a once scanner is used, depth of folders to traverse before reaching EHR data. Defaults to 1.
    • MIN_FREE_SPACE: minimum free space required on the local disk, expressed as a fraction (e.g. 0.3 for 30%)
    • SCANNERS: List of methods describing how the EHR data folder is scanned for new work, values are
      • daily: input folder contains a sub-folder for the year, this folder contains daily sub-folders for each day of the year (format yyyyMMdd). Those daily sub-folders in turn contain the EHR files in CSV format to process.
      • once: input folder contains the EHR files in CSV format to process.
    • PIPELINES: List of pipelines to execute. Values are
      • map_ehr_to_i2b2: maps EHR data to an I2B2 schema.
  • Configure the [data-factory:<dataset>:ehr:map_ehr_to_i2b2] section:

    • DOCKER_IMAGE: Docker image of the tool that maps EHR data to an I2B2 schema.
  • Configure the [data-factory:<dataset>:ehr:version_incoming_ehr] section:

    • OUTPUT_FOLDER: output folder used to store versioned EHR data.
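The daily scanner described above expects a layout of the form <INPUT_FOLDER>/<year>/<yyyyMMdd>/…. A small illustrative helper, not part of the DAGs, that builds such a daily path:

```python
from datetime import date

def daily_scan_folder(input_folder, day=None):
    """Build the daily sub-folder path used by the 'daily' scanner:
    <input_folder>/<yyyy>/<yyyyMMdd>."""
    day = day or date.today()
    return "%s/%04d/%s" % (input_folder, day.year, day.strftime("%Y%m%d"))

print(daily_scan_folder("/data/demo", date(2017, 3, 15)))
# → /data/demo/2017/20170315
```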

Sample configuration:


[spm]
spm_dir = /opt/spm12
[mipmap]
db_config_file = /dev/null
[data-factory]
data_catalog_sql_alchemy_conn = postgresql://data_catalog:datacatalogpwd@demo:4321/data_catalog
datasets = main
email_errors_to =
i2b2_sql_alchemy_conn = postgresql://i2b2:i2b2pwd@demo:4321/i2b2
slack_channel = #data
slack_channel_user = Airflow
slack_token =
[data-factory:main]
dataset_label = Demo
[data-factory:main:preprocessing]
input_config = boost
input_folder = /data/demo
max_active_runs = 30
min_free_space = 0.3
misc_library_path = /opt/airflow-scripts/mri-preprocessing-pipeline/Miscellaneous&Others
pipelines = dicom_to_nifti,mpm_maps,neuro_morphometric_atlas,export_features,catalog_to_i2b2
pipelines_path = /opt/airflow-scripts/mri-preprocessing-pipeline/Pipelines
protocols_definition_file = /opt/airflow-scripts/mri-preprocessing-pipeline/Protocols_definition.txt
scanners = once
[data-factory:main:preprocessing:copy_to_local]
output_folder = /data/incoming
[data-factory:main:preprocessing:dicom_to_nifti]
backup_folder =
dcm2nii_program = /opt/airflow-scripts/mri-preprocessing-pipeline/Pipelines/Nifti_Conversion_Pipeline/dcm2nii
misc_library_path = /opt/airflow-scripts/mri-preprocessing-pipeline/Miscellaneous&Others
output_folder = /data/nifti
pipeline_path = /opt/airflow-scripts/mri-preprocessing-pipeline/Pipelines
protocols_definition_file = /opt/airflow-scripts/mri-preprocessing-pipeline/Protocols_definition.txt
spm_function = DCM2NII_LREN
[data-factory:main:preprocessing:mpm_maps]
backup_folder =
misc_library_path = /opt/airflow-scripts/mri-preprocessing-pipeline/Miscellaneous&Others
output_folder = /data/mpm_maps
pipeline_path = /opt/airflow-scripts/mri-preprocessing-pipeline/Pipelines
protocols_definition_file = /opt/airflow-scripts/mri-preprocessing-pipeline/Protocols_definition.txt
spm_function = Preproc_mpm_maps
[data-factory:main:preprocessing:neuro_morphometric_atlas]
backup_folder =
misc_library_path = /opt/airflow-scripts/mri-preprocessing-pipeline/Miscellaneous&Others
output_folder = /data/neuro_morphometric_atlas
pipeline_path = /opt/airflow-scripts/mri-preprocessing-pipeline/Pipelines
protocols_definition_file = /opt/airflow-scripts/mri-preprocessing-pipeline/Protocols_definition.txt
spm_function = NeuroMorphometric_pipeline
tpm_template = /opt/spm12/tpm/nwTPM_sl3.nii
[data-factory:main:ehr]
input_folder = /data/ehr_demo
input_folder_depth = 0
max_active_runs = 30
min_free_space = 0.3
pipelines =
scanners = once
[data-factory:main:ehr:map_ehr_to_i2b2]
docker_image = hbpmip/mipmap-demo-ehr-to-i2b2:0.1
[data-factory:main:ehr:version_incoming_ehr]
output_folder = /data/ehr_versioned
[data-factory:main:reorganisation]
input_config = boost
input_folder = /data/demo
max_active_runs = 30
min_free_space = 0.3
pipelines = dicom_reorganise,trigger_preprocessing
[data-factory:main:reorganisation:copy_to_local]
output_folder = /data/all_incoming
[data-factory:main:reorganisation:dicom_reorganise]
docker_image = hbpmip/hierarchizer:1.3.6
docker_input_dir = /input_folder
docker_output_dir = /output_folder
output_folder = /data/dicom_organised
output_folder_structure = #PatientID/#StudyID/#SeriesDescription/#SeriesNumber
[data-factory:main:reorganisation:nifti_reorganise]
docker_image = hbpmip/hierarchizer:1.3.6
docker_input_dir = /input_folder
docker_output_dir = /output_folder
output_folder = /data/nifti_organised
output_folder_structure = #PatientID/#StudyID/#SeriesDescription/#SeriesNumber
[data-factory:main:reorganisation:trigger_preprocessing]
depth = 1
[data-factory:main:reorganisation:trigger_ehr]
depth = 0
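The configuration above is standard INI and can be inspected with Python's configparser. A standalone sketch that lists each dataset's preprocessing pipelines (Airflow itself reads these values through its own configuration module; section and key names are taken from the sample above):

```python
import configparser

# A fragment of the sample configuration above
SAMPLE = """
[data-factory]
datasets = main
[data-factory:main]
dataset_label = Demo
[data-factory:main:preprocessing]
pipelines = dicom_to_nifti,mpm_maps,neuro_morphometric_atlas,export_features,catalog_to_i2b2
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE)

datasets = [d.strip() for d in config.get("data-factory", "datasets").split(",")]
for dataset in datasets:
    # Each dataset has its own [data-factory:<dataset>:preprocessing] section
    section = "data-factory:%s:preprocessing" % dataset
    pipelines = [p.strip() for p in config.get(section, "pipelines").split(",")]
    print(dataset, pipelines)
```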

Acknowledgements

This work has been funded by the European Union Seventh Framework Program (FP7/2007-2013) under grant agreement no. 604102 (HBP)

This work is part of SP8 of the Human Brain Project (SGA1).