From 291a74435f362cb9caa8786061af4ae012947393 Mon Sep 17 00:00:00 2001 From: Falko Schindler Date: Tue, 19 Sep 2023 22:49:53 +0200 Subject: [PATCH 1/4] introduce run.cpu_bound and run.io_bound --- nicegui/__init__.py | 5 ++++- nicegui/nicegui.py | 4 +++- nicegui/run_executor.py | 39 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 46 insertions(+), 2 deletions(-) create mode 100644 nicegui/run_executor.py diff --git a/nicegui/__init__.py b/nicegui/__init__.py index 2ec3b9335..5907b9c57 100644 --- a/nicegui/__init__.py +++ b/nicegui/__init__.py @@ -1,4 +1,6 @@ -from . import elements, globals, ui # pylint: disable=redefined-builtin +from . import ui # pylint: disable=redefined-builtin +from . import elements, globals # pylint: disable=redefined-builtin +from . import run_executor as run from .api_router import APIRouter from .client import Client from .nicegui import app @@ -11,6 +13,7 @@ 'Client', 'elements', 'globals', + 'run', 'Tailwind', 'ui', '__version__', diff --git a/nicegui/nicegui.py b/nicegui/nicegui.py index 379f69548..bc503ab97 100644 --- a/nicegui/nicegui.py +++ b/nicegui/nicegui.py @@ -11,7 +11,8 @@ from fastapi.staticfiles import StaticFiles from fastapi_socketio import SocketManager -from . import background_tasks, binding, favicon, globals, json, outbox, welcome # pylint: disable=redefined-builtin +from . import (background_tasks, binding, favicon, globals, json, outbox, # pylint: disable=redefined-builtin + run_executor, welcome) from .app import App from .client import Client from .dependencies import js_components, libraries @@ -109,6 +110,7 @@ async def handle_shutdown() -> None: with globals.index_client: for t in globals.shutdown_handlers: safe_invoke(t) + run_executor.tear_down() globals.state = globals.State.STOPPED if globals.air: await globals.air.disconnect() diff --git a/nicegui/run_executor.py b/nicegui/run_executor.py new file mode 100644 index 000000000..8ea54baae --- /dev/null +++ b/nicegui/run_executor.py @@ -0,0 +1,39 @@ +import asyncio +from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor +from functools import partial +from typing import Any, Callable + +from . import globals, helpers # pylint: disable=redefined-builtin + +process_pool = ProcessPoolExecutor() +thread_pool = ThreadPoolExecutor() + + +async def _run(executor: Any, callback: Callable, *args: Any, **kwargs: Any): + if globals.state == globals.State.STOPPING: + return + try: + loop = asyncio.get_running_loop() + return await loop.run_in_executor(executor, partial(callback, *args, **kwargs)) + except RuntimeError as e: + if 'cannot schedule new futures after shutdown' not in str(e): + raise + except asyncio.exceptions.CancelledError: + pass + + +async def cpu_bound(callback: Callable, *args: Any, **kwargs: Any): + _run(process_pool, callback, *args, **kwargs) + + +async def io_bound(callback: Callable, *args: Any, **kwargs: Any): + _run(thread_pool, callback, *args, **kwargs) + + +def tear_down() -> None: + if helpers.is_pytest(): + return + for p in process_pool._processes.values(): # pylint: disable=protected-access + p.kill() + process_pool.shutdown(wait=True, cancel_futures=True) + thread_pool.shutdown(wait=False, cancel_futures=True) From 5b977ff298f8c3e3982571777dc86668f3aef165 Mon Sep 17 00:00:00 2001 From: Falko Schindler Date: Tue, 19 Sep 2023 23:53:26 +0200 Subject: [PATCH 2/4] fix run_executor.py for Python 3.8 --- nicegui/run_executor.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/nicegui/run_executor.py b/nicegui/run_executor.py index 8ea54baae..dd1ccd187 100644 --- a/nicegui/run_executor.py +++ b/nicegui/run_executor.py @@ -1,4 +1,5 @@ import asyncio +import sys from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from functools import partial from typing import Any, Callable @@ -35,5 +36,6 @@ def tear_down() -> None: return for p in process_pool._processes.values(): # pylint: disable=protected-access p.kill() - process_pool.shutdown(wait=True, cancel_futures=True) - thread_pool.shutdown(wait=False, cancel_futures=True) + kwargs = {'cancel_futures': True} if sys.version_info >= (3, 9) else {} + process_pool.shutdown(wait=True, **kwargs) + thread_pool.shutdown(wait=False, **kwargs) From a69fb4e9b6ff1a6feebf67da0611c591e6f671fd Mon Sep 17 00:00:00 2001 From: Falko Schindler Date: Thu, 21 Sep 2023 08:58:45 +0200 Subject: [PATCH 3/4] add documentation --- nicegui/run_executor.py | 13 +++++++----- website/documentation.py | 43 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 5 deletions(-) diff --git a/nicegui/run_executor.py b/nicegui/run_executor.py index dd1ccd187..54e31e615 100644 --- a/nicegui/run_executor.py +++ b/nicegui/run_executor.py @@ -10,7 +10,7 @@ thread_pool = ThreadPoolExecutor() -async def _run(executor: Any, callback: Callable, *args: Any, **kwargs: Any): +async def _run(executor: Any, callback: Callable, *args: Any, **kwargs: Any) -> Any: if globals.state == globals.State.STOPPING: return try: @@ -23,15 +23,18 @@ async def _run(executor: Any, callback: Callable, *args: Any, **kwargs: Any): pass -async def cpu_bound(callback: Callable, *args: Any, **kwargs: Any): - _run(process_pool, callback, *args, **kwargs) +async def cpu_bound(callback: Callable, *args: Any, **kwargs: Any) -> Any: + """Run a CPU-bound function in a separate process.""" + return await _run(process_pool, callback, *args, **kwargs) -async def io_bound(callback: Callable, *args: Any, **kwargs: Any): - _run(thread_pool, callback, *args, **kwargs) +async def io_bound(callback: Callable, *args: Any, **kwargs: Any) -> Any: + """Run an I/O-bound function in a separate thread.""" + return await _run(thread_pool, callback, *args, **kwargs) def tear_down() -> None: + """Kill all processes and threads.""" if helpers.is_pytest(): return for p in process_pool._processes.values(): # pylint: disable=protected-access diff --git a/website/documentation.py b/website/documentation.py index 3b88514f8..2b28a5dc7 100644 --- a/website/documentation.py +++ b/website/documentation.py @@ -358,6 +358,49 @@ async def async_task(): ui.button('start async task', on_click=async_task) + @text_demo('Running CPU-bound tasks', ''' + NiceGUI provides a `cpu_bound` function for running CPU-bound tasks in a separate process. + This is useful for long-running computations that would otherwise block the event loop and make the UI unresponsive. + The function returns a future that can be awaited. + ''') + def cpu_bound_demo(): + import time + + from nicegui import run + + def compute_sum(a: float, b: float) -> float: + time.sleep(1) # simulate a long-running computation + return a + b + + async def handle_click(): + result = await run.cpu_bound(compute_sum, 1, 2) + ui.notify(f'Sum is {result}') + + # ui.button('Compute', on_click=handle_click) + # END OF DEMO + async def mock_click(): + import asyncio + await asyncio.sleep(1) + ui.notify('Sum is 3') + ui.button('Compute', on_click=mock_click) + + @text_demo('Running I/O-bound tasks', ''' + NiceGUI provides an `io_bound` function for running I/O-bound tasks in a separate thread. + This is useful for long-running I/O operations that would otherwise block the event loop and make the UI unresponsive. + The function returns a future that can be awaited. + ''') + def io_bound_demo(): + import requests + + from nicegui import run + + async def handle_click(): + URL = 'https://httpbin.org/delay/1' + response = await run.io_bound(requests.get, URL, timeout=3) + ui.notify(f'Downloaded {len(response.content)} bytes') + + ui.button('Download', on_click=handle_click) + heading('Pages') load_demo(ui.page) From 902f8e5c2ebb315707b6cd40adf36ebab58f3f85 Mon Sep 17 00:00:00 2001 From: Falko Schindler Date: Thu, 21 Sep 2023 09:09:07 +0200 Subject: [PATCH 4/4] use new run module where possible --- examples/ai_interface/main.py | 14 +++----------- examples/opencv_webcam/main.py | 13 +++---------- examples/progress/main.py | 15 +++------------ nicegui/native.py | 4 ++-- nicegui/welcome.py | 5 ++--- 5 files changed, 13 insertions(+), 38 deletions(-) diff --git a/examples/ai_interface/main.py b/examples/ai_interface/main.py index 39d42e606..90f9b9e03 100755 --- a/examples/ai_interface/main.py +++ b/examples/ai_interface/main.py @@ -1,25 +1,17 @@ #!/usr/bin/env python3 -import asyncio -import functools import io -from typing import Callable import replicate # very nice API to run AI models; see https://replicate.com/ -from nicegui import ui +from nicegui import run, ui from nicegui.events import UploadEventArguments -async def io_bound(callback: Callable, *args: any, **kwargs: any): - '''Makes a blocking function awaitable; pass function as first parameter and its arguments as the rest''' - return await asyncio.get_event_loop().run_in_executor(None, functools.partial(callback, *args, **kwargs)) - - async def transcribe(e: UploadEventArguments): transcription.text = 'Transcribing...' model = replicate.models.get('openai/whisper') version = model.versions.get('30414ee7c4fffc37e260fcab7842b5be470b9b840f2b608f5baa9bbef9a259ed') - prediction = await io_bound(version.predict, audio=io.BytesIO(e.content.read())) + prediction = await run.io_bound(version.predict, audio=io.BytesIO(e.content.read())) text = prediction.get('transcription', 'no transcription') transcription.set_text(f'result: "{text}"') @@ -28,7 +20,7 @@ async def generate_image(): image.source = 'https://dummyimage.com/600x400/ccc/000000.png&text=building+image...' model = replicate.models.get('stability-ai/stable-diffusion') version = model.versions.get('db21e45d3f7023abc2a46ee38a23973f6dce16bb082a930b0c49861f96d1e5bf') - prediction = await io_bound(version.predict, prompt=prompt.value) + prediction = await run.io_bound(version.predict, prompt=prompt.value) image.source = prediction[0] # User Interface diff --git a/examples/opencv_webcam/main.py b/examples/opencv_webcam/main.py index ef23a1838..a3c5baa23 100755 --- a/examples/opencv_webcam/main.py +++ b/examples/opencv_webcam/main.py @@ -1,7 +1,5 @@ #!/usr/bin/env python3 -import asyncio import base64 -import concurrent.futures import signal import time @@ -10,10 +8,8 @@ from fastapi import Response import nicegui.globals -from nicegui import app, ui +from nicegui import app, run, ui -# We need an executor to schedule CPU-intensive tasks with `loop.run_in_executor()`. -process_pool_executor = concurrent.futures.ProcessPoolExecutor() # In case you don't have a webcam, this will provide a black placeholder image. black_1px = 'iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAAAXNSR0IArs4c6QAAAA1JREFUGFdjYGBg+A8AAQQBAHAgZQsAAAAASUVORK5CYII=' placeholder = Response(content=base64.b64decode(black_1px.encode('ascii')), media_type='image/png') @@ -31,14 +27,13 @@ def convert(frame: np.ndarray) -> bytes: async def grab_video_frame() -> Response: if not video_capture.isOpened(): return placeholder - loop = asyncio.get_running_loop() # The `video_capture.read` call is a blocking function. # So we run it in a separate thread (default executor) to avoid blocking the event loop. - _, frame = await loop.run_in_executor(None, video_capture.read) + _, frame = await run.io_bound(video_capture.read) if frame is None: return placeholder # `convert` is a CPU-intensive function, so we run it in a separate process to avoid blocking the event loop and GIL. - jpeg = await loop.run_in_executor(process_pool_executor, convert, frame) + jpeg = await run.cpu_bound(convert, frame) return Response(content=jpeg, media_type='image/jpeg') # For non-flickering image updates an interactive image is much better than `ui.image()`. @@ -68,8 +63,6 @@ async def cleanup() -> None: await disconnect() # Release the webcam hardware so it can be used by other applications again. video_capture.release() - # The process pool executor must be shutdown when the app is closed, otherwise the process will not exit. - process_pool_executor.shutdown() app.on_shutdown(cleanup) # We also need to disconnect clients when the app is stopped with Ctrl+C, diff --git a/examples/progress/main.py b/examples/progress/main.py index 5663e00ca..18e771543 100755 --- a/examples/progress/main.py +++ b/examples/progress/main.py @@ -1,16 +1,12 @@ #!/usr/bin/env python3 -import asyncio import time -from concurrent.futures import ProcessPoolExecutor from multiprocessing import Manager, Queue -from nicegui import app, ui - -pool = ProcessPoolExecutor() +from nicegui import run, ui def heavy_computation(q: Queue) -> str: - '''Some heavy computation that updates the progress bar through the queue.''' + """Run some heavy computation that updates the progress bar through the queue.""" n = 50 for i in range(n): # Perform some heavy computation @@ -23,11 +19,9 @@ def heavy_computation(q: Queue) -> str: @ui.page('/') def main_page(): - async def start_computation(): progressbar.visible = True - loop = asyncio.get_running_loop() - result = await loop.run_in_executor(pool, heavy_computation, queue) + result = await run.cpu_bound(heavy_computation, queue) ui.notify(result) progressbar.visible = False @@ -42,7 +36,4 @@ async def start_computation(): progressbar.visible = False -# stop the pool when the app is closed; will not cancel any running tasks -app.on_shutdown(pool.shutdown) - ui.run() diff --git a/nicegui/native.py b/nicegui/native.py index a87fe71d5..c40d68caa 100644 --- a/nicegui/native.py +++ b/nicegui/native.py @@ -1,4 +1,3 @@ -import asyncio import inspect import warnings from dataclasses import dataclass, field @@ -8,6 +7,7 @@ from .dataclasses import KWONLY_SLOTS from .globals import log +from .run_executor import io_bound method_queue: Queue = Queue() response_queue: Queue = Queue() @@ -123,7 +123,7 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: log.exception(f'error in {name}') return None name = inspect.currentframe().f_back.f_code.co_name # type: ignore - return await asyncio.get_event_loop().run_in_executor(None, partial(wrapper, *args, **kwargs)) + return await io_bound(wrapper, *args, **kwargs) def signal_server_shutdown(self) -> None: self._send() diff --git a/nicegui/welcome.py b/nicegui/welcome.py index 1f787a1c8..f4c92fa22 100644 --- a/nicegui/welcome.py +++ b/nicegui/welcome.py @@ -1,9 +1,9 @@ -import asyncio import os import socket from typing import List from . import globals # pylint: disable=redefined-builtin +from .run_executor import io_bound try: import netifaces @@ -33,8 +33,7 @@ async def print_message() -> None: print('NiceGUI ready to go ', end='', flush=True) host = os.environ['NICEGUI_HOST'] port = os.environ['NICEGUI_PORT'] - loop = asyncio.get_running_loop() - ips = set((await loop.run_in_executor(None, get_all_ips)) if host == '0.0.0.0' else []) + ips = set((await io_bound(get_all_ips)) if host == '0.0.0.0' else []) ips.discard('127.0.0.1') urls = [(f'http://{ip}:{port}' if port != '80' else f'http://{ip}') for ip in ['localhost'] + sorted(ips)] globals.app.urls.update(urls)