Skip to content

Commit

Permalink
fix retry fallback start fail
Browse files Browse the repository at this point in the history
  • Loading branch information
zhukaihan committed Nov 4, 2024
1 parent 72d1fd1 commit 9cf168e
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 22 deletions.
6 changes: 3 additions & 3 deletions pkg/experiment/local/deployment_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type deploymentRunner struct {
}

const streamUpdaterRetryDelay = 15 * time.Second
const updaterRetryMaxJitter = 2 * time.Second
const updaterRetryMaxJitter = 1 * time.Second

func newDeploymentRunner(
config *Config,
Expand All @@ -28,9 +28,9 @@ func newDeploymentRunner(
cohortStorage cohortStorage,
cohortLoader *cohortLoader,
) *deploymentRunner {
flagConfigUpdater := newflagConfigFallbackRetryWrapper(newFlagConfigPoller(flagConfigApi, config, flagConfigStorage, cohortStorage, cohortLoader), nil, config.FlagConfigPollerInterval, updaterRetryMaxJitter, config.Debug)
flagConfigUpdater := newflagConfigFallbackRetryWrapper(newFlagConfigPoller(flagConfigApi, config, flagConfigStorage, cohortStorage, cohortLoader), nil, config.FlagConfigPollerInterval, updaterRetryMaxJitter, 0, 0, config.Debug)
if flagConfigStreamApi != nil {
flagConfigUpdater = newflagConfigFallbackRetryWrapper(newFlagConfigStreamer(flagConfigStreamApi, config, flagConfigStorage, cohortStorage, cohortLoader), flagConfigUpdater, streamUpdaterRetryDelay, updaterRetryMaxJitter, config.Debug)
flagConfigUpdater = newflagConfigFallbackRetryWrapper(newFlagConfigStreamer(flagConfigStreamApi, config, flagConfigStorage, cohortStorage, cohortLoader), flagConfigUpdater, streamUpdaterRetryDelay, updaterRetryMaxJitter, config.FlagConfigPollerInterval, 0, config.Debug)
}
dr := &deploymentRunner{
config: config,
Expand Down
74 changes: 60 additions & 14 deletions pkg/experiment/local/flag_config_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (s *flagConfigStreamer) Start(onError func(error)) error {
func(err error) {
s.Stop()
if onError != nil {
onError(err)
go func() {onError(err)}()
}
},
)
Expand Down Expand Up @@ -205,7 +205,9 @@ func (p *flagConfigPoller) Start(onError func(error)) error {
if err := p.periodicRefresh(); err != nil {
p.log.Error("Periodic updateFlagConfigs failed: %v", err)
p.Stop()
onError(err)
if (onError != nil) {
go func() {onError(err)}()
}
}
})
return nil
Expand Down Expand Up @@ -253,14 +255,20 @@ type flagConfigFallbackRetryWrapper struct {
retryDelay time.Duration
maxJitter time.Duration
retryTimer *time.Timer
fallbackStartRetryDelay time.Duration
fallbackStartRetryMaxJitter time.Duration
fallbackStartRetryTimer *time.Timer
lock sync.Mutex
isRunning bool
}

func newflagConfigFallbackRetryWrapper(
mainUpdater flagConfigUpdater,
fallbackUpdater flagConfigUpdater,
retryDelay time.Duration,
maxJitter time.Duration,
fallbackStartRetryDelay time.Duration,
fallbackStartRetryMaxJitter time.Duration,
debug bool,
) flagConfigUpdater {
return &flagConfigFallbackRetryWrapper{
Expand All @@ -269,6 +277,9 @@ func newflagConfigFallbackRetryWrapper(
fallbackUpdater: fallbackUpdater,
retryDelay: retryDelay,
maxJitter: maxJitter,
fallbackStartRetryDelay: fallbackStartRetryDelay,
fallbackStartRetryMaxJitter: fallbackStartRetryMaxJitter,
isRunning: false,
}
}

Expand All @@ -282,7 +293,7 @@ func newflagConfigFallbackRetryWrapper(
// Thus, onError will never be called.
func (w *flagConfigFallbackRetryWrapper) Start(onError func(error)) error {
// if (mainUpdater is flagConfigFallbackRetryWrapper) {
// throw Error("Do not use flagConfigFallbackRetryWrapper as main updater. Fallback updater will never be used. Rewrite retry and fallback logic.")
// return errors.New("Do not use flagConfigFallbackRetryWrapper as main updater. Fallback updater will never be used. Rewrite retry and fallback logic.")
// }

w.lock.Lock()
Expand All @@ -294,44 +305,51 @@ func (w *flagConfigFallbackRetryWrapper) Start(onError func(error)) error {
}

err := w.mainUpdater.Start(func(err error) {
w.log.Error("main updater updating err, starting fallback if available. error: ", err)
w.log.Debug("main updater updating err, starting fallback if available. error: ", err)
go func() { w.scheduleRetry() }() // Don't care if poller start error or not, always retry.
if w.fallbackUpdater != nil {
//nolint:errcheck
w.fallbackUpdater.Start(nil) // Don't care if fallback start success or fail.
}
go func() { w.fallbackStart() }()
})
if err == nil {
// Main start success, stop fallback.
if w.fallbackStartRetryTimer != nil {
w.fallbackStartRetryTimer.Stop()
}
if w.fallbackUpdater != nil {
w.fallbackUpdater.Stop()
}
w.isRunning = true
return nil
}
w.log.Debug("main updater start err, starting fallback. error: ", err)
if w.fallbackUpdater == nil {
// No fallback, main start failed is wrapper start fail
w.log.Error("main updater start err, no fallback. error: ", err)
return err
}
w.log.Debug("main updater start err, starting fallback. error: ", err)
err = w.fallbackUpdater.Start(nil)
if err != nil {
w.log.Debug("fallback updater start failed. error: ", err)
return err
}

w.isRunning = true
go func() { w.scheduleRetry() }()
return nil
}

func (w *flagConfigFallbackRetryWrapper) Stop() {
w.lock.Lock()
defer w.lock.Unlock()
w.isRunning = false

if w.retryTimer != nil {
w.retryTimer.Stop()
w.retryTimer = nil
}
w.mainUpdater.Stop()
if w.fallbackStartRetryTimer != nil {
w.fallbackStartRetryTimer.Stop()
}
if w.fallbackUpdater != nil {
w.fallbackUpdater.Stop()
}
Expand All @@ -341,6 +359,10 @@ func (w *flagConfigFallbackRetryWrapper) scheduleRetry() {
w.lock.Lock()
defer w.lock.Unlock()

if (!w.isRunning) {
return
}

if w.retryTimer != nil {
w.retryTimer.Stop()
w.retryTimer = nil
Expand All @@ -349,22 +371,26 @@ func (w *flagConfigFallbackRetryWrapper) scheduleRetry() {
w.lock.Lock()
defer w.lock.Unlock()

if (!w.isRunning) {
return
}

if w.retryTimer != nil {
w.retryTimer = nil
}

w.log.Debug("main updater retry start")
err := w.mainUpdater.Start(func(err error) {
w.log.Error("main updater updating err, starting fallback if available. error: ", err)
w.log.Debug("main updater updating err, starting fallback if available. error: ", err)
go func() { w.scheduleRetry() }() // Don't care if poller start error or not, always retry.
if w.fallbackUpdater != nil {
//nolint:errcheck
w.fallbackUpdater.Start(nil) // Don't care if fallback start success or fail.
}
go func() { w.fallbackStart() }()
})
if err == nil {
// Main start success, stop fallback.
w.log.Debug("main updater retry start success")
if w.fallbackStartRetryTimer != nil {
w.fallbackStartRetryTimer.Stop()
}
if w.fallbackUpdater != nil {
w.fallbackUpdater.Stop()
}
Expand All @@ -374,3 +400,23 @@ func (w *flagConfigFallbackRetryWrapper) scheduleRetry() {
go func() { w.scheduleRetry() }()
})
}

// fallbackStart tries to start the fallback updater and, when that start
// returns an error, schedules another attempt on fallbackStartRetryTimer.
//
// The retry delay comes from randTimeDuration(fallbackStartRetryDelay,
// fallbackStartRetryMaxJitter). The call is a no-op when the wrapper is not
// running or when no fallback updater is configured.
//
// w.lock is acquired for the whole call, and sync.Mutex is not reentrant, so
// callers must not already hold w.lock.
func (w *flagConfigFallbackRetryWrapper) fallbackStart() {
	w.lock.Lock()
	defer w.lock.Unlock()

	if !w.isRunning || w.fallbackUpdater == nil {
		return
	}

	// Drop any retry that is still pending before possibly scheduling a new
	// one. Otherwise overlapping calls would overwrite the field and leave an
	// orphan timer that can no longer be cancelled through
	// fallbackStartRetryTimer. Stopping a timer that has already fired (the
	// case when this call is the timer callback itself) is harmless.
	if w.fallbackStartRetryTimer != nil {
		w.fallbackStartRetryTimer.Stop()
		w.fallbackStartRetryTimer = nil
	}

	if err := w.fallbackUpdater.Start(nil); err != nil {
		w.log.Debug("fallback updater start failed and scheduling retry")
		w.fallbackStartRetryTimer = time.AfterFunc(
			randTimeDuration(w.fallbackStartRetryDelay, w.fallbackStartRetryMaxJitter),
			w.fallbackStart,
		)
	}
}
98 changes: 93 additions & 5 deletions pkg/experiment/local/flag_config_updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func TestFlagConfigFallbackRetryWrapper(t *testing.T) {
}
fallback.stopFunc = func() {
}
w := newflagConfigFallbackRetryWrapper(&main, &fallback, 1*time.Second, 0, true)
w := newflagConfigFallbackRetryWrapper(&main, &fallback, 1*time.Second, 0, 1*time.Second, 0, true)
err := w.Start(nil)
assert.Nil(t, err)
assert.NotNil(t, mainOnError)
Expand All @@ -299,7 +299,7 @@ func TestFlagConfigFallbackRetryWrapperBothStartFail(t *testing.T) {
}
fallback.stopFunc = func() {
}
w := newflagConfigFallbackRetryWrapper(&main, &fallback, 1*time.Second, 0, true)
w := newflagConfigFallbackRetryWrapper(&main, &fallback, 1*time.Second, 0, 1*time.Second, 0, true)
err := w.Start(nil)
assert.Equal(t, errors.New("fallback start error"), err)
assert.NotNil(t, mainOnError)
Expand Down Expand Up @@ -328,7 +328,7 @@ func TestFlagConfigFallbackRetryWrapperMainStartFailFallbackSuccess(t *testing.T
fallback.stopFunc = func() {
go func() { fallbackStopCh <- true }()
}
w := newflagConfigFallbackRetryWrapper(&main, &fallback, 1*time.Second, 0, true)
w := newflagConfigFallbackRetryWrapper(&main, &fallback, 1*time.Second, 0, 1*time.Second, 0, true)
err := w.Start(nil)
assert.Nil(t, err)
assert.NotNil(t, mainOnError)
Expand Down Expand Up @@ -374,7 +374,7 @@ func TestFlagConfigFallbackRetryWrapperMainUpdatingFail(t *testing.T) {
return nil
}
fallback.stopFunc = func() {}
w := newflagConfigFallbackRetryWrapper(&main, &fallback, 1*time.Second, 0, true)
w := newflagConfigFallbackRetryWrapper(&main, &fallback, 1*time.Second, 0, 1*time.Second, 0, true)
// Start success
err := w.Start(nil)
assert.Nil(t, err)
Expand Down Expand Up @@ -432,6 +432,94 @@ func TestFlagConfigFallbackRetryWrapperMainUpdatingFail(t *testing.T) {

}

// TestFlagConfigFallbackRetryWrapperMainUpdatingFailFallbackStartFail checks
// that when the main updater reports an error while running and the fallback
// updater cannot be started, the wrapper keeps retrying the fallback start
// until a main updater retry succeeds, and stops retrying afterwards.
//
// The wrapper is built with a 1100ms main retry delay and a 500ms fallback
// start retry delay (no jitter), so two fallback start retries are expected
// between consecutive main retries.
func TestFlagConfigFallbackRetryWrapperMainUpdatingFailFallbackStartFail(t *testing.T) {
	main := mockFlagConfigUpdater{}
	var mainOnError func(error)
	main.startFunc = func(onError func(error)) error {
		mainOnError = onError
		return nil
	}
	main.stopFunc = func() {
		mainOnError = nil
	}
	fallback := mockFlagConfigUpdater{}
	fallbackStartCh := make(chan bool)
	// NOTE(review): nothing in this test receives from fallbackStopCh, so the
	// goroutines that send on it below stay blocked until the test binary exits.
	fallbackStopCh := make(chan bool)
	fallback.startFunc = func(onError func(error)) error {
		// Signal every start attempt; the fallback start always fails.
		go func() { fallbackStartCh <- true }()
		return errors.New("fallback start fail")
	}
	fallback.stopFunc = func() {}
	w := newflagConfigFallbackRetryWrapper(&main, &fallback, 1100*time.Millisecond, 0, 500*time.Millisecond, 0, true)
	// Start success
	err := w.Start(nil)
	assert.Nil(t, err)
	assert.NotNil(t, mainOnError)
	select {
	case <-fallbackStartCh:
		assert.Fail(t, "Unexpected fallback started")
	default:
	}

	// Test main updating failed, fallback.
	mainStartCh := make(chan bool)
	main.startFunc = func(onError func(error)) error {
		go func() { mainStartCh <- true }()
		return errors.New("main start fail")
	}
	fallback.stopFunc = func() { // Start tracking fallback stops (Start() may call stops).
		go func() { fallbackStopCh <- true }()
	}
	mainOnError(errors.New("main updating error"))
	mainOnError = nil
	<-fallbackStartCh // Fallbacks start tried.
	<-fallbackStartCh // Fallbacks start retry once.
	select {
	case <-mainStartCh:
		assert.Fail(t, "Unexpected main start retry")
	default:
	}
	<-fallbackStartCh // Fallbacks start retry second.
	select {
	case <-mainStartCh:
		assert.Fail(t, "Unexpected main start retry")
	default:
	}
	// Main start failed again on retry.
	<-mainStartCh
	// Make next start success.
	main.startFunc = func(onError func(error)) error {
		go func() { mainStartCh <- true }()
		return nil
	}
	// Fallback start continue to retry.
	<-fallbackStartCh // Fallbacks start retry third.
	select {
	case <-mainStartCh:
		assert.Fail(t, "Unexpected main start retry")
	default:
	}
	<-fallbackStartCh // Fallbacks start retry fourth.
	select {
	case <-mainStartCh:
		assert.Fail(t, "Unexpected main start retry")
	default:
	}
	// Main start success.
	<-mainStartCh

	// No more fallback start.
	time.Sleep(4100 * time.Millisecond)
	select {
	case <-fallbackStartCh:
		assert.Fail(t, "Unexpected fallback start")
	default:
	}

	w.Stop()
}

func TestFlagConfigFallbackRetryWrapperMainOnly(t *testing.T) {
main := mockFlagConfigUpdater{}
var mainOnError func(error)
Expand All @@ -442,7 +530,7 @@ func TestFlagConfigFallbackRetryWrapperMainOnly(t *testing.T) {
main.stopFunc = func() {
mainOnError = nil
}
w := newflagConfigFallbackRetryWrapper(&main, nil, 1*time.Second, 0, true)
w := newflagConfigFallbackRetryWrapper(&main, nil, 1*time.Second, 0, 1*time.Second, 0, true)
err := w.Start(nil)
assert.Nil(t, err)
assert.NotNil(t, mainOnError)
Expand Down

0 comments on commit 9cf168e

Please sign in to comment.