diff --git a/cmd/argocd-application-controller/commands/argocd_application_controller.go b/cmd/argocd-application-controller/commands/argocd_application_controller.go index c4a8f3b219c97..a4707e511974c 100644 --- a/cmd/argocd-application-controller/commands/argocd_application_controller.go +++ b/cmd/argocd-application-controller/commands/argocd_application_controller.go @@ -30,11 +30,13 @@ import ( "github.com/argoproj/argo-cd/v2/util/settings" "github.com/argoproj/argo-cd/v2/util/tls" "github.com/argoproj/argo-cd/v2/util/trace" + kubeerrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) const ( // CLIName is the name of the CLI - cliName = "argocd-application-controller" + cliName = common.ApplicationController // Default time in seconds for application resync period defaultAppResyncPeriod = 180 // Default time in seconds for application hard resync period @@ -92,7 +94,7 @@ func NewCommand() *cobra.Command { config, err := clientConfig.ClientConfig() errors.CheckError(err) errors.CheckError(v1alpha1.SetK8SConfigDefaults(config)) - config.UserAgent = fmt.Sprintf("argocd-application-controller/%s (%s)", vers.Version, vers.Platform) + config.UserAgent = fmt.Sprintf("%s/%s (%s)", common.DefaultApplicationControllerName, vers.Version, vers.Platform) kubeClient := kubernetes.NewForConfigOrDie(config) appClient := appclientset.NewForConfigOrDie(config) @@ -138,6 +140,7 @@ func NewCommand() *cobra.Command { })) kubectl := kubeutil.NewKubectl() clusterFilter := getClusterFilter(kubeClient, settingsMgr, shardingAlgorithm) + errors.CheckError(err) appController, err = controller.NewApplicationController( namespace, settingsMgr, @@ -208,20 +211,49 @@ func NewCommand() *cobra.Command { } func getClusterFilter(kubeClient *kubernetes.Clientset, settingsMgr *settings.SettingsManager, shardingAlgorithm string) sharding.ClusterFilterFunction { - replicas := env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32) + + var replicas int shard := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32) + + applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName) + appControllerDeployment, _ := kubeClient.AppsV1().Deployments(settingsMgr.GetNamespace()).Get(context.Background(), applicationControllerName, metav1.GetOptions{}) + + if appControllerDeployment != nil && appControllerDeployment.Spec.Replicas != nil { + replicas = int(*appControllerDeployment.Spec.Replicas) + } else { + replicas = env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32) + } + var clusterFilter func(cluster *v1alpha1.Cluster) bool if replicas > 1 { - if shard < 0 { + // check for shard mapping using configmap if application-controller is a deployment + // else use existing logic to infer shard from pod name if application-controller is a statefulset + if appControllerDeployment != nil { + var err error - shard, err = sharding.InferShard() + // retry 3 times if we find a conflict while updating shard mapping configMap. + // If we still see conflicts after the retries, wait for next iteration of heartbeat process. + for i := 0; i <= common.AppControllerHeartbeatUpdateRetryCount; i++ { + shard, err = sharding.GetOrUpdateShardFromConfigMap(kubeClient, settingsMgr, replicas, shard) + if !kubeerrors.IsConflict(err) { + err = fmt.Errorf("unable to get shard due to error updating the sharding config map: %s", err) + break + } + log.Warnf("conflict when getting shard from shard mapping configMap. Retrying (%d/3)", i) + } errors.CheckError(err) + } else { + if shard < 0 { + var err error + shard, err = sharding.InferShard() + errors.CheckError(err) + } } log.Infof("Processing clusters from shard %d", shard) db := db.NewDB(settingsMgr.GetNamespace(), settingsMgr, kubeClient) log.Infof("Using filter function: %s", shardingAlgorithm) distributionFunction := sharding.GetDistributionFunction(db, shardingAlgorithm) - clusterFilter = sharding.GetClusterFilter(distributionFunction, shard) + clusterFilter = sharding.GetClusterFilter(db, distributionFunction, shard) } else { log.Info("Processing all cluster shards") } diff --git a/common/common.go b/common/common.go index 5ea02095e8bb9..59e2d7b8474ab 100644 --- a/common/common.go +++ b/common/common.go @@ -12,6 +12,11 @@ import ( "google.golang.org/grpc/status" ) +// Component names +const ( + ApplicationController = "argocd-application-controller" +) + // Default service addresses and URLS of Argo CD internal services const ( // DefaultRepoServerAddr is the gRPC address of the Argo CD repo server @@ -34,6 +39,8 @@ const ( // ArgoCDTLSCertsConfigMapName contains TLS certificate data for connecting repositories. Will get mounted as volume to pods ArgoCDTLSCertsConfigMapName = "argocd-tls-certs-cm" ArgoCDGPGKeysConfigMapName = "argocd-gpg-keys-cm" + // ArgoCDAppControllerShardConfigMapName contains the application controller to shard mapping + ArgoCDAppControllerShardConfigMapName = "argocd-app-controller-shard-cm" ) // Some default configurables @@ -109,6 +116,8 @@ const ( // RoundRobinShardingAlgorithm is a flag value that can be opted for Sharding Algorithm it uses an equal distribution accross all shards RoundRobinShardingAlgorithm = "round-robin" DefaultShardingAlgorithm = LegacyShardingAlgorithm + // AppControllerHeartbeatUpdateRetryCount is the retry count for updating the Shard Mapping to the Shard Mapping ConfigMap used by Application Controller + AppControllerHeartbeatUpdateRetryCount = 3 ) // Dex related constants @@ -209,6 +218,8 @@ const ( EnvPauseGenerationRequests = "ARGOCD_PAUSE_GEN_REQUESTS" // EnvControllerReplicas is the number of controller replicas EnvControllerReplicas = "ARGOCD_CONTROLLER_REPLICAS" + // EnvControllerHeartbeatTime will update the heartbeat for application controller to claim shard + EnvControllerHeartbeatTime = "ARGOCD_CONTROLLER_HEARTBEAT_TIME" // EnvControllerShard is the shard number that should be handled by controller EnvControllerShard = "ARGOCD_CONTROLLER_SHARD" // EnvControllerShardingAlgorithm is the distribution sharding algorithm to be used: legacy or round-robin diff --git a/controller/appcontroller.go b/controller/appcontroller.go index e3ca42f9abdd7..7a088dd0bc126 100644 --- a/controller/appcontroller.go +++ b/controller/appcontroller.go @@ -34,6 +34,8 @@ import ( "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/informers" + informerv1 "k8s.io/client-go/informers/apps/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" @@ -51,6 +53,7 @@ import ( "github.com/argoproj/argo-cd/v2/reposerver/apiclient" "github.com/argoproj/argo-cd/v2/util/argo" argodiff "github.com/argoproj/argo-cd/v2/util/argo/diff" + "github.com/argoproj/argo-cd/v2/util/env" appstatecache "github.com/argoproj/argo-cd/v2/util/cache/appstate" "github.com/argoproj/argo-cd/v2/util/db" @@ -59,10 +62,12 @@ import ( "github.com/argoproj/argo-cd/v2/util/helm" logutils "github.com/argoproj/argo-cd/v2/util/log" settings_util "github.com/argoproj/argo-cd/v2/util/settings" + kubeerrors "k8s.io/apimachinery/pkg/api/errors" ) const ( - updateOperationStateTimeout = 1 * time.Second + updateOperationStateTimeout = 1 * time.Second + defaultDeploymentInformerResyncDuration = 10 // orphanedIndex contains application which monitor orphaned resources by namespace orphanedIndex = "orphaned" ) @@ -105,6 +110,7 @@ type ApplicationController struct { appInformer cache.SharedIndexInformer appLister applisters.ApplicationLister projInformer cache.SharedIndexInformer + deploymentInformer informerv1.DeploymentInformer appStateManager AppStateManager stateCache statecache.LiveStateCache statusRefreshTimeout time.Duration @@ -160,7 +166,7 @@ func NewApplicationController( statusHardRefreshTimeout: appHardResyncPeriod, refreshRequestedApps: make(map[string]CompareWith), refreshRequestedAppsMutex: &sync.Mutex{}, - auditLogger: argo.NewAuditLogger(namespace, kubeClientset, "argocd-application-controller"), + auditLogger: argo.NewAuditLogger(namespace, kubeClientset, common.ApplicationController), settingsMgr: settingsMgr, selfHealTimeout: selfHealTimeout, clusterFilter: clusterFilter, @@ -201,11 +207,31 @@ func NewApplicationController( } }, }) + + factory := informers.NewSharedInformerFactory(ctrl.kubeClientset, defaultDeploymentInformerResyncDuration) + deploymentInformer := factory.Apps().V1().Deployments() + + readinessHealthCheck := func(r *http.Request) error { + applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName) + appControllerDeployment, err := deploymentInformer.Lister().Deployments(settingsMgr.GetNamespace()).Get(applicationControllerName) + if !kubeerrors.IsNotFound(err) { + return fmt.Errorf("error retrieving Application Controller Deployment: %s", err) + } + if appControllerDeployment != nil { + if appControllerDeployment.Spec.Replicas != nil && int(*appControllerDeployment.Spec.Replicas) <= 0 { + return fmt.Errorf("application controller deployment replicas is not set or is less than 0, replicas: %d", appControllerDeployment.Spec.Replicas) + } + shard := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32) + if _, err := sharding.GetOrUpdateShardFromConfigMap(kubeClientset.(*kubernetes.Clientset), settingsMgr, int(*appControllerDeployment.Spec.Replicas), shard); err != nil { + return fmt.Errorf("error while updating the heartbeat for to the Shard Mapping ConfigMap: %s", err) + } + } + return nil + } + metricsAddr := fmt.Sprintf("0.0.0.0:%d", metricsPort) var err error - ctrl.metricsServer, err = metrics.NewMetricsServer(metricsAddr, appLister, ctrl.canProcessApp, func(r *http.Request) error { - return nil - }, metricsApplicationLabels) + ctrl.metricsServer, err = metrics.NewMetricsServer(metricsAddr, appLister, ctrl.canProcessApp, readinessHealthCheck, metricsApplicationLabels) if err != nil { return nil, err } @@ -220,6 +246,7 @@ func NewApplicationController( ctrl.appInformer = appInformer ctrl.appLister = appLister ctrl.projInformer = projInformer + ctrl.deploymentInformer = deploymentInformer ctrl.appStateManager = appStateManager ctrl.stateCache = stateCache @@ -724,6 +751,7 @@ func (ctrl *ApplicationController) Run(ctx context.Context, statusProcessors int go ctrl.appInformer.Run(ctx.Done()) go ctrl.projInformer.Run(ctx.Done()) + go ctrl.deploymentInformer.Informer().Run(ctx.Done()) errors.CheckError(ctrl.stateCache.Init()) diff --git a/controller/metrics/metrics.go b/controller/metrics/metrics.go index 3cfb16a249339..e4ef09552c09d 100644 --- a/controller/metrics/metrics.go +++ b/controller/metrics/metrics.go @@ -17,6 +17,7 @@ import ( log "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/labels" + "github.com/argoproj/argo-cd/v2/common" argoappv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" applister "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1" "github.com/argoproj/argo-cd/v2/util/git" @@ -260,12 +261,12 @@ func (m *MetricsServer) IncKubernetesRequest(app *argoappv1.Application, server, } func (m *MetricsServer) IncRedisRequest(failed bool) { - m.redisRequestCounter.WithLabelValues(m.hostname, "argocd-application-controller", strconv.FormatBool(failed)).Inc() + m.redisRequestCounter.WithLabelValues(m.hostname, common.ApplicationController, strconv.FormatBool(failed)).Inc() } // ObserveRedisRequestDuration observes redis request duration func (m *MetricsServer) ObserveRedisRequestDuration(duration time.Duration) { - m.redisRequestHistogram.WithLabelValues(m.hostname, "argocd-application-controller").Observe(duration.Seconds()) + m.redisRequestHistogram.WithLabelValues(m.hostname, common.ApplicationController).Observe(duration.Seconds()) } // IncReconcile increments the reconcile counter for an application diff --git a/controller/sharding/sharding.go b/controller/sharding/sharding.go index 25058e4e23c53..526896531dbca 100644 --- a/controller/sharding/sharding.go +++ b/controller/sharding/sharding.go @@ -4,32 +4,57 @@ import ( "context" "fmt" "hash/fnv" - "math" "os" "sort" "strconv" "strings" + "time" + + "encoding/json" "github.com/argoproj/argo-cd/v2/common" "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" "github.com/argoproj/argo-cd/v2/util/db" "github.com/argoproj/argo-cd/v2/util/env" + "github.com/argoproj/argo-cd/v2/util/settings" log "github.com/sirupsen/logrus" + kubeerrors "k8s.io/apimachinery/pkg/api/errors" ) // Make it overridable for testing var osHostnameFunction = os.Hostname +// Make it overridable for testing +var heartbeatCurrentTime = metav1.Now + +var ( + HeartbeatDuration = env.ParseNumFromEnv(common.EnvControllerHeartbeatTime, 10, 10, 60) + HeartbeatTimeout = 3 * HeartbeatDuration +) + +const ShardControllerMappingKey = "shardControllerMapping" + type DistributionFunction func(c *v1alpha1.Cluster) int type ClusterFilterFunction func(c *v1alpha1.Cluster) bool +// shardApplicationControllerMapping stores the mapping of Shard Number to Application Controller in ConfigMap. +// It also stores the heartbeat of last synced time of the application controller. +type shardApplicationControllerMapping struct { + ShardNumber int + ControllerName string + HeartbeatTime metav1.Time +} + // GetClusterFilter returns a ClusterFilterFunction which is a function taking a cluster as a parameter // and returns wheter or not the cluster should be processed by a given shard. It calls the distributionFunction // to determine which shard will process the cluster, and if the given shard is equal to the calculated shard // the function will return true. -func GetClusterFilter(distributionFunction DistributionFunction, shard int) ClusterFilterFunction { - replicas := env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32) +func GetClusterFilter(db db.ArgoDB, distributionFunction DistributionFunction, shard int) ClusterFilterFunction { + replicas := db.GetApplicationControllerReplicas() return func(c *v1alpha1.Cluster) bool { clusterShard := 0 if c != nil && c.Shard != nil { @@ -50,12 +75,12 @@ func GetClusterFilter(distributionFunction DistributionFunction, shard int) Clus // the current datas. func GetDistributionFunction(db db.ArgoDB, shardingAlgorithm string) DistributionFunction { log.Infof("Using filter function: %s", shardingAlgorithm) - distributionFunction := LegacyDistributionFunction() + distributionFunction := LegacyDistributionFunction(db) switch shardingAlgorithm { case common.RoundRobinShardingAlgorithm: distributionFunction = RoundRobinDistributionFunction(db) case common.LegacyShardingAlgorithm: - distributionFunction = LegacyDistributionFunction() + distributionFunction = LegacyDistributionFunction(db) default: log.Warnf("distribution type %s is not supported, defaulting to %s", shardingAlgorithm, common.DefaultShardingAlgorithm) } @@ -67,8 +92,8 @@ func GetDistributionFunction(db db.ArgoDB, shardingAlgorithm string) Distributio // is lightweight and can be distributed easily, however, it does not ensure an homogenous distribution as // some shards may get assigned more clusters than others. It is the legacy function distribution that is // kept for compatibility reasons -func LegacyDistributionFunction() DistributionFunction { - replicas := env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32) +func LegacyDistributionFunction(db db.ArgoDB) DistributionFunction { + replicas := db.GetApplicationControllerReplicas() return func(c *v1alpha1.Cluster) int { if replicas == 0 { return -1 @@ -97,7 +122,7 @@ func LegacyDistributionFunction() DistributionFunction { // clusters +/-1 , but with the drawback of a reshuffling of clusters accross shards in case of some changes // in the cluster list func RoundRobinDistributionFunction(db db.ArgoDB) DistributionFunction { - replicas := env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32) + replicas := db.GetApplicationControllerReplicas() return func(c *v1alpha1.Cluster) int { if replicas > 0 { if c == nil { // in-cluster does not necessarly have a secret assigned. So we are receiving a nil cluster here. @@ -123,7 +148,7 @@ func RoundRobinDistributionFunction(db db.ArgoDB) DistributionFunction { func InferShard() (int, error) { hostname, err := osHostnameFunction() if err != nil { - return 0, err + return -1, err } parts := strings.Split(hostname, "-") if len(parts) == 0 { @@ -162,3 +187,167 @@ func createClusterIndexByClusterIdMap(db db.ArgoDB) map[string]int { } return clusterIndexedByClusterId } + +// GetOrUpdateShardFromConfigMap finds the shard number from the shard mapping configmap. If the shard mapping configmap does not exist, +// the function creates the shard mapping configmap. +// The function takes the shard number from the environment variable (default value -1, if not set) and passes it to this function. +// If the shard value passed to this function is -1, that is, the shard was not set as an environment variable, +// we default the shard number to 0 for computing the default config map. +func GetOrUpdateShardFromConfigMap(kubeClient *kubernetes.Clientset, settingsMgr *settings.SettingsManager, replicas, shard int) (int, error) { + + hostname, err := osHostnameFunction() + if err != nil { + return -1, err + } + + // fetch the shard mapping configMap + shardMappingCM, err := kubeClient.CoreV1().ConfigMaps(settingsMgr.GetNamespace()).Get(context.Background(), common.ArgoCDAppControllerShardConfigMapName, metav1.GetOptions{}) + + if err != nil { + if !kubeerrors.IsNotFound(err) { + return -1, fmt.Errorf("error getting sharding config map: %s", err) + } + log.Infof("shard mapping configmap %s not found. Creating default shard mapping configmap.", common.ArgoCDAppControllerShardConfigMapName) + + // if the shard is not set as an environment variable, set the default value of shard to 0 for generating default CM + if shard == -1 { + shard = 0 + } + shardMappingCM, err = generateDefaultShardMappingCM(settingsMgr.GetNamespace(), hostname, replicas, shard) + if err != nil { + return -1, fmt.Errorf("error generating default shard mapping configmap %s", err) + } + if _, err = kubeClient.CoreV1().ConfigMaps(settingsMgr.GetNamespace()).Create(context.Background(), shardMappingCM, metav1.CreateOptions{}); err != nil { + return -1, fmt.Errorf("error creating shard mapping configmap %s", err) + } + // return 0 as the controller is assigned to shard 0 while generating default shard mapping ConfigMap + return shard, nil + } else { + // Identify the available shard and update the ConfigMap + data := shardMappingCM.Data[ShardControllerMappingKey] + var shardMappingData []shardApplicationControllerMapping + err := json.Unmarshal([]byte(data), &shardMappingData) + if err != nil { + return -1, fmt.Errorf("error unmarshalling shard config map data: %s", err) + } + + shard, shardMappingData := getOrUpdateShardNumberForController(shardMappingData, hostname, replicas, shard) + updatedShardMappingData, err := json.Marshal(shardMappingData) + if err != nil { + return -1, fmt.Errorf("error marshalling data of shard mapping ConfigMap: %s", err) + } + shardMappingCM.Data[ShardControllerMappingKey] = string(updatedShardMappingData) + + _, err = kubeClient.CoreV1().ConfigMaps(settingsMgr.GetNamespace()).Update(context.Background(), shardMappingCM, metav1.UpdateOptions{}) + if err != nil { + return -1, err + } + return shard, nil + } +} + +// getOrUpdateShardNumberForController takes list of shardApplicationControllerMapping and performs computation to find the matching or empty shard number +func getOrUpdateShardNumberForController(shardMappingData []shardApplicationControllerMapping, hostname string, replicas, shard int) (int, []shardApplicationControllerMapping) { + + // if current length of shardMappingData in shard mapping configMap is less than the number of replicas, + // create additional empty entries for missing shard numbers in shardMappingDataconfigMap + if len(shardMappingData) < replicas { + // generate extra default mappings + for currentShard := len(shardMappingData); currentShard < replicas; currentShard++ { + shardMappingData = append(shardMappingData, shardApplicationControllerMapping{ + ShardNumber: currentShard, + }) + } + } + + // if current length of shardMappingData in shard mapping configMap is more than the number of replicas, + // we replace the config map with default config map and let controllers self assign the new shard to itself + if len(shardMappingData) > replicas { + shardMappingData = getDefaultShardMappingData(replicas) + } + + if shard != -1 && shard < replicas { + log.Debugf("update heartbeat for shard %d", shard) + for i := range shardMappingData { + shardMapping := shardMappingData[i] + if shardMapping.ShardNumber == shard { + log.Debugf("Shard found. Updating heartbeat!!") + shardMapping.ControllerName = hostname + shardMapping.HeartbeatTime = heartbeatCurrentTime() + shardMappingData[i] = shardMapping + break + } + } + } else { + // find the matching shard with assigned controllerName + for i := range shardMappingData { + shardMapping := shardMappingData[i] + if shardMapping.ControllerName == hostname { + log.Debugf("Shard matched. Updating heartbeat!!") + shard = int(shardMapping.ShardNumber) + shardMapping.HeartbeatTime = heartbeatCurrentTime() + shardMappingData[i] = shardMapping + break + } + } + } + + // at this point, we have still not found a shard with matching hostname. + // So, find a shard with either no controller assigned or assigned controller + // with heartbeat past threshold + if shard == -1 { + for i := range shardMappingData { + shardMapping := shardMappingData[i] + if (shardMapping.ControllerName == "") || (metav1.Now().After(shardMapping.HeartbeatTime.Add(time.Duration(HeartbeatTimeout) * time.Second))) { + shard = int(shardMapping.ShardNumber) + log.Debugf("Empty shard found %d", shard) + shardMapping.ControllerName = hostname + shardMapping.HeartbeatTime = heartbeatCurrentTime() + shardMappingData[i] = shardMapping + break + } + } + } + return shard, shardMappingData +} + +// generateDefaultShardMappingCM creates a default shard mapping configMap. Assigns current controller to shard 0. +func generateDefaultShardMappingCM(namespace, hostname string, replicas, shard int) (*v1.ConfigMap, error) { + + shardingCM := &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: common.ArgoCDAppControllerShardConfigMapName, + Namespace: namespace, + }, + Data: map[string]string{}, + } + + shardMappingData := getDefaultShardMappingData(replicas) + + // if shard is not assigned to a controller, we use shard 0 + if shard == -1 || shard > replicas { + shard = 0 + } + shardMappingData[shard].ControllerName = hostname + shardMappingData[shard].HeartbeatTime = heartbeatCurrentTime() + + data, err := json.Marshal(shardMappingData) + if err != nil { + return nil, fmt.Errorf("error generating default ConfigMap: %s", err) + } + shardingCM.Data[ShardControllerMappingKey] = string(data) + + return shardingCM, nil +} + +func getDefaultShardMappingData(replicas int) []shardApplicationControllerMapping { + shardMappingData := make([]shardApplicationControllerMapping, 0) + + for i := 0; i < replicas; i++ { + mapping := shardApplicationControllerMapping{ + ShardNumber: i, + } + shardMappingData = append(shardMappingData, mapping) + } + return shardMappingData +} diff --git a/controller/sharding/sharding_test.go b/controller/sharding/sharding_test.go index 629c023c4a054..a8a25e11c4978 100644 --- a/controller/sharding/sharding_test.go +++ b/controller/sharding/sharding_test.go @@ -1,51 +1,59 @@ package sharding import ( + "encoding/json" "errors" "fmt" "os" "testing" + "time" "github.com/argoproj/argo-cd/v2/common" "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" dbmocks "github.com/argoproj/argo-cd/v2/util/db/mocks" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func TestGetShardByID_NotEmptyID(t *testing.T) { - t.Setenv(common.EnvControllerReplicas, "1") - assert.Equal(t, 0, LegacyDistributionFunction()(&v1alpha1.Cluster{ID: "1"})) - assert.Equal(t, 0, LegacyDistributionFunction()(&v1alpha1.Cluster{ID: "2"})) - assert.Equal(t, 0, LegacyDistributionFunction()(&v1alpha1.Cluster{ID: "3"})) - assert.Equal(t, 0, LegacyDistributionFunction()(&v1alpha1.Cluster{ID: "4"})) + db := &dbmocks.ArgoDB{} + db.On("GetApplicationControllerReplicas").Return(1) + assert.Equal(t, 0, LegacyDistributionFunction(db)(&v1alpha1.Cluster{ID: "1"})) + assert.Equal(t, 0, LegacyDistributionFunction(db)(&v1alpha1.Cluster{ID: "2"})) + assert.Equal(t, 0, LegacyDistributionFunction(db)(&v1alpha1.Cluster{ID: "3"})) + assert.Equal(t, 0, LegacyDistributionFunction(db)(&v1alpha1.Cluster{ID: "4"})) } func TestGetShardByID_EmptyID(t *testing.T) { - t.Setenv(common.EnvControllerReplicas, "1") + db := &dbmocks.ArgoDB{} + db.On("GetApplicationControllerReplicas").Return(1) distributionFunction := LegacyDistributionFunction - shard := distributionFunction()(&v1alpha1.Cluster{}) + shard := distributionFunction(db)(&v1alpha1.Cluster{}) assert.Equal(t, 0, shard) } func TestGetShardByID_NoReplicas(t *testing.T) { - t.Setenv(common.EnvControllerReplicas, "0") + db := &dbmocks.ArgoDB{} + db.On("GetApplicationControllerReplicas").Return(0) distributionFunction := LegacyDistributionFunction - shard := distributionFunction()(&v1alpha1.Cluster{}) + shard := distributionFunction(db)(&v1alpha1.Cluster{}) assert.Equal(t, -1, shard) } func TestGetShardByID_NoReplicasUsingHashDistributionFunction(t *testing.T) { - t.Setenv(common.EnvControllerReplicas, "0") + db := &dbmocks.ArgoDB{} + db.On("GetApplicationControllerReplicas").Return(0) distributionFunction := LegacyDistributionFunction - shard := distributionFunction()(&v1alpha1.Cluster{}) + shard := distributionFunction(db)(&v1alpha1.Cluster{}) assert.Equal(t, -1, shard) } func TestGetShardByID_NoReplicasUsingHashDistributionFunctionWithClusters(t *testing.T) { db, cluster1, cluster2, cluster3, cluster4, cluster5 := createTestClusters() // Test with replicas set to 0 - t.Setenv(common.EnvControllerReplicas, "0") + db.On("GetApplicationControllerReplicas").Return(0) t.Setenv(common.EnvControllerShardingAlgorithm, common.RoundRobinShardingAlgorithm) distributionFunction := RoundRobinDistributionFunction(db) assert.Equal(t, -1, distributionFunction(nil)) @@ -59,8 +67,9 @@ func TestGetShardByID_NoReplicasUsingHashDistributionFunctionWithClusters(t *tes func TestGetClusterFilterDefault(t *testing.T) { shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...) os.Unsetenv(common.EnvControllerShardingAlgorithm) - t.Setenv(common.EnvControllerReplicas, "2") - filter := GetClusterFilter(GetDistributionFunction(nil, common.DefaultShardingAlgorithm), shardIndex) + db := &dbmocks.ArgoDB{} + db.On("GetApplicationControllerReplicas").Return(2) + filter := GetClusterFilter(db, GetDistributionFunction(db, common.DefaultShardingAlgorithm), shardIndex) assert.False(t, filter(&v1alpha1.Cluster{ID: "1"})) assert.True(t, filter(&v1alpha1.Cluster{ID: "2"})) assert.False(t, filter(&v1alpha1.Cluster{ID: "3"})) @@ -69,9 +78,10 @@ func TestGetClusterFilterDefault(t *testing.T) { func TestGetClusterFilterLegacy(t *testing.T) { shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...) - t.Setenv(common.EnvControllerReplicas, "2") + db := &dbmocks.ArgoDB{} + db.On("GetApplicationControllerReplicas").Return(2) t.Setenv(common.EnvControllerShardingAlgorithm, common.LegacyShardingAlgorithm) - filter := GetClusterFilter(GetDistributionFunction(nil, common.LegacyShardingAlgorithm), shardIndex) + filter := GetClusterFilter(db, GetDistributionFunction(db, common.LegacyShardingAlgorithm), shardIndex) assert.False(t, filter(&v1alpha1.Cluster{ID: "1"})) assert.True(t, filter(&v1alpha1.Cluster{ID: "2"})) assert.False(t, filter(&v1alpha1.Cluster{ID: "3"})) @@ -80,9 +90,10 @@ func TestGetClusterFilterLegacy(t *testing.T) { func TestGetClusterFilterUnknown(t *testing.T) { shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...) - t.Setenv(common.EnvControllerReplicas, "2") + db := &dbmocks.ArgoDB{} + db.On("GetApplicationControllerReplicas").Return(2) t.Setenv(common.EnvControllerShardingAlgorithm, "unknown") - filter := GetClusterFilter(GetDistributionFunction(nil, "unknown"), shardIndex) + filter := GetClusterFilter(db, GetDistributionFunction(db, "unknown"), shardIndex) assert.False(t, filter(&v1alpha1.Cluster{ID: "1"})) assert.True(t, filter(&v1alpha1.Cluster{ID: "2"})) assert.False(t, filter(&v1alpha1.Cluster{ID: "3"})) @@ -91,8 +102,9 @@ func TestGetClusterFilterUnknown(t *testing.T) { func TestLegacyGetClusterFilterWithFixedShard(t *testing.T) { shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...) - t.Setenv(common.EnvControllerReplicas, "2") - filter := GetClusterFilter(GetDistributionFunction(nil, common.DefaultShardingAlgorithm), shardIndex) + db := &dbmocks.ArgoDB{} + db.On("GetApplicationControllerReplicas").Return(2) + filter := GetClusterFilter(db, GetDistributionFunction(db, common.DefaultShardingAlgorithm), shardIndex) assert.False(t, filter(nil)) assert.False(t, filter(&v1alpha1.Cluster{ID: "1"})) assert.True(t, filter(&v1alpha1.Cluster{ID: "2"})) @@ -100,20 +112,19 @@ func TestLegacyGetClusterFilterWithFixedShard(t *testing.T) { assert.True(t, filter(&v1alpha1.Cluster{ID: "4"})) var fixedShard int64 = 4 - filter = GetClusterFilter(GetDistributionFunction(nil, common.DefaultShardingAlgorithm), int(fixedShard)) + filter = GetClusterFilter(db, GetDistributionFunction(db, common.DefaultShardingAlgorithm), int(fixedShard)) assert.False(t, filter(&v1alpha1.Cluster{ID: "4", Shard: &fixedShard})) fixedShard = 1 - filter = GetClusterFilter(GetDistributionFunction(nil, common.DefaultShardingAlgorithm), int(fixedShard)) + filter = GetClusterFilter(db, GetDistributionFunction(db, common.DefaultShardingAlgorithm), int(fixedShard)) assert.True(t, filter(&v1alpha1.Cluster{Name: "cluster4", ID: "4", Shard: &fixedShard})) } func TestRoundRobinGetClusterFilterWithFixedShard(t *testing.T) { shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...) - t.Setenv(common.EnvControllerReplicas, "2") db, cluster1, cluster2, cluster3, cluster4, _ := createTestClusters() - - filter := GetClusterFilter(GetDistributionFunction(db, common.RoundRobinShardingAlgorithm), shardIndex) + db.On("GetApplicationControllerReplicas").Return(2) + filter := GetClusterFilter(db, GetDistributionFunction(db, common.RoundRobinShardingAlgorithm), shardIndex) assert.False(t, filter(nil)) assert.False(t, filter(&cluster1)) assert.True(t, filter(&cluster2)) @@ -123,20 +134,20 @@ func TestRoundRobinGetClusterFilterWithFixedShard(t *testing.T) { // a cluster with a fixed shard should be processed by the specified exact // same shard unless the specified shard index is greater than the number of replicas. var fixedShard int64 = 4 - filter = GetClusterFilter(GetDistributionFunction(db, common.RoundRobinShardingAlgorithm), int(fixedShard)) + filter = GetClusterFilter(db, GetDistributionFunction(db, common.RoundRobinShardingAlgorithm), int(fixedShard)) assert.False(t, filter(&v1alpha1.Cluster{Name: "cluster4", ID: "4", Shard: &fixedShard})) fixedShard = 1 - filter = GetClusterFilter(GetDistributionFunction(db, common.RoundRobinShardingAlgorithm), int(fixedShard)) + filter = GetClusterFilter(db, GetDistributionFunction(db, common.RoundRobinShardingAlgorithm), int(fixedShard)) assert.True(t, filter(&v1alpha1.Cluster{Name: "cluster4", ID: "4", Shard: &fixedShard})) } func TestGetClusterFilterLegacyHash(t *testing.T) { shardIndex := 1 // ensuring that a shard with index 1 will process all the clusters with an "even" id (2,4,6,...) - t.Setenv(common.EnvControllerReplicas, "2") t.Setenv(common.EnvControllerShardingAlgorithm, "hash") db, cluster1, cluster2, cluster3, cluster4, _ := createTestClusters() - filter := GetClusterFilter(GetDistributionFunction(db, common.LegacyShardingAlgorithm), shardIndex) + db.On("GetApplicationControllerReplicas").Return(2) + filter := GetClusterFilter(db, GetDistributionFunction(db, common.LegacyShardingAlgorithm), shardIndex) assert.False(t, filter(&cluster1)) assert.True(t, filter(&cluster2)) assert.False(t, filter(&cluster3)) @@ -145,22 +156,22 @@ func TestGetClusterFilterLegacyHash(t *testing.T) { // a cluster with a fixed shard should be processed by the specified exact // same shard unless the specified shard index is greater than the number of replicas. var fixedShard int64 = 4 - filter = GetClusterFilter(GetDistributionFunction(db, common.LegacyShardingAlgorithm), int(fixedShard)) + filter = GetClusterFilter(db, GetDistributionFunction(db, common.LegacyShardingAlgorithm), int(fixedShard)) assert.False(t, filter(&v1alpha1.Cluster{Name: "cluster4", ID: "4", Shard: &fixedShard})) fixedShard = 1 - filter = GetClusterFilter(GetDistributionFunction(db, common.LegacyShardingAlgorithm), int(fixedShard)) + filter = GetClusterFilter(db, GetDistributionFunction(db, common.LegacyShardingAlgorithm), int(fixedShard)) assert.True(t, filter(&v1alpha1.Cluster{Name: "cluster4", ID: "4", Shard: &fixedShard})) } func TestGetClusterFilterWithEnvControllerShardingAlgorithms(t *testing.T) { db, cluster1, cluster2, cluster3, cluster4, _ := createTestClusters() shardIndex := 1 - t.Setenv(common.EnvControllerReplicas, "2") + db.On("GetApplicationControllerReplicas").Return(2) t.Run("legacy", func(t *testing.T) { t.Setenv(common.EnvControllerShardingAlgorithm, common.LegacyShardingAlgorithm) - shardShouldProcessCluster := GetClusterFilter(GetDistributionFunction(db, common.LegacyShardingAlgorithm), shardIndex) + shardShouldProcessCluster := GetClusterFilter(db, GetDistributionFunction(db, common.LegacyShardingAlgorithm), shardIndex) assert.False(t, shardShouldProcessCluster(&cluster1)) assert.True(t, shardShouldProcessCluster(&cluster2)) assert.False(t, shardShouldProcessCluster(&cluster3)) @@ -170,7 +181,7 @@ func TestGetClusterFilterWithEnvControllerShardingAlgorithms(t *testing.T) { t.Run("roundrobin", func(t *testing.T) { t.Setenv(common.EnvControllerShardingAlgorithm, common.RoundRobinShardingAlgorithm) - shardShouldProcessCluster := GetClusterFilter(GetDistributionFunction(db, common.LegacyShardingAlgorithm), shardIndex) + shardShouldProcessCluster := GetClusterFilter(db, GetDistributionFunction(db, common.LegacyShardingAlgorithm), shardIndex) assert.False(t, shardShouldProcessCluster(&cluster1)) assert.True(t, shardShouldProcessCluster(&cluster2)) assert.False(t, shardShouldProcessCluster(&cluster3)) @@ -183,7 +194,7 @@ func TestGetShardByIndexModuloReplicasCountDistributionFunction2(t *testing.T) { db, cluster1, cluster2, cluster3, cluster4, cluster5 := createTestClusters() t.Run("replicas set to 1", func(t *testing.T) { - t.Setenv(common.EnvControllerReplicas, "1") + db.On("GetApplicationControllerReplicas").Return(1).Once() distributionFunction := RoundRobinDistributionFunction(db) assert.Equal(t, 0, distributionFunction(nil)) assert.Equal(t, 0, distributionFunction(&cluster1)) @@ -194,7 +205,7 @@ func TestGetShardByIndexModuloReplicasCountDistributionFunction2(t *testing.T) { }) t.Run("replicas set to 2", func(t *testing.T) { - t.Setenv(common.EnvControllerReplicas, "2") + db.On("GetApplicationControllerReplicas").Return(2).Once() distributionFunction := RoundRobinDistributionFunction(db) assert.Equal(t, 0, distributionFunction(nil)) assert.Equal(t, 0, distributionFunction(&cluster1)) @@ -205,7 +216,7 @@ func TestGetShardByIndexModuloReplicasCountDistributionFunction2(t *testing.T) { }) t.Run("replicas set to 3", func(t *testing.T) { - t.Setenv(common.EnvControllerReplicas, "3") + db.On("GetApplicationControllerReplicas").Return(3).Once() distributionFunction := RoundRobinDistributionFunction(db) assert.Equal(t, 0, distributionFunction(nil)) assert.Equal(t, 0, distributionFunction(&cluster1)) @@ -229,7 +240,7 @@ func TestGetShardByIndexModuloReplicasCountDistributionFunctionWhenClusterNumber clusterList.Items = append(clusterList.Items, cluster) } db.On("ListClusters", mock.Anything).Return(clusterList, nil) - t.Setenv(common.EnvControllerReplicas, "2") + db.On("GetApplicationControllerReplicas").Return(2) distributionFunction := RoundRobinDistributionFunction(&db) for i, c := range clusterList.Items { assert.Equal(t, i%2, distributionFunction(&c)) @@ -249,7 +260,7 @@ func TestGetShardByIndexModuloReplicasCountDistributionFunctionWhenClusterIsAdde db.On("ListClusters", mock.Anything).Return(clusterList, nil) // Test with replicas set to 2 - t.Setenv(common.EnvControllerReplicas, "2") + db.On("GetApplicationControllerReplicas").Return(2) distributionFunction := RoundRobinDistributionFunction(&db) assert.Equal(t, 0, distributionFunction(nil)) assert.Equal(t, 0, distributionFunction(&cluster1)) @@ -270,7 +281,7 @@ func TestGetShardByIndexModuloReplicasCountDistributionFunctionWhenClusterIsAdde func TestGetShardByIndexModuloReplicasCountDistributionFunction(t *testing.T) { db, cluster1, cluster2, _, _, _ := createTestClusters() - t.Setenv(common.EnvControllerReplicas, "2") + db.On("GetApplicationControllerReplicas").Return(2) distributionFunction := RoundRobinDistributionFunction(db) // Test that the function returns the correct shard for cluster1 and cluster2 @@ -291,8 +302,8 @@ func TestInferShard(t *testing.T) { // Override the os.Hostname function to return a specific hostname for testing defer func() { osHostnameFunction = os.Hostname }() - osHostnameFunction = func() (string, error) { return "example-shard-3", nil } expectedShard := 3 + osHostnameFunction = func() (string, error) { return "example-shard-3", nil } actualShard, _ := InferShard() assert.Equal(t, expectedShard, actualShard) @@ -333,3 +344,335 @@ func createCluster(name string, id string) v1alpha1.Cluster { } return cluster } + +func Test_getDefaultShardMappingData(t *testing.T) { + expectedData := []shardApplicationControllerMapping{ + { + ShardNumber: 0, + ControllerName: "", + }, { + ShardNumber: 1, + ControllerName: "", + }, + } + + shardMappingData := getDefaultShardMappingData(2) + assert.Equal(t, expectedData, shardMappingData) +} + +func Test_generateDefaultShardMappingCM_NoPredefinedShard(t *testing.T) { + replicas := 2 + expectedTime := metav1.Now() + defer func() { osHostnameFunction = os.Hostname }() + defer func() { heartbeatCurrentTime = metav1.Now }() + + expectedMapping := []shardApplicationControllerMapping{ + { + ShardNumber: 0, + ControllerName: "test-example", + HeartbeatTime: expectedTime, + }, { + ShardNumber: 1, + }, + } + + expectedMappingCM, err := json.Marshal(expectedMapping) + assert.NoError(t, err) + + expectedShadingCM := &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: common.ArgoCDAppControllerShardConfigMapName, + Namespace: "test", + }, + Data: map[string]string{ + "shardControllerMapping": string(expectedMappingCM), + }, + } + heartbeatCurrentTime = func() metav1.Time { return expectedTime } + osHostnameFunction = func() (string, error) { return "test-example", nil } + shardingCM, err := generateDefaultShardMappingCM("test", "test-example", replicas, -1) + assert.NoError(t, err) + assert.Equal(t, expectedShadingCM, shardingCM) + +} + +func Test_generateDefaultShardMappingCM_PredefinedShard(t *testing.T) { + replicas := 2 + expectedTime := metav1.Now() + defer func() { osHostnameFunction = os.Hostname }() + defer func() { heartbeatCurrentTime = metav1.Now }() + + expectedMapping := []shardApplicationControllerMapping{ + { + ShardNumber: 0, + }, { + ShardNumber: 1, + ControllerName: "test-example", + HeartbeatTime: expectedTime, + }, + } + + expectedMappingCM, err := json.Marshal(expectedMapping) + assert.NoError(t, err) + + expectedShadingCM := &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: common.ArgoCDAppControllerShardConfigMapName, + Namespace: "test", + }, + Data: map[string]string{ + "shardControllerMapping": string(expectedMappingCM), + }, + } + heartbeatCurrentTime = func() metav1.Time { return expectedTime } + osHostnameFunction = func() (string, error) { return "test-example", nil } + shardingCM, err := generateDefaultShardMappingCM("test", "test-example", replicas, 1) + assert.NoError(t, err) + assert.Equal(t, expectedShadingCM, shardingCM) + +} + +func Test_getOrUpdateShardNumberForController(t *testing.T) { + expectedTime := metav1.Now() + + testCases := []struct { + name string + shardApplicationControllerMapping []shardApplicationControllerMapping + hostname string + replicas int + shard int + expectedShard int + expectedShardMappingData []shardApplicationControllerMapping + }{ + { + name: "length of shard mapping less than number of replicas - Existing controller", + shardApplicationControllerMapping: []shardApplicationControllerMapping{ + { + ControllerName: "test-example", + ShardNumber: 0, + HeartbeatTime: metav1.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC), + }, + }, + hostname: "test-example", + replicas: 2, + shard: -1, + expectedShard: 0, + expectedShardMappingData: []shardApplicationControllerMapping{ + { + ControllerName: "test-example", + ShardNumber: 0, + HeartbeatTime: expectedTime, + }, { + ControllerName: "", + ShardNumber: 1, + HeartbeatTime: metav1.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC), + }, + }, + }, + { + name: "length of shard mapping less than number of replicas - New controller", + shardApplicationControllerMapping: []shardApplicationControllerMapping{ + { + ControllerName: "test-example", + ShardNumber: 0, + HeartbeatTime: expectedTime, + }, + }, + hostname: "test-example-1", + replicas: 2, + shard: -1, + expectedShard: 1, + expectedShardMappingData: []shardApplicationControllerMapping{ + { + ControllerName: "test-example", + ShardNumber: 0, + HeartbeatTime: expectedTime, + }, { + ControllerName: "test-example-1", + ShardNumber: 1, + HeartbeatTime: expectedTime, + }, + }, + }, + { + name: "length of shard mapping more than number of replicas", + shardApplicationControllerMapping: []shardApplicationControllerMapping{ + { + ControllerName: "test-example", + ShardNumber: 0, + HeartbeatTime: expectedTime, + }, { + ControllerName: "test-example-1", + ShardNumber: 1, + HeartbeatTime: expectedTime, + }, + }, + hostname: "test-example", + replicas: 1, + shard: -1, + expectedShard: 0, + expectedShardMappingData: []shardApplicationControllerMapping{ + { + ControllerName: "test-example", + ShardNumber: 0, + HeartbeatTime: expectedTime, + }, + }, + }, + { + name: "shard number is pre-specified and length of shard mapping less than number of replicas - Existing controller", + shardApplicationControllerMapping: []shardApplicationControllerMapping{ + { + ControllerName: "test-example-1", + ShardNumber: 1, + HeartbeatTime: metav1.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC), + }, { + ControllerName: "test-example", + ShardNumber: 0, + HeartbeatTime: expectedTime, + }, + }, + hostname: "test-example-1", + replicas: 2, + shard: 1, + expectedShard: 1, + expectedShardMappingData: []shardApplicationControllerMapping{ + { + ControllerName: "test-example-1", + ShardNumber: 1, + HeartbeatTime: expectedTime, + }, { + ControllerName: "test-example", + ShardNumber: 0, + HeartbeatTime: expectedTime, + }, + }, + }, + { + name: "shard number is pre-specified and length of shard mapping less than number of replicas - New controller", + shardApplicationControllerMapping: []shardApplicationControllerMapping{ + { + ControllerName: "test-example", + ShardNumber: 0, + HeartbeatTime: expectedTime, + }, + }, + hostname: "test-example-1", + replicas: 2, + shard: 1, + expectedShard: 1, + expectedShardMappingData: []shardApplicationControllerMapping{ + { + ControllerName: "test-example", + ShardNumber: 0, + HeartbeatTime: expectedTime, + }, { + ControllerName: "test-example-1", + ShardNumber: 1, + HeartbeatTime: expectedTime, + }, + }, + }, + { + name: "shard number is pre-specified and length of shard mapping more than number of replicas", + shardApplicationControllerMapping: []shardApplicationControllerMapping{ + { + ControllerName: "test-example", + ShardNumber: 0, + HeartbeatTime: expectedTime, + }, { + ControllerName: "test-example-1", + ShardNumber: 1, + HeartbeatTime: expectedTime, + }, { + ControllerName: "test-example-2", + ShardNumber: 2, + HeartbeatTime: expectedTime, + }, + }, + hostname: "test-example", + replicas: 2, + shard: 1, + expectedShard: 1, + expectedShardMappingData: []shardApplicationControllerMapping{ + { + ControllerName: "", + ShardNumber: 0, + HeartbeatTime: metav1.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC), + }, { + ControllerName: "test-example", + ShardNumber: 1, + HeartbeatTime: expectedTime, + }, + }, + }, + { + name: "updating heartbeat", + shardApplicationControllerMapping: []shardApplicationControllerMapping{ + { + ControllerName: "test-example", + ShardNumber: 0, + HeartbeatTime: expectedTime, + }, { + ControllerName: "test-example-1", + ShardNumber: 1, + HeartbeatTime: metav1.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC), + }, + }, + hostname: "test-example-1", + replicas: 2, + shard: -1, + expectedShard: 1, + expectedShardMappingData: []shardApplicationControllerMapping{ + { + ControllerName: "test-example", + ShardNumber: 0, + HeartbeatTime: expectedTime, + }, { + ControllerName: "test-example-1", + ShardNumber: 1, + HeartbeatTime: expectedTime, + }, + }, + }, + { + name: "updating heartbeat - shard pre-defined", + shardApplicationControllerMapping: []shardApplicationControllerMapping{ + { + ControllerName: "test-example", + ShardNumber: 0, + HeartbeatTime: expectedTime, + }, { + ControllerName: "test-example-1", + ShardNumber: 1, + HeartbeatTime: metav1.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC), + }, + }, + hostname: "test-example-1", + replicas: 2, + shard: 1, + expectedShard: 1, + expectedShardMappingData: []shardApplicationControllerMapping{ + { + ControllerName: "test-example", + ShardNumber: 0, + HeartbeatTime: expectedTime, + }, { + ControllerName: "test-example-1", + ShardNumber: 1, + HeartbeatTime: expectedTime, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + defer func() { osHostnameFunction = os.Hostname }() + heartbeatCurrentTime = func() metav1.Time { return expectedTime } + shard, shardMappingData := getOrUpdateShardNumberForController(tc.shardApplicationControllerMapping, tc.hostname, tc.replicas, tc.shard) + assert.Equal(t, tc.expectedShard, shard) + assert.Equal(t, tc.expectedShardMappingData, shardMappingData) + }) + } +} diff --git a/docs/operator-manual/dynamic-cluster-distribution.md b/docs/operator-manual/dynamic-cluster-distribution.md new file mode 100644 index 0000000000000..b07165ae0219a --- /dev/null +++ b/docs/operator-manual/dynamic-cluster-distribution.md @@ -0,0 +1,57 @@ +# Dynamic Cluster Distribution + +*Current Status: [Alpha][1] (Since v2.9.0)* + +By default, clusters are assigned to shards indefinitely. For users of the default, hash-based sharding algorithm, this +static assignment is fine: shards will always be roughly-balanced by the hash-based algorithm. But for users of the +[round-robin](high_availability.md#argocd-application-controller) or other custom shard assignment algorithms, this +static assignment can lead to unbalanced shards when replicas are added or removed. + +Starting v2.9, Argo CD supports a dynamic cluster distribution feature. When replicas are added or removed, the sharding +algorithm is re-run to ensure that the clusters are distributed according to the algorithm. If the algorithm is +well-balanced, like round-robin, then the shards will be well-balanced. + +Previously, the shard count was set via the `ARGOCD_CONTROLLER_REPLICAS` environment variable. Changing the environment +variable forced a restart of all application controller pods. Now, the shard count is set via the `replicas` field of the deployment, +which does not require a restart of the application controller pods. + +## Enabling Dynamic Distribution of Clusters + +In order to utilize the feature, the manifests `manifests/ha/base/controller-deployment/` can be applied as a Kustomize +overlay. This overlay sets the StatefulSet replicas to `0` and deploys the application controller as a Deployment. The +dynamic distribution code automatically kicks in when the controller is deployed as a Deployment. + +!!! important + The use of a Deployment instead of a StatefulSet is an implementation detail which may change in future versions of + this feature. Therefore, the directory name of the Kustomize overlay may change as well. Monitor the release notes + to avoid issues. + +Note the introduction of new environment variable `ARGOCD_CONTROLLER_HEARTBEAT_TIME`. The environment variable is explained in [working of Dynamic Distribution Heartbeat Process](#working-of-dynamic-distribution) + +## Working of Dynamic Distribution + +To accomplish runtime distribution of clusters, the Application Controller uses a ConfigMap to associate a controller +pod with a shard number and a heartbeat to ensure that controller pods are still alive and handling their shard, in +effect, their share of the work. + +The Application Controller will create a new ConfigMap named `argocd-app-controller-shard-cm` to store the Controller <-> Shard mapping. The mapping would look like below for each shard: + +```yaml +ShardNumber : 0 +ControllerName : "argocd-application-controller-hydrxyt" +HeartbeatTime : "2009-11-17 20:34:58.651387237 +0000 UTC" +``` + +* `ControllerName`: Stores the hostname of the Application Controller pod +* `ShardNumber` : Stores the shard number managed by the controller pod +* `HeartbeatTime`: Stores the last time this heartbeat was updated. + +Controller Shard Mapping is updated in the ConfigMap during each readiness probe check of the pod, that is every 10 seconds (otherwise as configured). The controller will acquire the shard during every iteration of readiness probe check and try to update the ConfigMap with the `HeartbeatTime`. The default `HeartbeatDuration` after which the heartbeat should be updated is `10` seconds. If the ConfigMap was not updated for any controller pod for more than `3 * HeartbeatDuration`, then the readiness probe for the application pod is marked as `Unhealthy`. To increase the default `HeartbeatDuration`, you can set the environment variable `ARGOCD_CONTROLLER_HEARTBEAT_TIME` with the desired value. + +The new sharding mechanism does not monitor the environment variable `ARGOCD_CONTROLLER_REPLICAS` but instead reads the replica count directly from the Application Controller Deployment. The controller identifies the change in the number of replicas by comparing the replica count in the Application Controller Deployment and the number of mappings in the `argocd-app-controller-shard-cm` ConfigMap. + +In the scenario when the number of Application Controller replicas increases, a new entry is added to the list of mappings in the `argocd-app-controller-shard-cm` ConfigMap and the cluster distribution is triggered to re-distribute the clusters. + +In the scenario when the number of Application Controller replicas decreases, the mappings in the `argocd-app-controller-shard-cm` ConfigMap are reset and every controller acquires the shard again thus triggering the re-distribution of the clusters. + +[1]: https://github.com/argoproj/argoproj/blob/master/community/feature-status.md diff --git a/docs/operator-manual/high_availability.md b/docs/operator-manual/high_availability.md index e9d53e6be88a1..ac59c333ba7cb 100644 --- a/docs/operator-manual/high_availability.md +++ b/docs/operator-manual/high_availability.md @@ -61,8 +61,7 @@ reconciliation. In this case, we advise to use the preferred resource version in * If the controller is managing too many clusters and uses too much memory then you can shard clusters across multiple controller replicas. To enable sharding, increase the number of replicas in `argocd-application-controller` `StatefulSet` -and repeat the number of replicas in the `ARGOCD_CONTROLLER_REPLICAS` environment variable. The strategic merge patch below -demonstrates changes required to configure two controller replicas. +and repeat the number of replicas in the `ARGOCD_CONTROLLER_REPLICAS` environment variable. The strategic merge patch below demonstrates changes required to configure two controller replicas. * By default, the controller will update the cluster information every 10 seconds. If there is a problem with your cluster network environment that is causing the update time to take a long time, you can try modifying the environment variable `ARGO_CD_UPDATE_CLUSTER_INFO_TIMEOUT` to increase the timeout (the unit is seconds). diff --git a/manifests/base/application-controller-deployment/argocd-application-controller-deployment.yaml b/manifests/base/application-controller-deployment/argocd-application-controller-deployment.yaml new file mode 100644 index 0000000000000..14cb1a317bab3 --- /dev/null +++ b/manifests/base/application-controller-deployment/argocd-application-controller-deployment.yaml @@ -0,0 +1,218 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app.kubernetes.io/name: argocd-application-controller + app.kubernetes.io/part-of: argocd + app.kubernetes.io/component: application-controller + name: argocd-application-controller +spec: + selector: + matchLabels: + app.kubernetes.io/name: argocd-application-controller + replicas: 1 + template: + metadata: + labels: + app.kubernetes.io/name: argocd-application-controller + spec: + containers: + - args: + - /usr/local/bin/argocd-application-controller + env: + - name: ARGOCD_CONTROLLER_REPLICAS + value: "1" + - name: ARGOCD_RECONCILIATION_TIMEOUT + valueFrom: + configMapKeyRef: + name: argocd-cm + key: timeout.reconciliation + optional: true + - name: ARGOCD_HARD_RECONCILIATION_TIMEOUT + valueFrom: + configMapKeyRef: + name: argocd-cm + key: timeout.hard.reconciliation + optional: true + - name: ARGOCD_APPLICATION_CONTROLLER_REPO_SERVER + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: repo.server + optional: true + - name: ARGOCD_APPLICATION_CONTROLLER_REPO_SERVER_TIMEOUT_SECONDS + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: controller.repo.server.timeout.seconds + optional: true + - name: ARGOCD_APPLICATION_CONTROLLER_STATUS_PROCESSORS + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: controller.status.processors + optional: true + - name: ARGOCD_APPLICATION_CONTROLLER_OPERATION_PROCESSORS + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: controller.operation.processors + optional: true + - name: ARGOCD_APPLICATION_CONTROLLER_LOGFORMAT + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: controller.log.format + optional: true + - name: ARGOCD_APPLICATION_CONTROLLER_LOGLEVEL + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: controller.log.level + optional: true + - name: ARGOCD_APPLICATION_CONTROLLER_METRICS_CACHE_EXPIRATION + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: controller.metrics.cache.expiration + optional: true + - name: ARGOCD_APPLICATION_CONTROLLER_SELF_HEAL_TIMEOUT_SECONDS + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: controller.self.heal.timeout.seconds + optional: true + - name: ARGOCD_APPLICATION_CONTROLLER_REPO_SERVER_PLAINTEXT + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: controller.repo.server.plaintext + optional: true + - name: ARGOCD_APPLICATION_CONTROLLER_REPO_SERVER_STRICT_TLS + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: controller.repo.server.strict.tls + optional: true + - name: ARGOCD_APPLICATION_CONTROLLER_PERSIST_RESOURCE_HEALTH + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: controller.resource.health.persist + optional: true + - name: ARGOCD_APP_STATE_CACHE_EXPIRATION + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: controller.app.state.cache.expiration + optional: true + - name: REDIS_SERVER + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: redis.server + optional: true + - name: REDIS_COMPRESSION + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: redis.compression + optional: true + - name: REDISDB + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: redis.db + optional: true + - name: ARGOCD_DEFAULT_CACHE_EXPIRATION + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: controller.default.cache.expiration + optional: true + - name: ARGOCD_APPLICATION_CONTROLLER_OTLP_ADDRESS + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: otlp.address + optional: true + - name: ARGOCD_APPLICATION_NAMESPACES + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: application.namespaces + optional: true + - name: ARGOCD_CONTROLLER_SHARDING_ALGORITHM + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: controller.sharding.algorithm + optional: true + - name: ARGOCD_APPLICATION_CONTROLLER_KUBECTL_PARALLELISM_LIMIT + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: controller.kubectl.parallelism.limit + optional: true + - name: ARGOCD_CONTROLLER_HEARTBEAT_TIME + valueFrom: + configMapKeyRef: + name: argocd-cmd-params-cm + key: controller.heatbeat.time + optional: true + image: quay.io/argoproj/argocd:latest + imagePullPolicy: Always + name: argocd-application-controller + ports: + - containerPort: 8082 + readinessProbe: + httpGet: + path: /healthz + port: 8082 + initialDelaySeconds: 5 + periodSeconds: 10 + securityContext: + runAsNonRoot: true + readOnlyRootFilesystem: true + allowPrivilegeEscalation: false + capabilities: + drop: + - ALL + seccompProfile: + type: RuntimeDefault + workingDir: /home/argocd + volumeMounts: + - name: argocd-repo-server-tls + mountPath: /app/config/controller/tls + - name: argocd-home + mountPath: /home/argocd + serviceAccountName: argocd-application-controller + affinity: + podAntiAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 100 + podAffinityTerm: + labelSelector: + matchLabels: + app.kubernetes.io/name: argocd-application-controller + topologyKey: kubernetes.io/hostname + - weight: 5 + podAffinityTerm: + labelSelector: + matchLabels: + app.kubernetes.io/part-of: argocd + topologyKey: kubernetes.io/hostname + volumes: + - emptyDir: {} + name: argocd-home + - name: argocd-repo-server-tls + secret: + secretName: argocd-repo-server-tls + optional: true + items: + - key: tls.crt + path: tls.crt + - key: tls.key + path: tls.key + - key: ca.crt + path: ca.crt \ No newline at end of file diff --git a/manifests/base/application-controller-deployment/argocd-application-controller-service.yaml b/manifests/base/application-controller-deployment/argocd-application-controller-service.yaml new file mode 100644 index 0000000000000..a769e75468483 --- /dev/null +++ b/manifests/base/application-controller-deployment/argocd-application-controller-service.yaml @@ -0,0 +1,20 @@ +apiVersion: v1 +kind: Service +metadata: + labels: + app.kubernetes.io/component: argocd-application-controller + app.kubernetes.io/name: argocd-application-controller + app.kubernetes.io/part-of: argocd + name: argocd-application-controller +spec: + ports: + - name: application-controller + protocol: TCP + port: 8082 + targetPort: 8082 + - name: metrics + protocol: TCP + port: 8084 + targetPort: 8084 + selector: + app.kubernetes.io/name: argocd-application-controller \ No newline at end of file diff --git a/manifests/base/application-controller-deployment/kustomization.yaml b/manifests/base/application-controller-deployment/kustomization.yaml new file mode 100644 index 0000000000000..8f35ec8bd388f --- /dev/null +++ b/manifests/base/application-controller-deployment/kustomization.yaml @@ -0,0 +1,6 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization + +resources: +- argocd-application-controller-service.yaml +- argocd-application-controller-deployment.yaml diff --git a/manifests/base/application-controller/argocd-application-controller-statefulset.yaml b/manifests/base/application-controller/argocd-application-controller-statefulset.yaml index 270fa05bcc62e..33f02100a947a 100644 --- a/manifests/base/application-controller/argocd-application-controller-statefulset.yaml +++ b/manifests/base/application-controller/argocd-application-controller-statefulset.yaml @@ -210,4 +210,4 @@ spec: - key: tls.key path: tls.key - key: ca.crt - path: ca.crt + path: ca.crt \ No newline at end of file diff --git a/manifests/ha/base/controller-deployment/argocd-application-controller-statefulset.yaml b/manifests/ha/base/controller-deployment/argocd-application-controller-statefulset.yaml new file mode 100644 index 0000000000000..10e4ea2ac7e3e --- /dev/null +++ b/manifests/ha/base/controller-deployment/argocd-application-controller-statefulset.yaml @@ -0,0 +1,15 @@ +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: argocd-application-controller +spec: + replicas: 0 + template: + spec: + containers: + - name: argocd-application-controller + args: + - /usr/local/bin/argocd-application-controller + env: + - name: ARGOCD_CONTROLLER_REPLICAS + value: "0" \ No newline at end of file diff --git a/manifests/ha/base/controller-deployment/argocd-cmd-params-cm.yaml b/manifests/ha/base/controller-deployment/argocd-cmd-params-cm.yaml new file mode 100644 index 0000000000000..5752543cc1af3 --- /dev/null +++ b/manifests/ha/base/controller-deployment/argocd-cmd-params-cm.yaml @@ -0,0 +1,6 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: argocd-cmd-params-cm +data: + redis.server: argocd-redis-ha-haproxy:6379 diff --git a/manifests/ha/base/controller-deployment/argocd-repo-server-deployment.yaml b/manifests/ha/base/controller-deployment/argocd-repo-server-deployment.yaml new file mode 100644 index 0000000000000..b237cf6c13b24 --- /dev/null +++ b/manifests/ha/base/controller-deployment/argocd-repo-server-deployment.yaml @@ -0,0 +1,26 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: argocd-repo-server +spec: + replicas: 2 + template: + spec: + affinity: + podAntiAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + - labelSelector: + matchLabels: + app.kubernetes.io/name: argocd-repo-server + topologyKey: kubernetes.io/hostname + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 100 + podAffinityTerm: + labelSelector: + matchLabels: + app.kubernetes.io/name: argocd-repo-server + topologyKey: topology.kubernetes.io/zone + containers: + - name: argocd-repo-server + args: + - /usr/local/bin/argocd-repo-server diff --git a/manifests/ha/base/controller-deployment/argocd-server-deployment.yaml b/manifests/ha/base/controller-deployment/argocd-server-deployment.yaml new file mode 100644 index 0000000000000..49eb31b1b0f29 --- /dev/null +++ b/manifests/ha/base/controller-deployment/argocd-server-deployment.yaml @@ -0,0 +1,29 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: argocd-server +spec: + replicas: 2 + template: + spec: + affinity: + podAntiAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + - labelSelector: + matchLabels: + app.kubernetes.io/name: argocd-server + topologyKey: kubernetes.io/hostname + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 100 + podAffinityTerm: + labelSelector: + matchLabels: + app.kubernetes.io/name: argocd-server + topologyKey: topology.kubernetes.io/zone + containers: + - name: argocd-server + env: + - name: ARGOCD_API_SERVER_REPLICAS + value: '2' + args: + - /usr/local/bin/argocd-server diff --git a/manifests/ha/base/controller-deployment/kustomization.yaml b/manifests/ha/base/controller-deployment/kustomization.yaml new file mode 100644 index 0000000000000..d6d20d99b4516 --- /dev/null +++ b/manifests/ha/base/controller-deployment/kustomization.yaml @@ -0,0 +1,25 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization + + +patches: +- path: argocd-repo-server-deployment.yaml +- path: argocd-server-deployment.yaml +- path: argocd-application-controller-statefulset.yaml +- path: argocd-cmd-params-cm.yaml + + +images: +- name: quay.io/argoproj/argocd + newName: quay.io/argoproj/argocd + newTag: latest +resources: +- ../../../base/application-controller +- ../../../base/application-controller-deployment +- ../../../base/applicationset-controller +- ../../../base/dex +- ../../../base/repo-server +- ../../../base/server +- ../../../base/config +- ../../../base/notification +- ../redis-ha diff --git a/manifests/ha/base/overlays/argocd-application-controller-statefulset.yaml b/manifests/ha/base/overlays/argocd-application-controller-statefulset.yaml index c288595170914..c7e5e0b8e1131 100644 --- a/manifests/ha/base/overlays/argocd-application-controller-statefulset.yaml +++ b/manifests/ha/base/overlays/argocd-application-controller-statefulset.yaml @@ -8,4 +8,4 @@ spec: containers: - name: argocd-application-controller args: - - /usr/local/bin/argocd-application-controller + - /usr/local/bin/argocd-application-controller \ No newline at end of file diff --git a/test/e2e/fixture/app/context.go b/test/e2e/fixture/app/context.go index aa961f30d15fa..41c8dbd17bcad 100644 --- a/test/e2e/fixture/app/context.go +++ b/test/e2e/fixture/app/context.go @@ -65,7 +65,7 @@ func GivenWithNamespace(t *testing.T, namespace string) *Context { func GivenWithSameState(t *testing.T) *Context { // ARGOCE_E2E_DEFAULT_TIMEOUT can be used to override the default timeout // for any context. - timeout := env.ParseNumFromEnv("ARGOCD_E2E_DEFAULT_TIMEOUT", 10, 0, 180) + timeout := env.ParseNumFromEnv("ARGOCD_E2E_DEFAULT_TIMEOUT", 20, 0, 180) return &Context{ t: t, destServer: v1alpha1.KubernetesInternalAPIServerAddr, diff --git a/util/db/db.go b/util/db/db.go index 05ae38e75bb84..d5361ad148456 100644 --- a/util/db/db.go +++ b/util/db/db.go @@ -2,12 +2,16 @@ package db import ( "context" + "math" "strings" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "github.com/argoproj/argo-cd/v2/common" appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + "github.com/argoproj/argo-cd/v2/util/env" "github.com/argoproj/argo-cd/v2/util/settings" ) @@ -83,6 +87,9 @@ type ArgoDB interface { AddGPGPublicKey(ctx context.Context, keyData string) (map[string]*appv1.GnuPGPublicKey, []string, error) // DeleteGPGPublicKey removes a GPG public key from the configuration DeleteGPGPublicKey(ctx context.Context, keyID string) error + + // GetApplicationControllerReplicas gets the replicas of application controller + GetApplicationControllerReplicas() int } type db struct { @@ -140,3 +147,14 @@ func (db *db) unmarshalFromSecretsStr(secrets map[*SecretMaperValidation]*v1.Sec func StripCRLFCharacter(input string) string { return strings.TrimSpace(input) } + +// GetApplicationControllerReplicas gets the replicas of application controller +func (db *db) GetApplicationControllerReplicas() int { + // get the replicas from application controller deployment, if the application controller deployment does not exist, check for environment variable + applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName) + appControllerDeployment, _ := db.kubeclientset.AppsV1().Deployments(db.settingsMgr.GetNamespace()).Get(context.Background(), applicationControllerName, metav1.GetOptions{}) + if appControllerDeployment != nil { + return int(*appControllerDeployment.Spec.Replicas) + } + return env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32) +} diff --git a/util/db/db_test.go b/util/db/db_test.go index feb204e7d5cf7..ca0c3eac2b85b 100644 --- a/util/db/db_test.go +++ b/util/db/db_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + appv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -749,3 +750,27 @@ func TestGetClusterServersByName_InClusterConfigured(t *testing.T) { assert.NoError(t, err) assert.ElementsMatch(t, []string{v1alpha1.KubernetesInternalAPIServerAddr}, servers) } + +func TestGetApplicationControllerReplicas(t *testing.T) { + clientset := getClientset(nil) + expectedReplicas := int32(2) + t.Setenv(common.EnvControllerReplicas, "2") + db := NewDB(testNamespace, settings.NewSettingsManager(context.Background(), clientset, testNamespace), clientset) + replicas := db.GetApplicationControllerReplicas() + assert.Equal(t, int(expectedReplicas), replicas) + + expectedReplicas = int32(3) + clientset = getClientset(nil, &appv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: common.ApplicationController, + Namespace: testNamespace, + }, + Spec: appv1.DeploymentSpec{ + Replicas: &expectedReplicas, + }, + }) + t.Setenv(common.EnvControllerReplicas, "2") + db = NewDB(testNamespace, settings.NewSettingsManager(context.Background(), clientset, testNamespace), clientset) + replicas = db.GetApplicationControllerReplicas() + assert.Equal(t, int(expectedReplicas), replicas) +} diff --git a/util/db/mocks/ArgoDB.go b/util/db/mocks/ArgoDB.go index eed84975d9080..68edf900bec7e 100644 --- a/util/db/mocks/ArgoDB.go +++ b/util/db/mocks/ArgoDB.go @@ -1,14 +1,14 @@ -// Code generated by mockery v1.0.0. DO NOT EDIT. +// Code generated by mockery v2.32.4. DO NOT EDIT. package mocks import ( context "context" + db "github.com/argoproj/argo-cd/v2/util/db" mock "github.com/stretchr/testify/mock" v1alpha1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" - db "github.com/argoproj/argo-cd/v2/util/db" ) // ArgoDB is an autogenerated mock type for the ArgoDB type @@ -21,6 +21,11 @@ func (_m *ArgoDB) AddGPGPublicKey(ctx context.Context, keyData string) (map[stri ret := _m.Called(ctx, keyData) var r0 map[string]*v1alpha1.GnuPGPublicKey + var r1 []string + var r2 error + if rf, ok := ret.Get(0).(func(context.Context, string) (map[string]*v1alpha1.GnuPGPublicKey, []string, error)); ok { + return rf(ctx, keyData) + } if rf, ok := ret.Get(0).(func(context.Context, string) map[string]*v1alpha1.GnuPGPublicKey); ok { r0 = rf(ctx, keyData) } else { @@ -29,7 +34,6 @@ func (_m *ArgoDB) AddGPGPublicKey(ctx context.Context, keyData string) (map[stri } } - var r1 []string if rf, ok := ret.Get(1).(func(context.Context, string) []string); ok { r1 = rf(ctx, keyData) } else { @@ -38,7 +42,6 @@ func (_m *ArgoDB) AddGPGPublicKey(ctx context.Context, keyData string) (map[stri } } - var r2 error if rf, ok := ret.Get(2).(func(context.Context, string) error); ok { r2 = rf(ctx, keyData) } else { @@ -53,6 +56,10 @@ func (_m *ArgoDB) CreateCluster(ctx context.Context, c *v1alpha1.Cluster) (*v1al ret := _m.Called(ctx, c) var r0 *v1alpha1.Cluster + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *v1alpha1.Cluster) (*v1alpha1.Cluster, error)); ok { + return rf(ctx, c) + } if rf, ok := ret.Get(0).(func(context.Context, *v1alpha1.Cluster) *v1alpha1.Cluster); ok { r0 = rf(ctx, c) } else { @@ -61,7 +68,6 @@ func (_m *ArgoDB) CreateCluster(ctx context.Context, c *v1alpha1.Cluster) (*v1al } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, *v1alpha1.Cluster) error); ok { r1 = rf(ctx, c) } else { @@ -76,6 +82,10 @@ func (_m *ArgoDB) CreateRepoCertificate(ctx context.Context, certificate *v1alph ret := _m.Called(ctx, certificate, upsert) var r0 *v1alpha1.RepositoryCertificateList + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *v1alpha1.RepositoryCertificateList, bool) (*v1alpha1.RepositoryCertificateList, error)); ok { + return rf(ctx, certificate, upsert) + } if rf, ok := ret.Get(0).(func(context.Context, *v1alpha1.RepositoryCertificateList, bool) *v1alpha1.RepositoryCertificateList); ok { r0 = rf(ctx, certificate, upsert) } else { @@ -84,7 +94,6 @@ func (_m *ArgoDB) CreateRepoCertificate(ctx context.Context, certificate *v1alph } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, *v1alpha1.RepositoryCertificateList, bool) error); ok { r1 = rf(ctx, certificate, upsert) } else { @@ -99,6 +108,10 @@ func (_m *ArgoDB) CreateRepository(ctx context.Context, r *v1alpha1.Repository) ret := _m.Called(ctx, r) var r0 *v1alpha1.Repository + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *v1alpha1.Repository) (*v1alpha1.Repository, error)); ok { + return rf(ctx, r) + } if rf, ok := ret.Get(0).(func(context.Context, *v1alpha1.Repository) *v1alpha1.Repository); ok { r0 = rf(ctx, r) } else { @@ -107,7 +120,6 @@ func (_m *ArgoDB) CreateRepository(ctx context.Context, r *v1alpha1.Repository) } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, *v1alpha1.Repository) error); ok { r1 = rf(ctx, r) } else { @@ -122,6 +134,10 @@ func (_m *ArgoDB) CreateRepositoryCredentials(ctx context.Context, r *v1alpha1.R ret := _m.Called(ctx, r) var r0 *v1alpha1.RepoCreds + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *v1alpha1.RepoCreds) (*v1alpha1.RepoCreds, error)); ok { + return rf(ctx, r) + } if rf, ok := ret.Get(0).(func(context.Context, *v1alpha1.RepoCreds) *v1alpha1.RepoCreds); ok { r0 = rf(ctx, r) } else { @@ -130,7 +146,6 @@ func (_m *ArgoDB) CreateRepositoryCredentials(ctx context.Context, r *v1alpha1.R } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, *v1alpha1.RepoCreds) error); ok { r1 = rf(ctx, r) } else { @@ -201,6 +216,10 @@ func (_m *ArgoDB) GetAllHelmRepositoryCredentials(ctx context.Context) ([]*v1alp ret := _m.Called(ctx) var r0 []*v1alpha1.RepoCreds + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) ([]*v1alpha1.RepoCreds, error)); ok { + return rf(ctx) + } if rf, ok := ret.Get(0).(func(context.Context) []*v1alpha1.RepoCreds); ok { r0 = rf(ctx) } else { @@ -209,7 +228,6 @@ func (_m *ArgoDB) GetAllHelmRepositoryCredentials(ctx context.Context) ([]*v1alp } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context) error); ok { r1 = rf(ctx) } else { @@ -219,11 +237,29 @@ func (_m *ArgoDB) GetAllHelmRepositoryCredentials(ctx context.Context) ([]*v1alp return r0, r1 } +// GetApplicationControllerReplicas provides a mock function with given fields: +func (_m *ArgoDB) GetApplicationControllerReplicas() int { + ret := _m.Called() + + var r0 int + if rf, ok := ret.Get(0).(func() int); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int) + } + + return r0 +} + // GetCluster provides a mock function with given fields: ctx, server func (_m *ArgoDB) GetCluster(ctx context.Context, server string) (*v1alpha1.Cluster, error) { ret := _m.Called(ctx, server) var r0 *v1alpha1.Cluster + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (*v1alpha1.Cluster, error)); ok { + return rf(ctx, server) + } if rf, ok := ret.Get(0).(func(context.Context, string) *v1alpha1.Cluster); ok { r0 = rf(ctx, server) } else { @@ -232,7 +268,6 @@ func (_m *ArgoDB) GetCluster(ctx context.Context, server string) (*v1alpha1.Clus } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { r1 = rf(ctx, server) } else { @@ -247,6 +282,10 @@ func (_m *ArgoDB) GetClusterServersByName(ctx context.Context, name string) ([]s ret := _m.Called(ctx, name) var r0 []string + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) ([]string, error)); ok { + return rf(ctx, name) + } if rf, ok := ret.Get(0).(func(context.Context, string) []string); ok { r0 = rf(ctx, name) } else { @@ -255,7 +294,6 @@ func (_m *ArgoDB) GetClusterServersByName(ctx context.Context, name string) ([]s } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { r1 = rf(ctx, name) } else { @@ -270,6 +308,10 @@ func (_m *ArgoDB) GetProjectClusters(ctx context.Context, project string) ([]*v1 ret := _m.Called(ctx, project) var r0 []*v1alpha1.Cluster + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) ([]*v1alpha1.Cluster, error)); ok { + return rf(ctx, project) + } if rf, ok := ret.Get(0).(func(context.Context, string) []*v1alpha1.Cluster); ok { r0 = rf(ctx, project) } else { @@ -278,7 +320,6 @@ func (_m *ArgoDB) GetProjectClusters(ctx context.Context, project string) ([]*v1 } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { r1 = rf(ctx, project) } else { @@ -293,6 +334,10 @@ func (_m *ArgoDB) GetProjectRepositories(ctx context.Context, project string) ([ ret := _m.Called(ctx, project) var r0 []*v1alpha1.Repository + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) ([]*v1alpha1.Repository, error)); ok { + return rf(ctx, project) + } if rf, ok := ret.Get(0).(func(context.Context, string) []*v1alpha1.Repository); ok { r0 = rf(ctx, project) } else { @@ -301,7 +346,6 @@ func (_m *ArgoDB) GetProjectRepositories(ctx context.Context, project string) ([ } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { r1 = rf(ctx, project) } else { @@ -316,6 +360,10 @@ func (_m *ArgoDB) GetRepository(ctx context.Context, url string) (*v1alpha1.Repo ret := _m.Called(ctx, url) var r0 *v1alpha1.Repository + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (*v1alpha1.Repository, error)); ok { + return rf(ctx, url) + } if rf, ok := ret.Get(0).(func(context.Context, string) *v1alpha1.Repository); ok { r0 = rf(ctx, url) } else { @@ -324,7 +372,6 @@ func (_m *ArgoDB) GetRepository(ctx context.Context, url string) (*v1alpha1.Repo } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { r1 = rf(ctx, url) } else { @@ -339,6 +386,10 @@ func (_m *ArgoDB) GetRepositoryCredentials(ctx context.Context, name string) (*v ret := _m.Called(ctx, name) var r0 *v1alpha1.RepoCreds + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (*v1alpha1.RepoCreds, error)); ok { + return rf(ctx, name) + } if rf, ok := ret.Get(0).(func(context.Context, string) *v1alpha1.RepoCreds); ok { r0 = rf(ctx, name) } else { @@ -347,7 +398,6 @@ func (_m *ArgoDB) GetRepositoryCredentials(ctx context.Context, name string) (*v } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { r1 = rf(ctx, name) } else { @@ -362,6 +412,10 @@ func (_m *ArgoDB) ListClusters(ctx context.Context) (*v1alpha1.ClusterList, erro ret := _m.Called(ctx) var r0 *v1alpha1.ClusterList + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (*v1alpha1.ClusterList, error)); ok { + return rf(ctx) + } if rf, ok := ret.Get(0).(func(context.Context) *v1alpha1.ClusterList); ok { r0 = rf(ctx) } else { @@ -370,7 +424,6 @@ func (_m *ArgoDB) ListClusters(ctx context.Context) (*v1alpha1.ClusterList, erro } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context) error); ok { r1 = rf(ctx) } else { @@ -385,6 +438,10 @@ func (_m *ArgoDB) ListConfiguredGPGPublicKeys(ctx context.Context) (map[string]* ret := _m.Called(ctx) var r0 map[string]*v1alpha1.GnuPGPublicKey + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (map[string]*v1alpha1.GnuPGPublicKey, error)); ok { + return rf(ctx) + } if rf, ok := ret.Get(0).(func(context.Context) map[string]*v1alpha1.GnuPGPublicKey); ok { r0 = rf(ctx) } else { @@ -393,7 +450,6 @@ func (_m *ArgoDB) ListConfiguredGPGPublicKeys(ctx context.Context) (map[string]* } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context) error); ok { r1 = rf(ctx) } else { @@ -408,6 +464,10 @@ func (_m *ArgoDB) ListHelmRepositories(ctx context.Context) ([]*v1alpha1.Reposit ret := _m.Called(ctx) var r0 []*v1alpha1.Repository + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) ([]*v1alpha1.Repository, error)); ok { + return rf(ctx) + } if rf, ok := ret.Get(0).(func(context.Context) []*v1alpha1.Repository); ok { r0 = rf(ctx) } else { @@ -416,7 +476,6 @@ func (_m *ArgoDB) ListHelmRepositories(ctx context.Context) ([]*v1alpha1.Reposit } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context) error); ok { r1 = rf(ctx) } else { @@ -431,6 +490,10 @@ func (_m *ArgoDB) ListRepoCertificates(ctx context.Context, selector *db.Certifi ret := _m.Called(ctx, selector) var r0 *v1alpha1.RepositoryCertificateList + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *db.CertificateListSelector) (*v1alpha1.RepositoryCertificateList, error)); ok { + return rf(ctx, selector) + } if rf, ok := ret.Get(0).(func(context.Context, *db.CertificateListSelector) *v1alpha1.RepositoryCertificateList); ok { r0 = rf(ctx, selector) } else { @@ -439,7 +502,6 @@ func (_m *ArgoDB) ListRepoCertificates(ctx context.Context, selector *db.Certifi } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, *db.CertificateListSelector) error); ok { r1 = rf(ctx, selector) } else { @@ -454,6 +516,10 @@ func (_m *ArgoDB) ListRepositories(ctx context.Context) ([]*v1alpha1.Repository, ret := _m.Called(ctx) var r0 []*v1alpha1.Repository + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) ([]*v1alpha1.Repository, error)); ok { + return rf(ctx) + } if rf, ok := ret.Get(0).(func(context.Context) []*v1alpha1.Repository); ok { r0 = rf(ctx) } else { @@ -462,7 +528,6 @@ func (_m *ArgoDB) ListRepositories(ctx context.Context) ([]*v1alpha1.Repository, } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context) error); ok { r1 = rf(ctx) } else { @@ -477,6 +542,10 @@ func (_m *ArgoDB) ListRepositoryCredentials(ctx context.Context) ([]string, erro ret := _m.Called(ctx) var r0 []string + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) ([]string, error)); ok { + return rf(ctx) + } if rf, ok := ret.Get(0).(func(context.Context) []string); ok { r0 = rf(ctx) } else { @@ -485,7 +554,6 @@ func (_m *ArgoDB) ListRepositoryCredentials(ctx context.Context) ([]string, erro } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context) error); ok { r1 = rf(ctx) } else { @@ -500,6 +568,10 @@ func (_m *ArgoDB) RemoveRepoCertificates(ctx context.Context, selector *db.Certi ret := _m.Called(ctx, selector) var r0 *v1alpha1.RepositoryCertificateList + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *db.CertificateListSelector) (*v1alpha1.RepositoryCertificateList, error)); ok { + return rf(ctx, selector) + } if rf, ok := ret.Get(0).(func(context.Context, *db.CertificateListSelector) *v1alpha1.RepositoryCertificateList); ok { r0 = rf(ctx, selector) } else { @@ -508,7 +580,6 @@ func (_m *ArgoDB) RemoveRepoCertificates(ctx context.Context, selector *db.Certi } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, *db.CertificateListSelector) error); ok { r1 = rf(ctx, selector) } else { @@ -523,13 +594,16 @@ func (_m *ArgoDB) RepositoryExists(ctx context.Context, repoURL string) (bool, e ret := _m.Called(ctx, repoURL) var r0 bool + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (bool, error)); ok { + return rf(ctx, repoURL) + } if rf, ok := ret.Get(0).(func(context.Context, string) bool); ok { r0 = rf(ctx, repoURL) } else { r0 = ret.Get(0).(bool) } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { r1 = rf(ctx, repoURL) } else { @@ -544,6 +618,10 @@ func (_m *ArgoDB) UpdateCluster(ctx context.Context, c *v1alpha1.Cluster) (*v1al ret := _m.Called(ctx, c) var r0 *v1alpha1.Cluster + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *v1alpha1.Cluster) (*v1alpha1.Cluster, error)); ok { + return rf(ctx, c) + } if rf, ok := ret.Get(0).(func(context.Context, *v1alpha1.Cluster) *v1alpha1.Cluster); ok { r0 = rf(ctx, c) } else { @@ -552,7 +630,6 @@ func (_m *ArgoDB) UpdateCluster(ctx context.Context, c *v1alpha1.Cluster) (*v1al } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, *v1alpha1.Cluster) error); ok { r1 = rf(ctx, c) } else { @@ -567,6 +644,10 @@ func (_m *ArgoDB) UpdateRepository(ctx context.Context, r *v1alpha1.Repository) ret := _m.Called(ctx, r) var r0 *v1alpha1.Repository + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *v1alpha1.Repository) (*v1alpha1.Repository, error)); ok { + return rf(ctx, r) + } if rf, ok := ret.Get(0).(func(context.Context, *v1alpha1.Repository) *v1alpha1.Repository); ok { r0 = rf(ctx, r) } else { @@ -575,7 +656,6 @@ func (_m *ArgoDB) UpdateRepository(ctx context.Context, r *v1alpha1.Repository) } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, *v1alpha1.Repository) error); ok { r1 = rf(ctx, r) } else { @@ -590,6 +670,10 @@ func (_m *ArgoDB) UpdateRepositoryCredentials(ctx context.Context, r *v1alpha1.R ret := _m.Called(ctx, r) var r0 *v1alpha1.RepoCreds + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *v1alpha1.RepoCreds) (*v1alpha1.RepoCreds, error)); ok { + return rf(ctx, r) + } if rf, ok := ret.Get(0).(func(context.Context, *v1alpha1.RepoCreds) *v1alpha1.RepoCreds); ok { r0 = rf(ctx, r) } else { @@ -598,7 +682,6 @@ func (_m *ArgoDB) UpdateRepositoryCredentials(ctx context.Context, r *v1alpha1.R } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, *v1alpha1.RepoCreds) error); ok { r1 = rf(ctx, r) } else { @@ -621,3 +704,17 @@ func (_m *ArgoDB) WatchClusters(ctx context.Context, handleAddEvent func(*v1alph return r0 } + +// NewArgoDB creates a new instance of ArgoDB. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewArgoDB(t interface { + mock.TestingT + Cleanup(func()) +}) *ArgoDB { + mock := &ArgoDB{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +}