-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathCommandLineHandler.py
318 lines (290 loc) · 15.5 KB
/
CommandLineHandler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
#
# Class used to handle processing when the program is running in pure command line mode
# (i.e. no GUI interface).
#
import os
from datetime import datetime
import MasterMakerExceptions
from ConsoleSimplePrint import ConsoleSimplePrint
from Constants import Constants
from DataModel import DataModel
from FileCombiner import FileCombiner
from FileDescriptor import FileDescriptor
from RmFitsUtil import RmFitsUtil
from SessionController import SessionController
class CommandLineHandler:
def __init__(self, args, data_model: DataModel):
"""
Initialize this object
:param args: Text arguments given on the unix command line
:param data_model: Data model describing the program options (initialized with defaults)
"""
self._args = args
self._data_model: DataModel = data_model
def execute(self):
"""
Execute the program with the options specified on the command line, no GUI
"""
valid: bool
file_names: [str]
single_output_path: str
(valid, single_output_path, file_names) = self.validate_inputs()
if valid:
groups_output_directory = self._args.outputdirectory
if self.process_files(file_names, single_output_path, groups_output_directory):
print("Successful completion")
# Make sure the command-line inputs are valid. Fill in any give parameters into the existing
# data model (which is already set up with defaults).
# Check the following:
# - One or more input files, and all files exist
# - If a min-max clip value is specified, it is > 0
# - If a sigma threshold is specified, it is > 0
# - If -gt used, threshold is 0 to 100
# - If -mg used, group size is > 0
# Returns: validity flag, output path if specified, array of file names
def validate_inputs(self) -> (bool, [str]):
"""
Validate command-line arguments already stored in the object,
and consolidate them with preferences for any missing settings.
See the method source for an introductory comment listing all the validations that are done.
:return: Tuple, a validity boolean, the output path, and a list of input file paths
"""
valid = True
args = self._args
file_names = []
output_path = ""
# File names
if len(args.filenames) > 0:
for file_name in args.filenames:
if os.path.isfile(file_name):
# This file is OK, we're good here
pass
else:
print(f"File does not exist: {file_name}")
valid = False
file_names = args.filenames
else:
print("No file names given")
valid = False
# Master frame combination algorithm and parameters
if args.mean:
print(f" Setting MEAN combination")
self._data_model.set_master_combine_method(Constants.COMBINE_MEAN)
elif args.median:
print(f" Setting MEDIAN combination")
self._data_model.set_master_combine_method(Constants.COMBINE_MEDIAN)
elif args.minmax is not None:
self._data_model.set_master_combine_method(Constants.COMBINE_MINMAX)
if args.minmax >= 1:
print(f" Setting MIN-MAX combination, clipping {args.minmax} extremes")
self._data_model.set_min_max_number_clipped_per_end(args.minmax)
else:
print(f"Min-Max clipping argument must be > 0, not {args.minmax}")
valid = False
elif args.sigma is not None:
self._data_model.set_master_combine_method(Constants.COMBINE_SIGMA_CLIP)
if args.sigma > 0:
print(f" Setting SIGMA combination, z-threshold = {args.sigma}")
self._data_model.set_sigma_clip_threshold(args.sigma)
else:
print(f"Sigma clipping threshold must be > 0, not {args.sigma}")
valid = False
# Insist on same file type in all files?
if args.ignoretype:
print(f" Ignoring file types")
self._data_model.set_ignore_file_type(True)
# What to do with input files after a successful run
if args.moveinputs is not None:
self._data_model.set_input_file_disposition(Constants.INPUT_DISPOSITION_SUBFOLDER)
self._data_model.set_disposition_subfolder_name(args.moveinputs)
print(f" After processing move files to {args.moveinputs}")
# Where should output files go?
if args.output is not None:
print(f" Output path: {args.output}")
output_path = args.output
# Grouping gs gt <threshold> mg <minimum>
# - If -gt used, threshold is 0 to 100
# - If -mg used, group size is > 0
if args.groupsize:
print(" Group files by size")
self._data_model.set_group_by_size(True)
if args.grouptemperature is not None:
self._data_model.set_group_by_temperature(True)
bandwidth = float(args.grouptemperature)
if 0.1 <= bandwidth <= 50:
print(f" Group files by temperature with bandwidth {bandwidth}")
self._data_model.set_temperature_group_bandwidth(bandwidth)
else:
print("-gt bandwidth must be between 0.1 and 50")
valid = False
if args.minimumgroup is not None:
self._data_model.set_ignore_groups_fewer_than(True)
minimum_size = int(args.minimumgroup)
if minimum_size > 0:
print(f" Ignore groups smaller than {minimum_size}")
self._data_model.set_minimum_group_size(minimum_size)
else:
print(f" Minimum group size must be > 0, not {minimum_size}")
valid = False
# If any of the grouping options are in use, then the output directory is mandatory
if self._data_model.get_group_by_temperature() or self._data_model.get_group_by_size():
if args.outputdirectory is None:
print("If any of the group-by options are used, then the output directory option is mandatory")
valid = False
return valid, output_path, file_names
# The main processing method that combines the files using the selected algorithm
def process_files(self, file_names: [str],
output_path: str,
groups_output_directory: str) -> bool:
"""
Process all the files listed in the command line, with the given combination settings
:param file_names: List of file path names to be processed
:param output_path: Path where output is to be placed
:param groups_output_directory: Path for output directory if grouping option is used
:return: Success indicator
"""
success = True
file_descriptors = RmFitsUtil.make_file_descriptions(file_names)
# check types are all bias
if self._data_model.get_ignore_file_type() \
or FileCombiner.all_of_type(file_descriptors, FileDescriptor.FILE_TYPE_BIAS):
output_file_path = self.make_output_path(output_path, file_descriptors)
self.run_combination_session(file_descriptors, output_file_path, groups_output_directory)
else:
print("Files are not all Bias files. (Use -t option to suppress this check.)")
success = False
return success
def run_combination_session(self, descriptors: [FileDescriptor],
output_path: str,
output_directory: str):
"""
Create a console output object. This is passed in to the various math routines
to allow them to output progress. We use this indirect method of getting progress
so that it can go to the console window in this case, but the same worker code can send
progress lines to the standard system output when being run from the command line
:param descriptors: File descriptors of all input files to be processed
:param output_path: Path for single combined output file
:param output_directory: Path for output directory if grouping is in use
"""
console = ConsoleSimplePrint()
console.message("Starting session", 0)
# A "session controller" is necessary, but has an interesting effect only in the GUI version.
# In our command-line case we'll create it but its state will never change so it does nothing
dummy_session_controller = SessionController()
file_combiner = FileCombiner(dummy_session_controller, self.file_moved_callback)
# Do the file combination - select method depending on whether we are processing by groups
try:
# Are we using grouped processing?
if self._data_model.get_group_by_size() \
or self._data_model.get_group_by_temperature():
file_combiner.process_groups(self._data_model, descriptors,
output_directory,
console)
else:
# Not grouped, producing a single output file. Get output file location
file_combiner.original_non_grouped_processing(descriptors, self._data_model,
output_path,
console)
except FileNotFoundError as exception:
self.error_dialog("File not found", f"File \"{exception.filename}\" not found or not readable")
except MasterMakerExceptions.NoGroupOutputDirectory as exception:
self.error_dialog("Group Directory Missing",
f"The specified output directory \"{exception.get_directory_name()}\""
f" does not exist and could not be created.")
except MasterMakerExceptions.NotAllBiasFrames:
self.error_dialog("The selected files are not all Bias Frames",
"If you know the files are bias frames, they may not have proper FITS data "
"internally. Check the \"Ignore FITS file type\" box to proceed anyway.")
except MasterMakerExceptions.IncompatibleSizes:
self.error_dialog("The selected files can't be combined",
"To be combined into a master file, the files must have identical X and Y "
"dimensions, and identical Binning values.")
except PermissionError as exception:
self.error_dialog("Unable to write file",
f"The specified output file, "
f"\"{exception.filename}\","
f" cannot be written or replaced: \"permission error\"")
# Make output file name.
# If file name is specified on command line, use that.
# Otherwise make up a file name and a path that places it in the same
# location as the first input file.
def make_output_path(self,
output_path_parameter,
file_descriptors: [FileDescriptor]) -> str:
"""
Create a suitable output file name, fully-qualified
If file name is specified on command line, use that.
Otherwise make up a file name and a path that places it in the same
location as the first input file.
:param output_path_parameter: Given information about the output path
:param file_descriptors: Description of all the files being processed
:return: Created output path name
"""
if output_path_parameter == "":
return self.create_output_path(file_descriptors[0],
self._data_model.get_master_combine_method(),
self._data_model.get_sigma_clip_threshold(),
self._data_model.get_min_max_number_clipped_per_end())
else:
return output_path_parameter
# Create a file name for the output file
# of the form Bias-Mean-yyyymmddhhmm-temp-x-y-bin.fit
@classmethod
def create_output_path(cls, sample_input_file: FileDescriptor,
combine_method: int,
sigma_threshold: float,
min_max_clipped: int):
"""
Create an output file name in the case where one wasn't specified
:param sample_input_file: Input file to be used for data in output file name
:param combine_method: Code for the type of combination done
:param sigma_threshold: SIGMA parameter if sigma-clip method in use
:param min_max_clipped: Min-Max clip parameter if min-max-clip method in use
"""
# Get directory of sample input file
directory_prefix = os.path.dirname(sample_input_file.get_absolute_path())
file_name = cls.get_file_name_portion(combine_method, sample_input_file,
sigma_threshold, min_max_clipped)
file_path = f"{directory_prefix}/{file_name}"
return file_path
@classmethod
def get_file_name_portion(cls,
combine_method: int,
sample_input_file: FileDescriptor,
sigma_threshold: float,
min_max_clipped: int) -> str:
"""
Return the file name portion (no directory paths) of a generated file name for the given combine method
:param combine_method: Code for the type of combination being done
:param sample_input_file: Input file used as representative of output parameters
:param sigma_threshold: Threshold value if sigma-clip in use
:param min_max_clipped: Number of clips if min-max clip in use
:return: Generated file name
"""
now = datetime.now()
date_time_string = now.strftime("%Y%m%d-%H%M")
temperature = f"{sample_input_file.get_temperature():.1f}"
exposure = f"{sample_input_file.get_exposure():.3f}"
dimensions = f"{sample_input_file.get_x_dimension()}x{sample_input_file.get_y_dimension()}"
binning = f"{sample_input_file.get_binning()}x{sample_input_file.get_binning()}"
method = Constants.combine_method_string(combine_method)
if combine_method == Constants.COMBINE_SIGMA_CLIP:
method += str(sigma_threshold)
elif combine_method == Constants.COMBINE_MINMAX:
method += str(min_max_clipped)
file_name = f"BIAS-{method}-{date_time_string}-{exposure}s-{temperature}C-{dimensions}-{binning}.fit"
return file_name
def file_moved_callback(self, file_name_moved: str):
# print(f"file_moved_callback: {file_name_moved}")
pass
# We ignore the callback telling us a file was moved. No UI needs to be updated
#
# Error message from an exception. Put it on the console
#
def error_dialog(self, short_message: str, long_message: str):
"""
Put error message from a program exception on the console.
:param short_message: Brief form of message
:param long_message: More detail if available
"""
print("*** ERROR *** " + short_message + ":\n " + long_message)