From 7d4007fd2c75d731641a01fdc9dbbb64e8919053 Mon Sep 17 00:00:00 2001 From: FlyPythons Date: Tue, 17 Apr 2018 21:00:00 +0800 Subject: [PATCH] reconstruct the code and update to decrease the memory use --- fastaSplit.py | 150 ++++++++++++++-------------------------------- fastqSplit.py | 161 +++++++++++++++++--------------------------------- 2 files changed, 98 insertions(+), 213 deletions(-) diff --git a/fastaSplit.py b/fastaSplit.py index c6397f1..7f32fa6 100644 --- a/fastaSplit.py +++ b/fastaSplit.py @@ -2,13 +2,9 @@ """ Split fasta files by accumulated sequence length or number Copyright@fanjunpeng (jpfan@whu.edu.cn) - -2018/1/12: init by fanjunpeng -2018/1/13: add support for more fasta files -2018/1/13: add support for more multiprocess """ -from __future__ import absolute_import +#from __future__ import absolute_import import logging import os @@ -23,128 +19,71 @@ LOG = logging.getLogger(__name__) -def write_record(records, out_filename): - """ - write fasta records to file +def split_fasta(filename, mode, number, output_dir="split", max_split=1000): """ - - # write seq records - fasta_out = open(out_filename, "w") - bed_out = open("%s.bed" % out_filename, 'w') - - for record in records: - fasta_out.write("%s\n" % str(record)) - bed_out.write("%s\t1\t%s\n" % (record.id, record.length)) - - fasta_out.close() - bed_out.close() - - return out_filename - - -def split_by_number(filename, number, output_dir="split", max_split=1000): - """ - split Fasta file by {number} records per file + split Fasta file by {number} length per file :param filename: Fasta filename - :param number: records per file + :param mode: length or number + :param number: record length per file :param output_dir: the output directory of sub fasta files :param max_split: the max number of sub files, avoid too much sub files :return: list of sub file names """ + assert isinstance(number, int) + assert mode in ["length", "number"] + r = [] head = os.path.splitext(os.path.basename(filename.rstrip(".gz")))[0] prefix = os.path.join(output_dir, head) - sub_num = 0 - fasta_records = [] - n = 0 + n = 1 + records = open_fasta(filename) - for record in open_fasta(filename): + while True: - if n == number: + if n >= max_split: + msg = "file %r cuts more than %s, break" % (filename, max_split) + raise Exception(msg) - if sub_num > max_split: - msg = "file %r cuts more than %s, break" % (filename, max_split) - raise Exception(msg) + out_filename = "%s%s.fasta" % (prefix, n) + out = open(out_filename, "w") + count = 0 - out_filename = "%s.%s.fasta" % (prefix, sub_num) - write_record(fasta_records, out_filename) - r.append(out_filename) - sub_num += 1 + while count < number: + record = records.next() - fasta_records = [] - n = 0 + if record: + out.write(str(record)+"\n") + if mode == "length": + count += record.length + else: + count += 1 + else: + break - fasta_records.append(record) - n += 1 - - # add the last part if has - if fasta_records: - out_filename = "%s.%s.fasta" % (prefix, sub_num) - write_record(fasta_records, out_filename) + out.close() r.append(out_filename) - return r - + if count < number: + break -def split_by_length(filename, number, output_dir="split", max_split=1000): - """ - split Fasta file by {number} length per file - :param filename: Fasta filename - :param number: record length per file - :param output_dir: the output directory of sub fasta files - :param max_split: the max number of sub files, avoid too much sub files - :return: list of sub file names - """ - r = [] - - head = os.path.splitext(os.path.basename(filename.rstrip(".gz")))[0] - prefix = os.path.join(output_dir, head) - - sub_num = 0 - fasta_records = [] - n = 0 - - for record in open_fasta(filename): - - if n > number: - - if sub_num > max_split: - msg = "file %r cuts more than %s, break" % (filename, max_split) - raise Exception(msg) - - out_filename = "%s.%s.fasta" % (prefix, sub_num) - write_record(fasta_records, out_filename) - r.append(out_filename) - sub_num += 1 - - fasta_records = [] - n = 0 - - fasta_records.append(record) - n += record.length - - # add the last part if has - if fasta_records: - out_filename = "%s.%s.fasta" % (prefix, sub_num) - write_record(fasta_records, out_filename) - r.append(out_filename) + n += 1 return r def fastaSplit(filenames, mode, num, output_dir, concurrent=1, max_split=1000): """ - split fasta files + split fasta files, use multiprocess for parallel :param filenames: a list of fasta files :param mode: length or number - :param num: + :param num: :param output_dir: output directory :param concurrent: see -h :param max_split: see -h - :return: + :return: """ assert mode in ["number", "length"] num = int(num) @@ -155,19 +94,18 @@ def fastaSplit(filenames, mode, num, output_dir, concurrent=1, max_split=1000): # avoid rerun if os.path.exists(done): - print("%r exists, pass this step; if you want to rerun, delete the file" % done) + LOG.info("%r exists, pass this step; if you want to rerun, delete the file" % done) return fofn2list(split_list) # for multiprocessing pool = Pool(processes=concurrent) results = [] - print("Split '{filenames}' by sequence {mode} =~ {num} per file".format(**locals())) - if mode == "number": - for f in filenames: - results.append(pool.apply_async(split_by_number, (f, num, output_dir, max_split))) - if mode == "length": - for f in filenames: - results.append(pool.apply_async(split_by_length, (f, num, output_dir, max_split))) + + LOG.info("Split '{filenames}' by sequence {mode} =~ {num} per file".format(**locals())) + + for f in filenames: + print("processing %s" % f) + results.append(pool.apply_async(split_fasta, (f, mode, num, output_dir, max_split))) pool.close() pool.join() @@ -193,10 +131,10 @@ def set_args(): """) args.add_argument("input", metavar="FASTAs", nargs="+", help="fasta files") - args.add_argument("-m", "--mode", choices=["number", "length"], default="length", help="split by number or length") - args.add_argument("-n", "--number", type=int, metavar="INT", help="the value of mode") + args.add_argument("-m", "--mode", choices=["number", "length"], required=True, help="split by number or length") + args.add_argument("-n", "--number", type=int, required=True, metavar="INT", help="the value of mode") args.add_argument("-o", "--output_dir", default="split", metavar="DIR", help="output directory") - args.add_argument("-ms", "--max_split", type=int, default=1000, metavar="INT", help="the max number of sub files") + args.add_argument("-ms", "--max_split", type=int, default=6666, metavar="INT", help="the max number of sub files") args.add_argument("-c", "--concurrent", type=int, default=1, metavar="INT", help="number of concurrent process") return args.parse_args() diff --git a/fastqSplit.py b/fastqSplit.py index f739424..ae855a5 100644 --- a/fastqSplit.py +++ b/fastqSplit.py @@ -2,13 +2,9 @@ """ Split fastq files by accumulated sequence length or number Copyright@fanjunpeng (jpfan@whu.edu.cn) - -2018/1/12: init by fanjunpeng -2018/1/13: add support for more fastq files -2018/1/13: add support for more multiprocess """ -from __future__ import absolute_import +#from __future__ import absolute_import import logging import os @@ -23,118 +19,74 @@ LOG = logging.getLogger(__name__) -def write_record(records, out_filename): - """ - write fastq records to file - """ - - # write seq records - fastq_out = open(out_filename, "w") - - for record in records: - fastq_out.write("%s\n" % str(record)) - - fastq_out.close() - - return out_filename - - -def split_by_number(filename, number, output_dir="split", max_split=1000): - """ - split Fastq file by {number} records per file - :param filename: Fastq filename - :param number: records per file - :param output_dir: the output directory of sub fastq files - :param max_split: the max number of sub files, avoid too much sub files - :return: list of sub file names - """ - - r = [] - - head = os.path.splitext(os.path.basename(filename.rstrip(".gz")))[0] - prefix = os.path.join(output_dir, head) - - sub_num = 0 - fastq_records = [] - n = 0 - - for record in open_fastq(filename): - - if n == number: - - if sub_num > max_split: - msg = "file %r cuts more than %s, break" % (filename, max_split) - raise Exception(msg) - - out_filename = "%s.%s.fastq" % (prefix, sub_num) - write_record(fastq_records, out_filename) - r.append(out_filename) - sub_num += 1 - - fastq_records = [] - n = 0 - - fastq_records.append(record) - n += 1 - - # add the last part if has - if fastq_records: - out_filename = "%s.%s.fastq" % (prefix, sub_num) - write_record(fastq_records, out_filename) - r.append(out_filename) - - return r - - -def split_by_length(filename, number, output_dir="split", max_split=1000): +def split_fastq(filename, mode, number, output_dir="split", max_split=1000): """ split Fastq file by {number} length per file :param filename: Fastq filename + :param mode: length or number :param number: record length per file :param output_dir: the output directory of sub fastq files :param max_split: the max number of sub files, avoid too much sub files :return: list of sub file names """ + + assert isinstance(number, int) + assert mode in ["length", "number"] + r = [] head = os.path.splitext(os.path.basename(filename.rstrip(".gz")))[0] prefix = os.path.join(output_dir, head) - sub_num = 0 - fastq_records = [] - n = 0 - - for record in open_fastq(filename): - - if n > number: - - if sub_num > max_split: - msg = "file %r cuts more than %s, break" % (filename, max_split) - raise Exception(msg) - - out_filename = "%s.%s.fastq" % (prefix, sub_num) - write_record(fastq_records, out_filename) - r.append(out_filename) - sub_num += 1 - - fastq_records = [] - n = 0 + if prefix.endswith("R1"): + prefix = prefix.rstrip("R1") + part = "R1." + elif prefix.endswith("R2"): + prefix = prefix.rstrip("R2") + part = "R2." + else: + LOG.info("filename don't match '*R[12].fastq'") + part = "" + + n = 1 + records = open_fastq(filename) + + while True: + + if n >= max_split: + msg = "file %r cuts more than %s, break" % (filename, max_split) + raise Exception(msg) + + out_filename = "%s%s.%sfastq" % (prefix, n, part) + out = open(out_filename, "w") + count = 0 + + while count < number: + record = records.next() + + if record: + out.write(str(record)+"\n") + if mode == "length": + count += record.length + else: + count += 1 + else: + break + + out.close() + r.append(out_filename) - fastq_records.append(record) - n += record.length + if count < number: + break - # add the last part if has - if fastq_records: - out_filename = "%s.%s.fastq" % (prefix, sub_num) - write_record(fastq_records, out_filename) - r.append(out_filename) + n += 1 return r def fastqSplit(filenames, mode, num, output_dir, concurrent=1, max_split=1000): """ - split fastq files + split fastq files, use multiprocess for parallel :param filenames: a list of fastq files :param mode: length or number :param num: @@ -152,23 +104,18 @@ def fastqSplit(filenames, mode, num, output_dir, concurrent=1, max_split=1000): # avoid rerun if os.path.exists(done): - print("%r exists, pass this step; if you want to rerun, delete the file" % done) + LOG.info("%r exists, pass this step; if you want to rerun, delete the file" % done) return fofn2list(split_list) # for multiprocessing pool = Pool(processes=concurrent) results = [] - print("Split '{filenames}' by sequence {mode} =~ {num} per file".format(**locals())) + LOG.info("Split '{filenames}' by sequence {mode} =~ {num} per file".format(**locals())) - if mode == "number": - for f in filenames: - print("processing %s" % f) - results.append(pool.apply_async(split_by_number, (f, num, output_dir, max_split))) - if mode == "length": - for f in filenames: - print("processing %s" % f) - results.append(pool.apply_async(split_by_length, (f, num, output_dir, max_split))) + for f in filenames: + print("processing %s" % f) + results.append(pool.apply_async(split_fastq, (f, mode, num, output_dir, max_split))) pool.close() pool.join() @@ -197,7 +144,7 @@ def set_args(): args.add_argument("-m", "--mode", choices=["number", "length"], required=True, help="split by number or length") args.add_argument("-n", "--number", type=int, required=True, metavar="INT", help="the value of mode") args.add_argument("-o", "--output_dir", default="split", metavar="DIR", help="output directory") - args.add_argument("-ms", "--max_split", type=int, default=1000, metavar="INT", help="the max number of sub files") + args.add_argument("-ms", "--max_split", type=int, default=6666, metavar="INT", help="the max number of sub files") args.add_argument("-c", "--concurrent", type=int, default=1, metavar="INT", help="number of concurrent process") return args.parse_args()