Use async unsorted map (#17)
* Use async unsorted map

* Bump version

* Remove import pprint

* Move timeout around upload of input files
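The headline change is the switch from pathos's blocking, order-preserving pool.map to pool.uimap, which yields each batch as soon as it finishes. Below is a minimal sketch of the difference, assuming the same pathos.pools.ThreadPool API the diff already relies on (restart/close/join/clear); the batches, timings, and printed messages are illustrative, not taken from the service.

```python
# Minimal sketch (not the service code): ordered map vs. unordered uimap,
# assuming the pathos.pools.ThreadPool API used in the diff below.
import random
import time

import pathos.pools


def run_batch(batch_with_id):
    """Stand-in worker: slower batches finish later."""
    batch_id, tasks = batch_with_id
    time.sleep(random.uniform(0.0, 0.2))
    return batch_id, [f"result of {task}" for task in tasks]


batches = [(i, [f"task-{i}-{j}" for j in range(3)]) for i in range(5)]

with pathos.pools.ThreadPool(nodes=3) as pool:
    pool.restart()
    # map(): blocks until every batch is done; results come back in
    # submission order, so nothing can be written out early.
    ordered = list(pool.map(run_batch, batches))
    # uimap(): lazy iterator that yields each batch as soon as it finishes,
    # so results can be persisted incrementally but arrive out of order.
    for batch_id, results in pool.uimap(run_batch, batches):
        print(f"batch {batch_id} finished with {len(results)} results")
    pool.close()
    pool.join()
    pool.clear()  # pathos pools are singletons; clear as the diff does
```

Because uimap gives no ordering guarantee, the rest of the commit threads each task's original index through its batch so results can be reassembled; see the sketch after the diff.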
wvangeit authored Dec 23, 2024
1 parent 4c91a70 commit fa9300c
Showing 5 changed files with 64 additions and 48 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
-current_version = 1.1.10
+current_version = 1.2.0
commit = False
message = service version: {current_version} → {new_version}
tag = False
2 changes: 1 addition & 1 deletion .osparc/osparc-meta-parallelrunner/metadata.yml
@@ -1,7 +1,7 @@
name: ParallelRunner
description: "ParallelRunnerService"
key: simcore/services/dynamic/osparc-meta-parallelrunner
-version: 1.1.10
+version: 1.2.0
integration-version: 2.0.0
type: dynamic
authors:
2 changes: 1 addition & 1 deletion Makefile
@@ -5,7 +5,7 @@ SHELL = /bin/sh
MAKEFLAGS += -j2

export DOCKER_IMAGE_NAME ?= osparc-meta-parallelrunner
-export DOCKER_IMAGE_TAG ?= 1.1.10
+export DOCKER_IMAGE_TAG ?= 1.2.0

export MASTER_AWS_REGISTRY ?= registry.osparc-master-zmt.click
export MASTER_REGISTRY ?= registry.osparc-master.speag.com
2 changes: 1 addition & 1 deletion docker-compose-local.yml
@@ -1,6 +1,6 @@
services:
osparc-meta-parallelrunner:
-image: simcore/services/dynamic/osparc-meta-parallelrunner:1.1.10
+image: simcore/services/dynamic/osparc-meta-parallelrunner:1.2.0
ports:
- "8888:8888"
environment:
104 changes: 60 additions & 44 deletions docker_scripts/parallelrunner.py
@@ -161,36 +161,42 @@ def run_input_tasks(self, input_tasks, tasks_uuid):

input_batches = self.batch_input_tasks(input_tasks, n_of_batches)

output_tasks = input_tasks.copy()
for output_task in output_tasks:
output_task["status"] = "SUBMITTED"
output_tasks_content = json.dumps(
{"uuid": tasks_uuid, "tasks": output_tasks}
)
self.output_tasks_path.write_text(output_tasks_content)

output_batches = self.run_batches(
tasks_uuid, input_batches, number_of_workers
)

output_tasks = self.unbatch_output_tasks(output_batches)
for output_batch in output_batches:
output_batch_tasks = output_batch["tasks"]

output_tasks_content = json.dumps(
{"uuid": tasks_uuid, "tasks": output_tasks}
)
self.output_tasks_path.write_text(output_tasks_content)
for output_task_i, output_task in output_batch_tasks:
output_tasks[output_task_i] = output_task
# logging.info(output_task["status"])

output_tasks_content = json.dumps(
{"uuid": tasks_uuid, "tasks": output_tasks}
)
self.output_tasks_path.write_text(output_tasks_content)
logger.info(f"Finished a batch of {len(output_batch_tasks)} tasks")
logger.info(f"Finished a set of {len(output_tasks)} tasks")
logger.debug(f"Finished a set of tasks: {output_tasks_content}")

def batch_input_tasks(self, input_tasks, n_of_batches):
batches = [[] for _ in range(n_of_batches)]
batches = [{"batch_i": None, "tasks": []} for _ in range(n_of_batches)]

for task_i, input_task in enumerate(input_tasks):
batch_id = task_i % n_of_batches
batches[batch_id].append(input_task)
batches[batch_id]["batch_i"] = batch_id
batches[batch_id]["tasks"].append((task_i, input_task))
return batches

def unbatch_output_tasks(self, batches):
output_tasks = []
n_of_tasks = sum(len(batch) for batch in batches)

for task_i in range(n_of_tasks):
batch_id = task_i % len(batches)
output_tasks.append(batches[batch_id].pop(0))
return output_tasks

def create_job_inputs(self, input):
"""Create job inputs"""

@@ -201,21 +207,21 @@ def create_job_inputs(self, input):
param_value = param_input["value"]
if param_type == "FileJSON":
param_filename = param_input["filename"]
tmp_dir = tempfile.TemporaryDirectory()
tmp_dir_path = pl.Path(tmp_dir.name)
tmp_input_file_path = tmp_dir_path / param_filename
tmp_input_file_path.write_text(json.dumps(param_value))
with tempfile.TemporaryDirectory() as tmp_dir:
tmp_dir_path = pl.Path(tmp_dir)
tmp_input_file_path = tmp_dir_path / param_filename
tmp_input_file_path.write_text(json.dumps(param_value))

if self.test_mode:
processed_param_value = f"File json: {param_value}"
else:
logger.info("Calling upload file api for job input")
with self.lock:
input_data_file = osparc.FilesApi(
self.api_client
).upload_file(file=tmp_input_file_path)
logger.info("File upload for job input done")
processed_param_value = input_data_file
if self.test_mode:
processed_param_value = f"File json: {param_value}"
else:
logger.info("Calling upload file api for job input")
with self.lock:
input_data_file = osparc.FilesApi(
self.api_client
).upload_file(file=tmp_input_file_path)
logger.info("File upload for job input done")
processed_param_value = input_data_file
elif param_type == "file":
file_info = json.loads(param_value)
if self.test_mode:
@@ -240,9 +246,11 @@ def create_job_inputs(self, input):

return job_inputs

async def run_job(self, job_inputs, input_batch):
async def run_job(self, task_input, input_batch):
"""Run a job with given inputs"""

job_inputs = self.create_job_inputs(task_input)

logger.debug(f"Sending inputs: {job_inputs}")
if self.test_mode:
import datetime
@@ -296,10 +304,12 @@ async def run_job(self, job_inputs, input_batch):
def process_job_outputs(self, results, batch, status):
if self.settings.template_id == "TEST_UUID":
logger.info("Map in test mode, just returning input")
for task_i, task in batch["tasks"]:
task["status"] = "SUCCESS"

return batch

for task_i, task in enumerate(batch):
for task_i, task in batch["tasks"]:
output = task["output"]
task["status"] = status
for probe_name, probe_output in results.items():
@@ -358,7 +368,7 @@ def process_job_outputs(self, results, batch, status):

def transform_batch_to_task_input(self, batch):
task_input = {}
for task in batch:
for task_i, task in batch["tasks"]:
input = task["input"]
for param_name, param_input in input.items():
param_type = param_input["type"]
@@ -449,11 +459,16 @@ def run_batches(self, tasks_uuid, input_batches, number_of_workers):
def map_func(batch_with_uuid, trial_number=1):
return asyncio.run(async_map_func(batch_with_uuid, trial_number))

def set_batch_status(batch, message):
for task_i, task in batch["tasks"]:
task["status"] = "FAILURE"

async def async_map_func(batch_with_uuid, trial_number=1):
batch_uuid, batch = batch_with_uuid
try:
logger.info(
"Running worker for a batch of " f"{len(batch)} tasks"
"Running worker for a batch of "
f"{len(batch["tasks"])} tasks"
)
logger.debug(f"Running worker for batch: {batch}")
self.jobs_file_write_status_change(
@@ -463,16 +478,14 @@ async def async_map_func(batch_with_uuid, trial_number=1):

task_input = self.transform_batch_to_task_input(batch)

job_inputs = self.create_job_inputs(task_input)

job_timeout = (
self.settings.job_timeout
if self.settings.job_timeout > 0
else None
)

output_batch = await asyncio.wait_for(
self.run_job(job_inputs, batch), timeout=job_timeout
self.run_job(task_input, batch), timeout=job_timeout
)

self.jobs_file_write_status_change(
@@ -489,32 +502,35 @@ async def async_map_func(batch_with_uuid, trial_number=1):
except ParallelRunner.FatalException as error:
logger.info(
f"Batch {batch} failed with fatal error ({error}) in "
f"trial {trial_number}, not retrying, raising error"
f"trial {trial_number}, not retrying"
)
self.jobs_file_write_status_change(
id=batch_uuid,
status="failed",
)

raise error
except Exception as error:
set_batch_status(batch, "FAILURE")
# raise error
except Exception:
if trial_number >= self.settings.max_job_trials:
logger.info(
f"Batch {batch} failed with error ("
f"{traceback.format_exc()}) in "
f"trial {trial_number}, reach max number of trials of "
f"{self.settings.max_job_trials}, not retrying, raising error"
f"{self.settings.max_job_trials}, not retrying"
)
self.jobs_file_write_status_change(
id=batch_uuid,
status="failed",
)
raise error
set_batch_status(batch, "FAILURE")
# raise error
else:
logger.info(
f"Batch {batch} failed with error ("
f"{traceback.format_exc()}) in "
f"trial {trial_number}, retrying "
f"{self.settings.max_job_trials-trial_number}"
" more times."
)
output_batch = map_func(
batch_with_uuid, trial_number=trial_number + 1
@@ -540,7 +556,7 @@ async def async_map_func(batch_with_uuid, trial_number=1):

with pathos.pools.ThreadPool(nodes=number_of_workers) as pool:
pool.restart()
output_tasks = list(pool.map(map_func, input_batches_with_uuid))
output_tasks = pool.uimap(map_func, input_batches_with_uuid)
pool.close()
pool.join()
pool.clear() # Pool is singleton, need to clear old pool
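To make the unordered results safe, batch_input_tasks now tags every task with its original position, and each finished batch is merged back into the full task list by that index before the snapshot file is written. The sketch below is a stripped-down illustration of that bookkeeping; the output path, uuid, and task payloads are made up, and this is not the service code itself.

```python
# Illustrative sketch of the index-carrying batching scheme the diff adopts.
import json
import pathlib as pl


def batch_input_tasks(input_tasks, n_of_batches):
    """Round-robin tasks into batches, remembering each task's original index."""
    batches = [{"batch_i": None, "tasks": []} for _ in range(n_of_batches)]
    for task_i, input_task in enumerate(input_tasks):
        batch_id = task_i % n_of_batches
        batches[batch_id]["batch_i"] = batch_id
        batches[batch_id]["tasks"].append((task_i, input_task))
    return batches


def write_back(output_tasks, finished_batch, output_path, tasks_uuid):
    """Merge a finished batch into the full task list and persist a snapshot."""
    for task_i, task in finished_batch["tasks"]:
        output_tasks[task_i] = task
    output_path.write_text(
        json.dumps({"uuid": tasks_uuid, "tasks": output_tasks})
    )


# Toy usage: batches may finish in any order; the index keeps results aligned.
tasks = [{"input": i, "status": "SUBMITTED"} for i in range(7)]
batches = batch_input_tasks(tasks, n_of_batches=3)
out_path = pl.Path("output_tasks.json")  # hypothetical path
for batch in reversed(batches):  # simulate out-of-order completion
    for _, task in batch["tasks"]:
        task["status"] = "SUCCESS"
    write_back(tasks, batch, out_path, tasks_uuid="demo-uuid")
```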

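The last bullet, "Move timeout around upload of input files", corresponds to create_job_inputs being called from inside run_job, so the asyncio.wait_for job timeout now also covers the upload of input files rather than only the solver run. Below is a hedged sketch of that pattern with placeholder coroutines; upload_input_files and the sleeps are stand-ins, not the osparc calls.

```python
# Illustrative sketch, not the service code: wrapping the whole run_job
# coroutine in asyncio.wait_for makes the job timeout cover the input-file
# upload step as well as the solver run.
import asyncio


async def upload_input_files(task_input):
    """Stand-in for the file upload done in create_job_inputs."""
    await asyncio.sleep(1)
    return {"uploaded": task_input}


async def run_job(task_input):
    """Upload first, then run the job: both sit inside the caller's timeout."""
    job_inputs = await upload_input_files(task_input)
    await asyncio.sleep(1)  # stand-in for waiting on the actual solver job
    return {"inputs": job_inputs, "status": "SUCCESS"}


async def run_with_timeout(task_input, job_timeout):
    # A job_timeout of 0 or less means "no timeout", mirroring the diff.
    timeout = job_timeout if job_timeout > 0 else None
    try:
        return await asyncio.wait_for(run_job(task_input), timeout=timeout)
    except asyncio.TimeoutError:
        return {"status": "FAILURE"}


print(asyncio.run(run_with_timeout({"x": 1}, job_timeout=10)))
```

One caveat worth keeping in mind: asyncio.wait_for can only interrupt a coroutine at an await point, so a purely synchronous upload inside the wrapped coroutine would still need to yield (for example by running in a thread executor) for the timeout to take effect mid-upload.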