Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Prefect 3 instead of multiprocessing module #539

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions jupyter_scheduler/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import nbconvert
import nbformat
from nbconvert.preprocessors import CellExecutionError, ExecutePreprocessor
from prefect import flow, task
from prefect_dask import DaskTaskRunner

from jupyter_scheduler.models import DescribeJob, JobFeature, Status
from jupyter_scheduler.orm import Job, create_session
Expand Down Expand Up @@ -122,6 +124,7 @@ def on_complete(self):
class DefaultExecutionManager(ExecutionManager):
"""Default execution manager that executes notebooks"""

@flow(task_runner=DaskTaskRunner)
def execute(self):
job = self.model

Expand All @@ -144,6 +147,7 @@ def execute(self):
self.add_side_effects_files(staging_dir)
self.create_output_files(job, nb)

@task
def add_side_effects_files(self, staging_dir: str):
"""Scan for side effect files potentially created after input file execution and update the job's packaged_files with these files"""
input_notebook = os.path.relpath(self.staging_paths["input"])
Expand All @@ -166,6 +170,7 @@ def add_side_effects_files(self, staging_dir: str):
)
session.commit()

@task
def create_output_files(self, job: DescribeJob, notebook_node):
for output_format in job.output_formats:
cls = nbconvert.get_exporter(output_format)
Expand Down
27 changes: 10 additions & 17 deletions jupyter_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from jupyter_core.paths import jupyter_data_dir
from jupyter_server.transutils import _i18n
from jupyter_server.utils import to_os_path
from prefect import flow, task
from prefect_dask import DaskTaskRunner
from sqlalchemy import and_, asc, desc, func
from traitlets import Instance
from traitlets import Type as TType
Expand Down Expand Up @@ -478,25 +480,16 @@ def create_job(self, model: CreateJob) -> str:
else:
self.copy_input_file(model.input_uri, staging_paths["input"])

# The MP context forces new processes to not be forked on Linux.
# This is necessary because `asyncio.get_event_loop()` is bugged in
# forked processes in Python versions below 3.12. This method is
# called by `jupyter_core`, which is in turn invoked by `nbconvert` in
# the default executor.
#
# See: https://github.com/python/cpython/issues/66285
# See also: https://github.com/jupyter/jupyter_core/pull/362
mp_ctx = mp.get_context("spawn")
p = mp_ctx.Process(
target=self.execution_manager_class(
job_id=job.job_id,
staging_paths=staging_paths,
root_dir=self.root_dir,
db_url=self.db_url,
).process
execution_manager = self.execution_manager_class(
job_id=job.job_id,
staging_paths=staging_paths,
root_dir=self.root_dir,
db_url=self.db_url,
)
p.start()

job.pid = p.pid
execution_manager.process()

job.pid = 1 # TODO: fix pid hardcode
session.commit()

job_id = job.job_id
Expand Down
Loading