Skip to content

Commit

Permalink
Update shutdown for otel on cli commands
Browse files Browse the repository at this point in the history
Signed-off-by: Jeff Mendoza <[email protected]>
  • Loading branch information
jeffmendoza committed Jan 16, 2025
1 parent 89cbb4d commit 786c013
Show file tree
Hide file tree
Showing 12 changed files with 93 additions and 65 deletions.
10 changes: 6 additions & 4 deletions cmd/guaccollect/cmd/deps_dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,16 @@ you have access to read and write to the respective blob store.`,
os.Exit(1)
}

var shutdown func(context.Context) error = func(context.Context) error { return nil }
if opts.enableOtel {
var err error
shutdown, err = metrics.SetupOTelSDK(ctx)
shutdown, err := metrics.SetupOTelSDK(ctx)
if err != nil {
logger.Fatalf("Error setting up Otel: %v", err)
}
defer func() {
if err := shutdown(ctx); err != nil {
logger.Errorf("Error on Otel shutdown: %v", err)
}
}()
}

// Register collector
Expand All @@ -131,7 +134,6 @@ you have access to read and write to the respective blob store.`,
}

initializeNATsandCollector(ctx, opts.pubsubAddr, opts.blobAddr, opts.publishToQueue)
shutdown(ctx)
},
}

Expand Down
10 changes: 6 additions & 4 deletions cmd/guaccollect/cmd/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,16 @@ you have access to read and write to the respective blob store.`,
os.Exit(1)
}

var shutdown func(context.Context) error = func(context.Context) error { return nil }
if opts.enableOtel {
var err error
shutdown, err = metrics.SetupOTelSDK(ctx)
shutdown, err := metrics.SetupOTelSDK(ctx)
if err != nil {
logger.Fatalf("Error setting up Otel: %v", err)
}
defer func() {
if err := shutdown(ctx); err != nil {
logger.Errorf("Error on Otel shutdown: %v", err)
}
}()
}

// GITHUB_TOKEN is the default token name
Expand Down Expand Up @@ -167,7 +170,6 @@ you have access to read and write to the respective blob store.`,
}

initializeNATsandCollector(ctx, opts.pubsubAddr, opts.blobAddr, opts.publishToQueue)
shutdown(ctx)
},
}

Expand Down
10 changes: 6 additions & 4 deletions cmd/guaccollect/cmd/license.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,16 @@ you have access to read and write to the respective blob store.`,
ctx := logging.WithLogger(context.Background())
logger := logging.FromContext(ctx)

var shutdown func(context.Context) error = func(context.Context) error { return nil }
if opts.enableOtel {
var err error
shutdown, err = metrics.SetupOTelSDK(ctx)
shutdown, err := metrics.SetupOTelSDK(ctx)
if err != nil {
logger.Fatalf("Error setting up Otel: %v", err)
}
defer func() {
if err := shutdown(ctx); err != nil {
logger.Errorf("Error on Otel shutdown: %v", err)
}
}()
}

if err := certify.RegisterCertifier(clearlydefined.NewClearlyDefinedCertifier, certifier.CertifierClearlyDefined); err != nil {
Expand All @@ -128,7 +131,6 @@ you have access to read and write to the respective blob store.`,
}

initializeNATsandCertifier(ctx, opts.blobAddr, opts.pubsubAddr, opts.poll, opts.publishToQueue, opts.interval, packageQueryFunc())
shutdown(ctx)
},
}

Expand Down
10 changes: 6 additions & 4 deletions cmd/guaccollect/cmd/osv.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,16 @@ you have access to read and write to the respective blob store.`,
ctx := logging.WithLogger(context.Background())
logger := logging.FromContext(ctx)

var shutdown func(context.Context) error = func(context.Context) error { return nil }
if opts.enableOtel {
var err error
shutdown, err = metrics.SetupOTelSDK(ctx)
shutdown, err := metrics.SetupOTelSDK(ctx)
if err != nil {
logger.Fatalf("Error setting up Otel: %v", err)
}
defer func() {
if err := shutdown(ctx); err != nil {
logger.Errorf("Error on Otel shutdown: %v", err)
}
}()
}

if err := certify.RegisterCertifier(func() certifier.Certifier {
Expand All @@ -145,7 +148,6 @@ you have access to read and write to the respective blob store.`,
}

initializeNATsandCertifier(ctx, opts.blobAddr, opts.pubsubAddr, opts.poll, opts.publishToQueue, opts.interval, packageQueryFunc())
shutdown(ctx)
},
}

Expand Down
12 changes: 8 additions & 4 deletions cmd/guacgql/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,19 @@ func startServer(cmd *cobra.Command) {
srvHandler = srv
}

var shutdown func(context.Context) error = func(context.Context) error { return nil }
if flags.enableOtel {
var err error
shutdown, err = metrics.SetupOTelSDK(ctx)
shutdown, err := metrics.SetupOTelSDK(ctx)
if err != nil {
logger.Fatalf("Error setting up Otel: %v", err)
}

srvHandler = otelhttp.NewHandler(srvHandler, "/")

defer func() {
if err := shutdown(ctx); err != nil {
logger.Errorf("Error on Otel shutdown: %v", err)
}
}()
}

if flags.tracegql {
Expand Down Expand Up @@ -126,7 +131,6 @@ func startServer(cmd *cobra.Command) {
ctx, cf := context.WithCancel(ctx)
go func() {
_ = server.Shutdown(ctx)
_ = shutdown(ctx)
done <- true
}()
select {
Expand Down
13 changes: 8 additions & 5 deletions cmd/guacingest/cmd/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,20 @@ func ingest(cmd *cobra.Command, args []string) {
os.Exit(1)
}

ctx, cf := context.WithCancel(logging.WithLogger(context.Background()))
ctx := logging.WithLogger(context.Background())
logger := logging.FromContext(ctx)
transport := cli.HTTPHeaderTransport(ctx, opts.headerFile, http.DefaultTransport)

var shutdown func(context.Context) error = func(context.Context) error { return nil }
if opts.enableOtel {
var err error
shutdown, err = metrics.SetupOTelSDK(ctx)
shutdown, err := metrics.SetupOTelSDK(ctx)
if err != nil {
logger.Fatalf("Error setting up Otel: %v", err)
}
defer func() {
if err := shutdown(ctx); err != nil {
logger.Errorf("Error on Otel shutdown: %v", err)
}
}()
}

if strings.HasPrefix(opts.pubsubAddr, "nats://") {
Expand Down Expand Up @@ -113,6 +116,7 @@ func ingest(cmd *cobra.Command, args []string) {
}
defer csubClient.Close()

ctx, cf := context.WithCancel(ctx)
emit := func(d *processor.Document) error {
if _, err := ingestor.Ingest(
ctx,
Expand Down Expand Up @@ -150,7 +154,6 @@ func ingest(cmd *cobra.Command, args []string) {
s := <-sigs
logger.Infof("Signal received: %s, shutting down gracefully\n", s.String())
cf()
shutdown(ctx)

wg.Wait()
}
Expand Down
14 changes: 8 additions & 6 deletions cmd/guacone/cmd/deps_dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,16 @@ var depsDevCmd = &cobra.Command{
logger := logging.FromContext(ctx)
transport := cli.HTTPHeaderTransport(ctx, opts.headerFile, http.DefaultTransport)

var shutdown func(context.Context) error = func(context.Context) error { return nil }
if opts.enableOtel {
var err error
shutdown, err = metrics.SetupOTelSDK(ctx)
shutdown, err := metrics.SetupOTelSDK(ctx)
if err != nil {
logger.Fatalf("Error setting up Otel: %v", err)
}
defer func() {
if err := shutdown(ctx); err != nil {
logger.Errorf("Error on Otel shutdown: %v", err)
}
}()
}

// Register collector
Expand Down Expand Up @@ -137,7 +140,7 @@ var depsDevCmd = &cobra.Command{
go func() {
defer wg.Done()
if err := collector.Collect(ctx, emit, errHandler); err != nil {
logger.Fatal(err)
logger.Errorf("collector exited with error: %v", err)
}
done <- true
}()
Expand All @@ -151,11 +154,10 @@ var depsDevCmd = &cobra.Command{
logger.Infof("Deps dev collector completed")
}
cf()
shutdown(ctx)
wg.Wait()

if gotErr {
logger.Fatalf("completed ingestion with error, %v of %v were successful", totalSuccess, totalNum)
logger.Errorf("completed ingestion with error, %v of %v were successful", totalSuccess, totalNum)
} else {
logger.Infof("completed ingesting %v documents of %v", totalSuccess, totalNum)
}
Expand Down
10 changes: 6 additions & 4 deletions cmd/guacone/cmd/eol.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,16 @@ var eolCmd = &cobra.Command{
logger := logging.FromContext(ctx)
transport := cli.HTTPHeaderTransport(ctx, opts.headerFile, http.DefaultTransport)

var shutdown func(context.Context) error = func(context.Context) error { return nil }
if opts.enableOtel {
var err error
shutdown, err = metrics.SetupOTelSDK(ctx)
shutdown, err := metrics.SetupOTelSDK(ctx)
if err != nil {
logger.Fatalf("Error setting up Otel: %v", err)
}
defer func() {
if err := shutdown(ctx); err != nil {
logger.Errorf("Error on Otel shutdown: %v", err)
}
}()
}

if err := certify.RegisterCertifier(eol.NewEOLCertifier, eol.EOLCollector); err != nil {
Expand Down Expand Up @@ -214,7 +217,6 @@ var eolCmd = &cobra.Command{
logger.Infof("All certifiers completed")
}
ingestionStop <- true
shutdown(ctx)
wg.Wait()
cf()

Expand Down
14 changes: 8 additions & 6 deletions cmd/guacone/cmd/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,16 @@ var filesCmd = &cobra.Command{
logger := logging.FromContext(ctx)
transport := cli.HTTPHeaderTransport(ctx, opts.headerFile, http.DefaultTransport)

var shutdown func(context.Context) error = func(context.Context) error { return nil }
if opts.enableOtel {
var err error
shutdown, err = metrics.SetupOTelSDK(ctx)
shutdown, err := metrics.SetupOTelSDK(ctx)
if err != nil {
logger.Fatalf("Error setting up Otel: %v", err)
}
defer func() {
if err := shutdown(ctx); err != nil {
logger.Errorf("Error on Otel shutdown: %v", err)
}
}()
}

// Register Keystore
Expand Down Expand Up @@ -175,16 +178,15 @@ var filesCmd = &cobra.Command{
}

if err := collector.Collect(ctx, emit, errHandler); err != nil {
logger.Fatal(err)
logger.Errorf("collector exited with error: %v", err)
}

if gotErr {
logger.Fatalf("completed ingestion with error, %v of %v were successful - the following files did not ingest successfully: %v",
logger.Errorf("completed ingestion with error, %v of %v were successful - the following files did not ingest successfully: %v",
totalSuccess, totalNum, strings.Join(filesWithErrors, " "))
} else {
logger.Infof("completed ingesting %v documents of %v", totalSuccess, totalNum)
}
shutdown(ctx)
},
}

Expand Down
31 changes: 17 additions & 14 deletions cmd/guacone/cmd/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,19 +109,22 @@ var githubCmd = &cobra.Command{
logger := logging.FromContext(ctx)
transport := cli.HTTPHeaderTransport(ctx, opts.headerFile, http.DefaultTransport)

var shutdown func(context.Context) error = func(context.Context) error { return nil }
if opts.enableOtel {
var err error
shutdown, err = metrics.SetupOTelSDK(ctx)
shutdown, err := metrics.SetupOTelSDK(ctx)
if err != nil {
logger.Fatalf("Error setting up Otel: %v", err)
}
defer func() {
if err := shutdown(ctx); err != nil {
logger.Errorf("Error on Otel shutdown: %v", err)
}
}()
}

// GITHUB_TOKEN is the default token name
ghc, err := githubclient.NewGithubClient(ctx, os.Getenv("GITHUB_TOKEN"))
if err != nil {
logger.Errorf("unable to create github client: %v", err)
logger.Fatalf("unable to create github client: %v", err)
}

// Register collector
Expand All @@ -141,23 +144,23 @@ var githubCmd = &cobra.Command{
}
if opts.ownerRepoName != "" {
if !strings.Contains(opts.ownerRepoName, "/") {
logger.Errorf("owner-repo flag must be in the format <owner>/<repo>")
logger.Fatalf("owner-repo flag must be in the format <owner>/<repo>")
} else {
ownerRepoName := strings.Split(opts.ownerRepoName, "/")
if len(ownerRepoName) != 2 {
logger.Errorf("owner-repo flag must be in the format <owner>/<repo>")
logger.Fatalf("owner-repo flag must be in the format <owner>/<repo>")
}
collectorOpts = append(collectorOpts, github.WithOwner(ownerRepoName[0]))
collectorOpts = append(collectorOpts, github.WithRepo(ownerRepoName[1]))
}
}
githubCollector, err := github.NewGithubCollector(collectorOpts...)
if err != nil {
logger.Errorf("unable to create Github collector: %v", err)
logger.Fatalf("unable to create Github collector: %v", err)
}
err = collector.RegisterDocumentCollector(githubCollector, github.GithubCollector)
if err != nil {
logger.Errorf("unable to register Github collector: %v", err)
logger.Fatalf("unable to register Github collector: %v", err)
}

csubClient, err := csub_client.NewClient(opts.csubClientOptions)
Expand All @@ -170,6 +173,9 @@ var githubCmd = &cobra.Command{

var errFound bool

ctx, cancel := context.WithCancel(ctx)
defer cancel()

emit := func(d *processor.Document) error {
_, err := ingestor.Ingest(
ctx,
Expand All @@ -194,13 +200,11 @@ var githubCmd = &cobra.Command{
logger.Info("collector ended gracefully")
return true
}
errFound = true
logger.Errorf("collector ended with error: %v", err)
return false
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

Expand All @@ -210,7 +214,7 @@ var githubCmd = &cobra.Command{
go func() {
defer wg.Done()
if err := collector.Collect(ctx, emit, errHandler); err != nil {
logger.Fatal(err)
logger.Errorf("collector exited with error: %v", err)
}
}()

Expand All @@ -222,12 +226,11 @@ var githubCmd = &cobra.Command{
logger.Info("Collector finished")
}

shutdown(ctx)
wg.Wait()
logger.Info("Shutdown complete")

if errFound {
logger.Fatalf("completed ingestion with error")
logger.Errorf("completed ingestion with error")
} else {
logger.Infof("completed ingestion")
}
Expand Down
Loading

0 comments on commit 786c013

Please sign in to comment.