-
Notifications
You must be signed in to change notification settings - Fork 26
/
Copy pathtransformation_function.py
235 lines (201 loc) · 7.76 KB
/
transformation_function.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
#
# Copyright 2021. Logical Clocks AB
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations
import copy
import json
from typing import Any, Dict, List, Optional, Union
import humps
from hsfs import util
from hsfs.client.exceptions import FeatureStoreException
from hsfs.core import transformation_function_engine
from hsfs.decorators import typechecked
from hsfs.hopsworks_udf import HopsworksUdf
@typechecked
class TransformationFunction:
    """
    Main DTO class for transformation functions.

    Attributes
    ----------
        id (int) : Id of transformation function.
        version (int) : Version of transformation function.
        hopsworks_udf (HopsworksUdf): Meta data class for user defined functions.
    """

    def __init__(
        self,
        featurestore_id: int,
        hopsworks_udf: HopsworksUdf,
        version: Optional[int] = None,
        id: Optional[int] = None,
        type=None,
        items=None,
        count=None,
        href=None,
        **kwargs,
    ):
        """
        Create the transformation function metadata object.

        # Arguments
            featurestore_id: `int`. Id of the feature store; also passed to the transformation function engine.
            hopsworks_udf: `HopsworksUdf`. Meta data object of the user defined function.
            version: `Optional[int]`. Version of the transformation function.
            id: `Optional[int]`. Id of the transformation function.
            type, items, count, href, **kwargs: Accepted so that a decamelized response
                dictionary can be unpacked into the constructor; they are not used here.

        # Raises
            `FeatureStoreException`: If `hopsworks_udf` is not a `HopsworksUdf` instance.
        """
        # `id` and `version` default to None, hence the Optional annotations.
        self._id: Optional[int] = id
        self._featurestore_id: int = featurestore_id
        self._version: Optional[int] = version

        self._transformation_function_engine = (
            transformation_function_engine.TransformationFunctionEngine(
                self._featurestore_id
            )
        )

        if not isinstance(hopsworks_udf, HopsworksUdf):
            raise FeatureStoreException(
                "Please use the hopsworks_udf decorator when defining transformation functions."
            )

        self._hopsworks_udf: HopsworksUdf = hopsworks_udf

    def save(self) -> None:
        """Save a transformation function into the backend.

        The call is delegated to the transformation function engine.

        !!! example
            ```python
            # import hopsworks udf decorator
            from hsfs.hopsworks_udf import udf

            # define function
            @udf(int)
            def plus_one(value):
                return value + 1

            # create transformation function
            plus_one_meta = fs.create_transformation_function(
                    transformation_function=plus_one,
                    version=1
                )

            # persist transformation function in backend
            plus_one_meta.save()
            ```
        """
        self._transformation_function_engine.save(self)

    def delete(self) -> None:
        """Delete transformation function from backend.

        The call is delegated to the transformation function engine.

        !!! example
            ```python
            # import hopsworks udf decorator
            from hsfs.hopsworks_udf import udf

            # define function
            @udf(int)
            def plus_one(value):
                return value + 1

            # create transformation function
            plus_one_meta = fs.create_transformation_function(
                    transformation_function=plus_one,
                    version=1
                )

            # persist transformation function in backend
            plus_one_meta.save()

            # retrieve transformation function
            plus_one_fn = fs.get_transformation_function(name="plus_one")

            # delete transformation function from backend
            plus_one_fn.delete()
            ```
        """
        self._transformation_function_engine.delete(self)

    def __call__(self, *features: str) -> TransformationFunction:
        """
        Update the features to be used in the transformation function.

        # Arguments
            features: `str`. Names of the features to be passed to the User Defined function,
                given as separate positional arguments.

        # Returns
            `TransformationFunction`: A deep copy of this transformation function whose
                `HopsworksUdf` is the result of calling the original one with `features`.

        # Raises
            `FeatureStoreException`: If the provided number of features do not match the number of arguments in the defined UDF or if the provided feature names are not strings.
        """
        # Deep copy so that the same transformation function can be used to create
        # multiple new transformation functions with different features.
        transformation = copy.deepcopy(self)
        transformation._hopsworks_udf = transformation._hopsworks_udf(*features)
        return transformation

    @staticmethod
    def _deserialize_udf(dto: Dict[str, Any]) -> Dict[str, Any]:
        """
        Replace a serialized `hopsworks_udf` entry of `dto` by a `HopsworksUdf` object.

        The dictionary is modified in place and also returned. A missing or empty
        entry, or one that already is a `HopsworksUdf`, is left untouched.
        """
        udf_payload = dto.get("hopsworks_udf")
        if udf_payload and not isinstance(udf_payload, HopsworksUdf):
            dto["hopsworks_udf"] = HopsworksUdf.from_response_json(udf_payload)
        return dto

    @classmethod
    def from_response_json(
        cls, json_dict: Dict[str, Any]
    ) -> Union[TransformationFunction, List[TransformationFunction]]:
        """
        Function that constructs the class object from its json serialization.

        # Arguments
            json_dict: `Dict[str, Any]`. Json serialized dictionary for the class.

        # Returns
            `TransformationFunction`: Json deserialized class object. For a response with a
                `count` key, an empty list is returned when the count is 0, a single object
                when it is 1, and a list of objects otherwise.
        """
        json_decamelized = humps.decamelize(json_dict)

        # No "count" key: the payload describes a single transformation function.
        if "count" not in json_decamelized:
            return cls(**cls._deserialize_udf(json_decamelized))

        if json_decamelized["count"] == 0:
            return []

        tffn_dtos = [
            cls._deserialize_udf(tffn_dto) for tffn_dto in json_decamelized["items"]
        ]
        if json_decamelized["count"] == 1:
            return cls(**tffn_dtos[0])
        return [cls(**tffn_dto) for tffn_dto in tffn_dtos]

    def update_from_response_json(
        self, json_dict: Dict[str, Any]
    ) -> TransformationFunction:
        """
        Function that updates the class object from its json serialization.

        # Arguments
            json_dict: `Dict[str, Any]`. Json serialized dictionary for the class.

        # Returns
            `TransformationFunction`: This object, re-initialised from the response.
        """
        json_decamelized = humps.decamelize(json_dict)
        # The constructor only accepts a HopsworksUdf instance, so the serialized
        # UDF has to be deserialized before re-initialising the object.
        self.__init__(**self._deserialize_udf(json_decamelized))
        return self

    def json(self) -> str:
        """
        Convert class into its json serialized form.

        # Returns
            `str`: Json serialized object.
        """
        return json.dumps(self, cls=util.FeatureStoreEncoder)

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert class into a dictionary.

        # Returns
            `Dict`: Dictionary that contains all data required to json serialize the object.
        """
        return {
            "id": self._id,
            "version": self._version,
            "featurestoreId": self._featurestore_id,
            "hopsworksUdf": self._hopsworks_udf,
        }

    @property
    def id(self) -> Optional[int]:
        """Transformation function id."""
        return self._id

    @id.setter
    def id(self, id: int) -> None:
        self._id = id

    @property
    def version(self) -> Optional[int]:
        """Version of the transformation function."""
        return self._version

    @version.setter
    def version(self, version: int) -> None:
        self._version = version

    @property
    def hopsworks_udf(self) -> HopsworksUdf:
        """Meta data class for the user defined transformation function."""
        return self._hopsworks_udf

    @property
    def output_column_names(self) -> List[str]:
        """Output column names of transformation functions"""
        return self._hopsworks_udf._output_column_names