Skip to content

Commit

Permalink
[HWORKS-1048] Support multiple modularized project environments
Browse files Browse the repository at this point in the history
  • Loading branch information
robzor92 committed Jul 2, 2024
1 parent bdaff4c commit 07cfbaa
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 43 deletions.
51 changes: 34 additions & 17 deletions python/hopsworks/core/environment_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

from hopsworks import client, environment
from hopsworks.engine import environment_engine
from typing import Optional, List
import json


class EnvironmentApi:

Check failure on line 23 in python/hopsworks/core/environment_api.py

View workflow job for this annotation

GitHub Actions / Lint and Stylecheck

Ruff (I001)

python/hopsworks/core/environment_api.py:17:1: I001 Import block is un-sorted or un-formatted

Check failure on line 23 in python/hopsworks/core/environment_api.py

View workflow job for this annotation

GitHub Actions / Lint and Stylecheck

Ruff (I001)

python/hopsworks/core/environment_api.py:17:1: I001 Import block is un-sorted or un-formatted
Expand All @@ -29,7 +31,7 @@ def __init__(

self._environment_engine = environment_engine.EnvironmentEngine(project_id)

def create_environment(self, await_creation=True):
def create_environment(self, name: str, description: Optional[str] = None, base_environment_name: Optional[str] = "python-feature-pipeline", await_creation: Optional[bool] = True) -> environment.Environment:
"""Create Python environment for the project
```python
Expand All @@ -40,10 +42,13 @@ def create_environment(self, await_creation=True):
env_api = project.get_environment_api()
env = env_api.create_environment()
new_env = env_api.create_environment("my_custom_environment", base_environment_name="python-feature-pipeline")
```
# Arguments
name: name of the environment
base_environment_name: the name of the environment to copy from
await_creation: bool. If True the method returns only when the creation is finished. Default True
# Returns
`Environment`: The Environment object
Expand All @@ -57,21 +62,26 @@ def create_environment(self, await_creation=True):
self._project_id,
"python",
"environments",
client.get_python_version(),
name,
]
headers = {"content-type": "application/json"}
data = {"name": name,
"baseImage": {
"name": base_environment_name,
"description": description
}}
env = environment.Environment.from_response_json(
_client._send_request("POST", path_params, headers=headers),
_client._send_request("POST", path_params, headers=headers, data=json.dumps(data)),
self._project_id,
self._project_name,
)

if await_creation:
self._environment_engine.await_environment_command()
self._environment_engine.await_environment_command(name)

return env

def _get_environments(self):
def get_environments(self) -> List[environment.Environment]:
"""
Get all available python environments in the project
"""
Expand All @@ -88,7 +98,7 @@ def _get_environments(self):
self._project_name,
)

def get_environment(self):
def get_environment(self, name: str) -> environment.Environment:
"""Get handle for the Python environment for the project
```python
Expand All @@ -99,30 +109,37 @@ def get_environment(self):
env_api = project.get_environment_api()
env = env_api.get_environment()
env = env_api.get_environment("my_custom_environment")
```
# Returns
`Environment`: The Environment object
# Raises
`RestAPIError`: If unable to get the environment
"""
project_envs = self._get_environments()
if len(project_envs) == 0:
return None
elif len(project_envs) > 0:
return project_envs[0]

def _delete(self, python_version):
"""Delete the project Python environment"""
_client = client.get_instance()

path_params = ["project", self._project_id, "python", "environments", name]
query_params = {"expand": ["libraries", "commands"]}
headers = {"content-type": "application/json"}
return environment.Environment.from_response_json(
_client._send_request(
"GET", path_params, query_params=query_params, headers=headers
),
self._project_id,
self._project_name,
)

def _delete(self, name):
"""Delete the Python environment"""
_client = client.get_instance()

path_params = [
"project",
self._project_id,
"python",
"environments",
python_version,
name,
]
headers = {"content-type": "application/json"}
_client._send_request("DELETE", path_params, headers=headers),
4 changes: 2 additions & 2 deletions python/hopsworks/core/library_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __init__(
self._project_id = project_id
self._project_name = project_name

def install(self, library_name: str, python_version: str, library_spec: dict):
def install(self, library_name: str, name: str, library_spec: dict):
"""Create Python environment for the project"""
_client = client.get_instance()

Expand All @@ -37,7 +37,7 @@ def install(self, library_name: str, python_version: str, library_spec: dict):
self._project_id,
"python",
"environments",
python_version,
name,
"libraries",
library_name,
]
Expand Down
16 changes: 8 additions & 8 deletions python/hopsworks/engine/environment_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,21 @@ class EnvironmentEngine:
def __init__(self, project_id):
self._project_id = project_id

def await_library_command(self, library_name=None):
def await_library_command(self, environment_name, library_name):
commands = [command.Command(status="ONGOING")]
while len(commands) > 0 and not self._is_final_status(commands[0]):
time.sleep(5)
library = self._poll_commands_library(library_name)
library = self._poll_commands_library(environment_name, library_name)
if library is None:
commands = []
else:
commands = library._commands

def await_environment_command(self):
def await_environment_command(self, environment_name):
commands = [command.Command(status="ONGOING")]
while len(commands) > 0 and not self._is_final_status(commands[0]):
time.sleep(5)
environment = self._poll_commands_environment()
environment = self._poll_commands_environment(environment_name)
if environment is None:
commands = []
else:
Expand All @@ -54,15 +54,15 @@ def _is_final_status(self, command):
else:
return False

def _poll_commands_library(self, library_name):
def _poll_commands_library(self, environment_name, library_name):
_client = client.get_instance()

path_params = [
"project",
self._project_id,
"python",
"environments",
client.get_python_version(),
environment_name,
"libraries",
library_name,
]
Expand All @@ -85,15 +85,15 @@ def _poll_commands_library(self, library_name):
):
return None

def _poll_commands_environment(self):
def _poll_commands_environment(self, environment_name):
_client = client.get_instance()

path_params = [
"project",
self._project_id,
"python",
"environments",
client.get_python_version(),
environment_name,
]

query_params = {"expand": "commands"}
Expand Down
48 changes: 33 additions & 15 deletions python/hopsworks/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@
from hopsworks import command, util
from hopsworks.core import environment_api, library_api
from hopsworks.engine import environment_engine
from typing import Optional


class Environment:

Check failure on line 26 in python/hopsworks/environment.py

View workflow job for this annotation

GitHub Actions / Lint and Stylecheck

Ruff (I001)

python/hopsworks/environment.py:17:1: I001 Import block is un-sorted or un-formatted

Check failure on line 26 in python/hopsworks/environment.py

View workflow job for this annotation

GitHub Actions / Lint and Stylecheck

Ruff (I001)

python/hopsworks/environment.py:17:1: I001 Import block is un-sorted or un-formatted
def __init__(
self,
python_version,
python_conflicts,
pip_search_enabled,
name=None,
description=None,
python_version=None,
python_conflicts=None,
pip_search_enabled=None,
conflicts=None,
conda_channel=None,
libraries=None,
Expand All @@ -38,6 +41,8 @@ def __init__(
project_name=None,
**kwargs,
):
self._name = name
self._description = description
self._python_version = python_version
self._python_conflicts = python_conflicts
self._pip_search_enabled = pip_search_enabled
Expand Down Expand Up @@ -77,7 +82,17 @@ def python_version(self):
"""Python version of the environment"""
return self._python_version

def install_wheel(self, path, await_installation=True):
@property
def name(self):
"""Name of the environment"""
return self._name

@property
def description(self):
"""Description of the environment"""
return self._description

def install_wheel(self, path, await_installation: Optional[bool] = True):
"""Install a python library packaged in a wheel file
```python
Expand All @@ -92,7 +107,8 @@ def install_wheel(self, path, await_installation=True):
# Install
env_api = project.get_environment_api()
env = env_api.get_environment()
env = env_api.get_environment("my_custom_environment")
env.install_wheel(whl_path)
```
Expand All @@ -103,7 +119,7 @@ def install_wheel(self, path, await_installation=True):
"""

# Wait for any ongoing environment operations
self._environment_engine.await_environment_command()
self._environment_engine.await_environment_command(self.name)

library_name = os.path.basename(path)

Expand All @@ -116,15 +132,15 @@ def install_wheel(self, path, await_installation=True):
}

library_rest = self._library_api.install(
library_name, self.python_version, library_spec
library_name, self.name, library_spec
)

if await_installation:
return self._environment_engine.await_library_command(library_name)
return self._environment_engine.await_library_command(self.name, library_name)

return library_rest

def install_requirements(self, path, await_installation=True):
def install_requirements(self, path, await_installation: Optional[bool] = True):
"""Install libraries specified in a requirements.txt file
```python
Expand All @@ -139,7 +155,9 @@ def install_requirements(self, path, await_installation=True):
# Install
env_api = project.get_environment_api()
env = env_api.get_environment()
env = env_api.get_environment("my_custom_environment")
env.install_requirements(requirements_path)
```
Expand All @@ -150,7 +168,7 @@ def install_requirements(self, path, await_installation=True):
"""

# Wait for any ongoing environment operations
self._environment_engine.await_environment_command()
self._environment_engine.await_environment_command(self.name)

library_name = os.path.basename(path)

Expand All @@ -163,11 +181,11 @@ def install_requirements(self, path, await_installation=True):
}

library_rest = self._library_api.install(
library_name, self.python_version, library_spec
library_name, self.name, library_spec
)

if await_installation:
return self._environment_engine.await_library_command(library_name)
return self._environment_engine.await_library_command(self.name, library_name)

return library_rest

Expand All @@ -178,7 +196,7 @@ def delete(self):
# Raises
`RestAPIError`.
"""
self._environment_api._delete(self.python_version)
self._environment_api._delete(self.name)

def __repr__(self):
return f"Environment({self._python_version!r})"
return f"Environment({self.name!r})"
2 changes: 1 addition & 1 deletion python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ classifiers = [

dependencies = [
"hsfs[python] @ git+https://[email protected]/logicalclocks/feature-store-api@master#subdirectory=python",
"hsml @ git+https://[email protected]/logicalclocks/machine-learning-api@main#subdirectory=python",
"hsml @ git+https://[email protected]/robzor92/machine-learning-api@HWORKS-1048#subdirectory=python",
"pyhumps==1.6.1",
"requests",
"furl",
Expand Down

0 comments on commit 07cfbaa

Please sign in to comment.