diff --git a/src/python/ensembl/tools/anno/transcriptomic_annotation/star.py b/src/python/ensembl/tools/anno/transcriptomic_annotation/star.py
index b9d7f42..56c181a 100644
--- a/src/python/ensembl/tools/anno/transcriptomic_annotation/star.py
+++ b/src/python/ensembl/tools/anno/transcriptomic_annotation/star.py
@@ -33,7 +33,7 @@
 import re
 import shutil
 import subprocess
-from typing import List
+from typing import Dict, List
 
 from ensembl.tools.anno.utils._utils import (
     check_exe,
@@ -43,7 +43,7 @@
 )
 
 
-def run_star(
+def run_star(  # pylint:disable=too-many-branches
     genome_file: Path,
     output_dir: Path,
     short_read_fastq_dir: Path,
@@ -51,6 +51,11 @@ def run_star(
     trim_fastq: bool = False,
     max_reads_per_sample: int = 0,
     max_intron_length: int = 100000,
+    subsample_read_limit: int = 100000000,
+    subsample_percentage: float = 0.25,
+    sampling_via_read_limit: bool = False,
+    sampling_via_percentage: bool = False,
+    sampling_via_read_limit_percentage: bool = False,
     num_threads: int = 1,
     star_bin: Path = Path("star"),
     samtools_bin: Path = Path("samtools"),
@@ -72,6 +77,20 @@
     :type max_reads_per_sample: int, default 0
     :param max_intron_length: The maximum intron size for alignments. Defaults to 100000.
     :type max_intron_length: int, default 100000
+    :param subsample_read_limit: Maximum number of reads to subsample. Defaults to 100000000.
+    :type subsample_read_limit: int, default 100000000
+    :param subsample_percentage: Maximum percentage of reads to subsample (0 to 1). Defaults to 0.25.
+    :type subsample_percentage: float, default 0.25
+    :param sampling_via_read_limit: If True, subsample the input fastq files \
+        using the --subsample_read_limit value.
+    :type sampling_via_read_limit: bool, default False
+    :param sampling_via_percentage: If True, subsample the input fastq files \
+        using the --subsample_percentage value.
+    :type sampling_via_percentage: bool, default False
+    :param sampling_via_read_limit_percentage: If True, subsample the input fastq files \
+        using both --subsample_read_limit and --subsample_percentage; \
+        the lower of the two read counts is used.
+    :type sampling_via_read_limit_percentage: bool, default False
     :param num_threads: Number of available threads.
     :type num_threads: int, default 1
     :param star_bin: Software path.
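
The interaction of the new subsampling parameters is easiest to see from a call site. A minimal usage sketch, not part of the patch itself: the paths, read limit and thread count are hypothetical, and the import path assumes the package layout of this repository.

```python
from pathlib import Path

from ensembl.tools.anno.transcriptomic_annotation.star import run_star

# Subsample every sample to at most 50M reads or 25% of its reads (whichever is lower)
# before aligning with STAR. Subsampling only runs when max_reads_per_sample is non-zero,
# because the patched run_star uses that argument as the on/off switch.
run_star(
    genome_file=Path("genome.fa"),         # hypothetical input path
    output_dir=Path("star_output"),        # hypothetical output directory
    short_read_fastq_dir=Path("fastq"),    # hypothetical fastq directory
    max_reads_per_sample=50_000_000,
    subsample_read_limit=50_000_000,
    subsample_percentage=0.25,
    sampling_via_read_limit_percentage=True,
    num_threads=8,
)
```
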
@@ -107,7 +126,7 @@ def run_star(
         return
 
     star_index_file = star_dir / "SAindex"
-    fastq_file_list = []
+    fastq_file_list: List[Path] = []
     file_types = ("*.fastq", "*.fq", "*.fastq.gz", "*.fq.gz")
     fastq_file_list = [
         path for file_type in file_types for path in Path(short_read_fastq_dir).rglob(file_type)
@@ -119,14 +138,26 @@
     #    fastq_file_list.extend(glob.glob(os.path.join(short_read_fastq_dir, file_type)))
 
     # Get list of paired paths
-    fastq_file_list = _create_paired_paths(fastq_file_list)
+    paired_fastq_file_list = _create_paired_paths(fastq_file_list)
     # Subsamples in parallel if there's a value set
     if max_reads_per_sample:
-        subsample_transcriptomic_data(fastq_file_list)
+        paired_fastq_file_list_sub: List[List[Path]] = []
+        for paired_fastq_files in paired_fastq_file_list:
+            paired_fastq_files_sub = subsample_transcriptomic_data(
+                paired_fastq_files,
+                subsample_read_limit,
+                subsample_percentage,
+                sampling_via_read_limit,
+                sampling_via_percentage,
+                sampling_via_read_limit_percentage,
+                num_threads,
+            )
+            paired_fastq_file_list_sub.append(paired_fastq_files_sub)
+        paired_fastq_file_list = paired_fastq_file_list_sub
         # Get the list of the new subsampled files
-        fastq_file_list = [
-            path for file_type in file_types for path in Path(short_read_fastq_dir).rglob(file_type)
-        ]
+        # fastq_file_list = [
+        #     path for file_type in file_types for path in Path(short_read_fastq_dir).rglob(file_type)
+        # ]  # Not needed: the subsampled paths are returned by subsample_transcriptomic_data
 
     # fastq_file_list = check_for_fastq_subsamples(fastq_file_list)
@@ -159,23 +190,27 @@
                 str(genome_file),
             ]
         )
-    except Exception as e:#pylint:disable=broad-exception-caught
+    except Exception as e:  # pylint:disable=broad-exception-caught
         logging.error("An error occurred while creating star index: %s", e)
 
     logging.info("Running Star on the files in the fastq dir")
-    for fastq_file in fastq_file_list:
+    for paired_files in paired_fastq_file_list:
+        first_file_name = paired_files[0].name  # Name of the first file in the pair
+        match = re.search(r"(.+)_\d+\.(fastq|fq)", first_file_name)  # Extract the pair prefix
+        # Fall back to the full file name if the pair pattern is not found
+        first_part_of_name = match.group(1) if match else first_file_name
         # logger.info(fastq_file_path)
         # fastq_file_name = os.path.basename(fastq_file_path)
         star_tmp_dir = star_dir / "tmp"
         if star_tmp_dir.exists():
             shutil.rmtree(star_tmp_dir)
-        sam_file = Path(f"{star_dir}/{fastq_file.name}.sam")
-        junctions_file = Path(f"{star_dir}/{fastq_file.name}.sj.tab")
+        sam_file = Path(f"{star_dir}/{first_part_of_name}.sam")
+        junctions_file = Path(f"{star_dir}/{first_part_of_name}.sj.tab")
         sam_file_name = sam_file.name
         sam_temp_file = Path(f"{star_dir}/{sam_file_name}.tmp")
         bam_file = re.sub(".sam", ".bam", sam_file_name)
         bam_sort_file = Path(f"{star_dir}/{bam_file}")
-        log_out_file = Path(f"{star_dir}/{fastq_file.name}.Log.final.out")
+        log_out_file = Path(f"{star_dir}/{first_part_of_name}.Log.final.out")
         if log_out_file.exists() and bam_sort_file.exists() and bam_sort_file.stat().st_size != 0:
             logging.info(
                 "Found an existing bam file for the fastq file, \
@@ -183,7 +218,9 @@
             )
             continue
 
-        logging.info("Processing %s", fastq_file)
+        read_files_in = ",".join(str(fastq_file) for fastq_file in paired_files)
+
+        logging.info("Processing %s", list(paired_files))
         star_command = [
             str(star_bin),
             "--outFilterIntronMotifs",
@@ -199,7 +236,7 @@
             "--genomeDir",
             str(star_dir),
             "--readFilesIn",
-            str(fastq_file),
+            read_files_in,
             "--outFileNamePrefix",
             f"{star_dir}/",
             "--outTmpDir",
@@ -212,7 +249,7 @@
                #'--outSJfilterIntronMaxVsReadN','5000','10000','25000','40000',
                #'50000','50000','50000','50000','50000','100000']
         # check_compression = re.search(r".gz$", fastq_file)
-        if fastq_file.suffix.endswith(".gz"):
+        if paired_files[0].suffix.endswith(".gz"):
             star_command.append("--readFilesCommand")
             star_command.append("gunzip")
             star_command.append("-c")
@@ -238,7 +275,7 @@
     logging.info("Completed running STAR")
 
 
-def _create_paired_paths(fastq_file_paths: List) -> List[Path]:
+def _create_paired_paths(fastq_file_paths: List[Path]) -> List[List[Path]]:
     """
     Create list of paired transcriptomic fastq files
 
@@ -246,30 +283,28 @@
         fastq_file_paths (List): List of transcriptomic file paths.
 
     Returns:
-        List: List of paired transcriptomic files
+        List[List[Path]]: List of paired transcriptomic files
     """
-    path_dict = {}
-    # final_list = []
+    path_dict: Dict[str, List[Path]] = {}
+    final_list: List[List[Path]] = []
     for fastq_file in fastq_file_paths:
         paired_name = re.search(r"(.+)_\d+\.(fastq|fq)", str(fastq_file))
         if not paired_name:
             logging.exception(
                 "Could not find _1 or _2 at the end of the prefix \
                 for file. Assuming file is not paired: %s",
-                fastq_file,
+                str(fastq_file),
             )
-            # final_list.append([fastq_file])
-            path_dict[fastq_file] = [fastq_file]
+            final_list.append([fastq_file])
             continue
         run_accession = paired_name.group(1)
         if run_accession in path_dict:
             path_dict[run_accession].append(fastq_file)
         else:
            path_dict[run_accession] = [fastq_file]
-    # for pair in path_dict:
-    #     final_list.append(path_dict[pair])
-    logging.info([value for values_list in path_dict.values() for value in values_list])
-    return [value for values_list in path_dict.values() for value in values_list]
+    for pair in path_dict:  # pylint:disable=consider-using-dict-items
+        final_list.append(path_dict[pair])
+    return final_list
 
 
 # pylint:disable=pointless-string-statement
@@ -279,22 +314,108 @@
 """
 
 
-def _subsample_paired_fastq_files(
+def subsample_transcriptomic_data(
+    fastq_file_list: List[Path],
+    subsample_read_limit: int = 100000000,
+    subsample_percentage: float = 0.25,
+    sampling_via_read_limit: bool = False,
+    sampling_via_percentage: bool = False,
+    sampling_via_read_limit_percentage: bool = True,
+    num_threads: int = 2,
+) -> List[Path]:
+    """
+    Subsample a single fastq file or a pair of fastq files.
+    Args:
+        fastq_file_list : Fastq file paths to subsample (one file for single-end data, two for paired-end).
+        subsample_read_limit : Maximum number of reads to subsample, defaults to 100000000.
+        subsample_percentage : Maximum percentage of reads to subsample, defaults to 0.25.
+        sampling_via_read_limit : If True, subsample the fastq files \
+            using the --subsample_read_limit value.
+        sampling_via_percentage : If True, subsample the fastq files \
+            using the --subsample_percentage value.
+        sampling_via_read_limit_percentage : If True, subsample the fastq files \
+            using both --subsample_read_limit and --subsample_percentage; \
+            the lower of the two read counts is used.
+        num_threads : Number of threads, defaults to 2.
+    Returns:
+        List[Path]: Paths of the subsampled (.sub) fastq files.
+    """
+    subsampled_fastq_files: List[Path] = []
+    # for fastq_files in fastq_file_list:
+    #     fastq_file_1, fastq_file_2 = fastq_files
+    # fastq_file_pair = ""
+    # if len(fastq_files) == 2:
+    #     fastq_file_pair = fastq_files[1]
+
+    if len(fastq_file_list) == 1:
+        fastq_file_1 = fastq_file_list[0]
+        if Path(f"{fastq_file_1}.sub").exists():
+            logging.info(
+                "Found an existing .sub file on the fastq path, will use that \
+                instead. File:%s.sub",
+                fastq_file_1,
+            )
+        else:
+            _subsample_paired_fastq_files(
+                fastq_file_list,
+                subsample_read_limit,
+                subsample_percentage,
+                sampling_via_read_limit,
+                sampling_via_percentage,
+                sampling_via_read_limit_percentage,
+                num_threads,
+            )
+        subsampled_fastq_files = [Path(f"{fastq_file_1}.sub")]
+    if len(fastq_file_list) == 2:
+        fastq_file_1, fastq_file_2 = fastq_file_list
+        if Path(f"{fastq_file_1}.sub").exists() and Path(f"{fastq_file_2}.sub").exists():
+            logging.info(
+                "Found existing .sub files on the fastq path for both members of the pair, will use \
+                those instead of subsampling again. Files: %s.sub,%s.sub",
+                fastq_file_1,
+                fastq_file_2,
+            )
+        else:
+            _subsample_paired_fastq_files(
+                fastq_file_list,
+                subsample_read_limit,
+                subsample_percentage,
+                sampling_via_read_limit,
+                sampling_via_percentage,
+                sampling_via_read_limit_percentage,
+                num_threads,
+            )
+        subsampled_fastq_files = [Path(f"{fastq_file_1}.sub"), Path(f"{fastq_file_2}.sub")]
+    return subsampled_fastq_files
+
+
+def _subsample_paired_fastq_files(  # pylint:disable=too-many-branches
     fastq_files: List[Path],
     subsample_read_limit: int = 100000000,
+    subsample_percentage: float = 0.25,
+    sampling_via_read_limit: bool = False,
+    sampling_via_percentage: bool = False,
+    sampling_via_read_limit_percentage: bool = True,
     num_threads: int = 2,
-    compressed: bool = False,
 ) -> None:
     """
     Perform subsampling on two paired FastQ files in parallel using multiple threads.
 
     Args:
         fastq_files : Path for paired fastq files.
-        output_files : Path for the output file.
-        subsample_read_limit : Subsample size, defaults to 100000000.
+        subsample_read_limit : Maximum number of reads to subsample, defaults to 100000000.
+        subsample_percentage : Maximum percentage of reads to subsample, defaults to 0.25.
+        sampling_via_read_limit : If True, subsample the fastq files \
+            using the --subsample_read_limit value.
+        sampling_via_percentage : If True, subsample the fastq files \
+            using the --subsample_percentage value.
+        sampling_via_read_limit_percentage : If True, subsample the fastq files \
+            using both --subsample_read_limit and --subsample_percentage; \
+            the lower of the two read counts is used.
         num_threads : Number of threads, defaults to 2.
-        compressed : file compressed, defaults to False.
""" + if len(fastq_files) == 2: fastq_file_1, fastq_file_2 = fastq_files output_file_1, output_file_2 = [Path(f"{fastq_file_1}.sub"), Path(f"{fastq_file_2}.sub")] @@ -308,42 +429,68 @@ def _subsample_paired_fastq_files( compressed = True num_lines = sum(1 for line in gzip.open(fastq_file_1)) # pylint:disable=consider-using-with else: + compressed = False num_lines = sum(1 for line in open(fastq_file_1)) # pylint:disable=consider-using-with range_limit = int(num_lines / 4) - if range_limit <= subsample_read_limit: + sampling_size = 0 + if sampling_via_read_limit and subsample_read_limit: + sampling_size = subsample_read_limit + elif sampling_via_percentage and subsample_percentage: + sampling_size = round(range_limit * subsample_percentage) + elif sampling_via_read_limit_percentage and subsample_percentage and subsample_read_limit: + sampling_size = min(subsample_read_limit, round(range_limit * subsample_percentage)) + + if range_limit <= sampling_size: logging.info( "Number of reads (%s is less than the max allowed read count (%s), \ no need to subsample", str(range_limit), - str(subsample_read_limit), + str(sampling_size), ) return - rand_list = random.sample(range(0, range_limit - 1), subsample_read_limit) + rand_list = random.sample(range(0, range_limit - 1), sampling_size) random_indices = {idx * 4: 1 for idx in rand_list} logging.info("Processing paired files in parallel") - pool = multiprocessing.Pool(int(num_threads)) # pylint:disable=consider-using-with - pool.apply_async( - _subsample_fastq_subset, - args=( + if num_threads >= 2: + pool = multiprocessing.Pool(int(num_threads)) # pylint:disable=consider-using-with + pool.apply_async( + _subsample_fastq_subset, + args=( + fastq_file_1, + output_file_1, + random_indices, + compressed, + ), + ) + if len(fastq_files) == 2: + pool.apply_async( + _subsample_fastq_subset, + args=( + fastq_file_2, + output_file_2, + random_indices, + compressed, + ), + ) + + pool.close() + pool.join() + else: + _subsample_fastq_subset( fastq_file_1, output_file_1, random_indices, compressed, - ), - ) - pool.apply_async( - _subsample_fastq_subset, - args=( - fastq_file_2, - output_file_2, - random_indices, - compressed, - ), - ) - pool.close() - pool.join() + ) + if len(fastq_files) == 2: + _subsample_fastq_subset( + fastq_file_2, + output_file_2, + random_indices, + compressed, + ) def _subsample_fastq_subset( @@ -351,7 +498,6 @@ def _subsample_fastq_subset( ) -> None: """ Selecting specific sets of four lines from an input FastQ file and writing them to an output file. - Args: fastq_file : Path for the fastq file. output_file : Path for the output file. @@ -371,43 +517,6 @@ def _subsample_fastq_subset( lines = [file_in.readline() for _ in range(4)] -def subsample_transcriptomic_data(fastq_file_list: List, num_threads: int = 2) -> None: - """ - Subsample paired fastq files. - - Args: - fastq_file_list : List of fastq file path to process. - num_threads : number of threads - """ - for fastq_files in fastq_file_list: - fastq_file_1, fastq_file_2 = fastq_files - # fastq_file_pair = "" - # if len(fastq_files) == 2: - # fastq_file_pair = fastq_files[1] - - if len(fastq_files) == 1: - fastq_file_1 = fastq_files[0] - if Path(f"{fastq_file_1}.sub").exists(): - logging.info( - "Found an existing .sub file on the fastq path, will use that instead. 
File:%s.sub", - fastq_file_1, - ) - else: - _subsample_paired_fastq_files(fastq_files, compressed=True, num_threads=num_threads) - - elif len(fastq_files) == 2: - fastq_file_1, fastq_file_2 = fastq_files - if Path(f"{fastq_file_1}.sub").exists() and Path(f"{fastq_file_2}.sub").exists(): - logging.info( - "Found an existing .sub files on the fastq path for both members of the pair, will use \ - those instead of subsampling again. Files: %s.sub,%s.sub", - fastq_file_1, - fastq_file_2, - ) - elif Path(f"{fastq_file_2}.sub").exists(): - _subsample_paired_fastq_files(fastq_files, compressed=True, num_threads=num_threads) - - def run_trimming( output_dir: Path, short_read_fastq_dir: Path, @@ -427,12 +536,12 @@ def run_trimming( check_exe(trim_galore_bin) trim_dir = create_dir(output_dir, "trim_galore_output") - fastq_file_list = [] + fastq_file_list: List[Path] = [] file_types = ("*.fastq", "*.fq", "*.fastq.gz", "*.fq.gz") fastq_file_list = [ path for file_type in file_types for path in Path(short_read_fastq_dir).rglob(file_type) ] - fastq_file_list = _create_paired_paths(fastq_file_list) + paired_fastq_file_list = _create_paired_paths(fastq_file_list) trim_galore_cmd = [ str(trim_galore_bin), @@ -446,9 +555,11 @@ def run_trimming( ] pool = multiprocessing.Pool(int(num_threads)) # pylint:disable=consider-using-with - for fastq_file in fastq_file_list: + for fastq_file in paired_fastq_file_list: + file1, file2 = fastq_file if delete_pre_trim_fastq: - fastq_file.unlink() + file1.unlink() + file2.unlink() pool.apply_async( multiprocess_trim_galore, args=( @@ -527,6 +638,75 @@ def parse_args(): parser.add_argument("--star_bin", default="STAR", help="Star software path") parser.add_argument("--samtools_bin", default="samtools", help="Samtools software path") parser.add_argument("--trim_galore_bin", default="trim_galore", help="Trim Galore software path") + parser.add_argument( + "--subsample_read_limit", + type=int, + default=100000000, + help="Maximum number of reads to subsample. 
+        required=False,
+    )
+    parser.add_argument(
+        "--subsample_percentage",
+        type=float,
+        default=0.25,
+        help="Maximum percentage of reads to subsample (0 to 1)",
+        required=False,
+    )
+    parser.add_argument(
+        "--sampling_via_read_limit",
+        type=bool,
+        default=False,
+        help="If True, subsample an input dataset of fastq files using the --subsample_read_limit value.",
+        required=False,
+    )
+    parser.add_argument(
+        "--sampling_via_percentage",
+        type=bool,
+        default=False,
+        help="If True, subsample an input dataset of fastq files using the --subsample_percentage value.",
+        required=False,
+    )
+    parser.add_argument(
+        "--sampling_via_read_limit_percentage",
+        type=bool,
+        default=True,
+        help="If True, subsample an input dataset of fastq files using both --subsample_read_limit \
+        and --subsample_percentage; the lower of the two read counts is used.",
+        required=False,
+    )
+    parser.add_argument(
+        "--paired_file_1",
+        help="Path to a single-end fastq file or to the first file of a pair; \
+        used when --run_subsampling is enabled.",
+        required=False,
+    )
+    parser.add_argument(
+        "--paired_file_2",
+        help="Path to the second fastq file of a pair; used when --run_subsampling is enabled.",
+        required=False,
+    )
+    parser.add_argument(
+        "--run_star",
+        type=bool,
+        default=True,
+        help="If True, run STAR alignment on the input dataset of fastq files.",
+        required=False,
+    )
+    parser.add_argument(
+        "--run_subsampling",
+        type=bool,
+        default=False,
+        help="If True, subsample the input dataset of fastq files.",
+        required=False,
+    )
+    parser.add_argument(
+        "--run_trimming",
+        type=bool,
+        default=False,
+        help="If True, trim the input dataset of fastq files using the Trim Galore suite.",
+        required=False,
+    )
     return parser.parse_args()
 
 
@@ -542,20 +722,43 @@
         defaults={"logfilename": str(log_file_path)},
         disable_existing_loggers=False,
     )
-
-    run_star(
-        args.genome_file,
-        args.output_dir,
-        args.short_read_fastq_dir,
-        args.delete_pre_trim_fastq,
-        args.trim_fastq,
-        args.max_reads_per_sample,
-        args.max_intron_length,
-        args.num_threads,
-        args.star_bin,
-        args.samtools_bin,
-        args.trim_galore_bin,
-    )
+    if args.run_star:
+        run_star(
+            args.genome_file,
+            args.output_dir,
+            args.short_read_fastq_dir,
+            args.delete_pre_trim_fastq,
+            args.trim_fastq,
+            args.max_reads_per_sample,
+            args.max_intron_length,
+            args.subsample_read_limit,
+            args.subsample_percentage,
+            args.sampling_via_read_limit,
+            args.sampling_via_percentage,
+            args.sampling_via_read_limit_percentage,
+            args.num_threads,
+            args.star_bin,
+            args.samtools_bin,
+            args.trim_galore_bin,
+        )
+    if args.run_subsampling:
+        # Only keep the paths that were actually provided (single-end data has no second file)
+        fastq_file_list = [Path(fastq_path) for fastq_path in (args.paired_file_1, args.paired_file_2) if fastq_path]
+        subsample_transcriptomic_data(
+            fastq_file_list,
+            args.subsample_read_limit,
+            args.subsample_percentage,
+            args.sampling_via_read_limit,
+            args.sampling_via_percentage,
+            args.sampling_via_read_limit_percentage,
+            args.num_threads,
+        )
+    if args.run_trimming:
+        run_trimming(
+            args.output_dir,
+            args.short_read_fastq_dir,
+            args.delete_pre_trim_fastq,
+            args.num_threads,
+            args.trim_galore_bin,
+        )
 
 
 if __name__ == "__main__":
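
The effective sample size picked by the new branch in `_subsample_paired_fastq_files` can be summarised in isolation. This is an illustrative sketch only; the helper name `effective_sampling_size` is not part of the module, it just mirrors the `sampling_size` logic added by the patch.

```python
def effective_sampling_size(
    total_reads: int,
    subsample_read_limit: int = 100000000,
    subsample_percentage: float = 0.25,
    sampling_via_read_limit: bool = False,
    sampling_via_percentage: bool = False,
    sampling_via_read_limit_percentage: bool = True,
) -> int:
    """Return the number of reads kept for a file with total_reads reads; 0 means no mode selected."""
    if sampling_via_read_limit and subsample_read_limit:
        return subsample_read_limit
    if sampling_via_percentage and subsample_percentage:
        return round(total_reads * subsample_percentage)
    if sampling_via_read_limit_percentage and subsample_percentage and subsample_read_limit:
        # Take the lower of the absolute cap and the percentage-based count
        return min(subsample_read_limit, round(total_reads * subsample_percentage))
    return 0


# 200M reads, default mode: 25% (50M) is below the 100M cap, so 50M reads are kept.
assert effective_sampling_size(200_000_000) == 50_000_000
# 600M reads, default mode: 25% (150M) exceeds the 100M cap, so the cap wins.
assert effective_sampling_size(600_000_000) == 100_000_000
```

As in the patched function, no subsampling is performed when the file's total read count is already at or below the computed sample size.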