Skip to content

Commit

Permalink
Merge pull request #3 from ITISFoundation/fix_wait
Browse files Browse the repository at this point in the history
Move wait for input_tasks.json inside loop
  • Loading branch information
wvangeit authored Jun 14, 2024
2 parents f4da9b2 + 549526e commit c0e5e11
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 22 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.0.11
current_version = 0.0.12
commit = False
message = service version: {current_version} → {new_version}
tag = False
Expand Down
2 changes: 1 addition & 1 deletion .osparc/osparc-meta-parallelrunner/metadata.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: Parallel Runner
description: "ParallelRunnerService"
key: simcore/services/dynamic/osparc-meta-parallelrunner
version: 0.0.11
version: 0.0.12
integration-version: 2.0.0
type: dynamic
authors:
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ SHELL = /bin/sh
MAKEFLAGS += -j2

export DOCKER_IMAGE_NAME ?= osparc-map
export DOCKER_IMAGE_TAG ?= 0.0.11
export DOCKER_IMAGE_TAG ?= 0.0.12

export MASTER_AWS_REGISTRY ?= registry.osparc-master-zmt.click
export MASTER_REGISTRY ?= registry.osparc-master.speag.com
Expand Down
2 changes: 1 addition & 1 deletion docker-compose-local.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: '3.7'
services:
osparc-meta-parallelrunner:
image: simcore/services/dynamic/osparc-meta-parallelrunner:0.0.11
image: simcore/services/dynamic/osparc-meta-parallelrunner:0.0.12
ports:
- "8888:8888"
environment:
Expand Down
37 changes: 19 additions & 18 deletions docker_scripts/map.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,14 @@ def create_study_job(template_id, job_inputs, studies_api):
job_inputs=job_inputs,
)
break
except osparc_client.exceptions.ApiException:
except osparc_client.exceptions.ApiException as api_exception:
if n_of_create_attempts >= MAX_JOB_CREATE_ATTEMPTS:
raise Exception(
f"Tried {n_of_create_attempts} to create a job from "
f"Tried {n_of_create_attempts} times to create a job from "
"the study, but failed"
)
else:
logger.exception(api_exception)
logger.info(
"Received an API Exception from server "
"when creating job, retrying..."
Expand Down Expand Up @@ -196,31 +197,31 @@ def start(self):
)
n_of_workers = MAX_N_OF_WORKERS

waiter = 0
while not self.input_tasks_path.exists():
if waiter % 10 == 0:
logger.info(
f"Waiting for input file at {self.input_tasks_path}..."
)
self.handshaker.retry_last_write()
time.sleep(self.polling_interval)
waiter += 1

last_tasks_uuid = ""
waiter = 0
waiter_wrong_uuid = 0
while True:
waiter_input_exists = 0
while not self.input_tasks_path.exists():
if waiter_input_exists % 10 == 0:
logger.info(
f"Waiting for input file at {self.input_tasks_path}..."
)
self.handshaker.retry_last_write()
time.sleep(self.polling_interval)
waiter_input_exists += 1

input_dict = json.loads(self.input_tasks_path.read_text())
command = input_dict["command"]
caller_uuid = input_dict["caller_uuid"]
map_uuid = input_dict["map_uuid"]
if caller_uuid != self.caller_uuid or map_uuid != self.uuid:
if waiter % 10 == 0:
if waiter_wrong_uuid % 10 == 0:
logger.info(
"Received command with wrong caller uuid: "
f"{caller_uuid} or map uuid: {map_uuid}"
)
time.sleep(self.polling_interval)
waiter += 1
waiter_wrong_uuid += 1
continue

if command == "stop":
Expand All @@ -229,10 +230,10 @@ def start(self):
tasks_uuid = input_dict["uuid"]

if tasks_uuid == last_tasks_uuid:
if waiter % 10 == 0:
if waiter_wrong_uuid % 10 == 0:
logger.info("Waiting for new tasks uuid")
time.sleep(self.polling_interval)
waiter += 1
waiter_wrong_uuid += 1
else:
input_tasks = input_dict["tasks"]
output_tasks = self.run_tasks(
Expand All @@ -246,7 +247,7 @@ def start(self):
f"Finished a set of tasks: {output_tasks_content}"
)
last_tasks_uuid = tasks_uuid
waiter = 0
waiter_wrong_uuid = 0
else:
raise ValueError("Command unknown: {command}")

Expand Down

0 comments on commit c0e5e11

Please sign in to comment.