Commit f6ae616

ray actor

wangzheng422 committed Dec 30, 2024
1 parent 8aae4e5 commit f6ae616

Showing 4 changed files with 1,285 additions and 0 deletions.
306 changes: 306 additions & 0 deletions redhat/ocp4/4.16/files/ray.notebook/2.5_run.llama.factory.ipynb
{
"cells": [
{
"cell_type": "markdown",
"id": "bbc21043",
"metadata": {},
"source": [
"In this notebook, we will go over how to leverage the SDK to directly work interactively with a Ray Cluster during development."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b55bc3ea-4ce3-49bf-bb1f-e209de8ca47a",
"metadata": {},
"outputs": [],
"source": [
"# Import pieces from codeflare-sdk\n",
"from codeflare_sdk import Cluster, ClusterConfiguration, TokenAuthentication"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "614daa0c",
"metadata": {},
"outputs": [],
"source": [
"# Create authentication object for user permissions\n",
"# IF unused, SDK will automatically check for default kubeconfig, then in-cluster config\n",
"# KubeConfigFileAuthentication can also be used to specify kubeconfig path manually\n",
"auth = TokenAuthentication(\n",
" token = \"sha256~KAuGVI_ujl_uq57sVrOtMUN8VuNjbO00FG6P-vbhv2A\",\n",
" server = \"https://api.demo-01-rhsys.wzhlab.top:6443\",\n",
" skip_tls= True\n",
")\n",
"auth.login()"
]
},
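{
"cell_type": "markdown",
"id": "a1f3c9e0",
"metadata": {},
"source": [
"As an alternative to token authentication, the SDK can also load an existing kubeconfig file. A minimal sketch is below, commented out so it does not conflict with the token login above; the kubeconfig path is an assumption, so point it at your own."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b2d4e6f8",
"metadata": {},
"outputs": [],
"source": [
"# Minimal sketch: authenticate with a kubeconfig file instead of a token.\n",
"# Commented out to avoid clashing with the TokenAuthentication login above;\n",
"# the kubeconfig path is an assumption for your environment.\n",
"# from codeflare_sdk import KubeConfigFileAuthentication\n",
"# auth = KubeConfigFileAuthentication(kube_config_path=\"~/.kube/config\")\n",
"# auth.load_kube_config()"
]
},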
{
"cell_type": "markdown",
"id": "bc27f84c",
"metadata": {},
"source": [
"Once again, let's start by running through the same cluster setup as before:\n",
"\n",
"NOTE: The default images used by the CodeFlare SDK for creating a RayCluster resource depend on the installed Python version:\n",
"\n",
"- For Python 3.9: 'quay.io/modh/ray:2.35.0-py39-cu121'\n",
"- For Python 3.11: 'quay.io/modh/ray:2.35.0-py311-cu121'\n",
"\n",
"If you prefer to use a custom Ray image that better suits your needs, you can specify it in the image field to override the default."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0f4bc870-091f-4e11-9642-cba145710159",
"metadata": {},
"outputs": [],
"source": [
"# Create and configure our cluster object\n",
"# The SDK will try to find the name of your default local queue based on the annotation \"kueue.x-k8s.io/default-queue\": \"true\" unless you specify the local queue manually below\n",
"cluster_name = \"llama-factory-test\"\n",
"cluster = Cluster(ClusterConfiguration(\n",
" name=cluster_name,\n",
" head_cpu_requests=1,\n",
" head_cpu_limits=1,\n",
" head_memory_requests=6,\n",
" head_memory_limits=6,\n",
" head_extended_resource_requests={'nvidia.com/gpu':0}, # For GPU enabled workloads set the head_extended_resource_requests and worker_extended_resource_requests\n",
" worker_extended_resource_requests={'nvidia.com/gpu':0},\n",
" num_workers=2,\n",
" worker_cpu_requests='250m',\n",
" worker_cpu_limits=1,\n",
" worker_memory_requests=4,\n",
" worker_memory_limits=6,\n",
" image=\"quay.io/wangzheng422/qimgs:llama-factory-ray-20241226-v01\", # Optional Field \n",
" write_to_file=False, # When enabled Ray Cluster yaml files are written to /HOME/.codeflare/resources \n",
" # local_queue=\"local-queue-name\" # Specify the local queue manually\n",
"))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f0884bbc-c224-4ca0-98a0-02dfa09c2200",
"metadata": {},
"outputs": [],
"source": [
"# Bring up the cluster\n",
"cluster.up()\n",
"cluster.wait_ready()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "df71c1ed",
"metadata": {},
"outputs": [],
"source": [
"cluster.details()"
]
},
{
"cell_type": "markdown",
"id": "33663f47",
"metadata": {},
"source": [
"This time we will demonstrate another potential method of use: working with the Ray cluster interactively.\n",
"\n",
"Using the SDK, we can get both the Ray cluster URI and dashboard URI:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c1719bca",
"metadata": {},
"outputs": [],
"source": [
"ray_dashboard_uri = cluster.cluster_dashboard_uri()\n",
"ray_cluster_uri = cluster.cluster_uri()\n",
"print(ray_dashboard_uri)\n",
"print(ray_cluster_uri)"
]
},
{
"cell_type": "markdown",
"id": "2a2aca6a",
"metadata": {},
"source": [
"Now we can connect directly to our Ray cluster via the Ray python client:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c9436436",
"metadata": {},
"outputs": [],
"source": [
"from codeflare_sdk import generate_cert\n",
"# Create required TLS cert and export the environment variables to enable TLS\n",
"generate_cert.generate_tls_cert(cluster_name, cluster.config.namespace)\n",
"generate_cert.export_env(cluster_name, cluster.config.namespace)"
]
},
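{
"cell_type": "markdown",
"id": "c3e5a7b9",
"metadata": {},
"source": [
"Optionally, we can check that the TLS environment variables were exported before connecting. The variable names below are an assumption based on Ray's TLS configuration, not part of the original flow:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d4f6b8c0",
"metadata": {},
"outputs": [],
"source": [
"# Optional sanity check: print the TLS-related environment variables.\n",
"# The variable names are an assumption based on Ray's TLS docs.\n",
"import os\n",
"for var in [\"RAY_USE_TLS\", \"RAY_TLS_SERVER_CERT\", \"RAY_TLS_SERVER_KEY\", \"RAY_TLS_CA_CERT\"]:\n",
"    print(var, \"=\", os.environ.get(var))"
]
},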
{
"cell_type": "code",
"execution_count": null,
"id": "300146dc",
"metadata": {},
"outputs": [],
"source": [
"# before proceeding make sure the cluster exists and the uri is not empty\n",
"assert ray_cluster_uri, \"Ray cluster needs to be started and set before proceeding\"\n",
"\n",
"import ray\n",
"\n",
"# reset the ray context in case there's already one. \n",
"ray.shutdown()\n",
"# establish connection to ray cluster\n",
"\n",
"# install additional libraries that will be required for model training\n",
"# runtime_env = {\"pip\": [\"transformers==4.41.2\", \"datasets==2.17.0\", \"accelerate==0.31.0\", \"scikit-learn==1.5.0\"]}\n",
"runtime_env = {}\n",
"# NOTE: This will work for in-cluster notebook servers (RHODS/ODH), but not for local machines\n",
"# To see how to connect from your laptop, go to demo-notebooks/additional-demos/local_interactive.ipynb\n",
"ray.init(address=ray_cluster_uri, runtime_env=runtime_env)\n",
"\n",
"print(\"Ray cluster is up and running: \", ray.is_initialized())"
]
},
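{
"cell_type": "markdown",
"id": "e5a7c9d1",
"metadata": {},
"source": [
"As a quick, optional sanity check (a minimal sketch, not part of the original flow), we can list the resources Ray sees and run a trivial remote task:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f6b8d0e2",
"metadata": {},
"outputs": [],
"source": [
"# List the CPU/GPU/memory resources the cluster reports\n",
"print(ray.cluster_resources())\n",
"\n",
"# Run a trivial remote task to confirm tasks can be scheduled\n",
"@ray.remote\n",
"def ping():\n",
"    return \"pong\"\n",
"\n",
"print(ray.get(ping.remote()))"
]
},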
{
"cell_type": "markdown",
"id": "9711030b",
"metadata": {},
"source": [
"Now that we are connected (and have passed in some package requirements), let's try writing some training code:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1b36e0d9",
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import subprocess\n",
"import re\n",
"\n",
"@ray.remote\n",
"class NetworkCommandActor:\n",
" def get_eth0_ip(self):\n",
" try:\n",
" result = subprocess.run(['ip', 'a', 'show', 'eth0'], capture_output=True, text=True, check=True)\n",
" ip_match = re.search(r'inet (\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3})', result.stdout)\n",
" if ip_match:\n",
" return ip_match.group(1)\n",
" else:\n",
" return \"IP address not found\"\n",
" except subprocess.CalledProcessError as e:\n",
" return f\"Error getting IP address: {e}\"\n",
"\n",
" def execute_command(self, ip_address, nnodes, node_rank):\n",
" command = f'FORCE_TORCHRUN=1 NNODES={nnodes} NODE_RANK={node_rank} MASTER_ADDR={ip_address} MASTER_PORT=29500 llamafactory-cli train examples/train_full/llama3_full_sft.yaml'\n",
" try:\n",
" os.system(command)\n",
" return \"Command executed successfully\"\n",
" except Exception as e:\n",
" return f\"Error executing command: {e}\""
]
},
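{
"cell_type": "markdown",
"id": "0a2c4e6b",
"metadata": {},
"source": [
"Note that Ray is free to schedule both actors onto the same worker node, in which case they would report the same IP and torchrun would not actually span two nodes. Below is a minimal sketch using Ray's placement-group API (an addition, not part of the original flow) that forces the actors onto distinct nodes:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1b3d5f7c",
"metadata": {},
"outputs": [],
"source": [
"# Sketch: reserve one bundle per node with STRICT_SPREAD so the two\n",
"# actors land on different nodes. The CPU amount per bundle is an\n",
"# assumption sized to fit the 250m worker CPU request above.\n",
"from ray.util.placement_group import placement_group\n",
"from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy\n",
"\n",
"pg = placement_group([{\"CPU\": 0.25}] * 2, strategy=\"STRICT_SPREAD\")\n",
"ray.get(pg.ready())  # block until both bundles are reserved\n",
"\n",
"spread_actors = [\n",
"    NetworkCommandActor.options(\n",
"        scheduling_strategy=PlacementGroupSchedulingStrategy(\n",
"            placement_group=pg, placement_group_bundle_index=i\n",
"        )\n",
"    ).remote()\n",
"    for i in range(2)\n",
"]"
]
},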
{
"cell_type": "markdown",
"id": "d4d8fd65",
"metadata": {},
"source": [
"Once we want to test our code out, we can run the training function we defined above remotely on our Ray cluster:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5901d958",
"metadata": {},
"outputs": [],
"source": [
"#call the above cell as a remote ray function\n",
"actor1 = NetworkCommandActor.remote()\n",
"actor2 = NetworkCommandActor.remote()\n",
"\n",
"ip1 = ray.get(actor1.get_eth0_ip.remote())\n",
"ip2 = ray.get(actor2.get_eth0_ip.remote())\n",
"\n",
"print(f\"Actor 1 IP: {ip1}\")\n",
"print(f\"Actor 2 IP: {ip2}\")\n",
"\n",
"# Define nnodes and node_rank\n",
"nnodes = 2 # Assuming there are 2 nodes\n",
"node_rank1 = 0 # Rank for actor 1\n",
"node_rank2 = 1 # Rank for actor 2\n",
"\n",
"# Example of executing command with actor 1's IP\n",
"result1 = ray.get(actor1.execute_command.remote(ip1, nnodes, node_rank1))\n",
"print(f\"Actor 1 command result: {result1}\")\n",
"\n",
"# Example of executing command with actor 2's IP\n",
"result2 = ray.get(actor2.execute_command.remote(ip2, nnodes, node_rank2))\n",
"print(f\"Actor 2 command result: {result2}\")"
]
},
{
"cell_type": "markdown",
"id": "5af8cd32",
"metadata": {},
"source": [
"Once complete, we can bring our Ray cluster down and clean up:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5f36db0f-31f6-4373-9503-dc3c1c4c3f57",
"metadata": {},
"outputs": [],
"source": [
"cluster.down()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0d41b90e",
"metadata": {},
"outputs": [],
"source": [
"auth.logout()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.11"
}
},
"nbformat": 4,
"nbformat_minor": 5
}