diff --git a/sky/provision/instance_setup.py b/sky/provision/instance_setup.py index 86d1c59f36c..523df8f7032 100644 --- a/sky/provision/instance_setup.py +++ b/sky/provision/instance_setup.py @@ -15,9 +15,12 @@ from sky.provision import logging as provision_logging from sky.provision import metadata_utils from sky.skylet import constants +from sky.usage import constants as usage_constants +from sky.usage import usage_lib from sky.utils import accelerator_registry from sky.utils import command_runner from sky.utils import common_utils +from sky.utils import env_options from sky.utils import subprocess_utils from sky.utils import timeline from sky.utils import ux_utils @@ -67,6 +70,26 @@ 'sky.skylet.attempt_skylet;') +def _set_usage_run_id_cmd() -> str: + """Gets the command to set the usage run id. + + The command saves the current usage run id to the file, so that the skylet + can use it to report the heartbeat. + + We use a function instead of a constant so that the usage run id is the + latest one when the function is called. + """ + return (f'cat {usage_constants.USAGE_RUN_ID_FILE} || ' + f'echo "{usage_lib.messages.usage.run_id}" > ' + f'{usage_constants.USAGE_RUN_ID_FILE}') + + +def _set_skypilot_env_var_cmd() -> str: + """Sets the skypilot environment variables on the remote machine.""" + env_vars = env_options.Options.all_options() + return '; '.join([f'export {k}={v}' for k, v in env_vars.items()]) + + def _auto_retry(should_retry: Callable[[Exception], bool] = lambda _: True): """Decorator that retries the function if it fails. @@ -450,11 +473,17 @@ def start_skylet_on_head_node(cluster_name: str, logger.info(f'Running command on head node: {MAYBE_SKYLET_RESTART_CMD}') # We need to source bashrc for skylet to make sure the autostop event can # access the path to the cloud CLIs. - returncode, stdout, stderr = head_runner.run(MAYBE_SKYLET_RESTART_CMD, - stream_logs=False, - require_outputs=True, - log_path=log_path_abs, - source_bashrc=True) + set_usage_run_id_cmd = _set_usage_run_id_cmd() + # Set the skypilot environment variables, including the usage type, debug + # info, and other options. + set_skypilot_env_var_cmd = _set_skypilot_env_var_cmd() + returncode, stdout, stderr = head_runner.run( + f'{set_usage_run_id_cmd}; {set_skypilot_env_var_cmd}; ' + f'{MAYBE_SKYLET_RESTART_CMD}', + stream_logs=False, + require_outputs=True, + log_path=log_path_abs, + source_bashrc=True) if returncode: raise RuntimeError('Failed to start skylet on the head node ' f'(exit code {returncode}). Error: ' diff --git a/sky/skylet/events.py b/sky/skylet/events.py index b6e99707dab..b5cf76c6efe 100644 --- a/sky/skylet/events.py +++ b/sky/skylet/events.py @@ -18,6 +18,7 @@ from sky.skylet import autostop_lib from sky.skylet import constants from sky.skylet import job_lib +from sky.usage import usage_lib from sky.utils import cluster_yaml_utils from sky.utils import common_utils from sky.utils import ux_utils @@ -87,6 +88,14 @@ def _run(self): serve_utils.update_service_status() +class UsageHeartbeatReportEvent(SkyletEvent): + """Skylet event for reporting usage.""" + EVENT_INTERVAL_SECONDS = 600 + + def _run(self): + usage_lib.send_heartbeat(interval_seconds=self.EVENT_INTERVAL_SECONDS) + + class AutostopEvent(SkyletEvent): """Skylet event for autostop. diff --git a/sky/skylet/skylet.py b/sky/skylet/skylet.py index a114d622de4..6bcf9208d5f 100644 --- a/sky/skylet/skylet.py +++ b/sky/skylet/skylet.py @@ -25,6 +25,8 @@ # unhealthy, this event will correctly update the controller # status to CONTROLLER_FAILED. events.ServiceUpdateEvent(), + # Report usage heartbeat every 10 minutes. + events.UsageHeartbeatReportEvent(), ] while True: diff --git a/sky/usage/constants.py b/sky/usage/constants.py index c2f8c6d067b..4845e781f46 100644 --- a/sky/usage/constants.py +++ b/sky/usage/constants.py @@ -3,7 +3,6 @@ LOG_URL = 'http://usage.skypilot.co:9090/loki/api/v1/push' # pylint: disable=line-too-long USAGE_MESSAGE_SCHEMA_VERSION = 1 - PRIVACY_POLICY_PATH = '~/.sky/privacy_policy' USAGE_POLICY_MESSAGE = ( @@ -15,3 +14,6 @@ USAGE_MESSAGE_REDACT_KEYS = ['setup', 'run', 'envs'] USAGE_MESSAGE_REDACT_TYPES = {str, dict} + +USAGE_RUN_ID_ENV_VAR = 'SKYPILOT_USAGE_RUN_ID' +USAGE_RUN_ID_FILE = '~/.sky/usage_run_id' diff --git a/sky/usage/loki-s3-config.yaml b/sky/usage/loki-s3-config.yaml index e94dfaba50e..c54143de45c 100644 --- a/sky/usage/loki-s3-config.yaml +++ b/sky/usage/loki-s3-config.yaml @@ -4,6 +4,8 @@ server: http_listen_port: 9090 http_server_read_timeout: 300s http_server_write_timeout: 300s + grpc_server_max_recv_msg_size: 1073741824 # 128 MB + grpc_server_max_send_msg_size: 1073741824 # 128 MB schema_config: configs: @@ -14,12 +16,18 @@ schema_config: index: prefix: loki_index_ period: 24h + - from: 2024-12-30 + store: tsdb + object_store: aws + schema: v13 + index: + prefix: loki_index_ + period: 24h ingester: chunk_idle_period: 3m chunk_block_size: 262144 chunk_retain_period: 1m - max_transfer_retries: 0 wal: enabled: true dir: /loki/wal @@ -31,18 +39,19 @@ ingester: storage_config: aws: - bucketnames: sky-host-loki + bucketnames: skypilot-loki region: us-west-2 access_key_id: PLACEHOLDER secret_access_key: PLACEHOLDER s3forcepathstyle: true boltdb_shipper: active_index_directory: /loki/index - shared_store: s3 cache_location: /loki/boltdb-cache + tsdb_shipper: + active_index_directory: /loki/tsdb-index + cache_location: /loki/tsdb-cache querier: - query_timeout: 10m max_concurrent: 32 query_scheduler: max_outstanding_requests_per_tenant: 4096 @@ -53,23 +62,27 @@ query_range: limits_config: max_query_series: 5000 - ingestion_rate_strategy: local # Default: global + ingestion_rate_strategy: local max_global_streams_per_user: 5000 - max_query_length: 0h # Default: 721h - max_query_parallelism: 32 # Old Default: 14 + max_query_length: 0h + max_query_parallelism: 32 max_entries_limit_per_query: 1000000 - max_streams_per_user: 0 # Old Default: 10000 + max_streams_per_user: 0 reject_old_samples: true reject_old_samples_max_age: 168h + query_timeout: 10m + allow_structured_metadata: false compactor: working_directory: /loki/boltdb-shipper-compactor - shared_store: aws chunk_store_config: - max_look_back_period: 0s + chunk_cache_config: + # Updated cache configuration for Loki 3.3 + embedded_cache: + enabled: true + max_size_mb: 1024 # 1GB cache, expressed in MB table_manager: retention_deletes_enabled: false retention_period: 0s - diff --git a/sky/usage/usage_lib.py b/sky/usage/usage_lib.py index 07867939ee5..6f7c4323821 100644 --- a/sky/usage/usage_lib.py +++ b/sky/usage/usage_lib.py @@ -36,6 +36,7 @@ def _get_current_timestamp_ns() -> int: class MessageType(enum.Enum): """Types for messages to be sent to Loki.""" USAGE = 'usage' + HEARTBEAT = 'heartbeat' # TODO(zhwu): Add more types, e.g., cluster_lifecycle. @@ -59,8 +60,9 @@ def get_properties(self) -> Dict[str, Any]: properties = self.__dict__.copy() return {k: v for k, v in properties.items() if not k.startswith('_')} - def __repr__(self): - raise NotImplementedError + def __repr__(self) -> str: + d = self.get_properties() + return json.dumps(d) class UsageMessageToReport(MessageToReport): @@ -152,10 +154,6 @@ def __init__(self) -> None: self.exception: Optional[str] = None # entrypoint_context self.stacktrace: Optional[str] = None # entrypoint_context - def __repr__(self) -> str: - d = self.get_properties() - return json.dumps(d) - def update_entrypoint(self, msg: str): self.entrypoint = msg @@ -267,16 +265,43 @@ def update_runtime(self, name_or_fn: str): name_or_fn) +class HeartbeatMessageToReport(MessageToReport): + """Message to be reported to Grafana Loki for heartbeat on a cluster.""" + + def __init__(self, interval_seconds: int = 600): + super().__init__(constants.USAGE_MESSAGE_SCHEMA_VERSION) + # This interval_seconds is mainly for recording the heartbeat interval + # in the heartbeat message, so that the collector can use it. + self.interval_seconds = interval_seconds + + def get_properties(self) -> Dict[str, Any]: + properties = super().get_properties() + # The run id is set by the skylet, which will always be the same for + # the entire lifetime of the run. + with open(os.path.expanduser(constants.USAGE_RUN_ID_FILE), + 'r', + encoding='utf-8') as f: + properties['run_id'] = f.read().strip() + return properties + + class MessageCollection: """A collection of messages.""" def __init__(self): - self._messages = {MessageType.USAGE: UsageMessageToReport()} + self._messages = { + MessageType.USAGE: UsageMessageToReport(), + MessageType.HEARTBEAT: HeartbeatMessageToReport() + } @property - def usage(self): + def usage(self) -> UsageMessageToReport: return self._messages[MessageType.USAGE] + @property + def heartbeat(self) -> HeartbeatMessageToReport: + return self._messages[MessageType.HEARTBEAT] + def reset(self, message_type: MessageType): self._messages[message_type] = self._messages[message_type].__class__() @@ -300,13 +325,22 @@ def _send_to_loki(message_type: MessageType): message = messages[message_type] + # In case the message has no start time, set it to the current time. + message.start() message.send_time = _get_current_timestamp_ns() log_timestamp = message.start_time environment = 'prod' if env_options.Options.IS_DEVELOPER.get(): environment = 'dev' - prom_labels = {'type': message_type.value, 'environment': environment} + prom_labels = { + 'type': message_type.value, + 'environment': environment, + 'schema_version': message.schema_version, + } + if message_type == MessageType.USAGE: + prom_labels['new_cluster'] = (message.original_cluster_status != 'UP' + and message.final_cluster_status == 'UP') headers = {'Content-type': 'application/json'} payload = { @@ -384,7 +418,7 @@ def prepare_json_from_yaml_config( def _send_local_messages(): """Send all messages not been uploaded to Loki.""" for msg_type, message in messages.items(): - if not message.message_sent: + if not message.message_sent and msg_type != MessageType.HEARTBEAT: # Avoid the fallback entrypoint to send the message again # in normal case. try: @@ -394,6 +428,11 @@ def _send_local_messages(): f'exception caught: {type(e)}({e})') +def send_heartbeat(interval_seconds: int = 600): + messages.heartbeat.interval_seconds = interval_seconds + _send_to_loki(MessageType.HEARTBEAT) + + @contextlib.contextmanager def entrypoint_context(name: str, fallback: bool = False): """Context manager for entrypoint. diff --git a/sky/utils/env_options.py b/sky/utils/env_options.py index cfc20a76253..b1dd10a219b 100644 --- a/sky/utils/env_options.py +++ b/sky/utils/env_options.py @@ -1,6 +1,7 @@ """Global environment options for sky.""" import enum import os +from typing import Dict class Options(enum.Enum): @@ -35,3 +36,8 @@ def get(self) -> bool: def env_key(self) -> str: """The environment variable key name.""" return self.value[0] + + @classmethod + def all_options(cls) -> Dict[str, bool]: + """Returns all options as a dictionary.""" + return {option.env_key: option.get() for option in list(Options)}