-
Notifications
You must be signed in to change notification settings - Fork 24
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #266 from jonasscheid/handle-fasta
Handle fasta
- Loading branch information
Showing
31 changed files
with
696 additions
and
479 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
#!/usr/bin/env python3 | ||
|
||
import argparse | ||
import logging | ||
from collections import defaultdict | ||
from time import time | ||
from Bio import SeqIO | ||
|
||
# Configure logging | ||
logging.basicConfig( | ||
format="%(asctime)s - %(levelname)s - %(message)s", | ||
level=logging.INFO | ||
) | ||
|
||
def parse_fasta(fasta_file): | ||
"""Parses a FASTA file and returns a dictionary {header: peptide_col_name}""" | ||
fasta_map = {} | ||
with open(fasta_file) as f: | ||
for record in SeqIO.parse(f, "fasta"): | ||
fasta_map[record.id] = str(record.seq) | ||
return fasta_map | ||
|
||
def generate_peptides(fasta_map, peptide_length): | ||
"""Generates peptides of a specific length from the input sequences.""" | ||
start_time = time() | ||
peptides_set = set() | ||
|
||
for header, seq in fasta_map.items(): | ||
for i in range(len(seq) - peptide_length + 1): | ||
peptides_set.add((seq[i:i + peptide_length], header)) | ||
|
||
logging.info(f"Generated {len(peptides_set):,.0f} peptides of length {peptide_length} in {time() - start_time:.2f} seconds") | ||
return peptides_set | ||
|
||
def group_peptides(peptides_set, peptide_col_name): | ||
"""Collapses identical peptides from different proteins and aggregates headers.""" | ||
start_time = time() | ||
peptides_dict = defaultdict(lambda: {"protein_ids": set(), "counts": 0}) | ||
|
||
for sequence, protein_id in peptides_set: | ||
peptides_dict[sequence]["protein_ids"].add(protein_id) | ||
peptides_dict[sequence]["counts"] += 1 | ||
|
||
peptides = [ | ||
{peptide_col_name: seq, "protein_ids": ";".join(data["protein_ids"]), "counts": data["counts"]} | ||
for seq, data in peptides_dict.items() | ||
] | ||
|
||
logging.info(f"Grouped peptides in {time() - start_time:.2f} seconds") | ||
return peptides | ||
|
||
def write_output(peptides, output_file, peptide_col_name): | ||
"""Writes the peptides data to a TSV file without using pandas.""" | ||
start_time = time() | ||
|
||
with open(output_file, "w") as f: | ||
# Write header | ||
f.write(f"{peptide_col_name}\tprotein_ids\tcounts\n") | ||
# Write data | ||
for peptide in peptides: | ||
f.write(f"{peptide[peptide_col_name]}\t{peptide['protein_ids']}\t{peptide['counts']}\n") | ||
|
||
logging.info(f"Wrote {len(peptides):,.0f} peptides to {output_file} in {time() - start_time:.2f} seconds") | ||
|
||
def main(): | ||
parser = argparse.ArgumentParser(description="Generate peptides of different lengths from a FASTA file and save as separate TSV files.") | ||
parser.add_argument("-i", "--input", required=True, help="Input FASTA file") | ||
parser.add_argument("-o", "--output_prefix", required=True, help="Output file prefix (each length will have its own file)") | ||
parser.add_argument("-minl","--min_length", type=int, required=True, help="Minimum peptide length") | ||
parser.add_argument("-maxl","--max_length", type=int, required=True, help="Maximum peptide length") | ||
parser.add_argument("-pepcol","--peptide_col_name", type=str, required=True, help="Peptide column name") | ||
|
||
args = parser.parse_args() | ||
|
||
fasta_map = parse_fasta(args.input) | ||
|
||
for k in range(args.min_length, args.max_length + 1): | ||
peptides_set = generate_peptides(fasta_map, k) | ||
peptides = group_peptides(peptides_set, args.peptide_col_name) | ||
output_filename = f"{args.output_prefix}_length_{k}.tsv" | ||
write_output(peptides, output_filename, args.peptide_col_name) | ||
|
||
if __name__ == "__main__": | ||
main() |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,34 +1,49 @@ | ||
#!/usr/bin/env python | ||
# Written by Sabrina Krakau, Christopher Mohr and released under the MIT license (2022). | ||
|
||
#!/usr/bin/env python3 | ||
# Written by Jonas Scheid the MIT license (2025). | ||
|
||
import argparse | ||
import math | ||
from pathlib import Path | ||
|
||
def split_peptides(input_file, output_base, min_size, max_chunks): | ||
"""Splits the peptide input file into smaller chunks in a single pass.""" | ||
input_path = Path(input_file) | ||
output_base = Path(output_base) | ||
|
||
with input_path.open("r") as infile: | ||
lines = infile.readlines() # Read all lines into memory | ||
|
||
header, data_lines = lines[0], lines[1:] # Separate header | ||
total_size = len(data_lines) # Number of peptides (excluding header) | ||
|
||
if total_size == 0: | ||
raise ValueError("Input file contains no peptides.") | ||
|
||
# Determine number of chunks & chunk size | ||
num_chunks = min(math.ceil(total_size / min_size), max_chunks) | ||
chunk_size = max(min_size, math.ceil(total_size / num_chunks)) | ||
|
||
parser = argparse.ArgumentParser("Split peptides input file.") | ||
parser.add_argument("-i", "--input", metavar="FILE", type=str, help="Input file containing peptides.") | ||
parser.add_argument("-o", "--output_base", type=str, help="Base filename for output files.") | ||
parser.add_argument( | ||
"-s", "--min_size", metavar="N", type=int, help="Minimum number of peptides that should be written into one file." | ||
) | ||
parser.add_argument( | ||
"-c", "--max_chunks", metavar="N", type=int, help="Maximum number of chunks that should be created." | ||
) | ||
args = parser.parse_args() | ||
|
||
with open(args.input) as infile: | ||
tot_size = sum([1 for _ in infile]) - 1 | ||
|
||
n = int(min(math.ceil(float(tot_size) / args.min_size), args.max_chunks)) | ||
h = int(max(args.min_size, math.ceil(float(tot_size) / n))) | ||
|
||
with open(args.input) as infile: | ||
header = next(infile) | ||
for chunk in range(n): | ||
with open(args.output_base + ".chunk_" + str(chunk) + ".tsv", "w") as outfile: | ||
for chunk_idx in range(num_chunks): | ||
chunk_file = output_base.with_name(f"{output_base.stem}_chunk_{chunk_idx}.tsv") | ||
start = chunk_idx * chunk_size | ||
end = start + chunk_size | ||
|
||
with chunk_file.open("w") as outfile: | ||
outfile.write(header) | ||
for _ in range(h): | ||
try: | ||
outfile.write(next(infile)) | ||
except StopIteration: | ||
break | ||
outfile.writelines(data_lines[start:end]) | ||
|
||
if end >= total_size: | ||
break # Stop if we've written all data | ||
|
||
def main(): | ||
parser = argparse.ArgumentParser(description="Split a peptide file into smaller chunks.") | ||
parser.add_argument("-i", "--input", required=True, help="Input file containing peptides.") | ||
parser.add_argument("-o", "--output_base", required=True, help="Base filename for output files.") | ||
parser.add_argument("--min_size", type=int, required=True, help="Minimum peptides per file.") | ||
parser.add_argument("--max_chunks", type=int, required=True, help="Maximum number of chunks.") | ||
|
||
args = parser.parse_args() | ||
split_peptides(args.input, args.output_base, args.min_size, args.max_chunks) | ||
|
||
if __name__ == "__main__": | ||
main() |
Oops, something went wrong.