-
Notifications
You must be signed in to change notification settings - Fork 0
/
scheduler.py
253 lines (206 loc) · 9.06 KB
/
scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
#!/usr/bin/env python
import logging
import subprocess
from logging.handlers import RotatingFileHandler
import click
import fire
import omegaconf
import termcolor
import tzlocal
from apscheduler.schedulers.blocking import BlockingScheduler
def load_config(config_file: str) -> omegaconf.dictconfig.DictConfig:
    """
    Read the pipeline configuration from disk.

    Args:
        config_file (str): The path to the configuration file.

    Returns:
        omegaconf.dictconfig.DictConfig: The object produced by ``omegaconf.OmegaConf.load``
        for ``config_file``. No validation is performed on it.
    """
    # TODO: Validate the configuration before handing it back to the caller.
    return omegaconf.OmegaConf.load(config_file)
def build_command(step: dict) -> str:
    """
    Build the shell command line that runs a single pipeline step.

    The result has the form ``python <script> <arg> <arg> ...``. Every argument is followed
    by one space, so the returned string always ends with a space.

    Args:
        step (DictConfig): The step to build the command for. ``script`` is required and
            ``arguments`` is optional; both are read by attribute access, so the object must
            support ``step.script`` / ``step.arguments`` as well as ``step.keys()``.
            ``arguments`` may be a sequence of values, a single string, or null.

    Returns:
        str: The command to be executed.
    """
    raw_arguments = step.arguments if "arguments" in step.keys() else None

    if raw_arguments is None:
        # Key absent, or present but explicitly null: the step takes no arguments.
        raw_arguments = []
    elif isinstance(raw_arguments, str):
        # A bare string would otherwise be iterated character by character.
        raw_arguments = [raw_arguments]

    pieces = []
    for arg in raw_arguments:
        logging.info(arg)
        # Format instead of concatenating so non-string values (e.g. numbers parsed from
        # the config file) do not raise TypeError.
        pieces.append(f"{arg} ")

    # NOTE(review): arguments are inserted verbatim, without shell quoting, so an entry
    # containing spaces is split by the shell. Kept as-is for backward compatibility.
    return f"python {step.script} " + "".join(pieces)
def setup_logger(
    log_file: str, log_level: str = "INFO", log_size: int = 2 * 1024 * 1024, log_backup_count: int = 3
) -> logging.Logger:
    """Set up the logger using a rotating file handler.

    The logger is obtained with ``logging.getLogger(log_file)``, i.e. it is named after the
    log file path. Calling this function again for the same file replaces the rotating
    handler installed by the previous call instead of stacking a second one, so records are
    never written twice.

    Args:
        log_file (str): The path to the log file.
        log_level (str): The log level name (case-insensitive, e.g. "INFO" or "debug").
            A numeric level is accepted as well.
        log_size (int): The maximum size of the log file in bytes.
        log_backup_count (int): The number of log files to keep.

    Returns:
        logging.Logger: The logger.

    Raises:
        ValueError: If ``log_level`` is not a known level name.
    """
    # Resolve the level before touching the file system so an invalid level does not leave
    # an open handler behind.
    if isinstance(log_level, int):
        level = log_level
    else:
        # getLevelName() maps a registered upper-case name to its number and returns a
        # "Level ..." string for anything it does not know.
        level = logging.getLevelName(str(log_level).strip().upper())
        if not isinstance(level, int):
            raise ValueError(f"Unknown log level: {log_level!r}")

    # NOTE(review): mode="w" is kept from the original call; confirm against the
    # RotatingFileHandler documentation how it interacts with a non-zero maxBytes.
    handler = RotatingFileHandler(log_file, mode="w", maxBytes=log_size, backupCount=log_backup_count, delay=False)
    handler.setLevel(level)

    # Define the log message format
    formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s", datefmt="%a, %d %b %Y %H:%M:%S")
    handler.setFormatter(formatter)

    logger = logging.getLogger(log_file)

    # getLogger() returns the same object for the same name, so drop (and close) any
    # rotating handler a previous call attached for this very file.
    for stale in list(logger.handlers):
        if isinstance(stale, RotatingFileHandler) and stale.baseFilename == handler.baseFilename:
            logger.removeHandler(stale)
            stale.close()

    logger.setLevel(level)
    logger.addHandler(handler)
    return logger
def scheduler(
    config_file: str = "conf/config.yaml",
    log_file: str = None,
    log_level: str = "INFO",
    log_size: int = 2 * 1024 * 1024,
    log_backup_count: int = 3,
    non_interactive: bool = False,
    keep_running: bool = False,
) -> None:
    """Schedule the pipeline to run either once or on a regular basis using cron.

    The pipeline is scheduled with a blocking scheduler only when ``non_interactive`` is
    True and the configuration contains a ``cron`` section; the entries of that section are
    passed as keyword arguments to the scheduler's cron job. In every other case the
    pipeline is run once, immediately.

    Args:
        config_file (str): The path to the configuration file.
        log_file (str): The path to the log file. When omitted, no logger is created and
            the pipeline reports to the terminal instead.
        log_level (str): The log level.
        log_size (int): The maximum size of the log file in bytes.
        log_backup_count (int): The number of log files to keep.
        non_interactive (bool): Run the pipeline in non-interactive mode.
        keep_running (bool): When a scheduled run fails, keep the scheduler alive so later
            runs still happen in accordance with the cron settings. When False, a failed
            run shuts the scheduler down.
    """
    # load the config file
    cfg = load_config(config_file)

    # Always bind `logger`: it is passed to linker() below even when no log file was
    # requested, and linker() treats None as "print to the terminal".
    logger = setup_logger(log_file, log_level, log_size, log_backup_count) if log_file else None

    if non_interactive and "cron" in cfg.keys():
        # Schedule the pipeline to run on a regular basis using cron.
        bsched = BlockingScheduler(timezone=str(tzlocal.get_localzone()))
        bsched.add_job(
            lambda: linker(cfg, logger, non_interactive, keep_running, bsched),
            "cron",
            **dict(cfg.cron),
        )
        # Blocks the calling thread until the scheduler is shut down.
        bsched.start()
    else:
        # Run the pipeline once.
        linker(cfg, logger, non_interactive)
def linker(
    cfg: omegaconf.dictconfig.DictConfig,
    logger: logging.Logger = None,
    non_interactive: bool = False,
    keep_running: bool = True,
    bsched: BlockingScheduler = None,
) -> int:
    """
    Process the configuration file and execute the pipeline steps in order.

    Each step is turned into a shell command with ``build_command`` and executed through
    ``subprocess`` with ``shell=True``. In interactive mode, if a step encounters an error,
    the user is given the option to retry it. Declining the retry prompts the user to either
    restart the entire pipeline or exit. In non-interactive mode, the pipeline exits as soon
    as a step encounters an error.

    Progress is written to ``logger`` when one is given (the step's output is captured and
    logged); otherwise it is echoed to the terminal and the step writes to the terminal
    directly. The interactive prompts always go to the terminal.

    Args:
        cfg (DictConfig): The configuration file. Its ``steps`` entry lists the steps to run.
        logger (Logger): The logger, or None to report to the terminal.
        non_interactive (bool): Run the pipeline in non-interactive mode.
        keep_running (bool): On failure in non-interactive mode, leave ``bsched`` running so
            new runs are still scheduled in accordance with the cron settings. When False,
            ``bsched`` is shut down on failure.
        bsched (BlockingScheduler): The scheduler to shut down on failure. Only consulted
            in non-interactive mode when ``keep_running`` is False.

    Returns:
        int: 0 if the pipeline completes successfully, 1 if the user exits the pipeline or a
        step fails in non-interactive mode. When the user restarts the whole pipeline, the
        result of that restarted run is returned.
    """
    output_header = "-" * 20 + " Output " + "-" * 20
    output_footer = "-" * 20 + "--------" + "-" * 20

    def report(message, color=None, error=False):
        """Send a status line to the logger if there is one, else to the terminal."""
        if logger:
            if error:
                logger.error(message)
            else:
                logger.info(message)
        elif color:
            click.echo(termcolor.colored(message, color))
        else:
            click.echo(message)

    # Iterate through the pipeline steps and execute them in order.
    for i, step in enumerate(cfg.steps):
        command = build_command(step)
        step_label = f"Step {i+1} ({step.script})"

        # Retry loop: left via `break` on success or via `return` on failure.
        while True:
            report(f"Processing Step {i+1}: {step.script}...", color="yellow")

            return_code = 0
            try:
                report(output_header)
                if logger:
                    # Capture stdout+stderr so the step's output ends up in the log.
                    output = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
                    # errors="replace": a step emitting non-UTF-8 bytes must not abort the run.
                    logger.info("\n" + output.decode("utf-8", errors="replace").rstrip())
                else:
                    # No capture: the step writes straight to the terminal.
                    subprocess.check_call(command, stderr=subprocess.STDOUT, shell=True)
                report(output_footer)
            except subprocess.CalledProcessError as e:
                if logger:
                    logger.error((e.output or b"").decode("utf-8", errors="replace").rstrip())
                report(output_footer)
                report(f"{step_label} failed with error: {e}.", color="red", error=True)
                return_code = e.returncode

            if return_code == 0:
                report(f"{step_label} completed successfully.", color="green")
                break

            if non_interactive:
                # Nobody to ask: abort this run, and stop the scheduler unless asked to keep it.
                if not keep_running and bsched is not None:
                    bsched.shutdown(wait=False)
                return 1

            # Interactive failure handling: retry the step, restart everything, or exit.
            response = click.prompt(termcolor.colored("Do you want to rerun this step? (y/n)", "blue"))
            if response.lower() == "y":
                click.echo("Restarting step...")
                continue

            response = click.prompt(
                termcolor.colored(
                    "Do you want to rerun the whole pipeline from the beginning? (y/n)",
                    "blue",
                )
            )
            if response.lower() == "y":
                click.echo(termcolor.colored("Restarting pipeline...", "yellow"))
                # Propagate the outcome of the restarted run instead of always reporting success.
                return linker(cfg, logger, non_interactive, keep_running, bsched)

            click.echo(termcolor.colored("Exiting...", "red"))
            return 1

    return 0
if __name__ == "__main__":
    # Script entry point: hand `scheduler` to python-fire when this file is run directly.
    fire.Fire(scheduler)