diff --git a/cmd/main.go b/cmd/main.go index cad5467..c21d421 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -2,16 +2,83 @@ package main import ( "context" + "crypto/tls" + "fmt" "net/http" + "os" "strconv" + "time" "github.com/sirupsen/logrus" "github.com/virtual-kubelet/virtual-kubelet/log" logruslogger "github.com/virtual-kubelet/virtual-kubelet/log/logrus" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" slurm "github.com/intertwin-eu/interlink-slurm-plugin/pkg/slurm" + + "github.com/virtual-kubelet/virtual-kubelet/trace" + "github.com/virtual-kubelet/virtual-kubelet/trace/opentelemetry" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.21.0" ) +func initProvider(ctx context.Context) (func(context.Context) error, error) { + res, err := resource.New(ctx, + resource.WithAttributes( + // the service name used to display traces in backends + semconv.ServiceName("InterLink-SLURM-plugin"), + ), + ) + if err != nil { + return nil, fmt.Errorf("failed to create resource: %w", err) + } + + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + + otlpEndpoint := os.Getenv("TELEMETRY_ENDPOINT") + + if otlpEndpoint == "" { + otlpEndpoint = "localhost:4317" + } + + fmt.Println("TELEMETRY_ENDPOINT: ", otlpEndpoint) + + conn := &grpc.ClientConn{} + creds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true}) + conn, err = grpc.DialContext(ctx, otlpEndpoint, grpc.WithTransportCredentials(creds), grpc.WithBlock()) + + if err != nil { + return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err) + } + + // Set up a trace exporter + traceExporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn)) + if err != nil { + return nil, fmt.Errorf("failed to create trace exporter: %w", err) + } + 
+ // Register the trace exporter with a TracerProvider, using a batch + // span processor to aggregate spans before export. + bsp := sdktrace.NewBatchSpanProcessor(traceExporter) + tracerProvider := sdktrace.NewTracerProvider( + sdktrace.WithSampler(sdktrace.AlwaysSample()), + sdktrace.WithResource(res), + sdktrace.WithSpanProcessor(bsp), + ) + otel.SetTracerProvider(tracerProvider) + + // set global propagator to tracecontext (the default is no-op). + otel.SetTextMapPropagator(propagation.TraceContext{}) + + return tracerProvider.Shutdown, nil +} + func main() { logger := logrus.StandardLogger() @@ -31,14 +98,32 @@ func main() { log.L = logruslogger.FromLogrus(logrus.NewEntry(logger)) JobIDs := make(map[string]*slurm.JidStruct) - Ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - log.G(Ctx).Debug("Debug level: " + strconv.FormatBool(slurmConfig.VerboseLogging)) + + if os.Getenv("ENABLE_TRACING") == "1" { + shutdown, err := initProvider(ctx) + if err != nil { + log.G(ctx).Fatal(err) + } + defer func() { + if err = shutdown(ctx); err != nil { + log.G(ctx).Fatalf("failed to shutdown TracerProvider: %v", err) + } + }() + + log.G(ctx).Info("Tracer setup succeeded") + + // TODO: disable this through options + trace.T = opentelemetry.Adapter{} + } + + log.G(ctx).Debug("Debug level: " + strconv.FormatBool(slurmConfig.VerboseLogging)) SidecarAPIs := slurm.SidecarHandler{ Config: slurmConfig, JIDs: &JobIDs, - Ctx: Ctx, + Ctx: ctx, } mutex := http.NewServeMux() @@ -47,11 +132,11 @@ func main() { mutex.HandleFunc("/delete", SidecarAPIs.StopHandler) mutex.HandleFunc("/getLogs", SidecarAPIs.GetLogsHandler) - slurm.CreateDirectories(slurmConfig) - slurm.LoadJIDs(Ctx, slurmConfig, &JobIDs) + SidecarAPIs.CreateDirectories() + SidecarAPIs.LoadJIDs() err = http.ListenAndServe(":"+slurmConfig.Sidecarport, mutex) if err != nil { - log.G(Ctx).Fatal(err) + log.G(ctx).Fatal(err) } } diff --git 
a/pkg/slurm/Create.go b/pkg/slurm/Create.go index 50279d7..a55881b 100644 --- a/pkg/slurm/Create.go +++ b/pkg/slurm/Create.go @@ -6,22 +6,36 @@ import ( "io" "net/http" "os" + "strconv" "strings" + "time" "github.com/containerd/containerd/log" commonIL "github.com/intertwin-eu/interlink/pkg/interlink" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + trace "go.opentelemetry.io/otel/trace" ) // SubmitHandler generates and submits a SLURM batch script according to provided data. // 1 Pod = 1 Job. If a Pod has multiple containers, every container is a line with it's parameters in the SLURM script. func (h *SidecarHandler) SubmitHandler(w http.ResponseWriter, r *http.Request) { + start := time.Now().UnixMicro() + tracer := otel.Tracer("interlink-API") + spanCtx, span := tracer.Start(h.Ctx, "CreateSLURM", trace.WithAttributes( + attribute.Int64("start.timestamp", start), + )) + defer span.End() + defer commonIL.SetDurationSpan(start, span) + log.G(h.Ctx).Info("Slurm Sidecar: received Submit call") statusCode := http.StatusOK bodyBytes, err := io.ReadAll(r.Body) if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } @@ -34,9 +48,7 @@ func (h *SidecarHandler) SubmitHandler(w http.ResponseWriter, r *http.Request) { err = json.Unmarshal(bodyBytes, &dataList) if err != nil { statusCode = http.StatusInternalServerError - w.WriteHeader(statusCode) - w.Write([]byte("Some errors occurred while creating container. 
Check Slurm Sidecar's logs")) - log.G(h.Ctx).Error(err) + h.handleError(spanCtx, w, http.StatusGatewayTimeout, err) return } @@ -69,7 +81,7 @@ func (h *SidecarHandler) SubmitHandler(w http.ResponseWriter, r *http.Request) { commstr1 := []string{"singularity", "exec", "--containall", "--nv", singularityMounts, singularityOptions} - envs := prepareEnvs(h.Ctx, container) + envs := prepareEnvs(spanCtx, container) image := "" CPULimit, _ := container.Resources.Limits.Cpu().AsInt64() @@ -87,13 +99,11 @@ func (h *SidecarHandler) SubmitHandler(w http.ResponseWriter, r *http.Request) { resourceLimits.Memory += MemoryLimit } - mounts, err := prepareMounts(h.Ctx, h.Config, data, container, filesPath) + mounts, err := prepareMounts(spanCtx, h.Config, data, container, filesPath) log.G(h.Ctx).Debug(mounts) if err != nil { statusCode = http.StatusInternalServerError - w.WriteHeader(statusCode) - w.Write([]byte("Error prepairing mounts. Check Slurm Sidecar's logs")) - log.G(h.Ctx).Error(err) + h.handleError(spanCtx, w, http.StatusGatewayTimeout, err) os.RemoveAll(filesPath) return } @@ -120,10 +130,24 @@ func (h *SidecarHandler) SubmitHandler(w http.ResponseWriter, r *http.Request) { isInit = true } + span.SetAttributes( + attribute.String("job.container"+strconv.Itoa(i)+".name", container.Name), + attribute.Bool("job.container"+strconv.Itoa(i)+".isinit", isInit), + attribute.StringSlice("job.container"+strconv.Itoa(i)+".envs", envs), + attribute.String("job.container"+strconv.Itoa(i)+".image", image), + attribute.StringSlice("job.container"+strconv.Itoa(i)+".command", container.Command), + attribute.StringSlice("job.container"+strconv.Itoa(i)+".args", container.Args), + ) + singularity_command_pod = append(singularity_command_pod, SingularityCommand{singularityCommand: singularity_command, containerName: container.Name, containerArgs: container.Args, containerCommand: container.Command, isInitContainer: isInit}) } - path, err := produceSLURMScript(h.Ctx, h.Config, 
string(data.Pod.UID), filesPath, metadata, singularity_command_pod, resourceLimits) + span.SetAttributes( + attribute.Int64("job.limits.cpu", resourceLimits.CPU), + attribute.Int64("job.limits.memory", resourceLimits.Memory), + ) + + path, err := produceSLURMScript(spanCtx, h.Config, string(data.Pod.UID), filesPath, metadata, singularity_command_pod, resourceLimits) if err != nil { log.G(h.Ctx).Error(err) os.RemoveAll(filesPath) @@ -131,10 +155,9 @@ func (h *SidecarHandler) SubmitHandler(w http.ResponseWriter, r *http.Request) { } out, err := SLURMBatchSubmit(h.Ctx, h.Config, path) if err != nil { + span.AddEvent("Failed to submit the SLURM Job") statusCode = http.StatusInternalServerError - w.WriteHeader(statusCode) - w.Write([]byte("Error submitting Slurm script. Check Slurm Sidecar's logs")) - log.G(h.Ctx).Error(err) + h.handleError(spanCtx, w, http.StatusGatewayTimeout, err) os.RemoveAll(filesPath) return } @@ -142,28 +165,29 @@ func (h *SidecarHandler) SubmitHandler(w http.ResponseWriter, r *http.Request) { jid, err := handleJidAndPodUid(h.Ctx, data.Pod, h.JIDs, out, filesPath) if err != nil { statusCode = http.StatusInternalServerError - w.WriteHeader(statusCode) - w.Write([]byte("Error handling JID. 
Check Slurm Sidecar's logs")) - log.G(h.Ctx).Error(err) + h.handleError(spanCtx, w, http.StatusGatewayTimeout, err) os.RemoveAll(filesPath) - err = deleteContainer(h.Ctx, h.Config, string(data.Pod.UID), h.JIDs, filesPath) + err = deleteContainer(spanCtx, h.Config, string(data.Pod.UID), h.JIDs, filesPath) if err != nil { log.G(h.Ctx).Error(err) } return } + span.AddEvent("SLURM Job successfully submitted with ID " + jid) returnedJID = CreateStruct{PodUID: string(data.Pod.UID), PodJID: jid} returnedJIDBytes, err = json.Marshal(returnedJID) if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } w.WriteHeader(statusCode) + commonIL.SetDurationSpan(start, span, commonIL.WithHTTPReturnCode(statusCode)) + if statusCode != http.StatusOK { w.Write([]byte("Some errors occurred while creating containers. Check Slurm Sidecar's logs")) } else { diff --git a/pkg/slurm/Delete.go b/pkg/slurm/Delete.go index f0a2b3c..4be046c 100644 --- a/pkg/slurm/Delete.go +++ b/pkg/slurm/Delete.go @@ -5,20 +5,34 @@ import ( "io" "net/http" "os" + "time" "github.com/containerd/containerd/log" + commonIL "github.com/intertwin-eu/interlink/pkg/interlink" v1 "k8s.io/api/core/v1" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + trace "go.opentelemetry.io/otel/trace" ) // StopHandler runs a scancel command, updating JIDs and cached statuses func (h *SidecarHandler) StopHandler(w http.ResponseWriter, r *http.Request) { + start := time.Now().UnixMicro() + tracer := otel.Tracer("interlink-API") + spanCtx, span := tracer.Start(h.Ctx, "DeleteSLURM", trace.WithAttributes( + attribute.Int64("start.timestamp", start), + )) + defer span.End() + defer commonIL.SetDurationSpan(start, span) + log.G(h.Ctx).Info("Slurm Sidecar: received Stop call") statusCode := http.StatusOK bodyBytes, err := io.ReadAll(r.Body) if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, 
statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } @@ -26,32 +40,34 @@ func (h *SidecarHandler) StopHandler(w http.ResponseWriter, r *http.Request) { err = json.Unmarshal(bodyBytes, &pod) if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } filesPath := h.Config.DataRootFolder + pod.Namespace + "-" + string(pod.UID) - err = deleteContainer(h.Ctx, h.Config, string(pod.UID), h.JIDs, filesPath) + err = deleteContainer(spanCtx, h.Config, string(pod.UID), h.JIDs, filesPath) + if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } if os.Getenv("SHARED_FS") != "true" { err = os.RemoveAll(filesPath) if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } } + commonIL.SetDurationSpan(start, span, commonIL.WithHTTPReturnCode(statusCode)) + w.WriteHeader(statusCode) if statusCode != http.StatusOK { w.Write([]byte("Some errors occurred deleting containers. Check Slurm Sidecar's logs")) } else { - w.Write([]byte("All containers for submitted Pods have been deleted")) } } diff --git a/pkg/slurm/GetLogs.go b/pkg/slurm/GetLogs.go index 7a65520..dc92a50 100644 --- a/pkg/slurm/GetLogs.go +++ b/pkg/slurm/GetLogs.go @@ -11,11 +11,23 @@ import ( "github.com/containerd/containerd/log" commonIL "github.com/intertwin-eu/interlink/pkg/interlink" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + trace "go.opentelemetry.io/otel/trace" ) // GetLogsHandler reads Jobs' output file to return what's logged inside. 
// What's returned is based on the provided parameters (Tail/LimitBytes/Timestamps/etc) func (h *SidecarHandler) GetLogsHandler(w http.ResponseWriter, r *http.Request) { + start := time.Now().UnixMicro() + tracer := otel.Tracer("interlink-API") + spanCtx, span := tracer.Start(h.Ctx, "GetLogsSLURM", trace.WithAttributes( + attribute.Int64("start.timestamp", start), + )) + defer span.End() + defer commonIL.SetDurationSpan(start, span) + log.G(h.Ctx).Info("Docker Sidecar: received GetLogs call") var req commonIL.LogStruct statusCode := http.StatusOK @@ -24,21 +36,33 @@ func (h *SidecarHandler) GetLogsHandler(w http.ResponseWriter, r *http.Request) bodyBytes, err := io.ReadAll(r.Body) if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } err = json.Unmarshal(bodyBytes, &req) if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } + span.SetAttributes( + attribute.String("pod.name", req.PodName), + attribute.String("pod.namespace", req.Namespace), + attribute.Int("opts.limitbytes", req.Opts.LimitBytes), + attribute.Int("opts.since", req.Opts.SinceSeconds), + attribute.Int64("opts.sincetime", req.Opts.SinceTime.UnixMicro()), + attribute.Int("opts.tail", req.Opts.Tail), + attribute.Bool("opts.follow", req.Opts.Follow), + attribute.Bool("opts.previous", req.Opts.Previous), + attribute.Bool("opts.timestamps", req.Opts.Timestamps), + ) + path := h.Config.DataRootFolder + req.Namespace + "-" + req.PodUID var output []byte if req.Opts.Timestamps { - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } else { log.G(h.Ctx).Info("Reading " + path + "/" + req.ContainerName + ".out") @@ -52,7 +76,8 @@ func (h *SidecarHandler) GetLogsHandler(w http.ResponseWriter, r *http.Request) } if err1 != nil && err2 != nil { - h.handleError(w, statusCode, err) + 
span.AddEvent("Error retrieving logs") + h.handleError(spanCtx, w, statusCode, err) return } @@ -115,6 +140,8 @@ func (h *SidecarHandler) GetLogsHandler(w http.ResponseWriter, r *http.Request) } } + commonIL.SetDurationSpan(start, span, commonIL.WithHTTPReturnCode(statusCode)) + if statusCode != http.StatusOK { w.Write([]byte("Some errors occurred while checking container status. Check Docker Sidecar's logs")) } else { diff --git a/pkg/slurm/Status.go b/pkg/slurm/Status.go index f29dea1..f8be0d9 100644 --- a/pkg/slurm/Status.go +++ b/pkg/slurm/Status.go @@ -18,10 +18,22 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" commonIL "github.com/intertwin-eu/interlink/pkg/interlink" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + trace "go.opentelemetry.io/otel/trace" ) // StatusHandler performs a squeue --me and uses regular expressions to get the running Jobs' status func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { + start := time.Now().UnixMicro() + tracer := otel.Tracer("interlink-API") + spanCtx, span := tracer.Start(h.Ctx, "StatusSLURM", trace.WithAttributes( + attribute.Int64("start.timestamp", start), + )) + defer span.End() + defer commonIL.SetDurationSpan(start, span) + var req []*v1.Pod var resp []commonIL.PodStatus statusCode := http.StatusOK @@ -31,14 +43,14 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { bodyBytes, err := io.ReadAll(r.Body) if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } err = json.Unmarshal(bodyBytes, &req) if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } @@ -54,7 +66,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { if execReturn.Stderr != "" { statusCode = http.StatusInternalServerError - 
h.handleError(w, statusCode, errors.New("unable to retrieve job status: "+execReturn.Stderr)) + h.handleError(spanCtx, w, statusCode, errors.New("unable to retrieve job status: "+execReturn.Stderr)) return } @@ -63,7 +75,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { uid := string(pod.UID) path := h.Config.DataRootFolder + pod.Namespace + "-" + string(pod.UID) - if checkIfJidExists((h.JIDs), uid) { + if checkIfJidExists(spanCtx, (h.JIDs), uid) { cmd := []string{"--noheader", "-a", "-j " + (*h.JIDs)[uid].JID} shell := exec.ExecTask{ Command: h.Config.Squeuepath, @@ -76,13 +88,14 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { //log.G(h.Ctx).Info("Pod: " + jid.PodUID + " | JID: " + jid.JID) if execReturn.Stderr != "" { + span.AddEvent("squeue returned error " + execReturn.Stderr + " for Job " + (*h.JIDs)[uid].JID + ".\nGetting status from files") log.G(h.Ctx).Error("ERR: ", execReturn.Stderr) for _, ct := range pod.Spec.Containers { log.G(h.Ctx).Info("Getting exit status from " + path + "/" + ct.Name + ".status") file, err := os.Open(path + "/" + ct.Name + ".status") if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, fmt.Errorf("unable to retrieve container status: %s", err)) + h.handleError(spanCtx, w, statusCode, fmt.Errorf("unable to retrieve container status: %s", err)) log.G(h.Ctx).Error() return } @@ -90,7 +103,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { statusb, err := io.ReadAll(file) if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, fmt.Errorf("unable to read container status: %s", err)) + h.handleError(spanCtx, w, statusCode, fmt.Errorf("unable to read container status: %s", err)) log.G(h.Ctx).Error() return } @@ -98,7 +111,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { status, err := 
strconv.Atoi(strings.Replace(string(statusb), "\n", "", -1)) if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, fmt.Errorf("unable to convert container status: %s", err)) + h.handleError(spanCtx, w, statusCode, fmt.Errorf("unable to convert container status: %s", err)) log.G(h.Ctx).Error() status = 500 } @@ -133,7 +146,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { f, err := os.Create(path + "/FinishedAt.time") if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } f.WriteString((*h.JIDs)[uid].EndTime.Format("2006-01-02 15:04:05.999999999 -0700 MST")) @@ -154,7 +167,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { f, err := os.Create(path + "/StartedAt.time") if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } f.WriteString((*h.JIDs)[uid].StartTime.Format("2006-01-02 15:04:05.999999999 -0700 MST")) @@ -170,7 +183,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { f, err := os.Create(path + "/FinishedAt.time") if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } f.WriteString((*h.JIDs)[uid].EndTime.Format("2006-01-02 15:04:05.999999999 -0700 MST")) @@ -197,7 +210,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { f, err := os.Create(path + "/FinishedAt.time") if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } f.WriteString((*h.JIDs)[uid].EndTime.Format("2006-01-02 15:04:05.999999999 -0700 MST")) @@ -218,7 +231,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { f, err 
:= os.Create(path + "/StartedAt.time") if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } f.WriteString((*h.JIDs)[uid].StartTime.Format("2006-01-02 15:04:05.999999999 -0700 MST")) @@ -240,7 +253,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { f, err := os.Create(path + "/FinishedAt.time") if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } f.WriteString((*h.JIDs)[uid].EndTime.Format("2006-01-02 15:04:05.999999999 -0700 MST")) @@ -261,7 +274,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { f, err := os.Create(path + "/FinishedAt.time") if err != nil { statusCode = http.StatusInternalServerError - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } f.WriteString((*h.JIDs)[uid].EndTime.Format("2006-01-02 15:04:05.999999999 -0700 MST")) @@ -302,7 +315,7 @@ func (h *SidecarHandler) StatusHandler(w http.ResponseWriter, r *http.Request) { } else { bodyBytes, err := json.Marshal(resp) if err != nil { - h.handleError(w, statusCode, err) + h.handleError(spanCtx, w, statusCode, err) return } w.Write(bodyBytes) diff --git a/pkg/slurm/func.go b/pkg/slurm/func.go index e4af7f0..34097c6 100644 --- a/pkg/slurm/func.go +++ b/pkg/slurm/func.go @@ -7,6 +7,7 @@ import ( "net/http" "os" + "go.opentelemetry.io/otel/trace" "k8s.io/client-go/kubernetes" "github.com/containerd/containerd/log" @@ -44,7 +45,7 @@ func NewSlurmConfig() (SlurmConfig, error) { } if _, err := os.Stat(path); err != nil { - log.G(context.Background()).Error("File " + path + " doesn't exist. You can set a custom path by exporting INTERLINKCONFIGPATH. Exiting...") + log.G(context.Background()).Error("File " + path + " doesn't exist. You can set a custom path by exporting SLURMCONFIGPATH. 
Exiting...") return SlurmConfig{}, err } @@ -95,7 +96,9 @@ func NewSlurmConfig() (SlurmConfig, error) { return SlurmConfigInst, nil } -func (h *SidecarHandler) handleError(w http.ResponseWriter, statusCode int, err error) { +func (h *SidecarHandler) handleError(ctx context.Context, w http.ResponseWriter, statusCode int, err error) { + span := trace.SpanFromContext(ctx) + span.AddEvent("An error occurred:" + err.Error()) w.WriteHeader(statusCode) w.Write([]byte("Some errors occurred while creating container. Check Slurm Sidecar's logs")) log.G(h.Ctx).Error(err) diff --git a/pkg/slurm/prepare.go b/pkg/slurm/prepare.go index 05c6989..e59a307 100644 --- a/pkg/slurm/prepare.go +++ b/pkg/slurm/prepare.go @@ -21,6 +21,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" commonIL "github.com/intertwin-eu/interlink/pkg/interlink" + + "go.opentelemetry.io/otel/attribute" + trace "go.opentelemetry.io/otel/trace" ) type SidecarHandler struct { @@ -97,8 +100,8 @@ func parsingTimeFromString(Ctx context.Context, stringTime string, timestampForm } // CreateDirectories is just a function to be sure directories exists at runtime -func CreateDirectories(config SlurmConfig) error { - path := config.DataRootFolder +func (h *SidecarHandler) CreateDirectories() error { + path := h.Config.DataRootFolder if _, err := os.Stat(path); err != nil { if os.IsNotExist(err) { err = os.MkdirAll(path, os.ModePerm) @@ -113,19 +116,19 @@ func CreateDirectories(config SlurmConfig) error { // LoadJIDs loads Job IDs into the main JIDs struct from files in the root folder. // It's useful went down and needed to be restarded, but there were jobs running, for example. 
// Return only error in case of failure -func LoadJIDs(Ctx context.Context, config SlurmConfig, JIDs *map[string]*JidStruct) error { - path := config.DataRootFolder +func (h *SidecarHandler) LoadJIDs() error { + path := h.Config.DataRootFolder dir, err := os.Open(path) if err != nil { - log.G(Ctx).Error(err) + log.G(h.Ctx).Error(err) return err } defer dir.Close() entries, err := dir.ReadDir(0) if err != nil { - log.G(Ctx).Error(err) + log.G(h.Ctx).Error(err) return err } @@ -138,43 +141,43 @@ func LoadJIDs(Ctx context.Context, config SlurmConfig, JIDs *map[string]*JidStru JID, err := os.ReadFile(path + entry.Name() + "/" + "JobID.jid") if err != nil { - log.G(Ctx).Debug(err) + log.G(h.Ctx).Debug(err) continue } else { podUID, err = os.ReadFile(path + entry.Name() + "/" + "PodUID.uid") if err != nil { - log.G(Ctx).Debug(err) + log.G(h.Ctx).Debug(err) continue } else { podNamespace, err = os.ReadFile(path + entry.Name() + "/" + "PodNamespace.ns") if err != nil { - log.G(Ctx).Debug(err) + log.G(h.Ctx).Debug(err) continue } } StartedAtString, err := os.ReadFile(path + entry.Name() + "/" + "StartedAt.time") if err != nil { - log.G(Ctx).Debug(err) + log.G(h.Ctx).Debug(err) } else { - StartedAt, err = parsingTimeFromString(Ctx, string(StartedAtString), "2006-01-02 15:04:05.999999999 -0700 MST") + StartedAt, err = parsingTimeFromString(h.Ctx, string(StartedAtString), "2006-01-02 15:04:05.999999999 -0700 MST") if err != nil { - log.G(Ctx).Debug(err) + log.G(h.Ctx).Debug(err) } } } FinishedAtString, err := os.ReadFile(path + entry.Name() + "/" + "FinishedAt.time") if err != nil { - log.G(Ctx).Debug(err) + log.G(h.Ctx).Debug(err) } else { - FinishedAt, err = parsingTimeFromString(Ctx, string(FinishedAtString), "2006-01-02 15:04:05.999999999 -0700 MST") + FinishedAt, err = parsingTimeFromString(h.Ctx, string(FinishedAtString), "2006-01-02 15:04:05.999999999 -0700 MST") if err != nil { - log.G(Ctx).Debug(err) + log.G(h.Ctx).Debug(err) } } JIDEntry := JidStruct{PodUID: 
string(podUID), PodNamespace: string(podNamespace), JID: string(JID), StartTime: StartedAt, EndTime: FinishedAt} - (*JIDs)[string(podUID)] = &JIDEntry + (*h.JIDs)[string(podUID)] = &JIDEntry } } @@ -184,10 +187,14 @@ func LoadJIDs(Ctx context.Context, config SlurmConfig, JIDs *map[string]*JidStru // prepareEnvs reads all Environment variables from a container and append them to a slice of strings. // It returns the slice containing all envs in the form of key=value. func prepareEnvs(Ctx context.Context, container v1.Container) []string { + start := time.Now().UnixMicro() + span := trace.SpanFromContext(Ctx) + span.AddEvent("Preparing ENVs for container " + container.Name) + var envs []string + if len(container.Env) > 0 { log.G(Ctx).Info("-- Appending envs") - env := make([]string, 1) - env = append(env, "--env") + envs = append(envs, "--env") env_data := "" for _, envVar := range container.Env { tmp := (envVar.Name + "=" + envVar.Value + ",") @@ -197,14 +204,17 @@ func prepareEnvs(Ctx context.Context, container v1.Container) []string { env_data = env_data[:last] } if env_data == "" { - env = []string{} + envs = []string{} } - env = append(env, env_data) - - return env - } else { - return []string{} + envs = append(envs, env_data) } + + duration := time.Now().UnixMicro() - start + span.AddEvent("Prepared ENVs for container "+container.Name, trace.WithAttributes( + attribute.String("prepareenvs.container.name", container.Name), + attribute.Int64("prepareenvs.duration", duration), + attribute.StringSlice("prepareenvs.container.envs", envs))) + return envs } // prepareMounts iterates along the struct provided in the data parameter and checks for ConfigMaps, Secrets and EmptyDirs to be mounted. 
@@ -220,6 +230,11 @@ func prepareMounts( container v1.Container, workingPath string, ) (string, error) { + span := trace.SpanFromContext(Ctx) + start := time.Now().UnixMicro() + log.G(Ctx).Info(span) + span.AddEvent("Preparing Mounts for container " + container.Name) + log.G(Ctx).Info("-- Preparing mountpoints for " + container.Name) mountedData := "" @@ -234,7 +249,7 @@ func prepareMounts( for _, cont := range podData.Containers { for _, cfgMap := range cont.ConfigMaps { if container.Name == cont.Name { - configMapPath, env, err := mountData(Ctx, config, podData.Pod, container, cfgMap, workingPath) + configMapPath, env, err := mountData(Ctx, config, podData.Pod, container, cfgMap, workingPath, span) if err != nil { log.G(Ctx).Error(err) return "", err @@ -256,7 +271,7 @@ func prepareMounts( for _, secret := range cont.Secrets { if container.Name == cont.Name { - secretPath, env, err := mountData(Ctx, config, podData.Pod, container, secret, workingPath) + secretPath, env, err := mountData(Ctx, config, podData.Pod, container, secret, workingPath, span) if err != nil { log.G(Ctx).Error(err) return "", err @@ -279,7 +294,7 @@ func prepareMounts( } if container.Name == cont.Name { - edPath, _, err := mountData(Ctx, config, podData.Pod, container, "emptyDir", workingPath) + edPath, _, err := mountData(Ctx, config, podData.Pod, container, "emptyDir", workingPath, span) if err != nil { log.G(Ctx).Error(err) return "", err @@ -307,6 +322,12 @@ func prepareMounts( } log.G(Ctx).Debug(mountedData) + duration := time.Now().UnixMicro() - start + span.AddEvent("Prepared mounts for container "+container.Name, trace.WithAttributes( + attribute.String("preparemounts.container.name", container.Name), + attribute.Int64("preparemounts.duration", duration), + attribute.String("preparemounts.container.mounts", mountedData))) + return mountedData, nil } @@ -323,6 +344,10 @@ func produceSLURMScript( commands []SingularityCommand, resourceLimits ResourceLimits, ) (string, error) { + start 
:= time.Now().UnixMicro() + span := trace.SpanFromContext(Ctx) + span.AddEvent("Producing SLURM script") + log.G(Ctx).Info("-- Creating file for the Slurm script") prefix = "" err := os.MkdirAll(path, os.ModePerm) @@ -489,6 +514,12 @@ func produceSLURMScript( log.G(Ctx).Debug("---- Written file") } + duration := time.Now().UnixMicro() - start + span.AddEvent("Produced SLURM script", trace.WithAttributes( + attribute.String("produceslurmscript.path", f.Name()), + attribute.Int64("produceslurmscript.duration", duration), + )) + return f.Name(), nil } @@ -584,7 +615,8 @@ func removeJID(podUID string, JIDs *map[string]*JidStruct) { // Returns the first encountered error. func deleteContainer(Ctx context.Context, config SlurmConfig, podUID string, JIDs *map[string]*JidStruct, path string) error { log.G(Ctx).Info("- Deleting Job for pod " + podUID) - if checkIfJidExists(JIDs, podUID) { + span := trace.SpanFromContext(Ctx) + if checkIfJidExists(Ctx, JIDs, podUID) { _, err := exec.Command(config.Scancelpath, (*JIDs)[podUID].JID).Output() if err != nil { log.G(Ctx).Error(err) @@ -594,11 +626,21 @@ func deleteContainer(Ctx context.Context, config SlurmConfig, podUID string, JID } } err := os.RemoveAll(path) + var jid string; if j, ok := (*JIDs)[podUID]; ok { jid = j.JID } removeJID(podUID, JIDs) + + span.SetAttributes( + attribute.String("delete.pod.uid", podUID), + attribute.String("delete.jid", jid), + ) + if err != nil { log.G(Ctx).Error(err) - return err + span.AddEvent("Failed to delete SLURM Job " + jid + " for Pod " + podUID) + } else { + span.AddEvent("SLURM Job " + jid + " for Pod " + podUID + " successfully deleted") } + return err } @@ -607,11 +649,13 @@ func deleteContainer(Ctx context.Context, config SlurmConfig, podUID string, JID // Returns 2 slices of string, one containing the ConfigMaps/Secrets/EmptyDirs paths and one the list of relatives ENVS to be used // to create the files inside the container. // It also returns the first encountered error. 
-func mountData(Ctx context.Context, config SlurmConfig, pod v1.Pod, container v1.Container, data interface{}, path string) ([]string, string, error) { +func mountData(Ctx context.Context, config SlurmConfig, pod v1.Pod, container v1.Container, data interface{}, path string, span trace.Span) ([]string, string, error) { + start := time.Now().UnixMicro() if config.ExportPodData { for _, mountSpec := range container.VolumeMounts { switch mount := data.(type) { case v1.ConfigMap: + span.AddEvent("Preparing ConfigMap mount") for _, vol := range pod.Spec.Volumes { if vol.ConfigMap != nil && vol.Name == mountSpec.Name && mount.Name == vol.ConfigMap.Name { configMaps := make(map[string]string) @@ -694,12 +738,18 @@ func mountData(Ctx context.Context, config SlurmConfig, pod v1.Pod, container v1 } } } + duration := time.Now().UnixMicro() - start + span.AddEvent("Prepared ConfigMap mounts", trace.WithAttributes( + attribute.String("mountdata.container.name", container.Name), + attribute.Int64("mountdata.duration", duration), + attribute.StringSlice("mountdata.container.configmaps", configMapNamePath))) return configMapNamePath, env, nil } } //} case v1.Secret: + span.AddEvent("Preparing Secret mount") for _, vol := range pod.Spec.Volumes { if vol.Secret != nil && vol.Name == mountSpec.Name && mount.Name == vol.Secret.SecretName { secrets := make(map[string][]byte) @@ -784,12 +834,18 @@ func mountData(Ctx context.Context, config SlurmConfig, pod v1.Pod, container v1 } } } + duration := time.Now().UnixMicro() - start + span.AddEvent("Prepared Secrets mounts", trace.WithAttributes( + attribute.String("mountdata.container.name", container.Name), + attribute.Int64("mountdata.duration", duration), + attribute.StringSlice("mountdata.container.secrets", secretNamePath))) return secretNamePath, env, nil } } //} case string: + span.AddEvent("Preparing EmptyDirs mount") var edPaths []string for _, vol := range pod.Spec.Volumes { for _, mountSpec := range container.VolumeMounts { @@ 
-823,6 +879,11 @@ func mountData(Ctx context.Context, config SlurmConfig, pod v1.Pod, container v1 } } } + duration := time.Now().UnixMicro() - start + span.AddEvent("Prepared EmptyDirs mounts", trace.WithAttributes( + attribute.String("mountdata.container.name", container.Name), + attribute.Int64("mountdata.duration", duration), + attribute.StringSlice("mountdata.container.emptydirs", edPaths))) return edPaths, "", nil } } @@ -831,12 +892,14 @@ func mountData(Ctx context.Context, config SlurmConfig, pod v1.Pod, container v1 } // checkIfJidExists checks if a JID is in the main JIDs struct -func checkIfJidExists(JIDs *map[string]*JidStruct, uid string) bool { +func checkIfJidExists(ctx context.Context, JIDs *map[string]*JidStruct, uid string) bool { + span := trace.SpanFromContext(ctx) _, ok := (*JIDs)[uid] if ok { return true } else { + span.AddEvent("JID for PodUID " + uid + " doesn't exist") return false } }