Skip to content

Commit

Permalink
[FSTORE-1626] Spark client without hopsfs (logicalclocks#419)
Browse files Browse the repository at this point in the history
  • Loading branch information
bubriks committed Dec 9, 2024
1 parent c9d6d74 commit 37c17c4
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 14 deletions.
15 changes: 7 additions & 8 deletions python/hopsworks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def login(
api_key_file: str = None,
hostname_verification: bool = False,
trust_store_path: str = None,
engine: Union[None, Literal["spark"], Literal["python"], Literal["training"]] = None,
engine: Union[None, Literal["spark"], Literal["python"], Literal["training"], Literal["spark-no-metastore"], Literal["spark-delta"]] = None,
) -> project.Project:
"""Connect to [Serverless Hopsworks](https://app.hopsworks.ai) by calling the `hopsworks.login()` function with no arguments.
Expand Down Expand Up @@ -124,13 +124,12 @@ def login(
api_key_file: Path to file wih Api Key
hostname_verification: Whether to verify Hopsworks' certificate
trust_store_path: Path on the file system containing the Hopsworks certificates
engine: Which engine to use, `"spark"`, `"python"` or `"training"`. Defaults to `None`,
which initializes the engine to Spark if the environment provides Spark, for
example on Hopsworks and Databricks, or falls back to Python if Spark is not
available, e.g. on local Python environments or AWS SageMaker. This option
allows you to override this behaviour. `"training"` engine is useful when only
feature store metadata is needed, for example training dataset location and label
information when Hopsworks training experiment is conducted.
engine: Specifies the engine to use. Possible options are "spark", "python", "training", "spark-no-metastore", or "spark-delta". The default value is None, which automatically selects the engine based on the environment:
"spark": Used if Spark is available, such as in Hopsworks or Databricks environments.
"python": Used in local Python environments or AWS SageMaker when Spark is not available.
"training": Used when only feature store metadata is needed, such as for obtaining training dataset locations and label information during Hopsworks training experiments.
"spark-no-metastore": Functions like "spark" but does not rely on the Hive metastore.
"spark-delta": Minimizes dependencies further by avoiding both Hive metastore and HopsFS.
# Returns
`Project`: The Project object to perform operations on
# Raises
Expand Down
9 changes: 9 additions & 0 deletions python/hopsworks_common/client/external.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,15 @@ def provide_project(self, project):
for conf_key, conf_value in configuration_dict.items():
_spark_session._jsc.hadoopConfiguration().set(conf_key, conf_value)

elif self._engine == "spark-delta":
_logger.debug(
"Running in Spark environment with no metastore and hopsfs, initializing Spark session"
)
_spark_session = SparkSession.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()

hopsworks_common.client.get_connection()._provide_project()

def download_certs(self):
Expand Down
8 changes: 4 additions & 4 deletions python/hopsworks_common/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ class Connection:
defaults to the project from where the client is run from.
Defaults to `None`.
engine: Specifies the engine to use. Possible options are "spark", "python", "training", "spark-no-metastore", or "spark-delta". The default value is None, which automatically selects the engine based on the environment:
"spark": Used if Spark is available and the connection is not to serverless Hopsworks, such as in Hopsworks or Databricks environments.
"python": Used in local Python environments or AWS SageMaker when Spark is not available or the connection is done to serverless Hopsworks.
"spark": Used if Spark is available, such as in Hopsworks or Databricks environments.
"python": Used in local Python environments or AWS SageMaker when Spark is not available.
"training": Used when only feature store metadata is needed, such as for obtaining training dataset locations and label information during Hopsworks training experiments.
"spark-no-metastore": Functions like "spark" but does not rely on the Hive metastore.
"spark-delta": Minimizes dependencies further by avoiding both Hive metastore and HopsFS.
Expand Down Expand Up @@ -361,7 +361,7 @@ def connect(self) -> None:
else:
raise ConnectionError(
"Engine you are trying to initialize is unknown. "
"Supported engines are `'spark'`, `'python'` and `'training'`."
"Supported engines are `'spark'`, `'python'`, `'training'`, `'spark-no-metastore'`, and `'spark-delta'`."
)

# init client
Expand Down Expand Up @@ -518,7 +518,7 @@ def connection(
project: The name of the project to connect to. When running on Hopsworks, this
defaults to the project from where the client is run from.
Defaults to `None`.
engine: Which engine to use, `"spark"`, `"python"` or `"training"`. Defaults to `None`,
engine: Which engine to use, `"spark"`, `"python"`, `"training"`, `"spark-no-metastore"` or `"spark-delta"`. Defaults to `None`,
which initializes the engine to Spark if the environment provides Spark, for
example on Hopsworks and Databricks, or falls back to Python if Spark is not
available, e.g. on local Python environments or AWS SageMaker. This option
Expand Down
2 changes: 1 addition & 1 deletion python/hsfs/engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def init(engine_type: str) -> None:
raise ValueError(
"Hive engine is not supported in hopsworks client version >= 4.0."
)
elif engine_type == "spark-no-metastore":
elif engine_type == "spark-no-metastore" or engine_type == "spark-delta":
_engine = spark_no_metastore.Engine()
elif engine_type in python_types:
try:
Expand Down
2 changes: 1 addition & 1 deletion python/hsfs/engine/spark_no_metastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ def __init__(self) -> None:

super().__init__()

def _sql_offline(self, sql_query):
def _sql_offline(self, sql_query, feature_store):
# Spark no metastore does not require the
return self._spark_session.sql(sql_query)

0 comments on commit 37c17c4

Please sign in to comment.