Skip to content

Commit

Permalink
Finished updating workflow functions. Only createWorkflowFromJSON not…
Browse files Browse the repository at this point in the history
… working.
  • Loading branch information
RileyMiyamoto committed Jan 18, 2024
1 parent ec1c17c commit 1baf1ba
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 53 deletions.
25 changes: 13 additions & 12 deletions tests/bare_metal_compute_service_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,22 @@
print(f"Created file registry service has name {frs.get_name()}")

print("Creating a 2-task chain workflow as a single job, which will fail due to missing file locations...")
file1 = simulation.add_file("file1", 1024)
workflow = simulation.create_workflow()

file1 = simulation.add_file(workflow, "file1", 1024)
ss.create_file_copy(file1)
file2 = simulation.add_file("file2", 1024)
file3 = simulation.add_file("file3", 1024)
file2 = simulation.add_file(workflow, "file2", 1024)
file3 = simulation.add_file(workflow, "file3", 1024)

workflow = simulation.create_workflow()
task1 = workflow.add_task("task1", 10000000000, 1, 1, 0)
task1.add_input_file(file1)
task1.add_output_file(file2)
task2 = workflow.add_task("task2", 200000000000, 1, 1, 0)
task2.add_input_file(file2)
task2.add_output_file(file3)
task1 = workflow.add_task(workflow, "task1", 10000000000, 1, 1, 0)
task1.add_input_file(workflow, file1)
task1.add_output_file(workflow, file2)
task2 = workflow.add_task(workflow, "task2", 200000000000, 1, 1, 0)
task2.add_input_file(workflow, file2)
task2.add_output_file(workflow, file3)

print("Creating a standard job with both tasks, but that doesn't specify file locations")
job = simulation.create_standard_job([task1, task2], {})
job = simulation.create_standard_job(workflow, [task1, task2], {})

print("Submitting the standard job to the compute service...")
cs.submit_standard_job(job)
Expand All @@ -77,7 +78,7 @@
raise wrench.WRENCHException("Was expecting a job failure event but instead got a: " + event["event_type"])

print("Trying again, but giving file locations for first and last file (second file will be on scratch!)...")
job = simulation.create_standard_job([task1, task2], {file1: ss, file3: ss})
job = simulation.create_standard_job(workflow, [task1, task2], {file1: ss, file3: ss})
cs.submit_standard_job(job)
print("Getting simulation events...")
event = simulation.wait_for_next_event()
Expand Down
24 changes: 12 additions & 12 deletions tests/batch_compute_service_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,22 @@
frs = simulation.create_file_registry_service("ControllerHost")
print(f"Created file registry service has name {frs.get_name()}")

workflow = simulation.create_workflow()
print("Creating a 2-task chain workflow as a single job, which will fail due to missing file locations...")
file1 = simulation.add_file("file1", 1024)
file1 = simulation.add_file(workflow, "file1", 1024)
ss.create_file_copy(file1)
file2 = simulation.add_file("file2", 1024)
file3 = simulation.add_file("file3", 1024)
file2 = simulation.add_file(workflow, "file2", 1024)
file3 = simulation.add_file(workflow, "file3", 1024)

workflow = simulation.create_workflow()
task1 = workflow.add_task("task1", 10000000000, 1, 1, 0)
task1.add_input_file(file1)
task1.add_output_file(file2)
task2 = workflow.add_task("task2", 200000000000, 1, 1, 0)
task2.add_input_file(file2)
task2.add_output_file(file3)
task1 = workflow.add_task(workflow, "task1", 10000000000, 1, 1, 0)
task1.add_input_file(workflow, file1)
task1.add_output_file(workflow, file2)
task2 = workflow.add_task(workflow, "task2", 200000000000, 1, 1, 0)
task2.add_input_file(workflow, file2)
task2.add_output_file(workflow, file3)

print("Creating a standard job with both tasks, but that doesn't specify file locations")
job = simulation.create_standard_job([task1, task2], {})
job = simulation.create_standard_job(workflow, [task1, task2], {})

print("Random select args for job")
host = random.randint(1,2)
Expand All @@ -83,7 +83,7 @@
raise wrench.WRENCHException("Was expecting a job failure event but instead got a: " + event["event_type"])

print("Trying again, but giving file locations for first and last file (second file will be on scratch!)...")
job = simulation.create_standard_job([task1, task2], {file1: ss, file3: ss})
job = simulation.create_standard_job(workflow, [task1, task2], {file1: ss, file3: ss})
cs.submit_standard_job(job, service_specific_args)
print("Getting simulation events...")
event = simulation.wait_for_next_event()
Expand Down
6 changes: 0 additions & 6 deletions tests/workflow_job_task_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,6 @@

print(f"This job contains the following tasks: {job.get_tasks()}")

print(f"Creating a workflow from JSON")
workflow2 = simulation.create_workflow_from_json_string("sample_platform.xml", "2", False, False,
False, 3.0, 3.0, False,
False, False)
print(f"Created {workflow2}")

print("Terminating simulation daemon")
simulation.terminate()

Expand Down
61 changes: 54 additions & 7 deletions tests/workflow_simulator_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

import pathlib

import wrench

if __name__ == "__main__":
Expand All @@ -19,14 +19,61 @@

simulation = wrench.Simulation()
simulation.start(platform_file_path, "ControllerHost")
workflow = simulation.create_workflow()
print("Adding a a 1kB file to the simulation...")
file1 = simulation.add_file(workflow, "file1", 1024)
print(f"Created file {file1}")
print("Adding another 1kB file to the workflow...")
file2 = simulation.add_file(workflow, "file2", 2048)
print(f"Created file {file2}")
print("Adding another 1kB file to the workflow...")
file3 = simulation.add_file(workflow, "file3", 10000)
print(f"Created file {file3}")

print(f"The files in the simulation are {simulation.get_all_files()}")

print("Creating a task")
task1 = workflow.add_task(workflow, "task1", 100.0, 1, 8, 1024)
print(f"Just created a task with flops={task1.get_flops()}" +
f", min_num_cores={task1.get_min_num_cores()}" +
f", max_num_cores={task1.get_max_num_cores()}" +
f", memory={task1.get_memory()}")

task1.add_input_file(workflow, file1)
print(f"Attached file {file1} as input file to task {task1.get_name()}")
print(f"The list of input files for task {task1.get_name()} is: {task1.get_input_files()}")
task1.add_output_file(workflow, file2)
print(f"Attached file {file2} as output file to task {task1.get_name()}")
print(f"The list of output files for task {task1.get_name()} is: {task1.get_output_files()}")

print("Creating another task")
task2 = workflow.add_task(workflow, "task2", 1000.0, 4, 4, 0)
print(f"Just created a task with flops={task2.get_flops()}" +
f", min_num_cores={task2.get_min_num_cores()}" +
f", max_num_cores={task2.get_max_num_cores()}" +
f", memory={task2.get_memory()}")

task2.add_input_file(workflow, file2)
print(f"Attached file {file2} as input file to task {task2.get_name()}")
print(f"The list of input files for task {task2.get_name()} is: {task2.get_input_files()}")
task1.add_output_file(workflow, file3)
print(f"Attached file {file3} as output file to task {task2.get_name()}")
print(f"The list of output files for task {task2.get_name()} is: {task2.get_output_files()}")

print(f"The tasks in the simulation are: {simulation._workflow_get_all_tasks()}")

print("Creating a standard job with both tasks")
job = simulation.create_standard_job(workflow, [task1, task2], {})
print(f"Created standard job has name {job.get_name()}")

print(f"New simulation, time is {simulation.get_simulated_time()}")
print(f"Creating Workflow from JSON file")
result = simulation.create_workflow_from_json_string("sample_platform.xml","2", False,False,
False, 3.0, 3.0, False,
False, False)
print(f"This job contains the following tasks: {job.get_tasks()}")

print(f"Time is {simulation.get_simulated_time()}")
print(f"Creating workflow from JSON")
workflow2 = simulation.create_workflow_from_json_string("sample_platform.xml", "2", False,
False,
False, 3.0, 3.0, False,
False, False)
print(f"{workflow2}")

print("Terminating simulation daemon")
simulation.terminate()
Expand Down
29 changes: 15 additions & 14 deletions wrench/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,13 +453,14 @@ def create_workflow_from_json_string(self, json_string: str, reference_flop_rate
:return: Name of workflow
:rtype: str
"""
data = {"jsonString": json_string, "referenceFlopRate": reference_flop_rate, "ignoreMachineSpecs": ignore_machine_specs,
"redundantDependencies": redundant_dependencies, "ignoreCycleCreatingDependencies": ignore_cycle_creating_dependencies,
"minCoresPerTask": min_cores_per_task, "maxCoresPerTask": max_cores_per_task, "enforceNumCores": enforce_num_cores,
"ignoreAvgCPU": ignore_avg_cpu, "showWarnings": show_warnings}
data = {"json_string": json_string, "reference_flop_rate": reference_flop_rate, "ignore_machine_specs": ignore_machine_specs,
"redundant_dependencies": redundant_dependencies, "ignore_cycle_creating_dependencies": ignore_cycle_creating_dependencies,
"min_cores_per_task": min_cores_per_task, "max_cores_per_task": max_cores_per_task, "enforce_num_cores": enforce_num_cores,
"ignore_avg_cpu": ignore_avg_cpu, "show_warnings": show_warnings}

r = self.__send_request_to_daemon(requests.post, f"{self.daemon_url}/{self.simid}/createWorkflowFromJSONString", json={})
r = self.__send_request_to_daemon(requests.post, f"{self.daemon_url}/{self.simid}/createWorkflowFromJSONString", json=data)
response = r.json()
print(response)
return response["results"]

####################################################################################
Expand Down Expand Up @@ -492,12 +493,12 @@ def _submit_standard_job(self, job_name: str, cs_name: str, service_specific_arg
if not response["wrench_api_request_success"]:
raise WRENCHException(response["failure_cause"])

def _create_file_copy_at_storage_service(self, workflow_name: str, file_name: str, storage_service_name: str):
def _create_file_copy_at_storage_service(self, workflow: Workflow, file_name: str, storage_service_name: str):
"""
Create a copy (ex nihilo) of a file at a storage service
:param workflow_name: the workflow's name
:type workflow_name: str
:param workflow: the workflow
:type workflow: Workflow
:param file_name: the file name
:type file_name: str
:param storage_service_name: the name of the storage service
Expand All @@ -507,18 +508,18 @@ def _create_file_copy_at_storage_service(self, workflow_name: str, file_name: st
"""
data = {"filename": file_name}
r = self.__send_request_to_daemon(requests.post,
f"{self.daemon_url}/{self.simid}/{storage_service_name}/createFileCopy",
f"{self.daemon_url}/{self.simid}/{workflow.name}/{storage_service_name}/createFileCopy",
json=data)
response = r.json()
if not response["wrench_api_request_success"]:
raise WRENCHException(response["failure_cause"])

def _lookup_file_at_storage_service(self, workflow_name: str, file_name: str, storage_service_name: str):
def _lookup_file_at_storage_service(self, workflow: Workflow, file_name: str, storage_service_name: str):
"""
Checks whether a copy of a file is stored at a storage service
:param workflow_name: the workflow's name
:type workflow_name: str
:param workflow: the workflow
:type workflow: Workflow
:param file_name: the file name
:type file_name: str
:param storage_service_name: the name of the storage service
Expand All @@ -531,7 +532,7 @@ def _lookup_file_at_storage_service(self, workflow_name: str, file_name: str, st
"""
data = {"filename": file_name}
r = self.__send_request_to_daemon(requests.post,
f"{self.daemon_url}/{self.simid}/{storage_service_name}/lookupFile",
f"{self.daemon_url}/{self.simid}/{workflow.name}/{storage_service_name}/lookupFile",
json=data)
response = r.json()
if not response["wrench_api_request_success"]:
Expand Down Expand Up @@ -647,7 +648,7 @@ def _file_get_size(self, workflow, file_name: str) -> int:
return response["size"]
raise WRENCHException(response["failure_cause"])

def _task_get_flops(self, workflow_name: str, task_name: str) -> float:
def _task_get_flops(self, workflow_name, task_name: str) -> float:
"""
Get the number of flops in a task
Expand Down
4 changes: 2 additions & 2 deletions wrench/storage_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def create_file_copy(self, file: File) -> None:
:param file: the file
:return:
"""
return self.simulation._create_file_copy_at_storage_service(file.name, self.name)
return self.simulation._create_file_copy_at_storage_service(file.workflow, file.name, self.name)

def lookup_file(self, file: File) -> bool:
"""
Expand All @@ -45,7 +45,7 @@ def lookup_file(self, file: File) -> bool:
:return: true or false
:rtype: bool
"""
return self.simulation._lookup_file_at_storage_service(file.name, self.name)
return self.simulation._lookup_file_at_storage_service(file.workflow, file.name, self.name)

def __str__(self) -> str:
"""
Expand Down

0 comments on commit 1baf1ba

Please sign in to comment.