Skip to content

Commit

Permalink
add pipeline status monitor (#372)
Browse files Browse the repository at this point in the history
* add pipeline status monitor

* and project entry_points
  • Loading branch information
nicHoch authored Dec 7, 2023
1 parent a801e54 commit f065afe
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 3 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,4 @@ stixcore/data/test/ephemeris/spice/kernels/mk/*.abs
stixcore/data/soop/
stixcore/data/publish/
stixcore/data/test/products/end2end/
monitor_status.json
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ install_requires =
[options.entry_points]
console_scripts =
stix-pipeline-status = stixcore.processing.pipeline_status:main
stix-pipeline-monitor = stixcore.processing.pipeline_monitor:main
stix-pipeline = stixcore.processing.pipeline:main
stix-pipeline-cli = stixcore.processing.pipeline_cli:main
stix-publish = stixcore.processing.publish:main
Expand Down
103 changes: 103 additions & 0 deletions stixcore/processing/pipeline_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import sys
import json
import smtplib
import argparse
import datetime
from pprint import pprint, pformat
from pathlib import Path

from dateutil import parser as dateparser

from stixcore.config.config import CONFIG
from stixcore.processing.pipeline_status import get_status
from stixcore.util.logging import get_logger

__all__ = ['pipeline_monitor']

logger = get_logger(__name__)


def pipeline_monitor(args):
"""Status logger and notification script for the pipeline.
SetUp via cron.
Query the number of open files still to process. Logs that number into a status file
and checks if the the number is constantly equal or increasing.
Sends an notification via mail if a possible pipeline stuck is detected.
"""
parser = argparse.ArgumentParser(description='stix pipeline monitor')
parser.add_argument("-p", "--port",
help="connection port for the status info server",
default=CONFIG.getint('Pipeline', 'status_server_port', fallback=12345),
type=int)

parser.add_argument("-s", "--save_file",
help="file to persist last status",
default="monitor_status.json", type=str)

args = parser.parse_args(args)

ret = get_status("next".encode(), args.port)
open_files = int(ret.replace("open files: ", ""))
save_file = Path(args.save_file)

status = {}
status['last'] = []

if save_file.exists():
with open(save_file, "+r") as f:
try:
status = json.load(f)
except Exception:
pass

status['last'].append({"date": datetime.datetime.now().isoformat(timespec='milliseconds'),
"open": open_files})

status['last'] = status['last'][-9:]

if len(status['last']) == 9 and open_files > 0:
stuck = True
last_open = status['last'][0]
for la in status['last'][1:]:
if la['open'] <= 0 or la['open'] < last_open['open']:
stuck = False
break
last_open = la
if stuck:
fd = dateparser.parse(status['last'][0]['date'])
ld = dateparser.parse(status['last'][-1]['date'])
if (ld - fd).days >= 1:
if CONFIG.getboolean('Publish', 'report_mail_send', fallback=False):
try:
sender = CONFIG.get('Pipeline', 'error_mail_sender', fallback='')
receivers = CONFIG.get('Publish', 'report_mail_receivers').split(",")
host = CONFIG.get('Pipeline', 'error_mail_smpt_host', fallback='localhost')
port = CONFIG.getint('Pipeline', 'error_mail_smpt_port', fallback=25)
smtp_server = smtplib.SMTP(host=host, port=port)
message = f"""Subject: StixCore Pipeline Monitor
Pipeline stuck?
{pformat(status)}
Login to server and check
"""

smtp_server.sendmail(sender, receivers, message)
except Exception as e:
logger.error(f"Error: unable to send monitor email: {e}")

with open(save_file, "w") as f:
json.dump(status, f, indent=4)

pprint(status)


def main():
pipeline_monitor(sys.argv[1:])


if __name__ == '__main__':
main()
6 changes: 3 additions & 3 deletions stixcore/processing/pipeline_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from stixcore.config.config import CONFIG
from stixcore.util.logging import get_logger

__all__ = ['pipeline_status']
__all__ = ['pipeline_status', 'get_status']

logger = get_logger(__name__)

Expand All @@ -28,7 +28,7 @@ def get_status(msg, port=12346):
line = server.readline()
if not line:
break
print(f"{line.decode().rstrip()}")
return f"{line.decode().rstrip()}"

finally:
sock.close()
Expand Down Expand Up @@ -69,7 +69,7 @@ def pipeline_status(args):

cmd = args.cmd.encode() if args.cmd else b'last'

get_status(cmd, args.port)
print(get_status(cmd, args.port))


def main():
Expand Down

0 comments on commit f065afe

Please sign in to comment.