From 19a45c477ab2e1308a422e0b404e5f4569ca53b6 Mon Sep 17 00:00:00 2001
From: Victor Jouffrey <37411285+vatj@users.noreply.github.com>
Date: Thu, 27 Jun 2024 11:47:09 +0200
Subject: [PATCH] [FSTORE-1423] Refactor Great Expectations to make it an
optional dependency (#1344)
* TYPE_CHECKING for fg and expectation suite
* Adding constants for recurring error messages
* More type_checking
* Add more TYPE_CHECKING
* Expectation Suite relies on Great Expectations only if installed
* Add ge_installed check before reading data to run FG validation
* more refactoring and import checks
* Minor fixes
* Change to ImportError and move the import checker to its own file
* Better to use ModuleNotFoundError
* Switch import in test
* refactor
* fix unit tests
* Throw ModuleNotFoundError if GE is not installed and an Expectation Suite is expected to run
* Add github actions to run pytest in an env with no great_expectations
* Start skipping tests which require great_expectations
* Catch ModuleNotFoundError and skip in python engine
* Add skipif to avoid error if GE installed
* Apply code review
* Update spark engine
* Remove extra import
* Fix extras
* to see or not to see underscores
* fix workflow pyspark
* Test fixing autodoc
* More testing to fix autodoc
* Give up on docs
* Fix HAS_GREAT_EXPECTATIONS in tests
---
.github/workflows/optional-dependency.yml | 29 +++
CONTRIBUTING.md | 2 +-
auto_doc.py | 97 ++++----
docs/CONTRIBUTING.md | 2 +-
docs/index.md | 52 ++++-
.../external_feature_group_alias.py | 4 +-
python/hsfs/core/constants.py | 15 ++
.../core/feature_monitoring_config_engine.py | 4 +-
python/hsfs/core/great_expectation_engine.py | 53 +++--
.../core/monitoring_window_config_engine.py | 6 +-
python/hsfs/core/validation_report_engine.py | 29 ++-
python/hsfs/core/validation_result_engine.py | 13 +-
python/hsfs/decorators.py | 15 ++
python/hsfs/engine/python.py | 44 +++-
python/hsfs/engine/spark.py | 48 ++--
python/hsfs/expectation_suite.py | 158 +++++++------
python/hsfs/feature_group.py | 209 ++++++++++++------
python/hsfs/feature_store.py | 85 +++----
python/hsfs/feature_view.py | 2 +-
python/hsfs/ge_expectation.py | 22 +-
python/hsfs/ge_validation_result.py | 43 ++--
python/hsfs/util.py | 11 +-
python/hsfs/validation_report.py | 54 +++--
python/pyproject.toml | 32 +--
.../core/test_great_expectation_engine.py | 103 +++++++--
python/tests/engine/test_python.py | 40 +++-
python/tests/engine/test_spark.py | 5 +
python/tests/test_feature_group.py | 10 +-
python/tests/test_ge_expectation.py | 13 +-
python/tests/test_validation_report.py | 2 +-
requirements-docs.txt | 3 +-
31 files changed, 810 insertions(+), 395 deletions(-)
create mode 100644 .github/workflows/optional-dependency.yml
create mode 100644 python/hsfs/core/constants.py
diff --git a/.github/workflows/optional-dependency.yml b/.github/workflows/optional-dependency.yml
new file mode 100644
index 000000000..547b02029
--- /dev/null
+++ b/.github/workflows/optional-dependency.yml
@@ -0,0 +1,29 @@
+name: optional-dependency
+
+on: pull_request
+
+jobs:
+ unit_tests_no_great_expectations:
+ name: Unit Testing (No Great Expectations)
+ runs-on: ubuntu-latest
+
+ steps:
+ - name: Set Timezone
+ run: sudo timedatectl set-timezone UTC
+
+ - uses: actions/checkout@v4
+ - name: Copy README
+ run: cp README.md python/
+
+ - uses: actions/setup-python@v5
+ name: Setup Python
+ with:
+ python-version: "3.10"
+ cache: "pip"
+ cache-dependency-path: "python/setup.py"
+ - run: pip install -e python[python,dev-no-opt]
+
+ - name: Run Pytest suite
+ env:
+ ENABLE_HOPSWORKS_USAGE: "false"
+ run: pytest python/tests
\ No newline at end of file
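The workflow above runs the suite in an environment without `great_expectations`, and the commit notes that GE-dependent tests are skipped there. A minimal sketch of the skip pattern assumed in those tests, keyed on the `HAS_GREAT_EXPECTATIONS` flag introduced in `python/hsfs/core/constants.py` below (the test body is illustrative):

```python
import pytest

from hsfs.core.constants import HAS_GREAT_EXPECTATIONS


@pytest.mark.skipif(
    not HAS_GREAT_EXPECTATIONS, reason="great_expectations not installed"
)
def test_expectation_suite_roundtrip():
    # Skipped at runtime when the optional dependency is absent.
    import great_expectations as ge

    suite = ge.core.ExpectationSuite(expectation_suite_name="demo_suite")
    assert suite.expectation_suite_name == "demo_suite"
```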
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 3fa2df1ee..0df3de08e 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -81,7 +81,7 @@ We use `mkdocs` together with `mike` ([for versioning](https://github.com/jimpor
2. Install HSFS with `docs` extras:
```bash
- pip install -e ".[python,dev,docs]"
+ pip install -e ".[python,dev]" && pip install -r ../requirements-docs.txt
```
3. To build the docs, first run the auto doc script:
diff --git a/auto_doc.py b/auto_doc.py
index cde0170e4..a98af258b 100644
--- a/auto_doc.py
+++ b/auto_doc.py
@@ -4,7 +4,6 @@
import keras_autodoc
-
PAGES = {
"api/connection_api.md": {
"connection": ["hsfs.connection.Connection"],
@@ -13,54 +12,6 @@
),
"connection_methods": keras_autodoc.get_methods("hsfs.connection.Connection"),
},
- "api/expectation_suite_api.md": {
- "expectation_suite": ["hsfs.expectation_suite.ExpectationSuite"],
- "expectation_suite_attach": [
- "hsfs.feature_group.FeatureGroup.save_expectation_suite"
- ],
- "single_expectation_api": [
- "hsfs.expectation_suite.ExpectationSuite.add_expectation",
- "hsfs.expectation_suite.ExpectationSuite.replace_expectation",
- "hsfs.expectation_suite.ExpectationSuite.remove_expectation",
- ],
- "expectation_suite_properties": keras_autodoc.get_properties(
- "hsfs.expectation_suite.ExpectationSuite"
- ),
- "expectation_suite_methods": keras_autodoc.get_methods(
- "hsfs.expectation_suite.ExpectationSuite"
- ),
- },
- "api/feature_store_api.md": {
- "fs": ["hsfs.feature_store.FeatureStore"],
- "fs_get": ["hsfs.connection.Connection.get_feature_store"],
- "fs_properties": keras_autodoc.get_properties(
- "hsfs.feature_store.FeatureStore"
- ),
- "fs_methods": keras_autodoc.get_methods("hsfs.feature_store.FeatureStore"),
- },
- "api/feature_group_api.md": {
- "fg": ["hsfs.feature_group.FeatureGroup"],
- "fg_create": [
- "hsfs.feature_store.FeatureStore.create_feature_group",
- "hsfs.feature_store.FeatureStore.get_or_create_feature_group",
- ],
- "fg_get": ["hsfs.feature_store.FeatureStore.get_feature_group"],
- "fg_properties": keras_autodoc.get_properties(
- "hsfs.feature_group.FeatureGroup"
- ),
- "fg_methods": keras_autodoc.get_methods("hsfs.feature_group.FeatureGroup"),
- },
- "api/external_feature_group_api.md": {
- "fg": ["hsfs.feature_group.ExternalFeatureGroup"],
- "fg_create": ["hsfs.feature_store.FeatureStore.create_external_feature_group"],
- "fg_get": ["hsfs.feature_store.FeatureStore.get_external_feature_group"],
- "fg_properties": keras_autodoc.get_properties(
- "hsfs.feature_group.ExternalFeatureGroup"
- ),
- "fg_methods": keras_autodoc.get_methods(
- "hsfs.feature_group.ExternalFeatureGroup"
- ),
- },
"api/spine_group_api.md": {
"fg": ["hsfs.feature_group.SpineGroup"],
"fg_create": ["hsfs.feature_store.FeatureStore.get_or_create_spine_group"],
@@ -120,6 +71,54 @@
"feature_properties": keras_autodoc.get_properties("hsfs.feature.Feature"),
"feature_methods": keras_autodoc.get_methods("hsfs.feature.Feature"),
},
+ "api/expectation_suite_api.md": {
+ "expectation_suite": ["hsfs.expectation_suite.ExpectationSuite"],
+ "expectation_suite_attach": [
+ "hsfs.feature_group.FeatureGroup.save_expectation_suite"
+ ],
+ "single_expectation_api": [
+ "hsfs.expectation_suite.ExpectationSuite.add_expectation",
+ "hsfs.expectation_suite.ExpectationSuite.replace_expectation",
+ "hsfs.expectation_suite.ExpectationSuite.remove_expectation",
+ ],
+ "expectation_suite_properties": keras_autodoc.get_properties(
+ "hsfs.expectation_suite.ExpectationSuite"
+ ),
+ "expectation_suite_methods": keras_autodoc.get_methods(
+ "hsfs.expectation_suite.ExpectationSuite"
+ ),
+ },
+ "api/feature_store_api.md": {
+ "fs": ["hsfs.feature_store.FeatureStore"],
+ "fs_get": ["hsfs.connection.Connection.get_feature_store"],
+ "fs_properties": keras_autodoc.get_properties(
+ "hsfs.feature_store.FeatureStore"
+ ),
+ "fs_methods": keras_autodoc.get_methods("hsfs.feature_store.FeatureStore"),
+ },
+ "api/feature_group_api.md": {
+ "fg": ["hsfs.feature_group.FeatureGroup"],
+ "fg_create": [
+ "hsfs.feature_store.FeatureStore.create_feature_group",
+ "hsfs.feature_store.FeatureStore.get_or_create_feature_group",
+ ],
+ "fg_get": ["hsfs.feature_store.FeatureStore.get_feature_group"],
+ "fg_properties": keras_autodoc.get_properties(
+ "hsfs.feature_group.FeatureGroup"
+ ),
+ "fg_methods": keras_autodoc.get_methods("hsfs.feature_group.FeatureGroup"),
+ },
+ "api/external_feature_group_api.md": {
+ "fg": ["hsfs.feature_group.ExternalFeatureGroup"],
+ "fg_create": ["hsfs.feature_store.FeatureStore.create_external_feature_group"],
+ "fg_get": ["hsfs.feature_store.FeatureStore.get_external_feature_group"],
+ "fg_properties": keras_autodoc.get_properties(
+ "hsfs.feature_group.ExternalFeatureGroup"
+ ),
+ "fg_methods": keras_autodoc.get_methods(
+ "hsfs.feature_group.ExternalFeatureGroup"
+ ),
+ },
"api/storage_connector_api.md": {
"sc_get": [
"hsfs.feature_store.FeatureStore.get_storage_connector",
diff --git a/docs/CONTRIBUTING.md b/docs/CONTRIBUTING.md
index 3fa2df1ee..0df3de08e 100644
--- a/docs/CONTRIBUTING.md
+++ b/docs/CONTRIBUTING.md
@@ -81,7 +81,7 @@ We use `mkdocs` together with `mike` ([for versioning](https://github.com/jimpor
2. Install HSFS with `docs` extras:
```bash
- pip install -e ".[python,dev,docs]"
+ pip install -e ".[python,dev]" && pip install -r ../requirements-docs.txt
```
3. To build the docs, first run the auto doc script:
diff --git a/docs/index.md b/docs/index.md
index eab7e1942..a13ea2ce5 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -9,6 +9,10 @@
src="https://img.shields.io/badge/docs-HSFS-orange"
alt="Hopsworks Feature Store Documentation"
/>
+
[... remainder of docs/index.md additions truncated ...]
diff --git a/python/hsfs/core/external_feature_group_alias.py b/python/hsfs/core/external_feature_group_alias.py
--- a/python/hsfs/core/external_feature_group_alias.py
+++ b/python/hsfs/core/external_feature_group_alias.py
@@ ... @@ def __init__(...) -> None:
self._on_demand_feature_group: Union[
- "feature_group.ExternalFeatureGroup", "feature_group.SpineGroup"
+ feature_group.ExternalFeatureGroup, "feature_group.SpineGroup"
]
if not on_demand_feature_group["spine"]:
self._on_demand_feature_group = (
@@ -48,7 +48,7 @@ def from_response_json(cls, json_dict: Dict[str, Any]) -> ExternalFeatureGroupAl
@property
def on_demand_feature_group(
self,
- ) -> Union["feature_group.ExternalFeatureGroup", "feature_group.SpineGroup"]:
+ ) -> Union[feature_group.ExternalFeatureGroup, "feature_group.SpineGroup"]:
return self._on_demand_feature_group
@property
diff --git a/python/hsfs/core/constants.py b/python/hsfs/core/constants.py
new file mode 100644
index 000000000..66a997543
--- /dev/null
+++ b/python/hsfs/core/constants.py
@@ -0,0 +1,15 @@
+import importlib.util
+
+
+# Data Validation / Great Expectations
+HAS_GREAT_EXPECTATIONS: bool = (
+ importlib.util.find_spec("great_expectations") is not None
+)
+great_expectations_not_installed_message = (
+ "Great Expectations package not found. "
+ "If you want to use data validation with Hopsworks you can install the corresponding extras "
+ """`pip install hopsworks[great_expectations]` or `pip install "hopsworks[great_expectations]"` if using zsh. """
+ "You can also install great-expectations directly in your environment e.g `pip install great-expectations`. "
+ "You will need to restart your kernel if applicable."
+)
+initialise_expectation_suite_for_single_expectation_api_message = "Initialize Expectation Suite by attaching to a Feature Group to enable single expectation API"
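For reference, the detection above uses `importlib.util.find_spec`, which locates a module without importing it, so the check is cheap even for heavy packages. A standalone sketch of the same pattern:

```python
import importlib.util

# find_spec returns None when the module cannot be located; there are no
# import side effects, so this is safe to run at module load time.
HAS_GREAT_EXPECTATIONS: bool = (
    importlib.util.find_spec("great_expectations") is not None
)

if HAS_GREAT_EXPECTATIONS:
    import great_expectations  # guaranteed to succeed: the spec was found
else:
    print("great_expectations missing; validation features are disabled")
```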
diff --git a/python/hsfs/core/feature_monitoring_config_engine.py b/python/hsfs/core/feature_monitoring_config_engine.py
index 1a1073bf9..25e9e8bfc 100644
--- a/python/hsfs/core/feature_monitoring_config_engine.py
+++ b/python/hsfs/core/feature_monitoring_config_engine.py
@@ -344,13 +344,13 @@ def get_monitoring_job(
def run_feature_monitoring(
self,
- entity: Union["feature_group.FeatureGroup", "feature_view.FeatureView"],
+ entity: Union[feature_group.FeatureGroup, "feature_view.FeatureView"],
config_name: str,
) -> List[FeatureMonitoringResult]:
"""Main function used by the job to actually perform the monitoring.
Args:
- entity: Union["feature_group.FeatureGroup", "feature_view.FeatureView"]
+ entity: Union[feature_group.FeatureGroup, "feature_view.FeatureView"]
                FeatureGroup or FeatureView object containing the feature to monitor.
config_name: str: name of the monitoring config.
diff --git a/python/hsfs/core/great_expectation_engine.py b/python/hsfs/core/great_expectation_engine.py
index 50fe99f36..b89e5f39f 100644
--- a/python/hsfs/core/great_expectation_engine.py
+++ b/python/hsfs/core/great_expectation_engine.py
@@ -15,11 +15,18 @@
#
from __future__ import annotations
-from typing import Any, Dict, Optional, Union
+from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Union
-import great_expectations as ge
-from hsfs import engine, validation_report
+
+if TYPE_CHECKING:
+ import great_expectations
+ import pandas as pd
+
+
+from hsfs import engine, util, validation_report
from hsfs import expectation_suite as es
+from hsfs import feature_group as fg_mod
+from hsfs.core.constants import HAS_GREAT_EXPECTATIONS
class GreatExpectationEngine:
@@ -35,17 +42,19 @@ def __init__(self, feature_store_id: int):
def validate(
self,
- feature_group,
- dataframe,
+ feature_group: Union[fg_mod.FeatureGroup, fg_mod.ExternalFeatureGroup],
+ dataframe: pd.DataFrame,
expectation_suite: Union[
- ge.core.ExpectationSuite, es.ExpectationSuite, None
+ great_expectations.core.ExpectationSuite, es.ExpectationSuite, None
] = None,
save_report: bool = False,
- validation_options: Dict[str, Any] = None,
+ validation_options: Optional[Dict[str, Any]] = None,
ge_type: bool = True,
- ingestion_result: str = "UNKNOWN",
+ ingestion_result: Literal[
+ "unknown", "ingested", "rejected", "fg_data", "experiment"
+ ] = "unknown",
) -> Union[
- ge.core.ExpectationSuiteValidationResult,
+ great_expectations.core.ExpectationSuiteValidationResult,
validation_report.ValidationReport,
None,
]:
@@ -59,6 +68,15 @@ def validate(
if self.should_run_validation(
expectation_suite=suite, validation_options=validation_options
):
+ if not HAS_GREAT_EXPECTATIONS:
+ raise ModuleNotFoundError(
+ f"Feature Group {feature_group.name}, v{feature_group.version} is configured to run validation with Great Expectations, "
+ "but Great Expectations is not installed. Please install it using `pip install great_expectations`.\n"
+                    "Alternatively, you can disable Great Expectations validation by setting `run_validation=False` "
+ "in the validation_options, or disable/delete the suite in the Feature Group Edit UI.\n"
+ f"{util.get_feature_group_url(feature_group.feature_store_id, feature_group.id)}."
+ )
+
report = engine.get_instance().validate_with_great_expectations(
dataframe=dataframe,
expectation_suite=suite.to_ge_type(),
@@ -89,11 +107,11 @@ def validate(
def fetch_or_convert_expectation_suite(
self,
- feature_group,
+ feature_group: Union[fg_mod.FeatureGroup, fg_mod.ExternalFeatureGroup],
expectation_suite: Union[
- ge.core.ExpectationSuite, es.ExpectationSuite, None
+ great_expectations.core.ExpectationSuite, es.ExpectationSuite, None
] = None,
- validation_options: dict = None,
+ validation_options: Optional[Dict[str, Any]] = None,
) -> Optional[es.ExpectationSuite]:
"""Convert provided expectation suite or fetch the one attached to the Feature Group from backend."""
if expectation_suite is not None:
@@ -104,7 +122,7 @@ def fetch_or_convert_expectation_suite(
"fetch_expectation_suite", True
):
return feature_group.expectation_suite
- return feature_group.get_expectation_suite(False)
+ return feature_group.get_expectation_suite(ge_type=False)
def should_run_validation(
self,
@@ -124,13 +142,16 @@ def should_run_validation(
def save_or_convert_report(
self,
feature_group,
- report: ge.core.ExpectationSuiteValidationResult,
+ report: great_expectations.core.ExpectationSuiteValidationResult,
save_report: bool,
ge_type: bool,
validation_options: Dict[str, Any],
- ingestion_result: str = "UNKNOWN",
+ ingestion_result: Literal[
+ "unknown", "ingested", "rejected", "fg_data", "experiment"
+ ] = "unknown",
) -> Union[
- ge.core.ExpectationSuiteValidationResult, validation_report.ValidationReport
+ great_expectations.core.ExpectationSuiteValidationResult,
+ validation_report.ValidationReport,
]:
save_report = validation_options.get("save_report", save_report)
if save_report:
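A recurring idiom in this patch: `great_expectations` is imported under `typing.TYPE_CHECKING` so annotations can name its classes while the runtime import stays optional. A minimal self-contained sketch of the idiom, relying on `from __future__ import annotations` (PEP 563) as the patched modules do:

```python
from __future__ import annotations

from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    # Only evaluated by static type checkers, never at runtime, so the
    # annotation below works even when the package is not installed.
    import great_expectations


def describe(suite: Optional[great_expectations.core.ExpectationSuite]) -> str:
    # Thanks to PEP 563 the annotation stays a string at runtime.
    return suite.expectation_suite_name if suite is not None else "<no suite>"


print(describe(None))  # runs fine without great_expectations installed
```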
diff --git a/python/hsfs/core/monitoring_window_config_engine.py b/python/hsfs/core/monitoring_window_config_engine.py
index 6e08fbc72..579439d4e 100644
--- a/python/hsfs/core/monitoring_window_config_engine.py
+++ b/python/hsfs/core/monitoring_window_config_engine.py
@@ -230,7 +230,7 @@ def get_window_start_end_times(
def run_single_window_monitoring(
self,
- entity: Union["feature_group.FeatureGroup", "feature_view.FeatureView"],
+ entity: Union[feature_group.FeatureGroup, "feature_view.FeatureView"],
monitoring_window_config: "mwc.MonitoringWindowConfig",
feature_name: Optional[str] = None,
) -> List[FeatureDescriptiveStatistics]:
@@ -318,7 +318,7 @@ def run_single_window_monitoring(
def fetch_entity_data_in_monitoring_window(
self,
- entity: Union["feature_group.FeatureGroup", "feature_view.FeatureView"],
+ entity: Union[feature_group.FeatureGroup, "feature_view.FeatureView"],
start_time: Optional[int],
end_time: Optional[int],
row_percentage: float,
@@ -402,7 +402,7 @@ def fetch_feature_view_data(
def fetch_feature_group_data(
self,
- entity: "feature_group.FeatureGroup",
+ entity: feature_group.FeatureGroup,
feature_name: Optional[str] = None,
start_time: Optional[int] = None,
end_time: Optional[int] = None,
diff --git a/python/hsfs/core/validation_report_engine.py b/python/hsfs/core/validation_report_engine.py
index 4f151e7ff..dd63d75f8 100644
--- a/python/hsfs/core/validation_report_engine.py
+++ b/python/hsfs/core/validation_report_engine.py
@@ -15,14 +15,22 @@
#
from __future__ import annotations
-from typing import List, Union
+from typing import TYPE_CHECKING, List, Union
+
+
+if TYPE_CHECKING:
+ import great_expectations
-import great_expectations as ge
from hsfs import client, util
from hsfs.core import validation_report_api
+from hsfs.core.constants import HAS_GREAT_EXPECTATIONS
from hsfs.validation_report import ValidationReport
+if HAS_GREAT_EXPECTATIONS:
+ import great_expectations
+
+
class ValidationReportEngine:
def __init__(self, feature_store_id: int, feature_group_id: int):
"""Validation Report engine.
@@ -39,7 +47,9 @@ def __init__(self, feature_store_id: int, feature_group_id: int):
)
def save(
- self, validation_report: ValidationReport, ge_type: bool = True
+ self,
+ validation_report: ValidationReport,
+ ge_type: bool = HAS_GREAT_EXPECTATIONS,
) -> ValidationReport:
saved_report = self._validation_report_api.create(validation_report)
url = self._get_validation_report_url()
@@ -50,8 +60,10 @@ def save(
return saved_report
def get_last(
- self, ge_type: bool = True
- ) -> Union[ValidationReport, ge.core.ExpectationSuiteValidationResult, None]:
+ self, ge_type: bool = HAS_GREAT_EXPECTATIONS
+ ) -> Union[
+ ValidationReport, great_expectations.core.ExpectationSuiteValidationResult, None
+ ]:
"""Get the most recent Validation Report of a Feature Group."""
url = self._get_validation_report_url()
print(
@@ -67,8 +79,11 @@ def get_last(
return reports[0]
def get_all(
- self, ge_type: bool = True
- ) -> Union[List[ValidationReport], List[ge.core.ExpectationSuiteValidationResult]]:
+ self, ge_type: bool = HAS_GREAT_EXPECTATIONS
+ ) -> Union[
+ List[ValidationReport],
+ List[great_expectations.core.ExpectationSuiteValidationResult],
+ ]:
"""Get all Validation Report of a Feature Group."""
url = self._get_validation_report_url()
print(
diff --git a/python/hsfs/core/validation_result_engine.py b/python/hsfs/core/validation_result_engine.py
index 6c3a86b3e..a1c34e4d8 100644
--- a/python/hsfs/core/validation_result_engine.py
+++ b/python/hsfs/core/validation_result_engine.py
@@ -16,9 +16,13 @@
from __future__ import annotations
from datetime import date, datetime
-from typing import Dict, List, Union
+from typing import TYPE_CHECKING, Dict, List, Union
+
+
+if TYPE_CHECKING:
+ import great_expectations
+
-from great_expectations.core import ExpectationValidationResult
from hsfs import util
from hsfs.core import validation_result_api
from hsfs.ge_validation_result import ValidationResult
@@ -44,7 +48,10 @@ def get_validation_history(
end_validation_time: Union[str, int, datetime, date, None] = None,
filter_by: List[str] = None,
ge_type: bool = True,
- ) -> Union[List[ValidationResult], List[ExpectationValidationResult]]:
+ ) -> Union[
+ List[ValidationResult],
+ List[great_expectations.core.ExpectationValidationResult],
+ ]:
"""Get Validation Results relevant to an Expectation specified by expectation_id.
:param expectation_id: id of the expectation for which to fetch the validation history
diff --git a/python/hsfs/decorators.py b/python/hsfs/decorators.py
index 1c66ee001..3ce15277f 100644
--- a/python/hsfs/decorators.py
+++ b/python/hsfs/decorators.py
@@ -18,6 +18,11 @@
import functools
import os
+from hsfs.core.constants import (
+ HAS_GREAT_EXPECTATIONS,
+ great_expectations_not_installed_message,
+)
+
def not_connected(fn):
@functools.wraps(fn)
@@ -68,3 +73,13 @@ def typechecked(
target: _T,
) -> _T:
return target if target else typechecked
+
+
+def uses_great_expectations(f):
+ @functools.wraps(f)
+ def g(*args, **kwds):
+ if not HAS_GREAT_EXPECTATIONS:
+ raise ModuleNotFoundError(great_expectations_not_installed_message)
+ return f(*args, **kwds)
+
+ return g
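A short usage sketch of the new guard decorator; the `convert_suite` helper here is hypothetical and only illustrates the failure mode:

```python
from hsfs.decorators import uses_great_expectations


@uses_great_expectations
def convert_suite(suite_json: dict):
    # The body only runs when great_expectations is importable; otherwise
    # the decorator raises ModuleNotFoundError with install instructions.
    import great_expectations as ge

    return ge.core.ExpectationSuite(**suite_json)


try:
    convert_suite({"expectation_suite_name": "demo"})
except ModuleNotFoundError as err:
    print(err)  # points at `pip install hopsworks[great_expectations]`
```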
diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py
index 8e64e6ec9..caaecdffb 100644
--- a/python/hsfs/engine/python.py
+++ b/python/hsfs/engine/python.py
@@ -30,11 +30,24 @@
from datetime import datetime, timezone
from io import BytesIO
from pathlib import Path
-from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Union
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ Callable,
+ Dict,
+ List,
+ Literal,
+ Optional,
+ Tuple,
+ Union,
+)
+
+
+if TYPE_CHECKING:
+ import great_expectations
import avro
import boto3
-import great_expectations as ge
import hsfs
import numpy as np
import pandas as pd
@@ -70,7 +83,9 @@
transformation_function_engine,
variable_api,
)
+from hsfs.core.constants import HAS_GREAT_EXPECTATIONS
from hsfs.core.vector_db_client import VectorDbClient
+from hsfs.decorators import uses_great_expectations
from hsfs.feature_group import ExternalFeatureGroup, FeatureGroup
from hsfs.training_dataset import TrainingDataset
from hsfs.training_dataset_split import TrainingDatasetSplit
@@ -93,6 +108,9 @@
except ImportError:
pass
+if HAS_GREAT_EXPECTATIONS:
+ import great_expectations
+
# Decimal types are currently not supported
_INT_TYPES = [pa.uint8(), pa.uint16(), pa.int8(), pa.int16(), pa.int32()]
_BIG_INT_TYPES = [pa.uint32(), pa.int64()]
@@ -206,7 +224,6 @@ def _sql_offline(
hive_config: Optional[Dict[str, Any]] = None,
arrow_flight_config: Optional[Dict[str, Any]] = None,
) -> Union[pd.DataFrame, pl.DataFrame]:
-
self._validate_dataframe_type(dataframe_type)
if isinstance(sql_query, dict) and "query_string" in sql_query:
result_df = util.run_with_loading_animation(
@@ -510,7 +527,12 @@ def show(
sql_query, feature_store, online_conn, "default", read_options or {}
).head(n)
- def read_vector_db(self, feature_group: "hsfs.feature_group.FeatureGroup", n: int =None, dataframe_type: str="default") -> Union[pd.DataFrame, pl.DataFrame, np.ndarray, List[List[Any]]]:
+ def read_vector_db(
+ self,
+ feature_group: "hsfs.feature_group.FeatureGroup",
+ n: int = None,
+ dataframe_type: str = "default",
+ ) -> Union[pd.DataFrame, pl.DataFrame, np.ndarray, List[List[Any]]]:
dataframe_type = dataframe_type.lower()
self._validate_dataframe_type(dataframe_type)
@@ -686,12 +708,13 @@ def validate(
"Deequ data validation is only available with Spark Engine. Use validate_with_great_expectations"
)
+ @uses_great_expectations
def validate_with_great_expectations(
self,
dataframe: Union[pl.DataFrame, pd.DataFrame],
- expectation_suite: ge.core.ExpectationSuite,
+ expectation_suite: great_expectations.core.ExpectationSuite,
ge_validate_kwargs: Optional[Dict[Any, Any]] = None,
- ) -> ge.core.ExpectationSuiteValidationResult:
+ ) -> great_expectations.core.ExpectationSuiteValidationResult:
        # This conversion might cause a performance bottleneck when using polars with great_expectations.
        # This patch is needed because great_expectations does not currently support polars; it should be made proper once support is added.
if isinstance(dataframe, pl.DataFrame) or isinstance(
@@ -705,7 +728,7 @@ def validate_with_great_expectations(
dataframe = dataframe.to_pandas()
if ge_validate_kwargs is None:
ge_validate_kwargs = {}
- report = ge.from_pandas(
+ report = great_expectations.from_pandas(
dataframe, expectation_suite=expectation_suite
).validate(**ge_validate_kwargs)
return report
@@ -1439,8 +1462,11 @@ def acked(err: Exception, msg: Any) -> None:
elif not isinstance(
feature_group, ExternalFeatureGroup
) and self._start_offline_materialization(offline_write_options):
- if (not offline_write_options.get("skip_offsets", False)
- and self._job_api.last_execution(feature_group.materialization_job)): # always skip offsets if executing job for the first time
+ if not offline_write_options.get(
+ "skip_offsets", False
+ ) and self._job_api.last_execution(
+ feature_group.materialization_job
+ ): # always skip offsets if executing job for the first time
# don't provide the current offsets (read from where the job last left off)
initial_check_point = ""
# provide the initial_check_point as it will reduce the read amplification of materialization job
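Since `great_expectations` cannot consume polars frames at this point, the Python engine converts to pandas before validating. A rough standalone sketch of that flow, assuming both optional packages (polars with pyarrow, and great_expectations) are installed; the column and suite names are illustrative:

```python
import great_expectations as ge
import polars as pl
from great_expectations.core import ExpectationConfiguration

df = pl.DataFrame({"age": [21, 35, 47]})

# great_expectations only understands pandas, so convert first.
pdf = df.to_pandas()

suite = ge.core.ExpectationSuite(expectation_suite_name="age_checks")
suite.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={"column": "age", "min_value": 0, "max_value": 120},
    )
)

# Same call chain the engine uses: wrap the frame, then validate the suite.
report = ge.from_pandas(pdf, expectation_suite=suite).validate()
print(report.success)
```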
diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py
index b9f8621cf..9142d6483 100644
--- a/python/hsfs/engine/spark.py
+++ b/python/hsfs/engine/spark.py
@@ -23,7 +23,11 @@
import shutil
import warnings
from datetime import date, datetime, timezone
-from typing import Any, List, Optional, TypeVar, Union
+from typing import TYPE_CHECKING, Any, List, Optional, TypeVar, Union
+
+
+if TYPE_CHECKING:
+ import great_expectations
import avro
import numpy as np
@@ -76,12 +80,6 @@ def iteritems(self):
except ImportError:
pass
-from great_expectations.core.batch import RuntimeBatchRequest
-from great_expectations.data_context import BaseDataContext
-from great_expectations.data_context.types.base import (
- DataContextConfig,
- InMemoryStoreBackendDefaults,
-)
from hsfs import client, feature, training_dataset_feature, util
from hsfs import feature_group as fg_mod
from hsfs.client import hopsworks
@@ -94,10 +92,16 @@ def iteritems(self):
storage_connector_api,
transformation_function_engine,
)
+from hsfs.core.constants import HAS_GREAT_EXPECTATIONS
+from hsfs.decorators import uses_great_expectations
from hsfs.storage_connector import StorageConnector
from hsfs.training_dataset_split import TrainingDatasetSplit
+if HAS_GREAT_EXPECTATIONS:
+ import great_expectations
+
+
class Engine:
HIVE_FORMAT = "hive"
KAFKA_FORMAT = "kafka"
@@ -152,7 +156,14 @@ def show(self, sql_query, feature_store, n, online_conn, read_options=None):
sql_query, feature_store, online_conn, "default", read_options
).show(n)
- def read_vector_db(self, feature_group: fg_mod.FeatureGroup, n: int =None, dataframe_type: str="default") -> Union[pd.DataFrame, np.ndarray, List[List[Any]], TypeVar("pyspark.sql.DataFrame")]:
+ def read_vector_db(
+ self,
+ feature_group: fg_mod.FeatureGroup,
+ n: int = None,
+ dataframe_type: str = "default",
+ ) -> Union[
+ pd.DataFrame, np.ndarray, List[List[Any]], TypeVar("pyspark.sql.DataFrame")
+ ]:
results = VectorDbClient.read_feature_group(feature_group, n)
feature_names = [f.name for f in feature_group.features]
dataframe_type = dataframe_type.lower()
@@ -962,21 +973,28 @@ def profile(
exact_uniqueness,
)
+ @uses_great_expectations
def validate_with_great_expectations(
self,
dataframe: TypeVar("pyspark.sql.DataFrame"), # noqa: F821
- expectation_suite: TypeVar("ge.core.ExpectationSuite"), # noqa: F821
+ expectation_suite: great_expectations.core.ExpectationSuite, # noqa: F821
ge_validate_kwargs: Optional[dict],
):
# NOTE: InMemoryStoreBackendDefaults SHOULD NOT BE USED in normal settings. You
# may experience data loss as it persists nothing. It is used here for testing.
# Please refer to docs to learn how to instantiate your DataContext.
- store_backend_defaults = InMemoryStoreBackendDefaults()
- data_context_config = DataContextConfig(
- store_backend_defaults=store_backend_defaults,
- checkpoint_store_name=store_backend_defaults.checkpoint_store_name,
+ store_backend_defaults = (
+ great_expectations.data_context.types.base.InMemoryStoreBackendDefaults()
+ )
+ data_context_config = (
+ great_expectations.data_context.types.base.DataContextConfig(
+ store_backend_defaults=store_backend_defaults,
+ checkpoint_store_name=store_backend_defaults.checkpoint_store_name,
+ )
+ )
+ context = great_expectations.data_context.BaseDataContext(
+ project_config=data_context_config
)
- context = BaseDataContext(project_config=data_context_config)
datasource = {
"name": "my_spark_dataframe",
@@ -995,7 +1013,7 @@ def validate_with_great_expectations(
context.add_datasource(**datasource)
# Here is a RuntimeBatchRequest using a dataframe
- batch_request = RuntimeBatchRequest(
+ batch_request = great_expectations.core.batch.RuntimeBatchRequest(
datasource_name="my_spark_dataframe",
data_connector_name="default_runtime_data_connector_name",
data_asset_name="", # This can be anything that identifies this data_asset for you
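The NOTE preserved above warns that `InMemoryStoreBackendDefaults` persists nothing; the context exists only for the duration of the validation run. For reference, the fully qualified paths used here resolve after a bare `import great_expectations` in the pre-0.18 GE API this patch targets:

```python
import great_expectations

# Ephemeral data context: every store lives in memory, nothing hits disk.
store_backend_defaults = (
    great_expectations.data_context.types.base.InMemoryStoreBackendDefaults()
)
config = great_expectations.data_context.types.base.DataContextConfig(
    store_backend_defaults=store_backend_defaults,
    checkpoint_store_name=store_backend_defaults.checkpoint_store_name,
)
context = great_expectations.data_context.BaseDataContext(project_config=config)
print(type(context).__name__)  # BaseDataContext
```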
diff --git a/python/hsfs/expectation_suite.py b/python/hsfs/expectation_suite.py
index 2d8f5151d..e8026dcf6 100644
--- a/python/hsfs/expectation_suite.py
+++ b/python/hsfs/expectation_suite.py
@@ -17,18 +17,32 @@
import json
import re
-from typing import Any, Dict, List, Optional, Union
+from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union
+
+
+if TYPE_CHECKING:
+ import great_expectations
-import great_expectations as ge
import humps
from hsfs import util
from hsfs.client.exceptions import FeatureStoreException
from hsfs.core import expectation_suite_engine
+from hsfs.core.constants import (
+ HAS_GREAT_EXPECTATIONS,
+ initialise_expectation_suite_for_single_expectation_api_message,
+)
from hsfs.core.expectation_engine import ExpectationEngine
from hsfs.core.variable_api import VariableApi
+
+# if great_expectations is not installed, we will default to using native Hopsworks class as return values
+from hsfs.decorators import uses_great_expectations
from hsfs.ge_expectation import GeExpectation
+if HAS_GREAT_EXPECTATIONS:
+ import great_expectations
+
+
class ExpectationSuite:
"""Metadata object representing an feature validation expectation in the Feature Store."""
@@ -36,28 +50,25 @@ def __init__(
self,
expectation_suite_name: str,
expectations: List[
- Union[ge.core.ExpectationConfiguration, dict, GeExpectation]
+ Union[
+ great_expectations.core.ExpectationConfiguration,
+ Dict[str, Any],
+ GeExpectation,
+ ]
],
meta: Dict[str, Any],
id: Optional[int] = None,
- data_asset_type: Optional[str] = None,
- ge_cloud_id: Optional[int] = None,
run_validation: bool = True,
- validation_ingestion_policy: str = "ALWAYS",
+        validation_ingestion_policy: Literal["always", "strict"] = "always",
feature_store_id: Optional[int] = None,
feature_group_id: Optional[int] = None,
href: Optional[str] = None,
- expand: Optional[str] = None,
- items: Optional[List[Dict[str, Any]]] = None,
- count: Optional[int] = None,
- type: Optional[str] = None,
- created: Optional[int] = None,
**kwargs,
) -> None:
self._id = id
self._expectation_suite_name = expectation_suite_name
- self._ge_cloud_id = ge_cloud_id
- self._data_asset_type = data_asset_type
+ self._ge_cloud_id = kwargs.get("ge_cloud_id", None)
+ self._data_asset_type = kwargs.get("data_asset_type", None)
self._run_validation = run_validation
self._validation_ingestion_policy = validation_ingestion_policy.upper()
self._expectations = []
@@ -72,15 +83,15 @@ def __init__(
self._feature_group_id = None
self._feature_store_id = None
- self._variable_api: "VariableApi" = VariableApi()
+ self._variable_api: VariableApi = VariableApi()
# use setters because these need to be transformed from stringified json
self.expectations = expectations
self.meta = meta
- self._expectation_engine: Optional["ExpectationEngine"] = None
+ self._expectation_engine: Optional[ExpectationEngine] = None
self._expectation_suite_engine: Optional[
- "expectation_suite_engine.ExpectationSuiteEngine"
+ expectation_suite_engine.ExpectationSuiteEngine
] = None
if self.id:
@@ -105,13 +116,13 @@ def __init__(
@classmethod
def from_response_json(
cls, json_dict: Dict[str, Any]
- ) -> Union["ExpectationSuite", List["ExpectationSuite"]]:
+ ) -> Optional[Union[ExpectationSuite, List[ExpectationSuite]]]:
json_decamelized = humps.decamelize(json_dict)
if (
"count" in json_decamelized
): # todo count is expected also when providing dict
if json_decamelized["count"] == 0:
- return None # todo sometimes empty list returns [] others None
+ return None
return [
cls(**expectation_suite)
for expectation_suite in json_decamelized["items"]
@@ -120,19 +131,20 @@ def from_response_json(
return cls(**json_decamelized)
@classmethod
+ @uses_great_expectations
def from_ge_type(
cls,
- ge_expectation_suite: ge.core.ExpectationSuite,
+ ge_expectation_suite: great_expectations.core.ExpectationSuite,
run_validation: bool = True,
- validation_ingestion_policy: str = "ALWAYS",
+        validation_ingestion_policy: Literal["always", "strict"] = "always",
id: Optional[int] = None,
feature_store_id: Optional[int] = None,
feature_group_id: Optional[int] = None,
- ) -> "ExpectationSuite":
+ ) -> ExpectationSuite:
"""Used to create a Hopsworks Expectation Suite instance from a great_expectations instance.
# Arguments
- ge_expectation_suite: great_expectations.ExpectationSuite
+ ge_expectation_suite: great_expectations.core.ExpectationSuite
The great_expectations ExpectationSuite instance to convert to a Hopsworks ExpectationSuite.
run_validation: bool
Whether to run validation on inserts when the expectation suite is attached.
@@ -172,7 +184,7 @@ def to_dict(self) -> Dict[str, Any]:
"geCloudId": self._ge_cloud_id,
"dataAssetType": self._data_asset_type,
"runValidation": self._run_validation,
- "validationIngestionPolicy": self._validation_ingestion_policy,
+ "validationIngestionPolicy": self._validation_ingestion_policy.upper(),
}
def to_json_dict(self, decamelize: bool = False) -> Dict[str, Any]:
@@ -188,7 +200,7 @@ def to_json_dict(self, decamelize: bool = False) -> Dict[str, Any]:
"geCloudId": self._ge_cloud_id,
"dataAssetType": self._data_asset_type,
"runValidation": self._run_validation,
- "validationIngestionPolicy": self._validation_ingestion_policy,
+ "validationIngestionPolicy": self._validation_ingestion_policy.upper(),
}
if decamelize:
@@ -199,8 +211,9 @@ def to_json_dict(self, decamelize: bool = False) -> Dict[str, Any]:
def json(self) -> str:
return json.dumps(self, cls=util.FeatureStoreEncoder)
- def to_ge_type(self) -> ge.core.ExpectationSuite:
- return ge.core.ExpectationSuite(
+ @uses_great_expectations
+ def to_ge_type(self) -> great_expectations.core.ExpectationSuite:
+ return great_expectations.core.ExpectationSuite(
expectation_suite_name=self._expectation_suite_name,
ge_cloud_id=self._ge_cloud_id,
data_asset_type=self._data_asset_type,
@@ -229,7 +242,7 @@ def _init_expectation_engine(
)
else:
raise ValueError(
- "Initialise the Expectation Suite first by attaching to a Feature Group"
+ initialise_expectation_suite_for_single_expectation_api_message
)
def _init_expectation_suite_engine(
@@ -256,7 +269,12 @@ def _init_expectation_suite_engine(
# Emulate GE single expectation api to edit list of expectations
def _convert_expectation(
- self, expectation: Union[GeExpectation, ge.core.ExpectationConfiguration, dict]
+ self,
+ expectation: Union[
+ GeExpectation,
+ great_expectations.core.ExpectationConfiguration,
+ Dict[str, Any],
+ ],
) -> GeExpectation:
"""
Convert different representation of expectation to Hopsworks GeExpectation type.
@@ -270,7 +288,9 @@ def _convert_expectation(
# Raises
`TypeError`
"""
- if isinstance(expectation, ge.core.ExpectationConfiguration):
+ if HAS_GREAT_EXPECTATIONS and isinstance(
+ expectation, great_expectations.core.ExpectationConfiguration
+ ):
return GeExpectation(**expectation.to_json_dict())
elif isinstance(expectation, GeExpectation):
return expectation
@@ -282,8 +302,8 @@ def _convert_expectation(
)
def get_expectation(
- self, expectation_id: int, ge_type: bool = True
- ) -> Union[GeExpectation, ge.core.ExpectationConfiguration]:
+ self, expectation_id: int, ge_type: bool = HAS_GREAT_EXPECTATIONS
+ ) -> Union[GeExpectation, great_expectations.core.ExpectationConfiguration]:
"""
Fetch expectation with expectation_id from the backend.
@@ -301,7 +321,8 @@ def get_expectation(
# Arguments
expectation_id: Id of the expectation to fetch from the backend.
- ge_type: Whether to return native Great Expectations object or Hopsworks abstraction, defaults to True.
+ ge_type: Whether to return native Great Expectations object or Hopsworks abstraction,
+            defaults to `True` if great_expectations is installed, else `False`.
# Returns
The expectation with expectation_id registered in the backend.
@@ -310,13 +331,6 @@ def get_expectation(
`hsfs.client.exceptions.RestAPIError`
`hsfs.client.exceptions.FeatureStoreException`
"""
- major, minor = self._variable_api.parse_major_and_minor(
- self._variable_api.get_version("hopsworks")
- )
- if major == "3" and minor == "0":
- raise FeatureStoreException(
- "The hopsworks server does not support this operation. Update server to hopsworks >3.1 to enable support."
- )
if self.id and self._expectation_engine:
if ge_type:
return self._expectation_engine.get(expectation_id).to_ge_type()
@@ -324,14 +338,16 @@ def get_expectation(
return self._expectation_engine.get(expectation_id)
else:
raise FeatureStoreException(
- "Initialize Expectation Suite by attaching to a Feature Group to enable single expectation API"
+ initialise_expectation_suite_for_single_expectation_api_message
)
def add_expectation(
self,
- expectation: Union[GeExpectation, ge.core.ExpectationConfiguration],
- ge_type: bool = True,
- ) -> Union[GeExpectation, ge.core.ExpectationConfiguration]:
+ expectation: Union[
+ GeExpectation, great_expectations.core.ExpectationConfiguration
+ ],
+ ge_type: bool = HAS_GREAT_EXPECTATIONS,
+ ) -> Union[GeExpectation, great_expectations.core.ExpectationConfiguration]:
"""
        Append an expectation to the local suite, or to the backend if attached to a Feature Group.
@@ -363,7 +379,8 @@ def add_expectation(
```
# Arguments
expectation: The new expectation object.
- ge_type: Whether to return native Great Expectations object or Hopsworks abstraction, defaults to True.
+ ge_type: Whether to return native Great Expectations object or Hopsworks abstraction,
+            defaults to `True` if great_expectations is installed, else `False`.
# Returns
The new expectation attached to the Feature Group.
@@ -372,14 +389,6 @@ def add_expectation(
`hsfs.client.exceptions.RestAPIError`
`hsfs.client.exceptions.FeatureStoreException`
"""
- major, minor = self._variable_api.parse_major_and_minor(
- self._variable_api.get_version("hopsworks")
- )
- if major == "3" and minor == "0":
- raise FeatureStoreException(
- "The hopsworks server does not support this operation. Update server to hopsworks >3.1 to enable support."
- )
-
if self.id:
converted_expectation = self._convert_expectation(expectation=expectation)
converted_expectation = self._expectation_engine.create(
@@ -392,14 +401,16 @@ def add_expectation(
return converted_expectation
else:
raise FeatureStoreException(
- "Initialize Expectation Suite by attaching to a Feature Group to enable single expectation API"
+ initialise_expectation_suite_for_single_expectation_api_message
)
def replace_expectation(
self,
- expectation: Union[GeExpectation, ge.core.ExpectationConfiguration],
- ge_type: bool = True,
- ) -> Union[GeExpectation, ge.core.ExpectationConfiguration]:
+ expectation: Union[
+ GeExpectation, great_expectations.core.ExpectationConfiguration
+ ],
+ ge_type: bool = HAS_GREAT_EXPECTATIONS,
+ ) -> Union[GeExpectation, great_expectations.core.ExpectationConfiguration]:
"""
Update an expectation from the suite locally or from the backend if attached to a Feature Group.
@@ -410,7 +421,8 @@ def replace_expectation(
# Arguments
expectation: The updated expectation object. The meta field should contain an expectationId field.
- ge_type: Whether to return native Great Expectations object or Hopsworks abstraction, defaults to True.
+ ge_type: Whether to return native Great Expectations object or Hopsworks abstraction,
+            defaults to `True` if great_expectations is installed, else `False`.
# Returns
The updated expectation attached to the Feature Group.
@@ -419,14 +431,6 @@ def replace_expectation(
`hsfs.client.exceptions.RestAPIError`
`hsfs.client.exceptions.FeatureStoreException`
"""
- major, minor = self._variable_api.parse_major_and_minor(
- self._variable_api.get_version("hopsworks")
- )
- if major == "3" and minor == "0":
- raise FeatureStoreException(
- "The hopsworks server does not support this operation. Update server to hopsworks >3.1 to enable support."
- )
-
if self.id:
converted_expectation = self._convert_expectation(expectation=expectation)
# To update an expectation we need an id either from meta field or from self.id
@@ -443,7 +447,7 @@ def replace_expectation(
return converted_expectation
else:
raise FeatureStoreException(
- "Initialize Expectation Suite by attaching to a Feature Group to enable single expectation API"
+ initialise_expectation_suite_for_single_expectation_api_message
)
def remove_expectation(self, expectation_id: Optional[int] = None) -> None:
@@ -462,20 +466,12 @@ def remove_expectation(self, expectation_id: Optional[int] = None) -> None:
`hsfs.client.exceptions.RestAPIError`
`hsfs.client.exceptions.FeatureStoreException`
"""
- major, minor = self._variable_api.parse_major_and_minor(
- self._variable_api.get_version("hopsworks")
- )
- if major == "3" and minor == "0":
- raise FeatureStoreException(
- "The hopsworks server does not support this operation. Update server to hopsworks >3.1 to enable support."
- )
-
if self.id:
self._expectation_engine.delete(expectation_id=expectation_id)
self.expectations = self._expectation_engine.get_expectations_by_suite_id()
else:
raise FeatureStoreException(
- "Initialize Expectation Suite by attaching to a Feature Group to enable single expectation API"
+ initialise_expectation_suite_for_single_expectation_api_message
)
# End of single expectation API
@@ -558,16 +554,18 @@ def run_validation(self, run_validation: bool) -> None:
)
@property
- def validation_ingestion_policy(self) -> str:
+ def validation_ingestion_policy(self) -> Literal["always", "strict"]:
"""Whether to ingest a df based on the validation result.
- "STRICT" : ingest df only if all expectations succeed,
- "ALWAYS" : always ingest df, even if one or more expectations fail
+ "strict" : ingest df only if all expectations succeed,
+ "always" : always ingest df, even if one or more expectations fail
"""
return self._validation_ingestion_policy
@validation_ingestion_policy.setter
- def validation_ingestion_policy(self, validation_ingestion_policy: str) -> None:
+ def validation_ingestion_policy(
+ self, validation_ingestion_policy: Literal["always", "strict"]
+ ) -> None:
self._validation_ingestion_policy = validation_ingestion_policy.upper()
if self.id:
self._expectation_suite_engine.update_metadata_from_fields(
@@ -583,7 +581,7 @@ def expectations(self) -> List[GeExpectation]:
def expectations(
self,
expectations: Union[
- List[ge.core.ExpectationConfiguration],
+ List[great_expectations.core.ExpectationConfiguration],
List[GeExpectation],
List[dict],
None,
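With these defaults, `ge_type` follows `HAS_GREAT_EXPECTATIONS`, so callers receive native GE objects only when the package is importable. A rough round-trip sketch assuming great_expectations is installed (the suite name is illustrative; exact constructor behavior may differ):

```python
import great_expectations as ge

from hsfs.expectation_suite import ExpectationSuite

ge_suite = ge.core.ExpectationSuite(expectation_suite_name="transactions_suite")

# Native GE suite -> Hopsworks abstraction; from_ge_type is itself guarded
# by @uses_great_expectations and raises ModuleNotFoundError without GE.
hsfs_suite = ExpectationSuite.from_ge_type(ge_suite, run_validation=True)

# ...and back: to_ge_type() is guarded the same way.
assert hsfs_suite.to_ge_type().expectation_suite_name == "transactions_suite"
```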
diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py
index de5577417..f49096354 100644
--- a/python/hsfs/feature_group.py
+++ b/python/hsfs/feature_group.py
@@ -21,11 +21,24 @@
import time
import warnings
from datetime import date, datetime
-from typing import Any, Dict, List, Optional, Tuple, TypeVar, Union
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ Dict,
+ List,
+ Literal,
+ Optional,
+ Tuple,
+ TypeVar,
+ Union,
+)
+
+
+if TYPE_CHECKING:
+ import great_expectations
import avro.schema
import confluent_kafka
-import great_expectations as ge
import humps
import numpy as np
import pandas as pd
@@ -66,10 +79,15 @@
)
from hsfs.core import feature_monitoring_config as fmc
from hsfs.core import feature_monitoring_result as fmr
+from hsfs.core.constants import (
+ HAS_GREAT_EXPECTATIONS,
+)
from hsfs.core.job import Job
from hsfs.core.variable_api import VariableApi
from hsfs.core.vector_db_client import VectorDbClient
-from hsfs.decorators import typechecked
+
+# if great_expectations is not installed, we will default to using native Hopsworks class as return values
+from hsfs.decorators import typechecked, uses_great_expectations
from hsfs.embedding import EmbeddingIndex
from hsfs.expectation_suite import ExpectationSuite
from hsfs.ge_validation_result import ValidationResult
@@ -78,6 +96,10 @@
from hsfs.validation_report import ValidationReport
+if HAS_GREAT_EXPECTATIONS:
+ import great_expectations
+
+
_logger = logging.getLogger(__name__)
@@ -92,9 +114,13 @@ def __init__(
event_time: Optional[Union[str, int, date, datetime]] = None,
online_enabled: bool = False,
id: Optional[int] = None,
- embedding_index: Optional["EmbeddingIndex"] = None,
+ embedding_index: Optional[EmbeddingIndex] = None,
expectation_suite: Optional[
- Union["ExpectationSuite", "ge.core.ExpectationSuite", Dict[str, Any]]
+ Union[
+ ExpectationSuite,
+ great_expectations.core.ExpectationSuite,
+ Dict[str, Any],
+ ]
] = None,
online_topic_name: Optional[str] = None,
topic_name: Optional[str] = None,
@@ -124,13 +150,13 @@ def __init__(
self._feature_group_engine: Optional[
feature_group_engine.FeatureGroupEngine
] = None
- self._statistics_engine: "statistics_engine.StatisticsEngine" = (
+ self._statistics_engine: statistics_engine.StatisticsEngine = (
statistics_engine.StatisticsEngine(featurestore_id, self.ENTITY_TYPE)
)
- self._code_engine: "code_engine.CodeEngine" = code_engine.CodeEngine(
+ self._code_engine: code_engine.CodeEngine = code_engine.CodeEngine(
featurestore_id, self.ENTITY_TYPE
)
- self._great_expectation_engine: "great_expectation_engine.GreatExpectationEngine" = great_expectation_engine.GreatExpectationEngine(
+ self._great_expectation_engine: great_expectation_engine.GreatExpectationEngine = great_expectation_engine.GreatExpectationEngine(
featurestore_id
)
if self._id is not None:
@@ -139,27 +165,27 @@ def __init__(
feature_store_id=featurestore_id, feature_group_id=self._id
)
self._expectation_suite_engine: Optional[
- "expectation_suite_engine.ExpectationSuiteEngine"
+ expectation_suite_engine.ExpectationSuiteEngine
] = expectation_suite_engine.ExpectationSuiteEngine(
feature_store_id=featurestore_id, feature_group_id=self._id
)
self._validation_report_engine: Optional[
- "validation_report_engine.ValidationReportEngine"
+ validation_report_engine.ValidationReportEngine
] = validation_report_engine.ValidationReportEngine(
featurestore_id, self._id
)
self._validation_result_engine: Optional[
- "validation_result_engine.ValidationResultEngine"
+ validation_result_engine.ValidationResultEngine
] = validation_result_engine.ValidationResultEngine(
featurestore_id, self._id
)
self._feature_monitoring_config_engine: Optional[
- "feature_monitoring_config_engine.FeatureMonitoringConfigEngine"
+ feature_monitoring_config_engine.FeatureMonitoringConfigEngine
] = feature_monitoring_config_engine.FeatureMonitoringConfigEngine(
feature_store_id=featurestore_id,
feature_group_id=self._id,
)
- self._feature_monitoring_result_engine: "feature_monitoring_result_engine.FeatureMonitoringResultEngine" = feature_monitoring_result_engine.FeatureMonitoringResultEngine(
+ self._feature_monitoring_result_engine: feature_monitoring_result_engine.FeatureMonitoringResultEngine = feature_monitoring_result_engine.FeatureMonitoringResultEngine(
feature_store_id=self._feature_store_id,
feature_group_id=self._id,
)
@@ -543,8 +569,13 @@ def get_storage_connector(self):
"""
storage_connector_provenance = self.get_storage_connector_provenance()
- if storage_connector_provenance.inaccessible or storage_connector_provenance.deleted:
- _logger.info("The parent storage connector is deleted or inaccessible. For more details access `get_storage_connector_provenance`")
+ if (
+ storage_connector_provenance.inaccessible
+ or storage_connector_provenance.deleted
+ ):
+ _logger.info(
+ "The parent storage connector is deleted or inaccessible. For more details access `get_storage_connector_provenance`"
+ )
if storage_connector_provenance.accessible:
return storage_connector_provenance.accessible[0]
@@ -878,8 +909,8 @@ def append_features(
return self
def get_expectation_suite(
- self, ge_type: bool = True
- ) -> Union[ExpectationSuite, ge.core.ExpectationSuite, None]:
+ self, ge_type: bool = HAS_GREAT_EXPECTATIONS
+ ) -> Union[ExpectationSuite, great_expectations.core.ExpectationSuite, None]:
"""Return the expectation suite attached to the feature group if it exists.
!!! example
@@ -896,7 +927,8 @@ def get_expectation_suite(
# Arguments
ge_type: If `True` returns a native Great Expectation type, Hopsworks
custom type otherwise. Conversion can be performed via the `to_ge_type()`
- method on hopsworks type. Defaults to `True`.
+ method on hopsworks type. Defaults to `True` if Great Expectations is installed,
+ else `False`.
# Returns
`ExpectationSuite`. The expectation suite attached to the feature group.
@@ -915,11 +947,13 @@ def get_expectation_suite(
def save_expectation_suite(
self,
- expectation_suite: Union[ExpectationSuite, ge.core.ExpectationSuite],
+ expectation_suite: Union[
+ ExpectationSuite, great_expectations.core.ExpectationSuite
+ ],
run_validation: bool = True,
- validation_ingestion_policy: str = "ALWAYS",
+ validation_ingestion_policy: Literal["always", "strict"] = "always",
overwrite: bool = False,
- ) -> Union[ExpectationSuite, ge.core.ExpectationSuite]:
+ ) -> Union[ExpectationSuite, great_expectations.core.ExpectationSuite]:
"""Attach an expectation suite to a feature group and saves it for future use. If an expectation
suite is already attached, it is replaced. Note that the provided expectation suite is modified
inplace to include expectationId fields.
@@ -947,7 +981,9 @@ def save_expectation_suite(
# Raises
`hsfs.client.exceptions.RestAPIError`.
"""
- if isinstance(expectation_suite, ge.core.ExpectationSuite):
+ if HAS_GREAT_EXPECTATIONS and isinstance(
+ expectation_suite, great_expectations.core.ExpectationSuite
+ ):
tmp_expectation_suite = ExpectationSuite.from_ge_type(
ge_expectation_suite=expectation_suite,
run_validation=run_validation,
@@ -1001,8 +1037,10 @@ def delete_expectation_suite(self) -> None:
self._expectation_suite = None
def get_latest_validation_report(
- self, ge_type: bool = True
- ) -> Union[ValidationReport, ge.core.ExpectationSuiteValidationResult, None]:
+ self, ge_type: bool = HAS_GREAT_EXPECTATIONS
+ ) -> Union[
+ ValidationReport, great_expectations.core.ExpectationSuiteValidationResult, None
+ ]:
"""Return the latest validation report attached to the Feature Group if it exists.
!!! example
@@ -1019,7 +1057,8 @@ def get_latest_validation_report(
# Arguments
ge_type: If `True` returns a native Great Expectation type, Hopsworks
custom type otherwise. Conversion can be performed via the `to_ge_type()`
- method on hopsworks type. Defaults to `True`.
+ method on hopsworks type. Defaults to `True` if Great Expectations is installed,
+ else `False`.
# Returns
`ValidationReport`. The latest validation report attached to the Feature Group.
@@ -1030,8 +1069,12 @@ def get_latest_validation_report(
return self._validation_report_engine.get_last(ge_type=ge_type)
def get_all_validation_reports(
- self, ge_type: bool = True
- ) -> List[Union[ValidationReport, ge.core.ExpectationSuiteValidationResult]]:
+ self, ge_type: bool = HAS_GREAT_EXPECTATIONS
+ ) -> List[
+ Union[
+ ValidationReport, great_expectations.core.ExpectationSuiteValidationResult
+ ]
+ ]:
"""Return the latest validation report attached to the feature group if it exists.
!!! example
@@ -1048,7 +1091,8 @@ def get_all_validation_reports(
# Arguments
ge_type: If `True` returns a native Great Expectation type, Hopsworks
custom type otherwise. Conversion can be performed via the `to_ge_type()`
- method on hopsworks type. Defaults to `True`.
+ method on hopsworks type. Defaults to `True` if Great Expectations is installed,
+ else `False`.
# Returns
Union[List[`ValidationReport`], `ValidationReport`]. All validation reports attached to the feature group.
@@ -1069,11 +1113,13 @@ def save_validation_report(
validation_report: Union[
Dict[str, Any],
ValidationReport,
- ge.core.expectation_validation_result.ExpectationSuiteValidationResult,
+ great_expectations.core.expectation_validation_result.ExpectationSuiteValidationResult,
],
- ingestion_result: str = "UNKNOWN",
- ge_type: bool = True,
- ) -> Union[ValidationReport, ge.core.ExpectationSuiteValidationResult]:
+        ingestion_result: Literal["unknown", "experiment", "fg_data"] = "unknown",
+ ge_type: bool = HAS_GREAT_EXPECTATIONS,
+ ) -> Union[
+ ValidationReport, great_expectations.core.ExpectationSuiteValidationResult
+ ]:
"""Save validation report to hopsworks platform along previous reports of the same Feature Group.
!!! example
@@ -1101,15 +1147,16 @@ def save_validation_report(
already in the Feature Group.
ge_type: If `True` returns a native Great Expectation type, Hopsworks
custom type otherwise. Conversion can be performed via the `to_ge_type()`
- method on hopsworks type. Defaults to `True`.
+ method on hopsworks type. Defaults to `True` if Great Expectations is installed,
+ else `False`.
# Raises
`hsfs.client.exceptions.RestAPIError`.
"""
if self._id:
- if isinstance(
+ if HAS_GREAT_EXPECTATIONS and isinstance(
validation_report,
- ge.core.expectation_validation_result.ExpectationSuiteValidationResult,
+ great_expectations.core.expectation_validation_result.ExpectationSuiteValidationResult,
):
report = ValidationReport(
**validation_report.to_json_dict(),
@@ -1137,9 +1184,14 @@ def get_validation_history(
expectation_id: int,
start_validation_time: Union[str, int, datetime, date, None] = None,
end_validation_time: Union[str, int, datetime, date, None] = None,
- filter_by: List[str] = None,
- ge_type: bool = True,
- ) -> Union[List[ValidationResult], List[ge.core.ExpectationValidationResult]]:
+ filter_by: List[
+ Literal["ingested", "rejected", "unknown", "fg_data", "experiment"]
+ ] = None,
+ ge_type: bool = HAS_GREAT_EXPECTATIONS,
+ ) -> Union[
+ List[ValidationResult],
+ List[great_expectations.core.ExpectationValidationResult],
+ ]:
"""Fetch validation history of an Expectation specified by its id.
!!! example
@@ -1160,6 +1212,10 @@ def get_validation_history(
                Supported formats include timestamps (int), datetime, date, or strings formatted to be dateutil-parsable. See examples above.
            end_validation_time: fetch only validation results prior to the provided time, inclusive.
                Supported formats include timestamps (int), datetime, date, or strings formatted to be dateutil-parsable. See examples above.
+ ge_type: If `True` returns a native Great Expectation type, Hopsworks
+ custom type otherwise. Conversion can be performed via the `to_ge_type()`
+ method on hopsworks type. Defaults to `True` if Great Expectations is installed,
+ else `False`.
# Raises
`hsfs.client.exceptions.RestAPIError`.
@@ -1167,14 +1223,6 @@ def get_validation_history(
# Return
Union[List[`ValidationResult`], List[`ExpectationValidationResult`]] A list of validation result connected to the expectation_id
"""
- major, minor = self._variable_api.parse_major_and_minor(
- self._variable_api.get_version("hopsworks")
- )
- if major == "3" and minor == "0":
- raise FeatureStoreException(
- "The hopsworks server does not support this operation. Update server to hopsworks >3.1 to enable support."
- )
-
if self._id:
return self._validation_result_engine.get_validation_history(
expectation_id=expectation_id,
@@ -1188,6 +1236,7 @@ def get_validation_history(
"Only Feature Group registered with Hopsworks can fetch validation history."
)
+ @uses_great_expectations
def validate(
self,
dataframe: Optional[
@@ -1196,13 +1245,17 @@ def validate(
expectation_suite: Optional[ExpectationSuite] = None,
save_report: Optional[bool] = False,
validation_options: Optional[Dict[str, Any]] = None,
- ingestion_result: str = "UNKNOWN",
+ ingestion_result: Literal[
+            "unknown", "ingested", "rejected", "fg_data", "experiment"
+ ] = "unknown",
ge_type: bool = True,
- ) -> Union[ge.core.ExpectationSuiteValidationResult, ValidationReport, None]:
+ ) -> Union[
+ great_expectations.core.ExpectationSuiteValidationResult, ValidationReport, None
+ ]:
"""Run validation based on the attached expectations.
- Runs any expectation attached with Deequ. But also runs attached Great Expectation
- Suites.
+ Runs the expectation suite attached to the feature group against the provided dataframe.
+        Raises an error if the great_expectations package is not installed.
!!! example
```python
@@ -1232,7 +1285,8 @@ def validate(
already in the Feature Group.
save_report: Whether to save the report to the backend. This is only possible if the Expectation suite
is initialised and attached to the Feature Group. Defaults to False.
- ge_type: Whether to return a Great Expectations object or Hopsworks own abstraction. Defaults to True.
+ ge_type: Whether to return a Great Expectations object or Hopsworks' own abstraction.
+ Defaults to `True` if Great Expectations is installed, else `False`.
# Returns
A Validation Report produced by Great Expectations.
@@ -1240,7 +1294,7 @@ def validate(
# Activity is logged only if the validation concerns the feature group and not a specific dataframe
if dataframe is None:
dataframe = self.read()
- if ingestion_result == "UNKNOWN":
+ if ingestion_result.upper() == "UNKNOWN":
ingestion_result = "FG_DATA"
return self._great_expectation_engine.validate(
@@ -1249,7 +1303,7 @@ def validate(
expectation_suite=expectation_suite,
save_report=save_report,
validation_options=validation_options or {},
- ingestion_result=ingestion_result,
+ ingestion_result=ingestion_result.upper(),
ge_type=ge_type,
)
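
The `uses_great_expectations` decorator applied above (and to the `to_ge_type` methods
further down) lives in python/hsfs/decorators.py, which is not part of this excerpt. A
minimal sketch of such a guard, assuming it only needs to consult the
HAS_GREAT_EXPECTATIONS flag; the error message here is illustrative:

    import functools

    from hsfs.core.constants import HAS_GREAT_EXPECTATIONS

    def uses_great_expectations(func):
        # Fail fast with a clear error instead of a NameError deep inside the
        # decorated method when great_expectations is missing.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if not HAS_GREAT_EXPECTATIONS:
                raise ModuleNotFoundError(
                    "great_expectations is required for this operation. "
                    "Install it with the great-expectations extra."
                )
            return func(*args, **kwargs)
        return wrapper
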
@@ -1813,7 +1867,10 @@ def expectation_suite(
def expectation_suite(
self,
expectation_suite: Union[
- ExpectationSuite, ge.core.ExpectationSuite, Dict[str, Any], None
+ ExpectationSuite,
+ great_expectations.core.ExpectationSuite,
+ Dict[str, Any],
+ None,
],
) -> None:
if isinstance(expectation_suite, ExpectationSuite):
@@ -1821,7 +1878,10 @@ def expectation_suite(
tmp_expectation_suite["feature_group_id"] = self._id
tmp_expectation_suite["feature_store_id"] = self._feature_store_id
self._expectation_suite = ExpectationSuite(**tmp_expectation_suite)
- elif isinstance(expectation_suite, ge.core.expectation_suite.ExpectationSuite):
+ elif HAS_GREAT_EXPECTATIONS and isinstance(
+ expectation_suite,
+ great_expectations.core.expectation_suite.ExpectationSuite,
+ ):
self._expectation_suite = ExpectationSuite(
**expectation_suite.to_json_dict(),
feature_store_id=self._feature_store_id,
@@ -2014,12 +2074,16 @@ def __init__(
event_time: Optional[str] = None,
stream: bool = False,
expectation_suite: Optional[
- Union["ge.core.ExpectationSuite", "ExpectationSuite", Dict[str, Any]]
+ Union[
+ great_expectations.core.ExpectationSuite,
+ ExpectationSuite,
+ Dict[str, Any],
+ ]
] = None,
- parents: Optional[List["explicit_provenance.Links"]] = None,
+ parents: Optional[List[explicit_provenance.Links]] = None,
href: Optional[str] = None,
delta_streamer_job_conf: Optional[
- Union[Dict[str, Any], "deltastreamer_jobconf.DeltaStreamerJobConf"]
+ Union[Dict[str, Any], deltastreamer_jobconf.DeltaStreamerJobConf]
] = None,
deprecated: bool = False,
**kwargs,
@@ -2394,7 +2458,10 @@ def save(
write_options: Optional[Dict[str, Any]] = None,
validation_options: Optional[Dict[str, Any]] = None,
wait: bool = False,
- ) -> Tuple[Optional["Job"], Optional["ge.core.ExpectationSuiteValidationResult"]]:
+ ) -> Tuple[
+ Optional["Job"],
+ Optional[great_expectations.core.ExpectationSuiteValidationResult],
+ ]:
"""Persist the metadata and materialize the feature group to the feature store.
!!! warning "Changed in 3.3.0"
@@ -3410,7 +3477,7 @@ class ExternalFeatureGroup(FeatureGroupBase):
def __init__(
self,
- storage_connector: Union["sc.StorageConnector", Dict[str, Any]],
+ storage_connector: Union[sc.StorageConnector, Dict[str, Any]],
query: Optional[str] = None,
data_format: Optional[str] = None,
path: Optional[str] = None,
@@ -3424,12 +3491,16 @@ def __init__(
created: Optional[str] = None,
creator: Optional[Dict[str, Any]] = None,
id: Optional[int] = None,
- features: Optional[Union[List[Dict[str, Any]], List["feature.Feature"]]] = None,
+ features: Optional[Union[List[Dict[str, Any]], List[feature.Feature]]] = None,
location: Optional[str] = None,
- statistics_config: Optional[Union["StatisticsConfig", Dict[str, Any]]] = None,
+ statistics_config: Optional[Union[StatisticsConfig, Dict[str, Any]]] = None,
event_time: Optional[str] = None,
expectation_suite: Optional[
- Union["ExpectationSuite", "ge.core.ExpectationSuite", Dict[str, Any]]
+ Union[
+ ExpectationSuite,
+ great_expectations.core.ExpectationSuite,
+ Dict[str, Any],
+ ]
] = None,
online_enabled: bool = False,
href: Optional[str] = None,
@@ -3438,7 +3509,7 @@ def __init__(
notification_topic_name: Optional[str] = None,
spine: bool = False,
deprecated: bool = False,
- embedding_index: Optional["EmbeddingIndex"] = None,
+ embedding_index: Optional[EmbeddingIndex] = None,
**kwargs,
) -> None:
super().__init__(
@@ -3552,7 +3623,9 @@ def insert(
validation_options: Optional[Dict[str, Any]] = None,
save_code: Optional[bool] = True,
wait: bool = False,
- ) -> Tuple[None, Optional["ge.core.ExpectationSuiteValidationResult"]]:
+ ) -> Tuple[
+ None, Optional[great_expectations.core.ExpectationSuiteValidationResult]
+ ]:
"""Insert the dataframe feature values ONLY in the online feature store.
External Feature Groups contain metadata about feature data in an external storage system.
@@ -3959,12 +4032,12 @@ def __init__(
created: Optional[str] = None,
creator: Optional[Dict[str, Any]] = None,
id: Optional[int] = None,
- features: Optional[List[Union["feature.Feature", Dict[str, Any]]]] = None,
+ features: Optional[List[Union[feature.Feature, Dict[str, Any]]]] = None,
location: Optional[str] = None,
- statistics_config: Optional["StatisticsConfig"] = None,
+ statistics_config: Optional[StatisticsConfig] = None,
event_time: Optional[str] = None,
expectation_suite: Optional[
- Union["ExpectationSuite", "ge.core.ExpectationSuite"]
+ Union[ExpectationSuite, great_expectations.core.ExpectationSuite]
] = None,
online_enabled: bool = False,
href: Optional[str] = None,
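
The isinstance guards above (e.g. in the expectation_suite setter) short-circuit on
HAS_GREAT_EXPECTATIONS before touching the `great_expectations` name, so they are safe
even when the conditional import never ran. The same pattern in isolation, as a minimal
sketch (the helper name is illustrative, not part of the patch):

    from hsfs.core.constants import HAS_GREAT_EXPECTATIONS
    from hsfs.expectation_suite import ExpectationSuite

    if HAS_GREAT_EXPECTATIONS:
        import great_expectations

    def coerce_suite(suite):
        # `great_expectations` is only referenced behind the flag, so this is
        # safe to call whether or not the package is installed.
        if HAS_GREAT_EXPECTATIONS and isinstance(
            suite, great_expectations.core.ExpectationSuite
        ):
            return ExpectationSuite(**suite.to_json_dict())
        return suite
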
diff --git a/python/hsfs/feature_store.py b/python/hsfs/feature_store.py
index c8a18dc6c..ef2d63571 100644
--- a/python/hsfs/feature_store.py
+++ b/python/hsfs/feature_store.py
@@ -20,7 +20,6 @@
import warnings
from typing import Any, Dict, List, Optional, TypeVar, Union
-import great_expectations as ge
import humps
import numpy
import numpy as np
@@ -208,7 +207,7 @@ def get_feature_groups(
@usage.method_logger
def get_on_demand_feature_group(
self, name: str, version: int = None
- ) -> "feature_group.ExternalFeatureGroup":
+ ) -> feature_group.ExternalFeatureGroup:
"""Get a external feature group entity from the feature store.
!!! warning "Deprecated"
@@ -234,7 +233,7 @@ def get_on_demand_feature_group(
@usage.method_logger
def get_external_feature_group(
self, name: str, version: int = None
- ) -> "feature_group.ExternalFeatureGroup":
+ ) -> feature_group.ExternalFeatureGroup:
"""Get a external feature group entity from the feature store.
Getting a external feature group from the Feature Store means getting its
@@ -280,7 +279,7 @@ def get_external_feature_group(
@usage.method_logger
def get_on_demand_feature_groups(
self, name: str
- ) -> List["feature_group.ExternalFeatureGroup"]:
+ ) -> List[feature_group.ExternalFeatureGroup]:
"""Get a list of all versions of an external feature group entity from the feature store.
!!! warning "Deprecated"
@@ -304,7 +303,7 @@ def get_on_demand_feature_groups(
@usage.method_logger
def get_external_feature_groups(
self, name: str
- ) -> List["feature_group.ExternalFeatureGroup"]:
+ ) -> List[feature_group.ExternalFeatureGroup]:
"""Get a list of all versions of an external feature group entity from the feature store.
Getting an external feature group from the Feature Store means getting its
@@ -337,7 +336,7 @@ def get_external_feature_groups(
def get_training_dataset(
self, name: str, version: int = None
- ) -> "training_dataset.TrainingDataset":
+ ) -> training_dataset.TrainingDataset:
"""Get a training dataset entity from the feature store.
!!! warning "Deprecated"
@@ -376,7 +375,7 @@ def get_training_dataset(
def get_training_datasets(
self, name: str
- ) -> List["training_dataset.TrainingDataset"]:
+ ) -> List[training_dataset.TrainingDataset]:
"""Get a list of all versions of a training dataset entity from the feature store.
!!! warning "Deprecated"
@@ -397,7 +396,7 @@ def get_training_datasets(
return self._training_dataset_api.get(name, None)
@usage.method_logger
- def get_storage_connector(self, name: str) -> "storage_connector.StorageConnector":
+ def get_storage_connector(self, name: str) -> storage_connector.StorageConnector:
"""Get a previously created storage connector from the feature store.
Storage connectors encapsulate all information needed for the execution engine
@@ -469,7 +468,7 @@ def sql(
)
@usage.method_logger
- def get_online_storage_connector(self) -> "storage_connector.StorageConnector":
+ def get_online_storage_connector(self) -> storage_connector.StorageConnector:
"""Get the storage connector for the Online Feature Store of the respective
project's feature store.
@@ -505,12 +504,15 @@ def create_feature_group(
event_time: Optional[str] = None,
stream: Optional[bool] = False,
expectation_suite: Optional[
- Union[expectation_suite.ExpectationSuite, ge.core.ExpectationSuite]
+ Union[
+ expectation_suite.ExpectationSuite,
+ TypeVar("great_expectations.core.ExpectationSuite"),
+ ]
] = None,
parents: Optional[List[feature_group.FeatureGroup]] = None,
topic_name: Optional[str] = None,
notification_topic_name: Optional[str] = None,
- ) -> "feature_group.FeatureGroup":
+ ) -> feature_group.FeatureGroup:
"""Create a feature group metadata object.
!!! example
@@ -576,7 +578,7 @@ def create_feature_group(
time for the features in this feature group. If event_time is set
the feature group can be used for point-in-time joins. Defaults to `None`.
- !!!note "Event time data type restriction"
+ !!! note "Event time data type restriction"
The supported data types for the event time column are: `timestamp`, `date` and `bigint`.
@@ -635,7 +637,10 @@ def get_or_create_feature_group(
features: Optional[List[feature.Feature]] = None,
statistics_config: Optional[Union[StatisticsConfig, bool, dict]] = None,
expectation_suite: Optional[
- Union[expectation_suite.ExpectationSuite, ge.core.ExpectationSuite]
+ Union[
+ expectation_suite.ExpectationSuite,
+ TypeVar("great_expectations.core.ExpectationSuite"),
+ ]
] = None,
event_time: Optional[str] = None,
stream: Optional[bool] = False,
@@ -643,9 +648,9 @@ def get_or_create_feature_group(
topic_name: Optional[str] = None,
notification_topic_name: Optional[str] = None,
) -> Union[
- "feature_group.FeatureGroup",
- "feature_group.ExternalFeatureGroup",
- "feature_group.SpineGroup",
+ feature_group.FeatureGroup,
+ feature_group.ExternalFeatureGroup,
+ feature_group.SpineGroup,
]:
"""Get feature group metadata object or create a new one if it doesn't exist. This method doesn't update existing feature group metadata object.
@@ -713,7 +718,7 @@ def get_or_create_feature_group(
time for the features in this feature group. If event_time is set
the feature group can be used for point-in-time joins. Defaults to `None`.
- !!!note "Event time data type restriction"
+ !!! note "Event time data type restriction"
The supported data types for the event time column are: `timestamp`, `date` and `bigint`.
@@ -781,11 +786,14 @@ def create_on_demand_feature_group(
statistics_config: Optional[Union[StatisticsConfig, bool, dict]] = None,
event_time: Optional[str] = None,
expectation_suite: Optional[
- Union[expectation_suite.ExpectationSuite, ge.core.ExpectationSuite]
+ Union[
+ expectation_suite.ExpectationSuite,
+ TypeVar("great_expectations.core.ExpectationSuite"),
+ ]
] = None,
topic_name: Optional[str] = None,
notification_topic_name: Optional[str] = None,
- ) -> "feature_group.ExternalFeatureGroup":
+ ) -> feature_group.ExternalFeatureGroup:
"""Create a external feature group metadata object.
!!! warning "Deprecated"
@@ -835,19 +843,20 @@ def create_on_demand_feature_group(
event_time: Optionally, provide the name of the feature containing the event
time for the features in this feature group. If event_time is set
the feature group can be used for point-in-time joins. Defaults to `None`.
+ !!! note "Event time data type restriction"
+ The supported data types for the event time column are: `timestamp`, `date` and `bigint`.
topic_name: Optionally, define the name of the topic used for data ingestion. If left undefined it
defaults to using project topic.
notification_topic_name: Optionally, define the name of the topic used for sending notifications when entries
are inserted or updated on the online feature store. If left undefined no notifications are sent.
-
- !!!note "Event time data type restriction"
- The supported data types for the event time column are: `timestamp`, `date` and `bigint`.
-
-
expectation_suite: Optionally, attach an expectation suite to the feature
group, against which dataframes are validated upon insertion.
Defaults to `None`.
+
# Returns
`ExternalFeatureGroup`. The external feature group metadata object.
"""
@@ -890,12 +899,15 @@ def create_external_feature_group(
statistics_config: Optional[Union[StatisticsConfig, bool, dict]] = None,
event_time: Optional[str] = None,
expectation_suite: Optional[
- Union[expectation_suite.ExpectationSuite, ge.core.ExpectationSuite]
+ Union[
+ expectation_suite.ExpectationSuite,
+ TypeVar("great_expectations.core.ExpectationSuite"),
+ ]
] = None,
- online_enabled: Optional[bool] = False,
+ online_enabled: bool = False,
topic_name: Optional[str] = None,
notification_topic_name: Optional[str] = None,
- ) -> "feature_group.ExternalFeatureGroup":
+ ) -> feature_group.ExternalFeatureGroup:
"""Create a external feature group metadata object.
!!! example
@@ -985,12 +997,11 @@ def create_external_feature_group(
!!! note "Event time data type restriction"
The supported data types for the event time column are: `timestamp`, `date` and `bigint`.
-
+ online_enabled: Define whether it should be possible to sync the feature group to
+ the online feature store for low latency access, defaults to `False`.
expectation_suite: Optionally, attach an expectation suite to the feature
group, against which dataframes are validated upon insertion.
Defaults to `None`.
- online_enabled: Define whether it should be possible to sync the feature group to
- the online feature store for low latency access, defaults to `False`.
topic_name: Optionally, define the name of the topic used for data ingestion. If left undefined it
defaults to using project topic.
notification_topic_name: Optionally, define the name of the topic used for sending notifications when entries
@@ -1039,7 +1050,7 @@ def get_or_create_spine_group(
np.ndarray,
List[list],
] = None,
- ) -> "feature_group.SpineGroup":
+ ) -> feature_group.SpineGroup:
"""Create a spine group metadata object.
Instead of using a feature group to save a label/prediction target, you can use a spine together with a dataframe containing the labels.
@@ -1131,7 +1142,7 @@ def get_or_create_spine_group(
schema information of the DataFrame resulting from executing the provided query
against the data source.
- !!!note "Event time data type restriction"
+ !!! note "Event time data type restriction"
The supported data types for the event time column are: `timestamp`, `date` and `bigint`.
@@ -1295,7 +1306,7 @@ def create_transformation_function(
bool,
],
version: Optional[int] = None,
- ) -> "TransformationFunction":
+ ) -> TransformationFunction:
"""Create a transformation function metadata object.
!!! example
@@ -1339,7 +1350,7 @@ def get_transformation_function(
self,
name: str,
version: Optional[int] = None,
- ) -> "TransformationFunction":
+ ) -> TransformationFunction:
"""Get transformation function metadata object.
!!! example "Get transformation function by name. This will default to version 1"
@@ -1437,7 +1448,7 @@ def get_transformation_function(
return self._transformation_function_engine.get_transformation_fn(name, version)
@usage.method_logger
- def get_transformation_functions(self) -> List["TransformationFunction"]:
+ def get_transformation_functions(self) -> List[TransformationFunction]:
"""Get all transformation functions metadata objects.
!!! example "Get all transformation functions"
@@ -1657,7 +1668,7 @@ def get_or_create_feature_view(
@usage.method_logger
def get_feature_view(
self, name: str, version: int = None
- ) -> "feature_view.FeatureView":
+ ) -> feature_view.FeatureView:
"""Get a feature view entity from the feature store.
Getting a feature view from the Feature Store means getting its metadata.
@@ -1697,7 +1708,7 @@ def get_feature_view(
return self._feature_view_engine.get(name, version)
@usage.method_logger
- def get_feature_views(self, name: str) -> List["feature_view.FeatureView"]:
+ def get_feature_views(self, name: str) -> List[feature_view.FeatureView]:
"""Get a list of all versions of a feature view entity from the feature store.
Getting a feature view from the Feature Store means getting its metadata.
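
Note the TypeVar("great_expectations.core.ExpectationSuite") placeholders above: with
the module-level import gone, feature_store.py still needs a runtime-evaluable object in
its annotations, presumably because the docs generator evaluates them at import time.
The more conventional alternative would be a TYPE_CHECKING forward reference, sketched
here with an illustrative function that is not part of the patch:

    from __future__ import annotations  # annotations stay unevaluated at runtime

    from typing import TYPE_CHECKING, Optional, Union

    from hsfs import expectation_suite

    if TYPE_CHECKING:
        import great_expectations

    def attach_suite(
        suite: Optional[
            Union[
                expectation_suite.ExpectationSuite,
                great_expectations.core.ExpectationSuite,
            ]
        ] = None,
    ) -> None:
        ...
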
diff --git a/python/hsfs/feature_view.py b/python/hsfs/feature_view.py
index d4ea5fc51..1afc9569b 100644
--- a/python/hsfs/feature_view.py
+++ b/python/hsfs/feature_view.py
@@ -907,7 +907,7 @@ def _extract_primary_key(self, result_key: Dict[str, str]) -> Dict[str, str]:
def _get_embedding_fgs(
self,
- ) -> Set["feature_group.FeatureGroup"]:
+ ) -> Set[feature_group.FeatureGroup]:
return set([fg for fg in self.query.featuregroups if fg.embedding_index])
@usage.method_logger
diff --git a/python/hsfs/ge_expectation.py b/python/hsfs/ge_expectation.py
index 6ec2f99bf..fcb68b199 100644
--- a/python/hsfs/ge_expectation.py
+++ b/python/hsfs/ge_expectation.py
@@ -16,11 +16,20 @@
from __future__ import annotations
import json
-from typing import Any, Dict, Optional
+from typing import TYPE_CHECKING, Any, Dict, Optional
+
+
+if TYPE_CHECKING:
+ import great_expectations
-import great_expectations as ge
import humps
from hsfs import util
+from hsfs.core.constants import HAS_GREAT_EXPECTATIONS
+from hsfs.decorators import uses_great_expectations
+
+
+if HAS_GREAT_EXPECTATIONS:
+ import great_expectations
class GeExpectation:
@@ -63,7 +72,9 @@ def from_response_json(cls, json_dict):
return cls(**json_decamelized)
@classmethod
- def from_ge_type(cls, ge_expectation: ge.core.ExpectationConfiguration):
+ def from_ge_type(
+ cls, ge_expectation: great_expectations.core.ExpectationConfiguration
+ ):
return cls(**ge_expectation.to_json_dict())
def to_dict(self) -> Dict[str, Any]:
@@ -100,8 +111,9 @@ def __repr__(self):
+ f"kwargs={self._kwargs}, meta={self._meta})"
)
- def to_ge_type(self) -> ge.core.ExpectationConfiguration:
- return ge.core.ExpectationConfiguration(
+ @uses_great_expectations
+ def to_ge_type(self) -> great_expectations.core.ExpectationConfiguration:
+ return great_expectations.core.ExpectationConfiguration(
expectation_type=self.expectation_type, kwargs=self.kwargs, meta=self.meta
)
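
This file, like the others below, gates the real import on HAS_GREAT_EXPECTATIONS from
python/hsfs/core/constants.py, a module created by this patch but not shown in this
excerpt. A plausible minimal definition of the flag:

    import importlib.util

    # True when the optional great_expectations package is importable.
    HAS_GREAT_EXPECTATIONS: bool = (
        importlib.util.find_spec("great_expectations") is not None
    )
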
diff --git a/python/hsfs/ge_validation_result.py b/python/hsfs/ge_validation_result.py
index eaa2b150d..b345c9b23 100644
--- a/python/hsfs/ge_validation_result.py
+++ b/python/hsfs/ge_validation_result.py
@@ -17,12 +17,22 @@
import datetime
import json
-from typing import Any, Dict, Optional, Union
+from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Union
+
+
+if TYPE_CHECKING:
+ import great_expectations
+
import dateutil
-import great_expectations as ge
import humps
from hsfs import util
+from hsfs.core.constants import HAS_GREAT_EXPECTATIONS
+from hsfs.decorators import uses_great_expectations
+
+
+if HAS_GREAT_EXPECTATIONS:
+ import great_expectations
class ValidationResult:
@@ -40,12 +50,9 @@ def __init__(
expectation_id: Optional[int] = None,
validation_report_id: Optional[int] = None,
validation_time: Optional[int] = None,
- ingestion_result: Optional[str] = "UNKNOWN",
- href=None,
- expand=None,
- items=None,
- count=None,
- type=None,
+ ingestion_result: Literal[
+ "unknown", "ingested", "rejected", "fg_data", "experiment"
+ ] = "UNKNOWN",
**kwargs,
):
self._id = id
@@ -101,8 +108,9 @@ def to_json_dict(self) -> Dict[str, Any]:
"meta": self._meta,
}
- def to_ge_type(self) -> ge.core.ExpectationValidationResult:
- return ge.core.ExpectationValidationResult(
+ @uses_great_expectations
+ def to_ge_type(self) -> great_expectations.core.ExpectationValidationResult:
+ return great_expectations.core.ExpectationValidationResult(
success=self.success,
exception_info=self.exception_info,
expectation_config=self.expectation_config,
@@ -221,13 +229,20 @@ def validation_time(
self._validation_time = None
@property
- def ingestion_result(self) -> str:
+ def ingestion_result(
+ self,
+ ) -> Literal["ingested", "rejected", "unknown", "experiment", "fg_data"]:
return self._ingestion_result
@ingestion_result.setter
- def ingestion_result(self, ingestion_result: str = "UNKNOWN"):
- allowed_values = ["INGESTED", "REJECTED", "UNKNOWN", "EXPERIMENT", "FG_DATA"]
- if ingestion_result.upper() in allowed_values:
+ def ingestion_result(
+ self,
+ ingestion_result: Literal[
+ "ingested", "rejected", "unknown", "experiment", "fg_data"
+ ] = "unknown",
+ ):
+ allowed_values = ["ingested", "rejected", "unknown", "experiment", "fg_data"]
+ if ingestion_result.lower() in allowed_values:
- self._ingestion_result = ingestion_result
+ self._ingestion_result = ingestion_result.lower()
else:
raise ValueError(
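
The setter above accepts any casing on input but validates against the lowercase
canonical values. Stripped of the class machinery, the check reduces to this standalone
sketch (the function name is illustrative):

    ALLOWED_INGESTION_RESULTS = ["ingested", "rejected", "unknown", "experiment", "fg_data"]

    def normalize_ingestion_result(value: str) -> str:
        # Case-insensitive validation, canonical lowercase storage.
        if value.lower() in ALLOWED_INGESTION_RESULTS:
            return value.lower()
        raise ValueError(
            f"Illegal ingestion_result: {value}. "
            f"Allowed values are {ALLOWED_INGESTION_RESULTS}."
        )
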
diff --git a/python/hsfs/util.py b/python/hsfs/util.py
index 3728b7f2f..6d4e095e1 100644
--- a/python/hsfs/util.py
+++ b/python/hsfs/util.py
@@ -18,8 +18,8 @@
import asyncio
import itertools
import json
+import logging
import re
-import sys
import threading
import time
from datetime import date, datetime, timezone
@@ -40,6 +40,8 @@
FEATURE_STORE_NAME_SUFFIX = "_featurestore"
+_logger = logging.getLogger(__name__)
+
class FeatureStoreEncoder(json.JSONEncoder):
def default(self, o: Any) -> Dict[str, Any]:
@@ -540,13 +542,6 @@ def build_serving_keys_from_prepared_statements(
return serving_keys
-def is_runtime_notebook():
- if "ipykernel" in sys.modules:
- return True
- else:
- return False
-
-
class NpDatetimeEncoder(json.JSONEncoder):
def default(self, obj):
dtypes = (np.datetime64, np.complexfloating)
diff --git a/python/hsfs/validation_report.py b/python/hsfs/validation_report.py
index 5de4d529c..c6aa3fc81 100644
--- a/python/hsfs/validation_report.py
+++ b/python/hsfs/validation_report.py
@@ -16,14 +16,23 @@
from __future__ import annotations
import json
-from typing import Any, Dict, List, Optional, Union
+from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union
+
+
+if TYPE_CHECKING:
+ import great_expectations
-import great_expectations as ge
import humps
from hsfs import util
+from hsfs.core.constants import HAS_GREAT_EXPECTATIONS
+from hsfs.decorators import uses_great_expectations
from hsfs.ge_validation_result import ValidationResult
+if HAS_GREAT_EXPECTATIONS:
+ import great_expectations
+
+
class ValidationReport:
"""Metadata object representing a validation report generated by Great Expectations in the Feature Store."""
@@ -32,9 +41,9 @@ def __init__(
success: bool,
results: List[
Union[
- "ValidationResult",
+ ValidationResult,
Dict[str, Any],
- ge.core.expectation_validation_result.ExpectationValidationResult,
+ great_expectations.core.expectation_validation_result.ExpectationValidationResult,
]
],
meta: Optional[Union[Dict[str, Any], str]],
@@ -44,13 +53,10 @@ def __init__(
full_report_path: Optional[str] = None,
featurestore_id: Optional[int] = None,
featuregroup_id: Optional[int] = None,
- href: Optional[str] = None,
- expand: bool = None,
- items: Optional[Dict[str, Any]] = None,
- count: Optional[int] = None,
- type: Optional[str] = None,
validation_time: Optional[str] = None,
- ingestion_result: str = "UNKNOWN",
+ ingestion_result: Literal[
+ "ingested", "rejected", "unknown", "experiment", "fg_data"
+ ] = "unknown",
**kwargs,
) -> None:
self._id = id
@@ -69,7 +75,7 @@ def __init__(
@classmethod
def from_response_json(
cls, json_dict: Dict[str, Any]
- ) -> Union[List["ValidationReport"], "ValidationReport"]:
+ ) -> Union[List[ValidationReport], ValidationReport]:
json_decamelized = humps.decamelize(json_dict)
if (
"count" in json_decamelized
@@ -94,7 +100,7 @@ def to_dict(self) -> Dict[str, Union[int, str, bool]]:
"statistics": json.dumps(self._statistics),
"results": self._results,
"meta": json.dumps(self._meta),
- "ingestionResult": self._ingestion_result,
+ "ingestionResult": self._ingestion_result.upper(),
}
def to_json_dict(self) -> Dict[str, Any]:
@@ -107,8 +113,9 @@ def to_json_dict(self) -> Dict[str, Any]:
"meta": self._meta,
}
- def to_ge_type(self) -> ge.core.ExpectationSuiteValidationResult:
- return ge.core.ExpectationSuiteValidationResult(
+ @uses_great_expectations
+ def to_ge_type(self) -> great_expectations.core.ExpectationSuiteValidationResult:
+ return great_expectations.core.ExpectationSuiteValidationResult(
success=self.success,
statistics=self.statistics,
results=[result.to_ge_type() for result in self.results],
@@ -135,7 +142,7 @@ def success(self, success: bool) -> None:
self._success = success
@property
- def results(self) -> List["ValidationResult"]:
+ def results(self) -> List[ValidationResult]:
"""List of expectation results obtained after validation."""
return self._results
@@ -144,9 +151,9 @@ def results(
self,
results: List[
Union[
- "ValidationResult",
+ ValidationResult,
Dict[str, Any],
- ge.core.expectation_validation_result.ExpectationValidationResult,
+ great_expectations.core.expectation_validation_result.ExpectationValidationResult,
]
],
) -> None:
@@ -158,7 +165,7 @@ def results(
self._results = [ValidationResult(**result) for result in results]
elif isinstance(
results[0],
- ge.core.expectation_validation_result.ExpectationValidationResult,
+ great_expectations.core.expectation_validation_result.ExpectationValidationResult,
):
self._results = [
ValidationResult(**result.to_json_dict()) for result in results
@@ -224,11 +231,16 @@ def ingestion_result(self) -> str:
return self._ingestion_result
@ingestion_result.setter
- def ingestion_result(self, ingestion_result: Optional[str]) -> None:
- allowed_values = ["INGESTED", "REJECTED", "UNKNOWN", "EXPERIMENT", "FG_DATA"]
+ def ingestion_result(
+ self,
+ ingestion_result: Optional[
+ Literal["ingested", "rejected", "experiment", "unknown", "fg_data"]
+ ],
+ ) -> None:
if ingestion_result is None:
ingestion_result = "UNKNOWN"
- if ingestion_result.upper() in allowed_values:
+ allowed_values = ["ingested", "rejected", "experiment", "unknown", "fg_data"]
+ if ingestion_result.lower() in allowed_values:
- self._ingestion_result = ingestion_result
+ self._ingestion_result = ingestion_result.lower()
else:
raise ValueError(
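
With to_ge_type guarded by the decorator, callers that may run without the optional
dependency can branch on the flag instead of catching the error. A usage sketch,
assuming `report` is a ValidationReport previously fetched from the backend:

    from hsfs.core.constants import HAS_GREAT_EXPECTATIONS

    if HAS_GREAT_EXPECTATIONS:
        # Native great_expectations.core.ExpectationSuiteValidationResult.
        ge_report = report.to_ge_type()
    else:
        # Stay on the Hopsworks abstraction; calling to_ge_type() here would
        # raise ModuleNotFoundError via the uses_great_expectations guard.
        ge_report = report
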
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 0ba86dfc4..499d2097e 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -41,7 +41,6 @@ dependencies = [
"avro==1.11.3",
"sqlalchemy",
"PyMySQL[rsa]",
- "great_expectations==0.18.12",
"tzlocal",
"fsspec",
"retrying",
@@ -51,7 +50,21 @@ dependencies = [
]
[project.optional-dependencies]
-dev = [
+hive = [
+ "pyhopshive[thrift]",
+ "pyarrow>=10.0",
+ "confluent-kafka<=2.3.0",
+ "fastavro>=1.4.11,<=1.8.4",
+]
+python = [
+ "pyhopshive[thrift]",
+ "pyarrow>=10.0",
+ "confluent-kafka<=2.3.0",
+ "fastavro>=1.4.11,<=1.8.4",
+ "tqdm",
+]
+great-expectations = ["great_expectations==0.18.12"]
+dev-no-opt = [
"pytest==7.4.4",
"pytest-mock==3.12.0",
"ruff",
@@ -68,19 +81,8 @@ dev-pandas1 = [
"pandas<=1.5.3",
"sqlalchemy<=1.4.48",
]
-hive = [
- "pyhopshive[thrift]",
- "pyarrow>=10.0",
- "confluent-kafka<=2.3.0",
- "fastavro>=1.4.11,<=1.8.4",
-]
-python = [
- "pyhopshive[thrift]",
- "pyarrow>=10.0",
- "confluent-kafka<=2.3.0",
- "fastavro>=1.4.11,<=1.8.4",
- "tqdm",
-]
+dev = ["hsfs[dev-no-opt]", "hsfs[great-expectations]"]
+
[build-system]
requires = ["setuptools", "wheel"]
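
With great_expectations dropped from the core dependency list, users opt in through the
new extra, and `dev` is now composed of `dev-no-opt` plus that extra. Typical installs
after this change would look roughly like this (shown as comments; exact spellings
assumed from the extras defined above):

    # pip install "hsfs[python]"                      # engine deps, no Great Expectations
    # pip install "hsfs[python,great-expectations]"   # opt in to data validation
    # pip install -e "python[dev]"                    # dev == dev-no-opt + great-expectations
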
diff --git a/python/tests/core/test_great_expectation_engine.py b/python/tests/core/test_great_expectation_engine.py
index 6c42655a5..38c7360b9 100644
--- a/python/tests/core/test_great_expectation_engine.py
+++ b/python/tests/core/test_great_expectation_engine.py
@@ -14,10 +14,16 @@
# limitations under the License.
#
-import great_expectations as ge
import hsfs.expectation_suite as es
+import pandas as pd
+import pytest
from hsfs import feature_group, validation_report
from hsfs.core import great_expectation_engine
+from hsfs.core.constants import HAS_GREAT_EXPECTATIONS
+
+
+if HAS_GREAT_EXPECTATIONS:
+ import great_expectations
class TestCodeEngine:
@@ -96,6 +102,10 @@ def test_validate_suite(self, mocker):
assert mock_fg_save_validation_report.call_count == 0
assert mock_vr.call_count == 0
+ @pytest.mark.skipif(
+ not HAS_GREAT_EXPECTATIONS,
+ reason="Great Expectations is not installed",
+ )
def test_validate_suite_validation_options(self, mocker):
# Arrange
feature_store_id = 99
@@ -137,6 +147,10 @@ def test_validate_suite_validation_options(self, mocker):
assert mock_fg_save_validation_report.call_count == 0
assert mock_vr.call_count == 0
+ @pytest.mark.skipif(
+ not HAS_GREAT_EXPECTATIONS,
+ reason="Great Expectations is not installed",
+ )
def test_validate_suite_validation_options_save_report(self, mocker):
# Arrange
feature_store_id = 99
@@ -179,6 +193,10 @@ def test_validate_suite_validation_options_save_report(self, mocker):
assert mock_fg_save_validation_report.call_count == 1
assert mock_vr.call_count == 0
+ @pytest.mark.skipif(
+ not HAS_GREAT_EXPECTATIONS,
+ reason="Great Expectations is not installed",
+ )
def test_convert_expectation_suite(self, mocker):
# Arrange
feature_store_id = 99
@@ -189,7 +207,7 @@ def test_convert_expectation_suite(self, mocker):
mocker.patch("hsfs.engine.get_type")
mocker.patch("hsfs.engine.get_instance")
- suite = ge.core.ExpectationSuite(
+ suite = great_expectations.core.ExpectationSuite(
expectation_suite_name="suite_name",
)
@@ -199,7 +217,7 @@ def test_convert_expectation_suite(self, mocker):
featurestore_id=feature_store_id,
primary_key=[],
partition_key=[],
- expectation_suite=ge.core.ExpectationSuite(
+ expectation_suite=great_expectations.core.ExpectationSuite(
expectation_suite_name="attached_to_feature_group",
),
)
@@ -218,6 +236,10 @@ def test_convert_expectation_suite(self, mocker):
assert converted_suite.expectation_suite_name == "suite_name"
assert mock_fg_get_expectation_suite.call_count == 0
+ @pytest.mark.skipif(
+ not HAS_GREAT_EXPECTATIONS,
+ reason="Great Expectations is not installed.",
+ )
def test_fake_convert_expectation_suite(self, mocker):
# Arrange
feature_store_id = 99
@@ -238,7 +260,7 @@ def test_fake_convert_expectation_suite(self, mocker):
featurestore_id=feature_store_id,
primary_key=[],
partition_key=[],
- expectation_suite=ge.core.ExpectationSuite(
+ expectation_suite=great_expectations.core.ExpectationSuite(
expectation_suite_name="attached_to_feature_group",
),
)
@@ -257,6 +279,10 @@ def test_fake_convert_expectation_suite(self, mocker):
assert converted_suite.expectation_suite_name == "suite_name"
assert mock_fg_get_expectation_suite.call_count == 0
+ @pytest.mark.skipif(
+ not HAS_GREAT_EXPECTATIONS,
+ reason="Great Expectations is not installed.",
+ )
def test_fetch_expectation_suite(self, mocker):
# Arrange
feature_store_id = 99
@@ -275,7 +301,7 @@ def test_fetch_expectation_suite(self, mocker):
featurestore_id=feature_store_id,
primary_key=[],
partition_key=[],
- expectation_suite=ge.core.ExpectationSuite(
+ expectation_suite=great_expectations.core.ExpectationSuite(
expectation_suite_name="attached_to_feature_group",
),
)
@@ -292,6 +318,10 @@ def test_fetch_expectation_suite(self, mocker):
# Assert
assert mock_fg_get_expectation_suite.call_count == 1
+ @pytest.mark.skipif(
+ not HAS_GREAT_EXPECTATIONS,
+ reason="Great Expectations is not installed.",
+ )
def test_fetch_expectation_suite_false(self, mocker):
# Arrange
feature_store_id = 99
@@ -310,7 +340,7 @@ def test_fetch_expectation_suite_false(self, mocker):
featurestore_id=feature_store_id,
primary_key=[],
partition_key=[],
- expectation_suite=ge.core.ExpectationSuite(
+ expectation_suite=great_expectations.core.ExpectationSuite(
expectation_suite_name="attached_to_feature_group",
),
)
@@ -331,7 +361,7 @@ def test_fetch_expectation_suite_false(self, mocker):
assert result.expectation_suite_name == "attached_to_feature_group"
assert isinstance(result, es.ExpectationSuite)
- def test_should_run_validation_based_on_suite(self, mocker):
+ def test_should_run_validation_based_on_suite(self):
# Arrange
feature_store_id = 99
ge_engine = great_expectation_engine.GreatExpectationEngine(
@@ -353,7 +383,7 @@ def test_should_run_validation_based_on_suite(self, mocker):
# Assert
assert run_validation is True
- def test_should_not_run_validation_based_on_suite(self, mocker):
+ def test_should_not_run_validation_based_on_suite(self):
# Arrange
feature_store_id = 99
ge_engine = great_expectation_engine.GreatExpectationEngine(
@@ -397,7 +427,7 @@ def test_should_run_validation_based_validation_options(self, mocker):
# Assert
assert run_validation is False
- def test_should_not_run_validation_based_validation_options(self, mocker):
+ def test_should_not_run_validation_based_validation_options(self):
# Arrange
feature_store_id = 99
ge_engine = great_expectation_engine.GreatExpectationEngine(
@@ -419,6 +449,10 @@ def test_should_not_run_validation_based_validation_options(self, mocker):
# Assert
assert run_validation is True
+ @pytest.mark.skipif(
+ not HAS_GREAT_EXPECTATIONS,
+ reason="Great Expectations is not installed",
+ )
def test_not_save_but_convert_report(self, mocker):
# Arrange
feature_store_id = 99
@@ -441,7 +475,7 @@ def test_not_save_but_convert_report(self, mocker):
partition_key=[],
)
- report = ge.core.ExpectationSuiteValidationResult()
+ report = great_expectations.core.ExpectationSuiteValidationResult()
mock_save_validation_report = mocker.patch(
"hsfs.feature_group.FeatureGroup.save_validation_report"
@@ -460,6 +494,10 @@ def test_not_save_but_convert_report(self, mocker):
assert isinstance(converted_report, validation_report.ValidationReport)
assert mock_save_validation_report.call_count == 0
+ @pytest.mark.skipif(
+ not HAS_GREAT_EXPECTATIONS,
+ reason="Great Expectations is not installed",
+ )
def test_save_but_not_convert_report(self, mocker):
# Arrange
feature_store_id = 99
@@ -482,7 +520,7 @@ def test_save_but_not_convert_report(self, mocker):
partition_key=[],
)
- report = ge.core.ExpectationSuiteValidationResult()
+ report = great_expectations.core.ExpectationSuiteValidationResult()
mock_save_validation_report = mocker.patch(
"hsfs.feature_group.FeatureGroup.save_validation_report"
@@ -500,6 +538,10 @@ def test_save_but_not_convert_report(self, mocker):
# Assert
assert mock_save_validation_report.call_count == 1
+ @pytest.mark.skipif(
+ not HAS_GREAT_EXPECTATIONS,
+ reason="Great Expectations is not installed",
+ )
def test_not_save_and_not_convert_report(self, mocker):
# Arrange
feature_store_id = 99
@@ -523,7 +565,7 @@ def test_not_save_and_not_convert_report(self, mocker):
partition_key=[],
)
- report = ge.core.ExpectationSuiteValidationResult()
+ report = great_expectations.core.ExpectationSuiteValidationResult()
mock_save_validation_report = mocker.patch(
"hsfs.feature_group.FeatureGroup.save_validation_report"
@@ -539,5 +581,40 @@ def test_not_save_and_not_convert_report(self, mocker):
)
# Assert
- assert isinstance(converted_report, ge.core.ExpectationSuiteValidationResult)
+ assert isinstance(
+ converted_report, great_expectations.core.ExpectationSuiteValidationResult
+ )
assert mock_save_validation_report.call_count == 0
+
+ @pytest.mark.skipif(
+ HAS_GREAT_EXPECTATIONS,
+ reason="Great Expectations is installed, do not check for module not found error",
+ )
+ def test_raise_module_not_found_error(self, mocker):
+ # Arrange
+ ge_engine = great_expectation_engine.GreatExpectationEngine(feature_store_id=11)
+ mocker.patch("hsfs.engine.get_type")
+ mocker.patch("hsfs.engine.get_instance")
+ fg = feature_group.FeatureGroup(
+ name="test",
+ version=1,
+ featurestore_id=11,
+ partition_key=[],
+ primary_key=["id"],
+ )
+ df = pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})
+ suite = es.ExpectationSuite(
+ expectation_suite_name="suite_name",
+ expectations=[],
+ meta={},
+ run_validation=True,
+ )
+ mocker.patch("hsfs.util.get_feature_group_url", return_value="https://url")
+
+ # Act
+ with pytest.raises(ModuleNotFoundError):
+ ge_engine.validate(
+ feature_group=fg,
+ dataframe=df,
+ expectation_suite=suite,
+ )
diff --git a/python/tests/engine/test_python.py b/python/tests/engine/test_python.py
index 08bc8d52a..de070ea04 100644
--- a/python/tests/engine/test_python.py
+++ b/python/tests/engine/test_python.py
@@ -35,7 +35,9 @@
from hsfs.constructor import query
from hsfs.constructor.hudi_feature_group_alias import HudiFeatureGroupAlias
from hsfs.core import inode, job
+from hsfs.core.constants import HAS_GREAT_EXPECTATIONS
from hsfs.engine import python
+from hsfs.expectation_suite import ExpectationSuite
from hsfs.training_dataset_feature import TrainingDatasetFeature
from polars.testing import assert_frame_equal as polars_assert_frame_equal
@@ -777,11 +779,6 @@ def test_show(self, mocker):
assert mock_python_engine_sql.call_count == 1
def test_cast_columns(self, mocker):
- class LabelIndex:
- def __init__(self, label, index):
- self.label = label
- self.index = index
-
python_engine = python.Engine()
d = {
"string": ["s", "s"],
@@ -842,7 +839,7 @@ def __init__(self, label, index):
for col in cast_df.columns:
assert cast_df[col].dtype == expected[col]
- def test_register_external_temporary_table(self, mocker):
+ def test_register_external_temporary_table(self):
# Arrange
python_engine = python.Engine()
@@ -1302,6 +1299,10 @@ def test_validate(self):
== "Deequ data validation is only available with Spark Engine. Use validate_with_great_expectations"
)
+ @pytest.mark.skipif(
+ not HAS_GREAT_EXPECTATIONS,
+ reason="Great Expectations is not installed.",
+ )
def test_validate_with_great_expectations(self, mocker):
# Arrange
mock_ge_from_pandas = mocker.patch("great_expectations.from_pandas")
@@ -1316,6 +1317,26 @@ def test_validate_with_great_expectations(self, mocker):
# Assert
assert mock_ge_from_pandas.call_count == 1
+ @pytest.mark.skipif(
+ HAS_GREAT_EXPECTATIONS,
+ reason="Great Expectations is installed.",
+ )
+ def test_validate_with_great_expectations_raise_module_not_found(self):
+ # Arrange
+ python_engine = python.Engine()
+ suite = ExpectationSuite(
+ expectation_suite_name="test_suite",
+ expectations=[],
+ meta={},
+ run_validation=True,
+ )
+
+ # Act
+ with pytest.raises(ModuleNotFoundError):
+ python_engine.validate_with_great_expectations(
+ dataframe=None, expectation_suite=suite, ge_validate_kwargs={}
+ )
+
def test_set_job_group(self):
# Arrange
python_engine = python.Engine()
@@ -3763,7 +3784,7 @@ def test_materialization_kafka_first_job_execution(self, mocker):
args="defaults tests_offsets",
await_termination=False,
)
-
+
def test_materialization_kafka_skip_offsets(self, mocker):
# Arrange
mocker.patch("hsfs.engine.python.Engine._get_kafka_config", return_value={})
@@ -3805,7 +3826,10 @@ def test_materialization_kafka_skip_offsets(self, mocker):
python_engine._write_dataframe_kafka(
feature_group=fg,
dataframe=df,
- offline_write_options={"start_offline_materialization": True, "skip_offsets": True},
+ offline_write_options={
+ "start_offline_materialization": True,
+ "skip_offsets": True,
+ },
)
# Assert
diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py
index 5c7d76add..8b98f521c 100644
--- a/python/tests/engine/test_spark.py
+++ b/python/tests/engine/test_spark.py
@@ -32,6 +32,7 @@
from hsfs.client import exceptions
from hsfs.constructor import hudi_feature_group_alias, query
from hsfs.core import training_dataset_engine
+from hsfs.core.constants import HAS_GREAT_EXPECTATIONS
from hsfs.engine import spark
from hsfs.training_dataset_feature import TrainingDatasetFeature
from pyspark.sql import DataFrame
@@ -3392,6 +3393,10 @@ def test_profile(self, mocker):
== 1
)
+ @pytest.mark.skipif(
+ not HAS_GREAT_EXPECTATIONS,
+ reason="Great Expectations is not installed",
+ )
def test_validate_with_great_expectations(self, mocker):
# Arrange
spark_engine = spark.Engine()
diff --git a/python/tests/test_feature_group.py b/python/tests/test_feature_group.py
index 56b870d23..3475d8664 100644
--- a/python/tests/test_feature_group.py
+++ b/python/tests/test_feature_group.py
@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
import warnings
import hsfs
@@ -31,6 +30,7 @@
util,
)
from hsfs.client.exceptions import FeatureStoreException, RestAPIError
+from hsfs.core.constants import HAS_GREAT_EXPECTATIONS
from hsfs.engine import python
@@ -145,7 +145,7 @@ def test_from_response_json_basic_info(self, backend_fixtures):
assert fg._feature_store_id == 67
assert fg.description == ""
assert fg.partition_key == []
- assert fg.primary_key == ['intt']
+ assert fg.primary_key == ["intt"]
assert fg.hudi_precombine_key is None
assert fg._feature_store_name is None
assert fg.created is None
@@ -322,7 +322,7 @@ def test_constructor_with_list_event_time_for_compatibility(
version=1,
description="fg_description",
event_time=["event_date"],
- features=features
+ features=features,
)
with pytest.raises(FeatureStoreException):
util.verify_attribute_key_names(new_fg, False)
@@ -810,6 +810,10 @@ def test_feature_group_set_expectation_suite(
assert fg.expectation_suite._feature_group_id == fg.id
assert fg.expectation_suite._feature_store_id == fg.feature_store_id
+ @pytest.mark.skipif(
+ not HAS_GREAT_EXPECTATIONS,
+ reason="great_expectations not installed",
+ )
def test_feature_group_save_expectation_suite_from_ge_type(
self, mocker, backend_fixtures
):
diff --git a/python/tests/test_ge_expectation.py b/python/tests/test_ge_expectation.py
index 85f3b195e..16a694720 100644
--- a/python/tests/test_ge_expectation.py
+++ b/python/tests/test_ge_expectation.py
@@ -13,10 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
-
-import great_expectations as ge
+import pytest
from hsfs import ge_expectation
+from hsfs.core.constants import HAS_GREAT_EXPECTATIONS
class TestGeExpectation:
@@ -62,13 +61,19 @@ def test_from_response_json_list_empty(self, backend_fixtures):
# Assert
assert len(ge_list) == 0
+ @pytest.mark.skipif(
+ not HAS_GREAT_EXPECTATIONS,
+ reason="great_expectations not installed",
+ )
def test_from_ge_object(self):
+ import great_expectations
+
# Arrange
expectationId = 32
expectation_type = "expect_column_min_to_be_between"
kwargs = {"kwargs_key": "kwargs_value"}
meta = {"meta_key": "meta_value", "expectationId": expectationId}
- ge_object = ge.core.ExpectationConfiguration(
+ ge_object = great_expectations.core.ExpectationConfiguration(
expectation_type=expectation_type,
kwargs=kwargs,
meta=meta,
diff --git a/python/tests/test_validation_report.py b/python/tests/test_validation_report.py
index c273221d7..a883e7188 100644
--- a/python/tests/test_validation_report.py
+++ b/python/tests/test_validation_report.py
@@ -56,7 +56,7 @@ def test_from_response_json_basic_info(self, backend_fixtures):
assert vr._validation_time is None
assert vr._featurestore_id is None
assert vr._featuregroup_id is None
- assert vr.ingestion_result == "UNKNOWN"
+ assert vr.ingestion_result == "unknown"
assert len(vr.results) == 1
assert isinstance(vr.results[0], ge_validation_result.ValidationResult)
assert vr.meta == {"meta_key": "meta_value"}
diff --git a/requirements-docs.txt b/requirements-docs.txt
index d1499a262..2a2e7927b 100644
--- a/requirements-docs.txt
+++ b/requirements-docs.txt
@@ -1,7 +1,7 @@
mkdocs==1.5.3
mkdocs-material==9.5.17
mike==2.0.0
-sphinx==7.2.6
+sphinx==7.3.7
keras_autodoc @ git+https://git@github.com/logicalclocks/keras-autodoc
markdown-include==0.8.1
mkdocs-jupyter==0.24.3
@@ -9,3 +9,4 @@ markdown==3.6
pymdown-extensions==10.7.1
mkdocs-macros-plugin==1.0.4
mkdocs-minify-plugin>=0.2.0
+