Skip to content

Commit

Permalink
Add Ray data virtual cluster test (#453)
Browse files Browse the repository at this point in the history
* ray.nodes fetch only the virtual cluster nodes

Signed-off-by: NKcqx <[email protected]>

* pass UT

Signed-off-by: NKcqx <[email protected]>

* support cluster_resources split virtual_cluster

Signed-off-by: NKcqx <[email protected]>
* available_resources support virtual_cluster

Signed-off-by: NKcqx <[email protected]>

* job only get virtual cluster data

Signed-off-by: NKcqx <[email protected]>

* fix ut

Signed-off-by: NKcqx <[email protected]>

* rm outdated warning

Signed-off-by: NKcqx <[email protected]>

* assert job in certain virtual cluster

Signed-off-by: NKcqx <[email protected]>

* pass UT

Signed-off-by: NKcqx <[email protected]>

* add UT

Signed-off-by: NKcqx <[email protected]>

* lint codes

Signed-off-by: NKcqx <[email protected]>

* add barrier for sync job progress

Signed-off-by: NKcqx <[email protected]>

* more comments

Signed-off-by: NKcqx <[email protected]>

---------

Signed-off-by: NKcqx <[email protected]>
  • Loading branch information
NKcqx authored Jan 9, 2025
1 parent c362087 commit bbaf5ea
Show file tree
Hide file tree
Showing 3 changed files with 534 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,6 @@ def set_normal_task_info(self, key, value):
ray.init(address="auto")
storage = ray.get_actor(name="{storage_actor_name}", namespace="storage")
@ray.remote
def access_nodes():
return ray.nodes()
Expand Down Expand Up @@ -822,10 +821,10 @@ async def test_list_nodes(job_sdk_client):
"_system_config": {"gcs_actor_scheduling_enabled": False},
"ntemplates": 3,
},
# {
# "_system_config": {"gcs_actor_scheduling_enabled": True},
# "ntemplates": 3,
# },
{
"_system_config": {"gcs_actor_scheduling_enabled": True},
"ntemplates": 3,
},
],
indirect=True,
)
Expand Down
113 changes: 113 additions & 0 deletions python/ray/data/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@
import ray
import ray.util.state
from ray._private.internal_api import get_memory_info_reply, get_state_from_address
from ray._private.ray_constants import DEFAULT_DASHBOARD_AGENT_LISTEN_PORT
from ray._private.test_utils import format_web_url, wait_until_server_available
from ray._private.utils import _get_pyarrow_version
from ray.air.constants import TENSOR_COLUMN_NAME
from ray.air.util.tensor_extensions.arrow import ArrowTensorArray
from ray.data import Schema
from ray.data.block import BlockExecStats, BlockMetadata
from ray.data.tests.mock_server import * # noqa
from ray.job_submission import JobSubmissionClient

# Trigger pytest hook to automatically zip test cluster logs to archive dir on failure
from ray.tests.conftest import * # noqa
Expand Down Expand Up @@ -717,3 +720,113 @@ def _assert(last_snapshot):
@pytest.fixture(autouse=True, scope="function")
def log_internal_stack_trace_to_stdout(restore_data_context):
ray.data.context.DataContext.get_current().log_internal_stack_trace_to_stdout = True


@contextmanager
def _ray_start_cluster(**kwargs):
    """Start a local Ray test cluster, yield it, and always tear it down.

    Accepts the same keyword arguments as ``ray.init`` plus:
        num_nodes: number of nodes to add to the cluster (default 0).
        do_init: whether to call ``ray.init`` against the head node.
            Defaults to True when ``num_nodes > 0`` and ``do_init`` was
            not passed explicitly.
        skip_cluster: when truthy, skip the test entirely (defaults to
            the module-level ``cluster_not_supported``).

    Yields:
        The running ``Cluster`` instance.
    """
    cluster_not_supported_ = kwargs.pop("skip_cluster", cluster_not_supported)
    if cluster_not_supported_:
        pytest.skip("Cluster not supported")
    init_kwargs = get_default_fixture_ray_kwargs()
    num_nodes = 0
    do_init = False
    # num_nodes & do_init are not arguments for ray.init, so delete them.
    if "num_nodes" in kwargs:
        num_nodes = kwargs["num_nodes"]
        del kwargs["num_nodes"]
    if "do_init" in kwargs:
        do_init = kwargs["do_init"]
        del kwargs["do_init"]
    elif num_nodes > 0:
        do_init = True
    init_kwargs.update(kwargs)
    namespace = init_kwargs.pop("namespace")
    cluster = Cluster()
    remote_nodes = []
    try:
        for i in range(num_nodes):
            # _system_config may only be supplied to the head node; drop it
            # before adding worker nodes.
            if i > 0 and "_system_config" in init_kwargs:
                del init_kwargs["_system_config"]
            remote_nodes.append(cluster.add_node(**init_kwargs))
            # We assume driver will connect to the head (first node),
            # so ray init will be invoked if do_init is true
            if len(remote_nodes) == 1 and do_init:
                ray.init(address=cluster.address, namespace=namespace)
        yield cluster
    finally:
        # BUGFIX: the original ran teardown as plain code after `yield`,
        # which @contextmanager skips when the with-body raises — leaking
        # the cluster's processes across tests. try/finally guarantees
        # shutdown on both success and failure.
        ray.shutdown()
        cluster.shutdown()


def create_or_update_virtual_cluster(
    webui_url, virtual_cluster_id, allocation_mode, replica_sets, revision
):
    """POST a create/update request for a virtual cluster to the dashboard.

    Args:
        webui_url: Formatted dashboard base URL (e.g. ``http://ip:port``).
        virtual_cluster_id: Id of the virtual cluster to create or update.
        allocation_mode: Allocation mode string (e.g. ``"mixed"``).
        replica_sets: Mapping of node-type name to replica count.
        revision: Revision number sent with the request.

    Returns:
        The decoded JSON response on success, or ``None`` when the HTTP
        request fails (the error is logged; callers treat this helper as
        best-effort).
    """
    try:
        resp = requests.post(
            webui_url + "/virtual_clusters",
            json={
                "virtualClusterId": virtual_cluster_id,
                "allocationMode": allocation_mode,
                "replicaSets": replica_sets,
                "revision": revision,
            },
            timeout=10,
        )
        result = resp.json()
        print(result)
        return result
    except requests.RequestException as ex:
        # BUGFIX: the original caught bare Exception and logged with
        # logger.info(ex), which swallowed the traceback and hid genuine
        # programming errors. Catch only request/JSON failures and log
        # with the full traceback; still return None (best-effort).
        logger.exception(ex)


@pytest.fixture
def create_virtual_cluster(request):
    """Pytest fixture: boot a heterogeneous Ray cluster and carve it into
    virtual clusters via the dashboard HTTP API.

    Reads optional ``request.param`` keys:
        node_instances: list of ``(node_type, count)`` pairs; the node type
            name encodes the CPU count before the ``c`` (e.g. ``"2c4g"``).
        virtual_cluster: mapping of virtual-cluster id to its config dict
            (``allocation_mode``, ``replica_sets``, ``revision``).
    Both fall back to built-in defaults when absent or empty.

    Yields:
        ``(cluster, JobSubmissionClient)`` for the running cluster.
    """
    param = getattr(request, "param", {})
    node_instances = param.get("node_instances", None)
    virtual_cluster = param.get("virtual_cluster", None)
    # Defaults: three node templates x three nodes each, and three mixed-mode
    # virtual clusters each requesting one node of every template.
    if not node_instances:
        node_instances = [("1c2g", 3), ("2c4g", 3), ("4c8g", 3)]
    if not virtual_cluster:
        virtual_cluster = {
            f"VIRTUAL_CLUSTER_{i}": {
                "allocation_mode": "mixed",
                "replica_sets": {"1c2g": 1, "2c4g": 1, "4c8g": 1},
                "revision": 0,
            }
            for i in range(3)
        }
    with _ray_start_cluster(do_init=True, num_nodes=1) as cluster:
        raw_webui_url = cluster.webui_url
        host = raw_webui_url.split(":")[0]
        webui_url = format_web_url(raw_webui_url)
        # Wait for both the dashboard agent and the dashboard itself.
        agent_address = f"{host}:{DEFAULT_DASHBOARD_AGENT_LISTEN_PORT}"
        assert wait_until_server_available(agent_address)
        assert wait_until_server_available(raw_webui_url) is True
        # Add worker nodes; CPU count is parsed from the template name
        # (the digits before the first "c", e.g. "4c8g" -> 4 CPUs).
        for node_type, count in node_instances:
            cpus = int(node_type.split("c")[0])
            for _ in range(count):
                cluster.add_node(
                    env_vars={"RAY_NODE_TYPE_NAME": node_type},
                    num_cpus=cpus,
                )

        for vc_id, vc_config in virtual_cluster.items():
            create_or_update_virtual_cluster(
                webui_url=webui_url,
                virtual_cluster_id=vc_id,
                allocation_mode=vc_config.get("allocation_mode", "mixed"),
                replica_sets=vc_config.get("replica_sets", {}),
                revision=vc_config.get("revision", 0),
            )

        yield (
            cluster,
            JobSubmissionClient(format_web_url(webui_url)),
        )
Loading

0 comments on commit bbaf5ea

Please sign in to comment.