[FSTORE-1464] Unify tags_api, inode and dataset_api #225

Merged Jul 22, 2024 · 139 commits
Changes from 1 commit

139 commits
0505951
Initial commit
robzor92 Sep 13, 2021
91706f7
Add documentation building and stylecheck workflows (#2)
robzor92 Oct 15, 2021
7e51789
[HOPSWORKS-2745] New API for managing machine learning models (#3)
robzor92 Oct 21, 2021
7bc7052
CONTRIBUTING.md missing from root directory (#5)
robzor92 Oct 21, 2021
1324a1b
Machine Learning section renamed to Model Management (#9)
robzor92 Oct 22, 2021
42b9ed4
Remove Experiments and Serving from library description (#18)
robzor92 Oct 25, 2021
cb814b4
Progress bar for model export (#15)
robzor92 Oct 25, 2021
c6d366b
get_model_registry() should validate the existence of a Models datase…
robzor92 Oct 25, 2021
d3c80b3
Support passing list as signature (#23)
robzor92 Nov 10, 2021
c2b8687
Full support for shared model registry (#20)
robzor92 Nov 18, 2021
c79f6ca
Rename Signature to ModelSchema and add guide and examples (#27)
robzor92 Dec 7, 2021
0703744
Add checkstyle and suppressions.xml to Java submodule (#29)
robzor92 Dec 7, 2021
08f3ade
Model schema and input example should be an InodeDTO (#30)
robzor92 Dec 9, 2021
f534a29
Remove unnecessary move operation when saving model (#34)
robzor92 Dec 17, 2021
0ae8352
training_dataset field should only be set by backend (#40)
robzor92 Dec 21, 2021
74d6724
Input example should be the raw data (#38)
robzor92 Dec 28, 2021
7692b3f
Update mkdocs.yml to fix canonical URLs (#46)
robzor92 Jan 3, 2022
8dacc82
Rename path to model_path and make it absolute, add version_path (#49)
robzor92 Jan 14, 2022
6c441ca
Model tags API should use the model tags endpoints (#44)
robzor92 Jan 24, 2022
7818e00
schema and model_schema should be top level modules (#47)
robzor92 Jan 24, 2022
69ac1af
[HOPSWORKS-2827] Add serving module (#33)
javierdlrm Jan 27, 2022
78af96a
[HOPSWORKS-2827] Support dict as params (#53)
javierdlrm Jan 27, 2022
794621a
Add index.md and CONTRIBUTING.md to docs folder (#55)
robzor92 Jan 28, 2022
619369d
Add Compute and Administration sections to mkdocs (#52)
robzor92 Jan 28, 2022
f000835
Prepare for 2.6.0-SNAPSHOT development
robzor92 Jan 28, 2022
8e2cc07
[HOPSWORKS-2974] Check if istio is available (#57)
javierdlrm Feb 4, 2022
e363ade
Pin pip to 22.0.3 to avoid mkdocs build bug
robzor92 Feb 4, 2022
348ba78
Should be possible to export a single model file (#64)
robzor92 Feb 23, 2022
31ff998
Refactor API and add default kafka topic and resources (#63)
javierdlrm Feb 23, 2022
74c0142
Model export fails in external environment due to accessing old path …
robzor92 Mar 17, 2022
e2ec859
Model export fails in external environment due to accessing old path …
robzor92 Mar 17, 2022
697d59a
External clients instantiation fails due to invalid property (#70)
robzor92 Mar 17, 2022
90b220e
Remove region_name and secrets_store parameters (#79)
robzor92 Mar 19, 2022
db359ef
Disable documentating model deploy function (#81)
robzor92 Mar 28, 2022
ba294c3
Bump mkdocs, mkdocs-material and black
robzor92 Apr 4, 2022
65d5e29
Fixes for non-kube environment (#86)
javierdlrm Apr 21, 2022
a988c28
Add descriptive representation of objects (#88)
robzor92 Apr 29, 2022
1486fe2
[HOPSWORKS-3102][HOPSWORKS-2979] Support KServe and replace logicalcl…
javierdlrm May 4, 2022
ff35d72
Add istio external client (http-only) (#90)
javierdlrm May 4, 2022
62d8722
Set default serving tool to DEFAULT (#94)
javierdlrm May 5, 2022
a492e49
Add documentation for Model Serving (#93)
javierdlrm May 13, 2022
1a06a63
Fix util get_members (#98)
javierdlrm May 17, 2022
478f493
Add api.key auth fallback and project_path (#100)
javierdlrm May 23, 2022
7ab53fb
Update custom predictor script docs (#101)
javierdlrm Jun 3, 2022
1460e44
Print url to created models and deployments (#103)
robzor92 Jun 9, 2022
9197137
Refactor inference endpoints (#105)
javierdlrm Jun 15, 2022
62889ec
Validate metrics dict on client side (#107)
robzor92 Jun 16, 2022
eb05e2c
Filter deployments, print server logs and guidance messages (#108)
javierdlrm Jun 17, 2022
26cc2df
Improve predictor creation info messages (#109)
javierdlrm Jun 20, 2022
93d708b
Use hopsworks client if istio client not available (#111)
javierdlrm Jun 27, 2022
8a488e2
Add support for istio client with node ports (#112)
javierdlrm Jun 29, 2022
392d73a
Only keep API Reference (#114)
robzor92 Jun 30, 2022
9b009ec
Only keep API Reference, Append
robzor92 Jun 30, 2022
3b6723b
mkdocs-release link major.minor to major.minor.bugfix
robzor92 Jun 30, 2022
d93a9a2
Use same style as docs.hopsworks.ai (#115)
robzor92 Jul 4, 2022
6c57606
Show dropdown properly
robzor92 Jul 5, 2022
d876a8f
Add Tutorials section
robzor92 Jul 5, 2022
5676e16
Add redirect and expand sections by default
robzor92 Jul 6, 2022
d9769f9
Adjust API links to correct landing page
robzor92 Jul 6, 2022
e97d3ed
Update inject api links script to forward to correct API page
robzor92 Jul 7, 2022
cfc45d5
Add pymdown-extensions dependency (#118)
moritzmeister Jul 18, 2022
b92f614
Build istio host header with lower case (#122)
javierdlrm Jul 25, 2022
b53d1f9
Prepare development for 3.1.0-SNAPSHOT (#123)
javierdlrm Jul 25, 2022
dd8a60f
Add latest docs alias to mkdocs-release action and hide major version…
robzor92 Jul 26, 2022
87c2cbf
Adjust links for updated tutorials structure (#127)
moritzmeister Jul 26, 2022
c441cf3
Add serving resource allocation limits (#130)
javierdlrm Aug 3, 2022
8a05010
Improve default resource limits for deployments (#133)
javierdlrm Aug 3, 2022
4fad8c7
Support non-limited resource values (#135)
javierdlrm Aug 4, 2022
d9bca49
Use development version of mike to publish documentation (#139)
robzor92 Aug 9, 2022
9557cab
hopsworks link should point to login (#140)
robzor92 Aug 18, 2022
ba0e065
Selecting a different version dropdown should land on correct page (#…
robzor92 Sep 15, 2022
e2ff754
Separate scikit-learn from python deployments (#117)
javierdlrm Sep 15, 2022
40f6da8
[HWORKS-97] Fix default serving tool (#146)
javierdlrm Sep 21, 2022
829652e
[HOPSWORKS-3278] Payara5 fixes (#142)
ErmiasG Sep 28, 2022
8331f47
[HOPSWORKS-3278]-append type not set in DTO (#147)
ErmiasG Sep 29, 2022
774a757
[FSTORE-348] Refactor Tutorials documentation page (#148)
moritzmeister Oct 4, 2022
4bce241
[FSTORE-358] Add icon to external links in documentation navigation (…
moritzmeister Oct 6, 2022
acf8d38
[HWORKS-171] model version should be downloaded in temp directory ins…
robzor92 Oct 18, 2022
ba180fd
[HWORKS-167] Remove dev/SNAPSHOT version from hsfs, hsml and hopswork…
robzor92 Oct 21, 2022
ab8f5c7
[HWORKS-199] Should be possible to get python library version using _…
robzor92 Nov 1, 2022
94827d5
[HWORKS-125] Improve status of deployments (#157)
javierdlrm Nov 9, 2022
964797d
[HWORKS-125] Add missing status to deployment.save() method (#161)
javierdlrm Nov 16, 2022
3fbc4b4
[HWORKS-349] Add compatibility with 3.0 backend (#166)
javierdlrm Dec 20, 2022
c8a384c
Bump development version to 3.2.0-SNAPSHOT/3.2.0.dev1 (#168)
moritzmeister Dec 22, 2022
c744aca
Fix pre-commit with flake8 github repo (#165)
javierdlrm Dec 22, 2022
3bc7c6b
[HWORKS-313] Add model.deploy() method to docs (#163)
javierdlrm Feb 7, 2023
73f12d0
[HWORKS-220] zip/unzip for HA, swap steps (#172)
ErmiasG Feb 8, 2023
932bf7e
[HWORKS-126] Add idempotent methods for creating deployments
javierdlrm Feb 14, 2023
b9ac658
[HWORKS-352] Make input example consistent across data types
javierdlrm Feb 14, 2023
77b49f9
[HWORKS-413] Add docs for model evaluation metrics (#176)
javierdlrm Feb 17, 2023
cf68efd
[FSTORE-619] Add feature store javadoc entry to API doc (#180)
SirOibaf Mar 13, 2023
5462f7a
Bump development version to 3.3.0-SNAPSHOT/3.3.0.dev1 (#182)
moritzmeister Apr 14, 2023
b6dcf3e
[HWORKS-483] Add creating status for deployments (#183)
javierdlrm Apr 25, 2023
9493e33
[HWORKS-524] HSML ignores knative_domain parameter
SirOibaf Apr 25, 2023
4497357
[HWORKS-622] Fix istio client _close() call
javierdlrm Jun 30, 2023
66ee2bc
Bump development version to 3.4.0-SNAPSHOT/3.4.0.dev1 (#193)
kouzant Jul 7, 2023
43be7fc
[HWORKS-682] Include return type and description for model.download()…
robzor92 Aug 3, 2023
354a3f4
[HWORKS-731] Raise error if deployment cannot be created (#199)
javierdlrm Sep 17, 2023
9424ac7
Prepare development for 3.5.0.dev1
SirOibaf Sep 18, 2023
784b0b5
Trigger build
SirOibaf Sep 18, 2023
eee4f5b
[HWORKS-798] Add **kwargs to python client libraries (#205)
robzor92 Oct 18, 2023
8c99438
[HWORKS-829] Register models without compressing files (#206)
javierdlrm Nov 6, 2023
273c302
[HWORKS-565] parallelize uploads (#208)
ErmiasG Nov 9, 2023
4f17aad
[HWORKS-835] Download model artifact without compressing files (#209)
javierdlrm Nov 13, 2023
5029235
[HWORKS-835][Append] Check if path is a dir on item attributes (#211)
javierdlrm Nov 14, 2023
a68927d
Prepare for 3.7.0-SNAPSHOT development
SirOibaf Nov 23, 2023
5e626b8
[HWORKS-802] Python 3.12 support for clients (#213)
robzor92 Nov 30, 2023
cb37ad4
[HWORKS-882] Disallow exporting a model in the root path of a dataset…
robzor92 Dec 8, 2023
3a5a7c5
[HWORKS-135] Models backend should store metadata in tables instead o…
robzor92 Dec 18, 2023
0ddbcaa
Prepare for 3.8.0-SNAPSHOT development
SirOibaf Feb 6, 2024
69224fa
[HWORKS-888] Support for gRPC protocol in Python model deployments (#…
gibchikafa Mar 8, 2024
34de8ac
[HWORKS-888] Improve documentation for gRPC support (#219)
javierdlrm Mar 15, 2024
d692024
[HWORKS-936] Explicit provenance - model (#220)
bubriks Mar 19, 2024
19fd3b7
[HWORKS-1152] Add add_tag method + deprectation warning (#224)
vatj Apr 18, 2024
96b5b91
Revert "[HWORKS-1152] Add add_tag method + deprectation warning (#224…
vatj Apr 18, 2024
b256ef4
[HWORKS-1171] Make chunk size configurable when uploading models to t…
SirOibaf Apr 29, 2024
920ac39
[HWORKS-1161] explicit provenance - feature view - cache last trainin…
o-alex Apr 30, 2024
f51af57
[HWORKS-1161][APPEND] Fix model.to_dict when there's no td version (#…
o-alex May 1, 2024
f7b78f4
[FSTORE-1371][FSTORE-1346][FSTORE-1212] Migrate to ruff, pyproject.to…
vatj May 6, 2024
590f11c
Update python requirement to <3.13 (#234)
javierdlrm May 6, 2024
bad265d
[HWORKS-635] Add pytest for hsml pojo classes (#231)
javierdlrm May 7, 2024
0f2490a
[HWORKS-108] Allow users to stop deployments at any time (#233)
javierdlrm May 7, 2024
b42f28d
[HWORKS-936][APPEND] explicit provenance - model (#235)
o-alex May 13, 2024
43cdbab
[HWORKS-937] Explicit model provenance - hsml optimization - init ser…
o-alex May 15, 2024
6fe9d6a
[HWORKS-1309] Update release scripts to handle pyproject.toml (#238)
SirOibaf Jun 12, 2024
f47cbcb
[FSTORE-1426] add requirements.txt for documentation instead of using…
SirOibaf Jun 13, 2024
954a07b
Prepare for 4.0.0-SNAPSHOT development
SirOibaf Jun 13, 2024
5d6bca9
[HWORKS-1048] Support multiple modularized project environments (#240)
robzor92 Jul 5, 2024
b066aa9
[FSTORE-1411] On-Demand Transformation Functions (#1371)
manu-sj Jul 12, 2024
029def9
[FSTORE-1466] Refactor type convert out of python engine (#1363)
vatj Jul 12, 2024
cdee83d
Merge machine-learning-api into hsml subdirectory
aversey Jul 18, 2024
19c2075
Merge feature-store-api into hsfs subdirectory
aversey Jul 18, 2024
b3bb58a
[FSTORE-1454] Unify variable_api and project_api (#219)
aversey Jul 16, 2024
edeffbc
[FSTORE-1439] Merge hopsworks-api, feature-store-api and machine-lear…
aversey Jul 13, 2024
0790dab
[FSTORE-1439][APPEND] Apply ruff formatting and fixes to the merged r…
aversey Jul 17, 2024
c679078
Move decorators, constants, variable_api and client into hopsworks_co…
aversey Jul 18, 2024
7c25dc6
Revert "Merge feature-store-api into hsfs subdirectory"
aversey Jul 19, 2024
126818f
[FSTORE-1453] Move client, decorators, variable_api and constants to …
aversey Jul 18, 2024
6cc8e4d
Merge branch 'main-after-revert-rebase' of github.com:aversey/hopswor…
aversey Jul 19, 2024
[FSTORE-1466] Refactor type convert out of python engine (#1363)
* Move type system converter to separate file

* More work on migrating type_system converter

* Minor clean ups

* Move tests around

* Moving more tests

* Move infer_spark_type

* More refactoring

* Minor test cleanups

* Fix mistype of _project_id in engine/python

* Fix engine/test_spark

---------

Co-authored-by: Aleksey Veresov <[email protected]>
vatj and aversey authored Jul 12, 2024

Verified: this commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 029def97d48d8172df6040380c910d679a5a9d9c
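To illustrate the refactor summarized in the commit message above, here is a minimal, hypothetical sketch (not part of the diff) of calling the relocated converters as plain functions, assuming pandas and pyarrow are installed:

import pandas as pd
import pyarrow as pa

from hsfs.core import type_systems

# Build a pyarrow schema from a pandas dataframe and map each field's arrow type
# to the Hopsworks offline (Hive) type string, without going through an Engine.
df = pd.DataFrame({"id": [1, 2, 3], "amount": [0.5, 1.25, 2.0]})
schema = pa.Schema.from_pandas(df, preserve_index=False)
for field in schema:
    print(field.name, type_systems.convert_pandas_dtype_to_offline_type(field.type))
# expected: id -> "bigint", amount -> "double"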
4 changes: 3 additions & 1 deletion python/hsfs/core/constants.py
@@ -27,8 +27,10 @@
)
initialise_expectation_suite_for_single_expectation_api_message = "Initialize Expectation Suite by attaching to a Feature Group to enable single expectation API"

# Numpy
HAS_ARROW: bool = importlib.util.find_spec("pyarrow") is not None
HAS_PANDAS: bool = importlib.util.find_spec("pandas") is not None
HAS_NUMPY: bool = importlib.util.find_spec("numpy") is not None
HAS_POLARS: bool = importlib.util.find_spec("polars") is not None

# SQL packages
HAS_SQLALCHEMY: bool = importlib.util.find_spec("sqlalchemy") is not None
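As context, the HAS_* flags added above follow the common optional-dependency pattern: importlib.util.find_spec checks whether a package is installed without importing it. A self-contained sketch of the same pattern (illustrative, not part of this diff):

import importlib.util

# find_spec returns None when the package is not installed, so the boolean flag
# can guard optional imports and code paths elsewhere in the library.
HAS_POLARS: bool = importlib.util.find_spec("polars") is not None

if HAS_POLARS:
    import polars as pl  # safe to import: the module spec was found above
    print("polars available:", pl.__version__)
else:
    print("polars not installed; pandas-only code paths will be used")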
385 changes: 385 additions & 0 deletions python/hsfs/core/type_systems.py
@@ -0,0 +1,385 @@
#
# Copyright 2024 Hopsworks AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations

import ast
import datetime
import decimal
from typing import TYPE_CHECKING, Literal, Union

import pytz
from hsfs.core.constants import HAS_ARROW, HAS_PANDAS, HAS_POLARS


if TYPE_CHECKING:
import numpy as np
import pandas as pd
import polars as pl

if HAS_ARROW:
import pyarrow as pa

# Decimal types are currently not supported
_INT_TYPES = [pa.uint8(), pa.uint16(), pa.int8(), pa.int16(), pa.int32()]
_BIG_INT_TYPES = [pa.uint32(), pa.int64()]
_FLOAT_TYPES = [pa.float16(), pa.float32()]
_DOUBLE_TYPES = [pa.float64()]
_TIMESTAMP_UNIT = ["ns", "us", "ms", "s"]
_BOOLEAN_TYPES = [pa.bool_()]
_STRING_TYPES = [pa.string(), pa.large_string()]
_DATE_TYPES = [pa.date32(), pa.date64()]
_BINARY_TYPES = [pa.binary(), pa.large_binary()]

PYARROW_HOPSWORKS_DTYPE_MAPPING = {
**dict.fromkeys(_INT_TYPES, "int"),
**dict.fromkeys(_BIG_INT_TYPES, "bigint"),
**dict.fromkeys(_FLOAT_TYPES, "float"),
**dict.fromkeys(_DOUBLE_TYPES, "double"),
**dict.fromkeys(
[
*[pa.timestamp(unit) for unit in _TIMESTAMP_UNIT],
*[
pa.timestamp(unit, tz=tz)
for unit in _TIMESTAMP_UNIT
for tz in pytz.all_timezones
],
],
"timestamp",
),
**dict.fromkeys(_BOOLEAN_TYPES, "boolean"),
**dict.fromkeys(
[
*_STRING_TYPES,
# Category type in pandas stored as dictionary in pyarrow
*[
pa.dictionary(
value_type=value_type, index_type=index_type, ordered=ordered
)
for value_type in _STRING_TYPES
for index_type in _INT_TYPES + _BIG_INT_TYPES
for ordered in [True, False]
],
],
"string",
),
**dict.fromkeys(_DATE_TYPES, "date"),
**dict.fromkeys(_BINARY_TYPES, "binary"),
}
else:
PYARROW_HOPSWORKS_DTYPE_MAPPING = {}

# python cast column to offline type
if HAS_POLARS:
import polars as pl

polars_offline_dtype_mapping = {
"bigint": pl.Int64,
"int": pl.Int32,
"smallint": pl.Int16,
"tinyint": pl.Int8,
"float": pl.Float32,
"double": pl.Float64,
}

_polars_online_dtype_mapping = {
"bigint": pl.Int64,
"int": pl.Int32,
"smallint": pl.Int16,
"tinyint": pl.Int8,
"float": pl.Float32,
"double": pl.Float64,
}

if HAS_PANDAS:
import numpy as np
import pandas as pd

pandas_offline_dtype_mapping = {
"bigint": pd.Int64Dtype(),
"int": pd.Int32Dtype(),
"smallint": pd.Int16Dtype(),
"tinyint": pd.Int8Dtype(),
"float": np.dtype("float32"),
"double": np.dtype("float64"),
}

pandas_online_dtype_mapping = {
"bigint": pd.Int64Dtype(),
"int": pd.Int32Dtype(),
"smallint": pd.Int16Dtype(),
"tinyint": pd.Int8Dtype(),
"float": np.dtype("float32"),
"double": np.dtype("float64"),
}


def convert_pandas_dtype_to_offline_type(arrow_type: str) -> str:
# This is a simple type conversion between pandas dtypes and pyspark (hive) types,
# using pyarrow types obtained from the pandas dataframe to convert pandas-typed fields.
# The recursive function "convert_pandas_object_type_to_offline_type" converts complex types such as lists and structs,
# while "convert_simple_pandas_dtype_to_offline_type" converts simple types.
# In the backend, the types specified here will also be used for mapping to Avro types.
if (
pa.types.is_list(arrow_type)
or pa.types.is_large_list(arrow_type)
or pa.types.is_struct(arrow_type)
):
return convert_pandas_object_type_to_offline_type(arrow_type)

return convert_simple_pandas_dtype_to_offline_type(arrow_type)


def convert_pandas_object_type_to_offline_type(arrow_type: str) -> str:
if pa.types.is_list(arrow_type) or pa.types.is_large_list(arrow_type):
# figure out sub type
sub_arrow_type = arrow_type.value_type
subtype = convert_pandas_dtype_to_offline_type(sub_arrow_type)
return "array<{}>".format(subtype)
if pa.types.is_struct(arrow_type):
struct_schema = {}
for index in range(arrow_type.num_fields):
struct_schema[arrow_type.field(index).name] = (
convert_pandas_dtype_to_offline_type(arrow_type.field(index).type)
)
return (
"struct<"
+ ",".join([f"{key}:{value}" for key, value in struct_schema.items()])
+ ">"
)

raise ValueError(f"dtype 'O' (arrow_type '{str(arrow_type)}') not supported")


def cast_pandas_column_to_offline_type(
feature_column: pd.Series, offline_type: str
) -> pd.Series:
offline_type = offline_type.lower()
if offline_type == "timestamp":
return pd.to_datetime(feature_column, utc=True).dt.tz_localize(None)
elif offline_type == "date":
return pd.to_datetime(feature_column, utc=True).dt.date
elif (
offline_type.startswith("array<")
or offline_type.startswith("struct<")
or offline_type == "boolean"
):
return feature_column.apply(
lambda x: (ast.literal_eval(x) if isinstance(x, str) else x)
if (x is not None and x != "")
else None
)
elif offline_type == "string":
return feature_column.apply(lambda x: str(x) if x is not None else None)
elif offline_type.startswith("decimal"):
return feature_column.apply(
lambda x: decimal.Decimal(x) if (x is not None) else None
)
else:
if offline_type in pandas_offline_dtype_mapping:
return feature_column.astype(pandas_offline_dtype_mapping[offline_type])
else:
return feature_column # handle gracefully, just return the column as-is


def cast_polars_column_to_offline_type(
feature_column: pl.Series, offline_type: str
) -> pl.Series:
offline_type = offline_type.lower()
if offline_type == "timestamp":
# convert (if tz!=UTC) to utc, then make timezone unaware
return feature_column.cast(pl.Datetime(time_zone=None))
elif offline_type == "date":
return feature_column.cast(pl.Date)
elif (
offline_type.startswith("array<")
or offline_type.startswith("struct<")
or offline_type == "boolean"
):
return feature_column.map_elements(
lambda x: (ast.literal_eval(x) if isinstance(x, str) else x)
if (x is not None and x != "")
else None
)
elif offline_type == "string":
return feature_column.map_elements(lambda x: str(x) if x is not None else None)
elif offline_type.startswith("decimal"):
return feature_column.map_elements(
lambda x: decimal.Decimal(x) if (x is not None) else None
)
else:
if offline_type in polars_offline_dtype_mapping:
return feature_column.cast(polars_offline_dtype_mapping[offline_type])
else:
return feature_column # handle gracefully, just return the column as-is


def cast_column_to_offline_type(
feature_column: Union[pd.Series, pl.Series], offline_type: str
) -> pd.Series:
if isinstance(feature_column, pd.Series):
return cast_pandas_column_to_offline_type(feature_column, offline_type.lower())
elif isinstance(feature_column, pl.Series):
return cast_polars_column_to_offline_type(feature_column, offline_type.lower())


def cast_column_to_online_type(
feature_column: pd.Series, online_type: str
) -> pd.Series:
online_type = online_type.lower()
if online_type == "timestamp":
# convert (if tz!=UTC) to utc, then make timezone unaware
return pd.to_datetime(feature_column, utc=True).dt.tz_localize(None)
elif online_type == "date":
return pd.to_datetime(feature_column, utc=True).dt.date
elif online_type.startswith("varchar") or online_type == "text":
return feature_column.apply(lambda x: str(x) if x is not None else None)
elif online_type == "boolean":
return feature_column.apply(
lambda x: (ast.literal_eval(x) if isinstance(x, str) else x)
if (x is not None and x != "")
else None
)
elif online_type.startswith("decimal"):
return feature_column.apply(
lambda x: decimal.Decimal(x) if (x is not None) else None
)
else:
if online_type in pandas_online_dtype_mapping:
casted_feature = feature_column.astype(
pandas_online_dtype_mapping[online_type]
)
return casted_feature
else:
return feature_column # handle gracefully, just return the column as-is


def convert_simple_pandas_dtype_to_offline_type(arrow_type: str) -> str:
try:
return PYARROW_HOPSWORKS_DTYPE_MAPPING[arrow_type]
except KeyError as err:
raise ValueError(f"dtype '{arrow_type}' not supported") from err


def translate_legacy_spark_type(
output_type: str,
) -> Literal[
"STRING",
"BINARY",
"BYTE",
"SHORT",
"INT",
"LONG",
"FLOAT",
"DOUBLE",
"TIMESTAMP",
"DATE",
"BOOLEAN",
]:
if output_type == "StringType()":
return "STRING"
elif output_type == "BinaryType()":
return "BINARY"
elif output_type == "ByteType()":
return "BYTE"
elif output_type == "ShortType()":
return "SHORT"
elif output_type == "IntegerType()":
return "INT"
elif output_type == "LongType()":
return "LONG"
elif output_type == "FloatType()":
return "FLOAT"
elif output_type == "DoubleType()":
return "DOUBLE"
elif output_type == "TimestampType()":
return "TIMESTAMP"
elif output_type == "DateType()":
return "DATE"
elif output_type == "BooleanType()":
return "BOOLEAN"
else:
return "STRING" # handle gracefully, and return STRING type, the default for spark udfs


def convert_spark_type_to_offline_type(spark_type_string: str) -> str:
if spark_type_string.endswith("Type()"):
spark_type_string = translate_legacy_spark_type(spark_type_string)
if spark_type_string == "STRING":
return "STRING"
elif spark_type_string == "BINARY":
return "BINARY"
elif spark_type_string == "BYTE":
return "INT"
elif spark_type_string == "SHORT":
return "INT"
elif spark_type_string == "INT":
return "INT"
elif spark_type_string == "LONG":
return "BIGINT"
elif spark_type_string == "FLOAT":
return "FLOAT"
elif spark_type_string == "DOUBLE":
return "DOUBLE"
elif spark_type_string == "TIMESTAMP":
return "TIMESTAMP"
elif spark_type_string == "DATE":
return "DATE"
elif spark_type_string == "BOOLEAN":
return "BOOLEAN"
else:
raise ValueError(
f"Return type {spark_type_string} not supported for transformation functions."
)


def infer_spark_type(output_type):
if not output_type:
return "STRING" # STRING is default type for spark udfs

if isinstance(output_type, str):
if output_type.endswith("Type()"):
return translate_legacy_spark_type(output_type)
output_type = output_type.lower()

if output_type in (str, "str", "string"):
return "STRING"
elif output_type in (bytes, "binary"):
return "BINARY"
elif output_type in (np.int8, "int8", "byte", "tinyint"):
return "BYTE"
elif output_type in (np.int16, "int16", "short", "smallint"):
return "SHORT"
elif output_type in (int, "int", "integer", np.int32):
return "INT"
elif output_type in (np.int64, "int64", "long", "bigint"):
return "LONG"
elif output_type in (float, "float"):
return "FLOAT"
elif output_type in (np.float64, "float64", "double"):
return "DOUBLE"
elif output_type in (
datetime.datetime,
np.datetime64,
"datetime",
"timestamp",
):
return "TIMESTAMP"
elif output_type in (datetime.date, "date"):
return "DATE"
elif output_type in (bool, "boolean", "bool"):
return "BOOLEAN"
else:
raise TypeError("Not supported type %s." % output_type)
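Since the converters above are now plain module-level functions, they can be exercised without an Engine instance. A hedged usage sketch, assuming pandas (and numpy) are installed; the sample values are illustrative:

import pandas as pd

from hsfs.core.type_systems import cast_column_to_offline_type, infer_spark_type

# Cast a raw string column to the "timestamp" offline type: values are parsed as
# UTC and the timezone is then stripped, as in cast_pandas_column_to_offline_type.
events = pd.Series(["2024-07-12 10:00:00+02:00", None])
print(cast_column_to_offline_type(events, "timestamp"))

# Infer the Spark return type used for transformation functions.
print(infer_spark_type("double"))  # DOUBLE
print(infer_spark_type(bool))      # BOOLEAN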
291 changes: 22 additions & 269 deletions python/hsfs/engine/python.py
@@ -15,8 +15,6 @@
#
from __future__ import annotations

import ast
import decimal
import json
import math
import numbers
@@ -40,6 +38,11 @@
Union,
)

from hsfs.core.type_systems import (
cast_column_to_offline_type,
cast_column_to_online_type,
)


if TYPE_CHECKING:
import great_expectations
@@ -50,7 +53,6 @@
import pandas as pd
import polars as pl
import pyarrow as pa
import pytz
from botocore.response import StreamingBody
from hsfs import (
client,
@@ -78,7 +80,13 @@
training_dataset_job_conf,
transformation_function_engine,
)
from hsfs.core.constants import HAS_AIOMYSQL, HAS_GREAT_EXPECTATIONS, HAS_SQLALCHEMY
from hsfs.core.constants import (
HAS_AIOMYSQL,
HAS_ARROW,
HAS_GREAT_EXPECTATIONS,
HAS_PANDAS,
HAS_SQLALCHEMY,
)
from hsfs.core.feature_view_engine import FeatureViewEngine
from hsfs.core.vector_db_client import VectorDbClient
from hsfs.decorators import uses_great_expectations
@@ -91,71 +99,16 @@
if HAS_GREAT_EXPECTATIONS:
import great_expectations

if HAS_ARROW:
from hsfs.core.type_systems import PYARROW_HOPSWORKS_DTYPE_MAPPING
if HAS_AIOMYSQL and HAS_SQLALCHEMY:
from hsfs.core import util_sql

if HAS_SQLALCHEMY:
from sqlalchemy import sql


PYARROW_EXTENSION_ENABLE = False
try:
import pandas as pd
from packaging.version import Version

if Version(pd.__version__) > Version("2.0"):
PYARROW_EXTENSION_ENABLE = True
else:
PYARROW_EXTENSION_ENABLE = False
except Exception:
PYARROW_EXTENSION_ENABLE = False # Set PYARROW_EXTENSION_ENABLE to false if pyarrow or pandas cannot be imported

# Decimal types are currently not supported
_INT_TYPES = [pa.uint8(), pa.uint16(), pa.int8(), pa.int16(), pa.int32()]
_BIG_INT_TYPES = [pa.uint32(), pa.int64()]
_FLOAT_TYPES = [pa.float16(), pa.float32()]
_DOUBLE_TYPES = [pa.float64()]
_TIMESTAMP_UNIT = ["ns", "us", "ms", "s"]
_BOOLEAN_TYPES = [pa.bool_()]
_STRING_TYPES = [pa.string(), pa.large_string()]
_DATE_TYPES = [pa.date32(), pa.date64()]
_BINARY_TYPES = [pa.binary(), pa.large_binary()]

PYARROW_HOPSWORKS_DTYPE_MAPPING = {
**dict.fromkeys(_INT_TYPES, "int"),
**dict.fromkeys(_BIG_INT_TYPES, "bigint"),
**dict.fromkeys(_FLOAT_TYPES, "float"),
**dict.fromkeys(_DOUBLE_TYPES, "double"),
**dict.fromkeys(
[
*[pa.timestamp(unit) for unit in _TIMESTAMP_UNIT],
*[
pa.timestamp(unit, tz=tz)
for unit in _TIMESTAMP_UNIT
for tz in pytz.all_timezones
],
],
"timestamp",
),
**dict.fromkeys(_BOOLEAN_TYPES, "boolean"),
**dict.fromkeys(
[
*_STRING_TYPES,
# Category type in pandas stored as dictinoary in pyarrow
*[
pa.dictionary(
value_type=value_type, index_type=index_type, ordered=ordered
)
for value_type in _STRING_TYPES
for index_type in _INT_TYPES + _BIG_INT_TYPES
for ordered in [True, False]
],
],
"string",
),
**dict.fromkeys(_DATE_TYPES, "date"),
**dict.fromkeys(_BINARY_TYPES, "binary"),
}
if HAS_PANDAS:
from hsfs.core.type_systems import convert_pandas_dtype_to_offline_type


class Engine:
@@ -792,7 +745,7 @@ def parse_schema_feature_group(
for feat_name in arrow_schema.names:
name = util.autofix_feature_name(feat_name)
try:
converted_type = self._convert_pandas_dtype_to_offline_type(
converted_type = convert_pandas_dtype_to_offline_type(
arrow_schema.field(feat_name).type
)
except ValueError as e:
@@ -826,7 +779,7 @@ def save_dataframe(
)

if (
isinstance(feature_group, ExternalFeatureGroup)
hasattr(feature_group, "EXTERNAL_FEATURE_GROUP")
and feature_group.online_enabled
) or feature_group.stream:
return self._write_dataframe_kafka(
@@ -1273,7 +1226,7 @@ def _apply_transformation_function(
dataset, pl.dataframe.frame.DataFrame
):
# Converting polars dataframe to pandas because currently we support only pandas UDF's as transformation functions.
if PYARROW_EXTENSION_ENABLE:
if HAS_ARROW:
dataset = dataset.to_pandas(
use_pyarrow_extension_array=True
) # Zero copy if pyarrow extension can be used.
@@ -1433,215 +1386,15 @@ def _write_dataframe_kafka(
)
return feature_group.materialization_job

@staticmethod
def _convert_pandas_dtype_to_offline_type(arrow_type: str) -> str:
# This is a simple type conversion between pandas dtypes and pyspark (hive) types,
# using pyarrow types obatined from pandas dataframe to convert pandas typed fields,
# A recurisive function "_convert_pandas_object_type_to_offline_type" is used to convert complex types like lists and structures
# "_convert_simple_pandas_dtype_to_offline_type" is used to convert simple types
# In the backend, the types specified here will also be used for mapping to Avro types.

if (
pa.types.is_list(arrow_type)
or pa.types.is_large_list(arrow_type)
or pa.types.is_struct(arrow_type)
):
return Engine._convert_pandas_object_type_to_offline_type(arrow_type)

return Engine._convert_simple_pandas_dtype_to_offline_type(arrow_type)

@staticmethod
def convert_spark_type_to_offline_type(spark_type_string: str) -> str:
if spark_type_string.endswith("Type()"):
spark_type_string = util.translate_legacy_spark_type(spark_type_string)
if spark_type_string == "STRING":
return "STRING"
elif spark_type_string == "BINARY":
return "BINARY"
elif spark_type_string == "BYTE":
return "INT"
elif spark_type_string == "SHORT":
return "INT"
elif spark_type_string == "INT":
return "INT"
elif spark_type_string == "LONG":
return "BIGINT"
elif spark_type_string == "FLOAT":
return "FLOAT"
elif spark_type_string == "DOUBLE":
return "DOUBLE"
elif spark_type_string == "TIMESTAMP":
return "TIMESTAMP"
elif spark_type_string == "DATE":
return "DATE"
elif spark_type_string == "BOOLEAN":
return "BOOLEAN"
else:
raise ValueError(
f"Return type {spark_type_string} not supported for transformation functions."
)

@staticmethod
def _convert_simple_pandas_dtype_to_offline_type(arrow_type: str) -> str:
try:
return PYARROW_HOPSWORKS_DTYPE_MAPPING[arrow_type]
except KeyError as err:
raise ValueError(f"dtype '{arrow_type}' not supported") from err

@staticmethod
def _convert_pandas_object_type_to_offline_type(arrow_type: str) -> str:
if pa.types.is_list(arrow_type) or pa.types.is_large_list(arrow_type):
# figure out sub type
sub_arrow_type = arrow_type.value_type
subtype = Engine._convert_pandas_dtype_to_offline_type(sub_arrow_type)
return "array<{}>".format(subtype)
if pa.types.is_struct(arrow_type):
struct_schema = {}
for index in range(arrow_type.num_fields):
struct_schema[arrow_type.field(index).name] = (
Engine._convert_pandas_dtype_to_offline_type(
arrow_type.field(index).type
)
)
return (
"struct<"
+ ",".join([f"{key}:{value}" for key, value in struct_schema.items()])
+ ">"
)

raise ValueError(f"dtype 'O' (arrow_type '{str(arrow_type)}') not supported")

@staticmethod
def _cast_column_to_offline_type(
feature_column: pd.Series, offline_type: str
) -> pd.Series:
offline_type = offline_type.lower()
if offline_type == "timestamp":
# convert (if tz!=UTC) to utc, then make timezone unaware
if isinstance(feature_column, pl.Series):
return feature_column.cast(pl.Datetime(time_zone=None))
else:
return pd.to_datetime(feature_column, utc=True).dt.tz_localize(None)
elif offline_type == "date":
if isinstance(feature_column, pl.Series):
return feature_column.cast(pl.Date)
else:
return pd.to_datetime(feature_column, utc=True).dt.date
elif (
offline_type.startswith("array<")
or offline_type.startswith("struct<")
or offline_type == "boolean"
):
if isinstance(feature_column, pl.Series):
return feature_column.map_elements(
lambda x: (ast.literal_eval(x) if isinstance(x, str) else x)
if (x is not None and x != "")
else None
)
else:
return feature_column.apply(
lambda x: (ast.literal_eval(x) if isinstance(x, str) else x)
if (x is not None and x != "")
else None
)
elif offline_type == "string":
if isinstance(feature_column, pl.Series):
return feature_column.map_elements(
lambda x: str(x) if x is not None else None
)
else:
return feature_column.apply(lambda x: str(x) if x is not None else None)
elif offline_type.startswith("decimal"):
if isinstance(feature_column, pl.Series):
return feature_column.map_elements(
lambda x: decimal.Decimal(x) if (x is not None) else None
)
else:
return feature_column.apply(
lambda x: decimal.Decimal(x) if (x is not None) else None
)
else:
if isinstance(feature_column, pl.Series):
offline_dtype_mapping = {
"bigint": pl.Int64,
"int": pl.Int32,
"smallint": pl.Int16,
"tinyint": pl.Int8,
"float": pl.Float32,
"double": pl.Float64,
}
else:
offline_dtype_mapping = {
"bigint": pd.Int64Dtype(),
"int": pd.Int32Dtype(),
"smallint": pd.Int16Dtype(),
"tinyint": pd.Int8Dtype(),
"float": np.dtype("float32"),
"double": np.dtype("float64"),
}
if offline_type in offline_dtype_mapping:
if isinstance(feature_column, pl.Series):
casted_feature = feature_column.cast(
offline_dtype_mapping[offline_type]
)
else:
casted_feature = feature_column.astype(
offline_dtype_mapping[offline_type]
)
return casted_feature
else:
return feature_column # handle gracefully, just return the column as-is

@staticmethod
def _cast_column_to_online_type(
feature_column: pd.Series, online_type: str
) -> pd.Series:
online_type = online_type.lower()
if online_type == "timestamp":
# convert (if tz!=UTC) to utc, then make timezone unaware
return pd.to_datetime(feature_column, utc=True).dt.tz_localize(None)
elif online_type == "date":
return pd.to_datetime(feature_column, utc=True).dt.date
elif online_type.startswith("varchar") or online_type == "text":
return feature_column.apply(lambda x: str(x) if x is not None else None)
elif online_type == "boolean":
return feature_column.apply(
lambda x: (ast.literal_eval(x) if isinstance(x, str) else x)
if (x is not None and x != "")
else None
)
elif online_type.startswith("decimal"):
return feature_column.apply(
lambda x: decimal.Decimal(x) if (x is not None) else None
)
else:
online_dtype_mapping = {
"bigint": pd.Int64Dtype(),
"int": pd.Int32Dtype(),
"smallint": pd.Int16Dtype(),
"tinyint": pd.Int8Dtype(),
"float": np.dtype("float32"),
"double": np.dtype("float64"),
}
if online_type in online_dtype_mapping:
casted_feature = feature_column.astype(
online_dtype_mapping[online_type]
)
return casted_feature
else:
return feature_column # handle gracefully, just return the column as-is

@staticmethod
def cast_columns(
df: pd.DataFrame, schema: List["feature.Feature"], online: bool = False
df: pd.DataFrame, schema: List[feature.Feature], online: bool = False
) -> pd.DataFrame:
for _feat in schema:
if not online:
df[_feat.name] = Engine._cast_column_to_offline_type(
df[_feat.name], _feat.type
)
df[_feat.name] = cast_column_to_offline_type(df[_feat.name], _feat.type)
else:
df[_feat.name] = Engine._cast_column_to_online_type(
df[_feat.name] = cast_column_to_online_type(
df[_feat.name], _feat.online_type
)
return df
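With the conversion helpers extracted, Engine.cast_columns above reduces to a per-column dispatch. The sketch below mimics its offline path with a stand-in feature class; FakeFeature is a hypothetical substitute for hsfs.feature.Feature, used only for illustration:

from dataclasses import dataclass

import pandas as pd

from hsfs.core.type_systems import cast_column_to_offline_type


@dataclass
class FakeFeature:  # hypothetical stand-in, not the real hsfs.feature.Feature
    name: str
    type: str  # offline (Hive) type


df = pd.DataFrame({"id": [1, 2], "paid": ["True", ""]})
schema = [FakeFeature("id", "bigint"), FakeFeature("paid", "boolean")]

# Offline path of cast_columns: each column is cast via cast_column_to_offline_type.
for feat in schema:
    df[feat.name] = cast_column_to_offline_type(df[feat.name], feat.type)

print(df.dtypes)  # id -> Int64, paid -> object (True / None)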
4 changes: 2 additions & 2 deletions python/hsfs/engine/spark.py
@@ -1157,7 +1157,7 @@ def parse_schema_feature_group(
for feat in dataframe.schema:
name = util.autofix_feature_name(feat.name)
try:
converted_type = Engine.convert_spark_type_to_offline_type(
converted_type = Engine._convert_spark_type_to_offline_type(
feat.dataType, using_hudi
)
except ValueError as e:
@@ -1396,7 +1396,7 @@ def get_unique_values(feature_dataframe, feature_name):
return [field[feature_name] for field in unique_values]

@staticmethod
def convert_spark_type_to_offline_type(spark_type, using_hudi):
def _convert_spark_type_to_offline_type(spark_type, using_hudi):
# The HiveSyncTool is strict and does not support schema evolution from tinyint/short to
# int. Avro, on the other hand, does not support tinyint/short and delivers them as int
# to Hive. Therefore, we need to force Hive to create int-typed columns in the first place.
41 changes: 0 additions & 41 deletions python/hsfs/util.py
@@ -354,47 +354,6 @@ def get_job_url(href: str) -> str:
return ui_url.geturl()


def translate_legacy_spark_type(
output_type: str,
) -> Literal[
"STRING",
"BINARY",
"BYTE",
"SHORT",
"INT",
"LONG",
"FLOAT",
"DOUBLE",
"TIMESTAMP",
"DATE",
"BOOLEAN",
]:
if output_type == "StringType()":
return "STRING"
elif output_type == "BinaryType()":
return "BINARY"
elif output_type == "ByteType()":
return "BYTE"
elif output_type == "ShortType()":
return "SHORT"
elif output_type == "IntegerType()":
return "INT"
elif output_type == "LongType()":
return "LONG"
elif output_type == "FloatType()":
return "FLOAT"
elif output_type == "DoubleType()":
return "DOUBLE"
elif output_type == "TimestampType()":
return "TIMESTAMP"
elif output_type == "DateType()":
return "DATE"
elif output_type == "BooleanType()":
return "BOOLEAN"
else:
return "STRING" # handle gracefully, and return STRING type, the default for spark udfs


def _loading_animation(message: str, stop_event: threading.Event) -> None:
for char in itertools.cycle([".", "..", "...", ""]):
if stop_event.is_set():
749 changes: 749 additions & 0 deletions python/tests/core/test_type_systems.py

Large diffs are not rendered by default.

621 changes: 2 additions & 619 deletions python/tests/engine/test_python.py

Large diffs are not rendered by default.

36 changes: 18 additions & 18 deletions python/tests/engine/test_spark.py
@@ -3867,7 +3867,7 @@ def test_read_options_provided_options_tsv(self):
def test_parse_schema_feature_group(self, mocker):
# Arrange
mock_spark_engine_convert_spark_type = mocker.patch(
"hsfs.engine.spark.Engine.convert_spark_type_to_offline_type"
"hsfs.engine.spark.Engine._convert_spark_type_to_offline_type"
)

spark_engine = spark.Engine()
@@ -3892,7 +3892,7 @@ def test_parse_schema_feature_group(self, mocker):
def test_parse_schema_feature_group_hudi(self, mocker):
# Arrange
mock_spark_engine_convert_spark_type = mocker.patch(
"hsfs.engine.spark.Engine.convert_spark_type_to_offline_type"
"hsfs.engine.spark.Engine._convert_spark_type_to_offline_type"
)

spark_engine = spark.Engine()
@@ -3917,7 +3917,7 @@ def test_parse_schema_feature_group_hudi(self, mocker):
def test_parse_schema_feature_group_value_error(self, mocker):
# Arrange
mock_spark_engine_convert_spark_type = mocker.patch(
"hsfs.engine.spark.Engine.convert_spark_type_to_offline_type"
"hsfs.engine.spark.Engine._convert_spark_type_to_offline_type"
)

spark_engine = spark.Engine()
@@ -3964,7 +3964,7 @@ def test_convert_spark_type(self):
spark_engine = spark.Engine()

# Act
result = spark_engine.convert_spark_type_to_offline_type(
result = spark_engine._convert_spark_type_to_offline_type(
spark_type=IntegerType(),
using_hudi=False,
)
@@ -4043,7 +4043,7 @@ def test_convert_spark_type_using_hudi_byte_type(self):
spark_engine = spark.Engine()

# Act
result = spark_engine.convert_spark_type_to_offline_type(
result = spark_engine._convert_spark_type_to_offline_type(
spark_type=ByteType(),
using_hudi=True,
)
@@ -4056,7 +4056,7 @@ def test_convert_spark_type_using_hudi_short_type(self):
spark_engine = spark.Engine()

# Act
result = spark_engine.convert_spark_type_to_offline_type(
result = spark_engine._convert_spark_type_to_offline_type(
spark_type=ShortType(),
using_hudi=True,
)
@@ -4069,7 +4069,7 @@ def test_convert_spark_type_using_hudi_bool_type(self):
spark_engine = spark.Engine()

# Act
result = spark_engine.convert_spark_type_to_offline_type(
result = spark_engine._convert_spark_type_to_offline_type(
spark_type=BooleanType(),
using_hudi=True,
)
@@ -4082,7 +4082,7 @@ def test_convert_spark_type_using_hudi_int_type(self):
spark_engine = spark.Engine()

# Act
result = spark_engine.convert_spark_type_to_offline_type(
result = spark_engine._convert_spark_type_to_offline_type(
spark_type=IntegerType(),
using_hudi=True,
)
@@ -4095,7 +4095,7 @@ def test_convert_spark_type_using_hudi_long_type(self):
spark_engine = spark.Engine()

# Act
result = spark_engine.convert_spark_type_to_offline_type(
result = spark_engine._convert_spark_type_to_offline_type(
spark_type=LongType(),
using_hudi=True,
)
@@ -4108,7 +4108,7 @@ def test_convert_spark_type_using_hudi_float_type(self):
spark_engine = spark.Engine()

# Act
result = spark_engine.convert_spark_type_to_offline_type(
result = spark_engine._convert_spark_type_to_offline_type(
spark_type=FloatType(),
using_hudi=True,
)
@@ -4121,7 +4121,7 @@ def test_convert_spark_type_using_hudi_double_type(self):
spark_engine = spark.Engine()

# Act
result = spark_engine.convert_spark_type_to_offline_type(
result = spark_engine._convert_spark_type_to_offline_type(
spark_type=DoubleType(),
using_hudi=True,
)
@@ -4134,7 +4134,7 @@ def test_convert_spark_type_using_hudi_decimal_type(self):
spark_engine = spark.Engine()

# Act
result = spark_engine.convert_spark_type_to_offline_type(
result = spark_engine._convert_spark_type_to_offline_type(
spark_type=DecimalType(),
using_hudi=True,
)
@@ -4147,7 +4147,7 @@ def test_convert_spark_type_using_hudi_timestamp_type(self):
spark_engine = spark.Engine()

# Act
result = spark_engine.convert_spark_type_to_offline_type(
result = spark_engine._convert_spark_type_to_offline_type(
spark_type=TimestampType(),
using_hudi=True,
)
@@ -4160,7 +4160,7 @@ def test_convert_spark_type_using_hudi_date_type(self):
spark_engine = spark.Engine()

# Act
result = spark_engine.convert_spark_type_to_offline_type(
result = spark_engine._convert_spark_type_to_offline_type(
spark_type=DateType(),
using_hudi=True,
)
@@ -4173,7 +4173,7 @@ def test_convert_spark_type_using_hudi_string_type(self):
spark_engine = spark.Engine()

# Act
result = spark_engine.convert_spark_type_to_offline_type(
result = spark_engine._convert_spark_type_to_offline_type(
spark_type=StringType(),
using_hudi=True,
)
@@ -4186,7 +4186,7 @@ def test_convert_spark_type_using_hudi_struct_type(self):
spark_engine = spark.Engine()

# Act
result = spark_engine.convert_spark_type_to_offline_type(
result = spark_engine._convert_spark_type_to_offline_type(
spark_type=StructType(),
using_hudi=True,
)
@@ -4199,7 +4199,7 @@ def test_convert_spark_type_using_hudi_binary_type(self):
spark_engine = spark.Engine()

# Act
result = spark_engine.convert_spark_type_to_offline_type(
result = spark_engine._convert_spark_type_to_offline_type(
spark_type=BinaryType(),
using_hudi=True,
)
@@ -4213,7 +4213,7 @@ def test_convert_spark_type_using_hudi_map_type(self):

# Act
with pytest.raises(ValueError) as e_info:
spark_engine.convert_spark_type_to_offline_type(
spark_engine._convert_spark_type_to_offline_type(
spark_type=MapType(StringType(), StringType()),
using_hudi=True,
)
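With the Spark conversion helper renamed to a private method, unit tests can also target the module-level converters directly instead of patching Engine. A minimal pytest sketch (illustrative only, not taken from the new python/tests/core/test_type_systems.py), assuming pyarrow is installed:

import pyarrow as pa
import pytest

from hsfs.core import type_systems


def test_int64_maps_to_bigint():
    # arrow int64 columns map to the Hopsworks "bigint" offline type
    assert type_systems.convert_simple_pandas_dtype_to_offline_type(pa.int64()) == "bigint"


def test_decimal_dtype_not_supported():
    # decimal arrow types are not in the mapping, so a ValueError is expected
    with pytest.raises(ValueError):
        type_systems.convert_simple_pandas_dtype_to_offline_type(pa.decimal128(10, 2))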