Skip to content

Commit

Permalink
Merge pull request #18 from martijnvdp/concurrency
Browse files Browse the repository at this point in the history
add concurrency
  • Loading branch information
martijnvdp authored Mar 16, 2023
2 parents 6b7bfbf + 2004a10 commit e5d9721
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 37 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/gotests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
-
name: Go Cyclo test
run: |
gocyclo -over 19 -ignore 'external' .
gocyclo -over 25 -ignore 'external' .
-
name: ineffassign test
run: |
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ Lambda event data:
"repositories": [ // optional if not specified it wil syn call repos that are configured with tags
"arn:aws:ecr:us-east-1:123456789012:repository/dev/datadog/datadog-operator","arn:aws:ecr:us-east-1:123456789012:repository/dev/datadog/datadog"]
"check_digest": true // check digest of existing tags on ecr and only add tags if the digest is not the same
"concurrent": 2 // max number of concurrent jobs
"max_results": 5
"slack_channel_id":"CDDF324"
"slack_errors_only": true // only return errors to slack
Expand Down
38 changes: 21 additions & 17 deletions pkg/lambda/check_digest.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,39 @@
package lambda

import (
"crypto/sha256"
"encoding/hex"
"log"
"strings"

"github.com/google/go-containerregistry/pkg/authn"
"github.com/google/go-containerregistry/pkg/name"
"github.com/google/go-containerregistry/pkg/v1/remote"
"github.com/google/go-containerregistry/pkg/crane"
v1 "github.com/google/go-containerregistry/pkg/v1"
)

func getDigest(source string) (string, error) {
ref, err := name.ParseReference(source)
if err != nil {
panic(err)
}

img, err := remote.Image(ref, remote.WithAuthFromKeychain(authn.DefaultKeychain))
if err != nil && strings.Contains(err.Error(), "unsupported MediaType: \"application/vnd.docker.distribution.manifest.v1") {
return "", nil
}
if err != nil {
panic(err)
params := crane.Options{
Platform: &v1.Platform{
Architecture: "amd64",
OS: "linux",
},
}
opts := []crane.Option{crane.WithPlatform(params.Platform)}

digest, err := img.Digest()
manifest, err := crane.Manifest(source, opts...)
if err != nil && strings.Contains(err.Error(), "unsupported MediaType: \"application/vnd.docker.distribution.manifest.v1") {
return "", nil
}
if err != nil {
panic(err)
if err != nil && strings.Contains(err.Error(), "You have reached your pull rate limit.") {
log.Printf("Pull rate limit exceeded for %s", source)
return "", nil
}
return digest.String(), err

hash := sha256.New()
hash.Write(manifest)
digest := "sha256:" + hex.EncodeToString(hash.Sum(nil))

return digest, err
}

func checkNoDigest(imageName string, resultPublicRepoTags *[]string, resultsFromEcr *map[string]ecrResults) (result []string, err error) {
Expand Down
1 change: 1 addition & 0 deletions pkg/lambda/ecr.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ func (svc *ecrClient) getTagsToSync(i *inputRepository, ecrImageName string, max

return syncOptions{
tags: tags,
source: i.source,
ecrImageName: ecrImageName,
}, err
}
Expand Down
41 changes: 30 additions & 11 deletions pkg/lambda/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
)

// LambdaEvent lambda input event data, fields have to be exported
type LambdaEvent struct {
Action string `json:"action"` // s3 or sync
CheckDigest bool `json:"check_digest"`
Concurrent int `json:"concurrent"` // number of concurrent syncs
Repositories []string `json:"repositories"`
MaxResults int `json:"max_results"`
SlackChannelID string `json:"slack_channel_id"`
Expand Down Expand Up @@ -145,29 +147,46 @@ func Start(ctx context.Context, event LambdaEvent) (response, error) {
}
log.Printf("Starting lambda for %s repositories", strconv.Itoa(len(repositories)))

for _, i := range repositories {
log.Printf("Processing repository: %s", i.source)
tagsToSync, err := svc.getTagsToSync(&i, i.ecrImageName, event.MaxResults, event.CheckDigest, environmentVars)

if err != nil {
return returnErr(err, environmentVars.slackOAuthToken, event.SlackChannelID, errSubject,
"Error getting tags to sync:")
totalItems, max := len(repositories), maxInt(event.Concurrent, 1)
var allTagsToSync []syncOptions
var wg sync.WaitGroup
var mu sync.Mutex
for i := 0; i < totalItems; i += max {
limit := max
if i+max > totalItems {
limit = totalItems - i
}

if len(tagsToSync.tags) <= 0 {
continue
wg.Add(limit)
for j := 0; j < limit; j++ {
repo := repositories[i+j]
log.Printf("Processing repository: %s", repo.source)
go func(j int) {
defer wg.Done()
tagsToSync, err := svc.getTagsToSync(&repo, repo.ecrImageName, event.MaxResults, event.CheckDigest, environmentVars)
if err != nil {
log.Fatal(err)
}
mu.Lock()
allTagsToSync = append(allTagsToSync, tagsToSync)
mu.Unlock()
}(j)

}
wg.Wait()
}

for _, tagsToSync := range allTagsToSync {
switch {
case event.Action == "s3":
csvOutput, err := buildCSVFile(i.source, tagsToSync, environmentVars)
csvOutput, err := buildCSVFile(tagsToSync, environmentVars)
if err != nil {
return returnErr(err, environmentVars.slackOAuthToken, event.SlackChannelID, errSubject,
"Error building csv output:")
}
csvContent = append(csvContent, csvOutput...)
default:
err = svc.syncImages(i.source, tagsToSync, environmentVars)
err = svc.syncImages(tagsToSync, environmentVars)
if err != nil {
return returnErr(err, environmentVars.slackOAuthToken, event.SlackChannelID, errSubject,
"Error syncing repositories:")
Expand Down
4 changes: 2 additions & 2 deletions pkg/lambda/output_to_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ func createZipFile(file string, target string) error {
return z.Close()
}

func buildCSVFile(source string, options syncOptions, env environmentVars) (csvContent []csvFormat, err error) {
func buildCSVFile(options syncOptions, env environmentVars) (csvContent []csvFormat, err error) {
for _, tag := range options.tags {
csvContent = append(csvContent, csvFormat{
source: source,
source: options.source,
imageECRURL: env.awsAccount + `.dkr.ecr.` + env.awsRegion + `.amazonaws.com/` + options.ecrImageName,
imageTag: tag,
})
Expand Down
5 changes: 2 additions & 3 deletions pkg/lambda/output_to_s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ func Test_createZipFile(t *testing.T) {

func Test_buildCSVFile(t *testing.T) {
type args struct {
source string
options syncOptions
env environmentVars
}
Expand All @@ -41,8 +40,8 @@ func Test_buildCSVFile(t *testing.T) {
{
name: "TestbuildCSV",
args: args{
source: "gcr.io/datadoghq/agent",
options: syncOptions{
source: "gcr.io/datadoghq/agent",
ecrImageName: "dev/datadoghq/agent",
tags: []string{"v7.32.0", "v7.31.0", "v7.28.0"},
},
Expand All @@ -57,7 +56,7 @@ func Test_buildCSVFile(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotCsvContent, err := buildCSVFile(tt.args.source, tt.args.options, tt.args.env)
gotCsvContent, err := buildCSVFile(tt.args.options, tt.args.env)
if (err != nil) != tt.wantErr {
t.Errorf("buildCSVFile() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
7 changes: 4 additions & 3 deletions pkg/lambda/sync_images.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type loginOptions struct {

type syncOptions struct {
tags []string
source string
ecrImageName string
}

Expand Down Expand Up @@ -82,7 +83,7 @@ func (svc *ecrClient) copyImageWithCrane(imageName, tag, awsPrefix, ecrImageName
return nil
}

func (svc *ecrClient) syncImages(imageName string, options syncOptions, env environmentVars) error {
func (svc *ecrClient) syncImages(options syncOptions, env environmentVars) error {
awsPrefix := env.awsAccount + ".dkr.ecr." + env.awsRegion + ".amazonaws.com"
log.Printf("add login for %v", awsPrefix)
awsAuthData, err := svc.getECRAuthData()
Expand All @@ -104,8 +105,8 @@ func (svc *ecrClient) syncImages(imageName string, options syncOptions, env envi
}

for _, tag := range options.tags {
log.Printf("copying %s:%s to %s/%s:%s", imageName, tag, awsPrefix, options.ecrImageName, tag)
err := svc.copyImageWithCrane(imageName, tag, awsPrefix, options.ecrImageName)
log.Printf("copying %s:%s to %s/%s:%s", options.source, tag, awsPrefix, options.ecrImageName, tag)
err := svc.copyImageWithCrane(options.source, tag, awsPrefix, options.ecrImageName)

if err != nil {
log.Println("error copying image: ", err)
Expand Down
1 change: 1 addition & 0 deletions test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ func main() {
os.Setenv("AWS_ACCOUNT_ID", "1234")

lambdaEvent := ecrImageSync.LambdaEvent{
Concurrent: 2,
CheckDigest: true,
MaxResults: 5,
}
Expand Down

0 comments on commit e5d9721

Please sign in to comment.