feat(webhook): make app refresh operation concurrent (#14269) #15326

Open
wants to merge 13 commits into
base: master
Changes from 12 commits
1 change: 1 addition & 0 deletions docs/operator-manual/high_availability.md
@@ -156,6 +156,7 @@ spec:
* The `ARGOCD_API_SERVER_REPLICAS` environment variable is used to divide [the limit of concurrent login requests (`ARGOCD_MAX_CONCURRENT_LOGIN_REQUESTS_COUNT`)](./user-management/index.md#failed-logins-rate-limiting) between each replica.
* The `ARGOCD_GRPC_MAX_SIZE_MB` environment variable allows specifying the max size of the server response message in megabytes.
The default value is 200. You might need to increase this for an Argo CD instance that manages 3000+ applications.
* The server's [webhook handler](./webhook.md#configuring-webhook-handlers-application-refresh-concurrency) refreshes apps by patching each Application with the `argocd.argoproj.io/refresh` annotation, which sends one network-bound request per Application to the Kubernetes API server. To speed up webhook processing, especially with thousands of applications, you can raise the maximum number of concurrent requests the handler sends to the Kubernetes API server with the `webhook.maxConcurrentAppRefresh` setting in your `argocd-cm` ConfigMap (`10` by default). Setting it too high can put the Kubernetes API server under heavy load.

### argocd-dex-server, argocd-redis

24 changes: 24 additions & 0 deletions docs/operator-manual/webhook.md
@@ -97,3 +97,27 @@ stringData:
```

After saving, the changes should take effect automatically.

## Configuring Webhook Handler's Application Refresh Concurrency

After receiving a webhook event, Argo CD refreshes every Application affected by the event. To do this,
the webhook handler adds the `argocd.argoproj.io/refresh` annotation to each of those Applications by sending one patch
request per Application to your control plane's Kubernetes API server. If your repository provider enforces a webhook
timeout and you have hundreds or thousands of apps, you can speed up the handler's refresh process by
raising the number of concurrent patch requests the handler may send to the Kubernetes API server with the
`webhook.maxConcurrentAppRefresh` setting in your `argocd-cm` ConfigMap (`10` by default). Take your
Kubernetes API server's capacity into account: a value that is too high for your
API server can overload it.

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: argocd-cm
  namespace: argocd
...
data:
  # Maximum number of concurrent refresh-annotation patch requests the webhook handler can send to the Kubernetes API server.
  # ConfigMap values must be strings, so the number is quoted.
  webhook.maxConcurrentAppRefresh: "10"
...
```
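
As a rough illustration of what the setting bounds, the sketch below drains a backlog of apps with a fixed number of worker goroutines, mirroring the buffered-channel and `sync.WaitGroup` pattern this PR adds to `HandleEvent`. `refreshAll` and the `refresh` callback are hypothetical stand-ins, not Argo CD APIs; the real handler patches each Application through its clientset. The fallback to a single worker for values below 1 is an assumption of this sketch only.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// refreshAll calls refresh once per app using at most maxConcurrent goroutines.
// refresh is a hypothetical stand-in for the per-Application patch request.
func refreshAll(apps []string, maxConcurrent int, refresh func(app string)) {
	if maxConcurrent < 1 {
		// Assumption for this sketch: fall back to one worker.
		maxConcurrent = 1
	}

	// Buffer the whole backlog so enqueueing never blocks, then close the
	// channel so workers exit once it has been drained.
	backlog := make(chan string, len(apps))
	for _, app := range apps {
		backlog <- app
	}
	close(backlog)

	var wg sync.WaitGroup
	wg.Add(maxConcurrent)
	for i := 0; i < maxConcurrent; i++ {
		go func() {
			defer wg.Done()
			for app := range backlog {
				refresh(app)
			}
		}()
	}
	wg.Wait()
}

func main() {
	var refreshed int64
	apps := []string{"guestbook", "billing", "frontend", "metrics"}
	refreshAll(apps, 2, func(app string) {
		atomic.AddInt64(&refreshed, 1)
	})
	fmt.Println("refreshed:", atomic.LoadInt64(&refreshed))
}
```

With a worker count of 2, at most two `refresh` calls are in flight at any moment, which is the quantity `webhook.maxConcurrentAppRefresh` limits.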
19 changes: 19 additions & 0 deletions util/settings/settings.go
@@ -76,6 +76,8 @@ type ArgoCDSettings struct {
WebhookAzureDevOpsUsername string `json:"webhookAzureDevOpsUsername,omitempty"`
// WebhookAzureDevOpsPassword holds the password for authenticating Azure DevOps webhook events
WebhookAzureDevOpsPassword string `json:"webhookAzureDevOpsPassword,omitempty"`
// WebhookMaxConcurrentAppRefresh sets the number of concurrent app refreshes run by the webhook handler
WebhookMaxConcurrentAppRefresh int `json:"webhookMaxConcurrentAppRefresh,omitempty"`
// Secrets holds all secrets in argocd-secret as a map[string]string
Secrets map[string]string `json:"secrets,omitempty"`
// KustomizeBuildOptions is a string of kustomize build parameters
@@ -495,6 +497,10 @@ const (
RespectRBAC = "resource.respectRBAC"
RespectRBACValueStrict = "strict"
RespectRBACValueNormal = "normal"
// settingsWebhookMaxConcurrentAppRefreshKey is the argocd-cm key for the max number of concurrent app refreshes
settingsWebhookMaxConcurrentAppRefreshKey = "webhook.maxConcurrentAppRefresh"
// defaultSettingsWebhookMaxConcurrentAppRefresh is the default value for the number of max concurrent app refresh
defaultSettingsWebhookMaxConcurrentAppRefresh = 10
)

var (
@@ -1447,6 +1453,19 @@ func updateSettingsFromConfigMap(settings *ArgoCDSettings, argoCDCM *apiv1.Confi
settings.TrackingMethod = argoCDCM.Data[settingsResourceTrackingMethodKey]
settings.OIDCTLSInsecureSkipVerify = argoCDCM.Data[oidcTLSInsecureSkipVerifyKey] == "true"
settings.ExtensionConfig = argoCDCM.Data[extensionConfig]
// Start from the default and override it only when the configured value parses to an integer >= 1.
settings.WebhookMaxConcurrentAppRefresh = defaultSettingsWebhookMaxConcurrentAppRefresh
if raw := argoCDCM.Data[settingsWebhookMaxConcurrentAppRefreshKey]; raw != "" {
	i, err := strconv.Atoi(raw)
	switch {
	case err != nil:
		// Log once for unparsable input instead of also falling into the "< 1" branch.
		log.Warnf("invalid input for %s: %v. Using the default value %d.", settingsWebhookMaxConcurrentAppRefreshKey, err, defaultSettingsWebhookMaxConcurrentAppRefresh)
	case i < 1:
		log.Warnf("%s is less than 1. Using the default value %d.", settingsWebhookMaxConcurrentAppRefreshKey, defaultSettingsWebhookMaxConcurrentAppRefresh)
	default:
		settings.WebhookMaxConcurrentAppRefresh = i
	}
}
}

// validateExternalURL ensures the external URL that is set on the configmap is valid
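
The fallback rules in the hunk above (unset, unparsable, or below 1 all yield the default) can be sketched as a standalone helper, which makes them easy to exercise in isolation. `parseMaxConcurrent` and `defaultMaxConcurrent` are hypothetical names for this illustration and are not part of the PR; returning an error instead of logging is also an assumption of the sketch.

```go
package main

import (
	"fmt"
	"strconv"
)

// defaultMaxConcurrent mirrors the default of 10 used in the PR.
const defaultMaxConcurrent = 10

// parseMaxConcurrent returns the configured concurrency, or the default when
// raw is empty, not an integer, or less than 1. A non-nil error explains why
// the default was used; an empty value is not treated as an error.
func parseMaxConcurrent(raw string) (int, error) {
	if raw == "" {
		return defaultMaxConcurrent, nil
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		return defaultMaxConcurrent, fmt.Errorf("invalid value %q: %w", raw, err)
	}
	if n < 1 {
		return defaultMaxConcurrent, fmt.Errorf("value %d is less than 1", n)
	}
	return n, nil
}

func main() {
	for _, raw := range []string{"25", "", "0", "abc"} {
		n, err := parseMaxConcurrent(raw)
		fmt.Printf("%q -> %d (err: %v)\n", raw, n, err)
	}
}
```

Separating "which value" from "why" lets the caller decide whether to warn, which avoids logging two warnings for a single bad input.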
82 changes: 82 additions & 0 deletions util/settings/settings_test.go
@@ -169,6 +169,88 @@ func TestInClusterServerAddressEnabledByDefault(t *testing.T) {
assert.Equal(t, true, settings.InClusterEnabled)
}

func TestValidWebhookConfiguration(t *testing.T) {
secret := []byte("randomly-strong-secret")
uid := []byte("random-uid")
kubeClient := fake.NewSimpleClientset(
&v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: common.ArgoCDConfigMapName,
Namespace: "default",
Labels: map[string]string{
"app.kubernetes.io/part-of": "argocd",
},
},
Data: map[string]string{
"webhook.maxConcurrentAppRefresh": "25",
},
},
&v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: common.ArgoCDSecretName,
Namespace: "default",
Labels: map[string]string{
"app.kubernetes.io/part-of": "argocd",
},
},
Data: map[string][]byte{
"server.secretkey": nil,
"webhook.github.secret": secret,
"webhook.gitlab.secret": secret,
"webhook.bitbucket.uuid": uid,
"webhook.bitbucketserver.secret": secret,
"webhook.gogs.secret": secret,
"webhook.azuredevops.username": uid,
"webhook.azuredevops.password": secret,
},
},
)
settingsManager := NewSettingsManager(context.Background(), kubeClient, "default")
settings, err := settingsManager.GetSettings()
assert.NoError(t, err)
assert.Equal(t, 25, settings.WebhookMaxConcurrentAppRefresh)
assert.Equal(t, string(secret), settings.WebhookGitHubSecret)
assert.Equal(t, string(secret), settings.WebhookGitLabSecret)
assert.Equal(t, string(uid), settings.WebhookBitbucketUUID)
assert.Equal(t, string(secret), settings.WebhookBitbucketServerSecret)
assert.Equal(t, string(secret), settings.WebhookGogsSecret)
assert.Equal(t, string(uid), settings.WebhookAzureDevOpsUsername)
assert.Equal(t, string(secret), settings.WebhookAzureDevOpsPassword)
}

func TestInvalidWebhookConfiguration(t *testing.T) {
kubeClient := fake.NewSimpleClientset(
&v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: common.ArgoCDConfigMapName,
Namespace: "default",
Labels: map[string]string{
"app.kubernetes.io/part-of": "argocd",
},
},
Data: map[string]string{
"webhook.maxConcurrentAppRefresh": "0",
},
},
&v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: common.ArgoCDSecretName,
Namespace: "default",
Labels: map[string]string{
"app.kubernetes.io/part-of": "argocd",
},
},
Data: map[string][]byte{
"server.secretkey": nil,
},
},
)
settingsManager := NewSettingsManager(context.Background(), kubeClient, "default")
settings, err := settingsManager.GetSettings()
assert.NoError(t, err)
assert.Equal(t, 10, settings.WebhookMaxConcurrentAppRefresh)
}

func TestGetAppInstanceLabelKey(t *testing.T) {
_, settingsManager := fixtures(map[string]string{
"application.instanceLabelKey": "testLabel",
137 changes: 91 additions & 46 deletions util/webhook/webhook.go
@@ -10,6 +10,7 @@ import (
"path/filepath"
"regexp"
"strings"
"sync"

"github.com/go-playground/webhooks/v6/azuredevops"
"github.com/go-playground/webhooks/v6/bitbucket"
@@ -47,21 +48,26 @@ var (
errBasicAuthVerificationFailed = errors.New("basic auth verification failed")
)

const (
defaultMaxConcurrentAppRefresh = 10
)

type ArgoCDWebhookHandler struct {
-	repoCache              *cache.Cache
-	serverCache            *servercache.Cache
-	db                     db.ArgoDB
-	ns                     string
-	appNs                  []string
-	appClientset           appclientset.Interface
-	github                 *github.Webhook
-	gitlab                 *gitlab.Webhook
-	bitbucket              *bitbucket.Webhook
-	bitbucketserver        *bitbucketserver.Webhook
-	azuredevops            *azuredevops.Webhook
-	azuredevopsAuthHandler func(r *http.Request) error
-	gogs                   *gogs.Webhook
-	settingsSrc            settingsSource
+	repoCache               *cache.Cache
+	serverCache             *servercache.Cache
+	db                      db.ArgoDB
+	ns                      string
+	appNs                   []string
+	appClientset            appclientset.Interface
+	github                  *github.Webhook
+	gitlab                  *gitlab.Webhook
+	bitbucket               *bitbucket.Webhook
+	bitbucketserver         *bitbucketserver.Webhook
+	azuredevops             *azuredevops.Webhook
+	azuredevopsAuthHandler  func(r *http.Request) error
+	gogs                    *gogs.Webhook
+	settingsSrc             settingsSource
+	maxConcurrentAppRefresh int
}

func NewHandler(namespace string, applicationNamespaces []string, appClientset appclientset.Interface, set *settings.ArgoCDSettings, settingsSrc settingsSource, repoCache *cache.Cache, serverCache *servercache.Cache, argoDB db.ArgoDB) *ArgoCDWebhookHandler {
@@ -99,26 +105,38 @@ func NewHandler(namespace string, applicationNamespaces []string, appClientset a
return nil
}

// util/settings already validates this value; re-checking here guards handlers built from settings that bypassed that path
maxConcurrentAppRefresh := getMaxConcurrentAppRefreshOrDefault(set)

acdWebhook := ArgoCDWebhookHandler{
-		ns:                     namespace,
-		appNs:                  applicationNamespaces,
-		appClientset:           appClientset,
-		github:                 githubWebhook,
-		gitlab:                 gitlabWebhook,
-		bitbucket:              bitbucketWebhook,
-		bitbucketserver:        bitbucketserverWebhook,
-		azuredevops:            azuredevopsWebhook,
-		azuredevopsAuthHandler: azuredevopsAuthHandler,
-		gogs:                   gogsWebhook,
-		settingsSrc:            settingsSrc,
-		repoCache:              repoCache,
-		serverCache:            serverCache,
-		db:                     argoDB,
+		ns:                      namespace,
+		appNs:                   applicationNamespaces,
+		appClientset:            appClientset,
+		github:                  githubWebhook,
+		gitlab:                  gitlabWebhook,
+		bitbucket:               bitbucketWebhook,
+		bitbucketserver:         bitbucketserverWebhook,
+		azuredevops:             azuredevopsWebhook,
+		azuredevopsAuthHandler:  azuredevopsAuthHandler,
+		gogs:                    gogsWebhook,
+		settingsSrc:             settingsSrc,
+		repoCache:               repoCache,
+		serverCache:             serverCache,
+		db:                      argoDB,
+		maxConcurrentAppRefresh: maxConcurrentAppRefresh,
}

return &acdWebhook
}

func getMaxConcurrentAppRefreshOrDefault(set *settings.ArgoCDSettings) int {
if set.WebhookMaxConcurrentAppRefresh <= 0 {
log.Warnf("webhook.maxConcurrentAppRefresh is unset or less than 1, using the default value %d", defaultMaxConcurrentAppRefresh)
return defaultMaxConcurrentAppRefresh
}
return set.WebhookMaxConcurrentAppRefresh
}

func parseRevision(ref string) string {
refParts := strings.SplitN(ref, "/", 3)
return refParts[len(refParts)-1]
@@ -288,27 +306,54 @@ func (a *ArgoCDWebhookHandler) HandleEvent(payload interface{}) {
log.Warnf("Failed to get repoRegexp: %s", err)
continue
}
-		for _, app := range filteredApps {
-
-			for _, source := range app.Spec.GetSources() {
-				if sourceRevisionHasChanged(source, revision, touchedHead) && sourceUsesURL(source, webURL, repoRegexp) {
-					if appFilesHaveChanged(&app, changedFiles) {
-						namespacedAppInterface := a.appClientset.ArgoprojV1alpha1().Applications(app.ObjectMeta.Namespace)
-						_, err = argo.RefreshApp(namespacedAppInterface, app.ObjectMeta.Name, v1alpha1.RefreshTypeNormal)
-						if err != nil {
-							log.Warnf("Failed to refresh app '%s' for controller reprocessing: %v", app.ObjectMeta.Name, err)
-							continue
-						}
-						// No need to refresh multiple times if multiple sources match.
-						break
-					} else if change.shaBefore != "" && change.shaAfter != "" {
-						if err := a.storePreviouslyCachedManifests(&app, change, trackingMethod, appInstanceLabelKey); err != nil {
-							log.Warnf("Failed to store cached manifests of previous revision for app '%s': %v", app.Name, err)
-						}
-					}

+		maxConcurrentAppRefresh := a.maxConcurrentAppRefresh
+		appBacklog := make(chan v1alpha1.Application, len(filteredApps))
+
+		for _, filteredApp := range filteredApps {
+			appBacklog <- filteredApp
+		}
+		close(appBacklog)
+
+		var wg sync.WaitGroup
+		wg.Add(maxConcurrentAppRefresh)
+
+		for i := 0; i < maxConcurrentAppRefresh; i++ {
+			go func() {
+				defer wg.Done()
+				for app := range appBacklog {
+					a.refreshChangedAppOrStoreCachedManifests(app, revision, touchedHead, webURL, repoRegexp, changedFiles, change, trackingMethod, appInstanceLabelKey)
+				}
+			}()
+		}
+		wg.Wait()
}
}

func (a *ArgoCDWebhookHandler) refreshChangedAppOrStoreCachedManifests(app v1alpha1.Application, revision string, touchedHead bool, webURL string,
	repoRegexp *regexp.Regexp, changedFiles []string, change changeInfo, trackingMethod string, appInstanceLabelKey string) {
	for _, source := range app.Spec.GetSources() {
		if !(sourceRevisionHasChanged(source, revision, touchedHead) && sourceUsesURL(source, webURL, repoRegexp)) {
			continue
		}
		if !appFilesHaveChanged(&app, changedFiles) {
			// Bitbucket payloads do not carry the previous and next commit SHAs.
			// Skip the cache when either SHA is empty; this is the exact negation of the
			// pre-refactor condition (shaBefore != "" && shaAfter != "").
			if change.shaBefore == "" || change.shaAfter == "" {
				continue
			}
			if err := a.storePreviouslyCachedManifests(&app, change, trackingMethod, appInstanceLabelKey); err != nil {
				log.Warnf("Failed to store cached manifests of previous revision for app '%s': %v", app.Name, err)
			}
			continue
		}
		namespacedAppInterface := a.appClientset.ArgoprojV1alpha1().Applications(app.ObjectMeta.Namespace)
		if _, err := argo.RefreshApp(namespacedAppInterface, app.ObjectMeta.Name, v1alpha1.RefreshTypeNormal); err != nil {
			log.Warnf("Failed to refresh app '%s' for controller reprocessing: %v", app.ObjectMeta.Name, err)
			continue
		}
		// No need to refresh multiple times if one source already matches
		break
	}
}
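
The pre-refactor loop stored cached manifests only when both SHAs were set (`change.shaBefore != "" && change.shaAfter != ""`). When that guard is inverted into an early `continue`, the skip condition has to be "either SHA is empty". A minimal sketch of the guard as a helper follows; `hasBothSHAs` is a hypothetical name, and the `changeInfo` type here copies only the two fields used and is not the PR's full type.

```go
package main

import "fmt"

// changeInfo is a trimmed, hypothetical stand-in holding only the two SHA fields.
type changeInfo struct {
	shaBefore string
	shaAfter  string
}

// hasBothSHAs reports whether both commit SHAs are available, which is the
// condition the original loop required before storing cached manifests.
func hasBothSHAs(c changeInfo) bool {
	return c.shaBefore != "" && c.shaAfter != ""
}

func main() {
	cases := []changeInfo{
		{shaBefore: "abc123", shaAfter: "def456"},
		{shaBefore: "abc123"},
		{},
	}
	for _, c := range cases {
		// The early-exit form is `if !hasBothSHAs(c) { continue }`.
		fmt.Printf("%+v -> store cache: %v\n", c, hasBothSHAs(c))
	}
}
```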
