diff --git a/biobb_dna/backbone/bipopulations.py b/biobb_dna/backbone/bipopulations.py
index 9372f6ad..04559ed9 100755
--- a/biobb_dna/backbone/bipopulations.py
+++ b/biobb_dna/backbone/bipopulations.py
@@ -6,12 +6,14 @@
 import matplotlib.pyplot as plt
 import pandas as pd
+from biobb_common.configuration import settings
+from biobb_common.generic.biobb_object import BiobbObject
+from biobb_common.tools.file_utils import launchlogger
 from numpy import nan
+
+from biobb_dna.utils.common import _from_string_to_list
 from biobb_dna.utils.loader import read_series
 from biobb_dna.utils.transform import inverse_complement
-from biobb_common.generic.biobb_object import BiobbObject
-from biobb_common.tools.file_utils import launchlogger
-from biobb_common.configuration import settings


 class BIPopulations(BiobbObject):
@@ -60,10 +62,17 @@ class BIPopulations(BiobbObject):
     """

-    def __init__(self, input_epsilC_path, input_epsilW_path,
-                 input_zetaC_path, input_zetaW_path,
-                 output_csv_path, output_jpg_path,
-                 properties=None, **kwargs) -> None:
+    def __init__(
+        self,
+        input_epsilC_path,
+        input_epsilW_path,
+        input_zetaC_path,
+        input_zetaW_path,
+        output_csv_path,
+        output_jpg_path,
+        properties=None,
+        **kwargs,
+    ) -> None:
         properties = properties or {}

         # Call parent class constructor
@@ -72,21 +81,23 @@ def __init__(self, input_epsilC_path, input_epsilW_path,

         # Input/Output files
         self.io_dict = {
-            'in': {
-                'input_epsilC_path': input_epsilC_path,
-                'input_epsilW_path': input_epsilW_path,
-                'input_zetaC_path': input_zetaC_path,
-                'input_zetaW_path': input_zetaW_path
+            "in": {
+                "input_epsilC_path": input_epsilC_path,
+                "input_epsilW_path": input_epsilW_path,
+                "input_zetaC_path": input_zetaC_path,
+                "input_zetaW_path": input_zetaW_path,
+            },
+            "out": {
+                "output_csv_path": output_csv_path,
+                "output_jpg_path": output_jpg_path,
             },
-            'out': {
-                'output_csv_path': output_csv_path,
-                'output_jpg_path': output_jpg_path
-            }
         }
         self.properties = properties
         self.sequence = properties.get("sequence")
-        self.seqpos = properties.get("seqpos", None)
+        self.seqpos = [
+            int(item) for item in _from_string_to_list(properties.get("seqpos", None))
+        ]

         # Check the properties
         self.check_properties(properties)
@@ -102,90 +113,68 @@ def launch(self) -> int:
         self.stage_files()

         # check sequence
-        if self.sequence is None or len(self.sequence) < 2:
+        if not self.sequence or len(self.sequence) < 2:
             raise ValueError("sequence is null or too short!")

         # check seqpos
-        if self.seqpos is not None:
+        if self.seqpos:
             if (max(self.seqpos) > len(self.sequence) - 2) or (min(self.seqpos) < 1):
                 raise ValueError(
-                    f"seqpos values must be between 1 and {len(self.sequence) - 2}")
+                    f"seqpos values must be between 1 and {len(self.sequence) - 2}"
+                )
             if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1):
-                raise ValueError(
-                    "seqpos must be a list of at least two integers")
+                raise ValueError("seqpos must be a list of at least two integers")

         # read input files
         epsilC = read_series(
-            self.stage_io_dict['in']['input_epsilC_path'],
-            usecols=self.seqpos)
+            self.stage_io_dict["in"]["input_epsilC_path"], usecols=self.seqpos
+        )
         epsilW = read_series(
-            self.stage_io_dict['in']['input_epsilW_path'],
-            usecols=self.seqpos)
+            self.stage_io_dict["in"]["input_epsilW_path"], usecols=self.seqpos
+        )
         zetaC = read_series(
-            self.stage_io_dict['in']['input_zetaC_path'],
-            usecols=self.seqpos)
+            self.stage_io_dict["in"]["input_zetaC_path"], usecols=self.seqpos
+        )
         zetaW = read_series(
-            self.stage_io_dict['in']['input_zetaW_path'],
-            usecols=self.seqpos)
+            self.stage_io_dict["in"]["input_zetaW_path"], usecols=self.seqpos
+        )

         # calculate difference between epsil and zeta parameters
-        xlabels = self.get_xlabels(
-            self.sequence,
-            inverse_complement(self.sequence))
-        diff_epsil_zeta = self.get_angles_difference(
-            epsilC,
-            zetaC,
-            epsilW,
-            zetaW)
+        xlabels = self.get_xlabels(self.sequence, inverse_complement(self.sequence))
+        diff_epsil_zeta = self.get_angles_difference(epsilC, zetaC, epsilW, zetaW)

         # calculate BI population
         BI = (diff_epsil_zeta < 0).sum(axis=0) * 100 / len(diff_epsil_zeta)
         BII = 100 - BI

         # save table
-        Bpopulations_df = pd.DataFrame({
-            "Nucleotide": xlabels,
-            "BI population": BI,
-            "BII population": BII})
+        Bpopulations_df = pd.DataFrame(
+            {"Nucleotide": xlabels, "BI population": BI, "BII population": BII}
+        )
         Bpopulations_df.to_csv(
-            self.stage_io_dict['out']['output_csv_path'],
-            index=False)
+            self.stage_io_dict["out"]["output_csv_path"], index=False
+        )

         # save plot
         fig, axs = plt.subplots(1, 1, dpi=300, tight_layout=True)
-        axs.bar(
-            range(len(xlabels)),
-            BI,
-            label="BI")
-        axs.bar(
-            range(len(xlabels)),
-            BII,
-            bottom=BI,
-            label="BII")
+        axs.bar(range(len(xlabels)), BI, label="BI")
+        axs.bar(range(len(xlabels)), BII, bottom=BI, label="BII")
         # empty bar to divide both sequences
-        axs.bar(
-            [len(BI)//2],
-            [100],
-            color='white',
-            label=None)
+        axs.bar([len(BI) // 2], [100], color="white", label=None)
         axs.legend()
         axs.set_xticks(range(len(xlabels)))
         axs.set_xticklabels(xlabels, rotation=90)
         axs.set_xlabel("Nucleotide Sequence")
         axs.set_ylabel("BI/BII Population (%)")
         axs.set_title("Nucleotide parameter: BI/BII Population")
-        fig.savefig(
-            self.stage_io_dict['out']['output_jpg_path'],
-            format="jpg")
+        fig.savefig(self.stage_io_dict["out"]["output_jpg_path"], format="jpg")
         plt.close()

         # Copy files to host
         self.copy_to_host()

         # Remove temporary file(s)
-        self.tmp_files.extend([
-            self.stage_io_dict.get("unique_dir", "")
-        ])
+        self.tmp_files.extend([self.stage_io_dict.get("unique_dir", "")])
         self.remove_tmp_files()

         self.check_arguments(output_files_created=True, raise_exception=False)
@@ -195,35 +184,25 @@ def launch(self) -> int:
     def get_xlabels(self, strand1, strand2):
         # get list of tetramers, except first and last two bases
         labelsW = list(strand1)
-        labelsW[0] = f"{labelsW[0]}5\'"
-        labelsW[-1] = f"{labelsW[-1]}3\'"
-        labelsW = [
-            f"{i}-{j}" for i, j in zip(labelsW, range(1, len(labelsW)+1))]
+        labelsW[0] = f"{labelsW[0]}5'"
+        labelsW[-1] = f"{labelsW[-1]}3'"
+        labelsW = [f"{i}-{j}" for i, j in zip(labelsW, range(1, len(labelsW) + 1))]
         labelsC = list(strand2)[::-1]
-        labelsC[0] = f"{labelsC[0]}5\'"
-        labelsC[-1] = f"{labelsC[-1]}3\'"
-        labelsC = [
-            f"{i}-{j}" for i, j in zip(labelsC, range(len(labelsC), 0, -1))]
+        labelsC[0] = f"{labelsC[0]}5'"
+        labelsC[-1] = f"{labelsC[-1]}3'"
+        labelsC = [f"{i}-{j}" for i, j in zip(labelsC, range(len(labelsC), 0, -1))]

-        if self.seqpos is not None:
+        if self.seqpos:
             labelsC = [labelsC[i] for i in self.seqpos]
             labelsW = [labelsW[i] for i in self.seqpos]
-        xlabels = labelsW + ['-'] + labelsC
+        xlabels = labelsW + ["-"] + labelsC
         return xlabels

     def get_angles_difference(self, epsilC, zetaC, epsilW, zetaW):
         # concatenate zeta and epsil arrays
         separator_df = pd.DataFrame({"-": nan}, index=range(len(zetaW)))
-        zeta = pd.concat([
-            zetaW,
-            separator_df,
-            zetaC[zetaC.columns[::-1]]],
-            axis=1)
-        epsil = pd.concat([
-            epsilW,
-            separator_df,
-            epsilC[epsilC.columns[::-1]]],
-            axis=1)
+        zeta = pd.concat([zetaW, separator_df, zetaC[zetaC.columns[::-1]]], axis=1)
+        epsil = pd.concat([epsilW, separator_df, epsilC[epsilC.columns[::-1]]], axis=1)

         # difference between epsilon and zeta coordinates
         diff_epsil_zeta = epsil - zeta
@@ -231,12 +210,17 @@ def get_angles_difference(self, epsilC, zetaC, epsilW, zetaW):


 def bipopulations(
-        input_epsilC_path: str, input_epsilW_path: str,
-        input_zetaC_path: str, input_zetaW_path: str,
-        output_csv_path: str, output_jpg_path: str,
-        properties: Optional[dict] = None, **kwargs) -> int:
+    input_epsilC_path: str,
+    input_epsilW_path: str,
+    input_zetaC_path: str,
+    input_zetaW_path: str,
+    output_csv_path: str,
+    output_jpg_path: str,
+    properties: Optional[dict] = None,
+    **kwargs,
+) -> int:
     """Create :class:`BIPopulations ` class and
-    execute the: meth: `launch() ` method. """
+    execute the: meth: `launch() ` method."""

     return BIPopulations(
         input_epsilC_path=input_epsilC_path,
@@ -245,28 +229,50 @@ def bipopulations(
         input_zetaW_path=input_zetaW_path,
         output_csv_path=output_csv_path,
         output_jpg_path=output_jpg_path,
-        properties=properties, **kwargs).launch()
+        properties=properties,
+        **kwargs,
+    ).launch()


 def main():
     """Command line execution of this building block. Please check the command line documentation."""
-    parser = argparse.ArgumentParser(description='Calculate BI/BII populations.',
-                                     formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999))
-    parser.add_argument('--config', required=False, help='Configuration file')
-
-    required_args = parser.add_argument_group('required arguments')
-    required_args.add_argument('--input_epsilC_path', required=True,
-                               help='Helical parameter input ser file path. Accepted formats: ser.')
-    required_args.add_argument('--input_epsilW_path', required=True,
-                               help='Helical parameter input ser file path. Accepted formats: ser.')
-    required_args.add_argument('--input_zetaC_path', required=True,
-                               help='Helical parameter input ser file path. Accepted formats: ser.')
-    required_args.add_argument('--input_zetaW_path', required=True,
-                               help='Helical parameter input ser file path. Accepted formats: ser.')
-    required_args.add_argument('--output_csv_path', required=True,
-                               help='Path to output csv file. Accepted formats: csv.')
-    required_args.add_argument('--output_jpg_path', required=True,
-                               help='Path to output jpg file. Accepted formats: jpg.')
+    parser = argparse.ArgumentParser(
+        description="Calculate BI/BII populations.",
+        formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999),
+    )
+    parser.add_argument("--config", required=False, help="Configuration file")
+
+    required_args = parser.add_argument_group("required arguments")
+    required_args.add_argument(
+        "--input_epsilC_path",
+        required=True,
+        help="Helical parameter input ser file path. Accepted formats: ser.",
+    )
+    required_args.add_argument(
+        "--input_epsilW_path",
+        required=True,
+        help="Helical parameter input ser file path. Accepted formats: ser.",
+    )
+    required_args.add_argument(
+        "--input_zetaC_path",
+        required=True,
+        help="Helical parameter input ser file path. Accepted formats: ser.",
+    )
+    required_args.add_argument(
+        "--input_zetaW_path",
+        required=True,
+        help="Helical parameter input ser file path. Accepted formats: ser.",
+    )
+    required_args.add_argument(
+        "--output_csv_path",
+        required=True,
+        help="Path to output csv file. Accepted formats: csv.",
+    )
+    required_args.add_argument(
+        "--output_jpg_path",
+        required=True,
+        help="Path to output jpg file. Accepted formats: jpg.",
+    )

     args = parser.parse_args()
     args.config = args.config or "{}"
@@ -279,8 +285,9 @@ def main():
         input_zetaW_path=args.input_zetaW_path,
         output_csv_path=args.output_csv_path,
         output_jpg_path=args.output_jpg_path,
-        properties=properties)
+        properties=properties,
+    )


-if __name__ == '__main__':
+if __name__ == "__main__":
     main()
diff --git a/biobb_dna/backbone/canonicalag.py b/biobb_dna/backbone/canonicalag.py
index bd84e494..47c8f1b9 100755
--- a/biobb_dna/backbone/canonicalag.py
+++ b/biobb_dna/backbone/canonicalag.py
@@ -5,13 +5,15 @@
 from typing import Optional

 import matplotlib.pyplot as plt
-import pandas as pd
 import numpy as np
-from biobb_dna.utils.loader import read_series
-from biobb_dna.utils.transform import inverse_complement
+import pandas as pd
+from biobb_common.configuration import settings
 from biobb_common.generic.biobb_object import BiobbObject
 from biobb_common.tools.file_utils import launchlogger
-from biobb_common.configuration import settings
+
+from biobb_dna.utils.common import _from_string_to_list
+from biobb_dna.utils.loader import read_series
+from biobb_dna.utils.transform import inverse_complement


 class CanonicalAG(BiobbObject):
@@ -61,10 +63,17 @@ class CanonicalAG(BiobbObject):
     """

-    def __init__(self, input_alphaC_path, input_alphaW_path,
-                 input_gammaC_path, input_gammaW_path,
-                 output_csv_path, output_jpg_path,
-                 properties=None, **kwargs) -> None:
+    def __init__(
+        self,
+        input_alphaC_path,
+        input_alphaW_path,
+        input_gammaC_path,
+        input_gammaW_path,
+        output_csv_path,
+        output_jpg_path,
+        properties=None,
+        **kwargs,
+    ) -> None:
         properties = properties or {}

         # Call parent class constructor
@@ -73,24 +82,24 @@ def __init__(self, input_alphaC_path, input_alphaW_path,

         # Input/Output files
         self.io_dict = {
-            'in': {
-                'input_alphaC_path': input_alphaC_path,
-                'input_alphaW_path': input_alphaW_path,
-                'input_gammaC_path': input_gammaC_path,
-                'input_gammaW_path': input_gammaW_path
+            "in": {
+                "input_alphaC_path": input_alphaC_path,
+                "input_alphaW_path": input_alphaW_path,
+                "input_gammaC_path": input_gammaC_path,
+                "input_gammaW_path": input_gammaW_path,
+            },
+            "out": {
+                "output_csv_path": output_csv_path,
+                "output_jpg_path": output_jpg_path,
             },
-            'out': {
-                'output_csv_path': output_csv_path,
-                'output_jpg_path': output_jpg_path
-            }
         }
         self.properties = properties
         self.sequence = properties.get("sequence")
-        self.stride = properties.get(
-            "stride", 1000)
-        self.seqpos = properties.get(
-            "seqpos", None)
+        self.stride = properties.get("stride", 1000)
+        self.seqpos = [
+            int(elem) for elem in _from_string_to_list(properties.get("seqpos", None))
+        ]

         # Check the properties
         self.check_properties(properties)
@@ -106,27 +115,31 @@ def launch(self) -> int:
         self.stage_files()

         # check sequence
-        if self.sequence is None or len(self.sequence) < 2:
+        if not self.sequence or len(self.sequence) < 2:
             raise ValueError("sequence is null or too short!")

         # check seqpos
-        if self.seqpos is not None:
+        if self.seqpos:
             if (max(self.seqpos) > len(self.sequence) - 2) or (min(self.seqpos) < 1):
                 raise ValueError(
-                    f"seqpos values must be between 1 and {len(self.sequence) - 2}")
+                    f"seqpos values must be between 1 and {len(self.sequence) - 2}"
+                )
             if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1):
-                raise ValueError(
-                    "seqpos must be a list of at least two integers")
+                raise ValueError("seqpos must be a list of at least two integers")

         # read input files
         alphaC = read_series(
-            self.stage_io_dict['in']['input_alphaC_path'], 
usecols=self.seqpos) + self.stage_io_dict["in"]["input_alphaC_path"], usecols=self.seqpos + ) alphaW = read_series( - self.stage_io_dict['in']['input_alphaW_path'], usecols=self.seqpos) + self.stage_io_dict["in"]["input_alphaW_path"], usecols=self.seqpos + ) gammaC = read_series( - self.stage_io_dict['in']['input_gammaC_path'], usecols=self.seqpos) + self.stage_io_dict["in"]["input_gammaC_path"], usecols=self.seqpos + ) gammaW = read_series( - self.stage_io_dict['in']['input_gammaW_path'], usecols=self.seqpos) + self.stage_io_dict["in"]["input_gammaW_path"], usecols=self.seqpos + ) # fix angle range so its not negative alphaC = self.fix_angles(alphaC) @@ -135,59 +148,45 @@ def launch(self) -> int: gammaW = self.fix_angles(gammaW) # calculate difference between epsil and zeta parameters - xlabels = self.get_xlabels( - self.sequence, - inverse_complement(self.sequence)) - canonical_populations = self.check_alpha_gamma( - alphaC, - gammaC, - alphaW, - gammaW) + xlabels = self.get_xlabels(self.sequence, inverse_complement(self.sequence)) + canonical_populations = self.check_alpha_gamma(alphaC, gammaC, alphaW, gammaW) # save table canonical_populations.name = "Canonical alpha/gamma" - ag_populations_df = pd.DataFrame({ - "Nucleotide": xlabels, - "Canonical alpha/gamma": canonical_populations}) + ag_populations_df = pd.DataFrame( + {"Nucleotide": xlabels, "Canonical alpha/gamma": canonical_populations} + ) ag_populations_df.to_csv( - self.stage_io_dict['out']['output_csv_path'], - index=False) + self.stage_io_dict["out"]["output_csv_path"], index=False + ) # save plot fig, axs = plt.subplots(1, 1, dpi=300, tight_layout=True) axs.bar( - range(len(xlabels)), - canonical_populations, - label="canonical alpha/gamma") + range(len(xlabels)), canonical_populations, label="canonical alpha/gamma" + ) axs.bar( range(len(xlabels)), 100 - canonical_populations, bottom=canonical_populations, - label=None) + label=None, + ) # empty bar to divide both sequences - axs.bar( - [len(alphaC.columns)], - [100], - color='white', - label=None) + axs.bar([len(alphaC.columns)], [100], color="white", label=None) axs.legend() axs.set_xticks(range(len(xlabels))) axs.set_xticklabels(xlabels, rotation=90) axs.set_xlabel("Nucleotide Sequence") axs.set_ylabel("Canonical Alpha-Gamma (%)") axs.set_title("Nucleotide parameter: Canonical Alpha-Gamma") - fig.savefig( - self.stage_io_dict['out']['output_jpg_path'], - format="jpg") + fig.savefig(self.stage_io_dict["out"]["output_jpg_path"], format="jpg") plt.close() # Copy files to host self.copy_to_host() # Remove temporary file(s) - self.tmp_files.extend([ - self.stage_io_dict.get("unique_dir", "") - ]) + self.tmp_files.extend([self.stage_io_dict.get("unique_dir", "")]) self.remove_tmp_files() return 0 @@ -195,39 +194,28 @@ def launch(self) -> int: def get_xlabels(self, strand1, strand2): # get list of tetramers, except first and last two bases labelsW = list(strand1) - labelsW[0] = f"{labelsW[0]}5\'" - labelsW[-1] = f"{labelsW[-1]}3\'" - labelsW = [ - f"{i}-{j}" for i, j in zip(labelsW, range(1, len(labelsW)+1))] + labelsW[0] = f"{labelsW[0]}5'" + labelsW[-1] = f"{labelsW[-1]}3'" + labelsW = [f"{i}-{j}" for i, j in zip(labelsW, range(1, len(labelsW) + 1))] labelsC = list(strand2)[::-1] - labelsC[0] = f"{labelsC[0]}5\'" - labelsC[-1] = f"{labelsC[-1]}3\'" - labelsC = [ - f"{i}-{j}" for i, j in zip(labelsC, range(len(labelsC), 0, -1))] + labelsC[0] = f"{labelsC[0]}5'" + labelsC[-1] = f"{labelsC[-1]}3'" + labelsC = [f"{i}-{j}" for i, j in zip(labelsC, range(len(labelsC), 0, -1))] if 
self.seqpos is not None: labelsC = [labelsC[i] for i in self.seqpos] labelsW = [labelsW[i] for i in self.seqpos] - xlabels = labelsW + ['-'] + labelsC + xlabels = labelsW + ["-"] + labelsC return xlabels def check_alpha_gamma(self, alphaC, gammaC, alphaW, gammaW): separator_df = pd.DataFrame({"-": np.nan}, index=range(len(gammaW))) - gamma = pd.concat([ - gammaW, - separator_df, - gammaC[gammaC.columns[::-1]]], - axis=1) - alpha = pd.concat([ - alphaW, - separator_df, - alphaC[alphaC.columns[::-1]]], - axis=1) + gamma = pd.concat([gammaW, separator_df, gammaC[gammaC.columns[::-1]]], axis=1) + alpha = pd.concat([alphaW, separator_df, alphaC[alphaC.columns[::-1]]], axis=1) alpha_filter = np.logical_and(alpha > 240, alpha < 360) gamma_filter = np.logical_and(gamma > 0, gamma < 120) - canonical_alpha_gamma = np.logical_and( - alpha_filter, gamma_filter).mean() * 100 + canonical_alpha_gamma = np.logical_and(alpha_filter, gamma_filter).mean() * 100 return canonical_alpha_gamma @@ -239,12 +227,17 @@ def fix_angles(self, dataset): def canonicalag( - input_alphaC_path: str, input_alphaW_path: str, - input_gammaC_path: str, input_gammaW_path: str, - output_csv_path: str, output_jpg_path: str, - properties: Optional[dict] = None, **kwargs) -> int: + input_alphaC_path: str, + input_alphaW_path: str, + input_gammaC_path: str, + input_gammaW_path: str, + output_csv_path: str, + output_jpg_path: str, + properties: Optional[dict] = None, + **kwargs, +) -> int: """Create :class:`CanonicalAG ` class and - execute the: meth: `launch() ` method. """ + execute the: meth: `launch() ` method.""" return CanonicalAG( input_alphaC_path=input_alphaC_path, @@ -253,28 +246,50 @@ def canonicalag( input_gammaW_path=input_gammaW_path, output_csv_path=output_csv_path, output_jpg_path=output_jpg_path, - properties=properties, **kwargs).launch() + properties=properties, + **kwargs, + ).launch() def main(): """Command line execution of this building block. Please check the command line documentation.""" - parser = argparse.ArgumentParser(description='Calculate Canonical Alpha/Gamma distributions.', - formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999)) - parser.add_argument('--config', required=False, help='Configuration file') - - required_args = parser.add_argument_group('required arguments') - required_args.add_argument('--input_alphaC_path', required=True, - help='Helical parameter input ser file path. Accepted formats: ser.') - required_args.add_argument('--input_alphaW_path', required=True, - help='Helical parameter input ser file path. Accepted formats: ser.') - required_args.add_argument('--input_gammaC_path', required=True, - help='Helical parameter input ser file path. Accepted formats: ser.') - required_args.add_argument('--input_gammaW_path', required=True, - help='Helical parameter input ser file path. Accepted formats: ser.') - required_args.add_argument('--output_csv_path', required=True, - help='Path to output csv file. Accepted formats: csv.') - required_args.add_argument('--output_jpg_path', required=True, - help='Path to output jpg file. 
Accepted formats: jpg.') + parser = argparse.ArgumentParser( + description="Calculate Canonical Alpha/Gamma distributions.", + formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999), + ) + parser.add_argument("--config", required=False, help="Configuration file") + + required_args = parser.add_argument_group("required arguments") + required_args.add_argument( + "--input_alphaC_path", + required=True, + help="Helical parameter input ser file path. Accepted formats: ser.", + ) + required_args.add_argument( + "--input_alphaW_path", + required=True, + help="Helical parameter input ser file path. Accepted formats: ser.", + ) + required_args.add_argument( + "--input_gammaC_path", + required=True, + help="Helical parameter input ser file path. Accepted formats: ser.", + ) + required_args.add_argument( + "--input_gammaW_path", + required=True, + help="Helical parameter input ser file path. Accepted formats: ser.", + ) + required_args.add_argument( + "--output_csv_path", + required=True, + help="Path to output csv file. Accepted formats: csv.", + ) + required_args.add_argument( + "--output_jpg_path", + required=True, + help="Path to output jpg file. Accepted formats: jpg.", + ) args = parser.parse_args() args.config = args.config or "{}" @@ -287,8 +302,9 @@ def main(): input_gammaW_path=args.input_gammaW_path, output_csv_path=args.output_csv_path, output_jpg_path=args.output_jpg_path, - properties=properties) + properties=properties, + ) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/biobb_dna/backbone/puckering.py b/biobb_dna/backbone/puckering.py index 3fa24251..8d13d4fe 100755 --- a/biobb_dna/backbone/puckering.py +++ b/biobb_dna/backbone/puckering.py @@ -5,13 +5,15 @@ from typing import Optional import matplotlib.pyplot as plt -import pandas as pd import numpy as np -from biobb_dna.utils.loader import read_series -from biobb_dna.utils.transform import inverse_complement +import pandas as pd +from biobb_common.configuration import settings from biobb_common.generic.biobb_object import BiobbObject from biobb_common.tools.file_utils import launchlogger -from biobb_common.configuration import settings + +from biobb_dna.utils.common import _from_string_to_list +from biobb_dna.utils.loader import read_series +from biobb_dna.utils.transform import inverse_complement class Puckering(BiobbObject): @@ -57,9 +59,15 @@ class Puckering(BiobbObject): """ - def __init__(self, input_phaseC_path, input_phaseW_path, - output_csv_path, output_jpg_path, - properties=None, **kwargs) -> None: + def __init__( + self, + input_phaseC_path, + input_phaseW_path, + output_csv_path, + output_jpg_path, + properties=None, + **kwargs, + ) -> None: properties = properties or {} # Call parent class constructor @@ -68,22 +76,22 @@ def __init__(self, input_phaseC_path, input_phaseW_path, # Input/Output files self.io_dict = { - 'in': { - 'input_phaseC_path': input_phaseC_path, - 'input_phaseW_path': input_phaseW_path + "in": { + "input_phaseC_path": input_phaseC_path, + "input_phaseW_path": input_phaseW_path, + }, + "out": { + "output_csv_path": output_csv_path, + "output_jpg_path": output_jpg_path, }, - 'out': { - 'output_csv_path': output_csv_path, - 'output_jpg_path': output_jpg_path - } } self.properties = properties self.sequence = properties.get("sequence") - self.stride = properties.get( - "stride", 1000) - self.seqpos = properties.get( - "seqpos", None) + self.stride = properties.get("stride", 1000) + self.seqpos = [ + int(elem) for elem in 
_from_string_to_list(properties.get("seqpos", None)) + ] # Check the properties self.check_properties(properties) @@ -103,79 +111,57 @@ def launch(self) -> int: raise ValueError("sequence is null or too short!") # check seqpos - if self.seqpos is not None: + if self.seqpos: if (max(self.seqpos) > len(self.sequence) - 2) or (min(self.seqpos) < 1): raise ValueError( - f"seqpos values must be between 1 and {len(self.sequence) - 2}") + f"seqpos values must be between 1 and {len(self.sequence) - 2}" + ) if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1): - raise ValueError( - "seqpos must be a list of at least two integers") + raise ValueError("seqpos must be a list of at least two integers") # read input files phaseC = read_series( - self.stage_io_dict['in']['input_phaseC_path'], - usecols=self.seqpos) + self.stage_io_dict["in"]["input_phaseC_path"], usecols=self.seqpos + ) phaseW = read_series( - self.stage_io_dict['in']['input_phaseW_path'], - usecols=self.seqpos) + self.stage_io_dict["in"]["input_phaseW_path"], usecols=self.seqpos + ) # fix angle range so its not negative phaseC = self.fix_angles(phaseC) phaseW = self.fix_angles(phaseW) # calculate difference between epsil and zeta parameters - xlabels = self.get_xlabels( - self.sequence, - inverse_complement(self.sequence)) + xlabels = self.get_xlabels(self.sequence, inverse_complement(self.sequence)) Npop, Epop, Wpop, Spop = self.check_puckering(phaseC, phaseW) # save plot fig, axs = plt.subplots(1, 1, dpi=300, tight_layout=True) - axs.bar( - range(len(xlabels)), - Npop, - label="North") - axs.bar( - range(len(xlabels)), - Epop, - bottom=Npop, - label="East") - axs.bar( - range(len(xlabels)), - Spop, - bottom=Npop+Epop, - label="South") - axs.bar( - range(len(xlabels)), - Wpop, - bottom=Npop+Epop+Spop, - label="West") + axs.bar(range(len(xlabels)), Npop, label="North") + axs.bar(range(len(xlabels)), Epop, bottom=Npop, label="East") + axs.bar(range(len(xlabels)), Spop, bottom=Npop + Epop, label="South") + axs.bar(range(len(xlabels)), Wpop, bottom=Npop + Epop + Spop, label="West") # empty bar to divide both sequences - axs.bar( - [len(phaseC.columns)], - [100], - color='white', - label=None) + axs.bar([len(phaseC.columns)], [100], color="white", label=None) axs.legend() axs.set_xticks(range(len(xlabels))) axs.set_xticklabels(xlabels, rotation=90) axs.set_xlabel("Nucleotide Sequence") axs.set_ylabel("Puckering (%)") axs.set_title("Nucleotide parameter: Puckering") - fig.savefig( - self.stage_io_dict['out']['output_jpg_path'], - format="jpg") + fig.savefig(self.stage_io_dict["out"]["output_jpg_path"], format="jpg") # save table - populations = pd.DataFrame({ - "Nucleotide": xlabels, - "North": Npop, - "East": Epop, - "West": Wpop, - "South": Spop}) - populations.to_csv( - self.stage_io_dict['out']['output_csv_path'], - index=False) + populations = pd.DataFrame( + { + "Nucleotide": xlabels, + "North": Npop, + "East": Epop, + "West": Wpop, + "South": Spop, + } + ) + populations.to_csv(self.stage_io_dict["out"]["output_csv_path"], index=False) plt.close() @@ -183,9 +169,7 @@ def launch(self) -> int: self.copy_to_host() # Remove temporary file(s) - self.tmp_files.extend([ - self.stage_io_dict.get("unique_dir", "") - ]) + self.tmp_files.extend([self.stage_io_dict.get("unique_dir", "")]) self.remove_tmp_files() self.check_arguments(output_files_created=True, raise_exception=False) @@ -195,29 +179,23 @@ def launch(self) -> int: def get_xlabels(self, strand1, strand2): # get list of tetramers, except first and last two bases labelsW = 
list(strand1) - labelsW[0] = f"{labelsW[0]}5\'" - labelsW[-1] = f"{labelsW[-1]}3\'" - labelsW = [ - f"{i}-{j}" for i, j in zip(labelsW, range(1, len(labelsW)+1))] + labelsW[0] = f"{labelsW[0]}5'" + labelsW[-1] = f"{labelsW[-1]}3'" + labelsW = [f"{i}-{j}" for i, j in zip(labelsW, range(1, len(labelsW) + 1))] labelsC = list(strand2)[::-1] - labelsC[0] = f"{labelsC[0]}5\'" - labelsC[-1] = f"{labelsC[-1]}3\'" - labelsC = [ - f"{i}-{j}" for i, j in zip(labelsC, range(len(labelsC), 0, -1))] + labelsC[0] = f"{labelsC[0]}5'" + labelsC[-1] = f"{labelsC[-1]}3'" + labelsC = [f"{i}-{j}" for i, j in zip(labelsC, range(len(labelsC), 0, -1))] - if self.seqpos is not None: + if self.seqpos: labelsC = [labelsC[i] for i in self.seqpos] labelsW = [labelsW[i] for i in self.seqpos] - xlabels = labelsW + ['-'] + labelsC + xlabels = labelsW + ["-"] + labelsC return xlabels def check_puckering(self, phaseC, phaseW): separator_df = pd.DataFrame({"-": np.nan}, index=range(1, len(phaseC))) - phase = pd.concat([ - phaseW, - separator_df, - phaseC[phaseC.columns[::-1]]], - axis=1) + phase = pd.concat([phaseW, separator_df, phaseC[phaseC.columns[::-1]]], axis=1) # phase.columns = columns Npop = np.logical_or(phase > 315, phase < 45).mean() * 100 @@ -234,35 +212,55 @@ def fix_angles(self, dataset): def puckering( - input_phaseC_path: str, input_phaseW_path: str, - output_csv_path: str, output_jpg_path: str, - properties: Optional[dict] = None, **kwargs) -> int: + input_phaseC_path: str, + input_phaseW_path: str, + output_csv_path: str, + output_jpg_path: str, + properties: Optional[dict] = None, + **kwargs, +) -> int: """Create :class:`Puckering ` class and - execute the: meth: `launch() ` method. """ + execute the: meth: `launch() ` method.""" return Puckering( input_phaseC_path=input_phaseC_path, input_phaseW_path=input_phaseW_path, output_csv_path=output_csv_path, output_jpg_path=output_jpg_path, - properties=properties, **kwargs).launch() + properties=properties, + **kwargs, + ).launch() def main(): """Command line execution of this building block. Please check the command line documentation.""" - parser = argparse.ArgumentParser(description='Calculate North/East/West/South distribution of sugar puckering backbone torsions.', - formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999)) - parser.add_argument('--config', required=False, help='Configuration file') - - required_args = parser.add_argument_group('required arguments') - required_args.add_argument('--input_phaseC_path', required=True, - help='Helical parameter input ser file path. Accepted formats: ser.') - required_args.add_argument('--input_phaseW_path', required=True, - help='Helical parameter input ser file path. Accepted formats: ser.') - required_args.add_argument('--output_csv_path', required=True, - help='Path to output csv file. Accepted formats: csv.') - required_args.add_argument('--output_jpg_path', required=True, - help='Path to output jpg file. Accepted formats: jpg.') + parser = argparse.ArgumentParser( + description="Calculate North/East/West/South distribution of sugar puckering backbone torsions.", + formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999), + ) + parser.add_argument("--config", required=False, help="Configuration file") + + required_args = parser.add_argument_group("required arguments") + required_args.add_argument( + "--input_phaseC_path", + required=True, + help="Helical parameter input ser file path. 
Accepted formats: ser.", + ) + required_args.add_argument( + "--input_phaseW_path", + required=True, + help="Helical parameter input ser file path. Accepted formats: ser.", + ) + required_args.add_argument( + "--output_csv_path", + required=True, + help="Path to output csv file. Accepted formats: csv.", + ) + required_args.add_argument( + "--output_jpg_path", + required=True, + help="Path to output jpg file. Accepted formats: jpg.", + ) args = parser.parse_args() args.config = args.config or "{}" @@ -273,8 +271,9 @@ def main(): input_phaseW_path=args.input_phaseW_path, output_csv_path=args.output_csv_path, output_jpg_path=args.output_jpg_path, - properties=properties) + properties=properties, + ) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/biobb_dna/dna/dna_averages.py b/biobb_dna/dna/dna_averages.py index 8218d949..3ac41989 100755 --- a/biobb_dna/dna/dna_averages.py +++ b/biobb_dna/dna/dna_averages.py @@ -1,17 +1,20 @@ #!/usr/bin/env python3 """Module containing the HelParAverages class and the command line interface.""" + import argparse -from typing import Optional from pathlib import Path +from typing import Optional import matplotlib.pyplot as plt import pandas as pd -from biobb_dna.utils import constants -from biobb_dna.utils.loader import read_series +from biobb_common.configuration import settings from biobb_common.generic.biobb_object import BiobbObject from biobb_common.tools.file_utils import launchlogger -from biobb_common.configuration import settings + +from biobb_dna.utils import constants +from biobb_dna.utils.common import _from_string_to_list +from biobb_dna.utils.loader import read_series class HelParAverages(BiobbObject): @@ -58,8 +61,14 @@ class HelParAverages(BiobbObject): """ - def __init__(self, input_ser_path, output_csv_path, output_jpg_path, - properties=None, **kwargs) -> None: + def __init__( + self, + input_ser_path, + output_csv_path, + output_jpg_path, + properties=None, + **kwargs, + ) -> None: properties = properties or {} # Call parent class constructor @@ -68,24 +77,23 @@ def __init__(self, input_ser_path, output_csv_path, output_jpg_path, # Input/Output files self.io_dict = { - 'in': { - 'input_ser_path': input_ser_path, + "in": { + "input_ser_path": input_ser_path, + }, + "out": { + "output_csv_path": output_csv_path, + "output_jpg_path": output_jpg_path, }, - 'out': { - 'output_csv_path': output_csv_path, - 'output_jpg_path': output_jpg_path - } } # Properties specific for BB self.properties = properties self.sequence = properties.get("sequence", None) - self.stride = properties.get( - "stride", 1000) - self.seqpos = properties.get( - "seqpos", None) - self.helpar_name = properties.get( - "helpar_name", None) + self.stride = properties.get("stride", 1000) + self.seqpos = [ + int(elem) for elem in _from_string_to_list(properties.get("seqpos", None)) + ] + self.helpar_name = properties.get("helpar_name", None) # Check the properties self.check_properties(properties) @@ -101,25 +109,26 @@ def launch(self) -> int: self.stage_files() # check sequence - if self.sequence is None or len(self.sequence) < 2: + if not self.sequence or len(self.sequence) < 2: raise ValueError("sequence is null or too short!") # get helical parameter from filename if not specified if self.helpar_name is None: for hp in constants.helical_parameters: - ser_name = Path( - self.stage_io_dict['in']['input_ser_path']).name.lower() + ser_name = Path(self.stage_io_dict["in"]["input_ser_path"]).name.lower() if hp.lower() in ser_name: self.helpar_name = hp if 
self.helpar_name is None: raise ValueError( "Helical parameter name can't be inferred from file, " - "so it must be specified!") + "so it must be specified!" + ) else: if self.helpar_name not in constants.helical_parameters: raise ValueError( "Helical parameter name is invalid! " - f"Options: {constants.helical_parameters}") + f"Options: {constants.helical_parameters}" + ) # get base length and unit from helical parameter name if self.helpar_name.lower() in constants.hp_basepairs: @@ -132,72 +141,68 @@ def launch(self) -> int: self.hp_unit = "Angstroms" # check seqpos - if self.seqpos is not None: + if self.seqpos: if (max(self.seqpos) > len(self.sequence) - 2) or (min(self.seqpos) < 1): raise ValueError( - f"seqpos values must be between 1 and {len(self.sequence) - 2}") + f"seqpos values must be between 1 and {len(self.sequence) - 2}" + ) if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1): - raise ValueError( - "seqpos must be a list of at least two integers") + raise ValueError("seqpos must be a list of at least two integers") # read input .ser file ser_data = read_series( - self.stage_io_dict['in']['input_ser_path'], - usecols=self.seqpos) - if self.seqpos is None: + self.stage_io_dict["in"]["input_ser_path"], usecols=self.seqpos + ) + if not self.seqpos: ser_data = ser_data[ser_data.columns[1:-1]] # discard first and last base(pairs) from sequence sequence = self.sequence[1:] xlabels = [ f"{sequence[i:i+1+self.baselen]}" - for i in range(len(ser_data.columns) - self.baselen)] + for i in range(len(ser_data.columns) - self.baselen) + ] else: sequence = self.sequence - xlabels = [ - f"{sequence[i:i+1+self.baselen]}" - for i in self.seqpos] + xlabels = [f"{sequence[i:i+1+self.baselen]}" for i in self.seqpos] # rename duplicated subunits while any(pd.Index(ser_data.columns).duplicated()): ser_data.columns = [ name if not duplicated else name + "_dup" - for duplicated, name - in zip(pd.Index(ser_data.columns).duplicated(), ser_data.columns)] + for duplicated, name in zip( + pd.Index(ser_data.columns).duplicated(), ser_data.columns + ) + ] # write output files for all selected bases - means = ser_data.mean(axis=0).iloc[:len(xlabels)] - stds = ser_data.std(axis=0).iloc[:len(xlabels)] + means = ser_data.mean(axis=0).iloc[: len(xlabels)] + stds = ser_data.std(axis=0).iloc[: len(xlabels)] # save plot fig, axs = plt.subplots(1, 1, dpi=300, tight_layout=True) axs.errorbar( - means.index, - means.to_numpy(), - yerr=stds.to_numpy(), - marker="o", - capsize=5) + means.index, means.to_numpy(), yerr=stds.to_numpy(), marker="o", capsize=5 + ) axs.set_xticks(means.index) axs.set_xticklabels(xlabels, rotation=90) - axs.set_xlabel( - "Sequence Base Pair " - f"{'Step' if self.baselen==1 else ''}") + axs.set_xlabel("Sequence Base Pair " f"{'Step' if self.baselen==1 else ''}") axs.set_ylabel(f"{self.helpar_name.capitalize()} ({self.hp_unit})") axs.set_title( "Base Pair " f"{'Step' if self.baselen==1 else ''} " - f"Helical Parameter: {self.helpar_name.capitalize()}") - fig.savefig( - self.stage_io_dict['out']['output_jpg_path'], - format="jpg") + f"Helical Parameter: {self.helpar_name.capitalize()}" + ) + fig.savefig(self.stage_io_dict["out"]["output_jpg_path"], format="jpg") # save table - dataset = pd.DataFrame({ - f"Base Pair {'Step' if self.baselen==1 else ''}": xlabels, - "mean": means.to_numpy(), - "std": stds.to_numpy()}) - dataset.to_csv( - self.stage_io_dict['out']['output_csv_path'], - index=False) + dataset = pd.DataFrame( + { + f"Base Pair {'Step' if self.baselen==1 else ''}": xlabels, 
+ "mean": means.to_numpy(), + "std": stds.to_numpy(), + } + ) + dataset.to_csv(self.stage_io_dict["out"]["output_csv_path"], index=False) plt.close() @@ -205,9 +210,7 @@ def launch(self) -> int: self.copy_to_host() # Remove temporary file(s) - self.tmp_files.extend([ - self.stage_io_dict.get("unique_dir", "") - ]) + self.tmp_files.extend([self.stage_io_dict.get("unique_dir", "")]) self.remove_tmp_files() self.check_arguments(output_files_created=True, raise_exception=False) @@ -216,30 +219,48 @@ def launch(self) -> int: def dna_averages( - input_ser_path: str, output_csv_path: str, output_jpg_path: str, - properties: Optional[dict] = None, **kwargs) -> int: + input_ser_path: str, + output_csv_path: str, + output_jpg_path: str, + properties: Optional[dict] = None, + **kwargs, +) -> int: """Create :class:`HelParAverages ` class and execute the :meth:`launch() ` method.""" - return HelParAverages(input_ser_path=input_ser_path, - output_csv_path=output_csv_path, - output_jpg_path=output_jpg_path, - properties=properties, **kwargs).launch() + return HelParAverages( + input_ser_path=input_ser_path, + output_csv_path=output_csv_path, + output_jpg_path=output_jpg_path, + properties=properties, + **kwargs, + ).launch() def main(): """Command line execution of this building block. Please check the command line documentation.""" - parser = argparse.ArgumentParser(description='Load helical parameter file and calculate average values for each base pair.', - formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999)) - parser.add_argument('--config', required=False, help='Configuration file') - - required_args = parser.add_argument_group('required arguments') - required_args.add_argument('--input_ser_path', required=True, - help='Helical parameter input ser file path. Accepted formats: ser.') - required_args.add_argument('--output_csv_path', required=True, - help='Path to output csv file. Accepted formats: csv.') - required_args.add_argument('--output_jpg_path', required=True, - help='Path to output jpg file. Accepted formats: jpg.') + parser = argparse.ArgumentParser( + description="Load helical parameter file and calculate average values for each base pair.", + formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999), + ) + parser.add_argument("--config", required=False, help="Configuration file") + + required_args = parser.add_argument_group("required arguments") + required_args.add_argument( + "--input_ser_path", + required=True, + help="Helical parameter input ser file path. Accepted formats: ser.", + ) + required_args.add_argument( + "--output_csv_path", + required=True, + help="Path to output csv file. Accepted formats: csv.", + ) + required_args.add_argument( + "--output_jpg_path", + required=True, + help="Path to output jpg file. 
Accepted formats: jpg.", + ) args = parser.parse_args() args.config = args.config or "{}" @@ -249,8 +270,9 @@ def main(): input_ser_path=args.input_ser_path, output_csv_path=args.output_csv_path, output_jpg_path=args.output_jpg_path, - properties=properties) + properties=properties, + ) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/biobb_dna/dna/dna_timeseries.py b/biobb_dna/dna/dna_timeseries.py index 6c0d7644..3750d551 100755 --- a/biobb_dna/dna/dna_timeseries.py +++ b/biobb_dna/dna/dna_timeseries.py @@ -1,21 +1,24 @@ #!/usr/bin/env python3 """Module containing the HelParTimeSeries class and the command line interface.""" + import argparse -from typing import Optional -import zipfile import re +import zipfile from pathlib import Path +from typing import Optional -import pandas as pd import matplotlib.pyplot as plt -from biobb_dna.utils import constants -from biobb_dna.utils.loader import read_series -from biobb_common.generic.biobb_object import BiobbObject +import pandas as pd from biobb_common.configuration import settings +from biobb_common.generic.biobb_object import BiobbObject from biobb_common.tools import file_utils as fu from biobb_common.tools.file_utils import launchlogger +from biobb_dna.utils import constants +from biobb_dna.utils.common import _from_string_to_list +from biobb_dna.utils.loader import read_series + class HelParTimeSeries(BiobbObject): """ @@ -60,8 +63,9 @@ class HelParTimeSeries(BiobbObject): """ - def __init__(self, input_ser_path, output_zip_path, - properties=None, **kwargs) -> None: + def __init__( + self, input_ser_path, output_zip_path, properties=None, **kwargs + ) -> None: properties = properties or {} # Call parent class constructor @@ -70,23 +74,20 @@ def __init__(self, input_ser_path, output_zip_path, # Input/Output files self.io_dict = { - 'in': { - 'input_ser_path': input_ser_path, + "in": { + "input_ser_path": input_ser_path, }, - 'out': { - 'output_zip_path': output_zip_path - } + "out": {"output_zip_path": output_zip_path}, } self.properties = properties self.sequence = properties.get("sequence", None) self.bins = properties.get("bins", "auto") - self.stride = properties.get( - "stride", 10) - self.seqpos = properties.get( - "seqpos", None) - self.helpar_name = properties.get( - "helpar_name", None) + self.stride = properties.get("stride", 10) + self.seqpos = [ + int(elem) for elem in _from_string_to_list(properties.get("seqpos", None)) + ] + self.helpar_name = properties.get("helpar_name", None) # get helical parameter from filename if not specified if self.helpar_name is None: @@ -96,12 +97,14 @@ def __init__(self, input_ser_path, output_zip_path, if self.helpar_name is None: raise ValueError( "Helical parameter name can't be inferred from file, " - "so it must be specified!") + "so it must be specified!" + ) else: if self.helpar_name not in constants.helical_parameters: raise ValueError( "Helical parameter name is invalid! 
" - f"Options: {constants.helical_parameters}") + f"Options: {constants.helical_parameters}" + ) # get base length from helical parameter name if self.helpar_name.lower() in constants.hp_singlebases: @@ -128,12 +131,12 @@ def launch(self) -> int: self.stage_files() # check sequence - if self.sequence is None or len(self.sequence) < 2: + if not self.sequence or len(self.sequence) < 2: raise ValueError("sequence is null or too short!") # calculate cols with 0 index - if self.seqpos is not None: - cols = [i-1 for i in self.seqpos] + if self.seqpos: + cols = [i - 1 for i in self.seqpos] else: cols = list(range(len(self.sequence))) @@ -141,21 +144,21 @@ def launch(self) -> int: cols.sort() # check seqpos for base pairs - if self.seqpos is not None and self.helpar_name in constants.hp_basepairs: + if self.seqpos and self.helpar_name in constants.hp_basepairs: if (max(cols) > len(self.sequence) - 2) or (min(cols) < 0): raise ValueError( - f"seqpos values must be between 1 and {len(self.sequence) - 1}") + f"seqpos values must be between 1 and {len(self.sequence) - 1}" + ) if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1): - raise ValueError( - "seqpos must be a list of at least two integers") + raise ValueError("seqpos must be a list of at least two integers") # check seqpos for non base pairs - elif self.seqpos is not None and self.helpar_name not in constants.hp_basepairs: + elif self.seqpos and self.helpar_name not in constants.hp_basepairs: if (max(cols) > len(self.sequence) - 1) or (min(cols) < 0): raise ValueError( - f"seqpos values must be between 1 and {len(self.sequence)}") + f"seqpos values must be between 1 and {len(self.sequence)}" + ) if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1): - raise ValueError( - "seqpos must be a list of at least two integers") + raise ValueError("seqpos must be a list of at least two integers") if self.helpar_name in constants.hp_basepairs: # remove first and last base pairs from cols if they match 0 and len(sequence) @@ -171,13 +174,17 @@ def launch(self) -> int: # create subunits list from cols subunits = [f"{i+1}_{sequence[i-1:i+self.baselen]}" for i in cols] # clean subunits (leave only basepairs) - pattern = re.compile(r'\d+_[A-Za-z]{2}') + pattern = re.compile(r"\d+_[A-Za-z]{2}") # get removed items removed_items = [s for s in subunits if not pattern.fullmatch(s)] # get indices of removed items (in integer format and starting from 0) - removed_numbers = [int(match.group()) for item in removed_items if (match := re.match(r'\d+', item))] + removed_numbers = [ + int(match.group()) + for item in removed_items + if (match := re.match(r"\d+", item)) + ] removed_numbers = list(map(int, removed_numbers)) - removed_numbers = [int(i)-1 for i in removed_numbers] + removed_numbers = [int(i) - 1 for i in removed_numbers] # remove non basepairs from subunits and indices subunits = [s for s in subunits if pattern.fullmatch(s)] indices = [i for i in indices if i not in removed_numbers] @@ -192,47 +199,50 @@ def launch(self) -> int: # read input .ser file ser_data = read_series( - self.stage_io_dict['in']['input_ser_path'], - usecols=indices) + self.stage_io_dict["in"]["input_ser_path"], usecols=indices + ) # get columns for selected bases ser_data.columns = subunits # write output files for all selected bases (one per column) - zf = zipfile.ZipFile( - Path(self.stage_io_dict["out"]["output_zip_path"]), "w") + zf = zipfile.ZipFile(Path(self.stage_io_dict["out"]["output_zip_path"]), "w") for col in ser_data.columns: # unstack columns to 
prevent errors from repeated base pairs - column_data = ( - ser_data[[col]] - .unstack() - .dropna() - .reset_index(drop=True)) + column_data = ser_data[[col]].unstack().dropna().reset_index(drop=True) column_data.name = col fu.log(f"Computing base number {col}...") # column series series_colfn = f"series_{self.helpar_name}_{col}" column_data.to_csv( - Path(self.stage_io_dict.get("unique_dir", "")) / f"{series_colfn}.csv") + Path(self.stage_io_dict.get("unique_dir", "")) / f"{series_colfn}.csv" + ) # save table zf.write( - Path(self.stage_io_dict.get("unique_dir", "")) / f"{series_colfn}.csv", arcname=f"{series_colfn}.csv") + Path(self.stage_io_dict.get("unique_dir", "")) / f"{series_colfn}.csv", + arcname=f"{series_colfn}.csv", + ) fig, axs = plt.subplots(1, 1, dpi=300, tight_layout=True) - reduced_data = column_data.iloc[::self.stride] + reduced_data = column_data.iloc[:: self.stride] axs.plot(reduced_data.index, reduced_data.to_numpy()) axs.set_xlabel("Time (Snapshots)") axs.set_ylabel(f"{self.helpar_name.capitalize()} ({self.hp_unit})") axs.set_title( f"Helical Parameter vs Time: {self.helpar_name.capitalize()} " "(base pair " - f"{'step' if self.baselen==1 else ''} {col})") + f"{'step' if self.baselen==1 else ''} {col})" + ) fig.savefig( - Path(self.stage_io_dict.get("unique_dir", "")) / f"{series_colfn}.jpg", format="jpg") + Path(self.stage_io_dict.get("unique_dir", "")) / f"{series_colfn}.jpg", + format="jpg", + ) # save plot zf.write( - Path(self.stage_io_dict.get("unique_dir", "")) / f"{series_colfn}.jpg", arcname=f"{series_colfn}.jpg") + Path(self.stage_io_dict.get("unique_dir", "")) / f"{series_colfn}.jpg", + arcname=f"{series_colfn}.jpg", + ) plt.close() # columns histogram @@ -241,21 +251,25 @@ def launch(self) -> int: ybins, x, _ = axs.hist(column_data, bins=self.bins) pd.DataFrame({self.helpar_name: x[:-1], "density": ybins}).to_csv( Path(self.stage_io_dict.get("unique_dir", "")) / f"{hist_colfn}.csv", - index=False) + index=False, + ) # save table zf.write( Path(self.stage_io_dict.get("unique_dir", "")) / f"{hist_colfn}.csv", - arcname=f"{hist_colfn}.csv") + arcname=f"{hist_colfn}.csv", + ) axs.set_ylabel("Density") axs.set_xlabel(f"{self.helpar_name.capitalize()} ({self.hp_unit})") fig.savefig( Path(self.stage_io_dict.get("unique_dir", "")) / f"{hist_colfn}.jpg", - format="jpg") + format="jpg", + ) # save plot zf.write( Path(self.stage_io_dict.get("unique_dir", "")) / f"{hist_colfn}.jpg", - arcname=f"{hist_colfn}.jpg") + arcname=f"{hist_colfn}.jpg", + ) plt.close() zf.close() @@ -263,9 +277,7 @@ def launch(self) -> int: self.copy_to_host() # Remove temporary file(s) - self.tmp_files.extend([ - self.stage_io_dict.get("unique_dir", "") - ]) + self.tmp_files.extend([self.stage_io_dict.get("unique_dir", "")]) self.remove_tmp_files() self.check_arguments(output_files_created=True, raise_exception=False) @@ -274,29 +286,39 @@ def launch(self) -> int: def dna_timeseries( - input_ser_path: str, output_zip_path: str, - properties: Optional[dict] = None, **kwargs) -> int: + input_ser_path: str, + output_zip_path: str, + properties: Optional[dict] = None, + **kwargs, +) -> int: """Create :class:`HelParTimeSeries ` class and execute the :meth:`launch() ` method.""" return HelParTimeSeries( input_ser_path=input_ser_path, output_zip_path=output_zip_path, - properties=properties, **kwargs).launch() + properties=properties, + **kwargs, + ).launch() def main(): """Command line execution of this building block. 
Please check the command line documentation.""" - parser = argparse.ArgumentParser(description='Created time series and histogram plots for each base pair from a helical parameter series file.', - formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999)) - parser.add_argument('--config', required=False, - help='Configuration file') - - required_args = parser.add_argument_group('required arguments') - required_args.add_argument('--input_ser_path', required=True, - help='Helical parameter input ser file path. Accepted formats: ser.') - required_args.add_argument('--output_zip_path', required=True, - help='Path to output directory.') + parser = argparse.ArgumentParser( + description="Created time series and histogram plots for each base pair from a helical parameter series file.", + formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999), + ) + parser.add_argument("--config", required=False, help="Configuration file") + + required_args = parser.add_argument_group("required arguments") + required_args.add_argument( + "--input_ser_path", + required=True, + help="Helical parameter input ser file path. Accepted formats: ser.", + ) + required_args.add_argument( + "--output_zip_path", required=True, help="Path to output directory." + ) args = parser.parse_args() args.config = args.config or "{}" @@ -305,8 +327,9 @@ def main(): dna_timeseries( input_ser_path=args.input_ser_path, output_zip_path=args.output_zip_path, - properties=properties) + properties=properties, + ) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/biobb_dna/interbp_correlations/interbpcorr.py b/biobb_dna/interbp_correlations/interbpcorr.py index 39a38dc0..a0e8fa18 100755 --- a/biobb_dna/interbp_correlations/interbpcorr.py +++ b/biobb_dna/interbp_correlations/interbpcorr.py @@ -1,20 +1,22 @@ #!/usr/bin/env python3 """Module containing the InterBasePairCorrelation class and the command line interface.""" + import argparse -from typing import Optional from itertools import product +from typing import Optional -import numpy as np -import pandas as pd import matplotlib as mpl import matplotlib.pyplot as plt - -from biobb_common.generic.biobb_object import BiobbObject +import numpy as np +import pandas as pd from biobb_common.configuration import settings +from biobb_common.generic.biobb_object import BiobbObject from biobb_common.tools.file_utils import launchlogger -from biobb_dna.utils.loader import read_series + from biobb_dna.utils import constants +from biobb_dna.utils.common import _from_string_to_list +from biobb_dna.utils.loader import read_series class InterBasePairCorrelation(BiobbObject): @@ -65,11 +67,18 @@ class InterBasePairCorrelation(BiobbObject): """ def __init__( - self, input_filename_shift, input_filename_slide, - input_filename_rise, input_filename_tilt, - input_filename_roll, input_filename_twist, - output_csv_path, output_jpg_path, - properties=None, **kwargs) -> None: + self, + input_filename_shift, + input_filename_slide, + input_filename_rise, + input_filename_tilt, + input_filename_roll, + input_filename_twist, + output_csv_path, + output_jpg_path, + properties=None, + **kwargs, + ) -> None: properties = properties or {} # Call parent class constructor @@ -78,23 +87,25 @@ def __init__( # Input/Output files self.io_dict = { - 'in': { - 'input_filename_shift': input_filename_shift, - 'input_filename_slide': input_filename_slide, - 'input_filename_rise': input_filename_rise, - 'input_filename_tilt': input_filename_tilt, - 
'input_filename_roll': input_filename_roll, - 'input_filename_twist': input_filename_twist + "in": { + "input_filename_shift": input_filename_shift, + "input_filename_slide": input_filename_slide, + "input_filename_rise": input_filename_rise, + "input_filename_tilt": input_filename_tilt, + "input_filename_roll": input_filename_roll, + "input_filename_twist": input_filename_twist, + }, + "out": { + "output_csv_path": output_csv_path, + "output_jpg_path": output_jpg_path, }, - 'out': { - 'output_csv_path': output_csv_path, - 'output_jpg_path': output_jpg_path - } } self.properties = properties self.sequence = properties.get("sequence", None) - self.seqpos = properties.get("seqpos", None) + self.seqpos = [ + int(elem) for elem in _from_string_to_list(properties.get("seqpos", None)) + ] # Check the properties self.check_properties(properties) @@ -110,30 +121,35 @@ def launch(self) -> int: self.stage_files() # check sequence - if self.sequence is None or len(self.sequence) < 2: + if not self.sequence or len(self.sequence) < 2: raise ValueError("sequence is null or too short!") # check seqpos - if self.seqpos is not None: + if self.seqpos: if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1): - raise ValueError( - "seqpos must be a list of at least two integers") + raise ValueError("seqpos must be a list of at least two integers") # read input shift = read_series( - self.stage_io_dict["in"]["input_filename_shift"], usecols=self.seqpos) + self.stage_io_dict["in"]["input_filename_shift"], usecols=self.seqpos + ) slide = read_series( - self.stage_io_dict["in"]["input_filename_slide"], usecols=self.seqpos) + self.stage_io_dict["in"]["input_filename_slide"], usecols=self.seqpos + ) rise = read_series( - self.stage_io_dict["in"]["input_filename_rise"], usecols=self.seqpos) + self.stage_io_dict["in"]["input_filename_rise"], usecols=self.seqpos + ) tilt = read_series( - self.stage_io_dict["in"]["input_filename_tilt"], usecols=self.seqpos) + self.stage_io_dict["in"]["input_filename_tilt"], usecols=self.seqpos + ) roll = read_series( - self.stage_io_dict["in"]["input_filename_roll"], usecols=self.seqpos) + self.stage_io_dict["in"]["input_filename_roll"], usecols=self.seqpos + ) twist = read_series( - self.stage_io_dict["in"]["input_filename_twist"], usecols=self.seqpos) + self.stage_io_dict["in"]["input_filename_twist"], usecols=self.seqpos + ) - if self.seqpos is None: + if not self.seqpos: # drop first and last columns shift = shift[shift.columns[1:-2]] slide = slide[slide.columns[1:-2]] @@ -142,9 +158,12 @@ def launch(self) -> int: roll = roll[roll.columns[1:-2]] twist = twist[twist.columns[1:-2]] labels = [ - f"{i+1}_{self.sequence[i:i+2]}" for i in range(1, len(shift.columns) + 1)] + f"{i+1}_{self.sequence[i:i+2]}" + for i in range(1, len(shift.columns) + 1) + ] corr_index = [ - f"{self.sequence[i:i+3]}" for i in range(1, len(shift.columns) + 1)] + f"{self.sequence[i:i+3]}" for i in range(1, len(shift.columns) + 1) + ] else: labels = [f"{i+1}_{self.sequence[i:i+2]}" for i in self.seqpos] corr_index = [f"{self.sequence[i:i+3]}" for i in self.seqpos] @@ -171,17 +190,14 @@ def launch(self) -> int: for ser1, ser2 in product(datasets, datasets): ser2_shifted = ser2.shift(axis=1) ser2_shifted[labels[0]] = ser2[labels[-1]] - if ( - ser1.name in constants.hp_angular and ser2.name in constants.hp_angular): + if ser1.name in constants.hp_angular and ser2.name in constants.hp_angular: method = self.circular elif ( - ( - ser1.name in constants.hp_angular and not ( - ser2.name in constants.hp_angular) 
- ) or ( - ser2.name in constants.hp_angular and not ( - ser1.name in constants.hp_angular) - ) + ser1.name in constants.hp_angular + and ser2.name not in constants.hp_angular + ) or ( + ser2.name in constants.hp_angular + and ser1.name not in constants.hp_angular ): method = self.circlineal else: @@ -197,18 +213,13 @@ def launch(self) -> int: # create heatmap cmap = plt.get_cmap("bwr").copy() - bounds = [-1, -.8, -.6, -.4, -.2, .2, .4, .6, .8, 1] + bounds = [-1, -0.8, -0.6, -0.4, -0.2, 0.2, 0.4, 0.6, 0.8, 1] num = cmap.N norm = mpl.colors.BoundaryNorm(bounds, num) # type: ignore cmap.set_bad(color="gainsboro") - fig, ax = plt.subplots( - 1, - 1, - dpi=300, - figsize=(7.5, 5), - tight_layout=True) - im = ax.imshow(result_df, cmap=cmap, norm=norm, aspect='auto') - plt.colorbar(im, ticks=[-1, -.8, -.6, -.4, -.2, .2, .4, .6, .8, 1]) + fig, ax = plt.subplots(1, 1, dpi=300, figsize=(7.5, 5), tight_layout=True) + im = ax.imshow(result_df, cmap=cmap, norm=norm, aspect="auto") + plt.colorbar(im, ticks=[-1, -0.8, -0.6, -0.4, -0.2, 0.2, 0.4, 0.6, 0.8, 1]) # axes xlocs = np.arange(len(result_df.columns)) @@ -220,22 +231,22 @@ def launch(self) -> int: _ = ax.set_yticklabels(result_df.index.to_list()) # type: ignore ax.set_title( - "Correlation for neighboring basepairs " - "and pairs of helical parameters") + "Correlation for neighboring basepairs " "and pairs of helical parameters" + ) fig.tight_layout() - fig.savefig( - self.stage_io_dict['out']['output_jpg_path'], - format="jpg") + fig.savefig(self.stage_io_dict["out"]["output_jpg_path"], format="jpg") plt.close() # Copy files to host self.copy_to_host() # Remove temporary file(s) - self.tmp_files.extend([ - self.stage_io_dict.get("unique_dir", ""), - ]) + self.tmp_files.extend( + [ + self.stage_io_dict.get("unique_dir", ""), + ] + ) self.remove_tmp_files() self.check_arguments(output_files_created=True, raise_exception=False) @@ -249,7 +260,7 @@ def circular(x1, x2): diff_1 = np.sin(x1 - x1.mean()) diff_2 = np.sin(x2 - x2.mean()) num = (diff_1 * diff_2).sum() - den = np.sqrt((diff_1 ** 2).sum() * (diff_2 ** 2).sum()) + den = np.sqrt((diff_1**2).sum() * (diff_2**2).sum()) return num / den @staticmethod @@ -258,8 +269,8 @@ def circlineal(x1, x2): rc = np.corrcoef(x1, np.cos(x2))[1, 0] rs = np.corrcoef(x1, np.sin(x2))[1, 0] rcs = np.corrcoef(np.sin(x2), np.cos(x2))[1, 0] - num = (rc ** 2) + (rs ** 2) - 2 * rc * rs * rcs - den = 1 - (rcs ** 2) + num = (rc**2) + (rs**2) - 2 * rc * rs * rcs + den = 1 - (rcs**2) correlation = np.sqrt(num / den) if np.corrcoef(x1, x2)[1, 0] < 0: correlation *= -1 @@ -267,11 +278,17 @@ def circlineal(x1, x2): def interbpcorr( - input_filename_shift: str, input_filename_slide: str, - input_filename_rise: str, input_filename_tilt: str, - input_filename_roll: str, input_filename_twist: str, - output_csv_path: str, output_jpg_path: str, - properties: Optional[dict] = None, **kwargs) -> int: + input_filename_shift: str, + input_filename_slide: str, + input_filename_rise: str, + input_filename_tilt: str, + input_filename_roll: str, + input_filename_twist: str, + output_csv_path: str, + output_jpg_path: str, + properties: Optional[dict] = None, + **kwargs, +) -> int: """Create :class:`HelParCorrelation ` class and execute the :meth:`launch() ` method.""" @@ -284,32 +301,60 @@ def interbpcorr( input_filename_twist=input_filename_twist, output_csv_path=output_csv_path, output_jpg_path=output_jpg_path, - properties=properties, **kwargs).launch() + properties=properties, + **kwargs, + ).launch() def main(): """Command line execution 
of this building block. Please check the command line documentation.""" - parser = argparse.ArgumentParser(description='Load .ser file from Canal output and calculate correlation between base pairs of the corresponding sequence.', - formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999)) - parser.add_argument('--config', required=False, help='Configuration file') - - required_args = parser.add_argument_group('required arguments') - required_args.add_argument('--input_filename_shift', required=True, - help='Path to ser file for helical parameter shift. Accepted formats: ser.') - required_args.add_argument('--input_filename_slide', required=True, - help='Path to ser file for helical parameter slide. Accepted formats: ser.') - required_args.add_argument('--input_filename_rise', required=True, - help='Path to ser file for helical parameter rise. Accepted formats: ser.') - required_args.add_argument('--input_filename_tilt', required=True, - help='Path to ser file for helical parameter tilt. Accepted formats: ser.') - required_args.add_argument('--input_filename_roll', required=True, - help='Path to ser file for helical parameter roll. Accepted formats: ser.') - required_args.add_argument('--input_filename_twist', required=True, - help='Path to ser file for helical parameter twist. Accepted formats: ser.') - required_args.add_argument('--output_csv_path', required=True, - help='Path to output file. Accepted formats: csv.') - required_args.add_argument('--output_jpg_path', required=True, - help='Path to output plot. Accepted formats: jpg.') + parser = argparse.ArgumentParser( + description="Load .ser file from Canal output and calculate correlation between base pairs of the corresponding sequence.", + formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999), + ) + parser.add_argument("--config", required=False, help="Configuration file") + + required_args = parser.add_argument_group("required arguments") + required_args.add_argument( + "--input_filename_shift", + required=True, + help="Path to ser file for helical parameter shift. Accepted formats: ser.", + ) + required_args.add_argument( + "--input_filename_slide", + required=True, + help="Path to ser file for helical parameter slide. Accepted formats: ser.", + ) + required_args.add_argument( + "--input_filename_rise", + required=True, + help="Path to ser file for helical parameter rise. Accepted formats: ser.", + ) + required_args.add_argument( + "--input_filename_tilt", + required=True, + help="Path to ser file for helical parameter tilt. Accepted formats: ser.", + ) + required_args.add_argument( + "--input_filename_roll", + required=True, + help="Path to ser file for helical parameter roll. Accepted formats: ser.", + ) + required_args.add_argument( + "--input_filename_twist", + required=True, + help="Path to ser file for helical parameter twist. Accepted formats: ser.", + ) + required_args.add_argument( + "--output_csv_path", + required=True, + help="Path to output file. Accepted formats: csv.", + ) + required_args.add_argument( + "--output_jpg_path", + required=True, + help="Path to output plot. 
Accepted formats: jpg.", + ) args = parser.parse_args() args.config = args.config or "{}" @@ -324,8 +369,9 @@ def main(): input_filename_twist=args.input_filename_twist, output_csv_path=args.output_csv_path, output_jpg_path=args.output_jpg_path, - properties=properties) + properties=properties, + ) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/biobb_dna/interbp_correlations/interseqcorr.py b/biobb_dna/interbp_correlations/interseqcorr.py index ecd501c5..a746cd51 100755 --- a/biobb_dna/interbp_correlations/interseqcorr.py +++ b/biobb_dna/interbp_correlations/interseqcorr.py @@ -1,18 +1,20 @@ #!/usr/bin/env python3 """Module containing the InterSequenceCorrelation class and the command line interface.""" + import argparse -from typing import Optional from pathlib import Path +from typing import Optional -import numpy as np import matplotlib.pyplot as plt - -from biobb_common.generic.biobb_object import BiobbObject +import numpy as np from biobb_common.configuration import settings +from biobb_common.generic.biobb_object import BiobbObject from biobb_common.tools.file_utils import launchlogger -from biobb_dna.utils.loader import read_series + from biobb_dna.utils import constants +from biobb_dna.utils.common import _from_string_to_list +from biobb_dna.utils.loader import read_series class InterSequenceCorrelation(BiobbObject): @@ -58,8 +60,13 @@ class InterSequenceCorrelation(BiobbObject): """ def __init__( - self, input_ser_path, output_csv_path, - output_jpg_path, properties=None, **kwargs) -> None: + self, + input_ser_path, + output_csv_path, + output_jpg_path, + properties=None, + **kwargs, + ) -> None: properties = properties or {} # Call parent class constructor @@ -68,18 +75,18 @@ def __init__( # Input/Output files self.io_dict = { - 'in': { - 'input_ser_path': input_ser_path + "in": {"input_ser_path": input_ser_path}, + "out": { + "output_csv_path": output_csv_path, + "output_jpg_path": output_jpg_path, }, - 'out': { - 'output_csv_path': output_csv_path, - 'output_jpg_path': output_jpg_path - } } self.properties = properties self.sequence = properties.get("sequence", None) - self.seqpos = properties.get("seqpos", None) + self.seqpos = [ + int(elem) for elem in _from_string_to_list(properties.get("seqpos", None)) + ] self.helpar_name = properties.get("helpar_name", None) # Check the properties @@ -96,47 +103,49 @@ def launch(self) -> int: self.stage_files() # check sequence - if self.sequence is None or len(self.sequence) < 2: + if not self.sequence or len(self.sequence) < 2: raise ValueError("sequence is null or too short!") # get helical parameter from filename if not specified if self.helpar_name is None: for hp in constants.helical_parameters: - if hp.lower() in Path( - self.stage_io_dict['in']['input_ser_path']).name.lower(): + if ( + hp.lower() + in Path(self.stage_io_dict["in"]["input_ser_path"]).name.lower() + ): self.helpar_name = hp if self.helpar_name is None: raise ValueError( "Helical parameter name can't be inferred from file, " - "so it must be specified!") + "so it must be specified!" + ) else: if self.helpar_name not in constants.helical_parameters: raise ValueError( "Helical parameter name is invalid! 
" - f"Options: {constants.helical_parameters}") + f"Options: {constants.helical_parameters}" + ) # get base length and unit from helical parameter name if self.helpar_name in constants.hp_angular: self.method = "pearson" else: - self.method = self.circular + self.method = self.circular # type: ignore # check seqpos - if self.seqpos is not None: + if self.seqpos: if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1): - raise ValueError( - "seqpos must be a list of at least two integers") + raise ValueError("seqpos must be a list of at least two integers") # read input .ser file ser_data = read_series( - self.stage_io_dict['in']['input_ser_path'], - usecols=self.seqpos) - if self.seqpos is None: + self.stage_io_dict["in"]["input_ser_path"], usecols=self.seqpos + ) + if not self.seqpos: ser_data = ser_data[ser_data.columns[1:-1]] # discard first and last base(pairs) from strands sequence = self.sequence[1:] - labels = [ - f"{i+1}_{sequence[i:i+2]}" for i in range(len(ser_data.columns))] + labels = [f"{i+1}_{sequence[i:i+2]}" for i in range(len(ser_data.columns))] else: labels = [f"{i+1}_{self.sequence[i:i+2]}" for i in self.seqpos] ser_data.columns = labels @@ -154,32 +163,29 @@ def launch(self) -> int: for i in range(len(corr_data)): for j in range(len(corr_data)): axs.text( - j+.5, - i+.5, + j + 0.5, + i + 0.5, f"{corr_data[corr_data.columns[j]].iloc[i]:.2f}", ha="center", va="center", - color="w") + color="w", + ) axs.set_xticks([i + 0.5 for i in range(len(corr_data))]) axs.set_xticklabels(labels, rotation=90) axs.set_yticks([i + 0.5 for i in range(len(corr_data))]) axs.set_yticklabels(labels) axs.set_title( - "Base Pair Correlation " - f"for Helical Parameter \'{self.helpar_name}\'") + "Base Pair Correlation " f"for Helical Parameter '{self.helpar_name}'" + ) fig.tight_layout() - fig.savefig( - self.stage_io_dict['out']['output_jpg_path'], - format="jpg") + fig.savefig(self.stage_io_dict["out"]["output_jpg_path"], format="jpg") plt.close() # Copy files to host self.copy_to_host() # Remove temporary file(s) - self.tmp_files.extend([ - self.stage_io_dict.get("unique_dir", "") - ]) + self.tmp_files.extend([self.stage_io_dict.get("unique_dir", "")]) self.remove_tmp_files() self.check_arguments(output_files_created=True, raise_exception=False) @@ -193,14 +199,17 @@ def circular(x1, x2): diff_1 = np.sin(x1 - x1.mean()) diff_2 = np.sin(x2 - x2.mean()) num = (diff_1 * diff_2).sum() - den = np.sqrt((diff_1 ** 2).sum() * (diff_2 ** 2).sum()) + den = np.sqrt((diff_1**2).sum() * (diff_2**2).sum()) return num / den def interseqcorr( - input_ser_path: str, - output_csv_path: str, output_jpg_path: str, - properties: Optional[dict] = None, **kwargs) -> int: + input_ser_path: str, + output_csv_path: str, + output_jpg_path: str, + properties: Optional[dict] = None, + **kwargs, +) -> int: """Create :class:`HelParCorrelation ` class and execute the :meth:`launch() ` method.""" @@ -208,22 +217,35 @@ def interseqcorr( input_ser_path=input_ser_path, output_csv_path=output_csv_path, output_jpg_path=output_jpg_path, - properties=properties, **kwargs).launch() + properties=properties, + **kwargs, + ).launch() def main(): """Command line execution of this building block. 
Please check the command line documentation.""" - parser = argparse.ArgumentParser(description='Load .ser file from Canal output and calculate correlation between base pairs of the corresponding sequence.', - formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999)) - parser.add_argument('--config', required=False, help='Configuration file') - - required_args = parser.add_argument_group('required arguments') - required_args.add_argument('--input_ser_path', required=True, - help='Path to ser file with inputs. Accepted formats: ser.') - required_args.add_argument('--output_csv_path', required=True, - help='Path to output file. Accepted formats: csv.') - required_args.add_argument('--output_jpg_path', required=True, - help='Path to output plot. Accepted formats: jpg.') + parser = argparse.ArgumentParser( + description="Load .ser file from Canal output and calculate correlation between base pairs of the corresponding sequence.", + formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999), + ) + parser.add_argument("--config", required=False, help="Configuration file") + + required_args = parser.add_argument_group("required arguments") + required_args.add_argument( + "--input_ser_path", + required=True, + help="Path to ser file with inputs. Accepted formats: ser.", + ) + required_args.add_argument( + "--output_csv_path", + required=True, + help="Path to output file. Accepted formats: csv.", + ) + required_args.add_argument( + "--output_jpg_path", + required=True, + help="Path to output plot. Accepted formats: jpg.", + ) args = parser.parse_args() args.config = args.config or "{}" @@ -233,8 +255,9 @@ def main(): input_ser_path=args.input_ser_path, output_csv_path=args.output_csv_path, output_jpg_path=args.output_jpg_path, - properties=properties) + properties=properties, + ) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/biobb_dna/intrabp_correlations/intrabpcorr.py b/biobb_dna/intrabp_correlations/intrabpcorr.py index dcb0a35a..ef7346a8 100755 --- a/biobb_dna/intrabp_correlations/intrabpcorr.py +++ b/biobb_dna/intrabp_correlations/intrabpcorr.py @@ -1,20 +1,22 @@ #!/usr/bin/env python3 """Module containing the IntraBasePairCorrelation class and the command line interface.""" + import argparse -from typing import Optional from itertools import product +from typing import Optional -import numpy as np -import pandas as pd import matplotlib as mpl import matplotlib.pyplot as plt - -from biobb_common.generic.biobb_object import BiobbObject +import numpy as np +import pandas as pd from biobb_common.configuration import settings +from biobb_common.generic.biobb_object import BiobbObject from biobb_common.tools.file_utils import launchlogger -from biobb_dna.utils.loader import read_series + from biobb_dna.utils import constants +from biobb_dna.utils.common import _from_string_to_list +from biobb_dna.utils.loader import read_series class IntraBasePairCorrelation(BiobbObject): @@ -65,11 +67,18 @@ class IntraBasePairCorrelation(BiobbObject): """ def __init__( - self, input_filename_shear, input_filename_stretch, - input_filename_stagger, input_filename_buckle, - input_filename_propel, input_filename_opening, - output_csv_path, output_jpg_path, - properties=None, **kwargs) -> None: + self, + input_filename_shear, + input_filename_stretch, + input_filename_stagger, + input_filename_buckle, + input_filename_propel, + input_filename_opening, + output_csv_path, + output_jpg_path, + properties=None, + **kwargs, + ) -> None: properties = 
properties or {} # Call parent class constructor @@ -78,23 +87,25 @@ def __init__( # Input/Output files self.io_dict = { - 'in': { - 'input_filename_shear': input_filename_shear, - 'input_filename_stretch': input_filename_stretch, - 'input_filename_stagger': input_filename_stagger, - 'input_filename_buckle': input_filename_buckle, - 'input_filename_propel': input_filename_propel, - 'input_filename_opening': input_filename_opening + "in": { + "input_filename_shear": input_filename_shear, + "input_filename_stretch": input_filename_stretch, + "input_filename_stagger": input_filename_stagger, + "input_filename_buckle": input_filename_buckle, + "input_filename_propel": input_filename_propel, + "input_filename_opening": input_filename_opening, + }, + "out": { + "output_csv_path": output_csv_path, + "output_jpg_path": output_jpg_path, }, - 'out': { - 'output_csv_path': output_csv_path, - 'output_jpg_path': output_jpg_path - } } self.properties = properties self.sequence = properties.get("sequence", None) - self.seqpos = properties.get("seqpos", None) + self.seqpos = [ + int(elem) for elem in _from_string_to_list(properties.get("seqpos", None)) + ] # Check the properties self.check_properties(properties) @@ -114,26 +125,31 @@ def launch(self) -> int: raise ValueError("sequence is null or too short!") # check seqpos - if self.seqpos is not None: + if self.seqpos: if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1): - raise ValueError( - "seqpos must be a list of at least two integers") + raise ValueError("seqpos must be a list of at least two integers") # read input shear = read_series( - self.stage_io_dict["in"]["input_filename_shear"], usecols=self.seqpos) + self.stage_io_dict["in"]["input_filename_shear"], usecols=self.seqpos + ) stretch = read_series( - self.stage_io_dict["in"]["input_filename_stretch"], usecols=self.seqpos) + self.stage_io_dict["in"]["input_filename_stretch"], usecols=self.seqpos + ) stagger = read_series( - self.stage_io_dict["in"]["input_filename_stagger"], usecols=self.seqpos) + self.stage_io_dict["in"]["input_filename_stagger"], usecols=self.seqpos + ) buckle = read_series( - self.stage_io_dict["in"]["input_filename_buckle"], usecols=self.seqpos) + self.stage_io_dict["in"]["input_filename_buckle"], usecols=self.seqpos + ) propel = read_series( - self.stage_io_dict["in"]["input_filename_propel"], usecols=self.seqpos) + self.stage_io_dict["in"]["input_filename_propel"], usecols=self.seqpos + ) opening = read_series( - self.stage_io_dict["in"]["input_filename_opening"], usecols=self.seqpos) + self.stage_io_dict["in"]["input_filename_opening"], usecols=self.seqpos + ) - if self.seqpos is None: + if not self.seqpos: # drop first and last columns shear = shear[shear.columns[1:-1]] stretch = stretch[stretch.columns[1:-1]] @@ -142,9 +158,12 @@ def launch(self) -> int: propel = propel[propel.columns[1:-1]] opening = opening[opening.columns[1:-1]] labels = [ - f"{i+1}_{self.sequence[i:i+1]}" for i in range(1, len(shear.columns) + 1)] + f"{i+1}_{self.sequence[i:i+1]}" + for i in range(1, len(shear.columns) + 1) + ] corr_index = [ - f"{self.sequence[i:i+2]}" for i in range(1, len(shear.columns) + 1)] + f"{self.sequence[i:i+2]}" for i in range(1, len(shear.columns) + 1) + ] else: labels = [f"{i+1}_{self.sequence[i:i+1]}" for i in self.seqpos] corr_index = [f"{self.sequence[i:i+2]}" for i in self.seqpos] @@ -171,17 +190,14 @@ def launch(self) -> int: for ser1, ser2 in product(datasets, datasets): ser2_shifted = ser2.shift(axis=1) ser2_shifted[labels[0]] = ser2[labels[-1]] - 
if ( - ser1.name in constants.hp_angular and ser2.name in constants.hp_angular): + if ser1.name in constants.hp_angular and ser2.name in constants.hp_angular: method = self.circular elif ( - ( - ser1.name in constants.hp_angular and not ( - ser2.name in constants.hp_angular) - ) or ( - ser2.name in constants.hp_angular and not ( - ser1.name in constants.hp_angular) - ) + ser1.name in constants.hp_angular + and ser2.name not in constants.hp_angular + ) or ( + ser2.name in constants.hp_angular + and ser1.name not in constants.hp_angular ): method = self.circlineal else: @@ -197,18 +213,13 @@ def launch(self) -> int: # create heatmap cmap = plt.get_cmap("bwr").copy() - bounds = [-1, -.8, -.6, -.4, -.2, .2, .4, .6, .8, 1] + bounds = [-1, -0.8, -0.6, -0.4, -0.2, 0.2, 0.4, 0.6, 0.8, 1] num = cmap.N norm = mpl.colors.BoundaryNorm(bounds, num) # type: ignore cmap.set_bad(color="gainsboro") - fig, ax = plt.subplots( - 1, - 1, - dpi=300, - figsize=(7.5, 5), - tight_layout=True) - im = ax.imshow(result_df, cmap=cmap, norm=norm, aspect='auto') - plt.colorbar(im, ticks=[-1, -.8, -.6, -.4, -.2, .2, .4, .6, .8, 1]) + fig, ax = plt.subplots(1, 1, dpi=300, figsize=(7.5, 5), tight_layout=True) + im = ax.imshow(result_df, cmap=cmap, norm=norm, aspect="auto") + plt.colorbar(im, ticks=[-1, -0.8, -0.6, -0.4, -0.2, 0.2, 0.4, 0.6, 0.8, 1]) # axes xlocs = np.arange(len(result_df.columns)) @@ -220,22 +231,18 @@ def launch(self) -> int: _ = ax.set_yticklabels(result_df.index.to_list()) # type: ignore ax.set_title( - "Correlation for neighboring basepairs " - "and pairs of helical parameters") + "Correlation for neighboring basepairs " "and pairs of helical parameters" + ) fig.tight_layout() - fig.savefig( - self.stage_io_dict['out']['output_jpg_path'], - format="jpg") + fig.savefig(self.stage_io_dict["out"]["output_jpg_path"], format="jpg") plt.close() # Copy files to host self.copy_to_host() # Remove temporary file(s) - self.tmp_files.extend([ - self.stage_io_dict.get("unique_dir", "") - ]) + self.tmp_files.extend([self.stage_io_dict.get("unique_dir", "")]) self.remove_tmp_files() self.check_arguments(output_files_created=True, raise_exception=False) @@ -249,7 +256,7 @@ def circular(x1, x2): diff_1 = np.sin(x1 - x1.mean()) diff_2 = np.sin(x2 - x2.mean()) num = (diff_1 * diff_2).sum() - den = np.sqrt((diff_1 ** 2).sum() * (diff_2 ** 2).sum()) + den = np.sqrt((diff_1**2).sum() * (diff_2**2).sum()) return num / den @staticmethod @@ -258,8 +265,8 @@ def circlineal(x1, x2): rc = np.corrcoef(x1, np.cos(x2))[1, 0] rs = np.corrcoef(x1, np.sin(x2))[1, 0] rcs = np.corrcoef(np.sin(x2), np.cos(x2))[1, 0] - num = (rc ** 2) + (rs ** 2) - 2 * rc * rs * rcs - den = 1 - (rcs ** 2) + num = (rc**2) + (rs**2) - 2 * rc * rs * rcs + den = 1 - (rcs**2) correlation = np.sqrt(num / den) if np.corrcoef(x1, x2)[1, 0] < 0: correlation *= -1 @@ -267,11 +274,17 @@ def circlineal(x1, x2): def intrabpcorr( - input_filename_shear: str, input_filename_stretch: str, - input_filename_stagger: str, input_filename_buckle: str, - input_filename_propel: str, input_filename_opening: str, - output_csv_path: str, output_jpg_path: str, - properties: Optional[dict] = None, **kwargs) -> int: + input_filename_shear: str, + input_filename_stretch: str, + input_filename_stagger: str, + input_filename_buckle: str, + input_filename_propel: str, + input_filename_opening: str, + output_csv_path: str, + output_jpg_path: str, + properties: Optional[dict] = None, + **kwargs, +) -> int: """Create :class:`HelParCorrelation ` class and execute the :meth:`launch() ` 
method.""" @@ -284,32 +297,60 @@ def intrabpcorr( input_filename_opening=input_filename_opening, output_csv_path=output_csv_path, output_jpg_path=output_jpg_path, - properties=properties, **kwargs).launch() + properties=properties, + **kwargs, + ).launch() def main(): """Command line execution of this building block. Please check the command line documentation.""" - parser = argparse.ArgumentParser(description='Load .ser file from Canal output and calculate correlation between base pairs of the corresponding sequence.', - formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999)) - parser.add_argument('--config', required=False, help='Configuration file') - - required_args = parser.add_argument_group('required arguments') - required_args.add_argument('--input_filename_shear', required=True, - help='Path to ser file for helical parameter shear. Accepted formats: ser.') - required_args.add_argument('--input_filename_stretch', required=True, - help='Path to ser file for helical parameter stretch. Accepted formats: ser.') - required_args.add_argument('--input_filename_stagger', required=True, - help='Path to ser file for helical parameter stagger. Accepted formats: ser.') - required_args.add_argument('--input_filename_buckle', required=True, - help='Path to ser file for helical parameter buckle. Accepted formats: ser.') - required_args.add_argument('--input_filename_propel', required=True, - help='Path to ser file for helical parameter propel. Accepted formats: ser.') - required_args.add_argument('--input_filename_opening', required=True, - help='Path to ser file for helical parameter opening. Accepted formats: ser.') - required_args.add_argument('--output_csv_path', required=True, - help='Path to output file. Accepted formats: csv.') - required_args.add_argument('--output_jpg_path', required=True, - help='Path to output plot. Accepted formats: jpg.') + parser = argparse.ArgumentParser( + description="Load .ser file from Canal output and calculate correlation between base pairs of the corresponding sequence.", + formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999), + ) + parser.add_argument("--config", required=False, help="Configuration file") + + required_args = parser.add_argument_group("required arguments") + required_args.add_argument( + "--input_filename_shear", + required=True, + help="Path to ser file for helical parameter shear. Accepted formats: ser.", + ) + required_args.add_argument( + "--input_filename_stretch", + required=True, + help="Path to ser file for helical parameter stretch. Accepted formats: ser.", + ) + required_args.add_argument( + "--input_filename_stagger", + required=True, + help="Path to ser file for helical parameter stagger. Accepted formats: ser.", + ) + required_args.add_argument( + "--input_filename_buckle", + required=True, + help="Path to ser file for helical parameter buckle. Accepted formats: ser.", + ) + required_args.add_argument( + "--input_filename_propel", + required=True, + help="Path to ser file for helical parameter propel. Accepted formats: ser.", + ) + required_args.add_argument( + "--input_filename_opening", + required=True, + help="Path to ser file for helical parameter opening. Accepted formats: ser.", + ) + required_args.add_argument( + "--output_csv_path", + required=True, + help="Path to output file. Accepted formats: csv.", + ) + required_args.add_argument( + "--output_jpg_path", + required=True, + help="Path to output plot. 
Accepted formats: jpg.", + ) args = parser.parse_args() args.config = args.config or "{}" @@ -324,8 +365,9 @@ def main(): input_filename_opening=args.input_filename_opening, output_csv_path=args.output_csv_path, output_jpg_path=args.output_jpg_path, - properties=properties) + properties=properties, + ) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/biobb_dna/intrabp_correlations/intraseqcorr.py b/biobb_dna/intrabp_correlations/intraseqcorr.py index 76f5c6d8..f0fc7338 100755 --- a/biobb_dna/intrabp_correlations/intraseqcorr.py +++ b/biobb_dna/intrabp_correlations/intraseqcorr.py @@ -1,18 +1,20 @@ #!/usr/bin/env python3 """Module containing the IntraSequenceCorrelation class and the command line interface.""" + import argparse -from typing import Optional from pathlib import Path +from typing import Optional -import numpy as np import matplotlib.pyplot as plt - -from biobb_common.generic.biobb_object import BiobbObject +import numpy as np from biobb_common.configuration import settings +from biobb_common.generic.biobb_object import BiobbObject from biobb_common.tools.file_utils import launchlogger -from biobb_dna.utils.loader import read_series + from biobb_dna.utils import constants +from biobb_dna.utils.common import _from_string_to_list +from biobb_dna.utils.loader import read_series class IntraSequenceCorrelation(BiobbObject): @@ -54,8 +56,13 @@ class IntraSequenceCorrelation(BiobbObject): """ def __init__( - self, input_ser_path, output_csv_path, - output_jpg_path, properties=None, **kwargs) -> None: + self, + input_ser_path, + output_csv_path, + output_jpg_path, + properties=None, + **kwargs, + ) -> None: properties = properties or {} # Call parent class constructor @@ -64,18 +71,18 @@ def __init__( # Input/Output files self.io_dict = { - 'in': { - 'input_ser_path': input_ser_path + "in": {"input_ser_path": input_ser_path}, + "out": { + "output_csv_path": output_csv_path, + "output_jpg_path": output_jpg_path, }, - 'out': { - 'output_csv_path': output_csv_path, - 'output_jpg_path': output_jpg_path - } } self.properties = properties self.sequence = properties.get("sequence", None) - self.seqpos = properties.get("seqpos", None) + self.seqpos = [ + int(elem) for elem in _from_string_to_list(properties.get("seqpos", None)) + ] self.helpar_name = properties.get("helpar_name", None) # Check the properties @@ -98,41 +105,43 @@ def launch(self) -> int: # get helical parameter from filename if not specified if self.helpar_name is None: for hp in constants.helical_parameters: - if hp.lower() in Path( - self.stage_io_dict['in']['input_ser_path']).name.lower(): + if ( + hp.lower() + in Path(self.stage_io_dict["in"]["input_ser_path"]).name.lower() + ): self.helpar_name = hp if self.helpar_name is None: raise ValueError( "Helical parameter name can't be inferred from file, " - "so it must be specified!") + "so it must be specified!" + ) else: if self.helpar_name not in constants.helical_parameters: raise ValueError( "Helical parameter name is invalid! 
" - f"Options: {constants.helical_parameters}") + f"Options: {constants.helical_parameters}" + ) # get base length and unit from helical parameter name if self.helpar_name in constants.hp_angular: self.method = "pearson" else: - self.method = self.circular + self.method = self.circular # type: ignore # check seqpos - if self.seqpos is not None: + if self.seqpos: if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1): - raise ValueError( - "seqpos must be a list of at least two integers") + raise ValueError("seqpos must be a list of at least two integers") # read input .ser file ser_data = read_series( - self.stage_io_dict['in']['input_ser_path'], - usecols=self.seqpos) - if self.seqpos is None: + self.stage_io_dict["in"]["input_ser_path"], usecols=self.seqpos + ) + if not self.seqpos: ser_data = ser_data[ser_data.columns[1:-1]] # discard first and last base(pairs) from strands sequence = self.sequence[1:] - labels = [ - f"{i+1}_{sequence[i:i+1]}" for i in range(len(ser_data.columns))] + labels = [f"{i+1}_{sequence[i:i+1]}" for i in range(len(ser_data.columns))] else: labels = [f"{i+1}_{self.sequence[i:i+1]}" for i in self.seqpos] ser_data.columns = labels @@ -150,32 +159,29 @@ def launch(self) -> int: for i in range(len(corr_data)): for j in range(len(corr_data)): axs.text( - j+.5, - i+.5, + j + 0.5, + i + 0.5, f"{corr_data[corr_data.columns[j]].iloc[i]:.2f}", ha="center", va="center", - color="w") + color="w", + ) axs.set_xticks([i + 0.5 for i in range(len(corr_data))]) axs.set_xticklabels(labels, rotation=90) axs.set_yticks([i + 0.5 for i in range(len(corr_data))]) axs.set_yticklabels(labels) axs.set_title( - "Base Pair Correlation " - f"for Helical Parameter \'{self.helpar_name}\'") + "Base Pair Correlation " f"for Helical Parameter '{self.helpar_name}'" + ) fig.tight_layout() - fig.savefig( - self.stage_io_dict['out']['output_jpg_path'], - format="jpg") + fig.savefig(self.stage_io_dict["out"]["output_jpg_path"], format="jpg") plt.close() # Copy files to host self.copy_to_host() # Remove temporary file(s) - self.tmp_files.extend([ - self.stage_io_dict.get("unique_dir", "") - ]) + self.tmp_files.extend([self.stage_io_dict.get("unique_dir", "")]) self.remove_tmp_files() self.check_arguments(output_files_created=True, raise_exception=False) @@ -189,14 +195,17 @@ def circular(x1, x2): diff_1 = np.sin(x1 - x1.mean()) diff_2 = np.sin(x2 - x2.mean()) num = (diff_1 * diff_2).sum() - den = np.sqrt((diff_1 ** 2).sum() * (diff_2 ** 2).sum()) + den = np.sqrt((diff_1**2).sum() * (diff_2**2).sum()) return num / den def intraseqcorr( - input_ser_path: str, - output_csv_path: str, output_jpg_path: str, - properties: Optional[dict] = None, **kwargs) -> int: + input_ser_path: str, + output_csv_path: str, + output_jpg_path: str, + properties: Optional[dict] = None, + **kwargs, +) -> int: """Create :class:`HelParCorrelation ` class and execute the :meth:`launch() ` method.""" @@ -204,22 +213,35 @@ def intraseqcorr( input_ser_path=input_ser_path, output_csv_path=output_csv_path, output_jpg_path=output_jpg_path, - properties=properties, **kwargs).launch() + properties=properties, + **kwargs, + ).launch() def main(): """Command line execution of this building block. 
Please check the command line documentation.""" - parser = argparse.ArgumentParser(description='Load .ser file from Canal output and calculate correlation between base pairs of the corresponding sequence.', - formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999)) - parser.add_argument('--config', required=False, help='Configuration file') - - required_args = parser.add_argument_group('required arguments') - required_args.add_argument('--input_ser_path', required=True, - help='Path to ser file with inputs. Accepted formats: ser.') - required_args.add_argument('--output_csv_path', required=True, - help='Path to output file. Accepted formats: csv.') - required_args.add_argument('--output_jpg_path', required=True, - help='Path to output plot. Accepted formats: jpg.') + parser = argparse.ArgumentParser( + description="Load .ser file from Canal output and calculate correlation between base pairs of the corresponding sequence.", + formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999), + ) + parser.add_argument("--config", required=False, help="Configuration file") + + required_args = parser.add_argument_group("required arguments") + required_args.add_argument( + "--input_ser_path", + required=True, + help="Path to ser file with inputs. Accepted formats: ser.", + ) + required_args.add_argument( + "--output_csv_path", + required=True, + help="Path to output file. Accepted formats: csv.", + ) + required_args.add_argument( + "--output_jpg_path", + required=True, + help="Path to output plot. Accepted formats: jpg.", + ) args = parser.parse_args() args.config = args.config or "{}" @@ -229,8 +251,9 @@ def main(): input_ser_path=args.input_ser_path, output_csv_path=args.output_csv_path, output_jpg_path=args.output_jpg_path, - properties=properties) + properties=properties, + ) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/biobb_dna/stiffness/average_stiffness.py b/biobb_dna/stiffness/average_stiffness.py index 3ccdf81a..19c4d8ce 100755 --- a/biobb_dna/stiffness/average_stiffness.py +++ b/biobb_dna/stiffness/average_stiffness.py @@ -2,17 +2,19 @@ """Module containing the AverageStiffness class and the command line interface.""" import argparse -from typing import Optional from pathlib import Path +from typing import Optional import matplotlib.pyplot as plt -import pandas as pd import numpy as np -from biobb_dna.utils import constants -from biobb_dna.utils.loader import read_series +import pandas as pd +from biobb_common.configuration import settings from biobb_common.generic.biobb_object import BiobbObject from biobb_common.tools.file_utils import launchlogger -from biobb_common.configuration import settings + +from biobb_dna.utils import constants +from biobb_dna.utils.common import _from_string_to_list +from biobb_dna.utils.loader import read_series class AverageStiffness(BiobbObject): @@ -58,8 +60,14 @@ class AverageStiffness(BiobbObject): """ - def __init__(self, input_ser_path, output_csv_path, output_jpg_path, - properties=None, **kwargs) -> None: + def __init__( + self, + input_ser_path, + output_csv_path, + output_jpg_path, + properties=None, + **kwargs, + ) -> None: properties = properties or {} # Call parent class constructor @@ -68,20 +76,21 @@ def __init__(self, input_ser_path, output_csv_path, output_jpg_path, # Input/Output files self.io_dict = { - 'in': { - 'input_ser_path': input_ser_path, + "in": { + "input_ser_path": input_ser_path, + }, + "out": { + "output_csv_path": output_csv_path, + "output_jpg_path": 
output_jpg_path, }, - 'out': { - 'output_csv_path': output_csv_path, - 'output_jpg_path': output_jpg_path - } } self.properties = properties self.sequence = properties.get("sequence") - self.KT = properties.get( - "KT", 0.592186827) - self.seqpos = properties.get("seqpos", None) + self.KT = properties.get("KT", 0.592186827) + self.seqpos = [ + int(elem) for elem in _from_string_to_list(properties.get("seqpos", None)) + ] self.helpar_name = properties.get("helpar_name", None) # Check the properties @@ -104,52 +113,52 @@ def launch(self) -> int: # get helical parameter from filename if not specified if self.helpar_name is None: for hp in constants.helical_parameters: - if hp.lower() in Path( - self.stage_io_dict['in']['input_ser_path']).name.lower(): + if ( + hp.lower() + in Path(self.stage_io_dict["in"]["input_ser_path"]).name.lower() + ): self.helpar_name = hp if self.helpar_name is None: raise ValueError( "Helical parameter name can't be inferred from file, " - "so it must be specified!") + "so it must be specified!" + ) else: if self.helpar_name not in constants.helical_parameters: raise ValueError( "Helical parameter name is invalid! " - f"Options: {constants.helical_parameters}") + f"Options: {constants.helical_parameters}" + ) # get base length and unit from helical parameter name if self.helpar_name.lower() in ["roll", "tilt", "twist"]: self.hp_unit = "kcal/(mol*degree²)" - scale = 1 + scale = 1.0 else: self.hp_unit = "kcal/(mol*Ų)" scale = 10.6 # check seqpos - if self.seqpos is not None: + if self.seqpos: if (max(self.seqpos) > len(self.sequence) - 2) or (min(self.seqpos) < 1): raise ValueError( - f"seqpos values must be between 1 and {len(self.sequence) - 2}") + f"seqpos values must be between 1 and {len(self.sequence) - 2}" + ) if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1): - raise ValueError( - "seqpos must be a list of at least two integers") + raise ValueError("seqpos must be a list of at least two integers") # read input .ser file ser_data = read_series( - self.stage_io_dict['in']['input_ser_path'], - usecols=self.seqpos) - if self.seqpos is None: + self.stage_io_dict["in"]["input_ser_path"], usecols=self.seqpos + ) + if not self.seqpos: ser_data = ser_data[ser_data.columns[1:-1]] # discard first and last base(pairs) from sequence sequence = self.sequence[1:] - xlabels = [ - f"{sequence[i:i+2]}" - for i in range(len(ser_data.columns))] + xlabels = [f"{sequence[i:i+2]}" for i in range(len(ser_data.columns))] else: sequence = self.sequence - xlabels = [ - f"{sequence[i:i+2]}" - for i in self.seqpos] + xlabels = [f"{sequence[i:i+2]}" for i in self.seqpos] # calculate average stiffness cov = ser_data.cov() @@ -158,27 +167,21 @@ def launch(self) -> int: # save plot fig, axs = plt.subplots(1, 1, dpi=300, tight_layout=True) - axs.plot( - range(len(xlabels)), - avg_stiffness, - "-o") + axs.plot(range(len(xlabels)), avg_stiffness, "-o") axs.set_xticks(range(len(xlabels))) axs.set_xticklabels(xlabels) axs.set_xlabel("Sequence Base Pair") axs.set_ylabel(f"{self.helpar_name.capitalize()} ({self.hp_unit})") axs.set_title( - "Base Pair Helical Parameter Stiffness: " - f"{self.helpar_name.capitalize()}") - fig.savefig( - self.stage_io_dict['out']['output_jpg_path'], - format="jpg") + "Base Pair Helical Parameter Stiffness: " f"{self.helpar_name.capitalize()}" + ) + fig.savefig(self.stage_io_dict["out"]["output_jpg_path"], format="jpg") # save table dataset = pd.DataFrame( - data=avg_stiffness, - index=xlabels, - columns=[f"{self.helpar_name}_stiffness"]) - 
dataset.to_csv(self.stage_io_dict['out']['output_csv_path']) + data=avg_stiffness, index=xlabels, columns=[f"{self.helpar_name}_stiffness"] + ) + dataset.to_csv(self.stage_io_dict["out"]["output_csv_path"]) plt.close() @@ -186,9 +189,7 @@ def launch(self) -> int: self.copy_to_host() # Remove temporary file(s) - self.tmp_files.extend([ - self.stage_io_dict.get("unique_dir", "") - ]) + self.tmp_files.extend([self.stage_io_dict.get("unique_dir", "")]) self.remove_tmp_files() self.check_arguments(output_files_created=True, raise_exception=False) @@ -197,8 +198,12 @@ def launch(self) -> int: def average_stiffness( - input_ser_path: str, output_csv_path: str, output_jpg_path: str, - properties: Optional[dict] = None, **kwargs) -> int: + input_ser_path: str, + output_csv_path: str, + output_jpg_path: str, + properties: Optional[dict] = None, + **kwargs, +) -> int: """Create :class:`AverageStiffness ` class and execute the :meth:`launch() ` method.""" @@ -206,22 +211,35 @@ def average_stiffness( input_ser_path=input_ser_path, output_csv_path=output_csv_path, output_jpg_path=output_jpg_path, - properties=properties, **kwargs).launch() + properties=properties, + **kwargs, + ).launch() def main(): """Command line execution of this building block. Please check the command line documentation.""" - parser = argparse.ArgumentParser(description='Calculate average stiffness constants for each base pair of a trajectory\'s series.', - formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999)) - parser.add_argument('--config', required=False, help='Configuration file') - - required_args = parser.add_argument_group('required arguments') - required_args.add_argument('--input_ser_path', required=True, - help='Helical parameter input ser file path. Accepted formats: ser.') - required_args.add_argument('--output_csv_path', required=True, - help='Path to output csv file. Accepted formats: csv.') - required_args.add_argument('--output_jpg_path', required=True, - help='Path to output jpg file. Accepted formats: jpg.') + parser = argparse.ArgumentParser( + description="Calculate average stiffness constants for each base pair of a trajectory's series.", + formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999), + ) + parser.add_argument("--config", required=False, help="Configuration file") + + required_args = parser.add_argument_group("required arguments") + required_args.add_argument( + "--input_ser_path", + required=True, + help="Helical parameter input ser file path. Accepted formats: ser.", + ) + required_args.add_argument( + "--output_csv_path", + required=True, + help="Path to output csv file. Accepted formats: csv.", + ) + required_args.add_argument( + "--output_jpg_path", + required=True, + help="Path to output jpg file. 
Accepted formats: jpg.", + ) args = parser.parse_args() args.config = args.config or "{}" @@ -231,8 +249,9 @@ def main(): input_ser_path=args.input_ser_path, output_csv_path=args.output_csv_path, output_jpg_path=args.output_jpg_path, - properties=properties) + properties=properties, + ) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/biobb_dna/stiffness/basepair_stiffness.py b/biobb_dna/stiffness/basepair_stiffness.py index bc5d03fe..115712a7 100755 --- a/biobb_dna/stiffness/basepair_stiffness.py +++ b/biobb_dna/stiffness/basepair_stiffness.py @@ -1,17 +1,19 @@ #!/usr/bin/env python3 """Module containing the HelParStiffness class and the command line interface.""" + import argparse -from typing import Optional from pathlib import Path +from typing import Optional -import pandas as pd -import numpy as np import matplotlib.pyplot as plt - -from biobb_common.generic.biobb_object import BiobbObject +import numpy as np +import pandas as pd from biobb_common.configuration import settings +from biobb_common.generic.biobb_object import BiobbObject from biobb_common.tools.file_utils import launchlogger + +from biobb_dna.utils.common import _from_string_to_list from biobb_dna.utils.loader import load_data @@ -66,10 +68,19 @@ class BPStiffness(BiobbObject): """ - def __init__(self, input_filename_shift, input_filename_slide, - input_filename_rise, input_filename_tilt, - input_filename_roll, input_filename_twist, - output_csv_path, output_jpg_path, properties=None, **kwargs) -> None: + def __init__( + self, + input_filename_shift, + input_filename_slide, + input_filename_rise, + input_filename_tilt, + input_filename_roll, + input_filename_twist, + output_csv_path, + output_jpg_path, + properties=None, + **kwargs, + ) -> None: properties = properties or {} # Call parent class constructor @@ -78,25 +89,28 @@ def __init__(self, input_filename_shift, input_filename_slide, # Input/Output files self.io_dict = { - 'in': { - 'input_filename_shift': input_filename_shift, - 'input_filename_slide': input_filename_slide, - 'input_filename_rise': input_filename_rise, - 'input_filename_tilt': input_filename_tilt, - 'input_filename_roll': input_filename_roll, - 'input_filename_twist': input_filename_twist + "in": { + "input_filename_shift": input_filename_shift, + "input_filename_slide": input_filename_slide, + "input_filename_rise": input_filename_rise, + "input_filename_tilt": input_filename_tilt, + "input_filename_roll": input_filename_roll, + "input_filename_twist": input_filename_twist, + }, + "out": { + "output_csv_path": output_csv_path, + "output_jpg_path": output_jpg_path, }, - 'out': { - 'output_csv_path': output_csv_path, - 'output_jpg_path': output_jpg_path - } } self.properties = properties - self.KT = properties.get( - "KT", 0.592186827) - self.scaling = properties.get( - "scaling", [1, 1, 1, 10.6, 10.6, 10.6]) + self.KT = properties.get("KT", 0.592186827) + self.scaling = [ + int(elem) + for elem in _from_string_to_list( + properties.get("scaling", [1, 1, 1, 10.6, 10.6, 10.6]) + ) + ] # Check the properties self.check_properties(properties) @@ -112,34 +126,24 @@ def launch(self) -> int: self.stage_files() # read input - shift = load_data( - self.stage_io_dict["in"]["input_filename_shift"]) - slide = load_data( - self.stage_io_dict["in"]["input_filename_slide"]) - rise = load_data( - self.stage_io_dict["in"]["input_filename_rise"]) - tilt = load_data( - self.stage_io_dict["in"]["input_filename_tilt"]) - roll = load_data( - self.stage_io_dict["in"]["input_filename_roll"]) - 
twist = load_data( - self.stage_io_dict["in"]["input_filename_twist"]) + shift = load_data(self.stage_io_dict["in"]["input_filename_shift"]) + slide = load_data(self.stage_io_dict["in"]["input_filename_slide"]) + rise = load_data(self.stage_io_dict["in"]["input_filename_rise"]) + tilt = load_data(self.stage_io_dict["in"]["input_filename_tilt"]) + roll = load_data(self.stage_io_dict["in"]["input_filename_roll"]) + twist = load_data(self.stage_io_dict["in"]["input_filename_twist"]) # build matrix cols_arr from helpar input data files coordinates = ["shift", "slide", "rise", "tilt", "roll", "twist"] basepairname = shift.columns[0] - helpar_matrix = pd.concat( - [shift, slide, rise, tilt, roll, twist], axis=1) + helpar_matrix = pd.concat([shift, slide, rise, tilt, roll, twist], axis=1) helpar_matrix.columns = coordinates # covariance cov_df = helpar_matrix.cov() # stiffness stiff = np.linalg.inv(cov_df) * self.KT stiff_diag = stiff * np.array(self.scaling) - stiff_df = pd.DataFrame( - stiff_diag, - columns=cov_df.columns, - index=cov_df.index) + stiff_df = pd.DataFrame(stiff_diag, columns=cov_df.columns, index=cov_df.index) stiff_df.index.name = basepairname # save csv data @@ -152,37 +156,35 @@ def launch(self) -> int: for i in range(len(stiff_df)): for j in range(len(stiff_df)): axs.text( - j+.5, - i+.5, + j + 0.5, + i + 0.5, f"{stiff_df[coordinates[j]].loc[coordinates[i]]:.2f}", ha="center", va="center", - color="w") + color="w", + ) axs.text( - 0, -1.35, + 0, + -1.35, "Units:\n" "Diagonal Shift/Slide/Rise in kcal/(mol*Ų), Diagonal Tilt/Roll/Twist in kcal/(mol*degree²)\n" "Out of Diagonal: Shift/Slide/Rise in kcal/(mol*Å), Out of Diagonal Tilt/Roll/Twist in kcal/(mol*degree)", - fontsize=6) + fontsize=6, + ) axs.set_xticks([i + 0.5 for i in range(len(stiff_df))]) axs.set_xticklabels(stiff_df.columns, rotation=90) - axs.set_yticks([i+0.5 for i in range(len(stiff_df))]) + axs.set_yticks([i + 0.5 for i in range(len(stiff_df))]) axs.set_yticklabels(stiff_df.index) - axs.set_title( - f"Stiffness Constants for Base Pair Step \'{basepairname}\'") + axs.set_title(f"Stiffness Constants for Base Pair Step '{basepairname}'") fig.tight_layout() - fig.savefig( - self.stage_io_dict['out']['output_jpg_path'], - format="jpg") + fig.savefig(self.stage_io_dict["out"]["output_jpg_path"], format="jpg") plt.close() # Copy files to host self.copy_to_host() # Remove temporary file(s) - self.tmp_files.extend([ - self.stage_io_dict.get("unique_dir", "") - ]) + self.tmp_files.extend([self.stage_io_dict.get("unique_dir", "")]) self.remove_tmp_files() self.check_arguments(output_files_created=True, raise_exception=False) @@ -191,10 +193,17 @@ def launch(self) -> int: def basepair_stiffness( - input_filename_shift: str, input_filename_slide: str, - input_filename_rise: str, input_filename_tilt: str, - input_filename_roll: str, input_filename_twist: str, - output_csv_path: str, output_jpg_path: str, properties: Optional[dict] = None, **kwargs) -> int: + input_filename_shift: str, + input_filename_slide: str, + input_filename_rise: str, + input_filename_tilt: str, + input_filename_roll: str, + input_filename_twist: str, + output_csv_path: str, + output_jpg_path: str, + properties: Optional[dict] = None, + **kwargs, +) -> int: """Create :class:`BPStiffness ` class and execute the :meth:`launch() ` method.""" @@ -207,32 +216,60 @@ def basepair_stiffness( input_filename_twist=input_filename_twist, output_csv_path=output_csv_path, output_jpg_path=output_jpg_path, - properties=properties, **kwargs).launch() + 
properties=properties, + **kwargs, + ).launch() def main(): """Command line execution of this building block. Please check the command line documentation.""" - parser = argparse.ArgumentParser(description='Calculate stiffness constants matrix between all six helical parameters for a single base pair step.', - formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999)) - parser.add_argument('--config', required=False, help='Configuration file') - - required_args = parser.add_argument_group('required arguments') - required_args.add_argument('--input_filename_shift', required=True, - help='Path to csv file with shift inputs. Accepted formats: csv.') - required_args.add_argument('--input_filename_slide', required=True, - help='Path to csv file with slide inputs. Accepted formats: csv.') - required_args.add_argument('--input_filename_rise', required=True, - help='Path to csv file with rise inputs. Accepted formats: csv.') - required_args.add_argument('--input_filename_tilt', required=True, - help='Path to csv file with tilt inputs. Accepted formats: csv.') - required_args.add_argument('--input_filename_roll', required=True, - help='Path to csv file with roll inputs. Accepted formats: csv.') - required_args.add_argument('--input_filename_twist', required=True, - help='Path to csv file with twist inputs. Accepted formats: csv.') - required_args.add_argument('--output_csv_path', required=True, - help='Path to output covariance data file. Accepted formats: csv.') - required_args.add_argument('--output_jpg_path', required=True, - help='Path to output covariance data file. Accepted formats: csv.') + parser = argparse.ArgumentParser( + description="Calculate stiffness constants matrix between all six helical parameters for a single base pair step.", + formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999), + ) + parser.add_argument("--config", required=False, help="Configuration file") + + required_args = parser.add_argument_group("required arguments") + required_args.add_argument( + "--input_filename_shift", + required=True, + help="Path to csv file with shift inputs. Accepted formats: csv.", + ) + required_args.add_argument( + "--input_filename_slide", + required=True, + help="Path to csv file with slide inputs. Accepted formats: csv.", + ) + required_args.add_argument( + "--input_filename_rise", + required=True, + help="Path to csv file with rise inputs. Accepted formats: csv.", + ) + required_args.add_argument( + "--input_filename_tilt", + required=True, + help="Path to csv file with tilt inputs. Accepted formats: csv.", + ) + required_args.add_argument( + "--input_filename_roll", + required=True, + help="Path to csv file with roll inputs. Accepted formats: csv.", + ) + required_args.add_argument( + "--input_filename_twist", + required=True, + help="Path to csv file with twist inputs. Accepted formats: csv.", + ) + required_args.add_argument( + "--output_csv_path", + required=True, + help="Path to output covariance data file. Accepted formats: csv.", + ) + required_args.add_argument( + "--output_jpg_path", + required=True, + help="Path to output covariance data file. 
Accepted formats: csv.", + ) args = parser.parse_args() args.config = args.config or "{}" @@ -247,8 +284,9 @@ def main(): input_filename_twist=args.input_filename_twist, output_csv_path=args.output_csv_path, output_jpg_path=args.output_jpg_path, - properties=properties) + properties=properties, + ) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/biobb_dna/utils/common.py b/biobb_dna/utils/common.py new file mode 100644 index 00000000..cba3c853 --- /dev/null +++ b/biobb_dna/utils/common.py @@ -0,0 +1,30 @@ +from typing import Optional, Union + + +# TODO: Move this function to biobb_common.tools.file_utils +def _from_string_to_list(input_data: Optional[Union[str, list[str]]]) -> list[str]: + """ + Converts a string to a list, splitting by commas or spaces. If the input is already a list, returns it as is. + Returns an empty list if input_data is None. + + Parameters: + input_data (str, list, or None): The string, list, or None value to convert. + + Returns: + list: A list of string elements or an empty list if input_data is None. + """ + if input_data is None: + return [] + + if isinstance(input_data, list): + # If input is already a list, return it + return input_data + + # If input is a string, determine the delimiter based on presence of commas + delimiter = "," if "," in input_data else " " + items = input_data.split(delimiter) + + # Remove whitespace from each item and ignore empty strings + processed_items = [item.strip() for item in items if item.strip()] + + return processed_items