diff --git a/Makefile b/Makefile index ad5b65af7f6..491bc5d4b3a 100644 --- a/Makefile +++ b/Makefile @@ -20,11 +20,7 @@ EACHMODULE_FAILFAST_FLAG=-fail-fast=${EACHMODULE_FAILFAST} EACHMODULE_CONCURRENCY ?= 1 EACHMODULE_CONCURRENCY_FLAG=-c ${EACHMODULE_CONCURRENCY} -# TODO: github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager is in beta -# avoid running any codegen operations / tests on it until then -# path is evaluated relatively by the tool, the right one will vary by the -# command context -EACHMODULE_SKIP ?= feature/s3/transfermanager:s3/transfermanager:transfermanager +EACHMODULE_SKIP ?= EACHMODULE_SKIP_FLAG=-skip="${EACHMODULE_SKIP}" EACHMODULE_FLAGS=${EACHMODULE_CONCURRENCY_FLAG} ${EACHMODULE_FAILFAST_FLAG} ${EACHMODULE_SKIP_FLAG} diff --git a/feature/s3/transfermanager/LICENSE.txt b/feature/s3/transfermanager/LICENSE.txt deleted file mode 100644 index d6456956733..00000000000 --- a/feature/s3/transfermanager/LICENSE.txt +++ /dev/null @@ -1,202 +0,0 @@ - - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. diff --git a/feature/s3/transfermanager/api.go b/feature/s3/transfermanager/api.go deleted file mode 100644 index dbe430dadb4..00000000000 --- a/feature/s3/transfermanager/api.go +++ /dev/null @@ -1,16 +0,0 @@ -package transfermanager - -import ( - "context" - - "github.com/aws/aws-sdk-go-v2/service/s3" -) - -// S3APIClient defines an interface doing S3 client side operations for transfer manager -type S3APIClient interface { - PutObject(context.Context, *s3.PutObjectInput, ...func(*s3.Options)) (*s3.PutObjectOutput, error) - UploadPart(context.Context, *s3.UploadPartInput, ...func(*s3.Options)) (*s3.UploadPartOutput, error) - CreateMultipartUpload(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) - CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput, ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error) - AbortMultipartUpload(context.Context, *s3.AbortMultipartUploadInput, ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) -} diff --git a/feature/s3/transfermanager/api_client.go b/feature/s3/transfermanager/api_client.go deleted file mode 100644 index 84ce16db425..00000000000 --- a/feature/s3/transfermanager/api_client.go +++ /dev/null @@ -1,51 +0,0 @@ -package transfermanager - -import ( - "github.com/aws/aws-sdk-go-v2/aws" -) - -const userAgentKey = "s3-transfer" - -// defaultMaxUploadParts is the maximum allowed number of parts in a multi-part upload -// on Amazon S3. -const defaultMaxUploadParts = 10000 - -// defaultPartSizeBytes is the default part size when transferring objects to/from S3 -const minPartSizeBytes = 1024 * 1024 * 8 - -// defaultMultipartUploadThreshold is the default size threshold in bytes indicating when to use multipart upload. -const defaultMultipartUploadThreshold = 1024 * 1024 * 16 - -// defaultTransferConcurrency is the default number of goroutines to spin up when -// using PutObject(). -const defaultTransferConcurrency = 5 - -// Client provides the API client to make operations call for Amazon Simple -// Storage Service's Transfer Manager -// It is safe to call Client methods concurrently across goroutines. -type Client struct { - options Options -} - -// New returns an initialized Client from the client Options. Provide -// more functional options to further configure the Client -func New(s3Client S3APIClient, opts Options, optFns ...func(*Options)) *Client { - opts.S3 = s3Client - for _, fn := range optFns { - fn(&opts) - } - - resolveConcurrency(&opts) - resolvePartSizeBytes(&opts) - resolveChecksumAlgorithm(&opts) - resolveMultipartUploadThreshold(&opts) - - return &Client{ - options: opts, - } -} - -// NewFromConfig returns a new Client from the provided s3 config -func NewFromConfig(s3Client S3APIClient, cfg aws.Config, optFns ...func(*Options)) *Client { - return New(s3Client, Options{}, optFns...) -} diff --git a/feature/s3/transfermanager/api_op_PutObject.go b/feature/s3/transfermanager/api_op_PutObject.go deleted file mode 100644 index 0785ec4e81e..00000000000 --- a/feature/s3/transfermanager/api_op_PutObject.go +++ /dev/null @@ -1,992 +0,0 @@ -package transfermanager - -import ( - "bytes" - "context" - "fmt" - "io" - "sort" - "sync" - "time" - - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/aws/middleware" - "github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager/types" - "github.com/aws/aws-sdk-go-v2/service/s3" - s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" - smithymiddleware "github.com/aws/smithy-go/middleware" -) - -// A MultipartUploadError wraps a failed S3 multipart upload. An error returned -// will satisfy this interface when a multi part upload failed to upload all -// chucks to S3. In the case of a failure the UploadID is needed to operate on -// the chunks, if any, which were uploaded. -// -// Example: -// -// c := transfermanager.New(client, opts) -// output, err := c.PutObject(context.Background(), input) -// if err != nil { -// var multierr transfermanager.MultipartUploadError -// if errors.As(err, &multierr) { -// fmt.Printf("upload failure UploadID=%s, %s\n", multierr.UploadID(), multierr.Error()) -// } else { -// fmt.Printf("upload failure, %s\n", err.Error()) -// } -// } -type MultipartUploadError interface { - error - - // UploadID returns the upload id for the S3 multipart upload that failed. - UploadID() string -} - -// A multipartUploadError wraps the upload ID of a failed s3 multipart upload. -// Composed of BaseError for code, message, and original error -// -// Should be used for an error that occurred failing a S3 multipart upload, -// and a upload ID is available. -type multipartUploadError struct { - err error - - // ID for multipart upload which failed. - uploadID string -} - -// Error returns the string representation of the error. -// -// Satisfies the error interface. -func (m *multipartUploadError) Error() string { - var extra string - if m.err != nil { - extra = fmt.Sprintf(", cause: %s", m.err.Error()) - } - return fmt.Sprintf("upload multipart failed, upload id: %s%s", m.uploadID, extra) -} - -// Unwrap returns the underlying error that cause the upload failure -func (m *multipartUploadError) Unwrap() error { - return m.err -} - -// UploadID returns the id of the S3 upload which failed. -func (m *multipartUploadError) UploadID() string { - return m.uploadID -} - -// PutObjectInput represents a request to the PutObject() call. It contains common fields -// of s3 PutObject and CreateMultipartUpload input -type PutObjectInput struct { - // Bucket the object is uploaded into - Bucket string - - // Object key for which the PUT action was initiated - Key string - - // Object data - Body io.Reader - - // The canned ACL to apply to the object. For more information, see [Canned ACL] in the Amazon - // S3 User Guide. - // - // When adding a new object, you can use headers to grant ACL-based permissions to - // individual Amazon Web Services accounts or to predefined groups defined by - // Amazon S3. These permissions are then added to the ACL on the object. By - // default, all objects are private. Only the owner has full access control. For - // more information, see [Access Control List (ACL) Overview]and [Managing ACLs Using the REST API] in the Amazon S3 User Guide. - // - // If the bucket that you're uploading objects to uses the bucket owner enforced - // setting for S3 Object Ownership, ACLs are disabled and no longer affect - // permissions. Buckets that use this setting only accept PUT requests that don't - // specify an ACL or PUT requests that specify bucket owner full control ACLs, such - // as the bucket-owner-full-control canned ACL or an equivalent form of this ACL - // expressed in the XML format. PUT requests that contain other ACLs (for example, - // custom grants to certain Amazon Web Services accounts) fail and return a 400 - // error with the error code AccessControlListNotSupported . For more information, - // see [Controlling ownership of objects and disabling ACLs]in the Amazon S3 User Guide. - // - // - This functionality is not supported for directory buckets. - // - // - This functionality is not supported for Amazon S3 on Outposts. - // - // [Managing ACLs Using the REST API]: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-using-rest-api.html - // [Access Control List (ACL) Overview]: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html - // [Canned ACL]: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#CannedACL - // [Controlling ownership of objects and disabling ACLs]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/about-object-ownership.html - ACL types.ObjectCannedACL - - // Specifies whether Amazon S3 should use an S3 Bucket Key for object encryption - // with server-side encryption using Key Management Service (KMS) keys (SSE-KMS). - // Setting this header to true causes Amazon S3 to use an S3 Bucket Key for object - // encryption with SSE-KMS. - // - // Specifying this header with a PUT action doesn’t affect bucket-level settings - // for S3 Bucket Key. - // - // This functionality is not supported for directory buckets. - BucketKeyEnabled bool - - // Can be used to specify caching behavior along the request/reply chain. For more - // information, see [http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.9]. - // - // [http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.9]: http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.9 - CacheControl string - - // Indicates the algorithm used to create the checksum for the object when you use - // the SDK. This header will not provide any additional functionality if you don't - // use the SDK. When you send this header, there must be a corresponding - // x-amz-checksum-algorithm or x-amz-trailer header sent. Otherwise, Amazon S3 - // fails the request with the HTTP status code 400 Bad Request . - // - // For the x-amz-checksum-algorithm header, replace algorithm with the - // supported algorithm from the following list: - // - // - CRC32 - // - // - CRC32C - // - // - SHA1 - // - // - SHA256 - // - // For more information, see [Checking object integrity] in the Amazon S3 User Guide. - // - // If the individual checksum value you provide through x-amz-checksum-algorithm - // doesn't match the checksum algorithm you set through - // x-amz-sdk-checksum-algorithm , Amazon S3 ignores any provided ChecksumAlgorithm - // parameter and uses the checksum algorithm that matches the provided value in - // x-amz-checksum-algorithm . - // - // For directory buckets, when you use Amazon Web Services SDKs, CRC32 is the - // default checksum algorithm that's used for performance. - // - // [Checking object integrity]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html - ChecksumAlgorithm types.ChecksumAlgorithm - - // Size of the body in bytes. This parameter is useful when the size of the body - // cannot be determined automatically. For more information, see [https://www.rfc-editor.org/rfc/rfc9110.html#name-content-length]. - // - // [https://www.rfc-editor.org/rfc/rfc9110.html#name-content-length]: https://www.rfc-editor.org/rfc/rfc9110.html#name-content-length - ContentLength int64 - - // Specifies presentational information for the object. For more information, see [https://www.rfc-editor.org/rfc/rfc6266#section-4]. - // - // [https://www.rfc-editor.org/rfc/rfc6266#section-4]: https://www.rfc-editor.org/rfc/rfc6266#section-4 - ContentDisposition string - - // Specifies what content encodings have been applied to the object and thus what - // decoding mechanisms must be applied to obtain the media-type referenced by the - // Content-Type header field. For more information, see [https://www.rfc-editor.org/rfc/rfc9110.html#field.content-encoding]. - // - // [https://www.rfc-editor.org/rfc/rfc9110.html#field.content-encoding]: https://www.rfc-editor.org/rfc/rfc9110.html#field.content-encoding - ContentEncoding string - - // The language the content is in. - ContentLanguage string - - // A standard MIME type describing the format of the contents. For more - // information, see [https://www.rfc-editor.org/rfc/rfc9110.html#name-content-type]. - // - // [https://www.rfc-editor.org/rfc/rfc9110.html#name-content-type]: https://www.rfc-editor.org/rfc/rfc9110.html#name-content-type - ContentType string - - // The account ID of the expected bucket owner. If the account ID that you provide - // does not match the actual owner of the bucket, the request fails with the HTTP - // status code 403 Forbidden (access denied). - ExpectedBucketOwner string - - // The date and time at which the object is no longer cacheable. For more - // information, see [https://www.rfc-editor.org/rfc/rfc7234#section-5.3]. - // - // [https://www.rfc-editor.org/rfc/rfc7234#section-5.3]: https://www.rfc-editor.org/rfc/rfc7234#section-5.3 - Expires time.Time - - // Gives the grantee READ, READ_ACP, and WRITE_ACP permissions on the object. - // - // - This functionality is not supported for directory buckets. - // - // - This functionality is not supported for Amazon S3 on Outposts. - GrantFullControl string - - // Allows grantee to read the object data and its metadata. - // - // - This functionality is not supported for directory buckets. - // - // - This functionality is not supported for Amazon S3 on Outposts. - GrantRead string - - // Allows grantee to read the object ACL. - // - // - This functionality is not supported for directory buckets. - // - // - This functionality is not supported for Amazon S3 on Outposts. - GrantReadACP string - - // Allows grantee to write the ACL for the applicable object. - // - // - This functionality is not supported for directory buckets. - // - // - This functionality is not supported for Amazon S3 on Outposts. - GrantWriteACP string - - // A map of metadata to store with the object in S3. - Metadata map[string]string - - // Specifies whether a legal hold will be applied to this object. For more - // information about S3 Object Lock, see [Object Lock]in the Amazon S3 User Guide. - // - // This functionality is not supported for directory buckets. - // - // [Object Lock]: https://docs.aws.amazon.com/AmazonS3/latest/dev/object-lock.html - ObjectLockLegalHoldStatus types.ObjectLockLegalHoldStatus - - // The Object Lock mode that you want to apply to this object. - // - // This functionality is not supported for directory buckets. - ObjectLockMode types.ObjectLockMode - - // The date and time when you want this object's Object Lock to expire. Must be - // formatted as a timestamp parameter. - // - // This functionality is not supported for directory buckets. - ObjectLockRetainUntilDate time.Time - - // Confirms that the requester knows that they will be charged for the request. - // Bucket owners need not specify this parameter in their requests. If either the - // source or destination S3 bucket has Requester Pays enabled, the requester will - // pay for corresponding charges to copy the object. For information about - // downloading objects from Requester Pays buckets, see [Downloading Objects in Requester Pays Buckets]in the Amazon S3 User - // Guide. - // - // This functionality is not supported for directory buckets. - // - // [Downloading Objects in Requester Pays Buckets]: https://docs.aws.amazon.com/AmazonS3/latest/dev/ObjectsinRequesterPaysBuckets.html - RequestPayer types.RequestPayer - - // Specifies the algorithm to use when encrypting the object (for example, AES256 ). - // - // This functionality is not supported for directory buckets. - SSECustomerAlgorithm string - - // Specifies the customer-provided encryption key for Amazon S3 to use in - // encrypting data. This value is used to store the object and then it is - // discarded; Amazon S3 does not store the encryption key. The key must be - // appropriate for use with the algorithm specified in the - // x-amz-server-side-encryption-customer-algorithm header. - // - // This functionality is not supported for directory buckets. - SSECustomerKey string - - // Specifies the Amazon Web Services KMS Encryption Context to use for object - // encryption. The value of this header is a base64-encoded UTF-8 string holding - // JSON with the encryption context key-value pairs. This value is stored as object - // metadata and automatically gets passed on to Amazon Web Services KMS for future - // GetObject or CopyObject operations on this object. This value must be - // explicitly added during CopyObject operations. - // - // This functionality is not supported for directory buckets. - SSEKMSEncryptionContext string - - // If x-amz-server-side-encryption has a valid value of aws:kms or aws:kms:dsse , - // this header specifies the ID (Key ID, Key ARN, or Key Alias) of the Key - // Management Service (KMS) symmetric encryption customer managed key that was used - // for the object. If you specify x-amz-server-side-encryption:aws:kms or - // x-amz-server-side-encryption:aws:kms:dsse , but do not provide - // x-amz-server-side-encryption-aws-kms-key-id , Amazon S3 uses the Amazon Web - // Services managed key ( aws/s3 ) to protect the data. If the KMS key does not - // exist in the same account that's issuing the command, you must use the full ARN - // and not just the ID. - // - // This functionality is not supported for directory buckets. - SSEKMSKeyID string - - // The server-side encryption algorithm that was used when you store this object - // in Amazon S3 (for example, AES256 , aws:kms , aws:kms:dsse ). - // - // General purpose buckets - You have four mutually exclusive options to protect - // data using server-side encryption in Amazon S3, depending on how you choose to - // manage the encryption keys. Specifically, the encryption key options are Amazon - // S3 managed keys (SSE-S3), Amazon Web Services KMS keys (SSE-KMS or DSSE-KMS), - // and customer-provided keys (SSE-C). Amazon S3 encrypts data with server-side - // encryption by using Amazon S3 managed keys (SSE-S3) by default. You can - // optionally tell Amazon S3 to encrypt data at rest by using server-side - // encryption with other key options. For more information, see [Using Server-Side Encryption]in the Amazon S3 - // User Guide. - // - // Directory buckets - For directory buckets, only the server-side encryption with - // Amazon S3 managed keys (SSE-S3) ( AES256 ) value is supported. - // - // [Using Server-Side Encryption]: https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingServerSideEncryption.html - ServerSideEncryption types.ServerSideEncryption - - // By default, Amazon S3 uses the STANDARD Storage Class to store newly created - // objects. The STANDARD storage class provides high durability and high - // availability. Depending on performance needs, you can specify a different - // Storage Class. For more information, see [Storage Classes]in the Amazon S3 User Guide. - // - // - For directory buckets, only the S3 Express One Zone storage class is - // supported to store newly created objects. - // - // - Amazon S3 on Outposts only uses the OUTPOSTS Storage Class. - // - // [Storage Classes]: https://docs.aws.amazon.com/AmazonS3/latest/dev/storage-class-intro.html - StorageClass types.StorageClass - - // The tag-set for the object. The tag-set must be encoded as URL Query - // parameters. (For example, "Key1=Value1") - // - // This functionality is not supported for directory buckets. - Tagging string - - // If the bucket is configured as a website, redirects requests for this object to - // another object in the same bucket or to an external URL. Amazon S3 stores the - // value of this header in the object metadata. For information about object - // metadata, see [Object Key and Metadata]in the Amazon S3 User Guide. - // - // In the following example, the request header sets the redirect to an object - // (anotherPage.html) in the same bucket: - // - // x-amz-website-redirect-location: /anotherPage.html - // - // In the following example, the request header sets the object redirect to - // another website: - // - // x-amz-website-redirect-location: http://www.example.com/ - // - // For more information about website hosting in Amazon S3, see [Hosting Websites on Amazon S3] and [How to Configure Website Page Redirects] in the - // Amazon S3 User Guide. - // - // This functionality is not supported for directory buckets. - // - // [How to Configure Website Page Redirects]: https://docs.aws.amazon.com/AmazonS3/latest/dev/how-to-page-redirect.html - // [Hosting Websites on Amazon S3]: https://docs.aws.amazon.com/AmazonS3/latest/dev/WebsiteHosting.html - // [Object Key and Metadata]: https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html - WebsiteRedirectLocation string -} - -// map non-zero string to *string -func nzstring(v string) *string { - if v == "" { - return nil - } - return aws.String(v) -} - -// map non-zero Time to *Time -func nztime(t time.Time) *time.Time { - if t.IsZero() { - return nil - } - return aws.Time(t) -} - -func (i PutObjectInput) mapSingleUploadInput(body io.Reader, checksumAlgorithm types.ChecksumAlgorithm) *s3.PutObjectInput { - input := &s3.PutObjectInput{ - Bucket: aws.String(i.Bucket), - Key: aws.String(i.Key), - Body: body, - } - if i.ACL != "" { - input.ACL = s3types.ObjectCannedACL(i.ACL) - } - if i.ChecksumAlgorithm != "" { - input.ChecksumAlgorithm = s3types.ChecksumAlgorithm(i.ChecksumAlgorithm) - } else { - input.ChecksumAlgorithm = s3types.ChecksumAlgorithm(checksumAlgorithm) - } - if i.ObjectLockLegalHoldStatus != "" { - input.ObjectLockLegalHoldStatus = s3types.ObjectLockLegalHoldStatus(i.ObjectLockLegalHoldStatus) - } - if i.ObjectLockMode != "" { - input.ObjectLockMode = s3types.ObjectLockMode(i.ObjectLockMode) - } - if i.RequestPayer != "" { - input.RequestPayer = s3types.RequestPayer(i.RequestPayer) - } - if i.ServerSideEncryption != "" { - input.ServerSideEncryption = s3types.ServerSideEncryption(i.ServerSideEncryption) - } - if i.StorageClass != "" { - input.StorageClass = s3types.StorageClass(i.StorageClass) - } - input.BucketKeyEnabled = aws.Bool(i.BucketKeyEnabled) - input.CacheControl = nzstring(i.CacheControl) - input.ContentDisposition = nzstring(i.ContentDisposition) - input.ContentEncoding = nzstring(i.ContentEncoding) - input.ContentLanguage = nzstring(i.ContentLanguage) - input.ContentType = nzstring(i.ContentType) - input.ExpectedBucketOwner = nzstring(i.ExpectedBucketOwner) - input.GrantFullControl = nzstring(i.GrantFullControl) - input.GrantRead = nzstring(i.GrantRead) - input.GrantReadACP = nzstring(i.GrantReadACP) - input.GrantWriteACP = nzstring(i.GrantWriteACP) - input.Metadata = i.Metadata - input.SSECustomerAlgorithm = nzstring(i.SSECustomerAlgorithm) - input.SSECustomerKey = nzstring(i.SSECustomerKey) - input.SSEKMSEncryptionContext = nzstring(i.SSEKMSEncryptionContext) - input.SSEKMSKeyId = nzstring(i.SSEKMSKeyID) - input.Tagging = nzstring(i.Tagging) - input.WebsiteRedirectLocation = nzstring(i.WebsiteRedirectLocation) - input.Expires = nztime(i.Expires) - input.ObjectLockRetainUntilDate = nztime(i.ObjectLockRetainUntilDate) - return input -} - -func (i PutObjectInput) mapCreateMultipartUploadInput() *s3.CreateMultipartUploadInput { - input := &s3.CreateMultipartUploadInput{ - Bucket: aws.String(i.Bucket), - Key: aws.String(i.Key), - } - if i.ACL != "" { - input.ACL = s3types.ObjectCannedACL(i.ACL) - } - if i.ChecksumAlgorithm != "" { - input.ChecksumAlgorithm = s3types.ChecksumAlgorithm(i.ChecksumAlgorithm) - } else { - input.ChecksumAlgorithm = s3types.ChecksumAlgorithm(i.ChecksumAlgorithm) - } - if i.ObjectLockLegalHoldStatus != "" { - input.ObjectLockLegalHoldStatus = s3types.ObjectLockLegalHoldStatus(i.ObjectLockLegalHoldStatus) - } - if i.ObjectLockMode != "" { - input.ObjectLockMode = s3types.ObjectLockMode(i.ObjectLockMode) - } - if i.RequestPayer != "" { - input.RequestPayer = s3types.RequestPayer(i.RequestPayer) - } - if i.ServerSideEncryption != "" { - input.ServerSideEncryption = s3types.ServerSideEncryption(i.ServerSideEncryption) - } - if i.StorageClass != "" { - input.StorageClass = s3types.StorageClass(i.StorageClass) - } - input.BucketKeyEnabled = aws.Bool(i.BucketKeyEnabled) - input.CacheControl = nzstring(i.CacheControl) - input.ContentDisposition = nzstring(i.ContentDisposition) - input.ContentEncoding = nzstring(i.ContentEncoding) - input.ContentLanguage = nzstring(i.ContentLanguage) - input.ContentType = nzstring(i.ContentType) - input.ExpectedBucketOwner = nzstring(i.ExpectedBucketOwner) - input.GrantFullControl = nzstring(i.GrantFullControl) - input.GrantRead = nzstring(i.GrantRead) - input.GrantReadACP = nzstring(i.GrantReadACP) - input.GrantWriteACP = nzstring(i.GrantWriteACP) - input.Metadata = i.Metadata - input.SSECustomerAlgorithm = nzstring(i.SSECustomerAlgorithm) - input.SSECustomerKey = nzstring(i.SSECustomerKey) - input.SSEKMSEncryptionContext = nzstring(i.SSEKMSEncryptionContext) - input.SSEKMSKeyId = nzstring(i.SSEKMSKeyID) - input.Tagging = nzstring(i.Tagging) - input.WebsiteRedirectLocation = nzstring(i.WebsiteRedirectLocation) - input.Expires = nztime(i.Expires) - input.ObjectLockRetainUntilDate = nztime(i.ObjectLockRetainUntilDate) - return input -} - -func (i PutObjectInput) mapCompleteMultipartUploadInput(uploadID *string, completedParts completedParts) *s3.CompleteMultipartUploadInput { - input := &s3.CompleteMultipartUploadInput{ - Bucket: aws.String(i.Bucket), - Key: aws.String(i.Key), - UploadId: uploadID, - } - if i.RequestPayer != "" { - input.RequestPayer = s3types.RequestPayer(i.RequestPayer) - } - input.ExpectedBucketOwner = nzstring(i.ExpectedBucketOwner) - input.SSECustomerAlgorithm = nzstring(i.SSECustomerAlgorithm) - input.SSECustomerKey = nzstring(i.SSECustomerKey) - var parts []s3types.CompletedPart - for _, part := range completedParts { - parts = append(parts, part.MapCompletedPart()) - } - if parts != nil { - input.MultipartUpload = &s3types.CompletedMultipartUpload{Parts: parts} - } - return input -} - -func (i PutObjectInput) mapUploadPartInput(body io.Reader, partNum *int32, uploadID *string) *s3.UploadPartInput { - input := &s3.UploadPartInput{ - Bucket: aws.String(i.Bucket), - Key: aws.String(i.Key), - Body: body, - PartNumber: partNum, - UploadId: uploadID, - } - if i.ChecksumAlgorithm != "" { - input.ChecksumAlgorithm = s3types.ChecksumAlgorithm(i.ChecksumAlgorithm) - } - if i.RequestPayer != "" { - input.RequestPayer = s3types.RequestPayer(i.RequestPayer) - } - input.ExpectedBucketOwner = nzstring(i.ExpectedBucketOwner) - input.SSECustomerAlgorithm = nzstring(i.SSECustomerAlgorithm) - input.SSECustomerKey = nzstring(i.SSECustomerKey) - return input -} - -func (i *PutObjectInput) mapAbortMultipartUploadInput(uploadID *string) *s3.AbortMultipartUploadInput { - input := &s3.AbortMultipartUploadInput{ - Bucket: aws.String(i.Bucket), - Key: aws.String(i.Key), - UploadId: uploadID, - } - return input -} - -// PutObjectOutput represents a response from the Upload() call. It contains common fields -// of s3 PutObject and CompleteMultipartUpload output -type PutObjectOutput struct { - // The ID for a multipart upload to S3. In the case of an error the error - // can be cast to the MultiUploadFailure interface to extract the upload ID. - // Will be empty string if multipart upload was not used, and the object - // was uploaded as a single PutObject call. - UploadID string - - // The list of parts that were uploaded and their checksums. Will be empty - // if multipart upload was not used, and the object was uploaded as a - // single PutObject call. - CompletedParts []types.CompletedPart - - // Indicates whether the uploaded object uses an S3 Bucket Key for server-side - // encryption with Amazon Web Services KMS (SSE-KMS). - BucketKeyEnabled bool - - // The base64-encoded, 32-bit CRC32 checksum of the object. - ChecksumCRC32 string - - // The base64-encoded, 32-bit CRC32C checksum of the object. - ChecksumCRC32C string - - // The base64-encoded, 160-bit SHA-1 digest of the object. - ChecksumSHA1 string - - // The base64-encoded, 256-bit SHA-256 digest of the object. - ChecksumSHA256 string - - // Entity tag for the uploaded object. - ETag string - - // If the object expiration is configured, this will contain the expiration date - // (expiry-date) and rule ID (rule-id). The value of rule-id is URL encoded. - Expiration string - - // The bucket where the newly created object is put - Bucket string - - // The object key of the newly created object. - Key string - - // If present, indicates that the requester was successfully charged for the - // request. - RequestCharged types.RequestCharged - - // If present, specifies the ID of the Amazon Web Services Key Management Service - // (Amazon Web Services KMS) symmetric customer managed customer master key (CMK) - // that was used for the object. - SSEKMSKeyID string - - // If you specified server-side encryption either with an Amazon S3-managed - // encryption key or an Amazon Web Services KMS customer master key (CMK) in your - // initiate multipart upload request, the response includes this header. It - // confirms the encryption algorithm that Amazon S3 used to encrypt the object. - ServerSideEncryption types.ServerSideEncryption - - // The version of the object that was uploaded. Will only be populated if - // the S3 Bucket is versioned. If the bucket is not versioned this field - // will not be set. - VersionID string - - // Metadata pertaining to the operation's result. - ResultMetadata smithymiddleware.Metadata -} - -func (o *PutObjectOutput) mapFromPutObjectOutput(out *s3.PutObjectOutput, bucket, key string) { - o.BucketKeyEnabled = aws.ToBool(out.BucketKeyEnabled) - o.ChecksumCRC32 = aws.ToString(out.ChecksumCRC32) - o.ChecksumCRC32C = aws.ToString(out.ChecksumCRC32C) - o.ChecksumSHA1 = aws.ToString(out.ChecksumSHA1) - o.ChecksumSHA256 = aws.ToString(out.ChecksumSHA256) - o.ETag = aws.ToString(out.ETag) - o.Expiration = aws.ToString(out.Expiration) - o.Bucket = bucket - o.Key = key - o.RequestCharged = types.RequestCharged(out.RequestCharged) - o.SSEKMSKeyID = aws.ToString(out.SSEKMSKeyId) - o.ServerSideEncryption = types.ServerSideEncryption(out.ServerSideEncryption) - o.VersionID = aws.ToString(out.VersionId) - o.ResultMetadata = out.ResultMetadata.Clone() -} - -func (o *PutObjectOutput) mapFromCompleteMultipartUploadOutput(out *s3.CompleteMultipartUploadOutput, bucket, uploadID string, completedParts completedParts) { - o.UploadID = uploadID - o.CompletedParts = completedParts - o.BucketKeyEnabled = aws.ToBool(out.BucketKeyEnabled) - o.ChecksumCRC32 = aws.ToString(out.ChecksumCRC32) - o.ChecksumCRC32C = aws.ToString(out.ChecksumCRC32C) - o.ChecksumSHA1 = aws.ToString(out.ChecksumSHA1) - o.ChecksumSHA256 = aws.ToString(out.ChecksumSHA256) - o.ETag = aws.ToString(out.ETag) - o.Expiration = aws.ToString(out.Expiration) - o.Bucket = bucket - o.Key = aws.ToString(out.Key) - o.RequestCharged = types.RequestCharged(out.RequestCharged) - o.SSEKMSKeyID = aws.ToString(out.SSEKMSKeyId) - o.ServerSideEncryption = types.ServerSideEncryption(out.ServerSideEncryption) - o.VersionID = aws.ToString(out.VersionId) - o.ResultMetadata = out.ResultMetadata -} - -// PutObject uploads an object to S3, intelligently buffering large -// files into smaller chunks and sending them in parallel across multiple -// goroutines. You can configure the chunk size and concurrency through the -// Options parameters. -// -// Additional functional options can be provided to configure the individual -// upload. These options are copies of the original Options instance, the client of which PutObject is called from. -// Modifying the options will not impact the original Client and Options instance. -func (c *Client) PutObject(ctx context.Context, input *PutObjectInput, opts ...func(*Options)) (*PutObjectOutput, error) { - i := uploader{in: input, options: c.options.Copy()} - for _, opt := range opts { - opt(&i.options) - } - - return i.upload(ctx) -} - -type uploader struct { - options Options - - in *PutObjectInput - - // PartPool allows for the re-usage of streaming payload part buffers between upload calls - partPool bytesBufferPool -} - -func (u *uploader) upload(ctx context.Context) (*PutObjectOutput, error) { - if err := u.init(); err != nil { - return nil, fmt.Errorf("unable to initialize upload: %w", err) - } - defer u.partPool.Close() - - clientOptions := []func(o *s3.Options){ - func(o *s3.Options) { - o.APIOptions = append(o.APIOptions, - middleware.AddSDKAgentKey(middleware.FeatureMetadata, userAgentKey), - addFeatureUserAgent, - ) - }} - - r, _, cleanUp, err := u.nextReader(ctx) - - if err == io.EOF { - return u.singleUpload(ctx, r, cleanUp, clientOptions...) - } else if err != nil { - cleanUp() - return nil, err - } - - mu := multiUploader{ - uploader: u, - } - return mu.upload(ctx, r, cleanUp, clientOptions...) -} - -func (u *uploader) init() error { - if err := u.initSize(); err != nil { - return err - } - u.partPool = newDefaultSlicePool(u.options.PartSizeBytes, u.options.Concurrency+1) - - return nil -} - -// initSize checks user configured partsize and up-size it if calculated part count exceeds max value -func (u *uploader) initSize() error { - if u.options.PartSizeBytes < minPartSizeBytes { - return fmt.Errorf("part size must be at least %d bytes", minPartSizeBytes) - } - - var bodySize int64 - switch r := u.in.Body.(type) { - case io.Seeker: - n, err := types.SeekerLen(r) - if err != nil { - return err - } - bodySize = n - default: - if l := u.in.ContentLength; l > 0 { - bodySize = l - } - } - - // Try to adjust partSize if it is too small and account for - // integer division truncation. - if bodySize/u.options.PartSizeBytes >= int64(defaultMaxUploadParts) { - // Add one to the part size to account for remainders - // during the size calculation. e.g odd number of bytes. - u.options.PartSizeBytes = (bodySize / int64(defaultMaxUploadParts)) + 1 - } - return nil -} - -func (u *uploader) singleUpload(ctx context.Context, r io.Reader, cleanUp func(), clientOptions ...func(*s3.Options)) (*PutObjectOutput, error) { - defer cleanUp() - - params := u.in.mapSingleUploadInput(r, u.options.ChecksumAlgorithm) - - out, err := u.options.S3.PutObject(ctx, params, clientOptions...) - if err != nil { - return nil, err - } - - var output PutObjectOutput - output.mapFromPutObjectOutput(out, u.in.Bucket, u.in.Key) - return &output, nil -} - -// nextReader reads the next chunk of data from input Body -func (u *uploader) nextReader(ctx context.Context) (io.Reader, int, func(), error) { - part, err := u.partPool.Get(ctx) - if err != nil { - return nil, 0, func() {}, err - } - - n, err := readFillBuf(u.in.Body, part) - - cleanup := func() { - u.partPool.Put(part) - } - return bytes.NewReader(part[0:n]), n, cleanup, err -} - -func readFillBuf(r io.Reader, b []byte) (offset int, err error) { - for offset < len(b) && err == nil { - var n int - n, err = r.Read(b[offset:]) - offset += n - } - return offset, err -} - -type multiUploader struct { - *uploader - wg sync.WaitGroup - m sync.Mutex - err error - uploadID *string - parts completedParts -} - -type ulChunk struct { - buf io.Reader - partNum *int32 - cleanup func() -} - -type completedParts []types.CompletedPart - -func (cp completedParts) Len() int { - return len(cp) -} - -func (cp completedParts) Less(i, j int) bool { - return aws.ToInt32(cp[i].PartNumber) < aws.ToInt32(cp[j].PartNumber) -} - -func (cp completedParts) Swap(i, j int) { - cp[i], cp[j] = cp[j], cp[i] -} - -// upload will perform a multipart upload using the firstBuf buffer containing -// the first chunk of data. -func (u *multiUploader) upload(ctx context.Context, firstBuf io.Reader, cleanup func(), clientOptions ...func(*s3.Options)) (*PutObjectOutput, error) { - params := u.uploader.in.mapCreateMultipartUploadInput() - - // Create a multipart - resp, err := u.uploader.options.S3.CreateMultipartUpload(ctx, params, clientOptions...) - if err != nil { - cleanup() - return nil, err - } - u.uploadID = resp.UploadId - - ch := make(chan ulChunk, u.options.Concurrency) - for i := 0; i < u.options.Concurrency; i++ { - // launch workers - u.wg.Add(1) - go u.readChunk(ctx, ch, clientOptions...) - } - - var partNum int32 = 1 - ch <- ulChunk{buf: firstBuf, partNum: aws.Int32(partNum), cleanup: cleanup} - for u.geterr() == nil && err == nil { - partNum++ - var ( - data io.Reader - nextChunkLen int - ok bool - ) - data, nextChunkLen, cleanup, err = u.nextReader(ctx) - ok, err = u.shouldContinue(partNum, nextChunkLen, err) - if !ok { - cleanup() - if err != nil { - u.seterr(err) - } - break - } - - ch <- ulChunk{buf: data, partNum: aws.Int32(partNum), cleanup: cleanup} - } - - // close the channel, wait for workers and complete upload - close(ch) - u.wg.Wait() - completeOut := u.complete(ctx, clientOptions...) - - if err := u.geterr(); err != nil { - return nil, &multipartUploadError{ - err: err, - uploadID: *u.uploadID, - } - } - - var out PutObjectOutput - out.mapFromCompleteMultipartUploadOutput(completeOut, aws.ToString(params.Bucket), aws.ToString(u.uploadID), u.parts) - return &out, nil -} - -func (u *multiUploader) shouldContinue(part int32, nextChunkLen int, err error) (bool, error) { - if err != nil && err != io.EOF { - return false, fmt.Errorf("read multipart upload data failed, %w", err) - } - - if nextChunkLen == 0 { - // No need to upload empty part, if file was empty to start - // with empty single part would of been created and never - // started multipart upload. - return false, nil - } - - // This upload exceeded maximum number of supported parts, error now. - if part > defaultMaxUploadParts { - return false, fmt.Errorf(fmt.Sprintf("exceeded total allowed S3 limit MaxUploadParts (%d). Adjust PartSize to fit in this limit", defaultMaxUploadParts)) - } - - return true, err -} - -// readChunk runs in worker goroutines to pull chunks off of the ch channel -// and send() them as UploadPart requests. -func (u *multiUploader) readChunk(ctx context.Context, ch chan ulChunk, clientOptions ...func(*s3.Options)) { - defer u.wg.Done() - for { - data, ok := <-ch - - if !ok { - break - } - - if u.geterr() == nil { - if err := u.send(ctx, data, clientOptions...); err != nil { - u.seterr(err) - } - } - - data.cleanup() - } -} - -// send performs an UploadPart request and keeps track of the completed -// part information. -func (u *multiUploader) send(ctx context.Context, c ulChunk, clientOptions ...func(*s3.Options)) error { - params := u.in.mapUploadPartInput(c.buf, c.partNum, u.uploadID) - resp, err := u.options.S3.UploadPart(ctx, params, clientOptions...) - if err != nil { - return err - } - - var completed types.CompletedPart - completed.MapFrom(resp, c.partNum) - - u.m.Lock() - u.parts = append(u.parts, completed) - u.m.Unlock() - - return nil -} - -// geterr is a thread-safe getter for the error object -func (u *multiUploader) geterr() error { - u.m.Lock() - defer u.m.Unlock() - - return u.err -} - -// seterr is a thread-safe setter for the error object -func (u *multiUploader) seterr(e error) { - u.m.Lock() - defer u.m.Unlock() - - u.err = e -} - -func (u *multiUploader) fail(ctx context.Context, clientOptions ...func(*s3.Options)) { - params := u.in.mapAbortMultipartUploadInput(u.uploadID) - _, err := u.options.S3.AbortMultipartUpload(ctx, params, clientOptions...) - if err != nil { - u.seterr(fmt.Errorf("failed to abort multipart upload (%v), triggered after multipart upload failed: %v", err, u.geterr())) - } -} - -// complete successfully completes a multipart upload and returns the response. -func (u *multiUploader) complete(ctx context.Context, clientOptions ...func(*s3.Options)) *s3.CompleteMultipartUploadOutput { - if u.geterr() != nil { - u.fail(ctx) - return nil - } - - // Parts must be sorted in PartNumber order. - sort.Sort(u.parts) - - params := u.in.mapCompleteMultipartUploadInput(u.uploadID, u.parts) - - resp, err := u.options.S3.CompleteMultipartUpload(ctx, params, clientOptions...) - if err != nil { - u.seterr(err) - u.fail(ctx) - } - - return resp -} - -func addFeatureUserAgent(stack *smithymiddleware.Stack) error { - ua, err := getOrAddRequestUserAgent(stack) - if err != nil { - return err - } - - ua.AddUserAgentFeature(middleware.UserAgentFeatureS3Transfer) - return nil -} - -func getOrAddRequestUserAgent(stack *smithymiddleware.Stack) (*middleware.RequestUserAgent, error) { - id := (*middleware.RequestUserAgent)(nil).ID() - mw, ok := stack.Build.Get(id) - if !ok { - mw = middleware.NewRequestUserAgent() - if err := stack.Build.Add(mw, smithymiddleware.After); err != nil { - return nil, err - } - } - - ua, ok := mw.(*middleware.RequestUserAgent) - if !ok { - return nil, fmt.Errorf("%T for %s middleware did not match expected type", mw, id) - } - - return ua, nil -} diff --git a/feature/s3/transfermanager/api_op_PutObject_integ_test.go b/feature/s3/transfermanager/api_op_PutObject_integ_test.go deleted file mode 100644 index 921e237e5dc..00000000000 --- a/feature/s3/transfermanager/api_op_PutObject_integ_test.go +++ /dev/null @@ -1,24 +0,0 @@ -//go:build integration -// +build integration - -package transfermanager - -import ( - "bytes" - "strings" - "testing" -) - -func TestInteg_PutObject(t *testing.T) { - cases := map[string]putObjectTestData{ - "seekable body": {Body: strings.NewReader("hello world"), ExpectBody: []byte("hello world")}, - "empty string body": {Body: strings.NewReader(""), ExpectBody: []byte("")}, - "multipart upload body": {Body: bytes.NewReader(largeObjectBuf), ExpectBody: largeObjectBuf}, - } - - for name, c := range cases { - t.Run(name, func(t *testing.T) { - testPutObject(t, setupMetadata.Buckets.Source.Name, c) - }) - } -} diff --git a/feature/s3/transfermanager/dns_cache.go b/feature/s3/transfermanager/dns_cache.go deleted file mode 100644 index f9f041cbb57..00000000000 --- a/feature/s3/transfermanager/dns_cache.go +++ /dev/null @@ -1,61 +0,0 @@ -package transfermanager - -import ( - "sync" - "time" - - "github.com/aws/smithy-go/container/private/cache" - "github.com/aws/smithy-go/container/private/cache/lru" -) - -// dnsCache implements an LRU cache of DNS query results by host. -// -// Cache retrievals will automatically rotate between IP addresses for -// multi-value query results. -type dnsCache struct { - mu sync.Mutex - addrs cache.Cache -} - -// newDNSCache returns an initialized dnsCache with given capacity. -func newDNSCache(cap int) *dnsCache { - return &dnsCache{ - addrs: lru.New(cap), - } -} - -// GetAddr returns the next IP address for the given host if present in the -// cache. -func (c *dnsCache) GetAddr(host string) (string, bool) { - c.mu.Lock() - defer c.mu.Unlock() - - v, ok := c.addrs.Get(host) - if !ok { - return "", false - } - - record := v.(*dnsCacheEntry) - if timeNow().After(record.expires) { - return "", false - } - - addr := record.addrs[record.index] - record.index = (record.index + 1) % len(record.addrs) - return addr, true -} - -// PutAddrs stores a DNS query result in the cache, overwriting any present -// entry for the host if it exists. -func (c *dnsCache) PutAddrs(host string, addrs []string, expires time.Time) { - c.mu.Lock() - defer c.mu.Unlock() - - c.addrs.Put(host, &dnsCacheEntry{addrs, expires, 0}) -} - -type dnsCacheEntry struct { - addrs []string - expires time.Time - index int -} diff --git a/feature/s3/transfermanager/go.mod b/feature/s3/transfermanager/go.mod deleted file mode 100644 index f5cff7bcdd3..00000000000 --- a/feature/s3/transfermanager/go.mod +++ /dev/null @@ -1,61 +0,0 @@ -module github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager - -go 1.21 - -require ( - github.com/aws/aws-sdk-go-v2 v1.34.0 - github.com/aws/aws-sdk-go-v2/config v1.29.2 - github.com/aws/aws-sdk-go-v2/service/s3 v1.74.1 - github.com/aws/aws-sdk-go-v2/service/sts v1.33.10 - github.com/aws/smithy-go v1.22.2 -) - -require ( - github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.8 // indirect - github.com/aws/aws-sdk-go-v2/credentials v1.17.55 // indirect - github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.25 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.29 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.29 // indirect - github.com/aws/aws-sdk-go-v2/internal/ini v1.8.2 // indirect - github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.29 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.2 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.5.3 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.10 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.10 // indirect - github.com/aws/aws-sdk-go-v2/service/sso v1.24.12 // indirect - github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.11 // indirect -) - -replace github.com/aws/aws-sdk-go-v2 => ../../../ - -replace github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream => ../../../aws/protocol/eventstream/ - -replace github.com/aws/aws-sdk-go-v2/config => ../../../config/ - -replace github.com/aws/aws-sdk-go-v2/credentials => ../../../credentials/ - -replace github.com/aws/aws-sdk-go-v2/feature/ec2/imds => ../../../feature/ec2/imds/ - -replace github.com/aws/aws-sdk-go-v2/internal/configsources => ../../../internal/configsources/ - -replace github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 => ../../../internal/endpoints/v2/ - -replace github.com/aws/aws-sdk-go-v2/internal/ini => ../../../internal/ini/ - -replace github.com/aws/aws-sdk-go-v2/internal/v4a => ../../../internal/v4a/ - -replace github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding => ../../../service/internal/accept-encoding/ - -replace github.com/aws/aws-sdk-go-v2/service/internal/checksum => ../../../service/internal/checksum/ - -replace github.com/aws/aws-sdk-go-v2/service/internal/presigned-url => ../../../service/internal/presigned-url/ - -replace github.com/aws/aws-sdk-go-v2/service/internal/s3shared => ../../../service/internal/s3shared/ - -replace github.com/aws/aws-sdk-go-v2/service/s3 => ../../../service/s3/ - -replace github.com/aws/aws-sdk-go-v2/service/sso => ../../../service/sso/ - -replace github.com/aws/aws-sdk-go-v2/service/ssooidc => ../../../service/ssooidc/ - -replace github.com/aws/aws-sdk-go-v2/service/sts => ../../../service/sts/ diff --git a/feature/s3/transfermanager/go.sum b/feature/s3/transfermanager/go.sum deleted file mode 100644 index cad7e6ad46a..00000000000 --- a/feature/s3/transfermanager/go.sum +++ /dev/null @@ -1,2 +0,0 @@ -github.com/aws/smithy-go v1.22.2 h1:6D9hW43xKFrRx/tXXfAlIZc4JI+yQe6snnWcQyxSyLQ= -github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= diff --git a/feature/s3/transfermanager/go_module_metadata.go b/feature/s3/transfermanager/go_module_metadata.go deleted file mode 100644 index 33f1ddfbdee..00000000000 --- a/feature/s3/transfermanager/go_module_metadata.go +++ /dev/null @@ -1,6 +0,0 @@ -// Code generated by internal/repotools/cmd/updatemodulemeta DO NOT EDIT. - -package transfermanager - -// goModuleVersion is the tagged release for this module -const goModuleVersion = "tip" diff --git a/feature/s3/transfermanager/internal/testing/endpoint.go b/feature/s3/transfermanager/internal/testing/endpoint.go deleted file mode 100644 index 082aedec4f2..00000000000 --- a/feature/s3/transfermanager/internal/testing/endpoint.go +++ /dev/null @@ -1,25 +0,0 @@ -package testing - -import ( - "context" - "net/url" - - "github.com/aws/aws-sdk-go-v2/service/s3" - smithyendpoints "github.com/aws/smithy-go/endpoints" -) - -// EndpointResolverV2 is a mock s3 endpoint resolver v2 for testing -type EndpointResolverV2 struct { - URL string -} - -// ResolveEndpoint returns the given endpoint url -func (r EndpointResolverV2) ResolveEndpoint(ctx context.Context, params s3.EndpointParameters) (smithyendpoints.Endpoint, error) { - u, err := url.Parse(r.URL) - if err != nil { - return smithyendpoints.Endpoint{}, err - } - return smithyendpoints.Endpoint{ - URI: *u, - }, nil -} diff --git a/feature/s3/transfermanager/internal/testing/upload.go b/feature/s3/transfermanager/internal/testing/upload.go deleted file mode 100644 index 1764fc089e2..00000000000 --- a/feature/s3/transfermanager/internal/testing/upload.go +++ /dev/null @@ -1,193 +0,0 @@ -package testing - -import ( - "context" - "fmt" - "io" - "io/ioutil" - "net/http" - "net/url" - "slices" - "sync" - - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/service/s3" -) - -// UploadLoggingClient is a mock client that can be used to record and stub responses for testing the Uploader. -type UploadLoggingClient struct { - Invocations []string - Params []interface{} - - ConsumeBody bool - - ignoredOperations []string - - PartNum int - m sync.Mutex - - PutObjectFn func(*UploadLoggingClient, *s3.PutObjectInput) (*s3.PutObjectOutput, error) - UploadPartFn func(*UploadLoggingClient, *s3.UploadPartInput) (*s3.UploadPartOutput, error) - CreateMultipartUploadFn func(*UploadLoggingClient, *s3.CreateMultipartUploadInput) (*s3.CreateMultipartUploadOutput, error) - CompleteMultipartUploadFn func(*UploadLoggingClient, *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error) - AbortMultipartUploadFn func(*UploadLoggingClient, *s3.AbortMultipartUploadInput) (*s3.AbortMultipartUploadOutput, error) -} - -func (u *UploadLoggingClient) simulateHTTPClientOption(optFns ...func(*s3.Options)) error { - - o := s3.Options{ - HTTPClient: httpDoFunc(func(r *http.Request) (*http.Response, error) { - return &http.Response{ - Request: r, - }, nil - }), - } - - for _, fn := range optFns { - fn(&o) - } - - _, err := o.HTTPClient.Do(&http.Request{ - URL: &url.URL{ - Scheme: "https", - Host: "mock.amazonaws.com", - Path: "/key", - RawQuery: "foo=bar", - }, - }) - if err != nil { - return err - } - - return nil -} - -type httpDoFunc func(*http.Request) (*http.Response, error) - -func (f httpDoFunc) Do(r *http.Request) (*http.Response, error) { - return f(r) -} - -func (u *UploadLoggingClient) traceOperation(name string, params interface{}) { - if slices.Contains(u.ignoredOperations, name) { - return - } - u.Invocations = append(u.Invocations, name) - u.Params = append(u.Params, params) - -} - -// PutObject is the S3 PutObject API. -func (u *UploadLoggingClient) PutObject(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error) { - u.m.Lock() - defer u.m.Unlock() - - if u.ConsumeBody { - io.Copy(ioutil.Discard, params.Body) - } - - u.traceOperation("PutObject", params) - - if err := u.simulateHTTPClientOption(optFns...); err != nil { - return nil, err - } - - if u.PutObjectFn != nil { - return u.PutObjectFn(u, params) - } - - return &s3.PutObjectOutput{ - VersionId: aws.String("VERSION-ID"), - }, nil -} - -// UploadPart is the S3 UploadPart API. -func (u *UploadLoggingClient) UploadPart(ctx context.Context, params *s3.UploadPartInput, optFns ...func(*s3.Options)) (*s3.UploadPartOutput, error) { - u.m.Lock() - defer u.m.Unlock() - - if u.ConsumeBody { - io.Copy(ioutil.Discard, params.Body) - } - - u.traceOperation("UploadPart", params) - - if err := u.simulateHTTPClientOption(optFns...); err != nil { - return nil, err - } - - if u.UploadPartFn != nil { - return u.UploadPartFn(u, params) - } - - return &s3.UploadPartOutput{ - ETag: aws.String(fmt.Sprintf("ETAG%d", *params.PartNumber)), - }, nil -} - -// CreateMultipartUpload is the S3 CreateMultipartUpload API. -func (u *UploadLoggingClient) CreateMultipartUpload(ctx context.Context, params *s3.CreateMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) { - u.m.Lock() - defer u.m.Unlock() - - u.traceOperation("CreateMultipartUpload", params) - - if err := u.simulateHTTPClientOption(optFns...); err != nil { - return nil, err - } - - if u.CreateMultipartUploadFn != nil { - return u.CreateMultipartUploadFn(u, params) - } - - return &s3.CreateMultipartUploadOutput{ - UploadId: aws.String("UPLOAD-ID"), - }, nil -} - -// CompleteMultipartUpload is the S3 CompleteMultipartUpload API. -func (u *UploadLoggingClient) CompleteMultipartUpload(ctx context.Context, params *s3.CompleteMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error) { - u.m.Lock() - defer u.m.Unlock() - - u.traceOperation("CompleteMultipartUpload", params) - - if err := u.simulateHTTPClientOption(optFns...); err != nil { - return nil, err - } - - if u.CompleteMultipartUploadFn != nil { - return u.CompleteMultipartUploadFn(u, params) - } - - return &s3.CompleteMultipartUploadOutput{ - Location: aws.String("http://location"), - VersionId: aws.String("VERSION-ID"), - }, nil -} - -// AbortMultipartUpload is the S3 AbortMultipartUpload API. -func (u *UploadLoggingClient) AbortMultipartUpload(ctx context.Context, params *s3.AbortMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) { - u.m.Lock() - defer u.m.Unlock() - - u.traceOperation("AbortMultipartUpload", params) - if err := u.simulateHTTPClientOption(optFns...); err != nil { - return nil, err - } - - if u.AbortMultipartUploadFn != nil { - return u.AbortMultipartUploadFn(u, params) - } - - return &s3.AbortMultipartUploadOutput{}, nil -} - -// NewUploadLoggingClient returns a new UploadLoggingClient. -func NewUploadLoggingClient(ignoredOps []string) (*UploadLoggingClient, *[]string, *[]interface{}) { - c := &UploadLoggingClient{ - ignoredOperations: ignoredOps, - } - - return c, &c.Invocations, &c.Params -} diff --git a/feature/s3/transfermanager/options.go b/feature/s3/transfermanager/options.go deleted file mode 100644 index a49e74afd64..00000000000 --- a/feature/s3/transfermanager/options.go +++ /dev/null @@ -1,63 +0,0 @@ -package transfermanager - -import "github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager/types" - -// Options provides params needed for transfer api calls -type Options struct { - // The client to use when uploading to S3. - S3 S3APIClient - - // The buffer size (in bytes) to use when buffering data into chunks and - // sending them as parts to S3. The minimum allowed part size is 5MB, and - // if this value is set to zero, the DefaultUploadPartSize value will be used. - PartSizeBytes int64 - - // The threshold bytes to decide when the file should be multi-uploaded - MultipartUploadThreshold int64 - - // Option to disable checksum validation for download - DisableChecksum bool - - // Checksum algorithm to use for upload - ChecksumAlgorithm types.ChecksumAlgorithm - - // The number of goroutines to spin up in parallel per call to Upload when - // sending parts. If this is set to zero, the DefaultUploadConcurrency value - // will be used. - // - // The concurrency pool is not shared between calls to Upload. - Concurrency int -} - -func (o *Options) init() { -} - -func resolveConcurrency(o *Options) { - if o.Concurrency == 0 { - o.Concurrency = defaultTransferConcurrency - } -} - -func resolvePartSizeBytes(o *Options) { - if o.PartSizeBytes == 0 { - o.PartSizeBytes = minPartSizeBytes - } -} - -func resolveChecksumAlgorithm(o *Options) { - if o.ChecksumAlgorithm == "" { - o.ChecksumAlgorithm = types.ChecksumAlgorithmCrc32 - } -} - -func resolveMultipartUploadThreshold(o *Options) { - if o.MultipartUploadThreshold == 0 { - o.MultipartUploadThreshold = defaultMultipartUploadThreshold - } -} - -// Copy returns new copy of the Options -func (o Options) Copy() Options { - to := o - return to -} diff --git a/feature/s3/transfermanager/pool.go b/feature/s3/transfermanager/pool.go deleted file mode 100644 index a4a786e2357..00000000000 --- a/feature/s3/transfermanager/pool.go +++ /dev/null @@ -1,62 +0,0 @@ -package transfermanager - -import ( - "context" - "fmt" -) - -type bytesBufferPool interface { - Get(context.Context) ([]byte, error) - Put([]byte) - Close() -} - -type defaultSlicePool struct { - slices chan []byte -} - -func newDefaultSlicePool(sliceSize int64, capacity int) *defaultSlicePool { - p := &defaultSlicePool{} - - slices := make(chan []byte, capacity) - for i := 0; i < capacity; i++ { - slices <- make([]byte, sliceSize) - } - - p.slices = slices - return p -} - -var errZeroCapacity = fmt.Errorf("get called on zero capacity pool") - -func (p *defaultSlicePool) Get(ctx context.Context) ([]byte, error) { - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - } - - for { - select { - case bs, ok := <-p.slices: - if !ok { - return nil, errZeroCapacity - } - return bs, nil - case <-ctx.Done(): - return nil, ctx.Err() - } - } -} - -func (p *defaultSlicePool) Put(bs []byte) { - p.slices <- bs -} - -func (p *defaultSlicePool) Close() { - close(p.slices) - for range p.slices { - // drain channel - } - p.slices = nil -} diff --git a/feature/s3/transfermanager/pool_test.go b/feature/s3/transfermanager/pool_test.go deleted file mode 100644 index 74a6d9b49b0..00000000000 --- a/feature/s3/transfermanager/pool_test.go +++ /dev/null @@ -1,47 +0,0 @@ -package transfermanager - -import ( - "context" - "sync" - "testing" -) - -func TestDefaultSlicePool(t *testing.T) { - pool := newDefaultSlicePool(1, 2) - - var bs []byte - var err error - var wg sync.WaitGroup - - for i := 0; i < 200; i++ { - wg.Add(1) - go func() { - defer wg.Done() - pool.Put(make([]byte, 1)) - }() - } - // wait for a slice to be put back - for i := 0; i < 200; i++ { - bs, err = pool.Get(context.Background()) - if err != nil { - t.Errorf("failed to get slice from pool: %v", err) - } - } - - wg.Wait() - - // failed to get a slice due to ctx cancelled - ctx, cancel := context.WithCancel(context.Background()) - cancel() - bs, err = pool.Get(ctx) - if err == nil { - pool.Put(bs) - t.Errorf("expectd no slice to be returned") - } - - if e, a := 2, len(pool.slices); e != a { - t.Errorf("expect pool size to be %v, got %v", e, a) - } - - pool.Close() -} diff --git a/feature/s3/transfermanager/putobject_test.go b/feature/s3/transfermanager/putobject_test.go deleted file mode 100644 index 06cd0ebe149..00000000000 --- a/feature/s3/transfermanager/putobject_test.go +++ /dev/null @@ -1,904 +0,0 @@ -package transfermanager - -import ( - "bytes" - "context" - "fmt" - "io" - "io/ioutil" - "net/http" - "net/http/httptest" - "os" - "reflect" - "regexp" - "sort" - "strconv" - "strings" - "testing" - - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/aws/retry" - s3testing "github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager/internal/testing" - "github.com/aws/aws-sdk-go-v2/internal/awstesting" - "github.com/aws/aws-sdk-go-v2/internal/sdk" - "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/aws/aws-sdk-go-v2/service/s3/types" -) - -// getReaderLength discards the bytes from reader and returns the length -func getReaderLength(r io.Reader) int64 { - n, _ := io.Copy(ioutil.Discard, r) - return n -} - -func TestUploadOrderMulti(t *testing.T) { - c, invocations, args := s3testing.NewUploadLoggingClient(nil) - mgr := New(c, Options{}) - - resp, err := mgr.PutObject(context.Background(), &PutObjectInput{ - Bucket: "Bucket", - Key: "Key - value", - Body: bytes.NewReader(buf20MB), - ServerSideEncryption: "aws:kms", - SSEKMSKeyID: "KmsId", - ContentType: "content/type", - }) - - if err != nil { - t.Errorf("Expected no error but received %v", err) - } - - if diff := cmpDiff([]string{"CreateMultipartUpload", "UploadPart", "UploadPart", "UploadPart", "CompleteMultipartUpload"}, *invocations); len(diff) > 0 { - t.Errorf(diff) - } - - if "UPLOAD-ID" != resp.UploadID { - t.Errorf("expect %q, got %q", "UPLOAD-ID", resp.UploadID) - } - - if "VERSION-ID" != resp.VersionID { - t.Errorf("expect %q, got %q", "VERSION-ID", resp.VersionID) - } - - // Validate input values - - // UploadPart - for i := 1; i < 4; i++ { - v := aws.ToString((*args)[i].(*s3.UploadPartInput).UploadId) - if "UPLOAD-ID" != v { - t.Errorf("Expected %q, but received %q", "UPLOAD-ID", v) - } - } - - // CompleteMultipartUpload - v := aws.ToString((*args)[4].(*s3.CompleteMultipartUploadInput).UploadId) - if "UPLOAD-ID" != v { - t.Errorf("Expected %q, but received %q", "UPLOAD-ID", v) - } - - parts := (*args)[4].(*s3.CompleteMultipartUploadInput).MultipartUpload.Parts - - for i := 0; i < 3; i++ { - num := parts[i].PartNumber - etag := aws.ToString(parts[i].ETag) - - if int32(i+1) != aws.ToInt32(num) { - t.Errorf("expect %d, got %d", i+1, num) - } - - if matched, err := regexp.MatchString(`^ETAG\d+$`, etag); !matched || err != nil { - t.Errorf("Failed regexp expression `^ETAG\\d+$`") - } - } - - // Custom headers - cmu := (*args)[0].(*s3.CreateMultipartUploadInput) - - if e, a := types.ServerSideEncryption("aws:kms"), cmu.ServerSideEncryption; e != a { - t.Errorf("expect %q, got %q", e, a) - } - - if e, a := "KmsId", aws.ToString(cmu.SSEKMSKeyId); e != a { - t.Errorf("expect %q, got %q", e, a) - } - - if e, a := "content/type", aws.ToString(cmu.ContentType); e != a { - t.Errorf("expect %q, got %q", e, a) - } -} - -func TestUploadOrderMultiDifferentPartSize(t *testing.T) { - c, ops, args := s3testing.NewUploadLoggingClient(nil) - mgr := New(c, Options{ - PartSizeBytes: 1024 * 1024 * 11, - Concurrency: 1, - }) - - _, err := mgr.PutObject(context.Background(), &PutObjectInput{ - Bucket: "Bucket", - Key: "Key", - Body: bytes.NewReader(buf20MB), - }) - - if err != nil { - t.Errorf("expect no error, got %v", err) - } - - vals := []string{"CreateMultipartUpload", "UploadPart", "UploadPart", "CompleteMultipartUpload"} - if !reflect.DeepEqual(vals, *ops) { - t.Errorf("expect %v, got %v", vals, *ops) - } - - // Part lengths - if len := getReaderLength((*args)[1].(*s3.UploadPartInput).Body); 1024*1024*11 != len { - t.Errorf("expect %d, got %d", 1024*1024*7, len) - } - if len := getReaderLength((*args)[2].(*s3.UploadPartInput).Body); 1024*1024*9 != len { - t.Errorf("expect %d, got %d", 1024*1024*5, len) - } -} - -func TestUploadFailIfPartSizeTooSmall(t *testing.T) { - mgr := New(s3.New(s3.Options{}), Options{}, - func(o *Options) { - o.PartSizeBytes = 5 - }) - resp, err := mgr.PutObject(context.Background(), &PutObjectInput{ - Bucket: "Bucket", - Key: "Key", - Body: bytes.NewReader(buf20MB), - }) - - if resp != nil { - t.Errorf("Expected response to be nil, but received %v", resp) - } - - if err == nil { - t.Errorf("Expected error, but received nil") - } - if e, a := "part size must be at least", err.Error(); !strings.Contains(a, e) { - t.Errorf("expect %v to be in %v", e, a) - } -} - -func TestUploadOrderSingle(t *testing.T) { - c, invocations, params := s3testing.NewUploadLoggingClient(nil) - mgr := New(c, Options{}) - resp, err := mgr.PutObject(context.Background(), &PutObjectInput{ - Bucket: "Bucket", - Key: "Key - value", - Body: bytes.NewReader(buf2MB), - ServerSideEncryption: "aws:kms", - SSEKMSKeyID: "KmsId", - ContentType: "content/type", - }) - - if err != nil { - t.Errorf("expect no error but received %v", err) - } - - if diff := cmpDiff([]string{"PutObject"}, *invocations); len(diff) > 0 { - t.Error(diff) - } - - if e := "VERSION-ID"; e != resp.VersionID { - t.Errorf("expect %q, got %q", e, resp.VersionID) - } - - if len(resp.UploadID) > 0 { - t.Errorf("expect empty string, got %q", resp.UploadID) - } - - putObjectInput := (*params)[0].(*s3.PutObjectInput) - - if e, a := types.ServerSideEncryption("aws:kms"), putObjectInput.ServerSideEncryption; e != a { - t.Errorf("expect %q, got %q", e, a) - } - - if e, a := "KmsId", aws.ToString(putObjectInput.SSEKMSKeyId); e != a { - t.Errorf("expect %q, got %q", e, a) - } - - if e, a := "content/type", aws.ToString(putObjectInput.ContentType); e != a { - t.Errorf("Expected %q, but received %q", e, a) - } -} - -func TestUploadSingleFailure(t *testing.T) { - c, invocations, _ := s3testing.NewUploadLoggingClient(nil) - - c.PutObjectFn = func(*s3testing.UploadLoggingClient, *s3.PutObjectInput) (*s3.PutObjectOutput, error) { - return nil, fmt.Errorf("put object failure") - } - - mgr := New(c, Options{}) - resp, err := mgr.PutObject(context.Background(), &PutObjectInput{ - Bucket: "Bucket", - Key: "Key", - Body: bytes.NewReader(buf2MB), - }) - - if err == nil { - t.Error("expect error, got nil") - } - - if diff := cmpDiff([]string{"PutObject"}, *invocations); len(diff) > 0 { - t.Error(diff) - } - - if resp != nil { - t.Errorf("expect response to be nil, got %v", resp) - } -} - -func TestUploadOrderZero(t *testing.T) { - c, invocations, params := s3testing.NewUploadLoggingClient(nil) - mgr := New(c, Options{}) - resp, err := mgr.PutObject(context.Background(), &PutObjectInput{ - Bucket: "Bucket", - Key: "Key", - Body: bytes.NewReader(make([]byte, 0)), - }) - - if err != nil { - t.Errorf("expect no error, got %v", err) - } - - if diff := cmpDiff([]string{"PutObject"}, *invocations); len(diff) > 0 { - t.Error(diff) - } - - if len(resp.UploadID) > 0 { - t.Errorf("expect empty string, got %q", resp.UploadID) - } - - if e, a := int64(0), getReaderLength((*params)[0].(*s3.PutObjectInput).Body); e != a { - t.Errorf("Expected %d, but received %d", e, a) - } -} - -func TestUploadOrderMultiFailure(t *testing.T) { - c, invocations, _ := s3testing.NewUploadLoggingClient(nil) - - c.UploadPartFn = func(u *s3testing.UploadLoggingClient, params *s3.UploadPartInput) (*s3.UploadPartOutput, error) { - if *params.PartNumber == 2 { - return nil, fmt.Errorf("an unexpected error") - } - return &s3.UploadPartOutput{ETag: aws.String(fmt.Sprintf("ETAG%d", u.PartNum))}, nil - } - - mgr := New(c, Options{}, func(o *Options) { - o.Concurrency = 1 - }) - _, err := mgr.PutObject(context.Background(), &PutObjectInput{ - Bucket: "Bucket", - Key: "Key", - Body: bytes.NewReader(buf20MB), - }) - - if err == nil { - t.Error("expect error, got nil") - } - - if diff := cmpDiff([]string{"CreateMultipartUpload", "UploadPart", "UploadPart", "AbortMultipartUpload"}, *invocations); len(diff) > 0 { - t.Error(diff) - } -} - -func TestUploadOrderMultiFailureOnComplete(t *testing.T) { - c, invocations, _ := s3testing.NewUploadLoggingClient(nil) - - c.CompleteMultipartUploadFn = func(*s3testing.UploadLoggingClient, *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error) { - return nil, fmt.Errorf("complete multipart error") - } - - mgr := New(c, Options{}, func(o *Options) { - o.Concurrency = 1 - }) - _, err := mgr.PutObject(context.Background(), &PutObjectInput{ - Bucket: "Bucket", - Key: "Key", - Body: bytes.NewReader(buf20MB), - }) - - if err == nil { - t.Error("expect error, got nil") - } - - if diff := cmpDiff([]string{"CreateMultipartUpload", "UploadPart", "UploadPart", "UploadPart", - "CompleteMultipartUpload", "AbortMultipartUpload"}, *invocations); len(diff) > 0 { - t.Error(diff) - } -} - -func TestUploadOrderMultiFailureOnCreate(t *testing.T) { - c, invocations, _ := s3testing.NewUploadLoggingClient(nil) - - c.CreateMultipartUploadFn = func(*s3testing.UploadLoggingClient, *s3.CreateMultipartUploadInput) (*s3.CreateMultipartUploadOutput, error) { - return nil, fmt.Errorf("create multipart upload failure") - } - - mgr := New(c, Options{}) - _, err := mgr.PutObject(context.Background(), &PutObjectInput{ - Bucket: "Bucket", - Key: "Key", - Body: bytes.NewReader(make([]byte, 1024*1024*12)), - }) - - if err == nil { - t.Error("expect error, got nil") - } - - if diff := cmpDiff([]string{"CreateMultipartUpload"}, *invocations); len(diff) > 0 { - t.Error(diff) - } -} - -type failreader struct { - times int - failCount int -} - -func (f *failreader) Read(b []byte) (int, error) { - f.failCount++ - if f.failCount >= f.times { - return 0, fmt.Errorf("random failure") - } - return len(b), nil -} - -func TestUploadOrderReadFail1(t *testing.T) { - c, invocations, _ := s3testing.NewUploadLoggingClient(nil) - mgr := New(c, Options{}) - _, err := mgr.PutObject(context.Background(), &PutObjectInput{ - Bucket: "Bucket", - Key: "Key", - Body: &failreader{times: 1}, - }) - if err == nil { - t.Fatalf("expect error to not be nil") - } - - if e, a := "random failure", err.Error(); !strings.Contains(a, e) { - t.Errorf("expect %v, got %v", e, a) - } - - if diff := cmpDiff([]string(nil), *invocations); len(diff) > 0 { - t.Error(diff) - } -} - -func TestUploadOrderReadFail2(t *testing.T) { - c, invocations, _ := s3testing.NewUploadLoggingClient([]string{"UploadPart"}) - mgr := New(c, Options{}, func(o *Options) { - o.Concurrency = 1 - }) - _, err := mgr.PutObject(context.Background(), &PutObjectInput{ - Bucket: "Bucket", - Key: "Key", - Body: &failreader{times: 2}, - }) - if err == nil { - t.Fatalf("expect error to not be nil") - } - - if e, a := "random failure", err.Error(); !strings.Contains(a, e) { - t.Errorf("expect %v, got %q", e, a) - } - - if diff := cmpDiff([]string{"CreateMultipartUpload", "AbortMultipartUpload"}, *invocations); len(diff) > 0 { - t.Error(diff) - } -} - -type sizedReader struct { - size int - cur int - err error -} - -func (s *sizedReader) Read(p []byte) (n int, err error) { - if s.cur >= s.size { - if s.err == nil { - s.err = io.EOF - } - return 0, s.err - } - - n = len(p) - s.cur += len(p) - if s.cur > s.size { - n -= s.cur - s.size - } - - return n, err -} - -func TestUploadOrderMultiBufferedReader(t *testing.T) { - c, invocations, params := s3testing.NewUploadLoggingClient(nil) - mgr := New(c, Options{}) - _, err := mgr.PutObject(context.Background(), &PutObjectInput{ - Bucket: "Bucket", - Key: "Key", - Body: &sizedReader{size: 1024 * 1024 * 21}, - }) - if err != nil { - t.Errorf("expect no error, got %v", err) - } - - if diff := cmpDiff([]string{"CreateMultipartUpload", "UploadPart", "UploadPart", - "UploadPart", "CompleteMultipartUpload"}, *invocations); len(diff) > 0 { - t.Error(diff) - } - - // Part lengths - var parts []int64 - for i := 1; i <= 3; i++ { - parts = append(parts, getReaderLength((*params)[i].(*s3.UploadPartInput).Body)) - } - sort.Slice(parts, func(i, j int) bool { - return parts[i] < parts[j] - }) - - if diff := cmpDiff([]int64{1024 * 1024 * 5, 1024 * 1024 * 8, 1024 * 1024 * 8}, parts); len(diff) > 0 { - t.Error(diff) - } -} - -func TestUploadOrderMultiBufferedReaderPartial(t *testing.T) { - c, invocations, params := s3testing.NewUploadLoggingClient(nil) - mgr := New(c, Options{}) - _, err := mgr.PutObject(context.Background(), &PutObjectInput{ - Bucket: "Bucket", - Key: "Key", - Body: &sizedReader{size: 1024 * 1024 * 21, err: io.EOF}, - }) - if err != nil { - t.Errorf("expect no error, got %v", err) - } - - if diff := cmpDiff([]string{"CreateMultipartUpload", "UploadPart", "UploadPart", - "UploadPart", "CompleteMultipartUpload"}, *invocations); len(diff) > 0 { - t.Error(diff) - } - - // Part lengths - var parts []int64 - for i := 1; i <= 3; i++ { - parts = append(parts, getReaderLength((*params)[i].(*s3.UploadPartInput).Body)) - } - sort.Slice(parts, func(i, j int) bool { - return parts[i] < parts[j] - }) - - if diff := cmpDiff([]int64{1024 * 1024 * 5, 1024 * 1024 * 8, 1024 * 1024 * 8}, parts); len(diff) > 0 { - t.Error(diff) - } -} - -// TestUploadOrderMultiBufferedReaderEOF tests the edge case where the -// file size is the same as part size. -func TestUploadOrderMultiBufferedReaderEOF(t *testing.T) { - c, invocations, params := s3testing.NewUploadLoggingClient(nil) - mgr := New(c, Options{}) - _, err := mgr.PutObject(context.Background(), &PutObjectInput{ - Bucket: "Bucket", - Key: "Key", - Body: &sizedReader{size: 1024 * 1024 * 16, err: io.EOF}, - }) - - if err != nil { - t.Errorf("expect no error, got %v", err) - } - - if diff := cmpDiff([]string{"CreateMultipartUpload", "UploadPart", "UploadPart", "CompleteMultipartUpload"}, *invocations); len(diff) > 0 { - t.Error(diff) - } - - // Part lengths - var parts []int64 - for i := 1; i <= 2; i++ { - parts = append(parts, getReaderLength((*params)[i].(*s3.UploadPartInput).Body)) - } - sort.Slice(parts, func(i, j int) bool { - return parts[i] < parts[j] - }) - - if diff := cmpDiff([]int64{1024 * 1024 * 8, 1024 * 1024 * 8}, parts); len(diff) > 0 { - t.Error(diff) - } -} - -func TestUploadOrderSingleBufferedReader(t *testing.T) { - c, invocations, _ := s3testing.NewUploadLoggingClient(nil) - mgr := New(c, Options{}) - resp, err := mgr.PutObject(context.Background(), &PutObjectInput{ - Bucket: "Bucket", - Key: "Key", - Body: &sizedReader{size: 1024 * 1024 * 2}, - }) - - if err != nil { - t.Errorf("expect no error, got %v", err) - } - - if diff := cmpDiff([]string{"PutObject"}, *invocations); len(diff) > 0 { - t.Error(diff) - } - - if len(resp.UploadID) > 0 { - t.Errorf("expect no value, got %q", resp.UploadID) - } -} - -func TestUploadZeroLenObject(t *testing.T) { - c, invocations, _ := s3testing.NewUploadLoggingClient(nil) - - mgr := New(c, Options{}) - resp, err := mgr.PutObject(context.Background(), &PutObjectInput{ - Bucket: "Bucket", - Key: "Key", - Body: strings.NewReader(""), - }) - - if err != nil { - t.Errorf("expect no error but received %v", err) - } - if diff := cmpDiff([]string{"PutObject"}, *invocations); len(diff) > 0 { - t.Errorf("expect request to have been made, but was not, %v", diff) - } - - if len(resp.UploadID) > 0 { - t.Errorf("expect empty string, but received %q", resp.UploadID) - } -} - -type testIncompleteReader struct { - Size int64 - read int64 -} - -func (r *testIncompleteReader) Read(p []byte) (n int, err error) { - r.read += int64(len(p)) - if r.read >= r.Size { - return int(r.read - r.Size), io.ErrUnexpectedEOF - } - return len(p), nil -} - -func TestUploadUnexpectedEOF(t *testing.T) { - c, invocations, _ := s3testing.NewUploadLoggingClient(nil) - mgr := New(c, Options{}, func(o *Options) { - o.Concurrency = 1 - o.PartSizeBytes = minPartSizeBytes - }) - _, err := mgr.PutObject(context.Background(), &PutObjectInput{ - Bucket: "Bucket", - Key: "Key", - Body: &testIncompleteReader{ - Size: minPartSizeBytes + 1, - }, - }) - if err == nil { - t.Error("expect error, got nil") - } - - // Ensure upload started. - if e, a := "CreateMultipartUpload", (*invocations)[0]; e != a { - t.Errorf("expect %q, got %q", e, a) - } - - // Part may or may not be sent because of timing of sending parts and - // reading next part in upload manager. Just check for the last abort. - if e, a := "AbortMultipartUpload", (*invocations)[len(*invocations)-1]; e != a { - t.Errorf("expect %q, got %q", e, a) - } -} - -func TestSSE(t *testing.T) { - c, _, _ := s3testing.NewUploadLoggingClient(nil) - c.UploadPartFn = func(u *s3testing.UploadLoggingClient, params *s3.UploadPartInput) (*s3.UploadPartOutput, error) { - if params.SSECustomerAlgorithm == nil { - t.Fatal("SSECustomerAlgoritm should not be nil") - } - if params.SSECustomerKey == nil { - t.Fatal("SSECustomerKey should not be nil") - } - return &s3.UploadPartOutput{ETag: aws.String(fmt.Sprintf("ETAG%d", u.PartNum))}, nil - } - - mgr := New(c, Options{}, func(o *Options) { - o.Concurrency = 5 - }) - - _, err := mgr.PutObject(context.Background(), &PutObjectInput{ - Bucket: "Bucket", - Key: "Key", - SSECustomerAlgorithm: "AES256", - SSECustomerKey: "foo", - Body: bytes.NewBuffer(make([]byte, 1024*1024*10)), - }) - - if err != nil { - t.Fatal("Expected no error, but received" + err.Error()) - } -} - -func TestUploadWithContextCanceled(t *testing.T) { - c := s3.New(s3.Options{ - UsePathStyle: true, - Region: "mock-region", - }) - u := New(c, Options{}) - - ctx := &awstesting.FakeContext{DoneCh: make(chan struct{})} - ctx.Error = fmt.Errorf("context canceled") - close(ctx.DoneCh) - - _, err := u.PutObject(ctx, &PutObjectInput{ - Bucket: "Bucket", - Key: "Key", - Body: bytes.NewReader(make([]byte, 0)), - }) - if err == nil { - t.Fatalf("expect error, got nil") - } - - if e, a := "canceled", err.Error(); !strings.Contains(a, e) { - t.Errorf("expected error message to contain %q, but did not %q", e, a) - } -} - -func TestUploadRetry(t *testing.T) { - const part, retries = 3, 10 - testFile, testFileCleanup, err := createTempFile(t, minPartSizeBytes*part) - if err != nil { - t.Fatalf("failed to create test file, %v", err) - } - defer testFileCleanup(t) - - cases := map[string]struct { - Body io.Reader - PartHandlers func(testing.TB) []http.Handler - }{ - "bytes.Buffer": { - Body: bytes.NewBuffer(make([]byte, minPartSizeBytes*part)), - PartHandlers: func(tb testing.TB) []http.Handler { - return buildFailHandlers(tb, part, retries) - }, - }, - "bytes.Reader": { - Body: bytes.NewReader(make([]byte, minPartSizeBytes*part)), - PartHandlers: func(tb testing.TB) []http.Handler { - return buildFailHandlers(tb, part, retries) - }, - }, - "os.File": { - Body: testFile, - PartHandlers: func(tb testing.TB) []http.Handler { - return buildFailHandlers(tb, part, retries) - }, - }, - } - - for name, c := range cases { - t.Run(name, func(t *testing.T) { - restoreSleep := sdk.TestingUseNopSleep() - defer restoreSleep() - - mux := newMockS3UploadServer(t, c.PartHandlers(t)) - server := httptest.NewServer(mux) - defer server.Close() - - client := s3.New(s3.Options{ - EndpointResolverV2: s3testing.EndpointResolverV2{URL: server.URL}, - UsePathStyle: true, - Retryer: retry.NewStandard(func(o *retry.StandardOptions) { - o.MaxAttempts = retries + 1 - }), - }) - - uploader := New(client, Options{}) - _, err := uploader.PutObject(context.Background(), &PutObjectInput{ - Bucket: "bucket", - Key: "key", - Body: c.Body, - }) - - if err != nil { - t.Fatalf("expect no error, got %v", err) - } - }) - } -} - -func newMockS3UploadServer(tb testing.TB, partHandler []http.Handler) *mockS3UploadServer { - s := &mockS3UploadServer{ - ServeMux: http.NewServeMux(), - partHandlers: partHandler, - tb: tb, - } - - s.HandleFunc("/", s.handleRequest) - - return s -} - -func buildFailHandlers(tb testing.TB, part, retry int) []http.Handler { - handlers := make([]http.Handler, part) - - for i := 0; i < part; i++ { - handlers[i] = &failPartHandler{ - tb: tb, - failLeft: retry, - successPartHandler: &successPartHandler{tb: tb}, - } - } - - return handlers -} - -type mockS3UploadServer struct { - *http.ServeMux - - tb testing.TB - partHandlers []http.Handler -} - -func (s mockS3UploadServer) handleRequest(w http.ResponseWriter, r *http.Request) { - defer func() { - if err := r.Body.Close(); err != nil { - failRequest(w, 0, "BodyCloseError", fmt.Sprintf("request body close error: %v", err)) - } - }() - - _, hasUploads := r.URL.Query()["uploads"] - - switch { - case r.Method == "POST" && hasUploads: - // CreateMultipartUpload request - w.Header().Set("Content-Length", strconv.Itoa(len(createUploadResp))) - w.Write([]byte(createUploadResp)) - case r.Method == "PUT": - partStr := r.URL.Query().Get("partNumber") - part, err := strconv.ParseInt(partStr, 10, 64) - if err != nil { - failRequest(w, 400, "BadRequest", fmt.Sprintf("unable to parse partNumber, %q, %v", partStr, err)) - return - } - if part <= 0 || part > int64(len(s.partHandlers)) { - failRequest(w, 400, "BadRequest", fmt.Sprintf("invalid partNumber %v", part)) - return - } - s.partHandlers[part-1].ServeHTTP(w, r) - case r.Method == "POST": - // CompleteMultipartUpload request - w.Header().Set("Content-Length", strconv.Itoa(len(completeUploadResp))) - w.Write([]byte(completeUploadResp)) - case r.Method == "DELETE": - w.Header().Set("Content-Length", strconv.Itoa(len(abortUploadResp))) - w.Write([]byte(abortUploadResp)) - w.WriteHeader(200) - default: - failRequest(w, 400, "BadRequest", fmt.Sprintf("invalid request %v %v", r.Method, r.URL)) - } -} - -func createTempFile(t *testing.T, size int64) (*os.File, func(*testing.T), error) { - file, err := ioutil.TempFile(os.TempDir(), aws.SDKName+t.Name()) - if err != nil { - return nil, nil, err - } - filename := file.Name() - if err := file.Truncate(size); err != nil { - return nil, nil, err - } - - return file, - func(t *testing.T) { - if err := file.Close(); err != nil { - t.Errorf("failed to close temp file, %s, %v", filename, err) - } - if err := os.Remove(filename); err != nil { - t.Errorf("failed to remove temp file, %s, %v", filename, err) - } - }, - nil -} - -type failPartHandler struct { - tb testing.TB - failLeft int - successPartHandler http.Handler -} - -func (h *failPartHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - defer func() { - if err := r.Body.Close(); err != nil { - failRequest(w, 0, "BodyCloseError", fmt.Sprintf("request body close error: %v", err)) - } - }() - - if h.failLeft == 0 && h.successPartHandler != nil { - h.successPartHandler.ServeHTTP(w, r) - return - } - - io.Copy(ioutil.Discard, r.Body) - failRequest(w, 500, "InternalException", fmt.Sprintf("mock error, partNumber %v", r.URL.Query().Get("partNumber"))) - h.failLeft-- -} - -func failRequest(w http.ResponseWriter, status int, code, msg string) { - msg = fmt.Sprintf(baseRequestErrorResp, code, msg) - w.Header().Set("Content-Length", strconv.Itoa(len(msg))) - w.WriteHeader(status) - w.Write([]byte(msg)) -} - -type successPartHandler struct { - tb testing.TB -} - -func (h *successPartHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - defer func() { - if err := r.Body.Close(); err != nil { - failRequest(w, 0, "BodyCloseError", fmt.Sprintf("request body close error: %v", err)) - } - }() - - n, err := io.Copy(ioutil.Discard, r.Body) - if err != nil { - failRequest(w, 400, "BadRequest", fmt.Sprintf("failed to read body, %v", err)) - return - } - contentLength := r.Header.Get("Content-Length") - expectLength, err := strconv.ParseInt(contentLength, 10, 64) - if err != nil { - h.tb.Logf("expect content-length, got %q, %v", contentLength, err) - failRequest(w, 400, "BadRequest", fmt.Sprintf("unable to get content-length %v", err)) - return - } - - if e, a := expectLength, n; e != a { - h.tb.Logf("expect content-length to be %v, got %v", e, a) - failRequest(w, 400, "BadRequest", fmt.Sprintf("content-length and body do not match, %v, %v", e, a)) - return - } - - w.Header().Set("Content-Length", strconv.Itoa(len(uploadPartResp))) - w.Write([]byte(uploadPartResp)) -} - -const createUploadResp = ` - bucket - key - abc123 -` - -const uploadPartResp = ` - key -` -const baseRequestErrorResp = ` - %s - %s - request-id - host-id -` - -const completeUploadResp = ` - bucket - key - key - https://bucket.us-west-2.amazonaws.com/key - abc123 -` - -const abortUploadResp = `` - -func cmpDiff(e, a interface{}) string { - if !reflect.DeepEqual(e, a) { - return fmt.Sprintf("%v != %v", e, a) - } - return "" -} diff --git a/feature/s3/transfermanager/rrdns.go b/feature/s3/transfermanager/rrdns.go deleted file mode 100644 index 7fac93a17df..00000000000 --- a/feature/s3/transfermanager/rrdns.go +++ /dev/null @@ -1,160 +0,0 @@ -package transfermanager - -import ( - "context" - "fmt" - "net" - "net/http" - "sync" - "time" - - "github.com/aws/aws-sdk-go-v2/internal/sync/singleflight" -) - -var timeNow = time.Now - -// WithRoundRobinDNS configures an http.Transport to spread HTTP connections -// across multiple IP addresses for a given host. -// -// This is recommended by the [S3 performance guide] in high-concurrency -// application environments. -// -// WithRoundRobinDNS wraps the underlying DialContext hook on http.Transport. -// Future modifications to this hook MUST preserve said wrapping in order for -// round-robin DNS to operate. -// -// [S3 performance guide]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance-design-patterns.html -func WithRoundRobinDNS(opts ...func(*RoundRobinDNSOptions)) func(*http.Transport) { - options := &RoundRobinDNSOptions{ - TTL: 30 * time.Second, - MaxHosts: 100, - } - for _, opt := range opts { - opt(options) - } - - return func(t *http.Transport) { - rr := &rrDNS{ - cache: newDNSCache(options.MaxHosts), - expiry: options.TTL, - resolver: &net.Resolver{}, - dialContext: t.DialContext, - } - t.DialContext = rr.DialContext - } -} - -// RoundRobinDNSOptions configures use of round-robin DNS. -type RoundRobinDNSOptions struct { - // The length of time for which the results of a DNS query are valid. - TTL time.Duration - - // A limit to the number of DNS query results, cached by hostname, which are - // stored. Round-robin DNS uses an LRU cache. - MaxHosts int -} - -type resolver interface { - LookupHost(context.Context, string) ([]string, error) -} - -type rrDNS struct { - sf singleflight.Group - cache *dnsCache - - expiry time.Duration - resolver resolver - - dialContext func(ctx context.Context, network, addr string) (net.Conn, error) -} - -// DialContext implements the DialContext hook used by http.Transport, -// pre-caching IP addresses for a given host and distributing them evenly -// across new connections. -func (r *rrDNS) DialContext(ctx context.Context, network, addr string) (net.Conn, error) { - host, port, err := net.SplitHostPort(addr) - if err != nil { - return nil, fmt.Errorf("rrdns split host/port: %w", err) - } - - ipaddr, err := r.getAddr(ctx, host) - if err != nil { - return nil, fmt.Errorf("rrdns lookup host: %w", err) - } - - return r.dialContext(ctx, network, net.JoinHostPort(ipaddr, port)) -} - -func (r *rrDNS) getAddr(ctx context.Context, host string) (string, error) { - addr, ok := r.cache.GetAddr(host) - if ok { - return addr, nil - } - return r.lookupHost(ctx, host) -} - -func (r *rrDNS) lookupHost(ctx context.Context, host string) (string, error) { - ch := r.sf.DoChan(host, func() (interface{}, error) { - addrs, err := r.resolver.LookupHost(ctx, host) - if err != nil { - return nil, err - } - - expires := timeNow().Add(r.expiry) - r.cache.PutAddrs(host, addrs, expires) - return nil, nil - }) - - select { - case result := <-ch: - if result.Err != nil { - return "", result.Err - } - - addr, _ := r.cache.GetAddr(host) - return addr, nil - case <-ctx.Done(): - return "", ctx.Err() - } -} - -// WithRotoDialer configures an http.Transport to cycle through multiple local -// network addresses when creating new HTTP connections. -// -// WithRotoDialer REPLACES the root DialContext hook on the underlying -// Transport, thereby destroying any previously-applied wrappings around it. If -// the caller needs to apply additional decorations to the DialContext hook, -// they must do so after applying WithRotoDialer. -func WithRotoDialer(addrs []net.Addr) func(*http.Transport) { - return func(t *http.Transport) { - var dialers []*net.Dialer - for _, addr := range addrs { - dialers = append(dialers, &net.Dialer{ - LocalAddr: addr, - }) - } - - t.DialContext = (&rotoDialer{ - dialers: dialers, - }).DialContext - } -} - -type rotoDialer struct { - mu sync.Mutex - dialers []*net.Dialer - index int -} - -func (r *rotoDialer) DialContext(ctx context.Context, network, addr string) (net.Conn, error) { - return r.next().DialContext(ctx, network, addr) -} - -func (r *rotoDialer) next() *net.Dialer { - r.mu.Lock() - defer r.mu.Unlock() - - d := r.dialers[r.index] - r.index = (r.index + 1) % len(r.dialers) - return d -} diff --git a/feature/s3/transfermanager/rrdns_test.go b/feature/s3/transfermanager/rrdns_test.go deleted file mode 100644 index c6e31c0cb28..00000000000 --- a/feature/s3/transfermanager/rrdns_test.go +++ /dev/null @@ -1,166 +0,0 @@ -package transfermanager - -import ( - "context" - "errors" - "net" - "testing" - "time" -) - -// these tests also cover the cache impl (cycling+expiry+evict) - -type mockNow struct { - now time.Time -} - -func (m *mockNow) Now() time.Time { - return m.now -} - -func (m *mockNow) Add(d time.Duration) { - m.now = m.now.Add(d) -} - -func useMockNow(m *mockNow) func() { - timeNow = m.Now - return func() { - timeNow = time.Now - } -} - -var errDialContextOK = errors.New("dial context ok") - -type mockResolver struct { - addrs map[string][]string - err error -} - -func (m *mockResolver) LookupHost(ctx context.Context, host string) ([]string, error) { - return m.addrs[host], m.err -} - -type mockDialContext struct { - calledWith string -} - -func (m *mockDialContext) DialContext(ctx context.Context, network, addr string) (net.Conn, error) { - m.calledWith = addr - return nil, errDialContextOK -} - -func TestRoundRobinDNS_CycleIPs(t *testing.T) { - restore := useMockNow(&mockNow{}) - defer restore() - - addrs := []string{"0.0.0.1", "0.0.0.2", "0.0.0.3"} - r := &mockResolver{ - addrs: map[string][]string{ - "s3.us-east-1.amazonaws.com": addrs, - }, - } - dc := &mockDialContext{} - - rr := &rrDNS{ - cache: newDNSCache(1), - resolver: r, - dialContext: dc.DialContext, - } - - expectDialContext(t, rr, dc, "s3.us-east-1.amazonaws.com", addrs[0]) - expectDialContext(t, rr, dc, "s3.us-east-1.amazonaws.com", addrs[1]) - expectDialContext(t, rr, dc, "s3.us-east-1.amazonaws.com", addrs[2]) - expectDialContext(t, rr, dc, "s3.us-east-1.amazonaws.com", addrs[0]) -} - -func TestRoundRobinDNS_MultiIP(t *testing.T) { - restore := useMockNow(&mockNow{}) - defer restore() - - r := &mockResolver{ - addrs: map[string][]string{ - "host1.com": {"0.0.0.1", "0.0.0.2", "0.0.0.3"}, - "host2.com": {"1.0.0.1", "1.0.0.2", "1.0.0.3"}, - }, - } - dc := &mockDialContext{} - - rr := &rrDNS{ - cache: newDNSCache(2), - resolver: r, - dialContext: dc.DialContext, - } - - expectDialContext(t, rr, dc, "host1.com", r.addrs["host1.com"][0]) - expectDialContext(t, rr, dc, "host2.com", r.addrs["host2.com"][0]) - expectDialContext(t, rr, dc, "host1.com", r.addrs["host1.com"][1]) - expectDialContext(t, rr, dc, "host2.com", r.addrs["host2.com"][1]) -} - -func TestRoundRobinDNS_MaxHosts(t *testing.T) { - restore := useMockNow(&mockNow{}) - defer restore() - - r := &mockResolver{ - addrs: map[string][]string{ - "host1.com": {"0.0.0.1", "0.0.0.2", "0.0.0.3"}, - "host2.com": {"0.0.0.1", "0.0.0.2", "0.0.0.3"}, - }, - } - dc := &mockDialContext{} - - rr := &rrDNS{ - cache: newDNSCache(1), - resolver: r, - dialContext: dc.DialContext, - } - - expectDialContext(t, rr, dc, "host1.com", r.addrs["host1.com"][0]) - expectDialContext(t, rr, dc, "host1.com", r.addrs["host1.com"][1]) - expectDialContext(t, rr, dc, "host2.com", r.addrs["host2.com"][0]) // evicts host1 - expectDialContext(t, rr, dc, "host1.com", r.addrs["host1.com"][0]) // evicts host2 - expectDialContext(t, rr, dc, "host2.com", r.addrs["host2.com"][0]) -} - -func TestRoundRobinDNS_Expires(t *testing.T) { - now := &mockNow{time.Unix(0, 0)} - restore := useMockNow(now) - defer restore() - - r := &mockResolver{ - addrs: map[string][]string{ - "host1.com": {"0.0.0.1", "0.0.0.2", "0.0.0.3"}, - }, - } - dc := &mockDialContext{} - - rr := &rrDNS{ - cache: newDNSCache(2), - expiry: 30, - resolver: r, - dialContext: dc.DialContext, - } - - expectDialContext(t, rr, dc, "host1.com", r.addrs["host1.com"][0]) - now.Add(16) // hasn't expired - expectDialContext(t, rr, dc, "host1.com", r.addrs["host1.com"][1]) - now.Add(16) // expired, starts over - expectDialContext(t, rr, dc, "host1.com", r.addrs["host1.com"][0]) -} - -func expectDialContext(t *testing.T, rr *rrDNS, dc *mockDialContext, host, expect string) { - const port = "443" - - t.Helper() - _, err := rr.DialContext(context.Background(), "", net.JoinHostPort(host, port)) - if err != errDialContextOK { - t.Errorf("expect sentinel err, got %v", err) - } - actual, _, err := net.SplitHostPort(dc.calledWith) - if err != nil { - t.Fatal(err) - } - if expect != actual { - t.Errorf("expect addr %s, got %s", expect, actual) - } -} diff --git a/feature/s3/transfermanager/setup_integ_test.go b/feature/s3/transfermanager/setup_integ_test.go deleted file mode 100644 index d8efb29a220..00000000000 --- a/feature/s3/transfermanager/setup_integ_test.go +++ /dev/null @@ -1,440 +0,0 @@ -//go:build integration -// +build integration - -package transfermanager - -import ( - "bytes" - "context" - "crypto/rand" - "crypto/tls" - "flag" - "fmt" - "io" - "io/ioutil" - "net/http" - "os" - "strings" - "testing" - "time" - - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/aws/arn" - "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/aws/aws-sdk-go-v2/service/s3/types" - "github.com/aws/aws-sdk-go-v2/service/sts" -) - -var setupMetadata = struct { - AccountID string - Region string - Buckets struct { - Source struct { - Name string - ARN string - } - } -}{} - -// s3 client to use for integ testing -var s3Client *s3.Client - -// s3TransferManagerClient to use for integ testing -var s3TransferManagerClient *Client - -// sts client to use for integ testing -var stsClient *sts.Client - -// http client setting to use for integ testing -var httpClient *http.Client - -var region = "us-west-2" - -// large object buffer to test multipart upload -var largeObjectBuf []byte - -// TestMain executes at start of package tests -func TestMain(m *testing.M) { - flag.Parse() - flag.CommandLine.Visit(func(f *flag.Flag) { - if !(f.Name == "run" || f.Name == "test.run") { - return - } - value := f.Value.String() - if value == `NONE` { - os.Exit(0) - } - }) - - var result int - defer func() { - if r := recover(); r != nil { - fmt.Fprintln(os.Stderr, "S3 TransferManager integration tests panic,", r) - result = 1 - } - os.Exit(result) - }() - - var verifyTLS bool - var s3Endpoint string - - flag.StringVar(&s3Endpoint, "s3-endpoint", "", "integration endpoint for S3") - - flag.StringVar(&setupMetadata.AccountID, "account", "", "integration account id") - flag.BoolVar(&verifyTLS, "verify-tls", true, "verify server TLS certificate") - flag.Parse() - - httpClient = &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: verifyTLS}, - }, - } - - cfg, err := config.LoadDefaultConfig(context.Background(), - config.WithRegion(region)) - if err != nil { - fmt.Fprintf(os.Stderr, "Error occurred while loading config with region %v, %v", region, err) - result = 1 - return - } - - // assign the http client - cfg.HTTPClient = httpClient - - // create a s3 client - s3cfg := cfg.Copy() - if len(s3Endpoint) != 0 { - s3cfg.EndpointResolver = aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) { - return aws.Endpoint{ - URL: s3Endpoint, - PartitionID: "aws", - SigningName: "s3", - SigningRegion: region, - SigningMethod: "s3v4", - }, nil - }) - } - - // build s3 client from config - s3Client = s3.NewFromConfig(s3cfg) - - // build s3 transfermanager client from config - s3TransferManagerClient = NewFromConfig(s3Client, s3cfg) - - // build sts client from config - stsClient = sts.NewFromConfig(cfg) - - // context - ctx := context.Background() - - setupMetadata.AccountID, err = getAccountID(ctx) - if err != nil { - fmt.Fprintf(os.Stderr, "failed to get integration aws account id: %v\n", err) - result = 1 - return - } - - bucketCleanup, err := setupBuckets(ctx) - defer bucketCleanup() - if err != nil { - fmt.Fprintf(os.Stderr, "failed to setup integration test buckets: %v\n", err) - result = 1 - return - } - - largeObjectBuf = make([]byte, 20*1024*1024) - _, err = rand.Read(largeObjectBuf) - if err != nil { - fmt.Fprintf(os.Stderr, "failed to generate large object for multipart upload: %v\n", err) - result = 1 - return - } - - result = m.Run() -} - -// getAccountID retrieves account id -func getAccountID(ctx context.Context) (string, error) { - if len(setupMetadata.AccountID) != 0 { - return setupMetadata.AccountID, nil - } - identity, err := stsClient.GetCallerIdentity(ctx, nil) - if err != nil { - return "", fmt.Errorf("error fetching caller identity, %w", err) - } - return *identity.Account, nil -} - -// setupBuckets creates buckets needed for integration test -func setupBuckets(ctx context.Context) (func(), error) { - var cleanups []func() - - cleanup := func() { - for i := range cleanups { - cleanups[i]() - } - } - - bucketCreates := []struct { - name *string - arn *string - }{ - {name: &setupMetadata.Buckets.Source.Name, arn: &setupMetadata.Buckets.Source.ARN}, - } - - for _, bucket := range bucketCreates { - *bucket.name = GenerateBucketName() - - if err := SetupBucket(ctx, s3Client, *bucket.name); err != nil { - return cleanup, err - } - - // Compute ARN - bARN := arn.ARN{ - Partition: "aws", - Service: "s3", - Region: region, - AccountID: setupMetadata.AccountID, - Resource: fmt.Sprintf("bucket_name:%s", *bucket.name), - }.String() - - *bucket.arn = bARN - - bucketName := *bucket.name - cleanups = append(cleanups, func() { - if err := CleanupBucket(ctx, s3Client, bucketName); err != nil { - fmt.Fprintln(os.Stderr, err) - } - }) - } - - return cleanup, nil -} - -type putObjectTestData struct { - Body io.Reader - ExpectBody []byte - ExpectError string -} - -// UniqueID returns a unique UUID-like identifier for use in generating -// resources for integration tests. -// -// TODO: duped from service/internal/integrationtest, remove after beta. -func UniqueID() string { - uuid := make([]byte, 16) - io.ReadFull(rand.Reader, uuid) - return fmt.Sprintf("%x", uuid) -} - -func testPutObject(t *testing.T, bucket string, testData putObjectTestData, opts ...func(options *Options)) { - key := UniqueID() - - _, err := s3TransferManagerClient.PutObject(context.Background(), - &PutObjectInput{ - Bucket: bucket, - Key: key, - Body: testData.Body, - }, opts...) - if err != nil { - if len(testData.ExpectError) == 0 { - t.Fatalf("expect no error, got %v", err) - } - if e, a := testData.ExpectError, err.Error(); !strings.Contains(a, e) { - t.Fatalf("expect error to contain %v, got %v", e, a) - } - } else { - if e := testData.ExpectError; len(e) != 0 { - t.Fatalf("expect error: %v, got none", e) - } - } - - if len(testData.ExpectError) != 0 { - return - } - - resp, err := s3Client.GetObject(context.Background(), - &s3.GetObjectInput{ - Bucket: aws.String(bucket), - Key: aws.String(key), - }) - if err != nil { - t.Fatalf("expect no error, got %v", err) - } - - b, _ := ioutil.ReadAll(resp.Body) - if e, a := testData.ExpectBody, b; !bytes.EqualFold(e, a) { - t.Errorf("expect %s, got %s", e, a) - } -} - -// TODO: duped from service/internal/integrationtest, remove after beta. -const expressAZID = "usw2-az3" - -// TODO: duped from service/internal/integrationtest, remove after beta. -const expressSuffix = "--usw2-az3--x-s3" - -// BucketPrefix is the root prefix of integration test buckets. -// -// TODO: duped from service/internal/integrationtest, remove after beta. -const BucketPrefix = "aws-sdk-go-v2-integration" - -// GenerateBucketName returns a unique bucket name. -// -// TODO: duped from service/internal/integrationtest, remove after beta. -func GenerateBucketName() string { - return fmt.Sprintf("%s-%s", - BucketPrefix, UniqueID()) -} - -// GenerateBucketName returns a unique express-formatted bucket name. -// -// TODO: duped from service/internal/integrationtest, remove after beta. -func GenerateExpressBucketName() string { - return fmt.Sprintf( - "%s-%s%s", - BucketPrefix, - UniqueID()[0:8], // express suffix adds length, regain that here - expressSuffix, - ) -} - -// SetupBucket returns a test bucket created for the integration tests. -// -// TODO: duped from service/internal/integrationtest, remove after beta. -func SetupBucket(ctx context.Context, svc *s3.Client, bucketName string) (err error) { - fmt.Println("Setup: Creating test bucket,", bucketName) - _, err = svc.CreateBucket(ctx, &s3.CreateBucketInput{ - Bucket: &bucketName, - CreateBucketConfiguration: &types.CreateBucketConfiguration{ - LocationConstraint: "us-west-2", - }, - }) - if err != nil { - return fmt.Errorf("failed to create bucket %s, %v", bucketName, err) - } - - // TODO: change this to use waiter to wait until BucketExists instead of loop - // svc.WaitUntilBucketExists(HeadBucketInput) - - // HeadBucket to determine if bucket exists - var attempt = 0 - params := &s3.HeadBucketInput{ - Bucket: &bucketName, - } -pt: - _, err = svc.HeadBucket(ctx, params) - // increment an attempt - attempt++ - - // retry till 10 attempt - if err != nil { - if attempt < 10 { - goto pt - } - // fail if not succeed after 10 attempts - return fmt.Errorf("failed to determine if a bucket %s exists and you have permission to access it %v", bucketName, err) - } - - return nil -} - -// CleanupBucket deletes the contents of a S3 bucket, before deleting the bucket -// it self. -// TODO: list and delete methods should use paginators -// -// TODO: duped from service/internal/integrationtest, remove after beta. -func CleanupBucket(ctx context.Context, svc *s3.Client, bucketName string) (err error) { - var errs = make([]error, 0) - - fmt.Println("TearDown: Deleting objects from test bucket,", bucketName) - listObjectsResp, err := svc.ListObjectsV2(ctx, &s3.ListObjectsV2Input{ - Bucket: &bucketName, - }) - if err != nil { - return fmt.Errorf("failed to list objects, %w", err) - } - - for _, o := range listObjectsResp.Contents { - _, err := svc.DeleteObject(ctx, &s3.DeleteObjectInput{ - Bucket: &bucketName, - Key: o.Key, - }) - if err != nil { - errs = append(errs, err) - } - } - if len(errs) != 0 { - return fmt.Errorf("failed to delete objects, %s", errs) - } - - fmt.Println("TearDown: Deleting partial uploads from test bucket,", bucketName) - multipartUploadResp, err := svc.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{ - Bucket: &bucketName, - }) - if err != nil { - return fmt.Errorf("failed to list multipart objects, %w", err) - } - - for _, u := range multipartUploadResp.Uploads { - _, err = svc.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{ - Bucket: &bucketName, - Key: u.Key, - UploadId: u.UploadId, - }) - if err != nil { - errs = append(errs, err) - } - } - if len(errs) != 0 { - return fmt.Errorf("failed to delete multipart upload objects, %s", errs) - } - - fmt.Println("TearDown: Deleting test bucket,", bucketName) - _, err = svc.DeleteBucket(ctx, &s3.DeleteBucketInput{ - Bucket: &bucketName, - }) - if err != nil { - return fmt.Errorf("failed to delete bucket, %s", bucketName) - } - - return nil -} - -// SetupExpressBucket returns an express bucket for testing. -// -// TODO: duped from service/internal/integrationtest, remove after beta. -func SetupExpressBucket(ctx context.Context, svc *s3.Client, bucketName string) error { - if !strings.HasSuffix(bucketName, expressSuffix) { - return fmt.Errorf("bucket name %s is missing required suffix %s", bucketName, expressSuffix) - } - - fmt.Println("Setup: Creating test express bucket,", bucketName) - _, err := svc.CreateBucket(ctx, &s3.CreateBucketInput{ - Bucket: &bucketName, - CreateBucketConfiguration: &types.CreateBucketConfiguration{ - Location: &types.LocationInfo{ - Name: aws.String(expressAZID), - Type: types.LocationTypeAvailabilityZone, - }, - Bucket: &types.BucketInfo{ - DataRedundancy: types.DataRedundancySingleAvailabilityZone, - Type: types.BucketTypeDirectory, - }, - }, - }) - if err != nil { - return fmt.Errorf("create express bucket %s: %v", bucketName, err) - } - - w := s3.NewBucketExistsWaiter(svc) - err = w.Wait(ctx, &s3.HeadBucketInput{ - Bucket: &bucketName, - }, 10*time.Second) - if err != nil { - return fmt.Errorf("wait for express bucket %s: %v", bucketName, err) - } - - return nil -} diff --git a/feature/s3/transfermanager/shared_test.go b/feature/s3/transfermanager/shared_test.go deleted file mode 100644 index 364423e96c2..00000000000 --- a/feature/s3/transfermanager/shared_test.go +++ /dev/null @@ -1,4 +0,0 @@ -package transfermanager - -var buf20MB = make([]byte, 1024*1024*20) -var buf2MB = make([]byte, 1024*1024*2) diff --git a/feature/s3/transfermanager/types/types.go b/feature/s3/transfermanager/types/types.go deleted file mode 100644 index 8a2d877e461..00000000000 --- a/feature/s3/transfermanager/types/types.go +++ /dev/null @@ -1,346 +0,0 @@ -package types - -import ( - "io" - - "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/aws/aws-sdk-go-v2/service/s3/types" -) - -// ReadSeekCloser wraps a io.Reader returning a ReaderSeekerCloser. Allows the -// SDK to accept an io.Reader that is not also an io.Seeker for unsigned -// streaming payload API operations. -// -// A readSeekCloser wrapping an nonseekable io.Reader used in an API operation's -// input will prevent that operation being retried in the case of -// network errors, and cause operation requests to fail if the operation -// requires payload signing. -func ReadSeekCloser(r io.Reader) *ReaderSeekerCloser { - return &ReaderSeekerCloser{r} -} - -// ReaderSeekerCloser represents a reader that can also delegate io.Seeker and -// io.Closer interfaces to the underlying object if they are available. -type ReaderSeekerCloser struct { - r io.Reader -} - -// SeekerLen attempts to get the number of bytes remaining at the seeker's -// current position. Returns the number of bytes remaining or error. -func SeekerLen(s io.Seeker) (int64, error) { - // Determine if the seeker is actually seekable. ReaderSeekerCloser - // hides the fact that a io.Readers might not actually be seekable. - switch v := s.(type) { - case *ReaderSeekerCloser: - return v.GetLen() - } - - return computeSeekerLength(s) -} - -// GetLen returns the length of the bytes remaining in the underlying reader. -// Checks first for Len(), then io.Seeker to determine the size of the -// underlying reader. -// -// Will return -1 if the length cannot be determined. -func (r *ReaderSeekerCloser) GetLen() (int64, error) { - if l, ok := r.HasLen(); ok { - return int64(l), nil - } - - if s, ok := r.r.(io.Seeker); ok { - return computeSeekerLength(s) - } - - return -1, nil -} - -func computeSeekerLength(s io.Seeker) (int64, error) { - curOffset, err := s.Seek(0, io.SeekCurrent) - if err != nil { - return 0, err - } - - endOffset, err := s.Seek(0, io.SeekEnd) - if err != nil { - return 0, err - } - - _, err = s.Seek(curOffset, io.SeekStart) - if err != nil { - return 0, err - } - - return endOffset - curOffset, nil -} - -// HasLen returns the length of the underlying reader if the value implements -// the Len() int method. -func (r *ReaderSeekerCloser) HasLen() (int, bool) { - type lenner interface { - Len() int - } - - if lr, ok := r.r.(lenner); ok { - return lr.Len(), true - } - - return 0, false -} - -// Read reads from the reader up to size of p. The number of bytes read, and -// error if it occurred will be returned. -// -// If the reader is not an io.Reader zero bytes read, and nil error will be -// returned. -// -// Performs the same functionality as io.Reader Read -func (r *ReaderSeekerCloser) Read(p []byte) (int, error) { - switch t := r.r.(type) { - case io.Reader: - return t.Read(p) - } - return 0, nil -} - -// Seek sets the offset for the next Read to offset, interpreted according to -// whence: 0 means relative to the origin of the file, 1 means relative to the -// current offset, and 2 means relative to the end. Seek returns the new offset -// and an error, if any. -// -// If the ReaderSeekerCloser is not an io.Seeker nothing will be done. -func (r *ReaderSeekerCloser) Seek(offset int64, whence int) (int64, error) { - switch t := r.r.(type) { - case io.Seeker: - return t.Seek(offset, whence) - } - return int64(0), nil -} - -// IsSeeker returns if the underlying reader is also a seeker. -func (r *ReaderSeekerCloser) IsSeeker() bool { - _, ok := r.r.(io.Seeker) - return ok -} - -// Close closes the ReaderSeekerCloser. -// -// If the ReaderSeekerCloser is not an io.Closer nothing will be done. -func (r *ReaderSeekerCloser) Close() error { - switch t := r.r.(type) { - case io.Closer: - return t.Close() - } - return nil -} - -// ChecksumAlgorithm indicates the algorithm used to create the checksum for the object -type ChecksumAlgorithm string - -// Enum values for ChecksumAlgorithm -const ( - ChecksumAlgorithmCrc32 ChecksumAlgorithm = "CRC32" - ChecksumAlgorithmCrc32c = "CRC32C" - ChecksumAlgorithmSha1 = "SHA1" - ChecksumAlgorithmSha256 = "SHA256" -) - -// ObjectCannedACL defines the canned ACL to apply to the object, see [Canned ACL] in the -// Amazon S3 User Guide. -type ObjectCannedACL string - -// Enum values for ObjectCannedACL -const ( - ObjectCannedACLPrivate ObjectCannedACL = "private" - ObjectCannedACLPublicRead = "public-read" - ObjectCannedACLPublicReadWrite = "public-read-write" - ObjectCannedACLAuthenticatedRead = "authenticated-read" - ObjectCannedACLAwsExecRead = "aws-exec-read" - ObjectCannedACLBucketOwnerRead = "bucket-owner-read" - ObjectCannedACLBucketOwnerFullControl = "bucket-owner-full-control" -) - -// Values returns all known values for ObjectCannedACL. Note that this can be -// expanded in the future, and so it is only as up to date as the client. -// -// The ordering of this slice is not guaranteed to be stable across updates. -func (ObjectCannedACL) Values() []ObjectCannedACL { - return []ObjectCannedACL{ - "private", - "public-read", - "public-read-write", - "authenticated-read", - "aws-exec-read", - "bucket-owner-read", - "bucket-owner-full-control", - } -} - -// ObjectLockLegalHoldStatus specifies whether a legal hold will be applied to this object. For more -// information about S3 Object Lock, see [Object Lock] in the Amazon S3 User Guide. -type ObjectLockLegalHoldStatus string - -// Enum values for ObjectLockLegalHoldStatus -const ( - ObjectLockLegalHoldStatusOn ObjectLockLegalHoldStatus = "ON" - ObjectLockLegalHoldStatusOff = "OFF" -) - -// ObjectLockMode is the Object Lock mode that you want to apply to this object. -type ObjectLockMode string - -// Enum values for ObjectLockMode -const ( - ObjectLockModeGovernance ObjectLockMode = "GOVERNANCE" - ObjectLockModeCompliance = "COMPLIANCE" -) - -// RequestPayer confirms that the requester knows that they will be charged for the request. -// Bucket owners need not specify this parameter in their requests. If either the -// source or destination S3 bucket has Requester Pays enabled, the requester will -// pay for corresponding charges to copy the object. For information about -// downloading objects from Requester Pays buckets, see [Downloading Objects in Requester Pays Buckets]in the Amazon S3 User -// Guide. -type RequestPayer string - -// Enum values for RequestPayer -const ( - RequestPayerRequester RequestPayer = "requester" -) - -// ServerSideEncryption indicates the server-side encryption algorithm that was used when you store this object -// in Amazon S3 (for example, AES256 , aws:kms , aws:kms:dsse ) -type ServerSideEncryption string - -// Enum values for ServerSideEncryption -const ( - ServerSideEncryptionAes256 ServerSideEncryption = "AES256" - ServerSideEncryptionAwsKms = "aws:kms" - ServerSideEncryptionAwsKmsDsse = "aws:kms:dsse" -) - -// StorageClass specifies class to store newly created -// objects, which has default value of STANDARD. For more information, see -// [Storage Classes] in the Amazon S3 User Guide. -type StorageClass string - -// Enum values for StorageClass -const ( - StorageClassStandard StorageClass = "STANDARD" - StorageClassReducedRedundancy = "REDUCED_REDUNDANCY" - StorageClassStandardIa = "STANDARD_IA" - StorageClassOnezoneIa = "ONEZONE_IA" - StorageClassIntelligentTiering = "INTELLIGENT_TIERING" - StorageClassGlacier = "GLACIER" - StorageClassDeepArchive = "DEEP_ARCHIVE" - StorageClassOutposts = "OUTPOSTS" - StorageClassGlacierIr = "GLACIER_IR" - StorageClassSnow = "SNOW" - StorageClassExpressOnezone = "EXPRESS_ONEZONE" -) - -// CompletedPart includes details of the parts that were uploaded. -type CompletedPart struct { - - // The base64-encoded, 32-bit CRC32 checksum of the object. This will only be - // present if it was uploaded with the object. When you use an API operation on an - // object that was uploaded using multipart uploads, this value may not be a direct - // checksum value of the full object. Instead, it's a calculation based on the - // checksum values of each individual part. For more information about how - // checksums are calculated with multipart uploads, see [Checking object integrity]in the Amazon S3 User - // Guide. - // - // [Checking object integrity]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#large-object-checksums - ChecksumCRC32 *string - - // The base64-encoded, 32-bit CRC32C checksum of the object. This will only be - // present if it was uploaded with the object. When you use an API operation on an - // object that was uploaded using multipart uploads, this value may not be a direct - // checksum value of the full object. Instead, it's a calculation based on the - // checksum values of each individual part. For more information about how - // checksums are calculated with multipart uploads, see [Checking object integrity]in the Amazon S3 User - // Guide. - // - // [Checking object integrity]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#large-object-checksums - ChecksumCRC32C *string - - // The base64-encoded, 160-bit SHA-1 digest of the object. This will only be - // present if it was uploaded with the object. When you use the API operation on an - // object that was uploaded using multipart uploads, this value may not be a direct - // checksum value of the full object. Instead, it's a calculation based on the - // checksum values of each individual part. For more information about how - // checksums are calculated with multipart uploads, see [Checking object integrity]in the Amazon S3 User - // Guide. - // - // [Checking object integrity]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#large-object-checksums - ChecksumSHA1 *string - - // The base64-encoded, 256-bit SHA-256 digest of the object. This will only be - // present if it was uploaded with the object. When you use an API operation on an - // object that was uploaded using multipart uploads, this value may not be a direct - // checksum value of the full object. Instead, it's a calculation based on the - // checksum values of each individual part. For more information about how - // checksums are calculated with multipart uploads, see [Checking object integrity]in the Amazon S3 User - // Guide. - // - // [Checking object integrity]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#large-object-checksums - ChecksumSHA256 *string - - // Entity tag returned when the part was uploaded. - ETag *string - - // Part number that identifies the part. This is a positive integer between 1 and - // 10,000. - // - // - General purpose buckets - In CompleteMultipartUpload , when a additional - // checksum (including x-amz-checksum-crc32 , x-amz-checksum-crc32c , - // x-amz-checksum-sha1 , or x-amz-checksum-sha256 ) is applied to each part, the - // PartNumber must start at 1 and the part numbers must be consecutive. - // Otherwise, Amazon S3 generates an HTTP 400 Bad Request status code and an - // InvalidPartOrder error code. - // - // - Directory buckets - In CompleteMultipartUpload , the PartNumber must start - // at 1 and the part numbers must be consecutive. - PartNumber *int32 -} - -// MapCompletedPart maps CompletedPart to s3 types -func (cp CompletedPart) MapCompletedPart() types.CompletedPart { - return types.CompletedPart{ - ChecksumCRC32: cp.ChecksumCRC32, - ChecksumCRC32C: cp.ChecksumCRC32C, - ChecksumSHA1: cp.ChecksumSHA1, - ChecksumSHA256: cp.ChecksumSHA256, - ETag: cp.ETag, - PartNumber: cp.PartNumber, - } -} - -// MapFrom set CompletedPart fields from s3 UploadPartOutput -func (cp *CompletedPart) MapFrom(resp *s3.UploadPartOutput, partNum *int32) { - cp.ChecksumCRC32 = resp.ChecksumCRC32 - cp.ChecksumCRC32C = resp.ChecksumCRC32C - cp.ChecksumSHA1 = resp.ChecksumSHA1 - cp.ChecksumSHA256 = resp.ChecksumSHA256 - cp.ETag = resp.ETag - cp.PartNumber = partNum -} - -// RequestCharged indicates that the requester was successfully charged for the request. -type RequestCharged string - -// Enum values for RequestCharged -const ( - RequestChargedRequester RequestCharged = "requester" -) - -// Metadata provides storing and reading metadata values. Keys may be any -// comparable value type. Get and set will panic if key is not a comparable -// value type. -// -// Metadata uses lazy initialization, and Set method must be called as an -// addressable value, or pointer. Not doing so may cause key/value pair to not -// be set. -type Metadata struct { - values map[interface{}]interface{} -} diff --git a/modman.toml b/modman.toml index ce197c0467c..c642b50193b 100644 --- a/modman.toml +++ b/modman.toml @@ -27,9 +27,6 @@ [modules."feature/ec2/imds/internal/configtesting"] no_tag = true - [modules."feature/s3/transfermanager"] - no_tag = true - [modules."internal/codegen"] no_tag = true diff --git a/service/internal/integrationtest/s3/setup_test.go b/service/internal/integrationtest/s3/setup_test.go index e164336f6be..ea00037d6e3 100644 --- a/service/internal/integrationtest/s3/setup_test.go +++ b/service/internal/integrationtest/s3/setup_test.go @@ -346,8 +346,8 @@ func testWriteToObject(t *testing.T, bucket string, testData writeToObjectTestDa } } else { - if e := testData.ExpectError; len(e) != 0 { - t.Fatalf("expected error: %v, got none", e) + if len(testData.ExpectError) != 0 { + t.Fatalf("expected error: %v, got none", err) } }