Skip to content

Commit

Permalink
fix: attempt to shut down when provider init fails (#2263)
Browse files Browse the repository at this point in the history
  • Loading branch information
acuteaura authored Nov 7, 2024
1 parent 2458b78 commit c3d6cc7
Showing 1 changed file with 29 additions and 31 deletions.
60 changes: 29 additions & 31 deletions pkg/providers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package providers

import (
"context"
"errors"
"fmt"
"os"
"sync"
Expand Down Expand Up @@ -183,7 +184,18 @@ func (c *Controller) Run(ctx context.Context) error {
return err
}

c.run(rootCtx)
err := c.run(rootCtx)
if err != nil {
log.Errorf("provider run returned error, exiting process: %s", err)

// attempt to give up leader status, should also release the waitgroup and exit the process
rootCancel()
go func() {
time.Sleep(time.Second * 5)
log.Errorf("process has not quit 5s after provider failure, forcing exit: %s", err)
os.Exit(1)
}()
}

wg.Wait()
return nil
Expand Down Expand Up @@ -382,7 +394,7 @@ func (c *Controller) initSharedInformers() *providertypes.ListerInformer {
return listerInformer
}

func (c *Controller) run(ctx context.Context) {
func (c *Controller) run(ctx context.Context) error {
log.Infow("controller tries to leading ...",
zap.String("namespace", c.namespace),
zap.String("pod", c.name),
Expand All @@ -406,29 +418,22 @@ func (c *Controller) run(ctx context.Context) {
CacheSynced: !c.cfg.EtcdServer.Enabled,
SSLKeyEncryptSalt: c.cfg.EtcdServer.SSLKeyEncryptSalt,
}

// TODO: needs retry logic
err := c.apisix.AddCluster(ctx, clusterOpts)
if err != nil && err != apisix.ErrDuplicatedCluster {
// TODO give up the leader role
log.Errorf("failed to add default cluster: %s", err)
return
return err
}

if err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).HasSynced(ctx); err != nil {
// TODO give up the leader role
log.Errorf("failed to wait the default cluster to be ready: %s", err)

// re-create apisix cluster, used in next c.run
if err = c.apisix.UpdateCluster(ctx, clusterOpts); err != nil {
log.Errorf("failed to update default cluster: %s", err)
return
}
return
return err
}

// Creation Phase

log.Info("creating controller")

c.informers = c.initSharedInformers()
common := &providertypes.Common{
ControllerNamespace: c.namespace,
Expand All @@ -443,14 +448,12 @@ func (c *Controller) run(ctx context.Context) {

c.namespaceProvider, err = namespace.NewWatchingNamespaceProvider(ctx, c.kubeClient, c.cfg, c.resourceSyncCh)
if err != nil {
ctx.Done()
return
return err
}

c.podProvider, err = pod.NewProvider(common, c.namespaceProvider)
if err != nil {
ctx.Done()
return
return err
}

c.translator = translation.NewTranslator(&translation.TranslatorOptions{
Expand All @@ -466,20 +469,17 @@ func (c *Controller) run(ctx context.Context) {

c.apisixProvider, c.apisixTranslator, err = apisixprovider.NewProvider(common, c.namespaceProvider, c.translator)
if err != nil {
ctx.Done()
return
return err
}

c.ingressProvider, err = ingressprovider.NewProvider(common, c.namespaceProvider, c.translator, c.apisixTranslator)
if err != nil {
ctx.Done()
return
return err
}

c.kubeProvider, err = k8s.NewProvider(common, c.translator, c.namespaceProvider, c.apisixProvider, c.ingressProvider)
if err != nil {
ctx.Done()
return
return err
}

if c.cfg.Kubernetes.EnableGatewayAPI {
Expand All @@ -495,8 +495,7 @@ func (c *Controller) run(ctx context.Context) {
ListerInformer: common.ListerInformer,
})
if err != nil {
ctx.Done()
return
return err
}
}

Expand All @@ -505,25 +504,22 @@ func (c *Controller) run(ctx context.Context) {
log.Info("init namespaces")

if err = c.namespaceProvider.Init(ctx); err != nil {
ctx.Done()
return
return err
}

log.Info("wait for resource sync")

// Wait for resource sync
if ok := c.informers.StartAndWaitForCacheSync(ctx); !ok {
ctx.Done()
return
return errors.New("StartAndWaitForCacheSync failed")
}

log.Info("init providers")

// Compare resource
if !c.cfg.EtcdServer.Enabled {
if err = c.apisixProvider.Init(ctx); err != nil {
ctx.Done()
return
return err
}
}

Expand Down Expand Up @@ -573,6 +569,8 @@ func (c *Controller) run(ctx context.Context) {
log.Error("Start failed, abort...")
cancelFunc()
}

return nil
}

func (c *Controller) checkClusterHealth(ctx context.Context, cancelFunc context.CancelFunc) {
Expand Down

0 comments on commit c3d6cc7

Please sign in to comment.