Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Refactor ray._private.runtime_env.packaging.Protocol to make it extensible #49341

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions python/ray/_private/runtime_env/default_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,9 @@

def get_image_uri_plugin(ray_tmp_dir: str):
return ImageURIPlugin(ray_tmp_dir)


def get_protocols_provider():
from ray._private.runtime_env.protocol import ProtocolsProvider

return ProtocolsProvider
69 changes: 4 additions & 65 deletions python/ray/_private/runtime_env/packaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import logging
import os
import shutil
from enum import Enum
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Callable, List, Optional, Tuple
Expand All @@ -20,6 +19,7 @@
RAY_RUNTIME_ENV_IGNORE_GITIGNORE,
)
from ray._private.runtime_env.conda_utils import exec_cmd_stream_to_logger
from ray._private.runtime_env.protocol import Protocol
from ray._private.thirdparty.pathspec import PathSpec
from ray.experimental.internal_kv import (
_internal_kv_exists,
Expand Down Expand Up @@ -73,33 +73,6 @@ async def __aexit__(self, exc_type, exc, tb):
self.file.release()


class Protocol(Enum):
"""A enum for supported storage backends."""

# For docstring
def __new__(cls, value, doc=None):
self = object.__new__(cls)
self._value_ = value
if doc is not None:
self.__doc__ = doc
return self

GCS = "gcs", "For packages dynamically uploaded and managed by the GCS."
CONDA = "conda", "For conda environments installed locally on each node."
PIP = "pip", "For pip environments installed locally on each node."
UV = "uv", "For uv environments install locally on each node."
HTTPS = "https", "Remote https path, assumes everything packed in one zip file."
S3 = "s3", "Remote s3 path, assumes everything packed in one zip file."
GS = "gs", "Remote google storage path, assumes everything packed in one zip file."
FILE = "file", "File storage path, assumes everything packed in one zip file."

@classmethod
def remote_protocols(cls):
# Returns a list of protocols that support remote storage
# These protocols should only be used with paths that end in ".zip" or ".whl"
return [cls.HTTPS, cls.S3, cls.GS, cls.FILE]


def _xor_bytes(left: bytes, right: bytes) -> bytes:
if left and right:
return bytes(a ^ b for (a, b) in zip(left, right))
Expand Down Expand Up @@ -768,49 +741,15 @@ async def download_and_unpack_package(
elif protocol in Protocol.remote_protocols():
# Download package from remote URI
tp = None
install_warning = (
"Note that these must be preinstalled "
"on all nodes in the Ray cluster; it is not "
"sufficient to install them in the runtime_env."
)

if protocol == Protocol.S3:
try:
import boto3
from smart_open import open as open_file
except ImportError:
raise ImportError(
"You must `pip install smart_open` and "
"`pip install boto3` to fetch URIs in s3 "
"bucket. " + install_warning
)
tp = {"client": boto3.client("s3")}
elif protocol == Protocol.GS:
try:
from google.cloud import storage # noqa: F401
from smart_open import open as open_file
except ImportError:
raise ImportError(
"You must `pip install smart_open` and "
"`pip install google-cloud-storage` "
"to fetch URIs in Google Cloud Storage bucket."
+ install_warning
)
elif protocol == Protocol.FILE:
if protocol == Protocol.FILE:
pkg_uri = pkg_uri[len("file://") :]

def open_file(uri, mode, *, transport_params=None):
return open(uri, mode)

else:
try:
from smart_open import open as open_file
except ImportError:
raise ImportError(
"You must `pip install smart_open` "
f"to fetch {protocol.value.upper()} URIs. "
+ install_warning
)
tp = protocol.get_smart_open_transport_params()
from smart_open import open as open_file

with open_file(pkg_uri, "rb", transport_params=tp) as package_zip:
with open_file(pkg_file, "wb") as fin:
Expand Down
96 changes: 96 additions & 0 deletions python/ray/_private/runtime_env/protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import enum
from ray._private.runtime_env.default_impl import get_protocols_provider


class ProtocolsProvider:
@classmethod
def get_protocols(cls):
return {
# For packages dynamically uploaded and managed by the GCS.
"gcs",
# For conda environments installed locally on each node.
"conda",
# For pip environments installed locally on each node.
"pip",
# For uv environments install locally on each node.
"uv",
# Remote https path, assumes everything packed in one zip file.
"https",
# Remote s3 path, assumes everything packed in one zip file.
"s3",
# Remote google storage path, assumes everything packed in one zip file.
"gs",
# File storage path, assumes everything packed in one zip file.
"file",
}

@classmethod
def get_remote_protocols(cls):
return {"https", "s3", "gs", "file"}

@classmethod
def get_smart_open_transport_params(cls, protocol):
jjyao marked this conversation as resolved.
Show resolved Hide resolved
install_warning = (
"Note that these must be preinstalled "
"on all nodes in the Ray cluster; it is not "
"sufficient to install them in the runtime_env."
)

if protocol == "s3":
try:
import boto3
from smart_open import open as open_file # noqa: F401
except ImportError:
raise ImportError(
"You must `pip install smart_open` and "
"`pip install boto3` to fetch URIs in s3 "
"bucket. " + install_warning
)
return {"client": boto3.client("s3")}
elif protocol == "gs":
try:
from google.cloud import storage # noqa: F401
from smart_open import open as open_file # noqa: F401
except ImportError:
raise ImportError(
"You must `pip install smart_open` and "
"`pip install google-cloud-storage` "
"to fetch URIs in Google Cloud Storage bucket." + install_warning
)
return None
else:
try:
from smart_open import open as open_file # noqa: F401
except ImportError:
raise ImportError(
"You must `pip install smart_open` "
f"to fetch {protocol.upper()} URIs. " + install_warning
)
return None


_protocols_provider = get_protocols_provider()

Protocol = enum.Enum(
"Protocol",
{protocol.upper(): protocol for protocol in _protocols_provider.get_protocols()},
)


@classmethod
def _remote_protocols(cls):
# Returns a list of protocols that support remote storage
# These protocols should only be used with paths that end in ".zip" or ".whl"
return [
cls[protocol.upper()] for protocol in _protocols_provider.get_remote_protocols()
]


Protocol.remote_protocols = _remote_protocols


def _get_smart_open_transport_params(self):
return _protocols_provider.get_smart_open_transport_params(self.value)


Protocol.get_smart_open_transport_params = _get_smart_open_transport_params
Loading