Skip to content

Commit

Permalink
feat: support cohort targeting for local evaluation (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
tyiuhc authored Aug 27, 2024
1 parent c75a4c2 commit 92179de
Show file tree
Hide file tree
Showing 25 changed files with 2,050 additions and 53 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ jobs:
with:
version: latest
test:
environment: Unit Test
runs-on: 'ubuntu-latest'
steps:
- name: Checkout
Expand All @@ -35,5 +36,10 @@ jobs:
go-version: '1.17'
check-latest: true
- name: Test
env:
API_KEY: ${{ secrets.API_KEY }}
SECRET_KEY: ${{ secrets.SECRET_KEY }}
EU_API_KEY: ${{ secrets.EU_API_KEY }}
EU_SECRET_KEY: ${{ secrets.EU_SECRET_KEY }}
run: |
go test ./...
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
xpmt
.DS_Store
cmd/xpmt/bin/
pkg/experiment/local/.env
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,12 @@ Fetch variants for a user given an experiment user JSON object
```

> Note: must use single quotes around JSON object string
### Running unit tests suite
To set up for running test on local, create a `.env` file in `pkg/experiment/local` with following
contents, and replace `{API_KEY}` and `{SECRET_KEY}` (or `{EU_API_KEY}` and `{EU_SECRET_KEY}` for EU data center) for the project in test:

```
API_KEY={API_KEY}
SECRET_KEY={SECRET_KEY}
```
7 changes: 6 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,9 @@ go 1.12

require github.com/spaolacci/murmur3 v1.1.0

require github.com/amplitude/analytics-go v1.0.1
require (
github.com/amplitude/analytics-go v1.0.1
github.com/jarcoal/httpmock v1.3.1
github.com/joho/godotenv v1.5.1
github.com/stretchr/testify v1.9.0
)
13 changes: 11 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,27 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jarcoal/httpmock v1.3.1 h1:iUx3whfZWVf3jT01hQTO/Eo5sAYtB2/rqaUuOtpInww=
github.com/jarcoal/httpmock v1.3.1/go.mod h1:3yb8rc4BI7TCBhFY8ng0gjuLKJNquuDNiPaZjnENuYg=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/maxatome/go-testdeep v1.12.0 h1:Ql7Go8Tg0C1D/uMMX59LAoYK7LffeJQ6X2T04nTH68g=
github.com/maxatome/go-testdeep v1.12.0/go.mod h1:lPZc/HAcJMP92l7yI6TRz1aZN5URwUBUAfUNvrclaNM=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
57 changes: 56 additions & 1 deletion internal/evaluation/context.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package evaluation

import "github.com/amplitude/experiment-go-server/pkg/experiment"
import (
"github.com/amplitude/experiment-go-server/pkg/experiment"
)

func UserToContext(user *experiment.User) map[string]interface{} {
if user == nil {
return nil
}
context := make(map[string]interface{})
userMap := make(map[string]interface{})

if len(user.UserId) != 0 {
userMap["user_id"] = user.UserId
}
Expand Down Expand Up @@ -56,6 +59,58 @@ func UserToContext(user *experiment.User) map[string]interface{} {
if len(user.UserProperties) != 0 {
userMap["user_properties"] = user.UserProperties
}
if len(user.Groups) != 0 {
userMap["groups"] = user.Groups
}
if len(user.CohortIds) != 0 {
userMap["cohort_ids"] = extractKeys(user.CohortIds)
}

context["user"] = userMap

if user.Groups == nil {
return context
}

groups := make(map[string]interface{})
for groupType, groupNames := range user.Groups {
if len(groupNames) > 0 {
groupName := groupNames[0]
groupNameMap := map[string]interface{}{
"group_name": groupName,
}

if user.GroupProperties != nil {
if groupPropertiesType, ok := user.GroupProperties[groupType]; ok {
if groupPropertiesName, ok := groupPropertiesType[groupName]; ok {
groupNameMap["group_properties"] = groupPropertiesName
}
}
}

if user.GroupCohortIds != nil {
if groupCohortIdsType, ok := user.GroupCohortIds[groupType]; ok {
if groupCohortIdsName, ok := groupCohortIdsType[groupName]; ok {
groupNameMap["cohort_ids"] = extractKeys(groupCohortIdsName)
}
}
}

groups[groupType] = groupNameMap
}
}

if len(groups) > 0 {
context["groups"] = groups
}

return context
}

func extractKeys(m map[string]struct{}) []string {
keys := make([]string, 0, len(m))
for key := range m {
keys = append(keys, key)
}
return keys
}
121 changes: 92 additions & 29 deletions pkg/experiment/local/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@ type Client struct {
config *Config
client *http.Client
poller *poller
flags map[string]*evaluation.Flag
flagsMutex *sync.RWMutex
engine *evaluation.Engine
assignmentService *assignmentService
cohortStorage cohortStorage
flagConfigStorage flagConfigStorage
cohortLoader *cohortLoader
deploymentRunner *deploymentRunner
}

func Initialize(apiKey string, config *Config) *Client {
Expand All @@ -43,23 +46,35 @@ func Initialize(apiKey string, config *Config) *Client {
config = fillConfigDefaults(config)
log := logger.New(config.Debug)
var as *assignmentService
if config.AssignmentConfig != nil && config.AssignmentConfig.APIKey != "" {
if config.AssignmentConfig != nil && config.AssignmentConfig.APIKey != "" {
amplitudeClient := amplitude.NewClient(config.AssignmentConfig.Config)
as = &assignmentService{
amplitude: &amplitudeClient,
filter: newAssignmentFilter(config.AssignmentConfig.CacheCapacity),
filter: newAssignmentFilter(config.AssignmentConfig.CacheCapacity),
}
}
cohortStorage := newInMemoryCohortStorage()
flagConfigStorage := newInMemoryFlagConfigStorage()
var cohortLoader *cohortLoader
var deploymentRunner *deploymentRunner
if config.CohortSyncConfig != nil {
cohortDownloadApi := newDirectCohortDownloadApi(config.CohortSyncConfig.ApiKey, config.CohortSyncConfig.SecretKey, config.CohortSyncConfig.MaxCohortSize, config.CohortSyncConfig.CohortServerUrl, config.Debug)
cohortLoader = newCohortLoader(cohortDownloadApi, cohortStorage)
}
deploymentRunner = newDeploymentRunner(config, newFlagConfigApiV2(apiKey, config.ServerUrl, config.FlagConfigPollerRequestTimeout), flagConfigStorage, cohortStorage, cohortLoader)
client = &Client{
log: log,
apiKey: apiKey,
config: config,
client: &http.Client{},
poller: newPoller(),
flags: make(map[string]*evaluation.Flag),
flagsMutex: &sync.RWMutex{},
engine: evaluation.NewEngine(log),
log: log,
apiKey: apiKey,
config: config,
client: &http.Client{},
poller: newPoller(),
flagsMutex: &sync.RWMutex{},
engine: evaluation.NewEngine(log),
assignmentService: as,
cohortStorage: cohortStorage,
flagConfigStorage: flagConfigStorage,
cohortLoader: cohortLoader,
deploymentRunner: deploymentRunner,
}
client.log.Debug("config: %v", *config)
clients[apiKey] = client
Expand All @@ -69,20 +84,10 @@ func Initialize(apiKey string, config *Config) *Client {
}

func (c *Client) Start() error {
result, err := c.doFlagsV2()
err := c.deploymentRunner.start()
if err != nil {
return err
}
c.flags = result
c.poller.Poll(c.config.FlagConfigPollerInterval, func() {
result, err := c.doFlagsV2()
if err != nil {
return
}
c.flagsMutex.Lock()
c.flags = result
c.flagsMutex.Unlock()
})
return nil
}

Expand Down Expand Up @@ -110,10 +115,17 @@ func (c *Client) Evaluate(user *experiment.User, flagKeys []string) (map[string]
}

func (c *Client) EvaluateV2(user *experiment.User, flagKeys []string) (map[string]experiment.Variant, error) {
userContext := evaluation.UserToContext(user)
c.flagsMutex.RLock()
sortedFlags, err := topologicalSort(c.flags, flagKeys)
c.flagsMutex.RUnlock()
flagConfigs := c.flagConfigStorage.getFlagConfigs()
sortedFlags, err := topologicalSort(flagConfigs, flagKeys)
if err != nil {
return nil, err
}
c.requiredCohortsInStorage(sortedFlags)
enrichedUser, err := c.enrichUserWithCohorts(user, flagConfigs)
if err != nil {
return nil, err
}
userContext := evaluation.UserToContext(enrichedUser)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -149,9 +161,7 @@ func (c *Client) FlagsV2() (string, error) {

// FlagMetadata returns a copy of the flag's metadata. If the flag is not found then nil is returned.
func (c *Client) FlagMetadata(flagKey string) map[string]interface{} {
c.flagsMutex.RLock()
f := c.flags[flagKey]
c.flagsMutex.RUnlock()
f := c.flagConfigStorage.getFlagConfig(flagKey)
if f == nil {
return nil
}
Expand Down Expand Up @@ -329,3 +339,56 @@ func coerceString(value interface{}) string {
}
return fmt.Sprintf("%v", value)
}

func (c *Client) requiredCohortsInStorage(flagConfigs []*evaluation.Flag) {
storedCohortIDs := c.cohortStorage.getCohortIds()
for _, flag := range flagConfigs {
flagCohortIDs := getAllCohortIDsFromFlag(flag)
missingCohorts := difference(flagCohortIDs, storedCohortIDs)

if len(missingCohorts) > 0 {
if c.config.CohortSyncConfig != nil {
c.log.Debug(
"Evaluating flag %s dependent on cohorts %v without %v in storage",
flag.Key, flagCohortIDs, missingCohorts,
)
} else {
c.log.Debug(
"Evaluating flag %s dependent on cohorts %v without cohort syncing configured",
flag.Key, flagCohortIDs,
)
}
}
}
}

func (c *Client) enrichUserWithCohorts(user *experiment.User, flagConfigs map[string]*evaluation.Flag) (*experiment.User, error) {
flagConfigSlice := make([]*evaluation.Flag, 0, len(flagConfigs))

for _, value := range flagConfigs {
flagConfigSlice = append(flagConfigSlice, value)
}
groupedCohortIDs := getGroupedCohortIDsFromFlags(flagConfigSlice)

if cohortIDs, ok := groupedCohortIDs[userGroupType]; ok {
if len(cohortIDs) > 0 && user.UserId != "" {
user.CohortIds = c.cohortStorage.getCohortsForUser(user.UserId, cohortIDs)
}
}

if user.Groups != nil {
for groupType, groupNames := range user.Groups {
groupName := ""
if len(groupNames) > 0 {
groupName = groupNames[0]
}
if groupName == "" {
continue
}
if cohortIDs, ok := groupedCohortIDs[groupType]; ok {
user.AddGroupCohortIds(groupType, groupName, c.cohortStorage.getCohortsForGroup(groupType, groupName, cohortIDs))
}
}
}
return user, nil
}
55 changes: 55 additions & 0 deletions pkg/experiment/local/client_eu_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package local

import (
"github.com/amplitude/experiment-go-server/pkg/experiment"
"github.com/joho/godotenv"
"log"
"os"
"testing"
)

var clientEU *Client

func init() {
err := godotenv.Load()
if err != nil {
log.Printf("Error loading .env file: %v", err)
}
projectApiKey := os.Getenv("EU_API_KEY")
secretKey := os.Getenv("EU_SECRET_KEY")
cohortSyncConfig := CohortSyncConfig{
ApiKey: projectApiKey,
SecretKey: secretKey,
}
clientEU = Initialize("server-Qlp7XiSu6JtP2S3JzA95PnP27duZgQCF",
&Config{CohortSyncConfig: &cohortSyncConfig, ServerZone: EUServerZone})
err = clientEU.Start()
if err != nil {
panic(err)
}
}

func TestEvaluateV2CohortEU(t *testing.T) {
targetedUser := &experiment.User{UserId: "1", DeviceId: "0"}
nonTargetedUser := &experiment.User{UserId: "not_targeted", DeviceId: "0"}
flagKeys := []string{"sdk-local-evaluation-user-cohort"}
result, err := clientEU.EvaluateV2(targetedUser, flagKeys)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
variant := result["sdk-local-evaluation-user-cohort"]
if variant.Key != "on" {
t.Fatalf("Unexpected variant %v", variant)
}
if variant.Value != "on" {
t.Fatalf("Unexpected variant %v", variant)
}
result, err = clientEU.EvaluateV2(nonTargetedUser, flagKeys)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
variant = result["sdk-local-evaluation-user-cohort"]
if variant.Key != "off" {
t.Fatalf("Unexpected variant %v", variant)
}
}
Loading

0 comments on commit 92179de

Please sign in to comment.