Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Enhance GPU metrics collection and error handling in vGPU monitor #827

Merged
merged 1 commit into from
Feb 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion charts/hami/templates/device-plugin/daemonsetnvidia.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,11 @@ spec:
- name: vgpu-monitor
image: {{ .Values.devicePlugin.image }}:{{ .Values.version }}
imagePullPolicy: {{ .Values.devicePlugin.imagePullPolicy | quote }}
command: ["vGPUmonitor"]
command:
- "vGPUmonitor"
{{- range .Values.devicePlugin.extraArgs }}
- {{ . }}
{{- end }}
securityContext:
allowPrivilegeEscalation: false
capabilities:
Expand Down
31 changes: 8 additions & 23 deletions cmd/vGPUmonitor/feedback.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"os"
"sort"
"strings"
"time"

"github.com/Project-HAMi/HAMi/pkg/monitor/nvidia"

Expand Down Expand Up @@ -233,43 +232,29 @@ func Observe(lister *nvidia.ContainerLister) {
utilizationSwitch := c.Info.GetUtilizationSwitch()
if CheckBlocking(utSwitchOn, priority, c) {
if recentKernel >= 0 {
klog.Infof("utSwitchon=%v", utSwitchOn)
klog.Infof("Setting Blocking to on %v", idx)
klog.V(5).Infof("utSwitchon=%v", utSwitchOn)
klog.V(5).Infof("Setting Blocking to on %v", idx)
c.Info.SetRecentKernel(-1)
}
} else {
if recentKernel < 0 {
klog.Infof("utSwitchon=%v", utSwitchOn)
klog.Infof("Setting Blocking to off %v", idx)
klog.V(5).Infof("utSwitchon=%v", utSwitchOn)
klog.V(5).Infof("Setting Blocking to off %v", idx)
c.Info.SetRecentKernel(0)
}
}
if CheckPriority(utSwitchOn, priority, c) {
if utilizationSwitch != 1 {
klog.Infof("utSwitchon=%v", utSwitchOn)
klog.Infof("Setting UtilizationSwitch to on %v", idx)
klog.V(5).Infof("utSwitchon=%v", utSwitchOn)
klog.V(5).Infof("Setting UtilizationSwitch to on %v", idx)
c.Info.SetUtilizationSwitch(1)
}
} else {
if utilizationSwitch != 0 {
klog.Infof("utSwitchon=%v", utSwitchOn)
klog.Infof("Setting UtilizationSwitch to off %v", idx)
klog.V(5).Infof("utSwitchon=%v", utSwitchOn)
klog.V(5).Infof("Setting UtilizationSwitch to off %v", idx)
c.Info.SetUtilizationSwitch(0)
}
}
}
}

// watchAndFeedback initializes NVML, then polls the container lister every
// five seconds, refreshing the container list and applying scheduling
// feedback (blocking / utilization switches) to each container via Observe.
//
// The loop runs forever; an Update failure is logged and the next tick is
// attempted rather than aborting the monitor.
func watchAndFeedback(lister *nvidia.ContainerLister) {
	// The original code ignored nvml.Init's return value; if the driver is
	// absent every later NVML call fails with a confusing error, so bail out
	// early with a clear message instead.
	if ret := nvml.Init(); ret != nvml.SUCCESS {
		klog.Errorf("Failed to initialize NVML: %s", nvml.ErrorString(ret))
		return
	}
	// A ticker preserves the original "sleep five seconds, then work"
	// cadence without re-allocating a timer each iteration.
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		if err := lister.Update(); err != nil {
			klog.Errorf("Failed to update container list: %v", err)
			continue
		}
		Observe(lister)
	}
}
149 changes: 138 additions & 11 deletions cmd/vGPUmonitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,158 @@ limitations under the License.
package main

import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/Project-HAMi/HAMi/pkg/monitor/nvidia"
"github.com/Project-HAMi/HAMi/pkg/util"
"github.com/Project-HAMi/HAMi/pkg/util/flag"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/NVIDIA/go-nvml/pkg/nvml"
"github.com/spf13/cobra"
"k8s.io/klog/v2"
)

//var addr = flag.String("listen-address", ":9394", "The address to listen on for HTTP requests.")

//const shared_directory = "/usr/local/vgpu/shared"
// rootCmd is the cobra entry point for the vGPUmonitor binary. Running it
// prints the parsed flags and then launches the monitor services via start().
var (
rootCmd = &cobra.Command{
Use: "vGPUmonitor",
Short: "Hami vgpu vGPUmonitor",
Run: func(cmd *cobra.Command, args []string) {
// Log the effective flag values before starting, to aid debugging.
flag.PrintPFlags(cmd.Flags())
start()
},
}
)

func main() {
// init wires up the command-line flags: sorting is disabled so flags are
// shown in declaration order, and klog's flag set is attached so logging
// verbosity (-v) can be controlled from the CLI.
func init() {
rootCmd.Flags().SortFlags = false
rootCmd.PersistentFlags().SortFlags = false
rootCmd.Flags().AddGoFlagSet(util.InitKlogFlags())
}

// start wires up and runs the vGPUmonitor services: the Prometheus metrics
// endpoint and the NVML watch/feedback loop. It blocks until either a
// termination signal (SIGINT/SIGTERM) arrives or one of the services reports
// an error, then cancels the shared context and waits for both goroutines to
// drain before returning.
func start() {
	if err := ValidateEnvVars(); err != nil {
		klog.Fatalf("Failed to validate environment variables: %v", err)
	}

	containerLister, err := nvidia.NewContainerLister()
	if err != nil {
		klog.Fatalf("Failed to create container lister: %v", err)
	}

	cgroupDriver = 0 // Explicitly initialize; the driver is detected lazily on first use.

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var wg sync.WaitGroup
	// Buffered for both services so a late error send never blocks a
	// goroutine after the select below has already returned.
	errCh := make(chan error, 2)

	// Start the metrics service.
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := initMetrics(ctx, containerLister); err != nil {
			errCh <- err
		}
	}()

	// Start the monitoring and feedback service.
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := watchAndFeedback(ctx, containerLister); err != nil {
			errCh <- err
		}
	}()

	// Capture system signals so the daemon can shut down gracefully.
	signalCh := make(chan os.Signal, 1)
	signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM)

	// Block until we are told to stop or a service fails, then cancel the
	// context to stop the other service too.
	select {
	case sig := <-signalCh:
		klog.Infof("Received signal: %s", sig)
	case err := <-errCh:
		klog.Errorf("Received error: %v", err)
	}
	cancel()

	// Wait for all goroutines to complete before closing the error channel.
	wg.Wait()
	close(errCh)
}

// initMetrics registers the vGPU Prometheus collectors and serves them on
// :9394/metrics until ctx is cancelled, then shuts the HTTP server down
// gracefully. It returns a non-nil error only if the shutdown itself fails.
func initMetrics(ctx context.Context, containerLister *nvidia.ContainerLister) error {
	klog.V(4).Info("Initializing metrics for vGPUmonitor")
	reg := prometheus.NewRegistry()

	// Construct the cluster manager; it registers its collectors on reg as a
	// side effect, so the returned value is not needed here.
	NewClusterManager("vGPU", reg, containerLister)

	// Use a dedicated mux rather than http.DefaultServeMux: registering on
	// the global mux is a process-wide side effect and http.Handle panics if
	// the same pattern is ever registered twice.
	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	server := &http.Server{Addr: ":9394", Handler: mux}

	// Serve in the background; ErrServerClosed is the expected result of a
	// graceful Shutdown, not a failure.
	go func() {
		if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			klog.Errorf("Failed to serve metrics: %v", err)
		}
	}()

	// Block until cancellation, then shut down with a bounded grace period so
	// a hung client cannot stall process exit indefinitely.
	<-ctx.Done()
	klog.V(4).Info("Shutting down metrics server")
	shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancelShutdown()
	return server.Shutdown(shutdownCtx)
}

// watchAndFeedback initializes NVML and then, every five seconds, refreshes
// the container list and applies scheduling feedback to each container via
// Observe. It runs until ctx is cancelled, returning nil on shutdown; the
// only error it returns is an NVML initialization failure.
func watchAndFeedback(ctx context.Context, lister *nvidia.ContainerLister) error {
	if nvret := nvml.Init(); nvret != nvml.SUCCESS {
		return fmt.Errorf("failed to initialize NVML: %s", nvml.ErrorString(nvret))
	}
	defer nvml.Shutdown()

	// A ticker avoids allocating a fresh timer on every iteration, which is
	// what time.After inside the loop would do.
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			klog.Info("Shutting down watchAndFeedback")
			return nil
		case <-ticker.C:
			// Update failures are logged and retried on the next tick rather
			// than tearing down the monitor.
			if err := lister.Update(); err != nil {
				klog.Errorf("Failed to update container list: %v", err)
				continue
			}
			Observe(lister)
		}
	}
}

// main is the process entry point: it delegates to the cobra root command
// and aborts with a fatal log message if command execution fails.
func main() {
	err := rootCmd.Execute()
	if err != nil {
		klog.Fatal(err)
	}
}
Loading