From 6c6b71349d41e024bf799e6e94a8fc5ebf3a4f2e Mon Sep 17 00:00:00 2001 From: Werner Van Geit Date: Mon, 19 Aug 2024 10:44:25 +0200 Subject: [PATCH] Dashboard (#9) * Locally working dynamic settings version * Bump version * Fix waiting settings file * Rename config file to settings.json * Bump version * Switch to alpine node * Working version of dashboard * Remove the ability to change the job status * Add hover boxes etc * Use light grey for todo in dashboard * Changed direction of progress bar --- .bumpversion.cfg | 2 +- .../osparc-meta-parallelrunner/metadata.yml | 24 +- Dockerfile | 32 ++- Makefile | 25 +- docker-compose-local.yml | 2 +- docker_scripts/entrypoint.bash | 9 +- docker_scripts/http/dashboard/index.html | 12 + docker_scripts/http/dashboard/package.json | 21 ++ docker_scripts/http/dashboard/src/App.jsx | 162 +++++++++++ docker_scripts/http/dashboard/src/index.css | 16 ++ docker_scripts/http/dashboard/src/main.jsx | 10 + .../http/dashboard/tailwind.config.js | 15 + docker_scripts/http/dashboard/vite.config.js | 17 ++ docker_scripts/http/index.html | 28 -- docker_scripts/http/server/jobs.json | 2 + docker_scripts/http/server/package.json | 17 ++ docker_scripts/http/server/server.js | 52 ++++ docker_scripts/main.bash | 16 +- docker_scripts/main.py | 155 ++++++----- docker_scripts/parallelrunner.py | 263 +++++++++--------- docker_scripts/requirements.txt | 6 + docker_scripts/tools.py | 43 +++ validation/inputs/input_1/settings.json | 6 + validation/inputs/input_3/parallelrunner.json | 4 - validation/inputs/key_values.json | 9 - 25 files changed, 663 insertions(+), 285 deletions(-) create mode 100755 docker_scripts/http/dashboard/index.html create mode 100755 docker_scripts/http/dashboard/package.json create mode 100755 docker_scripts/http/dashboard/src/App.jsx create mode 100755 docker_scripts/http/dashboard/src/index.css create mode 100755 docker_scripts/http/dashboard/src/main.jsx create mode 100755 docker_scripts/http/dashboard/tailwind.config.js create mode 100755 docker_scripts/http/dashboard/vite.config.js delete mode 100755 docker_scripts/http/index.html create mode 100755 docker_scripts/http/server/jobs.json create mode 100755 docker_scripts/http/server/package.json create mode 100755 docker_scripts/http/server/server.js create mode 100755 docker_scripts/tools.py create mode 100644 validation/inputs/input_1/settings.json delete mode 100644 validation/inputs/input_3/parallelrunner.json delete mode 100755 validation/inputs/key_values.json diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 92d9fd4..0d7e44a 100755 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.2.1 +current_version = 1.0.1 commit = False message = service version: {current_version} → {new_version} tag = False diff --git a/.osparc/osparc-meta-parallelrunner/metadata.yml b/.osparc/osparc-meta-parallelrunner/metadata.yml index 073d576..f9e476a 100755 --- a/.osparc/osparc-meta-parallelrunner/metadata.yml +++ b/.osparc/osparc-meta-parallelrunner/metadata.yml @@ -1,7 +1,7 @@ name: ParallelRunner description: "ParallelRunnerService" key: simcore/services/dynamic/osparc-meta-parallelrunner -version: 0.2.1 +version: 1.0.1 integration-version: 2.0.0 type: dynamic authors: @@ -10,30 +10,18 @@ authors: affiliation: IT'IS Foundation contact: vangeit@itis.swiss inputs: - input_0: - displayOrder: 0.0 - label: Template ID - description: - Template ID - type: string input_1: - displayOrder: 1.0 - label: Number of workers + displayOrder: 0.0 + label: Settings description: - 
Number of parallel workers to use to run the jobs - type: integer + JSON file with settings for the parallel runner + type: data:*/* input_2: - displayOrder: 2.0 + displayOrder: 1.0 label: Input parameters description: File with the parameter sets to evaluate type: data:*/* - input_3: - displayOrder: 2.0 - label: Settings - description: - JSON file with settings for the parallel runner - type: data:*/* outputs: output_1: displayOrder: 1.0 diff --git a/Dockerfile b/Dockerfile index 25a4eb9..fbfbe48 100755 --- a/Dockerfile +++ b/Dockerfile @@ -1,29 +1,43 @@ -FROM ubuntu:22.04 as base +FROM node:alpine as base -RUN useradd -m -r osparcuser +RUN adduser osparcuser --disabled-password USER root ENV DEBIAN_FRONTEND noninteractive ENV DEBCONF_NOWARNINGS="yes" -RUN apt-get update --yes && apt-get upgrade --yes -RUN apt-get install -y --no-install-recommends apt-utils -RUN apt-get install --yes --no-install-recommends python3 python-is-python3 python3-venv wget python3-pip gosu - +RUN apk update && apk upgrade +RUN apk add --no-cache python3 py3-pip wget bash su-exec # Copying boot scripts COPY docker_scripts /docker -RUN pip3 install pathos osparc pydantic-settings osparc-filecomms --upgrade - USER osparcuser WORKDIR /home/osparcuser +RUN python3 -m venv venv +RUN . ./venv/bin/activate && pip3 install -r /docker/requirements.txt USER root +WORKDIR /docker/http +RUN npm install vite @vitejs/plugin-react --save-dev +RUN npm create vite@latest dashboard -- --template react + +WORKDIR /docker/http/dashboard +RUN npm install +RUN npm install -D tailwindcss@latest postcss@latest autoprefixer@latest +RUN npx tailwindcss init -p + +WORKDIR /docker/http/server +RUN chown osparcuser:osparcuser jobs.json + +RUN npm install express +RUN npm run build + +USER root EXPOSE 8888 +ENV JOBS_STATUS_PATH=/docker/http/server/jobs.json ENTRYPOINT [ "/bin/bash", "-c", "/docker/entrypoint.bash" ] -CMD [ "/bin/bash", "-c", "/docker/runner.bash "] diff --git a/Makefile b/Makefile index 7e412fe..81bae79 100755 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ SHELL = /bin/sh MAKEFLAGS += -j2 export DOCKER_IMAGE_NAME ?= osparc-meta-parallelrunner -export DOCKER_IMAGE_TAG ?= 0.2.1 +export DOCKER_IMAGE_TAG ?= 1.0.1 export MASTER_AWS_REGISTRY ?= registry.osparc-master-zmt.click export MASTER_REGISTRY ?= registry.osparc-master.speag.com @@ -35,25 +35,25 @@ compose-spec: ## runs ooil to assemble the docker-compose.yml file sh -c "cd /${DOCKER_IMAGE_NAME} && ooil compose" clean: - rm -rf docker-compose.yml + @rm -rf docker-compose.yml .PHONY: build build: clean compose-spec ## build docker image - chmod -R 755 docker_scripts - docker compose build + @chmod -R 755 docker_scripts + @docker compose build validation-clean: - rm -rf validation-tmp - cp -r validation validation-tmp - chmod -R 770 validation-tmp + @rm -rf validation-tmp + @cp -r validation validation-tmp + @chmod -R 770 validation-tmp validation_client_run: validation-clean - pip install osparc-filecomms - VALIDATION_CLIENT_INPUT_PATH=validation-tmp/outputs/output_1 VALIDATION_CLIENT_OUTPUT_PATH=validation-tmp/inputs/input_2 python validation-client/client.py + @pip install osparc-filecomms + @VALIDATION_CLIENT_INPUT_PATH=validation-tmp/outputs/output_1 VALIDATION_CLIENT_OUTPUT_PATH=validation-tmp/inputs/input_2 python validation-client/client.py docker_compose: validation-clean - docker compose down - docker compose --file docker-compose-local.yml up + @docker compose down + @docker compose --file docker-compose-local.yml up run-local-parallel: docker_compose 
validation_client_run @@ -65,13 +65,11 @@ run-local: build publish-local: run-local ## push to local throw away registry to test integration docker tag simcore/services/dynamic/${DOCKER_IMAGE_NAME}:${DOCKER_IMAGE_TAG} $(LOCAL_REGISTRY)/simcore/services/dynamic/$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_TAG) docker push $(LOCAL_REGISTRY)/simcore/services/dynamic/$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_TAG) - @curl $(LOCAL_REGISTRY)/v2/_catalog | jq .PHONY: publish-master publish-master: run-local ## push to local throw away registry to test integration docker tag simcore/services/dynamic/${DOCKER_IMAGE_NAME}:${DOCKER_IMAGE_TAG} $(MASTER_REGISTRY)/simcore/services/dynamic/$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_TAG) docker push $(MASTER_REGISTRY)/simcore/services/dynamic/$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_TAG) - @curl $(MASTER_REGISTRY)/v2/_catalog | jq .PHONY: publish-staging publish-staging: run-local ## push to local throw away registry to test integration @@ -82,7 +80,6 @@ publish-staging: run-local ## push to local throw away registry to test integrat publish-master-aws: ## push to local throw away registry to test integration docker tag simcore/services/dynamic/${DOCKER_IMAGE_NAME}:${DOCKER_IMAGE_TAG} $(MASTER_AWS_REGISTRY)/simcore/services/dynamic/$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_TAG) docker push $(MASTER_AWS_REGISTRY)/simcore/services/dynamic/$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_TAG) - @curl $(MASTER_AWS_REGISTRY)/v2/_catalog | jq .PHONY: help help: ## this colorful help diff --git a/docker-compose-local.yml b/docker-compose-local.yml index 64e005f..a2e6d25 100755 --- a/docker-compose-local.yml +++ b/docker-compose-local.yml @@ -1,6 +1,6 @@ services: osparc-meta-parallelrunner: - image: simcore/services/dynamic/osparc-meta-parallelrunner:0.2.1 + image: simcore/services/dynamic/osparc-meta-parallelrunner:1.0.1 ports: - "8888:8888" environment: diff --git a/docker_scripts/entrypoint.bash b/docker_scripts/entrypoint.bash index 7a95a27..b084a75 100755 --- a/docker_scripts/entrypoint.bash +++ b/docker_scripts/entrypoint.bash @@ -1,14 +1,17 @@ #!/bin/bash set -euo pipefail + IFS=$'\n\t' INFO="INFO: [$(basename "$0")] " echo "$INFO" "Starting container for parallelrunner ..." +echo "$JOBS_STATUS_PATH" + HOST_USERID=$(stat -c %u "${DY_SIDECAR_PATH_INPUTS}") HOST_GROUPID=$(stat -c %g "${DY_SIDECAR_PATH_INPUTS}") -CONTAINER_GROUPNAME=$(getent group | grep "${HOST_GROUPID}" | cut --delimiter=: --fields=1 || echo "") +CONTAINER_GROUPNAME=$(grep ":${HOST_GROUPID}:" /etc/group | cut -d: -f1 || echo "") OSPARC_USER='osparcuser' @@ -27,7 +30,7 @@ else fi echo "adding $OSPARC_USER to group $CONTAINER_GROUPNAME..." - usermod --append --groups "$CONTAINER_GROUPNAME" "$OSPARC_USER" + addgroup "$OSPARC_USER" "$CONTAINER_GROUPNAME" echo "changing owner ship of state directory /home/${OSPARC_USER}/work/workspace" chown --recursive "$OSPARC_USER" "/home/${OSPARC_USER}/work/workspace" @@ -37,4 +40,4 @@ else chown --recursive "$OSPARC_USER" "${DY_SIDECAR_PATH_OUTPUTS}" fi -exec gosu "$OSPARC_USER" /docker/main.bash +exec su-exec "$OSPARC_USER" /docker/main.bash diff --git a/docker_scripts/http/dashboard/index.html b/docker_scripts/http/dashboard/index.html new file mode 100755 index 0000000..9953913 --- /dev/null +++ b/docker_scripts/http/dashboard/index.html @@ -0,0 +1,12 @@ + + + + + + Job Status Dashboard + + +
+ + + diff --git a/docker_scripts/http/dashboard/package.json b/docker_scripts/http/dashboard/package.json new file mode 100755 index 0000000..01902c3 --- /dev/null +++ b/docker_scripts/http/dashboard/package.json @@ -0,0 +1,21 @@ +{ + "name": "job-status-dashboard", + "private": true, + "version": "0.0.0", + "type": "module", + "scripts": { + "dev": "vite", + "build": "vite build", + "preview": "vite preview" + }, + "dependencies": { + "react": "^18.2.0", + "react-dom": "^18.2.0" + }, + "devDependencies": { + "@types/react": "^18.0.28", + "@types/react-dom": "^18.0.11", + "@vitejs/plugin-react": "^3.1.0", + "vite": "^4.2.0" + } +} diff --git a/docker_scripts/http/dashboard/src/App.jsx b/docker_scripts/http/dashboard/src/App.jsx new file mode 100755 index 0000000..6860d70 --- /dev/null +++ b/docker_scripts/http/dashboard/src/App.jsx @@ -0,0 +1,162 @@ +import React, { useState, useEffect, useCallback } from 'react' + +const statusColors = { + done: 'bg-green-700', + failed: 'bg-red-700', + running: 'bg-blue-700', + todo: 'bg-gray-300' +}; + +const StatusIcon = ({ status }) => { + switch (status) { + case 'todo': + return ( + + + + ); + case 'running': + return ( + + + + ); + case 'done': + return ( + + + + ); + case 'failed': + return ( + + + + ); + default: + return null; + } +}; + +const ProgressBar = ({ jobsByStatus }) => { + const total = Object.values(jobsByStatus).reduce((acc, jobs) => acc + Object.keys(jobs).length, 0); + const widths = { + done: (Object.keys(jobsByStatus.done).length / total) * 100, + failed: (Object.keys(jobsByStatus.failed).length / total) * 100, + running: (Object.keys(jobsByStatus.running).length / total) * 100, + todo: (Object.keys(jobsByStatus.todo).length / total) * 100 + }; + + return ( +
+ {Object.entries(widths).map(([status, width]) => ( +
+ {status === 'running' && ( +
+
+
+ )} +
+ ))} +
+ ); +}; + +const JobCard = ({ job }) => ( +
+
+
+

{job.name}

+ +
+
+
+
+

{job.name}

+ +
+

{job.description}

+
+
+); + +const StatusColumn = ({ title, jobs }) => ( +
+

{title}

+

Jobs: {Object.keys(jobs).length}

+
+ {Object.entries(jobs).map(([id, job]) => ( + + ))} +
+
+); + +const Dashboard = () => { + const [jobs, setJobs] = useState({}); + const [loading, setLoading] = useState(true); + const [error, setError] = useState(null); + + const fetchJobs = useCallback(async () => { + try { + const response = await fetch('/api/jobs'); + if (!response.ok) { + throw new Error(`HTTP error! status: ${response.status}`); + } + const data = await response.json(); + setJobs(data); + setLoading(false); + } catch (e) { + console.error("Failed to fetch jobs:", e); + setError("Failed to load jobs. Please try again later."); + setLoading(false); + } + }, []); + + useEffect(() => { + fetchJobs(); + const intervalId = setInterval(fetchJobs, 1000); // Refresh every second + + return () => clearInterval(intervalId); + }, [fetchJobs]); + + const jobsByStatus = Object.entries(jobs).reduce((acc, [id, job]) => { + if (!acc[job.status]) acc[job.status] = {}; + acc[job.status][id] = job; + return acc; + }, {todo: {}, running: {}, done: {}, failed: {}}); + + if (loading) { + return
Loading...
; + } + + if (error) { + return
{error}
; + } + + return ( +
+ +
+ + + + +
+
+ ); +}; + +const App = () => { + return ( +
+ +
+ ); +} + +export default App; diff --git a/docker_scripts/http/dashboard/src/index.css b/docker_scripts/http/dashboard/src/index.css new file mode 100755 index 0000000..ddbaabb --- /dev/null +++ b/docker_scripts/http/dashboard/src/index.css @@ -0,0 +1,16 @@ +@tailwind base; +@tailwind components; +@tailwind utilities; + +@keyframes progress-mac { + 0% { + transform: translateX(-100%); + } + 100% { + transform: translateX(0); + } +} + +.animate-progress-mac { + animation: progress-mac 2s linear infinite; +} diff --git a/docker_scripts/http/dashboard/src/main.jsx b/docker_scripts/http/dashboard/src/main.jsx new file mode 100755 index 0000000..54b39dd --- /dev/null +++ b/docker_scripts/http/dashboard/src/main.jsx @@ -0,0 +1,10 @@ +import React from 'react' +import ReactDOM from 'react-dom/client' +import App from './App.jsx' +import './index.css' + +ReactDOM.createRoot(document.getElementById('root')).render( + + + , +) diff --git a/docker_scripts/http/dashboard/tailwind.config.js b/docker_scripts/http/dashboard/tailwind.config.js new file mode 100755 index 0000000..3daec51 --- /dev/null +++ b/docker_scripts/http/dashboard/tailwind.config.js @@ -0,0 +1,15 @@ +/** @type {import('tailwindcss').Config} */ +export default { + content: [ + "./index.html", + "./src/**/*.{js,ts,jsx,tsx}", + ], + theme: { + extend: { + animation: { + 'progress-mac': 'progress-mac 2s linear infinite', + }, + }, + }, + plugins: [], +} diff --git a/docker_scripts/http/dashboard/vite.config.js b/docker_scripts/http/dashboard/vite.config.js new file mode 100755 index 0000000..69b4764 --- /dev/null +++ b/docker_scripts/http/dashboard/vite.config.js @@ -0,0 +1,17 @@ +import { defineConfig } from 'vite' +import react from '@vitejs/plugin-react' +import path from 'path' + +export default defineConfig({ + plugins: [react()], + root: path.resolve(__dirname, './'), + build: { + outDir: path.resolve(__dirname, '../server/dist'), + emptyOutDir: true, + }, + server: { + proxy: { + '/api': 'http://localhost:8888' + } + } +}) diff --git a/docker_scripts/http/index.html b/docker_scripts/http/index.html deleted file mode 100755 index 97fea85..0000000 --- a/docker_scripts/http/index.html +++ /dev/null @@ -1,28 +0,0 @@ - - - - - - - -

-oSparc Meta Parallel Runner -

- - - - - - diff --git a/docker_scripts/http/server/jobs.json b/docker_scripts/http/server/jobs.json new file mode 100755 index 0000000..2c63c08 --- /dev/null +++ b/docker_scripts/http/server/jobs.json @@ -0,0 +1,2 @@ +{ +} diff --git a/docker_scripts/http/server/package.json b/docker_scripts/http/server/package.json new file mode 100755 index 0000000..0a23d5f --- /dev/null +++ b/docker_scripts/http/server/package.json @@ -0,0 +1,17 @@ +{ + "name": "job-status-server", + "version": "1.0.0", + "description": "Server for Job Status Dashboard", + "main": "server.js", + "scripts": { + "start": "node server.js", + "build": "cd ../dashboard && npm install && npm run build", + "dev": "nodemon server.js" + }, + "dependencies": { + "express": "^4.17.1" + }, + "devDependencies": { + "nodemon": "^2.0.12" + } +} diff --git a/docker_scripts/http/server/server.js b/docker_scripts/http/server/server.js new file mode 100755 index 0000000..9f98635 --- /dev/null +++ b/docker_scripts/http/server/server.js @@ -0,0 +1,52 @@ +const express = require('express'); +const fs = require('fs').promises; +const path = require('path'); + +const app = express(); +const port = 8888; + +app.use(express.json()); + +const dataFile = path.join(__dirname, 'jobs.json'); + +app.get('/api/jobs', async (req, res) => { + try { + const data = await fs.readFile(dataFile, 'utf8'); + res.json(JSON.parse(data)); + } catch (err) { + console.error('Error reading file:', err); + res.status(500).json({ error: 'Error reading job data' }); + } +}); + +app.put('/api/jobs/:id', async (req, res) => { + const { id } = req.params; + const { status } = req.body; + + try { + const data = await fs.readFile(dataFile, 'utf8'); + const jobs = JSON.parse(data); + + if (!jobs[id]) { + return res.status(404).json({ error: 'Job not found' }); + } + + jobs[id].status = status; + await fs.writeFile(dataFile, JSON.stringify(jobs, null, 2)); + res.json(jobs[id]); + } catch (err) { + console.error('Error updating job:', err); + res.status(500).json({ error: 'Error updating job' }); + } +}); + +app.use(express.static(path.join(__dirname, 'dist'))); + +app.get('*', (req, res) => { + res.sendFile(path.join(__dirname, 'dist', 'index.html')); +}); + +app.listen(port, () => { + console.log(`Server running at http://localhost:${port}`); +}); + diff --git a/docker_scripts/main.bash b/docker_scripts/main.bash index c9aa4a3..b0aea7c 100755 --- a/docker_scripts/main.bash +++ b/docker_scripts/main.bash @@ -1,6 +1,14 @@ #!/bin/bash -pip install -r /docker/requirements.txt -echo "Starting map python code" -python3 /docker/main.py -echo "Closing map python code" +set -e + +echo "Starting webserver" +cd /docker/http/server +npm start & + +echo "Starting parallelrunner python code" +cd ${HOME} +source ./venv/bin/activate +python /docker/main.py + +echo "Closing parallelrunner" diff --git a/docker_scripts/main.py b/docker_scripts/main.py index a44a61a..11697d4 100755 --- a/docker_scripts/main.py +++ b/docker_scripts/main.py @@ -1,15 +1,11 @@ -import http.server import json import logging -import pathlib as pl -import socketserver -import threading -import time import pydantic as pyda import pydantic_settings import parallelrunner +import tools logging.basicConfig( level=logging.INFO, format="[%(filename)s:%(lineno)d] %(message)s" @@ -17,82 +13,109 @@ logger = logging.getLogger(__name__) HTTP_PORT = 8888 -INPUT_CONF_KEY = "input_3" +INPUT_CONF_KEY = "input_1" CONF_SCHEMA_KEY = "conf_json_schema" -FILE_POLLING_INTERVAL = 1 # second +DEFAULT_FILE_POLLING_INTERVAL = 1 # second 
-MAX_JOB_CREATE_ATTEMPTS = 5 -JOB_CREATE_ATTEMPTS_DELAY = 5 -MAX_JOB_TRIALS = 5 +DEFAULT_NUMBER_OF_WORKERS = 1 +DEFAULT_TEMPLATE_ID = "" -JOB_TIMEOUT = None +DEFAULT_MAX_JOB_CREATE_ATTEMPTS = 5 +DEFAULT_JOB_CREATE_ATTEMPTS_DELAY = 5 +DEFAULT_MAX_JOB_TRIALS = 5 + +DEFAULT_JOB_TIMEOUT = 0 + +MAX_N_OF_WORKERS = 10 def main(): """Main""" - settings = ParallelRunnerMainSettings() - settings_schema = settings.model_json_schema() - logger.info(settings_schema) - conf_json_schema_path = ( - settings.output_path / CONF_SCHEMA_KEY / "schema.json" + settings = ParallelRunnerDynamicSettings() + + # Wait for and read the settings file + logger.info( + f"Waiting for settings file to appear at {settings.settings_file_path}" ) - conf_json_schema_path.write_text(json.dumps(settings_schema, indent=2)) + settings.read_settings_file() + logger.info("Settings file was read") - config_path = settings.input_path / INPUT_CONF_KEY / "parallelrunner.json" + # Create and start the maprunner + maprunner = parallelrunner.ParallelRunner(settings) + maprunner.setup() + maprunner.start() - http_dir_path = pl.Path(__file__).parent / "http" + # Stop the maprunner + maprunner.teardown() - class HTTPHandler(http.server.SimpleHTTPRequestHandler): - def __init__(self, *args, **kwargs): - super().__init__( - *args, **kwargs, directory=http_dir_path.resolve() - ) - try: - logger.info( - f"Starting http server at port {HTTP_PORT} and serving path {http_dir_path}" +class ParallelRunnerDynamicSettings: + def __init__(self): + self._settings = self.ParallelRunnerMainSettings() + conf_json_schema_path = ( + self._settings.output_path / CONF_SCHEMA_KEY / "schema.json" ) - with socketserver.TCPServer(("", HTTP_PORT), HTTPHandler) as httpd: - # First start the empty web server - httpd_thread = threading.Thread(target=httpd.serve_forever) - httpd_thread.start() - - # Now start the real parallel runner - waiter = 0 - while not config_path.exists(): - if waiter % 10 == 0: - logger.info("Waiting for parallelrunner.json to exist ...") - time.sleep(settings.file_polling_interval) - waiter += 1 - - settings = settings.parse_file(config_path) - logging.info(f"Received the following settings: {settings}") - - maprunner = parallelrunner.ParallelRunner(**settings.dict()) - - maprunner.setup() - maprunner.start() - maprunner.teardown() - - httpd.shutdown() - except Exception as err: # pylint: disable=broad-except - logger.error(f"{err} . 
Stopping %s", exc_info=True) - - -class ParallelRunnerMainSettings(pydantic_settings.BaseSettings): - batch_mode: bool = False - file_polling_interval: int = FILE_POLLING_INTERVAL - input_path: pyda.DirectoryPath = pyda.Field(alias="DY_SIDECAR_PATH_INPUTS") - output_path: pyda.DirectoryPath = pyda.Field( - alias="DY_SIDECAR_PATH_OUTPUTS" - ) - max_job_trials: int = MAX_JOB_TRIALS - file_polling_interval: int = FILE_POLLING_INTERVAL - max_job_create_attempts: int = MAX_JOB_CREATE_ATTEMPTS - job_create_attempts_delay: int = JOB_CREATE_ATTEMPTS_DELAY - job_timeout: None | float = JOB_TIMEOUT + + settings_schema = self._settings.model_json_schema() + + # Hide some settings from the user + for field_name in [ + "max_number_of_workers", + "JOBS_STATUS_PATH", + "DY_SIDECAR_PATH_INPUTS", + "DY_SIDECAR_PATH_OUTPUTS", + ]: + settings_schema["properties"].pop(field_name) + + conf_json_schema_path.write_text(json.dumps(settings_schema, indent=2)) + + self.settings_file_path = ( + self._settings.input_path / INPUT_CONF_KEY / "settings.json" + ) + + def __getattr__(self, name): + if name in self.__dict__: + return self.__dict__[name] + else: + self.read_settings_file() + return getattr(self._settings, name) + + def read_settings_file(self): + tools.wait_for_path(self.settings_file_path) + self._settings = self._settings.parse_file(self.settings_file_path) + + class ParallelRunnerMainSettings(pydantic_settings.BaseSettings): + template_id: str = pyda.Field(default=DEFAULT_TEMPLATE_ID) + max_number_of_workers: int = pyda.Field( + MAX_N_OF_WORKERS, allow_mutation=False + ) + number_of_workers: int = pyda.Field( + default=DEFAULT_NUMBER_OF_WORKERS, gt=0, le=MAX_N_OF_WORKERS + ) + batch_mode: bool = pyda.Field(default=False) + file_polling_interval: int = pyda.Field( + default=DEFAULT_FILE_POLLING_INTERVAL + ) + input_path: pyda.DirectoryPath = pyda.Field( + alias="DY_SIDECAR_PATH_INPUTS" + ) + output_path: pyda.DirectoryPath = pyda.Field( + alias="DY_SIDECAR_PATH_OUTPUTS" + ) + max_job_trials: int = pyda.Field(default=DEFAULT_MAX_JOB_TRIALS, gt=0) + file_polling_interval: int = pyda.Field( + DEFAULT_FILE_POLLING_INTERVAL, gt=0 + ) + max_job_create_attempts: int = pyda.Field( + default=DEFAULT_MAX_JOB_CREATE_ATTEMPTS, gt=0 + ) + job_create_attempts_delay: int = pyda.Field( + default=DEFAULT_JOB_CREATE_ATTEMPTS_DELAY, gt=0 + ) + job_timeout: float = pyda.Field(default=DEFAULT_JOB_TIMEOUT, ge=0) + jobs_status_path: pyda.FilePath = pyda.Field(alias="JOBS_STATUS_PATH") if __name__ == "__main__": diff --git a/docker_scripts/parallelrunner.py b/docker_scripts/parallelrunner.py index d542179..cadfd6f 100755 --- a/docker_scripts/parallelrunner.py +++ b/docker_scripts/parallelrunner.py @@ -2,6 +2,7 @@ import getpass import json import logging +import multiprocessing import os import pathlib as pl import tempfile @@ -15,50 +16,27 @@ import pathos from osparc_filecomms import handshakers +import tools + logging.basicConfig( level=logging.INFO, format="[%(filename)s:%(lineno)d] %(message)s" ) logger = logging.getLogger(__name__) -MAX_N_OF_WORKERS = 10 - -TEMPLATE_ID_KEY = "input_0" -N_OF_WORKERS_KEY = "input_1" -INPUT_PARAMETERS_KEY = "input_2" - - class ParallelRunner: - def __init__( - self, - input_path, - output_path, - max_n_of_workers=MAX_N_OF_WORKERS, - batch_mode=False, - file_polling_interval=None, - max_job_trials=None, - max_job_create_attempts=None, - job_create_attempts_delay=None, - job_timeout=None, - ): + def __init__(self, settings): """Constructor""" - self.test_mode = False + self.settings = settings 
- self.batch_mode = batch_mode - self.max_n_of_workers = max_n_of_workers - self.max_job_trials = max_job_trials - self.max_job_create_attempts = max_job_create_attempts - self.job_create_attempts_delay = job_create_attempts_delay - self.job_timeout = job_timeout + self.test_mode = False - self.input_path = input_path # path where osparc write all our input - self.output_path = output_path # path where osparc write all our input - self.key_values_path = self.input_path / "key_values.json" + self.key_values_path = self.settings.input_path / "key_values.json" - self.input_tasks_dir_path = self.input_path / "input_2" + self.input_tasks_dir_path = self.settings.input_path / "input_2" self.input_tasks_path = self.input_tasks_dir_path / "input_tasks.json" - self.output_tasks_dir_path = self.output_path / "output_1" + self.output_tasks_dir_path = self.settings.output_path / "output_1" self.output_tasks_path = ( self.output_tasks_dir_path / "output_tasks.json" ) @@ -66,7 +44,6 @@ def __init__( if self.output_tasks_path.exists(): self.output_tasks_path.unlink() - self.file_polling_interval = file_polling_interval self.caller_uuid = None self.uuid = str(uuid.uuid4()) @@ -79,12 +56,11 @@ def __init__( polling_interval=0.1, print_polling_interval=100, ) + self.lock = multiprocessing.Lock() def setup(self): """Setup the Python Runner""" - logger.info(f"Using host: [{os.environ['OSPARC_API_HOST']}]") - logger.info(f"Using key: [{os.environ['OSPARC_API_KEY']}]") - logger.info(f"Using secret: [{os.environ['OSPARC_API_SECRET']}]") + logger.info(f"Using API host: [{os.environ['OSPARC_API_HOST']}]") self.osparc_cfg = osparc.Configuration( host=os.environ["OSPARC_API_HOST"], username=os.environ["OSPARC_API_KEY"], @@ -104,70 +80,36 @@ def start(self): logger.info("Starting map ...") logger.info(f"User: {getpass.getuser()}, UID: {os.getuid()}") - logger.info(f"Input path: {self.input_path.resolve()}") - - waiter = 0 - while not self.key_values_path.exists(): - if waiter % 10 == 0: - logger.info("Waiting for key_values.json to exist ...") - time.sleep(self.file_polling_interval) - waiter += 1 - - key_values = json.loads(self.key_values_path.read_text()) + logger.info(f"Input path: {self.settings.input_path.resolve()}") self.caller_uuid = self.handshaker.shake() logger.info(f"Performed handshake with caller: {self.caller_uuid}") - waiter = 0 - while ( - INPUT_PARAMETERS_KEY not in key_values - or key_values[INPUT_PARAMETERS_KEY]["value"] is None - or TEMPLATE_ID_KEY not in key_values - or key_values[TEMPLATE_ID_KEY]["value"] is None - or N_OF_WORKERS_KEY not in key_values - or key_values[N_OF_WORKERS_KEY]["value"] is None - ): - if waiter % 10 == 0: - logger.info( - "Waiting for all required keys to " - f"exist in key_values, current content: {key_values}..." 
- ) - key_values = json.loads(self.key_values_path.read_text()) - time.sleep(self.file_polling_interval) - waiter += 1 - - self.template_id = key_values[TEMPLATE_ID_KEY]["value"] - if self.template_id is None: + if self.settings.template_id is None: raise ValueError("Template ID can't be None") - if self.template_id == "TEST_UUID": + if self.settings.template_id == "TEST_UUID": self.test_mode = True self.api_client = None self.studies_api = None - n_of_workers = key_values[N_OF_WORKERS_KEY]["value"] - if n_of_workers is None: + if self.settings.number_of_workers is None: raise ValueError("Number of workers can't be None") - elif n_of_workers > self.max_n_of_workers: - logger.warning( - "Attempt to set number of workers to more than " - f"is allowed ({self.max_n_of_workers}), limiting value " - "to maximum amount" + elif ( + self.settings.number_of_workers + > self.settings.max_number_of_workers + ): + raise ValueError( + "Attempt to set number of workers to " + f"{self.settings.number_of_workers} which is more than " + f"is allowed ({self.settings.max_number_of_workers}), " + "limit value to maximum amount" ) - n_of_workers = self.max_n_of_workers last_tasks_uuid = "" waiter_wrong_uuid = 0 while True: - waiter_input_exists = 0 - while not self.input_tasks_path.exists(): - if waiter_input_exists % 10 == 0: - logger.info( - f"Waiting for input file at {self.input_tasks_path}..." - ) - self.handshaker.retry_last_write() - time.sleep(self.file_polling_interval) - waiter_input_exists += 1 + tools.wait_for_path(self.input_tasks_path) input_dict = json.loads(self.input_tasks_path.read_text()) command = input_dict["command"] @@ -179,7 +121,7 @@ def start(self): "Received command with wrong caller uuid: " f"{caller_uuid} or map uuid: {map_uuid}" ) - time.sleep(self.file_polling_interval) + time.sleep(self.settings.file_polling_interval) waiter_wrong_uuid += 1 continue @@ -191,40 +133,42 @@ def start(self): if tasks_uuid == last_tasks_uuid: if waiter_wrong_uuid % 10 == 0: logger.info("Waiting for new tasks uuid") - time.sleep(self.file_polling_interval) + time.sleep(self.settings.file_polling_interval) waiter_wrong_uuid += 1 else: input_tasks = input_dict["tasks"] - if self.batch_mode: - n_of_batches = n_of_workers - else: - n_of_batches = len(input_tasks) - - input_batches = self.batch_input_tasks( - input_tasks, n_of_batches - ) - - output_batches = self.run_batches( - tasks_uuid, input_batches, n_of_workers - ) - - output_tasks = self.unbatch_output_tasks(output_batches) - - output_tasks_content = json.dumps( - {"uuid": tasks_uuid, "tasks": output_tasks} - ) - self.output_tasks_path.write_text(output_tasks_content) - logger.info(f"Finished a set of {len(output_tasks)} tasks") - logger.debug( - f"Finished a set of tasks: {output_tasks_content}" - ) + self.run_input_tasks(input_tasks, tasks_uuid) last_tasks_uuid = tasks_uuid waiter_wrong_uuid = 0 else: raise ValueError("Command unknown: {command}") - time.sleep(self.file_polling_interval) + time.sleep(self.settings.file_polling_interval) + + def run_input_tasks(self, input_tasks, tasks_uuid): + number_of_workers = self.settings.number_of_workers + batch_mode = self.settings.batch_mode + + if batch_mode: + n_of_batches = number_of_workers + else: + n_of_batches = len(input_tasks) + + input_batches = self.batch_input_tasks(input_tasks, n_of_batches) + + output_batches = self.run_batches( + tasks_uuid, input_batches, number_of_workers + ) + + output_tasks = self.unbatch_output_tasks(output_batches) + + output_tasks_content = json.dumps( + {"uuid": 
tasks_uuid, "tasks": output_tasks} + ) + self.output_tasks_path.write_text(output_tasks_content) + logger.info(f"Finished a set of {len(output_tasks)} tasks") + logger.debug(f"Finished a set of tasks: {output_tasks_content}") def batch_input_tasks(self, input_tasks, n_of_batches): batches = [[] for _ in range(n_of_batches)] @@ -304,15 +248,15 @@ def run_job(self, job_inputs, input_batch): return done_batch with self.create_study_job( - self.template_id, job_inputs, self.studies_api + self.settings.template_id, job_inputs, self.studies_api ) as job: job_status = self.studies_api.start_study_job( - study_id=self.template_id, job_id=job.id + study_id=self.settings.template_id, job_id=job.id ) while job_status.stopped_at is None: job_status = self.studies_api.inspect_study_job( - study_id=self.template_id, job_id=job.id + study_id=self.settings.template_id, job_id=job.id ) time.sleep(1) @@ -325,7 +269,7 @@ def run_job(self, job_inputs, input_batch): ) else: job_outputs = self.studies_api.get_study_job_outputs( - study_id=self.template_id, job_id=job.id + study_id=self.settings.template_id, job_id=job.id ).results done_batch = self.process_job_outputs( @@ -335,7 +279,7 @@ def run_job(self, job_inputs, input_batch): return done_batch def process_job_outputs(self, results, batch, status): - if self.template_id == "TEST_UUID": + if self.settings.template_id == "TEST_UUID": logger.info("Map in test mode, just returning input") return batch @@ -382,12 +326,12 @@ def process_job_outputs(self, results, batch, status): else: output[probe_name]["value"] = probe_output - if self.batch_mode and probe_type == "FileJSON": + if self.settings.batch_mode and probe_type == "FileJSON": output[probe_name]["value"] = output[probe_name]["value"][ task_i ] else: - if self.batch_mode: + if self.settings.batch_mode: raise ParallelRunner.FatalException( "Only FileJSON output allowed in batch mode, " f"received {probe_type} for {probe_name} output" @@ -402,7 +346,7 @@ def transform_batch_to_task_input(self, batch): for param_name, param_input in input.items(): param_type = param_input["type"] - if param_type == "FileJSON" and self.batch_mode: + if param_type == "FileJSON" and self.settings.batch_mode: param_filename = param_input["filename"] param_value = param_input["value"] @@ -426,7 +370,31 @@ def transform_batch_to_task_input(self, batch): return task_input - def run_batches(self, tasks_uuid, input_batches, n_of_workers): + def jobs_file_write_new(self, id, name, description, status): + with self.lock: + jobs_statuses = json.loads( + self.settings.jobs_status_path.read_text() + ) + jobs_statuses[id] = { + "name": name, + "description": description, + "status": status, + } + self.settings.jobs_status_path.write_text( + json.dumps(jobs_statuses) + ) + + def jobs_file_write_status_change(self, id, status): + with self.lock: + jobs_statuses = json.loads( + self.settings.jobs_status_path.read_text() + ) + jobs_statuses[id]["status"] = status + self.settings.jobs_status_path.write_text( + json.dumps(jobs_statuses) + ) + + def run_batches(self, tasks_uuid, input_batches, number_of_workers): """Run the tasks""" logger.info(f"Evaluating {len(input_batches)} batches") @@ -434,12 +402,17 @@ def run_batches(self, tasks_uuid, input_batches, n_of_workers): self.n_of_finished_batches = 0 - def map_func(batch, trial_number=1): + def map_func(batch_with_uuid, trial_number=1): + batch_uuid, batch = batch_with_uuid try: logger.info( "Running worker for a batch of " f"{len(batch)} tasks" ) logger.debug(f"Running worker for batch: 
{batch}") + self.jobs_file_write_status_change( + id=batch_uuid, + status="running", + ) task_input = self.transform_batch_to_task_input(batch) @@ -449,30 +422,48 @@ def map_func(batch, trial_number=1): output_batch_waiter = timeout_pool.apipe( self.run_job, job_inputs, batch ) - output_batch = output_batch_waiter.get( - timeout=self.job_timeout + job_timeout = ( + self.settings.job_timeout + if self.settings.job_timeout > 0 + else None ) + output_batch = output_batch_waiter.get(timeout=job_timeout) timeout_pool.close() timeout_pool.join() timeout_pool.clear() # Pool is singleton, need to clear old pool + self.jobs_file_write_status_change( + id=batch_uuid, + status="done", + ) + self.n_of_finished_batches += 1 logger.info( "Worker has finished batch " f"{self.n_of_finished_batches} of {len(input_batches)}" ) + except ParallelRunner.FatalException as error: logger.info( f"Batch {batch} failed with fatal error ({error}) in " f"trial {trial_number}, not retrying, raising error" ) + self.jobs_file_write_status_change( + id=batch_uuid, + status="failed", + ) + raise error except Exception as error: - if trial_number >= self.max_job_trials: + if trial_number >= self.settings.max_job_trials: logger.info( f"Batch {batch} failed with error ({error}) in " f"trial {trial_number}, reach max number of trials of " - f"{self.max_job_trials}, not retrying, raising error" + f"{self.settings.max_job_trials}, not retrying, raising error" + ) + self.jobs_file_write_status_change( + id=batch_uuid, + status="failed", ) raise error else: @@ -487,10 +478,23 @@ def map_func(batch, trial_number=1): return output_batch logger.info( - f"Starting {len(input_batches)} batches on {n_of_workers} workers" + f"Starting {len(input_batches)} batches on {number_of_workers} workers" ) - with pathos.pools.ThreadPool(nodes=n_of_workers) as pool: - output_tasks = list(pool.map(map_func, input_batches)) + + input_batches_with_uuid = [ + (str(uuid.uuid4()), input_batch) for input_batch in input_batches + ] + + for batch_uuid, input_batch in input_batches_with_uuid: + self.jobs_file_write_new( + id=batch_uuid, + name=f"Batch {batch_uuid}", + description=str(input_batch), + status="todo", + ) + + with pathos.pools.ThreadPool(nodes=number_of_workers) as pool: + output_tasks = list(pool.map(map_func, input_batches_with_uuid)) pool.close() pool.join() pool.clear() # Pool is singleton, need to clear old pool @@ -527,7 +531,10 @@ def create_study_job(self, template_id, job_inputs, studies_api): ) break except osparc_client.exceptions.ApiException as api_exception: - if n_of_create_attempts >= self.max_job_create_attempts: + if ( + n_of_create_attempts + >= self.settings.max_job_create_attempts + ): raise Exception( f"Tried {n_of_create_attempts} times to create a job from " "the study, but failed" @@ -542,7 +549,7 @@ def create_study_job(self, template_id, job_inputs, studies_api): "Received an unhandled API Exception from server " "when creating job, retrying..." 
) - time.sleep(self.job_create_attempts_delay) + time.sleep(self.settings.job_create_attempts_delay) try: yield job diff --git a/docker_scripts/requirements.txt b/docker_scripts/requirements.txt index e69de29..fc07272 100755 --- a/docker_scripts/requirements.txt +++ b/docker_scripts/requirements.txt @@ -0,0 +1,6 @@ +pathos +osparc +pydantic +pydantic-settings +osparc-filecomms +watchdog diff --git a/docker_scripts/tools.py b/docker_scripts/tools.py new file mode 100755 index 0000000..582f6b3 --- /dev/null +++ b/docker_scripts/tools.py @@ -0,0 +1,43 @@ +import pathlib as pl + +from watchdog.events import FileSystemEventHandler +from watchdog.observers import Observer + + +def wait_for_path(file_path): + path = pl.Path(file_path).resolve() + if path.exists(): + return str(path) + + # Find the closest existing parent directory + watch_dir = path + while not watch_dir.exists(): + watch_dir = watch_dir.parent + + class Handler(FileSystemEventHandler): + def __init__(self): + self.created = False + + def on_created(self, event): + nonlocal path + created_path = pl.Path(event.src_path).resolve() + if created_path == path: + self.created = True + elif path.is_relative_to(created_path): + # Update path if a parent directory was created + path = created_path / path.relative_to(created_path) + + handler = Handler() + observer = Observer() + observer.schedule(handler, str(watch_dir), recursive=True) + observer.start() + + try: + while not handler.created: + if path.exists(): + return str(path) + observer.join(0.1) + return str(path) + finally: + observer.stop() + observer.join() diff --git a/validation/inputs/input_1/settings.json b/validation/inputs/input_1/settings.json new file mode 100644 index 0000000..6d2c752 --- /dev/null +++ b/validation/inputs/input_1/settings.json @@ -0,0 +1,6 @@ +{ + "template_id": "TEST_UUID", + "number_of_workers": 2, + "batch_mode": true, + "job_timeout": 60 +} diff --git a/validation/inputs/input_3/parallelrunner.json b/validation/inputs/input_3/parallelrunner.json deleted file mode 100644 index e775459..0000000 --- a/validation/inputs/input_3/parallelrunner.json +++ /dev/null @@ -1,4 +0,0 @@ -{ - "batch_mode": true, - "job_timeout": 60 -} diff --git a/validation/inputs/key_values.json b/validation/inputs/key_values.json deleted file mode 100755 index e2859a9..0000000 --- a/validation/inputs/key_values.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "input_0": { - "key": "input_0", - "value": "TEST_UUID" - }, - "input_1": {"key": "input_1", "value": 2}, - "input_2": {"key": "input_2", "value": "/tmp/inputs/input_2/"} - -}
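
A minimal sketch (not part of the patch) of producing the settings.json that this version now waits for under the input_1 port. The field names and values mirror ParallelRunnerMainSettings in docker_scripts/main.py and the example file validation/inputs/input_1/settings.json; the output directory used here is an assumed example path, not something defined by the service.

    # Write a settings.json for the parallel runner (illustrative only).
    import json
    import pathlib

    settings = {
        "template_id": "TEST_UUID",   # "TEST_UUID" puts the runner in test mode
        "number_of_workers": 2,        # must be > 0 and <= max_number_of_workers (10)
        "batch_mode": True,
        "job_timeout": 60,             # seconds; 0 means no timeout in this version
    }

    input_dir = pathlib.Path("validation-tmp/inputs/input_1")  # assumed location
    input_dir.mkdir(parents=True, exist_ok=True)
    (input_dir / "settings.json").write_text(json.dumps(settings, indent=2))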
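
A minimal sketch (not part of the patch) of reading the job-status API added by docker_scripts/http/server/server.js. The server exposes GET /api/jobs on port 8888 and returns the contents of jobs.json, where parallelrunner.py records each batch as {"name", "description", "status"} with status one of "todo", "running", "done", "failed". The localhost URL below assumes the published 8888:8888 port mapping from docker-compose-local.yml.

    # Poll the dashboard's job-status endpoint and print a per-status count.
    import collections
    import json
    import time
    import urllib.request

    API_URL = "http://localhost:8888/api/jobs"  # assumed host/port mapping

    def poll_once(url=API_URL):
        with urllib.request.urlopen(url, timeout=5) as response:
            jobs = json.load(response)  # {job_id: {"name", "description", "status"}}
        return collections.Counter(job["status"] for job in jobs.values())

    if __name__ == "__main__":
        while True:
            print(dict(poll_once()))
            time.sleep(1)  # the React dashboard refreshes at the same interval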