Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reporter uses sources-server #357

Merged
merged 21 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion changelog/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
### Features
- feat(event-reporter): multisourced apps support improvements: reporting syncOperationRevisions, detecting correct resource sourceIdx, reporting correct git commit info
- feat(event-reporter): using sources-server for getting application version
10 changes: 10 additions & 0 deletions cmd/event-reporter-server/commands/event_reporter_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"math"
"time"

"github.com/argoproj/argo-cd/v2/pkg/sources_server_client"

"github.com/argoproj/argo-cd/v2/event_reporter/reporter"

"github.com/argoproj/argo-cd/v2/event_reporter"
Expand Down Expand Up @@ -104,6 +106,8 @@ func NewCommand() *cobra.Command {
rateLimiterBucketSize int
rateLimiterDuration time.Duration
rateLimiterLearningMode bool
useSourcesServer bool
sourcesServerBaseURL string
)
command := &cobra.Command{
Use: cliName,
Expand Down Expand Up @@ -192,6 +196,10 @@ func NewCommand() *cobra.Command {
Capacity: rateLimiterBucketSize,
LearningMode: rateLimiterLearningMode,
},
UseSourcesServer: useSourcesServer,
SourcesServerConfig: &sources_server_client.SourcesServerConfig{
BaseURL: sourcesServerBaseURL,
},
}

log.Infof("Starting event reporter server with grpc transport %v", useGrpc)
Expand Down Expand Up @@ -245,6 +253,8 @@ func NewCommand() *cobra.Command {
command.Flags().IntVar(&rateLimiterBucketSize, "rate-limiter-bucket-size", env.ParseNumFromEnv("RATE_LIMITER_BUCKET_SIZE", math.MaxInt, 0, math.MaxInt), "The maximum amount of requests allowed per window.")
command.Flags().DurationVar(&rateLimiterDuration, "rate-limiter-period", env.ParseDurationFromEnv("RATE_LIMITER_DURATION", 24*time.Hour, 0, math.MaxInt64), "The rate limit window size.")
command.Flags().BoolVar(&rateLimiterLearningMode, "rate-limiter-learning-mode", env.ParseBoolFromEnv("RATE_LIMITER_LEARNING_MODE_ENABLED", false), "The rate limit enabled in learning mode ( not blocking sending to queue but logging it )")
command.Flags().BoolVar(&useSourcesServer, "use-sources-server", env.ParseBoolFromEnv("SOURCES_SERVER_ENABLED", false), "Use sources-server instead of repo-server fork")
command.Flags().StringVar(&sourcesServerBaseURL, "sources-server-base-url", env.StringFromEnv("SOURCES_SERVER_BASE_URL", common.DefaultSourcesServerAddr), "Sources-server base URL")
cacheSrc = servercache.AddCacheFlagsToCmd(command, cacheutil.Options{
OnClientCreated: func(client *redis.Client) {
redisClient = client
Expand Down
3 changes: 2 additions & 1 deletion common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ const (
// DefaultDexServerAddr is the HTTP address of the Dex OIDC server, which we run a reverse proxy against
DefaultDexServerAddr = "argocd-dex-server:5556"
// DefaultRedisAddr is the default redis address
DefaultRedisAddr = "argocd-redis:6379"
DefaultRedisAddr = "argocd-redis:6379"
DefaultSourcesServerAddr = "sources-server:8090"
)

// Kubernetes ConfigMap and Secret resource names which hold Argo CD settings
Expand Down
6 changes: 4 additions & 2 deletions event_reporter/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strings"
"time"

"github.com/argoproj/argo-cd/v2/pkg/sources_server_client"

"github.com/argoproj/argo-cd/v2/util/db"

appclient "github.com/argoproj/argo-cd/v2/event_reporter/application"
Expand Down Expand Up @@ -45,15 +47,15 @@ type eventReporterController struct {
metricsServer *metrics.MetricsServer
}

func NewEventReporterController(appInformer cache.SharedIndexInformer, cache *servercache.Cache, settingsMgr *settings.SettingsManager, applicationServiceClient appclient.ApplicationClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer, featureManager *reporter.FeatureManager, rateLimiterOpts *reporter.RateLimiterOpts, db db.ArgoDB) EventReporterController {
func NewEventReporterController(appInformer cache.SharedIndexInformer, cache *servercache.Cache, settingsMgr *settings.SettingsManager, applicationServiceClient appclient.ApplicationClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer, featureManager *reporter.FeatureManager, rateLimiterOpts *reporter.RateLimiterOpts, db db.ArgoDB, useSourcesServer bool, sourcesServerConfig *sources_server_client.SourcesServerConfig) EventReporterController {
appBroadcaster := reporter.NewBroadcaster(featureManager, metricsServer, rateLimiterOpts)
_, err := appInformer.AddEventHandler(appBroadcaster)
if err != nil {
log.Error(err)
}
return &eventReporterController{
appBroadcaster: appBroadcaster,
applicationEventReporter: reporter.NewApplicationEventReporter(cache, applicationServiceClient, appLister, codefreshConfig, metricsServer, db),
applicationEventReporter: reporter.NewApplicationEventReporter(cache, applicationServiceClient, appLister, codefreshConfig, metricsServer, db, useSourcesServer, sourcesServerConfig),
cache: cache,
settingsMgr: settingsMgr,
applicationServiceClient: applicationServiceClient,
Expand Down
6 changes: 6 additions & 0 deletions event_reporter/reporter/app_revision_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func TestGetRevisionsDetails(t *testing.T) {
&metrics.MetricsServer{},
fakeArgoDb(),
"0.0.1",
false,
nil,
}

result, _ := reporter.getRevisionsDetails(context.Background(), &app, []string{expectedRevision})
Expand Down Expand Up @@ -122,6 +124,8 @@ func TestGetRevisionsDetails(t *testing.T) {
&metrics.MetricsServer{},
fakeArgoDb(),
"0.0.1",
false,
nil,
}

result, _ := reporter.getRevisionsDetails(context.Background(), &app, []string{expectedRevision1, expectedRevision2})
Expand Down Expand Up @@ -165,6 +169,8 @@ func TestGetRevisionsDetails(t *testing.T) {
&metrics.MetricsServer{},
fakeArgoDb(),
"0.0.1",
false,
nil,
}

result, _ := reporter.getRevisionsDetails(context.Background(), &app, []string{expectedRevision})
Expand Down
38 changes: 33 additions & 5 deletions event_reporter/reporter/application_event_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"strings"
"time"

"github.com/argoproj/argo-cd/v2/pkg/sources_server_client"

"github.com/argoproj/argo-cd/v2/util/db"

"github.com/argoproj/argo-cd/v2/event_reporter/utils"
Expand Down Expand Up @@ -46,6 +48,8 @@ type applicationEventReporter struct {
metricsServer *metrics.MetricsServer
db db.ArgoDB
runtimeVersion string
useSourcesServer bool
sourcesServerClient sources_server_client.SourceServerClientInteface
}

type ApplicationEventReporter interface {
Expand All @@ -59,7 +63,7 @@ type ApplicationEventReporter interface {
ShouldSendApplicationEvent(ae *appv1.ApplicationWatchEvent) (shouldSend bool, syncStatusChanged bool)
}

func NewApplicationEventReporter(cache *servercache.Cache, applicationServiceClient appclient.ApplicationClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer, db db.ArgoDB) ApplicationEventReporter {
func NewApplicationEventReporter(cache *servercache.Cache, applicationServiceClient appclient.ApplicationClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer, db db.ArgoDB, useSourcesServer bool, sourcesServerConfig *sources_server_client.SourcesServerConfig) ApplicationEventReporter {
return &applicationEventReporter{
cache: cache,
applicationServiceClient: applicationServiceClient,
Expand All @@ -68,6 +72,8 @@ func NewApplicationEventReporter(cache *servercache.Cache, applicationServiceCli
metricsServer: metricsServer,
db: db,
runtimeVersion: codefreshConfig.RuntimeVersion,
useSourcesServer: useSourcesServer,
sourcesServerClient: sources_server_client.NewSourceServerClient(sourcesServerConfig),
}
}

Expand Down Expand Up @@ -265,7 +271,20 @@ func (s *applicationEventReporter) resolveApplicationVersions(ctx context.Contex
}

syncManifests, _ := s.getDesiredManifests(ctx, logCtx, a, nil, &sourcePositions, syncResultRevisions)
return syncManifests.GetApplicationVersions()

var applicationVersions *apiclient.ApplicationVersions
if s.useSourcesServer {
log.Infof("cfGetAppVersion. Getting version from sourcesserver")
if len(*syncResultRevisions) == 0 {
return nil
}
appVers := s.sourcesServerClient.GetAppVersion(a, &(*syncResultRevisions)[0])
applicationVersions = utils.SourcesAppVersionsToRepo(appVers, logCtx)
} else {
applicationVersions = syncManifests.GetApplicationVersions()
}

return applicationVersions
}

syncResultRevision := utils.GetOperationSyncResultRevision(a)
Expand All @@ -275,7 +294,16 @@ func (s *applicationEventReporter) resolveApplicationVersions(ctx context.Contex
}

syncManifests, _ := s.getDesiredManifests(ctx, logCtx, a, syncResultRevision, nil, nil)
return syncManifests.GetApplicationVersions()

var applicationVersions *apiclient.ApplicationVersions
if s.useSourcesServer {
log.Infof("cfGetAppVersion. Getting version from sourcesserver")
appVers := s.sourcesServerClient.GetAppVersion(a, syncResultRevision)
applicationVersions = utils.SourcesAppVersionsToRepo(appVers, logCtx)
} else {
applicationVersions = syncManifests.GetApplicationVersions()
}
return applicationVersions
}

func (s *applicationEventReporter) getAppForResourceReporting(
Expand Down Expand Up @@ -309,8 +337,8 @@ func (s *applicationEventReporter) processResource(
appEventProcessingStartedAt string,
desiredManifests *apiclient.ManifestResponse,
manifestGenErr bool,
originalApplication *appv1.Application, // passed onlu if resource is app
applicationVersions *apiclient.ApplicationVersions, // passed onlu if resource is app
originalApplication *appv1.Application, // passed only if resource is app
applicationVersions *apiclient.ApplicationVersions, // passed only if resource is app
reportedEntityParentApp *ReportedEntityParentApp,
argoTrackingMetadata *ArgoTrackingMetadata,
) error {
Expand Down
2 changes: 2 additions & 0 deletions event_reporter/reporter/application_event_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ func fakeReporter(customAppServiceClient appclient.ApplicationClient) *applicati
metricsServ,
fakeArgoDb(),
"0.0.1",
false,
nil,
}
}

Expand Down
6 changes: 5 additions & 1 deletion event_reporter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"strings"
"time"

"github.com/argoproj/argo-cd/v2/pkg/sources_server_client"

appclient "github.com/argoproj/argo-cd/v2/event_reporter/application"
"github.com/argoproj/argo-cd/v2/event_reporter/reporter"

Expand Down Expand Up @@ -95,6 +97,8 @@ type EventReporterServerOpts struct {
RootPath string
CodefreshConfig *codefresh.CodefreshConfig
RateLimiterOpts *reporter.RateLimiterOpts
UseSourcesServer bool
SourcesServerConfig *sources_server_client.SourcesServerConfig
}

type handlerSwitcher struct {
Expand Down Expand Up @@ -153,7 +157,7 @@ func (a *EventReporterServer) Init(ctx context.Context) {
}

func (a *EventReporterServer) RunController(ctx context.Context) {
controller := event_reporter.NewEventReporterController(a.appInformer, a.Cache, a.settingsMgr, a.ApplicationServiceClient, a.appLister, a.CodefreshConfig, a.serviceSet.MetricsServer, a.featureManager, a.RateLimiterOpts, a.db)
controller := event_reporter.NewEventReporterController(a.appInformer, a.Cache, a.settingsMgr, a.ApplicationServiceClient, a.appLister, a.CodefreshConfig, a.serviceSet.MetricsServer, a.featureManager, a.RateLimiterOpts, a.db, a.UseSourcesServer, a.SourcesServerConfig)
go controller.Run(ctx)
}

Expand Down
17 changes: 17 additions & 0 deletions event_reporter/utils/app_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package utils
import (
"encoding/json"

log "github.com/sirupsen/logrus"

"github.com/argoproj/argo-cd/v2/pkg/apiclient/events"
"github.com/argoproj/argo-cd/v2/pkg/sources_server_client"
"github.com/argoproj/argo-cd/v2/reposerver/apiclient"
)

Expand All @@ -16,3 +19,17 @@ func RepoAppVersionsToEvent(applicationVersions *apiclient.ApplicationVersions)
}
return applicationVersionsEvents, nil
}

func SourcesAppVersionsToRepo(applicationVersions *sources_server_client.AppVersionResult, logCtx *log.Entry) *apiclient.ApplicationVersions {
if applicationVersions == nil {
return nil
}
applicationVersionsRepo := &apiclient.ApplicationVersions{}
applicationVersionsData, _ := json.Marshal(applicationVersions)
oleksandr-codefresh marked this conversation as resolved.
Show resolved Hide resolved
err := json.Unmarshal(applicationVersionsData, applicationVersionsRepo)
if err != nil {
logCtx.Errorf("can't unmarshal app version: %v", err)
return nil
}
return applicationVersionsRepo
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dlclark/regexp2 v1.11.2
github.com/dlclark/regexp2 v1.11.4
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/evanphx/json-patch/v5 v5.8.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -848,8 +848,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0=
github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
github.com/dlclark/regexp2 v1.11.2 h1:/u628IuisSTwri5/UKloiIsH8+qF2Pu7xEQX+yIKg68=
github.com/dlclark/regexp2 v1.11.2/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8=
github.com/dlclark/regexp2 v1.11.4 h1:rPYF9/LECdNymJufQKmri9gV604RvvABwgOA8un7yAo=
github.com/dlclark/regexp2 v1.11.4/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8=
github.com/dnaeon/go-vcr v1.1.0 h1:ReYa/UBrRyQdant9B4fNHGoCNKw6qh6P0fsdGmZpR7c=
github.com/dnaeon/go-vcr v1.1.0/go.mod h1:M7tiix8f0r6mKKJ3Yq/kqU1OYf3MnfmBWVbPx/yU9ko=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
Expand Down
108 changes: 108 additions & 0 deletions pkg/sources_server_client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package sources_server_client

import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
netUrl "net/url"

log "github.com/sirupsen/logrus"

"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
)

type VersionPayload struct {
App v1alpha1.Application `json:"app"`
Revision string `json:"revision"`
}

type DependenciesMap struct {
Lock string `json:"helm/Chart.lock"`
Deps string `json:"helm/dependencies"`
Requirements string `json:"helm/requirements.yaml"`
}

type AppVersionResult struct {
AppVersion string `json:"appVersion"`
Dependencies DependenciesMap `json:"dependencies"`
}

type SourcesServerConfig struct {
BaseURL string
}

type sourceServerClient struct {
clientConfig *SourcesServerConfig
}

type SourceServerClientInteface interface {
GetAppVersion(app *v1alpha1.Application, revision *string) *AppVersionResult
}

func (c *sourceServerClient) sendRequest(method, url string, payload interface{}) ([]byte, error) {
oleksandr-codefresh marked this conversation as resolved.
Show resolved Hide resolved
var requestBody []byte
var err error
if payload != nil {
requestBody, err = json.Marshal(payload)
if err != nil {
return nil, fmt.Errorf("error marshalling payload: %w", err)
}
}

fullURL, err := netUrl.JoinPath(c.clientConfig.BaseURL, url)
if err != nil {
return nil, fmt.Errorf("error joining path: %w", err)
}

req, err := http.NewRequest(method, fullURL, bytes.NewBuffer(requestBody))
if err != nil {
return nil, fmt.Errorf("error creating request: %w", err)
}

req.Header.Set("Content-Type", "application/json")

client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, fmt.Errorf("error making request: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode < 200 || resp.StatusCode >= 300 {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("server responded with status %d: %s", resp.StatusCode, string(body))
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("error reading response body: %w", err)
}

return body, nil
}

func (c *sourceServerClient) GetAppVersion(app *v1alpha1.Application, revision *string) *AppVersionResult {
log.Infof("cfGetAppVersion. Sending request to sources-server for %s", app.Name)
appVersionResult, err := c.sendRequest("POST", "/getAppVersion", VersionPayload{App: *app, Revision: *revision})
if err != nil {
log.Errorf("error getting app version: %v", err)
return nil
}

var versionStruct AppVersionResult
err = json.Unmarshal(appVersionResult, &versionStruct)
if err != nil {
log.Errorf("error unmarshaling app version: %v", err)
return nil
}

return &versionStruct
}

func NewSourceServerClient(clientConfig *SourcesServerConfig) SourceServerClientInteface {
return &sourceServerClient{
clientConfig: clientConfig,
}
}
Loading