Skip to content

Commit

Permalink
feat: dynamic rebalancing of clusters across shards (argoproj#15036)
Browse files Browse the repository at this point in the history
* Migrate Application Controller from Statefulset to Deployment

Signed-off-by: ishitasequeira <[email protected]>

* Add sharding deployment logic

Signed-off-by: ishitasequeira <[email protected]>

* Update sharding logic and add comments

Signed-off-by: ishitasequeira <[email protected]>

* Add heartbeat as an environment variable

Signed-off-by: ishitasequeira <[email protected]>

* Add retry logic, heartbeat timeout environment variable

Signed-off-by: ishitasequeira <[email protected]>

* use the logic of pre-specified shard number on application controller pod

Signed-off-by: ishitasequeira <[email protected]>

* fix manifests

Signed-off-by: ishitasequeira <[email protected]>

* fix lint and e2e tests

Signed-off-by: ishitasequeira <[email protected]>

* comment out failing e2e test

Signed-off-by: ishitasequeira <[email protected]>

* increase readiness probe interval period

Signed-off-by: ishitasequeira <[email protected]>

* "comment out readiness probe to see if e2e tests succeed"

Signed-off-by: ishitasequeira <[email protected]>

* revert commented readiness probe

Signed-off-by: ishitasequeira <[email protected]>

* revert commented test case

Signed-off-by: ishitasequeira <[email protected]>

* read environment variable for application controller deployment name

Signed-off-by: ishitasequeira <[email protected]>

* Add nil check on replica count for deployment of application controller

Signed-off-by: ishitasequeira <[email protected]>

* Address comments

Signed-off-by: ishitasequeira <[email protected]>

* Add Informer, Update documentation, add unit tests

Signed-off-by: ishitasequeira <[email protected]>

* update godoc

Signed-off-by: ishitasequeira <[email protected]>

* remove unwanted code and logs

Signed-off-by: ishitasequeira <[email protected]>

* Add more documentation

Signed-off-by: ishitasequeira <[email protected]>

* revert ApplicationController manifest to StatefulSet

Signed-off-by: ishitasequeira <[email protected]>

* reverting updated docs

Signed-off-by: ishitasequeira <[email protected]>

* Add documentation for the new dynamic distribution feature

Signed-off-by: ishitasequeira <[email protected]>

* update documentation

Signed-off-by: ishitasequeira <[email protected]>

* Add an overlay for application controller deployment and update documentation

Signed-off-by: ishitasequeira <[email protected]>

* fix nit

Signed-off-by: ishitasequeira <[email protected]>

* Marking the feature as alpha

Signed-off-by: ishitasequeira <[email protected]>

* Add feature status link

Signed-off-by: ishitasequeira <[email protected]>

* revert go,mod changes

Signed-off-by: ishitasequeira <[email protected]>

* update docs to avoid focusing on StatefulSet/Deployment (argoproj#26)

* update docs to avoid focusing on StatefulSet/Deployment

Signed-off-by: Michael Crenshaw <[email protected]>

---------

Signed-off-by: Michael Crenshaw <[email protected]>

* minor update to the doc

Signed-off-by: ishitasequeira <[email protected]>

---------

Signed-off-by: ishitasequeira <[email protected]>
Signed-off-by: Michael Crenshaw <[email protected]>
Co-authored-by: Michael Crenshaw <[email protected]>
  • Loading branch information
2 people authored and lukaszgyg committed Jan 12, 2024
1 parent 17a2eb2 commit daf5615
Show file tree
Hide file tree
Showing 22 changed files with 1,239 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ import (
"github.com/argoproj/argo-cd/v2/util/settings"
"github.com/argoproj/argo-cd/v2/util/tls"
"github.com/argoproj/argo-cd/v2/util/trace"
kubeerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
// CLIName is the name of the CLI
cliName = "argocd-application-controller"
cliName = common.ApplicationController
// Default time in seconds for application resync period
defaultAppResyncPeriod = 180
// Default time in seconds for application hard resync period
Expand Down Expand Up @@ -92,7 +94,7 @@ func NewCommand() *cobra.Command {
config, err := clientConfig.ClientConfig()
errors.CheckError(err)
errors.CheckError(v1alpha1.SetK8SConfigDefaults(config))
config.UserAgent = fmt.Sprintf("argocd-application-controller/%s (%s)", vers.Version, vers.Platform)
config.UserAgent = fmt.Sprintf("%s/%s (%s)", common.DefaultApplicationControllerName, vers.Version, vers.Platform)

kubeClient := kubernetes.NewForConfigOrDie(config)
appClient := appclientset.NewForConfigOrDie(config)
Expand Down Expand Up @@ -138,6 +140,7 @@ func NewCommand() *cobra.Command {
}))
kubectl := kubeutil.NewKubectl()
clusterFilter := getClusterFilter(kubeClient, settingsMgr, shardingAlgorithm)
errors.CheckError(err)
appController, err = controller.NewApplicationController(
namespace,
settingsMgr,
Expand Down Expand Up @@ -208,20 +211,49 @@ func NewCommand() *cobra.Command {
}

func getClusterFilter(kubeClient *kubernetes.Clientset, settingsMgr *settings.SettingsManager, shardingAlgorithm string) sharding.ClusterFilterFunction {
replicas := env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)

var replicas int
shard := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32)

applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
appControllerDeployment, _ := kubeClient.AppsV1().Deployments(settingsMgr.GetNamespace()).Get(context.Background(), applicationControllerName, metav1.GetOptions{})

if appControllerDeployment != nil && appControllerDeployment.Spec.Replicas != nil {
replicas = int(*appControllerDeployment.Spec.Replicas)
} else {
replicas = env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)
}

var clusterFilter func(cluster *v1alpha1.Cluster) bool
if replicas > 1 {
if shard < 0 {
// check for shard mapping using configmap if application-controller is a deployment
// else use existing logic to infer shard from pod name if application-controller is a statefulset
if appControllerDeployment != nil {

var err error
shard, err = sharding.InferShard()
// retry 3 times if we find a conflict while updating shard mapping configMap.
// If we still see conflicts after the retries, wait for next iteration of heartbeat process.
for i := 0; i <= common.AppControllerHeartbeatUpdateRetryCount; i++ {
shard, err = sharding.GetOrUpdateShardFromConfigMap(kubeClient, settingsMgr, replicas, shard)
if !kubeerrors.IsConflict(err) {
err = fmt.Errorf("unable to get shard due to error updating the sharding config map: %s", err)
break
}
log.Warnf("conflict when getting shard from shard mapping configMap. Retrying (%d/3)", i)
}
errors.CheckError(err)
} else {
if shard < 0 {
var err error
shard, err = sharding.InferShard()
errors.CheckError(err)
}
}
log.Infof("Processing clusters from shard %d", shard)
db := db.NewDB(settingsMgr.GetNamespace(), settingsMgr, kubeClient)
log.Infof("Using filter function: %s", shardingAlgorithm)
distributionFunction := sharding.GetDistributionFunction(db, shardingAlgorithm)
clusterFilter = sharding.GetClusterFilter(distributionFunction, shard)
clusterFilter = sharding.GetClusterFilter(db, distributionFunction, shard)
} else {
log.Info("Processing all cluster shards")
}
Expand Down
11 changes: 11 additions & 0 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ import (
"google.golang.org/grpc/status"
)

// Component names
const (
ApplicationController = "argocd-application-controller"
)

// Default service addresses and URLS of Argo CD internal services
const (
// DefaultRepoServerAddr is the gRPC address of the Argo CD repo server
Expand All @@ -34,6 +39,8 @@ const (
// ArgoCDTLSCertsConfigMapName contains TLS certificate data for connecting repositories. Will get mounted as volume to pods
ArgoCDTLSCertsConfigMapName = "argocd-tls-certs-cm"
ArgoCDGPGKeysConfigMapName = "argocd-gpg-keys-cm"
// ArgoCDAppControllerShardConfigMapName contains the application controller to shard mapping
ArgoCDAppControllerShardConfigMapName = "argocd-app-controller-shard-cm"
)

// Some default configurables
Expand Down Expand Up @@ -109,6 +116,8 @@ const (
// RoundRobinShardingAlgorithm is a flag value that can be opted for Sharding Algorithm it uses an equal distribution accross all shards
RoundRobinShardingAlgorithm = "round-robin"
DefaultShardingAlgorithm = LegacyShardingAlgorithm
// AppControllerHeartbeatUpdateRetryCount is the retry count for updating the Shard Mapping to the Shard Mapping ConfigMap used by Application Controller
AppControllerHeartbeatUpdateRetryCount = 3
)

// Dex related constants
Expand Down Expand Up @@ -209,6 +218,8 @@ const (
EnvPauseGenerationRequests = "ARGOCD_PAUSE_GEN_REQUESTS"
// EnvControllerReplicas is the number of controller replicas
EnvControllerReplicas = "ARGOCD_CONTROLLER_REPLICAS"
// EnvControllerHeartbeatTime will update the heartbeat for application controller to claim shard
EnvControllerHeartbeatTime = "ARGOCD_CONTROLLER_HEARTBEAT_TIME"
// EnvControllerShard is the shard number that should be handled by controller
EnvControllerShard = "ARGOCD_CONTROLLER_SHARD"
// EnvControllerShardingAlgorithm is the distribution sharding algorithm to be used: legacy or round-robin
Expand Down
38 changes: 33 additions & 5 deletions controller/appcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
informerv1 "k8s.io/client-go/informers/apps/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
Expand All @@ -51,6 +53,7 @@ import (
"github.com/argoproj/argo-cd/v2/reposerver/apiclient"
"github.com/argoproj/argo-cd/v2/util/argo"
argodiff "github.com/argoproj/argo-cd/v2/util/argo/diff"
"github.com/argoproj/argo-cd/v2/util/env"

appstatecache "github.com/argoproj/argo-cd/v2/util/cache/appstate"
"github.com/argoproj/argo-cd/v2/util/db"
Expand All @@ -59,10 +62,12 @@ import (
"github.com/argoproj/argo-cd/v2/util/helm"
logutils "github.com/argoproj/argo-cd/v2/util/log"
settings_util "github.com/argoproj/argo-cd/v2/util/settings"
kubeerrors "k8s.io/apimachinery/pkg/api/errors"
)

const (
updateOperationStateTimeout = 1 * time.Second
updateOperationStateTimeout = 1 * time.Second
defaultDeploymentInformerResyncDuration = 10
// orphanedIndex contains application which monitor orphaned resources by namespace
orphanedIndex = "orphaned"
)
Expand Down Expand Up @@ -105,6 +110,7 @@ type ApplicationController struct {
appInformer cache.SharedIndexInformer
appLister applisters.ApplicationLister
projInformer cache.SharedIndexInformer
deploymentInformer informerv1.DeploymentInformer
appStateManager AppStateManager
stateCache statecache.LiveStateCache
statusRefreshTimeout time.Duration
Expand Down Expand Up @@ -160,7 +166,7 @@ func NewApplicationController(
statusHardRefreshTimeout: appHardResyncPeriod,
refreshRequestedApps: make(map[string]CompareWith),
refreshRequestedAppsMutex: &sync.Mutex{},
auditLogger: argo.NewAuditLogger(namespace, kubeClientset, "argocd-application-controller"),
auditLogger: argo.NewAuditLogger(namespace, kubeClientset, common.ApplicationController),
settingsMgr: settingsMgr,
selfHealTimeout: selfHealTimeout,
clusterFilter: clusterFilter,
Expand Down Expand Up @@ -201,11 +207,31 @@ func NewApplicationController(
}
},
})

factory := informers.NewSharedInformerFactory(ctrl.kubeClientset, defaultDeploymentInformerResyncDuration)
deploymentInformer := factory.Apps().V1().Deployments()

readinessHealthCheck := func(r *http.Request) error {
applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
appControllerDeployment, err := deploymentInformer.Lister().Deployments(settingsMgr.GetNamespace()).Get(applicationControllerName)
if !kubeerrors.IsNotFound(err) {
return fmt.Errorf("error retrieving Application Controller Deployment: %s", err)
}
if appControllerDeployment != nil {
if appControllerDeployment.Spec.Replicas != nil && int(*appControllerDeployment.Spec.Replicas) <= 0 {
return fmt.Errorf("application controller deployment replicas is not set or is less than 0, replicas: %d", appControllerDeployment.Spec.Replicas)
}
shard := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32)
if _, err := sharding.GetOrUpdateShardFromConfigMap(kubeClientset.(*kubernetes.Clientset), settingsMgr, int(*appControllerDeployment.Spec.Replicas), shard); err != nil {
return fmt.Errorf("error while updating the heartbeat for to the Shard Mapping ConfigMap: %s", err)
}
}
return nil
}

metricsAddr := fmt.Sprintf("0.0.0.0:%d", metricsPort)
var err error
ctrl.metricsServer, err = metrics.NewMetricsServer(metricsAddr, appLister, ctrl.canProcessApp, func(r *http.Request) error {
return nil
}, metricsApplicationLabels)
ctrl.metricsServer, err = metrics.NewMetricsServer(metricsAddr, appLister, ctrl.canProcessApp, readinessHealthCheck, metricsApplicationLabels)
if err != nil {
return nil, err
}
Expand All @@ -220,6 +246,7 @@ func NewApplicationController(
ctrl.appInformer = appInformer
ctrl.appLister = appLister
ctrl.projInformer = projInformer
ctrl.deploymentInformer = deploymentInformer
ctrl.appStateManager = appStateManager
ctrl.stateCache = stateCache

Expand Down Expand Up @@ -724,6 +751,7 @@ func (ctrl *ApplicationController) Run(ctx context.Context, statusProcessors int

go ctrl.appInformer.Run(ctx.Done())
go ctrl.projInformer.Run(ctx.Done())
go ctrl.deploymentInformer.Informer().Run(ctx.Done())

errors.CheckError(ctrl.stateCache.Init())

Expand Down
5 changes: 3 additions & 2 deletions controller/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/labels"

"github.com/argoproj/argo-cd/v2/common"
argoappv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
applister "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1"
"github.com/argoproj/argo-cd/v2/util/git"
Expand Down Expand Up @@ -260,12 +261,12 @@ func (m *MetricsServer) IncKubernetesRequest(app *argoappv1.Application, server,
}

func (m *MetricsServer) IncRedisRequest(failed bool) {
m.redisRequestCounter.WithLabelValues(m.hostname, "argocd-application-controller", strconv.FormatBool(failed)).Inc()
m.redisRequestCounter.WithLabelValues(m.hostname, common.ApplicationController, strconv.FormatBool(failed)).Inc()
}

// ObserveRedisRequestDuration observes redis request duration
func (m *MetricsServer) ObserveRedisRequestDuration(duration time.Duration) {
m.redisRequestHistogram.WithLabelValues(m.hostname, "argocd-application-controller").Observe(duration.Seconds())
m.redisRequestHistogram.WithLabelValues(m.hostname, common.ApplicationController).Observe(duration.Seconds())
}

// IncReconcile increments the reconcile counter for an application
Expand Down
Loading

0 comments on commit daf5615

Please sign in to comment.