Merge remote-tracking branch 'feature-store-api/master' into the-merge
aversey committed Jul 5, 2024
2 parents fab4b95 + 65a05ca commit c16dab6
Showing 30 changed files with 1,026 additions and 473 deletions.
@@ -46,6 +46,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
@@ -60,11 +61,15 @@ public class HopsworksInternalClient implements HopsworksHttpClient {
private static final Logger LOGGER = LoggerFactory.getLogger(HopsworksInternalClient.class.getName());

private static final String DOMAIN_CA_TRUSTSTORE = "hopsworks.domain.truststore";
private static final String TOKEN_PATH = "token.jwt";
private static final String TOKEN_PATH = (Paths.get(System.getenv("MATERIAL_DIRECTORY"),
"token.jwt")).toString();

private static final String MATERIAL_PASSWD = "material_passwd";
private static final String T_CERTIFICATE = "t_certificate";
private static final String K_CERTIFICATE = "k_certificate";
private static final String MATERIAL_PASSWD = (Paths.get(System.getenv("MATERIAL_DIRECTORY"),
"material_passwd")).toString();
private static final String T_CERTIFICATE = (Paths.get(System.getenv("MATERIAL_DIRECTORY"),
"t_certificate")).toString();
private static final String K_CERTIFICATE = (Paths.get(System.getenv("MATERIAL_DIRECTORY"),
"k_certificate")).toString();

private PoolingHttpClientConnectionManager connectionPool = null;

@@ -109,6 +114,9 @@ private Registry<ConnectionSocketFactory> createConnectionFactory()
Properties systemProperties = System.getProperties();

Path trustStorePath = Paths.get(systemProperties.getProperty(DOMAIN_CA_TRUSTSTORE));
if (trustStorePath == null || !Files.exists(trustStorePath)) {
trustStorePath = Paths.get(T_CERTIFICATE);
}
LOGGER.info("Trust store path: " + trustStorePath);
SSLContext sslCtx = SSLContexts.custom()
.loadTrustMaterial(trustStorePath.toFile(), null, new TrustSelfSignedStrategy())
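For reference, the Java change above does two things: the token and key material paths are now built from the MATERIAL_DIRECTORY environment variable instead of bare file names, and the trust store falls back to the internal t_certificate when the hopsworks.domain.truststore property is unset or points to a missing file. A minimal Python sketch of that resolution logic, for illustration only (the fallback directory and the resolve_trust_store helper are assumptions, not part of the codebase):

import os
from pathlib import Path
from typing import Optional

# Material files live under the directory named by MATERIAL_DIRECTORY.
MATERIAL_DIRECTORY = os.environ.get("MATERIAL_DIRECTORY", "/tmp/material")  # assumed fallback
TOKEN_PATH = str(Path(MATERIAL_DIRECTORY) / "token.jwt")
T_CERTIFICATE = str(Path(MATERIAL_DIRECTORY) / "t_certificate")

def resolve_trust_store(domain_truststore: Optional[str]) -> Path:
    # Prefer the configured domain CA trust store; fall back to t_certificate
    # when the property is unset or the file does not exist.
    if domain_truststore and Path(domain_truststore).exists():
        return Path(domain_truststore)
    return Path(T_CERTIFICATE)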
2 changes: 1 addition & 1 deletion hsfs/python/.pre-commit-config.yaml
@@ -1,7 +1,7 @@
exclude: setup.py
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.4.2
rev: v0.5.0
hooks:
- id: ruff
args: [--fix]
2 changes: 1 addition & 1 deletion hsfs/python/hsfs/client/external.py
@@ -111,7 +111,7 @@ def __init__(
# are needed when the application starts (before user code is run)
# So in this case, we can't materialize the certificates on the fly.
_logger.debug("Running in Spark environment, initializing Spark session")
_spark_session = SparkSession.builder.enableHiveSupport.getOrCreate()
_spark_session = SparkSession.builder.enableHiveSupport().getOrCreate()

self._validate_spark_configuration(_spark_session)
with open(
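The one-line fix above adds the missing call parentheses: SparkSession.builder.enableHiveSupport is a method, so chaining .getOrCreate() onto the method object raises an AttributeError. With the standard PySpark API the corrected chain is:

from pyspark.sql import SparkSession

# enableHiveSupport() returns the builder, so the chain continues to getOrCreate().
spark = SparkSession.builder.enableHiveSupport().getOrCreate()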
6 changes: 3 additions & 3 deletions hsfs/python/hsfs/connection.py
@@ -215,9 +215,9 @@ def connect(self) -> None:
self._engine is None and importlib.util.find_spec("pyspark")
):
self._engine = "spark"
elif (
self._engine is not None and self._engine.lower() in ["hive", "python"]
) or (self._engine is None and not importlib.util.find_spec("pyspark")):
elif (self._engine is not None and self._engine.lower() == "python") or (
self._engine is None and not importlib.util.find_spec("pyspark")
):
self._engine = "python"
elif self._engine is not None and self._engine.lower() == "training":
self._engine = "training"
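With the branch above, "hive" is no longer accepted as an engine alias; only "python", "spark", and "training" remain, and the Python engine is still picked automatically when pyspark is not installed. A hedged connection example (host, project, and API key are placeholders):

import hsfs

# engine="hive" is no longer recognised; request the Python engine explicitly.
conn = hsfs.connection(
    host="my-cluster.hopsworks.ai",  # placeholder
    project="my_project",            # placeholder
    api_key_value="<api-key>",       # placeholder
    engine="python",
)
fs = conn.get_feature_store()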
4 changes: 0 additions & 4 deletions hsfs/python/hsfs/constructor/query.py
@@ -162,12 +162,8 @@ def read(
dataframe_type: DataFrame type to return. Defaults to `"default"`.
read_options: Dictionary of read options for Spark in spark engine.
Only for python engine:
* key `"use_hive"` and value `True` to read query with Hive instead of
[Hopsworks Feature Query Service](https://docs.hopsworks.ai/latest/setup_installation/common/arrow_flight_duckdb/).
* key `"arrow_flight_config"` to pass a dictionary of arrow flight configurations.
For example: `{"arrow_flight_config": {"timeout": 900}}`
* key "hive_config" to pass a dictionary of hive or tez configurations.
For example: `{"hive_config": {"hive.tez.cpu.vcores": 2, "tez.grouping.split-count": "3"}}`
Defaults to `{}`.
# Returns
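After this docstring trim, arrow_flight_config is the only Python-engine read option documented for Query.read; the use_hive and hive_config keys are gone. A short sketch of a read with a longer Feature Query Service timeout, assuming fg is a FeatureGroup already fetched from the feature store:

# `fg` is assumed to be an existing hsfs FeatureGroup.
query = fg.select_all()
df = query.read(read_options={"arrow_flight_config": {"timeout": 900}})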
16 changes: 6 additions & 10 deletions hsfs/python/hsfs/core/arrow_flight_client.py
@@ -92,9 +92,7 @@ def _should_retry_healthcheck_or_certificate_registration(exception):
def is_data_format_supported(data_format: str, read_options: Optional[Dict[str, Any]]):
if data_format not in ArrowFlightClient.SUPPORTED_FORMATS:
return False
elif read_options and (
read_options.get("use_hive", False) or read_options.get("use_spark", False)
):
elif read_options and read_options.get("use_spark", False):
return False
else:
return get_instance()._should_be_used()
@@ -122,9 +120,7 @@ def _is_query_supported_rec(query: query.Query):


def is_query_supported(query: query.Query, read_options: Optional[Dict[str, Any]]):
if read_options and (
read_options.get("use_hive", False) or read_options.get("use_spark", False)
):
if read_options and read_options.get("use_spark", False):
return False
elif not _is_query_supported_rec(query):
return False
@@ -138,12 +134,12 @@ class ArrowFlightClient:
StorageConnector.SNOWFLAKE,
StorageConnector.BIGQUERY,
]
READ_ERROR = 'Could not read data using Hopsworks Feature Query Service. If the issue persists, use read_options={"use_hive": True} instead.'
READ_ERROR = "Could not read data using Hopsworks Feature Query Service."
WRITE_ERROR = 'Could not write data using Hopsworks Feature Query Service. If the issue persists, use write_options={"use_spark": True} instead.'
DEFAULTING_TO_DIFFERENT_SERVICE_WARNING = (
"Defaulting to Hive/Spark execution for this call."
"Defaulting to Spark execution for this call."
)
CLIENT_WILL_STAY_ACTIVE_WARNING = 'The client will remain active for future calls. If the issue persists, use read_options={"use_hive": True} or write_options={"use_spark": True}.'
CLIENT_WILL_STAY_ACTIVE_WARNING = 'The client will remain active for future calls. If the issue persists, use write_options={"use_spark": True}.'
DEFAULT_TIMEOUT_SECONDS = 900
DEFAULT_HEALTHCHECK_TIMEOUT_SECONDS = 5
DEFAULT_GRPC_MIN_RECONNECT_BACKOFF_MS = 2000
@@ -192,7 +188,7 @@ def __init__(self, disabled_for_session: bool = False):
self._health_check()
self._register_certificates()
except Exception as e:
_logger.debug("Failed to connect to Hopsworks Feature Query Service")
_logger.debug("Failed to connect to Hopsworks Feature Query Service.")
_logger.exception(e)
warnings.warn(
f"Failed to connect to Hopsworks Feature Query Service, got {str(e)}."
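In the hunks above, use_hive disappears as an opt-out: only "use_spark": True in the read or write options now bypasses the Feature Query Service, and the error and warning strings stop suggesting Hive. A simplified sketch of the new support check, with the supported formats and the client decision passed in explicitly (this mirrors, rather than reproduces, the module's function):

from typing import Any, Dict, Optional, Set

def is_data_format_supported_sketch(
    data_format: str,
    read_options: Optional[Dict[str, Any]],
    supported_formats: Set[str],
    client_should_be_used: bool,
) -> bool:
    # Unsupported formats are rejected outright.
    if data_format not in supported_formats:
        return False
    # "use_spark" is the only remaining opt-out; "use_hive" is ignored.
    if read_options and read_options.get("use_spark", False):
        return False
    return client_should_be_used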
7 changes: 7 additions & 0 deletions hsfs/python/hsfs/core/constants.py
@@ -13,3 +13,10 @@
"You will need to restart your kernel if applicable."
)
initialise_expectation_suite_for_single_expectation_api_message = "Initialize Expectation Suite by attaching to a Feature Group to enable single expectation API"

# Numpy
HAS_NUMPY: bool = importlib.util.find_spec("numpy") is not None

# SQL packages
HAS_SQLALCHEMY: bool = importlib.util.find_spec("sqlalchemy") is not None
HAS_AIOMYSQL: bool = importlib.util.find_spec("aiomysql") is not None
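The new constants probe optional dependencies once at import time with importlib.util.find_spec. A minimal sketch of how such a flag is typically consumed to guard an optional code path (the to_array helper is illustrative, not part of the diff):

import importlib.util

HAS_NUMPY: bool = importlib.util.find_spec("numpy") is not None

if HAS_NUMPY:
    import numpy as np

def to_array(values):
    # Use numpy when available, otherwise fall back to a plain list.
    return np.asarray(values) if HAS_NUMPY else list(values)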
53 changes: 53 additions & 0 deletions hsfs/python/hsfs/core/feature_logging.py
@@ -0,0 +1,53 @@
import json
from typing import Any, Dict

import humps
from hsfs import feature_group, util


class FeatureLogging:

def __init__(self, id: int,
transformed_features: "feature_group.FeatureGroup",
untransformed_features: "feature_group.FeatureGroup"):
self._id = id
self._transformed_features = transformed_features
self._untransformed_features = untransformed_features

@classmethod
def from_response_json(cls, json_dict: Dict[str, Any]) -> 'FeatureLogging':
from hsfs.feature_group import FeatureGroup # avoid circular import
json_decamelized = humps.decamelize(json_dict)
transformed_features = json_decamelized.get('transformed_log')
untransformed_features = json_decamelized.get('untransformed_log')
if transformed_features:
transformed_features = FeatureGroup.from_response_json(transformed_features)
if untransformed_features:
untransformed_features = FeatureGroup.from_response_json(untransformed_features)
return cls(json_decamelized.get('id'), transformed_features, untransformed_features)

@property
def transformed_features(self) -> "feature_group.FeatureGroup":
return self._transformed_features

@property
def untransformed_features(self) -> "feature_group.FeatureGroup":
return self._untransformed_features

@property
def id(self) -> int:
return self._id

def to_dict(self):
return {
'id': self._id,
'transformed_log': self._transformed_features,
'untransformed_log': self._untransformed_features,
}

def json(self) -> str:
return json.dumps(self, cls=util.FeatureStoreEncoder)

def __repr__(self):
return self.json()

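A hedged usage sketch for the new class, using a simplified, assumed payload shape; real responses carry full feature-group JSON in the two log fields rather than None:

from hsfs.core.feature_logging import FeatureLogging

payload = {
    "id": 7,                  # placeholder id
    "transformedLog": None,   # normally a feature group JSON object
    "untransformedLog": None,
}
logging_meta = FeatureLogging.from_response_json(payload)
print(logging_meta.id)  # -> 7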
103 changes: 103 additions & 0 deletions hsfs/python/hsfs/core/feature_view_api.py
@@ -26,6 +26,7 @@
from hsfs.client.exceptions import RestAPIError
from hsfs.constructor import query, serving_prepared_statement
from hsfs.core import explicit_provenance, job, training_dataset_job_conf
from hsfs.core.job import Job


class FeatureViewApi:
@@ -43,6 +44,13 @@ class FeatureViewApi:
_COMPUTE = "compute"
_PROVENANCE = "provenance"
_LINKS = "links"
_LOGGING = "log"
_PAUSE_LOGGING = "pause"
_RESUME_LOGGING = "resume"
_MATERIALIZE_LOGGING = "materialize"
_TRANSFORMED_LOG = "transformed"
_UNTRANSFORMED_LOG = "untransformed"


def __init__(self, feature_store_id: int) -> None:
self._feature_store_id = feature_store_id
@@ -358,3 +366,98 @@ def get_models_provenance(
explicit_provenance.Links.Type.MODEL,
training_dataset_version=training_dataset_version,
)

def enable_feature_logging(
self,
feature_view_name: str,
feature_view_version: int,):
_client = client.get_instance()
path_params = self._base_path + [
feature_view_name,
self._VERSION,
feature_view_version,
self._LOGGING,
]
_client._send_request("PUT", path_params, {})

def pause_feature_logging(
self,
feature_view_name: str,
feature_view_version: int,):
_client = client.get_instance()
path_params = self._base_path + [
feature_view_name,
self._VERSION,
feature_view_version,
self._LOGGING,
self._PAUSE_LOGGING,
]
return _client._send_request("POST", path_params, {})

def resume_feature_logging(
self,
feature_view_name: str,
feature_view_version: int,):
_client = client.get_instance()
path_params = self._base_path + [
feature_view_name,
self._VERSION,
feature_view_version,
self._LOGGING,
self._RESUME_LOGGING,
]
return _client._send_request("POST", path_params, {})

def materialize_feature_logging(
self,
feature_view_name: str,
feature_view_version: int,):
_client = client.get_instance()
path_params = self._base_path + [
feature_view_name,
self._VERSION,
feature_view_version,
self._LOGGING,
self._MATERIALIZE_LOGGING,
]
jobs_json = _client._send_request("POST", path_params, {})
jobs = []
if jobs_json.get("count", 0) > 1:
for item in jobs_json["items"]:
jobs.append(Job.from_response_json(item))
else:
jobs.append(Job.from_response_json(jobs_json))
return jobs

def get_feature_logging(
self,
feature_view_name: str,
feature_view_version: int,):
_client = client.get_instance()
path_params = self._base_path + [
feature_view_name,
self._VERSION,
feature_view_version,
self._LOGGING,
]
return _client._send_request("GET", path_params, {})

def delete_feature_logs(
self,
feature_view_name: str,
feature_view_version: int,
transformed: bool = None,
):
_client = client.get_instance()
path_params = self._base_path + [
feature_view_name,
self._VERSION,
feature_view_version,
self._LOGGING,
]
if transformed is not None:
if transformed:
path_params += [self._TRANSFORMED_LOG]
else:
path_params += [self._UNTRANSFORMED_LOG]
_client._send_request("DELETE", path_params, {})