Skip to content

Commit

Permalink
instantiate dask client inside BaseScheduler, remove dask client inje…
Browse files Browse the repository at this point in the history
…ction into scheduler class at extension launch
  • Loading branch information
andrii-i committed Jun 24, 2024
1 parent 73eef13 commit 572faca
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 29 deletions.
1 change: 0 additions & 1 deletion conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ def jp_scheduler(jp_scheduler_db_url, jp_scheduler_root_dir, jp_scheduler_db):
db_url=jp_scheduler_db_url,
root_dir=str(jp_scheduler_root_dir),
environments_manager=MockEnvironmentManager(),
dask_client_future=AsyncMock(),
)


Expand Down
30 changes: 4 additions & 26 deletions jupyter_scheduler/extension.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import asyncio

from dask.distributed import Client as DaskClient
from jupyter_core.paths import jupyter_data_dir
from jupyter_server.extension.application import ExtensionApp
Expand Down Expand Up @@ -72,15 +74,11 @@ def initialize_settings(self):

environments_manager = self.environment_manager_class()

asyncio_loop = self.serverapp.io_loop.asyncio_loop
dask_client_future = asyncio_loop.create_task(self._get_dask_client())

scheduler = self.scheduler_class(
root_dir=self.serverapp.root_dir,
environments_manager=environments_manager,
db_url=self.db_url,
config=self.config,
dask_client_future=dask_client_future,
)

job_files_manager = self.job_files_manager_class(scheduler=scheduler)
Expand All @@ -89,28 +87,8 @@ def initialize_settings(self):
environments_manager=environments_manager,
scheduler=scheduler,
job_files_manager=job_files_manager,
dask_client_future=dask_client_future,
)

if scheduler.task_runner:
asyncio_loop.create_task(scheduler.task_runner.start())

async def _get_dask_client(self):
"""Creates and configures a Dask client."""
return DaskClient(processes=False, asynchronous=True)

async def stop_extension(self):
"""Called by the Jupyter Server when stopping to cleanup resources."""
try:
await self._stop_extension()
except Exception as e:
self.log.error("Error while stopping Jupyter Scheduler:")
self.log.exception(e)

async def _stop_extension(self):
"""Closes the Dask client if it exists."""
if "dask_client_future" in self.settings:
dask_client: DaskClient = await self.settings["dask_client_future"]
self.log.info("Closing Dask client.")
await dask_client.close()
self.log.info("Dask client closed.")
loop = asyncio.get_event_loop()
loop.create_task(scheduler.task_runner.start())
10 changes: 8 additions & 2 deletions jupyter_scheduler/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import os
import random
import shutil
Expand Down Expand Up @@ -100,14 +101,19 @@ def __init__(
self,
root_dir: str,
environments_manager: Type[EnvironmentManager],
dask_client_future: Awaitable[DaskClient],
config=None,
**kwargs,
):
super().__init__(config=config, **kwargs)
self.root_dir = root_dir
self.environments_manager = environments_manager
self.dask_client_future = dask_client_future

loop = asyncio.get_event_loop()
self.dask_client_future: Awaitable[DaskClient] = loop.create_task(self._get_dask_client())

async def _get_dask_client(self):
"""Creates and configures a Dask client."""
return DaskClient(processes=False, asynchronous=True)

def create_job(self, model: CreateJob) -> str:
"""Creates a new job record, may trigger execution of the job.
Expand Down

0 comments on commit 572faca

Please sign in to comment.