Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add job timeout #8

Merged
merged 5 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.1.20
current_version = 0.2.1
commit = False
message = service version: {current_version} → {new_version}
tag = False
Expand Down
7 changes: 6 additions & 1 deletion .osparc/osparc-meta-parallelrunner/metadata.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: ParallelRunner
description: "ParallelRunnerService"
key: simcore/services/dynamic/osparc-meta-parallelrunner
version: 0.1.20
version: 0.2.1
integration-version: 2.0.0
type: dynamic
authors:
Expand Down Expand Up @@ -40,6 +40,11 @@ outputs:
label: Output values
description: Output files uploaded from the outputs folder
type: data:*/*
conf_json_schema:
displayOrder: 2.0
label: JSON schema
description: JSON schema of configuration file
type: data:*/*
boot-options:
boot_mode:
label: Boot mode
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ SHELL = /bin/sh
MAKEFLAGS += -j2

export DOCKER_IMAGE_NAME ?= osparc-meta-parallelrunner
export DOCKER_IMAGE_TAG ?= 0.1.20
export DOCKER_IMAGE_TAG ?= 0.2.1

export MASTER_AWS_REGISTRY ?= registry.osparc-master-zmt.click
export MASTER_REGISTRY ?= registry.osparc-master.speag.com
Expand Down Expand Up @@ -43,7 +43,7 @@ build: clean compose-spec ## build docker image
docker compose build

validation-clean:
sudo rm -rf validation-tmp
rm -rf validation-tmp
cp -r validation validation-tmp
chmod -R 770 validation-tmp

Expand Down
2 changes: 1 addition & 1 deletion docker-compose-local.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
services:
osparc-meta-parallelrunner:
image: simcore/services/dynamic/osparc-meta-parallelrunner:0.1.20
image: simcore/services/dynamic/osparc-meta-parallelrunner:0.2.1
ports:
- "8888:8888"
environment:
Expand Down
17 changes: 14 additions & 3 deletions docker_scripts/main.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import http.server
import json
import logging
import pathlib as pl
import socketserver
import threading
import time
import typing

import pydantic as pyda
import pydantic_settings
Expand All @@ -18,18 +18,28 @@

HTTP_PORT = 8888
INPUT_CONF_KEY = "input_3"
CONF_SCHEMA_KEY = "conf_json_schema"

FILE_POLLING_INTERVAL = 1 # second

MAX_JOB_CREATE_ATTEMPTS = 5
JOB_CREATE_ATTEMPTS_DELAY = 5
MAX_JOB_TRIALS = 5

JOB_TIMEOUT = None


def main():
"""Main"""

settings = MainSettings()
settings = ParallelRunnerMainSettings()
settings_schema = settings.model_json_schema()
logger.info(settings_schema)
conf_json_schema_path = (
settings.output_path / CONF_SCHEMA_KEY / "schema.json"
)
conf_json_schema_path.write_text(json.dumps(settings_schema, indent=2))

config_path = settings.input_path / INPUT_CONF_KEY / "parallelrunner.json"

http_dir_path = pl.Path(__file__).parent / "http"
Expand Down Expand Up @@ -71,7 +81,7 @@ def __init__(self, *args, **kwargs):
logger.error(f"{err} . Stopping %s", exc_info=True)


class MainSettings(pydantic_settings.BaseSettings):
class ParallelRunnerMainSettings(pydantic_settings.BaseSettings):
batch_mode: bool = False
file_polling_interval: int = FILE_POLLING_INTERVAL
input_path: pyda.DirectoryPath = pyda.Field(alias="DY_SIDECAR_PATH_INPUTS")
Expand All @@ -82,6 +92,7 @@ class MainSettings(pydantic_settings.BaseSettings):
file_polling_interval: int = FILE_POLLING_INTERVAL
max_job_create_attempts: int = MAX_JOB_CREATE_ATTEMPTS
job_create_attempts_delay: int = JOB_CREATE_ATTEMPTS_DELAY
job_timeout: None | float = JOB_TIMEOUT


if __name__ == "__main__":
Expand Down
15 changes: 13 additions & 2 deletions docker_scripts/parallelrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def __init__(
max_job_trials=None,
max_job_create_attempts=None,
job_create_attempts_delay=None,
job_timeout=None,
):
"""Constructor"""
self.test_mode = False
Expand All @@ -48,6 +49,7 @@ def __init__(
self.max_job_trials = max_job_trials
self.max_job_create_attempts = max_job_create_attempts
self.job_create_attempts_delay = job_create_attempts_delay
self.job_timeout = job_timeout

self.input_path = input_path # path where osparc writes all our input
self.output_path = output_path # path where osparc writes all our output
Expand Down Expand Up @@ -291,13 +293,13 @@ def run_job(self, job_inputs, input_batch):
"""Run a job with given inputs"""

logger.debug(f"Sending inputs: {job_inputs}")

if self.test_mode:
logger.info("Map in test mode, just returning input")

done_batch = self.process_job_outputs(
job_inputs, input_batch, "SUCCESS"
)
time.sleep(1)

return done_batch

Expand Down Expand Up @@ -443,7 +445,16 @@ def map_func(batch, trial_number=1):

job_inputs = self.create_job_inputs(task_input)

output_batch = self.run_job(job_inputs, batch)
with pathos.pools.ThreadPool(nodes=1) as timeout_pool:
output_batch_waiter = timeout_pool.apipe(
self.run_job, job_inputs, batch
)
output_batch = output_batch_waiter.get(
timeout=self.job_timeout
)
timeout_pool.close()
timeout_pool.join()
timeout_pool.clear() # Pool is singleton, need to clear old pool

self.n_of_finished_batches += 1
logger.info(
Expand Down
3 changes: 2 additions & 1 deletion validation/inputs/input_3/parallelrunner.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{
"batch_mode": true
"batch_mode": true,
"job_timeout": 60
}
Empty file.