Skip to content

Commit

Permalink
make execute_workflow a flow
Browse files Browse the repository at this point in the history
  • Loading branch information
andrii-i committed Sep 11, 2024
1 parent 6c9de64 commit 9195e4f
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 10 deletions.
34 changes: 24 additions & 10 deletions jupyter_scheduler/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from jupyter_scheduler.orm import Job, Workflow, create_session
from jupyter_scheduler.parameterize import add_parameters
from jupyter_scheduler.utils import get_utc_timestamp
from jupyter_scheduler.workflows import DescribeWorkflow
from jupyter_scheduler.workflows import DescribeTask, DescribeWorkflow


class ExecutionManager(ABC):
Expand Down Expand Up @@ -188,26 +188,40 @@ class DefaultExecutionManager(ExecutionManager):
"""Default execution manager that executes notebooks"""

@task(task_run_name="{task_id}")
def execute_task(task_id: str):
def execute_task(self, task_id: str):
print(f"Task {task_id} executed")
return task_id

@flow(task_runner=DaskTaskRunner())
@task
def get_task_data(self, task_ids: List[str] = []):
# TODO: get orm objects from Task table of the db, create DescribeTask for each
tasks_data_obj = [
{"id": "task0", "dependsOn": ["task3"]},
{"id": "task4", "dependsOn": ["task0", "task1", "task2", "task3"]},
{"id": "task1", "dependsOn": []},
{"id": "task2", "dependsOn": ["task1"]},
{"id": "task3", "dependsOn": ["task1", "task2"]},
]

return tasks_data_obj

@flow()
def execute_workflow(self):
workflow: DescribeWorkflow = self.model
tasks = {task["id"]: task for task in workflow.tasks}

tasks_info = self.get_task_data()
tasks = {task["id"]: task for task in tasks_info}

# create Prefect tasks, use caching to ensure Prefect tasks are created before wait_for is called on them
@lru_cache(maxsize=None)
def make_task(task_id, execute_task):
def make_task(task_id):
deps = tasks[task_id]["dependsOn"]
return execute_task.submit(
task_id, wait_for=[make_task(dep_id, execute_task) for dep_id in deps]
return self.execute_task.submit(
task_id, wait_for=[make_task(dep_id) for dep_id in deps]
)

final_tasks = [make_task(task_id, self.execute_task) for task_id in tasks]
final_tasks = [make_task(task_id) for task_id in tasks]
for future in as_completed(final_tasks):
print(future.result())
future.result()

def execute(self):
job = self.model
Expand Down
2 changes: 2 additions & 0 deletions jupyter_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,8 @@ def create_workflow(self, model: CreateWorkflow) -> str:
session.commit()

execution_manager = self.execution_manager_class(
job_id="123",
staging_paths=dict(),
workflow_id=workflow.workflow_id,
root_dir=self.root_dir,
db_url=self.db_url,
Expand Down
11 changes: 11 additions & 0 deletions jupyter_scheduler/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,14 @@ class DescribeWorkflow(BaseModel):
workflow_id: str
tasks: List[str] = None
status: Status = Status.CREATED

class Config:
orm_mode = True


class DescribeTask(BaseModel):
dependsOn: List[str] = []
status: Status = Status.CREATED

class Config:
orm_mode = True

0 comments on commit 9195e4f

Please sign in to comment.