-
Notifications
You must be signed in to change notification settings - Fork 32
/
Copy pathtelemetry.py
118 lines (92 loc) · 3.36 KB
/
telemetry.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import json
import logging
import os
import uuid
from functools import lru_cache, wraps
from pathlib import Path
from typing import Any, Dict, Optional
from appdirs import user_data_dir
from dotenv import load_dotenv
from posthog import Posthog
load_dotenv()
_USER_DATA_DIR_NAME = "continuous_eval"
_DO_NOT_TRACK = "CONTINUOUS_EVAL_DO_NOT_TRACK"
_DEBUG_TELEMETRY = "CONTINUOUS_EVAL_DEBUG_TELEMETRY"
_USER_ID_PREFIX = "ce-"
logger = logging.getLogger("AnonymousTelemetry")
logger.setLevel(logging.DEBUG) # Set to lowest level to allow all messages
@lru_cache(maxsize=1)
def _do_not_track() -> bool:
return os.environ.get(_DO_NOT_TRACK, "false").lower() == "true"
@lru_cache(maxsize=1)
def _debug_telemetry() -> bool:
return os.environ.get(_DEBUG_TELEMETRY, "false").lower() == "true"
@lru_cache(maxsize=1)
def _get_or_generate_uid() -> str:
user_id_path = Path(user_data_dir(appname=_USER_DATA_DIR_NAME))
user_id_path.mkdir(parents=True, exist_ok=True)
uuid_filepath = user_id_path / "config.json"
user_id = None
if uuid_filepath.is_file():
# try reading the file first
try:
user_id = json.load(open(uuid_filepath))["userid"]
except Exception:
pass
if user_id is None:
user_id = _USER_ID_PREFIX + uuid.uuid4().hex
try:
with open(uuid_filepath, "w") as f:
json.dump({"userid": user_id}, f)
except Exception:
pass
return user_id
class AnonymousTelemetry:
def __init__(self):
self.uid = _get_or_generate_uid()
self._client = Posthog(
"phc_FS1KnMOU6v6FWqO5jyjiVDcdBKyHF61KCajn7oANpPC",
host="https://us.i.posthog.com",
debug=_debug_telemetry(),
)
if _do_not_track():
logger.debug("Telemetry is disabled")
self._client.disabled = True
def event(self, name: Optional[str] = None, info: Dict[str, Any] = {}):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# event_name = name or args[0].__class__.__name__
self.log_event(name=func.__qualname__, info=info)
return func(*args, **kwargs)
return wrapper
return decorator
def log_event(self, name: str, info: Dict[str, Any] = {}):
try:
self._client.capture(
distinct_id=self.uid, event=name, properties=info
)
except Exception as e:
# This way it silences all thread level logging as well
if _debug_telemetry():
logging.debug(f"Telemetry error: {e}")
def telemetry_initializer() -> AnonymousTelemetry:
"""
This function is executed once per child process to initialize telemetry.
"""
global telemetry
telemetry = AnonymousTelemetry()
logger.debug("Telemetry reinitialized in child process.")
return telemetry
telemetry = telemetry_initializer()
def telemetry_event(name: Optional[str] = None, info: Dict[str, Any] = {}):
global telemetry
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
event_name = name or args[0].__class__.__name__
info["__qualname__"] = func.__qualname__
telemetry.log_event(name=event_name, info=info)
return func(*args, **kwargs)
return wrapper
return decorator