Skip to content

Commit

Permalink
feat(webhook): make app refresh operation concurrent
Browse files Browse the repository at this point in the history
Signed-off-by: phanama <[email protected]>
  • Loading branch information
phanama committed Sep 2, 2023
1 parent ef7f32e commit 5935fac
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 18 deletions.
14 changes: 14 additions & 0 deletions util/settings/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ type ArgoCDSettings struct {
WebhookAzureDevOpsUsername string `json:"webhookAzureDevOpsUsername,omitempty"`
// WebhookAzureDevOpsPassword holds the password for authenticating Azure DevOps webhook events
WebhookAzureDevOpsPassword string `json:"webhookAzureDevOpsPassword,omitempty"`
// WebhookAppRefreshConcurrency sets the number of concurrent app refreshes run by the webhook handler
WebhookAppRefreshConcurrency int `json:"webhookAppRefreshConcurrency,omitempty"`
// Secrets holds all secrets in argocd-secret as a map[string]string
Secrets map[string]string `json:"secrets,omitempty"`
// KustomizeBuildOptions is a string of kustomize build parameters
Expand Down Expand Up @@ -419,6 +421,10 @@ const (
settingsWebhookAzureDevOpsUsernameKey = "webhook.azuredevops.username"
// settingsWebhookAzureDevOpsPasswordKey is the key for Azure DevOps webhook password
settingsWebhookAzureDevOpsPasswordKey = "webhook.azuredevops.password"
// settingsWebhookAppRefreshConcurrency number of simultaneous applications to be processed
settingsWebhookAppRefreshConcurrency = "webhook.appRefreshConcurrency"
// defaultWebhookAppRefreshConcurrency default number for simultaneous applications to be processed
defaultWebhookAppRefreshConcurrency = 10
// settingsApplicationInstanceLabelKey is the key to configure injected app instance label key
settingsApplicationInstanceLabelKey = "application.instanceLabelKey"
// settingsResourceTrackingMethodKey is the key to configure tracking method for application resources
Expand Down Expand Up @@ -1471,6 +1477,14 @@ func (mgr *SettingsManager) updateSettingsFromSecret(settings *ArgoCDSettings, a
if azureDevOpsPassword := argoCDSecret.Data[settingsWebhookAzureDevOpsPasswordKey]; len(azureDevOpsPassword) > 0 {
settings.WebhookAzureDevOpsPassword = string(azureDevOpsPassword)
}
if webhookAppRefreshConcurrency := argoCDSecret.Data[settingsWebhookAppRefreshConcurrency]; len(webhookAppRefreshConcurrency) > 0 {
i, err := strconv.Atoi(string(webhookAppRefreshConcurrency))
if err != nil {
i = defaultWebhookAppRefreshConcurrency
log.Warnf("invalid input for %s: %s. Using the default value.", settingsWebhookAppRefreshConcurrency, err.Error())
}
settings.WebhookAppRefreshConcurrency = i
}

// The TLS certificate may be externally managed. We try to load it from an
// external secret first. If the external secret doesn't exist, we either
Expand Down
71 changes: 53 additions & 18 deletions util/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"path/filepath"
"regexp"
"strings"
"sync"

"github.com/go-playground/webhooks/v6/azuredevops"
"github.com/go-playground/webhooks/v6/bitbucket"
Expand Down Expand Up @@ -47,6 +48,10 @@ var (
errBasicAuthVerificationFailed = errors.New("basic auth verification failed")
)

const (
defaultAppRefreshConcurrency = 10
)

type ArgoCDWebhookHandler struct {
repoCache *cache.Cache
serverCache *servercache.Cache
Expand All @@ -62,6 +67,7 @@ type ArgoCDWebhookHandler struct {
azuredevopsAuthHandler func(r *http.Request) error
gogs *gogs.Webhook
settingsSrc settingsSource
appRefreshConccurency int
}

func NewHandler(namespace string, applicationNamespaces []string, appClientset appclientset.Interface, set *settings.ArgoCDSettings, settingsSrc settingsSource, repoCache *cache.Cache, serverCache *servercache.Cache, argoDB db.ArgoDB) *ArgoCDWebhookHandler {
Expand Down Expand Up @@ -99,6 +105,11 @@ func NewHandler(namespace string, applicationNamespaces []string, appClientset a
return nil
}

appRefreshConccurency := defaultAppRefreshConcurrency
if set.WebhookAppRefreshConcurrency > 0 {
appRefreshConccurency = set.WebhookAppRefreshConcurrency
}

acdWebhook := ArgoCDWebhookHandler{
ns: namespace,
appNs: applicationNamespaces,
Expand All @@ -114,6 +125,7 @@ func NewHandler(namespace string, applicationNamespaces []string, appClientset a
repoCache: repoCache,
serverCache: serverCache,
db: argoDB,
appRefreshConccurency: appRefreshConccurency,
}

return &acdWebhook
Expand Down Expand Up @@ -288,24 +300,47 @@ func (a *ArgoCDWebhookHandler) HandleEvent(payload interface{}) {
log.Warnf("Failed to get repoRegexp: %s", err)
continue
}
for _, app := range filteredApps {

for _, source := range app.Spec.GetSources() {
if sourceRevisionHasChanged(source, revision, touchedHead) && sourceUsesURL(source, webURL, repoRegexp) {
if appFilesHaveChanged(&app, changedFiles) {
namespacedAppInterface := a.appClientset.ArgoprojV1alpha1().Applications(app.ObjectMeta.Namespace)
_, err = argo.RefreshApp(namespacedAppInterface, app.ObjectMeta.Name, v1alpha1.RefreshTypeNormal)
if err != nil {
log.Warnf("Failed to refresh app '%s' for controller reprocessing: %v", app.ObjectMeta.Name, err)
continue
}
// No need to refresh multiple times if multiple sources match.
break
} else if change.shaBefore != "" && change.shaAfter != "" {
if err := a.storePreviouslyCachedManifests(&app, change, trackingMethod, appInstanceLabelKey); err != nil {
log.Warnf("Failed to store cached manifests of previous revision for app '%s': %v", app.Name, err)
}
}

appRefreshConccurency := a.appRefreshConccurency
appBacklog := make(chan v1alpha1.Application, len(filteredApps))

for _, filteredApp := range filteredApps {
appBacklog <- filteredApp
}
close(appBacklog)

var wg sync.WaitGroup
wg.Add(appRefreshConccurency)

for i := 0; i < appRefreshConccurency; i++ {
go func() {
defer wg.Done()
for app := range appBacklog {
a.refreshChangedAppOrStoreCachedManifests(app, revision, touchedHead, webURL, repoRegexp, changedFiles, change, trackingMethod, appInstanceLabelKey)
}
}()
}
wg.Wait()
}
}

func (a *ArgoCDWebhookHandler) refreshChangedAppOrStoreCachedManifests(app v1alpha1.Application, revision string, touchedHead bool, webURL string,
repoRegexp *regexp.Regexp, changedFiles []string, change changeInfo, trackingMethod string, appInstanceLabelKey string) {

for _, source := range app.Spec.GetSources() {
if sourceRevisionHasChanged(source, revision, touchedHead) && sourceUsesURL(source, webURL, repoRegexp) {
if appFilesHaveChanged(&app, changedFiles) {
namespacedAppInterface := a.appClientset.ArgoprojV1alpha1().Applications(app.ObjectMeta.Namespace)
_, err := argo.RefreshApp(namespacedAppInterface, app.ObjectMeta.Name, v1alpha1.RefreshTypeNormal)
if err != nil {
log.Warnf("Failed to refresh app '%s' for controller reprocessing: %v", app.ObjectMeta.Name, err)
continue
}
// No need to refresh multiple times if multiple sources match.
break
} else if change.shaBefore != "" && change.shaAfter != "" {
if err := a.storePreviouslyCachedManifests(&app, change, trackingMethod, appInstanceLabelKey); err != nil {
log.Warnf("Failed to store cached manifests of previous revision for app '%s': %v", app.Name, err)
}
}
}
Expand Down

0 comments on commit 5935fac

Please sign in to comment.