From 786c01350a7e580c12f7e76b12447c570b642b3c Mon Sep 17 00:00:00 2001 From: Jeff Mendoza Date: Thu, 16 Jan 2025 13:20:24 -0800 Subject: [PATCH] Update shutdown for otel on cli commands Signed-off-by: Jeff Mendoza --- cmd/guaccollect/cmd/deps_dev.go | 10 ++++++---- cmd/guaccollect/cmd/github.go | 10 ++++++---- cmd/guaccollect/cmd/license.go | 10 ++++++---- cmd/guaccollect/cmd/osv.go | 10 ++++++---- cmd/guacgql/cmd/server.go | 12 ++++++++---- cmd/guacingest/cmd/ingest.go | 13 ++++++++----- cmd/guacone/cmd/deps_dev.go | 14 ++++++++------ cmd/guacone/cmd/eol.go | 10 ++++++---- cmd/guacone/cmd/files.go | 14 ++++++++------ cmd/guacone/cmd/github.go | 31 +++++++++++++++++-------------- cmd/guacone/cmd/license.go | 12 +++++++----- cmd/guacone/cmd/osv.go | 12 +++++++----- 12 files changed, 93 insertions(+), 65 deletions(-) diff --git a/cmd/guaccollect/cmd/deps_dev.go b/cmd/guaccollect/cmd/deps_dev.go index f54634edb6..6d3a357ea9 100644 --- a/cmd/guaccollect/cmd/deps_dev.go +++ b/cmd/guaccollect/cmd/deps_dev.go @@ -102,13 +102,16 @@ you have access to read and write to the respective blob store.`, os.Exit(1) } - var shutdown func(context.Context) error = func(context.Context) error { return nil } if opts.enableOtel { - var err error - shutdown, err = metrics.SetupOTelSDK(ctx) + shutdown, err := metrics.SetupOTelSDK(ctx) if err != nil { logger.Fatalf("Error setting up Otel: %v", err) } + defer func() { + if err := shutdown(ctx); err != nil { + logger.Errorf("Error on Otel shutdown: %v", err) + } + }() } // Register collector @@ -131,7 +134,6 @@ you have access to read and write to the respective blob store.`, } initializeNATsandCollector(ctx, opts.pubsubAddr, opts.blobAddr, opts.publishToQueue) - shutdown(ctx) }, } diff --git a/cmd/guaccollect/cmd/github.go b/cmd/guaccollect/cmd/github.go index 9dc57984e0..bf8ed7d9ec 100644 --- a/cmd/guaccollect/cmd/github.go +++ b/cmd/guaccollect/cmd/github.go @@ -113,13 +113,16 @@ you have access to read and write to the respective blob store.`, os.Exit(1) } - var shutdown func(context.Context) error = func(context.Context) error { return nil } if opts.enableOtel { - var err error - shutdown, err = metrics.SetupOTelSDK(ctx) + shutdown, err := metrics.SetupOTelSDK(ctx) if err != nil { logger.Fatalf("Error setting up Otel: %v", err) } + defer func() { + if err := shutdown(ctx); err != nil { + logger.Errorf("Error on Otel shutdown: %v", err) + } + }() } // GITHUB_TOKEN is the default token name @@ -167,7 +170,6 @@ you have access to read and write to the respective blob store.`, } initializeNATsandCollector(ctx, opts.pubsubAddr, opts.blobAddr, opts.publishToQueue) - shutdown(ctx) }, } diff --git a/cmd/guaccollect/cmd/license.go b/cmd/guaccollect/cmd/license.go index 920274cc3b..9a67c07358 100644 --- a/cmd/guaccollect/cmd/license.go +++ b/cmd/guaccollect/cmd/license.go @@ -104,13 +104,16 @@ you have access to read and write to the respective blob store.`, ctx := logging.WithLogger(context.Background()) logger := logging.FromContext(ctx) - var shutdown func(context.Context) error = func(context.Context) error { return nil } if opts.enableOtel { - var err error - shutdown, err = metrics.SetupOTelSDK(ctx) + shutdown, err := metrics.SetupOTelSDK(ctx) if err != nil { logger.Fatalf("Error setting up Otel: %v", err) } + defer func() { + if err := shutdown(ctx); err != nil { + logger.Errorf("Error on Otel shutdown: %v", err) + } + }() } if err := certify.RegisterCertifier(clearlydefined.NewClearlyDefinedCertifier, certifier.CertifierClearlyDefined); err != nil { @@ -128,7 +131,6 @@ you have access to read and write to the respective blob store.`, } initializeNATsandCertifier(ctx, opts.blobAddr, opts.pubsubAddr, opts.poll, opts.publishToQueue, opts.interval, packageQueryFunc()) - shutdown(ctx) }, } diff --git a/cmd/guaccollect/cmd/osv.go b/cmd/guaccollect/cmd/osv.go index 06d3a2dd31..9fadfc0a7f 100644 --- a/cmd/guaccollect/cmd/osv.go +++ b/cmd/guaccollect/cmd/osv.go @@ -115,13 +115,16 @@ you have access to read and write to the respective blob store.`, ctx := logging.WithLogger(context.Background()) logger := logging.FromContext(ctx) - var shutdown func(context.Context) error = func(context.Context) error { return nil } if opts.enableOtel { - var err error - shutdown, err = metrics.SetupOTelSDK(ctx) + shutdown, err := metrics.SetupOTelSDK(ctx) if err != nil { logger.Fatalf("Error setting up Otel: %v", err) } + defer func() { + if err := shutdown(ctx); err != nil { + logger.Errorf("Error on Otel shutdown: %v", err) + } + }() } if err := certify.RegisterCertifier(func() certifier.Certifier { @@ -145,7 +148,6 @@ you have access to read and write to the respective blob store.`, } initializeNATsandCertifier(ctx, opts.blobAddr, opts.pubsubAddr, opts.poll, opts.publishToQueue, opts.interval, packageQueryFunc()) - shutdown(ctx) }, } diff --git a/cmd/guacgql/cmd/server.go b/cmd/guacgql/cmd/server.go index 19b9a531e6..48518e3c50 100644 --- a/cmd/guacgql/cmd/server.go +++ b/cmd/guacgql/cmd/server.go @@ -81,14 +81,19 @@ func startServer(cmd *cobra.Command) { srvHandler = srv } - var shutdown func(context.Context) error = func(context.Context) error { return nil } if flags.enableOtel { - var err error - shutdown, err = metrics.SetupOTelSDK(ctx) + shutdown, err := metrics.SetupOTelSDK(ctx) if err != nil { logger.Fatalf("Error setting up Otel: %v", err) } + srvHandler = otelhttp.NewHandler(srvHandler, "/") + + defer func() { + if err := shutdown(ctx); err != nil { + logger.Errorf("Error on Otel shutdown: %v", err) + } + }() } if flags.tracegql { @@ -126,7 +131,6 @@ func startServer(cmd *cobra.Command) { ctx, cf := context.WithCancel(ctx) go func() { _ = server.Shutdown(ctx) - _ = shutdown(ctx) done <- true }() select { diff --git a/cmd/guacingest/cmd/ingest.go b/cmd/guacingest/cmd/ingest.go index 18db0d437b..bff6120905 100644 --- a/cmd/guacingest/cmd/ingest.go +++ b/cmd/guacingest/cmd/ingest.go @@ -73,17 +73,20 @@ func ingest(cmd *cobra.Command, args []string) { os.Exit(1) } - ctx, cf := context.WithCancel(logging.WithLogger(context.Background())) + ctx := logging.WithLogger(context.Background()) logger := logging.FromContext(ctx) transport := cli.HTTPHeaderTransport(ctx, opts.headerFile, http.DefaultTransport) - var shutdown func(context.Context) error = func(context.Context) error { return nil } if opts.enableOtel { - var err error - shutdown, err = metrics.SetupOTelSDK(ctx) + shutdown, err := metrics.SetupOTelSDK(ctx) if err != nil { logger.Fatalf("Error setting up Otel: %v", err) } + defer func() { + if err := shutdown(ctx); err != nil { + logger.Errorf("Error on Otel shutdown: %v", err) + } + }() } if strings.HasPrefix(opts.pubsubAddr, "nats://") { @@ -113,6 +116,7 @@ func ingest(cmd *cobra.Command, args []string) { } defer csubClient.Close() + ctx, cf := context.WithCancel(ctx) emit := func(d *processor.Document) error { if _, err := ingestor.Ingest( ctx, @@ -150,7 +154,6 @@ func ingest(cmd *cobra.Command, args []string) { s := <-sigs logger.Infof("Signal received: %s, shutting down gracefully\n", s.String()) cf() - shutdown(ctx) wg.Wait() } diff --git a/cmd/guacone/cmd/deps_dev.go b/cmd/guacone/cmd/deps_dev.go index 054df42fe3..c96052f3ba 100644 --- a/cmd/guacone/cmd/deps_dev.go +++ b/cmd/guacone/cmd/deps_dev.go @@ -74,13 +74,16 @@ var depsDevCmd = &cobra.Command{ logger := logging.FromContext(ctx) transport := cli.HTTPHeaderTransport(ctx, opts.headerFile, http.DefaultTransport) - var shutdown func(context.Context) error = func(context.Context) error { return nil } if opts.enableOtel { - var err error - shutdown, err = metrics.SetupOTelSDK(ctx) + shutdown, err := metrics.SetupOTelSDK(ctx) if err != nil { logger.Fatalf("Error setting up Otel: %v", err) } + defer func() { + if err := shutdown(ctx); err != nil { + logger.Errorf("Error on Otel shutdown: %v", err) + } + }() } // Register collector @@ -137,7 +140,7 @@ var depsDevCmd = &cobra.Command{ go func() { defer wg.Done() if err := collector.Collect(ctx, emit, errHandler); err != nil { - logger.Fatal(err) + logger.Errorf("collector exited with error: %v", err) } done <- true }() @@ -151,11 +154,10 @@ var depsDevCmd = &cobra.Command{ logger.Infof("Deps dev collector completed") } cf() - shutdown(ctx) wg.Wait() if gotErr { - logger.Fatalf("completed ingestion with error, %v of %v were successful", totalSuccess, totalNum) + logger.Errorf("completed ingestion with error, %v of %v were successful", totalSuccess, totalNum) } else { logger.Infof("completed ingesting %v documents of %v", totalSuccess, totalNum) } diff --git a/cmd/guacone/cmd/eol.go b/cmd/guacone/cmd/eol.go index 896305bcd5..d2cddb380d 100644 --- a/cmd/guacone/cmd/eol.go +++ b/cmd/guacone/cmd/eol.go @@ -84,13 +84,16 @@ var eolCmd = &cobra.Command{ logger := logging.FromContext(ctx) transport := cli.HTTPHeaderTransport(ctx, opts.headerFile, http.DefaultTransport) - var shutdown func(context.Context) error = func(context.Context) error { return nil } if opts.enableOtel { - var err error - shutdown, err = metrics.SetupOTelSDK(ctx) + shutdown, err := metrics.SetupOTelSDK(ctx) if err != nil { logger.Fatalf("Error setting up Otel: %v", err) } + defer func() { + if err := shutdown(ctx); err != nil { + logger.Errorf("Error on Otel shutdown: %v", err) + } + }() } if err := certify.RegisterCertifier(eol.NewEOLCertifier, eol.EOLCollector); err != nil { @@ -214,7 +217,6 @@ var eolCmd = &cobra.Command{ logger.Infof("All certifiers completed") } ingestionStop <- true - shutdown(ctx) wg.Wait() cf() diff --git a/cmd/guacone/cmd/files.go b/cmd/guacone/cmd/files.go index 12a4bd146f..0675bd67e4 100644 --- a/cmd/guacone/cmd/files.go +++ b/cmd/guacone/cmd/files.go @@ -87,13 +87,16 @@ var filesCmd = &cobra.Command{ logger := logging.FromContext(ctx) transport := cli.HTTPHeaderTransport(ctx, opts.headerFile, http.DefaultTransport) - var shutdown func(context.Context) error = func(context.Context) error { return nil } if opts.enableOtel { - var err error - shutdown, err = metrics.SetupOTelSDK(ctx) + shutdown, err := metrics.SetupOTelSDK(ctx) if err != nil { logger.Fatalf("Error setting up Otel: %v", err) } + defer func() { + if err := shutdown(ctx); err != nil { + logger.Errorf("Error on Otel shutdown: %v", err) + } + }() } // Register Keystore @@ -175,16 +178,15 @@ var filesCmd = &cobra.Command{ } if err := collector.Collect(ctx, emit, errHandler); err != nil { - logger.Fatal(err) + logger.Errorf("collector exited with error: %v", err) } if gotErr { - logger.Fatalf("completed ingestion with error, %v of %v were successful - the following files did not ingest successfully: %v", + logger.Errorf("completed ingestion with error, %v of %v were successful - the following files did not ingest successfully: %v", totalSuccess, totalNum, strings.Join(filesWithErrors, " ")) } else { logger.Infof("completed ingesting %v documents of %v", totalSuccess, totalNum) } - shutdown(ctx) }, } diff --git a/cmd/guacone/cmd/github.go b/cmd/guacone/cmd/github.go index 5b48efeb5f..c917f8b0d1 100644 --- a/cmd/guacone/cmd/github.go +++ b/cmd/guacone/cmd/github.go @@ -109,19 +109,22 @@ var githubCmd = &cobra.Command{ logger := logging.FromContext(ctx) transport := cli.HTTPHeaderTransport(ctx, opts.headerFile, http.DefaultTransport) - var shutdown func(context.Context) error = func(context.Context) error { return nil } if opts.enableOtel { - var err error - shutdown, err = metrics.SetupOTelSDK(ctx) + shutdown, err := metrics.SetupOTelSDK(ctx) if err != nil { logger.Fatalf("Error setting up Otel: %v", err) } + defer func() { + if err := shutdown(ctx); err != nil { + logger.Errorf("Error on Otel shutdown: %v", err) + } + }() } // GITHUB_TOKEN is the default token name ghc, err := githubclient.NewGithubClient(ctx, os.Getenv("GITHUB_TOKEN")) if err != nil { - logger.Errorf("unable to create github client: %v", err) + logger.Fatalf("unable to create github client: %v", err) } // Register collector @@ -141,11 +144,11 @@ var githubCmd = &cobra.Command{ } if opts.ownerRepoName != "" { if !strings.Contains(opts.ownerRepoName, "/") { - logger.Errorf("owner-repo flag must be in the format /") + logger.Fatalf("owner-repo flag must be in the format /") } else { ownerRepoName := strings.Split(opts.ownerRepoName, "/") if len(ownerRepoName) != 2 { - logger.Errorf("owner-repo flag must be in the format /") + logger.Fatalf("owner-repo flag must be in the format /") } collectorOpts = append(collectorOpts, github.WithOwner(ownerRepoName[0])) collectorOpts = append(collectorOpts, github.WithRepo(ownerRepoName[1])) @@ -153,11 +156,11 @@ var githubCmd = &cobra.Command{ } githubCollector, err := github.NewGithubCollector(collectorOpts...) if err != nil { - logger.Errorf("unable to create Github collector: %v", err) + logger.Fatalf("unable to create Github collector: %v", err) } err = collector.RegisterDocumentCollector(githubCollector, github.GithubCollector) if err != nil { - logger.Errorf("unable to register Github collector: %v", err) + logger.Fatalf("unable to register Github collector: %v", err) } csubClient, err := csub_client.NewClient(opts.csubClientOptions) @@ -170,6 +173,9 @@ var githubCmd = &cobra.Command{ var errFound bool + ctx, cancel := context.WithCancel(ctx) + defer cancel() + emit := func(d *processor.Document) error { _, err := ingestor.Ingest( ctx, @@ -194,13 +200,11 @@ var githubCmd = &cobra.Command{ logger.Info("collector ended gracefully") return true } + errFound = true logger.Errorf("collector ended with error: %v", err) return false } - ctx, cancel := context.WithCancel(ctx) - defer cancel() - sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) @@ -210,7 +214,7 @@ var githubCmd = &cobra.Command{ go func() { defer wg.Done() if err := collector.Collect(ctx, emit, errHandler); err != nil { - logger.Fatal(err) + logger.Errorf("collector exited with error: %v", err) } }() @@ -222,12 +226,11 @@ var githubCmd = &cobra.Command{ logger.Info("Collector finished") } - shutdown(ctx) wg.Wait() logger.Info("Shutdown complete") if errFound { - logger.Fatalf("completed ingestion with error") + logger.Errorf("completed ingestion with error") } else { logger.Infof("completed ingestion") } diff --git a/cmd/guacone/cmd/license.go b/cmd/guacone/cmd/license.go index d16c046f8b..c803c32a01 100644 --- a/cmd/guacone/cmd/license.go +++ b/cmd/guacone/cmd/license.go @@ -97,13 +97,16 @@ var cdCmd = &cobra.Command{ logger := logging.FromContext(ctx) transport := cli.HTTPHeaderTransport(ctx, opts.headerFile, http.DefaultTransport) - var shutdown func(context.Context) error = func(context.Context) error { return nil } if opts.enableOtel { - var err error - shutdown, err = metrics.SetupOTelSDK(ctx) + shutdown, err := metrics.SetupOTelSDK(ctx) if err != nil { logger.Fatalf("Error setting up Otel: %v", err) } + defer func() { + if err := shutdown(ctx); err != nil { + logger.Errorf("Error on Otel shutdown: %v", err) + } + }() } if err := certify.RegisterCertifier(clearlydefined.NewClearlyDefinedCertifier, certifier.CertifierClearlyDefined); err != nil { @@ -128,6 +131,7 @@ var cdCmd = &cobra.Command{ ingestionStop := make(chan bool, 1) tickInterval := 30 * time.Second ticker := time.NewTicker(tickInterval) + ctx, cf := context.WithCancel(ctx) var gotErr int32 var wg sync.WaitGroup @@ -245,7 +249,6 @@ var cdCmd = &cobra.Command{ return true } - ctx, cf := context.WithCancel(ctx) done := make(chan bool, 1) wg.Add(1) go func() { @@ -265,7 +268,6 @@ var cdCmd = &cobra.Command{ logger.Infof("All certifiers completed") } ingestionStop <- true - shutdown(ctx) wg.Wait() cf() diff --git a/cmd/guacone/cmd/osv.go b/cmd/guacone/cmd/osv.go index 1a29c8c424..41e60ac609 100644 --- a/cmd/guacone/cmd/osv.go +++ b/cmd/guacone/cmd/osv.go @@ -100,13 +100,16 @@ var osvCmd = &cobra.Command{ logger := logging.FromContext(ctx) transport := cli.HTTPHeaderTransport(ctx, opts.headerFile, http.DefaultTransport) - var shutdown func(context.Context) error = func(context.Context) error { return nil } if opts.enableOtel { - var err error - shutdown, err = metrics.SetupOTelSDK(ctx) + shutdown, err := metrics.SetupOTelSDK(ctx) if err != nil { logger.Fatalf("Error setting up Otel: %v", err) } + defer func() { + if err := shutdown(ctx); err != nil { + logger.Errorf("Error on Otel shutdown: %v", err) + } + }() } if err := certify.RegisterCertifier(func() certifier.Certifier { @@ -137,6 +140,7 @@ var osvCmd = &cobra.Command{ ingestionStop := make(chan bool, 1) tickInterval := 30 * time.Second ticker := time.NewTicker(tickInterval) + ctx, cf := context.WithCancel(ctx) var gotErr int32 var wg sync.WaitGroup @@ -257,7 +261,6 @@ var osvCmd = &cobra.Command{ return true } - ctx, cf := context.WithCancel(ctx) done := make(chan bool, 1) wg.Add(1) go func() { @@ -277,7 +280,6 @@ var osvCmd = &cobra.Command{ logger.Infof("All certifiers completed") } ingestionStop <- true - shutdown(ctx) wg.Wait() cf()