diff --git a/cmd/kwok-gpu-device-plugin/main.go b/cmd/kwok-gpu-device-plugin/main.go index d781784..822f3f2 100644 --- a/cmd/kwok-gpu-device-plugin/main.go +++ b/cmd/kwok-gpu-device-plugin/main.go @@ -11,6 +11,6 @@ func main() { requiredEnvVars := []string{constants.EnvTopologyCmName, constants.EnvTopologyCmNamespace, constants.EnvFakeGpuOperatorNs} config.ValidateConfig(requiredEnvVars) - appRunner := app.NewAppRunner(&status_updater.StatusUpdaterApp{}) + appRunner := app.NewAppRunner(&status_updater.KWOKDevicePluginApp{}) appRunner.Run() } diff --git a/internal/kwok-gpu-device-plugin/app.go b/internal/kwok-gpu-device-plugin/app.go index f2487d8..9e375bf 100644 --- a/internal/kwok-gpu-device-plugin/app.go +++ b/internal/kwok-gpu-device-plugin/app.go @@ -1,8 +1,6 @@ package kwokgdp import ( - "sync" - "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -24,39 +22,27 @@ var DynamicClientFn = func(c *rest.Config) dynamic.Interface { return dynamic.NewForConfigOrDie(c) } -type StatusUpdaterAppConfiguration struct { - TopologyCmName string `mapstructure:"TOPOLOGY_CM_NAME" validate:"required"` - TopologyCmNamespace string `mapstructure:"TOPOLOGY_CM_NAMESPACE" validate:"required"` -} - -type StatusUpdaterApp struct { +type KWOKDevicePluginApp struct { Controllers []controllers.Interface kubeClient kubernetes.Interface stopCh chan struct{} - wg *sync.WaitGroup } -func (app *StatusUpdaterApp) Run() { - app.wg.Add(len(app.Controllers)) +func (app *KWOKDevicePluginApp) Run() { for _, controller := range app.Controllers { go func(controller controllers.Interface) { - defer app.wg.Done() controller.Run(app.stopCh) }(controller) } - - app.wg.Wait() } -func (app *StatusUpdaterApp) Init(stopCh chan struct{}) { +func (app *KWOKDevicePluginApp) Init(stopCh chan struct{}) { app.stopCh = stopCh clusterConfig := InClusterConfigFn() clusterConfig.QPS = 100 clusterConfig.Burst = 200 - app.wg = &sync.WaitGroup{} - app.kubeClient = KubeClientFn(clusterConfig) app.Controllers = append( @@ -66,12 +52,6 @@ func (app *StatusUpdaterApp) Init(stopCh chan struct{}) { ) } -func (app *StatusUpdaterApp) Name() string { +func (app *KWOKDevicePluginApp) Name() string { return "StatusUpdater" } - -func (app *StatusUpdaterApp) GetConfig() interface{} { - var config StatusUpdaterAppConfiguration - - return config -} diff --git a/internal/kwok-gpu-device-plugin/app_test.go b/internal/kwok-gpu-device-plugin/app_test.go new file mode 100644 index 0000000..781f6ee --- /dev/null +++ b/internal/kwok-gpu-device-plugin/app_test.go @@ -0,0 +1,146 @@ +package kwokgdp + +import ( + "context" + "sync" + "testing" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/spf13/viper" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + + "github.com/run-ai/fake-gpu-operator/internal/common/constants" + "github.com/run-ai/fake-gpu-operator/internal/common/topology" + cmcontroller "github.com/run-ai/fake-gpu-operator/internal/kwok-gpu-device-plugin/controllers/configmap" + "github.com/run-ai/fake-gpu-operator/internal/status-updater/controllers" +) + +const ( + gpuOperatorNamespace = "gpu-operator" + nodePoolLabelKey = "run.ai/node-pool" + defaultNodePoolName = "default" +) + +func TestKwokGpuDevicePlugin(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "KwokGpuDevicePlugin Suite") +} + +var _ = Describe("KwokGpuDevicePlugin", func() { + var ( + app *KWOKDevicePluginApp + kubeClient kubernetes.Interface + stopChan chan struct{} + wg *sync.WaitGroup + ) + + BeforeEach(func() { + clusterTopology := topology.ClusterTopology{ + NodePoolLabelKey: nodePoolLabelKey, + NodePools: map[string]topology.NodePoolTopology{ + defaultNodePoolName: { + GpuCount: 4, + GpuMemory: 1000, + GpuProduct: "nvidia-tesla-t4", + }, + }, + MigStrategy: "none", + } + clusterTopologyCM, err := topology.ToClusterTopologyCM(&clusterTopology) + Expect(err).ToNot(HaveOccurred()) + clusterTopologyCM.Name = "cluster-topology" + clusterTopologyCM.Namespace = gpuOperatorNamespace + + kubeClient = fake.NewSimpleClientset(clusterTopologyCM) + stopChan = make(chan struct{}) + + viper.SetDefault(constants.EnvTopologyCmName, clusterTopologyCM.Name) + viper.SetDefault(constants.EnvTopologyCmNamespace, gpuOperatorNamespace) + + app = &KWOKDevicePluginApp{ + Controllers: []controllers.Interface{ + cmcontroller.NewConfigMapController( + kubeClient, gpuOperatorNamespace, + ), + }, + kubeClient: kubeClient, + stopCh: stopChan, + } + wg = &sync.WaitGroup{} + wg.Add(1) + go func() { + app.Run() + wg.Done() + }() + }) + + AfterEach(func() { + close(stopChan) + wg.Wait() + }) + + Context("app", func() { + It("should run until channel is closed", func() {}) + + Context("ConfigMap", func() { + It("should handle new Config Map without node labels", func() { + _, err := kubeClient.CoreV1().ConfigMaps(gpuOperatorNamespace).Create(context.TODO(), &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "configmap1", + Namespace: gpuOperatorNamespace, + }, + }, metav1.CreateOptions{}) + Expect(err).ToNot(HaveOccurred()) + }) + + It("should add gpu devices to kwok nodes by configmap data", func() { + node1 := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Labels: map[string]string{ + nodePoolLabelKey: defaultNodePoolName, + }, + Annotations: map[string]string{ + constants.AnnotationKwokNode: "fake", + }, + }, + } + _, err := kubeClient.CoreV1().Nodes().Create(context.TODO(), node1, metav1.CreateOptions{}) + Expect(err).ToNot(HaveOccurred()) + + nodeTopology := topology.NodeTopology{ + GpuMemory: 1000, + GpuProduct: "nvidia-tesla-t4", + Gpus: []topology.GpuDetails{ + {ID: "fake-gpu-id-1", Status: topology.GpuStatus{}}, + {ID: "fake-gpu-id-2", Status: topology.GpuStatus{}}, + {ID: "fake-gpu-id-3", Status: topology.GpuStatus{}}, + {ID: "fake-gpu-id-4", Status: topology.GpuStatus{}}, + }, + } + cm, err := topology.ToNodeTopologyCM(&nodeTopology, node1.Name) + Expect(err).ToNot(HaveOccurred()) + cm.Namespace = gpuOperatorNamespace + + _, err = kubeClient.CoreV1().ConfigMaps(gpuOperatorNamespace).Create(context.TODO(), cm, metav1.CreateOptions{}) + Expect(err).ToNot(HaveOccurred()) + + Eventually(func() bool { + node, err := kubeClient.CoreV1().Nodes().Get(context.TODO(), node1.Name, metav1.GetOptions{}) + if err != nil { + return false + } + + gpuQuantity := node.Status.Capacity[constants.GpuResourceName] + return gpuQuantity.Value() == int64(4) + }, 2*time.Second, 100*time.Millisecond).Should(BeTrue()) + }) + }) + }) +}) diff --git a/internal/kwok-gpu-device-plugin/controllers/configmap/controller.go b/internal/kwok-gpu-device-plugin/controllers/configmap/controller.go index 3cd0296..6efd194 100644 --- a/internal/kwok-gpu-device-plugin/controllers/configmap/controller.go +++ b/internal/kwok-gpu-device-plugin/controllers/configmap/controller.go @@ -44,6 +44,7 @@ func NewConfigMapController( kubeClient: kubeClient, cmInformer: informerFactory.Core().V1().ConfigMaps().Informer(), nodeLister: informerFactory.Core().V1().Nodes().Lister(), + informerFactory: informerFactory, handler: cmhandler.NewConfigMapHandler(kubeClient, clusterTopology), clusterTopology: clusterTopology, } diff --git a/internal/kwok-gpu-device-plugin/handlers/configmap/handler.go b/internal/kwok-gpu-device-plugin/handlers/configmap/handler.go index 5cc5332..92683d6 100644 --- a/internal/kwok-gpu-device-plugin/handlers/configmap/handler.go +++ b/internal/kwok-gpu-device-plugin/handlers/configmap/handler.go @@ -33,11 +33,11 @@ func NewConfigMapHandler(kubeClient kubernetes.Interface, clusterTopology *topol } func (p *ConfigMapHandler) HandleAdd(cm *v1.ConfigMap, node *v1.Node) error { - log.Printf("Handling node addition: %s\n", cm.Name) + log.Printf("Handling config map addition: %s\n", cm.Name) nodeTopology, err := topology.FromNodeTopologyCM(cm) if err != nil { - return fmt.Errorf("failed to create node topology ConfigMap: %w", err) + return fmt.Errorf("failed to read node topology ConfigMap: %w", err) } return p.applyFakeDevicePlugin(len(nodeTopology.Gpus), node)