diff --git a/Makefile b/Makefile index 491bc5d4b3a..ad5b65af7f6 100644 --- a/Makefile +++ b/Makefile @@ -20,7 +20,11 @@ EACHMODULE_FAILFAST_FLAG=-fail-fast=${EACHMODULE_FAILFAST} EACHMODULE_CONCURRENCY ?= 1 EACHMODULE_CONCURRENCY_FLAG=-c ${EACHMODULE_CONCURRENCY} -EACHMODULE_SKIP ?= +# TODO: github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager is in beta +# avoid running any codegen operations / tests on it until then +# path is evaluated relatively by the tool, the right one will vary by the +# command context +EACHMODULE_SKIP ?= feature/s3/transfermanager:s3/transfermanager:transfermanager EACHMODULE_SKIP_FLAG=-skip="${EACHMODULE_SKIP}" EACHMODULE_FLAGS=${EACHMODULE_CONCURRENCY_FLAG} ${EACHMODULE_FAILFAST_FLAG} ${EACHMODULE_SKIP_FLAG} diff --git a/feature/s3/transfermanager/LICENSE.txt b/feature/s3/transfermanager/LICENSE.txt new file mode 100644 index 00000000000..d6456956733 --- /dev/null +++ b/feature/s3/transfermanager/LICENSE.txt @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. 
The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!) The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/feature/s3/transfermanager/api.go b/feature/s3/transfermanager/api.go
new file mode 100644
index 00000000000..dbe430dadb4
--- /dev/null
+++ b/feature/s3/transfermanager/api.go
@@ -0,0 +1,16 @@
+package transfermanager
+
+import (
+	"context"
+
+	"github.com/aws/aws-sdk-go-v2/service/s3"
+)
+
+// S3APIClient defines the interface of S3 client operations used by the transfer manager
+type S3APIClient interface {
+	PutObject(context.Context, *s3.PutObjectInput, ...func(*s3.Options)) (*s3.PutObjectOutput, error)
+	UploadPart(context.Context, *s3.UploadPartInput, ...func(*s3.Options)) (*s3.UploadPartOutput, error)
+	CreateMultipartUpload(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error)
+	CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput, ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error)
+	AbortMultipartUpload(context.Context, *s3.AbortMultipartUploadInput, ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error)
+}
diff --git a/feature/s3/transfermanager/api_client.go b/feature/s3/transfermanager/api_client.go
new file mode 100644
index 00000000000..84ce16db425
--- /dev/null
+++ b/feature/s3/transfermanager/api_client.go
@@ -0,0 +1,51 @@
+package transfermanager
+
+import (
+	"github.com/aws/aws-sdk-go-v2/aws"
+)
+
+const userAgentKey = "s3-transfer"
+
+// defaultMaxUploadParts is the maximum allowed number of parts in a multi-part upload
+// on Amazon S3.
+const defaultMaxUploadParts = 10000
+
+// minPartSizeBytes is the minimum part size when transferring objects to/from S3
+const minPartSizeBytes = 1024 * 1024 * 8
+
+// defaultMultipartUploadThreshold is the default size threshold in bytes indicating when to use multipart upload.
+const defaultMultipartUploadThreshold = 1024 * 1024 * 16
+
+// defaultTransferConcurrency is the default number of goroutines to spin up when
+// using PutObject().
+const defaultTransferConcurrency = 5
+
+// Client provides the API client to make operation calls for Amazon Simple
+// Storage Service's Transfer Manager.
+// It is safe to call Client methods concurrently across goroutines.
+type Client struct {
+	options Options
+}
+
+// New returns an initialized Client from the client Options.
Provide +// more functional options to further configure the Client +func New(s3Client S3APIClient, opts Options, optFns ...func(*Options)) *Client { + opts.S3 = s3Client + for _, fn := range optFns { + fn(&opts) + } + + resolveConcurrency(&opts) + resolvePartSizeBytes(&opts) + resolveChecksumAlgorithm(&opts) + resolveMultipartUploadThreshold(&opts) + + return &Client{ + options: opts, + } +} + +// NewFromConfig returns a new Client from the provided s3 config +func NewFromConfig(s3Client S3APIClient, cfg aws.Config, optFns ...func(*Options)) *Client { + return New(s3Client, Options{}, optFns...) +} diff --git a/feature/s3/transfermanager/api_op_PutObject.go b/feature/s3/transfermanager/api_op_PutObject.go new file mode 100644 index 00000000000..0785ec4e81e --- /dev/null +++ b/feature/s3/transfermanager/api_op_PutObject.go @@ -0,0 +1,992 @@ +package transfermanager + +import ( + "bytes" + "context" + "fmt" + "io" + "sort" + "sync" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/aws/middleware" + "github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager/types" + "github.com/aws/aws-sdk-go-v2/service/s3" + s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" + smithymiddleware "github.com/aws/smithy-go/middleware" +) + +// A MultipartUploadError wraps a failed S3 multipart upload. An error returned +// will satisfy this interface when a multi part upload failed to upload all +// chucks to S3. In the case of a failure the UploadID is needed to operate on +// the chunks, if any, which were uploaded. +// +// Example: +// +// c := transfermanager.New(client, opts) +// output, err := c.PutObject(context.Background(), input) +// if err != nil { +// var multierr transfermanager.MultipartUploadError +// if errors.As(err, &multierr) { +// fmt.Printf("upload failure UploadID=%s, %s\n", multierr.UploadID(), multierr.Error()) +// } else { +// fmt.Printf("upload failure, %s\n", err.Error()) +// } +// } +type MultipartUploadError interface { + error + + // UploadID returns the upload id for the S3 multipart upload that failed. + UploadID() string +} + +// A multipartUploadError wraps the upload ID of a failed s3 multipart upload. +// Composed of BaseError for code, message, and original error +// +// Should be used for an error that occurred failing a S3 multipart upload, +// and a upload ID is available. +type multipartUploadError struct { + err error + + // ID for multipart upload which failed. + uploadID string +} + +// Error returns the string representation of the error. +// +// Satisfies the error interface. +func (m *multipartUploadError) Error() string { + var extra string + if m.err != nil { + extra = fmt.Sprintf(", cause: %s", m.err.Error()) + } + return fmt.Sprintf("upload multipart failed, upload id: %s%s", m.uploadID, extra) +} + +// Unwrap returns the underlying error that cause the upload failure +func (m *multipartUploadError) Unwrap() error { + return m.err +} + +// UploadID returns the id of the S3 upload which failed. +func (m *multipartUploadError) UploadID() string { + return m.uploadID +} + +// PutObjectInput represents a request to the PutObject() call. It contains common fields +// of s3 PutObject and CreateMultipartUpload input +type PutObjectInput struct { + // Bucket the object is uploaded into + Bucket string + + // Object key for which the PUT action was initiated + Key string + + // Object data + Body io.Reader + + // The canned ACL to apply to the object. For more information, see [Canned ACL] in the Amazon + // S3 User Guide. 
+ // + // When adding a new object, you can use headers to grant ACL-based permissions to + // individual Amazon Web Services accounts or to predefined groups defined by + // Amazon S3. These permissions are then added to the ACL on the object. By + // default, all objects are private. Only the owner has full access control. For + // more information, see [Access Control List (ACL) Overview]and [Managing ACLs Using the REST API] in the Amazon S3 User Guide. + // + // If the bucket that you're uploading objects to uses the bucket owner enforced + // setting for S3 Object Ownership, ACLs are disabled and no longer affect + // permissions. Buckets that use this setting only accept PUT requests that don't + // specify an ACL or PUT requests that specify bucket owner full control ACLs, such + // as the bucket-owner-full-control canned ACL or an equivalent form of this ACL + // expressed in the XML format. PUT requests that contain other ACLs (for example, + // custom grants to certain Amazon Web Services accounts) fail and return a 400 + // error with the error code AccessControlListNotSupported . For more information, + // see [Controlling ownership of objects and disabling ACLs]in the Amazon S3 User Guide. + // + // - This functionality is not supported for directory buckets. + // + // - This functionality is not supported for Amazon S3 on Outposts. + // + // [Managing ACLs Using the REST API]: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-using-rest-api.html + // [Access Control List (ACL) Overview]: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html + // [Canned ACL]: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#CannedACL + // [Controlling ownership of objects and disabling ACLs]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/about-object-ownership.html + ACL types.ObjectCannedACL + + // Specifies whether Amazon S3 should use an S3 Bucket Key for object encryption + // with server-side encryption using Key Management Service (KMS) keys (SSE-KMS). + // Setting this header to true causes Amazon S3 to use an S3 Bucket Key for object + // encryption with SSE-KMS. + // + // Specifying this header with a PUT action doesn’t affect bucket-level settings + // for S3 Bucket Key. + // + // This functionality is not supported for directory buckets. + BucketKeyEnabled bool + + // Can be used to specify caching behavior along the request/reply chain. For more + // information, see [http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.9]. + // + // [http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.9]: http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.9 + CacheControl string + + // Indicates the algorithm used to create the checksum for the object when you use + // the SDK. This header will not provide any additional functionality if you don't + // use the SDK. When you send this header, there must be a corresponding + // x-amz-checksum-algorithm or x-amz-trailer header sent. Otherwise, Amazon S3 + // fails the request with the HTTP status code 400 Bad Request . + // + // For the x-amz-checksum-algorithm header, replace algorithm with the + // supported algorithm from the following list: + // + // - CRC32 + // + // - CRC32C + // + // - SHA1 + // + // - SHA256 + // + // For more information, see [Checking object integrity] in the Amazon S3 User Guide. 
+ // + // If the individual checksum value you provide through x-amz-checksum-algorithm + // doesn't match the checksum algorithm you set through + // x-amz-sdk-checksum-algorithm , Amazon S3 ignores any provided ChecksumAlgorithm + // parameter and uses the checksum algorithm that matches the provided value in + // x-amz-checksum-algorithm . + // + // For directory buckets, when you use Amazon Web Services SDKs, CRC32 is the + // default checksum algorithm that's used for performance. + // + // [Checking object integrity]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html + ChecksumAlgorithm types.ChecksumAlgorithm + + // Size of the body in bytes. This parameter is useful when the size of the body + // cannot be determined automatically. For more information, see [https://www.rfc-editor.org/rfc/rfc9110.html#name-content-length]. + // + // [https://www.rfc-editor.org/rfc/rfc9110.html#name-content-length]: https://www.rfc-editor.org/rfc/rfc9110.html#name-content-length + ContentLength int64 + + // Specifies presentational information for the object. For more information, see [https://www.rfc-editor.org/rfc/rfc6266#section-4]. + // + // [https://www.rfc-editor.org/rfc/rfc6266#section-4]: https://www.rfc-editor.org/rfc/rfc6266#section-4 + ContentDisposition string + + // Specifies what content encodings have been applied to the object and thus what + // decoding mechanisms must be applied to obtain the media-type referenced by the + // Content-Type header field. For more information, see [https://www.rfc-editor.org/rfc/rfc9110.html#field.content-encoding]. + // + // [https://www.rfc-editor.org/rfc/rfc9110.html#field.content-encoding]: https://www.rfc-editor.org/rfc/rfc9110.html#field.content-encoding + ContentEncoding string + + // The language the content is in. + ContentLanguage string + + // A standard MIME type describing the format of the contents. For more + // information, see [https://www.rfc-editor.org/rfc/rfc9110.html#name-content-type]. + // + // [https://www.rfc-editor.org/rfc/rfc9110.html#name-content-type]: https://www.rfc-editor.org/rfc/rfc9110.html#name-content-type + ContentType string + + // The account ID of the expected bucket owner. If the account ID that you provide + // does not match the actual owner of the bucket, the request fails with the HTTP + // status code 403 Forbidden (access denied). + ExpectedBucketOwner string + + // The date and time at which the object is no longer cacheable. For more + // information, see [https://www.rfc-editor.org/rfc/rfc7234#section-5.3]. + // + // [https://www.rfc-editor.org/rfc/rfc7234#section-5.3]: https://www.rfc-editor.org/rfc/rfc7234#section-5.3 + Expires time.Time + + // Gives the grantee READ, READ_ACP, and WRITE_ACP permissions on the object. + // + // - This functionality is not supported for directory buckets. + // + // - This functionality is not supported for Amazon S3 on Outposts. + GrantFullControl string + + // Allows grantee to read the object data and its metadata. + // + // - This functionality is not supported for directory buckets. + // + // - This functionality is not supported for Amazon S3 on Outposts. + GrantRead string + + // Allows grantee to read the object ACL. + // + // - This functionality is not supported for directory buckets. + // + // - This functionality is not supported for Amazon S3 on Outposts. + GrantReadACP string + + // Allows grantee to write the ACL for the applicable object. + // + // - This functionality is not supported for directory buckets. 
+ // + // - This functionality is not supported for Amazon S3 on Outposts. + GrantWriteACP string + + // A map of metadata to store with the object in S3. + Metadata map[string]string + + // Specifies whether a legal hold will be applied to this object. For more + // information about S3 Object Lock, see [Object Lock]in the Amazon S3 User Guide. + // + // This functionality is not supported for directory buckets. + // + // [Object Lock]: https://docs.aws.amazon.com/AmazonS3/latest/dev/object-lock.html + ObjectLockLegalHoldStatus types.ObjectLockLegalHoldStatus + + // The Object Lock mode that you want to apply to this object. + // + // This functionality is not supported for directory buckets. + ObjectLockMode types.ObjectLockMode + + // The date and time when you want this object's Object Lock to expire. Must be + // formatted as a timestamp parameter. + // + // This functionality is not supported for directory buckets. + ObjectLockRetainUntilDate time.Time + + // Confirms that the requester knows that they will be charged for the request. + // Bucket owners need not specify this parameter in their requests. If either the + // source or destination S3 bucket has Requester Pays enabled, the requester will + // pay for corresponding charges to copy the object. For information about + // downloading objects from Requester Pays buckets, see [Downloading Objects in Requester Pays Buckets]in the Amazon S3 User + // Guide. + // + // This functionality is not supported for directory buckets. + // + // [Downloading Objects in Requester Pays Buckets]: https://docs.aws.amazon.com/AmazonS3/latest/dev/ObjectsinRequesterPaysBuckets.html + RequestPayer types.RequestPayer + + // Specifies the algorithm to use when encrypting the object (for example, AES256 ). + // + // This functionality is not supported for directory buckets. + SSECustomerAlgorithm string + + // Specifies the customer-provided encryption key for Amazon S3 to use in + // encrypting data. This value is used to store the object and then it is + // discarded; Amazon S3 does not store the encryption key. The key must be + // appropriate for use with the algorithm specified in the + // x-amz-server-side-encryption-customer-algorithm header. + // + // This functionality is not supported for directory buckets. + SSECustomerKey string + + // Specifies the Amazon Web Services KMS Encryption Context to use for object + // encryption. The value of this header is a base64-encoded UTF-8 string holding + // JSON with the encryption context key-value pairs. This value is stored as object + // metadata and automatically gets passed on to Amazon Web Services KMS for future + // GetObject or CopyObject operations on this object. This value must be + // explicitly added during CopyObject operations. + // + // This functionality is not supported for directory buckets. + SSEKMSEncryptionContext string + + // If x-amz-server-side-encryption has a valid value of aws:kms or aws:kms:dsse , + // this header specifies the ID (Key ID, Key ARN, or Key Alias) of the Key + // Management Service (KMS) symmetric encryption customer managed key that was used + // for the object. If you specify x-amz-server-side-encryption:aws:kms or + // x-amz-server-side-encryption:aws:kms:dsse , but do not provide + // x-amz-server-side-encryption-aws-kms-key-id , Amazon S3 uses the Amazon Web + // Services managed key ( aws/s3 ) to protect the data. If the KMS key does not + // exist in the same account that's issuing the command, you must use the full ARN + // and not just the ID. 
+ // + // This functionality is not supported for directory buckets. + SSEKMSKeyID string + + // The server-side encryption algorithm that was used when you store this object + // in Amazon S3 (for example, AES256 , aws:kms , aws:kms:dsse ). + // + // General purpose buckets - You have four mutually exclusive options to protect + // data using server-side encryption in Amazon S3, depending on how you choose to + // manage the encryption keys. Specifically, the encryption key options are Amazon + // S3 managed keys (SSE-S3), Amazon Web Services KMS keys (SSE-KMS or DSSE-KMS), + // and customer-provided keys (SSE-C). Amazon S3 encrypts data with server-side + // encryption by using Amazon S3 managed keys (SSE-S3) by default. You can + // optionally tell Amazon S3 to encrypt data at rest by using server-side + // encryption with other key options. For more information, see [Using Server-Side Encryption]in the Amazon S3 + // User Guide. + // + // Directory buckets - For directory buckets, only the server-side encryption with + // Amazon S3 managed keys (SSE-S3) ( AES256 ) value is supported. + // + // [Using Server-Side Encryption]: https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingServerSideEncryption.html + ServerSideEncryption types.ServerSideEncryption + + // By default, Amazon S3 uses the STANDARD Storage Class to store newly created + // objects. The STANDARD storage class provides high durability and high + // availability. Depending on performance needs, you can specify a different + // Storage Class. For more information, see [Storage Classes]in the Amazon S3 User Guide. + // + // - For directory buckets, only the S3 Express One Zone storage class is + // supported to store newly created objects. + // + // - Amazon S3 on Outposts only uses the OUTPOSTS Storage Class. + // + // [Storage Classes]: https://docs.aws.amazon.com/AmazonS3/latest/dev/storage-class-intro.html + StorageClass types.StorageClass + + // The tag-set for the object. The tag-set must be encoded as URL Query + // parameters. (For example, "Key1=Value1") + // + // This functionality is not supported for directory buckets. + Tagging string + + // If the bucket is configured as a website, redirects requests for this object to + // another object in the same bucket or to an external URL. Amazon S3 stores the + // value of this header in the object metadata. For information about object + // metadata, see [Object Key and Metadata]in the Amazon S3 User Guide. + // + // In the following example, the request header sets the redirect to an object + // (anotherPage.html) in the same bucket: + // + // x-amz-website-redirect-location: /anotherPage.html + // + // In the following example, the request header sets the object redirect to + // another website: + // + // x-amz-website-redirect-location: http://www.example.com/ + // + // For more information about website hosting in Amazon S3, see [Hosting Websites on Amazon S3] and [How to Configure Website Page Redirects] in the + // Amazon S3 User Guide. + // + // This functionality is not supported for directory buckets. 
+ // + // [How to Configure Website Page Redirects]: https://docs.aws.amazon.com/AmazonS3/latest/dev/how-to-page-redirect.html + // [Hosting Websites on Amazon S3]: https://docs.aws.amazon.com/AmazonS3/latest/dev/WebsiteHosting.html + // [Object Key and Metadata]: https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html + WebsiteRedirectLocation string +} + +// map non-zero string to *string +func nzstring(v string) *string { + if v == "" { + return nil + } + return aws.String(v) +} + +// map non-zero Time to *Time +func nztime(t time.Time) *time.Time { + if t.IsZero() { + return nil + } + return aws.Time(t) +} + +func (i PutObjectInput) mapSingleUploadInput(body io.Reader, checksumAlgorithm types.ChecksumAlgorithm) *s3.PutObjectInput { + input := &s3.PutObjectInput{ + Bucket: aws.String(i.Bucket), + Key: aws.String(i.Key), + Body: body, + } + if i.ACL != "" { + input.ACL = s3types.ObjectCannedACL(i.ACL) + } + if i.ChecksumAlgorithm != "" { + input.ChecksumAlgorithm = s3types.ChecksumAlgorithm(i.ChecksumAlgorithm) + } else { + input.ChecksumAlgorithm = s3types.ChecksumAlgorithm(checksumAlgorithm) + } + if i.ObjectLockLegalHoldStatus != "" { + input.ObjectLockLegalHoldStatus = s3types.ObjectLockLegalHoldStatus(i.ObjectLockLegalHoldStatus) + } + if i.ObjectLockMode != "" { + input.ObjectLockMode = s3types.ObjectLockMode(i.ObjectLockMode) + } + if i.RequestPayer != "" { + input.RequestPayer = s3types.RequestPayer(i.RequestPayer) + } + if i.ServerSideEncryption != "" { + input.ServerSideEncryption = s3types.ServerSideEncryption(i.ServerSideEncryption) + } + if i.StorageClass != "" { + input.StorageClass = s3types.StorageClass(i.StorageClass) + } + input.BucketKeyEnabled = aws.Bool(i.BucketKeyEnabled) + input.CacheControl = nzstring(i.CacheControl) + input.ContentDisposition = nzstring(i.ContentDisposition) + input.ContentEncoding = nzstring(i.ContentEncoding) + input.ContentLanguage = nzstring(i.ContentLanguage) + input.ContentType = nzstring(i.ContentType) + input.ExpectedBucketOwner = nzstring(i.ExpectedBucketOwner) + input.GrantFullControl = nzstring(i.GrantFullControl) + input.GrantRead = nzstring(i.GrantRead) + input.GrantReadACP = nzstring(i.GrantReadACP) + input.GrantWriteACP = nzstring(i.GrantWriteACP) + input.Metadata = i.Metadata + input.SSECustomerAlgorithm = nzstring(i.SSECustomerAlgorithm) + input.SSECustomerKey = nzstring(i.SSECustomerKey) + input.SSEKMSEncryptionContext = nzstring(i.SSEKMSEncryptionContext) + input.SSEKMSKeyId = nzstring(i.SSEKMSKeyID) + input.Tagging = nzstring(i.Tagging) + input.WebsiteRedirectLocation = nzstring(i.WebsiteRedirectLocation) + input.Expires = nztime(i.Expires) + input.ObjectLockRetainUntilDate = nztime(i.ObjectLockRetainUntilDate) + return input +} + +func (i PutObjectInput) mapCreateMultipartUploadInput() *s3.CreateMultipartUploadInput { + input := &s3.CreateMultipartUploadInput{ + Bucket: aws.String(i.Bucket), + Key: aws.String(i.Key), + } + if i.ACL != "" { + input.ACL = s3types.ObjectCannedACL(i.ACL) + } + if i.ChecksumAlgorithm != "" { + input.ChecksumAlgorithm = s3types.ChecksumAlgorithm(i.ChecksumAlgorithm) + } else { + input.ChecksumAlgorithm = s3types.ChecksumAlgorithm(i.ChecksumAlgorithm) + } + if i.ObjectLockLegalHoldStatus != "" { + input.ObjectLockLegalHoldStatus = s3types.ObjectLockLegalHoldStatus(i.ObjectLockLegalHoldStatus) + } + if i.ObjectLockMode != "" { + input.ObjectLockMode = s3types.ObjectLockMode(i.ObjectLockMode) + } + if i.RequestPayer != "" { + input.RequestPayer = s3types.RequestPayer(i.RequestPayer) + } + 
if i.ServerSideEncryption != "" {
+		input.ServerSideEncryption = s3types.ServerSideEncryption(i.ServerSideEncryption)
+	}
+	if i.StorageClass != "" {
+		input.StorageClass = s3types.StorageClass(i.StorageClass)
+	}
+	input.BucketKeyEnabled = aws.Bool(i.BucketKeyEnabled)
+	input.CacheControl = nzstring(i.CacheControl)
+	input.ContentDisposition = nzstring(i.ContentDisposition)
+	input.ContentEncoding = nzstring(i.ContentEncoding)
+	input.ContentLanguage = nzstring(i.ContentLanguage)
+	input.ContentType = nzstring(i.ContentType)
+	input.ExpectedBucketOwner = nzstring(i.ExpectedBucketOwner)
+	input.GrantFullControl = nzstring(i.GrantFullControl)
+	input.GrantRead = nzstring(i.GrantRead)
+	input.GrantReadACP = nzstring(i.GrantReadACP)
+	input.GrantWriteACP = nzstring(i.GrantWriteACP)
+	input.Metadata = i.Metadata
+	input.SSECustomerAlgorithm = nzstring(i.SSECustomerAlgorithm)
+	input.SSECustomerKey = nzstring(i.SSECustomerKey)
+	input.SSEKMSEncryptionContext = nzstring(i.SSEKMSEncryptionContext)
+	input.SSEKMSKeyId = nzstring(i.SSEKMSKeyID)
+	input.Tagging = nzstring(i.Tagging)
+	input.WebsiteRedirectLocation = nzstring(i.WebsiteRedirectLocation)
+	input.Expires = nztime(i.Expires)
+	input.ObjectLockRetainUntilDate = nztime(i.ObjectLockRetainUntilDate)
+	return input
+}
+
+func (i PutObjectInput) mapCompleteMultipartUploadInput(uploadID *string, completedParts completedParts) *s3.CompleteMultipartUploadInput {
+	input := &s3.CompleteMultipartUploadInput{
+		Bucket:   aws.String(i.Bucket),
+		Key:      aws.String(i.Key),
+		UploadId: uploadID,
+	}
+	if i.RequestPayer != "" {
+		input.RequestPayer = s3types.RequestPayer(i.RequestPayer)
+	}
+	input.ExpectedBucketOwner = nzstring(i.ExpectedBucketOwner)
+	input.SSECustomerAlgorithm = nzstring(i.SSECustomerAlgorithm)
+	input.SSECustomerKey = nzstring(i.SSECustomerKey)
+	var parts []s3types.CompletedPart
+	for _, part := range completedParts {
+		parts = append(parts, part.MapCompletedPart())
+	}
+	if parts != nil {
+		input.MultipartUpload = &s3types.CompletedMultipartUpload{Parts: parts}
+	}
+	return input
+}
+
+func (i PutObjectInput) mapUploadPartInput(body io.Reader, partNum *int32, uploadID *string) *s3.UploadPartInput {
+	input := &s3.UploadPartInput{
+		Bucket:     aws.String(i.Bucket),
+		Key:        aws.String(i.Key),
+		Body:       body,
+		PartNumber: partNum,
+		UploadId:   uploadID,
+	}
+	if i.ChecksumAlgorithm != "" {
+		input.ChecksumAlgorithm = s3types.ChecksumAlgorithm(i.ChecksumAlgorithm)
+	}
+	if i.RequestPayer != "" {
+		input.RequestPayer = s3types.RequestPayer(i.RequestPayer)
+	}
+	input.ExpectedBucketOwner = nzstring(i.ExpectedBucketOwner)
+	input.SSECustomerAlgorithm = nzstring(i.SSECustomerAlgorithm)
+	input.SSECustomerKey = nzstring(i.SSECustomerKey)
+	return input
+}
+
+func (i *PutObjectInput) mapAbortMultipartUploadInput(uploadID *string) *s3.AbortMultipartUploadInput {
+	input := &s3.AbortMultipartUploadInput{
+		Bucket:   aws.String(i.Bucket),
+		Key:      aws.String(i.Key),
+		UploadId: uploadID,
+	}
+	return input
+}
+
+// PutObjectOutput represents a response from the PutObject() call. It contains common fields
+// of s3 PutObject and CompleteMultipartUpload output
+type PutObjectOutput struct {
+	// The ID for a multipart upload to S3. In the case of an error, the error
+	// can be cast to the MultipartUploadError interface to extract the upload ID.
+	// Will be empty string if multipart upload was not used, and the object
+	// was uploaded as a single PutObject call.
+	UploadID string
+
+	// The list of parts that were uploaded and their checksums.
Will be empty + // if multipart upload was not used, and the object was uploaded as a + // single PutObject call. + CompletedParts []types.CompletedPart + + // Indicates whether the uploaded object uses an S3 Bucket Key for server-side + // encryption with Amazon Web Services KMS (SSE-KMS). + BucketKeyEnabled bool + + // The base64-encoded, 32-bit CRC32 checksum of the object. + ChecksumCRC32 string + + // The base64-encoded, 32-bit CRC32C checksum of the object. + ChecksumCRC32C string + + // The base64-encoded, 160-bit SHA-1 digest of the object. + ChecksumSHA1 string + + // The base64-encoded, 256-bit SHA-256 digest of the object. + ChecksumSHA256 string + + // Entity tag for the uploaded object. + ETag string + + // If the object expiration is configured, this will contain the expiration date + // (expiry-date) and rule ID (rule-id). The value of rule-id is URL encoded. + Expiration string + + // The bucket where the newly created object is put + Bucket string + + // The object key of the newly created object. + Key string + + // If present, indicates that the requester was successfully charged for the + // request. + RequestCharged types.RequestCharged + + // If present, specifies the ID of the Amazon Web Services Key Management Service + // (Amazon Web Services KMS) symmetric customer managed customer master key (CMK) + // that was used for the object. + SSEKMSKeyID string + + // If you specified server-side encryption either with an Amazon S3-managed + // encryption key or an Amazon Web Services KMS customer master key (CMK) in your + // initiate multipart upload request, the response includes this header. It + // confirms the encryption algorithm that Amazon S3 used to encrypt the object. + ServerSideEncryption types.ServerSideEncryption + + // The version of the object that was uploaded. Will only be populated if + // the S3 Bucket is versioned. If the bucket is not versioned this field + // will not be set. + VersionID string + + // Metadata pertaining to the operation's result. 
+ ResultMetadata smithymiddleware.Metadata +} + +func (o *PutObjectOutput) mapFromPutObjectOutput(out *s3.PutObjectOutput, bucket, key string) { + o.BucketKeyEnabled = aws.ToBool(out.BucketKeyEnabled) + o.ChecksumCRC32 = aws.ToString(out.ChecksumCRC32) + o.ChecksumCRC32C = aws.ToString(out.ChecksumCRC32C) + o.ChecksumSHA1 = aws.ToString(out.ChecksumSHA1) + o.ChecksumSHA256 = aws.ToString(out.ChecksumSHA256) + o.ETag = aws.ToString(out.ETag) + o.Expiration = aws.ToString(out.Expiration) + o.Bucket = bucket + o.Key = key + o.RequestCharged = types.RequestCharged(out.RequestCharged) + o.SSEKMSKeyID = aws.ToString(out.SSEKMSKeyId) + o.ServerSideEncryption = types.ServerSideEncryption(out.ServerSideEncryption) + o.VersionID = aws.ToString(out.VersionId) + o.ResultMetadata = out.ResultMetadata.Clone() +} + +func (o *PutObjectOutput) mapFromCompleteMultipartUploadOutput(out *s3.CompleteMultipartUploadOutput, bucket, uploadID string, completedParts completedParts) { + o.UploadID = uploadID + o.CompletedParts = completedParts + o.BucketKeyEnabled = aws.ToBool(out.BucketKeyEnabled) + o.ChecksumCRC32 = aws.ToString(out.ChecksumCRC32) + o.ChecksumCRC32C = aws.ToString(out.ChecksumCRC32C) + o.ChecksumSHA1 = aws.ToString(out.ChecksumSHA1) + o.ChecksumSHA256 = aws.ToString(out.ChecksumSHA256) + o.ETag = aws.ToString(out.ETag) + o.Expiration = aws.ToString(out.Expiration) + o.Bucket = bucket + o.Key = aws.ToString(out.Key) + o.RequestCharged = types.RequestCharged(out.RequestCharged) + o.SSEKMSKeyID = aws.ToString(out.SSEKMSKeyId) + o.ServerSideEncryption = types.ServerSideEncryption(out.ServerSideEncryption) + o.VersionID = aws.ToString(out.VersionId) + o.ResultMetadata = out.ResultMetadata +} + +// PutObject uploads an object to S3, intelligently buffering large +// files into smaller chunks and sending them in parallel across multiple +// goroutines. You can configure the chunk size and concurrency through the +// Options parameters. +// +// Additional functional options can be provided to configure the individual +// upload. These options are copies of the original Options instance, the client of which PutObject is called from. +// Modifying the options will not impact the original Client and Options instance. +func (c *Client) PutObject(ctx context.Context, input *PutObjectInput, opts ...func(*Options)) (*PutObjectOutput, error) { + i := uploader{in: input, options: c.options.Copy()} + for _, opt := range opts { + opt(&i.options) + } + + return i.upload(ctx) +} + +type uploader struct { + options Options + + in *PutObjectInput + + // PartPool allows for the re-usage of streaming payload part buffers between upload calls + partPool bytesBufferPool +} + +func (u *uploader) upload(ctx context.Context) (*PutObjectOutput, error) { + if err := u.init(); err != nil { + return nil, fmt.Errorf("unable to initialize upload: %w", err) + } + defer u.partPool.Close() + + clientOptions := []func(o *s3.Options){ + func(o *s3.Options) { + o.APIOptions = append(o.APIOptions, + middleware.AddSDKAgentKey(middleware.FeatureMetadata, userAgentKey), + addFeatureUserAgent, + ) + }} + + r, _, cleanUp, err := u.nextReader(ctx) + + if err == io.EOF { + return u.singleUpload(ctx, r, cleanUp, clientOptions...) + } else if err != nil { + cleanUp() + return nil, err + } + + mu := multiUploader{ + uploader: u, + } + return mu.upload(ctx, r, cleanUp, clientOptions...) 
+} + +func (u *uploader) init() error { + if err := u.initSize(); err != nil { + return err + } + u.partPool = newDefaultSlicePool(u.options.PartSizeBytes, u.options.Concurrency+1) + + return nil +} + +// initSize checks user configured partsize and up-size it if calculated part count exceeds max value +func (u *uploader) initSize() error { + if u.options.PartSizeBytes < minPartSizeBytes { + return fmt.Errorf("part size must be at least %d bytes", minPartSizeBytes) + } + + var bodySize int64 + switch r := u.in.Body.(type) { + case io.Seeker: + n, err := types.SeekerLen(r) + if err != nil { + return err + } + bodySize = n + default: + if l := u.in.ContentLength; l > 0 { + bodySize = l + } + } + + // Try to adjust partSize if it is too small and account for + // integer division truncation. + if bodySize/u.options.PartSizeBytes >= int64(defaultMaxUploadParts) { + // Add one to the part size to account for remainders + // during the size calculation. e.g odd number of bytes. + u.options.PartSizeBytes = (bodySize / int64(defaultMaxUploadParts)) + 1 + } + return nil +} + +func (u *uploader) singleUpload(ctx context.Context, r io.Reader, cleanUp func(), clientOptions ...func(*s3.Options)) (*PutObjectOutput, error) { + defer cleanUp() + + params := u.in.mapSingleUploadInput(r, u.options.ChecksumAlgorithm) + + out, err := u.options.S3.PutObject(ctx, params, clientOptions...) + if err != nil { + return nil, err + } + + var output PutObjectOutput + output.mapFromPutObjectOutput(out, u.in.Bucket, u.in.Key) + return &output, nil +} + +// nextReader reads the next chunk of data from input Body +func (u *uploader) nextReader(ctx context.Context) (io.Reader, int, func(), error) { + part, err := u.partPool.Get(ctx) + if err != nil { + return nil, 0, func() {}, err + } + + n, err := readFillBuf(u.in.Body, part) + + cleanup := func() { + u.partPool.Put(part) + } + return bytes.NewReader(part[0:n]), n, cleanup, err +} + +func readFillBuf(r io.Reader, b []byte) (offset int, err error) { + for offset < len(b) && err == nil { + var n int + n, err = r.Read(b[offset:]) + offset += n + } + return offset, err +} + +type multiUploader struct { + *uploader + wg sync.WaitGroup + m sync.Mutex + err error + uploadID *string + parts completedParts +} + +type ulChunk struct { + buf io.Reader + partNum *int32 + cleanup func() +} + +type completedParts []types.CompletedPart + +func (cp completedParts) Len() int { + return len(cp) +} + +func (cp completedParts) Less(i, j int) bool { + return aws.ToInt32(cp[i].PartNumber) < aws.ToInt32(cp[j].PartNumber) +} + +func (cp completedParts) Swap(i, j int) { + cp[i], cp[j] = cp[j], cp[i] +} + +// upload will perform a multipart upload using the firstBuf buffer containing +// the first chunk of data. +func (u *multiUploader) upload(ctx context.Context, firstBuf io.Reader, cleanup func(), clientOptions ...func(*s3.Options)) (*PutObjectOutput, error) { + params := u.uploader.in.mapCreateMultipartUploadInput() + + // Create a multipart + resp, err := u.uploader.options.S3.CreateMultipartUpload(ctx, params, clientOptions...) + if err != nil { + cleanup() + return nil, err + } + u.uploadID = resp.UploadId + + ch := make(chan ulChunk, u.options.Concurrency) + for i := 0; i < u.options.Concurrency; i++ { + // launch workers + u.wg.Add(1) + go u.readChunk(ctx, ch, clientOptions...) 
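+		// each worker consumes ulChunk values from ch and uploads them as
+		// UploadPart requests until the channel is closed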
+ } + + var partNum int32 = 1 + ch <- ulChunk{buf: firstBuf, partNum: aws.Int32(partNum), cleanup: cleanup} + for u.geterr() == nil && err == nil { + partNum++ + var ( + data io.Reader + nextChunkLen int + ok bool + ) + data, nextChunkLen, cleanup, err = u.nextReader(ctx) + ok, err = u.shouldContinue(partNum, nextChunkLen, err) + if !ok { + cleanup() + if err != nil { + u.seterr(err) + } + break + } + + ch <- ulChunk{buf: data, partNum: aws.Int32(partNum), cleanup: cleanup} + } + + // close the channel, wait for workers and complete upload + close(ch) + u.wg.Wait() + completeOut := u.complete(ctx, clientOptions...) + + if err := u.geterr(); err != nil { + return nil, &multipartUploadError{ + err: err, + uploadID: *u.uploadID, + } + } + + var out PutObjectOutput + out.mapFromCompleteMultipartUploadOutput(completeOut, aws.ToString(params.Bucket), aws.ToString(u.uploadID), u.parts) + return &out, nil +} + +func (u *multiUploader) shouldContinue(part int32, nextChunkLen int, err error) (bool, error) { + if err != nil && err != io.EOF { + return false, fmt.Errorf("read multipart upload data failed, %w", err) + } + + if nextChunkLen == 0 { + // No need to upload empty part, if file was empty to start + // with empty single part would of been created and never + // started multipart upload. + return false, nil + } + + // This upload exceeded maximum number of supported parts, error now. + if part > defaultMaxUploadParts { + return false, fmt.Errorf(fmt.Sprintf("exceeded total allowed S3 limit MaxUploadParts (%d). Adjust PartSize to fit in this limit", defaultMaxUploadParts)) + } + + return true, err +} + +// readChunk runs in worker goroutines to pull chunks off of the ch channel +// and send() them as UploadPart requests. +func (u *multiUploader) readChunk(ctx context.Context, ch chan ulChunk, clientOptions ...func(*s3.Options)) { + defer u.wg.Done() + for { + data, ok := <-ch + + if !ok { + break + } + + if u.geterr() == nil { + if err := u.send(ctx, data, clientOptions...); err != nil { + u.seterr(err) + } + } + + data.cleanup() + } +} + +// send performs an UploadPart request and keeps track of the completed +// part information. +func (u *multiUploader) send(ctx context.Context, c ulChunk, clientOptions ...func(*s3.Options)) error { + params := u.in.mapUploadPartInput(c.buf, c.partNum, u.uploadID) + resp, err := u.options.S3.UploadPart(ctx, params, clientOptions...) + if err != nil { + return err + } + + var completed types.CompletedPart + completed.MapFrom(resp, c.partNum) + + u.m.Lock() + u.parts = append(u.parts, completed) + u.m.Unlock() + + return nil +} + +// geterr is a thread-safe getter for the error object +func (u *multiUploader) geterr() error { + u.m.Lock() + defer u.m.Unlock() + + return u.err +} + +// seterr is a thread-safe setter for the error object +func (u *multiUploader) seterr(e error) { + u.m.Lock() + defer u.m.Unlock() + + u.err = e +} + +func (u *multiUploader) fail(ctx context.Context, clientOptions ...func(*s3.Options)) { + params := u.in.mapAbortMultipartUploadInput(u.uploadID) + _, err := u.options.S3.AbortMultipartUpload(ctx, params, clientOptions...) + if err != nil { + u.seterr(fmt.Errorf("failed to abort multipart upload (%v), triggered after multipart upload failed: %v", err, u.geterr())) + } +} + +// complete successfully completes a multipart upload and returns the response. 
+func (u *multiUploader) complete(ctx context.Context, clientOptions ...func(*s3.Options)) *s3.CompleteMultipartUploadOutput { + if u.geterr() != nil { + u.fail(ctx) + return nil + } + + // Parts must be sorted in PartNumber order. + sort.Sort(u.parts) + + params := u.in.mapCompleteMultipartUploadInput(u.uploadID, u.parts) + + resp, err := u.options.S3.CompleteMultipartUpload(ctx, params, clientOptions...) + if err != nil { + u.seterr(err) + u.fail(ctx) + } + + return resp +} + +func addFeatureUserAgent(stack *smithymiddleware.Stack) error { + ua, err := getOrAddRequestUserAgent(stack) + if err != nil { + return err + } + + ua.AddUserAgentFeature(middleware.UserAgentFeatureS3Transfer) + return nil +} + +func getOrAddRequestUserAgent(stack *smithymiddleware.Stack) (*middleware.RequestUserAgent, error) { + id := (*middleware.RequestUserAgent)(nil).ID() + mw, ok := stack.Build.Get(id) + if !ok { + mw = middleware.NewRequestUserAgent() + if err := stack.Build.Add(mw, smithymiddleware.After); err != nil { + return nil, err + } + } + + ua, ok := mw.(*middleware.RequestUserAgent) + if !ok { + return nil, fmt.Errorf("%T for %s middleware did not match expected type", mw, id) + } + + return ua, nil +} diff --git a/feature/s3/transfermanager/api_op_PutObject_integ_test.go b/feature/s3/transfermanager/api_op_PutObject_integ_test.go new file mode 100644 index 00000000000..921e237e5dc --- /dev/null +++ b/feature/s3/transfermanager/api_op_PutObject_integ_test.go @@ -0,0 +1,24 @@ +//go:build integration +// +build integration + +package transfermanager + +import ( + "bytes" + "strings" + "testing" +) + +func TestInteg_PutObject(t *testing.T) { + cases := map[string]putObjectTestData{ + "seekable body": {Body: strings.NewReader("hello world"), ExpectBody: []byte("hello world")}, + "empty string body": {Body: strings.NewReader(""), ExpectBody: []byte("")}, + "multipart upload body": {Body: bytes.NewReader(largeObjectBuf), ExpectBody: largeObjectBuf}, + } + + for name, c := range cases { + t.Run(name, func(t *testing.T) { + testPutObject(t, setupMetadata.Buckets.Source.Name, c) + }) + } +} diff --git a/feature/s3/transfermanager/dns_cache.go b/feature/s3/transfermanager/dns_cache.go new file mode 100644 index 00000000000..f9f041cbb57 --- /dev/null +++ b/feature/s3/transfermanager/dns_cache.go @@ -0,0 +1,61 @@ +package transfermanager + +import ( + "sync" + "time" + + "github.com/aws/smithy-go/container/private/cache" + "github.com/aws/smithy-go/container/private/cache/lru" +) + +// dnsCache implements an LRU cache of DNS query results by host. +// +// Cache retrievals will automatically rotate between IP addresses for +// multi-value query results. +type dnsCache struct { + mu sync.Mutex + addrs cache.Cache +} + +// newDNSCache returns an initialized dnsCache with given capacity. +func newDNSCache(cap int) *dnsCache { + return &dnsCache{ + addrs: lru.New(cap), + } +} + +// GetAddr returns the next IP address for the given host if present in the +// cache. +func (c *dnsCache) GetAddr(host string) (string, bool) { + c.mu.Lock() + defer c.mu.Unlock() + + v, ok := c.addrs.Get(host) + if !ok { + return "", false + } + + record := v.(*dnsCacheEntry) + if timeNow().After(record.expires) { + return "", false + } + + addr := record.addrs[record.index] + record.index = (record.index + 1) % len(record.addrs) + return addr, true +} + +// PutAddrs stores a DNS query result in the cache, overwriting any present +// entry for the host if it exists. 
+func (c *dnsCache) PutAddrs(host string, addrs []string, expires time.Time) { + c.mu.Lock() + defer c.mu.Unlock() + + c.addrs.Put(host, &dnsCacheEntry{addrs, expires, 0}) +} + +type dnsCacheEntry struct { + addrs []string + expires time.Time + index int +} diff --git a/feature/s3/transfermanager/go.mod b/feature/s3/transfermanager/go.mod new file mode 100644 index 00000000000..f5cff7bcdd3 --- /dev/null +++ b/feature/s3/transfermanager/go.mod @@ -0,0 +1,61 @@ +module github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager + +go 1.21 + +require ( + github.com/aws/aws-sdk-go-v2 v1.34.0 + github.com/aws/aws-sdk-go-v2/config v1.29.2 + github.com/aws/aws-sdk-go-v2/service/s3 v1.74.1 + github.com/aws/aws-sdk-go-v2/service/sts v1.33.10 + github.com/aws/smithy-go v1.22.2 +) + +require ( + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.8 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.55 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.25 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.29 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.29 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.2 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.29 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.2 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.5.3 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.10 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.10 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.24.12 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.11 // indirect +) + +replace github.com/aws/aws-sdk-go-v2 => ../../../ + +replace github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream => ../../../aws/protocol/eventstream/ + +replace github.com/aws/aws-sdk-go-v2/config => ../../../config/ + +replace github.com/aws/aws-sdk-go-v2/credentials => ../../../credentials/ + +replace github.com/aws/aws-sdk-go-v2/feature/ec2/imds => ../../../feature/ec2/imds/ + +replace github.com/aws/aws-sdk-go-v2/internal/configsources => ../../../internal/configsources/ + +replace github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 => ../../../internal/endpoints/v2/ + +replace github.com/aws/aws-sdk-go-v2/internal/ini => ../../../internal/ini/ + +replace github.com/aws/aws-sdk-go-v2/internal/v4a => ../../../internal/v4a/ + +replace github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding => ../../../service/internal/accept-encoding/ + +replace github.com/aws/aws-sdk-go-v2/service/internal/checksum => ../../../service/internal/checksum/ + +replace github.com/aws/aws-sdk-go-v2/service/internal/presigned-url => ../../../service/internal/presigned-url/ + +replace github.com/aws/aws-sdk-go-v2/service/internal/s3shared => ../../../service/internal/s3shared/ + +replace github.com/aws/aws-sdk-go-v2/service/s3 => ../../../service/s3/ + +replace github.com/aws/aws-sdk-go-v2/service/sso => ../../../service/sso/ + +replace github.com/aws/aws-sdk-go-v2/service/ssooidc => ../../../service/ssooidc/ + +replace github.com/aws/aws-sdk-go-v2/service/sts => ../../../service/sts/ diff --git a/feature/s3/transfermanager/go.sum b/feature/s3/transfermanager/go.sum new file mode 100644 index 00000000000..cad7e6ad46a --- /dev/null +++ b/feature/s3/transfermanager/go.sum @@ -0,0 +1,2 @@ +github.com/aws/smithy-go v1.22.2 h1:6D9hW43xKFrRx/tXXfAlIZc4JI+yQe6snnWcQyxSyLQ= +github.com/aws/smithy-go v1.22.2/go.mod 
h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= diff --git a/feature/s3/transfermanager/go_module_metadata.go b/feature/s3/transfermanager/go_module_metadata.go new file mode 100644 index 00000000000..33f1ddfbdee --- /dev/null +++ b/feature/s3/transfermanager/go_module_metadata.go @@ -0,0 +1,6 @@ +// Code generated by internal/repotools/cmd/updatemodulemeta DO NOT EDIT. + +package transfermanager + +// goModuleVersion is the tagged release for this module +const goModuleVersion = "tip" diff --git a/feature/s3/transfermanager/internal/testing/endpoint.go b/feature/s3/transfermanager/internal/testing/endpoint.go new file mode 100644 index 00000000000..082aedec4f2 --- /dev/null +++ b/feature/s3/transfermanager/internal/testing/endpoint.go @@ -0,0 +1,25 @@ +package testing + +import ( + "context" + "net/url" + + "github.com/aws/aws-sdk-go-v2/service/s3" + smithyendpoints "github.com/aws/smithy-go/endpoints" +) + +// EndpointResolverV2 is a mock s3 endpoint resolver v2 for testing +type EndpointResolverV2 struct { + URL string +} + +// ResolveEndpoint returns the given endpoint url +func (r EndpointResolverV2) ResolveEndpoint(ctx context.Context, params s3.EndpointParameters) (smithyendpoints.Endpoint, error) { + u, err := url.Parse(r.URL) + if err != nil { + return smithyendpoints.Endpoint{}, err + } + return smithyendpoints.Endpoint{ + URI: *u, + }, nil +} diff --git a/feature/s3/transfermanager/internal/testing/upload.go b/feature/s3/transfermanager/internal/testing/upload.go new file mode 100644 index 00000000000..1764fc089e2 --- /dev/null +++ b/feature/s3/transfermanager/internal/testing/upload.go @@ -0,0 +1,193 @@ +package testing + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "slices" + "sync" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" +) + +// UploadLoggingClient is a mock client that can be used to record and stub responses for testing the Uploader. 
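As context for the `EndpointResolverV2` stub defined in `internal/testing/endpoint.go` above: it is consumed later in this change by `TestUploadRetry` to point a real `s3.Client` at a local `httptest` server. A minimal sketch of that wiring (the helper name `newTestClient` is illustrative, not part of the diff; imports elided):

```go
// Sketch: route a real s3.Client at a local test server using the mock
// resolver above, the same wiring TestUploadRetry uses later in this change.
func newTestClient(handler http.Handler) (*s3.Client, func()) {
	srv := httptest.NewServer(handler)
	client := s3.New(s3.Options{
		EndpointResolverV2: s3testing.EndpointResolverV2{URL: srv.URL},
		UsePathStyle:       true, // keep the bucket in the path so the local server sees plain paths
	})
	return client, srv.Close
}
```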
+type UploadLoggingClient struct { + Invocations []string + Params []interface{} + + ConsumeBody bool + + ignoredOperations []string + + PartNum int + m sync.Mutex + + PutObjectFn func(*UploadLoggingClient, *s3.PutObjectInput) (*s3.PutObjectOutput, error) + UploadPartFn func(*UploadLoggingClient, *s3.UploadPartInput) (*s3.UploadPartOutput, error) + CreateMultipartUploadFn func(*UploadLoggingClient, *s3.CreateMultipartUploadInput) (*s3.CreateMultipartUploadOutput, error) + CompleteMultipartUploadFn func(*UploadLoggingClient, *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error) + AbortMultipartUploadFn func(*UploadLoggingClient, *s3.AbortMultipartUploadInput) (*s3.AbortMultipartUploadOutput, error) +} + +func (u *UploadLoggingClient) simulateHTTPClientOption(optFns ...func(*s3.Options)) error { + + o := s3.Options{ + HTTPClient: httpDoFunc(func(r *http.Request) (*http.Response, error) { + return &http.Response{ + Request: r, + }, nil + }), + } + + for _, fn := range optFns { + fn(&o) + } + + _, err := o.HTTPClient.Do(&http.Request{ + URL: &url.URL{ + Scheme: "https", + Host: "mock.amazonaws.com", + Path: "/key", + RawQuery: "foo=bar", + }, + }) + if err != nil { + return err + } + + return nil +} + +type httpDoFunc func(*http.Request) (*http.Response, error) + +func (f httpDoFunc) Do(r *http.Request) (*http.Response, error) { + return f(r) +} + +func (u *UploadLoggingClient) traceOperation(name string, params interface{}) { + if slices.Contains(u.ignoredOperations, name) { + return + } + u.Invocations = append(u.Invocations, name) + u.Params = append(u.Params, params) + +} + +// PutObject is the S3 PutObject API. +func (u *UploadLoggingClient) PutObject(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error) { + u.m.Lock() + defer u.m.Unlock() + + if u.ConsumeBody { + io.Copy(ioutil.Discard, params.Body) + } + + u.traceOperation("PutObject", params) + + if err := u.simulateHTTPClientOption(optFns...); err != nil { + return nil, err + } + + if u.PutObjectFn != nil { + return u.PutObjectFn(u, params) + } + + return &s3.PutObjectOutput{ + VersionId: aws.String("VERSION-ID"), + }, nil +} + +// UploadPart is the S3 UploadPart API. +func (u *UploadLoggingClient) UploadPart(ctx context.Context, params *s3.UploadPartInput, optFns ...func(*s3.Options)) (*s3.UploadPartOutput, error) { + u.m.Lock() + defer u.m.Unlock() + + if u.ConsumeBody { + io.Copy(ioutil.Discard, params.Body) + } + + u.traceOperation("UploadPart", params) + + if err := u.simulateHTTPClientOption(optFns...); err != nil { + return nil, err + } + + if u.UploadPartFn != nil { + return u.UploadPartFn(u, params) + } + + return &s3.UploadPartOutput{ + ETag: aws.String(fmt.Sprintf("ETAG%d", *params.PartNumber)), + }, nil +} + +// CreateMultipartUpload is the S3 CreateMultipartUpload API. +func (u *UploadLoggingClient) CreateMultipartUpload(ctx context.Context, params *s3.CreateMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) { + u.m.Lock() + defer u.m.Unlock() + + u.traceOperation("CreateMultipartUpload", params) + + if err := u.simulateHTTPClientOption(optFns...); err != nil { + return nil, err + } + + if u.CreateMultipartUploadFn != nil { + return u.CreateMultipartUploadFn(u, params) + } + + return &s3.CreateMultipartUploadOutput{ + UploadId: aws.String("UPLOAD-ID"), + }, nil +} + +// CompleteMultipartUpload is the S3 CompleteMultipartUpload API. 
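A brief usage sketch for the mock above, mirroring how `putobject_test.go` in this change drives it (the test name and sizes are illustrative; imports elided):

```go
// Sketch: force UploadPart to fail once and expect the upload to be aborted.
func TestUploadPartFailureSketch(t *testing.T) {
	c, invocations, _ := s3testing.NewUploadLoggingClient(nil)
	c.UploadPartFn = func(_ *s3testing.UploadLoggingClient, _ *s3.UploadPartInput) (*s3.UploadPartOutput, error) {
		return nil, fmt.Errorf("simulated part failure")
	}

	mgr := New(c, Options{}, func(o *Options) { o.Concurrency = 1 })
	_, err := mgr.PutObject(context.Background(), &PutObjectInput{
		Bucket: "bucket",
		Key:    "key",
		Body:   bytes.NewReader(make([]byte, 20*1024*1024)),
	})
	if err == nil {
		t.Fatal("expected the stubbed UploadPart error")
	}
	// The recorded invocations should end with AbortMultipartUpload,
	// as the failure tests later in this change assert.
	_ = invocations
}
```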
+func (u *UploadLoggingClient) CompleteMultipartUpload(ctx context.Context, params *s3.CompleteMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error) { + u.m.Lock() + defer u.m.Unlock() + + u.traceOperation("CompleteMultipartUpload", params) + + if err := u.simulateHTTPClientOption(optFns...); err != nil { + return nil, err + } + + if u.CompleteMultipartUploadFn != nil { + return u.CompleteMultipartUploadFn(u, params) + } + + return &s3.CompleteMultipartUploadOutput{ + Location: aws.String("http://location"), + VersionId: aws.String("VERSION-ID"), + }, nil +} + +// AbortMultipartUpload is the S3 AbortMultipartUpload API. +func (u *UploadLoggingClient) AbortMultipartUpload(ctx context.Context, params *s3.AbortMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) { + u.m.Lock() + defer u.m.Unlock() + + u.traceOperation("AbortMultipartUpload", params) + if err := u.simulateHTTPClientOption(optFns...); err != nil { + return nil, err + } + + if u.AbortMultipartUploadFn != nil { + return u.AbortMultipartUploadFn(u, params) + } + + return &s3.AbortMultipartUploadOutput{}, nil +} + +// NewUploadLoggingClient returns a new UploadLoggingClient. +func NewUploadLoggingClient(ignoredOps []string) (*UploadLoggingClient, *[]string, *[]interface{}) { + c := &UploadLoggingClient{ + ignoredOperations: ignoredOps, + } + + return c, &c.Invocations, &c.Params +} diff --git a/feature/s3/transfermanager/options.go b/feature/s3/transfermanager/options.go new file mode 100644 index 00000000000..a49e74afd64 --- /dev/null +++ b/feature/s3/transfermanager/options.go @@ -0,0 +1,63 @@ +package transfermanager + +import "github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager/types" + +// Options provides params needed for transfer api calls +type Options struct { + // The client to use when uploading to S3. + S3 S3APIClient + + // The buffer size (in bytes) to use when buffering data into chunks and + // sending them as parts to S3. The minimum allowed part size is 5MB, and + // if this value is set to zero, the DefaultUploadPartSize value will be used. + PartSizeBytes int64 + + // The threshold bytes to decide when the file should be multi-uploaded + MultipartUploadThreshold int64 + + // Option to disable checksum validation for download + DisableChecksum bool + + // Checksum algorithm to use for upload + ChecksumAlgorithm types.ChecksumAlgorithm + + // The number of goroutines to spin up in parallel per call to Upload when + // sending parts. If this is set to zero, the DefaultUploadConcurrency value + // will be used. + // + // The concurrency pool is not shared between calls to Upload. 
+ Concurrency int +} + +func (o *Options) init() { +} + +func resolveConcurrency(o *Options) { + if o.Concurrency == 0 { + o.Concurrency = defaultTransferConcurrency + } +} + +func resolvePartSizeBytes(o *Options) { + if o.PartSizeBytes == 0 { + o.PartSizeBytes = minPartSizeBytes + } +} + +func resolveChecksumAlgorithm(o *Options) { + if o.ChecksumAlgorithm == "" { + o.ChecksumAlgorithm = types.ChecksumAlgorithmCrc32 + } +} + +func resolveMultipartUploadThreshold(o *Options) { + if o.MultipartUploadThreshold == 0 { + o.MultipartUploadThreshold = defaultMultipartUploadThreshold + } +} + +// Copy returns new copy of the Options +func (o Options) Copy() Options { + to := o + return to +} diff --git a/feature/s3/transfermanager/pool.go b/feature/s3/transfermanager/pool.go new file mode 100644 index 00000000000..a4a786e2357 --- /dev/null +++ b/feature/s3/transfermanager/pool.go @@ -0,0 +1,62 @@ +package transfermanager + +import ( + "context" + "fmt" +) + +type bytesBufferPool interface { + Get(context.Context) ([]byte, error) + Put([]byte) + Close() +} + +type defaultSlicePool struct { + slices chan []byte +} + +func newDefaultSlicePool(sliceSize int64, capacity int) *defaultSlicePool { + p := &defaultSlicePool{} + + slices := make(chan []byte, capacity) + for i := 0; i < capacity; i++ { + slices <- make([]byte, sliceSize) + } + + p.slices = slices + return p +} + +var errZeroCapacity = fmt.Errorf("get called on zero capacity pool") + +func (p *defaultSlicePool) Get(ctx context.Context) ([]byte, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + for { + select { + case bs, ok := <-p.slices: + if !ok { + return nil, errZeroCapacity + } + return bs, nil + case <-ctx.Done(): + return nil, ctx.Err() + } + } +} + +func (p *defaultSlicePool) Put(bs []byte) { + p.slices <- bs +} + +func (p *defaultSlicePool) Close() { + close(p.slices) + for range p.slices { + // drain channel + } + p.slices = nil +} diff --git a/feature/s3/transfermanager/pool_test.go b/feature/s3/transfermanager/pool_test.go new file mode 100644 index 00000000000..74a6d9b49b0 --- /dev/null +++ b/feature/s3/transfermanager/pool_test.go @@ -0,0 +1,47 @@ +package transfermanager + +import ( + "context" + "sync" + "testing" +) + +func TestDefaultSlicePool(t *testing.T) { + pool := newDefaultSlicePool(1, 2) + + var bs []byte + var err error + var wg sync.WaitGroup + + for i := 0; i < 200; i++ { + wg.Add(1) + go func() { + defer wg.Done() + pool.Put(make([]byte, 1)) + }() + } + // wait for a slice to be put back + for i := 0; i < 200; i++ { + bs, err = pool.Get(context.Background()) + if err != nil { + t.Errorf("failed to get slice from pool: %v", err) + } + } + + wg.Wait() + + // failed to get a slice due to ctx cancelled + ctx, cancel := context.WithCancel(context.Background()) + cancel() + bs, err = pool.Get(ctx) + if err == nil { + pool.Put(bs) + t.Errorf("expectd no slice to be returned") + } + + if e, a := 2, len(pool.slices); e != a { + t.Errorf("expect pool size to be %v, got %v", e, a) + } + + pool.Close() +} diff --git a/feature/s3/transfermanager/putobject_test.go b/feature/s3/transfermanager/putobject_test.go new file mode 100644 index 00000000000..06cd0ebe149 --- /dev/null +++ b/feature/s3/transfermanager/putobject_test.go @@ -0,0 +1,904 @@ +package transfermanager + +import ( + "bytes" + "context" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/http/httptest" + "os" + "reflect" + "regexp" + "sort" + "strconv" + "strings" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + 
"github.com/aws/aws-sdk-go-v2/aws/retry" + s3testing "github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager/internal/testing" + "github.com/aws/aws-sdk-go-v2/internal/awstesting" + "github.com/aws/aws-sdk-go-v2/internal/sdk" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" +) + +// getReaderLength discards the bytes from reader and returns the length +func getReaderLength(r io.Reader) int64 { + n, _ := io.Copy(ioutil.Discard, r) + return n +} + +func TestUploadOrderMulti(t *testing.T) { + c, invocations, args := s3testing.NewUploadLoggingClient(nil) + mgr := New(c, Options{}) + + resp, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key - value", + Body: bytes.NewReader(buf20MB), + ServerSideEncryption: "aws:kms", + SSEKMSKeyID: "KmsId", + ContentType: "content/type", + }) + + if err != nil { + t.Errorf("Expected no error but received %v", err) + } + + if diff := cmpDiff([]string{"CreateMultipartUpload", "UploadPart", "UploadPart", "UploadPart", "CompleteMultipartUpload"}, *invocations); len(diff) > 0 { + t.Errorf(diff) + } + + if "UPLOAD-ID" != resp.UploadID { + t.Errorf("expect %q, got %q", "UPLOAD-ID", resp.UploadID) + } + + if "VERSION-ID" != resp.VersionID { + t.Errorf("expect %q, got %q", "VERSION-ID", resp.VersionID) + } + + // Validate input values + + // UploadPart + for i := 1; i < 4; i++ { + v := aws.ToString((*args)[i].(*s3.UploadPartInput).UploadId) + if "UPLOAD-ID" != v { + t.Errorf("Expected %q, but received %q", "UPLOAD-ID", v) + } + } + + // CompleteMultipartUpload + v := aws.ToString((*args)[4].(*s3.CompleteMultipartUploadInput).UploadId) + if "UPLOAD-ID" != v { + t.Errorf("Expected %q, but received %q", "UPLOAD-ID", v) + } + + parts := (*args)[4].(*s3.CompleteMultipartUploadInput).MultipartUpload.Parts + + for i := 0; i < 3; i++ { + num := parts[i].PartNumber + etag := aws.ToString(parts[i].ETag) + + if int32(i+1) != aws.ToInt32(num) { + t.Errorf("expect %d, got %d", i+1, num) + } + + if matched, err := regexp.MatchString(`^ETAG\d+$`, etag); !matched || err != nil { + t.Errorf("Failed regexp expression `^ETAG\\d+$`") + } + } + + // Custom headers + cmu := (*args)[0].(*s3.CreateMultipartUploadInput) + + if e, a := types.ServerSideEncryption("aws:kms"), cmu.ServerSideEncryption; e != a { + t.Errorf("expect %q, got %q", e, a) + } + + if e, a := "KmsId", aws.ToString(cmu.SSEKMSKeyId); e != a { + t.Errorf("expect %q, got %q", e, a) + } + + if e, a := "content/type", aws.ToString(cmu.ContentType); e != a { + t.Errorf("expect %q, got %q", e, a) + } +} + +func TestUploadOrderMultiDifferentPartSize(t *testing.T) { + c, ops, args := s3testing.NewUploadLoggingClient(nil) + mgr := New(c, Options{ + PartSizeBytes: 1024 * 1024 * 11, + Concurrency: 1, + }) + + _, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: bytes.NewReader(buf20MB), + }) + + if err != nil { + t.Errorf("expect no error, got %v", err) + } + + vals := []string{"CreateMultipartUpload", "UploadPart", "UploadPart", "CompleteMultipartUpload"} + if !reflect.DeepEqual(vals, *ops) { + t.Errorf("expect %v, got %v", vals, *ops) + } + + // Part lengths + if len := getReaderLength((*args)[1].(*s3.UploadPartInput).Body); 1024*1024*11 != len { + t.Errorf("expect %d, got %d", 1024*1024*7, len) + } + if len := getReaderLength((*args)[2].(*s3.UploadPartInput).Body); 1024*1024*9 != len { + t.Errorf("expect %d, got %d", 1024*1024*5, len) + } +} + +func TestUploadFailIfPartSizeTooSmall(t *testing.T) 
{ + mgr := New(s3.New(s3.Options{}), Options{}, + func(o *Options) { + o.PartSizeBytes = 5 + }) + resp, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: bytes.NewReader(buf20MB), + }) + + if resp != nil { + t.Errorf("Expected response to be nil, but received %v", resp) + } + + if err == nil { + t.Errorf("Expected error, but received nil") + } + if e, a := "part size must be at least", err.Error(); !strings.Contains(a, e) { + t.Errorf("expect %v to be in %v", e, a) + } +} + +func TestUploadOrderSingle(t *testing.T) { + c, invocations, params := s3testing.NewUploadLoggingClient(nil) + mgr := New(c, Options{}) + resp, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key - value", + Body: bytes.NewReader(buf2MB), + ServerSideEncryption: "aws:kms", + SSEKMSKeyID: "KmsId", + ContentType: "content/type", + }) + + if err != nil { + t.Errorf("expect no error but received %v", err) + } + + if diff := cmpDiff([]string{"PutObject"}, *invocations); len(diff) > 0 { + t.Error(diff) + } + + if e := "VERSION-ID"; e != resp.VersionID { + t.Errorf("expect %q, got %q", e, resp.VersionID) + } + + if len(resp.UploadID) > 0 { + t.Errorf("expect empty string, got %q", resp.UploadID) + } + + putObjectInput := (*params)[0].(*s3.PutObjectInput) + + if e, a := types.ServerSideEncryption("aws:kms"), putObjectInput.ServerSideEncryption; e != a { + t.Errorf("expect %q, got %q", e, a) + } + + if e, a := "KmsId", aws.ToString(putObjectInput.SSEKMSKeyId); e != a { + t.Errorf("expect %q, got %q", e, a) + } + + if e, a := "content/type", aws.ToString(putObjectInput.ContentType); e != a { + t.Errorf("Expected %q, but received %q", e, a) + } +} + +func TestUploadSingleFailure(t *testing.T) { + c, invocations, _ := s3testing.NewUploadLoggingClient(nil) + + c.PutObjectFn = func(*s3testing.UploadLoggingClient, *s3.PutObjectInput) (*s3.PutObjectOutput, error) { + return nil, fmt.Errorf("put object failure") + } + + mgr := New(c, Options{}) + resp, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: bytes.NewReader(buf2MB), + }) + + if err == nil { + t.Error("expect error, got nil") + } + + if diff := cmpDiff([]string{"PutObject"}, *invocations); len(diff) > 0 { + t.Error(diff) + } + + if resp != nil { + t.Errorf("expect response to be nil, got %v", resp) + } +} + +func TestUploadOrderZero(t *testing.T) { + c, invocations, params := s3testing.NewUploadLoggingClient(nil) + mgr := New(c, Options{}) + resp, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: bytes.NewReader(make([]byte, 0)), + }) + + if err != nil { + t.Errorf("expect no error, got %v", err) + } + + if diff := cmpDiff([]string{"PutObject"}, *invocations); len(diff) > 0 { + t.Error(diff) + } + + if len(resp.UploadID) > 0 { + t.Errorf("expect empty string, got %q", resp.UploadID) + } + + if e, a := int64(0), getReaderLength((*params)[0].(*s3.PutObjectInput).Body); e != a { + t.Errorf("Expected %d, but received %d", e, a) + } +} + +func TestUploadOrderMultiFailure(t *testing.T) { + c, invocations, _ := s3testing.NewUploadLoggingClient(nil) + + c.UploadPartFn = func(u *s3testing.UploadLoggingClient, params *s3.UploadPartInput) (*s3.UploadPartOutput, error) { + if *params.PartNumber == 2 { + return nil, fmt.Errorf("an unexpected error") + } + return &s3.UploadPartOutput{ETag: aws.String(fmt.Sprintf("ETAG%d", u.PartNum))}, nil + } + + mgr := New(c, Options{}, func(o *Options) { + 
o.Concurrency = 1 + }) + _, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: bytes.NewReader(buf20MB), + }) + + if err == nil { + t.Error("expect error, got nil") + } + + if diff := cmpDiff([]string{"CreateMultipartUpload", "UploadPart", "UploadPart", "AbortMultipartUpload"}, *invocations); len(diff) > 0 { + t.Error(diff) + } +} + +func TestUploadOrderMultiFailureOnComplete(t *testing.T) { + c, invocations, _ := s3testing.NewUploadLoggingClient(nil) + + c.CompleteMultipartUploadFn = func(*s3testing.UploadLoggingClient, *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error) { + return nil, fmt.Errorf("complete multipart error") + } + + mgr := New(c, Options{}, func(o *Options) { + o.Concurrency = 1 + }) + _, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: bytes.NewReader(buf20MB), + }) + + if err == nil { + t.Error("expect error, got nil") + } + + if diff := cmpDiff([]string{"CreateMultipartUpload", "UploadPart", "UploadPart", "UploadPart", + "CompleteMultipartUpload", "AbortMultipartUpload"}, *invocations); len(diff) > 0 { + t.Error(diff) + } +} + +func TestUploadOrderMultiFailureOnCreate(t *testing.T) { + c, invocations, _ := s3testing.NewUploadLoggingClient(nil) + + c.CreateMultipartUploadFn = func(*s3testing.UploadLoggingClient, *s3.CreateMultipartUploadInput) (*s3.CreateMultipartUploadOutput, error) { + return nil, fmt.Errorf("create multipart upload failure") + } + + mgr := New(c, Options{}) + _, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: bytes.NewReader(make([]byte, 1024*1024*12)), + }) + + if err == nil { + t.Error("expect error, got nil") + } + + if diff := cmpDiff([]string{"CreateMultipartUpload"}, *invocations); len(diff) > 0 { + t.Error(diff) + } +} + +type failreader struct { + times int + failCount int +} + +func (f *failreader) Read(b []byte) (int, error) { + f.failCount++ + if f.failCount >= f.times { + return 0, fmt.Errorf("random failure") + } + return len(b), nil +} + +func TestUploadOrderReadFail1(t *testing.T) { + c, invocations, _ := s3testing.NewUploadLoggingClient(nil) + mgr := New(c, Options{}) + _, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: &failreader{times: 1}, + }) + if err == nil { + t.Fatalf("expect error to not be nil") + } + + if e, a := "random failure", err.Error(); !strings.Contains(a, e) { + t.Errorf("expect %v, got %v", e, a) + } + + if diff := cmpDiff([]string(nil), *invocations); len(diff) > 0 { + t.Error(diff) + } +} + +func TestUploadOrderReadFail2(t *testing.T) { + c, invocations, _ := s3testing.NewUploadLoggingClient([]string{"UploadPart"}) + mgr := New(c, Options{}, func(o *Options) { + o.Concurrency = 1 + }) + _, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: &failreader{times: 2}, + }) + if err == nil { + t.Fatalf("expect error to not be nil") + } + + if e, a := "random failure", err.Error(); !strings.Contains(a, e) { + t.Errorf("expect %v, got %q", e, a) + } + + if diff := cmpDiff([]string{"CreateMultipartUpload", "AbortMultipartUpload"}, *invocations); len(diff) > 0 { + t.Error(diff) + } +} + +type sizedReader struct { + size int + cur int + err error +} + +func (s *sizedReader) Read(p []byte) (n int, err error) { + if s.cur >= s.size { + if s.err == nil { + s.err = io.EOF + } + return 0, s.err + } + + n = len(p) + s.cur += len(p) + if s.cur > 
s.size { + n -= s.cur - s.size + } + + return n, err +} + +func TestUploadOrderMultiBufferedReader(t *testing.T) { + c, invocations, params := s3testing.NewUploadLoggingClient(nil) + mgr := New(c, Options{}) + _, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: &sizedReader{size: 1024 * 1024 * 21}, + }) + if err != nil { + t.Errorf("expect no error, got %v", err) + } + + if diff := cmpDiff([]string{"CreateMultipartUpload", "UploadPart", "UploadPart", + "UploadPart", "CompleteMultipartUpload"}, *invocations); len(diff) > 0 { + t.Error(diff) + } + + // Part lengths + var parts []int64 + for i := 1; i <= 3; i++ { + parts = append(parts, getReaderLength((*params)[i].(*s3.UploadPartInput).Body)) + } + sort.Slice(parts, func(i, j int) bool { + return parts[i] < parts[j] + }) + + if diff := cmpDiff([]int64{1024 * 1024 * 5, 1024 * 1024 * 8, 1024 * 1024 * 8}, parts); len(diff) > 0 { + t.Error(diff) + } +} + +func TestUploadOrderMultiBufferedReaderPartial(t *testing.T) { + c, invocations, params := s3testing.NewUploadLoggingClient(nil) + mgr := New(c, Options{}) + _, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: &sizedReader{size: 1024 * 1024 * 21, err: io.EOF}, + }) + if err != nil { + t.Errorf("expect no error, got %v", err) + } + + if diff := cmpDiff([]string{"CreateMultipartUpload", "UploadPart", "UploadPart", + "UploadPart", "CompleteMultipartUpload"}, *invocations); len(diff) > 0 { + t.Error(diff) + } + + // Part lengths + var parts []int64 + for i := 1; i <= 3; i++ { + parts = append(parts, getReaderLength((*params)[i].(*s3.UploadPartInput).Body)) + } + sort.Slice(parts, func(i, j int) bool { + return parts[i] < parts[j] + }) + + if diff := cmpDiff([]int64{1024 * 1024 * 5, 1024 * 1024 * 8, 1024 * 1024 * 8}, parts); len(diff) > 0 { + t.Error(diff) + } +} + +// TestUploadOrderMultiBufferedReaderEOF tests the edge case where the +// file size is the same as part size. 
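For readers checking the expected part lengths in the buffered-reader tests above and the EOF test below: they follow from the 8 MiB default part size these expectations imply. A quick sanity check (the constant name here is illustrative, not the module's real identifier):

```go
// Sanity check of the expected splits, assuming an 8 MiB default part size.
const partSizeBytes = 8 << 20
body := int64(21 << 20)      // 21 MiB buffered reader
full := body / partSizeBytes // 2 full 8 MiB parts
rem := body % partSizeBytes  // 5 MiB final part
fmt.Println(full, rem>>20)   // 2 5 -> parts {8 MiB, 8 MiB, 5 MiB}; a 16 MiB body yields {8 MiB, 8 MiB}
```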
+func TestUploadOrderMultiBufferedReaderEOF(t *testing.T) { + c, invocations, params := s3testing.NewUploadLoggingClient(nil) + mgr := New(c, Options{}) + _, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: &sizedReader{size: 1024 * 1024 * 16, err: io.EOF}, + }) + + if err != nil { + t.Errorf("expect no error, got %v", err) + } + + if diff := cmpDiff([]string{"CreateMultipartUpload", "UploadPart", "UploadPart", "CompleteMultipartUpload"}, *invocations); len(diff) > 0 { + t.Error(diff) + } + + // Part lengths + var parts []int64 + for i := 1; i <= 2; i++ { + parts = append(parts, getReaderLength((*params)[i].(*s3.UploadPartInput).Body)) + } + sort.Slice(parts, func(i, j int) bool { + return parts[i] < parts[j] + }) + + if diff := cmpDiff([]int64{1024 * 1024 * 8, 1024 * 1024 * 8}, parts); len(diff) > 0 { + t.Error(diff) + } +} + +func TestUploadOrderSingleBufferedReader(t *testing.T) { + c, invocations, _ := s3testing.NewUploadLoggingClient(nil) + mgr := New(c, Options{}) + resp, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: &sizedReader{size: 1024 * 1024 * 2}, + }) + + if err != nil { + t.Errorf("expect no error, got %v", err) + } + + if diff := cmpDiff([]string{"PutObject"}, *invocations); len(diff) > 0 { + t.Error(diff) + } + + if len(resp.UploadID) > 0 { + t.Errorf("expect no value, got %q", resp.UploadID) + } +} + +func TestUploadZeroLenObject(t *testing.T) { + c, invocations, _ := s3testing.NewUploadLoggingClient(nil) + + mgr := New(c, Options{}) + resp, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: strings.NewReader(""), + }) + + if err != nil { + t.Errorf("expect no error but received %v", err) + } + if diff := cmpDiff([]string{"PutObject"}, *invocations); len(diff) > 0 { + t.Errorf("expect request to have been made, but was not, %v", diff) + } + + if len(resp.UploadID) > 0 { + t.Errorf("expect empty string, but received %q", resp.UploadID) + } +} + +type testIncompleteReader struct { + Size int64 + read int64 +} + +func (r *testIncompleteReader) Read(p []byte) (n int, err error) { + r.read += int64(len(p)) + if r.read >= r.Size { + return int(r.read - r.Size), io.ErrUnexpectedEOF + } + return len(p), nil +} + +func TestUploadUnexpectedEOF(t *testing.T) { + c, invocations, _ := s3testing.NewUploadLoggingClient(nil) + mgr := New(c, Options{}, func(o *Options) { + o.Concurrency = 1 + o.PartSizeBytes = minPartSizeBytes + }) + _, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: &testIncompleteReader{ + Size: minPartSizeBytes + 1, + }, + }) + if err == nil { + t.Error("expect error, got nil") + } + + // Ensure upload started. + if e, a := "CreateMultipartUpload", (*invocations)[0]; e != a { + t.Errorf("expect %q, got %q", e, a) + } + + // Part may or may not be sent because of timing of sending parts and + // reading next part in upload manager. Just check for the last abort. 
+ if e, a := "AbortMultipartUpload", (*invocations)[len(*invocations)-1]; e != a { + t.Errorf("expect %q, got %q", e, a) + } +} + +func TestSSE(t *testing.T) { + c, _, _ := s3testing.NewUploadLoggingClient(nil) + c.UploadPartFn = func(u *s3testing.UploadLoggingClient, params *s3.UploadPartInput) (*s3.UploadPartOutput, error) { + if params.SSECustomerAlgorithm == nil { + t.Fatal("SSECustomerAlgoritm should not be nil") + } + if params.SSECustomerKey == nil { + t.Fatal("SSECustomerKey should not be nil") + } + return &s3.UploadPartOutput{ETag: aws.String(fmt.Sprintf("ETAG%d", u.PartNum))}, nil + } + + mgr := New(c, Options{}, func(o *Options) { + o.Concurrency = 5 + }) + + _, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + SSECustomerAlgorithm: "AES256", + SSECustomerKey: "foo", + Body: bytes.NewBuffer(make([]byte, 1024*1024*10)), + }) + + if err != nil { + t.Fatal("Expected no error, but received" + err.Error()) + } +} + +func TestUploadWithContextCanceled(t *testing.T) { + c := s3.New(s3.Options{ + UsePathStyle: true, + Region: "mock-region", + }) + u := New(c, Options{}) + + ctx := &awstesting.FakeContext{DoneCh: make(chan struct{})} + ctx.Error = fmt.Errorf("context canceled") + close(ctx.DoneCh) + + _, err := u.PutObject(ctx, &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: bytes.NewReader(make([]byte, 0)), + }) + if err == nil { + t.Fatalf("expect error, got nil") + } + + if e, a := "canceled", err.Error(); !strings.Contains(a, e) { + t.Errorf("expected error message to contain %q, but did not %q", e, a) + } +} + +func TestUploadRetry(t *testing.T) { + const part, retries = 3, 10 + testFile, testFileCleanup, err := createTempFile(t, minPartSizeBytes*part) + if err != nil { + t.Fatalf("failed to create test file, %v", err) + } + defer testFileCleanup(t) + + cases := map[string]struct { + Body io.Reader + PartHandlers func(testing.TB) []http.Handler + }{ + "bytes.Buffer": { + Body: bytes.NewBuffer(make([]byte, minPartSizeBytes*part)), + PartHandlers: func(tb testing.TB) []http.Handler { + return buildFailHandlers(tb, part, retries) + }, + }, + "bytes.Reader": { + Body: bytes.NewReader(make([]byte, minPartSizeBytes*part)), + PartHandlers: func(tb testing.TB) []http.Handler { + return buildFailHandlers(tb, part, retries) + }, + }, + "os.File": { + Body: testFile, + PartHandlers: func(tb testing.TB) []http.Handler { + return buildFailHandlers(tb, part, retries) + }, + }, + } + + for name, c := range cases { + t.Run(name, func(t *testing.T) { + restoreSleep := sdk.TestingUseNopSleep() + defer restoreSleep() + + mux := newMockS3UploadServer(t, c.PartHandlers(t)) + server := httptest.NewServer(mux) + defer server.Close() + + client := s3.New(s3.Options{ + EndpointResolverV2: s3testing.EndpointResolverV2{URL: server.URL}, + UsePathStyle: true, + Retryer: retry.NewStandard(func(o *retry.StandardOptions) { + o.MaxAttempts = retries + 1 + }), + }) + + uploader := New(client, Options{}) + _, err := uploader.PutObject(context.Background(), &PutObjectInput{ + Bucket: "bucket", + Key: "key", + Body: c.Body, + }) + + if err != nil { + t.Fatalf("expect no error, got %v", err) + } + }) + } +} + +func newMockS3UploadServer(tb testing.TB, partHandler []http.Handler) *mockS3UploadServer { + s := &mockS3UploadServer{ + ServeMux: http.NewServeMux(), + partHandlers: partHandler, + tb: tb, + } + + s.HandleFunc("/", s.handleRequest) + + return s +} + +func buildFailHandlers(tb testing.TB, part, retry int) []http.Handler { + handlers := 
make([]http.Handler, part) + + for i := 0; i < part; i++ { + handlers[i] = &failPartHandler{ + tb: tb, + failLeft: retry, + successPartHandler: &successPartHandler{tb: tb}, + } + } + + return handlers +} + +type mockS3UploadServer struct { + *http.ServeMux + + tb testing.TB + partHandlers []http.Handler +} + +func (s mockS3UploadServer) handleRequest(w http.ResponseWriter, r *http.Request) { + defer func() { + if err := r.Body.Close(); err != nil { + failRequest(w, 0, "BodyCloseError", fmt.Sprintf("request body close error: %v", err)) + } + }() + + _, hasUploads := r.URL.Query()["uploads"] + + switch { + case r.Method == "POST" && hasUploads: + // CreateMultipartUpload request + w.Header().Set("Content-Length", strconv.Itoa(len(createUploadResp))) + w.Write([]byte(createUploadResp)) + case r.Method == "PUT": + partStr := r.URL.Query().Get("partNumber") + part, err := strconv.ParseInt(partStr, 10, 64) + if err != nil { + failRequest(w, 400, "BadRequest", fmt.Sprintf("unable to parse partNumber, %q, %v", partStr, err)) + return + } + if part <= 0 || part > int64(len(s.partHandlers)) { + failRequest(w, 400, "BadRequest", fmt.Sprintf("invalid partNumber %v", part)) + return + } + s.partHandlers[part-1].ServeHTTP(w, r) + case r.Method == "POST": + // CompleteMultipartUpload request + w.Header().Set("Content-Length", strconv.Itoa(len(completeUploadResp))) + w.Write([]byte(completeUploadResp)) + case r.Method == "DELETE": + w.Header().Set("Content-Length", strconv.Itoa(len(abortUploadResp))) + w.Write([]byte(abortUploadResp)) + w.WriteHeader(200) + default: + failRequest(w, 400, "BadRequest", fmt.Sprintf("invalid request %v %v", r.Method, r.URL)) + } +} + +func createTempFile(t *testing.T, size int64) (*os.File, func(*testing.T), error) { + file, err := ioutil.TempFile(os.TempDir(), aws.SDKName+t.Name()) + if err != nil { + return nil, nil, err + } + filename := file.Name() + if err := file.Truncate(size); err != nil { + return nil, nil, err + } + + return file, + func(t *testing.T) { + if err := file.Close(); err != nil { + t.Errorf("failed to close temp file, %s, %v", filename, err) + } + if err := os.Remove(filename); err != nil { + t.Errorf("failed to remove temp file, %s, %v", filename, err) + } + }, + nil +} + +type failPartHandler struct { + tb testing.TB + failLeft int + successPartHandler http.Handler +} + +func (h *failPartHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + defer func() { + if err := r.Body.Close(); err != nil { + failRequest(w, 0, "BodyCloseError", fmt.Sprintf("request body close error: %v", err)) + } + }() + + if h.failLeft == 0 && h.successPartHandler != nil { + h.successPartHandler.ServeHTTP(w, r) + return + } + + io.Copy(ioutil.Discard, r.Body) + failRequest(w, 500, "InternalException", fmt.Sprintf("mock error, partNumber %v", r.URL.Query().Get("partNumber"))) + h.failLeft-- +} + +func failRequest(w http.ResponseWriter, status int, code, msg string) { + msg = fmt.Sprintf(baseRequestErrorResp, code, msg) + w.Header().Set("Content-Length", strconv.Itoa(len(msg))) + w.WriteHeader(status) + w.Write([]byte(msg)) +} + +type successPartHandler struct { + tb testing.TB +} + +func (h *successPartHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + defer func() { + if err := r.Body.Close(); err != nil { + failRequest(w, 0, "BodyCloseError", fmt.Sprintf("request body close error: %v", err)) + } + }() + + n, err := io.Copy(ioutil.Discard, r.Body) + if err != nil { + failRequest(w, 400, "BadRequest", fmt.Sprintf("failed to read body, %v", err)) + return + } 
+ contentLength := r.Header.Get("Content-Length") + expectLength, err := strconv.ParseInt(contentLength, 10, 64) + if err != nil { + h.tb.Logf("expect content-length, got %q, %v", contentLength, err) + failRequest(w, 400, "BadRequest", fmt.Sprintf("unable to get content-length %v", err)) + return + } + + if e, a := expectLength, n; e != a { + h.tb.Logf("expect content-length to be %v, got %v", e, a) + failRequest(w, 400, "BadRequest", fmt.Sprintf("content-length and body do not match, %v, %v", e, a)) + return + } + + w.Header().Set("Content-Length", strconv.Itoa(len(uploadPartResp))) + w.Write([]byte(uploadPartResp)) +} + +const createUploadResp = ` + bucket + key + abc123 +` + +const uploadPartResp = ` + key +` +const baseRequestErrorResp = ` + %s + %s + request-id + host-id +` + +const completeUploadResp = ` + bucket + key + key + https://bucket.us-west-2.amazonaws.com/key + abc123 +` + +const abortUploadResp = `` + +func cmpDiff(e, a interface{}) string { + if !reflect.DeepEqual(e, a) { + return fmt.Sprintf("%v != %v", e, a) + } + return "" +} diff --git a/feature/s3/transfermanager/rrdns.go b/feature/s3/transfermanager/rrdns.go new file mode 100644 index 00000000000..7fac93a17df --- /dev/null +++ b/feature/s3/transfermanager/rrdns.go @@ -0,0 +1,160 @@ +package transfermanager + +import ( + "context" + "fmt" + "net" + "net/http" + "sync" + "time" + + "github.com/aws/aws-sdk-go-v2/internal/sync/singleflight" +) + +var timeNow = time.Now + +// WithRoundRobinDNS configures an http.Transport to spread HTTP connections +// across multiple IP addresses for a given host. +// +// This is recommended by the [S3 performance guide] in high-concurrency +// application environments. +// +// WithRoundRobinDNS wraps the underlying DialContext hook on http.Transport. +// Future modifications to this hook MUST preserve said wrapping in order for +// round-robin DNS to operate. +// +// [S3 performance guide]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance-design-patterns.html +func WithRoundRobinDNS(opts ...func(*RoundRobinDNSOptions)) func(*http.Transport) { + options := &RoundRobinDNSOptions{ + TTL: 30 * time.Second, + MaxHosts: 100, + } + for _, opt := range opts { + opt(options) + } + + return func(t *http.Transport) { + rr := &rrDNS{ + cache: newDNSCache(options.MaxHosts), + expiry: options.TTL, + resolver: &net.Resolver{}, + dialContext: t.DialContext, + } + t.DialContext = rr.DialContext + } +} + +// RoundRobinDNSOptions configures use of round-robin DNS. +type RoundRobinDNSOptions struct { + // The length of time for which the results of a DNS query are valid. + TTL time.Duration + + // A limit to the number of DNS query results, cached by hostname, which are + // stored. Round-robin DNS uses an LRU cache. + MaxHosts int +} + +type resolver interface { + LookupHost(context.Context, string) ([]string, error) +} + +type rrDNS struct { + sf singleflight.Group + cache *dnsCache + + expiry time.Duration + resolver resolver + + dialContext func(ctx context.Context, network, addr string) (net.Conn, error) +} + +// DialContext implements the DialContext hook used by http.Transport, +// pre-caching IP addresses for a given host and distributing them evenly +// across new connections. 
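A usage sketch for `WithRoundRobinDNS` above; the transport and client wiring shown is illustrative (only the option names and their 30s/100 defaults come from this change):

```go
// Sketch: apply round-robin DNS to the transport an S3 client will use.
func newRoundRobinS3Client() *s3.Client {
	tr := &http.Transport{}
	WithRoundRobinDNS(func(o *RoundRobinDNSOptions) {
		o.TTL = 10 * time.Second // shorter than the 30s default
		o.MaxHosts = 10
	})(tr)

	return s3.New(s3.Options{
		HTTPClient: &http.Client{Transport: tr},
	})
}
```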
+func (r *rrDNS) DialContext(ctx context.Context, network, addr string) (net.Conn, error) { + host, port, err := net.SplitHostPort(addr) + if err != nil { + return nil, fmt.Errorf("rrdns split host/port: %w", err) + } + + ipaddr, err := r.getAddr(ctx, host) + if err != nil { + return nil, fmt.Errorf("rrdns lookup host: %w", err) + } + + return r.dialContext(ctx, network, net.JoinHostPort(ipaddr, port)) +} + +func (r *rrDNS) getAddr(ctx context.Context, host string) (string, error) { + addr, ok := r.cache.GetAddr(host) + if ok { + return addr, nil + } + return r.lookupHost(ctx, host) +} + +func (r *rrDNS) lookupHost(ctx context.Context, host string) (string, error) { + ch := r.sf.DoChan(host, func() (interface{}, error) { + addrs, err := r.resolver.LookupHost(ctx, host) + if err != nil { + return nil, err + } + + expires := timeNow().Add(r.expiry) + r.cache.PutAddrs(host, addrs, expires) + return nil, nil + }) + + select { + case result := <-ch: + if result.Err != nil { + return "", result.Err + } + + addr, _ := r.cache.GetAddr(host) + return addr, nil + case <-ctx.Done(): + return "", ctx.Err() + } +} + +// WithRotoDialer configures an http.Transport to cycle through multiple local +// network addresses when creating new HTTP connections. +// +// WithRotoDialer REPLACES the root DialContext hook on the underlying +// Transport, thereby destroying any previously-applied wrappings around it. If +// the caller needs to apply additional decorations to the DialContext hook, +// they must do so after applying WithRotoDialer. +func WithRotoDialer(addrs []net.Addr) func(*http.Transport) { + return func(t *http.Transport) { + var dialers []*net.Dialer + for _, addr := range addrs { + dialers = append(dialers, &net.Dialer{ + LocalAddr: addr, + }) + } + + t.DialContext = (&rotoDialer{ + dialers: dialers, + }).DialContext + } +} + +type rotoDialer struct { + mu sync.Mutex + dialers []*net.Dialer + index int +} + +func (r *rotoDialer) DialContext(ctx context.Context, network, addr string) (net.Conn, error) { + return r.next().DialContext(ctx, network, addr) +} + +func (r *rotoDialer) next() *net.Dialer { + r.mu.Lock() + defer r.mu.Unlock() + + d := r.dialers[r.index] + r.index = (r.index + 1) % len(r.dialers) + return d +} diff --git a/feature/s3/transfermanager/rrdns_test.go b/feature/s3/transfermanager/rrdns_test.go new file mode 100644 index 00000000000..c6e31c0cb28 --- /dev/null +++ b/feature/s3/transfermanager/rrdns_test.go @@ -0,0 +1,166 @@ +package transfermanager + +import ( + "context" + "errors" + "net" + "testing" + "time" +) + +// these tests also cover the cache impl (cycling+expiry+evict) + +type mockNow struct { + now time.Time +} + +func (m *mockNow) Now() time.Time { + return m.now +} + +func (m *mockNow) Add(d time.Duration) { + m.now = m.now.Add(d) +} + +func useMockNow(m *mockNow) func() { + timeNow = m.Now + return func() { + timeNow = time.Now + } +} + +var errDialContextOK = errors.New("dial context ok") + +type mockResolver struct { + addrs map[string][]string + err error +} + +func (m *mockResolver) LookupHost(ctx context.Context, host string) ([]string, error) { + return m.addrs[host], m.err +} + +type mockDialContext struct { + calledWith string +} + +func (m *mockDialContext) DialContext(ctx context.Context, network, addr string) (net.Conn, error) { + m.calledWith = addr + return nil, errDialContextOK +} + +func TestRoundRobinDNS_CycleIPs(t *testing.T) { + restore := useMockNow(&mockNow{}) + defer restore() + + addrs := []string{"0.0.0.1", "0.0.0.2", "0.0.0.3"} + r := 
&mockResolver{ + addrs: map[string][]string{ + "s3.us-east-1.amazonaws.com": addrs, + }, + } + dc := &mockDialContext{} + + rr := &rrDNS{ + cache: newDNSCache(1), + resolver: r, + dialContext: dc.DialContext, + } + + expectDialContext(t, rr, dc, "s3.us-east-1.amazonaws.com", addrs[0]) + expectDialContext(t, rr, dc, "s3.us-east-1.amazonaws.com", addrs[1]) + expectDialContext(t, rr, dc, "s3.us-east-1.amazonaws.com", addrs[2]) + expectDialContext(t, rr, dc, "s3.us-east-1.amazonaws.com", addrs[0]) +} + +func TestRoundRobinDNS_MultiIP(t *testing.T) { + restore := useMockNow(&mockNow{}) + defer restore() + + r := &mockResolver{ + addrs: map[string][]string{ + "host1.com": {"0.0.0.1", "0.0.0.2", "0.0.0.3"}, + "host2.com": {"1.0.0.1", "1.0.0.2", "1.0.0.3"}, + }, + } + dc := &mockDialContext{} + + rr := &rrDNS{ + cache: newDNSCache(2), + resolver: r, + dialContext: dc.DialContext, + } + + expectDialContext(t, rr, dc, "host1.com", r.addrs["host1.com"][0]) + expectDialContext(t, rr, dc, "host2.com", r.addrs["host2.com"][0]) + expectDialContext(t, rr, dc, "host1.com", r.addrs["host1.com"][1]) + expectDialContext(t, rr, dc, "host2.com", r.addrs["host2.com"][1]) +} + +func TestRoundRobinDNS_MaxHosts(t *testing.T) { + restore := useMockNow(&mockNow{}) + defer restore() + + r := &mockResolver{ + addrs: map[string][]string{ + "host1.com": {"0.0.0.1", "0.0.0.2", "0.0.0.3"}, + "host2.com": {"0.0.0.1", "0.0.0.2", "0.0.0.3"}, + }, + } + dc := &mockDialContext{} + + rr := &rrDNS{ + cache: newDNSCache(1), + resolver: r, + dialContext: dc.DialContext, + } + + expectDialContext(t, rr, dc, "host1.com", r.addrs["host1.com"][0]) + expectDialContext(t, rr, dc, "host1.com", r.addrs["host1.com"][1]) + expectDialContext(t, rr, dc, "host2.com", r.addrs["host2.com"][0]) // evicts host1 + expectDialContext(t, rr, dc, "host1.com", r.addrs["host1.com"][0]) // evicts host2 + expectDialContext(t, rr, dc, "host2.com", r.addrs["host2.com"][0]) +} + +func TestRoundRobinDNS_Expires(t *testing.T) { + now := &mockNow{time.Unix(0, 0)} + restore := useMockNow(now) + defer restore() + + r := &mockResolver{ + addrs: map[string][]string{ + "host1.com": {"0.0.0.1", "0.0.0.2", "0.0.0.3"}, + }, + } + dc := &mockDialContext{} + + rr := &rrDNS{ + cache: newDNSCache(2), + expiry: 30, + resolver: r, + dialContext: dc.DialContext, + } + + expectDialContext(t, rr, dc, "host1.com", r.addrs["host1.com"][0]) + now.Add(16) // hasn't expired + expectDialContext(t, rr, dc, "host1.com", r.addrs["host1.com"][1]) + now.Add(16) // expired, starts over + expectDialContext(t, rr, dc, "host1.com", r.addrs["host1.com"][0]) +} + +func expectDialContext(t *testing.T, rr *rrDNS, dc *mockDialContext, host, expect string) { + const port = "443" + + t.Helper() + _, err := rr.DialContext(context.Background(), "", net.JoinHostPort(host, port)) + if err != errDialContextOK { + t.Errorf("expect sentinel err, got %v", err) + } + actual, _, err := net.SplitHostPort(dc.calledWith) + if err != nil { + t.Fatal(err) + } + if expect != actual { + t.Errorf("expect addr %s, got %s", expect, actual) + } +} diff --git a/feature/s3/transfermanager/setup_integ_test.go b/feature/s3/transfermanager/setup_integ_test.go new file mode 100644 index 00000000000..d8efb29a220 --- /dev/null +++ b/feature/s3/transfermanager/setup_integ_test.go @@ -0,0 +1,440 @@ +//go:build integration +// +build integration + +package transfermanager + +import ( + "bytes" + "context" + "crypto/rand" + "crypto/tls" + "flag" + "fmt" + "io" + "io/ioutil" + "net/http" + "os" + "strings" + "testing" + "time" + + 
"github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/aws/arn" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/aws/aws-sdk-go-v2/service/sts" +) + +var setupMetadata = struct { + AccountID string + Region string + Buckets struct { + Source struct { + Name string + ARN string + } + } +}{} + +// s3 client to use for integ testing +var s3Client *s3.Client + +// s3TransferManagerClient to use for integ testing +var s3TransferManagerClient *Client + +// sts client to use for integ testing +var stsClient *sts.Client + +// http client setting to use for integ testing +var httpClient *http.Client + +var region = "us-west-2" + +// large object buffer to test multipart upload +var largeObjectBuf []byte + +// TestMain executes at start of package tests +func TestMain(m *testing.M) { + flag.Parse() + flag.CommandLine.Visit(func(f *flag.Flag) { + if !(f.Name == "run" || f.Name == "test.run") { + return + } + value := f.Value.String() + if value == `NONE` { + os.Exit(0) + } + }) + + var result int + defer func() { + if r := recover(); r != nil { + fmt.Fprintln(os.Stderr, "S3 TransferManager integration tests panic,", r) + result = 1 + } + os.Exit(result) + }() + + var verifyTLS bool + var s3Endpoint string + + flag.StringVar(&s3Endpoint, "s3-endpoint", "", "integration endpoint for S3") + + flag.StringVar(&setupMetadata.AccountID, "account", "", "integration account id") + flag.BoolVar(&verifyTLS, "verify-tls", true, "verify server TLS certificate") + flag.Parse() + + httpClient = &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: verifyTLS}, + }, + } + + cfg, err := config.LoadDefaultConfig(context.Background(), + config.WithRegion(region)) + if err != nil { + fmt.Fprintf(os.Stderr, "Error occurred while loading config with region %v, %v", region, err) + result = 1 + return + } + + // assign the http client + cfg.HTTPClient = httpClient + + // create a s3 client + s3cfg := cfg.Copy() + if len(s3Endpoint) != 0 { + s3cfg.EndpointResolver = aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) { + return aws.Endpoint{ + URL: s3Endpoint, + PartitionID: "aws", + SigningName: "s3", + SigningRegion: region, + SigningMethod: "s3v4", + }, nil + }) + } + + // build s3 client from config + s3Client = s3.NewFromConfig(s3cfg) + + // build s3 transfermanager client from config + s3TransferManagerClient = NewFromConfig(s3Client, s3cfg) + + // build sts client from config + stsClient = sts.NewFromConfig(cfg) + + // context + ctx := context.Background() + + setupMetadata.AccountID, err = getAccountID(ctx) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to get integration aws account id: %v\n", err) + result = 1 + return + } + + bucketCleanup, err := setupBuckets(ctx) + defer bucketCleanup() + if err != nil { + fmt.Fprintf(os.Stderr, "failed to setup integration test buckets: %v\n", err) + result = 1 + return + } + + largeObjectBuf = make([]byte, 20*1024*1024) + _, err = rand.Read(largeObjectBuf) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to generate large object for multipart upload: %v\n", err) + result = 1 + return + } + + result = m.Run() +} + +// getAccountID retrieves account id +func getAccountID(ctx context.Context) (string, error) { + if len(setupMetadata.AccountID) != 0 { + return setupMetadata.AccountID, nil + } + identity, err := stsClient.GetCallerIdentity(ctx, nil) + if err != nil { + return "", fmt.Errorf("error 
fetching caller identity, %w", err) + } + return *identity.Account, nil +} + +// setupBuckets creates buckets needed for integration test +func setupBuckets(ctx context.Context) (func(), error) { + var cleanups []func() + + cleanup := func() { + for i := range cleanups { + cleanups[i]() + } + } + + bucketCreates := []struct { + name *string + arn *string + }{ + {name: &setupMetadata.Buckets.Source.Name, arn: &setupMetadata.Buckets.Source.ARN}, + } + + for _, bucket := range bucketCreates { + *bucket.name = GenerateBucketName() + + if err := SetupBucket(ctx, s3Client, *bucket.name); err != nil { + return cleanup, err + } + + // Compute ARN + bARN := arn.ARN{ + Partition: "aws", + Service: "s3", + Region: region, + AccountID: setupMetadata.AccountID, + Resource: fmt.Sprintf("bucket_name:%s", *bucket.name), + }.String() + + *bucket.arn = bARN + + bucketName := *bucket.name + cleanups = append(cleanups, func() { + if err := CleanupBucket(ctx, s3Client, bucketName); err != nil { + fmt.Fprintln(os.Stderr, err) + } + }) + } + + return cleanup, nil +} + +type putObjectTestData struct { + Body io.Reader + ExpectBody []byte + ExpectError string +} + +// UniqueID returns a unique UUID-like identifier for use in generating +// resources for integration tests. +// +// TODO: duped from service/internal/integrationtest, remove after beta. +func UniqueID() string { + uuid := make([]byte, 16) + io.ReadFull(rand.Reader, uuid) + return fmt.Sprintf("%x", uuid) +} + +func testPutObject(t *testing.T, bucket string, testData putObjectTestData, opts ...func(options *Options)) { + key := UniqueID() + + _, err := s3TransferManagerClient.PutObject(context.Background(), + &PutObjectInput{ + Bucket: bucket, + Key: key, + Body: testData.Body, + }, opts...) + if err != nil { + if len(testData.ExpectError) == 0 { + t.Fatalf("expect no error, got %v", err) + } + if e, a := testData.ExpectError, err.Error(); !strings.Contains(a, e) { + t.Fatalf("expect error to contain %v, got %v", e, a) + } + } else { + if e := testData.ExpectError; len(e) != 0 { + t.Fatalf("expect error: %v, got none", e) + } + } + + if len(testData.ExpectError) != 0 { + return + } + + resp, err := s3Client.GetObject(context.Background(), + &s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + if err != nil { + t.Fatalf("expect no error, got %v", err) + } + + b, _ := ioutil.ReadAll(resp.Body) + if e, a := testData.ExpectBody, b; !bytes.EqualFold(e, a) { + t.Errorf("expect %s, got %s", e, a) + } +} + +// TODO: duped from service/internal/integrationtest, remove after beta. +const expressAZID = "usw2-az3" + +// TODO: duped from service/internal/integrationtest, remove after beta. +const expressSuffix = "--usw2-az3--x-s3" + +// BucketPrefix is the root prefix of integration test buckets. +// +// TODO: duped from service/internal/integrationtest, remove after beta. +const BucketPrefix = "aws-sdk-go-v2-integration" + +// GenerateBucketName returns a unique bucket name. +// +// TODO: duped from service/internal/integrationtest, remove after beta. +func GenerateBucketName() string { + return fmt.Sprintf("%s-%s", + BucketPrefix, UniqueID()) +} + +// GenerateBucketName returns a unique express-formatted bucket name. +// +// TODO: duped from service/internal/integrationtest, remove after beta. 
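With the setup helpers above in place, a representative integration test would be driven through `testPutObject`; a sketch only (the test name is illustrative, and the file carries the same `integration` build tag):

```go
// Sketch: exercise a multipart upload against the integration source bucket
// using the 20 MiB random buffer generated in TestMain.
func TestInteg_PutObject_Multipart(t *testing.T) {
	testPutObject(t, setupMetadata.Buckets.Source.Name, putObjectTestData{
		Body:       bytes.NewReader(largeObjectBuf),
		ExpectBody: largeObjectBuf,
	})
}
```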
+func GenerateExpressBucketName() string { + return fmt.Sprintf( + "%s-%s%s", + BucketPrefix, + UniqueID()[0:8], // express suffix adds length, regain that here + expressSuffix, + ) +} + +// SetupBucket returns a test bucket created for the integration tests. +// +// TODO: duped from service/internal/integrationtest, remove after beta. +func SetupBucket(ctx context.Context, svc *s3.Client, bucketName string) (err error) { + fmt.Println("Setup: Creating test bucket,", bucketName) + _, err = svc.CreateBucket(ctx, &s3.CreateBucketInput{ + Bucket: &bucketName, + CreateBucketConfiguration: &types.CreateBucketConfiguration{ + LocationConstraint: "us-west-2", + }, + }) + if err != nil { + return fmt.Errorf("failed to create bucket %s, %v", bucketName, err) + } + + // TODO: change this to use waiter to wait until BucketExists instead of loop + // svc.WaitUntilBucketExists(HeadBucketInput) + + // HeadBucket to determine if bucket exists + var attempt = 0 + params := &s3.HeadBucketInput{ + Bucket: &bucketName, + } +pt: + _, err = svc.HeadBucket(ctx, params) + // increment an attempt + attempt++ + + // retry till 10 attempt + if err != nil { + if attempt < 10 { + goto pt + } + // fail if not succeed after 10 attempts + return fmt.Errorf("failed to determine if a bucket %s exists and you have permission to access it %v", bucketName, err) + } + + return nil +} + +// CleanupBucket deletes the contents of a S3 bucket, before deleting the bucket +// it self. +// TODO: list and delete methods should use paginators +// +// TODO: duped from service/internal/integrationtest, remove after beta. +func CleanupBucket(ctx context.Context, svc *s3.Client, bucketName string) (err error) { + var errs = make([]error, 0) + + fmt.Println("TearDown: Deleting objects from test bucket,", bucketName) + listObjectsResp, err := svc.ListObjectsV2(ctx, &s3.ListObjectsV2Input{ + Bucket: &bucketName, + }) + if err != nil { + return fmt.Errorf("failed to list objects, %w", err) + } + + for _, o := range listObjectsResp.Contents { + _, err := svc.DeleteObject(ctx, &s3.DeleteObjectInput{ + Bucket: &bucketName, + Key: o.Key, + }) + if err != nil { + errs = append(errs, err) + } + } + if len(errs) != 0 { + return fmt.Errorf("failed to delete objects, %s", errs) + } + + fmt.Println("TearDown: Deleting partial uploads from test bucket,", bucketName) + multipartUploadResp, err := svc.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{ + Bucket: &bucketName, + }) + if err != nil { + return fmt.Errorf("failed to list multipart objects, %w", err) + } + + for _, u := range multipartUploadResp.Uploads { + _, err = svc.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{ + Bucket: &bucketName, + Key: u.Key, + UploadId: u.UploadId, + }) + if err != nil { + errs = append(errs, err) + } + } + if len(errs) != 0 { + return fmt.Errorf("failed to delete multipart upload objects, %s", errs) + } + + fmt.Println("TearDown: Deleting test bucket,", bucketName) + _, err = svc.DeleteBucket(ctx, &s3.DeleteBucketInput{ + Bucket: &bucketName, + }) + if err != nil { + return fmt.Errorf("failed to delete bucket, %s", bucketName) + } + + return nil +} + +// SetupExpressBucket returns an express bucket for testing. +// +// TODO: duped from service/internal/integrationtest, remove after beta. 
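Regarding the TODO in `SetupBucket` above about replacing the labeled `goto` retry loop: a sketch of the waiter-based form, using the same `BucketExists` waiter that `SetupExpressBucket` below already calls (the helper name `waitForBucket` is illustrative):

```go
// Sketch: waiter-based replacement for the HeadBucket retry loop.
func waitForBucket(ctx context.Context, svc *s3.Client, bucketName string) error {
	w := s3.NewBucketExistsWaiter(svc)
	if err := w.Wait(ctx, &s3.HeadBucketInput{Bucket: &bucketName}, 30*time.Second); err != nil {
		return fmt.Errorf("failed to determine if bucket %s exists and you have permission to access it: %w", bucketName, err)
	}
	return nil
}
```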
+func SetupExpressBucket(ctx context.Context, svc *s3.Client, bucketName string) error { + if !strings.HasSuffix(bucketName, expressSuffix) { + return fmt.Errorf("bucket name %s is missing required suffix %s", bucketName, expressSuffix) + } + + fmt.Println("Setup: Creating test express bucket,", bucketName) + _, err := svc.CreateBucket(ctx, &s3.CreateBucketInput{ + Bucket: &bucketName, + CreateBucketConfiguration: &types.CreateBucketConfiguration{ + Location: &types.LocationInfo{ + Name: aws.String(expressAZID), + Type: types.LocationTypeAvailabilityZone, + }, + Bucket: &types.BucketInfo{ + DataRedundancy: types.DataRedundancySingleAvailabilityZone, + Type: types.BucketTypeDirectory, + }, + }, + }) + if err != nil { + return fmt.Errorf("create express bucket %s: %v", bucketName, err) + } + + w := s3.NewBucketExistsWaiter(svc) + err = w.Wait(ctx, &s3.HeadBucketInput{ + Bucket: &bucketName, + }, 10*time.Second) + if err != nil { + return fmt.Errorf("wait for express bucket %s: %v", bucketName, err) + } + + return nil +} diff --git a/feature/s3/transfermanager/shared_test.go b/feature/s3/transfermanager/shared_test.go new file mode 100644 index 00000000000..364423e96c2 --- /dev/null +++ b/feature/s3/transfermanager/shared_test.go @@ -0,0 +1,4 @@ +package transfermanager + +var buf20MB = make([]byte, 1024*1024*20) +var buf2MB = make([]byte, 1024*1024*2) diff --git a/feature/s3/transfermanager/types/types.go b/feature/s3/transfermanager/types/types.go new file mode 100644 index 00000000000..8a2d877e461 --- /dev/null +++ b/feature/s3/transfermanager/types/types.go @@ -0,0 +1,346 @@ +package types + +import ( + "io" + + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" +) + +// ReadSeekCloser wraps a io.Reader returning a ReaderSeekerCloser. Allows the +// SDK to accept an io.Reader that is not also an io.Seeker for unsigned +// streaming payload API operations. +// +// A readSeekCloser wrapping an nonseekable io.Reader used in an API operation's +// input will prevent that operation being retried in the case of +// network errors, and cause operation requests to fail if the operation +// requires payload signing. +func ReadSeekCloser(r io.Reader) *ReaderSeekerCloser { + return &ReaderSeekerCloser{r} +} + +// ReaderSeekerCloser represents a reader that can also delegate io.Seeker and +// io.Closer interfaces to the underlying object if they are available. +type ReaderSeekerCloser struct { + r io.Reader +} + +// SeekerLen attempts to get the number of bytes remaining at the seeker's +// current position. Returns the number of bytes remaining or error. +func SeekerLen(s io.Seeker) (int64, error) { + // Determine if the seeker is actually seekable. ReaderSeekerCloser + // hides the fact that a io.Readers might not actually be seekable. + switch v := s.(type) { + case *ReaderSeekerCloser: + return v.GetLen() + } + + return computeSeekerLength(s) +} + +// GetLen returns the length of the bytes remaining in the underlying reader. +// Checks first for Len(), then io.Seeker to determine the size of the +// underlying reader. +// +// Will return -1 if the length cannot be determined. 
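A small consumer-side sketch for `ReadSeekCloser` and `SeekerLen` above (values are illustrative; imports elided):

```go
// Sketch: bytes.Buffer is not an io.Seeker, but it exposes Len(), so the
// wrapper can still report the remaining length without seeking.
rsc := types.ReadSeekCloser(bytes.NewBuffer(make([]byte, 1024)))
n, err := types.SeekerLen(rsc)
fmt.Println(n, err) // 1024 <nil>
```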
+func (r *ReaderSeekerCloser) GetLen() (int64, error) {
+	if l, ok := r.HasLen(); ok {
+		return int64(l), nil
+	}
+
+	if s, ok := r.r.(io.Seeker); ok {
+		return computeSeekerLength(s)
+	}
+
+	return -1, nil
+}
+
+func computeSeekerLength(s io.Seeker) (int64, error) {
+	curOffset, err := s.Seek(0, io.SeekCurrent)
+	if err != nil {
+		return 0, err
+	}
+
+	endOffset, err := s.Seek(0, io.SeekEnd)
+	if err != nil {
+		return 0, err
+	}
+
+	_, err = s.Seek(curOffset, io.SeekStart)
+	if err != nil {
+		return 0, err
+	}
+
+	return endOffset - curOffset, nil
+}
+
+// HasLen returns the length of the underlying reader if the value implements
+// the Len() int method.
+func (r *ReaderSeekerCloser) HasLen() (int, bool) {
+	type lenner interface {
+		Len() int
+	}
+
+	if lr, ok := r.r.(lenner); ok {
+		return lr.Len(), true
+	}
+
+	return 0, false
+}
+
+// Read reads from the underlying reader, up to the size of p. It returns the
+// number of bytes read and any error that occurred.
+//
+// If the underlying value is not an io.Reader, zero bytes are read and a nil
+// error is returned.
+//
+// Performs the same functionality as io.Reader's Read.
+func (r *ReaderSeekerCloser) Read(p []byte) (int, error) {
+	switch t := r.r.(type) {
+	case io.Reader:
+		return t.Read(p)
+	}
+	return 0, nil
+}
+
+// Seek sets the offset for the next Read to offset, interpreted according to
+// whence: 0 means relative to the origin of the file, 1 means relative to the
+// current offset, and 2 means relative to the end. Seek returns the new offset
+// and an error, if any.
+//
+// If the ReaderSeekerCloser is not an io.Seeker, nothing will be done.
+func (r *ReaderSeekerCloser) Seek(offset int64, whence int) (int64, error) {
+	switch t := r.r.(type) {
+	case io.Seeker:
+		return t.Seek(offset, whence)
+	}
+	return int64(0), nil
+}
+
+// IsSeeker reports whether the underlying reader is also a seeker.
+func (r *ReaderSeekerCloser) IsSeeker() bool {
+	_, ok := r.r.(io.Seeker)
+	return ok
+}
+
+// Close closes the ReaderSeekerCloser.
+//
+// If the ReaderSeekerCloser is not an io.Closer, nothing will be done.
+func (r *ReaderSeekerCloser) Close() error {
+	switch t := r.r.(type) {
+	case io.Closer:
+		return t.Close()
+	}
+	return nil
+}
+
+// ChecksumAlgorithm indicates the algorithm used to create the checksum for the object.
+type ChecksumAlgorithm string
+
+// Enum values for ChecksumAlgorithm
+const (
+	ChecksumAlgorithmCrc32 ChecksumAlgorithm = "CRC32"
+	ChecksumAlgorithmCrc32c = "CRC32C"
+	ChecksumAlgorithmSha1 = "SHA1"
+	ChecksumAlgorithmSha256 = "SHA256"
+)
+
+// ObjectCannedACL defines the canned ACL to apply to the object, see [Canned ACL] in the
+// Amazon S3 User Guide.
+type ObjectCannedACL string
+
+// Enum values for ObjectCannedACL
+const (
+	ObjectCannedACLPrivate ObjectCannedACL = "private"
+	ObjectCannedACLPublicRead = "public-read"
+	ObjectCannedACLPublicReadWrite = "public-read-write"
+	ObjectCannedACLAuthenticatedRead = "authenticated-read"
+	ObjectCannedACLAwsExecRead = "aws-exec-read"
+	ObjectCannedACLBucketOwnerRead = "bucket-owner-read"
+	ObjectCannedACLBucketOwnerFullControl = "bucket-owner-full-control"
+)
+
+// Values returns all known values for ObjectCannedACL. Note that this can be
+// expanded in the future, and so it is only as up to date as the client.
+//
+// The ordering of this slice is not guaranteed to be stable across updates.
+func (ObjectCannedACL) Values() []ObjectCannedACL { + return []ObjectCannedACL{ + "private", + "public-read", + "public-read-write", + "authenticated-read", + "aws-exec-read", + "bucket-owner-read", + "bucket-owner-full-control", + } +} + +// ObjectLockLegalHoldStatus specifies whether a legal hold will be applied to this object. For more +// information about S3 Object Lock, see [Object Lock] in the Amazon S3 User Guide. +type ObjectLockLegalHoldStatus string + +// Enum values for ObjectLockLegalHoldStatus +const ( + ObjectLockLegalHoldStatusOn ObjectLockLegalHoldStatus = "ON" + ObjectLockLegalHoldStatusOff = "OFF" +) + +// ObjectLockMode is the Object Lock mode that you want to apply to this object. +type ObjectLockMode string + +// Enum values for ObjectLockMode +const ( + ObjectLockModeGovernance ObjectLockMode = "GOVERNANCE" + ObjectLockModeCompliance = "COMPLIANCE" +) + +// RequestPayer confirms that the requester knows that they will be charged for the request. +// Bucket owners need not specify this parameter in their requests. If either the +// source or destination S3 bucket has Requester Pays enabled, the requester will +// pay for corresponding charges to copy the object. For information about +// downloading objects from Requester Pays buckets, see [Downloading Objects in Requester Pays Buckets]in the Amazon S3 User +// Guide. +type RequestPayer string + +// Enum values for RequestPayer +const ( + RequestPayerRequester RequestPayer = "requester" +) + +// ServerSideEncryption indicates the server-side encryption algorithm that was used when you store this object +// in Amazon S3 (for example, AES256 , aws:kms , aws:kms:dsse ) +type ServerSideEncryption string + +// Enum values for ServerSideEncryption +const ( + ServerSideEncryptionAes256 ServerSideEncryption = "AES256" + ServerSideEncryptionAwsKms = "aws:kms" + ServerSideEncryptionAwsKmsDsse = "aws:kms:dsse" +) + +// StorageClass specifies class to store newly created +// objects, which has default value of STANDARD. For more information, see +// [Storage Classes] in the Amazon S3 User Guide. +type StorageClass string + +// Enum values for StorageClass +const ( + StorageClassStandard StorageClass = "STANDARD" + StorageClassReducedRedundancy = "REDUCED_REDUNDANCY" + StorageClassStandardIa = "STANDARD_IA" + StorageClassOnezoneIa = "ONEZONE_IA" + StorageClassIntelligentTiering = "INTELLIGENT_TIERING" + StorageClassGlacier = "GLACIER" + StorageClassDeepArchive = "DEEP_ARCHIVE" + StorageClassOutposts = "OUTPOSTS" + StorageClassGlacierIr = "GLACIER_IR" + StorageClassSnow = "SNOW" + StorageClassExpressOnezone = "EXPRESS_ONEZONE" +) + +// CompletedPart includes details of the parts that were uploaded. +type CompletedPart struct { + + // The base64-encoded, 32-bit CRC32 checksum of the object. This will only be + // present if it was uploaded with the object. When you use an API operation on an + // object that was uploaded using multipart uploads, this value may not be a direct + // checksum value of the full object. Instead, it's a calculation based on the + // checksum values of each individual part. For more information about how + // checksums are calculated with multipart uploads, see [Checking object integrity]in the Amazon S3 User + // Guide. + // + // [Checking object integrity]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#large-object-checksums + ChecksumCRC32 *string + + // The base64-encoded, 32-bit CRC32C checksum of the object. 
This will only be + // present if it was uploaded with the object. When you use an API operation on an + // object that was uploaded using multipart uploads, this value may not be a direct + // checksum value of the full object. Instead, it's a calculation based on the + // checksum values of each individual part. For more information about how + // checksums are calculated with multipart uploads, see [Checking object integrity]in the Amazon S3 User + // Guide. + // + // [Checking object integrity]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#large-object-checksums + ChecksumCRC32C *string + + // The base64-encoded, 160-bit SHA-1 digest of the object. This will only be + // present if it was uploaded with the object. When you use the API operation on an + // object that was uploaded using multipart uploads, this value may not be a direct + // checksum value of the full object. Instead, it's a calculation based on the + // checksum values of each individual part. For more information about how + // checksums are calculated with multipart uploads, see [Checking object integrity]in the Amazon S3 User + // Guide. + // + // [Checking object integrity]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#large-object-checksums + ChecksumSHA1 *string + + // The base64-encoded, 256-bit SHA-256 digest of the object. This will only be + // present if it was uploaded with the object. When you use an API operation on an + // object that was uploaded using multipart uploads, this value may not be a direct + // checksum value of the full object. Instead, it's a calculation based on the + // checksum values of each individual part. For more information about how + // checksums are calculated with multipart uploads, see [Checking object integrity]in the Amazon S3 User + // Guide. + // + // [Checking object integrity]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#large-object-checksums + ChecksumSHA256 *string + + // Entity tag returned when the part was uploaded. + ETag *string + + // Part number that identifies the part. This is a positive integer between 1 and + // 10,000. + // + // - General purpose buckets - In CompleteMultipartUpload , when a additional + // checksum (including x-amz-checksum-crc32 , x-amz-checksum-crc32c , + // x-amz-checksum-sha1 , or x-amz-checksum-sha256 ) is applied to each part, the + // PartNumber must start at 1 and the part numbers must be consecutive. + // Otherwise, Amazon S3 generates an HTTP 400 Bad Request status code and an + // InvalidPartOrder error code. + // + // - Directory buckets - In CompleteMultipartUpload , the PartNumber must start + // at 1 and the part numbers must be consecutive. 
+ PartNumber *int32 +} + +// MapCompletedPart maps CompletedPart to s3 types +func (cp CompletedPart) MapCompletedPart() types.CompletedPart { + return types.CompletedPart{ + ChecksumCRC32: cp.ChecksumCRC32, + ChecksumCRC32C: cp.ChecksumCRC32C, + ChecksumSHA1: cp.ChecksumSHA1, + ChecksumSHA256: cp.ChecksumSHA256, + ETag: cp.ETag, + PartNumber: cp.PartNumber, + } +} + +// MapFrom set CompletedPart fields from s3 UploadPartOutput +func (cp *CompletedPart) MapFrom(resp *s3.UploadPartOutput, partNum *int32) { + cp.ChecksumCRC32 = resp.ChecksumCRC32 + cp.ChecksumCRC32C = resp.ChecksumCRC32C + cp.ChecksumSHA1 = resp.ChecksumSHA1 + cp.ChecksumSHA256 = resp.ChecksumSHA256 + cp.ETag = resp.ETag + cp.PartNumber = partNum +} + +// RequestCharged indicates that the requester was successfully charged for the request. +type RequestCharged string + +// Enum values for RequestCharged +const ( + RequestChargedRequester RequestCharged = "requester" +) + +// Metadata provides storing and reading metadata values. Keys may be any +// comparable value type. Get and set will panic if key is not a comparable +// value type. +// +// Metadata uses lazy initialization, and Set method must be called as an +// addressable value, or pointer. Not doing so may cause key/value pair to not +// be set. +type Metadata struct { + values map[interface{}]interface{} +} diff --git a/modman.toml b/modman.toml index c642b50193b..ce197c0467c 100644 --- a/modman.toml +++ b/modman.toml @@ -27,6 +27,9 @@ [modules."feature/ec2/imds/internal/configtesting"] no_tag = true + [modules."feature/s3/transfermanager"] + no_tag = true + [modules."internal/codegen"] no_tag = true diff --git a/service/internal/integrationtest/s3/setup_test.go b/service/internal/integrationtest/s3/setup_test.go index ea00037d6e3..e164336f6be 100644 --- a/service/internal/integrationtest/s3/setup_test.go +++ b/service/internal/integrationtest/s3/setup_test.go @@ -346,8 +346,8 @@ func testWriteToObject(t *testing.T, bucket string, testData writeToObjectTestDa } } else { - if len(testData.ExpectError) != 0 { - t.Fatalf("expected error: %v, got none", err) + if e := testData.ExpectError; len(e) != 0 { + t.Fatalf("expected error: %v, got none", e) } }
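
Editor's note (not part of the patch): the ReaderSeekerCloser helper above mirrors the v1 SDK utility, and a short usage sketch may help reviewers. The sketch below assumes the new package is importable as github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager/types; since the module is still in beta, that path is an assumption and may change.

package main

import (
	"bytes"
	"fmt"
	"io"

	// Assumed import path for the beta types package added by this change.
	"github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager/types"
)

func main() {
	// bytes.Reader implements both Len() and io.Seeker, so the wrapper can
	// report the remaining length and the body can be rewound on retries.
	seekable := types.ReadSeekCloser(bytes.NewReader([]byte("hello world")))
	n, _ := seekable.GetLen()
	fmt.Println(seekable.IsSeeker(), n) // true 11

	// SeekerLen recognizes the wrapper and delegates to GetLen.
	remaining, _ := types.SeekerLen(seekable)
	fmt.Println(remaining) // 11

	// io.LimitReader returns a *io.LimitedReader, which is neither a Seeker
	// nor exposes Len(), so the length cannot be determined (-1) and the
	// payload cannot be re-read if a retry is needed.
	streaming := types.ReadSeekCloser(io.LimitReader(bytes.NewReader([]byte("hello")), 5))
	n, _ = streaming.GetLen()
	fmt.Println(streaming.IsSeeker(), n) // false -1
}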
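Likewise, a hedged sketch of how the CompletedPart mapping helpers are intended to round-trip per-part results between the transfer manager and the S3 service types; the bucket, key, upload ID, ETag, and checksum values here are fabricated for illustration only.

package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"

	// Assumed import path for the beta types package added by this change.
	tmtypes "github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager/types"
)

func main() {
	// Pretend this came back from an UploadPart call; the values are made up.
	out := &s3.UploadPartOutput{
		ETag:          aws.String(`"9b2cf535f27731c974343645a3985328"`),
		ChecksumCRC32: aws.String("DUoRhQ=="),
	}

	// Capture the per-part result in the transfer manager's own type...
	var part tmtypes.CompletedPart
	part.MapFrom(out, aws.Int32(1))

	// ...and map it back to the service type when completing the upload.
	input := &s3.CompleteMultipartUploadInput{
		Bucket:   aws.String("example-bucket"),
		Key:      aws.String("example-key"),
		UploadId: aws.String("example-upload-id"),
		MultipartUpload: &s3types.CompletedMultipartUpload{
			Parts: []s3types.CompletedPart{part.MapCompletedPart()},
		},
	}
	fmt.Println(*input.MultipartUpload.Parts[0].ETag, *input.MultipartUpload.Parts[0].PartNumber)
}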