[FSTORE-1626] Spark client without hopsfs (logicalclocks#419)
bubriks authored and davitbzh committed Dec 5, 2024
1 parent a7ab30b commit aad5c40
Showing 5 changed files with 32 additions and 24 deletions.
21 changes: 8 additions & 13 deletions python/hopsworks/__init__.py
@@ -84,9 +84,7 @@ def login(
api_key_file: str = None,
hostname_verification: bool = False,
trust_store_path: str = None,
- engine: Union[
-     None, Literal["spark"], Literal["python"], Literal["training"]
- ] = None,
+ engine: Union[None, Literal["spark"], Literal["python"], Literal["training"], Literal["spark-no-metastore"], Literal["spark-delta"]] = None,
) -> project.Project:
"""Connect to [Serverless Hopsworks](https://app.hopsworks.ai) by calling the `hopsworks.login()` function with no arguments.
@@ -126,13 +124,12 @@ def login(
api_key_file: Path to file with Api Key
hostname_verification: Whether to verify Hopsworks' certificate
trust_store_path: Path on the file system containing the Hopsworks certificates
- engine: Which engine to use, `"spark"`, `"python"` or `"training"`. Defaults to `None`,
-     which initializes the engine to Spark if the environment provides Spark, for
-     example on Hopsworks and Databricks, or falls back to Python if Spark is not
-     available, e.g. on local Python environments or AWS SageMaker. This option
-     allows you to override this behaviour. `"training"` engine is useful when only
-     feature store metadata is needed, for example training dataset location and label
-     information when Hopsworks training experiment is conducted.
+ engine: Specifies the engine to use. Possible options are "spark", "python", "training",
+     "spark-no-metastore", or "spark-delta". The default value is None, which automatically
+     selects the engine based on the environment:
+     "spark": Used if Spark is available, such as in Hopsworks or Databricks environments.
+     "python": Used in local Python environments or AWS SageMaker when Spark is not available.
+     "training": Used when only feature store metadata is needed, such as for obtaining training
+         dataset locations and label information during Hopsworks training experiments.
+     "spark-no-metastore": Functions like "spark" but does not rely on the Hive metastore.
+     "spark-delta": Minimizes dependencies further by avoiding both the Hive metastore and HopsFS.
# Returns
`Project`: The Project object to perform operations on
# Raises
@@ -149,9 +146,7 @@ def login(

# If inside hopsworks, just return the current project for now
if "REST_ENDPOINT" in os.environ:
- _hw_connection = _hw_connection(
-     hostname_verification=hostname_verification, engine=engine
- )
+ _hw_connection = _hw_connection(hostname_verification=hostname_verification, engine=engine)
_connected_project = _hw_connection.get_project()
_initialize_module_apis()
print("\nLogged in to project, explore it here " + _connected_project.get_url())
9 changes: 9 additions & 0 deletions python/hopsworks_common/client/external.py
@@ -148,6 +148,15 @@ def provide_project(self, project):
for conf_key, conf_value in configuration_dict.items():
_spark_session._jsc.hadoopConfiguration().set(conf_key, conf_value)

elif self._engine == "spark-delta":
_logger.debug(
"Running in Spark environment with no metastore and hopsfs, initializing Spark session"
)
_spark_session = SparkSession.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()

hopsworks_common.client.get_connection()._provide_project()

def download_certs(self):
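The two `config` keys set above are the standard switches for enabling Delta Lake on a Spark session. For context, a standalone sketch of an equivalent Delta-enabled session outside the client (the `delta-spark` artifact coordinates are an assumption; match the version to your Spark installation):

```python
from pyspark.sql import SparkSession

# Pulls the Delta jars and enables the Delta SQL extension and catalog,
# mirroring the configuration the client applies. The artifact version is
# an assumption; Delta 3.x pairs with Spark 3.5.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# Smoke test: write and read a small Delta table on the local filesystem.
spark.range(5).write.format("delta").mode("overwrite").save("/tmp/delta_demo")
spark.read.format("delta").load("/tmp/delta_demo").show()
```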
22 changes: 13 additions & 9 deletions python/hopsworks_common/connection.py
@@ -98,13 +98,12 @@ class Connection:
project: The name of the project to connect to. When running on Hopsworks, this
defaults to the project from which the client is run.
Defaults to `None`.
- engine: Which engine to use, `"spark"`, `"python"` or `"training"`. Defaults to `None`,
-     which initializes the engine to Spark if the environment provides Spark, for
-     example on Hopsworks and Databricks, or falls back to Python if Spark is not
-     available, e.g. on local Python environments or AWS SageMaker. This option
-     allows you to override this behaviour. `"training"` engine is useful when only
-     feature store metadata is needed, for example training dataset location and label
-     information when Hopsworks training experiment is conducted.
+ engine: Specifies the engine to use. Possible options are "spark", "python", "training",
+     "spark-no-metastore", or "spark-delta". The default value is None, which automatically
+     selects the engine based on the environment:
+     "spark": Used if Spark is available, such as in Hopsworks or Databricks environments.
+     "python": Used in local Python environments or AWS SageMaker when Spark is not available.
+     "training": Used when only feature store metadata is needed, such as for obtaining training
+         dataset locations and label information during Hopsworks training experiments.
+     "spark-no-metastore": Functions like "spark" but does not rely on the Hive metastore.
+     "spark-delta": Minimizes dependencies further by avoiding both the Hive metastore and HopsFS.
hostname_verification: Whether or not to verify Hopsworks' certificate, defaults
to `True`.
trust_store_path: Path on the file system containing the Hopsworks certificates,
@@ -354,10 +353,15 @@ def connect(self) -> None:
and self._engine.lower() == "spark-no-metastore"
):
self._engine = "spark-no-metastore"
+ elif (
+     self._engine is not None
+     and self._engine.lower() == "spark-delta"
+ ):
+     self._engine = "spark-delta"
else:
raise ConnectionError(
"Engine you are trying to initialize is unknown. "
"Supported engines are `'spark'`, `'python'` and `'training'`."
"Supported engines are `'spark'`, `'python'`, `'training'`, `'spark-no-metastore'`, and `'spark-delta'`."
)

# init client
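Because the engine check runs before the client is initialized (the `# init client` step follows the validation chain), an unknown engine fails fast without touching the network. A hedged sketch of the resulting behaviour (assumes `hsfs.connection`, the factory that wraps this `Connection` class):

```python
import hsfs

try:
    # Engine validation runs in connect() before any client setup, so no
    # host or API key is needed to trigger the error.
    hsfs.connection(engine="flink")  # not a supported engine
except ConnectionError as err:
    print(err)
    # -> Engine you are trying to initialize is unknown. Supported engines are
    #    `'spark'`, `'python'`, `'training'`, `'spark-no-metastore'`, and `'spark-delta'`.
```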
@@ -514,7 +518,7 @@ def connection(
project: The name of the project to connect to. When running on Hopsworks, this
defaults to the project from which the client is run.
Defaults to `None`.
- engine: Which engine to use, `"spark"`, `"python"` or `"training"`. Defaults to `None`,
+ engine: Which engine to use, `"spark"`, `"python"`, `"training"`, `"spark-no-metastore"` or `"spark-delta"`. Defaults to `None`,
which initializes the engine to Spark if the environment provides Spark, for
example on Hopsworks and Databricks, or falls back to Python if Spark is not
available, e.g. on local Python environments or AWS SageMaker. This option
2 changes: 1 addition & 1 deletion python/hsfs/engine/__init__.py
@@ -41,7 +41,7 @@ def init(engine_type: str) -> None:
raise ValueError(
"Hive engine is not supported in hopsworks client version >= 4.0."
)
- elif engine_type == "spark-no-metastore":
+ elif engine_type == "spark-no-metastore" or engine_type == "spark-delta":
_engine = spark_no_metastore.Engine()
elif engine_type in python_types:
try:
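Both new engine strings map to the same `spark_no_metastore.Engine`, so `"spark-delta"` reuses the existing no-metastore Spark implementation rather than introducing a new engine class. A small sketch of the dispatch (assumes PySpark is installed, since instantiating the engine creates a Spark session):

```python
from hsfs import engine

engine.init("spark-delta")  # resolves to spark_no_metastore.Engine
print(type(engine.get_instance()))
# -> <class 'hsfs.engine.spark_no_metastore.Engine'>
```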
2 changes: 1 addition & 1 deletion python/hsfs/engine/spark_no_metastore.py
@@ -32,6 +32,6 @@ def __init__(self) -> None:

super().__init__()

- def _sql_offline(self, sql_query):
+ def _sql_offline(self, sql_query, feature_store):
# Spark no metastore does not require the feature_store argument
return self._spark_session.sql(sql_query)
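The added `feature_store` parameter aligns this method's signature with the other engines' `_sql_offline`, so callers can pass the same arguments regardless of engine; this engine simply ignores it. A hypothetical call for illustration (`_sql_offline` is internal API, not meant for direct use):

```python
from hsfs.engine import spark_no_metastore

# Illustration only: the feature_store argument is accepted for interface
# parity but unused, since there is no metastore database to select.
eng = spark_no_metastore.Engine()  # gets or creates a Spark session
df = eng._sql_offline("SELECT 1 AS x", feature_store=None)
df.show()
```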
