Track provenance of derivatives #568

Closed · wants to merge 8 commits

Changes from all commits
64 changes: 63 additions & 1 deletion xcp_d/utils/bids.py
@@ -306,11 +306,27 @@ def write_dataset_description(fmri_dir, xcpd_dir):
        Path to the BIDS derivative dataset being ingested.
    xcpd_dir : str
        Path to the output xcp-d dataset.

    Returns
    -------
    usable_dataset_links : :obj:`dict`
        A dictionary containing any dataset links that should be used in xcpd derivatives.
        The keys of this dictionary are standardized strings referenced in xcpd code.
        The values are 2-element tuples: the first element is the DatasetLink key
        (which may vary depending on pre-existing DatasetLinks) and the second element
        is the path to be replaced in the derivative references.

        For example, ``{"preprocessed": ("preprocessed10", "../fmriprep/")}`` might be used to
        change ``Sources: ["/path/to/derivatives/fmriprep/sub-01/thing.nii.gz"]``
        (a meaningless path outside the user's filesystem)
        to ``Sources: ["bids:preprocessed10:sub-01/thing.nii.gz"]`` (a valid BIDS URI).
    """
    import json

    from xcp_d.__about__ import DOWNLOAD_URL, __version__

    MINIMUM_BIDS_VERSION = "1.8.0"

    orig_dset_description = os.path.join(fmri_dir, "dataset_description.json")
    if not os.path.isfile(orig_dset_description):
        dset_desc = {}
@@ -321,6 +337,14 @@ def write_dataset_description(fmri_dir, xcpd_dir):

    assert dset_desc["DatasetType"] == "derivative"

    # xcp-d uses BIDS URIs, which require BIDS version >= 1.8.0
    orig_bids_version = dset_desc.get("BIDSVersion", MINIMUM_BIDS_VERSION)
    # Compare Version objects directly; comparing the .public strings would be
    # a lexicographic comparison and would mis-order versions like 1.10.0.
    if Version(orig_bids_version) < Version(MINIMUM_BIDS_VERSION):
        LOGGER.warning(
            f"Updating BIDSVersion from {orig_bids_version} to {MINIMUM_BIDS_VERSION}."
        )
        dset_desc["BIDSVersion"] = MINIMUM_BIDS_VERSION

    # Update dataset description
    dset_desc["Name"] = "XCP-D: A Robust Postprocessing Pipeline of fMRI data"
    generated_by = dset_desc.get("GeneratedBy", [])
@@ -335,15 +359,53 @@ def write_dataset_description(fmri_dir, xcpd_dir):
dset_desc["GeneratedBy"] = generated_by
dset_desc["HowToAcknowledge"] = "Include the generated boilerplate in the methods section."

# Define a dataset link to preprocessed derivatives in BIDS URIs throughout xcpd derivatives.
dataset_links = dset_desc.get("DatasetLinks", {})
relative_path = os.path.relpath(fmri_dir, start=xcpd_dir)

# If undefined or already set to the same path, use "preprocessed"
if dataset_links.get("preprocessed", relative_path) == relative_path:
usable_dataset_links = {"preprocessed": ("preprocessed", relative_path)}
else:
LOGGER.info(
"DatasetLink 'preprocessed' is already defined. "
"Will use 'preprocessed([0-9]+)' instead."
)

# Find an unused (or pre-set) preprocessed([0-9]+) key to use.
key_counter = 0
while dataset_links.get(f"preprocessed{key_counter}", relative_path) != relative_path:
key_counter += 1

usable_dataset_links = {"preprocessed": (f"preprocessed{key_counter}", relative_path)}

dataset_links.update(
{usable_dataset_links["preprocessed"][0]: usable_dataset_links["preprocessed"][1]}
)

xcpd_dset_description = os.path.join(xcpd_dir, "dataset_description.json")
if os.path.isfile(xcpd_dset_description):
with open(xcpd_dset_description, "r") as fo:
old_dset_desc = json.load(fo)

old_version = old_dset_desc["GeneratedBy"][0]["Version"]
if Version(__version__).public != Version(old_version).public:
LOGGER.warning(f"Previous output generated by version {old_version} found.")
LOGGER.error(f"Previous output generated by version {old_version} found.")

else:
with open(xcpd_dset_description, "w") as fo:
json.dump(dset_desc, fo, indent=4, sort_keys=True)

return usable_dataset_links


def resolve_bids_uri(filepath, xcpd_dir, dataset_links):
    """Convert a path in the preprocessed dataset into a BIDS URI."""
    dataset_name, xcpd_dir_relpath = dataset_links["preprocessed"]

    preproc_dir = os.path.abspath(os.path.join(xcpd_dir, xcpd_dir_relpath))
    filepath_relative = os.path.relpath(filepath, start=preproc_dir)

    bids_uri = f"bids:{dataset_name}:{filepath_relative}"

    return bids_uri
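
For orientation, a minimal sketch of how these two helpers might be used together. The directory layout and file names below are hypothetical, not from this PR, and ``write_dataset_description`` also expects an existing dataset_description.json with ``DatasetType: "derivative"`` in ``fmri_dir``:

import os

from xcp_d.utils.bids import resolve_bids_uri, write_dataset_description

fmri_dir = "/data/derivatives/fmriprep"  # preprocessed input dataset (hypothetical)
xcpd_dir = "/data/derivatives/xcp_d"  # xcp-d output dataset (hypothetical)

# Writes or updates xcp_d's dataset_description.json and returns the usable
# links, e.g., {"preprocessed": ("preprocessed", "../fmriprep")}.
dataset_links = write_dataset_description(fmri_dir, xcpd_dir)

bids_uri = resolve_bids_uri(
    os.path.join(fmri_dir, "sub-01/func/sub-01_task-rest_bold.nii.gz"),
    xcpd_dir,
    dataset_links,
)
# bids_uri == "bids:preprocessed:sub-01/func/sub-01_task-rest_bold.nii.gz"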
18 changes: 16 additions & 2 deletions xcp_d/workflow/base.py
@@ -148,7 +148,7 @@ def init_xcpd_wf(
    xcpd_wf.base_dir = work_dir
    print(f"Begin the {name} workflow")

-    write_dataset_description(fmri_dir, os.path.join(output_dir, "xcp_d"))
+    dataset_links = write_dataset_description(fmri_dir, os.path.join(output_dir, "xcp_d"))

    for subject_id in subject_list:
        single_subj_wf = init_subject_wf(
@@ -176,6 +176,7 @@ def init_xcpd_wf(
            fd_thresh=fd_thresh,
            func_only=func_only,
            input_type=input_type,
            dataset_links=dataset_links,
            name=f"single_subject_{subject_id}_wf",
        )

@@ -215,6 +216,7 @@ def init_subject_wf(
    func_only,
    output_dir,
    input_type,
    dataset_links,
    name,
):
    """Organize the postprocessing pipeline for a single subject.
@@ -250,6 +252,7 @@ def init_subject_wf(
                func_only=False,
                output_dir=".",
                input_type="fmriprep",
                dataset_links={},
                name="single_subject_sub-01_wf",
            )

@@ -302,13 +305,22 @@ def init_subject_wf(
    preproc_files = preproc_cifti_files if cifti else preproc_nifti_files

    inputnode = pe.Node(
-        niu.IdentityInterface(fields=['custom_confounds', 'mni_to_t1w', 't1w', 't1seg']),
+        niu.IdentityInterface(
+            fields=[
+                'custom_confounds',
+                'mni_to_t1w',
+                't1w',
+                't1seg',
+                'dataset_links',
+            ],
+        ),
        name='inputnode',
    )
    inputnode.inputs.custom_confounds = custom_confounds
    inputnode.inputs.t1w = t1w
    inputnode.inputs.t1seg = t1wseg
    inputnode.inputs.mni_to_t1w = mni_to_t1w
    inputnode.inputs.dataset_links = dataset_links

    workflow = Workflow(name=name)

@@ -380,6 +392,7 @@ def init_subject_wf(
        output_dir=output_dir,
        t1w_to_mni=t1w_to_mni,
        input_type=input_type,
        dataset_links=dataset_links,
        mem_gb=5)  # RF: need to change memory size

    # send t1w and t1seg to anatomical workflow
@@ -417,6 +430,7 @@ def init_subject_wf(
            fd_thresh=fd_thresh,
            output_dir=output_dir,
            mni_to_t1w=mni_to_t1w,
            dataset_links=dataset_links,
            name=f"{'cifti' if cifti else 'nifti'}_postprocess_{i_run}_wf",
        )

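The net effect of the base.py changes is to thread ``dataset_links`` from ``write_dataset_description`` down into each subject workflow through an ``IdentityInterface`` input node. Below is a self-contained sketch of that pattern; the workflow and node names are hypothetical, and only the ``dataset_links`` plumbing itself comes from the diff:

from nipype.interfaces import utility as niu
from nipype.pipeline import engine as pe


def init_example_wf(dataset_links, name="example_wf"):
    """Carry dataset_links into a workflow on an IdentityInterface node."""
    workflow = pe.Workflow(name=name)
    inputnode = pe.Node(
        niu.IdentityInterface(fields=["dataset_links"]),
        name="inputnode",
    )
    inputnode.inputs.dataset_links = dataset_links
    workflow.add_nodes([inputnode])
    # Downstream nodes can connect to inputnode.dataset_links, e.g., to build
    # BIDS URIs for their Sources metadata with resolve_bids_uri.
    return workflow


wf = init_example_wf(dataset_links={"preprocessed": ("preprocessed", "../fmriprep")})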