diff --git a/.github/workflows/publish.yaml b/.github/workflows/publish.yaml index 0231521..fc1ac7f 100644 --- a/.github/workflows/publish.yaml +++ b/.github/workflows/publish.yaml @@ -17,7 +17,7 @@ jobs: - name: Setup Python uses: actions/setup-python@v4 with: - python-version: "3.8" + python-version: "3.10" - name: Build package run: | diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index f9f9075..5b07e51 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -57,4 +57,4 @@ jobs: - name: Publish docs run: | - mike deploy --config-file mkdocs.yml --push --force --update-aliases $(git tag --sort=committerdate | tail -1 | sed 's/v//') latest + mike deploy --config-file mkdocs.yml --push --update-aliases $(git tag --sort=committerdate | tail -1 | sed 's/v//') latest diff --git a/AmpliGone/AmpliGone.py b/AmpliGone/AmpliGone.py deleted file mode 100644 index ef05ef1..0000000 --- a/AmpliGone/AmpliGone.py +++ /dev/null @@ -1,442 +0,0 @@ -""" -Copyright © 2021 RIVM - -https://github.com/RIVM-bioinformatics/AmpliGone -""" - - -import argparse -import concurrent.futures as cf -import multiprocessing -import os -import pathlib -import sys -from itertools import chain -from typing import Callable, Iterable, List - -import numpy as np -import pandas as pd -import parmap -from rich import print -from rich.console import Console -from rich.progress import Progress, SpinnerColumn - -from .cut_reads import CutReads -from .fasta2bed import CoordinateListsToBed, MakeCoordinateLists -from .func import FlexibleArgFormatter, RichParser, log -from .io_ops import IndexReads, WriteOutput, read_bed -from .mappreset import FindPreset, parse_scoring_matrix -from .version import __version__ - - -def get_args(givenargs: List[str]) -> argparse.Namespace: - """It takes the arguments given to the script and parses them into the argparse namespace - - Parameters - ---------- - givenargs - the arguments given to the script - - Returns - ------- - The arguments that are given to the program. - - """ - - def check_file_extensions(allowed_extensions: Iterable[str], fname: str) -> str: - """ - Check if the file extension of the file name passed to it is one of the extensions in the list passed to it. - - Parameters - ---------- - allowed_extensions : iterable of str - A tuple of strings that are valid extensions. - fname : str - The name of the file to be read. - - Returns - ------- - str - The absolute path of the file passed. - - Raises - ------ - ArgumentParser.error - If the file extension of the file name passed to it is not one of the extensions in the list passed to it. - """ - ext = "".join(pathlib.Path(fname).suffixes) - if not any(ext.endswith(c) for c in allowed_extensions): - parser.error(f"File {fname} doesn't end with one of {allowed_extensions}") - return os.path.abspath(fname) - - def check_file_exists(fname: str) -> str: - """Errors if the given file `fname` does not exist - - Parameters - ---------- - fname : str - The name of the file to be read. - - Returns - ------- - str - The name of the file to be read. - - """ - if os.path.isfile(fname): - return fname - parser.error(f'"{fname}" is not a file. 
Exiting...') - - parser = RichParser( - prog="[bold]AmpliGone[/bold]", - usage="%(prog)s \[required options] \[optional arguments]", - description="[bold underline]AmpliGone[/bold underline]: An accurate and efficient tool to remove primers from NGS reads in reference-based experiments", - formatter_class=FlexibleArgFormatter, - add_help=False, - ) - - standard_threads = min(multiprocessing.cpu_count(), 128) - - required_args = parser.add_argument_group("Required Arguments") - - required_args.add_argument( - "--input", - "-i", - type=lambda s: check_file_exists( - check_file_extensions((".fastq", ".fq", ".bam", ".fastq.gz", ".fq.gz"), s) - ), - metavar="File", - help="Input file with reads in either FastQ or BAM format.", - required=True, - ) - - required_args.add_argument( - "--output", - "-o", - type=lambda s: check_file_extensions((".fastq", ".fq"), s), - metavar="File", - help="Output (FastQ) file with cleaned reads.", - required=True, - ) - - required_args.add_argument( - "--reference", - "-ref", - type=lambda s: check_file_exists(check_file_extensions((".fasta", ".fa"), s)), - metavar="File", - help="Input Reference genome in FASTA format", - required=True, - ) - required_args.add_argument( - "--primers", - "-pr", - type=lambda s: check_file_exists( - check_file_extensions((".fasta", ".fa", ".bed"), s) - ), - metavar="File", - help="""Used primer sequences in FASTA format or primer coordinates in BED format.\n Note that using bed-files overrides error-rate and ambiguity functionality""", - required=True, - ) - - optional_args = parser.add_argument_group("Optional Arguments") - - optional_args.add_argument( - "--amplicon-type", - "-at", - default="end-to-end", - choices=("end-to-end", "end-to-mid", "fragmented"), - help="Define the amplicon-type, either being [green]'end-to-end'[/green], [green]'end-to-mid'[/green], or [green]'fragmented'[/green]. See the docs for more info :book:", - required=False, - metavar="'end-to-end'/'end-to-mid'/'fragmented'", - ) - - optional_args.add_argument( - "--fragment-lookaround-size", - "-fls", - required=False, - type=int, - metavar="N", - help="The number of bases to look around a primer-site to consider it part of a fragment. Only used if amplicon-type is 'fragmented'. Default is 10", - ) - - optional_args.add_argument( - "--export-primers", - "-ep", - type=lambda s: check_file_extensions((".bed",), s), - metavar="File", - help="Output BED file with found primer coordinates if they are actually cut from the reads", - required=False, - ) - - optional_args.add_argument( - "--threads", - "-t", - type=int, - default=standard_threads, - metavar="N", - help=f"""Number of threads you wish to use.\n Default is the number of available threads in your system ({standard_threads})""", - ) - - optional_args.add_argument( - "-to", - action="store_true", - help="If set, AmpliGone will always create the output files even if there is nothing to output. (for example when an empty input-file is given)\n This is useful in (automated) pipelines where you want to make sure that the output files are always created.", - required=False, - ) - - optional_args.add_argument( - "--error-rate", - "-er", - type=float, - default=0.1, - metavar="N", - help="The maximum allowed error rate for the primer search. Use 0 for exact primer matches.", - required=False, - ) - - optional_args.add_argument( - "--alignment-preset", - "-ap", - type=str, - default=None, - choices=("sr", "map-ont", "map-pb", "splice"), - help="The preset to use for alignment of reads against the reference. 
This can be either 'sr', 'map-ont', 'map-pb', or 'splice'. The alignment-preset can be combined with a custom alignment-scoring matrix. See the docs for more info :book:", - required=False, - metavar="'sr'/'map-ont'/'map-pb'/'splice'", - ) - - optional_args.add_argument( - "--alignment-scoring", - "-as", - type=str, - default=None, - metavar="KEY=VALUE", - nargs="+", - help="The scoring matrix to use for alignment of reads. This should be list of key-value pairs, where the key is the scoring-parameter and the value is a positive integer indicating the scoring-value for that parameter. Possible parameters are \n * (1) match\n * (2) mismatch\n * (3) gap_o1\n * (4) gap_e1\n * (5) gap_o2 (Optional: requires 1,2,3,4)\n * (6) gap_e2 (Optional, requires 1,2,3,4,5)\n * (7) mma (Optional, requires 1,2,3,4,5,6)\nFor example:\n --alignment-scoring match=4 mismatch=3 gap_o1=2 gap_e1=1\nSee the docs for more info :book:", - required=False, - ) - - optional_args.add_argument( - "--version", - "-v", - action="version", - version=__version__, - help="Show the AmpliGone version and exit", - ) - - optional_args.add_argument( - "--help", - "-h", - action="help", - default=argparse.SUPPRESS, - help="Show this help message and exit", - ) - - flags = parser.parse_args(givenargs) - - return flags - - -def parallel( - frame: pd.DataFrame, - function: Callable[..., pd.DataFrame], - workers: int, - primer_df: pd.DataFrame, - reference: str, - preset: str, - scoring: List[int], - fragment_lookaround_size: int, - amplicon_type: str, -) -> pd.DataFrame: - """ - Apply a function to a pandas DataFrame in parallel using multiple workers. - - Parameters - ---------- - frame : pandas.DataFrame - The DataFrame to apply the function to. - function : Callable[..., pandas.DataFrame] - The function to apply to the DataFrame. - workers : int - The number of workers to use for parallel processing. - primer_df : pandas.DataFrame - The DataFrame containing primer information. - reference : str - The reference sequence to use for alignment. - preset : str - The preset to use for alignment. - scoring : List[int] - The scoring matrix to use for alignment. - fragment_lookaround_size : int - The size of the fragment lookaround. - amplicon_type : str - The type of amplicon. - - Returns - ------- - pandas.DataFrame - The resulting DataFrame after applying the function to the input DataFrame in parallel. 
- """ - frame_split = np.array_split(frame, workers) - tr = [*range(workers)] - return pd.concat( - parmap.map( - function, - zip(frame_split, tr), - primer_df, - reference, - preset, - scoring, - fragment_lookaround_size, - amplicon_type, - workers, - pm_processes=workers, - ) - ) - - -def main(): - if len(sys.argv[1:]) < 1: - print( - "AmpliGone was called but no arguments were given, please try again.\nUse 'AmpliGone -h' to see the help document" - ) - sys.exit(1) - args = get_args(sys.argv[1:]) - log.info( - f"Starting AmpliGone with inputfile [green]'{os.path.abspath(args.input)}'[/green]" - ) - - with cf.ThreadPoolExecutor(max_workers=args.threads) as ex: - TP_indexreads = ex.submit(IndexReads, args.input) - - if not args.primers.endswith(".bed"): - TP_PrimerLists = ex.submit( - MakeCoordinateLists, args.primers, args.reference, args.error_rate - ) - primer_df = TP_PrimerLists.result() - else: - log.info( - "Primer coordinates are given in BED format, skipping primer search" - ) - primer_df = read_bed(args.primers) - IndexedReads = TP_indexreads.result() - - if len(IndexedReads.index) < 1 and args.to is True: - ReadDict = IndexedReads.to_dict(orient="records") - WriteOutput(args.output, ReadDict) - if args.export_primers is not None: - with open(args.export_primers, "w") as f: - f.write("") - log.warning( - "AmpliGone was given an empty input file but the [green]'-to'[/green] flag was given.\nOne or multiple empty output file(s) have therefore been generated.\n[bold yellow]Please check the input file to make sure this is correct[/bold yellow]" - ) - sys.exit(0) - elif len(IndexedReads.index) < 1: - log.error( - "AmpliGone was given an empty input file. Exiting..\nPlease check the input file to make sure this is correct\n\n[bold yellow]Use the -to flag to force AmpliGone to create an output file even if there is nothing to output.[/bold yellow]" - ) - sys.exit(1) - else: - log.info( - f"Succesfully loaded [bold green]{len(IndexedReads.index)}[/bold green] reads." - ) - - if len(primer_df) < 1: - log.error( - "AmpliGone was unable to match any primers to the reference. AmpliGone is therefore unable to remove primers from the reads.\nPlease check the primers and reference to make sure this is correct\nExiting..." - ) - sys.exit(1) - - if len(IndexedReads.index) < args.threads: - log.info( - "[yellow]AmpliGone is set to use more threads than reads present. 
Downscaling threads to match.[/yellow]" - ) - args.threads = len(IndexedReads.index) - - preset: str = args.alignment_preset - if preset is None: - log.info("Finding optimal alignment-preset for the given reads") - # Todo: split this over two threads if possible - if len(IndexedReads.index) > 20000: - preset = FindPreset( - args.threads, IndexedReads.sample(frac=0.3) - ) # Todo: Make this more efficient - else: - preset = FindPreset(args.threads, IndexedReads) - - if args.alignment_scoring is not None: - log.info("Alignment scoring values given, parsing to scoring-matrix") - scoring = parse_scoring_matrix(sorted(args.alignment_scoring)) - else: - scoring = [] - - print(preset, scoring) - ## correct the lookaround size if the amplicon type is not fragmented - if args.amplicon_type != "fragmented": - args.fragment_lookaround_size = 10000 - elif args.fragment_lookaround_size is None: - args.fragment_lookaround_size = 10 - log.warning( - "[yellow]No fragment lookaround size was given, [underline]using default of 10[/underline][/yellow]" - ) - - log.info( - f"Distributing {len(IndexedReads.index)} reads across {args.threads} threads for processing. Processing around [bold green]{round(len(IndexedReads.index)/args.threads)}[/bold green] reads per thread" - ) - - IndexedReads = IndexedReads.sample(frac=1).reset_index(drop=True) - - with Progress( - SpinnerColumn(), - *Progress.get_default_columns(), - console=Console(record=True), - transient=True, - ) as progress: - progress.add_task("[yellow]Removing primer sequences...", total=None) - ProcessedReads = parallel( - IndexedReads, - CutReads, - args.threads, - primer_df, - args.reference, - preset, - scoring, - fragment_lookaround_size=args.fragment_lookaround_size, - amplicon_type=args.amplicon_type, - ) - - ProcessedReads.reset_index(drop=True) - log.info("Done removing primer sequences") - - total_nuc_preprocessing = sum(tuple(chain(IndexedReads["Sequence"].str.len()))) - removed_coordinates = tuple(chain(*ProcessedReads["Removed_coordinates"])) - - log.info( - f"\tRemoved a total of [bold cyan]{len(removed_coordinates)}[/bold cyan] nucleotides." - ) - log.info( - f"\tThis is [bold cyan]{round((len(removed_coordinates)/total_nuc_preprocessing)*100, 2)}%[/bold cyan] of the total amount of nucleotides present in the original reads." - ) - log.info( - f"\tThese nucleotides were removed from [bold cyan]{len(set(removed_coordinates))}[/bold cyan] unique nucleotide-coordinates." - ) - log.info( - f"\tThese nucleotide-coordinates correspond to the coordinates of [bold cyan]{len(primer_df)}[/bold cyan] (found) primers." 
- ) - - log.info("Writing output files") - if args.export_primers is not None: - removed_coords = set(removed_coordinates) - filtered_primer_df = primer_df[ - primer_df[["start", "end"]].apply( - lambda r: any(coord in removed_coords for coord in range(*r)), - axis=1, - ) - ] - CoordinateListsToBed(filtered_primer_df, args.export_primers) - - ProcessedReads = ProcessedReads.drop(columns=["Removed_coordinates"]) - - ReadDict = ProcessedReads.to_dict(orient="records") - - WriteOutput(args.output, ReadDict) diff --git a/AmpliGone/__init__.py b/AmpliGone/__init__.py index e69de29..955210f 100644 --- a/AmpliGone/__init__.py +++ b/AmpliGone/__init__.py @@ -0,0 +1,2 @@ +__version__ = "1.3.0" +__prog__ = "AmpliGone" diff --git a/AmpliGone/__main__.py b/AmpliGone/__main__.py new file mode 100644 index 0000000..28678b5 --- /dev/null +++ b/AmpliGone/__main__.py @@ -0,0 +1,494 @@ +""" +Copyright © 2021 RIVM + +https://github.com/RIVM-bioinformatics/AmpliGone +""" + +import argparse +import sys +from collections import defaultdict +from concurrent.futures import ProcessPoolExecutor +from itertools import chain +from typing import Callable, List, Set, Tuple + +import pandas as pd +import parmap +from rich import print +from rich.console import Console +from rich.progress import Progress, SpinnerColumn + +import AmpliGone.alignmentmatrix as AlignmentMatrix +import AmpliGone.alignmentpreset as AlignmentPreset +from AmpliGone import __prog__, __version__ +from AmpliGone.args import get_args +from AmpliGone.cut_reads import CutReads +from AmpliGone.fasta2bed import CoordinateListsToBed, find_or_read_primers +from AmpliGone.io_ops import SequenceReads, write_output +from AmpliGone.log import log + + +def check_loaded_index( + indexed_reads: SequenceReads, args: argparse.Namespace +) -> SequenceReads: + """ + Check the input file for indexed reads and handle empty input file scenarios. + + Parameters + ---------- + indexed_reads : SequenceReads + The indexed reads. + + args : argparse.Namespace + The command-line arguments. + + Returns + ------- + SequenceReads + The indexed reads. + + Raises + ------ + SystemExit + If the input file is empty and the '-to' flag is not provided. + + Notes + ----- + This function checks if the input file for indexed reads is empty. If it is empty and the '-to' flag is not provided, it raises a SystemExit exception. If the '-to' flag is provided, it writes an empty output file and logs a warning message. If the input file is not empty, it logs an info message and returns the indexed reads. + + """ + if len(indexed_reads.tuples) < 1: + if args.to is True: + read_records = indexed_reads.frame.to_dict(orient="records") + write_output(args.output, read_records, threads=args.threads) + if args.export_primers is not None: + with open(args.export_primers, "w") as f: + f.write("") + log.warning( + f"{__prog__} was given an empty input file but the [green]'-to'[/green] flag was given.\nOne or multiple empty output file(s) have therefore been generated.\n[bold yellow]Please check the input file to make sure this is correct[/bold yellow]" + ) + sys.exit(0) + else: + log.error( + f"{__prog__} was given an empty input file. Exiting..\nPlease check the input file to make sure this is correct\n\n[bold yellow]Use the -to flag to force {__prog__} to create an output file even if there is nothing to output.[/bold yellow]" + ) + sys.exit(1) + log.info( + f"Succesfully loaded [bold green]{len(indexed_reads.tuples)}[/bold green] reads." 
+ ) + return indexed_reads + + +def primer_df_to_primer_index( + primer_df: pd.DataFrame, bind_virtual_primer: bool = True +) -> Tuple[defaultdict, defaultdict]: + """ + Convert primer DataFrame to primer index dictionaries. + + Parameters + ---------- + primer_df : pd.DataFrame + DataFrame containing primer information. + bind_virtual_primer : bool, optional + Whether to match closely positioned primers in the same orientation into a virtual primer. Defaults to False. + + Returns + ------- + Tuple[defaultdict, defaultdict] + A tuple of two defaultdicts representing the forward and reverse primer indices. + + Raises + ------ + SystemExit + If the primer DataFrame is empty. + + Notes + ----- + This function converts the primer DataFrame into two defaultdicts representing the forward and reverse primer indices. The primer DataFrame should contain information about the primers, including the reference ID, start coordinate, end coordinate, and strand. If the primer DataFrame is empty, a SystemExit exception is raised. + + The bind_virtual_primer parameter determines whether virtual primers should be bound. If bind_virtual_primer is set to True, closely positioned primers in the same orientation will be matched to generate a virtual primer for proper removal. By default, bind_virtual_primer is set to False. + + Examples + -------- + >>> primer_df = pd.DataFrame(...) + >>> forward_dict, reverse_dict = primer_df_to_primer_index(primer_df) + >>> print(forward_dict) + defaultdict(, {'ref1': {1, 2, 3}, 'ref2': {4, 5, 6}}) + >>> print(reverse_dict) + defaultdict(, {'ref1': {7, 8, 9}, 'ref2': {10, 11, 12}}) + """ + if len(primer_df) < 1: + log.error( + f"{__prog__} was unable to match any primers to the reference. {__prog__} is therefore unable to remove primers from the reads.\nPlease check the primers and reference to make sure this is correct\nExiting..." + ) + sys.exit(1) + + forward_dict = defaultdict(set) + reverse_dict = defaultdict(set) + + reference_set: Set[str] = set(primer_df["ref"].unique()) + for refid in reference_set: + forward_dict[refid] = set() + reverse_dict[refid] = set() + + # split the primer_df into forward and reverse primers based on strand + forward_primers_df, reverse_primer_df = ( + primer_df[primer_df["strand"] == "+"], + primer_df[primer_df["strand"] == "-"], + ) + + def coordinates_to_index( + oriented_primer_df: pd.DataFrame, refid: str, start: int, end: int + ) -> range: + """ + Converts coordinates to an index range based on the given parameters. + + Parameters + ---------- + oriented_primer_df : pd.DataFrame + The DataFrame containing the oriented primer information. + refid : str + The reference ID. + start : int + The start coordinate. + end : int + The end coordinate. + + Returns + ------- + range + The index range based on the given coordinates. + + Notes + ----- + If bind_virtual_primer is True, the function will also match closely positioned primers in the same orientation to generate a virtual primer for proper removal. + bind_virtual_primer is set in the wrapping function and inherited here. 
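+        As an illustrative sketch of that merging behaviour (coordinates and
+        reference name are made up): with two forward primers on 'ref1' spanning
+        (start=10, end=30) and (start=35, end=55), the second primer lies within
+        one primer-length of the first, so the lookup for the first primer is
+        widened to the combined span:
+
+        >>> coordinates_to_index(forward_primers_df, "ref1", 10, 30)  # illustrative
+        range(11, 55)
+
+        Without bind_virtual_primer the same call would return range(11, 30).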
+ """ + if bind_virtual_primer: + length = end - start + for _, refid2, start2, end2 in oriented_primer_df[ + ["ref", "start", "end"] + ].itertuples(): + if ( + refid == refid2 + and start2 <= end + length + and end2 >= start - length + ): + start = min(start, start2) + end = max(end, end2) + return range(start + 1, end) + + # iterate over forward_primers_df and add the coordinates between "start" and "end" to the forward_dict + for _, refid, start, end in forward_primers_df[ + ["ref", "start", "end"] + ].itertuples(): + refid: str + start: int + end: int + forward_dict[refid].update( + coordinates_to_index(forward_primers_df, refid, start, end) + ) + + # iterate over reverse_primers_df and add the coordinates between "start" and "end" to the reverse_dict + for _, refid, start, end in reverse_primer_df[["ref", "start", "end"]].itertuples(): + refid: str + start: int + end: int + reverse_dict[refid].update( + coordinates_to_index(reverse_primer_df, refid, start, end) + ) + + return forward_dict, reverse_dict + + +def check_thread_count( + indexed_reads: SequenceReads, args: argparse.Namespace +) -> argparse.Namespace: + """ + Check the thread count and adjust it if necessary based on the number of reads. + + Parameters + ---------- + indexed_reads : SequenceReads + The indexed reads. + args : argparse.Namespace + The command-line arguments. + + Returns + ------- + argparse.Namespace + The updated command-line arguments. + + Notes + ----- + This function checks if the number of threads specified in the command-line arguments + is greater than the number of reads present in the indexed reads. If it is, the number + of threads is downscaled to match the number of reads. + + If the number of reads is less than the specified number of threads, the number of threads + is set to the minimum value between the number of reads and 2. + + Examples + -------- + >>> indexed_reads = SequenceReads(...) + >>> args = argparse.Namespace(threads=4) + >>> updated_args = check_thread_count(indexed_reads, args) + >>> updated_args.threads + 2 + """ + if len(indexed_reads.tuples) < args.threads: + log.info( + f"[yellow]{__prog__} is set to use more threads than reads present. Downscaling threads to match.[/yellow]" + ) + args.threads = min(len(indexed_reads.tuples), 2) + return args + + +def correct_fragment_lookaround_size(args: argparse.Namespace) -> argparse.Namespace: + """ + Corrects the fragment lookaround size based on the amplicon type. + + Parameters + ---------- + args : argparse.Namespace + The command-line arguments. + + Returns + ------- + argparse.Namespace + The updated command-line arguments. + + Notes + ----- + This function adjusts the `fragment_lookaround_size` based on the `amplicon_type` value in the `args` namespace. + If the `amplicon_type` is not "fragmented", the `fragment_lookaround_size` is set to 10000. + If the `amplicon_type` is "fragmented" and `fragment_lookaround_size` is not provided, it is set to 10. + A warning message is logged if the `fragment_lookaround_size` is set to the default value. 
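+    For non-fragmented amplicon types the value is overridden unconditionally;
+    for example, an 'end-to-end' run that was given a lookaround size of 50
+    still ends up with 10000, which in practice disables the lookaround
+    restriction for typical amplicon lengths.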
+ + Examples + -------- + >>> args = argparse.Namespace(amplicon_type="fragmented", fragment_lookaround_size=None) + >>> corrected_args = correct_fragment_lookaround_size(args) + >>> print(corrected_args.fragment_lookaround_size) + 10 + """ + if args.amplicon_type != "fragmented": + args.fragment_lookaround_size = 10000 + elif args.fragment_lookaround_size is None: + args.fragment_lookaround_size = 10 + log.warning( + "[yellow]No fragment lookaround size was given, [underline]using default of 10[/underline][/yellow]" + ) + return args + + +def parallel_dispatcher( + indexed_reads: SequenceReads, + args: argparse.Namespace, + primer_sets: Tuple[defaultdict, defaultdict], + preset: str, + matrix: List[int], +) -> pd.DataFrame: + """ + Wrapping function that actually calls the parallelization function to process the primer removal process of the reads. + + Parameters + ---------- + indexed_reads : SequenceReads + The indexed reads to be processed. + args : argparse.Namespace + The command-line arguments. + primer_sets : Tuple[defaultdict, defaultdict] + The primer sequences to be removed. + preset : str + The preset configuration for processing. + matrix : List[int] + The matrix for processing. + + Returns + ------- + pd.DataFrame + The processed reads with primer sequences removed. + """ + with Progress( + SpinnerColumn(), + *Progress.get_default_columns(), + console=Console( + record=False, + ), + transient=True, + disable=True if args.verbose is True or args.quiet is True else False, + ) as progress: + progress.add_task("[yellow]Removing primer sequences...", total=None) + processed_reads = parallel( + indexed_reads.frame, + CutReads, + args.threads, + primer_sets, + args.reference, + preset, + matrix, + fragment_lookaround_size=args.fragment_lookaround_size, + amplicon_type=args.amplicon_type, + ) + + processed_reads.reset_index(drop=True) + log.info("Done removing primer sequences") + return processed_reads + + +def parallel( + frame: pd.DataFrame, + function: Callable[..., pd.DataFrame], + workers: int, + primer_sets: Tuple[defaultdict, defaultdict], + reference: str, + preset: str, + scoring: List[int], + fragment_lookaround_size: int, + amplicon_type: str, +) -> pd.DataFrame: + """ + Apply a function to a pandas DataFrame in parallel using multiple workers. + + Parameters + ---------- + frame : pandas.DataFrame + The DataFrame to apply the function to. + function : Callable[..., pandas.DataFrame] + The function to apply to the DataFrame. + workers : int + The number of workers to use for parallel processing. + primer_df : Tuple[defaultdict, defaultdict] + A tuple containing the indexes of the primer coordinates to remove. + reference : str + The reference sequence to use for alignment. + preset : str + The preset to use for alignment. + scoring : List[int] + The scoring matrix to use for alignment. + The size of the fragment lookaround. + fragment_lookaround_size : int + amplicon_type : str + The type of amplicon. + + Returns + ------- + pandas.DataFrame + The resulting DataFrame after applying the function to the input DataFrame in parallel. 
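+
+    Notes
+    -----
+    The frame is split into interleaved (striped) chunks, one per worker,
+    rather than contiguous blocks. A small sketch of the striping with
+    illustrative data:
+
+    >>> frame = pd.DataFrame({"Sequence": list("ABCDEF")})
+    >>> [list(frame.iloc[i::3].index) for i in range(3)]
+    [[0, 3], [1, 4], [2, 5]]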
+ """ + frame_split = [frame.iloc[i::workers] for i in range(workers)] + tr = [*range(workers)] + return pd.concat( + parmap.map( + function, + zip(frame_split, tr), + primer_sets, + reference, + preset, + scoring, + fragment_lookaround_size, + amplicon_type, + workers, + pm_processes=workers, + ) + ) + + +def main(): + if len(sys.argv[1:]) < 1: + print( + f"{__prog__} was called but no arguments were given, please try again.\nUse '{__prog__} -h' to see the help document" + ) + sys.exit(1) + args = get_args(sys.argv[1:]) + # check if verbose and quiet aren't both set + if args.verbose is True and args.quiet is True: + log.error( + f"{__prog__} was given both the [green]'--verbose'[/green] and [green]'--quiet'[/green] flags. Please only use one of these flags at a time. Exiting..." + ) + sys.exit(1) + if args.verbose is True: + log.setLevel("DEBUG") + log.debug(f"Arguments: {args}") + if args.quiet is True: + log.setLevel("WARNING") + + log.info(f"{__prog__} version: [blue]{__version__}[/blue]") + + # TODO: improve this log message + if args.threads < 2: + log.error( + f"{__prog__} requires a minimum of 2 threads for execution, but only {args.threads} thread was provided. Exiting..." + ) + sys.exit(1) + + with ProcessPoolExecutor(max_workers=args.threads) as pool: + future_indexreads = pool.submit(SequenceReads, args.input) + future_primersearch = pool.submit( + find_or_read_primers, + primerfile=args.primers, + referencefile=args.reference, + err_rate=args.error_rate, + ) + future_scoring = pool.submit( + AlignmentMatrix.get_scoring_matrix, args.alignment_scoring + ) + + matrix = future_scoring.result() + primer_df = future_primersearch.result() + indexed_reads = future_indexreads.result() + + indexed_reads = check_loaded_index(indexed_reads, args) + primer_indexes = primer_df_to_primer_index( + primer_df=primer_df, bind_virtual_primer=args.virtual_primers + ) + args = check_thread_count(indexed_reads, args) + + preset: str = AlignmentPreset.get_alignment_preset(args, indexed_reads) + + args = correct_fragment_lookaround_size(args) + + log.info( + f"Distributing {len(indexed_reads.tuples)} reads across {args.threads} threads for processing. Processing around [bold green]{round(len(indexed_reads.tuples)/args.threads)}[/bold green] reads per thread" + ) + + indexed_reads.frame = indexed_reads.frame.sample(frac=1).reset_index(drop=True) + + processed_reads = parallel_dispatcher( + indexed_reads, args, primer_indexes, preset, matrix + ) + + total_nuc_preprocessing = sum( + tuple(chain(indexed_reads.frame["Sequence"].str.len())) + ) + removed_coordinates = tuple(chain(*processed_reads["Removed_coordinates"])) + + log.info( + f"\tRemoved a total of [bold cyan]{len(removed_coordinates)}[/bold cyan] nucleotides." + ) + log.info( + f"\tThis is [bold cyan]{round((len(removed_coordinates)/total_nuc_preprocessing)*100, 2)}%[/bold cyan] of the total amount of nucleotides present in the original reads." + ) + log.info( + f"\tThese nucleotides were removed from [bold cyan]{len(set(removed_coordinates))}[/bold cyan] unique nucleotide-coordinates." + ) + log.info( + f"\tThese nucleotide-coordinates correspond to the coordinates of [bold cyan]{len(primer_df)}[/bold cyan] (found) primers." 
+ ) + + log.info("Writing output files") + if args.export_primers is not None: + removed_coords = set(removed_coordinates) + filtered_primer_df = primer_df[ + primer_df[["start", "end"]].apply( + lambda r: any(coord in removed_coords for coord in range(*r)), + axis=1, + ) + ] + CoordinateListsToBed(filtered_primer_df, args.export_primers) + + processed_reads = processed_reads.drop(columns=["Removed_coordinates"]) + + read_records = processed_reads.to_dict(orient="records") + + write_output(args.output, read_records, args.threads) diff --git a/AmpliGone/alignmentmatrix.py b/AmpliGone/alignmentmatrix.py new file mode 100644 index 0000000..45d9806 --- /dev/null +++ b/AmpliGone/alignmentmatrix.py @@ -0,0 +1,350 @@ +import os +import sys +from typing import Dict, List + +from AmpliGone.log import log + + +def get_scoring_matrix(input_matrix: List[str] | None) -> List[int]: + """ + Calculate the scoring matrix based on the input matrix. + + Parameters + ---------- + input_matrix : List[str] or None + The input matrix used to calculate the scoring matrix. Each element in the list should be in the format 'key=value'. + + Returns + ------- + List[int] + The calculated scoring matrix. + + Raises + ------ + ValueError + If the input matrix is not in the correct format or contains negative values. + + Notes + ----- + This function calculates the scoring matrix based on the input matrix. + The input matrix must have a length of 4, 6, or 7 parameters. + The scoring matrix may only contain non-negative integers. + + Examples + -------- + >>> input_matrix = ['match=1', 'mismatch=2', 'gap_o1=3', 'gap_e1=4'] + >>> get_scoring_matrix(input_matrix) + [1, 2, 3, 4] + + >>> input_matrix = ['match=1', 'mismatch=2', 'gap_o1=3', 'gap_e1=4', 'gap_o2=5', 'gap_e2=6'] + >>> get_scoring_matrix(input_matrix) + [1, 2, 3, 4, 5, 6] + + >>> input_matrix = ['match=1', 'mismatch=2', 'gap_o1=3', 'gap_e1=4', 'gap_o2=5', 'gap_e2=6', 'mma=7'] + >>> get_scoring_matrix(input_matrix) + [1, 2, 3, 4, 5, 6, 7] + + >>> input_matrix = ['match=1', 'mismatch=2', 'gap_o1=3', 'gap_e1=4', 'gap_o2=5', 'gap_e2=6', 'mma=7', 'extra=8'] + >>> get_scoring_matrix(input_matrix) + SystemExit: Invalid scoring matrix length. The scoring-matrix must have a length of 4, 6 or 7 parameters. + The following input parameters were given: 'match=1 mismatch=2 gap_o1=3 gap_e1=4 gap_o2=5 gap_e2=6 mma=7 extra=8'. + After parsing, these inputs result in the following: {'match': 1, 'mismatch': 2, 'gap_o1': 3, 'gap_e1': 4, 'gap_o2': 5, 'gap_e2': 6, 'mma': 7, 'extra': 8}. + Please note that adding the same key multiple times will result in the last value being used. + + >>> input_matrix = ['match=1', 'mismatch=2', 'gap_o1=3', 'gap_e1=4', 'gap_o2=5', 'gap_e2=6', 'mma=-7'] + >>> get_scoring_matrix(input_matrix) + SystemExit: Given scoring matrix contains a negative value. + The scoring matrix may only contain non-negative integers. Please check your input and try again. + + """ + log.debug(f"Starting MATRIXPARSER process\t@ ProcessID {os.getpid()}") + matrix_dict = _input_to_dict(input_matrix) + if matrix_dict is None: + return [] + if _valid_scoring_list_length(list(matrix_dict.keys())) is False: + log.error( + f"Invalid scoring matrix length. The scoring-matrix must have a length of 4, 6 or 7 parameters. \nThe following input parameters were given: '[red]{' '.join(input_matrix) if input_matrix is not None else None}[/red]'. \nAfter parsing, these inputs result in the following: [red]{matrix_dict}[/red]. 
\nPlease note that adding the same key multiple times will result in the last value being used." + ) + sys.exit(1) + if _scoring_has_negative_values(list(matrix_dict.values())) is True: + log.error( + "Given scoring matrix contains a negative value. \nThe scoring matrix may only contain non-negative integers. Please check your input and try again." + ) + sys.exit(1) + return _validate_matrix_combinations(matrix_dict) + + +def _input_to_dict(input_matrix: List[str] | None) -> Dict[str, int] | None: + """ + Convert the input matrix to a dictionary. + + Parameters + ---------- + input_matrix : List[str] or None + The input matrix to be converted. Each element in the list should be in the format 'key=value'. + + Returns + ------- + dict or None + A dictionary representation of the input matrix. Returns None if the input matrix is None. + + Raises + ------ + ValueError + If the input matrix is not in the correct format. + + Examples + -------- + >>> input_matrix = ['a=1', 'b=2', 'c=3'] + >>> _input_to_dict(input_matrix) + {'a': 1, 'b': 2, 'c': 3} + + >>> input_matrix = None + >>> _input_to_dict(input_matrix) + None + + >>> input_matrix = ['a=1', 'b=2', 'c'] + >>> _input_to_dict(input_matrix) + ValueError: Invalid scoring matrix input. The scoring matrix input must be in the format 'key=value'. + Please check if your input does not contain any spaces and try again. + """ + if input_matrix is None: + log.debug("MATRIXPARSER :: No input matrix given.") + return None + for item in input_matrix: + if "=" not in item: + log.error( + "Invalid scoring matrix input. The scoring matrix input must be in the format 'key=value'.\nPlease check if your input does not contain any spaces and try again." + ) + sys.exit(1) + sort_order = ["match", "mismatch", "gap_o1", "gap_e1", "gap_o2", "gap_e2", "mma"] + return { + item.split("=")[0]: int(item.split("=")[1]) + for item in sorted( + input_matrix, key=lambda x: sort_order.index(x.split("=")[0]) + ) + } + + +def _valid_scoring_list_length(input_list: List[str]) -> bool: + """ + Check if the length of the input list is either 4, 6, or 7. + + Parameters + ---------- + input_list : list of str + The list to check the length of. + + Returns + ------- + bool + True if the length of the list is 4, 6, or 7, False otherwise. + + Raises + ------ + None + + Examples + -------- + >>> _valid_scoring_list_length(['A', 'B', 'C', 'D']) + True + + >>> _valid_scoring_list_length(['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H']) + False + """ + return len(input_list) in {4, 6, 7} + + +def _scoring_has_negative_values(input_list: List[int]) -> bool: + """ + Check if the input list contains any negative values. + + Parameters + ---------- + input_list : list of int + The list to check for negative values. + + Returns + ------- + bool + True if any item in the list is less than 0, False otherwise. + + Examples + -------- + >>> _scoring_has_negative_values([1, 2, 3]) + False + + >>> _scoring_has_negative_values([-1, 2, 3]) + True + """ + return any(item < 0 for item in input_list) + + +def _sort_matrix_dict( + matrix_dict: Dict[str, int], + required_4: List[str], + required_6: List[str], + required_7: List[str], +) -> Dict[str, int]: + """ + Sorts the given matrix dictionary based on the provided order of keys. + + Parameters + ---------- + matrix_dict : Dict[str, int] + A dictionary containing key-value pairs, where the keys are strings and the values are integers. 
+ required_4 : List[str] + A list of strings representing the required keys that should be placed at the beginning of the sorted dictionary. + required_6 : List[str] + A list of strings representing the required keys that should be placed after the required_4 keys in the sorted dictionary. + required_7 : List[str] + A list of strings representing the required keys that should be placed after the required_6 keys in the sorted dictionary. + + Returns + ------- + Dict[str, int] + A dictionary containing the sorted key-value pairs from the input matrix_dict, based on the provided order of keys. + """ + order = required_4 + required_6 + required_7 + matrix_dict = {key: matrix_dict[key] for key in order if key in matrix_dict} + return matrix_dict + + +def _get_ordered_values( + matrix_dict: Dict[str, int], + required_4: List[str], + required_6: List[str], + required_7: List[str], +) -> List[int]: + """ + Get the ordered values from the matrix dictionary based on the number of keys present. + + Parameters + ---------- + matrix_dict : Dict[str, int] + A dictionary containing string keys and integer values. + required_4 : List[str] + A list of strings representing the required keys when there are 4 keys in the matrix_dict. + required_6 : List[str] + A list of strings representing the required keys when there are 6 keys in the matrix_dict. + required_7 : List[str] + A list of strings representing the required keys when there are 7 keys in the matrix_dict. + + Returns + ------- + List[int] + A list of integers representing the ordered values corresponding to the required keys. + + Raises + ------ + None + + """ + ordered_vals: List[int] = [] + matrix_keys = list(matrix_dict.keys()) + if len(matrix_keys) == 4: + if not set(matrix_keys).issubset(required_4): + _log_invalid_combination_error(matrix_keys, required_4) + ordered_vals = [matrix_dict[key] for key in required_4] + log.debug(f"MATRIXPARSER :: Ordered values: {ordered_vals}") + elif len(matrix_keys) == 6: + if not set(matrix_keys).issubset(required_6): + _log_invalid_combination_error(matrix_keys, required_6) + ordered_vals = [matrix_dict[key] for key in required_6] + log.debug(f"MATRIXPARSER :: Ordered values: {ordered_vals}") + elif len(matrix_keys) == 7: + if not set(matrix_keys).issubset(required_7): + _log_invalid_combination_error(matrix_keys, required_7) + ordered_vals = [matrix_dict[key] for key in required_7] + log.debug(f"MATRIXPARSER :: Ordered values: {ordered_vals}") + return ordered_vals + + +def _log_invalid_combination_error( + matrix_keys: List[str], required_keys: List[str] +) -> None: + """ + Log an error message and exit the program when an invalid combination of scoring matrix keys is encountered. + + Parameters + ---------- + matrix_keys : List[str] + The list of scoring matrix keys that were given. + required_keys : List[str] + The list of valid scoring matrix keys that are supported. + + Returns + ------- + None + + Notes + ----- + This function logs an error message and exits the program if an invalid combination of scoring matrix keys is encountered. + The error message includes information about the number of valid scoring matrix keys and the keys that were given. + + Examples + -------- + >>> matrix_keys = ['A', 'B', 'C'] + >>> required_keys = ['A', 'B', 'C', 'D'] + >>> _log_invalid_combination_error(matrix_keys, required_keys) + Invalid combination of scoring matrix keys. A total of 4 valid scoring-matrix keys were given. + The following keys are supported for 4 scoring-matrix keys: 'A | B | C | D'. 
+ The following keys were given: 'A | B | C'. + """ + log.error( + f"Invalid combination of scoring matrix keys. A total of {len(required_keys)} valid scoring-matrix keys were given. \nThe following keys are supported for {len(required_keys)} scoring-matrix keys: '[green]{' | '.join(required_keys)}[/green]'. \nThe following keys were given: '[red]{' | '.join(matrix_keys)}[/red]'." + ) + sys.exit(1) + + +def _validate_matrix_combinations(matrix_dict: Dict[str, int]) -> List[int]: + """ + Validate the combinations of matrix values in the given matrix dictionary. + + Parameters + ---------- + matrix_dict : Dict[str, int] + A dictionary containing matrix values. + + Returns + ------- + List[int] + A list of ordered matrix values. + + Raises + ------ + None + + Notes + ----- + The function validates the combinations of matrix values in the given matrix dictionary. + It ensures that the required matrix values are present and in the correct order. + The required matrix values are 'match', 'mismatch', 'gap_o1', 'gap_e1', 'gap_o2', 'gap_e2', and 'mma'. + The function sorts the matrix dictionary based on the required values and returns the ordered matrix values. + + Examples + -------- + >>> matrix_dict = { + ... 'match': 4, + ... 'mismatch': 4, + ... 'gap_o1': 6, + ... 'gap_e1': 2, + ... 'gap_o2': 14, + ... 'gap_e2': 1, + ... 'mma': 10 + ... } + >>> ordered_values = _validate_matrix_combinations(matrix_dict) + >>> print(ordered_values) + [4, 4, 6, 2, 14, 1, 10] + """ + required_4 = ["match", "mismatch", "gap_o1", "gap_e1"] + required_6 = required_4 + ["gap_o2", "gap_e2"] + required_7 = required_6 + ["mma"] + + log.debug( + "MATRIXPARSER :: Sorting matrix dictionary to fit required alignment matrix." + ) + matrix_dict = _sort_matrix_dict(matrix_dict, required_4, required_6, required_7) + log.debug(f"MATRIXPARSER :: {matrix_dict}") + log.debug("MATRIXPARSER :: Fetching ordered values from sorted matrix dictionary") + return _get_ordered_values(matrix_dict, required_4, required_6, required_7) diff --git a/AmpliGone/alignmentpreset.py b/AmpliGone/alignmentpreset.py new file mode 100644 index 0000000..afb3ed8 --- /dev/null +++ b/AmpliGone/alignmentpreset.py @@ -0,0 +1,498 @@ +import argparse +from concurrent.futures import ProcessPoolExecutor +from typing import Generator, List, Tuple + +import pandas as pd + +from AmpliGone.io_ops import SequenceReads +from AmpliGone.log import log + + +def get_alignment_preset( + input_args: argparse.Namespace, indexed_reads: SequenceReads +) -> str: + """ + Get the alignment preset for the given reads. + + Parameters + ---------- + input_args : argparse.Namespace + The input arguments. + indexed_reads : LoadData + The indexed reads. + + Returns + ------- + str + The alignment preset. + + Raises + ------ + None + + Notes + ----- + If the `alignment_preset` argument is provided in `input_args`, it will be returned directly. + Otherwise, the function will find the optimal alignment preset for the given reads. + + The optimal alignment preset is determined by sampling a subset of the reads and using the + `find_preset` function to calculate the preset based on the sampled reads. + + The sample size is determined by taking the minimum of the total number of reads and 15000. 
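+
+    The returned value is a minimap2 preset name; a minimal sketch of how it is
+    typically consumed downstream (assuming mappy is the aligner backend, as in
+    AmpliGone's read-cutting step, and with an illustrative reference path):
+
+    >>> import mappy
+    >>> preset = get_alignment_preset(input_args, indexed_reads)
+    >>> aligner = mappy.Aligner("reference.fasta", preset=preset)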
+ + """ + if input_args.alignment_preset is not None: + return input_args.alignment_preset + log.info("Finding optimal alignment-preset for the given reads") + sample_size = min(len(indexed_reads.tuples), 15000) + return find_preset(input_args.threads, indexed_reads.frame.sample(n=sample_size)) + + +def find_preset(threads: int, data: pd.DataFrame) -> str: + """ + Find the preset based on the statistics calculated from the input data. + + Parameters + ---------- + threads : int + Number of threads to be used for processing the data. + data : pd.DataFrame + Pandas DataFrame containing at least two columns: "Sequence" and "Qualities". + The "Sequence" column should contain strings representing sequences, and the "Qualities" column + should contain strings representing quality data associated with the sequences. + + Returns + ------- + str + The preset determined based on the calculated statistics. + + Notes + ----- + This function takes in a number of threads and a DataFrame, extracts read data, calculates statistics, + and determines a preset based on the statistics. + + The preset is determined by calling the `_determine_preset` function with the calculated statistics as arguments. + + See Also + -------- + _determine_preset : Function to determine the preset based on the calculated statistics. + + Examples + -------- + >>> import pandas as pd + >>> data = pd.DataFrame({'Sequence': ['ATCG', 'GCTA'], 'Qualities': ['!@#$%', '&*()']}) + >>> find_preset(4, data) + 'sr' + """ + + def _extract_read_data(data: pd.DataFrame) -> Tuple[List[str], str]: + """ + Extract read data from the input DataFrame. + + Parameters + ---------- + data : pd.DataFrame + Pandas DataFrame containing at least two columns: "Sequence" and "Qualities". + + Returns + ------- + Tuple[List[str], str] + A tuple containing two elements: + 1. `ReadList` : List[str] + A list of strings extracted from the "Sequence" column of the input DataFrame. + 2. `QualData` : str + A single string obtained by joining all the elements in the "Qualities" column of the input DataFrame. + + Notes + ----- + This function takes a pandas DataFrame as input and returns a tuple containing a list of sequences + and a concatenated string of qualities. + + Examples + -------- + >>> import pandas as pd + >>> data = pd.DataFrame({'Sequence': ['ATCG', 'GCTA'], 'Qualities': ['!@#$%', '&*()']}) + >>> _extract_read_data(data) + (['ATCG', 'GCTA'], '!@#$%&*()') + """ + list_of_reads: List[str] = data["Sequence"].tolist() + qualities_str: str = "".join(data["Qualities"].tolist()) + return list_of_reads, qualities_str + + reads_list, quality_data = _extract_read_data(data) + ord_quality_list: List[int] = _qual_to_ord_dispatcher(quality_data, threads) + avg_len, avg_qual, quality_range, length_range = _sequence_statistics_dispatcher( + reads_list, ord_quality_list, threads + ) + return _determine_preset(avg_len, avg_qual, quality_range, length_range) + + +def _qual_to_ord_dispatcher(qdata: str, threads) -> List[int]: + """ + Convert a string of characters to a list of ASCII values minus 33 using multiple threads. + + Parameters + ---------- + qdata : str + The input string to be converted. + threads : int + The number of threads to use for parallel processing. + + Returns + ------- + list + A list of integers representing the ASCII values of the characters in the input string minus 33. 
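+
+    Notes
+    -----
+    This is the standard Phred+33 (Sanger / Illumina 1.8+) decoding of FASTQ
+    quality strings: for example, the character 'I' maps to a quality of 40
+    and '!' maps to 0.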
+ + Examples + -------- + >>> _qual_to_ord_dispatcher("Hello", 2) + [32, 29, 34, 34, 37] + """ + + def _create_chunks(lst: str, n: int) -> Generator: + """ + Splits a list into approximately equal-sized chunks. + + Parameters + ---------- + lst : list + The list to be split. + n : int + The number of chunks to create. + + Yields + ------ + generator + A generator that yields the chunks. + + Examples + -------- + >>> lst = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + >>> for chunk in chunks(lst, 3): + ... print(chunk) + [1, 2, 3, 4] + [5, 6, 7] + [8, 9, 10] + """ + chunk_size, remainder = divmod(len(lst), n) + start = 0 + for i in range(n): + end = start + chunk_size + if i < remainder: + end += 1 + yield lst[start:end] + start = end + + qdata_chunks: List[str] = list(_create_chunks(qdata, threads)) + ordinal_quality_list: List[int] = [] + with ProcessPoolExecutor(max_workers=threads) as pool: + results = pool.map(_process_chunk, qdata_chunks) + for result in results: + ordinal_quality_list.extend(result) + return ordinal_quality_list + + +def _process_chunk(chunk: str): + """ + Converts each character in the given chunk to its corresponding ASCII value minus 33. + + Parameters + ---------- + chunk : str + The input chunk of characters. + + Returns + ------- + list of int + A list of integers representing the ASCII values of the characters in the chunk minus 33. + + Examples + -------- + >>> _process_chunk("Hello") + [32, 29, 34, 34, 37] + + Notes + ----- + This function uses the `ord` function to convert each character to its corresponding ASCII value. + The ASCII value is then subtracted by 33 to obtain the desired result. + """ + return [ord(character) - 33 for character in chunk] + + +def _determine_preset( + avg_len: float, avg_qual: float, quality_range: int, length_range: int +) -> str: + def _determine_sequence_variance( + quality_range: int, length_range: int, avg_qual: float, avg_len: float + ) -> Tuple[float, float]: + """ + Calculate the variability of a sequence based on the difference between the average quality score and the quality score of the + current sequence, and the difference between the average length and the length of the current + sequence. + + Parameters + ---------- + quality_range : int + The difference between the highest and lowest quality scores. + length_range : int + The difference between the longest and shortest sequence. + avg_qual : float + The average quality score of the sequence. + avg_len : float + The average length of the sequences. + + Returns + ------- + Tuple[float, float] + The quality variance and length variance. + + Examples + -------- + >>> _determine_sequence_variance(20, 10, 30.0, 5.0) + (0.6666666666666666, 2.0) + + Notes + ----- + The quality variance is calculated as the ratio of the quality range to the average quality score. + The length variance is calculated as the ratio of the length range to the average length. + + """ + quality_variancy = quality_range / avg_qual + length_variancy = length_range / avg_len + return quality_variancy, length_variancy + + def _determine_sequence_stability( + quality_range: int, length_range: int, avg_qual: float, avg_len: float + ) -> float: + """ + Calculate the stability of a sequence based on the variability of the quality scores and the lengths of the sequences. + + Parameters + ---------- + quality_range : int + The difference between the highest and lowest quality scores. + length_range : int + The difference between the longest and shortest sequence. 
+ avg_qual : float + The average quality score of the sequence. + avg_len : float + The average length of the sequences. + + Returns + ------- + float + The percentage of stability of the sequence. + + Examples + -------- + >>> _determine_sequence_stability(20, 10, 30.0, 5.0) + 32.33333333333333 + + Notes + ----- + The stability of a sequence is calculated by subtracting the sum of the quality score variance and length variance from 100. + """ + quality_variance, length_variance = _determine_sequence_variance( + quality_range, length_range, avg_qual, avg_len + ) + return 100 - (quality_variance + length_variance) + + def _is_long_read(avg_len: float) -> bool: + """ + Determine if a read is considered long based on its average length. + + Parameters + ---------- + avg_len : float + The average length of the read. + + Returns + ------- + bool + True if the read is considered long (average length > 300), False otherwise. + + Examples + -------- + >>> _is_long_read(250) + False + >>> _is_long_read(350) + True + + Notes + ----- + This function considers a read to be long if its average length is greater than 300. + """ + return avg_len > 300 + + if ( + _determine_sequence_stability(quality_range, length_range, avg_qual, avg_len) + > 97 + ): + if _is_long_read(avg_len) is False: + # this is probably 'short read' illumina NextSeq data + # --> set the 'SR' preset + return "sr" + ##! previous if-statement is not False. + # this is probably 'long read' illumina MiSeq data + # --> the 'SR' preset still applies but we keep it split + # in case a custom set of parameters is necessary in the future + return "sr" + if _is_long_read(avg_len) is True: + # this is probably oxford nanopore data + # --> set the preset to 'map-ont' + return "map-ont" + ##! previous if-statement is not True. + # this might be very 'unstable' nextseq data, + # or from a platform we currently dont really support officially. + # fallback to 'sr' preset + return "sr" + + +def _calc_avg_read_length(sequence_list: List[str]) -> float: + """ + Calculate the average length of a list of sequences. + + Parameters + ---------- + sequence_list : List[str] + A list of sequences. + + Returns + ------- + float + The average length of the sequences in the list. + + Examples + -------- + >>> _calc_avg_read_length(['ATCG', 'ATCGATCG', 'AT']) + 5.0 + + Notes + ----- + This function calculates the average length of the sequences in the given list. + The average length is calculated by summing the lengths of all sequences and dividing it by the total number of sequences. + + The function assumes that all sequences in the list are of type str. + + """ + return sum(map(len, sequence_list)) / float(len(sequence_list)) + + +def _calc_avg_read_qual(quality_list: List[int]) -> float: + """ + Calculate the average quality score of a list of quality scores. + + Parameters + ---------- + quality_list : List[int] + A list of quality scores. + + Returns + ------- + float + The average quality score of the quality scores in the list. + + Examples + -------- + >>> _calc_avg_read_qual([20, 30, 40]) + 30.0 + + Notes + ----- + This function calculates the average quality score by summing up all the quality scores in the list + and dividing it by the total number of quality scores. + + The average quality score is a measure of the overall quality of the sequence data. + Higher average quality scores indicate better quality data. 
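+
+    Together with the read-length statistics, this average feeds the stability
+    heuristic in _determine_preset above. Two worked examples with illustrative
+    numbers (for the first call the stability is 100 - (40/34 + 30/150) ≈ 98.6,
+    which is above 97 with a short average read length, hence 'sr'):
+
+    >>> _determine_preset(avg_len=150.0, avg_qual=34.0, quality_range=40, length_range=30)
+    'sr'
+    >>> _determine_preset(avg_len=1200.0, avg_qual=12.0, quality_range=30, length_range=900)
+    'map-ont'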
+ + """ + return sum(quality_list) / len(quality_list) + + +def _get_unique_quality_scores(quality_list: List[int]) -> int: + """ + Return the number of unique quality scores in a list. + + Parameters + ---------- + quality_list : List[int] + A list of quality scores. + + Returns + ------- + int + The number of unique quality scores in the list. + + Examples + -------- + >>> GetQualRange([20, 30, 20, 40]) + 3 + """ + return len(set(quality_list)) + + +def _get_unique_read_lengths(sequence_list: List[str]) -> int: + """ + Return the number of unique lengths of strings in a list. + + Parameters + ---------- + sequence_list : List[str] + A list of strings. + + Returns + ------- + int + The number of unique lengths in the list of strings. + + Examples + -------- + >>> _get_unique_read_lengths(['ATCG', 'ATCGATCG', 'AT']) + 2 + + Notes + ----- + This function calculates the number of unique lengths of strings in the given list. + It does this by iterating over each string in the list and getting its length. + The lengths are then converted into a set to remove duplicates, and the length of the set is returned. + """ + all_read_lengths = [len(i) for i in sequence_list] + return len(set(all_read_lengths)) + + +def _sequence_statistics_dispatcher( + reads_list: List[str], ordinal_qualities_list: List[int], threads: int +) -> Tuple[float, float, int, int]: + """ + Calculate sequence statistics using multiple threads. + + Parameters + ---------- + reads_list : List[str] + A list of DNA sequences. + ordinal_qualities_list : List[int] + A list of quality scores corresponding to the DNA sequences. + threads : int + The number of threads to use for parallel execution. + + Returns + ------- + Tuple[float, float, int, int] + A tuple containing the average length of the DNA sequences, + the average quality score, the number of unique quality scores, and the number of unique + DNA sequence lengths. + """ + + with ProcessPoolExecutor(max_workers=threads) as pool: + future_averagelength = pool.submit(_calc_avg_read_length, reads_list) + future_averagequal = pool.submit(_calc_avg_read_qual, ordinal_qualities_list) + future_qualityrange = pool.submit( + _get_unique_quality_scores, ordinal_qualities_list + ) + future_lengthrange = pool.submit(_get_unique_read_lengths, reads_list) + + avg_len = future_averagelength.result() + avg_qual = future_averagequal.result() + quality_range = future_qualityrange.result() + length_range = future_lengthrange.result() + return avg_len, avg_qual, quality_range, length_range diff --git a/AmpliGone/args.py b/AmpliGone/args.py new file mode 100644 index 0000000..b2b0657 --- /dev/null +++ b/AmpliGone/args.py @@ -0,0 +1,365 @@ +import argparse +import multiprocessing +import os +import pathlib +import re +import shutil +import textwrap +from typing import IO, Iterable, List, Optional + +import rich + +from AmpliGone import __prog__, __version__ + + +def get_args(givenargs: List[str]) -> argparse.Namespace: + """It takes the arguments given to the script and parses them into the argparse namespace + + Parameters + ---------- + givenargs + the arguments given to the script + + Returns + ------- + The arguments that are given to the program. + + """ + + def check_file_extensions(allowed_extensions: Iterable[str], fname: str) -> str: + """ + Check if the file extension of the file name passed to it is one of the extensions in the list passed to it. + + Parameters + ---------- + allowed_extensions : iterable of str + A tuple of strings that are valid extensions. 
+ fname : str + The name of the file to be read. + + Returns + ------- + str + The absolute path of the file passed. + + Raises + ------ + ArgumentParser.error + If the file extension of the file name passed to it is not one of the extensions in the list passed to it. + """ + ext = "".join(pathlib.Path(fname).suffixes) + if not any(ext.endswith(c) for c in allowed_extensions): + parser.error(f"File {fname} doesn't end with one of {allowed_extensions}") + return os.path.abspath(fname) + + def check_file_exists(fname: str) -> str | None: + """Check if the given file `fname` exists and return the absolute path. + + Parameters + ---------- + fname : str + The name of the file to be checked. + + Returns + ------- + str + The absolute path of the file if it exists. + + Raises + ------ + ArgumentParser.error + If the file does not exist. + + """ + if os.path.isfile(fname): + return fname + parser.error(f'Error: File "{fname}" does not exist.') + + parser = RichParser( + prog=f"[bold]{__prog__}[/bold]", + usage=f"[bold]{__prog__}[/bold] \[required options] \[optional arguments]", + description=f"[bold underline]{__prog__}[/bold underline]: An accurate and efficient tool to remove primers from NGS reads in reference-based experiments", + formatter_class=FlexibleArgFormatter, + add_help=False, + ) + + standard_threads = min(multiprocessing.cpu_count(), 128) + + required_args = parser.add_argument_group("Required Arguments") + + required_args.add_argument( + "--input", + "-i", + type=lambda s: check_file_exists( + check_file_extensions((".fastq", ".fq", ".bam", ".fastq.gz", ".fq.gz"), s) + ), + metavar="File", + help="Input file with reads in either FastQ or BAM format.", + required=True, + ) + + required_args.add_argument( + "--output", + "-o", + type=lambda s: check_file_extensions( + (".fastq", ".fq", ".fastq.gz", ".fq.gz"), s + ), + metavar="File", + help="Output (FastQ) file with cleaned reads.", + required=True, + ) + + required_args.add_argument( + "--reference", + "-ref", + type=lambda s: check_file_exists(check_file_extensions((".fasta", ".fa"), s)), + metavar="File", + help="Input Reference genome in FASTA format", + required=True, + ) + required_args.add_argument( + "--primers", + "-pr", + type=lambda s: check_file_exists( + check_file_extensions((".fasta", ".fa", ".bed"), s) + ), + metavar="File", + help="""Used primer sequences in FASTA format or primer coordinates in BED format.\n Note that using bed-files overrides error-rate and ambiguity functionality""", + required=True, + ) + + optional_args = parser.add_argument_group("Optional Arguments") + + optional_args.add_argument( + "--amplicon-type", + "-at", + default="end-to-end", + choices=("end-to-end", "end-to-mid", "fragmented"), + help="Define the amplicon-type, either being [green]'end-to-end'[/green], [green]'end-to-mid'[/green], or [green]'fragmented'[/green]. See the docs for more info :book:", + required=False, + metavar="'end-to-end'/'end-to-mid'/'fragmented'", + ) + + optional_args.add_argument( + "--virtual-primers", + "-vp", + default=False, + action="store_true", + required=False, + help="If set, primers closely positioned to each other in the same orientation will be virtually combined into a single primer, ensuring proper removal of all primer-related data in a specific region. 
This is useful for amplicons that share multiple (alternative) primers for increased specificity.", + ) + + optional_args.add_argument( + "--fragment-lookaround-size", + "-fls", + required=False, + type=int, + metavar="N", + help="The number of bases to look around a primer-site to consider it part of a fragment. Only used if amplicon-type is 'fragmented'. Default is 10", + ) + + optional_args.add_argument( + "--export-primers", + "-ep", + type=lambda s: check_file_extensions((".bed",), s), + metavar="File", + help="Output BED file with found primer coordinates if they are actually cut from the reads", + required=False, + ) + + optional_args.add_argument( + "--threads", + "-t", + type=int, + default=standard_threads, + metavar="N", + help=f"""Number of threads you wish to use.\n Default is the number of available threads in your system ({standard_threads})""", + ) + + optional_args.add_argument( + "-to", + action="store_true", + help=f"If set, {__prog__} will always create the output files even if there is nothing to output. (for example when an empty input-file is given)\n This is useful in (automated) pipelines where you want to make sure that the output files are always created.", + required=False, + ) + + # TODO: explainer that this is a percentage (0.1 == 10%) + optional_args.add_argument( + "--error-rate", + "-er", + type=float, + default=0.1, + metavar="N", + help="The maximum allowed error rate for the primer search. Use 0 for exact primer matches.", + required=False, + ) + + optional_args.add_argument( + "--alignment-preset", + "-ap", + type=str, + default=None, + choices=("sr", "map-ont", "map-pb", "splice"), + help="The preset to use for alignment of reads against the reference. This can be either 'sr', 'map-ont', 'map-pb', or 'splice'. The alignment-preset can be combined with a custom alignment-scoring matrix. See the docs for more info :book:", + required=False, + metavar="'sr'/'map-ont'/'map-pb'/'splice'", + ) + + optional_args.add_argument( + "--alignment-scoring", + "-as", + type=str, + default=None, + metavar="KEY=VALUE", + nargs="+", + help="The scoring matrix to use for alignment of reads. This should be list of key-value pairs, where the key is the scoring-parameter and the value is a positive integer indicating the scoring-value for that parameter. Possible parameters are \n * (1) match\n * (2) mismatch\n * (3) gap_o1\n * (4) gap_e1\n * (5) gap_o2 (Optional: requires 1,2,3,4)\n * (6) gap_e2 (Optional, requires 1,2,3,4,5)\n * (7) mma (Optional, requires 1,2,3,4,5,6)\nFor example:\n --alignment-scoring match=4 mismatch=3 gap_o1=2 gap_e1=1\nSee the docs for more info :book:", + required=False, + ) + + optional_args.add_argument( + "--version", + "-v", + action="version", + version=__version__, + help=f"Show the {__prog__} version and exit", + ) + + optional_args.add_argument( + "--help", + "-h", + action="help", + default=argparse.SUPPRESS, + help="Show this help message and exit", + ) + + optional_args.add_argument( + "--verbose", + "-V", + action="store_true", + help="Prints more information, like DEBUG statements, to the terminal", + required=False, + ) + + optional_args.add_argument( + "--quiet", + "-q", + action="store_true", + help="Prints less information, like only WARNING and ERROR statements, to the terminal", + required=False, + ) + + return parser.parse_args(givenargs) + + +class FlexibleArgFormatter(argparse.HelpFormatter): + """ + A subclass of ArgumentParser.HelpFormatter that fixes spacing in the help text and respects bullet points. 
+ Especially useful for multi-line help texts combined with default values. + + This class has taken a lot of inspiration from the 'argparse-formatter' made by Dave Steele; https://github.com/davesteele/argparse_formatter + + Direct link to davesteele's original code that served as an inspiration for this class: https://github.com/davesteele/argparse_formatter/blob/a15d89a99e20b0cad4c389a2aa490c551cef4f9c/argparse_formatter/flexi_formatter.py + + --- + This class helps to alleviate the following points of the ArgParse help formatting: + * The help text will be aligned with the argument name/flags, instead of printing the help description on a newline + * Adjusting the width of the help text in relationship to the width of the terminal to make sure there is enough space between the argument name/flags and the help text (thus not overloading the end-user with an unreadable wall of text) + * Adding a default value to the help text (on a newline, and indented) if one is provided in the ArgParse constructor + * Respecting bullet points in the help description + * Respecting newlines in the help description (you may have to add a space after the newline to make sure it is properly catched by the formatter) + * Respecting indentation in the help description (up to a certain degree) + * Changes the behaviour of the metavar to be only printed once per long AND shorthand argument, instead of printing the metavar multiple times for every possible flag. + """ + + def __init__(self, prog): + term_width = shutil.get_terminal_size().columns + max_help_position = min(max(24, term_width // 2), 80) + super().__init__(prog, max_help_position=max_help_position) + + def _get_help_string(self, action): + """ """ + help_text = action.help + if ( + action.default != argparse.SUPPRESS + and help_text is not None + and "default" not in help_text.lower() + and action.default is not None + ): + help_text += f"\n ([underline]default: {str(action.default)}[/underline])" + return help_text + + def _format_action_invocation(self, action): + """ """ + if not action.option_strings or action.nargs == 0: + return super()._format_action_invocation(action) + default = self._get_default_metavar_for_optional(action) + args_string = self._format_args(action, default) + return ", ".join(action.option_strings) + " " + args_string + + def _split_lines(self, text, width): + return self._para_reformat(text, width) + + def _fill_text(self, text, width, indent): + lines = self._para_reformat(text, width) + return "\n".join(lines) + + def _indents(self, line): + """Return line indent level and "sub_indent" for bullet list text.""" + + indent = len(re.match(r"( *)", line).group(1)) + if list_match := re.match(r"( *)(([*\-+>]+|\w+\)|\w+\.) 
+)", line): + sub_indent = indent + len(list_match.group(2)) + else: + sub_indent = indent + + return (indent, sub_indent) + + def _split_paragraphs(self, text): + """Split text in to paragraphs of like-indented lines.""" + + text = textwrap.dedent(text).strip() + text = re.sub("\n\n[\n]+", "\n\n", text) + + last_sub_indent = None + paragraphs = [] + for line in text.splitlines(): + (indent, sub_indent) = self._indents(line) + is_text = re.search(r"[^\s]", line) is not None + + if is_text and indent == sub_indent == last_sub_indent: + paragraphs[-1] += f" {line}" + else: + paragraphs.append(line) + + last_sub_indent = sub_indent if is_text else None + return paragraphs + + def _para_reformat(self, text, width): + """Reformat text, by paragraph.""" + + paragraphs = [] + for paragraph in self._split_paragraphs(text): + (indent, sub_indent) = self._indents(paragraph) + + paragraph = self._whitespace_matcher.sub(" ", paragraph).strip() + new_paragraphs = textwrap.wrap( + text=paragraph, + width=width, + initial_indent=" " * indent, + subsequent_indent=" " * sub_indent, + ) + + # Blank lines get eaten by textwrap, put it back with [' '] + paragraphs.extend(new_paragraphs or [" "]) + + return paragraphs + + +class RichParser(argparse.ArgumentParser): + """ + A subclass of `argparse.ArgumentParser` that overrides the `_print_message` method to use + `rich.print` instead of `print` + """ + + def _print_message(self, message: str, file: Optional[IO[str]] = None) -> None: + return rich.print(message) diff --git a/AmpliGone/cut_reads.py b/AmpliGone/cut_reads.py index fd926e1..8ca0682 100644 --- a/AmpliGone/cut_reads.py +++ b/AmpliGone/cut_reads.py @@ -1,9 +1,12 @@ +import os from collections import defaultdict -from typing import Callable, List, Set, Tuple +from typing import Callable, List, Tuple import mappy as mp import pandas as pd +from AmpliGone.log import log + from .cutlery import PositionInOrAfterPrimer, PositionInOrBeforePrimer @@ -98,7 +101,7 @@ def cut_read( def CutReads( data: Tuple[pd.DataFrame, int], - primer_df: pd.DataFrame, + primer_sets: Tuple[defaultdict, defaultdict], reference: str, preset: str, scoring: List[int], @@ -114,9 +117,8 @@ def CutReads( data : Tuple[pd.DataFrame, int] A tuple containing a pandas DataFrame with columns "Readname", "Sequence", and "Qualities", and an integer representing the thread number. - primer_df : pd.DataFrame - A pandas DataFrame with columns "ref", "start", "end", and "strand" representing the primer - locations on the reference genome. + primer_sets : Tuple[defaultdict, defaultdict] + A tuple containing two defaultdicts, one for forward primers and one for reverse primers. These defaultdicts contain the primer coordinates to remove. reference : str The reference genome sequence. preset : str @@ -137,27 +139,11 @@ def CutReads( representing the processed reads and the coordinates that were removed. """ Frame, _threadnumber = data + log.debug( + f"Initiated thread {_threadnumber} @ process ID {os.getpid()} :: Processing {len(Frame)} reads." 
+ ) - RVDict = defaultdict(set) - FWDict = defaultdict(set) - - reference_ids: Set[str] = set(primer_df["ref"].unique()) - for refid in reference_ids: - RVDict[refid] = set() - FWDict[refid] = set() - - for _, refid, start, end, strand in primer_df[ - ["ref", "start", "end", "strand"] - ].itertuples(): - refid: str - start: int - end: int - strand: str - for coord in range(start + 1, end): # +1 because reference is 1-based - if strand == "+": - FWDict[refid].add(coord) - elif strand == "-": - RVDict[refid].add(coord) + FWDict, RVDict = primer_sets Aln = mp.Aligner( reference, @@ -175,12 +161,45 @@ def CutReads( max_iter = ( 10 # If more iterations are needed, the sequence is discarded (not recorded) ) - for _index, name, seq, qual in Frame[ - ["Readname", "Sequence", "Qualities"] - ].itertuples(): + total_reads = len(Frame) + for index, (_, name, seq, qual) in enumerate( + Frame[["Readname", "Sequence", "Qualities"]].itertuples(), 1 + ): name: str seq: str qual: str + if total_reads >= 10 and index % (total_reads // 10) == 0 and log.level == 10: + completion_percentage = round(index / total_reads * 100) + maxsize = PositionInOrBeforePrimer.cache_info().maxsize + currsize = PositionInOrBeforePrimer.cache_info().currsize + cache_usage_before = ( + currsize / maxsize * 100 + if maxsize is not None and currsize is not None + else 0 + ) + maxsize = PositionInOrAfterPrimer.cache_info().maxsize + currsize = PositionInOrAfterPrimer.cache_info().currsize + cache_usage_after = ( + currsize / maxsize * 100 + if maxsize is not None and currsize is not None + else 0 + ) + # todo: clean up this section of safely dividing by zero + cache_misses = PositionInOrBeforePrimer.cache_info().misses + cache_hit_ratio_before = ( + (PositionInOrBeforePrimer.cache_info().hits / cache_misses) + if cache_misses != 0 + else 0 + ) + cache_misses = PositionInOrAfterPrimer.cache_info().misses + cache_hit_ratio_after = ( + (PositionInOrAfterPrimer.cache_info().hits / cache_misses) + if cache_misses != 0 + else 0 + ) + log.debug( + f"Thread {_threadnumber} @ processID {os.getpid()}\t::\tReads processing {completion_percentage}% complete.\n\tMODULE {PositionInOrBeforePrimer.__module__}.{PositionInOrBeforePrimer.__qualname__} CACHE INFORMATION\n\t\tCache size usage = {cache_usage_before:.2f}%\n\t\tCache hit ratio = {cache_hit_ratio_before:.2f}%\n\tMODULE {PositionInOrAfterPrimer.__module__}.{PositionInOrAfterPrimer.__qualname__} CACHE INFORMATION\n\t\tCache size usage = {cache_usage_after:.2f}%\n\t\tCache hit ratio = {cache_hit_ratio_after:.2f}%" + ) removed_coords_fw = [] removed_coords_rv = [] @@ -217,7 +236,10 @@ def CutReads( RVTuple: Tuple[int, ...] = tuple(RVDict[hit.ctg]) if not FWTuple or not RVTuple: - print(FWTuple, RVTuple, hit.ctg) + log.debug( + f"Thread {_threadnumber} @ processID {os.getpid()}\t::\tRead with name '{name}' aligns to '{hit.ctg}', but there are no primers affiliated with '{hit.ctg}'." 
+ ) + continue qstart: int = hit.q_st qend: int = hit.q_en diff --git a/AmpliGone/fasta2bed.py b/AmpliGone/fasta2bed.py index ca8cef0..917b274 100644 --- a/AmpliGone/fasta2bed.py +++ b/AmpliGone/fasta2bed.py @@ -1,15 +1,20 @@ import argparse +import os +import re from itertools import product -from typing import Any, Dict, Generator, Hashable, List, Set, Tuple +from typing import Dict, Generator, List, Tuple, Union import pandas as pd -import regex as re +import parasail as ps from Bio import Seq, SeqIO +from Bio.Data import IUPACData +from parasail import Cigar -from .func import log +from AmpliGone.io_ops import read_bed +from AmpliGone.log import log -def FindAmbigousOptions(seq: str) -> List[str]: +def find_ambiguous_options(seq: str) -> List[str]: """ Find all possible unambiguous sequences from a sequence containing ambiguous nucleotides. @@ -25,102 +30,405 @@ def FindAmbigousOptions(seq: str) -> List[str]: Examples -------- - >>> FindAmbigousOptions('ATGCR') - ['ATGCA', 'ATGCT'] + >>> find_ambiguous_options('ATGCR') + ['ATGCA', 'ATGCG'] """ - ambigs = Seq.IUPACData.ambiguous_dna_values + ambigs = IUPACData.ambiguous_dna_values return list(map("".join, product(*[ambigs.get(nuc, nuc) for nuc in seq]))) -def get_coords(seq: str, ref_seq: str, err_rate: float = 0.1) -> Set[Tuple[int, int]]: +def parse_cigar_obj(cig_obj: Cigar) -> Tuple[str, str]: """ - Find all the positions in the reference sequence where the query sequence could be found, allowing - for up to `err_rate` errors. + Parse a Cigar object and return the original cigar string and a cleaned cigar string. + + Parameters + ---------- + cig_obj : Cigar + The parasail Cigar object to parse. + + Returns + ------- + Tuple[str, str] + A tuple containing the original cigar string and the cleaned cigar string. + + Examples + -------- + >>> cig_obj = Cigar("10M2D5M") + >>> parse_cigar_obj(cig_obj) + ('10M2D5M', '10M2D5M') + + >>> cig_obj = Cigar("2D10M2D") + >>> parse_cigar_obj(cig_obj) + ('2D10M2D', '10M') + + Notes + ----- + The `parse_cigar_obj` function takes a Cigar object and extracts the original cigar string and a cleaned cigar string. + The original cigar string represents the full cigar string as returned by the `decode` method of the Cigar object. + The cleaned cigar string is obtained by removing any leading or trailing deletions from the original cigar string. + This is necessary because the function uses a semi-global alignment within the parasail library. + + The function first checks if the `cigar_representation` returned by `cig_obj.decode` is a bytes object. + If it is, the bytes object is converted to a string using the 'utf-8' encoding. + Then, the function uses regular expressions to remove any leading or trailing deletions from the cigar string. + The resulting original cigar string and cleaned cigar string are returned as a tuple. + + """ + cigar_representation = cig_obj.decode + cigar_str = ( + str(cigar_representation, "utf-8") + if isinstance(cigar_representation, bytes) + else str(cigar_representation) + ) + cleaned_cigar_str = re.sub(r"(^\d+D)|(\d+D$)", "", cigar_str) + return cigar_str, cleaned_cigar_str + + +def count_cigar_information( + cigar: str, +) -> Tuple[int, int, int, int, Tuple[int, int], Tuple[int, int]]: + """ + Count the number of errors (insertions, deletions, and mismatches) in a CIGAR string. + + Parameters + ---------- + cigar : str + The CIGAR string representing the alignment. + + Returns + ------- + int + The total number of errors in the CIGAR string. 
+
+ Examples
+ --------
+ >>> count_cigar_information("10=2I1=5D3=")
+ (14, 0, 5, 2, (5, 1), (2, 1))
+ >>> count_cigar_information("6=3X2D2=")
+ (8, 3, 2, 0, (2, 1), (0, 0))
+
+ Notes
+ -----
+ This function takes a CIGAR string and counts matches, mismatches, deletions, and insertions, including how many deletions and insertions occur in groups.
+ The CIGAR string represents the alignment between two sequences, where each character in the string represents an operation.
+ The "=" character represents a match, the "X" character represents a mismatch, the "I" character represents an insertion,
+ and the "D" character represents a deletion.
+
+ The function uses regular expressions to find all occurrences of matches, mismatches, insertions, and deletions in the CIGAR string.
+ It then sums the lengths of these occurrences to obtain the per-category counts.
+
+ This function assumes that the CIGAR string is in a valid format and does not perform any error checking or validation.
+ """
+ # print(cigar)
+ matches = sum(map(int, re.findall(r"(\d+)=", cigar)))
+ insertions = sum(map(int, re.findall(r"(\d+)I", cigar)))
+ deletions = sum(map(int, re.findall(r"(\d+)D", cigar)))
+ mismatches = sum(map(int, re.findall(r"(\d+)X", cigar)))
+
+ # grouped deletions and insertions are another metric. It's necessary to know how many deletions and insertions are grouped together or if they are spread out.
+ # should be 0 by default and be recorded every time a "D" is found in the cigar string with a number higher than 1 in front of it.
+ grouped_deletions = 0
+ grouped_insertions = 0
+ del_groups = 0
+ ins_groups = 0
+ for i in re.findall(r"(\d+)D", cigar):
+ if int(i) > 1:
+ grouped_deletions += int(i)
+ del_groups += 1
+ for i in re.findall(r"(\d+)I", cigar):
+ if int(i) > 1:
+ grouped_insertions += int(i)
+ ins_groups += 1
+
+ # print("grouped:", grouped_deletions, grouped_insertions)
+ # print("groups:", del_groups, ins_groups)
+ # print(f"Matches: {matches}, Mismatches: {mismatches}, Deletions: {deletions}, Insertions: {insertions}")
+ return (
+ matches,
+ mismatches,
+ deletions,
+ insertions,
+ (grouped_deletions, del_groups),
+ (grouped_insertions, ins_groups),
+ )
+
+
+# def assign_score(cigar_data: Tuple[int, int, int, int, Tuple[int, int], Tuple[int, int]], option_length: int, max_errors: int) -> int:
+
+
+# # from cigar_data we get the number of mismatches, deletions and insertions in that order.
+# # right now we only care about the errors so we're omitting matches
+# base_errors = cigar_data[1] + cigar_data[2] + cigar_data[3]
+# # deletions_grouped = cigar_data[4][0]
+# # insertions_grouped = cigar_data[5][0]
+# # del_groups = cigar_data[4][1]
+# # ins_groups = cigar_data[5][1]
+
+# if base_errors > max_errors:
+# return -1
+
+# score_modifier = 100
+# # we start with a base score of 100.
+
+# # TODO: determine if the penalize logic is *actually* desired.
+# # # if there are deletion or insertion groups then we want to *severely* penalize the score.
+# # # deletions or insertions by themselves are okay, and they are penalized as a 'base' error.
+# # # however, grouped deletions or insertions (i.e. 4D, or 2I in the cigar string) should be penalized even more as this should not happen with primers.
+# # for _ in range(del_groups): +# # # print(fraction_d) +# # score_modifier /= (deletions_grouped / del_groups if del_groups > 0 else 1) +# # for _ in range(ins_groups): +# # # print(fraction_i) +# # score_modifier /= (insertions_grouped / ins_groups if ins_groups > 0 else 1) + +# # calculate the score based on the number of matches and the length of the option +# matches = cigar_data[0] +# score = int(((option_length - base_errors) / option_length) * score_modifier) + +# # discard if the score is below 50% (50.0). Including scores this low into the final considerations would be a waste of time. +# # else, return the score +# return -1 if score < 50 else score + + +def percentage_representation(option_length: int, score: int) -> int: + # the max score in the nuc44 matrix is 5. So the highest achievable score of a primer-option is the length of the primer times 5. + max_score_of_option = option_length * 5 + return int((score / max_score_of_option) * 100) + + +def get_coords( + seq: str, ref_seq: str, err_rate: float = 0.1 +) -> Tuple[str, int, int, int, int]: + """ + Get the coordinates of the best primer option for a given sequence. Parameters ---------- seq : str - The sequence you want to find in the reference sequence. + The input sequence. ref_seq : str - The reference sequence to search in. + The reference sequence. err_rate : float, optional - The maximum error rate you want to allow. Default is 0.1. + The error rate, which determines the maximum number of errors allowed in the primer sequence, by default 0.1. Returns ------- - set of tuple of int - A set of tuples, each tuple is a match of the sequence in the reference sequence. + Tuple[str, int, int, int] + A tuple containing the best primer option, start position, end position, and score. Examples -------- - >>> get_coords('ATCG', 'CGTATCGTACG') - {(3, 7)} + >>> seq = "ATCG" + >>> ref_seq = "ATCGATCG" + >>> get_coords(seq, ref_seq, err_rate=0.1) + ('ATCG', 0, 4, 100) + + Notes + ----- + This function calculates the best primer option for a given sequence by allowing a maximum number of errors + based on the error rate. It searches for primer options resolved from ambiguous nucleotides in the sequence. + The function uses the `ps.sg_trace_scan_sat` function to perform sequence alignment and calculates the score + and errors based on the alignment results. The start and end positions are determined from the alignment. + The function returns the primer option with the highest score. """ max_errors = int(len(seq) * err_rate) - options = FindAmbigousOptions(seq.upper()) - - matches = set() - for e in range(max_errors + 1): - for option in options: - for match in re.finditer( - f"(?:{str(option)}){{s<={e}}}", ref_seq, re.IGNORECASE, concurrent=True - ): - matches.add(match.span()) - return matches + options = find_ambiguous_options(seq.upper()) + + if len(options) > 1: + log.debug( + f"PRIMERSEARCH :: Searching for [cyan]{len(options)}[/cyan] primer options resolved from ambiguous nucleotides in [green]{seq}[/green]" + ) + not_aligned_section_pattern = r"^(\d+)D" + localresults = [] + for option in options: + alignment = ps.sg_trace_scan_sat( + option, ref_seq, 8, 30, ps.nuc44 + ) # TODO: validate if these gap_open and gap_extend values are appropriate for this use case. 
See below + # gap_open = 8, gap_extend = 30 + # These values are chosen with the following idea: We want to allow for a few (one or two) deletions in the primer sequence vs the reference to deal with the possibility of a primer that is not perfectly matching the reference. + # However, we only want to allow deletions of 1 nucleotide length, and we want to heavily penalize longer gaps. + # The general idea is that a primer mismatch towards reference is okay if it compensates with one or two small deletions. However the primer should not be too different from the reference as this would indicate a bad primer design. + + cigar_str, cleaned_cigar_str = parse_cigar_obj(alignment.cigar) + parsed_cigar_info = count_cigar_information(cleaned_cigar_str) + score: int = alignment.score + errors = parsed_cigar_info[1] + parsed_cigar_info[2] + parsed_cigar_info[3] + percentage = percentage_representation(len(option), score) + # print("length of option:", len(option)) + # print(max_errors) + if errors > max_errors: + score = -1 + + new_start = 0 + if match := re.search(not_aligned_section_pattern, cigar_str): + new_start = int(match[1]) + start = new_start + end: int = alignment.end_ref + 1 + # print(score) + localresults.append((option, start, end, score, percentage)) + + return max(localresults, key=lambda x: x[3]) + + +def find_or_read_primers( + primerfile: str, + referencefile: str, + err_rate: float, + represent_as_score: bool = False, +) -> pd.DataFrame: + """ + Find or read primers from a given file. + This function checks if the primer file is in BED format. If it is, it reads the file directly. + Otherwise, it generates a DataFrame from the primer file and reference file with a specified error rate. -def MakeCoordinateLists(*args, **kwargs) -> pd.DataFrame: - """Takes a list of sequences, and returns a list of coordinates for each sequence + Parameters + ---------- + primerfile : str + The path to the primer file. This can be in BED format or any other format. + referencefile : str + The path to the reference file. This is used when the primer file is not in BED format. + err_rate : float + The error rate to use when generating the DataFrame from the primer and reference files. Returns ------- - A dataframe with the columns ref, start, end, name, score, strand, seq, revcomp + pd.DataFrame + A DataFrame containing the primer information. The columns are ["ref", "start", "end", "name", "score", "strand", "seq", "revcomp"]. """ + if primerfile.endswith(".bed"): + log.info("Primer coordinates are given in BED format, skipping primer search") + return read_bed(primerfile) return pd.DataFrame( - CoordListGen(*args, **kwargs), + CoordListGen( + primerfile=primerfile, + referencefile=referencefile, + err_rate=err_rate, + represent_as_score=represent_as_score, + ), columns=["ref", "start", "end", "name", "score", "strand", "seq", "revcomp"], ) +def choose_best_fitting_coordinates( + fw_coords: Tuple[str, int, int, int, int], rv_coords: Tuple[str, int, int, int, int] +) -> Tuple[str, int, int, int, int] | None: + """ + Compares the forward and reverse coordinates and returns the best fitting coordinates based on their scores. + + Parameters + ---------- + fw_coords : Tuple[str, int, int, int] + The forward coordinates as a tuple of primer-sequence, start position, end position, and score. + rv_coords : Tuple[str, int, int, int] + The reverse coordinates as a tuple of primer-sequence, start position, end position, and score. 
+ + Returns + ------- + Tuple[str, int, int, int] | None + The best fitting coordinates as a tuple of primer-sequence, start position, end position, and score, or None if no well-fitting coordinates are found. + + Examples + -------- + >>> fw_coords = ('chr1', 100, 200, 5) + >>> rv_coords = ('chr1', 150, 250, 7) + >>> choose_best_fitting_coordinates(fw_coords, rv_coords) + ('chr1', 150, 250, 7) + + >>> fw_coords = ('chr1', 100, 200, 5) + >>> rv_coords = ('chr1', 150, 250, 3) + >>> choose_best_fitting_coordinates(fw_coords, rv_coords) + ('chr1', 100, 200, 5) + + Notes + ----- + This function compares the scores of the forward and reverse coordinates. + If the score of the forward coordinates is higher than the score of the reverse coordinates, the forward coordinates are returned. + If the score of the reverse coordinates is higher, the reverse coordinates are returned. + If the score of either set of coordinates is -1, that set is ignored. + If no coordinates are found, None is returned. + """ + + # compare 'coords' and 'rev_coords'. Keep the one with the highest score, and empty the other. If both have the same score, keep both. + if fw_coords[3] > rv_coords[3]: + forward_coords = fw_coords + reverse_coords = None + elif rv_coords[3] > fw_coords[3]: + forward_coords = None + reverse_coords = rv_coords + else: + forward_coords = fw_coords + reverse_coords = rv_coords + + # remove the record if the score is -1 + if forward_coords is not None and forward_coords[3] == -1: + forward_coords = None + if reverse_coords is not None and reverse_coords[3] == -1: + reverse_coords = None + + best_fitting = forward_coords or reverse_coords + return best_fitting or None + + def CoordListGen( - primerfile: str, referencefile: str, err_rate: float = 0.1 -) -> Generator[Dict[Hashable, Any], None, None]: + primerfile: str, + referencefile: str, + err_rate: float = 0.1, + represent_as_score: bool = False, +) -> Generator[Dict[str, Union[str, int]], None, None]: """ - Generate a list of dictionaries containing the coordinates of primers in a reference sequence. + Generate a list of coordinates for primers found in a reference sequence. Parameters ---------- primerfile : str - The name of the file containing the primers in FASTA format. + The path to the primer file in FASTA format. referencefile : str - The name of the file containing the reference sequence in FASTA format. + The path to the reference file in FASTA format. err_rate : float, optional - The maximum number of mismatches allowed between the primer and the reference. Default is 0.1. + The allowed error rate for primer matching. Defaults to 0.1. Yields ------ dict - A dictionary containing the coordinates of a primer in the reference sequence with keys "ref", "start", "end", - "name", "score", "strand", "seq", and "revcomp". - - Notes - ----- - This function takes a FASTA file of primers and a FASTA file of a reference sequence, and returns a list of - dictionaries containing the coordinates of the primers in the reference sequence. The coordinates are returned as a - dictionary with keys "ref", "start", "end", "name", "score", "strand", "seq", and "revcomp". The "ref" key contains - the reference sequence ID, the "start" and "end" keys contain the start and end positions of the primer in the - reference sequence, the "name" key contains the name of the primer, the "score" key contains a score ("." 
- by default), the "strand" key contains the strand of the primer ("+" or "-"), the "seq" key contains the sequence - of the primer, and the "revcomp" key contains the reverse complement of the primer sequence. + A dictionary containing the following information for each primer: + - ref : str + The reference ID. + - start : int + The start coordinate of the primer in the reference sequence. + - end : int + The end coordinate of the primer in the reference sequence. + - name : str + The name of the primer. + - score : int + The alignment score of the primer. + - strand : str + The orientation of the primer strand ('+' or '-'). + - seq : str + The sequence of the primer. + - revcomp : str + The reverse complement sequence of the primer. Examples -------- - >>> for coord in CoordListGen('primers.fasta', 'reference.fasta', 0.1): - ... print(coord) + >>> primerfile = "primers.fasta" + >>> referencefile = "reference.fasta" + >>> err_rate = 0.1 + >>> for primer_info in CoordListGen(primerfile, referencefile, err_rate): + ... print(primer_info) + {'ref': 'reference1', 'start': 10, 'end': 20, 'name': 'primer1', 'score': 100, 'strand': '+', 'seq': 'ATCG', 'revcomp': 'CGAT'} + {'ref': 'reference1', 'start': 30, 'end': 40, 'name': 'primer2', 'score': 90, 'strand': '-', 'seq': 'GCTA', 'revcomp': 'TAGC'} + Notes + ----- + This function generates a list of coordinates for primers found in a reference sequence. It takes a primer file in FASTA format and a reference file in FASTA format as input. The allowed error rate for primer matching can be specified using the `err_rate` parameter, which defaults to 0.1. + The function uses the `get_coords` function to calculate the coordinates of the best primer option for each primer sequence. The `get_coords` function allows a maximum number of errors based on the error rate and searches for primer options resolved from ambiguous nucleotides in the sequence. + The function yields a dictionary for each primer, containing the reference ID, start and end coordinates, name, alignment score, strand orientation, primer sequence, and reverse complement sequence. + The examples demonstrate how to use the `CoordListGen` function to generate a list of primer coordinates for a given primer file and reference file. The yielded dictionary for each primer contains the relevant information for further analysis or processing. 
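+
+ As a rough, self-contained sketch of the search step described above (simplified from find_ambiguous_options and get_coords; the gap penalties mirror the values used in get_coords, and the function below is illustrative rather than part of this module):
+
+ import re
+ from itertools import product
+
+ import parasail as ps
+ from Bio.Data import IUPACData
+
+ def search_primer(seq, ref_seq, err_rate=0.1):
+     # Resolve IUPAC ambiguity codes into every concrete primer option.
+     ambigs = IUPACData.ambiguous_dna_values
+     options = map("".join, product(*[ambigs.get(nuc, nuc) for nuc in seq.upper()]))
+
+     best = None
+     for option in options:
+         # Semi-global alignment against the reference (nuc44 matrix, gap open 8, gap extend 30).
+         aln = ps.sg_trace_scan_sat(option, ref_seq, 8, 30, ps.nuc44)
+         cigar = aln.cigar.decode
+         cigar = cigar.decode() if isinstance(cigar, bytes) else str(cigar)
+         # Count mismatches/insertions/deletions, ignoring the unaligned leading and
+         # trailing deletions that a semi-global alignment produces.
+         trimmed = re.sub(r"(^\d+D)|(\d+D$)", "", cigar)
+         errors = sum(int(n) for n, _ in re.findall(r"(\d+)([XID])", trimmed))
+         # Too many errors for this option: mark it as unusable with a score of -1.
+         score = -1 if errors > int(len(option) * err_rate) else aln.score
+         # Leading deletions mean the primer starts further into the reference.
+         start = int(m[1]) if (m := re.match(r"(\d+)D", cigar)) else 0
+         if best is None or score > best[2]:
+             best = (start, aln.end_ref + 1, score)
+     return best  # (start, end, score) of the best-scoring primer option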
""" + log.debug(f"Starting PRIMERSEARCH process\t@ ProcessID {os.getpid()}") keyl = ("LEFT", "PLUS", "POSITIVE", "FORWARD") keyr = ("RIGHT", "MINUS", "NEGATIVE", "REVERSE") @@ -142,43 +450,43 @@ def CoordListGen( coords = get_coords(seq, ref_seq, err_rate) rev_coords = get_coords(revcomp, ref_seq, err_rate) - if coords and rev_coords: - log.warning( - f"\tPrimer [yellow underline]{primer.id}[/yellow underline] found on both [underline]forward[/underline] and [underline]reverse strand[/underline] of [yellow underline]{ref_id}[/yellow underline].\n\t[yellow bold]Check to see if this is intended.[/yellow bold]" - ) - if not coords and not rev_coords: + + best_fitting = choose_best_fitting_coordinates(coords, rev_coords) + + if not best_fitting: log.warning( f"\tSkipping [yellow underline]{primer.id}[/yellow underline] as it is found on neither [underline]forward[/underline] or [underline]reverse strand[/underline] of [yellow underline]{ref_id}[/yellow underline].\n\t[yellow bold]Check to see if this is intended.[/yellow bold]" ) continue - if coords and len(coords) > 1: - log.warning( - f"\tPrimer [yellow underline]{primer.id}[/yellow underline] found on multiple locations on [underline]forward strand[/underline] of [yellow underline]{ref_id}[/yellow underline]: \n\t[yellow]{coords}\n\t[bold]Check to see if this is intended.[/yellow][/bold]" - ) - if rev_coords and len(rev_coords) > 1: - log.warning( - f"\tPrimer [yellow underline]{primer.id}[/yellow underline] found on multiple locations on [underline]reverse strand[/underline] of [yellow underline]{ref_id}[/yellow underline]: \n\t[yellow]{rev_coords}\n\t[bold]Check to see if this is intended.[/yellow][/bold]" - ) - for start, end in coords.union(rev_coords): - if any(o in primer.id for o in keyl): - strand = "+" - elif any(o in primer.id for o in keyr): - strand = "-" - else: - log.error( - f"Primer name {primer.id} does not contain orientation (e.g. {primer.id}_RIGHT). Consider suffixing with {keyl + keyr}" - ) - continue - yield dict( - ref=ref_id, - start=start, - end=end, - name=primer.id, - score=".", - strand=strand, - seq=seq, - revcomp=revcomp, + + _, start, end, score, percentage = best_fitting + + if any(o in primer.id for o in keyl): + strand = "+" + elif any(o in primer.id for o in keyr): + strand = "-" + else: + log.error( + f"Primer name {primer.id} does not contain orientation (e.g. {primer.id}_RIGHT). 
Consider suffixing with {keyl + keyr}" ) + continue + log.debug( + f"PRIMERSEARCH :: Found primer [yellow]{primer.id}[/yellow] at coordinates [cyan]{start}-{end}[/cyan] with alignment-score [cyan]{score}[/cyan] ({percentage}% match) on [yellow]{ref_id}[/yellow]" + ) + + if not represent_as_score: + score = percentage + + yield dict( + ref=ref_id, + start=start, + end=end, + name=primer.id, + score=score, + strand=strand, + seq=seq, + revcomp=revcomp, + ) def CoordinateListsToBed(df: pd.DataFrame, outfile: str) -> None: @@ -250,8 +558,28 @@ def CoordinateListsToBed(df: pd.DataFrame, outfile: str) -> None: default=0.1, ) + args.add_argument( + "--score-representation", + action="store_true", + help="Present the alignment score in the bed file instead of the match-percentage for each primer option (default).", + ) + + args.add_argument( + "--verbose", + action="store_true", + help="Print debug information", + ) + flags = args.parse_args() - df = MakeCoordinateLists(flags.primers, flags.reference, flags.primer_mismatch_rate) + if flags.verbose: + log.setLevel("DEBUG") + + df = find_or_read_primers( + primerfile=flags.primers, + referencefile=flags.reference, + err_rate=flags.primer_mismatch_rate, + represent_as_score=flags.score_representation, + ) CoordinateListsToBed(df, flags.output) diff --git a/AmpliGone/func.py b/AmpliGone/func.py deleted file mode 100644 index 91a589d..0000000 --- a/AmpliGone/func.py +++ /dev/null @@ -1,141 +0,0 @@ -import logging -import shutil -import textwrap -from argparse import SUPPRESS, ArgumentParser, HelpFormatter -from typing import IO, Optional - -import regex as _re -import rich -from rich.highlighter import NullHighlighter -from rich.logging import RichHandler - -# Central logging object using Rich's logging library -FORMAT = "%(message)s" -logging.basicConfig( - level="NOTSET", - format=FORMAT, - datefmt="[%d/%m/%y %H:%M:%S]", - handlers=[ - RichHandler( - show_path=False, - omit_repeated_times=False, - markup=True, - highlighter=NullHighlighter(), - ) - ], -) -log = logging.getLogger("rich") - - -class FlexibleArgFormatter(HelpFormatter): - """ - A subclass of ArgumentParser.HelpFormatter that fixes spacing in the help text and respects bullet points. - Especially useful for multi-line help texts combined with default values. 
- - This class has taken a lot of inspiration from the 'argparse-formatter' made by Dave Steele; https://github.com/davesteele/argparse_formatter - - Direct link to davesteele's original code that served as an inspiration for this class: https://github.com/davesteele/argparse_formatter/blob/a15d89a99e20b0cad4c389a2aa490c551cef4f9c/argparse_formatter/flexi_formatter.py - - --- - This class helps to alleviate the following points of the ArgParse help formatting: - * The help text will be aligned with the argument name/flags, instead of printing the help description on a newline - * Adjusting the width of the help text in relationship to the width of the terminal to make sure there is enough space between the argument name/flags and the help text (thus not overloading the end-user with an unreadable wall of text) - * Adding a default value to the help text (on a newline, and indented) if one is provided in the ArgParse constructor - * Respecting bullet points in the help description - * Respecting newlines in the help description (you may have to add a space after the newline to make sure it is properly catched by the formatter) - * Respecting indentation in the help description (up to a certain degree) - * Changes the behaviour of the metavar to be only printed once per long AND shorthand argument, instead of printing the metavar multiple times for every possible flag. - """ - - def __init__(self, prog): - term_width = shutil.get_terminal_size().columns - max_help_position = min(max(24, term_width // 2), 80) - super().__init__(prog, max_help_position=max_help_position) - - def _get_help_string(self, action): - """ """ - help_text = action.help - if ( - action.default != SUPPRESS - and help_text is not None - and "default" not in help_text.lower() - and action.default is not None - ): - help_text += f"\n ([underline]default: {str(action.default)}[/underline])" - return help_text - - def _format_action_invocation(self, action): - """ """ - if not action.option_strings or action.nargs == 0: - return super()._format_action_invocation(action) - default = self._get_default_metavar_for_optional(action) - args_string = self._format_args(action, default) - return ", ".join(action.option_strings) + " " + args_string - - def _split_lines(self, text, width): - return self._para_reformat(text, width) - - def _fill_text(self, text, width, indent): - lines = self._para_reformat(text, width) - return "\n".join(lines) - - def _indents(self, line): - """Return line indent level and "sub_indent" for bullet list text.""" - - indent = len(_re.match(r"( *)", line).group(1)) - if list_match := _re.match(r"( *)(([*\-+>]+|\w+\)|\w+\.) 
+)", line): - sub_indent = indent + len(list_match.group(2)) - else: - sub_indent = indent - - return (indent, sub_indent) - - def _split_paragraphs(self, text): - """Split text in to paragraphs of like-indented lines.""" - - text = textwrap.dedent(text).strip() - text = _re.sub("\n\n[\n]+", "\n\n", text) - - last_sub_indent = None - paragraphs = [] - for line in text.splitlines(): - (indent, sub_indent) = self._indents(line) - is_text = _re.search(r"[^\s]", line) is not None - - if is_text and indent == sub_indent == last_sub_indent: - paragraphs[-1] += f" {line}" - else: - paragraphs.append(line) - - last_sub_indent = sub_indent if is_text else None - return paragraphs - - def _para_reformat(self, text, width): - """Reformat text, by paragraph.""" - - paragraphs = [] - for paragraph in self._split_paragraphs(text): - (indent, sub_indent) = self._indents(paragraph) - - paragraph = self._whitespace_matcher.sub(" ", paragraph).strip() - new_paragraphs = textwrap.wrap( - text=paragraph, - width=width, - initial_indent=" " * indent, - subsequent_indent=" " * sub_indent, - ) - - # Blank lines get eaten by textwrap, put it back with [' '] - paragraphs.extend(new_paragraphs or [" "]) - - return paragraphs - - -class RichParser(ArgumentParser): - """ - A subclass of `argparse.ArgumentParser` that overrides the `_print_message` method to use - `rich.print` instead of `print` - """ - - def _print_message(self, message: str, file: Optional[IO[str]] = None) -> None: - return rich.print(message) diff --git a/AmpliGone/io_ops.py b/AmpliGone/io_ops.py index b7529f1..eb2f83e 100644 --- a/AmpliGone/io_ops.py +++ b/AmpliGone/io_ops.py @@ -1,196 +1,15 @@ import gzip +import os import pathlib import sys from typing import Any, Dict, Hashable, List, TextIO, Tuple import pandas as pd +import pgzip import pysam +from pgzip import PgzipFile -from .func import log - - -def is_zipped(filename: str) -> bool: - """ - Check if the given file is a gzipped file. - - Parameters - ---------- - filename : str - The name of the file to be checked. - - Returns - ------- - bool - True if the file is gzipped, False otherwise. - - Examples - -------- - >>> is_zipped("file.txt.gz") - True - - >>> is_zipped("file.txt") - False - """ - return bool(".gz" in pathlib.Path(filename).suffixes) - - -def is_fastq(filename: str) -> bool: - """ - Check if the given file is a FASTQ file. - - Parameters - ---------- - filename : str - The name of the file to be checked. - - Returns - ------- - bool - True if the file is a FASTQ file, False otherwise. - - Examples - -------- - >>> is_fastq("file.fastq") - True - - >>> is_fastq("file.txt") - False - """ - ext = [".fastq", ".fq"] - return bool(any(item in ext for item in pathlib.Path(filename).suffixes)) - - -def is_bam(filename: str) -> bool: - """ - Check if the given file is a BAM file. - - Parameters - ---------- - filename : str - The name of the file to be checked. - - Returns - ------- - bool - True if the file is a BAM file, False otherwise. - - Examples - -------- - >>> is_bam("file.bam") - True - - >>> is_bam("file.txt") - False - """ - return bool(".bam" in pathlib.Path(filename).suffixes) - - -def read_gzip(filename: str) -> TextIO: - """ - Open a gzip file for reading and return an opened file object. - - Parameters - ---------- - filename : str - The name of the file to be opened. - - Returns - ------- - TextIO - An opened file object. - - Examples - -------- - >>> with read_gzip("file.txt.gz") as f: - ... print(f.read()) - ... - This is a gzipped file. 
- - """ - return gzip.open(filename, "rt") - - -def read_fastq(filename: str) -> TextIO: - """ - Open a FASTQ file for reading and return an opened file object. - - Parameters - ---------- - filename : str - The name of the file to be opened. - - Returns - ------- - TextIO - An opened file object. - - Examples - -------- - >>> with read_fastq("file.fastq") as f: - ... print(f.read()) - ... - @SEQ_ID - GATTTGGGGTTCAAAGCAGTATCGATCAAATAGTAAATCCATTTGTTCAACTCACAGTTT - + - !''*((((***+))%%%++)(%%%%).1***-+*''))**55CCF>>>>>>CCCCCCC65 - - """ - return open(filename, "rt") - - -def fastq_opener(inputfile: str) -> TextIO: - """ - Open a FASTQ file for reading and return an opened file object. - If the file is gzipped, it will be unzipped before opening. - - Parameters - ---------- - inputfile : str - The name of the input file. - - Returns - ------- - TextIO - An opened file object. - - Examples - -------- - >>> with fastq_opener("file.fastq.gz") as f: - ... print(f.read()) - ... - @SEQ_ID - GATTTGGGGTTCAAAGCAGTATCGATCAAATAGTAAATCCATTTGTTCAACTCACAGTTT - + - !''*((((***+))%%%++)(%%%%).1***-+*''))**55CCF>>>>>>CCCCCCC65 - - """ - if is_zipped(inputfile) is True: - return read_gzip(inputfile) - return read_fastq(inputfile) - - -def LoadBam(inputfile: str) -> pysam.AlignmentFile: - """ - Load a BAM file and return a pysam.AlignmentFile object. - - Parameters - ---------- - inputfile : str - The path to the BAM file. - - Returns - ------- - pysam.AlignmentFile - A file object for reading the BAM file. - - Examples - -------- - >>> bam_file = LoadBam('path/to/file.bam') - >>> for read in bam_file: - ... print(read) - - """ - return pysam.AlignmentFile(inputfile, "rb") +from AmpliGone.log import log def read_bed(filename: str) -> pd.DataFrame: @@ -247,145 +66,296 @@ def read_bed(filename: str) -> pd.DataFrame: return primer_df -def FlipStrand(seq: str, qual: str) -> Tuple[str, str]: - """ - Return the reverse complement of a DNA sequence and its quality score. - - Parameters - ---------- - seq : str - The DNA sequence to be reverse complemented. - qual : str - The quality score of the DNA sequence. - - Returns - ------- - Tuple[str, str] - A tuple containing the reverse complement of the DNA sequence and its quality score. - - Notes - ----- - This function uses the standard Watson-Crick base pairing rules to obtain the reverse complement of the DNA sequence. - - Examples - -------- - >>> seq = 'ATCG' - >>> qual = 'IIII' - >>> flipped_seq, flipped_qual = FlipStrand(seq, qual) - >>> print(flipped_seq, flipped_qual) - CGAT IIII - - """ - complement = {"A": "T", "C": "G", "G": "C", "T": "A", "N": "N"} - bases = list(seq) - bases = [complement[base] for base in bases] - seq = "".join(bases) - seq = seq[::-1] - qual = qual[::-1] - - return seq, qual - - -def LoadData(inputfile: str) -> List[Tuple[str, str, str]]: - """ - Load data from a file and return a list of tuples, where each tuple contains the name, sequence, and quality score of a read. - - Parameters - ---------- - inputfile : str - The input file to be processed. - - Returns - ------- - List[Tuple[str, str, str]] - A list of tuples. Each tuple contains the name, sequence, and quality score of a read. - - Raises - ------ - SystemExit - If the input file is an unsupported filetype. - - Notes - ----- - This function supports two file types: FASTQ and BAM. If the input file is a FASTQ file, it reads the file and extracts the name, sequence, and quality score of each read. 
If the input file is a BAM file, it uses the LoadBam function to extract the necessary information. - - Examples - -------- - >>> LoadData('example.fastq') - [('read1', 'ACGT', 'IIII'), ('read2', 'TGCA', 'JJJJ')] - - >>> LoadData('example.bam') - [('read1', 'ACGT', 'IIII'), ('read2', 'TGCA', 'JJJJ')] - """ - Reads = [] +class SequenceReads: + def __init__(self, inputfile: str): + log.debug(f"Starting INDEXREADS process\t@ ProcessID {os.getpid()}") + self.tuples = [] + if self._is_fastq(inputfile): + log.debug("INDEXREADS :: Parsing reads from FASTQ file") + self._read_fastq(inputfile) + elif self._is_bam(inputfile): + log.debug("INDEXREADS :: Parsing reads from BAM file") + self._read_bam(inputfile) + else: + log.error( + f'"{inputfile}" is an unsupported filetype. Please try again with a supported filetype' + ) + sys.exit(-1) + + log.debug("INDEXREADS :: Storing copy of indexed reads in a DataFrame") + self.frame = pd.DataFrame.from_records( + self.tuples, columns=["Readname", "Sequence", "Qualities"] + ) - if is_fastq(inputfile) is True: - with fastq_opener(inputfile) as fq: + def _read_fastq(self, inputfile: str) -> None: + with self._fastq_opener(inputfile) as fq: for line in fq: - name: str = line.split()[0][1:] - seq: str = next(fq).strip() + name = line.split()[0][1:] + seq = next(fq).strip() next(fq) - qual: str = next(fq).strip() + qual = next(fq).strip() - Reads.append((name, seq, qual)) - return Reads - if is_bam(inputfile) is True: - for read in LoadBam(inputfile): - if read.is_unmapped is True: - continue + self.tuples.append((name, seq, qual)) - name: str = read.query_name - seq: str = read.query_sequence - qual: str = "".join(map(lambda x: chr(x + 33), read.query_qualities)) + def _read_bam(self, inputfile: str) -> None: + for read in self._load_bam(inputfile): + if read.is_unmapped: + continue - if read.is_reverse is True: - seq, qual = FlipStrand(seq, qual) + name: str | None = read.query_name + seq: str | None = read.query_sequence + qual: str | None = ( + "".join([chr(x + 33) for x in read.query_qualities]) + if read.query_qualities is not None + else None + ) + if name is None or seq is None or qual is None: + continue + if read.is_reverse: + seq, qual = self._flip_strand(seq, qual) if len(seq) != len(qual): + log.debug( + f"Excluding read with name '{name}' from the index due to mismatched sequence and quality length" + ) continue if len(seq) == 0: + log.debug( + f"Excluding read with name '{name}' from the index due to empty sequence" + ) continue - Reads.append((name, seq, qual)) - return Reads - log.error( - f'"{inputfile}" is an unsupported filetype. Please try again with a supported filetype' - ) - sys.exit(-1) - - -def IndexReads(inputfile: str) -> pd.DataFrame: - """ - Read in an input file and return a pandas dataframe with the readname, sequence, and qualities. - - Parameters - ---------- - inputfile : str - The path to the input file. - - Returns - ------- - pd.DataFrame - A pandas dataframe with the columns Readname, Sequence, and Qualities. - - Notes - ----- - This function uses the LoadData function to extract the necessary information from the input file and returns a pandas dataframe with the extracted information. 
- - Examples - -------- - >>> IndexReads('example.fastq') - Readname Sequence Qualities - 0 read1 ACGT IIII - 1 read2 TGCA JJJJ - """ - return pd.DataFrame.from_records( - LoadData(inputfile), columns=["Readname", "Sequence", "Qualities"] - ) - - -def WriteOutput(output: str, ReadDict: List[Dict[Hashable, Any]]) -> None: + self.tuples.append((name, seq, qual)) + + def _is_fastq(self, filename: str) -> bool: + """ + Check if the given file is a FASTQ file. + + Parameters + ---------- + filename : str + The name of the file to be checked. + + Returns + ------- + bool + True if the file is a FASTQ file, False otherwise. + + Examples + -------- + >>> is_fastq("file.fastq") + True + + >>> is_fastq("file.txt") + False + """ + ext = [".fastq", ".fq"] + return any((item in ext for item in pathlib.Path(filename).suffixes)) + + def _is_zipped(self, filename: str) -> bool: + """ + Check if the given file is a gzipped file. + + Parameters + ---------- + filename : str + The name of the file to be checked. + + Returns + ------- + bool + True if the file is gzipped, False otherwise. + + Examples + -------- + >>> is_zipped("file.txt.gz") + True + + >>> is_zipped("file.txt") + False + """ + with open(filename, "rb") as f: + return f.read(2) == b"\x1f\x8b" + + def _is_bam(self, filename: str) -> bool: + """ + Check if the given file is a BAM file. + + Parameters + ---------- + filename : str + The name of the file to be checked. + + Returns + ------- + bool + True if the file is a BAM file, False otherwise. + + Examples + -------- + >>> is_bam("file.bam") + True + + >>> is_bam("file.txt") + False + """ + return ".bam" in pathlib.Path(filename).suffixes + + def _load_bam(self, inputfile: str) -> pysam.AlignmentFile: + """ + Load a BAM file and return a pysam.AlignmentFile object. + + Parameters + ---------- + inputfile : str + The path to the BAM file. + + Returns + ------- + pysam.AlignmentFile + A file object for reading the BAM file. + + Examples + -------- + >>> bam_file = LoadBam('path/to/file.bam') + >>> for read in bam_file: + ... print(read) + + """ + return pysam.AlignmentFile(inputfile, "rb") + + def _open_gzip_fastq_file(self, filename: str) -> TextIO: + """ + Open a gzip file for reading and return an opened file object. + + Parameters + ---------- + filename : str + The name of the file to be opened. + + Returns + ------- + TextIO + An opened file object. + + Examples + -------- + >>> with read_gzip("file.txt.gz") as f: + ... print(f.read()) + ... + This is a gzipped file. + + """ + return gzip.open(filename, "rt") + + def _open_fastq_file(self, filename: str) -> TextIO: + """ + Open a FASTQ file for reading and return an opened file object. + + Parameters + ---------- + filename : str + The name of the file to be opened. + + Returns + ------- + TextIO + An opened file object. + + Examples + -------- + >>> with read_fastq("file.fastq") as f: + ... print(f.read()) + ... + @SEQ_ID + GATTTGGGGTTCAAAGCAGTATCGATCAAATAGTAAATCCATTTGTTCAACTCACAGTTT + + + !''*((((***+))%%%++)(%%%%).1***-+*''))**55CCF>>>>>>CCCCCCC65 + + """ + return open(filename, "rt") + + def _fastq_opener(self, inputfile: str) -> TextIO: + """ + Open a FASTQ file for reading and return an opened file object. + If the file is gzipped, it will be unzipped before opening. + + Parameters + ---------- + inputfile : str + The name of the input file. + + Returns + ------- + TextIO + An opened file object. + + Examples + -------- + >>> with fastq_opener("file.fastq.gz") as f: + ... print(f.read()) + ... 
+ @SEQ_ID + GATTTGGGGTTCAAAGCAGTATCGATCAAATAGTAAATCCATTTGTTCAACTCACAGTTT + + + !''*((((***+))%%%++)(%%%%).1***-+*''))**55CCF>>>>>>CCCCCCC65 + + """ + if self._is_zipped(inputfile) is True: + log.debug("INDEXREADS :: Reading a gzipped file") + return self._open_gzip_fastq_file(inputfile) + log.debug("INDEXREADS :: Reading a non-gzipped file.") + return self._open_fastq_file(inputfile) + + def _flip_strand(self, seq: str, qual: str) -> Tuple[str, str]: + """ + Return the reverse complement of a DNA sequence and its quality score. + + Parameters + ---------- + seq : str + The DNA sequence to be reverse complemented. + qual : str + The quality score of the DNA sequence. + + Returns + ------- + Tuple[str, str] + A tuple containing the reverse complement of the DNA sequence and its quality score. + + Notes + ----- + This function uses the standard Watson-Crick base pairing rules to obtain the reverse complement of the DNA sequence. + + Examples + -------- + >>> seq = 'ATCG' + >>> qual = 'IIII' + >>> flipped_seq, flipped_qual = FlipStrand(seq, qual) + >>> print(flipped_seq, flipped_qual) + CGAT IIII + + """ + complement = {"A": "T", "C": "G", "G": "C", "T": "A", "N": "N"} + bases = list(seq) + bases = [complement[base] for base in bases] + seq = "".join(bases) + seq = seq[::-1] + qual = qual[::-1] + + return seq, qual + + +def output_file_opener(output_file: str, threads: int) -> TextIO | PgzipFile: + if ".gz" in output_file: + return pgzip.open(output_file, "wt", compresslevel=6, thread=threads) + return open(output_file, "w") + + +def write_output( + output: str, read_records: List[Dict[Hashable, Any]], threads: int +) -> None: """ Write the reads to the output file. @@ -393,8 +363,10 @@ def WriteOutput(output: str, ReadDict: List[Dict[Hashable, Any]]) -> None: ---------- output : str The name of the output file. - ReadDict : List[Dict[Hashable, Any]] - A list of dictionaries, where each dictionary is a read. + read_records : List[Dict[Hashable, Any]] + A list of dictionaries, where each dictionary represents a read. + threads : int + The number of threads to use for writing the output file. Returns ------- @@ -402,19 +374,19 @@ def WriteOutput(output: str, ReadDict: List[Dict[Hashable, Any]]) -> None: Notes ----- - This function takes the output file name and the dictionary of reads as input, and writes the reads - to the output file. + This function takes the output file name, the list of read dictionaries, and the number of threads as input. + It writes the reads to the output file. 
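+
+ Each read dictionary becomes one four-line FASTQ record; the call shown in the Examples section below would therefore produce a file containing:
+
+     @read1
+     ATCG
+     +
+     20
+
+ When the output name contains '.gz', the same records are written through pgzip so that compression can use the given number of threads.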
Examples -------- - >>> WriteOutput("output.txt", [{"Readname": "read1", "Sequence": "ATCG", "Qualities": "20"}]) + >>> write_output("output.txt", [{"Readname": "read1", "Sequence": "ATCG", "Qualities": "20"}], 4) """ - with open(output, "w") as fileout: - for index, k in enumerate(ReadDict): - for key in ReadDict[index]: + with output_file_opener(output, threads) as fileout: + for index, k in enumerate(read_records): + for key in read_records[index]: if key == "Readname": - fileout.write("@" + ReadDict[index][key] + "\n") - if key == "Sequence": - fileout.write(str(ReadDict[index][key]) + "\n" + "+" + "\n") - if key == "Qualities": - fileout.write(str(ReadDict[index][key]) + "\n") + fileout.write("@" + read_records[index][key] + "\n") + elif key == "Sequence": + fileout.write(read_records[index][key] + "\n" + "+" + "\n") + elif key == "Qualities": + fileout.write(read_records[index][key] + "\n") diff --git a/AmpliGone/log.py b/AmpliGone/log.py new file mode 100644 index 0000000..3684487 --- /dev/null +++ b/AmpliGone/log.py @@ -0,0 +1,21 @@ +import logging + +from rich.highlighter import NullHighlighter +from rich.logging import RichHandler + +# Central logging object using Rich's logging library +FORMAT = "%(message)s" +logging.basicConfig( + level="INFO", + format=FORMAT, + datefmt="[%d/%m/%y %H:%M:%S]", + handlers=[ + RichHandler( + show_path=False, + omit_repeated_times=False, + markup=True, + highlighter=NullHighlighter(), + ) + ], +) +log = logging.getLogger("rich") diff --git a/AmpliGone/mappreset.py b/AmpliGone/mappreset.py deleted file mode 100644 index a9955fc..0000000 --- a/AmpliGone/mappreset.py +++ /dev/null @@ -1,375 +0,0 @@ -import sys -from concurrent.futures import ThreadPoolExecutor -from typing import List, Set, Tuple - -import pandas as pd - -from AmpliGone.func import log - - -def Calculate_avg_seq_len(sequence_list: List[str]) -> float: - """ - Calculate the average length of a list of sequences. - - Parameters - ---------- - sequence_list : List[str] - A list of sequences. - - Returns - ------- - float - The average length of the sequences in the list. - - Examples - -------- - >>> Calculate_avg_seq_len(['ATCG', 'ATCGATCG', 'AT']) - 5.0 - """ - return sum(map(len, sequence_list)) / float(len(sequence_list)) - - -def Calculate_avg_seq_qual(quality_list: List[int]) -> float: - """ - Calculate the average quality score of a list of quality scores. - - Parameters - ---------- - quality_list : List[int] - A list of quality scores. - - Returns - ------- - float - The average quality score of the quality scores in the list. - - Examples - -------- - >>> Calculate_avg_seq_qual([20, 30, 40]) - 30.0 - """ - return sum(quality_list) / len(quality_list) - - -def GetQualRange(quality_list: List[int]) -> int: - """ - Return the number of unique quality scores in a list. - - Parameters - ---------- - quality_list : List[int] - A list of quality scores. - - Returns - ------- - int - The number of unique quality scores in the list. - - Examples - -------- - >>> GetQualRange([20, 30, 20, 40]) - 3 - """ - return len(set(quality_list)) - - -def GetLenRange(sequence_list: List[str]) -> int: - """ - Return the number of unique lengths of strings in a list. - - Parameters - ---------- - sequence_list : List[str] - A list of strings. - - Returns - ------- - int - The number of unique lengths in the list of strings. 
- - Examples - -------- - >>> GetLenRange(['ATCG', 'ATCGATCG', 'AT']) - 2 - """ - AllLengths = [len(i) for i in sequence_list] - return len(set(AllLengths)) - - -def SequenceVariability( - QualRange: int, LengthRange: int, avg_qual: float, avg_len: float -) -> Tuple[float, float]: - """ - Calculate the variability of a sequence based on the difference between the average quality score and the quality score of the - current sequence, and the difference between the average length and the length of the current - sequence. - - Parameters - ---------- - QualRange : int - The difference between the highest and lowest quality scores. - LengthRange : int - The difference between the longest and shortest sequence. - avg_qual : float - The average quality score of the sequence. - avg_len : float - The average length of the sequences. - - Returns - ------- - Tuple[float, float] - The quality variance and length variance. - - Examples - -------- - >>> SequenceVariability(20, 10, 30.0, 5.0) - (0.6666666666666666, 2.0) - """ - QualVariance = QualRange / avg_qual - LengthVariance = LengthRange / avg_len - return QualVariance, LengthVariance - - -def SequenceStability( - QualRange: int, LengthRange: int, avg_qual: float, avg_len: float -) -> float: - """ - Calculate the stability of a sequence based on the variability of the quality scores and the lengths of the sequences. - - Parameters - ---------- - QualRange : int - The difference between the highest and lowest quality scores. - LengthRange : int - The difference between the longest and shortest sequence. - avg_qual : float - The average quality score of the sequence. - avg_len : float - The average length of the sequences. - - Returns - ------- - float - The percentage of stability of the sequence. - - Examples - -------- - >>> SequenceStability(20, 10, 30.0, 5.0) - 32.33333333333333 - """ - QualVar, LenVar = SequenceVariability(QualRange, LengthRange, avg_qual, avg_len) - return 100 - (QualVar + LenVar) - - -def IsLongRead(avg_len: float) -> bool: - """ - Determine if a read is considered long based on its average length. - - Parameters - ---------- - avg_len : float - The average length of the read. - - Returns - ------- - bool - True if the read is considered long (average length > 300), False otherwise. - - Examples - -------- - >>> IsLongRead(250) - False - >>> IsLongRead(350) - True - """ - return avg_len > 300 - - -def FindPreset(threads: int, data: pd.DataFrame) -> str: - """ - Takes a dataframe with the sequence and quality data, and returns a preset name and a list of scoring parameters. - - Parameters - ---------- - threads : int - Number of threads to use for the calculations. - data : pandas.DataFrame - A dataframe containing the reads and qualities. - - Returns - ------- - str - A string that represents the preset. - - Notes - ----- - This function takes a dataframe with the sequence and quality data, and returns a preset name and a list of scoring parameters. - It first extracts the sequence and quality data from the dataframe, and then calculates the average length and quality of the sequences, - as well as the range of quality scores and sequence lengths. Based on these values, it determines the appropriate preset to use for the data. - If the data is considered stable and short, it returns the 'sr' preset. If the data is considered stable and long, it returns the 'map-ont' preset. - If the data is considered unstable or from an unsupported platform, it falls back to the 'sr' preset. 
- - Examples - -------- - >>> data = pd.DataFrame({'Sequence': ['ATCG', 'GCTA'], 'Qualities': ['!@#$', 'abcd']}) - >>> FindPreset(threads=2, data=data) - 'sr' - """ - ReadList: List[str] = data["Sequence"].tolist() - QualList: List[int] = [ - ord(character) - 33 - for character in [ - x for y in [list(item) for item in data["Qualities"].tolist()] for x in y - ] - ] - - with ThreadPoolExecutor(max_workers=threads) as ex: - TP_averagelength = ex.submit(Calculate_avg_seq_len, ReadList) - TP_averagequal = ex.submit(Calculate_avg_seq_qual, QualList) - TP_qualityrange = ex.submit(GetQualRange, QualList) - TP_lengthrange = ex.submit(GetLenRange, ReadList) - - avg_len = TP_averagelength.result() - avg_qual = TP_averagequal.result() - QualRange = TP_qualityrange.result() - LengthRange = TP_lengthrange.result() - if SequenceStability(QualRange, LengthRange, avg_qual, avg_len) > 97: - if IsLongRead(avg_len) is False: - # this is probably 'short read' illumina NextSeq data - # --> set the 'SR' preset - return "sr" - ##! previous if-statement is not False. - # this is probably 'long read' illumina MiSeq data - # --> the 'SR' preset still applies but we keep it split - # in case a custom set of parameters is necessary in the future - return "sr" - if IsLongRead(avg_len) is True: - # this is probably oxford nanopore data - # --> set the preset to 'map-ont' - return "map-ont" - ##! previous if-statement is not True. - # this might be very 'unstable' nextseq data, - # or from a platform we currently dont really support officially. - # fallback to 'sr' preset - return "sr" - - -def valid_scoring_list_length(input_list: List[str]) -> bool: - """ - Check if the length of the input list is either 4, 6, or 7. - - Parameters - ---------- - input_list : list of str - The list to check the length of. - - Returns - ------- - bool - True if the length of the list is 4, 6, or 7, False otherwise. - """ - return len(input_list) in {4, 6, 7} - - -def valid_scoring_elements(input_list: Set[str], required_elements: Set[str]) -> bool: - """ - Check if all elements of the input list are present in the set of required elements. - - Parameters - ---------- - input_list : set of str - The set to check if its elements are in the required elements set. - required_elements : set of str - The set of required elements. - - Returns - ------- - bool - True if all elements of the input list are in the required elements set, False otherwise. - """ - return input_list.issubset(required_elements) - - -def scoring_has_negative_values(input_list: List[int]) -> bool: - """ - Check if the input list contains any negative values. - - Parameters - ---------- - input_list : list of int - The list to check for negative values. - - Returns - ------- - bool - True if any item in the list is less than 0, False otherwise. - """ - return any(item < 0 for item in input_list) - - -def parse_scoring_matrix(input_matrix: List[str]) -> List[int]: - """ - Parse the scoring matrix from a list of 'key=value' strings to a list of integers matching the required scoring matrix order. - - Parameters - ---------- - input_matrix : list of str - The scoring matrix as a list of strings, where each string is a key-value pair separated by '='. - - Returns - ------- - list of int - The scoring matrix as a list of integers, ordered to fit the mappy input. - - Raises - ------ - SystemExit - If the input matrix is invalid (e.g., wrong length, contains negative values, invalid keys). 
- - """ - required_4 = sorted(["match", "mismatch", "gap_o1", "gap_e1"]) - required_6 = sorted(required_4 + ["gap_o2", "gap_e2"]) - required_7 = sorted(required_6 + ["mma"]) - - matrix_dict = { - item.split("=")[0]: int(item.split("=")[1]) for item in sorted(input_matrix) - } - - if valid_scoring_list_length(list(matrix_dict.keys())) is False: - log.error( - f"Invalid scoring matrix length. The scoring-matrix must have a length of 4, 6 or 7 parameters. \nThe following input parameters were given: '[red]{' '.join(input_matrix)}[/red]'. \nAfter parsing, these inputs result in the following: [red]{matrix_dict}[/red]. \nPlease note that adding the same key multiple times will result in the last value being used." - ) - sys.exit(1) - - # check if the values of the scoring matrix are non-negative integers - if scoring_has_negative_values(list(matrix_dict.values())) is True: - log.error( - "Given scoring matrix contains a negative value. \nThe scoring matrix may only contain non-negative integers. Please check your input and try again." - ) - sys.exit(1) - - # this section is quite redundant and the same thing is being done multiple times, will see to optimize it later but this is reasonably valid for now. - ordered_vals: List[int] = [] - matrix_keys = list(matrix_dict.keys()) - if len(matrix_keys) == 4: - if not set(matrix_keys).issubset(required_4): - log.error( - f"Invalid combination of scoring matrix keys. A total of 4 valid scoring-matrix keys were given. \nThe following keys are supported for 4 scoring-matrix keys: '[green]{' | '.join(required_4)}[/green]'. \nThe following keys were given: '[red]{' | '.join(matrix_keys)}[/red]'." - ) - sys.exit(1) - ordered_vals = [matrix_dict[key] for key in required_4] - elif len(matrix_keys) == 6: - if not set(matrix_keys).issubset(required_6): - log.error( - f"Invalid combination of scoring matrix keys. A total of 6 valid scoring-matrix keys were given. \nThe following keys are supported for 6 scoring-matrix keys: '[green]{' | '.join(required_6)}[/green]'. \nThe following keys were given: '[red]{' | '.join(matrix_keys)}[/red]'." - ) - sys.exit(1) - ordered_vals = [matrix_dict[key] for key in required_6] - elif len(matrix_keys) == 7: - if not set(matrix_keys).issubset(required_7): - log.error( - f"Invalid combination of scoring matrix keys. A total of 7 valid scoring-matrix keys were given. \nThe following keys are supported for 7 scoring-matrix keys: '[green]{' | '.join(required_7)}[/green]'. \nThe following keys were given: '[red]{' | '.join(matrix_keys)}[/red]'." 
- ) - sys.exit(1) - ordered_vals = [matrix_dict[key] for key in required_7] - return ordered_vals diff --git a/AmpliGone/version.py b/AmpliGone/version.py deleted file mode 100644 index 67bc602..0000000 --- a/AmpliGone/version.py +++ /dev/null @@ -1 +0,0 @@ -__version__ = "1.3.0" diff --git a/env.yml b/env.yml index a1473b3..c8900aa 100644 --- a/env.yml +++ b/env.yml @@ -2,14 +2,14 @@ name: AmpliGone channels: - bioconda - conda-forge - - intel dependencies: - - python=3.8 - - pandas>=1.3 - - pysam=0.19 - - biopython>=1.79 - - mappy==2.24 - - minimap2==2.24 - - parmap=1.5 - - regex>=2021.11.10 - - rich=12.5 + - python>=3.10 + - pandas=2.2 + - pysam=0.22 + - biopython==1.84 + - mappy==2.28 + - minimap2==2.28 + - parmap=1.7 + - parasail-python==1.3.4 + - rich=13.7 + - pgzip==0.3.4 diff --git a/setup.py b/setup.py index c404fff..1223c26 100644 --- a/setup.py +++ b/setup.py @@ -2,48 +2,49 @@ from setuptools import find_packages, setup -from AmpliGone.version import __version__ +from AmpliGone import __prog__, __version__ -if sys.version_info.major != 3 or sys.version_info.minor < 8: - print("Error: you must execute setup.py using Python 3.8 or later") +if sys.version_info.major != 3 or sys.version_info.minor < 10: + print("Error: you must execute setup.py using Python 3.10 or later") sys.exit(1) with open("README.md", "r", encoding="utf-8") as readme: DESCR = readme.read() setup( - name="AmpliGone", + name=__prog__, version=__version__, url="https://rivm-bioinformatics.github.io/AmpliGone/", project_urls={"Source Code": "https://github.com/RIVM-bioinformatics/AmpliGone"}, author="Florian Zwagemaker", author_email="ids-bioinformatics@rivm.nl", - description="Ampligone is a tool which accurately removes primer sequences from FastQ NGS reads in an amplicon sequencing experiment", + description=f"{__prog__} is a tool which accurately removes primer sequences from FastQ NGS reads in an amplicon sequencing experiment", long_description=DESCR, long_description_content_type="text/markdown", - python_requires=">=3.8", + python_requires=">=3.10", license="AGPLv3", packages=find_packages(), install_requires=[ - "pysam==0.19.*", - "pandas>=1.3.0", - "mappy==2.24", - "biopython>=1.79", - "parmap==1.5.*", - "regex>=2021.11.10", - "rich==12.5.*", + "pysam==0.22.*", + "pandas==2.2.*", + "mappy==2.28", + "biopython==1.84", + "parmap==1.7.*", + "parasail==1.3.4", + "rich==13.7.*", + "pgzip==0.3.4", ], entry_points={ "console_scripts": [ - "ampligone = AmpliGone.AmpliGone:main", - "AmpliGone = AmpliGone.AmpliGone:main", + "ampligone = AmpliGone.__main__:main", + "AmpliGone = AmpliGone.__main__:main", ] }, keywords=[], zip_safe=False, classifiers=[ "Development Status :: 5 - Production/Stable", - "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.10", "License :: OSI Approved :: GNU Affero General Public License v3", "Topic :: Scientific/Engineering :: Bio-Informatics", "Intended Audience :: Science/Research",