diff --git a/pkg/experiment/local/deployment_runner.go b/pkg/experiment/local/deployment_runner.go index 2a2312f..bf22d82 100644 --- a/pkg/experiment/local/deployment_runner.go +++ b/pkg/experiment/local/deployment_runner.go @@ -18,7 +18,7 @@ type deploymentRunner struct { } const streamUpdaterRetryDelay = 15 * time.Second -const updaterRetryMaxJitter = 2 * time.Second +const updaterRetryMaxJitter = 1 * time.Second func newDeploymentRunner( config *Config, @@ -28,9 +28,9 @@ func newDeploymentRunner( cohortStorage cohortStorage, cohortLoader *cohortLoader, ) *deploymentRunner { - flagConfigUpdater := newflagConfigFallbackRetryWrapper(newFlagConfigPoller(flagConfigApi, config, flagConfigStorage, cohortStorage, cohortLoader), nil, config.FlagConfigPollerInterval, updaterRetryMaxJitter, config.Debug) + flagConfigUpdater := newflagConfigFallbackRetryWrapper(newFlagConfigPoller(flagConfigApi, config, flagConfigStorage, cohortStorage, cohortLoader), nil, config.FlagConfigPollerInterval, updaterRetryMaxJitter, 0, 0, config.Debug) if flagConfigStreamApi != nil { - flagConfigUpdater = newflagConfigFallbackRetryWrapper(newFlagConfigStreamer(flagConfigStreamApi, config, flagConfigStorage, cohortStorage, cohortLoader), flagConfigUpdater, streamUpdaterRetryDelay, updaterRetryMaxJitter, config.Debug) + flagConfigUpdater = newflagConfigFallbackRetryWrapper(newFlagConfigStreamer(flagConfigStreamApi, config, flagConfigStorage, cohortStorage, cohortLoader), flagConfigUpdater, streamUpdaterRetryDelay, updaterRetryMaxJitter, config.FlagConfigPollerInterval, 0, config.Debug) } dr := &deploymentRunner{ config: config, diff --git a/pkg/experiment/local/flag_config_updater.go b/pkg/experiment/local/flag_config_updater.go index e7de7a7..a1b7865 100644 --- a/pkg/experiment/local/flag_config_updater.go +++ b/pkg/experiment/local/flag_config_updater.go @@ -149,7 +149,7 @@ func (s *flagConfigStreamer) Start(onError func(error)) error { func(err error) { s.Stop() if onError != nil { - onError(err) + go func() {onError(err)}() } }, ) @@ -205,7 +205,9 @@ func (p *flagConfigPoller) Start(onError func(error)) error { if err := p.periodicRefresh(); err != nil { p.log.Error("Periodic updateFlagConfigs failed: %v", err) p.Stop() - onError(err) + if (onError != nil) { + go func() {onError(err)}() + } } }) return nil @@ -253,7 +255,11 @@ type flagConfigFallbackRetryWrapper struct { retryDelay time.Duration maxJitter time.Duration retryTimer *time.Timer + fallbackStartRetryDelay time.Duration + fallbackStartRetryMaxJitter time.Duration + fallbackStartRetryTimer *time.Timer lock sync.Mutex + isRunning bool } func newflagConfigFallbackRetryWrapper( @@ -261,6 +267,8 @@ func newflagConfigFallbackRetryWrapper( fallbackUpdater flagConfigUpdater, retryDelay time.Duration, maxJitter time.Duration, + fallbackStartRetryDelay time.Duration, + fallbackStartRetryMaxJitter time.Duration, debug bool, ) flagConfigUpdater { return &flagConfigFallbackRetryWrapper{ @@ -269,6 +277,9 @@ func newflagConfigFallbackRetryWrapper( fallbackUpdater: fallbackUpdater, retryDelay: retryDelay, maxJitter: maxJitter, + fallbackStartRetryDelay: fallbackStartRetryDelay, + fallbackStartRetryMaxJitter: fallbackStartRetryMaxJitter, + isRunning: false, } } @@ -282,7 +293,7 @@ func newflagConfigFallbackRetryWrapper( // Thus, onError will never be called. func (w *flagConfigFallbackRetryWrapper) Start(onError func(error)) error { // if (mainUpdater is flagConfigFallbackRetryWrapper) { - // throw Error("Do not use flagConfigFallbackRetryWrapper as main updater. Fallback updater will never be used. Rewrite retry and fallback logic.") + // return errors.New("Do not use flagConfigFallbackRetryWrapper as main updater. Fallback updater will never be used. Rewrite retry and fallback logic.") // } w.lock.Lock() @@ -294,31 +305,34 @@ func (w *flagConfigFallbackRetryWrapper) Start(onError func(error)) error { } err := w.mainUpdater.Start(func(err error) { - w.log.Error("main updater updating err, starting fallback if available. error: ", err) + w.log.Debug("main updater updating err, starting fallback if available. error: ", err) go func() { w.scheduleRetry() }() // Don't care if poller start error or not, always retry. - if w.fallbackUpdater != nil { - //nolint:errcheck - w.fallbackUpdater.Start(nil) // Don't care if fallback start success or fail. - } + go func() { w.fallbackStart() }() }) if err == nil { // Main start success, stop fallback. + if w.fallbackStartRetryTimer != nil { + w.fallbackStartRetryTimer.Stop() + } if w.fallbackUpdater != nil { w.fallbackUpdater.Stop() } + w.isRunning = true return nil } - w.log.Debug("main updater start err, starting fallback. error: ", err) if w.fallbackUpdater == nil { // No fallback, main start failed is wrapper start fail + w.log.Error("main updater start err, no fallback. error: ", err) return err } + w.log.Debug("main updater start err, starting fallback. error: ", err) err = w.fallbackUpdater.Start(nil) if err != nil { w.log.Debug("fallback updater start failed. error: ", err) return err } + w.isRunning = true go func() { w.scheduleRetry() }() return nil } @@ -326,12 +340,16 @@ func (w *flagConfigFallbackRetryWrapper) Start(onError func(error)) error { func (w *flagConfigFallbackRetryWrapper) Stop() { w.lock.Lock() defer w.lock.Unlock() + w.isRunning = false if w.retryTimer != nil { w.retryTimer.Stop() w.retryTimer = nil } w.mainUpdater.Stop() + if w.fallbackStartRetryTimer != nil { + w.fallbackStartRetryTimer.Stop() + } if w.fallbackUpdater != nil { w.fallbackUpdater.Stop() } @@ -341,6 +359,10 @@ func (w *flagConfigFallbackRetryWrapper) scheduleRetry() { w.lock.Lock() defer w.lock.Unlock() + if (!w.isRunning) { + return + } + if w.retryTimer != nil { w.retryTimer.Stop() w.retryTimer = nil @@ -349,22 +371,26 @@ func (w *flagConfigFallbackRetryWrapper) scheduleRetry() { w.lock.Lock() defer w.lock.Unlock() + if (!w.isRunning) { + return + } + if w.retryTimer != nil { w.retryTimer = nil } w.log.Debug("main updater retry start") err := w.mainUpdater.Start(func(err error) { - w.log.Error("main updater updating err, starting fallback if available. error: ", err) + w.log.Debug("main updater updating err, starting fallback if available. error: ", err) go func() { w.scheduleRetry() }() // Don't care if poller start error or not, always retry. - if w.fallbackUpdater != nil { - //nolint:errcheck - w.fallbackUpdater.Start(nil) // Don't care if fallback start success or fail. - } + go func() { w.fallbackStart() }() }) if err == nil { // Main start success, stop fallback. w.log.Debug("main updater retry start success") + if w.fallbackStartRetryTimer != nil { + w.fallbackStartRetryTimer.Stop() + } if w.fallbackUpdater != nil { w.fallbackUpdater.Stop() } @@ -374,3 +400,23 @@ func (w *flagConfigFallbackRetryWrapper) scheduleRetry() { go func() { w.scheduleRetry() }() }) } + +func (w *flagConfigFallbackRetryWrapper) fallbackStart() { + w.lock.Lock() + defer w.lock.Unlock() + + if (!w.isRunning) { + return + } + if (w.fallbackUpdater == nil) { + return + } + + err := w.fallbackUpdater.Start(nil) + if (err != nil) { + w.log.Debug("fallback updater start failed and scheduling retry") + w.fallbackStartRetryTimer = time.AfterFunc(randTimeDuration(w.fallbackStartRetryDelay, w.fallbackStartRetryMaxJitter), func() { + w.fallbackStart() + }) + } +} diff --git a/pkg/experiment/local/flag_config_updater_test.go b/pkg/experiment/local/flag_config_updater_test.go index 6d0fac4..769f88a 100644 --- a/pkg/experiment/local/flag_config_updater_test.go +++ b/pkg/experiment/local/flag_config_updater_test.go @@ -274,7 +274,7 @@ func TestFlagConfigFallbackRetryWrapper(t *testing.T) { } fallback.stopFunc = func() { } - w := newflagConfigFallbackRetryWrapper(&main, &fallback, 1*time.Second, 0, true) + w := newflagConfigFallbackRetryWrapper(&main, &fallback, 1*time.Second, 0, 1*time.Second, 0, true) err := w.Start(nil) assert.Nil(t, err) assert.NotNil(t, mainOnError) @@ -299,7 +299,7 @@ func TestFlagConfigFallbackRetryWrapperBothStartFail(t *testing.T) { } fallback.stopFunc = func() { } - w := newflagConfigFallbackRetryWrapper(&main, &fallback, 1*time.Second, 0, true) + w := newflagConfigFallbackRetryWrapper(&main, &fallback, 1*time.Second, 0, 1*time.Second, 0, true) err := w.Start(nil) assert.Equal(t, errors.New("fallback start error"), err) assert.NotNil(t, mainOnError) @@ -328,7 +328,7 @@ func TestFlagConfigFallbackRetryWrapperMainStartFailFallbackSuccess(t *testing.T fallback.stopFunc = func() { go func() { fallbackStopCh <- true }() } - w := newflagConfigFallbackRetryWrapper(&main, &fallback, 1*time.Second, 0, true) + w := newflagConfigFallbackRetryWrapper(&main, &fallback, 1*time.Second, 0, 1*time.Second, 0, true) err := w.Start(nil) assert.Nil(t, err) assert.NotNil(t, mainOnError) @@ -374,7 +374,7 @@ func TestFlagConfigFallbackRetryWrapperMainUpdatingFail(t *testing.T) { return nil } fallback.stopFunc = func() {} - w := newflagConfigFallbackRetryWrapper(&main, &fallback, 1*time.Second, 0, true) + w := newflagConfigFallbackRetryWrapper(&main, &fallback, 1*time.Second, 0, 1*time.Second, 0, true) // Start success err := w.Start(nil) assert.Nil(t, err) @@ -432,6 +432,94 @@ func TestFlagConfigFallbackRetryWrapperMainUpdatingFail(t *testing.T) { } +func TestFlagConfigFallbackRetryWrapperMainUpdatingFailFallbackStartFail(t *testing.T) { + main := mockFlagConfigUpdater{} + var mainOnError func(error) + main.startFunc = func(onError func(error)) error { + mainOnError = onError + return nil + } + main.stopFunc = func() { + mainOnError = nil + } + fallback := mockFlagConfigUpdater{} + fallbackStartCh := make(chan bool) + fallbackStopCh := make(chan bool) + fallback.startFunc = func(onError func(error)) error { + println(1) + go func() { fallbackStartCh <- true }() + return errors.New("fallback start fail") + } + fallback.stopFunc = func() {} + w := newflagConfigFallbackRetryWrapper(&main, &fallback, 1100 * time.Millisecond, 0, 500 * time.Millisecond, 0, true) + // Start success + err := w.Start(nil) + assert.Nil(t, err) + assert.NotNil(t, mainOnError) + select { + case <-fallbackStartCh: + assert.Fail(t, "Unexpected fallback started") + default: + } + + // Test main updating failed, fallback. + mainStartCh := make(chan bool) + main.startFunc = func(onError func(error)) error { + go func() { mainStartCh <- true }() + return errors.New("main start fail") + } + fallback.stopFunc = func() { // Start tracking fallback stops (Start() may call stops). + go func() { fallbackStopCh <- true }() + } + mainOnError(errors.New("main updating error")) + mainOnError = nil + <-fallbackStartCh // Fallbacks start tried. + <-fallbackStartCh // Fallbacks start retry once. + select { + case <-mainStartCh: + assert.Fail(t, "Unexpected fallback stopped") + default: + } + <-fallbackStartCh // Fallbacks start retry second. + select { + case <-mainStartCh: + assert.Fail(t, "Unexpected fallback stopped") + default: + } + // Main start failed again on retry. + <-mainStartCh + // Make next start success. + main.startFunc = func(onError func(error)) error { + go func() { mainStartCh <- true }() + return nil + } + // Fallback start continue to retry. + <-fallbackStartCh // Fallbacks start retry third. + select { + case <-mainStartCh: + assert.Fail(t, "Unexpected fallback stopped") + default: + } + <-fallbackStartCh // Fallbacks start retry fourth. + select { + case <-mainStartCh: + assert.Fail(t, "Unexpected fallback stopped") + default: + } + // Main start success. + <-mainStartCh + + // No more fallback start. + time.Sleep(4100 * time.Millisecond) + select { + case <-fallbackStartCh: + assert.Fail(t, "Unexpected fallback start") + default: + } + + w.Stop() +} + func TestFlagConfigFallbackRetryWrapperMainOnly(t *testing.T) { main := mockFlagConfigUpdater{} var mainOnError func(error) @@ -442,7 +530,7 @@ func TestFlagConfigFallbackRetryWrapperMainOnly(t *testing.T) { main.stopFunc = func() { mainOnError = nil } - w := newflagConfigFallbackRetryWrapper(&main, nil, 1*time.Second, 0, true) + w := newflagConfigFallbackRetryWrapper(&main, nil, 1*time.Second, 0, 1*time.Second, 0, true) err := w.Start(nil) assert.Nil(t, err) assert.NotNil(t, mainOnError)