Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add heartbeat for usage collection and update to Loki 3.3 #4499

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
39 changes: 34 additions & 5 deletions sky/provision/instance_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
from sky.provision import logging as provision_logging
from sky.provision import metadata_utils
from sky.skylet import constants
from sky.usage import constants as usage_constants
from sky.usage import usage_lib
from sky.utils import accelerator_registry
from sky.utils import command_runner
from sky.utils import common_utils
from sky.utils import env_options
from sky.utils import subprocess_utils
from sky.utils import timeline
from sky.utils import ux_utils
Expand Down Expand Up @@ -67,6 +70,26 @@
'sky.skylet.attempt_skylet;')


def _set_usage_run_id_cmd() -> str:
"""Gets the command to set the usage run id.

The command saves the current usage run id to the file, so that the skylet
can use it to report the heartbeat.

We use a function instead of a constant so that the usage run id is the
latest one when the function is called.
"""
return (f'cat {usage_constants.USAGE_RUN_ID_FILE} || '
f'echo "{usage_lib.messages.usage.run_id}" > '
f'{usage_constants.USAGE_RUN_ID_FILE}')


def _set_skypilot_env_var_cmd() -> str:
"""Sets the skypilot environment variables on the remote machine."""
env_vars = env_options.Options.all_options()
return '; '.join([f'export {k}={v}' for k, v in env_vars.items()])


def _auto_retry(should_retry: Callable[[Exception], bool] = lambda _: True):
"""Decorator that retries the function if it fails.

Expand Down Expand Up @@ -450,11 +473,17 @@ def start_skylet_on_head_node(cluster_name: str,
logger.info(f'Running command on head node: {MAYBE_SKYLET_RESTART_CMD}')
# We need to source bashrc for skylet to make sure the autostop event can
# access the path to the cloud CLIs.
returncode, stdout, stderr = head_runner.run(MAYBE_SKYLET_RESTART_CMD,
stream_logs=False,
require_outputs=True,
log_path=log_path_abs,
source_bashrc=True)
set_usage_run_id_cmd = _set_usage_run_id_cmd()
# Set the skypilot environment variables, including the usage type, debug
# info, and other options.
set_skypilot_env_var_cmd = _set_skypilot_env_var_cmd()
returncode, stdout, stderr = head_runner.run(
f'{set_usage_run_id_cmd}; {set_skypilot_env_var_cmd}; '
f'{MAYBE_SKYLET_RESTART_CMD}',
stream_logs=False,
require_outputs=True,
log_path=log_path_abs,
source_bashrc=True)
if returncode:
raise RuntimeError('Failed to start skylet on the head node '
f'(exit code {returncode}). Error: '
Expand Down
9 changes: 9 additions & 0 deletions sky/skylet/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from sky.skylet import autostop_lib
from sky.skylet import constants
from sky.skylet import job_lib
from sky.usage import usage_lib
from sky.utils import cluster_yaml_utils
from sky.utils import common_utils
from sky.utils import ux_utils
Expand Down Expand Up @@ -87,6 +88,14 @@ def _run(self):
serve_utils.update_service_status()


class UsageHeartbeatReportEvent(SkyletEvent):
"""Skylet event for reporting usage."""
EVENT_INTERVAL_SECONDS = 600

def _run(self):
usage_lib.send_heartbeat(interval_seconds=self.EVENT_INTERVAL_SECONDS)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a little bit curious why we need this interval seconds argument. Doesnt skylet already has a interval mechanism..?

def run(self):
self._n = (self._n + 1) % self._event_interval
if self._n % self._event_interval == 0:
logger.debug(f'{self.__class__.__name__} triggered')
try:
self._run()
except Exception as e: # pylint: disable=broad-except
# Keep the skylet running even if an event fails.
logger.error(f'{self.__class__.__name__} error: {e}')
with ux_utils.enable_traceback():
logger.error(traceback.format_exc())



class AutostopEvent(SkyletEvent):
"""Skylet event for autostop.

Expand Down
2 changes: 2 additions & 0 deletions sky/skylet/skylet.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
# unhealthy, this event will correctly update the controller
# status to CONTROLLER_FAILED.
events.ServiceUpdateEvent(),
# Report usage heartbeat every 10 minutes.
events.UsageHeartbeatReportEvent(),
]

while True:
Expand Down
4 changes: 3 additions & 1 deletion sky/usage/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
LOG_URL = 'http://usage.skypilot.co:9090/loki/api/v1/push' # pylint: disable=line-too-long

USAGE_MESSAGE_SCHEMA_VERSION = 1

PRIVACY_POLICY_PATH = '~/.sky/privacy_policy'

USAGE_POLICY_MESSAGE = (
Expand All @@ -15,3 +14,6 @@

USAGE_MESSAGE_REDACT_KEYS = ['setup', 'run', 'envs']
USAGE_MESSAGE_REDACT_TYPES = {str, dict}

USAGE_RUN_ID_ENV_VAR = 'SKYPILOT_USAGE_RUN_ID'
USAGE_RUN_ID_FILE = '~/.sky/usage_run_id'
35 changes: 24 additions & 11 deletions sky/usage/loki-s3-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ server:
http_listen_port: 9090
http_server_read_timeout: 300s
http_server_write_timeout: 300s
grpc_server_max_recv_msg_size: 1073741824 # 128 MB
grpc_server_max_send_msg_size: 1073741824 # 128 MB

schema_config:
configs:
Expand All @@ -14,12 +16,18 @@ schema_config:
index:
prefix: loki_index_
period: 24h
- from: 2024-12-30
store: tsdb
object_store: aws
schema: v13
index:
prefix: loki_index_
period: 24h

ingester:
chunk_idle_period: 3m
chunk_block_size: 262144
chunk_retain_period: 1m
max_transfer_retries: 0
wal:
enabled: true
dir: /loki/wal
Expand All @@ -31,18 +39,19 @@ ingester:

storage_config:
aws:
bucketnames: sky-host-loki
bucketnames: skypilot-loki
region: us-west-2
access_key_id: PLACEHOLDER
secret_access_key: PLACEHOLDER
s3forcepathstyle: true
boltdb_shipper:
active_index_directory: /loki/index
shared_store: s3
cache_location: /loki/boltdb-cache
tsdb_shipper:
active_index_directory: /loki/tsdb-index
cache_location: /loki/tsdb-cache

querier:
query_timeout: 10m
max_concurrent: 32
query_scheduler:
max_outstanding_requests_per_tenant: 4096
Expand All @@ -53,23 +62,27 @@ query_range:

limits_config:
max_query_series: 5000
ingestion_rate_strategy: local # Default: global
ingestion_rate_strategy: local
max_global_streams_per_user: 5000
max_query_length: 0h # Default: 721h
max_query_parallelism: 32 # Old Default: 14
max_query_length: 0h
max_query_parallelism: 32
max_entries_limit_per_query: 1000000
max_streams_per_user: 0 # Old Default: 10000
max_streams_per_user: 0
reject_old_samples: true
reject_old_samples_max_age: 168h
query_timeout: 10m
allow_structured_metadata: false

compactor:
working_directory: /loki/boltdb-shipper-compactor
shared_store: aws

chunk_store_config:
max_look_back_period: 0s
chunk_cache_config:
# Updated cache configuration for Loki 3.3
embedded_cache:
enabled: true
max_size_mb: 1024 # 1GB cache, expressed in MB

table_manager:
retention_deletes_enabled: false
retention_period: 0s

59 changes: 49 additions & 10 deletions sky/usage/usage_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def _get_current_timestamp_ns() -> int:
class MessageType(enum.Enum):
"""Types for messages to be sent to Loki."""
USAGE = 'usage'
HEARTBEAT = 'heartbeat'
# TODO(zhwu): Add more types, e.g., cluster_lifecycle.


Expand All @@ -59,8 +60,9 @@ def get_properties(self) -> Dict[str, Any]:
properties = self.__dict__.copy()
return {k: v for k, v in properties.items() if not k.startswith('_')}

def __repr__(self):
raise NotImplementedError
def __repr__(self) -> str:
d = self.get_properties()
return json.dumps(d)


class UsageMessageToReport(MessageToReport):
Expand Down Expand Up @@ -152,10 +154,6 @@ def __init__(self) -> None:
self.exception: Optional[str] = None # entrypoint_context
self.stacktrace: Optional[str] = None # entrypoint_context

def __repr__(self) -> str:
d = self.get_properties()
return json.dumps(d)

def update_entrypoint(self, msg: str):
self.entrypoint = msg

Expand Down Expand Up @@ -267,16 +265,43 @@ def update_runtime(self, name_or_fn: str):
name_or_fn)


class HeartbeatMessageToReport(MessageToReport):
"""Message to be reported to Grafana Loki for heartbeat on a cluster."""

def __init__(self, interval_seconds: int = 600):
super().__init__(constants.USAGE_MESSAGE_SCHEMA_VERSION)
# This interval_seconds is mainly for recording the heartbeat interval
# in the heartbeat message, so that the collector can use it.
self.interval_seconds = interval_seconds
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this variable used?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it will be included as part of the payload sent when we are calling str(message) in _send_to_loki. This is mostly for future proof in case we need to change the interval seconds in the future.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment for it.


def get_properties(self) -> Dict[str, Any]:
properties = super().get_properties()
# The run id is set by the skylet, which will always be the same for
# the entire lifetime of the run.
with open(os.path.expanduser(constants.USAGE_RUN_ID_FILE),
'r',
encoding='utf-8') as f:
properties['run_id'] = f.read().strip()
return properties


class MessageCollection:
"""A collection of messages."""

def __init__(self):
self._messages = {MessageType.USAGE: UsageMessageToReport()}
self._messages = {
MessageType.USAGE: UsageMessageToReport(),
MessageType.HEARTBEAT: HeartbeatMessageToReport()
}

@property
def usage(self):
def usage(self) -> UsageMessageToReport:
return self._messages[MessageType.USAGE]

@property
def heartbeat(self) -> HeartbeatMessageToReport:
return self._messages[MessageType.HEARTBEAT]

def reset(self, message_type: MessageType):
self._messages[message_type] = self._messages[message_type].__class__()

Expand All @@ -300,13 +325,22 @@ def _send_to_loki(message_type: MessageType):

message = messages[message_type]

# In case the message has no start time, set it to the current time.
message.start()
message.send_time = _get_current_timestamp_ns()
log_timestamp = message.start_time

environment = 'prod'
if env_options.Options.IS_DEVELOPER.get():
environment = 'dev'
prom_labels = {'type': message_type.value, 'environment': environment}
prom_labels = {
'type': message_type.value,
'environment': environment,
'schema_version': message.schema_version,
}
if message_type == MessageType.USAGE:
prom_labels['new_cluster'] = (message.original_cluster_status != 'UP'
and message.final_cluster_status == 'UP')

headers = {'Content-type': 'application/json'}
payload = {
Expand Down Expand Up @@ -384,7 +418,7 @@ def prepare_json_from_yaml_config(
def _send_local_messages():
"""Send all messages not been uploaded to Loki."""
for msg_type, message in messages.items():
if not message.message_sent:
if not message.message_sent and msg_type != MessageType.HEARTBEAT:
# Avoid the fallback entrypoint to send the message again
# in normal case.
try:
Expand All @@ -394,6 +428,11 @@ def _send_local_messages():
f'exception caught: {type(e)}({e})')


def send_heartbeat(interval_seconds: int = 600):
messages.heartbeat.interval_seconds = interval_seconds
_send_to_loki(MessageType.HEARTBEAT)


@contextlib.contextmanager
def entrypoint_context(name: str, fallback: bool = False):
"""Context manager for entrypoint.
Expand Down
6 changes: 6 additions & 0 deletions sky/utils/env_options.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Global environment options for sky."""
import enum
import os
from typing import Dict


class Options(enum.Enum):
Expand Down Expand Up @@ -35,3 +36,8 @@ def get(self) -> bool:
def env_key(self) -> str:
"""The environment variable key name."""
return self.value[0]

@classmethod
def all_options(cls) -> Dict[str, bool]:
"""Returns all options as a dictionary."""
return {option.env_key: option.get() for option in list(Options)}
Loading