diff --git a/.github/workflows/pythonbuild.yml b/.github/workflows/pythonbuild.yml index b18cdc229a1..ac99c251412 100644 --- a/.github/workflows/pythonbuild.yml +++ b/.github/workflows/pythonbuild.yml @@ -6,6 +6,7 @@ on: push: branches: - master + workflow_dispatch: pull_request: schedule: - cron: "0 13 * * *" # This schedule runs at 1pm UTC every day @@ -152,6 +153,149 @@ jobs: fail_ci_if_error: false files: coverage.xml + build-with-grpcio: + needs: + - detect-python-versions + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, macos-latest] + # Exclude `universal2` targets, due to `unionai-oss/flytectl-setup-action@master` has no `arm` release. + python-version: ${{fromJson(needs.detect-python-versions.outputs.python-versions)}} + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + - name: Cache pip + uses: actions/cache@v3 + with: + # This path is specific to Ubuntu + path: ~/.cache/pip + # Look to see if there is a cache hit for the corresponding requirements files + key: ${{ format('{0}-pip-{1}', runner.os, hashFiles('dev-requirements.in', 'requirements.in')) }} + - name: Install dependencies + run: | + pip install uv + make setup-global-uv + - name: Build wheels - x86_64 + uses: PyO3/maturin-action@v1 + if: matrix.os != 'macos-latest' + with: + target: x86_64 + command: build + args: --release --out dist --sdist -m flyrs/Cargo.toml + - name: Install built wheel - x86_64 + if: matrix.os != 'macos-latest' + run: | + uv pip install --system flyrs --no-index --find-links dist --force-reinstall + python -c "import flyrs" + - name: Build wheels - universal2 + if: matrix.os == 'macos-latest' + uses: PyO3/maturin-action@v1 + with: + target: universal2-apple-darwin + command: build + args: --release --out dist --sdist -m flyrs/Cargo.toml + - name: Install built wheel - universal2 + if: matrix.os == 'macos-latest' + run: | + uv pip install --system flyrs --no-index --find-links dist --force-reinstall + python -c "import flyrs" + - name: Freeze dependencies + run: uv pip freeze + - name: Install FlyteCTL + if: matrix.os != 'macos-latest' + uses: unionai-oss/flytectl-setup-action@master + - name: Setup Flyte Sandbox + if: matrix.os != 'macos-latest' + run: | + flytectl demo start + - name: Integration test of flytekit remote rust client + if: matrix.os != 'macos-latest' + run: | + python -m pytest tests/flytekit/integration/remote/test_rust_remote.py + - name: Codecov + if: matrix.os != 'macos-latest' + uses: codecov/codecov-action@v3.1.4 + with: + fail_ci_if_error: false + files: coverage.xml + + build-without-grpcio: + needs: + - detect-python-versions + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, macos-latest] + # Exclude `universal2` targets, due to `unionai-oss/flytectl-setup-action@master` has no `arm` release. + python-version: ${{fromJson(needs.detect-python-versions.outputs.python-versions)}} + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + - name: Cache pip + uses: actions/cache@v3 + with: + # This path is specific to Ubuntu + path: ~/.cache/pip + # Look to see if there is a cache hit for the corresponding requirements files + key: ${{ format('{0}-pip-{1}', runner.os, hashFiles('dev-requirements.in', 'requirements.in')) }} + - name: Install dependencies + run: | + pip install uv + make setup-global-uv + uv pip uninstall --system grpcio grpcio-status + - name: Build wheels - x86_64 + uses: PyO3/maturin-action@v1 + if: matrix.os != 'macos-latest' + with: + target: x86_64 + command: build + args: --release --out dist --sdist -m flyrs/Cargo.toml + - name: Install built wheel - x86_64 + if: matrix.os != 'macos-latest' + run: | + uv pip install --system flyrs --no-index --find-links dist --force-reinstall + python -c "import flyrs" + - name: Build wheels - universal2 + if: matrix.os == 'macos-latest' + uses: PyO3/maturin-action@v1 + with: + target: universal2-apple-darwin + command: build + args: --release --out dist --sdist -m flyrs/Cargo.toml + - name: Install built wheel - universal2 + if: matrix.os == 'macos-latest' + run: | + uv pip install --system flyrs --no-index --find-links dist --force-reinstall + python -c "import flyrs" + - name: Freeze dependencies + run: uv pip freeze + - name: Install FlyteCTL + if: matrix.os != 'macos-latest' + uses: unionai-oss/flytectl-setup-action@master + - name: Setup Flyte Sandbox + if: matrix.os != 'macos-latest' + run: | + flytectl demo start + - name: Integration test of flytekit remote rust client + if: matrix.os != 'macos-latest' + run: | + python -m pytest tests/flytekit/integration/remote/test_rust_remote.py + - name: Codecov + if: matrix.os != 'macos-latest' + uses: codecov/codecov-action@v3.1.4 + with: + fail_ci_if_error: false + files: coverage.xml + test-hypothesis: needs: - detect-python-versions @@ -433,6 +577,7 @@ jobs: uses: codecov/codecov-action@v3.1.0 with: fail_ci_if_error: false + lint: runs-on: ubuntu-latest steps: diff --git a/Makefile b/Makefile index ba574ae5869..a67879f45ba 100644 --- a/Makefile +++ b/Makefile @@ -95,7 +95,7 @@ integration_test_codecov: .PHONY: integration_test integration_test: - $(PYTEST_AND_OPTS) tests/flytekit/integration ${CODECOV_OPTS} + $(PYTEST_AND_OPTS) tests/flytekit/integration --ignore=tests/flytekit/integration/remote/test_rust_remote.py ${CODECOV_OPTS} doc-requirements.txt: export CUSTOM_COMPILE_COMMAND := make doc-requirements.txt doc-requirements.txt: doc-requirements.in install-piptools diff --git a/flyrs/Cargo.toml b/flyrs/Cargo.toml index 0a75d7f0ad4..a75fa0dd8eb 100644 --- a/flyrs/Cargo.toml +++ b/flyrs/Cargo.toml @@ -9,7 +9,7 @@ name = "flyrs" crate-type = ["cdylib"] [dependencies] -pyo3 = "0.21.2" +pyo3 = { version = "0.21.2", features = [ "abi3-py312", "extension-module" ] } flyteidl = { git = "https://github.com/flyteorg/flyte.git", branch = "master" } prost = "0.12.4" tonic = "0.11.0" diff --git a/flyrs/pyproject.toml b/flyrs/pyproject.toml index 044979e547a..69021e3b211 100644 --- a/flyrs/pyproject.toml +++ b/flyrs/pyproject.toml @@ -1,10 +1,10 @@ [build-system] -requires = ["maturin>=1.5,<2.0"] +requires = ["maturin>=1.5.1,<2.0"] build-backend = "maturin" [project] name = "flyrs" -requires-python = ">=3.8" +requires-python = ">=3.12" classifiers = [ "Programming Language :: Rust", "Programming Language :: Python :: Implementation :: CPython", diff --git a/flyrs/src/lib.rs b/flyrs/src/lib.rs index 3c4c8179c00..64006fee967 100644 --- a/flyrs/src/lib.rs +++ b/flyrs/src/lib.rs @@ -58,7 +58,7 @@ impl std::convert::From for MessageDecodeError { } /// A Python class constructs the gRPC service stubs and a Tokio asynchronous runtime in Rust. -#[pyclass(subclass)] +#[pyclass(subclass, module="flyrs", name = "FlyteClient")] pub struct FlyteClient { admin_service: AdminServiceClient, runtime: Runtime, diff --git a/flytekit/clients/auth_helper.py b/flytekit/clients/auth_helper.py index 04028bc10a8..c9e244770a6 100644 --- a/flytekit/clients/auth_helper.py +++ b/flytekit/clients/auth_helper.py @@ -2,11 +2,11 @@ import ssl from http import HTTPStatus -import grpc import requests from flyteidl.service.auth_pb2 import OAuth2MetadataRequest, PublicClientAuthConfigRequest from flyteidl.service.auth_pb2_grpc import AuthMetadataServiceStub +from flytekit import lazy_module from flytekit.clients.auth.authenticator import ( Authenticator, ClientConfig, @@ -21,6 +21,8 @@ from flytekit.clients.grpc_utils.wrap_exception_interceptor import RetryExceptionWrapperInterceptor from flytekit.configuration import AuthType, PlatformConfig +grpc = lazy_module("grpc") + class RemoteClientConfigStore(ClientConfigStore): """ diff --git a/flytekit/clients/friendly_rs.py b/flytekit/clients/friendly_rs.py index 7fdb18df2f2..0564566d7ac 100644 --- a/flytekit/clients/friendly_rs.py +++ b/flytekit/clients/friendly_rs.py @@ -3,7 +3,6 @@ from flytekit.clients.friendly import SynchronousFlyteClient as _SynchronousFlyteClient from flytekit.configuration import PlatformConfig -from flytekit.lazy_import.lazy_module import lazy_module from flytekit.models import common as _common from flytekit.models import filters as _filters from flytekit.models import task as _task @@ -24,7 +23,9 @@ class RustSynchronousFlyteClient(_SynchronousFlyteClient): """ def __init__(self, cfg: PlatformConfig): - flyrs = lazy_module("flyrs") + # flyrs = lazy_module("flyrs") + import flyrs + self.cfg = cfg self._raw = flyrs.FlyteClient(endpoint=self.cfg.endpoint) diff --git a/flytekit/clients/grpc_utils/auth_interceptor.py b/flytekit/clients/grpc_utils/auth_interceptor.py index e467801a774..e543665e493 100644 --- a/flytekit/clients/grpc_utils/auth_interceptor.py +++ b/flytekit/clients/grpc_utils/auth_interceptor.py @@ -1,10 +1,11 @@ import typing from collections import namedtuple -import grpc - +from flytekit import lazy_module from flytekit.clients.auth.authenticator import Authenticator +grpc = lazy_module("grpc") + class _ClientCallDetails( namedtuple("_ClientCallDetails", ("method", "timeout", "metadata", "credentials")), diff --git a/flytekit/clients/grpc_utils/default_metadata_interceptor.py b/flytekit/clients/grpc_utils/default_metadata_interceptor.py index 12b06cca031..a73dcadc518 100644 --- a/flytekit/clients/grpc_utils/default_metadata_interceptor.py +++ b/flytekit/clients/grpc_utils/default_metadata_interceptor.py @@ -1,9 +1,10 @@ import typing -import grpc - +from flytekit import lazy_module from flytekit.clients.grpc_utils.auth_interceptor import _ClientCallDetails +grpc = lazy_module("grpc") + class DefaultMetadataInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): def _inject_default_metadata(self, call_details: grpc.ClientCallDetails): diff --git a/flytekit/clients/grpc_utils/wrap_exception_interceptor.py b/flytekit/clients/grpc_utils/wrap_exception_interceptor.py index ea796f464a7..e79fa11ce38 100644 --- a/flytekit/clients/grpc_utils/wrap_exception_interceptor.py +++ b/flytekit/clients/grpc_utils/wrap_exception_interceptor.py @@ -1,8 +1,7 @@ import typing from typing import Union -import grpc - +from flytekit import lazy_module from flytekit.exceptions.base import FlyteException from flytekit.exceptions.system import FlyteSystemException from flytekit.exceptions.user import ( @@ -12,6 +11,8 @@ FlyteInvalidInputException, ) +grpc = lazy_module("grpc") + class RetryExceptionWrapperInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): def __init__(self, max_retries: int = 3): diff --git a/flytekit/clients/raw.py b/flytekit/clients/raw.py index 67a0baa7706..c1b517b996f 100644 --- a/flytekit/clients/raw.py +++ b/flytekit/clients/raw.py @@ -6,6 +6,7 @@ from flyteidl.admin.signal_pb2 import SignalList, SignalListRequest, SignalSetRequest, SignalSetResponse from flyteidl.service import dataproxy_pb2 as _dataproxy_pb2 +from flytekit import lazy_module from flytekit.configuration import PlatformConfig from flytekit.loggers import logger @@ -32,16 +33,16 @@ def __init__(self, cfg: PlatformConfig, **kwargs): url: The server address. insecure: if insecure is desired """ - from flyteidl.service import admin_pb2_grpc as _admin_service - from flyteidl.service import dataproxy_pb2_grpc as dataproxy_service - from flyteidl.service import signal_pb2_grpc as signal_service - from flytekit.clients.auth_helper import ( - get_channel, - upgrade_channel_to_authenticated, - upgrade_channel_to_proxy_authenticated, - wrap_exceptions_channel, - ) + _admin_service = lazy_module("flyteidl.service.admin_pb2_grpc") + dataproxy_service = lazy_module("flyteidl.service.dataproxy_pb2_grpc") + signal_service = lazy_module("flyteidl.service.signal_pb2_grpc") + + auth_helper = lazy_module("flytekit.clients.auth_helper") + get_channel = auth_helper.get_channel + upgrade_channel_to_authenticated = auth_helper.upgrade_channel_to_authenticated + upgrade_channel_to_proxy_authenticated = auth_helper.upgrade_channel_to_proxy_authenticated + wrap_exceptions_channel = auth_helper.wrap_exceptions_channel # Set the value here to match the limit in Admin, otherwise the client will cut off and the user gets a # StreamRemoved exception. @@ -66,7 +67,7 @@ def __init__(self, cfg: PlatformConfig, **kwargs): @classmethod def with_root_certificate(cls, cfg: PlatformConfig, root_cert_file: str) -> RawSynchronousFlyteClient: - import grpc + grpc = lazy_module("grpc") b = None with open(root_cert_file, "rb") as fp: diff --git a/tests/flytekit/integration/remote/test_rust_remote.py b/tests/flytekit/integration/remote/test_rust_remote.py new file mode 100644 index 00000000000..a65975d5763 --- /dev/null +++ b/tests/flytekit/integration/remote/test_rust_remote.py @@ -0,0 +1,72 @@ +import time + +import pytest + +from flytekit import task +from flytekit.configuration import Config, ImageConfig, SerializationSettings +from flytekit.remote import FlyteRemote +from flytekit.remote.entities import FlyteTask + +PROJECT = "flytesnacks" +DOMAIN = "development" + +TASK_NAME = "tests.flytekit.integration.remote.test_rust_remote.my_test_task" +VERSION_ID = f"{hash(time.time())}" # Use current timestamp when initialize tests, to prevent identical re-register. + + +def test_register_task(): + pytest.importorskip("flyrs") + + @task() + def my_test_task(n: int) -> int: + return n + + remote_rs = FlyteRemote( + Config.for_endpoint(endpoint="localhost:30080", insecure=True), + default_project=PROJECT, + default_domain=DOMAIN, + enable_rust=True, + ) + flyte_task = remote_rs.register_task( + entity=my_test_task, + serialization_settings=SerializationSettings( + image_config=ImageConfig.auto(img_name="flyte-cr.io/image-name:tag") + ), + version=VERSION_ID, + ) + assert isinstance(flyte_task, FlyteTask) + assert f"{flyte_task.id}" == f"TASK:{PROJECT}:{DOMAIN}:{TASK_NAME}:{VERSION_ID}" + + +def test_fetch_task_without_grpc(): + pytest.importorskip("flyrs") + + remote_rs = FlyteRemote( + Config.for_endpoint(endpoint="localhost:30080", insecure=True), + default_project=PROJECT, + default_domain=DOMAIN, + enable_rust=True, + ) + + task_rs = remote_rs.fetch_task(name=TASK_NAME, version=VERSION_ID) + assert isinstance(task_rs, FlyteTask) + assert f"{task_rs.id}" == f"TASK:{PROJECT}:{DOMAIN}:{TASK_NAME}:{VERSION_ID}" + + +def test_fetch_task_and_compare(): + pytest.importorskip("flyrs") + pytest.importorskip("grpc") + + remote_py = FlyteRemote( + Config.for_endpoint(endpoint="localhost:30080", insecure=True), default_project=PROJECT, default_domain=DOMAIN + ) + remote_rs = FlyteRemote( + Config.for_endpoint(endpoint="localhost:30080", insecure=True), + default_project=PROJECT, + default_domain=DOMAIN, + enable_rust=True, + ) + + task_py = remote_py.fetch_task(name=TASK_NAME, version=VERSION_ID) + task_rs = remote_rs.fetch_task(name=TASK_NAME, version=VERSION_ID) + assert task_py == task_rs