Skip to content

Commit

Permalink
add flag config updater tests
Browse files Browse the repository at this point in the history
  • Loading branch information
zhukaihan committed Oct 3, 2024
1 parent 4232012 commit 77f57a1
Show file tree
Hide file tree
Showing 4 changed files with 503 additions and 17 deletions.
21 changes: 13 additions & 8 deletions pkg/experiment/local/flag_config_stream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@ const streamApiMaxJitter = 5 * time.Second
const streamApiKeepaliveTimeout = 17 * time.Second
const streamApiReconnInterval = 15 * time.Minute

type flagConfigStreamApi interface {
Connect(
onInitUpdate func (map[string]*evaluation.Flag) error,
onUpdate func (map[string]*evaluation.Flag) error,
onError func (error),
) error
Close()
}

type flagConfigStreamApiV2 struct {
DeploymentKey string
ServerURL string
Expand Down Expand Up @@ -53,10 +62,7 @@ func (api *flagConfigStreamApiV2) Connect(
api.lock.Lock()
defer api.lock.Unlock()

err := api.closeInternal()
if (err != nil) {
return err
}
api.closeInternal()

// Create URL.
endpoint, err := url.Parse(api.ServerURL)
Expand Down Expand Up @@ -174,16 +180,15 @@ func parseData(data []byte) (map[string]*evaluation.Flag, error) {
return flags, nil
}

func (api *flagConfigStreamApiV2) closeInternal() error {
func (api *flagConfigStreamApiV2) closeInternal() {
if (api.stopCh != nil) {
close(api.stopCh)
api.stopCh = nil
}
return nil
}
func (api *flagConfigStreamApiV2) Close() error {
func (api *flagConfigStreamApiV2) Close() {
api.lock.Lock()
defer api.lock.Unlock()

return api.closeInternal()
api.closeInternal()
}
3 changes: 3 additions & 0 deletions pkg/experiment/local/flag_config_stream_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,19 @@ import (
)

type mockSseStream struct {
// Params
authToken string
url string
connectionTimeout time.Duration
keepaliveTimeout time.Duration
reconnInterval time.Duration
maxJitter time.Duration

// Channels to emit messages to simulate new events received through stream.
messageCh chan(StreamEvent)
errorCh chan(error)

// Channel to tell there's a connection call.
chConnected chan bool
}

Expand Down
29 changes: 21 additions & 8 deletions pkg/experiment/local/flag_config_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type flagConfigUpdater interface {
Stop()
}

// The base for all flag config updaters.
// Contains a method to properly update the flag configs into storage and download cohorts.
type flagConfigUpdaterBase struct {
flagConfigStorage flagConfigStorage
cohortStorage cohortStorage
Expand All @@ -39,6 +41,7 @@ func newFlagConfigUpdaterBase(
}
}

// Updates the received flag configs into storage and download cohorts.
func (u *flagConfigUpdaterBase) update(flagConfigs map[string]*evaluation.Flag) error {

flagKeys := make(map[string]struct{})
Expand Down Expand Up @@ -112,14 +115,15 @@ func (u *flagConfigUpdaterBase) deleteUnusedCohorts() {
}
}

// The streamer for flag configs. It receives flag configs through server side events.
type flagConfigStreamer struct {
flagConfigUpdaterBase
flagConfigStreamApi *flagConfigStreamApiV2
flagConfigStreamApi flagConfigStreamApi
lock sync.Mutex
}

func NewFlagConfigStreamer(
flagConfigStreamApi *flagConfigStreamApiV2,
flagConfigStreamApi flagConfigStreamApi,
config *Config,
flagConfigStorage flagConfigStorage,
cohortStorage cohortStorage,
Expand All @@ -137,11 +141,14 @@ func (s *flagConfigStreamer) Start(onError func (error)) error {

s.stopInternal()
return s.flagConfigStreamApi.Connect(
nil,
func (flags map[string]*evaluation.Flag) error {
return s.update(flags)
},
func (flags map[string]*evaluation.Flag) error {
return s.update(flags)
},
func (err error) {
s.Stop()
if (onError != nil) {
onError(err)
}
Expand All @@ -159,6 +166,8 @@ func (s *flagConfigStreamer) Stop() {
s.stopInternal()
}

// The poller for flag configs. It polls every configured interval.
// On start, it polls a set of flag configs. If failed, error is returned. If success, poller starts.
type flagConfigPoller struct {
flagConfigUpdaterBase
flagConfigApi flagConfigApi
Expand Down Expand Up @@ -239,6 +248,8 @@ func (p *flagConfigPoller) Stop() {
p.stopInternal()
}

// A wrapper around flag config updaters to retry and fallback.
// If the main updater fails, it will fallback to the fallback updater and main updater enters retry loop.
type FlagConfigFallbackRetryWrapper struct {
mainUpdater flagConfigUpdater
fallbackUpdater flagConfigUpdater
Expand All @@ -261,9 +272,12 @@ func NewFlagConfigFallbackRetryWrapper(
}
}

/**
* Since the wrapper retries, so there will never be error case. Thus, onError will never be called.
*/
// Start tries to start main updater first.
// If it failed, start the fallback updater.
// If fallback updater failed as well, return error.
// If fallback updater succeed, main updater enters retry, return ok.
// Since the wrapper retries, so there will never be error case.
// Thus, onError will never be called.
func (w *FlagConfigFallbackRetryWrapper) Start(onError func (error)) error {
// if (mainUpdater is FlagConfigFallbackRetryWrapper) {
// throw Error("Do not use FlagConfigFallbackRetryWrapper as main updater. Fallback updater will never be used. Rewrite retry and fallback logic.")
Expand All @@ -284,7 +298,6 @@ func (w *FlagConfigFallbackRetryWrapper) Start(onError func (error)) error {
}
})
if (err == nil) {
fmt.Println("main start ok")
// Main start success, stop fallback.
if (w.fallbackUpdater != nil) {
w.fallbackUpdater.Stop()
Expand Down Expand Up @@ -329,7 +342,6 @@ func (w *FlagConfigFallbackRetryWrapper) scheduleRetry() {
w.retryTimer = nil
}
w.retryTimer = time.AfterFunc(randTimeDuration(w.retryDelay, w.maxJitter), func() {
fmt.Println("retrying")
w.lock.Lock()
defer w.lock.Unlock()

Expand All @@ -350,6 +362,7 @@ func (w *FlagConfigFallbackRetryWrapper) scheduleRetry() {
}
return
}
fmt.Println("retrying failed", err)

go func() {w.scheduleRetry()}()
})
Expand Down
Loading

0 comments on commit 77f57a1

Please sign in to comment.