Commit

change `S3_MAX_PARTS_COUNT` from 5000 to 1000 and minimal `S3_PART_SIZE` from 5MB to 25MB by default to speed up S3 uploading / downloading
Slach committed Jan 8, 2024
1 parent 36a778c commit feb82f8
Showing 5 changed files with 13 additions and 18 deletions.
5 changes: 3 additions & 2 deletions ChangeLog.md
@@ -1,7 +1,8 @@
-# v2.5.0 (not released yet)
+# v2.4.16
 BUG FIXES
 - increase `AZBLOB_TIMEOUT` to 4h, instead 15m to allow download long size data parts
+- change `S3_MAX_PARTS_COUNT` from `5000` to `1000` and minimal `S3_PART_SIZE` from 5MB to 25MB by default to speed up S3 uploading / downloading
 
 # v2.4.15
 BUG FIXES
 - fix `create` and `restore` command for ReplicatedMergeTree tables with `frozen_metadata.txt` parsing
2 changes: 1 addition & 1 deletion pkg/backup/create.go
@@ -575,7 +575,7 @@ func (b *Backuper) AddTableToBackup(ctx context.Context, backupName, shadowBacku
 realSize[disk.Name] = size
 disksToPartsMap[disk.Name] = parts
 log.WithField("disk", disk.Name).Debug("shadow moved")
-if disk.Type == "s3" || disk.Type == "azure_blob_storage" && len(parts) > 0 {
+if (disk.Type == "s3" || disk.Type == "azure_blob_storage") && len(parts) > 0 {
 if err = config.ValidateObjectDiskConfig(b.cfg); err != nil {
 return nil, nil, err
 }
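The create.go change is a pure operator-precedence fix: in Go, `&&` binds tighter than `||`, so the unparenthesized condition parsed as `disk.Type == "s3" || (disk.Type == "azure_blob_storage" && len(parts) > 0)`, and every s3 disk entered the object-disk validation branch even when it had no parts. A minimal illustration of the two forms (hypothetical values, not code from the repo):

```go
package main

import "fmt"

func main() {
	diskType := "s3"
	partsCount := 0 // an s3 disk with no data parts

	// Buggy form: && binds tighter than ||, so this is really
	// diskType == "s3" || (diskType == "azure_blob_storage" && partsCount > 0)
	buggy := diskType == "s3" || diskType == "azure_blob_storage" && partsCount > 0

	// Fixed form: the parts check guards both disk types.
	fixed := (diskType == "s3" || diskType == "azure_blob_storage") && partsCount > 0

	fmt.Println(buggy, fixed) // prints: true false
}
```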
2 changes: 1 addition & 1 deletion pkg/config/config.go
@@ -556,7 +556,7 @@ func DefaultConfig() *Config {
 StorageClass: string(s3types.StorageClassStandard),
 Concurrency: int(downloadConcurrency + 1),
 PartSize: 0,
-MaxPartsCount: 5000,
+MaxPartsCount: 1000,
 },
 GCS: GCSConfig{
 CompressionLevel: 1,
4 changes: 2 additions & 2 deletions pkg/storage/general.go
@@ -658,8 +658,8 @@ func NewBackupDestination(ctx context.Context, cfg *config.Config, ch *clickhous
 if cfg.General.MaxFileSize%cfg.S3.MaxPartsCount > 0 {
 partSize++
 }
-if partSize < 5*1024*1024 {
-partSize = 5 * 1024 * 1024
+if partSize < 25*1024*1024 {
+partSize = 25 * 1024 * 1024
 }
 if partSize > 5*1024*1024*1024 {
 partSize = 5 * 1024 * 1024 * 1024
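The config.go default and the general.go clamp work together: the part size is `General.MaxFileSize` ceil-divided by `S3.MaxPartsCount`, then clamped between a floor (raised here from S3's 5MB protocol minimum to 25MB) and S3's 5GB per-part hard limit. Lowering the parts cap and raising the floor both push toward fewer, larger multipart requests, which is where the speedup comes from. A rough sketch of the arithmetic under the old and new defaults (illustrative only, not the repo's code):

```go
package main

import "fmt"

const (
	mib = int64(1 << 20)
	gib = int64(1 << 30)
)

// partSize mirrors the clamping logic in the hunk above: ceil-divide
// the configured max file size by the parts cap, then clamp the
// result to [minPart, 5 GiB].
func partSize(maxFileSize, maxPartsCount, minPart int64) int64 {
	size := maxFileSize / maxPartsCount
	if maxFileSize%maxPartsCount > 0 {
		size++
	}
	if size < minPart {
		size = minPart
	}
	if size > 5*gib {
		size = 5 * gib
	}
	return size
}

func main() {
	fileSize := 10 * gib // e.g. a 10 GiB backup file

	oldPart := partSize(fileSize, 5000, 5*mib)  // old defaults
	newPart := partSize(fileSize, 1000, 25*mib) // new defaults

	fmt.Printf("old: %d MiB parts -> ~%d upload requests\n", oldPart/mib, fileSize/oldPart)
	fmt.Printf("new: %d MiB parts -> ~%d upload requests\n", newPart/mib, fileSize/newPart)
	// old: 5 MiB parts -> ~2048 upload requests
	// new: 25 MiB parts -> ~409 upload requests
}
```

The trade-off is memory: parts are buffered while in flight, so 25MB parts at the same concurrency hold roughly five times more data in memory than 5MB parts.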
18 changes: 6 additions & 12 deletions pkg/storage/s3.go
@@ -29,7 +29,6 @@ import (
 awsV2http "github.com/aws/smithy-go/transport/http"
 "github.com/pkg/errors"
 "golang.org/x/sync/errgroup"
-"golang.org/x/sync/semaphore"
 )
 
 type S3LogToApexLogAdapter struct {
@@ -519,25 +518,21 @@ func (s *S3) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (
 if srcSize%s.Config.MaxPartsCount > 0 {
 partSize++
 }
-if partSize < 5*1024*1024 {
-partSize = 5 * 1024 * 1024
+if partSize < 25*1024*1024 {
+partSize = 25 * 1024 * 1024
 }
 
 // Calculate the number of parts
 numParts := (srcSize + partSize - 1) / partSize
 
-copyPartSemaphore := semaphore.NewWeighted(int64(s.Config.Concurrency))
 copyPartErrGroup, ctx := errgroup.WithContext(ctx)
+copyPartErrGroup.SetLimit(s.Config.Concurrency)
 
 var mu sync.Mutex
 var parts []s3types.CompletedPart
 
 // Copy each part of the object
 for partNumber := int64(1); partNumber <= numParts; partNumber++ {
-if err := copyPartSemaphore.Acquire(ctx, 1); err != nil {
-apexLog.Errorf("can't acquire semaphore during CopyObject data parts: %v", err)
-break
-}
 // Calculate the byte range for the part
 start := (partNumber - 1) * partSize
 end := partNumber * partSize
@@ -547,7 +542,6 @@ func (s *S3) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (
 currentPartNumber := int32(partNumber)
 
 copyPartErrGroup.Go(func() error {
-defer copyPartSemaphore.Release(1)
 // Copy the part
 uploadPartParams := &s3.UploadPartCopyInput{
 Bucket: aws.String(s.Config.Bucket),
@@ -573,7 +567,7 @@ func (s *S3) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (
 return nil
 })
 }
-if err := copyPartErrGroup.Wait(); err != nil {
+if wgWaitErr := copyPartErrGroup.Wait(); wgWaitErr != nil {
 abortParams := &s3.AbortMultipartUploadInput{
 Bucket: aws.String(s.Config.Bucket),
 Key: aws.String(dstKey),
@@ -584,9 +578,9 @@ func (s *S3) CopyObject(ctx context.Context, srcBucket, srcKey, dstKey string) (
 }
 _, abortErr := s.client.AbortMultipartUpload(context.Background(), abortParams)
 if abortErr != nil {
-return 0, fmt.Errorf("aborting CopyObject multipart upload: %v, original error was: %v", abortErr, err)
+return 0, fmt.Errorf("aborting CopyObject multipart upload: %v, original error was: %v", abortErr, wgWaitErr)
 }
-return 0, fmt.Errorf("one of CopyObject go-routine return error: %v", err)
+return 0, fmt.Errorf("one of CopyObject go-routine return error: %v", wgWaitErr)
 }
 // Parts must be ordered by part number.
 sort.Slice(parts, func(i int, j int) bool {
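The s3.go change replaces the hand-rolled `semaphore.Weighted` with `errgroup`'s built-in limiter: `(*errgroup.Group).SetLimit(n)` makes `Go` block until fewer than n goroutines are running, which is exactly what the `Acquire`/`Release` pair did manually, with less code and no separate error path for a failed `Acquire`. A minimal sketch of the resulting pattern (simplified stand-in values, not the repo's code):

```go
package main

import (
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

func main() {
	const concurrency = 4 // stand-in for s.Config.Concurrency
	const numParts = 16   // stand-in for the computed numParts

	copyGroup := new(errgroup.Group)
	// SetLimit replaces the manual semaphore: Go blocks until
	// fewer than `concurrency` goroutines are in flight.
	copyGroup.SetLimit(concurrency)

	var mu sync.Mutex
	var completed []int64

	for partNumber := int64(1); partNumber <= numParts; partNumber++ {
		partNumber := partNumber // capture loop variable (pre-Go 1.22 semantics)
		copyGroup.Go(func() error {
			// ... UploadPartCopy would happen here; returning a
			// non-nil error makes Wait report the first failure.
			mu.Lock()
			completed = append(completed, partNumber)
			mu.Unlock()
			return nil
		})
	}
	if wgWaitErr := copyGroup.Wait(); wgWaitErr != nil {
		fmt.Println("copy failed:", wgWaitErr)
		return
	}
	fmt.Printf("copied %d parts\n", len(completed))
}
```

One subtle behavioral difference: the old loop broke out early once `Acquire` failed on a cancelled context, whereas with `SetLimit` cancellation is carried by the errgroup's derived context, so the first failing part surfaces through `Wait` and triggers the `AbortMultipartUpload` path above.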
