diff --git a/docker_scripts/parallelrunner.py b/docker_scripts/parallelrunner.py
index 76365ba..868bbfb 100755
--- a/docker_scripts/parallelrunner.py
+++ b/docker_scripts/parallelrunner.py
@@ -174,7 +174,7 @@ def run_input_tasks(self, input_tasks, tasks_uuid):
         )

         for output_batch in output_batches:
-            output_batch_tasks = output_batch['tasks']
+            output_batch_tasks = output_batch["tasks"]

             for output_task_i, output_task in output_batch_tasks:
                 output_tasks[output_task_i] = output_task
@@ -189,15 +189,14 @@ def run_input_tasks(self, input_tasks, tasks_uuid):
         logger.debug(f"Finished a set of tasks: {output_tasks_content}")

     def batch_input_tasks(self, input_tasks, n_of_batches):
-        batches = [{'batch_i': None,'tasks':[]} for _ in range(n_of_batches)]
+        batches = [{"batch_i": None, "tasks": []} for _ in range(n_of_batches)]

         for task_i, input_task in enumerate(input_tasks):
             batch_id = task_i % n_of_batches
-            batches[batch_id]['batch_i'] = batch_id
-            batches[batch_id]['tasks'].append((task_i, input_task))
+            batches[batch_id]["batch_i"] = batch_id
+            batches[batch_id]["tasks"].append((task_i, input_task))

         return batches
-

     def create_job_inputs(self, input):
         """Create job inputs"""
@@ -208,21 +207,21 @@ def create_job_inputs(self, input):
             param_value = param_input["value"]
             if param_type == "FileJSON":
                 param_filename = param_input["filename"]
-                tmp_dir = tempfile.TemporaryDirectory()
-                tmp_dir_path = pl.Path(tmp_dir.name)
-                tmp_input_file_path = tmp_dir_path / param_filename
-                tmp_input_file_path.write_text(json.dumps(param_value))
+                with tempfile.TemporaryDirectory() as tmp_dir:
+                    tmp_dir_path = pl.Path(tmp_dir)
+                    tmp_input_file_path = tmp_dir_path / param_filename
+                    tmp_input_file_path.write_text(json.dumps(param_value))

-                if self.test_mode:
-                    processed_param_value = f"File json: {param_value}"
-                else:
-                    logger.info("Calling upload file api for job input")
-                    with self.lock:
-                        input_data_file = osparc.FilesApi(
-                            self.api_client
-                        ).upload_file(file=tmp_input_file_path)
-                    logger.info("File upload for job input done")
-                    processed_param_value = input_data_file
+                    if self.test_mode:
+                        processed_param_value = f"File json: {param_value}"
+                    else:
+                        logger.info("Calling upload file api for job input")
+                        with self.lock:
+                            input_data_file = osparc.FilesApi(
+                                self.api_client
+                            ).upload_file(file=tmp_input_file_path)
+                        logger.info("File upload for job input done")
+                        processed_param_value = input_data_file
             elif param_type == "file":
                 file_info = json.loads(param_value)
                 if self.test_mode:
@@ -247,9 +246,11 @@ def create_job_inputs(self, input):

         return job_inputs

-    async def run_job(self, job_inputs, input_batch):
+    async def run_job(self, task_input, input_batch):
         """Run a job with given inputs"""

+        job_inputs = self.create_job_inputs(task_input)
+
         logger.debug(f"Sending inputs: {job_inputs}")

         if self.test_mode:
             import datetime
@@ -466,7 +467,8 @@ async def async_map_func(batch_with_uuid, trial_number=1):
             batch_uuid, batch = batch_with_uuid
             try:
                 logger.info(
-                    "Running worker for a batch of " f"{len(batch)} tasks"
+                    "Running worker for a batch of "
+                    f"{len(batch['tasks'])} tasks"
                 )
                 logger.debug(f"Running worker for batch: {batch}")
                 self.jobs_file_write_status_change(
@@ -476,8 +478,6 @@ async def async_map_func(batch_with_uuid, trial_number=1):

                 task_input = self.transform_batch_to_task_input(batch)

-                job_inputs = self.create_job_inputs(task_input)
-
                 job_timeout = (
                     self.settings.job_timeout
                     if self.settings.job_timeout > 0
@@ -485,7 +485,7 @@
                 )

                 output_batch = await asyncio.wait_for(
-                    self.run_job(job_inputs, batch), timeout=job_timeout
+                    self.run_job(task_input, batch), timeout=job_timeout
                 )

                 self.jobs_file_write_status_change(
@@ -529,6 +529,8 @@ async def async_map_func(batch_with_uuid, trial_number=1):
                     f"Batch {batch} failed with error ("
                     f"{traceback.format_exc()}) in "
                     f"trial {trial_number}, retrying "
+                    f"{self.settings.max_job_trials-trial_number}"
+                    " more times."
                 )
                 output_batch = map_func(
                     batch_with_uuid, trial_number=trial_number + 1
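
Note on the FileJSON hunk in create_job_inputs: the change replaces a bare tempfile.TemporaryDirectory() object (whose directory can disappear as soon as the object is garbage-collected) with a with-block, so the temporary input file is guaranteed to still exist when it is uploaded and is removed deterministically afterwards. Below is a minimal sketch of that pattern, not the service's actual code; the upload argument is a hypothetical stand-in for osparc.FilesApi(self.api_client).upload_file.

import json
import pathlib as pl
import tempfile


def upload_json_param(param_filename, param_value, upload):
    # The directory exists for exactly the lifetime of the with-block,
    # so the upload below always sees the file on disk.
    with tempfile.TemporaryDirectory() as tmp_dir:
        tmp_input_file_path = pl.Path(tmp_dir) / param_filename
        tmp_input_file_path.write_text(json.dumps(param_value))
        # Must stay inside the block: once it exits, the directory is deleted.
        return upload(tmp_input_file_path)

This is also why the upload branch in the hunk is re-indented into the with-block rather than left after it.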