diff --git a/util/settings/settings.go b/util/settings/settings.go index 9cd38417e081e..e245234194924 100644 --- a/util/settings/settings.go +++ b/util/settings/settings.go @@ -75,6 +75,8 @@ type ArgoCDSettings struct { WebhookAzureDevOpsUsername string `json:"webhookAzureDevOpsUsername,omitempty"` // WebhookAzureDevOpsPassword holds the password for authenticating Azure DevOps webhook events WebhookAzureDevOpsPassword string `json:"webhookAzureDevOpsPassword,omitempty"` + // WebhookAppRefreshConcurrency sets the number of concurrent app refreshes run by the webhook handler + WebhookAppRefreshConcurrency int `json:"webhookAppRefreshConcurrency,omitempty"` // Secrets holds all secrets in argocd-secret as a map[string]string Secrets map[string]string `json:"secrets,omitempty"` // KustomizeBuildOptions is a string of kustomize build parameters @@ -419,6 +421,10 @@ const ( settingsWebhookAzureDevOpsUsernameKey = "webhook.azuredevops.username" // settingsWebhookAzureDevOpsPasswordKey is the key for Azure DevOps webhook password settingsWebhookAzureDevOpsPasswordKey = "webhook.azuredevops.password" + // settingsWebhookAppRefreshConcurrency number of simultaneous applications to be processed + settingsWebhookAppRefreshConcurrency = "webhook.appRefreshConcurrency" + // defaultWebhookAppRefreshConcurrency default number for simultaneous applications to be processed + defaultWebhookAppRefreshConcurrency = 10 // settingsApplicationInstanceLabelKey is the key to configure injected app instance label key settingsApplicationInstanceLabelKey = "application.instanceLabelKey" // settingsResourceTrackingMethodKey is the key to configure tracking method for application resources @@ -1471,6 +1477,14 @@ func (mgr *SettingsManager) updateSettingsFromSecret(settings *ArgoCDSettings, a if azureDevOpsPassword := argoCDSecret.Data[settingsWebhookAzureDevOpsPasswordKey]; len(azureDevOpsPassword) > 0 { settings.WebhookAzureDevOpsPassword = string(azureDevOpsPassword) } + if webhookAppRefreshConcurrency := argoCDSecret.Data[settingsWebhookAppRefreshConcurrency]; len(webhookAppRefreshConcurrency) > 0 { + i, err := strconv.Atoi(string(webhookAppRefreshConcurrency)) + if err != nil { + i = defaultWebhookAppRefreshConcurrency + log.Warnf("invalid input for %s: %s. Using the default value.", settingsWebhookAppRefreshConcurrency, err.Error()) + } + settings.WebhookAppRefreshConcurrency = i + } // The TLS certificate may be externally managed. We try to load it from an // external secret first. If the external secret doesn't exist, we either diff --git a/util/webhook/webhook.go b/util/webhook/webhook.go index 9955540ea04a9..17bbdf4b36d6d 100644 --- a/util/webhook/webhook.go +++ b/util/webhook/webhook.go @@ -10,6 +10,7 @@ import ( "path/filepath" "regexp" "strings" + "sync" "github.com/go-playground/webhooks/v6/azuredevops" "github.com/go-playground/webhooks/v6/bitbucket" @@ -47,6 +48,10 @@ var ( errBasicAuthVerificationFailed = errors.New("basic auth verification failed") ) +const ( + defaultAppRefreshConcurrency = 10 +) + type ArgoCDWebhookHandler struct { repoCache *cache.Cache serverCache *servercache.Cache @@ -62,6 +67,7 @@ type ArgoCDWebhookHandler struct { azuredevopsAuthHandler func(r *http.Request) error gogs *gogs.Webhook settingsSrc settingsSource + appRefreshConccurency int } func NewHandler(namespace string, applicationNamespaces []string, appClientset appclientset.Interface, set *settings.ArgoCDSettings, settingsSrc settingsSource, repoCache *cache.Cache, serverCache *servercache.Cache, argoDB db.ArgoDB) *ArgoCDWebhookHandler { @@ -99,6 +105,11 @@ func NewHandler(namespace string, applicationNamespaces []string, appClientset a return nil } + appRefreshConccurency := defaultAppRefreshConcurrency + if set.WebhookAppRefreshConcurrency > 0 { + appRefreshConccurency = set.WebhookAppRefreshConcurrency + } + acdWebhook := ArgoCDWebhookHandler{ ns: namespace, appNs: applicationNamespaces, @@ -114,6 +125,7 @@ func NewHandler(namespace string, applicationNamespaces []string, appClientset a repoCache: repoCache, serverCache: serverCache, db: argoDB, + appRefreshConccurency: appRefreshConccurency, } return &acdWebhook @@ -288,24 +300,47 @@ func (a *ArgoCDWebhookHandler) HandleEvent(payload interface{}) { log.Warnf("Failed to get repoRegexp: %s", err) continue } - for _, app := range filteredApps { - - for _, source := range app.Spec.GetSources() { - if sourceRevisionHasChanged(source, revision, touchedHead) && sourceUsesURL(source, webURL, repoRegexp) { - if appFilesHaveChanged(&app, changedFiles) { - namespacedAppInterface := a.appClientset.ArgoprojV1alpha1().Applications(app.ObjectMeta.Namespace) - _, err = argo.RefreshApp(namespacedAppInterface, app.ObjectMeta.Name, v1alpha1.RefreshTypeNormal) - if err != nil { - log.Warnf("Failed to refresh app '%s' for controller reprocessing: %v", app.ObjectMeta.Name, err) - continue - } - // No need to refresh multiple times if multiple sources match. - break - } else if change.shaBefore != "" && change.shaAfter != "" { - if err := a.storePreviouslyCachedManifests(&app, change, trackingMethod, appInstanceLabelKey); err != nil { - log.Warnf("Failed to store cached manifests of previous revision for app '%s': %v", app.Name, err) - } - } + + appRefreshConccurency := a.appRefreshConccurency + appBacklog := make(chan v1alpha1.Application, len(filteredApps)) + + for _, filteredApp := range filteredApps { + appBacklog <- filteredApp + } + close(appBacklog) + + var wg sync.WaitGroup + wg.Add(appRefreshConccurency) + + for i := 0; i < appRefreshConccurency; i++ { + go func() { + defer wg.Done() + for app := range appBacklog { + a.refreshChangedAppOrStoreCachedManifests(app, revision, touchedHead, webURL, repoRegexp, changedFiles, change, trackingMethod, appInstanceLabelKey) + } + }() + } + wg.Wait() + } +} + +func (a *ArgoCDWebhookHandler) refreshChangedAppOrStoreCachedManifests(app v1alpha1.Application, revision string, touchedHead bool, webURL string, + repoRegexp *regexp.Regexp, changedFiles []string, change changeInfo, trackingMethod string, appInstanceLabelKey string) { + + for _, source := range app.Spec.GetSources() { + if sourceRevisionHasChanged(source, revision, touchedHead) && sourceUsesURL(source, webURL, repoRegexp) { + if appFilesHaveChanged(&app, changedFiles) { + namespacedAppInterface := a.appClientset.ArgoprojV1alpha1().Applications(app.ObjectMeta.Namespace) + _, err := argo.RefreshApp(namespacedAppInterface, app.ObjectMeta.Name, v1alpha1.RefreshTypeNormal) + if err != nil { + log.Warnf("Failed to refresh app '%s' for controller reprocessing: %v", app.ObjectMeta.Name, err) + continue + } + // No need to refresh multiple times if multiple sources match. + break + } else if change.shaBefore != "" && change.shaAfter != "" { + if err := a.storePreviouslyCachedManifests(&app, change, trackingMethod, appInstanceLabelKey); err != nil { + log.Warnf("Failed to store cached manifests of previous revision for app '%s': %v", app.Name, err) } } }