first OpenTelemetry tracing implementation
Signed-off-by: Surax98 <[email protected]>
Surax98 committed Aug 29, 2024
1 parent 37d0a3b commit 910e358
Showing 7 changed files with 313 additions and 82 deletions.
97 changes: 91 additions & 6 deletions cmd/main.go
@@ -2,16 +2,83 @@ package main

import (
"context"
"crypto/tls"
"fmt"
"net/http"
"os"
"strconv"
"time"

"github.com/sirupsen/logrus"
"github.com/virtual-kubelet/virtual-kubelet/log"
logruslogger "github.com/virtual-kubelet/virtual-kubelet/log/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

slurm "github.com/intertwin-eu/interlink-slurm-plugin/pkg/slurm"

"github.com/virtual-kubelet/virtual-kubelet/trace"
"github.com/virtual-kubelet/virtual-kubelet/trace/opentelemetry"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
)

func initProvider(ctx context.Context) (func(context.Context) error, error) {
res, err := resource.New(ctx,
resource.WithAttributes(
// the service name used to display traces in backends
semconv.ServiceName("InterLink-SLURM-plugin"),
),
)
if err != nil {
return nil, fmt.Errorf("failed to create resource: %w", err)
}

ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()

otlpEndpoint := os.Getenv("TELEMETRY_ENDPOINT")

if otlpEndpoint == "" {
otlpEndpoint = "localhost:4317"
}

fmt.Println("TELEMETRY_ENDPOINT: ", otlpEndpoint)

conn := &grpc.ClientConn{}
creds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
conn, err = grpc.DialContext(ctx, otlpEndpoint, grpc.WithTransportCredentials(creds), grpc.WithBlock())

if err != nil {
return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err)
}

// Set up a trace exporter
traceExporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn))
if err != nil {
return nil, fmt.Errorf("failed to create trace exporter: %w", err)
}

// Register the trace exporter with a TracerProvider, using a batch
// span processor to aggregate spans before export.
bsp := sdktrace.NewBatchSpanProcessor(traceExporter)
tracerProvider := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithResource(res),
sdktrace.WithSpanProcessor(bsp),
)
otel.SetTracerProvider(tracerProvider)

// set global propagator to tracecontext (the default is no-op).
otel.SetTextMapPropagator(propagation.TraceContext{})

return tracerProvider.Shutdown, nil
}

func main() {
logger := logrus.StandardLogger()

@@ -31,14 +98,32 @@ func main() {
log.L = logruslogger.FromLogrus(logrus.NewEntry(logger))

JobIDs := make(map[string]*slurm.JidStruct)
Ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
log.G(Ctx).Debug("Debug level: " + strconv.FormatBool(slurmConfig.VerboseLogging))

if os.Getenv("ENABLE_TRACING") == "1" {
shutdown, err := initProvider(ctx)
if err != nil {
log.G(ctx).Fatal(err)
}
defer func() {
if err = shutdown(ctx); err != nil {
log.G(ctx).Fatal("failed to shutdown TracerProvider: %w", err)
}
}()

log.G(ctx).Info("Tracer setup succeeded")

// TODO: disable this through options
trace.T = opentelemetry.Adapter{}
}

log.G(ctx).Debug("Debug level: " + strconv.FormatBool(slurmConfig.VerboseLogging))

SidecarAPIs := slurm.SidecarHandler{
Config: slurmConfig,
JIDs: &JobIDs,
Ctx: Ctx,
Ctx: ctx,
}

mutex := http.NewServeMux()
@@ -47,11 +132,11 @@
mutex.HandleFunc("/delete", SidecarAPIs.StopHandler)
mutex.HandleFunc("/getLogs", SidecarAPIs.GetLogsHandler)

slurm.CreateDirectories(slurmConfig)
slurm.LoadJIDs(Ctx, slurmConfig, &JobIDs)
SidecarAPIs.CreateDirectories()
SidecarAPIs.LoadJIDs()

err = http.ListenAndServe(":"+slurmConfig.Sidecarport, mutex)
if err != nil {
log.G(Ctx).Fatal(err)
log.G(ctx).Fatal(err)
}
}
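
Once initProvider has registered the global TracerProvider and trace.T is set to the OpenTelemetry adapter, downstream code obtains tracers through the otel package. The snippet below is only an illustrative sketch of that pattern; the function and span names are made up, and the real instrumentation added by this commit is in the handlers shown next.

package main

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
)

// illustrativeSpan demonstrates the span pattern used by the handlers below:
// fetch a tracer from the global provider, start a span, attach attributes,
// and pass the returned context to downstream calls so they join the trace.
func illustrativeSpan(ctx context.Context) {
	tracer := otel.Tracer("interlink-API")
	spanCtx, span := tracer.Start(ctx, "ExampleOperation")
	defer span.End()

	span.SetAttributes(attribute.String("example.key", "value"))

	_ = spanCtx // hand spanCtx to any nested operations
}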
60 changes: 42 additions & 18 deletions pkg/slurm/Create.go
@@ -6,22 +6,36 @@ import (
"io"
"net/http"
"os"
"strconv"
"strings"
"time"

"github.com/containerd/containerd/log"

commonIL "github.com/intertwin-eu/interlink/pkg/interlink"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
trace "go.opentelemetry.io/otel/trace"
)

// SubmitHandler generates and submits a SLURM batch script according to provided data.
// 1 Pod = 1 Job. If a Pod has multiple containers, every container is a line with its parameters in the SLURM script.
func (h *SidecarHandler) SubmitHandler(w http.ResponseWriter, r *http.Request) {
start := time.Now().UnixMicro()
tracer := otel.Tracer("interlink-API")
spanCtx, span := tracer.Start(h.Ctx, "CreateSLURM", trace.WithAttributes(
attribute.Int64("start.timestamp", start),
))
defer span.End()
defer commonIL.SetDurationSpan(start, span)

log.G(h.Ctx).Info("Slurm Sidecar: received Submit call")
statusCode := http.StatusOK
bodyBytes, err := io.ReadAll(r.Body)
if err != nil {
statusCode = http.StatusInternalServerError
h.handleError(w, statusCode, err)
h.handleError(spanCtx, w, statusCode, err)
return
}

@@ -34,9 +48,7 @@ func (h *SidecarHandler) SubmitHandler(w http.ResponseWriter, r *http.Request) {
err = json.Unmarshal(bodyBytes, &dataList)
if err != nil {
statusCode = http.StatusInternalServerError
w.WriteHeader(statusCode)
w.Write([]byte("Some errors occurred while creating container. Check Slurm Sidecar's logs"))
log.G(h.Ctx).Error(err)
h.handleError(spanCtx, w, http.StatusGatewayTimeout, err)
return
}

@@ -69,7 +81,7 @@

commstr1 := []string{"singularity", "exec", "--containall", "--nv", singularityMounts, singularityOptions}

envs := prepareEnvs(h.Ctx, container)
envs := prepareEnvs(spanCtx, container)
image := ""

CPULimit, _ := container.Resources.Limits.Cpu().AsInt64()
@@ -87,13 +99,11 @@ func (h *SidecarHandler) SubmitHandler(w http.ResponseWriter, r *http.Request) {
resourceLimits.Memory += MemoryLimit
}

mounts, err := prepareMounts(h.Ctx, h.Config, data, container, filesPath)
mounts, err := prepareMounts(spanCtx, h.Config, data, container, filesPath)
log.G(h.Ctx).Debug(mounts)
if err != nil {
statusCode = http.StatusInternalServerError
w.WriteHeader(statusCode)
w.Write([]byte("Error prepairing mounts. Check Slurm Sidecar's logs"))
log.G(h.Ctx).Error(err)
h.handleError(spanCtx, w, http.StatusGatewayTimeout, err)
os.RemoveAll(filesPath)
return
}
@@ -120,50 +130,64 @@ func (h *SidecarHandler) SubmitHandler(w http.ResponseWriter, r *http.Request) {
isInit = true
}

span.SetAttributes(
attribute.String("job.container"+strconv.Itoa(i)+".name", container.Name),
attribute.Bool("job.container"+strconv.Itoa(i)+".isinit", isInit),
attribute.StringSlice("job.container"+strconv.Itoa(i)+".envs", envs),
attribute.String("job.container"+strconv.Itoa(i)+".image", image),
attribute.StringSlice("job.container"+strconv.Itoa(i)+".command", container.Command),
attribute.StringSlice("job.container"+strconv.Itoa(i)+".args", container.Args),
)

singularity_command_pod = append(singularity_command_pod, SingularityCommand{singularityCommand: singularity_command, containerName: container.Name, containerArgs: container.Args, containerCommand: container.Command, isInitContainer: isInit})
}

path, err := produceSLURMScript(h.Ctx, h.Config, string(data.Pod.UID), filesPath, metadata, singularity_command_pod, resourceLimits)
span.SetAttributes(
attribute.Int64("job.limits.cpu", resourceLimits.CPU),
attribute.Int64("job.limits.memory", resourceLimits.Memory),
)

path, err := produceSLURMScript(spanCtx, h.Config, string(data.Pod.UID), filesPath, metadata, singularity_command_pod, resourceLimits)
if err != nil {
log.G(h.Ctx).Error(err)
os.RemoveAll(filesPath)
return
}
out, err := SLURMBatchSubmit(h.Ctx, h.Config, path)
if err != nil {
span.AddEvent("Failed to submit the SLURM Job")
statusCode = http.StatusInternalServerError
w.WriteHeader(statusCode)
w.Write([]byte("Error submitting Slurm script. Check Slurm Sidecar's logs"))
log.G(h.Ctx).Error(err)
h.handleError(spanCtx, w, http.StatusGatewayTimeout, err)
os.RemoveAll(filesPath)
return
}
log.G(h.Ctx).Info(out)
jid, err := handleJidAndPodUid(h.Ctx, data.Pod, h.JIDs, out, filesPath)
if err != nil {
statusCode = http.StatusInternalServerError
w.WriteHeader(statusCode)
w.Write([]byte("Error handling JID. Check Slurm Sidecar's logs"))
log.G(h.Ctx).Error(err)
h.handleError(spanCtx, w, http.StatusGatewayTimeout, err)
os.RemoveAll(filesPath)
err = deleteContainer(h.Ctx, h.Config, string(data.Pod.UID), h.JIDs, filesPath)
err = deleteContainer(spanCtx, h.Config, string(data.Pod.UID), h.JIDs, filesPath)
if err != nil {
log.G(h.Ctx).Error(err)
}
return
}

span.AddEvent("SLURM Job successfully submitted with ID " + jid)
returnedJID = CreateStruct{PodUID: string(data.Pod.UID), PodJID: jid}

returnedJIDBytes, err = json.Marshal(returnedJID)
if err != nil {
statusCode = http.StatusInternalServerError
h.handleError(w, statusCode, err)
h.handleError(spanCtx, w, statusCode, err)
return
}

w.WriteHeader(statusCode)

commonIL.SetDurationSpan(start, span, commonIL.WithHTTPReturnCode(statusCode))

if statusCode != http.StatusOK {
w.Write([]byte("Some errors occurred while creating containers. Check Slurm Sidecar's logs"))
} else {
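
Each failure path now routes through handleError with the span context as its first argument, so errors end up attached to the active span. The helper itself is not part of this diff; the following is only a plausible sketch of what a span-aware version could look like, with the body and response message entirely assumed.

package main

import (
	"context"
	"net/http"

	"github.com/containerd/containerd/log"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/trace"
)

// handleErrorSketch is a hypothetical stand-in for the plugin's handleError:
// it records the error on the span carried by ctx, logs it, and reports the
// HTTP status code to the caller. The real implementation may differ.
func handleErrorSketch(ctx context.Context, w http.ResponseWriter, statusCode int, err error) {
	span := trace.SpanFromContext(ctx)
	span.RecordError(err)
	span.SetStatus(codes.Error, err.Error())

	log.G(ctx).Error(err)
	w.WriteHeader(statusCode)
	_, _ = w.Write([]byte("An error occurred. Check the Slurm Sidecar's logs"))
}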
28 changes: 22 additions & 6 deletions pkg/slurm/Delete.go
@@ -5,53 +5,69 @@ import (
"io"
"net/http"
"os"
"time"

"github.com/containerd/containerd/log"
commonIL "github.com/intertwin-eu/interlink/pkg/interlink"
v1 "k8s.io/api/core/v1"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
trace "go.opentelemetry.io/otel/trace"
)

// StopHandler runs a scancel command, updating JIDs and cached statuses
func (h *SidecarHandler) StopHandler(w http.ResponseWriter, r *http.Request) {
start := time.Now().UnixMicro()
tracer := otel.Tracer("interlink-API")
spanCtx, span := tracer.Start(h.Ctx, "DeleteSLURM", trace.WithAttributes(
attribute.Int64("start.timestamp", start),
))
defer span.End()
defer commonIL.SetDurationSpan(start, span)

log.G(h.Ctx).Info("Slurm Sidecar: received Stop call")
statusCode := http.StatusOK

bodyBytes, err := io.ReadAll(r.Body)
if err != nil {
statusCode = http.StatusInternalServerError
h.handleError(w, statusCode, err)
h.handleError(spanCtx, w, statusCode, err)
return
}

var pod *v1.Pod
err = json.Unmarshal(bodyBytes, &pod)
if err != nil {
statusCode = http.StatusInternalServerError
h.handleError(w, statusCode, err)
h.handleError(spanCtx, w, statusCode, err)
return
}

filesPath := h.Config.DataRootFolder + pod.Namespace + "-" + string(pod.UID)

err = deleteContainer(h.Ctx, h.Config, string(pod.UID), h.JIDs, filesPath)
err = deleteContainer(spanCtx, h.Config, string(pod.UID), h.JIDs, filesPath)

if err != nil {
statusCode = http.StatusInternalServerError
h.handleError(w, statusCode, err)
h.handleError(spanCtx, w, statusCode, err)
return
}
if os.Getenv("SHARED_FS") != "true" {
err = os.RemoveAll(filesPath)
if err != nil {
statusCode = http.StatusInternalServerError
h.handleError(w, statusCode, err)
h.handleError(spanCtx, w, statusCode, err)
return
}
}

commonIL.SetDurationSpan(start, span, commonIL.WithHTTPReturnCode(statusCode))

w.WriteHeader(statusCode)
if statusCode != http.StatusOK {
w.Write([]byte("Some errors occurred deleting containers. Check Slurm Sidecar's logs"))
} else {

w.Write([]byte("All containers for submitted Pods have been deleted"))
}
}
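
Both handlers wrap their work in commonIL.SetDurationSpan, once via defer right after starting the span and once more with the HTTP return code before responding. That helper lives in the interlink repository and is not shown in this commit; the sketch below is only an assumed shape inferred from its call sites, and the attribute keys are guesses.

package main

import (
	"time"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

// durationOption mirrors the functional-option style implied by
// commonIL.WithHTTPReturnCode; names and attribute keys are hypothetical.
type durationOption func(trace.Span)

func WithHTTPReturnCode(code int) durationOption {
	return func(span trace.Span) {
		span.SetAttributes(attribute.Int("exit.code", code))
	}
}

// SetDurationSpan records the elapsed time since startMicro (microseconds)
// on the span, plus any optional attributes such as the HTTP return code.
func SetDurationSpan(startMicro int64, span trace.Span, opts ...durationOption) {
	span.SetAttributes(attribute.Int64("duration.us", time.Now().UnixMicro()-startMicro))
	for _, opt := range opts {
		opt(span)
	}
}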