From cc2aadad6f12c4cdf6011f741d3a1e1042b1e93f Mon Sep 17 00:00:00 2001 From: Kirill Gavrilov Date: Fri, 4 Oct 2024 13:51:29 +0000 Subject: [PATCH 1/3] More s3 params --- cmd/client/main.go | 16 +++++++++++++--- pkg/message/put_message_v2.go | 2 ++ pkg/storage/s3storage.go | 14 ++++++++++---- 3 files changed, 25 insertions(+), 7 deletions(-) diff --git a/cmd/client/main.go b/cmd/client/main.go index 3e0a9ed..af9e309 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -24,9 +24,11 @@ var ( decrypt bool /* Put command flags */ - encrypt bool - storageClass string - tableSpace string + encrypt bool + storageClass string + tableSpace string + multipart_chunksize int64 + multipart_threshold int64 offset uint64 @@ -115,6 +117,14 @@ func putFunc(con net.Conn, instanceCnf *config.Instance, args []string) error { Name: message.TableSpaceSetting, Value: tableSpace, }, + { + Name: message.MultipartChunksize, + Value: fmt.Sprintf("%s", multipart_chunksize), + }, + { + Name: message.MultipartThreshold, + Value: fmt.Sprintf("%s", multipart_threshold), + }, }).Encode() _, err := con.Write(msg) if err != nil { diff --git a/pkg/message/put_message_v2.go b/pkg/message/put_message_v2.go index 28fc247..48c3519 100644 --- a/pkg/message/put_message_v2.go +++ b/pkg/message/put_message_v2.go @@ -8,6 +8,8 @@ import ( const StorageClassSetting = "StorageClass" const TableSpaceSetting = "TableSpace" +const MultipartChunksize = "MultipartChunksize" +const MultipartThreshold = "MultipartThreshold" type PutMessageV2 struct { Encrypt bool diff --git a/pkg/storage/s3storage.go b/pkg/storage/s3storage.go index 5ab15cf..131cd5b 100644 --- a/pkg/storage/s3storage.go +++ b/pkg/storage/s3storage.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "path" + "strconv" "strings" "github.com/yezzey-gp/aws-sdk-go/aws" @@ -69,14 +70,19 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [ objectPath := path.Join(s.cnf.StoragePrefix, name) + storageClass := ResolveStorageSetting(settings, message.StorageClassSetting, "STANDARD") + tableSpace := ResolveStorageSetting(settings, message.TableSpaceSetting, tablespace.DefaultTableSpace) + multipartChunksizeStr := ResolveStorageSetting(settings, message.MultipartChunksize, "") + multipartChunksize, err := strconv.ParseInt(multipartChunksizeStr, 10, 64) + if err != nil { + return err + } + up := s3manager.NewUploaderWithClient(sess, func(uploader *s3manager.Uploader) { - uploader.PartSize = int64(1 << 24) + uploader.PartSize = int64(multipartChunksize) uploader.Concurrency = 1 }) - storageClass := ResolveStorageSetting(settings, message.StorageClassSetting, "STANDARD") - tableSpace := ResolveStorageSetting(settings, message.TableSpaceSetting, tablespace.DefaultTableSpace) - bucket, ok := s.bucketMap[tableSpace] if !ok { err := fmt.Errorf("failed to match tablespace %s to s3 bucket.", tableSpace) From 449d74746ec77e9970f75f8acdfdfbc45e4d96a6 Mon Sep 17 00:00:00 2001 From: Kirill Gavrilov Date: Mon, 7 Oct 2024 23:33:51 +0000 Subject: [PATCH 2/3] multipart parameters --- cmd/client/main.go | 16 ++++++++-------- pkg/message/put_message_v2.go | 2 +- pkg/storage/s3storage.go | 29 ++++++++++++++++++++++++----- 3 files changed, 33 insertions(+), 14 deletions(-) diff --git a/cmd/client/main.go b/cmd/client/main.go index af9e309..d3a50a1 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -24,11 +24,11 @@ var ( decrypt bool /* Put command flags */ - encrypt bool - storageClass string - tableSpace string - multipart_chunksize int64 - multipart_threshold int64 + encrypt bool + storageClass string + tableSpace string + multipartChunksize int64 + doMultipart bool offset uint64 @@ -119,11 +119,11 @@ func putFunc(con net.Conn, instanceCnf *config.Instance, args []string) error { }, { Name: message.MultipartChunksize, - Value: fmt.Sprintf("%s", multipart_chunksize), + Value: fmt.Sprintf("%d", multipartChunksize), }, { - Name: message.MultipartThreshold, - Value: fmt.Sprintf("%s", multipart_threshold), + Name: message.DoMultipart, + Value: fmt.Sprintf("%t", doMultipart), }, }).Encode() _, err := con.Write(msg) diff --git a/pkg/message/put_message_v2.go b/pkg/message/put_message_v2.go index 48c3519..6eaf3c2 100644 --- a/pkg/message/put_message_v2.go +++ b/pkg/message/put_message_v2.go @@ -9,7 +9,7 @@ import ( const StorageClassSetting = "StorageClass" const TableSpaceSetting = "TableSpace" const MultipartChunksize = "MultipartChunksize" -const MultipartThreshold = "MultipartThreshold" +const DoMultipart = "DoMultipart" type PutMessageV2 struct { Encrypt bool diff --git a/pkg/storage/s3storage.go b/pkg/storage/s3storage.go index 131cd5b..b24bf14 100644 --- a/pkg/storage/s3storage.go +++ b/pkg/storage/s3storage.go @@ -1,6 +1,7 @@ package storage import ( + "bytes" "context" "fmt" "io" @@ -77,6 +78,10 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [ if err != nil { return err } + doMultipart, err := strconv.ParseBool(ResolveStorageSetting(settings, message.DoMultipart, "1")) + if err != nil { + return err + } up := s3manager.NewUploaderWithClient(sess, func(uploader *s3manager.Uploader) { uploader.PartSize = int64(multipartChunksize) @@ -90,14 +95,28 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [ return err } - _, err = up.Upload( - &s3manager.UploadInput{ + if doMultipart { + _, err = up.Upload( + &s3manager.UploadInput{ + Bucket: aws.String(bucket), + Key: aws.String(objectPath), + Body: r, + StorageClass: aws.String(storageClass), + }, + ) + } else { + var body []byte + body, err = io.ReadAll(r) + if err != nil { + return err + } + _, err = sess.PutObject(&s3.PutObjectInput{ Bucket: aws.String(bucket), Key: aws.String(objectPath), - Body: r, + Body: bytes.NewReader(body), StorageClass: aws.String(storageClass), - }, - ) + }) + } return err } From 07fd8ab7c7b92a316e06766df4478e0fcdf1fea5 Mon Sep 17 00:00:00 2001 From: Kirill Gavrilov Date: Mon, 7 Oct 2024 23:37:52 +0000 Subject: [PATCH 3/3] rename to multipart_upload --- cmd/client/main.go | 6 +++--- pkg/message/put_message_v2.go | 2 +- pkg/storage/s3storage.go | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cmd/client/main.go b/cmd/client/main.go index d3a50a1..9e5fcf0 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -28,7 +28,7 @@ var ( storageClass string tableSpace string multipartChunksize int64 - doMultipart bool + multipartUpload bool offset uint64 @@ -122,8 +122,8 @@ func putFunc(con net.Conn, instanceCnf *config.Instance, args []string) error { Value: fmt.Sprintf("%d", multipartChunksize), }, { - Name: message.DoMultipart, - Value: fmt.Sprintf("%t", doMultipart), + Name: message.MultipartUpload, + Value: fmt.Sprintf("%t", multipartUpload), }, }).Encode() _, err := con.Write(msg) diff --git a/pkg/message/put_message_v2.go b/pkg/message/put_message_v2.go index 6eaf3c2..b9ba21f 100644 --- a/pkg/message/put_message_v2.go +++ b/pkg/message/put_message_v2.go @@ -9,7 +9,7 @@ import ( const StorageClassSetting = "StorageClass" const TableSpaceSetting = "TableSpace" const MultipartChunksize = "MultipartChunksize" -const DoMultipart = "DoMultipart" +const MultipartUpload = "MultipartUpload" type PutMessageV2 struct { Encrypt bool diff --git a/pkg/storage/s3storage.go b/pkg/storage/s3storage.go index b24bf14..a2dc98b 100644 --- a/pkg/storage/s3storage.go +++ b/pkg/storage/s3storage.go @@ -78,7 +78,7 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [ if err != nil { return err } - doMultipart, err := strconv.ParseBool(ResolveStorageSetting(settings, message.DoMultipart, "1")) + multipartUpload, err := strconv.ParseBool(ResolveStorageSetting(settings, message.MultipartUpload, "1")) if err != nil { return err } @@ -95,7 +95,7 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [ return err } - if doMultipart { + if multipartUpload { _, err = up.Upload( &s3manager.UploadInput{ Bucket: aws.String(bucket),