From 3e60dffc7e495c3cde766846b896048389c099b6 Mon Sep 17 00:00:00 2001
From: Slach
Date: Fri, 15 Dec 2023 16:01:29 +0400
Subject: [PATCH] refactor `semaphore.NewWeighted()` to `errgroup.SetLimit()`,
 parallelize `CopyObject` calls in the `create` and `restore` commands

---
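Reviewer note, kept below the "---" fold so that `git am` ignores it along
with the diffstat: every file in this patch applies the same mechanical
change. A minimal sketch of the before/after pattern, assuming a
golang.org/x/sync version that already has `errgroup.(*Group).SetLimit`
(added upstream in 2022); `items`, `process`, and `concurrency` are
placeholder names, not identifiers from this codebase:

	// Illustrative only: not code from this repository.
	package sketch

	import (
		"context"

		"golang.org/x/sync/errgroup"
		"golang.org/x/sync/semaphore"
	)

	// runBefore shows the old shape: a weighted semaphore throttles an
	// otherwise unbounded errgroup, and every worker must pair Acquire
	// with a deferred Release.
	func runBefore(ctx context.Context, items []string, concurrency int64, process func(context.Context, string) error) error {
		s := semaphore.NewWeighted(concurrency)
		g, ctx := errgroup.WithContext(ctx)
		for _, item := range items {
			if err := s.Acquire(ctx, 1); err != nil {
				break // Acquire only fails once ctx is canceled
			}
			item := item
			g.Go(func() error {
				defer s.Release(1)
				return process(ctx, item)
			})
		}
		return g.Wait()
	}

	// runAfter shows the new shape: the errgroup enforces the limit
	// itself, and Go blocks until a slot frees up.
	func runAfter(ctx context.Context, items []string, concurrency int, process func(context.Context, string) error) error {
		g, ctx := errgroup.WithContext(ctx)
		g.SetLimit(concurrency)
		for _, item := range items {
			item := item
			g.Go(func() error { return process(ctx, item) })
		}
		return g.Wait()
	}

The payoff is the removal of the Acquire/Release bookkeeping and of the
labeled breaks that only existed to bail out when Acquire failed on a
canceled context.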
 ChangeLog.md           |  5 ++++
 pkg/backup/create.go   | 59 +++++++++++++++++++++++--------------
 pkg/backup/download.go | 50 +++++++-------------------------
 pkg/backup/restore.go  | 66 ++++++++++++++++++++----------------------
 pkg/backup/upload.go   | 31 +++++++-------------
 5 files changed, 96 insertions(+), 115 deletions(-)

diff --git a/ChangeLog.md b/ChangeLog.md
index 1164d077..004067c9 100644
--- a/ChangeLog.md
+++ b/ChangeLog.md
@@ -1,3 +1,8 @@
+# v2.4.14
+IMPROVEMENTS
+- refactor `semaphore.NewWeighted()` to `errgroup.SetLimit()`
+- parallelize `CopyObject` calls in the `create` and `restore` commands
+
 # v2.4.13
 BUG FIXES
 - fix object_disk.CopyObject during restore to allow use properly S3 endpoint
diff --git a/pkg/backup/create.go b/pkg/backup/create.go
index 7fab0969..7ef2570d 100644
--- a/pkg/backup/create.go
+++ b/pkg/backup/create.go
@@ -5,10 +5,12 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"golang.org/x/sync/errgroup"
 	"os"
 	"path"
 	"path/filepath"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/Altinity/clickhouse-backup/pkg/clickhouse"
@@ -624,11 +626,20 @@ func (b *Backuper) AddTableToBackup(ctx context.Context, backupName, shadowBacku
 func (b *Backuper) uploadObjectDiskParts(ctx context.Context, backupName, backupShadowPath string, disk clickhouse.Disk) (int64, error) {
 	var size int64
 	var err error
+	var sizeMutex sync.Mutex
 	if err = object_disk.InitCredentialsAndConnections(ctx, b.ch, b.cfg, disk.Name); err != nil {
 		return 0, err
 	}
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	uploadObjectDiskPartsWorkingGroup, ctx := errgroup.WithContext(ctx)
+	uploadObjectDiskPartsWorkingGroup.SetLimit(int(b.cfg.General.UploadConcurrency))
+	srcDiskConnection, exists := object_disk.DisksConnections[disk.Name]
+	if !exists {
+		return 0, fmt.Errorf("uploadObjectDiskParts: %s not present in object_disk.DisksConnections", disk.Name)
+	}
 
-	if err := filepath.Walk(backupShadowPath, func(fPath string, fInfo os.FileInfo, err error) error {
+	walkErr := filepath.Walk(backupShadowPath, func(fPath string, fInfo os.FileInfo, err error) error {
 		if err != nil {
 			return err
 		}
@@ -640,31 +651,37 @@ func (b *Backuper) uploadObjectDiskParts(ctx context.Context, backupName, backup
 			return err
 		}
 		var realSize, objSize int64
-		// @TODO think about parallelization here after test pass
-		for _, storageObject := range objPartFileMeta.StorageObjects {
-			srcDiskConnection, exists := object_disk.DisksConnections[disk.Name]
-			if !exists {
-				return fmt.Errorf("uploadObjectDiskParts: %s not present in object_disk.DisksConnections", disk.Name)
+
+		uploadObjectDiskPartsWorkingGroup.Go(func() error {
+			for _, storageObject := range objPartFileMeta.StorageObjects {
+				if objSize, err = b.dst.CopyObject(
+					ctx,
+					srcDiskConnection.GetRemoteBucket(),
+					path.Join(srcDiskConnection.GetRemotePath(), storageObject.ObjectRelativePath),
+					path.Join(backupName, disk.Name, storageObject.ObjectRelativePath),
+				); err != nil {
+					return err
+				}
+				realSize += objSize
 			}
-			if objSize, err = b.dst.CopyObject(
-				ctx,
-				srcDiskConnection.GetRemoteBucket(),
-				path.Join(srcDiskConnection.GetRemotePath(), storageObject.ObjectRelativePath),
-				path.Join(backupName, disk.Name, storageObject.ObjectRelativePath),
-			); err != nil {
-				return err
+			sizeMutex.Lock()
+			if realSize > objPartFileMeta.TotalSize {
+				size += realSize
+			} else {
+				size += objPartFileMeta.TotalSize
 			}
-			realSize += objSize
-		}
-		if realSize > objPartFileMeta.TotalSize {
-			size += realSize
-		} else {
-			size += objPartFileMeta.TotalSize
-		}
+			sizeMutex.Unlock()
+			return nil
+		})
 		return nil
-	}); err != nil {
-		return 0, err
+	})
+	if walkErr != nil {
+		return 0, walkErr
 	}
+
+	if wgWaitErr := uploadObjectDiskPartsWorkingGroup.Wait(); wgWaitErr != nil {
+		return 0, fmt.Errorf("one of uploadObjectDiskParts go-routine return error: %v", wgWaitErr)
+	}
 	return size, nil
 }
diff --git a/pkg/backup/download.go b/pkg/backup/download.go
index ca6ff65c..ab14eb10 100644
--- a/pkg/backup/download.go
+++ b/pkg/backup/download.go
@@ -25,7 +25,6 @@ import (
 	"time"
 
 	"golang.org/x/sync/errgroup"
-	"golang.org/x/sync/semaphore"
 
 	"github.com/Altinity/clickhouse-backup/pkg/common"
 	"github.com/Altinity/clickhouse-backup/pkg/metadata"
@@ -72,7 +71,6 @@ func (b *Backuper) Download(backupName string, tablePattern string, partitions [
 	if err != nil {
 		return err
 	}
-	ctx, cancel = context.WithCancel(ctx)
 	defer cancel()
 	backupName = utils.CleanBackupNameRE.ReplaceAllString(backupName, "")
 	if err := b.ch.Connect(); err != nil {
@@ -189,18 +187,13 @@ func (b *Backuper) Download(backupName string, tablePattern string, partitions [
 	log.Debugf("prepare table METADATA concurrent semaphore with concurrency=%d len(tablesForDownload)=%d", b.cfg.General.DownloadConcurrency, len(tablesForDownload))
 	tableMetadataAfterDownload := make([]metadata.TableMetadata, len(tablesForDownload))
-	downloadSemaphore := semaphore.NewWeighted(int64(b.cfg.General.DownloadConcurrency))
 	metadataGroup, metadataCtx := errgroup.WithContext(ctx)
+	metadataGroup.SetLimit(int(b.cfg.General.DownloadConcurrency))
 	for i, t := range tablesForDownload {
-		if err := downloadSemaphore.Acquire(metadataCtx, 1); err != nil {
-			log.Errorf("can't acquire semaphore during Download metadata: %v", err)
-			break
-		}
 		metadataLogger := log.WithField("table_metadata", fmt.Sprintf("%s.%s", t.Database, t.Table))
 		idx := i
 		tableTitle := t
 		metadataGroup.Go(func() error {
-			defer downloadSemaphore.Release(1)
 			downloadedMetadata, size, err := b.downloadTableMetadata(metadataCtx, backupName, disks, metadataLogger, tableTitle, schemaOnly, partitions)
 			if err != nil {
 				return err
@@ -232,14 +225,9 @@ func (b *Backuper) Download(backupName string, tablePattern string, partitions [
 		if tableMetadata.MetadataOnly {
 			continue
 		}
-		if err := downloadSemaphore.Acquire(dataCtx, 1); err != nil {
-			log.Errorf("can't acquire semaphore during Download table data: %v", err)
-			break
-		}
 		dataSize += tableMetadata.TotalBytes
 		idx := i
 		dataGroup.Go(func() error {
-			defer downloadSemaphore.Release(1)
 			start := time.Now()
 			if err := b.downloadTableData(dataCtx, remoteBackup.BackupMetadata, tableMetadataAfterDownload[idx]); err != nil {
 				return err
@@ -489,9 +477,10 @@ func (b *Backuper) downloadBackupRelatedDir(ctx context.Context, remoteBackup st
 func (b *Backuper) downloadTableData(ctx context.Context, remoteBackup metadata.BackupMetadata, table metadata.TableMetadata) error {
 	log := b.log.WithField("logger", "downloadTableData")
 	dbAndTableDir := path.Join(common.TablePathEncode(table.Database), common.TablePathEncode(table.Table))
-
-	s := semaphore.NewWeighted(int64(b.cfg.General.DownloadConcurrency))
-	g, dataCtx := errgroup.WithContext(ctx)
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	dataGroup, dataCtx := errgroup.WithContext(ctx)
+	dataGroup.SetLimit(int(b.cfg.General.DownloadConcurrency))
 	if remoteBackup.DataFormat != DirectoryFormat {
 		capacity := 0
@@ -501,22 +490,16 @@ func (b *Backuper) downloadTableData(ctx context.Context, remoteBackup metadata.
 			downloadOffset[disk] = 0
 		}
 		log.Debugf("start %s.%s with concurrency=%d len(table.Files[...])=%d", table.Database, table.Table, b.cfg.General.DownloadConcurrency, capacity)
-	breakByErrorArchive:
 		for common.SumMapValuesInt(downloadOffset) < capacity {
 			for disk := range table.Files {
 				if downloadOffset[disk] >= len(table.Files[disk]) {
 					continue
 				}
 				archiveFile := table.Files[disk][downloadOffset[disk]]
-				if err := s.Acquire(dataCtx, 1); err != nil {
-					log.Errorf("can't acquire semaphore %s archive: %v", archiveFile, err)
-					break breakByErrorArchive
-				}
 				tableLocalDir := b.getLocalBackupDataPathForTable(remoteBackup.BackupName, disk, dbAndTableDir)
 				downloadOffset[disk] += 1
 				tableRemoteFile := path.Join(remoteBackup.BackupName, "shadow", common.TablePathEncode(table.Database), common.TablePathEncode(table.Table), archiveFile)
-				g.Go(func() error {
-					defer s.Release(1)
+				dataGroup.Go(func() error {
 					log.Debugf("start download %s", tableRemoteFile)
 					if b.resume && b.resumableState.IsAlreadyProcessedBool(tableRemoteFile) {
 						return nil
@@ -543,7 +526,6 @@ func (b *Backuper) downloadTableData(ctx context.Context, remoteBackup metadata.
 		}
 		log.Debugf("start %s.%s with concurrency=%d len(table.Parts[...])=%d", table.Database, table.Table, b.cfg.General.DownloadConcurrency, capacity)
-	breakByErrorDirectory:
 		for disk, parts := range table.Parts {
 			tableRemotePath := path.Join(remoteBackup.BackupName, "shadow", dbAndTableDir, disk)
 			diskPath := b.DiskToPathMap[disk]
@@ -556,13 +538,8 @@ func (b *Backuper) downloadTableData(ctx context.Context, remoteBackup metadata.
 					continue
 				}
 				partRemotePath := path.Join(tableRemotePath, part.Name)
-				if err := s.Acquire(dataCtx, 1); err != nil {
-					log.Errorf("can't acquire semaphore %s directory: %v", partRemotePath, err)
-					break breakByErrorDirectory
-				}
 				partLocalPath := path.Join(tableLocalPath, part.Name)
-				g.Go(func() error {
-					defer s.Release(1)
+				dataGroup.Go(func() error {
 					log.Debugf("start %s -> %s", partRemotePath, partLocalPath)
 					if b.resume && b.resumableState.IsAlreadyProcessedBool(partRemotePath) {
 						return nil
@@ -579,7 +556,7 @@ func (b *Backuper) downloadTableData(ctx context.Context, remoteBackup metadata.
 			}
 		}
 	}
-	if err := g.Wait(); err != nil {
+	if err := dataGroup.Wait(); err != nil {
 		return fmt.Errorf("one of downloadTableData go-routine return error: %v", err)
 	}
@@ -597,14 +574,14 @@ func (b *Backuper) downloadDiffParts(ctx context.Context, remoteBackup metadata.
log := b.log.WithField("operation", "downloadDiffParts") log.WithField("table", fmt.Sprintf("%s.%s", table.Database, table.Table)).Debug("start") start := time.Now() + ctx, cancel := context.WithCancel(ctx) + defer cancel() downloadedDiffParts := uint32(0) - s := semaphore.NewWeighted(int64(b.cfg.General.DownloadConcurrency)) downloadDiffGroup, downloadDiffCtx := errgroup.WithContext(ctx) - + downloadDiffGroup.SetLimit(int(b.cfg.General.DownloadConcurrency)) diffRemoteFilesCache := map[string]*sync.Mutex{} diffRemoteFilesLock := &sync.Mutex{} -breakByError: for disk, parts := range table.Parts { for _, part := range parts { newPath := path.Join(b.DiskToPathMap[disk], "backup", remoteBackup.BackupName, "shadow", dbAndTableDir, disk, part.Name) @@ -620,14 +597,9 @@ breakByError: return fmt.Errorf("%s stat return error: %v", existsPath, err) } if err != nil && os.IsNotExist(err) { - if err := s.Acquire(downloadDiffCtx, 1); err != nil { - log.Errorf("can't acquire semaphore during downloadDiffParts: %v", err) - break breakByError - } partForDownload := part diskForDownload := disk downloadDiffGroup.Go(func() error { - defer s.Release(1) tableRemoteFiles, err := b.findDiffBackupFilesRemote(downloadDiffCtx, remoteBackup, table, diskForDownload, partForDownload, log) if err != nil { return err diff --git a/pkg/backup/restore.go b/pkg/backup/restore.go index a104c0b0..b1967037 100644 --- a/pkg/backup/restore.go +++ b/pkg/backup/restore.go @@ -7,8 +7,8 @@ import ( "github.com/Altinity/clickhouse-backup/pkg/config" "github.com/Altinity/clickhouse-backup/pkg/keeper" "github.com/Altinity/clickhouse-backup/pkg/status" - "github.com/Altinity/clickhouse-backup/pkg/storage" "github.com/Altinity/clickhouse-backup/pkg/storage/object_disk" + "golang.org/x/sync/errgroup" "io/fs" "net/url" "os" @@ -814,6 +814,8 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin log := apexLog.WithFields(apexLog.Fields{"operation": "downloadObjectDiskParts"}) start := time.Now() dbAndTableDir := path.Join(common.TablePathEncode(backupTable.Database), common.TablePathEncode(backupTable.Table)) + ctx, cancel := context.WithCancel(ctx) + defer cancel() var err error needToDownloadObjectDisk := false for diskName := range backupTable.Parts { @@ -829,19 +831,6 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin if !needToDownloadObjectDisk { return nil } - b.dst, err = storage.NewBackupDestination(ctx, b.cfg, b.ch, false, backupName) - if err != nil { - return err - } - if err = b.dst.Connect(ctx); err != nil { - return fmt.Errorf("can't connect to %s: %v", b.dst.Kind(), err) - } - defer func() { - if err := b.dst.Close(ctx); err != nil { - b.log.Warnf("downloadObjectDiskParts: can't close BackupDestination error: %v", err) - } - }() - for diskName, parts := range backupTable.Parts { diskType, exists := diskTypes[diskName] if !exists { @@ -854,9 +843,11 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin if err = object_disk.InitCredentialsAndConnections(ctx, b.ch, b.cfg, diskName); err != nil { return err } + downloadObjectDiskPartsWorkingGroup, ctx := errgroup.WithContext(ctx) + downloadObjectDiskPartsWorkingGroup.SetLimit(int(b.cfg.General.DownloadConcurrency)) for _, part := range parts { partPath := path.Join(diskMap[diskName], "backup", backupName, "shadow", dbAndTableDir, diskName, part.Name) - if err := filepath.Walk(partPath, func(fPath string, fInfo fs.FileInfo, err error) error { + walkErr := filepath.Walk(partPath, func(fPath 
 					if err != nil {
 						return err
 					}
@@ -870,29 +861,36 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
 					if objMeta.StorageObjectCount < 1 && objMeta.Version != object_disk.VersionRelativePath {
 						return fmt.Errorf("%s: invalid object_disk.Metadata: %#v", fPath, objMeta)
 					}
-					var srcBucket, srcKey string
-					for _, storageObject := range objMeta.StorageObjects {
-						if b.cfg.General.RemoteStorage == "s3" && diskType == "s3" {
-							srcBucket = b.cfg.S3.Bucket
-							srcKey = path.Join(b.cfg.S3.ObjectDiskPath, backupName, diskName, storageObject.ObjectRelativePath)
-						} else if b.cfg.General.RemoteStorage == "gcs" && diskType == "s3" {
-							srcBucket = b.cfg.GCS.Bucket
-							srcKey = path.Join(b.cfg.GCS.ObjectDiskPath, backupName, diskName, storageObject.ObjectRelativePath)
-						} else if b.cfg.General.RemoteStorage == "azblob" && diskType == "azure_blob_storage" {
-							srcBucket = b.cfg.AzureBlob.Container
-							srcKey = path.Join(b.cfg.AzureBlob.ObjectDiskPath, backupName, diskName, storageObject.ObjectRelativePath)
-						} else {
-							return fmt.Errorf("incompatible object_disk[%s].Type=%s amd remote_storage: %s", diskName, diskType, b.cfg.General.RemoteStorage)
-						}
-						if err = object_disk.CopyObject(ctx, b.ch, b.cfg, diskName, srcBucket, srcKey, storageObject.ObjectRelativePath); err != nil {
-							return fmt.Errorf("object_disk.CopyObject error: %v", err)
+					downloadObjectDiskPartsWorkingGroup.Go(func() error {
+						var srcBucket, srcKey string
+						for _, storageObject := range objMeta.StorageObjects {
+							if b.cfg.General.RemoteStorage == "s3" && diskType == "s3" {
+								srcBucket = b.cfg.S3.Bucket
+								srcKey = path.Join(b.cfg.S3.ObjectDiskPath, backupName, diskName, storageObject.ObjectRelativePath)
+							} else if b.cfg.General.RemoteStorage == "gcs" && diskType == "s3" {
+								srcBucket = b.cfg.GCS.Bucket
+								srcKey = path.Join(b.cfg.GCS.ObjectDiskPath, backupName, diskName, storageObject.ObjectRelativePath)
+							} else if b.cfg.General.RemoteStorage == "azblob" && diskType == "azure_blob_storage" {
+								srcBucket = b.cfg.AzureBlob.Container
+								srcKey = path.Join(b.cfg.AzureBlob.ObjectDiskPath, backupName, diskName, storageObject.ObjectRelativePath)
+							} else {
+								return fmt.Errorf("incompatible object_disk[%s].Type=%s and remote_storage: %s", diskName, diskType, b.cfg.General.RemoteStorage)
+							}
+							if err = object_disk.CopyObject(ctx, b.ch, b.cfg, diskName, srcBucket, srcKey, storageObject.ObjectRelativePath); err != nil {
+								return fmt.Errorf("object_disk.CopyObject error: %v", err)
+							}
 						}
-					}
+						return nil
+					})
 					return nil
-				}); err != nil {
-					return err
+				})
+				if walkErr != nil {
+					return walkErr
 				}
 			}
+			if wgWaitErr := downloadObjectDiskPartsWorkingGroup.Wait(); wgWaitErr != nil {
+				return fmt.Errorf("one of downloadObjectDiskParts go-routine return error: %v", wgWaitErr)
+			}
 		}
 	}
 	log.WithField("duration", utils.HumanizeDuration(time.Since(start))).Debugf("done")
diff --git a/pkg/backup/upload.go b/pkg/backup/upload.go
index 0a84ba87..3c6fa778 100644
--- a/pkg/backup/upload.go
+++ b/pkg/backup/upload.go
@@ -23,7 +23,6 @@ import (
 	"github.com/eapache/go-resiliency/retrier"
 
 	"golang.org/x/sync/errgroup"
-	"golang.org/x/sync/semaphore"
 
 	"github.com/Altinity/clickhouse-backup/pkg/common"
 	"github.com/Altinity/clickhouse-backup/pkg/filesystemhelper"
@@ -131,14 +130,10 @@ func (b *Backuper) Upload(backupName, diffFrom, diffFromRemote, tablePattern str
 	metadataSize := int64(0)
 	log.Debugf("prepare table concurrent semaphore with concurrency=%d len(tablesForUpload)=%d", b.cfg.General.UploadConcurrency, len(tablesForUpload))
-	uploadSemaphore := semaphore.NewWeighted(int64(b.cfg.General.UploadConcurrency))
 	uploadGroup, uploadCtx := errgroup.WithContext(ctx)
+	uploadGroup.SetLimit(int(b.cfg.General.UploadConcurrency))
 	for i, table := range tablesForUpload {
-		if err := uploadSemaphore.Acquire(uploadCtx, 1); err != nil {
-			log.Errorf("can't acquire semaphore during Upload table: %v", err)
-			break
-		}
 		start := time.Now()
 		if !schemaOnly {
 			if diffTable, diffExists := tablesForUploadFromDiff[metadata.TableTitle{
@@ -151,7 +146,6 @@ func (b *Backuper) Upload(backupName, diffFrom, diffFromRemote, tablePattern str
 		}
 		idx := i
 		uploadGroup.Go(func() error {
-			defer uploadSemaphore.Release(1)
 			var uploadedBytes int64
 			if !schemaOnly {
 				var files map[string][]string
@@ -505,12 +499,14 @@ func (b *Backuper) uploadTableData(ctx context.Context, backupName string, table
 	}
 	log := b.log.WithField("logger", "uploadTableData")
 	log.Debugf("start %s.%s with concurrency=%d len(table.Parts[...])=%d", table.Database, table.Table, b.cfg.General.UploadConcurrency, capacity)
-	s := semaphore.NewWeighted(int64(b.cfg.General.UploadConcurrency))
-	g, ctx := errgroup.WithContext(ctx)
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	dataGroup, ctx := errgroup.WithContext(ctx)
+	dataGroup.SetLimit(int(b.cfg.General.UploadConcurrency))
 	var uploadedBytes int64
-	splitParts := make(map[string][]metadata.SplitPartFiles, 0)
-	splitPartsOffset := make(map[string]int, 0)
+	splitParts := make(map[string][]metadata.SplitPartFiles)
+	splitPartsOffset := make(map[string]int)
 	splitPartsCapacity := 0
 	for disk := range table.Parts {
 		backupPath := b.getLocalBackupDataPathForTable(backupName, disk, dbAndTablePath)
@@ -522,16 +518,11 @@ func (b *Backuper) uploadTableData(ctx context.Context, backupName string, table
 		splitPartsOffset[disk] = 0
 		splitPartsCapacity += len(splitPartsList)
 	}
-breakByError:
 	for common.SumMapValuesInt(splitPartsOffset) < splitPartsCapacity {
 		for disk := range table.Parts {
 			if splitPartsOffset[disk] >= len(splitParts[disk]) {
 				continue
 			}
-			if err := s.Acquire(ctx, 1); err != nil {
-				log.Errorf("can't acquire semaphore during Upload data parts: %v", err)
-				break breakByError
-			}
 			backupPath := b.getLocalBackupDataPathForTable(backupName, disk, dbAndTablePath)
 			splitPart := splitParts[disk][splitPartsOffset[disk]]
 			partSuffix := splitPart.Prefix
@@ -541,8 +532,7 @@ breakByError:
 			if b.cfg.GetCompressionFormat() == "none" {
 				remotePath := path.Join(baseRemoteDataPath, disk)
 				remotePathFull := path.Join(remotePath, partSuffix)
-				g.Go(func() error {
-					defer s.Release(1)
+				dataGroup.Go(func() error {
 					if b.resume {
 						if isProcessed, processedSize := b.resumableState.IsAlreadyProcessed(remotePathFull); isProcessed {
 							atomic.AddInt64(&uploadedBytes, processedSize)
@@ -567,8 +557,7 @@ breakByError:
 				uploadedFiles[disk] = append(uploadedFiles[disk], fileName)
 				remoteDataFile := path.Join(baseRemoteDataPath, fileName)
 				localFiles := partFiles
-				g.Go(func() error {
-					defer s.Release(1)
+				dataGroup.Go(func() error {
 					if b.resume {
 						if isProcessed, processedSize := b.resumableState.IsAlreadyProcessed(remoteDataFile); isProcessed {
 							atomic.AddInt64(&uploadedBytes, processedSize)
@@ -604,7 +593,7 @@ breakByError:
 			}
 		}
 	}
-	if err := g.Wait(); err != nil {
+	if err := dataGroup.Wait(); err != nil {
 		return nil, 0, fmt.Errorf("one of uploadTableData go-routine return error: %v", err)
 	}
 	log.Debugf("finish %s.%s with concurrency=%d len(table.Parts[...])=%d uploadedFiles=%v, uploadedBytes=%v", table.Database, table.Table, b.cfg.General.UploadConcurrency, capacity, uploadedFiles, uploadedBytes)
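
Reviewer note (trailing commentary, not part of the diff): in the new
uploadObjectDiskParts and downloadObjectDiskParts code above, filepath.Walk
only schedules work; each metadata file becomes one goroutine in the
size-limited errgroup, so the CopyObject calls overlap, and a mutex guards
the shared size counter. A condensed, self-contained sketch of that shape,
with the same caveats as above (`copyAll`, `copyOne`, `root`, and `limit`
are hypothetical names standing in for the real b.dst.CopyObject plumbing):

	// Illustrative only: not code from this repository.
	package sketch

	import (
		"context"
		"os"
		"path/filepath"
		"sync"

		"golang.org/x/sync/errgroup"
	)

	// copyAll copies every regular file under root with at most limit
	// copies in flight and returns the total number of bytes copied.
	func copyAll(ctx context.Context, root string, limit int, copyOne func(context.Context, string) (int64, error)) (int64, error) {
		g, ctx := errgroup.WithContext(ctx)
		g.SetLimit(limit)
		var (
			total int64
			mu    sync.Mutex
		)
		walkErr := filepath.Walk(root, func(p string, fi os.FileInfo, err error) error {
			if err != nil || fi.IsDir() {
				return err
			}
			g.Go(func() error { // blocks while `limit` goroutines are already running
				n, copyErr := copyOne(ctx, p)
				if copyErr != nil {
					return copyErr
				}
				mu.Lock()
				total += n
				mu.Unlock()
				return nil
			})
			return nil
		})
		if walkErr != nil {
			return 0, walkErr
		}
		if err := g.Wait(); err != nil {
			return 0, err
		}
		return total, nil
	}

Because Go blocks the walk callback once the limit is reached, scheduling
itself is throttled; Wait then collects the first error after Walk returns,
mirroring the walkErr-then-Wait sequence in the hunks above.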