[FSTORE-1468] Make numpy optional (#338)
* Remove dependency on numpy except from convert_to_default_dataframe

* Ruff

* Update pyproject.toml extras

* Fix

* Fix

* Attempt making numpy optional in convert_to_default_dataframe

* Address Manu's review
aversey authored Oct 4, 2024
1 parent 147af3e commit a5c28e7
Showing 18 changed files with 195 additions and 69 deletions.
1 change: 0 additions & 1 deletion locust_benchmark/create_feature_group.py
@@ -1,7 +1,6 @@
from common.hopsworks_client import HopsworksClient

if __name__ == "__main__":

hopsworks_client = HopsworksClient()
fg = hopsworks_client.get_or_create_fg()
hopsworks_client.insert_data(fg)
17 changes: 17 additions & 0 deletions python/hopsworks_common/core/constants.py
@@ -20,6 +20,13 @@
# Avro
HAS_FAST_AVRO: bool = importlib.util.find_spec("fastavro") is not None
HAS_AVRO: bool = importlib.util.find_spec("avro") is not None
avro_not_installed_message = (
"Avro package not found. "
"If you want to use avro with Hopsworks you can install the corresponding extra via "
'`pip install "hopsworks[avro]"`. '
"You can also install avro directly in your environment with `pip install fastavro` or `pip install avro`. "
"You will need to restart your kernel if applicable."
)

# Confluent Kafka
HAS_CONFLUENT_KAFKA: bool = importlib.util.find_spec("confluent_kafka") is not None
@@ -55,7 +62,17 @@
)

HAS_PANDAS: bool = importlib.util.find_spec("pandas") is not None

# NumPy
HAS_NUMPY: bool = importlib.util.find_spec("numpy") is not None
numpy_not_installed_message = (
"Numpy package not found. "
"If you want to use numpy with Hopsworks you can install the corresponding extra via "
'`pip install "hopsworks[numpy]"`. '
"You can also install numpy directly in your environment with `pip install numpy`. "
"You will need to restart your kernel if applicable."
)

HAS_POLARS: bool = importlib.util.find_spec("polars") is not None
polars_not_installed_message = (
"Polars package not found. "
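
The new constants follow the same feature-detection pattern already used for fastavro, confluent_kafka, pandas, and polars: probe the package with importlib.util.find_spec, expose a HAS_* flag, and keep a ready-made install hint for code paths that genuinely need the package. A minimal, self-contained sketch of that pattern (the require_numpy helper is illustrative, not part of the commit):

```python
# Sketch of the optional-dependency pattern used in hopsworks_common/core/constants.py.
import importlib.util

HAS_NUMPY: bool = importlib.util.find_spec("numpy") is not None

numpy_not_installed_message = (
    "Numpy package not found. "
    'Install the extra via `pip install "hopsworks[numpy]"` '
    "or install numpy directly with `pip install numpy`."
)


def require_numpy() -> None:
    # Illustrative helper: fail fast with an actionable message instead of a
    # NameError later, when a numpy-only code path is reached.
    if not HAS_NUMPY:
        raise ModuleNotFoundError(numpy_not_installed_message)
```
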
5 changes: 3 additions & 2 deletions python/hsfs/builtin_transformations.py
@@ -14,7 +14,8 @@
# limitations under the License.
#

import numpy as np
import math

import pandas as pd
from hsfs.hopsworks_udf import udf
from hsfs.transformation_statistics import TransformationStatistics
@@ -49,7 +50,7 @@ def label_encoder(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
# Unknown categories not present in training dataset are encoded as -1.
return pd.Series(
[
value_to_index.get(data, -1) if not pd.isna(data) else np.nan
value_to_index.get(data, -1) if not pd.isna(data) else math.nan
for data in feature
]
)
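
math.nan is a drop-in replacement for np.nan in label_encoder because both are plain float NaN values and pandas treats them identically as missing data. A quick check (illustrative, not from the commit):

```python
import math

import pandas as pd

encoded = pd.Series([0, 1, math.nan])
print(pd.isna(encoded).tolist())    # [False, False, True], same as with np.nan
print(math.isnan(encoded.iloc[2]))  # True
```
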
6 changes: 5 additions & 1 deletion python/hsfs/constructor/query.py
@@ -21,9 +21,9 @@
from typing import Any, Dict, List, Optional, Tuple, TypeVar, Union

import humps
import numpy as np
import pandas as pd
from hopsworks_common.client.exceptions import FeatureStoreException
from hopsworks_common.core.constants import HAS_NUMPY
from hsfs import engine, storage_connector, util
from hsfs import feature_group as fg_mod
from hsfs.constructor import join
@@ -34,6 +34,10 @@
from hsfs.feature import Feature


if HAS_NUMPY:
import numpy as np


@typechecked
class Query:
ERROR_MESSAGE_FEATURE_AMBIGUOUS = (
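
query.py (and feature_view_engine.py below) switch to a guarded module-level import: numpy is only imported when HAS_NUMPY is true, so every later reference to np must sit behind the same flag. A small sketch of the pattern (to_list is an illustrative helper, not hsfs API):

```python
from hopsworks_common.core.constants import HAS_NUMPY

if HAS_NUMPY:
    import numpy as np


def to_list(value):
    # Any use of `np` must be short-circuited by HAS_NUMPY, otherwise the
    # module raises NameError in environments without numpy installed.
    if HAS_NUMPY and isinstance(value, np.ndarray):
        return value.tolist()
    return list(value)


print(to_list((1, 2, 3)))
```
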
6 changes: 5 additions & 1 deletion python/hsfs/core/feature_view_engine.py
@@ -19,10 +19,10 @@
import warnings
from typing import Any, Dict, List, Optional, TypeVar, Union

import numpy as np
import pandas as pd
from hopsworks_common import client
from hopsworks_common.client.exceptions import FeatureStoreException
from hopsworks_common.core.constants import HAS_NUMPY
from hsfs import (
engine,
feature_group,
@@ -45,6 +45,10 @@
from hsfs.training_dataset_split import TrainingDatasetSplit


if HAS_NUMPY:
import numpy as np


class FeatureViewEngine:
ENTITY_TYPE = "featureview"
_TRAINING_DATA_API_PATH = "trainingdatasets"
7 changes: 5 additions & 2 deletions python/hsfs/core/kafka_engine.py
@@ -20,14 +20,17 @@
from io import BytesIO
from typing import TYPE_CHECKING, Any, Callable, Dict, Literal, Optional, Tuple, Union

import numpy as np
import pandas as pd
from hopsworks_common import client
from hopsworks_common.core.constants import HAS_NUMPY
from hsfs.core import storage_connector_api
from hsfs.core.constants import HAS_AVRO, HAS_CONFLUENT_KAFKA, HAS_FAST_AVRO
from tqdm import tqdm


if HAS_NUMPY:
import numpy as np

if HAS_CONFLUENT_KAFKA:
from confluent_kafka import Consumer, KafkaError, Producer, TopicPartition

@@ -202,7 +205,7 @@ def encode_row(complex_feature_writers, writer, row):
if isinstance(row, dict):
for k in row.keys():
# for avro to be able to serialize them, they need to be python data types
if isinstance(row[k], np.ndarray):
if HAS_NUMPY and isinstance(row[k], np.ndarray):
row[k] = row[k].tolist()
if isinstance(row[k], pd.Timestamp):
row[k] = row[k].to_pydatetime()
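
encode_row keeps coercing values into plain Python types before Avro serialization, but the ndarray check is now skipped entirely when numpy is absent. A standalone sketch of that coercion (the function name and sample row are made up):

```python
import importlib.util

import pandas as pd

HAS_NUMPY = importlib.util.find_spec("numpy") is not None
if HAS_NUMPY:
    import numpy as np


def to_avro_friendly(row: dict) -> dict:
    for k in row.keys():
        # Avro cannot serialize numpy arrays or pandas Timestamps directly.
        if HAS_NUMPY and isinstance(row[k], np.ndarray):
            row[k] = row[k].tolist()
        if isinstance(row[k], pd.Timestamp):
            row[k] = row[k].to_pydatetime()
    return row


print(to_avro_friendly({"ts": pd.Timestamp("2024-10-04"), "xs": [1, 2, 3]}))
```
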
19 changes: 15 additions & 4 deletions python/hsfs/core/vector_server.py
@@ -23,14 +23,15 @@
from io import BytesIO
from typing import Any, Callable, Dict, List, Literal, Optional, Set, Tuple, Union

import avro.io
import avro.schema
import numpy as np
import pandas as pd
from hopsworks_common import client
from hopsworks_common.core.constants import (
HAS_AVRO,
HAS_FAST_AVRO,
HAS_NUMPY,
HAS_POLARS,
avro_not_installed_message,
numpy_not_installed_message,
polars_not_installed_message,
)
from hsfs import (
@@ -52,9 +53,14 @@
)


if HAS_NUMPY:
import numpy as np

if HAS_FAST_AVRO:
from fastavro import schemaless_reader
else:
if HAS_AVRO:
import avro.io
import avro.schema
from avro.io import BinaryDecoder

if HAS_POLARS:
@@ -807,6 +813,8 @@ def handle_feature_vector_return_type(
return feature_vectorz
elif return_type.lower() == "numpy" and not inference_helper:
_logger.debug("Returning feature vector as numpy array")
if not HAS_NUMPY:
raise ModuleNotFoundError(numpy_not_installed_message)
return np.array(feature_vectorz)
# Only inference helper can return dict
elif return_type.lower() == "dict" and inference_helper:
@@ -1064,6 +1072,9 @@ def build_complex_feature_decoders(self) -> Dict[str, Callable]:
- deserialization of complex features from the online feature store
- conversion of string or int timestamps to datetime objects
"""
if not HAS_AVRO:
raise ModuleNotFoundError(avro_not_installed_message)

complex_feature_schemas = {
f.name: avro.io.DatumReader(
avro.schema.parse(
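
vector_server now imports avro and numpy lazily and raises ModuleNotFoundError with the shared install hints when a caller requests a numpy return type, or builds complex-feature decoders, without the corresponding package installed. A trimmed-down sketch of the return-type dispatch (as_return_type and its message are illustrative, not the hsfs method):

```python
import importlib.util

HAS_NUMPY = importlib.util.find_spec("numpy") is not None
if HAS_NUMPY:
    import numpy as np

NUMPY_MISSING = (
    'Numpy package not found. Install the extra via `pip install "hopsworks[numpy]"` '
    "or install numpy directly with `pip install numpy`."
)


def as_return_type(feature_vectors, return_type: str):
    if return_type.lower() == "list":
        return feature_vectors
    if return_type.lower() == "numpy":
        if not HAS_NUMPY:
            # Actionable error instead of a NameError on `np`.
            raise ModuleNotFoundError(NUMPY_MISSING)
        return np.array(feature_vectors)
    raise ValueError(f"Unknown return type '{return_type}'")


print(as_return_type([[1, 2], [3, 4]], "list"))
```
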
17 changes: 12 additions & 5 deletions python/hsfs/engine/python.py
@@ -50,7 +50,6 @@

import boto3
import hsfs
import numpy as np
import pandas as pd
import pyarrow as pa
from botocore.response import StreamingBody
@@ -83,6 +82,7 @@
from hsfs.core.constants import (
HAS_AIOMYSQL,
HAS_GREAT_EXPECTATIONS,
HAS_NUMPY,
HAS_PANDAS,
HAS_PYARROW,
HAS_SQLALCHEMY,
@@ -98,6 +98,9 @@
if HAS_GREAT_EXPECTATIONS:
import great_expectations

if HAS_NUMPY:
import numpy as np

if HAS_AIOMYSQL and HAS_SQLALCHEMY:
from hsfs.core import util_sql

@@ -1464,11 +1467,13 @@ def _start_offline_materialization(offline_write_options: Dict[str, Any]) -> bool:
def _convert_feature_log_to_df(feature_log, cols) -> pd.DataFrame:
if feature_log is None and cols:
return pd.DataFrame(columns=cols)
if not (
isinstance(feature_log, (list, np.ndarray, pd.DataFrame, pl.DataFrame))
if not (isinstance(feature_log, (list, pd.DataFrame, pl.DataFrame))) or (
HAS_NUMPY and isinstance(feature_log, np.ndarray)
):
raise ValueError(f"Type '{type(feature_log)}' not accepted")
if isinstance(feature_log, list) or isinstance(feature_log, np.ndarray):
if isinstance(feature_log, list) or (
HAS_NUMPY and isinstance(feature_log, np.ndarray)
):
Engine._validate_logging_list(feature_log, cols)
return pd.DataFrame(feature_log, columns=cols)
else:
Expand All @@ -1479,7 +1484,9 @@ def _convert_feature_log_to_df(feature_log, cols) -> pd.DataFrame:

@staticmethod
def _validate_logging_list(feature_log, cols):
if isinstance(feature_log[0], list) or isinstance(feature_log[0], np.ndarray):
if isinstance(feature_log[0], list) or (
HAS_NUMPY and isinstance(feature_log[0], np.ndarray)
):
provided_len = len(feature_log[0])
else:
provided_len = 1
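
The logging helpers in the Python engine still accept numpy arrays, but only through checks that are short-circuited by HAS_NUMPY. A simplified sketch of the type gate (polars handling and column validation are omitted, and feature_log_to_df is an illustrative name):

```python
import importlib.util

import pandas as pd

HAS_NUMPY = importlib.util.find_spec("numpy") is not None
if HAS_NUMPY:
    import numpy as np


def feature_log_to_df(feature_log, cols):
    if feature_log is None and cols:
        return pd.DataFrame(columns=cols)
    accepted = isinstance(feature_log, (list, pd.DataFrame)) or (
        HAS_NUMPY and isinstance(feature_log, np.ndarray)
    )
    if not accepted:
        raise ValueError(f"Type '{type(feature_log)}' not accepted")
    if isinstance(feature_log, pd.DataFrame):
        return feature_log.copy(deep=False)
    # Lists and numpy arrays are handed to the pandas constructor directly.
    return pd.DataFrame(feature_log, columns=cols)


print(feature_log_to_df([[1, 2], [3, 4]], ["a", "b"]))
```
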
130 changes: 96 additions & 34 deletions python/hsfs/engine/spark.py
@@ -31,15 +31,19 @@
from pyspark.rdd import RDD
from pyspark.sql import DataFrame

import numpy as np
import pandas as pd
import tzlocal
from hopsworks_common.core.constants import HAS_NUMPY, HAS_PANDAS
from hsfs.constructor import query

# in case importing in %%local
from hsfs.core.vector_db_client import VectorDbClient


if HAS_NUMPY:
import numpy as np


try:
import pyspark
from pyspark import SparkFiles
@@ -258,39 +262,11 @@ def _return_dataframe_type(self, dataframe, dataframe_type):

def convert_to_default_dataframe(self, dataframe):
if isinstance(dataframe, list):
dataframe = np.array(dataframe)

if isinstance(dataframe, np.ndarray):
if dataframe.ndim != 2:
raise TypeError(
"Cannot convert numpy array that do not have two dimensions to a dataframe. "
"The number of dimensions are: {}".format(dataframe.ndim)
)
num_cols = dataframe.shape[1]
dataframe_dict = {}
for n_col in list(range(num_cols)):
col_name = "col_" + str(n_col)
dataframe_dict[col_name] = dataframe[:, n_col]
dataframe = pd.DataFrame(dataframe_dict)

if isinstance(dataframe, pd.DataFrame):
# convert timestamps to current timezone
local_tz = tzlocal.get_localzone()
# make shallow copy so the original df does not get changed
dataframe_copy = dataframe.copy(deep=False)
for c in dataframe_copy.columns:
if isinstance(
dataframe_copy[c].dtype, pd.core.dtypes.dtypes.DatetimeTZDtype
):
# convert to utc timestamp
dataframe_copy[c] = dataframe_copy[c].dt.tz_convert(None)
if dataframe_copy[c].dtype == np.dtype("datetime64[ns]"):
# set the timezone to the client's timezone because that is
# what spark expects.
dataframe_copy[c] = dataframe_copy[c].dt.tz_localize(
str(local_tz), ambiguous="infer", nonexistent="shift_forward"
)
dataframe = self._spark_session.createDataFrame(dataframe_copy)
dataframe = self.convert_list_to_spark_dataframe(dataframe)
elif HAS_NUMPY and isinstance(dataframe, np.ndarray):
dataframe = self.convert_numpy_to_spark_dataframe(dataframe)
elif HAS_PANDAS and isinstance(dataframe, pd.DataFrame):
dataframe = self.convert_pandas_to_spark_dataframe(dataframe)
elif isinstance(dataframe, RDD):
dataframe = dataframe.toDF()

@@ -341,6 +317,92 @@ def convert_to_default_dataframe(self, dataframe):
)
)

@staticmethod
def utc_disguised_as_local(dt):
local_tz = tzlocal.get_localzone()
utc = timezone.utc
if not dt.tzinfo:
dt = dt.replace(tzinfo=utc)
return dt.astimezone(utc).replace(tzinfo=local_tz)

def convert_list_to_spark_dataframe(self, dataframe):
if HAS_NUMPY:
return self.convert_numpy_to_spark_dataframe(np.array(dataframe))
try:
dataframe[0][0]
except TypeError:
raise TypeError(
"Cannot convert a list that has less than two dimensions to a dataframe."
) from None
ok = False
try:
dataframe[0][0][0]
except TypeError:
ok = True
if not ok:
raise TypeError(
"Cannot convert a list that has more than two dimensions to a dataframe."
) from None
num_cols = len(dataframe[0])
if HAS_PANDAS:
dataframe_dict = {}
for n_col in range(num_cols):
c = "col_" + str(n_col)
dataframe_dict[c] = [dataframe[i][n_col] for i in range(len(dataframe))]
return self.convert_pandas_to_spark_dataframe(pd.DataFrame(dataframe_dict))
for i in range(len(dataframe)):
dataframe[i] = [
self.utc_disguised_as_local(d) if isinstance(d, datetime) else d
for d in dataframe[i]
]
return self._spark_session.createDataFrame(
dataframe, ["col_" + str(n) for n in range(num_cols)]
)

def convert_numpy_to_spark_dataframe(self, dataframe):
if dataframe.ndim != 2:
raise TypeError(
"Cannot convert numpy array that do not have two dimensions to a dataframe. "
"The number of dimensions are: {}".format(dataframe.ndim)
)
num_cols = dataframe.shape[1]
if HAS_PANDAS:
dataframe_dict = {}
for n_col in range(num_cols):
c = "col_" + str(n_col)
dataframe_dict[c] = dataframe[:, n_col]
return self.convert_pandas_to_spark_dataframe(pd.DataFrame(dataframe_dict))
# convert timestamps to current timezone
for n_col in range(num_cols):
if dataframe[:, n_col].dtype == np.dtype("datetime64[ns]"):
# set the timezone to the client's timezone because that is
# what spark expects.
dataframe[:, n_col] = np.array(
[self.utc_disguised_as_local(d.item()) for d in dataframe[:, n_col]]
)
return self._spark_session.createDataFrame(
dataframe.tolist(), ["col_" + str(n) for n in range(num_cols)]
)

def convert_pandas_to_spark_dataframe(self, dataframe):
# convert timestamps to current timezone
local_tz = tzlocal.get_localzone()
# make shallow copy so the original df does not get changed
dataframe_copy = dataframe.copy(deep=False)
for c in dataframe_copy.columns:
if isinstance(
dataframe_copy[c].dtype, pd.core.dtypes.dtypes.DatetimeTZDtype
):
# convert to utc timestamp
dataframe_copy[c] = dataframe_copy[c].dt.tz_convert(None)
if HAS_NUMPY and dataframe_copy[c].dtype == np.dtype("datetime64[ns]"):
# set the timezone to the client's timezone because that is
# what spark expects.
dataframe_copy[c] = dataframe_copy[c].dt.tz_localize(
str(local_tz), ambiguous="infer", nonexistent="shift_forward"
)
return self._spark_session.createDataFrame(dataframe_copy)

def save_dataframe(
self,
feature_group,
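
convert_to_default_dataframe is now a thin dispatcher: lists, numpy arrays, and pandas DataFrames each go through their own convert_*_to_spark_dataframe helper, picked according to which optional libraries are installed. The new utc_disguised_as_local helper keeps a timestamp's UTC wall-clock value but relabels it with the client's local zone, the per-value equivalent of the tz_localize call in the pandas path (per the diff's comment, Spark expects the client's timezone). A standalone sketch of that helper (requires only tzlocal, no Spark session):

```python
from datetime import datetime, timezone

import tzlocal


def utc_disguised_as_local(dt: datetime) -> datetime:
    # Treat naive datetimes as UTC, then keep the UTC wall-clock value and
    # relabel it with the local zone, mirroring what the pandas path does
    # with tz_localize before handing the data to Spark.
    local_tz = tzlocal.get_localzone()
    if not dt.tzinfo:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc).replace(tzinfo=local_tz)


print(utc_disguised_as_local(datetime(2024, 10, 4, 12, 0)))
```
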