Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle fasta #266

Merged
merged 8 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions bin/fasta2peptides.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#!/usr/bin/env python3

import argparse
import logging
from collections import defaultdict
from time import time
from Bio import SeqIO

# Configure logging
logging.basicConfig(
format="%(asctime)s - %(levelname)s - %(message)s",
level=logging.INFO
)

def parse_fasta(fasta_file):
"""Parses a FASTA file and returns a dictionary {header: peptide_col_name}"""
fasta_map = {}
with open(fasta_file) as f:
for record in SeqIO.parse(f, "fasta"):
fasta_map[record.id] = str(record.seq)
return fasta_map

def generate_peptides(fasta_map, peptide_length):
"""Generates peptides of a specific length from the input sequences."""
start_time = time()
peptides_set = set()

for header, seq in fasta_map.items():
for i in range(len(seq) - peptide_length + 1):
peptides_set.add((seq[i:i + peptide_length], header))

logging.info(f"Generated {len(peptides_set):,.0f} peptides of length {peptide_length} in {time() - start_time:.2f} seconds")
return peptides_set

def group_peptides(peptides_set, peptide_col_name):
"""Collapses identical peptides from different proteins and aggregates headers."""
start_time = time()
peptides_dict = defaultdict(lambda: {"protein_ids": set(), "counts": 0})

for sequence, protein_id in peptides_set:
peptides_dict[sequence]["protein_ids"].add(protein_id)
peptides_dict[sequence]["counts"] += 1

peptides = [
{peptide_col_name: seq, "protein_ids": ";".join(data["protein_ids"]), "counts": data["counts"]}
for seq, data in peptides_dict.items()
]

logging.info(f"Grouped peptides in {time() - start_time:.2f} seconds")
return peptides

def write_output(peptides, output_file, peptide_col_name):
"""Writes the peptides data to a TSV file without using pandas."""
start_time = time()

with open(output_file, "w") as f:
# Write header
f.write(f"{peptide_col_name}\tprotein_ids\tcounts\n")
# Write data
for peptide in peptides:
f.write(f"{peptide[peptide_col_name]}\t{peptide['protein_ids']}\t{peptide['counts']}\n")

logging.info(f"Wrote {len(peptides):,.0f} peptides to {output_file} in {time() - start_time:.2f} seconds")

def main():
parser = argparse.ArgumentParser(description="Generate peptides of different lengths from a FASTA file and save as separate TSV files.")
parser.add_argument("-i", "--input", required=True, help="Input FASTA file")
parser.add_argument("-o", "--output_prefix", required=True, help="Output file prefix (each length will have its own file)")
parser.add_argument("-minl","--min_length", type=int, required=True, help="Minimum peptide length")
parser.add_argument("-maxl","--max_length", type=int, required=True, help="Maximum peptide length")
parser.add_argument("-pepcol","--peptide_col_name", type=str, required=True, help="Peptide column name")

args = parser.parse_args()

fasta_map = parse_fasta(args.input)

for k in range(args.min_length, args.max_length + 1):
peptides_set = generate_peptides(fasta_map, k)
peptides = group_peptides(peptides_set, args.peptide_col_name)
output_filename = f"{args.output_prefix}_length_{k}.tsv"
write_output(peptides, output_filename, args.peptide_col_name)

if __name__ == "__main__":
main()
68 changes: 0 additions & 68 deletions bin/gen_peptides.py

This file was deleted.

73 changes: 44 additions & 29 deletions bin/split_peptides.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,49 @@
#!/usr/bin/env python
# Written by Sabrina Krakau, Christopher Mohr and released under the MIT license (2022).

#!/usr/bin/env python3
# Written by Jonas Scheid the MIT license (2025).

import argparse
import math
from pathlib import Path

def split_peptides(input_file, output_base, min_size, max_chunks):
"""Splits the peptide input file into smaller chunks in a single pass."""
input_path = Path(input_file)
output_base = Path(output_base)

with input_path.open("r") as infile:
lines = infile.readlines() # Read all lines into memory

header, data_lines = lines[0], lines[1:] # Separate header
total_size = len(data_lines) # Number of peptides (excluding header)

if total_size == 0:
raise ValueError("Input file contains no peptides.")

# Determine number of chunks & chunk size
num_chunks = min(math.ceil(total_size / min_size), max_chunks)
chunk_size = max(min_size, math.ceil(total_size / num_chunks))

parser = argparse.ArgumentParser("Split peptides input file.")
parser.add_argument("-i", "--input", metavar="FILE", type=str, help="Input file containing peptides.")
parser.add_argument("-o", "--output_base", type=str, help="Base filename for output files.")
parser.add_argument(
"-s", "--min_size", metavar="N", type=int, help="Minimum number of peptides that should be written into one file."
)
parser.add_argument(
"-c", "--max_chunks", metavar="N", type=int, help="Maximum number of chunks that should be created."
)
args = parser.parse_args()

with open(args.input) as infile:
tot_size = sum([1 for _ in infile]) - 1

n = int(min(math.ceil(float(tot_size) / args.min_size), args.max_chunks))
h = int(max(args.min_size, math.ceil(float(tot_size) / n)))

with open(args.input) as infile:
header = next(infile)
for chunk in range(n):
with open(args.output_base + ".chunk_" + str(chunk) + ".tsv", "w") as outfile:
for chunk_idx in range(num_chunks):
chunk_file = output_base.with_name(f"{output_base.stem}_chunk_{chunk_idx}.tsv")
start = chunk_idx * chunk_size
end = start + chunk_size

with chunk_file.open("w") as outfile:
outfile.write(header)
for _ in range(h):
try:
outfile.write(next(infile))
except StopIteration:
break
outfile.writelines(data_lines[start:end])

if end >= total_size:
break # Stop if we've written all data

def main():
parser = argparse.ArgumentParser(description="Split a peptide file into smaller chunks.")
parser.add_argument("-i", "--input", required=True, help="Input file containing peptides.")
parser.add_argument("-o", "--output_base", required=True, help="Base filename for output files.")
parser.add_argument("--min_size", type=int, required=True, help="Minimum peptides per file.")
parser.add_argument("--max_chunks", type=int, required=True, help="Maximum number of chunks.")

args = parser.parse_args()
split_peptides(args.input, args.output_base, args.min_size, args.max_chunks)

if __name__ == "__main__":
main()
Loading
Loading