diff --git a/.github/workflows/workflow.yaml b/.github/workflows/workflow.yaml index 699f838..20a2d8c 100644 --- a/.github/workflows/workflow.yaml +++ b/.github/workflows/workflow.yaml @@ -36,6 +36,7 @@ jobs: PREFECT_API_URL: ${{ secrets.PREFECT_API_URL }} PREFECT_API_KEY: ${{ secrets.PREFECT_API_KEY }} run: | + pip install -e . envsubst < ci/prefect-agent-service-ci.yaml > /tmp/prefect-agent-service.out anyscale service deploy /tmp/prefect-agent-service.out PYTHONPATH=$PYTHONPATH:. python ci/submit_prefect_run_and_check.py diff --git a/README.md b/README.md index 402626b..449f243 100644 --- a/README.md +++ b/README.md @@ -68,7 +68,7 @@ and create a `prefect-agent-service.yaml` file where you fill in the information name: prefect-agent entrypoint: pip install prefect && PREFECT_API_URL="https://api.prefect.cloud/api/accounts/..." PREFECT_API_KEY="..." python start_anyscale_service.py --queue test runtime_env: - working_dir: https://github.com/anyscale/prefect-anyscale/archive/refs/tags/v0.0.6.zip + working_dir: https://github.com/anyscale/prefect-anyscale/archive/refs/tags/v0.1.0.zip healthcheck_url: "/healthcheck" ``` @@ -84,19 +84,32 @@ You can then start the service with anyscale service deploy prefect-agent-service.yaml ``` -Now create a Prefect infrastructure that will be used to run the deployments inside of Anyscale: - -![set up prefect infra](./doc/set_up_prefect_infra.png) - -You can specify the cluster environment and compute environment used to run the workload with the `--cluster-env` and `--compute-config` -variables of `anyscale_prefect_agent.py`. You can define many different such infrastructures for different environments. These cluster -environments will need `prefect`, `prefect_ray` and `s3fs` installed. +Now create a Prefect infrastructure that will be used to run the deployments inside of Anyscale. You can do this +by running `pip install prefect-anyscale` and then in a Python interpreter +```python +import prefect_anyscale +infra = prefect_anyscale.AnyscaleJob(cluster_env="prefect-test-environment") +infra.save("test-infra") +``` #### Creating a deployment and scheduling the run Now we can go ahead and create a Prefect deployment: -```bash -prefect deployment build prefect_test.py:main -n prefect_test -q test --storage-block s3/test-storage --infra-block process/anyscale-infra --apply +```python +import prefect +from prefect.filesystems import S3 +from prefect_anyscale import AnyscaleJob + +from prefect_test import count_to + +deployment = prefect.deployments.Deployment.build_from_flow( + flow=count_to, + name="prefect_test", + work_queue_name="test", + storage=S3.load("test-storage"), + infrastructure=AnyscaleJob.load("test-infra") +) +deployment.apply() ``` You can now schedule new runs with this deployment from the Prefect UI diff --git a/anyscale_prefect_agent.py b/anyscale_prefect_agent.py deleted file mode 100644 index 370bc32..0000000 --- a/anyscale_prefect_agent.py +++ /dev/null @@ -1,46 +0,0 @@ -""" -Version 0.0.2 of the Anyscale Prefect Agent. -""" - -import argparse -import logging -import os -import subprocess -import tempfile - -logging.basicConfig(level=logging.INFO) - -parser = argparse.ArgumentParser() -parser.add_argument("--cluster-env", type=str) -parser.add_argument("--compute-config", type=str) -args = parser.parse_args() - -api_url = os.environ.get("PREFECT_API_URL") -api_key = os.environ.get("PREFECT_API_KEY") -flow_run_id = os.environ.get("PREFECT__FLOW_RUN_ID") - -cmd = "" -if api_url: - cmd += "PREFECT_API_URL={}".format(api_url) -if api_key: - cmd += " PREFECT_API_KEY={}".format(api_key) -if flow_run_id: - cmd += " PREFECT__FLOW_RUN_ID={}".format(flow_run_id) - -cmd += " /home/ray/anaconda3/bin/python -m prefect.engine" - -content = """ -entrypoint: "{}" -""".format(cmd) - -if args.compute_config: - content += 'compute_config: "{}"\n'.format(args.compute_config) - -if args.cluster_env: - content += 'cluster_env: "{}"\n'.format(args.cluster_env) - -with tempfile.NamedTemporaryFile(mode="w") as f: - f.write(content) - f.flush() - logging.info(f"Submitting Anyscale Job with configuration '{content}'") - subprocess.check_call(["anyscale", "job", "submit", f.name]) diff --git a/ci/prefect-agent-service-ci.yaml b/ci/prefect-agent-service-ci.yaml index d2b81a5..97e2390 100644 --- a/ci/prefect-agent-service-ci.yaml +++ b/ci/prefect-agent-service-ci.yaml @@ -1,5 +1,5 @@ -name: prefect-agent-10 -entrypoint: pip install prefect && PREFECT_API_URL=$PREFECT_API_URL PREFECT_API_KEY=$PREFECT_API_KEY python start_anyscale_service.py --queue test +name: prefect-agent-18 +entrypoint: pip install prefect-anyscale && PREFECT_API_URL=$PREFECT_API_URL PREFECT_API_KEY=$PREFECT_API_KEY python start_anyscale_service.py --queue test runtime_env: working_dir: . upload_path: "s3://anyscale-prefect-integration-test/github-working-dir/" diff --git a/ci/submit_prefect_run_and_check.py b/ci/submit_prefect_run_and_check.py index 5990674..619aad1 100644 --- a/ci/submit_prefect_run_and_check.py +++ b/ci/submit_prefect_run_and_check.py @@ -1,13 +1,10 @@ import asyncio -import uuid -import logging -import os -import subprocess import prefect.deployments from prefect.client import get_client from prefect.filesystems import S3 -from prefect.infrastructure import Process + +from prefect_anyscale import AnyscaleJob from prefect_test import count_to @@ -16,7 +13,7 @@ name="prefect_test", work_queue_name="test", storage=S3.load("test-storage-github"), - infrastructure=Process.load("anyscale-infra") + infrastructure=AnyscaleJob.load("anyscale-job-infra") ) deployment.apply() diff --git a/prefect-agent-service.yaml b/prefect-agent-service.yaml index bf66527..76b0b62 100644 --- a/prefect-agent-service.yaml +++ b/prefect-agent-service.yaml @@ -1,5 +1,5 @@ name: prefect-agent-10 -entrypoint: pip install prefect && PREFECT_API_URL=$PREFECT_API_URL PREFECT_API_KEY=$PREFECT_API_KEY python start_anyscale_service.py --queue test +entrypoint: pip install prefect-anyscale && PREFECT_API_URL=$PREFECT_API_URL PREFECT_API_KEY=$PREFECT_API_KEY python start_anyscale_service.py --queue test runtime_env: - working_dir: https://github.com/anyscale/prefect-anyscale/archive/refs/tags/v0.0.6.zip + working_dir: https://github.com/anyscale/prefect-anyscale/archive/refs/tags/v0.1.0.zip healthcheck_url: "/healthcheck" diff --git a/prefect_anyscale/__init__.py b/prefect_anyscale/__init__.py new file mode 100644 index 0000000..3753a08 --- /dev/null +++ b/prefect_anyscale/__init__.py @@ -0,0 +1,5 @@ +from prefect_anyscale.infrastructure import AnyscaleJob + +__all__ = [ + "AnyscaleJob" +] diff --git a/prefect_anyscale/infrastructure.py b/prefect_anyscale/infrastructure.py new file mode 100644 index 0000000..d8a197e --- /dev/null +++ b/prefect_anyscale/infrastructure.py @@ -0,0 +1,95 @@ +import logging +import os +from typing import Dict, Union +import subprocess +import sys +import tempfile + +from prefect.infrastructure.base import Infrastructure, InfrastructureResult +from prefect.utilities.asyncutils import sync_compatible +from pydantic import Field +from typing_extensions import Literal + + +class AnyscaleJob(Infrastructure): + + type: Literal["anyscale-job"] = Field( + default="anyscale-job", description="The type of infrastructure." + ) + + compute_config: Union[None, str, Dict[str, str]] = Field( + description="Compute config to use for the execution of the job.", + default=None, + ) + + cluster_env: Union[None, str, Dict[str, str]] = Field( + description="Cluster environment to use for the execution of the job." + ) + + _block_type_name = "Anyscale Job" + + def preview(self): + return " \\\n".join( + "compute_config = " + str(self.compute_config), + "cluster_env = " + str(self.cluster_env), + ) + + @sync_compatible + async def run( + self, + task_status = None, + ): + env = self._get_environment_variables() + api_url = env.get("PREFECT_API_URL") + api_key = env.get("PREFECT_API_KEY") + flow_run_id = env.get("PREFECT__FLOW_RUN_ID") + + cmd = "" + if api_url: + cmd += "PREFECT_API_URL={}".format(api_url) + if api_key: + cmd += " PREFECT_API_KEY={}".format(api_key) + if flow_run_id: + cmd += " PREFECT__FLOW_RUN_ID={}".format(flow_run_id) + + cmd += " /home/ray/anaconda3/bin/python -m prefect.engine" + + # Link the Job on the Anyscale UI with the prefect flow run + job_name = "prefect-job-" + flow_run_id + + content = """ + name: "{}" + entrypoint: "{}" + """.format(job_name, cmd) + + if self.compute_config: + content += 'compute_config: "{}"\n'.format(self.compute_config) + + if self.cluster_env: + content += 'cluster_env: "{}"\n'.format(self.cluster_env) + + with tempfile.NamedTemporaryFile(mode="w") as f: + f.write(content) + f.flush() + logging.info(f"Submitting Anyscale Job with configuration '{content}'") + returncode = subprocess.check_call(["anyscale", "job", "submit", f.name]) + + if task_status: + task_status.started(job_name) + + return AnyscaleJobResult( + status_code=returncode, identifier="" + ) + + def _get_environment_variables(self, include_os_environ: bool = True): + os_environ = os.environ if include_os_environ else {} + # The base environment must override the current environment or + # the Prefect settings context may not be respected + env = {**os_environ, **self._base_environment(), **self.env} + + # Drop null values allowing users to "unset" variables + return {key: value for key, value in env.items() if value is not None} + +class AnyscaleJobResult(InfrastructureResult): + """Contains information about the final state of a completed process""" + pass diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..1d5f359 --- /dev/null +++ b/setup.py @@ -0,0 +1,30 @@ +from setuptools import find_packages, setup + +setup( + name="prefect-anyscale", + version="0.1.0", + description="Prefect integrations with Anyscale.", + license="Apache License 2.0", + author="Anyscale, Inc.", + author_email="support@anyscale.com", + keywords="prefect", + url="https://github.com/anyscale/prefect-anyscale", + packages=find_packages(include=["prefect_anyscale"]), + python_requires=">=3.7", + install_requires = [ + "prefect>=2.7.1", + ], + classifiers=[ + "Natural Language :: English", + "Intended Audience :: Developers", + "Intended Audience :: System Administrators", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Topic :: Software Development :: Libraries", + ], +) diff --git a/start_anyscale_service.py b/start_anyscale_service.py index 5da1f27..7c22d86 100644 --- a/start_anyscale_service.py +++ b/start_anyscale_service.py @@ -1,12 +1,8 @@ import argparse -import logging import os -import shutil import subprocess -import uuid from fastapi import FastAPI -import ray from ray import serve parser = argparse.ArgumentParser() @@ -21,9 +17,6 @@ @serve.ingress(app) class PrefectAgentDeployment: def __init__(self, prefect_env): - anyscale_prefect_dir = os.path.dirname(os.path.realpath(__file__)) - shutil.copy(os.path.join(anyscale_prefect_dir, "anyscale_prefect_agent.py"), "/home/ray/") - self.agent = subprocess.Popen( ["prefect", "agent", "start", "-q", args.queue], env=dict(os.environ, **prefect_env), @@ -39,5 +32,6 @@ def healthcheck(self): serve.run(PrefectAgentDeployment.bind({ "PREFECT_API_URL": os.environ["PREFECT_API_URL"], - "PREFECT_API_KEY": os.environ["PREFECT_API_KEY"] + "PREFECT_API_KEY": os.environ["PREFECT_API_KEY"], + "PREFECT_EXTRA_ENTRYPOINTS": "prefect_anyscale", }))