From ac9a4b46fce662690933c0f26c2851fe9aca304b Mon Sep 17 00:00:00 2001 From: Surax98 Date: Mon, 26 Aug 2024 10:13:30 +0000 Subject: [PATCH] implemented otel tracing for interlink API && minors Signed-off-by: Surax98 --- cmd/interlink/main.go | 83 ++++++++++++++++++++++++++++ cmd/virtual-kubelet/main.go | 37 ++----------- pkg/interlink/api/create.go | 56 +++++++++++++------ pkg/interlink/api/delete.go | 55 ++++++++++++------ pkg/interlink/api/func.go | 21 ++++++- pkg/interlink/api/logs.go | 45 ++++++++++++--- pkg/interlink/api/ping.go | 36 +++++++++--- pkg/interlink/api/status.go | 72 +++++++++++++++++------- pkg/interlink/spans.go | 32 +++++++++++ pkg/interlink/types.go | 7 +++ pkg/virtualkubelet/execute.go | 50 ++++++++--------- pkg/virtualkubelet/virtualkubelet.go | 35 ++++-------- 12 files changed, 374 insertions(+), 155 deletions(-) create mode 100644 pkg/interlink/spans.go diff --git a/cmd/interlink/main.go b/cmd/interlink/main.go index 5b25d46f..3d6aed56 100644 --- a/cmd/interlink/main.go +++ b/cmd/interlink/main.go @@ -2,20 +2,86 @@ package main import ( "context" + "crypto/tls" "flag" "fmt" "net/http" + "os" "strings" + "time" "github.com/sirupsen/logrus" "github.com/virtual-kubelet/virtual-kubelet/log" logruslogger "github.com/virtual-kubelet/virtual-kubelet/log/logrus" + "github.com/virtual-kubelet/virtual-kubelet/trace" + "github.com/virtual-kubelet/virtual-kubelet/trace/opentelemetry" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" types "github.com/intertwin-eu/interlink/pkg/interlink" "github.com/intertwin-eu/interlink/pkg/interlink/api" "github.com/intertwin-eu/interlink/pkg/virtualkubelet" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.21.0" ) +func initProvider(ctx context.Context) (func(context.Context) error, error) { + res, err := resource.New(ctx, + resource.WithAttributes( + // the service name used to display traces in backends + semconv.ServiceName("InterLink-API"), + ), + ) + if err != nil { + return nil, fmt.Errorf("failed to create resource: %w", err) + } + + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + + otlpEndpoint := os.Getenv("TELEMETRY_ENDPOINT") + + if otlpEndpoint == "" { + otlpEndpoint = "localhost:4317" + } + + fmt.Println("TELEMETRY_ENDPOINT: ", otlpEndpoint) + + conn := &grpc.ClientConn{} + creds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true}) + conn, err = grpc.DialContext(ctx, otlpEndpoint, grpc.WithTransportCredentials(creds), grpc.WithBlock()) + + if err != nil { + return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err) + } + + // Set up a trace exporter + traceExporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn)) + if err != nil { + return nil, fmt.Errorf("failed to create trace exporter: %w", err) + } + + // Register the trace exporter with a TracerProvider, using a batch + // span processor to aggregate spans before export. + bsp := sdktrace.NewBatchSpanProcessor(traceExporter) + tracerProvider := sdktrace.NewTracerProvider( + sdktrace.WithSampler(sdktrace.AlwaysSample()), + sdktrace.WithResource(res), + sdktrace.WithSpanProcessor(bsp), + ) + otel.SetTracerProvider(tracerProvider) + + // set global propagator to tracecontext (the default is no-op). + otel.SetTextMapPropagator(propagation.TraceContext{}) + + return tracerProvider.Shutdown, nil +} + func main() { printVersion := flag.Bool("version", false, "show version") flag.Parse() @@ -44,6 +110,23 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + if os.Getenv("ENABLE_TRACING") == "1" { + shutdown, err := initProvider(ctx) + if err != nil { + log.G(ctx).Fatal(err) + } + defer func() { + if err = shutdown(ctx); err != nil { + log.G(ctx).Fatal("failed to shutdown TracerProvider: %w", err) + } + }() + + log.G(ctx).Info("Tracer setup succeeded") + + // TODO: disable this through options + trace.T = opentelemetry.Adapter{} + } + log.G(ctx).Info(interLinkConfig) log.G(ctx).Info("interLink version: ", virtualkubelet.KubeletVersion) diff --git a/cmd/virtual-kubelet/main.go b/cmd/virtual-kubelet/main.go index a72168bd..ab63153d 100644 --- a/cmd/virtual-kubelet/main.go +++ b/cmd/virtual-kubelet/main.go @@ -18,10 +18,8 @@ package main import ( "context" "crypto/tls" - "crypto/x509" "flag" "fmt" - "io/ioutil" "net" "os" "path" @@ -95,13 +93,11 @@ type Opts struct { ErrorsOnly bool } -func initProvider() (func(context.Context) error, error) { - ctx := context.Background() - +func initProvider(ctx context.Context) (func(context.Context) error, error) { res, err := resource.New(ctx, resource.WithAttributes( // the service name used to display traces in backends - semconv.ServiceName("InterLink-service"), + semconv.ServiceName("InterLink-VK"), ), ) if err != nil { @@ -119,32 +115,9 @@ func initProvider() (func(context.Context) error, error) { fmt.Println("TELEMETRY_ENDPOINT: ", otlpEndpoint) - crtFilePath := os.Getenv("TELEMETRY_CRTFILEPATH") - conn := &grpc.ClientConn{} - - if crtFilePath != "" { - cert, err := ioutil.ReadFile(crtFilePath) - if err != nil { - return nil, fmt.Errorf("failed to create resource: %w", err) - } - - roots := x509.NewCertPool() - if !roots.AppendCertsFromPEM(cert) { - return nil, fmt.Errorf("failed to create resource: %w", err) - } - - creds := credentials.NewTLS(&tls.Config{ - RootCAs: roots, - }) - - conn, err = grpc.DialContext(ctx, otlpEndpoint, - grpc.WithTransportCredentials(creds), - grpc.WithBlock(), - ) - } else { - conn, err = grpc.DialContext(ctx, otlpEndpoint, grpc.WithInsecure(), grpc.WithBlock()) - } + creds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true}) + conn, err = grpc.DialContext(ctx, otlpEndpoint, grpc.WithTransportCredentials(creds), grpc.WithBlock()) if err != nil { return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err) @@ -213,7 +186,7 @@ func main() { log.L = logruslogger.FromLogrus(logrus.NewEntry(logger)) if os.Getenv("ENABLE_TRACING") == "1" { - shutdown, err := initProvider() + shutdown, err := initProvider(ctx) if err != nil { log.G(ctx).Fatal(err) } diff --git a/pkg/interlink/api/create.go b/pkg/interlink/api/create.go index 9fddda70..c3afbf9b 100644 --- a/pkg/interlink/api/create.go +++ b/pkg/interlink/api/create.go @@ -5,14 +5,27 @@ import ( "encoding/json" "io" "net/http" + "time" "github.com/containerd/containerd/log" types "github.com/intertwin-eu/interlink/pkg/interlink" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + trace "go.opentelemetry.io/otel/trace" ) // CreateHandler collects and rearranges all needed ConfigMaps/Secrets/EmptyDirs to ship them to the sidecar, then sends a response to the client func (h *InterLinkHandler) CreateHandler(w http.ResponseWriter, r *http.Request) { + start := time.Now().UnixMicro() + tracer := otel.Tracer("interlink-API") + _, span := tracer.Start(h.Ctx, "CreateAPI", trace.WithAttributes( + attribute.Int64("start.timestamp", start), + )) + defer span.End() + defer types.SetDurationSpan(start, span) + log.G(h.Ctx).Info("InterLink: received Create call") statusCode := -1 @@ -21,7 +34,7 @@ func (h *InterLinkHandler) CreateHandler(w http.ResponseWriter, r *http.Request) if err != nil { statusCode = http.StatusInternalServerError w.WriteHeader(statusCode) - log.G(h.Ctx).Fatal(err) + log.G(h.Ctx).Error(err) return } @@ -30,19 +43,25 @@ func (h *InterLinkHandler) CreateHandler(w http.ResponseWriter, r *http.Request) err = json.Unmarshal(bodyBytes, &pod) if err != nil { statusCode = http.StatusInternalServerError - log.G(h.Ctx).Fatal(err) + log.G(h.Ctx).Error(err) w.WriteHeader(statusCode) return } + span.SetAttributes( + attribute.String("pod.name", pod.Pod.Name), + attribute.String("pod.namespace", pod.Pod.Namespace), + attribute.String("pod.uid", string(pod.Pod.UID)), + ) + var retrievedData []types.RetrievedPodData data := types.RetrievedPodData{} if h.Config.ExportPodData { - data, err = getData(h.Ctx, h.Config, pod) + data, err = getData(h.Ctx, h.Config, pod, span) if err != nil { statusCode = http.StatusInternalServerError - log.G(h.Ctx).Fatal(err) + log.G(h.Ctx).Error(err) w.WriteHeader(statusCode) return } @@ -54,7 +73,7 @@ func (h *InterLinkHandler) CreateHandler(w http.ResponseWriter, r *http.Request) bodyBytes, err = json.Marshal(retrievedData) if err != nil { w.WriteHeader(http.StatusInternalServerError) - log.G(h.Ctx).Fatal(err) + log.G(h.Ctx).Error(err) return } log.G(h.Ctx).Debug(string(bodyBytes)) @@ -66,7 +85,7 @@ func (h *InterLinkHandler) CreateHandler(w http.ResponseWriter, r *http.Request) if err != nil { statusCode = http.StatusInternalServerError w.WriteHeader(statusCode) - log.G(h.Ctx).Fatal(err) + log.G(h.Ctx).Error(err) return } @@ -82,17 +101,20 @@ func (h *InterLinkHandler) CreateHandler(w http.ResponseWriter, r *http.Request) return } - if resp.StatusCode == http.StatusOK { - statusCode = http.StatusOK - log.G(h.Ctx).Debug(statusCode) - } else { - statusCode = http.StatusInternalServerError - log.G(h.Ctx).Error(statusCode) + if resp != nil { + if resp.StatusCode == http.StatusOK { + statusCode = http.StatusOK + log.G(h.Ctx).Debug(statusCode) + } else { + statusCode = http.StatusInternalServerError + log.G(h.Ctx).Error(statusCode) + } + + returnValue, _ := io.ReadAll(resp.Body) + log.G(h.Ctx).Debug(string(returnValue)) + w.WriteHeader(statusCode) + types.SetDurationSpan(start, span, types.WithHTTPReturnCode(statusCode)) + w.Write(returnValue) } - - returnValue, _ := io.ReadAll(resp.Body) - log.G(h.Ctx).Debug(string(returnValue)) - w.WriteHeader(statusCode) - w.Write(returnValue) } } diff --git a/pkg/interlink/api/delete.go b/pkg/interlink/api/delete.go index f2d1ca2d..cce503ad 100644 --- a/pkg/interlink/api/delete.go +++ b/pkg/interlink/api/delete.go @@ -5,15 +5,28 @@ import ( "encoding/json" "io" "net/http" + "time" "github.com/containerd/containerd/log" v1 "k8s.io/api/core/v1" types "github.com/intertwin-eu/interlink/pkg/interlink" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + trace "go.opentelemetry.io/otel/trace" ) // DeleteHandler deletes the cached status for the provided Pod and forwards the request to the sidecar func (h *InterLinkHandler) DeleteHandler(w http.ResponseWriter, r *http.Request) { + start := time.Now().UnixMicro() + tracer := otel.Tracer("interlink-API") + _, span := tracer.Start(h.Ctx, "DeleteAPI", trace.WithAttributes( + attribute.Int64("start.timestamp", start), + )) + defer span.End() + defer types.SetDurationSpan(start, span) + log.G(h.Ctx).Info("InterLink: received Delete call") bodyBytes, err := io.ReadAll(r.Body) @@ -36,6 +49,12 @@ func (h *InterLinkHandler) DeleteHandler(w http.ResponseWriter, r *http.Request) log.G(h.Ctx).Fatal(err) } + span.SetAttributes( + attribute.String("pod.name", pod.Name), + attribute.String("pod.namespace", pod.Namespace), + attribute.String("pod.uid", string(pod.UID)), + ) + deleteCachedStatus(string(pod.UID)) req, err = http.NewRequest(http.MethodPost, h.SidecarEndpoint+"/delete", reader) if err != nil { @@ -55,24 +74,26 @@ func (h *InterLinkHandler) DeleteHandler(w http.ResponseWriter, r *http.Request) return } - returnValue, _ := io.ReadAll(resp.Body) - statusCode = resp.StatusCode + if resp != nil { + returnValue, _ := io.ReadAll(resp.Body) + statusCode = resp.StatusCode - if statusCode != http.StatusOK { - w.WriteHeader(http.StatusInternalServerError) - } else { - w.WriteHeader(http.StatusOK) - } - log.G(h.Ctx).Debug("InterLink: " + string(returnValue)) - var returnJson []types.PodStatus - returnJson = append(returnJson, types.PodStatus{PodName: pod.Name, PodUID: string(pod.UID), PodNamespace: pod.Namespace}) + if statusCode != http.StatusOK { + w.WriteHeader(http.StatusInternalServerError) + } else { + w.WriteHeader(http.StatusOK) + } + log.G(h.Ctx).Debug("InterLink: " + string(returnValue)) + var returnJson []types.PodStatus + returnJson = append(returnJson, types.PodStatus{PodName: pod.Name, PodUID: string(pod.UID), PodNamespace: pod.Namespace}) - bodyBytes, err = json.Marshal(returnJson) - if err != nil { - log.G(h.Ctx).Error(err) - w.Write([]byte{}) - } else { - w.Write(bodyBytes) + bodyBytes, err = json.Marshal(returnJson) + if err != nil { + log.G(h.Ctx).Error(err) + w.Write([]byte{}) + } else { + types.SetDurationSpan(start, span, types.WithHTTPReturnCode(statusCode)) + w.Write(bodyBytes) + } } - } diff --git a/pkg/interlink/api/func.go b/pkg/interlink/api/func.go index 7a078d82..a9915273 100644 --- a/pkg/interlink/api/func.go +++ b/pkg/interlink/api/func.go @@ -4,8 +4,11 @@ import ( "context" "path/filepath" "sync" + "time" "github.com/containerd/containerd/log" + "go.opentelemetry.io/otel/attribute" + trace "go.opentelemetry.io/otel/trace" v1 "k8s.io/api/core/v1" types "github.com/intertwin-eu/interlink/pkg/interlink" @@ -21,12 +24,15 @@ var PodStatuses MutexStatuses // getData retrieves ConfigMaps, Secrets and EmptyDirs from the provided pod by calling the retrieveData function. // The config is needed by the retrieveData function. // The function aggregates the return values of retrieveData function in a commonIL.RetrievedPodData variable and returns it, along with the first encountered error. -func getData(ctx context.Context, config types.InterLinkConfig, pod types.PodCreateRequests) (types.RetrievedPodData, error) { +func getData(ctx context.Context, config types.InterLinkConfig, pod types.PodCreateRequests, span trace.Span) (types.RetrievedPodData, error) { + start := time.Now().UnixMicro() + span.AddEvent("Retrieving data for pod " + pod.Pod.Name) log.G(ctx).Debug(pod.ConfigMaps) var retrievedData types.RetrievedPodData retrievedData.Pod = pod.Pod for _, container := range pod.Pod.Spec.InitContainers { + startContainer := time.Now().UnixMicro() log.G(ctx).Info("- Retrieving Secrets and ConfigMaps for the Docker Sidecar. InitContainer: " + container.Name) log.G(ctx).Debug(container.VolumeMounts) data, InterlinkIP := retrieveData(ctx, config, pod, container) @@ -35,9 +41,15 @@ func getData(ctx context.Context, config types.InterLinkConfig, pod types.PodCre return types.RetrievedPodData{}, InterlinkIP } retrievedData.Containers = append(retrievedData.Containers, data) + + durationContainer := time.Now().UnixMicro() - startContainer + span.AddEvent("Init Container "+container.Name, trace.WithAttributes( + attribute.Int64("initcontainer.getdata.duration", durationContainer), + attribute.String("pod.name", pod.Pod.Name))) } for _, container := range pod.Pod.Spec.Containers { + startContainer := time.Now().UnixMicro() log.G(ctx).Info("- Retrieving Secrets and ConfigMaps for the Docker Sidecar. Container: " + container.Name) log.G(ctx).Debug(container.VolumeMounts) data, err := retrieveData(ctx, config, pod, container) @@ -46,8 +58,15 @@ func getData(ctx context.Context, config types.InterLinkConfig, pod types.PodCre return types.RetrievedPodData{}, err } retrievedData.Containers = append(retrievedData.Containers, data) + + durationContainer := time.Now().UnixMicro() - startContainer + span.AddEvent("Container "+container.Name, trace.WithAttributes( + attribute.Int64("container.getdata.duration", durationContainer), + attribute.String("pod.name", pod.Pod.Name))) } + duration := time.Now().UnixMicro() - start + span.SetAttributes(attribute.Int64("getdata.duration", duration)) return retrievedData, nil } diff --git a/pkg/interlink/api/logs.go b/pkg/interlink/api/logs.go index 9189091b..63bd6f16 100644 --- a/pkg/interlink/api/logs.go +++ b/pkg/interlink/api/logs.go @@ -7,13 +7,26 @@ import ( "io" "net/http" "strconv" + "time" "github.com/containerd/containerd/log" types "github.com/intertwin-eu/interlink/pkg/interlink" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + trace "go.opentelemetry.io/otel/trace" ) func (h *InterLinkHandler) GetLogsHandler(w http.ResponseWriter, r *http.Request) { + start := time.Now().UnixMicro() + tracer := otel.Tracer("interlink-API") + _, span := tracer.Start(h.Ctx, "GetLogsAPI", trace.WithAttributes( + attribute.Int64("start.timestamp", start), + )) + defer span.End() + defer types.SetDurationSpan(start, span) + statusCode := http.StatusOK log.G(h.Ctx).Info("InterLink: received GetLogs call") bodyBytes, err := io.ReadAll(r.Body) @@ -31,6 +44,18 @@ func (h *InterLinkHandler) GetLogsHandler(w http.ResponseWriter, r *http.Request return } + span.SetAttributes( + attribute.String("pod.name", req2.PodName), + attribute.String("pod.namespace", req2.Namespace), + attribute.Int("opts.limitbytes", req2.Opts.LimitBytes), + attribute.Int("opts.since", req2.Opts.SinceSeconds), + attribute.Int64("opts.sincetime", req2.Opts.SinceTime.UnixMicro()), + attribute.Int("opts.tail", req2.Opts.Tail), + attribute.Bool("opts.follow", req2.Opts.Follow), + attribute.Bool("opts.previous", req2.Opts.Previous), + attribute.Bool("opts.timestamps", req2.Opts.Timestamps), + ) + log.G(h.Ctx).Info("InterLink: new GetLogs podUID: now ", string(req2.PodUID)) if (req2.Opts.Tail != 0 && req2.Opts.LimitBytes != 0) || (req2.Opts.SinceSeconds != 0 && !req2.Opts.SinceTime.IsZero()) { statusCode = http.StatusInternalServerError @@ -69,14 +94,18 @@ func (h *InterLinkHandler) GetLogsHandler(w http.ResponseWriter, r *http.Request return } - if resp.StatusCode != http.StatusOK { - log.L.Error("Unexpected error occured. Status code: " + strconv.Itoa(resp.StatusCode) + ". Check Sidecar's logs for further informations") - statusCode = http.StatusInternalServerError - } + if resp != nil { + if resp.StatusCode != http.StatusOK { + log.L.Error("Unexpected error occured. Status code: " + strconv.Itoa(resp.StatusCode) + ". Check Sidecar's logs for further informations") + statusCode = http.StatusInternalServerError + } + + returnValue, _ := io.ReadAll(resp.Body) + log.G(h.Ctx).Debug("InterLink: logs " + string(returnValue)) - returnValue, _ := io.ReadAll(resp.Body) - log.G(h.Ctx).Debug("InterLink: logs " + string(returnValue)) + types.SetDurationSpan(start, span, types.WithHTTPReturnCode(statusCode)) - w.WriteHeader(statusCode) - w.Write(returnValue) + w.WriteHeader(statusCode) + w.Write(returnValue) + } } diff --git a/pkg/interlink/api/ping.go b/pkg/interlink/api/ping.go index 730717cb..c0109031 100644 --- a/pkg/interlink/api/ping.go +++ b/pkg/interlink/api/ping.go @@ -5,13 +5,27 @@ import ( "encoding/json" "net/http" "strconv" + "time" "github.com/containerd/containerd/log" + types "github.com/intertwin-eu/interlink/pkg/interlink" v1 "k8s.io/api/core/v1" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + trace "go.opentelemetry.io/otel/trace" ) // Ping is just a very basic Ping function func (h *InterLinkHandler) Ping(w http.ResponseWriter, r *http.Request) { + start := time.Now().UnixMicro() + tracer := otel.Tracer("interlink-API") + _, span := tracer.Start(h.Ctx, "PingAPI", trace.WithAttributes( + attribute.Int64("start.timestamp", start), + )) + defer span.End() + defer types.SetDurationSpan(start, span) + log.G(h.Ctx).Info("InterLink: received Ping call") podsToBeChecked := []*v1.Pod{} @@ -29,7 +43,7 @@ func (h *InterLinkHandler) Ping(w http.ResponseWriter, r *http.Request) { log.G(h.Ctx).Info("InterLink: forwarding GetStatus call to sidecar") req.Header.Set("Content-Type", "application/json") log.G(h.Ctx).Debug(req) - reqPlugin, err := http.DefaultClient.Do(req) + respPlugin, err := http.DefaultClient.Do(req) if err != nil { log.G(h.Ctx).Error(err) w.WriteHeader(http.StatusServiceUnavailable) @@ -37,13 +51,17 @@ func (h *InterLinkHandler) Ping(w http.ResponseWriter, r *http.Request) { return } - if reqPlugin.StatusCode != http.StatusOK { - log.G(h.Ctx).Error("error pinging plugin") - w.WriteHeader(reqPlugin.StatusCode) - w.Write([]byte(strconv.Itoa(http.StatusServiceUnavailable))) - return - } + if respPlugin != nil { + if respPlugin.StatusCode != http.StatusOK { + log.G(h.Ctx).Error("error pinging plugin") + w.WriteHeader(respPlugin.StatusCode) + w.Write([]byte(strconv.Itoa(http.StatusServiceUnavailable))) + return + } + + types.SetDurationSpan(start, span, types.WithHTTPReturnCode(respPlugin.StatusCode)) - w.WriteHeader(http.StatusOK) - w.Write([]byte("0")) + w.WriteHeader(http.StatusOK) + w.Write([]byte("0")) + } } diff --git a/pkg/interlink/api/status.go b/pkg/interlink/api/status.go index da8502bd..7e0c04bb 100644 --- a/pkg/interlink/api/status.go +++ b/pkg/interlink/api/status.go @@ -6,14 +6,26 @@ import ( "io" "net/http" "strconv" + "time" "github.com/containerd/containerd/log" v1 "k8s.io/api/core/v1" types "github.com/intertwin-eu/interlink/pkg/interlink" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + trace "go.opentelemetry.io/otel/trace" ) func (h *InterLinkHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { + start := time.Now().UnixMicro() + tracer := otel.Tracer("interlink-API") + _, span := tracer.Start(h.Ctx, "PingAPI", trace.WithAttributes( + attribute.Int64("start.timestamp", start), + )) + defer span.End() + defer types.SetDurationSpan(start, span) statusCode := http.StatusOK var pods []*v1.Pod log.G(h.Ctx).Info("InterLink: received GetStatus call") @@ -28,6 +40,10 @@ func (h *InterLinkHandler) StatusHandler(w http.ResponseWriter, r *http.Request) log.G(h.Ctx).Error(err) } + span.SetAttributes( + attribute.Int("pods.count", len(pods)), + ) + var podsToBeChecked []*v1.Pod var returnedStatuses []types.PodStatus //returned from the query to the sidecar var returnPods []types.PodStatus //returned to the vk @@ -36,7 +52,20 @@ func (h *InterLinkHandler) StatusHandler(w http.ResponseWriter, r *http.Request) for _, pod := range pods { cached := checkIfCached(string(pod.UID)) if pod.Status.Phase == v1.PodRunning || pod.Status.Phase == v1.PodPending || !cached { + span.AddEvent("Pod "+pod.Name+" is not cached", trace.WithAttributes( + attribute.String("pod.name", pod.Name), + attribute.String("pod.namespace", pod.Namespace), + attribute.String("pod.uid", string(pod.UID)), + attribute.String("pod.phase", string(pod.Status.Phase)), + )) podsToBeChecked = append(podsToBeChecked, pod) + } else if cached { + span.AddEvent("Pod "+pod.Name+" is cached", trace.WithAttributes( + attribute.String("pod.name", pod.Name), + attribute.String("pod.namespace", pod.Namespace), + attribute.String("pod.uid", string(pod.UID)), + attribute.String("pod.phase", string(pod.Status.Phase)), + )) } } PodStatuses.mu.Unlock() @@ -65,29 +94,32 @@ func (h *InterLinkHandler) StatusHandler(w http.ResponseWriter, r *http.Request) return } - if resp.StatusCode != http.StatusOK { - log.L.Error("Unexpected error occured. Status code: " + strconv.Itoa(resp.StatusCode) + ". Check Sidecar's logs for further informations") - statusCode = http.StatusInternalServerError - } + if resp != nil { + if resp.StatusCode != http.StatusOK { + log.L.Error("Unexpected error occured. Status code: " + strconv.Itoa(resp.StatusCode) + ". Check Sidecar's logs for further informations") + statusCode = http.StatusInternalServerError + } - bodyBytes, err = io.ReadAll(resp.Body) - if err != nil { - statusCode = http.StatusInternalServerError - w.WriteHeader(statusCode) - log.G(h.Ctx).Error(err) - return - } + bodyBytes, err = io.ReadAll(resp.Body) + if err != nil { + statusCode = http.StatusInternalServerError + w.WriteHeader(statusCode) + log.G(h.Ctx).Error(err) + return + } - log.G(h.Ctx).Debug(string(bodyBytes)) - err = json.Unmarshal(bodyBytes, &returnedStatuses) - if err != nil { - statusCode = http.StatusInternalServerError - w.WriteHeader(statusCode) - log.G(h.Ctx).Error(err) - return - } + log.G(h.Ctx).Debug(string(bodyBytes)) + err = json.Unmarshal(bodyBytes, &returnedStatuses) + if err != nil { + statusCode = http.StatusInternalServerError + w.WriteHeader(statusCode) + log.G(h.Ctx).Error(err) + return + } - updateStatuses(returnedStatuses) + updateStatuses(returnedStatuses) + types.SetDurationSpan(start, span, types.WithHTTPReturnCode(statusCode)) + } } diff --git a/pkg/interlink/spans.go b/pkg/interlink/spans.go new file mode 100644 index 00000000..73469b5b --- /dev/null +++ b/pkg/interlink/spans.go @@ -0,0 +1,32 @@ +package interlink + +import ( + "time" + + "go.opentelemetry.io/otel/attribute" + trace "go.opentelemetry.io/otel/trace" +) + +func WithHTTPReturnCode(code int) SpanOption { + return func(cfg *SpanConfig) { + cfg.HTTPReturnCode = code + cfg.SetHTTPCode = true + } +} + +func SetDurationSpan(startTime int64, span trace.Span, opts ...SpanOption) { + endTime := time.Now().UnixMicro() + config := &SpanConfig{} + + for _, opt := range opts { + opt(config) + } + + duration := endTime - startTime + span.SetAttributes(attribute.Int64("end.timestamp", endTime), + attribute.Int64("duration", duration)) + + if config.SetHTTPCode { + span.SetAttributes(attribute.Int("exit.code", config.HTTPReturnCode)) + } +} diff --git a/pkg/interlink/types.go b/pkg/interlink/types.go index 70729054..6ba742f4 100644 --- a/pkg/interlink/types.go +++ b/pkg/interlink/types.go @@ -61,3 +61,10 @@ type LogStruct struct { ContainerName string `json:"ContainerName"` Opts ContainerLogOpts `json:"Opts"` } + +type SpanConfig struct { + HTTPReturnCode int + SetHTTPCode bool +} + +type SpanOption func(*SpanConfig) diff --git a/pkg/virtualkubelet/execute.go b/pkg/virtualkubelet/execute.go index 75f2b5ed..cca1db40 100644 --- a/pkg/virtualkubelet/execute.go +++ b/pkg/virtualkubelet/execute.go @@ -69,8 +69,9 @@ func PingInterLink(ctx context.Context, config VirtualKubeletConfig) (bool, int, attribute.Int64("start.timestamp", startHttpCall), )) defer spanHttp.End() + defer types.SetDurationSpan(startHttpCall, spanHttp) + resp, err := http.DefaultClient.Do(req) - setDurationSpan(startHttpCall, spanHttp) if err != nil { spanHttp.SetAttributes(attribute.Int("exit.code", http.StatusInternalServerError)) @@ -78,7 +79,7 @@ func PingInterLink(ctx context.Context, config VirtualKubeletConfig) (bool, int, } if resp != nil { - spanHttp.SetAttributes(attribute.Int("exit.code", resp.StatusCode)) + types.SetDurationSpan(startHttpCall, spanHttp, types.WithHTTPReturnCode(resp.StatusCode)) retBytes, err := io.ReadAll(resp.Body) if err != nil { log.G(ctx).Error(err) @@ -95,6 +96,7 @@ func PingInterLink(ctx context.Context, config VirtualKubeletConfig) (bool, int, return false, retVal, nil } } + return true, retVal, nil } @@ -126,17 +128,16 @@ func updateCacheRequest(ctx context.Context, config VirtualKubeletConfig, pod v1 attribute.Int64("start.timestamp", startHttpCall), )) defer spanHttp.End() + defer types.SetDurationSpan(startHttpCall, spanHttp) + resp, err := http.DefaultClient.Do(req) - setDurationSpan(startHttpCall, spanHttp) if err != nil { log.L.Error(err) return err } if resp != nil { - statusCode := resp.StatusCode - spanHttp.SetAttributes(attribute.Int("exit.code", resp.StatusCode)) - - if statusCode != http.StatusOK { + types.SetDurationSpan(startHttpCall, spanHttp, types.WithHTTPReturnCode(resp.StatusCode)) + if resp.StatusCode != http.StatusOK { return errors.New("Unexpected error occured while updating InterLink cache. Status code: " + strconv.Itoa(resp.StatusCode) + ". Check InterLink's logs for further informations") } } @@ -171,18 +172,18 @@ func createRequest(ctx context.Context, config VirtualKubeletConfig, pod types.P attribute.Int64("start.timestamp", startHttpCall), )) defer spanHttp.End() + defer types.SetDurationSpan(startHttpCall, spanHttp) + resp, err := doRequest(req, token) - setDurationSpan(startHttpCall, spanHttp) if err != nil { log.L.Error(err) return nil, err } if resp != nil { - statusCode := resp.StatusCode - spanHttp.SetAttributes(attribute.Int("exit.code", resp.StatusCode)) + types.SetDurationSpan(startHttpCall, spanHttp, types.WithHTTPReturnCode(resp.StatusCode)) - if statusCode != http.StatusOK { + if resp.StatusCode != http.StatusOK { return nil, errors.New("Unexpected error occured while creating Pods. Status code: " + strconv.Itoa(resp.StatusCode) + ". Check InterLink's logs for further informations") } else { returnValue, err = io.ReadAll(resp.Body) @@ -222,8 +223,9 @@ func deleteRequest(ctx context.Context, config VirtualKubeletConfig, pod *v1.Pod attribute.Int64("start.timestamp", startHttpCall), )) defer spanHttp.End() + defer types.SetDurationSpan(startHttpCall, spanHttp) + resp, err := doRequest(req, token) - setDurationSpan(startHttpCall, spanHttp) if err != nil { log.G(context.Background()).Error(err) return nil, err @@ -231,8 +233,7 @@ func deleteRequest(ctx context.Context, config VirtualKubeletConfig, pod *v1.Pod if resp != nil { statusCode := resp.StatusCode - spanHttp.SetAttributes(attribute.Int("exit.code", resp.StatusCode)) - + types.SetDurationSpan(startHttpCall, spanHttp, types.WithHTTPReturnCode(resp.StatusCode)) if statusCode != http.StatusOK { return nil, errors.New("Unexpected error occured while deleting Pods. Status code: " + strconv.Itoa(resp.StatusCode) + ". Check InterLink's logs for further informations") } else { @@ -250,6 +251,7 @@ func deleteRequest(ctx context.Context, config VirtualKubeletConfig, pod *v1.Pod } } } + return returnValue, nil } @@ -280,14 +282,15 @@ func statusRequest(ctx context.Context, config VirtualKubeletConfig, podsList [] attribute.Int64("start.timestamp", startHttpCall), )) defer spanHttp.End() + defer types.SetDurationSpan(startHttpCall, spanHttp) + resp, err := doRequest(req, token) - setDurationSpan(startHttpCall, spanHttp) if err != nil { return nil, err } if resp != nil { - spanHttp.SetAttributes(attribute.Int("exit.code", resp.StatusCode)) + types.SetDurationSpan(startHttpCall, spanHttp, types.WithHTTPReturnCode(resp.StatusCode)) if resp.StatusCode != http.StatusOK { return nil, errors.New("Unexpected error occured while getting status. Status code: " + strconv.Itoa(resp.StatusCode) + ". Check InterLink's logs for further informations") } else { @@ -337,21 +340,23 @@ func LogRetrieval(ctx context.Context, config VirtualKubeletConfig, logsRequest attribute.Int64("start.timestamp", startHttpCall), )) defer spanHttp.End() + defer types.SetDurationSpan(startHttpCall, spanHttp) + resp, err := doRequest(req, token) - setDurationSpan(startHttpCall, spanHttp) if err != nil { log.G(ctx).Error(err) return nil, err } if resp != nil { - spanHttp.SetAttributes(attribute.Int("exit.code", resp.StatusCode)) + types.SetDurationSpan(startHttpCall, spanHttp, types.WithHTTPReturnCode(resp.StatusCode)) if resp.StatusCode != http.StatusOK { err = errors.New("Unexpected error occured while getting logs. Status code: " + strconv.Itoa(resp.StatusCode) + ". Check InterLink's logs for further informations") } else { returnValue = resp.Body } } + return returnValue, err } @@ -427,7 +432,7 @@ func RemoteExecution(ctx context.Context, config VirtualKubeletConfig, p *Virtua } else { pod.Status.Phase = v1.PodFailed pod.Status.Reason = "CFGMaps/Secrets not found" - for i, _ := range pod.Status.ContainerStatuses { + for i := range pod.Status.ContainerStatuses { pod.Status.ContainerStatuses[i].Ready = false } p.UpdatePod(ctx, pod) @@ -607,10 +612,3 @@ func checkPodsStatus(ctx context.Context, p *VirtualKubeletProvider, podsList [] return nil, err } - -func setDurationSpan(startTime int64, span trace.Span) { - endTime := time.Now().UnixMicro() - duration := endTime - startTime - span.SetAttributes(attribute.Int64("end.timestamp", endTime)) - span.SetAttributes(attribute.Int64("duration", duration)) -} diff --git a/pkg/virtualkubelet/virtualkubelet.go b/pkg/virtualkubelet/virtualkubelet.go index 1285f70a..159ac9a7 100644 --- a/pkg/virtualkubelet/virtualkubelet.go +++ b/pkg/virtualkubelet/virtualkubelet.go @@ -356,14 +356,13 @@ func (p *VirtualKubeletProvider) CreatePod(ctx context.Context, pod *v1.Pod) err attribute.Int64("start.timestamp", start), )) defer span.End() + defer types.SetDurationSpan(start, span) var hasInitContainers = false var state v1.ContainerState - defer span.End() key, err := buildKey(pod) if err != nil { - setDurationSpan(start, span) return err } now := metav1.NewTime(time.Now()) @@ -436,7 +435,6 @@ func (p *VirtualKubeletProvider) CreatePod(ctx context.Context, pod *v1.Pod) err err := RemoteExecution(ctx, p.config, p, pod, CREATE) if err != nil { if err.Error() == "Deleted pod before actual creation" { - setDurationSpan(start, span) log.G(ctx).Warn(err) } else { // TODO if node in NotReady put it to Unknown/pending? @@ -465,7 +463,6 @@ func (p *VirtualKubeletProvider) CreatePod(ctx context.Context, pod *v1.Pod) err p.UpdatePod(ctx, pod) } - setDurationSpan(start, span) return } }() @@ -485,8 +482,6 @@ func (p *VirtualKubeletProvider) CreatePod(ctx context.Context, pod *v1.Pod) err p.pods[key] = pod - setDurationSpan(start, span) - return nil } @@ -500,13 +495,12 @@ func (p *VirtualKubeletProvider) UpdatePod(ctx context.Context, pod *v1.Pod) err attribute.Int64("start.timestamp", start), )) defer span.End() + defer types.SetDurationSpan(start, span) log.G(ctx).Infof("receive UpdatePod %q", pod.Name) p.notifier(pod) - setDurationSpan(start, span) - return nil } @@ -520,17 +514,16 @@ func (p *VirtualKubeletProvider) DeletePod(ctx context.Context, pod *v1.Pod) (er attribute.Int64("start.timestamp", start), )) defer span.End() + defer types.SetDurationSpan(start, span) log.G(ctx).Infof("receive DeletePod %q", pod.Name) key, err := buildKey(pod) if err != nil { - setDurationSpan(start, span) return err } if _, exists := p.pods[key]; !exists { - setDurationSpan(start, span) return errdefs.NotFound("pod not found") } @@ -541,7 +534,6 @@ func (p *VirtualKubeletProvider) DeletePod(ctx context.Context, pod *v1.Pod) (er err = RemoteExecution(ctx, p.config, p, pod, DELETE) if err != nil { log.G(ctx).Error(err) - setDurationSpan(start, span) return } }() @@ -573,8 +565,6 @@ func (p *VirtualKubeletProvider) DeletePod(ctx context.Context, pod *v1.Pod) (er // delete from p.pods delete(p.pods, key) - setDurationSpan(start, span) - return nil } @@ -588,22 +578,19 @@ func (p *VirtualKubeletProvider) GetPod(ctx context.Context, namespace, name str attribute.Int64("start.timestamp", start), )) defer span.End() + defer types.SetDurationSpan(start, span) log.G(ctx).Infof("receive GetPod %q", name) key, err := buildKeyFromNames(namespace, name) if err != nil { - setDurationSpan(start, span) return nil, err } if pod, ok := p.pods[key]; ok { - setDurationSpan(start, span) return pod, nil } - setDurationSpan(start, span) - return nil, errdefs.NotFoundf("pod \"%s/%s\" is not known to the provider", namespace, name) } @@ -618,17 +605,15 @@ func (p *VirtualKubeletProvider) GetPodStatus(ctx context.Context, namespace, na attribute.Int64("start.timestamp", start), )) defer span.End() + defer types.SetDurationSpan(start, span) log.G(ctx).Infof("receive GetPodStatus %q", name) pod, err := p.GetPod(ctx, namespace, name) if err != nil { - setDurationSpan(start, span) return nil, err } - setDurationSpan(start, span) - return &pod.Status, nil } @@ -640,6 +625,7 @@ func (p *VirtualKubeletProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) attribute.Int64("start.timestamp", start), )) defer span.End() + defer types.SetDurationSpan(start, span) log.G(ctx).Info("receive GetPods") @@ -652,7 +638,6 @@ func (p *VirtualKubeletProvider) GetPods(ctx context.Context) ([]*v1.Pod, error) pods = append(pods, pod) } - setDurationSpan(start, span) return pods, nil } @@ -776,6 +761,7 @@ func (p *VirtualKubeletProvider) GetLogs(ctx context.Context, namespace, podName attribute.Int64("start.timestamp", start), )) defer span.End() + defer types.SetDurationSpan(start, span) log.G(ctx).Infof("receive GetPodLogs %q", podName) @@ -792,7 +778,6 @@ func (p *VirtualKubeletProvider) GetLogs(ctx context.Context, namespace, podName Opts: types.ContainerLogOpts(opts), } - setDurationSpan(start, span) return LogRetrieval(ctx, p.config, logsRequest) } @@ -804,6 +789,7 @@ func (p *VirtualKubeletProvider) GetStatsSummary(ctx context.Context) (*stats.Su attribute.Int64("start.timestamp", start), )) defer span.End() + defer types.SetDurationSpan(start, span) // Grab the current timestamp so we can report it as the time the stats were generated. time := metav1.NewTime(time.Now()) @@ -874,7 +860,6 @@ func (p *VirtualKubeletProvider) GetStatsSummary(ctx context.Context) (*stats.Su } // Return the dummy stats. - setDurationSpan(start, span) return res, nil } @@ -887,6 +872,7 @@ func (p *VirtualKubeletProvider) RetrievePodsFromCluster(ctx context.Context) er attribute.Int64("start.timestamp", start), )) defer span.End() + defer types.SetDurationSpan(start, span) log.G(ctx).Info("Retrieving ALL Pods registered to the cluster and owned by VK") @@ -915,7 +901,6 @@ func (p *VirtualKubeletProvider) RetrievePodsFromCluster(ctx context.Context) er } - setDurationSpan(start, span) return err } @@ -937,6 +922,7 @@ func (p *VirtualKubeletProvider) initClientSet(ctx context.Context) error { attribute.Int64("start.timestamp", start), )) defer span.End() + defer types.SetDurationSpan(start, span) if p.clientSet == nil { kubeconfig := os.Getenv("KUBECONFIG") @@ -954,6 +940,5 @@ func (p *VirtualKubeletProvider) initClientSet(ctx context.Context) error { } } - setDurationSpan(start, span) return nil }