
Commit

refactoring semaphore.NewWeighted() to errgroup.SetLimit(), add parallelization to `create` and `restore` command during call `CopyObject`
Slach committed Dec 15, 2023
1 parent 17dcdc2 commit 3e60dff
Showing 5 changed files with 96 additions and 115 deletions.
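For context on the refactoring itself: `semaphore.NewWeighted()` requires acquiring a slot before spawning each goroutine and releasing it inside, with explicit handling of `Acquire` failures, while `errgroup.SetLimit()` makes `Group.Go` block until a slot is free. A minimal sketch of the before/after pattern (the loop body and concurrency value are placeholders, not code from this repository):

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
	"golang.org/x/sync/semaphore"
)

func main() {
	ctx := context.Background()
	concurrency := 4 // placeholder for b.cfg.General.UploadConcurrency / DownloadConcurrency

	// Old pattern: weighted semaphore acquired before each goroutine and
	// released inside it; Acquire errors need their own handling and breaks.
	sem := semaphore.NewWeighted(int64(concurrency))
	oldGroup, oldCtx := errgroup.WithContext(ctx)
	for i := 0; i < 10; i++ {
		if err := sem.Acquire(oldCtx, 1); err != nil {
			break
		}
		i := i
		oldGroup.Go(func() error {
			defer sem.Release(1)
			fmt.Println("old-style worker", i)
			return nil
		})
	}
	_ = oldGroup.Wait()

	// New pattern: SetLimit bounds the number of running goroutines; Go blocks
	// until a slot frees up, so the semaphore and its error handling disappear.
	newGroup, _ := errgroup.WithContext(ctx)
	newGroup.SetLimit(concurrency)
	for i := 0; i < 10; i++ {
		i := i
		newGroup.Go(func() error {
			fmt.Println("new-style worker", i)
			return nil
		})
	}
	_ = newGroup.Wait()
}
```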
5 changes: 5 additions & 0 deletions ChangeLog.md
@@ -1,3 +1,8 @@
# v2.4.14
IMPROVEMENTS
- refactoring `semaphore.NewWeighted()` to `errgroup.SetLimit()`
- add parallelization to `create` and `restore` command during call `CopyObject`

# v2.4.13
BUG FIXES
- fix object_disk.CopyObject during restore to allow use properly S3 endpoint
59 changes: 38 additions & 21 deletions pkg/backup/create.go
@@ -5,10 +5,12 @@ import (
"encoding/json"
"errors"
"fmt"
"golang.org/x/sync/errgroup"
"os"
"path"
"path/filepath"
"strings"
"sync"
"time"

"github.com/Altinity/clickhouse-backup/pkg/clickhouse"
@@ -624,11 +626,20 @@ func (b *Backuper) AddTableToBackup(ctx context.Context, backupName, shadowBacku
func (b *Backuper) uploadObjectDiskParts(ctx context.Context, backupName, backupShadowPath string, disk clickhouse.Disk) (int64, error) {
var size int64
var err error
var sizeMutex sync.Mutex
if err = object_disk.InitCredentialsAndConnections(ctx, b.ch, b.cfg, disk.Name); err != nil {
return 0, err
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
uploadObjectDiskPartsWorkingGroup, ctx := errgroup.WithContext(ctx)
uploadObjectDiskPartsWorkingGroup.SetLimit(int(b.cfg.General.UploadConcurrency))
srcDiskConnection, exists := object_disk.DisksConnections[disk.Name]
if !exists {
return 0, fmt.Errorf("uploadObjectDiskParts: %s not present in object_disk.DisksConnections", disk.Name)
}

if err := filepath.Walk(backupShadowPath, func(fPath string, fInfo os.FileInfo, err error) error {
walkErr := filepath.Walk(backupShadowPath, func(fPath string, fInfo os.FileInfo, err error) error {
if err != nil {
return err
}
@@ -640,31 +651,37 @@ func (b *Backuper) uploadObjectDiskParts(ctx context.Context, backupName, backup
return err
}
var realSize, objSize int64
// @TODO think about parallelization here after test pass
for _, storageObject := range objPartFileMeta.StorageObjects {
srcDiskConnection, exists := object_disk.DisksConnections[disk.Name]
if !exists {
return fmt.Errorf("uploadObjectDiskParts: %s not present in object_disk.DisksConnections", disk.Name)

uploadObjectDiskPartsWorkingGroup.Go(func() error {
for _, storageObject := range objPartFileMeta.StorageObjects {
if objSize, err = b.dst.CopyObject(
ctx,
srcDiskConnection.GetRemoteBucket(),
path.Join(srcDiskConnection.GetRemotePath(), storageObject.ObjectRelativePath),
path.Join(backupName, disk.Name, storageObject.ObjectRelativePath),
); err != nil {
return err
}
realSize += objSize
}
if objSize, err = b.dst.CopyObject(
ctx,
srcDiskConnection.GetRemoteBucket(),
path.Join(srcDiskConnection.GetRemotePath(), storageObject.ObjectRelativePath),
path.Join(backupName, disk.Name, storageObject.ObjectRelativePath),
); err != nil {
return err
sizeMutex.Lock()
if realSize > objPartFileMeta.TotalSize {
size += realSize
} else {
size += objPartFileMeta.TotalSize
}
realSize += objSize
}
if realSize > objPartFileMeta.TotalSize {
size += realSize
} else {
size += objPartFileMeta.TotalSize
}
sizeMutex.Unlock()
return nil
})
return nil
}); err != nil {
})
if walkErr != nil {
return 0, err
}

if wgWaitErr := uploadObjectDiskPartsWorkingGroup.Wait(); wgWaitErr != nil {
return 0, fmt.Errorf("one of uploadObjectDiskParts go-routine return error: %v", wgWaitErr)
}
return size, nil
}
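Because the `CopyObject` calls now run in parallel, the shared `size` accumulator is guarded by a `sync.Mutex`. A stripped-down sketch of that accumulation pattern, assuming a hypothetical `copyOne` callback standing in for `b.dst.CopyObject`:

```go
package backup

import (
	"context"
	"sync"

	"golang.org/x/sync/errgroup"
)

// copyAll mirrors the pattern above: copy objects with bounded parallelism and
// sum their sizes under a mutex. copyOne is a hypothetical stand-in for
// b.dst.CopyObject; concurrency stands in for UploadConcurrency.
func copyAll(ctx context.Context, keys []string, concurrency int,
	copyOne func(ctx context.Context, key string) (int64, error)) (int64, error) {
	var total int64
	var totalMutex sync.Mutex
	g, gCtx := errgroup.WithContext(ctx)
	g.SetLimit(concurrency)
	for _, key := range keys {
		key := key
		g.Go(func() error {
			n, err := copyOne(gCtx, key)
			if err != nil {
				return err
			}
			totalMutex.Lock() // total is shared across worker goroutines
			total += n
			totalMutex.Unlock()
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return 0, err
	}
	return total, nil
}
```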

50 changes: 11 additions & 39 deletions pkg/backup/download.go
@@ -25,7 +25,6 @@ import (
"time"

"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"

"github.com/Altinity/clickhouse-backup/pkg/common"
"github.com/Altinity/clickhouse-backup/pkg/metadata"
@@ -72,7 +71,6 @@ func (b *Backuper) Download(backupName string, tablePattern string, partitions [
if err != nil {
return err
}
ctx, cancel = context.WithCancel(ctx)
defer cancel()
backupName = utils.CleanBackupNameRE.ReplaceAllString(backupName, "")
if err := b.ch.Connect(); err != nil {
@@ -189,18 +187,13 @@ func (b *Backuper) Download(backupName string, tablePattern string, partitions [

log.Debugf("prepare table METADATA concurrent semaphore with concurrency=%d len(tablesForDownload)=%d", b.cfg.General.DownloadConcurrency, len(tablesForDownload))
tableMetadataAfterDownload := make([]metadata.TableMetadata, len(tablesForDownload))
downloadSemaphore := semaphore.NewWeighted(int64(b.cfg.General.DownloadConcurrency))
metadataGroup, metadataCtx := errgroup.WithContext(ctx)
metadataGroup.SetLimit(int(b.cfg.General.DownloadConcurrency))
for i, t := range tablesForDownload {
if err := downloadSemaphore.Acquire(metadataCtx, 1); err != nil {
log.Errorf("can't acquire semaphore during Download metadata: %v", err)
break
}
metadataLogger := log.WithField("table_metadata", fmt.Sprintf("%s.%s", t.Database, t.Table))
idx := i
tableTitle := t
metadataGroup.Go(func() error {
defer downloadSemaphore.Release(1)
downloadedMetadata, size, err := b.downloadTableMetadata(metadataCtx, backupName, disks, metadataLogger, tableTitle, schemaOnly, partitions)
if err != nil {
return err
@@ -232,14 +225,9 @@ func (b *Backuper) Download(backupName string, tablePattern string, partitions [
if tableMetadata.MetadataOnly {
continue
}
if err := downloadSemaphore.Acquire(dataCtx, 1); err != nil {
log.Errorf("can't acquire semaphore during Download table data: %v", err)
break
}
dataSize += tableMetadata.TotalBytes
idx := i
dataGroup.Go(func() error {
defer downloadSemaphore.Release(1)
start := time.Now()
if err := b.downloadTableData(dataCtx, remoteBackup.BackupMetadata, tableMetadataAfterDownload[idx]); err != nil {
return err
@@ -489,9 +477,10 @@ func (b *Backuper) downloadBackupRelatedDir(ctx context.Context, remoteBackup st
func (b *Backuper) downloadTableData(ctx context.Context, remoteBackup metadata.BackupMetadata, table metadata.TableMetadata) error {
log := b.log.WithField("logger", "downloadTableData")
dbAndTableDir := path.Join(common.TablePathEncode(table.Database), common.TablePathEncode(table.Table))

s := semaphore.NewWeighted(int64(b.cfg.General.DownloadConcurrency))
g, dataCtx := errgroup.WithContext(ctx)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
dataGroup, dataCtx := errgroup.WithContext(ctx)
dataGroup.SetLimit(int(b.cfg.General.DownloadConcurrency))

if remoteBackup.DataFormat != DirectoryFormat {
capacity := 0
@@ -501,22 +490,16 @@ func (b *Backuper) downloadTableData(ctx context.Context, remoteBackup metadata.
downloadOffset[disk] = 0
}
log.Debugf("start %s.%s with concurrency=%d len(table.Files[...])=%d", table.Database, table.Table, b.cfg.General.DownloadConcurrency, capacity)
breakByErrorArchive:
for common.SumMapValuesInt(downloadOffset) < capacity {
for disk := range table.Files {
if downloadOffset[disk] >= len(table.Files[disk]) {
continue
}
archiveFile := table.Files[disk][downloadOffset[disk]]
if err := s.Acquire(dataCtx, 1); err != nil {
log.Errorf("can't acquire semaphore %s archive: %v", archiveFile, err)
break breakByErrorArchive
}
tableLocalDir := b.getLocalBackupDataPathForTable(remoteBackup.BackupName, disk, dbAndTableDir)
downloadOffset[disk] += 1
tableRemoteFile := path.Join(remoteBackup.BackupName, "shadow", common.TablePathEncode(table.Database), common.TablePathEncode(table.Table), archiveFile)
g.Go(func() error {
defer s.Release(1)
dataGroup.Go(func() error {
log.Debugf("start download %s", tableRemoteFile)
if b.resume && b.resumableState.IsAlreadyProcessedBool(tableRemoteFile) {
return nil
@@ -543,7 +526,6 @@ func (b *Backuper) downloadTableData(ctx context.Context, remoteBackup metadata.
}
log.Debugf("start %s.%s with concurrency=%d len(table.Parts[...])=%d", table.Database, table.Table, b.cfg.General.DownloadConcurrency, capacity)

breakByErrorDirectory:
for disk, parts := range table.Parts {
tableRemotePath := path.Join(remoteBackup.BackupName, "shadow", dbAndTableDir, disk)
diskPath := b.DiskToPathMap[disk]
@@ -556,13 +538,8 @@ func (b *Backuper) downloadTableData(ctx context.Context, remoteBackup metadata.
continue
}
partRemotePath := path.Join(tableRemotePath, part.Name)
if err := s.Acquire(dataCtx, 1); err != nil {
log.Errorf("can't acquire semaphore %s directory: %v", partRemotePath, err)
break breakByErrorDirectory
}
partLocalPath := path.Join(tableLocalPath, part.Name)
g.Go(func() error {
defer s.Release(1)
dataGroup.Go(func() error {
log.Debugf("start %s -> %s", partRemotePath, partLocalPath)
if b.resume && b.resumableState.IsAlreadyProcessedBool(partRemotePath) {
return nil
@@ -579,7 +556,7 @@ func (b *Backuper) downloadTableData(ctx context.Context, remoteBackup metadata.
}
}
}
if err := g.Wait(); err != nil {
if err := dataGroup.Wait(); err != nil {
return fmt.Errorf("one of downloadTableData go-routine return error: %v", err)
}

@@ -597,14 +574,14 @@ func (b *Backuper) downloadDiffParts(ctx context.Context, remoteBackup metadata.
log := b.log.WithField("operation", "downloadDiffParts")
log.WithField("table", fmt.Sprintf("%s.%s", table.Database, table.Table)).Debug("start")
start := time.Now()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
downloadedDiffParts := uint32(0)
s := semaphore.NewWeighted(int64(b.cfg.General.DownloadConcurrency))
downloadDiffGroup, downloadDiffCtx := errgroup.WithContext(ctx)

downloadDiffGroup.SetLimit(int(b.cfg.General.DownloadConcurrency))
diffRemoteFilesCache := map[string]*sync.Mutex{}
diffRemoteFilesLock := &sync.Mutex{}

breakByError:
for disk, parts := range table.Parts {
for _, part := range parts {
newPath := path.Join(b.DiskToPathMap[disk], "backup", remoteBackup.BackupName, "shadow", dbAndTableDir, disk, part.Name)
@@ -620,14 +597,9 @@ func (b *Backuper) downloadDiffParts(ctx context.Context, remoteBackup metadata.
return fmt.Errorf("%s stat return error: %v", existsPath, err)
}
if err != nil && os.IsNotExist(err) {
if err := s.Acquire(downloadDiffCtx, 1); err != nil {
log.Errorf("can't acquire semaphore during downloadDiffParts: %v", err)
break breakByError
}
partForDownload := part
diskForDownload := disk
downloadDiffGroup.Go(func() error {
defer s.Release(1)
tableRemoteFiles, err := b.findDiffBackupFilesRemote(downloadDiffCtx, remoteBackup, table, diskForDownload, partForDownload, log)
if err != nil {
return err
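With errgroup, the first worker error is returned from `Wait` and cancels the group's derived context, which is why the per-iteration `Acquire` error handling and the labeled `breakByError*` loops could be dropped in download.go. A minimal sketch of that error-propagation behavior, using a placeholder failing task rather than a real download:

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	g, ctx := errgroup.WithContext(context.Background())
	g.SetLimit(2)

	for i := 0; i < 5; i++ {
		i := i
		g.Go(func() error {
			if i == 2 {
				return errors.New("simulated download failure") // placeholder error
			}
			select {
			case <-ctx.Done():
				// tasks started after the failure may observe the group
				// context's cancellation and exit early
				return ctx.Err()
			default:
				fmt.Println("downloaded part", i)
				return nil
			}
		})
	}
	// Wait returns the first non-nil error; no labeled breaks are needed.
	if err := g.Wait(); err != nil {
		fmt.Println("one of download go-routines returned error:", err)
	}
}
```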
66 changes: 32 additions & 34 deletions pkg/backup/restore.go
@@ -7,8 +7,8 @@ import (
"github.com/Altinity/clickhouse-backup/pkg/config"
"github.com/Altinity/clickhouse-backup/pkg/keeper"
"github.com/Altinity/clickhouse-backup/pkg/status"
"github.com/Altinity/clickhouse-backup/pkg/storage"
"github.com/Altinity/clickhouse-backup/pkg/storage/object_disk"
"golang.org/x/sync/errgroup"
"io/fs"
"net/url"
"os"
@@ -814,6 +814,8 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
log := apexLog.WithFields(apexLog.Fields{"operation": "downloadObjectDiskParts"})
start := time.Now()
dbAndTableDir := path.Join(common.TablePathEncode(backupTable.Database), common.TablePathEncode(backupTable.Table))
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var err error
needToDownloadObjectDisk := false
for diskName := range backupTable.Parts {
@@ -829,19 +831,6 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
if !needToDownloadObjectDisk {
return nil
}
b.dst, err = storage.NewBackupDestination(ctx, b.cfg, b.ch, false, backupName)
if err != nil {
return err
}
if err = b.dst.Connect(ctx); err != nil {
return fmt.Errorf("can't connect to %s: %v", b.dst.Kind(), err)
}
defer func() {
if err := b.dst.Close(ctx); err != nil {
b.log.Warnf("downloadObjectDiskParts: can't close BackupDestination error: %v", err)
}
}()

for diskName, parts := range backupTable.Parts {
diskType, exists := diskTypes[diskName]
if !exists {
@@ -854,9 +843,11 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
if err = object_disk.InitCredentialsAndConnections(ctx, b.ch, b.cfg, diskName); err != nil {
return err
}
downloadObjectDiskPartsWorkingGroup, ctx := errgroup.WithContext(ctx)
downloadObjectDiskPartsWorkingGroup.SetLimit(int(b.cfg.General.DownloadConcurrency))
for _, part := range parts {
partPath := path.Join(diskMap[diskName], "backup", backupName, "shadow", dbAndTableDir, diskName, part.Name)
if err := filepath.Walk(partPath, func(fPath string, fInfo fs.FileInfo, err error) error {
walkErr := filepath.Walk(partPath, func(fPath string, fInfo fs.FileInfo, err error) error {
if err != nil {
return err
}
@@ -870,29 +861,36 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
if objMeta.StorageObjectCount < 1 && objMeta.Version != object_disk.VersionRelativePath {
return fmt.Errorf("%s: invalid object_disk.Metadata: %#v", fPath, objMeta)
}
var srcBucket, srcKey string
for _, storageObject := range objMeta.StorageObjects {
if b.cfg.General.RemoteStorage == "s3" && diskType == "s3" {
srcBucket = b.cfg.S3.Bucket
srcKey = path.Join(b.cfg.S3.ObjectDiskPath, backupName, diskName, storageObject.ObjectRelativePath)
} else if b.cfg.General.RemoteStorage == "gcs" && diskType == "s3" {
srcBucket = b.cfg.GCS.Bucket
srcKey = path.Join(b.cfg.GCS.ObjectDiskPath, backupName, diskName, storageObject.ObjectRelativePath)
} else if b.cfg.General.RemoteStorage == "azblob" && diskType == "azure_blob_storage" {
srcBucket = b.cfg.AzureBlob.Container
srcKey = path.Join(b.cfg.AzureBlob.ObjectDiskPath, backupName, diskName, storageObject.ObjectRelativePath)
} else {
return fmt.Errorf("incompatible object_disk[%s].Type=%s amd remote_storage: %s", diskName, diskType, b.cfg.General.RemoteStorage)
}
if err = object_disk.CopyObject(ctx, b.ch, b.cfg, diskName, srcBucket, srcKey, storageObject.ObjectRelativePath); err != nil {
return fmt.Errorf("object_disk.CopyObject error: %v", err)
downloadObjectDiskPartsWorkingGroup.Go(func() error {
var srcBucket, srcKey string
for _, storageObject := range objMeta.StorageObjects {
if b.cfg.General.RemoteStorage == "s3" && diskType == "s3" {
srcBucket = b.cfg.S3.Bucket
srcKey = path.Join(b.cfg.S3.ObjectDiskPath, backupName, diskName, storageObject.ObjectRelativePath)
} else if b.cfg.General.RemoteStorage == "gcs" && diskType == "s3" {
srcBucket = b.cfg.GCS.Bucket
srcKey = path.Join(b.cfg.GCS.ObjectDiskPath, backupName, diskName, storageObject.ObjectRelativePath)
} else if b.cfg.General.RemoteStorage == "azblob" && diskType == "azure_blob_storage" {
srcBucket = b.cfg.AzureBlob.Container
srcKey = path.Join(b.cfg.AzureBlob.ObjectDiskPath, backupName, diskName, storageObject.ObjectRelativePath)
} else {
return fmt.Errorf("incompatible object_disk[%s].Type=%s amd remote_storage: %s", diskName, diskType, b.cfg.General.RemoteStorage)
}
if err = object_disk.CopyObject(ctx, b.ch, b.cfg, diskName, srcBucket, srcKey, storageObject.ObjectRelativePath); err != nil {
return fmt.Errorf("object_disk.CopyObject error: %v", err)
}
}
}
return nil
})
return nil
}); err != nil {
return err
})
if walkErr != nil {
return walkErr
}
}
if wgWaitErr := downloadObjectDiskPartsWorkingGroup.Wait(); wgWaitErr != nil {
return fmt.Errorf("one of downloadObjectDiskParts go-routine return error: %v", wgWaitErr)
}
}
}
log.WithField("duration", utils.HumanizeDuration(time.Since(start))).Debugf("done")
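In downloadObjectDiskParts the goroutines are spawned from inside `filepath.Walk`; because `Go` blocks once the `SetLimit` slots are full, the walk itself is throttled as a side effect. A minimal, self-contained sketch of that combination (the directory and worker body are placeholder assumptions, not repository code):

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"

	"golang.org/x/sync/errgroup"
)

func main() {
	root := os.TempDir() // placeholder directory, not a path from the repository

	g := new(errgroup.Group)
	g.SetLimit(4) // Go blocks when 4 workers are already running, throttling the walk

	walkErr := filepath.Walk(root, func(fPath string, fInfo os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if fInfo.IsDir() {
			return nil
		}
		g.Go(func() error {
			fmt.Println("processing", fPath) // stand-in for object_disk.CopyObject
			return nil
		})
		return nil
	})
	if walkErr != nil {
		fmt.Println("walk error:", walkErr)
	}
	if err := g.Wait(); err != nil {
		fmt.Println("worker error:", err)
	}
}
```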