Skip to content

Commit

Permalink
Implement AnyscaleJob infrastructure (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
pcmoritz authored Jan 12, 2023
1 parent 4bd3095 commit a89aae7
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 74 deletions.
1 change: 1 addition & 0 deletions .github/workflows/workflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ jobs:
PREFECT_API_URL: ${{ secrets.PREFECT_API_URL }}
PREFECT_API_KEY: ${{ secrets.PREFECT_API_KEY }}
run: |
pip install -e .
envsubst < ci/prefect-agent-service-ci.yaml > /tmp/prefect-agent-service.out
anyscale service deploy /tmp/prefect-agent-service.out
PYTHONPATH=$PYTHONPATH:. python ci/submit_prefect_run_and_check.py
33 changes: 23 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ and create a `prefect-agent-service.yaml` file where you fill in the information
name: prefect-agent
entrypoint: pip install prefect && PREFECT_API_URL="https://api.prefect.cloud/api/accounts/..." PREFECT_API_KEY="..." python start_anyscale_service.py --queue test
runtime_env:
working_dir: https://github.com/anyscale/prefect-anyscale/archive/refs/tags/v0.0.6.zip
working_dir: https://github.com/anyscale/prefect-anyscale/archive/refs/tags/v0.1.0.zip
healthcheck_url: "/healthcheck"
```
Expand All @@ -84,19 +84,32 @@ You can then start the service with
anyscale service deploy prefect-agent-service.yaml
```

Now create a Prefect infrastructure that will be used to run the deployments inside of Anyscale:

![set up prefect infra](./doc/set_up_prefect_infra.png)

You can specify the cluster environment and compute environment used to run the workload with the `--cluster-env` and `--compute-config`
variables of `anyscale_prefect_agent.py`. You can define many different such infrastructures for different environments. These cluster
environments will need `prefect`, `prefect_ray` and `s3fs` installed.
Now create a Prefect infrastructure that will be used to run the deployments inside of Anyscale. You can do this
by running `pip install prefect-anyscale` and then executing the following in a Python interpreter:
```python
import prefect_anyscale
infra = prefect_anyscale.AnyscaleJob(cluster_env="prefect-test-environment")
infra.save("test-infra")
```

#### Creating a deployment and scheduling the run

Now we can go ahead and create a Prefect deployment:
```bash
prefect deployment build prefect_test.py:main -n prefect_test -q test --storage-block s3/test-storage --infra-block process/anyscale-infra --apply
```python
import prefect
from prefect.filesystems import S3
from prefect_anyscale import AnyscaleJob
from prefect_test import count_to
deployment = prefect.deployments.Deployment.build_from_flow(
flow=count_to,
name="prefect_test",
work_queue_name="test",
storage=S3.load("test-storage"),
infrastructure=AnyscaleJob.load("test-infra")
)
deployment.apply()
```

You can now schedule new runs with this deployment from the Prefect UI
Expand Down
46 changes: 0 additions & 46 deletions anyscale_prefect_agent.py

This file was deleted.

4 changes: 2 additions & 2 deletions ci/prefect-agent-service-ci.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: prefect-agent-10
entrypoint: pip install prefect && PREFECT_API_URL=$PREFECT_API_URL PREFECT_API_KEY=$PREFECT_API_KEY python start_anyscale_service.py --queue test
name: prefect-agent-18
entrypoint: pip install prefect-anyscale && PREFECT_API_URL=$PREFECT_API_URL PREFECT_API_KEY=$PREFECT_API_KEY python start_anyscale_service.py --queue test
runtime_env:
working_dir: .
upload_path: "s3://anyscale-prefect-integration-test/github-working-dir/"
Expand Down
9 changes: 3 additions & 6 deletions ci/submit_prefect_run_and_check.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
import asyncio
import uuid
import logging
import os
import subprocess

import prefect.deployments
from prefect.client import get_client
from prefect.filesystems import S3
from prefect.infrastructure import Process

from prefect_anyscale import AnyscaleJob

from prefect_test import count_to

Expand All @@ -16,7 +13,7 @@
name="prefect_test",
work_queue_name="test",
storage=S3.load("test-storage-github"),
infrastructure=Process.load("anyscale-infra")
infrastructure=AnyscaleJob.load("anyscale-job-infra")
)
deployment.apply()

Expand Down
4 changes: 2 additions & 2 deletions prefect-agent-service.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: prefect-agent-10
entrypoint: pip install prefect && PREFECT_API_URL=$PREFECT_API_URL PREFECT_API_KEY=$PREFECT_API_KEY python start_anyscale_service.py --queue test
entrypoint: pip install prefect-anyscale && PREFECT_API_URL=$PREFECT_API_URL PREFECT_API_KEY=$PREFECT_API_KEY python start_anyscale_service.py --queue test
runtime_env:
working_dir: https://github.com/anyscale/prefect-anyscale/archive/refs/tags/v0.0.6.zip
working_dir: https://github.com/anyscale/prefect-anyscale/archive/refs/tags/v0.1.0.zip
healthcheck_url: "/healthcheck"
5 changes: 5 additions & 0 deletions prefect_anyscale/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from prefect_anyscale.infrastructure import AnyscaleJob

# Names exported by `from prefect_anyscale import *`.
__all__ = [
"AnyscaleJob"
]
95 changes: 95 additions & 0 deletions prefect_anyscale/infrastructure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import logging
import os
from typing import Dict, Union
import subprocess
import sys
import tempfile

from prefect.infrastructure.base import Infrastructure, InfrastructureResult
from prefect.utilities.asyncutils import sync_compatible
from pydantic import Field
from typing_extensions import Literal


class AnyscaleJob(Infrastructure):
    """Prefect infrastructure block that submits a flow run as an Anyscale Job.

    ``run`` renders a small job configuration (name, entrypoint and, when
    set, ``compute_config`` / ``cluster_env``), writes it to a temporary
    file and submits it with the ``anyscale job submit`` command line tool.
    """

    type: Literal["anyscale-job"] = Field(
        default="anyscale-job", description="The type of infrastructure."
    )

    compute_config: Union[None, str, Dict[str, str]] = Field(
        description="Compute config to use for the execution of the job.",
        default=None,
    )

    cluster_env: Union[None, str, Dict[str, str]] = Field(
        description="Cluster environment to use for the execution of the job."
    )

    _block_type_name = "Anyscale Job"

    def preview(self) -> str:
        """Return a human-readable summary of the configured job settings."""
        # str.join takes exactly one iterable argument; passing the two
        # strings as separate positional arguments raises TypeError.
        return " \\\n".join(
            [
                "compute_config = " + str(self.compute_config),
                "cluster_env = " + str(self.cluster_env),
            ]
        )

    @sync_compatible
    async def run(
        self,
        task_status=None,
    ):
        """Submit the flow run to Anyscale via ``anyscale job submit``.

        Args:
            task_status: Optional object whose ``started(job_name)`` method
                is called once the submission command has returned.

        Returns:
            An ``AnyscaleJobResult`` carrying the exit status of the submit
            command and an empty identifier.

        Raises:
            ValueError: If ``PREFECT__FLOW_RUN_ID`` is not present in the
                environment, since the job name is derived from it.
            subprocess.CalledProcessError: If the submit command exits with
                a non-zero status.
        """
        env = self._get_environment_variables()
        api_url = env.get("PREFECT_API_URL")
        api_key = env.get("PREFECT_API_KEY")
        flow_run_id = env.get("PREFECT__FLOW_RUN_ID")

        if flow_run_id is None:
            # Without this guard the job-name concatenation below fails with
            # an opaque "can only concatenate str" TypeError.
            raise ValueError(
                "PREFECT__FLOW_RUN_ID is not set; cannot submit an Anyscale Job "
                "without a flow run id"
            )

        # Forward the Prefect settings to the job as inline environment
        # assignments in front of the engine invocation.
        parts = []
        if api_url:
            parts.append("PREFECT_API_URL={}".format(api_url))
        if api_key:
            parts.append("PREFECT_API_KEY={}".format(api_key))
        if flow_run_id:
            parts.append("PREFECT__FLOW_RUN_ID={}".format(flow_run_id))
        parts.append("/home/ray/anaconda3/bin/python -m prefect.engine")
        cmd = " ".join(parts)

        # Link the Job on the Anyscale UI with the prefect flow run
        job_name = "prefect-job-" + flow_run_id

        content = '\nname: "{}"\nentrypoint: "{}"\n'.format(job_name, cmd)

        # NOTE(review): a dict-valued compute_config / cluster_env is rendered
        # here as a quoted Python repr - confirm the Anyscale CLI accepts that.
        if self.compute_config:
            content += 'compute_config: "{}"\n'.format(self.compute_config)

        if self.cluster_env:
            content += 'cluster_env: "{}"\n'.format(self.cluster_env)

        with tempfile.NamedTemporaryFile(mode="w") as f:
            f.write(content)
            # Flush so the CLI subprocess sees the full file contents.
            f.flush()
            # NOTE(review): the logged configuration embeds PREFECT_API_KEY in
            # plain text whenever it is set.
            logging.info(f"Submitting Anyscale Job with configuration '{content}'")
            # check_call raises on a non-zero exit, so returncode is 0 here.
            returncode = subprocess.check_call(["anyscale", "job", "submit", f.name])

        if task_status:
            task_status.started(job_name)

        return AnyscaleJobResult(
            status_code=returncode, identifier=""
        )

    def _get_environment_variables(self, include_os_environ: bool = True):
        """Merge the process, base and block-level environments.

        Later sources win: ``os.environ`` (optional), then
        ``self._base_environment()``, then ``self.env``. Entries whose value
        is ``None`` are dropped from the result.
        """
        os_environ = os.environ if include_os_environ else {}
        # The base environment must override the current environment or
        # the Prefect settings context may not be respected
        env = {**os_environ, **self._base_environment(), **self.env}

        # Drop null values allowing users to "unset" variables
        return {key: value for key, value in env.items() if value is not None}

class AnyscaleJobResult(InfrastructureResult):
    """Result of submitting a flow run as an Anyscale Job."""
30 changes: 30 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from setuptools import find_packages, setup

# Runtime dependencies installed alongside the package.
_INSTALL_REQUIRES = ["prefect>=2.7.1"]

# Trove classifiers published with the distribution metadata.
_CLASSIFIERS = [
    "Natural Language :: English",
    "Intended Audience :: Developers",
    "Intended Audience :: System Administrators",
    "License :: OSI Approved :: Apache Software License",
    "Programming Language :: Python :: 3 :: Only",
    "Programming Language :: Python :: 3.7",
    "Programming Language :: Python :: 3.8",
    "Programming Language :: Python :: 3.9",
    "Programming Language :: Python :: 3.10",
    "Programming Language :: Python :: 3.11",
    "Topic :: Software Development :: Libraries",
]

# Package metadata; only the `prefect_anyscale` package is shipped.
setup(
    name="prefect-anyscale",
    version="0.1.0",
    description="Prefect integrations with Anyscale.",
    url="https://github.com/anyscale/prefect-anyscale",
    author="Anyscale, Inc.",
    author_email="[email protected]",
    license="Apache License 2.0",
    keywords="prefect",
    packages=find_packages(include=["prefect_anyscale"]),
    python_requires=">=3.7",
    install_requires=_INSTALL_REQUIRES,
    classifiers=_CLASSIFIERS,
)
10 changes: 2 additions & 8 deletions start_anyscale_service.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
import argparse
import logging
import os
import shutil
import subprocess
import uuid

from fastapi import FastAPI
import ray
from ray import serve

parser = argparse.ArgumentParser()
Expand All @@ -21,9 +17,6 @@
@serve.ingress(app)
class PrefectAgentDeployment:
def __init__(self, prefect_env):
anyscale_prefect_dir = os.path.dirname(os.path.realpath(__file__))
shutil.copy(os.path.join(anyscale_prefect_dir, "anyscale_prefect_agent.py"), "/home/ray/")

self.agent = subprocess.Popen(
["prefect", "agent", "start", "-q", args.queue],
env=dict(os.environ, **prefect_env),
Expand All @@ -39,5 +32,6 @@ def healthcheck(self):

serve.run(PrefectAgentDeployment.bind({
"PREFECT_API_URL": os.environ["PREFECT_API_URL"],
"PREFECT_API_KEY": os.environ["PREFECT_API_KEY"]
"PREFECT_API_KEY": os.environ["PREFECT_API_KEY"],
"PREFECT_EXTRA_ENTRYPOINTS": "prefect_anyscale",
}))

0 comments on commit a89aae7

Please sign in to comment.