diff --git a/pkg/backup/create.go b/pkg/backup/create.go
index 1fbbe89a..786517f5 100644
--- a/pkg/backup/create.go
+++ b/pkg/backup/create.go
@@ -12,7 +12,7 @@ import (
 	"path"
 	"path/filepath"
 	"strings"
-	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/Altinity/clickhouse-backup/pkg/clickhouse"
@@ -208,7 +208,7 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, par
 			}
 			// more precise data size calculation
 			for _, size := range realSize {
-				backupDataSize += uint64(size)
+				atomic.AddUint64(&backupDataSize, uint64(size))
 			}
 		}
 		// https://github.com/Altinity/clickhouse-backup/issues/529
@@ -643,7 +643,6 @@ func (b *Backuper) AddTableToBackup(ctx context.Context, backupName, shadowBacku
 func (b *Backuper) uploadObjectDiskParts(ctx context.Context, backupName, backupShadowPath string, disk clickhouse.Disk) (int64, error) {
 	var size int64
 	var err error
-	var sizeMutex sync.Mutex
 	if err = object_disk.InitCredentialsAndConnections(ctx, b.ch, b.cfg, disk.Name); err != nil {
 		return 0, err
 	}
@@ -688,13 +687,11 @@ func (b *Backuper) uploadObjectDiskParts(ctx context.Context, backupName, backup
 				}
 				realSize += objSize
 			}
-			sizeMutex.Lock()
 			if realSize > objPartFileMeta.TotalSize {
-				size += realSize
+				atomic.AddInt64(&size, realSize)
 			} else {
-				size += objPartFileMeta.TotalSize
+				atomic.AddInt64(&size, objPartFileMeta.TotalSize)
 			}
-			sizeMutex.Unlock()
 			return nil
 		})
 	return nil
diff --git a/pkg/backup/restore.go b/pkg/backup/restore.go
index b2156c69..80a35aa1 100644
--- a/pkg/backup/restore.go
+++ b/pkg/backup/restore.go
@@ -17,7 +17,7 @@ import (
 	"path/filepath"
 	"regexp"
 	"strings"
-	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/Altinity/clickhouse-backup/pkg/common"
@@ -850,7 +850,6 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
 	start := time.Now()
 	downloadObjectDiskPartsWorkingGroup, ctx := errgroup.WithContext(ctx)
 	downloadObjectDiskPartsWorkingGroup.SetLimit(int(b.cfg.General.DownloadConcurrency))
-	sizeMutex := sync.Mutex{}
 	for _, part := range parts {
 		partPath := path.Join(diskMap[diskName], "backup", backupName, "shadow", dbAndTableDir, diskName, part.Name)
 		walkErr := filepath.Walk(partPath, func(fPath string, fInfo fs.FileInfo, err error) error {
@@ -891,9 +890,7 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
 				if copiedSize, copyObjectErr := object_disk.CopyObject(ctx, b.ch, b.cfg, diskName, storageObject.ObjectSize, srcBucket, srcKey, storageObject.ObjectRelativePath); copyObjectErr != nil {
 					return fmt.Errorf("object_disk.CopyObject error: %v", err)
 				} else {
-					sizeMutex.Lock()
-					size += copiedSize
-					sizeMutex.Unlock()
+					atomic.AddInt64(&size, copiedSize)
 				}
 			}
 			return nil
diff --git a/pkg/storage/object_disk/object_disk.go b/pkg/storage/object_disk/object_disk.go
index c36b7bd5..21d94910 100644
--- a/pkg/storage/object_disk/object_disk.go
+++ b/pkg/storage/object_disk/object_disk.go
@@ -239,7 +239,6 @@ func InitCredentialsAndConnections(ctx context.Context, ch *clickhouse.ClickHous
 	var err error
 	var mu sync.Mutex
 	mu.Lock()
-	defer mu.Unlock()
 	if _, exists := DisksCredentials[diskName]; !exists {
 		DisksCredentials, err = getObjectDisksCredentials(ctx, ch)
 		if err != nil {
@@ -256,6 +255,7 @@ func InitCredentialsAndConnections(ctx context.Context, ch *clickhouse.ClickHous
 		}
 		DisksConnections[diskName] = *connection
 	}
+	mu.Unlock()
 	return nil
 }
 
diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go
index 4f476a17..70324481 100644
--- a/pkg/storage/s3.go
+++ b/pkg/storage/s3.go
@@ -525,11 +525,11 @@ func (s *S3) CopyObject(ctx context.Context, srcSize int64, srcBucket, srcKey, d
 			return err
 		}
 		mu.Lock()
-		defer mu.Unlock()
 		parts = append(parts, s3types.CompletedPart{
 			ETag:       partResp.CopyPartResult.ETag,
 			PartNumber: aws.Int32(currentPartNumber),
 		})
+		mu.Unlock()
 		return nil
 	})
 }
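
The recurring change above swaps mutex-guarded `size += ...` accumulation inside errgroup workers for `sync/atomic` adds. Below is a minimal, self-contained sketch of that pattern, assuming `golang.org/x/sync/errgroup` (which the code above already uses); the `partSizes` slice and variable names are illustrative, not from the repo.

package main

import (
	"context"
	"fmt"
	"sync/atomic"

	"golang.org/x/sync/errgroup"
)

func main() {
	partSizes := []int64{100, 250, 75} // hypothetical part sizes
	var total int64                    // shared counter, updated atomically

	g, _ := errgroup.WithContext(context.Background())
	g.SetLimit(2) // bounded concurrency, like DownloadConcurrency above

	for _, s := range partSizes {
		s := s // capture loop variable for the closure (pre-Go 1.22)
		g.Go(func() error {
			// atomic.AddInt64 makes the read-modify-write a single
			// indivisible operation, so no mutex is needed around it.
			atomic.AddInt64(&total, s)
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		panic(err)
	}
	// Read with atomic.LoadInt64 for consistency with the writers.
	fmt.Println(atomic.LoadInt64(&total)) // 425
}

For a lone counter this is the natural fit: an atomic add avoids lock contention entirely, whereas a mutex only pays off once the critical section protects more than one piece of shared state, as in the `parts = append(...)` case in s3.go, which keeps its mutex.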