Skip to content

Commit

Permalink
Cache placement group to reduce Tower API requests
Browse files Browse the repository at this point in the history
  • Loading branch information
haijianyang committed Dec 5, 2023
1 parent 04ef554 commit 347fc25
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 17 deletions.
9 changes: 6 additions & 3 deletions controllers/elfcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,13 @@ func (r *ElfClusterReconciler) reconcileDelete(ctx *context.ClusterContext) (rec

func (r *ElfClusterReconciler) reconcileDeleteVMPlacementGroups(ctx *context.ClusterContext) (bool, error) {
placementGroupPrefix := towerresources.GetVMPlacementGroupNamePrefix(ctx.Cluster)
if pgCount, err := ctx.VMService.DeleteVMPlacementGroupsByNamePrefix(ctx, placementGroupPrefix); err != nil {
if pgNames, err := ctx.VMService.DeleteVMPlacementGroupsByNamePrefix(ctx, placementGroupPrefix); err != nil {
return false, err
} else if pgCount > 0 {
ctx.Logger.Info(fmt.Sprintf("Waiting for the placement groups with name prefix %s to be deleted", placementGroupPrefix), "count", pgCount)
} else if len(pgNames) > 0 {
ctx.Logger.Info(fmt.Sprintf("Waiting for the placement groups with name prefix %s to be deleted", placementGroupPrefix), "count", len(pgNames))

// Delete placement group caches.
delPGCaches(pgNames)

return false, nil
} else {
Expand Down
9 changes: 6 additions & 3 deletions controllers/elfcluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,23 +207,26 @@ var _ = Describe("ElfClusterReconciler", func() {
reconciler := &ElfClusterReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
elfClusterKey := capiutil.ObjectKey(elfCluster)

mockVMService.EXPECT().DeleteVMPlacementGroupsByNamePrefix(gomock.Any(), towerresources.GetVMPlacementGroupNamePrefix(cluster)).Return(0, errors.New("some error"))
mockVMService.EXPECT().DeleteVMPlacementGroupsByNamePrefix(gomock.Any(), towerresources.GetVMPlacementGroupNamePrefix(cluster)).Return(nil, errors.New("some error"))

result, err := reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: elfClusterKey})
Expect(result).To(BeZero())
Expect(err).To(HaveOccurred())

task.Status = models.NewTaskStatus(models.TaskStatusSUCCESSED)
logBuffer.Reset()
pg := fake.NewVMPlacementGroup(nil)
setPGCache(pg)
placementGroupPrefix := towerresources.GetVMPlacementGroupNamePrefix(cluster)
mockVMService.EXPECT().DeleteVMPlacementGroupsByNamePrefix(gomock.Any(), placementGroupPrefix).Return(1, nil)
mockVMService.EXPECT().DeleteVMPlacementGroupsByNamePrefix(gomock.Any(), placementGroupPrefix).Return([]string{*pg.Name}, nil)
result, err = reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: elfClusterKey})
Expect(result).NotTo(BeZero())
Expect(err).NotTo(HaveOccurred())
Expect(logBuffer.String()).To(ContainSubstring(fmt.Sprintf("Waiting for the placement groups with name prefix %s to be deleted", placementGroupPrefix)))
Expect(getPGFromCache(*pg.Name)).To(BeNil())

logBuffer.Reset()
mockVMService.EXPECT().DeleteVMPlacementGroupsByNamePrefix(gomock.Any(), placementGroupPrefix).Return(0, nil)
mockVMService.EXPECT().DeleteVMPlacementGroupsByNamePrefix(gomock.Any(), placementGroupPrefix).Return(nil, nil)
mockVMService.EXPECT().DeleteLabel(towerresources.GetVMLabelClusterName(), elfCluster.Name, true).Return("labelid", nil)
mockVMService.EXPECT().DeleteLabel(towerresources.GetVMLabelVIP(), elfCluster.Spec.ControlPlaneEndpoint.Host, false).Return("labelid", nil)
mockVMService.EXPECT().DeleteLabel(towerresources.GetVMLabelNamespace(), elfCluster.Namespace, true).Return("", nil)
Expand Down
18 changes: 17 additions & 1 deletion controllers/elfmachine_controller_placement_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (r *ElfMachineReconciler) createPlacementGroup(ctx *context.MachineContext,

ctx.Logger.Info("Creating placement group succeeded", "taskID", *task.ID, "placementGroup", placementGroupName)

placementGroup, err := ctx.VMService.GetVMPlacementGroup(placementGroupName)
placementGroup, err := r.getPlacementGroup(ctx, placementGroupName)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -345,7 +345,14 @@ func (r *ElfMachineReconciler) getAvailableHostsForVM(ctx *context.MachineContex
return availableHosts
}

// getPlacementGroup returns the specified placement group.
// getPlacementGroup will get the placement group from the cache first.
// If the placement group does not exist in the cache, it will be fetched from Tower and saved to the cache(expiration time is 10s).
func (r *ElfMachineReconciler) getPlacementGroup(ctx *context.MachineContext, placementGroupName string) (*models.VMPlacementGroup, error) {
if placementGroup := getPGFromCache(placementGroupName); placementGroup != nil {
return placementGroup, nil
}

placementGroup, err := ctx.VMService.GetVMPlacementGroup(placementGroupName)
if err != nil {
return nil, err
Expand All @@ -358,6 +365,9 @@ func (r *ElfMachineReconciler) getPlacementGroup(ctx *context.MachineContext, pl
return nil, nil
}

// Save placement group cache.
setPGCache(placementGroup)

return placementGroup, nil
}

Expand Down Expand Up @@ -563,6 +573,9 @@ func (r *ElfMachineReconciler) addVMsToPlacementGroup(ctx *context.MachineContex
return err
}

// Delete placement group cache.
delPGCaches([]string{*placementGroup.Name})

taskID := *task.ID
task, err = ctx.VMService.WaitTask(ctx, taskID, config.WaitTaskTimeoutForPlacementGroupOperation, config.WaitTaskInterval)
if err != nil {
Expand Down Expand Up @@ -638,6 +651,9 @@ func (r *ElfMachineReconciler) deletePlacementGroup(ctx *context.MachineContext)
return false, nil
} else {
ctx.Logger.Info(fmt.Sprintf("Placement group %s deleted", *placementGroup.Name))

// Delete placement group cache.
delPGCaches([]string{*placementGroup.Name})
}

return true, nil
Expand Down
29 changes: 29 additions & 0 deletions controllers/elfmachine_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1136,12 +1136,14 @@ var _ = Describe("ElfMachineReconciler", func() {
mockVMService.EXPECT().FindByIDs([]string{*vm2.ID}).Return([]*models.VM{vm2}, nil)
mockVMService.EXPECT().AddVMsToPlacementGroup(placementGroup, gomock.Any()).Return(task, nil)
mockVMService.EXPECT().WaitTask(gomock.Any(), *task.ID, config.WaitTaskTimeoutForPlacementGroupOperation, config.WaitTaskInterval).Return(task, nil)
setPGCache(placementGroup)

reconciler := &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
ok, err := reconciler.joinPlacementGroup(machineContext, vm)
Expect(ok).To(BeTrue())
Expect(err).To(BeZero())
Expect(logBuffer.String()).To(ContainSubstring("Updating placement group succeeded"))
Expect(getPGFromCache(*placementGroup.Name)).To(BeNil())
})

It("should not migrate VM when VM is running and KCP is in rolling update", func() {
Expand Down Expand Up @@ -2670,10 +2672,12 @@ var _ = Describe("ElfMachineReconciler", func() {
mockVMService.EXPECT().GetVMPlacementGroup(placementGroupName).Return(placementGroup, nil)
mockVMService.EXPECT().DeleteVMPlacementGroupByID(gomock.Any(), *placementGroup.ID).Return(true, nil)

setPGCache(placementGroup)
reconciler = &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
ok, err = reconciler.deletePlacementGroup(machineContext)
Expect(ok).To(BeTrue())
Expect(err).NotTo(HaveOccurred())
Expect(getPGFromCache(*placementGroup.Name)).To(BeNil())

md.DeletionTimestamp = nil
md.Spec.Replicas = pointer.Int32(0)
Expand Down Expand Up @@ -2990,6 +2994,31 @@ var _ = Describe("ElfMachineReconciler", func() {
Expect(err).NotTo(HaveOccurred())
Expect(logBuffer.String()).To(ContainSubstring(fmt.Sprintf("Tower has duplicate placement group, skip creating placement group %s", placementGroupName)))
})

It("should save and get placement group cache", func() {
ctrlContext := newCtrlContexts(elfCluster, cluster, elfMachine, machine, secret, md)
machineContext := newMachineContext(ctrlContext, elfCluster, cluster, elfMachine, machine, mockVMService)
fake.InitOwnerReferences(ctrlContext, elfCluster, cluster, elfMachine, machine)
placementGroupName, err := towerresources.GetVMPlacementGroupName(ctx, ctrlContext.Client, machine, cluster)
Expect(err).NotTo(HaveOccurred())
placementGroup := fake.NewVMPlacementGroup(nil)
placementGroup.Name = service.TowerString(placementGroupName)

mockVMService.EXPECT().GetVMPlacementGroup(gomock.Any()).Return(placementGroup, nil)
Expect(getPGFromCache(*placementGroup.Name)).To(BeNil())
reconciler := &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
pg, err := reconciler.getPlacementGroup(machineContext, placementGroupName)
Expect(err).To(BeZero())
Expect(pg).To(Equal(placementGroup))
Expect(getPGFromCache(*placementGroup.Name)).To(Equal(placementGroup))

// Use cache
reconciler = &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
pg, err = reconciler.getPlacementGroup(machineContext, placementGroupName)
Expect(err).To(BeZero())
Expect(pg).To(Equal(placementGroup))
Expect(getPGFromCache(*placementGroup.Name)).To(Equal(placementGroup))
})
})

Context("Reconcile static IP allocation", func() {
Expand Down
34 changes: 34 additions & 0 deletions controllers/tower_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,40 @@ func getKeyForDuplicatePlacementGroupError(placementGroup string) string {
return fmt.Sprintf("pg:duplicate:%s", placementGroup)
}

// pgCacheDuration is the lifespan of placement group cache.
const pgCacheDuration = 10 * time.Second

func getKeyForPGCache(pgName string) string {
return fmt.Sprintf("pg:%s:cache", pgName)
}

// setPGCache saves the specified placement group to the memory,
// which can reduce access to the Tower service.
func setPGCache(pg *models.VMPlacementGroup) {
vmTaskErrorCache.Set(getKeyForPGCache(*pg.Name), *pg, gpuCacheDuration)
}

// delPGCaches deletes the specified placement group caches.
func delPGCaches(pgNames []string) {
for i := 0; i < len(pgNames); i++ {
vmTaskErrorCache.Delete(getKeyForPGCache(pgNames[i]))
}
}

// getPGFromCache gets the specified placement group from the memory.
func getPGFromCache(pgName string) *models.VMPlacementGroup {
key := getKeyForPGCache(pgName)
if val, found := vmTaskErrorCache.Get(key); found {
if pg, ok := val.(models.VMPlacementGroup); ok {
return &pg
}
// Delete unexpected data.
vmTaskErrorCache.Delete(key)

Check warning on line 197 in controllers/tower_cache.go

View check run for this annotation

Codecov / codecov/patch

controllers/tower_cache.go#L197

Added line #L197 was not covered by tests
}

return nil
}

/* GPU */

// gpuCacheDuration is the lifespan of gpu cache.
Expand Down
16 changes: 16 additions & 0 deletions controllers/tower_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,22 @@ var _ = Describe("TowerCache", func() {
expectConditions(elfMachine, []conditionAssertion{{infrav1.VMProvisionedCondition, corev1.ConditionFalse, clusterv1.ConditionSeverityInfo, infrav1.WaitingForPlacementGroupPolicySatisfiedReason}})
})

It("PG Cache", func() {
pgName := "pg"
pg := fake.NewVMPlacementGroup(nil)
pg.Name = &pgName
Expect(getPGFromCache(pgName)).To(BeNil())

setPGCache(pg)
Expect(getPGFromCache(pgName)).To(Equal(pg))
time.Sleep(pgCacheDuration)
Expect(getPGFromCache(pgName)).To(BeNil())

setPGCache(pg)
delPGCaches([]string{pgName})
Expect(getPGFromCache(pgName)).To(BeNil())
})

It("GPU Cache", func() {
gpuID := "gpu"
gpuVMInfo := models.GpuVMInfo{ID: service.TowerString(gpuID)}
Expand Down
4 changes: 2 additions & 2 deletions pkg/service/mock_services/vm_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 13 additions & 8 deletions pkg/service/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type VMService interface {
GetVMPlacementGroup(name string) (*models.VMPlacementGroup, error)
AddVMsToPlacementGroup(placementGroup *models.VMPlacementGroup, vmIDs []string) (*models.Task, error)
DeleteVMPlacementGroupByID(ctx goctx.Context, id string) (bool, error)
DeleteVMPlacementGroupsByNamePrefix(ctx goctx.Context, placementGroupName string) (int, error)
DeleteVMPlacementGroupsByNamePrefix(ctx goctx.Context, placementGroupName string) ([]string, error)
GetGPUDevicesAllocationInfoByHostIDs(hostIDs []string, gpuDeviceUsage models.GpuDeviceUsage) (GPUVMInfos, error)
GetGPUDevicesAllocationInfoByIDs(gpuIDs []string) (GPUVMInfos, error)
GetVMGPUAllocationInfo(id string) (*models.VMGpuInfo, error)
Expand Down Expand Up @@ -907,9 +907,9 @@ func (svr *TowerVMService) DeleteVMPlacementGroupByID(ctx goctx.Context, id stri
// DeleteVMPlacementGroupsByNamePrefix deletes placement groups by name prefix.
//
// The return value:
// 1. 0 indicates that all specified placements have been deleted.
// 2. > 0 indicates that the names of the placement groups being deleted.
func (svr *TowerVMService) DeleteVMPlacementGroupsByNamePrefix(ctx goctx.Context, namePrefix string) (int, error) {
// 1. Empty string array indicates that all specified placements have been deleted.
// 2. Non-empty string array indicates that the names of the placement groups being deleted.
func (svr *TowerVMService) DeleteVMPlacementGroupsByNamePrefix(ctx goctx.Context, namePrefix string) ([]string, error) {

Check warning on line 912 in pkg/service/vm.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/vm.go#L912

Added line #L912 was not covered by tests
// Deleting placement groups in batches, Tower will create a deletion task
// for each placement group.
// Some tasks may fail, and failed tasks need to be deleted again.
Expand All @@ -923,9 +923,9 @@ func (svr *TowerVMService) DeleteVMPlacementGroupsByNamePrefix(ctx goctx.Context

getVMPlacementGroupsResp, err := svr.Session.VMPlacementGroup.GetVMPlacementGroups(getVMPlacementGroupsParams)
if err != nil {
return 0, err
return nil, err

Check warning on line 926 in pkg/service/vm.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/vm.go#L926

Added line #L926 was not covered by tests
} else if len(getVMPlacementGroupsResp.Payload) == 0 {
return 0, nil
return nil, nil

Check warning on line 928 in pkg/service/vm.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/vm.go#L928

Added line #L928 was not covered by tests
}

deleteVMPlacementGroupParams := clientvmplacementgroup.NewDeleteVMPlacementGroupParams()
Expand All @@ -937,10 +937,15 @@ func (svr *TowerVMService) DeleteVMPlacementGroupsByNamePrefix(ctx goctx.Context
}

if _, err := svr.Session.VMPlacementGroup.DeleteVMPlacementGroup(deleteVMPlacementGroupParams); err != nil {
return 0, err
return nil, err
}

Check warning on line 941 in pkg/service/vm.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/vm.go#L940-L941

Added lines #L940 - L941 were not covered by tests

pgNames := make([]string, len(getVMPlacementGroupsResp.Payload))
for i := 0; i < len(getVMPlacementGroupsResp.Payload); i++ {
pgNames[i] = *getVMPlacementGroupsResp.Payload[i].Name

Check warning on line 945 in pkg/service/vm.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/vm.go#L943-L945

Added lines #L943 - L945 were not covered by tests
}

return len(getVMPlacementGroupsResp.Payload), nil
return pgNames, nil

Check warning on line 948 in pkg/service/vm.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/vm.go#L948

Added line #L948 was not covered by tests
}

// GetGPUDevicesAllocationInfoByIDs returns the specified GPU devices with VMs and allocation details.
Expand Down

0 comments on commit 347fc25

Please sign in to comment.