From 0221954b5ec7bc72eb88c6a11a37c99d475467ea Mon Sep 17 00:00:00 2001 From: Aleksey Veresov Date: Thu, 31 Oct 2024 13:58:19 +0100 Subject: [PATCH 1/7] Add engine parameter to hopsworks.login (#385) * Add engine parameter to hopsworks.login * Fix the docstring for the engine parameter * Fix typing * Remove engine param from get_feature_store * Remove redundant code from get_feature_store --- python/hopsworks/__init__.py | 13 ++++++++++++- python/hopsworks_common/connection.py | 20 ++------------------ python/hopsworks_common/project.py | 7 ++----- 3 files changed, 16 insertions(+), 24 deletions(-) diff --git a/python/hopsworks/__init__.py b/python/hopsworks/__init__.py index 79d500769..220dcadb8 100644 --- a/python/hopsworks/__init__.py +++ b/python/hopsworks/__init__.py @@ -22,6 +22,7 @@ import tempfile import warnings from pathlib import Path +from typing import Literal, Union from hopsworks import client, constants, project, version from hopsworks.client.exceptions import ( @@ -83,6 +84,7 @@ def login( api_key_file: str = None, hostname_verification: bool = False, trust_store_path: str = None, + engine: Union[None, Literal["spark"], Literal["python"], Literal["training"]] = None, ) -> project.Project: """Connect to [Serverless Hopsworks](https://app.hopsworks.ai) by calling the `hopsworks.login()` function with no arguments. @@ -122,6 +124,13 @@ def login( api_key_file: Path to file wih Api Key hostname_verification: Whether to verify Hopsworks' certificate trust_store_path: Path on the file system containing the Hopsworks certificates + engine: Which engine to use, `"spark"`, `"python"` or `"training"`. Defaults to `None`, + which initializes the engine to Spark if the environment provides Spark, for + example on Hopsworks and Databricks, or falls back to Python if Spark is not + available, e.g. on local Python environments or AWS SageMaker. This option + allows you to override this behaviour. `"training"` engine is useful when only + feature store metadata is needed, for example training dataset location and label + information when Hopsworks training experiment is conducted. # Returns `Project`: The Project object to perform operations on # Raises @@ -138,7 +147,7 @@ def login( # If inside hopsworks, just return the current project for now if "REST_ENDPOINT" in os.environ: - _hw_connection = _hw_connection(hostname_verification=hostname_verification) + _hw_connection = _hw_connection(hostname_verification=hostname_verification, engine=engine) _connected_project = _hw_connection.get_project() _initialize_module_apis() print("\nLogged in to project, explore it here " + _connected_project.get_url()) @@ -207,6 +216,7 @@ def login( _hw_connection = _hw_connection( host=host, port=port, + engine=engine, api_key_file=api_key_path, hostname_verification=hostname_verification, trust_store_path=trust_store_path, @@ -246,6 +256,7 @@ def login( _hw_connection = _hw_connection( host=host, port=port, + engine=engine, api_key_value=api_key, hostname_verification=hostname_verification, trust_store_path=trust_store_path, diff --git a/python/hopsworks_common/connection.py b/python/hopsworks_common/connection.py index 49f504932..43a64bc76 100644 --- a/python/hopsworks_common/connection.py +++ b/python/hopsworks_common/connection.py @@ -100,7 +100,7 @@ class Connection: Defaults to `None`. engine: Which engine to use, `"spark"`, `"python"` or `"training"`. Defaults to `None`, which initializes the engine to Spark if the environment provides Spark, for - example on Hopsworks and Databricks, or falls back on Hive in Python if Spark is not + example on Hopsworks and Databricks, or falls back to Python if Spark is not available, e.g. on local Python environments or AWS SageMaker. This option allows you to override this behaviour. `"training"` engine is useful when only feature store metadata is needed, for example training dataset location and label @@ -151,7 +151,6 @@ def __init__( def get_feature_store( self, name: Optional[str] = None, - engine: Optional[str] = None, ): # -> feature_store.FeatureStore # the typing is commented out due to circular dependency, it breaks auto_doc.py """Get a reference to a feature store to perform operations on. @@ -161,25 +160,10 @@ def get_feature_store( # Arguments name: The name of the feature store, defaults to `None`. - engine: Which engine to use, `"spark"`, `"python"` or `"training"`. Defaults to `None`, - which initializes the engine to Spark if the environment provides Spark, for - example on Hopsworks and Databricks, or falls back on Hive in Python if Spark is not - available, e.g. on local Python environments or AWS SageMaker. This option - allows you to override this behaviour. `"training"` engine is useful when only - feature store metadata is needed, for example training dataset location and label - information when Hopsworks training experiment is conducted. # Returns `FeatureStore`. A feature store handle object to perform operations on. """ - # Ensure the engine is initialized and of right type - from hsfs import engine as hsfs_engine - - if engine: - global _hsfs_engine_type - _hsfs_engine_type = engine - hsfs_engine.get_instance() - if not name: name = client.get_instance()._project_name return self._feature_store_api.get(util.append_feature_store_suffix(name)) @@ -532,7 +516,7 @@ def connection( Defaults to `None`. engine: Which engine to use, `"spark"`, `"python"` or `"training"`. Defaults to `None`, which initializes the engine to Spark if the environment provides Spark, for - example on Hopsworks and Databricks, or falls back on Hive in Python if Spark is not + example on Hopsworks and Databricks, or falls back to Python if Spark is not available, e.g. on local Python environments or AWS SageMaker. This option allows you to override this behaviour. `"training"` engine is useful when only feature store metadata is needed, for example training dataset location and label diff --git a/python/hopsworks_common/project.py b/python/hopsworks_common/project.py index 7705b603b..b35cac288 100644 --- a/python/hopsworks_common/project.py +++ b/python/hopsworks_common/project.py @@ -109,7 +109,7 @@ def project_namespace(self): return self._project_namespace def get_feature_store( - self, name: Optional[str] = None, engine: Optional[str] = None + self, name: Optional[str] = None ): # -> hsfs.feature_store.FeatureStore """Connect to Project's Feature Store. @@ -127,15 +127,12 @@ def get_feature_store( # Arguments name: Project name of the feature store. - engine: Which engine to use, `"spark"`, `"python"` or `"training"`. - Defaults to `"python"` when connected to [Serverless Hopsworks](https://app.hopsworks.ai). - See [`hopsworks.connection`](connection.md#connection) documentation for more information. # Returns `hsfs.feature_store.FeatureStore`: The Feature Store API # Raises `RestAPIError`: If unable to connect """ - return client.get_connection().get_feature_store(name, engine) + return client.get_connection().get_feature_store(name) def get_model_registry(self): """Connect to Project's Model Registry API. From 8eb5c79b6aae70d417e1e62af06ed0b1f5c4f976 Mon Sep 17 00:00:00 2001 From: Javier Cabrera Date: Fri, 1 Nov 2024 09:35:28 +0100 Subject: [PATCH 2/7] Workflow for kube locust benchmark image (#386) --- locust_benchmark/Jenkinsfile | 20 ++++++++++++++++++++ locust_benchmark/KUBE_IMAGE_VERSION | 1 + locust_benchmark/build-manifest.json | 11 +++++++++++ 3 files changed, 32 insertions(+) create mode 100644 locust_benchmark/Jenkinsfile create mode 100644 locust_benchmark/KUBE_IMAGE_VERSION create mode 100644 locust_benchmark/build-manifest.json diff --git a/locust_benchmark/Jenkinsfile b/locust_benchmark/Jenkinsfile new file mode 100644 index 000000000..9d4465e97 --- /dev/null +++ b/locust_benchmark/Jenkinsfile @@ -0,0 +1,20 @@ +@Library("jenkins-library@main") + +import com.logicalclocks.jenkins.k8s.ImageBuilder + + +node("local") { + stage('Clone repository') { + checkout scm + } + + stage('Build and push image(s)') { + version = readFile "${env.WORKSPACE}/locust_benchmark/KUBE_IMAGE_VERSION" + withEnv(["VERSION=${version.trim()}"]) { + + def builder = new ImageBuilder(this) + m = readFile "${env.WORKSPACE}/locust_benchmark/build-manifest.json" + builder.run(m) + } + } +} \ No newline at end of file diff --git a/locust_benchmark/KUBE_IMAGE_VERSION b/locust_benchmark/KUBE_IMAGE_VERSION new file mode 100644 index 000000000..8b25206ff --- /dev/null +++ b/locust_benchmark/KUBE_IMAGE_VERSION @@ -0,0 +1 @@ +master \ No newline at end of file diff --git a/locust_benchmark/build-manifest.json b/locust_benchmark/build-manifest.json new file mode 100644 index 000000000..b09a9e8b2 --- /dev/null +++ b/locust_benchmark/build-manifest.json @@ -0,0 +1,11 @@ +[ + { + "name": "hopsworks/locust-hsfs", + "version": "env:VERSION", + "dockerFile": "locust_benchmark/Dockerfile", + "platforms": { + "append": ["linux/arm64"] + }, + "canUseCache": "true" + } +] \ No newline at end of file From cd14180c43540b0f0176c309fac0a2e0c51c927d Mon Sep 17 00:00:00 2001 From: Javier Cabrera Date: Fri, 1 Nov 2024 11:29:39 +0100 Subject: [PATCH 3/7] [HWORKS-1779][Append] Disable arm64 from build-manifest.json (#387) --- locust_benchmark/build-manifest.json | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/locust_benchmark/build-manifest.json b/locust_benchmark/build-manifest.json index b09a9e8b2..48599c6dc 100644 --- a/locust_benchmark/build-manifest.json +++ b/locust_benchmark/build-manifest.json @@ -3,9 +3,6 @@ "name": "hopsworks/locust-hsfs", "version": "env:VERSION", "dockerFile": "locust_benchmark/Dockerfile", - "platforms": { - "append": ["linux/arm64"] - }, "canUseCache": "true" } -] \ No newline at end of file +] From 90b1e386137e9be71488d70381a5a6ced1696918 Mon Sep 17 00:00:00 2001 From: Fabio Buso Date: Fri, 1 Nov 2024 12:03:26 +0100 Subject: [PATCH 4/7] [FSTORE-1593] S3 Connector doesn't refetch credentials when preparing Spark write (#389) --- python/hsfs/storage_connector.py | 1 + python/tests/test_feature_group.py | 3 +++ 2 files changed, 4 insertions(+) diff --git a/python/hsfs/storage_connector.py b/python/hsfs/storage_connector.py index 7ed887cd9..15ccdc8d6 100644 --- a/python/hsfs/storage_connector.py +++ b/python/hsfs/storage_connector.py @@ -369,6 +369,7 @@ def prepare_spark(self, path: Optional[str] = None) -> Optional[str]: # Arguments path: Path to prepare for reading from cloud storage. Defaults to `None`. """ + self.refetch() return engine.get_instance().setup_storage_connector(self, path) def connector_options(self) -> Dict[str, Any]: diff --git a/python/tests/test_feature_group.py b/python/tests/test_feature_group.py index 5e01b5a10..ea25bbff3 100644 --- a/python/tests/test_feature_group.py +++ b/python/tests/test_feature_group.py @@ -928,6 +928,7 @@ def test_prepare_spark_location_with_s3_connector(self, mocker, backend_fixtures # Arrange engine = spark.Engine() engine_instance = mocker.patch("hsfs.engine.get_instance", return_value=engine) + refetch_api = mocker.patch("hsfs.storage_connector.S3Connector.refetch") json = backend_fixtures["feature_group"]["get_basic_info"]["response"] fg = feature_group.FeatureGroup.from_response_json(json) fg._location = f"{fg.name}_{fg.version}" @@ -939,11 +940,13 @@ def test_prepare_spark_location_with_s3_connector(self, mocker, backend_fixtures # Assert assert fg.location == path engine_instance.assert_called_once() + refetch_api.assert_called_once() def test_prepare_spark_location_with_s3_connector_python(self, mocker, backend_fixtures): # Arrange engine = python.Engine() engine_instance = mocker.patch("hsfs.engine.get_instance", return_value=engine) + mocker.patch("hsfs.storage_connector.S3Connector.refetch") json = backend_fixtures["feature_group"]["get_basic_info"]["response"] fg = feature_group.FeatureGroup.from_response_json(json) fg._location = f"{fg.name}_{fg.version}" From bbf9326843e366c22098e982e7ba76336a8a86fb Mon Sep 17 00:00:00 2001 From: Dhananjay Mukhedkar <55157590+dhananjay-mk@users.noreply.github.com> Date: Fri, 1 Nov 2024 14:13:21 +0100 Subject: [PATCH 5/7] [FSTORE-1589] refactor locust client as per 4.x hopsworks login (#379) --- locust_benchmark/Dockerfile | 2 +- locust_benchmark/common/hopsworks_client.py | 25 ++++++-------- locust_benchmark/locustfile.py | 37 ++++++--------------- locust_benchmark/requirements.txt | 3 +- 4 files changed, 24 insertions(+), 43 deletions(-) diff --git a/locust_benchmark/Dockerfile b/locust_benchmark/Dockerfile index e437ab9b2..47ef44106 100644 --- a/locust_benchmark/Dockerfile +++ b/locust_benchmark/Dockerfile @@ -1,4 +1,4 @@ -FROM locustio/locust:2.17.0 +FROM locustio/locust:2.23.1 USER root diff --git a/locust_benchmark/common/hopsworks_client.py b/locust_benchmark/common/hopsworks_client.py index b9fbcae04..83963d3ff 100644 --- a/locust_benchmark/common/hopsworks_client.py +++ b/locust_benchmark/common/hopsworks_client.py @@ -7,10 +7,8 @@ import pandas as pd from locust.runners import MasterRunner, LocalRunner -import hsfs -from hsfs import client -from hsfs.client.exceptions import RestAPIError +import hopsworks class HopsworksClient: @@ -21,14 +19,14 @@ def __init__(self, environment=None): environment.runner, (MasterRunner, LocalRunner) ): print(self.hopsworks_config) - self.connection = hsfs.connection( + self.project = hopsworks.login( project=self.hopsworks_config.get("project", "test"), host=self.hopsworks_config.get("host", "localhost"), port=self.hopsworks_config.get("port", 443), api_key_file=".api_key", engine="python", ) - self.fs = self.connection.get_feature_store() + self.fs = self.project.get_feature_store() # test settings self.external = self.hopsworks_config.get("external", False) @@ -59,18 +57,15 @@ def insert_data(self, locust_fg): return locust_fg def get_or_create_fv(self, fg=None): - try: - return self.fs.get_feature_view("locust_fv", version=1) - except RestAPIError: - return self.fs.create_feature_view( - name="locust_fv", - query=fg.select_all(), - version=1, - ) + if fg is None: + fg = self.get_or_create_fg() + return self.fs.get_or_create_feature_view( + name="locust_fv", version=1, query=fg.select_all() + ) def close(self): - if client._client is not None: - self.connection.close() + if self.project is not None: + hopsworks.logout() def generate_insert_df(self, rows, schema_repetitions): data = {"ip": range(0, rows)} diff --git a/locust_benchmark/locustfile.py b/locust_benchmark/locustfile.py index d2d3ff933..105d80abd 100644 --- a/locust_benchmark/locustfile.py +++ b/locust_benchmark/locustfile.py @@ -3,7 +3,7 @@ from common.hopsworks_client import HopsworksClient from common.stop_watch import stopwatch from locust import HttpUser, User, task, constant, events -from locust.runners import MasterRunner, LocalRunner +from locust.runners import MasterRunner from urllib3 import PoolManager import nest_asyncio @@ -11,12 +11,8 @@ @events.init.add_listener def on_locust_init(environment, **kwargs): print("Locust process init") - - if isinstance(environment.runner, (MasterRunner, LocalRunner)): - # create feature view - environment.hopsworks_client = HopsworksClient(environment) - fg = environment.hopsworks_client.get_or_create_fg() - environment.hopsworks_client.get_or_create_fv(fg) + environment.hopsworks_client = HopsworksClient(environment) + environment.hopsworks_client.get_or_create_fg() @events.quitting.add_listener @@ -61,27 +57,21 @@ def get_feature_vector(self): class MySQLFeatureVectorLookup(User): - wait_time = constant(0) - weight = 5 - # fixed_count = 1 + wait_time = constant(0.001) + weight = 2 def __init__(self, environment): super().__init__(environment) - self.env = environment - self.client = HopsworksClient(environment) - self.fv = self.client.get_or_create_fv() + self.client = environment.hopsworks_client def on_start(self): - print("Init user") + self.fv = self.client.get_or_create_fv() self.fv.init_serving(external=self.client.external) nest_asyncio.apply() - def on_stop(self): - print("Closing user") - @task def get_feature_vector(self): - self._get_feature_vector({"ip": random.randint(0, self.client.rows - 1)}) + return self._get_feature_vector({"ip": random.randint(0, self.client.rows - 1)}) @stopwatch def _get_feature_vector(self, pk): @@ -89,14 +79,12 @@ def _get_feature_vector(self, pk): class MySQLFeatureVectorBatchLookup(User): - wait_time = constant(0) + wait_time = constant(0.001) weight = 1 - # fixed_count = 1 def __init__(self, environment): super().__init__(environment) - self.env = environment - self.client = HopsworksClient(environment) + self.client = environment.hopsworks_client self.fv = self.client.get_or_create_fv() def on_start(self): @@ -104,16 +92,13 @@ def on_start(self): self.fv.init_serving(external=self.client.external) nest_asyncio.apply() - def on_stop(self): - print("Closing user") - @task def get_feature_vector_batch(self): pks = [ {"ip": random.randint(0, self.client.rows - 1)} for i in range(self.client.batch_size) ] - self._get_feature_vectors(pks) + return self._get_feature_vectors(pks) @stopwatch def _get_feature_vectors(self, pk): diff --git a/locust_benchmark/requirements.txt b/locust_benchmark/requirements.txt index 2eef53a7f..d992f8066 100644 --- a/locust_benchmark/requirements.txt +++ b/locust_benchmark/requirements.txt @@ -1,3 +1,4 @@ markupsafe==2.0.1 -locust==2.17.0 +locust==2.23.1 +nest_asyncio==1.6.0 git+https://github.com/logicalclocks/hopsworks-api@main#egg=hopsworks[python]&subdirectory=python \ No newline at end of file From 550bdb4e665791b676b868f29e230ad99323c1b2 Mon Sep 17 00:00:00 2001 From: Fabio Buso Date: Fri, 1 Nov 2024 14:34:12 +0100 Subject: [PATCH 6/7] Add tablespace support to locust (#390) Co-authored-by: Javier Cabrera --- locust_benchmark/README.md | 1 + locust_benchmark/common/hopsworks_client.py | 2 ++ locust_benchmark/hopsworks_config.json | 9 +++++---- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/locust_benchmark/README.md b/locust_benchmark/README.md index c390b39db..eda8b440e 100644 --- a/locust_benchmark/README.md +++ b/locust_benchmark/README.md @@ -87,6 +87,7 @@ echo "[YOUR KEY]" > .api_key - `schema_repetitions`: This controls the number of features for the lookup. One schema repetition will result in 10 features plus primary key. Five repetitions will result in 50 features plus primary key. - `recreate_feature_group`: This controls if the previous feature group should be dropped and recreated. Set this to true when rerunning the benchmark with different size of rows or schema repetitions. - `batch_size`: This is relevant for the actual benchmark and controls how many feature vectors are looked up in the batch benchmark. +- `tablespace`: (Optional) If set creates a feature group using on-disk data. 3. Create the feature group diff --git a/locust_benchmark/common/hopsworks_client.py b/locust_benchmark/common/hopsworks_client.py index 83963d3ff..d82409892 100644 --- a/locust_benchmark/common/hopsworks_client.py +++ b/locust_benchmark/common/hopsworks_client.py @@ -36,6 +36,7 @@ def __init__(self, environment=None): "recreate_feature_group", False ) self.batch_size = self.hopsworks_config.get("batch_size", 100) + self.tablespace = self.hopsworks_config.get("tablespace", None) def get_or_create_fg(self): locust_fg = self.fs.get_or_create_feature_group( @@ -44,6 +45,7 @@ def get_or_create_fg(self): primary_key=["ip"], online_enabled=True, stream=True, + online_config={'table_space': self.tablespace} if self.tablespace else None ) return locust_fg diff --git a/locust_benchmark/hopsworks_config.json b/locust_benchmark/hopsworks_config.json index 6a8e60862..6e92b6739 100644 --- a/locust_benchmark/hopsworks_config.json +++ b/locust_benchmark/hopsworks_config.json @@ -1,10 +1,11 @@ { - "host": "localhost", + "host": "mercury.hops.works", "port": 443, - "project": "test", + "project": "fabio_demo", "external": true, - "rows": 100000, + "rows": 1000, "schema_repetitions": 1, "recreate_feature_group": true, - "batch_size": 100 + "batch_size": 100, + "tablespace": "ts1" } From e278ac0a84bcdafaf1a59633b3fc7ed03cb9d6c9 Mon Sep 17 00:00:00 2001 From: rcnnnghm Date: Fri, 1 Nov 2024 16:23:21 +0000 Subject: [PATCH 7/7] Close ``` to fix delta vacuum scroll in docs. (#391) --- python/hsfs/feature_group.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 9d286cc29..e2c42f1a3 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -3276,7 +3276,7 @@ def delta_vacuum( fg = fs.get_or_create_feature_group(...) commit_details = fg.delta_vacuum(retention_hours = 168) - + ``` # Arguments retention_hours: User provided retention period. The default retention threshold for the files is 7 days.