diff --git a/python/hopsworks_common/core/dataset_api.py b/python/hopsworks_common/core/dataset_api.py
index 8dac9a421..f7ce40743 100644
--- a/python/hopsworks_common/core/dataset_api.py
+++ b/python/hopsworks_common/core/dataset_api.py
@@ -17,14 +17,16 @@
 from __future__ import annotations
 
 import copy
+import json
 import logging
 import math
 import os
 import shutil
 import time
 from concurrent.futures import ThreadPoolExecutor, wait
+from typing import Literal, Optional, Union
 
-from hopsworks_common import client, usage, util
+from hopsworks_common import client, tag, usage, util
 from hopsworks_common.client.exceptions import DatasetException, RestAPIError
 from hopsworks_common.core import inode
 from tqdm.auto import tqdm
@@ -42,11 +44,25 @@ class DatasetApi:
     def __init__(self):
         self._log = logging.getLogger(__name__)
 
-    DEFAULT_FLOW_CHUNK_SIZE = 1048576
+    DEFAULT_UPLOAD_FLOW_CHUNK_SIZE = 10 * 1024 * 1024
+    DEFAULT_UPLOAD_SIMULTANEOUS_UPLOADS = 3
+    DEFAULT_UPLOAD_SIMULTANEOUS_CHUNKS = 3
+    DEFAULT_UPLOAD_MAX_CHUNK_RETRIES = 1
+
+    DEFAULT_DOWNLOAD_FLOW_CHUNK_SIZE = 1024 * 1024
     FLOW_PERMANENT_ERRORS = [404, 413, 415, 500, 501]
 
+    # alias for backwards-compatibility:
+    DEFAULT_FLOW_CHUNK_SIZE = DEFAULT_DOWNLOAD_FLOW_CHUNK_SIZE
+
     @usage.method_logger
-    def download(self, path: str, local_path: str = None, overwrite: bool = False):
+    def download(
+        self,
+        path: str,
+        local_path: Optional[str] = None,
+        overwrite: Optional[bool] = False,
+        chunk_size: int = DEFAULT_DOWNLOAD_FLOW_CHUNK_SIZE,
+    ):
         """Download file from Hopsworks Filesystem to the current working directory.
 
         ```python
@@ -64,6 +80,7 @@ def download(self, path: str, local_path: str = None, overwrite: bool = False):
             path: path in Hopsworks filesystem to the file
             local_path: path where to download the file in the local filesystem
            overwrite: overwrite local file if exists
+            chunk_size: download chunk size in bytes. Default 1 MB
         # Returns
             `str`: Path to downloaded file
         # Raises
@@ -122,9 +139,7 @@ def download(self, path: str, local_path: str = None, overwrite: bool = False):
                     self._log.exception("Failed to initialize progress bar.")
                 self._log.info("Starting download")
 
-                for chunk in response.iter_content(
-                    chunk_size=self.DEFAULT_FLOW_CHUNK_SIZE
-                ):
+                for chunk in response.iter_content(chunk_size=chunk_size):
                     f.write(chunk)
 
                     if pbar is not None:
@@ -143,11 +158,11 @@ def upload(
         local_path: str,
         upload_path: str,
         overwrite: bool = False,
-        chunk_size=DEFAULT_FLOW_CHUNK_SIZE,
-        simultaneous_uploads=3,
-        simultaneous_chunks=3,
-        max_chunk_retries=1,
-        chunk_retry_interval=1,
+        chunk_size: int = DEFAULT_UPLOAD_FLOW_CHUNK_SIZE,
+        simultaneous_uploads: int = DEFAULT_UPLOAD_SIMULTANEOUS_UPLOADS,
+        simultaneous_chunks: int = DEFAULT_UPLOAD_SIMULTANEOUS_CHUNKS,
+        max_chunk_retries: int = DEFAULT_UPLOAD_MAX_CHUNK_RETRIES,
+        chunk_retry_interval: int = 1,
     ):
         """Upload a file or directory to the Hopsworks filesystem.
 
@@ -170,7 +185,7 @@ def upload(
             local_path: local path to file or directory to upload, can be relative or absolute
             upload_path: path to directory where to upload the file in Hopsworks Filesystem
             overwrite: overwrite file or directory if exists
-            chunk_size: upload chunk size in bytes. Default 1048576 bytes
+            chunk_size: upload chunk size in bytes. Default 10 MB
             simultaneous_chunks: number of simultaneous chunks to upload for each file upload. Default 3
             simultaneous_uploads: number of simultaneous files to be uploaded for directories. Default 3
             max_chunk_retries: maximum retry for a chunk. Default is 1
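The chunk-size and parallelism defaults above can be overridden per call. A minimal usage sketch, assuming a reachable cluster and an existing `Resources` dataset; the file names are illustrative:

```python
import hopsworks

project = hopsworks.login()
dataset_api = project.get_dataset_api()

# upload with 20 MB chunks and more parallelism than the 10 MB / 3-way defaults
dataset_api.upload(
    "my_model.onnx",
    "Resources",
    chunk_size=20 * 1024 * 1024,
    simultaneous_chunks=4,
    simultaneous_uploads=4,
)

# download with a larger chunk size than the 1 MB default
dataset_api.download(
    "Resources/my_model.onnx", overwrite=True, chunk_size=4 * 1024 * 1024
)
```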
@@ -375,6 +390,16 @@ def _get(self, path: str):
         headers = {"content-type": "application/json"}
         return _client._send_request("GET", path_params, headers=headers)
 
+    def get(self, path: str):
+        """Get dataset.
+
+        :param path: path to check
+        :type path: str
+        :return: dataset metadata
+        :rtype: dict
+        """
+        return self._get(path)
+
     def exists(self, path: str):
         """Check if a file exists in the Hopsworks Filesystem.
 
@@ -391,6 +416,16 @@ def exists(self, path: str):
         except RestAPIError:
             return False
 
+    def path_exists(self, remote_path: str):
+        """Check if a path exists in datasets.
+
+        :param remote_path: path to check
+        :type remote_path: str
+        :return: boolean whether path exists
+        :rtype: bool
+        """
+        return self.exists(remote_path)
+
     @usage.method_logger
     def remove(self, path: str):
         """Remove a path in the Hopsworks Filesystem.
@@ -402,7 +437,17 @@ def remove(self, path: str):
         """
         _client = client.get_instance()
         path_params = ["project", _client._project_id, "dataset", path]
-        _client._send_request("DELETE", path_params)
+        return _client._send_request("DELETE", path_params)
+
+    def rm(self, remote_path: str):
+        """Remove a path in the Hopsworks Filesystem.
+
+        # Arguments
+            remote_path: path to remove
+        # Raises
+            `RestAPIError`: If unable to remove the path
+        """
+        return self.remove(remote_path)
 
     @usage.method_logger
     def mkdir(self, path: str):
@@ -573,6 +618,27 @@ def list_files(self, path, offset, limit):
 
         return inode_lst["count"], inode.Inode.from_response_json(inode_lst)
 
+    @usage.method_logger
+    def list(self, remote_path, sort_by=None, limit=1000):
+        """List all files in a directory in datasets.
+
+        :param remote_path: path to list
+        :type remote_path: str
+        :param sort_by: sort string
+        :type sort_by: str
+        :param limit: max number of returned files
+        :type limit: int
+        """
+        # this method is probably to be merged with list_files
+        # they seem to handle paths differently and return different results, which prevents the merge at the moment (2024-09-03), due to the requirement of backwards-compatibility
+        _client = client.get_instance()
+        path_params = ["project", _client._project_id, "dataset", remote_path]
+        query_params = {"action": "listing", "sort_by": sort_by, "limit": limit}
+        headers = {"content-type": "application/json"}
+        return _client._send_request(
+            "GET", path_params, headers=headers, query_params=query_params
+        )
+
     @usage.method_logger
     def read_content(self, path: str, dataset_type: str = "DATASET"):
         _client = client.get_instance()
@@ -591,3 +657,223 @@ def read_content(self, path: str, dataset_type: str = "DATASET"):
         }
 
         return _client._send_request("GET", path_params, query_params, stream=True)
+
+    def chmod(self, remote_path, permissions):
+        """Chmod operation on file or directory in datasets.
+
+        :param remote_path: path to chmod
+        :type remote_path: str
+        :param permissions: permissions string, for example u+x
+        :type permissions: str
+        """
+        _client = client.get_instance()
+        path_params = ["project", _client._project_id, "dataset", remote_path]
+        headers = {"content-type": "application/json"}
+        query_params = {"action": "PERMISSION", "permissions": permissions}
+        return _client._send_request(
+            "PUT", path_params, headers=headers, query_params=query_params
+        )
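`get`, `path_exists`, `rm`, and `list` mirror the hsml-style names that are removed from `hsml/core/dataset_api.py` further down in this diff, so both spellings resolve to one implementation. A sketch of how they compose; the paths and the sort string are illustrative:

```python
dataset_api = project.get_dataset_api()

if dataset_api.path_exists("Resources/my_file.txt"):  # alias of exists()
    print(dataset_api.get("Resources/my_file.txt"))   # wraps _get()

listing = dataset_api.list("Resources", sort_by="ID:asc", limit=100)
dataset_api.chmod("Resources/my_script.sh", "u+x")
dataset_api.rm("Resources/my_file.txt")               # alias of remove()
```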
+
+    # region Archiving
+
+    def _archive(
+        self,
+        remote_path: str,
+        destination_path: Optional[str] = None,
+        block: bool = False,
+        timeout: Optional[int] = 120,
+        action: Union[Literal["unzip"], Literal["zip"]] = "unzip",
+    ):
+        """Internal (de)compression logic.
+
+        # Arguments
+            remote_path: path to file or directory to zip or unzip.
+            destination_path: path to upload the zip, defaults to None; is used only if action is zip.
+            block: if the operation should be blocking until complete, defaults to False.
+            timeout: timeout in seconds for the blocking, defaults to 120; if None, the blocking is unbounded.
+            action: zip or unzip, defaults to unzip.
+
+        # Returns
+            `bool`: whether the operation completed in the specified timeout; if non-blocking, always returns True.
+        """
+
+        _client = client.get_instance()
+        path_params = ["project", _client._project_id, "dataset", remote_path]
+
+        query_params = {"action": action}
+
+        if destination_path is not None:
+            query_params["destination_path"] = destination_path
+            query_params["destination_type"] = "DATASET"
+
+        headers = {"content-type": "application/json"}
+
+        _client._send_request(
+            "POST", path_params, headers=headers, query_params=query_params
+        )
+
+        if not block:
+            # the call is successful at this point if we don't want to block
+            return True
+
+        # Wait for zip file to appear. When it does, check that parent dir zipState is not set to CHOWNING
+        count = 0
+        while True:
+            if action == "zip":
+                zip_path = remote_path + ".zip"
+                # Get the status of the zipped file
+                if destination_path is None:
+                    zip_exists = self.path_exists(zip_path)
+                else:
+                    zip_exists = self.path_exists(
+                        destination_path + "/" + os.path.split(zip_path)[1]
+                    )
+                # Get the zipState of the directory being zipped
+                dir_status = self.get(remote_path)
+                zip_state = dir_status["zipState"] if "zipState" in dir_status else None
+                if zip_exists and zip_state == "NONE":
+                    return True
+            elif action == "unzip":
+                # Get the status of the unzipped dir
+                unzipped_dir_exists = self.path_exists(
+                    remote_path[: remote_path.index(".")]
+                )
+                # Get the zipState of the zip being extracted
+                dir_status = self.get(remote_path)
+                zip_state = dir_status["zipState"] if "zipState" in dir_status else None
+                if unzipped_dir_exists and zip_state == "NONE":
+                    return True
+            time.sleep(1)
+            count += 1
+            # only a non-None timeout bounds the wait; comparing count to None would raise a TypeError
+            if timeout is not None and count >= timeout:
+                self._log.info(
+                    f"Timeout of {timeout} seconds exceeded while {action} {remote_path}."
+                )
+                return False
+
+    def unzip(
+        self, remote_path: str, block: bool = False, timeout: Optional[int] = 120
+    ):
+        """Unzip an archive in the dataset.
+
+        # Arguments
+            remote_path: path to file or directory to unzip.
+            block: if the operation should be blocking until complete, defaults to False.
+            timeout: timeout in seconds for the blocking, defaults to 120; if None, the blocking is unbounded.
+
+        # Returns
+            `bool`: whether the operation completed in the specified timeout; if non-blocking, always returns True.
+        """
+        return self._archive(remote_path, block=block, timeout=timeout, action="unzip")
+
+    def zip(
+        self,
+        remote_path: str,
+        destination_path: Optional[str] = None,
+        block: bool = False,
+        timeout: Optional[int] = 120,
+    ):
+        """Zip a file or directory in the dataset.
+
+        # Arguments
+            remote_path: path to file or directory to zip.
+            destination_path: path to upload the zip, defaults to None.
+            block: if the operation should be blocking until complete, defaults to False.
+            timeout: timeout in seconds for the blocking, defaults to 120; if None, the blocking is unbounded.
+
+        # Returns
+            `bool`: whether the operation completed in the specified timeout; if non-blocking, always returns True.
+        """
+        return self._archive(
+            remote_path,
+            destination_path=destination_path,
+            block=block,
+            timeout=timeout,
+            action="zip",
+        )
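With `block=True`, `_archive` polls the path and its `zipState` once per second until the archive (or the extracted directory) is ready or the timeout budget runs out; `timeout=None` waits indefinitely. A usage sketch with illustrative paths:

```python
# non-blocking: returns True as soon as the backend accepts the request
dataset_api.zip("Resources/my_dir", destination_path="Resources/archives")

# blocking: False only means the 300 s budget elapsed, not that zipping failed
if not dataset_api.zip("Resources/my_dir", block=True, timeout=300):
    print("zip still in progress after 300 seconds")

# unbounded wait
dataset_api.unzip("Resources/archives/my_dir.zip", block=True, timeout=None)
```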
+ """ + return self._archive( + remote_path, + destination_path=destination_path, + block=block, + timeout=timeout, + action="zip", + ) + + # region Dataset Tags + + def add(self, path, name, value): + """Attach a name/value tag to a model. + + A tag consists of a name/value pair. Tag names are unique identifiers. + The value of a tag can be any valid json - primitives, arrays or json objects. + + :param path: path to add the tag + :type path: str + :param name: name of the tag to be added + :type name: str + :param value: value of the tag to be added + :type value: str + """ + _client = client.get_instance() + path_params = [ + "project", + _client._project_id, + "dataset", + "tags", + "schema", + name, + path, + ] + headers = {"content-type": "application/json"} + json_value = json.dumps(value) + _client._send_request("PUT", path_params, headers=headers, data=json_value) + + def delete(self, path, name): + """Delete a tag. + + Tag names are unique identifiers. + + :param path: path to delete the tags + :type path: str + :param name: name of the tag to be removed + :type name: str + """ + _client = client.get_instance() + path_params = [ + "project", + _client._project_id, + "dataset", + "tags", + "schema", + name, + path, + ] + _client._send_request("DELETE", path_params) + + def get_tags(self, path, name: str = None): + """Get the tags. + + Gets all tags if no tag name is specified. + + :param path: path to get the tags + :type path: str + :param name: tag name + :type name: str + :return: dict of tag name/values + :rtype: dict + """ + _client = client.get_instance() + path_params = [ + "project", + _client._project_id, + "dataset", + "tags", + ] + + if name is not None: + path_params.append("schema") + path_params.append(name) + else: + path_params.append("all") + + path_params.append(path) + + return { + tag._name: json.loads(tag._value) + for tag in tag.Tag.from_response_json( + _client._send_request("GET", path_params) + ) + } diff --git a/python/hopsworks_common/engine/__init__.py b/python/hopsworks_common/engine/__init__.py new file mode 100644 index 000000000..ff8055b9b --- /dev/null +++ b/python/hopsworks_common/engine/__init__.py @@ -0,0 +1,15 @@ +# +# Copyright 2024 Hopsworks AB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/python/hopsworks_common/kafka_topic.py b/python/hopsworks_common/kafka_topic.py index 43ae96fb4..c056917b7 100644 --- a/python/hopsworks_common/kafka_topic.py +++ b/python/hopsworks_common/kafka_topic.py @@ -14,19 +14,25 @@ # limitations under the License. 
diff --git a/python/hopsworks_common/engine/__init__.py b/python/hopsworks_common/engine/__init__.py
new file mode 100644
index 000000000..ff8055b9b
--- /dev/null
+++ b/python/hopsworks_common/engine/__init__.py
@@ -0,0 +1,15 @@
+#
+# Copyright 2024 Hopsworks AB
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/python/hopsworks_common/kafka_topic.py b/python/hopsworks_common/kafka_topic.py
index 43ae96fb4..c056917b7 100644
--- a/python/hopsworks_common/kafka_topic.py
+++ b/python/hopsworks_common/kafka_topic.py
@@ -14,19 +14,25 @@
 # limitations under the License.
 #
 
+from __future__ import annotations
+
 import json
+from typing import Optional
 
 import humps
 from hopsworks_common import usage, util
+from hopsworks_common.constants import KAFKA_TOPIC
 from hopsworks_common.core import kafka_api
 
 
 class KafkaTopic:
+    """Configuration for a Kafka topic."""
+
     def __init__(
         self,
-        name=None,
-        num_of_replicas=None,
-        num_of_partitions=None,
+        name: Optional[str] = KAFKA_TOPIC.CREATE,
+        num_of_replicas: Optional[int] = None,
+        num_of_partitions: Optional[int] = None,
         schema_name=None,
         schema_version=None,
         schema_content=None,
@@ -37,11 +43,18 @@ def __init__(
         expand=None,
         items=None,
         count=None,
+        num_replicas: Optional[int] = None,
+        num_partitions: Optional[int] = None,
         **kwargs,
     ):
         self._name = name
-        self._num_of_replicas = num_of_replicas
-        self._num_of_partitions = num_of_partitions
+        if not num_of_replicas:
+            num_of_replicas = num_replicas
+        if not num_of_partitions:
+            num_of_partitions = num_partitions
+        self._num_of_replicas, self._num_of_partitions = self._validate_topic_config(
+            self._name, num_of_replicas, num_of_partitions
+        )
         self._schema_name = schema_name
         self._schema_version = schema_version
         self._schema_content = schema_content
@@ -50,15 +63,63 @@ def __init__(
 
         self._kafka_api = kafka_api.KafkaApi()
 
+    def describe(self):
+        util.pretty_print(self)
+
+    @classmethod
+    def _validate_topic_config(cls, name, num_replicas, num_partitions):
+        if name is not None and name != KAFKA_TOPIC.NONE:
+            if name == KAFKA_TOPIC.CREATE:
+                if num_replicas is None:
+                    print(
+                        "Setting number of replicas to default value '{}'".format(
+                            KAFKA_TOPIC.NUM_REPLICAS
+                        )
+                    )
+                    num_replicas = KAFKA_TOPIC.NUM_REPLICAS
+                if num_partitions is None:
+                    print(
+                        "Setting number of partitions to default value '{}'".format(
+                            KAFKA_TOPIC.NUM_PARTITIONS
+                        )
+                    )
+                    num_partitions = KAFKA_TOPIC.NUM_PARTITIONS
+            else:
+                if num_replicas is not None or num_partitions is not None:
+                    raise ValueError(
+                        "Number of replicas or partitions cannot be changed in existing kafka topics."
+                    )
+        elif name is None or name == KAFKA_TOPIC.NONE:
+            num_replicas = None
+            num_partitions = None
+
+        return num_replicas, num_partitions
+
     @classmethod
     def from_response_json(cls, json_dict):
         json_decamelized = humps.decamelize(json_dict)
         if "count" not in json_decamelized:
-            return cls(**json_decamelized)
+            return cls.from_json(json_decamelized)
         elif json_decamelized["count"] == 0:
             return []
         else:
-            return [cls(**kafka_topic) for kafka_topic in json_decamelized["items"]]
+            return [cls.from_json(jd) for jd in json_decamelized["items"]]
+
+    @classmethod
+    def from_json(cls, json_decamelized):
+        return KafkaTopic(**cls.extract_fields_from_json(json_decamelized))
+
+    @classmethod
+    def extract_fields_from_json(cls, json_decamelized):
+        kwargs = {}
+        kwargs["name"] = json_decamelized.pop("name")  # required
+        kwargs["num_replicas"] = util.extract_field_from_json(
+            json_decamelized, ["num_of_replicas", "num_replicas"]
+        )
+        kwargs["num_partitions"] = util.extract_field_from_json(
+            json_decamelized, ["num_of_partitions", "num_partitions"]
+        )
+        return kwargs
 
     def update_from_response_json(self, json_dict):
         json_decamelized = humps.decamelize(json_dict)
@@ -67,19 +128,41 @@ def update_from_response_json(self, json_dict):
 
     @property
     def name(self):
-        """Name of the topic"""
+        """Name of the Kafka topic."""
         return self._name
 
+    @name.setter
+    def name(self, name: str):
+        self._name = name
+
     @property
     def replicas(self):
-        """Replication factor for the topic"""
+        """Number of replicas of the Kafka topic."""
+        return self._num_of_replicas
+
+    @property
+    def num_replicas(self):
+        """Number of replicas of the Kafka topic."""
         return self._num_of_replicas
 
+    @num_replicas.setter
+    def num_replicas(self, num_replicas: int):
+        self._num_of_replicas = num_replicas
+
     @property
     def partitions(self):
-        """Number of partitions for the topic"""
+        """Number of partitions of the Kafka topic."""
+        return self._num_of_partitions
+
+    @property
+    def num_partitions(self):
+        """Number of partitions of the Kafka topic."""
         return self._num_of_partitions
 
+    @num_partitions.setter
+    def num_partitions(self, num_partitions: int):
+        self._num_of_partitions = num_partitions
+
     @property
     def schema(self):
         """Schema for the topic"""
@@ -100,6 +183,15 @@ def delete(self):
     def json(self):
         return json.dumps(self, cls=util.Encoder)
 
+    def to_dict(self):
+        return {
+            "kafkaTopicDTO": {
+                "name": self._name,
+                "numOfReplicas": self._num_of_replicas,
+                "numOfPartitions": self._num_of_partitions,
+            }
+        }
+
     def __str__(self):
         return self.json()
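The legacy `num_of_*` constructor arguments fall back to the new `num_replicas`/`num_partitions` names, and `_validate_topic_config` fills in cluster defaults only when a topic is being created. A sketch of the three accepted configurations:

```python
from hopsworks_common.constants import KAFKA_TOPIC
from hopsworks_common.kafka_topic import KafkaTopic

# create a new topic; unset sizing falls back to the KAFKA_TOPIC defaults
t1 = KafkaTopic(name=KAFKA_TOPIC.CREATE, num_replicas=1, num_partitions=3)

# reuse an existing topic; sizing must stay unset or a ValueError is raised
t2 = KafkaTopic(name="my-existing-topic")

# no topic at all: sizing is forced to None
t3 = KafkaTopic(name=KAFKA_TOPIC.NONE)
```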
diff --git a/python/hopsworks_common/util.py b/python/hopsworks_common/util.py
index b63453e7e..27a3ff8eb 100644
--- a/python/hopsworks_common/util.py
+++ b/python/hopsworks_common/util.py
@@ -16,9 +16,12 @@
 
 from __future__ import annotations
 
+import inspect
 import itertools
 import json
+import os
 import re
+import shutil
 import sys
 import threading
 import time
@@ -35,9 +38,13 @@
 )
 from urllib.parse import urljoin, urlparse
 
+import humps
 from hopsworks_common import client
 from hopsworks_common.client.exceptions import FeatureStoreException, JobException
+from hopsworks_common.constants import MODEL, PREDICTOR, Default
+from hopsworks_common.core.constants import HAS_PANDAS
 from hopsworks_common.git_file_status import GitFileStatus
+from six import string_types
 
 
 if TYPE_CHECKING:
@@ -55,6 +62,52 @@ def default(self, o: Any) -> Dict[str, Any]:
         return super().default(o)
 
 
+class NumpyEncoder(json.JSONEncoder):
+    """Special json encoder for numpy types.
+    Note that some numpy types don't have a native Python equivalent,
+    hence json.dumps will raise a TypeError.
+    In this case, you'll need to convert your numpy types into their closest Python equivalents.
+    """
+
+    def convert(self, obj):
+        import base64
+
+        import numpy as np
+
+        if HAS_PANDAS:
+            import pandas as pd
+
+        def encode_binary(x):
+            return base64.encodebytes(x).decode("ascii")
+
+        if isinstance(obj, np.ndarray):
+            if obj.dtype == object:  # np.object was removed in NumPy 1.24
+                return [self.convert(x)[0] for x in obj.tolist()], True
+            elif obj.dtype == np.bytes_:
+                return np.vectorize(encode_binary)(obj), True
+            else:
+                return obj.tolist(), True
+
+        if isinstance(obj, datetime.date) or (
+            HAS_PANDAS and isinstance(obj, pd.Timestamp)
+        ):
+            return obj.isoformat(), True
+        if isinstance(obj, bytes) or isinstance(obj, bytearray):
+            return encode_binary(obj), True
+        if isinstance(obj, np.generic):
+            return obj.item(), True
+        if isinstance(obj, np.datetime64):
+            return np.datetime_as_string(obj), True
+        return obj, False
+
+    def default(self, obj):  # pylint: disable=E0202
+        res, converted = self.convert(obj)
+        if converted:
+            return res
+        else:
+            return super().default(obj)
+
+
 VALID_EMBEDDING_TYPE = {
     "array<int>",
     "array<bigint>",
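`convert` returns a `(value, converted)` pair so `default` can fall back to the base encoder for anything it does not recognize. A small usage sketch:

```python
import json

import numpy as np

from hopsworks_common.util import NumpyEncoder

payload = {
    "scores": np.array([0.1, 0.9]),  # ndarray -> list
    "count": np.int64(7),            # np.generic -> int
    "raw": b"\x00\x01",              # bytes -> base64 string
}
print(json.dumps(payload, cls=NumpyEncoder))
```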
@@ -336,6 +389,10 @@ class VersionWarning(Warning):
     pass
 
 
+class ProvenanceWarning(Warning):
+    pass
+
+
 class JobWarning(Warning):
     pass
 
@@ -400,3 +457,267 @@ def is_interactive():
     import __main__ as main
 
     return not hasattr(main, "__file__")
+
+
+# Model registry
+
+# - schema and types
+
+
+def set_model_class(model):
+    from hsml.llm.model import Model as LLMModel
+    from hsml.model import Model as BaseModel
+    from hsml.python.model import Model as PyModel
+    from hsml.sklearn.model import Model as SkLearnModel
+    from hsml.tensorflow.model import Model as TFModel
+    from hsml.torch.model import Model as TorchModel
+
+    if "href" in model:
+        _ = model.pop("href")
+    if "type" in model:  # backwards compatibility
+        _ = model.pop("type")
+    if "tags" in model:
+        _ = model.pop("tags")  # tags are always retrieved from backend
+
+    if "framework" not in model:
+        return BaseModel(**model)
+
+    framework = model.pop("framework")
+    if framework == MODEL.FRAMEWORK_TENSORFLOW:
+        return TFModel(**model)
+    if framework == MODEL.FRAMEWORK_TORCH:
+        return TorchModel(**model)
+    if framework == MODEL.FRAMEWORK_SKLEARN:
+        return SkLearnModel(**model)
+    elif framework == MODEL.FRAMEWORK_PYTHON:
+        return PyModel(**model)
+    elif framework == MODEL.FRAMEWORK_LLM:
+        return LLMModel(**model)
+    else:
+        raise ValueError(
+            "framework {} is not a supported framework".format(str(framework))
+        )
+
+
+def input_example_to_json(input_example):
+    import numpy as np
+
+    if isinstance(input_example, np.ndarray):
+        if input_example.size > 0:
+            return _handle_tensor_input(input_example)
+        else:
+            raise ValueError(
+                "input_example of type {} can not be empty".format(type(input_example))
+            )
+    elif isinstance(input_example, dict):
+        return _handle_dict_input(input_example)
+    else:
+        return _handle_dataframe_input(input_example)
+
+
+def _handle_tensor_input(input_tensor):
+    return input_tensor.tolist()
+
+
+def _handle_dataframe_input(input_ex):
+    if HAS_PANDAS:
+        import pandas as pd
+    if HAS_PANDAS and isinstance(input_ex, pd.DataFrame):
+        if not input_ex.empty:
+            return input_ex.iloc[0].tolist()
+        else:
+            raise ValueError(
+                "input_example of type {} can not be empty".format(type(input_ex))
+            )
+    elif HAS_PANDAS and isinstance(input_ex, pd.Series):
+        if not input_ex.empty:
+            return input_ex.tolist()
+        else:
+            raise ValueError(
+                "input_example of type {} can not be empty".format(type(input_ex))
+            )
+    elif isinstance(input_ex, list):
+        if len(input_ex) > 0:
+            return input_ex
+        else:
+            raise ValueError(
+                "input_example of type {} can not be empty".format(type(input_ex))
+            )
+    else:
+        raise TypeError(
+            "{} is not a supported input example type".format(type(input_ex))
+        )
+
+
+def _handle_dict_input(input_ex):
+    return input_ex
+
+
+# - artifacts
+
+
+def compress(archive_out_path, archive_name, path_to_archive):
+    if os.path.isdir(path_to_archive):
+        return shutil.make_archive(
+            os.path.join(archive_out_path, archive_name), "gztar", path_to_archive
+        )
+    else:
+        return shutil.make_archive(
+            os.path.join(archive_out_path, archive_name),
+            "gztar",
+            os.path.dirname(path_to_archive),
+            os.path.basename(path_to_archive),
+        )
+
+
+def decompress(archive_file_path, extract_dir=None):
+    return shutil.unpack_archive(archive_file_path, extract_dir=extract_dir)
+
+
+# - export models
+
+
+def validate_metrics(metrics):
+    if metrics is not None:
+        if not isinstance(metrics, dict):
+            raise TypeError(
+                "provided metrics is of instance {}, expected a dict".format(
+                    type(metrics)
+                )
+            )
+
+        for metric in metrics:
+            # Validate key is a string
+            if not isinstance(metric, string_types):
+                raise TypeError(
+                    "provided metrics key is of instance {}, expected a string".format(
+                        type(metric)
+                    )
+                )
+            # Validate value is a number
+            try:
+                float(metrics[metric])
+            except ValueError as err:
+                raise ValueError(
+                    "{} is not a number, only numbers can be attached as metadata for models.".format(
+                        str(metrics[metric])
+                    )
+                ) from err
+
+
+# Model serving
+
+
+def get_predictor_for_model(model, **kwargs):
+    from hsml.llm.model import Model as LLMModel
+    from hsml.llm.predictor import Predictor as vLLMPredictor
+    from hsml.model import Model as BaseModel
+    from hsml.predictor import Predictor as BasePredictor
+    from hsml.python.model import Model as PyModel
+    from hsml.python.predictor import Predictor as PyPredictor
+    from hsml.sklearn.model import Model as SkLearnModel
+    from hsml.sklearn.predictor import Predictor as SkLearnPredictor
+    from hsml.tensorflow.model import Model as TFModel
+    from hsml.tensorflow.predictor import Predictor as TFPredictor
+    from hsml.torch.model import Model as TorchModel
+    from hsml.torch.predictor import Predictor as TorchPredictor
+
+    if not isinstance(model, BaseModel):
+        raise ValueError(
+            "model is of type {}, but an instance of {} class is expected".format(
+                type(model), BaseModel
+            )
+        )
+
+    if type(model) is TFModel:
+        return TFPredictor(**kwargs)
+    if type(model) is TorchModel:
+        return TorchPredictor(**kwargs)
+    if type(model) is SkLearnModel:
+        return SkLearnPredictor(**kwargs)
+    if type(model) is PyModel:
+        return PyPredictor(**kwargs)
+    if type(model) is LLMModel:
+        return vLLMPredictor(**kwargs)
+    if type(model) is BaseModel:
+        return BasePredictor(  # python as default framework and model server
+            model_framework=MODEL.FRAMEWORK_PYTHON,
+            model_server=PREDICTOR.MODEL_SERVER_PYTHON,
+            **kwargs,
+        )
+
+
+# General
+
+
+def pretty_print(obj):
+    if isinstance(obj, list):
+        for logs in obj:
+            pretty_print(logs)
+    else:
+        json_decamelized = humps.decamelize(obj.to_dict())
+        print(json.dumps(json_decamelized, indent=4, sort_keys=True))
+
+
+def get_members(cls, prefix=None):
+    for m in inspect.getmembers(cls, lambda m: not (inspect.isroutine(m))):
+        n = m[0]  # name
+        if (prefix is not None and n.startswith(prefix)) or (
+            prefix is None and not (n.startswith("__") and n.endswith("__"))
+        ):
+            yield m[1]  # value
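`get_members` yields the non-routine attributes of a class, either those matching a prefix or, by default, everything that is not a dunder; this is how constant-holder classes are enumerated. An illustrative sketch with a stand-in class:

```python
from hopsworks_common.util import get_members


class COLORS:  # stand-in constants holder, not part of the codebase
    RED = "red"
    BLUE = "blue"


print(list(get_members(COLORS)))                # ["blue", "red"] (getmembers sorts by name)
print(list(get_members(COLORS, prefix="RED")))  # ["red"]
```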
+
+
+# - json
+
+
+def extract_field_from_json(obj, fields, default=None, as_instance_of=None):
+    if isinstance(fields, list):
+        for field in fields:
+            value = extract_field_from_json(obj, field, default, as_instance_of)
+            if value is not None:
+                break
+    else:
+        value = obj.pop(fields) if fields in obj else default
+        if as_instance_of is not None:
+            if isinstance(value, list):
+                # if the field is a list, get all obj
+                value = [
+                    get_obj_from_json(obj=subvalue, cls=as_instance_of)
+                    for subvalue in value
+                ]
+            else:
+                # otherwise, get single obj
+                value = get_obj_from_json(obj=value, cls=as_instance_of)
+    return value
+
+
+def get_obj_from_json(obj, cls):
+    if obj is not None:
+        if isinstance(obj, cls):
+            return obj
+        if isinstance(obj, dict):
+            return cls.from_json(obj)
+        if isinstance(obj, Default):
+            return cls()
+        raise ValueError(
+            "Object of type {} cannot be converted to class {}".format(type(obj), cls)
+        )
+    return obj
+
+
+def feature_view_to_json(obj):
+    if obj is None:
+        return None
+    import importlib.util
+
+    if importlib.util.find_spec("hsfs"):
+        from hsfs import feature_view
+
+        if isinstance(obj, feature_view.FeatureView):
+            import json
+
+            import humps
+
+            return humps.camelize(json.loads(obj.json()))
+    return None
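`extract_field_from_json` pops the first matching key, which is what makes the camelCase/snake_case aliases in `KafkaTopic.extract_fields_from_json` above work. For example:

```python
from hopsworks_common.util import extract_field_from_json

payload = {"num_of_partitions": 3}
# tries each alias in order and pops the first hit
n = extract_field_from_json(payload, ["num_of_partitions", "num_partitions"])
assert n == 3 and payload == {}
```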
diff --git a/python/hsfs/util.py b/python/hsfs/util.py
index 302598302..41fa0b7d3 100644
--- a/python/hsfs/util.py
+++ b/python/hsfs/util.py
@@ -59,6 +59,9 @@ from hsfs.constructor import serving_prepared_statement
 
 
+FeatureStoreEncoder = Encoder
+
+
 def validate_feature(
     ft: Union[str, feature.Feature, Dict[str, Any]],
 ) -> feature.Feature:
@@ -109,6 +112,7 @@ def build_serving_keys_from_prepared_statements(
     "FEATURE_STORE_NAME_SUFFIX",
     "VALID_EMBEDDING_TYPE",
     "Encoder",
+    "FeatureStoreEncoder",
     "FeatureGroupWarning",
     "JobWarning",
     "StatisticsWarning",
diff --git a/python/hsml/__init__.py b/python/hsml/__init__.py
index 4fb8156e3..78cc93d00 100644
--- a/python/hsml/__init__.py
+++ b/python/hsml/__init__.py
@@ -16,8 +16,8 @@
 
 import warnings
 
-from hsml import util, version
-from hsml.connection import Connection
+from hopsworks_common import util, version
+from hopsworks_common.connection import Connection
 
 
 connection = Connection.connection
diff --git a/python/hsml/core/dataset_api.py b/python/hsml/core/dataset_api.py
index 681fe3442..9c0e4f19b 100644
--- a/python/hsml/core/dataset_api.py
+++ b/python/hsml/core/dataset_api.py
@@ -14,577 +14,13 @@
 # limitations under the License.
 #
 
-from __future__ import annotations
+from hopsworks_common.core.dataset_api import (
+    Chunk,
+    DatasetApi,
+)
 
-import copy
-import json
-import math
-import os
-import time
-from concurrent.futures import ThreadPoolExecutor, wait
-from typing import Literal, Optional, Union
 
-from hsml import client, tag
-from hsml.client.exceptions import RestAPIError
-from tqdm.auto import tqdm
-
-
-class Chunk:
-    def __init__(self, content, number, status):
-        self.content = content
-        self.number = number
-        self.status = status
-        self.retries = 0
-
-
-class DatasetApi:
-    def __init__(self):
-        pass
-
-    DEFAULT_UPLOAD_FLOW_CHUNK_SIZE = 10
-    DEFAULT_UPLOAD_SIMULTANEOUS_UPLOADS = 3
-    DEFAULT_UPLOAD_MAX_CHUNK_RETRIES = 1
-
-    DEFAULT_DOWNLOAD_FLOW_CHUNK_SIZE = 1_048_576
-    FLOW_PERMANENT_ERRORS = [404, 413, 415, 500, 501]
-
-    def upload(
-        self,
-        local_path: str,
-        upload_path: str,
-        overwrite: bool = False,
-        chunk_size=DEFAULT_UPLOAD_FLOW_CHUNK_SIZE,
-        simultaneous_uploads=DEFAULT_UPLOAD_SIMULTANEOUS_UPLOADS,
-        max_chunk_retries=DEFAULT_UPLOAD_MAX_CHUNK_RETRIES,
-        chunk_retry_interval=1,
-    ):
-        """Upload a file to the Hopsworks filesystem.
-
-        ```python
-
-        import hopsworks
-
-        project = hopsworks.login(project="my-project")
-
-        dataset_api = project.get_dataset_api()
-
-        uploaded_file_path = dataset_api.upload("my_local_file.txt", "Resources")
-
-        ```
-        # Arguments
-            local_path: local path to file to upload
-            upload_path: path to directory where to upload the file in Hopsworks Filesystem
-            overwrite: overwrite file if exists
-            chunk_size: upload chunk size in megabytes. Default 10 MB
-            simultaneous_uploads: number of simultaneous chunks to upload. Default 3
-            max_chunk_retries: maximum retry for a chunk. Default is 1
-            chunk_retry_interval: chunk retry interval in seconds. Default is 1sec
-        # Returns
-            `str`: Path to uploaded file
-        # Raises
-            `RestAPIError`: If unable to upload the file
-        """
-        # local path could be absolute or relative,
-        if not os.path.isabs(local_path) and os.path.exists(
-            os.path.join(os.getcwd(), local_path)
-        ):
-            local_path = os.path.join(os.getcwd(), local_path)
-
-        file_size = os.path.getsize(local_path)
-
-        _, file_name = os.path.split(local_path)
-
-        destination_path = upload_path + "/" + file_name
-        chunk_size_bytes = chunk_size * 1024 * 1024
-
-        if self.path_exists(destination_path):
-            if overwrite:
-                self.rm(destination_path)
-            else:
-                raise Exception(
-                    "{} already exists, set overwrite=True to overwrite it".format(
-                        local_path
-                    )
-                )
-
-        num_chunks = math.ceil(file_size / chunk_size_bytes)
-
-        base_params = self._get_flow_base_params(
-            file_name, num_chunks, file_size, chunk_size_bytes
-        )
-
-        chunk_number = 1
-        with open(local_path, "rb") as f:
-            pbar = None
-            try:
-                pbar = tqdm(
-                    total=file_size,
-                    bar_format="{desc}: {percentage:.3f}%|{bar}| {n_fmt}/{total_fmt} elapsed<{elapsed} remaining<{remaining}",
-                    desc="Uploading",
-                )
-            except Exception:
-                self._log.exception("Failed to initialize progress bar.")
-                self._log.info("Starting upload")
-            with ThreadPoolExecutor(simultaneous_uploads) as executor:
-                while True:
-                    chunks = []
-                    for _ in range(simultaneous_uploads):
-                        chunk = f.read(chunk_size_bytes)
-                        if not chunk:
-                            break
-                        chunks.append(Chunk(chunk, chunk_number, "pending"))
-                        chunk_number += 1
-
-                    if len(chunks) == 0:
-                        break
-
-                    # upload each chunk and update pbar
-                    futures = [
-                        executor.submit(
-                            self._upload_chunk,
-                            base_params,
-                            upload_path,
-                            file_name,
-                            chunk,
-                            pbar,
-                            max_chunk_retries,
-                            chunk_retry_interval,
-                        )
-                        for chunk in chunks
-                    ]
-                    # wait for all upload tasks to complete
-                    _, _ = wait(futures)
-                    try:
-                        _ = [future.result() for future in futures]
-                    except Exception as e:
-                        if pbar is not None:
-                            pbar.close()
-                        raise e
-
-        if pbar is not None:
-            pbar.close()
-        else:
-            self._log.info("Upload finished")
-
-        return upload_path + "/" + os.path.basename(local_path)
-
-    def _upload_chunk(
-        self,
-        base_params,
-        upload_path,
-        file_name,
-        chunk: Chunk,
-        pbar,
-        max_chunk_retries,
-        chunk_retry_interval,
-    ):
-        query_params = copy.copy(base_params)
-        query_params["flowCurrentChunkSize"] = len(chunk.content)
-        query_params["flowChunkNumber"] = chunk.number
-
-        chunk.status = "uploading"
-        while True:
-            try:
-                self._upload_request(
-                    query_params, upload_path, file_name, chunk.content
-                )
-                break
-            except RestAPIError as re:
-                chunk.retries += 1
-                if (
-                    re.response.status_code in DatasetApi.FLOW_PERMANENT_ERRORS
-                    or chunk.retries > max_chunk_retries
-                ):
-                    chunk.status = "failed"
-                    raise re
-                time.sleep(chunk_retry_interval)
-                continue
-
-        chunk.status = "uploaded"
-
-        if pbar is not None:
-            pbar.update(query_params["flowCurrentChunkSize"])
-
-    def _get_flow_base_params(self, file_name, num_chunks, size, chunk_size):
-        return {
-            "templateId": -1,
-            "flowChunkSize": chunk_size,
-            "flowTotalSize": size,
-            "flowIdentifier": str(size) + "_" + file_name,
-            "flowFilename": file_name,
-            "flowRelativePath": file_name,
-            "flowTotalChunks": num_chunks,
-        }
-
-    def _upload_request(self, params, path, file_name, chunk):
-        _client = client.get_instance()
-        path_params = ["project", _client._project_id, "dataset", "upload", path]
-
-        # Flow configuration params are sent as form data
-        _client._send_request(
-            "POST", path_params, data=params, files={"file": (file_name, chunk)}
-        )
-
-    def download(self, path, local_path):
-        """Download file/directory on a path in datasets.
-        :param path: path to download
-        :type path: str
-        :param local_path: path to download in datasets
-        :type local_path: str
-        """
-
-        _client = client.get_instance()
-        path_params = [
-            "project",
-            _client._project_id,
-            "dataset",
-            "download",
-            "with_auth",
-            path,
-        ]
-        query_params = {"type": "DATASET"}
-
-        with _client._send_request(
-            "GET", path_params, query_params=query_params, stream=True
-        ) as response:
-            with open(local_path, "wb") as f:
-                downloaded = 0
-                # if not response.headers.get("Content-Length"), file is still downloading
-                for chunk in response.iter_content(
-                    chunk_size=self.DEFAULT_DOWNLOAD_FLOW_CHUNK_SIZE
-                ):
-                    f.write(chunk)
-                    downloaded += len(chunk)
-
-    def get(self, remote_path):
-        """Get metadata about a path in datasets.
-
-        :param remote_path: path to check
-        :type remote_path: str
-        :return: dataset metadata
-        :rtype: dict
-        """
-        _client = client.get_instance()
-        path_params = ["project", _client._project_id, "dataset", remote_path]
-        headers = {"content-type": "application/json"}
-        return _client._send_request("GET", path_params, headers=headers)
-
-    def path_exists(self, remote_path):
-        """Check if a path exists in datasets.
-
-        :param remote_path: path to check
-        :type remote_path: str
-        :return: boolean whether path exists
-        :rtype: bool
-        """
-        try:
-            self.get(remote_path)
-            return True
-        except RestAPIError:
-            return False
-
-    def list(self, remote_path, sort_by=None, limit=1000):
-        """List all files in a directory in datasets.
-
-        :param remote_path: path to list
-        :type remote_path: str
-        :param sort_by: sort string
-        :type sort_by: str
-        :param limit: max number of returned files
-        :type limit: int
-        """
-        _client = client.get_instance()
-        path_params = ["project", _client._project_id, "dataset", remote_path]
-        query_params = {"action": "listing", "sort_by": sort_by, "limit": limit}
-        headers = {"content-type": "application/json"}
-        return _client._send_request(
-            "GET", path_params, headers=headers, query_params=query_params
-        )
-
-    def chmod(self, remote_path, permissions):
-        """Chmod operation on file or directory in datasets.
-
-        :param remote_path: path to chmod
-        :type remote_path: str
-        :param permissions: permissions string, for example u+x
-        :type permissions: str
-        """
-        _client = client.get_instance()
-        path_params = ["project", _client._project_id, "dataset", remote_path]
-        headers = {"content-type": "application/json"}
-        query_params = {"action": "PERMISSION", "permissions": permissions}
-        return _client._send_request(
-            "PUT", path_params, headers=headers, query_params=query_params
-        )
-
-    def mkdir(self, remote_path):
-        """Path to create in datasets.
-
-        :param remote_path: path to create
-        :type remote_path: str
-        """
-        _client = client.get_instance()
-        path_params = ["project", _client._project_id, "dataset", remote_path]
-        query_params = {
-            "action": "create",
-            "searchable": "true",
-            "generate_readme": "false",
-            "type": "DATASET",
-        }
-        headers = {"content-type": "application/json"}
-        return _client._send_request(
-            "POST", path_params, headers=headers, query_params=query_params
-        )
-
-    def rm(self, remote_path):
-        """Remove a path in datasets.
-
-        :param remote_path: path to remove
-        :type remote_path: str
-        """
-        _client = client.get_instance()
-        path_params = ["project", _client._project_id, "dataset", remote_path]
-        return _client._send_request("DELETE", path_params)
-
-    def _archive(
-        self,
-        remote_path: str,
-        destination_path: Optional[str] = None,
-        block: bool = False,
-        timeout: Optional[int] = 120,
-        action: Union[Literal["unzip"], Literal["zip"]] = "unzip",
-    ):
-        """Internal (de)compression logic.
-
-        # Arguments
-            remote_path: path to file or directory to unzip.
-            destination_path: path to upload the zip, defaults to None; is used only if action is zip.
-            block: if the operation should be blocking until complete, defaults to False.
-            timeout: timeout in seconds for the blocking, defaults to 120; if None, the blocking is unbounded.
-            action: zip or unzip, defaults to unzip.
-
-        # Returns
-            `bool`: whether the operation completed in the specified timeout; if non-blocking, always returns True.
-        """
-
-        _client = client.get_instance()
-        path_params = ["project", _client._project_id, "dataset", remote_path]
-
-        query_params = {"action": action}
-
-        if destination_path is not None:
-            query_params["destination_path"] = destination_path
-            query_params["destination_type"] = "DATASET"
-
-        headers = {"content-type": "application/json"}
-
-        _client._send_request(
-            "POST", path_params, headers=headers, query_params=query_params
-        )
-
-        if not block:
-            # the call is successful at this point if we don't want to block
-            return True
-
-        # Wait for zip file to appear. When it does, check that parent dir zipState is not set to CHOWNING
-        count = 0
-        while timeout is None:
-            if action == "zip":
-                zip_path = remote_path + ".zip"
-                # Get the status of the zipped file
-                if destination_path is None:
-                    zip_exists = self.path_exists(zip_path)
-                else:
-                    zip_exists = self.path_exists(
-                        destination_path + "/" + os.path.split(zip_path)[1]
-                    )
-                # Get the zipState of the directory being zipped
-                dir_status = self.get(remote_path)
-                zip_state = dir_status["zipState"] if "zipState" in dir_status else None
-                if zip_exists and zip_state == "NONE":
-                    return True
-            elif action == "unzip":
-                # Get the status of the unzipped dir
-                unzipped_dir_exists = self.path_exists(
-                    remote_path[: remote_path.index(".")]
-                )
-                # Get the zipState of the zip being extracted
-                dir_status = self.get(remote_path)
-                zip_state = dir_status["zipState"] if "zipState" in dir_status else None
-                if unzipped_dir_exists and zip_state == "NONE":
-                    return True
-            time.sleep(1)
-            count += 1
-            if count >= timeout:
-                self._log.info(
-                    f"Timeout of {timeout} seconds exceeded while {action} {remote_path}."
-                )
-                return False
-
-    def unzip(
-        self, remote_path: str, block: bool = False, timeout: Optional[int] = 120
-    ):
-        """Unzip an archive in the dataset.
-
-        # Arguments
-            remote_path: path to file or directory to unzip.
-            block: if the operation should be blocking until complete, defaults to False.
-            timeout: timeout in seconds for the blocking, defaults to 120; if None, the blocking is unbounded.
-
-        # Returns
-            `bool`: whether the operation completed in the specified timeout; if non-blocking, always returns True.
-        """
-        return self._archive(remote_path, block=block, timeout=timeout, action="unzip")
-
-    def zip(
-        self,
-        remote_path: str,
-        destination_path: Optional[str] = None,
-        block: bool = False,
-        timeout: Optional[int] = 120,
-    ):
-        """Zip a file or directory in the dataset.
-
-        # Arguments
-            remote_path: path to file or directory to unzip.
-            destination_path: path to upload the zip, defaults to None.
-            block: if the operation should be blocking until complete, defaults to False.
-            timeout: timeout in seconds for the blocking, defaults to 120; if None, the blocking is unbounded.
-
-        # Returns
-            `bool`: whether the operation completed in the specified timeout; if non-blocking, always returns True.
-        """
-        return self._archive(
-            remote_path,
-            destination_path=destination_path,
-            block=block,
-            timeout=timeout,
-            action="zip",
-        )
-
-    def move(self, source_path, destination_path):
-        """Move a file or directory in the dataset.
-
-        A tag consists of a name/value pair. Tag names are unique identifiers.
-        The value of a tag can be any valid json - primitives, arrays or json objects.
-
-        :param source_path: path to file or directory to move
-        :type source_path: str
-        :param destination_path: destination path
-        :type destination_path: str
-        """
-
-        _client = client.get_instance()
-        path_params = ["project", _client._project_id, "dataset", source_path]
-
-        query_params = {"action": "move", "destination_path": destination_path}
-        headers = {"content-type": "application/json"}
-
-        _client._send_request(
-            "POST", path_params, headers=headers, query_params=query_params
-        )
-
-    def copy(self, source_path, destination_path):
-        """Copy a file or directory in the dataset.
-
-        A tag consists of a name/value pair. Tag names are unique identifiers.
-        The value of a tag can be any valid json - primitives, arrays or json objects.
-
-        :param source_path: path to file or directory to copy
-        :type source_path: str
-        :param destination_path: destination path
-        :type destination_path: str
-        """
-
-        _client = client.get_instance()
-        path_params = ["project", _client._project_id, "dataset", source_path]
-
-        query_params = {"action": "copy", "destination_path": destination_path}
-        headers = {"content-type": "application/json"}
-
-        _client._send_request(
-            "POST", path_params, headers=headers, query_params=query_params
-        )
-
-    def add(self, path, name, value):
-        """Attach a name/value tag to a model.
-
-        A tag consists of a name/value pair. Tag names are unique identifiers.
-        The value of a tag can be any valid json - primitives, arrays or json objects.
-
-        :param path: path to add the tag
-        :type path: str
-        :param name: name of the tag to be added
-        :type name: str
-        :param value: value of the tag to be added
-        :type value: str
-        """
-        _client = client.get_instance()
-        path_params = [
-            "project",
-            _client._project_id,
-            "dataset",
-            "tags",
-            "schema",
-            name,
-            path,
-        ]
-        headers = {"content-type": "application/json"}
-        json_value = json.dumps(value)
-        _client._send_request("PUT", path_params, headers=headers, data=json_value)
-
-    def delete(self, path, name):
-        """Delete a tag.
-
-        Tag names are unique identifiers.
-
-        :param path: path to delete the tags
-        :type path: str
-        :param name: name of the tag to be removed
-        :type name: str
-        """
-        _client = client.get_instance()
-        path_params = [
-            "project",
-            _client._project_id,
-            "dataset",
-            "tags",
-            "schema",
-            name,
-            path,
-        ]
-        _client._send_request("DELETE", path_params)
-
-    def get_tags(self, path, name: str = None):
-        """Get the tags.
-
-        Gets all tags if no tag name is specified.
-
-        :param path: path to get the tags
-        :type path: str
-        :param name: tag name
-        :type name: str
-        :return: dict of tag name/values
-        :rtype: dict
-        """
-        _client = client.get_instance()
-        path_params = [
-            "project",
-            _client._project_id,
-            "dataset",
-            "tags",
-        ]
-
-        if name is not None:
-            path_params.append("schema")
-            path_params.append(name)
-        else:
-            path_params.append("all")
-
-        path_params.append(path)
-
-        return {
-            tag._name: json.loads(tag._value)
-            for tag in tag.Tag.from_response_json(
-                _client._send_request("GET", path_params)
-            )
-        }
+__all__ = [
+    "Chunk",
+    "DatasetApi",
+]
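`hsml.core.dataset_api` is now a thin re-export, so existing imports keep working while resolving to the shared implementation. A quick check, assuming both packages are installed from this repository:

```python
from hopsworks_common.core.dataset_api import DatasetApi as CommonDatasetApi
from hsml.core.dataset_api import DatasetApi

assert DatasetApi is CommonDatasetApi  # same class, merely re-exported
```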
diff --git a/python/hsml/deployable_component.py b/python/hsml/deployable_component.py
index cdd05e86c..6dd211fdc 100644
--- a/python/hsml/deployable_component.py
+++ b/python/hsml/deployable_component.py
@@ -18,8 +18,8 @@
 from typing import Optional, Union
 
 import humps
-from hsml import util
-from hsml.constants import Default
+from hopsworks_common import util
+from hopsworks_common.constants import Default
 from hsml.inference_batcher import InferenceBatcher
 from hsml.resources import Resources
 
@@ -53,7 +53,7 @@ def from_response_json(cls, json_dict):
         return cls.from_json(json_decamelized)
 
     def json(self):
-        return json.dumps(self, cls=util.MLEncoder)
+        return json.dumps(self, cls=util.Encoder)
 
     @abstractmethod
     def update_from_response_json(self, json_dict):
diff --git a/python/hsml/deployable_component_logs.py b/python/hsml/deployable_component_logs.py
index 7035030a4..18f91c38d 100644
--- a/python/hsml/deployable_component_logs.py
+++ b/python/hsml/deployable_component_logs.py
@@ -16,7 +16,7 @@
 from datetime import datetime
 
 import humps
-from hsml import util
+from hopsworks_common import util
 
 
 class DeployableComponentLogs:
diff --git a/python/hsml/deployment.py b/python/hsml/deployment.py
index f23eec2ed..9c98b4e94 100644
--- a/python/hsml/deployment.py
+++ b/python/hsml/deployment.py
@@ -15,8 +15,7 @@
 
 from typing import Dict, List, Optional, Union
 
-from hopsworks_common import usage
-from hsml import client, util
+from hopsworks_common import client, usage, util
 from hsml import predictor as predictor_mod
 from hsml.client.exceptions import ModelServingException
 from hsml.client.istio.utils.infer_type import InferInput
diff --git a/python/hsml/engine/model_engine.py b/python/hsml/engine/model_engine.py
index 1cf0ba99f..4165c801b 100644
--- a/python/hsml/engine/model_engine.py
+++ b/python/hsml/engine/model_engine.py
@@ -20,9 +20,10 @@
 import time
 import uuid
 
-from hsml import client, constants, util
-from hsml.client.exceptions import ModelRegistryException, RestAPIError
-from hsml.core import dataset_api, model_api
+from hopsworks_common import client, constants, util
+from hopsworks_common.client.exceptions import ModelRegistryException, RestAPIError
+from hopsworks_common.core import dataset_api
+from hsml.core import model_api
 from hsml.engine import local_engine
 from tqdm.auto import tqdm
 
diff --git a/python/hsml/engine/serving_engine.py b/python/hsml/engine/serving_engine.py
index 164ac7504..4bc377ae5 100644
--- a/python/hsml/engine/serving_engine.py
+++ b/python/hsml/engine/serving_engine.py
@@ -20,18 +20,17 @@
 import uuid
 from typing import Dict, List, Union
 
-from hsml import constants
-from hsml.client.exceptions import ModelServingException, RestAPIError
-from hsml.client.istio.utils.infer_type import InferInput
-from hsml.constants import (
+from hopsworks_common.client.exceptions import ModelServingException, RestAPIError
+from hopsworks_common.client.istio.utils.infer_type import InferInput
+from hopsworks_common.constants import (
     DEPLOYMENT,
+    MODEL_SERVING,
     PREDICTOR,
     PREDICTOR_STATE,
 )
-from hsml.constants import (
-    INFERENCE_ENDPOINTS as IE,
-)
-from hsml.core import dataset_api, serving_api
+from hopsworks_common.constants import INFERENCE_ENDPOINTS as IE
+from hopsworks_common.core import dataset_api
+from hsml.core import serving_api
 from hsml.engine import local_engine
 from tqdm.auto import tqdm
 
@@ -328,7 +327,7 @@ def _download_files_from_hopsfs_recursive(
         if path_attr.get("dir", False):
             # otherwise, make a recursive call for the folder
             if (
-                basename == constants.MODEL_SERVING.ARTIFACTS_DIR_NAME
+                basename == MODEL_SERVING.ARTIFACTS_DIR_NAME
             ):  # TODO: Not needed anymore
                 continue  # skip Artifacts subfolder
             local_folder_path = os.path.join(to_local_path, basename)
@@ -383,7 +382,7 @@ def download_artifact_files(self, deployment_instance, local_path=None):
             str(uuid.uuid4()),
             deployment_instance.model_name,
             str(deployment_instance.model_version),
-            constants.MODEL_SERVING.ARTIFACTS_DIR_NAME,
+            MODEL_SERVING.ARTIFACTS_DIR_NAME,
             str(deployment_instance.artifact_version),
         )
         os.makedirs(local_path, exist_ok=True)
diff --git a/python/hsml/inference_batcher.py b/python/hsml/inference_batcher.py
index 265615c56..575d4a306 100644
--- a/python/hsml/inference_batcher.py
+++ b/python/hsml/inference_batcher.py
@@ -17,8 +17,8 @@
 from typing import Optional
 
 import humps
-from hsml import util
-from hsml.constants import INFERENCE_BATCHER
+from hopsworks_common import util
+from hopsworks_common.constants import INFERENCE_BATCHER
 
 
 class InferenceBatcher:
@@ -84,7 +84,7 @@ def update_from_response_json(self, json_dict):
         return self
 
     def json(self):
-        return json.dumps(self, cls=util.MLEncoder)
+        return json.dumps(self, cls=util.Encoder)
 
     def to_dict(self):
         json = {"batchingEnabled": self._enabled}
diff --git a/python/hsml/inference_endpoint.py b/python/hsml/inference_endpoint.py
index af031dbf5..673dbdc2d 100644
--- a/python/hsml/inference_endpoint.py
+++ b/python/hsml/inference_endpoint.py
@@ -17,7 +17,7 @@
 from typing import List, Optional
 
 import humps
-from hsml import util
+from hopsworks_common import util
 
 
 class InferenceEndpointPort:
diff --git a/python/hsml/inference_logger.py b/python/hsml/inference_logger.py
index 5377eca3d..e7ad42340 100644
--- a/python/hsml/inference_logger.py
+++ b/python/hsml/inference_logger.py
@@ -17,9 +17,9 @@
 from typing import Optional, Union
 
 import humps
-from hsml import util
-from hsml.constants import DEFAULT, INFERENCE_LOGGER, Default
-from hsml.kafka_topic import KafkaTopic
+from hopsworks_common import util
+from hopsworks_common.constants import DEFAULT, INFERENCE_LOGGER, Default
+from hopsworks_common.kafka_topic import KafkaTopic
 
 
 class InferenceLogger:
@@ -94,7 +94,7 @@ def update_from_response_json(self, json_dict):
         return self
 
     def json(self):
-        return json.dumps(self, cls=util.MLEncoder)
+        return json.dumps(self, cls=util.Encoder)
 
     def to_dict(self):
         json = {"inferenceLogging": self._mode}
diff --git a/python/hsml/kafka_topic.py b/python/hsml/kafka_topic.py
index 9dce0bb56..3d453abc9 100644
--- a/python/hsml/kafka_topic.py
+++ b/python/hsml/kafka_topic.py
@@ -13,125 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import json
-from typing import Optional
+from hopsworks_common.kafka_topic import (
+    KafkaTopic,
+)
 
-import humps
-from hsml import util
-from hsml.constants import KAFKA_TOPIC
-
-
-class KafkaTopic:
-    """Configuration for a Kafka topic."""
-
-    def __init__(
-        self,
-        name: str = KAFKA_TOPIC.CREATE,
-        num_replicas: Optional[int] = None,
-        num_partitions: Optional[int] = None,
-        **kwargs,
-    ):
-        self._name = name
-        self._num_replicas, self._num_partitions = self._validate_topic_config(
-            self._name, num_replicas, num_partitions
-        )
-
-    def describe(self):
-        util.pretty_print(self)
-
-    @classmethod
-    def _validate_topic_config(cls, name, num_replicas, num_partitions):
-        if name is not None and name != KAFKA_TOPIC.NONE:
-            if name == KAFKA_TOPIC.CREATE:
-                if num_replicas is None:
-                    print(
-                        "Setting number of replicas to default value '{}'".format(
-                            KAFKA_TOPIC.NUM_REPLICAS
-                        )
-                    )
-                    num_replicas = KAFKA_TOPIC.NUM_REPLICAS
-                if num_partitions is None:
-                    print(
-                        "Setting number of partitions to default value '{}'".format(
-                            KAFKA_TOPIC.NUM_PARTITIONS
-                        )
-                    )
-                    num_partitions = KAFKA_TOPIC.NUM_PARTITIONS
-            else:
-                if num_replicas is not None or num_partitions is not None:
-                    raise ValueError(
-                        "Number of replicas or partitions cannot be changed in existing kafka topics."
-                    )
-        elif name is None or name == KAFKA_TOPIC.NONE:
-            num_replicas = None
-            num_partitions = None
-
-        return num_replicas, num_partitions
-
-    @classmethod
-    def from_response_json(cls, json_dict):
-        json_decamelized = humps.decamelize(json_dict)
-        return cls.from_json(json_decamelized)
-
-    @classmethod
-    def from_json(cls, json_decamelized):
-        return KafkaTopic(**cls.extract_fields_from_json(json_decamelized))
-
-    @classmethod
-    def extract_fields_from_json(cls, json_decamelized):
-        kwargs = {}
-        kwargs["name"] = json_decamelized.pop("name")  # required
-        kwargs["num_replicas"] = util.extract_field_from_json(
-            json_decamelized, ["num_of_replicas", "num_replicas"]
-        )
-        kwargs["num_partitions"] = util.extract_field_from_json(
-            json_decamelized, ["num_of_partitions", "num_partitions"]
-        )
-        return kwargs
-
-    def update_from_response_json(self, json_dict):
-        json_decamelized = humps.decamelize(json_dict)
-        self.__init__(**self.extract_fields_from_json(json_decamelized))
-        return self
-
-    def json(self):
-        return json.dumps(self, cls=util.MLEncoder)
-
-    def to_dict(self):
-        return {
-            "kafkaTopicDTO": {
-                "name": self._name,
-                "numOfReplicas": self._num_replicas,
-                "numOfPartitions": self._num_partitions,
-            }
-        }
-
-    @property
-    def name(self):
-        """Name of the Kafka topic."""
-        return self._name
-
-    @name.setter
-    def name(self, name: str):
-        self._name = name
-
-    @property
-    def num_replicas(self):
-        """Number of replicas of the Kafka topic."""
-        return self._num_replicas
-
-    @num_replicas.setter
-    def num_replicas(self, num_replicas: int):
-        self._num_replicas = num_replicas
-
-    @property
-    def num_partitions(self):
-        """Number of partitions of the Kafka topic."""
-        return self._num_partitions
-
-    @num_partitions.setter
-    def topic_num_partitions(self, num_partitions: int):
-        self._num_partitions = num_partitions
-
-    def __repr__(self):
-        return f"KafkaTopic({self._name!r})"
+__all__ = [
+    "KafkaTopic",
+]
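The same re-export pattern applies here, so hsml callers transparently pick up the shared `hopsworks_common.kafka_topic.KafkaTopic`, including its `num_of_*` compatibility arguments. For instance:

```python
from hopsworks_common.kafka_topic import KafkaTopic as CommonKafkaTopic
from hsml.kafka_topic import KafkaTopic

assert KafkaTopic is CommonKafkaTopic
print(KafkaTopic(name="my-existing-topic").name)
```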
diff --git a/python/hsml/model.py b/python/hsml/model.py
index 2dfa7d7ab..2c897b50d 100644
--- a/python/hsml/model.py
+++ b/python/hsml/model.py
@@ -22,10 +22,9 @@
 from typing import Any, Dict, Optional, Union
 
 import humps
-from hopsworks_common import usage
-from hsml import client, constants, util
-from hsml.constants import ARTIFACT_VERSION
-from hsml.constants import INFERENCE_ENDPOINTS as IE
+from hopsworks_common import client, usage, util
+from hopsworks_common.constants import ARTIFACT_VERSION, MODEL_REGISTRY
+from hopsworks_common.constants import INFERENCE_ENDPOINTS as IE
 from hsml.core import explicit_provenance
 from hsml.engine import model_engine
 from hsml.inference_batcher import InferenceBatcher
@@ -388,7 +387,7 @@ def update_from_response_json(self, json_dict):
         return self
 
     def json(self):
-        return json.dumps(self, cls=util.MLEncoder)
+        return json.dumps(self, cls=util.Encoder)
 
     def to_dict(self):
         return {
@@ -568,7 +567,7 @@ def model_files_path(self):
         """path of the model files including version and files folder. Resolves to /Projects/{project_name}/Models/{name}/{version}/Files"""
         return "{}/{}".format(
             self.version_path,
-            constants.MODEL_REGISTRY.MODEL_FILES_DIR_NAME,
+            MODEL_REGISTRY.MODEL_FILES_DIR_NAME,
         )
 
     @property
diff --git a/python/hsml/model_registry.py b/python/hsml/model_registry.py
index 70b90b989..9309eb7c4 100644
--- a/python/hsml/model_registry.py
+++ b/python/hsml/model_registry.py
@@ -17,8 +17,7 @@
 import warnings
 
 import humps
-from hopsworks_common import usage
-from hsml import util
+from hopsworks_common import usage, util
 from hsml.core import model_api
 from hsml.llm import signature as llm_signature  # noqa: F401
 from hsml.python import signature as python_signature  # noqa: F401
diff --git a/python/hsml/model_serving.py b/python/hsml/model_serving.py
index 9b3c34ba5..2d24d2b20 100644
--- a/python/hsml/model_serving.py
+++ b/python/hsml/model_serving.py
@@ -17,10 +17,9 @@
 import os
 from typing import Optional, Union
 
-from hopsworks_common import usage
-from hsml import util
-from hsml.constants import ARTIFACT_VERSION, PREDICTOR_STATE
-from hsml.constants import INFERENCE_ENDPOINTS as IE
+from hopsworks_common import usage, util
+from hopsworks_common.constants import ARTIFACT_VERSION, PREDICTOR_STATE
+from hopsworks_common.constants import INFERENCE_ENDPOINTS as IE
 from hsml.core import serving_api
 from hsml.deployment import Deployment
 from hsml.inference_batcher import InferenceBatcher
diff --git a/python/hsml/predictor.py b/python/hsml/predictor.py
index 236b7cb20..31c6aa138 100644
--- a/python/hsml/predictor.py
+++ b/python/hsml/predictor.py
@@ -17,15 +17,17 @@
 from typing import Optional, Union
 
 import humps
-from hsml import client, constants, deployment, util
-from hsml.constants import (
+from hopsworks_common import client, util
+from hopsworks_common.constants import (
     ARTIFACT_VERSION,
     INFERENCE_ENDPOINTS,
     MODEL,
+    MODEL_SERVING,
     PREDICTOR,
     RESOURCES,
     Default,
 )
+from hsml import deployment
 from hsml.deployable_component import DeployableComponent
 from hsml.inference_batcher import InferenceBatcher
 from hsml.inference_logger import InferenceLogger
@@ -294,7 +296,7 @@ def update_from_response_json(self, json_dict):
         return self
 
     def json(self):
-        return json.dumps(self, cls=util.MLEncoder)
+        return json.dumps(self, cls=util.Encoder)
 
     def to_dict(self):
         json = {
@@ -400,7 +402,7 @@ def artifact_files_path(self):
         return "{}/{}/{}/{}".format(
             self._model_path,
             str(self._model_version),
-            constants.MODEL_SERVING.ARTIFACTS_DIR_NAME,
+            MODEL_SERVING.ARTIFACTS_DIR_NAME,
             str(self._artifact_version),
         )
 
diff --git a/python/hsml/predictor_state.py b/python/hsml/predictor_state.py
index b145993e1..70a92fd1b 100644
--- a/python/hsml/predictor_state.py
+++ b/python/hsml/predictor_state.py
@@ -16,7 +16,7 @@
 from typing import Optional
 
 import humps
-from hsml import util
hopsworks_common import util from hsml.predictor_state_condition import PredictorStateCondition diff --git a/python/hsml/predictor_state_condition.py b/python/hsml/predictor_state_condition.py index cf1c58934..5be445f3f 100644 --- a/python/hsml/predictor_state_condition.py +++ b/python/hsml/predictor_state_condition.py @@ -17,7 +17,7 @@ from typing import Optional import humps -from hsml import util +from hopsworks_common import util class PredictorStateCondition: @@ -60,7 +60,7 @@ def update_from_response_json(self, json_dict): return self def json(self): - return json.dumps(self, cls=util.MLEncoder) + return json.dumps(self, cls=util.Encoder) def to_dict(self): return { diff --git a/python/hsml/resources.py b/python/hsml/resources.py index a28f72d6b..613c378eb 100644 --- a/python/hsml/resources.py +++ b/python/hsml/resources.py @@ -18,8 +18,8 @@ from typing import Optional, Union import humps -from hsml import client, util -from hsml.constants import RESOURCES, Default +from hopsworks_common import client, util +from hopsworks_common.constants import RESOURCES, Default class Resources: @@ -66,7 +66,7 @@ def extract_fields_from_json(cls, json_decamelized): return kwargs def json(self): - return json.dumps(self, cls=util.MLEncoder) + return json.dumps(self, cls=util.Encoder) def to_dict(self): return {"cores": self._cores, "memory": self._memory, "gpus": self._gpus} @@ -301,7 +301,7 @@ def extract_fields_from_json(cls, json_decamelized): return kwargs def json(self): - return json.dumps(self, cls=util.MLEncoder) + return json.dumps(self, cls=util.Encoder) @abstractmethod def to_dict(self): diff --git a/python/hsml/tag.py b/python/hsml/tag.py index aecf2ed74..155279285 100644 --- a/python/hsml/tag.py +++ b/python/hsml/tag.py @@ -14,64 +14,11 @@ # limitations under the License. 
 #
 
-import json
+from hopsworks_common.tag import (
+    Tag,
+)
 
-import humps
-from hsml import util
-
-
-class Tag:
-    def __init__(
-        self,
-        name,
-        value,
-        schema=None,
-        href=None,
-        expand=None,
-        items=None,
-        count=None,
-        type=None,
-        **kwargs,
-    ):
-        self._name = name
-        self._value = value
-
-    def to_dict(self):
-        return {
-            "name": self._name,
-            "value": self._value,
-        }
-
-    def json(self):
-        return json.dumps(self, cls=util.MLEncoder)
-
-    @classmethod
-    def from_response_json(cls, json_dict):
-        json_decamelized = humps.decamelize(json_dict)
-        if "count" not in json_decamelized or json_decamelized["count"] == 0:
-            return []
-        return [cls(**tag) for tag in json_decamelized["items"]]
-
-    @property
-    def name(self):
-        """Name of the tag."""
-        return self._name
-
-    @name.setter
-    def name(self, name):
-        self._name = name
-
-    @property
-    def value(self):
-        """Value of tag."""
-        return self._value
-
-    @value.setter
-    def value(self, value):
-        self._value = value
-
-    def __str__(self):
-        return self.json()
-
-    def __repr__(self):
-        return f"Tag({self._name!r}, {self._value!r})"
+__all__ = [
+    "Tag",
+]
diff --git a/python/hsml/transformer.py b/python/hsml/transformer.py
index ce4f43b99..8cb1ebdce 100644
--- a/python/hsml/transformer.py
+++ b/python/hsml/transformer.py
@@ -16,8 +16,8 @@
 from typing import Optional, Union
 
 import humps
-from hsml import client, util
-from hsml.constants import RESOURCES, Default
+from hopsworks_common import client, util
+from hopsworks_common.constants import RESOURCES, Default
 from hsml.deployable_component import DeployableComponent
 from hsml.resources import TransformerResources
diff --git a/python/hsml/util.py b/python/hsml/util.py
index 461793ebf..41471bcc8 100644
--- a/python/hsml/util.py
+++ b/python/hsml/util.py
@@ -14,348 +14,51 @@
 # limitations under the License.
 #
 
-from __future__ import annotations
-
-import datetime
-import inspect
-import os
-import shutil
-from json import JSONEncoder, dumps
-from urllib.parse import urljoin, urlparse
-
-import humps
-import numpy as np
-import pandas as pd
-from hsml import client
-from hsml.constants import MODEL, PREDICTOR, Default
-from six import string_types
-
-
-class VersionWarning(Warning):
-    pass
-
-
-class ProvenanceWarning(Warning):
-    pass
-
-
-class MLEncoder(JSONEncoder):
-    def default(self, obj):
-        try:
-            return obj.to_dict()
-        except AttributeError:
-            return super().default(obj)
-
-
-class NumpyEncoder(JSONEncoder):
-    """Special json encoder for numpy types.
-    Note that some numpy types doesn't have native python equivalence,
-    hence json.dumps will raise TypeError.
-    In this case, you'll need to convert your numpy types into its closest python equivalence.
- """ - - def convert(self, obj): - import base64 - - import numpy as np - import pandas as pd - - def encode_binary(x): - return base64.encodebytes(x).decode("ascii") - - if isinstance(obj, np.ndarray): - if obj.dtype == np.object: - return [self.convert(x)[0] for x in obj.tolist()] - elif obj.dtype == np.bytes_: - return np.vectorize(encode_binary)(obj), True - else: - return obj.tolist(), True - - if isinstance(obj, (pd.Timestamp, datetime.date)): - return obj.isoformat(), True - if isinstance(obj, bytes) or isinstance(obj, bytearray): - return encode_binary(obj), True - if isinstance(obj, np.generic): - return obj.item(), True - if isinstance(obj, np.datetime64): - return np.datetime_as_string(obj), True - return obj, False - - def default(self, obj): # pylint: disable=E0202 - res, converted = self.convert(obj) - if converted: - return res - else: - return super().default(obj) - - -# Model registry - -# - schema and types - - -def set_model_class(model): - from hsml.llm.model import Model as LLMModel - from hsml.model import Model as BaseModel - from hsml.python.model import Model as PyModel - from hsml.sklearn.model import Model as SkLearnModel - from hsml.tensorflow.model import Model as TFModel - from hsml.torch.model import Model as TorchModel - - if "href" in model: - _ = model.pop("href") - if "type" in model: # backwards compatibility - _ = model.pop("type") - if "tags" in model: - _ = model.pop("tags") # tags are always retrieved from backend - - if "framework" not in model: - return BaseModel(**model) - - framework = model.pop("framework") - if framework == MODEL.FRAMEWORK_TENSORFLOW: - return TFModel(**model) - if framework == MODEL.FRAMEWORK_TORCH: - return TorchModel(**model) - if framework == MODEL.FRAMEWORK_SKLEARN: - return SkLearnModel(**model) - elif framework == MODEL.FRAMEWORK_PYTHON: - return PyModel(**model) - elif framework == MODEL.FRAMEWORK_LLM: - return LLMModel(**model) - else: - raise ValueError( - "framework {} is not a supported framework".format(str(framework)) - ) - - -def input_example_to_json(input_example): - if isinstance(input_example, np.ndarray): - if input_example.size > 0: - return _handle_tensor_input(input_example) - else: - raise ValueError( - "input_example of type {} can not be empty".format(type(input_example)) - ) - elif isinstance(input_example, dict): - return _handle_dict_input(input_example) - else: - return _handle_dataframe_input(input_example) - - -def _handle_tensor_input(input_tensor): - return input_tensor.tolist() - - -def _handle_dataframe_input(input_ex): - if isinstance(input_ex, pd.DataFrame): - if not input_ex.empty: - return input_ex.iloc[0].tolist() - else: - raise ValueError( - "input_example of type {} can not be empty".format(type(input_ex)) - ) - elif isinstance(input_ex, pd.Series): - if not input_ex.empty: - return input_ex.tolist() - else: - raise ValueError( - "input_example of type {} can not be empty".format(type(input_ex)) - ) - elif isinstance(input_ex, list): - if len(input_ex) > 0: - return input_ex - else: - raise ValueError( - "input_example of type {} can not be empty".format(type(input_ex)) - ) - else: - raise TypeError( - "{} is not a supported input example type".format(type(input_ex)) - ) - - -def _handle_dict_input(input_ex): - return input_ex - - -# - artifacts - - -def compress(archive_out_path, archive_name, path_to_archive): - if os.path.isdir(path_to_archive): - return shutil.make_archive( - os.path.join(archive_out_path, archive_name), "gztar", path_to_archive - ) - else: - return 
shutil.make_archive( - os.path.join(archive_out_path, archive_name), - "gztar", - os.path.dirname(path_to_archive), - os.path.basename(path_to_archive), - ) - - -def decompress(archive_file_path, extract_dir=None): - return shutil.unpack_archive(archive_file_path, extract_dir=extract_dir) - - -# - export models - - -def validate_metrics(metrics): - if metrics is not None: - if not isinstance(metrics, dict): - raise TypeError( - "provided metrics is of instance {}, expected a dict".format( - type(metrics) - ) - ) - - for metric in metrics: - # Validate key is a string - if not isinstance(metric, string_types): - raise TypeError( - "provided metrics key is of instance {}, expected a string".format( - type(metric) - ) - ) - # Validate value is a number - try: - float(metrics[metric]) - except ValueError as err: - raise ValueError( - "{} is not a number, only numbers can be attached as metadata for models.".format( - str(metrics[metric]) - ) - ) from err - - -# Model serving - - -def get_predictor_for_model(model, **kwargs): - from hsml.llm.model import Model as LLMModel - from hsml.llm.predictor import Predictor as vLLMPredictor - from hsml.model import Model as BaseModel - from hsml.predictor import Predictor as BasePredictor - from hsml.python.model import Model as PyModel - from hsml.python.predictor import Predictor as PyPredictor - from hsml.sklearn.model import Model as SkLearnModel - from hsml.sklearn.predictor import Predictor as SkLearnPredictor - from hsml.tensorflow.model import Model as TFModel - from hsml.tensorflow.predictor import Predictor as TFPredictor - from hsml.torch.model import Model as TorchModel - from hsml.torch.predictor import Predictor as TorchPredictor - - if not isinstance(model, BaseModel): - raise ValueError( - "model is of type {}, but an instance of {} class is expected".format( - type(model), BaseModel - ) - ) - - if type(model) is TFModel: - return TFPredictor(**kwargs) - if type(model) is TorchModel: - return TorchPredictor(**kwargs) - if type(model) is SkLearnModel: - return SkLearnPredictor(**kwargs) - if type(model) is PyModel: - return PyPredictor(**kwargs) - if type(model) is LLMModel: - return vLLMPredictor(**kwargs) - if type(model) is BaseModel: - return BasePredictor( # python as default framework and model server - model_framework=MODEL.FRAMEWORK_PYTHON, - model_server=PREDICTOR.MODEL_SERVER_PYTHON, - **kwargs, - ) - - -def get_hostname_replaced_url(sub_path: str): - """ - construct and return an url with public hopsworks hostname and sub path - :param self: - :param sub_path: url sub-path after base url - :return: href url - """ - href = urljoin(client.get_instance()._base_url, sub_path) - url_parsed = client.get_instance().replace_public_host(urlparse(href)) - return url_parsed.geturl() - - -# General - - -def pretty_print(obj): - if isinstance(obj, list): - for logs in obj: - pretty_print(logs) - else: - json_decamelized = humps.decamelize(obj.to_dict()) - print(dumps(json_decamelized, indent=4, sort_keys=True)) - - -def get_members(cls, prefix=None): - for m in inspect.getmembers(cls, lambda m: not (inspect.isroutine(m))): - n = m[0] # name - if (prefix is not None and n.startswith(prefix)) or ( - prefix is None and not (n.startswith("__") and n.endswith("__")) - ): - yield m[1] # value - - -# - json - - -def extract_field_from_json(obj, fields, default=None, as_instance_of=None): - if isinstance(fields, list): - for field in fields: - value = extract_field_from_json(obj, field, default, as_instance_of) - if value is not None: - break - 
else: - value = obj.pop(fields) if fields in obj else default - if as_instance_of is not None: - if isinstance(value, list): - # if the field is a list, get all obj - value = [ - get_obj_from_json(obj=subvalue, cls=as_instance_of) - for subvalue in value - ] - else: - # otherwise, get single obj - value = get_obj_from_json(obj=value, cls=as_instance_of) - return value - - -def get_obj_from_json(obj, cls): - if obj is not None: - if isinstance(obj, cls): - return obj - if isinstance(obj, dict): - return cls.from_json(obj) - if isinstance(obj, Default): - return cls() - raise ValueError( - "Object of type {} cannot be converted to class {}".format(type(obj), cls) - ) - return obj - - -def feature_view_to_json(obj): - if obj is None: - return None - import importlib.util - - if importlib.util.find_spec("hsfs"): - from hsfs import feature_view - - if isinstance(obj, feature_view.FeatureView): - import json - - import humps - - return humps.camelize(json.loads(obj.json())) - return None +from hopsworks_common.util import ( + Encoder, + NumpyEncoder, + ProvenanceWarning, + VersionWarning, + _handle_dataframe_input, + _handle_dict_input, + _handle_tensor_input, + compress, + decompress, + extract_field_from_json, + feature_view_to_json, + get_hostname_replaced_url, + get_members, + get_obj_from_json, + get_predictor_for_model, + input_example_to_json, + pretty_print, + set_model_class, + validate_metrics, +) + + +MLEncoder = Encoder + + +__all__ = [ + "Encoder", + "MLEncoder", + "NumpyEncoder", + "ProvenanceWarning", + "VersionWarning", + "_handle_dataframe_input", + "_handle_dict_input", + "_handle_tensor_input", + "compress", + "decompress", + "extract_field_from_json", + "feature_view_to_json", + "get_hostname_replaced_url", + "get_members", + "get_obj_from_json", + "get_predictor_for_model", + "input_example_to_json", + "pretty_print", + "set_model_class", + "validate_metrics", +] diff --git a/python/tests/test_deployable_component.py b/python/tests/test_deployable_component.py index 97ec67018..e781ff0d7 100644 --- a/python/tests/test_deployable_component.py +++ b/python/tests/test_deployable_component.py @@ -40,7 +40,7 @@ def test_from_response_json(self, mocker): def test_constructor_default(self, mocker): # Arrange mock_get_obj_from_json = mocker.patch( - "hsml.util.get_obj_from_json", return_value=None + "hopsworks_common.util.get_obj_from_json", return_value=None ) mock_ib_init = mocker.patch( "hsml.inference_batcher.InferenceBatcher.__init__", return_value=None @@ -73,7 +73,7 @@ def test_constructor_with_params(self, mocker): resources = {} inf_batcher = inference_batcher.InferenceBatcher() mock_get_obj_from_json = mocker.patch( - "hsml.util.get_obj_from_json", return_value=inf_batcher + "hopsworks_common.util.get_obj_from_json", return_value=inf_batcher ) mock_ib_init = mocker.patch( "hsml.inference_batcher.InferenceBatcher.__init__", return_value=None diff --git a/python/tests/test_deployment.py b/python/tests/test_deployment.py index 88cffe9fb..d9494fe62 100644 --- a/python/tests/test_deployment.py +++ b/python/tests/test_deployment.py @@ -636,7 +636,7 @@ def test_get_logs_default(self, mocker, backend_fixtures): p = self._get_dummy_predictor(mocker, backend_fixtures) d = deployment.Deployment(predictor=p) mock_util_get_members = mocker.patch( - "hsml.util.get_members", return_value=["predictor"] + "hopsworks_common.util.get_members", return_value=["predictor"] ) mock_print = mocker.patch("builtins.print") @@ -663,7 +663,7 @@ def test_get_logs_component_valid(self, mocker, 
backend_fixtures): p = self._get_dummy_predictor(mocker, backend_fixtures) d = deployment.Deployment(predictor=p) mock_util_get_members = mocker.patch( - "hsml.util.get_members", return_value=["valid"] + "hopsworks_common.util.get_members", return_value=["valid"] ) mock_print = mocker.patch("builtins.print") @@ -702,7 +702,7 @@ def test_get_logs_tail(self, mocker, backend_fixtures): p = self._get_dummy_predictor(mocker, backend_fixtures) d = deployment.Deployment(predictor=p) mock_util_get_members = mocker.patch( - "hsml.util.get_members", return_value=["predictor"] + "hopsworks_common.util.get_members", return_value=["predictor"] ) mock_print = mocker.patch("builtins.print") @@ -729,7 +729,7 @@ def test_get_logs_no_logs(self, mocker, backend_fixtures): p = self._get_dummy_predictor(mocker, backend_fixtures) d = deployment.Deployment(predictor=p) mock_util_get_members = mocker.patch( - "hsml.util.get_members", return_value=["predictor"] + "hopsworks_common.util.get_members", return_value=["predictor"] ) mock_print = mocker.patch("builtins.print") @@ -760,10 +760,10 @@ class MockClient: path = "/p/" + str(mock_client._project_id) + "/deployments/" + str(d.id) mock_util_get_hostname_replaced_url = mocker.patch( - "hsml.util.get_hostname_replaced_url", return_value="url" + "hopsworks_common.util.get_hostname_replaced_url", return_value="url" ) mock_client_get_instance = mocker.patch( - "hsml.client.get_instance", return_value=mock_client + "hopsworks_common.client.get_instance", return_value=mock_client ) # Act @@ -783,7 +783,7 @@ def _get_dummy_predictor(self, mocker, backend_fixtures): mocker.patch("hsml.predictor.Predictor._validate_serving_tool") mocker.patch("hsml.predictor.Predictor._validate_resources") mocker.patch("hsml.predictor.Predictor._validate_script_file") - mocker.patch("hsml.util.get_obj_from_json") + mocker.patch("hopsworks_common.util.get_obj_from_json") return predictor.Predictor( id=p_json["id"], name=p_json["name"], diff --git a/python/tests/test_inference_logger.py b/python/tests/test_inference_logger.py index 1f137cefa..62360c32a 100644 --- a/python/tests/test_inference_logger.py +++ b/python/tests/test_inference_logger.py @@ -107,7 +107,7 @@ def test_constructor_default(self, mocker): ) default_kt = kafka_topic.KafkaTopic() mock_util_get_obj_from_json = mocker.patch( - "hsml.util.get_obj_from_json", return_value=default_kt + "hopsworks_common.util.get_obj_from_json", return_value=default_kt ) # Act @@ -136,7 +136,7 @@ def test_constructor_mode_all(self, mocker, backend_fixtures): ) default_kt = kafka_topic.KafkaTopic() mock_util_get_obj_from_json = mocker.patch( - "hsml.util.get_obj_from_json", return_value=default_kt + "hopsworks_common.util.get_obj_from_json", return_value=default_kt ) # Act @@ -163,7 +163,7 @@ def test_constructor_mode_inputs(self, mocker, backend_fixtures): ) default_kt = kafka_topic.KafkaTopic() mock_util_get_obj_from_json = mocker.patch( - "hsml.util.get_obj_from_json", return_value=default_kt + "hopsworks_common.util.get_obj_from_json", return_value=default_kt ) # Act @@ -190,7 +190,7 @@ def test_constructor_mode_outputs(self, mocker, backend_fixtures): ) default_kt = kafka_topic.KafkaTopic() mock_util_get_obj_from_json = mocker.patch( - "hsml.util.get_obj_from_json", return_value=default_kt + "hopsworks_common.util.get_obj_from_json", return_value=default_kt ) # Act @@ -220,7 +220,7 @@ def test_constructor_mode_all_and_kafka_topic(self, mocker, backend_fixtures): ) kt = kafka_topic.KafkaTopic(json["kafka_topic"]["name"]) 
         mock_util_get_obj_from_json = mocker.patch(
-            "hsml.util.get_obj_from_json", return_value=kt
+            "hopsworks_common.util.get_obj_from_json", return_value=kt
         )
 
         # Act
@@ -250,7 +250,7 @@ def test_constructor_mode_inputs_and_kafka_topic(self, mocker, backend_fixtures)
         )
         kt = kafka_topic.KafkaTopic(json["kafka_topic"]["name"])
         mock_util_get_obj_from_json = mocker.patch(
-            "hsml.util.get_obj_from_json", return_value=kt
+            "hopsworks_common.util.get_obj_from_json", return_value=kt
         )
 
         # Act
@@ -280,7 +280,7 @@ def test_constructor_mode_outputs_and_kafka_topic(self, mocker, backend_fixtures
         )
         kt = kafka_topic.KafkaTopic(json["kafka_topic"]["name"])
         mock_util_get_obj_from_json = mocker.patch(
-            "hsml.util.get_obj_from_json", return_value=kt
+            "hopsworks_common.util.get_obj_from_json", return_value=kt
         )
 
         # Act
@@ -310,7 +310,7 @@ def test_constructor_mode_none_and_kafka_topic(self, mocker, backend_fixtures):
         )
         kt = kafka_topic.KafkaTopic(json["kafka_topic"]["name"])
         mock_util_get_obj_from_json = mocker.patch(
-            "hsml.util.get_obj_from_json", return_value=kt
+            "hopsworks_common.util.get_obj_from_json", return_value=kt
         )
 
         # Act
diff --git a/python/tests/test_kafka_topic.py b/python/tests/test_kafka_topic.py
index b9ada2a91..fa91388a9 100644
--- a/python/tests/test_kafka_topic.py
+++ b/python/tests/test_kafka_topic.py
@@ -18,8 +18,8 @@
 import humps
 import pytest
-from hsml import kafka_topic
-from hsml.constants import KAFKA_TOPIC
+from hopsworks_common import kafka_topic
+from hopsworks_common.constants import KAFKA_TOPIC
 
 
 class TestKafkaTopic:
@@ -31,7 +31,9 @@ def test_from_response_json_with_name_only(self, mocker, backend_fixtures):
             "response"
         ]["kafka_topic_dto"]
         json_camelized = humps.camelize(json)  # as returned by the backend
-        mock_kt_from_json = mocker.patch("hsml.kafka_topic.KafkaTopic.from_json")
+        mock_kt_from_json = mocker.patch(
+            "hopsworks_common.kafka_topic.KafkaTopic.from_json"
+        )
 
         # Act
         _ = kafka_topic.KafkaTopic.from_response_json(json_camelized)
@@ -45,7 +47,9 @@ def test_from_response_json_with_name_and_config(self, mocker, backend_fixtures)
             "response"
         ]["kafka_topic_dto"]
         json_camelized = humps.camelize(json)  # as returned by the backend
-        mock_kt_from_json = mocker.patch("hsml.kafka_topic.KafkaTopic.from_json")
+        mock_kt_from_json = mocker.patch(
+            "hopsworks_common.kafka_topic.KafkaTopic.from_json"
+        )
 
         # Act
         _ = kafka_topic.KafkaTopic.from_response_json(json_camelized)
@@ -61,10 +65,11 @@ def test_from_json_with_name_only(self, mocker, backend_fixtures):
             "response"
         ]["kafka_topic_dto"]
         mock_kt_extract_fields = mocker.patch(
-            "hsml.kafka_topic.KafkaTopic.extract_fields_from_json", return_value=json
+            "hopsworks_common.kafka_topic.KafkaTopic.extract_fields_from_json",
+            return_value=json,
         )
         mock_kt_init = mocker.patch(
-            "hsml.kafka_topic.KafkaTopic.__init__", return_value=None
+            "hopsworks_common.kafka_topic.KafkaTopic.__init__", return_value=None
         )
 
         # Act
@@ -80,10 +85,11 @@ def test_from_json_with_name_and_config(self, mocker, backend_fixtures):
             "response"
         ]["kafka_topic_dto"]
         mock_kt_extract_fields = mocker.patch(
-            "hsml.kafka_topic.KafkaTopic.extract_fields_from_json", return_value=json
+            "hopsworks_common.kafka_topic.KafkaTopic.extract_fields_from_json",
+            return_value=json,
         )
         mock_kt_init = mocker.patch(
-            "hsml.kafka_topic.KafkaTopic.__init__", return_value=None
+            "hopsworks_common.kafka_topic.KafkaTopic.__init__", return_value=None
         )
 
         # Act
@@ -101,7 +107,7 @@ def test_constructor_existing_with_name_only(self, mocker, backend_fixtures):
             "response"
         ]["kafka_topic_dto"]
         mock_kt_validate_topic_config = mocker.patch(
-            "hsml.kafka_topic.KafkaTopic._validate_topic_config",
+            "hopsworks_common.kafka_topic.KafkaTopic._validate_topic_config",
             return_value=(KAFKA_TOPIC.NUM_REPLICAS, KAFKA_TOPIC.NUM_PARTITIONS),
         )
 
@@ -121,7 +127,7 @@ def test_constructor_existing_with_name_and_config(self, mocker, backend_fixture
             "response"
         ]["kafka_topic_dto"]
         mock_kt_validate_topic_config = mocker.patch(
-            "hsml.kafka_topic.KafkaTopic._validate_topic_config",
+            "hopsworks_common.kafka_topic.KafkaTopic._validate_topic_config",
             return_value=(json["num_replicas"], json["num_partitions"]),
         )
 
diff --git a/python/tests/test_model.py b/python/tests/test_model.py
index 841f3ce6e..44ec19b5b 100644
--- a/python/tests/test_model.py
+++ b/python/tests/test_model.py
@@ -344,10 +344,10 @@ class ClientMock:
             _project_id = 1
 
         mock_client_get_instance = mocker.patch(
-            "hsml.client.get_instance", return_value=ClientMock()
+            "hopsworks_common.client.get_instance", return_value=ClientMock()
         )
         mock_util_get_hostname_replaced_url = mocker.patch(
-            "hsml.util.get_hostname_replaced_url", return_value="full_path"
+            "hopsworks_common.util.get_hostname_replaced_url", return_value="full_path"
         )
         path_arg = "/p/1/models/" + m_json["name"] + "/" + str(m_json["version"])
diff --git a/python/tests/test_predictor.py b/python/tests/test_predictor.py
index 166666baf..a48c3d877 100644
--- a/python/tests/test_predictor.py
+++ b/python/tests/test_predictor.py
@@ -271,6 +271,7 @@ def test_validate_serving_tool_valid(self, mocker):
         self._mock_serving_variables(
             mocker, SERVING_NUM_INSTANCES_NO_LIMIT, is_saas_connection=False
         )
+        mocker.patch("hopsworks_common.client.get_instance")
 
         # Act
         st = predictor.Predictor._validate_serving_tool(PREDICTOR.SERVING_TOOL_DEFAULT)
@@ -283,6 +284,7 @@ def test_validate_serving_tool_invalid(self, mocker):
         self._mock_serving_variables(
             mocker, SERVING_NUM_INSTANCES_NO_LIMIT, is_saas_connection=False
         )
+        mocker.patch("hopsworks_common.client.get_instance")
 
         # Act
         with pytest.raises(ValueError) as e_info:
@@ -296,6 +298,7 @@ def test_validate_serving_tool_valid_saas_connection(self, mocker):
         self._mock_serving_variables(
             mocker, SERVING_NUM_INSTANCES_NO_LIMIT, is_saas_connection=True
         )
+        mocker.patch("hopsworks_common.client.get_instance")
 
         # Act
         st = predictor.Predictor._validate_serving_tool(PREDICTOR.SERVING_TOOL_KSERVE)
@@ -308,6 +311,7 @@ def test_validate_serving_tool_invalid_saas_connection(self, mocker):
         self._mock_serving_variables(
             mocker, SERVING_NUM_INSTANCES_NO_LIMIT, is_saas_connection=True
         )
+        mocker.patch("hopsworks_common.client.get_instance")
 
         # Act
         with pytest.raises(ValueError) as e_info:
@@ -612,7 +616,9 @@ def spec(model, model_name, model_version, model_path):
             pass
 
         mock_get_predictor_for_model = mocker.patch(
-            "hsml.util.get_predictor_for_model", return_value=True, spec=spec
+            "hopsworks_common.util.get_predictor_for_model",
+            return_value=True,
+            spec=spec,
         )
 
         class MockModel:
@@ -713,16 +719,22 @@ def _mock_serving_variables(
         is_kserve_installed=True,
     ):
         mocker.patch(
-            "hsml.client.get_serving_resource_limits",
+            "hopsworks_common.client.get_serving_resource_limits",
             return_value=SERVING_RESOURCE_LIMITS,
         )
         mocker.patch(
-            "hsml.client.get_serving_num_instances_limits", return_value=num_instances
+            "hopsworks_common.client.get_serving_num_instances_limits",
+            return_value=num_instances,
        )
         mocker.patch(
-            "hsml.client.is_scale_to_zero_required", return_value=force_scale_to_zero
+            "hopsworks_common.client.is_scale_to_zero_required",
+            return_value=force_scale_to_zero,
         )
-        mocker.patch("hsml.client.is_saas_connection", return_value=is_saas_connection)
         mocker.patch(
-            "hsml.client.is_kserve_installed", return_value=is_kserve_installed
+            "hopsworks_common.client.is_saas_connection",
+            return_value=is_saas_connection,
+        )
+        mocker.patch(
+            "hopsworks_common.client.is_kserve_installed",
+            return_value=is_kserve_installed,
         )
diff --git a/python/tests/test_resources.py b/python/tests/test_resources.py
index f77863b38..c6ff55d05 100644
--- a/python/tests/test_resources.py
+++ b/python/tests/test_resources.py
@@ -187,7 +187,7 @@ def test_constructor_component_resources(self, mocker, backend_fixtures):
             "get_component_resources_num_instances_requests_and_limits"
         ]["response"]
         mock_util_get_obj_from_json = mocker.patch(
-            "hsml.util.get_obj_from_json",
+            "hopsworks_common.util.get_obj_from_json",
             side_effect=[json["requests"], json["limits"]],
         )
         mock_default_resource_limits = mocker.patch(
@@ -384,7 +384,7 @@ def test_get_default_resource_limits_no_hard_limit_and_lower_than_default(
             resources.ComponentResources._get_default_resource_limits
         )
         mock_get_serving_res_limits = mocker.patch(
-            "hsml.client.get_serving_resource_limits",
+            "hopsworks_common.client.get_serving_resource_limits",
             return_value=no_limit_res,  # no upper limit
         )
 
@@ -408,7 +408,7 @@ def test_get_default_resource_limits_no_hard_limit_and_higher_than_default(
             resources.ComponentResources._get_default_resource_limits
         )
         mock_get_serving_res_limits = mocker.patch(
-            "hsml.client.get_serving_resource_limits",
+            "hopsworks_common.client.get_serving_resource_limits",
             return_value=no_limit_res,  # no upper limit
         )
 
@@ -432,7 +432,7 @@ def test_get_default_resource_limits_with_higher_hard_limit_and_lower_than_defau
             resources.ComponentResources._get_default_resource_limits
         )
         mock_get_serving_res_limits = mocker.patch(
-            "hsml.client.get_serving_resource_limits",
+            "hopsworks_common.client.get_serving_resource_limits",
             return_value=hard_limit_res,  # upper limit
         )
 
@@ -456,7 +456,7 @@ def test_get_default_resource_limits_with_higher_hard_limit_and_higher_than_defa
             resources.ComponentResources._get_default_resource_limits
         )
         mock_get_serving_res_limits = mocker.patch(
-            "hsml.client.get_serving_resource_limits",
+            "hopsworks_common.client.get_serving_resource_limits",
             return_value=hard_limit_res,  # upper limit
         )
 
@@ -481,7 +481,7 @@ def test_get_default_resource_limits_with_lower_hard_limit_and_lower_than_defaul
             resources.ComponentResources._get_default_resource_limits
         )
         mock_get_serving_res_limits = mocker.patch(
-            "hsml.client.get_serving_resource_limits",
+            "hopsworks_common.client.get_serving_resource_limits",
             return_value=hard_limit_res,  # upper limit
         )
 
@@ -506,7 +506,7 @@ def test_get_default_resource_limits_with_lower_hard_limit_and_higher_than_defau
             resources.ComponentResources._get_default_resource_limits
         )
         mock_get_serving_res_limits = mocker.patch(
-            "hsml.client.get_serving_resource_limits",
+            "hopsworks_common.client.get_serving_resource_limits",
             return_value=hard_limit_res,  # upper limit
         )
 
@@ -527,7 +527,7 @@ def test_validate_resources_no_hard_limits_valid_resources(self, mocker):
         requests = resources.Resources(cores=1, memory=1024, gpus=0)
         limits = resources.Resources(cores=2, memory=2048, gpus=1)
         mock_get_serving_res_limits = mocker.patch(
-            "hsml.client.get_serving_resource_limits",
+            "hopsworks_common.client.get_serving_resource_limits",
             return_value=no_limit_res,  # upper limit
         )
 
@@ -543,7 +543,7 @@ def test_validate_resources_no_hard_limit_invalid_cores_request(self, mocker):
         requests = resources.Resources(cores=0, memory=1024, gpus=0)
         limits = resources.Resources(cores=2, memory=2048, gpus=1)
         mock_get_serving_res_limits = mocker.patch(
-            "hsml.client.get_serving_resource_limits",
+            "hopsworks_common.client.get_serving_resource_limits",
             return_value=no_limit_res,  # upper limit
         )
 
@@ -563,7 +563,7 @@ def test_validate_resources_no_hard_limit_invalid_memory_request(self, mocker):
         requests = resources.Resources(cores=1, memory=0, gpus=0)
         limits = resources.Resources(cores=2, memory=2048, gpus=1)
         mock_get_serving_res_limits = mocker.patch(
-            "hsml.client.get_serving_resource_limits",
+            "hopsworks_common.client.get_serving_resource_limits",
             return_value=no_limit_res,  # upper limit
         )
 
@@ -585,7 +585,7 @@ def test_validate_resources_no_hard_limit_invalid_gpus_request(self, mocker):
         )  # 0 gpus is accepted
         limits = resources.Resources(cores=2, memory=2048, gpus=1)
         mock_get_serving_res_limits = mocker.patch(
-            "hsml.client.get_serving_resource_limits",
+            "hopsworks_common.client.get_serving_resource_limits",
             return_value=no_limit_res,  # upper limit
         )
 
@@ -606,7 +606,7 @@ def test_validate_resources_no_hard_limit_cores_request_out_of_range(self, mocke
         requests = resources.Resources(cores=2, memory=1024, gpus=0)
         limits = resources.Resources(cores=1, memory=2048, gpus=1)
         mock_get_serving_res_limits = mocker.patch(
-            "hsml.client.get_serving_resource_limits",
+            "hopsworks_common.client.get_serving_resource_limits",
             return_value=no_limit_res,  # upper limit
         )
 
@@ -629,7 +629,7 @@ def test_validate_resources_no_hard_limit_invalid_memory_request_out_of_range(
         requests = resources.Resources(cores=1, memory=2048, gpus=0)
         limits = resources.Resources(cores=2, memory=1024, gpus=1)
         mock_get_serving_res_limits = mocker.patch(
-            "hsml.client.get_serving_resource_limits",
+            "hopsworks_common.client.get_serving_resource_limits",
             return_value=no_limit_res,  # upper limit
         )
 
@@ -652,7 +652,7 @@ def test_validate_resources_no_hard_limit_invalid_gpus_request_out_of_range(
         requests = resources.Resources(cores=1, memory=1024, gpus=2)
         limits = resources.Resources(cores=2, memory=2048, gpus=1)
         mock_get_serving_res_limits = mocker.patch(
-            "hsml.client.get_serving_resource_limits",
+            "hopsworks_common.client.get_serving_resource_limits",
             return_value=no_limit_res,  # upper limit
         )
 
@@ -673,7 +673,7 @@ def test_validate_resources_with_hard_limit_valid_resources(self, mocker):
         requests = resources.Resources(cores=1, memory=1024, gpus=0)
         limits = resources.Resources(cores=2, memory=2048, gpus=1)
         mock_get_serving_res_limits = mocker.patch(
-            "hsml.client.get_serving_resource_limits",
+            "hopsworks_common.client.get_serving_resource_limits",
             return_value=hard_limit_res,  # upper limit
         )
 
@@ -689,7 +689,7 @@ def test_validate_resources_with_hard_limit_invalid_cores_limit(self, mocker):
         requests = resources.Resources(cores=2, memory=1024, gpus=0)
         limits = resources.Resources(cores=0, memory=2048, gpus=1)
         mock_get_serving_res_limits = mocker.patch(
-            "hsml.client.get_serving_resource_limits",
+            "hopsworks_common.client.get_serving_resource_limits",
             return_value=hard_limit_res,  # upper limit
         )
 
@@ -709,7 +709,7 @@ def test_validate_resources_with_hard_limit_invalid_memory_limit(self, mocker):
         requests = resources.Resources(cores=2, memory=1024, gpus=0)
         limits = resources.Resources(cores=1, memory=0, gpus=1)
         mock_get_serving_res_limits = mocker.patch(
-            "hsml.client.get_serving_resource_limits",
+            "hopsworks_common.client.get_serving_resource_limits",
             return_value=hard_limit_res,  # upper limit
         )
 
@@ -727,7 +727,7 @@ def test_validate_resources_with_hard_limit_invalid_gpus_limit(self, mocker):
         requests = resources.Resources(cores=2, memory=1024, gpus=0)
         limits = resources.Resources(cores=1, memory=2048, gpus=-1)
         mock_get_serving_res_limits = mocker.patch(
-            "hsml.client.get_serving_resource_limits",
+            "hopsworks_common.client.get_serving_resource_limits",
             return_value=hard_limit_res,  # upper limit
         )
 
@@ -747,7 +747,7 @@ def test_validate_resources_with_hard_limit_invalid_cores_request(self, mocker):
         requests = resources.Resources(cores=2, memory=1024, gpus=0)
         limits = resources.Resources(cores=4, memory=2048, gpus=1)
         mock_get_serving_res_limits = mocker.patch(
-            "hsml.client.get_serving_resource_limits",
+            "hopsworks_common.client.get_serving_resource_limits",
             return_value=hard_limit_res,  # upper limit
         )
 
@@ -768,7 +768,7 @@ def test_validate_resources_with_hard_limit_invalid_memory_request(self, mocker)
         requests = resources.Resources(cores=2, memory=1024, gpus=0)
         limits = resources.Resources(cores=3, memory=4076, gpus=1)
         mock_get_serving_res_limits = mocker.patch(
-            "hsml.client.get_serving_resource_limits",
+            "hopsworks_common.client.get_serving_resource_limits",
             return_value=hard_limit_res,  # upper limit
         )
 
@@ -789,7 +789,7 @@ def test_validate_resources_with_hard_limit_invalid_gpus_request(self, mocker):
         requests = resources.Resources(cores=2, memory=1024, gpus=0)
         limits = resources.Resources(cores=3, memory=2048, gpus=4)
         mock_get_serving_res_limits = mocker.patch(
-            "hsml.client.get_serving_resource_limits",
+            "hopsworks_common.client.get_serving_resource_limits",
             return_value=hard_limit_res,  # upper limit
         )
 
@@ -808,7 +808,7 @@ def test_validate_resources_with_hard_limit_invalid_gpus_request(self, mocker):
 
     def test_from_response_json_predictor_resources(self, mocker, backend_fixtures):
         mocker.patch(
-            "hsml.client.get_serving_resource_limits",
+            "hopsworks_common.client.get_serving_resource_limits",
             return_value=SERVING_RESOURCE_LIMITS,
         )
         json = backend_fixtures["resources"][
@@ -831,7 +831,7 @@ def test_from_response_json_predictor_resources_specific_keys(
         self, mocker, backend_fixtures
     ):
         mocker.patch(
-            "hsml.client.get_serving_resource_limits",
+            "hopsworks_common.client.get_serving_resource_limits",
             return_value=SERVING_RESOURCE_LIMITS,
         )
         json = backend_fixtures["resources"][
@@ -854,7 +854,7 @@ def test_from_response_json_predictor_resources_specific_keys(
 
     def test_from_response_json_transformer_resources(self, mocker, backend_fixtures):
         mocker.patch(
-            "hsml.client.get_serving_resource_limits",
+            "hopsworks_common.client.get_serving_resource_limits",
             return_value=SERVING_RESOURCE_LIMITS,
         )
         json = backend_fixtures["resources"][
@@ -877,7 +877,7 @@ def test_from_response_json_transformer_resources_specific_keys(
         self, mocker, backend_fixtures
     ):
         mocker.patch(
-            "hsml.client.get_serving_resource_limits",
+            "hopsworks_common.client.get_serving_resource_limits",
             return_value=SERVING_RESOURCE_LIMITS,
         )
         json = backend_fixtures["resources"][
@@ -900,7 +900,7 @@ def test_from_response_json_transformer_resources_default_limits(
         self, mocker, backend_fixtures
     ):
         mocker.patch(
-            "hsml.client.get_serving_resource_limits",
+            "hopsworks_common.client.get_serving_resource_limits",
             return_value=SERVING_RESOURCE_LIMITS,
         )
         mocker.patch(
diff --git a/python/tests/test_transformer.py b/python/tests/test_transformer.py
index 7df302bd6..10150a7f1 100644
--- a/python/tests/test_transformer.py
+++ b/python/tests/test_transformer.py
@@ -298,12 +298,14 @@ def test_extract_fields_from_json(self, mocker, backend_fixtures):
     def _mock_serving_variables(self, mocker, num_instances, force_scale_to_zero=False):
         mocker.patch(
-            "hsml.client.get_serving_resource_limits",
+            "hopsworks_common.client.get_serving_resource_limits",
             return_value=SERVING_RESOURCE_LIMITS,
         )
         mocker.patch(
-            "hsml.client.get_serving_num_instances_limits", return_value=num_instances
+            "hopsworks_common.client.get_serving_num_instances_limits",
+            return_value=num_instances,
         )
         mocker.patch(
-            "hsml.client.is_scale_to_zero_required", return_value=force_scale_to_zero
+            "hopsworks_common.client.is_scale_to_zero_required",
+            return_value=force_scale_to_zero,
         )
diff --git a/python/tests/test_util.py b/python/tests/test_util.py
index 076b2aea7..f92358755 100644
--- a/python/tests/test_util.py
+++ b/python/tests/test_util.py
@@ -19,15 +19,15 @@
 from datetime import date, datetime
 from urllib.parse import ParseResult
 
-import hsfs.util
+import hopsworks_common.util
 import pytest
 import pytz
-from hsfs.client.exceptions import FeatureStoreException
-from hsfs.core.constants import HAS_AIOMYSQL, HAS_SQLALCHEMY
+from hopsworks_common import util
+from hopsworks_common.client.exceptions import FeatureStoreException
+from hopsworks_common.constants import MODEL
+from hopsworks_common.core.constants import HAS_AIOMYSQL, HAS_SQLALCHEMY
 from hsfs.embedding import EmbeddingFeature, EmbeddingIndex
 from hsfs.feature import Feature
-from hsml import util
-from hsml.constants import MODEL
 from hsml.llm.model import Model as LLMModel
 from hsml.llm.predictor import Predictor as LLMPredictor
 from hsml.model import Model as BaseModel
@@ -134,9 +134,15 @@ def test_set_model_class_unsupported(self, backend_fixtures):
 
     def test_input_example_to_json_from_numpy(self, mocker, input_example_numpy):
         # Arrange
-        mock_handle_tensor_input = mocker.patch("hsml.util._handle_tensor_input")
-        mock_handle_dataframe_input = mocker.patch("hsml.util._handle_dataframe_input")
-        mock_handle_dict_input = mocker.patch("hsml.util._handle_dict_input")
+        mock_handle_tensor_input = mocker.patch(
+            "hopsworks_common.util._handle_tensor_input"
+        )
+        mock_handle_dataframe_input = mocker.patch(
+            "hopsworks_common.util._handle_dataframe_input"
+        )
+        mock_handle_dict_input = mocker.patch(
+            "hopsworks_common.util._handle_dict_input"
+        )
 
         # Act
         util.input_example_to_json(input_example_numpy)
@@ -148,9 +154,15 @@ def test_input_example_to_json_from_dict(self, mocker, input_example_dict):
         # Arrange
-        mock_handle_tensor_input = mocker.patch("hsml.util._handle_tensor_input")
-        mock_handle_dataframe_input = mocker.patch("hsml.util._handle_dataframe_input")
-        mock_handle_dict_input = mocker.patch("hsml.util._handle_dict_input")
+        mock_handle_tensor_input = mocker.patch(
+            "hopsworks_common.util._handle_tensor_input"
+        )
+        mock_handle_dataframe_input = mocker.patch(
+            "hopsworks_common.util._handle_dataframe_input"
+        )
+        mock_handle_dict_input = mocker.patch(
+            "hopsworks_common.util._handle_dict_input"
+        )
 
         # Act
         util.input_example_to_json(input_example_dict)
@@ -164,9 +176,15 @@ def test_input_example_to_json_from_dataframe(
         self, mocker, input_example_dataframe_pandas_dataframe
     ):
         # Arrange
-        mock_handle_tensor_input = mocker.patch("hsml.util._handle_tensor_input")
-        mock_handle_dataframe_input = mocker.patch("hsml.util._handle_dataframe_input")
-        mock_handle_dict_input = mocker.patch("hsml.util._handle_dict_input")
+        mock_handle_tensor_input = mocker.patch(
+            "hopsworks_common.util._handle_tensor_input"
+        )
+        mock_handle_dataframe_input = mocker.patch(
+            "hopsworks_common.util._handle_dataframe_input"
+        )
+        mock_handle_dict_input = mocker.patch(
+            "hopsworks_common.util._handle_dict_input"
+        )
 
         # Act
         util.input_example_to_json(input_example_dataframe_pandas_dataframe)
@@ -178,9 +196,15 @@ def test_input_example_to_json_unsupported(self, mocker):
         # Arrange
-        mock_handle_tensor_input = mocker.patch("hsml.util._handle_tensor_input")
-        mock_handle_dataframe_input = mocker.patch("hsml.util._handle_dataframe_input")
-        mock_handle_dict_input = mocker.patch("hsml.util._handle_dict_input")
+        mock_handle_tensor_input = mocker.patch(
+            "hopsworks_common.util._handle_tensor_input"
+        )
+        mock_handle_dataframe_input = mocker.patch(
+            "hopsworks_common.util._handle_dataframe_input"
+        )
+        mock_handle_dict_input = mocker.patch(
+            "hopsworks_common.util._handle_dict_input"
+        )
 
         # Act
         util.input_example_to_json(lambda unsupported_type: None)
@@ -549,7 +573,7 @@ def test_get_hostname_replaced_url(self, mocker):
         mock_client = mocker.MagicMock()
         mock_client._base_url = base_url + "url"
         mock_client.replace_public_host = mocker.MagicMock(return_value=mock_url_parsed)
-        mocker.patch("hsml.client.get_instance", return_value=mock_client)
+        mocker.patch("hopsworks_common.client.get_instance", return_value=mock_client)
 
         # Act
         url = util.get_hostname_replaced_url(sub_path)
@@ -593,7 +617,7 @@ class TEST:
     def test_extract_field_from_json(self, mocker):
         # Arrange
         json = {"a": "1", "b": "2"}
-        get_obj_from_json = mocker.patch("hsml.util.get_obj_from_json")
+        get_obj_from_json = mocker.patch("hopsworks_common.util.get_obj_from_json")
 
         # Act
         b = util.extract_field_from_json(json, "b")
@@ -605,7 +629,7 @@ def test_extract_field_from_json_fields(self, mocker):
         # Arrange
         json = {"a": "1", "b": "2"}
-        get_obj_from_json = mocker.patch("hsml.util.get_obj_from_json")
+        get_obj_from_json = mocker.patch("hopsworks_common.util.get_obj_from_json")
 
         # Act
         b = util.extract_field_from_json(json, ["B", "b"])  # alternative fields
@@ -618,7 +642,7 @@ def test_extract_field_from_json_as_instance_of_str(self, mocker):
         # Arrange
         json = {"a": "1", "b": "2"}
         get_obj_from_json = mocker.patch(
-            "hsml.util.get_obj_from_json", return_value="2"
+            "hopsworks_common.util.get_obj_from_json", return_value="2"
         )
 
         # Act
@@ -632,7 +656,7 @@ def test_extract_field_from_json_as_instance_of_list_str(self, mocker):
         # Arrange
         json = {"a": "1", "b": ["2", "2", "2"]}
         get_obj_from_json = mocker.patch(
-            "hsml.util.get_obj_from_json", return_value="2"
+            "hopsworks_common.util.get_obj_from_json", return_value="2"
         )
 
         # Act
@@ -704,91 +728,107 @@ class Test:
         assert "cannot be converted to class" in str(e_info.value)
 
     def test_get_hudi_datestr_from_timestamp(self):
-        dt = hsfs.util.get_hudi_datestr_from_timestamp(1640995200000)
+        dt = hopsworks_common.util.get_hudi_datestr_from_timestamp(1640995200000)
         assert dt == "20220101000000000"
 
     def test_convert_event_time_to_timestamp_timestamp(self):
-        dt = hsfs.util.convert_event_time_to_timestamp(1640995200)
+        dt = hopsworks_common.util.convert_event_time_to_timestamp(1640995200)
         assert dt == 1640995200000
 
     def test_convert_event_time_to_timestamp_datetime(self):
-        dt = hsfs.util.convert_event_time_to_timestamp(datetime(2022, 1, 1, 0, 0, 0))
+        dt = hopsworks_common.util.convert_event_time_to_timestamp(
+            datetime(2022, 1, 1, 0, 0, 0)
+        )
         assert dt == 1640995200000
 
     def test_convert_event_time_to_timestamp_datetime_tz(self):
-        dt = hsfs.util.convert_event_time_to_timestamp(
+        dt = hopsworks_common.util.convert_event_time_to_timestamp(
             pytz.timezone("US/Pacific").localize(datetime(2021, 12, 31, 16, 0, 0))
         )
         assert dt == 1640995200000
 
     def test_convert_event_time_to_timestamp_date(self):
-        dt = hsfs.util.convert_event_time_to_timestamp(date(2022, 1, 1))
+        dt = hopsworks_common.util.convert_event_time_to_timestamp(date(2022, 1, 1))
         assert dt == 1640995200000
 
     def test_convert_event_time_to_timestamp_string(self):
-        dt = hsfs.util.convert_event_time_to_timestamp("2022-01-01 00:00:00")
+        dt = hopsworks_common.util.convert_event_time_to_timestamp(
+            "2022-01-01 00:00:00"
+        )
         assert dt == 1640995200000
 
     def test_convert_iso_event_time_to_timestamp_string(self):
-        dt = hsfs.util.convert_event_time_to_timestamp("2022-01-01T00:00:00.000000Z")
+        dt = hopsworks_common.util.convert_event_time_to_timestamp(
+            "2022-01-01T00:00:00.000000Z"
+        )
         assert dt == 1640995200000
 
     def test_convert_event_time_to_timestamp_yyyy_mm_dd(self):
-        timestamp = hsfs.util.get_timestamp_from_date_string("2022-01-01")
+        timestamp = hopsworks_common.util.get_timestamp_from_date_string("2022-01-01")
         assert timestamp == 1640995200000
 
     def test_convert_event_time_to_timestamp_yyyy_mm_dd_hh(self):
-        timestamp = hsfs.util.get_timestamp_from_date_string("2022-01-01 00")
+        timestamp = hopsworks_common.util.get_timestamp_from_date_string(
+            "2022-01-01 00"
+        )
         assert timestamp == 1640995200000
 
     def test_convert_event_time_to_timestamp_yyyy_mm_dd_hh_mm(self):
-        timestamp = hsfs.util.get_timestamp_from_date_string("2022-01-01 00:00")
+        timestamp = hopsworks_common.util.get_timestamp_from_date_string(
+            "2022-01-01 00:00"
+        )
         assert timestamp == 1640995200000
 
     def test_convert_event_time_to_timestamp_yyyy_mm_dd_hh_mm_ss(self):
-        timestamp = hsfs.util.get_timestamp_from_date_string("2022-01-01 00:00:00")
+        timestamp = hopsworks_common.util.get_timestamp_from_date_string(
+            "2022-01-01 00:00:00"
+        )
         assert timestamp == 1640995200000
 
     def test_convert_event_time_to_timestamp_yyyy_mm_dd_hh_mm_ss_f(self):
-        timestamp = hsfs.util.get_timestamp_from_date_string("2022-01-01 00:00:00.000")
+        timestamp = hopsworks_common.util.get_timestamp_from_date_string(
+            "2022-01-01 00:00:00.000"
+        )
         assert timestamp == 1640995200000
 
     def test_convert_event_time_to_timestamp_yyyy_mm_dd_hh_mm_ss_error(self):
         with pytest.raises(ValueError):
-            hsfs.util.get_timestamp_from_date_string("2022-13-01 00:00:00")
+            hopsworks_common.util.get_timestamp_from_date_string("2022-13-01 00:00:00")
 
     def test_convert_event_time_to_timestamp_yyyy_mm_dd_hh_mm_ss_error2(self):
         with pytest.raises(ValueError):
-            hsfs.util.get_timestamp_from_date_string("202-13-01 00:00:00")
+            hopsworks_common.util.get_timestamp_from_date_string("202-13-01 00:00:00")
 
     def test_convert_event_time_to_timestamp_yyyy_mm_dd_hh_mm_ss_error3(self):
         with pytest.raises(ValueError):
-            hsfs.util.get_timestamp_from_date_string("00:00:00 2022-01-01")
+            hopsworks_common.util.get_timestamp_from_date_string("00:00:00 2022-01-01")
 
     def test_convert_hudi_commit_time_to_timestamp(self):
-        timestamp = hsfs.util.get_timestamp_from_date_string("20221118095233099")
+        timestamp = hopsworks_common.util.get_timestamp_from_date_string(
+            "20221118095233099"
+        )
         assert timestamp == 1668765153099
 
     def test_get_dataset_type_HIVEDB(self):
-        db_type = hsfs.util.get_dataset_type(
+        db_type = hopsworks_common.util.get_dataset_type(
             "/apps/hive/warehouse/temp_featurestore.db/storage_connector_resources/kafka__tstore.jks"
         )
         assert db_type == "HIVEDB"
 
     def test_get_dataset_type_HIVEDB_with_dfs(self):
-        db_type = hsfs.util.get_dataset_type(
+        db_type = hopsworks_common.util.get_dataset_type(
             "hdfs:///apps/hive/warehouse/temp_featurestore.db/storage_connector_resources/kafka__tstore.jks"
         )
         assert db_type == "HIVEDB"
 
     def test_get_dataset_type_DATASET(self):
-        db_type = hsfs.util.get_dataset_type(
+        db_type = hopsworks_common.util.get_dataset_type(
             "/Projects/temp/Resources/kafka__tstore.jks"
        )
         assert db_type == "DATASET"
 
     def test_get_dataset_type_DATASET_with_dfs(self):
-        db_type = hsfs.util.get_dataset_type(
+        db_type = hopsworks_common.util.get_dataset_type(
             "hdfs:///Projects/temp/Resources/kafka__tstore.jks"
         )
         assert db_type == "DATASET"
@@ -798,7 +838,7 @@ def test_get_job_url(self, mocker):
         mock_client_get_instance = mocker.patch("hopsworks_common.client.get_instance")
 
         # Act
-        hsfs.util.get_job_url(href="1/2/3/4/5/6/7/8")
+        hopsworks_common.util.get_job_url(href="1/2/3/4/5/6/7/8")
 
         # Assert
         assert (
@@ -819,7 +859,7 @@ def test_get_feature_group_url(self, mocker):
         mock_client_get_instance.return_value._project_id = 50
 
         # Act
-        hsfs.util.get_feature_group_url(
+        hopsworks_common.util.get_feature_group_url(
             feature_group_id=feature_group_id, feature_store_id=feature_store_id
         )
 
@@ -846,7 +886,7 @@ def test_valid_embedding_type(self):
             Feature(name="feature4", type="array"),
         ]
         # Call the method and expect no exceptions
-        hsfs.util.validate_embedding_feature_type(embedding_index, schema)
+        hopsworks_common.util.validate_embedding_feature_type(embedding_index, schema)
 
     def test_invalid_embedding_type(self):
         embedding_index = EmbeddingIndex(
@@ -862,7 +902,9 @@ def test_invalid_embedding_type(self):
         ]
         # Call the method and expect a FeatureStoreException
         with pytest.raises(FeatureStoreException):
-            hsfs.util.validate_embedding_feature_type(embedding_index, schema)
+            hopsworks_common.util.validate_embedding_feature_type(
+                embedding_index, schema
+            )
 
     def test_missing_embedding_index(self):
         # Define a schema without an embedding index
@@ -871,7 +913,7 @@
             Feature(name="feature2", type="array"),
         ]
         # Call the method with an empty feature_group (no embedding index)
-        hsfs.util.validate_embedding_feature_type(None, schema)
+        hopsworks_common.util.validate_embedding_feature_type(None, schema)
         # No exception should be raised
 
     def test_empty_schema(self):
@@ -884,7 +926,7 @@
         # Define an empty schema
         schema = []
         # Call the method with an empty schema
-        hsfs.util.validate_embedding_feature_type(embedding_index, schema)
+        hopsworks_common.util.validate_embedding_feature_type(embedding_index, schema)
         # No exception should be raised
 
     @pytest.mark.skipif(
@@ -895,7 +937,9 @@ def test_create_async_engine(self, mocker):
         # Test when get_running_loop() raises a RuntimeError
         with patch("asyncio.get_running_loop", side_effect=RuntimeError):
             # mock storage connector
-            online_connector = patch.object(hsfs.util, "get_online_connector")
+            online_connector = patch.object(
+                hopsworks_common.util, "get_online_connector"
+            )
             with pytest.raises(
                 RuntimeError,
                 match="Event loop is not running. Please invoke this co-routine from a running loop or provide an event loop.",
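
For context, nearly every hunk in this patch applies the same backwards-compatibility shim: a legacy hsml module keeps its public import surface but delegates to hopsworks_common, old names are kept as aliases (MLEncoder = Encoder), and the re-exports are listed in __all__ as strings. Below is a minimal, self-contained sketch of that pattern; the inline Encoder is a stand-in for hopsworks_common.util.Encoder so the snippet runs on its own, and _Tag is a throwaway illustration, not the real hsml.tag.Tag.

# shim_sketch.py - standalone sketch of the shim applied across hsml modules.
# In the real code the canonical implementation lives in hopsworks_common.util;
# a stand-in Encoder is defined inline here so the sketch runs by itself.
import json


class Encoder(json.JSONEncoder):
    # Stand-in for hopsworks_common.util.Encoder (formerly hsml.util.MLEncoder):
    # serialize DTO-style objects through their to_dict(), fall back otherwise.
    def default(self, obj):
        try:
            return obj.to_dict()
        except AttributeError:
            return super().default(obj)


# Legacy alias so existing `util.MLEncoder` callers keep working,
# mirroring `MLEncoder = Encoder` in python/hsml/util.py above.
MLEncoder = Encoder

# __all__ entries must be strings; a bare class object here makes
# `from module import *` raise TypeError (hence the tag.py fix above).
__all__ = ["Encoder", "MLEncoder"]


class _Tag:
    # Throwaway stand-in for hsml.tag.Tag, only to exercise the encoder.
    def __init__(self, name, value):
        self._name, self._value = name, value

    def to_dict(self):
        return {"name": self._name, "value": self._value}


if __name__ == "__main__":
    # Old and new spellings serialize identically through the alias.
    print(json.dumps(_Tag("owner", "ml-team"), cls=MLEncoder))
    print(json.dumps(_Tag("owner", "ml-team"), cls=Encoder))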