Skip to content

Commit

Permalink
feat: automatic assignment tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim Yiu authored and Tim Yiu committed Jul 24, 2023
1 parent 71ff479 commit 9e9c4d0
Show file tree
Hide file tree
Showing 17 changed files with 396 additions and 3 deletions.
Empty file added src/__init__.py
Empty file.
1 change: 1 addition & 0 deletions src/amplitude_experiment/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@
from .cookie import AmplitudeCookie
from .local.client import LocalEvaluationClient
from .local.config import LocalEvaluationConfig
from .flagresult import FlagResult
3 changes: 3 additions & 0 deletions src/amplitude_experiment/assignment/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .assignment import Assignment, DAY_MILLIS
from .assignment_filter import AssignmentFilter
from .assignment_service import AssignmentService, to_event
24 changes: 24 additions & 0 deletions src/amplitude_experiment/assignment/assignment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import time
from typing import Dict

from src.amplitude_experiment.flagresult import FlagResult
from src.amplitude_experiment.user import User

DAY_MILLIS = 24 * 60 * 60 * 1000


class Assignment:

def __init__(self, user: User, results: Dict[str, FlagResult]):
self.user = user
self.results = results
self.timestamp = time.time()

def canonicalize(self) -> str:
user = self.user.user_id.strip() if self.user.user_id else 'undefined'
device = self.user.device_id.strip() if self.user.device_id else 'undefined'
sb = user + ' ' + device + ' '
for key in sorted(self.results):
value = self.results[key].value.strip() if self.results[key] else 'undefined'
sb += key.strip() + ' ' + value + ' '
return sb
7 changes: 7 additions & 0 deletions src/amplitude_experiment/assignment/assignment_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import amplitude


class AssignmentConfig(amplitude.Config):
def __init__(self, filter_capacity: int = 65536, **kw):
self.filter_capacity = filter_capacity
super(AssignmentConfig, self).__init__(**kw)
18 changes: 18 additions & 0 deletions src/amplitude_experiment/assignment/assignment_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import time

from .assignment import Assignment
from .assignment import DAY_MILLIS
from ..util.cache import Cache


class AssignmentFilter:
def __init__(self, size: int):
self.cache = Cache(size, DAY_MILLIS)

def should_track(self, assignment: Assignment) -> bool:
now = time.time()
canonical_assignment = assignment.canonicalize()
track = self.cache.get(canonical_assignment) is None
if track:
self.cache.put(canonical_assignment, object())
return track
41 changes: 41 additions & 0 deletions src/amplitude_experiment/assignment/assignment_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from amplitude import Amplitude, BaseEvent
from ..assignment.assignment import Assignment
from ..assignment.assignment import DAY_MILLIS
from ..assignment.assignment_filter import AssignmentFilter

FLAG_TYPE_MUTUAL_EXCLUSION_GROUP = "mutual-exclusion-group"


def to_event(assignment: Assignment) -> BaseEvent:
event = BaseEvent(event_type='[Experiment] Assignment', user_id=assignment.user.user_id,
device_id=assignment.user.device_id, event_properties={}, user_properties={})
for key in sorted(assignment.results):
event.event_properties[key + '.variant'] = assignment.results[key].value

set_props = {}
unset_props = {}

for key in sorted(assignment.results):
if assignment.results[key].type == FLAG_TYPE_MUTUAL_EXCLUSION_GROUP:
continue
elif assignment.results[key].is_default_variant:
unset_props[f'[Experiment] {key}'] = '-'
else:
set_props[f'[Experiment] {key}'] = assignment.results[key].value

event.user_properties['$set'] = set_props
event.user_properties['$unset'] = unset_props

event.insert_id = f'{event.user_id} {event.device_id} {hash(assignment.canonicalize())} {assignment.timestamp / DAY_MILLIS}'

return event


class AssignmentService:
def __init__(self, amplitude: Amplitude, assignment_filter: AssignmentFilter):
self.amplitude = amplitude
self.assignmentFilter = assignment_filter

def track(self, assignment: Assignment):
if self.assignmentFilter.should_track(assignment):
self.amplitude.track(to_event(assignment))
9 changes: 9 additions & 0 deletions src/amplitude_experiment/flagresult.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
class FlagResult:
def __init__(self, value: str, is_default_variant: bool, payload: str = None, expkey: str = None,
deployed: bool = None, type: str = None):
self.value = value
self.payload = payload
self.is_default_variant = is_default_variant
self.expkey = expkey
self.deployed = deployed
self.type = type
17 changes: 16 additions & 1 deletion src/amplitude_experiment/local/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
from threading import Lock
from typing import Any, List, Dict

from amplitude import Amplitude

from .config import LocalEvaluationConfig
from ..assignment import Assignment, AssignmentFilter, AssignmentService
from ..user import User
from ..connection_pool import HTTPConnectionPool
from .poller import Poller
Expand All @@ -25,10 +28,17 @@ def __init__(self, api_key: str, config: LocalEvaluationConfig = None):
Returns:
Experiment Client instance.
"""

if not api_key:
raise ValueError("Experiment API key is empty")
self.api_key = api_key
self.config = config or LocalEvaluationConfig()
self.assignment_service = None
if config and config.assignment_config:
print('set assignment service')
instance = Amplitude(config.assignment_config.api_key, config.assignment_config)
self.assignment_service = AssignmentService(instance, AssignmentFilter(
config.assignment_config.filter_capacity))
self.logger = logging.getLogger("Amplitude")
self.logger.addHandler(logging.StreamHandler())
if self.config.debug:
Expand Down Expand Up @@ -58,12 +68,16 @@ def evaluate(self, user: User, flag_keys: List[str] = None) -> Dict[str, Variant
"""
variants = {}
if self.flags is None or len(self.flags) == 0:
if self.assignment_service:
self.assignment_service.track(Assignment(user, {}))
return variants
user_json = str(user)
self.logger.debug(f"[Experiment] Evaluate: User: {user_json} - Flags: {self.flags}")
result_json = evaluate(self.flags, user_json)
self.logger.debug(f"[Experiment] Evaluate Result: {result_json}")
evaluation_result = json.loads(result_json)
if self.assignment_service:
self.assignment_service.track(Assignment(user, {}))
filter_result = flag_keys is not None
for key, value in evaluation_result.items():
if value.get('isDefaultVariant') or (filter_result and key not in flag_keys):
Expand All @@ -84,7 +98,8 @@ def __do_flags(self):
response = conn.request('GET', '/sdk/v1/flags', body, headers)
response_body = response.read().decode("utf8")
if response.status != 200:
raise Exception(f"[Experiment] Get flagConfigs - received error response: ${response.status}: ${response_body}")
raise Exception(
f"[Experiment] Get flagConfigs - received error response: ${response.status}: ${response_body}")
self.logger.debug(f"[Experiment] Got flag configs: {response_body}")
self.lock.acquire()
self.flags = response_body
Expand Down
7 changes: 6 additions & 1 deletion src/amplitude_experiment/local/config.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from src.amplitude_experiment.assignment.assignment_config import AssignmentConfig


class LocalEvaluationConfig:
"""Experiment Local Client Configuration"""

Expand All @@ -6,7 +9,8 @@ class LocalEvaluationConfig:
def __init__(self, debug: bool = False,
server_url: str = DEFAULT_SERVER_URL,
flag_config_polling_interval_millis: int = 30000,
flag_config_poller_request_timeout_millis: int = 10000):
flag_config_poller_request_timeout_millis: int = 10000,
assignment_config: AssignmentConfig = None):
"""
Initialize a config
Parameters:
Expand All @@ -25,3 +29,4 @@ def __init__(self, debug: bool = False,
self.server_url = server_url
self.flag_config_polling_interval_millis = flag_config_polling_interval_millis
self.flag_config_poller_request_timeout_millis = flag_config_poller_request_timeout_millis
self.assignment_config = assignment_config
1 change: 1 addition & 0 deletions src/amplitude_experiment/util/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .cache import Cache
67 changes: 67 additions & 0 deletions src/amplitude_experiment/util/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import time

class Cache:
class Node:
def __init__(self, key, value):
self.key = key
self.value = value
self.prev = None
self.next = None
self.last_accessed_time = time.time()

def __init__(self, capacity, ttl_millis):
self.capacity = capacity
self.ttl_millis = ttl_millis
self.cache = {}
self.head = self.Node(None, None)
self.tail = self.Node(None, None)
self.head.next = self.tail
self.tail.prev = self.head

def _add_node(self, node):
node.prev = self.head
node.next = self.head.next
self.head.next.prev = node
self.head.next = node

def _remove_node(self, node):
prev = node.prev
next_node = node.next
prev.next = next_node
next_node.prev = prev

def _move_to_head(self, node):
self._remove_node(node)
self._add_node(node)

def get(self, key):
if key in self.cache:
node = self.cache[key]
current_time = time.time()
if (current_time - node.last_accessed_time) * 1000 <= self.ttl_millis:
node.last_accessed_time = current_time # Update last accessed time
self._move_to_head(node)
return node.value
else:
# Node has expired, remove it from the cache
self._remove_node(node)
del self.cache[key]
return None

def put(self, key, value):
if key in self.cache:
node = self.cache[key]
node.value = value
node.last_accessed_time = time.time() # Update last accessed time
self._move_to_head(node)
else:
if len(self.cache) >= self.capacity:
# Evict the least recently used node (tail's prev)
tail_prev = self.tail.prev
self._remove_node(tail_prev)
del self.cache[tail_prev.key]

new_node = self.Node(key, value)
self._add_node(new_node)
self.cache[key] = new_node

Empty file.
129 changes: 129 additions & 0 deletions tests/local/assignment/assignment_filter_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import time
import unittest
from src.amplitude_experiment import User, FlagResult
from src.amplitude_experiment.assignment import Assignment, AssignmentFilter
from tests.local.util.mock_assignment_filter import MockAssignmentFilter


class AssignmentFilterTestCase(unittest.TestCase):

def test_single_assignment(self):
assignment_filter = AssignmentFilter(100)
user = User(user_id='user', device_id='device')
results = {}
result1 = FlagResult(value='on', is_default_variant=False)
result2 = FlagResult(value='control', is_default_variant=True)
results['flag-key-1'] = result1
results['flag-key-2'] = result2
assignment = Assignment(user, results)
self.assertTrue(assignment_filter.should_track(assignment))

def test_duplicate_assignments(self):
assignment_filter = AssignmentFilter(100)
user = User(user_id='user', device_id='device')
results = {}
result1 = FlagResult(value='on', is_default_variant=False)
result2 = FlagResult(value='control', is_default_variant=True)
results['flag-key-1'] = result1
results['flag-key-2'] = result2
assignment1 = Assignment(user, results)
assignment2 = Assignment(user, results)
self.assertTrue(assignment_filter.should_track(assignment1))
self.assertFalse(assignment_filter.should_track(assignment2))

def test_same_user_different_results(self):
assignment_filter = AssignmentFilter(100)
user = User(user_id='user', device_id='device')
results1 = {}
results2 = {}
result1 = FlagResult(value='on', is_default_variant=False)
result2 = FlagResult(value='control', is_default_variant=True)
results1['flag-key-1'] = result1
results1['flag-key-2'] = result2
results2['flag-key-2'] = result1
results2['flag-key-1'] = result2
assignment1 = Assignment(user, results1)
assignment2 = Assignment(user, results2)
self.assertTrue(assignment_filter.should_track(assignment1))
self.assertTrue(assignment_filter.should_track(assignment2))

def test_same_results_different_users(self):
assignment_filter = AssignmentFilter(100)
user1 = User(user_id='user', device_id='device')
user2 = User(user_id='different user', device_id='device')
results = {}
result1 = FlagResult(value='on', is_default_variant=False)
result2 = FlagResult(value='control', is_default_variant=True)
results['flag-key-1'] = result1
results['flag-key-2'] = result2
assignment1 = Assignment(user1, results)
assignment2 = Assignment(user2, results)
self.assertTrue(assignment_filter.should_track(assignment1))
self.assertTrue(assignment_filter.should_track(assignment2))

def test_empty_results(self):
assignment_filter = AssignmentFilter(100)
user1 = User(user_id='user', device_id='device')
user2 = User(user_id='different user', device_id='device')
assignment1 = Assignment(user1, {})
assignment2 = Assignment(user1, {})
assignment3 = Assignment(user2, {})
self.assertTrue(assignment_filter.should_track(assignment1))
self.assertFalse(assignment_filter.should_track(assignment2))
self.assertTrue(assignment_filter.should_track(assignment3))

def test_duplicate_assignments_with_different_ordering(self):
assignment_filter = AssignmentFilter(100)
user = User(user_id='user', device_id='device')
results1 = {}
results2 = {}
result1 = FlagResult(value='on', is_default_variant=False)
result2 = FlagResult(value='control', is_default_variant=True)
results1['flag-key-1'] = result1
results1['flag-key-2'] = result2
results2['flag-key-2'] = result2
results2['flag-key-1'] = result1
assignment1 = Assignment(user, results1)
assignment2 = Assignment(user, results2)
self.assertTrue(assignment_filter.should_track(assignment1))
self.assertFalse(assignment_filter.should_track(assignment2))

def test_lru_replacement(self):
assignment_filter = AssignmentFilter(2)
user1 = User(user_id='user1', device_id='device')
user2 = User(user_id='user2', device_id='device')
user3 = User(user_id='user3', device_id='device')
results = {}
result1 = FlagResult(value='on', is_default_variant=False)
result2 = FlagResult(value='control', is_default_variant=True)
results['flag-key-1'] = result1
results['flag-key-2'] = result2
assignment1 = Assignment(user1, results)
assignment2 = Assignment(user2, results)
assignment3 = Assignment(user3, results)
self.assertTrue(assignment_filter.should_track(assignment1))
self.assertTrue(assignment_filter.should_track(assignment2))
self.assertTrue(assignment_filter.should_track(assignment3))
self.assertTrue(assignment_filter.should_track(assignment1))

def test_lru_expiration(self):
assignment_filter = MockAssignmentFilter(100, 1000)
user1 = User(user_id='user1', device_id='device')
user2 = User(user_id='user2', device_id='device')
results = {}
result1 = FlagResult(value='on', is_default_variant=False)
result2 = FlagResult(value='control', is_default_variant=True)
results['flag-key-1'] = result1
results['flag-key-2'] = result2
assignment1 = Assignment(user1, results)
assignment2 = Assignment(user2, results)
# assignment1 should be evicted
self.assertTrue(assignment_filter.should_track(assignment1))
self.assertFalse(assignment_filter.should_track(assignment1))
time.sleep(1.1)
self.assertTrue(assignment_filter.should_track(assignment1))
# assignment2 should not be evicted
self.assertTrue(assignment_filter.should_track(assignment2))
self.assertFalse(assignment_filter.should_track(assignment2))
time.sleep(0.95)
self.assertFalse(assignment_filter.should_track(assignment2))
Loading

0 comments on commit 9e9c4d0

Please sign in to comment.