From 764662b8a63796f1ed9dcc6ad479798a4adfa002 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Tue, 8 Nov 2022 14:27:27 -0800 Subject: [PATCH] Put agent file at a predictable location and pass cluster environment (#2) * Put agent file at the right location * add additional args * add logging * update * update * update * update * fix --- anyscale_prefect_agent.py | 18 +++++++++++++++++- start_anyscale_service.py | 6 ++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/anyscale_prefect_agent.py b/anyscale_prefect_agent.py index 080b44c..f3a63fa 100644 --- a/anyscale_prefect_agent.py +++ b/anyscale_prefect_agent.py @@ -1,11 +1,20 @@ """ -Version 0.0.1 of the Anyscale Prefect Agent. +Version 0.0.2 of the Anyscale Prefect Agent. """ +import argparse +import logging import os import subprocess import tempfile +logging.basicConfig(level=logging.INFO) + +parser = argparse.ArgumentParser(add_help=False) +parser.add_argument("--cluster-env", type=str) +parser.add_argument("--compute-config", type=str) +args = parser.parse_args() + api_url = os.environ.get("PREFECT_API_URL") api_key = os.environ.get("PREFECT_API_KEY") flow_run_id = os.environ.get("PREFECT__FLOW_RUN_ID") @@ -24,7 +33,14 @@ entrypoint: "{}" """.format(cmd) +if args.compute_config: + content += 'compute_config: "{}"\n'.format(args.compute_config) + +if args.cluster_env: + content += 'cluster_env: "{}"\n'.format(args.cluster_env) + with tempfile.NamedTemporaryFile(mode="w") as f: f.write(content) f.flush() + logging.info(f"Submitting Anyscale Job with configuration '{content}'") subprocess.check_call(["anyscale", "job", "submit", f.name]) diff --git a/start_anyscale_service.py b/start_anyscale_service.py index 4ca7464..34f0a81 100644 --- a/start_anyscale_service.py +++ b/start_anyscale_service.py @@ -1,6 +1,12 @@ +import os import ray +import shutil import subprocess ray.init() +ANYSCALE_PREFECT_DIR = os.path.dirname(os.path.realpath(__file__)) + +shutil.copy(os.path.join(ANYSCALE_PREFECT_DIR, "anyscale_prefect_agent.py"), "/home/ray/") + subprocess.check_call(["prefect", "agent", "start", "-q", "test"])