Skip to content

Commit

Permalink
[FIX](all): Accept lists in different formats on input properties.
Browse files Browse the repository at this point in the history
  • Loading branch information
PauAndrio committed Nov 14, 2024
1 parent 0dcf716 commit ad1d66b
Show file tree
Hide file tree
Showing 12 changed files with 1,205 additions and 917 deletions.
225 changes: 116 additions & 109 deletions biobb_dna/backbone/bipopulations.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@

import matplotlib.pyplot as plt
import pandas as pd
from biobb_common.configuration import settings
from biobb_common.generic.biobb_object import BiobbObject
from biobb_common.tools.file_utils import launchlogger
from numpy import nan

from biobb_dna.utils.common import _from_string_to_list
from biobb_dna.utils.loader import read_series
from biobb_dna.utils.transform import inverse_complement
from biobb_common.generic.biobb_object import BiobbObject
from biobb_common.tools.file_utils import launchlogger
from biobb_common.configuration import settings


class BIPopulations(BiobbObject):
Expand Down Expand Up @@ -60,10 +62,17 @@ class BIPopulations(BiobbObject):
"""

def __init__(self, input_epsilC_path, input_epsilW_path,
input_zetaC_path, input_zetaW_path,
output_csv_path, output_jpg_path,
properties=None, **kwargs) -> None:
def __init__(
self,
input_epsilC_path,
input_epsilW_path,
input_zetaC_path,
input_zetaW_path,
output_csv_path,
output_jpg_path,
properties=None,
**kwargs,
) -> None:
properties = properties or {}

# Call parent class constructor
Expand All @@ -72,21 +81,23 @@ def __init__(self, input_epsilC_path, input_epsilW_path,

# Input/Output files
self.io_dict = {
'in': {
'input_epsilC_path': input_epsilC_path,
'input_epsilW_path': input_epsilW_path,
'input_zetaC_path': input_zetaC_path,
'input_zetaW_path': input_zetaW_path
"in": {
"input_epsilC_path": input_epsilC_path,
"input_epsilW_path": input_epsilW_path,
"input_zetaC_path": input_zetaC_path,
"input_zetaW_path": input_zetaW_path,
},
"out": {
"output_csv_path": output_csv_path,
"output_jpg_path": output_jpg_path,
},
'out': {
'output_csv_path': output_csv_path,
'output_jpg_path': output_jpg_path
}
}

self.properties = properties
self.sequence = properties.get("sequence")
self.seqpos = properties.get("seqpos", None)
self.seqpos = [
int(item) for item in _from_string_to_list(properties.get("seqpos", None))
]

# Check the properties
self.check_properties(properties)
Expand All @@ -102,90 +113,68 @@ def launch(self) -> int:
self.stage_files()

# check sequence
if self.sequence is None or len(self.sequence) < 2:
if not self.sequence or len(self.sequence) < 2:
raise ValueError("sequence is null or too short!")

# check seqpos
if self.seqpos is not None:
if self.seqpos:
if (max(self.seqpos) > len(self.sequence) - 2) or (min(self.seqpos) < 1):
raise ValueError(
f"seqpos values must be between 1 and {len(self.sequence) - 2}")
f"seqpos values must be between 1 and {len(self.sequence) - 2}"
)
if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1):
raise ValueError(
"seqpos must be a list of at least two integers")
raise ValueError("seqpos must be a list of at least two integers")

# read input files
epsilC = read_series(
self.stage_io_dict['in']['input_epsilC_path'],
usecols=self.seqpos)
self.stage_io_dict["in"]["input_epsilC_path"], usecols=self.seqpos
)
epsilW = read_series(
self.stage_io_dict['in']['input_epsilW_path'],
usecols=self.seqpos)
self.stage_io_dict["in"]["input_epsilW_path"], usecols=self.seqpos
)
zetaC = read_series(
self.stage_io_dict['in']['input_zetaC_path'],
usecols=self.seqpos)
self.stage_io_dict["in"]["input_zetaC_path"], usecols=self.seqpos
)
zetaW = read_series(
self.stage_io_dict['in']['input_zetaW_path'],
usecols=self.seqpos)
self.stage_io_dict["in"]["input_zetaW_path"], usecols=self.seqpos
)

# calculate difference between epsil and zeta parameters
xlabels = self.get_xlabels(
self.sequence,
inverse_complement(self.sequence))
diff_epsil_zeta = self.get_angles_difference(
epsilC,
zetaC,
epsilW,
zetaW)
xlabels = self.get_xlabels(self.sequence, inverse_complement(self.sequence))
diff_epsil_zeta = self.get_angles_difference(epsilC, zetaC, epsilW, zetaW)

# calculate BI population
BI = (diff_epsil_zeta < 0).sum(axis=0) * 100 / len(diff_epsil_zeta)
BII = 100 - BI

# save table
Bpopulations_df = pd.DataFrame({
"Nucleotide": xlabels,
"BI population": BI,
"BII population": BII})
Bpopulations_df = pd.DataFrame(
{"Nucleotide": xlabels, "BI population": BI, "BII population": BII}
)
Bpopulations_df.to_csv(
self.stage_io_dict['out']['output_csv_path'],
index=False)
self.stage_io_dict["out"]["output_csv_path"], index=False
)

# save plot
fig, axs = plt.subplots(1, 1, dpi=300, tight_layout=True)
axs.bar(
range(len(xlabels)),
BI,
label="BI")
axs.bar(
range(len(xlabels)),
BII,
bottom=BI,
label="BII")
axs.bar(range(len(xlabels)), BI, label="BI")
axs.bar(range(len(xlabels)), BII, bottom=BI, label="BII")
# empty bar to divide both sequences
axs.bar(
[len(BI)//2],
[100],
color='white',
label=None)
axs.bar([len(BI) // 2], [100], color="white", label=None)
axs.legend()
axs.set_xticks(range(len(xlabels)))
axs.set_xticklabels(xlabels, rotation=90)
axs.set_xlabel("Nucleotide Sequence")
axs.set_ylabel("BI/BII Population (%)")
axs.set_title("Nucleotide parameter: BI/BII Population")
fig.savefig(
self.stage_io_dict['out']['output_jpg_path'],
format="jpg")
fig.savefig(self.stage_io_dict["out"]["output_jpg_path"], format="jpg")
plt.close()

# Copy files to host
self.copy_to_host()

# Remove temporary file(s)
self.tmp_files.extend([
self.stage_io_dict.get("unique_dir", "")
])
self.tmp_files.extend([self.stage_io_dict.get("unique_dir", "")])
self.remove_tmp_files()

self.check_arguments(output_files_created=True, raise_exception=False)
Expand All @@ -195,48 +184,43 @@ def launch(self) -> int:
def get_xlabels(self, strand1, strand2):
# get list of tetramers, except first and last two bases
labelsW = list(strand1)
labelsW[0] = f"{labelsW[0]}5\'"
labelsW[-1] = f"{labelsW[-1]}3\'"
labelsW = [
f"{i}-{j}" for i, j in zip(labelsW, range(1, len(labelsW)+1))]
labelsW[0] = f"{labelsW[0]}5'"
labelsW[-1] = f"{labelsW[-1]}3'"
labelsW = [f"{i}-{j}" for i, j in zip(labelsW, range(1, len(labelsW) + 1))]
labelsC = list(strand2)[::-1]
labelsC[0] = f"{labelsC[0]}5\'"
labelsC[-1] = f"{labelsC[-1]}3\'"
labelsC = [
f"{i}-{j}" for i, j in zip(labelsC, range(len(labelsC), 0, -1))]
labelsC[0] = f"{labelsC[0]}5'"
labelsC[-1] = f"{labelsC[-1]}3'"
labelsC = [f"{i}-{j}" for i, j in zip(labelsC, range(len(labelsC), 0, -1))]

if self.seqpos is not None:
if self.seqpos:
labelsC = [labelsC[i] for i in self.seqpos]
labelsW = [labelsW[i] for i in self.seqpos]
xlabels = labelsW + ['-'] + labelsC
xlabels = labelsW + ["-"] + labelsC
return xlabels

def get_angles_difference(self, epsilC, zetaC, epsilW, zetaW):
# concatenate zeta and epsil arrays
separator_df = pd.DataFrame({"-": nan}, index=range(len(zetaW)))
zeta = pd.concat([
zetaW,
separator_df,
zetaC[zetaC.columns[::-1]]],
axis=1)
epsil = pd.concat([
epsilW,
separator_df,
epsilC[epsilC.columns[::-1]]],
axis=1)
zeta = pd.concat([zetaW, separator_df, zetaC[zetaC.columns[::-1]]], axis=1)
epsil = pd.concat([epsilW, separator_df, epsilC[epsilC.columns[::-1]]], axis=1)

# difference between epsilon and zeta coordinates
diff_epsil_zeta = epsil - zeta
return diff_epsil_zeta


def bipopulations(
input_epsilC_path: str, input_epsilW_path: str,
input_zetaC_path: str, input_zetaW_path: str,
output_csv_path: str, output_jpg_path: str,
properties: Optional[dict] = None, **kwargs) -> int:
input_epsilC_path: str,
input_epsilW_path: str,
input_zetaC_path: str,
input_zetaW_path: str,
output_csv_path: str,
output_jpg_path: str,
properties: Optional[dict] = None,
**kwargs,
) -> int:
"""Create :class:`BIPopulations <dna.backbone.bipopulations.BIPopulations>` class and
execute the: meth: `launch() <dna.backbone.bipopulations.BIPopulations.launch>` method. """
execute the: meth: `launch() <dna.backbone.bipopulations.BIPopulations.launch>` method."""

return BIPopulations(
input_epsilC_path=input_epsilC_path,
Expand All @@ -245,28 +229,50 @@ def bipopulations(
input_zetaW_path=input_zetaW_path,
output_csv_path=output_csv_path,
output_jpg_path=output_jpg_path,
properties=properties, **kwargs).launch()
properties=properties,
**kwargs,
).launch()


def main():
"""Command line execution of this building block. Please check the command line documentation."""
parser = argparse.ArgumentParser(description='Calculate BI/BII populations.',
formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999))
parser.add_argument('--config', required=False, help='Configuration file')

required_args = parser.add_argument_group('required arguments')
required_args.add_argument('--input_epsilC_path', required=True,
help='Helical parameter <epsilC> input ser file path. Accepted formats: ser.')
required_args.add_argument('--input_epsilW_path', required=True,
help='Helical parameter <epsilW> input ser file path. Accepted formats: ser.')
required_args.add_argument('--input_zetaC_path', required=True,
help='Helical parameter <zetaC> input ser file path. Accepted formats: ser.')
required_args.add_argument('--input_zetaW_path', required=True,
help='Helical parameter <zetaW> input ser file path. Accepted formats: ser.')
required_args.add_argument('--output_csv_path', required=True,
help='Path to output csv file. Accepted formats: csv.')
required_args.add_argument('--output_jpg_path', required=True,
help='Path to output jpg file. Accepted formats: jpg.')
parser = argparse.ArgumentParser(
description="Calculate BI/BII populations.",
formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999),
)
parser.add_argument("--config", required=False, help="Configuration file")

required_args = parser.add_argument_group("required arguments")
required_args.add_argument(
"--input_epsilC_path",
required=True,
help="Helical parameter <epsilC> input ser file path. Accepted formats: ser.",
)
required_args.add_argument(
"--input_epsilW_path",
required=True,
help="Helical parameter <epsilW> input ser file path. Accepted formats: ser.",
)
required_args.add_argument(
"--input_zetaC_path",
required=True,
help="Helical parameter <zetaC> input ser file path. Accepted formats: ser.",
)
required_args.add_argument(
"--input_zetaW_path",
required=True,
help="Helical parameter <zetaW> input ser file path. Accepted formats: ser.",
)
required_args.add_argument(
"--output_csv_path",
required=True,
help="Path to output csv file. Accepted formats: csv.",
)
required_args.add_argument(
"--output_jpg_path",
required=True,
help="Path to output jpg file. Accepted formats: jpg.",
)

args = parser.parse_args()
args.config = args.config or "{}"
Expand All @@ -279,8 +285,9 @@ def main():
input_zetaW_path=args.input_zetaW_path,
output_csv_path=args.output_csv_path,
output_jpg_path=args.output_jpg_path,
properties=properties)
properties=properties,
)


if __name__ == '__main__':
if __name__ == "__main__":
main()
Loading

0 comments on commit ad1d66b

Please sign in to comment.