diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml
new file mode 100644
index 0000000..1d7274a
--- /dev/null
+++ b/.github/workflows/run-tests.yml
@@ -0,0 +1,55 @@
+name: Test
+
+on:
+  push:
+    branches: [master, devel]
+  pull_request:
+    branches: [master, devel]
+
+jobs:
+
+  test:
+    runs-on: ubuntu-latest
+
+    services:
+      mysql:
+        image: "mysql:8.0"
+        ports:
+          - "3306:3306"
+        options: >-
+          --health-cmd "mysqladmin ping"
+          --health-interval 10s
+          --health-timeout 5s
+          --health-retries 10
+        env:
+          MYSQL_RANDOM_ROOT_PASSWORD: yes
+          MYSQL_TCP_PORT: 3306
+          MYSQL_USER: "test"
+          MYSQL_PASSWORD: "test"
+          MYSQL_DATABASE: "study_notify"
+
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Install Poetry
+        run: |
+          pipx install poetry
+
+      - uses: actions/setup-python@v5
+        with:
+          python-version: '3.11'
+          architecture: 'x64'
+
+      - name: Run poetry install
+        run: |
+          poetry env use '3.11'
+          poetry install
+
+      - name: Run pytest
+        run: |
+          poetry run pytest
+
+      - name: Run linter (ruff)
+        run: |
+          poetry run ruff check --output-format=github .
+
diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..18966b1
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,12 @@
+# Change Log for npg_notifications Project
+
+The format is based on [Keep a Changelog](http://keepachangelog.com/).
+This project adheres to [Semantic Versioning](http://semver.org/).
+
+## [Unreleased]
+
+## [0.1.0] - 2024-07-24
+
+### Added
+
+- Initial project scaffold, code and tests
diff --git a/README.md b/README.md
index c9c0c6b..fafd1b7 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,43 @@
-# npg_notifications
\ No newline at end of file
+# npg_notifications
+
+A utility for notifying customers about lifecycle events in the analysis
+and QC of sequencing data.
+
+The [porch](https://github.com/wtsi-npg/npg_porch) service is used to queue
+and process (send) notifications. The notification producer can send the
+same notification to `porch` repeatedly; `porch` guarantees that duplicate
+notifications are neither kept nor processed.
+
+The consumer of the notifications is responsible for sending the message
+to the customer. For a fully automated system, the consumer should implement
+a protocol for dealing with failed attempts to notify the customer.
+
+If different types of notification (for example, an e-mail and an MQ message)
+have to be sent for the same event, it is advisable either to use a separate
+`porch` pipeline for each type of notification or to include additional
+information about the notification protocol and format in the payload that
+is sent to `porch`.
+
+## Scope
+
+The current version implements notifications for customers of the PacBio
+sequencing platform.
+
+## Running the scripts
+
+To register recently QC-ed entities as tasks with `porch`:
+
+```bash
+npg_qc_state_notification register --conf_file_path path/to/qc_state_app_config.ini
+```
+
+To process one `porch` task:
+
+```bash
+npg_qc_state_notification process --conf_file_path path/to/qc_state_app_config.ini
+```
+
+Processing includes claiming one task, sending per-study emails and updating
+the status of the `porch` task to `DONE`.
+
+The test data directory has an example of a
+[configuration file](tests/data/qc_state_app_config.ini).
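+
+As a sketch, the configuration file is in the INI format; the section and
+key names below follow the example file, but all values shown here are
+placeholders:
+
+```ini
+[PORCH]
+api_url = https://porch.example.com
+pipeline_name = qc_notifications
+pipeline_uri = https://github.com/wtsi/npg_notifications.git
+pipeline_version = 0.1
+npg_porch_token = <your porch token>
+
+[MAIL]
+domain = example.com
+```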
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..9a48d84
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,45 @@
+[tool.poetry]
+name = "npg_notify"
+version = "0.0.1"
+description = "Utility for client notifications"
+authors = ["Marina Gourtovaia"]
+license = "GPL-3.0-or-later"
+readme = "README.md"
+
+[tool.poetry.scripts]
+npg_qc_state_notification = "npg_notify.porch_wrapper.qc_state:run"
+
+[tool.poetry.dependencies]
+python = "^3.11"
+SQLAlchemy = { version="^2.0.1", extras=["pymysql"] }
+SQLAlchemy-Utils = "^0.41.2"
+cryptography = "^41.0.3"
+PyYAML = "^6.0.0"
+npg_porch_cli = { git="https://github.com/wtsi-npg/npg_porch_cli.git", tag="0.1.0" }
+
+[tool.poetry.dev-dependencies]
+pytest = "^8.2.2"
+requests-mock = "^1.12.1"
+ruff = "^0.4.9"
+
+[build-system]
+requires = ["poetry-core>=1.0.0"]
+build-backend = "poetry.core.masonry.api"
+
+[tool.ruff]
+# Set the maximum line length to 79.
+line-length = 79
+
+[tool.ruff.lint]
+select = [
+    # pycodestyle warnings
+    "W",
+]
+
+[tool.pytest.ini_options]
+addopts = [
+    "--import-mode=importlib",
+]
+pythonpath = [
+    "src"
+]
diff --git a/src/npg_notify/__init__.py b/src/npg_notify/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/npg_notify/config.py b/src/npg_notify/config.py
new file mode 100644
index 0000000..b079e30
--- /dev/null
+++ b/src/npg_notify/config.py
@@ -0,0 +1,75 @@
+"""Common utility functions for the package."""
+
+import configparser
+import json
+import pathlib
+
+DEFAULT_CONF_FILE_TYPE = "ini"
+
+
+def get_config_data(
+    conf_file_path: str, conf_file_section: str | None = None
+):
+    """Parses a configuration file and returns its content.
+
+    Two types of configuration file are supported, 'ini' and 'json'. The
+    type of the file is determined from the extension of the file name. If
+    there is no extension, the 'ini' type is assumed.
+
+    The content of the file is not cached, so subsequent calls to get data
+    from the same configuration file result in re-reading and re-parsing of
+    the file.
+
+    Args:
+
+        conf_file_path:
+            A configuration file with database connection details.
+        conf_file_section:
+            The section of the configuration file. Optional. Should be
+            defined for 'ini' files.
+
+    Returns:
+        For an 'ini' file, returns the content of the given section of the
+        file as a dictionary.
+        For a 'json' file, if the conf_file_section argument is not defined,
+        the content of the file is returned as a Python object. If the
+        conf_file_section argument is defined, the object returned by the
+        parser is assumed to be a dictionary that has the value of the
+        'conf_file_section' argument as a key. The value corresponding to
+        this key is returned.
+    """
+
+    conf_file_extension = pathlib.Path(conf_file_path).suffix
+    if conf_file_extension:
+        conf_file_extension = conf_file_extension[1:]
+    else:
+        conf_file_extension = DEFAULT_CONF_FILE_TYPE
+
+    if conf_file_extension == DEFAULT_CONF_FILE_TYPE:
+        if not conf_file_section:
+            raise Exception(
+                "'conf_file_section' argument is not given, "
+                f"it should be defined for '{DEFAULT_CONF_FILE_TYPE}' "
+                "configuration file."
+            )
+
+        config = configparser.ConfigParser()
+        config.read(conf_file_path)
+
+        return {i[0]: i[1] for i in config[conf_file_section].items()}
+
+    elif conf_file_extension == "json":
+        # json.load() expects a file object, not a path.
+        with open(conf_file_path) as conf_file:
+            conf: dict = json.load(conf_file)
+        if conf_file_section:
+            if not isinstance(conf, dict):
+                raise Exception(f"{conf_file_path} does not have sections.")
+            if conf_file_section in conf:
+                conf = conf[conf_file_section]
+            else:
+                raise Exception(
+                    f"{conf_file_path} does not contain {conf_file_section} key"
+                )
+
+        return conf
+
+    else:
+        raise Exception(
+            f"Parsing for '{conf_file_extension}' files is not implemented"
+        )
diff --git a/src/npg_notify/db/__init__.py b/src/npg_notify/db/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/npg_notify/db/mlwh.py b/src/npg_notify/db/mlwh.py
new file mode 100644
index 0000000..603c800
--- /dev/null
+++ b/src/npg_notify/db/mlwh.py
@@ -0,0 +1,148 @@
+# Copyright (C) 2024 Genome Research Ltd.
+#
+# Authors:
+#   Marina Gourtovaia
+#   Kieron Taylor
+#
+# This file is part of the npg_notifications software package.
+#
+# npg_notifications is free software: you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the Free
+# Software Foundation; either version 3 of the License, or any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
+# details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Declarative ORM for some tables of the multi-lims warehouse (mlwh) database.
+For simplicity, only columns used by this package are represented.
+
+Available ORM classes: Study, StudyUser.
+
+Utility methods: get_study_contacts.
+"""
+
+from sqlalchemy import ForeignKey, Integer, String, UniqueConstraint, select
+from sqlalchemy.exc import NoResultFound
+from sqlalchemy.orm import (
+    DeclarativeBase,
+    Mapped,
+    Session,
+    mapped_column,
+    relationship,
+)
+
+# Study contacts with these roles will receive notifications.
+ROLES = ["manager", "follower", "owner"]
+
+
+class Base(DeclarativeBase):
+    pass
+
+
+class Study(Base):
+    "A representation for the 'study' table."
+
+    __tablename__ = "study"
+
+    id_study_tmp = mapped_column(Integer, primary_key=True, autoincrement=True)
+    id_lims = mapped_column(String(10), nullable=False)
+    id_study_lims = mapped_column(String(20), nullable=False)
+    name = mapped_column(String(255))
+
+    __table_args__ = (
+        UniqueConstraint(
+            "id_lims",
+            "id_study_lims",
+            name="study_id_lims_id_study_lims_index",
+        ),
+    )
+
+    study_users: Mapped[set["StudyUser"]] = relationship()
+
+    def __repr__(self):
+        return f"Study {self.id_study_lims}, {self.name}"
+
+    def contacts(self) -> list[str]:
+        """Retrieves emails of contacts for this study object.
+
+        Returns:
+            A sorted list of unique emails for managers, followers or owners
+            of the study.
+        """
+
+        # To eliminate repetition, the comprehension below builds a set,
+        # which is then sorted to return a sorted list.
+        return sorted(
+            {
+                u.email
+                for u in self.study_users
+                if (u.email is not None and u.role is not None)
+                and (u.role in ROLES)
+            }
+        )
+
+
+class StudyUser(Base):
+    "A representation for the 'study_users' table."
+
+    __tablename__ = "study_users"
+
+    id_study_users_tmp = mapped_column(
+        Integer, primary_key=True, autoincrement=True
+    )
+    id_study_tmp = mapped_column(
+        Integer, ForeignKey("study.id_study_tmp"), nullable=False, index=True
+    )
+    role = mapped_column(String(255), nullable=True)
+    email = mapped_column(String(255), nullable=True)
+
+    study: Mapped["Study"] = relationship(back_populates="study_users")
+
+    def __repr__(self):
+        role = self.role if self.role else "None"
+        email = self.email if self.email else "None"
+        return f"StudyUser role={role}, email={email}"
+
+
+class StudyNotFoundError(Exception):
+    "An error to use when a study does not exist in mlwh."
+
+    pass
+
+
+def get_study_contacts(session: Session, id: str) -> list[str]:
+    """Retrieves emails of study contacts from the mlwh database.
+
+    Args:
+        session:
+            sqlalchemy.orm.Session object
+        id:
+            String study ID.
+
+    Returns:
+        A sorted list of unique emails for managers, followers or owners of
+        the study.
+
+    Example:
+
+        from npg_notify.db.mlwh import get_study_contacts
+        contact_emails = get_study_contacts(session=session, id="5901")
+    """
+    try:
+        contacts = (
+            session.execute(select(Study).where(Study.id_study_lims == id))
+            .scalar_one()
+            .contacts()
+        )
+    except NoResultFound:
+        raise StudyNotFoundError(
+            f"Study with ID {id} is not found in ml warehouse"
+        )
+
+    return contacts
diff --git a/src/npg_notify/db/utils.py b/src/npg_notify/db/utils.py
new file mode 100644
index 0000000..a87d8d8
--- /dev/null
+++ b/src/npg_notify/db/utils.py
@@ -0,0 +1,177 @@
+import importlib
+import os
+import pathlib
+import re
+from collections.abc import Iterator
+from contextlib import contextmanager
+
+import yaml
+from sqlalchemy import create_engine, insert, text
+from sqlalchemy.engine import Engine
+from sqlalchemy.orm import DeclarativeBase, Session
+from sqlalchemy_utils import create_database, database_exists, drop_database
+
+from npg_notify.config import get_config_data
+
+
+def get_db_connection_string(
+    conf_file_path: str, conf_file_section: str | None = None
+):
+    """Parses a configuration file, generates a database connection string.
+
+    Args:
+        conf_file_path:
+            A configuration file with database connection details.
+        conf_file_section:
+            The section of the configuration file. Required for the INI
+            configuration file format, otherwise optional.
+
+    Returns:
+        A MySQL connection string suitable for SQLAlchemy.
+    """
+
+    config = get_config_data(
+        conf_file_path=conf_file_path, conf_file_section=conf_file_section
+    )
+    if "dbschema" not in config:
+        raise Exception("Database schema name should be defined as dbschema")
+
+    user_creds = config["dbuser"]
+    # Not having a password is not that unusual for read-only access.
+    if "dbpassword" in config:
+        user_creds += ":" + config["dbpassword"]
+
+    return (
+        f"mysql+pymysql://{user_creds}@"
+        f"{config['dbhost']}:{config['dbport']}/{config['dbschema']}?charset=utf8mb4"
+    )
+
+
+@contextmanager
+def get_connection(
+    conf_file_path: str, conf_file_section: str | None = None
+) -> Iterator[Session]:
+    """Connects to a MySQL database and yields a database session.
+
+    Using database credentials specified in the configuration file,
+    establishes a connection to a MySQL database and yields a
+    sqlalchemy.orm.Session object.
+
+    Args:
+        conf_file_path:
+            A configuration file with database connection details.
+        conf_file_section:
+            The section of the configuration file. Optional.
+
+    Yields:
+        sqlalchemy.orm.Session object
+
+    Example:
+        with get_connection(
+            conf_file_path=test_config, conf_file_section="MySQL MLWH"
+        ) as session:
+            pass
+    """
+
+    url = get_db_connection_string(
+        conf_file_path=conf_file_path, conf_file_section=conf_file_section
+    )
+    engine: Engine = create_engine(url, echo=False)
+    session = Session(engine)
+    try:
+        yield session
+    finally:
+        session.close()
+
+
+def create_schema(
+    base: DeclarativeBase,
+    conf_file_path: str,
+    conf_file_section: str | None = None,
+    drop: bool = False,
+):
+    """Connects to a MySQL database and creates a new schema.
+
+    This method is good to use in unit tests. While it can be used for
+    creating production instances from scratch, the correctness of the
+    created schema cannot be guaranteed.
+
+    Args:
+        base:
+            sqlalchemy.orm.DeclarativeBase object for the schema to be loaded.
+        conf_file_path:
+            A configuration file with database connection details.
+        conf_file_section:
+            The section of the configuration file. Optional.
+        drop:
+            A boolean option, defaults to False. If True, the existing tables
+            of the database schema are dropped.
+    """
+
+    url = get_db_connection_string(
+        conf_file_path=conf_file_path, conf_file_section=conf_file_section
+    )
+    engine: Engine = create_engine(url, echo=False)
+
+    if database_exists(engine.url):
+        if drop is True:
+            drop_database(engine.url)
+        else:
+            raise Exception(
+                "Cannot create a new database: "
+                "Database exists, drop_database option is False"
+            )
+
+    create_database(engine.url)
+    base.metadata.create_all(engine)
+    with engine.connect() as conn:
+        # Workaround for invalid default values for dates.
+        # Needed only in CI.
+        conn.execute(text("SET sql_mode = '';"))
+        conn.commit()
+
+
+def batch_load_from_yaml(
+    session: Session, module: str, fixtures_dir_path: str
+):
+    """Loads data to the database.
+
+    This method is good for use in unit tests; it is not intended
+    for production use.
+
+    Args:
+        session:
+            sqlalchemy.orm.Session object
+        module:
+            A string representing the name of the module where ORM classes
+            for the database tables are defined.
+        fixtures_dir_path:
+            A path to the directory with YAML files containing data to load,
+            one file per table. The names of the files should follow the
+            pattern 200-<ClassName>.yml. The integer prefix can be any
+            number; data from files with a lower prefix value are loaded
+            first.
+
+    Example:
+        batch_load_from_yaml(
+            session=session,
+            fixtures_dir_path="tests/data/mlwh_fixtures",
+            module="npg_notify.db.mlwh",
+        )
+    """
+    # Load the schema module where the table ORM classes are defined.
+    module = importlib.import_module(module)
+    # Find all files in the given directory.
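+    # Note that the sort below is lexicographic, not numeric, so numeric
+    # prefixes should be zero-padded to equal width for the intended load
+    # order to hold.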
+    dir_obj = pathlib.Path(fixtures_dir_path)
+    file_paths = sorted(str(f) for f in dir_obj.iterdir())
+
+    for file_path in file_paths:
+        with open(file_path, "r") as f:
+            file_name = os.path.basename(file_path)
+            # File name example: 200-StudyUser.yml
+            m = re.match(r"\A\d+-([a-zA-Z]+)\.yml\Z", file_name)
+            if m:
+                class_name = m.group(1)
+                table_class = getattr(module, class_name)
+                data = yaml.safe_load(f)
+                session.execute(insert(table_class), data)
+
+    session.commit()
diff --git a/src/npg_notify/mail.py b/src/npg_notify/mail.py
new file mode 100644
index 0000000..6c0a918
--- /dev/null
+++ b/src/npg_notify/mail.py
@@ -0,0 +1,159 @@
+import logging
+import os
+import smtplib
+from email.message import EmailMessage
+from email.utils import parseaddr
+
+logger = logging.getLogger("npg_notify")
+
+
+def send_notification(
+    domain: str, contacts: list[str], subject: str, content: str
+):
+    """Sends an email.
+
+    Sending an email succeeds if at least one recipient is likely to
+    receive it.
+
+    Args:
+        domain:
+            The domain of the mail server which is used for sending the
+            email.
+        contacts:
+            A non-empty list of valid email addresses.
+        subject:
+            The subject line of the email, a non-empty string.
+        content:
+            The content of the email, a non-empty string.
+    """
+    user_name = os.environ.get("USER")
+    if not user_name:
+        raise ValueError("USER not set in the environment.")
+
+    if subject == "":
+        raise ValueError("Email subject cannot be empty.")
+    if content == "":
+        raise ValueError("Email content cannot be empty.")
+
+    if len(contacts) == 0:
+        raise ValueError("List of contacts cannot be empty.")
+    validated_contacts = [
+        valid_address
+        for valid_address in [parseaddr(addr)[1] for addr in contacts]
+        if valid_address != ""
+    ]
+    if len(validated_contacts) != len(contacts):
+        logger.warning(
+            "Some contact emails are invalid in " + ", ".join(contacts)
+        )
+    if len(validated_contacts) == 0:
+        raise ValueError(
+            "All contact emails are invalid in " + ", ".join(contacts)
+        )
+
+    recipients = ", ".join(validated_contacts)
+    logger.debug(f"Sending an email to {recipients}")
+
+    msg = EmailMessage()
+    msg.set_content(content)
+    msg["Subject"] = subject
+    msg["To"] = recipients
+    msg["From"] = f"{user_name}@{domain}"
+    s = smtplib.SMTP(f"mail.{domain}")
+    # Sending the message might fail, in which case there will be an error
+    # that we do not have to catch here. Sending succeeds if at least one
+    # recipient is likely to receive the message. The return value contains
+    # information about failed attempts. If this happens, it is safer to
+    # consider the message as being sent.
+    reply = s.send_message(msg)
+    s.quit()
+    if len(reply) != 0:
+        logger.warning(reply)
+
+
+def generate_email_pac_bio(
+    domain: str,
+    langqc_run_url: str,
+    irods_docs_url: str,
+    qc_outcome: dict,
+    well_data: dict,
+    libraries: list,
+) -> tuple[str, str]:
+    """Generates the subject line and the content for a notification.
+
+    This code is specific to the PacBio platform.
+
+    Args:
+        domain:
+            The domain for any email addresses in the content of the
+            notification.
+        langqc_run_url:
+            LangQC run URL for the recipient to refer to.
+        irods_docs_url:
+            iRODS documentation page URL for the recipient to refer to.
+        qc_outcome:
+            A dictionary representing information about the QC outcome for
+            the PacBio well.
+        well_data:
+            A dictionary representing information about a well.
+        libraries:
+            A list of dictionaries, which represent individual libraries.
+            All libraries in this list should belong to the same study.
+
+    Returns:
+        A tuple of two strings, the subject line and the content for the
+        email notification.
+    """
+    study_ids = {lib["study_id"] for lib in libraries}
+    if len(study_ids) != 1:
+        raise ValueError(
+            "Libraries from different studies in 'libraries' attribute"
+        )
+
+    run_name = well_data["run_name"]
+    plate_number = (
+        well_data["plate_number"] if well_data["plate_number"] else "n/a"
+    )
+    outcome = "Undefined"
+    if qc_outcome["outcome"] is not None:
+        outcome = "Pass" if qc_outcome["outcome"] is True else "Fail"
+
+    study_id = study_ids.pop()
+    study_name = libraries[0]["study_name"]
+
+    subject = f"Study {study_id}: PacBio data is available"
+
+    lines = [
+        subject,
+        "",
+        f"Study name: {study_name}",
+        f"Run: {run_name}",
+        f"Well label: {well_data['label']}",
+        f"Plate number: {plate_number}",
+        f"QC outcome: {qc_outcome['qc_state']} ({outcome})",
+        "Samples:",
+    ]
+    num_samples = len(libraries)
+    num_samples_listed = num_samples if num_samples <= 5 else 5
+    for name in [
+        lib["sample_name"] for lib in libraries[0:num_samples_listed]
+    ]:
+        lines.append(f"\t{name},")
+
+    if num_samples > num_samples_listed:
+        lines.append("\t.....")
+
+    lines = lines + [
+        f"\t{num_samples} sample"
+        + ("s" if num_samples > 1 else "")
+        + " in total",
+        "",
+        "The QC review is complete and your data should now be available from"
+        + f" iRODS (see {irods_docs_url}).",
+        f"QC information for this run: {'/'.join([langqc_run_url, run_name])}.",
+        "",
+        "If you have any questions or need further assistance, please feel "
+        + "free to reach out to a Scientific Service Representative at "
+        + f"dnap-ssr@{domain}.",
+        "",
+        "NPG on behalf of DNA Pipelines\n",
+    ]
+
+    return (subject, "\n".join(lines))
diff --git a/src/npg_notify/porch_wrapper/qc_state.py b/src/npg_notify/porch_wrapper/qc_state.py
new file mode 100644
index 0000000..f5937c0
--- /dev/null
+++ b/src/npg_notify/porch_wrapper/qc_state.py
@@ -0,0 +1,381 @@
+import argparse
+import logging
+import os
+import re
+import sys
+import time
+from pathlib import PurePath
+from urllib.parse import urljoin
+
+from npg_notify.config import get_config_data
+from npg_notify.db.mlwh import get_study_contacts
+from npg_notify.db.utils import get_connection, get_db_connection_string
+from npg_notify.mail import generate_email_pac_bio, send_notification
+from npg_porch_cli import send_request
+from npg_porch_cli.api import Pipeline, PorchAction
+from npg_porch_cli.api import send as send_porch_request
+
+SSL_CONFIG_FILE_SECTION = "SSL"
+LANGQC_CONFIG_FILE_SECTION = "LANGQC"
+PORCH_CONFIG_FILE_SECTION = "PORCH"
+IRODS_CONFIG_FILE_SECTION = "IRODS"
+MAIL_CONFIG_FILE_SECTION = "MAIL"
+
+logger = logging.getLogger("npg_notify")
+
+
+def run():
+    _configure_logger()
+    logger.info("Started")
+
+    parser = argparse.ArgumentParser(
+        prog="qc_state_notification",
+        description="Creates or processes notifications about QC states.",
+    )
+    parser.add_argument(
+        "action",
+        type=str,
+        help="A task to perform.",
+        choices=["register", "process"],
+    )
+    parser.add_argument(
+        "--conf_file_path",
+        type=str,
+        required=True,
+        help="Configuration file path.",
+    )
+    args = parser.parse_args()
+
+    conf_file_path = args.conf_file_path
+    ssl_cert_file = _get_ssl_cert_file_path(conf_file_path=conf_file_path)
+    if ssl_cert_file:
+        os.environ["SSL_CERT_FILE"] = ssl_cert_file
+        # For a custom CA SSL certificate, the directory assigned to the
+        # REQUESTS_CA_BUNDLE env. variable should have been 'treated' with
+        # the c_rehash tool, which is supplied with the Python interpreter,
+        # see
+        # https://requests.readthedocs.io/en/latest/user/advanced/#ssl-cert-verification
+        os.environ["REQUESTS_CA_BUNDLE"] = str(PurePath(ssl_cert_file).parent)
+
+    action = args.action
+    if action == "register":
+        success = create_tasks(conf_file_path=conf_file_path)
+    elif action == "process":
+        success = process_task(conf_file_path=conf_file_path)
+    else:
+        raise Exception(f"Action '{action}' is not implemented")
+
+    logger.info("Finished")
+    sys.exit(0 if success else 1)
+
+
+def create_tasks(conf_file_path: str) -> bool:
+    """Retrieves and registers with Porch recently assigned QC states.
+
+    Retrieves from the LangQC API all recently (within the last four weeks)
+    assigned final QC states for PacBio wells. Registers these states as
+    pending with one of the pipelines registered with the Porch API. URLs to
+    use with these APIs, details of the Porch pipeline and the Porch
+    authorization token should be given in the configuration file.
+
+    All errors in task registration are captured and logged.
+
+    Args:
+        conf_file_path:
+            Configuration file path for this application.
+
+    Returns:
+        True if all tasks were registered without errors, False otherwise.
+    """
+
+    porch_conf = get_config_data(
+        conf_file_path=conf_file_path,
+        conf_file_section=PORCH_CONFIG_FILE_SECTION,
+    )
+    pipeline = _pipeline_object(porch_conf)
+
+    # Query LangQC for recently QC-ed wells.
+    langqc_conf = get_config_data(
+        conf_file_path=conf_file_path,
+        conf_file_section=LANGQC_CONFIG_FILE_SECTION,
+    )
+    qc_states = send_request(
+        validate_ca_cert=_validate_ca_cert(),
+        url=urljoin(langqc_conf["api_url"], langqc_conf["recently_qced_path"]),
+        method="GET",
+        auth_type=None,
+    )
+
+    num_products = len(qc_states)
+    logger.info(f"Retrieved QC states for {num_products} products.")
+
+    os.environ["NPG_PORCH_TOKEN"] = porch_conf.pop("npg_porch_token")
+
+    # qc_states is a dictionary where keys are product IDs and values are
+    # lists of QC states. A list of one QC state is expected because we are
+    # limiting to sequencing products only.
+    num_errors = 0
+    for product_id, qc_state_data in qc_states.items():
+        try:
+            task = create_task(
+                porch_config=porch_conf,
+                pipeline=pipeline,
+                qc_state=qc_state_data[0],
+            )
+            logger.debug(f"Created a new task {task}")
+        except Exception as err:
+            logger.error(
+                f"Error registering task for pipeline {pipeline.name} with "
+                f"QC state change of {product_id}: {str(err)}"
+            )
+            num_errors += 1
+
+    del os.environ["NPG_PORCH_TOKEN"]
+
+    logger.info(
+        f"{num_errors} errors when registering products. "
+        f"Registered QC states for {num_products - num_errors} products."
+    )
+
+    return num_errors == 0
+
+
+def create_task(
+    porch_config: dict[str, str], pipeline: Pipeline, qc_state: dict
+):
+    """Creates and queues a single porch task for the notification pipeline.
+
+    Returns the porch API response for the added task.
+
+    Args:
+        porch_config:
+            A dictionary of porch-related configuration parameters.
+        pipeline:
+            npg_porch_cli.api.Pipeline object
+        qc_state:
+            A Python object that can be encoded into a JSON string. This
+            object uniquely defines a porch task for a given pipeline.
+    """
+
+    action = PorchAction(
+        validate_ca_cert=_validate_ca_cert(),
+        porch_url=porch_config["api_url"],
+        action="add_task",
+        task_input=qc_state,
+    )
+    return send_porch_request(action=action, pipeline=pipeline)
+
+
+def process_task(conf_file_path: str) -> bool:
+    """Processes one task for the email notification pipeline.
+
+    Performs the following steps:
+      1. claims one porch task,
+      2. gets necessary details about the product (PacBio well) from the
+         LangQC API,
+      3. for each study linked to the product, gets contact details from
+         the ml warehouse,
+      4. for each study, sends an email notification,
+      5. using the porch API, updates the claimed task status to `DONE`,
+         `FAILED`, or resets it back to `PENDING`.
+
+    If an error occurs at steps 2 or 3, the task is re-assigned the
+    `PENDING` status.
+
+    On step 4 an attempt to send an email is made separately for each
+    study. If this fails for one of the studies, the task is assigned the
+    `FAILED` status.
+
+    Step 5 can fail, in which case the task will remain claimed. Tasks like
+    this should be mopped up manually. The log will contain information
+    about the intended state of the task.
+
+    The event log (table) of the porch database contains information about
+    all task status updates. This information can be used to identify tasks
+    that have had their status updated repeatedly.
+    """
+
+    porch_config = get_config_data(
+        conf_file_path=conf_file_path,
+        conf_file_section=PORCH_CONFIG_FILE_SECTION,
+    )
+    langqc_conf = get_config_data(
+        conf_file_path=conf_file_path,
+        conf_file_section=LANGQC_CONFIG_FILE_SECTION,
+    )
+    irods_config = get_config_data(
+        conf_file_path=conf_file_path,
+        conf_file_section=IRODS_CONFIG_FILE_SECTION,
+    )
+    mail_config = get_config_data(
+        conf_file_path=conf_file_path,
+        conf_file_section=MAIL_CONFIG_FILE_SECTION,
+    )
+
+    # Get all config data or error before claiming the task.
+    porch_api_url = porch_config["api_url"]
+    pac_bio_well_libraries_path = langqc_conf["pac_bio_well_libraries_path"]
+    langqc_base_url = langqc_conf["api_url"]
+    pac_bio_run_iu_path = langqc_conf["pac_bio_run_iu_path"]
+    irods_docs_url = irods_config["user_manual_url"]
+    domain = mail_config["domain"]
+    # Check that database credentials are in place.
+    get_db_connection_string(
+        conf_file_path=conf_file_path, conf_file_section="MySQL MLWH"
+    )
+
+    os.environ["NPG_PORCH_TOKEN"] = porch_config["npg_porch_token"]
+    pipeline = _pipeline_object(porch_config)
+    action = PorchAction(
+        validate_ca_cert=_validate_ca_cert(),
+        porch_url=porch_api_url,
+        action="claim_task",
+    )
+    claimed_tasks = send_porch_request(action=action, pipeline=pipeline)
+    if len(claimed_tasks) == 0:
+        logger.info("No pending tasks returned from porch")
+        return True
+
+    claimed_task = claimed_tasks[0]
+    logger.debug(f"Claimed task {claimed_task}")
+
+    new_task_status = "DONE"
+
+    # Get product lims data from LangQC - a platform-specific step.
+    # For PacBio the product is a well.
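+    # The configured libraries path contains a placeholder in square
+    # brackets, e.g. "api/pacbio/wells/[product_id]/libraries"; the
+    # re.sub call below substitutes the actual product ID for it.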
+    product_id = claimed_task["task_input"]["id_product"]
+    try:
+        url = re.sub(
+            r"\[\w+\]",
+            product_id,
+            pac_bio_well_libraries_path,
+        )
+        url = urljoin(langqc_base_url, url)
+        response = send_request(
+            validate_ca_cert=_validate_ca_cert(),
+            url=url,
+            method="GET",
+            auth_type=None,
+        )
+        libraries_per_study = {}
+        for library in response["libraries"]:
+            study_id = library["study_id"]
+            if study_id in libraries_per_study:
+                libraries_per_study[study_id].append(library)
+            else:
+                libraries_per_study[study_id] = [library]
+
+        study_ids = libraries_per_study.keys()
+        contacts_per_study = {}
+
+        with get_connection(
+            conf_file_path=conf_file_path, conf_file_section="MySQL MLWH"
+        ) as session:
+            for study_id in study_ids:
+                contacts_per_study[study_id] = get_study_contacts(
+                    session=session, id=study_id
+                )
+
+    except Exception as err:
+        logger.error(str(err))
+        new_task_status = "PENDING"
+
+    task_input = claimed_task["task_input"]
+
+    if new_task_status == "DONE":
+        for study_id, contacts in contacts_per_study.items():
+            if len(contacts) == 0:
+                logger.warning(
+                    f"No contacts are registered for study {study_id}"
+                )
+                continue
+
+            try:
+                (subject, text) = generate_email_pac_bio(
+                    domain=domain,
+                    langqc_run_url=urljoin(
+                        langqc_base_url, pac_bio_run_iu_path
+                    ),
+                    irods_docs_url=irods_docs_url,
+                    qc_outcome=task_input,
+                    well_data=response,
+                    libraries=libraries_per_study[study_id],
+                )  # platform-specific step
+                send_notification(
+                    domain=domain,
+                    contacts=contacts,
+                    subject=subject,
+                    content=text,
+                )
+            except Exception as err:
+                logger.error(
+                    "Error generating or sending a notification: " + str(err)
+                )
+                new_task_status = "FAILED"
+
+    return _update_task_status(
+        porch_api_url, pipeline, task_input, new_task_status
+    )
+
+
+def _pipeline_object(porch_conf: dict):
+    return Pipeline(
+        name=porch_conf["pipeline_name"],
+        uri=porch_conf["pipeline_uri"],
+        version=porch_conf["pipeline_version"],
+    )
+
+
+def _update_task_status(porch_api_url, pipeline, task_input, task_status):
+    action = PorchAction(
+        validate_ca_cert=_validate_ca_cert(),
+        porch_url=porch_api_url,
+        action="update_task",
+        task_input=task_input,
+        task_status=task_status,
+    )
+
+    num_attempts = 3
+    i = 0
+    success = False
+
+    message = f"porch task {task_input} to status {task_status}"
+
+    while i < num_attempts:
+        try:
+            send_porch_request(action=action, pipeline=pipeline)
+            logger.info(f"Updated {message}")
+            success = True
+            break
+        except Exception as err:
+            logger.warning(
+                "Error: " + str(err) + f"\nwhile trying to update {message}"
+            )
+            if i != (num_attempts - 1):
+                time.sleep(60 + (i * 300))  # sleep 60 or 360 sec
+            i += 1
+    if not success:
+        logger.error(f"Failed to update {message}.")
+
+    return success
+
+
+def _get_ssl_cert_file_path(conf_file_path: str):
+    ssl_cert_file = None
+    try:
+        ssl_conf = get_config_data(
+            conf_file_path=conf_file_path,
+            conf_file_section=SSL_CONFIG_FILE_SECTION,
+        )
+        ssl_cert_file = ssl_conf["ssl_cert_file"]
+        if not os.path.isfile(ssl_cert_file):
+            ssl_cert_file = None
+    except Exception as err:
+        logger.warning(str(err))
+
+    return ssl_cert_file
+
+
+def _validate_ca_cert():
+    return "SSL_CERT_FILE" in os.environ
+
+
+def _configure_logger():
+    logging.basicConfig(
+        stream=sys.stderr,
+        level=logging.INFO,
+        datefmt="%Y:%m:%d %H:%M:%S",
+        format="%(asctime)s %(levelname)s %(message)s",
+    )
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 0000000..0450e15
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,39 @@
+import os
+
+import pytest
+from npg_notify.db.mlwh import Base
+from npg_notify.db.utils import (
+    batch_load_from_yaml,
+    create_schema,
+    get_connection,
+)
+
+TEST_CONFIG_FILE = "qc_state_app_config.ini"
+test_config = os.path.join(os.path.dirname(__file__), "data", TEST_CONFIG_FILE)
+
+
+@pytest.fixture(scope="module", name="mlwh_test_session")
+def get_test_db_session():
+    """
+    Establishes a connection to the database, creates a schema, loads
+    data and yields a new database session.
+    """
+    fixtures_dir = os.path.join(
+        os.path.dirname(__file__), "data/mlwh_fixtures"
+    )
+    create_schema(
+        base=Base,
+        drop=True,
+        conf_file_path=test_config,
+        conf_file_section="MySQL MLWH",
+    )
+
+    with get_connection(
+        conf_file_path=test_config, conf_file_section="MySQL MLWH"
+    ) as session:
+        batch_load_from_yaml(
+            session=session,
+            fixtures_dir_path=fixtures_dir,
+            module="npg_notify.db.mlwh",
+        )
+        # Yield rather than return, so that the session stays open while
+        # the tests run and is closed by the context manager afterwards.
+        yield session
diff --git a/tests/data/mlwh_fixtures/100-Study.yml b/tests/data/mlwh_fixtures/100-Study.yml
new file mode 100644
index 0000000..b30857f
--- /dev/null
+++ b/tests/data/mlwh_fixtures/100-Study.yml
@@ -0,0 +1,18 @@
+- name: Study two name
+  id_lims: SQSCP
+  id_study_lims: 2
+  id_study_tmp: 1
+- name: Study three name
+  id_lims: SQSCP
+  id_study_lims: 3
+  id_study_tmp: 2
+- name: Study four name
+  id_lims: SQSCP
+  id_study_lims: 4
+  id_study_tmp: 3
+- name: Study five name
+  id_lims: SQSCP
+  id_study_lims: 5
+  id_study_tmp: 4
+
+
diff --git a/tests/data/mlwh_fixtures/200-StudyUser.yml b/tests/data/mlwh_fixtures/200-StudyUser.yml
new file mode 100644
index 0000000..9bc1014
--- /dev/null
+++ b/tests/data/mlwh_fixtures/200-StudyUser.yml
@@ -0,0 +1,36 @@
+- email: user2@sanger.ac.uk
+  id_study_tmp: 2
+  id_study_users_tmp: 1
+  role: manager
+- email: user3@sanger.ac.uk
+  id_study_tmp: 2
+  id_study_users_tmp: 2
+  role: follower
+- email: user3@sanger.ac.uk
+  id_study_tmp: 2
+  id_study_users_tmp: 8
+  role: owner
+- email: user1@sanger.ac.uk
+  id_study_tmp: 2
+  id_study_users_tmp: 3
+  role: owner
+- email: owner@sanger.ac.uk
+  id_study_tmp: 2
+  id_study_users_tmp: 7
+  role: owner
+- email: ~
+  id_study_tmp: 2
+  id_study_users_tmp: 4
+  role: ~
+- email: loader@sanger.ac.uk
+  id_study_tmp: 3
+  id_study_users_tmp: 5
+  role: loader
+- email: user1@sanger.ac.uk
+  id_study_tmp: 4
+  id_study_users_tmp: 6
+  role: ~
+- email: ~
+  id_study_tmp: 4
+  id_study_users_tmp: 9
+  role: owner
\ No newline at end of file
diff --git a/tests/data/qc_state_app_config.ini b/tests/data/qc_state_app_config.ini
new file mode 100644
index 0000000..e58ddc9
--- /dev/null
+++ b/tests/data/qc_state_app_config.ini
@@ -0,0 +1,37 @@
+# Configuration for the npg_notify package.
+
+[MySQL MLWH]
+
+dbuser = test
+dbpassword = test
+dbhost = 127.0.0.1
+dbport = 3306
+dbschema = study_notify
+
+[SSL]
+
+ssl_cert_file = some_path
+
+[LANGQC]
+
+api_url = https://langqc.com
+recently_qced_path = api/products/qc?weeks=4&final=yes&seq_level=yes
+pac_bio_well_libraries_path = api/pacbio/wells/[product_id]/libraries
+pac_bio_run_iu_path = ui/run
+
+[PORCH]
+
+api_url = https://porch.com
+pipeline_name = qc_notifications
+pipeline_uri = https://github.com/wtsi/npg_notifications.git
+pipeline_version = 0.1
+npg_porch_token = aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+
+[IRODS]
+
+user_manual_url = https://confluence_irods.com/display/FARM/iRODS
+
+[MAIL]
+
+domain = some.com
+
diff --git a/tests/test_generate_and_send_email.py b/tests/test_generate_and_send_email.py
new file mode 100644
index 0000000..bedc5a9
--- /dev/null
+++ b/tests/test_generate_and_send_email.py
@@ -0,0 +1,244 @@
+import random
+
+import pytest
+from npg_notify.mail import generate_email_pac_bio
+
+domain = "langqc.com"
+langqc_run_url = "https://langqc.com/ui/run"
+irods_docs_url = "https://confluence_irods.com/iRODS"
+footer = (
+    "The QC review is complete and your data should now be available "
+    + "from iRODS (see https://confluence_irods.com/iRODS).\n"
+    + "QC information for this run: https://langqc.com/ui/run/TRACTION-RUN.\n\n"
+    + "If you have any questions or need further assistance, please feel free "
+    + "to reach out to a Scientific Service Representative at dnap-ssr@langqc.com.\n\n"
+    + "NPG on behalf of DNA Pipelines\n"
+)
+
+id_product = "f910e2fc6bd10913fb7df100e788192962d71d57c85e3d300c9fa3e24e6691db"
+
+
+def get_well_data(libraries):
+    return {
+        "id_product": id_product,
+        "label": "D1",
+        "plate_number": 1,
+        "run_name": "TRACTION-RUN-1333",
+        "run_start_time": "2024-06-18T10:01:42",
+        "run_complete_time": "2024-06-19T16:26:46",
+        "well_start_time": "2024-06-18T11:44:43",
+        "well_complete_time": "2024-06-20T14:36:27",
+        "run_status": "Complete",
+        "well_status": "Complete",
+        "instrument_name": "84098",
+        "instrument_type": "Revio",
+        "qc_state": None,
+        "libraries": libraries,
+    }
+
+
+def generate_tag():
+    # A random 16-base tag, matching the length of real tag sequences.
+    return "".join([random.choice(["C", "T", "G", "A"]) for _ in range(16)])
+
+
+def test_generate_email_one_library():
+    qc_outcome = {
+        "qc_state": "Passed With Distinction",
+        "is_preliminary": False,
+        "qc_type": "sequencing",
+        "outcome": True,
+        "id_product": id_product,
+        "date_created": "2024-06-28T14:22:18",
+        "date_updated": "2024-06-28T14:24:47",
+        "user": "user1@langqc.com",
+        "created_by": "LangQC",
+    }
+    libraries = [
+        {
+            "study_id": "1234",
+            "study_name": "Reference Genomes_ DNA",
+            "sample_id": "778655549",
+            "sample_name": "1234STDY13618009",
+            "tag_sequence": ["CTGCGATCACGAGTAT"],
+            "library_type": "Pacbio_HiFi",
+            "pool_name": "TRAC-2-3818",
+        }
+    ]
+
+    (subject, generated_content) = generate_email_pac_bio(
+        domain=domain,
+        langqc_run_url=langqc_run_url,
+        irods_docs_url=irods_docs_url,
+        qc_outcome=qc_outcome,
+        well_data=get_well_data(libraries),
+        libraries=libraries,
+    )
+
+    content = (
+        "Study 1234: PacBio data is available\n\n"
+        + "Study name: Reference Genomes_ DNA\n"
+        + "Run: TRACTION-RUN-1333\n"
+        + "Well label: D1\n"
+        + "Plate number: 1\n"
+        + "QC outcome: Passed With Distinction (Pass)\n"
+        + "Samples:\n"
+        + "\t1234STDY13618009,\n"
+        + "\t1 sample in total\n\n"
+    )
+
+    assert subject == "Study 1234: PacBio data is available"
+    assert generated_content == (
+        content + footer.replace("TRACTION-RUN", "TRACTION-RUN-1333")
+    )
+
+    qc_outcome["outcome"] = None
+    qc_outcome["qc_state"] = "Nobody can tell"
+    (subject, generated_content) = generate_email_pac_bio(
+        domain=domain,
+        langqc_run_url=langqc_run_url,
+        irods_docs_url=irods_docs_url,
+        qc_outcome=qc_outcome,
+        well_data=get_well_data(libraries),
+        libraries=libraries,
+    )
+
+    content = (
+        "Study 1234: PacBio data is available\n\n"
+        + "Study name: Reference Genomes_ DNA\n"
+        + "Run: TRACTION-RUN-1333\n"
+        + "Well label: D1\n"
+        + "Plate number: 1\n"
+        + "QC outcome: Nobody can tell (Undefined)\n"
+        + "Samples:\n"
+        + "\t1234STDY13618009,\n"
+        + "\t1 sample in total\n\n"
+    )
+
+    assert generated_content == (
+        content + footer.replace("TRACTION-RUN", "TRACTION-RUN-1333")
+    )
+
+
+def test_generate_email_two_libraries():
+    qc_outcome = {
+        "qc_state": "Failed (Instrument)",
+        "is_preliminary": False,
+        "qc_type": "sequencing",
+        "outcome": False,
+        "id_product": id_product,
+        "date_created": "2024-06-28T14:22:18",
+        "date_updated": "2024-06-28T14:24:47",
+        "user": "user1@langqc.com",
+        "created_by": "LangQC",
+    }
+
+    libraries = [
+        {
+            "study_id": "1234",
+            "study_name": "Reference Genomes_ DNA",
+            "sample_id": "778655549",
+            "sample_name": f"1234STDY13618009{i}",
+            "tag_sequence": [generate_tag()],
+            "library_type": "Pacbio_HiFi",
+            "pool_name": "TRAC-2-3818",
+        }
+        for i in range(0, 5)
+    ]
+
+    (subject, generated_content) = generate_email_pac_bio(
+        domain=domain,
+        langqc_run_url=langqc_run_url,
+        irods_docs_url=irods_docs_url,
+        qc_outcome=qc_outcome,
+        well_data=get_well_data(libraries),
+        libraries=libraries[0:2],
+    )
+
+    content = (
+        "Study 1234: PacBio data is available\n\n"
+        + "Study name: Reference Genomes_ DNA\n"
+        + "Run: TRACTION-RUN-1333\n"
+        + "Well label: D1\n"
+        + "Plate number: 1\n"
+        + "QC outcome: Failed (Instrument) (Fail)\n"
+        + "Samples:\n"
+        + "\t1234STDY136180090,\n"
+        + "\t1234STDY136180091,\n"
+        + "\t2 samples in total\n\n"
+    )
+
+    assert subject == "Study 1234: PacBio data is available"
+    assert generated_content == (
+        content + footer.replace("TRACTION-RUN", "TRACTION-RUN-1333")
+    )
+
+
+def test_generate_email_seven_libraries():
+    qc_outcome = {
+        "qc_state": "Failed",
+        "is_preliminary": False,
+        "qc_type": "sequencing",
+        "outcome": False,
+        "id_product": id_product,
+        "date_created": "2024-06-28T14:22:18",
+        "date_updated": "2024-06-28T14:24:47",
+        "user": "user1@langqc.com",
+        "created_by": "LangQC",
+    }
+    libraries = [
+        {
+            "study_id": "1234",
+            "study_name": "Reference Genomes_ DNA",
+            "sample_id": "778655549",
+            "sample_name": f"1234STDY13618009{i}",
+            "tag_sequence": [generate_tag()],
+            "library_type": "Pacbio_HiFi",
+            "pool_name": "TRAC-2-3818",
+        }
+        for i in range(0, 7)
+    ]
+
+    (subject, generated_content) = generate_email_pac_bio(
+        domain=domain,
+        langqc_run_url=langqc_run_url,
+        irods_docs_url=irods_docs_url,
+        qc_outcome=qc_outcome,
+        well_data=get_well_data(libraries),
+        libraries=libraries,
+    )
+
+    content = (
+        "Study 1234: PacBio data is available\n\n"
+        + "Study name: Reference Genomes_ DNA\n"
+        + "Run: TRACTION-RUN-1333\n"
+        + "Well label: D1\n"
+        + "Plate number: 1\n"
+        + "QC outcome: Failed (Fail)\n"
+        + "Samples:\n"
+        + "\t1234STDY136180090,\n"
+        + "\t1234STDY136180091,\n"
+        + "\t1234STDY136180092,\n"
+        + "\t1234STDY136180093,\n"
+        + "\t1234STDY136180094,\n"
+        + "\t.....\n"
+        + "\t7 samples in total\n\n"
+    )
+
+    assert subject == "Study 1234: PacBio data is available"
+    assert generated_content == (
+        content + footer.replace("TRACTION-RUN", "TRACTION-RUN-1333")
+    )
+
+    libraries[3]["study_id"] = "12345"
+    with pytest.raises(
+        ValueError,
+        match=r"Libraries from different studies in 'libraries' attribute",
+    ):
+        generate_email_pac_bio(
+            domain=domain,
+            langqc_run_url=langqc_run_url,
+            irods_docs_url=irods_docs_url,
+            qc_outcome=qc_outcome,
+            well_data=get_well_data(libraries),
+            libraries=libraries,
+        )
diff --git a/tests/test_retrieve_contacts.py b/tests/test_retrieve_contacts.py
new file mode 100644
index 0000000..03067d7
--- /dev/null
+++ b/tests/test_retrieve_contacts.py
@@ -0,0 +1,18 @@
+import pytest
+from npg_notify.db.mlwh import StudyNotFoundError, get_study_contacts
+
+
+def test_retrieving_study_contacts(mlwh_test_session):
+    with pytest.raises(
+        StudyNotFoundError,
+        match=r"Study with ID 666 is not found in ml warehouse",
+    ):
+        get_study_contacts(mlwh_test_session, "666")
+
+    for id in ["2", "5", "4"]:
+        assert get_study_contacts(mlwh_test_session, id) == []
+
+    users = ["owner", "user1", "user2", "user3"]
+    assert get_study_contacts(mlwh_test_session, "3") == [
+        f"{u}@sanger.ac.uk" for u in users
+    ]
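
Note for local test runs: something like the following mirrors the CI service
definition above (a sketch; the container name is arbitrary and the
credentials match tests/data/qc_state_app_config.ini):

```bash
docker run -d --name mlwh-test -p 3306:3306 \
  -e MYSQL_RANDOM_ROOT_PASSWORD=yes \
  -e MYSQL_USER=test \
  -e MYSQL_PASSWORD=test \
  -e MYSQL_DATABASE=study_notify \
  mysql:8.0
poetry install
poetry run pytest
```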