Skip to content

Commit

Permalink
adding alias function to modify output column names
Browse files Browse the repository at this point in the history
  • Loading branch information
manu-sj committed Dec 9, 2024
1 parent 99bbaf5 commit a0ca9ce
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 49 deletions.
8 changes: 8 additions & 0 deletions python/hopsworks_common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ class OPENSEARCH_CONFIG:
CA_CERTS = "ca_certs"


class FEATURES:
"""
Class that stores constants about a feature.
"""

MAX_LENGTH_NAME = 63


class KAFKA_SSL_CONFIG:
"""
Kafka SSL constant strings for configuration
Expand Down
8 changes: 5 additions & 3 deletions python/hsfs/core/feature_monitoring_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import humps
from hopsworks_common.client.exceptions import FeatureStoreException
from hopsworks_common.constants import FEATURES
from hsfs import util
from hsfs.core import (
feature_monitoring_config_engine,
Expand All @@ -34,7 +35,6 @@
from hsfs.core.job_schedule import JobSchedule


MAX_LENGTH_NAME = 63
MAX_LENGTH_DESCRIPTION = 2000


Expand Down Expand Up @@ -686,8 +686,10 @@ def name(self, name: str):
raise AttributeError("The name of a registered config is read-only.")
elif not isinstance(name, str):
raise TypeError("name must be of type str")
if len(name) > MAX_LENGTH_NAME:
raise ValueError("name must be less than {MAX_LENGTH_NAME} characters.")
if len(name) > FEATURES.MAX_LENGTH_NAME:
raise ValueError(
"name must be less than {FEATURES.MAX_LENGTH_NAME} characters."
)
self._name = name

@property
Expand Down
65 changes: 57 additions & 8 deletions python/hsfs/hopsworks_udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import humps
from hopsworks_common.client.exceptions import FeatureStoreException
from hopsworks_common.constants import FEATURES
from hsfs import engine, util
from hsfs.core.feature_descriptive_statistics import FeatureDescriptiveStatistics
from hsfs.decorators import typechecked
Expand Down Expand Up @@ -173,6 +174,7 @@ def __init__(
dropped_argument_names: Optional[List[str]] = None,
dropped_feature_names: Optional[List[str]] = None,
feature_name_prefix: Optional[str] = None,
output_column_names: Optional[str] = None,
):
self._return_types: List[str] = HopsworksUdf._validate_and_convert_output_types(
return_types
Expand All @@ -191,6 +193,12 @@ def __init__(
if isinstance(func, Callable)
else func
)

# The parameter `output_column_names` is initialized lazily.
# It is only initialized if the output column names are retrieved from the backend or explicitly specified using the `alias` function or is initialized with default column names if the UDF is accessed from a transformation function.
# Output column names are only stored in the backend when a model dependent or on demand transformation function is created using the defined UDF.
self._output_column_names: List[str] = []

if not transformation_features:
# New transformation function being declared so extract source code from function
self._transformation_features: List[TransformationFeature] = (
Expand All @@ -211,6 +219,7 @@ def __init__(
)
)
self._dropped_features = self._dropped_argument_names

else:
self._transformation_features = transformation_features
self._transformation_function_argument_names = (
Expand All @@ -222,15 +231,16 @@ def __init__(
if dropped_feature_names
else dropped_argument_names
)
self._output_column_names = (
output_column_names if output_column_names else []
)

self._formatted_function_source, self._module_imports = (
HopsworksUdf._format_source_code(self._function_source)
)

self._statistics: Optional[TransformationStatistics] = None

self._output_column_names: List[str] = []

@staticmethod
def _validate_and_convert_drop_features(
dropped_features: Union[str, List[str]],
Expand Down Expand Up @@ -691,6 +701,41 @@ def __call__(self, *features: List[str]) -> "HopsworksUdf":
udf.dropped_features = updated_dropped_features
return udf

def alias(self, *args: str):
"""
Set the names of the transformed features output by the UDF.
"""
if len(args) == 1 and isinstance(args[0], list):
# If a single list is passed, use it directly
output_col_names = args[0]
else:
# Otherwise, use the individual arguments as a list
output_col_names = list(args)
if any(
not isinstance(output_col_name, str) for output_col_name in output_col_names
):
raise FeatureStoreException(
f"Invalid output feature names provided for the transformation function '{repr(self)}'. Please ensure all arguments are strings."
)

self.output_column_names = output_col_names

return self

def _validate_output_col_name(self, output_col_names):
if any(
len(output_col_name) > FEATURES.MAX_LENGTH_NAME
for output_col_name in output_col_names
):
raise FeatureStoreException(
f"Invalid output feature names specified for the transformation function '{repr(self)}'. Please provide names shorter than {FEATURES.MAX_LENGTH_NAME} characters."
)

if output_col_names and len(output_col_names) != len(self.return_types):
raise FeatureStoreException(
f"Provided names for output columns does not match the number of columns returned from the UDF. Please provide {len(self.return_types)} names."
)

def update_return_type_one_hot(self):
self._return_types = [
self._return_types[0]
Expand Down Expand Up @@ -765,6 +810,7 @@ def to_dict(self) -> Dict[str, Any]:
"name": self.function_name,
"featureNamePrefix": self._feature_name_prefix,
"executionMode": self.execution_mode.value.upper(),
"outputColumnNames": self.output_column_names,
}

def json(self) -> str:
Expand Down Expand Up @@ -826,6 +872,12 @@ def from_response_json(
else None
)

output_column_names = (
[feature.strip() for feature in json_decamelized["output_column_names"]]
if json_decamelized.get("output_column_names", None)
else None
)

# Reconstructing statistics arguments.
arg_list, _, _, _ = HopsworksUdf._parse_function_signature(function_source_code)

Expand Down Expand Up @@ -870,6 +922,7 @@ def from_response_json(
execution_mode=UDFExecutionMode.from_string(
json_decamelized["execution_mode"]
),
output_column_names=output_column_names,
)

# Set transformation features if already set.
Expand Down Expand Up @@ -998,12 +1051,8 @@ def transformation_statistics(
def output_column_names(self, output_col_names: Union[str, List[str]]) -> None:
if not isinstance(output_col_names, List):
output_col_names = [output_col_names]
if not output_col_names and len(output_col_names) != len(self.return_types):
raise FeatureStoreException(
f"Provided names for output columns does not match the number of columns returned from the UDF. Please provide {len(self.return_types)} names."
)
else:
self._output_column_names = output_col_names
self._validate_output_col_name(output_col_names)
self._output_column_names = output_col_names

def __repr__(self):
return f'{self.function_name}({", ".join(self.transformation_features)})'
96 changes: 59 additions & 37 deletions python/hsfs/transformation_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import humps
from hopsworks_common.client.exceptions import FeatureStoreException
from hopsworks_common.constants import FEATURES
from hsfs import util
from hsfs.core import transformation_function_engine
from hsfs.core.feature_descriptive_statistics import FeatureDescriptiveStatistics
Expand Down Expand Up @@ -77,8 +78,13 @@ def __init__(
raise FeatureStoreException(
"Please use the hopsworks_udf decorator when defining transformation functions."
)
if not id and hopsworks_udf.output_column_names:
# Create a copy and reset the output column names of the UDF if the transformation function is newly created and the UDF has output column names assigned already.
# This happens for example if the same udf is used in a on-demand and a model-dependent transformation function.
hopsworks_udf._output_column_names = []
hopsworks_udf = copy.copy(hopsworks_udf)

self._hopsworks_udf: HopsworksUdf = hopsworks_udf
self.__hopsworks_udf: HopsworksUdf = hopsworks_udf
TransformationFunction._validate_transformation_type(
transformation_type=transformation_type, hopsworks_udf=hopsworks_udf
)
Expand Down Expand Up @@ -152,11 +158,8 @@ def __call__(self, *features: List[str]) -> TransformationFunction:
"""
# Deep copy so that the same transformation function can be used to create multiple new transformation function with different features.
transformation = copy.deepcopy(self)
transformation._hopsworks_udf = transformation._hopsworks_udf(*features)
# Regenerate output column names when setting new transformation features.
transformation._hopsworks_udf.output_column_names = (
transformation._get_output_column_names()
)
transformation.__hopsworks_udf = transformation.__hopsworks_udf(*features)

return transformation

@classmethod
Expand Down Expand Up @@ -227,9 +230,17 @@ def to_dict(self) -> Dict[str, Any]:
"id": self._id,
"version": self._version,
"featurestoreId": self._featurestore_id,
"hopsworksUdf": self._hopsworks_udf.to_dict(),
"hopsworksUdf": self.hopsworks_udf.to_dict(),
}

def alias(self, *args: str):
"""
Set the names of the transformed features output by the transformation function.
"""
self.__hopsworks_udf.alias(*args)

return self

def _get_output_column_names(self) -> str:
"""
Function that generates feature names for the transformed features
Expand All @@ -240,33 +251,43 @@ def _get_output_column_names(self) -> str:
# If function name matches the name of an input feature and the transformation function only returns one output feature then
# then the transformed output feature would have the same name as the input feature. i.e the input feature will get overwritten.
if (
len(self._hopsworks_udf.return_types) == 1
len(self.__hopsworks_udf.return_types) == 1
and any(
[
self.hopsworks_udf.function_name
self.__hopsworks_udf.function_name
== transformation_feature.feature_name
for transformation_feature in self.hopsworks_udf._transformation_features
for transformation_feature in self.__hopsworks_udf._transformation_features
]
)
and (
not self.hopsworks_udf.dropped_features
or self.hopsworks_udf.function_name
not in self.hopsworks_udf.dropped_features
not self.__hopsworks_udf.dropped_features
or self.__hopsworks_udf.function_name
not in self.__hopsworks_udf.dropped_features
)
):
return [self.hopsworks_udf.function_name]
output_col_names = [self.__hopsworks_udf.function_name]

if self.transformation_type == TransformationType.MODEL_DEPENDENT:
_BASE_COLUMN_NAME = f'{self._hopsworks_udf.function_name}_{"_".join(self._hopsworks_udf.transformation_features)}_'
if len(self._hopsworks_udf.return_types) > 1:
return [
_BASE_COLUMN_NAME = f'{self.__hopsworks_udf.function_name}_{"_".join(self.__hopsworks_udf.transformation_features)}_'
if len(self.__hopsworks_udf.return_types) > 1:
output_col_names = [
f"{_BASE_COLUMN_NAME}{i}"
for i in range(len(self._hopsworks_udf.return_types))
for i in range(len(self.__hopsworks_udf.return_types))
]
else:
return [f"{_BASE_COLUMN_NAME}"]
output_col_names = [f"{_BASE_COLUMN_NAME}"]
elif self.transformation_type == TransformationType.ON_DEMAND:
return [self._hopsworks_udf.function_name]
output_col_names = [self.__hopsworks_udf.function_name]

if any(
len(output_col_name) > FEATURES.MAX_LENGTH_NAME
for output_col_name in output_col_names
):
raise FeatureStoreException(
f"The default names for output features generated by the transformation function `{repr(self.__hopsworks_udf)}` exceeds the maximum length of {FEATURES.MAX_LENGTH_NAME} characters. Please use the `alias` function to assign shorter names to the output features."
)

return output_col_names

@staticmethod
def _validate_transformation_type(
Expand Down Expand Up @@ -311,7 +332,10 @@ def version(self, version: int) -> None:
@property
def hopsworks_udf(self) -> HopsworksUdf:
"""Meta data class for the user defined transformation function."""
return self._hopsworks_udf
# Make sure that the output column names for a model-dependent or on-demand transformation function, when accessed externally from the class.
if self.transformation_type and not self.__hopsworks_udf.output_column_names:
self.__hopsworks_udf.output_column_names = self._get_output_column_names()
return self.__hopsworks_udf

@property
def transformation_type(self) -> TransformationType:
Expand All @@ -321,41 +345,39 @@ def transformation_type(self) -> TransformationType:
@transformation_type.setter
def transformation_type(self, transformation_type) -> None:
self._transformation_type = transformation_type
# Generate output column names when setting transformation type
self._hopsworks_udf.output_column_names = self._get_output_column_names()

@property
def transformation_statistics(
self,
) -> Optional[TransformationStatistics]:
"""Feature statistics required for the defined UDF"""
return self.hopsworks_udf.transformation_statistics
return self.__hopsworks_udf.transformation_statistics

@transformation_statistics.setter
def transformation_statistics(
self, statistics: List[FeatureDescriptiveStatistics]
) -> None:
self.hopsworks_udf.transformation_statistics = statistics
self.__hopsworks_udf.transformation_statistics = statistics
# Generate output column names for one-hot encoder after transformation statistics is set.
# This is done because the number of output columns for one-hot encoding dependents on number of unique values in training dataset statistics.
if self.hopsworks_udf.function_name == "one_hot_encoder":
self._hopsworks_udf.output_column_names = self._get_output_column_names()
if self.__hopsworks_udf.function_name == "one_hot_encoder":
self.__hopsworks_udf.output_column_names = self._get_output_column_names()

@property
def output_column_names(self) -> List[str]:
"""Names of the output columns generated by the transformation functions"""
if self._hopsworks_udf.function_name == "one_hot_encoder" and len(
self._hopsworks_udf.output_column_names
) != len(self._hopsworks_udf.return_types):
self._hopsworks_udf.output_column_names = self._get_output_column_names()
return self._hopsworks_udf.output_column_names
if (
self.__hopsworks_udf.function_name == "one_hot_encoder"
and len(self.__hopsworks_udf.output_column_names)
!= len(self.__hopsworks_udf.return_types)
) or not self.__hopsworks_udf.output_column_names:
self.__hopsworks_udf.output_column_names = self._get_output_column_names()
return self.__hopsworks_udf.output_column_names

def __repr__(self):
if self.transformation_type == TransformationType.MODEL_DEPENDENT:
return (
f"Model-Dependent Transformation Function : {repr(self.hopsworks_udf)}"
)
return f"Model-Dependent Transformation Function : {repr(self.__hopsworks_udf)}"
elif self.transformation_type == TransformationType.ON_DEMAND:
return f"On-Demand Transformation Function : {repr(self.hopsworks_udf)}"
return f"On-Demand Transformation Function : {repr(self.__hopsworks_udf)}"
else:
return f"Transformation Function : {repr(self.hopsworks_udf)}"
return f"Transformation Function : {repr(self.__hopsworks_udf)}"
7 changes: 6 additions & 1 deletion python/tests/test_transformation_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,12 @@ def test2(col1):
transformation_type=TransformationType.MODEL_DEPENDENT,
)

assert tf.hopsworks_udf == test2
# Creating dict representation of udf.
udf_json = test2.to_dict()
# Adding output column names to dict for testing since it would be generated when UDF is accessed out the transformation function.
udf_json["outputColumnNames"] = ["test2_col1_"]

assert tf.hopsworks_udf.to_dict() == udf_json

def test_generate_output_column_names_one_argument_one_output_type(self):
@udf(int)
Expand Down

0 comments on commit a0ca9ce

Please sign in to comment.