diff --git a/pkg/backup/create.go b/pkg/backup/create.go index 9038795e..41dc3341 100644 --- a/pkg/backup/create.go +++ b/pkg/backup/create.go @@ -593,7 +593,7 @@ func (b *Backuper) AddTableToBackup(ctx context.Context, backupName, shadowBacku return disksToPartsMap, realSize, err } realSize[disk.Name] += size - log.WithField("disk", disk.Name).WithField("duration", utils.HumanizeDuration(time.Since(start))).Info("object_disk data uploaded") + log.WithField("disk", disk.Name).WithField("duration", utils.HumanizeDuration(time.Since(start))).WithField("size", utils.FormatBytes(uint64(size))).Info("object_disk data uploaded") } // Clean all the files under the shadowPath, cause UNFREEZE unavailable if version < 21004000 { diff --git a/pkg/backup/restore.go b/pkg/backup/restore.go index 2f52ebd4..ccedbfd2 100644 --- a/pkg/backup/restore.go +++ b/pkg/backup/restore.go @@ -17,6 +17,7 @@ import ( "path/filepath" "regexp" "strings" + "sync" "time" "github.com/Altinity/clickhouse-backup/pkg/common" @@ -811,8 +812,11 @@ func (b *Backuper) restoreDataRegularByParts(ctx context.Context, backupName str } func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName string, backupTable metadata.TableMetadata, diskMap, diskTypes map[string]string) error { - log := apexLog.WithFields(apexLog.Fields{"operation": "downloadObjectDiskParts"}) - start := time.Now() + log := apexLog.WithFields(apexLog.Fields{ + "operation": "downloadObjectDiskParts", + "table": fmt.Sprintf("%s.%s", backupTable.Database, backupTable.Table), + }) + size := int64(0) dbAndTableDir := path.Join(common.TablePathEncode(backupTable.Database), common.TablePathEncode(backupTable.Table)) ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -843,8 +847,10 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin if err = object_disk.InitCredentialsAndConnections(ctx, b.ch, b.cfg, diskName); err != nil { return err } + start := time.Now() downloadObjectDiskPartsWorkingGroup, ctx := errgroup.WithContext(ctx) downloadObjectDiskPartsWorkingGroup.SetLimit(int(b.cfg.General.DownloadConcurrency)) + sizeMutex := sync.Mutex{} for _, part := range parts { partPath := path.Join(diskMap[diskName], "backup", backupName, "shadow", dbAndTableDir, diskName, part.Name) walkErr := filepath.Walk(partPath, func(fPath string, fInfo fs.FileInfo, err error) error { @@ -879,8 +885,12 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin } else { return fmt.Errorf("incompatible object_disk[%s].Type=%s amd remote_storage: %s", diskName, diskType, b.cfg.General.RemoteStorage) } - if err = object_disk.CopyObject(ctx, b.ch, b.cfg, diskName, srcBucket, srcKey, storageObject.ObjectRelativePath); err != nil { + if copiedSize, copyObjectErr := object_disk.CopyObject(ctx, b.ch, b.cfg, diskName, srcBucket, srcKey, storageObject.ObjectRelativePath); copyObjectErr != nil { return fmt.Errorf("object_disk.CopyObject error: %v", err) + } else { + sizeMutex.Lock() + size += copiedSize + sizeMutex.Unlock() } } return nil @@ -894,9 +904,10 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin if wgWaitErr := downloadObjectDiskPartsWorkingGroup.Wait(); wgWaitErr != nil { return fmt.Errorf("one of downloadObjectDiskParts go-routine return error: %v", wgWaitErr) } + log.WithField("disk", diskName).WithField("duration", utils.HumanizeDuration(time.Since(start))).WithField("size", utils.FormatBytes(uint64(size))).Info("object_disk data downloaded") } } - log.WithField("duration", utils.HumanizeDuration(time.Since(start))).Debugf("done") + return nil } diff --git a/pkg/storage/object_disk/object_disk.go b/pkg/storage/object_disk/object_disk.go index f0b8d178..8b93cbb4 100644 --- a/pkg/storage/object_disk/object_disk.go +++ b/pkg/storage/object_disk/object_disk.go @@ -602,6 +602,7 @@ func DeleteFile(ctx context.Context, diskName, remotePath string) error { return remoteStorage.DeleteFile(ctx, remotePath) } +/* func DeleteFileWithContent(ctx context.Context, ch *clickhouse.ClickHouse, cfg *config.Config, diskName, localPath string) error { if err := InitCredentialsAndConnections(ctx, ch, cfg, diskName); err != nil { return err @@ -629,13 +630,13 @@ func GetFileSize(ctx context.Context, ch *clickhouse.ClickHouse, cfg *config.Con } return fileInfo.Size(), nil } +*/ -func CopyObject(ctx context.Context, ch *clickhouse.ClickHouse, cfg *config.Config, diskName, srcBucket, srcKey, dstPath string) error { +func CopyObject(ctx context.Context, ch *clickhouse.ClickHouse, cfg *config.Config, diskName, srcBucket, srcKey, dstPath string) (int64, error) { if err := InitCredentialsAndConnections(ctx, ch, cfg, diskName); err != nil { - return err + return 0, err } connection := DisksConnections[diskName] remoteStorage := connection.GetRemoteStorage() - _, err := remoteStorage.CopyObject(ctx, srcBucket, srcKey, dstPath) - return err + return remoteStorage.CopyObject(ctx, srcBucket, srcKey, dstPath) }