Skip to content

Commit

Permalink
More S3 params (#63)
Browse files Browse the repository at this point in the history
* More s3 params

* multipart parameters

* rename to multipart_upload
  • Loading branch information
diPhantxm authored Oct 9, 2024
1 parent e1c51f9 commit b5217c2
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 12 deletions.
16 changes: 13 additions & 3 deletions cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ var (

decrypt bool
/* Put command flags */
encrypt bool
storageClass string
tableSpace string
encrypt bool
storageClass string
tableSpace string
multipartChunksize int64
multipartUpload bool

offset uint64

Expand Down Expand Up @@ -115,6 +117,14 @@ func putFunc(con net.Conn, instanceCnf *config.Instance, args []string) error {
Name: message.TableSpaceSetting,
Value: tableSpace,
},
{
Name: message.MultipartChunksize,
Value: fmt.Sprintf("%d", multipartChunksize),
},
{
Name: message.MultipartUpload,
Value: fmt.Sprintf("%t", multipartUpload),
},
}).Encode()
_, err := con.Write(msg)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/message/put_message_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (

const StorageClassSetting = "StorageClass"
const TableSpaceSetting = "TableSpace"
const MultipartChunksize = "MultipartChunksize"
const MultipartUpload = "MultipartUpload"

type PutMessageV2 struct {
Encrypt bool
Expand Down
43 changes: 34 additions & 9 deletions pkg/storage/s3storage.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package storage

import (
"bytes"
"context"
"fmt"
"io"
"path"
"strconv"
"strings"

"github.com/yezzey-gp/aws-sdk-go/aws"
Expand Down Expand Up @@ -69,29 +71,52 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [

objectPath := path.Join(s.cnf.StoragePrefix, name)

storageClass := ResolveStorageSetting(settings, message.StorageClassSetting, "STANDARD")
tableSpace := ResolveStorageSetting(settings, message.TableSpaceSetting, tablespace.DefaultTableSpace)
multipartChunksizeStr := ResolveStorageSetting(settings, message.MultipartChunksize, "")
multipartChunksize, err := strconv.ParseInt(multipartChunksizeStr, 10, 64)
if err != nil {
return err
}
multipartUpload, err := strconv.ParseBool(ResolveStorageSetting(settings, message.MultipartUpload, "1"))
if err != nil {
return err
}

up := s3manager.NewUploaderWithClient(sess, func(uploader *s3manager.Uploader) {
uploader.PartSize = int64(1 << 24)
uploader.PartSize = int64(multipartChunksize)
uploader.Concurrency = 1
})

storageClass := ResolveStorageSetting(settings, message.StorageClassSetting, "STANDARD")
tableSpace := ResolveStorageSetting(settings, message.TableSpaceSetting, tablespace.DefaultTableSpace)

bucket, ok := s.bucketMap[tableSpace]
if !ok {
err := fmt.Errorf("failed to match tablespace %s to s3 bucket.", tableSpace)
ylogger.Zero.Err(err)
return err
}

_, err = up.Upload(
&s3manager.UploadInput{
if multipartUpload {
_, err = up.Upload(
&s3manager.UploadInput{
Bucket: aws.String(bucket),
Key: aws.String(objectPath),
Body: r,
StorageClass: aws.String(storageClass),
},
)
} else {
var body []byte
body, err = io.ReadAll(r)
if err != nil {
return err
}
_, err = sess.PutObject(&s3.PutObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(objectPath),
Body: r,
Body: bytes.NewReader(body),
StorageClass: aws.String(storageClass),
},
)
})
}

return err
}
Expand Down

0 comments on commit b5217c2

Please sign in to comment.