From 90963c45953edfd323361817b7f63e8e0f613cdd Mon Sep 17 00:00:00 2001 From: titanventura Date: Mon, 9 Oct 2023 21:22:12 +0530 Subject: [PATCH 1/5] add. concurrent resource fetching code --- internal/internal.go | 34 +++++++++++++++++++--------------- providers/aws/aws.go | 34 ++++++++++++++++++---------------- providers/providers.go | 37 +++++++++++++++++++++++++++++++++++++ 3 files changed, 74 insertions(+), 31 deletions(-) diff --git a/internal/internal.go b/internal/internal.go index 1765a1171..48b966938 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -79,10 +79,14 @@ func Exec(address string, port int, configPath string, telemetry bool, a utils.A _, err = cron.Every(1).Hours().Do(func() { log.Info("Fetching resources workflow has started") - err = fetchResources(ctx, clients, regions, telemetry) + + numWorkers := 64 + wp := providers.NewWorkerPool(numWorkers) + err = fetchResources(ctx, clients, regions, telemetry, wp) if err != nil { log.Fatal(err) } + wp.Wait() }) if err != nil { @@ -209,7 +213,7 @@ func setupDBConnection(c *models.Config) error { return nil } -func triggerFetchingWorfklow(ctx context.Context, client providers.ProviderClient, provider string, telemetry bool, regions []string) { +func triggerFetchingWorfklow(ctx context.Context, client providers.ProviderClient, provider string, telemetry bool, regions []string, wp *providers.WorkerPool) { localHub := sentry.CurrentHub().Clone() defer func() { @@ -233,7 +237,7 @@ func triggerFetchingWorfklow(ctx context.Context, client providers.ProviderClien switch provider { case "AWS": - aws.FetchResources(ctx, client, regions, db, telemetry, analytics) + aws.FetchResources(ctx, client, regions, db, telemetry, analytics, wp) case "DigitalOcean": do.FetchResources(ctx, client, db, telemetry, analytics) case "OCI": @@ -257,30 +261,30 @@ func triggerFetchingWorfklow(ctx context.Context, client providers.ProviderClien } } -func fetchResources(ctx context.Context, clients 
[]providers.ProviderClient, regions []string, telemetry bool) error { +func fetchResources(ctx context.Context, clients []providers.ProviderClient, regions []string, telemetry bool, wp *providers.WorkerPool) error { for _, client := range clients { if client.AWSClient != nil { - go triggerFetchingWorfklow(ctx, client, "AWS", telemetry, regions) + go triggerFetchingWorfklow(ctx, client, "AWS", telemetry, regions, wp) } else if client.DigitalOceanClient != nil { - go triggerFetchingWorfklow(ctx, client, "DigitalOcean", telemetry, regions) + go triggerFetchingWorfklow(ctx, client, "DigitalOcean", telemetry, regions, wp) } else if client.OciClient != nil { - go triggerFetchingWorfklow(ctx, client, "OCI", telemetry, regions) + go triggerFetchingWorfklow(ctx, client, "OCI", telemetry, regions, wp) } else if client.CivoClient != nil { - go triggerFetchingWorfklow(ctx, client, "Civo", telemetry, regions) + go triggerFetchingWorfklow(ctx, client, "Civo", telemetry, regions, wp) } else if client.K8sClient != nil { - go triggerFetchingWorfklow(ctx, client, "Kubernetes", telemetry, regions) + go triggerFetchingWorfklow(ctx, client, "Kubernetes", telemetry, regions, wp) } else if client.LinodeClient != nil { - go triggerFetchingWorfklow(ctx, client, "Linode", telemetry, regions) + go triggerFetchingWorfklow(ctx, client, "Linode", telemetry, regions, wp) } else if client.TencentClient != nil { - go triggerFetchingWorfklow(ctx, client, "Tencent", telemetry, regions) + go triggerFetchingWorfklow(ctx, client, "Tencent", telemetry, regions, wp) } else if client.AzureClient != nil { - go triggerFetchingWorfklow(ctx, client, "Azure", telemetry, regions) + go triggerFetchingWorfklow(ctx, client, "Azure", telemetry, regions, wp) } else if client.ScalewayClient != nil { - go triggerFetchingWorfklow(ctx, client, "Scaleway", telemetry, regions) + go triggerFetchingWorfklow(ctx, client, "Scaleway", telemetry, regions, wp) } else if client.MongoDBAtlasClient != nil { - go 
triggerFetchingWorfklow(ctx, client, "MongoDBAtlas", telemetry, regions) + go triggerFetchingWorfklow(ctx, client, "MongoDBAtlas", telemetry, regions, wp) } else if client.GCPClient != nil { - go triggerFetchingWorfklow(ctx, client, "GCP", telemetry, regions) + go triggerFetchingWorfklow(ctx, client, "GCP", telemetry, regions, wp) } } return nil diff --git a/providers/aws/aws.go b/providers/aws/aws.go index ecf1e862e..7b1cc8f4d 100644 --- a/providers/aws/aws.go +++ b/providers/aws/aws.go @@ -98,7 +98,7 @@ func listOfSupportedServices() []providers.FetchDataFunction { } } -func FetchResources(ctx context.Context, client providers.ProviderClient, regions []string, db *bun.DB, telemetry bool, analytics utils.Analytics) { +func FetchResources(ctx context.Context, client providers.ProviderClient, regions []string, db *bun.DB, telemetry bool, analytics utils.Analytics, wp *providers.WorkerPool) { listOfSupportedRegions := getRegions() if len(regions) > 0 { log.Infof("Komiser will fetch resources from the following regions: %s", strings.Join(regions, ",")) @@ -108,23 +108,25 @@ func FetchResources(ctx context.Context, client providers.ProviderClient, region for _, region := range listOfSupportedRegions { client.AWSClient.Region = region for _, fetchResources := range listOfSupportedServices() { - resources, err := fetchResources(ctx, client) - if err != nil { - log.Warnf("[%s][AWS] %s", client.Name, err) - } else { - for _, resource := range resources { - _, err = db.NewInsert().Model(&resource).On("CONFLICT (resource_id) DO UPDATE").Set("cost = EXCLUDED.cost, relations=EXCLUDED.relations").Exec(context.Background()) - if err != nil { - log.WithError(err).Errorf("db trigger failed") + wp.SubmitTask(func() { + resources, err := fetchResources(ctx, client) + if err != nil { + log.Warnf("[%s][AWS] %s", client.Name, err) + } else { + for _, resource := range resources { + _, err = db.NewInsert().Model(&resource).On("CONFLICT (resource_id) DO UPDATE").Set("cost = 
EXCLUDED.cost, relations=EXCLUDED.relations").Exec(context.Background()) + if err != nil { + log.WithError(err).Errorf("db trigger failed") + } + } + if telemetry { + analytics.TrackEvent("discovered_resources", map[string]interface{}{ + "provider": "AWS", + "resources": len(resources), + }) } } - if telemetry { - analytics.TrackEvent("discovered_resources", map[string]interface{}{ - "provider": "AWS", - "resources": len(resources), - }) - } - } + }) } } } diff --git a/providers/providers.go b/providers/providers.go index 138fa34da..9e2b1dc22 100644 --- a/providers/providers.go +++ b/providers/providers.go @@ -2,6 +2,7 @@ package providers import ( "context" + "sync" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/aws/aws-sdk-go-v2/aws" @@ -47,3 +48,39 @@ type K8sClient struct { Client *kubernetes.Clientset OpencostBaseUrl string } + +type WorkerPool struct { + numWorkers int + tasks chan func() + wg sync.WaitGroup +} + +func NewWorkerPool(numWorkers int) *WorkerPool { + return &WorkerPool{ + numWorkers: numWorkers, + tasks: make(chan func()), + } +} + +func (wp *WorkerPool) Start() { + for i := 0; i < wp.numWorkers; i++ { + wp.wg.Add(1) + go wp.worker() + } +} + +func (wp *WorkerPool) SubmitTask(task func()) { + wp.tasks <- task +} + +func (wp *WorkerPool) Wait() { + close(wp.tasks) + wp.wg.Wait() +} + +func (wp *WorkerPool) worker() { + defer wp.wg.Done() + for task := range wp.tasks { + task() + } +} From 3599f1f22e0dd30e5edd499c4752ea775b4ddc5a Mon Sep 17 00:00:00 2001 From: titanventura Date: Tue, 10 Oct 2023 00:35:10 +0530 Subject: [PATCH 2/5] feat(concurrency): fix - waiting and waitgroup logic --- internal/internal.go | 32 +++++++++++++++++++++----------- providers/providers.go | 8 ++++---- 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/internal/internal.go b/internal/internal.go index 48b966938..2e3288c09 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -82,6 +82,7 @@ func Exec(address string, port int, 
configPath string, telemetry bool, a utils.A numWorkers := 64 wp := providers.NewWorkerPool(numWorkers) + wp.Start() err = fetchResources(ctx, clients, regions, telemetry, wp) if err != nil { log.Fatal(err) @@ -262,29 +263,38 @@ func triggerFetchingWorfklow(ctx context.Context, client providers.ProviderClien } func fetchResources(ctx context.Context, clients []providers.ProviderClient, regions []string, telemetry bool, wp *providers.WorkerPool) error { + + workflowTrigger := func(client providers.ProviderClient, provider string) { + wp.Wg.Add(1) + go func() { + defer wp.Wg.Done() + triggerFetchingWorfklow(ctx, client, provider, telemetry, regions, wp) + }() + } + for _, client := range clients { if client.AWSClient != nil { - go triggerFetchingWorfklow(ctx, client, "AWS", telemetry, regions, wp) + workflowTrigger(client, "AWS") } else if client.DigitalOceanClient != nil { - go triggerFetchingWorfklow(ctx, client, "DigitalOcean", telemetry, regions, wp) + workflowTrigger(client, "DigitalOcean") } else if client.OciClient != nil { - go triggerFetchingWorfklow(ctx, client, "OCI", telemetry, regions, wp) + workflowTrigger(client, "OCI") } else if client.CivoClient != nil { - go triggerFetchingWorfklow(ctx, client, "Civo", telemetry, regions, wp) + workflowTrigger(client, "Civo") } else if client.K8sClient != nil { - go triggerFetchingWorfklow(ctx, client, "Kubernetes", telemetry, regions, wp) + workflowTrigger(client, "Kubernetes") } else if client.LinodeClient != nil { - go triggerFetchingWorfklow(ctx, client, "Linode", telemetry, regions, wp) + workflowTrigger(client, "Linode") } else if client.TencentClient != nil { - go triggerFetchingWorfklow(ctx, client, "Tencent", telemetry, regions, wp) + workflowTrigger(client, "Tencent") } else if client.AzureClient != nil { - go triggerFetchingWorfklow(ctx, client, "Azure", telemetry, regions, wp) + workflowTrigger(client, "Azure") } else if client.ScalewayClient != nil { - go triggerFetchingWorfklow(ctx, client, "Scaleway", 
telemetry, regions, wp) + workflowTrigger(client, "Scaleway") } else if client.MongoDBAtlasClient != nil { - go triggerFetchingWorfklow(ctx, client, "MongoDBAtlas", telemetry, regions, wp) + workflowTrigger(client, "MongoDBAtlas") } else if client.GCPClient != nil { - go triggerFetchingWorfklow(ctx, client, "GCP", telemetry, regions, wp) + workflowTrigger(client, "GCP") } } return nil diff --git a/providers/providers.go b/providers/providers.go index 9e2b1dc22..81afb7b62 100644 --- a/providers/providers.go +++ b/providers/providers.go @@ -52,7 +52,7 @@ type K8sClient struct { type WorkerPool struct { numWorkers int tasks chan func() - wg sync.WaitGroup + Wg sync.WaitGroup } func NewWorkerPool(numWorkers int) *WorkerPool { @@ -64,23 +64,23 @@ func NewWorkerPool(numWorkers int) *WorkerPool { func (wp *WorkerPool) Start() { for i := 0; i < wp.numWorkers; i++ { - wp.wg.Add(1) go wp.worker() } } func (wp *WorkerPool) SubmitTask(task func()) { + wp.Wg.Add(1) wp.tasks <- task } func (wp *WorkerPool) Wait() { + wp.Wg.Wait() close(wp.tasks) - wp.wg.Wait() } func (wp *WorkerPool) worker() { - defer wp.wg.Done() for task := range wp.tasks { task() + wp.Wg.Done() } } From 493845ddeac6eab63f3a7f15dd7876937a725a16 Mon Sep 17 00:00:00 2001 From: Azanul Haque <42029519+Azanul@users.noreply.github.com> Date: Tue, 10 Oct 2023 06:30:56 +0000 Subject: [PATCH 3/5] using a separate WaitGroup Signed-off-by: Azanul Haque <42029519+Azanul@users.noreply.github.com> --- internal/internal.go | 10 +++++++--- providers/providers.go | 8 ++++---- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/internal/internal.go b/internal/internal.go index 2e3288c09..1c55b6de5 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -11,6 +11,7 @@ import ( "runtime" "strconv" "strings" + "sync" "time" "github.com/getsentry/sentry-go" @@ -54,7 +55,7 @@ var Arch = runtime.GOARCH var db *bun.DB var analytics utils.Analytics -func Exec(address string, port int, configPath string, telemetry 
bool, a utils.Analytics, regions []string, cmd *cobra.Command) error { +func Exec(address string, port int, configPath string, telemetry bool, a utils.Analytics, regions []string, _ *cobra.Command) error { analytics = a ctx := context.Background() @@ -264,10 +265,11 @@ func triggerFetchingWorfklow(ctx context.Context, client providers.ProviderClien func fetchResources(ctx context.Context, clients []providers.ProviderClient, regions []string, telemetry bool, wp *providers.WorkerPool) error { + var wwg sync.WaitGroup workflowTrigger := func(client providers.ProviderClient, provider string) { - wp.Wg.Add(1) + wwg.Add(1) go func() { - defer wp.Wg.Done() + defer wwg.Done() triggerFetchingWorfklow(ctx, client, provider, telemetry, regions, wp) }() } @@ -297,6 +299,8 @@ func fetchResources(ctx context.Context, clients []providers.ProviderClient, reg workflowTrigger(client, "GCP") } } + + wwg.Wait() return nil } diff --git a/providers/providers.go b/providers/providers.go index 81afb7b62..ef0198714 100644 --- a/providers/providers.go +++ b/providers/providers.go @@ -52,7 +52,7 @@ type K8sClient struct { type WorkerPool struct { numWorkers int tasks chan func() - Wg sync.WaitGroup + wg sync.WaitGroup } func NewWorkerPool(numWorkers int) *WorkerPool { @@ -69,18 +69,18 @@ func (wp *WorkerPool) Start() { } func (wp *WorkerPool) SubmitTask(task func()) { - wp.Wg.Add(1) + wp.wg.Add(1) wp.tasks <- task } func (wp *WorkerPool) Wait() { - wp.Wg.Wait() + wp.wg.Wait() close(wp.tasks) } func (wp *WorkerPool) worker() { for task := range wp.tasks { task() - wp.Wg.Done() + wp.wg.Done() } } From ce53d680bcce1708376ec475e6ae1992b6a83fb3 Mon Sep 17 00:00:00 2001 From: Azanul Date: Sun, 15 Oct 2023 22:55:34 +0530 Subject: [PATCH 4/5] refac(fetchResources): encapsulate Signed-off-by: Azanul --- internal/internal.go | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/internal/internal.go b/internal/internal.go index 1c55b6de5..29d8ea202 100644 --- 
a/internal/internal.go +++ b/internal/internal.go @@ -81,14 +81,7 @@ func Exec(address string, port int, configPath string, telemetry bool, a utils.A _, err = cron.Every(1).Hours().Do(func() { log.Info("Fetching resources workflow has started") - numWorkers := 64 - wp := providers.NewWorkerPool(numWorkers) - wp.Start() - err = fetchResources(ctx, clients, regions, telemetry, wp) - if err != nil { - log.Fatal(err) - } - wp.Wait() + fetchResources(ctx, clients, regions, telemetry) }) if err != nil { @@ -263,7 +256,10 @@ func triggerFetchingWorfklow(ctx context.Context, client providers.ProviderClien } } -func fetchResources(ctx context.Context, clients []providers.ProviderClient, regions []string, telemetry bool, wp *providers.WorkerPool) error { +func fetchResources(ctx context.Context, clients []providers.ProviderClient, regions []string, telemetry bool) { + numWorkers := 64 + wp := providers.NewWorkerPool(numWorkers) + wp.Start() var wwg sync.WaitGroup workflowTrigger := func(client providers.ProviderClient, provider string) { @@ -301,7 +297,7 @@ func fetchResources(ctx context.Context, clients []providers.ProviderClient, reg } wwg.Wait() - return nil + wp.Wait() } func checkUpgrade() { From 1d46d1dcaf166170532959644982b2dd03c49e47 Mon Sep 17 00:00:00 2001 From: Azanul Date: Sun, 15 Oct 2023 22:56:09 +0530 Subject: [PATCH 5/5] fear(fetchResources): benchmark test Signed-off-by: Azanul --- internal/internal_test.go | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 internal/internal_test.go diff --git a/internal/internal_test.go b/internal/internal_test.go new file mode 100644 index 000000000..f8bfc9ee9 --- /dev/null +++ b/internal/internal_test.go @@ -0,0 +1,37 @@ +package internal + +import ( + "context" + "io" + "testing" + + log "github.com/sirupsen/logrus" + + "github.com/tailwarden/komiser/internal/config" + "github.com/tailwarden/komiser/utils" +) + +// BenchmarkFetchResources benchmarks the fetchResources function.
+func BenchmarkFetchResources(b *testing.B) { + // Setup + ctx := context.TODO() + log.SetOutput(io.Discard) + analytics.Init() + cfg, clients, accounts, err := config.Load("/workspaces/komiser/config.toml", false, analytics, db) + if err != nil { + b.Fatalf("Error during config setup: %v", err) + } + err = setupDBConnection(cfg) + if err != nil { + b.Fatalf("Error during DB setup: %v", err) + } + err = utils.SetupSchema(db, cfg, accounts) + if err != nil { + b.Fatalf("Error during schema setup: %v", err) + } + + // The benchmark function will run b.N times + for i := 0; i < b.N; i++ { + fetchResources(ctx, clients, []string{}, false) + } +}