Skip to content

Commit

Permalink
added "object_disk_size" to upload and download command logs
Browse files Browse the repository at this point in the history
  • Loading branch information
Slach committed Jun 4, 2024
1 parent eb33790 commit 95545c0
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 32 deletions.
2 changes: 2 additions & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# v2.5.12
IMPROVEMENTS
- added "object_disk_size" to upload and download command logs

BUG FIXES
- fixed corner case where `API server` hangs when the `watch` background command fails, fix [929](https://github.com/Altinity/clickhouse-backup/pull/929) thanks @tadus21
Expand Down
30 changes: 19 additions & 11 deletions pkg/backup/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,19 +231,21 @@ func (b *Backuper) Download(backupName string, tablePattern string, partitions [

backupMetadata := remoteBackup.BackupMetadata
backupMetadata.Tables = tablesForDownload
backupMetadata.DataSize = dataSize
backupMetadata.MetadataSize = metadataSize

if b.isEmbedded && b.cfg.ClickHouse.EmbeddedBackupDisk != "" && backupMetadata.Tables != nil && len(backupMetadata.Tables) > 0 {
localClickHouseBackupFile := path.Join(b.EmbeddedBackupDataPath, backupName, ".backup")
remoteClickHouseBackupFile := path.Join(backupName, ".backup")
if err = b.downloadSingleBackupFile(ctx, remoteClickHouseBackupFile, localClickHouseBackupFile, disks); err != nil {
localEmbeddedMetadataSize := int64(0)
if localEmbeddedMetadataSize, err = b.downloadSingleBackupFile(ctx, remoteClickHouseBackupFile, localClickHouseBackupFile, disks); err != nil {
return err
}
metadataSize += uint64(localEmbeddedMetadataSize)
}

backupMetadata.CompressedSize = 0
backupMetadata.DataFormat = ""
backupMetadata.DataSize = dataSize
backupMetadata.MetadataSize = metadataSize
backupMetadata.ConfigSize = configSize
backupMetadata.RBACSize = rbacSize
backupMetadata.ClickhouseBackupVersion = backupVersion
Expand Down Expand Up @@ -285,7 +287,8 @@ func (b *Backuper) Download(backupName string, tablePattern string, partitions [

log.WithFields(apexLog.Fields{
"duration": utils.HumanizeDuration(time.Since(startDownload)),
"size": utils.FormatBytes(dataSize + metadataSize + rbacSize + configSize),
"download_size": utils.FormatBytes(dataSize + metadataSize + rbacSize + configSize),
"object_disk_size": utils.FormatBytes(backupMetadata.ObjectDiskSize),
"version": backupVersion,
}).Info("done")
return nil
Expand Down Expand Up @@ -1076,12 +1079,17 @@ func (b *Backuper) makePartHardlinks(exists, new string) error {
return nil
}

func (b *Backuper) downloadSingleBackupFile(ctx context.Context, remoteFile string, localFile string, disks []clickhouse.Disk) error {
if b.resume && b.resumableState.IsAlreadyProcessedBool(remoteFile) {
return nil
func (b *Backuper) downloadSingleBackupFile(ctx context.Context, remoteFile string, localFile string, disks []clickhouse.Disk) (int64, error) {
var size int64
var isProcessed bool
if b.resume {
if isProcessed, size = b.resumableState.IsAlreadyProcessed(remoteFile); isProcessed {
return size, nil
}
}
log := b.log.WithField("logger", "downloadSingleBackupFile")
retry := retrier.New(retrier.ConstantBackoff(b.cfg.General.RetriesOnFailure, b.cfg.General.RetriesDuration), nil)

err := retry.RunCtx(ctx, func(ctx context.Context) error {
remoteReader, err := b.dst.GetFileReader(ctx, remoteFile)
if err != nil {
Expand All @@ -1105,7 +1113,7 @@ func (b *Backuper) downloadSingleBackupFile(ctx context.Context, remoteFile stri
}
}()

_, err = io.CopyBuffer(localWriter, remoteReader, nil)
size, err = io.CopyBuffer(localWriter, remoteReader, nil)
if err != nil {
return err
}
Expand All @@ -1116,12 +1124,12 @@ func (b *Backuper) downloadSingleBackupFile(ctx context.Context, remoteFile stri
return nil
})
if err != nil {
return err
return 0, err
}
if b.resume {
b.resumableState.AppendToState(remoteFile, 0)
b.resumableState.AppendToState(remoteFile, size)
}
return nil
return size, nil
}

// filterDisksByStoragePolicyAndType - https://github.com/Altinity/clickhouse-backup/issues/561
Expand Down
46 changes: 26 additions & 20 deletions pkg/backup/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,16 @@ func (b *Backuper) Upload(backupName string, deleteSource bool, diffFrom, diffFr
if backupMetadata.ConfigSize, err = b.uploadConfigData(ctx, backupName); err != nil {
return fmt.Errorf("b.uploadConfigData return error: %v", err)
}
//upload embedded .backup file
if b.isEmbedded && b.cfg.ClickHouse.EmbeddedBackupDisk != "" && backupMetadata.Tables != nil && len(backupMetadata.Tables) > 0 {
localClickHouseBackupFile := path.Join(b.EmbeddedBackupDataPath, backupName, ".backup")
remoteClickHouseBackupFile := path.Join(backupName, ".backup")
localEmbeddedMetadataSize := int64(0)
if localEmbeddedMetadataSize, err = b.uploadSingleBackupFile(ctx, localClickHouseBackupFile, remoteClickHouseBackupFile); err != nil {
return fmt.Errorf("b.uploadSingleBackupFile return error: %v", err)
}
metadataSize += localEmbeddedMetadataSize
}

// upload metadata for backup
backupMetadata.CompressedSize = uint64(compressedDataSize)
Expand Down Expand Up @@ -220,20 +230,14 @@ func (b *Backuper) Upload(backupName string, deleteSource bool, diffFrom, diffFr
return fmt.Errorf("can't upload %s: %v", remoteBackupMetaFile, err)
}
}
if b.isEmbedded && b.cfg.ClickHouse.EmbeddedBackupDisk != "" && backupMetadata.Tables != nil && len(backupMetadata.Tables) > 0 {
localClickHouseBackupFile := path.Join(b.EmbeddedBackupDataPath, backupName, ".backup")
remoteClickHouseBackupFile := path.Join(backupName, ".backup")
if err = b.uploadSingleBackupFile(ctx, localClickHouseBackupFile, remoteClickHouseBackupFile); err != nil {
return fmt.Errorf("b.uploadSingleBackupFile return error: %v", err)
}
}
if b.resume {
b.resumableState.Close()
}
log.WithFields(apexLog.Fields{
"duration": utils.HumanizeDuration(time.Since(startUpload)),
"size": utils.FormatBytes(uint64(compressedDataSize) + uint64(metadataSize) + uint64(len(newBackupMetadataBody)) + backupMetadata.RBACSize + backupMetadata.ConfigSize),
"version": backupVersion,
"duration": utils.HumanizeDuration(time.Since(startUpload)),
"upload_size": utils.FormatBytes(uint64(compressedDataSize) + uint64(metadataSize) + uint64(len(newBackupMetadataBody)) + backupMetadata.RBACSize + backupMetadata.ConfigSize),
"object_disk_size": utils.FormatBytes(backupMetadata.ObjectDiskSize),
"version": backupVersion,
}).Info("done")

// Remote old backup retention
Expand Down Expand Up @@ -290,14 +294,16 @@ func (b *Backuper) RemoveOldBackupsRemote(ctx context.Context) error {
return nil
}

func (b *Backuper) uploadSingleBackupFile(ctx context.Context, localFile, remoteFile string) error {
if b.resume && b.resumableState.IsAlreadyProcessedBool(remoteFile) {
return nil
func (b *Backuper) uploadSingleBackupFile(ctx context.Context, localFile, remoteFile string) (int64, error) {
if b.resume {
if isProcessed, size := b.resumableState.IsAlreadyProcessed(remoteFile); isProcessed {
return size, nil
}
}
log := b.log.WithField("logger", "uploadSingleBackupFile")
f, err := os.Open(localFile)
if err != nil {
return fmt.Errorf("can't open %s: %v", localFile, err)
return 0, fmt.Errorf("can't open %s: %v", localFile, err)
}
defer func() {
if err := f.Close(); err != nil {
Expand All @@ -309,16 +315,16 @@ func (b *Backuper) uploadSingleBackupFile(ctx context.Context, localFile, remote
return b.dst.PutFile(ctx, remoteFile, f)
})
if err != nil {
return fmt.Errorf("can't upload %s: %v", remoteFile, err)
return 0, fmt.Errorf("can't upload %s: %v", remoteFile, err)
}
info, err := os.Stat(localFile)
if err != nil {
return 0, fmt.Errorf("can't stat %s", localFile)
}
if b.resume {
info, err := os.Stat(localFile)
if err != nil {
return fmt.Errorf("can't stat %s", localFile)
}
b.resumableState.AppendToState(remoteFile, info.Size())
}
return nil
return info.Size(), nil
}

func (b *Backuper) prepareTableListToUpload(ctx context.Context, backupName string, tablePattern string, partitions []string) (tablesForUpload ListOfTables, err error) {
Expand Down
2 changes: 1 addition & 1 deletion test/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ func TestConfigs(t *testing.T) {
}
}

// TestLongListRemote - no parallel, cause need to restart minito
// TestLongListRemote - no parallel, cause need to restart minio
func TestLongListRemote(t *testing.T) {
ch := &TestClickHouse{}
r := require.New(t)
Expand Down

0 comments on commit 95545c0

Please sign in to comment.