diff --git a/cmd/kvisor/agent/agent.go b/cmd/kvisor/agent/agent.go index 1680ef9e..e33c4865 100644 --- a/cmd/kvisor/agent/agent.go +++ b/cmd/kvisor/agent/agent.go @@ -21,6 +21,7 @@ import ( "github.com/samber/lo" "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -345,31 +346,6 @@ func run(ctx context.Context, logger logrus.FieldLogger, castaiClient castai.Cli blobsCache.RegisterHandlers(httpMux) } - if err := mngr.Add(manager.RunnableFunc(func(ctx context.Context) error { - // Start http server for scan job, metrics and pprof handlers. - httpAddr := fmt.Sprintf(":%d", cfg.HTTPPort) - log.Infof("starting http server on %s", httpAddr) - - srv := &http.Server{ - Addr: httpAddr, - Handler: httpMux, - WriteTimeout: 10 * time.Second, - ReadTimeout: 10 * time.Second, - } - go func() { - <-ctx.Done() - log.Info("stopping http server") - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - if err := srv.Shutdown(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) { - log.Warnf("http server shutdown: %v", err) - } - }() - return srv.ListenAndServe() - })); err != nil { - return fmt.Errorf("add http server: %w", err) - } - if err := mngr.Add(telemetryManager); err != nil { return fmt.Errorf("add telemetry manager: %w", err) } @@ -387,7 +363,38 @@ func run(ctx context.Context, logger logrus.FieldLogger, castaiClient castai.Cli if err := mngr.Add(kubeCtrl); err != nil { return fmt.Errorf("add kube controller: %w", err) } - return mngr.Start(ctx) + + errg, ctx := errgroup.WithContext(ctx) + errg.Go(func() error { + return runHTTPServer(ctx, log, httpMux, cfg) + }) + errg.Go(func() error { + return mngr.Start(ctx) + }) + return errg.Wait() +} + +func runHTTPServer(ctx context.Context, log *logrus.Entry, httpMux *http.ServeMux, cfg config.Config) error { + // Start http server for scan job, metrics and pprof handlers. + httpAddr := fmt.Sprintf(":%d", cfg.HTTPPort) + log.Infof("starting http server on %s", httpAddr) + + srv := &http.Server{ + Addr: httpAddr, + Handler: httpMux, + WriteTimeout: 10 * time.Second, + ReadTimeout: 10 * time.Second, + } + go func() { + <-ctx.Done() + log.Info("stopping http server") + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := srv.Shutdown(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Warnf("http server shutdown: %v", err) + } + }() + return srv.ListenAndServe() } func retrieveKubeConfig(log logrus.FieldLogger, kubepath string) (*rest.Config, error) {