diff --git a/Dockerfile-base b/Dockerfile-base
index 7725a6bd6..8a74da27d 100644
--- a/Dockerfile-base
+++ b/Dockerfile-base
@@ -84,7 +84,7 @@ WORKDIR /app
 
 # copy assets
 COPY assets/configs/requirements.base.txt ./assets/scripts/remove_deps.sh ./assets/scripts/update_deps.sh ./assets/dropped.txt ./assets/scripts/install_deps.sh /app/assets/
-COPY pyproject.toml /app/pyproject.toml
+COPY pyproject.toml assets/shared.py /app/
 COPY assets/configs/.pys_usercfg.ini /Pysces/.pys_usercfg.ini
 COPY assets/configs/.pys_usercfg.ini /root/Pysces/.pys_usercfg.ini
 
diff --git a/assets/shared.py b/assets/shared.py
index a52d9aad9..44a28b6c0 100644
--- a/assets/shared.py
+++ b/assets/shared.py
@@ -1,111 +1,37 @@
-# -- db connectors and globally shared tooling -- #
+"""Globally-shared content amongst both API and Worker microservices."""
 import logging
 import os
+import uuid
 from abc import abstractmethod, ABC
 from dataclasses import dataclass, asdict
 from datetime import datetime
+from enum import Enum
 from typing import *
+from asyncio import sleep
 
+import dotenv
+from dotenv import load_dotenv
 from google.cloud import storage
 from pydantic import BaseModel as _BaseModel, ConfigDict
 from fastapi import UploadFile
 from pymongo import MongoClient
 from pymongo.collection import Collection
 from pymongo.database import Database
+# from process_bigraph import ProcessTypes
 
-# -- globally-shared content-- #
+# for dev only
+dotenv.load_dotenv("../assets/dev/.env_dev")
 
-# BUCKET_URL = "gs://bio-check-requests-1/"
+# constraints/definitions
+DB_TYPE = "mongo"  # ie: postgres, etc
+DB_NAME = "service_requests"
+BUCKET_NAME = os.getenv("BUCKET_NAME")
 
-
-def increment_version(version: str, increment_position: int):
-    """Args:
-            version (str): version to increment by
-            increment_position (int): position to increment by: 0 for major 1 for minor and 2 for patch
-    """
-    parts = [int(n) for n in version.split(".")]
-    new_part = parts[increment_position] + 1
-    parts.remove(parts[increment_position])
-    parts.insert(increment_position, new_part)
-    return ".".join([str(n) for n in parts])
-
-
-def setup_logging(fname: str):
-    logging.basicConfig(
-        filename=fname,
-        level=logging.ERROR,
-        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
-    )
-
-
-def upload_blob(bucket_name, source_file_name, destination_blob_name):
-    """Uploads a file to the bucket."""
-    # The ID of your GCS bucket
-    # bucket_name = "your-bucket-name"
-    # The path to your file to upload
-    # source_file_name = "local/path/to/file"
-    # The ID of your GCS object
-    # destination_blob_name = "storage-object-name"
-
-    storage_client = storage.Client('bio-check-428516')
-    bucket = storage_client.bucket(bucket_name)
-    blob = bucket.blob(destination_blob_name)
-
-    # Optional: set a generation-match precondition to avoid potential race conditions
-    # and data corruptions. The request to upload is aborted if the object's
-    # generation number does not match your precondition. For a destination
-    # object that does not yet exist, set the if_generation_match precondition to 0.
-    # If the destination object already exists in your bucket, set instead a
-    # generation-match precondition using its generation number.
-    generation_match_precondition = 0
-
-    blob.upload_from_filename(source_file_name, if_generation_match=generation_match_precondition)
-
-    return {
-        'message': f"File {source_file_name} uploaded to {destination_blob_name}."
-    }
-
-
-def read_uploaded_file(bucket_name, source_blob_name, destination_file_name):
-    download_blob(bucket_name, source_blob_name, destination_file_name)
-
-    with open(destination_file_name, 'r') as f:
-        return f.read()
-
-
-def download_blob(bucket_name, source_blob_name, destination_file_name):
-    """Downloads a blob from the bucket."""
-    # bucket_name = "your-bucket-name"
-    # source_blob_name = "storage-object-name"
-    # destination_file_name = "local/path/to/file"
-
-    storage_client = storage.Client()
-    bucket = storage_client.bucket(bucket_name)
-    blob = bucket.blob(source_blob_name)
-
-    # Download the file to a destination
-    blob.download_to_filename(destination_file_name)
-
-
-async def save_uploaded_file(uploaded_file: UploadFile, save_dest: str) -> str:
-    """Write `fastapi.UploadFile` instance passed by api gateway user to `save_dest`."""
-    file_path = os.path.join(save_dest, uploaded_file.filename)
-    with open(file_path, 'wb') as file:
-        contents = await uploaded_file.read()
-        file.write(contents)
-    return file_path
-
-
-def make_dir(fp: str):
-    if not os.path.exists(fp):
-        os.mkdir(fp)
-
-
-# -- base models --
-
+# shared data-models
 class BaseModel(_BaseModel):
     """Base Pydantic Model with custom app configuration"""
     model_config = ConfigDict(arbitrary_types_allowed=True)
@@ -118,6 +44,19 @@ def to_dict(self):
         return asdict(self)
 
 
+class JobStatus(Enum):
+    PENDING = "PENDING"
+    IN_PROGRESS = "IN_PROGRESS"
+    COMPLETED = "COMPLETED"
+    FAILED = "FAILED"
+
+
+class DatabaseCollections(Enum):
+    PENDING_JOBS = "PENDING_JOBS".lower()
+    IN_PROGRESS_JOBS = "IN_PROGRESS_JOBS".lower()
+    COMPLETED_JOBS = "COMPLETED_JOBS".lower()
+
+
 class MultipleConnectorError(Exception):
     def __init__(self, message: str):
         self.message = message
@@ -150,15 +89,24 @@ class CompletedJob(Job):
 class DatabaseConnector(ABC):
     """Abstract class that is both serializable and interacts with the database (of any type).
""" + def __init__(self, connection_uri: str, database_id: str, connector_id: str): self.database_id = database_id self.client = self._get_client(connection_uri) self.db = self._get_database(self.database_id) - @classmethod - def timestamp(cls) -> str: + @staticmethod + def timestamp() -> str: return str(datetime.utcnow()) + def refresh_jobs(self): + def refresh_collection(coll): + for job in self.db[coll].find(): + self.db[coll].delete_one(job) + + for collname in ['completed_jobs', 'in_progress_jobs', 'pending_jobs']: + refresh_collection(collname) + @abstractmethod def _get_client(self, *args): pass @@ -167,6 +115,26 @@ def _get_client(self, *args): def _get_database(self, db_id: str): pass + @abstractmethod + def pending_jobs(self): + pass + + @abstractmethod + def completed_jobs(self): + pass + + @abstractmethod + async def read(self, *args, **kwargs): + pass + + @abstractmethod + async def write(self, *args, **kwargs): + pass + + @abstractmethod + def get_collection(self, **kwargs): + pass + class MongoDbConnector(DatabaseConnector): def __init__(self, connection_uri: str, database_id: str, connector_id: str = None): @@ -184,12 +152,35 @@ def _get_jobs_from_collection(self, coll_name: str): def pending_jobs(self): return self._get_jobs_from_collection("pending_jobs") - def in_progress_jobs(self): - return self._get_jobs_from_collection("in_progress_jobs") - def completed_jobs(self): return self._get_jobs_from_collection("completed_jobs") + async def read(self, collection_name: DatabaseCollections | str, **kwargs): + """Args: + collection_name: str + kwargs: (as in mongodb query) + """ + coll_name = self._parse_enum_input(collection_name) + coll = self.get_collection(coll_name) + result = coll.find_one(kwargs.copy()) + return result + + async def write(self, collection_name: DatabaseCollections | str, **kwargs): + """ + Args: + collection_name: str: collection name in mongodb + **kwargs: mongo db `insert_one` query defining the document where the key is as in the key of the document. 
+ """ + coll_name = self._parse_enum_input(collection_name) + + for k in kwargs.keys(): + v = kwargs[k] + kwargs[k] = self._parse_enum_input(v) + + coll = self.get_collection(coll_name) + result = coll.insert_one(kwargs.copy()) + return kwargs + def get_collection(self, collection_name: str) -> Collection: try: return self.db[collection_name] @@ -205,35 +196,248 @@ def insert_job(self, collection_name: str, **kwargs) -> Dict[str, Any]: coll.insert_one(job_doc) return job_doc - async def insert_completed_job(self, job_id: str, results: Any, source: str) -> Dict[str, str]: - collection_name = "completed_jobs" - _time = self.timestamp() - in_progress_job_doc = { - "job_id": job_id, - "status": "COMPLETED", - "timestamp": _time, - "results": results, - "source": source} - - return self.insert_job(collection_name=collection_name, **in_progress_job_doc) - - def fetch_job(self, comparison_id: str) -> Mapping[str, Any]: - # try each collection, starting with completed_jobs - collections = ['completed_jobs', 'in_progress_jobs', 'pending_jobs'] - for i, collection in enumerate(collections): - coll = self.get_collection(collection) - job = coll.find_one({'comparison_id': comparison_id}) - # case: job exists of some type for that comparison id; return that - if not isinstance(job, type(None)): - return job - - # case: no job exists for that id - return {'bio-check-message': f"No job exists for the comparison id: {comparison_id}"} + async def update_job_status(self, collection_name: str, job_id: str, status: str | JobStatus): + job_status = self._parse_enum_input(status) + return self.db[collection_name].update_one({'job_id': job_id, }, {'$set': {'status': job_status}}) + + def _parse_enum_input(self, _input: Any) -> str: + return _input.value if isinstance(_input, Enum) else _input + + +# -- API CONTENT -- # + +def file_upload_prefix(job_id: str): + # bucket params + upload_prefix = f"file_uploads/{job_id}/" + bucket_prefix = f"gs://{BUCKET_NAME}/" + upload_prefix + return upload_prefix, bucket_prefix + + +def check_upload_file_extension(file: UploadFile, purpose: str, ext: str) -> bool: + if not file.filename.endswith(ext): + raise ValueError(f"Files for {purpose} must be passed in {ext} format.") + else: + return True + + +def setup_logging(fname: str): + logging.basicConfig( + filename=fname, + level=logging.CRITICAL, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + + +def upload_blob(bucket_name, source_file_name, destination_blob_name): + """Uploads a file to the bucket.""" + # The ID of your GCS bucket + # bucket_name = "your-bucket-name" + # The path to your file to upload + # source_file_name = "local/path/to/file" + # The ID of your GCS object + # destination_blob_name = "storage-object-name" + + storage_client = storage.Client('bio-check-428516') + bucket = storage_client.bucket(bucket_name) + blob = bucket.blob(destination_blob_name) + + # Optional: set a generation-match precondition to avoid potential race conditions + # and data corruptions. The request to upload is aborted if the object's + # generation number does not match your precondition. For a destination + # object that does not yet exist, set the if_generation_match precondition to 0. + # If the destination object already exists in your bucket, set instead a + # generation-match precondition using its generation number. 
+    generation_match_precondition = 0
+
+    blob.upload_from_filename(source_file_name, if_generation_match=generation_match_precondition)
+
+    return {
+        'message': f"File {source_file_name} uploaded to {destination_blob_name}."
+    }
+
+
+def read_uploaded_file(bucket_name, source_blob_name, destination_file_name):
+    download_blob(bucket_name, source_blob_name, destination_file_name)
+
+    with open(destination_file_name, 'r') as f:
+        return f.read()
+
+
+def download_blob(bucket_name, source_blob_name, destination_file_name):
+    """Downloads a blob from the bucket."""
+    # bucket_name = "your-bucket-name"
+    # source_blob_name = "storage-object-name"
+    # destination_file_name = "local/path/to/file"
+
+    storage_client = storage.Client()
+    bucket = storage_client.bucket(bucket_name)
+    blob = bucket.blob(source_blob_name)
+
+    # Download the file to a destination
+    blob.download_to_filename(destination_file_name)
+
+
+def make_dir(fp: str):
+    if not os.path.exists(fp):
+        os.mkdir(fp)
+
+
+# -- WORKER CONTENT -- #
+
+def unique_id():
+    return str(uuid.uuid4())
+
+
+def handle_exception(context: str) -> str:
+    import traceback
+    from pprint import pformat
+    tb_str = traceback.format_exc()
+    error_message = pformat(f"{context} error:\n{tb_str}")
+
+    return error_message
+
+
+async def load_arrows(timer):
+    check_timer = timer
+    ell = ""
+    bars = ""
+    msg = "|"
+    n_ellipses = timer
+    log_interval = check_timer / n_ellipses
+    for n in range(n_ellipses):
+        single_interval = log_interval / 3
+        await sleep(single_interval)
+        bars += "="
+        disp = bars + ">"
+        if n == n_ellipses - 1:
+            disp += "|"
+        print(disp)
+
+
+# Content originally in API:
+# class DatabaseConnector(ABC):
+#     """Abstract class that is both serializable and interacts with the database (of any type). """
+#     def __init__(self, connection_uri: str, database_id: str, connector_id: str):
+#         self.database_id = database_id
+#         self.client = self._get_client(connection_uri)
+#         self.db = self._get_database(self.database_id)
+#
+#     @staticmethod
+#     def timestamp() -> str:
+#         return str(datetime.utcnow())
+#
+#     def refresh_collection(self, coll):
+#         for job in self.db[coll].find():
+#             self.db[coll].delete_one(job)
+#
+#     def refresh_jobs(self):
+#         for collname in ['completed_jobs', 'in_progress_jobs', 'pending_jobs']:
+#             self.refresh_collection(collname)
+#
+#     @abstractmethod
+#     def _get_client(self, *args):
+#         pass
+#
+#     @abstractmethod
+#     def _get_database(self, db_id: str):
+#         pass
+#
+#     @abstractmethod
+#     def pending_jobs(self):
+#         pass
+#
+#     @abstractmethod
+#     def completed_jobs(self):
+#         pass
+#
+#     @abstractmethod
+#     async def read(self, *args, **kwargs):
+#         pass
+#
+#     @abstractmethod
+#     async def write(self, *args, **kwargs):
+#         pass
+#
+#     @abstractmethod
+#     def get_collection(self, **kwargs):
+#         pass
+
+
+# class MongoDbConnector(DatabaseConnector):
+#     def __init__(self, connection_uri: str, database_id: str, connector_id: str = None):
+#         super().__init__(connection_uri, database_id, connector_id)
+#
+#     def _get_client(self, *args):
+#         return MongoClient(args[0])
+#
+#     def _get_database(self, db_id: str) -> Database:
+#         return self.client.get_database(db_id)
+#
+#     def _get_jobs_from_collection(self, coll_name: str):
+#         return [job for job in self.db[coll_name].find()]
+#
+#     def pending_jobs(self):
+#         return self._get_jobs_from_collection("pending_jobs")
+#
+#     def completed_jobs(self):
+#         return self._get_jobs_from_collection("completed_jobs")
+#
+#     @property
+#     def data(self):
+#         return self._get_data()
+#
+#     def _get_data(self):
+#         return {coll_name: [v for v in self.db[coll_name].find()] for coll_name in self.db.list_collection_names()}
+#
+#     async def read(self, collection_name: DatabaseCollections | str, **kwargs):
+#         """Args:
+#             collection_name: str
+#             kwargs: (as in mongodb query)
+#         """
+#         coll_name = self._parse_enum_input(collection_name)
+#         coll = self.get_collection(coll_name)
+#         result = coll.find_one(kwargs.copy())
+#         return result
+#
+#     async def write(self, collection_name: DatabaseCollections | str, **kwargs):
+#         """
+#         Args:
+#             collection_name: str: collection name in mongodb
+#             **kwargs: mongo db `insert_one` query defining the document where the key is as in the key of the document.
+#         """
+#         coll_name = collection_name
+#
+#         coll = self.get_collection(coll_name)
+#         result = coll.insert_one(kwargs.copy())
+#         return kwargs
+#
+#     def get_collection(self, collection_name: str) -> Collection:
+#         try:
+#             return self.db[collection_name]
+#         except:
+#             return None
+#
+#     async def insert_job_async(self, collection_name: str, **kwargs) -> Dict[str, Any]:
+#         return self.insert_job(collection_name, **kwargs)
+#
+#     def insert_job(self, collection_name: str, **kwargs) -> Dict[str, Any]:
+#         coll = self.get_collection(collection_name)
+#         job_doc = kwargs.copy()
+#         print("Inserting job...")
+#         coll.insert_one(job_doc)
+#         print(f"Job successfully inserted: {self.db.pending_jobs.find_one(kwargs)}.")
+#         return kwargs
+#
+#     async def update_job_status(self, collection_name: str, job_id: str, status: str | JobStatus):
+#         job_status = self._parse_enum_input(status)
+#         return self.db[collection_name].update_one({'job_id': job_id, }, {'$set': {'status': job_status}})
+#
+#     def _parse_enum_input(self, _input: Any) -> str:
+#         return _input.value if isinstance(_input, Enum) else _input
+
+
+
+
+
-
-    def refresh_jobs(self):
-        def refresh_collection(coll):
-            for job in self.db[coll].find():
-                self.db[coll].delete_one(job)
-
-        for collname in ['completed_jobs', 'in_progress_jobs', 'pending_jobs']:
-            refresh_collection(collname)
diff --git a/worker/Dockerfile-worker b/worker/Dockerfile-worker
index d707128a1..3bab7fd01 100644
--- a/worker/Dockerfile-worker
+++ b/worker/Dockerfile-worker
@@ -47,6 +47,9 @@ RUN apt-get update && apt-get install -y libatlas-base-dev \
 
 WORKDIR /app/worker
 
+# TODO: make this permanent eventually:
+RUN mv ../shared.py .
+
 ENTRYPOINT ["bash", "-c", "source ~/.bashrc && poetry run python3 main.py"]
 
diff --git a/worker/workers.py b/worker/workers.py
index c8264e807..4e55b62c8 100644
--- a/worker/workers.py
+++ b/worker/workers.py
@@ -11,7 +11,7 @@ from io_worker import read_h5_reports
 from log_config import setup_logging
 
-from shared import unique_id, BUCKET_NAME, PROCESS_TYPES, handle_exception
+from shared import unique_id, BUCKET_NAME, handle_exception
 from io_worker import get_sbml_species_names, get_sbml_model_file_from_archive, read_report_outputs, download_file, format_smoldyn_configuration, write_uploaded_file
 from output_data import (
     generate_biosimulator_utc_outputs,
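
Reviewer note (not part of the patch): a minimal usage sketch of the shared connector introduced above, showing how the API might enqueue a job and how the worker might pick it up. The MONGO_URI environment variable name is an assumption made for illustration; everything else (MongoDbConnector, DatabaseCollections, JobStatus, DB_NAME, unique_id) comes from the new shared.py.

# usage_sketch.py -- illustrative only; assumes MONGO_URI is set in the environment
import asyncio
import os

from shared import DB_NAME, DatabaseCollections, JobStatus, MongoDbConnector, unique_id


async def demo():
    # one connector instance shared by a service
    connector = MongoDbConnector(connection_uri=os.getenv("MONGO_URI"), database_id=DB_NAME)

    # API side: enqueue a job for the worker
    job_id = unique_id()
    await connector.write(
        DatabaseCollections.PENDING_JOBS,
        job_id=job_id,
        status=JobStatus.PENDING,
        timestamp=connector.timestamp(),
    )

    # worker side: look the job up and mark it in progress
    job = await connector.read(DatabaseCollections.PENDING_JOBS, job_id=job_id)
    await connector.update_job_status("pending_jobs", job_id, JobStatus.IN_PROGRESS)


if __name__ == "__main__":
    asyncio.run(demo())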