Skip to content

Commit

Permalink
monitor for dcgm
Browse files Browse the repository at this point in the history
  • Loading branch information
ferris-cx committed Dec 19, 2024
1 parent a62dd49 commit b7d8ddb
Show file tree
Hide file tree
Showing 2 changed files with 207 additions and 1 deletion.
3 changes: 3 additions & 0 deletions cmd/koordlet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package main

import (
"flag"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/gpu"

Check failure on line 21 in cmd/koordlet/main.go

View workflow job for this annotation

GitHub Actions / golangci-lint

could not import github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/gpu (-: # github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/gpu
"net/http"
_ "net/http/pprof"
"time"
Expand Down Expand Up @@ -80,6 +81,8 @@ func main() {
// Expose the Prometheus http endpoint
go installHTTPHandler()

go gpu.StartGrpc()

// Start the Cmd
klog.Info("Starting the koordlet daemon")
d.Run(stopCtx.Done())
Expand Down
205 changes: 204 additions & 1 deletion pkg/koordlet/runtimehooks/hooks/gpu/gpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,17 @@ package gpu

import (
"fmt"
"google.golang.org/grpc"
"io/ioutil"
"net"
"os"
"path/filepath"
"strings"
"sync"
"time"

"k8s.io/klog/v2"
pb "k8s.io/kubelet/pkg/apis/podresources/v1alpha1"

ext "github.com/koordinator-sh/koordinator/apis/extension"
schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
Expand All @@ -30,13 +38,37 @@ import (
rmconfig "github.com/koordinator-sh/koordinator/pkg/runtimeproxy/config"
)

const GpuAllocEnv = "NVIDIA_VISIBLE_DEVICES"
const (
GpuAllocEnv = "NVIDIA_VISIBLE_DEVICES"
ServerPodResourcesKubeletSocket = "/pod-resources/koordlet.sock"
ServerPodResourcesKubeletCheckPoint = "/pod-resources/podgpu.json"
interval = 30 * time.Second
)

var (
NodePodResources []*pb.PodResources
PodResourcesLock sync.RWMutex
)

type PodResourcesServer struct{}

func (s *PodResourcesServer) List(ctx context.Context, req *pb.ListPodResourcesRequest) (*pb.ListPodResourcesResponse, error) {

Check failure on line 55 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / unit-tests(Run Go build)

undefined: context

Check failure on line 55 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / unit-tests(Verify govet)

undefined: context

Check failure on line 55 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: context

Check failure on line 55 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: context

Check failure on line 55 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: context

Check failure on line 55 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / unit-tests(Run Go test)

undefined: context
klog.V(1).Infof("List(): list resp nodePodResources %v", NodePodResources)
return &pb.ListPodResourcesResponse{PodResources: NodePodResources}, nil
}

type gpuPlugin struct{}

func (p *gpuPlugin) Register(op hooks.Options) {
klog.V(5).Infof("register hook %v", "gpu env inject")
hooks.Register(rmconfig.PreCreateContainer, "gpu env inject", "inject NVIDIA_VISIBLE_DEVICES env into container", p.InjectContainerGPUEnv)

//Construct the memory object by loading pod-gpu.json from disk
nodePodResources, err := LoadNodePodResourcesFromFile(ServerPodResourcesKubeletCheckPoint)
if err != nil {
klog.Errorf("Register(): Error loading nodePodResources: %v\n", err)
}
NodePodResources = nodePodResources
}

var singleton *gpuPlugin
Expand Down Expand Up @@ -106,5 +138,176 @@ func (p *gpuPlugin) InjectContainerGPUEnv(proto protocol.HooksProtocol) error {
}
}

klog.V(1).Infof("InjectContainerGPUEnv(): start to convertToPodResources")
convertToPodResources(containerReq, devices)

return nil
}

func convertToPodResources(request protocol.ContainerRequest, deviceAllocation []*ext.DeviceAllocation) *pb.ContainerDevices {
klog.V(1).Infof("convertToPodResources(): enter into convertToPodResources")
podName := request.PodMeta.Name
nameSpace := request.PodMeta.Namespace
containerName := request.ContainerMeta.Name

devices := make([]string, 0, len(deviceAllocation))
for _, device := range deviceAllocation {
devices = append(devices, device.ID)
}

containerDevices := &pb.ContainerDevices{
ResourceName: ext.ResourceNvidiaGPU.String(),
DeviceIds: devices,
}
klog.V(1).Infof("convertToPodResources(): containerDevices: %v", containerDevices)

containerDevicesSlice := make([]*pb.ContainerDevices, 0, 1)
containerDevicesSlice = append(containerDevicesSlice, containerDevices)

var podFound bool

PodResourcesLock.RLock()
for i, podResources := range NodePodResources {
klog.V(1).Infof("convertToPodResources(): traversal nodePodResources %d,%s/%s", i, podResources.GetName(), podResources.GetNamespace())
if podResources.Name == podName && podResources.Namespace == nameSpace {
klog.V(1).Infof("convertToPodResources(): pod is found in nodePodResources %d,%s/%s", i, podResources.GetName(), podResources.GetNamespace())
podFound = true

PodResourcesLock.RUnlock()

PodResourcesLock.Lock()
containerExists := false
for j, container := range podResources.Containers {
if container.Name == containerName {
klog.V(1).Infof("convertToPodResources(): container %s is found", containerName)
containerExists = true
podResources.Containers[j] = &pb.ContainerResources{
Name: containerName,
Devices: containerDevicesSlice,
}
break
}
}
if !containerExists {
klog.V(1).Infof("convertToPodResources(): container %s is not found", containerName)
podResources.Containers = append(podResources.Containers, &pb.ContainerResources{
Name: containerName,
Devices: containerDevicesSlice,
})
}
NodePodResources[i] = podResources
PodResourcesLock.Unlock()
break
}
}

if !podFound {
PodResourcesLock.RUnlock()
PodResourcesLock.Lock()
newPodResources := &pb.PodResources{
Name: podName,
Namespace: nameSpace,
Containers: []*pb.ContainerResources{
{
Name: containerName,
Devices: containerDevicesSlice,
},
},
}
NodePodResources = append(NodePodResources, newPodResources)
PodResourcesLock.Unlock()
}

klog.V(1).Infof("convertToPodResources(): nodePodResources: %v", NodePodResources)
return containerDevices
}

func StartGrpc() error {
lis, err := net.Listen("unix", ServerPodResourcesKubeletSocket)
if err != nil {
klog.Errorf("failed to listen: %v", err)
return err
}
if err := setSocketPermissions(ServerPodResourcesKubeletSocket); err != nil {
klog.Errorf("failed to set socket permissions: %v", err)
return err
}
server := grpc.NewServer()
pb.RegisterPodResourcesListerServer(server, &PodResourcesServer{})

startCheckpoint()

klog.V(4).Infof("startGrpc():Starting gRPC server on %s", ServerPodResourcesKubeletSocket)
if err := server.Serve(lis); err != nil {
klog.Errorf("failed to serve: %v", err)
return err
}
klog.V(1).Infof("startGrpc():end...")
return nil
}

func setSocketPermissions(socketPath string) error {
// In a real application, you would set the correct permissions here.
// For example:
return os.Chmod(socketPath, 0660)
//return nil
}

func startCheckpoint() {
stopCh := make(chan struct{})
go PeriodicSave(ServerPodResourcesKubeletCheckPoint, interval, stopCh)
}

func EnsureDirectory(path string) error {
return os.MkdirAll(filepath.Dir(path), os.ModePerm)
}

func SaveNodePodResourcesToFile(filename string, data []*pb.PodResources) error {
if err := EnsureDirectory(filename); err != nil {
return fmt.Errorf("failed to ensure directory for %s: %v", filename, err)
}

jsonData, err := json.MarshalIndent(data, "", " ")

Check failure on line 270 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / unit-tests(Run Go build)

undefined: json

Check failure on line 270 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / unit-tests(Verify govet)

undefined: json

Check failure on line 270 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: json

Check failure on line 270 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: json

Check failure on line 270 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / unit-tests(Run Go test)

undefined: json
if err != nil {
return fmt.Errorf("failed to marshal nodePodResources to JSON: %v", err)
}

if err := ioutil.WriteFile(filename, jsonData, 0644); err != nil {
return fmt.Errorf("failed to write JSON data to file %s: %v", filename, err)
}
klog.V(5).Infof("SaveNodePodResourcesToFile(): Saved nodePodResources to %s", filename)
return nil
}

func PeriodicSave(filename string, interval time.Duration, stopCh <-chan struct{}) {
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if err := SaveNodePodResourcesToFile(filename, NodePodResources); err != nil {
klog.V(1).Infof("PeriodicSave(): SaveNodePodResourcesToFile(): Error saving nodePodResources: %v", err)
}
case <-stopCh:
fmt.Println("Stopping periodic save.")
return
}
}
}

func LoadNodePodResourcesFromFile(filePath string) ([]*pb.PodResources, error) {
klog.V(1).Infof("LoadNodePodResourcesFromFile():start to load PodResources from %s", filePath)
data, err := os.ReadFile(filePath)
if err != nil {
return nil, fmt.Errorf("failed to read file %s: %v", filePath, err)
}

var nodePodResources []*pb.PodResources
if err := json.Unmarshal(data, &nodePodResources); err != nil {

Check failure on line 307 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / unit-tests(Run Go build)

undefined: json

Check failure on line 307 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / unit-tests(Verify govet)

undefined: json

Check failure on line 307 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: json) (typecheck)

Check failure on line 307 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: json) (typecheck)

Check failure on line 307 in pkg/koordlet/runtimehooks/hooks/gpu/gpu.go

View workflow job for this annotation

GitHub Actions / unit-tests(Run Go test)

undefined: json
return nil, fmt.Errorf("failed to unmarshal JSON data from file %s: %v", filePath, err)
}

klog.V(1).Infof("LoadNodePodResourcesFromFile():Loaded %d PodResources from %s\n", len(nodePodResources), filePath)
return nodePodResources, nil
}

0 comments on commit b7d8ddb

Please sign in to comment.