feat: dynamic rebalancing of clusters across shards #15036

Merged
merged 31 commits into from Sep 22, 2023
Changes from all commits
31 commits
dfbac57
Migrate Application Controller from Statefulset to Deployment
ishitasequeira Jul 10, 2023
92992dc
Add sharding deployment logic
ishitasequeira Jul 17, 2023
af120d9
Update sharding logic and add comments
ishitasequeira Jul 31, 2023
e4ea28e
Add heartbeat as an environment variable
ishitasequeira Aug 3, 2023
9702779
Add retry logic, heartbeat timeout environment variable
ishitasequeira Aug 7, 2023
01b4027
use the logic of pre-specified shard number on application controller…
ishitasequeira Aug 14, 2023
81ce977
fix manifests
ishitasequeira Aug 14, 2023
ea97b20
fix lint and e2e tests
ishitasequeira Aug 15, 2023
28ed4b0
comment out failing e2e test
ishitasequeira Aug 15, 2023
057732e
increase readiness probe interval period
ishitasequeira Aug 15, 2023
e64cee7
"comment out readiness probe to see if e2e tests succeed"
ishitasequeira Aug 15, 2023
cb184e9
revert commented readiness probe
ishitasequeira Aug 15, 2023
e0145c0
revert commented test case
ishitasequeira Aug 15, 2023
20e9a12
read environment variable for application controller deployment name
ishitasequeira Aug 23, 2023
91fd696
Add nil check on replica count for deployment of application controller
ishitasequeira Aug 31, 2023
ccf8b33
Address comments
ishitasequeira Sep 5, 2023
704eb2c
Add Informer, Update documentation, add unit tests
ishitasequeira Sep 12, 2023
4bd285d
update godoc
ishitasequeira Sep 12, 2023
6b9c2c0
remove unwanted code and logs
ishitasequeira Sep 12, 2023
ff31a45
Add more documentation
ishitasequeira Sep 20, 2023
579d515
revert ApplicationController manifest to StatefulSet
ishitasequeira Sep 21, 2023
7eba228
reverting updated docs
ishitasequeira Sep 21, 2023
422cf47
Add documentation for the new dynamic distribution feature
ishitasequeira Sep 21, 2023
d586314
update documentation
ishitasequeira Sep 21, 2023
1500a2d
Add an overlay for application controller deployment and update docum…
ishitasequeira Sep 22, 2023
d6f1224
fix nit
ishitasequeira Sep 22, 2023
bb69ec7
Marking the feature as alpha
ishitasequeira Sep 22, 2023
e27c3de
Add feature status link
ishitasequeira Sep 22, 2023
5d42013
revert go,mod changes
ishitasequeira Sep 22, 2023
2e38264
update docs to avoid focusing on StatefulSet/Deployment (#26)
crenshaw-dev Sep 22, 2023
b119743
minor update to the doc
ishitasequeira Sep 22, 2023
@@ -30,11 +30,13 @@ import (
"github.com/argoproj/argo-cd/v2/util/settings"
"github.com/argoproj/argo-cd/v2/util/tls"
"github.com/argoproj/argo-cd/v2/util/trace"
kubeerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
// CLIName is the name of the CLI
cliName = "argocd-application-controller"
cliName = common.ApplicationController
// Default time in seconds for application resync period
defaultAppResyncPeriod = 180
// Default time in seconds for application hard resync period
@@ -92,7 +94,7 @@ func NewCommand() *cobra.Command {
config, err := clientConfig.ClientConfig()
errors.CheckError(err)
errors.CheckError(v1alpha1.SetK8SConfigDefaults(config))
config.UserAgent = fmt.Sprintf("argocd-application-controller/%s (%s)", vers.Version, vers.Platform)
config.UserAgent = fmt.Sprintf("%s/%s (%s)", common.DefaultApplicationControllerName, vers.Version, vers.Platform)

kubeClient := kubernetes.NewForConfigOrDie(config)
appClient := appclientset.NewForConfigOrDie(config)
@@ -138,6 +140,7 @@ func NewCommand() *cobra.Command {
}))
kubectl := kubeutil.NewKubectl()
clusterFilter := getClusterFilter(kubeClient, settingsMgr, shardingAlgorithm)
errors.CheckError(err)
appController, err = controller.NewApplicationController(
namespace,
settingsMgr,
@@ -208,20 +211,49 @@ func NewCommand() *cobra.Command {
}

func getClusterFilter(kubeClient *kubernetes.Clientset, settingsMgr *settings.SettingsManager, shardingAlgorithm string) sharding.ClusterFilterFunction {
replicas := env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)

var replicas int
shard := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32)

applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
appControllerDeployment, _ := kubeClient.AppsV1().Deployments(settingsMgr.GetNamespace()).Get(context.Background(), applicationControllerName, metav1.GetOptions{})

if appControllerDeployment != nil && appControllerDeployment.Spec.Replicas != nil {
replicas = int(*appControllerDeployment.Spec.Replicas)
} else {
replicas = env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)
}

var clusterFilter func(cluster *v1alpha1.Cluster) bool
if replicas > 1 {
if shard < 0 {
// check for shard mapping using configmap if application-controller is a deployment
// else use existing logic to infer shard from pod name if application-controller is a statefulset
if appControllerDeployment != nil {

var err error
shard, err = sharding.InferShard()
// retry 3 times if we find a conflict while updating shard mapping configMap.
// If we still see conflicts after the retries, wait for next iteration of heartbeat process.
for i := 0; i <= common.AppControllerHeartbeatUpdateRetryCount; i++ {
shard, err = sharding.GetOrUpdateShardFromConfigMap(kubeClient, settingsMgr, replicas, shard)
if !kubeerrors.IsConflict(err) {
err = fmt.Errorf("unable to get shard due to error updating the sharding config map: %s", err)
break
}
log.Warnf("conflict when getting shard from shard mapping configMap. Retrying (%d/3)", i)
}
errors.CheckError(err)
} else {
if shard < 0 {
var err error
shard, err = sharding.InferShard()
errors.CheckError(err)
}
}
log.Infof("Processing clusters from shard %d", shard)
db := db.NewDB(settingsMgr.GetNamespace(), settingsMgr, kubeClient)
log.Infof("Using filter function: %s", shardingAlgorithm)
distributionFunction := sharding.GetDistributionFunction(db, shardingAlgorithm)
clusterFilter = sharding.GetClusterFilter(distributionFunction, shard)
clusterFilter = sharding.GetClusterFilter(db, distributionFunction, shard)
} else {
log.Info("Processing all cluster shards")
}
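
The hand-rolled loop above bounds the number of ConfigMap update attempts with `common.AppControllerHeartbeatUpdateRetryCount` and stops on any non-conflict result. For comparison only, the same retry-on-conflict idea can be expressed with client-go's `retry` helper; the function below is a hypothetical, self-contained sketch — the ConfigMap layout and every name other than `retry.RetryOnConflict` are assumptions, not code from the PR:

```go
package sketch

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/util/retry"
)

// updateShardHeartbeat re-reads the shard-mapping ConfigMap and writes this
// replica's heartbeat, retrying automatically when the API server reports an
// optimistic-locking conflict on the update.
func updateShardHeartbeat(ctx context.Context, kubeClient kubernetes.Interface, namespace, cmName, shardKey, heartbeat string) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		cm, err := kubeClient.CoreV1().ConfigMaps(namespace).Get(ctx, cmName, metav1.GetOptions{})
		if err != nil {
			return fmt.Errorf("get shard mapping configmap: %w", err)
		}
		if cm.Data == nil {
			cm.Data = map[string]string{}
		}
		cm.Data[shardKey] = heartbeat // record this shard's latest heartbeat
		_, err = kubeClient.CoreV1().ConfigMaps(namespace).Update(ctx, cm, metav1.UpdateOptions{})
		return err // a Conflict error triggers another attempt
	})
}
```

`RetryOnConflict` only retries when the returned error is a conflict, which matches the intent of the bounded loop in the diff.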
11 changes: 11 additions & 0 deletions common/common.go
@@ -12,6 +12,11 @@ import (
"google.golang.org/grpc/status"
)

// Component names
const (
ApplicationController = "argocd-application-controller"
)

// Default service addresses and URLS of Argo CD internal services
const (
// DefaultRepoServerAddr is the gRPC address of the Argo CD repo server
@@ -34,6 +39,8 @@ const (
// ArgoCDTLSCertsConfigMapName contains TLS certificate data for connecting repositories. Will get mounted as volume to pods
ArgoCDTLSCertsConfigMapName = "argocd-tls-certs-cm"
ArgoCDGPGKeysConfigMapName = "argocd-gpg-keys-cm"
// ArgoCDAppControllerShardConfigMapName contains the application controller to shard mapping
ArgoCDAppControllerShardConfigMapName = "argocd-app-controller-shard-cm"
)

// Some default configurables
@@ -109,6 +116,8 @@ const (
// RoundRobinShardingAlgorithm is a flag value that can be opted for Sharding Algorithm it uses an equal distribution accross all shards
RoundRobinShardingAlgorithm = "round-robin"
DefaultShardingAlgorithm = LegacyShardingAlgorithm
// AppControllerHeartbeatUpdateRetryCount is the retry count for updating the Shard Mapping to the Shard Mapping ConfigMap used by Application Controller
AppControllerHeartbeatUpdateRetryCount = 3
)

// Dex related constants
@@ -209,6 +218,8 @@ const (
EnvPauseGenerationRequests = "ARGOCD_PAUSE_GEN_REQUESTS"
// EnvControllerReplicas is the number of controller replicas
EnvControllerReplicas = "ARGOCD_CONTROLLER_REPLICAS"
// EnvControllerHeartbeatTime will update the heartbeat for application controller to claim shard
EnvControllerHeartbeatTime = "ARGOCD_CONTROLLER_HEARTBEAT_TIME"
// EnvControllerShard is the shard number that should be handled by controller
EnvControllerShard = "ARGOCD_CONTROLLER_SHARD"
// EnvControllerShardingAlgorithm is the distribution sharding algorithm to be used: legacy or round-robin
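
Of the new constants above, `ArgoCDAppControllerShardConfigMapName` names the ConfigMap holding the controller-to-shard mapping, and `EnvControllerHeartbeatTime` configures how often a replica refreshes its claim. A hypothetical consumer of the heartbeat variable might look like the sketch below — the 10-second default and the ticker loop are assumptions for illustration, not code from this PR:

```go
package main

import (
	"log"
	"math"
	"time"

	"github.com/argoproj/argo-cd/v2/common"
	"github.com/argoproj/argo-cd/v2/util/env"
)

func main() {
	// Read the heartbeat interval in seconds; the default of 10 is an assumption.
	heartbeatIntervalSec := env.ParseNumFromEnv(common.EnvControllerHeartbeatTime, 10, 0, math.MaxInt32)
	ticker := time.NewTicker(time.Duration(heartbeatIntervalSec) * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		// On each tick, a real controller would refresh its entry in the
		// argocd-app-controller-shard-cm ConfigMap to keep its shard claim alive.
		log.Printf("refreshing shard heartbeat (interval %ds)", heartbeatIntervalSec)
	}
}
```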
38 changes: 33 additions & 5 deletions controller/appcontroller.go
@@ -34,6 +34,8 @@ import (
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
informerv1 "k8s.io/client-go/informers/apps/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
@@ -51,6 +53,7 @@ import (
"github.com/argoproj/argo-cd/v2/reposerver/apiclient"
"github.com/argoproj/argo-cd/v2/util/argo"
argodiff "github.com/argoproj/argo-cd/v2/util/argo/diff"
"github.com/argoproj/argo-cd/v2/util/env"

appstatecache "github.com/argoproj/argo-cd/v2/util/cache/appstate"
"github.com/argoproj/argo-cd/v2/util/db"
@@ -59,10 +62,12 @@ import (
"github.com/argoproj/argo-cd/v2/util/helm"
logutils "github.com/argoproj/argo-cd/v2/util/log"
settings_util "github.com/argoproj/argo-cd/v2/util/settings"
kubeerrors "k8s.io/apimachinery/pkg/api/errors"
)

const (
updateOperationStateTimeout = 1 * time.Second
updateOperationStateTimeout = 1 * time.Second
defaultDeploymentInformerResyncDuration = 10
// orphanedIndex contains application which monitor orphaned resources by namespace
orphanedIndex = "orphaned"
)
@@ -105,6 +110,7 @@ type ApplicationController struct {
appInformer cache.SharedIndexInformer
appLister applisters.ApplicationLister
projInformer cache.SharedIndexInformer
deploymentInformer informerv1.DeploymentInformer
appStateManager AppStateManager
stateCache statecache.LiveStateCache
statusRefreshTimeout time.Duration
@@ -160,7 +166,7 @@ func NewApplicationController(
statusHardRefreshTimeout: appHardResyncPeriod,
refreshRequestedApps: make(map[string]CompareWith),
refreshRequestedAppsMutex: &sync.Mutex{},
auditLogger: argo.NewAuditLogger(namespace, kubeClientset, "argocd-application-controller"),
auditLogger: argo.NewAuditLogger(namespace, kubeClientset, common.ApplicationController),
settingsMgr: settingsMgr,
selfHealTimeout: selfHealTimeout,
clusterFilter: clusterFilter,
@@ -201,11 +207,31 @@ }
}
},
})

factory := informers.NewSharedInformerFactory(ctrl.kubeClientset, defaultDeploymentInformerResyncDuration)
deploymentInformer := factory.Apps().V1().Deployments()

readinessHealthCheck := func(r *http.Request) error {
applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
appControllerDeployment, err := deploymentInformer.Lister().Deployments(settingsMgr.GetNamespace()).Get(applicationControllerName)
if !kubeerrors.IsNotFound(err) {
return fmt.Errorf("error retrieving Application Controller Deployment: %s", err)
}
if appControllerDeployment != nil {
if appControllerDeployment.Spec.Replicas != nil && int(*appControllerDeployment.Spec.Replicas) <= 0 {
return fmt.Errorf("application controller deployment replicas is not set or is less than 0, replicas: %d", appControllerDeployment.Spec.Replicas)
}
shard := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32)
if _, err := sharding.GetOrUpdateShardFromConfigMap(kubeClientset.(*kubernetes.Clientset), settingsMgr, int(*appControllerDeployment.Spec.Replicas), shard); err != nil {
return fmt.Errorf("error while updating the heartbeat for to the Shard Mapping ConfigMap: %s", err)
}
}
return nil
}

metricsAddr := fmt.Sprintf("0.0.0.0:%d", metricsPort)
var err error
ctrl.metricsServer, err = metrics.NewMetricsServer(metricsAddr, appLister, ctrl.canProcessApp, func(r *http.Request) error {
return nil
}, metricsApplicationLabels)
ctrl.metricsServer, err = metrics.NewMetricsServer(metricsAddr, appLister, ctrl.canProcessApp, readinessHealthCheck, metricsApplicationLabels)
if err != nil {
return nil, err
}
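
The readiness check added above doubles as the heartbeat: each probe hit looks up the controller Deployment through the informer and refreshes this replica's entry in the shard-mapping ConfigMap via `GetOrUpdateShardFromConfigMap`. Purely for illustration, this is roughly how such a check function plugs into an HTTP readiness endpoint — the PR itself hands it to `metrics.NewMetricsServer`, and the port and handler below are assumptions:

```go
package main

import (
	"fmt"
	"net/http"
)

func main() {
	// readinessCheck stands in for the closure built in the diff above; any
	// non-nil error marks this replica as not ready.
	readinessCheck := func(r *http.Request) error {
		return nil // e.g. verify the Deployment replicas and refresh the shard heartbeat
	}

	http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		if err := readinessCheck(r); err != nil {
			http.Error(w, fmt.Sprintf("not ready: %v", err), http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusOK)
	})

	_ = http.ListenAndServe(":8082", nil) // port is an assumption for the sketch
}
```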
@@ -220,6 +246,7 @@ func NewApplicationController(
ctrl.appInformer = appInformer
ctrl.appLister = appLister
ctrl.projInformer = projInformer
ctrl.deploymentInformer = deploymentInformer
ctrl.appStateManager = appStateManager
ctrl.stateCache = stateCache

@@ -724,6 +751,7 @@ func (ctrl *ApplicationController) Run(ctx context.Context, statusProcessors int

go ctrl.appInformer.Run(ctx.Done())
go ctrl.projInformer.Run(ctx.Done())
go ctrl.deploymentInformer.Informer().Run(ctx.Done())

errors.CheckError(ctrl.stateCache.Init())

5 changes: 3 additions & 2 deletions controller/metrics/metrics.go
@@ -17,6 +17,7 @@ import (
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/labels"

"github.com/argoproj/argo-cd/v2/common"
argoappv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
applister "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1"
"github.com/argoproj/argo-cd/v2/util/git"
@@ -260,12 +261,12 @@ func (m *MetricsServer) IncKubernetesRequest(app *argoappv1.Application, server,
}

func (m *MetricsServer) IncRedisRequest(failed bool) {
m.redisRequestCounter.WithLabelValues(m.hostname, "argocd-application-controller", strconv.FormatBool(failed)).Inc()
m.redisRequestCounter.WithLabelValues(m.hostname, common.ApplicationController, strconv.FormatBool(failed)).Inc()
}

// ObserveRedisRequestDuration observes redis request duration
func (m *MetricsServer) ObserveRedisRequestDuration(duration time.Duration) {
m.redisRequestHistogram.WithLabelValues(m.hostname, "argocd-application-controller").Observe(duration.Seconds())
m.redisRequestHistogram.WithLabelValues(m.hostname, common.ApplicationController).Observe(duration.Seconds())
}

// IncReconcile increments the reconcile counter for an application