updates to worker for deps
AlexPatrie committed Nov 1, 2024
1 parent 2256fa8 commit c4d7199
Showing 11 changed files with 106 additions and 107 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -32,11 +32,11 @@ smoldyn = "^2.73"
biosimulators-amici = "^0.1.24"
biosimulators-copasi = "^0.2.22"
biosimulators-tellurium = "^0.1.44"
simulariumio = "^1.11.0"


[tool.poetry.group.composition.dependencies]
process-bigraph = "^0.0.21"
simulariumio = "^1.11.0"


[tool.poetry.group.dev.dependencies]
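
Moving simulariumio out of the top-level dependency list and into the composition group means it now travels with process-bigraph rather than with every install; if the group is marked optional elsewhere in pyproject.toml (not shown in this hunk), it only arrives with `poetry install --with composition`. A minimal sketch of how worker code can tolerate its absence, assuming simulariumio is only needed on the composition path:

try:
    import simulariumio  # only importable when the composition dependency group is installed
    HAS_SIMULARIUMIO = True
except ImportError:  # assumption: base worker images may omit the composition group
    simulariumio = None
    HAS_SIMULARIUMIO = False
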
3 changes: 2 additions & 1 deletion worker/.dockerignore
@@ -36,4 +36,5 @@ model-examples
tests.py
!enter.sh
bio_check/.env
test.ipynb
test.ipynb
config/
2 changes: 1 addition & 1 deletion worker/Dockerfile-worker
@@ -21,7 +21,7 @@ RUN apt-get update && apt-get install -y libatlas-base-dev \
libxml2 \
libncurses5

COPY environment.worker.yml /tmp/environment.worker.yml
COPY config/environment.worker.yml /tmp/environment.worker.yml
COPY . /app/worker

SHELL ["/usr/bin/env", "bash", "-c"]
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
3 changes: 2 additions & 1 deletion worker/main.py
@@ -6,7 +6,8 @@

from shared import MongoDbConnector
from log_config import setup_logging
from supervisor import Supervisor, CompositionSupervisor
from supervisor import Supervisor
# from supervisor import CompositionSupervisor


load_dotenv('../assets/dev/.env_dev')
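
With CompositionSupervisor commented out, main.py only has to wire the base Supervisor to the Mongo connector. A minimal sketch of that wiring under stated assumptions: the keyword arguments below are hypothetical, since the actual MongoDbConnector and Supervisor signatures are not part of this hunk.

import os

from dotenv import load_dotenv

from shared import MongoDbConnector
from supervisor import Supervisor

load_dotenv('../assets/dev/.env_dev')

# hypothetical constructor arguments, shown for illustration only
db_connector = MongoDbConnector(connection_uri=os.getenv('MONGO_URI'), database_id=os.getenv('MONGO_DB_NAME'))
supervisor = Supervisor(db_connector=db_connector)
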
203 changes: 100 additions & 103 deletions worker/workers.py
@@ -23,7 +23,6 @@
handle_sbml_exception,
_get_output_stack
)
from bigraph_steps import generate_simularium_file


# -- WORKER: "Toolkit" => Has all of the tooling necessary to process jobs.
@@ -707,79 +706,79 @@ def compare_arrays(self, arr1: np.ndarray, arr2: np.ndarray, atol=None, rtol=None):
return np.allclose(arr1, arr2, rtol=rTol, atol=aTol)


class CompositionWorker(Worker):
def __init__(self, job):
super().__init__(job=job)

async def _run(self):
# extract params
duration = self.job_params['duration']
composite_doc = self.job_params['composite_doc']

# create emitter for results if not already:
result_emitter_spec = {
'_type': 'step',
"address": "local:ram-emitter",
"config": {
"emit": {
"time": "float",
"floating_species_concentrations": "tree[float]"
}
},
"inputs": {
"time": ["time_store"],
"floating_species_concentrations": ["floating_species_concentrations_store"],
}
}

composite_doc['results'] = composite_doc.get('results', result_emitter_spec)

# instantiate composition
composition = Composite(config=composite_doc, core=PROCESS_TYPES)

# run composition and set results
composition.run(duration)
self.job_result = composition.gather_results()

return self.job_result

async def run(self, conn):
from process_bigraph import Composite
# from biosimulators_processes import CORE
from uuid import uuid4

doc = self.job_params['composite_spec']
process_name = list(doc.keys())[0]
duration = self.job_params['duration']
simulator = self.job_params['simulators'][0]
job_id = self.job_params['job_id']
new_job = {'job_id': job_id}
out_dir = tempfile.mkdtemp()
source_fp = self.job_params['path']
local_fp = download_file(source_blob_path=source_fp, out_dir=out_dir, bucket_name=BUCKET_NAME)

doc[process_name]['config']['model']['model_source'] = local_fp

# make composite
composite = Composite(config={'state': doc}, core=PROCESS_TYPES)

for n in range(duration):
# run composite
composite.run(1)
# get historical results
results = composite.gather_results()
data = results[('emitter',)]
# find job and update data
write_data = conn.db.in_progress_jobs.find_one(new_job)
write_data['data'] = data
# update db
conn.db.in_progress_jobs.update_one(new_job, {'$set': write_data})
from time import sleep
print(f'{n}: sleeping for 10...')
sleep(10)

self.job_result = conn.db.in_progress_jobs.find_one({'job_id': job_id}).get('results', {})
return self.job_result
# class CompositionWorker(Worker):
# def __init__(self, job):
# super().__init__(job=job)
#
# async def _run(self):
# # extract params
# duration = self.job_params['duration']
# composite_doc = self.job_params['composite_doc']
#
# # create emitter for results if not already:
# result_emitter_spec = {
# '_type': 'step',
# "address": "local:ram-emitter",
# "config": {
# "emit": {
# "time": "float",
# "floating_species_concentrations": "tree[float]"
# }
# },
# "inputs": {
# "time": ["time_store"],
# "floating_species_concentrations": ["floating_species_concentrations_store"],
# }
# }
#
# composite_doc['results'] = composite_doc.get('results', result_emitter_spec)
#
# # instantiate composition
# composition = Composite(config=composite_doc, core=PROCESS_TYPES)
#
# # run composition and set results
# composition.run(duration)
# self.job_result = composition.gather_results()
#
# return self.job_result
#
# async def run(self, conn):
# from process_bigraph import Composite
# # from biosimulators_processes import CORE
# from uuid import uuid4
#
# doc = self.job_params['composite_spec']
# process_name = list(doc.keys())[0]
# duration = self.job_params['duration']
# simulator = self.job_params['simulators'][0]
# job_id = self.job_params['job_id']
# new_job = {'job_id': job_id}
# out_dir = tempfile.mkdtemp()
# source_fp = self.job_params['path']
# local_fp = download_file(source_blob_path=source_fp, out_dir=out_dir, bucket_name=BUCKET_NAME)
#
# doc[process_name]['config']['model']['model_source'] = local_fp
#
# # make composite
# composite = Composite(config={'state': doc}, core=PROCESS_TYPES)
#
# for n in range(duration):
# # run composite
# composite.run(1)
# # get historical results
# results = composite.gather_results()
# data = results[('emitter',)]
# # find job and update data
# write_data = conn.db.in_progress_jobs.find_one(new_job)
# write_data['data'] = data
# # update db
# conn.db.in_progress_jobs.update_one(new_job, {'$set': write_data})
# from time import sleep
# print(f'{n}: sleeping for 10...')
# sleep(10)
#
# self.job_result = conn.db.in_progress_jobs.find_one({'job_id': job_id}).get('results', {})
# return self.job_result
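
If CompositionWorker comes back, note that the incremental-run loop above blocks the event loop by calling time.sleep inside an async method. A minimal sketch of the same loop yielding with asyncio.sleep instead; the names mirror the commented code above, and the 10-second poll interval is carried over as an assumption:

import asyncio

async def poll_composite(composite, conn, new_job: dict, duration: int, interval: float = 10.0):
    # advance the composition one unit at a time and persist intermediate results
    for n in range(duration):
        composite.run(1)
        results = composite.gather_results()
        data = results[('emitter',)]
        # find the in-progress job document and attach the latest data
        write_data = conn.db.in_progress_jobs.find_one(new_job)
        write_data['data'] = data
        conn.db.in_progress_jobs.update_one(new_job, {'$set': write_data})
        # yield control to the event loop instead of blocking in time.sleep
        await asyncio.sleep(interval)
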


class FilesWorker(Worker):
@@ -798,37 +797,35 @@ async def run(self):
local_input_path = download_file(source_blob_path=input_path, bucket_name=BUCKET_NAME, out_dir=dest)

# case: is a smoldyn output file and thus a simularium job
if local_input_path.endswith('.txt'):
await self._run_simularium(job_id=job_id, input_path=local_input_path, dest=dest)
# if local_input_path.endswith('.txt'):
# await self._run_simularium(job_id=job_id, input_path=local_input_path, dest=dest)
except Exception as e:
self.job_result = {'results': str(e)}

return self.job_result

async def _run_simularium(self, job_id: str, input_path: str, dest: str):
# get parameters from job
box_size = self.job_params['box_size']
translate = self.job_params['translate_output']
validate = self.job_params['validate_output']
params = self.job_params.get('agent_parameters')

# generate file
result = generate_simularium_file(
input_fp=input_path,
dest_dir=dest,
box_size=box_size,
translate_output=translate,
run_validation=validate,
agent_parameters=params
)

# upload file to bucket
results_file = result.get('simularium_file')
uploaded_file_location = None
if results_file is not None:
if not results_file.endswith('.simularium'):
results_file += '.simularium'
uploaded_file_location = await write_uploaded_file(job_id=job_id, bucket_name=BUCKET_NAME, uploaded_file=results_file, extension='.simularium')

# set uploaded file as result
self.job_result = {'results_file': uploaded_file_location}
# async def _run_simularium(self, job_id: str, input_path: str, dest: str):
# from bigraph_steps import generate_simularium_file
# # get parameters from job
# box_size = self.job_params['box_size']
# translate = self.job_params['translate_output']
# validate = self.job_params['validate_output']
# params = self.job_params.get('agent_parameters')
# # generate file
# result = generate_simularium_file(
# input_fp=input_path,
# dest_dir=dest,
# box_size=box_size,
# translate_output=translate,
# run_validation=validate,
# agent_parameters=params
# )
# # upload file to bucket
# results_file = result.get('simularium_file')
# uploaded_file_location = None
# if results_file is not None:
# if not results_file.endswith('.simularium'):
# results_file += '.simularium'
# uploaded_file_location = await write_uploaded_file(job_id=job_id, bucket_name=BUCKET_NAME, uploaded_file=results_file, extension='.simularium')
# # set uploaded file as result
# self.job_result = {'results_file': uploaded_file_location}
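
The simularium branch of FilesWorker is disabled together with the module-level bigraph_steps import at the top of the file. If it is restored, the import can move inside the method so the worker still starts when the composition extras are absent; a minimal sketch reusing the call shown in the commented code above (the fallback error message is an assumption):

async def _run_simularium(self, job_id: str, input_path: str, dest: str):
    try:
        # deferred import: only needed for smoldyn-to-simularium conversion jobs
        from bigraph_steps import generate_simularium_file
    except ImportError:
        self.job_result = {'results': 'simularium dependencies are not installed in this worker image'}
        return self.job_result
    result = generate_simularium_file(
        input_fp=input_path,
        dest_dir=dest,
        box_size=self.job_params['box_size'],
        translate_output=self.job_params['translate_output'],
        run_validation=self.job_params['validate_output'],
        agent_parameters=self.job_params.get('agent_parameters')
    )
    # upload the generated file to the bucket and record its location as the job result
    results_file = result.get('simularium_file')
    uploaded_file_location = None
    if results_file is not None:
        if not results_file.endswith('.simularium'):
            results_file += '.simularium'
        uploaded_file_location = await write_uploaded_file(job_id=job_id, bucket_name=BUCKET_NAME, uploaded_file=results_file, extension='.simularium')
    self.job_result = {'results_file': uploaded_file_location}
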
