improve code
huaqing1994 committed Nov 1, 2024
1 parent a9903bc commit b5b0191
Showing 6 changed files with 91 additions and 117 deletions.
8 changes: 4 additions & 4 deletions api/v1beta1/conditions_consts.go
@@ -142,14 +142,14 @@ const (
// usually transient and failed updating are automatically re-tried by the controller.
ExpandingRootPartitionFailedReason = "ExpandingRootPartitionFailed"

- // ExpandingVMResourcesReason documents (Severity=Info) ElfMachine currently executing the
+ // ExpandingVMComputeResourcesReason documents (Severity=Info) ElfMachine currently executing the
// expand resources(CPU/memory) operation.
- ExpandingVMResourcesReason = "ExpandingVMResources"
+ ExpandingVMComputeResourcesReason = "ExpandingVMComputeResources"

- // ExpandingVMResourcesFailedReason (Severity=Warning) documents an ElfMachine controller detecting
+ // ExpandingVMComputeResourcesFailedReason (Severity=Warning) documents an ElfMachine controller detecting
// an error while expanding resources(CPU/memory); those kind of errors are usually transient and
// failed updating are automatically re-tried by the controller.
- ExpandingVMResourcesFailedReason = "ExpandingVMResourcesFailed"
+ ExpandingVMComputeResourcesFailedReason = "ExpandingVMComputeResourcesFailed"

// RestartingKubeletReason documents (Severity=Info) ElfMachine currently executing the restart kubelet operation.
RestartingKubeletReason = "RestartingKubelet"
4 changes: 2 additions & 2 deletions controllers/elfmachine_controller.go
@@ -1028,8 +1028,8 @@ func (r *ElfMachineReconciler) reconcileVMFailedTask(ctx goctx.Context, machineC
}
case service.IsUpdateVMTask(task) && conditions.IsFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition):
reason := conditions.GetReason(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition)
- if reason == infrav1.ExpandingVMResourcesReason || reason == infrav1.ExpandingVMResourcesFailedReason {
- conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingVMResourcesFailedReason, clusterv1.ConditionSeverityWarning, errorMessage)
+ if reason == infrav1.ExpandingVMComputeResourcesReason || reason == infrav1.ExpandingVMComputeResourcesFailedReason {
+ conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingVMComputeResourcesFailedReason, clusterv1.ConditionSeverityWarning, errorMessage)
}
case service.IsPowerOnVMTask(task) || service.IsUpdateVMTask(task) || service.IsVMColdMigrationTask(task):
if machineCtx.ElfMachine.RequiresGPUDevices() {
103 changes: 28 additions & 75 deletions controllers/elfmachine_controller_resources.go
@@ -137,8 +137,6 @@ func (r *ElfMachineReconciler) resizeVMVolume(ctx goctx.Context, machineCtx *con

// expandVMRootPartition adds new disk capacity to root partition.
func (r *ElfMachineReconciler) expandVMRootPartition(ctx goctx.Context, machineCtx *context.MachineContext) (bool, error) {
- log := ctrl.LoggerFrom(ctx)
-
reason := conditions.GetReason(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition)
if reason == "" {
return true, nil
@@ -153,61 +151,7 @@ func (r *ElfMachineReconciler) expandVMRootPartition(ctx goctx.Context, machineC
conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingRootPartitionReason, clusterv1.ConditionSeverityInfo, "")
}

- if machineCtx.Machine.Status.NodeInfo == nil {
- log.Info("Waiting for node exists for host agent expand vm root partition")
-
- return false, nil
- }
-
- kubeClient, err := capiremote.NewClusterClient(ctx, "", r.Client, client.ObjectKey{Namespace: machineCtx.Cluster.Namespace, Name: machineCtx.Cluster.Name})
- if err != nil {
- return false, err
- }
-
- agentJob, err := hostagent.GetHostJob(ctx, kubeClient, machineCtx.ElfMachine.Namespace, hostagent.GetExpandRootPartitionJobName(machineCtx.ElfMachine))
- if err != nil && !apierrors.IsNotFound(err) {
- return false, err
- }
-
- if agentJob == nil {
- agentJob, err = hostagent.ExpandRootPartition(ctx, kubeClient, machineCtx.ElfMachine)
- if err != nil {
- conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingRootPartitionFailedReason, clusterv1.ConditionSeverityInfo, err.Error())
-
- return false, err
- }
-
- log.Info("Waiting for expanding root partition", "hostAgentJob", agentJob.Name)
-
- return false, nil
- }
-
- switch agentJob.Status.Phase {
- case agentv1.PhaseSucceeded:
- log.Info("Expand root partition to root succeeded", "hostAgentJob", agentJob.Name)
- case agentv1.PhaseFailed:
- conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingRootPartitionFailedReason, clusterv1.ConditionSeverityWarning, agentJob.Status.FailureMessage)
- log.Info("Expand root partition failed, will try again after three minutes", "hostAgentJob", agentJob.Name, "failureMessage", agentJob.Status.FailureMessage)
-
- lastExecutionTime := agentJob.Status.LastExecutionTime
- if lastExecutionTime == nil {
- lastExecutionTime = &agentJob.CreationTimestamp
- }
- // Three minutes after the job fails, delete the job and try again.
- if time.Now().After(lastExecutionTime.Add(3 * time.Minute)) {
- if err := kubeClient.Delete(ctx, agentJob); err != nil {
- return false, errors.Wrapf(err, "failed to delete expand root partition job %s/%s for retry", agentJob.Namespace, agentJob.Name)
- }
- }
-
- return false, nil
- default:
- log.Info("Waiting for expanding root partition job done", "hostAgentJob", agentJob.Name, "jobStatus", agentJob.Status.Phase)
-
- return false, nil
- }
-
- return true, nil
+ return r.reconcileHostJob(ctx, machineCtx, hostagent.HostAgentJobTypeExpandRootPartition)
}

// reconcileVMCPUAndMemory ensures that the vm CPU and memory are as expected.
@@ -222,8 +166,8 @@ func (r *ElfMachineReconciler) reconcileVMCPUAndMemory(ctx goctx.Context, machin

reason := conditions.GetReason(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition)
if reason == "" ||
- (reason != infrav1.ExpandingVMResourcesReason && reason != infrav1.ExpandingVMResourcesFailedReason) {
- conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingVMResourcesReason, clusterv1.ConditionSeverityInfo, "")
+ (reason != infrav1.ExpandingVMComputeResourcesReason && reason != infrav1.ExpandingVMComputeResourcesFailedReason) {
+ conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingVMComputeResourcesReason, clusterv1.ConditionSeverityInfo, "")

// Save the condition first, and then expand the resources capacity.
// This prevents the resources expansion from succeeding but failing to save the
@@ -241,7 +185,7 @@ func (r *ElfMachineReconciler) reconcileVMCPUAndMemory(ctx goctx.Context, machin

withTaskVM, err := machineCtx.VMService.UpdateVM(vm, machineCtx.ElfMachine)
if err != nil {
- conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingVMResourcesFailedReason, clusterv1.ConditionSeverityWarning, err.Error())
+ conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingVMComputeResourcesFailedReason, clusterv1.ConditionSeverityWarning, err.Error())

return false, errors.Wrapf(err, "failed to trigger update CPU and memory for VM %s", *vm.Name)
}
@@ -254,13 +198,11 @@ func (r *ElfMachineReconciler) reconcileVMCPUAndMemory(ctx goctx.Context, machin
}

func (r *ElfMachineReconciler) restartKubelet(ctx goctx.Context, machineCtx *context.MachineContext) (bool, error) {
- log := ctrl.LoggerFrom(ctx)
-
reason := conditions.GetReason(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition)
if reason == "" {
return true, nil
- } else if reason != infrav1.ExpandingVMResourcesReason &&
- reason != infrav1.ExpandingVMResourcesFailedReason &&
+ } else if reason != infrav1.ExpandingVMComputeResourcesReason &&
+ reason != infrav1.ExpandingVMComputeResourcesFailedReason &&
reason != infrav1.RestartingKubeletReason &&
reason != infrav1.RestartingKubeletFailedReason {
return true, nil
@@ -270,9 +212,15 @@ func (r *ElfMachineReconciler) restartKubelet(ctx goctx.Context, machineCtx *con
conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.RestartingKubeletReason, clusterv1.ConditionSeverityInfo, "")
}

+ return r.reconcileHostJob(ctx, machineCtx, hostagent.HostAgentJobTypeRestartKubelet)
+ }
+
+ func (r *ElfMachineReconciler) reconcileHostJob(ctx goctx.Context, machineCtx *context.MachineContext, jobType hostagent.HostAgentJobType) (bool, error) {
+ log := ctrl.LoggerFrom(ctx)
+
// Agent needs to wait for the node exists before it can run and execute commands.
if machineCtx.Machine.Status.NodeInfo == nil {
- log.Info("Waiting for node exists for host agent expand vm root partition")
+ log.Info("Waiting for node exists for host agent job", "jobType", jobType)

return false, nil
}
@@ -282,30 +230,35 @@ func (r *ElfMachineReconciler) restartKubelet(ctx goctx.Context, machineCtx *con
return false, err
}

- agentJob, err := hostagent.GetHostJob(ctx, kubeClient, machineCtx.ElfMachine.Namespace, hostagent.GetRestartKubeletJobName(machineCtx.ElfMachine))
+ agentJob, err := hostagent.GetHostJob(ctx, kubeClient, machineCtx.ElfMachine.Namespace, hostagent.GetJobName(machineCtx.ElfMachine, jobType))
if err != nil && !apierrors.IsNotFound(err) {
return false, err
}

if agentJob == nil {
- agentJob, err = hostagent.RestartMachineKubelet(ctx, kubeClient, machineCtx.ElfMachine)
- if err != nil {
- conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.RestartingKubeletFailedReason, clusterv1.ConditionSeverityInfo, err.Error())
+ agentJob = hostagent.GenerateJob(machineCtx.ElfMachine, jobType)
+ if err = kubeClient.Create(ctx, agentJob); err != nil {
+ conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingRootPartitionFailedReason, clusterv1.ConditionSeverityInfo, err.Error())

return false, err
}

log.Info("Waiting for resting kubelet to expanding CPU and memory", "hostAgentJob", agentJob.Name)
log.Info("Waiting for job to complete", "hostAgentJob", agentJob.Name)

return false, nil
}

switch agentJob.Status.Phase {
case agentv1.PhaseSucceeded:
log.Info("Expand CPU and memory succeeded", "hostAgentJob", agentJob.Name)
log.Info("HostJob succeeded", "hostAgentJob", agentJob.Name)
case agentv1.PhaseFailed:
- conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.RestartingKubeletFailedReason, clusterv1.ConditionSeverityWarning, agentJob.Status.FailureMessage)
- log.Info("Expand CPU and memory failed, will try again after three minutes", "hostAgentJob", agentJob.Name, "failureMessage", agentJob.Status.FailureMessage)
+ switch jobType {
+ case hostagent.HostAgentJobTypeExpandRootPartition:
+ conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingRootPartitionFailedReason, clusterv1.ConditionSeverityWarning, agentJob.Status.FailureMessage)
+ case hostagent.HostAgentJobTypeRestartKubelet:
+ conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.RestartingKubeletFailedReason, clusterv1.ConditionSeverityWarning, agentJob.Status.FailureMessage)
+ }
+ log.Info("HostJob failed, will try again after three minutes", "hostAgentJob", agentJob.Name, "failureMessage", agentJob.Status.FailureMessage)

lastExecutionTime := agentJob.Status.LastExecutionTime
if lastExecutionTime == nil {
@@ -314,13 +267,13 @@ func (r *ElfMachineReconciler) restartKubelet(ctx goctx.Context, machineCtx *con
// Three minutes after the job fails, delete the job and try again.
if time.Now().After(lastExecutionTime.Add(3 * time.Minute)) {
if err := kubeClient.Delete(ctx, agentJob); err != nil {
return false, errors.Wrapf(err, "failed to delete expand CPU and memory job %s/%s for retry", agentJob.Namespace, agentJob.Name)
return false, errors.Wrapf(err, "failed to delete hostJob %s/%s for retry", agentJob.Namespace, agentJob.Name)
}
}

return false, nil
default:
log.Info("Waiting for expanding CPU and memory job done", "hostAgentJob", agentJob.Name, "jobStatus", agentJob.Status.Phase)
log.Info("Waiting for HostJob done", "hostAgentJob", agentJob.Name, "jobStatus", agentJob.Status.Phase)

return false, nil
}
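Note: the new reconcileHostJob path relies on hostagent.HostAgentJobType, hostagent.GetJobName, and hostagent.GenerateJob, which are defined in changed files that are not shown on this page. Below is a minimal, hypothetical sketch of what that plumbing could look like, assuming GetJobName simply derives a deterministic per-machine name from the job type; the constant values, import path, and implementation are guesses, not taken from this commit.

// Hypothetical sketch only -- not part of this commit's diff.
package hostagent

import (
	"fmt"

	infrav1 "github.com/smartxworks/cluster-api-provider-elf/api/v1beta1" // assumed import path
)

// HostAgentJobType distinguishes the host-agent operations that reconcileHostJob can drive.
type HostAgentJobType string

const (
	// Only the identifiers appear in the diff above; the string values here are illustrative.
	HostAgentJobTypeExpandRootPartition HostAgentJobType = "expand-root-partition"
	HostAgentJobTypeRestartKubelet      HostAgentJobType = "restart-kubelet"
)

// GetJobName returns a deterministic per-machine job name, so that GetHostJob can find
// the job created by an earlier reconcile for the same machine and job type instead of
// creating a duplicate.
func GetJobName(elfMachine *infrav1.ElfMachine, jobType HostAgentJobType) string {
	return fmt.Sprintf("%s-%s", elfMachine.Name, jobType)
}

GenerateJob would build the host-agent job object that kubeClient.Create submits, selecting the command to run from the job type; its concrete spec is not visible in this diff.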
(The diffs for the remaining 3 changed files were not loaded on this page.)
