Skip to content

Commit

Permalink
observability: add unitests for tracing
Browse files Browse the repository at this point in the history
Signed-off-by: Prashanth Dintyala <[email protected]>
  • Loading branch information
saiprashanth173 committed Nov 6, 2024
1 parent 6e6af34 commit 15db34f
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 9 deletions.
1 change: 1 addition & 0 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ test:short:
- ${SCRIPTS_DIR}/clean_deploy.sh --target-cnt $NUM_TARGET --proxy-cnt $NUM_PROXY --mountpath-cnt $FS_CNT --deployment all
- make test-short
- FLAGS="--duration=10s" make test-aisloader
- make test-tracing-unit

# Runs cluster with 5 proxies and 5 targets (each with 6 mountpaths).
test:short:python:
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ test-short: test-envcheck ## Run short tests
@RE="$(RE)" BUCKET="$(BUCKET)" TESTS_DIR="$(TESTS_DIR)" AIS_ENDPOINT="$(AIS_ENDPOINT)" $(SHELL) "$(SCRIPTS_DIR)/bootstrap.sh" test-short
@cd $(BUILD_DIR)/cli && go test -v -tags=debug ./...

test-tracing-unit:
@cd tracing && go test -v -tags=oteltracing ./...

test-assorted: test-envcheck # Run specific tests
@RE="ETLBucket|ETLConnectionError|ETLInitCode" BUCKET="$(BUCKET)" TESTS_DIR="$(TESTS_DIR)" AIS_ENDPOINT="$(AIS_ENDPOINT)" $(SHELL) "$(SCRIPTS_DIR)/bootstrap.sh" test-long

Expand Down
6 changes: 3 additions & 3 deletions cmn/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ type (
Enabled bool `json:"enabled"`
SkipVerify bool `json:"skip_verify"` // allow insecure exporter gRPC connection

SamplerProbablity float64 `json:"-"`
SamplerProbability float64 `json:"-"`
}

// NOTE: Updating TracingConfig requires restart.
Expand Down Expand Up @@ -1814,13 +1814,13 @@ func (c *TracingConf) Validate() error {
return errors.New("invalid tracing.exporter_endpoint can't be empty when tracing is enabled")
}
if c.SamplerProbabilityStr == "" {
c.SamplerProbablity = defaultSampleProbability
c.SamplerProbability = defaultSampleProbability
} else {
prob, err := strconv.ParseFloat(c.SamplerProbabilityStr, 64)
if err != nil {
return nil
}
c.SamplerProbablity = prob
c.SamplerProbability = prob
}
return nil
}
Expand Down
17 changes: 17 additions & 0 deletions tracing/tracing_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Package tracing offers support for distributed tracing utilizing OpenTelemetry (OTEL).
/*
* Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
*/
package tracing

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestTracing(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, t.Name())
}
17 changes: 11 additions & 6 deletions tracing/tracing_on.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (
"os"
"strings"

"github.com/NVIDIA/aistore/api/env"
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/nlog"
"github.com/NVIDIA/aistore/core/meta"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
Expand All @@ -34,7 +36,7 @@ func loadAccessToken(tokenFilePath string) string {
return strings.TrimSpace(string(data))
}

func newExporter(conf *cmn.TracingConf) (trace.SpanExporter, error) {
var newExporter = func(conf *cmn.TracingConf) (trace.SpanExporter, error) {
headers := map[string]string{}
if conf.ExporterAuth.IsEnabled() {
token := loadAccessToken(conf.ExporterAuth.TokenFile)
Expand Down Expand Up @@ -64,7 +66,7 @@ func newResource(conf *cmn.TracingConf, snode *meta.Snode, version string) *reso
semconv.ServiceNameKey.String(serviceName),
attribute.String("version", version),
attribute.String("daemonID", snode.DaeID),
attribute.String("pod", os.Getenv("MY_POD")), // TODO: get from consts
attribute.String("pod", os.Getenv(env.AIS.K8sPod)),
}
for k, v := range conf.ExtraAttributes {
attrs = append(attrs, attribute.String(k, v))
Expand All @@ -85,7 +87,7 @@ func IsEnabled() bool {

func Init(conf *cmn.TracingConf, snode *meta.Snode, version string) {
if conf == nil || !conf.Enabled {
cos.ExitLogf("distributed tracing not enabled (%+v)", conf)
nlog.Infof("distributed tracing not enabled (%+v)", conf)
return
}

Expand All @@ -94,7 +96,7 @@ func Init(conf *cmn.TracingConf, snode *meta.Snode, version string) {
cos.AssertNoErr(err)

tp = trace.NewTracerProvider(
trace.WithSampler(trace.ParentBased(trace.TraceIDRatioBased(conf.SamplerProbablity))),
trace.WithSampler(trace.ParentBased(trace.TraceIDRatioBased(conf.SamplerProbability))),
trace.WithBatcher(exp),
trace.WithResource(newResource(conf, snode, version)),
)
Expand All @@ -109,7 +111,7 @@ func Init(conf *cmn.TracingConf, snode *meta.Snode, version string) {
}

func Shutdown() {
if tp != nil {
if !IsEnabled() {
return
}
if err := tp.Shutdown(context.Background()); err != nil {
Expand All @@ -118,7 +120,10 @@ func Shutdown() {
}

func NewTraceableHandler(handler http.Handler, operation string) http.Handler {
return otelhttp.NewHandler(handler, operation)
if IsEnabled() {
return otelhttp.NewHandler(handler, operation)
}
return handler
}

func NewTraceableClient(client *http.Client) *http.Client {
Expand Down
165 changes: 165 additions & 0 deletions tracing/tracing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
//go:build oteltracing

// Package tracing offers support for distributed tracing utilizing OpenTelemetry (OTEL).
/*
* Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
*/
package tracing

import (
"context"
"io"
"net/http"
"net/http/httptest"

"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/core/meta"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
)

var _ = Describe("Tracing", func() {
const aisVersion = "v3.33"

var (
exporter *tracetest.InMemoryExporter

origExporter = newExporter
dummySnode = &meta.Snode{DaeID: "test", DaeType: "proxy"}

newTestHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("-"))
})

expectResourceAttrs = func(attrs []attribute.KeyValue) {
expectedAttributes := map[string]string{
"service.name": "aistore-" + dummySnode.DaeType,
"version": aisVersion,
"daemonID": dummySnode.DaeID,
}
Expect(len(attrs)).NotTo(BeEquivalentTo(0))
matched := 0
for _, attribute := range attrs {
value, ok := expectedAttributes[string(attribute.Key)]
if !ok {
continue
}
Expect(attribute.Value.AsString()).To(BeEquivalentTo(value))
matched++
}
Expect(matched).To(BeEquivalentTo(len(expectedAttributes)))
}
)

BeforeEach(func() {
exporter = tracetest.NewInMemoryExporter()
newExporter = func(conf *cmn.TracingConf) (trace.SpanExporter, error) {
return exporter, nil
}
})

AfterEach(func() {
newExporter = origExporter
})

Describe("Server", func() {
AfterEach(func() {
Shutdown()
tp = nil
})
It("should export server trace when tracing enabled", func() {
Init(&cmn.TracingConf{
ExporterEndpoint: "dummy",
Enabled: true,
SamplerProbability: 1.0,
}, dummySnode, aisVersion)
Expect(IsEnabled()).To(BeTrue())

server := httptest.NewServer(NewTraceableHandler(newTestHandler, "testendpoint"))
defer server.Close()

_, err := http.Get(server.URL)
Expect(err).NotTo(HaveOccurred())

tp.ForceFlush(context.Background())

Expect(len(exporter.GetSpans())).To(BeEquivalentTo(1))
span := exporter.GetSpans()[0]
expectResourceAttrs(span.Resource.Attributes())
})

It("should do nothing when tracing disabled", func() {
Init(&cmn.TracingConf{
Enabled: false,
}, dummySnode, aisVersion)
Expect(IsEnabled()).To(BeFalse())

server := httptest.NewServer(NewTraceableHandler(newTestHandler, "testendpoint"))
defer server.Close()

_, err := http.Get(server.URL)
Expect(err).NotTo(HaveOccurred())

Expect(len(exporter.GetSpans())).To(BeEquivalentTo(0))
})
})

Describe("Client", func() {
AfterEach(func() {
Shutdown()
tp = nil
})
It("should export client trace when tracing enabled", func() {
Init(&cmn.TracingConf{
ExporterEndpoint: "dummy",
Enabled: true,
SamplerProbability: 1.0,
}, dummySnode, aisVersion)
Expect(IsEnabled()).To(BeTrue())

server := httptest.NewServer(newTestHandler)
defer server.Close()

client := NewTraceableClient(http.DefaultClient)
_, isOtelType := client.Transport.(*otelhttp.Transport)
Expect(isOtelType).To(BeTrue())

resp, err := client.Get(server.URL)
Expect(err).NotTo(HaveOccurred())

_, err = io.ReadAll(resp.Body)
Expect(err).NotTo(HaveOccurred())

tp.ForceFlush(context.Background())

Expect(len(exporter.GetSpans())).To(BeEquivalentTo(1))
span := exporter.GetSpans()[0]
expectResourceAttrs(span.Resource.Attributes())
})

It("should do nothing when tracing disabled", func() {
Init(&cmn.TracingConf{
Enabled: false,
}, dummySnode, aisVersion)
Expect(IsEnabled()).To(BeFalse())

server := httptest.NewServer(newTestHandler)
defer server.Close()

client := NewTraceableClient(http.DefaultClient)

resp, err := client.Get(server.URL)
Expect(err).NotTo(HaveOccurred())

_, err = io.ReadAll(resp.Body)
Expect(err).NotTo(HaveOccurred())

Expect(len(exporter.GetSpans())).To(BeEquivalentTo(0))
})
})
})

0 comments on commit 15db34f

Please sign in to comment.