Skip to content

Commit

Permalink
Optimize the Drbd installation process
Browse files Browse the repository at this point in the history
Signed-off-by: peng9808 <[email protected]>
  • Loading branch information
peng9808 committed Aug 13, 2024
1 parent 22658dc commit d1e2d56
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 24 deletions.
121 changes: 117 additions & 4 deletions controllers/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,20 @@ package controllers

import (
"context"
"fmt"
"github.com/hwameistor/hwameistor-operator/pkg/install/dataloadmanager"
"github.com/hwameistor/hwameistor-operator/pkg/install/datasetmanager"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"reflect"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
"strings"
"time"

"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -498,15 +509,14 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}

if !newInstance.Spec.DRBD.Disable {
if !newInstance.Status.DRBDAdapterCreated {
if newInstance.Status.DRBDAdapterCreated != true {
drbd.HandelDRBDConfigs(instance)
drbdAdapterJobCreatedNum, err := drbd.CreateDRBDAdapter(r.Client)
err := drbd.CreateDRBDAdapter(instance, r.Client)
if err != nil {
log.Errorf("Create DRBD Adapter err: %v", err)
log.Errorf("DRBD Install Error!: %v", err)
return ctrl.Result{}, err
} else {
newInstance.Status.DRBDAdapterCreated = true
newInstance.Status.DRBDAdapterCreatedJobNum = drbdAdapterJobCreatedNum
}
}
}
Expand Down Expand Up @@ -615,9 +625,112 @@ func (r *ClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
For(&hwameistoroperatorv1alpha1.Cluster{}).
Owns(&appsv1.DaemonSet{}).
Owns(&appsv1.Deployment{}).
Owns(&batchv1.Job{}).Watches(&source.Kind{Type: &batchv1.Job{}},
&handler.EnqueueRequestForObject{},
builder.WithPredicates(
predicate.Funcs{
UpdateFunc: r.handleDrbdJobUpdatePredicate,
DeleteFunc: r.handleDrbdJobDeletePredicate,
},
)).
Complete(r)
}

// handleDrbdJobUpdatePredicate is the UpdateFunc predicate installed on the
// Job watch. As a side effect it hands the updated Job to handleDrbdJobUpdate,
// then reports true so the update event is passed on to the event handler.
//
// Events whose new object is nil or not a *batchv1.Job are dropped (false)
// instead of panicking on a failed type assertion.
func (r *ClusterReconciler) handleDrbdJobUpdatePredicate(e event.UpdateEvent) bool {
	newJob, ok := e.ObjectNew.(*batchv1.Job)
	if !ok || newJob == nil {
		log.Errorf("Unexpected object in Job update event: %T", e.ObjectNew)
		return false
	}
	// The error is discarded on purpose: handleDrbdJobUpdate logs every
	// failure itself and a predicate has no way to return an error.
	_ = r.handleDrbdJobUpdate(r.Client, newJob)
	return true
}

// handleDrbdJobDeletePredicate is the DeleteFunc predicate installed on the
// Job watch. As a side effect it hands the deleted Job to handleDrbdJobDelete.
// It always reports false, so delete events are never passed on to the event
// handler.
//
// A nil or non-Job object is logged and ignored instead of panicking on a
// failed type assertion.
func (r *ClusterReconciler) handleDrbdJobDeletePredicate(e event.DeleteEvent) bool {
	job, ok := e.Object.(*batchv1.Job)
	if !ok || job == nil {
		log.Errorf("Unexpected object in Job delete event: %T", e.Object)
		return false
	}
	// The error is discarded on purpose: handleDrbdJobDelete logs every
	// failure itself and a predicate has no way to return an error.
	_ = r.handleDrbdJobDelete(r.Client, job)
	return false
}

// handleDrbdJobUpdate reacts to a Job that has reached the Complete condition.
//
// It lists all nodes and all Jobs in the "hwameistor" namespace, counts the
// completed Jobs whose name starts with "drbd-adapter", and, once that count
// is at least the number of nodes, marks the "hwameistor-cluster" Cluster
// status as DRBDAdapterCreated and stores the count in
// DRBDAdapterCreatedJobNum.
//
// A Job that is not complete is ignored and nil is returned. Any client error
// is logged and returned.
func (r *ClusterReconciler) handleDrbdJobUpdate(cli client.Client, job *batchv1.Job) error {
	if !isJobComplete(job) {
		return nil
	}
	log.Infof("Job %s has successfully completed.", job.Name)

	ctx := context.TODO()

	nodes := corev1.NodeList{}
	if err := cli.List(ctx, &nodes); err != nil {
		log.Errorf("List nodes err: %v", err)
		return err
	}

	jobs := batchv1.JobList{}
	if err := cli.List(ctx, &jobs, client.InNamespace("hwameistor")); err != nil {
		log.Errorf("List jobs err: %v", err)
		return err
	}

	// Count the finished DRBD adapter Jobs.
	completed := 0
	for i := range jobs.Items {
		candidate := &jobs.Items[i]
		if !strings.HasPrefix(candidate.Name, "drbd-adapter") {
			continue
		}
		if isJobComplete(candidate) {
			completed++
		}
	}

	// Not enough finished adapter Jobs yet: nothing to record.
	if completed < len(nodes.Items) {
		return nil
	}

	cluster := &hwameistoroperatorv1alpha1.Cluster{}
	key := types.NamespacedName{Namespace: "hwameistor", Name: "hwameistor-cluster"}
	if err := cli.Get(ctx, key, cluster); err != nil {
		log.Errorf("Get cluster instance err: %v", err)
		return err
	}

	cluster.Status.DRBDAdapterCreated = true
	cluster.Status.DRBDAdapterCreatedJobNum = completed
	if err := cli.Status().Update(ctx, cluster); err != nil {
		log.Errorf("Update status err: %v", err)
		return err
	}

	return nil
}

// handleDrbdJobDelete recreates a deleted DRBD adapter Job under a new,
// timestamp-suffixed name, as long as the "hwameistor-cluster" Cluster status
// does not yet report any created adapter Jobs (DRBDAdapterCreatedJobNum == 0).
//
// Only Jobs whose name starts with "drbd-adapter" are considered: the Job
// watch is not restricted by name, so without this filter every deleted Job
// seen by the operator would be recreated.
//
// Returns nil when nothing has to be done or the Job was recreated; any client
// error is logged and returned.
func (r *ClusterReconciler) handleDrbdJobDelete(cli client.Client, job *batchv1.Job) error {
	if job == nil || !strings.HasPrefix(job.Name, "drbd-adapter") {
		return nil
	}

	instance := &hwameistoroperatorv1alpha1.Cluster{}
	if err := cli.Get(context.TODO(), types.NamespacedName{Name: "hwameistor-cluster", Namespace: "hwameistor"}, instance); err != nil {
		log.Errorf("Get cluster instance err: %v", err)
		return err
	}
	if instance.Status.DRBDAdapterCreatedJobNum > 0 {
		return nil
	}

	// Work on a copy so the object delivered with the event is not mutated.
	newJob := job.DeepCopy()
	labels := newJob.Labels
	owners := newJob.OwnerReferences

	// Start from fresh metadata (no UID, resourceVersion, timestamps, ...).
	// The owner references are kept so the recreated Job stays owned by the
	// same object as the Job it replaces.
	newJob.ObjectMeta = metav1.ObjectMeta{
		Name:            fmt.Sprintf("%s-%d", job.Name, time.Now().Unix()), // unique name
		Namespace:       job.Namespace,
		Labels:          labels,
		OwnerReferences: owners,
	}
	newJob.Status = batchv1.JobStatus{}

	// Unless the selector is managed manually, the API server generates the
	// selector and the matching pod template labels from the Job's UID and
	// name. Values copied from the deleted Job would not match the new Job and
	// would be rejected on create, so drop them and let them be regenerated.
	if newJob.Spec.ManualSelector == nil || !*newJob.Spec.ManualSelector {
		newJob.Spec.Selector = nil
		generatedLabelKeys := []string{
			"controller-uid",
			"job-name",
			"batch.kubernetes.io/controller-uid",
			"batch.kubernetes.io/job-name",
		}
		for _, key := range generatedLabelKeys {
			delete(newJob.Labels, key)
			delete(newJob.Spec.Template.Labels, key)
		}
	}

	if err := cli.Create(context.TODO(), newJob); err != nil {
		log.Errorf("Failed to create Job: %v", err)
		return err
	}

	log.Infof("Successfully recreated Job %s in namespace %s", newJob.Name, newJob.Namespace)
	return nil
}

// isJobComplete reports whether the Job's status carries a condition of type
// JobComplete whose status is ConditionTrue.
func isJobComplete(job *batchv1.Job) bool {
	conditions := job.Status.Conditions
	for i := range conditions {
		if conditions[i].Type != batchv1.JobComplete {
			continue
		}
		if conditions[i].Status == corev1.ConditionTrue {
			return true
		}
	}
	return false
}

func FulfillClusterInstance(clusterInstance *hwameistoroperatorv1alpha1.Cluster) *hwameistoroperatorv1alpha1.Cluster {
newClusterInstance := clusterInstance.DeepCopy()

Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package main
import (
"flag"
"fmt"
batchv1 "k8s.io/api/batch/v1"
"os"
"path"
"runtime"
Expand Down Expand Up @@ -77,7 +78,7 @@ func main() {
kubeconfig.Set(ctrl.GetConfigOrDie())

mgr, err := ctrl.NewManager(kubeconfig.Get(), ctrl.Options{
ClientDisableCacheFor: []client.Object{&hwameistoroperatorv1alpha1.Cluster{}},
ClientDisableCacheFor: []client.Object{&hwameistoroperatorv1alpha1.Cluster{}, &batchv1.Job{}},
Scheme: scheme,
MetricsBindAddress: metricsAddr,
Port: 9443,
Expand Down
32 changes: 13 additions & 19 deletions pkg/install/drbd/drbd_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@ package drbd

import (
"context"
operatorv1alpha1 "github.com/hwameistor/hwameistor-operator/api/v1alpha1"
"regexp"
"strings"

hwameistoriov1alpha1 "github.com/hwameistor/hwameistor-operator/api/v1alpha1"
operatorv1alpha1 "github.com/hwameistor/hwameistor-operator/api/v1alpha1"
"github.com/hwameistor/hwameistor-operator/pkg/install"
log "github.com/sirupsen/logrus"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"regexp"
"sigs.k8s.io/controller-runtime/pkg/client"
"strings"
)

var defaultDeployOnMaster = "no"
Expand Down Expand Up @@ -50,7 +50,6 @@ var distroRegexMap = map[string]string{
"kylin .*v10": "kylin10",
}

var ttlSecondsAfterFinished3600 = int32(3600)
var backoffLimit0 = int32(0)
var terminationGracePeriodSeconds0 = int64(0)

Expand All @@ -71,8 +70,6 @@ var useAffinity string
var nodeAffinity corev1.NodeAffinity
var namespace string

var adapterCreatedJobNum = 0

func HandelDRBDConfigs(clusterInstance *hwameistoriov1alpha1.Cluster) {
drbdConfigs := clusterInstance.Spec.DRBD
if drbdConfigs == nil {
Expand All @@ -98,11 +95,11 @@ func HandelDRBDConfigs(clusterInstance *hwameistoriov1alpha1.Cluster) {
nodeAffinity = *drbdConfigs.NodeAffinity
}

func CreateDRBDAdapter(cli client.Client) (int, error) {
func CreateDRBDAdapter(instance *hwameistoriov1alpha1.Cluster, cli client.Client) error {
nodeList := corev1.NodeList{}
if err := cli.List(context.TODO(), &nodeList); err != nil {
log.Errorf("List nodes err: %v", err)
return adapterCreatedJobNum, err
return err
}

for _, node := range nodeList.Items {
Expand All @@ -120,7 +117,7 @@ func CreateDRBDAdapter(cli client.Client) (int, error) {
matched, err := regexp.Match(k, []byte(osImage))
if err != nil {
log.Errorf("Regexp match err: %v", err)
return adapterCreatedJobNum, err
return err
}
if matched {
distro = v
Expand Down Expand Up @@ -148,8 +145,7 @@ func CreateDRBDAdapter(cli client.Client) (int, error) {
},
},
Spec: batchv1.JobSpec{
TTLSecondsAfterFinished: &ttlSecondsAfterFinished3600,
BackoffLimit: &backoffLimit0,
BackoffLimit: &backoffLimit0,
Template: corev1.PodTemplateSpec{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
Expand Down Expand Up @@ -376,7 +372,7 @@ func CreateDRBDAdapter(cli client.Client) (int, error) {
matched, err := regexp.Match("^rhel[78]$", []byte(distro))
if err != nil {
log.Errorf("Regexp match err: %v", err)
return adapterCreatedJobNum, err
return err
}
if matched {
for i, container := range job.Spec.Template.Spec.Containers {
Expand Down Expand Up @@ -409,16 +405,14 @@ func CreateDRBDAdapter(cli client.Client) (int, error) {
},
}
}

job.OwnerReferences = []v1.OwnerReference{*v1.NewControllerRef(instance, schema.FromAPIVersionAndKind("hwameistor.io/v1alpha1", "Cluster"))}
if err := cli.Create(context.TODO(), &job); err != nil {
log.Errorf("Create job err: %v", job)
return adapterCreatedJobNum, err
} else {
adapterCreatedJobNum = adapterCreatedJobNum + 1
log.Errorf("Create job err: %v", err)
return err
}
}

return adapterCreatedJobNum, nil
return nil
}

func FulfillDRBDSpec(clusterInstance *hwameistoriov1alpha1.Cluster) *hwameistoriov1alpha1.Cluster {
Expand Down

0 comments on commit d1e2d56

Please sign in to comment.