Skip to content

Commit

Permalink
Merge pull request #285 from Surax98/otel_interlink
Browse files Browse the repository at this point in the history
OpenTelemetry tracing for the interLink API server
  • Loading branch information
dciangot authored Aug 29, 2024
2 parents a2521d5 + be83bfb commit 24c4597
Show file tree
Hide file tree
Showing 12 changed files with 374 additions and 155 deletions.
83 changes: 83 additions & 0 deletions cmd/interlink/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,86 @@ package main

import (
"context"
"crypto/tls"
"flag"
"fmt"
"net/http"
"os"
"strings"
"time"

"github.com/sirupsen/logrus"
"github.com/virtual-kubelet/virtual-kubelet/log"
logruslogger "github.com/virtual-kubelet/virtual-kubelet/log/logrus"
"github.com/virtual-kubelet/virtual-kubelet/trace"
"github.com/virtual-kubelet/virtual-kubelet/trace/opentelemetry"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

types "github.com/intertwin-eu/interlink/pkg/interlink"
"github.com/intertwin-eu/interlink/pkg/interlink/api"
"github.com/intertwin-eu/interlink/pkg/virtualkubelet"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
)

func initProvider(ctx context.Context) (func(context.Context) error, error) {
res, err := resource.New(ctx,
resource.WithAttributes(
// the service name used to display traces in backends
semconv.ServiceName("InterLink-API"),
),
)
if err != nil {
return nil, fmt.Errorf("failed to create resource: %w", err)
}

ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()

otlpEndpoint := os.Getenv("TELEMETRY_ENDPOINT")

if otlpEndpoint == "" {
otlpEndpoint = "localhost:4317"
}

fmt.Println("TELEMETRY_ENDPOINT: ", otlpEndpoint)

conn := &grpc.ClientConn{}
creds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
conn, err = grpc.DialContext(ctx, otlpEndpoint, grpc.WithTransportCredentials(creds), grpc.WithBlock())

if err != nil {
return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err)
}

// Set up a trace exporter
traceExporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn))
if err != nil {
return nil, fmt.Errorf("failed to create trace exporter: %w", err)
}

// Register the trace exporter with a TracerProvider, using a batch
// span processor to aggregate spans before export.
bsp := sdktrace.NewBatchSpanProcessor(traceExporter)
tracerProvider := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithResource(res),
sdktrace.WithSpanProcessor(bsp),
)
otel.SetTracerProvider(tracerProvider)

// set global propagator to tracecontext (the default is no-op).
otel.SetTextMapPropagator(propagation.TraceContext{})

return tracerProvider.Shutdown, nil
}

func main() {
printVersion := flag.Bool("version", false, "show version")
flag.Parse()
Expand Down Expand Up @@ -44,6 +110,23 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if os.Getenv("ENABLE_TRACING") == "1" {
shutdown, err := initProvider(ctx)
if err != nil {
log.G(ctx).Fatal(err)
}
defer func() {
if err = shutdown(ctx); err != nil {
log.G(ctx).Fatal("failed to shutdown TracerProvider: %w", err)
}
}()

log.G(ctx).Info("Tracer setup succeeded")

// TODO: disable this through options
trace.T = opentelemetry.Adapter{}
}

log.G(ctx).Info(interLinkConfig)

log.G(ctx).Info("interLink version: ", virtualkubelet.KubeletVersion)
Expand Down
37 changes: 5 additions & 32 deletions cmd/virtual-kubelet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@ package main
import (
"context"
"crypto/tls"
"crypto/x509"
"flag"
"fmt"
"io/ioutil"
"net"
"os"
"path"
Expand Down Expand Up @@ -95,13 +93,11 @@ type Opts struct {
ErrorsOnly bool
}

func initProvider() (func(context.Context) error, error) {
ctx := context.Background()

func initProvider(ctx context.Context) (func(context.Context) error, error) {
res, err := resource.New(ctx,
resource.WithAttributes(
// the service name used to display traces in backends
semconv.ServiceName("InterLink-service"),
semconv.ServiceName("InterLink-VK"),
),
)
if err != nil {
Expand All @@ -119,32 +115,9 @@ func initProvider() (func(context.Context) error, error) {

fmt.Println("TELEMETRY_ENDPOINT: ", otlpEndpoint)

crtFilePath := os.Getenv("TELEMETRY_CRTFILEPATH")

conn := &grpc.ClientConn{}

if crtFilePath != "" {
cert, err := ioutil.ReadFile(crtFilePath)
if err != nil {
return nil, fmt.Errorf("failed to create resource: %w", err)
}

roots := x509.NewCertPool()
if !roots.AppendCertsFromPEM(cert) {
return nil, fmt.Errorf("failed to create resource: %w", err)
}

creds := credentials.NewTLS(&tls.Config{
RootCAs: roots,
})

conn, err = grpc.DialContext(ctx, otlpEndpoint,
grpc.WithTransportCredentials(creds),
grpc.WithBlock(),
)
} else {
conn, err = grpc.DialContext(ctx, otlpEndpoint, grpc.WithInsecure(), grpc.WithBlock())
}
creds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
conn, err = grpc.DialContext(ctx, otlpEndpoint, grpc.WithTransportCredentials(creds), grpc.WithBlock())

if err != nil {
return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err)
Expand Down Expand Up @@ -213,7 +186,7 @@ func main() {
log.L = logruslogger.FromLogrus(logrus.NewEntry(logger))

if os.Getenv("ENABLE_TRACING") == "1" {
shutdown, err := initProvider()
shutdown, err := initProvider(ctx)
if err != nil {
log.G(ctx).Fatal(err)
}
Expand Down
56 changes: 39 additions & 17 deletions pkg/interlink/api/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,27 @@ import (
"encoding/json"
"io"
"net/http"
"time"

"github.com/containerd/containerd/log"

types "github.com/intertwin-eu/interlink/pkg/interlink"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
trace "go.opentelemetry.io/otel/trace"
)

// CreateHandler collects and rearranges all needed ConfigMaps/Secrets/EmptyDirs to ship them to the sidecar, then sends a response to the client
func (h *InterLinkHandler) CreateHandler(w http.ResponseWriter, r *http.Request) {
start := time.Now().UnixMicro()
tracer := otel.Tracer("interlink-API")
_, span := tracer.Start(h.Ctx, "CreateAPI", trace.WithAttributes(
attribute.Int64("start.timestamp", start),
))
defer span.End()
defer types.SetDurationSpan(start, span)

log.G(h.Ctx).Info("InterLink: received Create call")

statusCode := -1
Expand All @@ -21,7 +34,7 @@ func (h *InterLinkHandler) CreateHandler(w http.ResponseWriter, r *http.Request)
if err != nil {
statusCode = http.StatusInternalServerError
w.WriteHeader(statusCode)
log.G(h.Ctx).Fatal(err)
log.G(h.Ctx).Error(err)
return
}

Expand All @@ -30,19 +43,25 @@ func (h *InterLinkHandler) CreateHandler(w http.ResponseWriter, r *http.Request)
err = json.Unmarshal(bodyBytes, &pod)
if err != nil {
statusCode = http.StatusInternalServerError
log.G(h.Ctx).Fatal(err)
log.G(h.Ctx).Error(err)
w.WriteHeader(statusCode)
return
}

span.SetAttributes(
attribute.String("pod.name", pod.Pod.Name),
attribute.String("pod.namespace", pod.Pod.Namespace),
attribute.String("pod.uid", string(pod.Pod.UID)),
)

var retrievedData []types.RetrievedPodData

data := types.RetrievedPodData{}
if h.Config.ExportPodData {
data, err = getData(h.Ctx, h.Config, pod)
data, err = getData(h.Ctx, h.Config, pod, span)
if err != nil {
statusCode = http.StatusInternalServerError
log.G(h.Ctx).Fatal(err)
log.G(h.Ctx).Error(err)
w.WriteHeader(statusCode)
return
}
Expand All @@ -54,7 +73,7 @@ func (h *InterLinkHandler) CreateHandler(w http.ResponseWriter, r *http.Request)
bodyBytes, err = json.Marshal(retrievedData)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
log.G(h.Ctx).Fatal(err)
log.G(h.Ctx).Error(err)
return
}
log.G(h.Ctx).Debug(string(bodyBytes))
Expand All @@ -66,7 +85,7 @@ func (h *InterLinkHandler) CreateHandler(w http.ResponseWriter, r *http.Request)
if err != nil {
statusCode = http.StatusInternalServerError
w.WriteHeader(statusCode)
log.G(h.Ctx).Fatal(err)
log.G(h.Ctx).Error(err)
return
}

Expand All @@ -82,17 +101,20 @@ func (h *InterLinkHandler) CreateHandler(w http.ResponseWriter, r *http.Request)
return
}

if resp.StatusCode == http.StatusOK {
statusCode = http.StatusOK
log.G(h.Ctx).Debug(statusCode)
} else {
statusCode = http.StatusInternalServerError
log.G(h.Ctx).Error(statusCode)
if resp != nil {
if resp.StatusCode == http.StatusOK {
statusCode = http.StatusOK
log.G(h.Ctx).Debug(statusCode)
} else {
statusCode = http.StatusInternalServerError
log.G(h.Ctx).Error(statusCode)
}

returnValue, _ := io.ReadAll(resp.Body)
log.G(h.Ctx).Debug(string(returnValue))
w.WriteHeader(statusCode)
types.SetDurationSpan(start, span, types.WithHTTPReturnCode(statusCode))
w.Write(returnValue)
}

returnValue, _ := io.ReadAll(resp.Body)
log.G(h.Ctx).Debug(string(returnValue))
w.WriteHeader(statusCode)
w.Write(returnValue)
}
}
55 changes: 38 additions & 17 deletions pkg/interlink/api/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,28 @@ import (
"encoding/json"
"io"
"net/http"
"time"

"github.com/containerd/containerd/log"
v1 "k8s.io/api/core/v1"

types "github.com/intertwin-eu/interlink/pkg/interlink"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
trace "go.opentelemetry.io/otel/trace"
)

// DeleteHandler deletes the cached status for the provided Pod and forwards the request to the sidecar
func (h *InterLinkHandler) DeleteHandler(w http.ResponseWriter, r *http.Request) {
start := time.Now().UnixMicro()
tracer := otel.Tracer("interlink-API")
_, span := tracer.Start(h.Ctx, "DeleteAPI", trace.WithAttributes(
attribute.Int64("start.timestamp", start),
))
defer span.End()
defer types.SetDurationSpan(start, span)

log.G(h.Ctx).Info("InterLink: received Delete call")

bodyBytes, err := io.ReadAll(r.Body)
Expand All @@ -36,6 +49,12 @@ func (h *InterLinkHandler) DeleteHandler(w http.ResponseWriter, r *http.Request)
log.G(h.Ctx).Fatal(err)
}

span.SetAttributes(
attribute.String("pod.name", pod.Name),
attribute.String("pod.namespace", pod.Namespace),
attribute.String("pod.uid", string(pod.UID)),
)

deleteCachedStatus(string(pod.UID))
req, err = http.NewRequest(http.MethodPost, h.SidecarEndpoint+"/delete", reader)
if err != nil {
Expand All @@ -55,24 +74,26 @@ func (h *InterLinkHandler) DeleteHandler(w http.ResponseWriter, r *http.Request)
return
}

returnValue, _ := io.ReadAll(resp.Body)
statusCode = resp.StatusCode
if resp != nil {
returnValue, _ := io.ReadAll(resp.Body)
statusCode = resp.StatusCode

if statusCode != http.StatusOK {
w.WriteHeader(http.StatusInternalServerError)
} else {
w.WriteHeader(http.StatusOK)
}
log.G(h.Ctx).Debug("InterLink: " + string(returnValue))
var returnJson []types.PodStatus
returnJson = append(returnJson, types.PodStatus{PodName: pod.Name, PodUID: string(pod.UID), PodNamespace: pod.Namespace})
if statusCode != http.StatusOK {
w.WriteHeader(http.StatusInternalServerError)
} else {
w.WriteHeader(http.StatusOK)
}
log.G(h.Ctx).Debug("InterLink: " + string(returnValue))
var returnJson []types.PodStatus
returnJson = append(returnJson, types.PodStatus{PodName: pod.Name, PodUID: string(pod.UID), PodNamespace: pod.Namespace})

bodyBytes, err = json.Marshal(returnJson)
if err != nil {
log.G(h.Ctx).Error(err)
w.Write([]byte{})
} else {
w.Write(bodyBytes)
bodyBytes, err = json.Marshal(returnJson)
if err != nil {
log.G(h.Ctx).Error(err)
w.Write([]byte{})
} else {
types.SetDurationSpan(start, span, types.WithHTTPReturnCode(statusCode))
w.Write(bodyBytes)
}
}

}
Loading

0 comments on commit 24c4597

Please sign in to comment.