From adbeb3ad3b8765027024c19893c2dcac65ca563a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Tourbier?= Date: Mon, 31 Jan 2022 11:18:15 +0100 Subject: [PATCH] ENH: Create main() and check_and_return_valid_nb_of_cores() in run.py --- run.py | 686 ++++++++++++++++++++++++++++++--------------------------- 1 file changed, 359 insertions(+), 327 deletions(-) diff --git a/run.py b/run.py index 3a3ab30cf..9a666d71a 100644 --- a/run.py +++ b/run.py @@ -120,6 +120,7 @@ def create_cmp_command(project, run_anat, run_dmri, run_fmri, number_of_threads= cmd.append(project.anat_config_file) else: print_error(" .. ERROR: anatomical pipeline is mandatory") + return 1 if run_dmri: cmd.append("--dwi_pipeline_config") @@ -229,7 +230,7 @@ def clean_cache(bids_root, debug=False): remove_files(os.path.join(bids_root, '.X99*')) -def run(command, env={}, log_filename={}): +def run(command, env=None, log_filename=None): """Execute a command via `subprocess.Popen`. Parameters @@ -249,343 +250,374 @@ def run(command, env={}, log_filename={}): A `subprocess.Popen` process """ merged_env = os.environ - merged_env.update(env) - - with open(log_filename, 'w+') as log: - process = subprocess.Popen(command, stdout=log, - stderr=log, shell=True, + if env is not None: # pragma: no cover + merged_env.update(env) + if log_filename is not None: + with open(log_filename, 'w+') as log: + process = subprocess.Popen(command, stdout=log, + stderr=log, shell=True, + env=merged_env) + else: # pragma: no cover + process = subprocess.Popen(command, shell=True, env=merged_env) - return process -# Parse script arguments -cmp_parser = parser.get() -args = cmp_parser.parse_args() - -print('> BIDS dataset: {}'.format(args.bids_dir)) - -# if not args.skip_bids_validator: -# run('bids-validator %s'%args.bids_dir) - -subjects_to_analyze = [] -# only for a subset of subjects -if args.participant_label: - subjects_to_analyze = args.participant_label -# for all subjects -else: - subject_dirs = glob(os.path.join(args.bids_dir, "sub-*")) - subjects_to_analyze = [subject_dir.split( - "-")[-1] for subject_dir in subject_dirs] - -print("> Subjects to analyze : {}".format(subjects_to_analyze)) - -# Derivatives directory creation if it does not exist -derivatives_dir = os.path.abspath(args.output_dir) -if not os.path.isdir(derivatives_dir): - os.makedirs(derivatives_dir) - -tools = [__cmp_directory__, __freesurfer_directory__, __nipype_directory__] - -for tool in tools: - tool_dir = os.path.join(args.output_dir, tool) - if not os.path.isdir(tool_dir): - os.makedirs(tool_dir) - -# Make sure freesurfer is happy with the license -print('> Set $FS_LICENSE which points to FreeSurfer license location (BIDS App)') - -if os.access(os.path.join('/bids_dir', 'code', 'license.txt'), os.F_OK): - os.environ['FS_LICENSE'] = os.path.join('/bids_dir', 'code', 'license.txt') -elif args.fs_license: - os.environ['FS_LICENSE'] = os.path.abspath(args.fs_license) -else: - print_error(" .. ERROR: Missing license.txt in code/ directory OR unspecified Freesurfer license with the option --fs_license ") - sys.exit(1) - -print(' .. 
INFO: $FS_LICENSE set to {}'.format(os.environ['FS_LICENSE']))
-
-# Get the number of available cores and keep one for light processes if possible
-max_number_of_cores = multiprocessing.cpu_count() - 1
-
-# handles case with one CPU available
-if max_number_of_cores < 1:
-    max_number_of_cores = 1
-
-# Setup number of subjects to be processed in parallel
-if args.number_of_participants_processed_in_parallel is not None:
-    parallel_number_of_subjects = int(
-        args.number_of_participants_processed_in_parallel)
-    if parallel_number_of_subjects > max_number_of_cores:
-        print(
-            ' * Number of subjects to be processed in parallel set to the maximal number of available cores ({})'.format(
-                max_number_of_cores))
-        print(
-            BColors.WARNING +
-            ' .. WARNING: the specified number of subjects to be processed in parallel ({})'.format(args.number_of_participants_processed_in_parallel) +
-            ' exceeds the number of available cores ({})'.format(max_number_of_cores) +
-            BColors.ENDC)
-        parallel_number_of_subjects = max_number_of_cores
-    elif parallel_number_of_subjects <= 0:
-        print(
-            ' * Number of subjects to be processed in parallel set to one (sequential run)')
-        print(
-            BColors.WARNING +
-            ' .. WARNING: the specified number of subjects to be processed in parallel ({}) '.format(args.number_of_participants_processed_in_parallel) +
-            'should be greater to 0' + BColors.ENDC)
-        parallel_number_of_subjects = 1
-    else:
-        print(' * Number of subjects to be processed in parallel set to {} (Total of cores available: {})'.format(
-            parallel_number_of_subjects, max_number_of_cores))
-else:
-    print(' * Number of subjects to be processed in parallel set to one (sequential run)')
-    parallel_number_of_subjects = 1
-
-# Setup number of threads to be used for multithreading by OpenMP
-if args.number_of_threads is not None:
-    number_of_threads = int(args.number_of_threads)
-    if parallel_number_of_subjects == 1:
-        if number_of_threads > max_number_of_cores:
-            print(' * Number of parallel threads set to the maximal number of available cores ({})'.format(
-                max_number_of_cores))
+def check_and_return_valid_nb_of_cores(args):
+    """Function that checks and returns a valid number of subjects to be processed in parallel and a valid maximal number of threads.
+
+    Parameters
+    ----------
+    args : argparse.Namespace
+        Namespace of arguments parsed by the BIDS App argument parser
+
+    Returns
+    -------
+    parallel_number_of_subjects : int
+        Valid number of subjects to be processed in parallel
+
+    number_of_threads : int
+        Valid maximal number of threads run in parallel for a single subject
+
+    """
+    # Get the number of available cores and keep one for light processes if possible
+    max_number_of_cores = multiprocessing.cpu_count() - 1
+
+    # handles case with one CPU available
+    if max_number_of_cores < 1:
+        max_number_of_cores = 1
+
+    # Setup number of subjects to be processed in parallel
+    if args.number_of_participants_processed_in_parallel is not None:
+        parallel_number_of_subjects = int(
+            args.number_of_participants_processed_in_parallel)
+        if parallel_number_of_subjects > max_number_of_cores:
+            print(
+                ' * Number of subjects to be processed in parallel set to the maximal ' +
+                f'number of available cores ({max_number_of_cores})')
+            print(
+                BColors.WARNING +
+                ' .. 
WARNING: the specified number of subjects to be processed in parallel ({})'.format( + args.number_of_participants_processed_in_parallel) + + f' exceeds the number of available cores ({max_number_of_cores})' + + BColors.ENDC) + parallel_number_of_subjects = max_number_of_cores + elif parallel_number_of_subjects <= 0: + print( + ' * Number of subjects to be processed in parallel set to one (sequential run)') print(BColors.WARNING + - ' .. WARNING: the specified number of pipeline processes executed in parallel ({}) '.format(args.number_of_threads) + - 'exceeds the number of available cores ({})'.format(max_number_of_cores) + - BColors.ENDC) - number_of_threads = max_number_of_cores - elif number_of_threads <= 0: - print(' * Number of parallel threads set to one (total of cores: {})'.format(max_number_of_cores)) - print(BColors.WARNING + ' .. WARNING: The specified of pipeline processes executed in parallel ({}) '.format(args.number_of_threads) + + ' .. WARNING: the specified number of subjects to be processed in parallel' + + f' ({args.number_of_participants_processed_in_parallel}) ' + 'should be greater to 0' + BColors.ENDC) - number_of_threads = 1 + parallel_number_of_subjects = 1 else: - print(' * Number of parallel threads set to {} (total of cores: {})'.format( - number_of_threads, max_number_of_cores)) + print(' * Number of subjects to be processed in parallel set to ' + + f'{parallel_number_of_subjects} (Total of cores available: {max_number_of_cores})') else: - # Make sure that the total number of threads used does not exceed the total number of available cores - # Otherwise parallelize only at the subject level - total_number_of_threads = parallel_number_of_subjects * number_of_threads - if total_number_of_threads > max_number_of_cores: - print(BColors.WARNING + - ' * Total number of cores used ' + - '(Subjects in parallel: {}, Threads in parallel: {}, Total: {})'.format(parallel_number_of_subjects, - number_of_threads, - total_number_of_threads) + - 'is greater than the number of available cores ({})'.format(max_number_of_cores) + BColors.ENDC) - number_of_threads = 1 - parallel_number_of_subjects = max_number_of_cores - print(BColors.WARNING + - ' .. WARNING: Processing will be ONLY parallelized at the subject level using {} cores.'.format(parallel_number_of_subjects) + - BColors.ENDC) -else: - print(' * Number of parallel threads set to one (total of cores: {})'.format(max_number_of_cores)) - number_of_threads = 1 - -# Set number of threads used by programs based on OpenMP multi-threading library -# This includes AFNI, Dipy, Freesurfer, FSL, MRtrix3. -# os.environ.update(OMP_NUM_THREADS=f'{number_of_threads}') -# print(' * OMP_NUM_THREADS set to {} (total of cores: {})'.format(os.environ['OMP_NUM_THREADS'], max_number_of_cores)) - -# Set number of threads used by ANTs if specified. 
-# Otherwise use the same as the number of OpenMP threads -if args.ants_number_of_threads is not None: - os.environ['ITK_GLOBAL_DEFAULT_NUMBER_OF_THREADS'] = f'{args.ants_number_of_threads}' - print(f' * ITK_GLOBAL_DEFAULT_NUMBER_OF_THREADS set to {os.environ["ITK_GLOBAL_DEFAULT_NUMBER_OF_THREADS"]}') - -# Initialize random generator for enhanced reproducibility -# Numpy needs to be imported after setting the different multi-threading environment variable -# See https://stackoverflow.com/questions/30791550/limit-number-of-threads-in-numpy for more details -# noinspection PyPep8 -numpy.random.seed(1234) - -# Set random generator seed of MRtrix if specified -if args.mrtrix_random_seed is not None: - os.environ['MRTRIX_RNG_SEED'] = f'{args.mrtrix_random_seed}' - print(f' * MRTRIX_RNG_SEED set to {os.environ["MRTRIX_RNG_SEED"]}') - -# Set random generator seed of ANTs if specified -if args.ants_random_seed is not None: - os.environ['ANTS_RANDOM_SEED'] = f'{args.ants_random_seed}' - print(f' * ANTS_RANDOM_SEED set to {os.environ["ANTS_RANDOM_SEED"]}') - -# TODO: Implement log for subject(_session) -# with open(log_filename, 'w+') as log: -# proc = Popen(cmd, stdout=log, stderr=log, cwd=os.path.join(self.bids_root,'derivatives')) - -# running participant level -if args.analysis_level == "participant": - - # report_app_run_to_google_analytics() - if args.notrack is not True: - report_usage('BIDS App', 'Run', __version__) - - maxprocs = parallel_number_of_subjects - processes = [] - - # find all T1s and skullstrip them - for subject_label in subjects_to_analyze: - - project = ProjectInfo() - project.base_directory = args.bids_dir - project.output_directory = args.output_dir - - project.subjects = ['sub-{}'.format(label) - for label in subjects_to_analyze] - project.subject = 'sub-{}'.format(subject_label) - print('> Process subject {}'.format(project.subject)) - - if args.session_label is not None: - print("> Sessions specified by input args : {}".format( - args.session_label)) - subject_session_labels = args.session_label - project.subject_sessions = [ - 'ses-{}'.format(subject_session_label) for subject_session_label in subject_session_labels - ] - # Check if session exists - for session in project.subject_sessions: - session_path = os.path.join( - args.bids_dir, project.subject, session) - if not os.path.exists(session_path): - print_error(f' .. ERROR: The directory {session_path} corresponding ' - f'to the session {session.split("-")[-1]} ' - "specified by --session_label input flag DOES NOT exist.") - sys.exit(1) - else: - print(f' .. INFO: The directory {session_path} corresponding ' - f'to the session {session.split("-")[-1]} ' - 'specified by --session_label input flag DOES exist.') - else: - # Check if multiple session (sub-XX/ses-YY/anat/... structure or sub-XX/anat.. structure?) 

-            subject_session_dirs = glob(os.path.join(
-                args.bids_dir, project.subject, "ses-*"))
-            project.subject_sessions = [
-                'ses-{}'.format(subject_session_dir.split("-")[-1]) for subject_session_dir in subject_session_dirs
-            ]
-
-        if len(project.subject_sessions) > 0:  # Session structure
-            print("> Sessions to analyze : {}".format(project.subject_sessions))
+        print(' * Number of subjects to be processed in parallel set to one (sequential run)')
+        parallel_number_of_subjects = 1
+
+    # Setup number of threads to be used for multithreading by OpenMP
+    if args.number_of_threads is not None:
+        number_of_threads = int(args.number_of_threads)
+        if parallel_number_of_subjects == 1:
+            if number_of_threads > max_number_of_cores:
+                print(' * Number of parallel threads set to the maximal ' +
+                      f'number of available cores ({max_number_of_cores})')
+                print(BColors.WARNING +
+                      ' .. WARNING: the specified number of pipeline processes ' +
+                      f'executed in parallel ({args.number_of_threads}) ' +
+                      f'exceeds the number of available cores ({max_number_of_cores})' +
+                      BColors.ENDC)
+                number_of_threads = max_number_of_cores
+            elif number_of_threads <= 0:
+                print(f' * Number of parallel threads set to one (total of cores: {max_number_of_cores})')
+                print(BColors.WARNING +
+                      f' .. WARNING: The specified number of pipeline processes executed in parallel ({args.number_of_threads}) ' +
+                      'should be greater than 0' + BColors.ENDC)
+                number_of_threads = 1
+            else:
+                print(f' * Number of parallel threads set to {number_of_threads} (total of cores: {max_number_of_cores})')
-        else:
-            project.subject_sessions = ['']
-
-        for session in project.subject_sessions:
-
-            if not args.coverage:
-                while len(processes) == maxprocs:
-                    manage_processes(processes)
-
-            if session != "":
-                print('> Process session {}'.format(session))
-
-            project.subject_session = session
-
-            # Derivatives folder creation
-            for tool in tools:
-                if project.subject_session == "":
-                    derivatives_dir = os.path.join(args.output_dir, tool, project.subject)
-                elif project.subject_session != "" and tool == __freesurfer_directory__:
-                    derivatives_dir = os.path.join(args.output_dir, tool,
-                                                   f'{project.subject}_{project.subject_session}')
-                elif project.subject_session != "" and tool != __freesurfer_directory__:
-                    derivatives_dir = os.path.join(args.output_dir, tool,
-                                                   project.subject, project.subject_session)
-                if not os.path.isdir(derivatives_dir):
-                    os.makedirs(derivatives_dir)
-
-            run_anat = False
-            run_dmri = False
-            run_fmri = False
-
-            if args.anat_pipeline_config is not None:
-                if check_configuration_format(args.anat_pipeline_config) == '.ini':
-                    anat_pipeline_config = convert_config_ini_2_json(args.anat_pipeline_config)
-                else:
-                    anat_pipeline_config = args.anat_pipeline_config
-                project.anat_config_file = create_subject_configuration_from_ref(
-                    project, anat_pipeline_config, 'anatomical'
-                )
-                run_anat = True
-                print(f" ... Anatomical config created : {project.anat_config_file}")
-            if args.dwi_pipeline_config is not None:
-                if check_configuration_format(args.dwi_pipeline_config) == '.ini':
-                    dwi_pipeline_config = convert_config_ini_2_json(args.dwi_pipeline_config)
-                else:
-                    dwi_pipeline_config = args.dwi_pipeline_config
-                project.dmri_config_file = create_subject_configuration_from_ref(
-                    project, dwi_pipeline_config, 'diffusion'
-                )
-                run_dmri = True
-                print(f" ... 
Diffusion config created : {project.dmri_config_file}")
-            if args.func_pipeline_config is not None:
-                if check_configuration_format(args.func_pipeline_config) == '.ini':
-                    func_pipeline_config = convert_config_ini_2_json(args.func_pipeline_config)
-                else:
-                    func_pipeline_config = args.func_pipeline_config
-                project.fmri_config_file = create_subject_configuration_from_ref(
-                    project, func_pipeline_config, 'fMRI'
-                )
-                run_fmri = True
-                print(f" ... fMRI config created : {project.fmri_config_file}")
+        else:
+            # Make sure that the total number of threads used does not exceed the total number of available cores
+            # Otherwise parallelize only at the subject level
+            total_number_of_threads = parallel_number_of_subjects * number_of_threads
+            if total_number_of_threads > max_number_of_cores:
+                print(BColors.WARNING +
+                      ' * Total number of cores used ' +
+                      f'(Subjects in parallel: {parallel_number_of_subjects}, ' +
+                      f'Threads in parallel: {number_of_threads}, ' +
+                      f'Total: {total_number_of_threads}) ' +
+                      f'is greater than the number of available cores ({max_number_of_cores})' +
+                      BColors.ENDC)
+                number_of_threads = 1
+                parallel_number_of_subjects = max_number_of_cores
+                print(BColors.WARNING +
+                      ' .. WARNING: Processing will be ONLY parallelized at the subject level ' +
+                      f'using {parallel_number_of_subjects} cores.' +
+                      BColors.ENDC)
+    else:
+        print(f' * Number of parallel threads set to one (total of cores: {max_number_of_cores})')
+        number_of_threads = 1
-
-            if args.anat_pipeline_config is not None:
-                print(" .. INFO: Running pipelines : ")
-                print(" - Anatomical MRI (segmentation and parcellation)")
-
-            if args.dwi_pipeline_config is not None:
-                print(" - Diffusion MRI (structural connectivity matrices)")
+
+    return parallel_number_of_subjects, number_of_threads
-
-            if args.func_pipeline_config is not None:
-                print(" - fMRI (functional connectivity matrices)")
-
-            if args.coverage:
-                if run_anat:
-                    run_individual(project.base_directory,
-                                   project.output_directory,
-                                   project.subject,
-                                   project.subject_session,
-                                   anat_pipeline_config=project.anat_config_file,
-                                   dwi_pipeline_config=(None
-                                                        if not run_dmri
-                                                        else project.dmri_config_file),
-                                   func_pipeline_config=(None
-                                                         if not run_fmri
-                                                         else project.fmri_config_file),
-                                   number_of_threads=number_of_threads)
-            else:
-                cmd = create_cmp_command(project=project,
-                                         run_anat=run_anat,
-                                         run_dmri=run_dmri,
-                                         run_fmri=run_fmri,
-                                         number_of_threads=number_of_threads)
-                print_blue("... 
cmd : {}".format(cmd)) - if project.subject_session != "": - log_file = '{}_{}_log.txt'.format(project.subject, - project.subject_session) +def main(): + """Main function of the BIDS App entrypoint script.""" + # Parse script arguments + cmp_parser = parser.get() + args = cmp_parser.parse_args() + + print('> BIDS dataset: {}'.format(args.bids_dir)) + + # if not args.skip_bids_validator: + # run('bids-validator %s'%args.bids_dir) + + if args.participant_label: # only for a subset of subjects + subjects_to_analyze = args.participant_label + else: # for all subjects + subject_dirs = glob(os.path.join(args.bids_dir, "sub-*")) + subjects_to_analyze = [subject_dir.split( + "-")[-1] for subject_dir in subject_dirs] + + print("> Subjects to analyze : {}".format(subjects_to_analyze)) + + # Derivatives directory creation if it does not exist + derivatives_dir = os.path.abspath(args.output_dir) + if not os.path.isdir(derivatives_dir): + os.makedirs(derivatives_dir) + + tools = [__cmp_directory__, __freesurfer_directory__, __nipype_directory__] + + for tool in tools: + tool_dir = os.path.join(args.output_dir, tool) + if not os.path.isdir(tool_dir): + os.makedirs(tool_dir) + + # Make sure freesurfer is happy with the license + print('> Set $FS_LICENSE which points to FreeSurfer license location (BIDS App)') + + if os.access(os.path.join('/bids_dir', 'code', 'license.txt'), os.F_OK): + os.environ['FS_LICENSE'] = os.path.join('/bids_dir', 'code', 'license.txt') + elif args.fs_license: + os.environ['FS_LICENSE'] = os.path.abspath(args.fs_license) + else: + print_error(" .. ERROR: Missing license.txt in code/ directory OR unspecified Freesurfer license with the option --fs_license ") + return 1 + + print(' .. INFO: $FS_LICENSE set to {}'.format(os.environ['FS_LICENSE'])) + + parallel_number_of_subjects, number_of_threads = check_and_return_valid_nb_of_cores(args) + + # Set number of threads used by programs based on OpenMP multi-threading library + # This includes AFNI, Dipy, Freesurfer, FSL, MRtrix3. + # os.environ.update(OMP_NUM_THREADS=f'{number_of_threads}') + # print(' * OMP_NUM_THREADS set to {} (total of cores: {})'.format(os.environ['OMP_NUM_THREADS'], max_number_of_cores)) + + # Set number of threads used by ANTs if specified. 
+ # Otherwise use the same as the number of OpenMP threads + if args.ants_number_of_threads is not None: + os.environ['ITK_GLOBAL_DEFAULT_NUMBER_OF_THREADS'] = f'{args.ants_number_of_threads}' + print(f' * ITK_GLOBAL_DEFAULT_NUMBER_OF_THREADS set to {os.environ["ITK_GLOBAL_DEFAULT_NUMBER_OF_THREADS"]}') + + # Initialize random generator for enhanced reproducibility + # Numpy needs to be imported after setting the different multi-threading environment variable + # See https://stackoverflow.com/questions/30791550/limit-number-of-threads-in-numpy for more details + # noinspection PyPep8 + numpy.random.seed(1234) + + # Set random generator seed of MRtrix if specified + if args.mrtrix_random_seed is not None: + os.environ['MRTRIX_RNG_SEED'] = f'{args.mrtrix_random_seed}' + print(f' * MRTRIX_RNG_SEED set to {os.environ["MRTRIX_RNG_SEED"]}') + + # Set random generator seed of ANTs if specified + if args.ants_random_seed is not None: + os.environ['ANTS_RANDOM_SEED'] = f'{args.ants_random_seed}' + print(f' * ANTS_RANDOM_SEED set to {os.environ["ANTS_RANDOM_SEED"]}') + + # running participant level + if args.analysis_level == "participant": + + # report_app_run_to_google_analytics() + if args.notrack is not True: + report_usage('BIDS App', 'Run', __version__) + + maxprocs = parallel_number_of_subjects + processes = [] + + # find all T1s and skullstrip them + for subject_label in subjects_to_analyze: + + project = ProjectInfo() + project.base_directory = args.bids_dir + project.output_directory = args.output_dir + + project.subjects = ['sub-{}'.format(label) + for label in subjects_to_analyze] + project.subject = 'sub-{}'.format(subject_label) + print('> Process subject {}'.format(project.subject)) + + if args.session_label is not None: + print("> Sessions specified by input args : {}".format( + args.session_label)) + subject_session_labels = args.session_label + project.subject_sessions = [ + 'ses-{}'.format(subject_session_label) for subject_session_label in subject_session_labels + ] + # Check if session exists + for session in project.subject_sessions: + session_path = os.path.join( + args.bids_dir, project.subject, session) + if not os.path.exists(session_path): + print_error(f' .. ERROR: The directory {session_path} corresponding ' + f'to the session {session.split("-")[-1]} ' + "specified by --session_label input flag DOES NOT exist.") + return 1 else: - log_file = '{}_log.txt'.format(project.subject) - proc = run(command=cmd, env={}, - log_filename=os.path.join(project.output_directory, __cmp_directory__, - project.subject, project.subject_session, - '{}_{}_log.txt'.format(project.subject, - project.subject_session))) - processes.append(proc) + print(f' .. INFO: The directory {session_path} corresponding ' + f'to the session {session.split("-")[-1]} ' + 'specified by --session_label input flag DOES exist.') else: - print("... 
Error: at least anatomical configuration file " - "has to be specified (--anat_pipeline_config)") - - if not args.coverage: - while len(processes) > 0: - manage_processes(processes) - - clean_cache(args.bids_dir) - -# running group level; ultimately it will compute average connectivity matrices -# elif args.analysis_level == "group": -# brain_sizes = [] -# for subject_label in subjects_to_analyze: -# for brain_file in glob(os.path.join(args.output_dir, "sub-%s*.nii*"%subject_label)): -# data = nibabel.load(brain_file).get_data() -# # calcualte average mask size in voxels -# brain_sizes.append((data != 0).sum()) -# -# with open(os.path.join(args.output_dir, "avg_brain_size.txt"), 'w') as fp: -# fp.write("Average brain size is %g voxels"%numpy.array(brain_sizes).mean()) + # Check if multiple session (sub-XX/ses-YY/anat/... structure or sub-XX/anat.. structure?) + subject_session_dirs = glob(os.path.join( + args.bids_dir, project.subject, "ses-*")) + project.subject_sessions = [ + 'ses-{}'.format(subject_session_dir.split("-")[-1]) for subject_session_dir in subject_session_dirs + ] + + if len(project.subject_sessions) > 0: # Session structure + print("> Sessions to analyze : {}".format(project.subject_sessions)) + else: + project.subject_sessions = [''] + + for session in project.subject_sessions: + + if not args.coverage: + while len(processes) == maxprocs: + manage_processes(processes) + + if session != "": + print('> Process session {}'.format(session)) + + project.subject_session = session + + # Derivatives folder creation + for tool in tools: + if project.subject_session == "": + derivatives_dir = os.path.join(args.output_dir, tool, project.subject) + elif project.subject_session != "" and tool == __freesurfer_directory__: + derivatives_dir = os.path.join(args.output_dir, tool, + f'{project.subject}_{project.subject_session}') + elif project.subject_session != "" and tool != __freesurfer_directory__: + derivatives_dir = os.path.join(args.output_dir, tool, + project.subject, project.subject_session) + if not os.path.isdir(derivatives_dir): + os.makedirs(derivatives_dir) + + run_anat = False + run_dmri = False + run_fmri = False + + if args.anat_pipeline_config is not None: + if check_configuration_format(args.anat_pipeline_config) == '.ini': + anat_pipeline_config = convert_config_ini_2_json(args.anat_pipeline_config) + else: + anat_pipeline_config = args.anat_pipeline_config + project.anat_config_file = create_subject_configuration_from_ref( + project, anat_pipeline_config, 'anatomical' + ) + run_anat = True + print(f"\t ... Anatomical config created : {project.anat_config_file}") + if args.dwi_pipeline_config is not None: + if check_configuration_format(args.dwi_pipeline_config) == '.ini': + dwi_pipeline_config = convert_config_ini_2_json(args.dwi_pipeline_config) + else: + dwi_pipeline_config = args.dwi_pipeline_config + project.dmri_config_file = create_subject_configuration_from_ref( + project, dwi_pipeline_config, 'diffusion' + ) + run_dmri = True + print(f"\t ... Diffusion config created : {project.dmri_config_file}") + if args.func_pipeline_config is not None: + if check_configuration_format(args.func_pipeline_config) == '.ini': + func_pipeline_config = convert_config_ini_2_json(args.func_pipeline_config) + else: + func_pipeline_config = args.func_pipeline_config + project.fmri_config_file = create_subject_configuration_from_ref( + project, func_pipeline_config, 'fMRI' + ) + run_fmri = True + print(f"\t ... 
fMRI config created : {project.fmri_config_file}")
+
+                if args.anat_pipeline_config is not None:
+                    print(" .. INFO: Running pipelines : ")
+                    print("\t\t- Anatomical MRI (segmentation and parcellation)")
+
+                    if args.dwi_pipeline_config is not None:
+                        print("\t\t- Diffusion MRI (structural connectivity matrices)")
+
+                    if args.func_pipeline_config is not None:
+                        print("\t\t- fMRI (functional connectivity matrices)")
+
+                    if args.coverage:
+                        if run_anat:
+                            run_individual(project.base_directory,
+                                           project.output_directory,
+                                           project.subject,
+                                           project.subject_session,
+                                           anat_pipeline_config=project.anat_config_file,
+                                           dwi_pipeline_config=(None
+                                                                if not run_dmri
+                                                                else project.dmri_config_file),
+                                           func_pipeline_config=(None
+                                                                 if not run_fmri
+                                                                 else project.fmri_config_file),
+                                           number_of_threads=number_of_threads)
+                    else:
+                        cmd = create_cmp_command(project=project,
+                                                 run_anat=run_anat,
+                                                 run_dmri=run_dmri,
+                                                 run_fmri=run_fmri,
+                                                 number_of_threads=number_of_threads)
+                        print_blue("... cmd : {}".format(cmd))
+                        if project.subject_session != "":
+                            log_file = '{}_{}_log.txt'.format(project.subject,
+                                                              project.subject_session)
+                        else:
+                            log_file = '{}_log.txt'.format(project.subject)
+                        proc = run(command=cmd, env=None,
+                                   log_filename=os.path.join(project.output_directory, __cmp_directory__,
+                                                             project.subject, project.subject_session,
+                                                             log_file)
+                                   )
+                        processes.append(proc)
+                else:
+                    print("... Error: at least anatomical configuration file "
+                          "has to be specified (--anat_pipeline_config)")
+                    return 1
+
+        if not args.coverage:
+            while len(processes) > 0:
+                manage_processes(processes)
+
+        clean_cache(args.bids_dir)
+
+    # running group level; ultimately it will compute average connectivity matrices
+    # elif args.analysis_level == "group":
+    #     brain_sizes = []
+    #     for subject_label in subjects_to_analyze:
+    #         for brain_file in glob(os.path.join(args.output_dir, "sub-%s*.nii*"%subject_label)):
+    #             data = nibabel.load(brain_file).get_data()
+    #             # calculate average mask size in voxels
+    #             brain_sizes.append((data != 0).sum())
+    #
+    #     with open(os.path.join(args.output_dir, "avg_brain_size.txt"), 'w') as fp:
+    #         fp.write("Average brain size is %g voxels"%numpy.array(brain_sizes).mean())
+
+    return 0
+
+
+if __name__ == '__main__':
+    sys.exit(main())
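
A minimal sketch (not part of the patch) of how the extracted check_and_return_valid_nb_of_cores() could now be exercised in isolation. It assumes run.py is importable as a module and imports multiprocessing at its top level; the 8-core host below is hypothetical:

    from types import SimpleNamespace
    from unittest import mock

    import run  # the BIDS App entrypoint module refactored above

    # Hypothetical request: 2 subjects in parallel, 4 OpenMP threads each.
    args = SimpleNamespace(
        number_of_participants_processed_in_parallel=2,
        number_of_threads=4,
    )

    # Pretend the host exposes 8 CPU cores; the helper keeps one core free,
    # so max_number_of_cores == 7.
    with mock.patch("run.multiprocessing.cpu_count", return_value=8):
        nb_subjects, nb_threads = run.check_and_return_valid_nb_of_cores(args)

    # 2 subjects x 4 threads = 8 > 7 available cores, so the helper falls back
    # to subject-level parallelism only and returns (7, 1).
    print(nb_subjects, nb_threads)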