no-jira: add remote offline batch inference with vllm example #848

Open · wants to merge 2 commits into base: main
@@ -0,0 +1,188 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Remote Offline Batch Inference with Ray Data & vLLM Example\n",
"\n",
"This notebook presumes:\n",
"- You have a Ray Cluster URL given to you to run workloads on\n"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"from codeflare_sdk import RayJobClient\n",
"\n",
"# Setup Authentication Configuration\n",
"auth_token = \"XXXX\"\n",
"header = {\"Authorization\": f\"Bearer {auth_token}\"}"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"# Gather the dashboard URL (provided by the creator of the RayCluster)\n",
"ray_dashboard = \"XXXX\" # Replace with the Ray dashboard URL\n",
"\n",
"# Initialize the RayJobClient\n",
"client = RayJobClient(address=ray_dashboard, headers=header, verify=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Simple Example Explanation\n",
"\n",
"With the RayJobClient instantiated, lets run some batch inference. The following code is stored in `simple_batch_inf.py`, and is used as the entrypoint for the RayJob.\n",
"\n",
"What this processor configuration does:\n",
"- Set up a vLLM engine with your model\n",
"- Configure some settings for GPU processing\n",
"- Defines batch processing parameters (8 requests per batch, 2 GPU workers)\n",
"\n",
"```python\n",
"import ray\n",
"from ray.data.llm import build_llm_processor, vLLMEngineProcessorConfig\n",
"\n",
"processor_config = vLLMEngineProcessorConfig(\n",
" model_source=\"replace-me\",\n",
" engine_kwargs=dict(\n",
" enable_lora=False,\n",
" dtype=\"half\",\n",
" max_model_len=1024,\n",
" ),\n",
" batch_size=8,\n",
" concurrency=2,\n",
")\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"With the config defined, we can instantiate the processor. This enables batch inference by processing multiple requests through the vLLM engine, with two key steps:\n",
"- **Preprocess**: Converts each row into a structured chat format with system instructions and user queries, preparing the input for the LLM\n",
"- **Postprocess**: Extracts only the generated text from the model response, cleaning up the output\n",
"\n",
"The processor defines the pipeline that will be applied to each row in the dataset, enabling efficient batch processing through Ray Data's distributed execution framework.\n",
"\n",
"```python\n",
"processor = build_llm_processor(\n",
" processor_config,\n",
" preprocess=lambda row: dict(\n",
" messages=[\n",
" {\n",
" \"role\": \"system\",\n",
" \"content\": \"You are a calculator. Please only output the answer \"\n",
" \"of the given equation.\",\n",
" },\n",
" {\"role\": \"user\", \"content\": f\"{row['id']} ** 3 = ?\"},\n",
" ],\n",
" sampling_params=dict(\n",
" temperature=0.3,\n",
" max_tokens=20,\n",
" detokenize=False,\n",
" ),\n",
" ),\n",
" postprocess=lambda row: {\n",
" \"resp\": row[\"generated_text\"],\n",
" },\n",
")\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now we can run the batch inference pipeline on our data, it will:\n",
"- In the background, the processor will download the model into memory where vLLM serves it locally (on Ray Cluster) for use in inference\n",
"- Generate a sample Ray Dataset with 32 rows (0-31) to process\n",
"- Run the LLM processor on the dataset, triggering the preprocessing, inference, and postprocessing steps\n",
"- Execute the lazy pipeline and loads results into memory\n",
"- Iterate through all outputs and print each response \n",
"\n",
"```python\n",
"ds = ray.data.range(30)\n",
"ds = processor(ds)\n",
"ds = ds.materialize()\n",
"\n",
"for out in ds.take_all():\n",
" print(out)\n",
" print(\"==========\")\n",
"```\n",
"\n",
"### Job Submission\n",
"\n",
"Now we can submit this job against the Ray Cluster using the `RayJobClient` from earlier "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"entrypoint_command = \"python simple_batch_inf.py\"\n",
"\n",
"submission_id = client.submit_job(\n",
" entrypoint=entrypoint_command,\n",
" runtime_env={\"working_dir\": \"./\", \"pip\": \"requirements.txt\"},\n",
")\n",
"\n",
"print(submission_id + \" successfully submitted\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Get the job's status\n",
"client.get_job_status(submission_id)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Get the job's logs\n",
"client.get_job_logs(submission_id)"
]
}
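,
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optionally, you can poll the job status until it reaches a terminal state and then clean up. This is a minimal sketch: it assumes the terminal job states are `SUCCEEDED`, `FAILED`, and `STOPPED`, and that the client exposes `delete_job` alongside the methods used above."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"\n",
"# Poll until the job reaches a terminal state (sketch; adjust the interval as needed)\n",
"terminal_states = {\"SUCCEEDED\", \"FAILED\", \"STOPPED\"}\n",
"while client.get_job_status(submission_id) not in terminal_states:\n",
"    time.sleep(10)\n",
"\n",
"# Once the job has finished, it can be removed from the cluster's job list\n",
"client.delete_job(submission_id)"
]
}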
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.12"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
@@ -0,0 +1,4 @@
vllm
transformers
triton>=2.0.0
torch>=2.0.0
@@ -0,0 +1,62 @@
import ray
from ray.data.llm import build_llm_processor, vLLMEngineProcessorConfig


# 1. Construct a vLLM processor config.
processor_config = vLLMEngineProcessorConfig(
    # The base model.
    model_source="unsloth/Llama-3.2-1B-Instruct",
    # vLLM engine config.
    engine_kwargs=dict(
        enable_lora=False,
        # Older GPUs (e.g. T4) don't support bfloat16. You should remove
        # this line if you're using newer GPUs.
        dtype="half",
        # Reduce the model length to fit small GPUs. You should remove
        # this line if you're using larger GPUs.
        max_model_len=1024,
    ),
    # The batch size used in Ray Data.
    batch_size=8,
    # Use one GPU in this example.
    concurrency=1,
    # If you save the LoRA adapter in S3, you can set the following path.
    # dynamic_lora_loading_path="s3://your-lora-bucket/",
)

# 2. Construct a processor using the processor config.
processor = build_llm_processor(
    processor_config,
    preprocess=lambda row: dict(
        # Build a chat-style request for each row.
        messages=[
            {
                "role": "system",
                "content": "You are a calculator. Please only output the answer "
                "of the given equation.",
            },
            {"role": "user", "content": f"{row['id']} ** 3 = ?"},
        ],
        sampling_params=dict(
            temperature=0.3,
            max_tokens=20,
            detokenize=False,
        ),
    ),
    postprocess=lambda row: {
        "resp": row["generated_text"],
    },
)

# 3. Synthesize a dataset with 32 rows.
ds = ray.data.range(32)
# 4. Apply the processor to the dataset. Note that this line won't kick off
# anything because the processor executes lazily.
ds = processor(ds)
# Materialization kicks off the pipeline execution.
ds = ds.materialize()

# 5. Print all outputs.
for out in ds.take_all():
    print(out)
    print("==========")