Merge branch 'main' into sort-docs
SirOibaf authored Nov 1, 2024
2 parents 543e7b2 + e278ac0 commit 91e6aa5
Showing 15 changed files with 82 additions and 72 deletions.
2 changes: 1 addition & 1 deletion locust_benchmark/Dockerfile
@@ -1,4 +1,4 @@
-FROM locustio/locust:2.17.0
+FROM locustio/locust:2.23.1

USER root

20 changes: 20 additions & 0 deletions locust_benchmark/Jenkinsfile
@@ -0,0 +1,20 @@
@Library("jenkins-library@main")

import com.logicalclocks.jenkins.k8s.ImageBuilder


node("local") {
    stage('Clone repository') {
        checkout scm
    }

    stage('Build and push image(s)') {
        version = readFile "${env.WORKSPACE}/locust_benchmark/KUBE_IMAGE_VERSION"
        withEnv(["VERSION=${version.trim()}"]) {

            def builder = new ImageBuilder(this)
            m = readFile "${env.WORKSPACE}/locust_benchmark/build-manifest.json"
            builder.run(m)
        }
    }
}
1 change: 1 addition & 0 deletions locust_benchmark/KUBE_IMAGE_VERSION
@@ -0,0 +1 @@
master
1 change: 1 addition & 0 deletions locust_benchmark/README.md
@@ -87,6 +87,7 @@ echo "[YOUR KEY]" > .api_key
- `schema_repetitions`: This controls the number of features for the lookup. One schema repetition results in 10 features plus the primary key; five repetitions result in 50 features plus the primary key.
- `recreate_feature_group`: This controls whether the previous feature group should be dropped and recreated. Set this to true when rerunning the benchmark with a different number of rows or schema repetitions.
- `batch_size`: This is relevant for the actual benchmark and controls how many feature vectors are looked up in the batch benchmark.
+- `tablespace`: (Optional) If set, the feature group is created using on-disk data.
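
For reference, a complete `hopsworks_config.json` combining the options above might look as follows (a sketch only; values are illustrative, and `tablespace` may be omitted to keep the feature group data in memory):

```json
{
  "host": "localhost",
  "port": 443,
  "project": "test",
  "external": true,
  "rows": 100000,
  "schema_repetitions": 1,
  "recreate_feature_group": true,
  "batch_size": 100,
  "tablespace": "ts1"
}
```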

3. Create the feature group

8 changes: 8 additions & 0 deletions locust_benchmark/build-manifest.json
@@ -0,0 +1,8 @@
[
  {
    "name": "hopsworks/locust-hsfs",
    "version": "env:VERSION",
    "dockerFile": "locust_benchmark/Dockerfile",
    "canUseCache": "true"
  }
]
27 changes: 12 additions & 15 deletions locust_benchmark/common/hopsworks_client.py
@@ -7,10 +7,8 @@
import pandas as pd

from locust.runners import MasterRunner, LocalRunner
-import hsfs

-from hsfs import client
-from hsfs.client.exceptions import RestAPIError
+import hopsworks


class HopsworksClient:
@@ -21,14 +19,14 @@ def __init__(self, environment=None):
            environment.runner, (MasterRunner, LocalRunner)
        ):
            print(self.hopsworks_config)
-            self.connection = hsfs.connection(
+            self.project = hopsworks.login(
                project=self.hopsworks_config.get("project", "test"),
                host=self.hopsworks_config.get("host", "localhost"),
                port=self.hopsworks_config.get("port", 443),
                api_key_file=".api_key",
                engine="python",
            )
-        self.fs = self.connection.get_feature_store()
+        self.fs = self.project.get_feature_store()

        # test settings
        self.external = self.hopsworks_config.get("external", False)
@@ -38,6 +36,7 @@ def __init__(self, environment=None):
            "recreate_feature_group", False
        )
        self.batch_size = self.hopsworks_config.get("batch_size", 100)
+        self.tablespace = self.hopsworks_config.get("tablespace", None)

    def get_or_create_fg(self):
        locust_fg = self.fs.get_or_create_feature_group(
@@ -46,6 +45,7 @@
            primary_key=["ip"],
            online_enabled=True,
            stream=True,
+            online_config={'table_space': self.tablespace} if self.tablespace else None
        )
        return locust_fg

@@ -59,18 +59,15 @@ def insert_data(self, locust_fg):
        return locust_fg

    def get_or_create_fv(self, fg=None):
-        try:
-            return self.fs.get_feature_view("locust_fv", version=1)
-        except RestAPIError:
-            return self.fs.create_feature_view(
-                name="locust_fv",
-                query=fg.select_all(),
-                version=1,
-            )
+        if fg is None:
+            fg = self.get_or_create_fg()
+        return self.fs.get_or_create_feature_view(
+            name="locust_fv", version=1, query=fg.select_all()
+        )

    def close(self):
-        if client._client is not None:
-            self.connection.close()
+        if self.project is not None:
+            hopsworks.logout()

    def generate_insert_df(self, rows, schema_repetitions):
        data = {"ip": range(0, rows)}
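
Taken together, these changes move the benchmark client from the deprecated `hsfs.connection()` flow to the unified `hopsworks` API. A minimal sketch of the new flow, assuming an `.api_key` file in the working directory (project, host, and feature group names are illustrative):

```python
import hopsworks

# Log in and obtain a project handle (replaces hsfs.connection()).
project = hopsworks.login(
    project="test",
    host="localhost",
    port=443,
    api_key_file=".api_key",
    engine="python",
)

# The feature store handle now comes from the project object.
fs = project.get_feature_store()

# get_or_create_feature_view replaces the old try/except around
# get_feature_view / create_feature_view.
fg = fs.get_or_create_feature_group(
    name="locust_fg",  # illustrative name; the real one sits outside this hunk
    version=1,
    primary_key=["ip"],
    online_enabled=True,
)
fv = fs.get_or_create_feature_view(name="locust_fv", version=1, query=fg.select_all())

# Clean up (replaces connection.close()).
hopsworks.logout()
```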
9 changes: 5 additions & 4 deletions locust_benchmark/hopsworks_config.json
@@ -1,10 +1,11 @@
{
-    "host": "localhost",
+    "host": "mercury.hops.works",
     "port": 443,
-    "project": "test",
+    "project": "fabio_demo",
     "external": true,
-    "rows": 100000,
+    "rows": 1000,
     "schema_repetitions": 1,
     "recreate_feature_group": true,
-    "batch_size": 100
+    "batch_size": 100,
+    "tablespace": "ts1"
}
37 changes: 11 additions & 26 deletions locust_benchmark/locustfile.py
@@ -3,20 +3,16 @@
from common.hopsworks_client import HopsworksClient
from common.stop_watch import stopwatch
from locust import HttpUser, User, task, constant, events
-from locust.runners import MasterRunner, LocalRunner
+from locust.runners import MasterRunner
from urllib3 import PoolManager
import nest_asyncio


@events.init.add_listener
def on_locust_init(environment, **kwargs):
    print("Locust process init")

-    if isinstance(environment.runner, (MasterRunner, LocalRunner)):
-        # create feature view
-        environment.hopsworks_client = HopsworksClient(environment)
-        fg = environment.hopsworks_client.get_or_create_fg()
-        environment.hopsworks_client.get_or_create_fv(fg)
+    environment.hopsworks_client = HopsworksClient(environment)
+    environment.hopsworks_client.get_or_create_fg()


@events.quitting.add_listener
@@ -61,59 +57,48 @@ def get_feature_vector(self):


class MySQLFeatureVectorLookup(User):
-    wait_time = constant(0)
-    weight = 5
-    # fixed_count = 1
+    wait_time = constant(0.001)
+    weight = 2

    def __init__(self, environment):
        super().__init__(environment)
        self.env = environment
-        self.client = HopsworksClient(environment)
-        self.fv = self.client.get_or_create_fv()
+        self.client = environment.hopsworks_client

    def on_start(self):
        print("Init user")
+        self.fv = self.client.get_or_create_fv()
        self.fv.init_serving(external=self.client.external)
        nest_asyncio.apply()

-    def on_stop(self):
-        print("Closing user")
-
    @task
    def get_feature_vector(self):
-        self._get_feature_vector({"ip": random.randint(0, self.client.rows - 1)})
+        return self._get_feature_vector({"ip": random.randint(0, self.client.rows - 1)})

    @stopwatch
    def _get_feature_vector(self, pk):
        self.fv.get_feature_vector(pk)


class MySQLFeatureVectorBatchLookup(User):
-    wait_time = constant(0)
+    wait_time = constant(0.001)
    weight = 1
-    # fixed_count = 1

    def __init__(self, environment):
        super().__init__(environment)
        self.env = environment
-        self.client = HopsworksClient(environment)
+        self.client = environment.hopsworks_client
        self.fv = self.client.get_or_create_fv()

    def on_start(self):
        print("Init user")
        self.fv.init_serving(external=self.client.external)
        nest_asyncio.apply()

-    def on_stop(self):
-        print("Closing user")
-
    @task
    def get_feature_vector_batch(self):
        pks = [
            {"ip": random.randint(0, self.client.rows - 1)}
            for i in range(self.client.batch_size)
        ]
-        self._get_feature_vectors(pks)
+        return self._get_feature_vectors(pks)

    @stopwatch
    def _get_feature_vectors(self, pk):
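
The `@stopwatch` decorator comes from `common/stop_watch.py`, which is not part of this diff. A plausible sketch of such a decorator for Locust 2.x, reporting timings through the request event, might look like this (an assumption about the helper, not the repository's actual code):

```python
import time
from functools import wraps


def stopwatch(func):
    """Hypothetical timing decorator; the real common/stop_watch.py is not shown in this diff."""

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        start = time.perf_counter()
        exception = None
        result = None
        try:
            result = func(self, *args, **kwargs)
        except Exception as e:  # report failures to Locust instead of raising
            exception = e
        # Locust 2.x aggregates anything reported via events.request.fire
        self.environment.events.request.fire(
            request_type="hopsworks",
            name=func.__name__,
            response_time=(time.perf_counter() - start) * 1000,
            response_length=0,
            exception=exception,
        )
        return result

    return wrapper
```

Under this assumption, the tasks' new `return` statements simply propagate whatever the wrapped call returns.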
3 changes: 2 additions & 1 deletion locust_benchmark/requirements.txt
@@ -1,3 +1,4 @@
 markupsafe==2.0.1
-locust==2.17.0
+locust==2.23.1
 nest_asyncio==1.6.0
+git+https://github.com/logicalclocks/hopsworks-api@main#egg=hopsworks[python]&subdirectory=python
13 changes: 12 additions & 1 deletion python/hopsworks/__init__.py
@@ -22,6 +22,7 @@
import tempfile
import warnings
from pathlib import Path
+from typing import Literal, Union

from hopsworks import client, constants, project, version
from hopsworks.client.exceptions import (
@@ -83,6 +84,7 @@ def login(
    api_key_file: str = None,
    hostname_verification: bool = False,
    trust_store_path: str = None,
+    engine: Union[None, Literal["spark"], Literal["python"], Literal["training"]] = None,
) -> project.Project:
"""Connect to [Serverless Hopsworks](https://app.hopsworks.ai) by calling the `hopsworks.login()` function with no arguments.
@@ -122,6 +124,13 @@
        api_key_file: Path to file with Api Key
        hostname_verification: Whether to verify Hopsworks' certificate
        trust_store_path: Path on the file system containing the Hopsworks certificates
+        engine: Which engine to use, `"spark"`, `"python"` or `"training"`. Defaults to `None`,
+            which initializes the engine to Spark if the environment provides Spark, for
+            example on Hopsworks and Databricks, or falls back to Python if Spark is not
+            available, e.g. on local Python environments or AWS SageMaker. This option
+            allows you to override this behaviour. The `"training"` engine is useful when only
+            feature store metadata is needed, for example the training dataset location and label
+            information when a Hopsworks training experiment is conducted.
    # Returns
        `Project`: The Project object to perform operations on
    # Raises
@@ -138,7 +147,7 @@

    # If inside hopsworks, just return the current project for now
    if "REST_ENDPOINT" in os.environ:
-        _hw_connection = _hw_connection(hostname_verification=hostname_verification)
+        _hw_connection = _hw_connection(hostname_verification=hostname_verification, engine=engine)
        _connected_project = _hw_connection.get_project()
        _initialize_module_apis()
        print("\nLogged in to project, explore it here " + _connected_project.get_url())
@@ -207,6 +216,7 @@ def login(
    _hw_connection = _hw_connection(
        host=host,
        port=port,
+        engine=engine,
        api_key_file=api_key_path,
        hostname_verification=hostname_verification,
        trust_store_path=trust_store_path,
@@ -246,6 +256,7 @@ def login(
    _hw_connection = _hw_connection(
        host=host,
        port=port,
+        engine=engine,
        api_key_value=api_key,
        hostname_verification=hostname_verification,
        trust_store_path=trust_store_path,
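
With the new `engine` argument, callers can pin the execution engine at login time instead of when fetching the feature store. A hedged example (host and key file are placeholders):

```python
import hopsworks

# Force the Python engine even where Spark is available; leaving
# engine=None (the default) keeps the auto-detection behaviour.
project = hopsworks.login(
    host="my-cluster.hopsworks.ai",  # placeholder host
    api_key_file=".api_key",
    engine="python",
)
fs = project.get_feature_store()
```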
20 changes: 2 additions & 18 deletions python/hopsworks_common/connection.py
@@ -100,7 +100,7 @@ class Connection:
            Defaults to `None`.
        engine: Which engine to use, `"spark"`, `"python"` or `"training"`. Defaults to `None`,
            which initializes the engine to Spark if the environment provides Spark, for
-            example on Hopsworks and Databricks, or falls back on Hive in Python if Spark is not
+            example on Hopsworks and Databricks, or falls back to Python if Spark is not
            available, e.g. on local Python environments or AWS SageMaker. This option
            allows you to override this behaviour. `"training"` engine is useful when only
            feature store metadata is needed, for example training dataset location and label
@@ -151,7 +151,6 @@ def __init__(
    def get_feature_store(
        self,
        name: Optional[str] = None,
-        engine: Optional[str] = None,
    ):  # -> feature_store.FeatureStore
        # the typing is commented out due to circular dependency, it breaks auto_doc.py
        """Get a reference to a feature store to perform operations on.
@@ -161,25 +160,10 @@
        # Arguments
            name: The name of the feature store, defaults to `None`.
-            engine: Which engine to use, `"spark"`, `"python"` or `"training"`. Defaults to `None`,
-                which initializes the engine to Spark if the environment provides Spark, for
-                example on Hopsworks and Databricks, or falls back on Hive in Python if Spark is not
-                available, e.g. on local Python environments or AWS SageMaker. This option
-                allows you to override this behaviour. `"training"` engine is useful when only
-                feature store metadata is needed, for example training dataset location and label
-                information when Hopsworks training experiment is conducted.
        # Returns
            `FeatureStore`. A feature store handle object to perform operations on.
        """
-        # Ensure the engine is initialized and of right type
-        from hsfs import engine as hsfs_engine
-
-        if engine:
-            global _hsfs_engine_type
-            _hsfs_engine_type = engine
-        hsfs_engine.get_instance()
-
        if not name:
            name = client.get_instance()._project_name
        return self._feature_store_api.get(util.append_feature_store_suffix(name))
@@ -532,7 +516,7 @@ def connection(
            Defaults to `None`.
        engine: Which engine to use, `"spark"`, `"python"` or `"training"`. Defaults to `None`,
            which initializes the engine to Spark if the environment provides Spark, for
-            example on Hopsworks and Databricks, or falls back on Hive in Python if Spark is not
+            example on Hopsworks and Databricks, or falls back to Python if Spark is not
            available, e.g. on local Python environments or AWS SageMaker. This option
            allows you to override this behaviour. `"training"` engine is useful when only
            feature store metadata is needed, for example training dataset location and label
7 changes: 2 additions & 5 deletions python/hopsworks_common/project.py
@@ -109,7 +109,7 @@ def project_namespace(self):
        return self._project_namespace

    def get_feature_store(
-        self, name: Optional[str] = None, engine: Optional[str] = None
+        self, name: Optional[str] = None
    ):  # -> hsfs.feature_store.FeatureStore
        """Connect to Project's Feature Store.
@@ -127,15 +127,12 @@ def get_feature_store(
        # Arguments
            name: Project name of the feature store.
-            engine: Which engine to use, `"spark"`, `"python"` or `"training"`.
-                Defaults to `"python"` when connected to [Serverless Hopsworks](https://app.hopsworks.ai).
-                See [`hopsworks.connection`](connection.md#connection) documentation for more information.
        # Returns
            `hsfs.feature_store.FeatureStore`: The Feature Store API
        # Raises
            `RestAPIError`: If unable to connect
        """
-        return client.get_connection().get_feature_store(name, engine)
+        return client.get_connection().get_feature_store(name)

    def get_model_registry(self):
        """Connect to Project's Model Registry API.
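
Since the engine is now fixed at login/connection time, `get_feature_store()` no longer accepts one. A short before/after sketch:

```python
import hopsworks

# Before: the engine could be overridden per feature store handle.
# fs = project.get_feature_store(engine="python")  # removed in this commit

# After: the engine chosen at login applies everywhere.
project = hopsworks.login(engine="python")
fs = project.get_feature_store()
```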
2 changes: 1 addition & 1 deletion python/hsfs/feature_group.py
@@ -3276,7 +3276,7 @@ def delta_vacuum(
        fg = fs.get_or_create_feature_group(...)
        commit_details = fg.delta_vacuum(retention_hours = 168)
        ```
        # Arguments
        retention_hours: User provided retention period. The default retention threshold for the files is 7 days.
1 change: 1 addition & 0 deletions python/hsfs/storage_connector.py
@@ -369,6 +369,7 @@ def prepare_spark(self, path: Optional[str] = None) -> Optional[str]:
        # Arguments
            path: Path to prepare for reading from cloud storage. Defaults to `None`.
        """
+        self.refetch()
        return engine.get_instance().setup_storage_connector(self, path)

    def connector_options(self) -> Dict[str, Any]:
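
The added `self.refetch()` call ensures connector credentials are refreshed before the Spark session is configured. A usage sketch (the connector name and path are illustrative, and `spark` is assumed to be an active SparkSession):

```python
import hopsworks

project = hopsworks.login()
fs = project.get_feature_store()

# prepare_spark() now refetches the connector first, so recently
# rotated credentials are applied before Spark reads the data.
connector = fs.get_storage_connector("my_s3_connector")  # illustrative name
read_path = connector.prepare_spark("s3://my-bucket/benchmark-data/")  # illustrative path
df = spark.read.format("parquet").load(read_path)  # assumes an active `spark` session
```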