Skip to content

Commit

Permalink
Put file download inside job context manager
Browse files — browse the repository at this point in the history
  • Loading branch information
wvangeit committed Jul 11, 2024
1 parent 3ea7622 commit 6453be1
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 12 deletions.
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ publish-master: ## push to local throw away registry to test integration
publish-staging: ## push to local throw away registry to test integration
docker tag simcore/services/dynamic/${DOCKER_IMAGE_NAME}:${DOCKER_IMAGE_TAG} $(STAGING_REGISTRY)/simcore/services/dynamic/$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_TAG)
docker push $(STAGING_REGISTRY)/simcore/services/dynamic/$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_TAG)
@curl $(STAGING_REGISTRY)/v2/_catalog | jq

.PHONY: publish-master-aws
publish-master-aws: ## push to local throw away registry to test integration
Expand Down
29 changes: 18 additions & 11 deletions docker_scripts/parallelrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,15 +288,19 @@ def create_job_inputs(self, input):

return job_inputs

def run_job(self, job_inputs):
def run_job(self, job_inputs, input_batch):
"""Run a job with given inputs"""

logger.debug(f"Sending inputs: {job_inputs}")

if self.test_mode:
logger.info("Map in test mode, just returning input")

return job_inputs, "SUCCESS"
done_batch = self.process_job_outputs(
job_inputs, input_batch, "SUCCESS"
)

return done_batch

with self.create_study_job(
self.template_id, job_inputs, self.studies_api
Expand All @@ -313,8 +317,6 @@ def run_job(self, job_inputs):
)
time.sleep(1)

status = job_status.state

if job_status.state == "FAILED":
logger.error(f"Batch failed: {job_inputs}")
raise Exception("Job returned a failed status")
Expand All @@ -323,7 +325,11 @@ def run_job(self, job_inputs):
study_id=self.template_id, job_id=job.id
).results

return job_outputs, status
done_batch = self.process_job_outputs(
job_outputs, input_batch, job_status.state
)

return done_batch

def process_job_outputs(self, results, batch, status):
if self.template_id == "TEST_UUID":
Expand Down Expand Up @@ -432,9 +438,7 @@ def map_func(batch, trial_number=1):

job_inputs = self.create_job_inputs(task_input)

job_outputs, status = self.run_job(job_inputs)

batch = self.process_job_outputs(job_outputs, batch, status)
output_batch = self.run_job(job_inputs, batch)

self.n_of_finished_batches += 1
logger.info(
Expand All @@ -451,17 +455,20 @@ def map_func(batch, trial_number=1):
if trial_number >= self.max_trials:
logger.info(
f"Batch {batch} failed with error ({error}) in "
f"trial {trial_number}, not retrying, raising error"
f"trial {trial_number}, reach max number of trials of "
f"{self.max_trials}, not retrying, raising error"
)
raise error
else:
logger.info(
f"Batch {batch} failed with error ({error}) in "
f"trial {trial_number}, retrying "
)
batch = map_func(batch, trial_number=trial_number + 1)
output_batch = map_func(
batch, trial_number=trial_number + 1
)

return batch
return output_batch

logger.info(
f"Starting {len(input_batches)} batches on {n_of_workers} workers"
Expand Down

0 comments on commit 6453be1

Please sign in to comment.