Skip to content

Commit

Permalink
[FSTORE-1455] Add telemetry to hsml and hopsworks (logicalclocks#299)
Browse files Browse the repository at this point in the history
* Move usage to hopsworks_common

* Add telemetry

* Add telemetry to hopsworks_common

* Remove hopsworks.usage

* Fix imports

* Ruff

* Fix review remarks

* Fix Javier review remarks

* Ruff
  • Loading branch information
aversey authored Sep 3, 2024
1 parent 2ea73c1 commit 08c0c69
Show file tree
Hide file tree
Showing 34 changed files with 527 additions and 297 deletions.
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,23 @@ data = { "instances": [ model.input_example ] }
predictions = deployment.predict(data)
```

## Usage

Usage data is collected to improve the quality of the library.
It is turned on by default if the backend is [Hopsworks Serverless](https://c.app.hopsworks.ai).
To turn it off, use one of the following methods:
```python
# use environment variable
import os
os.environ["ENABLE_HOPSWORKS_USAGE"] = "false"

# use `disable_usage_logging`
import hopsworks
hopsworks.disable_usage_logging()
```

The corresponding source code is in `python/hopsworks_common/usage.py`.

## Tutorials

Need more inspiration or want to learn more about the Hopsworks platform? Check out our [tutorials](https://github.com/logicalclocks/hopsworks-tutorials).
Expand Down
5 changes: 5 additions & 0 deletions python/hopsworks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from hopsworks.connection import Connection
from hopsworks.core import project_api, secret_api
from hopsworks.decorators import NoHopsworksConnectionError
from hopsworks_common import usage
from requests.exceptions import SSLError


Expand Down Expand Up @@ -476,3 +477,7 @@ def _set_active_project(project):
_client = client.get_instance()
if _client._is_external():
_client.provide_project(project.name)


def disable_usage_logging():
    """Turn off usage logging by delegating to `hopsworks_common.usage.disable()`."""
    usage.disable()
2 changes: 2 additions & 0 deletions python/hopsworks/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from hopsworks import client, version
from hopsworks.core import project_api, secret_api, variable_api
from hopsworks.decorators import connected, not_connected
from hopsworks_common import usage
from requests.exceptions import ConnectionError


Expand Down Expand Up @@ -292,6 +293,7 @@ def connect(self):
self._project_api = project_api.ProjectApi()
self._secret_api = secret_api.SecretsApi()
self._variable_api = variable_api.VariableApi()
usage.init_usage(self._host, self._variable_api.get_version("hopsworks"))
except (TypeError, ConnectionError):
self._connected = False
raise
Expand Down
11 changes: 10 additions & 1 deletion python/hopsworks_common/core/dataset_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import time
from concurrent.futures import ThreadPoolExecutor, wait

from hopsworks_common import client, util
from hopsworks_common import client, usage, util
from hopsworks_common.client.exceptions import DatasetException, RestAPIError
from hopsworks_common.core import inode
from tqdm.auto import tqdm
Expand All @@ -45,6 +45,7 @@ def __init__(self):
DEFAULT_FLOW_CHUNK_SIZE = 1048576
FLOW_PERMANENT_ERRORS = [404, 413, 415, 500, 501]

@usage.method_logger
def download(self, path: str, local_path: str = None, overwrite: bool = False):
"""Download file from Hopsworks Filesystem to the current working directory.
Expand Down Expand Up @@ -136,6 +137,7 @@ def download(self, path: str, local_path: str = None, overwrite: bool = False):

return local_path

@usage.method_logger
def upload(
self,
local_path: str,
Expand Down Expand Up @@ -341,6 +343,7 @@ def exists(self, path: str):
except RestAPIError:
return False

@usage.method_logger
def remove(self, path: str):
"""Remove a path in the Hopsworks Filesystem.
Expand All @@ -353,6 +356,7 @@ def remove(self, path: str):
path_params = ["project", _client._project_id, "dataset", path]
_client._send_request("DELETE", path_params)

@usage.method_logger
def mkdir(self, path: str):
"""Create a directory in the Hopsworks Filesystem.
Expand Down Expand Up @@ -387,6 +391,7 @@ def mkdir(self, path: str):
"POST", path_params, headers=headers, query_params=query_params
)["attributes"]["path"]

@usage.method_logger
def copy(self, source_path: str, destination_path: str, overwrite: bool = False):
"""Copy a file or directory in the Hopsworks Filesystem.
Expand Down Expand Up @@ -426,6 +431,7 @@ def copy(self, source_path: str, destination_path: str, overwrite: bool = False)
}
_client._send_request("POST", path_params, query_params=query_params)

@usage.method_logger
def move(self, source_path: str, destination_path: str, overwrite: bool = False):
"""Move a file or directory in the Hopsworks Filesystem.
Expand Down Expand Up @@ -466,6 +472,7 @@ def move(self, source_path: str, destination_path: str, overwrite: bool = False)
}
_client._send_request("POST", path_params, query_params=query_params)

@usage.method_logger
def upload_feature_group(self, feature_group, path, dataframe):
# Convert the dataframe into PARQUET for upload
df_parquet = dataframe.to_parquet(index=False)
Expand Down Expand Up @@ -493,6 +500,7 @@ def upload_feature_group(self, feature_group, path, dataframe):

chunk_number += 1

@usage.method_logger
def list_files(self, path, offset, limit):
_client = client.get_instance()
path_params = [
Expand All @@ -512,6 +520,7 @@ def list_files(self, path, offset, limit):

return inode_lst["count"], inode.Inode.from_response_json(inode_lst)

@usage.method_logger
def read_content(self, path: str, dataset_type: str = "DATASET"):
_client = client.get_instance()

Expand Down
5 changes: 4 additions & 1 deletion python/hopsworks_common/core/environment_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
import json
from typing import List, Optional

from hopsworks_common import client, environment
from hopsworks_common import client, environment, usage
from hopsworks_common.engine import environment_engine


class EnvironmentApi:
def __init__(self):
    # Engine object is built eagerly, with no arguments, and kept on the instance.
    self._environment_engine = environment_engine.EnvironmentEngine()

@usage.method_logger
def create_environment(
self,
name: str,
Expand Down Expand Up @@ -80,6 +81,7 @@ def create_environment(

return env

@usage.method_logger
def get_environments(self) -> List[environment.Environment]:
"""
Get all available python environments in the project
Expand All @@ -95,6 +97,7 @@ def get_environments(self) -> List[environment.Environment]:
)
)

@usage.method_logger
def get_environment(self, name: str) -> environment.Environment:
"""Get handle for the Python environment for the project
Expand Down
5 changes: 4 additions & 1 deletion python/hopsworks_common/core/flink_cluster_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import json
import os

from hopsworks_common import client, flink_cluster, job, util
from hopsworks_common import client, flink_cluster, job, usage, util
from hopsworks_common.client.exceptions import RestAPIError
from hopsworks_common.core import job_api

Expand All @@ -26,6 +26,7 @@ class FlinkClusterApi:
def __init__(self):
    # Job API handle; `get_configuration` delegates to it with the "FLINK" job type.
    self._job_api = job_api.JobApi()

@usage.method_logger
def get_configuration(self):
"""Get configuration for the Flink cluster.
Expand All @@ -36,6 +37,7 @@ def get_configuration(self):
"""
return self._job_api.get_configuration("FLINK")

@usage.method_logger
def setup_cluster(self, name: str, config=None):
"""Create a new flink job representing a flink cluster, or update an existing one.
Expand Down Expand Up @@ -95,6 +97,7 @@ def _create_cluster(self, name: str, config: dict):
)
return flink_cluster_obj

@usage.method_logger
def get_cluster(self, name: str):
"""Get the job corresponding to the flink cluster.
```python
Expand Down
7 changes: 7 additions & 0 deletions python/hopsworks_common/core/git_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
git_file_status,
git_op_execution,
git_repo,
usage,
util,
)
from hopsworks_common.client.exceptions import GitException
Expand All @@ -38,6 +39,7 @@ def __init__(self):
self._git_provider_api = git_provider_api.GitProviderApi()
self._log = logging.getLogger(__name__)

@usage.method_logger
def clone(self, url: str, path: str, provider: str = None, branch: str = None):
"""Clone a new Git Repo in to Hopsworks Filesystem.
Expand Down Expand Up @@ -101,6 +103,7 @@ def clone(self, url: str, path: str, provider: str = None, branch: str = None):
created_repo = self.get_repo(git_op.repository.name, git_op.repository.path)
return created_repo

@usage.method_logger
def get_repos(self):
"""Get the existing Git repositories
Expand All @@ -120,6 +123,7 @@ def get_repos(self):
_client._send_request("GET", path_params, query_params=query_params)
)

@usage.method_logger
def get_providers(self):
"""Get the configured Git providers
Expand All @@ -130,6 +134,7 @@ def get_providers(self):
"""
return self._git_provider_api._get_providers()

@usage.method_logger
def get_provider(self, provider: str):
"""Get the configured Git provider
Expand All @@ -142,6 +147,7 @@ def get_provider(self, provider: str):
"""
return self._git_provider_api._get_provider(provider)

@usage.method_logger
def set_provider(self, provider: str, username: str, token: str):
"""Configure a Git provider
Expand All @@ -165,6 +171,7 @@ def set_provider(self, provider: str, username: str, token: str):
"""
self._git_provider_api._set_provider(provider, username, token)

@usage.method_logger
def get_repo(self, name: str, path: str = None):
"""Get the cloned Git repository
Expand Down
13 changes: 12 additions & 1 deletion python/hopsworks_common/core/job_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import json
from typing import Any, Dict, Union

from hopsworks_common import client, execution, job, job_schedule, util
from hopsworks_common import client, execution, job, job_schedule, usage, util
from hopsworks_common.client.exceptions import RestAPIError
from hopsworks_common.core import (
ingestion_job_conf,
Expand All @@ -28,6 +28,7 @@


class JobApi:
@usage.method_logger
def create_job(self, name: str, config: dict):
"""Create a new job or update an existing one.
Expand Down Expand Up @@ -69,6 +70,7 @@ def create_job(self, name: str, config: dict):
print(f"Job created successfully, explore it at {created_job.get_url()}")
return created_job

@usage.method_logger
def get_job(self, name: str):
"""Get a job.
Expand All @@ -91,6 +93,7 @@ def get_job(self, name: str):
_client._send_request("GET", path_params, query_params=query_params)
)

@usage.method_logger
def get_jobs(self):
"""Get all jobs.
Expand All @@ -110,6 +113,7 @@ def get_jobs(self):
_client._send_request("GET", path_params, query_params=query_params)
)

@usage.method_logger
def exists(self, name: str):
"""Check if a job exists.
Expand All @@ -126,6 +130,7 @@ def exists(self, name: str):
except RestAPIError:
return False

@usage.method_logger
def get_configuration(self, type: str):
"""Get configuration for the specific job type.
Expand Down Expand Up @@ -206,6 +211,7 @@ def _delete_schedule_job(self, name):
path_params,
)

@usage.method_logger
def create(
self,
name: str,
Expand All @@ -223,18 +229,21 @@ def create(
)
)

@usage.method_logger
def launch(self, name: str, args: str = None) -> None:
    """Request a new execution of the job called `name`.

    Issues a POST against the `executions` resource of the job inside the
    project held by the current client instance, forwarding `args` unchanged
    as the request data.

    # Arguments
        name: Name of the job to execute.
        args: Value passed as `data` of the request; `None` by default.
    """
    instance = client.get_instance()
    endpoint = ["project", instance._project_id, "jobs", name, "executions"]

    instance._send_request("POST", endpoint, data=args)

@usage.method_logger
def get(self, name: str) -> job.Job:
    """Fetch the job called `name` from the current project.

    Issues a GET against the job resource of the project held by the current
    client instance and builds the result with `job.Job.from_response_json`.

    # Arguments
        name: Name of the job to fetch.

    # Returns
        Whatever `job.Job.from_response_json` produces for the response.
    """
    instance = client.get_instance()
    endpoint = ["project", instance._project_id, "jobs", name]

    response = instance._send_request("GET", endpoint)
    return job.Job.from_response_json(response)

@usage.method_logger
def last_execution(self, job: job.Job) -> execution.Execution:
_client = client.get_instance()
path_params = ["project", _client._project_id, "jobs", job.name, "executions"]
Expand All @@ -249,6 +258,7 @@ def last_execution(self, job: job.Job) -> execution.Execution:
job=job,
)

@usage.method_logger
def create_or_update_schedule_job(
self, name: str, schedule_config: Dict[str, Any]
) -> job_schedule.JobSchedule:
Expand All @@ -263,6 +273,7 @@ def create_or_update_schedule_job(
)
)

@usage.method_logger
def delete_schedule_job(self, name: str) -> None:
_client = client.get_instance()
path_params = ["project", _client._project_id, "jobs", name, "schedule", "v2"]
Expand Down
Loading

0 comments on commit 08c0c69

Please sign in to comment.