Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add flag push #30

Merged
merged 18 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,7 @@ require (
github.com/amplitude/analytics-go v1.0.1
github.com/jarcoal/httpmock v1.3.1
github.com/joho/godotenv v1.5.1
github.com/r3labs/sse/v2 v2.10.0
github.com/stretchr/testify v1.9.0
gopkg.in/cenkalti/backoff.v1 v1.1.0
)
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,29 @@ github.com/maxatome/go-testdeep v1.12.0 h1:Ql7Go8Tg0C1D/uMMX59LAoYK7LffeJQ6X2T04
github.com/maxatome/go-testdeep v1.12.0/go.mod h1:lPZc/HAcJMP92l7yI6TRz1aZN5URwUBUAfUNvrclaNM=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/r3labs/sse/v2 v2.10.0 h1:hFEkLLFY4LDifoHdiCN/LlGBAdVJYsANaLqNYa1l/v0=
github.com/r3labs/sse/v2 v2.10.0/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20191116160921-f9c825593386 h1:ktbWvQrW08Txdxno1PiDpSxPXG6ndGsfnJjRRtkM0LQ=
golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=
gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
4 changes: 2 additions & 2 deletions pkg/experiment/local/assignment_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ func TestToEvent(t *testing.T) {
},
},
"flag-key-2": {
Key: "control",
Key: "control",
Metadata: map[string]interface{}{
"default": true,
"default": true,
"segmentName": "All Other Users",
"flagVersion": float64(12),
},
Expand Down
14 changes: 11 additions & 3 deletions pkg/experiment/local/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/amplitude/analytics-go/amplitude"
"io/ioutil"
"net/http"
"net/url"
"reflect"
"sync"

"github.com/amplitude/analytics-go/amplitude"

"github.com/amplitude/experiment-go-server/internal/evaluation"

"github.com/amplitude/experiment-go-server/pkg/experiment"
Expand Down Expand Up @@ -59,9 +60,16 @@ func Initialize(apiKey string, config *Config) *Client {
var deploymentRunner *deploymentRunner
if config.CohortSyncConfig != nil {
cohortDownloadApi := newDirectCohortDownloadApi(config.CohortSyncConfig.ApiKey, config.CohortSyncConfig.SecretKey, config.CohortSyncConfig.MaxCohortSize, config.CohortSyncConfig.CohortServerUrl, config.Debug)
cohortLoader = newCohortLoader(cohortDownloadApi, cohortStorage)
cohortLoader = newCohortLoader(cohortDownloadApi, cohortStorage, config.Debug)
}
var flagStreamApi *flagConfigStreamApiV2
if config.StreamUpdates {
flagStreamApi = newFlagConfigStreamApiV2(apiKey, config.StreamServerUrl, config.StreamFlagConnTimeout)
}
deploymentRunner = newDeploymentRunner(config, newFlagConfigApiV2(apiKey, config.ServerUrl, config.FlagConfigPollerRequestTimeout), flagConfigStorage, cohortStorage, cohortLoader)
deploymentRunner = newDeploymentRunner(
config,
newFlagConfigApiV2(apiKey, config.ServerUrl, config.FlagConfigPollerRequestTimeout),
flagStreamApi, flagConfigStorage, cohortStorage, cohortLoader)
client = &Client{
log: log,
apiKey: apiKey,
Expand Down
157 changes: 157 additions & 0 deletions pkg/experiment/local/client_stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package local

import (
"log"
"os"
"testing"

"github.com/amplitude/experiment-go-server/pkg/experiment"
"github.com/joho/godotenv"
"github.com/stretchr/testify/assert"
)

var streamClient *Client

func init() {
err := godotenv.Load()
if err != nil {
log.Printf("Error loading .env file: %v", err)
}
projectApiKey := os.Getenv("API_KEY")
secretKey := os.Getenv("SECRET_KEY")
cohortSyncConfig := CohortSyncConfig{
ApiKey: projectApiKey,
SecretKey: secretKey,
}
streamClient = Initialize("server-qz35UwzJ5akieoAdIgzM4m9MIiOLXLoz",
&Config{
StreamUpdates: true,
StreamServerUrl: "https://stream.lab.amplitude.com",
CohortSyncConfig: &cohortSyncConfig,
})
err = streamClient.Start()
if err != nil {
panic(err)
}
}

func TestMakeSureStreamEnabled(t *testing.T) {
assert.True(t, streamClient.config.StreamUpdates)
}

func TestStreamEvaluate(t *testing.T) {
user := &experiment.User{UserId: "test_user"}
result, err := streamClient.Evaluate(user, nil)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
variant := result["sdk-local-evaluation-ci-test"]
if variant.Key != "on" {
t.Fatalf("Unexpected variant %v", variant)
}
if variant.Value != "on" {
t.Fatalf("Unexpected variant %v", variant)
}
if variant.Payload != "payload" {
t.Fatalf("Unexpected variant %v", variant)
}
variant = result["sdk-ci-test"]
if variant.Key != "" {
t.Fatalf("Unexpected variant %v", variant)
}
if variant.Value != "" {
t.Fatalf("Unexpected variant %v", variant)
}
}

func TestStreamEvaluateV2AllFlags(t *testing.T) {
user := &experiment.User{UserId: "test_user"}
result, err := streamClient.EvaluateV2(user, nil)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
variant := result["sdk-local-evaluation-ci-test"]
if variant.Key != "on" {
t.Fatalf("Unexpected variant %v", variant)
}
if variant.Value != "on" {
t.Fatalf("Unexpected variant %v", variant)
}
if variant.Payload != "payload" {
t.Fatalf("Unexpected variant %v", variant)
}
variant = result["sdk-ci-test"]
if variant.Key != "off" {
t.Fatalf("Unexpected variant %v", variant)
}
if variant.Value != "" {
t.Fatalf("Unexpected variant %v", variant)
}
}

func TestStreamFlagMetadataLocalFlagKey(t *testing.T) {
md := streamClient.FlagMetadata("sdk-local-evaluation-ci-test")
if md["evaluationMode"] != "local" {
t.Fatalf("Unexpected metadata %v", md)
}
}

func TestStreamEvaluateV2Cohort(t *testing.T) {
targetedUser := &experiment.User{UserId: "12345"}
nonTargetedUser := &experiment.User{UserId: "not_targeted"}
flagKeys := []string{"sdk-local-evaluation-user-cohort-ci-test"}
result, err := streamClient.EvaluateV2(targetedUser, flagKeys)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
variant := result["sdk-local-evaluation-user-cohort-ci-test"]
if variant.Key != "on" {
t.Fatalf("Unexpected variant %v", variant)
}
if variant.Value != "on" {
t.Fatalf("Unexpected variant %v", variant)
}
result, err = streamClient.EvaluateV2(nonTargetedUser, flagKeys)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
variant = result["sdk-local-evaluation-user-cohort-ci-test"]
if variant.Key != "off" {
t.Fatalf("Unexpected variant %v", variant)
}
}

func TestStreamEvaluateV2GroupCohort(t *testing.T) {
targetedUser := &experiment.User{
UserId: "12345",
DeviceId: "device_id",
Groups: map[string][]string{
"org id": {"1"},
}}
nonTargetedUser := &experiment.User{
UserId: "12345",
DeviceId: "device_id",
Groups: map[string][]string{
"org id": {"not_targeted"},
}}
flagKeys := []string{"sdk-local-evaluation-group-cohort-ci-test"}
result, err := streamClient.EvaluateV2(targetedUser, flagKeys)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
variant := result["sdk-local-evaluation-group-cohort-ci-test"]
if variant.Key != "on" {
t.Fatalf("Unexpected variant %v", variant)
}
if variant.Value != "on" {
t.Fatalf("Unexpected variant %v", variant)
}
result, err = streamClient.EvaluateV2(nonTargetedUser, flagKeys)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
variant = result["sdk-local-evaluation-group-cohort-ci-test"]
if variant.Key != "off" {
t.Fatalf("Unexpected variant %v", variant)
}
}
5 changes: 3 additions & 2 deletions pkg/experiment/local/client_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package local

import (
"github.com/amplitude/experiment-go-server/pkg/experiment"
"github.com/joho/godotenv"
"log"
"os"
"testing"

"github.com/amplitude/experiment-go-server/pkg/experiment"
"github.com/joho/godotenv"
)

var client *Client
Expand Down
39 changes: 38 additions & 1 deletion pkg/experiment/local/cohort_loader.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
package local

import (
"fmt"
"strings"
"sync"
"sync/atomic"

"github.com/amplitude/experiment-go-server/internal/logger"
)

type cohortLoader struct {
log *logger.Log
cohortDownloadApi cohortDownloadApi
cohortStorage cohortStorage
jobs sync.Map
executor *sync.Pool
lockJobs sync.Mutex
}

func newCohortLoader(cohortDownloadApi cohortDownloadApi, cohortStorage cohortStorage) *cohortLoader {
func newCohortLoader(cohortDownloadApi cohortDownloadApi, cohortStorage cohortStorage, debug bool) *cohortLoader {
return &cohortLoader{
cohortDownloadApi: cohortDownloadApi,
cohortStorage: cohortStorage,
Expand All @@ -22,6 +27,7 @@ func newCohortLoader(cohortDownloadApi cohortDownloadApi, cohortStorage cohortSt
return &CohortLoaderTask{}
},
},
log: logger.New(debug),
}
}

Expand Down Expand Up @@ -86,3 +92,34 @@ func (cl *cohortLoader) downloadCohort(cohortID string) (*Cohort, error) {
cohort := cl.cohortStorage.getCohort(cohortID)
return cl.cohortDownloadApi.getCohort(cohortID, cohort)
}

func (cl *cohortLoader) downloadCohorts(cohortIDs map[string]struct{}) {
var wg sync.WaitGroup
errorChan := make(chan error, len(cohortIDs))

for cohortID := range cohortIDs {
wg.Add(1)
go func(id string) {
defer wg.Done()
task := cl.loadCohort(id)
if err := task.wait(); err != nil {
errorChan <- fmt.Errorf("cohort %s: %v", id, err)
}
}(cohortID)
}

go func() {
wg.Wait()
close(errorChan)
}()

var errorMessages []string
for err := range errorChan {
errorMessages = append(errorMessages, err.Error())
cl.log.Error("Error downloading cohort: %v", err)
}

if len(errorMessages) > 0 {
cl.log.Error("One or more cohorts failed to download:\n%s", strings.Join(errorMessages, "\n"))
}
}
6 changes: 3 additions & 3 deletions pkg/experiment/local/cohort_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
func TestLoadSuccess(t *testing.T) {
api := &MockCohortDownloadApi{}
storage := newInMemoryCohortStorage()
loader := newCohortLoader(api, storage)
loader := newCohortLoader(api, storage, true)

// Define mock behavior
api.On("getCohort", "a", mock.AnythingOfType("*local.Cohort")).Return(&Cohort{Id: "a", LastModified: 0, Size: 1, MemberIds: []string{"1"}, GroupType: userGroupType}, nil)
Expand Down Expand Up @@ -48,7 +48,7 @@ func TestLoadSuccess(t *testing.T) {
func TestFilterCohortsAlreadyComputed(t *testing.T) {
api := &MockCohortDownloadApi{}
storage := newInMemoryCohortStorage()
loader := newCohortLoader(api, storage)
loader := newCohortLoader(api, storage, true)

storage.putCohort(&Cohort{Id: "a", LastModified: 0, Size: 0, MemberIds: []string{}})
storage.putCohort(&Cohort{Id: "b", LastModified: 0, Size: 0, MemberIds: []string{}})
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestFilterCohortsAlreadyComputed(t *testing.T) {
func TestLoadDownloadFailureThrows(t *testing.T) {
api := &MockCohortDownloadApi{}
storage := newInMemoryCohortStorage()
loader := newCohortLoader(api, storage)
loader := newCohortLoader(api, storage, true)

// Define mock behavior
api.On("getCohort", "a", mock.AnythingOfType("*local.Cohort")).Return(&Cohort{Id: "a", LastModified: 0, Size: 1, MemberIds: []string{"1"}, GroupType: userGroupType}, nil)
Expand Down
Loading
Loading