compress_files.py
#!/usr/bin/env python3
"""
This script searches for uncompressed/human readable sequence data files in a given directory,
compresses them using pigz or converts SAM files to BAM format using samtools, and logs the
compression details.
"""
import os
import subprocess
import multiprocessing
import argparse
from typing import Tuple, Set, List
import time
from filelock import FileLock


def parse_command_line_args() -> Tuple[str, str, bool]:
    """
    Parse command line arguments.

    Returns:
        Tuple: A tuple containing the search directory path, the logging file
        path, and the dry-run flag.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--search_dir",
        "-d",
        type=str,
        required=True,
        help="Directory to search for uncompressed/human readable sequence data.",
    )
    parser.add_argument(
        "--logging_file",
        "-l",
        type=str,
        required=False,
        default="compression.log",
        help="Path to the file where compression details are logged.",
    )
    parser.add_argument(
        "--dry-run",
        "-t",
        action="store_true",
        help="Log all matching files without actually compressing or converting them.",
    )
    args = parser.parse_args()
    return args.search_dir, args.logging_file, args.dry_run


def find_files(directory: str, extensions: Tuple[str, ...]) -> List[str]:
    """
    Find files with specified extensions in a given directory and its subdirectories.

    Args:
        directory (str): The directory to search for files.
        extensions (Tuple[str, ...]): A tuple of file extensions to search for.

    Returns:
        List[str]: A list of file paths that match the specified extensions.
    """
    file_list: List[str] = []
    for root, _, files in os.walk(directory):
        for file in files:
            # Skip hidden helper files such as macOS "._*" metadata.
            if file.endswith(extensions) and not file.startswith("._"):
                file_list.append(os.path.join(root, file))
    return file_list
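# For example, with an illustrative directory /data/run1 containing FASTQ reads,
# find_files("/data/run1", (".fastq", ".fq")) would return paths such as
# "/data/run1/sample_R1.fastq".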


def process_file(
    file: str,
    preprocessed_files: Set[str],
    extensions1: Tuple[str, ...],
    extensions2: Tuple[str, ...],
    extensions3: Tuple[str, ...],
    log_path: str,
    dry_run: bool,
) -> None:
    """
    Compress or convert a file and log the details.

    Args:
        file (str): The path to the file to process.
        preprocessed_files (Set[str]): A set of files that have already been processed.
        extensions1 (Tuple[str, ...]): Extensions compressed with pigz (FASTA files).
        extensions2 (Tuple[str, ...]): Extensions compressed with pigz (FASTQ files).
        extensions3 (Tuple[str, ...]): Extensions converted to BAM with samtools (SAM files).
        log_path (str): The path to the logging file.
        dry_run (bool): If True, log the file without compressing or converting it.
    """
    # Skip files that already appear in an earlier log.
    if file in preprocessed_files:
        return
    t1 = time.time()
    orig_size = os.stat(file).st_size / (1024 * 1024)  # size in MB
    # Parenthesise the FASTA/FASTQ test so a dry run never triggers compression.
    if (file.endswith(extensions1) or file.endswith(extensions2)) and not dry_run:
        subprocess.run(["pigz", file], check=True)
    elif file.endswith(extensions3) and not dry_run:
        subprocess.run(
            ["samtools", "view", "-b", "-o", f"{file}.bam", file], check=True
        )
    time_elapsed = time.time() - t1
    if not dry_run:
        # pigz replaces the input with "<file>.gz"; samtools writes "<file>.bam".
        output = f"{file}.bam" if file.endswith(extensions3) else f"{file}.gz"
        fin_size = os.stat(output).st_size / (1024 * 1024)
    else:
        fin_size = None
    # Serialise log writes across worker processes with a file lock.
    lock_path = f"{log_path}.lock"
    lock = FileLock(lock_path, timeout=60)
    lock.acquire()
    try:
        with open(log_path, "a", encoding="utf-8") as log:
            log.write(f"{file}\t{time_elapsed}\t{orig_size}\t{fin_size}\n")
    finally:
        lock.release()
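# Each call above appends one tab-separated line to the log; with illustrative
# values it looks like:
#   /data/run1/sample_R1.fastq    84.2    1530.7    412.9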


def main() -> None:
    """
    Main function to execute the file processing and compression tasks.
    """
    # Set the directory to start the search
    base_directory, log_path, whether_dry = parse_command_line_args()
    # Define file extensions
    fasta_extensions = (".fasta", ".fa")
    fastq_extensions = (".fastq", ".fq")
    sam_extensions = (".sam",)
    # Check for an existing log and collect the files it already records
    processed_files = set()
    if os.path.exists(log_path):
        with open(log_path, "r", encoding="utf-8") as log_file:
            next(log_file, None)  # skip the header row
            for line in log_file:
                # The file path is the first tab-separated column.
                processed_files.add(line.split("\t", 1)[0])
    else:
        with open(log_path, "w", encoding="utf-8") as log:
            log.write("File\tTime (s)\tOriginal Size (MB)\tCompressed Size (MB)\n")
    # Get lists of files
    fasta_files = find_files(base_directory, fasta_extensions)
    fastq_files = find_files(base_directory, fastq_extensions)
    sam_files = find_files(base_directory, sam_extensions)
    # Create a pool of workers for parallel processing
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    # Process files in parallel
    pool.starmap(
        process_file,
        [
            (
                file,
                processed_files,
                fasta_extensions,
                fastq_extensions,
                sam_extensions,
                log_path,
                whether_dry,
            )
            for file in fasta_files + fastq_files + sam_files
        ],
    )
    # Close the pool
    pool.close()
    pool.join()


if __name__ == "__main__":
    main()