Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add otel instrumentation to http/grpc/sql libraries. #2440

Merged
merged 2 commits into from
Jan 16, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Update shutdown for otel on cli commands
Signed-off-by: Jeff Mendoza <[email protected]>
jeffmendoza committed Jan 16, 2025
commit 786c01350a7e580c12f7e76b12447c570b642b3c
10 changes: 6 additions & 4 deletions cmd/guaccollect/cmd/deps_dev.go
Original file line number Diff line number Diff line change
@@ -102,13 +102,16 @@ you have access to read and write to the respective blob store.`,
os.Exit(1)
}

var shutdown func(context.Context) error = func(context.Context) error { return nil }
if opts.enableOtel {
var err error
shutdown, err = metrics.SetupOTelSDK(ctx)
shutdown, err := metrics.SetupOTelSDK(ctx)
if err != nil {
logger.Fatalf("Error setting up Otel: %v", err)
}
defer func() {
if err := shutdown(ctx); err != nil {
logger.Errorf("Error on Otel shutdown: %v", err)
}
}()
}

// Register collector
@@ -131,7 +134,6 @@ you have access to read and write to the respective blob store.`,
}

initializeNATsandCollector(ctx, opts.pubsubAddr, opts.blobAddr, opts.publishToQueue)
shutdown(ctx)
},
}

10 changes: 6 additions & 4 deletions cmd/guaccollect/cmd/github.go
Original file line number Diff line number Diff line change
@@ -113,13 +113,16 @@ you have access to read and write to the respective blob store.`,
os.Exit(1)
}

var shutdown func(context.Context) error = func(context.Context) error { return nil }
if opts.enableOtel {
var err error
shutdown, err = metrics.SetupOTelSDK(ctx)
shutdown, err := metrics.SetupOTelSDK(ctx)
if err != nil {
logger.Fatalf("Error setting up Otel: %v", err)
}
defer func() {
if err := shutdown(ctx); err != nil {
logger.Errorf("Error on Otel shutdown: %v", err)
}
}()
}

// GITHUB_TOKEN is the default token name
@@ -167,7 +170,6 @@ you have access to read and write to the respective blob store.`,
}

initializeNATsandCollector(ctx, opts.pubsubAddr, opts.blobAddr, opts.publishToQueue)
shutdown(ctx)
},
}

10 changes: 6 additions & 4 deletions cmd/guaccollect/cmd/license.go
Original file line number Diff line number Diff line change
@@ -104,13 +104,16 @@ you have access to read and write to the respective blob store.`,
ctx := logging.WithLogger(context.Background())
logger := logging.FromContext(ctx)

var shutdown func(context.Context) error = func(context.Context) error { return nil }
if opts.enableOtel {
var err error
shutdown, err = metrics.SetupOTelSDK(ctx)
shutdown, err := metrics.SetupOTelSDK(ctx)
if err != nil {
logger.Fatalf("Error setting up Otel: %v", err)
}
defer func() {
if err := shutdown(ctx); err != nil {
logger.Errorf("Error on Otel shutdown: %v", err)
}
}()
}

if err := certify.RegisterCertifier(clearlydefined.NewClearlyDefinedCertifier, certifier.CertifierClearlyDefined); err != nil {
@@ -128,7 +131,6 @@ you have access to read and write to the respective blob store.`,
}

initializeNATsandCertifier(ctx, opts.blobAddr, opts.pubsubAddr, opts.poll, opts.publishToQueue, opts.interval, packageQueryFunc())
shutdown(ctx)
},
}

10 changes: 6 additions & 4 deletions cmd/guaccollect/cmd/osv.go
Original file line number Diff line number Diff line change
@@ -115,13 +115,16 @@ you have access to read and write to the respective blob store.`,
ctx := logging.WithLogger(context.Background())
logger := logging.FromContext(ctx)

var shutdown func(context.Context) error = func(context.Context) error { return nil }
if opts.enableOtel {
var err error
shutdown, err = metrics.SetupOTelSDK(ctx)
shutdown, err := metrics.SetupOTelSDK(ctx)
if err != nil {
logger.Fatalf("Error setting up Otel: %v", err)
}
defer func() {
if err := shutdown(ctx); err != nil {
logger.Errorf("Error on Otel shutdown: %v", err)
}
}()
}

if err := certify.RegisterCertifier(func() certifier.Certifier {
@@ -145,7 +148,6 @@ you have access to read and write to the respective blob store.`,
}

initializeNATsandCertifier(ctx, opts.blobAddr, opts.pubsubAddr, opts.poll, opts.publishToQueue, opts.interval, packageQueryFunc())
shutdown(ctx)
},
}

12 changes: 8 additions & 4 deletions cmd/guacgql/cmd/server.go
Original file line number Diff line number Diff line change
@@ -81,14 +81,19 @@ func startServer(cmd *cobra.Command) {
srvHandler = srv
}

var shutdown func(context.Context) error = func(context.Context) error { return nil }
if flags.enableOtel {
var err error
shutdown, err = metrics.SetupOTelSDK(ctx)
shutdown, err := metrics.SetupOTelSDK(ctx)
if err != nil {
logger.Fatalf("Error setting up Otel: %v", err)
}

srvHandler = otelhttp.NewHandler(srvHandler, "/")

defer func() {
if err := shutdown(ctx); err != nil {
logger.Errorf("Error on Otel shutdown: %v", err)
}
}()
}

if flags.tracegql {
@@ -126,7 +131,6 @@ func startServer(cmd *cobra.Command) {
ctx, cf := context.WithCancel(ctx)
go func() {
_ = server.Shutdown(ctx)
_ = shutdown(ctx)
done <- true
}()
select {
13 changes: 8 additions & 5 deletions cmd/guacingest/cmd/ingest.go
Original file line number Diff line number Diff line change
@@ -73,17 +73,20 @@ func ingest(cmd *cobra.Command, args []string) {
os.Exit(1)
}

ctx, cf := context.WithCancel(logging.WithLogger(context.Background()))
ctx := logging.WithLogger(context.Background())
logger := logging.FromContext(ctx)
transport := cli.HTTPHeaderTransport(ctx, opts.headerFile, http.DefaultTransport)

var shutdown func(context.Context) error = func(context.Context) error { return nil }
if opts.enableOtel {
var err error
shutdown, err = metrics.SetupOTelSDK(ctx)
shutdown, err := metrics.SetupOTelSDK(ctx)
if err != nil {
logger.Fatalf("Error setting up Otel: %v", err)
}
defer func() {
if err := shutdown(ctx); err != nil {
logger.Errorf("Error on Otel shutdown: %v", err)
}
}()
}

if strings.HasPrefix(opts.pubsubAddr, "nats://") {
@@ -113,6 +116,7 @@ func ingest(cmd *cobra.Command, args []string) {
}
defer csubClient.Close()

ctx, cf := context.WithCancel(ctx)
emit := func(d *processor.Document) error {
if _, err := ingestor.Ingest(
ctx,
@@ -150,7 +154,6 @@ func ingest(cmd *cobra.Command, args []string) {
s := <-sigs
logger.Infof("Signal received: %s, shutting down gracefully\n", s.String())
cf()
shutdown(ctx)

wg.Wait()
}
14 changes: 8 additions & 6 deletions cmd/guacone/cmd/deps_dev.go
Original file line number Diff line number Diff line change
@@ -74,13 +74,16 @@ var depsDevCmd = &cobra.Command{
logger := logging.FromContext(ctx)
transport := cli.HTTPHeaderTransport(ctx, opts.headerFile, http.DefaultTransport)

var shutdown func(context.Context) error = func(context.Context) error { return nil }
if opts.enableOtel {
var err error
shutdown, err = metrics.SetupOTelSDK(ctx)
shutdown, err := metrics.SetupOTelSDK(ctx)
if err != nil {
logger.Fatalf("Error setting up Otel: %v", err)
}
defer func() {
if err := shutdown(ctx); err != nil {
logger.Errorf("Error on Otel shutdown: %v", err)
}
}()
}

// Register collector
@@ -137,7 +140,7 @@ var depsDevCmd = &cobra.Command{
go func() {
defer wg.Done()
if err := collector.Collect(ctx, emit, errHandler); err != nil {
logger.Fatal(err)
logger.Errorf("collector exited with error: %v", err)
}
done <- true
}()
@@ -151,11 +154,10 @@ var depsDevCmd = &cobra.Command{
logger.Infof("Deps dev collector completed")
}
cf()
shutdown(ctx)
wg.Wait()

if gotErr {
logger.Fatalf("completed ingestion with error, %v of %v were successful", totalSuccess, totalNum)
logger.Errorf("completed ingestion with error, %v of %v were successful", totalSuccess, totalNum)
} else {
logger.Infof("completed ingesting %v documents of %v", totalSuccess, totalNum)
}
10 changes: 6 additions & 4 deletions cmd/guacone/cmd/eol.go
Original file line number Diff line number Diff line change
@@ -84,13 +84,16 @@ var eolCmd = &cobra.Command{
logger := logging.FromContext(ctx)
transport := cli.HTTPHeaderTransport(ctx, opts.headerFile, http.DefaultTransport)

var shutdown func(context.Context) error = func(context.Context) error { return nil }
if opts.enableOtel {
var err error
shutdown, err = metrics.SetupOTelSDK(ctx)
shutdown, err := metrics.SetupOTelSDK(ctx)
if err != nil {
logger.Fatalf("Error setting up Otel: %v", err)
}
defer func() {
if err := shutdown(ctx); err != nil {
logger.Errorf("Error on Otel shutdown: %v", err)
}
}()
}

if err := certify.RegisterCertifier(eol.NewEOLCertifier, eol.EOLCollector); err != nil {
@@ -214,7 +217,6 @@ var eolCmd = &cobra.Command{
logger.Infof("All certifiers completed")
}
ingestionStop <- true
shutdown(ctx)
wg.Wait()
cf()

14 changes: 8 additions & 6 deletions cmd/guacone/cmd/files.go
Original file line number Diff line number Diff line change
@@ -87,13 +87,16 @@ var filesCmd = &cobra.Command{
logger := logging.FromContext(ctx)
transport := cli.HTTPHeaderTransport(ctx, opts.headerFile, http.DefaultTransport)

var shutdown func(context.Context) error = func(context.Context) error { return nil }
if opts.enableOtel {
var err error
shutdown, err = metrics.SetupOTelSDK(ctx)
shutdown, err := metrics.SetupOTelSDK(ctx)
if err != nil {
logger.Fatalf("Error setting up Otel: %v", err)
}
defer func() {
if err := shutdown(ctx); err != nil {
logger.Errorf("Error on Otel shutdown: %v", err)
}
}()
}

// Register Keystore
@@ -175,16 +178,15 @@ var filesCmd = &cobra.Command{
}

if err := collector.Collect(ctx, emit, errHandler); err != nil {
logger.Fatal(err)
logger.Errorf("collector exited with error: %v", err)
}

if gotErr {
logger.Fatalf("completed ingestion with error, %v of %v were successful - the following files did not ingest successfully: %v",
logger.Errorf("completed ingestion with error, %v of %v were successful - the following files did not ingest successfully: %v",
totalSuccess, totalNum, strings.Join(filesWithErrors, " "))
} else {
logger.Infof("completed ingesting %v documents of %v", totalSuccess, totalNum)
}
shutdown(ctx)
},
}

31 changes: 17 additions & 14 deletions cmd/guacone/cmd/github.go
Original file line number Diff line number Diff line change
@@ -109,19 +109,22 @@ var githubCmd = &cobra.Command{
logger := logging.FromContext(ctx)
transport := cli.HTTPHeaderTransport(ctx, opts.headerFile, http.DefaultTransport)

var shutdown func(context.Context) error = func(context.Context) error { return nil }
if opts.enableOtel {
var err error
shutdown, err = metrics.SetupOTelSDK(ctx)
shutdown, err := metrics.SetupOTelSDK(ctx)
if err != nil {
logger.Fatalf("Error setting up Otel: %v", err)
}
defer func() {
if err := shutdown(ctx); err != nil {
logger.Errorf("Error on Otel shutdown: %v", err)
}
}()
}

// GITHUB_TOKEN is the default token name
ghc, err := githubclient.NewGithubClient(ctx, os.Getenv("GITHUB_TOKEN"))
if err != nil {
logger.Errorf("unable to create github client: %v", err)
logger.Fatalf("unable to create github client: %v", err)
}

// Register collector
@@ -141,23 +144,23 @@ var githubCmd = &cobra.Command{
}
if opts.ownerRepoName != "" {
if !strings.Contains(opts.ownerRepoName, "/") {
logger.Errorf("owner-repo flag must be in the format <owner>/<repo>")
logger.Fatalf("owner-repo flag must be in the format <owner>/<repo>")
} else {
ownerRepoName := strings.Split(opts.ownerRepoName, "/")
if len(ownerRepoName) != 2 {
logger.Errorf("owner-repo flag must be in the format <owner>/<repo>")
logger.Fatalf("owner-repo flag must be in the format <owner>/<repo>")
}
collectorOpts = append(collectorOpts, github.WithOwner(ownerRepoName[0]))
collectorOpts = append(collectorOpts, github.WithRepo(ownerRepoName[1]))
}
}
githubCollector, err := github.NewGithubCollector(collectorOpts...)
if err != nil {
logger.Errorf("unable to create Github collector: %v", err)
logger.Fatalf("unable to create Github collector: %v", err)
}
err = collector.RegisterDocumentCollector(githubCollector, github.GithubCollector)
if err != nil {
logger.Errorf("unable to register Github collector: %v", err)
logger.Fatalf("unable to register Github collector: %v", err)
}

csubClient, err := csub_client.NewClient(opts.csubClientOptions)
@@ -170,6 +173,9 @@ var githubCmd = &cobra.Command{

var errFound bool

ctx, cancel := context.WithCancel(ctx)
defer cancel()

emit := func(d *processor.Document) error {
_, err := ingestor.Ingest(
ctx,
@@ -194,13 +200,11 @@ var githubCmd = &cobra.Command{
logger.Info("collector ended gracefully")
return true
}
errFound = true
logger.Errorf("collector ended with error: %v", err)
return false
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

@@ -210,7 +214,7 @@ var githubCmd = &cobra.Command{
go func() {
defer wg.Done()
if err := collector.Collect(ctx, emit, errHandler); err != nil {
logger.Fatal(err)
logger.Errorf("collector exited with error: %v", err)
}
}()

@@ -222,12 +226,11 @@ var githubCmd = &cobra.Command{
logger.Info("Collector finished")
}

shutdown(ctx)
wg.Wait()
logger.Info("Shutdown complete")

if errFound {
logger.Fatalf("completed ingestion with error")
logger.Errorf("completed ingestion with error")
} else {
logger.Infof("completed ingestion")
}
Loading