diff --git a/xcp_d/utils/bids.py b/xcp_d/utils/bids.py index 19ab33c39..0357d9b60 100644 --- a/xcp_d/utils/bids.py +++ b/xcp_d/utils/bids.py @@ -306,11 +306,27 @@ def write_dataset_description(fmri_dir, xcpd_dir): Path to the BIDS derivative dataset being ingested. xcpd_dir : str Path to the output xcp-d dataset. + + Returns + ------- + usable_dataset_links : :obj:`dict` + A dictionary containing any dataset links that should be used in xcpd derivatives. + The keys to this dictionary are standardized strings to be referenced in xcpd code. + The values are 2-element tuples, with the first element being the DatasetLink key + (which may vary depending on pre-existing DatasetLinks) and the second element + being the path to be replaced in the derivative references. + + For example, ``{"preprocessed": ("preprocessed10", "../fmriprep/")}`` might be used to + change ``Sources: ["/path/to/derivatives/fmriprep/sub-01/thing.nii.gz"]`` + (a meaningless path outside the user's filesystem) + to ``Sources: ["bids:preprocessed10:sub-01/thing.nii.gz"]`` (a valid BIDS URI). """ import json from xcp_d.__about__ import DOWNLOAD_URL, __version__ + MINIMUM_BIDS_VERSION = "1.8.0" + orig_dset_description = os.path.join(fmri_dir, "dataset_description.json") if not os.path.isfile(orig_dset_description): dset_desc = {} @@ -321,6 +337,14 @@ def write_dataset_description(fmri_dir, xcpd_dir): assert dset_desc["DatasetType"] == "derivative" + # xcp-d uses BIDS URIs, which require BIDS version >= 1.8.0 + orig_bids_version = dset_desc.get("BIDSVersion", MINIMUM_BIDS_VERSION) + if Version(orig_bids_version).public < Version(MINIMUM_BIDS_VERSION).public: + LOGGER.warning( + f"Updating BIDSVersion from {orig_bids_version} to {MINIMUM_BIDS_VERSION}." + ) + dset_desc["BIDSVersion"] = MINIMUM_BIDS_VERSION + # Update dataset description dset_desc["Name"] = "XCP-D: A Robust Postprocessing Pipeline of fMRI data" generated_by = dset_desc.get("GeneratedBy", []) @@ -335,6 +359,30 @@ def write_dataset_description(fmri_dir, xcpd_dir): dset_desc["GeneratedBy"] = generated_by dset_desc["HowToAcknowledge"] = "Include the generated boilerplate in the methods section." + # Define a dataset link to preprocessed derivatives in BIDS URIs throughout xcpd derivatives. + dataset_links = dset_desc.get("DatasetLinks", {}) + relative_path = os.path.relpath(fmri_dir, start=xcpd_dir) + + # If undefined or already set to the same path, use "preprocessed" + if dataset_links.get("preprocessed", relative_path) == relative_path: + usable_dataset_links = {"preprocessed": ("preprocessed", relative_path)} + else: + LOGGER.info( + "DatasetLink 'preprocessed' is already defined. " + "Will use 'preprocessed([0-9]+)' instead." + ) + + # Find an unused (or pre-set) preprocessed([0-9]+) key to use. + key_counter = 0 + while dataset_links.get(f"preprocessed{key_counter}", relative_path) != relative_path: + key_counter += 1 + + usable_dataset_links = {"preprocessed": (f"preprocessed{key_counter}", relative_path)} + + dataset_links.update( + {usable_dataset_links["preprocessed"][0]: usable_dataset_links["preprocessed"][1]} + ) + xcpd_dset_description = os.path.join(xcpd_dir, "dataset_description.json") if os.path.isfile(xcpd_dset_description): with open(xcpd_dset_description, "r") as fo: @@ -342,8 +390,22 @@ def write_dataset_description(fmri_dir, xcpd_dir): old_version = old_dset_desc["GeneratedBy"][0]["Version"] if Version(__version__).public != Version(old_version).public: - LOGGER.warning(f"Previous output generated by version {old_version} found.") + LOGGER.error(f"Previous output generated by version {old_version} found.") else: with open(xcpd_dset_description, "w") as fo: json.dump(dset_desc, fo, indent=4, sort_keys=True) + + return usable_dataset_links + + +def resolve_bids_uri(filepath, xcpd_dir, dataset_links): + """Resolve BIDS URI.""" + dataset_name, xcpd_dir_relpath = dataset_links['preprocessed'] + + preproc_dir = os.path.abspath(os.path.join(xcpd_dir, xcpd_dir_relpath)) + filepath_relative = os.path.relpath(filepath, start=preproc_dir) + + bids_uri = f"bids:{dataset_name}:{filepath_relative}" + + return bids_uri diff --git a/xcp_d/workflow/base.py b/xcp_d/workflow/base.py index bee346f90..8589acd72 100644 --- a/xcp_d/workflow/base.py +++ b/xcp_d/workflow/base.py @@ -148,7 +148,7 @@ def init_xcpd_wf( xcpd_wf.base_dir = work_dir print(f"Begin the {name} workflow") - write_dataset_description(fmri_dir, os.path.join(output_dir, "xcp_d")) + dataset_links = write_dataset_description(fmri_dir, os.path.join(output_dir, "xcp_d")) for subject_id in subject_list: single_subj_wf = init_subject_wf( @@ -176,6 +176,7 @@ def init_xcpd_wf( fd_thresh=fd_thresh, func_only=func_only, input_type=input_type, + dataset_links=dataset_links, name=f"single_subject_{subject_id}_wf", ) @@ -215,6 +216,7 @@ def init_subject_wf( func_only, output_dir, input_type, + dataset_links, name, ): """Organize the postprocessing pipeline for a single subject. @@ -250,6 +252,7 @@ def init_subject_wf( func_only=False, output_dir=".", input_type="fmriprep", + dataset_links={}, name="single_subject_sub-01_wf", ) @@ -302,13 +305,22 @@ def init_subject_wf( preproc_files = preproc_cifti_files if cifti else preproc_nifti_files inputnode = pe.Node( - niu.IdentityInterface(fields=['custom_confounds', 'mni_to_t1w', 't1w', 't1seg']), + niu.IdentityInterface( + fields=[ + 'custom_confounds', + 'mni_to_t1w', + 't1w', + 't1seg', + 'dataset_links', + ], + ), name='inputnode', ) inputnode.inputs.custom_confounds = custom_confounds inputnode.inputs.t1w = t1w inputnode.inputs.t1seg = t1wseg inputnode.inputs.mni_to_t1w = mni_to_t1w + inputnode.inputs.dataset_links = dataset_links workflow = Workflow(name=name) @@ -380,6 +392,7 @@ def init_subject_wf( output_dir=output_dir, t1w_to_mni=t1w_to_mni, input_type=input_type, + dataset_links=dataset_links, mem_gb=5) # RF: need to chnage memory size # send t1w and t1seg to anatomical workflow @@ -417,6 +430,7 @@ def init_subject_wf( fd_thresh=fd_thresh, output_dir=output_dir, mni_to_t1w=mni_to_t1w, + dataset_links=dataset_links, name=f"{'cifti' if cifti else 'nifti'}_postprocess_{i_run}_wf", ) diff --git a/xcp_d/workflow/bold.py b/xcp_d/workflow/bold.py index 15691f8f6..a784c5543 100644 --- a/xcp_d/workflow/bold.py +++ b/xcp_d/workflow/bold.py @@ -60,6 +60,7 @@ def init_boldpostprocess_wf( n_runs, mni_to_t1w, despike, + dataset_links, layout=None, name='bold_postprocess_wf', ): @@ -125,6 +126,7 @@ def init_boldpostprocess_wf( If True, run 3dDespike from AFNI layout : BIDSLayout object BIDS dataset layout + dataset_links %(name)s Inputs @@ -235,24 +237,6 @@ def init_boldpostprocess_wf( inputnode.inputs.custom_confounds = str(custom_confounds) inputnode.inputs.fmriprep_confounds_tsv = str(confounds_tsv) - outputnode = pe.Node( - niu.IdentityInterface( - fields=[ - 'processed_bold', - 'smoothed_bold', - 'alff_out', - 'smoothed_alff', - 'reho_out', - 'atlas_names', - 'timeseries', - 'correlations', - 'qc_file', - 'fd', - ], - ), - name='outputnode', - ) - mem_gbx = _create_mem_gb(bold_file) fcon_ts_wf = init_nifti_functional_connectivity_wf( @@ -263,7 +247,7 @@ def init_boldpostprocess_wf( omp_nthreads=omp_nthreads, ) - alff_compute_wf = init_compute_alff_wf(mem_gb=mem_gbx['timeseries'], + compute_alff_wf = init_compute_alff_wf(mem_gb=mem_gbx['timeseries'], TR=TR, lowpass=upper_bpf, highpass=lower_bpf, @@ -272,7 +256,7 @@ def init_boldpostprocess_wf( name="compute_alff_wf", omp_nthreads=omp_nthreads) - reho_compute_wf = init_3d_reho_wf(mem_gb=mem_gbx['timeseries'], + compute_reho_wf = init_3d_reho_wf(mem_gb=mem_gbx['timeseries'], name="afni_reho_wf", omp_nthreads=omp_nthreads) @@ -307,27 +291,27 @@ def init_boldpostprocess_wf( name="resd_smoothing_wf", omp_nthreads=omp_nthreads) - filtering_wf = pe.Node( + bandpass_filter_bold = pe.Node( FilteringData( TR=TR, lowpass=upper_bpf, highpass=lower_bpf, filter_order=bpf_order, bandpass_filter=bandpass_filter), - name="filtering_wf", + name="bandpass_filter_bold", mem_gb=mem_gbx['timeseries'], n_procs=omp_nthreads) - regression_wf = pe.Node( + denoise_bold = pe.Node( Regress(TR=TR, original_file=bold_file), - name="regression_wf", + name="denoise_bold", mem_gb=mem_gbx['timeseries'], n_procs=omp_nthreads) - interpolate_wf = pe.Node( + interpolate_censored_volumes = pe.Node( Interpolate(TR=TR), - name="interpolation_wf", + name="interpolate_censored_volumes", mem_gb=mem_gbx['timeseries'], n_procs=omp_nthreads) @@ -415,8 +399,7 @@ def init_boldpostprocess_wf( # Remove TR first: if dummytime > 0: rm_dummytime = pe.Node( - RemoveTR(initial_volumes_to_drop=initial_volumes_to_drop, - custom_confounds=custom_confounds), + RemoveTR(initial_volumes_to_drop=initial_volumes_to_drop), name="remove_dummy_time", mem_gb=0.1 * mem_gbx['timeseries']) workflow.connect([ @@ -458,105 +441,94 @@ def init_boldpostprocess_wf( workflow.connect([(censor_scrub, despike3d, [('bold_censored', 'in_file')])]) # Censor Scrub: workflow.connect([ - (despike3d, regression_wf, [ - ('out_file', 'in_file')]), - (inputnode, regression_wf, [('bold_mask', 'mask')]), - (censor_scrub, regression_wf, - [('fmriprep_confounds_censored', 'confounds'), - ('custom_confounds_censored', 'custom_confounds')])]) + (despike3d, denoise_bold, [('out_file', 'in_file')]), + (inputnode, denoise_bold, [('bold_mask', 'mask')]), + (censor_scrub, denoise_bold, [('fmriprep_confounds_censored', 'confounds'), + ('custom_confounds_censored', 'custom_confounds')]), + ]) else: # If we don't despike # regression workflow - workflow.connect([(inputnode, regression_wf, [('bold_mask', 'mask')]), - (censor_scrub, regression_wf, - [('bold_censored', 'in_file'), - ('fmriprep_confounds_censored', 'confounds'), - ('custom_confounds_censored', 'custom_confounds')])]) + workflow.connect([ + (inputnode, denoise_bold, [('bold_mask', 'mask')]), + (censor_scrub, denoise_bold, [('bold_censored', 'in_file'), + ('fmriprep_confounds_censored', 'confounds'), + ('custom_confounds_censored', 'custom_confounds')]), + ]) # interpolation workflow workflow.connect([ - (inputnode, interpolate_wf, [('bold_file', 'bold_file'), - ('bold_mask', 'mask_file')]), - (censor_scrub, interpolate_wf, [('tmask', 'tmask')]), - (regression_wf, interpolate_wf, [('res_file', 'in_file')]) + (inputnode, interpolate_censored_volumes, [('bold_file', 'bold_file'), + ('bold_mask', 'mask_file')]), + (censor_scrub, interpolate_censored_volumes, [('tmask', 'tmask')]), + (denoise_bold, interpolate_censored_volumes, [('res_file', 'in_file')]), ]) # add filtering workflow - workflow.connect([(inputnode, filtering_wf, [('bold_mask', 'mask')]), - (interpolate_wf, filtering_wf, [('bold_interpolated', - 'in_file')])]) + workflow.connect([ + (inputnode, bandpass_filter_bold, [('bold_mask', 'mask')]), + (interpolate_censored_volumes, bandpass_filter_bold, [('bold_interpolated', 'in_file')]), + ]) # residual smoothing - workflow.connect([(filtering_wf, resdsmoothing_wf, - [('filtered_file', 'inputnode.bold_file')])]) + workflow.connect([ + (bandpass_filter_bold, resdsmoothing_wf, [('filtered_file', 'inputnode.bold_file')]), + ]) # functional connect workflow workflow.connect([ (inputnode, fcon_ts_wf, [('bold_file', 'inputnode.bold_file'), ('ref_file', 'inputnode.ref_file')]), - (filtering_wf, fcon_ts_wf, [('filtered_file', 'inputnode.clean_bold')]) + (bandpass_filter_bold, fcon_ts_wf, [('filtered_file', 'inputnode.clean_bold')]), ]) # reho and alff workflow.connect([ - (inputnode, alff_compute_wf, [('bold_mask', 'inputnode.bold_mask')]), - (inputnode, reho_compute_wf, [('bold_mask', 'inputnode.bold_mask')]), - (filtering_wf, alff_compute_wf, [('filtered_file', 'inputnode.clean_bold') - ]), - (filtering_wf, reho_compute_wf, [('filtered_file', 'inputnode.clean_bold') - ]), + (inputnode, compute_alff_wf, [('bold_mask', 'inputnode.bold_mask')]), + (inputnode, compute_reho_wf, [('bold_mask', 'inputnode.bold_mask')]), + (bandpass_filter_bold, compute_alff_wf, [('filtered_file', 'inputnode.clean_bold')]), + (bandpass_filter_bold, compute_reho_wf, [('filtered_file', 'inputnode.clean_bold')]), ]) # qc report workflow.connect([ (inputnode, qcreport, [('bold_mask', 'mask_file')]), - (filtering_wf, qcreport, [('filtered_file', 'cleaned_file')]), + (bandpass_filter_bold, qcreport, [('filtered_file', 'cleaned_file')]), (censor_scrub, qcreport, [('tmask', 'tmask')]), (inputnode, resample_parc, [('ref_file', 'reference_image')]), (resample_parc, qcreport, [('output_image', 'seg_file')]), (resample_bold2T1w, qcreport, [('output_image', 'bold2T1w_mask')]), (resample_bold2MNI, qcreport, [('output_image', 'bold2temp_mask')]), - (qcreport, outputnode, [('qc_file', 'qc_file')]) - ]) - - # write to the outputnode, may be use in future - workflow.connect([ - (filtering_wf, outputnode, [('filtered_file', 'processed_bold')]), - (censor_scrub, outputnode, [('fd_timeseries', 'fd')]), - (resdsmoothing_wf, outputnode, [('outputnode.smoothed_bold', - 'smoothed_bold')]), - (alff_compute_wf, outputnode, [('outputnode.alff_out', 'alff_out'), - ('outputnode.smoothed_alff', - 'smoothed_alff')]), - (reho_compute_wf, outputnode, [('outputnode.reho_out', 'reho_out')]), - (fcon_ts_wf, outputnode, [('outputnode.atlas_names', 'atlas_names'), - ('outputnode.correlations', 'correlations'), - ('outputnode.timeseries', 'timeseries')]), ]) # write derivatives workflow.connect([ - (filtering_wf, write_derivative_wf, [('filtered_file', - 'inputnode.processed_bold')]), - (resdsmoothing_wf, write_derivative_wf, [('outputnode.smoothed_bold', - 'inputnode.smoothed_bold')]), - (censor_scrub, write_derivative_wf, [('fd_timeseries', - 'inputnode.fd')]), - (alff_compute_wf, write_derivative_wf, - [('outputnode.alff_out', 'inputnode.alff_out'), - ('outputnode.smoothed_alff', 'inputnode.smoothed_alff')]), - (reho_compute_wf, write_derivative_wf, [('outputnode.reho_out', - 'inputnode.reho_out')]), - (fcon_ts_wf, write_derivative_wf, [('outputnode.atlas_names', 'inputnode.atlas_names'), - ('outputnode.correlations', 'inputnode.correlations'), - ('outputnode.timeseries', 'inputnode.timeseries')]), + (bandpass_filter_bold, write_derivative_wf, [ + ('filtered_file', 'inputnode.processed_bold'), + ]), + (resdsmoothing_wf, write_derivative_wf, [ + ('outputnode.smoothed_bold', 'inputnode.smoothed_bold'), + ]), + (censor_scrub, write_derivative_wf, [('fd_timeseries', 'inputnode.fd')]), + (compute_alff_wf, write_derivative_wf, [ + ('outputnode.alff_out', 'inputnode.alff_out'), + ('outputnode.smoothed_alff', 'inputnode.smoothed_alff'), + ]), + (compute_reho_wf, write_derivative_wf, [('outputnode.reho_out', 'inputnode.reho_out')]), + (fcon_ts_wf, write_derivative_wf, [ + ('outputnode.atlas_names', 'inputnode.atlas_names'), + ('outputnode.correlations', 'inputnode.correlations'), + ('outputnode.timeseries', 'inputnode.timeseries'), + ]), (qcreport, write_derivative_wf, [('qc_file', 'inputnode.qc_file')]), ]) - functional_qc = pe.Node(FunctionalSummary(bold_file=bold_file, TR=TR), - name='qcsummary', - run_without_submitting=False, - mem_gb=mem_gbx['timeseries']) + functional_qc = pe.Node( + FunctionalSummary(bold_file=bold_file, TR=TR), + name='qcsummary', + run_without_submitting=False, + mem_gb=mem_gbx['timeseries'], + ) ds_report_qualitycontrol = pe.Node(DerivativesDataSink( base_directory=output_dir, @@ -610,22 +582,21 @@ def init_boldpostprocess_wf( (qcreport, functional_qc, [('qc_file', 'qc_file')]), (functional_qc, ds_report_qualitycontrol, [('out_report', 'in_file')]), (fcon_ts_wf, ds_report_connectivity, [('outputnode.connectplot', 'in_file')]), - (reho_compute_wf, ds_report_rehoplot, [('outputnode.rehohtml', 'in_file')]), - (alff_compute_wf, ds_report_afniplot, [('outputnode.alffhtml', 'in_file')]), + (compute_reho_wf, ds_report_rehoplot, [('outputnode.rehohtml', 'in_file')]), + (compute_alff_wf, ds_report_afniplot, [('outputnode.alffhtml', 'in_file')]), ]) - # exexetive summary workflow + # executive summary workflow workflow.connect([ (inputnode, executivesummary_wf, [('t1w', 'inputnode.t1w'), ('t1seg', 'inputnode.t1seg'), ('bold_file', 'inputnode.bold_file'), ('bold_mask', 'inputnode.mask')]), - (regression_wf, executivesummary_wf, [('res_file', 'inputnode.regressed_data') - ]), - (filtering_wf, executivesummary_wf, [('filtered_file', - 'inputnode.residual_data')]), - (censor_scrub, executivesummary_wf, [('fd_timeseries', - 'inputnode.fd')]), + (denoise_bold, executivesummary_wf, [('res_file', 'inputnode.regressed_data')]), + (bandpass_filter_bold, executivesummary_wf, [ + ('filtered_file', 'inputnode.residual_data'), + ]), + (censor_scrub, executivesummary_wf, [('fd_timeseries', 'inputnode.fd')]), ]) return workflow diff --git a/xcp_d/workflow/cifti.py b/xcp_d/workflow/cifti.py index 2e09d2511..bdd01307c 100644 --- a/xcp_d/workflow/cifti.py +++ b/xcp_d/workflow/cifti.py @@ -27,7 +27,7 @@ from xcp_d.workflow.postprocessing import init_resd_smoothing from xcp_d.workflow.restingstate import init_compute_alff_wf, init_surface_reho_wf -LOGGER = logging.getLogger('nipype.workflow') +LOGGER = logging.getLogger("nipype.workflow") @fill_doc @@ -115,6 +115,7 @@ def init_ciftipostprocess_wf( n_runs layout : BIDSLayout object BIDS dataset layout + dataset_links %(name)s Default is 'cifti_postprocess_wf'. @@ -209,25 +210,6 @@ def init_ciftipostprocess_wf( inputnode.inputs.bold_file = bold_file inputnode.inputs.fmriprep_confounds_tsv = confounds_tsv - outputnode = pe.Node( - niu.IdentityInterface( - fields=[ - 'processed_bold', - 'smoothed_bold', - 'alff_out', - 'smoothed_alff', - 'reho_lh', - 'reho_rh', - 'atlas_names', - 'timeseries', - 'correlations', - 'qc_file', - 'fd' - ], - ), - name='outputnode', - ) - mem_gbx = _create_mem_gb(bold_file) fcon_ts_wf = init_cifti_functional_connectivity_wf( @@ -235,7 +217,7 @@ def init_ciftipostprocess_wf( name='cifti_ts_con_wf', omp_nthreads=omp_nthreads) - alff_compute_wf = init_compute_alff_wf( + compute_alff_wf = init_compute_alff_wf( mem_gb=mem_gbx['timeseries'], TR=TR, lowpass=upper_bpf, @@ -245,7 +227,7 @@ def init_ciftipostprocess_wf( name="compute_alff_wf", omp_nthreads=omp_nthreads) - reho_compute_wf = init_surface_reho_wf( + compute_reho_wf = init_surface_reho_wf( mem_gb=mem_gbx['timeseries'], name="surface_reho_wf", omp_nthreads=omp_nthreads) @@ -282,25 +264,25 @@ def init_ciftipostprocess_wf( name="resd_smoothing_wf", omp_nthreads=omp_nthreads) - filtering_wf = pe.Node( + bandpass_filter_bold = pe.Node( FilteringData( TR=TR, lowpass=upper_bpf, highpass=lower_bpf, filter_order=bpf_order, bandpass_filter=bandpass_filter), - name="filtering_wf", + name="bandpass_filter_bold", mem_gb=mem_gbx['timeseries'], n_procs=omp_nthreads) - regression_wf = pe.Node( + denoise_bold = pe.Node( Regress(TR=TR, original_file=bold_file), - name="regression_wf", + name="denoise_bold", mem_gb=mem_gbx['timeseries'], n_procs=omp_nthreads) - interpolate_wf = pe.Node( + interpolate_censored_volumes = pe.Node( Interpolate(TR=TR), name="interpolation_wf", mem_gb=mem_gbx['timeseries'], @@ -361,87 +343,83 @@ def init_ciftipostprocess_wf( workflow.connect([(censor_scrub, despike3d, [('bold_censored', 'in_file')])]) # Censor Scrub: workflow.connect([ - (despike3d, regression_wf, [ + (despike3d, denoise_bold, [ ('des_file', 'in_file')]), - (censor_scrub, regression_wf, + (censor_scrub, denoise_bold, [('fmriprep_confounds_censored', 'confounds'), ('custom_confounds_censored', 'custom_confounds')])]) else: # If we don't despike # regression workflow - workflow.connect([(censor_scrub, regression_wf, + workflow.connect([(censor_scrub, denoise_bold, [('bold_censored', 'in_file'), ('fmriprep_confounds_censored', 'confounds'), ('custom_confounds_censored', 'custom_confounds')])]) # interpolation workflow workflow.connect([ - (inputnode, interpolate_wf, [('bold_file', 'bold_file')]), - (censor_scrub, interpolate_wf, [('tmask', 'tmask')]), - (regression_wf, interpolate_wf, [('res_file', 'in_file')]) + (inputnode, interpolate_censored_volumes, [('bold_file', 'bold_file')]), + (censor_scrub, interpolate_censored_volumes, [('tmask', 'tmask')]), + (denoise_bold, interpolate_censored_volumes, [('res_file', 'in_file')]) ]) # add filtering workflow - workflow.connect([(interpolate_wf, filtering_wf, [('bold_interpolated', - 'in_file')])]) + workflow.connect([ + (interpolate_censored_volumes, bandpass_filter_bold, [('bold_interpolated', 'in_file')]), + ]) # residual smoothing - workflow.connect([(filtering_wf, resdsmoothing_wf, - [('filtered_file', 'inputnode.bold_file')])]) + workflow.connect([ + (bandpass_filter_bold, resdsmoothing_wf, [('filtered_file', 'inputnode.bold_file')]), + ]) # functional connect workflow - workflow.connect([(filtering_wf, fcon_ts_wf, [('filtered_file', 'inputnode.clean_bold')])]) + workflow.connect([ + (bandpass_filter_bold, fcon_ts_wf, [('filtered_file', 'inputnode.clean_bold')]), + ]) # reho and alff - workflow.connect([(filtering_wf, alff_compute_wf, - [('filtered_file', 'inputnode.clean_bold')]), - (filtering_wf, reho_compute_wf, - [('filtered_file', 'inputnode.clean_bold')])]) - - # qc report workflow.connect([ - (filtering_wf, qcreport, [('filtered_file', 'cleaned_file')]), - (censor_scrub, qcreport, [('tmask', 'tmask')]), - (qcreport, outputnode, [('qc_file', 'qc_file')]) + (bandpass_filter_bold, compute_alff_wf, [('filtered_file', 'inputnode.clean_bold')]), + (bandpass_filter_bold, compute_reho_wf, [('filtered_file', 'inputnode.clean_bold')]), ]) + # qc report workflow.connect([ - (filtering_wf, outputnode, [('filtered_file', 'processed_bold')]), - (censor_scrub, outputnode, [('fd_timeseries', 'fd')]), - (resdsmoothing_wf, outputnode, [('outputnode.smoothed_bold', - 'smoothed_bold')]), - (alff_compute_wf, outputnode, [('outputnode.alff_out', 'alff_out')]), - (reho_compute_wf, outputnode, [('outputnode.lh_reho', 'reho_lh'), - ('outputnode.rh_reho', 'reho_rh')]), - (fcon_ts_wf, outputnode, [('outputnode.atlas_names', 'atlas_names'), - ('outputnode.correlations', 'correlations'), - ('outputnode.timeseries', 'timeseries')]), + (bandpass_filter_bold, qcreport, [('filtered_file', 'cleaned_file')]), + (censor_scrub, qcreport, [('tmask', 'tmask')]), ]) # write derivatives workflow.connect([ - (filtering_wf, write_derivative_wf, [('filtered_file', - 'inputnode.processed_bold')]), - (resdsmoothing_wf, write_derivative_wf, [('outputnode.smoothed_bold', - 'inputnode.smoothed_bold')]), - (censor_scrub, write_derivative_wf, [('fd_timeseries', - 'inputnode.fd')]), - (alff_compute_wf, write_derivative_wf, - [('outputnode.alff_out', 'inputnode.alff_out'), - ('outputnode.smoothed_alff', 'inputnode.smoothed_alff')]), - (reho_compute_wf, write_derivative_wf, - [('outputnode.rh_reho', 'inputnode.reho_rh'), - ('outputnode.lh_reho', 'inputnode.reho_lh')]), - (fcon_ts_wf, write_derivative_wf, - [('outputnode.atlas_names', 'inputnode.atlas_names'), - ('outputnode.correlations', 'inputnode.correlations'), - ('outputnode.timeseries', 'inputnode.timeseries')]), + (bandpass_filter_bold, write_derivative_wf, [ + ('filtered_file', 'inputnode.processed_bold'), + ]), + (resdsmoothing_wf, write_derivative_wf, [ + ('outputnode.smoothed_bold', 'inputnode.smoothed_bold'), + ]), + (censor_scrub, write_derivative_wf, [('fd_timeseries', 'inputnode.fd')]), + (compute_alff_wf, write_derivative_wf, [ + ('outputnode.alff_out', 'inputnode.alff_out'), + ('outputnode.smoothed_alff', 'inputnode.smoothed_alff'), + ]), + (compute_reho_wf, write_derivative_wf, [ + ('outputnode.rh_reho', 'inputnode.reho_rh'), + ('outputnode.lh_reho', 'inputnode.reho_lh'), + ]), + (fcon_ts_wf, write_derivative_wf, [ + ('outputnode.atlas_names', 'inputnode.atlas_names'), + ('outputnode.correlations', 'inputnode.correlations'), + ('outputnode.timeseries', 'inputnode.timeseries'), + ]), (qcreport, write_derivative_wf, [('qc_file', 'inputnode.qc_file')]) ]) - functional_qc = pe.Node(FunctionalSummary(bold_file=bold_file, TR=TR), - name='qcsummary', - run_without_submitting=True) + functional_qc = pe.Node( + FunctionalSummary(bold_file=bold_file, TR=TR), + name='qcsummary', + run_without_submitting=True, + ) ds_report_qualitycontrol = pe.Node(DerivativesDataSink( base_directory=output_dir, @@ -483,18 +461,16 @@ def init_ciftipostprocess_wf( (fcon_ts_wf, ds_report_connectivity, [('outputnode.connectplot', "in_file")]) ]) - # exexetive summary workflow + # executive summary workflow workflow.connect([ (inputnode, executivesummary_wf, [('t1w', 'inputnode.t1w'), ('t1seg', 'inputnode.t1seg'), - ('bold_file', 'inputnode.bold_file') - ]), - (regression_wf, executivesummary_wf, [('res_file', 'inputnode.regressed_data') - ]), - (filtering_wf, executivesummary_wf, [('filtered_file', - 'inputnode.residual_data')]), - (censor_scrub, executivesummary_wf, [('fd_timeseries', - 'inputnode.fd')]), + ('bold_file', 'inputnode.bold_file')]), + (denoise_bold, executivesummary_wf, [('res_file', 'inputnode.regressed_data')]), + (bandpass_filter_bold, executivesummary_wf, [ + ('filtered_file', 'inputnode.residual_data'), + ]), + (censor_scrub, executivesummary_wf, [('fd_timeseries', 'inputnode.fd')]), ]) return workflow diff --git a/xcp_d/workflow/outputs.py b/xcp_d/workflow/outputs.py index 85d6174f6..5595965af 100644 --- a/xcp_d/workflow/outputs.py +++ b/xcp_d/workflow/outputs.py @@ -121,7 +121,7 @@ def init_writederivatives_wf( smoothed_data_dictionary = {'FWHM': smoothing} # Separate dictionary for smoothing # Write out detivatives via DerivativesDataSink if not cifti: # if Nifti - write_derivative_cleandata_wf = pe.Node( + ds_denoised_bold = pe.Node( DerivativesDataSink( base_directory=output_dir, meta_dict=cleaned_data_dictionary, @@ -130,12 +130,12 @@ def init_writederivatives_wf( extension='.nii.gz', compression=True, ), - name='write_derivative_cleandata_wf', + name='ds_denoised_bold', run_without_submitting=True, mem_gb=2, ) - write_derivative_alff_wf = pe.Node( + ds_alff = pe.Node( DerivativesDataSink( base_directory=output_dir, source_file=bold_file, @@ -144,12 +144,12 @@ def init_writederivatives_wf( extension='.nii.gz', compression=True, ), - name='write_derivative_alff_wf', + name='ds_alff', run_without_submitting=True, mem_gb=1, ) - write_derivative_qcfile_wf = pe.Node( + ds_qc_file = pe.Node( DerivativesDataSink( base_directory=output_dir, source_file=bold_file, @@ -157,12 +157,12 @@ def init_writederivatives_wf( suffix='qc', extension='.csv', ), - name='write_derivative_qcfile_wf', + name='ds_qc_file', run_without_submitting=True, mem_gb=1, ) - timeseries_wf = pe.MapNode( + ds_timeseries = pe.MapNode( DerivativesDataSink( base_directory=output_dir, source_file=bold_file, @@ -170,12 +170,12 @@ def init_writederivatives_wf( suffix='timeseries', extension=".tsv", ), - name="timeseries_wf", + name="ds_timeseries", run_without_submitting=True, mem_gb=1, iterfield=["atlas", "in_file"], ) - correlations_wf = pe.MapNode( + ds_conmats = pe.MapNode( DerivativesDataSink( base_directory=output_dir, source_file=bold_file, @@ -184,13 +184,13 @@ def init_writederivatives_wf( suffix='conmat', extension=".tsv", ), - name="correlations_wf", + name="ds_conmats", run_without_submitting=True, mem_gb=1, iterfield=["atlas", "in_file"], ) - write_derivative_reho_wf = pe.Node( + ds_reho = pe.Node( DerivativesDataSink( base_directory=output_dir, source_file=bold_file, @@ -199,12 +199,12 @@ def init_writederivatives_wf( extension='.nii.gz', compression=True, ), - name='write_derivative_reho_wf', + name='ds_reho', run_without_submitting=True, mem_gb=1, ) - write_derivative_fd_wf = pe.Node( + ds_fd_motion = pe.Node( DerivativesDataSink( base_directory=output_dir, source_file=bold_file, @@ -213,16 +213,16 @@ def init_writederivatives_wf( suffix="motion", extension='.tsv', ), - name='write_derivative_fd_wf', + name='ds_fd_motion', run_without_submitting=True, mem_gb=1, ) - workflow.connect([(inputnode, write_derivative_reho_wf, [('reho_out', 'in_file')])]) + workflow.connect([(inputnode, ds_reho, [('reho_out', 'in_file')])]) if smoothing: # if smoothed # Write out detivatives via DerivativesDataSink - write_derivative_smoothcleandata_wf = pe.Node( + ds_denoised_smoothed_bold = pe.Node( DerivativesDataSink( base_directory=output_dir, meta_dict=smoothed_data_dictionary, @@ -231,12 +231,12 @@ def init_writederivatives_wf( extension='.nii.gz', compression=True, ), - name='write_derivative_smoothcleandata_wf', + name='ds_denoised_smoothed_bold', run_without_submitting=True, mem_gb=2, ) - write_derivative_smoothalff_wf = pe.Node( + ds_smoothed_alff = pe.Node( DerivativesDataSink( base_directory=output_dir, meta_dict=smoothed_data_dictionary, @@ -246,14 +246,14 @@ def init_writederivatives_wf( extension='.nii.gz', compression=True, ), - name='write_derivative_smoothalff_wf', + name='ds_smoothed_alff', run_without_submitting=True, mem_gb=1, ) else: # For cifti files # Write out derivatives via DerivativesDataSink - write_derivative_cleandata_wf = pe.Node( + ds_denoised_bold = pe.Node( DerivativesDataSink( base_directory=output_dir, meta_dict=cleaned_data_dictionary, @@ -263,12 +263,12 @@ def init_writederivatives_wf( density='91k', extension='.dtseries.nii', ), - name='write_derivative_cleandata_wf', + name='ds_denoised_bold', run_without_submitting=True, mem_gb=2, ) - write_derivative_alff_wf = pe.Node( + ds_alff = pe.Node( DerivativesDataSink( base_directory=output_dir, source_file=bold_file, @@ -278,12 +278,12 @@ def init_writederivatives_wf( suffix='alff', extension='.dscalar.nii', ), - name='write_derivative_alff_wf', + name='ds_alff', run_without_submitting=True, mem_gb=1, ) - write_derivative_qcfile_wf = pe.Node( + ds_qc_file = pe.Node( DerivativesDataSink( base_directory=output_dir, source_file=bold_file, @@ -292,12 +292,12 @@ def init_writederivatives_wf( suffix='qc', extension='.csv', ), - name='write_derivative_qcfile_wf', + name='ds_qc_file', run_without_submitting=True, mem_gb=1, ) - timeseries_wf = pe.MapNode( + ds_timeseries = pe.MapNode( DerivativesDataSink( base_directory=output_dir, source_file=bold_file, @@ -307,13 +307,13 @@ def init_writederivatives_wf( suffix="timeseries", extension='.ptseries.nii', ), - name="timeseries_wf", + name="ds_timeseries", run_without_submitting=True, mem_gb=1, iterfield=["atlas", "in_file"], ) - correlations_wf = pe.MapNode( + ds_conmats = pe.MapNode( DerivativesDataSink( base_directory=output_dir, source_file=bold_file, @@ -324,13 +324,13 @@ def init_writederivatives_wf( suffix='conmat', extension='.pconn.nii', ), - name="correlations_wf", + name="ds_conmats", run_without_submitting=True, mem_gb=1, iterfield=["atlas", "in_file"], ) - write_derivative_reholh_wf = pe.Node( + ds_reho_lh = pe.Node( DerivativesDataSink( base_directory=output_dir, source_file=bold_file, @@ -341,12 +341,12 @@ def init_writederivatives_wf( suffix='reho', extension='.shape.gii', ), - name='write_derivative_reholh_wf', + name='ds_reho_lh', run_without_submitting=True, mem_gb=1, ) - write_derivative_rehorh_wf = pe.Node( + ds_reho_rh = pe.Node( DerivativesDataSink( base_directory=output_dir, source_file=bold_file, @@ -357,12 +357,12 @@ def init_writederivatives_wf( suffix='reho', extension='.shape.gii', ), - name='write_derivative_rehorh_wf', + name='ds_reho_rh', run_without_submitting=True, mem_gb=1, ) - write_derivative_fd_wf = pe.Node( + ds_fd_motion = pe.Node( DerivativesDataSink( base_directory=output_dir, source_file=bold_file, @@ -371,19 +371,19 @@ def init_writederivatives_wf( suffix="motion", extension='.tsv', ), - name='write_derivative_fd_wf', + name='ds_fd_motion', run_without_submitting=True, mem_gb=1, ) workflow.connect([ - (inputnode, write_derivative_reholh_wf, [('reho_lh', 'in_file')]), - (inputnode, write_derivative_rehorh_wf, [('reho_rh', 'in_file')]), + (inputnode, ds_reho_lh, [('reho_lh', 'in_file')]), + (inputnode, ds_reho_rh, [('reho_rh', 'in_file')]), ]) if smoothing: # If smoothed # Write out detivatives via DerivativesDataSink - write_derivative_smoothcleandata_wf = pe.Node( + ds_denoised_smoothed_bold = pe.Node( DerivativesDataSink( base_directory=output_dir, meta_dict=smoothed_data_dictionary, @@ -394,12 +394,12 @@ def init_writederivatives_wf( extension='.dtseries.nii', check_hdr=False, ), - name='write_derivative_smoothcleandata_wf', + name='ds_denoised_smoothed_bold', run_without_submitting=True, mem_gb=2, ) - write_derivative_smoothalff_wf = pe.Node( + ds_smoothed_alff = pe.Node( DerivativesDataSink( base_directory=output_dir, meta_dict=smoothed_data_dictionary, @@ -411,24 +411,24 @@ def init_writederivatives_wf( extension='.dscalar.nii', check_hdr=False, ), - name='write_derivative_smoothalff_wf', + name='ds_smoothed_alff', run_without_submitting=True, mem_gb=1, ) workflow.connect([ - (inputnode, write_derivative_cleandata_wf, [('processed_bold', 'in_file')]), - (inputnode, write_derivative_alff_wf, [('alff_out', 'in_file')]), - (inputnode, write_derivative_qcfile_wf, [('qc_file', 'in_file')]), - (inputnode, timeseries_wf, [('timeseries', 'in_file'), ('atlas_names', 'atlas')]), - (inputnode, correlations_wf, [('correlations', 'in_file'), ('atlas_names', 'atlas')]), - (inputnode, write_derivative_fd_wf, [('fd', 'in_file')]), + (inputnode, ds_denoised_bold, [('processed_bold', 'in_file')]), + (inputnode, ds_alff, [('alff_out', 'in_file')]), + (inputnode, ds_qc_file, [('qc_file', 'in_file')]), + (inputnode, ds_timeseries, [('timeseries', 'in_file'), ('atlas_names', 'atlas')]), + (inputnode, ds_conmats, [('correlations', 'in_file'), ('atlas_names', 'atlas')]), + (inputnode, ds_fd_motion, [('fd', 'in_file')]), ]) if smoothing: workflow.connect([ - (inputnode, write_derivative_smoothcleandata_wf, [('smoothed_bold', 'in_file')]), - (inputnode, write_derivative_smoothalff_wf, [('smoothed_alff', 'in_file')]), + (inputnode, ds_denoised_smoothed_bold, [('smoothed_bold', 'in_file')]), + (inputnode, ds_smoothed_alff, [('smoothed_alff', 'in_file')]), ]) return workflow