Move timeout around upload of input files
wvangeit committed Dec 23, 2024
1 parent 88ec502 commit c941192
Showing 1 changed file with 26 additions and 24 deletions.
50 changes: 26 additions & 24 deletions docker_scripts/parallelrunner.py
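What the commit does: create_job_inputs, which serializes FileJSON parameters and uploads them through the osparc files API, used to run in async_map_func before the job was handed to asyncio.wait_for, so a stalled upload was never subject to the job timeout. The call now happens inside run_job, the coroutine that asyncio.wait_for wraps, so the timeout spans the upload as well. A minimal sketch of the pattern, with illustrative names rather than the repository's API:

    import asyncio

    async def prepare_inputs():
        await asyncio.sleep(0.2)  # stands in for the input-file upload
        return {"x": 1}

    async def run_job():
        inputs = await prepare_inputs()  # now inside the timed coroutine
        await asyncio.sleep(0.1)         # stands in for the job itself
        return inputs

    async def main():
        try:
            print(await asyncio.wait_for(run_job(), timeout=1.0))
        except asyncio.TimeoutError:
            print("job (upload included) timed out")

    asyncio.run(main())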
@@ -174,7 +174,7 @@ def run_input_tasks(self, input_tasks, tasks_uuid):
         )
 
         for output_batch in output_batches:
-            output_batch_tasks = output_batch['tasks']
+            output_batch_tasks = output_batch["tasks"]
 
             for output_task_i, output_task in output_batch_tasks:
                 output_tasks[output_task_i] = output_task
@@ -189,15 +189,14 @@ def run_input_tasks(self, input_tasks, tasks_uuid):
         logger.debug(f"Finished a set of tasks: {output_tasks_content}")
 
     def batch_input_tasks(self, input_tasks, n_of_batches):
-        batches = [{'batch_i': None,'tasks':[]} for _ in range(n_of_batches)]
+        batches = [{"batch_i": None, "tasks": []} for _ in range(n_of_batches)]
 
         for task_i, input_task in enumerate(input_tasks):
             batch_id = task_i % n_of_batches
-            batches[batch_id]['batch_i'] = batch_id
-            batches[batch_id]['tasks'].append((task_i, input_task))
+            batches[batch_id]["batch_i"] = batch_id
+            batches[batch_id]["tasks"].append((task_i, input_task))
         return batches
 
-
     def create_job_inputs(self, input):
         """Create job inputs"""
 
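For context, batch_input_tasks deals tasks out round-robin and keeps each task's original index so results can be written back in order. A standalone sketch of the same logic (a free function here purely for illustration):

    def batch_input_tasks(input_tasks, n_of_batches):
        batches = [{"batch_i": None, "tasks": []} for _ in range(n_of_batches)]
        for task_i, input_task in enumerate(input_tasks):
            batch_id = task_i % n_of_batches
            batches[batch_id]["batch_i"] = batch_id
            batches[batch_id]["tasks"].append((task_i, input_task))
        return batches

    print(batch_input_tasks(["a", "b", "c", "d", "e"], 2))
    # [{'batch_i': 0, 'tasks': [(0, 'a'), (2, 'c'), (4, 'e')]},
    #  {'batch_i': 1, 'tasks': [(1, 'b'), (3, 'd')]}]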
@@ -208,21 +207,21 @@ def create_job_inputs(self, input):
             param_value = param_input["value"]
             if param_type == "FileJSON":
                 param_filename = param_input["filename"]
-                tmp_dir = tempfile.TemporaryDirectory()
-                tmp_dir_path = pl.Path(tmp_dir.name)
-                tmp_input_file_path = tmp_dir_path / param_filename
-                tmp_input_file_path.write_text(json.dumps(param_value))
+                with tempfile.TemporaryDirectory() as tmp_dir:
+                    tmp_dir_path = pl.Path(tmp_dir)
+                    tmp_input_file_path = tmp_dir_path / param_filename
+                    tmp_input_file_path.write_text(json.dumps(param_value))
 
-                if self.test_mode:
-                    processed_param_value = f"File json: {param_value}"
-                else:
-                    logger.info("Calling upload file api for job input")
-                    with self.lock:
-                        input_data_file = osparc.FilesApi(
-                            self.api_client
-                        ).upload_file(file=tmp_input_file_path)
-                    logger.info("File upload for job input done")
-                    processed_param_value = input_data_file
+                    if self.test_mode:
+                        processed_param_value = f"File json: {param_value}"
+                    else:
+                        logger.info("Calling upload file api for job input")
+                        with self.lock:
+                            input_data_file = osparc.FilesApi(
+                                self.api_client
+                            ).upload_file(file=tmp_input_file_path)
+                        logger.info("File upload for job input done")
+                        processed_param_value = input_data_file
             elif param_type == "file":
                 file_info = json.loads(param_value)
                 if self.test_mode:
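The rewrite also switches from an unmanaged tempfile.TemporaryDirectory() object (cleaned up only on finalization, and addressed via tmp_dir.name) to the context-manager form, which yields the path as a plain string and deletes the directory deterministically on exit; the upload moves inside the with block so the file still exists when it is read. A self-contained sketch of the pattern:

    import json
    import pathlib as pl
    import tempfile

    with tempfile.TemporaryDirectory() as tmp_dir:
        tmp_file = pl.Path(tmp_dir) / "input.json"
        tmp_file.write_text(json.dumps({"value": 42}))
        print(tmp_file.read_text())   # stand-in for the upload call
    print(pl.Path(tmp_dir).exists())  # False: directory removed on exit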
@@ -247,9 +246,11 @@ def create_job_inputs(self, input):
 
         return job_inputs
 
-    async def run_job(self, job_inputs, input_batch):
+    async def run_job(self, task_input, input_batch):
         """Run a job with given inputs"""
 
+        job_inputs = self.create_job_inputs(task_input)
+
         logger.debug(f"Sending inputs: {job_inputs}")
         if self.test_mode:
             import datetime
@@ -466,7 +467,8 @@ async def async_map_func(batch_with_uuid, trial_number=1):
             batch_uuid, batch = batch_with_uuid
             try:
                 logger.info(
-                    "Running worker for a batch of " f"{len(batch)} tasks"
+                    "Running worker for a batch of "
+                    f"{len(batch['tasks'])} tasks"
                 )
                 logger.debug(f"Running worker for batch: {batch}")
                 self.jobs_file_write_status_change(
@@ -476,16 +478,14 @@ async def async_map_func(batch_with_uuid, trial_number=1):
 
                 task_input = self.transform_batch_to_task_input(batch)
 
-                job_inputs = self.create_job_inputs(task_input)
-
                 job_timeout = (
                     self.settings.job_timeout
                     if self.settings.job_timeout > 0
                     else None
                 )
 
                 output_batch = await asyncio.wait_for(
-                    self.run_job(job_inputs, batch), timeout=job_timeout
+                    self.run_job(task_input, batch), timeout=job_timeout
                 )
 
                 self.jobs_file_write_status_change(
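The job_timeout guard above means a non-positive setting disables the limit entirely, because asyncio.wait_for interprets timeout=None as "wait forever". A small sketch of that behavior:

    import asyncio

    async def demo(job_timeout_setting):
        job_timeout = job_timeout_setting if job_timeout_setting > 0 else None
        try:
            await asyncio.wait_for(asyncio.sleep(0.2), timeout=job_timeout)
            print("finished within the limit")
        except asyncio.TimeoutError:
            print("timed out")

    asyncio.run(demo(0.05))  # timed out
    asyncio.run(demo(0))     # finished within the limit (no timeout)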
@@ -529,6 +529,8 @@ async def async_map_func(batch_with_uuid, trial_number=1):
                     f"Batch {batch} failed with error ("
                     f"{traceback.format_exc()}) in "
                     f"trial {trial_number}, retrying "
+                    f"{self.settings.max_job_trials-trial_number}"
+                    " more times."
                 )
                 output_batch = map_func(
                     batch_with_uuid, trial_number=trial_number + 1
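The surrounding handler retries a failed batch by calling map_func again with trial_number + 1, and the amended log line reports how many trials remain (max_job_trials - trial_number). A sketch of that capped-retry shape, with hypothetical stand-ins for the worker and settings:

    MAX_JOB_TRIALS = 3  # stand-in for self.settings.max_job_trials

    def run(batch):
        raise RuntimeError("flaky failure")  # hypothetical failing worker

    def map_func(batch, trial_number=1):
        try:
            return run(batch)
        except RuntimeError:
            if trial_number >= MAX_JOB_TRIALS:
                raise
            print(f"retrying {MAX_JOB_TRIALS - trial_number} more times")
            return map_func(batch, trial_number=trial_number + 1)

    map_func({"tasks": [(0, "a")]})  # prints two retry messages, then raises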
