refactoring of restore command to allow parallel execution of `ALTER TABLE ... ATTACH PART` and improve parallelization of CopyObject during restore.
Slach committed Jan 28, 2024
1 parent e22d174 commit c031026
Showing 2 changed files with 40 additions and 27 deletions.
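In essence, the restore.go change below wraps the per-table restore loop in an errgroup from golang.org/x/sync, bounded by the DownloadConcurrency setting, so that `ALTER TABLE ... ATTACH PART` for different tables can run in parallel while the first failure cancels the remaining work. The following is a minimal, self-contained sketch of that pattern, not the project's actual code; `restoreTable`, `tables`, and `downloadConcurrency` are placeholder names.

```go
// Sketch of the errgroup-with-limit pattern this commit adopts for restoreDataRegular.
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func restoreTable(ctx context.Context, table string) error {
	// Placeholder for the real per-table work (hardlink parts, copy object-disk
	// data, then ALTER TABLE ... ATTACH PART / ATTACH TABLE).
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		fmt.Println("restored", table)
		return nil
	}
}

func main() {
	tables := []string{"db1.events", "db1.metrics", "db2.logs"}
	downloadConcurrency := 2 // analogous to b.cfg.General.DownloadConcurrency

	wg, ctx := errgroup.WithContext(context.Background())
	wg.SetLimit(downloadConcurrency)
	for i := range tables {
		table := tables[i] // copy before the closure so each goroutine sees its own value
		wg.Go(func() error {
			return restoreTable(ctx, table)
		})
	}
	if err := wg.Wait(); err != nil {
		fmt.Println("restore failed:", err)
	}
}
```

The same errgroup-with-limit pattern appears a second time inside downloadObjectDiskParts, where it bounds the per-part object_disk.CopyObject calls.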
4 changes: 4 additions & 0 deletions ChangeLog.md
@@ -1,3 +1,7 @@
# v2.4.23
IMPROVEMENTS
- refactoring of `restore` command to allow parallel execution of `ALTER TABLE ... ATTACH PART` and improve parallelization of CopyObject during restore.

# v2.4.22
BUG FIXES
- change `S3_MAX_PARTS_COUNT` default value from `256` to `2000` to fix memory usage for s3 which increased for 2.4.16+
63 changes: 36 additions & 27 deletions pkg/backup/restore.go
@@ -744,8 +744,11 @@ func (b *Backuper) restoreDataRegular(ctx context.Context, backupName string, ta
if len(missingTables) > 0 {
return fmt.Errorf("%s is not created. Restore schema first or create missing tables manually", strings.Join(missingTables, ", "))
}
restoreBackupWorkingGroup, restoreCtx := errgroup.WithContext(ctx)
restoreBackupWorkingGroup.SetLimit(int(b.cfg.General.DownloadConcurrency))

for i, table := range tablesForRestore {
for i := range tablesForRestore {
table := tablesForRestore[i]
// need mapped database path and original table.Database for HardlinkBackupPartsToStorage
dstDatabase := table.Database
if len(b.cfg.General.RestoreDatabaseMapping) > 0 {
@@ -761,28 +764,34 @@ if !ok {
if !ok {
return fmt.Errorf("can't find '%s.%s' in current system.tables", dstDatabase, table.Table)
}
// https://github.com/Altinity/clickhouse-backup/issues/529
if b.cfg.ClickHouse.RestoreAsAttach {
if err = b.restoreDataRegularByAttach(ctx, backupName, table, diskMap, diskTypes, disks, dstTable, log, tablesForRestore, i); err != nil {
return err
}
} else {
if err = b.restoreDataRegularByParts(ctx, backupName, table, diskMap, diskTypes, disks, dstTable, log, tablesForRestore, i); err != nil {
return err
restoreBackupWorkingGroup.Go(func() error {
// https://github.com/Altinity/clickhouse-backup/issues/529
if b.cfg.ClickHouse.RestoreAsAttach {
if restoreErr := b.restoreDataRegularByAttach(restoreCtx, backupName, table, diskMap, diskTypes, disks, dstTable, log); restoreErr != nil {
return restoreErr
}
} else {
if restoreErr := b.restoreDataRegularByParts(restoreCtx, backupName, table, diskMap, diskTypes, disks, dstTable, log); restoreErr != nil {
return restoreErr
}
}
}
// https://github.com/Altinity/clickhouse-backup/issues/529
for _, mutation := range table.Mutations {
if err := b.ch.ApplyMutation(ctx, tablesForRestore[i], mutation); err != nil {
log.Warnf("can't apply mutation %s for table `%s`.`%s` : %v", mutation.Command, tablesForRestore[i].Database, tablesForRestore[i].Table, err)
// https://github.com/Altinity/clickhouse-backup/issues/529
for _, mutation := range table.Mutations {
if err := b.ch.ApplyMutation(restoreCtx, tablesForRestore[i], mutation); err != nil {

[Check failure annotation — GitHub Actions / Build (1.21), line 780 in pkg/backup/restore.go: loop variable i captured by func literal]
log.Warnf("can't apply mutation %s for table `%s`.`%s` : %v", mutation.Command, tablesForRestore[i].Database, tablesForRestore[i].Table, err)

[Check failure annotations (x2) — GitHub Actions / Build (1.21), line 781 in pkg/backup/restore.go: loop variable i captured by func literal; see the sketch after the diff]
}
}
}
log.Info("done")
log.Info("done")
return nil
})
}
if wgWaitErr := restoreBackupWorkingGroup.Wait(); wgWaitErr != nil {
return fmt.Errorf("one of restoreDataRegular go-routine return error: %v", wgWaitErr)
}
return nil
}

func (b *Backuper) restoreDataRegularByAttach(ctx context.Context, backupName string, table metadata.TableMetadata, diskMap, diskTypes map[string]string, disks []clickhouse.Disk, dstTable clickhouse.Table, log *apexLog.Entry, tablesForRestore ListOfTables, i int) error {
func (b *Backuper) restoreDataRegularByAttach(ctx context.Context, backupName string, table metadata.TableMetadata, diskMap, diskTypes map[string]string, disks []clickhouse.Disk, dstTable clickhouse.Table, log *apexLog.Entry) error {
if err := filesystemhelper.HardlinkBackupPartsToStorage(backupName, table, disks, dstTable.DataPaths, b.ch, false); err != nil {
return fmt.Errorf("can't copy data to storage '%s.%s': %v", table.Database, table.Table, err)
}
@@ -791,22 +800,22 @@ func (b *Backuper) restoreDataRegularByAttach(ctx context.Context, backupName st
return fmt.Errorf("can't restore object_disk server-side copy data parts '%s.%s': %v", table.Database, table.Table, err)
}

if err := b.ch.AttachTable(ctx, tablesForRestore[i]); err != nil {
return fmt.Errorf("can't attach table '%s.%s': %v", tablesForRestore[i].Database, tablesForRestore[i].Table, err)
if err := b.ch.AttachTable(ctx, table); err != nil {
return fmt.Errorf("can't attach table '%s.%s': %v", table.Database, table.Table, err)
}
return nil
}

func (b *Backuper) restoreDataRegularByParts(ctx context.Context, backupName string, table metadata.TableMetadata, diskMap, diskTypes map[string]string, disks []clickhouse.Disk, dstTable clickhouse.Table, log *apexLog.Entry, tablesForRestore ListOfTables, i int) error {
func (b *Backuper) restoreDataRegularByParts(ctx context.Context, backupName string, table metadata.TableMetadata, diskMap, diskTypes map[string]string, disks []clickhouse.Disk, dstTable clickhouse.Table, log *apexLog.Entry) error {
if err := filesystemhelper.HardlinkBackupPartsToStorage(backupName, table, disks, dstTable.DataPaths, b.ch, true); err != nil {
return fmt.Errorf("can't copy data to detached '%s.%s': %v", table.Database, table.Table, err)
}
log.Debug("data to 'detached' copied")
if err := b.downloadObjectDiskParts(ctx, backupName, table, diskMap, diskTypes); err != nil {
return fmt.Errorf("can't restore object_disk server-side copy data parts '%s.%s': %v", table.Database, table.Table, err)
}
if err := b.ch.AttachDataParts(tablesForRestore[i], disks); err != nil {
return fmt.Errorf("can't attach data parts for table '%s.%s': %v", tablesForRestore[i].Database, tablesForRestore[i].Table, err)
if err := b.ch.AttachDataParts(table, disks); err != nil {
return fmt.Errorf("can't attach data parts for table '%s.%s': %v", table.Database, table.Table, err)
}
return nil
}
@@ -828,6 +837,9 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
return fmt.Errorf("%s disk doesn't present in diskTypes: %v", diskName, diskTypes)
}
if diskType == "s3" || diskType == "azure_blob_storage" {
if err = config.ValidateObjectDiskConfig(b.cfg); err != nil {
return err
}
needToDownloadObjectDisk = true
break
}
@@ -841,14 +853,11 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
return fmt.Errorf("%s disk doesn't present in diskTypes: %v", diskName, diskTypes)
}
if diskType == "s3" || diskType == "azure_blob_storage" {
if err = config.ValidateObjectDiskConfig(b.cfg); err != nil {
return err
}
if err = object_disk.InitCredentialsAndConnections(ctx, b.ch, b.cfg, diskName); err != nil {
return err
}
start := time.Now()
downloadObjectDiskPartsWorkingGroup, ctx := errgroup.WithContext(ctx)
downloadObjectDiskPartsWorkingGroup, downloadCtx := errgroup.WithContext(ctx)
downloadObjectDiskPartsWorkingGroup.SetLimit(int(b.cfg.General.DownloadConcurrency))
for _, part := range parts {
partPath := path.Join(diskMap[diskName], "backup", backupName, "shadow", dbAndTableDir, diskName, part.Name)
@@ -887,7 +896,7 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
} else {
return fmt.Errorf("incompatible object_disk[%s].Type=%s amd remote_storage: %s", diskName, diskType, b.cfg.General.RemoteStorage)
}
if copiedSize, copyObjectErr := object_disk.CopyObject(ctx, b.ch, b.cfg, diskName, storageObject.ObjectSize, srcBucket, srcKey, storageObject.ObjectRelativePath); copyObjectErr != nil {
if copiedSize, copyObjectErr := object_disk.CopyObject(downloadCtx, b.ch, b.cfg, diskName, storageObject.ObjectSize, srcBucket, srcKey, storageObject.ObjectRelativePath); copyObjectErr != nil {
return fmt.Errorf("object_disk.CopyObject error: %v", err)
} else {
atomic.AddInt64(&size, copiedSize)
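The check failures flagged in the diff above come from go vet's loopclosure analysis: the closure passed to restoreBackupWorkingGroup.Go still reads tablesForRestore[i], and before Go 1.22 the loop variable i is shared across iterations, so a goroutine scheduled later can observe a different index than the one it was created for (Go 1.22 changed this so each iteration gets a fresh variable). Below is a hedged sketch of the pitfall and the conventional pre-Go-1.22 fix of shadowing the loop variable; the names are illustrative placeholders, not the project's code.

```go
// Demonstrates the "loop variable i captured by func literal" pitfall that the CI
// annotations refer to, and the usual fix of shadowing the loop variable.
package main

import (
	"fmt"
	"sync"
)

func main() {
	items := []string{"part_0_0_0", "part_1_1_0", "part_2_2_0"}
	var wg sync.WaitGroup

	for i := range items {
		i := i // shadow: each goroutine now captures its own copy of the index
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Without the shadowing line, goroutines built with Go < 1.22 may all
			// read the final value of i rather than the value current at launch.
			fmt.Println("attaching", items[i])
		}()
	}
	wg.Wait()
}
```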