From e2b5edf0c07d1d86ec1a814310db07bf6e26c6a5 Mon Sep 17 00:00:00 2001 From: brosoul Date: Wed, 6 Nov 2024 00:02:50 +0800 Subject: [PATCH 01/18] init stream loader --- .../model_loader/stream/__init__.py | 16 ++ .../model_loader/stream/file.py | 203 ++++++++++++++++++ .../model_loader/stream/loader.py | 144 +++++++++++++ .../stream/requirements-stream.txt | 1 + .../model_loader/stream/utils.py | 197 +++++++++++++++++ 5 files changed, 561 insertions(+) create mode 100644 vllm/model_executor/model_loader/stream/__init__.py create mode 100644 vllm/model_executor/model_loader/stream/file.py create mode 100644 vllm/model_executor/model_loader/stream/loader.py create mode 100644 vllm/model_executor/model_loader/stream/requirements-stream.txt create mode 100644 vllm/model_executor/model_loader/stream/utils.py diff --git a/vllm/model_executor/model_loader/stream/__init__.py b/vllm/model_executor/model_loader/stream/__init__.py new file mode 100644 index 0000000000000..0320b75ba54cc --- /dev/null +++ b/vllm/model_executor/model_loader/stream/__init__.py @@ -0,0 +1,16 @@ +# Copyright 2024 The Aibrix Team. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +SUPPORTED_STREAM_STORAGE = ("s3", "tos", "local") diff --git a/vllm/model_executor/model_loader/stream/file.py b/vllm/model_executor/model_loader/stream/file.py new file mode 100644 index 0000000000000..b2623fe21295b --- /dev/null +++ b/vllm/model_executor/model_loader/stream/file.py @@ -0,0 +1,203 @@ +# Copyright 2024 The Aibrix Team. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass +from io import BytesIO +from pathlib import Path +from typing import NoReturn, Optional + +import numpy as np +from boto3.s3.transfer import TransferConfig + +from .utils import ( + _create_s3_client, + _parse_bucket_info_from_uri, + read_to_bytes_io, +) + +from vllm.logger import init_logger + +logger = init_logger(__name__) + + +class LoadFile: + def __init__(self, file_source: str) -> None: + self.file_source = file_source + + def load_whole_file(self, num_threads: int = 1) -> NoReturn: + raise NotImplementedError + + def load_to_bytes(self, offset: int, count: int) -> BytesIO: + raise NotImplementedError + + def load_to_buffer(self, offset: int, count: int) -> memoryview: + raise NotImplementedError + + def download(self, target_dir) -> NoReturn: + raise NotImplementedError + + +class LocalFile(LoadFile): + def __init__(self, file: str) -> None: + if not Path(file).exists(): + raise ValueError(f"file {file} not exist") + + self.file = file + super().__init__(file_source="local") + + def load_whole_file(self, num_threads: int = 1): + if num_threads != 1: + logger.warning( + f"num_threads {num_threads} is not supported for local file." + ) + + tensor_bytes = np.memmap( + self.file, + dtype=np.uint8, + mode="c", + ) + return tensor_bytes.tobytes() + + def load_to_bytes(self, offset: int, count: int): + return BytesIO(self.load_to_buffer(offset=offset, count=count)) + + def load_to_buffer(self, offset: int, count: int): + return np.memmap( + self.file, + dtype=np.uint8, + mode="r", + offset=offset, + shape=count, + ) + + +class RemoteFile(LoadFile): + def __init__(self, file: str, file_source: str) -> None: + self.file = file + super().__init__(file_source=file_source) + + def load_to_buffer(self, offset: int, count: int): + tensor_bytes = self.load_to_bytes(offset=offset, count=count) + return tensor_bytes.getbuffer() + + +class S3File(RemoteFile): + def __init__(self, file: str) -> None: + self.file = file + scheme, bucket_name, bucket_path = _parse_bucket_info_from_uri(file) + self.bucket_name = bucket_name + self.bucket_path = bucket_path + try: + s3_client = _create_s3_client() + s3_client.head_object(Bucket=bucket_name, Key=bucket_path) + except Exception as e: + raise ValueError(f"S3 bucket path {bucket_path} not exist for {e}.") + super().__init__(file=file, file_source="s3") + + def load_whole_file(self, num_threads: int): + s3_client = _create_s3_client() + + config_kwargs = { + "max_concurrency": num_threads, + "use_threads": True, + } + config = TransferConfig(**config_kwargs) + + data = BytesIO() + s3_client.download_fileobj( + Bucket=self.bucket_name, + Key=self.bucket_path, + Fileobj=data, + Config=config, + ) + return data.getbuffer() + + def load_to_bytes(self, offset: int, count: int): + s3_client = _create_s3_client() + + range_header = f"bytes={offset}-{offset+count-1}" + resp = s3_client.get_object( + Bucket=self.bucket_name, Key=self.bucket_path, Range=range_header + ) + return read_to_bytes_io(resp.get("Body")) + + +@dataclass +class StreamModel: + model_uri: str + num_threads: int = 16 + aws_access_key_id: Optional[str] = None + aws_secret_access_key: Optional[str] = None + aws_region: Optional[str] = None + aws_endpinit: Optional[str] = None + + def __post_init__(self): + scheme, bucket_name, bucket_path = _parse_bucket_info_from_uri(self.model_uri) + self.model_source_type = scheme + self.bucket_name = bucket_name + + # list config and safetensors files in model_uri + if self.model_source_type == "local": + local_dir = Path(bucket_path) + if not local_dir.exists(): + raise ValueError(f"local path {local_dir} not exist") + files = [file for file in local_dir.iterdir() if file.is_file()] + + self.config_files = [file for file in files if file.suffix == ".json"] + self.safetensors_files = [file for file in files if file.suffix == ".safetensors"] + else: + self.client = _create_s3_client(ak=self.aws_access_key_id, + sk=self.aws_secret_access_key, + endpoint=self.aws_endpinit, + region=self.aws_endpinit) + objects_out = self.client.list_objects_type2( + self.bucket_name, prefix=bucket_path, delimiter="/" + ) + files = [obj.key for obj in objects_out.contents] + + self.config_files = [file for file in files if file.endswith(".json")] + self.safetensors_files = [file for file in files if file.endswith(".safetensors")] + + if len(self.config_files) == 0: + raise ValueError(f"no config file found in {self.model_uri}") + if len(self.safetensors_files) == 0: + raise ValueError(f"no safetensors file found in {self.model_uri}") + + + def download_config(self, target_dir: str) -> Path: + if self.model_source_type == "local": + logger.info("local config no need to download") + return Path(self.model_uri) + + target_path = Path(target_dir) + target_path.mkdir(parents=True, exist_ok=True) + for config_file in self.config_files: + _file_name = config_file.split("/")[-1] + local_file = target_path.joinpath(_file_name).absolute() + config_kwargs = { + "max_concurrency": self.num_threads, + "use_threads": True, + } + config = TransferConfig(**config_kwargs) + self.client.download_file( + Bucket=self.bucket_name, + Key=config_file, + Filename=str( + local_file + ), # S3 client does not support Path, convert it to str + Config=config, + ) + return target_path + + \ No newline at end of file diff --git a/vllm/model_executor/model_loader/stream/loader.py b/vllm/model_executor/model_loader/stream/loader.py new file mode 100644 index 0000000000000..120343bd34ce5 --- /dev/null +++ b/vllm/model_executor/model_loader/stream/loader.py @@ -0,0 +1,144 @@ +# Copyright 2024 The Aibrix Team. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import concurrent.futures +import json +import queue +import struct +import threading +from typing import Generator, List, Tuple, Union + +import torch + +from .file import LoadFile +from .utils import TensorMeta, split_continue_tensors + + +def get_safetensors_metas(file: LoadFile): + LENTH_COUNT = 8 + length_bytes = file.load_to_bytes(offset=0, count=LENTH_COUNT) + length_of_header = struct.unpack(" None: + self.file = file + self.num_thread = num_thread + self.use_pinmem = use_pinmem + self.use_direct_io = use_direct_io + # TODO assert file type is safetensors + self.tensors_metas: List[TensorMeta] = get_safetensors_metas(file) + + def load_safetensors(self, device: Union[torch.device, str] = "cpu"): + return dict(self.get_weights_iterator(device=device)) + + + def _tensors_reader(self, + thread_idx, + barrier, + device: Union[torch.device, str], + tensor_metas: Tuple[TensorMeta], + transfer_out_queue: queue.SimpleQueue[Union[Exception, TensorMeta]] + ) -> None: + device = torch.device(device) + is_cuda = device.type == "cuda" + # TODO use stream nonblocking IO + for tensor_meta in tensor_metas: + tensor_buffer = self.file.load_to_buffer(offset=tensor_meta.real_offset, count=tensor_meta.count) + tensor = torch.frombuffer( + tensor_buffer, + dtype=tensor_meta.dtype + ).view(tensor_meta.shape) + if is_cuda: + tensor = tensor.to(device, non_blocking=True) + tensor_meta.set_tensor(tensor) + transfer_out_queue.put(tensor_meta) + + def get_weights_iterator( + self, + device: Union[torch.device, str] = "cpu" + ) -> Generator[Tuple[str, torch.Tensor], None, None]: + tensors_per_reader: List[Tuple[TensorMeta]] = split_continue_tensors(self.tensors_metas, self.num_thread) + + effective_num_readers = len(tensors_per_reader) + self._reader_pool = concurrent.futures.ThreadPoolExecutor( + max_workers=effective_num_readers, + thread_name_prefix="SafetensorsReader", + ) + transfer_out_queue: queue.SimpleQueue[Union[Exception, TensorMeta]] = queue.SimpleQueue() # type: ignore + futures: List[concurrent.futures.Future] = [] + + barrier = threading.Barrier(effective_num_readers) + + for thread_idx, tensor_metas in enumerate(tensors_per_reader): + future = self._reader_pool.submit( + self._tensors_reader, + thread_idx, + barrier, + device, + tensor_metas, + transfer_out_queue, + ) + futures.append(future) + + try: + for _ in range(len(self.tensors_metas)): + tensor_meta: TensorMeta = transfer_out_queue.get(timeout=3600) + if isinstance(tensor_meta, Exception): + raise tensor_meta + yield tensor_meta.name, tensor_meta.tensor + except BaseException: + raise + + def get_weights_iterator_wo_threads( + self, + device: Union[torch.device, str] = "cpu" + ) -> Generator[Tuple[str, torch.Tensor], None, None]: + + device = torch.device(device) + is_cuda = device.type == "cuda" + # TODO use stream nonblocking IO + for tensor_meta in self.tensors_metas: + tensor_buffer = self.file.load_to_bytes(offset=tensor_meta.real_offset, count=tensor_meta.count) + tensor = torch.frombuffer( + tensor_buffer, + dtype=tensor_meta.dtype + ).view(tensor_meta.shape) + + if is_cuda: + tensor = tensor.to(device, non_blocking=True) + # tensor_meta.set_tensor(tensor) + yield tensor_meta.name, tensor diff --git a/vllm/model_executor/model_loader/stream/requirements-stream.txt b/vllm/model_executor/model_loader/stream/requirements-stream.txt new file mode 100644 index 0000000000000..30ddf823b87c1 --- /dev/null +++ b/vllm/model_executor/model_loader/stream/requirements-stream.txt @@ -0,0 +1 @@ +boto3 diff --git a/vllm/model_executor/model_loader/stream/utils.py b/vllm/model_executor/model_loader/stream/utils.py new file mode 100644 index 0000000000000..bd2408a5cc221 --- /dev/null +++ b/vllm/model_executor/model_loader/stream/utils.py @@ -0,0 +1,197 @@ +import os +from io import BytesIO +from typing import List, Optional, Tuple +from urllib.parse import urlparse + +import boto3 +import torch +from botocore.config import Config + +from . import SUPPORTED_STREAM_STORAGE + + +def read_to_bytes_io(content, chunk_size=None): + chunk_size = int(os.getenv("STREAM_READ_CHUNK_SIZE", 8388608)) # 8MB + bytes_io = BytesIO() + buf = content.read(chunk_size) + while buf: + bytes_io.write(buf) + buf = content.read(chunk_size) + bytes_io.seek(0) + return bytes_io + + +def filter_suffix_files(files: List[str], suffix: str) -> List[str]: + return [file for file in files if file.endswith(suffix)] + + +def get_dtype(dtype_str: str): + # torch.float8 formats require 2.1; we do not support these dtypes on earlier versions + _float8_e4m3fn = getattr(torch, "float8_e4m3fn", None) + _float8_e5m2 = getattr(torch, "float8_e5m2", None) + _TYPES = { + "F64": torch.float64, + "F32": torch.float32, + "F16": torch.float16, + "BF16": torch.bfloat16, + "I64": torch.int64, + # "U64": torch.uint64, + "I32": torch.int32, + # "U32": torch.uint32, + "I16": torch.int16, + # "U16": torch.uint16, + "I8": torch.int8, + "U8": torch.uint8, + "BOOL": torch.bool, + "F8_E4M3": _float8_e4m3fn, + "F8_E5M2": _float8_e5m2, + } + return _TYPES[dtype_str] + + +def _parse_bucket_info_from_uri(uri: str) -> Tuple[str, str, str]: + parsed = urlparse(uri) + scheme = parsed.scheme + # uri is local path when scheme is empty + scheme = "local" if scheme == "" else scheme + if scheme not in SUPPORTED_STREAM_STORAGE: + raise ValueError(f"{scheme} not supported, + only {SUPPORTED_STREAM_STORAGE} supported") + + bucket_name = parsed.netloc + bucket_path = parsed.path.lstrip("/") if scheme != "" else parsed.path + return scheme, bucket_name, bucket_path + + +def _create_s3_client(ak, sk, endpoint, region): + ak = ak or os.getenv("AWS_ACCESS_KEY_ID") + sk = sk or os.getenv("AWS_SECRET_ACCESS_KEY") + endpoint = endpoint or os.getenv("AWS_ENDPOINT_URL") + region = region or os.getenv("AWS_REGION") + + my_config = Config( + # signature_version = 'v4', + s3={"addressing_style": "virtual"} + ) + return boto3.client( + service_name="s3", + region_name=region, + endpoint_url=endpoint, + aws_access_key_id=ak, + aws_secret_access_key=sk, + config=my_config + ) + + +class TensorMeta: + def __init__(self, name: str, base_offset: int, dtype: str, shape: List[int], data_offsets: List[int]) -> None: + self._name = name + self._base_offset = base_offset + self._dtype = get_dtype(dtype) + self._shape = shape + self._data_offsets = data_offsets + self._tensor = None + + @property + def name(self) -> str: + return self._name + + @property + def dtype(self) -> torch.dtype: + return self._dtype + + @property + def shape(self) -> List[int]: + return self._shape + + @property + def data_offsets(self) -> List[int]: + return self._data_offsets + + @property + def real_offset(self) -> List[int]: + return self._data_offsets[0] + self._base_offset + + @property + def count(self) -> List[int]: + return self._data_offsets[1] - self._data_offsets[0] + + @property + def tensor(self) -> Optional[torch.Tensor]: + return self._tensor + + def set_tensor(self, tensor: torch.Tensor) -> None: + self._tensor = tensor + + def __str__(self) -> str: + return str( + { + "name": self._name, + "dtype": self._dtype, + "shape": self._shape, + "data_offsets": self._data_offsets, + } + ) + + def __repr__(self) -> str: + return self.__str__() + + +def split_continue_tensors(tensor_metas: List[TensorMeta], num_readers:int) -> List[Tuple[TensorMeta]]: + """ + Note: Usually, the number of groups for splitting tensors + is greater than num_deaders. + """ + assert len(tensor_metas) > 0, "tensor_metas should not be empty" + assert num_readers > 0, "num_readers should be greater than 0" + + if len(tensor_metas) <= num_readers: + return [(item,) for item in tensor_metas] + + max_offset = tensor_metas[-1].data_offsets[1] + avg_size = max_offset // num_readers + group = [] + groups = [] + group_size = 0 + for tensor_meta in tensor_metas: + if len(group) == 0 or group_size + tensor_meta.count <= avg_size: + group.append(tensor_meta) + group_size += tensor_meta.count + else: + groups.append(tuple(group)) + group = [tensor_meta] + group_size = tensor_meta.count + + if len(group) != 0: + groups.append(tuple(group)) + return groups + + +def split_continue_tensors_v1(tensor_metas: List[TensorMeta], num_readers:int) -> List[Tuple[TensorMeta]]: + assert len(tensor_metas) > 0, "tensor_metas should not be empty" + assert num_readers > 0, "num_readers should be greater than 0" + + if len(tensor_metas) <= num_readers: + return [(item,) for item in tensor_metas] + + max_offset = tensor_metas[-1].data_offsets[1] + avg_size = max_offset // num_readers + group = [] + groups = [] + current_max_offset = avg_size + for tensor_meta in tensor_metas: + start, end = tensor_meta.data_offsets + while start >= current_max_offset: + current_max_offset += avg_size + + if end <= current_max_offset: + group.append(tensor_meta) + else: + if len(group) != 0: + groups.append(tuple(group)) + group = [tensor_meta] + + current_max_offset += avg_size + if len(group) != 0: + groups.append(tuple(group)) + return groups From b3ece61de1c28864d62bdd3e3cfcd1e829c66609 Mon Sep 17 00:00:00 2001 From: brosoul Date: Wed, 6 Nov 2024 15:37:30 +0800 Subject: [PATCH 02/18] use boto3 to get object from tos and s3 --- .../model_loader/stream/file.py | 149 ++++++++---------- .../model_loader/stream/loader.py | 147 +++++++++++++---- .../model_loader/stream/utils.py | 58 ++++--- 3 files changed, 216 insertions(+), 138 deletions(-) diff --git a/vllm/model_executor/model_loader/stream/file.py b/vllm/model_executor/model_loader/stream/file.py index b2623fe21295b..cd8c381aa0454 100644 --- a/vllm/model_executor/model_loader/stream/file.py +++ b/vllm/model_executor/model_loader/stream/file.py @@ -18,6 +18,7 @@ from typing import NoReturn, Optional import numpy as np +import boto3 from boto3.s3.transfer import TransferConfig from .utils import ( @@ -32,6 +33,7 @@ class LoadFile: + def __init__(self, file_source: str) -> None: self.file_source = file_source @@ -43,12 +45,13 @@ def load_to_bytes(self, offset: int, count: int) -> BytesIO: def load_to_buffer(self, offset: int, count: int) -> memoryview: raise NotImplementedError - + def download(self, target_dir) -> NoReturn: raise NotImplementedError class LocalFile(LoadFile): + def __init__(self, file: str) -> None: if not Path(file).exists(): raise ValueError(f"file {file} not exist") @@ -59,8 +62,7 @@ def __init__(self, file: str) -> None: def load_whole_file(self, num_threads: int = 1): if num_threads != 1: logger.warning( - f"num_threads {num_threads} is not supported for local file." - ) + f"num_threads {num_threads} is not supported for local file.") tensor_bytes = np.memmap( self.file, @@ -83,6 +85,7 @@ def load_to_buffer(self, offset: int, count: int): class RemoteFile(LoadFile): + def __init__(self, file: str, file_source: str) -> None: self.file = file super().__init__(file_source=file_source) @@ -91,23 +94,49 @@ def load_to_buffer(self, offset: int, count: int): tensor_bytes = self.load_to_bytes(offset=offset, count=count) return tensor_bytes.getbuffer() + def download_file(self, target_dir: str): + raise NotImplementedError + class S3File(RemoteFile): - def __init__(self, file: str) -> None: - self.file = file - scheme, bucket_name, bucket_path = _parse_bucket_info_from_uri(file) + + def __init__( + self, + scheme: str, + bucket_name: str, + bucket_path: str, + s3_client: Optional[boto3.client] = None, + s3_access_key_id: Optional[str] = None, + s3_secret_access_key: Optional[str] = None, + s3_region: Optional[str] = None, + s3_endpinit: Optional[str] = None, + ) -> None: self.bucket_name = bucket_name self.bucket_path = bucket_path + if s3_client is None: + try: + s3_client = _create_s3_client(ak=s3_access_key_id, + sk=s3_secret_access_key, + endpoint=s3_endpinit, + region=s3_region) + except Exception as e: + raise ValueError(f"create s3 client failed for {e}.") + self.s3_client = s3_client try: - s3_client = _create_s3_client() - s3_client.head_object(Bucket=bucket_name, Key=bucket_path) + self.s3_client.head_object(Bucket=bucket_name, Key=bucket_path) except Exception as e: - raise ValueError(f"S3 bucket path {bucket_path} not exist for {e}.") - super().__init__(file=file, file_source="s3") + raise ValueError("S3 bucket path {bucket_path} not exist for {e}.") - def load_whole_file(self, num_threads: int): - s3_client = _create_s3_client() + file = scheme + "://" + bucket_name + "/" + bucket_path + super().__init__(file=file, file_source=scheme) + @classmethod + def from_uri(cls, file_uri: str, **kwargs): + scheme, bucket_name, bucket_path = _parse_bucket_info_from_uri( + file_uri) + cls(scheme, bucket_name, bucket_path, **kwargs) + + def load_whole_file(self, num_threads: int): config_kwargs = { "max_concurrency": num_threads, "use_threads": True, @@ -115,7 +144,7 @@ def load_whole_file(self, num_threads: int): config = TransferConfig(**config_kwargs) data = BytesIO() - s3_client.download_fileobj( + self.s3_client.download_fileobj( Bucket=self.bucket_name, Key=self.bucket_path, Fileobj=data, @@ -124,80 +153,30 @@ def load_whole_file(self, num_threads: int): return data.getbuffer() def load_to_bytes(self, offset: int, count: int): - s3_client = _create_s3_client() - range_header = f"bytes={offset}-{offset+count-1}" - resp = s3_client.get_object( - Bucket=self.bucket_name, Key=self.bucket_path, Range=range_header - ) + resp = self.s3_client.get_object(Bucket=self.bucket_name, + Key=self.bucket_path, + Range=range_header) return read_to_bytes_io(resp.get("Body")) - -@dataclass -class StreamModel: - model_uri: str - num_threads: int = 16 - aws_access_key_id: Optional[str] = None - aws_secret_access_key: Optional[str] = None - aws_region: Optional[str] = None - aws_endpinit: Optional[str] = None - - def __post_init__(self): - scheme, bucket_name, bucket_path = _parse_bucket_info_from_uri(self.model_uri) - self.model_source_type = scheme - self.bucket_name = bucket_name - - # list config and safetensors files in model_uri - if self.model_source_type == "local": - local_dir = Path(bucket_path) - if not local_dir.exists(): - raise ValueError(f"local path {local_dir} not exist") - files = [file for file in local_dir.iterdir() if file.is_file()] - - self.config_files = [file for file in files if file.suffix == ".json"] - self.safetensors_files = [file for file in files if file.suffix == ".safetensors"] - else: - self.client = _create_s3_client(ak=self.aws_access_key_id, - sk=self.aws_secret_access_key, - endpoint=self.aws_endpinit, - region=self.aws_endpinit) - objects_out = self.client.list_objects_type2( - self.bucket_name, prefix=bucket_path, delimiter="/" - ) - files = [obj.key for obj in objects_out.contents] - - self.config_files = [file for file in files if file.endswith(".json")] - self.safetensors_files = [file for file in files if file.endswith(".safetensors")] - - if len(self.config_files) == 0: - raise ValueError(f"no config file found in {self.model_uri}") - if len(self.safetensors_files) == 0: - raise ValueError(f"no safetensors file found in {self.model_uri}") - - - def download_config(self, target_dir: str) -> Path: - if self.model_source_type == "local": - logger.info("local config no need to download") - return Path(self.model_uri) - + def download_file(self, target_dir: str, num_threads: int): + # ensure target dir exist target_path = Path(target_dir) target_path.mkdir(parents=True, exist_ok=True) - for config_file in self.config_files: - _file_name = config_file.split("/")[-1] - local_file = target_path.joinpath(_file_name).absolute() - config_kwargs = { - "max_concurrency": self.num_threads, - "use_threads": True, - } - config = TransferConfig(**config_kwargs) - self.client.download_file( - Bucket=self.bucket_name, - Key=config_file, - Filename=str( - local_file - ), # S3 client does not support Path, convert it to str - Config=config, - ) - return target_path - - \ No newline at end of file + + _file_name = self.bucket_path.split("/")[-1] + local_file = target_path.joinpath(_file_name).absolute() + config_kwargs = { + "max_concurrency": num_threads, + "use_threads": True, + } + config = TransferConfig(**config_kwargs) + self.s3_client.download_file( + Bucket=self.bucket_name, + Key=self.bucket_path, + Filename=str( + local_file + ), # S3 client does not support Path, convert it to str + Config=config, + ) + logger.info(f"download file from `{self.bucket_path}` to `{target_dir}` success.") diff --git a/vllm/model_executor/model_loader/stream/loader.py b/vllm/model_executor/model_loader/stream/loader.py index 120343bd34ce5..1b81019dfe7b9 100644 --- a/vllm/model_executor/model_loader/stream/loader.py +++ b/vllm/model_executor/model_loader/stream/loader.py @@ -13,16 +13,21 @@ # limitations under the License. import concurrent.futures +from dataclasses import dataclass import json +from pathlib import Path import queue import struct import threading -from typing import Generator, List, Tuple, Union +from typing import Generator, List, Tuple, Union, Optional import torch -from .file import LoadFile -from .utils import TensorMeta, split_continue_tensors +from .file import LoadFile, LocalFile, S3File +from .utils import TensorMeta, _create_s3_client, _parse_bucket_info_from_uri, split_continue_tensors +from vllm.logger import init_logger + +logger = init_logger(__name__) def get_safetensors_metas(file: LoadFile): @@ -38,18 +43,20 @@ def get_safetensors_metas(file: LoadFile): metas: List[TensorMeta] = [] for name, tensor_meta in tensors_meta.items(): - metas.append(TensorMeta( - name=name, - base_offset=base_offset, - dtype=tensor_meta["dtype"], - shape=tensor_meta["shape"], - data_offsets=tensor_meta["data_offsets"], - )) + metas.append( + TensorMeta( + name=name, + base_offset=base_offset, + dtype=tensor_meta["dtype"], + shape=tensor_meta["shape"], + data_offsets=tensor_meta["data_offsets"], + )) # Ensure tensors chunks could be split continuously return sorted(metas, key=lambda obj: obj.real_offset) class StreamLoader: + def __init__(self, file: LoadFile, num_thread: int = 32, @@ -65,23 +72,19 @@ def __init__(self, def load_safetensors(self, device: Union[torch.device, str] = "cpu"): return dict(self.get_weights_iterator(device=device)) - - def _tensors_reader(self, - thread_idx, - barrier, - device: Union[torch.device, str], - tensor_metas: Tuple[TensorMeta], - transfer_out_queue: queue.SimpleQueue[Union[Exception, TensorMeta]] - ) -> None: + def _tensors_reader( + self, thread_idx, barrier, device: Union[torch.device, str], + tensor_metas: Tuple[TensorMeta], + transfer_out_queue: queue.SimpleQueue[Union[Exception, TensorMeta]] + ) -> None: device = torch.device(device) is_cuda = device.type == "cuda" # TODO use stream nonblocking IO for tensor_meta in tensor_metas: - tensor_buffer = self.file.load_to_buffer(offset=tensor_meta.real_offset, count=tensor_meta.count) + tensor_buffer = self.file.load_to_buffer( + offset=tensor_meta.real_offset, count=tensor_meta.count) tensor = torch.frombuffer( - tensor_buffer, - dtype=tensor_meta.dtype - ).view(tensor_meta.shape) + tensor_buffer, dtype=tensor_meta.dtype).view(tensor_meta.shape) if is_cuda: tensor = tensor.to(device, non_blocking=True) tensor_meta.set_tensor(tensor) @@ -91,14 +94,16 @@ def get_weights_iterator( self, device: Union[torch.device, str] = "cpu" ) -> Generator[Tuple[str, torch.Tensor], None, None]: - tensors_per_reader: List[Tuple[TensorMeta]] = split_continue_tensors(self.tensors_metas, self.num_thread) + tensors_per_reader: List[Tuple[TensorMeta]] = split_continue_tensors( + self.tensors_metas, self.num_thread) effective_num_readers = len(tensors_per_reader) self._reader_pool = concurrent.futures.ThreadPoolExecutor( max_workers=effective_num_readers, thread_name_prefix="SafetensorsReader", ) - transfer_out_queue: queue.SimpleQueue[Union[Exception, TensorMeta]] = queue.SimpleQueue() # type: ignore + transfer_out_queue: queue.SimpleQueue[Union[ + Exception, TensorMeta]] = queue.SimpleQueue() # type: ignore futures: List[concurrent.futures.Future] = [] barrier = threading.Barrier(effective_num_readers) @@ -132,13 +137,99 @@ def get_weights_iterator_wo_threads( is_cuda = device.type == "cuda" # TODO use stream nonblocking IO for tensor_meta in self.tensors_metas: - tensor_buffer = self.file.load_to_bytes(offset=tensor_meta.real_offset, count=tensor_meta.count) + tensor_buffer = self.file.load_to_bytes( + offset=tensor_meta.real_offset, count=tensor_meta.count) tensor = torch.frombuffer( - tensor_buffer, - dtype=tensor_meta.dtype - ).view(tensor_meta.shape) + tensor_buffer, dtype=tensor_meta.dtype).view(tensor_meta.shape) if is_cuda: tensor = tensor.to(device, non_blocking=True) # tensor_meta.set_tensor(tensor) yield tensor_meta.name, tensor + + +@dataclass +class StreamModel: + model_uri: str + num_threads: int = 16 + s3_access_key_id: Optional[str] = None + s3_secret_access_key: Optional[str] = None + s3_region: Optional[str] = None + s3_endpinit: Optional[str] = None + + def __post_init__(self): + scheme, bucket_name, bucket_path = _parse_bucket_info_from_uri( + self.model_uri) + self.model_source_type = scheme + self.bucket_name = bucket_name + + # list config and safetensors files in model_uri + if self.model_source_type == "local": + local_dir = Path(self.model_uri) + if not local_dir.exists(): + raise ValueError(f"local path {local_dir} not exist") + files = [file for file in local_dir.iterdir() if file.is_file()] + + self.config_files = [ + file for file in files if file.suffix == ".json" + ] + self.safetensors_files = [ + file for file in files if file.suffix == ".safetensors" + ] + else: + self.s3_client = _create_s3_client(ak=self.s3_access_key_id, + sk=self.s3_secret_access_key, + endpoint=self.s3_endpinit, + region=self.s3_region, + num_threads=self.num_threads) + objects_out = self.s3_client.list_objects_v2( + Bucket=self.bucket_name, Delimiter="/", Prefix=bucket_path) + files = [ + content.get("Key") + for content in objects_out.get("Contents", []) + ] + self.config_files = [ + file for file in files if file.endswith(".json") + ] + self.safetensors_files = [ + file for file in files if file.endswith(".safetensors") + ] + + if len(self.config_files) == 0: + raise ValueError(f"no config file found in {self.model_uri}") + if len(self.safetensors_files) == 0: + raise ValueError(f"no safetensors file found in {self.model_uri}") + + def download_config(self, target_dir: str) -> Path: + if self.model_source_type == "local": + logger.info("local config no need to download") + return Path(self.model_uri) + + for config_file in self.config_files: + config_s3 = S3File(scheme=self.model_source_type, + bucket_name=self.bucket_name, + bucket_path=config_file, + s3_client=self.s3_client) + + config_s3.download_file(target_dir=target_dir, + num_threads=self.num_threads) + + target_path = Path(target_dir) + return target_path + + def get_weights_iterator(self, device: Union[torch.device, str] = "cpu"): + for safetensors_file in self.safetensors_files: + if self.model_source_type == "local": + safetensors_s3 = LocalFile(safetensors_file) + else: + safetensors_s3 = S3File(scheme=self.model_source_type, + bucket_name=self.bucket_name, + bucket_path=safetensors_file, + s3_client=self.s3_client) + safetensors_loader = StreamLoader( + file=safetensors_s3, + num_thread=self.num_threads, + ) + for name, tensor in safetensors_loader.get_weights_iterator( + device=device): + yield name, tensor diff --git a/vllm/model_executor/model_loader/stream/utils.py b/vllm/model_executor/model_loader/stream/utils.py index bd2408a5cc221..96ced8a5b87b5 100644 --- a/vllm/model_executor/model_loader/stream/utils.py +++ b/vllm/model_executor/model_loader/stream/utils.py @@ -9,6 +9,8 @@ from . import SUPPORTED_STREAM_STORAGE +DEFAULT_POOL_CONNECTIONS = 10 + def read_to_bytes_io(content, chunk_size=None): chunk_size = int(os.getenv("STREAM_READ_CHUNK_SIZE", 8388608)) # 8MB @@ -55,36 +57,42 @@ def _parse_bucket_info_from_uri(uri: str) -> Tuple[str, str, str]: # uri is local path when scheme is empty scheme = "local" if scheme == "" else scheme if scheme not in SUPPORTED_STREAM_STORAGE: - raise ValueError(f"{scheme} not supported, - only {SUPPORTED_STREAM_STORAGE} supported") + raise ValueError( + f"{scheme} not supported, only {SUPPORTED_STREAM_STORAGE} supported" + ) bucket_name = parsed.netloc bucket_path = parsed.path.lstrip("/") if scheme != "" else parsed.path return scheme, bucket_name, bucket_path -def _create_s3_client(ak, sk, endpoint, region): +def _create_s3_client(ak, sk, endpoint, region, num_threads=DEFAULT_POOL_CONNECTIONS): ak = ak or os.getenv("AWS_ACCESS_KEY_ID") sk = sk or os.getenv("AWS_SECRET_ACCESS_KEY") endpoint = endpoint or os.getenv("AWS_ENDPOINT_URL") region = region or os.getenv("AWS_REGION") + max_pool_connections = num_threads \ + if num_threads > DEFAULT_POOL_CONNECTIONS \ + else DEFAULT_POOL_CONNECTIONS + my_config = Config( # signature_version = 'v4', - s3={"addressing_style": "virtual"} - ) - return boto3.client( - service_name="s3", - region_name=region, - endpoint_url=endpoint, - aws_access_key_id=ak, - aws_secret_access_key=sk, - config=my_config + s3={"addressing_style": "virtual"}, + max_pool_connections=max_pool_connections, ) + return boto3.client(service_name="s3", + region_name=region, + endpoint_url=endpoint, + aws_access_key_id=ak, + aws_secret_access_key=sk, + config=my_config) class TensorMeta: - def __init__(self, name: str, base_offset: int, dtype: str, shape: List[int], data_offsets: List[int]) -> None: + + def __init__(self, name: str, base_offset: int, dtype: str, + shape: List[int], data_offsets: List[int]) -> None: self._name = name self._base_offset = base_offset self._dtype = get_dtype(dtype) @@ -124,20 +132,19 @@ def set_tensor(self, tensor: torch.Tensor) -> None: self._tensor = tensor def __str__(self) -> str: - return str( - { - "name": self._name, - "dtype": self._dtype, - "shape": self._shape, - "data_offsets": self._data_offsets, - } - ) + return str({ + "name": self._name, + "dtype": self._dtype, + "shape": self._shape, + "data_offsets": self._data_offsets, + }) def __repr__(self) -> str: return self.__str__() -def split_continue_tensors(tensor_metas: List[TensorMeta], num_readers:int) -> List[Tuple[TensorMeta]]: +def split_continue_tensors(tensor_metas: List[TensorMeta], + num_readers: int) -> List[Tuple[TensorMeta]]: """ Note: Usually, the number of groups for splitting tensors is greater than num_deaders. @@ -146,7 +153,7 @@ def split_continue_tensors(tensor_metas: List[TensorMeta], num_readers:int) -> L assert num_readers > 0, "num_readers should be greater than 0" if len(tensor_metas) <= num_readers: - return [(item,) for item in tensor_metas] + return [(item, ) for item in tensor_metas] max_offset = tensor_metas[-1].data_offsets[1] avg_size = max_offset // num_readers @@ -167,12 +174,13 @@ def split_continue_tensors(tensor_metas: List[TensorMeta], num_readers:int) -> L return groups -def split_continue_tensors_v1(tensor_metas: List[TensorMeta], num_readers:int) -> List[Tuple[TensorMeta]]: +def split_continue_tensors_v1(tensor_metas: List[TensorMeta], + num_readers: int) -> List[Tuple[TensorMeta]]: assert len(tensor_metas) > 0, "tensor_metas should not be empty" assert num_readers > 0, "num_readers should be greater than 0" if len(tensor_metas) <= num_readers: - return [(item,) for item in tensor_metas] + return [(item, ) for item in tensor_metas] max_offset = tensor_metas[-1].data_offsets[1] avg_size = max_offset // num_readers From b875e158c4f961137e12623d6cfb694421664993 Mon Sep 17 00:00:00 2001 From: brosoul Date: Wed, 6 Nov 2024 16:09:57 +0800 Subject: [PATCH 03/18] format --- .../model_loader/stream/file.py | 61 ++++---- .../model_loader/stream/loader.py | 148 ++++++++++-------- .../model_loader/stream/utils.py | 80 ++++++---- 3 files changed, 165 insertions(+), 124 deletions(-) diff --git a/vllm/model_executor/model_loader/stream/file.py b/vllm/model_executor/model_loader/stream/file.py index cd8c381aa0454..10c1980aadfb2 100644 --- a/vllm/model_executor/model_loader/stream/file.py +++ b/vllm/model_executor/model_loader/stream/file.py @@ -12,32 +12,30 @@ # See the License for the specific language governing permissions and # limitations under the License. -from dataclasses import dataclass from io import BytesIO from pathlib import Path -from typing import NoReturn, Optional +from typing import Optional, Union -import numpy as np import boto3 +import numpy as np from boto3.s3.transfer import TransferConfig +from vllm.logger import init_logger + from .utils import ( _create_s3_client, _parse_bucket_info_from_uri, read_to_bytes_io, ) -from vllm.logger import init_logger - logger = init_logger(__name__) class LoadFile: - def __init__(self, file_source: str) -> None: self.file_source = file_source - def load_whole_file(self, num_threads: int = 1) -> NoReturn: + def load_whole_file(self, num_threads: int = 1): raise NotImplementedError def load_to_bytes(self, offset: int, count: int) -> BytesIO: @@ -46,13 +44,12 @@ def load_to_bytes(self, offset: int, count: int) -> BytesIO: def load_to_buffer(self, offset: int, count: int) -> memoryview: raise NotImplementedError - def download(self, target_dir) -> NoReturn: + def download(self, target_dir): raise NotImplementedError class LocalFile(LoadFile): - - def __init__(self, file: str) -> None: + def __init__(self, file: Union[str, Path]) -> None: if not Path(file).exists(): raise ValueError(f"file {file} not exist") @@ -62,7 +59,8 @@ def __init__(self, file: str) -> None: def load_whole_file(self, num_threads: int = 1): if num_threads != 1: logger.warning( - f"num_threads {num_threads} is not supported for local file.") + "num_threads %s is not supported for local file.", num_threads + ) tensor_bytes = np.memmap( self.file, @@ -85,7 +83,6 @@ def load_to_buffer(self, offset: int, count: int): class RemoteFile(LoadFile): - def __init__(self, file: str, file_source: str) -> None: self.file = file super().__init__(file_source=file_source) @@ -94,12 +91,11 @@ def load_to_buffer(self, offset: int, count: int): tensor_bytes = self.load_to_bytes(offset=offset, count=count) return tensor_bytes.getbuffer() - def download_file(self, target_dir: str): + def download_file(self, target_dir: str, num_threads: int = 1): raise NotImplementedError class S3File(RemoteFile): - def __init__( self, scheme: str, @@ -115,28 +111,31 @@ def __init__( self.bucket_path = bucket_path if s3_client is None: try: - s3_client = _create_s3_client(ak=s3_access_key_id, - sk=s3_secret_access_key, - endpoint=s3_endpinit, - region=s3_region) + s3_client = _create_s3_client( + ak=s3_access_key_id, + sk=s3_secret_access_key, + endpoint=s3_endpinit, + region=s3_region, + ) except Exception as e: - raise ValueError(f"create s3 client failed for {e}.") + raise ValueError(f"create s3 client failed for {e}.") from e self.s3_client = s3_client try: self.s3_client.head_object(Bucket=bucket_name, Key=bucket_path) except Exception as e: - raise ValueError("S3 bucket path {bucket_path} not exist for {e}.") + raise ValueError( + f"S3 bucket path {bucket_path} not exist for {e}." + ) from e file = scheme + "://" + bucket_name + "/" + bucket_path super().__init__(file=file, file_source=scheme) @classmethod def from_uri(cls, file_uri: str, **kwargs): - scheme, bucket_name, bucket_path = _parse_bucket_info_from_uri( - file_uri) + scheme, bucket_name, bucket_path = _parse_bucket_info_from_uri(file_uri) cls(scheme, bucket_name, bucket_path, **kwargs) - def load_whole_file(self, num_threads: int): + def load_whole_file(self, num_threads: int = 1): config_kwargs = { "max_concurrency": num_threads, "use_threads": True, @@ -154,16 +153,16 @@ def load_whole_file(self, num_threads: int): def load_to_bytes(self, offset: int, count: int): range_header = f"bytes={offset}-{offset+count-1}" - resp = self.s3_client.get_object(Bucket=self.bucket_name, - Key=self.bucket_path, - Range=range_header) + resp = self.s3_client.get_object( + Bucket=self.bucket_name, Key=self.bucket_path, Range=range_header + ) return read_to_bytes_io(resp.get("Body")) - def download_file(self, target_dir: str, num_threads: int): + def download_file(self, target_dir: str, num_threads: int = 1): # ensure target dir exist target_path = Path(target_dir) target_path.mkdir(parents=True, exist_ok=True) - + _file_name = self.bucket_path.split("/")[-1] local_file = target_path.joinpath(_file_name).absolute() config_kwargs = { @@ -179,4 +178,8 @@ def download_file(self, target_dir: str, num_threads: int): ), # S3 client does not support Path, convert it to str Config=config, ) - logger.info(f"download file from `{self.bucket_path}` to `{target_dir}` success.") + logger.info( + "download file from `%s` to `%s` success.", + self.bucket_path, + target_dir, + ) diff --git a/vllm/model_executor/model_loader/stream/loader.py b/vllm/model_executor/model_loader/stream/loader.py index 1b81019dfe7b9..c8fced1390ee8 100644 --- a/vllm/model_executor/model_loader/stream/loader.py +++ b/vllm/model_executor/model_loader/stream/loader.py @@ -13,20 +13,26 @@ # limitations under the License. import concurrent.futures -from dataclasses import dataclass import json -from pathlib import Path import queue import struct import threading -from typing import Generator, List, Tuple, Union, Optional +from dataclasses import dataclass +from pathlib import Path +from typing import Generator, List, Optional, Tuple, Union import torch -from .file import LoadFile, LocalFile, S3File -from .utils import TensorMeta, _create_s3_client, _parse_bucket_info_from_uri, split_continue_tensors from vllm.logger import init_logger +from .file import LoadFile, LocalFile, S3File +from .utils import ( + TensorMeta, + _create_s3_client, + _parse_bucket_info_from_uri, + split_continue_tensors, +) + logger = init_logger(__name__) @@ -50,18 +56,20 @@ def get_safetensors_metas(file: LoadFile): dtype=tensor_meta["dtype"], shape=tensor_meta["shape"], data_offsets=tensor_meta["data_offsets"], - )) + ) + ) # Ensure tensors chunks could be split continuously return sorted(metas, key=lambda obj: obj.real_offset) class StreamLoader: - - def __init__(self, - file: LoadFile, - num_thread: int = 32, - use_pinmem: bool = False, - use_direct_io: bool = False) -> None: + def __init__( + self, + file: LoadFile, + num_thread: int = 32, + use_pinmem: bool = False, + use_direct_io: bool = False, + ) -> None: self.file = file self.num_thread = num_thread self.use_pinmem = use_pinmem @@ -73,37 +81,43 @@ def load_safetensors(self, device: Union[torch.device, str] = "cpu"): return dict(self.get_weights_iterator(device=device)) def _tensors_reader( - self, thread_idx, barrier, device: Union[torch.device, str], - tensor_metas: Tuple[TensorMeta], - transfer_out_queue: queue.SimpleQueue[Union[Exception, TensorMeta]] + self, + thread_idx, + barrier, + device: Union[torch.device, str], + tensor_metas: Tuple[TensorMeta, ...], + transfer_out_queue: queue.SimpleQueue[Union[Exception, TensorMeta]], ) -> None: device = torch.device(device) is_cuda = device.type == "cuda" # TODO use stream nonblocking IO for tensor_meta in tensor_metas: tensor_buffer = self.file.load_to_buffer( - offset=tensor_meta.real_offset, count=tensor_meta.count) + offset=tensor_meta.real_offset, count=tensor_meta.count + ) tensor = torch.frombuffer( - tensor_buffer, dtype=tensor_meta.dtype).view(tensor_meta.shape) + tensor_buffer, dtype=tensor_meta.dtype + ).view(tensor_meta.shape) if is_cuda: tensor = tensor.to(device, non_blocking=True) tensor_meta.set_tensor(tensor) transfer_out_queue.put(tensor_meta) def get_weights_iterator( - self, - device: Union[torch.device, str] = "cpu" + self, device: Union[torch.device, str] = "cpu" ) -> Generator[Tuple[str, torch.Tensor], None, None]: - tensors_per_reader: List[Tuple[TensorMeta]] = split_continue_tensors( - self.tensors_metas, self.num_thread) + tensors_per_reader: List[ + Tuple[TensorMeta, ...] + ] = split_continue_tensors(self.tensors_metas, self.num_thread) effective_num_readers = len(tensors_per_reader) self._reader_pool = concurrent.futures.ThreadPoolExecutor( max_workers=effective_num_readers, thread_name_prefix="SafetensorsReader", ) - transfer_out_queue: queue.SimpleQueue[Union[ - Exception, TensorMeta]] = queue.SimpleQueue() # type: ignore + transfer_out_queue: queue.SimpleQueue[ + Union[Exception, TensorMeta] + ] = queue.SimpleQueue() # type: ignore futures: List[concurrent.futures.Future] = [] barrier = threading.Barrier(effective_num_readers) @@ -121,7 +135,9 @@ def get_weights_iterator( try: for _ in range(len(self.tensors_metas)): - tensor_meta: TensorMeta = transfer_out_queue.get(timeout=3600) + tensor_meta: Union[ + TensorMeta, Exception + ] = transfer_out_queue.get(timeout=3600) if isinstance(tensor_meta, Exception): raise tensor_meta yield tensor_meta.name, tensor_meta.tensor @@ -129,18 +145,18 @@ def get_weights_iterator( raise def get_weights_iterator_wo_threads( - self, - device: Union[torch.device, str] = "cpu" + self, device: Union[torch.device, str] = "cpu" ) -> Generator[Tuple[str, torch.Tensor], None, None]: - device = torch.device(device) is_cuda = device.type == "cuda" # TODO use stream nonblocking IO for tensor_meta in self.tensors_metas: tensor_buffer = self.file.load_to_bytes( - offset=tensor_meta.real_offset, count=tensor_meta.count) + offset=tensor_meta.real_offset, count=tensor_meta.count + ) tensor = torch.frombuffer( - tensor_buffer, dtype=tensor_meta.dtype).view(tensor_meta.shape) + tensor_buffer, dtype=tensor_meta.dtype + ).view(tensor_meta.shape) if is_cuda: tensor = tensor.to(device, non_blocking=True) @@ -159,41 +175,40 @@ class StreamModel: def __post_init__(self): scheme, bucket_name, bucket_path = _parse_bucket_info_from_uri( - self.model_uri) + self.model_uri + ) self.model_source_type = scheme self.bucket_name = bucket_name # list config and safetensors files in model_uri + files: List[str] = [] if self.model_source_type == "local": local_dir = Path(self.model_uri) if not local_dir.exists(): raise ValueError(f"local path {local_dir} not exist") - files = [file for file in local_dir.iterdir() if file.is_file()] - - self.config_files = [ - file for file in files if file.suffix == ".json" - ] - self.safetensors_files = [ - file for file in files if file.suffix == ".safetensors" - ] + files = [str(file) for file in local_dir.iterdir() if file.is_file()] else: - self.s3_client = _create_s3_client(ak=self.s3_access_key_id, - sk=self.s3_secret_access_key, - endpoint=self.s3_endpinit, - region=self.s3_region, - num_threads=self.num_threads) + self.s3_client = _create_s3_client( + ak=self.s3_access_key_id, + sk=self.s3_secret_access_key, + endpoint=self.s3_endpinit, + region=self.s3_region, + num_threads=self.num_threads, + ) objects_out = self.s3_client.list_objects_v2( - Bucket=self.bucket_name, Delimiter="/", Prefix=bucket_path) + Bucket=self.bucket_name, Delimiter="/", Prefix=bucket_path + ) files = [ - content.get("Key") + str(content.get("Key")) for content in objects_out.get("Contents", []) ] - self.config_files = [ - file for file in files if file.endswith(".json") - ] - self.safetensors_files = [ - file for file in files if file.endswith(".safetensors") - ] + + self.config_files = [ + file for file in files if file.endswith(".json") + ] + self.safetensors_files = [ + file for file in files if file.endswith(".safetensors") + ] if len(self.config_files) == 0: raise ValueError(f"no config file found in {self.model_uri}") @@ -206,30 +221,37 @@ def download_config(self, target_dir: str) -> Path: return Path(self.model_uri) for config_file in self.config_files: - config_s3 = S3File(scheme=self.model_source_type, - bucket_name=self.bucket_name, - bucket_path=config_file, - s3_client=self.s3_client) + config_s3 = S3File( + scheme=self.model_source_type, + bucket_name=self.bucket_name, + bucket_path=config_file, + s3_client=self.s3_client, + ) - config_s3.download_file(target_dir=target_dir, - num_threads=self.num_threads) + config_s3.download_file( + target_dir=target_dir, num_threads=self.num_threads + ) target_path = Path(target_dir) return target_path def get_weights_iterator(self, device: Union[torch.device, str] = "cpu"): for safetensors_file in self.safetensors_files: + safetensors_s3: LoadFile if self.model_source_type == "local": - safetensors_s3 = LocalFile(safetensors_file) + safetensors_s3 = LocalFile(str(safetensors_file)) else: - safetensors_s3 = S3File(scheme=self.model_source_type, - bucket_name=self.bucket_name, - bucket_path=safetensors_file, - s3_client=self.s3_client) + safetensors_s3 = S3File( + scheme=self.model_source_type, + bucket_name=self.bucket_name, + bucket_path=safetensors_file, + s3_client=self.s3_client, + ) safetensors_loader = StreamLoader( file=safetensors_s3, num_thread=self.num_threads, ) for name, tensor in safetensors_loader.get_weights_iterator( - device=device): + device=device + ): yield name, tensor diff --git a/vllm/model_executor/model_loader/stream/utils.py b/vllm/model_executor/model_loader/stream/utils.py index 96ced8a5b87b5..01d77b6f7e398 100644 --- a/vllm/model_executor/model_loader/stream/utils.py +++ b/vllm/model_executor/model_loader/stream/utils.py @@ -28,7 +28,8 @@ def filter_suffix_files(files: List[str], suffix: str) -> List[str]: def get_dtype(dtype_str: str): - # torch.float8 formats require 2.1; we do not support these dtypes on earlier versions + # torch.float8 formats require 2.1; + # we do not support these dtypes on earlier versions _float8_e4m3fn = getattr(torch, "float8_e4m3fn", None) _float8_e5m2 = getattr(torch, "float8_e5m2", None) _TYPES = { @@ -66,33 +67,44 @@ def _parse_bucket_info_from_uri(uri: str) -> Tuple[str, str, str]: return scheme, bucket_name, bucket_path -def _create_s3_client(ak, sk, endpoint, region, num_threads=DEFAULT_POOL_CONNECTIONS): +def _create_s3_client( + ak, sk, endpoint, region, num_threads=DEFAULT_POOL_CONNECTIONS +): ak = ak or os.getenv("AWS_ACCESS_KEY_ID") sk = sk or os.getenv("AWS_SECRET_ACCESS_KEY") endpoint = endpoint or os.getenv("AWS_ENDPOINT_URL") region = region or os.getenv("AWS_REGION") - max_pool_connections = num_threads \ - if num_threads > DEFAULT_POOL_CONNECTIONS \ - else DEFAULT_POOL_CONNECTIONS + max_pool_connections = ( + num_threads + if num_threads > DEFAULT_POOL_CONNECTIONS + else DEFAULT_POOL_CONNECTIONS + ) my_config = Config( # signature_version = 'v4', s3={"addressing_style": "virtual"}, max_pool_connections=max_pool_connections, ) - return boto3.client(service_name="s3", - region_name=region, - endpoint_url=endpoint, - aws_access_key_id=ak, - aws_secret_access_key=sk, - config=my_config) + return boto3.client( + service_name="s3", + region_name=region, + endpoint_url=endpoint, + aws_access_key_id=ak, + aws_secret_access_key=sk, + config=my_config, + ) class TensorMeta: - - def __init__(self, name: str, base_offset: int, dtype: str, - shape: List[int], data_offsets: List[int]) -> None: + def __init__( + self, + name: str, + base_offset: int, + dtype: str, + shape: List[int], + data_offsets: List[int], + ) -> None: self._name = name self._base_offset = base_offset self._dtype = get_dtype(dtype) @@ -117,11 +129,11 @@ def data_offsets(self) -> List[int]: return self._data_offsets @property - def real_offset(self) -> List[int]: + def real_offset(self) -> int: return self._data_offsets[0] + self._base_offset @property - def count(self) -> List[int]: + def count(self) -> int: return self._data_offsets[1] - self._data_offsets[0] @property @@ -132,19 +144,22 @@ def set_tensor(self, tensor: torch.Tensor) -> None: self._tensor = tensor def __str__(self) -> str: - return str({ - "name": self._name, - "dtype": self._dtype, - "shape": self._shape, - "data_offsets": self._data_offsets, - }) + return str( + { + "name": self._name, + "dtype": self._dtype, + "shape": self._shape, + "data_offsets": self._data_offsets, + } + ) def __repr__(self) -> str: return self.__str__() -def split_continue_tensors(tensor_metas: List[TensorMeta], - num_readers: int) -> List[Tuple[TensorMeta]]: +def split_continue_tensors( + tensor_metas: List[TensorMeta], num_readers: int +) -> List[Tuple[TensorMeta, ...]]: """ Note: Usually, the number of groups for splitting tensors is greater than num_deaders. @@ -153,12 +168,12 @@ def split_continue_tensors(tensor_metas: List[TensorMeta], assert num_readers > 0, "num_readers should be greater than 0" if len(tensor_metas) <= num_readers: - return [(item, ) for item in tensor_metas] + return [(item,) for item in tensor_metas] max_offset = tensor_metas[-1].data_offsets[1] avg_size = max_offset // num_readers - group = [] - groups = [] + group: List[TensorMeta] = [] + groups: List[Tuple[TensorMeta, ...]] = [] group_size = 0 for tensor_meta in tensor_metas: if len(group) == 0 or group_size + tensor_meta.count <= avg_size: @@ -174,18 +189,19 @@ def split_continue_tensors(tensor_metas: List[TensorMeta], return groups -def split_continue_tensors_v1(tensor_metas: List[TensorMeta], - num_readers: int) -> List[Tuple[TensorMeta]]: +def split_continue_tensors_v1( + tensor_metas: List[TensorMeta], num_readers: int +) -> List[Tuple[TensorMeta, ...]]: assert len(tensor_metas) > 0, "tensor_metas should not be empty" assert num_readers > 0, "num_readers should be greater than 0" if len(tensor_metas) <= num_readers: - return [(item, ) for item in tensor_metas] + return [(item,) for item in tensor_metas] max_offset = tensor_metas[-1].data_offsets[1] avg_size = max_offset // num_readers - group = [] - groups = [] + group: List[TensorMeta] = [] + groups: List[Tuple[TensorMeta, ...]] = [] current_max_offset = avg_size for tensor_meta in tensor_metas: start, end = tensor_meta.data_offsets From e5e88bb9b8f62270c3635b7a3383d1981ea45ab9 Mon Sep 17 00:00:00 2001 From: brosoul Date: Thu, 7 Nov 2024 12:16:06 +0800 Subject: [PATCH 04/18] feat: vllm support stream loader --- vllm/config.py | 1 + vllm/engine/arg_utils.py | 16 ++++++- vllm/model_executor/model_loader/loader.py | 43 +++++++++++++++++ .../model_loader/stream/loader.py | 4 ++ .../model_loader/stream_loader.py | 48 +++++++++++++++++++ 5 files changed, 111 insertions(+), 1 deletion(-) create mode 100644 vllm/model_executor/model_loader/stream_loader.py diff --git a/vllm/config.py b/vllm/config.py index 7a15606836dcc..1876102dddb2d 100644 --- a/vllm/config.py +++ b/vllm/config.py @@ -722,6 +722,7 @@ class LoadFormat(str, enum.Enum): GGUF = "gguf" BITSANDBYTES = "bitsandbytes" MISTRAL = "mistral" + STREAM = "stream" @dataclass diff --git a/vllm/engine/arg_utils.py b/vllm/engine/arg_utils.py index 4139eca9c1832..e7df109afe734 100644 --- a/vllm/engine/arg_utils.py +++ b/vllm/engine/arg_utils.py @@ -16,6 +16,8 @@ from vllm.executor.executor_base import ExecutorBase from vllm.logger import init_logger from vllm.model_executor.layers.quantization import QUANTIZATION_METHODS +from vllm.model_executor.model_loader.stream.loader import StreamModel +from vllm.model_executor.model_loader.stream_loader import StreamConfig from vllm.transformers_utils.utils import check_gguf_file from vllm.utils import FlexibleArgumentParser @@ -258,7 +260,9 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser: 'CoreWeave. See the Tensorize vLLM Model script in the Examples ' 'section for more information.\n' '* "bitsandbytes" will load the weights using bitsandbytes ' - 'quantization.\n') + 'quantization.\n' + '* "stream" will load the weights using stream from remote storage,' + 'like S3 or TOS.\n') parser.add_argument( '--config-format', default=EngineArgs.config_format, @@ -796,6 +800,16 @@ def from_cli_args(cls, args: argparse.Namespace): return engine_args def create_model_config(self) -> ModelConfig: + if self.load_format == "stream": + # download config json to `download_dir` + # and replace `model` with `download_dir` + stream_config = StreamConfig(**self.model_loader_extra_config) + stream_model = stream_config.construct_stream_model() + config_dir = self.download_dir or "/tmp/stream_load/" + config_path = stream_model.download_config(config_dir) + + self.served_model_name = self.served_model_name or self.model + self.model = str(config_path) return ModelConfig( model=self.model, tokenizer=self.tokenizer, diff --git a/vllm/model_executor/model_loader/loader.py b/vllm/model_executor/model_loader/loader.py index f0d2a9e7f06be..bb343fde3018d 100644 --- a/vllm/model_executor/model_loader/loader.py +++ b/vllm/model_executor/model_loader/loader.py @@ -28,6 +28,8 @@ from vllm.logger import init_logger from vllm.model_executor.layers.quantization.base_config import ( QuantizationConfig) +from vllm.model_executor.model_loader.stream.loader import StreamModel +from vllm.model_executor.model_loader.stream_loader import StreamConfig from vllm.model_executor.model_loader.tensorizer import ( TensorizerConfig, is_vllm_tensorized, load_with_tensorizer, serialize_vllm_model, tensorizer_weights_iterator) @@ -1164,6 +1166,44 @@ def load_model(self, *, model_config: ModelConfig, return model +class StreamModelLoader(BaseModelLoader): + """Model loader using AiBrix's stream loader library.""" + + def __init__(self, load_config: LoadConfig): + super().__init__(load_config) + if isinstance(load_config.model_loader_extra_config, StreamConfig): + self.stream_loader_config = load_config.model_loader_extra_config + else: + self.stream_loader_config = StreamConfig( + **load_config.model_loader_extra_config) + + self.stream_model = self.stream_loader_config.construct_stream_model() + + def _verify_config(self, model_config: ModelConfig, + parallel_config: ParallelConfig): + self.stream_loader_config.verify_with_model_config(model_config) + self.stream_loader_config.verify_with_parallel_config(parallel_config) + + + def load_model(self, *, model_config: ModelConfig, + device_config: DeviceConfig, + lora_config: Optional[LoRAConfig], + parallel_config: ParallelConfig, + scheduler_config: SchedulerConfig, + cache_config: CacheConfig) -> nn.Module: + self._verify_config(model_config, parallel_config) + + with set_default_torch_dtype(model_config.dtype): + with torch.device(device_config.device): + model = _initialize_model(model_config, self.load_config, + lora_config, cache_config, + scheduler_config) + + model.load_weights(self.stream_model.get_weights_iterator(device_config.device)) + + return model.eval() + + def get_model_loader(load_config: LoadConfig) -> BaseModelLoader: """Get a model loader based on the load format.""" @@ -1185,4 +1225,7 @@ def get_model_loader(load_config: LoadConfig) -> BaseModelLoader: if load_config.load_format == LoadFormat.GGUF: return GGUFModelLoader(load_config) + if load_config.load_format == LoadFormat.STREAM: + return StreamModelLoader(load_config) + return DefaultModelLoader(load_config) diff --git a/vllm/model_executor/model_loader/stream/loader.py b/vllm/model_executor/model_loader/stream/loader.py index c8fced1390ee8..458028095e836 100644 --- a/vllm/model_executor/model_loader/stream/loader.py +++ b/vllm/model_executor/model_loader/stream/loader.py @@ -172,6 +172,8 @@ class StreamModel: s3_secret_access_key: Optional[str] = None s3_region: Optional[str] = None s3_endpinit: Optional[str] = None + use_pinmem: bool = False + use_direct_io: bool = False def __post_init__(self): scheme, bucket_name, bucket_path = _parse_bucket_info_from_uri( @@ -250,6 +252,8 @@ def get_weights_iterator(self, device: Union[torch.device, str] = "cpu"): safetensors_loader = StreamLoader( file=safetensors_s3, num_thread=self.num_threads, + use_pinmem=self.use_pinmem, + use_direct_io=self.use_direct_io, ) for name, tensor in safetensors_loader.get_weights_iterator( device=device diff --git a/vllm/model_executor/model_loader/stream_loader.py b/vllm/model_executor/model_loader/stream_loader.py new file mode 100644 index 0000000000000..402378804c53b --- /dev/null +++ b/vllm/model_executor/model_loader/stream_loader.py @@ -0,0 +1,48 @@ + + + +from dataclasses import dataclass +from typing import Optional + +from vllm.config import ModelConfig, ParallelConfig +from vllm.logger import init_logger +from vllm.model_executor.model_loader.stream.loader import StreamModel + +logger = init_logger(__name__) + +@dataclass +class StreamConfig: + model_uri: str + num_threads: int = 16 + s3_access_key_id: Optional[str] = None + s3_secret_access_key: Optional[str] = None + s3_region: Optional[str] = None + s3_endpinit: Optional[str] = None + use_pinmem: bool = False + use_direct_io: bool = False + + def verify_with_parallel_config( + self, + parallel_config: "ParallelConfig", + ) -> None: + if parallel_config.tensor_parallel_size > 1: + raise ValueError( + "steam loader does not support tensor parallelism yet") + + def verify_with_model_config(self, model_config: "ModelConfig") -> None: + if model_config.quantization is not None: + logger.warning( + "Loading a model using VeturboIO with quantization on vLLM" + " is unstable and may lead to errors.") + + def construct_stream_model(self) -> StreamModel: + return StreamModel( + model_uri=self.model_uri, + num_threads=self.num_threads, + s3_access_key_id=self.s3_access_key_id, + s3_secret_access_key=self.s3_secret_access_key, + s3_endpinit=self.s3_endpinit, + s3_region=self.s3_region, + use_pinmem=self.use_pinmem, + use_direct_io=self.use_direct_io + ) \ No newline at end of file From ffb6af1d7beb78f10d7d5fe0d636941d46c33c4a Mon Sep 17 00:00:00 2001 From: brosoul Date: Thu, 7 Nov 2024 16:30:47 +0800 Subject: [PATCH 05/18] fix: start up with stream loader --- vllm/engine/arg_utils.py | 13 +++- vllm/entrypoints/openai/api_server.py | 3 + vllm/model_executor/model_loader/loader.py | 4 +- .../model_loader/stream/__init__.py | 1 + .../model_loader/stream/file.py | 24 ++++++- .../model_loader/stream/loader.py | 6 +- .../model_loader/stream/utils.py | 71 ++++++++++++++++++- .../model_loader/stream_loader.py | 3 - 8 files changed, 112 insertions(+), 13 deletions(-) diff --git a/vllm/engine/arg_utils.py b/vllm/engine/arg_utils.py index e7df109afe734..5380ccc83d118 100644 --- a/vllm/engine/arg_utils.py +++ b/vllm/engine/arg_utils.py @@ -16,8 +16,6 @@ from vllm.executor.executor_base import ExecutorBase from vllm.logger import init_logger from vllm.model_executor.layers.quantization import QUANTIZATION_METHODS -from vllm.model_executor.model_loader.stream.loader import StreamModel -from vllm.model_executor.model_loader.stream_loader import StreamConfig from vllm.transformers_utils.utils import check_gguf_file from vllm.utils import FlexibleArgumentParser @@ -801,15 +799,24 @@ def from_cli_args(cls, args: argparse.Namespace): def create_model_config(self) -> ModelConfig: if self.load_format == "stream": + from vllm.model_executor.model_loader.stream_loader import StreamConfig # download config json to `download_dir` # and replace `model` with `download_dir` - stream_config = StreamConfig(**self.model_loader_extra_config) + model_loader_extra_config = self.model_loader_extra_config or {} + if isinstance(model_loader_extra_config, str): + model_loader_extra_config = json.loads(model_loader_extra_config) + + stream_config = StreamConfig(**model_loader_extra_config) stream_model = stream_config.construct_stream_model() config_dir = self.download_dir or "/tmp/stream_load/" config_path = stream_model.download_config(config_dir) + reset_tokenizer = self.model == self.tokenizer self.served_model_name = self.served_model_name or self.model self.model = str(config_path) + if reset_tokenizer: + self.tokenizer = self.model + return ModelConfig( model=self.model, tokenizer=self.tokenizer, diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index fd6f36e8768dd..3da0a79ff52a1 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -533,6 +533,9 @@ def signal_handler(*_) -> None: app = build_app(args) model_config = await engine_client.get_model_config() + logger.info(">>>>>>>>>>>>") + logger.info(model_config) + logger.info(">>>>>>>>>>>>") init_app_state(engine_client, model_config, app.state, args) temp_socket.close() diff --git a/vllm/model_executor/model_loader/loader.py b/vllm/model_executor/model_loader/loader.py index bb343fde3018d..a23b2a4edc777 100644 --- a/vllm/model_executor/model_loader/loader.py +++ b/vllm/model_executor/model_loader/loader.py @@ -28,7 +28,6 @@ from vllm.logger import init_logger from vllm.model_executor.layers.quantization.base_config import ( QuantizationConfig) -from vllm.model_executor.model_loader.stream.loader import StreamModel from vllm.model_executor.model_loader.stream_loader import StreamConfig from vllm.model_executor.model_loader.tensorizer import ( TensorizerConfig, is_vllm_tensorized, load_with_tensorizer, @@ -1203,6 +1202,9 @@ def load_model(self, *, model_config: ModelConfig, return model.eval() + def download_model(self, model_config: ModelConfig) -> None: + pass # Nothing to download + def get_model_loader(load_config: LoadConfig) -> BaseModelLoader: """Get a model loader based on the load format.""" diff --git a/vllm/model_executor/model_loader/stream/__init__.py b/vllm/model_executor/model_loader/stream/__init__.py index 0320b75ba54cc..fed0dc6eaefdc 100644 --- a/vllm/model_executor/model_loader/stream/__init__.py +++ b/vllm/model_executor/model_loader/stream/__init__.py @@ -14,3 +14,4 @@ SUPPORTED_STREAM_STORAGE = ("s3", "tos", "local") +DOWNLOAD_CACHE_DIR = ".cache" diff --git a/vllm/model_executor/model_loader/stream/file.py b/vllm/model_executor/model_loader/stream/file.py index 10c1980aadfb2..89217d2bc746e 100644 --- a/vllm/model_executor/model_loader/stream/file.py +++ b/vllm/model_executor/model_loader/stream/file.py @@ -25,7 +25,10 @@ from .utils import ( _create_s3_client, _parse_bucket_info_from_uri, + meta_file, + need_to_download, read_to_bytes_io, + save_meta_data, ) logger = init_logger(__name__) @@ -158,13 +161,29 @@ def load_to_bytes(self, offset: int, count: int): ) return read_to_bytes_io(resp.get("Body")) - def download_file(self, target_dir: str, num_threads: int = 1): + def download_file(self, target_dir: str, num_threads: int = 1, force_download: bool = False): + try: + meta_data = self.s3_client.head_object(Bucket=self.bucket_name, + Key=self.bucket_path) + except Exception as e: + raise ValueError("S3 bucket path %s not exist for %s.", self.bucket_path, e) + # ensure target dir exist target_path = Path(target_dir) target_path.mkdir(parents=True, exist_ok=True) - + _file_name = self.bucket_path.split("/")[-1] local_file = target_path.joinpath(_file_name).absolute() + + # check if file exist + etag = meta_data.get("ETag", "") + file_size = meta_data.get("ContentLength", 0) + + meta_data_file = meta_file(local_path=target_path, file_name=_file_name) + if not need_to_download(local_file, meta_data_file, file_size, etag, force_download): + logger.info("file `%s` already exist.", self.bucket_path) + return + config_kwargs = { "max_concurrency": num_threads, "use_threads": True, @@ -178,6 +197,7 @@ def download_file(self, target_dir: str, num_threads: int = 1): ), # S3 client does not support Path, convert it to str Config=config, ) + save_meta_data(meta_data_file, etag) logger.info( "download file from `%s` to `%s` success.", self.bucket_path, diff --git a/vllm/model_executor/model_loader/stream/loader.py b/vllm/model_executor/model_loader/stream/loader.py index 458028095e836..c47df9283cd04 100644 --- a/vllm/model_executor/model_loader/stream/loader.py +++ b/vllm/model_executor/model_loader/stream/loader.py @@ -217,7 +217,7 @@ def __post_init__(self): if len(self.safetensors_files) == 0: raise ValueError(f"no safetensors file found in {self.model_uri}") - def download_config(self, target_dir: str) -> Path: + def download_config(self, target_dir: str, force_download: bool = False) -> Path: if self.model_source_type == "local": logger.info("local config no need to download") return Path(self.model_uri) @@ -231,7 +231,9 @@ def download_config(self, target_dir: str) -> Path: ) config_s3.download_file( - target_dir=target_dir, num_threads=self.num_threads + target_dir=target_dir, + num_threads=self.num_threads, + force_download=force_download ) target_path = Path(target_dir) diff --git a/vllm/model_executor/model_loader/stream/utils.py b/vllm/model_executor/model_loader/stream/utils.py index 01d77b6f7e398..c7c87fa4b694b 100644 --- a/vllm/model_executor/model_loader/stream/utils.py +++ b/vllm/model_executor/model_loader/stream/utils.py @@ -1,17 +1,84 @@ import os from io import BytesIO -from typing import List, Optional, Tuple +from pathlib import Path +from typing import List, Optional, Tuple, Union from urllib.parse import urlparse import boto3 import torch from botocore.config import Config -from . import SUPPORTED_STREAM_STORAGE +from . import SUPPORTED_STREAM_STORAGE, DOWNLOAD_CACHE_DIR DEFAULT_POOL_CONNECTIONS = 10 +def meta_file(local_path: Union[Path, str], file_name: str) -> Path: + return ( + Path(local_path) + .joinpath(DOWNLOAD_CACHE_DIR) + .joinpath(f"{file_name}.metadata") + .absolute() + ) + + +def save_meta_data(file_path: Union[Path, str], etag: str): + Path(file_path).parent.mkdir(parents=True, exist_ok=True) + with open(file_path, "w") as f: + f.write(etag) + + +def load_meta_data(file_path: Union[Path, str]): + if Path(file_path).exists(): + with open(file_path, "r") as f: + return f.read() + return None + + +def check_file_exist( + local_file: Union[Path, str], + meta_file: Union[Path, str], + expected_file_size: int, + expected_etag: str, +) -> bool: + if expected_file_size is None or expected_file_size <= 0: + return False + + if expected_etag is None or expected_etag == "": + return False + + if not Path(local_file).exists(): + return False + + file_size = os.path.getsize(local_file) + if file_size != expected_file_size: + return False + + if not Path(meta_file).exists(): + return False + + etag = load_meta_data(meta_file) + return etag == expected_etag + + +def need_to_download( + local_file: Union[Path, str], + meta_data_file: Union[Path, str], + expected_file_size: int, + expected_etag: str, + force_download: bool, +) -> bool: + _file_name = Path(local_file).name + if force_download: + return True + + if check_file_exist( + local_file, meta_data_file, expected_file_size, expected_etag + ): + return False + return True + + def read_to_bytes_io(content, chunk_size=None): chunk_size = int(os.getenv("STREAM_READ_CHUNK_SIZE", 8388608)) # 8MB bytes_io = BytesIO() diff --git a/vllm/model_executor/model_loader/stream_loader.py b/vllm/model_executor/model_loader/stream_loader.py index 402378804c53b..989e631affa78 100644 --- a/vllm/model_executor/model_loader/stream_loader.py +++ b/vllm/model_executor/model_loader/stream_loader.py @@ -1,6 +1,3 @@ - - - from dataclasses import dataclass from typing import Optional From 495fa65531089a8269e48e8f70e02ab3cf84003c Mon Sep 17 00:00:00 2001 From: brosoul Date: Thu, 7 Nov 2024 16:36:14 +0800 Subject: [PATCH 06/18] format --- vllm/engine/arg_utils.py | 8 +++- .../model_loader/stream/file.py | 34 +++++++++------- .../model_loader/stream/loader.py | 40 +++++++++---------- .../model_loader/stream/utils.py | 10 ++--- 4 files changed, 48 insertions(+), 44 deletions(-) diff --git a/vllm/engine/arg_utils.py b/vllm/engine/arg_utils.py index 5380ccc83d118..ac05bd730ec27 100644 --- a/vllm/engine/arg_utils.py +++ b/vllm/engine/arg_utils.py @@ -799,12 +799,16 @@ def from_cli_args(cls, args: argparse.Namespace): def create_model_config(self) -> ModelConfig: if self.load_format == "stream": - from vllm.model_executor.model_loader.stream_loader import StreamConfig + from vllm.model_executor.model_loader.stream_loader import ( + StreamConfig) + # download config json to `download_dir` # and replace `model` with `download_dir` model_loader_extra_config = self.model_loader_extra_config or {} if isinstance(model_loader_extra_config, str): - model_loader_extra_config = json.loads(model_loader_extra_config) + model_loader_extra_config = json.loads( + model_loader_extra_config + ) stream_config = StreamConfig(**model_loader_extra_config) stream_model = stream_config.construct_stream_model() diff --git a/vllm/model_executor/model_loader/stream/file.py b/vllm/model_executor/model_loader/stream/file.py index 89217d2bc746e..738e645c6398e 100644 --- a/vllm/model_executor/model_loader/stream/file.py +++ b/vllm/model_executor/model_loader/stream/file.py @@ -22,14 +22,8 @@ from vllm.logger import init_logger -from .utils import ( - _create_s3_client, - _parse_bucket_info_from_uri, - meta_file, - need_to_download, - read_to_bytes_io, - save_meta_data, -) +from .utils import (_create_s3_client, _parse_bucket_info_from_uri, meta_file, + need_to_download, read_to_bytes_io, save_meta_data) logger = init_logger(__name__) @@ -161,26 +155,36 @@ def load_to_bytes(self, offset: int, count: int): ) return read_to_bytes_io(resp.get("Body")) - def download_file(self, target_dir: str, num_threads: int = 1, force_download: bool = False): + def download_file( + self, + target_dir: str, + num_threads: int = 1, + force_download: bool = False, + ): try: - meta_data = self.s3_client.head_object(Bucket=self.bucket_name, - Key=self.bucket_path) + meta_data = self.s3_client.head_object( + Bucket=self.bucket_name, Key=self.bucket_path + ) except Exception as e: - raise ValueError("S3 bucket path %s not exist for %s.", self.bucket_path, e) + raise ValueError( + "S3 bucket path %s not exist for %s.", self.bucket_path, e + ) from e # ensure target dir exist target_path = Path(target_dir) target_path.mkdir(parents=True, exist_ok=True) - + _file_name = self.bucket_path.split("/")[-1] local_file = target_path.joinpath(_file_name).absolute() # check if file exist etag = meta_data.get("ETag", "") file_size = meta_data.get("ContentLength", 0) - + meta_data_file = meta_file(local_path=target_path, file_name=_file_name) - if not need_to_download(local_file, meta_data_file, file_size, etag, force_download): + if not need_to_download( + local_file, meta_data_file, file_size, etag, force_download + ): logger.info("file `%s` already exist.", self.bucket_path) return diff --git a/vllm/model_executor/model_loader/stream/loader.py b/vllm/model_executor/model_loader/stream/loader.py index c47df9283cd04..88761c2dadc87 100644 --- a/vllm/model_executor/model_loader/stream/loader.py +++ b/vllm/model_executor/model_loader/stream/loader.py @@ -26,12 +26,8 @@ from vllm.logger import init_logger from .file import LoadFile, LocalFile, S3File -from .utils import ( - TensorMeta, - _create_s3_client, - _parse_bucket_info_from_uri, - split_continue_tensors, -) +from .utils import (TensorMeta, _create_s3_client, _parse_bucket_info_from_uri, + split_continue_tensors) logger = init_logger(__name__) @@ -106,18 +102,18 @@ def _tensors_reader( def get_weights_iterator( self, device: Union[torch.device, str] = "cpu" ) -> Generator[Tuple[str, torch.Tensor], None, None]: - tensors_per_reader: List[ - Tuple[TensorMeta, ...] - ] = split_continue_tensors(self.tensors_metas, self.num_thread) + tensors_per_reader: List[Tuple[TensorMeta, ...]] = ( + split_continue_tensors(self.tensors_metas, self.num_thread) + ) effective_num_readers = len(tensors_per_reader) self._reader_pool = concurrent.futures.ThreadPoolExecutor( max_workers=effective_num_readers, thread_name_prefix="SafetensorsReader", ) - transfer_out_queue: queue.SimpleQueue[ - Union[Exception, TensorMeta] - ] = queue.SimpleQueue() # type: ignore + transfer_out_queue: queue.SimpleQueue[Union[Exception, TensorMeta]] = ( + queue.SimpleQueue() + ) # type: ignore futures: List[concurrent.futures.Future] = [] barrier = threading.Barrier(effective_num_readers) @@ -135,9 +131,9 @@ def get_weights_iterator( try: for _ in range(len(self.tensors_metas)): - tensor_meta: Union[ - TensorMeta, Exception - ] = transfer_out_queue.get(timeout=3600) + tensor_meta: Union[TensorMeta, Exception] = ( + transfer_out_queue.get(timeout=3600) + ) if isinstance(tensor_meta, Exception): raise tensor_meta yield tensor_meta.name, tensor_meta.tensor @@ -188,7 +184,9 @@ def __post_init__(self): local_dir = Path(self.model_uri) if not local_dir.exists(): raise ValueError(f"local path {local_dir} not exist") - files = [str(file) for file in local_dir.iterdir() if file.is_file()] + files = [ + str(file) for file in local_dir.iterdir() if file.is_file() + ] else: self.s3_client = _create_s3_client( ak=self.s3_access_key_id, @@ -205,9 +203,7 @@ def __post_init__(self): for content in objects_out.get("Contents", []) ] - self.config_files = [ - file for file in files if file.endswith(".json") - ] + self.config_files = [file for file in files if file.endswith(".json")] self.safetensors_files = [ file for file in files if file.endswith(".safetensors") ] @@ -217,7 +213,9 @@ def __post_init__(self): if len(self.safetensors_files) == 0: raise ValueError(f"no safetensors file found in {self.model_uri}") - def download_config(self, target_dir: str, force_download: bool = False) -> Path: + def download_config( + self, target_dir: str, force_download: bool = False + ) -> Path: if self.model_source_type == "local": logger.info("local config no need to download") return Path(self.model_uri) @@ -233,7 +231,7 @@ def download_config(self, target_dir: str, force_download: bool = False) -> Path config_s3.download_file( target_dir=target_dir, num_threads=self.num_threads, - force_download=force_download + force_download=force_download, ) target_path = Path(target_dir) diff --git a/vllm/model_executor/model_loader/stream/utils.py b/vllm/model_executor/model_loader/stream/utils.py index c7c87fa4b694b..0b3e8d7821951 100644 --- a/vllm/model_executor/model_loader/stream/utils.py +++ b/vllm/model_executor/model_loader/stream/utils.py @@ -8,7 +8,7 @@ import torch from botocore.config import Config -from . import SUPPORTED_STREAM_STORAGE, DOWNLOAD_CACHE_DIR +from . import DOWNLOAD_CACHE_DIR, SUPPORTED_STREAM_STORAGE DEFAULT_POOL_CONNECTIONS = 10 @@ -72,11 +72,9 @@ def need_to_download( if force_download: return True - if check_file_exist( + return not check_file_exist( local_file, meta_data_file, expected_file_size, expected_etag - ): - return False - return True + ) def read_to_bytes_io(content, chunk_size=None): @@ -95,7 +93,7 @@ def filter_suffix_files(files: List[str], suffix: str) -> List[str]: def get_dtype(dtype_str: str): - # torch.float8 formats require 2.1; + # torch.float8 formats require 2.1; # we do not support these dtypes on earlier versions _float8_e4m3fn = getattr(torch, "float8_e4m3fn", None) _float8_e5m2 = getattr(torch, "float8_e5m2", None) From 38cee460002d05fedca0355574a9a401e955dbbc Mon Sep 17 00:00:00 2001 From: brosoul Date: Thu, 7 Nov 2024 19:13:30 +0800 Subject: [PATCH 07/18] style: add model load time log --- vllm/worker/model_runner.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/vllm/worker/model_runner.py b/vllm/worker/model_runner.py index e8c472df8b5fc..d278eb4e84242 100644 --- a/vllm/worker/model_runner.py +++ b/vllm/worker/model_runner.py @@ -1012,6 +1012,7 @@ def __init__( def load_model(self) -> None: logger.info("Starting to load model %s...", self.model_config.model) + start_time = time.perf_counter() with CudaMemoryProfiler() as m: self.model = get_model(model_config=self.model_config, device_config=self.device_config, @@ -1020,10 +1021,11 @@ def load_model(self) -> None: parallel_config=self.parallel_config, scheduler_config=self.scheduler_config, cache_config=self.cache_config) - + end_time = time.perf_counter() self.model_memory_usage = m.consumed_memory - logger.info("Loading model weights took %.4f GB", - self.model_memory_usage / float(2**30)) + logger.info("Loading model weights took %.4f GB, cost %.2f s", + self.model_memory_usage / float(2**30), + end_time - start_time) if self.lora_config: assert supports_lora(self.model), "Model does not support LoRA" From 12ed3a57563a05a1ce047b1f64fe28c85e88341a Mon Sep 17 00:00:00 2001 From: brosoul Date: Thu, 7 Nov 2024 22:15:58 +0800 Subject: [PATCH 08/18] fix: metadata file dir --- vllm/engine/arg_utils.py | 10 ++++++---- vllm/model_executor/model_loader/stream/loader.py | 2 ++ 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/vllm/engine/arg_utils.py b/vllm/engine/arg_utils.py index ac05bd730ec27..92bf0e016e03f 100644 --- a/vllm/engine/arg_utils.py +++ b/vllm/engine/arg_utils.py @@ -2,6 +2,7 @@ import dataclasses import json from dataclasses import dataclass +from pathlib import Path from typing import (TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple, Type, Union) @@ -807,14 +808,15 @@ def create_model_config(self) -> ModelConfig: model_loader_extra_config = self.model_loader_extra_config or {} if isinstance(model_loader_extra_config, str): model_loader_extra_config = json.loads( - model_loader_extra_config - ) + model_loader_extra_config) stream_config = StreamConfig(**model_loader_extra_config) stream_model = stream_config.construct_stream_model() - config_dir = self.download_dir or "/tmp/stream_load/" + + config_dir = self.download_dir or str( + Path("/tmp/stream_load/").joinpath(self.model)) config_path = stream_model.download_config(config_dir) - + reset_tokenizer = self.model == self.tokenizer self.served_model_name = self.served_model_name or self.model self.model = str(config_path) diff --git a/vllm/model_executor/model_loader/stream/loader.py b/vllm/model_executor/model_loader/stream/loader.py index 88761c2dadc87..c5c2ee55e6732 100644 --- a/vllm/model_executor/model_loader/stream/loader.py +++ b/vllm/model_executor/model_loader/stream/loader.py @@ -175,6 +175,8 @@ def __post_init__(self): scheme, bucket_name, bucket_path = _parse_bucket_info_from_uri( self.model_uri ) + if not bucket_path.endswith("/"): + bucket_path += "/" self.model_source_type = scheme self.bucket_name = bucket_name From 3854e1486e302d458f6d656d6caff1cc63fbdb94 Mon Sep 17 00:00:00 2001 From: brosoul Date: Thu, 7 Nov 2024 22:24:09 +0800 Subject: [PATCH 09/18] style: format --- vllm/model_executor/model_loader/loader.py | 10 ++-- .../model_loader/stream/__init__.py | 1 - .../model_loader/stream/file.py | 39 ++++++------- .../model_loader/stream/loader.py | 53 ++++++++--------- .../model_loader/stream/utils.py | 57 ++++++++----------- .../model_loader/stream_loader.py | 21 ++++--- 6 files changed, 84 insertions(+), 97 deletions(-) diff --git a/vllm/model_executor/model_loader/loader.py b/vllm/model_executor/model_loader/loader.py index a23b2a4edc777..e7fad39e926f4 100644 --- a/vllm/model_executor/model_loader/loader.py +++ b/vllm/model_executor/model_loader/loader.py @@ -1175,7 +1175,7 @@ def __init__(self, load_config: LoadConfig): else: self.stream_loader_config = StreamConfig( **load_config.model_loader_extra_config) - + self.stream_model = self.stream_loader_config.construct_stream_model() def _verify_config(self, model_config: ModelConfig, @@ -1183,7 +1183,6 @@ def _verify_config(self, model_config: ModelConfig, self.stream_loader_config.verify_with_model_config(model_config) self.stream_loader_config.verify_with_parallel_config(parallel_config) - def load_model(self, *, model_config: ModelConfig, device_config: DeviceConfig, lora_config: Optional[LoRAConfig], @@ -1197,9 +1196,10 @@ def load_model(self, *, model_config: ModelConfig, model = _initialize_model(model_config, self.load_config, lora_config, cache_config, scheduler_config) - - model.load_weights(self.stream_model.get_weights_iterator(device_config.device)) - + + model.load_weights( + self.stream_model.get_weights_iterator(device_config.device)) + return model.eval() def download_model(self, model_config: ModelConfig) -> None: diff --git a/vllm/model_executor/model_loader/stream/__init__.py b/vllm/model_executor/model_loader/stream/__init__.py index fed0dc6eaefdc..8d849ddd52fcb 100644 --- a/vllm/model_executor/model_loader/stream/__init__.py +++ b/vllm/model_executor/model_loader/stream/__init__.py @@ -12,6 +12,5 @@ # See the License for the specific language governing permissions and # limitations under the License. - SUPPORTED_STREAM_STORAGE = ("s3", "tos", "local") DOWNLOAD_CACHE_DIR = ".cache" diff --git a/vllm/model_executor/model_loader/stream/file.py b/vllm/model_executor/model_loader/stream/file.py index 738e645c6398e..584acf36e98d6 100644 --- a/vllm/model_executor/model_loader/stream/file.py +++ b/vllm/model_executor/model_loader/stream/file.py @@ -29,6 +29,7 @@ class LoadFile: + def __init__(self, file_source: str) -> None: self.file_source = file_source @@ -46,6 +47,7 @@ def download(self, target_dir): class LocalFile(LoadFile): + def __init__(self, file: Union[str, Path]) -> None: if not Path(file).exists(): raise ValueError(f"file {file} not exist") @@ -55,9 +57,8 @@ def __init__(self, file: Union[str, Path]) -> None: def load_whole_file(self, num_threads: int = 1): if num_threads != 1: - logger.warning( - "num_threads %s is not supported for local file.", num_threads - ) + logger.warning("num_threads %s is not supported for local file.", + num_threads) tensor_bytes = np.memmap( self.file, @@ -80,6 +81,7 @@ def load_to_buffer(self, offset: int, count: int): class RemoteFile(LoadFile): + def __init__(self, file: str, file_source: str) -> None: self.file = file super().__init__(file_source=file_source) @@ -93,6 +95,7 @@ def download_file(self, target_dir: str, num_threads: int = 1): class S3File(RemoteFile): + def __init__( self, scheme: str, @@ -121,15 +124,15 @@ def __init__( self.s3_client.head_object(Bucket=bucket_name, Key=bucket_path) except Exception as e: raise ValueError( - f"S3 bucket path {bucket_path} not exist for {e}." - ) from e + f"S3 bucket path {bucket_path} not exist for {e}.") from e file = scheme + "://" + bucket_name + "/" + bucket_path super().__init__(file=file, file_source=scheme) @classmethod def from_uri(cls, file_uri: str, **kwargs): - scheme, bucket_name, bucket_path = _parse_bucket_info_from_uri(file_uri) + scheme, bucket_name, bucket_path = _parse_bucket_info_from_uri( + file_uri) cls(scheme, bucket_name, bucket_path, **kwargs) def load_whole_file(self, num_threads: int = 1): @@ -150,9 +153,9 @@ def load_whole_file(self, num_threads: int = 1): def load_to_bytes(self, offset: int, count: int): range_header = f"bytes={offset}-{offset+count-1}" - resp = self.s3_client.get_object( - Bucket=self.bucket_name, Key=self.bucket_path, Range=range_header - ) + resp = self.s3_client.get_object(Bucket=self.bucket_name, + Key=self.bucket_path, + Range=range_header) return read_to_bytes_io(resp.get("Body")) def download_file( @@ -162,13 +165,11 @@ def download_file( force_download: bool = False, ): try: - meta_data = self.s3_client.head_object( - Bucket=self.bucket_name, Key=self.bucket_path - ) + meta_data = self.s3_client.head_object(Bucket=self.bucket_name, + Key=self.bucket_path) except Exception as e: - raise ValueError( - "S3 bucket path %s not exist for %s.", self.bucket_path, e - ) from e + raise ValueError("S3 bucket path %s not exist for %s.", + self.bucket_path, e) from e # ensure target dir exist target_path = Path(target_dir) @@ -181,10 +182,10 @@ def download_file( etag = meta_data.get("ETag", "") file_size = meta_data.get("ContentLength", 0) - meta_data_file = meta_file(local_path=target_path, file_name=_file_name) - if not need_to_download( - local_file, meta_data_file, file_size, etag, force_download - ): + meta_data_file = meta_file(local_path=target_path, + file_name=_file_name) + if not need_to_download(local_file, meta_data_file, file_size, etag, + force_download): logger.info("file `%s` already exist.", self.bucket_path) return diff --git a/vllm/model_executor/model_loader/stream/loader.py b/vllm/model_executor/model_loader/stream/loader.py index c5c2ee55e6732..90d052bb8499b 100644 --- a/vllm/model_executor/model_loader/stream/loader.py +++ b/vllm/model_executor/model_loader/stream/loader.py @@ -52,13 +52,13 @@ def get_safetensors_metas(file: LoadFile): dtype=tensor_meta["dtype"], shape=tensor_meta["shape"], data_offsets=tensor_meta["data_offsets"], - ) - ) + )) # Ensure tensors chunks could be split continuously return sorted(metas, key=lambda obj: obj.real_offset) class StreamLoader: + def __init__( self, file: LoadFile, @@ -89,22 +89,22 @@ def _tensors_reader( # TODO use stream nonblocking IO for tensor_meta in tensor_metas: tensor_buffer = self.file.load_to_buffer( - offset=tensor_meta.real_offset, count=tensor_meta.count - ) + offset=tensor_meta.real_offset, count=tensor_meta.count) tensor = torch.frombuffer( - tensor_buffer, dtype=tensor_meta.dtype - ).view(tensor_meta.shape) + tensor_buffer, dtype=tensor_meta.dtype).view(tensor_meta.shape) if is_cuda: tensor = tensor.to(device, non_blocking=True) tensor_meta.set_tensor(tensor) transfer_out_queue.put(tensor_meta) def get_weights_iterator( - self, device: Union[torch.device, str] = "cpu" + self, + device: Union[torch.device, str] = "cpu" ) -> Generator[Tuple[str, torch.Tensor], None, None]: - tensors_per_reader: List[Tuple[TensorMeta, ...]] = ( - split_continue_tensors(self.tensors_metas, self.num_thread) - ) + tensors_per_reader: List[Tuple[TensorMeta, + ...]] = (split_continue_tensors( + self.tensors_metas, + self.num_thread)) effective_num_readers = len(tensors_per_reader) self._reader_pool = concurrent.futures.ThreadPoolExecutor( @@ -112,8 +112,7 @@ def get_weights_iterator( thread_name_prefix="SafetensorsReader", ) transfer_out_queue: queue.SimpleQueue[Union[Exception, TensorMeta]] = ( - queue.SimpleQueue() - ) # type: ignore + queue.SimpleQueue()) # type: ignore futures: List[concurrent.futures.Future] = [] barrier = threading.Barrier(effective_num_readers) @@ -131,9 +130,9 @@ def get_weights_iterator( try: for _ in range(len(self.tensors_metas)): - tensor_meta: Union[TensorMeta, Exception] = ( - transfer_out_queue.get(timeout=3600) - ) + tensor_meta: Union[TensorMeta, + Exception] = (transfer_out_queue.get( + timeout=3600)) if isinstance(tensor_meta, Exception): raise tensor_meta yield tensor_meta.name, tensor_meta.tensor @@ -141,18 +140,17 @@ def get_weights_iterator( raise def get_weights_iterator_wo_threads( - self, device: Union[torch.device, str] = "cpu" + self, + device: Union[torch.device, str] = "cpu" ) -> Generator[Tuple[str, torch.Tensor], None, None]: device = torch.device(device) is_cuda = device.type == "cuda" # TODO use stream nonblocking IO for tensor_meta in self.tensors_metas: tensor_buffer = self.file.load_to_bytes( - offset=tensor_meta.real_offset, count=tensor_meta.count - ) + offset=tensor_meta.real_offset, count=tensor_meta.count) tensor = torch.frombuffer( - tensor_buffer, dtype=tensor_meta.dtype - ).view(tensor_meta.shape) + tensor_buffer, dtype=tensor_meta.dtype).view(tensor_meta.shape) if is_cuda: tensor = tensor.to(device, non_blocking=True) @@ -173,8 +171,7 @@ class StreamModel: def __post_init__(self): scheme, bucket_name, bucket_path = _parse_bucket_info_from_uri( - self.model_uri - ) + self.model_uri) if not bucket_path.endswith("/"): bucket_path += "/" self.model_source_type = scheme @@ -198,8 +195,7 @@ def __post_init__(self): num_threads=self.num_threads, ) objects_out = self.s3_client.list_objects_v2( - Bucket=self.bucket_name, Delimiter="/", Prefix=bucket_path - ) + Bucket=self.bucket_name, Delimiter="/", Prefix=bucket_path) files = [ str(content.get("Key")) for content in objects_out.get("Contents", []) @@ -215,9 +211,9 @@ def __post_init__(self): if len(self.safetensors_files) == 0: raise ValueError(f"no safetensors file found in {self.model_uri}") - def download_config( - self, target_dir: str, force_download: bool = False - ) -> Path: + def download_config(self, + target_dir: str, + force_download: bool = False) -> Path: if self.model_source_type == "local": logger.info("local config no need to download") return Path(self.model_uri) @@ -258,6 +254,5 @@ def get_weights_iterator(self, device: Union[torch.device, str] = "cpu"): use_direct_io=self.use_direct_io, ) for name, tensor in safetensors_loader.get_weights_iterator( - device=device - ): + device=device): yield name, tensor diff --git a/vllm/model_executor/model_loader/stream/utils.py b/vllm/model_executor/model_loader/stream/utils.py index 0b3e8d7821951..5b23086323f4d 100644 --- a/vllm/model_executor/model_loader/stream/utils.py +++ b/vllm/model_executor/model_loader/stream/utils.py @@ -14,12 +14,8 @@ def meta_file(local_path: Union[Path, str], file_name: str) -> Path: - return ( - Path(local_path) - .joinpath(DOWNLOAD_CACHE_DIR) - .joinpath(f"{file_name}.metadata") - .absolute() - ) + return (Path(local_path).joinpath(DOWNLOAD_CACHE_DIR).joinpath( + f"{file_name}.metadata").absolute()) def save_meta_data(file_path: Union[Path, str], etag: str): @@ -72,9 +68,8 @@ def need_to_download( if force_download: return True - return not check_file_exist( - local_file, meta_data_file, expected_file_size, expected_etag - ) + return not check_file_exist(local_file, meta_data_file, expected_file_size, + expected_etag) def read_to_bytes_io(content, chunk_size=None): @@ -132,19 +127,19 @@ def _parse_bucket_info_from_uri(uri: str) -> Tuple[str, str, str]: return scheme, bucket_name, bucket_path -def _create_s3_client( - ak, sk, endpoint, region, num_threads=DEFAULT_POOL_CONNECTIONS -): +def _create_s3_client(ak, + sk, + endpoint, + region, + num_threads=DEFAULT_POOL_CONNECTIONS): ak = ak or os.getenv("AWS_ACCESS_KEY_ID") sk = sk or os.getenv("AWS_SECRET_ACCESS_KEY") endpoint = endpoint or os.getenv("AWS_ENDPOINT_URL") region = region or os.getenv("AWS_REGION") - max_pool_connections = ( - num_threads - if num_threads > DEFAULT_POOL_CONNECTIONS - else DEFAULT_POOL_CONNECTIONS - ) + max_pool_connections = (num_threads + if num_threads > DEFAULT_POOL_CONNECTIONS else + DEFAULT_POOL_CONNECTIONS) my_config = Config( # signature_version = 'v4', @@ -162,6 +157,7 @@ def _create_s3_client( class TensorMeta: + def __init__( self, name: str, @@ -209,22 +205,19 @@ def set_tensor(self, tensor: torch.Tensor) -> None: self._tensor = tensor def __str__(self) -> str: - return str( - { - "name": self._name, - "dtype": self._dtype, - "shape": self._shape, - "data_offsets": self._data_offsets, - } - ) + return str({ + "name": self._name, + "dtype": self._dtype, + "shape": self._shape, + "data_offsets": self._data_offsets, + }) def __repr__(self) -> str: return self.__str__() -def split_continue_tensors( - tensor_metas: List[TensorMeta], num_readers: int -) -> List[Tuple[TensorMeta, ...]]: +def split_continue_tensors(tensor_metas: List[TensorMeta], + num_readers: int) -> List[Tuple[TensorMeta, ...]]: """ Note: Usually, the number of groups for splitting tensors is greater than num_deaders. @@ -233,7 +226,7 @@ def split_continue_tensors( assert num_readers > 0, "num_readers should be greater than 0" if len(tensor_metas) <= num_readers: - return [(item,) for item in tensor_metas] + return [(item, ) for item in tensor_metas] max_offset = tensor_metas[-1].data_offsets[1] avg_size = max_offset // num_readers @@ -255,13 +248,13 @@ def split_continue_tensors( def split_continue_tensors_v1( - tensor_metas: List[TensorMeta], num_readers: int -) -> List[Tuple[TensorMeta, ...]]: + tensor_metas: List[TensorMeta], + num_readers: int) -> List[Tuple[TensorMeta, ...]]: assert len(tensor_metas) > 0, "tensor_metas should not be empty" assert num_readers > 0, "num_readers should be greater than 0" if len(tensor_metas) <= num_readers: - return [(item,) for item in tensor_metas] + return [(item, ) for item in tensor_metas] max_offset = tensor_metas[-1].data_offsets[1] avg_size = max_offset // num_readers diff --git a/vllm/model_executor/model_loader/stream_loader.py b/vllm/model_executor/model_loader/stream_loader.py index 989e631affa78..3989aa2004840 100644 --- a/vllm/model_executor/model_loader/stream_loader.py +++ b/vllm/model_executor/model_loader/stream_loader.py @@ -7,6 +7,7 @@ logger = init_logger(__name__) + @dataclass class StreamConfig: model_uri: str @@ -16,7 +17,7 @@ class StreamConfig: s3_region: Optional[str] = None s3_endpinit: Optional[str] = None use_pinmem: bool = False - use_direct_io: bool = False + use_direct_io: bool = False def verify_with_parallel_config( self, @@ -33,13 +34,11 @@ def verify_with_model_config(self, model_config: "ModelConfig") -> None: " is unstable and may lead to errors.") def construct_stream_model(self) -> StreamModel: - return StreamModel( - model_uri=self.model_uri, - num_threads=self.num_threads, - s3_access_key_id=self.s3_access_key_id, - s3_secret_access_key=self.s3_secret_access_key, - s3_endpinit=self.s3_endpinit, - s3_region=self.s3_region, - use_pinmem=self.use_pinmem, - use_direct_io=self.use_direct_io - ) \ No newline at end of file + return StreamModel(model_uri=self.model_uri, + num_threads=self.num_threads, + s3_access_key_id=self.s3_access_key_id, + s3_secret_access_key=self.s3_secret_access_key, + s3_endpinit=self.s3_endpinit, + s3_region=self.s3_region, + use_pinmem=self.use_pinmem, + use_direct_io=self.use_direct_io) From 7193ab1f4d0076ba2729521a17e1c945ac0b8224 Mon Sep 17 00:00:00 2001 From: brosoul Date: Fri, 8 Nov 2024 17:57:35 +0800 Subject: [PATCH 10/18] speed up load bytes into bytesio --- vllm/model_executor/model_loader/stream/utils.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/vllm/model_executor/model_loader/stream/utils.py b/vllm/model_executor/model_loader/stream/utils.py index 5b23086323f4d..8d644cbced420 100644 --- a/vllm/model_executor/model_loader/stream/utils.py +++ b/vllm/model_executor/model_loader/stream/utils.py @@ -73,16 +73,17 @@ def need_to_download( def read_to_bytes_io(content, chunk_size=None): - chunk_size = int(os.getenv("STREAM_READ_CHUNK_SIZE", 8388608)) # 8MB - bytes_io = BytesIO() + chunk_size = int(os.getenv("STREAM_READ_CHUNK_SIZE", 8388608)) # 8MB buf = content.read(chunk_size) + _all_bufs = [buf] + while buf: - bytes_io.write(buf) buf = content.read(chunk_size) + _all_bufs.append(buf) + bytes_io = BytesIO(b"".join(_all_bufs)) bytes_io.seek(0) return bytes_io - def filter_suffix_files(files: List[str], suffix: str) -> List[str]: return [file for file in files if file.endswith(suffix)] From 8d2c8560d5d2afcb13546e4231b66e8c0569bc5c Mon Sep 17 00:00:00 2001 From: brosoul Date: Fri, 8 Nov 2024 17:58:50 +0800 Subject: [PATCH 11/18] force tensor load to cpu due to vllm tensor copy strategy --- vllm/model_executor/model_loader/stream/loader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/model_executor/model_loader/stream/loader.py b/vllm/model_executor/model_loader/stream/loader.py index 90d052bb8499b..7ad2ccf1e1efa 100644 --- a/vllm/model_executor/model_loader/stream/loader.py +++ b/vllm/model_executor/model_loader/stream/loader.py @@ -254,5 +254,5 @@ def get_weights_iterator(self, device: Union[torch.device, str] = "cpu"): use_direct_io=self.use_direct_io, ) for name, tensor in safetensors_loader.get_weights_iterator( - device=device): + device="cpu"): yield name, tensor From c2375b1649c99c8a87647c136108330bb9df6fa7 Mon Sep 17 00:00:00 2001 From: brosoul Date: Fri, 8 Nov 2024 18:01:57 +0800 Subject: [PATCH 12/18] format --- vllm/model_executor/model_loader/stream/utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/vllm/model_executor/model_loader/stream/utils.py b/vllm/model_executor/model_loader/stream/utils.py index 8d644cbced420..e17bba7b30365 100644 --- a/vllm/model_executor/model_loader/stream/utils.py +++ b/vllm/model_executor/model_loader/stream/utils.py @@ -73,7 +73,7 @@ def need_to_download( def read_to_bytes_io(content, chunk_size=None): - chunk_size = int(os.getenv("STREAM_READ_CHUNK_SIZE", 8388608)) # 8MB + chunk_size = int(os.getenv("STREAM_READ_CHUNK_SIZE", 8388608)) # 8MB buf = content.read(chunk_size) _all_bufs = [buf] @@ -84,6 +84,7 @@ def read_to_bytes_io(content, chunk_size=None): bytes_io.seek(0) return bytes_io + def filter_suffix_files(files: List[str], suffix: str) -> List[str]: return [file for file in files if file.endswith(suffix)] From 1f27913f55f4cce27336b1ef0a8488f4348be872 Mon Sep 17 00:00:00 2001 From: brosoul Date: Fri, 8 Nov 2024 18:05:45 +0800 Subject: [PATCH 13/18] fix device cpu --- vllm/model_executor/model_loader/loader.py | 2 +- vllm/model_executor/model_loader/stream/loader.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/vllm/model_executor/model_loader/loader.py b/vllm/model_executor/model_loader/loader.py index e7fad39e926f4..fa5e93151181d 100644 --- a/vllm/model_executor/model_loader/loader.py +++ b/vllm/model_executor/model_loader/loader.py @@ -1198,7 +1198,7 @@ def load_model(self, *, model_config: ModelConfig, scheduler_config) model.load_weights( - self.stream_model.get_weights_iterator(device_config.device)) + self.stream_model.get_weights_iterator("cpu")) return model.eval() diff --git a/vllm/model_executor/model_loader/stream/loader.py b/vllm/model_executor/model_loader/stream/loader.py index 7ad2ccf1e1efa..90d052bb8499b 100644 --- a/vllm/model_executor/model_loader/stream/loader.py +++ b/vllm/model_executor/model_loader/stream/loader.py @@ -254,5 +254,5 @@ def get_weights_iterator(self, device: Union[torch.device, str] = "cpu"): use_direct_io=self.use_direct_io, ) for name, tensor in safetensors_loader.get_weights_iterator( - device="cpu"): + device=device): yield name, tensor From b167008d476f019192642dda117d6dd8b29cd094 Mon Sep 17 00:00:00 2001 From: brosoul Date: Fri, 8 Nov 2024 18:06:13 +0800 Subject: [PATCH 14/18] format --- vllm/model_executor/model_loader/loader.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/vllm/model_executor/model_loader/loader.py b/vllm/model_executor/model_loader/loader.py index fa5e93151181d..52612a387b516 100644 --- a/vllm/model_executor/model_loader/loader.py +++ b/vllm/model_executor/model_loader/loader.py @@ -1197,8 +1197,7 @@ def load_model(self, *, model_config: ModelConfig, lora_config, cache_config, scheduler_config) - model.load_weights( - self.stream_model.get_weights_iterator("cpu")) + model.load_weights(self.stream_model.get_weights_iterator("cpu")) return model.eval() From 9927ce69c1c2929b54bdeeb62fc7b9cfafa6c057 Mon Sep 17 00:00:00 2001 From: brosoul Date: Tue, 12 Nov 2024 10:44:46 +0800 Subject: [PATCH 15/18] enable parallel load model --- vllm/model_executor/model_loader/stream_loader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vllm/model_executor/model_loader/stream_loader.py b/vllm/model_executor/model_loader/stream_loader.py index 3989aa2004840..710389d27f4f5 100644 --- a/vllm/model_executor/model_loader/stream_loader.py +++ b/vllm/model_executor/model_loader/stream_loader.py @@ -24,8 +24,8 @@ def verify_with_parallel_config( parallel_config: "ParallelConfig", ) -> None: if parallel_config.tensor_parallel_size > 1: - raise ValueError( - "steam loader does not support tensor parallelism yet") + logger.info("stream loader will use %s parallel to load model", + parallel_config.tensor_parallel_size) def verify_with_model_config(self, model_config: "ModelConfig") -> None: if model_config.quantization is not None: From 2e74b9c0534e25464be3561b6cc5d91ea95a9c7b Mon Sep 17 00:00:00 2001 From: brosoul Date: Tue, 12 Nov 2024 16:17:01 +0800 Subject: [PATCH 16/18] remove useless code --- vllm/entrypoints/openai/api_server.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index 3da0a79ff52a1..fd6f36e8768dd 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -533,9 +533,6 @@ def signal_handler(*_) -> None: app = build_app(args) model_config = await engine_client.get_model_config() - logger.info(">>>>>>>>>>>>") - logger.info(model_config) - logger.info(">>>>>>>>>>>>") init_app_state(engine_client, model_config, app.state, args) temp_socket.close() From 8bcbdb1e8fd8fb5af624dd2df9b0de9df4095385 Mon Sep 17 00:00:00 2001 From: brosoul Date: Tue, 12 Nov 2024 17:24:09 +0800 Subject: [PATCH 17/18] rename num_thread to num_threads --- vllm/model_executor/model_loader/stream/loader.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/vllm/model_executor/model_loader/stream/loader.py b/vllm/model_executor/model_loader/stream/loader.py index 90d052bb8499b..2145795bde653 100644 --- a/vllm/model_executor/model_loader/stream/loader.py +++ b/vllm/model_executor/model_loader/stream/loader.py @@ -62,12 +62,12 @@ class StreamLoader: def __init__( self, file: LoadFile, - num_thread: int = 32, + num_threads: int = 32, use_pinmem: bool = False, use_direct_io: bool = False, ) -> None: self.file = file - self.num_thread = num_thread + self.num_threads = num_threads self.use_pinmem = use_pinmem self.use_direct_io = use_direct_io # TODO assert file type is safetensors @@ -86,7 +86,7 @@ def _tensors_reader( ) -> None: device = torch.device(device) is_cuda = device.type == "cuda" - # TODO use stream nonblocking IO + # TODO barrier could work in using stream nonblocking IO for tensor_meta in tensor_metas: tensor_buffer = self.file.load_to_buffer( offset=tensor_meta.real_offset, count=tensor_meta.count) @@ -104,7 +104,7 @@ def get_weights_iterator( tensors_per_reader: List[Tuple[TensorMeta, ...]] = (split_continue_tensors( self.tensors_metas, - self.num_thread)) + self.num_threads)) effective_num_readers = len(tensors_per_reader) self._reader_pool = concurrent.futures.ThreadPoolExecutor( @@ -116,7 +116,6 @@ def get_weights_iterator( futures: List[concurrent.futures.Future] = [] barrier = threading.Barrier(effective_num_readers) - for thread_idx, tensor_metas in enumerate(tensors_per_reader): future = self._reader_pool.submit( self._tensors_reader, @@ -249,7 +248,7 @@ def get_weights_iterator(self, device: Union[torch.device, str] = "cpu"): ) safetensors_loader = StreamLoader( file=safetensors_s3, - num_thread=self.num_threads, + num_threads=self.num_threads, use_pinmem=self.use_pinmem, use_direct_io=self.use_direct_io, ) From a2d1d1077d760e47f67ffc80f2c3f4b61bd97273 Mon Sep 17 00:00:00 2001 From: brosoul Date: Tue, 12 Nov 2024 18:48:44 +0800 Subject: [PATCH 18/18] add extra_require for stream loader --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index 7da9115440433..a6186c48e5447 100644 --- a/setup.py +++ b/setup.py @@ -509,6 +509,7 @@ def _read_requirements(filename: str) -> List[str]: install_requires=get_requirements(), ext_modules=ext_modules, extras_require={ + "stream": ["boto3>=1.35.5"], "tensorizer": ["tensorizer>=2.9.0"], "video": ["opencv-python"], # Required for video processing "audio": ["librosa", "soundfile"] # Required for audio processing