
[V1] Support DP with Ray #18779


Open · wants to merge 54 commits into base: main

Conversation

@ruisearch42 (Collaborator) commented May 27, 2025

This PR adds support for data parallelism (DP) with Ray, including multi-node deployment and API server scale-out.

We reuse the ZMQ communication mechanism between the frontend and engine cores, as in #15977, and the same API server scale-out mechanism as in #17546.

Main differences from those PRs:

  • The handshake between the frontend and engine cores is greatly simplified, thanks to the Ray API
  • All DP ranks can be launched from just the head node

Examples

This will run DP=4 on the head node.

# Head node  (with ip address 10.99.48.128)
vllm serve $MODEL --data-parallel-size 4 --data-parallel-size-local 4 \
                  --data-parallel-address 10.99.48.128 --data-parallel-rpc-port 13345 \
                  --data-parallel-backend ray

This will run DP=4 with DP ranks 0 and 1 on the head node and ranks 2 and 3 on other nodes.

# Head node  (with ip address 10.99.48.128)
vllm serve $MODEL --data-parallel-size 4 --data-parallel-size-local 2 \
                  --data-parallel-address 10.99.48.128 --data-parallel-rpc-port 13345 \
                  --data-parallel-backend ray

This will run DP=4 with only the API server on the head node and all engines on other nodes:

# Head node  (with ip address 10.99.48.128)
vllm serve $MODEL --data-parallel-size 4 --data-parallel-size-local 0 \
                  --data-parallel-address 10.99.48.128 --data-parallel-rpc-port 13345 \
                  --data-parallel-backend ray

Design

See the following illustration. The DP Coordinator is omitted; it is the same as in #17546.

[architecture diagram]

njhill added 30 commits April 4, 2025 17:04 (all signed off by Nick Hill <[email protected]>). These are mostly merge commits resolving conflicts in vllm/config.py, vllm/engine/arg_utils.py, vllm/entrypoints/openai/api_server.py, vllm/v1/core/sched/scheduler.py, vllm/v1/engine/core.py, vllm/v1/engine/core_client.py, and vllm/v1/utils.py. One commit notes: "Avoid exception but still needs more work to be functional with multiple api server procs."

👋 Hi! Thank you for contributing to the vLLM project.

💬 Join our developer Slack at https://slack.vllm.ai to discuss your PR in #pr-reviews, coordinate on features in #feat- channels, or join special interest groups in #sig- channels.

Just a reminder: PRs do not trigger a full CI run by default. Instead, only the fastcheck CI runs, covering a small, essential subset of tests to quickly catch errors. You can run additional CI tests on top of those by going to your fastcheck build on the Buildkite UI (linked in the PR checks section) and unblocking them. If you do not have permission to unblock, ping simon-mo or khluu to add you to our Buildkite org.

Once the PR is approved and ready to go, your PR reviewer(s) can run CI to test the changes comprehensively before merging.

To run full CI, PR reviewers can either add the ready label to the PR or enable auto-merge.

🚀

@kouroshHakha (Contributor) left a comment:

Overall, looks good. There are some comments that need to be addressed and some follow-ups that are not blockers.

vllm/v1/utils.py Outdated
Comment on lines 345 to 346
assert nodes[0].node_ip == head_node_ip, (
"The first node must be the head node")
kouroshHakha (Contributor):

This check is not quite right and actually blocks.

-dpa can be localhost or 127.0.0.1, in which case it will violate this assertion and block the serve run.

ruisearch42 (Collaborator, Author):

If there is a single head node, 127.0.0.1 should work fine here. Regarding localhost, we should clearly define the expected IP or hostname.
If there are multiple IPs, 127.0.0.1 should not be passed as -dpa.

kouroshHakha (Contributor):

Even with 127.0.0.1, node_ip would return the physical IP, not 127.0.0.1 (think of using this on Anyscale: node.node_ip would return the actual IP of the head node).

ruisearch42 (Collaborator, Author):

Hmm, why should the user pass in 127.0.0.1? If they want to specify a public IP, they can pass it in. If they want to use the current node IP, they can leave it out. Current code:

        if self.data_parallel_address is None:
            if self.data_parallel_backend == "ray":
                host_ip = get_ip()
                logger.info(
                    "Using host IP %s as ray-based data parallel address",
                    host_ip)
                data_parallel_address = host_ip

kouroshHakha (Contributor) commented May 28, 2025:

> hmm, why should the user pass in 127.0.0.1? If they want to specify a public IP

Because it's a simple IP to pass in when you mean local :) without having to do a get_ip yourself. I know they can leave it out, but explicit should also work, I think. I don't think you are really gaining anything by checking node.ip == specified_ip.

ruisearch42 (Collaborator, Author):

Still don't feel passing in 127.0.0.1 is necessary :) but it might be convenient.

In that case, we may just resolve it to the host IP, just like when None is passed in, if we want to support it.

Checking node.ip == dp_master_ip is just to respect dp_size_local, or more accurately, the number of DP ranks to allocate on the DP master node. If we don't need that functionality, we can remove it. Currently it gives a knob to decide how many ranks to allocate on the DP master node.

vllm/v1/utils.py Outdated
key=lambda node: node.node_ip != head_node_ip)
assert nodes[0].node_ip == head_node_ip, (
"The first node must be the head node")
assert len(nodes) == 1 or nodes[1].node_ip != head_node_ip, (
kouroshHakha (Contributor):

I don't understand the second condition in the or?

ruisearch42 (Collaborator, Author):

If it is multi-node, the second node (after sorting) cannot have the same IP as the head node.

kouroshHakha (Contributor) commented May 28, 2025:

How can this even happen? I feel like we can remove this assertion and it will never get violated. Also, there would then be no reason to sort.

ruisearch42 (Collaborator, Author):

Yeah, this should not happen; the assertion is just defensive programming in case things get messed up. We can remove this as well; it should be pretty safe, as you mentioned.

See the other comments regarding why sorting is preferred. I originally did not use sorting, but the code is cleaner with it.

vllm/v1/utils.py Outdated
else:
logger.info("Creating placement groups")
nodes = list_nodes()
nodes = sorted(list_nodes(),
kouroshHakha (Contributor):

Why do you need to sort the nodes by node IP?

ruisearch42 (Collaborator, Author):

Sorting makes the head node always the first node, with worker nodes following. This makes it easier to create and manage placement_groups/local_dp_ranks.

kouroshHakha (Contributor):

Why would sorting put the head node first? What is the comparison basis? Is this a property of the node objects returned by the Ray API?

ruisearch42 (Collaborator, Author):

Oh because of the lambda we use:

            nodes = sorted(list_nodes(),
                           key=lambda node: node.node_ip != head_node_ip)

kouroshHakha (Contributor):

Oh, I see. You are doing all this sorting for the check below, right? Or do you need sorting for another reason?

ruisearch42 (Collaborator, Author):

Basically, to be able to respect dp_size_local. With sorting, the code is simpler.
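
For illustration, here is an editor-added, minimal sketch (a namedtuple stands in for the node objects returned by Ray) of why the boolean sort key places the head node first: False sorts before True, and only the head node's IP compares equal to head_node_ip.

from collections import namedtuple

# Hypothetical stand-in for the node objects returned by Ray's node listing.
Node = namedtuple("Node", "node_ip")

head_node_ip = "10.99.48.128"
nodes = [Node("10.99.48.130"), Node("10.99.48.128"), Node("10.99.48.129")]

# The key is False (0) for the head node and True (1) for workers, so the head
# node always sorts to the front; the order of the remaining nodes is stable.
nodes = sorted(nodes, key=lambda node: node.node_ip != head_node_ip)
assert nodes[0].node_ip == head_node_ip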

@@ -618,6 +619,11 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser:
type=int,
help='Port for data parallel RPC '
'communication.')
parallel_group.add_argument('--data-parallel-backend',
kouroshHakha (Contributor):

Add the default as EngineArgs.data_parallel_backend.

ruisearch42 (Collaborator, Author) commented May 28, 2025:

In the class definition of EngineArgs, there is already

data_parallel_backend: str = ParallelConfig.data_parallel_backend

and ParallelConfig.data_parallel_backend defaults to mp.

This follows the current convention for defining defaults.

kouroshHakha (Contributor):

When you don't pass --data-parallel-backend through the CLI, it will be None and will barf later where you check that it should be either ray or mp. Try it.
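
For what it's worth, a self-contained, editor-added sketch of the suggested fix using plain argparse (vLLM's FlexibleArgumentParser wraps argparse; the DEFAULT_DP_BACKEND constant is illustrative): giving the flag an explicit default means the parsed value is never None.

import argparse

# Illustrative constant; in vLLM the default would come from
# EngineArgs.data_parallel_backend / ParallelConfig.data_parallel_backend ("mp").
DEFAULT_DP_BACKEND = "mp"

parser = argparse.ArgumentParser()
parser.add_argument(
    "--data-parallel-backend",
    type=str,
    choices=["mp", "ray"],
    default=DEFAULT_DP_BACKEND,
    help="Backend for data parallel, either 'mp' or 'ray'.")

args = parser.parse_args([])  # flag omitted on the command line
assert args.data_parallel_backend == "mp"  # never None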

vllm/v1/utils.py Outdated
assert available_engine_count >= local_engine_count, (
"Not enough resources to allocate DP ranks "
f"on DP master node {node_ip}")
for i in range(local_engine_count):
kouroshHakha (Contributor):

So if I don't pass data-parallel-size-local when using Ray, I think the current config path uses dp_size as the local size, but it does not make sense to make this loop the default one. This loop should only be on the critical path if you really want granular control over how many DP workers are guaranteed to run locally.

So to summarize,

vllm serve $MODEL -dp 4 -dpb ray

should go through bundles = [{"GPU": 1.0}] * world_size + [{"CPU": 1.0}] as the critical path, not the other one.

ruisearch42 (Collaborator, Author):

Yeah, you are right. If we don't specify dp_size_local then dp_size will be used. This behavior is the same as MP.

To achieve the behavior you mentioned (and still respect dp_size_local), we could do one of the following:

  1. Use a special value (None or -1) for dp_size_local to mean "not specified". However, MP probably does not like these special values.
  2. Use another Ray-only flag (e.g., dp_size_local_ignore, dp_size_local_auto, or some better name) to override dp_size_local.

Out of these, 2) is probably better.

Or for Ray we simply do not respect dp_size_local? Is that your preference?

kouroshHakha (Contributor):

You can do 2 without adding a new flag. Basically the default for dp_size_local is established at runtime, so you can set it to 0 if it's not specified and the backend is ray.

ruisearch42 (Collaborator, Author) commented May 28, 2025:

dp_size_local=0 means "don't allocate on the DP master node" (in the current semantics, including MP). Did you mean to change it to "allocate on the DP master node (and other nodes) based on resource availability"?

I think you are suggesting 1) but with 0 as the special value. I'm not sure 0 is a good special value:

  1. The semantics would differ between MP and Ray, and
  2. 0 is a bit confusing when it means "unspecified". It's weird that dp_size_local is respected when the value is > 0, but when it is 0, it means "unspecified".
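
To make the trade-off concrete, here is an editor-added sketch (function and parameter names are assumptions, not the PR's code) of the "establish the default at runtime" idea, using None rather than 0 as the "unspecified" sentinel:

from typing import Optional

def resolve_dp_size_local(dp_size: int,
                          dp_size_local: Optional[int],
                          backend: str) -> int:
    """Sketch only: pick the number of DP ranks to place on the DP master node."""
    if dp_size_local is not None:
        # Explicitly specified, including 0 ("no engines on the DP master node").
        return dp_size_local
    if backend == "ray":
        # Unspecified with Ray: let placement groups decide where engines land.
        return 0
    # Unspecified with MP: current behavior, run every DP rank locally.
    return dp_size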

ray.util.remove_placement_group(pg)


def wait_for_engine_startup(
kouroshHakha (Contributor):

Please add a TODO to unify this with the other wait_for_engine_startup or something. There are a few redundancies between this and the other one.

ruisearch42 (Collaborator, Author):

This is MP-only, and Ray doesn't actually call this method. Ray's wait for engine startup is trivial, basically:

        refs = [actor.wait_for_init.remote() for actor in actors]
        ray.get(refs)

And there is no need to introduce a method for Ray.

kouroshHakha (Contributor):

I think I missed that this was from Nick's PR.

f"died with exit code {proc.exitcode}")

if actor_run_refs:
import ray
kouroshHakha (Contributor):

import at the top?

ruisearch42 (Collaborator, Author):

Not every vLLM installation has Ray. This avoids an import error for cases that don't need to use Ray.
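
For reference, a minimal, editor-added sketch of the deferred-import pattern being defended here (the helper and its logic are illustrative, not the PR's exact code):

def poll_actor_refs(actor_run_refs):
    """Sketch only: touch Ray APIs only when actor references actually exist."""
    if not actor_run_refs:
        return []
    # Deferred import: installations without Ray never reach this line, so a
    # missing `ray` package never raises ImportError for MP-only deployments.
    import ray
    done, _ = ray.wait(actor_run_refs, timeout=0)
    return done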

@@ -164,9 +284,328 @@ def finished_procs(self) -> dict[str, int]:
}


class CoreEngineActorManager:
kouroshHakha (Contributor):

Should this go to ray_utils.py?

ruisearch42 (Collaborator, Author):

Currently there is a ray_utils.py in vllm/executor, which is not a proper place for this.
I also tried moving this to a new file ray_dp.py in vllm/v1, and it does not actually improve the organization: 1) the imports would be circular unless we do a larger refactoring, e.g. reorganizing vllm/v1/utils.py; 2) placing CoreEngineProcManager and CoreEngineActorManager close together for now groups similar functionality.
I feel the refactoring is better done as a follow-up, where we reorganize vllm/v1/utils.py as well.

vllm/v1/utils.py Outdated
placement_groups = []
local_dp_ranks = []

for node in nodes:
kouroshHakha (Contributor):

Can we reiterate why we need to be aware of nodes when using Ray? We could very well be in a cluster that has not scaled yet, in which case the only node would be a CPU head node. Why not do something simple and general like the following?

if placement_groups is not None:
    ...
else: 
   placement_groups = []
   for _ in range(dp_size):
        bundles = [{"GPU": 1.0}] * world_size + [{"CPU": 1.0}]
        pg = ray.util.placement_group(
            name=f"dp_rank_{len(placement_groups)}",
            strategy="STRICT_PACK",
            bundles=bundles,
        )
        placement_groups.append(pg)

We should not have any difference between local and remote actors then.

kouroshHakha (Contributor):

I do think we should merge this and then re-evaluate the need to keep the local vs. remote concept at all.

ruisearch42 (Collaborator, Author) commented May 28, 2025:

Yeah, good point. We may end up getting rid of the node IP placement restriction. I wanted to use this in Ray Serve and clarify the exact requirements and preferences, so I thought it would be better to re-evaluate then (this is an easy change, internal to Ray DP) rather than optimize too early.
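
As a follow-on illustration, here is an editor-added sketch of how engine actors could be scheduled into the placement groups from the reviewer's snippet above (DPEngineCoreActor here is a hypothetical stand-in, not the PR's actual actor; PlacementGroupSchedulingStrategy is Ray's real scheduling API):

import ray
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

@ray.remote(num_gpus=1)
class DPEngineCoreActor:  # hypothetical stand-in for the PR's engine actor
    def wait_for_init(self) -> bool:
        # A real engine would initialize its model and frontend sockets here.
        return True

# `placement_groups` is assumed to be the list built in the snippet above,
# one placement group per DP rank with a GPU bundle at index 0.
actors = [
    DPEngineCoreActor.options(
        scheduling_strategy=PlacementGroupSchedulingStrategy(
            placement_group=pg,
            placement_group_bundle_index=0,
        )
    ).remote()
    for pg in placement_groups
]

# Mirrors the trivial "wait for engine startup" mentioned earlier in this review.
ray.get([actor.wait_for_init.remote() for actor in actors])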


mergify bot commented May 28, 2025

This pull request has merge conflicts that must be resolved before it can be merged. Please rebase the PR, @ruisearch42.

https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/working-with-forks/syncing-a-fork
