Merge pull request #156 from Pix4D/PCI-2125_non_naive_s3_resource_check
Non naive s3 resource check
xtremerui authored Mar 2, 2022
2 parents 80c54bd + 6faa4ee commit 5d50637
Showing 8 changed files with 444 additions and 149 deletions.
17 changes: 13 additions & 4 deletions README.md
@@ -56,10 +56,11 @@ version numbers.

One of the following two options must be specified:

* `regexp`: *Optional.* The pattern to match filenames against within S3. The first
grouped match is used to extract the version, or if a group is explicitly
named `version`, that group is used. At least one capture group must be
specified, with parentheses.
* `regexp`: *Optional.* The forward-slash (`/`) delimited sequence of patterns to
match against the sub-directories and filenames of the objects stored within
the S3 bucket. The first grouped match is used to extract the version, or if
a group is explicitly named `version`, that group is used. At least one
capture group must be specified, with parentheses.

The version extracted from this pattern is used to version the resource.
Semantic versions, or just numbers, are supported. Accordingly, full regular
@@ -255,6 +256,14 @@ docker build . -t s3-resource --target tests -f dockerfiles/ubuntu/Dockerfile \
--build-arg S3_ENDPOINT="https://s3.amazonaws.com"
```

##### Speeding up integration tests by skipping large file upload

One of the integration tests uploads a large file (>40GB) and so can be slow.
It can be skipped by adding the following option when running the tests:
```
--build-arg S3_TESTING_NO_LARGE_UPLOAD=true
```

##### Integration tests using role assumption

If `S3_TESTING_AWS_ROLE_ARN` is set to a role ARN, this role will be assumed for accessing
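The `regexp` documented above extracts the version from the first capture group, or from a group explicitly named `version`. A minimal Go sketch of that extraction rule (the pattern and file name are illustrative, not from this commit):

```
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Hypothetical pattern with an explicitly named `version` group.
	pattern := regexp.MustCompile(`files/abc-(?P<version>.*)\.tgz`)

	match := pattern.FindStringSubmatch("files/abc-2.33.333.tgz")
	if match == nil {
		fmt.Println("no match")
		return
	}

	// SubexpIndex reports -1 when no group is named `version`; fall back
	// to the first capture group in that case, as the README describes.
	idx := pattern.SubexpIndex("version")
	if idx == -1 {
		idx = 1
	}
	fmt.Println(match[idx]) // 2.33.333
}
```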
35 changes: 29 additions & 6 deletions check/command_test.go
@@ -7,7 +7,7 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"github.com/concourse/s3-resource"
s3resource "github.com/concourse/s3-resource"
"github.com/concourse/s3-resource/fakes"

. "github.com/concourse/s3-resource/check"
@@ -37,11 +37,24 @@ var _ = Describe("Check Command", func() {
s3client = &fakes.FakeS3Client{}
command = NewCommand(s3client)

s3client.BucketFilesReturns([]string{
"files/abc-0.0.1.tgz",
"files/abc-2.33.333.tgz",
"files/abc-2.4.3.tgz",
"files/abc-3.53.tgz",
s3client.ChunkedBucketListReturnsOnCall(0, s3resource.BucketListChunk{
Truncated: false,
ContinuationToken: nil,
CommonPrefixes: []string{"files/abc-3/"},
Paths: []string{
"files/abc-0.0.1.tgz",
"files/abc-2.33.333.tgz",
"files/abc-2.4.3.tgz",
"files/abc-3.53.tgz",
},
}, nil)
s3client.ChunkedBucketListReturnsOnCall(1, s3resource.BucketListChunk{
Truncated: false,
ContinuationToken: nil,
Paths: []string{
"files/abc-3/53.tgz",
"files/abc-3/no-magic",
},
}, nil)
})

@@ -123,6 +136,16 @@ var _ = Describe("Check Command", func() {
Expect(response).To(ConsistOf(s3resource.Version{Path: "files/abc-2.33.333.tgz"}))
})
})

Context("when the regexp does not contain any magic regexp char", func() {
It("does not explode", func() {
request.Source.Regexp = "files/abc-3/no-magic"
response, err := command.Run(request)
Ω(err).ShouldNot(HaveOccurred())

Ω(response).Should(HaveLen(0))
})
})
})

Context("when there is a previous version", func() {
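The fixtures above sketch the new listing contract: the first `ChunkedBucketList` call returns the top-level files plus the common prefix `files/abc-3/`, and the second call lists inside that prefix. The corresponding `check/command.go` changes are not shown in this excerpt, but a hypothetical traversal over those chunks could look like the following (the `matchingPaths` helper and its signature are illustrative only, not the commit's implementation):

```
import (
	"regexp"
	"strings"

	s3resource "github.com/concourse/s3-resource"
)

// matchingPaths walks the bucket one delimiter level at a time, descending
// only into sub-directories that match the current slash-delimited segment
// of the source regexp.
func matchingPaths(client s3resource.S3Client, bucket, prefix string, segments []string) ([]string, error) {
	// QuoteMeta keeps the literal listing prefix from being read as regexp syntax.
	segment, err := regexp.Compile("^" + regexp.QuoteMeta(prefix) + segments[0] + "$")
	if err != nil {
		return nil, err
	}
	var matches []string
	var token *string
	for {
		chunk, err := client.ChunkedBucketList(bucket, prefix, token)
		if err != nil {
			return nil, err
		}
		if len(segments) == 1 {
			// Final segment: only object keys can complete a match.
			for _, path := range chunk.Paths {
				if segment.MatchString(path) {
					matches = append(matches, path)
				}
			}
		} else {
			// Intermediate segment: recurse into matching sub-directories.
			for _, dir := range chunk.CommonPrefixes {
				if segment.MatchString(strings.TrimSuffix(dir, "/")) {
					deeper, err := matchingPaths(client, bucket, dir, segments[1:])
					if err != nil {
						return nil, err
					}
					matches = append(matches, deeper...)
				}
			}
		}
		if !chunk.Truncated {
			return matches, nil
		}
		token = chunk.ContinuationToken
	}
}
```

For example, with `regexp: files/abc-3/(.*)\.tgz`, such a traversal would descend into the `files/abc-3/` common prefix and match `files/abc-3/53.tgz` but not `files/abc-3/no-magic`.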
1 change: 1 addition & 0 deletions dockerfiles/alpine/Dockerfile
@@ -27,6 +27,7 @@ ARG S3_TESTING_AWS_ROLE_ARN
ARG S3_VERSIONED_TESTING_BUCKET
ARG S3_TESTING_BUCKET
ARG S3_TESTING_REGION
ARG S3_TESTING_NO_LARGE_UPLOAD
ARG S3_ENDPOINT
ARG TEST_SESSION_TOKEN
COPY --from=builder /tests /go-tests
83 changes: 83 additions & 0 deletions fakes/fake_s3client.go
@@ -36,6 +36,21 @@ type FakeS3Client struct {
result1 []string
result2 error
}
ChunkedBucketListStub func(string, string, *string) (s3resource.BucketListChunk, error)
chunkedBucketListMutex sync.RWMutex
chunkedBucketListArgsForCall []struct {
arg1 string
arg2 string
arg3 *string
}
chunkedBucketListReturns struct {
result1 s3resource.BucketListChunk
result2 error
}
chunkedBucketListReturnsOnCall map[int]struct {
result1 s3resource.BucketListChunk
result2 error
}
DeleteFileStub func(string, string) error
deleteFileMutex sync.RWMutex
deleteFileArgsForCall []struct {
@@ -265,6 +280,72 @@ func (fake *FakeS3Client) BucketFilesReturnsOnCall(i int, result1 []string, resu
}{result1, result2}
}

func (fake *FakeS3Client) ChunkedBucketList(arg1 string, arg2 string, arg3 *string) (s3resource.BucketListChunk, error) {
fake.chunkedBucketListMutex.Lock()
ret, specificReturn := fake.chunkedBucketListReturnsOnCall[len(fake.chunkedBucketListArgsForCall)]
fake.chunkedBucketListArgsForCall = append(fake.chunkedBucketListArgsForCall, struct {
arg1 string
arg2 string
arg3 *string
}{arg1, arg2, arg3})
stub := fake.ChunkedBucketListStub
fakeReturns := fake.chunkedBucketListReturns
fake.recordInvocation("ChunkedBucketList", []interface{}{arg1, arg2, arg3})
fake.chunkedBucketListMutex.Unlock()
if stub != nil {
return stub(arg1, arg2, arg3)
}
if specificReturn {
return ret.result1, ret.result2
}
return fakeReturns.result1, fakeReturns.result2
}

func (fake *FakeS3Client) ChunkedBucketListCallCount() int {
fake.chunkedBucketListMutex.RLock()
defer fake.chunkedBucketListMutex.RUnlock()
return len(fake.chunkedBucketListArgsForCall)
}

func (fake *FakeS3Client) ChunkedBucketListCalls(stub func(string, string, *string) (s3resource.BucketListChunk, error)) {
fake.chunkedBucketListMutex.Lock()
defer fake.chunkedBucketListMutex.Unlock()
fake.ChunkedBucketListStub = stub
}

func (fake *FakeS3Client) ChunkedBucketListArgsForCall(i int) (string, string, *string) {
fake.chunkedBucketListMutex.RLock()
defer fake.chunkedBucketListMutex.RUnlock()
argsForCall := fake.chunkedBucketListArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}

func (fake *FakeS3Client) ChunkedBucketListReturns(result1 s3resource.BucketListChunk, result2 error) {
fake.chunkedBucketListMutex.Lock()
defer fake.chunkedBucketListMutex.Unlock()
fake.ChunkedBucketListStub = nil
fake.chunkedBucketListReturns = struct {
result1 s3resource.BucketListChunk
result2 error
}{result1, result2}
}

func (fake *FakeS3Client) ChunkedBucketListReturnsOnCall(i int, result1 s3resource.BucketListChunk, result2 error) {
fake.chunkedBucketListMutex.Lock()
defer fake.chunkedBucketListMutex.Unlock()
fake.ChunkedBucketListStub = nil
if fake.chunkedBucketListReturnsOnCall == nil {
fake.chunkedBucketListReturnsOnCall = make(map[int]struct {
result1 s3resource.BucketListChunk
result2 error
})
}
fake.chunkedBucketListReturnsOnCall[i] = struct {
result1 s3resource.BucketListChunk
result2 error
}{result1, result2}
}

func (fake *FakeS3Client) DeleteFile(arg1 string, arg2 string) error {
fake.deleteFileMutex.Lock()
ret, specificReturn := fake.deleteFileReturnsOnCall[len(fake.deleteFileArgsForCall)]
@@ -713,6 +794,8 @@ func (fake *FakeS3Client) Invocations() map[string][][]interface{} {
defer fake.bucketFileVersionsMutex.RUnlock()
fake.bucketFilesMutex.RLock()
defer fake.bucketFilesMutex.RUnlock()
fake.chunkedBucketListMutex.RLock()
defer fake.chunkedBucketListMutex.RUnlock()
fake.deleteFileMutex.RLock()
defer fake.deleteFileMutex.RUnlock()
fake.deleteVersionedFileMutex.RLock()
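The new fake follows the standard counterfeiter shape: `ReturnsOnCall` queues per-call results, `Returns` sets a default, and `CallCount`/`ArgsForCall` support assertions afterwards. A short usage fragment, as it might appear inside a Ginkgo test body (the bucket name and values are made up):

```
fake := &fakes.FakeS3Client{}

// Queue a single, non-truncated chunk for the first call.
fake.ChunkedBucketListReturnsOnCall(0, s3resource.BucketListChunk{
	Truncated: false,
	Paths:     []string{"files/abc-0.0.1.tgz"},
}, nil)

chunk, err := fake.ChunkedBucketList("my-bucket", "files/", nil)

Expect(err).NotTo(HaveOccurred())
Expect(chunk.Paths).To(ConsistOf("files/abc-0.0.1.tgz"))

// The recorded invocation can be asserted after the call.
Expect(fake.ChunkedBucketListCallCount()).To(Equal(1))
bucket, prefix, token := fake.ChunkedBucketListArgsForCall(0)
Expect(bucket).To(Equal("my-bucket"))
Expect(prefix).To(Equal("files/"))
Expect(token).To(BeNil())
```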
8 changes: 6 additions & 2 deletions integration/out_test.go
@@ -11,14 +11,14 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/concourse/s3-resource"
s3resource "github.com/concourse/s3-resource"
"github.com/concourse/s3-resource/out"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
"github.com/onsi/gomega/gexec"

"github.com/nu7hatch/gouuid"
uuid "github.com/nu7hatch/gouuid"
)

var _ = Describe("out", func() {
@@ -300,6 +300,10 @@ var _ = Describe("out", func() {

Context("with a large file that is multiple of MaxUploadParts", func() {
BeforeEach(func() {
if os.Getenv("S3_TESTING_NO_LARGE_UPLOAD") != "" {
Skip("'S3_TESTING_NO_LARGE_UPLOAD' is set, skipping.")
}

path := filepath.Join(sourceDir, "large-file-to-upload")

// touch the file
120 changes: 63 additions & 57 deletions s3client.go
@@ -5,13 +5,14 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"io"
"io/ioutil"
"os"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws/credentials/stscreds"

"net/http"

"github.com/aws/aws-sdk-go/aws"
@@ -28,6 +29,8 @@ type S3Client interface {
BucketFiles(bucketName string, prefixHint string) ([]string, error)
BucketFileVersions(bucketName string, remotePath string) ([]string, error)

ChunkedBucketList(bucketName string, prefix string, continuationToken *string) (BucketListChunk, error)

UploadFile(bucketName string, remotePath string, localPath string, options UploadFileOptions) (string, error)
DownloadFile(bucketName string, remotePath string, versionID string, localPath string) error

@@ -149,17 +152,24 @@ func NewAwsConfig(
return awsConfig
}

func (client *s3client) BucketFiles(bucketName string, prefixHint string) ([]string, error) {
entries, err := client.getBucketContents(bucketName, prefixHint)

if err != nil {
return []string{}, err
}

paths := make([]string, 0, len(entries))

for _, entry := range entries {
paths = append(paths, *entry.Key)
// BucketFiles returns all the files in bucketName immediately under directoryPrefix
func (client *s3client) BucketFiles(bucketName string, directoryPrefix string) ([]string, error) {
if !strings.HasSuffix(directoryPrefix, "/") {
directoryPrefix = directoryPrefix + "/"
}
var (
continuationToken *string
truncated bool
paths []string
)
for continuationToken, truncated = nil, true; truncated; {
s3ListChunk, err := client.ChunkedBucketList(bucketName, directoryPrefix, continuationToken)
if err != nil {
return []string{}, err
}
truncated = s3ListChunk.Truncated
continuationToken = s3ListChunk.ContinuationToken
paths = append(paths, s3ListChunk.Paths...)
}
return paths, nil
}
@@ -189,6 +199,47 @@ func (client *s3client) BucketFileVersions(bucketName string, remotePath string)
return versions, nil
}

type BucketListChunk struct {
Truncated bool
ContinuationToken *string
CommonPrefixes []string
Paths []string
}

// ChunkedBucketList lists the contents of the S3 bucket `bucketName` under `prefix` one chunk at a time
//
// The returned `BucketListChunk` contains part of the files and subdirectories
// present in `bucketName` under `prefix`. The files are listed in `Paths` and
// the subdirectories in `CommonPrefixes`. If the returned chunk does not
// include all the files and subdirectories, the `Truncated` flag will be set
// to `true` and the `ContinuationToken` can be used to retrieve the next chunk.
func (client *s3client) ChunkedBucketList(bucketName string, prefix string, continuationToken *string) (BucketListChunk, error) {
params := &s3.ListObjectsV2Input{
Bucket: aws.String(bucketName),
ContinuationToken: continuationToken,
Delimiter: aws.String("/"),
Prefix: aws.String(prefix),
}
response, err := client.client.ListObjectsV2(params)
if err != nil {
return BucketListChunk{}, err
}
commonPrefixes := make([]string, 0, len(response.CommonPrefixes))
paths := make([]string, 0, len(response.Contents))
for _, commonPrefix := range response.CommonPrefixes {
commonPrefixes = append(commonPrefixes, *commonPrefix.Prefix)
}
for _, path := range response.Contents {
paths = append(paths, *path.Key)
}
return BucketListChunk{
Truncated: *response.IsTruncated,
ContinuationToken: response.NextContinuationToken,
CommonPrefixes: commonPrefixes,
Paths: paths,
}, nil
}

func (client *s3client) UploadFile(bucketName string, remotePath string, localPath string, options UploadFileOptions) (string, error) {
uploader := s3manager.NewUploaderWithClient(client.client)

@@ -398,51 +449,6 @@ func (client *s3client) DeleteFile(bucketName string, remotePath string) error {
return err
}

func (client *s3client) getBucketContents(bucketName string, prefix string) (map[string]*s3.Object, error) {
bucketContents := map[string]*s3.Object{}
marker := ""

for {
listObjectsResponse, err := client.client.ListObjects(&s3.ListObjectsInput{
Bucket: aws.String(bucketName),
Prefix: aws.String(prefix),
Marker: aws.String(marker),
})

if err != nil {
return bucketContents, err
}

lastKey := ""

for _, key := range listObjectsResponse.Contents {
bucketContents[*key.Key] = key

lastKey = *key.Key
}

if *listObjectsResponse.IsTruncated {
prevMarker := marker
if listObjectsResponse.NextMarker == nil {
// From the s3 docs: If response does not include the
// NextMarker and it is truncated, you can use the value of the
// last Key in the response as the marker in the subsequent
// request to get the next set of object keys.
marker = lastKey
} else {
marker = *listObjectsResponse.NextMarker
}
if marker == prevMarker {
return nil, errors.New("Unable to list all bucket objects; perhaps this is a CloudFront S3 bucket that needs its `Query String Forwarding and Caching` set to `Forward all, cache based on all`?")
}
} else {
break
}
}

return bucketContents, nil
}

func (client *s3client) getBucketVersioning(bucketName string) (bool, error) {
params := &s3.GetBucketVersioningInput{
Bucket: aws.String(bucketName),
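The `Delimiter: aws.String("/")` in `ChunkedBucketList` is what splits a listing level into `Paths` (objects) and `CommonPrefixes` (sub-directories). Re-using the fixtures from `check/command_test.go` above, a hypothetical call against such a bucket would behave like this:

```
chunk, err := client.ChunkedBucketList("my-bucket", "files/", nil)
if err != nil {
	return err
}
// With the fixture objects above:
//   chunk.Paths          -> ["files/abc-0.0.1.tgz", ..., "files/abc-3.53.tgz"]
//   chunk.CommonPrefixes -> ["files/abc-3/"]
// A second call with prefix "files/abc-3/" would return
// ["files/abc-3/53.tgz", "files/abc-3/no-magic"] as Paths.
// When Truncated is true, pass ContinuationToken back as the third
// argument to fetch the next chunk, as BucketFiles does above.
```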
