Skip to content

Commit

Permalink
refactoring to avoid race-conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
Slach committed Jan 23, 2024
1 parent 15b0774 commit ff65f29
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 14 deletions.
11 changes: 4 additions & 7 deletions pkg/backup/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"path"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/Altinity/clickhouse-backup/pkg/clickhouse"
Expand Down Expand Up @@ -208,7 +208,7 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, par
}
// more precise data size calculation
for _, size := range realSize {
backupDataSize += uint64(size)
atomic.AddUint64(&backupDataSize, uint64(size))
}
}
// https://github.com/Altinity/clickhouse-backup/issues/529
Expand Down Expand Up @@ -643,7 +643,6 @@ func (b *Backuper) AddTableToBackup(ctx context.Context, backupName, shadowBacku
func (b *Backuper) uploadObjectDiskParts(ctx context.Context, backupName, backupShadowPath string, disk clickhouse.Disk) (int64, error) {
var size int64
var err error
var sizeMutex sync.Mutex
if err = object_disk.InitCredentialsAndConnections(ctx, b.ch, b.cfg, disk.Name); err != nil {
return 0, err
}
Expand Down Expand Up @@ -688,13 +687,11 @@ func (b *Backuper) uploadObjectDiskParts(ctx context.Context, backupName, backup
}
realSize += objSize
}
sizeMutex.Lock()
if realSize > objPartFileMeta.TotalSize {
size += realSize
atomic.AddInt64(&size, realSize)
} else {
size += objPartFileMeta.TotalSize
atomic.AddInt64(&size, objPartFileMeta.TotalSize)
}
sizeMutex.Unlock()
return nil
})
return nil
Expand Down
7 changes: 2 additions & 5 deletions pkg/backup/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"path/filepath"
"regexp"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/Altinity/clickhouse-backup/pkg/common"
Expand Down Expand Up @@ -850,7 +850,6 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
start := time.Now()
downloadObjectDiskPartsWorkingGroup, ctx := errgroup.WithContext(ctx)
downloadObjectDiskPartsWorkingGroup.SetLimit(int(b.cfg.General.DownloadConcurrency))
sizeMutex := sync.Mutex{}
for _, part := range parts {
partPath := path.Join(diskMap[diskName], "backup", backupName, "shadow", dbAndTableDir, diskName, part.Name)
walkErr := filepath.Walk(partPath, func(fPath string, fInfo fs.FileInfo, err error) error {
Expand Down Expand Up @@ -891,9 +890,7 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
if copiedSize, copyObjectErr := object_disk.CopyObject(ctx, b.ch, b.cfg, diskName, storageObject.ObjectSize, srcBucket, srcKey, storageObject.ObjectRelativePath); copyObjectErr != nil {
return fmt.Errorf("object_disk.CopyObject error: %v", err)
} else {
sizeMutex.Lock()
size += copiedSize
sizeMutex.Unlock()
atomic.AddInt64(&size, copiedSize)
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/object_disk/object_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ func InitCredentialsAndConnections(ctx context.Context, ch *clickhouse.ClickHous
var err error
var mu sync.Mutex
mu.Lock()
defer mu.Unlock()
if _, exists := DisksCredentials[diskName]; !exists {
DisksCredentials, err = getObjectDisksCredentials(ctx, ch)
if err != nil {
Expand All @@ -256,6 +255,7 @@ func InitCredentialsAndConnections(ctx context.Context, ch *clickhouse.ClickHous
}
DisksConnections[diskName] = *connection
}
mu.Unlock()
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,11 +525,11 @@ func (s *S3) CopyObject(ctx context.Context, srcSize int64, srcBucket, srcKey, d
return err
}
mu.Lock()
defer mu.Unlock()
parts = append(parts, s3types.CompletedPart{
ETag: partResp.CopyPartResult.ETag,
PartNumber: aws.Int32(currentPartNumber),
})
mu.Unlock()
return nil
})
}
Expand Down

0 comments on commit ff65f29

Please sign in to comment.