Add utilities for creating and executing flow bundles (#17178)
desertaxle authored Feb 19, 2025
1 parent 3d30411 commit a476000
Showing 4 changed files with 462 additions and 61 deletions.
136 changes: 136 additions & 0 deletions src/prefect/_experimental/bundles.py
@@ -0,0 +1,136 @@
from __future__ import annotations

import asyncio
import base64
import gzip
import multiprocessing
import multiprocessing.context
import os
from typing import Any, TypedDict

import cloudpickle

from prefect.client.schemas.objects import FlowRun
from prefect.context import SettingsContext, get_settings_context, serialize_context
from prefect.engine import handle_engine_signals
from prefect.flow_engine import run_flow
from prefect.flows import Flow
from prefect.settings.context import get_current_settings
from prefect.settings.models.root import Settings


class SerializedBundle(TypedDict):
    """
    A serialized bundle is a serialized function, context, and flow run that can be
    easily transported for later execution.
    """

    function: str
    context: str
    flow_run: dict[str, Any]


def _serialize_bundle_object(obj: Any) -> str:
    """
    Serializes an object to a string.
    """
    return base64.b64encode(gzip.compress(cloudpickle.dumps(obj))).decode()


def _deserialize_bundle_object(serialized_obj: str) -> Any:
    """
    Deserializes an object from a string.
    """
    return cloudpickle.loads(gzip.decompress(base64.b64decode(serialized_obj)))


def create_bundle_for_flow_run(
    flow: Flow[Any, Any],
    flow_run: FlowRun,
    context: dict[str, Any] | None = None,
) -> SerializedBundle:
    """
    Creates a bundle for a flow run.

    Args:
        flow: The flow to bundle.
        flow_run: The flow run to bundle.
        context: The context to use when running the flow.

    Returns:
        A serialized bundle.
    """
    context = context or serialize_context()

    return {
        "function": _serialize_bundle_object(flow),
        "context": _serialize_bundle_object(context),
        "flow_run": flow_run.model_dump(mode="json"),
    }


def _extract_and_run_flow(
    bundle: SerializedBundle, env: dict[str, Any] | None = None
) -> None:
    """
    Extracts a flow from a bundle and runs it.

    Designed to be run in a subprocess.

    Args:
        bundle: The bundle to extract and run.
        env: The environment to use when running the flow.
    """

    os.environ.update(env or {})
    # TODO: make this a thing we can pass directly to the engine
    os.environ["PREFECT__ENABLE_CANCELLATION_AND_CRASHED_HOOKS"] = "false"
    settings_context = get_settings_context()

    flow = _deserialize_bundle_object(bundle["function"])
    context = _deserialize_bundle_object(bundle["context"])
    flow_run = FlowRun.model_validate(bundle["flow_run"])

    with SettingsContext(
        profile=settings_context.profile,
        settings=Settings(),
    ):
        with handle_engine_signals(flow_run.id):
            maybe_coro = run_flow(
                flow=flow,
                flow_run=flow_run,
                context=context,
            )
            if asyncio.iscoroutine(maybe_coro):
                # This is running in a brand new process, so there won't be an existing
                # event loop.
                asyncio.run(maybe_coro)


def execute_bundle_in_subprocess(
    bundle: SerializedBundle,
) -> multiprocessing.context.SpawnProcess:
    """
    Executes a bundle in a subprocess.

    Args:
        bundle: The bundle to execute.

    Returns:
        A multiprocessing.context.SpawnProcess.
    """

    ctx = multiprocessing.get_context("spawn")

    process = ctx.Process(
        target=_extract_and_run_flow,
        kwargs={
            "bundle": bundle,
            "env": get_current_settings().to_environment_variables(exclude_unset=True)
            | os.environ,
        },
    )

    process.start()

    return process
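
Taken together, these utilities let one process pickle a flow plus its run metadata and hand the result to a freshly spawned process for execution. Below is a minimal sketch of how they might be wired together; the `hello` flow is illustrative, and the client call used to obtain a `FlowRun` is one possible approach that assumes a reachable Prefect API (an ephemeral or local server works).

```python
from prefect import flow
from prefect.client.orchestration import get_client
from prefect._experimental.bundles import (
    create_bundle_for_flow_run,
    execute_bundle_in_subprocess,
)


@flow(log_prints=True)
def hello() -> None:
    print("Hello from a bundle!")


if __name__ == "__main__":  # guard is required: the bundle runs in a spawned process
    # One way to obtain a FlowRun: register a run for this flow against the API.
    with get_client(sync_client=True) as client:
        flow_run = client.create_flow_run(hello)

    # Serialize the flow, the current context, and the flow run into a bundle...
    bundle = create_bundle_for_flow_run(flow=hello, flow_run=flow_run)

    # ...then execute it in a separate process and wait for it to finish.
    process = execute_bundle_in_subprocess(bundle)
    process.join()
```

Because `execute_bundle_in_subprocess` forwards the parent's Prefect settings as environment variables, the spawned process reports state back to the same API the parent is configured against.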
92 changes: 63 additions & 29 deletions src/prefect/engine.py
@@ -1,5 +1,8 @@
from __future__ import annotations

import os
import sys
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Callable
from uuid import UUID

@@ -18,13 +21,71 @@
if TYPE_CHECKING:
    import logging

    from prefect.flow_engine import FlowRun
    from prefect.client.schemas.objects import FlowRun
    from prefect.flows import Flow
    from prefect.logging.loggers import LoggingAdapter

engine_logger: "logging.Logger" = get_logger("engine")


@contextmanager
def handle_engine_signals(flow_run_id: UUID | None = None):
    """
    Handle signals from the orchestrator to abort or pause the flow run or otherwise
    handle unexpected exceptions.

    This context manager will handle exiting the process depending on the signal received.

    Args:
        flow_run_id: The ID of the flow run to handle signals for.

    Example:
        ```python
        from prefect import flow
        from prefect.engine import handle_engine_signals
        from prefect.flow_engine import run_flow

        @flow
        def my_flow():
            print("Hello, world!")

        with handle_engine_signals():
            run_flow(my_flow)
        ```
    """
    try:
        yield
    except Abort:
        if flow_run_id:
            msg = f"Execution of flow run '{flow_run_id}' aborted by orchestrator."
        else:
            msg = "Execution aborted by orchestrator."
        engine_logger.info(msg)
        exit(0)
    except Pause:
        if flow_run_id:
            msg = f"Execution of flow run '{flow_run_id}' is paused."
        else:
            msg = "Execution is paused."
        engine_logger.info(msg)
        exit(0)
    except Exception:
        if flow_run_id:
            msg = f"Execution of flow run '{flow_run_id}' exited with unexpected exception"
        else:
            msg = "Execution exited with unexpected exception"
        engine_logger.error(msg, exc_info=True)
        exit(1)
    except BaseException:
        if flow_run_id:
            msg = f"Execution of flow run '{flow_run_id}' interrupted by base exception"
        else:
            msg = "Execution interrupted by base exception"
        engine_logger.error(msg, exc_info=True)
        # Let the exit code be determined by the base exception type
        raise


if __name__ == "__main__":
    try:
        flow_run_id: UUID = UUID(
@@ -36,7 +97,7 @@
        )
        exit(1)

    try:
    with handle_engine_signals(flow_run_id):
        from prefect.flow_engine import (
            flow_run_logger,
            load_flow,
@@ -62,32 +123,5 @@
        else:
            run_flow(flow, flow_run=flow_run, error_logger=run_logger)

    except Abort:
        engine_logger.info(
            f"Engine execution of flow run '{flow_run_id}' aborted by orchestrator."
        )
        exit(0)
    except Pause:
        engine_logger.info(f"Engine execution of flow run '{flow_run_id}' is paused.")
        exit(0)
    except Exception:
        engine_logger.error(
            (
                f"Engine execution of flow run '{flow_run_id}' exited with unexpected "
                "exception"
            ),
            exc_info=True,
        )
        exit(1)
    except BaseException:
        engine_logger.error(
            (
                f"Engine execution of flow run '{flow_run_id}' interrupted by base "
                "exception"
            ),
            exc_info=True,
        )
        # Let the exit code be determined by the base exception type
        raise

__getattr__: Callable[[str], Any] = getattr_migration(__name__)
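
The context manager's contract can also be exercised on its own. The following is a small, hypothetical pytest-style sketch (not shown in this commit's diff) illustrating that an `Abort` raised inside `handle_engine_signals` is translated into a clean process exit rather than a traceback:

```python
import pytest

from prefect.engine import handle_engine_signals
from prefect.exceptions import Abort


def test_abort_translates_to_clean_exit():
    # Abort from the orchestrator should become exit(0), i.e. SystemExit with code 0.
    with pytest.raises(SystemExit) as exc_info:
        with handle_engine_signals():
            raise Abort()

    assert exc_info.value.code == 0
```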
34 changes: 2 additions & 32 deletions src/prefect/flow_engine.py
@@ -47,6 +47,7 @@
    hydrated_context,
    serialize_context,
)
from prefect.engine import handle_engine_signals
from prefect.exceptions import (
    Abort,
    MissingFlowError,
@@ -1592,8 +1593,6 @@ def run_flow_with_env(
        """
        Wrapper function to update environment variables and settings before running the flow.
        """
        engine_logger = logging.getLogger("prefect.engine")

        os.environ.update(env or {})
        settings_context = get_settings_context()
        # Create a new settings context with a new settings object to pick up the updated
@@ -1602,41 +1601,12 @@ def run_flow_with_env(
            profile=settings_context.profile,
            settings=Settings(),
        ):
            try:
            with handle_engine_signals(getattr(flow_run, "id", None)):
                maybe_coro = run_flow(*args, **kwargs)
                if asyncio.iscoroutine(maybe_coro):
                    # This is running in a brand new process, so there won't be an existing
                    # event loop.
                    asyncio.run(maybe_coro)
            except Abort:
                if flow_run:
                    msg = f"Execution of flow run '{flow_run.id}' aborted by orchestrator."
                else:
                    msg = "Execution aborted by orchestrator."
                engine_logger.info(msg)
                exit(0)
            except Pause:
                if flow_run:
                    msg = f"Execution of flow run '{flow_run.id}' is paused."
                else:
                    msg = "Execution is paused."
                engine_logger.info(msg)
                exit(0)
            except Exception:
                if flow_run:
                    msg = f"Execution of flow run '{flow_run.id}' exited with unexpected exception"
                else:
                    msg = "Execution exited with unexpected exception"
                engine_logger.error(msg, exc_info=True)
                exit(1)
            except BaseException:
                if flow_run:
                    msg = f"Execution of flow run '{flow_run.id}' interrupted by base exception"
                else:
                    msg = "Execution interrupted by base exception"
                engine_logger.error(msg, exc_info=True)
                # Let the exit code be determined by the base exception type
                raise

    ctx = multiprocessing.get_context("spawn")

