Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
feat: support batchConfirm && GrayTime
Browse files Browse the repository at this point in the history
Signed-off-by: charlie <[email protected]>
  • Loading branch information
Charlie17Li committed Sep 25, 2023
1 parent 0acc987 commit 5dae446
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 7 deletions.
15 changes: 9 additions & 6 deletions module-controller/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ import (
"go.uber.org/zap/zapcore"
"os"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"

"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/util/workqueue"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
Expand Down Expand Up @@ -92,8 +93,10 @@ func main() {
}

if err = (&controller.ModuleDeploymentReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
DelayingQueue: workqueue.NewDelayingQueue(),
Set: map[string][]int32{},
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ModuleDeployment")
os.Exit(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"fmt"
"strconv"
"strings"
"sync"
"time"

v1 "k8s.io/api/apps/v1"
Expand All @@ -29,6 +31,8 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -44,7 +48,10 @@ import (
// ModuleDeploymentReconciler reconciles a ModuleDeployment object
type ModuleDeploymentReconciler struct {
client.Client
Scheme *runtime.Scheme
Scheme *runtime.Scheme
DelayingQueue workqueue.DelayingInterface
Set map[string][]int32
mutex sync.Mutex
}

//+kubebuilder:rbac:groups=serverless.alipay.com,resources=moduledeployments,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -126,6 +133,21 @@ func (r *ModuleDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
return ctrl.Result{}, err
}
}
case moduledeploymentv1alpha1.ModuleDeploymentReleaseProgressWaitingForConfirmation:
moduleDeployment.Status.ReleaseStatus.Progress = moduledeploymentv1alpha1.ModuleDeploymentReleaseProgressPaused
if err := r.Status().Update(ctx, moduleDeployment); err != nil {
return ctrl.Result{}, err
}
case moduledeploymentv1alpha1.ModuleDeploymentReleaseProgressPaused:
moduleDeployment.Spec.Pause = true
if err := r.Update(ctx, moduleDeployment); err != nil {
return ctrl.Result{}, err
}

moduleDeployment.Status.ReleaseStatus.Progress = moduledeploymentv1alpha1.ModuleDeploymentReleaseProgressExecuting
if err := r.Status().Update(ctx, moduleDeployment); err != nil {
return ctrl.Result{}, err
}
}

// update moduleDeployment owner reference
Expand Down Expand Up @@ -432,8 +454,61 @@ func (r *ModuleDeploymentReconciler) createNewReplicaSet(ctx context.Context, mo
return moduleReplicaSet, nil
}

type waitingLoop struct {
*ModuleDeploymentReconciler
}

func (w *waitingLoop) Start(ctx context.Context) error {
for i := 0; i < 3; i++ {
go wait.Until(w.worker, time.Second, ctx.Done())
}
return nil
}

func (w *waitingLoop) worker() {
for w.processItem() {
}
}

func (w *waitingLoop) processItem() bool {
item, shutdown := w.DelayingQueue.Get()
if shutdown {
return false
}

defer w.DelayingQueue.Done(item)

_ = w.HandleDelayingItem(item.(string))
return true
}

func (r *ModuleDeploymentReconciler) HandleDelayingItem(key string) error {
arr := strings.Split(key, "/")
if len(arr) != 2 {
return fmt.Errorf("invalid key %v", key)
}
moduleDeployment := &moduledeploymentv1alpha1.ModuleDeployment{}
name, namespace := arr[0], arr[1]
err := r.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, moduleDeployment)
if err != nil {
if errors.IsNotFound(err) {
return nil
}
log.Log.Error(err, "get moduleDeployment failed")
return err
}
if moduleDeployment.Spec.Pause {
moduleDeployment.Spec.Pause = false
if err = r.Update(context.TODO(), moduleDeployment); err != nil {
log.Log.Error(err, "update moduleDeployment failed")
}
}
return err
}

// SetupWithManager sets up the controller with the Manager.
func (r *ModuleDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
mgr.Add(&waitingLoop{r})
return ctrl.NewControllerManagedBy(mgr).
For(&moduledeploymentv1alpha1.ModuleDeployment{}).
Complete(r)
Expand Down

0 comments on commit 5dae446

Please sign in to comment.