Skip to content
This repository has been archived by the owner on Oct 20, 2022. It is now read-only.

Commit

Permalink
[Fix/Operator] Parameter context controller case spam (#152)
Browse files Browse the repository at this point in the history
* add feature and bump version

* append changelog

* fix parameter context & set requeue configuration #124

* bump version & append changelog
  • Loading branch information
erdrix authored Nov 8, 2021
1 parent 4502cc0 commit c4edae6
Show file tree
Hide file tree
Showing 19 changed files with 222 additions and 114 deletions.
12 changes: 11 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,21 @@

### Fixed Bugs

## v0.7.2

### Added

- [PR #152](https://github.com/Orange-OpenSource/nifikop/pull/152) - **[Operator]** Configurable requeue interval (#124)

### Fixed Bugs

- [PR #152](https://github.com/Orange-OpenSource/nifikop/pull/152) - **[Operator/NiFiParameterContext]** Fix is sync control in nil value case.

## v0.7.1

### Added

- [PR #144](https://github.com/Orange-OpenSource/nifikop/pull/144) - **[Operator/Parameter]** Support empty string and no value set.
- [PR #144](https://github.com/Orange-OpenSource/nifikop/pull/144) - **[Operator/NiFiParameterContext]** Support empty string and no value set.

## v0.7.0

Expand Down
2 changes: 1 addition & 1 deletion config/manager/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ spec:
- /manager
args:
- --leader-elect
image: orangeopensource/nifikop:v0.7.1-release
image: orangeopensource/nifikop:v0.7.2-release
name: nifikop
securityContext:
allowPrivilegeEscalation: false
Expand Down
29 changes: 17 additions & 12 deletions controllers/nificluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,13 @@ var clusterUsersFinalizer = "nificlusters.nifi.orange.com/users"
// NifiClusterReconciler reconciles a NifiCluster object
type NifiClusterReconciler struct {
client.Client
DirectClient client.Reader
Log logr.Logger
Scheme *runtime.Scheme
Namespaces []string
Recorder record.EventRecorder
DirectClient client.Reader
Log logr.Logger
Scheme *runtime.Scheme
Namespaces []string
Recorder record.EventRecorder
RequeueIntervals map[string]int
RequeueOffset int
}

// +kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -127,38 +129,40 @@ func (r *NifiClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
nifi.New(r.Client, r.DirectClient, r.Scheme, instance),
}

intervalNotReady := util.GetRequeueInterval(r.RequeueIntervals["CLUSTER_TASK_NOT_READY_REQUEUE_INTERVAL"], r.RequeueOffset)
intervalRunning := util.GetRequeueInterval(r.RequeueIntervals["CLUSTER_TASK_RUNNING_REQUEUE_INTERVAL"], r.RequeueOffset)
for _, rec := range reconcilers {
err = rec.Reconcile(r.Log)
if err != nil {
switch errors.Cause(err).(type) {
case errorfactory.NodesUnreachable:
r.Log.Info("Nodes unreachable, may still be starting up")
return reconcile.Result{
RequeueAfter: time.Duration(15) * time.Second,
RequeueAfter: intervalNotReady,
}, nil
case errorfactory.NodesNotReady:
r.Log.Info("Nodes not ready, may still be starting up")
return reconcile.Result{
RequeueAfter: time.Duration(15) * time.Second,
RequeueAfter: intervalNotReady,
}, nil
case errorfactory.ResourceNotReady:
r.Log.Info("A new resource was not found or may not be ready")
r.Log.Info(err.Error())
return reconcile.Result{
RequeueAfter: time.Duration(7) * time.Second,
RequeueAfter: intervalNotReady/2,
}, nil
case errorfactory.ReconcileRollingUpgrade:
r.Log.Info("Rolling Upgrade in Progress")
return reconcile.Result{
RequeueAfter: time.Duration(15) * time.Second,
RequeueAfter: intervalRunning,
}, nil
case errorfactory.NifiClusterNotReady:
return reconcile.Result{
RequeueAfter: time.Duration(15) * time.Second,
RequeueAfter: intervalNotReady,
}, nil
case errorfactory.NifiClusterTaskRunning:
return reconcile.Result{
RequeueAfter: time.Duration(20) * time.Second,
RequeueAfter: intervalRunning,
}, nil
default:
return RequeueWithError(r.Log, err.Error(), err)
Expand Down Expand Up @@ -250,14 +254,15 @@ func (r *NifiClusterReconciler) checkFinalizers(ctx context.Context,

// Do any necessary PKI cleanup - a PKI backend should make sure any
// user finalizations are done before it does its final cleanup
interval := util.GetRequeueInterval(r.RequeueIntervals["CLUSTER_TASK_NOT_READY_REQUEUE_INTERVAL"]/3, r.RequeueOffset)
r.Log.Info("Tearing down any PKI resources for the nificluster")
if err = pki.GetPKIManager(r.Client, cluster).FinalizePKI(ctx, r.Log); err != nil {
switch err.(type) {
case errorfactory.ResourceNotReady:
r.Log.Info("The PKI is not ready to be torn down")
return ctrl.Result{
Requeue: true,
RequeueAfter: time.Duration(5) * time.Second,
RequeueAfter: interval,
}, nil
default:
return RequeueWithError(r.Log, "failed to finalize PKI", err)
Expand Down
25 changes: 16 additions & 9 deletions controllers/nificlustertask_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/Orange-OpenSource/nifikop/pkg/errorfactory"
"github.com/Orange-OpenSource/nifikop/pkg/k8sutil"
"github.com/Orange-OpenSource/nifikop/pkg/nificlient/config"
"github.com/Orange-OpenSource/nifikop/pkg/util"
"github.com/Orange-OpenSource/nifikop/pkg/util/clientconfig"
nifiutil "github.com/Orange-OpenSource/nifikop/pkg/util/nifi"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -46,9 +47,11 @@ import (
// NifiClusterTaskReconciler reconciles
type NifiClusterTaskReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
RequeueIntervals map[string]int
RequeueOffset int
}

// Reconcile is part of the main kubernetes reconciliation loop which aims to
Expand All @@ -63,6 +66,9 @@ type NifiClusterTaskReconciler struct {
func (r *NifiClusterTaskReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
_ = r.Log.WithValues("nificlustertask", req.NamespacedName)

intervalNotReady := util.GetRequeueInterval(r.RequeueIntervals["CLUSTER_TASK_NOT_READY_REQUEUE_INTERVAL"], r.RequeueOffset)
intervalRunning := util.GetRequeueInterval(r.RequeueIntervals["CLUSTER_TASK_RUNNING_REQUEUE_INTERVAL"], r.RequeueOffset)
intervalTimeout := util.GetRequeueInterval(r.RequeueIntervals["CLUSTER_TASK_TIMEOUT_REQUEUE_INTERVAL"], r.RequeueOffset)
// Fetch the NifiCluster instance
instance := &v1alpha1.NifiCluster{}
err := r.Client.Get(ctx, req.NamespacedName, instance)
Expand Down Expand Up @@ -95,15 +101,16 @@ func (r *NifiClusterTaskReconciler) Reconcile(ctx context.Context, req ctrl.Requ
switch errors.Cause(err).(type) {
case errorfactory.NifiClusterNotReady, errorfactory.ResourceNotReady:
return reconcile.Result{
RequeueAfter: time.Duration(15) * time.Second,
RequeueAfter: intervalNotReady,
}, nil
case errorfactory.NifiClusterTaskRunning:
return reconcile.Result{
RequeueAfter: time.Duration(20) * time.Second,
RequeueAfter: intervalRunning,
}, nil
case errorfactory.NifiClusterTaskTimeout, errorfactory.NifiClusterTaskFailure:

return reconcile.Result{
RequeueAfter: time.Duration(20) * time.Second,
RequeueAfter: intervalTimeout,
}, nil
default:
return RequeueWithError(r.Log, err.Error(), err)
Expand Down Expand Up @@ -131,15 +138,15 @@ func (r *NifiClusterTaskReconciler) Reconcile(ctx context.Context, req ctrl.Requ
switch errors.Cause(err).(type) {
case errorfactory.NifiClusterNotReady:
return reconcile.Result{
RequeueAfter: time.Duration(15) * time.Second,
RequeueAfter: intervalNotReady,
}, nil
case errorfactory.NifiClusterTaskRunning:
return reconcile.Result{
RequeueAfter: time.Duration(20) * time.Second,
RequeueAfter: intervalRunning,
}, nil
case errorfactory.NifiClusterTaskTimeout, errorfactory.NifiClusterTaskFailure:
return reconcile.Result{
RequeueAfter: time.Duration(20) * time.Second,
RequeueAfter: intervalTimeout,
}, nil
default:
return RequeueWithError(r.Log, err.Error(), err)
Expand Down
22 changes: 12 additions & 10 deletions controllers/nifidataflow_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,11 @@ var dataflowFinalizer = "nifidataflows.nifi.orange.com/finalizer"
// NifiDataflowReconciler reconciles a NifiDataflow object
type NifiDataflowReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
RequeueInterval int
RequeueOffset int
}

// +kubebuilder:rbac:groups=nifi.orange.com,resources=nifidataflows,verbs=get;list;watch;create;update;patch;delete
Expand All @@ -71,7 +73,7 @@ func (r *NifiDataflowReconciler) Reconcile(ctx context.Context, req ctrl.Request
_ = r.Log.WithValues("nifidataflow", req.NamespacedName)

var err error

interval := util.GetRequeueInterval(r.RequeueInterval, r.RequeueOffset)
// Fetch the NifiDataflow instance
instance := &v1alpha1.NifiDataflow{}
if err = r.Client.Get(ctx, req.NamespacedName, instance); err != nil {
Expand Down Expand Up @@ -239,7 +241,7 @@ func (r *NifiDataflowReconciler) Reconcile(ctx context.Context, req ctrl.Request
instance.Spec.ClusterRef.Name, clusterConnect.Id()))

// the cluster does not exist - should have been caught pre-flight
return RequeueAfter(time.Duration(15) * time.Second)
return RequeueAfter(interval)
}

// Ìn case of the cluster reference changed.
Expand All @@ -258,7 +260,7 @@ func (r *NifiDataflowReconciler) Reconcile(ctx context.Context, req ctrl.Request
if err := r.Client.Update(ctx, current); err != nil {
return RequeueWithError(r.Log, "failed to update NifiDatafllow", err)
}
return RequeueAfter(time.Duration(15) * time.Second)
return RequeueAfter(interval)
}

if (instance.Spec.SyncNever() && len(instance.Status.State) > 0) ||
Expand Down Expand Up @@ -346,7 +348,7 @@ func (r *NifiDataflowReconciler) Reconcile(ctx context.Context, req ctrl.Request
errorfactory.NifiFlowControllerServiceScheduling,
errorfactory.NifiFlowScheduling, errorfactory.NifiFlowSyncing:
return reconcile.Result{
RequeueAfter: time.Duration(5) * time.Second,
RequeueAfter: interval/3,
}, nil
default:
r.Recorder.Event(instance, corev1.EventTypeWarning, "SynchronizingFailed",
Expand Down Expand Up @@ -401,7 +403,7 @@ func (r *NifiDataflowReconciler) Reconcile(ctx context.Context, req ctrl.Request
if err := dataflow.ScheduleDataflow(instance, clientConfig); err != nil {
switch errors.Cause(err).(type) {
case errorfactory.NifiFlowControllerServiceScheduling, errorfactory.NifiFlowScheduling:
return RequeueAfter(time.Duration(5) * time.Second)
return RequeueAfter(interval/3)
default:
r.Recorder.Event(instance, corev1.EventTypeWarning, "StartingFailed",
fmt.Sprintf("Starting dataflow %s based on flow {bucketId : %s, flowId: %s, version: %s} failed.",
Expand Down Expand Up @@ -443,7 +445,7 @@ func (r *NifiDataflowReconciler) Reconcile(ctx context.Context, req ctrl.Request
return Reconciled()
}

return RequeueAfter(time.Duration(5) * time.Second)
return RequeueAfter(interval/3)
}

// SetupWithManager sets up the controller with the Manager.
Expand Down Expand Up @@ -485,7 +487,7 @@ func (r *NifiDataflowReconciler) checkFinalizers(ctx context.Context, flow *v1al
if err = r.finalizeNifiDataflow(flow, config); err != nil {
switch errors.Cause(err).(type) {
case errorfactory.NifiConnectionDropping, errorfactory.NifiFlowDraining:
return RequeueAfter(time.Duration(5) * time.Second)
return RequeueAfter(util.GetRequeueInterval(r.RequeueInterval/3, r.RequeueOffset))
default:
return RequeueWithError(r.Log, "failed to finalize NiFiDataflow", err)
}
Expand Down
18 changes: 10 additions & 8 deletions controllers/nifiparametercontext_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@ var parameterContextFinalizer = "nifiparametercontexts.nifi.orange.com/finalizer
// NifiParameterContextReconciler reconciles a NifiParameterContext object
type NifiParameterContextReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
RequeueInterval int
RequeueOffset int
}

// +kubebuilder:rbac:groups=nifi.orange.com,resources=nifiparametercontexts,verbs=get;list;watch;create;update;patch;delete
Expand All @@ -68,7 +70,7 @@ type NifiParameterContextReconciler struct {
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *NifiParameterContextReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
_ = r.Log.WithValues("nifiparametercontext", req.NamespacedName)

interval := util.GetRequeueInterval(r.RequeueInterval, r.RequeueOffset)
var err error

// Fetch the NifiParameterContext instance
Expand Down Expand Up @@ -185,7 +187,7 @@ func (r *NifiParameterContextReconciler) Reconcile(ctx context.Context, req ctrl
instance.Spec.ClusterRef.Name, clusterConnect.Id()))

// the cluster does not exist - should have been caught pre-flight
return RequeueAfter(time.Duration(15) * time.Second)
return RequeueAfter(interval)
}

// Ìn case of the cluster reference changed.
Expand All @@ -204,7 +206,7 @@ func (r *NifiParameterContextReconciler) Reconcile(ctx context.Context, req ctrl
if err := r.Client.Update(ctx, current); err != nil {
return RequeueWithError(r.Log, "failed to update NifiParameterContext", err)
}
return RequeueAfter(time.Duration(15) * time.Second)
return RequeueAfter(interval)
}

r.Recorder.Event(instance, corev1.EventTypeNormal, "Reconciling",
Expand Down Expand Up @@ -248,7 +250,7 @@ func (r *NifiParameterContextReconciler) Reconcile(ctx context.Context, req ctrl
if err != nil {
switch errors.Cause(err).(type) {
case errorfactory.NifiParameterContextUpdateRequestRunning:
return RequeueAfter(time.Duration(5) * time.Second)
return RequeueAfter(interval/3)
default:
r.Recorder.Event(instance, corev1.EventTypeNormal, "SynchronizingFailed",
fmt.Sprintf("Synchronizing parameter context %s failed", instance.Name))
Expand Down Expand Up @@ -280,7 +282,7 @@ func (r *NifiParameterContextReconciler) Reconcile(ctx context.Context, req ctrl

r.Log.Info("Ensured Parameter Context")

return RequeueAfter(time.Duration(15) * time.Second)
return RequeueAfter(interval)
}

// SetupWithManager sets up the controller with the Manager.
Expand Down
16 changes: 9 additions & 7 deletions controllers/nifiregistryclient_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ var registryClientFinalizer = "nifiregistryclients.nifi.orange.com/finalizer"
// NifiRegistryClientReconciler reconciles a NifiRegistryClient object
type NifiRegistryClientReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
RequeueInterval int
RequeueOffset int
}

// +kubebuilder:rbac:groups=nifi.orange.com,resources=nifiregistryclients,verbs=get;list;watch;create;update;patch;delete
Expand All @@ -66,7 +68,7 @@ type NifiRegistryClientReconciler struct {
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *NifiRegistryClientReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
_ = r.Log.WithValues("nifiregistryclient", req.NamespacedName)

interval := util.GetRequeueInterval(r.RequeueInterval, r.RequeueOffset)
var err error

// Fetch the NifiRegistryClient instance
Expand Down Expand Up @@ -160,7 +162,7 @@ func (r *NifiRegistryClientReconciler) Reconcile(ctx context.Context, req ctrl.R
fmt.Sprintf("The referenced cluster is not ready yet : %s in %s",
instance.Spec.ClusterRef.Name, clusterConnect.Id()))
// the cluster does not exist - should have been caught pre-flight
return RequeueAfter(time.Duration(15) * time.Second)
return RequeueAfter(interval)
}

// Ìn case of the cluster reference changed.
Expand All @@ -179,7 +181,7 @@ func (r *NifiRegistryClientReconciler) Reconcile(ctx context.Context, req ctrl.R
if err := r.Client.Update(ctx, current); err != nil {
return RequeueWithError(r.Log, "failed to update NifiRegistryClient", err)
}
return RequeueAfter(time.Duration(15) * time.Second)
return RequeueAfter(interval)
}

r.Recorder.Event(instance, corev1.EventTypeNormal, "Reconciling",
Expand Down Expand Up @@ -254,7 +256,7 @@ func (r *NifiRegistryClientReconciler) Reconcile(ctx context.Context, req ctrl.R

r.Log.Info("Ensured Registry Client")

return RequeueAfter(time.Duration(15) * time.Second)
return RequeueAfter(interval)
}

// SetupWithManager sets up the controller with the Manager.
Expand Down
Loading

0 comments on commit c4edae6

Please sign in to comment.