From 9195e4fb2ca74c9f3aee03355fb8826553b981cb Mon Sep 17 00:00:00 2001
From: Andrii Ieroshenko
Date: Tue, 10 Sep 2024 19:50:39 -0700
Subject: [PATCH] make execute_workflow a flow

---
 jupyter_scheduler/executors.py | 34 ++++++++++++++++++++++++----------
 jupyter_scheduler/scheduler.py |  2 ++
 jupyter_scheduler/workflows.py | 11 +++++++++++
 3 files changed, 37 insertions(+), 10 deletions(-)

diff --git a/jupyter_scheduler/executors.py b/jupyter_scheduler/executors.py
index e6a290cd..b7d6ac7f 100644
--- a/jupyter_scheduler/executors.py
+++ b/jupyter_scheduler/executors.py
@@ -19,7 +19,7 @@
 from jupyter_scheduler.orm import Job, Workflow, create_session
 from jupyter_scheduler.parameterize import add_parameters
 from jupyter_scheduler.utils import get_utc_timestamp
-from jupyter_scheduler.workflows import DescribeWorkflow
+from jupyter_scheduler.workflows import DescribeTask, DescribeWorkflow
 
 
 class ExecutionManager(ABC):
@@ -188,26 +188,40 @@ class DefaultExecutionManager(ExecutionManager):
     """Default execution manager that executes notebooks"""
 
     @task(task_run_name="{task_id}")
-    def execute_task(task_id: str):
+    def execute_task(self, task_id: str):
         print(f"Task {task_id} executed")
         return task_id
 
-    @flow(task_runner=DaskTaskRunner())
+    @task
+    def get_task_data(self, task_ids: List[str] = []):
+        # TODO: get orm objects from Task table of the db, create DescribeTask for each
+        tasks_data_obj = [
+            {"id": "task0", "dependsOn": ["task3"]},
+            {"id": "task4", "dependsOn": ["task0", "task1", "task2", "task3"]},
+            {"id": "task1", "dependsOn": []},
+            {"id": "task2", "dependsOn": ["task1"]},
+            {"id": "task3", "dependsOn": ["task1", "task2"]},
+        ]
+
+        return tasks_data_obj
+
+    @flow()
     def execute_workflow(self):
-        workflow: DescribeWorkflow = self.model
-        tasks = {task["id"]: task for task in workflow.tasks}
+
+        tasks_info = self.get_task_data()
+        tasks = {task["id"]: task for task in tasks_info}
 
         # create Prefect tasks, use caching to ensure Prefect tasks are created before wait_for is called on them
         @lru_cache(maxsize=None)
-        def make_task(task_id, execute_task):
+        def make_task(task_id):
             deps = tasks[task_id]["dependsOn"]
-            return execute_task.submit(
-                task_id, wait_for=[make_task(dep_id, execute_task) for dep_id in deps]
+            return self.execute_task.submit(
+                task_id, wait_for=[make_task(dep_id) for dep_id in deps]
             )
 
-        final_tasks = [make_task(task_id, self.execute_task) for task_id in tasks]
+        final_tasks = [make_task(task_id) for task_id in tasks]
         for future in as_completed(final_tasks):
-            print(future.result())
+            future.result()
 
     def execute(self):
         job = self.model
diff --git a/jupyter_scheduler/scheduler.py b/jupyter_scheduler/scheduler.py
index ee56c9bc..18aacc5a 100644
--- a/jupyter_scheduler/scheduler.py
+++ b/jupyter_scheduler/scheduler.py
@@ -541,6 +541,8 @@ def create_workflow(self, model: CreateWorkflow) -> str:
         session.commit()
 
         execution_manager = self.execution_manager_class(
+            job_id="123",
+            staging_paths=dict(),
             workflow_id=workflow.workflow_id,
             root_dir=self.root_dir,
             db_url=self.db_url,
diff --git a/jupyter_scheduler/workflows.py b/jupyter_scheduler/workflows.py
index 57926f09..40cfb2f7 100644
--- a/jupyter_scheduler/workflows.py
+++ b/jupyter_scheduler/workflows.py
@@ -55,3 +55,14 @@ class DescribeWorkflow(BaseModel):
     workflow_id: str
     tasks: List[str] = None
     status: Status = Status.CREATED
+
+    class Config:
+        orm_mode = True
+
+
+class DescribeTask(BaseModel):
+    dependsOn: List[str] = []
+    status: Status = Status.CREATED
+
+    class Config:
+        orm_mode = True
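
Note on the dependency-resolution pattern this patch relies on (not part of the diff): execute_workflow builds one Prefect task per workflow task and wires ordering through wait_for, memoizing make_task with lru_cache so each task is submitted exactly once no matter how many dependents reference it. The following is a minimal, self-contained sketch of that pattern, with module-level functions (run_task, run_workflow) and a hard-coded dependency map standing in for DefaultExecutionManager and the placeholder data in get_task_data; it assumes Prefect 2.x and is illustrative only, not the patch's actual API.

# Illustrative sketch of the wait_for + lru_cache pattern used in execute_workflow.
# Names (run_task, run_workflow) and the hard-coded graph are hypothetical.
from functools import lru_cache
from typing import Dict, List

from prefect import flow, task


@task(task_run_name="{task_id}")
def run_task(task_id: str) -> str:
    # Placeholder task body, mirroring the patch's stubbed execute_task.
    print(f"Task {task_id} executed")
    return task_id


@flow
def run_workflow(tasks: Dict[str, List[str]]):
    # Memoize so each task is submitted exactly once, even when several
    # dependents ask for it while building their wait_for lists.
    @lru_cache(maxsize=None)
    def submit(task_id: str):
        deps = tasks[task_id]
        return run_task.submit(task_id, wait_for=[submit(dep) for dep in deps])

    futures = [submit(task_id) for task_id in tasks]
    # Block until every task has finished; raises if any task failed.
    return [f.result() for f in futures]


if __name__ == "__main__":
    run_workflow(
        {
            "task1": [],
            "task2": ["task1"],
            "task3": ["task1", "task2"],
            "task0": ["task3"],
            "task4": ["task0", "task1", "task2", "task3"],
        }
    )

Submitting through the memoized helper keeps the topological ordering implicit: Prefect starts a task only after everything in its wait_for list has completed, so no explicit sort of the dependency graph is needed.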