Skip to content

Commit

Permalink
Restart applier manager watch loop on errors
Browse files Browse the repository at this point in the history
Exit the loop on error and restart it after a one-minute delay to allow
it to recover in a new run. Also replace the bespoke retry loop for
stacks with the Kubernetes client's wait package.

Signed-off-by: Tom Wieczorek <[email protected]>
  • Loading branch information
twz123 committed Oct 1, 2024
1 parent edb105c commit 404c6cf
Showing 1 changed file with 12 additions and 10 deletions.
22 changes: 12 additions & 10 deletions pkg/applier/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"github.com/k0sproject/k0s/pkg/constant"
kubeutil "github.com/k0sproject/k0s/pkg/kubernetes"

"k8s.io/apimachinery/pkg/util/wait"

"github.com/fsnotify/fsnotify"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -73,7 +75,7 @@ func (m *Manager) Init(ctx context.Context) error {
m.stop = func(reason string) { cancel(errors.New(reason)); <-stopped }
go func() {
defer close(stopped)
m.runWatchers(ctx)
wait.UntilWithContext(ctx, m.runWatchers, 1*time.Minute)
}()
})
m.LeaderElector.AddLostLeaseCallback(func() {
Expand Down Expand Up @@ -113,8 +115,11 @@ func (m *Manager) runWatchers(ctx context.Context) {
err = watcher.Add(m.bundleDir)
if err != nil {
m.log.WithError(err).Error("Failed to watch bundle directory")
return
}

m.log.Info("Starting watch loop")

// Add all directories after the bundle dir has been added to the watcher.
// Doing it the other way round introduces a race condition when directories
// get created after the initial listing but before the watch starts.
Expand All @@ -125,6 +130,7 @@ func (m *Manager) runWatchers(ctx context.Context) {
return
}

ctx, cancel := context.WithCancelCause(ctx)
stacks := make(map[string]stack, len(dirs))

for _, dir := range dirs {
Expand All @@ -135,6 +141,7 @@ func (m *Manager) runWatchers(ctx context.Context) {
select {
case err := <-watcher.Errors:
m.log.WithError(err).Error("Watch error")
cancel(err)

case event := <-watcher.Events:
switch event.Op {
Expand Down Expand Up @@ -172,20 +179,15 @@ func (m *Manager) createStack(ctx context.Context, stacks map[string]stack, name
go func() {
defer close(stopped)
log := m.log.WithField("stack", name)
for {

wait.UntilWithContext(ctx, func(ctx context.Context) {
log.Info("Running stack")
if err := stack.Run(ctx); err != nil {
log.WithError(err).Error("Failed to run stack")
}
}, 1*time.Minute)

select {
case <-time.After(10 * time.Second):
continue
case <-ctx.Done():
log.Infof("Stack done (%v)", context.Cause(ctx))
return
}
}
log.Infof("Stack done (%v)", context.Cause(ctx))
}()
}

Expand Down

0 comments on commit 404c6cf

Please sign in to comment.