From 347fc25452285300253273b4a69402121e20689e Mon Sep 17 00:00:00 2001 From: haijianyang Date: Tue, 5 Dec 2023 02:07:39 -0500 Subject: [PATCH] Cache placement group to reduce Tower API requests --- controllers/elfcluster_controller.go | 9 +++-- controllers/elfcluster_controller_test.go | 9 +++-- .../elfmachine_controller_placement_group.go | 18 +++++++++- controllers/elfmachine_controller_test.go | 29 ++++++++++++++++ controllers/tower_cache.go | 34 +++++++++++++++++++ controllers/tower_cache_test.go | 16 +++++++++ pkg/service/mock_services/vm_mock.go | 4 +-- pkg/service/vm.go | 21 +++++++----- 8 files changed, 123 insertions(+), 17 deletions(-) diff --git a/controllers/elfcluster_controller.go b/controllers/elfcluster_controller.go index a09f47c7..4d322f82 100644 --- a/controllers/elfcluster_controller.go +++ b/controllers/elfcluster_controller.go @@ -231,10 +231,13 @@ func (r *ElfClusterReconciler) reconcileDelete(ctx *context.ClusterContext) (rec func (r *ElfClusterReconciler) reconcileDeleteVMPlacementGroups(ctx *context.ClusterContext) (bool, error) { placementGroupPrefix := towerresources.GetVMPlacementGroupNamePrefix(ctx.Cluster) - if pgCount, err := ctx.VMService.DeleteVMPlacementGroupsByNamePrefix(ctx, placementGroupPrefix); err != nil { + if pgNames, err := ctx.VMService.DeleteVMPlacementGroupsByNamePrefix(ctx, placementGroupPrefix); err != nil { return false, err - } else if pgCount > 0 { - ctx.Logger.Info(fmt.Sprintf("Waiting for the placement groups with name prefix %s to be deleted", placementGroupPrefix), "count", pgCount) + } else if len(pgNames) > 0 { + ctx.Logger.Info(fmt.Sprintf("Waiting for the placement groups with name prefix %s to be deleted", placementGroupPrefix), "count", len(pgNames)) + + // Delete placement group caches. + delPGCaches(pgNames) return false, nil } else { diff --git a/controllers/elfcluster_controller_test.go b/controllers/elfcluster_controller_test.go index ac04f7dd..b9aa5158 100644 --- a/controllers/elfcluster_controller_test.go +++ b/controllers/elfcluster_controller_test.go @@ -207,7 +207,7 @@ var _ = Describe("ElfClusterReconciler", func() { reconciler := &ElfClusterReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService} elfClusterKey := capiutil.ObjectKey(elfCluster) - mockVMService.EXPECT().DeleteVMPlacementGroupsByNamePrefix(gomock.Any(), towerresources.GetVMPlacementGroupNamePrefix(cluster)).Return(0, errors.New("some error")) + mockVMService.EXPECT().DeleteVMPlacementGroupsByNamePrefix(gomock.Any(), towerresources.GetVMPlacementGroupNamePrefix(cluster)).Return(nil, errors.New("some error")) result, err := reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: elfClusterKey}) Expect(result).To(BeZero()) @@ -215,15 +215,18 @@ var _ = Describe("ElfClusterReconciler", func() { task.Status = models.NewTaskStatus(models.TaskStatusSUCCESSED) logBuffer.Reset() + pg := fake.NewVMPlacementGroup(nil) + setPGCache(pg) placementGroupPrefix := towerresources.GetVMPlacementGroupNamePrefix(cluster) - mockVMService.EXPECT().DeleteVMPlacementGroupsByNamePrefix(gomock.Any(), placementGroupPrefix).Return(1, nil) + mockVMService.EXPECT().DeleteVMPlacementGroupsByNamePrefix(gomock.Any(), placementGroupPrefix).Return([]string{*pg.Name}, nil) result, err = reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: elfClusterKey}) Expect(result).NotTo(BeZero()) Expect(err).NotTo(HaveOccurred()) Expect(logBuffer.String()).To(ContainSubstring(fmt.Sprintf("Waiting for the placement groups with name prefix %s to be deleted", placementGroupPrefix))) + Expect(getPGFromCache(*pg.Name)).To(BeNil()) logBuffer.Reset() - mockVMService.EXPECT().DeleteVMPlacementGroupsByNamePrefix(gomock.Any(), placementGroupPrefix).Return(0, nil) + mockVMService.EXPECT().DeleteVMPlacementGroupsByNamePrefix(gomock.Any(), placementGroupPrefix).Return(nil, nil) mockVMService.EXPECT().DeleteLabel(towerresources.GetVMLabelClusterName(), elfCluster.Name, true).Return("labelid", nil) mockVMService.EXPECT().DeleteLabel(towerresources.GetVMLabelVIP(), elfCluster.Spec.ControlPlaneEndpoint.Host, false).Return("labelid", nil) mockVMService.EXPECT().DeleteLabel(towerresources.GetVMLabelNamespace(), elfCluster.Namespace, true).Return("", nil) diff --git a/controllers/elfmachine_controller_placement_group.go b/controllers/elfmachine_controller_placement_group.go index 4903f32f..bb25319f 100644 --- a/controllers/elfmachine_controller_placement_group.go +++ b/controllers/elfmachine_controller_placement_group.go @@ -116,7 +116,7 @@ func (r *ElfMachineReconciler) createPlacementGroup(ctx *context.MachineContext, ctx.Logger.Info("Creating placement group succeeded", "taskID", *task.ID, "placementGroup", placementGroupName) - placementGroup, err := ctx.VMService.GetVMPlacementGroup(placementGroupName) + placementGroup, err := r.getPlacementGroup(ctx, placementGroupName) if err != nil { return nil, err } @@ -345,7 +345,14 @@ func (r *ElfMachineReconciler) getAvailableHostsForVM(ctx *context.MachineContex return availableHosts } +// getPlacementGroup returns the specified placement group. +// getPlacementGroup will get the placement group from the cache first. +// If the placement group does not exist in the cache, it will be fetched from Tower and saved to the cache(expiration time is 10s). func (r *ElfMachineReconciler) getPlacementGroup(ctx *context.MachineContext, placementGroupName string) (*models.VMPlacementGroup, error) { + if placementGroup := getPGFromCache(placementGroupName); placementGroup != nil { + return placementGroup, nil + } + placementGroup, err := ctx.VMService.GetVMPlacementGroup(placementGroupName) if err != nil { return nil, err @@ -358,6 +365,9 @@ func (r *ElfMachineReconciler) getPlacementGroup(ctx *context.MachineContext, pl return nil, nil } + // Save placement group cache. + setPGCache(placementGroup) + return placementGroup, nil } @@ -563,6 +573,9 @@ func (r *ElfMachineReconciler) addVMsToPlacementGroup(ctx *context.MachineContex return err } + // Delete placement group cache. + delPGCaches([]string{*placementGroup.Name}) + taskID := *task.ID task, err = ctx.VMService.WaitTask(ctx, taskID, config.WaitTaskTimeoutForPlacementGroupOperation, config.WaitTaskInterval) if err != nil { @@ -638,6 +651,9 @@ func (r *ElfMachineReconciler) deletePlacementGroup(ctx *context.MachineContext) return false, nil } else { ctx.Logger.Info(fmt.Sprintf("Placement group %s deleted", *placementGroup.Name)) + + // Delete placement group cache. + delPGCaches([]string{*placementGroup.Name}) } return true, nil diff --git a/controllers/elfmachine_controller_test.go b/controllers/elfmachine_controller_test.go index 5631af65..3bb61f99 100644 --- a/controllers/elfmachine_controller_test.go +++ b/controllers/elfmachine_controller_test.go @@ -1136,12 +1136,14 @@ var _ = Describe("ElfMachineReconciler", func() { mockVMService.EXPECT().FindByIDs([]string{*vm2.ID}).Return([]*models.VM{vm2}, nil) mockVMService.EXPECT().AddVMsToPlacementGroup(placementGroup, gomock.Any()).Return(task, nil) mockVMService.EXPECT().WaitTask(gomock.Any(), *task.ID, config.WaitTaskTimeoutForPlacementGroupOperation, config.WaitTaskInterval).Return(task, nil) + setPGCache(placementGroup) reconciler := &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService} ok, err := reconciler.joinPlacementGroup(machineContext, vm) Expect(ok).To(BeTrue()) Expect(err).To(BeZero()) Expect(logBuffer.String()).To(ContainSubstring("Updating placement group succeeded")) + Expect(getPGFromCache(*placementGroup.Name)).To(BeNil()) }) It("should not migrate VM when VM is running and KCP is in rolling update", func() { @@ -2670,10 +2672,12 @@ var _ = Describe("ElfMachineReconciler", func() { mockVMService.EXPECT().GetVMPlacementGroup(placementGroupName).Return(placementGroup, nil) mockVMService.EXPECT().DeleteVMPlacementGroupByID(gomock.Any(), *placementGroup.ID).Return(true, nil) + setPGCache(placementGroup) reconciler = &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService} ok, err = reconciler.deletePlacementGroup(machineContext) Expect(ok).To(BeTrue()) Expect(err).NotTo(HaveOccurred()) + Expect(getPGFromCache(*placementGroup.Name)).To(BeNil()) md.DeletionTimestamp = nil md.Spec.Replicas = pointer.Int32(0) @@ -2990,6 +2994,31 @@ var _ = Describe("ElfMachineReconciler", func() { Expect(err).NotTo(HaveOccurred()) Expect(logBuffer.String()).To(ContainSubstring(fmt.Sprintf("Tower has duplicate placement group, skip creating placement group %s", placementGroupName))) }) + + It("should save and get placement group cache", func() { + ctrlContext := newCtrlContexts(elfCluster, cluster, elfMachine, machine, secret, md) + machineContext := newMachineContext(ctrlContext, elfCluster, cluster, elfMachine, machine, mockVMService) + fake.InitOwnerReferences(ctrlContext, elfCluster, cluster, elfMachine, machine) + placementGroupName, err := towerresources.GetVMPlacementGroupName(ctx, ctrlContext.Client, machine, cluster) + Expect(err).NotTo(HaveOccurred()) + placementGroup := fake.NewVMPlacementGroup(nil) + placementGroup.Name = service.TowerString(placementGroupName) + + mockVMService.EXPECT().GetVMPlacementGroup(gomock.Any()).Return(placementGroup, nil) + Expect(getPGFromCache(*placementGroup.Name)).To(BeNil()) + reconciler := &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService} + pg, err := reconciler.getPlacementGroup(machineContext, placementGroupName) + Expect(err).To(BeZero()) + Expect(pg).To(Equal(placementGroup)) + Expect(getPGFromCache(*placementGroup.Name)).To(Equal(placementGroup)) + + // Use cache + reconciler = &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService} + pg, err = reconciler.getPlacementGroup(machineContext, placementGroupName) + Expect(err).To(BeZero()) + Expect(pg).To(Equal(placementGroup)) + Expect(getPGFromCache(*placementGroup.Name)).To(Equal(placementGroup)) + }) }) Context("Reconcile static IP allocation", func() { diff --git a/controllers/tower_cache.go b/controllers/tower_cache.go index c7887867..aa4af560 100644 --- a/controllers/tower_cache.go +++ b/controllers/tower_cache.go @@ -166,6 +166,40 @@ func getKeyForDuplicatePlacementGroupError(placementGroup string) string { return fmt.Sprintf("pg:duplicate:%s", placementGroup) } +// pgCacheDuration is the lifespan of placement group cache. +const pgCacheDuration = 10 * time.Second + +func getKeyForPGCache(pgName string) string { + return fmt.Sprintf("pg:%s:cache", pgName) +} + +// setPGCache saves the specified placement group to the memory, +// which can reduce access to the Tower service. +func setPGCache(pg *models.VMPlacementGroup) { + vmTaskErrorCache.Set(getKeyForPGCache(*pg.Name), *pg, gpuCacheDuration) +} + +// delPGCaches deletes the specified placement group caches. +func delPGCaches(pgNames []string) { + for i := 0; i < len(pgNames); i++ { + vmTaskErrorCache.Delete(getKeyForPGCache(pgNames[i])) + } +} + +// getPGFromCache gets the specified placement group from the memory. +func getPGFromCache(pgName string) *models.VMPlacementGroup { + key := getKeyForPGCache(pgName) + if val, found := vmTaskErrorCache.Get(key); found { + if pg, ok := val.(models.VMPlacementGroup); ok { + return &pg + } + // Delete unexpected data. + vmTaskErrorCache.Delete(key) + } + + return nil +} + /* GPU */ // gpuCacheDuration is the lifespan of gpu cache. diff --git a/controllers/tower_cache_test.go b/controllers/tower_cache_test.go index d726bf99..e3061f00 100644 --- a/controllers/tower_cache_test.go +++ b/controllers/tower_cache_test.go @@ -170,6 +170,22 @@ var _ = Describe("TowerCache", func() { expectConditions(elfMachine, []conditionAssertion{{infrav1.VMProvisionedCondition, corev1.ConditionFalse, clusterv1.ConditionSeverityInfo, infrav1.WaitingForPlacementGroupPolicySatisfiedReason}}) }) + It("PG Cache", func() { + pgName := "pg" + pg := fake.NewVMPlacementGroup(nil) + pg.Name = &pgName + Expect(getPGFromCache(pgName)).To(BeNil()) + + setPGCache(pg) + Expect(getPGFromCache(pgName)).To(Equal(pg)) + time.Sleep(pgCacheDuration) + Expect(getPGFromCache(pgName)).To(BeNil()) + + setPGCache(pg) + delPGCaches([]string{pgName}) + Expect(getPGFromCache(pgName)).To(BeNil()) + }) + It("GPU Cache", func() { gpuID := "gpu" gpuVMInfo := models.GpuVMInfo{ID: service.TowerString(gpuID)} diff --git a/pkg/service/mock_services/vm_mock.go b/pkg/service/mock_services/vm_mock.go index fe88e19f..fd096d8d 100644 --- a/pkg/service/mock_services/vm_mock.go +++ b/pkg/service/mock_services/vm_mock.go @@ -159,10 +159,10 @@ func (mr *MockVMServiceMockRecorder) DeleteVMPlacementGroupByID(ctx, id interfac } // DeleteVMPlacementGroupsByNamePrefix mocks base method. -func (m *MockVMService) DeleteVMPlacementGroupsByNamePrefix(ctx context.Context, placementGroupName string) (int, error) { +func (m *MockVMService) DeleteVMPlacementGroupsByNamePrefix(ctx context.Context, placementGroupName string) ([]string, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DeleteVMPlacementGroupsByNamePrefix", ctx, placementGroupName) - ret0, _ := ret[0].(int) + ret0, _ := ret[0].([]string) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/pkg/service/vm.go b/pkg/service/vm.go index 6ec188a3..2e5c7e72 100644 --- a/pkg/service/vm.go +++ b/pkg/service/vm.go @@ -71,7 +71,7 @@ type VMService interface { GetVMPlacementGroup(name string) (*models.VMPlacementGroup, error) AddVMsToPlacementGroup(placementGroup *models.VMPlacementGroup, vmIDs []string) (*models.Task, error) DeleteVMPlacementGroupByID(ctx goctx.Context, id string) (bool, error) - DeleteVMPlacementGroupsByNamePrefix(ctx goctx.Context, placementGroupName string) (int, error) + DeleteVMPlacementGroupsByNamePrefix(ctx goctx.Context, placementGroupName string) ([]string, error) GetGPUDevicesAllocationInfoByHostIDs(hostIDs []string, gpuDeviceUsage models.GpuDeviceUsage) (GPUVMInfos, error) GetGPUDevicesAllocationInfoByIDs(gpuIDs []string) (GPUVMInfos, error) GetVMGPUAllocationInfo(id string) (*models.VMGpuInfo, error) @@ -907,9 +907,9 @@ func (svr *TowerVMService) DeleteVMPlacementGroupByID(ctx goctx.Context, id stri // DeleteVMPlacementGroupsByNamePrefix deletes placement groups by name prefix. // // The return value: -// 1. 0 indicates that all specified placements have been deleted. -// 2. > 0 indicates that the names of the placement groups being deleted. -func (svr *TowerVMService) DeleteVMPlacementGroupsByNamePrefix(ctx goctx.Context, namePrefix string) (int, error) { +// 1. Empty string array indicates that all specified placements have been deleted. +// 2. Non-empty string array indicates that the names of the placement groups being deleted. +func (svr *TowerVMService) DeleteVMPlacementGroupsByNamePrefix(ctx goctx.Context, namePrefix string) ([]string, error) { // Deleting placement groups in batches, Tower will create a deletion task // for each placement group. // Some tasks may fail, and failed tasks need to be deleted again. @@ -923,9 +923,9 @@ func (svr *TowerVMService) DeleteVMPlacementGroupsByNamePrefix(ctx goctx.Context getVMPlacementGroupsResp, err := svr.Session.VMPlacementGroup.GetVMPlacementGroups(getVMPlacementGroupsParams) if err != nil { - return 0, err + return nil, err } else if len(getVMPlacementGroupsResp.Payload) == 0 { - return 0, nil + return nil, nil } deleteVMPlacementGroupParams := clientvmplacementgroup.NewDeleteVMPlacementGroupParams() @@ -937,10 +937,15 @@ func (svr *TowerVMService) DeleteVMPlacementGroupsByNamePrefix(ctx goctx.Context } if _, err := svr.Session.VMPlacementGroup.DeleteVMPlacementGroup(deleteVMPlacementGroupParams); err != nil { - return 0, err + return nil, err + } + + pgNames := make([]string, len(getVMPlacementGroupsResp.Payload)) + for i := 0; i < len(getVMPlacementGroupsResp.Payload); i++ { + pgNames[i] = *getVMPlacementGroupsResp.Payload[i].Name } - return len(getVMPlacementGroupsResp.Payload), nil + return pgNames, nil } // GetGPUDevicesAllocationInfoByIDs returns the specified GPU devices with VMs and allocation details.