From 2f6233e6263226be6b29a4b4ec2fc7f0ba00dc38 Mon Sep 17 00:00:00 2001 From: davitbzh Date: Wed, 13 Nov 2024 12:18:56 +0100 Subject: [PATCH 1/8] WIP --- .../core/external_feature_group_engine.py | 4 ++- python/hsfs/core/feature_group_engine.py | 4 ++- python/hsfs/core/feature_view_engine.py | 6 ++-- python/hsfs/core/spine_group_engine.py | 4 ++- python/hsfs/feature.py | 14 +++++++- python/hsfs/feature_group.py | 32 ++++++++++++++++--- python/hsfs/feature_store.py | 10 ++++++ python/hsfs/feature_view.py | 18 +++++------ 8 files changed, 71 insertions(+), 21 deletions(-) diff --git a/python/hsfs/core/external_feature_group_engine.py b/python/hsfs/core/external_feature_group_engine.py index 7e08946a7..a66975f28 100644 --- a/python/hsfs/core/external_feature_group_engine.py +++ b/python/hsfs/core/external_feature_group_engine.py @@ -42,12 +42,14 @@ def save(self, feature_group): external_dataset ) - # set primary and partition key columns + # set primary, foreign and partition key columns # we should move this to the backend util.verify_attribute_key_names(feature_group, True) for feat in feature_group.features: if feat.name in feature_group.primary_key: feat.primary = True + if feat.name in feature_group.foreign_key: + feat.foreign = True util.validate_embedding_feature_type( feature_group.embedding_index, feature_group._features ) diff --git a/python/hsfs/core/feature_group_engine.py b/python/hsfs/core/feature_group_engine.py index 0eb5c441a..1647513b0 100644 --- a/python/hsfs/core/feature_group_engine.py +++ b/python/hsfs/core/feature_group_engine.py @@ -443,13 +443,15 @@ def save_feature_group_metadata( feature_group.features, dataframe_features ) - # set primary and partition key columns + # set primary, foreign and partition key columns # we should move this to the backend util.verify_attribute_key_names(feature_group) for feat in feature_group.features: if feat.name in feature_group.primary_key: feat.primary = True + if feat.name in feature_group.foreign_key: + feat.foreign = True if feat.name in feature_group.partition_key: feat.partition = True if ( diff --git a/python/hsfs/core/feature_view_engine.py b/python/hsfs/core/feature_view_engine.py index be284f752..423f439a2 100644 --- a/python/hsfs/core/feature_view_engine.py +++ b/python/hsfs/core/feature_view_engine.py @@ -389,9 +389,9 @@ def create_training_dataset( training_dataset_obj, user_write_options, spine=None, - primary_keys=False, - event_time=False, - training_helper_columns=False, + primary_keys=True, + event_time=True, + training_helper_columns=True, ): self._set_event_time(feature_view_obj, training_dataset_obj) updated_instance = self._create_training_data_metadata( diff --git a/python/hsfs/core/spine_group_engine.py b/python/hsfs/core/spine_group_engine.py index 498c615f9..dab06d1a1 100644 --- a/python/hsfs/core/spine_group_engine.py +++ b/python/hsfs/core/spine_group_engine.py @@ -34,12 +34,14 @@ def save(self, feature_group): feature_group.dataframe ) - # set primary and partition key columns + # set primary, foreign and partition key columns # we should move this to the backend util.verify_attribute_key_names(feature_group, True) for feat in feature_group.features: if feat.name in feature_group.primary_key: feat.primary = True + if feat.name in feature_group.foreign_key: + feat.foreign = True # need to save dataframe during save since otherwise it will be lost dataframe = feature_group.dataframe diff --git a/python/hsfs/feature.py b/python/hsfs/feature.py index bb57139ed..1a34b5179 100644 --- a/python/hsfs/feature.py 
+++ b/python/hsfs/feature.py @@ -42,6 +42,7 @@ def __init__( type: Optional[str] = None, description: Optional[str] = None, primary: bool = False, + foreign: bool = False, partition: bool = False, hudi_precombine_key: bool = False, online_type: Optional[str] = None, @@ -61,6 +62,7 @@ def __init__( self._type = type self._description = description self._primary = primary + self._foreign = foreign self._partition = partition self._hudi_precombine_key = hudi_precombine_key self._online_type = online_type @@ -93,6 +95,7 @@ def to_dict(self) -> Dict[str, Any]: "partition": self._partition, "hudiPrecombineKey": self._hudi_precombine_key, "primary": self._primary, + "foreign": self._foreign, "onlineType": self._online_type, "defaultValue": self._default_value, "featureGroupId": self._feature_group_id, @@ -178,6 +181,15 @@ def primary(self) -> bool: def primary(self, primary: bool) -> None: self._primary = primary + @property + def foreign(self) -> bool: + """Whether the feature is part of the foreign key of the feature group.""" + return self._foreign + + @foreign.setter + def foreign(self, foreign: bool) -> None: + self._foreign = foreign + @property def partition(self) -> bool: """Whether the feature is part of the partition key of the feature group.""" @@ -262,7 +274,7 @@ def __str__(self) -> str: return self.json() def __repr__(self) -> str: - return f"Feature({self._name!r}, {self._type!r}, {self._description!r}, {self._primary}, {self._partition}, {self._online_type!r}, {self._default_value!r}, {self._feature_group_id!r})" + return f"Feature({self._name!r}, {self._type!r}, {self._description!r}, {self._primary}, {self._foreign}, {self._partition}, {self._online_type!r}, {self._default_value!r}, {self._feature_group_id!r})" def __hash__(self) -> int: return hash(f"{self.feature_group_id}_{self.name}") diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 47cc974a7..cd6431686 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -261,6 +261,7 @@ def delete(self) -> None: def select_all( self, include_primary_key: Optional[bool] = True, + include_foreign_key: Optional[bool] = True, include_event_time: Optional[bool] = True, ) -> query.Query: """Select all features along with primary key and event time from the feature group and return a query object. @@ -314,7 +315,7 @@ def select_all( # Returns `Query`. A query object with all features of the feature group. """ - if include_event_time and include_primary_key: + if include_event_time and include_primary_key and include_foreign_key: return query.Query( left_feature_group=self, left_features=self._features, @@ -322,11 +323,13 @@ def select_all( feature_store_id=self._feature_store_id, ) elif include_event_time: - return self.select_except(self.primary_key) + return self.select_except(self.primary_key + self.foreign_key) elif include_primary_key: - return self.select_except([self.event_time]) - else: + return self.select_except(self.foreign_key + [self.event_time]) + elif include_foreign_key: return self.select_except(self.primary_key + [self.event_time]) + else: + return self.select_except(self.primary_key + self.foreign_key + [self.event_time]) def select_features( self, @@ -425,7 +428,7 @@ def select_features( # Returns `Query`. A query object with all features of the feature group. 
""" - query = self.select_except(self.primary_key + [self.event_time]) + query = self.select_except(self.primary_key + self.foreign_key + [self.event_time]) _logger.info( f"Using {[f.name for f in query.features]} as features for the query." "To include primary key and event time use `select_all`." @@ -2216,6 +2219,7 @@ def __init__( description: Optional[str] = "", partition_key: Optional[List[str]] = None, primary_key: Optional[List[str]] = None, + foreign_key: Optional[List[str]] = None, hudi_precombine_key: Optional[str] = None, featurestore_name: Optional[str] = None, embedding_index: Optional["EmbeddingIndex"] = None, @@ -2302,6 +2306,9 @@ def __init__( self.primary_key: List[str] = [ feat.name for feat in self._features if feat.primary is True ] + self.foreign_key: List[str] = [ + feat.name for feat in self._features if feat.foreign is True + ] self._partition_key: List[str] = [ feat.name for feat in self._features if feat.partition is True ] @@ -2329,6 +2336,7 @@ def __init__( self._stream = True self.primary_key = primary_key + self.foreign_key = foreign_key self.partition_key = partition_key self._hudi_precombine_key = ( util.autofix_feature_name(hudi_precombine_key) @@ -3793,6 +3801,7 @@ def __init__( version: Optional[int] = None, description: Optional[str] = None, primary_key: Optional[List[str]] = None, + foreign_key: Optional[List[str]] = None, featurestore_id: Optional[int] = None, featurestore_name: Optional[str] = None, created: Optional[str] = None, @@ -3877,6 +3886,11 @@ def __init__( if self._features else [] ) + self.foreign_key = ( + [feat.name for feat in self._features if feat.foreign is True] + if self._features + else [] + ) self.statistics_config = statistics_config self._options = ( @@ -3886,6 +3900,7 @@ def __init__( ) else: self.primary_key = primary_key + self.foreign_key = foreign_key self.statistics_config = statistics_config self._features = features self._options = options or {} @@ -4319,6 +4334,7 @@ def __init__( version: Optional[int] = None, description: Optional[str] = None, primary_key: Optional[List[str]] = None, + foreign_key: Optional[List[str]] = None, featurestore_id: Optional[int] = None, featurestore_name: Optional[str] = None, created: Optional[str] = None, @@ -4396,8 +4412,14 @@ def __init__( if self._features else [] ) + self.foreign_key = ( + [feat.name for feat in self._features if feat.foreign is True] + if self._features + else [] + ) else: self.primary_key = primary_key + self.foreign_key = foreign_key self._features = features self._href = href diff --git a/python/hsfs/feature_store.py b/python/hsfs/feature_store.py index 4b45c9c77..8be0e1bea 100644 --- a/python/hsfs/feature_store.py +++ b/python/hsfs/feature_store.py @@ -501,6 +501,7 @@ def create_feature_group( time_travel_format: Optional[str] = "HUDI", partition_key: Optional[List[str]] = None, primary_key: Optional[List[str]] = None, + foreign_key: Optional[List[str]] = None, embedding_index: Optional[EmbeddingIndex] = None, hudi_precombine_key: Optional[str] = None, features: Optional[List[feature.Feature]] = None, @@ -648,6 +649,7 @@ def plus_two(value): time_travel_format=time_travel_format, partition_key=partition_key or [], primary_key=primary_key or [], + foreign_key=foreign_key or [], hudi_precombine_key=hudi_precombine_key, featurestore_id=self._id, featurestore_name=self._name, @@ -679,6 +681,7 @@ def get_or_create_feature_group( time_travel_format: Optional[str] = "HUDI", partition_key: Optional[List[str]] = None, primary_key: Optional[List[str]] = None, + foreign_key: 
Optional[List[str]] = None, embedding_index: Optional[EmbeddingIndex] = None, hudi_precombine_key: Optional[str] = None, features: Optional[List[feature.Feature]] = None, @@ -820,6 +823,7 @@ def get_or_create_feature_group( time_travel_format=time_travel_format, partition_key=partition_key or [], primary_key=primary_key or [], + foreign_key=foreign_key or [], embedding_index=embedding_index, hudi_precombine_key=hudi_precombine_key, featurestore_id=self._id, @@ -855,6 +859,7 @@ def create_on_demand_feature_group( version: Optional[int] = None, description: Optional[str] = "", primary_key: Optional[List[str]] = None, + foreign_key: Optional[List[str]] = None, features: Optional[List[feature.Feature]] = None, statistics_config: Optional[Union[StatisticsConfig, bool, dict]] = None, event_time: Optional[str] = None, @@ -943,6 +948,7 @@ def create_on_demand_feature_group( version=version, description=description, primary_key=primary_key or [], + foreign_key=foreign_key or [], featurestore_id=self._id, featurestore_name=self._name, features=features or [], @@ -967,6 +973,7 @@ def create_external_feature_group( version: Optional[int] = None, description: Optional[str] = "", primary_key: Optional[List[str]] = None, + foreign_key: Optional[List[str]] = None, embedding_index: Optional[EmbeddingIndex] = None, features: Optional[List[feature.Feature]] = None, statistics_config: Optional[Union[StatisticsConfig, bool, dict]] = None, @@ -1101,6 +1108,7 @@ def create_external_feature_group( version=version, description=description, primary_key=primary_key or [], + foreign_key=foreign_key or [], embedding_index=embedding_index, featurestore_id=self._id, featurestore_name=self._name, @@ -1123,6 +1131,7 @@ def get_or_create_spine_group( version: Optional[int] = None, description: Optional[str] = "", primary_key: Optional[List[str]] = None, + foreign_key: Optional[List[str]] = None, event_time: Optional[str] = None, features: Optional[List[feature.Feature]] = None, dataframe: Union[ @@ -1249,6 +1258,7 @@ def get_or_create_spine_group( version=version, description=description, primary_key=primary_key or [], + foreign_key=foreign_key or [], event_time=event_time, features=features or [], dataframe=dataframe, diff --git a/python/hsfs/feature_view.py b/python/hsfs/feature_view.py index e30002775..0a25286f8 100644 --- a/python/hsfs/feature_view.py +++ b/python/hsfs/feature_view.py @@ -1471,9 +1471,9 @@ def create_training_data( td, write_options or {}, spine=spine, - primary_keys=kwargs.get("primary_keys") or primary_key, - event_time=event_time, - training_helper_columns=training_helper_columns, + #primary_keys=kwargs.get("primary_keys") or primary_key, + #event_time=event_time, + #training_helper_columns=training_helper_columns, ) warnings.warn( "Incremented version to `{}`.".format(td.version), @@ -1759,9 +1759,9 @@ def create_train_test_split( td, write_options or {}, spine=spine, - primary_keys=kwargs.get("primary_keys") or primary_key, - event_time=event_time, - training_helper_columns=training_helper_columns, + #primary_keys=kwargs.get("primary_keys") or primary_key, + #event_time=event_time, + #training_helper_columns=training_helper_columns, ) warnings.warn( "Incremented version to `{}`.".format(td.version), @@ -2043,9 +2043,9 @@ def create_train_validation_test_split( td, write_options or {}, spine=spine, - primary_keys=kwargs.get("primary_keys") or primary_key, - event_time=event_time, - training_helper_columns=training_helper_columns, + #primary_keys=kwargs.get("primary_keys") or primary_key, + 
#event_time=event_time, + #training_helper_columns=training_helper_columns, ) warnings.warn( "Incremented version to `{}`.".format(td.version), From 368599806b90d7d13bb80550b297de7418163813 Mon Sep 17 00:00:00 2001 From: davitbzh Date: Wed, 13 Nov 2024 19:45:02 +0100 Subject: [PATCH 2/8] FSTORE-1436 --- python/hsfs/constructor/query.py | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/python/hsfs/constructor/query.py b/python/hsfs/constructor/query.py index 2fa4ae8d0..1a58b90e5 100644 --- a/python/hsfs/constructor/query.py +++ b/python/hsfs/constructor/query.py @@ -720,7 +720,12 @@ def _get_feature_by_name( query_features[feat.name] = query_features.get(feat.name, []) + [ feature_entry ] - for join_obj in self.joins: + + # collect joins. we do it recursively to collect nested joins. + joins = set(self.joins) + [self._fg_rec_add_joins(q_join, joins) for q_join in self.joins] + + for join_obj in joins: for feat in join_obj.query._left_features: feature_entry = ( feat, @@ -815,17 +820,30 @@ def get_feature(self, feature_name: str) -> Feature: """ return self._get_feature_by_name(feature_name)[0] - def _fg_rec_add(self, join_object, featuregroups): + def _fg_rec_add_joins(self, join_object, joins): + """ + Recursively collect join objects from nested joins and add them to the joins list. + + # Arguments + join_object: `Join object`. + """ + if len(join_object.query.joins) > 0: + for nested_join in join_object.query.joins: + self._fg_rec_add_joins(nested_join, joins) + for join in join_object.query.joins: + joins.add(join) + + def _fg_rec_add(self, join_object, feature_groups): """ - Recursively get a feature groups from nested join and add to featuregroups list. + Recursively collect feature groups from nested joins and add them to the feature_groups list. # Arguments join_object: `Join object`. """ if len(join_object.query.joins) > 0: for nested_join in join_object.query.joins: - self._fg_rec_add(nested_join, featuregroups) - featuregroups.add(join_object.query._left_feature_group) + self._fg_rec_add(nested_join, feature_groups) + feature_groups.add(join_object.query._left_feature_group) def __getattr__(self, name: str) -> Any: try: From 43d7acf53b621556fdb88f96aecf920e1b933f27 Mon Sep 17 00:00:00 2001 From: davitbzh Date: Mon, 18 Nov 2024 17:50:09 +0100 Subject: [PATCH 3/8] FSTORE-1436 --- python/hsfs/feature_group.py | 24 +++++----- python/hsfs/feature_store.py | 9 ++++ python/hsfs/feature_view.py | 44 ------------------- .../test_external_feature_group_engine.py | 1 + .../tests/core/test_feature_group_engine.py | 36 +++++++++++++++ python/tests/test_feature_group.py | 21 +++++++-- 6 files changed, 75 insertions(+), 60 deletions(-) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index cd6431686..67dee693b 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -262,6 +262,7 @@ def select_all( self, include_primary_key: Optional[bool] = True, include_foreign_key: Optional[bool] = True, + include_partition_key: Optional[bool] = True, include_event_time: Optional[bool] = True, ) -> query.Query: """Select all features along with primary key and event time from the feature group and return a query object. @@ -310,6 +311,10 @@ def select_all( # Arguments include_primary_key: If True, include primary key of the feature group to the feature list. Defaults to True. + include_foreign_key: If True, include foreign key of the feature group + to the feature list. Defaults to True.
+ include_partition_key: If True, include partition key of the feature group + to the feature list. Defaults to True. include_event_time: If True, include event time of the feature group to the feature list. Defaults to True. # Returns @@ -323,13 +328,15 @@ def select_all( feature_store_id=self._feature_store_id, ) elif include_event_time: - return self.select_except(self.primary_key + self.foreign_key) + return self.select_except(self.primary_key + self.foreign_key + self.partition_key) elif include_primary_key: - return self.select_except(self.foreign_key + [self.event_time]) + return self.select_except(self.foreign_key + self.partition_key + [self.event_time]) elif include_foreign_key: - return self.select_except(self.primary_key + [self.event_time]) - else: + return self.select_except(self.primary_key + self.partition_key + [self.event_time]) + elif include_partition_key: return self.select_except(self.primary_key + self.foreign_key + [self.event_time]) + else: + return self.select_except(self.primary_key + self.partition_key + self.foreign_key + [self.event_time]) def select_features( self, @@ -428,7 +435,7 @@ def select_features( # Returns `Query`. A query object with all features of the feature group. """ - query = self.select_except(self.primary_key + self.foreign_key + [self.event_time]) + query = self.select_except(self.primary_key + self.partition_key + self.foreign_key + [self.event_time]) _logger.info( f"Using {[f.name for f in query.features]} as features for the query." "To include primary key and event time use `select_all`." @@ -4334,7 +4341,6 @@ def __init__( version: Optional[int] = None, description: Optional[str] = None, primary_key: Optional[List[str]] = None, - foreign_key: Optional[List[str]] = None, featurestore_id: Optional[int] = None, featurestore_name: Optional[str] = None, created: Optional[str] = None, @@ -4412,14 +4418,8 @@ def __init__( if self._features else [] ) - self.foreign_key = ( - [feat.name for feat in self._features if feat.foreign is True] - if self._features - else [] - ) else: self.primary_key = primary_key - self.foreign_key = foreign_key self._features = features self._href = href diff --git a/python/hsfs/feature_store.py b/python/hsfs/feature_store.py index 8be0e1bea..8958aadb9 100644 --- a/python/hsfs/feature_store.py +++ b/python/hsfs/feature_store.py @@ -751,6 +751,9 @@ def get_or_create_feature_group( feature group. This primary key can be a composite key of multiple features and will be used as joining key, if not specified otherwise. Defaults to empty list `[]`, and the feature group won't have any primary key. + foreign_key: A list of feature names to be used as foreign key for the feature group. + Foreign key is referencing the primary key of another feature group and can be used as joining key. + Defaults to empty list `[]`, and the feature group won't have any foreign key. embedding_index: [`EmbeddingIndex`](./embedding_index_api.md). If an embedding index is provided, the vector database is used as online feature store. This enables similarity search by using [`find_neighbors`](./feature_group_api.md#find_neighbors). @@ -906,6 +909,9 @@ def create_on_demand_feature_group( feature group. This primary key can be a composite key of multiple features and will be used as joining key, if not specified otherwise. Defaults to empty list `[]`, and the feature group won't have any primary key. + foreign_key: A list of feature names to be used as foreign key for the feature group. 
+ Foreign key is referencing the primary key of another feature group and can be used as joining key. + Defaults to empty list `[]`, and the feature group won't have any foreign key. features: Optionally, define the schema of the external feature group manually as a list of `Feature` objects. Defaults to empty list `[]` and will use the schema information of the DataFrame resulting by executing the provided query @@ -1065,6 +1071,9 @@ def create_external_feature_group( feature group. This primary key can be a composite key of multiple features and will be used as joining key, if not specified otherwise. Defaults to empty list `[]`, and the feature group won't have any primary key. + foreign_key: A list of feature names to be used as foreign key for the feature group. + Foreign key is referencing the primary key of another feature group and can be used as joining key. + Defaults to empty list `[]`, and the feature group won't have any foreign key. features: Optionally, define the schema of the external feature group manually as a list of `Feature` objects. Defaults to empty list `[]` and will use the schema information of the DataFrame resulting by executing the provided query diff --git a/python/hsfs/feature_view.py b/python/hsfs/feature_view.py index 0a25286f8..faeb57105 100644 --- a/python/hsfs/feature_view.py +++ b/python/hsfs/feature_view.py @@ -1269,9 +1269,6 @@ def create_training_data( statistics_config: Optional[Union[StatisticsConfig, bool, dict]] = None, write_options: Optional[Dict[Any, Any]] = None, spine: Optional[SplineDataFrameTypes] = None, - primary_key: bool = False, - event_time: bool = False, - training_helper_columns: bool = False, **kwargs, ) -> Tuple[int, job.Job]: """Create the metadata for a training dataset and save the corresponding training data into `location`. @@ -1436,14 +1433,6 @@ def create_training_data( It is possible to directly pass a spine group instead of a dataframe to overwrite the left side of the feature join, however, the same features as in the original feature group that is being replaced need to be available in the spine group. - primary_key: whether to include primary key features or not. Defaults to `False`, no primary key - features. - event_time: whether to include event time feature or not. Defaults to `False`, no event time feature. - training_helper_columns: whether to include training helper columns or not. Training helper columns are a - list of feature names in the feature view, defined during its creation, that are not the part of the - model schema itself but can be used during training as a helper for extra information. - If training helper columns were not defined in the feature view then`training_helper_columns=True` - will not have any effect. Defaults to `False`, no training helper columns. # Returns (td_version, `Job`): Tuple of training dataset version and job. 
When using the `python` engine, it returns the Hopsworks Job @@ -1471,9 +1460,6 @@ def create_training_data( td, write_options or {}, spine=spine, - #primary_keys=kwargs.get("primary_keys") or primary_key, - #event_time=event_time, - #training_helper_columns=training_helper_columns, ) warnings.warn( "Incremented version to `{}`.".format(td.version), @@ -1502,9 +1488,6 @@ def create_train_test_split( statistics_config: Optional[Union[StatisticsConfig, bool, dict]] = None, write_options: Optional[Dict[Any, Any]] = None, spine: Optional[SplineDataFrameTypes] = None, - primary_key: bool = False, - event_time: bool = False, - training_helper_columns: bool = False, **kwargs, ) -> Tuple[int, job.Job]: """Create the metadata for a training dataset and save the corresponding training data into `location`. @@ -1715,15 +1698,6 @@ def create_train_test_split( It is possible to directly pass a spine group instead of a dataframe to overwrite the left side of the feature join, however, the same features as in the original feature group that is being replaced need to be available in the spine group. - primary_key: whether to include primary key features or not. Defaults to `False`, no primary key - features. - event_time: whether to include event time feature or not. Defaults to `False`, no event time feature. - training_helper_columns: whether to include training helper columns or not. - Training helper columns are a list of feature names in the feature view, defined during its creation, - that are not the part of the model schema itself but can be used during training as a helper for - extra information. If training helper columns were not defined in the feature view - then`training_helper_columns=True` will not have any effect. Defaults to `False`, no training helper - columns. # Returns (td_version, `Job`): Tuple of training dataset version and job. When using the `python` engine, it returns the Hopsworks Job @@ -1759,9 +1733,6 @@ def create_train_test_split( td, write_options or {}, spine=spine, - #primary_keys=kwargs.get("primary_keys") or primary_key, - #event_time=event_time, - #training_helper_columns=training_helper_columns, ) warnings.warn( "Incremented version to `{}`.".format(td.version), @@ -1792,9 +1763,6 @@ def create_train_validation_test_split( statistics_config: Optional[Union[StatisticsConfig, bool, dict]] = None, write_options: Optional[Dict[Any, Any]] = None, spine: Optional[SplineDataFrameTypes] = None, - primary_key: bool = False, - event_time: bool = False, - training_helper_columns: bool = False, **kwargs, ) -> Tuple[int, job.Job]: """Create the metadata for a training dataset and save the corresponding training data into `location`. @@ -1991,15 +1959,6 @@ def create_train_validation_test_split( It is possible to directly pass a spine group instead of a dataframe to overwrite the left side of the feature join, however, the same features as in the original feature group that is being replaced need to be available in the spine group. - primary_key: whether to include primary key features or not. Defaults to `False`, no primary key - features. - event_time: whether to include event time feature or not. Defaults to `False`, no event time feature. - training_helper_columns: whether to include training helper columns or not. - Training helper columns are a list of feature names in the feature view, defined during its creation, - that are not the part of the model schema itself but can be used during training as a helper for - extra information. 
If training helper columns were not defined in the feature view - then`training_helper_columns=True` will not have any effect. Defaults to `False`, no training helper - columns. # Returns (td_version, `Job`): Tuple of training dataset version and job. When using the `python` engine, it returns the Hopsworks Job @@ -2043,9 +2002,6 @@ def create_train_validation_test_split( td, write_options or {}, spine=spine, - #primary_keys=kwargs.get("primary_keys") or primary_key, - #event_time=event_time, - #training_helper_columns=training_helper_columns, ) warnings.warn( "Incremented version to `{}`.".format(td.version), diff --git a/python/tests/core/test_external_feature_group_engine.py b/python/tests/core/test_external_feature_group_engine.py index aa208b4e8..c7c29cb7b 100644 --- a/python/tests/core/test_external_feature_group_engine.py +++ b/python/tests/core/test_external_feature_group_engine.py @@ -74,6 +74,7 @@ def test_save_primary_key(self, mocker): version=1, featurestore_id=feature_store_id, primary_key=["f"], + foreign_key=[], storage_connector=mocker.patch("hsfs.storage_connector.JdbcConnector"), ) diff --git a/python/tests/core/test_feature_group_engine.py b/python/tests/core/test_feature_group_engine.py index e57f2c0c3..b2609a75c 100644 --- a/python/tests/core/test_feature_group_engine.py +++ b/python/tests/core/test_feature_group_engine.py @@ -42,6 +42,7 @@ def test_save(self, mocker): version=1, featurestore_id=feature_store_id, primary_key=[], + foreign_key=[], partition_key=[], id=10, ) @@ -80,6 +81,7 @@ def test(feature): version=1, featurestore_id=feature_store_id, primary_key=[], + foreign_key=[], partition_key=[], transformation_functions=[test], id=10, @@ -121,6 +123,7 @@ def test_save_ge_report(self, mocker): version=1, featurestore_id=feature_store_id, primary_key=[], + foreign_key=[], partition_key=[], id=10, ) @@ -169,6 +172,7 @@ def test_insert(self, mocker): version=1, featurestore_id=feature_store_id, primary_key=[], + foreign_key=[], partition_key=[], ) @@ -215,6 +219,7 @@ def test(feature): featurestore_id=feature_store_id, transformation_functions=[test], primary_key=[], + foreign_key=[], partition_key=[], ) @@ -260,6 +265,7 @@ def test_insert_id(self, mocker): version=1, featurestore_id=feature_store_id, primary_key=[], + foreign_key=[], partition_key=[], id=10, ) @@ -308,6 +314,7 @@ def test_insert_ge_report(self, mocker): version=1, featurestore_id=feature_store_id, primary_key=[], + foreign_key=[], partition_key=[], ) @@ -360,6 +367,7 @@ def test_insert_storage(self, mocker): version=1, featurestore_id=feature_store_id, primary_key=[], + foreign_key=[], partition_key=[], ) @@ -405,6 +413,7 @@ def test_insert_overwrite(self, mocker): version=1, featurestore_id=feature_store_id, primary_key=[], + foreign_key=[], partition_key=[], ) @@ -438,6 +447,7 @@ def test_delete(self, mocker): version=1, featurestore_id=feature_store_id, primary_key=[], + foreign_key=[], partition_key=[], id=10, ) @@ -466,6 +476,7 @@ def test_commit_details(self, mocker): version=1, featurestore_id=feature_store_id, primary_key=[], + foreign_key=[], partition_key=[], id=10, ) @@ -499,6 +510,7 @@ def test_commit_details_time_travel_format(self, mocker): version=1, featurestore_id=feature_store_id, primary_key=[], + foreign_key=[], partition_key=[], time_travel_format="wrong", id=10, @@ -533,6 +545,7 @@ def test_commit_details_time_travel_format_hudi(self, mocker): version=1, featurestore_id=feature_store_id, primary_key=[], + foreign_key=[], partition_key=[], time_travel_format="HUDI", id=10, 
@@ -564,6 +577,7 @@ def test_commit_details_time_travel_format_hudi_fg_commit(self, mocker): version=1, featurestore_id=feature_store_id, primary_key=[], + foreign_key=[], partition_key=[], time_travel_format="HUDI", id=10, @@ -608,6 +622,7 @@ def test_commit_delete(self, mocker): version=1, featurestore_id=feature_store_id, primary_key=[], + foreign_key=[], partition_key=[], id=10, ) @@ -634,6 +649,7 @@ def test_clean_delta(self, mocker): version=1, featurestore_id=feature_store_id, primary_key=[], + foreign_key=[], partition_key=[], id=10, time_travel_format="DELTA", @@ -658,6 +674,7 @@ def test_clean_hudi(self, mocker): version=1, featurestore_id=feature_store_id, primary_key=[], + foreign_key=[], partition_key=[], id=10, time_travel_format="HUDI", @@ -734,6 +751,7 @@ def test_update_features_metadata(self, mocker): version=1, featurestore_id=feature_store_id, primary_key=[], + foreign_key=[], partition_key=[], id=10, ) @@ -790,6 +808,7 @@ def test_append_features(self, mocker): version=1, featurestore_id=feature_store_id, primary_key=[], + foreign_key=[], partition_key=[], features=[f, f1], id=10, @@ -822,6 +841,7 @@ def test_update_description(self, mocker): version=1, featurestore_id=feature_store_id, primary_key=[], + foreign_key=[], partition_key=[], id=10, ) @@ -848,6 +868,7 @@ def test_get_subject(self, mocker): version=1, featurestore_id=feature_store_id, primary_key=[], + foreign_key=[], partition_key=[], id=10, ) @@ -881,6 +902,7 @@ def test_insert_stream(self, mocker): version=1, featurestore_id=feature_store_id, primary_key=[], + foreign_key=[], partition_key=[], id=10, ) @@ -931,6 +953,7 @@ def test_insert_stream_online_enabled(self, mocker): version=1, featurestore_id=feature_store_id, primary_key=[], + foreign_key=[], partition_key=[], online_enabled=True, ) @@ -980,6 +1003,7 @@ def test_insert_stream_stream(self, mocker): version=1, featurestore_id=feature_store_id, primary_key=[], + foreign_key=[], partition_key=[], stream=True, ) @@ -1028,6 +1052,7 @@ def test(feature): version=1, featurestore_id=feature_store_id, primary_key=[], + foreign_key=[], partition_key=[], transformation_functions=[test], stream=True, @@ -1078,6 +1103,7 @@ def test_insert_stream_online_enabled_id(self, mocker): version=1, featurestore_id=feature_store_id, primary_key=[], + foreign_key=[], partition_key=[], online_enabled=True, id=10, @@ -1277,6 +1303,7 @@ def test_save_feature_group_metadata(self, mocker): version=1, featurestore_id=feature_store_id, primary_key=[], + foreign_key=[], partition_key=[], id=10, ) @@ -1325,6 +1352,7 @@ def test_save_feature_group_metadata_features(self, mocker): version=1, featurestore_id=feature_store_id, primary_key=[], + foreign_key=[], partition_key=[], features=[f], id=10, @@ -1374,6 +1402,7 @@ def test_save_feature_group_metadata_primary_partition_precombine(self, mocker): version=1, featurestore_id=feature_store_id, primary_key=["f"], + foreign_key=[], partition_key=["f"], hudi_precombine_key="f", time_travel_format="HUDI", @@ -1426,6 +1455,7 @@ def test_save_feature_group_metadata_primary_partition_precombine_event_error( version=1, featurestore_id=feature_store_id, primary_key=["feature_name"], + foreign_key=[], partition_key=["f"], hudi_precombine_key="f", event_time="f", @@ -1442,6 +1472,7 @@ def test_save_feature_group_metadata_primary_partition_precombine_event_error( version=1, featurestore_id=feature_store_id, primary_key=["f"], + foreign_key=[], partition_key=["feature_name"], hudi_precombine_key="f", event_time="f", @@ -1458,6 +1489,7 @@ def 
test_save_feature_group_metadata_primary_partition_precombine_event_error( version=1, featurestore_id=feature_store_id, primary_key=["f"], + foreign_key=[], partition_key=["f"], hudi_precombine_key="feature_name", event_time="f", @@ -1474,6 +1506,7 @@ def test_save_feature_group_metadata_primary_partition_precombine_event_error( version=1, featurestore_id=feature_store_id, primary_key=["f"], + foreign_key=[], partition_key=["f"], hudi_precombine_key="f", event_time="feature_name", @@ -1530,6 +1563,7 @@ def test_save_feature_group_metadata_write_options(self, mocker): version=1, featurestore_id=feature_store_id, primary_key=[], + foreign_key=[], partition_key=[], stream=True, id=10, @@ -1576,6 +1610,7 @@ def test(feature): version=1, featurestore_id=feature_store_id, primary_key=[], + foreign_key=[], partition_key=[], id=10, transformation_functions=[test("col2")], @@ -1618,6 +1653,7 @@ def test(feature): version=1, featurestore_id=feature_store_id, primary_key=[], + foreign_key=[], partition_key=[], id=10, transformation_functions=[test("col2")], diff --git a/python/tests/test_feature_group.py b/python/tests/test_feature_group.py index ea25bbff3..873483a81 100644 --- a/python/tests/test_feature_group.py +++ b/python/tests/test_feature_group.py @@ -46,11 +46,13 @@ time_travel_format="HUDI", partition_key=[], primary_key=["pk"], + foreign_key=["fk"], hudi_precombine_key="pk", featurestore_id=1, featurestore_name="fs", features=[ feature.Feature("pk", primary=True), + feature.Feature("fk", foreign=False), feature.Feature("ts", primary=False), feature.Feature("f1", primary=False), feature.Feature("f2", primary=False), @@ -342,8 +344,8 @@ def test_constructor_with_list_event_time_for_compatibility( def test_select_all(self): query = test_feature_group.select_all() features = query.features - assert len(features) == 4 - assert set([f.name for f in features]) == {"pk", "ts", "f1", "f2"} + assert len(features) == 5 + assert set([f.name for f in features]) == {"pk", "fk", "ts", "f1", "f2"} def test_select_all_exclude_pk(self): query = test_feature_group.select_all(include_primary_key=False) @@ -362,8 +364,8 @@ def test_select_all_exclude_pk_ts(self): include_primary_key=False, include_event_time=False ) features = query.features - assert len(features) == 2 - assert set([f.name for f in features]) == {"f1", "f2"} + assert len(features) == 3 + assert set([f.name for f in features]) == {"f1", "f2", "fk"} def test_select_features(self): query = test_feature_group.select_features() @@ -382,6 +384,7 @@ def test_materialization_job(self, mocker): version=2, featurestore_id=99, primary_key=[], + foreign_key=[], partition_key=[], id=10, ) @@ -427,6 +430,7 @@ def test_materialization_job_retry_success(self, mocker): version=2, featurestore_id=99, primary_key=[], + foreign_key=[], partition_key=[], id=10, ) @@ -461,6 +465,7 @@ def test_materialization_job_retry_fail(self, mocker): version=2, featurestore_id=99, primary_key=[], + foreign_key=[], partition_key=[], id=10, ) @@ -479,6 +484,7 @@ def test_multi_part_insert_return_writer(self, mocker): version=2, featurestore_id=99, primary_key=[], + foreign_key=[], partition_key=[], id=10, ) @@ -498,6 +504,7 @@ def test_multi_part_insert_call_insert(self, mocker, dataframe_fixture_basic): version=2, featurestore_id=99, primary_key=[], + foreign_key=[], partition_key=[], id=10, ) @@ -523,6 +530,7 @@ def test_save_feature_list(self, mocker): version=2, featurestore_id=99, primary_key=[], + foreign_key=[], partition_key=[], ) @@ -547,6 +555,7 @@ def 
test_save_feature_in_create(self, mocker): featurestore_id=99, features=features, primary_key=[], + foreign_key=[], partition_key=[], ) @@ -559,6 +568,7 @@ def test_save_exception_empty_input(self): version=2, featurestore_id=99, primary_key=[], + foreign_key=[], partition_key=[], ) @@ -585,6 +595,7 @@ def test_save_with_non_feature_list(self, mocker): version=2, featurestore_id=99, primary_key=[], + foreign_key=[], partition_key=[], ) @@ -611,6 +622,7 @@ def test_save_report_true_default(self, mocker, dataframe_fixture_basic): version=2, featurestore_id=99, primary_key=[], + foreign_key=[], partition_key=[], id=10, ) @@ -644,6 +656,7 @@ def test_save_report_default_overwritable(self, mocker, dataframe_fixture_basic) version=2, featurestore_id=99, primary_key=[], + foreign_key=[], partition_key=[], id=10, ) From f8ed8226a85c2b0d762de812043e376744c4ba6f Mon Sep 17 00:00:00 2001 From: davitbzh Date: Wed, 20 Nov 2024 10:29:17 +0100 Subject: [PATCH 4/8] reformat --- python/hopsworks_common/core/dataset_api.py | 50 ++++++++++--- python/hsfs/core/delta_engine.py | 32 +++------ python/hsfs/core/feature_group_api.py | 4 +- python/hsfs/core/hudi_engine.py | 14 ++-- python/hsfs/engine/python.py | 20 ++++-- python/hsfs/engine/spark.py | 72 ++++++++++--------- python/hsfs/feature_group.py | 37 +++++++--- python/hsfs/feature_store.py | 8 ++- python/hsfs/storage_connector.py | 4 +- python/hsml/core/hdfs_api.py | 1 - python/tests/core/test_arrow_flight_client.py | 2 +- .../tests/core/test_feature_group_engine.py | 4 +- python/tests/fixtures/model_fixtures.py | 1 + python/tests/test_feature_group.py | 12 +++- 14 files changed, 155 insertions(+), 106 deletions(-) diff --git a/python/hopsworks_common/core/dataset_api.py b/python/hopsworks_common/core/dataset_api.py index f7ce40743..644412d32 100644 --- a/python/hopsworks_common/core/dataset_api.py +++ b/python/hopsworks_common/core/dataset_api.py @@ -208,8 +208,10 @@ def upload( if self.exists(destination_path): if overwrite: - if 'datasetType' in self._get(destination_path): - raise DatasetException("overwrite=True not supported on a top-level dataset") + if "datasetType" in self._get(destination_path): + raise DatasetException( + "overwrite=True not supported on a top-level dataset" + ) else: self.remove(destination_path) else: @@ -240,7 +242,14 @@ def upload( # uploading files in the same folder is done concurrently futures = [ executor.submit( - self._upload_file, f_name, root + os.sep + f_name, remote_base_path, chunk_size, simultaneous_chunks, max_chunk_retries, chunk_retry_interval + self._upload_file, + f_name, + root + os.sep + f_name, + remote_base_path, + chunk_size, + simultaneous_chunks, + max_chunk_retries, + chunk_retry_interval, ) for f_name in files ] @@ -252,13 +261,28 @@ def upload( except Exception as e: raise e else: - self._upload_file(file_name, local_path, upload_path, chunk_size, simultaneous_chunks, max_chunk_retries, chunk_retry_interval) + self._upload_file( + file_name, + local_path, + upload_path, + chunk_size, + simultaneous_chunks, + max_chunk_retries, + chunk_retry_interval, + ) return upload_path + "/" + os.path.basename(local_path) - - def _upload_file(self, file_name, local_path, upload_path, chunk_size, simultaneous_chunks, max_chunk_retries, chunk_retry_interval): - + def _upload_file( + self, + file_name, + local_path, + upload_path, + chunk_size, + simultaneous_chunks, + max_chunk_retries, + chunk_retry_interval, + ): file_size = os.path.getsize(local_path) num_chunks = math.ceil(file_size / chunk_size) @@ -508,8 
+532,10 @@ def copy(self, source_path: str, destination_path: str, overwrite: bool = False) """ if self.exists(destination_path): if overwrite: - if 'datasetType' in self._get(destination_path): - raise DatasetException("overwrite=True not supported on a top-level dataset") + if "datasetType" in self._get(destination_path): + raise DatasetException( + "overwrite=True not supported on a top-level dataset" + ) else: self.remove(destination_path) else: @@ -551,8 +577,10 @@ def move(self, source_path: str, destination_path: str, overwrite: bool = False) """ if self.exists(destination_path): if overwrite: - if 'datasetType' in self._get(destination_path): - raise DatasetException("overwrite=True not supported on a top-level dataset") + if "datasetType" in self._get(destination_path): + raise DatasetException( + "overwrite=True not supported on a top-level dataset" + ) else: self.remove(destination_path) else: diff --git a/python/hsfs/core/delta_engine.py b/python/hsfs/core/delta_engine.py index 452052c54..c3f4052e9 100644 --- a/python/hsfs/core/delta_engine.py +++ b/python/hsfs/core/delta_engine.py @@ -57,9 +57,7 @@ def register_temporary_table(self, delta_fg_alias, read_options): delta_options = self._setup_delta_read_opts(delta_fg_alias, read_options) self._spark_session.read.format(self.DELTA_SPARK_FORMAT).options( **delta_options - ).load(location).createOrReplaceTempView( - delta_fg_alias.alias - ) + ).load(location).createOrReplaceTempView(delta_fg_alias.alias) def _setup_delta_read_opts(self, delta_fg_alias, read_options): delta_options = {} @@ -89,16 +87,12 @@ def _setup_delta_read_opts(self, delta_fg_alias, read_options): def delete_record(self, delete_df): location = self._feature_group.prepare_spark_location() - if not DeltaTable.isDeltaTable( - self._spark_session, location - ): + if not DeltaTable.isDeltaTable(self._spark_session, location): raise FeatureStoreException( f"This is no data available in Feature group {self._feature_group.name}, or it not DELTA enabled " ) else: - fg_source_table = DeltaTable.forPath( - self._spark_session, location - ) + fg_source_table = DeltaTable.forPath(self._spark_session, location) source_alias = ( f"{self._feature_group.name}_{self._feature_group.version}_source" @@ -112,9 +106,7 @@ def delete_record(self, delete_df): delete_df.alias(updates_alias), merge_query_str ).whenMatchedDelete().execute() - fg_commit = self._get_last_commit_metadata( - self._spark_session, location - ) + fg_commit = self._get_last_commit_metadata(self._spark_session, location) return self._feature_group_api.commit(self._feature_group, fg_commit) def _write_delta_dataset(self, dataset, write_options): @@ -123,9 +115,7 @@ def _write_delta_dataset(self, dataset, write_options): if write_options is None: write_options = {} - if not DeltaTable.isDeltaTable( - self._spark_session, location - ): + if not DeltaTable.isDeltaTable(self._spark_session, location): ( dataset.write.format(DeltaEngine.DELTA_SPARK_FORMAT) .options(**write_options) @@ -138,9 +128,7 @@ def _write_delta_dataset(self, dataset, write_options): .save(location) ) else: - fg_source_table = DeltaTable.forPath( - self._spark_session, location - ) + fg_source_table = DeltaTable.forPath(self._spark_session, location) source_alias = ( f"{self._feature_group.name}_{self._feature_group.version}_source" @@ -154,13 +142,13 @@ def _write_delta_dataset(self, dataset, write_options): dataset.alias(updates_alias), merge_query_str ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute() - return 
self._get_last_commit_metadata( - self._spark_session, location - ) + return self._get_last_commit_metadata(self._spark_session, location) def vacuum(self, retention_hours: int): location = self._feature_group.prepare_spark_location() - retention = f"RETAIN {retention_hours} HOURS" if retention_hours is not None else "" + retention = ( + f"RETAIN {retention_hours} HOURS" if retention_hours is not None else "" + ) self._spark_session.sql(f"VACUUM '{location}' {retention}") def _generate_merge_query(self, source_alias, updates_alias): diff --git a/python/hsfs/core/feature_group_api.py b/python/hsfs/core/feature_group_api.py index 037228c73..49d5f2e05 100644 --- a/python/hsfs/core/feature_group_api.py +++ b/python/hsfs/core/feature_group_api.py @@ -446,9 +446,7 @@ def update_table_schema( headers = {"content-type": "application/json"} return job.Job.from_response_json( - _client._send_request( - "POST", path_params, headers=headers - ), + _client._send_request("POST", path_params, headers=headers), ) def get_parent_feature_groups( diff --git a/python/hsfs/core/hudi_engine.py b/python/hsfs/core/hudi_engine.py index e96b8ea56..34fc58932 100644 --- a/python/hsfs/core/hudi_engine.py +++ b/python/hsfs/core/hudi_engine.py @@ -20,9 +20,13 @@ class HudiEngine: - - HUDI_SPEC_FEATURE_NAMES = ["_hoodie_record_key", "_hoodie_partition_path", - "_hoodie_commit_time", "_hoodie_file_name", "_hoodie_commit_seqno"] + HUDI_SPEC_FEATURE_NAMES = [ + "_hoodie_record_key", + "_hoodie_partition_path", + "_hoodie_commit_time", + "_hoodie_file_name", + "_hoodie_commit_seqno", + ] HUDI_SPARK_FORMAT = "org.apache.hudi" HUDI_TABLE_NAME = "hoodie.table.name" @@ -109,9 +113,7 @@ def register_temporary_table(self, hudi_fg_alias, read_options): hudi_options = self._setup_hudi_read_opts(hudi_fg_alias, read_options) self._spark_session.read.format(self.HUDI_SPARK_FORMAT).options( **hudi_options - ).load(location).createOrReplaceTempView( - hudi_fg_alias.alias - ) + ).load(location).createOrReplaceTempView(hudi_fg_alias.alias) def _write_hudi_dataset(self, dataset, save_mode, operation, write_options): location = self._feature_group.prepare_spark_location() diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index a6de77364..6faa8dea6 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -1203,11 +1203,11 @@ def save_stream_dataframe( "Stream ingestion is not available on Python environments, because it requires Spark as engine." 
) - def update_table_schema(self, feature_group: Union[FeatureGroup, ExternalFeatureGroup]) -> None: + def update_table_schema( + self, feature_group: Union[FeatureGroup, ExternalFeatureGroup] + ) -> None: _job = self._feature_group_api.update_table_schema(feature_group) - _job._wait_for_job( - await_termination=True - ) + _job._wait_for_job(await_termination=True) def _get_app_options( self, user_write_options: Optional[Dict[str, Any]] = None @@ -1516,7 +1516,11 @@ def _write_dataframe_kafka( now = datetime.now(timezone.utc) feature_group.materialization_job.run( args=feature_group.materialization_job.config.get("defaultArgs", "") - + (f" -initialCheckPointString {initial_check_point}" if initial_check_point else ""), + + ( + f" -initialCheckPointString {initial_check_point}" + if initial_check_point + else "" + ), await_termination=offline_write_options.get("wait_for_job", False), ) offline_backfill_every_hr = offline_write_options.pop( @@ -1546,7 +1550,11 @@ def _write_dataframe_kafka( # provide the initial_check_point as it will reduce the read amplification of materialization job feature_group.materialization_job.run( args=feature_group.materialization_job.config.get("defaultArgs", "") - + (f" -initialCheckPointString {initial_check_point}" if initial_check_point else ""), + + ( + f" -initialCheckPointString {initial_check_point}" + if initial_check_point + else "" + ), await_termination=offline_write_options.get("wait_for_job", False), ) return feature_group.materialization_job diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 67e15468b..507be67b3 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -198,7 +198,9 @@ def register_external_temporary_table(self, external_fg, alias): external_fg.query, external_fg.data_format, external_fg.options, - external_fg.storage_connector._get_path(external_fg.path), # cant rely on location since this method can be used before FG is saved + external_fg.storage_connector._get_path( + external_fg.path + ), # cant rely on location since this method can be used before FG is saved ) else: external_dataset = external_fg.dataframe @@ -222,9 +224,7 @@ def register_hudi_temporary_table( read_options, ) - self.reconcile_schema( - hudi_fg_alias, read_options, hudi_engine_instance - ) + self.reconcile_schema(hudi_fg_alias, read_options, hudi_engine_instance) def register_delta_temporary_table( self, delta_fg_alias, feature_store_id, feature_store_name, read_options @@ -242,16 +242,14 @@ def register_delta_temporary_table( read_options, ) - self.reconcile_schema( - delta_fg_alias, read_options, delta_engine_instance - ) + self.reconcile_schema(delta_fg_alias, read_options, delta_engine_instance) - def reconcile_schema( - self, fg_alias, read_options, engine_instance - ): + def reconcile_schema(self, fg_alias, read_options, engine_instance): if sorted(self._spark_session.table(fg_alias.alias).columns) != sorted( - [feature.name for feature in fg_alias.feature_group._features] + - hudi_engine.HudiEngine.HUDI_SPEC_FEATURE_NAMES if fg_alias.feature_group.time_travel_format == "HUDI" else [] + [feature.name for feature in fg_alias.feature_group._features] + + hudi_engine.HudiEngine.HUDI_SPEC_FEATURE_NAMES + if fg_alias.feature_group.time_travel_format == "HUDI" + else [] ): full_fg = feature_group_api.FeatureGroupApi().get( feature_store_id=fg_alias.feature_group._feature_store_id, @@ -644,28 +642,28 @@ def _serialize_to_avro( ) def _deserialize_from_avro( - self, - feature_group: Union[fg_mod.FeatureGroup, 
fg_mod.ExternalFeatureGroup], - dataframe: Union[RDD, DataFrame], - ): - """ - Deserializes 'value' column from binary using avro schema and unpacks it into columns. - """ - decoded_dataframe = dataframe.select( - from_avro("value", feature_group._get_encoded_avro_schema()).alias("value") - ).select(col("value.*")) + self, + feature_group: Union[fg_mod.FeatureGroup, fg_mod.ExternalFeatureGroup], + dataframe: Union[RDD, DataFrame], + ): + """ + Deserializes 'value' column from binary using avro schema and unpacks it into columns. + """ + decoded_dataframe = dataframe.select( + from_avro("value", feature_group._get_encoded_avro_schema()).alias("value") + ).select(col("value.*")) - """Decodes all complex type features from binary using their avro type as schema.""" - return decoded_dataframe.select( - [ - field["name"] - if field["name"] not in feature_group.get_complex_features() - else from_avro( - field["name"], feature_group._get_feature_avro_schema(field["name"]) - ).alias(field["name"]) - for field in json.loads(feature_group.avro_schema)["fields"] - ] - ) + """Decodes all complex type features from binary using their avro type as schema.""" + return decoded_dataframe.select( + [ + field["name"] + if field["name"] not in feature_group.get_complex_features() + else from_avro( + field["name"], feature_group._get_feature_avro_schema(field["name"]) + ).alias(field["name"]) + for field in json.loads(feature_group.avro_schema)["fields"] + ] + ) def get_training_data( self, @@ -1353,7 +1351,9 @@ def _save_empty_dataframe(self, feature_group): for _feature in feature_group.features: if _feature.name not in dataframe.columns: - dataframe = dataframe.withColumn(_feature.name, lit(None).cast(_feature.type)) + dataframe = dataframe.withColumn( + _feature.name, lit(None).cast(_feature.type) + ) self.save_dataframe( feature_group, @@ -1372,7 +1372,9 @@ def _add_cols_to_delta_table(self, feature_group): for _feature in feature_group.features: if _feature.name not in dataframe.columns: - dataframe = dataframe.withColumn(_feature.name, lit(None).cast(_feature.type)) + dataframe = dataframe.withColumn( + _feature.name, lit(None).cast(_feature.type) + ) dataframe.limit(0).write.format("delta").mode("append").option( "mergeSchema", "true" diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 67dee693b..c0763d315 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -328,15 +328,28 @@ def select_all( feature_store_id=self._feature_store_id, ) elif include_event_time: - return self.select_except(self.primary_key + self.foreign_key + self.partition_key) + return self.select_except( + self.primary_key + self.foreign_key + self.partition_key + ) elif include_primary_key: - return self.select_except(self.foreign_key + self.partition_key + [self.event_time]) + return self.select_except( + self.foreign_key + self.partition_key + [self.event_time] + ) elif include_foreign_key: - return self.select_except(self.primary_key + self.partition_key + [self.event_time]) + return self.select_except( + self.primary_key + self.partition_key + [self.event_time] + ) elif include_partition_key: - return self.select_except(self.primary_key + self.foreign_key + [self.event_time]) + return self.select_except( + self.primary_key + self.foreign_key + [self.event_time] + ) else: - return self.select_except(self.primary_key + self.partition_key + self.foreign_key + [self.event_time]) + return self.select_except( + self.primary_key + + self.partition_key + + self.foreign_key + + 
[self.event_time] + ) def select_features( self, @@ -435,7 +448,9 @@ def select_features( # Returns `Query`. A query object with all features of the feature group. """ - query = self.select_except(self.primary_key + self.partition_key + self.foreign_key + [self.event_time]) + query = self.select_except( + self.primary_key + self.partition_key + self.foreign_key + [self.event_time] + ) _logger.info( f"Using {[f.name for f in query.features]} as features for the query." "To include primary key and event time use `select_all`." @@ -2075,7 +2090,7 @@ def storage_connector(self) -> "sc.StorageConnector": def prepare_spark_location(self) -> str: location = self.location - if (self.storage_connector is not None): + if self.storage_connector is not None: location = self.storage_connector.prepare_spark(location) return location @@ -2348,8 +2363,10 @@ def __init__( self._hudi_precombine_key = ( util.autofix_feature_name(hudi_precombine_key) if hudi_precombine_key is not None - and (self._time_travel_format is None - or self._time_travel_format == "HUDI") + and ( + self._time_travel_format is None + or self._time_travel_format == "HUDI" + ) else None ) self.statistics_config = statistics_config @@ -3279,7 +3296,7 @@ def delta_vacuum( self, retention_hours: int = None, ) -> None: - """ Vacuum files that are no longer referenced by a Delta table and are older than the retention threshold. + """Vacuum files that are no longer referenced by a Delta table and are older than the retention threshold. This method can only be used on feature groups stored as DELTA. !!! example diff --git a/python/hsfs/feature_store.py b/python/hsfs/feature_store.py index 8958aadb9..825dcf7c5 100644 --- a/python/hsfs/feature_store.py +++ b/python/hsfs/feature_store.py @@ -527,7 +527,9 @@ def create_feature_group( ] ] = None, offline_backfill_every_hr: Optional[Union[int, str]] = None, - storage_connector: Union[storage_connector.StorageConnector, Dict[str, Any]] = None, + storage_connector: Union[ + storage_connector.StorageConnector, Dict[str, Any] + ] = None, path: Optional[str] = None, ) -> feature_group.FeatureGroup: """Create a feature group metadata object. @@ -702,7 +704,9 @@ def get_or_create_feature_group( ] = None, online_config: Optional[Union[OnlineConfig, Dict[str, Any]]] = None, offline_backfill_every_hr: Optional[Union[int, str]] = None, - storage_connector: Union[storage_connector.StorageConnector, Dict[str, Any]] = None, + storage_connector: Union[ + storage_connector.StorageConnector, Dict[str, Any] + ] = None, path: Optional[str] = None, ) -> Union[ feature_group.FeatureGroup, diff --git a/python/hsfs/storage_connector.py b/python/hsfs/storage_connector.py index 15ccdc8d6..57c45167b 100644 --- a/python/hsfs/storage_connector.py +++ b/python/hsfs/storage_connector.py @@ -137,9 +137,7 @@ def spark_options(self) -> None: pass def prepare_spark(self, path: Optional[str] = None) -> Optional[str]: - _logger.info( - "This Storage Connector cannot be prepared for Spark." 
- ) + _logger.info("This Storage Connector cannot be prepared for Spark.") return path def read( diff --git a/python/hsml/core/hdfs_api.py b/python/hsml/core/hdfs_api.py index d786bce37..657ec04b4 100644 --- a/python/hsml/core/hdfs_api.py +++ b/python/hsml/core/hdfs_api.py @@ -21,7 +21,6 @@ class HdfsApi: def __init__(self): - import fsspec.implementations.arrow as pfs host, port = os.environ["LIBHDFS_DEFAULT_FS"].split(":") diff --git a/python/tests/core/test_arrow_flight_client.py b/python/tests/core/test_arrow_flight_client.py index e67006d67..fb891d92f 100644 --- a/python/tests/core/test_arrow_flight_client.py +++ b/python/tests/core/test_arrow_flight_client.py @@ -431,7 +431,7 @@ def test_construct_query_object_snowflake(self, mocker, backend_fixtures): "connectors": { "test.tpch1snowflake_1": { "time_travel_type": None, - "type": 'SNOWFLAKE', + "type": "SNOWFLAKE", "options": { "user": "test_user", "account": "test_url", diff --git a/python/tests/core/test_feature_group_engine.py b/python/tests/core/test_feature_group_engine.py index b2609a75c..02205e5fa 100644 --- a/python/tests/core/test_feature_group_engine.py +++ b/python/tests/core/test_feature_group_engine.py @@ -820,9 +820,7 @@ def test_append_features(self, mocker): fg_engine.append_features(feature_group=fg, new_features=[f1, f2]) # Assert - assert ( - mock_engine_get_instance.return_value.update_table_schema.call_count == 1 - ) + assert mock_engine_get_instance.return_value.update_table_schema.call_count == 1 assert len(mock_fg_engine_update_features_metadata.call_args[0][1]) == 4 def test_update_description(self, mocker): diff --git a/python/tests/fixtures/model_fixtures.py b/python/tests/fixtures/model_fixtures.py index 9b3796d05..8dfc59833 100644 --- a/python/tests/fixtures/model_fixtures.py +++ b/python/tests/fixtures/model_fixtures.py @@ -66,6 +66,7 @@ def model_tensorflow(): def model_torch(): return TorchModel(MODEL_TORCH_ID, MODEL_TORCH_NAME) + @pytest.fixture def model_llm(): return LLMModel(MODEL_LLM_ID, MODEL_LLM_NAME) diff --git a/python/tests/test_feature_group.py b/python/tests/test_feature_group.py index 873483a81..b8dcc12d0 100644 --- a/python/tests/test_feature_group.py +++ b/python/tests/test_feature_group.py @@ -945,7 +945,9 @@ def test_prepare_spark_location_with_s3_connector(self, mocker, backend_fixtures json = backend_fixtures["feature_group"]["get_basic_info"]["response"] fg = feature_group.FeatureGroup.from_response_json(json) fg._location = f"{fg.name}_{fg.version}" - fg._storage_connector = storage_connector.S3Connector(id=1, name="s3_conn", featurestore_id=fg.feature_store_id) + fg._storage_connector = storage_connector.S3Connector( + id=1, name="s3_conn", featurestore_id=fg.feature_store_id + ) # Act path = fg.prepare_spark_location() @@ -955,7 +957,9 @@ def test_prepare_spark_location_with_s3_connector(self, mocker, backend_fixtures engine_instance.assert_called_once() refetch_api.assert_called_once() - def test_prepare_spark_location_with_s3_connector_python(self, mocker, backend_fixtures): + def test_prepare_spark_location_with_s3_connector_python( + self, mocker, backend_fixtures + ): # Arrange engine = python.Engine() engine_instance = mocker.patch("hsfs.engine.get_instance", return_value=engine) @@ -963,7 +967,9 @@ def test_prepare_spark_location_with_s3_connector_python(self, mocker, backend_f json = backend_fixtures["feature_group"]["get_basic_info"]["response"] fg = feature_group.FeatureGroup.from_response_json(json) fg._location = f"{fg.name}_{fg.version}" - fg._storage_connector = 
storage_connector.S3Connector(id=1, name="s3_conn", featurestore_id=fg.feature_store_id) + fg._storage_connector = storage_connector.S3Connector( + id=1, name="s3_conn", featurestore_id=fg.feature_store_id + ) # Act with pytest.raises(AttributeError): From bd5d29e8d4e5606db9e4c321a57b092fcf53a875 Mon Sep 17 00:00:00 2001 From: davitbzh Date: Wed, 20 Nov 2024 11:52:52 +0100 Subject: [PATCH 5/8] reformat --- python/hsfs/constructor/query.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/hsfs/constructor/query.py b/python/hsfs/constructor/query.py index 1a58b90e5..5c0b8c70d 100644 --- a/python/hsfs/constructor/query.py +++ b/python/hsfs/constructor/query.py @@ -830,8 +830,8 @@ def _fg_rec_add_joins(self, join_object, joins): if len(join_object.query.joins) > 0: for nested_join in join_object.query.joins: self._fg_rec_add_joins(nested_join, joins) - for join in join_object.query.joins: - joins.add(join) + for q_join in join_object.query.joins: + joins.add(q_join) def _fg_rec_add(self, join_object, feature_groups): """ From d705c222e9ed957c46704124133852afb4324c74 Mon Sep 17 00:00:00 2001 From: davitbzh <44586065+davitbzh@users.noreply.github.com> Date: Tue, 3 Dec 2024 16:17:59 +0100 Subject: [PATCH 6/8] Update python/hsfs/feature_group.py Co-authored-by: manu-sj <152865565+manu-sj@users.noreply.github.com> --- python/hsfs/feature_group.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index c0763d315..d426bcf21 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -320,7 +320,7 @@ def select_all( # Returns `Query`. A query object with all features of the feature group. """ - if include_event_time and include_primary_key and include_foreign_key: + if include_event_time and include_primary_key and include_foreign_key and include_partition_key: return query.Query( left_feature_group=self, left_features=self._features, From 7fe58eb8dc762b153992e0e2b2b70029d953e3b2 Mon Sep 17 00:00:00 2001 From: davitbzh <44586065+davitbzh@users.noreply.github.com> Date: Tue, 3 Dec 2024 16:20:26 +0100 Subject: [PATCH 7/8] Update python/tests/test_feature_group.py Co-authored-by: manu-sj <152865565+manu-sj@users.noreply.github.com> --- python/tests/test_feature_group.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/tests/test_feature_group.py b/python/tests/test_feature_group.py index b8dcc12d0..fca9953e8 100644 --- a/python/tests/test_feature_group.py +++ b/python/tests/test_feature_group.py @@ -52,7 +52,7 @@ featurestore_name="fs", features=[ feature.Feature("pk", primary=True), - feature.Feature("fk", foreign=False), + feature.Feature("fk", foreign=True), feature.Feature("ts", primary=False), feature.Feature("f1", primary=False), feature.Feature("f2", primary=False), From 9b168fb48daf326bec74a2dcf4b368a073ce038b Mon Sep 17 00:00:00 2001 From: davitbzh Date: Tue, 3 Dec 2024 21:19:37 +0100 Subject: [PATCH 8/8] format --- python/hsfs/feature_group.py | 38 +++++++++++------------------- python/hsfs/feature_store.py | 6 +++++ python/tests/test_feature_group.py | 14 +++++++---- 3 files changed, 30 insertions(+), 28 deletions(-) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index d426bcf21..8562924a8 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -320,36 +320,26 @@ def select_all( # Returns `Query`. A query object with all features of the feature group. 
""" - if include_event_time and include_primary_key and include_foreign_key and include_partition_key: + removed_keys = [] + + if not include_event_time: + removed_keys += [self.event_time] + if not include_primary_key: + removed_keys += self.primary_key + if not include_foreign_key: + removed_keys += self.foreign_key + if not include_partition_key: + removed_keys += self.partition_key + + if removed_keys: + return self.select_except(removed_keys) + else: return query.Query( left_feature_group=self, left_features=self._features, feature_store_name=self._feature_store_name, feature_store_id=self._feature_store_id, ) - elif include_event_time: - return self.select_except( - self.primary_key + self.foreign_key + self.partition_key - ) - elif include_primary_key: - return self.select_except( - self.foreign_key + self.partition_key + [self.event_time] - ) - elif include_foreign_key: - return self.select_except( - self.primary_key + self.partition_key + [self.event_time] - ) - elif include_partition_key: - return self.select_except( - self.primary_key + self.foreign_key + [self.event_time] - ) - else: - return self.select_except( - self.primary_key - + self.partition_key - + self.foreign_key - + [self.event_time] - ) def select_features( self, diff --git a/python/hsfs/feature_store.py b/python/hsfs/feature_store.py index 825dcf7c5..5a36e85a4 100644 --- a/python/hsfs/feature_store.py +++ b/python/hsfs/feature_store.py @@ -588,6 +588,9 @@ def plus_two(value): feature group. This primary key can be a composite key of multiple features and will be used as joining key, if not specified otherwise. Defaults to empty list `[]`, and the feature group won't have any primary key. + foreign_key: A list of feature names to be used as foreign key for the feature group. + Foreign key is referencing the primary key of another feature group and can be used as joining key. + Defaults to empty list `[]`, and the feature group won't have any foreign key. embedding_index: [`EmbeddingIndex`](./embedding_index_api.md). If an embedding index is provided, vector database is used as online feature store. This enables similarity search by using [`find_neighbors`](./feature_group_api.md#find_neighbors). @@ -1238,6 +1241,9 @@ def get_or_create_spine_group( spine group. This primary key can be a composite key of multiple features and will be used as joining key, if not specified otherwise. Defaults to empty list `[]`, and the spine group won't have any primary key. + foreign_key: A list of feature names to be used as foreign key for the feature group. + Foreign key is referencing the primary key of another feature group and can be used as joining key. + Defaults to empty list `[]`, and the feature group won't have any foreign key. event_time: Optionally, provide the name of the feature containing the event time for the features in this spine group. If event_time is set the spine group can be used for point-in-time joins. Defaults to `None`. 
diff --git a/python/tests/test_feature_group.py b/python/tests/test_feature_group.py index fca9953e8..cdd0a6129 100644 --- a/python/tests/test_feature_group.py +++ b/python/tests/test_feature_group.py @@ -350,14 +350,20 @@ def test_select_all(self): def test_select_all_exclude_pk(self): query = test_feature_group.select_all(include_primary_key=False) features = query.features - assert len(features) == 3 - assert set([f.name for f in features]) == {"ts", "f1", "f2"} + assert len(features) == 4 + assert set([f.name for f in features]) == {"ts", "fk", "f1", "f2"} + + def test_select_all_exclude_fk(self): + query = test_feature_group.select_all(include_foreign_key=False) + features = query.features + assert len(features) == 4 + assert set([f.name for f in features]) == {"f1", "f2", "pk", "ts"} def test_select_all_exclude_ts(self): query = test_feature_group.select_all(include_event_time=False) features = query.features - assert len(features) == 3 - assert set([f.name for f in features]) == {"pk", "f1", "f2"} + assert len(features) == 4 + assert set([f.name for f in features]) == {"pk", "fk", "f1", "f2"} def test_select_all_exclude_pk_ts(self): query = test_feature_group.select_all(