diff --git a/docs/operator-manual/high_availability.md b/docs/operator-manual/high_availability.md index ac59c333ba7cb..05f11f119a0d5 100644 --- a/docs/operator-manual/high_availability.md +++ b/docs/operator-manual/high_availability.md @@ -156,6 +156,7 @@ spec: * The `ARGOCD_API_SERVER_REPLICAS` environment variable is used to divide [the limit of concurrent login requests (`ARGOCD_MAX_CONCURRENT_LOGIN_REQUESTS_COUNT`)](./user-management/index.md#failed-logins-rate-limiting) between each replica. * The `ARGOCD_GRPC_MAX_SIZE_MB` environment variable allows specifying the max size of the server response message in megabytes. The default value is 200. You might need to increase this for an Argo CD instance that manages 3000+ applications. +* The server's [webhook handler](./webhook.md#configuring-webhook-handlers-application-refresh-concurrency) refreshes apps by patching Applications with the `argocd.argoproj.io/refresh` annotation, sending network-bound requests to the Kubernetes API server. To speed up webhook processing, especially with thousands of applications, you can increase the max number of concurrent requests the webhook handler sends to the Kubernetes API Server using the `webhook.maxConcurrentAppRefresh` setting in your `argocd-cm` ConfigMap (`10` by default). Bear in mind that this might put the API server under heavy load if set too high. ### argocd-dex-server, argocd-redis diff --git a/docs/operator-manual/webhook.md b/docs/operator-manual/webhook.md index 1d5ad5ec79c96..a60f1a6346a30 100644 --- a/docs/operator-manual/webhook.md +++ b/docs/operator-manual/webhook.md @@ -97,3 +97,27 @@ stringData: ``` After saving, the changes should take effect automatically. + +## Configuring Webhook Handler's Application Refresh Concurrency + +After receiving a webhook event, ArgoCD refreshes all Applications which need to be refreshed. To do this, +the webhook handler adds the `argocd.argoproj.io/refresh` annotation to the Applications by sending patch +requests to your control-plane's Kubernetes API server for each Application. If your repository provider limits webhook +timeout and you have several hundreds or thousands of apps, you can speed up the handler's refresh process by +configuring how many concurrent patch requests the webhook handler can send to Kubernetes API server using the +`webhook.maxConcurrentAppRefresh` setting in your `argocd-cm` ConfigMap (`10` by default). Put into consideration your +Kubernetes API server's ability to handle requests. Setting this setting to a very high number (relative to your +Kubernetes API server) can potentially overload the Kubernetes API server. + +```yaml +apiVersion: v1 +kind: ConfigMap +metadata: + name: argocd-cm + namespace: argocd +... +data: + # max concurrent refresh annotation patch requests the webhook handler can send to Kubernetes API server. + webhook.maxConcurrentAppRefresh: 10 +... +``` diff --git a/util/settings/settings.go b/util/settings/settings.go index b992a32ed164d..7a8756e75a185 100644 --- a/util/settings/settings.go +++ b/util/settings/settings.go @@ -76,6 +76,8 @@ type ArgoCDSettings struct { WebhookAzureDevOpsUsername string `json:"webhookAzureDevOpsUsername,omitempty"` // WebhookAzureDevOpsPassword holds the password for authenticating Azure DevOps webhook events WebhookAzureDevOpsPassword string `json:"webhookAzureDevOpsPassword,omitempty"` + // WebhookMaxConcurrentAppRefresh sets the number of concurrent app refreshes run by the webhook handler + WebhookMaxConcurrentAppRefresh int `json:"webhookMaxConcurrentAppRefresh,omitempty"` // Secrets holds all secrets in argocd-secret as a map[string]string Secrets map[string]string `json:"secrets,omitempty"` // KustomizeBuildOptions is a string of kustomize build parameters @@ -495,6 +497,10 @@ const ( RespectRBAC = "resource.respectRBAC" RespectRBACValueStrict = "strict" RespectRBACValueNormal = "normal" + // settingsWebhookMaxConcurrentAppRefresh is the key for max concurrent app refresh + settingsWebhookMaxConcurrentAppRefreshKey = "webhook.maxConcurrentAppRefresh" + // defaultSettingsWebhookMaxConcurrentAppRefresh is the default value for the number of max concurrent app refresh + defaultSettingsWebhookMaxConcurrentAppRefresh = 10 ) var ( @@ -1447,6 +1453,19 @@ func updateSettingsFromConfigMap(settings *ArgoCDSettings, argoCDCM *apiv1.Confi settings.TrackingMethod = argoCDCM.Data[settingsResourceTrackingMethodKey] settings.OIDCTLSInsecureSkipVerify = argoCDCM.Data[oidcTLSInsecureSkipVerifyKey] == "true" settings.ExtensionConfig = argoCDCM.Data[extensionConfig] + if webhookMaxConcurrentAppRefresh := argoCDCM.Data[settingsWebhookMaxConcurrentAppRefreshKey]; len(webhookMaxConcurrentAppRefresh) > 0 { + i, err := strconv.Atoi(string(webhookMaxConcurrentAppRefresh)) + if err != nil { + log.Warnf("invalid input for %s: %s. Using the default value %d.", settingsWebhookMaxConcurrentAppRefreshKey, err.Error(), defaultSettingsWebhookMaxConcurrentAppRefresh) + } + if i < 1 { + i = defaultSettingsWebhookMaxConcurrentAppRefresh + log.Warnf("%s is less than 1. Using the default value %d.", settingsWebhookMaxConcurrentAppRefreshKey, defaultSettingsWebhookMaxConcurrentAppRefresh) + } + settings.WebhookMaxConcurrentAppRefresh = i + } else { + settings.WebhookMaxConcurrentAppRefresh = defaultSettingsWebhookMaxConcurrentAppRefresh + } } // validateExternalURL ensures the external URL that is set on the configmap is valid diff --git a/util/settings/settings_test.go b/util/settings/settings_test.go index 07a2c268a6bd7..5f213458fe000 100644 --- a/util/settings/settings_test.go +++ b/util/settings/settings_test.go @@ -169,6 +169,88 @@ func TestInClusterServerAddressEnabledByDefault(t *testing.T) { assert.Equal(t, true, settings.InClusterEnabled) } +func TestValidWebhookConfiguration(t *testing.T) { + secret := []byte("randomly-strong-secret") + uid := []byte("random-uid") + kubeClient := fake.NewSimpleClientset( + &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: common.ArgoCDConfigMapName, + Namespace: "default", + Labels: map[string]string{ + "app.kubernetes.io/part-of": "argocd", + }, + }, + Data: map[string]string{ + "webhook.maxConcurrentAppRefresh": "25", + }, + }, + &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: common.ArgoCDSecretName, + Namespace: "default", + Labels: map[string]string{ + "app.kubernetes.io/part-of": "argocd", + }, + }, + Data: map[string][]byte{ + "server.secretkey": nil, + "webhook.github.secret": secret, + "webhook.gitlab.secret": secret, + "webhook.bitbucket.uuid": uid, + "webhook.bitbucketserver.secret": secret, + "webhook.gogs.secret": secret, + "webhook.azuredevops.username": uid, + "webhook.azuredevops.password": secret, + }, + }, + ) + settingsManager := NewSettingsManager(context.Background(), kubeClient, "default") + settings, err := settingsManager.GetSettings() + assert.NoError(t, err) + assert.Equal(t, 25, settings.WebhookMaxConcurrentAppRefresh) + assert.Equal(t, string(secret), settings.WebhookGitHubSecret) + assert.Equal(t, string(secret), settings.WebhookGitLabSecret) + assert.Equal(t, string(uid), settings.WebhookBitbucketUUID) + assert.Equal(t, string(secret), settings.WebhookBitbucketServerSecret) + assert.Equal(t, string(secret), settings.WebhookGogsSecret) + assert.Equal(t, string(uid), settings.WebhookAzureDevOpsUsername) + assert.Equal(t, string(secret), settings.WebhookAzureDevOpsPassword) +} + +func TestInvalidWebhookConfiguration(t *testing.T) { + kubeClient := fake.NewSimpleClientset( + &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: common.ArgoCDConfigMapName, + Namespace: "default", + Labels: map[string]string{ + "app.kubernetes.io/part-of": "argocd", + }, + }, + Data: map[string]string{ + "webhook.maxConcurrentAppRefresh": "0", + }, + }, + &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: common.ArgoCDSecretName, + Namespace: "default", + Labels: map[string]string{ + "app.kubernetes.io/part-of": "argocd", + }, + }, + Data: map[string][]byte{ + "server.secretkey": nil, + }, + }, + ) + settingsManager := NewSettingsManager(context.Background(), kubeClient, "default") + settings, err := settingsManager.GetSettings() + assert.NoError(t, err) + assert.Equal(t, 10, settings.WebhookMaxConcurrentAppRefresh) +} + func TestGetAppInstanceLabelKey(t *testing.T) { _, settingsManager := fixtures(map[string]string{ "application.instanceLabelKey": "testLabel", diff --git a/util/webhook/webhook.go b/util/webhook/webhook.go index 9955540ea04a9..a181e4f14caf1 100644 --- a/util/webhook/webhook.go +++ b/util/webhook/webhook.go @@ -10,6 +10,7 @@ import ( "path/filepath" "regexp" "strings" + "sync" "github.com/go-playground/webhooks/v6/azuredevops" "github.com/go-playground/webhooks/v6/bitbucket" @@ -47,21 +48,26 @@ var ( errBasicAuthVerificationFailed = errors.New("basic auth verification failed") ) +const ( + defaultMaxConcurrentAppRefresh = 10 +) + type ArgoCDWebhookHandler struct { - repoCache *cache.Cache - serverCache *servercache.Cache - db db.ArgoDB - ns string - appNs []string - appClientset appclientset.Interface - github *github.Webhook - gitlab *gitlab.Webhook - bitbucket *bitbucket.Webhook - bitbucketserver *bitbucketserver.Webhook - azuredevops *azuredevops.Webhook - azuredevopsAuthHandler func(r *http.Request) error - gogs *gogs.Webhook - settingsSrc settingsSource + repoCache *cache.Cache + serverCache *servercache.Cache + db db.ArgoDB + ns string + appNs []string + appClientset appclientset.Interface + github *github.Webhook + gitlab *gitlab.Webhook + bitbucket *bitbucket.Webhook + bitbucketserver *bitbucketserver.Webhook + azuredevops *azuredevops.Webhook + azuredevopsAuthHandler func(r *http.Request) error + gogs *gogs.Webhook + settingsSrc settingsSource + maxConcurrentAppRefresh int } func NewHandler(namespace string, applicationNamespaces []string, appClientset appclientset.Interface, set *settings.ArgoCDSettings, settingsSrc settingsSource, repoCache *cache.Cache, serverCache *servercache.Cache, argoDB db.ArgoDB) *ArgoCDWebhookHandler { @@ -99,26 +105,38 @@ func NewHandler(namespace string, applicationNamespaces []string, appClientset a return nil } + // though this seems redundant in util/settings, this serves as a safeguard for the handler + maxConcurrentAppRefresh := getMaxConcurrentAppRefreshOrDefault(set) + acdWebhook := ArgoCDWebhookHandler{ - ns: namespace, - appNs: applicationNamespaces, - appClientset: appClientset, - github: githubWebhook, - gitlab: gitlabWebhook, - bitbucket: bitbucketWebhook, - bitbucketserver: bitbucketserverWebhook, - azuredevops: azuredevopsWebhook, - azuredevopsAuthHandler: azuredevopsAuthHandler, - gogs: gogsWebhook, - settingsSrc: settingsSrc, - repoCache: repoCache, - serverCache: serverCache, - db: argoDB, + ns: namespace, + appNs: applicationNamespaces, + appClientset: appClientset, + github: githubWebhook, + gitlab: gitlabWebhook, + bitbucket: bitbucketWebhook, + bitbucketserver: bitbucketserverWebhook, + azuredevops: azuredevopsWebhook, + azuredevopsAuthHandler: azuredevopsAuthHandler, + gogs: gogsWebhook, + settingsSrc: settingsSrc, + repoCache: repoCache, + serverCache: serverCache, + db: argoDB, + maxConcurrentAppRefresh: maxConcurrentAppRefresh, } return &acdWebhook } +func getMaxConcurrentAppRefreshOrDefault(set *settings.ArgoCDSettings) int { + if set.WebhookMaxConcurrentAppRefresh <= 0 { + log.Warnf("webhook setting for max concurrent set to <=0 or unset, using the default value %d", defaultMaxConcurrentAppRefresh) + return defaultMaxConcurrentAppRefresh + } + return set.WebhookMaxConcurrentAppRefresh +} + func parseRevision(ref string) string { refParts := strings.SplitN(ref, "/", 3) return refParts[len(refParts)-1] @@ -288,27 +306,54 @@ func (a *ArgoCDWebhookHandler) HandleEvent(payload interface{}) { log.Warnf("Failed to get repoRegexp: %s", err) continue } - for _, app := range filteredApps { - - for _, source := range app.Spec.GetSources() { - if sourceRevisionHasChanged(source, revision, touchedHead) && sourceUsesURL(source, webURL, repoRegexp) { - if appFilesHaveChanged(&app, changedFiles) { - namespacedAppInterface := a.appClientset.ArgoprojV1alpha1().Applications(app.ObjectMeta.Namespace) - _, err = argo.RefreshApp(namespacedAppInterface, app.ObjectMeta.Name, v1alpha1.RefreshTypeNormal) - if err != nil { - log.Warnf("Failed to refresh app '%s' for controller reprocessing: %v", app.ObjectMeta.Name, err) - continue - } - // No need to refresh multiple times if multiple sources match. - break - } else if change.shaBefore != "" && change.shaAfter != "" { - if err := a.storePreviouslyCachedManifests(&app, change, trackingMethod, appInstanceLabelKey); err != nil { - log.Warnf("Failed to store cached manifests of previous revision for app '%s': %v", app.Name, err) - } - } + + maxConcurrentAppRefresh := a.maxConcurrentAppRefresh + appBacklog := make(chan v1alpha1.Application, len(filteredApps)) + + for _, filteredApp := range filteredApps { + appBacklog <- filteredApp + } + close(appBacklog) + + var wg sync.WaitGroup + wg.Add(maxConcurrentAppRefresh) + + for i := 0; i < maxConcurrentAppRefresh; i++ { + go func() { + defer wg.Done() + for app := range appBacklog { + a.refreshChangedAppOrStoreCachedManifests(app, revision, touchedHead, webURL, repoRegexp, changedFiles, change, trackingMethod, appInstanceLabelKey) } + }() + } + wg.Wait() + } +} + +func (a *ArgoCDWebhookHandler) refreshChangedAppOrStoreCachedManifests(app v1alpha1.Application, revision string, touchedHead bool, webURL string, + repoRegexp *regexp.Regexp, changedFiles []string, change changeInfo, trackingMethod string, appInstanceLabelKey string) { + for _, source := range app.Spec.GetSources() { + if !(sourceRevisionHasChanged(source, revision, touchedHead) && sourceUsesURL(source, webURL, repoRegexp)) { + continue + } + if !appFilesHaveChanged(&app, changedFiles) { + // we cannot retrieve previous and next commit SHA for bitbucket. + // don't attempt to load/create cache for empty strings instead of SHA + if change.shaBefore == "" && change.shaAfter == "" { + continue + } + if err := a.storePreviouslyCachedManifests(&app, change, trackingMethod, appInstanceLabelKey); err != nil { + log.Warnf("Failed to store cached manifests of previous revision for app '%s': %v", app.Name, err) } + continue + } + namespacedAppInterface := a.appClientset.ArgoprojV1alpha1().Applications(app.ObjectMeta.Namespace) + if _, err := argo.RefreshApp(namespacedAppInterface, app.ObjectMeta.Name, v1alpha1.RefreshTypeNormal); err != nil { + log.Warnf("Failed to refresh app '%s' for controller reprocessing: %v", app.ObjectMeta.Name, err) + continue } + // No need to refresh multiple times if one source already matches + break } } diff --git a/util/webhook/webhook_test.go b/util/webhook/webhook_test.go index b241d7c671841..e5e7811f4afb7 100644 --- a/util/webhook/webhook_test.go +++ b/util/webhook/webhook_test.go @@ -2,6 +2,7 @@ package webhook import ( "bytes" + "context" "encoding/json" "fmt" "io" @@ -21,6 +22,9 @@ import ( "k8s.io/apimachinery/pkg/runtime" kubetesting "k8s.io/client-go/testing" + "github.com/argoproj/argo-cd/v2/common" + "github.com/argoproj/argo-cd/v2/reposerver/apiclient" + "github.com/argoproj/argo-cd/v2/util/argo" "github.com/argoproj/argo-cd/v2/util/cache/appstate" "github.com/argoproj/argo-cd/v2/util/db/mocks" @@ -67,7 +71,7 @@ func NewMockHandler(reactor *reactorDef, applicationNamespaces []string, objects } cacheClient := cacheutil.NewCache(cacheutil.NewInMemoryCache(1 * time.Hour)) - return NewHandler("argocd", applicationNamespaces, appClientset, &settings.ArgoCDSettings{}, &fakeSettingsSrc{}, cache.NewCache( + return NewHandler("argocd", applicationNamespaces, appClientset, &settings.ArgoCDSettings{WebhookMaxConcurrentAppRefresh: 10}, &fakeSettingsSrc{}, cache.NewCache( cacheClient, 1*time.Minute, 1*time.Minute, @@ -683,3 +687,184 @@ func Test_getWebUrlRegex(t *testing.T) { }) } } + +func Test_getMaxConcurrentAppRefreshOrDefault(t *testing.T) { + tests := []struct { + expected int + set *settings.ArgoCDSettings + name string + }{ + { + 25, + &settings.ArgoCDSettings{ + WebhookMaxConcurrentAppRefresh: 25, + }, + "should get the same value", + }, + { + 10, + &settings.ArgoCDSettings{ + WebhookMaxConcurrentAppRefresh: -1, + }, + "should return default value", + }, + { + 10, + &settings.ArgoCDSettings{}, + "should return default value", + }, + } + for _, testCase := range tests { + testCopy := testCase + t.Run(testCopy.name, func(t *testing.T) { + t.Parallel() + res := getMaxConcurrentAppRefreshOrDefault(testCopy.set) + assert.Equal(t, res, testCopy.expected) + }) + } +} + +func populateCache(t *testing.T, a *ArgoCDWebhookHandler, revision string, app v1alpha1.Application) { + var clusterInfo v1alpha1.ClusterInfo + if err := a.serverCache.SetClusterInfo(app.Spec.Destination.Server, &clusterInfo); err != nil { + t.Fatal(err) + } + fakeManifestResponse := &apiclient.ManifestResponse{Manifests: []string{"Fake"}} + cachedManifests := cache.CachedManifestResponse{ + CacheEntryHash: app.GetName(), + ManifestResponse: fakeManifestResponse, + } + source := app.Spec.GetSource() + refSources, _ := argo.GetRefSources(context.Background(), app.Spec, a.db) + err := a.repoCache.SetManifests(revision, &source, refSources, &clusterInfo, "argocd", string(argo.TrackingMethodLabel), + common.LabelKeyAppInstance, app.GetName(), &cachedManifests, nil) + if err != nil { + t.Fatal(err) + } +} + +func Test_refreshChangedAppOrStoreCachedManifests(t *testing.T) { + fakeOldRev := "fb81885d143ab7da038e3e4d3e792fe20f75c1e9" + fakeNewRev := "be2e98524d382426e06c78ba94aaef8bde250b33" + fakeChgInfo := changeInfo{shaBefore: fakeOldRev, shaAfter: fakeNewRev} + fakeDest := v1alpha1.ApplicationDestination{Server: "kubernetes.svc.cluster.local", Namespace: "argocd"} + fakeRepoURL := "https://provider.com/project/some-repo" + fakeDefaultSrc := &v1alpha1.ApplicationSource{RepoURL: fakeRepoURL, Path: ".", TargetRevision: "master"} + tests := []struct { + name string + app *v1alpha1.Application + revision string + changedFiles []string + expectedRefreshAnnotationVal string + expectCacheCalled bool + }{ + { + "should refresh app", + &v1alpha1.Application{ + ObjectMeta: metav1.ObjectMeta{ + Name: "app-to-refresh", + Namespace: "argocd", + Labels: map[string]string{common.LabelKeyAppInstance: "app-to-refresh"}, + Annotations: map[string]string{v1alpha1.AnnotationKeyManifestGeneratePaths: "./some-dir"}, + }, + Spec: v1alpha1.ApplicationSpec{ + Source: fakeDefaultSrc, + Destination: fakeDest, + }, + }, + "master", + []string{"./some-dir/file.yaml"}, + "normal", + false, + }, + { + "should skip refresh when push to different branch", + &v1alpha1.Application{ + ObjectMeta: metav1.ObjectMeta{ + Name: "app-to-skip-refresh", + Namespace: "argocd", + Labels: map[string]string{common.LabelKeyAppInstance: "app-to-refresh"}, + Annotations: map[string]string{v1alpha1.AnnotationKeyManifestGeneratePaths: "./some-dir"}, + }, + Spec: v1alpha1.ApplicationSpec{ + Source: fakeDefaultSrc, + Destination: fakeDest, + }, + }, + "different_branch", + []string{"./some-dir/file.yaml"}, + "", + false, + }, + { + "should skip refresh for app with different repo", + &v1alpha1.Application{ + ObjectMeta: metav1.ObjectMeta{ + Name: "app-to-ignore", + Namespace: "argocd", + Labels: map[string]string{common.LabelKeyAppInstance: "app-to-ignore"}, + }, + Spec: v1alpha1.ApplicationSpec{ + Source: &v1alpha1.ApplicationSource{ + RepoURL: "https://provider.com/project/ignored-repo", + Path: ".", + TargetRevision: "master", + }, + Destination: fakeDest, + }, + }, + "master", + []string{"some-dir/file.yaml"}, + "", + false, + }, + { + "should cache new revision and skip refresh", + &v1alpha1.Application{ + ObjectMeta: metav1.ObjectMeta{ + Name: "app-to-cache", + Namespace: "argocd", + Labels: map[string]string{common.LabelKeyAppInstance: "app-to-cache"}, + Annotations: map[string]string{v1alpha1.AnnotationKeyManifestGeneratePaths: "./some-dir"}, + }, + Spec: v1alpha1.ApplicationSpec{ + Source: fakeDefaultSrc, + Destination: fakeDest, + }, + }, + "master", + []string{"other-dir/file.yaml"}, + "", + true, + }, + } + for _, testCase := range tests { + tc := testCase + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + a := NewMockHandler(nil, []string{"argocd"}, tc.app) + populateCache(t, a, fakeOldRev, *tc.app) + repoRegexp, err := getWebUrlRegex(fakeRepoURL) + assert.NoError(t, err) + touchedHead := true // doesn't really matter in this test as we use master, just mandatory for the function + a.refreshChangedAppOrStoreCachedManifests(*tc.app, tc.revision, touchedHead, fakeRepoURL, + repoRegexp, tc.changedFiles, fakeChgInfo, string(argo.TrackingMethodLabel), common.LabelKeyAppInstance) + refreshApp, err := a.appClientset.ArgoprojV1alpha1().Applications("argocd").Get( + context.Background(), tc.app.GetName(), metav1.GetOptions{}) + assert.NoError(t, err) + refresh := refreshApp.Annotations[v1alpha1.AnnotationKeyRefresh] + assert.Equal(t, refresh, tc.expectedRefreshAnnotationVal) + + // we assume cache will miss if only the webhook's refreshApp is called, + // as application-controller would set the cache normally, but does not happen in this test + var clusterInfo v1alpha1.ClusterInfo + if err := a.serverCache.GetClusterInfo(tc.app.Spec.Destination.Server, &clusterInfo); err != nil { + t.Fatal(err) + } + err = a.repoCache.GetManifests(fakeNewRev, tc.app.Spec.Source, v1alpha1.RefTargetRevisionMapping{}, &clusterInfo, "argocd", string(argo.TrackingMethodLabel), + common.LabelKeyAppInstance, tc.app.GetName(), &cache.CachedManifestResponse{CacheEntryHash: tc.app.GetName()}, nil) + // cache hits only if webhook handler sets the cache + assert.Equal(t, tc.expectCacheCalled, err == nil) + }) + } +}