From 2665d5a6e340ef3a2d5c45b6244759410d5770fa Mon Sep 17 00:00:00 2001 From: kennethmhc Date: Tue, 10 Sep 2024 09:11:27 +0200 Subject: [PATCH 01/23] [APPEND][FSTORE-1509] Fix online model logging (#320) --- python/hsfs/core/feature_view_engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hsfs/core/feature_view_engine.py b/python/hsfs/core/feature_view_engine.py index 93ec3a104..cba03007c 100644 --- a/python/hsfs/core/feature_view_engine.py +++ b/python/hsfs/core/feature_view_engine.py @@ -1210,7 +1210,7 @@ def _get_feature_logging_data( model_col_name=FeatureViewEngine._HSML_MODEL, predictions=predictions, training_dataset_version=training_dataset_version, - hsml_model=hsml_model, + hsml_model=self.get_hsml_model_value(hsml_model) if hsml_model else None, ) else: return engine.get_instance().get_feature_logging_df( From d4b25317dc017f3acfd70ccf5247093340a5049b Mon Sep 17 00:00:00 2001 From: manu-sj <152865565+manu-sj@users.noreply.github.com> Date: Tue, 10 Sep 2024 15:56:19 +0200 Subject: [PATCH 02/23] [FSTORE-1488] Create `select_features` function that return all features in the feature group excluding event_time and primary_key (#303) * adding function for selecting all features * adding unit tests * adding warnings * updating documentation * Slight change to docs and less agressive warning * Switch to positive and helpful warning --------- Co-authored-by: Victor Jouffrey --- python/hsfs/feature_group.py | 107 ++++++++++++++++++++++++++++- python/tests/test_feature_group.py | 6 ++ 2 files changed, 112 insertions(+), 1 deletion(-) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 7eb7062c3..95604d9ff 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -254,7 +254,7 @@ def select_all( include_primary_key: Optional[bool] = True, include_event_time: Optional[bool] = True, ) -> query.Query: - """Select all features in the feature group and return a query object. + """Select all features along with primary key and event time from the feature group and return a query object. The query can be used to construct joins of feature groups or create a feature view. @@ -319,6 +319,111 @@ def select_all( else: return self.select_except(self.primary_key + [self.event_time]) + def select_features( + self, + ) -> query.Query: + """Select all the features in the feature group and return a query object. + + Queries define the schema of Feature View objects which can be used to + create Training Datasets, read from the Online Feature Store, and more. They can + also be composed to create more complex queries using the `join` method. + + !!! info + This method does not select the primary key and event time of the feature group. + Use `select_all` to include them. + Note that primary keys do not need to be included in the query to allow joining + on them. + + !!! example + ```python + # connect to the Feature Store + fs = hopsworks.login().get_feature_store() + + # Some dataframe to create the feature group with + # both an event time and a primary key column + my_df.head() + +------------+------------+------------+------------+ + | id | feature_1 | ... | ts | + +------------+------------+------------+------------+ + | 8 | 8 | | 15 | + | 3 | 3 | ... | 6 | + | 1 | 1 | | 18 | + +------------+------------+------------+------------+ + + # Create the Feature Group instances + fg1 = fs.create_feature_group( + name = "fg1", + version=1, + primary_key=["id"], + event_time="ts", + ) + + # Insert data to the feature group. 
+ fg1.insert(my_df) + + # select all features from `fg1` excluding primary key and event time + query = fg1.select_features() + + # show first 3 rows + query.show(3) + + # Output, no id or ts columns + +------------+------------+------------+ + | feature_1 | feature_2 | feature_3 | + +------------+------------+------------+ + | 8 | 7 | 15 | + | 3 | 1 | 6 | + | 1 | 2 | 18 | + +------------+------------+------------+ + ``` + + !!! example + ```python + # connect to the Feature Store + fs = hopsworks.login().get_feature_store() + + # Get the Feature Group from the previous example + fg1 = fs.get_feature_group("fg1", 1) + + # Some dataframe to create another feature group + # with a primary key column + +------------+------------+------------+ + | id_2 | feature_6 | feature_7 | + +------------+------------+------------+ + | 8 | 11 | | + | 3 | 4 | ... | + | 1 | 9 | | + +------------+------------+------------+ + + # join the two feature groups on their indexes, `id` and `id_2` + # but does not include them in the query + query = fg1.select_features().join(fg2.select_features(), left_on="id", right_on="id_2") + + # show first 5 rows + query.show(3) + + # Output + +------------+------------+------------+------------+------------+ + | feature_1 | feature_2 | feature_3 | feature_6 | feature_7 | + +------------+------------+------------+------------+------------+ + | 8 | 7 | 15 | 11 | 15 | + | 3 | 1 | 6 | 4 | 3 | + | 1 | 2 | 18 | 9 | 20 | + +------------+------------+------------+------------+------------+ + + ``` + + # Returns + `Query`. A query object with all features of the feature group. + """ + query = self.select_except(self.primary_key + [self.event_time]) + _logger.info( + f"Using {[f.name for f in query.features]} as features for the query." + "To include primary key and event time use `select_all`." + ) + + return query + def select(self, features: List[Union[str, feature.Feature]]) -> query.Query: """Select a subset of features of the feature group and return a query object. 
diff --git a/python/tests/test_feature_group.py b/python/tests/test_feature_group.py index 70d6b52f3..cf2618cf1 100644 --- a/python/tests/test_feature_group.py +++ b/python/tests/test_feature_group.py @@ -365,6 +365,12 @@ def test_select_all_exclude_pk_ts(self): assert len(features) == 2 assert set([f.name for f in features]) == {"f1", "f2"} + def test_select_features(self): + query = test_feature_group.select_features() + features = query.features + assert len(features) == 2 + assert set([f.name for f in features]) == {"f1", "f2"} + def test_materialization_job(self, mocker): mock_job = mocker.Mock() mock_job_api = mocker.patch( From 4c122f62437522bca78767a8cd396ca87925c17b Mon Sep 17 00:00:00 2001 From: Aleksey Veresov Date: Tue, 10 Sep 2024 17:52:55 +0200 Subject: [PATCH 03/23] Fix that InternalClientError was not in client.exceptions (#325) * Fix that InternalClientError was not in client.exceptions * Keep InternalClientError --- python/hopsworks_common/client/exceptions.py | 10 +++------- python/hsfs/client/exceptions.py | 4 ++-- python/hsml/client/exceptions.py | 4 ++-- 3 files changed, 7 insertions(+), 11 deletions(-) diff --git a/python/hopsworks_common/client/exceptions.py b/python/hopsworks_common/client/exceptions.py index 58f2118f3..a4f862517 100644 --- a/python/hopsworks_common/client/exceptions.py +++ b/python/hopsworks_common/client/exceptions.py @@ -118,14 +118,10 @@ def __init__(self, missing_argument: str) -> None: super().__init__(message) -class HopsworksClientError(TypeError): - """Raised when hopsworks internal client cannot be initialized due to missing arguments.""" +class InternalClientError(TypeError): + """Raised when hopsworks internal client is missing some necessary configuration.""" - def __init__(self, missing_argument): - message = ( - "{0} cannot be of type NoneType, {0} is a non-optional " - "argument to connect to hopsworks from an internal environment." 
- ).format(missing_argument) + def __init__(self, message: str) -> None: super().__init__(message) diff --git a/python/hsfs/client/exceptions.py b/python/hsfs/client/exceptions.py index a718c9ad4..945f9e7be 100644 --- a/python/hsfs/client/exceptions.py +++ b/python/hsfs/client/exceptions.py @@ -21,7 +21,7 @@ ExternalClientError, FeatureStoreException, GitException, - HopsworksClientError, + InternalClientError, JobException, JobExecutionException, KafkaException, @@ -42,7 +42,7 @@ "ExternalClientError", "FeatureStoreException", "GitException", - "HopsworksClientError", + "InternalClientError", "JobException", "JobExecutionException", "KafkaException", diff --git a/python/hsml/client/exceptions.py b/python/hsml/client/exceptions.py index a718c9ad4..945f9e7be 100644 --- a/python/hsml/client/exceptions.py +++ b/python/hsml/client/exceptions.py @@ -21,7 +21,7 @@ ExternalClientError, FeatureStoreException, GitException, - HopsworksClientError, + InternalClientError, JobException, JobExecutionException, KafkaException, @@ -42,7 +42,7 @@ "ExternalClientError", "FeatureStoreException", "GitException", - "HopsworksClientError", + "InternalClientError", "JobException", "JobExecutionException", "KafkaException", From 349b375db1a554c006bfd00523d90e6bdcf62e04 Mon Sep 17 00:00:00 2001 From: Dhananjay Mukhedkar <55157590+dhananjay-mk@users.noreply.github.com> Date: Wed, 11 Sep 2024 10:09:25 +0200 Subject: [PATCH 04/23] [FSTORE-1538] refactor closing connection pool (#324) * refactor closing connection pool * linting fixes --- python/hsfs/core/feature_view_engine.py | 82 +++++++++++++-------- python/hsfs/core/online_store_sql_engine.py | 12 ++- python/hsfs/engine/python.py | 44 ++++++----- python/hsfs/feature_logger.py | 8 +- python/hsfs/feature_view.py | 5 +- 5 files changed, 89 insertions(+), 62 deletions(-) diff --git a/python/hsfs/core/feature_view_engine.py b/python/hsfs/core/feature_view_engine.py index cba03007c..afdc9c75f 100644 --- a/python/hsfs/core/feature_view_engine.py +++ b/python/hsfs/core/feature_view_engine.py @@ -190,7 +190,9 @@ def delete(self, name, version=None): return self._feature_view_api.delete_by_name(name) def get_training_dataset_schema( - self, feature_view: feature_view.FeatureView, training_dataset_version: Optional[int] = None + self, + feature_view: feature_view.FeatureView, + training_dataset_version: Optional[int] = None, ): """ Function that returns the schema of the training dataset generated using the feature view. 
@@ -849,7 +851,9 @@ def _get_training_dataset_metadata( return td - def _get_training_datasets_metadata(self, feature_view_obj: feature_view.FeatureView): + def _get_training_datasets_metadata( + self, feature_view_obj: feature_view.FeatureView + ): tds = self._feature_view_api.get_training_datasets( feature_view_obj.name, feature_view_obj.version ) @@ -1114,8 +1118,7 @@ def log_features( pd.DataFrame, list[list], np.ndarray, TypeVar("pyspark.sql.DataFrame") ] = None, transformed_features: Union[ - pd.DataFrame, list[list], np.ndarray, TypeVar( - "pyspark.sql.DataFrame") + pd.DataFrame, list[list], np.ndarray, TypeVar("pyspark.sql.DataFrame") ] = None, predictions: Optional[Union[pd.DataFrame, list[list], np.ndarray]] = None, write_options: Optional[Dict[str, Any]] = None, @@ -1133,40 +1136,54 @@ def log_features( default_write_options.update(write_options) results = [] if logger: - logger.log(**{ - key: (self._get_feature_logging_data( - features_rows=features, - feature_logging=feature_logging, - transformed=transformed, - fv=fv, - predictions=predictions, - training_dataset_version=training_dataset_version, - hsml_model=hsml_model, - return_list=True, - ) if features else None) - for transformed, key, features in [ - (False, "untransformed_features", untransformed_features), (True, "transformed_features", transformed_features)] - } + logger.log( + **{ + key: ( + self._get_feature_logging_data( + features_rows=features, + feature_logging=feature_logging, + transformed=transformed, + fv=fv, + predictions=predictions, + training_dataset_version=training_dataset_version, + hsml_model=hsml_model, + return_list=True, + ) + if features + else None + ) + for transformed, key, features in [ + (False, "untransformed_features", untransformed_features), + (True, "transformed_features", transformed_features), + ] + } ) else: - for transformed, features in [(False, untransformed_features), (True, transformed_features)]: + for transformed, features in [ + (False, untransformed_features), + (True, transformed_features), + ]: fg = feature_logging.get_feature_group(transformed) if features is None: continue - results.append(fg.insert(self._get_feature_logging_data( - features_rows=features, - feature_logging=feature_logging, - transformed=transformed, - fv=fv, - predictions=predictions, - training_dataset_version=training_dataset_version, - hsml_model=hsml_model, - return_list=False, - ), write_options=default_write_options)) + results.append( + fg.insert( + self._get_feature_logging_data( + features_rows=features, + feature_logging=feature_logging, + transformed=transformed, + fv=fv, + predictions=predictions, + training_dataset_version=training_dataset_version, + hsml_model=hsml_model, + return_list=False, + ), + write_options=default_write_options, + ) + ) return results - def _get_feature_logging_data( self, features_rows, @@ -1223,10 +1240,11 @@ def _get_feature_logging_data( model_col_name=FeatureViewEngine._HSML_MODEL, predictions=predictions, training_dataset_version=training_dataset_version, - hsml_model=self.get_hsml_model_value(hsml_model) if hsml_model else None, + hsml_model=self.get_hsml_model_value(hsml_model) + if hsml_model + else None, ) - def read_feature_logs( self, fv, diff --git a/python/hsfs/core/online_store_sql_engine.py b/python/hsfs/core/online_store_sql_engine.py index 7c38d2b80..1d4cffbc7 100644 --- a/python/hsfs/core/online_store_sql_engine.py +++ b/python/hsfs/core/online_store_sql_engine.py @@ -574,10 +574,6 @@ async def _query_async_sql(self, stmt, bind_params): 
_logger.debug(f"Retrieved resultset: {resultset}. Closing cursor.") await cursor.close() - # close connection pool - self._connection_pool.close() - await self._connection_pool.wait_closed() - return resultset async def _execute_prep_statements( @@ -609,6 +605,14 @@ async def _execute_prep_statements( except asyncio.CancelledError as e: _logger.error(f"Failed executing prepared statements: {e}") raise e + finally: + # close connection pool + self._connection_pool.close() + try: + # await with timeout of 120 seconds + await asyncio.wait_for(self._connection_pool.wait_closed(), timeout=120) + except asyncio.TimeoutError as e: + _logger.error(f"Connection pool did not close within time limit: {e}") # Create a dict of results with the prepared statement index as key results_dict = {} diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index f4252d5d7..d05e7c2a7 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -1431,9 +1431,7 @@ def _convert_feature_log_to_df(feature_log, cols) -> pd.DataFrame: @staticmethod def _validate_logging_list(feature_log, cols): - if isinstance(feature_log[0], list) or isinstance( - feature_log[0], np.ndarray - ): + if isinstance(feature_log[0], list) or isinstance(feature_log[0], np.ndarray): provided_len = len(feature_log[0]) else: provided_len = 1 @@ -1458,12 +1456,9 @@ def get_logging_metadata( now = datetime.now() metadata = { td_col_name: [training_dataset_version for _ in range(size)], - model_col_name: [ - hsml_model - for _ in range(size) - ], + model_col_name: [hsml_model for _ in range(size)], time_col_name: pd.Series([now for _ in range(size)]), - "log_id": [str(uuid.uuid4()) for _ in range(size)] + "log_id": [str(uuid.uuid4()) for _ in range(size)], } if not batch: @@ -1508,9 +1503,7 @@ def get_feature_logging_df( for k, v in logging_metadata.items(): features[k] = pd.Series(v) # _cast_column_to_offline_type cannot cast string type - features[model_col_name] = features[model_col_name].astype( - pd.StringDtype() - ) + features[model_col_name] = features[model_col_name].astype(pd.StringDtype()) return features[[feat.name for feat in fg.features]] @staticmethod @@ -1528,8 +1521,17 @@ def get_feature_logging_list( ) -> list: if isinstance(features, pd.DataFrame): return Engine.get_feature_logging_df( - features, fg, td_features, td_predictions, td_col_name, time_col_name, model_col_name, predictions, training_dataset_version, hsml_model - ).to_dict(orient='records') + features, + fg, + td_features, + td_predictions, + td_col_name, + time_col_name, + model_col_name, + predictions, + training_dataset_version, + hsml_model, + ).to_dict(orient="records") else: log_vectors = [] @@ -1546,13 +1548,15 @@ def get_feature_logging_list( # get metadata for row in log_vectors: - row.update(Engine.get_logging_metadata( - td_col_name=td_col_name, - time_col_name=time_col_name, - model_col_name=model_col_name, - training_dataset_version=training_dataset_version, - hsml_model=hsml_model, - )) + row.update( + Engine.get_logging_metadata( + td_col_name=td_col_name, + time_col_name=time_col_name, + model_col_name=model_col_name, + training_dataset_version=training_dataset_version, + hsml_model=hsml_model, + ) + ) return log_vectors @staticmethod diff --git a/python/hsfs/feature_logger.py b/python/hsfs/feature_logger.py index f036ac331..91354fe05 100644 --- a/python/hsfs/feature_logger.py +++ b/python/hsfs/feature_logger.py @@ -19,10 +19,12 @@ class FeatureLogger(ABC): - @abstractmethod - def log(self, untransformed_features: 
Optional[List[Dict]] = None, - transformed_features: Optional[List[Dict]] = None): + def log( + self, + untransformed_features: Optional[List[Dict]] = None, + transformed_features: Optional[List[Dict]] = None, + ): pass @abstractmethod diff --git a/python/hsfs/feature_view.py b/python/hsfs/feature_view.py index 65db95876..3ecbf2068 100644 --- a/python/hsfs/feature_view.py +++ b/python/hsfs/feature_view.py @@ -3596,8 +3596,7 @@ def log( ] = None, predictions: Optional[Union[pd.DataFrame, list[list], np.ndarray]] = None, transformed_features: Union[ - pd.DataFrame, list[list], np.ndarray, TypeVar( - "pyspark.sql.DataFrame") + pd.DataFrame, list[list], np.ndarray, TypeVar("pyspark.sql.DataFrame") ] = None, write_options: Optional[Dict[str, Any]] = None, training_dataset_version: Optional[int] = None, @@ -3660,7 +3659,7 @@ def log( or self.get_last_accessed_training_dataset() ), hsml_model=model, - logger=self._feature_logger + logger=self._feature_logger, ) def get_log_timeline( From 67783a182dd375ccb17ef3dec19f6914ef60c90e Mon Sep 17 00:00:00 2001 From: Aleksey Veresov Date: Wed, 11 Sep 2024 10:31:32 +0200 Subject: [PATCH 05/23] Fix that error_code was not in RestAPIError (#327) * Fix that error_code was not in RestAPIError * Fix mocking --- python/hopsworks_common/client/exceptions.py | 3 +++ python/tests/test_feature_group.py | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/python/hopsworks_common/client/exceptions.py b/python/hopsworks_common/client/exceptions.py index a4f862517..c02ea360d 100644 --- a/python/hopsworks_common/client/exceptions.py +++ b/python/hopsworks_common/client/exceptions.py @@ -50,6 +50,7 @@ def __init__(self, url: str, response: requests.Response) -> None: error_object = {"errorMsg": error_object} except Exception: error_object = {} + self.error_code = None message = ( "Metadata operation error: (url: {}). 
Server response: \n" "HTTP code: {}, HTTP reason: {}, body: {}, error code: {}, error msg: {}, user " @@ -63,6 +64,8 @@ def __init__(self, url: str, response: requests.Response) -> None: error_object.get("usrMsg", ""), ) ) + if len(error_object) != 0: + self.error_code = error_object.get("errorCode", "") super().__init__(message) self.url = url self.response = response diff --git a/python/tests/test_feature_group.py b/python/tests/test_feature_group.py index cf2618cf1..bb2944f97 100644 --- a/python/tests/test_feature_group.py +++ b/python/tests/test_feature_group.py @@ -407,7 +407,7 @@ def test_materialization_job_retry_success(self, mocker): mock_response_job_not_found.status_code = 404 mock_response_job_not_found.json.return_value = {"errorCode": 130009} - mock_response_not_found = mocker.Mock() + mock_response_not_found = mocker.MagicMock() mock_response_not_found.status_code = 404 mock_job = mocker.Mock() @@ -448,7 +448,7 @@ def test_materialization_job_retry_fail(self, mocker): # Arrange mocker.patch("time.sleep") - mock_response_not_found = mocker.Mock() + mock_response_not_found = mocker.MagicMock() mock_response_not_found.status_code = 404 mock_job_api = mocker.patch( From ff0dd4f037bf522636cc3faee4474ea79ea738bb Mon Sep 17 00:00:00 2001 From: Aleksey Veresov Date: Thu, 12 Sep 2024 08:31:40 +0200 Subject: [PATCH 06/23] Remove login warnings from Connection (#330) --- python/hopsworks_common/connection.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/python/hopsworks_common/connection.py b/python/hopsworks_common/connection.py index 6b317f37c..23b0b29ae 100644 --- a/python/hopsworks_common/connection.py +++ b/python/hopsworks_common/connection.py @@ -415,19 +415,6 @@ def connect(self) -> None: self._connected = False raise - _client = client.get_instance() - if _client._is_external() and not _client._project_name: - warnings.warn( - "Connected to Hopsworks. You must provide a project name to access project resources. " - "Use `connection.get_project('my_project')`.", - stacklevel=2, - ) - else: - print( - "Connected. 
Call `.close()` to terminate connection gracefully.", - flush=True, - ) - self._check_compatibility() @connected From ba9570322609e3633b339ed4aeed6647747ddeeb Mon Sep 17 00:00:00 2001 From: Victor Jouffrey <37411285+vatj@users.noreply.github.com> Date: Mon, 16 Sep 2024 07:41:47 +0200 Subject: [PATCH 07/23] [FSTORE-1540] Enforce releasing lock on mysql table after reading feature values from online store (#331) --- python/hsfs/core/online_store_sql_engine.py | 18 +++++++++--------- python/hsfs/core/util_sql.py | 14 +++++++------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/python/hsfs/core/online_store_sql_engine.py b/python/hsfs/core/online_store_sql_engine.py index 1d4cffbc7..0f2362273 100644 --- a/python/hsfs/core/online_store_sql_engine.py +++ b/python/hsfs/core/online_store_sql_engine.py @@ -601,18 +601,18 @@ async def _execute_prep_statements( for key in prepared_statements ] # Run the queries in parallel using asyncio.gather - results = await asyncio.gather(*tasks) + results = await asyncio.wait_for( + asyncio.gather(*tasks), + timeout=self.connection_options.get("query_timeout", 120) + if self.connection_options + else 120, + ) except asyncio.CancelledError as e: _logger.error(f"Failed executing prepared statements: {e}") raise e - finally: - # close connection pool - self._connection_pool.close() - try: - # await with timeout of 120 seconds - await asyncio.wait_for(self._connection_pool.wait_closed(), timeout=120) - except asyncio.TimeoutError as e: - _logger.error(f"Connection pool did not close within time limit: {e}") + except asyncio.TimeoutError as e: + _logger.error(f"Query timed out: {e}") + raise e # Create a dict of results with the prepared statement index as key results_dict = {} diff --git a/python/hsfs/core/util_sql.py b/python/hsfs/core/util_sql.py index dc0271306..67a47676a 100644 --- a/python/hsfs/core/util_sql.py +++ b/python/hsfs/core/util_sql.py @@ -100,6 +100,9 @@ async def create_async_engine( else: hostname = url.host + if options is None: + options = {} + # create a aiomysql connection pool pool = await async_create_engine( host=hostname, @@ -108,12 +111,9 @@ async def create_async_engine( password=online_options["password"], db=url.database, loop=loop, - minsize=( - options.get("minsize", default_min_size) if options else default_min_size - ), - maxsize=( - options.get("maxsize", default_min_size) if options else default_min_size - ), - pool_recycle=(options.get("pool_recycle", -1) if options else -1), + minsize=options.get("minsize", default_min_size), + maxsize=options.get("maxsize", default_min_size), + pool_recycle=options.get("pool_recycle", -1), + autocommit=options.get("autocommit", True), ) return pool From b7eefb45c7497d1bc47097730ebeb274c7b03570 Mon Sep 17 00:00:00 2001 From: Robin Andersson Date: Mon, 16 Sep 2024 10:09:47 +0200 Subject: [PATCH 08/23] [HWORKS-1620] Set hostname verification off by default in python client (#332) --- python/hopsworks/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hopsworks/__init__.py b/python/hopsworks/__init__.py index eb2b0c090..79d500769 100644 --- a/python/hopsworks/__init__.py +++ b/python/hopsworks/__init__.py @@ -81,7 +81,7 @@ def login( project: str = None, api_key_value: str = None, api_key_file: str = None, - hostname_verification: bool = True, + hostname_verification: bool = False, trust_store_path: str = None, ) -> project.Project: """Connect to [Serverless Hopsworks](https://app.hopsworks.ai) by calling the `hopsworks.login()` function with no 
arguments. From ddc9308c4ed3ce791af3d70c9928412d3b7c2580 Mon Sep 17 00:00:00 2001 From: manu-sj <152865565+manu-sj@users.noreply.github.com> Date: Tue, 17 Sep 2024 10:28:01 +0200 Subject: [PATCH 09/23] [HWORKS-1565][APPEND] Make sure the spark program exits after completion (#334) * printing stacktrace in case of exception * fixing style check * removinng duplicate code --- .../java/src/main/java/com/logicalclocks/utils/MainClass.java | 2 ++ utils/python/hsfs_utils.py | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/utils/java/src/main/java/com/logicalclocks/utils/MainClass.java b/utils/java/src/main/java/com/logicalclocks/utils/MainClass.java index f3104e3bc..e80f2cd87 100644 --- a/utils/java/src/main/java/com/logicalclocks/utils/MainClass.java +++ b/utils/java/src/main/java/com/logicalclocks/utils/MainClass.java @@ -121,6 +121,8 @@ public static void main(String[] args) throws Exception { SparkEngine.getInstance().streamToHudiTable(streamFeatureGroup, writeOptions); } success = true; + } catch (Exception e) { + e.printStackTrace(); } finally { LOGGER.info("Closing spark session..."); try { diff --git a/utils/python/hsfs_utils.py b/utils/python/hsfs_utils.py index 40799c9f4..5af468873 100644 --- a/utils/python/hsfs_utils.py +++ b/utils/python/hsfs_utils.py @@ -6,6 +6,7 @@ import json from datetime import datetime from typing import Any, Dict +import traceback import fsspec.implementations.arrow as pfs @@ -304,6 +305,9 @@ def parse_isoformat_date(da: str) -> datetime: run_feature_monitoring(job_conf) success = True + except Exception: + # Printing stack trace of exception so that logs are not lost. + print(traceback.format_exc()) finally: if spark is not None: try: From 5bd9da58d57e397b0d0833e38756a681b2f6bcf2 Mon Sep 17 00:00:00 2001 From: manu-sj <152865565+manu-sj@users.noreply.github.com> Date: Tue, 17 Sep 2024 19:44:07 +0200 Subject: [PATCH 10/23] [FSTORE-1533] Importing hopsworks fails due to core dump while importing polars (#323) * making polars an optional dependency * updating pyproject and requirements docs * adding HAS_POLARS check to build docs * fixing review comments * fixing review comments updating typing and exception messages * fixing review comments updating typing and exception messages * fixing pyproject --- python/hopsworks_common/core/constants.py | 7 ++ python/hopsworks_common/core/type_systems.py | 10 ++- python/hopsworks_common/decorators.py | 12 ++++ python/hsfs/core/arrow_flight_client.py | 8 ++- .../core/transformation_function_engine.py | 7 +- python/hsfs/core/vector_server.py | 22 +++--- python/hsfs/engine/python.py | 69 +++++++++++++------ python/hsfs/feature_group.py | 5 +- python/hsfs/feature_store.py | 6 +- python/hsfs/feature_view.py | 17 +++-- python/hsfs/storage_connector.py | 5 +- python/pyproject.toml | 4 +- python/tests/engine/test_python.py | 60 +++++++++++++++- requirements-docs.txt | 1 + 14 files changed, 188 insertions(+), 45 deletions(-) diff --git a/python/hopsworks_common/core/constants.py b/python/hopsworks_common/core/constants.py index 56f98d01e..2cba250a6 100644 --- a/python/hopsworks_common/core/constants.py +++ b/python/hopsworks_common/core/constants.py @@ -47,6 +47,13 @@ HAS_PANDAS: bool = importlib.util.find_spec("pandas") is not None HAS_NUMPY: bool = importlib.util.find_spec("numpy") is not None HAS_POLARS: bool = importlib.util.find_spec("polars") is not None +polars_not_installed_message = ( + "Polars package not found. 
" + "If you want to use Polars with Hopsworks you can install the corresponding extra via " + """'`pip install "hopsworks[polars]"`. '""" + "You can also install polars directly in your environment with `pip install polars`. " + "You will need to restart your kernel if applicable." +) # SQL packages HAS_SQLALCHEMY: bool = importlib.util.find_spec("sqlalchemy") is not None diff --git a/python/hopsworks_common/core/type_systems.py b/python/hopsworks_common/core/type_systems.py index 046f5452e..aa38df296 100644 --- a/python/hopsworks_common/core/type_systems.py +++ b/python/hopsworks_common/core/type_systems.py @@ -22,7 +22,12 @@ from typing import TYPE_CHECKING, Literal, Union import pytz -from hopsworks_common.core.constants import HAS_ARROW, HAS_PANDAS, HAS_POLARS +from hopsworks_common.core.constants import ( + HAS_ARROW, + HAS_PANDAS, + HAS_POLARS, +) +from hopsworks_common.decorators import uses_polars if TYPE_CHECKING: @@ -195,6 +200,7 @@ def cast_pandas_column_to_offline_type( return feature_column # handle gracefully, just return the column as-is +@uses_polars def cast_polars_column_to_offline_type( feature_column: pl.Series, offline_type: str ) -> pl.Series: @@ -232,7 +238,7 @@ def cast_column_to_offline_type( ) -> pd.Series: if isinstance(feature_column, pd.Series): return cast_pandas_column_to_offline_type(feature_column, offline_type.lower()) - elif isinstance(feature_column, pl.Series): + elif HAS_POLARS and isinstance(feature_column, pl.Series): return cast_polars_column_to_offline_type(feature_column, offline_type.lower()) diff --git a/python/hopsworks_common/decorators.py b/python/hopsworks_common/decorators.py index d24321ffb..0e23113be 100644 --- a/python/hopsworks_common/decorators.py +++ b/python/hopsworks_common/decorators.py @@ -21,7 +21,9 @@ from hopsworks_common.core.constants import ( HAS_GREAT_EXPECTATIONS, + HAS_POLARS, great_expectations_not_installed_message, + polars_not_installed_message, ) @@ -84,3 +86,13 @@ def g(*args, **kwds): return f(*args, **kwds) return g + + +def uses_polars(f): + @functools.wraps(f) + def g(*args, **kwds): + if not HAS_POLARS: + raise ModuleNotFoundError(polars_not_installed_message) + return f(*args, **kwds) + + return g diff --git a/python/hsfs/core/arrow_flight_client.py b/python/hsfs/core/arrow_flight_client.py index 3ec28a68f..e03283a38 100644 --- a/python/hsfs/core/arrow_flight_client.py +++ b/python/hsfs/core/arrow_flight_client.py @@ -23,12 +23,12 @@ from functools import wraps from typing import Any, Dict, Optional, Union -import polars as pl import pyarrow import pyarrow._flight import pyarrow.flight from hopsworks_common import client from hopsworks_common.client.exceptions import FeatureStoreException +from hopsworks_common.core.constants import HAS_POLARS, polars_not_installed_message from hsfs import feature_group from hsfs.constructor import query from hsfs.core.variable_api import VariableApi @@ -37,6 +37,10 @@ from retrying import retry +if HAS_POLARS: + import polars as pl + + _logger = logging.getLogger(__name__) @@ -397,6 +401,8 @@ def _get_dataset(self, descriptor, timeout=None, dataframe_type="pandas"): reader = self._connection.do_get(info.endpoints[0].ticket, options) _logger.debug("Dataset fetched. 
Converting to dataframe %s.", dataframe_type) if dataframe_type.lower() == "polars": + if not HAS_POLARS: + raise ModuleNotFoundError(polars_not_installed_message) return pl.from_arrow(reader.read_all()) else: return reader.read_pandas() diff --git a/python/hsfs/core/transformation_function_engine.py b/python/hsfs/core/transformation_function_engine.py index b2351adaa..3d17df081 100644 --- a/python/hsfs/core/transformation_function_engine.py +++ b/python/hsfs/core/transformation_function_engine.py @@ -15,14 +15,17 @@ # from __future__ import annotations -from typing import Dict, List, Optional, Set, TypeVar, Union +from typing import TYPE_CHECKING, Dict, List, Optional, Set, TypeVar, Union import pandas as pd -import polars as pl from hsfs import feature_view, statistics, training_dataset, transformation_function from hsfs.core import transformation_function_api +if TYPE_CHECKING: + import polars as pl + + class TransformationFunctionEngine: BUILTIN_FN_NAMES = [ "min_max_scaler", diff --git a/python/hsfs/core/vector_server.py b/python/hsfs/core/vector_server.py index 3b28bd86a..ccc0f39d2 100755 --- a/python/hsfs/core/vector_server.py +++ b/python/hsfs/core/vector_server.py @@ -27,8 +27,12 @@ import avro.schema import numpy as np import pandas as pd -import polars as pl from hopsworks_common import client +from hopsworks_common.core.constants import ( + HAS_FAST_AVRO, + HAS_POLARS, + polars_not_installed_message, +) from hsfs import ( feature_view, training_dataset, @@ -48,14 +52,14 @@ ) -HAS_FASTAVRO = False -try: +if HAS_FAST_AVRO: from fastavro import schemaless_reader - - HAS_FASTAVRO = True -except ImportError: +else: from avro.io import BinaryDecoder +if HAS_POLARS: + import polars as pl + _logger = logging.getLogger(__name__) @@ -590,7 +594,7 @@ def _check_feature_vectors_type_and_convert_to_dict( return_type = "pandas" feature_vectors = feature_vectors.to_dict(orient="records") - elif isinstance(feature_vectors, pl.DataFrame): + elif HAS_POLARS and isinstance(feature_vectors, pl.DataFrame): return_type = "polars" feature_vectors = feature_vectors.to_pandas() feature_vectors = feature_vectors.to_dict(orient="records") @@ -823,6 +827,8 @@ def handle_feature_vector_return_type( return pandas_df elif return_type.lower() == "polars": _logger.debug("Returning feature vector as polars dataframe") + if not HAS_POLARS: + raise ModuleNotFoundError(polars_not_installed_message) return pl.DataFrame( feature_vectorz if batch else [feature_vectorz], schema=column_names if not inference_helper else None, @@ -1076,7 +1082,7 @@ def build_complex_feature_decoders(self) -> Dict[str, Callable]: _logger.debug( f"Building complex feature decoders corresponding to {complex_feature_schemas}." 
) - if HAS_FASTAVRO: + if HAS_FAST_AVRO: _logger.debug("Using fastavro for deserialization.") return { f_name: ( diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index d05e7c2a7..81fae1c6f 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -52,11 +52,12 @@ import hsfs import numpy as np import pandas as pd -import polars as pl import pyarrow as pa from botocore.response import StreamingBody from hopsworks_common import client from hopsworks_common.client.exceptions import FeatureStoreException +from hopsworks_common.core.constants import HAS_POLARS, polars_not_installed_message +from hopsworks_common.decorators import uses_great_expectations, uses_polars from hsfs import ( feature, feature_view, @@ -88,7 +89,6 @@ HAS_SQLALCHEMY, ) from hsfs.core.vector_db_client import VectorDbClient -from hsfs.decorators import uses_great_expectations from hsfs.feature_group import ExternalFeatureGroup, FeatureGroup from hsfs.training_dataset import TrainingDataset from hsfs.training_dataset_feature import TrainingDatasetFeature @@ -109,6 +109,9 @@ if HAS_PANDAS: from hsfs.core.type_systems import convert_pandas_dtype_to_offline_type +if HAS_POLARS: + import polars as pl + _logger = logging.getLogger(__name__) @@ -214,6 +217,8 @@ def _jdbc( if "sqlalchemy" in str(type(mysql_conn)): sql_query = sql.text(sql_query) if dataframe_type.lower() == "polars": + if not HAS_POLARS: + raise ModuleNotFoundError(polars_not_installed_message) result_df = pl.read_database(sql_query, mysql_conn) else: result_df = pd.read_sql(sql_query, mysql_conn) @@ -247,6 +252,8 @@ def read( ) ) if dataframe_type.lower() == "polars": + if not HAS_POLARS: + raise ModuleNotFoundError(polars_not_installed_message) # Below check performed since some files materialized when creating training data are empty # If empty dataframe is in df_list then polars cannot concatenate df_list due to schema mismatch # However if the entire split contains only empty files which can occur when the data size is very small then one of the empty dataframe is return so that the column names can be accessed. 
@@ -278,9 +285,12 @@ def _read_pandas(self, data_format: str, obj: Any) -> pd.DataFrame: ) ) + @uses_polars def _read_polars( self, data_format: Literal["csv", "tsv", "parquet"], obj: Any ) -> pl.DataFrame: + if not HAS_POLARS: + raise ModuleNotFoundError(polars_not_installed_message) if data_format.lower() == "csv": return pl.read_csv(obj) elif data_format.lower() == "tsv": @@ -454,6 +464,8 @@ def read_vector_db( results = VectorDbClient.read_feature_group(feature_group, n) feature_names = [f.name for f in feature_group.features] if dataframe_type == "polars": + if not HAS_POLARS: + raise ModuleNotFoundError(polars_not_installed_message) df = pl.DataFrame(results, schema=feature_names) else: df = pd.DataFrame(results, columns=feature_names, index=None) @@ -513,7 +525,9 @@ def profile( exact_uniqueness: bool = True, ) -> str: # TODO: add statistics for correlations, histograms and exact_uniqueness - if isinstance(df, pl.DataFrame) or isinstance(df, pl.dataframe.frame.DataFrame): + if HAS_POLARS and ( + isinstance(df, pl.DataFrame) or isinstance(df, pl.dataframe.frame.DataFrame) + ): arrow_schema = df.to_arrow().schema else: arrow_schema = pa.Schema.from_pandas(df, preserve_index=False) @@ -526,8 +540,9 @@ def profile( or pa.types.is_large_list(field.type) or pa.types.is_struct(field.type) ) and PYARROW_HOPSWORKS_DTYPE_MAPPING[field.type] in ["timestamp", "date"]: - if isinstance(df, pl.DataFrame) or isinstance( - df, pl.dataframe.frame.DataFrame + if HAS_POLARS and ( + isinstance(df, pl.DataFrame) + or isinstance(df, pl.dataframe.frame.DataFrame) ): df = df.with_columns(pl.col(field.name).cast(pl.String)) else: @@ -546,8 +561,9 @@ def profile( stats[col] = df[col].describe().to_dict() final_stats = [] for col in relevant_columns: - if isinstance(df, pl.DataFrame) or isinstance( - df, pl.dataframe.frame.DataFrame + if HAS_POLARS and ( + isinstance(df, pl.DataFrame) + or isinstance(df, pl.dataframe.frame.DataFrame) ): stats[col] = dict(zip(stats["statistic"], stats[col])) # set data type @@ -632,8 +648,9 @@ def validate_with_great_expectations( ) -> great_expectations.core.ExpectationSuiteValidationResult: # This conversion might cause a bottleneck in performance when using polars with greater expectations. # This patch is done becuase currently great_expecatations does not support polars, would need to be made proper when support added. - if isinstance(dataframe, pl.DataFrame) or isinstance( - dataframe, pl.dataframe.frame.DataFrame + if HAS_POLARS and ( + isinstance(dataframe, pl.DataFrame) + or isinstance(dataframe, pl.dataframe.frame.DataFrame) ): warnings.warn( "Currently Great Expectations does not support Polars dataframes. 
This operation will convert to Pandas dataframe that can be slow.", @@ -654,10 +671,12 @@ def set_job_group(self, group_id: str, description: Optional[str]) -> None: def convert_to_default_dataframe( self, dataframe: Union[pd.DataFrame, pl.DataFrame, pl.dataframe.frame.DataFrame] ) -> Optional[pd.DataFrame]: - if ( - isinstance(dataframe, pd.DataFrame) - or isinstance(dataframe, pl.DataFrame) - or isinstance(dataframe, pl.dataframe.frame.DataFrame) + if isinstance(dataframe, pd.DataFrame) or ( + HAS_POLARS + and ( + isinstance(dataframe, pl.DataFrame) + or isinstance(dataframe, pl.dataframe.frame.DataFrame) + ) ): upper_case_features = [ col for col in dataframe.columns if any(re.finditer("[A-Z]", col)) @@ -700,7 +719,7 @@ def convert_to_default_dataframe( dataframe_copy[col].dtype, pd.core.dtypes.dtypes.DatetimeTZDtype ): dataframe_copy[col] = dataframe_copy[col].dt.tz_convert(None) - elif isinstance(dataframe_copy[col].dtype, pl.Datetime): + elif HAS_POLARS and isinstance(dataframe_copy[col].dtype, pl.Datetime): dataframe_copy = dataframe_copy.with_columns( pl.col(col).dt.replace_time_zone(None) ) @@ -725,8 +744,10 @@ def parse_schema_feature_group( feature_type_map[_feature.name] = _feature.type if isinstance(dataframe, pd.DataFrame): arrow_schema = pa.Schema.from_pandas(dataframe, preserve_index=False) - elif isinstance(dataframe, pl.DataFrame) or isinstance( - dataframe, pl.dataframe.frame.DataFrame + elif ( + HAS_POLARS + and isinstance(dataframe, pl.DataFrame) + or isinstance(dataframe, pl.dataframe.frame.DataFrame) ): arrow_schema = dataframe.to_arrow().schema features = [] @@ -999,13 +1020,16 @@ def _random_split( groups += [i] * int(df_size * split.percentage) groups += [len(splits) - 1] * (df_size - len(groups)) random.shuffle(groups) - if isinstance(df, pl.DataFrame) or isinstance(df, pl.dataframe.frame.DataFrame): + if HAS_POLARS and ( + isinstance(df, pl.DataFrame) or isinstance(df, pl.dataframe.frame.DataFrame) + ): df = df.with_columns(pl.Series(name=split_column, values=groups)) else: df[split_column] = groups for i, split in enumerate(splits): - if isinstance(df, pl.DataFrame) or isinstance( - df, pl.dataframe.frame.DataFrame + if HAS_POLARS and ( + isinstance(df, pl.DataFrame) + or isinstance(df, pl.dataframe.frame.DataFrame) ): split_df = df.filter(pl.col(split_column) == i).drop(split_column) else: @@ -1121,6 +1145,8 @@ def _return_dataframe_type( if dataframe_type.lower() in ["default", "pandas"]: return dataframe if dataframe_type.lower() == "polars": + if not HAS_POLARS: + raise ModuleNotFoundError(polars_not_installed_message) if not ( isinstance(dataframe, pl.DataFrame) or isinstance(dataframe, pl.Series) ): @@ -1220,8 +1246,9 @@ def _apply_transformation_function( """ dropped_features = set() - if isinstance(dataset, pl.DataFrame) or isinstance( - dataset, pl.dataframe.frame.DataFrame + if HAS_POLARS and ( + isinstance(dataset, pl.DataFrame) + or isinstance(dataset, pl.dataframe.frame.DataFrame) ): # Converting polars dataframe to pandas because currently we support only pandas UDF's as transformation functions. 
if HAS_ARROW: diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 95604d9ff..764123cca 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -42,8 +42,8 @@ import humps import numpy as np import pandas as pd -import polars as pl from hopsworks_common.client.exceptions import FeatureStoreException, RestAPIError +from hopsworks_common.core.constants import HAS_POLARS from hsfs import ( engine, feature, @@ -104,6 +104,9 @@ if HAS_CONFLUENT_KAFKA: import confluent_kafka +if HAS_POLARS: + import polars as pl + _logger = logging.getLogger(__name__) diff --git a/python/hsfs/feature_store.py b/python/hsfs/feature_store.py index 6cf52e87f..e3c2f478d 100644 --- a/python/hsfs/feature_store.py +++ b/python/hsfs/feature_store.py @@ -23,7 +23,7 @@ import humps import numpy as np import pandas as pd -import polars as pl +from hopsworks_common.core.constants import HAS_POLARS from hsfs import ( expectation_suite, feature, @@ -53,6 +53,10 @@ from hsfs.transformation_function import TransformationFunction +if HAS_POLARS: + import polars as pl + + @typechecked class FeatureStore: DEFAULT_VERSION = 1 diff --git a/python/hsfs/feature_view.py b/python/hsfs/feature_view.py index 3ecbf2068..7527f4de7 100644 --- a/python/hsfs/feature_view.py +++ b/python/hsfs/feature_view.py @@ -35,8 +35,8 @@ import humps import numpy as np import pandas as pd -import polars as pl from hopsworks_common.client.exceptions import FeatureStoreException +from hopsworks_common.core.constants import HAS_POLARS from hsfs import ( feature_group, storage_connector, @@ -76,17 +76,23 @@ from hsml.model import Model -_logger = logging.getLogger(__name__) - TrainingDatasetDataFrameTypes = Union[ pd.DataFrame, TypeVar("pyspark.sql.DataFrame"), # noqa: F821 TypeVar("pyspark.RDD"), # noqa: F821 np.ndarray, List[List[Any]], - pl.DataFrame, ] +if HAS_POLARS: + import polars as pl + + TrainingDatasetDataFrameTypes = Union[ + TrainingDatasetDataFrameTypes, + pl.DataFrame, + ] + + SplineDataFrameTypes = Union[ pd.DataFrame, TypeVar("pyspark.sql.DataFrame"), # noqa: F821 @@ -97,6 +103,9 @@ ] +_logger = logging.getLogger(__name__) + + @typechecked class FeatureView: ENTITY_TYPE = "featureview" diff --git a/python/hsfs/storage_connector.py b/python/hsfs/storage_connector.py index 9f959b1da..97e963ad0 100644 --- a/python/hsfs/storage_connector.py +++ b/python/hsfs/storage_connector.py @@ -26,12 +26,15 @@ import humps import numpy as np import pandas as pd -import polars as pl from hopsworks_common import client +from hopsworks_common.core.constants import HAS_POLARS from hsfs import engine from hsfs.core import storage_connector_api +if HAS_POLARS: + import polars as pl + _logger = logging.getLogger(__name__) diff --git a/python/pyproject.toml b/python/pyproject.toml index 3e404ea9d..6cd64077e 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -52,7 +52,6 @@ dependencies = [ "fsspec", "retrying", "hopsworks_aiomysql[sa]==0.2.1", - "polars>=0.20.18,<=0.21.0", "opensearch-py>=1.1.0,<=2.4.2", "tqdm", "grpcio>=1.49.1,<2.0.0", # ^1.49.1 @@ -86,7 +85,8 @@ dev-pandas1 = [ "pandas<=1.5.3", "sqlalchemy<=1.4.48", ] -dev = ["hopsworks[dev-no-opt,great-expectations]"] +dev = ["hopsworks[dev-no-opt,great-expectations,polars]"] +polars=["polars>=0.20.18,<=0.21.0"] [build-system] requires = ["setuptools", "wheel"] diff --git a/python/tests/engine/test_python.py b/python/tests/engine/test_python.py index 7fb9b2372..49a66dc46 100644 --- a/python/tests/engine/test_python.py +++ 
b/python/tests/engine/test_python.py @@ -19,9 +19,9 @@ import hopsworks_common import numpy as np import pandas as pd -import polars as pl import pyarrow as pa import pytest +from hopsworks_common.core.constants import HAS_POLARS from hsfs import ( feature, feature_group, @@ -39,7 +39,11 @@ from hsfs.expectation_suite import ExpectationSuite from hsfs.hopsworks_udf import udf from hsfs.training_dataset_feature import TrainingDatasetFeature -from polars.testing import assert_frame_equal as polars_assert_frame_equal + + +if HAS_POLARS: + import polars as pl + from polars.testing import assert_frame_equal as polars_assert_frame_equal hopsworks_common.connection._hsfs_engine_type = "python" @@ -252,6 +256,10 @@ def test_read_hopsfs_connector_empty_dataframe(self, mocker): assert isinstance(dataframe, pd.DataFrame) assert len(dataframe) == 0 + @pytest.mark.skipif( + not HAS_POLARS, + reason="Polars is not installed.", + ) def test_read_hopsfs_connector_empty_dataframe_polars(self, mocker): # Arrange @@ -403,6 +411,10 @@ def test_read_pandas_other(self, mocker): assert mock_pandas_read_csv.call_count == 0 assert mock_pandas_read_parquet.call_count == 0 + @pytest.mark.skipif( + not HAS_POLARS, + reason="Polars is not installed.", + ) def test_read_polars_csv(self, mocker): # Arrange mock_pandas_read_csv = mocker.patch("polars.read_csv") @@ -417,6 +429,10 @@ def test_read_polars_csv(self, mocker): assert mock_pandas_read_csv.call_count == 1 assert mock_pandas_read_parquet.call_count == 0 + @pytest.mark.skipif( + not HAS_POLARS, + reason="Polars is not installed.", + ) def test_read_polars_tsv(self, mocker): # Arrange mock_pandas_read_csv = mocker.patch("polars.read_csv") @@ -431,6 +447,10 @@ def test_read_polars_tsv(self, mocker): assert mock_pandas_read_csv.call_count == 1 assert mock_pandas_read_parquet.call_count == 0 + @pytest.mark.skipif( + not HAS_POLARS, + reason="Polars is not installed.", + ) def test_read_polars_parquet(self, mocker): # Arrange mock_pandas_read_csv = mocker.patch("polars.read_csv") @@ -448,6 +468,10 @@ def test_read_polars_parquet(self, mocker): assert mock_pandas_read_csv.call_count == 0 assert mock_pandas_read_parquet.call_count == 1 + @pytest.mark.skipif( + not HAS_POLARS, + reason="Polars is not installed.", + ) def test_read_polars_other(self, mocker): # Arrange mock_pandas_read_csv = mocker.patch("polars.read_csv") @@ -885,6 +909,10 @@ def test_profile_pandas_with_null_column(self, mocker): ) assert mock_python_engine_convert_pandas_statistics.call_count == 3 + @pytest.mark.skipif( + not HAS_POLARS, + reason="Polars is not installed.", + ) def test_profile_polars(self, mocker): # Arrange mock_python_engine_convert_pandas_statistics = mocker.patch( @@ -923,6 +951,10 @@ def test_profile_polars(self, mocker): ) assert mock_python_engine_convert_pandas_statistics.call_count == 3 + @pytest.mark.skipif( + not HAS_POLARS, + reason="Polars is not installed.", + ) def test_profile_polars_with_null_column(self, mocker): # Arrange mock_python_engine_convert_pandas_statistics = mocker.patch( @@ -1261,6 +1293,10 @@ def test_convert_to_default_dataframe_pandas_with_spaces(self, mocker): "Feature names are sanitized to use underscore '_' in the feature store." 
) + @pytest.mark.skipif( + not HAS_POLARS, + reason="Polars is not installed.", + ) def test_convert_to_default_dataframe_polars(self, mocker): # Arrange mock_warnings = mocker.patch("warnings.warn") @@ -1327,6 +1363,10 @@ def test_parse_schema_feature_group_pandas(self, mocker): assert result[0].name == "col1" assert result[1].name == "col2" + @pytest.mark.skipif( + not HAS_POLARS, + reason="Polars is not installed.", + ) def test_parse_schema_feature_group_polars(self, mocker): # Arrange mocker.patch("hsfs.core.type_systems.convert_pandas_dtype_to_offline_type") @@ -1648,6 +1688,10 @@ def test_split_labels_dataframe_type_pandas(self): assert isinstance(result_df, pd.DataFrame) assert result_df_split is None + @pytest.mark.skipif( + not HAS_POLARS, + reason="Polars is not installed.", + ) def test_split_labels_dataframe_type_polars(self): # Arrange python_engine = python.Engine() @@ -1743,6 +1787,10 @@ def test_split_labels_labels_dataframe_type_pandas(self): assert isinstance(result_df, pd.DataFrame) assert isinstance(result_df_split, pd.Series) + @pytest.mark.skipif( + not HAS_POLARS, + reason="Polars is not installed.", + ) def test_split_labels_labels_dataframe_type_polars(self): # Arrange python_engine = python.Engine() @@ -2416,6 +2464,10 @@ def test_return_dataframe_type_pandas(self): # Assert assert str(result) == " col1 col2\n0 1 3\n1 2 4" + @pytest.mark.skipif( + not HAS_POLARS, + reason="Polars is not installed.", + ) def test_return_dataframe_type_polars(self): # Arrange python_engine = python.Engine() @@ -2778,6 +2830,10 @@ def plus_two(col1, col2): assert result["plus_two_col1_col2_1"][0] == 12 assert result["plus_two_col1_col2_1"][1] == 13 + @pytest.mark.skipif( + not HAS_POLARS, + reason="Polars is not installed.", + ) def test_apply_transformation_function_polars(self, mocker): # Arrange mocker.patch("hopsworks_common.client.get_instance") diff --git a/requirements-docs.txt b/requirements-docs.txt index 8bc8d6230..8dde6cc39 100644 --- a/requirements-docs.txt +++ b/requirements-docs.txt @@ -8,4 +8,5 @@ mkdocs-jupyter==0.24.3 markdown==3.6 pymdown-extensions==10.7.1 mkdocs-macros-plugin==1.0.4 +polars==0.20.31 mkdocs-minify-plugin>=0.2.0 From 6e2190674cbf9d261be43d7e4b8401850eab6e8c Mon Sep 17 00:00:00 2001 From: Robin Andersson Date: Wed, 18 Sep 2024 10:50:17 +0200 Subject: [PATCH 11/23] [HWORKS-1624] hopsworks internal client is writing ca_chain.pem to cwd polluting the project datasets (#335) --- python/hopsworks_common/client/hopsworks.py | 22 +--- .../client/istio/hopsworks.py | 123 +----------------- 2 files changed, 4 insertions(+), 141 deletions(-) diff --git a/python/hopsworks_common/client/hopsworks.py b/python/hopsworks_common/client/hopsworks.py index d73b53865..18c612902 100644 --- a/python/hopsworks_common/client/hopsworks.py +++ b/python/hopsworks_common/client/hopsworks.py @@ -21,12 +21,6 @@ from hopsworks_common.client import auth, base -try: - import jks -except ImportError: - pass - - class Client(base.Client): HOPSWORKS_HOSTNAME_VERIFICATION = "HOPSWORKS_HOSTNAME_VERIFICATION" DOMAIN_CA_TRUSTSTORE_PEM = "DOMAIN_CA_TRUSTSTORE_PEM" @@ -56,7 +50,7 @@ def __init__(self, hostname_verification): self._hostname_verification = os.environ.get( self.HOPSWORKS_HOSTNAME_VERIFICATION, "{}".format(hostname_verification) ).lower() in ("true", "1", "y", "yes") - self._hopsworks_ca_trust_store_path = self._get_trust_store_path() + self._hopsworks_ca_trust_store_path = self._get_ca_chain_path() self._project_id = os.environ[self.PROJECT_ID] self._project_name = 
self._project_name() @@ -81,20 +75,6 @@ def _get_hopsworks_rest_endpoint(self): """Get the hopsworks REST endpoint for making requests to the REST API.""" return os.environ[self.REST_ENDPOINT] - def _get_trust_store_path(self): - """Convert truststore from jks to pem and return the location""" - ca_chain_path = Path(self.PEM_CA_CHAIN) - if not ca_chain_path.exists(): - keystore_pw = self._cert_key - ks = jks.KeyStore.load( - self._get_jks_key_store_path(), keystore_pw, try_decrypt_keys=True - ) - ts = jks.KeyStore.load( - self._get_jks_trust_store_path(), keystore_pw, try_decrypt_keys=True - ) - self._write_ca_chain(ks, ts, ca_chain_path) - return str(ca_chain_path) - def _get_ca_chain_path(self) -> str: return os.path.join("/tmp", "ca_chain.pem") diff --git a/python/hopsworks_common/client/istio/hopsworks.py b/python/hopsworks_common/client/istio/hopsworks.py index 1f676dd26..76017b408 100644 --- a/python/hopsworks_common/client/istio/hopsworks.py +++ b/python/hopsworks_common/client/istio/hopsworks.py @@ -14,22 +14,13 @@ # limitations under the License. # -import base64 import os -import textwrap -from pathlib import Path import requests from hopsworks_common.client import auth, exceptions from hopsworks_common.client.istio import base as istio -try: - import jks -except ImportError: - pass - - class Client(istio.Client): REQUESTS_VERIFY = "REQUESTS_VERIFY" PROJECT_ID = "HOPSWORKS_PROJECT_ID" @@ -54,7 +45,7 @@ def __init__(self, host, port): self._port = port self._base_url = "http://" + self._host + ":" + str(self._port) - trust_store_path = self._get_trust_store_path() + trust_store_path = self._get_ca_chain_path() hostname_verification = ( os.environ[self.REQUESTS_VERIFY] if self.REQUESTS_VERIFY in os.environ @@ -88,116 +79,8 @@ def _project_user(self): hops_user = os.environ[self.HDFS_USER] return hops_user - def _get_trust_store_path(self): - """Convert truststore from jks to pem and return the location""" - ca_chain_path = Path(self.PEM_CA_CHAIN) - if not ca_chain_path.exists(): - self._write_ca_chain(ca_chain_path) - return str(ca_chain_path) - - def _write_ca_chain(self, ca_chain_path): - """ - Converts JKS trustore file into PEM to be compatible with Python libraries - """ - keystore_pw = self._cert_key - keystore_ca_cert = self._convert_jks_to_pem( - self._get_jks_key_store_path(), keystore_pw - ) - truststore_ca_cert = self._convert_jks_to_pem( - self._get_jks_trust_store_path(), keystore_pw - ) - - with ca_chain_path.open("w") as f: - f.write(keystore_ca_cert + truststore_ca_cert) - - def _convert_jks_to_pem(self, jks_path, keystore_pw): - """ - Converts a keystore JKS that contains client private key, - client certificate and CA certificate that was used to - sign the certificate to PEM format and returns the CA certificate. 
- Args: - :jks_path: path to the JKS file - :pw: password for decrypting the JKS file - Returns: - strings: (ca_cert) - """ - # load the keystore and decrypt it with password - ks = jks.KeyStore.load(jks_path, keystore_pw, try_decrypt_keys=True) - ca_certs = "" - - # Convert CA Certificates into PEM format and append to string - for _alias, c in ks.certs.items(): - ca_certs = ca_certs + self._bytes_to_pem_str(c.cert, "CERTIFICATE") - return ca_certs - - def _bytes_to_pem_str(self, der_bytes, pem_type): - """ - Utility function for creating PEM files - - Args: - der_bytes: DER encoded bytes - pem_type: type of PEM, e.g Certificate, Private key, or RSA private key - - Returns: - PEM String for a DER-encoded certificate or private key - """ - pem_str = "" - pem_str = pem_str + "-----BEGIN {}-----".format(pem_type) + "\n" - pem_str = ( - pem_str - + "\r\n".join( - textwrap.wrap(base64.b64encode(der_bytes).decode("ascii"), 64) - ) - + "\n" - ) - pem_str = pem_str + "-----END {}-----".format(pem_type) + "\n" - return pem_str - - def _get_jks_trust_store_path(self): - """ - Get truststore location - - Returns: - truststore location - """ - t_certificate = Path(self.T_CERTIFICATE) - if t_certificate.exists(): - return str(t_certificate) - else: - username = os.environ[self.HADOOP_USER_NAME] - material_directory = Path(os.environ[self.MATERIAL_DIRECTORY]) - return str(material_directory.joinpath(username + self.TRUSTSTORE_SUFFIX)) - - def _get_jks_key_store_path(self): - """ - Get keystore location - - Returns: - keystore location - """ - k_certificate = Path(self.K_CERTIFICATE) - if k_certificate.exists(): - return str(k_certificate) - else: - username = os.environ[self.HADOOP_USER_NAME] - material_directory = Path(os.environ[self.MATERIAL_DIRECTORY]) - return str(material_directory.joinpath(username + self.KEYSTORE_SUFFIX)) - - def _get_cert_pw(self): - """ - Get keystore password from local container - - Returns: - Certificate password - """ - pwd_path = Path(self.MATERIAL_PWD) - if not pwd_path.exists(): - username = os.environ[self.HADOOP_USER_NAME] - material_directory = Path(os.environ[self.MATERIAL_DIRECTORY]) - pwd_path = material_directory.joinpath(username + self.CERT_KEY_SUFFIX) - - with pwd_path.open() as f: - return f.read() + def _get_ca_chain_path(self) -> str: + return os.path.join("/tmp", self.PEM_CA_CHAIN) def _get_serving_api_key(self): """Retrieve serving API key from environment variable.""" From 10726a987f4684b39d042a1ee495eddf9045cf98 Mon Sep 17 00:00:00 2001 From: Fabio Buso Date: Wed, 18 Sep 2024 23:49:44 +0200 Subject: [PATCH 12/23] [FSTORE-1534] Add support for working with multiple S3 connectors within the same application --- python/hsfs/engine/spark.py | 33 +++++++++++----- python/tests/engine/test_spark.py | 64 ++++++++++++++++++++++++++++++- 2 files changed, 85 insertions(+), 12 deletions(-) diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index a1fe19a62..53ea4dc0f 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -1188,41 +1188,54 @@ def setup_storage_connector(self, storage_connector, path=None): return path def _setup_s3_hadoop_conf(self, storage_connector, path): - FS_S3_ENDPOINT = "fs.s3a.endpoint" + # For legacy behaviour set the S3 values at global level + self._set_s3_hadoop_conf(storage_connector, "fs.s3a") + + # Set credentials at bucket level as well to allow users to use multiple + # storage connector in the same application. 
+ self._set_s3_hadoop_conf( + storage_connector, f"fs.s3a.bucket.{storage_connector.bucket}" + ) + return path.replace("s3", "s3a", 1) if path is not None else None + + def _set_s3_hadoop_conf(self, storage_connector, prefix): if storage_connector.access_key: self._spark_context._jsc.hadoopConfiguration().set( - "fs.s3a.access.key", storage_connector.access_key + f"{prefix}.access.key", storage_connector.access_key ) if storage_connector.secret_key: self._spark_context._jsc.hadoopConfiguration().set( - "fs.s3a.secret.key", storage_connector.secret_key + f"{prefix}.secret.key", storage_connector.secret_key ) if storage_connector.server_encryption_algorithm: self._spark_context._jsc.hadoopConfiguration().set( - "fs.s3a.server-side-encryption-algorithm", + f"{prefix}.server-side-encryption-algorithm", storage_connector.server_encryption_algorithm, ) if storage_connector.server_encryption_key: self._spark_context._jsc.hadoopConfiguration().set( - "fs.s3a.server-side-encryption-key", + f"{prefix}.server-side-encryption-key", storage_connector.server_encryption_key, ) if storage_connector.session_token: + print(f"session token set for {prefix}") self._spark_context._jsc.hadoopConfiguration().set( - "fs.s3a.aws.credentials.provider", + f"{prefix}.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider", ) self._spark_context._jsc.hadoopConfiguration().set( - "fs.s3a.session.token", + f"{prefix}.session.token", storage_connector.session_token, ) + + # This is the name of the property as expected from the user, without the bucket name. + FS_S3_ENDPOINT = "fs.s3a.endpoint" if FS_S3_ENDPOINT in storage_connector.arguments: self._spark_context._jsc.hadoopConfiguration().set( - FS_S3_ENDPOINT, storage_connector.spark_options().get(FS_S3_ENDPOINT) + f"{prefix}.endpoint", + storage_connector.spark_options().get(FS_S3_ENDPOINT), ) - return path.replace("s3", "s3a", 1) if path is not None else None - def _setup_adls_hadoop_conf(self, storage_connector, path): for k, v in storage_connector.spark_options().items(): self._spark_context._jsc.hadoopConfiguration().set(k, v) diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index f334de165..3c7fa999c 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -4372,7 +4372,7 @@ def test_setup_storage_connector_jdbc(self, mocker): assert mock_spark_engine_setup_adls_hadoop_conf.call_count == 0 assert mock_spark_engine_setup_gcp_hadoop_conf.call_count == 0 - def test_setup_s3_hadoop_conf(self, mocker): + def test_setup_s3_hadoop_conf_legacy(self, mocker): # Arrange mock_pyspark_getOrCreate = mocker.patch( "pyspark.sql.session.SparkSession.builder.getOrCreate" @@ -4384,6 +4384,7 @@ def test_setup_s3_hadoop_conf(self, mocker): id=1, name="test_connector", featurestore_id=99, + bucket="bucket-name", access_key="1", secret_key="2", server_encryption_algorithm="3", @@ -4402,7 +4403,7 @@ def test_setup_s3_hadoop_conf(self, mocker): assert result == "s3a_test_path" assert ( mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.call_count - == 7 + == 14 ) mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call( "fs.s3a.access.key", s3_connector.access_key @@ -4428,6 +4429,65 @@ def test_setup_s3_hadoop_conf(self, mocker): "fs.s3a.endpoint", s3_connector.arguments.get("fs.s3a.endpoint") ) + def test_setup_s3_hadoop_conf_bucket_scope(self, mocker): + # Arrange + mock_pyspark_getOrCreate = 
mocker.patch( + "pyspark.sql.session.SparkSession.builder.getOrCreate" + ) + + spark_engine = spark.Engine() + + s3_connector = storage_connector.S3Connector( + id=1, + name="test_connector", + featurestore_id=99, + bucket="bucket-name", + access_key="1", + secret_key="2", + server_encryption_algorithm="3", + server_encryption_key="4", + session_token="5", + arguments=[{"name": "fs.s3a.endpoint", "value": "testEndpoint"}], + ) + + # Act + result = spark_engine._setup_s3_hadoop_conf( + storage_connector=s3_connector, + path="s3_test_path", + ) + + # Assert + assert result == "s3a_test_path" + assert ( + mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.call_count + == 14 + ) + mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call( + "fs.s3a.bucket.bucket-name.access.key", s3_connector.access_key + ) + mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call( + "fs.s3a.bucket.bucket-name.secret.key", s3_connector.secret_key + ) + mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call( + "fs.s3a.bucket.bucket-name.server-side-encryption-algorithm", + s3_connector.server_encryption_algorithm, + ) + mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call( + "fs.s3a.bucket.bucket-name.server-side-encryption-key", + s3_connector.server_encryption_key, + ) + mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call( + "fs.s3a.bucket.bucket-name.aws.credentials.provider", + "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider", + ) + mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call( + "fs.s3a.bucket.bucket-name.session.token", s3_connector.session_token + ) + mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call( + "fs.s3a.bucket.bucket-name.endpoint", + s3_connector.arguments.get("fs.s3a.endpoint"), + ) + def test_setup_adls_hadoop_conf(self, mocker): # Arrange mock_pyspark_getOrCreate = mocker.patch( From c2184311ba636a329c27534bf9ff441507ade6ac Mon Sep 17 00:00:00 2001 From: Robin Andersson Date: Fri, 20 Sep 2024 13:48:14 +0200 Subject: [PATCH 13/23] [HWORKS-1624][APPEND] ca_chain.pem is needed when calling _get_credentials for internal clients (#337) --- python/hopsworks_common/client/hopsworks.py | 23 +++++++++++++++++++-- python/hopsworks_common/connection.py | 2 +- python/tests/test_connection.py | 2 +- 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/python/hopsworks_common/client/hopsworks.py b/python/hopsworks_common/client/hopsworks.py index 18c612902..d3058d637 100644 --- a/python/hopsworks_common/client/hopsworks.py +++ b/python/hopsworks_common/client/hopsworks.py @@ -21,6 +21,12 @@ from hopsworks_common.client import auth, base +try: + import jks +except ImportError: + pass + + class Client(base.Client): HOPSWORKS_HOSTNAME_VERIFICATION = "HOPSWORKS_HOSTNAME_VERIFICATION" DOMAIN_CA_TRUSTSTORE_PEM = "DOMAIN_CA_TRUSTSTORE_PEM" @@ -50,7 +56,7 @@ def __init__(self, hostname_verification): self._hostname_verification = os.environ.get( self.HOPSWORKS_HOSTNAME_VERIFICATION, "{}".format(hostname_verification) ).lower() in ("true", "1", "y", "yes") - self._hopsworks_ca_trust_store_path = self._get_ca_chain_path() + self._hopsworks_ca_trust_store_path = 
self._materialize_ca_chain() self._project_id = os.environ[self.PROJECT_ID] self._project_name = self._project_name() @@ -67,10 +73,23 @@ def __init__(self, hostname_verification): credentials = self._get_credentials(self._project_id) - self._write_pem_file(credentials["caChain"], self._get_ca_chain_path()) self._write_pem_file(credentials["clientCert"], self._get_client_cert_path()) self._write_pem_file(credentials["clientKey"], self._get_client_key_path()) + def _materialize_ca_chain(self): + """Convert truststore from jks to pem and return the location""" + ca_chain_path = Path(self._get_ca_chain_path()) + if not ca_chain_path.exists(): + keystore_pw = self._cert_key + ks = jks.KeyStore.load( + self._get_jks_key_store_path(), keystore_pw, try_decrypt_keys=True + ) + ts = jks.KeyStore.load( + self._get_jks_trust_store_path(), keystore_pw, try_decrypt_keys=True + ) + self._write_ca_chain(ks, ts, ca_chain_path) + return str(ca_chain_path) + def _get_hopsworks_rest_endpoint(self): """Get the hopsworks REST endpoint for making requests to the REST API.""" return os.environ[self.REST_ENDPOINT] diff --git a/python/hopsworks_common/connection.py b/python/hopsworks_common/connection.py index 23b0b29ae..da7ca9e52 100644 --- a/python/hopsworks_common/connection.py +++ b/python/hopsworks_common/connection.py @@ -38,7 +38,7 @@ HOPSWORKS_PORT_DEFAULT = 443 HOSTNAME_VERIFICATION_DEFAULT = os.environ.get( - "HOPSWORKS_HOSTNAME_VERIFICATION", "True" + "HOPSWORKS_HOSTNAME_VERIFICATION", "False" ).lower() in ("true", "1", "y", "yes") # alias for backwards compatibility: HOPSWORKS_HOSTNAME_VERIFICATION_DEFAULT = HOSTNAME_VERIFICATION_DEFAULT diff --git a/python/tests/test_connection.py b/python/tests/test_connection.py index cd38cfdfb..cba73b42d 100644 --- a/python/tests/test_connection.py +++ b/python/tests/test_connection.py @@ -31,7 +31,7 @@ def test_constants(self): # adding / removing / updating tests, if necessary. assert HOSTS.APP_HOST == "c.app.hopsworks.ai" assert HOPSWORKS_PORT_DEFAULT == 443 - assert HOSTNAME_VERIFICATION_DEFAULT + assert HOSTNAME_VERIFICATION_DEFAULT is False # constructor From 74828fb991e9fa926c29506f62411e6c286a7b6b Mon Sep 17 00:00:00 2001 From: Aleksey Veresov Date: Mon, 23 Sep 2024 12:49:39 +0200 Subject: [PATCH 14/23] Prepare docs for release (#339) --- CONTRIBUTING.md | 9 +++++---- docs/templates/api/connection.md | 8 +++----- docs/templates/api/connection_api.md | 11 ----------- docs/templates/api/job.md | 11 ----------- docs/templates/api/jobs.md | 2 +- docs/templates/connection_api.md | 11 ----------- mkdocs.yml | 12 +++++------- python/auto_doc.py | 29 +++++----------------------- 8 files changed, 19 insertions(+), 74 deletions(-) delete mode 100644 docs/templates/api/connection_api.md delete mode 100644 docs/templates/api/job.md delete mode 100644 docs/templates/connection_api.md diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index e2801b11b..613ec0169 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -36,7 +36,7 @@ We follow a few best practices for writing the Python documentation: -1. Use the google docstring style: +1. Use the Google docstring style: ```python """[One Line Summary] @@ -72,16 +72,17 @@ We use `mkdocs` together with `mike` ([for versioning](https://github.com/jimpor **Background about `mike`:** `mike` builds the documentation and commits it as a new directory to the gh-pages branch. Each directory corresponds to one version of the documentation. 
Additionally, `mike` maintains a json in the root of gh-pages with the mappings of versions/aliases for each of the directories available. With aliases you can define extra names like `dev` or `latest`, to indicate stable and unstable releases. -1. Install Hopsworks with `dev-docs` extras: +1. Install Hopsworks with `requirements-docs.txt`: ```bash - pip install -e ".[dev-docs]" + pip install -r requirements-docs.txt + pip install -e "python[dev]" ``` 2. To build the docs, first run the auto doc script: ```bash - python auto_doc.py + python python/auto_doc.py ``` ##### Option 1: Build only current version of docs diff --git a/docs/templates/api/connection.md b/docs/templates/api/connection.md index a978735e8..19e13f3eb 100644 --- a/docs/templates/api/connection.md +++ b/docs/templates/api/connection.md @@ -1,8 +1,6 @@ -# Connection API +# Connection -## Creation - -{{connection_create}} +{{connection}} ## Properties @@ -10,4 +8,4 @@ ## Methods -{{connection_methods}} \ No newline at end of file +{{connection_methods}} diff --git a/docs/templates/api/connection_api.md b/docs/templates/api/connection_api.md deleted file mode 100644 index 19e13f3eb..000000000 --- a/docs/templates/api/connection_api.md +++ /dev/null @@ -1,11 +0,0 @@ -# Connection - -{{connection}} - -## Properties - -{{connection_properties}} - -## Methods - -{{connection_methods}} diff --git a/docs/templates/api/job.md b/docs/templates/api/job.md deleted file mode 100644 index 9ad68d976..000000000 --- a/docs/templates/api/job.md +++ /dev/null @@ -1,11 +0,0 @@ -# Job - -{{job}} - -## Methods - -{{job_methods}} - -## Job Configuration - -{{job_configuration}} diff --git a/docs/templates/api/jobs.md b/docs/templates/api/jobs.md index 3f846a4f0..9a9a4bb07 100644 --- a/docs/templates/api/jobs.md +++ b/docs/templates/api/jobs.md @@ -1,6 +1,6 @@ # Jobs API -## Handle +## Handle {{job_api_handle}} diff --git a/docs/templates/connection_api.md b/docs/templates/connection_api.md deleted file mode 100644 index 19e13f3eb..000000000 --- a/docs/templates/connection_api.md +++ /dev/null @@ -1,11 +0,0 @@ -# Connection - -{{connection}} - -## Properties - -{{connection_properties}} - -## Methods - -{{connection_methods}} diff --git a/mkdocs.yml b/mkdocs.yml index 4e892dbbb..823e3c8f2 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -16,8 +16,8 @@ nav: - Guides: https://docs.hopsworks.ai/ - Setup and Installation: https://docs.hopsworks.ai/ - Administration: https://docs.hopsworks.ai/ - - API: - - API Reference: + - API: + - Platform API: - Login: generated/api/login.md - Connection: generated/api/connection.md - Projects: generated/api/projects.md @@ -33,7 +33,7 @@ nav: - KafkaSchema: generated/api/kafka_schema.md - Secrets: generated/api/secrets.md - OpenSearch: generated/api/opensearch.md - - Connection (HSFS): generated/api/connection_api.md + - Feature Store API: - ExpectationSuite: generated/api/expectation_suite_api.md - FeatureStore: generated/api/feature_store_api.md - FeatureGroup: generated/api/feature_group_api.md @@ -48,11 +48,10 @@ nav: - UDF: generated/api/udf.md - HopsworksUDF: generated/api/hopsworks_udf.md - TransformationFunction: generated/api/transformation_functions_api.md - - Transformation Statistics: + - Transformation Statistics: - TransformationStatistics: generated/api/transformation_statistics.md - FeatureTransformationStatistics: generated/api/feature_transformation_statistics.md - ValidationReport: generated/api/validation_report_api.md - - Job: generated/api/job.md - Provenance Links: generated/api/links.md - 
Statistics: - Statistics: generated/api/statistics_api.md @@ -66,7 +65,7 @@ nav: - EmbeddingIndex: generated/api/embedding_index_api.md - EmbeddingFeature: generated/api/embedding_feature_api.md - SimilarityFunctionType: generated/api/similarity_function_type_api.md - - Connection (HSML): generated/connection_api.md + - Machine Learning API: - Model Registry: - Model Registry: generated/model-registry/model_registry_api.md - Model: generated/model-registry/model_api.md @@ -82,7 +81,6 @@ nav: - Inference Batcher: generated/model-serving/inference_batcher_api.md - Resources: generated/model-serving/resources_api.md # Added to allow navigation using the side drawer - - Hopsworks API: https://docs.hopsworks.ai/hopsworks-api/latest/ - Feature Store JavaDoc: https://docs.hopsworks.ai/feature-store-javadoc/latest/ - Contributing: CONTRIBUTING.md - Community ↗: https://community.hopsworks.ai/ diff --git a/python/auto_doc.py b/python/auto_doc.py index 7d5baa7a4..49129f52a 100644 --- a/python/auto_doc.py +++ b/python/auto_doc.py @@ -40,7 +40,7 @@ "udf": ["hopsworks.udf"], }, "api/connection.md": { - "connection_create": ["hopsworks.connection.Connection.connection"], + "connection": ["hopsworks.connection.Connection.connection"], "connection_properties": keras_autodoc.get_properties( "hopsworks.connection.Connection" ), @@ -61,7 +61,10 @@ "job_get": ["hopsworks.core.job_api.JobApi.get_job"], "job_get_all": ["hopsworks.core.job_api.JobApi.get_jobs"], "job_properties": keras_autodoc.get_properties("hopsworks.job.Job"), - "job_config": ["hopsworks.core.job_api.JobApi.get_configuration"], + "job_config": [ + "hopsworks.core.job_api.JobApi.get_configuration", + "hopsworks_common.core.job_configuration.JobConfiguration", + ], "job_methods": keras_autodoc.get_methods( "hopsworks.job.Job", exclude=["from_response_json", "json"] ), @@ -190,13 +193,6 @@ "hopsworks.core.opensearch_api.OpenSearchApi" ), }, - "api/connection_api.md": { - "connection": ["hsfs.connection.Connection"], - "connection_properties": keras_autodoc.get_properties( - "hsfs.connection.Connection" - ), - "connection_methods": keras_autodoc.get_methods("hsfs.connection.Connection"), - }, "api/spine_group_api.md": { "fg": ["hsfs.feature_group.SpineGroup"], "fg_create": ["hsfs.feature_store.FeatureStore.get_or_create_spine_group"], @@ -413,14 +409,6 @@ "hsfs.validation_report.ValidationReport" ), }, - "api/job.md": { - "job_configuration": ["hsfs.core.job_configuration.JobConfiguration"], - "job": ["hsfs.core.job.Job"], - "job_methods": [ - "hsfs.core.job.Job.get_state", - "hsfs.core.job.Job.get_final_state", - ], - }, "api/query_api.md": { "query_methods": keras_autodoc.get_methods( "hsfs.constructor.query.Query", @@ -550,13 +538,6 @@ "similarity_function_type": ["hsfs.embedding.SimilarityFunctionType"], }, # Model registry - "connection_api.md": { - "connection": ["hsml.connection.Connection"], - "connection_properties": keras_autodoc.get_properties( - "hsml.connection.Connection", exclude=["trust_store_path"] - ), - "connection_methods": keras_autodoc.get_methods("hsml.connection.Connection"), - }, "model-registry/model_registry_api.md": { "mr_get": ["hsml.connection.Connection.get_model_registry"], "mr_modules": keras_autodoc.get_properties( From 31ad5aada09fef432941a341453735ac9553732b Mon Sep 17 00:00:00 2001 From: Alex Ormenisan Date: Tue, 24 Sep 2024 09:06:02 +0200 Subject: [PATCH 15/23] [HWORKS-1421] Add option to specify kubernetes namespace at project creation (#336) --- python/hopsworks_common/project.py | 7 +++++++ 
python/hsml/core/serving_api.py | 18 +++++++----------- python/hsml/deployment.py | 11 +++++++++++ python/hsml/predictor.py | 13 +++++++++++++ python/tests/fixtures/predictor_fixtures.json | 9 ++++++--- 5 files changed, 44 insertions(+), 14 deletions(-) diff --git a/python/hopsworks_common/project.py b/python/hopsworks_common/project.py index 37eaf7704..df82b3f79 100644 --- a/python/hopsworks_common/project.py +++ b/python/hopsworks_common/project.py @@ -51,6 +51,7 @@ def __init__( services=None, datasets=None, creation_status=None, + project_namespace=None, **kwargs, ): self._id = project_id @@ -67,6 +68,7 @@ def __init__( self._git_api = git_api.GitApi() self._dataset_api = dataset_api.DatasetApi() self._environment_api = environment_api.EnvironmentApi() + self._project_namespace = project_namespace @classmethod def from_response_json(cls, json_dict): @@ -101,6 +103,11 @@ def created(self): """Timestamp when the project was created""" return self._created + @property + def project_namespace(self): + """Kubernetes namespace used by project""" + return self._project_namespace + def get_feature_store( self, name: Optional[str] = None, engine: Optional[str] = None ): # -> hsfs.feature_store.FeatureStore diff --git a/python/hsml/core/serving_api.py b/python/hsml/core/serving_api.py index 149605f30..c17eba65c 100644 --- a/python/hsml/core/serving_api.py +++ b/python/hsml/core/serving_api.py @@ -260,7 +260,7 @@ def _send_inference_request_via_rest_protocol( path_params = self._get_istio_inference_path(deployment_instance) # - add host header headers["host"] = self._get_inference_request_host_header( - _client._project_name, + deployment_instance.project_namespace, deployment_instance.name, client.get_knative_domain(), ) @@ -291,9 +291,7 @@ def _send_inference_request_via_grpc_protocol( # the channel, which will be reused in all following calls on the same deployment object. 
# The gRPC channel is freed when calling deployment.stop() print("Initializing gRPC channel...") - deployment_instance._grpc_channel = self._create_grpc_channel( - deployment_instance.name - ) + deployment_instance._grpc_channel = self._create_grpc_channel(deployment_instance) # build an infer request request = InferRequest( infer_inputs=data, @@ -308,11 +306,11 @@ def _send_inference_request_via_grpc_protocol( # extract infer outputs return infer_response.outputs - def _create_grpc_channel(self, deployment_name: str): + def _create_grpc_channel(self, deployment_instance): _client = client.istio.get_instance() service_hostname = self._get_inference_request_host_header( - _client._project_name, - deployment_name, + deployment_instance.project_namespace, + deployment_instance.name, client.get_knative_domain(), ) return _client._create_grpc_channel(service_hostname) @@ -405,11 +403,9 @@ def get_logs(self, deployment_instance, component, tail): ) def _get_inference_request_host_header( - self, project_name: str, deployment_name: str, domain: str + self, project_namespace: str, deployment_name: str, domain: str ): - return "{}.{}.{}".format( - deployment_name, project_name.replace("_", "-"), domain - ).lower() + return "{}.{}.{}".format(deployment_name, project_namespace, domain).lower() def _get_hopsworks_inference_path(self, project_id: int, deployment_instance): return [ diff --git a/python/hsml/deployment.py b/python/hsml/deployment.py index 9a88e8873..8891b149f 100644 --- a/python/hsml/deployment.py +++ b/python/hsml/deployment.py @@ -38,10 +38,12 @@ def __init__( predictor, name: Optional[str] = None, description: Optional[str] = None, + project_namespace: str = None, **kwargs, ): self._predictor = predictor self._description = description + self._project_namespace = project_namespace if self._predictor is None: raise ModelServingException("A predictor is required") @@ -476,6 +478,15 @@ def environment(self): def environment(self, environment: str): self._predictor.environment = environment + @property + def project_namespace(self): + """Name of inference environment""" + return self._predictor.project_namespace + + @project_namespace.setter + def project_namespace(self, project_namespace: str): + self._predictor.project_namespace = project_namespace + def __repr__(self): desc = ( f", description: {self._description!r}" diff --git a/python/hsml/predictor.py b/python/hsml/predictor.py index 8ea54d23e..4c56dc2de 100644 --- a/python/hsml/predictor.py +++ b/python/hsml/predictor.py @@ -62,6 +62,7 @@ def __init__( creator: Optional[str] = None, api_protocol: Optional[str] = INFERENCE_ENDPOINTS.API_PROTOCOL_REST, environment: Optional[str] = None, + project_namespace: str = None, **kwargs, ): serving_tool = ( @@ -98,6 +99,7 @@ def __init__( self._validate_script_file(self._model_framework, self._script_file) self._api_protocol = api_protocol self._environment = environment + self._project_namespace = project_namespace def deploy(self): """Create a deployment for this predictor and persists it in the Model Serving. 
@@ -278,6 +280,7 @@ def extract_fields_from_json(cls, json_decamelized): if "environment_dto" in json_decamelized: environment = json_decamelized.pop("environment_dto") kwargs["environment"] = environment["name"] + kwargs["project_namespace"] = json_decamelized.pop("project_namespace") return kwargs def update_from_response_json(self, json_dict): @@ -305,6 +308,7 @@ def to_dict(self): "servingTool": self._serving_tool, "predictor": self._script_file, "apiProtocol": self._api_protocol, + "project_namespace": self._project_namespace, } if self.environment is not None: json = {**json, **{"environmentDTO": {"name": self._environment}}} @@ -478,6 +482,15 @@ def environment(self): def environment(self, environment): self._environment = environment + @property + def project_namespace(self): + """Kubernetes project namespace""" + return self._project_namespace + + @project_namespace.setter + def project_namespace(self, project_namespace): + self._project_namespace = project_namespace + def __repr__(self): desc = ( f", description: {self._description!r}" diff --git a/python/tests/fixtures/predictor_fixtures.json b/python/tests/fixtures/predictor_fixtures.json index 76adeebe3..6859f6cac 100644 --- a/python/tests/fixtures/predictor_fixtures.json +++ b/python/tests/fixtures/predictor_fixtures.json @@ -43,7 +43,8 @@ }, "environment_dto": { "name": "misc-inference-pipeline" - } + }, + "project_namespace": "test" } ] } @@ -98,7 +99,8 @@ }, "environment_dto": { "name": "misc-inference-pipeline" - } + }, + "project_namespace": "test" }, { "id": 2, @@ -140,7 +142,8 @@ }, "environment_dto": { "name": "misc-inference-pipeline" - } + }, + "project_namespace": "test" } ] } From 447d947be0268b5d425eb1383f8284bea575c633 Mon Sep 17 00:00:00 2001 From: Aleksey Veresov Date: Tue, 24 Sep 2024 16:35:26 +0200 Subject: [PATCH 16/23] Fix arrow_flight_client imports (#329) * Fix arrow_flight_client imports * Provide human-readable errors in case Pyarrow is not installed * Ruff --- python/hopsworks_common/core/constants.py | 24 +++++++---- python/hopsworks_common/core/type_systems.py | 4 +- python/hsfs/constructor/query.py | 4 +- python/hsfs/core/arrow_flight_client.py | 6 +++ python/hsfs/core/constants.py | 4 +- python/hsfs/core/feature_view_engine.py | 17 ++++++-- python/hsfs/engine/__init__.py | 3 +- python/hsfs/engine/python.py | 43 +++++++++++++++----- python/hsfs/feature_store.py | 5 ++- python/tests/core/test_type_systems.py | 6 +-- 10 files changed, 85 insertions(+), 31 deletions(-) diff --git a/python/hopsworks_common/core/constants.py b/python/hopsworks_common/core/constants.py index 2cba250a6..4ea804d4c 100644 --- a/python/hopsworks_common/core/constants.py +++ b/python/hopsworks_common/core/constants.py @@ -25,25 +25,35 @@ HAS_CONFLUENT_KAFKA: bool = importlib.util.find_spec("confluent_kafka") is not None confluent_kafka_not_installed_message = ( "Confluent Kafka package not found. " - "If you want to use Kafka with Hopsworks you can install the corresponding extras " - """`pip install hopsworks[python]` or `pip install "hopsworks[python]"` if using zsh. """ - "You can also install confluent-kafka directly in your environment e.g `pip install confluent-kafka`. " + "If you want to use Kafka with Hopsworks you can install the corresponding extras via " + '`pip install "hopsworks[python]"`. ' + "You can also install confluent-kafka directly in your environment with `pip install confluent-kafka`. " "You will need to restart your kernel if applicable." 
) + # Data Validation / Great Expectations HAS_GREAT_EXPECTATIONS: bool = ( importlib.util.find_spec("great_expectations") is not None ) great_expectations_not_installed_message = ( "Great Expectations package not found. " - "If you want to use data validation with Hopsworks you can install the corresponding extras " - """`pip install hopsworks[great_expectations]` or `pip install "hopsworks[great_expectations]"` if using zsh. """ - "You can also install great-expectations directly in your environment e.g `pip install great-expectations`. " + "If you want to use data validation with Hopsworks you can install the corresponding extras via " + '`pip install "hopsworks[great_expectations]"`. ' + "You can also install great-expectations directly in your environment with `pip install great-expectations`. " "You will need to restart your kernel if applicable." ) initialise_expectation_suite_for_single_expectation_api_message = "Initialize Expectation Suite by attaching to a Feature Group to enable single expectation API" -HAS_ARROW: bool = importlib.util.find_spec("pyarrow") is not None +# Pyarrow +HAS_PYARROW: bool = importlib.util.find_spec("pyarrow") is not None +pyarrow_not_installed_message = ( + "Pyarrow package not found. " + "If you want to use Apache Arrow with Hopsworks you can install the corresponding extras via " + '`pip install "hopsworks[python]"`. ' + "You can also install pyarrow directly in your environment with `pip install pyarrow`. " + "You will need to restart your kernel if applicable." +) + HAS_PANDAS: bool = importlib.util.find_spec("pandas") is not None HAS_NUMPY: bool = importlib.util.find_spec("numpy") is not None HAS_POLARS: bool = importlib.util.find_spec("polars") is not None diff --git a/python/hopsworks_common/core/type_systems.py b/python/hopsworks_common/core/type_systems.py index aa38df296..42c11b378 100644 --- a/python/hopsworks_common/core/type_systems.py +++ b/python/hopsworks_common/core/type_systems.py @@ -23,9 +23,9 @@ import pytz from hopsworks_common.core.constants import ( - HAS_ARROW, HAS_PANDAS, HAS_POLARS, + HAS_PYARROW, ) from hopsworks_common.decorators import uses_polars @@ -35,7 +35,7 @@ import pandas as pd import polars as pl -if HAS_ARROW: +if HAS_PYARROW: import pyarrow as pa # Decimal types are currently not supported diff --git a/python/hsfs/constructor/query.py b/python/hsfs/constructor/query.py index 9a6c74272..b332ab1db 100644 --- a/python/hsfs/constructor/query.py +++ b/python/hsfs/constructor/query.py @@ -29,7 +29,7 @@ from hsfs.constructor import join from hsfs.constructor.filter import Filter, Logic from hsfs.constructor.fs_query import FsQuery -from hsfs.core import arrow_flight_client, query_constructor_api, storage_connector_api +from hsfs.core import query_constructor_api, storage_connector_api from hsfs.decorators import typechecked from hsfs.feature import Feature @@ -101,6 +101,8 @@ def _prep_read( online_conn = None if engine.get_instance().is_flyingduck_query_supported(self, read_options): + from hsfs.core import arrow_flight_client + sql_query = self._to_string(fs_query, online, asof=True) sql_query = arrow_flight_client.get_instance().create_query_object( self, sql_query, fs_query.on_demand_fg_aliases diff --git a/python/hsfs/core/arrow_flight_client.py b/python/hsfs/core/arrow_flight_client.py index e03283a38..53bb96a86 100644 --- a/python/hsfs/core/arrow_flight_client.py +++ b/python/hsfs/core/arrow_flight_client.py @@ -23,6 +23,12 @@ from functools import wraps from typing import Any, Dict, Optional, Union +from 
hopsworks_common.core.constants import HAS_PYARROW, pyarrow_not_installed_message + + +if not HAS_PYARROW: + raise ModuleNotFoundError(pyarrow_not_installed_message) + import pyarrow import pyarrow._flight import pyarrow.flight diff --git a/python/hsfs/core/constants.py b/python/hsfs/core/constants.py index a59a7240a..7edbd752b 100644 --- a/python/hsfs/core/constants.py +++ b/python/hsfs/core/constants.py @@ -16,7 +16,6 @@ from hopsworks_common.core.constants import ( HAS_AIOMYSQL, - HAS_ARROW, HAS_AVRO, HAS_CONFLUENT_KAFKA, HAS_FAST_AVRO, @@ -24,6 +23,7 @@ HAS_NUMPY, HAS_PANDAS, HAS_POLARS, + HAS_PYARROW, HAS_SQLALCHEMY, great_expectations_not_installed_message, initialise_expectation_suite_for_single_expectation_api_message, @@ -32,7 +32,7 @@ __all__ = [ "HAS_AIOMYSQL", - "HAS_ARROW", + "HAS_PYARROW", "HAS_AVRO", "HAS_CONFLUENT_KAFKA", "HAS_FAST_AVRO", diff --git a/python/hsfs/core/feature_view_engine.py b/python/hsfs/core/feature_view_engine.py index afdc9c75f..84b6c7bda 100644 --- a/python/hsfs/core/feature_view_engine.py +++ b/python/hsfs/core/feature_view_engine.py @@ -33,7 +33,6 @@ from hsfs.client import exceptions from hsfs.constructor.filter import Filter, Logic from hsfs.core import ( - arrow_flight_client, feature_view_api, query_constructor_api, statistics_engine, @@ -1009,7 +1008,17 @@ def get_models_provenance( def _check_feature_group_accessibility(self, feature_view_obj): if engine.get_type() == "python": - if arrow_flight_client.get_instance().is_enabled(): + try: + from hsfs.core import arrow_flight_client + + arrow_flight_client_imported = True + except ImportError: + arrow_flight_client_imported = False + + if ( + arrow_flight_client_imported + and arrow_flight_client.get_instance().is_enabled() + ): if not arrow_flight_client.supports( feature_view_obj.query.featuregroups ): @@ -1227,7 +1236,9 @@ def _get_feature_logging_data( model_col_name=FeatureViewEngine._HSML_MODEL, predictions=predictions, training_dataset_version=training_dataset_version, - hsml_model=self.get_hsml_model_value(hsml_model) if hsml_model else None, + hsml_model=self.get_hsml_model_value(hsml_model) + if hsml_model + else None, ) else: return engine.get_instance().get_feature_logging_df( diff --git a/python/hsfs/engine/__init__.py b/python/hsfs/engine/__init__.py index 9e3bab1e8..a4ee95daa 100644 --- a/python/hsfs/engine/__init__.py +++ b/python/hsfs/engine/__init__.py @@ -19,7 +19,6 @@ import hopsworks_common.connection from hsfs.client import exceptions -from hsfs.core import arrow_flight_client from hsfs.engine import spark, spark_no_metastore @@ -83,5 +82,7 @@ def get_type() -> str: def stop() -> None: global _engine + from hsfs.core import arrow_flight_client + _engine = None arrow_flight_client.close() diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 81fae1c6f..96c4f9ecc 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -67,7 +67,6 @@ from hsfs import storage_connector as sc from hsfs.constructor import query from hsfs.core import ( - arrow_flight_client, dataset_api, feature_group_api, feature_view_api, @@ -83,11 +82,12 @@ ) from hsfs.core.constants import ( HAS_AIOMYSQL, - HAS_ARROW, HAS_GREAT_EXPECTATIONS, HAS_PANDAS, + HAS_PYARROW, HAS_SQLALCHEMY, ) +from hsfs.core.type_systems import PYARROW_HOPSWORKS_DTYPE_MAPPING from hsfs.core.vector_db_client import VectorDbClient from hsfs.feature_group import ExternalFeatureGroup, FeatureGroup from hsfs.training_dataset import TrainingDataset @@ -98,8 +98,6 @@ if HAS_GREAT_EXPECTATIONS: 
import great_expectations -if HAS_ARROW: - from hsfs.core.type_systems import PYARROW_HOPSWORKS_DTYPE_MAPPING if HAS_AIOMYSQL and HAS_SQLALCHEMY: from hsfs.core import util_sql @@ -157,6 +155,8 @@ def sql( def is_flyingduck_query_supported( self, query: "query.Query", read_options: Optional[Dict[str, Any]] = None ) -> bool: + from hsfs.core import arrow_flight_client + return arrow_flight_client.is_query_supported(query, read_options or {}) def _validate_dataframe_type(self, dataframe_type: str): @@ -180,6 +180,8 @@ def _sql_offline( ) -> Union[pd.DataFrame, pl.DataFrame]: self._validate_dataframe_type(dataframe_type) if isinstance(sql_query, dict) and "query_string" in sql_query: + from hsfs.core import arrow_flight_client + result_df = util.run_with_loading_animation( "Reading data from Hopsworks, using Hopsworks Feature Query Service", arrow_flight_client.get_instance().read_query, @@ -342,6 +344,8 @@ def _read_hopsfs_remote( for inode in inode_list: if not self._is_metadata_file(inode.path): + from hsfs.core import arrow_flight_client + if arrow_flight_client.is_data_format_supported( data_format, read_options ): @@ -539,7 +543,10 @@ def profile( or pa.types.is_list(field.type) or pa.types.is_large_list(field.type) or pa.types.is_struct(field.type) - ) and PYARROW_HOPSWORKS_DTYPE_MAPPING[field.type] in ["timestamp", "date"]: + ) and PYARROW_HOPSWORKS_DTYPE_MAPPING.get(field.type, None) in [ + "timestamp", + "date", + ]: if HAS_POLARS and ( isinstance(df, pl.DataFrame) or isinstance(df, pl.dataframe.frame.DataFrame) @@ -573,15 +580,21 @@ def profile( or pa.types.is_list(arrow_type) or pa.types.is_large_list(arrow_type) or pa.types.is_struct(arrow_type) - or PYARROW_HOPSWORKS_DTYPE_MAPPING[arrow_type] + or PYARROW_HOPSWORKS_DTYPE_MAPPING.get(arrow_type, None) in ["timestamp", "date", "binary", "string"] ): dataType = "String" - elif PYARROW_HOPSWORKS_DTYPE_MAPPING[arrow_type] in ["float", "double"]: + elif PYARROW_HOPSWORKS_DTYPE_MAPPING.get(arrow_type, None) in [ + "float", + "double", + ]: dataType = "Fractional" - elif PYARROW_HOPSWORKS_DTYPE_MAPPING[arrow_type] in ["int", "bigint"]: + elif PYARROW_HOPSWORKS_DTYPE_MAPPING.get(arrow_type, None) in [ + "int", + "bigint", + ]: dataType = "Integral" - elif PYARROW_HOPSWORKS_DTYPE_MAPPING[arrow_type] == "boolean": + elif PYARROW_HOPSWORKS_DTYPE_MAPPING.get(arrow_type, None) == "boolean": dataType = "Boolean" else: print( @@ -1077,8 +1090,16 @@ def write_training_dataset( "Currently only query based training datasets are supported by the Python engine" ) + try: + from hsfs.core import arrow_flight_client + + arrow_flight_client_imported = True + except ImportError: + arrow_flight_client_imported = False + if ( - arrow_flight_client.is_query_supported(dataset, user_write_options) + arrow_flight_client_imported + and arrow_flight_client.is_query_supported(dataset, user_write_options) and len(training_dataset.splits) == 0 and feature_view_obj and len(feature_view_obj.transformation_functions) == 0 @@ -1251,7 +1272,7 @@ def _apply_transformation_function( or isinstance(dataset, pl.dataframe.frame.DataFrame) ): # Converting polars dataframe to pandas because currently we support only pandas UDF's as transformation functions. - if HAS_ARROW: + if HAS_PYARROW: dataset = dataset.to_pandas( use_pyarrow_extension_array=True ) # Zero copy if pyarrow extension can be used. 
diff --git a/python/hsfs/feature_store.py b/python/hsfs/feature_store.py index e3c2f478d..2a384c961 100644 --- a/python/hsfs/feature_store.py +++ b/python/hsfs/feature_store.py @@ -37,7 +37,6 @@ from hsfs.client import exceptions from hsfs.constructor.query import Query from hsfs.core import ( - arrow_flight_client, feature_group_api, feature_group_engine, feature_view_engine, @@ -1790,10 +1789,14 @@ def get_feature_views(self, name: str) -> List[feature_view.FeatureView]: def _disable_hopsworks_feature_query_service_client(self): """Disable Hopsworks feature query service for the current session. This behaviour is not persisted on reset.""" + from hsfs.core import arrow_flight_client + arrow_flight_client._disable_feature_query_service_client() def _reset_hopsworks_feature_query_service_client(self): """Reset Hopsworks feature query service for the current session.""" + from hsfs.core import arrow_flight_client + arrow_flight_client.close() arrow_flight_client.get_instance() diff --git a/python/tests/core/test_type_systems.py b/python/tests/core/test_type_systems.py index 1c6c26b08..baeef01b3 100644 --- a/python/tests/core/test_type_systems.py +++ b/python/tests/core/test_type_systems.py @@ -17,10 +17,10 @@ import pytest from hsfs.core import type_systems -from hsfs.core.constants import HAS_ARROW, HAS_PANDAS +from hsfs.core.constants import HAS_PANDAS, HAS_PYARROW -if HAS_ARROW: +if HAS_PYARROW: import pyarrow as pa if HAS_PANDAS: @@ -32,7 +32,7 @@ class TestTypeSystems: @pytest.mark.skipif( - not HAS_ARROW or not HAS_PANDAS, reason="Arrow or Pandas are not installed" + not HAS_PYARROW or not HAS_PANDAS, reason="Arrow or Pandas are not installed" ) def test_infer_type_pyarrow_list(self): # Act From 4e678f98a4c0405cc05a706f6161b7c19522e49b Mon Sep 17 00:00:00 2001 From: manu-sj <152865565+manu-sj@users.noreply.github.com> Date: Wed, 25 Sep 2024 10:38:14 +0200 Subject: [PATCH 17/23] [FSTORE-1411][APPEND] On-Demand Transformations (#340) * using deep copy to copy feature group object instead of from_response * reverting deep copy change and using to_dict instead * using get to check if the attributes present in response dict --- python/hsfs/core/feature_group_engine.py | 2 +- python/hsfs/feature_group.py | 4 +++- python/hsfs/hopsworks_udf.py | 8 ++++---- python/hsfs/transformation_function.py | 2 +- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/python/hsfs/core/feature_group_engine.py b/python/hsfs/core/feature_group_engine.py index 63184695f..a06d12ca9 100644 --- a/python/hsfs/core/feature_group_engine.py +++ b/python/hsfs/core/feature_group_engine.py @@ -259,7 +259,7 @@ def sql(self, query, feature_store_name, dataframe_type, online, read_options): read_options, ) - def _update_features_metadata(self, feature_group, features): + def _update_features_metadata(self, feature_group: fg.FeatureGroup, features): # perform changes on copy in case the update fails, so we don't leave # the user object in corrupted state copy_feature_group = fg.FeatureGroup.from_response_json(feature_group.to_dict()) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 764123cca..c3407ea2f 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -3554,7 +3554,9 @@ def to_dict(self) -> Dict[str, Any]: "topicName": self.topic_name, "notificationTopicName": self.notification_topic_name, "deprecated": self.deprecated, - "transformationFunctions": self._transformation_functions, + "transformationFunctions": [ + tf.to_dict() for tf in 
self._transformation_functions + ], } if self._online_config: fg_meta_dict["onlineConfig"] = self._online_config.to_dict() diff --git a/python/hsfs/hopsworks_udf.py b/python/hsfs/hopsworks_udf.py index 4746a3c56..7a3238ac8 100644 --- a/python/hsfs/hopsworks_udf.py +++ b/python/hsfs/hopsworks_udf.py @@ -315,7 +315,7 @@ def _extract_source_code(udf_function: Callable) -> str: except FileNotFoundError: module_imports = [""] warnings.warn( - "Cannot extract imported dependencies for the function module. Please make sure to import all dependencies for the UDF inside the function.", + "Cannot extract imported dependencies for the UDF from the module in which it is defined. Please make sure to import all dependencies for the UDF inside the function.", stacklevel=2, ) @@ -657,7 +657,7 @@ def from_response_json( dropped_feature.strip() for dropped_feature in json_decamelized["dropped_argument_names"] ] - if "dropped_argument_names" in json_decamelized + if json_decamelized.get("dropped_argument_names", None) else None ) transformation_function_argument_names = ( @@ -667,7 +667,7 @@ def from_response_json( "transformation_function_argument_names" ] ] - if "transformation_function_argument_names" in json_decamelized + if json_decamelized.get("transformation_function_argument_names", None) else None ) statistics_features = ( @@ -675,7 +675,7 @@ def from_response_json( feature.strip() for feature in json_decamelized["statistics_argument_names"] ] - if "statistics_argument_names" in json_decamelized + if json_decamelized.get("statistics_argument_names", None) else None ) diff --git a/python/hsfs/transformation_function.py b/python/hsfs/transformation_function.py index 9e8021984..560b8e45a 100644 --- a/python/hsfs/transformation_function.py +++ b/python/hsfs/transformation_function.py @@ -227,7 +227,7 @@ def to_dict(self) -> Dict[str, Any]: "id": self._id, "version": self._version, "featurestoreId": self._featurestore_id, - "hopsworksUdf": self._hopsworks_udf, + "hopsworksUdf": self._hopsworks_udf.to_dict(), } def _get_output_column_names(self) -> str: From 008b56a9a0b0c1b78ad0e2ee335cae7d6c5e4c90 Mon Sep 17 00:00:00 2001 From: Robin Andersson Date: Wed, 25 Sep 2024 13:43:10 +0200 Subject: [PATCH 18/23] [HWORKS-1421][APPEND] should be camelCase in Predictor.to_dict() (#344) --- python/hsml/predictor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hsml/predictor.py b/python/hsml/predictor.py index 4c56dc2de..87f00c9aa 100644 --- a/python/hsml/predictor.py +++ b/python/hsml/predictor.py @@ -308,7 +308,7 @@ def to_dict(self): "servingTool": self._serving_tool, "predictor": self._script_file, "apiProtocol": self._api_protocol, - "project_namespace": self._project_namespace, + "projectNamespace": self._project_namespace, } if self.environment is not None: json = {**json, **{"environmentDTO": {"name": self._environment}}} From f5361e411c07ba3be2076b725bac35b6658566cf Mon Sep 17 00:00:00 2001 From: Robin Andersson Date: Wed, 25 Sep 2024 14:02:11 +0200 Subject: [PATCH 19/23] [HWORKS-1639] misc environment was renamed to pandas (#341) --- python/tests/fixtures/predictor_fixtures.json | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/python/tests/fixtures/predictor_fixtures.json b/python/tests/fixtures/predictor_fixtures.json index 6859f6cac..b90eaf6de 100644 --- a/python/tests/fixtures/predictor_fixtures.json +++ b/python/tests/fixtures/predictor_fixtures.json @@ -42,7 +42,7 @@ "name": "topic" }, "environment_dto": { - "name": "misc-inference-pipeline" + 
"name": "pandas-inference-pipeline" }, "project_namespace": "test" } @@ -98,7 +98,7 @@ "name": "topic" }, "environment_dto": { - "name": "misc-inference-pipeline" + "name": "pandas-inference-pipeline" }, "project_namespace": "test" }, @@ -141,7 +141,7 @@ "name": "topic" }, "environment_dto": { - "name": "misc-inference-pipeline" + "name": "pandas-inference-pipeline" }, "project_namespace": "test" } @@ -255,7 +255,7 @@ "name": "topic" }, "environment_dto": { - "name": "misc-inference-pipeline" + "name": "pandas-inference-pipeline" } } }, @@ -300,7 +300,7 @@ "name": "topic" }, "environment_dto": { - "name": "misc-inference-pipeline" + "name": "pandas-inference-pipeline" } } }, @@ -338,7 +338,7 @@ "name": "topic" }, "environment_dto": { - "name": "misc-inference-pipeline" + "name": "pandas-inference-pipeline" } } }, @@ -383,7 +383,7 @@ "name": "topic" }, "environment_dto": { - "name": "misc-inference-pipeline" + "name": "pandas-inference-pipeline" } } }, From 4e761c880753ee5133a6cdb4427a243fb28824e1 Mon Sep 17 00:00:00 2001 From: Fabio Buso Date: Wed, 25 Sep 2024 18:48:22 +0200 Subject: [PATCH 20/23] [FSTORE-1548] BigQuery connector is using the wrong project when querying arrowflight (#345) --- python/hsfs/storage_connector.py | 3 +- .../fixtures/storage_connector_fixtures.json | 28 ++++++++++++++++++- python/tests/test_storage_connector.py | 21 ++++++++++---- 3 files changed, 43 insertions(+), 9 deletions(-) diff --git a/python/hsfs/storage_connector.py b/python/hsfs/storage_connector.py index 97e963ad0..3618e2e68 100644 --- a/python/hsfs/storage_connector.py +++ b/python/hsfs/storage_connector.py @@ -1575,9 +1575,8 @@ def connector_options(self) -> Dict[str, Any]: """Return options to be passed to an external BigQuery connector library""" props = { "key_path": self._key_path, - "project_id": self._query_project, + "project_id": self._parent_project, "dataset_id": self._dataset, - "parent_project": self._parent_project, } return props diff --git a/python/tests/fixtures/storage_connector_fixtures.json b/python/tests/fixtures/storage_connector_fixtures.json index 82104f04a..41a5d662d 100644 --- a/python/tests/fixtures/storage_connector_fixtures.json +++ b/python/tests/fixtures/storage_connector_fixtures.json @@ -495,7 +495,7 @@ }, "headers": null }, - "get_big_query": { + "get_big_query_table": { "response": { "type": "featurestoreBigQueryConnectorDTO", "description": "BigQuery connector description", @@ -508,6 +508,32 @@ "dataset": "test_dataset", "query_table": "test_query_table", "query_project": "test_query_project", + "arguments": [{"name": "test_name", "value": "test_value"}] + }, + "method": "GET", + "path_params": [ + "project", + "119", + "featurestores", + 67, + "storageconnectors", + "test_big_query" + ], + "query_params": { + "temporaryCredentials": true + }, + "headers": null + }, + "get_big_query_query": { + "response": { + "type": "featurestoreBigQueryConnectorDTO", + "description": "BigQuery connector description", + "featurestoreId": 67, + "id": 1, + "name": "test_big_query", + "storageConnectorType": "BIGQUERY", + "key_path": "test_key_path", + "parent_project": "test_parent_project", "materialization_dataset": "test_materialization_dataset", "arguments": [{"name": "test_name", "value": "test_value"}] }, diff --git a/python/tests/test_storage_connector.py b/python/tests/test_storage_connector.py index 5e3ae2704..1811a86f1 100644 --- a/python/tests/test_storage_connector.py +++ b/python/tests/test_storage_connector.py @@ -800,7 +800,7 @@ def test_default_path(self, mocker): 
class TestBigQueryConnector: def test_from_response_json(self, backend_fixtures): # Arrange - json = backend_fixtures["storage_connector"]["get_big_query"]["response"] + json = backend_fixtures["storage_connector"]["get_big_query_table"]["response"] # Act sc = storage_connector.StorageConnector.from_response_json(json) @@ -815,7 +815,6 @@ def test_from_response_json(self, backend_fixtures): assert sc.dataset == "test_dataset" assert sc.query_table == "test_query_table" assert sc.query_project == "test_query_project" - assert sc.materialization_dataset == "test_materialization_dataset" assert sc.arguments == {"test_name": "test_value"} def test_from_response_json_basic_info(self, backend_fixtures): @@ -850,7 +849,7 @@ def test_credentials_base64_encoded(self, mocker, backend_fixtures, tmp_path): credentialsFile = tmp_path / "bigquery.json" credentialsFile.write_text(credentials) - json = backend_fixtures["storage_connector"]["get_big_query"]["response"] + json = backend_fixtures["storage_connector"]["get_big_query_table"]["response"] if isinstance(tmp_path, WindowsPath): json["key_path"] = "file:///" + str(credentialsFile.resolve()).replace( "\\", "/" @@ -891,9 +890,7 @@ def test_query_validation(self, mocker, backend_fixtures, tmp_path): credentials = '{"type": "service_account", "project_id": "test"}' credentialsFile = tmp_path / "bigquery.json" credentialsFile.write_text(credentials) - json = backend_fixtures["storage_connector"]["get_big_query"]["response"] - # remove property for query - json.pop("materialization_dataset") + json = backend_fixtures["storage_connector"]["get_big_query_table"]["response"] if isinstance(tmp_path, WindowsPath): json["key_path"] = "file:///" + str(credentialsFile.resolve()).replace( "\\", "/" @@ -905,3 +902,15 @@ def test_query_validation(self, mocker, backend_fixtures, tmp_path): # Assert with pytest.raises(ValueError): sc.read(query="select * from") + + def test_connector_options(self, backend_fixtures): + # Arrange + engine.set_instance("python", python.Engine()) + json = backend_fixtures["storage_connector"]["get_big_query_query"]["response"] + sc = storage_connector.StorageConnector.from_response_json(json) + + # Act + options = sc.connector_options() + + # Assert + assert options["project_id"] == "test_parent_project" From 48452dfad45fad33ee5597756d8e7ae893df52b3 Mon Sep 17 00:00:00 2001 From: Ralf Date: Mon, 30 Sep 2024 01:22:51 +0300 Subject: [PATCH 21/23] [FSTORE-1327] Activity UI doesn't correctly report Delta commit activities (#343) --- python/hsfs/core/delta_engine.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/python/hsfs/core/delta_engine.py b/python/hsfs/core/delta_engine.py index 56cce3a30..95ea7f37b 100644 --- a/python/hsfs/core/delta_engine.py +++ b/python/hsfs/core/delta_engine.py @@ -172,6 +172,8 @@ def _generate_merge_query(self, source_alias, updates_alias): @staticmethod def _get_last_commit_metadata(spark_context, base_path): fg_source_table = DeltaTable.forPath(spark_context, base_path) + + # Get info about the latest commit last_commit = fg_source_table.history(1).first().asDict() version = last_commit["version"] commit_timestamp = util.convert_event_time_to_timestamp( @@ -180,6 +182,12 @@ def _get_last_commit_metadata(spark_context, base_path): commit_date_string = util.get_hudi_datestr_from_timestamp(commit_timestamp) operation_metrics = last_commit["operationMetrics"] + # Get info about the oldest remaining commit + oldest_commit = fg_source_table.history().orderBy("version").first().asDict() 
+ oldest_commit_timestamp = util.convert_event_time_to_timestamp( + oldest_commit["timestamp"] + ) + if version == 0: fg_commit = feature_group_commit.FeatureGroupCommit( commitid=None, @@ -188,7 +196,7 @@ def _get_last_commit_metadata(spark_context, base_path): rows_inserted=operation_metrics["numOutputRows"], rows_updated=0, rows_deleted=0, - last_active_commit_time=commit_timestamp, + last_active_commit_time=oldest_commit_timestamp, ) else: fg_commit = feature_group_commit.FeatureGroupCommit( @@ -198,7 +206,7 @@ def _get_last_commit_metadata(spark_context, base_path): rows_inserted=operation_metrics["numTargetRowsInserted"], rows_updated=operation_metrics["numTargetRowsUpdated"], rows_deleted=operation_metrics["numTargetRowsDeleted"], - last_active_commit_time=commit_timestamp, + last_active_commit_time=oldest_commit_timestamp, ) return fg_commit From 147af3ec99363923e1791a39b5ee7093f20694bb Mon Sep 17 00:00:00 2001 From: Fabio Buso Date: Mon, 30 Sep 2024 10:56:41 +0200 Subject: [PATCH 22/23] Prepare for 4.1.0-SNAPSHOT development --- java/beam/pom.xml | 2 +- java/flink/pom.xml | 2 +- java/hsfs/pom.xml | 2 +- java/pom.xml | 2 +- java/spark/pom.xml | 2 +- python/hopsworks_common/version.py | 2 +- utils/java/pom.xml | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) diff --git a/java/beam/pom.xml b/java/beam/pom.xml index 3b3f902ca..b240612d9 100644 --- a/java/beam/pom.xml +++ b/java/beam/pom.xml @@ -5,7 +5,7 @@ hsfs-parent com.logicalclocks - 4.0.0-SNAPSHOT + 4.1.0-SNAPSHOT 4.0.0 diff --git a/java/flink/pom.xml b/java/flink/pom.xml index d2d7b87e0..7e39ece2a 100644 --- a/java/flink/pom.xml +++ b/java/flink/pom.xml @@ -5,7 +5,7 @@ hsfs-parent com.logicalclocks - 4.0.0-SNAPSHOT + 4.1.0-SNAPSHOT 4.0.0 diff --git a/java/hsfs/pom.xml b/java/hsfs/pom.xml index 56847be5d..c56061427 100644 --- a/java/hsfs/pom.xml +++ b/java/hsfs/pom.xml @@ -5,7 +5,7 @@ hsfs-parent com.logicalclocks - 4.0.0-SNAPSHOT + 4.1.0-SNAPSHOT 4.0.0 diff --git a/java/pom.xml b/java/pom.xml index a0a8750aa..cc3dd776c 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -7,7 +7,7 @@ com.logicalclocks hsfs-parent pom - 4.0.0-SNAPSHOT + 4.1.0-SNAPSHOT hsfs spark diff --git a/java/spark/pom.xml b/java/spark/pom.xml index 2acda722b..185da5d20 100644 --- a/java/spark/pom.xml +++ b/java/spark/pom.xml @@ -22,7 +22,7 @@ hsfs-parent com.logicalclocks - 4.0.0-SNAPSHOT + 4.1.0-SNAPSHOT 4.0.0 diff --git a/python/hopsworks_common/version.py b/python/hopsworks_common/version.py index 03e334b40..52cd363fc 100644 --- a/python/hopsworks_common/version.py +++ b/python/hopsworks_common/version.py @@ -14,4 +14,4 @@ # limitations under the License. 
# -__version__ = "4.0.0.dev1" +__version__ = "4.1.0.dev1" diff --git a/utils/java/pom.xml b/utils/java/pom.xml index 1f9baabf9..196978d6c 100644 --- a/utils/java/pom.xml +++ b/utils/java/pom.xml @@ -5,7 +5,7 @@ com.logicalclocks hsfs-utils - 4.0.0-SNAPSHOT + 4.1.0-SNAPSHOT 3.2.0.0-SNAPSHOT From a5c28e7246247b362c437031b1c22f263f72a004 Mon Sep 17 00:00:00 2001 From: Aleksey Veresov Date: Fri, 4 Oct 2024 10:15:03 +0200 Subject: [PATCH 23/23] [FSTORE-1468] Make numpy optional (#338) * Remove dependency on numpy except from convert_to_default_dataframe * Ruff * Update pyproject.toml extras * Fix * Fix * Attempt making numpy optional in convert_to_default_dataframe * Address Manu's review --- locust_benchmark/create_feature_group.py | 1 - python/hopsworks_common/core/constants.py | 17 +++ python/hsfs/builtin_transformations.py | 5 +- python/hsfs/constructor/query.py | 6 +- python/hsfs/core/feature_view_engine.py | 6 +- python/hsfs/core/kafka_engine.py | 7 +- python/hsfs/core/vector_server.py | 19 +++- python/hsfs/engine/python.py | 17 ++- python/hsfs/engine/spark.py | 130 ++++++++++++++++------ python/hsfs/feature_group.py | 11 +- python/hsfs/feature_group_writer.py | 6 +- python/hsfs/feature_store.py | 6 +- python/hsfs/feature_view.py | 9 +- python/hsfs/storage_connector.py | 6 +- python/hsfs/training_dataset.py | 6 +- python/hsml/core/serving_api.py | 4 +- python/pyproject.toml | 7 +- utils/python/hsfs_utils.py | 1 - 18 files changed, 195 insertions(+), 69 deletions(-) diff --git a/locust_benchmark/create_feature_group.py b/locust_benchmark/create_feature_group.py index b61a69a41..2ac6cf568 100644 --- a/locust_benchmark/create_feature_group.py +++ b/locust_benchmark/create_feature_group.py @@ -1,7 +1,6 @@ from common.hopsworks_client import HopsworksClient if __name__ == "__main__": - hopsworks_client = HopsworksClient() fg = hopsworks_client.get_or_create_fg() hopsworks_client.insert_data(fg) diff --git a/python/hopsworks_common/core/constants.py b/python/hopsworks_common/core/constants.py index 4ea804d4c..0e1acddc4 100644 --- a/python/hopsworks_common/core/constants.py +++ b/python/hopsworks_common/core/constants.py @@ -20,6 +20,13 @@ # Avro HAS_FAST_AVRO: bool = importlib.util.find_spec("fastavro") is not None HAS_AVRO: bool = importlib.util.find_spec("avro") is not None +avro_not_installed_message = ( + "Avro package not found. " + "If you want to use avro with Hopsworks you can install the corresponding extra via " + '`pip install "hopsworks[avro]"`. ' + "You can also install avro directly in your environment with `pip install fastavro` or `pip install avro`. " + "You will need to restart your kernel if applicable." +) # Confluent Kafka HAS_CONFLUENT_KAFKA: bool = importlib.util.find_spec("confluent_kafka") is not None @@ -55,7 +62,17 @@ ) HAS_PANDAS: bool = importlib.util.find_spec("pandas") is not None + +# NumPy HAS_NUMPY: bool = importlib.util.find_spec("numpy") is not None +numpy_not_installed_message = ( + "Numpy package not found. " + "If you want to use numpy with Hopsworks you can install the corresponding extra via " + '`pip install "hopsworks[numpy]"`. ' + "You can also install numpy directly in your environment with `pip install numpy`. " + "You will need to restart your kernel if applicable." +) + HAS_POLARS: bool = importlib.util.find_spec("polars") is not None polars_not_installed_message = ( "Polars package not found. 
" diff --git a/python/hsfs/builtin_transformations.py b/python/hsfs/builtin_transformations.py index fa8348553..1fc2ce670 100644 --- a/python/hsfs/builtin_transformations.py +++ b/python/hsfs/builtin_transformations.py @@ -14,7 +14,8 @@ # limitations under the License. # -import numpy as np +import math + import pandas as pd from hsfs.hopsworks_udf import udf from hsfs.transformation_statistics import TransformationStatistics @@ -49,7 +50,7 @@ def label_encoder(feature: pd.Series, statistics=feature_statistics) -> pd.Serie # Unknown categories not present in training dataset are encoded as -1. return pd.Series( [ - value_to_index.get(data, -1) if not pd.isna(data) else np.nan + value_to_index.get(data, -1) if not pd.isna(data) else math.nan for data in feature ] ) diff --git a/python/hsfs/constructor/query.py b/python/hsfs/constructor/query.py index b332ab1db..2fa4ae8d0 100644 --- a/python/hsfs/constructor/query.py +++ b/python/hsfs/constructor/query.py @@ -21,9 +21,9 @@ from typing import Any, Dict, List, Optional, Tuple, TypeVar, Union import humps -import numpy as np import pandas as pd from hopsworks_common.client.exceptions import FeatureStoreException +from hopsworks_common.core.constants import HAS_NUMPY from hsfs import engine, storage_connector, util from hsfs import feature_group as fg_mod from hsfs.constructor import join @@ -34,6 +34,10 @@ from hsfs.feature import Feature +if HAS_NUMPY: + import numpy as np + + @typechecked class Query: ERROR_MESSAGE_FEATURE_AMBIGUOUS = ( diff --git a/python/hsfs/core/feature_view_engine.py b/python/hsfs/core/feature_view_engine.py index 84b6c7bda..be284f752 100644 --- a/python/hsfs/core/feature_view_engine.py +++ b/python/hsfs/core/feature_view_engine.py @@ -19,10 +19,10 @@ import warnings from typing import Any, Dict, List, Optional, TypeVar, Union -import numpy as np import pandas as pd from hopsworks_common import client from hopsworks_common.client.exceptions import FeatureStoreException +from hopsworks_common.core.constants import HAS_NUMPY from hsfs import ( engine, feature_group, @@ -45,6 +45,10 @@ from hsfs.training_dataset_split import TrainingDatasetSplit +if HAS_NUMPY: + import numpy as np + + class FeatureViewEngine: ENTITY_TYPE = "featureview" _TRAINING_DATA_API_PATH = "trainingdatasets" diff --git a/python/hsfs/core/kafka_engine.py b/python/hsfs/core/kafka_engine.py index 66ebba47f..d21b6ec22 100644 --- a/python/hsfs/core/kafka_engine.py +++ b/python/hsfs/core/kafka_engine.py @@ -20,14 +20,17 @@ from io import BytesIO from typing import TYPE_CHECKING, Any, Callable, Dict, Literal, Optional, Tuple, Union -import numpy as np import pandas as pd from hopsworks_common import client +from hopsworks_common.core.constants import HAS_NUMPY from hsfs.core import storage_connector_api from hsfs.core.constants import HAS_AVRO, HAS_CONFLUENT_KAFKA, HAS_FAST_AVRO from tqdm import tqdm +if HAS_NUMPY: + import numpy as np + if HAS_CONFLUENT_KAFKA: from confluent_kafka import Consumer, KafkaError, Producer, TopicPartition @@ -202,7 +205,7 @@ def encode_row(complex_feature_writers, writer, row): if isinstance(row, dict): for k in row.keys(): # for avro to be able to serialize them, they need to be python data types - if isinstance(row[k], np.ndarray): + if HAS_NUMPY and isinstance(row[k], np.ndarray): row[k] = row[k].tolist() if isinstance(row[k], pd.Timestamp): row[k] = row[k].to_pydatetime() diff --git a/python/hsfs/core/vector_server.py b/python/hsfs/core/vector_server.py index ccc0f39d2..a0f2ed2ab 100755 --- 
diff --git a/python/hsfs/core/vector_server.py b/python/hsfs/core/vector_server.py
index ccc0f39d2..a0f2ed2ab 100755
--- a/python/hsfs/core/vector_server.py
+++ b/python/hsfs/core/vector_server.py
@@ -23,14 +23,15 @@
 from io import BytesIO
 from typing import Any, Callable, Dict, List, Literal, Optional, Set, Tuple, Union

-import avro.io
-import avro.schema
-import numpy as np
 import pandas as pd
 from hopsworks_common import client
 from hopsworks_common.core.constants import (
+    HAS_AVRO,
     HAS_FAST_AVRO,
+    HAS_NUMPY,
     HAS_POLARS,
+    avro_not_installed_message,
+    numpy_not_installed_message,
     polars_not_installed_message,
 )
 from hsfs import (
@@ -52,9 +53,14 @@
 )


+if HAS_NUMPY:
+    import numpy as np
+
 if HAS_FAST_AVRO:
     from fastavro import schemaless_reader
-else:
+if HAS_AVRO:
+    import avro.io
+    import avro.schema
     from avro.io import BinaryDecoder

 if HAS_POLARS:
@@ -807,6 +813,8 @@ def handle_feature_vector_return_type(
             return feature_vectorz
         elif return_type.lower() == "numpy" and not inference_helper:
             _logger.debug("Returning feature vector as numpy array")
+            if not HAS_NUMPY:
+                raise ModuleNotFoundError(numpy_not_installed_message)
             return np.array(feature_vectorz)
         # Only inference helper can return dict
         elif return_type.lower() == "dict" and inference_helper:
@@ -1064,6 +1072,9 @@ def build_complex_feature_decoders(self) -> Dict[str, Callable]:
             - deserialization of complex features from the online feature store
             - conversion of string or int timestamps to datetime objects
         """
+        if not HAS_AVRO:
+            raise ModuleNotFoundError(avro_not_installed_message)
+
         complex_feature_schemas = {
             f.name: avro.io.DatumReader(
                 avro.schema.parse(
diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py
index 96c4f9ecc..5db04ab8e 100644
--- a/python/hsfs/engine/python.py
+++ b/python/hsfs/engine/python.py
@@ -50,7 +50,6 @@
 import boto3
 import hsfs
-import numpy as np
 import pandas as pd
 import pyarrow as pa
 from botocore.response import StreamingBody
@@ -83,6 +82,7 @@
 from hsfs.core.constants import (
     HAS_AIOMYSQL,
     HAS_GREAT_EXPECTATIONS,
+    HAS_NUMPY,
     HAS_PANDAS,
     HAS_PYARROW,
     HAS_SQLALCHEMY,
@@ -98,6 +98,9 @@
 if HAS_GREAT_EXPECTATIONS:
     import great_expectations

+if HAS_NUMPY:
+    import numpy as np
+
 if HAS_AIOMYSQL and HAS_SQLALCHEMY:
     from hsfs.core import util_sql
@@ -1464,11 +1467,13 @@ def _start_offline_materialization(offline_write_options: Dict[str, Any]) -> bool:
     def _convert_feature_log_to_df(feature_log, cols) -> pd.DataFrame:
         if feature_log is None and cols:
             return pd.DataFrame(columns=cols)
-        if not (
-            isinstance(feature_log, (list, np.ndarray, pd.DataFrame, pl.DataFrame))
+        if not isinstance(feature_log, (list, pd.DataFrame, pl.DataFrame)) and not (
+            HAS_NUMPY and isinstance(feature_log, np.ndarray)
         ):
             raise ValueError(f"Type '{type(feature_log)}' not accepted")
-        if isinstance(feature_log, list) or isinstance(feature_log, np.ndarray):
+        if isinstance(feature_log, list) or (
+            HAS_NUMPY and isinstance(feature_log, np.ndarray)
+        ):
             Engine._validate_logging_list(feature_log, cols)
             return pd.DataFrame(feature_log, columns=cols)
         else:
@@ -1479,7 +1484,9 @@ def _convert_feature_log_to_df(feature_log, cols) -> pd.DataFrame:

     @staticmethod
     def _validate_logging_list(feature_log, cols):
-        if isinstance(feature_log[0], list) or isinstance(feature_log[0], np.ndarray):
+        if isinstance(feature_log[0], list) or (
+            HAS_NUMPY and isinstance(feature_log[0], np.ndarray)
+        ):
             provided_len = len(feature_log[0])
         else:
             provided_len = 1
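A minimal sketch, outside the patch itself, of the acceptance rule that Engine._convert_feature_log_to_df above now encodes, restated as a standalone predicate; the imports are assumptions made for the sketch, and the numpy branch only applies when numpy is installed.

import pandas as pd
import polars as pl

from hopsworks_common.core.constants import HAS_NUMPY

if HAS_NUMPY:
    import numpy as np


def is_accepted_feature_log(feature_log) -> bool:
    # Lists, pandas DataFrames and polars DataFrames are always accepted;
    # numpy arrays only count when numpy is actually installed.
    return isinstance(feature_log, (list, pd.DataFrame, pl.DataFrame)) or (
        HAS_NUMPY and isinstance(feature_log, np.ndarray)
    )

Anything failing this predicate is rejected with the ValueError shown in the hunk above.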
diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py
index 53ea4dc0f..3a990fe18 100644
--- a/python/hsfs/engine/spark.py
+++ b/python/hsfs/engine/spark.py
@@ -31,15 +31,19 @@
 from pyspark.rdd import RDD
 from pyspark.sql import DataFrame

-import numpy as np
 import pandas as pd
 import tzlocal
+from hopsworks_common.core.constants import HAS_NUMPY, HAS_PANDAS
 from hsfs.constructor import query

 # in case importing in %%local
 from hsfs.core.vector_db_client import VectorDbClient

+if HAS_NUMPY:
+    import numpy as np
+
+
 try:
     import pyspark
     from pyspark import SparkFiles
@@ -258,39 +262,11 @@ def _return_dataframe_type(self, dataframe, dataframe_type):

     def convert_to_default_dataframe(self, dataframe):
         if isinstance(dataframe, list):
-            dataframe = np.array(dataframe)
-
-        if isinstance(dataframe, np.ndarray):
-            if dataframe.ndim != 2:
-                raise TypeError(
-                    "Cannot convert numpy array that do not have two dimensions to a dataframe. "
-                    "The number of dimensions are: {}".format(dataframe.ndim)
-                )
-            num_cols = dataframe.shape[1]
-            dataframe_dict = {}
-            for n_col in list(range(num_cols)):
-                col_name = "col_" + str(n_col)
-                dataframe_dict[col_name] = dataframe[:, n_col]
-            dataframe = pd.DataFrame(dataframe_dict)
-
-        if isinstance(dataframe, pd.DataFrame):
-            # convert timestamps to current timezone
-            local_tz = tzlocal.get_localzone()
-            # make shallow copy so the original df does not get changed
-            dataframe_copy = dataframe.copy(deep=False)
-            for c in dataframe_copy.columns:
-                if isinstance(
-                    dataframe_copy[c].dtype, pd.core.dtypes.dtypes.DatetimeTZDtype
-                ):
-                    # convert to utc timestamp
-                    dataframe_copy[c] = dataframe_copy[c].dt.tz_convert(None)
-                if dataframe_copy[c].dtype == np.dtype("datetime64[ns]"):
-                    # set the timezone to the client's timezone because that is
-                    # what spark expects.
-                    dataframe_copy[c] = dataframe_copy[c].dt.tz_localize(
-                        str(local_tz), ambiguous="infer", nonexistent="shift_forward"
-                    )
-            dataframe = self._spark_session.createDataFrame(dataframe_copy)
+            dataframe = self.convert_list_to_spark_dataframe(dataframe)
+        elif HAS_NUMPY and isinstance(dataframe, np.ndarray):
+            dataframe = self.convert_numpy_to_spark_dataframe(dataframe)
+        elif HAS_PANDAS and isinstance(dataframe, pd.DataFrame):
+            dataframe = self.convert_pandas_to_spark_dataframe(dataframe)
         elif isinstance(dataframe, RDD):
             dataframe = dataframe.toDF()
@@ -341,6 +317,92 @@ def convert_to_default_dataframe(self, dataframe):
                 )
             )

+    @staticmethod
+    def utc_disguised_as_local(dt):
+        local_tz = tzlocal.get_localzone()
+        utc = timezone.utc
+        if not dt.tzinfo:
+            dt = dt.replace(tzinfo=utc)
+        return dt.astimezone(utc).replace(tzinfo=local_tz)
+
+    def convert_list_to_spark_dataframe(self, dataframe):
+        if HAS_NUMPY:
+            return self.convert_numpy_to_spark_dataframe(np.array(dataframe))
+        try:
+            dataframe[0][0]
+        except TypeError:
+            raise TypeError(
+                "Cannot convert a list that has less than two dimensions to a dataframe."
+            ) from None
+        ok = False
+        try:
+            dataframe[0][0][0]
+        except TypeError:
+            ok = True
+        if not ok:
+            raise TypeError(
+                "Cannot convert a list that has more than two dimensions to a dataframe."
+            ) from None
+        num_cols = len(dataframe[0])
+        if HAS_PANDAS:
+            dataframe_dict = {}
+            for n_col in range(num_cols):
+                c = "col_" + str(n_col)
+                dataframe_dict[c] = [dataframe[i][n_col] for i in range(len(dataframe))]
+            return self.convert_pandas_to_spark_dataframe(pd.DataFrame(dataframe_dict))
+        for i in range(len(dataframe)):
+            dataframe[i] = [
+                self.utc_disguised_as_local(d) if isinstance(d, datetime) else d
+                for d in dataframe[i]
+            ]
+        return self._spark_session.createDataFrame(
+            dataframe, ["col_" + str(n) for n in range(num_cols)]
+        )
+
+    def convert_numpy_to_spark_dataframe(self, dataframe):
+        if dataframe.ndim != 2:
+            raise TypeError(
+                "Cannot convert numpy array that do not have two dimensions to a dataframe. "
+                "The number of dimensions are: {}".format(dataframe.ndim)
+            )
+        num_cols = dataframe.shape[1]
+        if HAS_PANDAS:
+            dataframe_dict = {}
+            for n_col in range(num_cols):
+                c = "col_" + str(n_col)
+                dataframe_dict[c] = dataframe[:, n_col]
+            return self.convert_pandas_to_spark_dataframe(pd.DataFrame(dataframe_dict))
+        # convert timestamps to current timezone
+        for n_col in range(num_cols):
+            if dataframe[:, n_col].dtype == np.dtype("datetime64[ns]"):
+                # set the timezone to the client's timezone because that is
+                # what spark expects.
+                dataframe[:, n_col] = np.array(
+                    [self.utc_disguised_as_local(d.item()) for d in dataframe[:, n_col]]
+                )
+        return self._spark_session.createDataFrame(
+            dataframe.tolist(), ["col_" + str(n) for n in range(num_cols)]
+        )
+
+    def convert_pandas_to_spark_dataframe(self, dataframe):
+        # convert timestamps to current timezone
+        local_tz = tzlocal.get_localzone()
+        # make shallow copy so the original df does not get changed
+        dataframe_copy = dataframe.copy(deep=False)
+        for c in dataframe_copy.columns:
+            if isinstance(
+                dataframe_copy[c].dtype, pd.core.dtypes.dtypes.DatetimeTZDtype
+            ):
+                # convert to utc timestamp
+                dataframe_copy[c] = dataframe_copy[c].dt.tz_convert(None)
+            if HAS_NUMPY and dataframe_copy[c].dtype == np.dtype("datetime64[ns]"):
+                # set the timezone to the client's timezone because that is
+                # what spark expects.
+                dataframe_copy[c] = dataframe_copy[c].dt.tz_localize(
+                    str(local_tz), ambiguous="infer", nonexistent="shift_forward"
+                )
+        return self._spark_session.createDataFrame(dataframe_copy)
+
     def save_dataframe(
         self,
         feature_group,
diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py
index c3407ea2f..4a1db2c57 100644
--- a/python/hsfs/feature_group.py
+++ b/python/hsfs/feature_group.py
@@ -22,7 +22,6 @@
 import warnings
 from datetime import date, datetime
 from typing import (
-    TYPE_CHECKING,
     Any,
     Dict,
     List,
@@ -33,17 +32,12 @@
     Union,
 )

-
-if TYPE_CHECKING:
-    import great_expectations
-
 import avro.schema
 import hsfs.expectation_suite
 import humps
-import numpy as np
 import pandas as pd
 from hopsworks_common.client.exceptions import FeatureStoreException, RestAPIError
-from hopsworks_common.core.constants import HAS_POLARS
+from hopsworks_common.core.constants import HAS_NUMPY, HAS_POLARS
 from hsfs import (
     engine,
     feature,
@@ -104,6 +98,9 @@
 if HAS_CONFLUENT_KAFKA:
     import confluent_kafka

+if HAS_NUMPY:
+    import numpy as np
+
 if HAS_POLARS:
     import polars as pl
diff --git a/python/hsfs/feature_group_writer.py b/python/hsfs/feature_group_writer.py
index 146d47bed..de63bd5a4 100644
--- a/python/hsfs/feature_group_writer.py
+++ b/python/hsfs/feature_group_writer.py
@@ -17,12 +17,16 @@

 from typing import Any, Dict, List, Optional, Tuple, TypeVar, Union

-import numpy as np
 import pandas as pd
+from hopsworks_common.core.constants import HAS_NUMPY
 from hsfs.core.job import Job
 from hsfs.validation_report import ValidationReport


+if HAS_NUMPY:
+    import numpy as np
+
+
 class FeatureGroupWriter:
     def __init__(self, feature_group):
         self._feature_group = feature_group
diff --git a/python/hsfs/feature_store.py b/python/hsfs/feature_store.py
index 2a384c961..77c48ae1a 100644
--- a/python/hsfs/feature_store.py
+++ b/python/hsfs/feature_store.py
@@ -21,9 +21,8 @@
 from typing import Any, Dict, List, Optional, TypeVar, Union

 import humps
-import numpy as np
 import pandas as pd
-from hopsworks_common.core.constants import HAS_POLARS
+from hopsworks_common.core.constants import HAS_NUMPY, HAS_POLARS
 from hsfs import (
     expectation_suite,
     feature,
@@ -52,6 +51,9 @@
 from hsfs.transformation_function import TransformationFunction


+if HAS_NUMPY:
+    import numpy as np
+
 if HAS_POLARS:
     import polars as pl
diff --git a/python/hsfs/feature_view.py b/python/hsfs/feature_view.py
index 7527f4de7..6dbe7a585 100644
--- a/python/hsfs/feature_view.py
+++ b/python/hsfs/feature_view.py
@@ -33,10 +33,9 @@
 )

 import humps
-import numpy as np
 import pandas as pd
 from hopsworks_common.client.exceptions import FeatureStoreException
-from hopsworks_common.core.constants import HAS_POLARS
+from hopsworks_common.core.constants import HAS_NUMPY, HAS_POLARS
 from hsfs import (
     feature_group,
     storage_connector,
@@ -76,6 +75,12 @@
 from hsml.model import Model


+if HAS_NUMPY:
+    import numpy as np
+
+
+_logger = logging.getLogger(__name__)
+
 TrainingDatasetDataFrameTypes = Union[
     pd.DataFrame,
     TypeVar("pyspark.sql.DataFrame"),  # noqa: F821
diff --git a/python/hsfs/storage_connector.py b/python/hsfs/storage_connector.py
index 3618e2e68..5f953737f 100644
--- a/python/hsfs/storage_connector.py
+++ b/python/hsfs/storage_connector.py
@@ -24,14 +24,16 @@
 from typing import Any, Dict, List, Optional, TypeVar, Union

 import humps
-import numpy as np
 import pandas as pd
 from hopsworks_common import client
-from hopsworks_common.core.constants import HAS_POLARS
+from hopsworks_common.core.constants import HAS_NUMPY, HAS_POLARS
 from hsfs import engine
 from hsfs.core import storage_connector_api

+if HAS_NUMPY:
+    import numpy as np
+
 if HAS_POLARS:
     import polars as pl
diff --git a/python/hsfs/training_dataset.py b/python/hsfs/training_dataset.py
index 92db6d23e..94688b692 100644
--- a/python/hsfs/training_dataset.py
+++ b/python/hsfs/training_dataset.py
@@ -19,10 +19,10 @@
 from typing import Any, Dict, List, Optional, Set, TypeVar, Union

 import humps
-import numpy as np
 import pandas as pd
 from hopsworks_common import client
 from hopsworks_common.client.exceptions import RestAPIError
+from hopsworks_common.core.constants import HAS_NUMPY
 from hsfs import engine, training_dataset_feature, util
 from hsfs.constructor import filter, query
 from hsfs.core import (
@@ -36,6 +36,10 @@
 from hsfs.training_dataset_split import TrainingDatasetSplit


+if HAS_NUMPY:
+    import numpy as np
+
+
 class TrainingDatasetBase:
     # NOTE: This class is exposed to users with the only purpose of providing information about a Training Dataset
     # and, therefore, it should not implement any functionality and remain with as minimal as possible
diff --git a/python/hsml/core/serving_api.py b/python/hsml/core/serving_api.py
index c17eba65c..92d947728 100644
--- a/python/hsml/core/serving_api.py
+++ b/python/hsml/core/serving_api.py
@@ -291,7 +291,9 @@ def _send_inference_request_via_grpc_protocol(
             # the channel, which will be reused in all following calls on the same deployment object.
             # The gRPC channel is freed when calling deployment.stop()
             print("Initializing gRPC channel...")
-            deployment_instance._grpc_channel = self._create_grpc_channel(deployment_instance)
+            deployment_instance._grpc_channel = self._create_grpc_channel(
+                deployment_instance
+            )
         # build an infer request
         request = InferRequest(
             infer_inputs=data,
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 6cd64077e..d655534df 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -42,7 +42,6 @@ dependencies = [
     "furl",
     "boto3",
     "pandas<2.2.0",
-    "numpy<2",
     "pyjks",
     "mock",
     "avro==1.11.3",
@@ -60,6 +59,7 @@ dependencies = [

 [project.optional-dependencies]
 python = [
+    "numpy<2",
     "pyarrow>=10.0",
     "confluent-kafka<=2.3.0",
     "fastavro>=1.4.11,<=1.8.4",
@@ -86,7 +86,10 @@ dev-pandas1 = [
     "sqlalchemy<=1.4.48",
 ]
 dev = ["hopsworks[dev-no-opt,great-expectations,polars]"]
-polars=["polars>=0.20.18,<=0.21.0"]
+polars=[
+    "polars>=0.20.18,<=0.21.0",
+    "pyarrow>=10.0",
+]

 [build-system]
 requires = ["setuptools", "wheel"]
diff --git a/utils/python/hsfs_utils.py b/utils/python/hsfs_utils.py
index 5af468873..dfc0badfb 100644
--- a/utils/python/hsfs_utils.py
+++ b/utils/python/hsfs_utils.py
@@ -318,4 +318,3 @@ def parse_isoformat_date(da: str) -> datetime:
         sys.exit(1)

     sys.exit(0)
-
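A rough usage sketch, outside the patch itself, of the new list conversion path in the Spark engine. It assumes the Spark engine has already been initialized (for example inside a Hopsworks Spark job) so that engine.get_instance() returns the Spark Engine instance carrying the methods added above.

from datetime import datetime

from hsfs import engine

rows = [
    [1, "a", datetime(2024, 10, 4, 10, 15)],
    [2, "b", datetime(2024, 10, 4, 11, 30)],
]

# convert_to_default_dataframe() now routes lists through
# convert_list_to_spark_dataframe(): with numpy installed it delegates to the
# numpy path, with only pandas it goes through a pandas DataFrame, and with
# neither it builds the Spark DataFrame directly, naming columns col_0, col_1,
# col_2 and normalizing naive datetimes via utc_disguised_as_local().
spark_engine = engine.get_instance()
spark_df = spark_engine.convert_to_default_dataframe(rows)
spark_df.show()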