Skip to content

Commit

Permalink
Merge pull request #1065 from titanventura/concurrent-resource-fetchi…
Browse files Browse the repository at this point in the history
…ng-per-provider

feat: concurrent resource fetching per provider
  • Loading branch information
mlabouardy authored Oct 16, 2023
2 parents e9cdfd6 + 1d46d1d commit ee52e42
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 36 deletions.
54 changes: 34 additions & 20 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"time"

"github.com/getsentry/sentry-go"
Expand Down Expand Up @@ -54,7 +55,7 @@ var Arch = runtime.GOARCH
var db *bun.DB
var analytics utils.Analytics

func Exec(address string, port int, configPath string, telemetry bool, a utils.Analytics, regions []string, cmd *cobra.Command) error {
func Exec(address string, port int, configPath string, telemetry bool, a utils.Analytics, regions []string, _ *cobra.Command) error {
analytics = a

ctx := context.Background()
Expand All @@ -79,10 +80,8 @@ func Exec(address string, port int, configPath string, telemetry bool, a utils.A

_, err = cron.Every(1).Hours().Do(func() {
log.Info("Fetching resources workflow has started")
err = fetchResources(ctx, clients, regions, telemetry)
if err != nil {
log.Fatal(err)
}

fetchResources(ctx, clients, regions, telemetry)
})

if err != nil {
Expand Down Expand Up @@ -209,7 +208,7 @@ func setupDBConnection(c *models.Config) error {
return nil
}

func triggerFetchingWorfklow(ctx context.Context, client providers.ProviderClient, provider string, telemetry bool, regions []string) {
func triggerFetchingWorfklow(ctx context.Context, client providers.ProviderClient, provider string, telemetry bool, regions []string, wp *providers.WorkerPool) {
localHub := sentry.CurrentHub().Clone()

defer func() {
Expand All @@ -233,7 +232,7 @@ func triggerFetchingWorfklow(ctx context.Context, client providers.ProviderClien

switch provider {
case "AWS":
aws.FetchResources(ctx, client, regions, db, telemetry, analytics)
aws.FetchResources(ctx, client, regions, db, telemetry, analytics, wp)
case "DigitalOcean":
do.FetchResources(ctx, client, db, telemetry, analytics)
case "OCI":
Expand All @@ -257,33 +256,48 @@ func triggerFetchingWorfklow(ctx context.Context, client providers.ProviderClien
}
}

func fetchResources(ctx context.Context, clients []providers.ProviderClient, regions []string, telemetry bool) error {
func fetchResources(ctx context.Context, clients []providers.ProviderClient, regions []string, telemetry bool) {
numWorkers := 64
wp := providers.NewWorkerPool(numWorkers)
wp.Start()

var wwg sync.WaitGroup
workflowTrigger := func(client providers.ProviderClient, provider string) {
wwg.Add(1)
go func() {
defer wwg.Done()
triggerFetchingWorfklow(ctx, client, provider, telemetry, regions, wp)
}()
}

for _, client := range clients {
if client.AWSClient != nil {
go triggerFetchingWorfklow(ctx, client, "AWS", telemetry, regions)
workflowTrigger(client, "AWS")
} else if client.DigitalOceanClient != nil {
go triggerFetchingWorfklow(ctx, client, "DigitalOcean", telemetry, regions)
workflowTrigger(client, "DigitalOcean")
} else if client.OciClient != nil {
go triggerFetchingWorfklow(ctx, client, "OCI", telemetry, regions)
workflowTrigger(client, "OCI")
} else if client.CivoClient != nil {
go triggerFetchingWorfklow(ctx, client, "Civo", telemetry, regions)
workflowTrigger(client, "Civo")
} else if client.K8sClient != nil {
go triggerFetchingWorfklow(ctx, client, "Kubernetes", telemetry, regions)
workflowTrigger(client, "Kubernetes")
} else if client.LinodeClient != nil {
go triggerFetchingWorfklow(ctx, client, "Linode", telemetry, regions)
workflowTrigger(client, "Linode")
} else if client.TencentClient != nil {
go triggerFetchingWorfklow(ctx, client, "Tencent", telemetry, regions)
workflowTrigger(client, "Tencent")
} else if client.AzureClient != nil {
go triggerFetchingWorfklow(ctx, client, "Azure", telemetry, regions)
workflowTrigger(client, "Azure")
} else if client.ScalewayClient != nil {
go triggerFetchingWorfklow(ctx, client, "Scaleway", telemetry, regions)
workflowTrigger(client, "Scaleway")
} else if client.MongoDBAtlasClient != nil {
go triggerFetchingWorfklow(ctx, client, "MongoDBAtlas", telemetry, regions)
workflowTrigger(client, "MongoDBAtlas")
} else if client.GCPClient != nil {
go triggerFetchingWorfklow(ctx, client, "GCP", telemetry, regions)
workflowTrigger(client, "GCP")
}
}
return nil

wwg.Wait()
wp.Wait()
}

func checkUpgrade() {
Expand Down
37 changes: 37 additions & 0 deletions internal/internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package internal

import (
"context"
"io"
"testing"

log "github.com/sirupsen/logrus"

"github.com/tailwarden/komiser/internal/config"
"github.com/tailwarden/komiser/utils"
)

// BenchmarkFetchResources measures one full fetchResources pass over the
// provider clients loaded from the Komiser configuration file.
//
// It reads the configuration from a fixed development-container path and
// needs the database set up from that configuration, so it only runs where
// both are available.
func BenchmarkFetchResources(b *testing.B) {
	// Setup
	ctx := context.TODO()
	log.SetOutput(io.Discard) // keep log lines out of the benchmark output
	analytics.Init()
	cfg, clients, accounts, err := config.Load("/workspaces/komiser/config.toml", false, analytics, db)
	if err != nil {
		b.Fatalf("Error during config setup: %v", err)
	}
	err = setupDBConnection(cfg)
	if err != nil {
		b.Fatalf("Error during DB setup: %v", err)
	}
	err = utils.SetupSchema(db, cfg, accounts)
	if err != nil {
		b.Fatalf("Error during schema setup: %v", err)
	}

	// Exclude the config, database and schema setup above from the
	// measurement so only fetchResources is timed.
	b.ResetTimer()

	// The benchmark function will run b.N times
	for i := 0; i < b.N; i++ {
		fetchResources(ctx, clients, []string{}, false)
	}
}
34 changes: 18 additions & 16 deletions providers/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func listOfSupportedServices() []providers.FetchDataFunction {
}
}

func FetchResources(ctx context.Context, client providers.ProviderClient, regions []string, db *bun.DB, telemetry bool, analytics utils.Analytics) {
func FetchResources(ctx context.Context, client providers.ProviderClient, regions []string, db *bun.DB, telemetry bool, analytics utils.Analytics, wp *providers.WorkerPool) {
listOfSupportedRegions := getRegions()
if len(regions) > 0 {
log.Infof("Komiser will fetch resources from the following regions: %s", strings.Join(regions, ","))
Expand All @@ -108,23 +108,25 @@ func FetchResources(ctx context.Context, client providers.ProviderClient, region
for _, region := range listOfSupportedRegions {
client.AWSClient.Region = region
for _, fetchResources := range listOfSupportedServices() {
resources, err := fetchResources(ctx, client)
if err != nil {
log.Warnf("[%s][AWS] %s", client.Name, err)
} else {
for _, resource := range resources {
_, err = db.NewInsert().Model(&resource).On("CONFLICT (resource_id) DO UPDATE").Set("cost = EXCLUDED.cost, relations=EXCLUDED.relations").Exec(context.Background())
if err != nil {
log.WithError(err).Errorf("db trigger failed")
wp.SubmitTask(func() {
resources, err := fetchResources(ctx, client)
if err != nil {
log.Warnf("[%s][AWS] %s", client.Name, err)
} else {
for _, resource := range resources {
_, err = db.NewInsert().Model(&resource).On("CONFLICT (resource_id) DO UPDATE").Set("cost = EXCLUDED.cost, relations=EXCLUDED.relations").Exec(context.Background())
if err != nil {
log.WithError(err).Errorf("db trigger failed")
}
}
if telemetry {
analytics.TrackEvent("discovered_resources", map[string]interface{}{
"provider": "AWS",
"resources": len(resources),
})
}
}
if telemetry {
analytics.TrackEvent("discovered_resources", map[string]interface{}{
"provider": "AWS",
"resources": len(resources),
})
}
}
})
}
}
}
Expand Down
37 changes: 37 additions & 0 deletions providers/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package providers

import (
"context"
"sync"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/aws/aws-sdk-go-v2/aws"
Expand Down Expand Up @@ -47,3 +48,39 @@ type K8sClient struct {
Client *kubernetes.Clientset
OpencostBaseUrl string
}

// WorkerPool runs submitted tasks on a fixed number of goroutines.
//
// Typical use is NewWorkerPool, Start, any number of SubmitTask calls, and
// then Wait once every producer has finished submitting.
type WorkerPool struct {
	numWorkers int            // number of goroutines launched by Start
	tasks      chan func()    // unbuffered hand-off from SubmitTask to the workers
	wg         sync.WaitGroup // counts tasks submitted but not yet finished
	closeOnce  sync.Once      // lets Wait be called more than once without a double close
}

// NewWorkerPool returns a pool that will run tasks on numWorkers goroutines
// once Start is called. Values below 1 are raised to 1, because a pool with no
// workers would block forever on the first SubmitTask.
func NewWorkerPool(numWorkers int) *WorkerPool {
	if numWorkers < 1 {
		numWorkers = 1
	}
	return &WorkerPool{
		numWorkers: numWorkers,
		tasks:      make(chan func()),
	}
}

// Start launches the pool's worker goroutines. They keep running until Wait
// closes the task channel.
func (wp *WorkerPool) Start() {
	for i := 0; i < wp.numWorkers; i++ {
		go wp.worker()
	}
}

// SubmitTask hands task to a worker, blocking until one is free to receive it.
// A nil task is ignored: calling it inside a worker goroutine would panic and
// bring the whole process down. SubmitTask must not be called after Wait.
func (wp *WorkerPool) SubmitTask(task func()) {
	if task == nil {
		return
	}
	// Add before the send so Wait cannot observe a zero counter while the
	// task is in flight.
	wp.wg.Add(1)
	wp.tasks <- task
}

// Wait blocks until every submitted task has finished and then closes the
// task channel so the workers exit. Call it only after all SubmitTask calls
// have returned. Calling Wait again is a no-op.
func (wp *WorkerPool) Wait() {
	wp.wg.Wait()
	wp.closeOnce.Do(func() {
		close(wp.tasks)
	})
}

// worker runs tasks from the channel until it is closed.
func (wp *WorkerPool) worker() {
	for task := range wp.tasks {
		wp.run(task)
	}
}

// run executes a single task. Done is deferred so the WaitGroup stays balanced
// even when the task ends its goroutine early (for example via runtime.Goexit).
func (wp *WorkerPool) run(task func()) {
	defer wp.wg.Done()
	task()
}

0 comments on commit ee52e42

Please sign in to comment.