Skip to content
This repository has been archived by the owner on Sep 2, 2022. It is now read-only.

feat:support max concurrent workers for Reconciles #333

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ FLINK_OPERATOR_NAMESPACE ?= flink-operator-system
RESOURCE_PREFIX ?= flink-operator-
# The Kubernetes namespace to limit watching.
WATCH_NAMESPACE ?=
# The maximum number of concurrent Reconciles which can be run. Defaults to 1..
MAX_CONCURRENT ?= 1

GREEN=\033[1;32m
RED=\033[1;31m
Expand Down Expand Up @@ -112,6 +114,7 @@ ifneq ($(WATCH_NAMESPACE),)
|| true
endif
sed -E -i.bak "s/resources:/bases:/" config/deploy/kustomization.yaml
sed -E -i.bak "s/(\-\-maxConcurrentReconciles\=)/\1$(MAX_CONCURRENT)/" config/deploy/manager_auth_proxy_patch.yaml
rm config/deploy/*.bak

# Generate deploy template.
Expand Down
36 changes: 36 additions & 0 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions config/default/manager_auth_proxy_patch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ spec:
args:
- "--metrics-addr=127.0.0.1:8080"
- "--watch-namespace="
- "--maxConcurrentReconciles="
6 changes: 4 additions & 2 deletions controllers/flinkcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ package controllers

import (
"context"
"github.com/googlecloudplatform/flink-operator/controllers/history"
"time"

"github.com/go-logr/logr"
v1beta1 "github.com/googlecloudplatform/flink-operator/api/v1beta1"
"github.com/googlecloudplatform/flink-operator/controllers/flinkclient"
"github.com/googlecloudplatform/flink-operator/controllers/history"
"github.com/googlecloudplatform/flink-operator/controllers/model"
ctrlcontroller "sigs.k8s.io/controller-runtime/pkg/controller"

appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
Expand Down Expand Up @@ -83,9 +84,10 @@ func (reconciler *FlinkClusterReconciler) Reconcile(
// SetupWithManager registers this reconciler with the controller manager and
// starts watching FlinkCluster, Deployment and Service resources.
func (reconciler *FlinkClusterReconciler) SetupWithManager(
mgr ctrl.Manager) error {
mgr ctrl.Manager, maxConcurrentReconciles int) error {
reconciler.Mgr = mgr
return ctrl.NewControllerManagedBy(mgr).
WithOptions(ctrlcontroller.Options{MaxConcurrentReconciles: maxConcurrentReconciles}).
For(&v1beta1.FlinkCluster{}).
Owns(&appsv1.Deployment{}).
Owns(&corev1.Service{}).
Expand Down
Binary file added flink-operator
Binary file not shown.
4 changes: 3 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ func main() {
var metricsAddr string
var enableLeaderElection bool
var watchNamespace string
var maxConcurrentReconciles int
flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
flag.IntVar(&maxConcurrentReconciles, "maxConcurrentReconciles", 1, "The maximum number of concurrent Reconciles which can be run. Defaults to 1.")
flag.BoolVar(&enableLeaderElection, "enable-leader-election", false,
"Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.")
flag.StringVar(
Expand All @@ -78,7 +80,7 @@ func main() {
err = (&controllers.FlinkClusterReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("FlinkCluster"),
}).SetupWithManager(mgr)
}).SetupWithManager(mgr, maxConcurrentReconciles)
if err != nil {
setupLog.Error(err, "Unable to create controller", "controller", "FlinkCluster")
os.Exit(1)
Expand Down